ingest

 1# Copyright (c) Microsoft Corporation.
 2# Licensed under the MIT License
 3from ._version import VERSION as __version__
 4from .base_ingest_client import IngestionResult, IngestionStatus
 5from .descriptors import BlobDescriptor, FileDescriptor, StreamDescriptor
 6from .exceptions import KustoMissingMappingError, KustoMappingError, KustoQueueError, KustoDuplicateMappingError, KustoInvalidEndpointError, KustoClientError
 7from .ingest_client import QueuedIngestClient
 8from .ingestion_properties import (
 9    ValidationPolicy,
10    ValidationImplications,
11    ValidationOptions,
12    ReportLevel,
13    ReportMethod,
14    IngestionProperties,
15    IngestionMappingKind,
16    ColumnMapping,
17    TransformationMethod,
18)
19from .managed_streaming_ingest_client import ManagedStreamingIngestClient
20from .streaming_ingest_client import KustoStreamingIngestClient
21from .base_ingest_client import BaseIngestClient
22
23__all__ = [
24    "__version__",
25    "IngestionResult",
26    "IngestionStatus",
27    "BlobDescriptor",
28    "FileDescriptor",
29    "StreamDescriptor",
30    "KustoMissingMappingError",
31    "KustoMappingError",
32    "KustoQueueError",
33    "KustoDuplicateMappingError",
34    "KustoInvalidEndpointError",
35    "KustoClientError",
36    "QueuedIngestClient",
37    "ValidationPolicy",
38    "ValidationImplications",
39    "ValidationOptions",
40    "ReportLevel",
41    "ReportMethod",
42    "IngestionProperties",
43    "IngestionMappingKind",
44    "ColumnMapping",
45    "TransformationMethod",
46    "ManagedStreamingIngestClient",
47    "KustoStreamingIngestClient",
48    "BaseIngestClient",
49]
__version__ = '6.0.3'
class IngestionResult:
38class IngestionResult:
39    """
40    The result of an ingestion.
41    """
42
43    status: IngestionStatus
44    "Will be `Queued` if the ingestion is queued, or `Success` if the ingestion is streaming and successful."
45
46    database: str
47    "The name of the database where the ingestion was performed."
48
49    table: str
50    "The name of the table where the ingestion was performed."
51
52    source_id: uuid.UUID
53    "The source id of the ingestion."
54
55    blob_uri: Optional[str]
56    "The blob uri of the ingestion, if exists."
57
58    def __init__(self, status: IngestionStatus, database: str, table: str, source_id: uuid.UUID, blob_uri: Optional[str] = None):
59        self.status = status
60        self.database = database
61        self.table = table
62        self.source_id = source_id
63        self.blob_uri = blob_uri
64
65    def __repr__(self):
66        # Remove query parameters from blob_uri, if exists
67        obfuscated_path = None
68        if isinstance(self.blob_uri, str):
69            obfuscated_path = self.blob_uri.split("?")[0].split(";")[0]
70        blob_uri = f", obfuscated_blob_uri={obfuscated_path}" if obfuscated_path else ""
71        return f"IngestionResult(status={self.status}, database={self.database}, table={self.table}, source_id={self.source_id}{blob_uri})"

The result of an ingestion.

IngestionResult( status: IngestionStatus, database: str, table: str, source_id: uuid.UUID, blob_uri: Optional[str] = None)
58    def __init__(self, status: IngestionStatus, database: str, table: str, source_id: uuid.UUID, blob_uri: Optional[str] = None):
59        self.status = status
60        self.database = database
61        self.table = table
62        self.source_id = source_id
63        self.blob_uri = blob_uri
status: IngestionStatus

Will be Queued if the ingestion is queued, or Success if the ingestion is streaming and successful.

database: str

The name of the database where the ingestion was performed.

table: str

The name of the table where the ingestion was performed.

source_id: uuid.UUID

The source id of the ingestion.

blob_uri: Optional[str]

The blob uri of the ingestion, if exists.

class IngestionStatus(enum.Enum):
26class IngestionStatus(Enum):
27    """
28    The ingestion was queued.
29    """
30
31    QUEUED = "QUEUED"
32    """
33    The ingestion was successfully streamed
34    """
35    SUCCESS = "SUCCESS"

The ingestion was queued.

QUEUED = <IngestionStatus.QUEUED: 'QUEUED'>

The ingestion was successfully streamed

SUCCESS = <IngestionStatus.SUCCESS: 'SUCCESS'>
class BlobDescriptor(ingest.descriptors.DescriptorBase):
131class BlobDescriptor(DescriptorBase):
132    """BlobDescriptor is used to describe a blob that will be used as an ingestion source"""
133
134    _BLOB_URI = "blob_uri"
135
136    def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
137        """
138        :param path: blob uri.
139        :type path: str.
140        :param size: estimated size of file if known.
141        :type size: Optional[int].
142        :param source_id: a v4 uuid to serve as the source's id.
143        :type source_id: OptionalUUID
144        """
145        self.path: str = path
146        self.size: Optional[int] = size
147        self.source_id: uuid.UUID = ensure_uuid(source_id)
148
149    def get_tracing_attributes(self) -> dict:
150        # Remove query parameters from self.path, if exists
151        if self.path:
152            obfuscated_path = self.path.split("?")[0].split(";")[0]
153        return {self._BLOB_URI: obfuscated_path, self._SOURCE_ID: str(self.source_id)}
154
155    def fill_size(self):
156        if not self.size:
157            self.size = BlobClient.from_blob_url(self.path).get_blob_properties().size

BlobDescriptor is used to describe a blob that will be used as an ingestion source

BlobDescriptor( path: str, size: Optional[int] = None, source_id: Union[str, uuid.UUID, NoneType] = None)
136    def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
137        """
138        :param path: blob uri.
139        :type path: str.
140        :param size: estimated size of file if known.
141        :type size: Optional[int].
142        :param source_id: a v4 uuid to serve as the source's id.
143        :type source_id: OptionalUUID
144        """
145        self.path: str = path
146        self.size: Optional[int] = size
147        self.source_id: uuid.UUID = ensure_uuid(source_id)
Parameters
  • path: blob uri.
  • size: estimated size of file if known.
  • source_id: a v4 uuid to serve as the source's id.
path: str
size: Optional[int]
source_id: uuid.UUID
def get_tracing_attributes(self) -> dict:
149    def get_tracing_attributes(self) -> dict:
150        # Remove query parameters from self.path, if exists
151        if self.path:
152            obfuscated_path = self.path.split("?")[0].split(";")[0]
153        return {self._BLOB_URI: obfuscated_path, self._SOURCE_ID: str(self.source_id)}

Gets dictionary of attributes to be documented during tracing

def fill_size(self):
155    def fill_size(self):
156        if not self.size:
157            self.size = BlobClient.from_blob_url(self.path).get_blob_properties().size
class FileDescriptor(ingest.descriptors.DescriptorBase):
 41class FileDescriptor(DescriptorBase):
 42    """FileDescriptor is used to describe a file that will be used as an ingestion source."""
 43
 44    # Gzip keeps the decompressed stream size as a UINT32 in the last 4 bytes of the stream, however this poses a limit to the expressed size which is 4GB
 45    # The standard says that when the size is bigger than 4GB, the UINT rolls over.
 46    # The below constant expresses the maximal size of a compressed stream that will not cause the UINT32 to rollover given a maximal compression ratio of 1:40
 47    GZIP_MAX_DISK_SIZE_FOR_DETECTION = int(4 * 1024 * 1024 * 1024 / 40)
 48    DEFAULT_COMPRESSION_RATIO = 11
 49    _FILE_PATH = "file_path"
 50
 51    def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
 52        """
 53        :param path: file path.
 54        :type path: str.
 55        :param size: estimated size of file if known. if None or 0 will try to guess.
 56        :type size: Optional[int].
 57        :param source_id: a v4 uuid to serve as the source's id.
 58        :type source_id: OptionalUUID
 59        """
 60        self.path: str = path
 61        self._size: Optional[int] = size
 62        self._detect_size_once: bool = not size
 63
 64        self.source_id: uuid.UUID = ensure_uuid(source_id)
 65        self.stream_name: str = os.path.basename(self.path)
 66
 67    @property
 68    def size(self) -> int:
 69        if self._detect_size_once:
 70            self._detect_size()
 71            self._detect_size_once = False
 72
 73        return self._size
 74
 75    @size.setter
 76    def size(self, size: int):
 77        if size:
 78            self._size = size
 79            self._detect_size_once = False
 80
 81    def _detect_size(self):
 82        uncompressed_size = 0
 83        if self.path.endswith(".gz"):
 84            # This logic follow after the C# implementation
 85            # See IngstionHelpers.cs for an explanation as to what stands behind it
 86            with open(self.path, "rb") as f:
 87                disk_size = f.seek(-4, SEEK_END)
 88                uncompressed_size = struct.unpack("I", f.read(4))[0]
 89                if (disk_size >= uncompressed_size) or (disk_size >= self.GZIP_MAX_DISK_SIZE_FOR_DETECTION):
 90                    uncompressed_size = disk_size * self.DEFAULT_COMPRESSION_RATIO
 91
 92        elif self.path.endswith(".zip"):
 93            with ZipFile(self.path) as zip_archive:
 94                for f in zip_archive.infolist():
 95                    uncompressed_size += f.file_size
 96
 97        else:
 98            uncompressed_size = os.path.getsize(self.path)
 99
100        self._size = uncompressed_size
101
102    @property
103    def is_compressed(self) -> bool:
104        return self.path.endswith(".gz") or self.path.endswith(".zip")
105
106    def open(self, should_compress: bool) -> BytesIO:
107        if should_compress:
108            file_stream = self.compress_stream()
109        else:
110            file_stream = open(self.path, "rb")
111        return file_stream
112
113    def compress_stream(self) -> BytesIO:
114        self.stream_name += ".gz"
115        file_stream = BytesIO()
116        with open(self.path, "rb") as f_in, GzipFile(filename="data", fileobj=file_stream, mode="wb") as f_out:
117            shutil.copyfileobj(f_in, f_out)
118        file_stream.seek(0)
119        return file_stream
120
121    def get_tracing_attributes(self) -> dict:
122        return {self._FILE_PATH: self.stream_name, self._SOURCE_ID: str(self.source_id)}
123
124    @classmethod
125    def get_instance(cls, file_descriptor: Union["FileDescriptor", str]) -> "FileDescriptor":
126        if not isinstance(file_descriptor, cls):
127            return cls(file_descriptor)
128        return file_descriptor

FileDescriptor is used to describe a file that will be used as an ingestion source.

