Skip to content

Iot

Driver for IoT data.

IotDriver

Bases: AbstractBaseDriver

Driver class for IoT data.

Source code in drivers/intake_geokube/iot/driver.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class IotDriver(AbstractBaseDriver):
    """Driver class for IoT data.

    The driver builds a ``streamz`` pipeline on top of an MQTT source:
    incoming messages are accumulated with ``_mqtt_preprocess`` and exposed
    as a streaming dataframe. Every accumulated frame is also pushed into
    the module-level container ``d`` (defined outside this class), which
    :meth:`process` reads from.
    """

    name: str = "iot_driver"
    version: str = "0.1b0"

    def __init__(
        self,
        mqtt_kwargs,
        time_window,
        data_model,
        start=False,
        metadata=None,
        **kwargs,
    ):
        """Store the configuration; the stream itself is created lazily.

        Parameters
        ----------
        mqtt_kwargs : mapping
            Keyword arguments unpacked into ``streamz.Stream.from_mqtt``.
        time_window
            Interpolated into ``"NOW - {time_window}"`` and parsed with
            ``dateparser`` when a query carries no explicit time range.
        data_model
            Passed to ``_build``; the result is used as the ``example``
            frame of the streaming dataframe.
        start : bool, optional
            When truthy, the stream is started each time the schema is
            requested.
        metadata : optional
            Forwarded to the base-class initializer.
        **kwargs
            Kept on ``self.kwargs``; not used elsewhere in this class.
        """
        super().__init__(metadata=metadata)
        self.mqtt_kwargs = mqtt_kwargs
        self.kwargs = kwargs
        self.stream = None
        self.time_window = time_window
        self.start = start
        self.df_model = _build(data_model)

    def _get_schema(self):
        """Create the stream on first use and return its description.

        Returns
        -------
        schema : dict
            A single-key mapping ``{"stream": str(self.stream)}``.
        """
        # ``None`` is the "not created yet" sentinel set in ``__init__``.
        # Compare by identity instead of relying on the truthiness of a
        # third-party stream object.
        if self.stream is None:
            self.log.debug("creating stream...")
            stream = streamz.Stream.from_mqtt(**self.mqtt_kwargs)
            self.stream = stream.accumulate(
                _mqtt_preprocess, returns_state=False, start=pd.DataFrame()
            ).to_dataframe(example=self.df_model)
            # Mirror every accumulated frame into the module-level
            # container so that ``process`` can pick it up.
            self.stream.stream.sink(d.append)
        if self.start:
            self.log.info("streaming started...")
            self.stream.start()
        return {"stream": str(self.stream)}

    def read(self) -> streamz.dataframe.core.DataFrame:
        """Read IoT data.

        Returns
        -------
        stream : `streamz.dataframe.DataFrame`
        """
        self.log.info("reading stream...")
        self._get_schema()
        return self.stream

    def load(self) -> NoReturn:
        """Load IoT data.

        Raises
        ------
        NotImplementedError
            Always; loading the entire product is not supported.
        """
        message = "loading entire product is not supported for IoT data"
        self.log.error(message)
        raise NotImplementedError(message)

    def process(self, query: GeoQuery) -> streamz.dataframe.core.DataFrame:
        """Process IoT data with the passed query.

        Parameters
        ----------
        query : intake_geokube.GeoQuery
            A query to use

        Returns
        -------
        stream  : streamz.dataframe.DataFrame
            A DataFrame object with streamed content

        Raises
        ------
        TypeError
            If ``query.time`` is set but is not a ``slice``.
        """
        # NOTE(review): ``d`` is only filled by the sink registered in
        # ``_get_schema``; presumably ``read`` has been called and at least
        # one message has arrived before this point -- confirm with callers.
        df = d[0]
        if not query:
            self.log.info(
                "method 'process' called without query. processing skipped."
            )
            return df
        if query.time:
            if not isinstance(query.time, slice):
                self.log.error(
                    "expected 'query.time' type is slice but found %s",
                    type(query.time),
                )
                raise TypeError(
                    "expected 'query.time' type is slice but found"
                    f" {type(query.time)}"
                )
            self.log.info("querying by time: %s", query.time)
            df = df[query.time.start : query.time.stop]
        else:
            # No explicit range: fall back to the configured window that
            # ends "now".
            self.log.info(
                "getting latest data for the predefined time window: %s",
                self.time_window,
            )
            start = dateparser.parse(f"NOW - {self.time_window}")
            stop = dateparser.parse("NOW")
            df = df[start:stop]  # type: ignore[misc]
        if query.filters:
            self.log.info("filtering with: %s", query.filters)
            # A row is kept only if it matches every ``column == value``
            # pair of the filters.
            mask = np.logical_and.reduce(
                [df[k] == v for k, v in query.filters.items()]
            )
            df = df[mask]
        if query.variable:
            self.log.info("selecting variables: %s", query.variable)
            df = df[query.variable]
        return df

