"""
NSRDB solar radiation database retrieval helpers.
Supports three variants (conus, full-disc, aggregated) via NLR API and Hugging Face.
"""
import io
import pandas as pd
import requests
from huggingface_hub import hf_hub_url
from bsrn.constants import (
BSRN_STATIONS,
NSRDB_API_BASE_URL,
HF_MAINTAINER_EMAIL,
NSRDB_OUTPUT_COLUMNS,
NSRDB_VARIABLE_MAP,
NSRDB_VARIANTS,
)
from bsrn.physics.geometry import in_satellite_disk
from bsrn.io.retrieval import get_bsrn_file_inventory, months_from_ftp_filenames
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _in_nsrdb_coverage(lat, lon, variant):
    """
    Decide whether a point lies inside an NSRDB variant's spatial footprint.

    The point must pass the variant's bounding box (when one is configured)
    and be inside the disk of at least one of the variant's satellites.

    Parameters
    ----------
    lat : float
        Latitude in decimal degrees. [degrees]
    lon : float
        Longitude in decimal degrees. [degrees]
    variant : str
        NSRDB variant name: 'conus', 'full-disc', or 'aggregated'.

    Returns
    -------
    bool
        True if location is within the variant's spatial footprint.
    """
    spec = NSRDB_VARIANTS[variant]

    # The bounding box is optional; a variant without one skips this filter.
    box = spec.get("bbox")
    if box is not None:
        (lat_min, lat_max), (lon_min, lon_max) = box["lat"], box["lon"]
        inside_box = (lat_min <= lat <= lat_max) and (lon_min <= lon <= lon_max)
        if not inside_box:
            return False

    # Visible from any one of the variant's satellites is sufficient.
    for sat_key in spec["satellites"]:
        if in_satellite_disk(lat, lon, sat_key):
            return True
    return False
def _parse_nsrdb(raw_text):
    """
    Turn an NSRDB API CSV payload into a UTC-indexed irradiance frame.

    Parameters
    ----------
    raw_text : str
        CSV response body from the NLR API.

    Returns
    -------
    data : pd.DataFrame
        UTC :class:`~pandas.DatetimeIndex` (named ``dt``) and the
        project-standard columns that are present in the payload.
    """
    # skiprows=2 discards the first two lines of the payload, so the third
    # line supplies the column names and the data rows follow it.
    # NOTE(review): presumably the two skipped lines are the site-metadata
    # header and its values -- confirm against the API's CSV layout.
    table = pd.read_csv(io.StringIO(raw_text), skiprows=2)

    # Assemble timestamps from the separate date/time component columns.
    stamp_parts = table[["Year", "Month", "Day", "Hour", "Minute"]]
    stamps = pd.to_datetime(stamp_parts, utc=True)
    table = table.assign(dt=stamps).set_index("dt")

    # Map API column names to project names, then keep only the known outputs
    # (in NSRDB_OUTPUT_COLUMNS order), ignoring any that the payload lacks.
    table = table.rename(columns=NSRDB_VARIABLE_MAP)
    present = table.columns
    keep = [name for name in NSRDB_OUTPUT_COLUMNS if name in present]
    return table[keep].copy()
def _hf_fetch_to_memory(repo_id, filename):
    """
    Download one dataset file from the Hugging Face Hub into memory.

    Parameters
    ----------
    repo_id : str
        Hugging Face repository ID.
    filename : str
        Path within the repository.

    Returns
    -------
    content : bytes
        Raw file bytes.

    Raises
    ------
    FileNotFoundError
        If the dataset file is missing (HTTP 404) on the Hub.
    requests.HTTPError
        On other non-success HTTP statuses.
    """
    print(f"Fetching NSRDB from Hugging Face: {filename}")
    try:
        url = hf_hub_url(repo_id=repo_id, filename=filename, repo_type="dataset")
        response = requests.get(url, timeout=60)
        response.raise_for_status()
    except requests.HTTPError as err:
        is_missing = err.response is not None and err.response.status_code == 404
        if not is_missing:
            # Anything other than "not found" is passed through unchanged.
            raise
        # A 404 becomes FileNotFoundError so callers can tell "month not
        # published" apart from other HTTP failures.
        raise FileNotFoundError(
            f"{filename} not on HF Hub. Contact {HF_MAINTAINER_EMAIL} for updates."
        ) from err
    return response.content