FileDescriptor( path: str, size: Optional[int] = None, source_id: Union[str, uuid.UUID, NoneType] = None)
51    def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
52        """
53        :param path: file path.
54        :type path: str.
55        :param size: estimated size of file if known. if None or 0 will try to guess.
56        :type size: Optional[int].
57        :param source_id: a v4 uuid to serve as the source's id.
58        :type source_id: OptionalUUID
59        """
60        self.path: str = path
61        self._size: Optional[int] = size
62        self._detect_size_once: bool = not size
63
64        self.source_id: uuid.UUID = ensure_uuid(source_id)
65        self.stream_name: str = os.path.basename(self.path)
Parameters
  • path: file path.
  • size: estimated size of file if known. if None or 0 will try to guess.
  • source_id: a v4 uuid to serve as the source's id.
GZIP_MAX_DISK_SIZE_FOR_DETECTION = 107374182
DEFAULT_COMPRESSION_RATIO = 11
path: str
source_id: uuid.UUID
stream_name: str
size: int
67    @property
68    def size(self) -> int:
69        if self._detect_size_once:
70            self._detect_size()
71            self._detect_size_once = False
72
73        return self._size
is_compressed: bool
102    @property
103    def is_compressed(self) -> bool:
104        return self.path.endswith(".gz") or self.path.endswith(".zip")
def open(self, should_compress: bool) -> _io.BytesIO:
106    def open(self, should_compress: bool) -> BytesIO:
107        if should_compress:
108            file_stream = self.compress_stream()
109        else:
110            file_stream = open(self.path, "rb")
111        return file_stream
def compress_stream(self) -> _io.BytesIO:
113    def compress_stream(self) -> BytesIO:
114        self.stream_name += ".gz"
115        file_stream = BytesIO()
116        with open(self.path, "rb") as f_in, GzipFile(filename="data", fileobj=file_stream, mode="wb") as f_out:
117            shutil.copyfileobj(f_in, f_out)
118        file_stream.seek(0)
119        return file_stream
def get_tracing_attributes(self) -> dict:
121    def get_tracing_attributes(self) -> dict:
122        return {self._FILE_PATH: self.stream_name, self._SOURCE_ID: str(self.source_id)}

Gets dictionary of attributes to be documented during tracing

@classmethod
def get_instance( cls, file_descriptor: Union[FileDescriptor, str]) -> FileDescriptor:
124    @classmethod
125    def get_instance(cls, file_descriptor: Union["FileDescriptor", str]) -> "FileDescriptor":
126        if not isinstance(file_descriptor, cls):
127            return cls(file_descriptor)
128        return file_descriptor
class StreamDescriptor(ingest.descriptors.DescriptorBase):
160class StreamDescriptor(DescriptorBase):
161    """StreamDescriptor is used to describe a stream that will be used as ingestion source"""
162
163    _STREAM_NAME = "stream_name"
164
165    # TODO: currently we always assume that streams are gz compressed (will get compressed before sending), should we expand that?
166    def __init__(
167        self, stream: IO[AnyStr], source_id: OptionalUUID = None, is_compressed: bool = False, stream_name: Optional[str] = None, size: Optional[int] = None
168    ):
169        """
170        :param stream: in-memory stream object.
171        :type stream: io.BaseIO
172        :param source_id: a v4 uuid to serve as the sources id.
173        :type source_id: OptionalUUID
174        :param is_compressed: specify if the provided stream is compressed
175        :type is_compressed: boolean
176        """
177        self.stream: IO[AnyStr] = stream
178        self.source_id: uuid.UUID = ensure_uuid(source_id)
179        self.is_compressed: bool = is_compressed
180        self.stream_name: str = stream_name
181        if self.stream_name is None:
182            self.stream_name = "stream"
183            if is_compressed:
184                self.stream_name += ".gz"
185        self.size: Optional[int] = size
186
187    def compress_stream(self) -> None:
188        stream = self.stream
189        zipped_stream = BytesIO()
190        stream_buffer = stream.read()
191        with GzipFile(filename="data", fileobj=zipped_stream, mode="wb") as f_out:
192            if isinstance(stream_buffer, str):
193                data = bytes(stream_buffer, "utf-8")
194                f_out.write(data)
195            else:
196                f_out.write(stream_buffer)
197        zipped_stream.seek(0)
198        self.is_compressed = True
199        self.stream_name += ".gz"
200        self.stream = zipped_stream
201
202    @staticmethod
203    def from_file_descriptor(file_descriptor: Union[FileDescriptor, str]) -> "StreamDescriptor":
204        """
205        Transforms FileDescriptor instance into StreamDescriptor instance. Note that stream is open when instance is returned
206        :param Union[FileDescriptor, str] file_descriptor: File Descriptor instance
207        :return new StreamDescriptor instance
208        """
209        descriptor = FileDescriptor.get_instance(file_descriptor)
210        stream = open(descriptor.path, "rb")
211        is_compressed = descriptor.path.endswith(".gz") or descriptor.path.endswith(".zip")
212        stream_descriptor = StreamDescriptor(stream, descriptor.source_id, is_compressed, descriptor.stream_name, descriptor.size)
213        return stream_descriptor
214
215    @classmethod
216    def get_instance(cls, stream_descriptor: Union["StreamDescriptor", IO[AnyStr]]) -> "StreamDescriptor":
217        if not isinstance(stream_descriptor, cls):
218            descriptor = cls(stream_descriptor)
219        else:
220            descriptor = copy(stream_descriptor)
221        return descriptor
222
223    def get_tracing_attributes(self) -> dict:
224        return {self._STREAM_NAME: self.stream_name, self._SOURCE_ID: str(self.source_id)}

StreamDescriptor is used to describe a stream that will be used as ingestion source

StreamDescriptor( stream: IO[~AnyStr], source_id: Union[str, uuid.UUID, NoneType] = None, is_compressed: bool = False, stream_name: Optional[str] = None, size: Optional[int] = None)
166    def __init__(
167        self, stream: IO[AnyStr], source_id: OptionalUUID = None, is_compressed: bool = False, stream_name: Optional[str] = None, size: Optional[int] = None
168    ):
169        """
170        :param stream: in-memory stream object.
171        :type stream: io.BaseIO
172        :param source_id: a v4 uuid to serve as the sources id.
173        :type source_id: OptionalUUID
174        :param is_compressed: specify if the provided stream is compressed
175        :type is_compressed: boolean
176        """
177        self.stream: IO[AnyStr] = stream
178        self.source_id: uuid.UUID = ensure_uuid(source_id)
179        self.is_compressed: bool = is_compressed
180        self.stream_name: str = stream_name
181        if self.stream_name is None:
182            self.stream_name = "stream"
183            if is_compressed:
184                self.stream_name += ".gz"
185        self.size: Optional[int] = size
Parameters
  • stream: in-memory stream object.
  • source_id: a v4 uuid to serve as the source's id.
  • is_compressed: specify if the provided stream is compressed
stream: IO[~AnyStr]
source_id: uuid.UUID
is_compressed: bool
stream_name: str
size: Optional[int]
def compress_stream(self) -> None:
187    def compress_stream(self) -> None:
188        stream = self.stream
189        zipped_stream = BytesIO()
190        stream_buffer = stream.read()
191        with GzipFile(filename="data", fileobj=zipped_stream, mode="wb") as f_out:
192            if isinstance(stream_buffer, str):
193                data = bytes(stream_buffer, "utf-8")
194                f_out.write(data)
195            else:
196                f_out.write(stream_buffer)
197        zipped_stream.seek(0)
198        self.is_compressed = True
199        self.stream_name += ".gz"
200        self.stream = zipped_stream
@staticmethod
def from_file_descriptor( file_descriptor: Union[FileDescriptor, str]) -> StreamDescriptor:
202    @staticmethod
203    def from_file_descriptor(file_descriptor: Union[FileDescriptor, str]) -> "StreamDescriptor":
204        """
205        Transforms FileDescriptor instance into StreamDescriptor instance. Note that stream is open when instance is returned
206        :param Union[FileDescriptor, str] file_descriptor: File Descriptor instance
207        :return new StreamDescriptor instance
208        """
209        descriptor = FileDescriptor.get_instance(file_descriptor)
210        stream = open(descriptor.path, "rb")
211        is_compressed = descriptor.path.endswith(".gz") or descriptor.path.endswith(".zip")
212        stream_descriptor = StreamDescriptor(stream, descriptor.source_id, is_compressed, descriptor.stream_name, descriptor.size)
213        return stream_descriptor

Transforms FileDescriptor instance into StreamDescriptor instance. Note that stream is open when instance is returned

Parameters
  • Union[FileDescriptor, str] file_descriptor: File Descriptor instance

Returns
  new StreamDescriptor instance
@classmethod
def get_instance( cls, stream_descriptor: Union[StreamDescriptor, IO[~AnyStr]]) -> StreamDescriptor:
215    @classmethod
216    def get_instance(cls, stream_descriptor: Union["StreamDescriptor", IO[AnyStr]]) -> "StreamDescriptor":
217        if not isinstance(stream_descriptor, cls):
218            descriptor = cls(stream_descriptor)
219        else:
220            descriptor = copy(stream_descriptor)
221        return descriptor
def get_tracing_attributes(self) -> dict:
223    def get_tracing_attributes(self) -> dict:
224        return {self._STREAM_NAME: self.stream_name, self._SOURCE_ID: str(self.source_id)}

Gets dictionary of attributes to be documented during tracing

class KustoMissingMappingError(ingest.KustoClientError):
24class KustoMissingMappingError(KustoClientError):
25    """
26    Raised when provided a mapping kind without a mapping reference or column mapping.
27    """

Raised when provided a mapping kind without a mapping reference or column mapping.

class KustoMappingError(ingest.KustoClientError):
 7class KustoMappingError(KustoClientError):
 8    """
 9    Raised when the provided mapping arguments are invalid.
10    """

Raised when the provided mapping arguments are invalid.

class KustoQueueError(ingest.KustoClientError):
40class KustoQueueError(KustoClientError):
41    """Raised when not succeeding to upload message to queue in all retries"""
42
43    def __init__(self):
44        message = "Failed to upload message to queues in all retries."
45        super(KustoQueueError, self).__init__(message)

Raised when not succeeding to upload message to queue in all retries

class KustoDuplicateMappingError(ingest.KustoClientError):
13class KustoDuplicateMappingError(KustoClientError):
14    """
15    Raised when ingestion properties include both
16    column mappings and a mapping reference
17    """
18
19    def __init__(self):
20        message = "Ingestion properties can't contain both an explicit mapping and a mapping reference."
21        super(KustoDuplicateMappingError, self).__init__(message)

Raised when ingestion properties include both column mappings and a mapping reference

class KustoInvalidEndpointError(ingest.KustoClientError):
30class KustoInvalidEndpointError(KustoClientError):
31    """Raised when trying to ingest to invalid cluster type."""
32
33    def __init__(self, expected_service_type, actual_service_type, suggested_endpoint_url=None):
34        message = f"You are using '{expected_service_type}' client type, but the provided endpoint is of ServiceType '{actual_service_type}'. Initialize the client with the appropriate endpoint URI"
35        if suggested_endpoint_url:
36            message = message + ": '" + suggested_endpoint_url + "'"
37        super(KustoInvalidEndpointError, self).__init__(message)

Raised when trying to ingest to invalid cluster type.

