# Source code for bsrn.utils.averaging

"""
Explicit time-window averages for :class:`~pandas.DatetimeIndex` frames (LR0100-style).

This is **not** :meth:`pandas.DataFrame.resample` semantics. For **floor / ceiling / center**
windows, monthly label trimming, coverage rules, and examples, see
``docs/tutorials/3.time_averaging.ipynb``.

非 ``resample`` 语义;窗定义、月界裁剪、覆盖规则与示例见上述教程笔记本。
"""

import numpy as np
import pandas as pd

# Coverage threshold used by pretty_average: a window counts as insufficient when its
# valid-timestep count is <= this fraction of the denominator, so strictly more than
# half of the timesteps must be valid for the window to be aggregated.
_MIN_VALID_FRACTION = 0.5


def _period_delta(freq):
    """
    Convert a fixed pandas frequency alias into the window length it represents.

    Parameters
    ----------
    freq : str
        Fixed offset alias such as ``'30min'`` or ``'1h'``.

    Returns
    -------
    pandas.Timedelta
        Length of one bin.

    Raises
    ------
    ValueError
        If the parsed offset cannot be converted to a ``Timedelta`` (i.e. it is
        not a fixed-length frequency). Parsing errors raised by ``to_offset``
        itself propagate unchanged.
    """
    # Parsing happens outside the try block, so its own errors are not re-wrapped.
    offset = pd.tseries.frequencies.to_offset(freq)
    try:
        window_length = pd.Timedelta(offset)
    except (ValueError, TypeError) as err:
        message = (
            f"freq {freq!r} must be a fixed frequency (e.g. '30min', '1h'). / "
            "需要固定频率(如 '30min', '1h')。"
        )
        raise ValueError(message) from err
    return window_length


def _archive_timestep_1_or_3(index):
    """
    Classify the native timestep of ``index`` as either 1 minute or 3 minutes.

    The decision is based on the median difference between consecutive
    timestamps. Only a median inside the closed band 150 s .. 210 s is reported
    as 3 minutes; every other case (fewer than two timestamps, undefined or
    non-positive median, any other spacing) falls back to 1 minute.

    Parameters
    ----------
    index : pandas.DatetimeIndex
        Input timestamps.

    Returns
    -------
    pandas.Timedelta
        ``Timedelta(minutes=1)`` or ``Timedelta(minutes=3)``.
    """
    one_minute = pd.Timedelta(minutes=1)
    if len(index) < 2:
        return one_minute

    median_step = pd.Series(index).diff().median()
    if pd.isna(median_step) or median_step <= pd.Timedelta(0):
        return one_minute

    seconds = float(median_step.total_seconds())
    # 2.5 min .. 3.5 min is the only band that maps to the 3-minute timestep.
    if 2.5 * 60 <= seconds <= 3.5 * 60:
        return pd.Timedelta(minutes=3)
    return one_minute


def _label_grid(index, freq):
    """
    Build a regular label grid from ``floor(min)`` through ``ceil(max)`` at ``freq``.

    Parameters
    ----------
    index : pandas.DatetimeIndex
        Timestamps whose extent defines the grid.
    freq : str
        Bin frequency used both for rounding the extent and for the grid step.

    Returns
    -------
    pandas.DatetimeIndex
        Labels including both endpoints, carrying the timezone of ``index``.
        An empty index (or one whose extent is undefined, e.g. all ``NaT``)
        yields an empty ``DatetimeIndex`` instead of raising.
    """
    empty = pd.DatetimeIndex([], tz=index.tz)
    if len(index) == 0:
        return empty
    lo = index.min().floor(freq)
    hi = index.max().ceil(freq)
    # min()/max() are NaT when there is no usable timestamp; date_range rejects NaT
    # endpoints, so return the empty grid instead.
    if pd.isna(lo) or pd.isna(hi) or lo > hi:
        return empty
    return pd.date_range(lo, hi, freq=freq, inclusive="both", tz=index.tz)


def _trim_labels_for_alignment(labels, index, freq, alignment,
                               match_ceiling_labels=True):
    """
    Remove the one edge label that does not belong to the chosen alignment.

    Ceiling-style trimming keeps labels strictly after ``floor(min(index))``;
    floor-style trimming keeps labels strictly before ``ceil(max(index))``.
    The original note describes the purpose as dropping edge labels that would
    mix months.

    Parameters
    ----------
    labels : pandas.DatetimeIndex
        Full label grid (as produced by :func:`_label_grid`).
    index : pandas.DatetimeIndex
        Data timestamps; their rounded extent defines the two edges.
    freq : str
        Bin frequency used to round the extent.
    alignment : {'floor', 'ceiling', 'center'}
        Window alignment.
    match_ceiling_labels : bool, default True
        Only consulted for ``'center'``: truthy selects ceiling-style trimming,
        falsy selects floor-style trimming.

    Returns
    -------
    pandas.DatetimeIndex
        Trimmed labels. Empty ``labels`` are returned as-is, before
        ``alignment`` is examined.

    Raises
    ------
    ValueError
        If ``labels`` is non-empty and ``alignment`` is not recognized.
    """
    if len(labels) == 0:
        return labels

    first_edge = index.min().floor(freq)
    last_edge = index.max().ceil(freq)

    if alignment == "center":
        # Center windows borrow one of the other two trimming rules.
        trim_like = "ceiling" if match_ceiling_labels else "floor"
    else:
        trim_like = alignment

    if trim_like == "ceiling":
        return labels[labels > first_edge]
    if trim_like == "floor":
        return labels[labels < last_edge]
    raise ValueError(f"Unknown alignment: {alignment!r}")


