# eval_benchmark_multithreaded.py """Unified benchmarking script for ReCall, ZeroSearch, and R1‑Searcher with optional multi‑threaded execution. Example usage (single‑threaded) ------------------------------- ```bash python eval_benchmark.py \ --dataset frames \ --agent r1-searcher \ --model-url http://0.0.0.0:1233 \ --out-base /tmp/evals \ --mode single ``` Example usage (multi‑threaded, 128 workers) ------------------------------------------ ```bash python eval_benchmark.py \ --dataset frames \ --agent recall \ --model-url http://0.0.0.0:1231 \ --out-base /tmp/evals \ --mode multi \ --workers 128 ``` The script will: 1. Load the specified dataset JSONL file that contains objects with keys `question` and `answer`. 2. Build the chosen agent wrapper (`recall`, `zerosearch`, or `r1-searcher`). 3. Stream one JSONL line per example with *all* details needed for analysis. 4. Optionally run the evaluation loop in parallel using a configurable number of worker threads. 5. Automatically construct the output path as: ``` {out_base}/{model_name}/{dataset}.jsonl ``` where `model_name` is derived from the `--model-url` (characters after the last `/`). """ from __future__ import annotations import argparse import json import logging import os import pathlib import re import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, List import unicodedata from openai import OpenAI, APIStatusError from tqdm import tqdm # -------------------------------------------------------------------- # Agent imports (ensure PYTHONPATH is set appropriately) # -------------------------------------------------------------------- from re_call import ReCall # user's wrapper # from re_call import ZeroSearchInference, ZeroSearchConfig # from re_call import R1Searcher, R1SearchConfig as R1Cfg # from re_call import O1Cfg, O1Searcher from pathlib import Path # from re_call import SDSCfg, SDSSearcher # -------------------------------------------------------------------- # Environment Keys – override with real keys or environment variables # -------------------------------------------------------------------- #for recall # search_env = "from search_api import web_search, web_visit" # search_schemas =[ # { # "name": "web_search", # "description": "Google search and return links to web-pages with a brief snippet given a text query", # "parameters": { # "type": "object", # "properties": { # "query": {"type": "string"}, # }, # "required": ["query"], # }, # }, # { # "name": "web_visit", # "description": "Visit webpage and return its content", # "parameters": { # "type": "object", # "properties": { # "url": {"type": "string", "description": "The URL of the webpage to visit. Must be a single URL"}, # }, # "required": ["url"], # }, # } # ] # for recall search_env = "from search_api import search_urls, open_url, search_and_parse_query, query_url" search_schemas =[ { "name": "search_urls", "description": "Google search and return links to web-pages with a brief snippet given a text query", "parameters": { "type": "object", "properties": { "query": {"type": "string"}, "top_k": {"type": "integer", "default": 10}, }, "required": ["query"], }, }, { "name": "query_url", "description": "Visit webpage and return evidence based retrival for the provided goal", "parameters": { "type": "object", "properties": { "url": {"type": "string", "description": "The URL of the webpage to visit. Must be a single URL"}, "goal": {"type": "string", "description": "The specific information goal for visiting webpage"}, }, "required": ["url", "goal"], }, } ] EXECUTOR_URL = os.environ["HOST_SERPER_URL"] DATA_ROOT = pathlib.Path("./eval_datasets") SEM = threading.Semaphore(3) # limit concurrent judge calls JUDGE_MODEL = "gpt-4.1-mini" try: base = Path(__file__).resolve().parent except NameError: # e.g., REPL/Jupyter base = Path.cwd() TOKENIZER_DIR = (base / "tokenizer-info").resolve() # ───────────────────────── tokenizer ──────────────────────────────────────── try: from transformers import AutoTokenizer tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_DIR, trust_remote_code=True) except Exception as e: import sys sys.exit(f"❌ Could not load Qwen3 tokenizer: {e}") import hashlib def get_uid(sample: dict) -> str: """Generate a UID using SHA256 hash of question.""" return hashlib.sha256(sample["question"].strip().encode("utf-8")).hexdigest() # -------------------------------------------------------------------- # Regex & utilities # -------------------------------------------------------------------- def extract_answer_tagged(text: str) -> str: ANS_RE = re.compile(r"(.*?)", re.S) match = ANS_RE.findall(text) if match : return match[-1].strip().lower() else: print("No answer tags found") return text[-200:] #because o1-searcher fails to follow format def extract_answer_boxed(response): def remove_boxed(s): if "\\boxed " in s: left = "\\boxed " assert s[:len(left)] == left return s[len(left):] left = "\\boxed{" assert s[:len(left)] == left assert s[-1] == "}" return s[len(left):-1] def last_boxed_only_string(string): idx = string.rfind("\\boxed") if "\\boxed " in string: return "\\boxed " + string.split("\\boxed ")[-1].split("$")[0] if idx < 0: idx = string.rfind("\\fbox") if idx < 0: return None i = idx right_brace_idx = None num_left_braces_open = 0 while i < len(string): if string[i] == "{": num_left_braces_open += 1 if string[i] == "}": num_left_braces_open -= 1 if num_left_braces_open == 0: right_brace_idx = i break i += 1 if right_brace_idx is None: retval = None else: retval = string[idx:right_brace_idx + 1] return retval answer = remove_boxed(last_boxed_only_string(response)) return answer JUDGE_SYS = """ You are an impartial judge evaluating the correctness of a model's answer against a ground-truth answer for a given question. Your task is to: 1. Compare the model's answer to the ground-truth answer. 2. Determine if the model's answer is correct or incorrect. **Input Format:** - Question: {question} - Ground Truth: {ground_truth} - Model Answer: {model_answer} **Output Format:** correct/incorrect/unknown **Guidelines:** - The model's answer is correct if it matches the ground-truth answer in meaning and content it is case-insensitive, ignore minor punctuation or formatting differences. - If the model's answer contains additional information, it is still correct as long as the core answer matches the ground truth. - Be precise output a single word correct / incorrect / unknown and **nothing else** - For MCQ questions match the option ID A. B. C. or D. if its correct the answer is correct. """ # - If the model's answer is partially correct or contains errors, it is incorrect. # Thread‑local OpenAI client cache def _oa() -> OpenAI: th = threading.current_thread() if not hasattr(th, "_oa"): th._oa = OpenAI() return th._oa def judge(q: str, gt: str, pred: str) -> str: if pred == "": return "unknown" prompt = JUDGE_SYS.format(question=q, ground_truth=gt, model_answer=pred) try: with SEM: resp = _oa().chat.completions.create( model=JUDGE_MODEL, messages=[ {"role": "system", "content": JUDGE_SYS}, {"role": "user", "content": prompt}, ], temperature=0.0, max_tokens=100, ) return resp.choices[0].message.content.strip().lower() except APIStatusError: return "unknown" # -------------------------------------------------------------------- # Agent factory # -------------------------------------------------------------------- def build_agent(kind: str, model_url: str): kind = kind.lower() print(kind) if kind == "recall": return ReCall(executor_url=EXECUTOR_URL) else: raise ValueError(f"Unknown agent kind: {kind}") # if kind == "o1-search" or kind == "sds": # cfg = O1Cfg() # return O1Searcher(cfg, thinker_url=model_url) # if kind == "zerosearch": # cfg = ZeroSearchConfig(thinker_url=model_url) # return ZeroSearchInference(cfg) # if kind in ("r1-search", "r1-searcher", "r1"): # cfg = R1Cfg(serper_api_key=os.getenv("SERPER_API_KEY", "")) # return R1Searcher(cfg=cfg, model_url=model_url) # raise ValueError(f"Unknown agent kind: {kind}") # -------------------------------------------------------------------- # Core evaluation routine for a single example (thread‑safe) # -------------------------------------------------------------------- def evaluate_example(example: Dict[str, str], agent_kind: str, model_url: str) -> Dict[str, str]: """Run one example through the pipeline and return result row.""" question = example["question"].strip() answer_gt = example["answer"].strip() idx = example["id"].strip() # Build a *fresh* agent per thread to avoid shared‑state issues agent = build_agent(agent_kind, model_url=model_url) if agent_kind == "recall" and model_url == "deepseek-ai/DeepSeek-R1": # print(agent_kind) # print("B"*100) transcript, tool_calls = agent.run_deepseek( env=search_env, func_schemas=search_schemas, question=question, model_name="deepseek-ai/DeepSeek-R1", temperature=0.6, max_tokens=40960, # tokenizer = tokenizer ) elif agent_kind == "recall": transcript, tool_calls, chat = agent.run( env=search_env, func_schemas=search_schemas, question=question, model_url=model_url, temperature=0.6, max_new_tokens=40960, tokenizer = tokenizer ) # tool_calls = agent.extract_tool_calls(transcript) else: # zerosearch or r1‑searcher transcript, tool_calls = agent.run(question) if agent_kind in [ "r1-searcher", "zerosearch", # "o1-search", ]: pred = extract_answer_tagged(transcript) if agent_kind in [ "recall", "SDS" "o1-searcher" ]: try: pred = extract_answer_boxed(transcript) except: print("falling to last string") pred = transcript[-200:] else: try: pred = extract_answer_boxed(transcript) except: print("falling to last string") pred = transcript[-200:] verdict = judge(question, answer_gt.lower(), pred.lower()) return { "id": idx, "question": question, "answer_gt": answer_gt, "model_answer": pred, "judge": verdict, "tool_calls": tool_calls, "transcript": transcript, "chat": chat } # -------------------------------------------------------------------- # CLI entry‑point # -------------------------------------------------------------------- def build_output_path(out_base, agent, dataset, name) -> pathlib.Path: """Construct output path as {out_base}/{model_name}/{dataset}.jsonl.""" return out_base / f"{agent}" / f"{dataset}-{name}.jsonl" def normalize(s: str) -> str: return unicodedata.normalize("NFKD", s.strip().lower()) def load_existing_results(path: pathlib.Path) -> tuple[list[dict], set[str]]: results = [] uids = set() if not path.exists(): return results, uids with open(path, "r", encoding="utf-8") as f: for line in f: try: row = json.loads(line) if row['model_answer'] != "": results.append(row) uids.add(row["id"]) except Exception: continue return results, uids def main(): parser = argparse.ArgumentParser(description="Benchmark QA agents on a dataset (single or multi‑threaded)") parser.add_argument("--dataset", required=True, help="dataset name (frames, …)") parser.add_argument("--agent", required=True, choices=["recall", "zerosearch", "r1-searcher", "o1-search", "SDS", "deepseek-r1"], help="agent wrapper") parser.add_argument("--out", required=True, help="base directory for outputs") parser.add_argument("--model-url", required=False, help="URL of the model server") parser.add_argument("--limit", type=int, default=0, help="optional cap on number of questions") parser.add_argument("--mode", choices=["single", "multi"], default="single", help="execution mode") parser.add_argument("--workers", type=int, default=8, help="number of worker threads for multi‑mode") parser.add_argument("--name", type=str, default="", help="suffix for save dir") args = parser.parse_args() # ---------------------------------------------------------------- # Dataset loading # ---------------------------------------------------------------- ds_path = DATA_ROOT / f"{args.dataset}.jsonl" if not ds_path.exists(): raise FileNotFoundError(ds_path) with ds_path.open() as f: data = [json.loads(line) for line in f] # ---------------------------------------------------------------- # Output path setup # ---------------------------------------------------------------- out_base = pathlib.Path(args.out).expanduser().resolve() out_path = build_output_path(out_base, args.agent, args.dataset, args.name) print(out_path) out_path.parent.mkdir(parents=True, exist_ok=True) if args.limit: data = data[: args.limit] # data = data[246:] correct = 0 start_time = time.perf_counter() # ---------------------------------------------------------------- # SINGLE‑THREADED EXECUTION # ---------------------------------------------------------------- if args.mode == "single": with open(out_path, "w", encoding="utf-8") as fout: for ex in tqdm(data, desc="QA loop (single)"): row = evaluate_example(ex, args.agent, args.model_url) if row["judge"] == "correct": correct += 1 # context for row row.update({"agent": args.agent, "dataset": args.dataset}) fout.write(json.dumps(row, ensure_ascii=False) + "\n") fout.flush() # ---------------------------------------------------------------- # MULTI‑THREADED EXECUTION # ---------------------------------------------------------------- else: workers = max(1, args.workers) logging.info("Running in multi‑threaded mode with %d workers", workers) with ThreadPoolExecutor(max_workers=workers) as executor, open(out_path, "a", encoding="utf-8") as fout: futures = {executor.submit(evaluate_example, ex, args.agent, args.model_url): ex for ex in data} for fut in tqdm(as_completed(futures), total=len(futures), desc="QA loop (multi)"): try: row = fut.result() except Exception as exc: logging.exception("Evaluation failed: %s", exc) continue # print(row['id']) if row["judge"] == "correct": correct += 1 row.update({"agent": args.agent, "dataset": args.dataset}) fout.write(json.dumps(row, ensure_ascii=False) + "\n") fout.flush() elapsed = time.perf_counter() - start_time accuracy = correct / len(data) if data else 0.0 print(f"Accuracy: {correct}/{len(data)} = {accuracy:.1%}") print(f"Elapsed time: {elapsed:.2f}s ({elapsed/len(data):.2f}s per example)") if __name__ == "__main__": main()