data.aio

from .client import KustoClient

# Public API of this ``aio`` package: only the asynchronous client is re-exported.
__all__ = ["KustoClient"]
@documented_by(KustoClientSync)
class KustoClient(_KustoClientBase):
    # Asynchronous (aiohttp-based) counterpart of the synchronous KustoClient.
    # Public docstrings are supplied from the sync client by the
    # ``documented_by`` / ``aio_documented_by`` decorators.

    @documented_by(KustoClientSync.__init__)
    def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]):
        # NOTE(review): the positional ``True`` presumably marks this as the async
        # variant for the shared base class — confirm against _KustoClientBase.
        super().__init__(kcsb, True)

        # One aiohttp session is reused by every request and released in close().
        self._session = ClientSession()

    async def __aenter__(self) -> "KustoClient":
        return self

    async def close(self):
        # Async resources are released only if the client is not already closed;
        # the base-class close() always runs.
        if not self._is_closed:
            await self._session.close()
            if self._aad_helper:
                await self._aad_helper.close_async()
        super().close()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    @aio_documented_by(KustoClientSync.execute)
    async def execute(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
        # Text starting with "." (after trimming) is a control command; anything else is a query.
        query = query.strip()
        if query.startswith("."):
            return await self.execute_mgmt(database, query, properties)
        return await self.execute_query(database, query, properties)

    @distributed_trace_async(name_of_span="AioKustoClient.query_cmd", kind=SpanKind.CLIENT)
    @aio_documented_by(KustoClientSync.execute_query)
    async def execute_query(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
        # ``database`` may be None: it is resolved to the connection-string default here.
        database = self._get_database_or_default(database)
        Span.set_query_attributes(self._kusto_cluster, database, properties)
        request = ExecuteRequestParams._from_query(
            query,
            database,
            properties,
            self._request_headers,
            self._query_default_timeout,
            self._mgmt_default_timeout,
            self._client_server_delta,
            self.client_details,
        )
        return await self._execute(self._query_endpoint, request, properties)

    @distributed_trace_async(name_of_span="AioKustoClient.control_cmd", kind=SpanKind.CLIENT)
    @aio_documented_by(KustoClientSync.execute_mgmt)
    async def execute_mgmt(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
        # ``database`` may be None: it is resolved to the connection-string default here.
        database = self._get_database_or_default(database)
        Span.set_query_attributes(self._kusto_cluster, database, properties)
        # Control commands use the management timeout for both timeout slots.
        request = ExecuteRequestParams._from_query(
            query,
            database,
            properties,
            self._request_headers,
            self._mgmt_default_timeout,
            self._mgmt_default_timeout,
            self._client_server_delta,
            self.client_details,
        )
        return await self._execute(self._mgmt_endpoint, request, properties)

    @distributed_trace_async(name_of_span="AioKustoClient.streaming_ingest", kind=SpanKind.CLIENT)
    @aio_documented_by(KustoClientSync.execute_streaming_ingest)
    async def execute_streaming_ingest(
        self,
        database: Optional[str],
        table: str,
        stream: Optional[io.IOBase],
        blob_url: Optional[str],
        stream_format: Union[DataFormat, str],
        properties: Optional[ClientRequestProperties] = None,
        mapping_name: Optional[str] = None,
    ):
        database = self._get_database_or_default(database)

        # Accept either a DataFormat member or its (case-insensitive) name.
        stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
        # NOTE(review): database/table/mapping_name are concatenated without URL-escaping.
        endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
        if mapping_name is not None:
            endpoint = endpoint + "&mappingName=" + mapping_name

        # blob_url takes precedence when both a blob url and a stream are given.
        if blob_url:
            endpoint += "&sourceKind=uri"
            request = ExecuteRequestParams._from_blob_url(
                blob_url,
                properties,
                self._request_headers,
                self._streaming_ingest_default_timeout,
                self._mgmt_default_timeout,
                self._client_server_delta,
                self.client_details,
            )
        elif stream:
            request = ExecuteRequestParams._from_stream(
                stream,
                properties,
                self._request_headers,
                self._streaming_ingest_default_timeout,
                self._mgmt_default_timeout,
                self._client_server_delta,
                self.client_details,
            )
        else:
            raise Exception("execute_streaming_ingest is expecting either a stream or blob url")

        Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
        await self._execute(endpoint, request, properties)

    @aio_documented_by(KustoClientSync._execute_streaming_query_parsed)
    async def _execute_streaming_query_parsed(
        self,
        database: Optional[str],
        query: str,
        timeout: timedelta = _KustoClientBase._query_default_timeout,
        properties: Optional[ClientRequestProperties] = None,
    ) -> StreamingDataSetEnumerator:
        request = ExecuteRequestParams._from_query(
            query, database, properties, self._request_headers, timeout, self._mgmt_default_timeout, self._client_server_delta, self.client_details
        )
        # Keep the HTTP response open so its body can be tokenized incrementally.
        response = await self._execute(self._query_endpoint, request, properties, stream_response=True)
        return StreamingDataSetEnumerator(JsonTokenReader(response.content))

    @distributed_trace_async(name_of_span="AioKustoClient.streaming_query", kind=SpanKind.CLIENT)
    @aio_documented_by(KustoClientSync.execute_streaming_query)
    async def execute_streaming_query(
        self,
        database: Optional[str],
        query: str,
        timeout: timedelta = _KustoClientBase._query_default_timeout,
        properties: Optional[ClientRequestProperties] = None,
    ) -> KustoStreamingResponseDataSet:
        database = self._get_database_or_default(database)
        Span.set_query_attributes(self._kusto_cluster, database, properties)

        response = await self._execute_streaming_query_parsed(database, query, timeout, properties)
        return KustoStreamingResponseDataSet(response)

    @aio_documented_by(KustoClientSync._execute)
    async def _execute(
        self,
        endpoint: str,
        request: ExecuteRequestParams,
        properties: Optional[ClientRequestProperties] = None,
        stream_response: bool = False,
    ) -> Union[KustoResponseDataSet, ClientResponse]:
        """Executes given query against this client"""
        if self._is_closed:
            raise KustoClosedError()
        self.validate_endpoint()

        request_headers = request.request_headers
        timeout = request.timeout
        if self._aad_helper:
            request_headers["Authorization"] = await self._aad_helper.acquire_authorization_header_async()

        def invoker():
            # total_seconds() keeps the days and sub-second parts of the timedelta;
            # ``timedelta.seconds`` is only the seconds component and silently drops them.
            return self._session.post(
                endpoint,
                headers=request_headers,
                json=request.json_payload,
                data=request.payload,
                timeout=timeout.total_seconds(),
                proxy=self._proxy_url,
                allow_redirects=False,
            )

        try:
            response = await MonitoredActivity.invoke_async(
                invoker, name_of_span="AioKustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
            )
        except Exception as e:
            raise KustoNetworkError(endpoint, None if properties is None else properties.client_request_id) from e

        if stream_response:
            try:
                response.raise_for_status()
                # Redirects are disabled above, so a 3xx is unexpected here.
                if 300 <= response.status < 400:
                    raise Exception("Unexpected redirection, got status code: " + str(response.status))
                # Success: the caller takes ownership of the still-open response.
                return response
            except Exception as e:
                try:
                    response_text = await response.text()
                except Exception:
                    response_text = None
                try:
                    response_json = await response.json()
                except Exception:
                    response_json = None
                try:
                    raise self._handle_http_error(e, endpoint, request.payload, response, response.status, response_json, response_text)
                finally:
                    # The caller never receives this response, so free its connection here.
                    response.close()

        async with response:
            response_json = None
            try:
                if 300 <= response.status < 400:
                    raise Exception("Unexpected redirection, got status code: " + str(response.status))
                response_json = await response.json()
                response.raise_for_status()
            except Exception as e:
                try:
                    response_text = await response.text()
                except Exception:
                    response_text = None
                raise self._handle_http_error(e, endpoint, request.payload, response, response.status, response_json, response_text)
            return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="AioKustoClient.processing_response")

Kusto client for Python. The client is a wrapper around the Kusto REST API. To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/

The primary methods are `execute_query`, which executes a KQL query against the Kusto service, and `execute_mgmt`, which executes a KQL control command against the Kusto service.

@documented_by(KustoClientSync.__init__)
def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]):
    # Shared base-class setup. NOTE(review): the positional ``True`` presumably
    # marks this as the async client — confirm against _KustoClientBase.
    super().__init__(kcsb, True)
    # A single aiohttp session is reused for every request and released by close().
    self._session = ClientSession()

