data

 1# Copyright (c) Microsoft Corporation.
 2# Licensed under the MIT License.
 3
 4from ._version import VERSION as __version__
 5from .client import KustoClient
 6from .client_request_properties import ClientRequestProperties
 7from .kcsb import KustoConnectionStringBuilder
 8from .data_format import DataFormat
 9
10__all__ = [
11    "__version__",
12    "KustoClient",
13    "ClientRequestProperties",
14    "KustoConnectionStringBuilder",
15    "DataFormat",
16]
__version__ = '6.0.3'
class KustoClient(data.client_base._KustoClientBase):
 49class KustoClient(_KustoClientBase):
 50    """
 51    Kusto client for Python.
 52    The client is a wrapper around the Kusto REST API.
 53    To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/
 54
 55    The primary methods are:
 56    `execute_query`:  executes a KQL query against the Kusto service.
 57    `execute_mgmt`: executes a KQL control command against the Kusto service.
 58    """
 59
 60    _mgmt_default_timeout = timedelta(hours=1)
 61    _query_default_timeout = timedelta(minutes=4)
 62    _streaming_ingest_default_timeout = timedelta(minutes=10)
 63    _client_server_delta = timedelta(seconds=30)
 64
 65    # The maximum amount of connections to be able to operate in parallel
 66    _max_pool_size = 100
 67
 68    def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]):
 69        """
 70        Kusto Client constructor.
 71        :param kcsb: The connection string to initialize KustoClient.
 72        :type kcsb: azure.kusto.data.KustoConnectionStringBuilder or str
 73        """
 74        super().__init__(kcsb, False)
 75
 76        # Create a session object for connection pooling
 77        self._session = requests.Session()
 78
 79        adapter = HTTPAdapterWithSocketOptions(
 80            socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(), pool_maxsize=self._max_pool_size
 81        )
 82        self._session.mount("http://", adapter)
 83        self._session.mount("https://", adapter)
 84
 85    def close(self):
 86        if not self._is_closed:
 87            self._session.close()
 88            if self._aad_helper:
 89                self._aad_helper.close()
 90        super().close()
 91
 92    def __enter__(self):
 93        return self
 94
 95    def __exit__(self, exc_type, exc_val, exc_tb):
 96        self.close()
 97
 98    def set_proxy(self, proxy_url: str):
 99        super().set_proxy(proxy_url)
