Skip to content

Base

Module with AbstractBaseDriver definition.

AbstractBaseDriver

Bases: ABC, DataSourceBase

Abstract base class for all GeoLake-related drivers.

Source code in drivers/intake_geokube/base.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class AbstractBaseDriver(ABC, DataSourceBase):
    """Abstract base class for all GeoLake-related drivers.

    Concrete drivers must override the ``name`` and ``version`` class
    attributes (both default to the ``_NOT_SET`` sentinel) and implement
    ``read`` and ``load``. A logger named after the concrete class is
    attached to every instance as ``log`` when the instance is created.
    """

    # Sentinel defaults; ``__new__`` asserts that subclasses replaced them.
    name: str = _NOT_SET
    version: str = _NOT_SET
    container: str = "python"
    # Assigned per instance in ``__new__``.
    log: logging.Logger

    def __new__(cls, *arr, **kw):  # pylint: disable=unused-argument
        """Create a new instance of driver and configure logger.

        Positional and keyword arguments are accepted only so that the
        signature is compatible with ``__init__``; they are ignored here.

        Raises
        ------
        AssertionError
            If the driver class did not override ``name`` or ``version``.
        """
        obj = super().__new__(cls)
        assert (
            obj.name != _NOT_SET
        ), f"'name' class attribute was not set for the driver '{cls}'"
        # The message names 'version' so the error points at the attribute
        # that is actually missing.
        assert (
            obj.version != _NOT_SET
        ), f"'version' class attribute was not set for the driver '{cls}'"
        obj.log = cls.__configure_logger()
        return obj

    def __init__(self, *, metadata: dict) -> None:
        """Initialize the driver, forwarding ``metadata`` to the base class."""
        super().__init__(metadata=metadata)

    @classmethod
    def __configure_logger(cls) -> logging.Logger:
        """Return the logger for this driver class, configured from env vars.

        The level is read from ``GeoLake_LOG_LEVEL`` (default ``"INFO"``)
        and the record format from ``GeoLake_LOG_FORMAT``. A stream handler
        is added only when the logger does not have one yet, so repeated
        instantiation does not stack handlers.
        """
        log = logging.getLogger(f"geolake.intake.{cls.__name__}")
        level = os.environ.get("GeoLake_LOG_LEVEL", "INFO")
        logformat = os.environ.get(
            "GeoLake_LOG_FORMAT",
            "%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s",
        )
        log.setLevel(level)  # type: ignore[arg-type]
        has_stream_handler = any(
            isinstance(handler, logging.StreamHandler)
            for handler in log.handlers
        )
        if not has_stream_handler:
            log.addHandler(logging.StreamHandler())
        # An empty GeoLake_LOG_FORMAT leaves the handlers' formatters as-is.
        formatter = logging.Formatter(logformat) if logformat else None
        for handler in log.handlers:
            if formatter is not None:
                handler.setFormatter(formatter)
            handler.setLevel(level)  # type: ignore[arg-type]
        return log

    @abstractmethod
    def read(self) -> Any:
        """Read metadata."""
        raise NotImplementedError

    @abstractmethod
    def load(self) -> Any:
        """Read metadata and load data into the memory."""
        raise NotImplementedError

    def process(self, query: GeoQuery) -> Any:
        """
        Process data with the query.

        The data returned by ``read`` is passed through
        ``_process_geokube_dataset`` with ``compute=True``.

        Parameters
        ----------
        query: `queries.GeoQuery`
            A query to use for data processing

        Returns
        -------
        res: Any
            Result of `query` processing

        Examples
        --------
        ```python
        >>> data = catalog['dataset']['product'].process(query)
        ```
        """
        data_ = self.read()
        return self._process_geokube_dataset(data_, query=query, compute=True)

    def _process_geokube_dataset(
        self,
        dataset: Dataset | DataCube,
        query: GeoQuery,
        compute: bool = False,
    ) -> Dataset | DataCube:
        """
        Apply the parts of ``query`` that are set to ``dataset``, in order.

        The steps are: filter (only for ``Dataset`` inputs), variable
        selection, bounding box, locations, time and vertical subsetting.
        Each step is skipped when the matching query attribute is falsy.

        Parameters
        ----------
        dataset: `Dataset` or `DataCube`
            The geokube structure to process
        query: `queries.GeoQuery`
            A query to use for data processing; a falsy query means
            "no processing"
        compute: bool
            If True, delayed objects are computed before being returned

        Returns
        -------
        res: `Dataset` or `DataCube`
            The processed structure
        """
        self.log.info(
            "processing geokube structure with Geoquery: '%s'", query
        )
        if not query:
            self.log.info("query is empty!")
            return dataset.compute() if compute else dataset
        if isinstance(dataset, Dataset):
            self.log.info("filtering with: %s", query.filters)
            dataset = dataset.filter(**query.filters)
        # NOTE(review): filtering may hand back a Delayed object; it is
        # materialised here so the subsetting below works on real data.
        if isinstance(dataset, Delayed) and compute:
            dataset = dataset.compute()
        if query.variable:
            self.log.info("selecting variable: %s", query.variable)
            dataset = dataset[query.variable]
        if query.area:
            self.log.info("subsetting by bounding box: %s", query.area)
            dataset = dataset.geobbox(**query.area)
        if query.location:
            self.log.info("subsetting by location: %s", query.location)
            dataset = dataset.locations(**query.location)
        if query.time:
            self.log.info("subsetting by time: %s", query.time)
            dataset = dataset.sel(time=query.time)
        if query.vertical:
            self.log.info("subsetting by vertical: %s", query.vertical)
            # Slices select a range as-is; anything else is matched to the
            # nearest available vertical level.
            method = None if isinstance(query.vertical, slice) else "nearest"
            dataset = dataset.sel(vertical=query.vertical, method=method)
        if isinstance(dataset, Dataset) and compute:
            self.log.info(
                "computing delayed datacubes in the dataset with %d"
                " records...",
                len(dataset),
            )
            dataset = dataset.apply(
                lambda dc: dc.compute() if isinstance(dc, Delayed) else dc
            )
        return dataset

