18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class AbstractBaseDriver(ABC, DataSourceBase):
    """Abstract base class for all GeoLake-related drivers.

    Concrete drivers must override the ``name`` and ``version`` class
    attributes (both default to the ``_NOT_SET`` sentinel) and implement
    ``read`` and ``load``. Every instance gets a ``log`` attribute holding
    a logger configured from environment variables at construction time.
    """

    # Driver identity; subclasses are required to replace the sentinel.
    name: str = _NOT_SET
    version: str = _NOT_SET
    container: str = "python"
    # Set per instance in ``__new__``.
    log: logging.Logger

    def __new__(cls, *arr, **kw):  # pylint: disable=unused-argument
        """Create a new instance of driver and configure logger.

        Positional and keyword arguments are accepted only so that the
        signature is compatible with ``__init__``; they are not used here.

        Raises
        ------
        AssertionError
            If the subclass did not set ``name`` or ``version``
            (only when assertions are enabled, i.e. not under ``-O``).
        """
        obj = super().__new__(cls)
        assert (
            obj.name != _NOT_SET
        ), f"'name' class attribute was not set for the driver '{cls}'"
        # The message previously mentioned 'name' here although the check
        # is on 'version'.
        assert (
            obj.version != _NOT_SET
        ), f"'version' class attribute was not set for the driver '{cls}'"
        obj.log = cls.__configure_logger()
        return obj

    def __init__(self, *, metadata: dict) -> None:
        """Initialise the driver, forwarding ``metadata`` to the base class.

        Parameters
        ----------
        metadata : dict
            Passed unchanged to ``super().__init__``.
        """
        super().__init__(metadata=metadata)

    @classmethod
    def __configure_logger(cls) -> logging.Logger:
        """Return the logger for this driver class, configured from env.

        The logger is named ``geolake.intake.<class name>``. Its level is
        read from ``GeoLake_LOG_LEVEL`` (default ``"INFO"``) and its format
        from ``GeoLake_LOG_FORMAT``. A ``StreamHandler`` is attached only
        when the logger does not already have one, so repeated
        instantiation does not stack duplicate handlers.
        """
        log = logging.getLogger(f"geolake.intake.{cls.__name__}")
        level = os.environ.get("GeoLake_LOG_LEVEL", "INFO")
        logformat = os.environ.get(
            "GeoLake_LOG_FORMAT",
            "%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s",
        )
        log.setLevel(level)  # type: ignore[arg-type]
        # for/else: the else branch runs only when no StreamHandler was
        # found (the loop finished without ``break``).
        for handler in log.handlers:
            if isinstance(handler, logging.StreamHandler):
                break
        else:
            log.addHandler(logging.StreamHandler())
        # An empty GeoLake_LOG_FORMAT leaves existing formatters untouched.
        if logformat:
            formatter = logging.Formatter(logformat)
            for handler in log.handlers:
                handler.setFormatter(formatter)
        for handler in log.handlers:
            handler.setLevel(level)  # type: ignore[arg-type]
        return log

    @abstractmethod
    def read(self) -> Any:
        """Read metadata."""
        raise NotImplementedError

    @abstractmethod
    def load(self) -> Any:
        """Read metadata and load data into the memory."""
        raise NotImplementedError

    def process(self, query: GeoQuery) -> Any:
        """
        Process data with the query.

        Calls ``read`` and applies ``query`` to its result with
        ``compute=True``.

        Parameters
        ----------
        query: `queries.GeoQuery`
            A query to use for data processing

        Returns
        -------
        res: Any
            Result of `query` processing

        Examples
        --------
        ```python
        >>> data = catalog['dataset']['product'].process(query)
        ```
        """
        data_ = self.read()
        return self._process_geokube_dataset(data_, query=query, compute=True)

    def _process_geokube_dataset(
        self,
        dataset: Dataset | DataCube,
        query: GeoQuery,
        compute: bool = False,
    ) -> Dataset | DataCube:
        """Apply the parts of ``query`` that are set to ``dataset``.

        Steps run in a fixed order, each only when the corresponding query
        attribute is truthy: filters (``Dataset`` input only), variable
        selection, bounding box, locations, time, vertical.

        Parameters
        ----------
        dataset : Dataset | DataCube
            The structure to subset.
        query : GeoQuery
            The query to apply. A falsy query short-circuits processing.
        compute : bool, optional
            When true, delayed results are computed before returning.

        Returns
        -------
        Dataset | DataCube
            The processed (and optionally computed) structure.
        """
        # The format string used to end with a dangling quote.
        self.log.info("processing geokube structure with Geoquery: %s", query)
        if not query:
            self.log.info("query is empty!")
            return dataset.compute() if compute else dataset
        if isinstance(dataset, Dataset):
            self.log.info("filtering with: %s", query.filters)
            dataset = dataset.filter(**query.filters)
        # NOTE(review): indentation was lost in the source listing; this
        # check is taken to be a sibling of the Dataset branch above, not
        # nested inside it -- confirm against the repository.
        if isinstance(dataset, Delayed) and compute:
            dataset = dataset.compute()
        if query.variable:
            self.log.info("selecting variable: %s", query.variable)
            dataset = dataset[query.variable]
        if query.area:
            self.log.info("subsetting by bounding box: %s", query.area)
            dataset = dataset.geobbox(**query.area)
        if query.location:
            self.log.info("subsetting by location: %s", query.location)
            dataset = dataset.locations(**query.location)
        if query.time:
            self.log.info("subsetting by time: %s", query.time)
            dataset = dataset.sel(time=query.time)
        if query.vertical:
            self.log.info("subsetting by vertical: %s", query.vertical)
            # Slices are passed with no method; any other vertical value
            # is selected with method="nearest".
            method = None if isinstance(query.vertical, slice) else "nearest"
            dataset = dataset.sel(vertical=query.vertical, method=method)
        if isinstance(dataset, Dataset) and compute:
            self.log.info(
                "computing delayed datacubes in the dataset with %d"
                " records...",
                len(dataset),
            )
            dataset = dataset.apply(
                lambda dc: dc.compute() if isinstance(dc, Delayed) else dc
            )
        return dataset
|