ingest
1# Copyright (c) Microsoft Corporation. 2# Licensed under the MIT License 3from ._version import VERSION as __version__ 4from .base_ingest_client import IngestionResult, IngestionStatus 5from .descriptors import BlobDescriptor, FileDescriptor, StreamDescriptor 6from .exceptions import KustoMissingMappingError, KustoMappingError, KustoQueueError, KustoDuplicateMappingError, KustoInvalidEndpointError, KustoClientError 7from .ingest_client import QueuedIngestClient 8from .ingestion_properties import ( 9 ValidationPolicy, 10 ValidationImplications, 11 ValidationOptions, 12 ReportLevel, 13 ReportMethod, 14 IngestionProperties, 15 IngestionMappingKind, 16 ColumnMapping, 17 TransformationMethod, 18) 19from .managed_streaming_ingest_client import ManagedStreamingIngestClient 20from .streaming_ingest_client import KustoStreamingIngestClient 21from .base_ingest_client import BaseIngestClient 22 23__all__ = [ 24 "__version__", 25 "IngestionResult", 26 "IngestionStatus", 27 "BlobDescriptor", 28 "FileDescriptor", 29 "StreamDescriptor", 30 "KustoMissingMappingError", 31 "KustoMappingError", 32 "KustoQueueError", 33 "KustoDuplicateMappingError", 34 "KustoInvalidEndpointError", 35 "KustoClientError", 36 "QueuedIngestClient", 37 "ValidationPolicy", 38 "ValidationImplications", 39 "ValidationOptions", 40 "ReportLevel", 41 "ReportMethod", 42 "IngestionProperties", 43 "IngestionMappingKind", 44 "ColumnMapping", 45 "TransformationMethod", 46 "ManagedStreamingIngestClient", 47 "KustoStreamingIngestClient", 48 "BaseIngestClient", 49]
class IngestionResult:
    """Outcome of a single ingestion request."""

    status: IngestionStatus  # `Queued` if the ingestion was queued, `Success` for a successful streaming ingestion
    database: str  # name of the database the ingestion targeted
    table: str  # name of the table the ingestion targeted
    source_id: uuid.UUID  # id of the ingested source
    blob_uri: Optional[str]  # uri of the ingested blob, when one exists

    def __init__(self, status: IngestionStatus, database: str, table: str, source_id: uuid.UUID, blob_uri: Optional[str] = None):
        self.blob_uri = blob_uri
        self.source_id = source_id
        self.table = table
        self.database = database
        self.status = status

    def __repr__(self):
        # Drop query parameters (e.g. SAS tokens) from the blob uri before displaying it.
        suffix = ""
        if isinstance(self.blob_uri, str):
            sanitized = self.blob_uri.split("?")[0].split(";")[0]
            if sanitized:
                suffix = f", obfuscated_blob_uri={sanitized}"
        return f"IngestionResult(status={self.status}, database={self.database}, table={self.table}, source_id={self.source_id}{suffix})"
The result of an ingestion.
Will be Queued if the ingestion is queued, or Success if the ingestion is streaming and successful.
class IngestionStatus(Enum):
    """Terminal status reported for a single ingestion operation.

    Fix: the member docstrings were shifted one slot up (the class docstring
    read "The ingestion was queued" while QUEUED carried the SUCCESS text);
    each description is now attached to the member it documents.
    """

    QUEUED = "QUEUED"
    """
    The ingestion was queued.
    """
    SUCCESS = "SUCCESS"
    """
    The ingestion was successfully streamed
    """
The ingestion was queued.
class BlobDescriptor(DescriptorBase):
    """BlobDescriptor is used to describe a blob that will be used as an ingestion source"""

    _BLOB_URI = "blob_uri"

    def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
        """
        :param path: blob uri.
        :type path: str.
        :param size: estimated size of file if known.
        :type size: Optional[int].
        :param source_id: a v4 uuid to serve as the source's id.
        :type source_id: OptionalUUID
        """
        self.path: str = path
        self.size: Optional[int] = size
        self.source_id: uuid.UUID = ensure_uuid(source_id)

    def get_tracing_attributes(self) -> dict:
        """Gets dictionary of attributes to be documented during tracing."""
        # Remove query parameters (e.g. SAS tokens) from self.path, if exists.
        # Fix: always return a dict; previously a falsy path made this method
        # fall through and implicitly return None, violating the declared
        # `-> dict` return type.
        obfuscated_path = self.path.split("?")[0].split(";")[0] if self.path else self.path
        return {self._BLOB_URI: obfuscated_path, self._SOURCE_ID: str(self.source_id)}

    def fill_size(self):
        """Populate self.size from the blob service when it was not provided."""
        if not self.size:
            self.size = BlobClient.from_blob_url(self.path).get_blob_properties().size
