diff --git a/packages/gooddata-eval/README.md b/packages/gooddata-eval/README.md index 44733933f..0d9d7da27 100644 --- a/packages/gooddata-eval/README.md +++ b/packages/gooddata-eval/README.md @@ -91,6 +91,7 @@ Both provider name and provider id are accepted as the prefix. | Flag | Default | Description | |---|---|---| | `--runs K` | `2` | Independent runs per item (pass@K). An item passes if any run passes. | +| `--concurrency K` | `1` | Number of items evaluated concurrently. `1` = sequential (default). Increase to load-test the agent under simultaneous requests. Progress output interleaves when K > 1. | #### Output diff --git a/packages/gooddata-eval/src/gooddata_eval/cli/main.py b/packages/gooddata-eval/src/gooddata_eval/cli/main.py index ff041f18b..95aa0d442 100644 --- a/packages/gooddata-eval/src/gooddata_eval/cli/main.py +++ b/packages/gooddata-eval/src/gooddata_eval/cli/main.py @@ -52,6 +52,13 @@ def _build_parser() -> argparse.ArgumentParser: ), ) run.add_argument("--runs", type=int, default=2, help="Independent runs per item (pass@K). Default 2.") + run.add_argument( + "--concurrency", + type=int, + default=1, + help="Number of items evaluated concurrently (default 1 = sequential). " + "Increase to load-test the agent under simultaneous requests.", + ) run.add_argument("--json", dest="json_path", help="Write a JSON report to this path.") run.add_argument("--quiet", action="store_true", help="Suppress per-item progress output.") run.add_argument( @@ -270,6 +277,7 @@ def on_langfuse_item_done( on_run_done=on_run_done, on_item_done=on_item_done, on_langfuse_item_done=on_langfuse_item_done, + concurrency=config.concurrency, ) finally: if hasattr(backend, "close"): @@ -309,6 +317,9 @@ def on_langfuse_item_done( def main(argv: list[str] | None = None) -> int: args = parse_args(argv if argv is not None else sys.argv[1:]) + if hasattr(args, "concurrency") and args.concurrency < 1: + print("error: --concurrency must be >= 1.", file=sys.stderr) + return _EXIT_OPERATIONAL_ERROR try: host, token = resolve_connection(host=args.host, token=args.token, profile=args.profile) if args.command == "models": @@ -321,6 +332,7 @@ def main(argv: list[str] | None = None) -> int: langfuse_dataset=args.langfuse_dataset, models=args.models or [], runs=args.runs, + concurrency=args.concurrency, json_path=Path(args.json_path) if args.json_path else None, log_to_langfuse=args.langfuse, quiet=args.quiet, diff --git a/packages/gooddata-eval/src/gooddata_eval/core/config.py b/packages/gooddata-eval/src/gooddata_eval/core/config.py index 1effe8613..0d96d56d4 100644 --- a/packages/gooddata-eval/src/gooddata_eval/core/config.py +++ b/packages/gooddata-eval/src/gooddata_eval/core/config.py @@ -14,6 +14,7 @@ class RunConfig: langfuse_dataset: str | None = None models: list[str] = field(default_factory=list) runs: int = 2 + concurrency: int = 1 json_path: Path | None = None log_to_langfuse: bool = False quiet: bool = False diff --git a/packages/gooddata-eval/src/gooddata_eval/core/reporting/console.py b/packages/gooddata-eval/src/gooddata_eval/core/reporting/console.py index 316490a84..d16f94fe2 100644 --- a/packages/gooddata-eval/src/gooddata_eval/core/reporting/console.py +++ b/packages/gooddata-eval/src/gooddata_eval/core/reporting/console.py @@ -46,11 +46,16 @@ def render_console(report: EvalReport, *, console: Console | None = None) -> str table.add_row(item.id, item.test_kind, result, str(item.runs), latency, avg, quality, notes) out.print(table) + _wall = report.wall_clock_s + _agent = report.latency_s + if _wall > 0 and abs(_wall - _agent) > 1: # concurrency > 1: show both + timing = f"{_wall:.2f}s wall-clock, {_agent:.2f}s agent time (avg {report.avg_latency_s:.2f}s/run)" + else: + timing = f"{_agent:.2f}s (avg {report.avg_latency_s:.2f}s/run)" out.print( f"\nSummary: {report.passed}/{report.total} passed " f"({report.skipped} skipped, {report.errored} errored) " - f"avg quality {report.avg_quality_score:.0%} " - f"in {report.latency_s:.2f}s (avg {report.avg_latency_s:.2f}s/run)" + f"avg quality {report.avg_quality_score:.0%} in {timing}" ) return out.export_text() if out.record else "" diff --git a/packages/gooddata-eval/src/gooddata_eval/core/reporting/json_report.py b/packages/gooddata-eval/src/gooddata_eval/core/reporting/json_report.py index 1ad4a1cd7..bc6a28186 100644 --- a/packages/gooddata-eval/src/gooddata_eval/core/reporting/json_report.py +++ b/packages/gooddata-eval/src/gooddata_eval/core/reporting/json_report.py @@ -20,6 +20,7 @@ def _build_run_dict(report: EvalReport) -> dict: "errored": report.errored, "latency_s": round(report.latency_s, 3), "avg_latency_s": round(report.avg_latency_s, 3), + "wall_clock_s": round(report.wall_clock_s, 3), }, "items": { item.id: { diff --git a/packages/gooddata-eval/src/gooddata_eval/core/runner.py b/packages/gooddata-eval/src/gooddata_eval/core/runner.py index 7fb0d36d6..9eaf47100 100644 --- a/packages/gooddata-eval/src/gooddata_eval/core/runner.py +++ b/packages/gooddata-eval/src/gooddata_eval/core/runner.py @@ -2,6 +2,8 @@ """Dataset run orchestration: per item, K single-turn runs, route by test_kind, aggregate pass@K.""" import time +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from functools import partial from typing import Callable, Protocol @@ -52,6 +54,7 @@ class EvalReport: provider_type: str = "" workspace_id: str = "" items: list[ItemReport] = field(default_factory=list) + wall_clock_s: float = 0.0 # actual elapsed time; differs from latency_s under concurrency @property def total(self) -> int: @@ -153,6 +156,7 @@ def run_items( on_run_done: Callable[[int, int, int, int, bool, float], None] | None = None, on_item_done: Callable[[int, int, ItemReport], None] | None = None, on_langfuse_item_done: Callable[[int, int, ItemReport], None] | None = None, + concurrency: int = 1, ) -> EvalReport: """Run every item K times, routing by test_kind, and aggregate pass@K. @@ -162,19 +166,47 @@ def run_items( - on_run_done(index, total, run_index, runs, passed, latency) after each individual run - on_item_done(index, total, report) after an item is fully evaluated - on_langfuse_item_done(index, total, report) after non-skipped, non-errored items only + + concurrency > 1 dispatches items to a ThreadPoolExecutor so multiple + questions are sent to the agent simultaneously. Each item still runs + --runs times sequentially (pass@K). Results are collected in input order. """ + concurrency = max(1, concurrency) report = EvalReport( model=model, provider_name=provider_name, provider_type=provider_type, workspace_id=workspace_id ) total = len(items) - for index, item in enumerate(items, start=1): - if on_item_start is not None: - on_item_start(index, total, item) + + def _process_item(index: int, item: DatasetItem) -> ItemReport: + try: + if on_item_start is not None: + on_item_start(index, total, item) + except Exception: # non-fatal — callback must not abort a parallel run + traceback.print_exc() run_cb = partial(_forward_run_event, on_run_done, index, total) if on_run_done is not None else None item_report = _run_one_item(item, backend, runs, on_run_done=run_cb) - report.items.append(item_report) - if on_item_done is not None: - on_item_done(index, total, item_report) - if on_langfuse_item_done is not None and not item_report.skipped and item_report.error is None: - on_langfuse_item_done(index, total, item_report) + try: + if on_item_done is not None: + on_item_done(index, total, item_report) + if on_langfuse_item_done is not None and not item_report.skipped and item_report.error is None: + on_langfuse_item_done(index, total, item_report) + except Exception: # non-fatal — log but don't abort + traceback.print_exc() + return item_report + + _t0 = time.perf_counter() + if concurrency <= 1: + for index, item in enumerate(items, start=1): + report.items.append(_process_item(index, item)) + else: + # Dispatch concurrently; collect in original order. + with ThreadPoolExecutor(max_workers=concurrency) as pool: + futures = {pool.submit(_process_item, index, item): index for index, item in enumerate(items, start=1)} + results: dict[int, ItemReport] = {} + for future in as_completed(futures): + idx = futures[future] + results[idx] = future.result() + for index in range(1, total + 1): + report.items.append(results[index]) + report.wall_clock_s = time.perf_counter() - _t0 return report diff --git a/packages/gooddata-eval/tests/test_cli.py b/packages/gooddata-eval/tests/test_cli.py index c99aa888d..04063d880 100644 --- a/packages/gooddata-eval/tests/test_cli.py +++ b/packages/gooddata-eval/tests/test_cli.py @@ -490,3 +490,43 @@ def test_parse_model_arg_plain_model_no_strip(): # The no-slash path does not strip whitespace; argparse never passes # whitespace through, so this documents the current behaviour. assert _parse_model_arg(" gpt-5.2 ") == (None, " gpt-5.2 ") + + +def test_cli_rejects_zero_concurrency(monkeypatch, fixtures_dir): + monkeypatch.setattr(cli_main, "resolve_connection", lambda host, token, profile: ("https://h", "tok")) + exit_code = cli_main.main( + [ + "run", + "--host", + "https://h", + "--token", + "tok", + "--workspace", + "ws1", + "--dataset", + str(fixtures_dir / "sample_dataset"), + "--concurrency", + "0", + ] + ) + assert exit_code == 2 + + +def test_cli_rejects_negative_concurrency(monkeypatch, fixtures_dir): + monkeypatch.setattr(cli_main, "resolve_connection", lambda host, token, profile: ("https://h", "tok")) + exit_code = cli_main.main( + [ + "run", + "--host", + "https://h", + "--token", + "tok", + "--workspace", + "ws1", + "--dataset", + str(fixtures_dir / "sample_dataset"), + "--concurrency", + "-1", + ] + ) + assert exit_code == 2 diff --git a/packages/gooddata-eval/tests/test_runner.py b/packages/gooddata-eval/tests/test_runner.py index ede002043..5d8b6aad4 100644 --- a/packages/gooddata-eval/tests/test_runner.py +++ b/packages/gooddata-eval/tests/test_runner.py @@ -1,7 +1,9 @@ # (C) 2026 GoodData Corporation +import threading + from gooddata_eval.core.evaluators import supported_test_kinds from gooddata_eval.core.models import ChatResult, DatasetItem -from gooddata_eval.core.runner import run_items +from gooddata_eval.core.runner import ItemReport, run_items def _viz_obj(): @@ -135,3 +137,112 @@ def test_run_items_does_not_invoke_langfuse_callback_for_skipped_items(): on_langfuse_item_done=lambda idx, total, r: langfuse_calls.append(r.id), ) assert langfuse_calls == [] + + +def test_run_items_concurrency_produces_all_results_in_order(): + """With concurrency > 1 all items still appear in input order.""" + items = [ + DatasetItem( + id=f"item-{i}", + dataset_name="d", + test_kind="visualization", + question=f"q{i}", + expected_output={"visualization": _viz_obj()}, + ) + for i in range(6) + ] + backend = _FakeBackend([_chat_with(_viz_obj())] * 6) + report = run_items(items, backend, runs=1, concurrency=3) + assert report.total == 6 + assert [r.id for r in report.items] == [f"item-{i}" for i in range(6)] + assert all(r.pass_at_k for r in report.items) + + +def test_run_items_concurrency_1_and_sequential_produce_same_results(): + """concurrency=1 and the default sequential path give identical reports.""" + items = [ + DatasetItem( + id=f"i{i}", + dataset_name="d", + test_kind="visualization", + question="q", + expected_output={"visualization": _viz_obj()}, + ) + for i in range(4) + ] + backend_a = _FakeBackend([_chat_with(_viz_obj())] * 4) + backend_b = _FakeBackend([_chat_with(_viz_obj())] * 4) + report_seq = run_items(items, backend_a, runs=1) + report_par = run_items(items, backend_b, runs=1, concurrency=1) + assert [r.id for r in report_seq.items] == [r.id for r in report_par.items] + assert [r.pass_at_k for r in report_seq.items] == [r.pass_at_k for r in report_par.items] + + +def test_run_items_concurrency_errored_item_does_not_crash_pool(): + """An errored item is recorded but does not abort a concurrent run.""" + + class _BoomBackend: + def ask(self, question: str) -> ChatResult: + raise RuntimeError("agent down") + + items = [ + DatasetItem( + id=f"e{i}", + dataset_name="d", + test_kind="visualization", + question="q", + expected_output={"visualization": _viz_obj()}, + ) + for i in range(4) + ] + report = run_items(items, _BoomBackend(), runs=1, concurrency=3) + assert report.total == 4 + assert report.errored == 4 + assert [r.id for r in report.items] == [f"e{i}" for i in range(4)] + + +def test_run_items_concurrency_callbacks_fire_for_all_items(): + """on_item_done is called exactly once per item under concurrency > 1.""" + backend = _FakeBackend([_chat_with(_viz_obj())] * 5) + lock = threading.Lock() + done_ids: list = [] + + def on_done(index: int, total: int, report: ItemReport) -> None: + with lock: + done_ids.append(report.id) + + items = [ + DatasetItem( + id=f"c{i}", + dataset_name="d", + test_kind="visualization", + question="q", + expected_output={"visualization": _viz_obj()}, + ) + for i in range(5) + ] + run_items(items, backend, runs=1, concurrency=3, on_item_done=on_done) + assert sorted(done_ids) == [f"c{i}" for i in range(5)] + + +def test_run_items_callback_exception_is_logged_not_swallowed(capsys): + """A raising callback prints a traceback to stderr but the run continues.""" + backend = _FakeBackend([_chat_with(_viz_obj())] * 2) + items = [ + DatasetItem( + id=f"x{i}", + dataset_name="d", + test_kind="visualization", + question="q", + expected_output={"visualization": _viz_obj()}, + ) + for i in range(2) + ] + + def bad_callback(index, total, report): + raise RuntimeError("callback bug") + + result = run_items(items, backend, runs=1, on_item_done=bad_callback) + assert result.total == 2 # run did not abort + err = capsys.readouterr().err + assert "RuntimeError" in err or "callback bug" in err # traceback was printed