Source code for bsrn.visualization.daily

import pandas as pd
import numpy as np
from plotnine import (
    ggplot,
    aes,
    geom_line,
    geom_point,
    geom_hline,
    geom_ribbon,
    facet_wrap,
    theme,
    element_text,
    theme_minimal,
    labs,
    scale_x_datetime,
    scale_color_manual,
    scale_shape_manual,
    guides,
    guide_legend,
    element_line,
)
from matplotlib.backends.backend_pdf import PdfPages
import matplotlib.pyplot as plt

from bsrn.constants import WONG_PALETTE

from bsrn.dataset import BSRNDataset

# L1–L6 QC markers: WONG[2–6] for L1–5, black for L6 (WONG[0–1] reserved for line series).
_QC_MARKER_BLACK = "#000000"
_QC_LEVEL_COLORS = {
    "L1": WONG_PALETTE[2],
    "L2": WONG_PALETTE[3],
    "L3": WONG_PALETTE[4],
    "L4": WONG_PALETTE[5],
    "L5": WONG_PALETTE[6],
    "L6": _QC_MARKER_BLACK,
}
_QC_LEVEL_SHAPES = {
    "L1": "o",
    "L2": "^",
    "L3": "s",
    "L4": "D",
    "L5": "v",
    "L6": "p",
}
# Plotted markers stay small; legend keys use 2× size for readability.
_QC_MARKER_SIZE = 0.8
_QC_MARKER_STROKE = 0.12
_QC_MARKER_LEGEND_SIZE = _QC_MARKER_SIZE * 2
_QC_MARKER_LEGEND_STROKE = _QC_MARKER_STROKE * 2

# LR_SPECS missing tokens for plot-only masking (does not alter :meth:`BSRNDataset.data`).
_PLOT_MISSING_I4 = frozenset(
    {"ghi", "bni", "dhi", "lwd", "pressure", "swu", "lwu", "net"}
)
_PLOT_MISSING_F51 = frozenset({"temp", "rh"})
_PLOT_MISSING_LR4000_TEMPS = frozenset(
    {"dt1d", "dt2d", "dt3d", "btd", "dt1u", "dt2u", "dt3u", "btu"}
)


def _mask_bsrn_missing_for_plot(df: pd.DataFrame) -> pd.DataFrame:
    """Replace BSRN archive sentinels with NaN for line/point layers only."""
    out = df.copy()
    for c in _PLOT_MISSING_I4:
        if c not in out.columns:
            continue
        col = pd.to_numeric(out[c], errors="coerce")
        out[c] = col.mask(col == -999, np.nan)
    for c in _PLOT_MISSING_F51:
        if c not in out.columns:
            continue
        col = pd.to_numeric(out[c], errors="coerce")
        out[c] = col.mask(np.isclose(col, -99.9, rtol=0, atol=0.05), np.nan)
    for c in _PLOT_MISSING_LR4000_TEMPS:
        if c not in out.columns:
            continue
        col = pd.to_numeric(out[c], errors="coerce")
        out[c] = col.mask(np.isclose(col, -99.99, rtol=0, atol=0.01), np.nan)
    return out


def _qc_marker_slices(df, mask, ycol, param_facet, level, chunks):
    """Append marker rows for timestamps where mask is True and y is finite."""
    if ycol not in df.columns or mask is None or not mask.any():
        return
    m = mask.fillna(False) & df[ycol].notna()
    if not m.any():
        return
    sub = df.loc[m]
    chunks.append(
        pd.DataFrame(
            {
                "time": sub.index,
                "parameter": param_facet,
                "qc_y": sub[ycol].to_numpy(),
                "qc_level": f"L{level}",
            }
        )
    )


