# Source code for bsrn.qc.wrapper

"""
High-level QC runners and metadata helpers for BSRN DataFrames.
"""

import numpy as np
import pandas as pd
from bsrn.physics import geometry
from bsrn.constants import BSRN_STATIONS
from . import ppl, erl, closure, diff_ratio, k_index, tracker


def _get_metadata(station_code, lat, lon, elev):
    """
    Work out the site latitude, longitude and elevation to use for QC.

    Explicit values always win. Any value left as ``None`` is taken from the
    ``BSRN_STATIONS`` registry entry of ``station_code``.

    Parameters
    ----------
    station_code : str or None
        BSRN station abbreviation used as the registry key, if any.
    lat, lon, elev : float or None
        Explicitly supplied coordinates; ``None`` marks a value that still
        has to be looked up.

    Returns
    -------
    tuple
        ``(lat, lon, elev)``.

    Raises
    ------
    ValueError
        If ``station_code`` is given but not in the registry, or if no
        station code is given and at least one coordinate is missing.
    """
    # A complete explicit triple needs no registry lookup at all (the station
    # code is not even validated in that case).
    if all(value is not None for value in (lat, lon, elev)):
        return lat, lon, elev

    # Something is missing and there is nothing to look it up with.
    if station_code is None:
        raise ValueError(
            "Insufficient metadata. Provide a valid BSRN 'station_code' or "
            "explicit 'lat', 'lon', and 'elev'."
        )

    if station_code not in BSRN_STATIONS:
        raise ValueError(
            f"Station '{station_code}' not found in BSRN registry. "
            "For non-BSRN stations, you must provide 'lat', 'lon', and 'elev' manually."
        )

    # Fill only the gaps; values passed by the caller override the registry.
    registry_entry = BSRN_STATIONS[station_code]
    if lat is None:
        lat = registry_entry['lat']
    if lon is None:
        lon = registry_entry['lon']
    if elev is None:
        elev = registry_entry['elev']
    return lat, lon, elev


# One flag -> one component: where the flag column equals 1 (fail), only the
# listed value column(s) are set to NaN. The grouping mirrors the QC markers
# of :mod:`bsrn.visualization.daily` (not a blind row-sum).
_QC_SINGLE_FLAG_TARGETS = (
    # 'flagPPL*' columns
    ("flagPPLGHI", ("ghi",)),
    ("flagPPLBNI", ("bni",)),
    ("flagPPLDHI", ("dhi",)),
    ("flagPPLLWD", ("lwd",)),
    # 'flagERL*' columns
    ("flagERLGHI", ("ghi",)),
    ("flagERLBNI", ("bni",)),
    ("flagERLDHI", ("dhi",)),
    ("flagERLLWD", ("lwd",)),
)

# Several flags -> several components: a row fails when ANY flag of the group
# equals 1, and then ALL value columns of the group are set to NaN.
_QC_COMBINED_FLAG_TARGETS = (
    (
        ("flag3lowSZA", "flag3highSZA"),
        ("ghi", "bni", "dhi"),
    ),
    (
        ("flagKKt", "flagKlowSZA", "flagKhighSZA"),
        ("ghi", "dhi"),
    ),
    (
        ("flagKbKt", "flagKb", "flagKt", "flagTracker"),
        ("ghi", "bni"),
    ),
)

# Every flag column named in the two tables above.
_QC_FLAG_COLUMN_NAMES = frozenset(
    [flag for flag, _targets in _QC_SINGLE_FLAG_TARGETS]
    + [flag for flags, _targets in _QC_COMBINED_FLAG_TARGETS for flag in flags]
)


