"""
CAMS solar radiation service (CRS) HTTP retrieval helpers.
CRS and McClear are both exposed on SoDa (same ``api.soda-solardata.com`` WPS endpoint);
this module mirrors :mod:`bsrn.io.mcclear` but uses WPS ``Identifier=get_cams_radiation``.
"""
import io
import pandas as pd
import requests
from huggingface_hub import hf_hub_url
from bsrn.constants import (
BSRN_STATIONS,
CRS_API_HOST,
CRS_HF_REPO_ID,
CRS_INTEGRATED_COLUMNS,
HF_MAINTAINER_EMAIL,
CRS_OUTPUT_COLUMNS,
CRS_VARIABLE_MAP,
CRS_HIMAWARI_MIN_START_UTC,
CRS_MSG_MIN_START_UTC,
)
from bsrn.physics.geometry import in_satellite_disk
from bsrn.io.retrieval import get_bsrn_file_inventory, months_from_ftp_filenames
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _crs_min_start_utc(latitude, longitude):
"""
Get the earliest allowed CRS start date [UTC] for a given site.
Parameters
----------
latitude : float
Site latitude [degrees].
longitude : float
Site longitude [degrees].
Returns
-------
min_start : pd.Timestamp or None
The earliest start date, or None if the site is outside all disks.
"""
in_himawari = in_satellite_disk(latitude, longitude, "Himawari")
in_msg = in_satellite_disk(latitude, longitude, "MSG")
if not in_himawari and not in_msg:
return None
# Earliest allowed start: union of applicable satellite minima (favor earliest)
candidates = []
if in_himawari:
candidates.append(pd.Timestamp(CRS_HIMAWARI_MIN_START_UTC))
if in_msg:
candidates.append(pd.Timestamp(CRS_MSG_MIN_START_UTC))
return min(candidates)
def _check_crs_coverage(latitude, longitude, start):
"""
Require the site inside the Himawari or MSG **60° reliability disk** and *start* not
before the applicable minimum.
Parameters
----------
latitude : float
Site latitude [degrees].
longitude : float
Site longitude [degrees].
start : datetime-like
Request period start.
Raises
------
ValueError
If the site is outside both satellite disks or *start* is before the required minimum.
"""
min_start = _crs_min_start_utc(latitude, longitude)
if min_start is None:
raise ValueError(
"Site is outside the Himawari (140.7°E) and MSG (0°E) 60° reliability disks."
)
# Compare *start* as UTC-naive timestamp
start_ts = pd.Timestamp(start)
if start_ts.tzinfo is not None:
start_cmp = start_ts.tz_convert("UTC").tz_localize(None)
else:
start_cmp = start_ts
if start_cmp < min_start:
raise ValueError(
f"CRS request start must be on or after {min_start.date()} for this location."
)
def _parse_crs(raw_or_buffer):
"""
Parse SoDa CAMS CRS CSV into the project irradiance frame (used by ``download_crs`` only).
Parameters
----------
raw_or_buffer : str or file-like
Raw SoDa CAMS text or readable text buffer.
Returns
-------
data : pd.DataFrame
UTC index and columns ``ghi_crs``, ``bni_crs``, ``dhi_crs`` [W/m²] only.
Raises
------
ValueError
Missing header line, missing columns after rename, or unreadable CSV.
References
----------
.. [1] CAMS radiation service — SoDa.
https://www.soda-pro.com/web-services/radiation/cams-radiation-service
"""
if isinstance(raw_or_buffer, str):
fbuf = io.StringIO(raw_or_buffer)
else:
fbuf = raw_or_buffer
# Skip preamble until column-name row
while True:
line = fbuf.readline()
if not line:
raise ValueError("Invalid CRS payload: header not found.")
line = line.rstrip("\n")
if line.startswith("# Observation period"):
names = line.lstrip("# ").split(";")
break
data = pd.read_csv(fbuf, sep=";", comment="#", header=None, names=names)
# Interval bounds from first column
obs_period = data["Observation period"].str.split("/")
# Using the first part of the period (start-time) for floor-style labeling.
data.index = pd.to_datetime(obs_period.str[0], utc=True)
# SoDa integrated irradiance → mean irradiance over the step [W/m²]
integrated_cols = [c for c in CRS_INTEGRATED_COLUMNS if c in data.columns]
time_delta = pd.to_datetime(obs_period.str[1]) - pd.to_datetime(obs_period.str[0])
hours = time_delta.dt.total_seconds() / 3600.0
data[integrated_cols] = data[integrated_cols].divide(hours.tolist(), axis="rows")
data.index.name = None # Remove index name
data = data.rename(columns=CRS_VARIABLE_MAP)
missing = [c for c in CRS_OUTPUT_COLUMNS if c not in data.columns]
if missing:
raise ValueError(
"CRS payload missing required columns after rename: "
f"{missing}."
)
return data[CRS_OUTPUT_COLUMNS].copy()
def _hf_fetch_to_memory(repo_id, filename):
"""
Fetch a file from Hugging Face Hub directly to memory (bytes).
Parameters
----------
repo_id : str
Hugging Face repository ID (e.g., "dazhiyang/bsrn-v1").
filename : str
Path within the repository (e.g., "qiq/qiq0624_crs.parquet").
Returns
-------
content : bytes
Raw file bytes.
Raises
------
FileNotFoundError
If the file is missing on the Hub or the HTTP request fails.
"""
print(f"Fetching CRS from Hugging Face: {filename}")
try:
url = hf_hub_url(
repo_id=repo_id, filename=filename, repo_type="dataset"
)
resp = requests.get(url, timeout=60)
resp.raise_for_status()
return resp.content
except requests.HTTPError as e:
if e.response is not None and e.response.status_code == 404:
raise FileNotFoundError(
f"{filename} is not yet on huggingface, please contact the maintainer "
f"Dazhi Yang at {HF_MAINTAINER_EMAIL} for update."
) from e
raise FileNotFoundError(
f"{filename} is not yet on huggingface, please contact the maintainer "
f"Dazhi Yang at {HF_MAINTAINER_EMAIL} for update."
) from e
except Exception as e:
raise FileNotFoundError(
f"{filename} is not yet on huggingface, please contact the maintainer "
f"Dazhi Yang at {HF_MAINTAINER_EMAIL} for update."
) from e
def _fetch_crs_from_hf(station_code, index):
"""
Fetch raw bytes from Hugging Face for the months required by the index.
Parameters
----------
station_code : str
BSRN station code (case-insensitive).
index : pd.DatetimeIndex
Non-empty target index; months present determine which files are fetched.
Returns
-------
contents : list of bytes
One element per month, in sorted order.
Raises
------
ValueError
If index is empty.
"""
if index.empty:
raise ValueError("index must not be empty.")
stn = station_code.lower()
# With floor-labeling (start-of-interval), the month in the index directly
# determines which monthly file we need to fetch.
unique_months = sorted(set(zip(index.year, index.month)))
contents = [] # Accumulate raw bytes
for year, month in unique_months:
yy = str(year)[2:]
mm = f"{month:02d}"
# Monthly filename format: qiq0124_crs.parquet
filename = f"{stn}{mm}{yy}_crs.parquet"
hf_filename = f"{stn}/{filename}"
content = _hf_fetch_to_memory(CRS_HF_REPO_ID, hf_filename)
contents.append(content)
return contents
def _load_crs_parquet(path_or_bytes):
"""
Load one CRS parquet into a UTC-indexed DataFrame.
Parameters
----------
path_or_bytes : str, path-like, bytes, or file-like
Path to the parquet file, or bytes content, or file-like object.
Returns
-------
data : pd.DataFrame
CRS data with columns ghi_crs, bni_crs, dhi_crs and UTC DatetimeIndex.
"""
if isinstance(path_or_bytes, bytes):
path_or_bytes = io.BytesIO(path_or_bytes)
data = pd.read_parquet(path_or_bytes)
if not isinstance(data.index, pd.DatetimeIndex):
raise ValueError("CRS parquet must have DatetimeIndex.")
if data.index.tz is None:
data.index = data.index.tz_localize("UTC") # Localize to UTC
else:
data.index = data.index.tz_convert("UTC") # Convert to UTC
return data
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def check_crs_availability(stations, username, password):
"""
Check which BSRN stations are geographically covered by CAMS Radiation Service (CRS)
**and** have BSRN archive files overlapping the CRS temporal range.
Workflow:
1. Filter *stations* by spatial coverage (MSG and Himawari disks).
2. Query BSRN FTP for the covered subset to obtain file inventories.
3. Extract years from filenames and intersect with the CRS year range.
Parameters
----------
stations : list of str
BSRN station codes to check (e.g. ``['BIL', 'BON', 'DRA']``).
username : str
BSRN FTP username.
password : str
BSRN FTP password.
Returns
-------
availability : dict
A dictionary mapping station codes to availability metadata:
``{station_code: {'years': [list of years], 'months': [list of (y,m) tuples]}}``.
``years`` is used for bulk API downloads, and ``months`` for monthly
parquet writing. Stations with no overlap are omitted.
"""
# Mission start years for MSG and Himawari
y_min_msg = 2004
y_min_hima = 2016
y_max = pd.Timestamp.now(tz="UTC").year
# Step 1: geographic filter
covered = {} # maps station to its min_year
for code in stations:
code_upper = code.upper()
if code_upper not in BSRN_STATIONS:
continue
meta = BSRN_STATIONS[code_upper]
lat, lon = meta["lat"], meta["lon"]
# Use library logic to determine coverage and minimum start date
in_msg = in_satellite_disk(lat, lon, "MSG")
in_hima = in_satellite_disk(lat, lon, "Himawari")
if in_msg or in_hima:
# Union of applicable satellite minima
min_y = y_min_msg
if in_hima and not in_msg:
min_y = y_min_hima
elif in_hima and in_msg:
min_y = min(y_min_msg, y_min_hima)
covered[code_upper] = min_y
if not covered:
return {}
# Step 2: FTP inventory for covered stations
inventory = get_bsrn_file_inventory(list(covered.keys()), username, password)
# Step 3: extract years and intersect with CRS range
availability = {}
for stn, files in inventory.items():
stn_upper = stn.upper()
min_y = covered[stn_upper]
# Standardize month extraction
all_months = months_from_ftp_filenames(files)
ym_filtered = [(y, m) for y, m in all_months if min_y <= y <= y_max]
if ym_filtered:
unique_years = sorted(list(set(y for y, m in ym_filtered)))
# Store metadata for station
availability[stn_upper] = {
"years": unique_years,
"months": sorted(list(set(ym_filtered)))
}
return availability
[docs]
def download_crs(latitude, longitude, start, end, email, elev=None,
summarization="PT01H", timeout=30):
"""
Download and parse CAMS Radiation Service (CRS) time series from SoDa.
CRS provides **all-sky** satellite-derived irradiances (not a clear-sky model like McClear).
Requests use ``time_ref=UT`` and ``verbose=false`` (fixed; not configurable).
Parsed frame contains only UTC index and all-sky ``ghi_crs``, ``bni_crs``, ``dhi_crs`` [W/m²]
(other SoDa fields are dropped). Location and *start* are validated by ``_check_crs_coverage``.
Parameters
----------
latitude : float
Latitude in decimal degrees. [degrees]
longitude : float
Longitude in decimal degrees. [degrees]
start : datetime.datetime or pandas.Timestamp
Start date (inclusive) of requested period.
end : datetime.datetime or pandas.Timestamp
End date (inclusive) of requested period.
email : str
SoDa account email.
elev : float, optional
Station elevation [m]. If None, use SoDa default terrain lookup (-999).
summarization : str, default "PT01H"
ISO-8601 duration for temporal aggregation (e.g., "PT01M", "PT15M", "PT01H").
timeout : int, default 30
HTTP request timeout in seconds.
Returns
-------
data : pd.DataFrame
Columns ghi_crs, bni_crs, dhi_crs only; UTC DatetimeIndex.
Raises
------
requests.HTTPError
SoDa returned a non-success HTTP status (often with ows:ExceptionText in the body).
ValueError
Coverage or start failed _check_crs_coverage, XML instead of CSV, parse error, or empty data.
References
----------
.. [1] Schroedter-Homscheidt, M., et al. (2016). User's Guide to the CAMS Radiation Service.
European Commission.
"""
if elev is None:
elev = -999 # Default to terrain lookup
_check_crs_coverage(latitude, longitude, start)
# WPS date strings in UTC
start_ts = pd.Timestamp(start)
end_ts = pd.Timestamp(end)
if start_ts.tzinfo is not None:
start_str = start_ts.tz_convert("UTC").strftime("%Y-%m-%d")
else:
start_str = start_ts.strftime("%Y-%m-%d")
if end_ts.tzinfo is not None:
end_str = end_ts.tz_convert("UTC").strftime("%Y-%m-%d")
else:
end_str = end_ts.strftime("%Y-%m-%d")
# Double-encode @ for query string
email_encoded = email.replace("@", "%2540")
# SoDa Execute payload (DataInputs)
data_inputs_dict = {
"latitude": latitude,
"longitude": longitude,
"altitude": elev,
"date_begin": start_str,
"date_end": end_str,
"time_ref": "UT",
"summarization": summarization,
"username": email_encoded,
"verbose": "false",
}
data_inputs = ";".join([f"{key}={value}" for key, value in data_inputs_dict.items()])
params = {
"Service": "WPS",
"Request": "Execute",
"Identifier": "get_cams_radiation",
"version": "1.0.0",
"RawDataOutput": "irradiation",
}
base_url = f"https://{CRS_API_HOST}/service/wps"
try:
res = requests.get(
base_url + "?DataInputs=" + data_inputs,
params=params,
timeout=timeout,
)
except requests.Timeout as exc:
raise requests.Timeout(
f"CRS request timed out for {base_url}: {exc}"
) from exc
# Enrich HTTPError with OWS exception text when present
if not res.ok:
text = res.text or ""
if "ows:ExceptionText" in text:
try:
errors = text.split("ows:ExceptionText")[1][1:-2]
except Exception:
errors = text
res.reason = f"{res.reason}: <{errors}>"
res.raise_for_status()
body_text = res.content.decode("utf-8")
stripped = body_text.lstrip()
# 200 OK can still be XML on some errors
if stripped.startswith("<?xml") or stripped.startswith("<ows:ExceptionReport"):
raise ValueError(
"SoDa CRS returned XML instead of CSV."
)
data = _parse_crs(body_text)
if len(data.index) == 0:
raise ValueError(
"SoDa CRS returned no data rows."
)
return data
[docs]
def fetch_crs_hf(index, station_code):
"""
Fetch CRS from Hugging Face and return inputs aligned to target index.
Parameters
----------
index : pd.DatetimeIndex
Target time index to align CRS outputs to.
station_code : str
BSRN station code (e.g., "QIQ").
Returns
-------
aligned : pd.DataFrame
CRS inputs reindexed to `index` with columns ghi_crs, bni_crs, dhi_crs.
Raises
------
ValueError
If ``index`` is not a :class:`~pandas.DatetimeIndex` or is empty.
FileNotFoundError
From Hugging Face fetch helpers when a monthly parquet is unavailable.
"""
if not isinstance(index, pd.DatetimeIndex):
raise ValueError(
"index must be a pandas DatetimeIndex."
)
if index.empty:
raise ValueError("index must not be empty.")
contents = _fetch_crs_from_hf(station_code, index)
dfs = [_load_crs_parquet(c) for c in contents]
raw = pd.concat(dfs).sort_index()
raw = raw[~raw.index.duplicated(keep="first")]
# By default, use direct reindexing.
# To align with ceiling average, we often need +1 hour shift here
# but I'm leaving it as-is for now based on user's 'revert'.
aligned = raw.reindex(index)
return aligned
[docs]
def add_crs_columns(df, station_code=None, lat=None, lon=None, elev=None):
"""
Adds CRS (CAMS Radiation Service) all-sky columns to a DataFrame.
Fetches data from Hugging Face automatically.
Location can be given by BSRN station code or by explicit lat/lon/elev.
Parameters
----------
df : pd.DataFrame
DataFrame to which columns will be added. Index must be DatetimeIndex.
station_code : str, optional
BSRN station abbreviation. [e.g. 'QIQ'] Used if lat/lon/elev not provided.
lat : float, optional
Latitude. [degrees] Required for non-BSRN stations if station_code omitted.
lon : float, optional
Longitude. [degrees] Required for non-BSRN stations if station_code omitted.
elev : float, optional
Elevation. [m] Required for non-BSRN stations if station_code omitted.
Returns
-------
df : pd.DataFrame
The input DataFrame with added crs columns.
Raises
------
ValueError
If ``df.index`` is not a :class:`~pandas.DatetimeIndex`.
ValueError
If neither a valid station_code nor complete (lat, lon, elev) is provided.
"""
if not isinstance(df.index, pd.DatetimeIndex):
raise ValueError("DataFrame index must be a pandas DatetimeIndex.")
# Resolve metadata: explicit lat/lon/elev or BSRN lookup
if lat is not None and lon is not None and elev is not None:
pass # use provided coordinates
elif station_code is not None and station_code in BSRN_STATIONS:
meta = BSRN_STATIONS[station_code]
lat, lon, elev = meta["lat"], meta["lon"], meta["elev"]
elif station_code is not None:
raise ValueError(
f"Station '{station_code}' not found in BSRN registry. "
"For non-BSRN stations, provide 'lat', 'lon', and 'elev' explicitly."
)
else:
raise ValueError(
"Insufficient metadata. Provide a valid BSRN 'station_code' or "
"explicit 'lat', 'lon', and 'elev'."
)
if station_code is None:
raise ValueError("fetch_crs_hf currently requires 'station_code' to fetch parquets from Hugging Face.")
crs_data = fetch_crs_hf(df.index, station_code)
for col in crs_data.columns:
df[col] = crs_data[col]
return df