Source code for bsrn.utils.quality

"""
Quality control summary for BSRN data.
"""

import numpy as np
import pandas as pd
import bsrn.physics.geometry as geometry
import bsrn.qc.ppl as ppl
import bsrn.qc.erl as erl
import bsrn.qc.closure as closure
import bsrn.qc.k_index as k_index
import bsrn.qc.diff_ratio as diff_ratio
import bsrn.qc.tracker as tracker
from bsrn.modeling.clear_sky import add_clearsky_columns

_BSRN_MISSING_NUMERIC = (-999.0, -99.9, -99.99)
_BSRN_MISSING_ATOL = 5e-3


def _series_not_missing(s):
    """
    True where ``s`` is a measured value (not NaN / not BSRN missing codes).

    Parameters
    ----------
    s : pandas.Series
        Input column.

    Returns
    -------
    pandas.Series
        Boolean mask aligned to ``s.index``.
    """
    x = pd.to_numeric(s, errors="coerce")
    mask = x.notna().to_numpy(dtype=bool, copy=True)
    vals = x.to_numpy(dtype=np.float64, copy=True)
    for m in _BSRN_MISSING_NUMERIC:
        mask &= ~np.isclose(vals, m, rtol=0.0, atol=_BSRN_MISSING_ATOL, equal_nan=False)
    return pd.Series(mask, index=s.index)


def _series_finite_numeric(s):
    """
    True where ``s`` coerces to a finite float.

    Parameters
    ----------
    s : pandas.Series
        Input column.

    Returns
    -------
    pandas.Series
        Boolean mask aligned to ``s.index``.
    """
    x = pd.to_numeric(s, errors="coerce")
    v = x.to_numpy(dtype=np.float64, copy=False)
    ok = x.notna().to_numpy(dtype=bool, copy=True) & np.isfinite(v)
    return pd.Series(ok, index=s.index)


