Source code for pyrit.auth.azure_storage_auth

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse

from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob import ContainerSasPermissions, UserDelegationKey, generate_container_sas
from azure.storage.blob.aio import BlobServiceClient


class AzureStorageAuth:
    """
    A utility class for Azure Storage authentication, providing methods to generate SAS tokens
    using user delegation keys.
    """

    @staticmethod
    async def get_user_delegation_key(blob_service_client: BlobServiceClient) -> UserDelegationKey:
        """
        Retrieves a user delegation key valid for one day.

        Args:
            blob_service_client (BlobServiceClient): An instance of BlobServiceClient to interact
                with Azure Blob Storage.

        Returns:
            UserDelegationKey: A user delegation key valid for one day.
        """
        # Use a timezone-aware UTC timestamp: the Azure Storage SDK assumes naive datetimes are
        # already UTC, so a naive local time shifts the validity window by the host's UTC offset.
        delegation_key_start_time = datetime.now(timezone.utc)
        delegation_key_expiry_time = delegation_key_start_time + timedelta(days=1)

        user_delegation_key = await blob_service_client.get_user_delegation_key(
            key_start_time=delegation_key_start_time, key_expiry_time=delegation_key_expiry_time
        )
        return user_delegation_key

    @staticmethod
    async def get_sas_token(container_url: str) -> str:
        """
        Generates a SAS token for the specified container using a user delegation key.

        The token grants read, write, create, list and delete permissions. Its validity window
        starts five minutes before the current time and lasts one day from that start.

        Args:
            container_url (str): The URL of the Azure Blob Storage container.

        Returns:
            str: The generated SAS token.

        Raises:
            ValueError: If ``container_url`` is empty, or lacks a scheme, a host, or a
                container name.
        """
        if not container_url:
            raise ValueError(
                "Azure Storage Container URL is not provided. The correct format "
                "is 'https://storageaccountname.core.windows.net/containername'."
            )

        parsed_url = urlparse(container_url)
        # Strip slashes on both sides so "https://host/container/" resolves to "container",
        # and so a bare "/" path is rejected instead of yielding an empty container name.
        container_name = parsed_url.path.strip("/")
        if not parsed_url.scheme or not parsed_url.netloc or not container_name:
            raise ValueError(
                "Invalid Azure Storage Container URL."
                " The correct format is 'https://storageaccountname.core.windows.net/containername'."
            )

        account_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        # The storage account name is the first label of the host name.
        storage_account_name = parsed_url.netloc.split(".")[0]

        credential = DefaultAzureCredential()
        try:
            async with BlobServiceClient(account_url=account_url, credential=credential) as blob_service_client:
                user_delegation_key = await AzureStorageAuth.get_user_delegation_key(
                    blob_service_client=blob_service_client
                )

                # Set start_time 5 minutes before the current time to account for any clock skew.
                # Timezone-aware UTC is used because the SDK assumes naive datetimes are UTC.
                start_time = datetime.now(timezone.utc) - timedelta(minutes=5)
                expiry_time = start_time + timedelta(days=1)

                sas_token = generate_container_sas(
                    account_name=storage_account_name,
                    container_name=container_name,
                    user_delegation_key=user_delegation_key,
                    permission=ContainerSasPermissions(read=True, write=True, create=True, list=True, delete=True),
                    expiry=expiry_time,
                    start=start_time,
                )
        finally:
            # The credential is created here, so it is always closed here, even on failure.
            await credential.close()

        return sas_token