Blob Data Source
The Blob Data Source ingests documents from the documents container in your Azure Storage Account into Azure AI Search and keeps the index synchronized when files are updated or removed. It is designed for production-scale document processing with incremental updates, smart freshness detection, and automated cleanup.
How it Works
The Blob Data Source operates through two independent jobs that can be scheduled separately for optimal resource usage and data freshness.
Indexing job (blob-storage-indexer) scans the configured blob container (optionally filtered by BLOB_PREFIX) and processes documents into searchable chunks. It uses smart freshness detection by comparing each blob's last_modified timestamp with the indexed version, skipping unchanged files to save processing time and costs. For each new or modified file, the indexer downloads the blob, chunks it using Azure Document Intelligence, deletes any existing chunks for that file (by parent_id), and uploads new chunks with stable, search-safe IDs. Each chunk document sets source = "blob" to enable cleanup operations, and when available in blob metadata, includes metadata_security_id for security trimming scenarios.
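A minimal sketch of that per-file path, assuming the azure-identity, azure-storage-blob, and azure-search-documents SDKs. The field names (id, parent_id, content, source), the parent_id scheme, the key encoding, and chunk_text are illustrative assumptions, not the actual module API; the real chunker uses Azure Document Intelligence, and metadata_security_id handling is omitted here.

# Sketch only: freshness check + delete-then-reindex for one blob.
# Field names, the parent_id scheme, and chunk_text() are illustrative assumptions.
import base64
import os

from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.storage.blob import BlobServiceClient

credential = DefaultAzureCredential()
account_url = f"https://{os.environ['STORAGE_ACCOUNT_NAME']}.blob.core.windows.net"
container = BlobServiceClient(account_url, credential=credential).get_container_client(
    os.environ["DOCUMENTS_STORAGE_CONTAINER"])
search = SearchClient(os.environ["SEARCH_SERVICE_QUERY_ENDPOINT"],
                      os.environ["SEARCH_RAG_INDEX_NAME"], credential)

def safe_key(value: str) -> str:
    # Azure AI Search keys allow letters, digits, '_', '-', '='; base64url satisfies that.
    return base64.urlsafe_b64encode(value.encode()).decode().rstrip("=")

def chunk_text(data: bytes) -> list[str]:
    # Stand-in for the Azure Document Intelligence chunker described above.
    text = data.decode("utf-8", errors="ignore")
    return [text[i:i + 2000] for i in range(0, len(text), 2000)] or [""]

def index_blob_if_changed(blob_props, indexed_timestamps: dict) -> str:
    parent_id = f"/documents/{blob_props.name}"
    last_indexed = indexed_timestamps.get(parent_id)
    if last_indexed is not None and blob_props.last_modified <= last_indexed:
        return "skipped"                       # freshness check: blob unchanged since last run

    data = container.download_blob(blob_props.name).readall()
    chunks = chunk_text(data)

    # Delete existing chunks for this file before uploading the new ones.
    stale = search.search(search_text="*", filter=f"parent_id eq '{parent_id}'", select=["id"])
    stale_keys = [{"id": doc["id"]} for doc in stale]
    if stale_keys:
        search.delete_documents(documents=stale_keys)

    search.upload_documents(documents=[{
        "id": f"{safe_key(parent_id)}-{i}",    # stable, search-safe chunk id
        "parent_id": parent_id,
        "content": chunk,
        "source": "blob",                      # lets the purge job scope its cleanup
    } for i, chunk in enumerate(chunks)])
    return "indexed"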
Purging job (blob-storage-purger) maintains index hygiene by removing orphaned documents. It compares parent documents in blob storage with parent documents in the AI Search index (where source == "blob"), identifies chunks belonging to files that no longer exist in storage, and deletes them in batches. The job waits briefly after deletion to ensure accurate document counts, as Azure AI Search uses eventual consistency for index statistics.
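A hedged sketch of the orphan-detection step, reusing the clients and field names assumed above. Enumerating indexed parents via a facet on parent_id is one illustrative approach, not necessarily how the job does it.

# Sketch only: remove index chunks whose source file no longer exists in blob storage.
# The facet-based parent enumeration and field names are assumptions.
def purge_orphans(container, search, batch_size: int = 500) -> set[str]:
    live_parents = {f"/documents/{blob.name}" for blob in container.list_blobs()}

    # Enumerate distinct parent_id values among blob-sourced documents via a facet.
    results = search.search(search_text="*", filter="source eq 'blob'",
                            facets=["parent_id,count:100000"], top=0)
    indexed_parents = {facet["value"] for facet in results.get_facets().get("parent_id", [])}

    orphaned = indexed_parents - live_parents
    for parent_id in orphaned:
        stale = search.search(search_text="*",
                              filter=f"parent_id eq '{parent_id}'", select=["id"])
        keys = [{"id": doc["id"]} for doc in stale]
        for i in range(0, len(keys), batch_size):     # delete in batches
            search.delete_documents(documents=keys[i:i + batch_size])
    return orphaned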
Files are processed in parallel using a configurable semaphore (INDEXER_MAX_CONCURRENCY, default: 4) to balance throughput with service limits. Each file goes through download → security metadata extraction → document chunking → chunk conversion → batch upload. Batch uploads use INDEXER_BATCH_SIZE (default: 500) to optimize AI Search API calls while staying within request size limits.
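The concurrency and batching pattern, sketched below under the assumption of an asyncio-style pipeline (the actual implementation may differ); process_one stands in for the per-file download/chunk/upload step.

# Sketch only: bounded parallelism plus batched uploads; assumes an asyncio-style pipeline.
import asyncio
import os

MAX_CONCURRENCY = int(os.getenv("INDEXER_MAX_CONCURRENCY", "4"))
BATCH_SIZE = int(os.getenv("INDEXER_BATCH_SIZE", "500"))

async def process_all(blobs, process_one):
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

    async def guarded(blob):
        async with semaphore:              # at most MAX_CONCURRENCY files in flight at once
            try:
                return await process_one(blob)
            except Exception as exc:       # per-blob isolation: one failure doesn't abort the run
                return {"blob": getattr(blob, "name", blob), "error": str(exc)}

    return await asyncio.gather(*(guarded(blob) for blob in blobs))

def upload_in_batches(search_client, documents):
    # Keep each AI Search call within request-size limits by capping the batch size.
    for i in range(0, len(documents), BATCH_SIZE):
        search_client.upload_documents(documents=documents[i:i + BATCH_SIZE])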
Supported formats include PDF, Word (.docx), PowerPoint (.pptx), Excel (.xlsx), text (.txt, .md), images (.jpg, .png, .bmp, .tiff), and HTML. OCR extraction is performed automatically for PDFs and images using Azure Document Intelligence.
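For the OCR/layout step, a hedged sketch using the azure-ai-formrecognizer package (the earlier SDK name for Azure Document Intelligence). The endpoint and key variable names are assumptions, and the real chunker may use a different client, model, or chunking strategy.

# Sketch only: layout + OCR extraction with Azure Document Intelligence
# (via the azure-ai-formrecognizer package; env var names are assumptions).
import os

from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential

client = DocumentAnalysisClient(
    endpoint=os.environ["DOCUMENT_INTELLIGENCE_ENDPOINT"],
    credential=AzureKeyCredential(os.environ["DOCUMENT_INTELLIGENCE_KEY"]),
)

with open("employee_handbook.pdf", "rb") as f:
    result = client.begin_analyze_document("prebuilt-layout", document=f).result()

# Page-level text that a chunker could group into search documents.
for page in result.pages:
    text = " ".join(line.content for line in page.lines)
    print(f"page {page.page_number}: {len(text)} characters")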
Ingestion Flow
Indexing Job
┌────────────────────────────────────────────────────────────────────────────────┐
│ BLOB STORAGE INGESTION FLOW │
│ │
│ ┌────────────────────┐ ┌─────────────────────────────────────────┐ │
│ │ Azure Blob │ │ Index State Cache │ │
│ │ Storage │ │ • Load existing timestamps │ │
│ │ • Source Container│◄──────────│ • Build parent_id → lastModified map │ │
│ │ • Blob Metadata │ │ • Detect already-indexed docs │ │
│ │ • Security IDs │ │ (incremental updates) │ │
│ └─────────┬──────────┘ └─────────────────────────────────────────┘ │
│ │ │
│ │ List Blobs + Filter by Prefix │
│ v │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Freshness Check │ │
│ │ │ │
│ │ For each blob: │ │
│ │ • Compare blob.last_modified vs indexed timestamp │ │
│ │ • Skip if blob.last_modified ≤ indexed timestamp │ │
│ │ • Queue for processing if blob is newer or not indexed │ │
│ └───────────────────────────────┬─────────────────────────────────────────┘ │
│ │ │
└──────────────────────────────────┼─────────────────────────────────────────────┘
│
v
┌────────────────────────────────────────────────────────────────────────────────┐
│ PROCESSING PIPELINE │
│ │
│ ┌──────────────────────────────────────────────────────────────────────────┐ │
│ │ blob_storage_indexer.py │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Download │ │ Security │ │ Document │ │ Chunk │ │ │
│ │ │ Blob │─>│ Metadata │─>│ Chunker │─>│ Conversion │ │ │
│ │ │ (Binary) │ │ Extraction │ │ (PDF/Docs) │ │ to Search │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ └──────┬───────┘ │ │
│ │ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │ │
│ │ │ Index │ │ Delete Old │◄──────────────────────────┘ │ │
│ │ │ Batch │◄─│ Parent Docs │ (Remove existing chunks) │ │
│ │ │ Upload │ │ by parent_id│ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│ │
│ • Parallel Processing: Configurable semaphore (INDEXER_MAX_CONCURRENCY) │
│ • Batch Size: 500 docs per AI Search batch (INDEXER_BATCH_SIZE) │
│ • Error Handling: Per-blob try/catch with individual file logs │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
│
v
┌────────────────────────────────────────────────────────────────────────────────┐
│ OUTPUT & STORAGE │
│ │
│ ┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ Azure AI Search │ │ Azure Blob Storage │ │ Telemetry │ │
│ │ • Indexed Chunks │ │ • Run Summaries │ │ • App Insights │ │
│ │ • Vector Embeddings │ │ • Per-File Logs │ │ • Structured Logs │ │
│ │ • Security IDs │ │ • Processing State │ │ • Performance │ │
│ │ • Metadata │ │ (jobs container) │ │ • Error Tracking │ │
│ │ source=blob │ │ │ │ │ │
│ └──────────────────────┘ └──────────────────────┘ └──────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Scheduling
Jobs are enabled through CRON expressions:
- CRON_RUN_BLOB_INDEX: runs the indexing job
- CRON_RUN_BLOB_PURGE: runs the purge job
- Leave unset to disable a job
The scheduler uses SCHEDULER_TIMEZONE (IANA format, e.g., Europe/Berlin), falling back to the host machine’s timezone if not specified.
On startup, if a CRON expression is configured, the corresponding job is scheduled and also triggered once immediately.
Examples:
- 0 * * * * → hourly
- */15 * * * * → every 15 minutes
- 0 0 * * * → daily at midnight
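A minimal sketch of this scheduling behavior, assuming APScheduler (the document does not show the actual scheduler implementation); run_blob_indexer and run_blob_purger are placeholders for the real job entry points.

# Sketch only: CRON-driven scheduling with an immediate first run; assumes APScheduler.
import os

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

def run_blob_indexer():   # placeholder for the real indexing job
    print("indexing...")

def run_blob_purger():    # placeholder for the real purge job
    print("purging...")

tz = os.getenv("SCHEDULER_TIMEZONE")               # IANA name, e.g. Europe/Berlin
scheduler = BlockingScheduler(timezone=tz) if tz else BlockingScheduler()

def register(env_name, job):
    expr = os.getenv(env_name)
    if not expr:
        return                                     # unset CRON -> job disabled
    scheduler.add_job(job, CronTrigger.from_crontab(expr, timezone=tz))
    job()                                          # also trigger once immediately on startup

register("CRON_RUN_BLOB_INDEX", run_blob_indexer)
register("CRON_RUN_BLOB_PURGE", run_blob_purger)
scheduler.start()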
Settings
- STORAGE_ACCOUNT_NAME and DOCUMENTS_STORAGE_CONTAINER: source location
- SEARCH_SERVICE_QUERY_ENDPOINT and SEARCH_RAG_INDEX_NAME: target index
- BLOB_PREFIX (optional): restricts the scan scope
- JOBS_LOG_CONTAINER (default: jobs): container for logs
- INDEXER_MAX_CONCURRENCY: concurrency, default: 4
- INDEXER_BATCH_SIZE: batch size, default: 500
INDEXER_MAX_CONCURRENCY controls how many files are processed in parallel (download → chunk → upload). INDEXER_BATCH_SIZE controls how many chunk documents are sent in each upload call to Azure AI Search. Increase these to raise throughput, but watch for throttling (HTTP 429), timeouts, and memory usage; lower them if you see retries or instability. The default batch size (500) follows common guidance to keep batches reasonable (typically ≤ 1000).
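A sketch of how these settings map to environment variables, with the defaults noted above; the settings dict itself is illustrative, not the module's actual configuration object.

# Sketch only: the environment variables above with their documented defaults.
import os

settings = {
    "storage_account": os.environ["STORAGE_ACCOUNT_NAME"],
    "documents_container": os.environ["DOCUMENTS_STORAGE_CONTAINER"],
    "search_endpoint": os.environ["SEARCH_SERVICE_QUERY_ENDPOINT"],
    "index_name": os.environ["SEARCH_RAG_INDEX_NAME"],
    "blob_prefix": os.getenv("BLOB_PREFIX", ""),                    # optional scope filter
    "jobs_container": os.getenv("JOBS_LOG_CONTAINER", "jobs"),
    "max_concurrency": int(os.getenv("INDEXER_MAX_CONCURRENCY", "4")),
    "batch_size": int(os.getenv("INDEXER_BATCH_SIZE", "500")),
}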
Logs
Both jobs write logs to the configured jobs container. Logs are grouped by job type:
- Indexer (blob-storage-indexer)
  - Per-file logs and per-run summaries under files/ and runs/
  - Summaries include: sourceFiles, candidates, success/failed, totalChunksUploaded
- Purger (blob-storage-purger)
  - Per-run summaries under runs/
  - Summaries include: blobDocumentsCount, indexParentsCountBefore/After, indexChunkDocumentsBefore, indexParentsPurged, indexChunkDocumentsDeleted
Observability
The blob storage indexer emits structured Application Insights events (RUN-*, ITEM-*) with JSON payloads embedded in the message field.
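The payload fields referenced by the queries below come from those events. How the events are emitted is not shown in this document; a hedged sketch of the pattern (a JSON payload appended to the log message) might look like this.

# Sketch only: emitting a RUN-COMPLETE event with an embedded JSON payload.
# The logger wiring to Application Insights is omitted; field names mirror the queries below.
import json
import logging

logger = logging.getLogger("blob-storage-indexer")

def log_run_complete(run_id: str, summary: dict) -> None:
    payload = {"event": "RUN-COMPLETE", "runId": run_id, **summary}
    # The queries below extract everything from the first '{' onward and parse it as JSON.
    logger.info("blob-storage-indexer RUN-COMPLETE %s", json.dumps(payload))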
Latest Job Runs
View recent indexing jobs with key metrics:
let Logs = union isfuzzy=true traces, AppTraces;
Logs
| where message contains "RUN-COMPLETE" and message contains "blob-storage-indexer"
| extend payload = parse_json(extract('\\{.*', 0, message))
| where tostring(payload.event) == "RUN-COMPLETE"
| project timestamp,
runId = tostring(payload.runId),
status = tostring(payload.status),
sourceFiles = toint(payload.sourceFiles),
itemsDiscovered = toint(payload.itemsDiscovered),
indexedItems = toint(payload.indexedItems),
skippedNoChange = toint(payload.skippedNoChange),
failed = toint(payload.failed),
totalChunksUploaded = toint(payload.totalChunksUploaded),
durationSeconds = todouble(payload.durationSeconds)
| order by timestamp desc
Purge Summary Stats
Inspect the most recent purge runs and their cleanup metrics:
let Logs = union isfuzzy=true traces, AppTraces;
Logs
| where message contains "blob-storage-purger"
and message contains "Summary:"
| extend payload = parse_json(extract('\\{.*', 0, message))
| extend runStartedAt = todatetime(payload.runStartedAt),
runFinishedAt = todatetime(payload.runFinishedAt)
| project
timestamp,
runStartedAt,
runFinishedAt,
durationSeconds = datetime_diff("second", runFinishedAt, runStartedAt),
blobDocumentsCount = toint(payload.blobDocumentsCount),
indexParentsCountBefore = toint(payload.indexParentsCountBefore),
indexChunkDocumentsBefore = toint(payload.indexChunkDocumentsBefore),
indexParentsPurged = toint(payload.indexParentsPurged),
indexChunkDocumentsDeleted= toint(payload.indexChunkDocumentsDeleted),
indexParentsCountAfter = toint(payload.indexParentsCountAfter)
| order by timestamp desc
Items in Specific Indexer Run
List all files processed during a particular run:
let TargetRunId = '20251121T231125Z';
let Logs = union isfuzzy=true traces, AppTraces;
Logs
| where message contains "ITEM-COMPLETE" and message contains "blob-storage-indexer"
| extend payload = parse_json(extract('\\{.*', 0, message))
| where tostring(payload.event) == "ITEM-COMPLETE" and tostring(payload.runId) == TargetRunId
| project timestamp,
blobName = tostring(payload.blobName),
parentId = tostring(payload.parentId),
status = tostring(payload.status),
totalChunks = toint(payload.totalChunks),
contentType = tostring(payload.contentType),
fileUrl = tostring(payload.fileUrl)
| order by timestamp desc
File Indexing History
Track processing history for a specific file:
let TargetParent = '/documents/employee_handbook.pdf';
let Logs = union isfuzzy=true traces, AppTraces;
Logs
| where message contains "ITEM-COMPLETE" and message contains "blob-storage-indexer"
| extend payload = parse_json(extract('\\{.*', 0, message))
| where tostring(payload.event) == "ITEM-COMPLETE" and tostring(payload.parentId) == TargetParent
| project timestamp,
runId = tostring(payload.runId),
blobName = tostring(payload.blobName),
status = tostring(payload.status),
totalChunks = toint(payload.totalChunks),
contentType = tostring(payload.contentType),
fileUrl = tostring(payload.fileUrl)
| order by timestamp desc
Recent Indexer Errors
View recent warnings and errors:
let Logs = union isfuzzy=true traces, AppTraces;
Logs
| where severityLevel >= 3 and message contains "blob-storage-indexer"
| extend payload = parse_json(extract('\\{.*', 0, message))
| project timestamp,
severityLevel,
event = tostring(payload.event),
runId = tostring(payload.runId),
blobName = tostring(payload.blobName),
parentId = tostring(payload.parentId),
error = tostring(payload.error),
message
| order by timestamp desc