import os
import warnings
from typing import Callable, Dict, List, Sequence, Union

import numpy as np
import pandas as pd

from reclaim.dynamic_features.utils.inflow_outflow import (
    max_annual_flow_m3_per_s,
    mean_annual_flow_m3_per_s,
    mean_annual_flow_std_m3_per_s,
    mean_annual_flow_variability,
)
from reclaim.dynamic_features.utils.statistical_metrics import (
    annual_mean,
    annual_std,
    coefficient_of_variation,
    kurtosis_val,
    max_annual_persistence,
    max_days_above_90th,
    skewness,
)
from reclaim.dynamic_features.utils.ts_aggregate import compute_ts_aggregates
# Registry of dynamic features, grouped by the input variable they are
# derived from. The outer key is the variable name expected in
# ``variable_info``; each inner mapping goes from output column name to the
# aggregation callable that produces it. Insertion order here determines the
# column order of the resulting feature table.
VARIABLE_FEATURES: Dict[str, Dict[str, Callable]] = {
    # Inflow: flow-specific aggregators plus exceedance/persistence metrics.
    "inflow": {
        "MAI": mean_annual_flow_m3_per_s,
        "PAI": max_annual_flow_m3_per_s,
        "I_cv": mean_annual_flow_variability,
        "I_std": mean_annual_flow_std_m3_per_s,
        "I_above_90": max_days_above_90th,
        "I_max_persis": max_annual_persistence,
    },
    # Outflow: same flow aggregators as inflow, without the peak/exceedance set.
    "outflow": {
        "MAO": mean_annual_flow_m3_per_s,
        "O_std": mean_annual_flow_std_m3_per_s,
        "O_cv": mean_annual_flow_variability,
    },
    "evaporation": {
        "E_mean": annual_mean,
        "E_std": annual_std,
    },
    "surface_area": {
        "SA_mean": annual_mean,
        "SA_std": annual_std,
        "SA_cv": coefficient_of_variation,
        "SA_skew": skewness,
        "SA_kurt": kurtosis_val,
        # NOTE(review): SA_mean_clip is mapped to the same aggregator as
        # SA_mean; any clipping would have to happen outside this table --
        # confirm this is intentional.
        "SA_mean_clip": annual_mean,
        "SA_above_90": max_days_above_90th,
    },
    # The "nssc" variable yields the NSSC1_* columns.
    "nssc": {
        "NSSC1_mean": annual_mean,
        "NSSC1_std": annual_std,
        "NSSC1_cv": coefficient_of_variation,
        "NSSC1_skew": skewness,
        "NSSC1_kurt": kurtosis_val,
    },
    "nssc2": {
        "NSSC2_mean": annual_mean,
        "NSSC2_above_90": max_days_above_90th,
        "NSSC2_max_persis": max_annual_persistence,
    },
}
def _nan_feature_frame(feature_names, n_rows: int) -> pd.DataFrame:
    """Return an all-NaN frame with one row per interval and one column per feature."""
    return pd.DataFrame(np.nan, index=range(n_rows), columns=list(feature_names))


def reservoir_based_dynamic_features(
    variable_info: Dict[str, Dict[str, str]],
    observation_intervals: List[Sequence[int]]
) -> pd.DataFrame:
    """
    Compute dynamic reservoir features for a single reservoir using inflow, outflow,
    surface area, evaporation, and sediment-related time series.

    Required time series keys (case-sensitive):

    - ``inflow``: Daily inflow in m³/day
    - ``outflow``: Daily outflow in m³/day
    - ``evaporation``: Daily evaporation in mm/day
    - ``surface_area``: Reservoir surface area in km²
    - ``nssc``: Normalized suspended sediment concentration variant 1
      (red/green) (dimensionless)
    - ``nssc2``: Normalized suspended sediment concentration variant 2
      (near-infrared/red) (dimensionless)

    Parameters
    ----------
    variable_info : dict
        Dictionary of input series metadata.
        Each key corresponds to a variable (``inflow``, ``outflow``,
        ``evaporation``, ``surface_area``, ``nssc``, ``nssc2``).
        Each value is a dict with the following structure::

            {
                "path": str,         # Path to the CSV file
                "time_column": str,  # Name of the datetime column
                "data_column": str   # Name of the variable column
            }

        Example::

            {
                "inflow": {"path": "data/inflow.csv", "time_column": "date", "data_column": "inflow (m3/d)"},
                "outflow": {"path": "data/outflow.csv", "time_column": "date", "data_column": "outflow (m3/d)"}
            }

    observation_intervals : list of list of int
        List of [start_year, end_year] intervals to compute features over.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing as many rows as ``observation_intervals`` and
        columns corresponding to the computed reservoir dynamic features.
        Features of a variable that is absent from ``variable_info``, or whose
        computation failed, are NaN.

    Raises
    ------
    KeyError
        If an entry of ``variable_info`` lacks one of the ``"path"``,
        ``"time_column"`` or ``"data_column"`` keys.

    Warns
    -----
    RuntimeWarning
        If the features of a variable could not be computed (for example an
        unreadable file); that variable's features are then filled with NaN.

    Notes
    -----
    - All inflow/outflow metrics are converted to m³/s internally.
    - Surface area statistics are reported both for full record and clipped period.
    - NSSC statistics are dimensionless.
    - If a variable is missing in ``variable_info``, its corresponding features are NaN.
    """
    n_intervals = len(observation_intervals)
    per_variable_frames = []

    for var, feat_dict in VARIABLE_FEATURES.items():
        if var not in variable_info:
            per_variable_frames.append(_nan_feature_frame(feat_dict, n_intervals))
            continue

        # Looked up outside the ``try`` on purpose: a malformed spec is a
        # caller error and should surface as KeyError, not be masked as NaN.
        spec = variable_info[var]
        path = spec["path"]
        time_col = spec["time_column"]
        data_col = spec["data_column"]

        try:
            df_var = compute_ts_aggregates(
                ts_csv_path=path,
                time_column=time_col,
                value_column=data_col,
                feature_functions=feat_dict,
                intervals=observation_intervals,
            )
        except Exception as exc:
            # Best-effort: one bad series must not abort the whole feature
            # table. Report the failure and keep the expected columns (as NaN)
            # so the output schema does not depend on which inputs loaded.
            warnings.warn(
                f"Could not compute features for variable {var!r} from "
                f"{path!r}: {exc!r}. Filling its features with NaN.",
                RuntimeWarning,
                stacklevel=2,
            )
            df_var = _nan_feature_frame(feat_dict, n_intervals)

        per_variable_frames.append(df_var)

    # Column-wise concatenation aligns rows on the index.
    # NOTE(review): the NaN placeholders use a 0..n-1 index; this assumes
    # compute_ts_aggregates returns the same index -- confirm.
    return pd.concat(per_variable_frames, axis=1)