Skip to content

Sentinel

Geokube driver for sentinel data.

SentinelDriver

Bases: AbstractBaseDriver

Driver class for sentinel data.

Source code in drivers/intake_geokube/sentinel/driver.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
class SentinelDriver(AbstractBaseDriver):
    """Driver class for sentinel data.

    The driver translates a `GeoQuery` into an OData request, downloads
    the matching product archives into `target_dir`, unpacks them and
    exposes the extracted files as a `geokube.Dataset` (see `process`).
    """

    name: str = "sentinel_driver"
    version: str = "0.1b0"

    def __init__(
        self,
        metadata: dict,
        url: str,
        zippattern: str,
        zippath: str,
        type: str,
        username: str | None = None,
        password: str | None = None,
        sentinel_timeout: int | None = None,
        mapping: dict | None = None,
        xarray_kwargs: dict | None = None,
    ) -> None:
        """Create the driver.

        Parameters
        ----------
        metadata : dict
            Metadata forwarded to the base driver
        url : str
            Base URL used to build the OData request
        zippattern : str
            Format-style pattern matched against the paths of extracted
            files; its field names become dataset attributes
        zippath : str
            Glob expression, relative to the download directory, that
            selects the extracted files to load
        type : str
            Value the product type has to contain
        username, password : str, optional
            Credentials; when either is missing, both are read from
            the `SENTINEL_USERNAME` and `SENTINEL_PASSWORD`
            environment variables
        sentinel_timeout : int, optional
            Timeout passed to the download requests
        mapping : dict, optional
            Mapping forwarded to `open_datacube`
        xarray_kwargs : dict, optional
            Extra keyword arguments forwarded to `open_datacube`
        """
        super().__init__(metadata=metadata)
        self.url: str = url
        self.zippattern: str = zippattern
        self.zippath: str = zippath
        self.type_ = type
        _validate_path_and_pattern(path=self.zippath, pattern=self.zippattern)
        self.auth: SentinelAuth = self._get_credentials(username, password)
        self.target_dir: str = get_temp_dir()
        self.sentinel_timeout: int | None = sentinel_timeout
        self.mapping: dict = mapping or {}
        self.xarray_kwargs: dict = xarray_kwargs or {}

    def _get_credentials(
        self, username: str | None, password: str | None
    ) -> SentinelAuth:
        """Build the auth object from arguments or environment variables.

        Raises
        ------
        KeyError
            If explicit credentials are incomplete and at least one of
            the environment variables is not set
        """
        if username and password:
            return SentinelAuth(
                username=username,
                password=password,
            )
        self.log.debug("getting credentials from environmental variables...")
        if (
            "SENTINEL_USERNAME" not in os.environ
            or "SENTINEL_PASSWORD" not in os.environ
        ):
            self.log.error(
                "missing at least of of the mandatory environmental variables:"
                " ['SENTINEL_USERNAME', 'SENTINEL_PASSWORD']"
            )
            raise KeyError(
                "missing at least of of the mandatory environmental variables:"
                " ['SENTINEL_USERNAME', 'SENTINEL_PASSWORD']"
            )
        return SentinelAuth(
            username=os.environ["SENTINEL_USERNAME"],
            password=os.environ["SENTINEL_PASSWORD"],
        )

    def _force_sentinel_type(self, builder):
        """Restrict the request to products whose type contains `type_`."""
        self.log.info("forcing sentinel type: %s...", self.type_)
        return builder.filter(_SentinelKeys.TYPE, containing=self.type_)

    def _filter_by_sentinel_attrs(self, builder, query: GeoQuery):
        """Add an attribute filter for every query filter not in the pattern.

        Filters whose names appear as fields of `zippattern` are skipped
        here; all remaining ones are passed to `builder.filter_attr`.
        """
        self.log.info("filtering by sentinel attributes...")
        path_filter_names: set[str] = {
            parsed[1]
            for parsed in string.Formatter().parse(self.zippattern)
            if parsed[1]
        }
        if not query.filters:
            return builder
        sentinel_filter_names: set[str] = (
            query.filters.keys() - path_filter_names
        )
        for sf in sentinel_filter_names:
            builder = builder.filter_attr(sf, query.filters[sf])
        return builder

    def _build_odata_from_geoquery(self, query: GeoQuery) -> ODataRequest:
        """Translate `query` into an OData request.

        Product id, sentinel attributes, product type, time range, area
        and location are applied in that order, each only when present
        in the query (the product type is always applied).
        """
        self.log.debug("validating geoquery...")
        _validate_geoquery_for_sentinel(query)
        self.log.debug("constructing odata request...")
        builder = ODataRequestBuilder.new(url=self.url)
        # `filters` may be empty or None (`_filter_by_sentinel_attrs` already
        # allows for that), so guard the membership test instead of
        # assuming a mapping.
        if query.filters and "product_id" in query.filters:
            builder = builder.filter(
                name=_SentinelKeys.UUID, eq=query.filters.get("product_id")
            )
        builder = self._filter_by_sentinel_attrs(builder, query=query)
        builder = self._force_sentinel_type(builder)
        if query.time:
            if isinstance(query.time, dict):
                timecombo_start, timecombo_end = _timecombo_to_day_range(
                    query.time
                )
                self.log.debug(
                    "filtering by timecombo: [%s, %s] ",
                    timecombo_start,
                    timecombo_end,
                )
                builder = builder.filter_date(
                    _SentinelKeys.SENSING_TIME,
                    ge=timecombo_start,
                    le=timecombo_end,
                )
            elif isinstance(query.time, slice):
                self.log.debug("filtering by slice: %s", query.time)
                builder = builder.filter_date(
                    _SentinelKeys.SENSING_TIME,
                    ge=query.time.start,
                    le=query.time.stop,
                )
        if query.area:
            self.log.debug("filering by polygon")
            polygon = _bounding_box_to_polygon(query.area)
            builder = builder.intersect_polygon(polygon=polygon)
        if query.location:
            self.log.debug("filering by location")
            point = _location_to_valid_point(query.location)
            builder = builder.intersect_point(point=point)
        return builder.build()

    def _prepare_dataset(self) -> Dataset:
        """Build a `Dataset` from the files extracted into `target_dir`.

        Every file matching `zippath` is parsed with `zippattern` to
        obtain its attributes; files sharing the same attributes are
        grouped and opened lazily through `dask.delayed(open_datacube)`.
        """
        data: list = []
        attrs_keys: list[str] = _get_attrs_keys_from_pattern(self.zippattern)
        for f in glob.glob(os.path.join(self.target_dir, self.zippath)):
            self.log.debug("processsing file %s", f)
            # The pattern describes the path relative to the download dir.
            file_no_tmp_dir = f.removeprefix(self.target_dir).strip(os.sep)
            attr = reverse_format(self.zippattern, file_no_tmp_dir)
            attr[Dataset.FILES_COL] = [f]
            data.append(attr)
        # NOTE: eventually, join files if there are several for the same attrs
        # combintation
        df = (
            pd.DataFrame(data)
            .groupby(attrs_keys)
            .agg({Dataset.FILES_COL: sum})
        )
        datacubes = []
        for ind, files in df.iterrows():
            load = dict(zip(df.index.names, ind))
            load[Dataset.FILES_COL] = files
            load[Dataset.DATACUBE_COL] = dask.delayed(open_datacube)(
                path=files.item(),
                id_pattern=None,
                mapping=self.mapping,
                metadata_caching=False,
                **self.xarray_kwargs,
                preprocess=preprocess_sentinel,
            )
            datacubes.append(load)
        return Dataset(pd.DataFrame(datacubes))

    def read(self) -> NoReturn:
        """Read sentinel data.

        Raises
        ------
        NotImplementedError
            Always; reading metadata is not supported
        """
        raise NotImplementedError(
            "reading metadata is not supported for sentinel data"
        )

    def load(self) -> NoReturn:
        """Load sentinel data.

        Raises
        ------
        NotImplementedError
            Always; loading an entire product is not supported
        """
        raise NotImplementedError(
            "loading entire product is not supported for sentinel data"
        )

    def process(self, query: GeoQuery) -> Dataset:
        """Process sentinel data according to the `query`.

        The products matching the query are downloaded into
        `target_dir`, unzipped, assembled into a dataset and finally
        processed with the same query.

        Returns
        -------
        cube : `geokube.Dataset`

        Examples
        --------
        ```python
        >>> data = catalog['sentinel']['prod_name'].process(query)
        ```
        """
        self.log.info("builder odata request based on passed geoquery...")
        req = self._build_odata_from_geoquery(query)
        self.log.info("downloading data...")
        req.download(
            target_dir=self.target_dir,
            auth=self.auth,
            timeout=self.sentinel_timeout,
        )
        self.log.info("unzipping and removing archives...")
        unzip_and_clear(self.target_dir)
        self.log.info("preparing geokube.Dataset...")
        dataset = self._prepare_dataset()
        dataset = super()._process_geokube_dataset(
            dataset, query=query, compute=True
        )
        return dataset

