Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions DATA_PROVENANCE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# iSamples Explorer — Data Provenance

How every parquet file the explorer uses is generated, from root to publish.
*Reviewed 2026-06-02 (CC, via codebase audit). Complements `SERIALIZATIONS.md` (format/schema reference); this file is the end-to-end build chain + the automation gaps.*

> **Load-bearing constraint:** the **root export cannot be regenerated.** It was produced from the iSamples Central Solr API (`central.isample.xyz`), **offline since Aug 2025**. The Zenodo-archived export is a **frozen root**. Any *new* data (e.g. concept URIs, thumbnails) therefore must come from a **per-source supplementary file merged into the base by `pid`** — the "sidecar" pattern (see Stage 3) — not from re-exporting.

## Pipeline DAG

```
Source collections (SESAR · OpenContext · GEOME · Smithsonian)
│ iSamples Central Solr API ── OFFLINE since Aug 2025 (cannot re-run) ──┐
▼ │
STAGE 0/1 export_client → JSONL → GeoParquet │ frozen
→ isamples_export_*_geo.parquet (Export format; ~300MB, 6.7M; Zenodo doi:10.5281/zenodo.15278211)
STAGE 2 pqg/pqg/sql_converter.py (export → base PQG; 7-stage DuckDB SQL)
→ narrow (…_narrow.parquet, ~844MB, 106M rows) and wide (…_wide.parquet, ~282MB, 20M rows)
STAGE 3 sidecar/enrichment merge (LEFT JOIN by pid) ← Eric's independently-maintained OC PQG (GCS)
scripts/enrich_wide_with_oc_thumbnails.py → isamples_202604_wide.parquet (+47K thumbnails)
STAGE 4 wide → frontend derived files (mostly AD-HOC / not checked in — see gaps)
→ wide_h3 · h3_summary_res4/6/8 · samples_map_lite · sample_facets_v2 · facet_summaries · facet_cross_filter
→ vocab_labels (scripts/build_vocab_labels.py — the one fully-scripted derived file; built from SKOS TTLs)
STAGE 5 publish to R2 (bucket isamples-ry) + Cloudflare Worker (data.isamples.org, /current/ aliases)
DuckDB-WASM in the browser (explorer.qmd; parquet URLs ~L767-781)
```

## Stages (script / command per step)

