Coverage for src/stable_yield_lab/analytics/attribution.py: 67%
140 statements
coverage.py v7.10.7, created at 2025-10-04 20:38 +0000
1"""Portfolio return attribution using realised returns and weight schedules."""
3from __future__ import annotations
5from dataclasses import dataclass
6import math
7from typing import Any
9import pandas as pd
12@dataclass(frozen=True)
13class AttributionResult:
14 """Container holding attribution outputs for downstream reporting."""
16 portfolio: dict[str, Any]
17 by_pool: pd.DataFrame
18 by_window: pd.DataFrame
19 period_returns: pd.Series


def _ensure_datetime_index(index: pd.Index, *, label: str) -> pd.DatetimeIndex:
    """Return a datetime index, coercing the input when possible."""

    if isinstance(index, pd.DatetimeIndex):
        dt_index = index
    else:
        dt_index = pd.to_datetime(index, utc=True, errors="coerce")
    if dt_index.isna().any():
        raise TypeError(f"{label} index must be datetime-like")
    return pd.DatetimeIndex(dt_index).sort_values()


def _infer_periods_per_year(index: pd.DatetimeIndex) -> float:
    """Infer the periodicity of the return series expressed as periods per year."""

    if len(index) < 2:
        return 1.0
    diffs = index.to_series().diff().dropna().dt.total_seconds()
    mean_seconds = float(diffs.mean()) if not diffs.empty else 0.0
    if mean_seconds <= 0:
        return 1.0
    seconds_per_year = 365.25 * 24 * 3600
    return seconds_per_year / mean_seconds
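

# Illustrative sketch, not part of the original module: with evenly spaced
# daily timestamps the mean gap is 86_400 seconds, so the helper returns
# roughly 365.25 periods per year.
def _example_infer_periods_per_year() -> float:
    idx = pd.date_range("2024-01-01", periods=10, freq="D", tz="UTC")
    return _infer_periods_per_year(idx)  # ≈ 365.25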


def _prepare_weight_schedule(
    weight_schedule: pd.DataFrame | pd.Series,
    returns_index: pd.DatetimeIndex,
    columns: list[str],
) -> tuple[pd.DataFrame, pd.Series]:
    """Align the weight schedule to the return index and compute window labels."""

    if isinstance(weight_schedule, pd.Series):
        schedule = pd.DataFrame([weight_schedule], index=[returns_index[0]])
    else:
        schedule = weight_schedule.copy()
        if "timestamp" in schedule.columns and not isinstance(schedule.index, pd.DatetimeIndex):
            schedule = schedule.set_index("timestamp")

    if schedule.empty:
        raise ValueError("weight_schedule must contain at least one row")

    schedule.index = _ensure_datetime_index(schedule.index, label="weight_schedule")
    schedule = schedule.loc[~schedule.index.duplicated(keep="last")]
    schedule = schedule.sort_index()
    schedule = schedule.reindex(columns=columns).fillna(0.0)

    aligned = schedule.reindex(returns_index, method="ffill")
    if aligned.isna().any().any():
        raise ValueError("weight_schedule does not cover the full return history")

    window_labels = pd.Series(schedule.index, index=schedule.index)
    window_labels = window_labels.reindex(returns_index, method="ffill")
    if window_labels.isna().any():
        raise ValueError("weight_schedule does not cover the full return history")

    return aligned, window_labels
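

# Illustrative sketch, not part of the original module: a two-row schedule is
# forward-filled onto a daily return index, and the window labels record which
# rebalance timestamp governs each period. Pool names are hypothetical.
def _example_prepare_weight_schedule() -> tuple[pd.DataFrame, pd.Series]:
    idx = pd.date_range("2024-01-01", periods=4, freq="D", tz="UTC")
    schedule = pd.DataFrame(
        {"poolA": [1.0, 0.5], "poolB": [0.0, 0.5]},
        index=pd.DatetimeIndex([idx[0], idx[2]]),
    )
    # Periods 0-1 keep the first row's weights; periods 2-3 use the rebalanced row.
    return _prepare_weight_schedule(schedule, idx, ["poolA", "poolB"])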


def compute_attribution(
    returns: pd.DataFrame,
    weight_schedule: pd.DataFrame | pd.Series | None,
    *,
    periods_per_year: float | None = None,
    initial_nav: float = 1.0,
) -> AttributionResult:
88 """Compute realised performance attribution by pool and rebalance window.
90 Parameters
91 ----------
92 returns:
93 DataFrame of periodic simple returns expressed as decimal fractions. Rows
94 represent timestamps and columns represent pools.
95 weight_schedule:
96 Target weights per pool. A wide DataFrame or Series indexed by
97 rebalance timestamps. ``None`` falls back to equal weights across the
98 available pools.
99 periods_per_year:
100 Annualisation factor used to convert realised total return into APY. If
101 omitted the value is inferred from the timestamp spacing.
102 initial_nav:
103 Starting capital used to scale contributions.
105 Notes
106 -----
107 For each period :math:`t` the capital change attributed to pool :math:`i`
108 is ``ΔNAV_{i,t} = NAV_{t-1} · w_{i,t} · r_{i,t}``, where ``w`` are the
109 schedule weights and ``r`` are realised simple returns. Pool level
110 contributions normalise the sum of these capital changes by the initial
111 capital ``NAV_0`` yielding an additive decomposition of the total realised
112 simple return. Rebalance window contributions aggregate ``ΔNAV`` over each
113 interval defined by the weight schedule. The realised APY is computed from
114 the geometric growth factor ``G = NAV_T / NAV_0`` via ``APY = G^{P/T} - 1``
115 where ``P`` denotes ``periods_per_year`` and ``T`` the number of observed
116 periods. Contribution shares scale this APY by each component's share of the
117 total simple return.
118 """

    if returns.empty:
        empty = pd.DataFrame(
            columns=[
                "pool",
                "avg_weight",
                "nav_contribution",
                "return_contribution",
                "return_share",
                "apy_contribution",
            ]
        )
        windows = pd.DataFrame(
            columns=[
                "window_start",
                "window_end",
                "periods",
                "start_nav",
                "end_nav",
                "nav_change",
                "window_return",
                "return_contribution",
                "return_share",
                "apy_contribution",
                "window_apy",
            ]
        )
        portfolio = {
            "initial_nav": float(initial_nav),
            "final_nav": float(initial_nav),
            "total_return": 0.0,
            "realized_apy": 0.0,
            "periods": 0,
            "periods_per_year": periods_per_year or float("nan"),
        }
        return AttributionResult(
            portfolio=portfolio,
            by_pool=empty,
            by_window=windows,
            period_returns=pd.Series(dtype=float),
        )

    initial_value = float(initial_nav)
    returns = returns.copy()
    returns_index = _ensure_datetime_index(returns.index, label="returns")
    returns.index = returns_index
    returns = returns.sort_index()
    columns = list(returns.columns)
    if weight_schedule is None:
        if not columns:
            raise ValueError("returns must contain columns when weight_schedule is None")
        weight_schedule = pd.Series(1.0 / len(columns), index=columns)
    aligned_weights, window_labels = _prepare_weight_schedule(
        weight_schedule, returns_index, columns
    )

    weight_sums = aligned_weights.sum(axis=1)
    if (weight_sums <= 0).any():
        raise ValueError("weight_schedule rows must sum to a positive value")
    norm_weights = aligned_weights.div(weight_sums, axis=0).fillna(0.0)

    clean_returns = returns.fillna(0.0).astype(float)
    norm_weights = norm_weights.astype(float)

    nav = initial_value
    pool_nav_contrib = pd.Series(0.0, index=columns, dtype=float)
    weight_accum = pd.Series(0.0, index=columns, dtype=float)
    window_stats: dict[pd.Timestamp, dict[str, Any]] = {}
    period_returns = []

    for timestamp in clean_returns.index:
        nav_prev = nav
        weights_row = norm_weights.loc[timestamp].reindex(columns).fillna(0.0)
        returns_row = clean_returns.loc[timestamp].reindex(columns).fillna(0.0)

        period_ret = float((weights_row * returns_row).sum())
        period_returns.append(period_ret)
        delta_nav_by_pool = nav_prev * weights_row * returns_row
        delta_nav = float(delta_nav_by_pool.sum())
        nav = nav_prev + delta_nav

        pool_nav_contrib += delta_nav_by_pool
        weight_accum += weights_row

        window_key = pd.Timestamp(window_labels.loc[timestamp])
        stats = window_stats.get(window_key)
        if stats is None:
            stats = {
                "window_start": window_key,
                "window_end": timestamp,
                "start_nav": nav_prev,
                "end_nav": nav,
                "nav_change": 0.0,
                "periods": 0,
            }
            window_stats[window_key] = stats
        stats["window_end"] = timestamp
        stats["end_nav"] = nav
        stats["nav_change"] = float(stats["nav_change"]) + delta_nav
        stats["periods"] = int(stats["periods"]) + 1

    total_return = (nav / initial_value) - 1.0
    periods = len(clean_returns)
    if periods_per_year is None:
        periods_per_year = _infer_periods_per_year(returns_index)
    if periods_per_year <= 0:
        raise ValueError("periods_per_year must be positive")

    horizon_years = periods / periods_per_year if periods_per_year else float("nan")
    growth = nav / float(initial_nav)
    realized_apy = (growth ** (periods_per_year / periods)) - 1.0 if periods > 0 else 0.0

    contrib_returns = pool_nav_contrib / initial_value
    close_to_zero = math.isclose(total_return, 0.0, abs_tol=1e-12)
    if close_to_zero:
        return_share = contrib_returns * 0.0
    else:
        return_share = contrib_returns / total_return
    apy_contrib = return_share * realized_apy
    avg_weight = weight_accum / periods

    by_pool = pd.DataFrame(
        {
            "pool": columns,
            "avg_weight": avg_weight.values,
            "nav_contribution": pool_nav_contrib.values,
            "return_contribution": contrib_returns.values,
            "return_share": return_share.values,
            "apy_contribution": apy_contrib.values,
        }
    )

    window_records: list[dict[str, Any]] = []
    for key in sorted(window_stats.keys()):
        stats = window_stats[key]
        start_nav = float(stats["start_nav"])
        nav_change = float(stats["nav_change"])
        end_nav = float(stats["end_nav"])
        periods_in_window = int(stats["periods"])
        window_return = nav_change / start_nav if start_nav else 0.0
        return_contribution = nav_change / initial_value
        if close_to_zero:
            window_share = 0.0
        else:
            window_share = return_contribution / total_return
        if periods_in_window > 0:
            window_apy = ((1.0 + window_return) ** (periods_per_year / periods_in_window)) - 1.0
        else:
            window_apy = float("nan")
        window_records.append(
            {
                "window_start": stats["window_start"],
                "window_end": stats["window_end"],
                "periods": periods_in_window,
                "start_nav": start_nav,
                "end_nav": end_nav,
                "nav_change": nav_change,
                "window_return": window_return,
                "return_contribution": return_contribution,
                "return_share": window_share,
                "apy_contribution": window_share * realized_apy,
                "window_apy": window_apy,
            }
        )

    by_window = pd.DataFrame(window_records)

    portfolio_summary = {
        "initial_nav": initial_value,
        "final_nav": nav,
        "total_return": total_return,
        "realized_apy": realized_apy,
        "periods": periods,
        "periods_per_year": periods_per_year,
        "horizon_years": horizon_years,
    }

    return AttributionResult(
        portfolio=portfolio_summary,
        by_pool=by_pool,
        by_window=by_window,
        period_returns=pd.Series(period_returns, index=returns_index, name="portfolio_return"),
    )
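

# Illustrative sketch, not part of the original module: a minimal end-to-end
# attribution run over four daily periods with a single rebalance row. The pool
# names and return figures are hypothetical; ``by_pool["return_contribution"]``
# sums (up to rounding) to ``portfolio["total_return"]``.
def _example_compute_attribution() -> AttributionResult:
    idx = pd.date_range("2024-01-01", periods=4, freq="D", tz="UTC")
    returns = pd.DataFrame(
        {
            "poolA": [0.0010, 0.0020, 0.0015, 0.0010],
            "poolB": [0.0005, 0.0000, 0.0010, 0.0008],
        },
        index=idx,
    )
    weights = pd.DataFrame({"poolA": [0.6], "poolB": [0.4]}, index=idx[:1])
    return compute_attribution(returns, weights, periods_per_year=365.25, initial_nav=1.0)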


def load_weight_schedule(path: str | bytes | Any) -> pd.DataFrame:
    """Load a weight schedule from CSV in either wide or long format."""

    df = pd.read_csv(path)
    if "timestamp" not in df.columns:
        raise ValueError("weight schedule CSV must contain a 'timestamp' column")
    if {"name", "weight"}.issubset(df.columns):
        schedule = df.pivot(index="timestamp", columns="name", values="weight")
    else:
        schedule = df.set_index("timestamp")
    schedule.index = pd.to_datetime(schedule.index, utc=True)
    return schedule.sort_index()
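

# Illustrative sketch, not part of the original module: loading a long-format
# schedule (``timestamp``, ``name``, ``weight`` columns) from an in-memory CSV
# buffer; a wide CSV whose non-timestamp columns are pool names works the same
# way. The pool names below are hypothetical.
def _example_load_weight_schedule() -> pd.DataFrame:
    import io

    csv_text = (
        "timestamp,name,weight\n"
        "2024-01-01,poolA,0.6\n"
        "2024-01-01,poolB,0.4\n"
        "2024-02-01,poolA,0.5\n"
        "2024-02-01,poolB,0.5\n"
    )
    return load_weight_schedule(io.StringIO(csv_text))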