load()

Load sentinel data.

Source code in drivers/intake_geokube/sentinel/driver.py
319
320
321
322
323
def load(self) -> NoReturn:
    """Load sentinel data.

    Always raises `NotImplementedError`: loading an entire product is
    not supported.
    """
    reason = "loading entire product is not supported for sentinel data"
    raise NotImplementedError(reason)

process(query)

Process sentinel data according to the query.

Returns

cube : geokube.Dataset

Examples
>>> data = catalog['sentinel']['prod_name'].process(query)
Source code in drivers/intake_geokube/sentinel/driver.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def process(self, query: GeoQuery) -> Dataset:
    """Process sentinel data according to the `query`.

    Steps: build the OData request, download the matching archives
    into `self.target_dir`, unzip them, assemble the dataset and
    process it with the same query.

    Returns
    -------
    cube : `geokube.Dataset`

    Examples
    --------
    ```python
    >>> data = catalog['sentinel']['prod_name'].process(query)
    ```
    """
    self.log.info("builder odata request based on passed geoquery...")
    odata_request = self._build_odata_from_geoquery(query)

    self.log.info("downloading data...")
    odata_request.download(
        target_dir=self.target_dir,
        auth=self.auth,
        timeout=self.sentinel_timeout,
    )

    self.log.info("unzipping and removing archives...")
    unzip_and_clear(self.target_dir)

    self.log.info("preparing geokube.Dataset...")
    raw_dataset = self._prepare_dataset()
    return super()._process_geokube_dataset(
        raw_dataset, query=query, compute=True
    )