load()

Load IoT data.

Source code in drivers/intake_geokube/iot/driver.py
114
115
116
117
118
119
def load(self) -> NoReturn:
    """Refuse to load the whole IoT product.

    The refusal is logged at error level and then raised.

    Raises
    ------
    NotImplementedError
        Always.
    """
    message = "loading entire product is not supported for IoT data"
    self.log.error(message)
    raise NotImplementedError(message)

process(query)

Process IoT data with the passed query.

Parameters

query : intake_geokube.GeoQuery — the query to apply.

Returns

stream : streamz.dataframe.DataFrame — a DataFrame object with the streamed content.

Source code in drivers/intake_geokube/iot/driver.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def process(self, query: GeoQuery) -> streamz.dataframe.core.DataFrame:
    """Process IoT data with the passed query.

    Parameters
    ----------
    query : intake_geokube.GeoQuery
        A query to use

    Returns
    -------
    stream  : streamz.dataframe.DataFrame
        A DataFrame object with streamed content

    Raises
    ------
    TypeError
        If ``query.time`` is set but is not a ``slice``.
    """
    # NOTE(review): ``d`` is a module-level container that is not filled
    # here; presumably the stream has been created and has produced at
    # least one frame before this is called -- confirm with callers.
    df = d[0]
    if not query:
        self.log.info(
            "method 'process' called without query. processing skipped."
        )
        return df
    if query.time:
        if not isinstance(query.time, slice):
            self.log.error(
                "expected 'query.time' type is slice but found %s",
                type(query.time),
            )
            raise TypeError(
                "expected 'query.time' type is slice but found"
                f" {type(query.time)}"
            )
        self.log.info("querying by time: %s", query.time)
        df = df[query.time.start : query.time.stop]
    else:
        # No explicit range: fall back to the configured window that
        # ends "now".
        self.log.info(
            "getting latest data for the predefined time window: %s",
            self.time_window,
        )
        start = dateparser.parse(f"NOW - {self.time_window}")
        stop = dateparser.parse("NOW")
        df = df[start:stop]  # type: ignore[misc]
    if query.filters:
        self.log.info("filtering with: %s", query.filters)
        # A row is kept only if it matches every ``column == value`` pair
        # of the filters.
        mask = np.logical_and.reduce(
            [df[k] == v for k, v in query.filters.items()]
        )
        df = df[mask]
    if query.variable:
        self.log.info("selecting variables: %s", query.variable)
        df = df[query.variable]
    return df

read()

Read IoT data.

Returns

stream : streamz.dataframe.DataFrame

Source code in drivers/intake_geokube/iot/driver.py
103
104
105
106
107
108
109
110
111
112
def read(self) -> streamz.dataframe.core.DataFrame:
    """Return the IoT stream, preparing it first via ``_get_schema``.

    Returns
    -------
    stream : `streamz.dataframe.DataFrame`
        The object stored on ``self.stream`` after ``_get_schema`` ran.
    """
    self.log.info("reading stream...")
    # Called for its side effect on ``self.stream``; the returned schema
    # is not needed here.
    self._get_schema()
    stream = self.stream
    return stream