KustoInvalidEndpointError( expected_service_type, actual_service_type, suggested_endpoint_url=None)
33    def __init__(self, expected_service_type, actual_service_type, suggested_endpoint_url=None):
34        message = f"You are using '{expected_service_type}' client type, but the provided endpoint is of ServiceType '{actual_service_type}'. Initialize the client with the appropriate endpoint URI"
35        if suggested_endpoint_url:
36            message = message + ": '" + suggested_endpoint_url + "'"
37        super(KustoInvalidEndpointError, self).__init__(message)
class KustoClientError(azure.kusto.data.exceptions.KustoError):
148class KustoClientError(KustoError):
149    """Raised when a Kusto client is unable to send or complete a request."""

Raised when a Kusto client is unable to send or complete a request.

class QueuedIngestClient(ingest.BaseIngestClient):
 26class QueuedIngestClient(BaseIngestClient):
 27    """
 28    Queued ingest client provides methods to allow queued ingestion into kusto (ADX).
 29    To learn more about the different types of ingestions and when to use each, visit:
 30    https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
 31    """
 32
 33    _INGEST_PREFIX = "ingest-"
 34    _SERVICE_CLIENT_TIMEOUT_SECONDS = 10 * 60
 35    _MAX_RETRIES = 3
 36
 37    def __init__(self, kcsb: Union[str, KustoConnectionStringBuilder], auto_correct_endpoint: bool = True):
 38        """Kusto Ingest Client constructor.
 39        :param kcsb: The connection string to initialize KustoClient.
 40        """
 41        super().__init__()
 42        if not isinstance(kcsb, KustoConnectionStringBuilder):
 43            kcsb = KustoConnectionStringBuilder(kcsb)
 44
 45        if auto_correct_endpoint:
 46            kcsb["Data Source"] = BaseIngestClient.get_ingestion_endpoint(kcsb.data_source)
 47
 48        self._proxy_dict: Optional[Dict[str, str]] = None
 49        self._connection_datasource = kcsb.data_source
 50        self._resource_manager = _ResourceManager(KustoClient(kcsb))
 51        self._endpoint_service_type = None
 52        self._suggested_endpoint_uri = None
 53        self.application_for_tracing = kcsb.client_details.application_for_tracing
 54        self.client_version_for_tracing = kcsb.client_details.version_for_tracing
 55
 56    def close(self) -> None:
 57        self._resource_manager.close()
 58        super().close()
 59
 60    def set_proxy(self, proxy_url: str):
 61        self._resource_manager.set_proxy(proxy_url)
 62        self._proxy_dict = {"http": proxy_url, "https": proxy_url}
 63
 64    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_file", kind=SpanKind.CLIENT)
 65    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
 66        """Enqueue an ingest command from local files.
 67        To learn more about ingestion methods go to:
 68        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
 69        :param file_descriptor: a FileDescriptor to be ingested.
 70        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
 71        """
 72        file_descriptor = FileDescriptor.get_instance(file_descriptor)
 73        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
 74
 75        super().ingest_from_file(file_descriptor, ingestion_properties)
 76
 77        containers = self._get_containers()
 78
 79        file_descriptor, should_compress = BaseIngestClient._prepare_file(file_descriptor, ingestion_properties)
 80        with file_descriptor.open(should_compress) as stream:
 81            blob_descriptor = self.upload_blob(
 82                containers,
 83                file_descriptor,
 84                ingestion_properties.database,
 85                ingestion_properties.table,
 86                stream,
 87                self._proxy_dict,
 88                self._SERVICE_CLIENT_TIMEOUT_SECONDS,
 89                self._MAX_RETRIES,
 90            )
 91        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
 92
 93    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_stream", kind=SpanKind.CLIENT)
 94    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
 95        """Ingest from io streams.
 96        :param stream_descriptor: An object that contains a description of the stream to be ingested.
 97        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
 98        """
 99        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
100        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
101
102        super().ingest_from_stream(stream_descriptor, ingestion_properties)
103
104        containers = self._get_containers()
105
106        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
107        blob_descriptor = self.upload_blob(
108            containers,
109            stream_descriptor,
110            ingestion_properties.database,
111            ingestion_properties.table,
112            stream_descriptor.stream,
113            self._proxy_dict,
114            self._SERVICE_CLIENT_TIMEOUT_SECONDS,
115            self._MAX_RETRIES,
116        )
117        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
118
119    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_blob", kind=SpanKind.CLIENT)
120    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
121        """Enqueue an ingest command from azure blobs.
122        To learn more about ingestion methods go to:
123        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
124        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
125        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
126        """
127        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
128
129        if self._is_closed:
130            raise KustoClosedError()
131
132        queues = self._resource_manager.get_ingestion_queues()
133
134        authorization_context = self._resource_manager.get_authorization_context()
135        ingestion_blob_info = IngestionBlobInfo(
136            blob_descriptor,
137            ingestion_properties=ingestion_properties,
138            auth_context=authorization_context,
139            application_for_tracing=self.application_for_tracing,
140            client_version_for_tracing=self.client_version_for_tracing,
141        )
142        ingestion_blob_info_json = ingestion_blob_info.to_json()
143        retries_left = min(self._MAX_RETRIES, len(queues))
144        for queue in queues:
145            try:
146                with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
147                    with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
148                        # trace enqueuing of blob for ingestion
149                        invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
150                        enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
151                        MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)
152
153                self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
154                return IngestionResult(
155                    IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
156                )
157            except Exception as e:
158                retries_left = retries_left - 1
159                # TODO: log the retry once we have a proper logging system
160                self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
161                if retries_left == 0:
162                    raise KustoQueueError() from e
163
164    def _get_containers(self) -> List[_ResourceUri]:
165        return self._resource_manager.get_containers()
166
167    def upload_blob(
168        self,
169        containers: List[_ResourceUri],
170        descriptor: Union[FileDescriptor, "StreamDescriptor"],
171        database: str,
172        table: str,
173        stream: IO[AnyStr],
174        proxy_dict: Optional[Dict[str, str]],
175        timeout: int,
176        max_retries: int,
177    ) -> "BlobDescriptor":
178        """
179        Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance
180        :param List[_ResourceUri] containers: blob containers
181        :param Union[FileDescriptor, "StreamDescriptor"] descriptor:
182        :param string database: database to be ingested to
183        :param string table: table to be ingested to
184        :param IO[AnyStr] stream: stream to be ingested from
185        :param Optional[Dict[str, str]] proxy_dict: proxy urls
186        :param int timeout: Azure service call timeout in seconds
187        :return new BlobDescriptor instance
188        """
189        blob_name = "{db}__{table}__{guid}__{file}".format(db=database, table=table, guid=descriptor.source_id, file=descriptor.stream_name)
190
191        retries_left = min(max_retries, len(containers))
192        for container in containers:
193            try:
194                blob_service = BlobServiceClient(container.account_uri, proxies=proxy_dict)
195                blob_client = blob_service.get_blob_client(container=container.object_name, blob=blob_name)
196                blob_client.upload_blob(data=stream, timeout=timeout)
197                self._resource_manager.report_resource_usage_result(container.storage_account_name, True)
198                return BlobDescriptor(blob_client.url, descriptor.size, descriptor.source_id)
199            except Exception as e:
200                retries_left = retries_left - 1
201                # TODO: log the retry once we have a proper logging system
202                self._resource_manager.report_resource_usage_result(container.storage_account_name, False)
203                if retries_left == 0:
204                    raise KustoBlobError(e)

Queued ingest client provides methods to allow queued ingestion into kusto (ADX). To learn more about the different types of ingestions and when to use each, visit: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

QueuedIngestClient( kcsb: Union[str, azure.kusto.data.kcsb.KustoConnectionStringBuilder], auto_correct_endpoint: bool = True)
37    def __init__(self, kcsb: Union[str, KustoConnectionStringBuilder], auto_correct_endpoint: bool = True):
38        """Kusto Ingest Client constructor.
39        :param kcsb: The connection string to initialize KustoClient.
40        """
41        super().__init__()
42        if not isinstance(kcsb, KustoConnectionStringBuilder):
43            kcsb = KustoConnectionStringBuilder(kcsb)
44
45        if auto_correct_endpoint:
46            kcsb["Data Source"] = BaseIngestClient.get_ingestion_endpoint(kcsb.data_source)
47
48        self._proxy_dict: Optional[Dict[str, str]] = None
49        self._connection_datasource = kcsb.data_source
50        self._resource_manager = _ResourceManager(KustoClient(kcsb))
51        self._endpoint_service_type = None
52        self._suggested_endpoint_uri = None
53        self.application_for_tracing = kcsb.client_details.application_for_tracing
54        self.client_version_for_tracing = kcsb.client_details.version_for_tracing

Kusto Ingest Client constructor.

Parameters
  • kcsb: The connection string to initialize KustoClient.
application_for_tracing
client_version_for_tracing
def close(self) -> None:
56    def close(self) -> None:
57        self._resource_manager.close()
58        super().close()
def set_proxy(self, proxy_url: str):
60    def set_proxy(self, proxy_url: str):
61        self._resource_manager.set_proxy(proxy_url)
62        self._proxy_dict = {"http": proxy_url, "https": proxy_url}

Set proxy for the ingestion client.

Parameters
  • str proxy_url: proxy url.
@distributed_trace(name_of_span='QueuedIngestClient.ingest_from_file', kind=SpanKind.CLIENT)
def ingest_from_file( self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
64    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_file", kind=SpanKind.CLIENT)
65    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
66        """Enqueue an ingest command from local files.
67        To learn more about ingestion methods go to:
68        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
69        :param file_descriptor: a FileDescriptor to be ingested.
70        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
71        """
72        file_descriptor = FileDescriptor.get_instance(file_descriptor)
73        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
74
75        super().ingest_from_file(file_descriptor, ingestion_properties)
76
77        containers = self._get_containers()
78
79        file_descriptor, should_compress = BaseIngestClient._prepare_file(file_descriptor, ingestion_properties)
80        with file_descriptor.open(should_compress) as stream:
81            blob_descriptor = self.upload_blob(
82                containers,
83                file_descriptor,
84                ingestion_properties.database,
85                ingestion_properties.table,
86                stream,
87                self._proxy_dict,
88                self._SERVICE_CLIENT_TIMEOUT_SECONDS,
89                self._MAX_RETRIES,
90            )
91        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)

Enqueue an ingest command from local files. To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

Parameters
  • file_descriptor: a FileDescriptor to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@distributed_trace(name_of_span='QueuedIngestClient.ingest_from_stream', kind=SpanKind.CLIENT)
