Source code for bsrn.utils.clear_sky_detection

"""
Clear-sky detection (CSD) utilities.

Implements selected CSD methods from the MATLAB csd-library with a common
Python output interface.

Original MATLAB implementations and methodology definitions are from
Jamie M. Bright's clear-sky detection library:
https://github.com/JamieMBright/csd-library
"""

import warnings
import numpy as np
import pandas as pd


def _as_1d_array(x, name, n=None):
    """
    Convert input to 1D float array and validate length.

    Parameters
    ----------
    x : array-like
        Input to convert.
    name : str
        Name for error messages.
    n : int, optional
        Required length; if set, raise when len(x) != n.

    Returns
    -------
    np.ndarray
        1D float array.

    Raises
    ------
    ValueError
        When n is set and len(x) != n.
    """
    arr = np.asarray(x, dtype=float).reshape(-1)
    if n is not None and len(arr) != n:
        raise ValueError(f"{name} must have length {n}.")
    return arr


def _resolve_index(times, n):
    """
    Resolve output index.

    Parameters
    ----------
    times : array-like, pd.DatetimeIndex, or None
        Time index; if None, return integer range.
    n : int
        Expected length.

    Returns
    -------
    pd.RangeIndex or pd.DatetimeIndex
        Index of length n.

    Raises
    ------
    ValueError
        When len(times) != n.
    """
    if times is None:
        return pd.RangeIndex(start=0, stop=n, step=1, name="time")
    if isinstance(times, pd.DatetimeIndex):
        if len(times) != n:
            raise ValueError("times length must match inputs.")
        return times
    idx = pd.DatetimeIndex(times)
    if len(idx) != n:
        raise ValueError("times length must match inputs.")
    return idx


def _hankel_window(values, window):
    """
    Build MATLAB-style Hankel windows with trailing NaN padding.

    Equivalent to MATLAB: to avoid a loop, the time series are concatenated
    with a tail so one matrix call builds all windows, e.g.
    ``ghi_window = hankel(ghi, [ghi(end), NaN(1, interval_size-1)])``.
    Each row i is the window [values[i], values[i+1], ..., values[i+window-1]],
    with NaN padding where the window extends past the end of the series.

    Parameters
    ----------
    values : array-like
        1D time series.
    window : int
        Window length (number of columns).

    Returns
    -------
    np.ndarray
        Shape (len(values), window); row i = window starting at index i.
    """
    n = len(values)
    mat = np.full((n, window), np.nan, dtype=float)
    for i in range(window):
        end = n - i
        if end > 0:
            mat[:end, i] = values[i:]
    return mat


def _window_sufficient_valid(window_mat):
    """
    True where each row has more than half non-NaN (e.g. window=10 -> at least 5).

    Parameters
    ----------
    window_mat : np.ndarray
        2D array, shape (n, window).

    Returns
    -------
    np.ndarray of bool
        True where row has >= (window+1)//2 finite values.
    """
    n_valid = np.sum(np.isfinite(window_mat), axis=1)
    min_required = (window_mat.shape[1] + 1) // 2
    return n_valid >= min_required



def _safe_divide(num, den):
    """
    Safe division with NaN on invalid entries.

    Parameters
    ----------
    num : array-like
        Numerator.
    den : array-like
        Denominator.

    Returns
    -------
    np.ndarray
        num/den with non-finite results set to NaN.
    """
    with np.errstate(divide="ignore", invalid="ignore"):
        out = num / den
    out[~np.isfinite(out)] = np.nan
    return out


def _nanmax_no_warn(arr, axis):
    """
    nanmax without emitting all-NaN runtime warnings.

    Parameters
    ----------
    arr : np.ndarray
        Input array.
    axis : int
        Axis over which to take maximum.

    Returns
    -------
    np.ndarray
        np.nanmax(arr, axis=axis).
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        out = np.nanmax(arr, axis=axis)
    return out


def _nanstd_no_warn(arr, axis, ddof=0):
    """
    nanstd without emitting empty-slice runtime warnings.

    Parameters
    ----------
    arr : np.ndarray
        Input array.
    axis : int
        Axis over which to take standard deviation.
    ddof : int, default 0
        Delta degrees of freedom for std.

    Returns
    -------
    np.ndarray
        np.nanstd(arr, axis=axis, ddof=ddof).
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        out = np.nanstd(arr, axis=axis, ddof=ddof)
    return out