read()

Read sentinel data.

Source code in drivers/intake_geokube/sentinel/driver.py
313
314
315
316
317
def read(self) -> NoReturn:
    """Read sentinel data.

    Always raises `NotImplementedError`: reading metadata is not
    supported.
    """
    reason = "reading metadata is not supported for sentinel data"
    raise NotImplementedError(reason)

preprocess_sentinel(dset)

Preprocessing function for sentinel data.

Parameters

dset : xarray.Dataset xarray.Dataset to be preprocessed

Returns

ds : xarray.Dataset Preprocessed xarray.Dataset

Source code in drivers/intake_geokube/sentinel/driver.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def preprocess_sentinel(dset: xr.Dataset) -> xr.Dataset:
    """Preprocessing function for sentinel data.

    Adds 2-D `latitude`/`longitude` coordinates computed from the
    projected `x`/`y` axes, a single-element `time` coordinate parsed
    from the source path, renames `band_data` to the field name derived
    from the source path and prepends a `time` dimension to every data
    variable.

    Parameters
    ----------
    dset :  xarray.Dataset
        xarray.Dataset to be preprocessed; it must provide the
        `spatial_ref`, `x`, `y` and `band_data` variables and carry the
        file path in `encoding["source"]`

    Returns
    -------
    ds : xarray.Dataset
        Preprocessed xarray.Dataset
    """
    crs = CRS.from_cf(dset["spatial_ref"].attrs)
    transformer = Transformer.from_crs(
        crs_from=crs, crs_to=GeographicCRS(), always_xy=True
    )
    x_vals, y_vals = dset["x"].to_numpy(), dset["y"].to_numpy()
    # The coordinates below are declared with dims ("x", "y"), so the grids
    # must have shape (len(x), len(y)). The default "xy" indexing yields
    # (len(y), len(x)) instead, which transposes the values on square tiles
    # and fails on non-square ones.
    x_grid, y_grid = np.meshgrid(x_vals, y_vals, indexing="ij")
    lon_vals, lat_vals = transformer.transform(x_grid, y_grid)  # type: ignore[call-overload] # pylint: disable=unpacking-non-sequence
    source_path = dset.encoding["source"]
    # NOTE(review): the sensing time is taken from the last "_"-separated
    # token of the sixth path component counted from the end - presumably
    # the product directory name; confirm against the archive layout.
    sensing_time = os.path.splitext(source_path.split(os.sep)[-6])[0].split(
        "_"
    )[-1]
    time = pd.to_datetime([sensing_time]).to_numpy()
    dset = dset.assign_coords({
        "time": time,
        "latitude": (("x", "y"), lat_vals),
        "longitude": (("x", "y"), lon_vals),
    }).rename({"band_data": _get_field_name_from_path(source_path)})
    expanded_timedim_dataarrays = {
        var_name: dset[var_name].expand_dims("time")
        for var_name in dset.data_vars
    }
    dset = dset.update(expanded_timedim_dataarrays)
    return dset