100        self._session.proxies = {"http": proxy_url, "https": proxy_url}
101
102    def set_http_retries(self, max_retries: int):
103        """
104        Set the number of HTTP retries to attempt
105        """
106        adapter = HTTPAdapterWithSocketOptions(
107            socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(),
108            pool_maxsize=self._max_pool_size,
109            max_retries=max_retries,
110        )
111        self._session.mount("http://", adapter)
112        self._session.mount("https://", adapter)
113
114    @staticmethod
115    def compose_socket_options() -> List[Tuple[int, int, int]]:
116        # Sends TCP Keep-Alive after MAX_IDLE_SECONDS seconds of idleness, once every INTERVAL_SECONDS seconds, and closes the connection after MAX_FAILED_KEEPALIVES failed pings (e.g. 20 => 1:00:30)
117        MAX_IDLE_SECONDS = 30
118        INTERVAL_SECONDS = 180  # Corresponds to Azure Load Balancer Service 4 minute timeout, with 1 minute of slack
119        MAX_FAILED_KEEPALIVES = 20
120
121        if (
122            sys.platform == "linux"
123            and hasattr(socket, "SOL_SOCKET")
124            and hasattr(socket, "SO_KEEPALIVE")
125            and hasattr(socket, "TCP_KEEPIDLE")
126            and hasattr(socket, "TCP_KEEPINTVL")
127            and hasattr(socket, "TCP_KEEPCNT")
128        ):
129            return [
130                (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
131                (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS),
132                (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, INTERVAL_SECONDS),
133                (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES),
134            ]
135        elif (
136            sys.platform == "win32"
137            and hasattr(socket, "SOL_SOCKET")
138            and hasattr(socket, "SO_KEEPALIVE")
139            and hasattr(socket, "TCP_KEEPIDLE")
140            and hasattr(socket, "TCP_KEEPCNT")
141        ):
142            return [
143                (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
144                (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS),
145                (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES),
146            ]
147        elif sys.platform == "darwin" and hasattr(socket, "SOL_SOCKET") and hasattr(socket, "SO_KEEPALIVE") and hasattr(socket, "IPPROTO_TCP"):
148            TCP_KEEPALIVE = 0x10
149            return [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), (socket.IPPROTO_TCP, TCP_KEEPALIVE, INTERVAL_SECONDS)]
150        else:
151            return []
152
153    def execute(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
154        """
155        Executes a query or management command.
156        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
157        :param str query: Query to be executed.
158        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
159        :return: Kusto response data set.
160        :rtype: azure.kusto.data.response.KustoResponseDataSet
161        """
162        query = query.strip()
163        if query.startswith("."):
164            return self.execute_mgmt(database, query, properties)
165        return self.execute_query(database, query, properties)
166
167    @distributed_trace(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT)
168    def execute_query(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
169        """
170        Execute a KQL query.
171        To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/
172        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
173        :param str query: Query to be executed.
174        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
175        :return: Kusto response data set.
176        :rtype: azure.kusto.data.response.KustoResponseDataSet
177        """
178        database = self._get_database_or_default(database)
179        Span.set_query_attributes(self._kusto_cluster, database, properties)
180        request = ExecuteRequestParams._from_query(
181            query,
182            database,
183            properties,
184            self._request_headers,
185            self._query_default_timeout,
186            self._mgmt_default_timeout,
187            self._client_server_delta,
188            self.client_details,
189        )
190        return self._execute(self._query_endpoint, request, properties)
191
192    @distributed_trace(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT)
193    def execute_mgmt(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
194        """
195        Execute a KQL control command.
196        To learn more about KQL control commands go to  https://docs.microsoft.com/en-us/azure/kusto/management/
197        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
198        :param str query: Query to be executed.
199        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
200        :return: Kusto response data set.
201        :rtype: azure.kusto.data.response.KustoResponseDataSet
202        """
203        database = self._get_database_or_default(database)
204        Span.set_query_attributes(self._kusto_cluster, database, properties)
205        request = ExecuteRequestParams._from_query(
206            query,
207            database,
208            properties,
209            self._request_headers,
210            self._mgmt_default_timeout,
211            self._mgmt_default_timeout,
212            self._client_server_delta,
213            self.client_details,
214        )
215        return self._execute(self._mgmt_endpoint, request, properties)
216
217    @distributed_trace(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT)
218    def execute_streaming_ingest(
219        self,
220        database: Optional[str],
221        table: str,
222        stream: Optional[IO[AnyStr]],
223        blob_url: Optional[str],
224        stream_format: Union[DataFormat, str],
225        properties: Optional[ClientRequestProperties] = None,
226        mapping_name: str = None,
227    ):
228        """
229        Execute streaming ingest against this client
230        If the Kusto service is not configured to allow streaming ingestion, this may raise an error
231        To learn more about streaming ingestion go to:
232        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming
233        :param Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string
234        :param str table: Target table.
235        :param Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest.
236        :param Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream.
237        :param DataFormat stream_format: Format of the data in the stream.
238        :param ClientRequestProperties properties: additional request properties.
239        :param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
240        """
241        database = self._get_database_or_default(database)
242
243        stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
244        endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
245        if mapping_name is not None:
246            endpoint = endpoint + "&mappingName=" + mapping_name
247        if blob_url:
248            endpoint += "&sourceKind=uri"
249            request = ExecuteRequestParams._from_blob_url(
250                blob_url,
251                properties,
252                self._request_headers,
253                self._streaming_ingest_default_timeout,
254                self._mgmt_default_timeout,
255                self._client_server_delta,
256                self.client_details,
257            )
258        elif stream:
259            request = ExecuteRequestParams._from_stream(
260                stream,
261                properties,
262                self._request_headers,
263                self._streaming_ingest_default_timeout,
264                self._mgmt_default_timeout,
265                self._client_server_delta,
266                self.client_details,
267            )
268        else:
269            raise Exception("execute_streaming_ingest is expecting either a stream or blob url")
270
271        Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
272        self._execute(endpoint, request, properties)
273
274    def _execute_streaming_query_parsed(
275        self,
276        database: Optional[str],
277        query: str,
278        timeout: timedelta = _KustoClientBase._query_default_timeout,
279        properties: Optional[ClientRequestProperties] = None,
280    ) -> StreamingDataSetEnumerator:
281        request = ExecuteRequestParams._from_query(
282            query, database, properties, self._request_headers, timeout, self._mgmt_default_timeout, self._client_server_delta, self.client_details
283        )
284        response = self._execute(self._query_endpoint, request, properties, stream_response=True)
285        response.raw.decode_content = True
286        return StreamingDataSetEnumerator(JsonTokenReader(response.raw))
287
288    @distributed_trace(name_of_span="KustoClient.streaming_query", kind=SpanKind.CLIENT)
289    def execute_streaming_query(
290        self,
291        database: Optional[str],
292        query: str,
293        timeout: timedelta = _KustoClientBase._query_default_timeout,
294        properties: Optional[ClientRequestProperties] = None,
295    ) -> KustoStreamingResponseDataSet:
296        """
297        Execute a KQL query without reading it all to memory.
298        The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially.
299
300        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
301        :param str query: Query to be executed.
302        :param timedelta timeout: timeout for the query to be executed
303        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
304        :return KustoStreamingResponseDataSet:
305        """
306        Span.set_query_attributes(self._kusto_cluster, database, properties)
307
308        return KustoStreamingResponseDataSet(self._execute_streaming_query_parsed(database, query, timeout, properties))
309
310    def _execute(
311        self,
312        endpoint: str,
313        request: ExecuteRequestParams,
314        properties: Optional[ClientRequestProperties] = None,
315        stream_response: bool = False,
316    ) -> Union[KustoResponseDataSet, Response]:
317        """Executes given query against this client"""
318        if self._is_closed:
319            raise KustoClosedError()
320        self.validate_endpoint()
321
322        request_headers = request.request_headers
323        if self._aad_helper:
324            request_headers["Authorization"] = self._aad_helper.acquire_authorization_header()
325
326        # trace http post call for response
327        invoker = lambda: self._session.post(
328            endpoint,
329            headers=request_headers,
330            json=request.json_payload,
331            data=request.payload,
332            timeout=request.timeout.seconds,
333            stream=stream_response,
334            allow_redirects=False,
335        )
336
337        try:
338            response = MonitoredActivity.invoke(
339                invoker, name_of_span="KustoClient.http_post", tracing_attributes=Span.create_http_attributes("POST", endpoint, request_headers)
340            )
341        except Exception as e:
342            raise KustoNetworkError(endpoint, None if properties is None else properties.client_request_id) from e
343
344        if stream_response:
345            try:
346                response.raise_for_status()
347                if 300 <= response.status_code < 400:
348                    raise Exception("Unexpected redirection, got status code: " + str(response.status))
349                return response
350            except Exception as e:
351                raise self._handle_http_error(e, self._query_endpoint, None, response, response.status_code, response.json(), response.text)
352
353        response_json = None
354        try:
355            if 300 <= response.status_code < 400:
356                raise Exception("Unexpected redirection, got status code: " + str(response.status))
357            if response.text:
358                response_json = response.json()
359            else:
360                raise KustoServiceError("The content of the response contains no data.", response)
361            response.raise_for_status()
362        except Exception as e:
363            raise self._handle_http_error(e, endpoint, request.payload, response, response.status_code, response_json, response.text)
364        # trace response processing
365        return MonitoredActivity.invoke(lambda: self._kusto_parse_by_endpoint(endpoint, response_json), name_of_span="KustoClient.processing_response")

Kusto client for Python. The client is a wrapper around the Kusto REST API. To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/

The primary methods are: execute_query: executes a KQL query against the Kusto service. execute_mgmt: executes a KQL control command against the Kusto service.

KustoClient(kcsb: Union[KustoConnectionStringBuilder, str])
68    def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str]):
69        """
70        Kusto Client constructor.
71        :param kcsb: The connection string to initialize KustoClient.
72        :type kcsb: azure.kusto.data.KustoConnectionStringBuilder or str
73        """
74        super().__init__(kcsb, False)
75
76        # Create a session object for connection pooling
77        self._session = requests.Session()
78
79        adapter = HTTPAdapterWithSocketOptions(
80            socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(), pool_maxsize=self._max_pool_size
81        )
82        self._session.mount("http://", adapter)
83        self._session.mount("https://", adapter)

Kusto Client constructor.

Parameters
  • kcsb: The connection string to initialize KustoClient.
def close(self):
85    def close(self):
86        if not self._is_closed:
87            self._session.close()
88            if self._aad_helper:
89                self._aad_helper.close()
90        super().close()
def set_proxy(self, proxy_url: str):
 98    def set_proxy(self, proxy_url: str):
 99        super().set_proxy(proxy_url)
100        self._session.proxies = {"http": proxy_url, "https": proxy_url}
def set_http_retries(self, max_retries: int):
102    def set_http_retries(self, max_retries: int):
103        """
104        Set the number of HTTP retries to attempt
105        """
106        adapter = HTTPAdapterWithSocketOptions(
107            socket_options=(HTTPConnection.default_socket_options or []) + self.compose_socket_options(),
108            pool_maxsize=self._max_pool_size,
109            max_retries=max_retries,
110        )
111        self._session.mount("http://", adapter)
112        self._session.mount("https://", adapter)

Set the number of HTTP retries to attempt

@staticmethod
def compose_socket_options() -> List[Tuple[int, int, int]]:
114    @staticmethod
115    def compose_socket_options() -> List[Tuple[int, int, int]]:
116        # Sends TCP Keep-Alive after MAX_IDLE_SECONDS seconds of idleness, once every INTERVAL_SECONDS seconds, and closes the connection after MAX_FAILED_KEEPALIVES failed pings (e.g. 20 => 1:00:30)
117        MAX_IDLE_SECONDS = 30
118        INTERVAL_SECONDS = 180  # Corresponds to Azure Load Balancer Service 4 minute timeout, with 1 minute of slack
119        MAX_FAILED_KEEPALIVES = 20
120
121        if (
122            sys.platform == "linux"
123            and hasattr(socket, "SOL_SOCKET")
124            and hasattr(socket, "SO_KEEPALIVE")
125            and hasattr(socket, "TCP_KEEPIDLE")
126            and hasattr(socket, "TCP_KEEPINTVL")
127            and hasattr(socket, "TCP_KEEPCNT")
128        ):
129            return [
130                (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
131                (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS),
132                (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, INTERVAL_SECONDS),
133                (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES),
134            ]
135        elif (
136            sys.platform == "win32"
137            and hasattr(socket, "SOL_SOCKET")
138            and hasattr(socket, "SO_KEEPALIVE")
139            and hasattr(socket, "TCP_KEEPIDLE")
140            and hasattr(socket, "TCP_KEEPCNT")
141        ):
142            return [
143                (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
144                (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, MAX_IDLE_SECONDS),
145                (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, MAX_FAILED_KEEPALIVES),
146            ]
147        elif sys.platform == "darwin" and hasattr(socket, "SOL_SOCKET") and hasattr(socket, "SO_KEEPALIVE") and hasattr(socket, "IPPROTO_TCP"):
148            TCP_KEEPALIVE = 0x10
149            return [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), (socket.IPPROTO_TCP, TCP_KEEPALIVE, INTERVAL_SECONDS)]
150        else:
151            return []
def execute( self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> data.response.KustoResponseDataSet:
153    def execute(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
154        """
155        Executes a query or management command.
156        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
157        :param str query: Query to be executed.
158        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
159        :return: Kusto response data set.
160        :rtype: azure.kusto.data.response.KustoResponseDataSet
161        """
162        query = query.strip()
163        if query.startswith("."):
164            return self.execute_mgmt(database, query, properties)
165        return self.execute_query(database, query, properties)

Executes a query or management command.

Parameters
  • Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns

Kusto response data set.

@distributed_trace(name_of_span='KustoClient.query_cmd', kind=SpanKind.CLIENT)
def execute_query( self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> data.response.KustoResponseDataSet:
167    @distributed_trace(name_of_span="KustoClient.query_cmd", kind=SpanKind.CLIENT)
168    def execute_query(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
169        """
170        Execute a KQL query.
171        To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/
172        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
173        :param str query: Query to be executed.
174        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
175        :return: Kusto response data set.
176        :rtype: azure.kusto.data.response.KustoResponseDataSet
177        """
178        database = self._get_database_or_default(database)
179        Span.set_query_attributes(self._kusto_cluster, database, properties)
180        request = ExecuteRequestParams._from_query(
181            query,
182            database,
183            properties,
184            self._request_headers,
185            self._query_default_timeout,
186            self._mgmt_default_timeout,
187            self._client_server_delta,
188            self.client_details,
189        )
190        return self._execute(self._query_endpoint, request, properties)

Execute a KQL query. To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/

Parameters
  • Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns

Kusto response data set.

@distributed_trace(name_of_span='KustoClient.control_cmd', kind=SpanKind.CLIENT)
def execute_mgmt( self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> data.response.KustoResponseDataSet:
192    @distributed_trace(name_of_span="KustoClient.control_cmd", kind=SpanKind.CLIENT)
193    def execute_mgmt(self, database: Optional[str], query: str, properties: Optional[ClientRequestProperties] = None) -> KustoResponseDataSet:
194        """
195        Execute a KQL control command.
196        To learn more about KQL control commands go to  https://docs.microsoft.com/en-us/azure/kusto/management/
197        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
198        :param str query: Query to be executed.
199        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
200        :return: Kusto response data set.
201        :rtype: azure.kusto.data.response.KustoResponseDataSet
202        """
203        database = self._get_database_or_default(database)
204        Span.set_query_attributes(self._kusto_cluster, database, properties)
205        request = ExecuteRequestParams._from_query(
206            query,
207            database,
208            properties,
209            self._request_headers,
210            self._mgmt_default_timeout,
211            self._mgmt_default_timeout,
212            self._client_server_delta,
213            self.client_details,
214        )
215        return self._execute(self._mgmt_endpoint, request, properties)

Execute a KQL control command. To learn more about KQL control commands go to https://docs.microsoft.com/en-us/azure/kusto/management/

Parameters
  • Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns

Kusto response data set.

@distributed_trace(name_of_span='KustoClient.streaming_ingest', kind=SpanKind.CLIENT)
def execute_streaming_ingest( self, database: Optional[str], table: str, stream: Optional[IO[~AnyStr]], blob_url: Optional[str], stream_format: Union[DataFormat, str], properties: Optional[ClientRequestProperties] = None, mapping_name: str = None):
217    @distributed_trace(name_of_span="KustoClient.streaming_ingest", kind=SpanKind.CLIENT)
218    def execute_streaming_ingest(
219        self,
220        database: Optional[str],
221        table: str,
222        stream: Optional[IO[AnyStr]],
223        blob_url: Optional[str],
224        stream_format: Union[DataFormat, str],
225        properties: Optional[ClientRequestProperties] = None,
226        mapping_name: str = None,
227    ):
228        """
229        Execute streaming ingest against this client
230        If the Kusto service is not configured to allow streaming ingestion, this may raise an error
231        To learn more about streaming ingestion go to:
232        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming
233        :param Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string
234        :param str table: Target table.
235        :param Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest.
236        :param Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream.
237        :param DataFormat stream_format: Format of the data in the stream.
238        :param ClientRequestProperties properties: additional request properties.
239        :param str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
240        """
241        database = self._get_database_or_default(database)
242
243        stream_format = stream_format.kusto_value if isinstance(stream_format, DataFormat) else DataFormat[stream_format.upper()].kusto_value
244        endpoint = self._streaming_ingest_endpoint + database + "/" + table + "?streamFormat=" + stream_format
245        if mapping_name is not None:
246            endpoint = endpoint + "&mappingName=" + mapping_name
247        if blob_url:
248            endpoint += "&sourceKind=uri"
249            request = ExecuteRequestParams._from_blob_url(
250                blob_url,
251                properties,
252                self._request_headers,
253                self._streaming_ingest_default_timeout,
254                self._mgmt_default_timeout,
255                self._client_server_delta,
256                self.client_details,
257            )
258        elif stream:
259            request = ExecuteRequestParams._from_stream(
260                stream,
261                properties,
262                self._request_headers,
263                self._streaming_ingest_default_timeout,
264                self._mgmt_default_timeout,
265                self._client_server_delta,
266                self.client_details,
267            )
268        else:
269            raise Exception("execute_streaming_ingest is expecting either a stream or blob url")
270
271        Span.set_streaming_ingest_attributes(self._kusto_cluster, database, table, properties)
272        self._execute(endpoint, request, properties)

Execute streaming ingest against this client If the Kusto service is not configured to allow streaming ingestion, this may raise an error To learn more about streaming ingestion go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming

Parameters
  • Optional[str] database: Target database. If not provided, will default to the "Initial Catalog" value in the connection string
  • str table: Target table.
  • Optional[IO[AnyStr]] stream: a stream object or which contains the data to ingest.
  • Optional[str] blob_url: An url to a blob which contains the data to ingest. Provide either this or stream.
  • DataFormat stream_format: Format of the data in the stream.
  • ClientRequestProperties properties: additional request properties.
  • str mapping_name: Pre-defined mapping of the table. Required when stream_format is json/avro.
@distributed_trace(name_of_span='KustoClient.streaming_query', kind=SpanKind.CLIENT)
def execute_streaming_query( self, database: Optional[str], query: str, timeout: datetime.timedelta = datetime.timedelta(seconds=270), properties: Optional[ClientRequestProperties] = None) -> data.response.KustoStreamingResponseDataSet:
288    @distributed_trace(name_of_span="KustoClient.streaming_query", kind=SpanKind.CLIENT)
289    def execute_streaming_query(
290        self,
291        database: Optional[str],
292        query: str,
293        timeout: timedelta = _KustoClientBase._query_default_timeout,
294        properties: Optional[ClientRequestProperties] = None,
295    ) -> KustoStreamingResponseDataSet:
296        """
297        Execute a KQL query without reading it all to memory.
298        The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially.
299
300        :param Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
301        :param str query: Query to be executed.
302        :param timedelta timeout: timeout for the query to be executed
303        :param azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
304        :return KustoStreamingResponseDataSet:
305        """
306        Span.set_query_attributes(self._kusto_cluster, database, properties)
307
308        return KustoStreamingResponseDataSet(self._execute_streaming_query_parsed(database, query, timeout, properties))

Execute a KQL query without reading it all to memory. The resulting KustoStreamingResponseDataSet will stream one table at a time, and the rows can be retrieved sequentially.

Parameters
  • Optional[str] database: Database against query will be executed. If not provided, will default to the "Initial Catalog" value in the connection string
  • str query: Query to be executed.
  • timedelta timeout: timeout for the query to be executed
  • azure.kusto.data.ClientRequestProperties properties: Optional additional properties.
Returns
class ClientRequestProperties:
 8class ClientRequestProperties:
 9    """This class is a POD used by client making requests to describe specific needs from the service executing the requests.
10    For more information please look at: https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties
11    """
12
13    client_request_id: str
14    application: str
15    user: str
16    _CLIENT_REQUEST_ID = "client_request_id"
17
18    results_defer_partial_query_failures_option_name = "deferpartialqueryfailures"
19    request_timeout_option_name = "servertimeout"
20    no_request_timeout_option_name = "norequesttimeout"
21
22    def __init__(self):
23        self._options = {}
24        self._parameters = {}
25        self.client_request_id = None
26        self.application = None
27        self.user = None
28
29    def set_parameter(self, name: str, value: str):
30        """Sets a parameter's value"""
31        assert_string_is_not_empty(name)
32        self._parameters[name] = value
33
34    def has_parameter(self, name: str) -> bool:
35        """Checks if a parameter is specified."""
36        return name in self._parameters
37
38    def get_parameter(self, name: str, default_value: str) -> str:
39        """Gets a parameter's value."""
40        return self._parameters.get(name, default_value)
41
42    def set_option(self, name: str, value: Any):
43        """Sets an option's value"""
44        assert_string_is_not_empty(name)
45        self._options[name] = value
46
47    def has_option(self, name: str) -> bool:
48        """Checks if an option is specified."""
49        return name in self._options
50
51    def get_option(self, name: str, default_value: Any) -> str:
52        """Gets an option's value."""
53        return self._options.get(name, default_value)
54
55    def to_json(self) -> str:
56        """Safe serialization to a JSON string."""
57        return json.dumps({"Options": self._options, "Parameters": self._parameters}, default=str)
58
59    def get_tracing_attributes(self) -> dict:
60        """Gets dictionary of attributes to be documented during tracing"""
61        return {self._CLIENT_REQUEST_ID: str(self.client_request_id)}

This class is a POD used by client making requests to describe specific needs from the service executing the requests. For more information please look at: https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties

client_request_id: str
application: str
user: str
results_defer_partial_query_failures_option_name = 'deferpartialqueryfailures'
request_timeout_option_name = 'servertimeout'
no_request_timeout_option_name = 'norequesttimeout'
def set_parameter(self, name: str, value: str):
29    def set_parameter(self, name: str, value: str):
30        """Sets a parameter's value"""
31        assert_string_is_not_empty(name)
32        self._parameters[name] = value

Sets a parameter's value

def has_parameter(self, name: str) -> bool:
34    def has_parameter(self, name: str) -> bool:
35        """Checks if a parameter is specified."""
36        return name in self._parameters

Checks if a parameter is specified.

def get_parameter(self, name: str, default_value: str) -> str:
38    def get_parameter(self, name: str, default_value: str) -> str:
39        """Gets a parameter's value."""
40        return self._parameters.get(name, default_value)

Gets a parameter's value.

def set_option(self, name: str, value: Any):
42    def set_option(self, name: str, value: Any):
43        """Sets an option's value"""
44        assert_string_is_not_empty(name)
45        self._options[name] = value

Sets an option's value

def has_option(self, name: str) -> bool:
47    def has_option(self, name: str) -> bool:
48        """Checks if an option is specified."""
49        return name in self._options

Checks if an option is specified.

def get_option(self, name: str, default_value: Any) -> str:
51    def get_option(self, name: str, default_value: Any) -> str:
52        """Gets an option's value."""
53        return self._options.get(name, default_value)

Gets an option's value.

def to_json(self) -> str:
55    def to_json(self) -> str:
56        """Safe serialization to a JSON string."""
57        return json.dumps({"Options": self._options, "Parameters": self._parameters}, default=str)

Safe serialization to a JSON string.

def get_tracing_attributes(self) -> dict:
59    def get_tracing_attributes(self) -> dict:
60        """Gets dictionary of attributes to be documented during tracing"""
61        return {self._CLIENT_REQUEST_ID: str(self.client_request_id)}

Gets dictionary of attributes to be documented during tracing

class KustoConnectionStringBuilder:
116class KustoConnectionStringBuilder:
117    """
118    Parses Kusto connection strings.
119    For usages, check out the sample at:
120        https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py
121    """
122
123    DEFAULT_DATABASE_NAME = "NetDefaultDB"
124
125    interactive_login: bool = False
126    az_cli_login: bool = False
127    device_login: bool = False
128    token_credential_login: bool = False
129
130    device_callback: DeviceCallbackType = None
131    msi_authentication: bool = False
132    msi_parameters: Optional[dict] = None
133
134    token_provider: Optional[Callable[[], str]] = None
135    async_token_provider: Optional[Callable[[], Coroutine[None, None, str]]] = None
136
137    application_for_tracing: Optional[str] = None
138    user_name_for_tracing: Optional[str] = None
139
140    azure_credential: Optional[Any] = None
141    azure_credential_from_login_endpoint: Optional[Any] = None
142
143    application_public_certificate: Optional[str] = None
144
145    def __init__(self, connection_string: str):
146        """
147        Creates new KustoConnectionStringBuilder.
148        :param str connection_string: Kusto connection string should be of the format:
149        https://<clusterName>.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord
150        For more information please look at:
151        https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html
152        """
153        assert_string_is_not_empty(connection_string)
154        self._internal_dict = {}
155
156        if connection_string is not None and "=" not in connection_string.partition(";")[0]:
157            connection_string = "Data Source=" + connection_string
158
159        self[SupportedKeywords.AUTHORITY_ID] = "organizations"
160
161        for kvp_string in connection_string.split(";"):
162            key, _, value = kvp_string.partition("=")
163            keyword = Keyword.parse(key)
164
165            value_stripped = value.strip()
166            if keyword.is_str_type():
167                if keyword.name == SupportedKeywords.DATA_SOURCE:
168                    self[keyword.name] = value_stripped.rstrip("/")
169                    self._parse_data_source(self.data_source)
170                elif keyword.name == SupportedKeywords.TRACE_USER_NAME:
171                    self.user_name_for_tracing = value_stripped
172                elif keyword.name == SupportedKeywords.TRACE_APP_NAME:
173                    self.application_for_tracing = value_stripped
174                else:
175                    self[keyword.name] = value_stripped
176            elif keyword.is_bool_type():
177                if value_stripped in ["True", "true"]:
178                    self[keyword.name] = True
179                elif value_stripped in ["False", "false"]:
180                    self[keyword.name] = False
181                else:
182                    raise KeyError("Expected aad federated security to be bool. Recieved %s" % value)
183
184        if self.initial_catalog is None:
185            self.initial_catalog = self.DEFAULT_DATABASE_NAME
186
187    def __setitem__(self, key: "Union[SupportedKeywords, str]", value: Union[str, bool, dict]):
188        keyword = Keyword.parse(key)
189
190        if value is None:
191            raise TypeError("Value cannot be None.")
192
193        if keyword.is_str_type():
194            self._internal_dict[keyword.name] = value.strip()
195        elif keyword.is_bool_type():
196            if not isinstance(value, bool):
197                raise TypeError("Expected %s to be bool" % key)
198            self._internal_dict[keyword.name] = value
199        else:
200            raise KeyError("KustoConnectionStringBuilder supports only bools and strings.")
201
202    @classmethod
203    def with_aad_user_password_authentication(
204        cls, connection_string: str, user_id: str, password: str, authority_id: str = "organizations"
205    ) -> "KustoConnectionStringBuilder":
206        """
207        Creates a KustoConnection string builder that will authenticate with AAD user name and
208        password.
209        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
210        :param str user_id: AAD user ID.
211        :param str password: Corresponding password of the AAD user.
212        :param str authority_id: optional param. defaults to "organizations"
213        """
214        assert_string_is_not_empty(user_id)
215        assert_string_is_not_empty(password)
216
217        kcsb = cls(connection_string)
218        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
219        kcsb[SupportedKeywords.USER_ID] = user_id
220        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
221        kcsb[SupportedKeywords.PASSWORD] = password
222
223        return kcsb
224
225    @classmethod
226    def with_aad_user_token_authentication(cls, connection_string: str, user_token: str) -> "KustoConnectionStringBuilder":
227        """
228        Creates a KustoConnection string builder that will authenticate with AAD application and
229        a certificate credentials.
230        :param str connection_string: Kusto connection string should be of the format:
231        https://<clusterName>.kusto.windows.net
232        :param str user_token: AAD user token.
233        """
234        assert_string_is_not_empty(user_token)
235
236        kcsb = cls(connection_string)
237        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
238        kcsb[SupportedKeywords.USER_TOKEN] = user_token
239
240        return kcsb
241
242    @classmethod
243    def with_aad_application_key_authentication(
244        cls, connection_string: str, aad_app_id: str, app_key: str, authority_id: str
245    ) -> "KustoConnectionStringBuilder":
246        """
247        Creates a KustoConnection string builder that will authenticate with AAD application and key.
248        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
249        :param str aad_app_id: AAD application ID.
250        :param str app_key: Corresponding key of the AAD application.
251        :param str authority_id: Authority id (aka Tenant id) must be provided
252        """
253        assert_string_is_not_empty(aad_app_id)
254        assert_string_is_not_empty(app_key)
255        assert_string_is_not_empty(authority_id)
256
257        kcsb = cls(connection_string)
258        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
259        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
260        kcsb[SupportedKeywords.APPLICATION_KEY] = app_key
261        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
262
263        return kcsb
264
265    @classmethod
266    def with_aad_application_certificate_authentication(
267        cls, connection_string: str, aad_app_id: str, certificate: str, thumbprint: str, authority_id: str
268    ) -> "KustoConnectionStringBuilder":
269        """
270        Creates a KustoConnection string builder that will authenticate with AAD application using
271        a certificate.
272        :param str connection_string: Kusto connection string should be of the format:
273        https://<clusterName>.kusto.windows.net
274        :param str aad_app_id: AAD application ID.
275        :param str certificate: A PEM encoded certificate private key.
276        :param str thumbprint: hex encoded thumbprint of the certificate.
277        :param str authority_id: Authority id (aka Tenant id) must be provided
278        """
279        assert_string_is_not_empty(aad_app_id)
280        assert_string_is_not_empty(certificate)
281        assert_string_is_not_empty(thumbprint)
282        assert_string_is_not_empty(authority_id)
283
284        kcsb = cls(connection_string)
285        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
286        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
287        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = certificate
288        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint
289        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
290
291        return kcsb
292
293    @classmethod
294    def with_aad_application_certificate_sni_authentication(
295        cls, connection_string: str, aad_app_id: str, private_certificate: str, public_certificate: str, thumbprint: str, authority_id: str
296    ) -> "KustoConnectionStringBuilder":
297        """
298        Creates a KustoConnection string builder that will authenticate with AAD application using
299        a certificate Subject Name and Issuer.
300        :param str connection_string: Kusto connection string should be of the format:
301        https://<clusterName>.kusto.windows.net
302        :param str aad_app_id: AAD application ID.
303        :param str private_certificate: A PEM encoded certificate private key.
304        :param str public_certificate: A public certificate matching the provided PEM certificate private key.
305        :param str thumbprint: hex encoded thumbprint of the certificate.
306        :param str authority_id: Authority id (aka Tenant id) must be provided
307        """
308        assert_string_is_not_empty(aad_app_id)
309        assert_string_is_not_empty(private_certificate)
310        assert_string_is_not_empty(public_certificate)
311        assert_string_is_not_empty(thumbprint)
312        assert_string_is_not_empty(authority_id)
313
314        kcsb = cls(connection_string)
315        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
316        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
317        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = private_certificate
318        kcsb.application_public_certificate = public_certificate
319        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint
320        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
321
322        return kcsb
323
324    @classmethod
325    def with_aad_application_token_authentication(cls, connection_string: str, application_token: str) -> "KustoConnectionStringBuilder":
326        """
327        Creates a KustoConnection string builder that will authenticate with AAD application and
328        an application token.
329        :param str connection_string: Kusto connection string should be of the format:
330        https://<clusterName>.kusto.windows.net
331        :param str application_token: AAD application token.
332        """
333        assert_string_is_not_empty(application_token)
334        kcsb = cls(connection_string)
335        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
336        kcsb[SupportedKeywords.APPLICATION_TOKEN] = application_token
337
338        return kcsb
339
340    @classmethod
341    def with_aad_device_authentication(
342        cls, connection_string: str, authority_id: str = "organizations", callback: DeviceCallbackType = None
343    ) -> "KustoConnectionStringBuilder":
344        """
345        Creates a KustoConnection string builder that will authenticate with AAD application and
346        password.
347        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
348        :param str authority_id: optional param. defaults to "organizations"
349        :param DeviceCallbackType callback: options callback function to be called when authentication is required, accepts three parameters:
350                - ``verification_uri`` (str) the URL the user must visit
351                - ``user_code`` (str) the code the user must enter there
352                - ``expires_on`` (datetime.datetime) the UTC time at which the code will expire
353        """
354        kcsb = cls(connection_string)
355        kcsb.device_login = True
356        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
357        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
358        kcsb.device_callback = callback
359
360        return kcsb
361
362    @classmethod
363    def with_az_cli_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
364        """
365        Creates a KustoConnection string builder that will use existing authenticated az cli profile
366        password.
367        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
368        """
369        kcsb = cls(connection_string)
370        kcsb.az_cli_login = True
371        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
372
373        return kcsb
374
375    @classmethod
376    def with_aad_managed_service_identity_authentication(
377        cls, connection_string: str, client_id: str = None, object_id: str = None, msi_res_id: str = None, timeout: int = None
378    ) -> "KustoConnectionStringBuilder":
379        """
380        Creates a KustoConnection string builder that will authenticate with AAD application, using
381        an application token obtained from a Microsoft Service Identity endpoint. An optional user
382        assigned application ID can be added to the token.
383
384        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
385        :param client_id: an optional user assigned identity provided as an Azure ID of a client
386        :param object_id: an optional user assigned identity provided as an Azure ID of an object
387        :param msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource
388        :param timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur
389        """
390
391        kcsb = cls(connection_string)
392        params = {}
393        exclusive_pcount = 0
394
395        if timeout is not None:
396            params["connection_timeout"] = timeout
397
398        if client_id is not None:
399            params["client_id"] = client_id
400            exclusive_pcount += 1
401
402        if object_id is not None:
403            # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity
404            raise ValueError("User Managed Service Identity with object_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead.")
405            # noinspection PyUnreachableCode
406            params["object_id"] = object_id
407            exclusive_pcount += 1
408
409        if msi_res_id is not None:
410            # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity
411            raise ValueError(
412                "User Managed Service Identity with msi_res_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead."
413            )
414            # noinspection PyUnreachableCode
415            params["msi_res_id"] = msi_res_id
416            exclusive_pcount += 1
417
418        if exclusive_pcount > 1:
419            raise ValueError("the following parameters are mutually exclusive and can not be provided at the same time: client_uid, object_id, msi_res_id")
420
421        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
422        kcsb.msi_authentication = True
423        kcsb.msi_parameters = params
424
425        return kcsb
426
427    @classmethod
428    def with_token_provider(cls, connection_string: str, token_provider: Callable[[], str]) -> "KustoConnectionStringBuilder":
429        """
430        Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token
431        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
432        :param token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string
433        """
434
435        assert callable(token_provider)
436
437        kcsb = cls(connection_string)
438        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
439        kcsb.token_provider = token_provider
440
441        return kcsb
442
443    @classmethod
444    def with_async_token_provider(
445        cls,
446        connection_string: str,
447        async_token_provider: Callable[[], Coroutine[None, None, str]],
448    ) -> "KustoConnectionStringBuilder":
449        """
450        Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token
451        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
452        :param async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string
453        """
454
455        assert callable(async_token_provider)
456
457        kcsb = cls(connection_string)
458        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
459        kcsb.async_token_provider = async_token_provider
460
461        return kcsb
462
463    @classmethod
464    def with_interactive_login(
465        cls, connection_string: str, user_id_hint: Optional[str] = None, tenant_hint: Optional[str] = None
466    ) -> "KustoConnectionStringBuilder":
467        kcsb = cls(connection_string)
468        kcsb.interactive_login = True
469        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
470        if user_id_hint is not None:
471            kcsb[SupportedKeywords.USER_ID] = user_id_hint
472
473        if tenant_hint is not None:
474            kcsb[SupportedKeywords.AUTHORITY_ID] = tenant_hint
475
476        return kcsb
477
478    @classmethod
479    def with_azure_token_credential(
480        cls,
481        connection_string: str,
482        credential: Optional[Any] = None,
483        credential_from_login_endpoint: Optional[Callable[[str], Any]] = None,
484    ) -> "KustoConnectionStringBuilder":
485        """
486        Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token.
487        :param connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
488        :param credential: an optional token credential to use for authentication
489        :param credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource
490        """
491        kcsb = cls(connection_string)
492        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
493        kcsb.token_credential_login = True
494        kcsb.azure_credential = credential
495        kcsb.azure_credential_from_login_endpoint = credential_from_login_endpoint
496
497        return kcsb
498
499    @classmethod
500    def with_no_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
501        """
502        Create a KustoConnectionStringBuilder that uses no authentication.
503        :param connection_string: Kusto's connection string should be of the format: http://<clusterName>.kusto.windows.net
504        """
505        if not connection_string.startswith("http://"):
506            raise ValueError("Connection string must start with http://")
507        kcsb = cls(connection_string)
508        kcsb[SupportedKeywords.FEDERATED_SECURITY] = False
509
510        return kcsb
511
512    @property
513    def data_source(self) -> Optional[str]:
514        """The URI specifying the Kusto service endpoint.
515        For example, https://kuskus.kusto.windows.net or net.tcp://localhost
516        """
517        return self._internal_dict.get(SupportedKeywords.DATA_SOURCE)
518
519    @property
520    def initial_catalog(self) -> Optional[str]:
521        """The default database to be used for requests.
522        By default, it is set to 'NetDefaultDB'.
523        """
524        return self._internal_dict.get(SupportedKeywords.INITIAL_CATALOG)
525
526    @initial_catalog.setter
527    def initial_catalog(self, value: str) -> None:
528        self._internal_dict[SupportedKeywords.INITIAL_CATALOG] = value
529
530    @property
531    def aad_user_id(self) -> Optional[str]:
532        """The username to use for AAD Federated AuthN."""
533        return self._internal_dict.get(SupportedKeywords.USER_ID)
534
535    @property
536    def application_client_id(self) -> Optional[str]:
537        """The application client id to use for authentication when federated
538        authentication is used.
539        """
540        return self._internal_dict.get(SupportedKeywords.APPLICATION_CLIENT_ID)
541
542    @property
543    def application_key(self) -> Optional[str]:
544        """The application key to use for authentication when federated authentication is used"""
545        return self._internal_dict.get(SupportedKeywords.APPLICATION_KEY)
546
547    @property
548    def application_certificate(self) -> Optional[str]:
549        """A PEM encoded certificate private key."""
550        return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_BLOB)
551
552    @application_certificate.setter
553    def application_certificate(self, value: str):
554        self[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = value
555
556    @property
557    def application_certificate_thumbprint(self) -> Optional[str]:
558        """hex encoded thumbprint of the certificate."""
559        return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT)
560
561    @application_certificate_thumbprint.setter
562    def application_certificate_thumbprint(self, value: str):
563        self[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = value
564
565    @property
566    def authority_id(self) -> Optional[str]:
567        """The ID of the AAD tenant where the application is configured.
568        (should be supplied only for non-Microsoft tenant)"""
569        return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID)
570
571    @authority_id.setter
572    def authority_id(self, value: str):
573        self[SupportedKeywords.AUTHORITY_ID] = value
574
575    @property
576    def aad_federated_security(self) -> Optional[bool]:
577        """A Boolean value that instructs the client to perform AAD federated authentication."""
578        return self._internal_dict.get(SupportedKeywords.FEDERATED_SECURITY)
579
580    @property
581    def user_token(self) -> Optional[str]:
582        """User token."""
583        return self._internal_dict.get(SupportedKeywords.USER_TOKEN)
584
585    @property
586    def application_token(self) -> Optional[str]:
587        """Application token."""
588        return self._internal_dict.get(SupportedKeywords.APPLICATION_TOKEN)
589
590    @property
591    def client_details(self) -> ClientDetails:
592        return ClientDetails(self.application_for_tracing, self.user_name_for_tracing)
593
594    @property
595    def login_hint(self) -> Optional[str]:
596        return self._internal_dict.get(SupportedKeywords.USER_ID)
597
598    @property
599    def domain_hint(self) -> Optional[str]:
600        return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID)
601
602    @property
603    def password(self) -> Optional[str]:
604        return self._internal_dict.get(SupportedKeywords.PASSWORD)
605
606    def _set_connector_details(
607        self,
608        name: str,
609        version: str,
610        app_name: Optional[str] = None,
611        app_version: Optional[str] = None,
612        send_user: bool = False,
613        override_user: Optional[str] = None,
614        additional_fields: Optional[List[Tuple[str, str]]] = None,
615    ):
616        """
617        Sets the connector details for tracing purposes.
618        :param name:  The name of the connector
619        :param version:  The version of the connector
620        :param send_user: Whether to send the user name
621        :param override_user: Override the user name ( if send_user is True )
622        :param app_name: The name of the containing application
623        :param app_version: The version of the containing application
624        :param additional_fields: Additional fields to add to the header
625        """
626        client_details = ClientDetails.set_connector_details(name, version, app_name, app_version, send_user, override_user, additional_fields)
627
628        self.application_for_tracing = client_details.application_for_tracing
629        self.user_name_for_tracing = client_details.user_name_for_tracing
630
631    def __str__(self) -> str:
632        dict_copy = self._internal_dict.copy()
633        for key in dict_copy:
634            if Keyword.lookup(key).secret:
635                dict_copy[key] = "****"
636        return self._build_connection_string(dict_copy)
637
638    def __repr__(self) -> str:
639        return self._build_connection_string(self._internal_dict)
640
641    def _build_connection_string(self, kcsb_as_dict: dict) -> str:
642        return ";".join(["{0}={1}".format(word.value, kcsb_as_dict[word]) for word in SupportedKeywords if word in kcsb_as_dict])
643
644    def _parse_data_source(self, url: str):
645        url = urlparse(url)
646        if not url.netloc:
647            return
648        segments = url.path.lstrip("/").split("/")
649        if len(segments) == 1 and segments[0] and not self.initial_catalog:
650            self.initial_catalog = segments[0]
651            self._internal_dict[SupportedKeywords.DATA_SOURCE] = url._replace(path="").geturl()

Parses Kusto connection strings. For usages, check out the sample at: https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py

KustoConnectionStringBuilder(connection_string: str)
145    def __init__(self, connection_string: str):
146        """
147        Creates new KustoConnectionStringBuilder.
148        :param str connection_string: Kusto connection string should be of the format:
149        https://<clusterName>.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord
150        For more information please look at:
151        https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html
152        """
153        assert_string_is_not_empty(connection_string)
154        self._internal_dict = {}
155
156        if connection_string is not None and "=" not in connection_string.partition(";")[0]:
157            connection_string = "Data Source=" + connection_string
158
159        self[SupportedKeywords.AUTHORITY_ID] = "organizations"
160
161        for kvp_string in connection_string.split(";"):
162            key, _, value = kvp_string.partition("=")
163            keyword = Keyword.parse(key)
164
165            value_stripped = value.strip()
166            if keyword.is_str_type():
167                if keyword.name == SupportedKeywords.DATA_SOURCE:
168                    self[keyword.name] = value_stripped.rstrip("/")
169                    self._parse_data_source(self.data_source)
170                elif keyword.name == SupportedKeywords.TRACE_USER_NAME:
171                    self.user_name_for_tracing = value_stripped
172                elif keyword.name == SupportedKeywords.TRACE_APP_NAME:
173                    self.application_for_tracing = value_stripped
174                else:
175                    self[keyword.name] = value_stripped
176            elif keyword.is_bool_type():
177                if value_stripped in ["True", "true"]:
178                    self[keyword.name] = True
179                elif value_stripped in ["False", "false"]:
180                    self[keyword.name] = False
181                else:
182                    raise KeyError("Expected aad federated security to be bool. Recieved %s" % value)
183
184        if self.initial_catalog is None:
185            self.initial_catalog = self.DEFAULT_DATABASE_NAME

Creates new KustoConnectionStringBuilder.

Parameters
DEFAULT_DATABASE_NAME = 'NetDefaultDB'
interactive_login: bool = False
az_cli_login: bool = False
device_login: bool = False
token_credential_login: bool = False
device_callback: Callable[[str, str, datetime.datetime], NoneType] = None
msi_authentication: bool = False
msi_parameters: Optional[dict] = None
token_provider: Optional[Callable[[], str]] = None
async_token_provider: Optional[Callable[[], Coroutine[NoneType, NoneType, str]]] = None
application_for_tracing: Optional[str] = None
user_name_for_tracing: Optional[str] = None
azure_credential: Optional[Any] = None
azure_credential_from_login_endpoint: Optional[Any] = None
application_public_certificate: Optional[str] = None
@classmethod
def with_aad_user_password_authentication( cls, connection_string: str, user_id: str, password: str, authority_id: str = 'organizations') -> KustoConnectionStringBuilder:
202    @classmethod
203    def with_aad_user_password_authentication(
204        cls, connection_string: str, user_id: str, password: str, authority_id: str = "organizations"
205    ) -> "KustoConnectionStringBuilder":
206        """
207        Creates a KustoConnection string builder that will authenticate with AAD user name and
208        password.
209        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
210        :param str user_id: AAD user ID.
211        :param str password: Corresponding password of the AAD user.
212        :param str authority_id: optional param. defaults to "organizations"
213        """
214        assert_string_is_not_empty(user_id)
215        assert_string_is_not_empty(password)
216
217        kcsb = cls(connection_string)
218        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
219        kcsb[SupportedKeywords.USER_ID] = user_id
220        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
221        kcsb[SupportedKeywords.PASSWORD] = password
222
223        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD user name and password.

Parameters
  • str connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
  • str user_id: AAD user ID.
  • str password: Corresponding password of the AAD user.
  • str authority_id: optional param. defaults to "organizations"
@classmethod
def with_aad_user_token_authentication( cls, connection_string: str, user_token: str) -> KustoConnectionStringBuilder:
225    @classmethod
226    def with_aad_user_token_authentication(cls, connection_string: str, user_token: str) -> "KustoConnectionStringBuilder":
227        """
228        Creates a KustoConnection string builder that will authenticate with AAD application and
229        a certificate credentials.
230        :param str connection_string: Kusto connection string should be of the format:
231        https://<clusterName>.kusto.windows.net
232        :param str user_token: AAD user token.
233        """
234        assert_string_is_not_empty(user_token)
235
236        kcsb = cls(connection_string)
237        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
238        kcsb[SupportedKeywords.USER_TOKEN] = user_token
239
240        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD application and a certificate credentials.

Parameters
  • str connection_string: Kusto connection string should be of the format: https://.kusto.windows.net
  • str user_token: AAD user token.
@classmethod
def with_aad_application_key_authentication( cls, connection_string: str, aad_app_id: str, app_key: str, authority_id: str) -> KustoConnectionStringBuilder:
242    @classmethod
243    def with_aad_application_key_authentication(
244        cls, connection_string: str, aad_app_id: str, app_key: str, authority_id: str
245    ) -> "KustoConnectionStringBuilder":
246        """
247        Creates a KustoConnection string builder that will authenticate with AAD application and key.
248        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
249        :param str aad_app_id: AAD application ID.
250        :param str app_key: Corresponding key of the AAD application.
251        :param str authority_id: Authority id (aka Tenant id) must be provided
252        """
253        assert_string_is_not_empty(aad_app_id)
254        assert_string_is_not_empty(app_key)
255        assert_string_is_not_empty(authority_id)
256
257        kcsb = cls(connection_string)
258        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
259        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
260        kcsb[SupportedKeywords.APPLICATION_KEY] = app_key
261        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
262
263        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD application and key.

Parameters
  • str connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
  • str aad_app_id: AAD application ID.
  • str app_key: Corresponding key of the AAD application.
  • str authority_id: Authority id (aka Tenant id) must be provided
@classmethod
def with_aad_application_certificate_authentication( cls, connection_string: str, aad_app_id: str, certificate: str, thumbprint: str, authority_id: str) -> KustoConnectionStringBuilder:
265    @classmethod
266    def with_aad_application_certificate_authentication(
267        cls, connection_string: str, aad_app_id: str, certificate: str, thumbprint: str, authority_id: str
268    ) -> "KustoConnectionStringBuilder":
269        """
270        Creates a KustoConnection string builder that will authenticate with AAD application using
271        a certificate.
272        :param str connection_string: Kusto connection string should be of the format:
273        https://<clusterName>.kusto.windows.net
274        :param str aad_app_id: AAD application ID.
275        :param str certificate: A PEM encoded certificate private key.
276        :param str thumbprint: hex encoded thumbprint of the certificate.
277        :param str authority_id: Authority id (aka Tenant id) must be provided
278        """
279        assert_string_is_not_empty(aad_app_id)
280        assert_string_is_not_empty(certificate)
281        assert_string_is_not_empty(thumbprint)
282        assert_string_is_not_empty(authority_id)
283
284        kcsb = cls(connection_string)
285        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
286        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
287        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = certificate
288        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint
289        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
290
291        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD application using a certificate.

Parameters
  • str connection_string: Kusto connection string should be of the format: https://.kusto.windows.net
  • str aad_app_id: AAD application ID.
  • str certificate: A PEM encoded certificate private key.
  • str thumbprint: hex encoded thumbprint of the certificate.
  • str authority_id: Authority id (aka Tenant id) must be provided
@classmethod
def with_aad_application_certificate_sni_authentication( cls, connection_string: str, aad_app_id: str, private_certificate: str, public_certificate: str, thumbprint: str, authority_id: str) -> KustoConnectionStringBuilder:
293    @classmethod
294    def with_aad_application_certificate_sni_authentication(
295        cls, connection_string: str, aad_app_id: str, private_certificate: str, public_certificate: str, thumbprint: str, authority_id: str
296    ) -> "KustoConnectionStringBuilder":
297        """
298        Creates a KustoConnection string builder that will authenticate with AAD application using
299        a certificate Subject Name and Issuer.
300        :param str connection_string: Kusto connection string should be of the format:
301        https://<clusterName>.kusto.windows.net
302        :param str aad_app_id: AAD application ID.
303        :param str private_certificate: A PEM encoded certificate private key.
304        :param str public_certificate: A public certificate matching the provided PEM certificate private key.
305        :param str thumbprint: hex encoded thumbprint of the certificate.
306        :param str authority_id: Authority id (aka Tenant id) must be provided
307        """
308        assert_string_is_not_empty(aad_app_id)
309        assert_string_is_not_empty(private_certificate)
310        assert_string_is_not_empty(public_certificate)
311        assert_string_is_not_empty(thumbprint)
312        assert_string_is_not_empty(authority_id)
313
314        kcsb = cls(connection_string)
315        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
316        kcsb[SupportedKeywords.APPLICATION_CLIENT_ID] = aad_app_id
317        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_BLOB] = private_certificate
318        kcsb.application_public_certificate = public_certificate
319        kcsb[SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT] = thumbprint
320        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
321
322        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD application using a certificate Subject Name and Issuer.

Parameters
  • str connection_string: Kusto connection string should be of the format: https://.kusto.windows.net
  • str aad_app_id: AAD application ID.
  • str private_certificate: A PEM encoded certificate private key.
  • str public_certificate: A public certificate matching the provided PEM certificate private key.
  • str thumbprint: hex encoded thumbprint of the certificate.
  • str authority_id: Authority id (aka Tenant id) must be provided
@classmethod
def with_aad_application_token_authentication( cls, connection_string: str, application_token: str) -> KustoConnectionStringBuilder:
324    @classmethod
325    def with_aad_application_token_authentication(cls, connection_string: str, application_token: str) -> "KustoConnectionStringBuilder":
326        """
327        Creates a KustoConnection string builder that will authenticate with AAD application and
328        an application token.
329        :param str connection_string: Kusto connection string should be of the format:
330        https://<clusterName>.kusto.windows.net
331        :param str application_token: AAD application token.
332        """
333        assert_string_is_not_empty(application_token)
334        kcsb = cls(connection_string)
335        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
336        kcsb[SupportedKeywords.APPLICATION_TOKEN] = application_token
337
338        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD application and an application token.

Parameters
  • str connection_string: Kusto connection string should be of the format: https://.kusto.windows.net
  • str application_token: AAD application token.
@classmethod
def with_aad_device_authentication( cls, connection_string: str, authority_id: str = 'organizations', callback: Callable[[str, str, datetime.datetime], NoneType] = None) -> KustoConnectionStringBuilder:
340    @classmethod
341    def with_aad_device_authentication(
342        cls, connection_string: str, authority_id: str = "organizations", callback: DeviceCallbackType = None
343    ) -> "KustoConnectionStringBuilder":
344        """
345        Creates a KustoConnection string builder that will authenticate with AAD application and
346        password.
347        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
348        :param str authority_id: optional param. defaults to "organizations"
349        :param DeviceCallbackType callback: options callback function to be called when authentication is required, accepts three parameters:
350                - ``verification_uri`` (str) the URL the user must visit
351                - ``user_code`` (str) the code the user must enter there
352                - ``expires_on`` (datetime.datetime) the UTC time at which the code will expire
353        """
354        kcsb = cls(connection_string)
355        kcsb.device_login = True
356        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
357        kcsb[SupportedKeywords.AUTHORITY_ID] = authority_id
358        kcsb.device_callback = callback
359
360        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD application and password.

Parameters
  • str connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
  • str authority_id: optional param. defaults to "organizations"
  • DeviceCallbackType callback: options callback function to be called when authentication is required, accepts three parameters:
    • verification_uri (str) the URL the user must visit
    • user_code (str) the code the user must enter there
    • expires_on (datetime.datetime) the UTC time at which the code will expire
@classmethod
def with_az_cli_authentication(cls, connection_string: str) -> KustoConnectionStringBuilder:
362    @classmethod
363    def with_az_cli_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
364        """
365        Creates a KustoConnection string builder that will use existing authenticated az cli profile
366        password.
367        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
368        """
369        kcsb = cls(connection_string)
370        kcsb.az_cli_login = True
371        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
372
373        return kcsb

Creates a KustoConnection string builder that will use existing authenticated az cli profile password.

Parameters
  • str connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
@classmethod
def with_aad_managed_service_identity_authentication( cls, connection_string: str, client_id: str = None, object_id: str = None, msi_res_id: str = None, timeout: int = None) -> KustoConnectionStringBuilder:
375    @classmethod
376    def with_aad_managed_service_identity_authentication(
377        cls, connection_string: str, client_id: str = None, object_id: str = None, msi_res_id: str = None, timeout: int = None
378    ) -> "KustoConnectionStringBuilder":
379        """
380        Creates a KustoConnection string builder that will authenticate with AAD application, using
381        an application token obtained from a Microsoft Service Identity endpoint. An optional user
382        assigned application ID can be added to the token.
383
384        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
385        :param client_id: an optional user assigned identity provided as an Azure ID of a client
386        :param object_id: an optional user assigned identity provided as an Azure ID of an object
387        :param msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource
388        :param timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur
389        """
390
391        kcsb = cls(connection_string)
392        params = {}
393        exclusive_pcount = 0
394
395        if timeout is not None:
396            params["connection_timeout"] = timeout
397
398        if client_id is not None:
399            params["client_id"] = client_id
400            exclusive_pcount += 1
401
402        if object_id is not None:
403            # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity
404            raise ValueError("User Managed Service Identity with object_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead.")
405            # noinspection PyUnreachableCode
406            params["object_id"] = object_id
407            exclusive_pcount += 1
408
409        if msi_res_id is not None:
410            # Until we upgrade azure-identity to version 1.4.1, only client_id is excepted as a hint for user managed service identity
411            raise ValueError(
412                "User Managed Service Identity with msi_res_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead."
413            )
414            # noinspection PyUnreachableCode
415            params["msi_res_id"] = msi_res_id
416            exclusive_pcount += 1
417
418        if exclusive_pcount > 1:
419            raise ValueError("the following parameters are mutually exclusive and can not be provided at the same time: client_uid, object_id, msi_res_id")
420
421        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
422        kcsb.msi_authentication = True
423        kcsb.msi_parameters = params
424
425        return kcsb

Creates a KustoConnection string builder that will authenticate with AAD application, using an application token obtained from a Microsoft Service Identity endpoint. An optional user assigned application ID can be added to the token.

Parameters
  • str connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
  • client_id: an optional user assigned identity provided as an Azure ID of a client
  • object_id: an optional user assigned identity provided as an Azure ID of an object
  • msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource
  • timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur
@classmethod
def with_token_provider( cls, connection_string: str, token_provider: Callable[[], str]) -> KustoConnectionStringBuilder:
427    @classmethod
428    def with_token_provider(cls, connection_string: str, token_provider: Callable[[], str]) -> "KustoConnectionStringBuilder":
429        """
430        Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token
431        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
432        :param token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string
433        """
434
435        assert callable(token_provider)
436
437        kcsb = cls(connection_string)
438        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
439        kcsb.token_provider = token_provider
440
441        return kcsb

Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token

Parameters
  • str connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
  • token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string
@classmethod
def with_async_token_provider( cls, connection_string: str, async_token_provider: Callable[[], Coroutine[NoneType, NoneType, str]]) -> KustoConnectionStringBuilder:
443    @classmethod
444    def with_async_token_provider(
445        cls,
446        connection_string: str,
447        async_token_provider: Callable[[], Coroutine[None, None, str]],
448    ) -> "KustoConnectionStringBuilder":
449        """
450        Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token
451        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
452        :param async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string
453        """
454
455        assert callable(async_token_provider)
456
457        kcsb = cls(connection_string)
458        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
459        kcsb.async_token_provider = async_token_provider
460
461        return kcsb

Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token

Parameters
  • str connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
  • async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string
@classmethod
def with_interactive_login( cls, connection_string: str, user_id_hint: Optional[str] = None, tenant_hint: Optional[str] = None) -> KustoConnectionStringBuilder:
463    @classmethod
464    def with_interactive_login(
465        cls, connection_string: str, user_id_hint: Optional[str] = None, tenant_hint: Optional[str] = None
466    ) -> "KustoConnectionStringBuilder":
467        kcsb = cls(connection_string)
468        kcsb.interactive_login = True
469        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
470        if user_id_hint is not None:
471            kcsb[SupportedKeywords.USER_ID] = user_id_hint
472
473        if tenant_hint is not None:
474            kcsb[SupportedKeywords.AUTHORITY_ID] = tenant_hint
475
476        return kcsb
@classmethod
def with_azure_token_credential( cls, connection_string: str, credential: Optional[Any] = None, credential_from_login_endpoint: Optional[Callable[[str], Any]] = None) -> KustoConnectionStringBuilder:
478    @classmethod
479    def with_azure_token_credential(
480        cls,
481        connection_string: str,
482        credential: Optional[Any] = None,
483        credential_from_login_endpoint: Optional[Callable[[str], Any]] = None,
484    ) -> "KustoConnectionStringBuilder":
485        """
486        Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token.
487        :param connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
488        :param credential: an optional token credential to use for authentication
489        :param credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource
490        """
491        kcsb = cls(connection_string)
492        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
493        kcsb.token_credential_login = True
494        kcsb.azure_credential = credential
495        kcsb.azure_credential_from_login_endpoint = credential_from_login_endpoint
496
497        return kcsb

Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token.

Parameters
  • connection_string: Kusto connection string should be of the format: https: //.kusto.windows.net
  • credential: an optional token credential to use for authentication
  • credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource
@classmethod
def with_no_authentication(cls, connection_string: str) -> KustoConnectionStringBuilder:
499    @classmethod
500    def with_no_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
501        """
502        Create a KustoConnectionStringBuilder that uses no authentication.
503        :param connection_string: Kusto's connection string should be of the format: http://<clusterName>.kusto.windows.net
504        """
505        if not connection_string.startswith("http://"):
506            raise ValueError("Connection string must start with http://")
507        kcsb = cls(connection_string)
508        kcsb[SupportedKeywords.FEDERATED_SECURITY] = False
509
510        return kcsb

Create a KustoConnectionStringBuilder that uses no authentication.

Parameters
  • connection_string: Kusto's connection string should be of the format: http: //.kusto.windows.net
data_source: Optional[str]
512    @property
513    def data_source(self) -> Optional[str]:
514        """The URI specifying the Kusto service endpoint.
515        For example, https://kuskus.kusto.windows.net or net.tcp://localhost
516        """
517        return self._internal_dict.get(SupportedKeywords.DATA_SOURCE)

The URI specifying the Kusto service endpoint. For example, https://kuskus.kusto.windows.net or net.tcp://localhost

initial_catalog: Optional[str]
519    @property
520    def initial_catalog(self) -> Optional[str]:
521        """The default database to be used for requests.
522        By default, it is set to 'NetDefaultDB'.
523        """
524        return self._internal_dict.get(SupportedKeywords.INITIAL_CATALOG)

The default database to be used for requests. By default, it is set to 'NetDefaultDB'.

aad_user_id: Optional[str]
530    @property
531    def aad_user_id(self) -> Optional[str]:
532        """The username to use for AAD Federated AuthN."""
533        return self._internal_dict.get(SupportedKeywords.USER_ID)

The username to use for AAD Federated AuthN.

application_client_id: Optional[str]
535    @property
536    def application_client_id(self) -> Optional[str]:
537        """The application client id to use for authentication when federated
538        authentication is used.
539        """
540        return self._internal_dict.get(SupportedKeywords.APPLICATION_CLIENT_ID)

The application client id to use for authentication when federated authentication is used.

application_key: Optional[str]
542    @property
543    def application_key(self) -> Optional[str]:
544        """The application key to use for authentication when federated authentication is used"""
545        return self._internal_dict.get(SupportedKeywords.APPLICATION_KEY)

The application key to use for authentication when federated authentication is used

application_certificate: Optional[str]
547    @property
548    def application_certificate(self) -> Optional[str]:
549        """A PEM encoded certificate private key."""
550        return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_BLOB)

A PEM encoded certificate private key.

application_certificate_thumbprint: Optional[str]
556    @property
557    def application_certificate_thumbprint(self) -> Optional[str]:
558        """hex encoded thumbprint of the certificate."""
559        return self._internal_dict.get(SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT)

hex encoded thumbprint of the certificate.

authority_id: Optional[str]
565    @property
566    def authority_id(self) -> Optional[str]:
567        """The ID of the AAD tenant where the application is configured.
568        (should be supplied only for non-Microsoft tenant)"""
569        return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID)

The ID of the AAD tenant where the application is configured. (should be supplied only for non-Microsoft tenant)

aad_federated_security: Optional[bool]
575    @property
576    def aad_federated_security(self) -> Optional[bool]:
577        """A Boolean value that instructs the client to perform AAD federated authentication."""
578        return self._internal_dict.get(SupportedKeywords.FEDERATED_SECURITY)

A Boolean value that instructs the client to perform AAD federated authentication.

user_token: Optional[str]
580    @property
581    def user_token(self) -> Optional[str]:
582        """User token."""
583        return self._internal_dict.get(SupportedKeywords.USER_TOKEN)

User token.

application_token: Optional[str]
585    @property
586    def application_token(self) -> Optional[str]:
587        """Application token."""
588        return self._internal_dict.get(SupportedKeywords.APPLICATION_TOKEN)

Application token.

client_details: data.client_details.ClientDetails
590    @property
591    def client_details(self) -> ClientDetails:
592        return ClientDetails(self.application_for_tracing, self.user_name_for_tracing)
login_hint: Optional[str]
594    @property
595    def login_hint(self) -> Optional[str]:
596        return self._internal_dict.get(SupportedKeywords.USER_ID)
domain_hint: Optional[str]
598    @property
599    def domain_hint(self) -> Optional[str]:
600        return self._internal_dict.get(SupportedKeywords.AUTHORITY_ID)
password: Optional[str]
602    @property
603    def password(self) -> Optional[str]:
604        return self._internal_dict.get(SupportedKeywords.PASSWORD)
class DataFormat(enum.Enum):
19class DataFormat(Enum):
20    """All data formats supported by Kusto."""
21
22    CSV = ("csv", IngestionMappingKind.CSV, True)
23    TSV = ("tsv", IngestionMappingKind.CSV, True)
24    SCSV = ("scsv", IngestionMappingKind.CSV, True)
25    SOHSV = ("sohsv", IngestionMappingKind.CSV, True)
26    PSV = ("psv", IngestionMappingKind.CSV, True)
27    TXT = ("txt", IngestionMappingKind.CSV, True)
28    TSVE = ("tsve", IngestionMappingKind.CSV, True)
29    JSON = ("json", IngestionMappingKind.JSON, True)
30    SINGLEJSON = ("singlejson", IngestionMappingKind.JSON, True)
31    MULTIJSON = ("multijson", IngestionMappingKind.JSON, True)
32    AVRO = ("avro", IngestionMappingKind.AVRO, False)
33    APACHEAVRO = ("apacheavro", IngestionMappingKind.APACHEAVRO, False)
34    PARQUET = ("parquet", IngestionMappingKind.PARQUET, False)
35    SSTREAM = ("sstream", IngestionMappingKind.SSTREAM, False)
36    ORC = ("orc", IngestionMappingKind.ORC, False)
37    RAW = ("raw", IngestionMappingKind.CSV, True)
38    W3CLOGFILE = ("w3clogfile", IngestionMappingKind.W3CLOGFILE, True)
39
40    def __init__(self, kusto_value: str, ingestion_mapping_kind: IngestionMappingKind, compressible: bool):
41        self.kusto_value = kusto_value  # Formatted how Kusto Service expects it
42        self.ingestion_mapping_kind = ingestion_mapping_kind
43        self.compressible = compressible  # Binary formats should not be compressed

All data formats supported by Kusto.

CSV = <DataFormat.CSV: ('csv', <IngestionMappingKind.CSV: 'Csv'>, True)>
TSV = <DataFormat.TSV: ('tsv', <IngestionMappingKind.CSV: 'Csv'>, True)>
SCSV = <DataFormat.SCSV: ('scsv', <IngestionMappingKind.CSV: 'Csv'>, True)>
SOHSV = <DataFormat.SOHSV: ('sohsv', <IngestionMappingKind.CSV: 'Csv'>, True)>
PSV = <DataFormat.PSV: ('psv', <IngestionMappingKind.CSV: 'Csv'>, True)>
TXT = <DataFormat.TXT: ('txt', <IngestionMappingKind.CSV: 'Csv'>, True)>
TSVE = <DataFormat.TSVE: ('tsve', <IngestionMappingKind.CSV: 'Csv'>, True)>
JSON = <DataFormat.JSON: ('json', <IngestionMappingKind.JSON: 'Json'>, True)>
SINGLEJSON = <DataFormat.SINGLEJSON: ('singlejson', <IngestionMappingKind.JSON: 'Json'>, True)>
MULTIJSON = <DataFormat.MULTIJSON: ('multijson', <IngestionMappingKind.JSON: 'Json'>, True)>
AVRO = <DataFormat.AVRO: ('avro', <IngestionMappingKind.AVRO: 'Avro'>, False)>
APACHEAVRO = <DataFormat.APACHEAVRO: ('apacheavro', <IngestionMappingKind.APACHEAVRO: 'ApacheAvro'>, False)>
PARQUET = <DataFormat.PARQUET: ('parquet', <IngestionMappingKind.PARQUET: 'Parquet'>, False)>
SSTREAM = <DataFormat.SSTREAM: ('sstream', <IngestionMappingKind.SSTREAM: 'SStream'>, False)>
ORC = <DataFormat.ORC: ('orc', <IngestionMappingKind.ORC: 'Orc'>, False)>
RAW = <DataFormat.RAW: ('raw', <IngestionMappingKind.CSV: 'Csv'>, True)>
W3CLOGFILE = <DataFormat.W3CLOGFILE: ('w3clogfile', <IngestionMappingKind.W3CLOGFILE: 'W3CLogFile'>, True)>
kusto_value
ingestion_mapping_kind
compressible