def _csd_to_output(index, cloud_flag, method, diagnostics=None,
                   return_diagnostics=False):
    """
    Standardize CSD outputs.

    Parameters
    ----------
    index : pd.Index
        Output index.
    cloud_flag : array-like
        0=clear, 1=cloudy, NaN=invalid.
    method : str
        Method name for output column.
    diagnostics : dict, optional
        Extra arrays to add to output.
    return_diagnostics : bool, default False
        If True, include diagnostics in output.

    Returns
    -------
    pd.DataFrame
        Columns: is_clearsky, cloud_flag, method [, diagnostics].
    """
    cloud = np.asarray(cloud_flag, dtype=float)
    is_clearsky = pd.array(cloud == 0, dtype="boolean")
    is_clearsky[np.isnan(cloud)] = pd.NA

    out = pd.DataFrame(
        {
            "is_clearsky": is_clearsky,
            "cloud_flag": cloud,
            "method": method,
        },
        index=index,
    )
    if return_diagnostics and diagnostics is not None:
        for col, arr in diagnostics.items():
            out[col] = np.asarray(arr, dtype=float)
    return out


def _brightsun_component_flag(meas, clear, zenith, window=10, is_ghi=True,
                              return_diagnostics=False):
    """
    Bright-Sun component-level clear-sky test for one irradiance component.

    This helper implements the per-component criteria used inside the
    Bright-Sun tri-component method [1]_ (for either GHI or DHI):

    - Sliding-window statistics (length ``window`` minutes) on measured and
      clear-sky series.
    - Relative mean and max differences (|meas-clear|/clear) with
      zenith-dependent thresholds (:math:`c1`, :math:`c2`).
    - Normalized line-length difference of the time series (meas vs clear)
      with a zenith-dependent acceptance band (:math:`c3`).
    - Normalized slope variability (windowed std of first differences
      divided by mean) with threshold :math:`\\sigma_{\\mathrm{lim}}` (:math:`c4`).
    - Maximum absolute point-wise slope difference (X) with zenith-dependent
      limit (:math:`c5`).
    - Valid-clear check on clear-sky mean (:math:`c6`).

    For each time step, **1 = cloud, 0 = clear** at the component level.
    When all six criteria accept the point, it is considered clear for that
    component.

    Parameters
    ----------
    meas : array-like
        Measured component (GHI or DHI), 1D.
    clear : array-like
        Corresponding clear-sky estimate for the same component.
    zenith : array-like
        Solar zenith angle [degrees].
    window : int, default 10
        Sliding-window length (minutes) used to build Hankel matrices.
    is_ghi : bool, default True
        If True, apply Bright-Sun GHI parameterization; otherwise use the
        DHI parameterization.
    return_diagnostics : bool, default False
        If True, also return a diagnostics dict with individual criteria,
        thresholds, and intermediates.

    Returns
    -------
    cloud_flag : np.ndarray
        1 = cloudy, 0 = clear, NaN = invalid input.
    diagnostics : dict, optional
        Only when ``return_diagnostics=True``; contains arrays for c1–c6,
        intermediate statistics, and zenith-dependent thresholds.

    References
    ----------
    .. [1] Bright, J. M., Sun, X., Gueymard, C. A., Acord, B., Wang, P., &
       Engerer, N. A. (2020). Bright-Sun: A globally applicable 1-min
       irradiance clear-sky detection model. Renewable and Sustainable
       Energy Reviews, 121, 109706.
    """
    meas = _as_1d_array(meas, "meas")
    clear = _as_1d_array(clear, "clear", n=len(meas))
    zenith = _as_1d_array(zenith, "zenith", n=len(meas))
    n = len(meas)

    # Zenith grid 20–90°, 0.01 step; turnaround at 30° (MATLAB zenith_turn_around)
    z_ref = np.arange(20.0, 90.01, 0.01)
    n1 = int(round((30.0 - 20.0) / 0.01)) + 1
    if is_ghi:
        c1_lim_arr = np.flip(np.concatenate([
            np.linspace(0.5, 0.125, n1),
            np.linspace(0.125, 0.25, len(z_ref) - n1),
        ]))
        c2_lim_arr = np.flip(np.concatenate([
            np.linspace(0.5, 0.125, n1),
            np.linspace(0.125, 0.25, len(z_ref) - n1),
        ]))
        c3_lower_arr = np.flip(np.concatenate([
            np.linspace(-0.5, -0.5, n1),
            np.linspace(-0.5, -7.0, len(z_ref) - n1),
        ]))
        sigma_lim = 0.4
        c5_slope = 15.0
        c5_small = 45.0
    else:
        c1_lim_arr = np.flip(np.concatenate([
            np.linspace(0.5, 0.5, n1),
            np.linspace(0.5, 0.25, len(z_ref) - n1),
        ]))
        c2_lim_arr = np.flip(np.concatenate([
            np.linspace(0.5, 0.5, n1),
            np.linspace(0.5, 0.25, len(z_ref) - n1),
        ]))
        c3_lower_arr = np.flip(np.concatenate([
            np.linspace(-1.7, -1.7, n1),
            np.linspace(-1.7, -6.0, len(z_ref) - n1),
        ]))
        sigma_lim = 0.2
        c5_slope = 8.0
        c5_small = 24.0

    c3_upper_arr = np.abs(c3_lower_arr)
    c5_lim_arr = np.flip(np.concatenate([
        np.linspace(c5_slope, c5_slope, n1),
        np.linspace(c5_slope, c5_small, len(z_ref) - n1),
    ]))

    inds = np.searchsorted(z_ref, np.clip(zenith, z_ref[0], z_ref[-1]), side="left")
    inds = np.clip(inds, 0, len(z_ref) - 1)
    c1_lim = c1_lim_arr[inds]
    c2_lim = c2_lim_arr[inds]
    c3_lower = c3_lower_arr[inds]
    c3_upper = c3_upper_arr[inds]
    c5_lim = c5_lim_arr[inds]

    # Build Hankel matrices: each row = length-`window` time series segment,
    # used to compute local statistics over sliding windows.
    m_win = _hankel_window(meas, window)
    c_win = _hankel_window(clear, window)
    sufficient_m = _window_sufficient_valid(m_win)
    sufficient_c = _window_sufficient_valid(c_win)

    # Window-mean and window-max for measured / clear-sky component.
    meas_mean = np.nanmean(m_win, axis=1)
    clear_mean = np.nanmean(c_win, axis=1)
    meas_max = _nanmax_no_warn(m_win, axis=1)
    clear_max = _nanmax_no_warn(c_win, axis=1)
    meas_mean[~sufficient_m] = np.nan
    clear_mean[~sufficient_c] = np.nan
    meas_max[~sufficient_m] = np.nan
    clear_max[~sufficient_c] = np.nan

    # First differences for slope-based criteria and line-length.
    # MATLAB: diff along axis-0 (rows) + NaN-row pad → (n, window)
    _pad = lambda m: np.vstack([np.diff(m, axis=0),
                                np.full((1, m.shape[1]), np.nan)])
    meas_diff = _pad(m_win)
    clear_diff = _pad(c_win)
    meas_slope_nstd = _safe_divide(
        _nanstd_no_warn(meas_diff, axis=1, ddof=0), meas_mean)
    meas_line = np.nansum(
        np.sqrt(meas_diff * meas_diff + 1.0), axis=1)
    clear_line = np.nansum(
        np.sqrt(clear_diff * clear_diff + 1.0), axis=1)
    meas_slope_nstd[~sufficient_m] = np.nan
    meas_line[~sufficient_m] = np.nan
    clear_line[~sufficient_c] = np.nan

    line_diff_norm = _safe_divide(
        meas_line - clear_line, clear_line)
    X = _nanmax_no_warn(
        np.abs(meas_diff - clear_diff), axis=1)
    line_diff_norm[~sufficient_m | ~sufficient_c] = np.nan
    X[~sufficient_m | ~sufficient_c] = np.nan

    # 1=cloud by default; 0=clear when criterion passes (MATLAB: c1=0 when rel diff < lim)
    c1 = np.ones(n, dtype=float)
    c1[_safe_divide(np.abs(meas_mean - clear_mean), clear_mean) < c1_lim] = 0.0
    c2 = np.ones(n, dtype=float)
    c2[_safe_divide(np.abs(meas_max - clear_max), clear_max) < c2_lim] = 0.0
    c3 = np.ones(n, dtype=float)
    c3[(line_diff_norm > c3_lower) & (line_diff_norm < c3_upper)] = 0.0
    c4 = np.ones(n, dtype=float)
    c4[meas_slope_nstd < sigma_lim] = 0.0
    c5 = np.ones(n, dtype=float)
    c5[X < c5_lim] = 0.0
    c6 = np.ones(n, dtype=float)
    c6[(clear_mean != 0) & np.isfinite(clear_mean)] = 0.0

    cloud_flag = ((c1 + c2 + c3 + c4 + c5 + c6) > 0).astype(float)
    bad = np.isnan(meas) | np.isnan(clear)
    cloud_flag[bad] = np.nan
    if not return_diagnostics:
        return cloud_flag
    diagnostics = {
        "c1": c1,
        "c2": c2,
        "c3": c3,
        "c4": c4,
        "c5": c5,
        "c6": c6,
        "meas_mean": meas_mean,
        "clear_mean": clear_mean,
        "meas_max": meas_max,
        "clear_max": clear_max,
        "line_diff_norm": line_diff_norm,
        "meas_slope_nstd": meas_slope_nstd,
        "X": X,
        "c1_lim": c1_lim,
        "c2_lim": c2_lim,
        "c3_lower": c3_lower,
        "c3_upper": c3_upper,
        "c5_lim": c5_lim,
    }
    for key, arr in diagnostics.items():
        a = np.asarray(arr, dtype=float)
        a[bad] = np.nan
        diagnostics[key] = a
    return cloud_flag, diagnostics


