Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,252 changes: 3,252 additions & 0 deletions packages/gooddata-eval/LICENSE.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/gooddata-eval/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Both provider name and provider id are accepted as the prefix.
| Flag | Default | Description |
|---|---|---|
| `--runs K` | `2` | Independent runs per item (pass@K). An item passes if any run passes. |
| `--concurrency K` | `1` | Number of items evaluated concurrently. `1` = sequential (default). Increase to load-test the agent under simultaneous requests. Progress output interleaves when K > 1. |

#### Output

Expand Down
12 changes: 12 additions & 0 deletions packages/gooddata-eval/src/gooddata_eval/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ def _build_parser() -> argparse.ArgumentParser:
),
)
run.add_argument("--runs", type=int, default=2, help="Independent runs per item (pass@K). Default 2.")
run.add_argument(
"--concurrency",
type=int,
default=1,
help="Number of items evaluated concurrently (default 1 = sequential). "
"Increase to load-test the agent under simultaneous requests.",
)
run.add_argument("--json", dest="json_path", help="Write a JSON report to this path.")
run.add_argument("--quiet", action="store_true", help="Suppress per-item progress output.")
run.add_argument(
Expand Down Expand Up @@ -270,6 +277,7 @@ def on_langfuse_item_done(
on_run_done=on_run_done,
on_item_done=on_item_done,
on_langfuse_item_done=on_langfuse_item_done,
concurrency=config.concurrency,
)
finally:
if hasattr(backend, "close"):
Expand Down Expand Up @@ -309,6 +317,9 @@ def on_langfuse_item_done(

def main(argv: list[str] | None = None) -> int:
args = parse_args(argv if argv is not None else sys.argv[1:])
if hasattr(args, "concurrency") and args.concurrency < 1:
print("error: --concurrency must be >= 1.", file=sys.stderr)
return _EXIT_OPERATIONAL_ERROR
try:
host, token = resolve_connection(host=args.host, token=args.token, profile=args.profile)
if args.command == "models":
Expand All @@ -321,6 +332,7 @@ def main(argv: list[str] | None = None) -> int:
langfuse_dataset=args.langfuse_dataset,
models=args.models or [],
runs=args.runs,
concurrency=args.concurrency,
json_path=Path(args.json_path) if args.json_path else None,
log_to_langfuse=args.langfuse,
quiet=args.quiet,
Expand Down
1 change: 1 addition & 0 deletions packages/gooddata-eval/src/gooddata_eval/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class RunConfig:
langfuse_dataset: str | None = None
models: list[str] = field(default_factory=list)
runs: int = 2
concurrency: int = 1
json_path: Path | None = None
log_to_langfuse: bool = False
quiet: bool = False
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,16 @@ def render_console(report: EvalReport, *, console: Console | None = None) -> str
table.add_row(item.id, item.test_kind, result, str(item.runs), latency, avg, quality, notes)

out.print(table)
_wall = report.wall_clock_s
_agent = report.latency_s
if _wall > 0 and abs(_wall - _agent) > 1: # concurrency > 1: show both
timing = f"{_wall:.2f}s wall-clock, {_agent:.2f}s agent time (avg {report.avg_latency_s:.2f}s/run)"
else:
timing = f"{_agent:.2f}s (avg {report.avg_latency_s:.2f}s/run)"
out.print(
f"\nSummary: {report.passed}/{report.total} passed "
f"({report.skipped} skipped, {report.errored} errored) "
f"avg quality {report.avg_quality_score:.0%} "
f"in {report.latency_s:.2f}s (avg {report.avg_latency_s:.2f}s/run)"
f"avg quality {report.avg_quality_score:.0%} in {timing}"
)
return out.export_text() if out.record else ""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def _build_run_dict(report: EvalReport) -> dict:
"errored": report.errored,
"latency_s": round(report.latency_s, 3),
"avg_latency_s": round(report.avg_latency_s, 3),
"wall_clock_s": round(report.wall_clock_s, 3),
},
"items": {
item.id: {
Expand Down
48 changes: 40 additions & 8 deletions packages/gooddata-eval/src/gooddata_eval/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"""Dataset run orchestration: per item, K single-turn runs, route by test_kind, aggregate pass@K."""

import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from functools import partial
from typing import Callable, Protocol
Expand Down Expand Up @@ -52,6 +54,7 @@ class EvalReport:
provider_type: str = ""
workspace_id: str = ""
items: list[ItemReport] = field(default_factory=list)
wall_clock_s: float = 0.0 # actual elapsed time; differs from latency_s under concurrency

@property
def total(self) -> int:
Expand Down Expand Up @@ -153,6 +156,7 @@ def run_items(
on_run_done: Callable[[int, int, int, int, bool, float], None] | None = None,
on_item_done: Callable[[int, int, ItemReport], None] | None = None,
on_langfuse_item_done: Callable[[int, int, ItemReport], None] | None = None,
concurrency: int = 1,
) -> EvalReport:
"""Run every item K times, routing by test_kind, and aggregate pass@K.

Expand All @@ -162,19 +166,47 @@ def run_items(
- on_run_done(index, total, run_index, runs, passed, latency) after each individual run
- on_item_done(index, total, report) after an item is fully evaluated
- on_langfuse_item_done(index, total, report) after non-skipped, non-errored items only

concurrency > 1 dispatches items to a ThreadPoolExecutor so multiple
questions are sent to the agent simultaneously. Each item still runs
--runs times sequentially (pass@K). Results are collected in input order.
"""
concurrency = max(1, concurrency)
report = EvalReport(
model=model, provider_name=provider_name, provider_type=provider_type, workspace_id=workspace_id
)
total = len(items)
for index, item in enumerate(items, start=1):
if on_item_start is not None:
on_item_start(index, total, item)

def _process_item(index: int, item: DatasetItem) -> ItemReport:
try:
if on_item_start is not None:
on_item_start(index, total, item)
except Exception: # non-fatal — callback must not abort a parallel run
traceback.print_exc()
run_cb = partial(_forward_run_event, on_run_done, index, total) if on_run_done is not None else None
item_report = _run_one_item(item, backend, runs, on_run_done=run_cb)
report.items.append(item_report)
if on_item_done is not None:
on_item_done(index, total, item_report)
if on_langfuse_item_done is not None and not item_report.skipped and item_report.error is None:
on_langfuse_item_done(index, total, item_report)
try:
if on_item_done is not None:
on_item_done(index, total, item_report)
if on_langfuse_item_done is not None and not item_report.skipped and item_report.error is None:
on_langfuse_item_done(index, total, item_report)
except Exception: # non-fatal — log but don't abort
traceback.print_exc()
return item_report

_t0 = time.perf_counter()
if concurrency <= 1:
for index, item in enumerate(items, start=1):
report.items.append(_process_item(index, item))
else:
# Dispatch concurrently; collect in original order.
with ThreadPoolExecutor(max_workers=concurrency) as pool:
futures = {pool.submit(_process_item, index, item): index for index, item in enumerate(items, start=1)}
results: dict[int, ItemReport] = {}
for future in as_completed(futures):
idx = futures[future]
results[idx] = future.result()
for index in range(1, total + 1):
report.items.append(results[index])
report.wall_clock_s = time.perf_counter() - _t0
return report
40 changes: 40 additions & 0 deletions packages/gooddata-eval/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,43 @@ def test_parse_model_arg_plain_model_no_strip():
# The no-slash path does not strip whitespace; argparse never passes
# whitespace through, so this documents the current behaviour.
assert _parse_model_arg(" gpt-5.2 ") == (None, " gpt-5.2 ")


def test_cli_rejects_zero_concurrency(monkeypatch, fixtures_dir):
monkeypatch.setattr(cli_main, "resolve_connection", lambda host, token, profile: ("https://h", "tok"))
exit_code = cli_main.main(
[
"run",
"--host",
"https://h",
"--token",
"tok",
"--workspace",
"ws1",
"--dataset",
str(fixtures_dir / "sample_dataset"),
"--concurrency",
"0",
]
)
assert exit_code == 2


def test_cli_rejects_negative_concurrency(monkeypatch, fixtures_dir):
monkeypatch.setattr(cli_main, "resolve_connection", lambda host, token, profile: ("https://h", "tok"))
exit_code = cli_main.main(
[
"run",
"--host",
"https://h",
"--token",
"tok",
"--workspace",
"ws1",
"--dataset",
str(fixtures_dir / "sample_dataset"),
"--concurrency",
"-1",
]
)
assert exit_code == 2
113 changes: 112 additions & 1 deletion packages/gooddata-eval/tests/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# (C) 2026 GoodData Corporation
import threading

from gooddata_eval.core.evaluators import supported_test_kinds
from gooddata_eval.core.models import ChatResult, DatasetItem
from gooddata_eval.core.runner import run_items
from gooddata_eval.core.runner import ItemReport, run_items


def _viz_obj():
Expand Down Expand Up @@ -135,3 +137,112 @@ def test_run_items_does_not_invoke_langfuse_callback_for_skipped_items():
on_langfuse_item_done=lambda idx, total, r: langfuse_calls.append(r.id),
)
assert langfuse_calls == []


def test_run_items_concurrency_produces_all_results_in_order():
"""With concurrency > 1 all items still appear in input order."""
items = [
DatasetItem(
id=f"item-{i}",
dataset_name="d",
test_kind="visualization",
question=f"q{i}",
expected_output={"visualization": _viz_obj()},
)
for i in range(6)
]
backend = _FakeBackend([_chat_with(_viz_obj())] * 6)
report = run_items(items, backend, runs=1, concurrency=3)
assert report.total == 6
assert [r.id for r in report.items] == [f"item-{i}" for i in range(6)]
assert all(r.pass_at_k for r in report.items)


def test_run_items_concurrency_1_and_sequential_produce_same_results():
"""concurrency=1 and the default sequential path give identical reports."""
items = [
DatasetItem(
id=f"i{i}",
dataset_name="d",
test_kind="visualization",
question="q",
expected_output={"visualization": _viz_obj()},
)
for i in range(4)
]
backend_a = _FakeBackend([_chat_with(_viz_obj())] * 4)
backend_b = _FakeBackend([_chat_with(_viz_obj())] * 4)
report_seq = run_items(items, backend_a, runs=1)
report_par = run_items(items, backend_b, runs=1, concurrency=1)
assert [r.id for r in report_seq.items] == [r.id for r in report_par.items]
assert [r.pass_at_k for r in report_seq.items] == [r.pass_at_k for r in report_par.items]


def test_run_items_concurrency_errored_item_does_not_crash_pool():
"""An errored item is recorded but does not abort a concurrent run."""

class _BoomBackend:
def ask(self, question: str) -> ChatResult:
raise RuntimeError("agent down")

items = [
DatasetItem(
id=f"e{i}",
dataset_name="d",
test_kind="visualization",
question="q",
expected_output={"visualization": _viz_obj()},
)
for i in range(4)
]
report = run_items(items, _BoomBackend(), runs=1, concurrency=3)
assert report.total == 4
assert report.errored == 4
assert [r.id for r in report.items] == [f"e{i}" for i in range(4)]


def test_run_items_concurrency_callbacks_fire_for_all_items():
"""on_item_done is called exactly once per item under concurrency > 1."""
backend = _FakeBackend([_chat_with(_viz_obj())] * 5)
lock = threading.Lock()
done_ids: list = []

def on_done(index: int, total: int, report: ItemReport) -> None:
with lock:
done_ids.append(report.id)

items = [
DatasetItem(
id=f"c{i}",
dataset_name="d",
test_kind="visualization",
question="q",
expected_output={"visualization": _viz_obj()},
)
for i in range(5)
]
run_items(items, backend, runs=1, concurrency=3, on_item_done=on_done)
assert sorted(done_ids) == [f"c{i}" for i in range(5)]


def test_run_items_callback_exception_is_logged_not_swallowed(capsys):
"""A raising callback prints a traceback to stderr but the run continues."""
backend = _FakeBackend([_chat_with(_viz_obj())] * 2)
items = [
DatasetItem(
id=f"x{i}",
dataset_name="d",
test_kind="visualization",
question="q",
expected_output={"visualization": _viz_obj()},
)
for i in range(2)
]

def bad_callback(index, total, report):
raise RuntimeError("callback bug")

result = run_items(items, backend, runs=1, on_item_done=bad_callback)
assert result.total == 2 # run did not abort
err = capsys.readouterr().err
assert "RuntimeError" in err or "callback bug" in err # traceback was printed
Loading