Kusto Client constructor.

Parameters
  • kcsb: The connection string to initialize KustoClient.
async def close(self):
    # Async resources are released only while the client is still open;
    # the base-class close() runs unconditionally afterwards.
    if not self._is_closed:
        await self._session.close()
        helper = self._aad_helper
        if helper:
            await helper.close_async()
    super().close()
@aio_documented_by(KustoClientSync.execute)
async def execute(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
    # Text starting with "." (after trimming whitespace) is a control command;
    # everything else is routed as a regular query.
    query = query.strip()
    if query.startswith("."):
        return await self.execute_mgmt(database, query, properties)
    return await self.execute_query(database, query, properties)

Aio function: Executes a query or management command.

Parameters
  • Optional[str] database: Database against which the query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns

Kusto response data set.

@distributed_trace_async(name_of_span="AioKustoClient.query_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_query)
async def execute_query(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
    # ``database`` may be None (execute() forwards an Optional[str]); it is
    # resolved to the connection-string default before use.
    database = self._get_database_or_default(database)
    Span.set_query_attributes(self._kusto_cluster, database, properties)
    request = ExecuteRequestParams._from_query(
        query,
        database,
        properties,
        self._request_headers,
        self._query_default_timeout,
        self._mgmt_default_timeout,
        self._client_server_delta,
        self.client_details,
    )
    return await self._execute(self._query_endpoint, request, properties)

Aio function: Execute a KQL query. To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/

Parameters
  • Optional[str] database: Database against which the query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns

Kusto response data set.

@distributed_trace_async(name_of_span="AioKustoClient.control_cmd", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_mgmt)
async def execute_mgmt(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
    # ``database`` may be None (execute() forwards an Optional[str]); it is
    # resolved to the connection-string default before use.
    database = self._get_database_or_default(database)
    Span.set_query_attributes(self._kusto_cluster, database, properties)
    # Control commands use the management timeout for both timeout slots.
    request = ExecuteRequestParams._from_query(
        query,
        database,
        properties,
        self._request_headers,
        self._mgmt_default_timeout,
        self._mgmt_default_timeout,
        self._client_server_delta,
        self.client_details,
    )
    return await self._execute(self._mgmt_endpoint, request, properties)

Aio function: Execute a KQL control command. To learn more about KQL control commands go to https://docs.microsoft.com/en-us/azure/kusto/management/

Parameters
  • Optional[str] database: Database against which the query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns

Kusto response data set.

@distributed_trace_async(name_of_span="AioKustoClient.streaming_ingest", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_streaming_ingest)
async def execute_streaming_ingest(
    self,
    database: Optional[str],
    table: str,
    stream: Optional[io.IOBase],
    blob_url: Optional[str],
    stream_format: Union[DataFormat, str],
    properties: Optional[ClientRequestProperties] = None,
    mapping_name: Optional[str] = None,
):
    database = self._get_database_or_default(database)

    # Accept either a DataFormat member or its (case-insensitive) name.
    stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
    # NOTE(review): database/table/mapping_name are concatenated without URL-escaping.
    endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
    if mapping_name is not None:
        endpoint = endpoint + "&mappingName=" + mapping_name

    # blob_url takes precedence when both a blob url and a stream are given.
    if blob_url:
        endpoint += "&sourceKind=uri"
        request = ExecuteRequestParams._from_blob_url(
            blob_url,
            properties,
            self._request_headers,
            self._streaming_ingest_default_timeout,
            self._mgmt_default_timeout,
            self._client_server_delta,
            self.client_details,
        )
    elif stream:
        request = ExecuteRequestParams._from_stream(
            stream,
            properties,
            self._request_headers,
            self._streaming_ingest_default_timeout,
            self._mgmt_default_timeout,
            self._client_server_delta,
            self.client_details,
        )
    else:
        raise Exception("execute_streaming_ingest is expecting either a stream or blob url")

    Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
    await self._execute(endpoint, request, properties)

Aio function: Execute streaming ingest against this client. If the Kusto service is not configured to allow streaming ingestion, this may raise an error. To learn more about streaming ingestion, go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming

Parameters
  • Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string
  • str table: Target table.
  • Optional[IO[AnyStr]] stream: A stream object which contains the data to ingest. Provide either this or blob_url.
  • Optional[str] blob_url: A URL of a blob which contains the data to ingest. Provide either this or stream.
  • DataFormat stream_format: Format of the data in the stream.
  • ClientRequestProperties properties: additional request properties.
  • str mapping_name: Name of a pre-defined ingestion mapping of the table. Required when stream_format is json/avro.
@distributed_trace_async(name_of_span="AioKustoClient.streaming_query", kind=SpanKind.CLIENT)
@aio_documented_by(KustoClientSync.execute_streaming_query)
async def execute_streaming_query(
    self,
    database: Optional[str],
    query: str,
    timeout: timedelta = _KustoClientBase._query_default_timeout,
    properties: Optional[ClientRequestProperties] = None,
) -> KustoStreamingResponseDataSet:
    # Resolve the database first so the tracing span records the effective target.
    resolved_database = self._get_database_or_default(database)
    Span.set_query_attributes(self._kusto_cluster, resolved_database, properties)

    # The result is read table by table from the open HTTP stream rather than all at once.
    enumerator = await self._execute_streaming_query_parsed(resolved_database, query, timeout, properties)
    return KustoStreamingResponseDataSet(enumerator)

Aio function: Execute a KQL query without reading it all to memory. The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially.

Parameters
  • Optional[str] database: Database against which the query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • timedelta timeout: timeout for the query to be executed
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns