Source code for bsrn.dataset

"""
Central BSRN dataset: one monthly station file as a typed, validated object.

Encapsulates station identity, resolved geographic metadata, and minute-
resolution data in a single Pydantic model. ``lr0100`` is the source of
truth for minute data; ``data()`` returns a cached ``DataFrame`` with
only the mean/value columns by default. LR0300 / LR4000 columns are
available on demand via ``data(include=[...])``. Most pipeline methods
(solar position, clear-sky, QC) mutate the cached frame in-place;
the ``average`` method replaces the cache with a coarser-index result
from :func:`~bsrn.utils.averaging.pretty_average`. Use :meth:`qc_test`
then optionally :meth:`qc_mask` to apply QC-based masking.
"""

from __future__ import annotations

import calendar
from typing import Optional

import numpy as np
import pandas as pd
from pydantic import (BaseModel, ConfigDict, Field, PrivateAttr,
                      field_validator, model_validator)

from .archive.records_models import LR0100, LR0300, LR4000
from .constants import BSRN_STATIONS
from .io.reader import read_bsrn_archive

# Lookup tables: logical-record field name -> short column name used by
# BSRNDataset.data(). A field that is not listed here (for example the
# _std/_min/_max companions of each mean) never reaches the DataFrame.
# Entry order matters: the frame's columns are created in this order.

# LR0100: basic radiation means plus surface meteorology.
_LR0100_VAR_MAP = {
    "ghi_avg": "ghi",
    "bni_avg": "bni",
    "dhi_avg": "dhi",
    "lwd_avg": "lwd",
    "temperature": "temp",
    "humidity": "rh",
    "pressure": "pressure",
}

# LR0300: upward / net radiation means.
_LR0300_VAR_MAP = {
    "swu_avg": "swu",
    "lwu_avg": "lwu",
    "net_avg": "net",
}

# LR4000: pyrgeometer dome and body temperatures (down- and up-facing).
_LR4000_VAR_MAP = {
    "domeT1_down": "dt1d",
    "domeT2_down": "dt2d",
    "domeT3_down": "dt3d",
    "bodyT_down": "btd",
    "domeT1_up": "dt1u",
    "domeT2_up": "dt2u",
    "domeT3_up": "dt3u",
    "bodyT_up": "btu",
}