def _nan_out_rows(df, bad, value_cols):
    """
    Set ``value_cols`` to NaN on the rows selected by ``bad``, in place.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify.
    bad : numpy.ndarray of bool
        Row mask with one entry per row of ``df``.
    value_cols : iterable of str
        Columns to clear; names that are not columns of ``df`` are skipped.
    """
    for name in value_cols:
        if name not in df.columns:
            continue
        col = df[name]
        # NumPy integer columns cannot hold NaN. Writing NaN through ``.loc``
        # relies on silent upcasting, which pandas deprecated in 2.1 (PDEP-6),
        # so perform the int -> float64 upcast explicitly first. The resulting
        # dtype is the same one the silent upcast used to produce.
        # (``df[name]`` is a DataFrame for duplicated labels; left untouched.)
        if (
            isinstance(col, pd.Series)
            and isinstance(col.dtype, np.dtype)
            and col.dtype.kind in "iu"
        ):
            df[name] = col.astype("float64")
        df.loc[bad, name] = np.nan


def mask_failed_irradiance(df, *, flag_remove=True):
    """
    Set shortwave / longwave irradiance columns to NaN where QC flags fail.

    Uses the same flag-to-column mapping as :func:`run_qc` outputs: each
    single-flag column (e.g. ``flagPPLGHI``) clears only its component;
    closure, diffuse-ratio, k-index, and tracker groups clear the components
    those tests share. A row counts as failed when the flag equals 1.

    Mutates ``df`` in place for irradiance values. When ``flag_remove`` is
    True (default), standard :func:`run_qc` flag columns are dropped
    afterward.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that already contains ``run_qc`` flag columns. Flag or value
        columns that are absent are simply skipped.
    flag_remove : bool, optional
        If True (default), drop QC flag columns after masking.

    Returns
    -------
    pandas.DataFrame
        ``df`` (same object), updated in place.

    Raises
    ------
    TypeError
        If ``df`` is not a :class:`~pandas.DataFrame`.

    Notes
    -----
    Call this **after** :func:`run_qc` when you want failed minutes cleared;
    :func:`run_qc` does not invoke it automatically. To keep an unmodified
    copy, run ``mask_failed_irradiance(df.copy(), ...)``. Does not clear
    auxiliary columns (e.g. ``ghi_clear``, ``zenith``).

    Integer-typed value columns are converted to ``float64`` when (and only
    when) at least one of their rows has to be cleared, because NaN cannot be
    stored in a NumPy integer column.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")

    # Single flags: one failing flag clears one component.
    for flag_col, value_cols in _QC_SINGLE_FLAG_TARGETS:
        if flag_col not in df.columns:
            continue
        bad = df[flag_col].to_numpy() == 1
        if bad.any():
            _nan_out_rows(df, bad, value_cols)

    # Combined flags: OR over whichever flags of the group are present.
    for flag_cols, value_cols in _QC_COMBINED_FLAG_TARGETS:
        present = [c for c in flag_cols if c in df.columns]
        if not present:
            continue
        bad = (df[present].to_numpy() == 1).any(axis=1)
        if bad.any():
            _nan_out_rows(df, bad, value_cols)

    if flag_remove:
        drop_flags = [c for c in df.columns if c in _QC_FLAG_COLUMN_NAMES]
        if drop_flags:
            df.drop(columns=drop_flags, inplace=True)
    return df
def run_qc(df, station_code=None, lat=None, lon=None, elev=None,
           tests=('ppl', 'erl', 'closure', 'diff_ratio', 'k_index', 'tracker')):
    r"""
    Apply the selected QC tests to a BSRN DataFrame, computing the solar
    geometry once and sharing it between all tests [1]_ [2]_.

    Parameters
    ----------
    df : pd.DataFrame
        Input BSRN data containing irradiance ($G_h, B_n, D_h, L_d$) and/or
        meteorological ($T, RH, P$) columns. Must be indexed by a
        :class:`~pandas.DatetimeIndex`.
    station_code : str, optional
        BSRN station code to retrieve coordinates.
    lat : float, optional
        Latitude ($\phi$). [degrees]
    lon : float, optional
        Longitude ($\lambda$). [degrees]
    elev : float, optional
        Elevation ($H$). [m]
    tests : tuple of str, optional
        Names of the tests to run, out of ``'ppl'``, ``'erl'``,
        ``'closure'``, ``'diff_ratio'``, ``'k_index'`` and ``'tracker'``.
        Default is all. Any other name matches no test and is ignored.

    Returns
    -------
    df : pd.DataFrame
        The input ``df`` itself, with QC flag columns added in place
        (0 = Pass, 1 = Fail). A test whose required input columns are
        missing adds no flag column.

    Raises
    ------
    TypeError
        If ``df`` is not a :class:`~pandas.DataFrame`.
    ValueError
        If the index is not a :class:`~pandas.DatetimeIndex` or metadata
        resolution fails (see :func:`_get_metadata`).

    References
    ----------
    .. [1] Long, C. N., & Shi, Y. (2008). An automated quality assessment
       and control algorithm for surface radiation measurements. The Open
       Atmospheric Science Journal, 2(1), 23-37.
    .. [2] Forstinger, A., et al. (2021). Expert quality control of solar
       radiation ground data sets. In SWC 2021: ISES Solar World Congress.
       International Solar Energy Society.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError(
            "DataFrame index must be a DatetimeIndex to calculate solar position."
        )

    def has_columns(*names):
        # True when every named column is currently present in ``df``.
        return all(name in df.columns for name in names)

    def fail_flag(passed):
        # Invert a pass mask into the stored convention (1 = Fail, 0 = Pass).
        return (~passed).astype(int)

    # 0. Resolve site coordinates; re-raise with a QC-specific prefix.
    try:
        lat, lon, elev = _get_metadata(station_code, lat, lon, elev)
    except ValueError as e:
        raise ValueError(f"QC metadata error: {str(e)}") from e

    # 1. Solar geometry, computed once for every test below.
    solpos = geometry.get_solar_position(df.index, lat, lon, elev)
    zenith = solpos["zenith"]
    bni_extra = geometry.get_bni_extra(df.index)

    # 2. Requested tests, each limited to the columns that are available.
    if 'ppl' in tests:
        if has_columns('ghi'):
            df['flagPPLGHI'] = fail_flag(ppl.ghi_ppl_test(df['ghi'], zenith, bni_extra))
        if has_columns('bni'):
            df['flagPPLBNI'] = fail_flag(ppl.bni_ppl_test(df['bni'], bni_extra))
        if has_columns('dhi'):
            df['flagPPLDHI'] = fail_flag(ppl.dhi_ppl_test(df['dhi'], zenith, bni_extra))
        if has_columns('lwd'):
            df['flagPPLLWD'] = fail_flag(ppl.lwd_ppl_test(df['lwd']))

    if 'erl' in tests:
        if has_columns('ghi'):
            df['flagERLGHI'] = fail_flag(erl.ghi_erl_test(df['ghi'], zenith, bni_extra))
        if has_columns('bni'):
            df['flagERLBNI'] = fail_flag(erl.bni_erl_test(df['bni'], zenith, bni_extra))
        if has_columns('dhi'):
            df['flagERLDHI'] = fail_flag(erl.dhi_erl_test(df['dhi'], zenith, bni_extra))
        if has_columns('lwd'):
            df['flagERLLWD'] = fail_flag(erl.lwd_erl_test(df['lwd']))

    if 'closure' in tests and has_columns('ghi', 'bni', 'dhi'):
        df['flag3lowSZA'] = fail_flag(
            closure.closure_low_sza_test(df['ghi'], df['bni'], df['dhi'], zenith)
        )
        df['flag3highSZA'] = fail_flag(
            closure.closure_high_sza_test(df['ghi'], df['bni'], df['dhi'], zenith)
        )

    if 'diff_ratio' in tests and has_columns('ghi', 'dhi'):
        df['flagKKt'] = fail_flag(
            diff_ratio.k_kt_combined_test(df['ghi'], df['dhi'], bni_extra, zenith)
        )
        df['flagKlowSZA'] = fail_flag(
            diff_ratio.k_low_sza_test(df['ghi'], df['dhi'], zenith)
        )
        df['flagKhighSZA'] = fail_flag(
            diff_ratio.k_high_sza_test(df['ghi'], df['dhi'], zenith)
        )

    if 'k_index' in tests and has_columns('ghi', 'bni'):
        df['flagKbKt'] = fail_flag(
            k_index.kb_kt_test(df['ghi'], df['bni'], bni_extra, zenith)
        )
        df['flagKb'] = fail_flag(
            k_index.kb_limit_test(df['bni'], bni_extra, elev, df['ghi'])
        )
        df['flagKt'] = fail_flag(
            k_index.kt_limit_test(df['ghi'], bni_extra, zenith)
        )

    if 'tracker' in tests and has_columns('ghi', 'bni'):
        ghi_extra = geometry.get_ghi_extra(df.index, zenith)
        # Clear-sky columns are optional inputs; pass None when absent.
        if 'ghi_clear' in df.columns:
            ghi_c = df['ghi_clear']
        else:
            ghi_c = None
        if 'bni_clear' in df.columns:
            bni_c = df['bni_clear']
        else:
            bni_c = None
        tracker_ok = tracker.tracker_off_test(
            df['ghi'], df['bni'], zenith,
            ghi_extra=ghi_extra, ghi_clear=ghi_c, bni_clear=bni_c,
        )
        df['flagTracker'] = fail_flag(tracker_ok)

    return df
