Source code for bsrn.io.merra2

"""
MERRA-2 parquet loader for REST2 clear-sky inputs.

Fetches MERRA-2 parquet from Hugging Face (dazhiyang/bsrn-merra2) into RAM (no disk cache).
"""

import io
import pandas as pd
import requests

from bsrn.constants import MERRA2_HF_REPO_ID, HF_MAINTAINER_EMAIL


def _hf_fetch_to_memory(repo_id, filename):
    """
    Fetch one dataset file from Hugging Face into RAM (no disk cache; used by ``_fetch_merra2_from_hf``).

    Parameters
    ----------
    repo_id : str
        Hugging Face dataset repo id.
    filename : str
        Path within the repo (e.g. ``stn/stnMMYY_merra2.parquet``).

    Returns
    -------
    content : bytes
        Raw file bytes.

    Raises
    ------
    ImportError
        If ``huggingface_hub`` is not installed.
    FileNotFoundError
        On 404 or other fetch failure; message includes maintainer contact.
    """
    try:
        from huggingface_hub import hf_hub_url
    except ImportError:
        raise ImportError(
            "huggingface_hub is required for MERRA-2. Install with: pip install huggingface_hub"
        )
    print(f"Fetching MERRA-2 from Hugging Face: {filename}")
    try:
        url = hf_hub_url(
            repo_id=repo_id, filename=filename, repo_type="dataset"
        )
        resp = requests.get(url, timeout=60)
        resp.raise_for_status()
        return resp.content
    except requests.HTTPError as e:
        if e.response is not None and e.response.status_code == 404:
            raise FileNotFoundError(
                f"{filename} is not yet on huggingface, please contact the maintainer "
                f"Dazhi Yang at {HF_MAINTAINER_EMAIL} for update."
            ) from e
        raise FileNotFoundError(
            f"{filename} is not yet on huggingface, please contact the maintainer "
            f"Dazhi Yang at {HF_MAINTAINER_EMAIL} for update."
        ) from e
    except Exception as e:
        raise FileNotFoundError(
            f"{filename} is not yet on huggingface, please contact the maintainer "
            f"Dazhi Yang at {HF_MAINTAINER_EMAIL} for update."
        ) from e


def _fetch_merra2_from_hf(station_code, index):
    """
    Fetch one parquet per unique (year, month) in *index* from Hugging Face (used by ``fetch_rest2``).

    Parameters
    ----------
    station_code : str
        BSRN station code (case-insensitive).
    index : pd.DatetimeIndex
        Non-empty target index; months present determine which files are fetched.

    Returns
    -------
    contents : list of bytes
        One element per month, in sorted (year, month) order.

    Raises
    ------
    ValueError
        If *index* is empty.
    """
    if index.empty:
        raise ValueError("index must not be empty.")
    stn = station_code.lower()
    # Use (year, month) pairs to avoid PeriodIndex (which drops tz and triggers a warning)
    unique_pairs = set(zip(index.year, index.month))
    contents = []
    for year, month in sorted(unique_pairs):
        filename = f"{stn}{month:02d}{year % 100:02d}_merra2.parquet"
        hf_filename = f"{stn}/{filename}"
        content = _hf_fetch_to_memory(MERRA2_HF_REPO_ID, hf_filename)
        contents.append(content)
    return contents


def _load_merra2_parquet(path_or_bytes):
    """
    Load one MERRA-2 parquet into a UTC-indexed DataFrame (used by ``fetch_rest2`` only).

    Parameters
    ----------
    path_or_bytes : str, path-like, bytes, or file-like
        Path to the parquet file, or bytes content, or file-like object.

    Returns
    -------
    data : pd.DataFrame
        MERRA-2 data with columns AOD55, ALPHA, ALBEDO, TQV, TO3, PS
        and UTC DatetimeIndex.
    """
    if isinstance(path_or_bytes, bytes):
        path_or_bytes = io.BytesIO(path_or_bytes)
    data = pd.read_parquet(path_or_bytes)
    if not isinstance(data.index, pd.DatetimeIndex):
        raise ValueError(
            "MERRA-2 parquet must have DatetimeIndex."
        )
    if data.index.tz is None:
        data.index = data.index.tz_localize("UTC")
    else:
        data.index = data.index.tz_convert("UTC")
    return data


def _parse_merra2_for_rest2(raw, target_index):
    """
    Reindex, interpolate, derive BETA, convert units for REST2 (used by ``fetch_rest2`` only).

    Parameters
    ----------
    raw : pd.DataFrame
        MERRA-2 data with columns AOD55, ALPHA, ALBEDO, TQV, TO3, PS.
    target_index : pd.DatetimeIndex
        Target 1-min index, typically the BSRN time index.

    Returns
    -------
    rest2 : pd.DataFrame
        REST2-ready DataFrame with columns PS, ALBEDO, ALPHA, BETA, TO3, TQV.
    """
    if not isinstance(target_index, pd.DatetimeIndex):
        raise ValueError(
            "target_index must be a DatetimeIndex."
        )

    df = raw.copy()

    # Reindex to 1-min target index and interpolate
    df = df.reindex(target_index)
    num_cols = df.select_dtypes(include="number").columns
    if len(num_cols) > 0:
        df[num_cols] = df[num_cols].interpolate(method="time", limit_direction="both")

    # Unit conversions
    # PS: Pa → hPa (÷ 100)
    if "PS" in df.columns:
        df["PS"] = df["PS"] / 100.0
    # TQV: kg/m² → atm.cm (÷ 10)
    if "TQV" in df.columns:
        df["TQV"] = df["TQV"] / 10.0
    # TO3: Dobson → atm.cm (÷ 1000)
    if "TO3" in df.columns:
        df["TO3"] = df["TO3"] / 1000.0

    # Derive BETA = AOD55 * 0.55^ALPHA; replace AOD55=0 with 0.001 before
    if {"AOD55", "ALPHA"}.issubset(df.columns):
        aod55 = df["AOD55"].replace(0.0, 0.001)
        df["BETA"] = aod55 * (0.55 ** df["ALPHA"])
        df = df.drop(columns=["AOD55"])

    return df


[docs] def fetch_rest2(index, station_code): """ Fetch MERRA-2 from Hugging Face and return REST2-ready inputs aligned to target index. Parallel to fetch_mcclear; both return aligned clear-sky inputs. Parameters ---------- index : pd.DatetimeIndex Target time index to align REST2 outputs to. station_code : str BSRN station code (e.g. "QIQ"). Returns ------- aligned : pd.DataFrame REST2 inputs reindexed to `index` with columns PS, ALBEDO, ALPHA, BETA, TO3, TQV. Raises ------ ValueError If ``index`` is not a :class:`~pandas.DatetimeIndex` or is empty. FileNotFoundError If a required monthly parquet is missing on Hugging Face. """ if not isinstance(index, pd.DatetimeIndex): raise ValueError( "index must be a pandas DatetimeIndex." ) if index.empty: raise ValueError("index must not be empty.") contents = _fetch_merra2_from_hf(station_code, index) dfs = [_load_merra2_parquet(c) for c in contents] raw = pd.concat(dfs).sort_index() raw = raw[~raw.index.duplicated(keep="first")] rest2 = _parse_merra2_for_rest2(raw, index) return rest2