data.aio
27@documented_by(KustoClientSync) 28class KustoClient(_KustoClientBase): 29 @documented_by(KustoClientSync.__init__) 30 def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]): 31 super().__init__(kcsb, True) 32 33 self._session = ClientSession() 34 35 async def __aenter__(self) -> "KustoClient": 36 return self 37 38 async def close(self): 39 if not self._is_closed: 40 await self._session.close() 41 if self._aad_helper: 42 await self._aad_helper.close_async() 43 super().close() 44 45 async def __aexit__(self, exc_type, exc_val, exc_tb): 46 await self.close() 47 48 @aio_documented_by(KustoClientSync.execute) 49 async def execute(self, database: Optional[str], query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: 50 query = query.strip() 51 if query.startswith("."): 52 return await self.execute_mgmt(database, query, properties) 53 return await self.execute_query(database, query, properties) 54 55 @distributed_trace_async(name_of_span="AioKustoClient.query_cmd", kind=SpanKind.CLIENT) 56 @aio_documented_by(KustoClientSync.execute_query) 57 async def execute_query(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: 58 database = self._get_database_or_default(database) 59 Span.set_query_attributes(self._kusto_cluster, database, properties) 60 request = ExecuteRequestParams._from_query( 61 query, 62 database, 63 properties, 64 self._request_headers, 65 self._query_default_timeout, 66 self._mgmt_default_timeout, 67 self._client_server_delta, 68 self.client_details, 69 ) 70 return await self._execute(self._query_endpoint, request, properties) 71 72 @distributed_trace_async(name_of_span="AioKustoClient.control_cmd", kind=SpanKind.CLIENT) 73 @aio_documented_by(KustoClientSync.execute_mgmt) 74 async def execute_mgmt(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: 75 database = self._get_database_or_default(database) 76 Span.set_query_attributes(self._kusto_cluster, database, properties) 77 request = ExecuteRequestParams._from_query( 78 query, 79 database, 80 properties, 81 self._request_headers, 82 self._mgmt_default_timeout, 83 self._mgmt_default_timeout, 84 self._client_server_delta, 85 self.client_details, 86 ) 87 return await self._execute(self._mgmt_endpoint, request, properties) 88 89 @distributed_trace_async(name_of_span="AioKustoClient.streaming_ingest", kind=SpanKind.CLIENT) 90 @aio_documented_by(KustoClientSync.execute_streaming_ingest) 91 async def execute_streaming_ingest( 92 self, 93 database: Optional[str], 94 table: str, 95 stream: Optional[io.IOBase], 96 blob_url: Optional[str], 97 stream_format: Union[DataFormat, str], 98 properties: ClientRequestProperties = None, 99 mapping_name: str = None, 100 ): 101 database = self._get_database_or_default(database) 102 103 stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value 104 endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format 105 if mapping_name is not None: 106 endpoint = endpoint + "&mappingName=" + mapping_name 107 108 if blob_url: 109 endpoint += "&sourceKind=uri" 110 request = ExecuteRequestParams._from_blob_url( 111 blob_url, 112 properties, 113 self._request_headers, 114 self._streaming_ingest_default_timeout, 115 self._mgmt_default_timeout, 116 self._client_server_delta, 117 self.client_details, 118 ) 119 elif stream: 120 request = ExecuteRequestParams._from_stream( 121 stream, 122 properties, 123 self._request_headers, 124 self._streaming_ingest_default_timeout, 125 self._mgmt_default_timeout, 126 self._client_server_delta, 127 self.client_details, 128 ) 129 else: 130 raise Exception("execute_streaming_ingest is expecting either a stream or blob url") 131 132 Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) 133 await self._execute(endpoint, request, properties) 134 135 @aio_documented_by(KustoClientSync._execute_streaming_query_parsed) 136 async def _execute_streaming_query_parsed( 137 self, 138 database: Optional[str], 139 query: str, 140 timeout: timedelta = _KustoClientBase._query_default_timeout, 141 properties: Optional[ClientRequestProperties] = None, 142 ) -> StreamingDataSetEnumerator: 143 request = ExecuteRequestParams._from_query( 144 query, database, properties, self._request_headers, timeout, self._mgmt_default_timeout, self._client_server_delta, self.client_details 145 ) 146 response = await self._execute(self._query_endpoint, request, properties, stream_response=True) 147 return StreamingDataSetEnumerator(JsonTokenReader(response.content)) 148 149 @distributed_trace_async(name_of_span="AioKustoClient.streaming_query", kind=SpanKind.CLIENT) 150 @aio_documented_by(KustoClientSync.execute_streaming_query) 151 async def execute_streaming_query( 152 self, 153 database: Optional[str], 154 query: str, 155 timeout: timedelta = _KustoClientBase._query_default_timeout, 156 properties: Optional[ClientRequestProperties] = None, 157 ) -> KustoStreamingResponseDataSet: 158 database = self._get_database_or_default(database) 159 Span.set_query_attributes(self._kusto_cluster, database, properties) 160 161 response = await self._execute_streaming_query_parsed(database, query, timeout, properties) 162 return KustoStreamingResponseDataSet(response) 163 164 @aio_documented_by(KustoClientSync._execute) 165 async def _execute( 166 self, 167 endpoint: str, 168 request: ExecuteRequestParams, 169 properties: Optional[ClientRequestProperties] = None, 170 stream_response: bool = False, 171 ) -> Union[KustoResponseDataSet, ClientResponse]: 172 """Executes given query against this client""" 173 if self._is_closed: 174 raise KustoClosedError() 175 self.validate_endpoint() 176 177 request_headers = request.request_headers 178 timeout = request.timeout 179 if self._aad_helper: 180 request_headers["Authorization"] = await self._aad_helper.acquire_authorization_header_async() 181 182 invoker = lambda: self._session.post( 183 endpoint, 184 headers=request_headers, 185 json=request.json_payload, 186 data=request.payload, 187 timeout=timeout.seconds, 188 proxy=self._proxy_url, 189 allow_redirects=False, 190 ) 191 192 try: 193 response = await MonitoredActivity.invoke_async( 194 invoker, name_of_span="AioKustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers) 195 ) 196 except Exception as e: 197 raise KustoNetworkError(endpoint, None if properties is None else properties.client_request_id) from e 198 199 if stream_response: 200 try: 201 response.raise_for_status() 202 if 300 <= response.status < 400: 203 raise Exception("Unexpected redirection, got status code: " + str(response.status)) 204 return response 205 except Exception as e: 206 try: 207 response_text = await response.text() 208 except Exception: 209 response_text = None 210 try: 211 response_json = await response.json() 212 except Exception: 213 response_json = None 214 raise self._handle_http_error(e, endpoint, request.payload, response, response.status, response_json, response_text) 215 216 async with response: 217 response_json = None 218 try: 219 if 300 <= response.status < 400: 220 raise Exception("Unexpected redirection, got status code: " + str(response.status)) 221 response_json = await response.json() 222 response.raise_for_status() 223 except Exception as e: 224 try: 225 response_text = await response.text() 226 except Exception: 227 response_text = None 228 raise self._handle_http_error(e, endpoint, request.payload, response, response.status, response_json, response_text) 229 return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="AioKustoClient.processing_response")
Kusto client for Python. The client is a wrapper around the Kusto REST API. To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/
The primary methods are:
execute_query: executes a KQL query against the Kusto service.
execute_mgmt: executes a KQL control command against the Kusto service.
29 @documented_by(KustoClientSync.__init__) 30 def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]): 31 super().__init__(kcsb, True) 32 33 self._session = ClientSession()
Kusto Client constructor.
Parameters
- kcsb: The connection string to initialize KustoClient.
48 @aio_documented_by(KustoClientSync.execute) 49 async def execute(self, database: Optional[str], query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: 50 query = query.strip() 51 if query.startswith("."): 52 return await self.execute_mgmt(database, query, properties) 53 return await self.execute_query(database, query, properties)
Aio function: Executes a query or management command.
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
Kusto response data set.
55 @distributed_trace_async(name_of_span="AioKustoClient.query_cmd", kind=SpanKind.CLIENT) 56 @aio_documented_by(KustoClientSync.execute_query) 57 async def execute_query(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: 58 database = self._get_database_or_default(database) 59 Span.set_query_attributes(self._kusto_cluster, database, properties) 60 request = ExecuteRequestParams._from_query( 61 query, 62 database, 63 properties, 64 self._request_headers, 65 self._query_default_timeout, 66 self._mgmt_default_timeout, 67 self._client_server_delta, 68 self.client_details, 69 ) 70 return await self._execute(self._query_endpoint, request, properties)
Aio function: Execute a KQL query. To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
Kusto response data set.
72 @distributed_trace_async(name_of_span="AioKustoClient.control_cmd", kind=SpanKind.CLIENT) 73 @aio_documented_by(KustoClientSync.execute_mgmt) 74 async def execute_mgmt(self, database: str, query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet: 75 database = self._get_database_or_default(database) 76 Span.set_query_attributes(self._kusto_cluster, database, properties) 77 request = ExecuteRequestParams._from_query( 78 query, 79 database, 80 properties, 81 self._request_headers, 82 self._mgmt_default_timeout, 83 self._mgmt_default_timeout, 84 self._client_server_delta, 85 self.client_details, 86 ) 87 return await self._execute(self._mgmt_endpoint, request, properties)
Aio function: Execute a KQL control command. To learn more about KQL control commands go to https://docs.microsoft.com/en-us/azure/kusto/management/
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
Kusto response data set.
89 @distributed_trace_async(name_of_span="AioKustoClient.streaming_ingest", kind=SpanKind.CLIENT) 90 @aio_documented_by(KustoClientSync.execute_streaming_ingest) 91 async def execute_streaming_ingest( 92 self, 93 database: Optional[str], 94 table: str, 95 stream: Optional[io.IOBase], 96 blob_url: Optional[str], 97 stream_format: Union[DataFormat, str], 98 properties: ClientRequestProperties = None, 99 mapping_name: str = None, 100 ): 101 database = self._get_database_or_default(database) 102 103 stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value 104 endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format 105 if mapping_name is not None: 106 endpoint = endpoint + "&mappingName=" + mapping_name 107 108 if blob_url: 109 endpoint += "&sourceKind=uri" 110 request = ExecuteRequestParams._from_blob_url( 111 blob_url, 112 properties, 113 self._request_headers, 114 self._streaming_ingest_default_timeout, 115 self._mgmt_default_timeout, 116 self._client_server_delta, 117 self.client_details, 118 ) 119 elif stream: 120 request = ExecuteRequestParams._from_stream( 121 stream, 122 properties, 123 self._request_headers, 124 self._streaming_ingest_default_timeout, 125 self._mgmt_default_timeout, 126 self._client_server_delta, 127 self.client_details, 128 ) 129 else: 130 raise Exception("execute_streaming_ingest is expecting either a stream or blob url") 131 132 Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) 133 await self._execute(endpoint, request, properties)
Aio function: Execute streaming ingest against this client If the Kusto service is not configured to allow streaming ingestion, this may raise an error To learn more about streaming ingestion go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming
Parameters
- Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string
- str table: Target table.
- Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest.
- Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream.
- DataFormat stream_format: Format of the data in the stream.
- ClientRequestProperties properties: additional request properties.
- str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
149 @distributed_trace_async(name_of_span="AioKustoClient.streaming_query", kind=SpanKind.CLIENT) 150 @aio_documented_by(KustoClientSync.execute_streaming_query) 151 async def execute_streaming_query( 152 self, 153 database: Optional[str], 154 query: str, 155 timeout: timedelta = _KustoClientBase._query_default_timeout, 156 properties: Optional[ClientRequestProperties] = None, 157 ) -> KustoStreamingResponseDataSet: 158 database = self._get_database_or_default(database) 159 Span.set_query_attributes(self._kusto_cluster, database, properties) 160 161 response = await self._execute_streaming_query_parsed(database, query, timeout, properties) 162 return KustoStreamingResponseDataSet(response)
Aio function: Execute a KQL query without reading it all to memory. The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially.
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- timedelta timeout: timeout for the query to be executed
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.