Coverage for src/stable_yield_lab/sources/csv.py: 100%
15 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-04 20:38 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-04 20:38 +0000
1"""CSV-backed data source implementations."""
3from __future__ import annotations
5from datetime import UTC, datetime
7import pandas as pd
9from ..core import Pool
12class CSVSource:
13 """Load pools from a CSV mapping columns to :class:`Pool` fields."""
15 def __init__(self, path: str) -> None:
16 self.path = path
18 def fetch(self) -> list[Pool]:
19 df = pd.read_csv(self.path)
20 pools: list[Pool] = []
21 now = datetime.now(tz=UTC).timestamp()
22 for _, r in df.iterrows():
23 pools.append(
24 Pool(
25 name=str(r.get("name", "")),
26 chain=str(r.get("chain", "")),
27 stablecoin=str(r.get("stablecoin", "")),
28 tvl_usd=float(r.get("tvl_usd", 0.0)),
29 base_apy=float(r.get("base_apy", 0.0)),
30 reward_apy=float(r.get("reward_apy", 0.0)),
31 is_auto=bool(r.get("is_auto", True)),
32 source=str(r.get("source", "csv")),
33 risk_score=float(r.get("risk_score", 2.0)),
34 timestamp=float(r.get("timestamp", now)),
35 )
36 )
37 return pools
40__all__ = ["CSVSource"]