"""
Central BSRN dataset: one monthly station file as a typed, validated object.
Encapsulates station identity, resolved geographic metadata, and minute-
resolution data in a single Pydantic model. ``lr0100`` is the source of
truth for minute data; ``data()`` returns a cached ``DataFrame`` with
only the mean/value columns by default. LR0300 / LR4000 columns are
available on demand via ``data(include=[...])``. Most pipeline methods
(solar position, clear-sky, QC) mutate the cached frame in-place;
the ``average`` method replaces the cache with a coarser-index result
from :func:`~bsrn.utils.averaging.pretty_average`. Use :meth:`qc_test`
then optionally :meth:`qc_mask` to apply QC-based masking.
BSRN 中心数据集:将一个月度站点文件封装为带类型校验的对象。``lr0100``
为分钟数据源;``data()`` 返回仅含均值列的缓存 ``DataFrame``。
LR0300 / LR4000 列按需通过 ``data(include=[...])`` 获取。多数管线方法
原地修改缓存;可先 ``qc_test`` 再选 ``qc_mask``。``average`` 方法以
:func:`~bsrn.utils.averaging.pretty_average` 的较粗索引结果替换缓存。
"""
from __future__ import annotations
import calendar
from typing import Optional
import numpy as np
import pandas as pd
from pydantic import (BaseModel, ConfigDict, PrivateAttr,
field_validator, model_validator)
from .archive.records_models import LR0100, LR0300, LR4000
from .constants import BSRN_STATIONS
from .io.reader import read_bsrn_archive
# Column-name maps: logical-record field name -> short column name used in
# the frame returned by ``BSRNDataset.data()``. Only the fields listed here
# are exposed; the ``_std`` / ``_min`` / ``_max`` companions are dropped.
# Insertion order is preserved and determines the column order.
_LR0100_VAR_MAP = dict(
    ghi_avg="ghi",
    bni_avg="bni",
    dhi_avg="dhi",
    lwd_avg="lwd",
    temperature="temp",
    humidity="rh",
    pressure="pressure",
)

_LR0300_VAR_MAP = dict(
    swu_avg="swu",
    lwu_avg="lwu",
    net_avg="net",
)

