"""
Central BSRN dataset: one monthly station file as a typed, validated object.
Encapsulates station identity, resolved geographic metadata, and minute-
resolution data in a single Pydantic model. ``lr0100`` is the source of
truth for minute data; ``data()`` returns a cached ``DataFrame`` with
only the mean/value columns by default. LR0300 / LR4000 columns are
available on demand via ``data(include=[...])``. Most pipeline methods
(solar position, clear-sky, QC) mutate the cached frame in-place;
the ``average`` method replaces the cache with a coarser-index result
from :func:`~bsrn.utils.averaging.pretty_average`. Use :meth:`qc_test`
then optionally :meth:`qc_mask` to apply QC-based masking.
"""
from __future__ import annotations
import calendar
from typing import Optional
import numpy as np
import pandas as pd
from pydantic import (BaseModel, ConfigDict, Field, PrivateAttr,
field_validator, model_validator)
from .archive.records_models import LR0100, LR0300, LR4000
from .constants import BSRN_STATIONS
from .io.reader import read_bsrn_archive
# Column-name maps: logical-record attribute -> short column name used in the
# frames returned by ``BSRNDataset.data()``. Attributes that are not listed
# here (for example the ``_std`` / ``_min`` / ``_max`` companions) never reach
# the frame. Insertion order matters: the first LR0100 entry whose vector is
# present decides the frame length.

# LR0100: basic radiation means and surface meteorology.
_LR0100_VAR_MAP = {
    "ghi_avg": "ghi",
    "bni_avg": "bni",
    "dhi_avg": "dhi",
    "lwd_avg": "lwd",
    "temperature": "temp",
    "humidity": "rh",
    "pressure": "pressure",
}

# LR0300: upward / net radiation means.
_LR0300_VAR_MAP = {
    "swu_avg": "swu",
    "lwu_avg": "lwu",
    "net_avg": "net",
}

