"""
bsrn data retrieval module.
Handles FTP connections and automated downloads.
"""
import os
import re
import time
from ftplib import FTP
from bsrn.constants import BSRN_FTP_HOST
BSRN_FILENAME_PATTERN = re.compile(
r"([a-zA-Z0-9]{3})(\d{2})(\d{2})(?:[._]([a-z0-9_-]+))?(?:\.dat\.gz|\.\d{3}|\.parquet)",
re.IGNORECASE,
)
def _safe_ftp_quit(ftp):
"""Call ``quit`` on *ftp* if set; ignore errors (cleanup path)."""
if ftp is None:
return
try:
ftp.quit()
except Exception:
pass
def _ftp_relogin(ftp, host, username, password):
"""Reconnect an existing client after a transient failure."""
try:
ftp.connect(host)
ftp.login(user=username, passwd=password)
ftp.set_pasv(True)
except Exception:
pass
def _connect_ftp(host, username, password):
"""Return a logged-in passive-mode FTP client."""
ftp = FTP(host)
ftp.set_pasv(True)
ftp.login(user=username, passwd=password)
return ftp
def _monthly_dat_gz_filename(station, year, month):
"""Build ``stnMMYY.dat.gz`` (lowercase station, 2-digit month and year)."""
year_str = str(year)[-2:]
month_int = int(month)
return f"{station.lower()}{month_int:02d}{year_str}.dat.gz"
def _filter_station_archive_files(files):
"""Keep only station-to-archive style names from an FTP ``nlst`` result."""
return [
f for f in files
if f.lower().endswith(".dat.gz")
or (len(f) > 4 and f[-4:].startswith(".") and f[-3:].isdigit())
]
[docs]
def get_bsrn_file_inventory(stations, username, password, host=BSRN_FTP_HOST):
"""
Connects to bsrn ftp and lists available station-to-archive files.
Parameters
----------
stations : list
List of station abbreviations (e.g., ['PAY', 'NYA']).
username : str
BSRN FTP username.
password : str
BSRN FTP password.
host : str, default BSRN_FTP_HOST
FTP host address.
Returns
-------
inventory : dict
Mapping of station abbreviations to lists of filenames.
References
----------
.. [1] Driemel, A., et al. (2018). Baseline Surface Radiation Network (BSRN):
structure and data description (1992–2017). Earth System Science Data,
10(3), 1491-1501.
"""
inventory = {}
try:
with FTP(host) as ftp:
ftp.set_pasv(True)
ftp.login(user=username, passwd=password)
for i, stn in enumerate(stations):
print(
f"[{i + 1}/{len(stations)}] Fetching inventory for station "
f"{stn.upper()}...",
)
stn_lower = stn.lower()
success = False
# Retry logic for the connection
for attempt in range(2):
try:
ftp.cwd("/")
ftp.cwd(stn_lower)
files = ftp.nlst()
inventory[stn.upper()] = _filter_station_archive_files(files)
success = True
break
except Exception as e:
if attempt == 0:
_ftp_relogin(ftp, host, username, password)
else:
print(f"BSRN FTP: Failed to retrieve {stn} after retry: {e}")
if not success:
inventory[stn.upper()] = []
except Exception as e:
print(f"BSRN FTP: Major connection error: {e}")
return inventory
[docs]
def download_bsrn_single(station, year, month, local_dir, username,
password, host=BSRN_FTP_HOST):
"""
Download a single BSRN file by specifying station, year, and month.
Parameters
----------
station : str
Station abbreviation (e.g., 'pay').
year : int or str
Four-digit year (e.g., 2024 or '2024').
month : int or str
Month number or string (1-12 or '01'-'12').
local_dir : str
The local directory to save the file.
username : str
BSRN FTP username.
password : str
BSRN FTP password.
host : str, default BSRN_FTP_HOST
FTP host address.
Returns
-------
local_path : str or None
The path to the downloaded file, or None if failed.
"""
filename = _monthly_dat_gz_filename(station, year, month)
return download_bsrn_files([filename], local_dir, username, password, host=host)[0]
[docs]
def download_bsrn_stn(station, local_dir, username,
password, host=BSRN_FTP_HOST):
"""
Download all available station-to-archive files for a specific station.
Parameters
----------
station : str
Station abbreviation (e.g., 'pay').
local_dir : str
The local directory to save the files.
username : str
BSRN FTP username.
password : str
BSRN FTP password.
host : str, default BSRN_FTP_HOST
FTP host address.
Returns
-------
downloaded_paths : list
List of paths to the downloaded files.
"""
inventory = get_bsrn_file_inventory([station], username, password, host=host)
filenames = inventory.get(station.upper(), [])
return download_bsrn_files(filenames, local_dir, username, password, host=host)
[docs]
def download_bsrn_mon(stations, year, month, local_dir,
username, password, host=BSRN_FTP_HOST):
"""
Download station-to-archive files for multiple stations for a specific month and year.
Parameters
----------
stations : list of str
List of station abbreviations (e.g., ['pay', 'nya']).
year : int or str
Four-digit year (e.g., 2024 or '2024').
month : int or str
Month number or string (1-12 or '01'-'12').
local_dir : str
The local directory to save the files.
username : str
BSRN FTP username.
password : str
BSRN FTP password.
host : str, default BSRN_FTP_HOST
FTP host address.
Returns
-------
downloaded_paths : list
List of paths to the downloaded files.
"""
filenames = [_monthly_dat_gz_filename(stn, year, month) for stn in stations]
return download_bsrn_files(filenames, local_dir, username, password, host=host)
[docs]
def download_bsrn_files(filenames, local_dir, username,
password, host=BSRN_FTP_HOST, retries=3):
"""
Download many BSRN files efficiently using a single FTP connection with robust retries.
Parameters
----------
filenames : list of str
List of filenames to download (e.g., ['pay0123.dat.gz']).
local_dir : str
The local directory to save the files.
username : str
BSRN FTP username.
password : str
BSRN FTP password.
host : str, default BSRN_FTP_HOST
FTP host address.
retries : int, default 3
Number of retry attempts for both connection and file transfer errors.
Returns
-------
downloaded_paths : list
List of paths to the downloaded files.
"""
os.makedirs(local_dir, exist_ok=True)
downloaded_paths = []
ftp = None
try:
ftp = _connect_ftp(host, username, password)
for filename in filenames:
filename_lower = filename.lower()
local_path = os.path.join(local_dir, filename_lower)
station_code = filename_lower[:3]
success = False
for attempt in range(retries):
try:
if ftp is None:
ftp = _connect_ftp(host, username, password)
ftp.cwd("/")
ftp.cwd(station_code)
with open(local_path, "wb") as f:
ftp.retrbinary(f"RETR {filename_lower}", f.write)
downloaded_paths.append(local_path)
success = True
break
except Exception as e:
print(f"BSRN FTP: Attempt {attempt + 1} failed for {filename}: {e}")
_safe_ftp_quit(ftp)
ftp = None
if attempt < retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
if not success:
downloaded_paths.append(None)
except Exception as e:
print(f"BSRN FTP: Major connection error: {e}")
while len(downloaded_paths) < len(filenames):
downloaded_paths.append(None)
finally:
_safe_ftp_quit(ftp)
return downloaded_paths
[docs]
def parse_bsrn_filename(filename):
"""
Extract station code, year, month, and optional suffix from a filename.
Parameters
----------
filename : str
BSRN filename (e.g., 'pay0123.dat.gz') or parquet (e.g., 'ber0198_crs.parquet').
Returns
-------
station : str or None
Three-letter station code (uppercase).
year : int or None
Four-digit calendar year (e.g., 2023).
month : int or None
Month number in ``1`` … ``12``.
suffix : str or None
Optional filename suffix (e.g., ``nsrdb_aggregated``).
"""
match = BSRN_FILENAME_PATTERN.match(filename)
if not match:
return None, None, None, None
# Get components
station = match.group(1).upper()
month = int(match.group(2))
yy = int(match.group(3))
suffix = match.group(4)
# 4-digit year conversion
# BSRN convention: 00-79 -> 2000-2079, 80-99 -> 1980-1999
year = 2000 + yy if yy < 80 else 1900 + yy
return station, year, month, suffix
[docs]
def months_from_ftp_filenames(filenames):
"""
Extract a unique set of (year, month) tuples from a list of BSRN filenames.
Parameters
----------
filenames : list of str
List of filenames from BSRN FTP.
Returns
-------
months : list of tuple
Sorted list of (year, month) tuples.
"""
ym_set = set()
for f in filenames:
_, y, m, _ = parse_bsrn_filename(f)
if y is not None:
ym_set.add((y, m))
return sorted(ym_set)