Coverage for src/stable_yield_lab/reporting/__init__.py: 74% of 218 statements
coverage.py v7.10.7, created at 2025-10-04 20:38 +0000

from __future__ import annotations

from pathlib import Path
from typing import cast

import pandas as pd

from stable_yield_lab.core import PoolRepository

from ..analytics.metrics import Metrics

# Risk statistics appended to ``concentration.csv`` when realised returns are supplied.
_RISK_COLUMNS = [
    "sharpe_ratio",
    "sortino_ratio",
    "max_drawdown",
    "negative_period_share",
]


def _ensure_outdir(outdir: str | Path) -> Path:
    """Create ``outdir`` (including parents) if needed and return it as a ``Path``."""
    p = Path(outdir)
    p.mkdir(parents=True, exist_ok=True)
    return p


def _risk_metrics(series: pd.Series) -> dict[str, float]:
    """Compute Sharpe, Sortino, max drawdown, and the share of negative
    periods for a periodic return series."""
    metrics = {col: float("nan") for col in _RISK_COLUMNS}
    if series is None or series.empty:
        return metrics

    clean = series.dropna()
    if clean.empty:
        return metrics

    mean_return = float(clean.mean())
    std_return = float(clean.std(ddof=1))
    if std_return > 0.0:
        metrics["sharpe_ratio"] = mean_return / std_return

    downside = clean[clean < 0.0]
    if not downside.empty:
        downside_std = float((downside.pow(2).mean()) ** 0.5)
        if downside_std > 0.0:
            metrics["sortino_ratio"] = mean_return / downside_std

    nav = (1.0 + clean).cumprod()
    if not nav.empty:
        drawdown = nav / nav.cummax() - 1.0
        metrics["max_drawdown"] = float(drawdown.min())

    metrics["negative_period_share"] = float(clean.lt(0.0).mean())
    return metrics
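

# A minimal sketch (not part of the library) of how ``_risk_metrics`` behaves on a
# synthetic return series; the sample values below are invented for illustration.
def _example_risk_metrics() -> None:  # pragma: no cover - illustrative only
    sample = pd.Series([0.01, -0.005, 0.02, -0.01, 0.015])
    stats = _risk_metrics(sample)
    # Two of the five periods are negative, so the negative-period share is 0.4;
    # Sharpe divides the mean return by the sample std (ddof=1), while Sortino
    # divides the same mean by the root-mean-square of the negative periods only.
    assert abs(stats["negative_period_share"] - 0.4) < 1e-12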


def _weighted_portfolio_returns(returns: pd.DataFrame, weights: pd.Series) -> pd.Series:
    """Weighted portfolio returns, renormalising weights over the pools with data in each period."""
    if returns.empty or weights.empty:
        return pd.Series(dtype=float)

    aligned_weights = weights.reindex(returns.columns).fillna(0.0)
    aligned_weights = aligned_weights[aligned_weights > 0.0]
    if aligned_weights.empty:
        return pd.Series(dtype=float)

    subset = returns.loc[:, aligned_weights.index]
    if subset.empty:
        return pd.Series(dtype=float)

    weight_sum = (~subset.isna()).mul(aligned_weights, axis=1).sum(axis=1)
    weighted_sum = subset.mul(aligned_weights, axis=1).sum(axis=1, min_count=1)
    portfolio = weighted_sum / weight_sum
    return portfolio.dropna()
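

# A small sketch (not part of the library) showing the per-period weight
# renormalisation: when a pool has no observation, its weight is dropped for that
# period. Pool names and figures are invented.
def _example_weighted_portfolio_returns() -> None:  # pragma: no cover - illustrative only
    idx = pd.date_range("2024-01-01", periods=2, freq="D", tz="UTC")
    history = pd.DataFrame(
        {"pool_a": [0.01, float("nan")], "pool_b": [0.03, 0.02]}, index=idx
    )
    weights = pd.Series({"pool_a": 3.0, "pool_b": 1.0})
    portfolio = _weighted_portfolio_returns(history, weights)
    # Day 1: (0.01 * 3 + 0.03 * 1) / 4 = 0.015.
    # Day 2: pool_a is missing, so the full weight falls on pool_b -> 0.02.
    assert abs(portfolio.iloc[0] - 0.015) < 1e-12
    assert abs(portfolio.iloc[1] - 0.02) < 1e-12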


def _infer_periods_per_year(index: pd.DatetimeIndex) -> float | None:
    """Estimate the number of observations per year from the average index spacing."""
    if index.size < 2:
        return None

    ordered = index.sort_values()
    diffs = ordered.to_series().diff().dropna()
    if diffs.empty:
        return None

    avg_days = diffs.dt.total_seconds().mean() / 86_400.0
    if avg_days and avg_days > 0.0:
        return 365.25 / avg_days
    return None
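

# Sketch (not part of the library): a strictly daily index has an average spacing
# of exactly one day, so the inferred frequency is 365.25 periods per year.
def _example_infer_periods_per_year() -> None:  # pragma: no cover - illustrative only
    idx = pd.date_range("2024-01-01", periods=30, freq="D", tz="UTC")
    ppy = _infer_periods_per_year(idx)
    assert ppy is not None and abs(ppy - 365.25) < 1e-9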


def _annualized_return(series: pd.Series) -> float | None:
    """Annualise a periodic return series by geometric compounding, or ``None`` when impossible."""
    if series.empty:
        return None
    index = series.index
    if not isinstance(index, pd.DatetimeIndex):
        return None
    periods_per_year = _infer_periods_per_year(index)
    if periods_per_year is None:
        return None

    growth = float((1.0 + series).prod())
    periods = series.shape[0]
    if growth <= 0.0 or periods <= 0:
        return None

    return float(growth ** (periods_per_year / periods) - 1.0)
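

# Sketch (not part of the library): twelve weekly returns of 0.1% infer roughly
# 365.25 / 7 ~= 52.18 periods per year, so the annualised figure is about
# 1.001 ** 52.18 - 1 ~= 5.35%.
def _example_annualized_return() -> None:  # pragma: no cover - illustrative only
    idx = pd.date_range("2024-01-07", periods=12, freq="W", tz="UTC")
    weekly = pd.Series(0.001, index=idx)
    result = _annualized_return(weekly)
    assert result is not None and 0.050 < result < 0.057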


def _tvl_weighted_average(values: pd.Series, weights: pd.Series) -> float:
    """Weighted mean of ``values`` over positive, non-null ``weights``; NaN when none qualify."""
    mask = (~values.isna()) & (~weights.isna()) & (weights > 0.0)
    if not mask.any():
        return float("nan")

    vals = values.loc[mask].astype(float).tolist()
    wts = weights.loc[mask].astype(float).tolist()
    return Metrics.weighted_mean(vals, wts)
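

# Sketch (not part of the library): entries with a zero weight or a missing value
# are excluded before delegating to ``Metrics.weighted_mean``. The figures are
# invented, and the expected result assumes the usual sum(v * w) / sum(w) definition.
def _example_tvl_weighted_average() -> None:  # pragma: no cover - illustrative only
    apys = pd.Series([0.05, 0.08, float("nan")])
    tvls = pd.Series([1_000_000.0, 0.0, 500_000.0])
    # Only the first entry has both a value and a positive weight, so the
    # weighted average collapses to 0.05.
    assert abs(_tvl_weighted_average(apys, tvls) - 0.05) < 1e-12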


def cross_section_report(
    repo: PoolRepository,
    outdir: str | Path,
    *,
    perf_fee_bps: float = 0.0,
    mgmt_fee_bps: float = 0.0,
    top_n: int = 20,
    returns: pd.DataFrame | None = None,
    realised_apy_lookback_days: int | None = 365,
    realised_apy_min_observations: int = 5,
) -> dict[str, Path]:
    """Generate file-first CSV outputs for the given snapshot repository.

    Parameters
    ----------
    repo:
        Snapshot repository describing pools at a point in time.
    outdir:
        Directory where CSV reports are written.
    perf_fee_bps, mgmt_fee_bps:
        Fee assumptions applied before writing ``net_apy`` values.
    top_n:
        Number of pools to include in ``topN.csv``.
    returns:
        Optional wide DataFrame of realised periodic returns (index timestamp,
        columns pool name). When provided, the ``concentration.csv`` output
        includes Sharpe ratio, Sortino ratio, maximum drawdown, and the share of
        negative periods for each pool and aggregate grouping.
    realised_apy_lookback_days:
        Number of trailing days considered when computing realised APY. Set to
        ``None`` to use the full history available.
    realised_apy_min_observations:
        Minimum number of non-null observations required before a realised APY
        is calculated. Pools with fewer observations emit a warning.

    Returns
    -------
    dict[str, Path]
        Mapping of report label to the written CSV path.

    Writes the following CSVs:
    - pools.csv: all pools with net_apy column
    - by_chain.csv: aggregated by chain with TVL-weighted APY
    - by_source.csv: aggregated by source (protocol)
    - by_stablecoin.csv: aggregated by stablecoin symbol
    - topN.csv: top-N pools by base_apy
    - concentration.csv: HHI metrics across chain and stablecoin, augmented
      with realised risk statistics when ``returns`` are supplied
    - warnings.csv: per-pool realised APY warnings and observation counts
    """
    out = _ensure_outdir(outdir)
    paths: dict[str, Path] = {}

    df = repo.to_dataframe()
    df = Metrics.add_net_apy_column(df, perf_fee_bps=perf_fee_bps, mgmt_fee_bps=mgmt_fee_bps)

    # Realised-APY columns start as NaN/NA and are filled in below when history is supplied.
    metrics_index = pd.Index(df["name"], name="name") if not df.empty else pd.Index([], name="name")
    realised_metrics = pd.DataFrame(index=metrics_index)
    realised_metrics["realised_apy"] = pd.Series(float("nan"), index=metrics_index, dtype=float)
    realised_metrics["realised_apy_observations"] = pd.Series(
        pd.NA, index=metrics_index, dtype="Int64"
    )
    realised_metrics["realised_apy_warning"] = pd.Series(pd.NA, index=metrics_index, dtype="string")
    warnings_records: list[dict[str, object]] = []

    metrics_map: dict[str, dict[str, float]] = {}
    if returns is not None and not returns.empty and not df.empty:
        # Normalise the history: UTC timestamps, numeric values, columns aligned
        # to the snapshot's pool names.
        returns_df = returns.copy()
        if isinstance(returns_df, pd.Series):
            returns_df = returns_df.to_frame()
        returns_df.index = pd.to_datetime(returns_df.index, utc=True, errors="coerce")
        returns_df = returns_df.loc[~returns_df.index.isna()]
        returns_df = returns_df.sort_index()
        returns_df = returns_df.apply(pd.to_numeric, errors="coerce")
        returns_df = returns_df.reindex(columns=metrics_index)
        returns_df = returns_df.dropna(how="all")

        if (
            realised_apy_lookback_days is not None
            and realised_apy_lookback_days > 0
            and not returns_df.empty
        ):
            cutoff = returns_df.index.max() - pd.Timedelta(days=int(realised_apy_lookback_days))
            returns_df = returns_df.loc[returns_df.index >= cutoff]
            returns_df = returns_df.dropna(how="all")

        metadata = df.set_index("name")
        min_required = max(1, realised_apy_min_observations)

        for pool_name in metrics_index:
            series = returns_df.get(pool_name, pd.Series(dtype=float))
            series = series.dropna()
            observations = int(series.shape[0])
            warning: str | None = None
            realised_value = float("nan")

            if observations < min_required:
                shortfall_text = (
                    f"Fewer than {min_required} observations available "
                    f"({observations} found)"
                )
                if realised_apy_lookback_days is None or realised_apy_lookback_days <= 0:
                    warning = shortfall_text
                else:
                    warning = (
                        f"{shortfall_text} within the past {int(realised_apy_lookback_days)} days"
                    )
            else:
                realised = _annualized_return(series)
                if realised is None:
                    warning = "Unable to annualise returns with available history"
                else:
                    realised_value = realised

            realised_metrics.loc[pool_name, "realised_apy"] = realised_value
            realised_metrics.loc[pool_name, "realised_apy_observations"] = observations
            if warning is not None:
                realised_metrics.loc[pool_name, "realised_apy_warning"] = warning
                warnings_records.append(
                    {
                        "pool": pool_name,
                        "observations": observations,
                        "message": warning,
                    }
                )

        if not returns_df.empty:
            # TVL-weighted aggregate risk metrics for the total portfolio plus
            # per-chain, per-stablecoin, and per-pool scopes.
            weights = metadata.get("tvl_usd", pd.Series(dtype=float)).astype(float)
            returns_aligned = returns_df.reindex(columns=metadata.index)

            total_series = _weighted_portfolio_returns(returns_aligned, weights)
            metrics_map["total"] = _risk_metrics(total_series)

            for chain, names in metadata.groupby("chain").groups.items():
                chain_weights = weights.loc[list(names)]
                series = _weighted_portfolio_returns(returns_aligned, chain_weights)
                metrics_map[f"chain:{chain}"] = _risk_metrics(series)

            for stable, names in metadata.groupby("stablecoin").groups.items():
                stable_weights = weights.loc[list(names)]
                series = _weighted_portfolio_returns(returns_aligned, stable_weights)
                metrics_map[f"stablecoin:{stable}"] = _risk_metrics(series)

            for pool_name in returns_aligned.columns:
                metrics_map[f"pool:{pool_name}"] = _risk_metrics(returns_aligned[pool_name])

    df = df.join(realised_metrics, on="name")
    if "realised_apy" not in df.columns:
        df["realised_apy"] = pd.Series(dtype=float)
    if "realised_apy_observations" not in df.columns:
        df["realised_apy_observations"] = pd.Series(dtype="Int64")
    if "realised_apy_warning" not in df.columns:
        df["realised_apy_warning"] = pd.Series(dtype="string")

    paths["pools"] = out / "pools.csv"
    df.to_csv(paths["pools"], index=False)

    warnings_df = pd.DataFrame(warnings_records, columns=["pool", "observations", "message"])
    if warnings_df.empty:
        warnings_df = pd.DataFrame(columns=["pool", "observations", "message"])
    else:
        warnings_df["observations"] = warnings_df["observations"].astype("Int64")
        warnings_df["message"] = warnings_df["message"].astype("string")
    paths["warnings"] = out / "warnings.csv"
    warnings_df.to_csv(paths["warnings"], index=False)

    # Aggregations
    by_chain = Metrics.groupby_chain(repo)
    chain_realised = pd.DataFrame(
        columns=["chain", "realised_apy_avg", "realised_apy_wavg", "realised_apy_observations"]
    )
    if not df.empty:
        chain_realised = (
            df.groupby("chain")
            .agg(
                realised_apy_avg=("realised_apy", "mean"),
                realised_apy_wavg=(
                    "realised_apy",
                    lambda x: _tvl_weighted_average(x, df.loc[x.index, "tvl_usd"]),
                ),
                realised_apy_observations=(
                    "realised_apy_observations",
                    lambda x: int(x.fillna(0).sum()),
                ),
            )
            .reset_index()
        )
    chain_realised = chain_realised.astype({"realised_apy_observations": "Int64"}, errors="ignore")
    if by_chain.empty:
        by_chain = chain_realised
    else:
        by_chain = by_chain.merge(chain_realised, on="chain", how="left")
    paths["by_chain"] = out / "by_chain.csv"
    by_chain.to_csv(paths["by_chain"], index=False)

    def _agg(frame: pd.DataFrame, key: str) -> pd.DataFrame:
        if frame.empty:
            return frame
        g = frame.groupby(key).agg(
            pools=("name", "count"),
            tvl=("tvl_usd", "sum"),
            apr_avg=("base_apy", "mean"),
            apr_wavg=(
                "base_apy",
                lambda x: (x * frame.loc[x.index, "tvl_usd"]).sum()
                / frame.loc[x.index, "tvl_usd"].sum(),
            ),
        )
        return g.reset_index()

    by_source = _agg(df, "source")
    by_stable = _agg(df, "stablecoin")
    paths["by_source"] = out / "by_source.csv"
    paths["by_stablecoin"] = out / "by_stablecoin.csv"
    by_source.to_csv(paths["by_source"], index=False)
    by_stable.to_csv(paths["by_stablecoin"], index=False)

    # Top N
    top = df.sort_values("base_apy", ascending=False).head(top_n)
    paths["topN"] = out / "topN.csv"
    top.to_csv(paths["topN"], index=False)

    # Concentration metrics enriched with realised risk metrics
    hhi_total = Metrics.hhi(df, value_col="tvl_usd")
    hhi_chain = Metrics.hhi(df, value_col="tvl_usd", group_col="chain")
    hhi_stable = Metrics.hhi(df, value_col="tvl_usd", group_col="stablecoin")

    def _metrics_for(scope: str) -> dict[str, float]:
        values = metrics_map.get(scope)
        if not values:
            return {col: float("nan") for col in _RISK_COLUMNS}
        return {col: values.get(col, float("nan")) for col in _RISK_COLUMNS}

    conc_records: list[dict[str, float | str]] = []

    total_hhi = float(hhi_total["hhi"].iloc[0]) if not hhi_total.empty else float("nan")
    record = cast(dict[str, float | str], {"scope": "total", "hhi": total_hhi})
    record.update(_metrics_for("total"))
    conc_records.append(record)

    if not hhi_chain.empty:
        for _, row in hhi_chain.iterrows():
            scope = f"chain:{row['chain']}"
            record = cast(dict[str, float | str], {"scope": scope, "hhi": float(row["hhi"])})
            record.update(_metrics_for(scope))
            conc_records.append(record)

    if not hhi_stable.empty:
        for _, row in hhi_stable.iterrows():
            scope = f"stablecoin:{row['stablecoin']}"
            record = cast(dict[str, float | str], {"scope": scope, "hhi": float(row["hhi"])})
            record.update(_metrics_for(scope))
            conc_records.append(record)

    existing_scopes = {rec["scope"] for rec in conc_records}
    for scope, values in metrics_map.items():
        if scope in existing_scopes:
            continue
        record = cast(dict[str, float | str], {"scope": scope, "hhi": float("nan")})
        record.update({col: values.get(col, float("nan")) for col in _RISK_COLUMNS})
        conc_records.append(record)

    conc_all = pd.DataFrame(conc_records)
    paths["concentration"] = out / "concentration.csv"
    conc_all.to_csv(paths["concentration"], index=False)

    return paths
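

# Sketch of a typical invocation (not part of the module), assuming ``repo`` is an
# already populated PoolRepository and ``history`` is a wide frame of periodic
# returns (UTC timestamps as index, pool names as columns). The fee figures and
# output directory are invented for illustration.
def _example_cross_section_report(
    repo: PoolRepository, history: pd.DataFrame
) -> None:  # pragma: no cover - illustrative only
    paths = cross_section_report(
        repo,
        "reports/latest",
        perf_fee_bps=1000.0,  # 10% performance fee
        mgmt_fee_bps=50.0,  # 0.5% management fee
        top_n=10,
        returns=history,
        realised_apy_lookback_days=90,
        realised_apy_min_observations=4,
    )
    # Each label ("pools", "by_chain", ..., "concentration") maps to the CSV
    # written under reports/latest.
    for label, path in paths.items():
        print(label, path)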