""" Synthetic ad queue generation. Generates a complete queue of ads for a given task configuration, including all pre-generated investigation data. When the agent investigates, the environment just reveals pre-computed data. """ from __future__ import annotations import random from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Tuple from .advertiser_profiles import AdvertiserProfile, generate_advertiser_profile from .fraud_patterns import FRAUD_TEMPLATES, LEGIT_TEMPLATES, AdTemplate from .landing_pages import LandingPageData, generate_landing_page from .network_generator import FraudRing, generate_fraud_networks # Decoy pools: values that can appear in both legit and fraud ads, # making naive pattern-matching unreliable. _DECOY_REGISTRARS = ["NameSilo", "Cloudflare Registrar", "GoDaddy", "Tucows (privacy proxy)"] _DECOY_PAYMENT_TYPES = ["credit_card", "prepaid_card", "corporate_card"] _COMMON_TARGETING_SEGMENTS = [ "Adults 25-54, interests: shopping, lifestyle", "Adults 18-45, interests: technology, gadgets", "Adults 30-55, interests: finance, investing", ] # Curriculum escalation category pools. `_TASK_1_FRAUD_POOL` is the novice # fraudster's toolkit (only two obvious scam templates + legit camouflage), # `_TASK_2_FRAUD_POOL` adds mid-tier deceptive patterns, and `task_3` uses # the server-side default which includes the network_* ring categories. _LEGIT_CAMOUFLAGE = ("ecommerce", "saas", "local_service", "education", "fitness") _TASK_1_ALLOWED_CATEGORIES: List[str] = list(_LEGIT_CAMOUFLAGE) + [ "fake_giveaway", "miracle_cure", ] _TASK_2_ALLOWED_CATEGORIES: List[str] = _TASK_1_ALLOWED_CATEGORIES + [ "counterfeit_goods", "advance_fee", "fake_crypto", "celebrity_endorsement_fraud", "clone_brand", "gray_area_supplements", ] @dataclass class TaskConfig: task_id: str name: str difficulty: str queue_size: int action_budget: int n_legit: int n_fraud: int n_escalate: int include_networks: bool n_fraud_rings: int allowed_difficulties: List[str] description: str max_rounds: Optional[int] = None max_proposals: Optional[int] = None max_fraudster_actions_per_turn: Optional[int] = None max_investigator_actions_per_turn: Optional[int] = None allowed_fraud_categories: Optional[List[str]] = None TASK_CONFIGS: Dict[str, TaskConfig] = { "task_1": TaskConfig( task_id="task_1", name="Basic Ad Triage", difficulty="easy", queue_size=5, action_budget=25, n_legit=2, n_fraud=3, n_escalate=0, include_networks=False, n_fraud_rings=0, allowed_difficulties=["easy"], description=( "Learn the investigation loop. Queue of 5 ads with obviously " "fraudulent or clearly legitimate signals. Generous budget of 25 " "actions (5 per ad). Novice Fraudster: only fake-giveaway and " "miracle-cure templates allowed. Capped at 3 proposals so the " "queue never exceeds 8 ads (~3 actions per ad even after the " "Fraudster maxes out)." ), max_rounds=4, max_proposals=3, max_fraudster_actions_per_turn=3, max_investigator_actions_per_turn=6, allowed_fraud_categories=_TASK_1_ALLOWED_CATEGORIES, ), "task_2": TaskConfig( task_id="task_2", name="Sophisticated Fraud Under Budget Pressure", difficulty="medium", queue_size=12, action_budget=30, n_legit=5, n_fraud=5, n_escalate=2, include_networks=False, n_fraud_rings=0, allowed_difficulties=["easy", "medium"], description=( "Triage under budget constraints. Mix of legit ads, sophisticated " "scams, and gray-area cases. 12 ads but only 30 actions (~2.5 per ad). " "Agent must prioritize which ads to investigate deeply. " "Mid-tier Fraudster: adds counterfeit, clone-brand, advance-fee, " "crypto, celebrity-endorsement, and gray-area supplement templates." ), max_rounds=4, max_proposals=6, max_fraudster_actions_per_turn=3, max_investigator_actions_per_turn=6, allowed_fraud_categories=_TASK_2_ALLOWED_CATEGORIES, ), "task_3": TaskConfig( task_id="task_3", name="Coordinated Fraud Network Detection", difficulty="hard", queue_size=20, action_budget=35, n_legit=6, n_fraud=10, n_escalate=4, include_networks=True, n_fraud_rings=3, allowed_difficulties=["easy", "medium", "hard"], description=( "Full challenge including coordinated fraud rings. 20 ads with 3 " "hidden fraud networks using varied topologies (cliques, chains, " "hub-and-spoke). Budget of 35 actions (~1.75 per ad). Ring member " "ads look borderline individually — the agent must cross-reference " "investigation data across ads to detect shared signals. " "Sophisticated Fraudster: 5 rounds, 7 proposals, full category " "palette including network_* ring templates." ), max_rounds=5, max_proposals=7, max_fraudster_actions_per_turn=3, max_investigator_actions_per_turn=7, allowed_fraud_categories=None, ), # Held-out generalisation eval — same template universe + ring topologies # as task_3, but a strictly tighter budget regime (25 ads / 30 actions = # ~1.2 actions/ad vs task_3's ~1.75) and one extra ring. No training # seeds in TRAINING_SEED_TIERS — this task's seeds (4001..4005 in # eval_suite.EVAL_SEEDS) are reserved for measuring whether the trained # Investigator generalises beyond the budget distribution it was trained # on, not just to fresh seeds within the same budget. See # ANALYSIS.md §3.1 and ROUND_2_Q5_REALISM_REWARDS_TRAINING.md §5.1. "task_3_unseen": TaskConfig( task_id="task_3_unseen", name="Networks Under Tighter Budget (Held-out Eval)", difficulty="hard", queue_size=25, action_budget=30, n_legit=8, n_fraud=12, n_escalate=5, include_networks=True, n_fraud_rings=4, allowed_difficulties=["easy", "medium", "hard"], description=( "Held-out generalisation eval. Same fraud + escalate templates " "and ring topologies as task_3, but the budget regime is " "deliberately unseen: 25 ads with only 30 actions (~1.2/ad vs " "task_3's ~1.75) and 4 hidden rings instead of 3. Used by " "eval_suite.run_before_after to test whether the Investigator " "learned the underlying detection skill or just over-fit to the " "training budget distribution. Never appears in TRAINING_SEED_TIERS." ), max_rounds=5, max_proposals=8, max_fraudster_actions_per_turn=3, max_investigator_actions_per_turn=7, allowed_fraud_categories=None, ), } @dataclass class CampaignProfile: """Campaign-level metadata associated with an ad.""" objective: str # e.g. "conversions", "traffic", "awareness", "app_installs" bid_strategy: str # e.g. "lowest_cost", "cost_cap", "bid_cap" daily_budget_usd: float ad_set_count: int placements: List[str] def to_investigation_text(self, account_age_days: int) -> str: budget_age_ratio = ( self.daily_budget_usd / max(account_age_days, 1) ) placements_str = ", ".join(self.placements) lines = [ f"Campaign Objective: {self.objective}", f"Bid Strategy: {self.bid_strategy}", f"Daily Budget: ${self.daily_budget_usd:,.2f} " f"(account is {account_age_days} days old — " f"budget/age ratio: ${budget_age_ratio:,.2f}/day)", f"Active Ad Sets: {self.ad_set_count}", f"Placements: {placements_str}", ] warnings = [] if budget_age_ratio > 50: warnings.append( "Budget-to-account-age ratio exceeds typical thresholds." ) if self.ad_set_count > 15: warnings.append( f"High ad set count ({self.ad_set_count}) — " "possible policy evasion testing via creative variation." ) if self.objective in ("traffic", "awareness") and self.bid_strategy == "lowest_cost": warnings.append( f"Optimizing for {self.objective} with lowest-cost bidding " "— common in spray-and-pray fraud campaigns." ) if "Audience Network" in self.placements and len(self.placements) <= 2: warnings.append( "Heavy reliance on Audience Network placement — " "higher bot traffic exposure." ) if warnings: for w in warnings: lines.append(f" WARNING: {w}") else: lines.append("Budget and pacing consistent with historical account behavior.") return "\n".join(lines) @dataclass class Ad: ad_id: str ad_copy: str category: str targeting_summary: str initial_risk_signals: List[str] ground_truth_label: str # "fraud", "legit", or "escalate" fraud_type: str severity: float difficulty: str @dataclass class GeneratedEpisode: """All pre-generated data for one episode.""" task_config: TaskConfig ads: List[Ad] advertiser_profiles: Dict[str, AdvertiserProfile] campaign_profiles: Dict[str, CampaignProfile] landing_pages: Dict[str, LandingPageData] fraud_rings: List[FraudRing] ad_to_rings: Dict[str, List[str]] investigation_data: Dict[str, Dict[str, str]] def generate_episode(seed: int, task_id: str = "task_1") -> GeneratedEpisode: """Generate a complete episode with all pre-computed investigation data.""" rng = random.Random(seed) config = TASK_CONFIGS[task_id] ads = _generate_ad_queue(rng, config) fraud_ad_ids = [a.ad_id for a in ads if a.ground_truth_label == "fraud"] fraud_rings: List[FraudRing] = [] ad_to_rings: Dict[str, List[str]] = {} ring_shared_payments: Dict[str, str] = {} if config.include_networks and config.n_fraud_rings > 0: fraud_rings, ad_to_rings = generate_fraud_networks( rng, config.n_fraud_rings, fraud_ad_ids ) for ring in fraud_rings: if "payment_method" in ring.shared_signals: for ad_id in ring.member_ad_ids: ring_shared_payments[ad_id] = ring.shared_signals["payment_method"] advertiser_profiles: Dict[str, AdvertiserProfile] = {} campaign_profiles: Dict[str, CampaignProfile] = {} landing_pages: Dict[str, LandingPageData] = {} investigation_data: Dict[str, Dict[str, str]] = {} ring_campaign_overrides: Dict[str, Dict[str, Any]] = {} ring_created_dates: Dict[str, str] = {} for ring in fraud_rings: shared_objective = rng.choice(["traffic", "awareness"]) shared_bid = "lowest_cost" # Ring members share account creation dates within the same week from datetime import date, timedelta base_date = date(2026, 4, 6) - timedelta(days=rng.randint(5, 45)) for ad_id in ring.member_ad_ids: ring_campaign_overrides[ad_id] = { "objective": shared_objective, "bid_strategy": shared_bid, } offset = timedelta(days=rng.randint(0, 6)) ring_created_dates[ad_id] = (base_date + offset).isoformat() for ad in ads: is_fraud = ad.ground_truth_label in ("fraud", "escalate") profile = generate_advertiser_profile( rng, ad.ad_id, is_fraud, payment_method_id=ring_shared_payments.get(ad.ad_id), ring_created_date=ring_created_dates.get(ad.ad_id), ) advertiser_profiles[ad.ad_id] = profile campaign = _generate_campaign_profile( rng, ad, is_fraud, ring_overrides=ring_campaign_overrides.get(ad.ad_id), ) campaign_profiles[ad.ad_id] = campaign landing_page_kwargs = {} if ad.ad_id in ad_to_rings: ring = next(r for r in fraud_rings if ad.ad_id in r.member_ad_ids) if "domain_registrar" in ring.shared_signals: landing_page_kwargs["registrar_override"] = ring.shared_signals["domain_registrar"] elif not is_fraud and rng.random() < 0.25: landing_page_kwargs["registrar_override"] = rng.choice(_DECOY_REGISTRARS) lp = generate_landing_page( rng, ad.ad_id, is_fraud, ad.fraud_type, **landing_page_kwargs ) landing_pages[ad.ad_id] = lp inv = {} inv["advertiser_history"] = profile.to_investigation_text() inv["landing_page"] = lp.to_investigation_text() inv["payment_method"] = _generate_payment_investigation(rng, profile, ad.ad_id, ad_to_rings, fraud_rings) inv["targeting_overlap"] = _generate_targeting_investigation(rng, ad, ads, ad_to_rings, fraud_rings) inv["campaign_structure"] = _generate_campaign_investigation( rng, ad, campaign, profile, ad_to_rings, fraud_rings, ) inv["policy_classifier"] = _generate_policy_classifier_investigation(ad, lp) investigation_data[ad.ad_id] = inv return GeneratedEpisode( task_config=config, ads=ads, advertiser_profiles=advertiser_profiles, campaign_profiles=campaign_profiles, landing_pages=landing_pages, fraud_rings=fraud_rings, ad_to_rings=ad_to_rings, investigation_data=investigation_data, ) def _generate_ad_queue(rng: random.Random, config: TaskConfig) -> List[Ad]: """Build the ad queue by sampling from templates.""" ads: List[Ad] = [] ad_counter = 0 legit_templates = [t for t in LEGIT_TEMPLATES] fraud_templates = [ t for t in FRAUD_TEMPLATES if t.difficulty in config.allowed_difficulties and t.label == "fraud" ] escalate_templates = [ t for t in FRAUD_TEMPLATES if t.difficulty in config.allowed_difficulties and t.label == "escalate" ] if not escalate_templates: escalate_templates = [ t for t in FRAUD_TEMPLATES if t.label == "escalate" ] for _ in range(config.n_legit): template = rng.choice(legit_templates) idx = rng.randint(0, len(template.ad_copies) - 1) ad_counter += 1 ads.append(Ad( ad_id=f"ad_{ad_counter:03d}", ad_copy=template.ad_copies[idx], category=template.category, targeting_summary=template.targeting_hints[idx % len(template.targeting_hints)], initial_risk_signals=list(template.risk_signals), ground_truth_label=template.label, fraud_type=template.fraud_type, severity=template.severity, difficulty=template.difficulty, )) for _ in range(config.n_fraud): if fraud_templates: template = rng.choice(fraud_templates) else: template = rng.choice(FRAUD_TEMPLATES) idx = rng.randint(0, len(template.ad_copies) - 1) ad_counter += 1 ads.append(Ad( ad_id=f"ad_{ad_counter:03d}", ad_copy=template.ad_copies[idx], category=template.category, targeting_summary=template.targeting_hints[idx % len(template.targeting_hints)], initial_risk_signals=list(template.risk_signals), ground_truth_label="fraud", fraud_type=template.fraud_type, severity=template.severity, difficulty=template.difficulty, )) for _ in range(config.n_escalate): if escalate_templates: template = rng.choice(escalate_templates) idx = rng.randint(0, len(template.ad_copies) - 1) ad_counter += 1 ads.append(Ad( ad_id=f"ad_{ad_counter:03d}", ad_copy=template.ad_copies[idx], category=template.category, targeting_summary=template.targeting_hints[idx % len(template.targeting_hints)], initial_risk_signals=list(template.risk_signals), ground_truth_label="escalate", fraud_type=template.fraud_type, severity=template.severity, difficulty=template.difficulty, )) rng.shuffle(ads) renumbered = [] for i, ad in enumerate(ads): ad.ad_id = f"ad_{i + 1:03d}" renumbered.append(ad) return renumbered def _generate_payment_investigation( rng: random.Random, profile: AdvertiserProfile, ad_id: str, ad_to_rings: Dict[str, List[str]], fraud_rings: List[FraudRing], ) -> str: """Generate payment method investigation text. Ring signals are embedded as raw data values (shared payment IDs) without explicitly naming other ads. The agent must cross-reference across ads. """ lines = [ f"Payment Method Analysis for {ad_id}:", f" Method type: {profile.payment_method_type}", f" Payment ID: {profile.payment_method_id}", ] if profile.payment_method_type in ("prepaid_card", "crypto", "virtual_card"): lines.append(f" Note: {profile.payment_method_type} payments have elevated fraud correlation in platform data.") if profile.previous_violations > 0: lines.append(f" Chargeback/dispute history: {profile.previous_violations} incident(s) on record.") else: lines.append(" Chargeback/dispute history: Clean record.") velocity = rng.randint(1, 5) if ad_id not in ad_to_rings else rng.randint(3, 12) lines.append(f" Payment method added to {velocity} advertiser account(s) in the last 90 days.") if profile.account_age_days < 30: lines.append(f" First charge on this method: {profile.account_age_days} days ago.") return "\n".join(lines) def _generate_targeting_investigation( rng: random.Random, ad: Ad, all_ads: List[Ad], ad_to_rings: Dict[str, List[str]], fraud_rings: List[FraudRing], ) -> str: """Generate targeting overlap investigation text. Ring members share an exact targeting fingerprint, presented as raw data. The agent must compare fingerprints across ads to detect collusion. """ lines = [ f"Targeting Analysis for {ad.ad_id}:", f" Declared targeting: {ad.targeting_summary}", ] if ad.ad_id in ad_to_rings: ring = next(r for r in fraud_rings if ad.ad_id in r.member_ad_ids) if "targeting_overlap" in ring.shared_signals: lines.append(f" Targeting fingerprint: {ring.shared_signals['targeting_overlap']}") overlap_pct = rng.randint(85, 98) lines.append(f" Audience overlap with platform average for category: {overlap_pct}%") else: fingerprint = f"seg_{rng.randint(10000, 99999)}" lines.append(f" Targeting fingerprint: {fingerprint}") overlap_pct = rng.randint(20, 55) lines.append(f" Audience overlap with platform average for category: {overlap_pct}%") else: fingerprint = f"seg_{rng.randint(10000, 99999)}" lines.append(f" Targeting fingerprint: {fingerprint}") similar = [a for a in all_ads if a.ad_id != ad.ad_id and a.category == ad.category] if similar: overlap_pct = rng.randint(30, 65) lines.append(f" {len(similar)} other ad(s) in same category ({ad.category}) in queue.") lines.append(f" Audience overlap with platform average for category: {overlap_pct}%") else: overlap_pct = rng.randint(10, 40) lines.append(f" Audience overlap with platform average for category: {overlap_pct}%") geo_regions = rng.randint(1, 8) if ad.ground_truth_label != "legit" else rng.randint(1, 3) lines.append(f" Geographic regions targeted: {geo_regions}") return "\n".join(lines) def _generate_policy_classifier_investigation( ad: Ad, landing_page: Optional[LandingPageData] = None, ) -> str: """Mock Llama Guard 3 / Purple Llama classification for the ad. Wraps ``policy_classifier_data.classify_ad``. Deterministic per ad_id (seeded RNG inside the classifier), ground-truth correlated, and produces the same text shape the Investigator sees for every other investigation target. See ``counterfeint/data/policy_classifier_data.py`` for the category taxonomy and marker heuristics. """ from .policy_classifier_data import classify_ad landing_text = landing_page.content_summary if landing_page is not None else "" result = classify_ad( ad_id=ad.ad_id, ad_copy=ad.ad_copy, landing_page_text=landing_text, ground_truth_label=ad.ground_truth_label, fraud_type=ad.fraud_type or None, ) return result.to_investigation_text() _LEGIT_OBJECTIVES = ["conversions", "leads", "sales", "app_installs"] _FRAUD_OBJECTIVES = ["traffic", "awareness", "reach", "engagement"] _LEGIT_BID_STRATEGIES = ["cost_cap", "bid_cap", "target_cost"] _FRAUD_BID_STRATEGIES = ["lowest_cost", "lowest_cost", "lowest_cost", "cost_cap"] _LEGIT_PLACEMENTS = [ ["Facebook Feed", "Instagram Feed"], ["Facebook Feed", "Instagram Feed", "Instagram Stories"], ["Facebook Feed"], ["Facebook Feed", "Instagram Feed", "Instagram Reels"], ] _FRAUD_PLACEMENTS = [ ["Audience Network", "Facebook Feed"], ["Audience Network", "Facebook Feed", "Instagram Stories"], ["Facebook Feed", "Instagram Feed", "Audience Network", "Messenger"], ["Audience Network"], ] def _generate_campaign_profile( rng: random.Random, ad: Ad, is_fraud: bool, *, ring_overrides: Optional[Dict[str, Any]] = None, ) -> CampaignProfile: """Generate campaign-level metadata for an ad.""" if is_fraud: objective = rng.choice(_FRAUD_OBJECTIVES) bid_strategy = rng.choice(_FRAUD_BID_STRATEGIES) daily_budget = round(rng.uniform(500, 5000), 2) ad_set_count = rng.randint(8, 50) placements = rng.choice(_FRAUD_PLACEMENTS) else: objective = rng.choice(_LEGIT_OBJECTIVES) bid_strategy = rng.choice(_LEGIT_BID_STRATEGIES) daily_budget = round(rng.uniform(20, 500), 2) ad_set_count = rng.randint(1, 5) placements = rng.choice(_LEGIT_PLACEMENTS) if ring_overrides: objective = ring_overrides.get("objective", objective) bid_strategy = ring_overrides.get("bid_strategy", bid_strategy) return CampaignProfile( objective=objective, bid_strategy=bid_strategy, daily_budget_usd=daily_budget, ad_set_count=ad_set_count, placements=list(placements), ) def _generate_campaign_investigation( rng: random.Random, ad: Ad, campaign: CampaignProfile, profile: AdvertiserProfile, ad_to_rings: Dict[str, List[str]], fraud_rings: List[FraudRing], ) -> str: """Generate campaign structure investigation text. Ring members share campaign configurations but no explicit cross-references. The agent must compare objective/bid/budget patterns across ads. """ lines = [ f"Campaign Structure Analysis for {ad.ad_id}:", campaign.to_investigation_text(profile.account_age_days), ] config_hash = f"cfg_{hash((campaign.objective, campaign.bid_strategy)) & 0xFFFF:04x}" lines.append(f" Campaign configuration fingerprint: {config_hash}") return "\n".join(lines) # --------------------------------------------------------------------------- # Fraudster-proposal extension (Round 2) # --------------------------------------------------------------------------- def _category_to_fraud_template(category: str) -> AdTemplate: """Pick the closest matching FRAUD_TEMPLATE for a Fraudster-declared category.""" for tmpl in FRAUD_TEMPLATES: if tmpl.category == category: return tmpl return FRAUD_TEMPLATES[0] def generate_proposal_data( *, rng: random.Random, ad_id: str, ad_copy: str, category: str, landing_page_blurb: Optional[str] = None, targeting_summary: Optional[str] = None, existing_ads: Optional[List[Ad]] = None, ) -> Tuple[Ad, Dict[str, str], AdvertiserProfile, CampaignProfile, "LandingPageData"]: """ Build a fully-formed Ad + investigation_data for a Fraudster-proposed ad. The Fraudster controls the *surface*: ad_copy, category, landing page blurb, targeting summary. Underlying account / payment / campaign signals are sampled from the fraud-mode distribution so the Investigator has a real detection task. Returns ------- ad The Ad object (ground_truth_label="fraud"). investigation_data Dict[str, str] keyed by investigation target name (the 6 canonical targets), already rendered to text. profile, campaign, landing_page The auxiliary data structures, returned in case the caller wants to register them on a GeneratedEpisode. """ template = _category_to_fraud_template(category) ad = Ad( ad_id=ad_id, ad_copy=ad_copy.strip()[:2000] if ad_copy else template.ad_copies[0], category=category, targeting_summary=( targeting_summary.strip()[:512] if targeting_summary else template.targeting_hints[0] ), initial_risk_signals=list(template.risk_signals), ground_truth_label="fraud", fraud_type=template.fraud_type or "fraudster_proposal", severity=template.severity if template.severity > 0 else 0.6, difficulty=template.difficulty, ) profile = generate_advertiser_profile(rng, ad_id, is_fraud=True) campaign = _generate_campaign_profile(rng, ad, is_fraud=True) landing_page = generate_landing_page(rng, ad_id, is_fraud=True, fraud_type=ad.fraud_type) if landing_page_blurb: from dataclasses import replace landing_page = replace( landing_page, content_summary=landing_page_blurb.strip()[:2000], ) siblings = list(existing_ads or []) siblings.append(ad) investigation_data: Dict[str, str] = { "advertiser_history": profile.to_investigation_text(), "landing_page": landing_page.to_investigation_text(), "payment_method": _generate_payment_investigation( rng, profile, ad_id, ad_to_rings={}, fraud_rings=[] ), "targeting_overlap": _generate_targeting_investigation( rng, ad, siblings, ad_to_rings={}, fraud_rings=[] ), "campaign_structure": _generate_campaign_investigation( rng, ad, campaign, profile, ad_to_rings={}, fraud_rings=[] ), "policy_classifier": _generate_policy_classifier_investigation(ad, landing_page), } return ad, investigation_data, profile, campaign, landing_page