unzip_and_clear(target)

Unzip ZIP archives in 'target' dir and remove archive.

Source code in drivers/intake_geokube/sentinel/driver.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def unzip_and_clear(target: str) -> None:
    """Extract every ZIP archive found in `target` and delete the archive.

    Each `<name>.zip` located directly in `target` is extracted into the
    sibling directory `<name>`; other entries are left untouched.

    Raises
    ------
    RuntimeError
        If an archive is not a valid ZIP file
    """
    assert os.path.exists(target), f"directory '{target}' does not exist"
    archive_names = [
        entry for entry in os.listdir(target) if entry.endswith(".zip")
    ]
    for archive_name in archive_names:
        archive_path = os.path.join(target, archive_name)
        product_name, _ = os.path.splitext(archive_name)
        extract_dir = os.path.join(target, product_name)
        os.makedirs(extract_dir, exist_ok=True)
        try:
            with zipfile.ZipFile(archive_path) as zipped:
                zipped.extractall(path=extract_dir)
        except zipfile.BadZipFile as err:
            raise RuntimeError("downloaded ZIP archive is invalid") from err
        # Only a successfully extracted archive is removed.
        os.remove(archive_path)

Module with OData API classes definitions.

HttpMethod

Bases: Enum

Enum with HTTP methods.

Source code in drivers/intake_geokube/sentinel/odata_builder.py
52
53
54
55
56
57
58
59
60
61
class HttpMethod(Enum):
    """Enumeration of HTTP methods."""

    GET = auto()
    POST = auto()

    @property
    def method_name(self) -> str:
        """Return the member name in lower case, e.g. ``"get"``."""
        lowered: str = self.name.lower()
        return lowered

method_name: str property

Get name of the HTTP method.

ODataRequest

OData request object.