def _build_qc_marker_frame(day_df, day_zenith):
    """
    Per-minute QC failures from existing ``flag*`` columns only (no ``run_qc``).
    """
    df = day_df.copy()
    df["zenith"] = day_zenith.reindex(df.index).to_numpy()
    chunks = []

    pairs_l1 = [
        ("flagPPLGHI", "ghi", "GHI"),
        ("flagPPLBNI", "bni", "BNI"),
        ("flagPPLDHI", "dhi", "DHI"),
        ("flagPPLLWD", "lwd", "LWD"),
    ]
    for flg, ycol, pan in pairs_l1:
        if flg in df.columns:
            _qc_marker_slices(df, df[flg] == 1, ycol, pan, 1, chunks)

    pairs_l2 = [
        ("flagERLGHI", "ghi", "GHI"),
        ("flagERLBNI", "bni", "BNI"),
        ("flagERLDHI", "dhi", "DHI"),
        ("flagERLLWD", "lwd", "LWD"),
    ]
    for flg, ycol, pan in pairs_l2:
        if flg in df.columns:
            _qc_marker_slices(df, df[flg] == 1, ycol, pan, 2, chunks)

    if "flag3lowSZA" in df.columns and "flag3highSZA" in df.columns:
        m3 = (df["flag3lowSZA"] == 1) | (df["flag3highSZA"] == 1)
        for ycol, pan in (("ghi", "GHI"), ("bni", "BNI"), ("dhi", "DHI")):
            _qc_marker_slices(df, m3, ycol, pan, 3, chunks)
        _qc_marker_slices(df, m3, "gh_diff", "GHI-SUM", 3, chunks)

    if all(c in df.columns for c in ("flagKKt", "flagKlowSZA", "flagKhighSZA")):
        m4 = (df["flagKKt"] == 1) | (df["flagKlowSZA"] == 1) | (df["flagKhighSZA"] == 1)
        _qc_marker_slices(df, m4, "ghi", "GHI", 4, chunks)
        _qc_marker_slices(df, m4, "dhi", "DHI", 4, chunks)
        z = df["zenith"] if "zenith" in df.columns else None
        if z is not None:
            gr = df["gh_ratio"].where(z < 90)
        else:
            gr = df["gh_ratio"]
        tmp = df.copy()
        tmp["_ghr_plot"] = gr
        _qc_marker_slices(tmp, m4, "_ghr_plot", "GHI/SUM", 4, chunks)

    if all(c in df.columns for c in ("flagKbKt", "flagKb", "flagKt")):
        m5 = (df["flagKbKt"] == 1) | (df["flagKb"] == 1) | (df["flagKt"] == 1)
        _qc_marker_slices(df, m5, "ghi", "GHI", 5, chunks)
        _qc_marker_slices(df, m5, "bni", "BNI", 5, chunks)

    if "flagTracker" in df.columns:
        _qc_marker_slices(df, df["flagTracker"] == 1, "ghi", "GHI", 6, chunks)
        _qc_marker_slices(df, df["flagTracker"] == 1, "bni", "BNI", 6, chunks)

    if not chunks:
        return None
    out = pd.concat(chunks, ignore_index=True)
    if out.empty:
        return None
    cat_lvls = [f"L{i}" for i in range(1, 7)]
    out["qc_level"] = pd.Categorical(out["qc_level"], categories=cat_lvls, ordered=True)
    return out


def _load_month_archive_for_daily(file_path=None, df=None):
    """
    Load one month and add closure diagnostics (GHI vs sum).

    Requires ``zenith`` and ``apparent_zenith`` on the frame (from
    :meth:`~bsrn.dataset.BSRNDataset.solpos`). This module does not compute
    geometry, PPL masks, clear-sky, or QC. For readability, LR missing sentinels
    (``-999``, ``-99.9``, ``-99.99``, etc.) are replaced with NaN on a **copy**;
    :meth:`~bsrn.dataset.BSRNDataset.data` cache is not modified.
    """
    if df is not None:
        plot_df = df.copy()
    else:
        plot_df = BSRNDataset.from_file(file_path).data()

    plot_df = _mask_bsrn_missing_for_plot(plot_df)

    unique_months = np.unique(plot_df.index.to_period("M"))
    if len(unique_months) != 1:
        raise ValueError(
            f"Input file must contain exactly one month of data. "
            f"Found {len(unique_months)} months: {unique_months}"
        )

    if "zenith" not in plot_df.columns or "apparent_zenith" not in plot_df.columns:
        raise ValueError(
            "Daily plots require ``zenith`` and ``apparent_zenith`` on the DataFrame. "
            "Call :meth:`BSRNDataset.solpos` before plotting."
        )
    zenith = plot_df["zenith"]
    apparent_zenith = plot_df["apparent_zenith"]
    mu0 = np.cos(np.radians(apparent_zenith))

    plot_df["sum_irrad"] = plot_df["bni"] * mu0 + plot_df["dhi"]
    plot_df["gh_ratio"] = plot_df["ghi"] / plot_df["sum_irrad"]
    plot_df["gh_diff"] = plot_df["ghi"] - plot_df["sum_irrad"]

    return plot_df.sort_index(), zenith


