Source code for bsrn.utils.averaging

"""
Explicit time-window averages for :class:`~pandas.DatetimeIndex` frames (LR0100-style).

This is **not** :meth:`pandas.DataFrame.resample` semantics. For **floor / ceiling / center**
windows, monthly label trimming, coverage rules, and examples, see
``docs/tutorials/4.time_averaging.ipynb``.
"""

import numpy as np
import pandas as pd

_MIN_VALID_FRACTION = 0.5  # Strict majority (> half) for coverage checks.


def _period_delta(freq):
    """
    Map a fixed pandas frequency string to window length Δ.

    Parameters
    ----------
    freq : str
        Fixed offset alias (e.g. ``'30min'``, ``'1h'``). Must resolve to a fixed step.

    Returns
    -------
    pandas.Timedelta
        Window length Δ.

    Raises
    ------
    ValueError
        If ``freq`` is not a fixed frequency.
    """
    off = pd.tseries.frequencies.to_offset(freq)
    try:
        return pd.Timedelta(off)
    except (ValueError, TypeError) as e:
        raise ValueError(
            f"freq {freq!r} must be a fixed frequency (e.g. '30min', '1h')."
        ) from e


def _archive_timestep_1_or_3(index):
    """
    Infer native timestep as 1 or 3 minutes from median index spacing (BSRN archives).

    Parameters
    ----------
    index : pandas.DatetimeIndex
        Input timestamps.

    Returns
    -------
    pandas.Timedelta
        ``Timedelta(minutes=1)`` or ``Timedelta(minutes=3)`` (fallback: 1 min).
    """
    if len(index) < 2:
        return pd.Timedelta(minutes=1)
    d = pd.Series(index).diff().median()
    if pd.isna(d) or d <= pd.Timedelta(0):
        return pd.Timedelta(minutes=1)
    sec = float(d.total_seconds())
    if 0.5 * 60 <= sec <= 1.5 * 60:
        return pd.Timedelta(minutes=1)
    if 2.5 * 60 <= sec <= 3.5 * 60:
        return pd.Timedelta(minutes=3)
    return pd.Timedelta(minutes=1)


def _label_grid(index, freq):
    """
    Build a regular label grid from ``floor(min)`` through ``ceil(max)`` at ``freq``.

    Parameters
    ----------
    index : pandas.DatetimeIndex
        Data extent.
    freq : str
        Bin frequency.

    Returns
    -------
    pandas.DatetimeIndex
        Labels inclusive of endpoints; empty if invalid range.
    """
    lo = index.min().floor(freq)
    hi = index.max().ceil(freq)
    if lo > hi:
        return pd.DatetimeIndex([], tz=index.tz)
    return pd.date_range(lo, hi, freq=freq, inclusive="both", tz=index.tz)


def _trim_labels_for_alignment(labels, index, freq, alignment,
                               match_ceiling_labels=True):
    """
    Drop edge labels that would mix months (align with floor / ceiling / center grids).

    Parameters
    ----------
    labels : pandas.DatetimeIndex
        Full grid from :func:`_label_grid`.
    index : pandas.DatetimeIndex
        Actual data index (defines month span ``lo``, ``hi``).
    freq : str
        Bin frequency.
    alignment : {'floor', 'ceiling', 'center'}
        Window alignment.
    match_ceiling_labels : bool, default True
        For ``center`` only: if True, apply ceiling-style trim (``labels > lo``); else floor-style
        (``labels < hi``).

    Returns
    -------
    pandas.DatetimeIndex
        Trimmed labels.

    Raises
    ------
    ValueError
        If ``alignment`` is not recognized.
    """
    if len(labels) == 0:
        return labels
    lo = index.min().floor(freq)
    hi = index.max().ceil(freq)
    if alignment == "floor":
        return labels[labels < hi]
    if alignment == "ceiling":
        return labels[labels > lo]
    if alignment == "center":
        if match_ceiling_labels:
            return labels[labels > lo]
        return labels[labels < hi]
    raise ValueError(f"Unknown alignment: {alignment!r}")


def _window_mask(index, L, delta, alignment, resolution):
    """
    Boolean mask: timestamps belonging to the averaging window for label ``L``.

    Parameters
    ----------
    index : pandas.DatetimeIndex
        Row timestamps.
    L : pandas.Timestamp
        Label (bin anchor).
    delta : pandas.Timedelta
        Bin length Δ.
    alignment : {'floor', 'ceiling', 'center'}
        Window alignment (see module docstring).
    resolution : pandas.Timedelta
        Native timestep; shifts the **center** window start by ``+resolution``.

    Returns
    -------
    numpy.ndarray
        Boolean array, same length as ``index``.

    Raises
    ------
    ValueError
        If ``alignment`` is not recognized.
    """
    half = delta / 2
    if alignment == "floor":
        return (index >= L) & (index < L + delta)
    if alignment == "ceiling":
        return (index > L - delta) & (index <= L)
    if alignment == "center":
        return (index >= L - half + resolution) & (index <= L + half)
    raise ValueError(f"Unknown alignment: {alignment!r}")


