# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""
Scenario class for grouping and executing multiple AtomicAttacks.
This module provides the Scenario class that orchestrates the execution of multiple
AtomicAttack instances sequentially, enabling comprehensive security testing campaigns.
"""
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Type
from tqdm.auto import tqdm
from pyrit.memory import CentralMemory
from pyrit.models import AttackResult
from pyrit.models.scenario_result import ScenarioIdentifier, ScenarioResult
from pyrit.prompt_target import PromptTarget
from pyrit.scenarios.atomic_attack import AtomicAttack
from pyrit.scenarios.scenario_strategy import ScenarioStrategy
logger = logging.getLogger(__name__)
[docs]
class Scenario(ABC):
"""
Groups and executes multiple AtomicAttack instances sequentially.
A Scenario represents a comprehensive testing campaign composed of multiple
atomic attack tests (AtomicAttacks). It executes each AtomicAttack in sequence and
aggregates the results into a ScenarioResult.
Example:
>>> from pyrit.scenarios import Scenario, AtomicAttack
>>> from pyrit.executor.attack.single_turn.prompt_sending import PromptSendingAttack
>>> from pyrit.prompt_target import OpenAIChatTarget
>>> from pyrit.prompt_converter import Base64Converter
>>>
>>> target = OpenAIChatTarget()
>>>
>>> # Create a custom scenario subclass
>>> class MyScenario(Scenario):
... async def _get_atomic_attacks_async(self) -> List[AtomicAttack]:
... base64_attack = PromptSendingAttack(
... objective_target=target,
... converters=[Base64Converter()]
... )
... return [
... AtomicAttack(
... attack=base64_attack,
... objectives=["Tell me how to make a bomb"]
... )
... ]
>>>
>>> # Create and execute scenario
>>> scenario = MyScenario(
... name="Security Test Campaign",
... version=1,
... attack_strategies=["base64"]
... )
>>> await scenario.initialize_async()
>>> result = await scenario.run_async()
>>> print(f"Completed {len(result.attack_results)} tests")
"""
[docs]
def __init__(
self,
*,
name: str,
version: int,
max_concurrency: int = 1,
memory_labels: Optional[Dict[str, str]] = None,
objective_target: Optional[PromptTarget] = None,
objective_scorer_identifier: Optional[Dict[str, str]] = None,
) -> None:
"""
Initialize a scenario.
Args:
name (str): Descriptive name for the scenario.
version (int): Version number of the scenario.
max_concurrency (int): Maximum number of concurrent attack executions. Defaults to 1.
memory_labels (Optional[Dict[str, str]]): Additional labels to apply to all
attack runs in the scenario. These help track and categorize the scenario.
objective_target (Optional[PromptTarget]): The target system to attack.
objective_scorer_identifier (Optional[Dict[str, str]]): Identifier for the objective scorer.
Note:
Attack runs are populated by calling initialize_async(), which invokes the
subclass's _get_attack_runs_async() method.
The scenario description is automatically extracted from the class's docstring (__doc__)
with whitespace normalized for display.
"""
# Use the class docstring with normalized whitespace as description
description = " ".join(self.__class__.__doc__.split()) if self.__class__.__doc__ else ""
self._identifier = ScenarioIdentifier(
name=type(self).__name__, scenario_version=version, description=description
)
self._objective_target = objective_target
if not objective_target:
raise ValueError("Objective target must be provided.")
self._objective_target_identifier = objective_target.get_identifier()
self._objective_scorer_identifier = objective_scorer_identifier or {}
self._name = name
self._memory_labels = memory_labels or {}
self._memory = CentralMemory.get_memory_instance()
self._max_concurrency = max_concurrency
self._atomic_attacks: List[AtomicAttack] = []
@property
def name(self) -> str:
"""Get the name of the scenario."""
return self._name
@property
def atomic_attack_count(self) -> int:
"""Get the number of atomic attacks in this scenario."""
return len(self._atomic_attacks)
[docs]
@classmethod
@abstractmethod
def get_strategy_class(cls) -> Type[ScenarioStrategy]:
"""
Get the strategy enum class for this scenario.
This abstract method must be implemented by all scenario subclasses to return
the ScenarioStrategy enum class that defines the available attack strategies
for the scenario.
Returns:
Type[ScenarioStrategy]: The strategy enum class (e.g., FoundryStrategy, EncodingStrategy).
Example:
>>> class MyScenario(Scenario):
... @classmethod
... def get_strategy_class(cls) -> Type[ScenarioStrategy]:
... return MyStrategy
>>>
>>> # Registry can now discover strategies without instantiation
>>> strategy_class = MyScenario.get_strategy_class()
>>> all_strategies = list(strategy_class)
"""
pass
[docs]
@classmethod
@abstractmethod
def get_default_strategy(cls) -> ScenarioStrategy:
"""
Get the default strategy used when no strategies are specified.
This abstract method must be implemented by all scenario subclasses to return
the default aggregate strategy (like EASY, ALL) used when scenario_strategies
parameter is None.
Returns:
ScenarioStrategy: The default aggregate strategy (e.g., FoundryStrategy.EASY, EncodingStrategy.ALL).
Example:
>>> class MyScenario(Scenario):
... @classmethod
... def get_default_strategy(cls) -> ScenarioStrategy:
... return MyStrategy.EASY
>>>
>>> # Registry can discover default strategy without instantiation
>>> default = MyScenario.get_default_strategy()
"""
pass
[docs]
async def initialize_async(self) -> None:
"""
Initialize the scenario by populating self._atomic_attacks
This method allows scenarios to be initialized with atomic attacks after construction,
which is useful when atomic attacks require async operations to be built.
Args:
atomic_attacks: List of AtomicAttack instances to execute in this scenario.
Returns:
Scenario: Self for method chaining.
Example:
>>> scenario = MyScenario(
... objective_target=target,
... attack_strategies=["base64", "leetspeak"]
... )
>>> atomic_attacks = await scenario.build_atomic_attacks_async()
>>> await scenario.initialize_async()
>>> results = await scenario.run_async()
"""
self._atomic_attacks = await self._get_atomic_attacks_async()
@abstractmethod
async def _get_atomic_attacks_async(self) -> List[AtomicAttack]:
"""
Retrieve the list of AtomicAttack instances in this scenario.
This method can be overridden by subclasses to perform async operations
needed to build or fetch the atomic attacks.
Returns:
List[AtomicAttack]: The list of AtomicAttack instances in this scenario.
"""
pass
[docs]
async def run_async(self) -> ScenarioResult:
"""
Execute all atomic attacks in the scenario sequentially.
Each AtomicAttack is executed in order, and all results are aggregated
into a ScenarioResult containing the scenario metadata and all attack results.
Args:
max_concurrency (int): Maximum number of concurrent attack executions
within each AtomicAttack. Defaults to 1 for sequential execution.
Returns:
ScenarioResult: Contains scenario identifier and aggregated list of all
attack results from all atomic attacks.
Raises:
ValueError: If the scenario has no atomic attacks configured. If your scenario
requires initialization, call await scenario.initialize() first.
Example:
>>> result = await scenario.run_async(max_concurrency=3)
>>> print(f"Scenario: {result.scenario_identifier.name}")
>>> print(f"Total results: {len(result.attack_results)}")
>>> for attack_result in result.attack_results:
... print(f"Objective: {attack_result.objective}, Outcome: {attack_result.outcome}")
"""
if not self._atomic_attacks:
raise ValueError(
"Cannot run scenario with no atomic attacks. Either supply them in initialization or"
"call await scenario.initialize_async() first."
)
logger.info(f"Starting scenario '{self._name}' execution with {len(self._atomic_attacks)} atomic attacks")
all_results: Dict[str, List[AttackResult]] = {}
for i, atomic_attack in enumerate(
tqdm(self._atomic_attacks, desc=f"Executing {self._name}", unit="attack"), start=1
):
logger.info(f"Executing atomic attack {i}/{len(self._atomic_attacks)} in scenario '{self._name}'")
try:
atomic_results = await atomic_attack.run_async(max_concurrency=self._max_concurrency)
all_results.setdefault(atomic_attack.atomic_attack_name, []).extend(atomic_results.results)
logger.info(
f"Atomic attack {i}/{len(self._atomic_attacks)} completed with "
f"{len(atomic_results.results)} results"
)
except Exception as e:
logger.error(
f"Atomic attack {i}/{len(self._atomic_attacks)} failed in scenario '{self._name}': {str(e)}"
)
raise ValueError(f"Failed to execute atomic attack {i} in scenario '{self._name}': {str(e)}") from e
logger.info(f"Scenario '{self._name}' completed successfully with {len(all_results)} total results")
result = ScenarioResult(
scenario_identifier=self._identifier,
objective_target_identifier=self._objective_target_identifier,
objective_scorer_identifier=self._objective_scorer_identifier,
labels=self._memory_labels,
attack_results=all_results,
)
self._memory.add_scenario_results_to_memory(scenario_results=[result])
return result