Queries

Module with GeoQuery definition.

GeoQuery

Bases: BaseModel

GeoQuery definition class.

Source code in drivers/intake_geokube/queries/geoquery.py
class GeoQuery(BaseModel, extra="allow"):
    """GeoQuery definition class."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    variable: list[str] | None = None
    time: SliceQuery | TimeComboDict | None = None
    area: BoundingBoxDict | None = None
    location: dict[str, float | list[float]] | None = None
    vertical: SliceQuery | float | list[float] | None = None
    filters: dict[str, Any] = Field(default_factory=dict)
    format: str | None = None
    format_args: dict[str, Any] | None = None

    @field_serializer("time")
    def serialize_time(self, time: SliceQuery | TimeComboDict | None, _info):
        """Serialize time."""
        if isinstance(time, slice):
            return slice_to_dict(time)
        return time

    @model_validator(mode="after")
    @classmethod
    def area_locations_mutually_exclusive_validator(cls, query):
        """Assert that 'area' and 'location' are not passed together."""
        if query.area is not None and query.location is not None:
            raise KeyError(
                "'area' and 'location' cannot be used together; please"
                " provide only one of them"
            )
        return query

    @model_validator(mode="before")
    @classmethod
    def build_filters(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Build filters based on extra arguments."""
        if "filters" in values:
            return values
        filters = {}
        fields = {}
        for k in values.keys():
            if k in cls.model_fields:
                fields[k] = values[k]
                continue
            if isinstance(values[k], dict):
                values[k] = maybe_dict_to_slice(values[k])
            filters[k] = values[k]
        fields["filters"] = filters
        return fields

    def model_dump_original(self, skip_empty: bool = True) -> dict:
        """Return the dict representation of the original query."""
        res = super().model_dump()
        res = {**res.pop("filters", {}), **res}
        if skip_empty:
            res = dict(filter(lambda item: item[1] is not None, res.items()))
        return res

    @classmethod
    def parse(
        cls, load: "GeoQuery" | dict | str | bytes | bytearray
    ) -> "GeoQuery":
        """Parse `load` into a GeoQuery instance."""
        if isinstance(load, cls):
            return load
        if isinstance(load, (str, bytes, bytearray)):
            load = json.loads(load)
        if isinstance(load, dict):
            load = GeoQuery(**load)
        else:
            raise TypeError(
                f"type of the `load` argument ({type(load).__name__}) is not"
                " supported!"
            )
        return load
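
A minimal usage sketch. The variable name and the extra keyword
"product_type" are illustrative assumptions; any keyword that is not a
declared field is routed into filters by the build_filters validator:

query = GeoQuery(variable=["2m_temperature"], product_type="reanalysis")
# "variable" is a declared field; "product_type" is an assumed extra.
assert query.variable == ["2m_temperature"]
assert query.filters == {"product_type": "reanalysis"}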

area_locations_mutually_exclusive_validator(query) classmethod

Assert that 'area' and 'location' are not passed together.

Source code in drivers/intake_geokube/queries/geoquery.py
@model_validator(mode="after")
@classmethod
def area_locations_mutually_exclusive_validator(cls, query):
    """Assert that 'area' and 'location' are not passed together."""
    if query.area is not None and query.location is not None:
        raise KeyError(
            "'area' and 'location' cannot be used together; please"
            " provide only one of them"
        )
    return query
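
A sketch of the failure mode. The dict keys used for area and location are
assumptions about BoundingBoxDict and the location mapping, and, depending
on the pydantic version, the KeyError may propagate as-is or wrapped:

try:
    GeoQuery(
        area={"north": 60.0, "south": 50.0, "west": 0.0, "east": 10.0},
        location={"latitude": 52.2, "longitude": 4.9},
    )
except Exception as err:  # KeyError raised by the validator above
    print(type(err).__name__, err)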

build_filters(values) classmethod

Build filters based on extra arguments.

Source code in drivers/intake_geokube/queries/geoquery.py
@model_validator(mode="before")
@classmethod
def build_filters(cls, values: dict[str, Any]) -> dict[str, Any]:
    """Build filters based on extra arguments."""
    if "filters" in values:
        return values
    filters = {}
    fields = {}
    for k in values.keys():
        if k in cls.model_fields:
            fields[k] = values[k]
            continue
        if isinstance(values[k], dict):
            values[k] = maybe_dict_to_slice(values[k])
        filters[k] = values[k]
    fields["filters"] = filters
    return fields
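
Two behaviours worth noting, sketched under the same assumptions as above:
an explicit filters keyword short-circuits the routing entirely, while
dict-valued extras are first offered to maybe_dict_to_slice (whose key
convention is defined elsewhere in the module):

q1 = GeoQuery(variable=["tp"], filters={"expver": 1})  # passed through as-is
assert q1.filters == {"expver": 1}

q2 = GeoQuery(variable=["tp"], expver=1, step=6)  # extras become filters
assert q2.filters == {"expver": 1, "step": 6}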

model_dump_original(skip_empty=True)

Return the dict representation of the original query.

Source code in drivers/intake_geokube/queries/geoquery.py
def model_dump_original(self, skip_empty: bool = True) -> dict:
    """Return the dict representation of the original query."""
    res = super().model_dump()
    res = {**res.pop("filters", {}), **res}
    if skip_empty:
        res = dict(filter(lambda item: item[1] is not None, res.items()))
    return res
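
A short sketch: the dump pops filters and splats its entries back at the
top level, then (with the default skip_empty=True) drops every field that
is still None:

q = GeoQuery(variable=["tp"], expver=1)
assert q.model_dump_original() == {"expver": 1, "variable": ["tp"]}
# The plain pydantic dump keeps the nesting instead:
assert q.model_dump()["filters"] == {"expver": 1}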

parse(load) classmethod

Parse `load` into a GeoQuery instance.

Source code in drivers/intake_geokube/queries/geoquery.py
@classmethod
def parse(
    cls, load: "GeoQuery" | dict | str | bytes | bytearray
) -> "GeoQuery":
    """Parse `load` into a GeoQuery instance."""
    if isinstance(load, cls):
        return load
    if isinstance(load, (str, bytes, bytearray)):
        load = json.loads(load)
    if isinstance(load, dict):
        load = GeoQuery(**load)
    else:
        raise TypeError(
            f"type of the `load` argument ({type(load).__name__}) is not"
            " supported!"
        )
    return load
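
A sketch of the accepted inputs: instances pass through untouched, text and
bytes are decoded as JSON first, and dicts feed the constructor directly:

q_from_json = GeoQuery.parse('{"variable": ["tp"], "expver": 1}')
q_from_dict = GeoQuery.parse({"variable": ["tp"], "expver": 1})
assert q_from_json.model_dump_original() == q_from_dict.model_dump_original()
assert GeoQuery.parse(q_from_dict) is q_from_dict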

serialize_time(time, _info)

Serialize time.

Source code in drivers/intake_geokube/queries/geoquery.py
@field_serializer("time")
def serialize_time(self, time: SliceQuery | TimeComboDict | None, _info):
    """Serialize time."""
    if isinstance(time, slice):
        return slice_to_dict(time)
    return time
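
A hedged sketch, assuming SliceQuery validates into a builtin slice (which
is what the isinstance check above implies); on dump, slice_to_dict turns
the slice into a JSON-friendly dict whose exact keys are defined elsewhere
in the module:

q = GeoQuery(variable=["tp"], time=slice("2020-01-01", "2020-02-01"))
assert isinstance(q.model_dump()["time"], dict)  # serialized via slice_to_dict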

Module with workflow definition.

Task

Bases: BaseModel

Single task model definition.

Source code in drivers/intake_geokube/queries/workflow.py
class Task(BaseModel):
    """Single task model definition."""

    id: str | int
    op: str
    use: list[str | int] = Field(default_factory=list)
    args: dict[str, Any] = Field(default_factory=dict)
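
A minimal sketch of a task; the operator name "subset" and its args are
illustrative, not a catalogue of supported operators:

task = Task(id="t1", op="subset", args={"query": {"variable": ["tp"]}})
assert task.use == []  # no upstream dependencies by default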

Workflow

Bases: BaseModel

Workflow model definition.

Source code in drivers/intake_geokube/queries/workflow.py
class Workflow(BaseModel):
    """Workflow model definition."""

    tasks: list[Task]
    dataset_id: str = "<undefined>"
    product_id: str = "<undefined>"

    @model_validator(mode="before")
    @classmethod
    def obtain_dataset_id(cls, values):
        """Get dataset_id and product_id from included tasks."""
        dataset_id = find_value(values, key="dataset_id", recursive=True)
        if not dataset_id:
            raise KeyError(
                "'dataset_id' key is missing. Did you define it in 'args'?"
            )
        product_id = find_value(values, key="product_id", recursive=True)
        if not product_id:
            raise KeyError(
                "'product_id' key is missing. Did you define it in 'args'?"
            )
        return values | {"dataset_id": dataset_id, "product_id": product_id}

    @field_validator("tasks", mode="after")
    @classmethod
    def match_unique_ids(cls, items):
        """Verify that task IDs are unique."""
        for id_value, id_count in Counter([item.id for item in items]).items():
            if id_count != 1:
                raise ValueError(f"duplicate id found: `{id_value}`")
        return items

    @classmethod
    def parse(
        cls,
        workflow: Workflow | dict | list[dict] | str | bytes | bytearray,
    ) -> Workflow:
        """Parse `workflow` into a Workflow model."""
        if isinstance(workflow, cls):
            return workflow
        if isinstance(workflow, (str, bytes, bytearray)):
            workflow = json.loads(workflow)
        if isinstance(workflow, list):
            return cls(tasks=workflow)  # type: ignore[arg-type]
        if isinstance(workflow, dict):
            return cls(**workflow)
        raise TypeError(
            f"`workflow` argument of type `{type(workflow).__name__}`"
            " cannot be safely parsed into a `Workflow`"
        )
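
A construction sketch: dataset_id and product_id are not passed explicitly;
the before-validator lifts them out of the nested task args via find_value.
The ids and operator name are assumptions:

wf = Workflow(
    tasks=[
        {
            "id": "t1",
            "op": "subset",
            "args": {"dataset_id": "era5", "product_id": "reanalysis"},
        }
    ]
)
assert wf.dataset_id == "era5"
assert wf.product_id == "reanalysis"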

match_unique_ids(items) classmethod

Verify that task IDs are unique.

Source code in drivers/intake_geokube/queries/workflow.py
@field_validator("tasks", mode="after")
@classmethod
def match_unique_ids(cls, items):
    """Verify that task IDs are unique."""
    for id_value, id_count in Counter([item.id for item in items]).items():
        if id_count != 1:
            raise ValueError(f"duplicate id found: `{id_value}`")
    return items
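
A sketch of the duplicate-id check (ids and operators are illustrative);
pydantic wraps the ValueError in a ValidationError, which is itself a
ValueError, so the except clause below catches both:

try:
    Workflow(
        tasks=[
            {"id": "t1", "op": "subset",
             "args": {"dataset_id": "d", "product_id": "p"}},
            {"id": "t1", "op": "average", "args": {}},
        ]
    )
except ValueError as err:
    print(err)  # reports the duplicated id `t1`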

obtain_dataset_id(values) classmethod

Get dataset_id and product_id from included tasks.

Source code in drivers/intake_geokube/queries/workflow.py
@model_validator(mode="before")
@classmethod
def obtain_dataset_id(cls, values):
    """Get dataset_id and product_id from included tasks."""
    dataset_id = find_value(values, key="dataset_id", recursive=True)
    if not dataset_id:
        raise KeyError(
            "'dataset_id' key is missing. Did you define it in 'args'?"
        )
    product_id = find_value(values, key="product_id", recursive=True)
    if not product_id:
        raise KeyError(
            "'product_id' key is missing. Did you define it in 'args'?"
        )
    return values | {"dataset_id": dataset_id, "product_id": product_id}
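
A sketch of the failure path: when no task carries a dataset_id anywhere in
its args, validation stops with the KeyError above (in pydantic v2 a
KeyError is not converted to a ValidationError, so expect it to propagate):

try:
    Workflow(tasks=[{"id": "t1", "op": "subset", "args": {}}])
except Exception as err:  # KeyError from obtain_dataset_id
    print(type(err).__name__, err)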

parse(workflow) classmethod

Parse `workflow` into a Workflow model.

Source code in drivers/intake_geokube/queries/workflow.py
@classmethod
def parse(
    cls,
    workflow: Workflow | dict | list[dict] | str | bytes | bytearray,
) -> Workflow:
    """Parse `workflow` into a Workflow model."""
    if isinstance(workflow, cls):
        return workflow
    if isinstance(workflow, (str, bytes, bytearray)):
        workflow = json.loads(workflow)
    if isinstance(workflow, list):
        return cls(tasks=workflow)  # type: ignore[arg-type]
    if isinstance(workflow, dict):
        return cls(**workflow)
    raise TypeError(
        f"`workflow` argument of type `{type(workflow).__name__}`"
        " cannot be safely parsed into a `Workflow`"
    )
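
A parsing sketch: a bare JSON list is treated as the task list itself, after
which the validators above fill in the ids (payload values are illustrative):

payload = (
    '[{"id": "t1", "op": "subset",'
    ' "args": {"dataset_id": "era5", "product_id": "reanalysis"}}]'
)
wf = Workflow.parse(payload)
assert wf.dataset_id == "era5"
assert len(wf.tasks) == 1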