def _expected_timesteps(delta, resolution):
    """
    Nominal number of native steps in one bin (for coverage checks, floor/ceiling).

    Parameters
    ----------
    delta : pandas.Timedelta
        Bin length.
    resolution : pandas.Timedelta
        Native timestep.

    Returns
    -------
    int
        At least 1.
    """
    return max(1, int(round(float(delta / resolution))))


def _count_valid_timesteps(part, df_columns_numeric):
    """
    Count rows with at least one finite numeric value in selected columns.

    Parameters
    ----------
    part : pandas.DataFrame
        Window subset.
    df_columns_numeric : list of str
        Column names to test (numeric dtypes).

    Returns
    -------
    int
        Row count.
    """
    if part.empty:
        return 0
    if len(df_columns_numeric) == 0:
        return len(part)
    return int(part[df_columns_numeric].notna().any(axis=1).sum())


def _nan_row_like(df):
    """
    One row of NaNs with the same columns as ``df`` (placeholder for insufficient coverage).

    Parameters
    ----------
    df : pandas.DataFrame
        Template frame.

    Returns
    -------
    pandas.Series
        All-NaN row.
    """
    return pd.Series(np.nan, index=df.columns)


def _finalize_row(agg, df):
    """
    Normalize aggregation output to a ``Series`` aligned to ``df.columns``.

    Parameters
    ----------
    agg : pandas.Series or scalar or mapping
        Result of ``aggfunc``.
    df : pandas.DataFrame
        Column order reference.

    Returns
    -------
    pandas.Series
        Reindexed series.
    """
    if isinstance(agg, pd.Series):
        s = agg
    else:
        s = pd.Series(agg)
    return s.reindex(df.columns)


def _aggregate(part, aggfunc):
    """
    Apply ``aggfunc`` to ``part`` (string name or callable).

    Parameters
    ----------
    part : pandas.DataFrame
        Window subset.
    aggfunc : str or callable
        ``'mean'``, ``'sum'``, ``'median'``, any name accepted by ``DataFrame.agg``, or
        ``callable(DataFrame) -> Series|scalar``.

    Returns
    -------
    scalar or pandas.Series
        Aggregation result.
    """
    if isinstance(aggfunc, str):
        if aggfunc == "mean":
            return part.mean(numeric_only=True)
        if aggfunc == "sum":
            return part.sum(numeric_only=True)
        if aggfunc == "median":
            return part.median(numeric_only=True)
        return part.agg(aggfunc)
    return aggfunc(part)


[docs] def pretty_average(df, freq, alignment="ceiling", aggfunc="mean", resolution=None, match_ceiling_labels=True): """ Average ``df`` over explicit labeled windows (not pandas ``resample`` semantics). Semantics and examples: ``docs/tutorials/4.time_averaging.ipynb``. Parameters ---------- df : pandas.DataFrame Must use a :class:`~pandas.DatetimeIndex`. freq : str Fixed bin frequency (e.g. ``'1h'``, ``'30min'``). alignment : {'floor', 'ceiling', 'center'}, default ``'ceiling'`` **floor** ``[L, L+Δ)`` · **ceiling** ``(L-Δ, L]`` · **center** ``[L-Δ/2+res, L+Δ/2]``. Definitions: tutorial ``docs/tutorials/4.time_averaging.ipynb``. aggfunc : str or callable, default ``'mean'`` Passed to :func:`_aggregate`. resolution : pandas.Timedelta or None, default None Native timestep for **center** windows; if None, inferred as 1 or 3 min via :func:`_archive_timestep_1_or_3`. match_ceiling_labels : bool, default True When ``alignment='center'``, trim monthly edge labels like **ceiling** (default) or **floor**. Returns ------- pandas.DataFrame One row per output label; insufficient coverage yields NaN rows (labels retained). Raises ------ TypeError If ``df.index`` is not a :class:`~pandas.DatetimeIndex`. """ if not isinstance(df.index, pd.DatetimeIndex): raise TypeError( "pretty_average requires a DatetimeIndex; set df.index or use set_index." ) if df.empty: return df.copy() delta = _period_delta(freq) res = resolution if resolution is not None else _archive_timestep_1_or_3(df.index) labels = _label_grid(df.index, freq) labels = _trim_labels_for_alignment( labels, df.index, freq, alignment, match_ceiling_labels=match_ceiling_labels ) if len(labels) == 0: return df.iloc[0:0].copy() rows = [] out_labels = [] num_cols = list(df.select_dtypes(include=["number"]).columns) n_exp = _expected_timesteps(delta, res) for L in labels: mask = _window_mask(df.index, L, delta, alignment, res) part = df.loc[mask] n_valid = _count_valid_timesteps(part, num_cols) if alignment == "center": n_den = len(part) insufficient = n_den == 0 or n_valid <= n_den * _MIN_VALID_FRACTION else: insufficient = n_valid <= n_exp * _MIN_VALID_FRACTION if insufficient: rows.append(_nan_row_like(df)) out_labels.append(L) continue rows.append(_finalize_row(_aggregate(part, aggfunc), df)) out_labels.append(L) if not rows: return df.iloc[0:0].copy() out = pd.DataFrame(rows, index=pd.DatetimeIndex(out_labels, tz=df.index.tz)) return out