def _fetch_nsrdb_from_hf(station_code, index, variant="conus"):
    """
    Download the monthly NSRDB parquet blobs covering the months of *index*.

    Parameters
    ----------
    station_code : str
        BSRN station code (case-insensitive; lower-cased for file names).
    index : pd.DatetimeIndex
        Non-empty target index; months are taken from the index shifted
        back by one second.
    variant : str, default ``"conus"``
        NSRDB variant key in :data:`~bsrn.constants.NSRDB_VARIANTS`.

    Returns
    -------
    contents : list of bytes
        Raw parquet bytes per successfully fetched month, in chronological
        order (months that fail to download are skipped).

    Raises
    ------
    ValueError
        If *index* is empty.
    """
    if index.empty:
        raise ValueError("index must not be empty.")

    station = station_code.lower()
    repo = NSRDB_VARIANTS[variant]["hf_repo_id"]

    # Shifting by -1 s assigns a timestamp that sits exactly on a month
    # boundary (e.g. the 1st at 00:00:00) to the preceding month.
    month_labels = index.shift(-1, freq="s")
    wanted = sorted({(y, m) for y, m in zip(month_labels.year, month_labels.month)})

    blobs = []
    for year, month in wanted:
        # File name: {stn}{MM}{YY}_nsrdb_{variant}.parquet, stored under a
        # per-station folder in the repository.
        basename = f"{station}{month:02d}{str(year)[2:]}_nsrdb_{variant}.parquet"
        try:
            blob = _hf_fetch_to_memory(repo, f"{station}/{basename}")
        except (FileNotFoundError, requests.HTTPError):
            # Best effort: a month that cannot be fetched contributes no blob.
            continue
        blobs.append(blob)
    return blobs
def _load_nsrdb_parquet(path_or_bytes, target_index=None):
    """
    Read one NSRDB parquet and optionally resample it onto *target_index*.

    Parameters
    ----------
    path_or_bytes : str, path-like, bytes, or file-like
        Parquet path, bytes, or readable buffer.
    target_index : pd.DatetimeIndex or None
        If given, the data are time-interpolated onto this index and only
        its timestamps are returned.

    Returns
    -------
    data : pd.DataFrame
        UTC-indexed NSRDB columns (possibly interpolated).
    """
    # Raw bytes are wrapped in a buffer; anything else is handed to pandas as is.
    if isinstance(path_or_bytes, bytes):
        source = io.BytesIO(path_or_bytes)
    else:
        source = path_or_bytes
    frame = pd.read_parquet(source)

    # Normalise the index to UTC: naive stamps are labelled as UTC, aware
    # stamps are converted.
    stamps = frame.index
    frame.index = (
        stamps.tz_localize("UTC") if stamps.tz is None else stamps.tz_convert("UTC")
    )

    if target_index is None:
        return frame

    # Interpolate on the union of native and target stamps so the native
    # samples act as anchors, then keep only the requested stamps.
    combined_axis = target_index.union(frame.index)
    dense = frame.reindex(combined_axis).sort_index()
    return dense.interpolate(method="time").reindex(target_index)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def check_nsrdb_availability(stations, username, password, variant="conus"):
    """
    Find the BSRN stations that an NSRDB variant covers in both space and time.

    A station is reported when it lies inside the variant's footprint **and**
    its BSRN archive has files for months within the variant's year range.

    Workflow:

    1. Filter *stations* by spatial coverage (bbox + satellite disk).
    2. Query BSRN FTP for the covered subset to obtain file inventories.
    3. Extract (year, month) pairs from the filenames and keep those whose
       year falls in the variant's year range.

    Parameters
    ----------
    stations : list of str
        BSRN station codes to check (e.g. ``['BIL', 'BON', 'DRA']``).
        Codes are upper-cased; codes absent from the registry are ignored.
    username : str
        BSRN FTP username.
    password : str
        BSRN FTP password.
    variant : str, default "conus"
        NSRDB variant name: 'conus', 'full-disc', or 'aggregated'.

    Returns
    -------
    availability : dict
        A dictionary mapping station codes to availability metadata:
        ``{station_code: {'years': [list of years], 'months': [list of (y,m) tuples]}}``.
        ``years`` is used for bulk API downloads, and ``months`` for monthly
        parquet writing. Stations with no overlap are omitted.

    Raises
    ------
    ValueError
        If *variant* is not a recognised NSRDB variant name.
    """
    if variant not in NSRDB_VARIANTS:
        raise ValueError(
            f"Unknown NSRDB variant: {variant}."
        )
    first_year, last_year = NSRDB_VARIANTS[variant]["years"]

    # Step 1: keep registered stations that fall inside the footprint.
    covered = []
    for code in (raw.upper() for raw in stations):
        if code in BSRN_STATIONS:
            site = BSRN_STATIONS[code]
            if _in_nsrdb_coverage(site["lat"], site["lon"], variant):
                covered.append(code)
    if not covered:
        # Nothing to look up, so the FTP query is skipped entirely.
        return {}

    # Step 2: one FTP inventory query for all covered stations.
    inventory = get_bsrn_file_inventory(covered, username, password)

    # Step 3: reduce each station's file list to the in-range months.
    # BSRN filenames look like pay0123.dat.gz or qiq0224.004:
    # station code (3 chars), month (2 digits), year (2 digits).
    availability = {}
    for stn, files in inventory.items():
        in_range = {
            (y, m)
            for y, m in months_from_ftp_filenames(files)
            if first_year <= y <= last_year
        }
        if not in_range:
            continue
        availability[stn.upper()] = {
            "years": sorted({y for y, _ in in_range}),
            "months": sorted(in_range),
        }
    return availability
