Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,37 @@ async def execute_attacks(
objectives = objectives_by_risk.get(risk_value, [])

if not objectives:
self.logger.info(f"No objectives for {risk_value}, skipping")
self.logger.info(f"No objectives for {risk_value}, recording as failed")
# Record zero-objective categories for every requested strategy
# so _determine_run_status detects the failure and errored
# counts reflect the gap.
from .._utils.formatting_utils import get_strategy_name

failed_entry = {
"data_file": "",
"status": "failed",
"error": "No attack objectives could be prepared for this risk category",
"asr": 0.0,
"expected_count": 0,
}
foundry_strats, special_strats = StrategyMapper.filter_for_foundry(attack_strategies)
for strategy in foundry_strats:
strategy_key = get_strategy_name(strategy)
if strategy_key not in red_team_info:
red_team_info[strategy_key] = {}
red_team_info[strategy_key][risk_value] = {**failed_entry}
for strategy in special_strats:
flat = strategy if not isinstance(strategy, list) else strategy[0]
if flat != AttackStrategy.Baseline:
strategy_key = get_strategy_name(strategy)
if strategy_key not in red_team_info:
red_team_info[strategy_key] = {}
red_team_info[strategy_key][risk_value] = {**failed_entry}
if include_baseline:
strategy_key = get_strategy_name(AttackStrategy.Baseline)
if strategy_key not in red_team_info:
red_team_info[strategy_key] = {}
red_team_info[strategy_key][risk_value] = {**failed_entry}
continue

self.logger.info(f"Processing {len(objectives)} objectives for {risk_value}")
Expand Down Expand Up @@ -186,6 +216,7 @@ async def execute_attacks(
"error": str(e),
"partial_failure": True,
"asr": 0.0,
"expected_count": len(objectives),
}
else:
self.logger.error(f"Error executing attacks for {risk_value}: {e}")
Expand All @@ -197,6 +228,7 @@ async def execute_attacks(
"status": "failed",
"error": str(e),
"asr": 0.0,
"expected_count": len(objectives),
}
continue

Expand All @@ -223,6 +255,7 @@ async def execute_attacks(
output_path=output_path,
attack_strategies=attack_strategies,
include_baseline=include_baseline,
num_objectives=len(objectives),
)

for strategy_name, strategy_data in strategy_results.items():
Expand Down Expand Up @@ -357,6 +390,7 @@ def _group_results_by_strategy(
output_path: str,
attack_strategies: List[Union[AttackStrategy, List[AttackStrategy]]],
include_baseline: bool,
num_objectives: int = 0,
) -> Dict[str, Dict[str, Any]]:
"""Group attack results by strategy for red_team_info format.

Expand All @@ -375,6 +409,8 @@ def _group_results_by_strategy(
:type attack_strategies: List[Union[AttackStrategy, List[AttackStrategy]]]
:param include_baseline: Whether baseline was included in execution
:type include_baseline: bool
:param num_objectives: Number of objectives sent for this risk category
:type num_objectives: int
:return: Dictionary mapping strategy name to result data
:rtype: Dict[str, Dict[str, Any]]
"""
Expand All @@ -395,6 +431,7 @@ def _group_results_by_strategy(
"data_file": output_path,
"status": "completed",
"asr": overall_asr,
"expected_count": num_objectives,
}

# Add entries for special strategies that were executed (e.g., IndirectJailbreak via XPIA)
Expand All @@ -407,6 +444,7 @@ def _group_results_by_strategy(
"data_file": output_path,
"status": "completed",
"asr": overall_asr,
"expected_count": num_objectives,
}

# Add baseline entry if it was included
Expand All @@ -415,6 +453,7 @@ def _group_results_by_strategy(
"data_file": output_path,
"status": "completed",
"asr": overall_asr,
"expected_count": num_objectives,
}

# Fallback if no strategies produced results
Expand All @@ -423,6 +462,7 @@ def _group_results_by_strategy(
"data_file": output_path,
"status": "completed",
"asr": overall_asr,
"expected_count": num_objectives,
}

return results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,16 +1424,55 @@ def _format_thresholds_for_output(self) -> Dict[str, Any]:
return formatted_thresholds

@staticmethod
def _compute_result_count(output_items: List[Dict[str, Any]]) -> Dict[str, int]:
def _extract_expected_total(red_team_info: Optional[Dict]) -> Optional[int]:
"""Extract the total expected objective count from red_team_info.

Each strategy/risk-category entry may carry an ``expected_count`` field
set by the execution layer. Since the same risk category can appear
under multiple strategies (e.g. baseline + crescendo), we de-duplicate
by risk category to avoid double-counting.

:param red_team_info: The red_team_info tracking dictionary
:return: Total expected objectives, or None if information is unavailable
"""
if not isinstance(red_team_info, dict):
return None

# Collect the max expected_count per risk category across strategies
per_risk: Dict[str, int] = {}
for risk_data in red_team_info.values():
if not isinstance(risk_data, dict):
continue
for risk_category, details in risk_data.items():
if not isinstance(details, dict):
continue
count = details.get("expected_count")
if count is not None:
try:
per_risk[risk_category] = max(per_risk.get(risk_category, 0), int(count))
except (ValueError, TypeError):
continue

if not per_risk:
return None

return sum(per_risk.values())

@staticmethod
def _compute_result_count(
output_items: List[Dict[str, Any]],
expected_total: Optional[int] = None,
) -> Dict[str, int]:
"""Aggregate run-level pass/fail counts from individual output items.

Counts reflect attack success rate (ASR) semantics:
- passed: attacks that were unsuccessful (system defended successfully)
- failed: attacks that were successful (system was compromised)
- errored: rows that failed to process due to errors
- errored: rows that failed to process due to errors, plus any
objectives that never produced output items (expected vs actual delta)
"""

total = len(output_items)
actual_total = len(output_items)
passed = failed = errored = 0

for item in output_items:
Expand Down Expand Up @@ -1474,6 +1513,15 @@ def _compute_result_count(output_items: List[Dict[str, Any]]) -> Dict[str, int]:
else:
errored += 1

# Account for objectives that never produced output items (e.g., attack
# execution errors that were caught and swallowed, or risk categories
# with zero objectives prepared).
if expected_total is not None and expected_total > actual_total:
missing = expected_total - actual_total
errored += missing

total = expected_total if expected_total is not None and expected_total > actual_total else actual_total

return {
"total": total,
"passed": passed,
Expand Down Expand Up @@ -1675,7 +1723,7 @@ def _determine_run_status(
if not isinstance(details, dict):
continue
status = details.get("status", "").lower()
if status in ("incomplete", "failed", "timeout", "pending", "running"):
if status in ("incomplete", "failed", "timeout", "pending", "running", "partial_failure"):
return "failed"

return "completed"
Expand Down Expand Up @@ -1769,7 +1817,10 @@ def _build_results_payload(
if run_name is None:
run_name = scan_name or f"redteam-run-{run_id[:8]}"

result_count = self._compute_result_count(output_items)
result_count = self._compute_result_count(
output_items,
expected_total=self._extract_expected_total(red_team_info),
)
per_testing_results = self._compute_per_testing_criteria(output_items)
data_source = self._build_data_source_section(parameters, red_team_info)
status = self._determine_run_status(scan_result, red_team_info, output_items)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,7 @@ def test_group_results_by_strategy_with_indirect_jailbreak(

@pytest.mark.asyncio
async def test_execute_attacks_empty_objectives(self, mock_credential, mock_azure_ai_project, mock_logger):
"""Test execute_attacks with no objectives."""
"""Test execute_attacks with no objectives for any risk category."""
manager = FoundryExecutionManager(
credential=mock_credential,
azure_ai_project=mock_azure_ai_project,
Expand All @@ -1762,12 +1762,103 @@ async def test_execute_attacks_empty_objectives(self, mock_credential, mock_azur
result = await manager.execute_attacks(
objective_target=mock_target,
risk_categories=[RiskCategory.Violence],
attack_strategies=[AttackStrategy.Base64],
attack_strategies=[AttackStrategy.Baseline],
objectives_by_risk={}, # No objectives
)

# Should return empty dict when no objectives
assert result == {}
# When no objectives are available at all the category should be
# recorded as failed in red_team_info so _determine_run_status can
# detect the gap.
assert "baseline" in result
assert "violence" in result["baseline"]
entry = result["baseline"]["violence"]
assert entry["status"] == "failed"
assert entry["expected_count"] == 0
assert "error" in entry

@pytest.mark.asyncio
async def test_execute_attacks_zero_objectives_records_failed(
    self, mock_credential, mock_azure_ai_project, mock_logger
):
    """A risk category with zero objectives must surface as a failed entry in
    red_team_info so that _determine_run_status marks the whole run failed."""
    manager = FoundryExecutionManager(
        credential=mock_credential,
        azure_ai_project=mock_azure_ai_project,
        logger=mock_logger,
        output_dir="/test/output",
    )

    target = MagicMock()

    outcome = await manager.execute_attacks(
        objective_target=target,
        risk_categories=[RiskCategory.Violence, RiskCategory.SelfHarm],
        attack_strategies=[AttackStrategy.Baseline],
        objectives_by_risk={
            "violence": [],  # present but explicitly empty
            # "self_harm" deliberately omitted entirely
        },
    )

    # Every requested category — empty or missing — gets a failed record.
    assert "baseline" in outcome
    for category in ("violence", "self_harm"):
        assert outcome["baseline"][category]["status"] == "failed"

@pytest.mark.asyncio
async def test_execute_attacks_zero_objectives_records_all_strategies(
    self, mock_credential, mock_azure_ai_project, mock_logger
):
    """With zero objectives and several strategies requested, a failed entry
    must appear under every strategy key — not just the baseline one."""
    manager = FoundryExecutionManager(
        credential=mock_credential,
        azure_ai_project=mock_azure_ai_project,
        logger=mock_logger,
        output_dir="/test/output",
    )

    target = MagicMock()

    outcome = await manager.execute_attacks(
        objective_target=target,
        risk_categories=[RiskCategory.Violence],
        attack_strategies=[AttackStrategy.Base64, AttackStrategy.Baseline],
        objectives_by_risk={},  # nothing prepared for any category
    )

    # Each requested strategy carries its own failed record for the category.
    for strategy_key in ("base64", "baseline"):
        assert strategy_key in outcome
        assert outcome[strategy_key]["violence"]["status"] == "failed"

def test_group_results_by_strategy_includes_expected_count(
    self, mock_credential, mock_azure_ai_project, mock_logger
):
    """Entries emitted by _group_results_by_strategy must carry the
    expected_count passed in via num_objectives."""
    manager = FoundryExecutionManager(
        credential=mock_credential,
        azure_ai_project=mock_azure_ai_project,
        logger=mock_logger,
        output_dir="/test/output",
    )

    orchestrator = MagicMock()
    orchestrator.calculate_asr.return_value = 0.5

    grouped = manager._group_results_by_strategy(
        orchestrator=orchestrator,
        risk_value="violence",
        output_path="/test/output.jsonl",
        attack_strategies=[AttackStrategy.Baseline],
        include_baseline=True,
        num_objectives=32,
    )

    baseline_entry = grouped.get("baseline")
    assert baseline_entry is not None
    assert baseline_entry["expected_count"] == 32

@pytest.mark.asyncio
async def test_execute_attacks_filters_multi_turn_without_adversarial(
Expand Down
Loading