def _ggplot_bsrn_daily_one_day(day_df, day_zenith, title=None,
                                show_qc_markers=True):
    """
    Build the standard 3×3 facet times ggplot for one UTC day.

    Clear-sky ribbon/lines are drawn only when the corresponding ``*_clear`` columns exist on ``day_df``.
    QC markers are drawn only when ``show_qc_markers`` and matching ``flag*`` columns exist.
    """
    measured_color = WONG_PALETTE[0]
    clearsky_color = WONG_PALETTE[1]
    ribbon_color = clearsky_color
    width_inch = 160 / 25.4
    height_inch = width_inch / 1.8

    day_df = day_df.copy()
    day_work = day_df.reset_index()
    _tcol = day_work.columns[0]
    if _tcol != "time":
        day_work = day_work.rename(columns={_tcol: "time"})

    main_vars = ["ghi", "bni", "dhi"]
    if "lwd" in day_work.columns:
        main_vars.append("lwd")
    clear_vars = [
        c
        for c in ("ghi_clear", "bni_clear", "dhi_clear", "lwd_clear")
        if c in day_work.columns
    ]

    day_main_measured = day_work.melt(
        id_vars=["time"],
        value_vars=main_vars,
        var_name="parameter",
        value_name="measured",
    )
    day_main_measured["parameter"] = day_main_measured["parameter"].str.upper()
    if clear_vars:
        day_main_clear = day_work.melt(
            id_vars=["time"],
            value_vars=clear_vars,
            var_name="parameter",
            value_name="clearsky",
        )
        day_main_clear["parameter"] = (
            day_main_clear["parameter"].str.replace("_clear", "").str.upper()
        )
        day_main = pd.merge(
            day_main_measured, day_main_clear, on=["time", "parameter"], how="left"
        )
    else:
        day_main = day_main_measured.copy()
        day_main["clearsky"] = np.nan

    day_df_diag = day_df.copy()
    day_df_diag.loc[day_zenith >= 90, "gh_ratio"] = np.nan

    diag_vars = [
        c
        for c in ("gh_diff", "gh_ratio", "temp", "rh", "pressure")
        if c in day_df_diag.columns
    ]
    diag_work = day_df_diag.reset_index()
    _t2 = diag_work.columns[0]
    if _t2 != "time":
        diag_work = diag_work.rename(columns={_t2: "time"})
    day_diag = diag_work.melt(
        id_vars=["time"],
        value_vars=diag_vars,
        var_name="parameter",
        value_name="value",
    )
    name_map = {
        "gh_diff": "GHI-SUM",
        "gh_ratio": "GHI/SUM",
        "temp": "TMP",
        "rh": "RH",
        "pressure": "SP",
    }
    day_diag["parameter"] = day_diag["parameter"].map(name_map)

    all_params = ["GHI", "BNI", "DHI", "GHI-SUM", "GHI/SUM", "LWD", "TMP", "RH", "SP"]
    cat_type = pd.CategoricalDtype(categories=all_params, ordered=True)

    day_diag_renamed = day_diag.rename(columns={"value": "measured"})
    day_diag_renamed["clearsky"] = np.nan
    day_all = pd.concat([day_main, day_diag_renamed], ignore_index=True)
    day_all["parameter"] = day_all["parameter"].astype(cat_type)

    marker_df = None
    if show_qc_markers:
        marker_df = _build_qc_marker_frame(day_df, day_zenith)
        if marker_df is not None:
            marker_df = marker_df.copy()
            marker_df["parameter"] = marker_df["parameter"].astype(cat_type)

    day_thresh = pd.DataFrame(
        {
            "time": day_df.index,
            "upper": np.where(day_zenith < 75, 1.08, 1.15),
            "lower": np.where(day_zenith < 75, 0.92, 0.85),
            "parameter": "GHI/SUM",
        },
        index=day_df.index,
    )
    day_thresh["parameter"] = day_thresh["parameter"].astype(cat_type)
    day_thresh.loc[day_zenith >= 90, ["upper", "lower"]] = np.nan

    hline_df = pd.DataFrame({"parameter": ["GHI/SUM"], "y": [1.0]})
    hline_df["parameter"] = hline_df["parameter"].astype(cat_type)

    cs_data = day_all.dropna(subset=["clearsky"])
    cs_sw = cs_data[cs_data["parameter"].isin(["GHI", "BNI", "DHI"])]

    qc_levels = [f"L{i}" for i in range(1, 7)]
    legend_pos = "bottom" if marker_df is not None and not marker_df.empty else "none"
    if legend_pos == "bottom":
        height_inch = height_inch + 0.35

    layers = [ggplot(day_all, aes(x="time"))]
    if not cs_data.empty:
        if not cs_sw.empty:
            layers.append(
                geom_ribbon(
                    data=cs_sw,
                    mapping=aes(ymin=0, ymax="clearsky"),
                    fill=ribbon_color,
                    alpha=0.25,
                )
            )
        layers.append(
            geom_line(
                data=cs_data,
                mapping=aes(y="clearsky"),
                color=clearsky_color,
                size=0.3,
            )
        )
    layers.append(geom_line(aes(y="measured"), color=measured_color, size=0.3))
    if marker_df is not None and not marker_df.empty:
        layers.append(
            geom_point(
                data=marker_df,
                mapping=aes(x="time", y="qc_y", color="qc_level", shape="qc_level"),
                size=_QC_MARKER_SIZE,
                stroke=_QC_MARKER_STROKE,
                na_rm=True,
            )
        )
        layers.append(
            scale_color_manual(
                name="QC level",
                breaks=qc_levels,
                values=[_QC_LEVEL_COLORS[k] for k in qc_levels],
            )
        )
        layers.append(
            scale_shape_manual(
                name="QC level",
                breaks=qc_levels,
                values=[_QC_LEVEL_SHAPES[k] for k in qc_levels],
            )
        )
        layers.append(
            guides(
                color=guide_legend(nrow=1),
                shape=guide_legend(
                    nrow=1,
                    override_aes={
                        "size": _QC_MARKER_LEGEND_SIZE,
                        "stroke": _QC_MARKER_LEGEND_STROKE,
                    },
                ),
            )
        )
    layers.extend(
        [
            geom_line(
                data=day_thresh,
                mapping=aes(y="upper"),
                color="#999999",
                size=0.3,
                linetype="dashed",
            ),
            geom_line(
                data=day_thresh,
                mapping=aes(y="lower"),
                color="#999999",
                size=0.3,
                linetype="dashed",
            ),
            geom_hline(
                data=hline_df, mapping=aes(yintercept="y"), color="#999999", size=0.2
            ),
            facet_wrap("~parameter", nrow=3, ncol=3, scales="free_y", drop=False),
            labs(
                **(
                    {"x": "Time (UTC)", "y": "Value", "title": title}
                    if title is not None
                    else {"x": "Time (UTC)", "y": "Value"}
                )
            ),
            theme_minimal(),
            theme(
                **(
                    {
                        "text": element_text(
                            family="Times New Roman", size=9, face="plain"
                        ),
                        "axis_title": element_text(size=9, face="plain"),
                        "axis_text": element_text(size=9, face="plain"),
                        "plot_title": element_text(
                            size=9, face="plain", margin={"b": 10}
                        ),
                        "legend_position": legend_pos,
                        "legend_title": element_text(size=9, face="plain"),
                        "legend_text": element_text(size=9, face="plain"),
                        "figure_size": (width_inch, height_inch),
                        "panel_grid_minor": element_line(alpha=0),
                        "strip_text": element_text(size=9, face="plain"),
                        "axis_text_x": element_text(
                            rotation=0, size=9, face="plain"
                        ),
                    }
                    | (
                        {
                            # Tight legend under facets, but leave room so it does not cover x title.
                            "legend_margin": 0,
                            "legend_box_spacing": 0,
                            "axis_title_x": element_text(
                                size=9,
                                face="plain",
                                margin={"t": 4, "b": 8},
                            ),
                        }
                        if legend_pos == "bottom"
                        else {}
                    )
                )
            ),
            scale_x_datetime(date_labels="%H:%M"),
        ]
    )
    return sum(layers[1:], layers[0])