def test_physically_possible(df, station_code=None, lat=None, lon=None, elev=None):
    """
    Run all Phase 1 (Physically Possible) checks on a DataFrame.

    Thin wrapper that calls :func:`run_qc` with ``tests=('ppl',)``.

    Parameters
    ----------
    df : pd.DataFrame
        Input BSRN data.
    station_code : str, optional
        BSRN station code.
    lat, lon, elev : float, optional
        Station coordinates and elevation.

    Returns
    -------
    df : pd.DataFrame
        DataFrame with added 'flagPPL*' flag columns.

    Raises
    ------
    TypeError, ValueError
        Same as :func:`run_qc` for the ``ppl`` test subset.
    """
    return run_qc(
        df, station_code=station_code, lat=lat, lon=lon, elev=elev, tests=('ppl',)
    )


# Library helper, not a unit test: without this marker the ``test_`` prefix
# makes pytest collect the function when it is imported into a test module,
# where it then errors on the missing ``df`` fixture.
test_physically_possible.__test__ = False
def test_extremely_rare(df, station_code=None, lat=None, lon=None, elev=None):
    """
    Run all Phase 2 (Extremely Rare) checks on a DataFrame.

    Thin wrapper that calls :func:`run_qc` with ``tests=('erl',)``.

    Parameters
    ----------
    df : pd.DataFrame
        Input BSRN data.
    station_code : str, optional
        BSRN station code.
    lat, lon, elev : float, optional
        Station coordinates and elevation.

    Returns
    -------
    df : pd.DataFrame
        DataFrame with added 'flagERL*' flag columns.

    Raises
    ------
    TypeError, ValueError
        Same as :func:`run_qc` for the ``erl`` test subset.
    """
    return run_qc(
        df, station_code=station_code, lat=lat, lon=lon, elev=elev, tests=('erl',)
    )


