Source code for dispel.providers.registry

"""TODO: Write documentation about registry."""
import importlib
import inspect
import pkgutil
from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Type, Union

from dispel import providers
from dispel.data.core import Reading
from dispel.data.values import AbbreviatedValue
from dispel.data.values import AbbreviatedValue as AV
from dispel.processing import ProcessingStepsType, process
from dispel.processing.data_trace import DataTrace

ReaderRegistryT = Dict[Tuple[Callable[[Any], bool], Type[Reading]], Dict]
READERS: ReaderRegistryT = {}

PROCESSING_FUNCTIONS: Dict[str, Dict[type, Callable[[Reading], DataTrace]]] = {}
PROCESSING_STEPS: Dict[Tuple[Iterable[str], Type[Reading]], ProcessingStepsType] = {}

PROVIDERS = {}


[docs] def register_reader( readable: Callable[[Any], bool], reading_type: Type[Reading], registry: Optional[ReaderRegistryT] = None, ) -> Callable: """Decorate a function to serve as a reader. TODO: write documentation """ registry = registry or READERS def wrap(func): if (idx := (readable, reading_type)) in registry: raise ValueError("Reader have to be unique per readable and reading_type") # TODO: consider replacing dictionary with data class for type safety registry[idx] = { "func": func, "readable": readable, "reading_type": reading_type, } return func return wrap
[docs] def register_processing( task_name: Union[str, AV], steps: ProcessingStepsType, codes: Union[str, Tuple[str, ...]], supported_type: Type[Reading] = Reading, ) -> Callable: """Register a processing function and steps for automatic processing. Parameters ---------- task_name The standard task name. steps The processing steps to be registered. codes The possible task code acronyms. supported_type The supported reading type by the processing function. Returns ------- Callable The decorated function. """ def func(reading: Reading) -> DataTrace: if not isinstance(reading, supported_type): raise ValueError( f"Unsupported reading type: {type(reading)}. " f"Expected {supported_type}." ) return process(reading, steps, task_name=task_name) if isinstance(codes, str): codes = (codes,) for code in codes: if code not in PROCESSING_FUNCTIONS: PROCESSING_FUNCTIONS[code] = {} if supported_type in PROCESSING_FUNCTIONS[code]: raise ValueError( f"Already registered processing function for {code} and " f"{supported_type}" ) PROCESSING_FUNCTIONS[code][supported_type] = func PROCESSING_STEPS[(codes, supported_type)] = steps return func
[docs] def get_processing_function( code: str, reading_type: type ) -> Callable[[Reading], DataTrace]: """FIXME: documentation.""" if code not in PROCESSING_FUNCTIONS: raise ValueError(f"{code} missing in {PROCESSING_FUNCTIONS.keys()} keys.") hierarchy = inspect.getmro(reading_type) for cls in hierarchy: if cls in PROCESSING_FUNCTIONS[code]: return PROCESSING_FUNCTIONS[code][cls] raise ValueError( f"{reading_type} not supported for {code}. Supported types: " f"{PROCESSING_FUNCTIONS[code].keys()}" )
[docs] def process_factory( task_name: Union[str, AbbreviatedValue], steps: ProcessingStepsType, codes: Union[str, Tuple[str, ...]], supported_type: Type[Reading] = Reading, ) -> Callable[[Reading], DataTrace]: """Register and return the corresponding processing function. Parameters ---------- task_name The standard task name. steps The processing step(s) to be registered. codes The possible task code acronyms. supported_type The type of reading supported for the processing Returns ------- Callable[[Reading], DataTrace] The decorated processing function. """ return register_processing(task_name, steps, codes, supported_type)
def _load_provider_io(provider_module): mod_io = None try: mod_io = importlib.import_module(f"{provider_module.__name__}.io") except ModuleNotFoundError: pass return mod_io def _load_provider_tasks(provider_module): test_mod_iter = pkgutil.iter_modules( [provider_module.__path__[0] + "/tasks"], provider_module.__name__ + ".tasks.", ) tests = {} for finder, name, _ in test_mod_iter: tests[name] = importlib.import_module(name) return tests def _load_provider(provider_name: str): """Load provider modules and trigger registration of functionality.""" provider_module = importlib.import_module(provider_name) mod_io = _load_provider_io(provider_module) tests = _load_provider_tasks(provider_module) # TODO replace dictionary entries with data class for type safety return {"provider": provider_module, "io": mod_io, "tests": tests}
[docs] def discover_providers(): """Discover providers from the provider package. TODO: describe the discovery process """ provider_iter = pkgutil.iter_modules(providers.__path__, providers.__name__ + ".") for finder, name, ispkg in provider_iter: if ispkg and name not in PROVIDERS: PROVIDERS[name] = _load_provider(name)
discover_providers()