def _window_mask(index, L, delta, alignment, resolution):
    """
    Select the timestamps that fall inside the averaging window of label ``L``.

    Window definitions:

    * ``'floor'``   -- ``[L, L + delta)``
    * ``'ceiling'`` -- ``(L - delta, L]``
    * ``'center'``  -- ``[L - delta/2 + resolution, L + delta/2]``

    Parameters
    ----------
    index : pandas.DatetimeIndex
        Row timestamps.
    L : pandas.Timestamp
        Label (bin anchor).
    delta : pandas.Timedelta
        Bin length.
    alignment : {'floor', 'ceiling', 'center'}
        Window alignment.
    resolution : pandas.Timedelta
        Native timestep; only used to shift the start of the ``'center'`` window.

    Returns
    -------
    numpy.ndarray
        Boolean array with one entry per element of ``index``.

    Raises
    ------
    ValueError
        If ``alignment`` is not recognized.
    """
    half_width = delta / 2
    if alignment == "floor":
        after_start = index >= L
        before_end = index < L + delta
    elif alignment == "ceiling":
        after_start = index > L - delta
        before_end = index <= L
    elif alignment == "center":
        # Start is pushed forward by one native step so that adjacent centered
        # windows do not both include the shared boundary timestamp.
        after_start = index >= L - half_width + resolution
        before_end = index <= L + half_width
    else:
        raise ValueError(f"Unknown alignment: {alignment!r}")
    return after_start & before_end


def _expected_timesteps(delta, resolution):
    """
    Nominal count of native timesteps contained in one bin.

    The ratio ``delta / resolution`` is rounded with the built-in ``round`` and
    clamped so the result is never below 1.

    Parameters
    ----------
    delta : pandas.Timedelta
        Bin length.
    resolution : pandas.Timedelta
        Native timestep.

    Returns
    -------
    int
        Rounded step count, at least 1.
    """
    ratio = float(delta / resolution)
    n_steps = int(round(ratio))
    return n_steps if n_steps > 1 else 1


def _count_valid_timesteps(part, df_columns_numeric):
    """
    Count the rows that hold at least one non-missing value in the given columns.

    "Non-missing" follows pandas ``isna``/``notna`` semantics, so infinite
    values count as present.

    Parameters
    ----------
    part : pandas.DataFrame
        Window subset.
    df_columns_numeric : list of str
        Names of the columns to inspect.

    Returns
    -------
    int
        ``0`` for an empty frame; the total row count when no columns are
        given; otherwise the number of rows with at least one non-missing
        value among the selected columns.
    """
    if part.empty:
        return 0
    if len(df_columns_numeric) == 0:
        # Nothing to inspect: every row present counts as valid.
        return len(part)
    selected = part[df_columns_numeric]
    entirely_missing = selected.isna().all(axis=1)
    return int((~entirely_missing).sum())


def _nan_row_like(df):
    """
    Build an all-NaN row indexed by the columns of ``df``.

    Used as the placeholder row for windows with insufficient coverage.

    Parameters
    ----------
    df : pandas.DataFrame
        Template frame supplying the column labels.

    Returns
    -------
    pandas.Series
        Series of NaN values whose index is ``df.columns``.
    """
    column_labels = df.columns
    blanks = np.full(len(column_labels), np.nan)
    return pd.Series(blanks, index=column_labels)


def _finalize_row(agg, df):
    """
    Normalize an aggregation result to a ``Series`` aligned to ``df.columns``.

    Parameters
    ----------
    agg : pandas.Series or scalar or mapping
        Value returned by the aggregation function.
    df : pandas.DataFrame
        Frame whose columns define the labels and order of the result.

    Returns
    -------
    pandas.Series
        Series indexed by ``df.columns``. A ``Series`` or mapping is aligned by
        label (labels absent from ``agg`` become NaN); a scalar is broadcast to
        every column.
    """
    if isinstance(agg, pd.Series):
        row = agg
    elif pd.api.types.is_scalar(agg):
        # A bare scalar would otherwise become a one-element Series labelled 0,
        # and the reindex below would discard it, leaving an all-NaN row.
        return pd.Series(agg, index=df.columns)
    else:
        row = pd.Series(agg)
    return row.reindex(df.columns)