[docs] def plot_bsrn_daily_day(file_path, day, show_qc_markers=True, output_file=None, title=None, df=None): """ Plot one UTC day from a single-month BSRN archive (same layout as each booklet page). The frame (from ``df`` or loaded from ``file_path``) must already include ``zenith`` and ``apparent_zenith`` (e.g. after :meth:`BSRNDataset.solpos`). Parameters ---------- file_path : str, optional Path to a single-month archive when ``df`` is not passed; otherwise ignored. day : str, datetime.date, datetime.datetime, or pd.Timestamp Calendar day to plot (UTC), e.g. ``"2024-01-15"`` or ``pd.Timestamp("2024-01-15")``. show_qc_markers : bool, default True If True, overlay QC failure markers where matching ``flag*`` columns exist (no QC is run here; use :meth:`~bsrn.dataset.BSRNDataset.qc_test` first). output_file : str, optional If set, save the figure (e.g. ``".pdf"`` or ``".png"``) via plotnine. title : str, optional Plot title. If None (default), no title is drawn. Returns ------- p : plotnine.ggplot.ggplot The figure; display in notebooks with the last expression or call ``.draw()``. """ plot_df, zenith = _load_month_archive_for_daily(file_path=file_path, df=df) idx_tz = getattr(plot_df.index, "tz", None) day_anchor = pd.Timestamp(day) if idx_tz is not None: if day_anchor.tzinfo is None: day_anchor = day_anchor.tz_localize(idx_tz) else: day_anchor = day_anchor.tz_convert(idx_tz) day_start = day_anchor.floor("D") day_end = day_start + pd.Timedelta(days=1) mask = (plot_df.index >= day_start) & (plot_df.index < day_end) day_df = plot_df.loc[mask] if day_df.empty: src = file_path if file_path is not None else "<dataframe>" raise ValueError( f"No rows for UTC day {day_start.date()} in {src!r} " f"(index range {plot_df.index.min()}{plot_df.index.max()})." ) day_zenith = zenith.loc[day_df.index] p = _ggplot_bsrn_daily_one_day( day_df, day_zenith, title, show_qc_markers=show_qc_markers, ) if output_file is not None: p.save(output_file, dpi=300) return p
[docs] def plot_bsrn_daily_booklet(file_path, output_file, show_qc_markers=True, title=None, df=None): """ Generate a multi-page PDF booklet where each day is one page from a BSRN archive file. The frame must include ``zenith`` and ``apparent_zenith`` (see :meth:`BSRNDataset.solpos`). Parameters ---------- file_path : str Path to the BSRN station-to-archive file (.dat.gz). output_file : str Path to the output PDF file. show_qc_markers : bool, default True Draw QC markers per day when ``flag*`` columns are present. title : str, optional Plot title on every page. If None (default), no title. Returns ------- None The function saves the plots to the specified PDF file. """ plot_df, zenith = _load_month_archive_for_daily(file_path=file_path, df=df) print(f"Generating PDF booklet: {output_file}...") with PdfPages(output_file) as pdf: for _, day_df in plot_df.groupby(plot_df.index.date): day_zenith = zenith.loc[day_df.index] p = _ggplot_bsrn_daily_one_day( day_df, day_zenith, title, show_qc_markers=show_qc_markers, ) fig = p.draw() pdf.savefig(fig, bbox_inches="tight") plt.close(fig) print("Done.")