__new__(*arr, **kw)

Create a new instance of driver and configure logger.

Source code in drivers/intake_geokube/base.py
26
27
28
29
30
31
32
33
34
35
36
def __new__(cls, *arr, **kw):  # pylint: disable=unused-argument
    """Create a new instance of driver and configure logger.

    Positional and keyword arguments are accepted but ignored here.

    Raises
    ------
    AssertionError
        If the driver class did not override ``name`` or ``version``.
    """
    obj = super().__new__(cls)
    assert (
        obj.name != _NOT_SET
    ), f"'name' class attribute was not set for the driver '{cls}'"
    # The message names 'version' so the error points at the attribute
    # that is actually missing.
    assert (
        obj.version != _NOT_SET
    ), f"'version' class attribute was not set for the driver '{cls}'"
    obj.log = cls.__configure_logger()
    return obj

load() abstractmethod

Read metadata and load data into the memory.

Source code in drivers/intake_geokube/base.py
68
69
70
71
@abstractmethod
def load(self) -> Any:
    """
    Read metadata and load data into the memory.

    Abstract: concrete drivers must provide the implementation.

    Raises
    ------
    NotImplementedError
        Always, when this base implementation is called directly
    """
    raise NotImplementedError()

process(query)

Process data with the query.

Parameters

query (`queries.GeoQuery`): A query to use for data processing.

Returns

res (`Any`): Result of `query` processing.

Examples
>>> data = catalog['dataset']['product'].process(query)
Source code in drivers/intake_geokube/base.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def process(self, query: GeoQuery) -> Any:
    """
    Run the query against the data returned by ``read``.

    The data is handed to ``_process_geokube_dataset`` together with
    ``query`` and ``compute=True``.

    Parameters
    ----------
    query: `queries.GeoQuery`
        The query describing how the data should be processed

    Returns
    -------
    res: Any
        Whatever ``_process_geokube_dataset`` returns for `query`

    Examples
    --------
    ```python
    >>> data = catalog['dataset']['product'].process(query)
    ```
    """
    return self._process_geokube_dataset(
        self.read(),
        query=query,
        compute=True,
    )

read() abstractmethod

Read metadata.

Source code in drivers/intake_geokube/base.py
63
64
65
66
@abstractmethod
def read(self) -> Any:
    """
    Read metadata.

    Abstract: concrete drivers must provide the implementation.

    Raises
    ------
    NotImplementedError
        Always, when this base implementation is called directly
    """
    raise NotImplementedError()