[docs] def get_daily_stats(df, lat, lon, elev, station_code=None): """ Calculate daily QC statistics and sunshine duration. Parameters ---------- df : pd.DataFrame BSRN data archive. lat : float Latitude. [degrees] lon : float Longitude. [degrees] elev : float Elevation. [m] station_code : str, optional BSRN station abbreviation. If ``ghi_clear`` / ``bni_clear`` are not on ``df``, used with :func:`~bsrn.modeling.clear_sky.add_clearsky_columns` so the **tracker** row matches :func:`~bsrn.qc.wrapper.run_qc` (Ineichen references). Returns ------- daily_df : pd.DataFrame Daily counts of flags and sunshine duration metrics. Notes ----- Minutes with BSRN missing codes (``-999``, ``-99.9``, ``-99.99``) or NaN in the relevant input column do not add to per-test failure sums. ``is_sunshine`` only counts minutes with a valid (non-missing) BNI value above the threshold. **TRACKER** uses Ineichen ``ghi_clear`` / ``bni_clear`` like ``run_qc`` (not the GHIE-only fallback). Pass ``station_code`` so Linke turbidity matches the registry; if only ``lat`` / ``lon`` / ``elev`` are available, clear-sky is still computed but turbidity defaults may differ from ``run_qc`` with a named station. """ # Calculate solar geometry solpos = geometry.get_solar_position(df.index, lat, lon, elev) zenith = solpos["zenith"] bni_extra = geometry.get_bni_extra(df.index) ghi_extra = geometry.get_ghi_extra(df.index, zenith) nm = _series_not_missing g, b, d, lwd = df["ghi"], df["bni"], df["dhi"], df["lwd"] nm_g, nm_b, nm_d, nm_l = nm(g), nm(b), nm(d), nm(lwd) bni_x = pd.Series(bni_extra, index=df.index) geo_ok = _series_finite_numeric(zenith) & _series_finite_numeric(bni_x) # Sunshine duration (h): ACT = BNI > 120 W/m^2; MAX = zenith < 90° df["is_sunshine"] = ((df["bni"] > 120) & nm_b).astype(int) df["is_daylight"] = (zenith < 90).astype(int) # QC tests (True = pass, False = fail); failure sums exclude missing inputs per test. df["GHI_PPL"] = ( (~ppl.ghi_ppl_test(g, zenith, bni_extra)) & nm_g & geo_ok ).astype(int) df["GHI_ERL"] = ( (~erl.ghi_erl_test(g, zenith, bni_extra)) & nm_g & geo_ok ).astype(int) df["DHI_PPL"] = ( (~ppl.dhi_ppl_test(d, zenith, bni_extra)) & nm_d & geo_ok ).astype(int) df["DHI_ERL"] = ( (~erl.dhi_erl_test(d, zenith, bni_extra)) & nm_d & geo_ok ).astype(int) df["BNI_PPL"] = ((~ppl.bni_ppl_test(b, bni_extra)) & nm_b & geo_ok).astype(int) df["BNI_ERL"] = ( (~erl.bni_erl_test(b, zenith, bni_extra)) & nm_b & geo_ok ).astype(int) df["LWD_PPL"] = (~ppl.lwd_ppl_test(lwd) & nm_l).astype(int) df["LWD_ERL"] = (~erl.lwd_erl_test(lwd) & nm_l).astype(int) v3 = nm_g & nm_b & nm_d low = closure.closure_low_sza_test(g, b, d, zenith) high = closure.closure_high_sza_test(g, b, d, zenith) df["CMP_CLO"] = (((~low) | (~high)) & v3).astype(int) v2 = nm_g & nm_d df["CMP_DIF"] = ( ((~diff_ratio.k_low_sza_test(g, d, zenith)) | (~diff_ratio.k_high_sza_test(g, d, zenith))) & v2 ).astype(int) fail_kbkt = ~k_index.kb_kt_test(g, b, bni_extra, zenith) & nm_g & nm_b & geo_ok fail_kkt = ( ~diff_ratio.k_kt_combined_test(g, d, bni_extra, zenith) & nm_g & nm_d & geo_ok ) fail_kbl = ~k_index.kb_limit_test(b, bni_extra, elev, g) & nm_b & nm_g & geo_ok fail_ktl = ~k_index.kt_limit_test(g, bni_extra, zenith) & nm_g & geo_ok df["CMP_K"] = (fail_kbkt | fail_kkt | fail_kbl | fail_ktl).astype(int) # Tracker: match run_qc using Ineichen ghi_clear/bni_clear when not on df ghi_clear = df["ghi_clear"] if "ghi_clear" in df.columns else None bni_clear = df["bni_clear"] if "bni_clear" in df.columns else None if ghi_clear is None or bni_clear is None: if station_code is not None: _sky = add_clearsky_columns(df.copy(), station_code=station_code) else: _sky = add_clearsky_columns(df.copy(), lat=lat, lon=lon, elev=elev) ghi_clear = _sky["ghi_clear"] bni_clear = _sky["bni_clear"] vt = nm_g & nm_b & geo_ok pass_trk = tracker.tracker_off_test( g, b, zenith, ghi_extra=ghi_extra, ghi_clear=ghi_clear, bni_clear=bni_clear ) df["TRACKER"] = ((~pass_trk) & vt).astype(int) # Aggregate by day daily = df.groupby(df.index.date).agg({ 'is_sunshine': 'sum', 'is_daylight': 'sum', 'GHI_PPL': 'sum', 'GHI_ERL': 'sum', 'DHI_PPL': 'sum', 'DHI_ERL': 'sum', 'BNI_PPL': 'sum', 'BNI_ERL': 'sum', 'LWD_PPL': 'sum', 'LWD_ERL': 'sum', 'CMP_CLO': 'sum', 'CMP_DIF': 'sum', 'CMP_K': 'sum', 'TRACKER': 'sum' }) daily['SD_ACT'] = daily['is_sunshine'] / 60.0 daily['SD_MAX'] = daily['is_daylight'] / 60.0 daily['SD_REL'] = (daily['SD_ACT'] / daily['SD_MAX'] * 100.0).fillna(0) return daily