def _reno_cloud_flag(ghi, ghi_clear, window=10, mean_lim=75.0, max_lim=75.0,
                     lower_L_lim=-5.0, upper_L_lim=10.0, sigma_lim=0.005,
                     X_lim=8.0):
    """
    Core Reno-style five-criteria cloud flag with an additional valid-clear
    criterion, following csd-library Reno2016CSD logic.

    Parameters
    ----------
    ghi : array-like
        Global horizontal irradiance. [W/m^2]
    ghi_clear : array-like
        Clear-sky GHI. [W/m^2]
    window : int, default 10
        Window size for criteria.
    mean_lim, max_lim, lower_L_lim, upper_L_lim, sigma_lim, X_lim : float
        Thresholds for the five criteria and line-diff bounds.

    Returns
    -------
    cloud_flag : np.ndarray
        0=clear, 1=cloudy, NaN=invalid.
    diagnostics : dict
        Arrays for each criterion and intermediates.
    """
    ghi = _as_1d_array(ghi, "ghi")
    ghi_clear = _as_1d_array(ghi_clear, "ghi_clear", n=len(ghi))
    n = len(ghi)

    # Build sliding-window matrices (each row = one window of length `window`).
    ghi_window = _hankel_window(ghi, window)
    clear_window = _hankel_window(ghi_clear, window)

    # Require more than half of window points to be valid; else treat stats as invalid.
    sufficient_ghi = _window_sufficient_valid(ghi_window)
    sufficient_clear = _window_sufficient_valid(clear_window)

    # Window mean and max: used in criteria c1 (mean difference) and c2 (max difference).
    meas_mean = np.nanmean(ghi_window, axis=1)
    clear_mean = np.nanmean(clear_window, axis=1)
    meas_max = _nanmax_no_warn(ghi_window, axis=1)
    clear_max = _nanmax_no_warn(clear_window, axis=1)
    meas_mean[~sufficient_ghi] = np.nan
    clear_mean[~sufficient_clear] = np.nan
    meas_max[~sufficient_ghi] = np.nan
    clear_max[~sufficient_clear] = np.nan

    # Along-window differences and derived stats: slope variability (c4) and line length (c3).
    # MATLAB: diff along axis-0 (rows) + NaN-row pad → (n, window)
    _pad = lambda m: np.vstack([np.diff(m, axis=0),
                                np.full((1, m.shape[1]), np.nan)])
    meas_diff = _pad(ghi_window)
    clear_diff = _pad(clear_window)

    meas_slope_nstd = _safe_divide(
        _nanstd_no_warn(meas_diff, axis=1, ddof=0), meas_mean)
    meas_line = np.nansum(
        np.sqrt(meas_diff * meas_diff + 1.0), axis=1)
    clear_line = np.nansum(
        np.sqrt(clear_diff * clear_diff + 1.0), axis=1)
    meas_slope_nstd[~sufficient_ghi] = np.nan
    meas_line[~sufficient_ghi] = np.nan
    clear_line[~sufficient_clear] = np.nan

    line_diff = meas_line - clear_line
    X = _nanmax_no_warn(
        np.abs(meas_diff - clear_diff), axis=1)
    line_diff[~sufficient_ghi | ~sufficient_clear] = np.nan
    X[~sufficient_ghi | ~sufficient_clear] = np.nan

    # Five Reno criteria plus valid-clear check: any True -> cloudy (1).
    c1 = np.abs(meas_mean - clear_mean) > mean_lim      # mean GHI vs clear-sky
    c2 = np.abs(meas_max - clear_max) > max_lim          # max GHI vs clear-sky
    c3 = (line_diff < lower_L_lim) | (line_diff > upper_L_lim)  # line-length difference
    c4 = meas_slope_nstd > sigma_lim                    # normalized slope std
    c5 = X > X_lim                                      # max absolute diff
    c6 = (clear_mean == 0) | (~np.isfinite(clear_mean))  # invalid or zero clear-sky

    cloud_flag = (c1 | c2 | c3 | c4 | c5 | c6).astype(float)
    bad = np.isnan(ghi) | np.isnan(ghi_clear)
    cloud_flag[bad] = np.nan

    # Package diagnostics and mask invalid input rows.
    diagnostics = {
        "c1": c1.astype(float),
        "c2": c2.astype(float),
        "c3": c3.astype(float),
        "c4": c4.astype(float),
        "c5": c5.astype(float),
        "c6": c6.astype(float),
        "meas_mean": meas_mean,
        "clear_mean": clear_mean,
        "meas_max": meas_max,
        "clear_max": clear_max,
        "line_diff": line_diff,
        "meas_slope_nstd": meas_slope_nstd,
        "X": X,
    }
    for key in diagnostics:
        arr = np.asarray(diagnostics[key], dtype=float)
        arr[bad] = np.nan
        diagnostics[key] = arr
    return cloud_flag, diagnostics


