Coverage for src/stable_yield_lab/pipeline/__init__.py: 96%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-04 20:38 +0000

"""Data orchestration pipeline for StableYieldLab adapters."""

# The module docstring must be the very first statement to be bound to
# ``__doc__``; a string placed after the ``__future__`` import is just a
# discarded expression. Docstrings (and comments) may legally precede it.
from __future__ import annotations

from collections.abc import Iterable, Iterator, Sequence
import logging
from typing import Protocol, TypeVar

import pandas as pd

from ..core import Pool, PoolRepository, PoolReturn, ReturnRepository
from ..risk_scoring import score_pool
from ..sources import DataSource

logger = logging.getLogger(__name__)


# Element type produced by ``_iter_instances`` when filtering by class.
T = TypeVar("T")

19 

20 

class HistoricalSource(Protocol):
    """Structural interface for adapters that provide historical return rows.

    For static type checking, any object exposing a zero-argument ``fetch``
    method that returns a list of :class:`PoolReturn` satisfies this protocol;
    no explicit subclassing is required.
    """

    def fetch(self) -> list[PoolReturn]:
        """Return the historical :class:`PoolReturn` rows from this adapter."""
        ...

25 

26 

# Any adapter a ``Pipeline`` accepts: ``DataSource`` adapters or
# ``HistoricalSource`` adapters. NOTE: this is a runtime expression, not an
# annotation, so ``from __future__ import annotations`` does not defer it;
# using ``|`` between classes here requires Python 3.10+.
PipelineSource = DataSource | HistoricalSource

28 

29 

def _iter_instances(items: Iterable[object], cls: type[T]) -> Iterator[T]:
    """Lazily yield the elements of *items* that are instances of *cls*.

    Elements of any other type are skipped; the relative order of the kept
    elements is preserved. Because this is a generator function, *items* is
    not touched until the first value is requested.
    """
    yield from (candidate for candidate in items if isinstance(candidate, cls))

34 

35 

class Pipeline:
    """Composable pipeline orchestrating snapshot and historical adapters.

    Every configured source is fetched on each :meth:`run` / :meth:`run_history`
    call. A source that fails is logged at WARNING level and skipped, so a single
    broken adapter does not abort the whole pipeline.
    """

    def __init__(self, sources: Sequence[PipelineSource]) -> None:
        """Store a private copy of *sources* (later caller mutation has no effect)."""
        self._sources: list[PipelineSource] = list(sources)

    def _fetch_all(self) -> Iterator[list[object]]:
        """Yield each source's fetched items as a list, skipping failing sources.

        The result of ``fetch()`` is materialized inside the ``try`` so that
        errors raised while iterating a lazily produced result (e.g. a
        generator), or a non-iterable result such as ``None``, are handled like
        an error from ``fetch()`` itself instead of escaping and aborting the run.
        """
        for source in self._sources:
            try:
                items = list(source.fetch())
            except Exception as exc:
                # Deliberate best-effort: report the failure and move on.
                logger.warning("Source %s failed: %s", source.__class__.__name__, exc)
                continue
            yield items

    def run(self) -> PoolRepository:
        """Fetch snapshot data from all sources and collect the scored pools.

        Only :class:`Pool` items are kept; each one is passed through
        :func:`score_pool` before being added to the repository.

        Returns:
            A new :class:`PoolRepository` holding the scored pools.
        """
        repo = PoolRepository()
        for items in self._fetch_all():
            for pool in _iter_instances(items, Pool):
                repo.add(score_pool(pool))
        return repo

    def run_history(self) -> pd.DataFrame:
        """Fetch historical rows from all sources and return them as a time series.

        Only :class:`PoolReturn` items are kept; they are gathered in a fresh
        :class:`ReturnRepository` and converted with its ``to_timeseries`` method.

        Returns:
            The DataFrame produced by ``ReturnRepository.to_timeseries()``.
        """
        repo = ReturnRepository()
        for items in self._fetch_all():
            repo.extend(_iter_instances(items, PoolReturn))
        return repo.to_timeseries()


__all__ = ["Pipeline"]