def ingest_from_stream( self, stream_descriptor: Union[StreamDescriptor, IO[~AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
 93    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_stream", kind=SpanKind.CLIENT)
 94    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
 95        """Ingest from io streams.
 96        :param stream_descriptor: An object that contains a description of the stream to be ingested.
 97        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
 98        """
 99        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
100        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
101
102        super().ingest_from_stream(stream_descriptor, ingestion_properties)
103
104        containers = self._get_containers()
105
106        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
107        blob_descriptor = self.upload_blob(
108            containers,
109            stream_descriptor,
110            ingestion_properties.database,
111            ingestion_properties.table,
112            stream_descriptor.stream,
113            self._proxy_dict,
114            self._SERVICE_CLIENT_TIMEOUT_SECONDS,
115            self._MAX_RETRIES,
116        )
117        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)

Ingest from io streams.

Parameters
  • stream_descriptor: An object that contains a description of the stream to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@distributed_trace(name_of_span='QueuedIngestClient.ingest_from_blob', kind=SpanKind.CLIENT)
def ingest_from_blob( self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
119    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_blob", kind=SpanKind.CLIENT)
120    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
121        """Enqueue an ingest command from azure blobs.
122        To learn more about ingestion methods go to:
123        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
124        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
125        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
126        """
127        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
128
129        if self._is_closed:
130            raise KustoClosedError()
131
132        queues = self._resource_manager.get_ingestion_queues()
133
134        authorization_context = self._resource_manager.get_authorization_context()
135        ingestion_blob_info = IngestionBlobInfo(
136            blob_descriptor,
137            ingestion_properties=ingestion_properties,
138            auth_context=authorization_context,
139            application_for_tracing=self.application_for_tracing,
140            client_version_for_tracing=self.client_version_for_tracing,
141        )
142        ingestion_blob_info_json = ingestion_blob_info.to_json()
143        retries_left = min(self._MAX_RETRIES, len(queues))
144        for queue in queues:
145            try:
146                with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
147                    with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
148                        # trace enqueuing of blob for ingestion
149                        invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
150                        enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
151                        MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)
152
153                self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
154                return IngestionResult(
155                    IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
156                )
157            except Exception as e:
158                retries_left = retries_left - 1
159                # TODO: log the retry once we have a proper logging system
160                self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
161                if retries_left == 0:
162                    raise KustoQueueError() from e

Enqueue an ingest command from azure blobs. To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

Parameters
  • azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
def upload_blob( self, containers: List[ingest._resource_manager._ResourceUri], descriptor: Union[FileDescriptor, StreamDescriptor], database: str, table: str, stream: IO[~AnyStr], proxy_dict: Optional[Dict[str, str]], timeout: int, max_retries: int) -> BlobDescriptor:
167    def upload_blob(
168        self,
169        containers: List[_ResourceUri],
170        descriptor: Union[FileDescriptor, "StreamDescriptor"],
171        database: str,
172        table: str,
173        stream: IO[AnyStr],
174        proxy_dict: Optional[Dict[str, str]],
175        timeout: int,
176        max_retries: int,
177    ) -> "BlobDescriptor":
178        """
179        Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance
180        :param List[_ResourceUri] containers: blob containers
181        :param Union[FileDescriptor, "StreamDescriptor"] descriptor:
182        :param string database: database to be ingested to
183        :param string table: table to be ingested to
184        :param IO[AnyStr] stream: stream to be ingested from
185        :param Optional[Dict[str, str]] proxy_dict: proxy urls
186        :param int timeout: Azure service call timeout in seconds
187        :return new BlobDescriptor instance
188        """
189        blob_name = "{db}__{table}__{guid}__{file}".format(db=database, table=table, guid=descriptor.source_id, file=descriptor.stream_name)
190
191        retries_left = min(max_retries, len(containers))
192        for container in containers:
193            try:
194                blob_service = BlobServiceClient(container.account_uri, proxies=proxy_dict)
195                blob_client = blob_service.get_blob_client(container=container.object_name, blob=blob_name)
196                blob_client.upload_blob(data=stream, timeout=timeout)
197                self._resource_manager.report_resource_usage_result(container.storage_account_name, True)
198                return BlobDescriptor(blob_client.url, descriptor.size, descriptor.source_id)
199            except Exception as e:
200                retries_left = retries_left - 1
201                # TODO: log the retry once we have a proper logging system
202                self._resource_manager.report_resource_usage_result(container.storage_account_name, False)
203                if retries_left == 0:
204                    raise KustoBlobError(e)

Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance

Parameters
  • List[_ResourceUri] containers: blob containers
  • Union[FileDescriptor, "StreamDescriptor"] descriptor: descriptor of the file or stream to upload
  • string database: database to be ingested to
  • string table: table to be ingested to
  • IO[AnyStr] stream: stream to be ingested from
  • Optional[Dict[str, str]] proxy_dict: proxy urls
  • int timeout: Azure service call timeout in seconds

Returns a new BlobDescriptor instance.
class ValidationPolicy:
26class ValidationPolicy:
27    """Validation policy to ingest command."""
28
29    def __init__(self, validation_options=ValidationOptions.DoNotValidate, validation_implications=ValidationImplications.BestEffort):
30        self.ValidationOptions = validation_options
31        self.ValidationImplications = validation_implications

Validation policy to ingest command.

ValidationPolicy( validation_options=<ValidationOptions.DoNotValidate: 0>, validation_implications=<ValidationImplications.BestEffort: 1>)
29    def __init__(self, validation_options=ValidationOptions.DoNotValidate, validation_implications=ValidationImplications.BestEffort):
30        self.ValidationOptions = validation_options
31        self.ValidationImplications = validation_implications
ValidationOptions
ValidationImplications
class ValidationImplications(enum.IntEnum):
19class ValidationImplications(IntEnum):
20    """Validation implications to ingest command."""
21
22    Fail = 0
23    BestEffort = 1

Validation implications to ingest command.

class ValidationOptions(enum.IntEnum):
11class ValidationOptions(IntEnum):
12    """Validation options to ingest command."""
13
14    DoNotValidate = 0
15    ValidateCsvInputConstantColumns = 1
16    ValidateCsvInputColumnLevelOnly = 2

Validation options to ingest command.

DoNotValidate = <ValidationOptions.DoNotValidate: 0>
ValidateCsvInputConstantColumns = <ValidationOptions.ValidateCsvInputConstantColumns: 1>
ValidateCsvInputColumnLevelOnly = <ValidationOptions.ValidateCsvInputColumnLevelOnly: 2>
class ReportLevel(enum.IntEnum):
34class ReportLevel(IntEnum):
35    """Report level to ingest command."""
36
37    FailuresOnly = 0
38    DoNotReport = 1
39    FailuresAndSuccesses = 2

Report level to ingest command.

FailuresOnly = <ReportLevel.FailuresOnly: 0>
DoNotReport = <ReportLevel.DoNotReport: 1>
FailuresAndSuccesses = <ReportLevel.FailuresAndSuccesses: 2>
class ReportMethod(enum.IntEnum):
42class ReportMethod(IntEnum):
43    """Report method to ingest command."""
44
45    Queue = 0

Report method to ingest command.

Queue = <ReportMethod.Queue: 0>
class IngestionProperties:
150class IngestionProperties:
151    """
152    Class to represent ingestion properties.
153    For more information check out https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties
154    """
155
156    _DATABASE = "database"
157    _TABLE = "table"
158
159    def __init__(
160        self,
161        database: str,
162        table: str,
163        data_format: DataFormat = DataFormat.CSV,
164        column_mappings: Optional[List[ColumnMapping]] = None,
165        ingestion_mapping_kind: Optional[IngestionMappingKind] = None,
166        ingestion_mapping_reference: Optional[str] = None,
167        ingest_if_not_exists: Optional[List[str]] = None,
168        ingest_by_tags: Optional[List[str]] = None,
169        drop_by_tags: Optional[List[str]] = None,
170        additional_tags: Optional[List[str]] = None,
171        flush_immediately: bool = False,
172        ignore_first_record: bool = False,
173        report_level: ReportLevel = ReportLevel.DoNotReport,
174        report_method: ReportMethod = ReportMethod.Queue,
175        validation_policy: Optional[ValidationPolicy] = None,
176        additional_properties: Optional[dict] = None,
177    ):
178        if ingestion_mapping_reference is None and column_mappings is None:
179            if ingestion_mapping_kind is not None:
180                raise KustoMissingMappingError(f"When ingestion mapping kind is set ('{ingestion_mapping_kind.value}'), a mapping must be provided.")
181        else:  # A mapping is provided
182            if ingestion_mapping_kind is not None:
183                if data_format.ingestion_mapping_kind != ingestion_mapping_kind:
184                    raise KustoMappingError(
185                        f"Wrong ingestion mapping for format '{data_format.kusto_value}'; mapping kind should be '{data_format.ingestion_mapping_kind.value}', "
186                        f"but was '{ingestion_mapping_kind.value}'. "
187                    )
188            else:
189                ingestion_mapping_kind = data_format.ingestion_mapping_kind
190
191            if column_mappings is not None:
192                if ingestion_mapping_reference is not None:
193                    raise KustoDuplicateMappingError()
194
195                validation_errors = []
196
197                for mapping in column_mappings:
198                    (valid, mapping_errors) = mapping.is_valid(ingestion_mapping_kind)
199                    if not valid:
200                        validation_errors.extend(f"Column mapping '{mapping.column}' is invalid - '{e}'" for e in mapping_errors)
201
202                if validation_errors:
203                    errors = "\n".join(validation_errors)
204                    raise KustoMappingError(f"Failed with validation errors:\n{errors}")
205
206        self.database = database
207        self.table = table
208        self.format = data_format
209        self.ingestion_mapping = column_mappings
210        self.ingestion_mapping_type = ingestion_mapping_kind
211        self.ingestion_mapping_reference = ingestion_mapping_reference
212        self.additional_tags = additional_tags
213        self.ingest_if_not_exists = ingest_if_not_exists
214        self.ingest_by_tags = ingest_by_tags
215        self.drop_by_tags = drop_by_tags
216        self.flush_immediately = flush_immediately
217        self.ignore_first_record = ignore_first_record
218        self.report_level = report_level
219        self.report_method = report_method
220        self.validation_policy = validation_policy
221        self.additional_properties = additional_properties
222
223    def get_tracing_attributes(self) -> dict:
224        """Gets dictionary of attributes to be documented during tracing"""
225        return {self._DATABASE: self.database, self._TABLE: self.table}

Class to represent ingestion properties. For more information check out https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties

IngestionProperties( database: str, table: str, data_format: azure.kusto.data.data_format.DataFormat = <DataFormat.CSV: ('csv', <IngestionMappingKind.CSV: 'Csv'>, True)>, column_mappings: Optional[List[ColumnMapping]] = None, ingestion_mapping_kind: Optional[azure.kusto.data.data_format.IngestionMappingKind] = None, ingestion_mapping_reference: Optional[str] = None, ingest_if_not_exists: Optional[List[str]] = None, ingest_by_tags: Optional[List[str]] = None, drop_by_tags: Optional[List[str]] = None, additional_tags: Optional[List[str]] = None, flush_immediately: bool = False, ignore_first_record: bool = False, report_level: ReportLevel = <ReportLevel.DoNotReport: 1>, report_method: ReportMethod = <ReportMethod.Queue: 0>, validation_policy: Optional[ValidationPolicy] = None, additional_properties: Optional[dict] = None)
159    def __init__(
160        self,
161        database: str,
162        table: str,
163        data_format: DataFormat = DataFormat.CSV,
164        column_mappings: Optional[List[ColumnMapping]] = None,
165        ingestion_mapping_kind: Optional[IngestionMappingKind] = None,
166        ingestion_mapping_reference: Optional[str] = None,
167        ingest_if_not_exists: Optional[List[str]] = None,
168        ingest_by_tags: Optional[List[str]] = None,
169        drop_by_tags: Optional[List[str]] = None,
170        additional_tags: Optional[List[str]] = None,
171        flush_immediately: bool = False,
172        ignore_first_record: bool = False,
173        report_level: ReportLevel = ReportLevel.DoNotReport,
174        report_method: ReportMethod = ReportMethod.Queue,
175        validation_policy: Optional[ValidationPolicy] = None,
176        additional_properties: Optional[dict] = None,
177    ):
178        if ingestion_mapping_reference is None and column_mappings is None:
179            if ingestion_mapping_kind is not None:
180                raise KustoMissingMappingError(f"When ingestion mapping kind is set ('{ingestion_mapping_kind.value}'), a mapping must be provided.")
181        else:  # A mapping is provided
182            if ingestion_mapping_kind is not None:
183                if data_format.ingestion_mapping_kind != ingestion_mapping_kind:
184                    raise KustoMappingError(
185                        f"Wrong ingestion mapping for format '{data_format.kusto_value}'; mapping kind should be '{data_format.ingestion_mapping_kind.value}', "
186                        f"but was '{ingestion_mapping_kind.value}'. "
187                    )
188            else:
189                ingestion_mapping_kind = data_format.ingestion_mapping_kind
190
191            if column_mappings is not None:
192                if ingestion_mapping_reference is not None:
193                    raise KustoDuplicateMappingError()
194
195                validation_errors = []
196
197                for mapping in column_mappings:
198                    (valid, mapping_errors) = mapping.is_valid(ingestion_mapping_kind)
199                    if not valid:
200                        validation_errors.extend(f"Column mapping '{mapping.column}' is invalid - '{e}'" for e in mapping_errors)
201
202                if validation_errors:
203                    errors = "\n".join(validation_errors)
204                    raise KustoMappingError(f"Failed with validation errors:\n{errors}")
205
206        self.database = database
207        self.table = table
208        self.format = data_format
209        self.ingestion_mapping = column_mappings
210        self.ingestion_mapping_type = ingestion_mapping_kind
211        self.ingestion_mapping_reference = ingestion_mapping_reference
212        self.additional_tags = additional_tags
213        self.ingest_if_not_exists = ingest_if_not_exists
214        self.ingest_by_tags = ingest_by_tags
215        self.drop_by_tags = drop_by_tags
216        self.flush_immediately = flush_immediately
217        self.ignore_first_record = ignore_first_record
218        self.report_level = report_level
219        self.report_method = report_method
220        self.validation_policy = validation_policy
221        self.additional_properties = additional_properties
database
table
format
ingestion_mapping
ingestion_mapping_type
ingestion_mapping_reference
additional_tags
ingest_if_not_exists
ingest_by_tags
drop_by_tags
flush_immediately
ignore_first_record
report_level
report_method
validation_policy
additional_properties
def get_tracing_attributes(self) -> dict:
223    def get_tracing_attributes(self) -> dict:
224        """Gets dictionary of attributes to be documented during tracing"""
225        return {self._DATABASE: self.database, self._TABLE: self.table}

Gets dictionary of attributes to be documented during tracing

class IngestionMappingKind(enum.Enum):
 7class IngestionMappingKind(Enum):
 8    CSV = "Csv"
 9    JSON = "Json"
10    AVRO = "Avro"
11    APACHEAVRO = "ApacheAvro"
12    PARQUET = "Parquet"
13    SSTREAM = "SStream"
14    ORC = "Orc"
15    W3CLOGFILE = "W3CLogFile"
16    UNKNOWN = "Unknown"

Enumeration of the supported ingestion mapping kinds.

CSV = <IngestionMappingKind.CSV: 'Csv'>
JSON = <IngestionMappingKind.JSON: 'Json'>
AVRO = <IngestionMappingKind.AVRO: 'Avro'>
APACHEAVRO = <IngestionMappingKind.APACHEAVRO: 'ApacheAvro'>
PARQUET = <IngestionMappingKind.PARQUET: 'Parquet'>
SSTREAM = <IngestionMappingKind.SSTREAM: 'SStream'>
ORC = <IngestionMappingKind.ORC: 'Orc'>
W3CLOGFILE = <IngestionMappingKind.W3CLOGFILE: 'W3CLogFile'>
UNKNOWN = <IngestionMappingKind.UNKNOWN: 'Unknown'>
class ColumnMapping:
 64class ColumnMapping:
 65    """Use this class to create mappings for IngestionProperties.ingestionMappings and utilize mappings that were not
 66    pre-created (it is recommended to create the mappings in advance and use ingestionMappingReference).
 67    To read more about mappings look here: https://docs.microsoft.com/en-us/azure/kusto/management/mappings"""
 68
 69    PATH = "Path"
 70    TRANSFORMATION_METHOD = "Transform"
 71    ORDINAL = "Ordinal"
 72    CONST_VALUE = "ConstValue"
 73    FIELD_NAME = "Field"
 74
 75    NEEDED_PROPERTIES: Dict[IngestionMappingKind, List[str]] = {
 76        IngestionMappingKind.CSV: [ORDINAL, CONST_VALUE, TRANSFORMATION_METHOD],
 77        IngestionMappingKind.JSON: [PATH, CONST_VALUE, TRANSFORMATION_METHOD],
 78        IngestionMappingKind.AVRO: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
 79        IngestionMappingKind.APACHEAVRO: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
 80        IngestionMappingKind.SSTREAM: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
 81        IngestionMappingKind.PARQUET: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
 82        IngestionMappingKind.ORC: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
 83        IngestionMappingKind.W3CLOGFILE: [CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
 84    }
 85
 86    CONSTANT_TRANSFORMATION_METHODS = [TransformationMethod.SOURCE_LOCATION.value, TransformationMethod.SOURCE_LINE_NUMBER.value]
 87
 88    # TODO - add safe and convenient ctors, like in node
 89    def __init__(
 90        self,
 91        column_name: str,
 92        column_type: str,
 93        path: str = None,
 94        transform: TransformationMethod = TransformationMethod.NONE,
 95        ordinal: int = None,
 96        const_value: str = None,
 97        field=None,
 98        columns=None,
 99        storage_data_type=None,
100    ):
101        """
102        :param columns: Deprecated. Columns is not used anymore.
103        :param storage_data_type: Deprecated. StorageDataType is not used anymore.
104        """
105        self.column = column_name
106        self.datatype = column_type
107        self.properties = {}
108        if path:
109            self.properties[self.PATH] = path
110        if transform != TransformationMethod.NONE:
111            self.properties[self.TRANSFORMATION_METHOD] = transform.value
112        if ordinal is not None:
113            self.properties[self.ORDINAL] = str(ordinal)
114        if const_value:
115            self.properties[self.CONST_VALUE] = const_value
116        if field:
117            self.properties[self.FIELD_NAME] = field
118
119    def is_valid(self, kind: IngestionMappingKind) -> (bool, List[str]):
120        if not self.column:
121            return False, ["Column name is required"]
122
123        results = []
124
125        needed_props = self.NEEDED_PROPERTIES[kind]
126
127        if all(prop not in self.properties for prop in needed_props):
128            results.append(f"{kind} needs at least one of the required properties: {needed_props}")
129
130        if self.properties.get(self.TRANSFORMATION_METHOD):
131            if (self.properties.get(self.PATH) or self.properties.get(self.FIELD_NAME)) and self.properties.get(
132                self.TRANSFORMATION_METHOD
133            ) in self.CONSTANT_TRANSFORMATION_METHODS:
134                results.append(
135                    f"When specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must not be one of "
136                    f"{','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}."
137                )
138
139            if (not self.properties.get(self.PATH) and not self.properties.get(self.FIELD_NAME)) and self.properties.get(
140                self.TRANSFORMATION_METHOD
141            ) not in self.CONSTANT_TRANSFORMATION_METHODS:
142                results.append(
143                    f"When not specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must be one of"
144                    f" {','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}."
145                )
146
147        return not bool(results), results

Use this class to create mappings for IngestionProperties.ingestionMappings and utilize mappings that were not pre-created (it is recommended to create the mappings in advance and use ingestionMappingReference). To read more about mappings look here: https://docs.microsoft.com/en-us/azure/kusto/management/mappings

ColumnMapping( column_name: str, column_type: str, path: str = None, transform: TransformationMethod = <TransformationMethod.NONE: 'None'>, ordinal: int = None, const_value: str = None, field=None, columns=None, storage_data_type=None)
 89    def __init__(
 90        self,
 91        column_name: str,
 92        column_type: str,
 93        path: str = None,
 94        transform: TransformationMethod = TransformationMethod.NONE,
 95        ordinal: int = None,
 96        const_value: str = None,
 97        field=None,
 98        columns=None,
 99        storage_data_type=None,
100    ):
101        """
102        :param columns: Deprecated. Columns is not used anymore.
103        :param storage_data_type: Deprecated. StorageDataType is not used anymore.
104        """
105        self.column = column_name
106        self.datatype = column_type
107        self.properties = {}
108        if path:
109            self.properties[self.PATH] = path
110        if transform != TransformationMethod.NONE:
111            self.properties[self.TRANSFORMATION_METHOD] = transform.value
112        if ordinal is not None:
113            self.properties[self.ORDINAL] = str(ordinal)
114        if const_value:
115            self.properties[self.CONST_VALUE] = const_value
116        if field:
117            self.properties[self.FIELD_NAME] = field
Parameters
  • columns: Deprecated. Columns is not used anymore.
  • storage_data_type: Deprecated. StorageDataType is not used anymore.
PATH = 'Path'
TRANSFORMATION_METHOD = 'Transform'
ORDINAL = 'Ordinal'
CONST_VALUE = 'ConstValue'
FIELD_NAME = 'Field'
NEEDED_PROPERTIES: Dict[azure.kusto.data.data_format.IngestionMappingKind, List[str]] = {<IngestionMappingKind.CSV: 'Csv'>: ['Ordinal', 'ConstValue', 'Transform'], <IngestionMappingKind.JSON: 'Json'>: ['Path', 'ConstValue', 'Transform'], <IngestionMappingKind.AVRO: 'Avro'>: ['Path', 'ConstValue', 'Field', 'Transform'], <IngestionMappingKind.APACHEAVRO: 'ApacheAvro'>: ['Path', 'ConstValue', 'Field', 'Transform'], <IngestionMappingKind.SSTREAM: 'SStream'>: ['Path', 'ConstValue', 'Field', 'Transform'], <IngestionMappingKind.PARQUET: 'Parquet'>: ['Path', 'ConstValue', 'Field', 'Transform'], <IngestionMappingKind.ORC: 'Orc'>: ['Path', 'ConstValue', 'Field', 'Transform'], <IngestionMappingKind.W3CLOGFILE: 'W3CLogFile'>: ['ConstValue', 'Field', 'Transform']}
CONSTANT_TRANSFORMATION_METHODS = ['SourceLocation', 'SourceLineNumber']
column
datatype
properties
def is_valid( self, kind: azure.kusto.data.data_format.IngestionMappingKind) -> (<class 'bool'>, typing.List[str]):
119    def is_valid(self, kind: IngestionMappingKind) -> (bool, List[str]):
120        if not self.column:
121            return False, ["Column name is required"]
122
123        results = []
124
125        needed_props = self.NEEDED_PROPERTIES[kind]
126
127        if all(prop not in self.properties for prop in needed_props):
128            results.append(f"{kind} needs at least one of the required properties: {needed_props}")
129
130        if self.properties.get(self.TRANSFORMATION_METHOD):
131            if (self.properties.get(self.PATH) or self.properties.get(self.FIELD_NAME)) and self.properties.get(
132                self.TRANSFORMATION_METHOD
133            ) in self.CONSTANT_TRANSFORMATION_METHODS:
134                results.append(
135                    f"When specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must not be one of "
136                    f"{','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}."
137                )
138
139            if (not self.properties.get(self.PATH) and not self.properties.get(self.FIELD_NAME)) and self.properties.get(
140                self.TRANSFORMATION_METHOD
141            ) not in self.CONSTANT_TRANSFORMATION_METHODS:
142                results.append(
143                    f"When not specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must be one of"
144                    f" {','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}."
145                )
146
147        return not bool(results), results
class TransformationMethod(Enum):
    """Transformations that can be configured over a JSON column mapping.

    To read more about mapping transformations look here:
    https://docs.microsoft.com/en-us/azure/kusto/management/mappings#mapping-transformations
    """

    NONE = "None"
    # BUG FIX: this member previously had a stray trailing comma, which made its
    # value the one-element tuple ("PropertyBagArrayToDictionary",) instead of a
    # string — breaking serialization of the transformation name.
    PROPERTY_BAG_ARRAY_TO_DICTIONARY = "PropertyBagArrayToDictionary"
    SOURCE_LOCATION = "SourceLocation"
    SOURCE_LINE_NUMBER = "SourceLineNumber"
    GET_PATH_ELEMENT = "GetPathElement"
    UNKNOWN_ERROR = "UnknownMethod"
    DATE_TIME_FROM_UNIX_SECONDS = "DateTimeFromUnixSeconds"
    DATE_TIME_FROM_UNIX_MILLISECONDS = "DateTimeFromUnixMilliseconds"
    DATE_TIME_FROM_UNIX_MICROSECONDS = "DateTimeFromUnixMicroseconds"
    DATE_TIME_FROM_UNIX_NANOSECONDS = "DateTimeFromUnixNanoseconds"

Transformations to configure over json column mapping To read more about mapping transformations look here: https://docs.microsoft.com/en-us/azure/kusto/management/mappings#mapping-transformations

NONE = <TransformationMethod.NONE: 'None'>
PROPERTY_BAG_ARRAY_TO_DICTIONARY = <TransformationMethod.PROPERTY_BAG_ARRAY_TO_DICTIONARY: ('PropertyBagArrayToDictionary',)>
SOURCE_LOCATION = <TransformationMethod.SOURCE_LOCATION: 'SourceLocation'>
SOURCE_LINE_NUMBER = <TransformationMethod.SOURCE_LINE_NUMBER: 'SourceLineNumber'>
GET_PATH_ELEMENT = <TransformationMethod.GET_PATH_ELEMENT: 'GetPathElement'>
UNKNOWN_ERROR = <TransformationMethod.UNKNOWN_ERROR: 'UnknownMethod'>
DATE_TIME_FROM_UNIX_SECONDS = <TransformationMethod.DATE_TIME_FROM_UNIX_SECONDS: 'DateTimeFromUnixSeconds'>
DATE_TIME_FROM_UNIX_MILLISECONDS = <TransformationMethod.DATE_TIME_FROM_UNIX_MILLISECONDS: 'DateTimeFromUnixMilliseconds'>
DATE_TIME_FROM_UNIX_MICROSECONDS = <TransformationMethod.DATE_TIME_FROM_UNIX_MICROSECONDS: 'DateTimeFromUnixMicroseconds'>
DATE_TIME_FROM_UNIX_NANOSECONDS = <TransformationMethod.DATE_TIME_FROM_UNIX_NANOSECONDS: 'DateTimeFromUnixNanoseconds'>
class ManagedStreamingIngestClient(BaseIngestClient):
    """
    Managed Streaming Ingestion Client.
    Will try to ingest with streaming, but if it fails, will fall back to queued ingestion.
    Each transient failure will be retried with exponential backoff.

    Managed streaming ingest client will fall back to queued if:
        - Multiple transient errors were encountered when trying to do streaming ingestion
        - The ingestion is too large for streaming ingestion (over 4MB)
    """

    # Streaming ingestion is capped at 4MB; larger payloads go through the queued client.
    MAX_STREAMING_SIZE_IN_BYTES = 4 * 1024 * 1024

    def __init__(
        self,
        engine_kcsb: Union[KustoConnectionStringBuilder, str],
        dm_kcsb: Union[KustoConnectionStringBuilder, str, None] = None,
        auto_correct_endpoint: bool = True,
    ):
        """
        :param engine_kcsb: Connection string (or builder) for the engine endpoint, used for streaming ingestion.
        :param dm_kcsb: Optional connection string (or builder) for the data-management endpoint, used for
            queued ingestion. When omitted, it is derived from engine_kcsb.
        :param auto_correct_endpoint: When True, automatically adjust each cluster URL for its ingestion type.
        """
        super().__init__()
        self.queued_client = QueuedIngestClient(dm_kcsb if dm_kcsb is not None else engine_kcsb, auto_correct_endpoint)
        self.streaming_client = KustoStreamingIngestClient(engine_kcsb, auto_correct_endpoint)
        self._set_retry_settings()

    def close(self) -> None:
        """Close both underlying clients; safe to call more than once."""
        if not self._is_closed:
            self.queued_client.close()
            self.streaming_client.close()
        super().close()

    def _set_retry_settings(self, max_seconds_per_retry: float = _utils.MAX_WAIT, num_of_attempts: int = 3):
        # Retry policy applied to streaming attempts before falling back to queued ingestion.
        self._num_of_attempts = num_of_attempts
        self._max_seconds_per_retry = max_seconds_per_retry

    def set_proxy(self, proxy_url: str):
        """Set proxy for both underlying ingestion clients.
        :param str proxy_url: proxy url.
        """
        self.queued_client.set_proxy(proxy_url)
        self.streaming_client.set_proxy(proxy_url)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from local files.
        :param file_descriptor: a FileDescriptor to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        file_descriptor = FileDescriptor.get_instance(file_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)

        # Raises KustoClosedError if the client has been closed.
        super().ingest_from_file(file_descriptor, ingestion_properties)

        stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor)

        with stream_descriptor.stream:
            return self.ingest_from_stream(stream_descriptor, ingestion_properties)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams.
        :param stream_descriptor: An object that contains a description of the stream to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)

        super().ingest_from_stream(stream_descriptor, ingestion_properties)

        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
        stream = stream_descriptor.stream

        # Buffer up to (limit + 1) bytes so we can tell whether the payload exceeds the streaming limit.
        buffered_stream = read_until_size_or_end(stream, self.MAX_STREAMING_SIZE_IN_BYTES + 1)
        length = len(buffered_stream.getbuffer())

        stream_descriptor.stream = buffered_stream

        try:
            res = self._stream_with_retries(length, stream_descriptor, ingestion_properties)
            if res:
                return res
            # Too large to stream: stitch the buffered prefix back onto the unread remainder.
            stream_descriptor.stream = chain_streams([buffered_stream, stream])
        except KustoApiError as ex:
            error = ex.get_api_error()
            if error.permanent:
                raise
            # Transient failure: rewind the buffer and fall back to queued ingestion.
            buffered_stream.seek(0, SEEK_SET)
        except KustoThrottlingError:
            _ = buffered_stream.seek(0, SEEK_SET)

        return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties):
        """
        Ingest from azure blobs.

        Streaming ingestion from the blob is attempted first (with retries); on transient failures,
        throttling, or blobs over the streaming size limit, ingestion falls back to queued.

        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

        if self._is_closed:
            raise KustoClosedError()
        blob_descriptor.fill_size()
        try:
            res = self._stream_with_retries(blob_descriptor.size, blob_descriptor, ingestion_properties)
            if res:
                return res
        except KustoApiError as ex:
            error = ex.get_api_error()
            if error.permanent:
                raise
        except KustoThrottlingError:
            pass

        return self.queued_client.ingest_from_blob(blob_descriptor, ingestion_properties)

    def _stream_with_retries(
        self,
        length: int,
        descriptor: DescriptorBase,
        props: IngestionProperties,
    ) -> Optional[IngestionResult]:
        """Attempt streaming ingestion with retries; return None when the payload is too large to stream."""
        from_stream = isinstance(descriptor, StreamDescriptor)
        if length > self.MAX_STREAMING_SIZE_IN_BYTES:
            return None
        for attempt in Retrying(stop=stop_after_attempt(self._num_of_attempts), wait=wait_random_exponential(max=self._max_seconds_per_retry), reraise=True):
            with attempt:
                attempt_number = attempt.retry_state.attempt_number
                client_request_id = ManagedStreamingIngestClient._get_request_id(descriptor.source_id, attempt_number - 1)
                if from_stream:
                    # Rewind before every attempt so a retry re-sends the full payload.
                    descriptor.stream.seek(0, SEEK_SET)
                    invoker = lambda: self.streaming_client._ingest_from_stream_with_client_request_id(descriptor, props, client_request_id)
                else:
                    invoker = lambda: self.streaming_client.ingest_from_blob(descriptor, props, client_request_id)
                return MonitoredActivity.invoke(
                    invoker,
                    name_of_span="ManagedStreamingIngestClient.ingest_from_stream_attempt",
                    # BUG FIX: previously traced the tenacity AttemptManager object itself;
                    # record the numeric attempt number instead.
                    tracing_attributes={"attemptNumber": attempt_number, "sourceIsStream": from_stream},
                )

    @staticmethod
    def _get_request_id(source_id: uuid.UUID, attempt: int):
        # Client-request-id format shared by the Kusto SDKs for managed streaming ingestion.
        return f"KPC.executeManagedStreamingIngest;{source_id};{attempt}"

Managed Streaming Ingestion Client. Will try to ingest with streaming, but if it fails, will fall back to queued ingestion. Each transient failure will be retried with exponential backoff.

Managed streaming ingest client will fall back to queued if: - Multiple transient errors were encountered when trying to do streaming ingestion - The ingestion is too large for streaming ingestion (over 4MB) - The ingestion is directly from a blob

ManagedStreamingIngestClient( engine_kcsb: Union[azure.kusto.data.kcsb.KustoConnectionStringBuilder, str], dm_kcsb: Union[azure.kusto.data.kcsb.KustoConnectionStringBuilder, str, NoneType] = None, auto_correct_endpoint: bool = True)
41    def __init__(
42        self,
43        engine_kcsb: Union[KustoConnectionStringBuilder, str],
44        dm_kcsb: Union[KustoConnectionStringBuilder, str, None] = None,
45        auto_correct_endpoint: bool = True,
46    ):
47        super().__init__()
48        self.queued_client = QueuedIngestClient(dm_kcsb if dm_kcsb is not None else engine_kcsb, auto_correct_endpoint)
49        self.streaming_client = KustoStreamingIngestClient(engine_kcsb, auto_correct_endpoint)
50        self._set_retry_settings()
MAX_STREAMING_SIZE_IN_BYTES = 4194304
queued_client
streaming_client
def close(self) -> None:
52    def close(self) -> None:
53        if not self._is_closed:
54            self.queued_client.close()
55            self.streaming_client.close()
56        super().close()
def set_proxy(self, proxy_url: str):
62    def set_proxy(self, proxy_url: str):
63        self.queued_client.set_proxy(proxy_url)
64        self.streaming_client.set_proxy(proxy_url)

Set proxy for the ingestion client.

Parameters
  • str proxy_url: proxy url.
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_file( self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
66    @distributed_trace(kind=SpanKind.CLIENT)
67    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
68        file_descriptor = FileDescriptor.get_instance(file_descriptor)
69        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
70
71        super().ingest_from_file(file_descriptor, ingestion_properties)
72
73        stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor)
74
75        with stream_descriptor.stream:
76            return self.ingest_from_stream(stream_descriptor, ingestion_properties)

Ingest from local files.

Parameters
  • file_descriptor: a FileDescriptor to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_stream( self, stream_descriptor: Union[StreamDescriptor, IO[~AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
 78    @distributed_trace(kind=SpanKind.CLIENT)
 79    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
 80        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
 81        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
 82
 83        super().ingest_from_stream(stream_descriptor, ingestion_properties)
 84
 85        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
 86        stream = stream_descriptor.stream
 87
 88        buffered_stream = read_until_size_or_end(stream, self.MAX_STREAMING_SIZE_IN_BYTES + 1)
 89        length = len(buffered_stream.getbuffer())
 90
 91        stream_descriptor.stream = buffered_stream
 92
 93        try:
 94            res = self._stream_with_retries(length, stream_descriptor, ingestion_properties)
 95            if res:
 96                return res
 97            stream_descriptor.stream = chain_streams([buffered_stream, stream])
 98        except KustoApiError as ex:
 99            error = ex.get_api_error()
100            if error.permanent:
101                raise
102            buffered_stream.seek(0, SEEK_SET)
103        except KustoThrottlingError:
104            _ = buffered_stream.seek(0, SEEK_SET)
105
106        return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)

Ingest from io streams.

Parameters
  • stream_descriptor: An object that contains a description of the stream to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_blob( self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties):
108    @distributed_trace(kind=SpanKind.CLIENT)
109    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties):
110        """
111        Enqueue an ingest command from azure blobs.
112
113        For ManagedStreamingIngestClient, this method always uses Queued Ingest, since it would be easier and faster to ingest blobs.
114
115        To learn more about ingestion methods go to:
116        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
117        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
118        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
119        """
120        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
121
122        if self._is_closed:
123            raise KustoClosedError()
124        blob_descriptor.fill_size()
125        try:
126            res = self._stream_with_retries(blob_descriptor.size, blob_descriptor, ingestion_properties)
127            if res:
128                return res
129        except KustoApiError as ex:
130            error = ex.get_api_error()
131            if error.permanent:
132                raise
133        except KustoThrottlingError:
134            pass
135
136        return self.queued_client.ingest_from_blob(blob_descriptor, ingestion_properties)

Enqueue an ingest command from azure blobs.

For ManagedStreamingIngestClient, streaming ingestion from the blob is attempted first (with retries); on transient failures, throttling, or blobs over the streaming size limit, ingestion falls back to queued.

To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

Parameters
  • azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
class KustoStreamingIngestClient(BaseIngestClient):
    """Kusto streaming ingest client for Python.

    Sends data directly to the engine endpoint via streaming ingestion.
    """

    def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str], auto_correct_endpoint: bool = True):
        """Kusto Streaming Ingest Client constructor.
        :param KustoConnectionStringBuilder kcsb: The connection string to initialize KustoClient.
        """
        super().__init__()

        builder = KustoConnectionStringBuilder(kcsb) if isinstance(kcsb, str) else kcsb
        if auto_correct_endpoint:
            # Streaming ingestion talks to the query endpoint, not the "ingest-" DM endpoint.
            builder["Data Source"] = BaseIngestClient.get_query_endpoint(builder.data_source)
        self._kusto_client = KustoClient(builder)

    def close(self):
        """Close the underlying KustoClient; safe to call repeatedly."""
        if not self._is_closed:
            self._kusto_client.close()
        super().close()

    def set_proxy(self, proxy_url: str):
        """Set proxy for the ingestion client.
        :param str proxy_url: proxy url.
        """
        self._kusto_client.set_proxy(proxy_url)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from local files.
        :param file_descriptor: a FileDescriptor to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        descriptor = FileDescriptor.get_instance(file_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(descriptor, ingestion_properties)

        # Base-class call raises KustoClosedError if the client has been closed.
        super().ingest_from_file(descriptor, ingestion_properties)

        as_stream = StreamDescriptor.from_file_descriptor(descriptor)
        with as_stream.stream:
            return self.ingest_from_stream(as_stream, ingestion_properties)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams.
        :param azure.kusto.ingest.StreamDescriptor stream_descriptor: An object that contains a description of the stream to
               be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        descriptor = StreamDescriptor.get_instance(stream_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(descriptor, ingestion_properties)

        super().ingest_from_stream(descriptor, ingestion_properties)

        return self._ingest_from_stream_with_client_request_id(descriptor, ingestion_properties, None)

    @staticmethod
    def _build_request_properties(client_request_id: Optional[str]) -> Optional[ClientRequestProperties]:
        # Only construct ClientRequestProperties when a custom request id was supplied.
        if not client_request_id:
            return None
        request_properties = ClientRequestProperties()
        request_properties.client_request_id = client_request_id
        return request_properties

    def _ingest_from_stream_with_client_request_id(
        self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties, client_request_id: Optional[str]
    ) -> IngestionResult:
        # Compress/normalize the stream per the ingestion properties, then stream it to the engine.
        descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)

        self._kusto_client.execute_streaming_ingest(
            ingestion_properties.database,
            ingestion_properties.table,
            descriptor.stream,
            None,
            ingestion_properties.format.name,
            self._build_request_properties(client_request_id),
            mapping_name=ingestion_properties.ingestion_mapping_reference,
        )

        return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, descriptor.source_id)

    def ingest_from_blob(
        self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties, client_request_id: Optional[str] = None
    ) -> IngestionResult:
        """Ingest the given blob through streaming ingestion.
        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :param client_request_id: Optional custom client request id to attach to the call.
        """
        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

        self._kusto_client.execute_streaming_ingest(
            ingestion_properties.database,
            ingestion_properties.table,
            None,
            blob_descriptor.path,
            ingestion_properties.format.name,
            self._build_request_properties(client_request_id),
            mapping_name=ingestion_properties.ingestion_mapping_reference,
        )
        return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id)

Kusto streaming ingest client for Python. Sends data directly to the engine endpoint via streaming ingestion; all primitive types are supported.

KustoStreamingIngestClient( kcsb: Union[azure.kusto.data.kcsb.KustoConnectionStringBuilder, str], auto_correct_endpoint: bool = True)
25    def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str], auto_correct_endpoint: bool = True):
26        """Kusto Streaming Ingest Client constructor.
27        :param KustoConnectionStringBuilder kcsb: The connection string to initialize KustoClient.
28        """
29        super().__init__()
30
31        if isinstance(kcsb, str):
32            kcsb = KustoConnectionStringBuilder(kcsb)
33
34        if auto_correct_endpoint:
35            kcsb["Data Source"] = BaseIngestClient.get_query_endpoint(kcsb.data_source)
36        self._kusto_client = KustoClient(kcsb)

Kusto Streaming Ingest Client constructor.

Parameters
  • KustoConnectionStringBuilder kcsb: The connection string to initialize KustoClient.
def close(self):
38    def close(self):
39        if not self._is_closed:
40            self._kusto_client.close()
41        super().close()
def set_proxy(self, proxy_url: str):
43    def set_proxy(self, proxy_url: str):
44        self._kusto_client.set_proxy(proxy_url)

Set proxy for the ingestion client.

Parameters
  • str proxy_url: proxy url.
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_file( self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
46    @distributed_trace(kind=SpanKind.CLIENT)
47    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
48        """Ingest from local files.
49        :param file_descriptor: a FileDescriptor to be ingested.
50        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
51        """
52        file_descriptor = FileDescriptor.get_instance(file_descriptor)
53        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
54
55        super().ingest_from_file(file_descriptor, ingestion_properties)
56
57        stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor)
58
59        with stream_descriptor.stream:
60            return self.ingest_from_stream(stream_descriptor, ingestion_properties)

Ingest from local files.

Parameters
  • file_descriptor: a FileDescriptor to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@distributed_trace(kind=SpanKind.CLIENT)
def ingest_from_stream( self, stream_descriptor: Union[StreamDescriptor, IO[~AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
62    @distributed_trace(kind=SpanKind.CLIENT)
63    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
64        """Ingest from io streams.
65        :param azure.kusto.ingest.StreamDescriptor stream_descriptor: An object that contains a description of the stream to
66               be ingested.
67        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
68        """
69        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
70        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
71
72        super().ingest_from_stream(stream_descriptor, ingestion_properties)
73
74        return self._ingest_from_stream_with_client_request_id(stream_descriptor, ingestion_properties, None)

Ingest from io streams.

Parameters
  • azure.kusto.ingest.StreamDescriptor stream_descriptor: An object that contains a description of the stream to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
def ingest_from_blob( self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties, client_request_id: Optional[str] = None) -> IngestionResult:
 97    def ingest_from_blob(
 98        self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties, client_request_id: Optional[str] = None
 99    ) -> IngestionResult:
100        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
101        additional_properties = None
102        if client_request_id:
103            additional_properties = ClientRequestProperties()
104            additional_properties.client_request_id = client_request_id
105
106        self._kusto_client.execute_streaming_ingest(
107            ingestion_properties.database,
108            ingestion_properties.table,
109            None,
110            blob_descriptor.path,
111            ingestion_properties.format.name,
112            additional_properties,
113            mapping_name=ingestion_properties.ingestion_mapping_reference,
114        )
115        return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id)
class BaseIngestClient(metaclass=ABCMeta):
    """Common base for all Kusto ingest clients: shared ingestion helpers, endpoint
    correction, and close/context-manager lifecycle."""

    def __init__(self):
        # Flipped to True by close(); ingest calls on a closed client raise KustoClosedError.
        self._is_closed: bool = False

    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from local files.
        :param file_descriptor: a FileDescriptor to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        if self._is_closed:
            raise KustoClosedError()

    @abstractmethod
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams.
        :param stream_descriptor: An object that contains a description of the stream to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        if self._is_closed:
            raise KustoClosedError()

    @abstractmethod
    def set_proxy(self, proxy_url: str):
        """Set proxy for the ingestion client.
        :param str proxy_url: proxy url.
        """
        if self._is_closed:
            raise KustoClosedError()

    def ingest_from_dataframe(
        self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties, data_format: Optional[DataFormat] = None
    ) -> IngestionResult:
        """Enqueue an ingest command from local files.
        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param pandas.DataFrame df: input dataframe to ingest.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON.
        """

        if self._is_closed:
            raise KustoClosedError()

        from pandas import DataFrame

        if not isinstance(df, DataFrame):
            raise ValueError("Expected DataFrame instance, found {}".format(type(df)))

        is_json = True

        # If we are given CSV mapping, or the mapping format is explicitly set to CSV, we should use CSV
        if not data_format:
            if ingestion_properties is not None and (ingestion_properties.ingestion_mapping_type == IngestionMappingKind.CSV):
                is_json = False
        elif data_format == DataFormat.CSV:
            is_json = False
        elif data_format == DataFormat.JSON:
            is_json = True
        else:
            raise ValueError("Unsupported format: {}. Supported formats are: CSV, JSON, None".format(data_format))

        # Serialize the dataframe to a gzipped temp file, ingest it, then always clean up.
        file_name = "df_{id}_{timestamp}_{uid}.{ext}.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4(), ext="json" if is_json else "csv")
        temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
        with gzip.open(temp_file_path, "wt", encoding="utf-8") as temp_file:
            if is_json:
                df.to_json(temp_file, orient="records", date_format="iso", lines=True)
                ingestion_properties.format = DataFormat.JSON
            else:
                # CSV output carries no header row, so the first record must not be skipped.
                df.to_csv(temp_file, index=False, encoding="utf-8", header=False)
                ingestion_properties.ignore_first_record = False
                ingestion_properties.format = DataFormat.CSV

        try:
            return self.ingest_from_file(temp_file_path, ingestion_properties)
        finally:
            os.unlink(temp_file_path)

    @staticmethod
    def _prepare_stream(stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> StreamDescriptor:
        """
        Prepares a StreamDescriptor instance for ingest operation based on ingestion properties
        :param StreamDescriptor stream_descriptor: Stream descriptor instance
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return prepared stream descriptor
        """
        new_descriptor = StreamDescriptor.get_instance(stream_descriptor)

        # Work on the underlying binary buffer of text streams.
        if isinstance(new_descriptor.stream, TextIOWrapper):
            new_descriptor.stream = new_descriptor.stream.buffer

        should_compress = BaseIngestClient._should_compress(new_descriptor, ingestion_properties)
        if should_compress:
            new_descriptor.compress_stream()

        return new_descriptor

    @staticmethod
    def _prepare_file(file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> Tuple[FileDescriptor, bool]:
        """
        Prepares a FileDescriptor instance for ingest operation based on ingestion properties
        :param FileDescriptor file_descriptor: File descriptor instance
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return prepared file descriptor
        """
        descriptor = FileDescriptor.get_instance(file_descriptor)

        should_compress = BaseIngestClient._should_compress(descriptor, ingestion_properties)
        return descriptor, should_compress

    @staticmethod
    def _should_compress(new_descriptor: Union[FileDescriptor, StreamDescriptor], ingestion_properties: IngestionProperties) -> bool:
        """
        Checks if descriptor should be compressed based on ingestion properties and current format
        """
        return not new_descriptor.is_compressed and ingestion_properties.format.compressible

    def close(self) -> None:
        self._is_closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @staticmethod
    def get_ingestion_endpoint(cluster_url: str) -> str:
        """Return the DM ("ingest-") form of the cluster URL, leaving reserved hostnames untouched."""
        if INGEST_PREFIX in cluster_url or not cluster_url or BaseIngestClient.is_reserved_hostname(cluster_url):
            return cluster_url
        else:
            return cluster_url.replace(PROTOCOL_SUFFIX, PROTOCOL_SUFFIX + INGEST_PREFIX, 1)

    @staticmethod
    def get_query_endpoint(cluster_url: str) -> str:
        """Return the engine (query) form of the cluster URL by stripping the "ingest-" prefix."""
        if INGEST_PREFIX in cluster_url:
            return cluster_url.replace(INGEST_PREFIX, "", 1)
        else:
            return cluster_url

    @staticmethod
    def is_reserved_hostname(raw_uri: str) -> bool:
        """Return True for hosts that must never be auto-corrected: localhost, raw IP addresses,
        the onebox dev host, or URIs without a network location."""
        url = urlparse(raw_uri)
        if not url.netloc:
            return True
        authority = url.netloc.split(":")[0]  # removes port if exists
        # BUG FIX: previously `is_ip` was assigned the IPv4Address/IPv6Address object returned
        # by ip_address(), so the function could return a non-bool despite its annotation.
        try:
            ipaddress.ip_address(authority)
            is_ip = True
        except ValueError:
            is_ip = False
        is_localhost = "localhost" in authority
        return is_localhost or is_ip or authority.lower() == "onebox.dev.kusto.windows.net"
def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
    """Ingest from local files.

    :param file_descriptor: a FileDescriptor to be ingested.
    :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
    """
    # Refuse to operate on a closed client; subclasses add the real work.
    if self._is_closed:
        raise KustoClosedError()

Ingest from local files.

Parameters
  • file_descriptor: a FileDescriptor to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@abstractmethod
def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
    """Ingest from io streams.

    :param stream_descriptor: An object that contains a description of the stream to be ingested.
    :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
    """
    # Refuse to operate on a closed client; subclasses add the real work.
    if self._is_closed:
        raise KustoClosedError()

Ingest from io streams.

Parameters
  • stream_descriptor: An object that contains a description of the stream to be ingested.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@abstractmethod
def set_proxy(self, proxy_url: str):
    """Set proxy for the ingestion client.

    :param str proxy_url: proxy url.
    """
    # Refuse to operate on a closed client; subclasses add the real work.
    if self._is_closed:
        raise KustoClosedError()

Set proxy for the ingestion client.

Parameters
  • str proxy_url: proxy url.
def ingest_from_dataframe(
    self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties, data_format: Optional[DataFormat] = None
) -> IngestionResult:
    """Enqueue an ingest command from local files.
    To learn more about ingestion methods go to:
    https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
    :param pandas.DataFrame df: input dataframe to ingest.
    :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
    :param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON.
    """
    if self._is_closed:
        raise KustoClosedError()

    from pandas import DataFrame

    if not isinstance(df, DataFrame):
        raise ValueError("Expected DataFrame instance, found {}".format(type(df)))

    # Decide the serialization format: an explicit data_format wins; otherwise
    # a CSV ingestion mapping implies CSV, and everything else defaults to JSON.
    if data_format:
        if data_format == DataFormat.CSV:
            write_json = False
        elif data_format == DataFormat.JSON:
            write_json = True
        else:
            raise ValueError("Unsupported format: {}. Supported formats are: CSV, JSON, None".format(data_format))
    else:
        write_json = not (ingestion_properties is not None and ingestion_properties.ingestion_mapping_type == IngestionMappingKind.CSV)

    # Serialize the dataframe into a gzip-compressed temp file, then ingest it.
    extension = "json" if write_json else "csv"
    tmp_name = "df_{id}_{timestamp}_{uid}.{ext}.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4(), ext=extension)
    tmp_path = os.path.join(tempfile.gettempdir(), tmp_name)
    with gzip.open(tmp_path, "wt", encoding="utf-8") as tmp_file:
        if write_json:
            df.to_json(tmp_file, orient="records", date_format="iso", lines=True)
            ingestion_properties.format = DataFormat.JSON
        else:
            df.to_csv(tmp_file, index=False, encoding="utf-8", header=False)
            # The CSV is written without a header row, so nothing to skip on ingest.
            ingestion_properties.ignore_first_record = False
            ingestion_properties.format = DataFormat.CSV

    try:
        return self.ingest_from_file(tmp_path, ingestion_properties)
    finally:
        # Always remove the temp file, whether or not ingestion succeeded.
        os.unlink(tmp_path)

Enqueue an ingest command from local files. To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

Parameters
  • pandas.DataFrame df: input dataframe to ingest.
  • azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
  • DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON.
def close(self) -> None:
    """Mark this client as closed; subsequent ingest calls check this flag and refuse to run."""
    self._is_closed = True
@staticmethod
def get_ingestion_endpoint(cluster_url: str) -> str:
    """Map a query cluster URL to its ingestion endpoint.

    The URL is returned unchanged when it already carries the ingest
    prefix, is empty, or points at a reserved hostname; otherwise the
    ingest prefix is inserted right after the protocol separator.
    """
    if INGEST_PREFIX in cluster_url:
        return cluster_url
    if not cluster_url or BaseIngestClient.is_reserved_hostname(cluster_url):
        return cluster_url
    return cluster_url.replace(PROTOCOL_SUFFIX, PROTOCOL_SUFFIX + INGEST_PREFIX, 1)
@staticmethod
def get_query_endpoint(cluster_url: str) -> str:
    """Map an ingestion cluster URL back to its query endpoint by removing
    the first occurrence of the ingest prefix, if present."""
    if INGEST_PREFIX not in cluster_url:
        return cluster_url
    return cluster_url.replace(INGEST_PREFIX, "", 1)
@staticmethod
def is_reserved_hostname(raw_uri: str) -> bool:
    """Return True when *raw_uri* points at a reserved/local host.

    Reserved means: no authority component at all, a hostname containing
    "localhost", a literal IP address, or the onebox dev cluster.

    :param str raw_uri: URI to inspect.
    """
    url = urlparse(raw_uri)
    if not url.netloc:
        # No authority component -> cannot be a public cluster; treat as reserved.
        return True
    authority = url.netloc.split(":")[0]  # removes port if exists
    try:
        # BUGFIX: previously the ip_address object itself was stored, so the
        # function returned an IPv4Address/IPv6Address instead of a bool.
        ipaddress.ip_address(authority)
        is_ip = True
    except ValueError:
        is_ip = False
    # NOTE: substring match (not equality) mirrors the original behavior; it
    # also matches hostnames that merely contain "localhost".
    is_localhost = "localhost" in authority
    return is_localhost or is_ip or authority.lower() == "onebox.dev.kusto.windows.net"