# src/stable_yield_lab/reporting/__init__.py

from __future__ import annotations

from pathlib import Path
from typing import cast

import pandas as pd

from stable_yield_lab.core import PoolRepository

from ..analytics.metrics import Metrics

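# Realised risk statistics reported for each scope in ``concentration.csv``.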
_RISK_COLUMNS = [
    "sharpe_ratio",
    "sortino_ratio",
    "max_drawdown",
    "negative_period_share",
]


def _ensure_outdir(outdir: str | Path) -> Path:
    p = Path(outdir)
    p.mkdir(parents=True, exist_ok=True)
    return p


def _risk_metrics(series: pd.Series | None) -> dict[str, float]:
    metrics = {col: float("nan") for col in _RISK_COLUMNS}
    if series is None or series.empty:
        return metrics

    clean = series.dropna()
    if clean.empty:
        return metrics

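    # Sharpe and Sortino are per-period ratios here: mean return over
    # volatility, with no annualisation applied.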
    mean_return = float(clean.mean())
    std_return = float(clean.std(ddof=1))
    if std_return > 0.0:
        metrics["sharpe_ratio"] = mean_return / std_return

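    # Downside deviation: the root mean square of the negative returns only.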
    downside = clean[clean < 0.0]
    if not downside.empty:
        downside_std = float((downside.pow(2).mean()) ** 0.5)
        if downside_std > 0.0:
            metrics["sortino_ratio"] = mean_return / downside_std

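    # Maximum drawdown is read off the NAV path implied by compounding the
    # period returns.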
    nav = (1.0 + clean).cumprod()
    if not nav.empty:
        drawdown = nav / nav.cummax() - 1.0
        metrics["max_drawdown"] = float(drawdown.min())

    metrics["negative_period_share"] = float(clean.lt(0.0).mean())
    return metrics


def _weighted_portfolio_returns(returns: pd.DataFrame, weights: pd.Series) -> pd.Series:
    if returns.empty or weights.empty:
        return pd.Series(dtype=float)

    aligned_weights = weights.reindex(returns.columns).fillna(0.0)
    aligned_weights = aligned_weights[aligned_weights > 0.0]
    if aligned_weights.empty:
        return pd.Series(dtype=float)

    subset = returns.loc[:, aligned_weights.index]
    if subset.empty:
        return pd.Series(dtype=float)

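    # Renormalise the weights each period over the pools that actually have
    # data, so missing observations do not dilute the portfolio return.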
    weight_sum = (~subset.isna()).mul(aligned_weights, axis=1).sum(axis=1)
    weighted_sum = subset.mul(aligned_weights, axis=1).sum(axis=1, min_count=1)
    portfolio = weighted_sum / weight_sum
    return portfolio.dropna()


def _infer_periods_per_year(index: pd.DatetimeIndex) -> float | None:
    if index.size < 2:
        return None

    ordered = index.sort_values()
    diffs = ordered.to_series().diff().dropna()
    if diffs.empty:
        return None

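    # Convert the mean spacing between observations (in days) into an
    # approximate number of periods per 365.25-day year.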
    avg_days = diffs.dt.total_seconds().mean() / 86_400.0
    if avg_days and avg_days > 0.0:
        return 365.25 / avg_days
    return None


def _annualized_return(series: pd.Series) -> float | None:
    if series.empty:
        return None
    index = series.index
    if not isinstance(index, pd.DatetimeIndex):
        return None
    periods_per_year = _infer_periods_per_year(index)
    if periods_per_year is None:
        return None

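    # Geometric annualisation: total compounded growth raised to the power
    # (periods per year / periods observed), minus one.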
    growth = float((1.0 + series).prod())
    periods = series.shape[0]
    if growth <= 0.0 or periods <= 0:
        return None

    return float(growth ** (periods_per_year / periods) - 1.0)


def _tvl_weighted_average(values: pd.Series, weights: pd.Series) -> float:
    mask = (~values.isna()) & (~weights.isna()) & (weights > 0.0)
    if not mask.any():
        return float("nan")

    vals = values.loc[mask].astype(float).tolist()
    wts = weights.loc[mask].astype(float).tolist()
    return Metrics.weighted_mean(vals, wts)


def cross_section_report(
    repo: PoolRepository,
    outdir: str | Path,
    *,
    perf_fee_bps: float = 0.0,
    mgmt_fee_bps: float = 0.0,
    top_n: int = 20,
    returns: pd.DataFrame | None = None,
    realised_apy_lookback_days: int | None = 365,
    realised_apy_min_observations: int = 5,
) -> dict[str, Path]:
    """Generate file-first CSV outputs for the given snapshot repository.

    Parameters
    ----------
    repo:
        Snapshot repository describing pools at a point in time.
    outdir:
        Directory where CSV reports are written.
    perf_fee_bps, mgmt_fee_bps:
        Fee assumptions applied before writing ``net_apy`` values.
    top_n:
        Number of pools to include in ``topN.csv``.
    returns:
        Optional wide DataFrame of realised periodic returns (index timestamp,
        columns pool name). When provided, the ``concentration.csv`` output
        includes Sharpe ratio, Sortino ratio, maximum drawdown, and the share of
        negative periods for each pool and aggregate grouping.
    realised_apy_lookback_days:
        Number of trailing days considered when computing realised APY. Set to
        ``None`` to use the full history available.
    realised_apy_min_observations:
        Minimum number of non-null observations required before a realised APY
        is calculated. Pools with fewer observations emit a warning.

    Returns
    -------
    dict[str, Path]
        Mapping of report label to the written CSV path.

    Writes the following CSVs:
    - pools.csv: all pools with net_apy column
    - by_chain.csv: aggregated by chain with TVL-weighted APY
    - by_source.csv: aggregated by source (protocol)
    - by_stablecoin.csv: aggregated by stablecoin symbol
    - topN.csv: top-N pools by base_apy
    - concentration.csv: HHI metrics across chain and stablecoin augmented
      with realised risk statistics when ``returns`` are supplied
    - warnings.csv: per-pool realised APY warnings and observation counts

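    Examples
    --------
    Illustrative sketch only; assumes ``repo`` already holds snapshot rows::

        paths = cross_section_report(repo, "reports", top_n=10)
        print(paths["pools"])  # reports/pools.csv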
166 """ 

    out = _ensure_outdir(outdir)
    paths: dict[str, Path] = {}

    df = repo.to_dataframe()
    df = Metrics.add_net_apy_column(df, perf_fee_bps=perf_fee_bps, mgmt_fee_bps=mgmt_fee_bps)

    metrics_index = pd.Index(df["name"], name="name") if not df.empty else pd.Index([], name="name")
    realised_metrics = pd.DataFrame(index=metrics_index)
    realised_metrics["realised_apy"] = pd.Series(float("nan"), index=metrics_index, dtype=float)
    realised_metrics["realised_apy_observations"] = pd.Series(
        pd.NA, index=metrics_index, dtype="Int64"
    )
    realised_metrics["realised_apy_warning"] = pd.Series(pd.NA, index=metrics_index, dtype="string")
    warnings_records: list[dict[str, object]] = []

    metrics_map: dict[str, dict[str, float]] = {}
    if returns is not None and not returns.empty and not df.empty:
        returns_df = returns.copy()

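        # Normalise the copy: coerce the index to UTC timestamps and the values
        # to numerics, then align columns to the snapshot's pool names.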
        if isinstance(returns_df, pd.Series):
            returns_df = returns_df.to_frame()
        returns_df.index = pd.to_datetime(returns_df.index, utc=True, errors="coerce")
        returns_df = returns_df.loc[~returns_df.index.isna()]
        returns_df = returns_df.sort_index()
        returns_df = returns_df.apply(pd.to_numeric, errors="coerce")
        returns_df = returns_df.reindex(columns=metrics_index)
        returns_df = returns_df.dropna(how="all")

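        # Restrict to the trailing lookback window, measured back from the most
        # recent observation.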
        if (
            realised_apy_lookback_days is not None
            and realised_apy_lookback_days > 0
            and not returns_df.empty
        ):
            cutoff = returns_df.index.max() - pd.Timedelta(days=int(realised_apy_lookback_days))
            returns_df = returns_df.loc[returns_df.index >= cutoff]
            returns_df = returns_df.dropna(how="all")

        metadata = df.set_index("name")
        min_required = max(1, realised_apy_min_observations)

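        # Compute a realised APY per pool, or record a warning when the history
        # is too short to annualise.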
        for pool_name in metrics_index:
            series = returns_df.get(pool_name, pd.Series(dtype=float))
            series = series.dropna()
            observations = int(series.shape[0])
            warning: str | None = None
            realised_value = float("nan")

            if observations < min_required:
                shortfall_text = (
                    f"Only {observations} observation(s) available; "
                    f"minimum {min_required} required"
                )
                if realised_apy_lookback_days is None or realised_apy_lookback_days <= 0:
                    warning = shortfall_text
                else:
                    warning = (
                        f"{shortfall_text} within the past {int(realised_apy_lookback_days)} days"
                    )
            else:
                realised = _annualized_return(series)
                if realised is None:
                    warning = "Unable to annualise returns with available history"
                else:
                    realised_value = realised

            realised_metrics.loc[pool_name, "realised_apy"] = realised_value
            realised_metrics.loc[pool_name, "realised_apy_observations"] = observations
            if warning is not None:
                realised_metrics.loc[pool_name, "realised_apy_warning"] = warning
                warnings_records.append(
                    {
                        "pool": pool_name,
                        "observations": observations,
                        "message": warning,
                    }
                )

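        # Portfolio-level risk metrics per scope; the "total", "chain:<name>",
        # "stablecoin:<name>", and "pool:<name>" keys are joined into
        # concentration.csv further down.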
        if not returns_df.empty:
            weights = metadata.get("tvl_usd", pd.Series(dtype=float)).astype(float)
            returns_aligned = returns_df.reindex(columns=metadata.index)

            total_series = _weighted_portfolio_returns(returns_aligned, weights)
            metrics_map["total"] = _risk_metrics(total_series)

            for chain, names in metadata.groupby("chain").groups.items():
                chain_weights = weights.loc[list(names)]
                series = _weighted_portfolio_returns(returns_aligned, chain_weights)
                metrics_map[f"chain:{chain}"] = _risk_metrics(series)

            for stable, names in metadata.groupby("stablecoin").groups.items():
                stable_weights = weights.loc[list(names)]
                series = _weighted_portfolio_returns(returns_aligned, stable_weights)
                metrics_map[f"stablecoin:{stable}"] = _risk_metrics(series)

            for pool_name in returns_aligned.columns:
                metrics_map[f"pool:{pool_name}"] = _risk_metrics(returns_aligned[pool_name])

    df = df.join(realised_metrics, on="name")
    if "realised_apy" not in df.columns:
        df["realised_apy"] = pd.Series(dtype=float)
    if "realised_apy_observations" not in df.columns:
        df["realised_apy_observations"] = pd.Series(dtype="Int64")
    if "realised_apy_warning" not in df.columns:
        df["realised_apy_warning"] = pd.Series(dtype="string")

    paths["pools"] = out / "pools.csv"
    df.to_csv(paths["pools"], index=False)

    warnings_df = pd.DataFrame(warnings_records, columns=["pool", "observations", "message"])
    if warnings_df.empty:
        warnings_df = pd.DataFrame(columns=["pool", "observations", "message"])
    else:
        warnings_df["observations"] = warnings_df["observations"].astype("Int64")
        warnings_df["message"] = warnings_df["message"].astype("string")
    paths["warnings"] = out / "warnings.csv"
    warnings_df.to_csv(paths["warnings"], index=False)

    # Aggregations
    by_chain = Metrics.groupby_chain(repo)
    chain_realised = pd.DataFrame(
        columns=["chain", "realised_apy_avg", "realised_apy_wavg", "realised_apy_observations"]
    )
    if not df.empty:
        chain_realised = (
            df.groupby("chain")
            .agg(
                realised_apy_avg=("realised_apy", "mean"),
                realised_apy_wavg=(
                    "realised_apy",
                    lambda x: _tvl_weighted_average(x, df.loc[x.index, "tvl_usd"]),
                ),
                realised_apy_observations=(
                    "realised_apy_observations",
                    lambda x: int(x.fillna(0).sum()),
                ),
            )
            .reset_index()
        )
    chain_realised = chain_realised.astype({"realised_apy_observations": "Int64"}, errors="ignore")
    if by_chain.empty:
        by_chain = chain_realised
    else:
        by_chain = by_chain.merge(chain_realised, on="chain", how="left")
    paths["by_chain"] = out / "by_chain.csv"
    by_chain.to_csv(paths["by_chain"], index=False)

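    # Group pools by ``key`` with pool count, total TVL, and simple plus
    # TVL-weighted average base APY.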
    def _agg(df: pd.DataFrame, key: str) -> pd.DataFrame:
        if df.empty:
            return df
        g = df.groupby(key).agg(
            pools=("name", "count"),
            tvl=("tvl_usd", "sum"),
            apr_avg=("base_apy", "mean"),
            apr_wavg=(
                "base_apy",
                lambda x: (x * df.loc[x.index, "tvl_usd"]).sum() / df.loc[x.index, "tvl_usd"].sum(),
            ),
        )
        return g.reset_index()

    by_source = _agg(df, "source")
    by_stable = _agg(df, "stablecoin")
    paths["by_source"] = out / "by_source.csv"
    paths["by_stablecoin"] = out / "by_stablecoin.csv"
    by_source.to_csv(paths["by_source"], index=False)
    by_stable.to_csv(paths["by_stablecoin"], index=False)

    # Top N
    top = df.sort_values("base_apy", ascending=False).head(top_n)
    paths["topN"] = out / "topN.csv"
    top.to_csv(paths["topN"], index=False)

    # Concentration metrics enriched with realised risk metrics
    hhi_total = Metrics.hhi(df, value_col="tvl_usd")
    hhi_chain = Metrics.hhi(df, value_col="tvl_usd", group_col="chain")
    hhi_stable = Metrics.hhi(df, value_col="tvl_usd", group_col="stablecoin")

    def _metrics_for(scope: str) -> dict[str, float]:
        values = metrics_map.get(scope)
        if not values:
            return {col: float("nan") for col in _RISK_COLUMNS}
        return {col: values.get(col, float("nan")) for col in _RISK_COLUMNS}

    conc_records: list[dict[str, float | str]] = []

    total_hhi = float(hhi_total["hhi"].iloc[0]) if not hhi_total.empty else float("nan")
    record = cast(dict[str, float | str], {"scope": "total", "hhi": total_hhi})
    record.update(_metrics_for("total"))
    conc_records.append(record)

    if not hhi_chain.empty:
        for _, row in hhi_chain.iterrows():
            scope = f"chain:{row['chain']}"
            record = cast(dict[str, float | str], {"scope": scope, "hhi": float(row["hhi"])})
            record.update(_metrics_for(scope))
            conc_records.append(record)

    if not hhi_stable.empty:
        for _, row in hhi_stable.iterrows():
            scope = f"stablecoin:{row['stablecoin']}"
            record = cast(dict[str, float | str], {"scope": scope, "hhi": float(row["hhi"])})
            record.update(_metrics_for(scope))
            conc_records.append(record)

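    # Scopes that carry risk metrics but no HHI row (the per-pool entries)
    # still receive a concentration record, with hhi left as NaN.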
    existing_scopes = {rec["scope"] for rec in conc_records}
    for scope, values in metrics_map.items():
        if scope in existing_scopes:
            continue
        record = cast(dict[str, float | str], {"scope": scope, "hhi": float("nan")})
        record.update({col: values.get(col, float("nan")) for col in _RISK_COLUMNS})
        conc_records.append(record)

    conc_all = pd.DataFrame(conc_records)
    paths["concentration"] = out / "concentration.csv"
    conc_all.to_csv(paths["concentration"], index=False)

    return paths