Coverage for src/stable_yield_lab/pipeline/__init__.py: 96%
43 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-04 20:38 +0000
1from __future__ import annotations
3"""Data orchestration pipeline for StableYieldLab adapters."""
5from collections.abc import Iterable, Iterator, Sequence
6import logging
7from typing import Protocol, TypeVar
9import pandas as pd
11from ..core import Pool, PoolRepository, PoolReturn, ReturnRepository
12from ..risk_scoring import score_pool
13from ..sources import DataSource
15logger = logging.getLogger(__name__)
18T = TypeVar("T")
class HistoricalSource(Protocol):
    """Protocol for adapters returning historical :class:`PoolReturn` rows."""

    def fetch(self) -> list[PoolReturn]:
        """Return this adapter's rows as a list of :class:`PoolReturn`."""
        ...
# Any adapter the pipeline accepts: a snapshot ``DataSource`` or an object
# satisfying the ``HistoricalSource`` protocol.
PipelineSource = DataSource | HistoricalSource
30def _iter_instances(items: Iterable[object], cls: type[T]) -> Iterator[T]:
31 for item in items:
32 if isinstance(item, cls): 32 ↛ 31line 32 didn't jump to line 31 because the condition on line 32 was always true
33 yield item
class Pipeline:
    """Composable pipeline orchestrating snapshot and historical adapters."""

    def __init__(self, sources: Sequence[PipelineSource]) -> None:
        """Keep a private list copy of ``sources``.

        Because the sequence is copied, later mutation of the caller's
        sequence does not change which sources this pipeline queries.
        """
        self._sources: list[PipelineSource] = list(sources)

    def _fetch_each(self) -> Iterator[Iterable[object]]:
        """Yield the ``fetch()`` result of every source that succeeds.

        Sources are queried in order, one per iteration step, so a caller
        processes one source's items before the next source is fetched.
        A source whose ``fetch()`` raises is logged at WARNING level and
        skipped; the remaining sources are still queried.
        """
        for source in self._sources:
            try:
                items = source.fetch()
            except Exception as exc:
                # Best-effort: one failing adapter must not abort the run.
                logger.warning("Source %s failed: %s", source.__class__.__name__, exc)
                continue
            yield items

    def run(self) -> PoolRepository:
        """Collect scored pools from all sources into a new repository.

        Only items that are :class:`Pool` instances are used; each is
        passed through ``score_pool`` and the result is added to the
        repository. Anything else a source returns is ignored.

        Returns:
            A new :class:`PoolRepository` holding the scored pools.
        """
        repo = PoolRepository()
        for items in self._fetch_each():
            for pool in _iter_instances(items, Pool):
                repo.add(score_pool(pool))
        return repo

    def run_history(self) -> pd.DataFrame:
        """Collect historical returns from all sources as a time series.

        Only items that are :class:`PoolReturn` instances are used;
        anything else a source returns is ignored.

        Returns:
            The result of ``ReturnRepository.to_timeseries()`` over the
            collected rows.
        """
        repo = ReturnRepository()
        for items in self._fetch_each():
            repo.extend(_iter_instances(items, PoolReturn))
        return repo.to_timeseries()
__all__ = ["Pipeline"]