# Library helper, not a unit test: without this marker the ``test_`` prefix
# makes pytest collect the function when it is imported into a test module,
# where it then errors on the missing ``df`` fixture.
test_extremely_rare.__test__ = False
def test_closure(df, station_code=None, lat=None, lon=None, elev=None):
    """
    Run all Phase 3 (Closure) consistency checks on a DataFrame.

    Thin wrapper that calls :func:`run_qc` with ``tests=('closure',)``.

    Parameters
    ----------
    df : pd.DataFrame
        Input BSRN data.
    station_code : str, optional
        BSRN station code.
    lat, lon, elev : float, optional
        Station coordinates and elevation.

    Returns
    -------
    df : pd.DataFrame
        DataFrame with added 'flag3lowSZA' and 'flag3highSZA' flag columns.

    Raises
    ------
    TypeError, ValueError
        Same as :func:`run_qc` for the ``closure`` test subset.
    """
    return run_qc(
        df, station_code=station_code, lat=lat, lon=lon, elev=elev, tests=('closure',)
    )


# Library helper, not a unit test: without this marker the ``test_`` prefix
# makes pytest collect the function when it is imported into a test module,
# where it then errors on the missing ``df`` fixture.
test_closure.__test__ = False
def test_diff_ratio(df, station_code=None, lat=None, lon=None, elev=None):
    """
    Run all Phase 3 Diffuse Ratio (k) consistency checks on a DataFrame.

    Thin wrapper that calls :func:`run_qc` with ``tests=('diff_ratio',)``.

    Parameters
    ----------
    df : pd.DataFrame
        Input BSRN data.
    station_code : str, optional
        BSRN station code.
    lat, lon, elev : float, optional
        Station coordinates and elevation.

    Returns
    -------
    df : pd.DataFrame
        DataFrame with added 'flagKKt', 'flagKlowSZA', 'flagKhighSZA' flag
        columns.

    Raises
    ------
    TypeError, ValueError
        Same as :func:`run_qc` for the ``diff_ratio`` test subset.
    """
    return run_qc(
        df, station_code=station_code, lat=lat, lon=lon, elev=elev, tests=('diff_ratio',)
    )