[docs] def reno_csd(ghi, ghi_clear, times=None, return_diagnostics=False): """ Reno2016 clear-sky detection [1]_. MATLAB mapping: `Reno2016CSD(ghi, ghics, plot_figure)` with `ghics -> ghi_clear`. Parameters ---------- ghi : array-like Global horizontal irradiance (`ghi`). [W/m^2] ghi_clear : array-like Clear-sky global horizontal irradiance (`ghics -> ghi_clear`). [W/m^2] times : array-like or pd.DatetimeIndex, optional Time index for outputs. return_diagnostics : bool, default False If True, include method diagnostics. Returns ------- out : pd.DataFrame Standardized output with `is_clearsky`, `cloud_flag`, and optional diagnostics. Raises ------ ValueError When input lengths do not match. References ---------- .. [1] Reno, M. J., & Hansen, C. W. (2016). Identification of periods of clear sky irradiance in time series of GHI measurements. Renewable Energy, 90, 520-531. """ ghi = _as_1d_array(ghi, "ghi") idx = _resolve_index(times, len(ghi)) cloud_flag, diagnostics = _reno_cloud_flag(ghi, ghi_clear) return _csd_to_output(idx, cloud_flag, "reno", diagnostics, return_diagnostics)
[docs] def ineichen_csd(ghi, ghi_extra, zenith, times=None, return_diagnostics=False): """ Ineichen2009 clear-sky detection [1]_. MATLAB mapping: `Ineichen2009CSD(ghi, exth, zen, plot_figure)` with `exth -> ghi_extra`, `zen -> zenith`. Convention: MATLAB csd(kt_prime<0.65)=1 marks low clearness (cloudy); here cloud_flag 0=clear, 1=cloudy, so cloud_flag=1 where kt_prime<0.65. Parameters ---------- ghi : array-like Global horizontal irradiance (`ghi`). [W/m^2] ghi_extra : array-like Horizontal extraterrestrial irradiance (`exth -> ghi_extra`). [W/m^2] zenith : array-like Solar zenith angle (`zen -> zenith`). [degrees] times : array-like or pd.DatetimeIndex, optional Time index for outputs. return_diagnostics : bool, default False If True, include method diagnostics. Returns ------- out : pd.DataFrame Standardized output with `is_clearsky`, `cloud_flag`, and optional diagnostics. Raises ------ ValueError When input lengths do not match. References ---------- .. [1] Ineichen, P., Barroso, C. S., Geiger, B., Hollmann, R., Marsouin, A., & Mueller, R. (2009). Satellite Application Facilities irradiance products: hourly time step comparison and validation over Europe. International Journal of Remote Sensing, 30(21), 5549-5571. """ ghi = _as_1d_array(ghi, "ghi") ghi_extra = _as_1d_array(ghi_extra, "ghi_extra", n=len(ghi)) zenith = _as_1d_array(zenith, "zenith", n=len(ghi)) idx = _resolve_index(times, len(ghi)) kt = _safe_divide(ghi, ghi_extra) h = 90.0 - zenith with np.errstate(divide="ignore", invalid="ignore"): M = 1.0 / (np.sin(np.radians(h)) + 0.15 * np.power(h + 3.885, -1.253)) kt_prime = kt / (1.031 * np.exp(-1.4 / (0.9 + 9.4 / M)) + 0.1) # Convention: Python cloud_flag 0=clear, 1=cloudy. In the reference MATLAB, csd(kt_prime<0.65)=1 # assigns 1 where modified clearness is low (cloudy). So: kt_prime < 0.65 -> cloudy (cloud_flag=1). cloud_flag = np.zeros(len(ghi), dtype=float) cloud_flag[kt_prime < 0.65] = 1.0 bad = (np.isnan(ghi) | np.isnan(ghi_extra) | np.isnan(zenith) | (zenith >= 90.0) | (ghi_extra <= 0.0)) cloud_flag[bad] = np.nan diagnostics = {"kt": kt, "M": M, "kt_prime": kt_prime} for key in diagnostics: arr = np.asarray(diagnostics[key], dtype=float) arr[bad] = np.nan diagnostics[key] = arr return _csd_to_output(idx, cloud_flag, "ineichen", diagnostics, return_diagnostics)
[docs] def lefevre_csd(ghi, dhi, ghi_extra, zenith, times=None, return_diagnostics=False): """ Lefevre2013 clear-sky detection [1]_. MATLAB mapping: `Lefevre2013CSD(ghi, dif, exth, zen, plot_figure)` with `dif -> dhi`, `exth -> ghi_extra`, `zen -> zenith`. Parameters ---------- ghi : array-like Global horizontal irradiance (`ghi`). [W/m^2] dhi : array-like Diffuse horizontal irradiance (`dif -> dhi`). [W/m^2] ghi_extra : array-like Horizontal extraterrestrial irradiance (`exth -> ghi_extra`). [W/m^2] zenith : array-like Solar zenith angle (`zen -> zenith`). [degrees] times : array-like or pd.DatetimeIndex, optional Time index for outputs. return_diagnostics : bool, default False If True, include method diagnostics. Returns ------- out : pd.DataFrame Standardized output with `is_clearsky`, `cloud_flag`, and optional diagnostics. Raises ------ ValueError When input lengths do not match. References ---------- .. [1] Lefèvre, M., Oumbe, A., Blanc, P., Espinar, B., Gschwind, B., Qu, Z., ... & Morcrette, J. J. (2013). McClear: a new model estimating downwelling solar radiation at ground level in clear-sky conditions. Atmospheric Measurement Techniques, 6(9), 2403-2418. """ ghi = _as_1d_array(ghi, "ghi") dhi = _as_1d_array(dhi, "dhi", n=len(ghi)) ghi_extra = _as_1d_array(ghi_extra, "ghi_extra", n=len(ghi)) zenith = _as_1d_array(zenith, "zenith", n=len(ghi)) idx = _resolve_index(times, len(ghi)) cloud_flag = np.ones(len(ghi), dtype=float) k = _safe_divide(dhi, ghi) # Invalidate k where GHI or DHI are non-physical (negative or zero GHI). non_physical = (ghi <= 0) | (dhi < 0) k[non_physical] = np.nan first_filter_clear = k < 0.3 cloud_flag[first_filter_clear] = 0.0 kt = _safe_divide(ghi, ghi_extra) h = 90.0 - zenith with np.errstate(divide="ignore", invalid="ignore"): M = 1.0 / (np.sin(np.radians(h)) + 0.15 * np.power(h + 3.885, -1.253)) kt_prime = kt / (1.031 * np.exp(-1.4 / (0.9 + 9.4 / M)) + 0.1) # Lefevre second filter: require enough first-filter-retained points # in both [t-90, t] and [t, t+90], then evaluate std of kt' over [t-90, t+90]. retained = first_filter_clear & np.isfinite(kt_prime) retained_f = pd.Series(retained.astype(float)) min_retained = int(np.ceil(0.3 * 91)) prev_count = retained_f.rolling(91, min_periods=91).sum().to_numpy() next_count = retained_f.iloc[::-1].rolling(91, min_periods=91).sum().iloc[::-1].to_numpy() enough_30pct = (prev_count >= min_retained) & (next_count >= min_retained) kt_prime_masked = np.where(retained, kt_prime, np.nan) KTp = pd.Series(kt_prime_masked).rolling(181, center=True, min_periods=1).std(ddof=0).to_numpy() # Keep second filter on the subset retained by first filter (k<0.3), # consistent with the textual method description ("remaining data"). second_filter_clear = first_filter_clear & enough_30pct & np.isfinite(KTp) & (KTp < 0.02) cloud_flag[second_filter_clear] = 0.0 bad = ( np.isnan(ghi) | np.isnan(dhi) | np.isnan(ghi_extra) | np.isnan(zenith) | (ghi <= 0) | (dhi < 0) ) cloud_flag[bad] = np.nan diagnostics = {"k": k, "kt": kt, "M": M, "kt_prime": kt_prime, "KTp": KTp} for key in diagnostics: # Writable copy: ``np.asarray`` can alias read-only buffers (e.g. pandas ``rolling`` output). arr = np.array(diagnostics[key], dtype=float, copy=True) arr[bad] = np.nan diagnostics[key] = arr return _csd_to_output(idx, cloud_flag, "lefevre", diagnostics, return_diagnostics)
def _optimise_alpha(meas, clear): """ Closed-form RMSE-optimal scalar alpha: min_a sqrt(mean((y-a*x)^2)). Mathematically equivalent to MATLAB ``fminsearch`` on the same scalar-alpha RMSE objective. Parameters ---------- meas : np.ndarray Measured irradiance (clear-period subset). clear : np.ndarray Clear-sky irradiance (clear-period subset). Returns ------- float or None Optimal alpha, or None if denominator is non-positive. """ denom = np.nansum(clear * clear) if not np.isfinite(denom) or denom <= 0: return None alpha = np.nansum(meas * clear) / denom if not np.isfinite(alpha): return None return float(alpha)
[docs] def brightsun_csd(zenith, ghi, ghi_clear, dhi, dhi_clear, times, return_diagnostics=False): """ BrightSun2020CSDc clear-sky detection (tri-component) [1]_. MATLAB mapping: ``BrightSun2020CSDc(zen, ghi, ghics, dif, difcs, LST)`` with ``zen -> zenith``, ``ghics -> ghi_clear``, ``dif -> dhi``, ``difcs -> dhi_clear``. The method proceeds in four stages matching the MATLAB routine: 1. Initial Reno-style CSD guess for candidate clear periods. 2. Daily clear-sky optimisation scales GHI, DHI, BNI clear-sky curves independently (alpha bounds ``[0.7, 1.5]`` for GHI/BNI, ``[0.3, 1.5]`` for DHI). 3. Tri-component analysis on optimised curves. 4. Cascaded duration filters (90/30/10-min). Parameters ---------- zenith : array-like Solar zenith angle. [degrees] ghi : array-like Global horizontal irradiance. [W/m^2] ghi_clear : array-like Clear-sky GHI. [W/m^2] dhi : array-like Diffuse horizontal irradiance. [W/m^2] dhi_clear : array-like Clear-sky DHI. [W/m^2] times : array-like or pd.DatetimeIndex Time index (MATLAB ``LST`` equivalent). return_diagnostics : bool, default False If True, include method diagnostics. Returns ------- out : pd.DataFrame Columns: ``is_clearsky``, ``cloud_flag`` (duration-filtered), ``method``; diagnostics when requested. Raises ------ ValueError When input lengths do not match. References ---------- .. [1] Bright, J. M., Sun, X., Gueymard, C. A., Acord, B., Wang, P., & Engerer, N. A. (2020). Bright-Sun: A globally applicable 1-min irradiance clear-sky detection model. Renewable and Sustainable Energy Reviews, 121, 109706. """ zenith = _as_1d_array(zenith, "zenith") ghi = _as_1d_array(ghi, "ghi", n=len(zenith)) ghi_clear = _as_1d_array( ghi_clear, "ghi_clear", n=len(zenith)) dhi = _as_1d_array(dhi, "dhi", n=len(zenith)) dhi_clear = _as_1d_array( dhi_clear, "dhi_clear", n=len(zenith)) idx = _resolve_index(times, len(zenith)) n_output = len(zenith) # Cap DHI at GHI dhi = np.minimum(dhi, ghi) dhi_clear = np.minimum(dhi_clear, ghi_clear) # BNI from closure mu0 = np.cos(np.radians(zenith)) bni = _safe_divide(ghi - dhi, mu0) bni_clear = _safe_divide(ghi_clear - dhi_clear, mu0) # NaN removal (MATLAB pattern) idxs_nan = (np.isnan(ghi) | np.isnan(bni) | np.isnan(dhi)) not_nan = ~idxs_nan ghi_c = ghi[not_nan].copy() bni_c = bni[not_nan].copy() dhi_c = dhi[not_nan].copy() ghics_c = ghi_clear[not_nan].copy() bnics_c = bni_clear[not_nan].copy() dhics_c = dhi_clear[not_nan].copy() zen_c = zenith[not_nan].copy() n_clean = len(ghi_c) times_c = None if isinstance(idx, pd.DatetimeIndex): times_c = idx[not_nan] # ---- Stage 1: initial Reno guess (sigma_lim=0.1) ---- csd_initial, _ = _reno_cloud_flag( ghi_c, ghics_c, mean_lim=75.0, max_lim=75.0, lower_L_lim=-5.0, upper_L_lim=10.0, sigma_lim=0.1, X_lim=8.0, ) csd_initial = np.where( np.isfinite(csd_initial), csd_initial, 1.0) # ---- Stage 2: daily optimisation (GHI, DHI, BNI) ---- opt_thres = 30.0 upper_a = 1.5 lower_a = 0.7 lower_a_dhi = 0.3 if times_c is not None and len(times_c) > 0: day_key = times_c.floor("D") for d in np.unique(day_key): ix = np.where(day_key == d)[0] csdd = csd_initial[ix].copy() csdd[(dhi_c[ix] < opt_thres) | (ghi_c[ix] < opt_thres)] = 1.0 cm = csdd == 0.0 if np.sum(cm) <= 60: continue # GHI optimisation a = _optimise_alpha(ghi_c[ix][cm], ghics_c[ix][cm]) if a is not None: ghics_c[ix] *= np.clip(a, lower_a, upper_a) # DHI optimisation a = _optimise_alpha(dhi_c[ix][cm], dhics_c[ix][cm]) if a is not None: dhics_c[ix] *= np.clip(a, lower_a_dhi, upper_a) # BNI optimisation a = _optimise_alpha(bni_c[ix][cm], bnics_c[ix][cm]) if a is not None: bnics_c[ix] *= np.clip(a, lower_a, upper_a) # ---- Stage 3: tri-component analysis ---- cloud_ghi = _brightsun_component_flag( ghi_c, ghics_c, zen_c, window=10, is_ghi=True) cloud_dhi = _brightsun_component_flag( dhi_c, dhics_c, zen_c, window=10, is_ghi=False) # BNI: zenith-dependent kcb threshold z_ref = np.arange(30.0, 90.01, 0.01) kc_lims = np.flip(np.linspace(0.5, 0.9, len(z_ref))) inds = np.searchsorted( z_ref, np.clip(zen_c, z_ref[0], z_ref[-1]), side="left") inds = np.clip(inds, 0, len(z_ref) - 1) kc_lim_c = kc_lims[inds] kcb_c = _safe_divide(bni_c, bnics_c) cloud_bni = np.ones(n_clean, dtype=float) cloud_bni[kcb_c > kc_lim_c] = 0.0 csd_overall = np.ones(n_clean, dtype=float) csd_overall[(cloud_ghi + cloud_dhi + cloud_bni) == 0] = 0.0 # ---- Stage 4: duration filters ---- # Inline centred Hankel filter: sum forward window, shift to centre def _dur(flags, w, tol): h = np.nansum(_hankel_window(flags, w), axis=1) h = np.concatenate([np.full(w // 2, np.nan), h[:n_clean - w // 2]]) out = np.zeros(n_clean, dtype=float) out[h > tol] = 1.0 return out # 1st: 90 min, tolerance 10 csd_1st = _dur(csd_overall, 90, 10) # Sunrise/sunset proximity (zen ≈ 85°) ss_idx = np.where(np.round(zen_c) == 85)[0] if len(ss_idx) > 0: all_i = np.arange(n_clean) ss_s = np.sort(ss_idx) pos = np.searchsorted(ss_s, all_i) pos = np.clip(pos, 0, len(ss_s) - 1) pl = np.clip(pos - 1, 0, len(ss_s) - 1) dist_ss = np.minimum( np.abs(all_i - ss_s[pos]), np.abs(all_i - ss_s[pl])) else: dist_ss = np.full(n_clean, n_clean) csd_1st[dist_ss < 90] = 0.0 # 2nd: 30 min, tolerance 0 csd_2nd = _dur(csd_overall, 30, 0) # 3rd: 10 min, tolerance 2 (sunrise/sunset override) csd_3rd = _dur(csd_overall, 10, 2) csd_2nd[(csd_3rd == 0) & (dist_ss < 90)] = 0.0 # Final: all must agree clear csd_filtered = np.ones(n_clean, dtype=float) csd_filtered[ (csd_overall + csd_1st + csd_2nd) == 0] = 0.0 # ---- Map back to full length ---- cloud_flag = np.ones(n_output, dtype=float) cloud_flag[not_nan] = csd_filtered cloud_flag[idxs_nan] = np.nan def _expand(arr): full = np.full(n_output, np.nan) full[not_nan] = arr return full diagnostics = { "cloud_ghi": _expand(cloud_ghi), "cloud_dhi": _expand(cloud_dhi), "cloud_bni": _expand(cloud_bni), "kcb": _expand(kcb_c), "kc_lim": _expand(kc_lim_c), "mu0": mu0, "bni": bni, "bni_clear": bni_clear, } return _csd_to_output( idx, cloud_flag, "brightsun", diagnostics, return_diagnostics)
[docs] def detect_clearsky(method, **kwargs): """ Wrapper for clear-sky detection methods. Parameters ---------- method : {"reno", "ineichen", "lefevre", "brightsun"} Method name. **kwargs : dict Arguments forwarded to method function. Returns ------- out : pd.DataFrame Standardized CSD output. Raises ------ ValueError When method is not one of reno, ineichen, lefevre, brightsun. """ method_key = str(method).strip().lower() if method_key == "reno": return reno_csd(**kwargs) if method_key == "ineichen": return ineichen_csd(**kwargs) if method_key == "lefevre": return lefevre_csd(**kwargs) if method_key == "brightsun": return brightsun_csd(**kwargs) raise ValueError( "Unsupported method. Choose one of brightsun, ineichen, lefevre, reno." )