Source code in drivers/intake_geokube/sentinel/odata_builder.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class ODataRequest:
    """OData request object."""

    _ALL_HTTP_CODES: int = -1
    _DOWNLOAD_PATTERN: str = (
        "https://zipper.dataspace.copernicus.eu"
        "/odata/v1/Products({pid})/$value"
    )

    def __init__(self, odata: _ODataEntity) -> None:
        self.request_params: dict = {}
        self.odata = odata
        self._convert_filter_param()
        self._convert_order_param()

    def _convert_order_param(self) -> None:
        if self.odata.params["orderby"]:
            self.request_params["orderby"] = self.odata.params["orderby"]

    def _convert_filter_param(self) -> None:
        param: str = ""
        for i in range(len(self.odata.params["filter"])):
            if not param:
                param = self.odata.params["filter"][i]
            else:
                param = f"{param} {self.odata.params['filter'][i]}"
            if i < len(self.odata.params["filter"]) - 1:
                param = f"{param} {self.odata.conj[i]}"
        self.request_params["filter"] = param

    def _query(
        self,
        headers: dict | None = None,
        auth: Any | None = None,
        timeout: int | None = None,
    ) -> requests.Response:
        if self.odata.params and not self.odata.url.endswith("?"):
            self.odata.url = f"{self.odata.url}?"
        params = {}
        if self.request_params:
            params = {
                f"${key}": value for key, value in self.request_params.items()
            }
        match self.odata.method:
            case HttpMethod.GET:
                return requests.get(
                    self.odata.url,
                    params=params,
                    headers=headers,
                    timeout=timeout,
                )
            case HttpMethod.POST:
                return requests.post(
                    self.odata.url,
                    data=self.odata.body,
                    auth=auth,
                    timeout=timeout,
                )
            case _:
                raise NotImplementedError(
                    f"method {self.odata.method} is not supported"
                )

    def with_callback(
        self,
        callback: Callable[[requests.Response], Any],
        http_code: int | None = None,
    ) -> "ODataRequest":
        """
        Add callbacks for request response.

        Parameters
        ----------
        callback : callable
            A callback function taking just a single argument,
            i.e `requests.Response` object
        http_code : int
            HTTP code for which callback should be used.
            If not passed, callback will be executed for all codes.
        """
        if http_code:
            if http_code in self.odata.callbacks:
                warnings.warn(
                    f"callback for HTTP code {http_code} will be overwritten"
                )
            self.odata.callbacks[http_code] = callback
        else:
            self.odata.callbacks[self._ALL_HTTP_CODES] = callback
        return self

    def query(
        self,
        headers: dict | None = None,
        auth: Any | None = None,
        timeout: int | None = None,
    ) -> Any:
        """Query data based on the built request.

        Parameters
        ----------
        headers : dict, optional
            Headers passed to HTTP request
        auth : Any, optional
            Authorization object or tuple (<username>,<pass>) for basic authentication

        Returns
        -------
        res : Any
            Value returned from the appropriate callback or `requests.Response` object otherwise
        """
        response = self._query(headers=headers, auth=auth, timeout=timeout)
        if response.status_code in self.odata.callbacks:
            return self.odata.callbacks[response.status_code](response)
        if self._ALL_HTTP_CODES in self.odata.callbacks:
            return self.odata.callbacks[self._ALL_HTTP_CODES](response)
        return response

    def download(
        self,
        target_dir: str,
        headers: dict | None = None,
        auth: Any | None = None,
        timeout: int | None = None,
    ) -> Any:
        """Download requested data to `target_dir`.

        Parameters
        ----------
        target_dir : str
            Path to the directory where files should be downloaded
        headers : dict, optional
            Headers passed to HTTP request
        auth : Any, optional
            Authorization object or tuple (<username>,<pass>) for basic
            authentication
        """
        os.makedirs(target_dir, exist_ok=True)
        response = self._query(headers=headers, auth=auth, timeout=timeout)
        response.raise_for_status()
        if response.status_code in self.odata.callbacks:
            self.odata.callbacks[response.status_code](response)
        if self._ALL_HTTP_CODES in self.odata.callbacks:
            self.odata.callbacks[self._ALL_HTTP_CODES](response)
        df = pd.DataFrame(response.json()["value"])
        if len(df) == 0:
            raise ValueError("no product found for the request")
        if not isinstance(auth, SentinelAuth):
            raise TypeError(
                f"expected authentication of the type '{SentinelAuth}' but"
                f" passed '{type(auth)}'"
            )
        for pid in tqdm(df["Id"]):
            response = requests.get(
                self._DOWNLOAD_PATTERN.format(pid=pid),
                stream=True,
                auth=auth,
                timeout=timeout,
            )
            response.raise_for_status()
            create_zip_from_response(
                response, os.path.join(target_dir, f"{pid}.zip")
            )

download(target_dir, headers=None, auth=None, timeout=None)

Download requested data to target_dir.

Parameters

target_dir : str Path to the directory where files should be downloaded; headers : dict, optional Headers passed to HTTP request; auth : Any, optional Authorization object or tuple (<username>, <pass>) for basic authentication

