Add ExperimentalListPackageCustomSchemas gRPC endpoint#1981
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #1981 +/- ##
==========================================
+ Coverage 57.90% 58.77% +0.87%
==========================================
Files 140 141 +1
Lines 13441 13418 -23
==========================================
+ Hits 7783 7887 +104
+ Misses 4470 4322 -148
- Partials 1188 1209 +21 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Adds support for streaming per-package “custom schema” FBC blobs over gRPC, backed by new cache storage primitives in both pogreb and JSON cache backends. This extends the opm serve gRPC surface to expose non-standard schema content without introducing new typed protobuf messages.
Changes:
- Adds
ListPackageCustomSchemas(schema, packageName)server-streaming gRPC RPC returninggoogle.protobuf.Struct. - Extends cache backends with meta blob storage/retrieval keyed by
(schema, packageName)and updates cache build to persist non-standard schemas. - Updates protoc/tooling wiring to include protobuf well-known types (
struct.proto) during codegen.
Reviewed changes
Copilot reviewed 13 out of 15 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| scripts/ensure-protoc.sh | Downloads protoc include files so well-known protos (e.g., struct.proto) can be imported during codegen. |
| README.md | Documents the new gRPC method in the public list of endpoints. |
| pkg/server/server.go | Implements the new gRPC streaming handler and converts stored JSON blobs into structpb.Struct. |
| pkg/server/server_test.go | Adds integration-style gRPC tests for streaming results, empty results, and invalid-argument behavior. |
| pkg/client/client_test.go | Updates the client stub interface to include the new RPC. |
| pkg/cache/pogrebv1.go | Adds meta blob Put/Send support in the pogreb backend. |
| pkg/cache/meta_key.go | Introduces meta key validation helpers for schema/packageName components. |
| pkg/cache/json.go | Adds meta blob directory layout and Put/Send support in the JSON backend. |
| pkg/cache/cache.go | Extends cache interface + build pipeline to store non-standard schema blobs and expose query method. |
| pkg/cache/cache_test.go | Adds unit tests covering custom schema storage/retrieval and packageless blob skipping. |
| pkg/api/registry.proto | Adds the new RPC and request message; imports google/protobuf/struct.proto. |
| pkg/api/registry.pb.go | Regenerated protobuf Go types (incl. new request type). |
| pkg/api/registry_grpc.pb.go | Regenerated gRPC service stubs (incl. new streaming method). |
| Makefile | Updates protoc include paths used by make codegen. |
| AGENTS.md | Documents the new gRPC method in internal agent docs. |
Files not reviewed (2)
- pkg/api/registry.pb.go: Language not supported
- pkg/api/registry_grpc.pb.go: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
9e6cfcf to
8f19197
Compare
8f19197 to
029eaa4
Compare
318e335 to
4226e11
Compare
4226e11 to
f45c501
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 14 out of 16 changed files in this pull request and generated 2 comments.
Files not reviewed (2)
- pkg/api/registry.pb.go: Language not supported
- pkg/api/registry_grpc.pb.go: Language not supported
Comments suppressed due to low confidence (1)
pkg/cache/cache.go:323
- In Build’s WalkMetasFS callback, custom-schema metas with an empty packageName are still appended to byPackageReaders[""] and later written into the package index via
pkgs[pkgName] = pkgIndex[pkgName]. This will cause ListPackages to include an empty package name when packageless custom schema blobs exist. Consider short-circuiting after storing the meta (PutMeta) for non-standard schemas when packageName == "" so they don’t participate in package index construction (and don’t create a pkgs[""] entry).
switch meta.Schema {
case declcfg.SchemaPackage, declcfg.SchemaChannel, declcfg.SchemaBundle, declcfg.SchemaDeprecation:
default:
mk, err := newValidatedMetaKey(meta.Schema, packageName)
if err != nil {
return fmt.Errorf("invalid custom schema meta: %v", err)
}
if err := c.backend.PutMeta(ctx, mk, meta.Blob); err != nil {
return fmt.Errorf("store custom schema meta %v: %v", mk, err)
}
}
if _, err := tmpFile.Write(meta.Blob); err != nil {
return err
}
sr := io.NewSectionReader(tmpFile, offset, int64(len(meta.Blob)))
byPackageReaders[packageName] = append(byPackageReaders[packageName], sr)
offset += int64(len(meta.Blob))
f45c501 to
ba1392c
Compare
ba1392c to
fbe2335
Compare
26f2d3a to
3176657
Compare
| if err := json.Unmarshal(blob, &m); err != nil { | ||
| return status.Errorf(codes.Internal, "unmarshal custom schema blob: %v", err) | ||
| } |
There was a problem hiding this comment.
This seems like it could be expensive if we're able to use a non-JSON blob in the cache? iirc the pogreb cache stores the other blobs directly with proto serialization. Could we have the closure directly provide structpb.NewStruct(m), and let the implementation decide how to get the blob into that format?
There was a problem hiding this comment.
kk - I've updated the closure's signature and refactored. Now it provides a pbf struct as opposed to []bytes
| The server also exposes an experimental `ExperimentalRegistry` service: | ||
|
|
||
| ```sh | ||
| $ grpcurl -plaintext localhost:50051 list api.ExperimentalRegistry | ||
| ExperimentalListPackageCustomSchemas | ||
| ``` | ||
|
|
||
| Experimental endpoints require the `X-Acknowledge-Experimental: true` gRPC metadata header. Without it, the endpoint returns an empty response. | ||
|
|
||
| ```sh | ||
| grpcurl -plaintext -H 'X-Acknowledge-Experimental: true' \ | ||
| -d '{"schema":"custom.operator.io","packageName":"mypkg"}' \ | ||
| localhost:50051 api.ExperimentalRegistry/ExperimentalListPackageCustomSchemas | ||
| ``` |
There was a problem hiding this comment.
WDYT about leaving all of this undocumented?
There was a problem hiding this comment.
Same with AGENTS.md?
Someone developing a client may point their agent at this repo. To be fair, the agent can probably figure this out anyway if they see our repo, but maybe we don't make it easy?
| md, _ := metadata.FromIncomingContext(stream.Context()) | ||
| // Silently return an empty stream when the experimental header is absent | ||
| // so that unaware clients see no disruption. | ||
| if vals := md.Get("x-acknowledge-experimental"); len(vals) == 0 || vals[0] != "true" { |
There was a problem hiding this comment.
I believe so. The docs say that the key is converted to lower-case before searching the headers.
There was a problem hiding this comment.
I've tested it out manually and can confirm its not case-sensitive
There was a problem hiding this comment.
I also moved the header to a constant
9e45498 to
9b06802
Compare
9b06802 to
ab796de
Compare
ab796de to
712779c
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 15 changed files in this pull request and generated 4 comments.
Files not reviewed (2)
- pkg/api/registry.pb.go: Language not supported
- pkg/api/registry_grpc.pb.go: Language not supported
Comments suppressed due to low confidence (2)
pkg/cache/pogrebv1.go:188
- SendMetas returns nil for any error from q.db.Get(...), not just "key not found". This will silently hide real DB failures and make clients think there are simply no results. Consider returning the error unless it is the expected not-found condition.
func (q *pogrebV1Backend) SendMetas(ctx context.Context, key metaKey, sender func([]byte) error) error {
data, err := q.db.Get(q.metaDBKey(key))
if err != nil || len(data) == 0 {
return nil
}
pkg/cache/pogrebv1.go:206
- SendMetas ignores trailing bytes if the stored value is corrupted such that fewer than 4 bytes remain after parsing blobs (the loop exits and returns nil). This can silently drop data/corruption signals. Consider validating that the value is fully consumed (len(data)==0) after the loop, and returning an error otherwise.
return err
}
data = data[blobLen:]
}
return nil
}
712779c to
2efb1d3
Compare
2efb1d3 to
2296dab
Compare
2296dab to
3089411
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 15 changed files in this pull request and generated 4 comments.
Files not reviewed (2)
- pkg/api/registry.pb.go: Language not supported
- pkg/api/registry_grpc.pb.go: Language not supported
Comments suppressed due to low confidence (2)
pkg/cache/pogrebv1.go:191
SendMetasreturns an error whenq.db.Getfails. For lookups where no metas exist for the requested key, this should return an empty result (nil) rather than surfacing a storage-layer not-found as a fatal error to callers. Treat the not-found case as "no metas" and return nil.
binary.BigEndian.PutUint32(header, uint32(len(blob))) //#nosec G115 -- bounds checked above
return q.db.Put(dbKey, append(existing, append(header, blob...)...))
}
func (q *pogrebV1Backend) SendMetas(ctx context.Context, key metaKey, sender func([]byte) error) error {
data, err := q.db.Get(q.metaDBKey(key))
if err != nil {
return fmt.Errorf("read meta blobs: %w", err)
scripts/ensure-protoc.sh:16
- The
fetch_cmduses unquoted${DEST}inrm/mkdir -p, which can break if the repo root path contains spaces or glob characters. Quote these path expansions consistently (similar to how other parts of the command already quote${DEST}) to avoid word-splitting issues.
fetch_cmd="(curl -sSfLo '${DEST}/protoc-${ver}.zip' 'https://github.com/protocolbuffers/protobuf/releases/download/v${ver}/protoc-${ver}-${os}-${arch}.zip' && unzip -o -j -d '${DEST}' '${DEST}/protoc-${ver}.zip' bin/protoc && unzip -o -d '${DEST}' '${DEST}/protoc-${ver}.zip' 'include/*' && rm ${DEST}/protoc-${ver}.zip)"
if [[ "${ver}" != "$(eval ${ver_cmd})" ]]; then
echo "protoc missing or not version '${ver}', downloading..."
mkdir -p ${DEST}
eval ${fetch_cmd}
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 13 out of 15 changed files in this pull request and generated 1 comment.
Files not reviewed (2)
- pkg/api/registry.pb.go: Language not supported
- pkg/api/registry_grpc.pb.go: Language not supported
Comments suppressed due to low confidence (1)
scripts/ensure-protoc.sh:16
- This script builds a command string containing the user-supplied version and executes it via eval. If a caller passes a malicious version string containing shell metacharacters/quotes, it could lead to arbitrary command execution. Consider avoiding eval by invoking curl/unzip directly (or strictly validating/sanitizing the version argument and quoting ${DEST} in the rm/mkdir calls).
os="$(uname -s | sed 's/Darwin/osx/')"
arch="$(uname -m | sed 's/arm64/aarch_64/')"
fetch_cmd="(curl -sSfLo '${DEST}/protoc-${ver}.zip' 'https://github.com/protocolbuffers/protobuf/releases/download/v${ver}/protoc-${ver}-${os}-${arch}.zip' && unzip -o -j -d '${DEST}' '${DEST}/protoc-${ver}.zip' bin/protoc && unzip -o -d '${DEST}' '${DEST}/protoc-${ver}.zip' 'include/*' && rm ${DEST}/protoc-${ver}.zip)"
if [[ "${ver}" != "$(eval ${ver_cmd})" ]]; then
echo "protoc missing or not version '${ver}', downloading..."
mkdir -p ${DEST}
eval ${fetch_cmd}
Introduce a new ExperimentalRegistry gRPC service with a streaming endpoint for retrieving custom schema blobs (non-core FBC schemas like "custom.operator.io") from catalog caches, filtered by schema name and optionally by package name. Design: - Separate ExperimentalRegistry service to isolate experimental API surface from the stable Registry service - Callers must send X-Acknowledge-Experimental: true as gRPC metadata; without it the endpoint silently returns an empty stream - Schema and package name inputs are validated against an allowlist regex to prevent path traversal - Both JSON (filesystem) and pogreb (embedded KV) cache backends implement PutMeta/SendMetas for storing and streaming custom schema blobs at build and query time - Pogreb stores multiple blobs per (schema, package) key using length-prefixed encoding for O(1) lookup Also: - Extract protobuf well-known types (google/protobuf/struct.proto) during protoc download so codegen can import them - Add ListBundles to README API listing Signed-off-by: Per G. da Silva <pegoncal@redhat.com> Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Per G. da Silva <pegoncal@redhat.com>
| func (q *pogrebV1Backend) PutMeta(_ context.Context, key metaKey, blob []byte) error { | ||
| if len(blob) > math.MaxUint32 { | ||
| return fmt.Errorf("meta blob too large: %d bytes exceeds uint32 max", len(blob)) | ||
| } | ||
| dbKey := q.metaDBKey(key) | ||
| existing, err := q.db.Get(dbKey) | ||
| if err != nil { | ||
| return fmt.Errorf("read existing meta blobs: %w", err) | ||
| } | ||
| header := make([]byte, 4) | ||
| binary.BigEndian.PutUint32(header, uint32(len(blob))) //#nosec G115 -- bounds checked above | ||
| return q.db.Put(dbKey, append(existing, append(header, blob...)...)) | ||
| } |
There was a problem hiding this comment.
This is a copilot hiccup - Get returns nil on not found not an error
| func (q *pogrebV1Backend) SendMetas(ctx context.Context, key metaKey, sender func([]byte) error) error { | ||
| data, err := q.db.Get(q.metaDBKey(key)) | ||
| if err != nil { | ||
| return fmt.Errorf("read meta blobs: %w", err) | ||
| } | ||
| if len(data) == 0 { | ||
| return nil | ||
| } |
There was a problem hiding this comment.
This is a copilot hiccup - Get returns nil on not found not an error
| func (q *pogrebV1Backend) metaDBKey(in metaKey) []byte { | ||
| return []byte(fmt.Sprintf("%s%s/%s", metaKeyPrefix, in.Schema, in.PackageName)) | ||
| } |
There was a problem hiding this comment.
The length prefix is to do with how the list of documents are stored against the key
| func (s *ExperimentalRegistryServer) ExperimentalListPackageCustomSchemas(req *api.ExperimentalListPackageCustomSchemasRequest, stream api.ExperimentalRegistry_ExperimentalListPackageCustomSchemasServer) error { | ||
| md, _ := metadata.FromIncomingContext(stream.Context()) | ||
| // Silently return an empty stream when the experimental header is absent | ||
| // so that unaware clients see no disruption. | ||
| if vals := md.Get(headerXAcknowledgeExperimental); len(vals) == 0 || vals[0] != "true" { |
|
Re: experimental header gating Intentionally checking only |
|
/lgtm |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: grokspawn The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
| } | ||
| header := make([]byte, 4) | ||
| binary.BigEndian.PutUint32(header, uint32(len(blob))) //#nosec G115 -- bounds checked above | ||
| return q.db.Put(dbKey, append(existing, append(header, blob...)...)) |
There was a problem hiding this comment.
This is still JSON we're writing to the cache right? I was thinking something like this:
st := &structpb.Struct{}
st.UnmarshalJSON(meta.Blob)
protoBytes := proto.Marshal(st)
header := make([]byte, 4)
binary.BigEndian.PutUint32(header, uint32(len(protoBytes))) //#nosec G115 -- bounds checked above
return q.db.Put(dbKey, append(existing, append(header, protoBytes...)...))Unmarshaling that at read time will be much more efficient, I think.
| return q.db.Put(dbKey, append(existing, append(header, blob...)...)) | ||
| } | ||
|
|
||
| func (q *pogrebV1Backend) SendMetas(ctx context.Context, key metaKey, sender func([]byte) error) error { |
There was a problem hiding this comment.
Plumb sender func(meta *structpb.Struct) error all the way through to the backend. That way the cached can choose how to represent it.
| func (q *pogrebV1Backend) SendMetas(ctx context.Context, key metaKey, sender func([]byte) error) error { | |
| func (q *pogrebV1Backend) SendMetas(ctx context.Context, key metaKey, sender func(meta *structpb.Struct) error) error { |
| return fmt.Errorf("meta blob too large: %d bytes exceeds uint32 max", len(blob)) | ||
| } | ||
| dbKey := q.metaDBKey(key) | ||
| existing, err := q.db.Get(dbKey) |
There was a problem hiding this comment.
I didn't think about the iterative append process of the cache key/value here. This might be hell on the Go GC if there are every multiple blobs for the same key. In our existing use case, we don't expect that. Maybe if this becomes problematic for a future use case, we make another cache implementation that supports prefix/range scans.
Summary
Adds a streaming gRPC endpoint for querying custom (non-core) FBC schema blobs from catalog caches. This enables clients like catalogd to serve arbitrary user-defined catalog metadata alongside the standard olm.package/olm.channel/olm.bundle schemas.
ExperimentalRegistrygRPC service withExperimentalListPackageCustomSchemasstreaming RPCX-Acknowledge-Experimental: truegRPC metadata header — absent header silently returns an empty stream^[a-zA-Z0-9][a-zA-Z0-9._-]*$) to prevent path traversal in cache lookupsPutMeta/SendMetasfor custom schema blob storagegoogle/protobuf/struct.proto) for codegenTest plan
metaKeyvalidation (valid keys, empty, path traversal, dot sequences, leading special chars, filesystem-unsafe characters)go test ./pkg/cache/... ./pkg/server/...