class BSRNDataset(BaseModel):
    """
    One monthly BSRN dataset: station identity and minute data.

    Typical enrichment on the cached :meth:`data` frame is
    :meth:`solpos`, then :meth:`clear_sky`, then :meth:`qc_test` (each
    mutates that frame in place and returns it). Optional
    :meth:`qc_mask` sets failed irradiance to NaN and can drop flag
    columns. :meth:`average` replaces the cache with a coarser time
    series from :func:`~bsrn.utils.averaging.pretty_average`.

    Parameters
    ----------
    station_code : str
        Three-letter BSRN station code (must exist in
        ``BSRN_STATIONS``). Upper-cased during validation.
    year : int
        Four-digit measurement year.
    month : int
        Measurement month (1--12).
    lr0100 : LR0100
        Validated LR0100 logical record (minute radiation and met
        data). This is the source of truth; ``data()`` is derived
        from it.
    lr0300 : LR0300 or None
        Validated LR0300 logical record (reflected / upward SW, LW,
        net radiation). Default ``None``.
    lr4000 : LR4000 or None
        Validated LR4000 logical record (pyrgeometer minute data).
        Default ``None``.
    metadata_lrs : dict
        Mapping for additional non-core logical records keyed by
        ``lr####`` (for example ``'lr0001'``). Default empty dict.
    station_name : str or None
        Resolved from ``BSRN_STATIONS`` when omitted or ``None``.
    lat : float or None
        Latitude (degrees); resolved from ``BSRN_STATIONS``.
    lon : float or None
        Longitude (degrees); resolved from ``BSRN_STATIONS``.
    elev : float or None
        Elevation (m above sea level); resolved from
        ``BSRN_STATIONS``.
    resolution : int or None
        Temporal resolution in minutes (e.g. ``1``, ``2``, ``3``,
        ``5``). When omitted or ``None`` it is inferred from the
        length of the first available LR0100 vector, falling back
        to ``1``.

    Raises
    ------
    ValueError
        If ``station_code`` is not in ``BSRN_STATIONS`` or ``month``
        is outside 1--12.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        frozen=False,
    )

    # ------------------------------------------------------------------ #
    # Fields                                                             #
    # ------------------------------------------------------------------ #

    # Core (required): identity + minute radiation data.
    station_code: str
    year: int
    month: int
    lr0100: LR0100

    # Optional logical records.
    lr0300: Optional[LR0300] = None
    lr4000: Optional[LR4000] = None
    metadata_lrs: dict = Field(default_factory=dict)

    # Resolved from BSRN_STATIONS; users may override. Declared Optional
    # so that an explicit ``None`` (meaning "resolve it for me") passes
    # validation instead of being rejected as a non-str / non-number.
    station_name: Optional[str] = None
    lat: Optional[float] = None
    lon: Optional[float] = None
    elev: Optional[float] = None
    resolution: Optional[int] = None

    # Internal cached DataFrame; excluded from serialisation.
    _df_cache: Optional[pd.DataFrame] = PrivateAttr(default=None)

    # ------------------------------------------------------------------ #
    # Validators                                                         #
    # ------------------------------------------------------------------ #

    @field_validator("station_code")
    @classmethod
    def _validate_station_code(cls, v):
        """Upper-case the code and require it to be a known station."""
        code = v.upper()
        if code not in BSRN_STATIONS:
            raise ValueError(
                f"unknown station code {v!r}; "
                f"not in BSRN_STATIONS"
            )
        return code

    @field_validator("month")
    @classmethod
    def _validate_month(cls, v):
        """Reject months outside the calendar range 1--12."""
        if v < 1 or v > 12:
            raise ValueError(
                f"month must be 1--12, got {v}"
            )
        return v

    @model_validator(mode="after")
    def _resolve_metadata(self):
        """
        Fill ``station_name``, ``lat``, ``lon``, ``elev`` from
        ``BSRN_STATIONS`` and infer ``resolution`` when not explicitly
        provided.
        """
        meta = BSRN_STATIONS[self.station_code]
        if self.station_name is None:
            self.station_name = meta["name"]
        if self.lat is None:
            self.lat = meta["lat"]
        if self.lon is None:
            self.lon = meta["lon"]
        if self.elev is None:
            self.elev = meta["elev"]
        if self.resolution is None:
            self.resolution = self._infer_resolution()
        return self

    # ------------------------------------------------------------------ #
    # Factory                                                            #
    # ------------------------------------------------------------------ #

    @classmethod
    def from_file(cls, path, include_lrs=None, strict=False):
        """
        Parse a BSRN ``.dat.gz`` station-to-archive file and return a
        fully validated ``BSRNDataset``.

        Parameters
        ----------
        path : str or Path
            Path to the ``.dat.gz`` file (filename format
            ``XXXMMYY.dat.gz``).
        include_lrs : sequence of str or 'all', optional
            Logical records to parse. Supports ``'lr0100'``
            (required), ``'lr0300'``, ``'lr4000'``, and ``'lr0001'``.
            Default ``None`` parses all supported records.
        strict : bool, optional
            Passed to :func:`~bsrn.io.reader.read_bsrn_archive`.
            If ``True``, malformed optional LR blocks raise. If
            ``False`` (default), malformed optional LRs are returned
            as ``None``.

        Returns
        -------
        BSRNDataset

        Raises
        ------
        FileNotFoundError
            If *path* does not exist.
        ValueError
            If the filename cannot be parsed or no LR0100 block is
            found.
        """
        # The reader's result is splatted straight into the constructor,
        # so its keys must be field names of this model.
        return cls(
            **read_bsrn_archive(
                path,
                include_lrs=include_lrs,
                strict=strict,
            )
        )

    # ------------------------------------------------------------------ #
    # data()                                                             #
    # ------------------------------------------------------------------ #

    def data(self, include=None):
        """
        Minute-resolution DataFrame derived from ``lr0100``.

        The base frame contains only the LR0100 **mean / scalar**
        columns under short names (``ghi``, ``bni``, ``dhi``, ``lwd``,
        ``temp``, ``rh``, ``pressure``). It is built once and cached so
        that pipeline methods (``solpos``, ``average``, etc.) can
        enrich it in-place.

        Parameters
        ----------
        include : str or sequence of str, optional
            Extra logical records to merge: ``"lr0300"`` and/or
            ``"lr4000"`` (case-insensitive). A single code may be given
            as a bare string. When given, the corresponding mean/value
            columns are appended to a **new** frame; the cache itself
            is not modified. Records that are absent on the dataset
            are skipped silently.

        Returns
        -------
        pandas.DataFrame
            UTC ``DatetimeIndex``; default columns are LR0100 means
            only.

        Notes
        -----
        The extra vectors are taken from the logical records as-is. If
        the cache has been replaced by :meth:`average`, their length
        will presumably no longer match the index and pandas will
        refuse the assignment.
        """
        if self._df_cache is None:
            self._df_cache = self._build_base_frame()
        if not include:
            return self._df_cache

        # A bare string would otherwise be iterated character by
        # character and the request silently ignored.
        if isinstance(include, str):
            include = (include,)
        want = {str(code).strip().lower() for code in include}

        sources = (
            ("lr0300", self.lr0300, _LR0300_VAR_MAP),
            ("lr4000", self.lr4000, _LR4000_VAR_MAP),
        )
        extra = {}
        for code, record, var_map in sources:
            if code not in want or record is None:
                continue
            for lr_col, short in var_map.items():
                vec = getattr(record, lr_col, None)
                if vec is not None:
                    extra[short] = np.asarray(vec)

        if not extra:
            return self._df_cache
        return self._df_cache.assign(**extra)

    @property
    def plot(self):
        """
        Accessor for built-in plotting routines.

        A fresh :class:`BSRNPlot` bound to this dataset is created on
        every access.
        """
        return BSRNPlot(self)

    def get_lr(self, lr_code):
        """
        Return one logical record by code.

        Parameters
        ----------
        lr_code : str
            Logical record code in ``lr####`` form (case-insensitive),
            for example ``'lr0100'`` or ``'lr0001'``.

        Returns
        -------
        object or None
            Requested LR object when present, else ``None``.
        """
        key = str(lr_code).strip().lower()
        if key == "lr0100":
            return self.lr0100
        if key == "lr0300":
            return self.lr0300
        if key == "lr4000":
            return self.lr4000
        # Anything else is looked up among the non-core records.
        return self.metadata_lrs.get(key)

    def has_lr(self, lr_code):
        """
        Check whether one logical record exists on the dataset.

        Parameters
        ----------
        lr_code : str
            Logical record code in ``lr####`` form (case-insensitive).

        Returns
        -------
        bool
            True if the requested LR exists, else False.
        """
        return self.get_lr(lr_code) is not None

    # ------------------------------------------------------------------ #
    # Pipeline methods (delegate to standalone functions)                #
    # ------------------------------------------------------------------ #

    def solpos(self):
        """
        Add solar position and extraterrestrial irradiance columns to
        the cached ``data()`` frame.

        Delegates to :func:`~bsrn.physics.geometry.add_solpos_columns`
        using the resolved ``lat``, ``lon``, ``elev``.

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added columns: ``zenith``,
            ``apparent_zenith``, ``azimuth``, ``bni_extra``,
            ``ghi_extra``.
        """
        # Imported lazily so the heavier sub-package is only loaded
        # when this pipeline step is actually used.
        from .physics.geometry import add_solpos_columns

        return add_solpos_columns(
            self.data(),
            station_code=self.station_code,
            lat=self.lat,
            lon=self.lon,
            elev=self.elev,
        )

    def clear_sky(self, model="ineichen", mcclear_email=None):
        """
        Add clear-sky irradiance columns to the cached ``data()``
        frame.

        Delegates to
        :func:`~bsrn.modeling.clear_sky.add_clearsky_columns`.

        Parameters
        ----------
        model : str, optional
            Clear-sky model name (default ``'ineichen'``).
        mcclear_email : str, optional
            E-mail for CAMS McClear API (only when
            ``model='mcclear'``).

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added clear-sky columns (``ghi_clear``,
            ``bni_clear``, ``dhi_clear``, ...).
        """
        from .modeling.clear_sky import add_clearsky_columns

        return add_clearsky_columns(
            self.data(),
            station_code=self.station_code,
            lat=self.lat,
            lon=self.lon,
            elev=self.elev,
            model=model,
            mcclear_email=mcclear_email,
        )

    def qc_test(self, tests=('ppl', 'erl', 'closure', 'diff_ratio',
                             'k_index', 'tracker')):
        """
        Run QC tests and add flag columns to the cached ``data()``
        frame.

        Delegates to :func:`~bsrn.qc.wrapper.run_qc`.

        Parameters
        ----------
        tests : tuple of str, optional
            QC test names to run (default: all six).

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added ``flag_*`` columns.
        """
        from .qc.wrapper import run_qc

        return run_qc(
            self.data(),
            station_code=self.station_code,
            lat=self.lat,
            lon=self.lon,
            elev=self.elev,
            tests=tests,
        )

    def qc_mask(self, flag_remove=True):
        """
        Set irradiance values to NaN where QC flags fail; optionally
        drop flag columns.

        Call :meth:`qc_test` first so flag columns exist. Delegates to
        :func:`~bsrn.qc.wrapper.mask_failed_irradiance` on ``data()``.

        Parameters
        ----------
        flag_remove : bool, optional
            If True (default), drop standard QC flag columns after
            masking.

        Returns
        -------
        pandas.DataFrame
            ``data()`` after masking (same cached object).
        """
        from .qc.wrapper import mask_failed_irradiance

        return mask_failed_irradiance(self.data(), flag_remove=flag_remove)

    def average(self, freq, alignment="ceiling", aggfunc="mean",
                match_ceiling_labels=True):
        """
        Time-average the cached ``data()`` with explicit labeled
        windows.

        Delegates to :func:`~bsrn.utils.averaging.pretty_average` and
        **replaces** the internal cache with the returned frame (new
        index). Native timestep for **center** windows is taken from
        ``self.resolution`` (minutes) when set; otherwise passed as
        ``None`` for :func:`~bsrn.utils.averaging.pretty_average` to
        infer.

        Parameters
        ----------
        freq : str
            Fixed bin frequency (e.g. ``'1h'``, ``'30min'``).
        alignment : {'floor', 'ceiling', 'center'}, optional
            Window alignment (default ``'ceiling'``).
        aggfunc : str or callable, optional
            Aggregation function (default ``'mean'``).
        match_ceiling_labels : bool, optional
            When ``alignment='center'``, monthly edge trim style
            (default ``True``, ceiling-like).

        Returns
        -------
        pandas.DataFrame
            One row per output label; also stored as the new cache.

        Raises
        ------
        TypeError
            If ``data().index`` is not a
            :class:`~pandas.DatetimeIndex`.
        ValueError
            Propagated from
            :func:`~bsrn.utils.averaging.pretty_average` when ``freq``
            is not a fixed frequency.

        Notes
        -----
        ``self.resolution`` is left untouched, so after this call it
        still describes the native record rather than the averaged
        cache.
        """
        from .utils.averaging import pretty_average

        res = None
        if self.resolution is not None:
            res = pd.Timedelta(minutes=int(self.resolution))

        out = pretty_average(
            self.data(),
            freq,
            alignment=alignment,
            aggfunc=aggfunc,
            resolution=res,
            match_ceiling_labels=match_ceiling_labels,
        )
        self._df_cache = out
        return out

    # ------------------------------------------------------------------ #
    # Private helpers                                                    #
    # ------------------------------------------------------------------ #

    def _build_base_frame(self):
        """
        Build the LR0100-means-only DataFrame (called once).

        The index starts at the first day of ``lr0100.yearMonth``
        (UTC) and advances in steps of ``self.resolution`` minutes.
        Its length is that of the first available LR0100 vector; when
        no vector is available it covers the whole calendar month at
        the dataset resolution.
        """
        y, m = map(int, self.lr0100.yearMonth.split("-"))

        cols = {}
        for lr_col, short in _LR0100_VAR_MAP.items():
            vec = getattr(self.lr0100, lr_col, None)
            if vec is not None:
                cols[short] = np.asarray(vec)

        if cols:
            n = len(next(iter(cols.values())))
        else:
            # No data vectors: span exactly one month. Dividing by the
            # resolution keeps a coarser index from running past the
            # end of the month.
            n = calendar.monthrange(y, m)[1] * 1440 // self.resolution

        idx = pd.date_range(
            f"{y}-{m:02d}-01",
            periods=n,
            freq=f"{self.resolution} min",
            tz="UTC",
        )
        return pd.DataFrame(cols, index=idx)

    def _infer_resolution(self):
        """
        Infer temporal resolution from ``lr0100`` vector length vs
        calendar month.

        Returns
        -------
        int
            Minutes per sample, never less than ``1``. Falls back to
            ``1`` when no LR0100 vector is available or the first
            available one is empty.
        """
        y, m = map(int, self.lr0100.yearMonth.split("-"))
        expected_1min = calendar.monthrange(y, m)[1] * 1440

        for lr_col in _LR0100_VAR_MAP:
            vec = getattr(self.lr0100, lr_col, None)
            if vec is None:
                continue
            n = len(vec)
            if n == 0:
                # An empty vector carries no information (and would
                # divide by zero below).
                return 1
            # Clamp: a vector longer than the month would otherwise
            # give 0, which is not a usable sampling step.
            return max(1, expected_1min // n)
        return 1
# ---------------------------------------------------------------------- # # BSRNPlot Accessor Class # ---------------------------------------------------------------------- #
class BSRNPlot:
    """
    Plotting accessor bound to one :class:`BSRNDataset`.

    Normally reached through ``dataset.plot``; calling the accessor
    itself is a shortcut for :meth:`daily`.
    """

    def __init__(self, ds: "BSRNDataset"):
        """Remember the dataset whose ``data()`` frame will be plotted."""
        self._ds = ds

    def __call__(self, dates, output_file=None, **kwargs):
        """
        Shortcut for :meth:`daily` with the same arguments.
        """
        return self.daily(dates, output_file=output_file, **kwargs)

    def daily(self, dates, output_file=None, **kwargs):
        """
        Plot daily time series (day or booklet mode from *dates*).

        A sequence holding more than one date selects booklet mode:
        the frame is restricted to those calendar dates and handed to
        ``plot_bsrn_daily_booklet``. A scalar, or a one-element
        sequence, selects single-day mode via ``plot_bsrn_daily_day``
        on the unfiltered frame.

        Parameters
        ----------
        dates : str, pd.Timestamp, or sequence
            Date or dates to plot. Only ``list``, ``tuple``,
            ``pandas.Index`` and ``numpy.ndarray`` are treated as
            sequences.
        output_file : str, optional
            Output path for the figure.
        **kwargs
            Forwarded unchanged to the plotting function.

        Returns
        -------
        object
            Whatever the selected plotting function returns.
        """
        from .visualization.daily import (
            plot_bsrn_daily_booklet,
            plot_bsrn_daily_day,
        )

        frame = self._ds.data()
        is_sequence = isinstance(dates, (list, tuple, pd.Index, np.ndarray))

        # Single-day mode: scalar input or a sequence of length <= 1.
        if not (is_sequence and len(dates) > 1):
            day = dates[0] if is_sequence else dates
            return plot_bsrn_daily_day(
                file_path=None,
                day=day,
                output_file=output_file,
                df=frame,
                **kwargs,
            )

        # Booklet mode: keep only rows falling on the requested dates.
        wanted = pd.to_datetime(dates).date
        on_wanted_day = np.isin(frame.index.date, wanted)
        subset = frame.loc[on_wanted_day].copy()
        return plot_bsrn_daily_booklet(
            file_path=None,
            output_file=output_file,
            df=subset,
            **kwargs,
        )

    def table(self, output_file=None, title=None):
        """
        Plot the QC results summary table.

        Daily statistics are computed by ``get_daily_stats`` from the
        dataset's ``data()`` frame and station coordinates, then
        rendered by ``plot_table``.

        Parameters
        ----------
        output_file : str, optional
            Output path for the figure.
        title : str, optional
            Plot title.

        Returns
        -------
        object
            Whatever ``plot_table`` returns.
        """
        from .visualization.table import plot_table
        from .utils.quality import get_daily_stats

        ds = self._ds
        stats = get_daily_stats(
            ds.data(),
            ds.lat,
            ds.lon,
            ds.elev,
            station_code=ds.station_code,
        )
        return plot_table(stats, title=title, output_file=output_file)