Source code in drivers/intake_geokube/sentinel/odata_builder.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def download(
    self,
    target_dir: str,
    headers: dict | None = None,
    auth: Any | None = None,
    timeout: int | None = None,
) -> Any:
    """Download requested data to `target_dir`.

    The request is queried first; every product listed in the response
    is then fetched and stored as `<product id>.zip`.

    Parameters
    ----------
    target_dir : str
        Path to the directory where files should be downloaded
    headers : dict, optional
        Headers passed to HTTP request
    auth : Any, optional
        Authorization object; it has to be a `SentinelAuth` instance
        for the product download
    timeout : int, optional
        Timeout passed to every HTTP request

    Raises
    ------
    ValueError
        If the query returned no product
    TypeError
        If `auth` is not a `SentinelAuth` instance
    """
    os.makedirs(target_dir, exist_ok=True)
    response = self._query(headers=headers, auth=auth, timeout=timeout)
    response.raise_for_status()
    # Both the status-specific and the catch-all callback run here; their
    # return values are ignored.
    if response.status_code in self.odata.callbacks:
        self.odata.callbacks[response.status_code](response)
    if self._ALL_HTTP_CODES in self.odata.callbacks:
        self.odata.callbacks[self._ALL_HTTP_CODES](response)
    df = pd.DataFrame(response.json()["value"])
    if len(df) == 0:
        raise ValueError("no product found for the request")
    if not isinstance(auth, SentinelAuth):
        raise TypeError(
            f"expected authentication of the type '{SentinelAuth}' but"
            f" passed '{type(auth)}'"
        )
    for pid in tqdm(df["Id"]):
        # A streamed response holds its connection until the body is
        # consumed or the response is closed; the context manager releases
        # it even when saving the archive fails.
        with requests.get(
            self._DOWNLOAD_PATTERN.format(pid=pid),
            stream=True,
            auth=auth,
            timeout=timeout,
        ) as product_response:
            product_response.raise_for_status()
            create_zip_from_response(
                product_response, os.path.join(target_dir, f"{pid}.zip")
            )

query(headers=None, auth=None, timeout=None)

Query data based on the built request.

Parameters

headers : dict, optional Headers passed to HTTP request; auth : Any, optional Authorization object or tuple (<username>, <pass>) for basic authentication

Returns

res : Any Value returned from the appropriate callback or requests.Response object otherwise

Source code in drivers/intake_geokube/sentinel/odata_builder.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def query(
    self,
    headers: dict | None = None,
    auth: Any | None = None,
    timeout: int | None = None,
) -> Any:
    """Query data based on the built request.

    Parameters
    ----------
    headers : dict, optional
        Headers passed to HTTP request
    auth : Any, optional
        Authorization object or tuple (<username>,<pass>) for basic authentication

    Returns
    -------
    res : Any
        Value returned from the appropriate callback or `requests.Response` object otherwise
    """
    response = self._query(headers=headers, auth=auth, timeout=timeout)
    if response.status_code in self.odata.callbacks:
        return self.odata.callbacks[response.status_code](response)
    if self._ALL_HTTP_CODES in self.odata.callbacks:
        return self.odata.callbacks[self._ALL_HTTP_CODES](response)
    return response

with_callback(callback, http_code=None)

Add callbacks for request response.

Parameters

callback : callable A callback function taking just a single argument, i.e requests.Response object http_code : int HTTP code for which callback should be used. If not passed, callback will be executed for all codes.

Source code in drivers/intake_geokube/sentinel/odata_builder.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def with_callback(
    self,
    callback: Callable[[requests.Response], Any],
    http_code: int | None = None,
) -> "ODataRequest":
    """
    Register a callback for the request response.

    Parameters
    ----------
    callback : callable
        A callback function taking just a single argument,
        i.e `requests.Response` object
    http_code : int
        HTTP code for which callback should be used.
        If not passed, callback will be executed for all codes.

    Returns
    -------
    request : ODataRequest
        The same object, to allow chaining
    """
    registered = self.odata.callbacks
    if not http_code:
        # No specific code given: store under the catch-all key.
        registered[self._ALL_HTTP_CODES] = callback
        return self
    if http_code in registered:
        warnings.warn(
            f"callback for HTTP code {http_code} will be overwritten"
        )
    registered[http_code] = callback
    return self

ODataRequestBuilder

Bases: _ODataOperation

OData API request builder.