# LR4000: pyrgeometer dome and body temperatures (downward, then upward).
_LR4000_VAR_MAP = {
    "domeT1_down": "dt1d",
    "domeT2_down": "dt2d",
    "domeT3_down": "dt3d",
    "bodyT_down": "btd",
    "domeT1_up": "dt1u",
    "domeT2_up": "dt2u",
    "domeT3_up": "dt3u",
    "bodyT_up": "btu",
}
class BSRNDataset(BaseModel):
    """
    One monthly BSRN dataset: station identity and minute data.

    Typical enrichment on the cached :meth:`data` frame is
    :meth:`solpos`, then :meth:`clear_sky`, then :meth:`qc_test`
    (each mutates that frame in place and returns it). Optional
    :meth:`qc_mask` sets failed irradiance to NaN and can drop flag columns.
    :meth:`average` replaces the cache with a coarser time series from
    :func:`~bsrn.utils.averaging.pretty_average`.

    Parameters
    ----------
    station_code : str
        Three-letter BSRN station code (must exist in ``BSRN_STATIONS``).
        Stored upper-cased.
    year : int
        Four-digit measurement year.
    month : int
        Measurement month (1--12).
    lr0100 : LR0100
        Validated LR0100 logical record (minute radiation and
        met data). This is the source of truth; ``data()`` is
        derived from it.
    lr0300 : LR0300 or None
        Validated LR0300 logical record (reflected / upward
        SW, LW, net radiation). Default ``None``.
    lr4000 : LR4000 or None
        Validated LR4000 logical record (pyrgeometer minute
        data). Default ``None``.
    metadata_lrs : dict
        Mapping for additional non-core logical records keyed by
        ``lr####`` (for example ``'lr0001'``). Default empty dict.
    station_name : str or None
        Resolved from ``BSRN_STATIONS`` when omitted or ``None``.
    lat : float or None
        Latitude (degrees); resolved from ``BSRN_STATIONS``.
    lon : float or None
        Longitude (degrees); resolved from ``BSRN_STATIONS``.
    elev : float or None
        Elevation (m above sea level); resolved from
        ``BSRN_STATIONS``.
    resolution : int or None
        Temporal resolution in minutes (e.g. ``1``, ``2``,
        ``3``, ``5``). When omitted or ``None`` it is inferred from the
        ``lr0100`` vector length, falling back to ``1``.

    Raises
    ------
    ValueError
        If ``station_code`` is not in ``BSRN_STATIONS`` or
        ``month`` is outside 1--12.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        frozen=False,
    )

    # ------------------------------------------------------------------ #
    # Fields                                                             #
    # ------------------------------------------------------------------ #
    # Core (required): identity + minute radiation data.
    station_code: str
    year: int
    month: int
    lr0100: LR0100

    # Optional logical records.
    lr0300: Optional[LR0300] = None
    lr4000: Optional[LR4000] = None
    metadata_lrs: dict = Field(default_factory=dict)

    # Resolved from BSRN_STATIONS; users may override. Declared Optional so
    # that an explicit ``None`` (meaning "please resolve") passes validation.
    station_name: Optional[str] = None
    lat: Optional[float] = None
    lon: Optional[float] = None
    elev: Optional[float] = None
    resolution: Optional[int] = None

    # Internal cached DataFrame; excluded from serialisation.
    _df_cache: Optional[pd.DataFrame] = PrivateAttr(default=None)

    # ------------------------------------------------------------------ #
    # Validators                                                         #
    # ------------------------------------------------------------------ #
    @field_validator("station_code")
    @classmethod
    def _validate_station_code(cls, v):
        """Upper-case the code and require it to be a ``BSRN_STATIONS`` key."""
        code = v.upper()
        if code not in BSRN_STATIONS:
            raise ValueError(
                f"unknown station code {v!r}; "
                f"not in BSRN_STATIONS"
            )
        return code

    @field_validator("month")
    @classmethod
    def _validate_month(cls, v):
        """Reject months outside the calendar range 1--12."""
        if v < 1 or v > 12:
            raise ValueError(
                f"month must be 1--12, got {v}"
            )
        return v

    @model_validator(mode="after")
    def _resolve_metadata(self):
        """
        Fill ``station_name``, ``lat``, ``lon``, ``elev`` from
        ``BSRN_STATIONS`` and infer ``resolution`` from ``lr0100``
        when they were not explicitly provided.
        """
        meta = BSRN_STATIONS[self.station_code]
        if self.station_name is None:
            self.station_name = meta["name"]
        if self.lat is None:
            self.lat = meta["lat"]
        if self.lon is None:
            self.lon = meta["lon"]
        if self.elev is None:
            self.elev = meta["elev"]
        if self.resolution is None:
            self.resolution = self._infer_resolution()
        return self

    # ------------------------------------------------------------------ #
    # Factory                                                            #
    # ------------------------------------------------------------------ #
    @classmethod
    def from_file(cls, path, include_lrs=None, strict=False):
        """
        Parse a BSRN ``.dat.gz`` station-to-archive file and return
        a fully validated ``BSRNDataset``.

        The mapping returned by
        :func:`~bsrn.io.reader.read_bsrn_archive` is passed straight to the
        model constructor as keyword arguments.

        Parameters
        ----------
        path : str or Path
            Path to the ``.dat.gz`` file (filename format
            ``XXXMMYY.dat.gz``).
        include_lrs : sequence of str or 'all', optional
            Logical records to parse. Supports ``'lr0100'`` (required),
            ``'lr0300'``, ``'lr4000'``, and ``'lr0001'``. Default ``None``
            parses all supported records.
        strict : bool, optional
            Passed to :func:`~bsrn.io.reader.read_bsrn_archive`.
            If ``True``, malformed optional LR blocks raise.
            If ``False`` (default), malformed optional LRs are returned
            as ``None``.

        Returns
        -------
        BSRNDataset

        Raises
        ------
        FileNotFoundError
            If *path* does not exist.
        ValueError
            If the filename cannot be parsed or no LR0100 block
            is found.
        """
        return cls(
            **read_bsrn_archive(
                path, include_lrs=include_lrs, strict=strict,
            )
        )

    # ------------------------------------------------------------------ #
    # data()                                                             #
    # ------------------------------------------------------------------ #
    def data(self, include=None):
        """
        Minute-resolution DataFrame derived from ``lr0100``.

        The base frame contains only the LR0100 **mean / scalar**
        columns under short names (``ghi``, ``bni``, ``dhi``,
        ``lwd``, ``temp``, ``rh``, ``pressure``). It is built once
        and cached so that pipeline methods (``solpos``, ``average``, etc.)
        can enrich it in-place.

        Parameters
        ----------
        include : str or sequence of str, optional
            Extra logical records to merge: ``"lr0300"`` and/or
            ``"lr4000"`` (case-insensitive, surrounding whitespace
            ignored). A single name may be given as a plain string.
            Names that are unknown, or whose record is absent on this
            dataset, are ignored.

        Returns
        -------
        pandas.DataFrame
            UTC ``DatetimeIndex``. Without extras this is the cached
            frame itself. With extras it is a **new** frame (the cache is
            left untouched) with the requested mean/value columns appended.

        Raises
        ------
        ValueError
            If an extra column's length differs from the cached frame's
            row count (for example after :meth:`average` replaced the
            cache with a coarser series).
        """
        if self._df_cache is None:
            self._df_cache = self._build_base_frame()
        frame = self._df_cache
        if not include:
            return frame

        # A bare string would otherwise be iterated character by character
        # and silently match nothing.
        if isinstance(include, str):
            include = (include,)
        wanted = {str(item).strip().lower() for item in include}

        extra = {}
        if "lr0300" in wanted and self.lr0300 is not None:
            extra.update(self._mean_columns(self.lr0300, _LR0300_VAR_MAP))
        if "lr4000" in wanted and self.lr4000 is not None:
            extra.update(self._mean_columns(self.lr4000, _LR4000_VAR_MAP))
        if not extra:
            return frame

        # Fail with an actionable message instead of an opaque pandas error.
        n_rows = len(frame)
        mismatched = sorted(
            name for name, arr in extra.items()
            if np.ndim(arr) > 0 and len(arr) != n_rows
        )
        if mismatched:
            raise ValueError(
                f"cannot merge columns {mismatched} into data(): their "
                f"length does not match the cached frame ({n_rows} rows), "
                f"e.g. because average() replaced the cache"
            )
        return frame.assign(**extra)

    @property
    def plot(self):
        """
        Accessor for built-in plotting routines.

        A fresh :class:`BSRNPlot` bound to this dataset is created on each
        access.
        """
        return BSRNPlot(self)

    def get_lr(self, lr_code):
        """
        Return one logical record by code.

        Parameters
        ----------
        lr_code : str
            Logical record code in ``lr####`` form (case-insensitive),
            for example ``'lr0100'`` or ``'lr0001'``.

        Returns
        -------
        object or None
            Requested LR object when present, else ``None``. Codes other
            than the three core records are looked up in ``metadata_lrs``.
        """
        key = str(lr_code).strip().lower()
        if key == "lr0100":
            return self.lr0100
        if key == "lr0300":
            return self.lr0300
        if key == "lr4000":
            return self.lr4000
        return self.metadata_lrs.get(key)

    def has_lr(self, lr_code):
        """
        Check whether one logical record exists on the dataset.

        Parameters
        ----------
        lr_code : str
            Logical record code in ``lr####`` form (case-insensitive).

        Returns
        -------
        bool
            True if the requested LR exists, else False.
        """
        return self.get_lr(lr_code) is not None

    # ------------------------------------------------------------------ #
    # Pipeline methods (delegate to standalone functions)                #
    # ------------------------------------------------------------------ #
    def solpos(self):
        """
        Add solar position and extraterrestrial irradiance columns
        to the cached ``data()`` frame.

        Delegates to
        :func:`~bsrn.physics.geometry.add_solpos_columns` using
        the resolved ``lat``, ``lon``, ``elev``.

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added columns: ``zenith``,
            ``apparent_zenith``, ``azimuth``, ``bni_extra``,
            ``ghi_extra``.
        """
        # Imported lazily so the optional dependency is only needed on use.
        from .physics.geometry import add_solpos_columns
        return add_solpos_columns(
            self.data(), station_code=self.station_code,
            lat=self.lat, lon=self.lon, elev=self.elev,
        )

    def clear_sky(self, model="ineichen", mcclear_email=None):
        """
        Add clear-sky irradiance columns to the cached ``data()``
        frame.

        Delegates to
        :func:`~bsrn.modeling.clear_sky.add_clearsky_columns`.

        Parameters
        ----------
        model : str, optional
            Clear-sky model name (default ``'ineichen'``).
        mcclear_email : str, optional
            E-mail for CAMS McClear API (only when
            ``model='mcclear'``).

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added clear-sky columns
            (``ghi_clear``, ``bni_clear``, ``dhi_clear``, ...).
        """
        from .modeling.clear_sky import add_clearsky_columns
        return add_clearsky_columns(
            self.data(), station_code=self.station_code,
            lat=self.lat, lon=self.lon, elev=self.elev,
            model=model, mcclear_email=mcclear_email,
        )

    def qc_test(self, tests=('ppl', 'erl', 'closure',
                             'diff_ratio', 'k_index', 'tracker')):
        """
        Run QC tests and add flag columns to the cached ``data()``
        frame.

        Delegates to :func:`~bsrn.qc.wrapper.run_qc`.

        Parameters
        ----------
        tests : tuple of str, optional
            QC test names to run (default: all six).

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added ``flag_*`` columns.
        """
        from .qc.wrapper import run_qc
        return run_qc(
            self.data(), station_code=self.station_code,
            lat=self.lat, lon=self.lon, elev=self.elev,
            tests=tests,
        )

    def qc_mask(self, flag_remove=True):
        """
        Set irradiance values to NaN where QC flags fail; optionally drop
        flag columns.

        Call :meth:`qc_test` first so flag columns exist. Delegates to
        :func:`~bsrn.qc.wrapper.mask_failed_irradiance` on ``data()``.

        Parameters
        ----------
        flag_remove : bool, optional
            If True (default), drop standard QC flag columns after masking.

        Returns
        -------
        pandas.DataFrame
            ``data()`` after masking (same cached object).
        """
        from .qc.wrapper import mask_failed_irradiance
        return mask_failed_irradiance(self.data(), flag_remove=flag_remove)

    def average(self, freq, alignment="ceiling", aggfunc="mean",
                match_ceiling_labels=True):
        """
        Time-average the cached ``data()`` with explicit labeled windows.

        Delegates to :func:`~bsrn.utils.averaging.pretty_average` and
        **replaces** the internal cache with the returned frame (new index).
        Native timestep for **center** windows is taken from
        ``self.resolution`` (minutes) when set; otherwise passed as ``None``
        for :func:`~bsrn.utils.averaging.pretty_average` to infer.
        ``self.resolution`` itself is not updated by this method.

        Parameters
        ----------
        freq : str
            Fixed bin frequency (e.g. ``'1h'``, ``'30min'``).
        alignment : {'floor', 'ceiling', 'center'}, optional
            Window alignment (default ``'ceiling'``).
        aggfunc : str or callable, optional
            Aggregation function (default ``'mean'``).
        match_ceiling_labels : bool, optional
            When ``alignment='center'``, monthly edge trim style (default
            ``True``, ceiling-like).

        Returns
        -------
        pandas.DataFrame
            One row per output label; also stored as the new cache.

        Raises
        ------
        TypeError
            If ``data().index`` is not a :class:`~pandas.DatetimeIndex`.
        ValueError
            Propagated from :func:`~bsrn.utils.averaging.pretty_average` when
            ``freq`` is not a fixed frequency.
        """
        from .utils.averaging import pretty_average
        res = None
        if self.resolution is not None:
            res = pd.Timedelta(minutes=int(self.resolution))
        out = pretty_average(
            self.data(), freq, alignment=alignment, aggfunc=aggfunc,
            resolution=res, match_ceiling_labels=match_ceiling_labels,
        )
        self._df_cache = out
        return out

    # ------------------------------------------------------------------ #
    # Private helpers                                                    #
    # ------------------------------------------------------------------ #
    def _mean_columns(self, record, var_map):
        """
        Collect the vectors of *record* listed in *var_map*.

        Returns a dict ``short column name -> numpy array`` in *var_map*
        order, skipping attributes that are missing or ``None``.
        """
        columns = {}
        for lr_col, short in var_map.items():
            vec = getattr(record, lr_col, None)
            if vec is not None:
                columns[short] = np.asarray(vec)
        return columns

    def _build_base_frame(self):
        """Build the LR0100-means-only DataFrame (called once)."""
        y, m = map(int, self.lr0100.yearMonth.split("-"))
        cols = self._mean_columns(self.lr0100, _LR0100_VAR_MAP)
        if cols:
            # The first present vector fixes the number of rows.
            n = len(next(iter(cols.values())))
        else:
            # No vectors at all: span exactly one calendar month at the
            # dataset's own timestep rather than assuming 1-minute steps.
            n = calendar.monthrange(y, m)[1] * 1440 // self.resolution
        idx = pd.date_range(
            f"{y}-{m:02d}-01", periods=n,
            freq=f"{self.resolution} min", tz="UTC",
        )
        return pd.DataFrame(cols, index=idx)

    def _infer_resolution(self):
        """
        Infer temporal resolution (minutes) from ``lr0100`` vector length
        vs calendar month.

        The first present vector decides. The result is never below ``1``:
        an empty vector, or one longer than a 1-minute month, yields ``1``
        instead of a division error or a zero-minute step.
        """
        y, m = map(int, self.lr0100.yearMonth.split("-"))
        expected_1min = calendar.monthrange(y, m)[1] * 1440
        for lr_col in _LR0100_VAR_MAP:
            vec = getattr(self.lr0100, lr_col, None)
            if vec is not None:
                n = len(vec)
                return max(1, expected_1min // n) if n else 1
        return 1
# ---------------------------------------------------------------------- #
# BSRNPlot Accessor Class
# ---------------------------------------------------------------------- #
class BSRNPlot:
    """
    Visualization accessor for BSRNDataset.

    Holds a reference to the dataset and forwards its cached ``data()``
    frame (and station coordinates) to the plotting helpers. Calling the
    accessor directly is shorthand for :meth:`daily`.
    """

    # Container types interpreted as "several dates" rather than one date.
    _SEQUENCE_TYPES = (list, tuple, pd.Index, np.ndarray)

    def __init__(self, ds: "BSRNDataset"):
        self._ds = ds

    def __call__(self, dates, output_file=None, **kwargs):
        """
        Default to daily time series plot.
        """
        return self.daily(dates, output_file=output_file, **kwargs)

    def daily(self, dates, output_file=None, **kwargs):
        """
        Plot daily time series (day or booklet mode from *dates*).

        A sequence with more than one entry selects booklet mode: the
        frame is filtered to the requested calendar dates and handed to
        ``plot_bsrn_daily_booklet``. A single date (scalar or one-element
        sequence) is handed to ``plot_bsrn_daily_day`` with the full frame.

        Parameters
        ----------
        dates : str, pd.Timestamp, or sequence
            Date or dates to plot.
        output_file : str, optional
            Output path for the figure.
        **kwargs
            Forwarded unchanged to the underlying plotting function.

        Returns
        -------
        object
            Whatever the underlying plotting function returns.

        Raises
        ------
        ValueError
            If *dates* is an empty sequence.
        """
        is_sequence = isinstance(dates, self._SEQUENCE_TYPES)
        # Reject empty input up front, before touching data or plotting code.
        if is_sequence and len(dates) == 0:
            raise ValueError("dates must contain at least one date")

        # Imported lazily so plotting dependencies are only needed on use.
        from .visualization.daily import (
            plot_bsrn_daily_day,
            plot_bsrn_daily_booklet,
        )

        df = self._ds.data()
        if is_sequence and len(dates) > 1:
            # Filter the dataframe to only include the requested dates
            date_objs = pd.to_datetime(dates).date
            mask = np.isin(df.index.date, date_objs)
            filtered_df = df.loc[mask].copy()
            return plot_bsrn_daily_booklet(
                file_path=None,
                output_file=output_file,
                df=filtered_df,
                **kwargs,
            )

        # Single date given (unwrap a one-element sequence).
        date_to_plot = dates[0] if is_sequence else dates
        return plot_bsrn_daily_day(
            file_path=None,
            day=date_to_plot,
            output_file=output_file,
            df=df,
            **kwargs,
        )

    def table(self, output_file=None, title=None):
        """
        Plot the QC results summary table.

        Daily statistics are computed by ``get_daily_stats`` from the
        dataset's ``data()`` frame and station coordinates, then rendered
        by ``plot_table``.

        Parameters
        ----------
        output_file : str, optional
            Output path for the figure.
        title : str, optional
            Plot title.

        Returns
        -------
        object
            Whatever ``plot_table`` returns.
        """
        from .visualization.table import plot_table
        from .utils.quality import get_daily_stats
        daily_stats = get_daily_stats(
            self._ds.data(),
            self._ds.lat,
            self._ds.lon,
            self._ds.elev,
            station_code=self._ds.station_code,
        )
        return plot_table(daily_stats, title=title, output_file=output_file)