"""
Visualization of irradiance separation: k vs kt scatter (Erbs, BRL, etc.).
"""
import pandas as pd
import numpy as np
import matplotlib as mpl
from plotnine import (
ggplot, aes, geom_tile,
theme_minimal, theme,
element_text, element_blank, labs,
scale_x_continuous, scale_y_continuous,
scale_fill_cmap, coord_fixed, facet_wrap
)
from bsrn.physics import geometry
from bsrn.utils.calculations import calc_kt
from bsrn.modeling import erbs_separation, brl_separation, engerer2_separation, yang4_separation
# Supported model names for k vs kt plot (Erbs, BRL, Engerer2, Yang4).
SUPPORTED_SEPARATION_MODELS = ("erbs", "brl", "engerer2", "yang4")
# Display names and facet order for k vs kt plot.
MODEL_DISPLAY_NAMES = {"erbs": "Erbs", "brl": "BRL", "engerer2": "Engerer2", "yang4": "Yang4"}
FACET_ORDER = ("Erbs", "BRL", "Engerer2", "Yang4")
[docs]
def plot_k_vs_kt(df, models, lat, lon, ghi_col="ghi", dhi_col="dhi", k_mod_cols=None,
output_file=None, title=None):
"""
Faceted scatter plot of k (diffuse fraction) vs kt (clearness index) from a DataFrame.
One panel per model; measured (gray) and model (colored) in each panel.
Parameters
----------
df : pd.DataFrame
Input data with DatetimeIndex and at least ``ghi_col`` and ``dhi_col``.
models : sequence of str
Model names to plot, e.g. ``('erbs', 'brl')``. Each in SUPPORTED_SEPARATION_MODELS.
lat : float
Latitude. [degrees]
lon : float
Longitude. [degrees]
ghi_col : str, default "ghi"
Column name for GHI.
dhi_col : str, default "dhi"
Column name for DHI.
k_mod_cols : dict or None, optional
Map model name to column name for model k. If None, runs each separation model.
For ``engerer2`` and ``yang4``, k must be pre-computed and provided here (both need clear-sky GHI).
output_file : str, optional
Path to save the figure.
title : str, optional
Overall plot title.
Notes
-----
To include Engerer2 or Yang4 (both require clear-sky GHI): add clear-sky GHI to ``df``
(e.g. :func:`bsrn.physics.clearsky.add_clearsky_columns`), run the separation with
``ghi_clear=df["ghi_clear"].values``, assign the result's ``"k"`` to a column, then pass
``k_mod_cols={"engerer2": "k_engerer2", "yang4": "k_yang4"}``.
Returns
-------
p : plotnine.ggplot
The ggplot object. If ``output_file`` was set, this is a **new** instance
(the one used for saving is not safe to draw again after ``plt.close``).
"""
if not isinstance(df.index, pd.DatetimeIndex):
raise ValueError("df must have a DatetimeIndex.")
for col, name in [(ghi_col, "ghi"), (dhi_col, "dhi")]:
if col not in df.columns:
raise ValueError(f"DataFrame missing column '{col}' ({name}).")
models = tuple(m.strip().lower() for m in models)
if not models:
raise ValueError("models must be a non-empty sequence.")
for m in models:
if m not in SUPPORTED_SEPARATION_MODELS:
raise ValueError(
f"model must be one of {SUPPORTED_SEPARATION_MODELS}, got {m!r}."
)
use = df[[ghi_col, dhi_col]].dropna()
if len(use) == 0:
raise ValueError("No valid ghi/dhi rows after dropna.")
times = use.index
ghi = np.asarray(use[ghi_col], dtype=float)
dhi = np.asarray(use[dhi_col], dtype=float)
k_meas = np.full_like(ghi, np.nan, dtype=float)
pos = ghi > 0
if np.any(pos):
# Only index-wise divide (not np.where / ufunc where=), avoids divide-by-zero warnings.
k_meas[pos] = dhi[pos] / ghi[pos]
k_meas = np.clip(k_meas, 0.0, 1.0)
zenith = np.asarray(
geometry.get_solar_position(times, lat, lon)["zenith"], dtype=float
)
bni_extra = np.asarray(geometry.get_bni_extra(times), dtype=float)
kt = calc_kt(ghi, zenith, bni_extra, min_mu0=0.065, max_clearness_index=1.0)
day = zenith < 87
kt = kt[day]
k_meas = k_meas[day]
times_day = times[day]
ghi_day = ghi[day]
n = len(kt)
k_mod_cols = k_mod_cols or {}
rows = []
for model in models:
if model in k_mod_cols and k_mod_cols[model] in df.columns:
k_mod = np.asarray(df.loc[use.index, k_mod_cols[model]], dtype=float)[day]
else:
if model == "erbs":
result = erbs_separation(times_day, ghi_day, lat, lon)
elif model == "brl":
result = brl_separation(times_day, ghi_day, lat, lon)
elif model == "engerer2":
raise ValueError(
"engerer2 requires pre-computed k. Run engerer2_separation and pass column in k_mod_cols."
)
elif model == "yang4":
raise ValueError(
"yang4 requires pre-computed k. Run yang4_separation and pass column in k_mod_cols."
)
else:
raise ValueError(f"Unknown model {model!r}.")
k_mod = np.asarray(result["k"], dtype=float)
label = MODEL_DISPLAY_NAMES[model]
for i in range(n):
rows.append({"kt": kt[i], "k": k_meas[i], "model": label, "source": "measured"})
rows.append({"kt": kt[i], "k": k_mod[i], "model": label, "source": "model"})
plot_df = pd.DataFrame(rows)
# Fix facet order: Erbs, BRL, Engerer2, Yang4 (only include present models).
order = [x for x in FACET_ORDER if x in plot_df["model"].values]
plot_df["model"] = pd.Categorical(plot_df["model"], categories=order, ordered=True)
# Scattermore-style: rasterize points to a fixed grid for clean, fast plots.
df_meas = plot_df[plot_df["source"] == "measured"].dropna().copy()
df_mod = plot_df[plot_df["source"] == "model"].dropna().copy()
df_mod = df_mod[df_mod["k"] < 1].copy()
# Density per model on grid for raster fill (like R MASS::kde2d).
parts = []
for _, grp in df_mod.groupby("model", observed=True):
grp = grp.copy()
grp["density"] = _get_density(grp["kt"].values, grp["k"].values, n=200)
parts.append(grp)
df_mod = pd.concat(parts)
n_pixels = 512 # Scattermore-style resolution (like geom_scattermore pixels).
facet_order = [x for x in FACET_ORDER if x in plot_df["model"].values]
raster_meas = _points_to_raster(
df_meas, "kt", "k", value=None, n=n_pixels,
x_range=(0, 1), y_range=(0, 1), fill_style="gray", model_col="model",
facet_models=facet_order
)
raster_mod = _points_to_raster(
df_mod, "kt", "k", value="density", n=n_pixels,
x_range=(0, 1), y_range=(0, 1), fill_style="viridis", model_col="model"
)
# Figure width 160 mm (standard journal column width)
n_facets = plot_df["model"].nunique()
width_mm = 160.0
width_inch = width_mm / 25.4
panel_width_inch = width_inch / n_facets
fig_h = panel_width_inch * 0.9 + 0.35 # Extra height for legend bar below.
# Use STIX math font so $k$ / $k_t$ match Times New Roman text (plotnine uses matplotlib).
prev_fontset = mpl.rcParams.get("mathtext.fontset")
try:
mpl.rcParams["mathtext.fontset"] = "stix"
cell_w = 1.0 / n_pixels
cell_h = 1.0 / n_pixels
def _ggplot():
g = (
ggplot()
+ geom_tile(
data=raster_meas,
mapping=aes(x="x", y="y"),
fill="#909090",
width=cell_w, height=cell_h
)
+ geom_tile(
data=raster_mod,
mapping=aes(x="x", y="y", fill="value"),
width=cell_w, height=cell_h
)
+ facet_wrap("model", ncol=n_facets, scales="free")
+ scale_fill_cmap(cmap_name="viridis", name="density")
+ labs(
x=r"$k_t$ (clearness index)",
y=r"$k$ (diffuse fraction)"
)
+ scale_x_continuous(limits=(0, 1), breaks=np.arange(0, 1.1, 0.2))
+ scale_y_continuous(limits=(0, 1), breaks=np.arange(0, 1.1, 0.2))
+ coord_fixed(ratio=1)
+ theme_minimal()
+ theme(
text=element_text(family="Times New Roman", size=9),
plot_title=element_text(size=9),
axis_title=element_text(size=9),
axis_text=element_text(size=9),
legend_position="bottom",
legend_title=element_text(size=9),
legend_text=element_text(size=9),
legend_key_width=100,
legend_key_height=5,
legend_margin=-12,
legend_box_spacing=0,
axis_title_x=element_text(size=9, margin={"t": 1, "b": 2}),
plot_margin_top=0,
plot_margin_right=0,
plot_margin_bottom=0,
plot_margin_left=0,
panel_grid_minor=element_blank(),
figure_size=(width_inch, fig_h)
)
)
if title is not None:
g = g + labs(title=title)
return g
p = _ggplot()
if output_file:
# Rasterize tile layers so PDF stores them as bitmaps (KB not MB).
fig = p.draw()
for ax in fig.axes:
for col in ax.collections:
col.set_rasterized(True)
fig.savefig(output_file, dpi=300, bbox_inches="tight")
mpl.pyplot.close(fig)
# Fresh ggplot: draw()+close leaves plotnine layout state invalid for a second draw
# (e.g. Jupyter _repr_mimebundle_ → save → IndexError in layout.get_scales).
p = _ggplot()
return p
finally:
if prev_fontset is not None:
mpl.rcParams["mathtext.fontset"] = prev_fontset
def _points_to_raster(df, xcol, ycol, value=None, n=512, x_range=(0, 1), y_range=(0, 1),
fill_style="gray", model_col="model", facet_models=None):
"""
Rasterize (x, y) points into a grid and assign fill colors (scattermore-style).
Parameters
----------
df : pd.DataFrame
Data with xcol, ycol, and optionally value and model_col.
xcol, ycol : str
Column names for x and y.
value : str or None
If None, use count per cell; else aggregate this column (mean) per cell.
n : int
Grid resolution (n x n pixels).
x_range, y_range : tuple
(min, max) for x and y.
fill_style : str
'gray' (count → light gray to dark) or 'viridis' (value → viridis).
model_col : str
Column name for facet (model). Preserved in output.
facet_models : sequence or None
If provided and df has no model_col, replicate the raster for each value (e.g. for measured in all facets).
Returns
-------
raster_df : pd.DataFrame
Columns x, y, fill (hex), and model_col if present.
"""
def _single_raster(_df, _xcol, _ycol, _value):
x = np.asarray(_df[_xcol], dtype=float)
y = np.asarray(_df[_ycol], dtype=float)
valid = np.isfinite(x) & np.isfinite(y)
valid &= (x >= x_range[0]) & (x <= x_range[1]) & (y >= y_range[0]) & (y <= y_range[1])
if not np.any(valid):
return None
xf, yf = x[valid], y[valid]
x_edges = np.linspace(x_range[0], x_range[1], n + 1)
y_edges = np.linspace(y_range[0], y_range[1], n + 1)
xi = np.clip(np.digitize(xf, x_edges) - 1, 0, n - 1)
yi = np.clip(np.digitize(yf, y_edges) - 1, 0, n - 1)
if _value is None:
z = np.zeros((n, n))
np.add.at(z, (yi, xi), 1.0)
else:
v = np.asarray(_df.loc[valid, _value], dtype=float)
sums = np.zeros((n, n))
cnt = np.zeros((n, n))
np.add.at(sums, (yi, xi), v)
np.add.at(cnt, (yi, xi), 1.0)
with np.errstate(divide="ignore", invalid="ignore"):
z = np.where(cnt > 0, sums / cnt, 0.0)
x_center = (x_edges[:-1] + x_edges[1:]) / 2
y_center = (y_edges[:-1] + y_edges[1:]) / 2
rows = []
for i in range(n):
for j in range(n):
if z[i, j] <= 0:
continue
rows.append({"x": x_center[j], "y": y_center[i], "z": z[i, j]})
return pd.DataFrame(rows) if rows else None
out_list = []
if model_col in df.columns and df[model_col].nunique() > 1:
for _, grp in df.groupby(model_col, observed=True):
single = _single_raster(grp, xcol, ycol, value)
if single is not None:
single[model_col] = grp[model_col].iloc[0]
out_list.append(single)
else:
single = _single_raster(df, xcol, ycol, value)
if single is not None:
if facet_models is not None:
for m in facet_models:
s = single.copy()
s[model_col] = m
out_list.append(s)
elif model_col in df.columns:
single[model_col] = df[model_col].iloc[0]
out_list.append(single)
else:
out_list.append(single)
if not out_list:
cols = ["x", "y"] + (["value"] if fill_style == "viridis" else [])
cols += [model_col] if (facet_models or (model_col in df.columns)) else []
return pd.DataFrame(columns=cols)
out = pd.concat(out_list, ignore_index=True)
zvals = out["z"].values
out = out.drop(columns=["z"])
if fill_style == "gray":
# Single gray for measured layer (no density scale)
pass
else:
# Viridis: keep numeric value for legend bar
zmin, zmax = np.nanmin(zvals), np.nanmax(zvals)
out["value"] = (
np.ones_like(zvals)
if zmax <= zmin
else np.clip((zvals - zmin) / (zmax - zmin), 0, 1)
)
if model_col in out.columns:
order = [x for x in FACET_ORDER if x in out[model_col].values]
if order:
out[model_col] = pd.Categorical(out[model_col], categories=order, ordered=True)
return out
def _get_density(x, y, n=200):
"""
Grid-based 2-D density estimate, equivalent to R's MASS::kde2d + findInterval lookup.
"""
from scipy.ndimage import gaussian_filter
finite = np.isfinite(x) & np.isfinite(y)
density = np.full(len(x), np.nan)
if finite.sum() < 2:
return density
xf, yf = x[finite], y[finite]
counts, xedges, yedges = np.histogram2d(xf, yf, bins=n)
z = gaussian_filter(counts.T, sigma=n / 25.0)
ix = np.clip(np.digitize(xf, xedges) - 1, 0, n - 1)
iy = np.clip(np.digitize(yf, yedges) - 1, 0, n - 1)
density[finite] = z[iy, ix]
return density