Source code in drivers/intake_geokube/sentinel/odata_builder.py
563
564
565
566
567
568
569
570
571
572
573
574
class ODataRequestBuilder(
    _ODataOperation
):  # pylint: disable=too-few-public-methods
    """Entry point for building OData API requests."""

    _BASE_PATTERN: str = "{url}/Products"

    @classmethod
    def new(cls, url: str) -> _ODataOperation:
        """Start building an OData request against `<url>/Products`.

        Leading and trailing slashes of `url` are stripped before the
        `/Products` suffix is appended.
        """
        products_url = cls._BASE_PATTERN.format(url=url.strip("/"))
        entity = _ODataEntity(url=products_url)
        return _ODataOperation(entity)

new(url) classmethod

Start building OData request.

Source code in drivers/intake_geokube/sentinel/odata_builder.py
570
571
572
573
574
@classmethod
def new(cls, url: str) -> _ODataOperation:
    """Start building an OData request against `<url>/Products`.

    Leading and trailing slashes of `url` are stripped before the
    `/Products` suffix is appended.
    """
    products_url = cls._BASE_PATTERN.format(url=url.strip("/"))
    entity = _ODataEntity(url=products_url)
    return _ODataOperation(entity)

datetime_to_isoformat(date)

Convert a string or datetime object to an ISO datetime string.

Parameters

date : str or datetime

Returns

date_str : str An ISO-compliant datetime string

Source code in drivers/intake_geokube/sentinel/odata_builder.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def datetime_to_isoformat(date: str | datetime) -> str:
    """Convert string of datetime object to ISO datetime string.

    Parameters
    ----------
    data : `str` or `datetime`

    Returns
    -------
    date_str : str
        A ISO-compliant datetime format
    """
    if isinstance(date, str):
        try:
            value = pd.to_datetime([date]).item().isoformat()
        except ValueError as exc:
            raise ValueError(f"cannot parse '{date}' to datetime") from exc
    elif isinstance(date, datetime):
        value = value.isoformat()
    else:
        raise TypeError(f"type '{type(date)}' is not supported")
    return f"{value}Z"

Module with auth utils for accessing sentinel data.

SentinelAuth

Bases: AuthBase

Class with authentication for accessing sentinel data.

Source code in drivers/intake_geokube/sentinel/auth.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class SentinelAuth(AuthBase):  # pylint: disable=too-few-public-methods
    """Class with authentication for accessing sentinel data.

    When used as the `auth` argument of a request, an access token is
    obtained with the stored credentials and attached as a bearer
    `Authorization` header. A new token is requested on every call.
    """

    # Token endpoint; read from the environment when the class is created.
    _SENTINEL_AUTH_URL: str = os.environ.get(
        "SENTINEL_AUTH_URL",
        "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token",
    )

    def __init__(self, username: str, password: str) -> None:
        self.username = username
        self.password = password

    @classmethod
    def _get_access_token(cls, username: str, password: str) -> str:
        """Request an access token using the password grant.

        Raises
        ------
        RuntimeError
            If the authentication server cannot be reached or answers
            with an error status
        """
        data = {
            "client_id": "cdse-public",
            "username": username,
            "password": password,
            "grant_type": "password",
        }
        try:
            response = requests.post(
                cls._SENTINEL_AUTH_URL, data=data, timeout=10
            )
        except Exception as e:
            # No response exists when the request itself fails (connection
            # error, timeout, ...), so there is no server message to report.
            raise RuntimeError(
                "Access token creation failed. The authentication server"
                " could not be reached"
            ) from e
        try:
            response.raise_for_status()
        except Exception as e:
            # Error bodies are not guaranteed to be JSON; fall back to the
            # raw text so the original failure is never masked.
            try:
                details = response.json()
            except ValueError:
                details = response.text
            raise RuntimeError(
                "Access token creation failed. Reponse from the server was:"
                f" {details}"
            ) from e
        return response.json()["access_token"]

    def __call__(self, request):
        """Add authorization header."""
        token: str = self._get_access_token(self.username, self.password)
        request.headers["Authorization"] = f"Bearer {token}"
        return request

__call__(request)

Add authorization header.

Source code in drivers/intake_geokube/sentinel/auth.py
41
42
43
44
45
def __call__(self, request):
    """Attach a bearer `Authorization` header to `request` and return it.

    The token is obtained through `_get_access_token` with the stored
    username and password.
    """
    access_token: str = self._get_access_token(self.username, self.password)
    header_value = f"Bearer {access_token}"
    request.headers["Authorization"] = header_value
    return request