_LR4000_VAR_MAP = dict(
    domeT1_down="dt1d",
    domeT2_down="dt2d",
    domeT3_down="dt3d",
    bodyT_down="btd",
    domeT1_up="dt1u",
    domeT2_up="dt2u",
    domeT3_up="dt3u",
    bodyT_up="btu",
)
class BSRNDataset(BaseModel):
    """
    One monthly BSRN station file: station identity plus minute data.

    ``lr0100`` is the source of truth; :meth:`data` derives a cached
    ``DataFrame`` from it. The usual enrichment order on that cached frame
    is :meth:`solpos`, then :meth:`clear_sky`, then :meth:`qc_test`.
    :meth:`qc_mask` can then set failed irradiance to NaN and drop the flag
    columns. :meth:`average` is different: it *replaces* the cache with the
    coarser time series returned by
    :func:`~bsrn.utils.averaging.pretty_average`.

    Parameters
    ----------
    station_code : str
        Three-letter BSRN station code. It is upper-cased and must exist
        in ``BSRN_STATIONS``.
    year : int
        Four-digit measurement year.
    month : int
        Measurement month (1--12).
    lr0100 : LR0100
        Validated LR0100 logical record (minute radiation and met data).
    lr0300 : LR0300 or None
        Validated LR0300 logical record (reflected / upward SW, LW, net
        radiation). Default ``None``.
    lr4000 : LR4000 or None
        Validated LR4000 logical record (pyrgeometer minute data).
        Default ``None``.
    station_name : str or None
        Resolved from ``BSRN_STATIONS`` when omitted or ``None``.
    lat : float or None
        Latitude (degrees); resolved from ``BSRN_STATIONS`` when omitted.
    lon : float or None
        Longitude (degrees); resolved from ``BSRN_STATIONS`` when omitted.
    elev : float or None
        Elevation (m above sea level); resolved from ``BSRN_STATIONS``
        when omitted.
    resolution : int or None
        Temporal resolution in minutes (e.g. ``1``, ``2``, ``3``, ``5``).
        When omitted it is inferred from the length of the first available
        LR0100 vector relative to the calendar month, falling back to
        ``1``.

    Raises
    ------
    ValueError
        If ``station_code`` is not in ``BSRN_STATIONS`` or ``month`` is
        outside 1--12.
    """

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        frozen=False,
    )

    # ------------------------------------------------------------------ #
    # Fields                                                             #
    # ------------------------------------------------------------------ #
    # Core (required): identity + minute radiation data.
    station_code: str
    year: int
    month: int
    lr0100: LR0100

    # Optional logical records.
    lr0300: Optional[LR0300] = None
    lr4000: Optional[LR4000] = None

    # Resolved by ``_resolve_metadata`` when left as ``None``; users may
    # override. Declared ``Optional`` so that passing ``None`` explicitly
    # validates the same way as omitting the argument.
    station_name: Optional[str] = None
    lat: Optional[float] = None
    lon: Optional[float] = None
    elev: Optional[float] = None
    resolution: Optional[int] = None

    # Internal cached DataFrame; private attributes are not model fields.
    _df_cache: Optional[pd.DataFrame] = PrivateAttr(default=None)

    # ------------------------------------------------------------------ #
    # Validators                                                         #
    # ------------------------------------------------------------------ #
    @field_validator("station_code")
    @classmethod
    def _validate_station_code(cls, v: str) -> str:
        """Upper-case the code and require it to be a ``BSRN_STATIONS`` key."""
        code = v.upper()
        if code not in BSRN_STATIONS:
            raise ValueError(
                f"unknown station code {v!r}; "
                f"not in BSRN_STATIONS"
            )
        return code

    @field_validator("month")
    @classmethod
    def _validate_month(cls, v: int) -> int:
        """Reject months outside 1--12."""
        if v < 1 or v > 12:
            raise ValueError(
                f"month must be 1--12, got {v}"
            )
        return v

    @model_validator(mode="after")
    def _resolve_metadata(self):
        """
        Fill ``station_name``, ``lat``, ``lon``, ``elev`` from
        ``BSRN_STATIONS`` and infer ``resolution`` when any of them was
        not explicitly provided.
        """
        meta = BSRN_STATIONS[self.station_code]
        # (model field, key in the BSRN_STATIONS entry)
        for field, key in (
            ("station_name", "name"),
            ("lat", "lat"),
            ("lon", "lon"),
            ("elev", "elev"),
        ):
            if getattr(self, field) is None:
                setattr(self, field, meta[key])
        if self.resolution is None:
            self.resolution = self._infer_resolution()
        return self

    # ------------------------------------------------------------------ #
    # Factory                                                            #
    # ------------------------------------------------------------------ #
    @classmethod
    def from_file(cls, path):
        """
        Parse a BSRN ``.dat.gz`` station-to-archive file and return a
        fully validated ``BSRNDataset``.

        The mapping returned by ``read_bsrn_archive(path)`` is passed to
        the constructor as keyword arguments.

        Parameters
        ----------
        path : str or Path
            Path to the ``.dat.gz`` file (filename format
            ``XXXMMYY.dat.gz``).

        Returns
        -------
        BSRNDataset

        Raises
        ------
        FileNotFoundError
            If *path* does not exist.
        ValueError
            If the filename cannot be parsed or no LR0100 block is found.
        """
        return cls(**read_bsrn_archive(path))

    # ------------------------------------------------------------------ #
    # data()                                                             #
    # ------------------------------------------------------------------ #
    def data(self, include=None) -> pd.DataFrame:
        """
        Return the cached frame derived from ``lr0100``.

        The base frame contains only the LR0100 **mean / scalar** columns
        under short names (``ghi``, ``bni``, ``dhi``, ``lwd``, ``temp``,
        ``rh``, ``pressure``). It is built on first access and cached, so
        the pipeline methods operate on the same object. Note that
        :meth:`average` replaces this cache with a coarser frame.

        Parameters
        ----------
        include : str or sequence of str, optional
            Extra logical records to merge: ``"lr0300"`` and/or
            ``"lr4000"`` (case-insensitive). A single name may be given
            as a plain string. Unknown names, and records that are
            ``None`` on this dataset, are ignored.

        Returns
        -------
        pandas.DataFrame
            Without ``include`` (or when nothing could be added): the
            cached frame itself. Otherwise a **new** frame produced by
            ``DataFrame.assign`` with the extra columns appended; changes
            made to that frame do not reach the cache.

        Raises
        ------
        ValueError
            If an extra column's length differs from the cached frame's
            length (for example after :meth:`average` replaced the cache).
        """
        if self._df_cache is None:
            self._df_cache = self._build_base_frame()
        base = self._df_cache
        if not include:
            return base

        # A bare string would otherwise be iterated character by character
        # and silently match nothing.
        if isinstance(include, str):
            include = (include,)
        wanted = {name.lower() for name in include}

        extra = {}
        for key, record, var_map in (
            ("lr0300", self.lr0300, _LR0300_VAR_MAP),
            ("lr4000", self.lr4000, _LR4000_VAR_MAP),
        ):
            if key not in wanted or record is None:
                continue
            for lr_col, short in var_map.items():
                vec = getattr(record, lr_col, None)
                if vec is not None:
                    extra[short] = np.asarray(vec)
        if not extra:
            return base

        # Fail with an explanatory message instead of the generic pandas
        # length-mismatch error (same exception type).
        mismatched = sorted(
            short for short, arr in extra.items()
            if arr.ndim >= 1 and arr.shape[0] != len(base)
        )
        if mismatched:
            raise ValueError(
                f"cannot merge columns {mismatched}: their length does not "
                f"match the cached frame ({len(base)} rows); the cache may "
                f"have been replaced by average()"
            )
        return base.assign(**extra)

    @property
    def plot(self):
        """Accessor for the built-in plotting routines (a new ``BSRNPlot``)."""
        return BSRNPlot(self)

    # ------------------------------------------------------------------ #
    # Pipeline methods (delegate to standalone functions)                #
    # ------------------------------------------------------------------ #
    def solpos(self):
        """
        Add solar position and extraterrestrial irradiance columns to the
        cached ``data()`` frame.

        Delegates to :func:`~bsrn.physics.geometry.add_solpos_columns`
        with the resolved ``lat``, ``lon``, ``elev``.

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added columns: ``zenith``,
            ``apparent_zenith``, ``azimuth``, ``bni_extra``,
            ``ghi_extra``.
        """
        from .physics.geometry import add_solpos_columns
        return add_solpos_columns(
            self.data(), station_code=self.station_code,
            lat=self.lat, lon=self.lon, elev=self.elev,
        )

    def clear_sky(self, model="ineichen", mcclear_email=None):
        """
        Add clear-sky irradiance columns to the cached ``data()`` frame.

        Delegates to
        :func:`~bsrn.modeling.clear_sky.add_clearsky_columns`.

        Parameters
        ----------
        model : str, optional
            Clear-sky model name (default ``'ineichen'``).
        mcclear_email : str, optional
            E-mail for the CAMS McClear API (only used when
            ``model='mcclear'``).

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added clear-sky columns (``ghi_clear``,
            ``bni_clear``, ``dhi_clear``, ...).
        """
        from .modeling.clear_sky import add_clearsky_columns
        return add_clearsky_columns(
            self.data(), station_code=self.station_code,
            lat=self.lat, lon=self.lon, elev=self.elev,
            model=model, mcclear_email=mcclear_email,
        )

    def qc_test(self, tests=('ppl', 'erl', 'closure',
                             'diff_ratio', 'k_index', 'tracker')):
        """
        Run QC tests and add flag columns to the cached ``data()`` frame.

        Delegates to :func:`~bsrn.qc.wrapper.run_qc`.

        Parameters
        ----------
        tests : tuple of str, optional
            QC test names to run (default: all six).

        Returns
        -------
        pandas.DataFrame
            ``data()`` with added ``flag_*`` columns.
        """
        from .qc.wrapper import run_qc
        return run_qc(
            self.data(), station_code=self.station_code,
            lat=self.lat, lon=self.lon, elev=self.elev,
            tests=tests,
        )

    def qc_mask(self, flag_remove=True):
        """
        Set irradiance values to NaN where QC flags fail; optionally drop
        the flag columns.

        Call :meth:`qc_test` first so the flag columns exist. Delegates to
        :func:`~bsrn.qc.wrapper.mask_failed_irradiance` on ``data()``.

        Parameters
        ----------
        flag_remove : bool, optional
            If True (default), drop the standard QC flag columns after
            masking.

        Returns
        -------
        pandas.DataFrame
            ``data()`` after masking (same cached object).
        """
        from .qc.wrapper import mask_failed_irradiance
        return mask_failed_irradiance(self.data(), flag_remove=flag_remove)

    def average(self, freq, alignment="ceiling", aggfunc="mean",
                match_ceiling_labels=True):
        """
        Time-average the cached ``data()`` with explicit labeled windows.

        Delegates to :func:`~bsrn.utils.averaging.pretty_average` and
        **replaces** the internal cache with the returned frame (new
        index).

        The native timestep passed to ``pretty_average`` is taken from
        ``self.resolution`` (minutes) when set; otherwise ``None`` is
        passed so that ``pretty_average`` infers it.

        NOTE(review): ``self.resolution`` is not updated here, so a second
        ``average`` call still passes the original timestep although the
        cache is already coarser -- confirm this is intended.

        Parameters
        ----------
        freq : str
            Fixed bin frequency (e.g. ``'1h'``, ``'30min'``).
        alignment : {'floor', 'ceiling', 'center'}, optional
            Window alignment (default ``'ceiling'``).
        aggfunc : str or callable, optional
            Aggregation function (default ``'mean'``).
        match_ceiling_labels : bool, optional
            When ``alignment='center'``, monthly edge trim style (default
            ``True``, ceiling-like).

        Returns
        -------
        pandas.DataFrame
            One row per output label; also stored as the new cache.

        Raises
        ------
        TypeError
            If ``data().index`` is not a :class:`~pandas.DatetimeIndex`.
        ValueError
            Propagated from :func:`~bsrn.utils.averaging.pretty_average`
            when ``freq`` is not a fixed frequency.
        """
        from .utils.averaging import pretty_average
        res = None
        if self.resolution is not None:
            res = pd.Timedelta(minutes=int(self.resolution))
        out = pretty_average(
            self.data(), freq, alignment=alignment, aggfunc=aggfunc,
            resolution=res, match_ceiling_labels=match_ceiling_labels,
        )
        self._df_cache = out
        return out

    # ------------------------------------------------------------------ #
    # Private helpers                                                    #
    # ------------------------------------------------------------------ #
    def _lr0100_year_month(self):
        """Return ``(year, month)`` parsed from ``lr0100.yearMonth`` (``YYYY-MM``)."""
        y, m = map(int, self.lr0100.yearMonth.split("-"))
        return y, m

    def _build_base_frame(self) -> pd.DataFrame:
        """
        Build the LR0100-means-only DataFrame with a UTC index starting on
        the first of the month and stepping by ``resolution`` minutes.
        """
        y, m = self._lr0100_year_month()
        cols = {}
        for lr_col, short in _LR0100_VAR_MAP.items():
            vec = getattr(self.lr0100, lr_col, None)
            if vec is not None:
                cols[short] = np.asarray(vec)

        step = int(self.resolution) if self.resolution else 1
        if cols:
            # The index length follows the first available column.
            n = len(next(iter(cols.values())))
        else:
            # No data at all: cover exactly one calendar month at the
            # dataset's resolution (not always 1-minute steps).
            n = calendar.monthrange(y, m)[1] * 1440 // step
        idx = pd.date_range(
            f"{y}-{m:02d}-01", periods=n,
            freq=f"{step} min", tz="UTC",
        )
        return pd.DataFrame(cols, index=idx)

    def _infer_resolution(self) -> int:
        """
        Infer the temporal resolution (minutes) from the length of the
        first available ``lr0100`` vector versus the calendar month.

        Returns ``1`` when no vector is available or the vector is empty,
        and never less than ``1`` (a vector longer than the month would
        otherwise give ``0``).
        """
        y, m = self._lr0100_year_month()
        expected_1min = calendar.monthrange(y, m)[1] * 1440
        for lr_col in _LR0100_VAR_MAP:
            vec = getattr(self.lr0100, lr_col, None)
            if vec is not None:
                n = len(vec)
                if n == 0:
                    return 1
                return max(1, expected_1min // n)
        return 1
# ---------------------------------------------------------------------- #
# BSRNPlot Accessor Class
# ---------------------------------------------------------------------- #
class BSRNPlot:
    """
    Visualization accessor for ``BSRNDataset``.

    Holds a reference to the dataset and forwards its cached ``data()``
    frame and station metadata to the plotting functions.
    """

    def __init__(self, ds: "BSRNDataset"):
        self._ds = ds

    def __call__(self, dates, output_file=None, **kwargs):
        """Shorthand for :meth:`daily` (the default plot type)."""
        return self.daily(dates, output_file=output_file, **kwargs)

    def daily(self, dates, output_file=None, **kwargs):
        """
        Plot daily time series, as a single-day figure or a multi-day
        booklet depending on how many dates are given.

        Parameters
        ----------
        dates : str, pd.Timestamp, or list-like
            Date or dates to plot. A list-like with exactly one element is
            treated as a single date; with more than one element the frame
            is filtered to those calendar dates and the booklet plot is
            produced.
        output_file : str, optional
            Output path for the plot.
        **kwargs
            Forwarded unchanged to the underlying plotting function.

        Returns
        -------
        object
            Whatever ``plot_bsrn_daily_day`` / ``plot_bsrn_daily_booklet``
            returns.

        Raises
        ------
        ValueError
            If ``dates`` is an empty list-like.
        """
        # Normalise the argument first so that bad input fails before any
        # plotting module is imported or the data frame is built.
        if pd.api.types.is_list_like(dates):
            date_list = list(dates)
            if not date_list:
                raise ValueError("dates must contain at least one date")
        else:
            date_list = [dates]

        from .visualization.daily import (
            plot_bsrn_daily_day,
            plot_bsrn_daily_booklet,
        )
        df = self._ds.data()

        if len(date_list) > 1:
            # Keep only the rows whose calendar date was requested.
            date_objs = pd.to_datetime(date_list).date
            mask = np.isin(df.index.date, date_objs)
            filtered_df = df.loc[mask].copy()
            return plot_bsrn_daily_booklet(
                file_path=None,
                output_file=output_file,
                df=filtered_df,
                **kwargs,
            )

        return plot_bsrn_daily_day(
            file_path=None,
            day=date_list[0],
            output_file=output_file,
            df=df,
            **kwargs,
        )

    def table(self, output_file=None, title=None):
        """
        Plot the QC results summary table.

        Daily statistics are computed by ``get_daily_stats`` from the
        dataset's ``data()`` frame and station coordinates, then rendered
        by ``plot_table``.

        Parameters
        ----------
        output_file : str, optional
            Output path for the plot.
        title : str, optional
            Plot title.

        Returns
        -------
        object
            Whatever ``plot_table`` returns.
        """
        from .visualization.table import plot_table
        from .utils.quality import get_daily_stats
        daily_stats = get_daily_stats(
            self._ds.data(),
            self._ds.lat,
            self._ds.lon,
            self._ds.elev,
            station_code=self._ds.station_code,
        )
        return plot_table(daily_stats, title=title, output_file=output_file)