def _aggregate(part, aggfunc):
    """
    Reduce a window subset with ``aggfunc``.

    Parameters
    ----------
    part : pandas.DataFrame
        Window subset.
    aggfunc : str or callable
        ``'mean'``, ``'sum'`` and ``'median'`` are computed over numeric columns
        only (``numeric_only=True``). Any other string is forwarded to
        ``DataFrame.agg`` unchanged. A callable is invoked as ``aggfunc(part)``.

    Returns
    -------
    scalar or pandas.Series
        Whatever the selected reduction returns.
    """
    if not isinstance(aggfunc, str):
        return aggfunc(part)
    if aggfunc in ("mean", "sum", "median"):
        # These three are restricted to numeric columns so that text columns
        # do not take part in the reduction.
        reducer = getattr(part, aggfunc)
        return reducer(numeric_only=True)
    return part.agg(aggfunc)


def pretty_average(df, freq, alignment="ceiling", aggfunc="mean", resolution=None,
                   match_ceiling_labels=True):
    """
    Aggregate ``df`` over explicitly defined, labeled time windows.

    This does not follow pandas ``resample`` semantics. For every label ``L`` on
    a regular grid of step ``freq`` the rows inside the window are reduced with
    ``aggfunc``. See ``docs/tutorials/3.time_averaging.ipynb`` for the window
    definitions and examples.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; its index must be a :class:`~pandas.DatetimeIndex`.
    freq : str
        Fixed bin frequency (e.g. ``'1h'``, ``'30min'``).
    alignment : {'floor', 'ceiling', 'center'}, default ``'ceiling'``
        Window for label ``L`` with bin length ``D`` and native step ``res``:
        **floor** ``[L, L+D)``, **ceiling** ``(L-D, L]``,
        **center** ``[L-D/2+res, L+D/2]``.
    aggfunc : str or callable, default ``'mean'``
        Passed to :func:`_aggregate`.
    resolution : pandas.Timedelta or None, default None
        Native timestep. It shifts the start of **center** windows and sets the
        nominal step count used for **floor** / **ceiling** coverage. If None,
        it is inferred as 1 or 3 minutes by :func:`_archive_timestep_1_or_3`.
    match_ceiling_labels : bool, default True
        For ``alignment='center'`` only: trim edge labels like **ceiling**
        (True) or like **floor** (False).

    Returns
    -------
    pandas.DataFrame
        One row per output label. Windows with insufficient coverage keep their
        label but contain only NaN. An empty ``df`` is returned as a copy; if
        no labels remain after trimming, an empty slice of ``df`` is returned.

    Raises
    ------
    TypeError
        If ``df.index`` is not a :class:`~pandas.DatetimeIndex`.
    ValueError
        Propagated from the helpers, e.g. for a non-fixed ``freq`` or an
        unrecognized ``alignment``.

    Notes
    -----
    Coverage rule: a window is aggregated only when the number of rows with at
    least one non-missing numeric value is strictly greater than
    ``_MIN_VALID_FRACTION`` times a denominator. For **floor** / **ceiling** the
    denominator is the nominal step count ``D / res``; for **center** it is the
    number of rows actually present in the window.
    """
    if not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError(
            "pretty_average requires a DatetimeIndex; set df.index or use set_index. / "
            "pretty_average 需要 DatetimeIndex。"
        )
    if df.empty:
        return df.copy()

    delta = _period_delta(freq)
    if resolution is None:
        step = _archive_timestep_1_or_3(df.index)
    else:
        step = resolution

    labels = _trim_labels_for_alignment(
        _label_grid(df.index, freq),
        df.index,
        freq,
        alignment,
        match_ceiling_labels=match_ceiling_labels,
    )
    if len(labels) == 0:
        return df.iloc[0:0].copy()

    numeric_columns = list(df.select_dtypes(include=["number"]).columns)
    nominal_steps = _expected_timesteps(delta, step)
    centered = alignment == "center"

    rows = []
    for L in labels:
        window = df.loc[_window_mask(df.index, L, delta, alignment, step)]
        n_valid = _count_valid_timesteps(window, numeric_columns)
        if centered:
            # Center windows are judged against the rows actually present.
            n_present = len(window)
            too_sparse = n_present == 0 or n_valid <= n_present * _MIN_VALID_FRACTION
        else:
            # Floor / ceiling windows are judged against the nominal step count.
            too_sparse = n_valid <= nominal_steps * _MIN_VALID_FRACTION
        if too_sparse:
            rows.append(_nan_row_like(df))
        else:
            rows.append(_finalize_row(_aggregate(window, aggfunc), df))

    # Every label contributes exactly one row, so the label list is the output index.
    return pd.DataFrame(rows, index=pd.DatetimeIndex(list(labels), tz=df.index.tz))