[docs]
def download_nsrdb(latitude, longitude, year, api_key, email,
                   variant="conus", timeout=120):
    """
    Download one year of NSRDB irradiance for a point from the NLR API.

    Parameters
    ----------
    latitude : float
        Site latitude [degrees].
    longitude : float
        Site longitude [degrees].
    year : int
        Calendar year to download.
    api_key : str
        NLR developer API key.
    email : str
        User email registered with the API.
    variant : str, default ``"conus"``
        NSRDB variant name (see :data:`~bsrn.constants.NSRDB_VARIANTS`).
    timeout : int, default 120
        HTTP request timeout [s].

    Returns
    -------
    df : pd.DataFrame
        NSRDB irradiance columns for the requested *year*.

    Raises
    ------
    ValueError
        If *year* is outside the variant's year range.
    ValueError
        If the location is not within the variant's spatial coverage.
    ValueError
        If the API answers with a short body containing the word "error".
    requests.HTTPError
        If the API responds with a non-success HTTP status.

    References
    ----------
    .. [1] Sengupta, M., Xie, Y., Lopez, A., Habte, A., Maclaurin, G., & Shelby, J. (2018). The
       national solar radiation data base (NSRDB). Renewable and Sustainable Energy
       Reviews, 89, 51-60.
    .. [2] Xie, Y., Yang, J., Sengupta, M., Liu, Y., & Zhou, X. (2022). Improving the
       prediction of DNI with physics-based representation of all-sky circumsolar
       radiation. Solar Energy, 231, 758-766.
    .. [3] Xie, Y., Sengupta, M., Yang, J., Buster, G., Benton, B., Habte, A., & Liu, Y. (2023).
       Integration of a physics-based direct normal irradiance (DNI) model to enhance
       the National Solar Radiation Database (NSRDB). Solar energy, 266, 112195.
    .. [4] Xie, Y., Sengupta, M., & Dudhia, J. (2016). A Fast All-sky Radiation Model for
       Solar applications (FARMS): Algorithm and performance evaluation. Solar Energy,
       135, 435-445.
    """
    spec = NSRDB_VARIANTS[variant]

    # Validate the request locally before spending an API call on it.
    first_year, last_year = spec["years"]
    if year is not None:
        if not (first_year <= year <= last_year):
            raise ValueError(
                f"Year {year} outside range {y_min_max(first_year, last_year)} for variant '{variant}'."
            ) if False else ValueError(
                f"Year {year} outside range {first_year}–{last_year} for variant '{variant}'."
            )
    if not _in_nsrdb_coverage(latitude, longitude, variant):
        raise ValueError(f"Location not covered by variant '{variant}'.")

    endpoint_url = f"{NSRDB_API_BASE_URL}{spec['endpoint']}"
    query = {
        "api_key": api_key,
        # WKT points are written longitude first.
        "wkt": f"POINT({longitude} {latitude})",
        "attributes": "ghi,dni,dhi",
        "names": str(year),
        "utc": "true",
        "leap_day": "true",
        "interval": str(spec["interval"]),
        "email": email,
        "affiliation": "BSRN Research",
        "reason": "academic research",
    }
    response = requests.get(endpoint_url, params=query, timeout=timeout)
    response.raise_for_status()

    # NREL-style errors sometimes arrive as 200 OK with the message in the
    # body; a short body mentioning "error" is treated as such a failure.
    body = response.text
    looks_like_error = len(body) < 500 and "error" in body.lower()
    if looks_like_error:
        raise ValueError(f"NSRDB API Error: {body}")
    return _parse_nsrdb(body)