BlobDescriptor is used to describe a blob that will be used as an ingestion source
136 def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None): 137 """ 138 :param path: blob uri. 139 :type path: str. 140 :param size: estimated size of file if known. 141 :type size: Optional[int]. 142 :param source_id: a v4 uuid to serve as the source's id. 143 :type source_id: OptionalUUID 144 """ 145 self.path: str = path 146 self.size: Optional[int] = size 147 self.source_id: uuid.UUID = ensure_uuid(source_id)
Parameters
- path: blob uri.
- size: estimated size of file if known.
- source_id: a v4 uuid to serve as the source's id.
149 def get_tracing_attributes(self) -> dict: 150 # Remove query parameters from self.path, if exists 151 if self.path: 152 obfuscated_path = self.path.split("?")[0].split(";")[0] 153 return {self._BLOB_URI: obfuscated_path, self._SOURCE_ID: str(self.source_id)}
Gets dictionary of attributes to be documented during tracing
class FileDescriptor(DescriptorBase):
    """FileDescriptor is used to describe a file that will be used as an ingestion source."""

    # Gzip keeps the decompressed stream size as a UINT32 in the last 4 bytes of the stream, however this poses a limit to the expressed size which is 4GB
    # The standard says that when the size is bigger than 4GB, the UINT rolls over.
    # The below constant expresses the maximal size of a compressed stream that will not cause the UINT32 to rollover given a maximal compression ratio of 1:40
    GZIP_MAX_DISK_SIZE_FOR_DETECTION = int(4 * 1024 * 1024 * 1024 / 40)
    DEFAULT_COMPRESSION_RATIO = 11
    _FILE_PATH = "file_path"

    def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None):
        """
        :param path: file path.
        :type path: str.
        :param size: estimated size of file if known. if None or 0 will try to guess.
        :type size: Optional[int].
        :param source_id: a v4 uuid to serve as the source's id.
        :type source_id: OptionalUUID
        """
        self.path: str = path
        self._size: Optional[int] = size
        # Size is detected lazily, on first access, only when the caller did not supply it.
        self._detect_size_once: bool = not size

        self.source_id: uuid.UUID = ensure_uuid(source_id)
        self.stream_name: str = os.path.basename(self.path)

    @property
    def size(self) -> int:
        """Estimated uncompressed size of the file; detected on first access when unknown."""
        if self._detect_size_once:
            self._detect_size()
            self._detect_size_once = False

        return self._size

    @size.setter
    def size(self, size: int):
        if size:
            self._size = size
            self._detect_size_once = False

    def _detect_size(self):
        """Estimate the uncompressed size of self.path (gzip/zip aware)."""
        uncompressed_size = 0
        if self.path.endswith(".gz"):
            # This logic follows the C# implementation
            # See IngstionHelpers.cs for an explanation as to what stands behind it
            with open(self.path, "rb") as f:
                # seek() returns the new absolute offset, i.e. compressed size minus the 4-byte trailer
                disk_size = f.seek(-4, SEEK_END)
                # Fix: gzip's ISIZE trailer is little-endian per RFC 1952; the previous
                # native-order "I" format misread the size on big-endian platforms.
                uncompressed_size = struct.unpack("<I", f.read(4))[0]
                if (disk_size >= uncompressed_size) or (disk_size >= self.GZIP_MAX_DISK_SIZE_FOR_DETECTION):
                    # ISIZE is unreliable here (rollover or larger than the compressed
                    # size) - fall back to a ratio-based estimate.
                    uncompressed_size = disk_size * self.DEFAULT_COMPRESSION_RATIO

        elif self.path.endswith(".zip"):
            with ZipFile(self.path) as zip_archive:
                for f in zip_archive.infolist():
                    uncompressed_size += f.file_size

        else:
            uncompressed_size = os.path.getsize(self.path)

        self._size = uncompressed_size

    @property
    def is_compressed(self) -> bool:
        """True when the file extension indicates an already-compressed source (.gz / .zip)."""
        return self.path.endswith(".gz") or self.path.endswith(".zip")

    def open(self, should_compress: bool) -> BytesIO:
        """Open the file for reading, gzip-compressing it into memory when requested.
        :param should_compress: when True, return an in-memory gzipped copy of the file.
        """
        if should_compress:
            file_stream = self.compress_stream()
        else:
            file_stream = open(self.path, "rb")
        return file_stream

    def compress_stream(self) -> BytesIO:
        """Gzip the file into an in-memory stream; appends ".gz" to stream_name."""
        self.stream_name += ".gz"
        file_stream = BytesIO()
        with open(self.path, "rb") as f_in, GzipFile(filename="data", fileobj=file_stream, mode="wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
        file_stream.seek(0)
        return file_stream

    def get_tracing_attributes(self) -> dict:
        """Gets dictionary of attributes to be documented during tracing."""
        return {self._FILE_PATH: self.stream_name, self._SOURCE_ID: str(self.source_id)}

    @classmethod
    def get_instance(cls, file_descriptor: Union["FileDescriptor", str]) -> "FileDescriptor":
        """Coerce a plain path or an existing FileDescriptor into a FileDescriptor."""
        if not isinstance(file_descriptor, cls):
            return cls(file_descriptor)
        return file_descriptor
FileDescriptor is used to describe a file that will be used as an ingestion source.
51 def __init__(self, path: str, size: Optional[int] = None, source_id: OptionalUUID = None): 52 """ 53 :param path: file path. 54 :type path: str. 55 :param size: estimated size of file if known. if None or 0 will try to guess. 56 :type size: Optional[int]. 57 :param source_id: a v4 uuid to serve as the source's id. 58 :type source_id: OptionalUUID 59 """ 60 self.path: str = path 61 self._size: Optional[int] = size 62 self._detect_size_once: bool = not size 63 64 self.source_id: uuid.UUID = ensure_uuid(source_id) 65 self.stream_name: str = os.path.basename(self.path)
Parameters
- path: file path.
- size: estimated size of file if known. if None or 0 will try to guess.
- source_id: a v4 uuid to serve as the source's id.
121 def get_tracing_attributes(self) -> dict: 122 return {self._FILE_PATH: self.stream_name, self._SOURCE_ID: str(self.source_id)}
Gets dictionary of attributes to be documented during tracing
class StreamDescriptor(DescriptorBase):
    """StreamDescriptor is used to describe a stream that will be used as ingestion source"""

    _STREAM_NAME = "stream_name"

    # TODO: currently we always assume that streams are gz compressed (will get compressed before sending), should we expand that?
    def __init__(
        self, stream: IO[AnyStr], source_id: OptionalUUID = None, is_compressed: bool = False, stream_name: Optional[str] = None, size: Optional[int] = None
    ):
        """
        :param stream: in-memory stream object.
        :type stream: io.BaseIO
        :param source_id: a v4 uuid to serve as the source's id.
        :type source_id: OptionalUUID
        :param is_compressed: specify if the provided stream is compressed
        :type is_compressed: boolean
        :param stream_name: name used for the stream (e.g. in blob naming); defaults to "stream" ("+ .gz" when compressed).
        :type stream_name: Optional[str]
        :param size: estimated size of the stream, if known.
        :type size: Optional[int]
        """
        self.stream: IO[AnyStr] = stream
        self.source_id: uuid.UUID = ensure_uuid(source_id)
        self.is_compressed: bool = is_compressed
        self.stream_name: str = stream_name
        if self.stream_name is None:
            # No explicit name given - synthesize one; a caller-provided name is never modified here.
            self.stream_name = "stream"
            if is_compressed:
                self.stream_name += ".gz"
        self.size: Optional[int] = size

    def compress_stream(self) -> None:
        """Gzip-compress self.stream in place: replaces the stream with an in-memory
        compressed copy, marks the descriptor compressed and appends ".gz" to the name."""
        stream = self.stream
        zipped_stream = BytesIO()
        stream_buffer = stream.read()
        with GzipFile(filename="data", fileobj=zipped_stream, mode="wb") as f_out:
            if isinstance(stream_buffer, str):
                # Text streams yield str; gzip needs bytes, so encode first.
                data = bytes(stream_buffer, "utf-8")
                f_out.write(data)
            else:
                f_out.write(stream_buffer)
        zipped_stream.seek(0)
        self.is_compressed = True
        self.stream_name += ".gz"
        self.stream = zipped_stream

    @staticmethod
    def from_file_descriptor(file_descriptor: Union[FileDescriptor, str]) -> "StreamDescriptor":
        """
        Transforms FileDescriptor instance into StreamDescriptor instance. Note that stream is open when instance is returned
        (the caller is responsible for closing it).
        :param Union[FileDescriptor, str] file_descriptor: File Descriptor instance
        :return new StreamDescriptor instance
        """
        descriptor = FileDescriptor.get_instance(file_descriptor)
        stream = open(descriptor.path, "rb")
        is_compressed = descriptor.path.endswith(".gz") or descriptor.path.endswith(".zip")
        stream_descriptor = StreamDescriptor(stream, descriptor.source_id, is_compressed, descriptor.stream_name, descriptor.size)
        return stream_descriptor

    @classmethod
    def get_instance(cls, stream_descriptor: Union["StreamDescriptor", IO[AnyStr]]) -> "StreamDescriptor":
        """Coerce a raw stream or an existing StreamDescriptor into a StreamDescriptor."""
        if not isinstance(stream_descriptor, cls):
            descriptor = cls(stream_descriptor)
        else:
            # Shallow copy so the caller's descriptor attributes are not mutated by
            # later operations; note the underlying stream object is still shared.
            descriptor = copy(stream_descriptor)
        return descriptor

    def get_tracing_attributes(self) -> dict:
        """Gets dictionary of attributes to be documented during tracing."""
        return {self._STREAM_NAME: self.stream_name, self._SOURCE_ID: str(self.source_id)}
StreamDescriptor is used to describe a stream that will be used as ingestion source
166 def __init__( 167 self, stream: IO[AnyStr], source_id: OptionalUUID = None, is_compressed: bool = False, stream_name: Optional[str] = None, size: Optional[int] = None 168 ): 169 """ 170 :param stream: in-memory stream object. 171 :type stream: io.BaseIO 172 :param source_id: a v4 uuid to serve as the sources id. 173 :type source_id: OptionalUUID 174 :param is_compressed: specify if the provided stream is compressed 175 :type is_compressed: boolean 176 """ 177 self.stream: IO[AnyStr] = stream 178 self.source_id: uuid.UUID = ensure_uuid(source_id) 179 self.is_compressed: bool = is_compressed 180 self.stream_name: str = stream_name 181 if self.stream_name is None: 182 self.stream_name = "stream" 183 if is_compressed: 184 self.stream_name += ".gz" 185 self.size: Optional[int] = size
Parameters
- stream: in-memory stream object.
- source_id: a v4 uuid to serve as the source's id.
- is_compressed: specify if the provided stream is compressed
187 def compress_stream(self) -> None: 188 stream = self.stream 189 zipped_stream = BytesIO() 190 stream_buffer = stream.read() 191 with GzipFile(filename="data", fileobj=zipped_stream, mode="wb") as f_out: 192 if isinstance(stream_buffer, str): 193 data = bytes(stream_buffer, "utf-8") 194 f_out.write(data) 195 else: 196 f_out.write(stream_buffer) 197 zipped_stream.seek(0) 198 self.is_compressed = True 199 self.stream_name += ".gz" 200 self.stream = zipped_stream
202 @staticmethod 203 def from_file_descriptor(file_descriptor: Union[FileDescriptor, str]) -> "StreamDescriptor": 204 """ 205 Transforms FileDescriptor instance into StreamDescriptor instance. Note that stream is open when instance is returned 206 :param Union[FileDescriptor, str] file_descriptor: File Descriptor instance 207 :return new StreamDescriptor instance 208 """ 209 descriptor = FileDescriptor.get_instance(file_descriptor) 210 stream = open(descriptor.path, "rb") 211 is_compressed = descriptor.path.endswith(".gz") or descriptor.path.endswith(".zip") 212 stream_descriptor = StreamDescriptor(stream, descriptor.source_id, is_compressed, descriptor.stream_name, descriptor.size) 213 return stream_descriptor
Transforms FileDescriptor instance into StreamDescriptor instance. Note that stream is open when instance is returned
Parameters
- file_descriptor (Union[FileDescriptor, str]): FileDescriptor instance or file path.

Returns: a new StreamDescriptor instance.
class KustoMissingMappingError(KustoClientError):
    """Raised when a mapping kind is provided without a mapping reference or column mapping to go with it."""
Raised when provided a mapping kind without a mapping reference or column mapping.
class KustoMappingError(KustoClientError):
    """Raised when the supplied mapping arguments are not valid."""
Raised when the provided mapping arguments are invalid.
class KustoQueueError(KustoClientError):
    """Raised when not succeeding to upload message to queue in all retries"""

    def __init__(self):
        # Fix: corrected the "reties" typo in the user-facing error message.
        message = "Failed to upload message to queues in all retries."
        super().__init__(message)
Raised when uploading a message to the ingestion queue fails in all retries.
class KustoDuplicateMappingError(KustoClientError):
    """Raised when the ingestion properties carry both an explicit column
    mapping and a mapping reference - only one of the two is allowed."""

    def __init__(self):
        super().__init__("Ingestion properties can't contain both an explicit mapping and a mapping reference.")
Raised when ingestion properties include both column mappings and a mapping reference
class KustoInvalidEndpointError(KustoClientError):
    """Raised when the configured endpoint's service type does not match the client type used to ingest."""

    def __init__(self, expected_service_type, actual_service_type, suggested_endpoint_url=None):
        message = f"You are using '{expected_service_type}' client type, but the provided endpoint is of ServiceType '{actual_service_type}'. Initialize the client with the appropriate endpoint URI"
        if suggested_endpoint_url:
            # Append the recommended endpoint when one was detected.
            message = f"{message}: '{suggested_endpoint_url}'"
        super(KustoInvalidEndpointError, self).__init__(message)
Raised when trying to ingest to invalid cluster type.
33 def __init__(self, expected_service_type, actual_service_type, suggested_endpoint_url=None): 34 message = f"You are using '{expected_service_type}' client type, but the provided endpoint is of ServiceType '{actual_service_type}'. Initialize the client with the appropriate endpoint URI" 35 if suggested_endpoint_url: 36 message = message + ": '" + suggested_endpoint_url + "'" 37 super(KustoInvalidEndpointError, self).__init__(message)
class KustoClientError(KustoError):
    """Raised when a Kusto client is unable to send or complete a request.

    Also serves as the base class for more specific client-side errors
    (e.g. KustoMappingError, KustoQueueError, KustoInvalidEndpointError).
    """
Raised when a Kusto client is unable to send or complete a request.
class QueuedIngestClient(BaseIngestClient):
    """
    Queued ingest client provides methods to allow queued ingestion into kusto (ADX).
    To learn more about the different types of ingestions and when to use each, visit:
    https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
    """

    _INGEST_PREFIX = "ingest-"
    _SERVICE_CLIENT_TIMEOUT_SECONDS = 10 * 60
    _MAX_RETRIES = 3

    def __init__(self, kcsb: Union[str, KustoConnectionStringBuilder], auto_correct_endpoint: bool = True):
        """Kusto Ingest Client constructor.
        :param kcsb: The connection string to initialize KustoClient.
        :param auto_correct_endpoint: when True (default), the data source is rewritten to the ingestion endpoint.
        """
        super().__init__()
        if not isinstance(kcsb, KustoConnectionStringBuilder):
            kcsb = KustoConnectionStringBuilder(kcsb)

        if auto_correct_endpoint:
            kcsb["Data Source"] = BaseIngestClient.get_ingestion_endpoint(kcsb.data_source)

        self._proxy_dict: Optional[Dict[str, str]] = None
        self._connection_datasource = kcsb.data_source
        self._resource_manager = _ResourceManager(KustoClient(kcsb))
        self._endpoint_service_type = None
        self._suggested_endpoint_uri = None
        self.application_for_tracing = kcsb.client_details.application_for_tracing
        self.client_version_for_tracing = kcsb.client_details.version_for_tracing

    def close(self) -> None:
        """Close the underlying resource manager, then the base client."""
        self._resource_manager.close()
        super().close()

    def set_proxy(self, proxy_url: str):
        """Route service calls (resource manager, queue and blob uploads) through a proxy.
        :param str proxy_url: proxy url.
        """
        self._resource_manager.set_proxy(proxy_url)
        self._proxy_dict = {"http": proxy_url, "https": proxy_url}

    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_file", kind=SpanKind.CLIENT)
    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Enqueue an ingest command from local files.
        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param file_descriptor: a FileDescriptor to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        file_descriptor = FileDescriptor.get_instance(file_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)

        super().ingest_from_file(file_descriptor, ingestion_properties)

        containers = self._get_containers()

        file_descriptor, should_compress = BaseIngestClient._prepare_file(file_descriptor, ingestion_properties)
        with file_descriptor.open(should_compress) as stream:
            # Upload the (possibly compressed) file as a blob, then enqueue it.
            blob_descriptor = self.upload_blob(
                containers,
                file_descriptor,
                ingestion_properties.database,
                ingestion_properties.table,
                stream,
                self._proxy_dict,
                self._SERVICE_CLIENT_TIMEOUT_SECONDS,
                self._MAX_RETRIES,
            )
        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)

    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_stream", kind=SpanKind.CLIENT)
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams.
        :param stream_descriptor: An object that contains a description of the stream to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)

        super().ingest_from_stream(stream_descriptor, ingestion_properties)

        containers = self._get_containers()

        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
        blob_descriptor = self.upload_blob(
            containers,
            stream_descriptor,
            ingestion_properties.database,
            ingestion_properties.table,
            stream_descriptor.stream,
            self._proxy_dict,
            self._SERVICE_CLIENT_TIMEOUT_SECONDS,
            self._MAX_RETRIES,
        )
        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)

    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_blob", kind=SpanKind.CLIENT)
    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
        """Enqueue an ingest command from azure blobs.
        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

        if self._is_closed:
            raise KustoClosedError()

        queues = self._resource_manager.get_ingestion_queues()

        authorization_context = self._resource_manager.get_authorization_context()
        ingestion_blob_info = IngestionBlobInfo(
            blob_descriptor,
            ingestion_properties=ingestion_properties,
            auth_context=authorization_context,
            application_for_tracing=self.application_for_tracing,
            client_version_for_tracing=self.client_version_for_tracing,
        )
        ingestion_blob_info_json = ingestion_blob_info.to_json()
        # Try each queue in turn, stopping once the retry budget is exhausted.
        retries_left = min(self._MAX_RETRIES, len(queues))
        for queue in queues:
            try:
                with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
                    with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
                        # trace enqueuing of blob for ingestion
                        invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
                        enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
                        MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)

                self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
                return IngestionResult(
                    IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
                )
            except Exception as e:
                retries_left = retries_left - 1
                # TODO: log the retry once we have a proper logging system
                self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
                if retries_left == 0:
                    raise KustoQueueError() from e

    def _get_containers(self) -> List[_ResourceUri]:
        """Fetch the current ingestion container resources from the resource manager."""
        return self._resource_manager.get_containers()

    def upload_blob(
        self,
        containers: List[_ResourceUri],
        descriptor: Union[FileDescriptor, "StreamDescriptor"],
        database: str,
        table: str,
        stream: IO[AnyStr],
        proxy_dict: Optional[Dict[str, str]],
        timeout: int,
        max_retries: int,
    ) -> "BlobDescriptor":
        """
        Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance
        :param List[_ResourceUri] containers: blob containers
        :param Union[FileDescriptor, "StreamDescriptor"] descriptor: descriptor of the source being uploaded
        :param string database: database to be ingested to
        :param string table: table to be ingested to
        :param IO[AnyStr] stream: stream to be ingested from
        :param Optional[Dict[str, str]] proxy_dict: proxy urls
        :param int timeout: Azure service call timeout in seconds
        :param int max_retries: maximum upload attempts (capped by the number of containers)
        :return new BlobDescriptor instance
        """
        blob_name = "{db}__{table}__{guid}__{file}".format(db=database, table=table, guid=descriptor.source_id, file=descriptor.stream_name)

        # NOTE(review): on a retried attempt the stream position is not rewound here;
        # confirm callers pass streams that a failed attempt did not partially consume.
        retries_left = min(max_retries, len(containers))
        for container in containers:
            try:
                # Fix: close the BlobServiceClient deterministically via a context manager
                # (it was previously leaked), mirroring the QueueServiceClient handling
                # in ingest_from_blob.
                with BlobServiceClient(container.account_uri, proxies=proxy_dict) as blob_service:
                    blob_client = blob_service.get_blob_client(container=container.object_name, blob=blob_name)
                    blob_client.upload_blob(data=stream, timeout=timeout)
                    self._resource_manager.report_resource_usage_result(container.storage_account_name, True)
                    return BlobDescriptor(blob_client.url, descriptor.size, descriptor.source_id)
            except Exception as e:
                retries_left = retries_left - 1
                # TODO: log the retry once we have a proper logging system
                self._resource_manager.report_resource_usage_result(container.storage_account_name, False)
                if retries_left == 0:
                    raise KustoBlobError(e)
Queued ingest client provides methods to allow queued ingestion into kusto (ADX). To learn more about the different types of ingestions and when to use each, visit: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
37 def __init__(self, kcsb: Union[str, KustoConnectionStringBuilder], auto_correct_endpoint: bool = True): 38 """Kusto Ingest Client constructor. 39 :param kcsb: The connection string to initialize KustoClient. 40 """ 41 super().__init__() 42 if not isinstance(kcsb, KustoConnectionStringBuilder): 43 kcsb = KustoConnectionStringBuilder(kcsb) 44 45 if auto_correct_endpoint: 46 kcsb["Data Source"] = BaseIngestClient.get_ingestion_endpoint(kcsb.data_source) 47 48 self._proxy_dict: Optional[Dict[str, str]] = None 49 self._connection_datasource = kcsb.data_source 50 self._resource_manager = _ResourceManager(KustoClient(kcsb)) 51 self._endpoint_service_type = None 52 self._suggested_endpoint_uri = None 53 self.application_for_tracing = kcsb.client_details.application_for_tracing 54 self.client_version_for_tracing = kcsb.client_details.version_for_tracing
Kusto Ingest Client constructor.
Parameters
- kcsb: The connection string to initialize KustoClient.
60 def set_proxy(self, proxy_url: str): 61 self._resource_manager.set_proxy(proxy_url) 62 self._proxy_dict = {"http": proxy_url, "https": proxy_url}
Set proxy for the ingestion client.
Parameters
- str proxy_url: proxy url.
64 @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_file", kind=SpanKind.CLIENT) 65 def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult: 66 """Enqueue an ingest command from local files. 67 To learn more about ingestion methods go to: 68 https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods 69 :param file_descriptor: a FileDescriptor to be ingested. 70 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 71 """ 72 file_descriptor = FileDescriptor.get_instance(file_descriptor) 73 IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties) 74 75 super().ingest_from_file(file_descriptor, ingestion_properties) 76 77 containers = self._get_containers() 78 79 file_descriptor, should_compress = BaseIngestClient._prepare_file(file_descriptor, ingestion_properties) 80 with file_descriptor.open(should_compress) as stream: 81 blob_descriptor = self.upload_blob( 82 containers, 83 file_descriptor, 84 ingestion_properties.database, 85 ingestion_properties.table, 86 stream, 87 self._proxy_dict, 88 self._SERVICE_CLIENT_TIMEOUT_SECONDS, 89 self._MAX_RETRIES, 90 ) 91 return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
Enqueue an ingest command from local files. To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
Parameters
- file_descriptor: a FileDescriptor to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
93 @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_stream", kind=SpanKind.CLIENT) 94 def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult: 95 """Ingest from io streams. 96 :param stream_descriptor: An object that contains a description of the stream to be ingested. 97 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 98 """ 99 stream_descriptor = StreamDescriptor.get_instance(stream_descriptor) 100 IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties) 101 102 super().ingest_from_stream(stream_descriptor, ingestion_properties) 103 104 containers = self._get_containers() 105 106 stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties) 107 blob_descriptor = self.upload_blob( 108 containers, 109 stream_descriptor, 110 ingestion_properties.database, 111 ingestion_properties.table, 112 stream_descriptor.stream, 113 self._proxy_dict, 114 self._SERVICE_CLIENT_TIMEOUT_SECONDS, 115 self._MAX_RETRIES, 116 ) 117 return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
Ingest from io streams.
Parameters
- stream_descriptor: An object that contains a description of the stream to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
@distributed_trace(name_of_span="QueuedIngestClient.ingest_from_blob", kind=SpanKind.CLIENT)
def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
    """Enqueue an ingest command from azure blobs.
    To learn more about ingestion methods go to:
    https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
    :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
    :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
    :return: IngestionResult with status QUEUED once the message is enqueued.
    :raises KustoClosedError: if the client was closed.
    :raises KustoQueueError: if enqueueing failed on every attempted queue.
    """
    IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

    if self._is_closed:
        raise KustoClosedError()

    queues = self._resource_manager.get_ingestion_queues()

    authorization_context = self._resource_manager.get_authorization_context()
    ingestion_blob_info = IngestionBlobInfo(
        blob_descriptor,
        ingestion_properties=ingestion_properties,
        auth_context=authorization_context,
        application_for_tracing=self.application_for_tracing,
        client_version_for_tracing=self.client_version_for_tracing,
    )
    ingestion_blob_info_json = ingestion_blob_info.to_json()
    # Cap the number of attempts at the number of available queues, so each retry targets a different queue.
    retries_left = min(self._MAX_RETRIES, len(queues))
    for queue in queues:
        try:
            with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
                with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
                    # trace enqueuing of blob for ingestion
                    invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
                    enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
                    MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)

            self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
            return IngestionResult(
                IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
            )
        except Exception as e:
            retries_left = retries_left - 1
            # TODO: log the retry once we have a proper logging system
            self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
            if retries_left == 0:
                raise KustoQueueError() from e
    # NOTE(review): if `queues` is empty the loop body never runs and this implicitly
    # returns None — confirm the resource manager guarantees a non-empty queue list.
Enqueue an ingest command from azure blobs. To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
Parameters
- azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
def upload_blob(
    self,
    containers: List[_ResourceUri],
    descriptor: Union[FileDescriptor, "StreamDescriptor"],
    database: str,
    table: str,
    stream: IO[AnyStr],
    proxy_dict: Optional[Dict[str, str]],
    timeout: int,
    max_retries: int,
) -> "BlobDescriptor":
    """
    Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance
    :param List[_ResourceUri] containers: blob containers
    :param Union[FileDescriptor, "StreamDescriptor"] descriptor: source descriptor; its source_id and stream_name name the blob
    :param string database: database to be ingested to
    :param string table: table to be ingested to
    :param IO[AnyStr] stream: stream to be ingested from
    :param Optional[Dict[str, str]] proxy_dict: proxy urls
    :param int timeout: Azure service call timeout in seconds
    :param int max_retries: maximum number of containers to try before failing
    :return: new BlobDescriptor instance
    :raises KustoBlobError: if upload failed on every attempted container
    """
    blob_name = "{db}__{table}__{guid}__{file}".format(db=database, table=table, guid=descriptor.source_id, file=descriptor.stream_name)

    # Cap the number of attempts at the number of available containers, so each retry targets a different container.
    retries_left = min(max_retries, len(containers))
    for container in containers:
        try:
            # Close the service client deterministically (fix: it was previously leaked;
            # this also matches the `with QueueServiceClient(...)` pattern in ingest_from_blob).
            with BlobServiceClient(container.account_uri, proxies=proxy_dict) as blob_service:
                blob_client = blob_service.get_blob_client(container=container.object_name, blob=blob_name)
                blob_client.upload_blob(data=stream, timeout=timeout)
                blob_url = blob_client.url
            self._resource_manager.report_resource_usage_result(container.storage_account_name, True)
            return BlobDescriptor(blob_url, descriptor.size, descriptor.source_id)
        except Exception as e:
            retries_left = retries_left - 1
            # TODO: log the retry once we have a proper logging system
            self._resource_manager.report_resource_usage_result(container.storage_account_name, False)
            if retries_left == 0:
                raise KustoBlobError(e)
Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance
Parameters
- List[_ResourceUri] containers: blob containers
- Union[FileDescriptor, "StreamDescriptor"] descriptor:
- string database: database to be ingested to
- string table: table to be ingested to
- IO[AnyStr] stream: stream to be ingested from
- Optional[Dict[str, str]] proxy_dict: proxy urls
- int timeout: Azure service call timeout in seconds
- int max_retries: maximum number of containers to try before failing

Returns a new BlobDescriptor instance.
class ValidationPolicy:
    """Validation policy to ingest command.

    Bundles a ValidationOptions value with a ValidationImplications value.
    """

    def __init__(self, validation_options=ValidationOptions.DoNotValidate, validation_implications=ValidationImplications.BestEffort):
        # NOTE(review): the PascalCase attribute names look intentional (presumably
        # serialized by name into the ingestion payload) — confirm before renaming.
        self.ValidationOptions = validation_options
        self.ValidationImplications = validation_implications
Validation policy to ingest command.
class ValidationImplications(IntEnum):
    """Validation implications to ingest command.

    Used together with ValidationOptions in a ValidationPolicy.
    """

    Fail = 0
    BestEffort = 1
Validation implications to ingest command.
class ValidationOptions(IntEnum):
    """Validation options to ingest command.

    Used together with ValidationImplications in a ValidationPolicy.
    """

    DoNotValidate = 0
    ValidateCsvInputConstantColumns = 1
    ValidateCsvInputColumnLevelOnly = 2
Validation options to ingest command.
class ReportLevel(IntEnum):
    """Report level to ingest command.

    Selects which ingestion outcomes get reported (see IngestionProperties.report_level).
    """

    FailuresOnly = 0
    DoNotReport = 1
    FailuresAndSuccesses = 2
Report level to ingest command.
Report method to ingest command.
class IngestionProperties:
    """
    Class to represent ingestion properties.
    For more information check out https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties
    """

    # Keys used by get_tracing_attributes().
    _DATABASE = "database"
    _TABLE = "table"

    def __init__(
        self,
        database: str,
        table: str,
        data_format: DataFormat = DataFormat.CSV,
        column_mappings: Optional[List[ColumnMapping]] = None,
        ingestion_mapping_kind: Optional[IngestionMappingKind] = None,
        ingestion_mapping_reference: Optional[str] = None,
        ingest_if_not_exists: Optional[List[str]] = None,
        ingest_by_tags: Optional[List[str]] = None,
        drop_by_tags: Optional[List[str]] = None,
        additional_tags: Optional[List[str]] = None,
        flush_immediately: bool = False,
        ignore_first_record: bool = False,
        report_level: ReportLevel = ReportLevel.DoNotReport,
        report_method: ReportMethod = ReportMethod.Queue,
        validation_policy: Optional[ValidationPolicy] = None,
        additional_properties: Optional[dict] = None,
    ):
        """
        :param database: target database name.
        :param table: target table name.
        :param data_format: format of the ingested data (defaults to CSV).
        :param column_mappings: inline column mappings; mutually exclusive with ingestion_mapping_reference.
        :param ingestion_mapping_kind: kind of the mapping; when omitted it is derived from data_format.
        :param ingestion_mapping_reference: name of a pre-created mapping on the server.
        :raises KustoMissingMappingError: a mapping kind was given without any mapping.
        :raises KustoMappingError: the mapping kind disagrees with the data format, or a column mapping is invalid.
        :raises KustoDuplicateMappingError: both inline mappings and a mapping reference were given.
        """
        # A mapping kind without an actual mapping (inline or by reference) is meaningless.
        if ingestion_mapping_reference is None and column_mappings is None:
            if ingestion_mapping_kind is not None:
                raise KustoMissingMappingError(f"When ingestion mapping kind is set ('{ingestion_mapping_kind.value}'), a mapping must be provided.")
        else:  # A mapping is provided
            if ingestion_mapping_kind is not None:
                # An explicitly-passed kind must agree with the kind implied by the data format.
                if data_format.ingestion_mapping_kind != ingestion_mapping_kind:
                    raise KustoMappingError(
                        f"Wrong ingestion mapping for format '{data_format.kusto_value}'; mapping kind should be '{data_format.ingestion_mapping_kind.value}', "
                        f"but was '{ingestion_mapping_kind.value}'."
                    )
            else:
                # Derive the mapping kind from the data format.
                ingestion_mapping_kind = data_format.ingestion_mapping_kind

            if column_mappings is not None:
                # Inline mappings and a server-side mapping reference are mutually exclusive.
                if ingestion_mapping_reference is not None:
                    raise KustoDuplicateMappingError()

                validation_errors = []

                # Collect all per-column errors before failing, so the caller sees them all at once.
                for mapping in column_mappings:
                    (valid, mapping_errors) = mapping.is_valid(ingestion_mapping_kind)
                    if not valid:
                        validation_errors.extend(f"Column mapping '{mapping.column}' is invalid - '{e}'" for e in mapping_errors)

                if validation_errors:
                    errors = "\n".join(validation_errors)
                    raise KustoMappingError(f"Failed with validation errors:\n{errors}")

        self.database = database
        self.table = table
        self.format = data_format
        self.ingestion_mapping = column_mappings
        self.ingestion_mapping_type = ingestion_mapping_kind
        self.ingestion_mapping_reference = ingestion_mapping_reference
        self.additional_tags = additional_tags
        self.ingest_if_not_exists = ingest_if_not_exists
        self.ingest_by_tags = ingest_by_tags
        self.drop_by_tags = drop_by_tags
        self.flush_immediately = flush_immediately
        self.ignore_first_record = ignore_first_record
        self.report_level = report_level
        self.report_method = report_method
        self.validation_policy = validation_policy
        self.additional_properties = additional_properties

    def get_tracing_attributes(self) -> dict:
        """Gets dictionary of attributes to be documented during tracing"""
        return {self._DATABASE: self.database, self._TABLE: self.table}
Class to represent ingestion properties. For more information check out https://docs.microsoft.com/en-us/azure/data-explorer/ingestion-properties
159 def __init__( 160 self, 161 database: str, 162 table: str, 163 data_format: DataFormat = DataFormat.CSV, 164 column_mappings: Optional[List[ColumnMapping]] = None, 165 ingestion_mapping_kind: Optional[IngestionMappingKind] = None, 166 ingestion_mapping_reference: Optional[str] = None, 167 ingest_if_not_exists: Optional[List[str]] = None, 168 ingest_by_tags: Optional[List[str]] = None, 169 drop_by_tags: Optional[List[str]] = None, 170 additional_tags: Optional[List[str]] = None, 171 flush_immediately: bool = False, 172 ignore_first_record: bool = False, 173 report_level: ReportLevel = ReportLevel.DoNotReport, 174 report_method: ReportMethod = ReportMethod.Queue, 175 validation_policy: Optional[ValidationPolicy] = None, 176 additional_properties: Optional[dict] = None, 177 ): 178 if ingestion_mapping_reference is None and column_mappings is None: 179 if ingestion_mapping_kind is not None: 180 raise KustoMissingMappingError(f"When ingestion mapping kind is set ('{ingestion_mapping_kind.value}'), a mapping must be provided.") 181 else: # A mapping is provided 182 if ingestion_mapping_kind is not None: 183 if data_format.ingestion_mapping_kind != ingestion_mapping_kind: 184 raise KustoMappingError( 185 f"Wrong ingestion mapping for format '{data_format.kusto_value}'; mapping kind should be '{data_format.ingestion_mapping_kind.value}', " 186 f"but was '{ingestion_mapping_kind.value}'. 
" 187 ) 188 else: 189 ingestion_mapping_kind = data_format.ingestion_mapping_kind 190 191 if column_mappings is not None: 192 if ingestion_mapping_reference is not None: 193 raise KustoDuplicateMappingError() 194 195 validation_errors = [] 196 197 for mapping in column_mappings: 198 (valid, mapping_errors) = mapping.is_valid(ingestion_mapping_kind) 199 if not valid: 200 validation_errors.extend(f"Column mapping '{mapping.column}' is invalid - '{e}'" for e in mapping_errors) 201 202 if validation_errors: 203 errors = "\n".join(validation_errors) 204 raise KustoMappingError(f"Failed with validation errors:\n{errors}") 205 206 self.database = database 207 self.table = table 208 self.format = data_format 209 self.ingestion_mapping = column_mappings 210 self.ingestion_mapping_type = ingestion_mapping_kind 211 self.ingestion_mapping_reference = ingestion_mapping_reference 212 self.additional_tags = additional_tags 213 self.ingest_if_not_exists = ingest_if_not_exists 214 self.ingest_by_tags = ingest_by_tags 215 self.drop_by_tags = drop_by_tags 216 self.flush_immediately = flush_immediately 217 self.ignore_first_record = ignore_first_record 218 self.report_level = report_level 219 self.report_method = report_method 220 self.validation_policy = validation_policy 221 self.additional_properties = additional_properties
class IngestionMappingKind(Enum):
    """Kinds of ingestion column mappings, one per supported data format."""

    CSV = "Csv"
    JSON = "Json"
    AVRO = "Avro"
    APACHEAVRO = "ApacheAvro"
    PARQUET = "Parquet"
    SSTREAM = "SStream"
    ORC = "Orc"
    W3CLOGFILE = "W3CLogFile"
    UNKNOWN = "Unknown"
Enumeration of ingestion mapping kinds — one member per data format for which a column mapping can be defined.
class ColumnMapping:
    """Use this class to create mappings for IngestionProperties.ingestionMappings and utilize mappings that were not
    pre-created (it is recommended to create the mappings in advance and use ingestionMappingReference).
    To read more about mappings look here: https://docs.microsoft.com/en-us/azure/kusto/management/mappings"""

    # Property-bag keys used in the serialized mapping.
    PATH = "Path"
    TRANSFORMATION_METHOD = "Transform"
    ORDINAL = "Ordinal"
    CONST_VALUE = "ConstValue"
    FIELD_NAME = "Field"

    # Per-kind list of properties of which at least one must be set (checked in is_valid).
    # NOTE(review): kinds absent from this dict (e.g. UNKNOWN) raise KeyError in is_valid — confirm intended.
    NEEDED_PROPERTIES: Dict[IngestionMappingKind, List[str]] = {
        IngestionMappingKind.CSV: [ORDINAL, CONST_VALUE, TRANSFORMATION_METHOD],
        IngestionMappingKind.JSON: [PATH, CONST_VALUE, TRANSFORMATION_METHOD],
        IngestionMappingKind.AVRO: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
        IngestionMappingKind.APACHEAVRO: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
        IngestionMappingKind.SSTREAM: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
        IngestionMappingKind.PARQUET: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
        IngestionMappingKind.ORC: [PATH, CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
        IngestionMappingKind.W3CLOGFILE: [CONST_VALUE, FIELD_NAME, TRANSFORMATION_METHOD],
    }

    # Transformations that produce a value without reading a Path/Field from the record.
    CONSTANT_TRANSFORMATION_METHODS = [TransformationMethod.SOURCE_LOCATION.value, TransformationMethod.SOURCE_LINE_NUMBER.value]

    # TODO - add safe and convenient ctors, like in node
    def __init__(
        self,
        column_name: str,
        column_type: str,
        path: str = None,
        transform: TransformationMethod = TransformationMethod.NONE,
        ordinal: int = None,
        const_value: str = None,
        field=None,
        columns=None,
        storage_data_type=None,
    ):
        """
        :param column_name: name of the target column.
        :param column_type: data type of the target column.
        :param path: source JSON path to read from (JSON/Avro-style mappings).
        :param transform: transformation to apply; NONE is omitted from the properties.
        :param ordinal: source column ordinal (CSV mappings); stored as a string.
        :param const_value: constant value for the column.
        :param field: source field name (Avro-style mappings).
        :param columns: Deprecated. Columns is not used anymore.
        :param storage_data_type: Deprecated. StorageDataType is not used anymore.
        """
        self.column = column_name
        self.datatype = column_type
        # Only explicitly-set options are recorded; absent keys are simply not serialized.
        self.properties = {}
        if path:
            self.properties[self.PATH] = path
        if transform != TransformationMethod.NONE:
            self.properties[self.TRANSFORMATION_METHOD] = transform.value
        if ordinal is not None:
            self.properties[self.ORDINAL] = str(ordinal)
        if const_value:
            self.properties[self.CONST_VALUE] = const_value
        if field:
            self.properties[self.FIELD_NAME] = field

    def is_valid(self, kind: IngestionMappingKind) -> (bool, List[str]):
        """Validate this mapping against the given mapping kind.
        :return: tuple of (is_valid, list of error descriptions).
        """
        if not self.column:
            return False, ["Column name is required"]

        results = []

        needed_props = self.NEEDED_PROPERTIES[kind]

        # At least one of the kind-specific properties must be present.
        if all(prop not in self.properties for prop in needed_props):
            results.append(f"{kind} needs at least one of the required properties: {needed_props}")

        if self.properties.get(self.TRANSFORMATION_METHOD):
            # Constant transformations must not be combined with Path/Field...
            if (self.properties.get(self.PATH) or self.properties.get(self.FIELD_NAME)) and self.properties.get(
                self.TRANSFORMATION_METHOD
            ) in self.CONSTANT_TRANSFORMATION_METHODS:
                results.append(
                    f"When specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must not be one of "
                    f"{','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}."
                )

            # ...and non-constant transformations require a Path or Field to read from.
            if (not self.properties.get(self.PATH) and not self.properties.get(self.FIELD_NAME)) and self.properties.get(
                self.TRANSFORMATION_METHOD
            ) not in self.CONSTANT_TRANSFORMATION_METHODS:
                results.append(
                    f"When not specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must be one of"
                    f" {','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}."
                )

        return not bool(results), results
Use this class to create mappings for IngestionProperties.ingestionMappings and utilize mappings that were not pre-created (it is recommended to create the mappings in advance and use ingestionMappingReference). To read more about mappings look here: https://docs.microsoft.com/en-us/azure/kusto/management/mappings
89 def __init__( 90 self, 91 column_name: str, 92 column_type: str, 93 path: str = None, 94 transform: TransformationMethod = TransformationMethod.NONE, 95 ordinal: int = None, 96 const_value: str = None, 97 field=None, 98 columns=None, 99 storage_data_type=None, 100 ): 101 """ 102 :param columns: Deprecated. Columns is not used anymore. 103 :param storage_data_type: Deprecated. StorageDataType is not used anymore. 104 """ 105 self.column = column_name 106 self.datatype = column_type 107 self.properties = {} 108 if path: 109 self.properties[self.PATH] = path 110 if transform != TransformationMethod.NONE: 111 self.properties[self.TRANSFORMATION_METHOD] = transform.value 112 if ordinal is not None: 113 self.properties[self.ORDINAL] = str(ordinal) 114 if const_value: 115 self.properties[self.CONST_VALUE] = const_value 116 if field: 117 self.properties[self.FIELD_NAME] = field
Parameters
- columns: Deprecated. Columns is not used anymore.
- storage_data_type: Deprecated. StorageDataType is not used anymore.
119 def is_valid(self, kind: IngestionMappingKind) -> (bool, List[str]): 120 if not self.column: 121 return False, ["Column name is required"] 122 123 results = [] 124 125 needed_props = self.NEEDED_PROPERTIES[kind] 126 127 if all(prop not in self.properties for prop in needed_props): 128 results.append(f"{kind} needs at least one of the required properties: {needed_props}") 129 130 if self.properties.get(self.TRANSFORMATION_METHOD): 131 if (self.properties.get(self.PATH) or self.properties.get(self.FIELD_NAME)) and self.properties.get( 132 self.TRANSFORMATION_METHOD 133 ) in self.CONSTANT_TRANSFORMATION_METHODS: 134 results.append( 135 f"When specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must not be one of " 136 f"{','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}." 137 ) 138 139 if (not self.properties.get(self.PATH) and not self.properties.get(self.FIELD_NAME)) and self.properties.get( 140 self.TRANSFORMATION_METHOD 141 ) not in self.CONSTANT_TRANSFORMATION_METHODS: 142 results.append( 143 f"When not specifying {self.PATH} or {self.FIELD_NAME}, {self.TRANSFORMATION_METHOD} must be one of" 144 f" {','.join(str(x) for x in self.CONSTANT_TRANSFORMATION_METHODS)}, not {self.properties.get(self.TRANSFORMATION_METHOD)}." 145 ) 146 147 return not bool(results), results
class TransformationMethod(Enum):
    """Transformations to configure over json column mapping
    To read more about mapping transformations look here: https://docs.microsoft.com/en-us/azure/kusto/management/mappings#mapping-transformations"""

    NONE = "None"
    # Fix: a trailing comma previously made this member's value the 1-tuple
    # ("PropertyBagArrayToDictionary",) instead of the plain string, so the
    # serialized Transform property was wrong for this transformation.
    PROPERTY_BAG_ARRAY_TO_DICTIONARY = "PropertyBagArrayToDictionary"
    SOURCE_LOCATION = "SourceLocation"
    SOURCE_LINE_NUMBER = "SourceLineNumber"
    GET_PATH_ELEMENT = "GetPathElement"
    UNKNOWN_ERROR = "UnknownMethod"
    DATE_TIME_FROM_UNIX_SECONDS = "DateTimeFromUnixSeconds"
    DATE_TIME_FROM_UNIX_MILLISECONDS = "DateTimeFromUnixMilliseconds"
    DATE_TIME_FROM_UNIX_MICROSECONDS = "DateTimeFromUnixMicroseconds"
    DATE_TIME_FROM_UNIX_NANOSECONDS = "DateTimeFromUnixNanoseconds"
Transformations to configure over json column mapping To read more about mapping transformations look here: https://docs.microsoft.com/en-us/azure/kusto/management/mappings#mapping-transformations
class ManagedStreamingIngestClient(BaseIngestClient):
    """
    Managed Streaming Ingestion Client.
    Will try to ingest with streaming, but if it fails, will fall back to queued ingestion.
    Each transient failure will be retried with exponential backoff.

    Managed streaming ingest client will fall back to queued if:
    - Multiple transient errors were encountered when trying to do streaming ingestion
    - The ingestion is too large for streaming ingestion (over 4MB)
    """

    # Streaming-ingestion payload limit; larger payloads are routed to queued ingestion.
    MAX_STREAMING_SIZE_IN_BYTES = 4 * 1024 * 1024

    def __init__(
        self,
        engine_kcsb: Union[KustoConnectionStringBuilder, str],
        dm_kcsb: Union[KustoConnectionStringBuilder, str, None] = None,
        auto_correct_endpoint: bool = True,
    ):
        """
        :param engine_kcsb: connection string (or builder) used for the streaming client.
        :param dm_kcsb: optional connection string (or builder) used for the queued client;
            when None, engine_kcsb is used for both.
        :param auto_correct_endpoint: passed through to both underlying clients.
        """
        super().__init__()
        self.queued_client = QueuedIngestClient(dm_kcsb if dm_kcsb is not None else engine_kcsb, auto_correct_endpoint)
        self.streaming_client = KustoStreamingIngestClient(engine_kcsb, auto_correct_endpoint)
        self._set_retry_settings()

    def close(self) -> None:
        # Close both underlying clients exactly once.
        if not self._is_closed:
            self.queued_client.close()
            self.streaming_client.close()
        super().close()

    def _set_retry_settings(self, max_seconds_per_retry: float = _utils.MAX_WAIT, num_of_attempts: int = 3):
        # Retry knobs for the streaming attempts (used by _stream_with_retries).
        self._num_of_attempts = num_of_attempts
        self._max_seconds_per_retry = max_seconds_per_retry

    def set_proxy(self, proxy_url: str):
        """Set proxy for both the queued and the streaming clients.
        :param str proxy_url: proxy url.
        """
        self.queued_client.set_proxy(proxy_url)
        self.streaming_client.set_proxy(proxy_url)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from local files by opening them as streams (delegates to ingest_from_stream).
        :param file_descriptor: a FileDescriptor (or file path) to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        file_descriptor = FileDescriptor.get_instance(file_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)

        super().ingest_from_file(file_descriptor, ingestion_properties)

        stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor)

        with stream_descriptor.stream:
            return self.ingest_from_stream(stream_descriptor, ingestion_properties)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams; tries streaming ingestion first, falling back to queued ingestion.
        :param stream_descriptor: An object that contains a description of the stream to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)

        super().ingest_from_stream(stream_descriptor, ingestion_properties)

        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
        stream = stream_descriptor.stream

        # Buffer one byte past the streaming limit: if we actually got MAX+1 bytes, the
        # payload is too large for streaming and _stream_with_retries returns None.
        buffered_stream = read_until_size_or_end(stream, self.MAX_STREAMING_SIZE_IN_BYTES + 1)
        length = len(buffered_stream.getbuffer())

        stream_descriptor.stream = buffered_stream

        try:
            res = self._stream_with_retries(length, stream_descriptor, ingestion_properties)
            if res:
                return res
            # Too large for streaming: stitch the buffered prefix back onto the unread
            # remainder so queued ingestion sees the whole stream.
            stream_descriptor.stream = chain_streams([buffered_stream, stream])
        except KustoApiError as ex:
            error = ex.get_api_error()
            if error.permanent:
                raise
            # Transient service error: rewind and fall back to queued ingestion.
            buffered_stream.seek(0, SEEK_SET)
        except KustoThrottlingError:
            _ = buffered_stream.seek(0, SEEK_SET)

        return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties):
        """
        Ingest from azure blobs.

        Tries streaming ingestion first (when the blob fits the streaming size limit),
        falling back to queued ingestion on oversized blobs, throttling, or transient errors.

        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

        if self._is_closed:
            raise KustoClosedError()
        # Ensure the blob size is known so _stream_with_retries can route by size.
        blob_descriptor.fill_size()
        try:
            res = self._stream_with_retries(blob_descriptor.size, blob_descriptor, ingestion_properties)
            if res:
                return res
        except KustoApiError as ex:
            error = ex.get_api_error()
            if error.permanent:
                raise
        except KustoThrottlingError:
            pass

        return self.queued_client.ingest_from_blob(blob_descriptor, ingestion_properties)

    def _stream_with_retries(
        self,
        length: int,
        descriptor: DescriptorBase,
        props: IngestionProperties,
    ) -> Optional[IngestionResult]:
        """Attempt streaming ingestion with retries.
        :return: the IngestionResult, or None when the payload exceeds the streaming size limit.
        """
        from_stream = isinstance(descriptor, StreamDescriptor)
        if length > self.MAX_STREAMING_SIZE_IN_BYTES:
            return None
        for attempt in Retrying(stop=stop_after_attempt(self._num_of_attempts), wait=wait_random_exponential(max=self._max_seconds_per_retry), reraise=True):
            with attempt:
                client_request_id = ManagedStreamingIngestClient._get_request_id(descriptor.source_id, attempt.retry_state.attempt_number - 1)
                # trace attempt to ingest from stream
                if from_stream:
                    # Each attempt must re-read the payload from the start.
                    descriptor.stream.seek(0, SEEK_SET)
                    invoker = lambda: self.streaming_client._ingest_from_stream_with_client_request_id(descriptor, props, client_request_id)
                else:
                    invoker = lambda: self.streaming_client.ingest_from_blob(descriptor, props, client_request_id)
                return MonitoredActivity.invoke(
                    invoker,
                    name_of_span="ManagedStreamingIngestClient.ingest_from_stream_attempt",
                    # NOTE(review): `attempt` is tenacity's AttemptManager object, not a number;
                    # attempt.retry_state.attempt_number was probably intended here — confirm.
                    tracing_attributes={"attemptNumber": attempt, "sourceIsStream": from_stream},
                )

    @staticmethod
    def _get_request_id(source_id: uuid.UUID, attempt: int):
        # Client request id format: KPC.executeManagedStreamingIngest;<source_id>;<attempt>
        return f"KPC.executeManagedStreamingIngest;{source_id};{attempt}"
Managed Streaming Ingestion Client. Will try to ingest with streaming, but if it fails, will fall back to queued ingestion. Each transient failure will be retried with exponential backoff.
Managed streaming ingest client will fall back to queued if: - Multiple transient errors were encountered when trying to do streaming ingestion - The ingestion is too large for streaming ingestion (over 4MB), whether from a stream or a blob
41 def __init__( 42 self, 43 engine_kcsb: Union[KustoConnectionStringBuilder, str], 44 dm_kcsb: Union[KustoConnectionStringBuilder, str, None] = None, 45 auto_correct_endpoint: bool = True, 46 ): 47 super().__init__() 48 self.queued_client = QueuedIngestClient(dm_kcsb if dm_kcsb is not None else engine_kcsb, auto_correct_endpoint) 49 self.streaming_client = KustoStreamingIngestClient(engine_kcsb, auto_correct_endpoint) 50 self._set_retry_settings()
62 def set_proxy(self, proxy_url: str): 63 self.queued_client.set_proxy(proxy_url) 64 self.streaming_client.set_proxy(proxy_url)
Set proxy for the ingestion client.
Parameters
- str proxy_url: proxy url.
66 @distributed_trace(kind=SpanKind.CLIENT) 67 def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult: 68 file_descriptor = FileDescriptor.get_instance(file_descriptor) 69 IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties) 70 71 super().ingest_from_file(file_descriptor, ingestion_properties) 72 73 stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor) 74 75 with stream_descriptor.stream: 76 return self.ingest_from_stream(stream_descriptor, ingestion_properties)
Ingest from local files.
Parameters
- file_descriptor: a FileDescriptor to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
78 @distributed_trace(kind=SpanKind.CLIENT) 79 def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult: 80 stream_descriptor = StreamDescriptor.get_instance(stream_descriptor) 81 IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties) 82 83 super().ingest_from_stream(stream_descriptor, ingestion_properties) 84 85 stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties) 86 stream = stream_descriptor.stream 87 88 buffered_stream = read_until_size_or_end(stream, self.MAX_STREAMING_SIZE_IN_BYTES + 1) 89 length = len(buffered_stream.getbuffer()) 90 91 stream_descriptor.stream = buffered_stream 92 93 try: 94 res = self._stream_with_retries(length, stream_descriptor, ingestion_properties) 95 if res: 96 return res 97 stream_descriptor.stream = chain_streams([buffered_stream, stream]) 98 except KustoApiError as ex: 99 error = ex.get_api_error() 100 if error.permanent: 101 raise 102 buffered_stream.seek(0, SEEK_SET) 103 except KustoThrottlingError: 104 _ = buffered_stream.seek(0, SEEK_SET) 105 106 return self.queued_client.ingest_from_stream(stream_descriptor, ingestion_properties)
Ingest from io streams.
Parameters
- stream_descriptor: An object that contains a description of the stream to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
108 @distributed_trace(kind=SpanKind.CLIENT) 109 def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties): 110 """ 111 Enqueue an ingest command from azure blobs. 112 113 For ManagedStreamingIngestClient, this method always uses Queued Ingest, since it would be easier and faster to ingest blobs. 114 115 To learn more about ingestion methods go to: 116 https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods 117 :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested. 118 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 119 """ 120 IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties) 121 122 if self._is_closed: 123 raise KustoClosedError() 124 blob_descriptor.fill_size() 125 try: 126 res = self._stream_with_retries(blob_descriptor.size, blob_descriptor, ingestion_properties) 127 if res: 128 return res 129 except KustoApiError as ex: 130 error = ex.get_api_error() 131 if error.permanent: 132 raise 133 except KustoThrottlingError: 134 pass 135 136 return self.queued_client.ingest_from_blob(blob_descriptor, ingestion_properties)
Enqueue an ingest command from azure blobs.
For ManagedStreamingIngestClient, this method first attempts streaming ingestion when the blob fits the streaming size limit, falling back to queued ingestion on oversized blobs, throttling, or transient errors.
To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
Parameters
- azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
class KustoStreamingIngestClient(BaseIngestClient):
    """Kusto streaming ingest client for Python.

    Ingests data directly through KustoClient.execute_streaming_ingest
    (no queue / data-management endpoint involved).
    """

    def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str], auto_correct_endpoint: bool = True):
        """Kusto Streaming Ingest Client constructor.
        :param KustoConnectionStringBuilder kcsb: The connection string to initialize KustoClient.
        :param bool auto_correct_endpoint: when True, rewrites the data source to the query endpoint.
        """
        super().__init__()

        if isinstance(kcsb, str):
            kcsb = KustoConnectionStringBuilder(kcsb)

        if auto_correct_endpoint:
            # Streaming ingestion targets the engine (query) endpoint, not the DM endpoint.
            kcsb["Data Source"] = BaseIngestClient.get_query_endpoint(kcsb.data_source)
        self._kusto_client = KustoClient(kcsb)

    def close(self):
        # Close the underlying KustoClient exactly once.
        if not self._is_closed:
            self._kusto_client.close()
        super().close()

    def set_proxy(self, proxy_url: str):
        """Set proxy for the ingestion client.
        :param str proxy_url: proxy url.
        """
        self._kusto_client.set_proxy(proxy_url)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from local files.
        :param file_descriptor: a FileDescriptor to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        file_descriptor = FileDescriptor.get_instance(file_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)

        super().ingest_from_file(file_descriptor, ingestion_properties)

        stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor)

        with stream_descriptor.stream:
            return self.ingest_from_stream(stream_descriptor, ingestion_properties)

    @distributed_trace(kind=SpanKind.CLIENT)
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams.
        :param azure.kusto.ingest.StreamDescriptor stream_descriptor: An object that contains a description of the stream to
            be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)

        super().ingest_from_stream(stream_descriptor, ingestion_properties)

        return self._ingest_from_stream_with_client_request_id(stream_descriptor, ingestion_properties, None)

    def _ingest_from_stream_with_client_request_id(
        self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties, client_request_id: Optional[str]
    ) -> IngestionResult:
        """Core streaming-ingest path; optionally tags the call with a caller-supplied client request id."""
        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
        additional_properties = None
        if client_request_id:
            additional_properties = ClientRequestProperties()
            additional_properties.client_request_id = client_request_id

        self._kusto_client.execute_streaming_ingest(
            ingestion_properties.database,
            ingestion_properties.table,
            stream_descriptor.stream,
            None,  # no blob url — ingesting from the stream
            ingestion_properties.format.name,
            additional_properties,
            mapping_name=ingestion_properties.ingestion_mapping_reference,
        )

        return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, stream_descriptor.source_id)

    def ingest_from_blob(
        self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties, client_request_id: Optional[str] = None
    ) -> IngestionResult:
        """Ingest an azure blob via the streaming ingest endpoint.
        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :param Optional[str] client_request_id: optional client request id to tag the call with.
        """
        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
        additional_properties = None
        if client_request_id:
            additional_properties = ClientRequestProperties()
            additional_properties.client_request_id = client_request_id

        self._kusto_client.execute_streaming_ingest(
            ingestion_properties.database,
            ingestion_properties.table,
            None,  # no stream — ingesting from the blob url
            blob_descriptor.path,
            ingestion_properties.format.name,
            additional_properties,
            mapping_name=ingestion_properties.ingestion_mapping_reference,
        )
        return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id)
Kusto streaming ingest client for Python. KustoStreamingIngestClient works with both 2.x and 3.x flavors of Python. All primitive types are supported. Tests are run using pytest.
25 def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str], auto_correct_endpoint: bool = True): 26 """Kusto Streaming Ingest Client constructor. 27 :param KustoConnectionStringBuilder kcsb: The connection string to initialize KustoClient. 28 """ 29 super().__init__() 30 31 if isinstance(kcsb, str): 32 kcsb = KustoConnectionStringBuilder(kcsb) 33 34 if auto_correct_endpoint: 35 kcsb["Data Source"] = BaseIngestClient.get_query_endpoint(kcsb.data_source) 36 self._kusto_client = KustoClient(kcsb)
Kusto Streaming Ingest Client constructor.
Parameters
- KustoConnectionStringBuilder kcsb: The connection string to initialize KustoClient.
Set proxy for the ingestion client.
Parameters
- str proxy_url: proxy url.
46 @distributed_trace(kind=SpanKind.CLIENT) 47 def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult: 48 """Ingest from local files. 49 :param file_descriptor: a FileDescriptor to be ingested. 50 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 51 """ 52 file_descriptor = FileDescriptor.get_instance(file_descriptor) 53 IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties) 54 55 super().ingest_from_file(file_descriptor, ingestion_properties) 56 57 stream_descriptor = StreamDescriptor.from_file_descriptor(file_descriptor) 58 59 with stream_descriptor.stream: 60 return self.ingest_from_stream(stream_descriptor, ingestion_properties)
Ingest from local files.
Parameters
- file_descriptor: a FileDescriptor to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
62 @distributed_trace(kind=SpanKind.CLIENT) 63 def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult: 64 """Ingest from io streams. 65 :param azure.kusto.ingest.StreamDescriptor stream_descriptor: An object that contains a description of the stream to 66 be ingested. 67 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 68 """ 69 stream_descriptor = StreamDescriptor.get_instance(stream_descriptor) 70 IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties) 71 72 super().ingest_from_stream(stream_descriptor, ingestion_properties) 73 74 return self._ingest_from_stream_with_client_request_id(stream_descriptor, ingestion_properties, None)
Ingest from io streams.
Parameters
- azure.kusto.ingest.StreamDescriptor stream_descriptor: An object that contains a description of the stream to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
97 def ingest_from_blob( 98 self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties, client_request_id: Optional[str] = None 99 ) -> IngestionResult: 100 IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties) 101 additional_properties = None 102 if client_request_id: 103 additional_properties = ClientRequestProperties() 104 additional_properties.client_request_id = client_request_id 105 106 self._kusto_client.execute_streaming_ingest( 107 ingestion_properties.database, 108 ingestion_properties.table, 109 None, 110 blob_descriptor.path, 111 ingestion_properties.format.name, 112 additional_properties, 113 mapping_name=ingestion_properties.ingestion_mapping_reference, 114 ) 115 return IngestionResult(IngestionStatus.SUCCESS, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id)
class BaseIngestClient(metaclass=ABCMeta):
    """Common base for Kusto ingest clients: closed-state tracking, dataframe ingestion,
    descriptor preparation, and ingest/query endpoint helpers."""

    def __init__(self):
        # Flipped to True by close(); subsequent ingest calls raise KustoClosedError.
        self._is_closed: bool = False

    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from local files.
        :param file_descriptor: a FileDescriptor to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        if self._is_closed:
            raise KustoClosedError()

    @abstractmethod
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams.
        :param stream_descriptor: An object that contains a description of the stream to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        """
        if self._is_closed:
            raise KustoClosedError()

    @abstractmethod
    def set_proxy(self, proxy_url: str):
        """Set proxy for the ingestion client.
        :param str proxy_url: proxy url.
        """
        if self._is_closed:
            raise KustoClosedError()

    def ingest_from_dataframe(
        self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties, data_format: Optional[DataFormat] = None
    ) -> IngestionResult:
        """Enqueue an ingest command from local files.
        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param pandas.DataFrame df: input dataframe to ingest.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON.
        """

        if self._is_closed:
            raise KustoClosedError()

        # Imported lazily so pandas remains an optional dependency.
        from pandas import DataFrame

        if not isinstance(df, DataFrame):
            raise ValueError("Expected DataFrame instance, found {}".format(type(df)))

        is_json = True

        # If we are given CSV mapping, or the mapping format is explicitly set to CSV, we should use CSV
        if not data_format:
            if ingestion_properties is not None and (ingestion_properties.ingestion_mapping_type == IngestionMappingKind.CSV):
                is_json = False
        elif data_format == DataFormat.CSV:
            is_json = False
        elif data_format == DataFormat.JSON:
            is_json = True
        else:
            raise ValueError("Unsupported format: {}. Supported formats are: CSV, JSON, None".format(data_format))

        # Unique temp file name: object id + timestamp + uuid makes collisions practically impossible.
        file_name = "df_{id}_{timestamp}_{uid}.{ext}.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4(), ext="json" if is_json else "csv")
        temp_file_path = os.path.join(tempfile.gettempdir(), file_name)
        with gzip.open(temp_file_path, "wt", encoding="utf-8") as temp_file:
            if is_json:
                df.to_json(temp_file, orient="records", date_format="iso", lines=True)
                ingestion_properties.format = DataFormat.JSON
            else:
                # Header row is omitted, so the service must not skip the first record.
                df.to_csv(temp_file, index=False, encoding="utf-8", header=False)
                ingestion_properties.ignore_first_record = False
                ingestion_properties.format = DataFormat.CSV

        try:
            return self.ingest_from_file(temp_file_path, ingestion_properties)
        finally:
            # Always remove the temp file, even if ingestion raised.
            os.unlink(temp_file_path)

    @staticmethod
    def _prepare_stream(stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> StreamDescriptor:
        """
        Prepares a StreamDescriptor instance for ingest operation based on ingestion properties
        :param StreamDescriptor stream_descriptor: Stream descriptor instance
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return: prepared stream descriptor
        """
        new_descriptor = StreamDescriptor.get_instance(stream_descriptor)

        # Work on the underlying binary buffer of text-mode streams.
        if isinstance(new_descriptor.stream, TextIOWrapper):
            new_descriptor.stream = new_descriptor.stream.buffer

        should_compress = BaseIngestClient._should_compress(new_descriptor, ingestion_properties)
        if should_compress:
            new_descriptor.compress_stream()

        return new_descriptor

    @staticmethod
    def _prepare_file(file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> Tuple[FileDescriptor, bool]:
        """
        Prepares a FileDescriptor instance for ingest operation based on ingestion properties
        :param FileDescriptor file_descriptor: File descriptor instance
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return: tuple of (prepared file descriptor, whether the file should be compressed)
        """
        descriptor = FileDescriptor.get_instance(file_descriptor)

        should_compress = BaseIngestClient._should_compress(descriptor, ingestion_properties)
        return descriptor, should_compress

    @staticmethod
    def _should_compress(new_descriptor: Union[FileDescriptor, StreamDescriptor], ingestion_properties: IngestionProperties) -> bool:
        """
        Checks if descriptor should be compressed based on ingestion properties and current format
        """
        return not new_descriptor.is_compressed and ingestion_properties.format.compressible

    def close(self) -> None:
        self._is_closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @staticmethod
    def get_ingestion_endpoint(cluster_url: str) -> str:
        # Reserved hostnames (localhost, IPs, onebox) serve both roles — leave them untouched.
        if INGEST_PREFIX in cluster_url or not cluster_url or BaseIngestClient.is_reserved_hostname(cluster_url):
            return cluster_url
        else:
            return cluster_url.replace(PROTOCOL_SUFFIX, PROTOCOL_SUFFIX + INGEST_PREFIX, 1)

    @staticmethod
    def get_query_endpoint(cluster_url: str) -> str:
        if INGEST_PREFIX in cluster_url:
            return cluster_url.replace(INGEST_PREFIX, "", 1)
        else:
            return cluster_url

    @staticmethod
    def is_reserved_hostname(raw_uri: str) -> bool:
        """Return True for hosts that must not get an ingest prefix: localhost, raw IPs, and the onebox dev host."""
        url = urlparse(raw_uri)
        if not url.netloc:
            return True
        authority = url.netloc.split(":")[0]  # removes port if exists
        try:
            # Fix: previously the ip_address() object itself leaked into the boolean
            # result, so the function could return an IPv4Address/IPv6Address instead
            # of the annotated bool.
            ipaddress.ip_address(authority)
            is_ip = True
        except ValueError:
            is_ip = False
        is_localhost = "localhost" in authority
        return is_localhost or is_ip or authority.lower() == "onebox.dev.kusto.windows.net"
78 def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult: 79 """Ingest from local files. 80 :param file_descriptor: a FileDescriptor to be ingested. 81 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 82 """ 83 if self._is_closed: 84 raise KustoClosedError()
Ingest from local files.
Parameters
- file_descriptor: a FileDescriptor to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
86 @abstractmethod 87 def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult: 88 """Ingest from io streams. 89 :param stream_descriptor: An object that contains a description of the stream to be ingested. 90 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 91 """ 92 if self._is_closed: 93 raise KustoClosedError()
Ingest from io streams.
Parameters
- stream_descriptor: An object that contains a description of the stream to be ingested.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
95 @abstractmethod 96 def set_proxy(self, proxy_url: str): 97 """Set proxy for the ingestion client. 98 :param str proxy_url: proxy url. 99 """ 100 if self._is_closed: 101 raise KustoClosedError()
Set proxy for the ingestion client.
Parameters
- str proxy_url: proxy url.
103 def ingest_from_dataframe( 104 self, df: "pandas.DataFrame", ingestion_properties: IngestionProperties, data_format: Optional[DataFormat] = None 105 ) -> IngestionResult: 106 """Enqueue an ingest command from local files. 107 To learn more about ingestion methods go to: 108 https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods 109 :param pandas.DataFrame df: input dataframe to ingest. 110 :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties. 111 :param DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON. 112 """ 113 114 if self._is_closed: 115 raise KustoClosedError() 116 117 from pandas import DataFrame 118 119 if not isinstance(df, DataFrame): 120 raise ValueError("Expected DataFrame instance, found {}".format(type(df))) 121 122 is_json = True 123 124 # If we are given CSV mapping, or the mapping format is explicitly set to CSV, we should use CSV 125 if not data_format: 126 if ingestion_properties is not None and (ingestion_properties.ingestion_mapping_type == IngestionMappingKind.CSV): 127 is_json = False 128 elif data_format == DataFormat.CSV: 129 is_json = False 130 elif data_format == DataFormat.JSON: 131 is_json = True 132 else: 133 raise ValueError("Unsupported format: {}. 
Supported formats are: CSV, JSON, None".format(data_format)) 134 135 file_name = "df_{id}_{timestamp}_{uid}.{ext}.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4(), ext="json" if is_json else "csv") 136 temp_file_path = os.path.join(tempfile.gettempdir(), file_name) 137 with gzip.open(temp_file_path, "wt", encoding="utf-8") as temp_file: 138 if is_json: 139 df.to_json(temp_file, orient="records", date_format="iso", lines=True) 140 ingestion_properties.format = DataFormat.JSON 141 else: 142 df.to_csv(temp_file, index=False, encoding="utf-8", header=False) 143 ingestion_properties.ignore_first_record = False 144 ingestion_properties.format = DataFormat.CSV 145 146 try: 147 return self.ingest_from_file(temp_file_path, ingestion_properties) 148 finally: 149 os.unlink(temp_file_path)
Enqueue an ingest command from local files. To learn more about ingestion methods go to: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
Parameters
- pandas.DataFrame df: input dataframe to ingest.
- azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
- DataFormat data_format: Format to convert the dataframe to - Can be DataFormat.CSV, DataFormat.JSON or None. If not specified, it will try to infer it from the mapping, if not found, it will default to JSON.
199 @staticmethod 200 def get_ingestion_endpoint(cluster_url: str) -> str: 201 if INGEST_PREFIX in cluster_url or not cluster_url or BaseIngestClient.is_reserved_hostname(cluster_url): 202 return cluster_url 203 else: 204 return cluster_url.replace(PROTOCOL_SUFFIX, PROTOCOL_SUFFIX + INGEST_PREFIX, 1)
213 @staticmethod 214 def is_reserved_hostname(raw_uri: str) -> bool: 215 url = urlparse(raw_uri) 216 if not url.netloc: 217 return True 218 authority = url.netloc.split(":")[0] # removes port if exists 219 try: 220 is_ip = ipaddress.ip_address(authority) 221 except ValueError: 222 is_ip = False 223 is_localhost = "localhost" in authority 224 return is_localhost or is_ip or authority.lower() == "onebox.dev.kusto.windows.net"