# Library helper, not a unit test: without this marker the ``test_`` prefix
# makes pytest collect the function when it is imported into a test module,
# where it then errors on the missing ``df`` fixture.
test_diff_ratio.__test__ = False
def test_k_index(df, station_code=None, lat=None, lon=None, elev=None):
    """
    Run all Phase 3 Radiometric Index (k-index) checks on a DataFrame.

    Thin wrapper that calls :func:`run_qc` with ``tests=('k_index',)``.

    Parameters
    ----------
    df : pd.DataFrame
        Input BSRN data.
    station_code : str, optional
        BSRN station code.
    lat, lon, elev : float, optional
        Station coordinates and elevation.

    Returns
    -------
    df : pd.DataFrame
        DataFrame with added 'flagKbKt', 'flagKb', 'flagKt' flag columns.

    Raises
    ------
    TypeError, ValueError
        Same as :func:`run_qc` for the ``k_index`` test subset.
    """
    return run_qc(
        df, station_code=station_code, lat=lat, lon=lon, elev=elev, tests=('k_index',)
    )


# Library helper, not a unit test: without this marker the ``test_`` prefix
# makes pytest collect the function when it is imported into a test module,
# where it then errors on the missing ``df`` fixture.
test_k_index.__test__ = False
def test_tracker_off(df, station_code=None, lat=None, lon=None, elev=None):
    """
    Run Tracker-off detection on a DataFrame.

    Thin wrapper that calls :func:`run_qc` with ``tests=('tracker',)``.

    Parameters
    ----------
    df : pd.DataFrame
        Input BSRN data.
    station_code : str, optional
        BSRN station code.
    lat, lon, elev : float, optional
        Station coordinates and elevation.

    Returns
    -------
    df : pd.DataFrame
        DataFrame with added 'flagTracker' flag column.

    Raises
    ------
    TypeError, ValueError
        Same as :func:`run_qc` for the ``tracker`` test subset.
    """
    return run_qc(
        df, station_code=station_code, lat=lat, lon=lon, elev=elev, tests=('tracker',)
    )


# Library helper, not a unit test: without this marker the ``test_`` prefix
# makes pytest collect the function when it is imported into a test module,
# where it then errors on the missing ``df`` fixture.
test_tracker_off.__test__ = False