Coverage for src/stable_yield_lab/analytics/attribution.py: 67%

140 statements  

coverage.py v7.10.7, created at 2025-10-04 20:38 +0000

"""Portfolio return attribution using realised returns and weight schedules."""

from __future__ import annotations

from dataclasses import dataclass
import math
from typing import Any

import pandas as pd


@dataclass(frozen=True)
class AttributionResult:
    """Container holding attribution outputs for downstream reporting."""

    portfolio: dict[str, Any]  # headline NAV, total-return and realised-APY figures
    by_pool: pd.DataFrame  # per-pool contribution breakdown
    by_window: pd.DataFrame  # per-rebalance-window contribution breakdown
    period_returns: pd.Series  # realised portfolio return for each period


def _ensure_datetime_index(index: pd.Index, *, label: str) -> pd.DatetimeIndex:
    """Return a datetime index, coercing the input when possible."""

    if isinstance(index, pd.DatetimeIndex):  # coverage: condition was always true
        dt_index = index
    else:
        dt_index = pd.to_datetime(index, utc=True, errors="coerce")
    if dt_index.isna().any():  # coverage: condition was never true
        raise TypeError(f"{label} index must be datetime-like")
    return pd.DatetimeIndex(dt_index).sort_values()


def _infer_periods_per_year(index: pd.DatetimeIndex) -> float:
    """Infer the periodicity of the return series expressed as periods per year."""

    if len(index) < 2:
        return 1.0
    diffs = index.to_series().diff().dropna().dt.total_seconds()
    mean_seconds = float(diffs.mean()) if not diffs.empty else 0.0
    if mean_seconds <= 0:
        return 1.0
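    # Illustrative check of the conversion below: daily observations have a mean
    # spacing of about 86_400 seconds, so 31_557_600 / 86_400 ≈ 365.25 periods per year.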
    seconds_per_year = 365.25 * 24 * 3600
    return seconds_per_year / mean_seconds


def _prepare_weight_schedule(
    weight_schedule: pd.DataFrame | pd.Series,
    returns_index: pd.DatetimeIndex,
    columns: list[str],
) -> tuple[pd.DataFrame, pd.Series]:
    """Align the weight schedule to the return index and compute window labels."""

    if isinstance(weight_schedule, pd.Series):  # coverage: condition was never true
        schedule = pd.DataFrame([weight_schedule], index=[returns_index[0]])
    else:
        schedule = weight_schedule.copy()
        if "timestamp" in schedule.columns and not isinstance(schedule.index, pd.DatetimeIndex):  # coverage: condition was never true
            schedule = schedule.set_index("timestamp")

    if schedule.empty:  # coverage: condition was never true
        raise ValueError("weight_schedule must contain at least one row")

    schedule.index = _ensure_datetime_index(schedule.index, label="weight_schedule")
    schedule = schedule.loc[~schedule.index.duplicated(keep="last")]
    schedule = schedule.sort_index()
    schedule = schedule.reindex(columns=columns).fillna(0.0)

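    # Forward-fill each return timestamp with the most recent rebalance weights;
    # timestamps before the first schedule entry stay NaN and are rejected below.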
    aligned = schedule.reindex(returns_index, method="ffill")
    if aligned.isna().any().any():  # coverage: condition was never true
        raise ValueError("weight_schedule does not cover the full return history")

    window_labels = pd.Series(schedule.index, index=schedule.index)
    window_labels = window_labels.reindex(returns_index, method="ffill")
    if window_labels.isna().any():  # coverage: condition was never true
        raise ValueError("weight_schedule does not cover the full return history")

    return aligned, window_labels


def compute_attribution(
    returns: pd.DataFrame,
    weight_schedule: pd.DataFrame | pd.Series | None,
    *,
    periods_per_year: float | None = None,
    initial_nav: float = 1.0,
) -> AttributionResult:
    """Compute realised performance attribution by pool and rebalance window.

    Parameters
    ----------
    returns:
        DataFrame of periodic simple returns expressed as decimal fractions. Rows
        represent timestamps and columns represent pools.
    weight_schedule:
        Target weights per pool, given as a wide DataFrame or a Series indexed by
        rebalance timestamps. ``None`` falls back to equal weights across the
        available pools.
    periods_per_year:
        Annualisation factor used to convert the realised total return into an APY.
        If omitted, the value is inferred from the timestamp spacing.
    initial_nav:
        Starting capital used to scale contributions.

    Notes
    -----
    For each period :math:`t` the capital change attributed to pool :math:`i`
    is ``ΔNAV_{i,t} = NAV_{t-1} · w_{i,t} · r_{i,t}``, where ``w`` are the
    schedule weights and ``r`` are realised simple returns. Pool-level
    contributions normalise the sum of these capital changes by the initial
    capital ``NAV_0``, yielding an additive decomposition of the total realised
    simple return. Rebalance-window contributions aggregate ``ΔNAV`` over each
    interval defined by the weight schedule. The realised APY is computed from
    the geometric growth factor ``G = NAV_T / NAV_0`` via ``APY = G^{P/T} - 1``,
    where ``P`` denotes ``periods_per_year`` and ``T`` the number of observed
    periods. Contribution shares scale this APY by each component's share of the
    total simple return.
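
    Examples
    --------
    A minimal illustrative sketch; the pool names, returns and weights below are
    toy values chosen only to demonstrate the call.

    >>> import pandas as pd
    >>> idx = pd.date_range("2024-01-01", periods=3, freq="D", tz="UTC")
    >>> rets = pd.DataFrame(
    ...     {"pool_a": [0.010, 0.000, 0.020], "pool_b": [0.000, 0.010, 0.010]}, index=idx
    ... )
    >>> weights = pd.DataFrame({"pool_a": [0.5], "pool_b": [0.5]}, index=idx[:1])
    >>> result = compute_attribution(rets, weights, periods_per_year=365.0)
    >>> list(result.by_pool["pool"])
    ['pool_a', 'pool_b']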

    """

    if returns.empty:  # coverage: condition was never true
        empty = pd.DataFrame(
            columns=[
                "pool",
                "avg_weight",
                "nav_contribution",
                "return_contribution",
                "return_share",
                "apy_contribution",
            ]
        )
        windows = pd.DataFrame(
            columns=[
                "window_start",
                "window_end",
                "periods",
                "start_nav",
                "end_nav",
                "nav_change",
                "window_return",
                "return_contribution",
                "return_share",
                "apy_contribution",
                "window_apy",
            ]
        )
        portfolio = {
            "initial_nav": float(initial_nav),
            "final_nav": float(initial_nav),
            "total_return": 0.0,
            "realized_apy": 0.0,
            "periods": 0,
            "periods_per_year": periods_per_year or float("nan"),
        }
        return AttributionResult(
            portfolio=portfolio,
            by_pool=empty,
            by_window=windows,
            period_returns=pd.Series(dtype=float),
        )

    initial_value = float(initial_nav)
    returns = returns.copy()
    returns_index = _ensure_datetime_index(returns.index, label="returns")
    returns.index = returns_index
    returns = returns.sort_index()
    columns = list(returns.columns)
    if weight_schedule is None:  # coverage: condition was never true
        if not columns:
            raise ValueError("returns must contain columns when weight_schedule is None")
        weight_schedule = pd.Series(1.0 / len(columns), index=columns)
    aligned_weights, window_labels = _prepare_weight_schedule(
        weight_schedule, returns_index, columns
    )

    weight_sums = aligned_weights.sum(axis=1)
    if (weight_sums <= 0).any():  # coverage: condition was never true
        raise ValueError("weight_schedule rows must sum to a positive value")
    norm_weights = aligned_weights.div(weight_sums, axis=0).fillna(0.0)

    clean_returns = returns.fillna(0.0).astype(float)
    norm_weights = norm_weights.astype(float)

    nav = initial_value
    pool_nav_contrib = pd.Series(0.0, index=columns, dtype=float)
    weight_accum = pd.Series(0.0, index=columns, dtype=float)
    window_stats: dict[pd.Timestamp, dict[str, Any]] = {}
    period_returns = []

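    # Walk the periods chronologically, compounding NAV and attributing each
    # period's capital change to pools and to the active rebalance window.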
    for timestamp in clean_returns.index:
        nav_prev = nav
        weights_row = norm_weights.loc[timestamp].reindex(columns).fillna(0.0)
        returns_row = clean_returns.loc[timestamp].reindex(columns).fillna(0.0)

        period_ret = float((weights_row * returns_row).sum())
        period_returns.append(period_ret)
        delta_nav_by_pool = nav_prev * weights_row * returns_row
        delta_nav = float(delta_nav_by_pool.sum())
        nav = nav_prev + delta_nav

        pool_nav_contrib += delta_nav_by_pool
        weight_accum += weights_row

        window_key = pd.Timestamp(window_labels.loc[timestamp])
        stats = window_stats.get(window_key)
        if stats is None:
            stats = {
                "window_start": window_key,
                "window_end": timestamp,
                "start_nav": nav_prev,
                "end_nav": nav,
                "nav_change": 0.0,
                "periods": 0,
            }
            window_stats[window_key] = stats
        stats["window_end"] = timestamp
        stats["end_nav"] = nav
        stats["nav_change"] = float(stats["nav_change"]) + delta_nav
        stats["periods"] = int(stats["periods"]) + 1

    total_return = (nav / initial_value) - 1.0
    periods = len(clean_returns)
    if periods_per_year is None:  # coverage: condition was never true
        periods_per_year = _infer_periods_per_year(returns_index)
    if periods_per_year <= 0:  # coverage: condition was never true
        raise ValueError("periods_per_year must be positive")

    horizon_years = periods / periods_per_year if periods_per_year else float("nan")
    growth = nav / float(initial_nav)
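    # Illustrative numbers for the annualisation below: growth of 1.05 over 26
    # observed periods at 52 periods/year gives 1.05 ** (52 / 26) - 1 ≈ 10.25% APY.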
    realized_apy = (growth ** (periods_per_year / periods)) - 1.0 if periods > 0 else 0.0

    contrib_returns = pool_nav_contrib / initial_value
    close_to_zero = math.isclose(total_return, 0.0, abs_tol=1e-12)
    if close_to_zero:  # coverage: condition was never true
        return_share = contrib_returns * 0.0
    else:
        return_share = contrib_returns / total_return
    apy_contrib = return_share * realized_apy
    avg_weight = weight_accum / periods

    by_pool = pd.DataFrame(
        {
            "pool": columns,
            "avg_weight": avg_weight.values,
            "nav_contribution": pool_nav_contrib.values,
            "return_contribution": contrib_returns.values,
            "return_share": return_share.values,
            "apy_contribution": apy_contrib.values,
        }
    )

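    # Summarise each rebalance window: NAV change, simple return, share of the
    # total return and an annualised window APY.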
    window_records: list[dict[str, Any]] = []
    for key in sorted(window_stats.keys()):
        stats = window_stats[key]
        start_nav = float(stats["start_nav"])
        nav_change = float(stats["nav_change"])
        end_nav = float(stats["end_nav"])
        periods_in_window = int(stats["periods"])
        window_return = nav_change / start_nav if start_nav else 0.0
        return_contribution = nav_change / initial_value
        if close_to_zero:  # coverage: condition was never true
            window_share = 0.0
        else:
            window_share = return_contribution / total_return
        if periods_in_window > 0:  # coverage: condition was always true
            window_apy = ((1.0 + window_return) ** (periods_per_year / periods_in_window)) - 1.0
        else:
            window_apy = float("nan")
        window_records.append(
            {
                "window_start": stats["window_start"],
                "window_end": stats["window_end"],
                "periods": periods_in_window,
                "start_nav": start_nav,
                "end_nav": end_nav,
                "nav_change": nav_change,
                "window_return": window_return,
                "return_contribution": return_contribution,
                "return_share": window_share,
                "apy_contribution": window_share * realized_apy,
                "window_apy": window_apy,
            }
        )

    by_window = pd.DataFrame(window_records)

    portfolio_summary = {
        "initial_nav": initial_value,
        "final_nav": nav,
        "total_return": total_return,
        "realized_apy": realized_apy,
        "periods": periods,
        "periods_per_year": periods_per_year,
        "horizon_years": horizon_years,
    }

    return AttributionResult(
        portfolio=portfolio_summary,
        by_pool=by_pool,
        by_window=by_window,
        period_returns=pd.Series(period_returns, index=returns_index, name="portfolio_return"),
    )


def load_weight_schedule(path: str | bytes | Any) -> pd.DataFrame:
    """Load a weight schedule from CSV in either wide or long format."""
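
    # Illustrative accepted layouts (pool names here are made-up examples):
    #   long:   timestamp,name,weight          wide:   timestamp,pool_a,pool_b
    #           2024-01-01,pool_a,0.6                  2024-01-01,0.6,0.4
    #           2024-01-01,pool_b,0.4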
    df = pd.read_csv(path)
    if "timestamp" not in df.columns:
        raise ValueError("weight schedule CSV must contain a 'timestamp' column")
    if {"name", "weight"}.issubset(df.columns):
        schedule = df.pivot(index="timestamp", columns="name", values="weight")
    else:
        schedule = df.set_index("timestamp")
    schedule.index = pd.to_datetime(schedule.index, utc=True)
    return schedule.sort_index()