data
1# Copyright (c) Microsoft Corporation. 2# Licensed under the MIT License. 3 4from ._version import VERSION as __version__ 5from .client import KustoClient 6from .client_request_properties import ClientRequestProperties 7from .kcsb import KustoConnectionStringBuilder 8from .data_format import DataFormat 9 10__all__ = [ 11 "__version__", 12 "KustoClient", 13 "ClientRequestProperties", 14 "KustoConnectionStringBuilder", 15 "DataFormat", 16]
49class KustoClient(_KustoClientBase): 50 """ 51 Kusto client for Python. 52 The client is a wrapper around the Kusto REST API. 53 To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/ 54 55 The primary methods are: 56 `execute_query`: executes a KQL query against the Kusto service. 57 `execute_mgmt`: executes a KQL control command against the Kusto service. 58 """ 59 60 _mgmt_default_timeout = timedelta(hours=1) 61 _query_default_timeout = timedelta(minutes=4) 62 _streaming_ingest_default_timeout = timedelta(minutes=10) 63 _client_server_delta = timedelta(seconds=30) 64 65 # The maximum amount of connections to be able to operate in parallel 66 _max_pool_size = 100 67 68 def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]): 69 """ 70 Kusto Client constructor. 71 :param kcsb: The connection string to initialize KustoClient. 72 :type kcsb: azure.kusto.data.KustoConnectionStringBuilder or str 73 """ 74 super().__init__(kcsb, False) 75 76 # Create a session object for connection pooling 77 self._session = requests.Session() 78 79 adapter = HTTPAdapterWithSocketOptions( 80 socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(), pool_maxsize=self._max_pool_size 81 ) 82 self._session.mount("http://", adapter) 83 self._session.mount("https://", adapter) 84 85 def close(self): 86 if not self._is_closed: 87 self._session.close() 88 if self._aad_helper: 89 self._aad_helper.close() 90 super().close() 91 92 def __enter__(self): 93 return self 94 95 def __exit__(self, exc_type, exc_val, exc_tb): 96 self.close() 97 98 def set_proxy(self, proxy_url: str): 99 super().set_proxy(proxy_url) 100 self._session.proxies = {"http": proxy_url, "https": proxy_url} 101 102 def set_http_retries(self, max_retries: int): 103 """ 104 Set the number of HTTP retries to attempt 105 """ 106 adapter = HTTPAdapterWithSocketOptions( 107 socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(), 108 pool_maxsize=self._max_pool_size, 109 max_retries=max_retries, 110 ) 111 self._session.mount("http://", adapter) 112 self._session.mount("https://", adapter) 113 114 @staticmethod 115 def compose_socket_options() -> List[Tuple[int, int, int]]: 116 # Sends TCP Keep-Alive after MAX_IDLE_SECONDS seconds of idleness, once every INTERVAL_SECONDS seconds, and closes the connection after MAX_FAILED_KEEPALIVES failed pings (e.g. 20 => 1:00:30) 117 MAX_IDLE_SECONDS = 30 118 INTERVAL_SECONDS = 180 # Corresponds to Azure Load Balancer Service 4 minute timeout, with 1 minute of slack 119 MAX_FAILED_KEEPALIVES = 20 120 121 if ( 122 sys.platform == "linux" 123 and hasattr(socket, "SOL_SOCKET") 124 and hasattr(socket, "SO_KEEPALIVE") 125 and hasattr(socket, "TCP_KEEPIDLE") 126 and hasattr(socket, "TCP_KEEPINTVL") 127 and hasattr(socket, "TCP_KEEPCNT") 128 ): 129 return [ 130 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), 131 (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS), 132 (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, INTERVAL_SECONDS), 133 (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES), 134 ] 135 elif ( 136 sys.platform == "win32" 137 and hasattr(socket, "SOL_SOCKET") 138 and hasattr(socket, "SO_KEEPALIVE") 139 and hasattr(socket, "TCP_KEEPIDLE") 140 and hasattr(socket, "TCP_KEEPCNT") 141 ): 142 return [ 143 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), 144 (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS), 145 (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES), 146 ] 147 elif sys.platform == "darwin" and hasattr(socket, "SOL_SOCKET") and hasattr(socket, "SO_KEEPALIVE") and hasattr(socket, "IPPROTO_TCP"): 148 TCP_KEEPALIVE = 0x10 149 return [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), (socket.IPPROTO_TCP, TCP_KEEPALIVE, INTERVAL_SECONDS)] 150 else: 151 return [] 152 153 def execute(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet: 154 """ 155 Executes a query or management command. 156 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 157 :param str query: Query to be executed. 158 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 159 :return: Kusto response data set. 160 :rtype: azure.kusto.data.response.KustoResponseDataSet 161 """ 162 query = query.strip() 163 if query.startswith("."): 164 return self.execute_mgmt(database, query, properties) 165 return self.execute_query(database, query, properties) 166 167 @distributed_trace(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT) 168 def execute_query(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet: 169 """ 170 Execute a KQL query. 171 To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/ 172 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 173 :param str query: Query to be executed. 174 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 175 :return: Kusto response data set. 176 :rtype: azure.kusto.data.response.KustoResponseDataSet 177 """ 178 database = self._get_database_or_default(database) 179 Span.set_query_attributes(self._kusto_cluster, database, properties) 180 request = ExecuteRequestParams._from_query( 181 query, 182 database, 183 properties, 184 self._request_headers, 185 self._query_default_timeout, 186 self._mgmt_default_timeout, 187 self._client_server_delta, 188 self.client_details, 189 ) 190 return self._execute(self._query_endpoint, request, properties) 191 192 @distributed_trace(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT) 193 def execute_mgmt(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet: 194 """ 195 Execute a KQL control command. 196 To learn more about KQL control commands go to https://docs.microsoft.com/en-us/azure/kusto/management/ 197 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 198 :param str query: Query to be executed. 199 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 200 :return: Kusto response data set. 201 :rtype: azure.kusto.data.response.KustoResponseDataSet 202 """ 203 database = self._get_database_or_default(database) 204 Span.set_query_attributes(self._kusto_cluster, database, properties) 205 request = ExecuteRequestParams._from_query( 206 query, 207 database, 208 properties, 209 self._request_headers, 210 self._mgmt_default_timeout, 211 self._mgmt_default_timeout, 212 self._client_server_delta, 213 self.client_details, 214 ) 215 return self._execute(self._mgmt_endpoint, request, properties) 216 217 @distributed_trace(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT) 218 def execute_streaming_ingest( 219 self, 220 database: Optional[str], 221 table: str, 222 stream: Optional[IO[AnyStr]], 223 blob_url: Optional[str], 224 stream_format: Union[DataFormat, str], 225 properties: Optional[ClientRequestProperties] = None, 226 mapping_name: str = None, 227 ): 228 """ 229 Execute streaming ingest against this client 230 If the Kusto service is not configured to allow streaming ingestion, this may raise an error 231 To learn more about streaming ingestion go to: 232 https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming 233 :param Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string 234 :param str table: Target table. 235 :param Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest. 236 :param Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream. 237 :param DataFormat stream_format: Format of the data in the stream. 238 :param ClientRequestProperties properties: additional request properties. 239 :param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro. 240 """ 241 database = self._get_database_or_default(database) 242 243 stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value 244 endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format 245 if mapping_name is not None: 246 endpoint = endpoint + "&mappingName=" + mapping_name 247 if blob_url: 248 endpoint += "&sourceKind=uri" 249 request = ExecuteRequestParams._from_blob_url( 250 blob_url, 251 properties, 252 self._request_headers, 253 self._streaming_ingest_default_timeout, 254 self._mgmt_default_timeout, 255 self._client_server_delta, 256 self.client_details, 257 ) 258 elif stream: 259 request = ExecuteRequestParams._from_stream( 260 stream, 261 properties, 262 self._request_headers, 263 self._streaming_ingest_default_timeout, 264 self._mgmt_default_timeout, 265 self._client_server_delta, 266 self.client_details, 267 ) 268 else: 269 raise Exception("execute_streaming_ingest is expecting either a stream or blob url") 270 271 Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) 272 self._execute(endpoint, request, properties) 273 274 def _execute_streaming_query_parsed( 275 self, 276 database: Optional[str], 277 query: str, 278 timeout: timedelta = _KustoClientBase._query_default_timeout, 279 properties: Optional[ClientRequestProperties] = None, 280 ) -> StreamingDataSetEnumerator: 281 request = ExecuteRequestParams._from_query( 282 query, database, properties, self._request_headers, timeout, self._mgmt_default_timeout, self._client_server_delta, self.client_details 283 ) 284 response = self._execute(self._query_endpoint, request, properties, stream_response=True) 285 response.raw.decode_content = True 286 return StreamingDataSetEnumerator(JsonTokenReader(response.raw)) 287 288 @distributed_trace(name_of_span="KustoClient.streaming_query", kind=SpanKind.CLIENT) 289 def execute_streaming_query( 290 self, 291 database: Optional[str], 292 query: str, 293 timeout: timedelta = _KustoClientBase._query_default_timeout, 294 properties: Optional[ClientRequestProperties] = None, 295 ) -> KustoStreamingResponseDataSet: 296 """ 297 Execute a KQL query without reading it all to memory. 298 The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially. 299 300 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 301 :param str query: Query to be executed. 302 :param timedelta timeout: timeout for the query to be executed 303 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 304 :return KustoStreamingResponseDataSet: 305 """ 306 Span.set_query_attributes(self._kusto_cluster, database, properties) 307 308 return KustoStreamingResponseDataSet(self._execute_streaming_query_parsed(database, query, timeout, properties)) 309 310 def _execute( 311 self, 312 endpoint: str, 313 request: ExecuteRequestParams, 314 properties: Optional[ClientRequestProperties] = None, 315 stream_response: bool = False, 316 ) -> Union[KustoResponseDataSet, Response]: 317 """Executes given query against this client""" 318 if self._is_closed: 319 raise KustoClosedError() 320 self.validate_endpoint() 321 322 request_headers = request.request_headers 323 if self._aad_helper: 324 request_headers["Authorization"] = self._aad_helper.acquire_authorization_header() 325 326 # trace http post call for response 327 invoker = lambda: self._session.post( 328 endpoint, 329 headers=request_headers, 330 json=request.json_payload, 331 data=request.payload, 332 timeout=request.timeout.seconds, 333 stream=stream_response, 334 allow_redirects=False, 335 ) 336 337 try: 338 response = MonitoredActivity.invoke( 339 invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers) 340 ) 341 except Exception as e: 342 raise KustoNetworkError(endpoint, None if properties is None else properties.client_request_id) from e 343 344 if stream_response: 345 try: 346 response.raise_for_status() 347 if 300 <= response.status_code < 400: 348 raise Exception("Unexpected redirection, got status code: " + str(response.status)) 349 return response 350 except Exception as e: 351 raise self._handle_http_error(e, self._query_endpoint, None, response, response.status_code, response.json(), response.text) 352 353 response_json = None 354 try: 355 if 300 <= response.status_code < 400: 356 raise Exception("Unexpected redirection, got status code: " + str(response.status)) 357 if response.text: 358 response_json = response.json() 359 else: 360 raise KustoServiceError("The content of the response contains no data.", response) 361 response.raise_for_status() 362 except Exception as e: 363 raise self._handle_http_error(e, endpoint, request.payload, response, response.status_code, response_json, response.text) 364 # trace response processing 365 return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response")
Kusto client for Python. The client is a wrapper around the Kusto REST API. To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/
The primary methods are:
execute_query: executes a KQL query against the Kusto service.
execute_mgmt: executes a KQL control command against the Kusto service.
68 def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]): 69 """ 70 Kusto Client constructor. 71 :param kcsb: The connection string to initialize KustoClient. 72 :type kcsb: azure.kusto.data.KustoConnectionStringBuilder or str 73 """ 74 super().__init__(kcsb, False) 75 76 # Create a session object for connection pooling 77 self._session = requests.Session() 78 79 adapter = HTTPAdapterWithSocketOptions( 80 socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(), pool_maxsize=self._max_pool_size 81 ) 82 self._session.mount("http://", adapter) 83 self._session.mount("https://", adapter)
Kusto Client constructor.
Parameters
- kcsb: The connection string to initialize KustoClient.
102 def set_http_retries(self, max_retries: int): 103 """ 104 Set the number of HTTP retries to attempt 105 """ 106 adapter = HTTPAdapterWithSocketOptions( 107 socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(), 108 pool_maxsize=self._max_pool_size, 109 max_retries=max_retries, 110 ) 111 self._session.mount("http://", adapter) 112 self._session.mount("https://", adapter)
Set the number of HTTP retries to attempt
114 @staticmethod 115 def compose_socket_options() -> List[Tuple[int, int, int]]: 116 # Sends TCP Keep-Alive after MAX_IDLE_SECONDS seconds of idleness, once every INTERVAL_SECONDS seconds, and closes the connection after MAX_FAILED_KEEPALIVES failed pings (e.g. 20 => 1:00:30) 117 MAX_IDLE_SECONDS = 30 118 INTERVAL_SECONDS = 180 # Corresponds to Azure Load Balancer Service 4 minute timeout, with 1 minute of slack 119 MAX_FAILED_KEEPALIVES = 20 120 121 if ( 122 sys.platform == "linux" 123 and hasattr(socket, "SOL_SOCKET") 124 and hasattr(socket, "SO_KEEPALIVE") 125 and hasattr(socket, "TCP_KEEPIDLE") 126 and hasattr(socket, "TCP_KEEPINTVL") 127 and hasattr(socket, "TCP_KEEPCNT") 128 ): 129 return [ 130 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), 131 (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS), 132 (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, INTERVAL_SECONDS), 133 (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES), 134 ] 135 elif ( 136 sys.platform == "win32" 137 and hasattr(socket, "SOL_SOCKET") 138 and hasattr(socket, "SO_KEEPALIVE") 139 and hasattr(socket, "TCP_KEEPIDLE") 140 and hasattr(socket, "TCP_KEEPCNT") 141 ): 142 return [ 143 (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), 144 (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS), 145 (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES), 146 ] 147 elif sys.platform == "darwin" and hasattr(socket, "SOL_SOCKET") and hasattr(socket, "SO_KEEPALIVE") and hasattr(socket, "IPPROTO_TCP"): 148 TCP_KEEPALIVE = 0x10 149 return [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), (socket.IPPROTO_TCP, TCP_KEEPALIVE, INTERVAL_SECONDS)] 150 else: 151 return []
153 def execute(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet: 154 """ 155 Executes a query or management command. 156 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 157 :param str query: Query to be executed. 158 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 159 :return: Kusto response data set. 160 :rtype: azure.kusto.data.response.KustoResponseDataSet 161 """ 162 query = query.strip() 163 if query.startswith("."): 164 return self.execute_mgmt(database, query, properties) 165 return self.execute_query(database, query, properties)
Executes a query or management command.
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
Kusto response data set.
167 @distributed_trace(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT) 168 def execute_query(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet: 169 """ 170 Execute a KQL query. 171 To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/ 172 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 173 :param str query: Query to be executed. 174 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 175 :return: Kusto response data set. 176 :rtype: azure.kusto.data.response.KustoResponseDataSet 177 """ 178 database = self._get_database_or_default(database) 179 Span.set_query_attributes(self._kusto_cluster, database, properties) 180 request = ExecuteRequestParams._from_query( 181 query, 182 database, 183 properties, 184 self._request_headers, 185 self._query_default_timeout, 186 self._mgmt_default_timeout, 187 self._client_server_delta, 188 self.client_details, 189 ) 190 return self._execute(self._query_endpoint, request, properties)
Execute a KQL query. To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
Kusto response data set.
192 @distributed_trace(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT) 193 def execute_mgmt(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet: 194 """ 195 Execute a KQL control command. 196 To learn more about KQL control commands go to https://docs.microsoft.com/en-us/azure/kusto/management/ 197 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 198 :param str query: Query to be executed. 199 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 200 :return: Kusto response data set. 201 :rtype: azure.kusto.data.response.KustoResponseDataSet 202 """ 203 database = self._get_database_or_default(database) 204 Span.set_query_attributes(self._kusto_cluster, database, properties) 205 request = ExecuteRequestParams._from_query( 206 query, 207 database, 208 properties, 209 self._request_headers, 210 self._mgmt_default_timeout, 211 self._mgmt_default_timeout, 212 self._client_server_delta, 213 self.client_details, 214 ) 215 return self._execute(self._mgmt_endpoint, request, properties)
Execute a KQL control command. To learn more about KQL control commands go to https://docs.microsoft.com/en-us/azure/kusto/management/
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
Kusto response data set.
217 @distributed_trace(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT) 218 def execute_streaming_ingest( 219 self, 220 database: Optional[str], 221 table: str, 222 stream: Optional[IO[AnyStr]], 223 blob_url: Optional[str], 224 stream_format: Union[DataFormat, str], 225 properties: Optional[ClientRequestProperties] = None, 226 mapping_name: str = None, 227 ): 228 """ 229 Execute streaming ingest against this client 230 If the Kusto service is not configured to allow streaming ingestion, this may raise an error 231 To learn more about streaming ingestion go to: 232 https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming 233 :param Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string 234 :param str table: Target table. 235 :param Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest. 236 :param Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream. 237 :param DataFormat stream_format: Format of the data in the stream. 238 :param ClientRequestProperties properties: additional request properties. 239 :param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro. 240 """ 241 database = self._get_database_or_default(database) 242 243 stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value 244 endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format 245 if mapping_name is not None: 246 endpoint = endpoint + "&mappingName=" + mapping_name 247 if blob_url: 248 endpoint += "&sourceKind=uri" 249 request = ExecuteRequestParams._from_blob_url( 250 blob_url, 251 properties, 252 self._request_headers, 253 self._streaming_ingest_default_timeout, 254 self._mgmt_default_timeout, 255 self._client_server_delta, 256 self.client_details, 257 ) 258 elif stream: 259 request = ExecuteRequestParams._from_stream( 260 stream, 261 properties, 262 self._request_headers, 263 self._streaming_ingest_default_timeout, 264 self._mgmt_default_timeout, 265 self._client_server_delta, 266 self.client_details, 267 ) 268 else: 269 raise Exception("execute_streaming_ingest is expecting either a stream or blob url") 270 271 Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties) 272 self._execute(endpoint, request, properties)
Execute streaming ingest against this client If the Kusto service is not configured to allow streaming ingestion, this may raise an error To learn more about streaming ingestion go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming
Parameters
- Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string
- str table: Target table.
- Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest.
- Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream.
- DataFormat stream_format: Format of the data in the stream.
- ClientRequestProperties properties: additional request properties.
- str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
288 @distributed_trace(name_of_span="KustoClient.streaming_query", kind=SpanKind.CLIENT) 289 def execute_streaming_query( 290 self, 291 database: Optional[str], 292 query: str, 293 timeout: timedelta = _KustoClientBase._query_default_timeout, 294 properties: Optional[ClientRequestProperties] = None, 295 ) -> KustoStreamingResponseDataSet: 296 """ 297 Execute a KQL query without reading it all to memory. 298 The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially. 299 300 :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string 301 :param str query: Query to be executed. 302 :param timedelta timeout: timeout for the query to be executed 303 :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties. 304 :return KustoStreamingResponseDataSet: 305 """ 306 Span.set_query_attributes(self._kusto_cluster, database, properties) 307 308 return KustoStreamingResponseDataSet(self._execute_streaming_query_parsed(database, query, timeout, properties))
Execute a KQL query without reading it all to memory. The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially.
Parameters
- Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
- str query: Query to be executed.
- timedelta timeout: timeout for the query to be executed
- azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
8class ClientRequestProperties: 9 """This class is a POD used by client making requests to describe specific needs from the service executing the requests. 10 For more information please look at: https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties 11 """ 12 13 client_request_id: str 14 application: str 15 user: str 16 _CLIENT_REQUEST_ID = "client_request_id" 17 18 results_defer_partial_query_failures_option_name = "deferpartialqueryfailures" 19 request_timeout_option_name = "servertimeout" 20 no_request_timeout_option_name = "norequesttimeout" 21 22 def __init__(self): 23 self._options = {} 24 self._parameters = {} 25 self.client_request_id = None 26 self.application = None 27 self.user = None 28 29 def set_parameter(self, name: str, value: str): 30 """Sets a parameter's value""" 31 assert_string_is_not_empty(name) 32 self._parameters[name] = value 33 34 def has_parameter(self, name: str) -> bool: 35 """Checks if a parameter is specified.""" 36 return name in self._parameters 37 38 def get_parameter(self, name: str, default_value: str) -> str: 39 """Gets a parameter's value.""" 40 return self._parameters.get(name, default_value) 41 42 def set_option(self, name: str, value: Any): 43 """Sets an option's value""" 44 assert_string_is_not_empty(name) 45 self._options[name] = value 46 47 def has_option(self, name: str) -> bool: 48 """Checks if an option is specified.""" 49 return name in self._options 50 51 def get_option(self, name: str, default_value: Any) -> str: 52 """Gets an option's value.""" 53 return self._options.get(name, default_value) 54 55 def to_json(self) -> str: 56 """Safe serialization to a JSON string.""" 57 return json.dumps({"Options": self._options, "Parameters": self._parameters}, default=str) 58 59 def get_tracing_attributes(self) -> dict: 60 """Gets dictionary of attributes to be documented during tracing""" 61 return {self._CLIENT_REQUEST_ID: str(self.client_request_id)}
This class is a POD used by client making requests to describe specific needs from the service executing the requests. For more information please look at: https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties
29 def set_parameter(self, name: str, value: str): 30 """Sets a parameter's value""" 31 assert_string_is_not_empty(name) 32 self._parameters[name] = value
Sets a parameter's value
34 def has_parameter(self, name: str) -> bool: 35 """Checks if a parameter is specified.""" 36 return name in self._parameters
Checks if a parameter is specified.
38 def get_parameter(self, name: str, default_value: str) -> str: 39 """Gets a parameter's value.""" 40 return self._parameters.get(name, default_value)
Gets a parameter's value.
42 def set_option(self, name: str, value: Any): 43 """Sets an option's value""" 44 assert_string_is_not_empty(name) 45 self._options[name] = value
Sets an option's value
47 def has_option(self, name: str) -> bool: 48 """Checks if an option is specified.""" 49 return name in self._options
Checks if an option is specified.
51 def get_option(self, name: str, default_value: Any) -> str: 52 """Gets an option's value.""" 53 return self._options.get(name, default_value)
Gets an option's value.
116class KustoConnectionStringBuilder: 117 """ 118 Parses Kusto connection strings. 119 For usages, check out the sample at: 120 https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py 121 """ 122 123 DEFAULT_DATABASE_NAME = "NetDefaultDB" 124 125 interactive_login: bool = False 126 az_cli_login: bool = False 127 device_login: bool = False 128 token_credential_login: bool = False 129 130 device_callback: DeviceCallbackType = None 131 msi_authentication: bool = False 132 msi_parameters: Optional[dict] = None 133 134 token_provider: Optional[Callable[[], str]] = None 135 async_token_provider: Optional[Callable[[], Coroutine[None, None, str]]] = None 136 137 application_for_tracing: Optional[str] = None 138 user_name_for_tracing: Optional[str] = None 139 140 azure_credential: Optional[Any] = None 141 azure_credential_from_login_endpoint: Optional[Any] = None 142 143 application_public_certificate: Optional[str] = None 144 145 def __init__(self, connection_string: str): 146 """ 147 Creates new KustoConnectionStringBuilder. 148 :param str connection_string: Kusto connection string should be of the format: 149 https://<clusterName>.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord 150 For more information please look at: 151 https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html 152 """ 153 assert_string_is_not_empty(connection_string) 154 self._internal_dict = {} 155 156 if connection_string is not None and "=" not in connection_string.partition(";")[0]: 157 connection_string = "Data Source=" + connection_string 158 159 self[SupportedKeywords.AUTHORITY_ID] = "organizations" 160 161 for kvp_string in connection_string.split(";"): 162 key, _, value = kvp_string.partition("=") 163 keyword = Keyword.parse(key) 164 165 value_stripped = value.strip() 166 if keyword.is_str_type(): 167 if keyword.name == SupportedKeywords.DATA_SOURCE: 168 self[keyword.name] = value_stripped.rstrip("/") 169 self._parse_data_source(self.data_source) 170 elif keyword.name == SupportedKeywords.TRACE_USER_NAME: 171 self.user_name_for_tracing = value_stripped 172 elif keyword.name == SupportedKeywords.TRACE_APP_NAME: 173 self.application_for_tracing = value_stripped 174 else: 175 self[keyword.name] = value_stripped 176 elif keyword.is_bool_type(): 177 if value_stripped in ["True", "true"]: 178 self[keyword.name] = True 179 elif value_stripped in ["False", "false"]: 180 self[keyword.name] = False 181 else: 182 raise KeyError("Expected aad federated security to be bool. Recieved %s" % value) 183 184 if self.initial_catalog is None: 185 self.initial_catalog = self.DEFAULT_DATABASE_NAME 186 187 def __setitem__(self, key: "Union[SupportedKeywords, str]", value: Union[str, bool, dict]): 188 keyword = Keyword.parse(key) 189 190 if value is None: 191 raise TypeError("Value cannot be None.") 192 193 if keyword.is_str_type(): 194 self._internal_dict[keyword.name] = value.strip() 195 elif keyword.is_bool_type(): 196 if not isinstance(value, bool): 197 raise TypeError("Expected %s to be bool" % key) 198 self._internal_dict[keyword.name] = value 199 else: 200 raise KeyError("KustoConnectionStringBuilder supports only bools and strings.") 201 202 @classmethod 203 def with_aad_user_password_authentication( 204 cls, connection_string: str, user_id: str, password: str, authority_id: str = "organizations" 205 ) -> "KustoConnectionStringBuilder": 206 """ 207 Creates a KustoConnection string builder that will authenticate with AAD user name and 208 password. 209 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 210 :param str user_id: AAD user ID. 211 :param str password: Corresponding password of the AAD user. 212 :param str authority_id: optional param. defaults to "organizations" 213 """ 214 assert_string_is_not_empty(user_id) 215 assert_string_is_not_empty(password) 216 217 kcsb = cls(connection_string) 218 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 219 kcsb[SupportedKeywords.USER_ID] = user_id 220 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 221 kcsb[SupportedKeywords.PASSWORD] = password 222 223 return kcsb 224 225 @classmethod 226 def with_aad_user_token_authentication(cls, connection_string: str, user_token: str) -> "KustoConnectionStringBuilder": 227 """ 228 Creates a KustoConnection string builder that will authenticate with AAD application and 229 a certificate credentials. 230 :param str connection_string: Kusto connection string should be of the format: 231 https://<clusterName>.kusto.windows.net 232 :param str user_token: AAD user token. 233 """ 234 assert_string_is_not_empty(user_token) 235 236 kcsb = cls(connection_string) 237 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 238 kcsb[SupportedKeywords.USER_TOKEN] = user_token 239 240 return kcsb 241 242 @classmethod 243 def with_aad_application_key_authentication( 244 cls, connection_string: str, aad_app_id: str, app_key: str, authority_id: str 245 ) -> "KustoConnectionStringBuilder": 246 """ 247 Creates a KustoConnection string builder that will authenticate with AAD application and key. 248 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 249 :param str aad_app_id: AAD application ID. 250 :param str app_key: Corresponding key of the AAD application. 251 :param str authority_id: Authority id (aka Tenant id) must be provided 252 """ 253 assert_string_is_not_empty(aad_app_id) 254 assert_string_is_not_empty(app_key) 255 assert_string_is_not_empty(authority_id) 256 257 kcsb = cls(connection_string) 258 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 259 kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id 260 kcsb[SupportedKeywords.APPLICATION_KEY] = app_key 261 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 262 263 return kcsb 264 265 @classmethod 266 def with_aad_application_certificate_authentication( 267 cls, connection_string: str, aad_app_id: str, certificate: str, thumbprint: str, authority_id: str 268 ) -> "KustoConnectionStringBuilder": 269 """ 270 Creates a KustoConnection string builder that will authenticate with AAD application using 271 a certificate. 272 :param str connection_string: Kusto connection string should be of the format: 273 https://<clusterName>.kusto.windows.net 274 :param str aad_app_id: AAD application ID. 275 :param str certificate: A PEM encoded certificate private key. 276 :param str thumbprint: hex encoded thumbprint of the certificate. 277 :param str authority_id: Authority id (aka Tenant id) must be provided 278 """ 279 assert_string_is_not_empty(aad_app_id) 280 assert_string_is_not_empty(certificate) 281 assert_string_is_not_empty(thumbprint) 282 assert_string_is_not_empty(authority_id) 283 284 kcsb = cls(connection_string) 285 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 286 kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id 287 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = certificate 288 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint 289 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 290 291 return kcsb 292 293 @classmethod 294 def with_aad_application_certificate_sni_authentication( 295 cls, connection_string: str, aad_app_id: str, private_certificate: str, public_certificate: str, thumbprint: str, authority_id: str 296 ) -> "KustoConnectionStringBuilder": 297 """ 298 Creates a KustoConnection string builder that will authenticate with AAD application using 299 a certificate Subject Name and Issuer. 300 :param str connection_string: Kusto connection string should be of the format: 301 https://<clusterName>.kusto.windows.net 302 :param str aad_app_id: AAD application ID. 303 :param str private_certificate: A PEM encoded certificate private key. 304 :param str public_certificate: A public certificate matching the provided PEM certificate private key. 305 :param str thumbprint: hex encoded thumbprint of the certificate. 306 :param str authority_id: Authority id (aka Tenant id) must be provided 307 """ 308 assert_string_is_not_empty(aad_app_id) 309 assert_string_is_not_empty(private_certificate) 310 assert_string_is_not_empty(public_certificate) 311 assert_string_is_not_empty(thumbprint) 312 assert_string_is_not_empty(authority_id) 313 314 kcsb = cls(connection_string) 315 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 316 kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id 317 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = private_certificate 318 kcsb.application_public_certificate = public_certificate 319 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint 320 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 321 322 return kcsb 323 324 @classmethod 325 def with_aad_application_token_authentication(cls, connection_string: str, application_token: str) -> "KustoConnectionStringBuilder": 326 """ 327 Creates a KustoConnection string builder that will authenticate with AAD application and 328 an application token. 329 :param str connection_string: Kusto connection string should be of the format: 330 https://<clusterName>.kusto.windows.net 331 :param str application_token: AAD application token. 332 """ 333 assert_string_is_not_empty(application_token) 334 kcsb = cls(connection_string) 335 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 336 kcsb[SupportedKeywords.APPLICATION_TOKEN] = application_token 337 338 return kcsb 339 340 @classmethod 341 def with_aad_device_authentication( 342 cls, connection_string: str, authority_id: str = "organizations", callback: DeviceCallbackType = None 343 ) -> "KustoConnectionStringBuilder": 344 """ 345 Creates a KustoConnection string builder that will authenticate with AAD application and 346 password. 347 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 348 :param str authority_id: optional param. defaults to "organizations" 349 :param DeviceCallbackType callback: options callback function to be called when authentication is required, accepts three parameters: 350 - ``verification_uri`` (str) the URL the user must visit 351 - ``user_code`` (str) the code the user must enter there 352 - ``expires_on`` (datetime.datetime) the UTC time at which the code will expire 353 """ 354 kcsb = cls(connection_string) 355 kcsb.device_login = True 356 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 357 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 358 kcsb.device_callback = callback 359 360 return kcsb 361 362 @classmethod 363 def with_az_cli_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder": 364 """ 365 Creates a KustoConnection string builder that will use existing authenticated az cli profile 366 password. 367 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 368 """ 369 kcsb = cls(connection_string) 370 kcsb.az_cli_login = True 371 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 372 373 return kcsb 374 375 @classmethod 376 def with_aad_managed_service_identity_authentication( 377 cls, connection_string: str, client_id: str = None, object_id: str = None, msi_res_id: str = None, timeout: int = None 378 ) -> "KustoConnectionStringBuilder": 379 """ 380 Creates a KustoConnection string builder that will authenticate with AAD application, using 381 an application token obtained from a Microsoft Service Identity endpoint. An optional user 382 assigned application ID can be added to the token. 383 384 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 385 :param client_id: an optional user assigned identity provided as an Azure ID of a client 386 :param object_id: an optional user assigned identity provided as an Azure ID of an object 387 :param msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource 388 :param timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur 389 """ 390 391 kcsb = cls(connection_string) 392 params = {} 393 exclusive_pcount = 0 394 395 if timeout is not None: 396 params["connection_timeout"] = timeout 397 398 if client_id is not None: 399 params["client_id"] = client_id 400 exclusive_pcount += 1 401 402 if object_id is not None: 403 # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity 404 raise ValueError("User Managed Service Identity with object_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead.") 405 # noinspection PyUnreachableCode 406 params["object_id"] = object_id 407 exclusive_pcount += 1 408 409 if msi_res_id is not None: 410 # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity 411 raise ValueError( 412 "User Managed Service Identity with msi_res_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead." 413 ) 414 # noinspection PyUnreachableCode 415 params["msi_res_id"] = msi_res_id 416 exclusive_pcount += 1 417 418 if exclusive_pcount > 1: 419 raise ValueError("the following parameters are mutually exclusive and can not be provided at the same time: client_uid, object_id, msi_res_id") 420 421 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 422 kcsb.msi_authentication = True 423 kcsb.msi_parameters = params 424 425 return kcsb 426 427 @classmethod 428 def with_token_provider(cls, connection_string: str, token_provider: Callable[[], str]) -> "KustoConnectionStringBuilder": 429 """ 430 Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token 431 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 432 :param token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string 433 """ 434 435 assert callable(token_provider) 436 437 kcsb = cls(connection_string) 438 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 439 kcsb.token_provider = token_provider 440 441 return kcsb 442 443 @classmethod 444 def with_async_token_provider( 445 cls, 446 connection_string: str, 447 async_token_provider: Callable[[], Coroutine[None, None, str]], 448 ) -> "KustoConnectionStringBuilder": 449 """ 450 Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token 451 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 452 :param async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string 453 """ 454 455 assert callable(async_token_provider) 456 457 kcsb = cls(connection_string) 458 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 459 kcsb.async_token_provider = async_token_provider 460 461 return kcsb 462 463 @classmethod 464 def with_interactive_login( 465 cls, connection_string: str, user_id_hint: Optional[str] = None, tenant_hint: Optional[str] = None 466 ) -> "KustoConnectionStringBuilder": 467 kcsb = cls(connection_string) 468 kcsb.interactive_login = True 469 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 470 if user_id_hint is not None: 471 kcsb[SupportedKeywords.USER_ID] = user_id_hint 472 473 if tenant_hint is not None: 474 kcsb[SupportedKeywords.AUTHORITY_ID] = tenant_hint 475 476 return kcsb 477 478 @classmethod 479 def with_azure_token_credential( 480 cls, 481 connection_string: str, 482 credential: Optional[Any] = None, 483 credential_from_login_endpoint: Optional[Callable[[str], Any]] = None, 484 ) -> "KustoConnectionStringBuilder": 485 """ 486 Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token. 487 :param connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 488 :param credential: an optional token credential to use for authentication 489 :param credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource 490 """ 491 kcsb = cls(connection_string) 492 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 493 kcsb.token_credential_login = True 494 kcsb.azure_credential = credential 495 kcsb.azure_credential_from_login_endpoint = credential_from_login_endpoint 496 497 return kcsb 498 499 @classmethod 500 def with_no_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder": 501 """ 502 Create a KustoConnectionStringBuilder that uses no authentication. 503 :param connection_string: Kusto's connection string should be of the format: http://<clusterName>.kusto.windows.net 504 """ 505 if not connection_string.startswith("http://"): 506 raise ValueError("Connection string must start with http://") 507 kcsb = cls(connection_string) 508 kcsb[SupportedKeywords.FEDERATED_SECURITY] = False 509 510 return kcsb 511 512 @property 513 def data_source(self) -> Optional[str]: 514 """The URI specifying the Kusto service endpoint. 515 For example, https://kuskus.kusto.windows.net or net.tcp://localhost 516 """ 517 return self._internal_dict.get(SupportedKeywords.DATA_SOURCE) 518 519 @property 520 def initial_catalog(self) -> Optional[str]: 521 """The default database to be used for requests. 522 By default, it is set to 'NetDefaultDB'. 523 """ 524 return self._internal_dict.get(SupportedKeywords.INITIAL_CATALOG) 525 526 @initial_catalog.setter 527 def initial_catalog(self, value: str) -> None: 528 self._internal_dict[SupportedKeywords.INITIAL_CATALOG] = value 529 530 @property 531 def aad_user_id(self) -> Optional[str]: 532 """The username to use for AAD Federated AuthN.""" 533 return self._internal_dict.get(SupportedKeywords.USER_ID) 534 535 @property 536 def application_client_id(self) -> Optional[str]: 537 """The application client id to use for authentication when federated 538 authentication is used. 539 """ 540 return self._internal_dict.get(SupportedKeywords.APPLICATION_CLIENT_ID) 541 542 @property 543 def application_key(self) -> Optional[str]: 544 """The application key to use for authentication when federated authentication is used""" 545 return self._internal_dict.get(SupportedKeywords.APPLICATION_KEY) 546 547 @property 548 def application_certificate(self) -> Optional[str]: 549 """A PEM encoded certificate private key.""" 550 return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_BLOB) 551 552 @application_certificate.setter 553 def application_certificate(self, value: str): 554 self[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = value 555 556 @property 557 def application_certificate_thumbprint(self) -> Optional[str]: 558 """hex encoded thumbprint of the certificate.""" 559 return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT) 560 561 @application_certificate_thumbprint.setter 562 def application_certificate_thumbprint(self, value: str): 563 self[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = value 564 565 @property 566 def authority_id(self) -> Optional[str]: 567 """The ID of the AAD tenant where the application is configured. 568 (should be supplied only for non-Microsoft tenant)""" 569 return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID) 570 571 @authority_id.setter 572 def authority_id(self, value: str): 573 self[SupportedKeywords.AUTHORITY_ID] = value 574 575 @property 576 def aad_federated_security(self) -> Optional[bool]: 577 """A Boolean value that instructs the client to perform AAD federated authentication.""" 578 return self._internal_dict.get(SupportedKeywords.FEDERATED_SECURITY) 579 580 @property 581 def user_token(self) -> Optional[str]: 582 """User token.""" 583 return self._internal_dict.get(SupportedKeywords.USER_TOKEN) 584 585 @property 586 def application_token(self) -> Optional[str]: 587 """Application token.""" 588 return self._internal_dict.get(SupportedKeywords.APPLICATION_TOKEN) 589 590 @property 591 def client_details(self) -> ClientDetails: 592 return ClientDetails(self.application_for_tracing, self.user_name_for_tracing) 593 594 @property 595 def login_hint(self) -> Optional[str]: 596 return self._internal_dict.get(SupportedKeywords.USER_ID) 597 598 @property 599 def domain_hint(self) -> Optional[str]: 600 return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID) 601 602 @property 603 def password(self) -> Optional[str]: 604 return self._internal_dict.get(SupportedKeywords.PASSWORD) 605 606 def _set_connector_details( 607 self, 608 name: str, 609 version: str, 610 app_name: Optional[str] = None, 611 app_version: Optional[str] = None, 612 send_user: bool = False, 613 override_user: Optional[str] = None, 614 additional_fields: Optional[List[Tuple[str, str]]] = None, 615 ): 616 """ 617 Sets the connector details for tracing purposes. 618 :param name: The name of the connector 619 :param version: The version of the connector 620 :param send_user: Whether to send the user name 621 :param override_user: Override the user name ( if send_user is True ) 622 :param app_name: The name of the containing application 623 :param app_version: The version of the containing application 624 :param additional_fields: Additional fields to add to the header 625 """ 626 client_details = ClientDetails.set_connector_details(name, version, app_name, app_version, send_user, override_user, additional_fields) 627 628 self.application_for_tracing = client_details.application_for_tracing 629 self.user_name_for_tracing = client_details.user_name_for_tracing 630 631 def __str__(self) -> str: 632 dict_copy = self._internal_dict.copy() 633 for key in dict_copy: 634 if Keyword.lookup(key).secret: 635 dict_copy[key] = "****" 636 return self._build_connection_string(dict_copy) 637 638 def __repr__(self) -> str: 639 return self._build_connection_string(self._internal_dict) 640 641 def _build_connection_string(self, kcsb_as_dict: dict) -> str: 642 return ";".join(["{0}={1}".format(word.value, kcsb_as_dict[word]) for word in SupportedKeywords if word in kcsb_as_dict]) 643 644 def _parse_data_source(self, url: str): 645 url = urlparse(url) 646 if not url.netloc: 647 return 648 segments = url.path.lstrip("/").split("/") 649 if len(segments) == 1 and segments[0] and not self.initial_catalog: 650 self.initial_catalog = segments[0] 651 self._internal_dict[SupportedKeywords.DATA_SOURCE] = url._replace(path="").geturl()
Parses Kusto connection strings. For usages, check out the sample at: https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py
145 def __init__(self, connection_string: str): 146 """ 147 Creates new KustoConnectionStringBuilder. 148 :param str connection_string: Kusto connection string should be of the format: 149 https://<clusterName>.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord 150 For more information please look at: 151 https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html 152 """ 153 assert_string_is_not_empty(connection_string) 154 self._internal_dict = {} 155 156 if connection_string is not None and "=" not in connection_string.partition(";")[0]: 157 connection_string = "Data Source=" + connection_string 158 159 self[SupportedKeywords.AUTHORITY_ID] = "organizations" 160 161 for kvp_string in connection_string.split(";"): 162 key, _, value = kvp_string.partition("=") 163 keyword = Keyword.parse(key) 164 165 value_stripped = value.strip() 166 if keyword.is_str_type(): 167 if keyword.name == SupportedKeywords.DATA_SOURCE: 168 self[keyword.name] = value_stripped.rstrip("/") 169 self._parse_data_source(self.data_source) 170 elif keyword.name == SupportedKeywords.TRACE_USER_NAME: 171 self.user_name_for_tracing = value_stripped 172 elif keyword.name == SupportedKeywords.TRACE_APP_NAME: 173 self.application_for_tracing = value_stripped 174 else: 175 self[keyword.name] = value_stripped 176 elif keyword.is_bool_type(): 177 if value_stripped in ["True", "true"]: 178 self[keyword.name] = True 179 elif value_stripped in ["False", "false"]: 180 self[keyword.name] = False 181 else: 182 raise KeyError("Expected aad federated security to be bool. Recieved %s" % value) 183 184 if self.initial_catalog is None: 185 self.initial_catalog = self.DEFAULT_DATABASE_NAME
Creates new KustoConnectionStringBuilder.
Parameters
- str connection_string: Kusto connection string should be of the format:
https://
.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord For more information please look at: https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html
202 @classmethod 203 def with_aad_user_password_authentication( 204 cls, connection_string: str, user_id: str, password: str, authority_id: str = "organizations" 205 ) -> "KustoConnectionStringBuilder": 206 """ 207 Creates a KustoConnection string builder that will authenticate with AAD user name and 208 password. 209 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 210 :param str user_id: AAD user ID. 211 :param str password: Corresponding password of the AAD user. 212 :param str authority_id: optional param. defaults to "organizations" 213 """ 214 assert_string_is_not_empty(user_id) 215 assert_string_is_not_empty(password) 216 217 kcsb = cls(connection_string) 218 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 219 kcsb[SupportedKeywords.USER_ID] = user_id 220 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 221 kcsb[SupportedKeywords.PASSWORD] = password 222 223 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD user name and password.
Parameters
- str connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net - str user_id: AAD user ID.
- str password: Corresponding password of the AAD user.
- str authority_id: optional param. defaults to "organizations"
225 @classmethod 226 def with_aad_user_token_authentication(cls, connection_string: str, user_token: str) -> "KustoConnectionStringBuilder": 227 """ 228 Creates a KustoConnection string builder that will authenticate with AAD application and 229 a certificate credentials. 230 :param str connection_string: Kusto connection string should be of the format: 231 https://<clusterName>.kusto.windows.net 232 :param str user_token: AAD user token. 233 """ 234 assert_string_is_not_empty(user_token) 235 236 kcsb = cls(connection_string) 237 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 238 kcsb[SupportedKeywords.USER_TOKEN] = user_token 239 240 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD application and a certificate credentials.
Parameters
- str connection_string: Kusto connection string should be of the format:
https://
.kusto.windows.net - str user_token: AAD user token.
242 @classmethod 243 def with_aad_application_key_authentication( 244 cls, connection_string: str, aad_app_id: str, app_key: str, authority_id: str 245 ) -> "KustoConnectionStringBuilder": 246 """ 247 Creates a KustoConnection string builder that will authenticate with AAD application and key. 248 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 249 :param str aad_app_id: AAD application ID. 250 :param str app_key: Corresponding key of the AAD application. 251 :param str authority_id: Authority id (aka Tenant id) must be provided 252 """ 253 assert_string_is_not_empty(aad_app_id) 254 assert_string_is_not_empty(app_key) 255 assert_string_is_not_empty(authority_id) 256 257 kcsb = cls(connection_string) 258 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 259 kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id 260 kcsb[SupportedKeywords.APPLICATION_KEY] = app_key 261 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 262 263 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD application and key.
Parameters
- str connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net - str aad_app_id: AAD application ID.
- str app_key: Corresponding key of the AAD application.
- str authority_id: Authority id (aka Tenant id) must be provided
265 @classmethod 266 def with_aad_application_certificate_authentication( 267 cls, connection_string: str, aad_app_id: str, certificate: str, thumbprint: str, authority_id: str 268 ) -> "KustoConnectionStringBuilder": 269 """ 270 Creates a KustoConnection string builder that will authenticate with AAD application using 271 a certificate. 272 :param str connection_string: Kusto connection string should be of the format: 273 https://<clusterName>.kusto.windows.net 274 :param str aad_app_id: AAD application ID. 275 :param str certificate: A PEM encoded certificate private key. 276 :param str thumbprint: hex encoded thumbprint of the certificate. 277 :param str authority_id: Authority id (aka Tenant id) must be provided 278 """ 279 assert_string_is_not_empty(aad_app_id) 280 assert_string_is_not_empty(certificate) 281 assert_string_is_not_empty(thumbprint) 282 assert_string_is_not_empty(authority_id) 283 284 kcsb = cls(connection_string) 285 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 286 kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id 287 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = certificate 288 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint 289 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 290 291 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD application using a certificate.
Parameters
- str connection_string: Kusto connection string should be of the format:
https://
.kusto.windows.net - str aad_app_id: AAD application ID.
- str certificate: A PEM encoded certificate private key.
- str thumbprint: hex encoded thumbprint of the certificate.
- str authority_id: Authority id (aka Tenant id) must be provided
293 @classmethod 294 def with_aad_application_certificate_sni_authentication( 295 cls, connection_string: str, aad_app_id: str, private_certificate: str, public_certificate: str, thumbprint: str, authority_id: str 296 ) -> "KustoConnectionStringBuilder": 297 """ 298 Creates a KustoConnection string builder that will authenticate with AAD application using 299 a certificate Subject Name and Issuer. 300 :param str connection_string: Kusto connection string should be of the format: 301 https://<clusterName>.kusto.windows.net 302 :param str aad_app_id: AAD application ID. 303 :param str private_certificate: A PEM encoded certificate private key. 304 :param str public_certificate: A public certificate matching the provided PEM certificate private key. 305 :param str thumbprint: hex encoded thumbprint of the certificate. 306 :param str authority_id: Authority id (aka Tenant id) must be provided 307 """ 308 assert_string_is_not_empty(aad_app_id) 309 assert_string_is_not_empty(private_certificate) 310 assert_string_is_not_empty(public_certificate) 311 assert_string_is_not_empty(thumbprint) 312 assert_string_is_not_empty(authority_id) 313 314 kcsb = cls(connection_string) 315 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 316 kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id 317 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = private_certificate 318 kcsb.application_public_certificate = public_certificate 319 kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint 320 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 321 322 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD application using a certificate Subject Name and Issuer.
Parameters
- str connection_string: Kusto connection string should be of the format:
https://
.kusto.windows.net - str aad_app_id: AAD application ID.
- str private_certificate: A PEM encoded certificate private key.
- str public_certificate: A public certificate matching the provided PEM certificate private key.
- str thumbprint: hex encoded thumbprint of the certificate.
- str authority_id: Authority id (aka Tenant id) must be provided
324 @classmethod 325 def with_aad_application_token_authentication(cls, connection_string: str, application_token: str) -> "KustoConnectionStringBuilder": 326 """ 327 Creates a KustoConnection string builder that will authenticate with AAD application and 328 an application token. 329 :param str connection_string: Kusto connection string should be of the format: 330 https://<clusterName>.kusto.windows.net 331 :param str application_token: AAD application token. 332 """ 333 assert_string_is_not_empty(application_token) 334 kcsb = cls(connection_string) 335 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 336 kcsb[SupportedKeywords.APPLICATION_TOKEN] = application_token 337 338 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD application and an application token.
Parameters
- str connection_string: Kusto connection string should be of the format:
https://
.kusto.windows.net - str application_token: AAD application token.
340 @classmethod 341 def with_aad_device_authentication( 342 cls, connection_string: str, authority_id: str = "organizations", callback: DeviceCallbackType = None 343 ) -> "KustoConnectionStringBuilder": 344 """ 345 Creates a KustoConnection string builder that will authenticate with AAD application and 346 password. 347 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 348 :param str authority_id: optional param. defaults to "organizations" 349 :param DeviceCallbackType callback: options callback function to be called when authentication is required, accepts three parameters: 350 - ``verification_uri`` (str) the URL the user must visit 351 - ``user_code`` (str) the code the user must enter there 352 - ``expires_on`` (datetime.datetime) the UTC time at which the code will expire 353 """ 354 kcsb = cls(connection_string) 355 kcsb.device_login = True 356 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 357 kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id 358 kcsb.device_callback = callback 359 360 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD application and password.
Parameters
- str connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net - str authority_id: optional param. defaults to "organizations"
- DeviceCallbackType callback: options callback function to be called when authentication is required, accepts three parameters:
verification_uri(str) the URL the user must visituser_code(str) the code the user must enter thereexpires_on(datetime.datetime) the UTC time at which the code will expire
362 @classmethod 363 def with_az_cli_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder": 364 """ 365 Creates a KustoConnection string builder that will use existing authenticated az cli profile 366 password. 367 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 368 """ 369 kcsb = cls(connection_string) 370 kcsb.az_cli_login = True 371 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 372 373 return kcsb
Creates a KustoConnection string builder that will use existing authenticated az cli profile password.
Parameters
- str connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net
375 @classmethod 376 def with_aad_managed_service_identity_authentication( 377 cls, connection_string: str, client_id: str = None, object_id: str = None, msi_res_id: str = None, timeout: int = None 378 ) -> "KustoConnectionStringBuilder": 379 """ 380 Creates a KustoConnection string builder that will authenticate with AAD application, using 381 an application token obtained from a Microsoft Service Identity endpoint. An optional user 382 assigned application ID can be added to the token. 383 384 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 385 :param client_id: an optional user assigned identity provided as an Azure ID of a client 386 :param object_id: an optional user assigned identity provided as an Azure ID of an object 387 :param msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource 388 :param timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur 389 """ 390 391 kcsb = cls(connection_string) 392 params = {} 393 exclusive_pcount = 0 394 395 if timeout is not None: 396 params["connection_timeout"] = timeout 397 398 if client_id is not None: 399 params["client_id"] = client_id 400 exclusive_pcount += 1 401 402 if object_id is not None: 403 # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity 404 raise ValueError("User Managed Service Identity with object_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead.") 405 # noinspection PyUnreachableCode 406 params["object_id"] = object_id 407 exclusive_pcount += 1 408 409 if msi_res_id is not None: 410 # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity 411 raise ValueError( 412 "User Managed Service Identity with msi_res_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead." 413 ) 414 # noinspection PyUnreachableCode 415 params["msi_res_id"] = msi_res_id 416 exclusive_pcount += 1 417 418 if exclusive_pcount > 1: 419 raise ValueError("the following parameters are mutually exclusive and can not be provided at the same time: client_uid, object_id, msi_res_id") 420 421 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 422 kcsb.msi_authentication = True 423 kcsb.msi_parameters = params 424 425 return kcsb
Creates a KustoConnection string builder that will authenticate with AAD application, using an application token obtained from a Microsoft Service Identity endpoint. An optional user assigned application ID can be added to the token.
Parameters
- str connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net - client_id: an optional user assigned identity provided as an Azure ID of a client
- object_id: an optional user assigned identity provided as an Azure ID of an object
- msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource
- timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur
427 @classmethod 428 def with_token_provider(cls, connection_string: str, token_provider: Callable[[], str]) -> "KustoConnectionStringBuilder": 429 """ 430 Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token 431 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 432 :param token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string 433 """ 434 435 assert callable(token_provider) 436 437 kcsb = cls(connection_string) 438 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 439 kcsb.token_provider = token_provider 440 441 return kcsb
Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token
Parameters
- str connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net - token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string
443 @classmethod 444 def with_async_token_provider( 445 cls, 446 connection_string: str, 447 async_token_provider: Callable[[], Coroutine[None, None, str]], 448 ) -> "KustoConnectionStringBuilder": 449 """ 450 Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token 451 :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 452 :param async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string 453 """ 454 455 assert callable(async_token_provider) 456 457 kcsb = cls(connection_string) 458 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 459 kcsb.async_token_provider = async_token_provider 460 461 return kcsb
Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token
Parameters
- str connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net - async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string
463 @classmethod 464 def with_interactive_login( 465 cls, connection_string: str, user_id_hint: Optional[str] = None, tenant_hint: Optional[str] = None 466 ) -> "KustoConnectionStringBuilder": 467 kcsb = cls(connection_string) 468 kcsb.interactive_login = True 469 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 470 if user_id_hint is not None: 471 kcsb[SupportedKeywords.USER_ID] = user_id_hint 472 473 if tenant_hint is not None: 474 kcsb[SupportedKeywords.AUTHORITY_ID] = tenant_hint 475 476 return kcsb
478 @classmethod 479 def with_azure_token_credential( 480 cls, 481 connection_string: str, 482 credential: Optional[Any] = None, 483 credential_from_login_endpoint: Optional[Callable[[str], Any]] = None, 484 ) -> "KustoConnectionStringBuilder": 485 """ 486 Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token. 487 :param connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net 488 :param credential: an optional token credential to use for authentication 489 :param credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource 490 """ 491 kcsb = cls(connection_string) 492 kcsb[SupportedKeywords.FEDERATED_SECURITY] = True 493 kcsb.token_credential_login = True 494 kcsb.azure_credential = credential 495 kcsb.azure_credential_from_login_endpoint = credential_from_login_endpoint 496 497 return kcsb
Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token.
Parameters
- connection_string: Kusto connection string should be of the format: https: //
.kusto.windows.net - credential: an optional token credential to use for authentication
- credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource
499 @classmethod 500 def with_no_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder": 501 """ 502 Create a KustoConnectionStringBuilder that uses no authentication. 503 :param connection_string: Kusto's connection string should be of the format: http://<clusterName>.kusto.windows.net 504 """ 505 if not connection_string.startswith("http://"): 506 raise ValueError("Connection string must start with http://") 507 kcsb = cls(connection_string) 508 kcsb[SupportedKeywords.FEDERATED_SECURITY] = False 509 510 return kcsb
Create a KustoConnectionStringBuilder that uses no authentication.
Parameters
- connection_string: Kusto's connection string should be of the format: http: //
.kusto.windows.net
512 @property 513 def data_source(self) -> Optional[str]: 514 """The URI specifying the Kusto service endpoint. 515 For example, https://kuskus.kusto.windows.net or net.tcp://localhost 516 """ 517 return self._internal_dict.get(SupportedKeywords.DATA_SOURCE)
The URI specifying the Kusto service endpoint. For example, https://kuskus.kusto.windows.net or net.tcp://localhost
519 @property 520 def initial_catalog(self) -> Optional[str]: 521 """The default database to be used for requests. 522 By default, it is set to 'NetDefaultDB'. 523 """ 524 return self._internal_dict.get(SupportedKeywords.INITIAL_CATALOG)
The default database to be used for requests. By default, it is set to 'NetDefaultDB'.
530 @property 531 def aad_user_id(self) -> Optional[str]: 532 """The username to use for AAD Federated AuthN.""" 533 return self._internal_dict.get(SupportedKeywords.USER_ID)
The username to use for AAD Federated AuthN.
535 @property 536 def application_client_id(self) -> Optional[str]: 537 """The application client id to use for authentication when federated 538 authentication is used. 539 """ 540 return self._internal_dict.get(SupportedKeywords.APPLICATION_CLIENT_ID)
The application client id to use for authentication when federated authentication is used.
542 @property 543 def application_key(self) -> Optional[str]: 544 """The application key to use for authentication when federated authentication is used""" 545 return self._internal_dict.get(SupportedKeywords.APPLICATION_KEY)
The application key to use for authentication when federated authentication is used
547 @property 548 def application_certificate(self) -> Optional[str]: 549 """A PEM encoded certificate private key.""" 550 return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_BLOB)
A PEM encoded certificate private key.
556 @property 557 def application_certificate_thumbprint(self) -> Optional[str]: 558 """hex encoded thumbprint of the certificate.""" 559 return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT)
hex encoded thumbprint of the certificate.
575 @property 576 def aad_federated_security(self) -> Optional[bool]: 577 """A Boolean value that instructs the client to perform AAD federated authentication.""" 578 return self._internal_dict.get(SupportedKeywords.FEDERATED_SECURITY)
A Boolean value that instructs the client to perform AAD federated authentication.
580 @property 581 def user_token(self) -> Optional[str]: 582 """User token.""" 583 return self._internal_dict.get(SupportedKeywords.USER_TOKEN)
User token.
585 @property 586 def application_token(self) -> Optional[str]: 587 """Application token.""" 588 return self._internal_dict.get(SupportedKeywords.APPLICATION_TOKEN)
Application token.
19class DataFormat(Enum): 20 """All data formats supported by Kusto.""" 21 22 CSV = ("csv", IngestionMappingKind.CSV, True) 23 TSV = ("tsv", IngestionMappingKind.CSV, True) 24 SCSV = ("scsv", IngestionMappingKind.CSV, True) 25 SOHSV = ("sohsv", IngestionMappingKind.CSV, True) 26 PSV = ("psv", IngestionMappingKind.CSV, True) 27 TXT = ("txt", IngestionMappingKind.CSV, True) 28 TSVE = ("tsve", IngestionMappingKind.CSV, True) 29 JSON = ("json", IngestionMappingKind.JSON, True) 30 SINGLEJSON = ("singlejson", IngestionMappingKind.JSON, True) 31 MULTIJSON = ("multijson", IngestionMappingKind.JSON, True) 32 AVRO = ("avro", IngestionMappingKind.AVRO, False) 33 APACHEAVRO = ("apacheavro", IngestionMappingKind.APACHEAVRO, False) 34 PARQUET = ("parquet", IngestionMappingKind.PARQUET, False) 35 SSTREAM = ("sstream", IngestionMappingKind.SSTREAM, False) 36 ORC = ("orc", IngestionMappingKind.ORC, False) 37 RAW = ("raw", IngestionMappingKind.CSV, True) 38 W3CLOGFILE = ("w3clogfile", IngestionMappingKind.W3CLOGFILE, True) 39 40 def __init__(self, kusto_value: str, ingestion_mapping_kind: IngestionMappingKind, compressible: bool): 41 self.kusto_value = kusto_value # Formatted how Kusto Service expects it 42 self.ingestion_mapping_kind = ingestion_mapping_kind 43 self.compressible = compressible # Binary formats should not be compressed
All data formats supported by Kusto.