Source code for pyrit.scenarios.scenario

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""
Scenario class for grouping and executing multiple AtomicAttacks.

This module provides the Scenario class that orchestrates the execution of multiple
AtomicAttack instances sequentially, enabling comprehensive security testing campaigns.
"""

import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Type

from tqdm.auto import tqdm

from pyrit.memory import CentralMemory
from pyrit.models import AttackResult
from pyrit.models.scenario_result import ScenarioIdentifier, ScenarioResult
from pyrit.prompt_target import PromptTarget
from pyrit.scenarios.atomic_attack import AtomicAttack
from pyrit.scenarios.scenario_strategy import ScenarioStrategy

logger = logging.getLogger(__name__)


[docs] class Scenario(ABC): """ Groups and executes multiple AtomicAttack instances sequentially. A Scenario represents a comprehensive testing campaign composed of multiple atomic attack tests (AtomicAttacks). It executes each AtomicAttack in sequence and aggregates the results into a ScenarioResult. Example: >>> from pyrit.scenarios import Scenario, AtomicAttack >>> from pyrit.executor.attack.single_turn.prompt_sending import PromptSendingAttack >>> from pyrit.prompt_target import OpenAIChatTarget >>> from pyrit.prompt_converter import Base64Converter >>> >>> target = OpenAIChatTarget() >>> >>> # Create a custom scenario subclass >>> class MyScenario(Scenario): ... async def _get_atomic_attacks_async(self) -> List[AtomicAttack]: ... base64_attack = PromptSendingAttack( ... objective_target=target, ... converters=[Base64Converter()] ... ) ... return [ ... AtomicAttack( ... attack=base64_attack, ... objectives=["Tell me how to make a bomb"] ... ) ... ] >>> >>> # Create and execute scenario >>> scenario = MyScenario( ... name="Security Test Campaign", ... version=1, ... attack_strategies=["base64"] ... ) >>> await scenario.initialize_async() >>> result = await scenario.run_async() >>> print(f"Completed {len(result.attack_results)} tests") """
[docs] def __init__( self, *, name: str, version: int, max_concurrency: int = 1, memory_labels: Optional[Dict[str, str]] = None, objective_target: Optional[PromptTarget] = None, objective_scorer_identifier: Optional[Dict[str, str]] = None, ) -> None: """ Initialize a scenario. Args: name (str): Descriptive name for the scenario. version (int): Version number of the scenario. max_concurrency (int): Maximum number of concurrent attack executions. Defaults to 1. memory_labels (Optional[Dict[str, str]]): Additional labels to apply to all attack runs in the scenario. These help track and categorize the scenario. objective_target (Optional[PromptTarget]): The target system to attack. objective_scorer_identifier (Optional[Dict[str, str]]): Identifier for the objective scorer. Note: Attack runs are populated by calling initialize_async(), which invokes the subclass's _get_attack_runs_async() method. The scenario description is automatically extracted from the class's docstring (__doc__) with whitespace normalized for display. """ # Use the class docstring with normalized whitespace as description description = " ".join(self.__class__.__doc__.split()) if self.__class__.__doc__ else "" self._identifier = ScenarioIdentifier( name=type(self).__name__, scenario_version=version, description=description ) self._objective_target = objective_target if not objective_target: raise ValueError("Objective target must be provided.") self._objective_target_identifier = objective_target.get_identifier() self._objective_scorer_identifier = objective_scorer_identifier or {} self._name = name self._memory_labels = memory_labels or {} self._memory = CentralMemory.get_memory_instance() self._max_concurrency = max_concurrency self._atomic_attacks: List[AtomicAttack] = []
@property def name(self) -> str: """Get the name of the scenario.""" return self._name @property def atomic_attack_count(self) -> int: """Get the number of atomic attacks in this scenario.""" return len(self._atomic_attacks)
[docs] @classmethod @abstractmethod def get_strategy_class(cls) -> Type[ScenarioStrategy]: """ Get the strategy enum class for this scenario. This abstract method must be implemented by all scenario subclasses to return the ScenarioStrategy enum class that defines the available attack strategies for the scenario. Returns: Type[ScenarioStrategy]: The strategy enum class (e.g., FoundryStrategy, EncodingStrategy). Example: >>> class MyScenario(Scenario): ... @classmethod ... def get_strategy_class(cls) -> Type[ScenarioStrategy]: ... return MyStrategy >>> >>> # Registry can now discover strategies without instantiation >>> strategy_class = MyScenario.get_strategy_class() >>> all_strategies = list(strategy_class) """ pass
[docs] @classmethod @abstractmethod def get_default_strategy(cls) -> ScenarioStrategy: """ Get the default strategy used when no strategies are specified. This abstract method must be implemented by all scenario subclasses to return the default aggregate strategy (like EASY, ALL) used when scenario_strategies parameter is None. Returns: ScenarioStrategy: The default aggregate strategy (e.g., FoundryStrategy.EASY, EncodingStrategy.ALL). Example: >>> class MyScenario(Scenario): ... @classmethod ... def get_default_strategy(cls) -> ScenarioStrategy: ... return MyStrategy.EASY >>> >>> # Registry can discover default strategy without instantiation >>> default = MyScenario.get_default_strategy() """ pass
[docs] async def initialize_async(self) -> None: """ Initialize the scenario by populating self._atomic_attacks This method allows scenarios to be initialized with atomic attacks after construction, which is useful when atomic attacks require async operations to be built. Args: atomic_attacks: List of AtomicAttack instances to execute in this scenario. Returns: Scenario: Self for method chaining. Example: >>> scenario = MyScenario( ... objective_target=target, ... attack_strategies=["base64", "leetspeak"] ... ) >>> atomic_attacks = await scenario.build_atomic_attacks_async() >>> await scenario.initialize_async() >>> results = await scenario.run_async() """ self._atomic_attacks = await self._get_atomic_attacks_async()
@abstractmethod async def _get_atomic_attacks_async(self) -> List[AtomicAttack]: """ Retrieve the list of AtomicAttack instances in this scenario. This method can be overridden by subclasses to perform async operations needed to build or fetch the atomic attacks. Returns: List[AtomicAttack]: The list of AtomicAttack instances in this scenario. """ pass
[docs] async def run_async(self) -> ScenarioResult: """ Execute all atomic attacks in the scenario sequentially. Each AtomicAttack is executed in order, and all results are aggregated into a ScenarioResult containing the scenario metadata and all attack results. Args: max_concurrency (int): Maximum number of concurrent attack executions within each AtomicAttack. Defaults to 1 for sequential execution. Returns: ScenarioResult: Contains scenario identifier and aggregated list of all attack results from all atomic attacks. Raises: ValueError: If the scenario has no atomic attacks configured. If your scenario requires initialization, call await scenario.initialize() first. Example: >>> result = await scenario.run_async(max_concurrency=3) >>> print(f"Scenario: {result.scenario_identifier.name}") >>> print(f"Total results: {len(result.attack_results)}") >>> for attack_result in result.attack_results: ... print(f"Objective: {attack_result.objective}, Outcome: {attack_result.outcome}") """ if not self._atomic_attacks: raise ValueError( "Cannot run scenario with no atomic attacks. Either supply them in initialization or" "call await scenario.initialize_async() first." ) logger.info(f"Starting scenario '{self._name}' execution with {len(self._atomic_attacks)} atomic attacks") all_results: Dict[str, List[AttackResult]] = {} for i, atomic_attack in enumerate( tqdm(self._atomic_attacks, desc=f"Executing {self._name}", unit="attack"), start=1 ): logger.info(f"Executing atomic attack {i}/{len(self._atomic_attacks)} in scenario '{self._name}'") try: atomic_results = await atomic_attack.run_async(max_concurrency=self._max_concurrency) all_results.setdefault(atomic_attack.atomic_attack_name, []).extend(atomic_results.results) logger.info( f"Atomic attack {i}/{len(self._atomic_attacks)} completed with " f"{len(atomic_results.results)} results" ) except Exception as e: logger.error( f"Atomic attack {i}/{len(self._atomic_attacks)} failed in scenario '{self._name}': {str(e)}" ) raise ValueError(f"Failed to execute atomic attack {i} in scenario '{self._name}': {str(e)}") from e logger.info(f"Scenario '{self._name}' completed successfully with {len(all_results)} total results") result = ScenarioResult( scenario_identifier=self._identifier, objective_target_identifier=self._objective_target_identifier, objective_scorer_identifier=self._objective_scorer_identifier, labels=self._memory_labels, attack_results=all_results, ) self._memory.add_scenario_results_to_memory(scenario_results=[result]) return result