[docs]
def fetch_nsrdb_hf(index, station_code, variant="conus"):
    """
    Fetch NSRDB from Hugging Face and time-interpolate it onto *index*.

    The monthly parquet files are merged at their native resolution first and
    interpolated onto *index* once.  Interpolating each month separately onto
    the full index is wrong for multi-month requests: pandas fills the NaNs
    after a month's last sample with that sample's value, so an earlier
    month's frame would carry flat values across every later month and then
    win the duplicate-timestamp resolution over the real data.

    Parameters
    ----------
    index : pd.DatetimeIndex
        Target grid (typically 1-minute BSRN timestamps).
    station_code : str
        BSRN station code for parquet naming on the Hub.
    variant : str, default ``"conus"``
        NSRDB variant (folder and filename suffix).

    Returns
    -------
    aligned : pd.DataFrame
        NSRDB columns reindexed to *index*.  When no monthly file could be
        fetched, an all-NaN float frame with the columns from
        :data:`~bsrn.constants.NSRDB_OUTPUT_COLUMNS` is returned.

    Raises
    ------
    ValueError
        If *index* is empty (raised by the monthly fetch helper).
    """
    contents = _fetch_nsrdb_from_hf(station_code, index, variant)
    if not contents:
        # All-NaN placeholder; float dtype keeps the columns numeric, matching
        # what a successful fetch returns (the default would be object dtype).
        return pd.DataFrame(index=index, columns=NSRDB_OUTPUT_COLUMNS, dtype=float)

    # Load every month at native resolution (no per-month interpolation).
    monthly = [_load_nsrdb_parquet(blob) for blob in contents]
    native = pd.concat(monthly)

    # Months may share boundary timestamps; keep the earliest file's row.
    # De-duplicating before sorting makes "first" follow the fetch order.
    native = native[~native.index.duplicated(keep="first")].sort_index()

    # Single interpolation pass over the merged series, anchored on the
    # native samples, then cut down to the requested timestamps.
    # NOTE: a month that could not be fetched is bridged by interpolation
    # between its neighbours (or held flat after the last sample), exactly
    # as gaps inside a single file are.
    dense = native.reindex(index.union(native.index)).sort_index()
    return dense.interpolate(method="time").reindex(index)
[docs]
def add_nsrdb_columns(df, station_code=None, lat=None,
                      lon=None, elev=None, variant="conus"):
    """
    Add NSRDB all-sky columns to a DataFrame, fetching from Hugging Face.

    Location can be given by BSRN station code or by explicit lat/lon/elev,
    but the Hugging Face files are keyed by station, so *station_code* is
    always needed for the fetch itself.  *df* is modified in place and also
    returned.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to which columns will be added. Index must be DatetimeIndex.
    station_code : str, optional
        BSRN station abbreviation (e.g. 'BIL'), matched case-insensitively
        against the BSRN registry when lat/lon/elev are not all provided.
    lat : float, optional
        Latitude. [degrees] Required for non-BSRN stations.
    lon : float, optional
        Longitude. [degrees] Required for non-BSRN stations.
    elev : float, optional
        Elevation. [m] Required for non-BSRN stations.
    variant : str, default "conus"
        NSRDB variant name: 'conus', 'full-disc', or 'aggregated'.

    Returns
    -------
    df : pd.DataFrame
        The input DataFrame with added NSRDB columns.

    Raises
    ------
    ValueError
        If ``df.index`` is not a :class:`~pandas.DatetimeIndex`.
    ValueError
        If neither a valid station_code nor complete (lat, lon, elev) is provided.
    ValueError
        If *station_code* is omitted (it is required to locate the files).
    """
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("DataFrame index must be a pandas DatetimeIndex.")

    # Explicit coordinates bypass the registry check; otherwise the station
    # must be known to the BSRN registry.
    has_coords = lat is not None and lon is not None and elev is not None
    if not has_coords:
        if station_code is None:
            raise ValueError(
                "Insufficient metadata. Provide a valid BSRN 'station_code' or "
                "explicit 'lat', 'lon', and 'elev'."
            )
        # Registry keys are upper-case, while the download step lower-cases
        # the code itself, so accept any casing here instead of rejecting
        # e.g. 'bil'.
        known = (
            station_code in BSRN_STATIONS
            or str(station_code).upper() in BSRN_STATIONS
        )
        if not known:
            raise ValueError(
                f"Station '{station_code}' not found in BSRN registry. "
                "For non-BSRN stations, provide 'lat', 'lon', and 'elev' explicitly."
            )

    if station_code is None:
        raise ValueError("fetch_nsrdb_hf currently requires 'station_code' to fetch parquets from Hugging Face.")

    nsrdb_data = fetch_nsrdb_hf(df.index, station_code, variant=variant)
    # Column-wise assignment aligns on the index and mutates *df* in place.
    for col in nsrdb_data.columns:
        df[col] = nsrdb_data[col]
    return df