| Stage | Input → Output | How (file:line) | Automated? |
|---|---|---|---|
| **0/1 Export** | Solr API → `isamples_export_*_geo.parquet` | `export_client` `ExportClient.perform_full_download()` (`export_client.py:423-469`) → `write_geoparquet_from_json_lines()`; schema `SOURCE_COLUMNS` (`duckdb_utilities.py:9-42`, incl. `keywords: STRUCT(keyword VARCHAR)[]` — **text only, no URI**, L17) | ❌ API offline; **frozen** |
| **2 Base PQG** | export → `*_narrow.parquet` / `*_wide.parquet` | `pqg/pqg/sql_converter.py` `convert_isamples_sql(input, output, wide=…)` (CLI `python pqg/sql_converter.py in.parquet out.parquet [--wide]`); 7 stages, decomposes nested structs → nodes+edges; site dedupe by rounded lat/lon+label | ✅ scripted (exact prod invocation not recorded — gap) |
| **3 Sidecar merge** | base wide + Eric's OC PQG → `isamples_202604_wide.parquet` | `scripts/enrich_wide_with_oc_thumbnails.py` — `LEFT JOIN` OC `(pid, thumbnail_url)` into wide (`COALESCE`). **This is the precedent for merging ANY per-source supplement (incl. concept URIs) by pid.** Drift check: `scripts/check_oc_pqg_drift.py` (detects only; no mirror) | ⚠️ merge scripted; OC mirror + R2 upload manual |
| **4 Frontend derived** | wide → 7 explorer files | `vocab_labels.parquet` ← `scripts/build_vocab_labels.py` (SKOS TTLs via rdflib). **The other 6** (`wide_h3`, `h3_summary_res4/6/8`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`) have **no checked-in build script** — query patterns live in notebooks / `SERIALIZATIONS.md` only | ❌ ad-hoc (1 of 7 scripted) |
| **5 Publish** | files → R2 + Worker | Worker `workers/data-isamples-org/src/index.js` (`wrangler deploy`); immutable cache for `isamples_\d{6}_*.parquet`; `/current/<flavor>.parquet` → 302 via `current/manifest.json`. Bucket `isamples-ry` | ⚠️ Worker scripted; **file upload + manifest update are manual** |

## The sidecar/enrichment pattern (how new data gets in)

Because the export is frozen, new per-source data is added by **merging a supplementary parquet keyed by `pid` into the base wide** — exactly what the thumbnail enrichment does:

```sql
-- scripts/enrich_wide_with_oc_thumbnails.py (core)
CREATE TEMP TABLE oc_thumbs AS
SELECT DISTINCT pid, thumbnail_url FROM read_parquet('<eric_oc_pqg>') WHERE thumbnail_url IS NOT NULL;
COPY (SELECT p.* REPLACE (COALESCE(oc.thumbnail_url, p.thumbnail_url) AS thumbnail_url)
FROM read_parquet('<base_wide>') p LEFT JOIN oc_thumbs oc ON p.pid = oc.pid)
TO '<out>' (FORMAT PARQUET, COMPRESSION ZSTD);
```

Eric Kansa maintains OpenContext PQG **independently** on GCS (`storage.googleapis.com/opencontext-parquet/oc_isamples_pqg.parquet`), so it can carry data the frozen iSamples export lacks. This is the channel for **#263** (external concept URIs): Eric's OC PQG carries them → merged into wide by pid → flows to the derived files. *(Sidecar design endorsed 2026-04-17; the spec `project_isamples_sidecar_pattern.md` lives in the Obsidian vault, not a repo — gap.)*

## Documentation / automation gaps

- **6 of 7 frontend derived files have no checked-in build script** (`wide_h3`, the three `h3_summary_res*`, `samples_map_lite`, `sample_facets_v2`, `facet_summaries`, `facet_cross_filter`). Query patterns exist in notebooks + `SERIALIZATIONS.md §4` but not as runnable COPY-TO scripts. `pqg add-h3` / `pqg facet-summaries` are named in the dev journal (Mar 2026) but **absent from `pqg/__main__.py`**.
- **No R2 upload automation** — file upload to bucket `isamples-ry` + `current/manifest.json` update are manual `wrangler`/dashboard steps.
- **No OC mirror script** — `check_oc_pqg_drift.py` detects GCS↔R2 drift but doesn't perform the mirror.
- **Exact prod invocation** that produced `zenodo_narrow_2025-12-12` / `zenodo_wide_2026-01-09` from the Zenodo export is not recorded (dedupe options unknown).
- **No Makefile / CI / post-render hook** rebuilds derived files when wide changes — every post-Stage-2 step is manual.
- **`SERIALIZATIONS.md:80`** claims every file "can be rebuilt by a script" — aspirational; true for ~4 of 10 files.
- **Sidecar spec** is in Obsidian only, not version-controlled with the code.

## Key files
- `export_client/isamples_export_client/duckdb_utilities.py` — export schema (keywords narrowing @ L17)
- `pqg/pqg/sql_converter.py` — export→PQG engine; `pqg/docs/PQG_SPECIFICATION.md` — format spec
- `isamplesorg.github.io/scripts/enrich_wide_with_oc_thumbnails.py` — the sidecar-merge precedent
- `isamplesorg.github.io/scripts/build_vocab_labels.py` — the one scripted derived file
- `isamplesorg.github.io/scripts/check_oc_pqg_drift.py` — OC drift check
- `isamplesorg.github.io/workers/data-isamples-org/{src/index.js,wrangler.toml}` — Worker + R2 config
- `isamplesorg.github.io/SERIALIZATIONS.md` — format/schema reference (DAG companion to this file)
199 changes: 199 additions & 0 deletions scripts/build_frontend_derived.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""Build the explorer's frontend-derived parquet files from a wide PQG parquet.

Closes the biggest provenance gap (see DATA_PROVENANCE.md): 6 of the 7 derived
files previously had no checked-in build script — only ad-hoc notebook SQL. This
reproduces them deterministically from one `wide` input.

Inputs: a wide PQG parquet (e.g. isamples_YYYYMM_wide.parquet) — entity rows incl.
MaterialSampleRecord + IdentifiedConcept, with `geometry` (WKB) and the
`p__has_{material,context,sample_object}_category` row-id arrays.
Outputs (into --outdir, prefixed --tag, default `isamples_YYYYMM`):
- {tag}_sample_facets_v2.parquet pid, source, material, context, object_type, label, description, place_name
- {tag}_samples_map_lite.parquet pid, label, source, latitude, longitude, place_name[], result_time, h3_res8, h3_res8_hex
- {tag}_wide_h3.parquet wide + h3_res4/h3_res6/h3_res8 (large; use --skip wide_h3 to omit)
- {tag}_h3_summary_res{4,6,8}.parquet h3_cell, sample_count, center_lat, center_lng, dominant_source, source_count, resolution
- {tag}_facet_summaries.parquet facet_type, facet_value, scheme, count
- {tag}_facet_cross_filter.parquet filter_source/material/context/object_type, facet_type, facet_value, count

Usage:
python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --tag isamples_202601
python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --validate-against docs/data
python scripts/build_frontend_derived.py --wide WIDE.parquet --outdir OUT --only sample_facets_v2,facet_summaries
"""
import argparse, os, sys, time
import duckdb

FACET_DIMS = ["source", "material", "context", "object_type"]


def log(msg, t0):
print(f"[{time.time()-t0:.1f}s] {msg}", flush=True)


def base_samples_sql(wide: str) -> str:
"""One row per MaterialSampleRecord with resolved source/facet URIs, lat/lng, h3."""
return f"""
CREATE OR REPLACE TEMP TABLE ic AS
SELECT row_id, pid AS uri FROM read_parquet('{wide}') WHERE otype='IdentifiedConcept';
CREATE OR REPLACE TEMP TABLE samp AS
SELECT
s.pid,
s.n AS source,
s.label,
s.description,
s.place_name, -- VARCHAR[]
s.result_time,
ST_Y(ST_GeomFromWKB(s.geometry)) AS latitude,
ST_X(ST_GeomFromWKB(s.geometry)) AS longitude,
(SELECT uri FROM ic WHERE ic.row_id = s.p__has_material_category[1]) AS material,
(SELECT uri FROM ic WHERE ic.row_id = s.p__has_context_category[1]) AS context,
(SELECT uri FROM ic WHERE ic.row_id = s.p__has_sample_object_type[1]) AS object_type
FROM read_parquet('{wide}') s
WHERE s.otype='MaterialSampleRecord';
-- coordinate-bearing subset + h3 cells (used by map_lite + h3 summaries)
CREATE OR REPLACE TEMP TABLE samp_geo AS
SELECT *,
h3_latlng_to_cell(latitude, longitude, 4) AS h3_res4,
h3_latlng_to_cell(latitude, longitude, 6) AS h3_res6,
h3_latlng_to_cell(latitude, longitude, 8) AS h3_res8
FROM samp WHERE latitude IS NOT NULL AND longitude IS NOT NULL;
"""


def build_sample_facets_v2(con, out):
# located (coordinate-bearing) samples only — matches the published file,
# which is map-scoped (5.98M = GeospatialCoordLocation count).
con.execute(f"""COPY (
SELECT pid, source, material, context, object_type, label, description,
place_name::VARCHAR AS place_name
FROM samp_geo
) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""")


def build_samples_map_lite(con, out):
con.execute(f"""COPY (
SELECT pid, label, source, latitude, longitude, place_name, result_time,
h3_res8::UBIGINT AS h3_res8,
h3_h3_to_string(h3_res8) AS h3_res8_hex
FROM samp_geo
) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""")


def build_h3_summary(con, out, res):
col = f"h3_res{res}"
con.execute(f"""COPY (
SELECT {col}::UBIGINT AS h3_cell,
COUNT(*) AS sample_count,
AVG(latitude) AS center_lat,
AVG(longitude) AS center_lng,
MODE(source) AS dominant_source,
COUNT(DISTINCT source) AS source_count,
{res} AS resolution
FROM samp_geo WHERE {col} IS NOT NULL
GROUP BY {col}
) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""")


def build_facet_summaries(con, out):
# one row per (facet_type, facet_value); scheme kept NULL to match published shape
union = " UNION ALL ".join(
f"SELECT '{d}' AS facet_type, {d} AS facet_value FROM samp_geo WHERE {d} IS NOT NULL"
for d in FACET_DIMS)
con.execute(f"""COPY (
SELECT facet_type, facet_value, NULL::INTEGER AS scheme, COUNT(*) AS count
FROM ({union})
GROUP BY facet_type, facet_value
) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""")


def build_facet_cross_filter(con, out):
# baseline (no filter) + every single-dimension filter value, with counts for
# all facet dims. Mirrors the cube fast-path the explorer reads (one filter_* set).
selects = []
# baseline: all filter_* NULL
for fd in FACET_DIMS:
selects.append(
f"SELECT NULL::VARCHAR AS filter_source, NULL::VARCHAR AS filter_material, "
f"NULL::VARCHAR AS filter_context, NULL::VARCHAR AS filter_object_type, "
f"'{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count "
f"FROM samp_geo WHERE {fd} IS NOT NULL GROUP BY {fd}")
# single-dimension filters
for filt in FACET_DIMS:
for fd in FACET_DIMS:
cols = ", ".join(
(f"{filt} AS filter_{c}" if c == filt else f"NULL::VARCHAR AS filter_{c}")
for c in FACET_DIMS)
selects.append(
f"SELECT {cols}, '{fd}' AS facet_type, {fd} AS facet_value, COUNT(*) AS count "
f"FROM samp_geo WHERE {filt} IS NOT NULL AND {fd} IS NOT NULL GROUP BY {filt}, {fd}")
con.execute(f"""COPY (
SELECT filter_source, filter_material, filter_context, filter_object_type,
facet_type, facet_value, count
FROM ({' UNION ALL '.join(selects)})
) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""")


def build_wide_h3(con, wide, out):
con.execute(f"""COPY (
SELECT *,
CASE WHEN geometry IS NOT NULL
THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 4) END AS h3_res4,
CASE WHEN geometry IS NOT NULL
THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 6) END AS h3_res6,
CASE WHEN geometry IS NOT NULL
THEN h3_latlng_to_cell(ST_Y(ST_GeomFromWKB(geometry)), ST_X(ST_GeomFromWKB(geometry)), 8) END AS h3_res8
FROM read_parquet('{wide}')
) TO '{out}' (FORMAT PARQUET, COMPRESSION ZSTD)""")


def main():
ap = argparse.ArgumentParser()
ap.add_argument("--wide", required=True)
ap.add_argument("--outdir", required=True)
ap.add_argument("--tag", default="isamples_202601")
ap.add_argument("--only", default="", help="comma list: sample_facets_v2,samples_map_lite,wide_h3,h3_summaries,facet_summaries,facet_cross_filter")
ap.add_argument("--skip", default="", help="comma list of the same names to skip")
ap.add_argument("--validate-against", default="", help="dir of published files to compare schema+rowcount")
args = ap.parse_args()
os.makedirs(args.outdir, exist_ok=True)
only = set(filter(None, args.only.split(",")))
skip = set(filter(None, args.skip.split(",")))
want = lambda name: (not only or name in only) and name not in skip

t0 = time.time()
con = duckdb.connect()
con.execute("INSTALL h3 FROM community; LOAD h3; INSTALL spatial; LOAD spatial;")
log("building base sample tables…", t0)
con.execute(base_samples_sql(args.wide))
log(f"samp={con.sql('SELECT COUNT(*) FROM samp').fetchone()[0]:,} samp_geo={con.sql('SELECT COUNT(*) FROM samp_geo').fetchone()[0]:,}", t0)

p = lambda name: os.path.join(args.outdir, f"{args.tag}_{name}.parquet")
if want("sample_facets_v2"): build_sample_facets_v2(con, p("sample_facets_v2")); log("sample_facets_v2 ✓", t0)
if want("facet_summaries"): build_facet_summaries(con, p("facet_summaries")); log("facet_summaries ✓", t0)
if want("facet_cross_filter"):build_facet_cross_filter(con, p("facet_cross_filter")); log("facet_cross_filter ✓", t0)
if want("samples_map_lite"): build_samples_map_lite(con, p("samples_map_lite")); log("samples_map_lite ✓", t0)
if want("h3_summaries"):
for res in (4, 6, 8): build_h3_summary(con, p(f"h3_summary_res{res}"), res)
log("h3_summary_res{4,6,8} ✓", t0)
if want("wide_h3"): build_wide_h3(con, args.wide, p("wide_h3")); log("wide_h3 ✓", t0)

if args.validate_against:
print("\n=== validation vs published ===")
for name in ["sample_facets_v2","samples_map_lite","facet_summaries","facet_cross_filter",
"h3_summary_res4","h3_summary_res6","h3_summary_res8"]:
built = p(name)
pub = os.path.join(args.validate_against, f"{args.tag}_{name}.parquet")
if not (os.path.exists(built) and os.path.exists(pub)):
continue
bn = con.sql(f"SELECT COUNT(*) FROM read_parquet('{built}')").fetchone()[0]
pn = con.sql(f"SELECT COUNT(*) FROM read_parquet('{pub}')").fetchone()[0]
bcols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{built}')").fetchall()]
pcols = [r[0] for r in con.sql(f"DESCRIBE SELECT * FROM read_parquet('{pub}')").fetchall()]
ok = "✓" if bcols == pcols else "✗ COLS DIFFER"
print(f" {name:22} built={bn:>10,} pub={pn:>10,} cols {ok}")
log("done", t0)


if __name__ == "__main__":
main()
Loading