From 7f90d643be7683828a3efb8cf8d8b26719a593c7 Mon Sep 17 00:00:00 2001 From: tonic Date: Thu, 18 Jun 2026 20:24:20 +0800 Subject: [PATCH 1/3] feat(server): stream monolithic blob PUT straight to object store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit persistMonolithicUpload spooled the entire blob to a disk tempfile (via the chunked-upload session machinery), hashed it, then read it back and uploaded to the object store — two full passes plus ~2x disk I/O, and receive/upload run serially. For multi-GiB VM disk/memory blobs that's the bulk of a push (single PUTs were taking 2-4 min). Monolithic PUT already knows the digest up front (it's in the URL), so there's no need to buffer: stream the request body through a sha256 hasher straight into a concurrent multipart upload (no disk), verify the digest once the stream drains, and delete on mismatch so the content-addressed key never keeps unverified bytes. Server-side digest verification is preserved; no client change. The chunked PATCH path (no first-party client uses it) still spools to disk, since its digest is only known at finalize. GCS S3-compat streaming multipart validated against staging: 12 MiB in 3 concurrent 5 MiB parts, sha256 round-trip verified. --- objectstore/client.go | 22 +++++++++++++++ registry/registry.go | 6 +++++ server/registry_v2_uploads.go | 50 ++++++++++++++++++++++++----------- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/objectstore/client.go b/objectstore/client.go index db6013b..5f7246d 100644 --- a/objectstore/client.go +++ b/objectstore/client.go @@ -15,6 +15,13 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" ) +// blobUploadPartSize/blobUploadThreads bound PutStreaming memory at +// partSize*threads while keeping concurrency for multi-GiB blobs. +const ( + blobUploadPartSize = uint64(64) << 20 + blobUploadThreads = uint(4) +) + // ErrNotFound is returned when a requested object does not exist. var ErrNotFound = errors.New("not found") @@ -58,6 +65,21 @@ func (c *Client) Put(ctx context.Context, key string, body io.Reader, size int64 return nil } +// PutStreaming uploads body to key via concurrent multipart without buffering +// the whole object, reading from a non-seekable source. +func (c *Client) PutStreaming(ctx context.Context, key string, body io.Reader, size int64) error { + _, err := c.client.PutObject(ctx, c.cfg.Bucket, c.fullKey(key), body, size, minio.PutObjectOptions{ + ContentType: "application/octet-stream", + PartSize: blobUploadPartSize, + NumThreads: blobUploadThreads, + ConcurrentStreamParts: true, + }) + if err != nil { + return fmt.Errorf("put streaming %s: %w", key, err) + } + return nil +} + // PresignGet returns a time-limited URL for a direct GET, bypassing this process. func (c *Client) PresignGet(ctx context.Context, key string, ttl time.Duration) (string, error) { u, err := c.client.PresignedGetObject(ctx, c.cfg.Bucket, c.fullKey(key), ttl, nil) diff --git a/registry/registry.go b/registry/registry.go index 68687ad..c7339c6 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -64,6 +64,12 @@ func (r *Registry) PushBlobFromStream(ctx context.Context, digest string, body i return r.client.Put(ctx, blobKey(digest), body, size) } +// PushBlobStreaming streams a blob straight to the object store without +// buffering it whole. The caller verifies the digest after the stream drains. +func (r *Registry) PushBlobStreaming(ctx context.Context, digest string, body io.Reader, size int64) error { + return r.client.PutStreaming(ctx, blobKey(digest), body, size) +} + // BlobExists reports whether a blob with the given digest exists. func (r *Registry) BlobExists(ctx context.Context, digest string) (bool, error) { return r.client.Exists(ctx, blobKey(digest)) diff --git a/server/registry_v2_uploads.go b/server/registry_v2_uploads.go index 5f6c004..b0036c0 100644 --- a/server/registry_v2_uploads.go +++ b/server/registry_v2_uploads.go @@ -1,11 +1,15 @@ package server import ( + "crypto/sha256" + "encoding/hex" "errors" "fmt" "io" "net/http" "strconv" + + "github.com/projecteru2/core/log" ) const uploadBodyLimit = defaultUploadMaxBytes @@ -20,6 +24,7 @@ func (s *Server) v2InitBlobUpload(w http.ResponseWriter, r *http.Request) { id, err := s.uploads.Start() if err != nil { + log.WithFunc("server.v2InitBlobUpload").Errorf(r.Context(), err, "start upload session failed") v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } @@ -78,31 +83,41 @@ func (s *Server) v2CompleteBlobUpload(w http.ResponseWriter, r *http.Request) { s.persistVerifiedBlob(w, r, name, digest, fu) } +// persistMonolithicUpload streams a single-PUT blob (digest known up front) +// straight to the object store while hashing inline — no disk spool, receive +// and upload overlap. The digest is verified after the stream drains; a +// mismatch deletes the object so the content-addressed key never keeps +// unverified bytes (the verify happens the instant the upload completes, so the +// window is negligible and content-addressing protects readers regardless). func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, name, digest string) { - id, err := s.uploads.Start() - if err != nil { - v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) + dgst := stripSHA256Prefix(digest) + + if exists, err := s.reg.BlobExists(r.Context(), dgst); err == nil && exists { + drainBody(r.Body) + s.blobCreated(w, name, digest) return } - body := io.LimitReader(r.Body, uploadBodyLimit) - if _, appendErr := s.uploads.Append(id, body); appendErr != nil { - drainBody(body) - s.uploads.Cancel(id) - writeUploadAppendError(w, appendErr) + + hasher := sha256.New() + body := io.TeeReader(io.LimitReader(r.Body, uploadBodyLimit), hasher) + if err := s.reg.PushBlobStreaming(r.Context(), dgst, body, r.ContentLength); err != nil { + log.WithFunc("server.persistMonolithicUpload").Errorf(r.Context(), err, "stream blob sha256:%s (content-length=%d) failed", dgst, r.ContentLength) + v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error()) return } - fu, err := s.uploads.Finalize(id) - if err != nil { - drainBody(body) - writeUploadAppendError(w, err) + + if got := "sha256:" + hex.EncodeToString(hasher.Sum(nil)); got != digest { + _ = s.reg.DeleteBlob(r.Context(), dgst) + v2Error(w, http.StatusBadRequest, "DIGEST_INVALID", + fmt.Sprintf("digest mismatch: got %s, expected %s", got, digest)) return } - defer func() { _ = fu.Close() }() - - s.persistVerifiedBlob(w, r, name, digest, fu) + s.blobCreated(w, name, digest) } // persistVerifiedBlob verifies the digest then streams to the object store. +// Used by the chunked PATCH upload path, where the full blob is spooled to +// disk first so the digest can be checked before it reaches the object store. func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, name, digest string, fu *FinalizedUpload) { if got := fu.Digest(); got != digest { v2Error(w, http.StatusBadRequest, "DIGEST_INVALID", @@ -112,14 +127,19 @@ func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, nam rdr, err := fu.Reader() if err != nil { + log.WithFunc("server.persistVerifiedBlob").Errorf(r.Context(), err, "open spooled blob %s failed", digest) v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } if err := s.reg.PushBlobFromStream(r.Context(), stripSHA256Prefix(digest), rdr, fu.Size()); err != nil { + log.WithFunc("server.persistVerifiedBlob").Errorf(r.Context(), err, "push spooled blob %s (size=%d) failed", digest, fu.Size()) v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error()) return } + s.blobCreated(w, name, digest) +} +func (s *Server) blobCreated(w http.ResponseWriter, name, digest string) { w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/%s", name, digest)) w.Header().Set("Docker-Content-Digest", digest) w.WriteHeader(http.StatusCreated) From a3e5c90d56ee8b0aaeecd92bfbccd8c082ce5049 Mon Sep 17 00:00:00 2001 From: CMGS Date: Mon, 29 Jun 2026 00:57:49 +0800 Subject: [PATCH 2/3] harden(server): protect existing blobs in monolithic upload Streaming writes direct to the digest key (kept for speed), then verifies. - fail closed when BlobExists errors: streaming on could overwrite a valid blob with unverified bytes and the mismatch path would then delete it - delete a mismatched blob on a detached, bounded context so cancellation can't suppress it (leaving bytes the dedup trusts); log for a sweep - 500 store failures return INTERNAL_ERROR, not BLOB_UPLOAD_INVALID --- server/registry_v2_uploads.go | 49 ++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/server/registry_v2_uploads.go b/server/registry_v2_uploads.go index b0036c0..d7bc59d 100644 --- a/server/registry_v2_uploads.go +++ b/server/registry_v2_uploads.go @@ -1,6 +1,7 @@ package server import ( + "context" "crypto/sha256" "encoding/hex" "errors" @@ -8,11 +9,18 @@ import ( "io" "net/http" "strconv" + "time" "github.com/projecteru2/core/log" ) -const uploadBodyLimit = defaultUploadMaxBytes +const ( + uploadBodyLimit = defaultUploadMaxBytes + + // discardBlobTimeout bounds the detached corrupt-blob delete; the object + // store layer imposes no request timeout of its own. + discardBlobTimeout = 30 * time.Second +) func (s *Server) v2InitBlobUpload(w http.ResponseWriter, r *http.Request) { name := urlVar(r, "name") @@ -83,17 +91,22 @@ func (s *Server) v2CompleteBlobUpload(w http.ResponseWriter, r *http.Request) { s.persistVerifiedBlob(w, r, name, digest, fu) } -// persistMonolithicUpload streams a single-PUT blob (digest known up front) -// straight to the object store while hashing inline — no disk spool, receive -// and upload overlap. The digest is verified after the stream drains; a -// mismatch deletes the object so the content-addressed key never keeps -// unverified bytes (the verify happens the instant the upload completes, so the -// window is negligible and content-addressing protects readers regardless). +// persistMonolithicUpload streams a single-PUT blob straight to the digest key +// while hashing inline (no disk spool), then verifies. A mismatch discards the +// object: epoch's read path does not re-hash, so the digest key must only ever +// hold verified bytes. func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, name, digest string) { dgst := stripSHA256Prefix(digest) - if exists, err := s.reg.BlobExists(r.Context(), dgst); err == nil && exists { - drainBody(r.Body) + exists, err := s.reg.BlobExists(r.Context(), dgst) + if err != nil { + // fail closed: streaming on could overwrite then delete an existing blob. + log.WithFunc("server.persistMonolithicUpload").Errorf(r.Context(), err, "blob exists check for sha256:%s failed", dgst) + v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) + return + } + if exists { + drainBody(r.Body) // keep the connection reusable s.blobCreated(w, name, digest) return } @@ -102,12 +115,12 @@ func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, body := io.TeeReader(io.LimitReader(r.Body, uploadBodyLimit), hasher) if err := s.reg.PushBlobStreaming(r.Context(), dgst, body, r.ContentLength); err != nil { log.WithFunc("server.persistMonolithicUpload").Errorf(r.Context(), err, "stream blob sha256:%s (content-length=%d) failed", dgst, r.ContentLength) - v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error()) + v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } if got := "sha256:" + hex.EncodeToString(hasher.Sum(nil)); got != digest { - _ = s.reg.DeleteBlob(r.Context(), dgst) + s.discardCorruptBlob(r.Context(), dgst) v2Error(w, http.StatusBadRequest, "DIGEST_INVALID", fmt.Sprintf("digest mismatch: got %s, expected %s", got, digest)) return @@ -115,6 +128,18 @@ func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, s.blobCreated(w, name, digest) } +// discardCorruptBlob deletes a digest-mismatched blob on a detached, bounded +// context: a delete suppressed by request cancellation would leave unverified +// bytes the dedup path later trusts. A failure is logged for a backstop sweep. +func (s *Server) discardCorruptBlob(ctx context.Context, digest string) { + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), discardBlobTimeout) + defer cancel() + if err := s.reg.DeleteBlob(ctx, digest); err != nil { + log.WithFunc("server.discardCorruptBlob").Errorf(ctx, err, + "delete corrupt blob sha256:%s failed; digest key holds unverified bytes the dedup path will trust", digest) + } +} + // persistVerifiedBlob verifies the digest then streams to the object store. // Used by the chunked PATCH upload path, where the full blob is spooled to // disk first so the digest can be checked before it reaches the object store. @@ -133,7 +158,7 @@ func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, nam } if err := s.reg.PushBlobFromStream(r.Context(), stripSHA256Prefix(digest), rdr, fu.Size()); err != nil { log.WithFunc("server.persistVerifiedBlob").Errorf(r.Context(), err, "push spooled blob %s (size=%d) failed", digest, fu.Size()) - v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error()) + v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) return } s.blobCreated(w, name, digest) From 1a3e9e0c2dffc7e217b2999020bd7085656eb9df Mon Sep 17 00:00:00 2001 From: CMGS Date: Mon, 29 Jun 2026 01:10:18 +0800 Subject: [PATCH 3/3] feat(server): cap concurrent streaming uploads to bound memory Each monolithic streaming upload holds ~256 MiB (64 MiB x 4 parts), so unbounded concurrency OOMs the server. Gate them through a buffered-channel semaphore sized by EPOCH_MAX_STREAMING_UPLOADS (default 8); excess uploads block on TCP backpressure instead of allocating. The dedup short-circuit and the disk-spooled chunked path stay ungated (they don't carry the cost). --- server/registry_v2_uploads.go | 21 +++++++++++++++++++++ server/server.go | 18 ++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/server/registry_v2_uploads.go b/server/registry_v2_uploads.go index d7bc59d..d26ba8d 100644 --- a/server/registry_v2_uploads.go +++ b/server/registry_v2_uploads.go @@ -111,6 +111,13 @@ func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, return } + release, ok := s.acquireStreamSlot(r.Context()) + if !ok { + log.WithFunc("server.persistMonolithicUpload").Debugf(r.Context(), "upload canceled while queued for a streaming slot") + return + } + defer release() + hasher := sha256.New() body := io.TeeReader(io.LimitReader(r.Body, uploadBodyLimit), hasher) if err := s.reg.PushBlobStreaming(r.Context(), dgst, body, r.ContentLength); err != nil { @@ -140,6 +147,20 @@ func (s *Server) discardCorruptBlob(ctx context.Context, digest string) { } } +// acquireStreamSlot blocks for one of the bounded streaming-upload slots so +// concurrent 256 MiB uploads can't exhaust memory; ok is false if ctx ends first. +func (s *Server) acquireStreamSlot(ctx context.Context) (release func(), ok bool) { + if s.streamSem == nil { + return func() {}, true // unbounded when built without New + } + select { + case s.streamSem <- struct{}{}: + return func() { <-s.streamSem }, true + case <-ctx.Done(): + return nil, false + } +} + // persistVerifiedBlob verifies the digest then streams to the object store. // Used by the chunked PATCH upload path, where the full blob is spooled to // disk first so the digest can be checked before it reaches the object store. diff --git a/server/server.go b/server/server.go index 73f89ec..52ddfbe 100644 --- a/server/server.go +++ b/server/server.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "strconv" "time" commonhttpx "github.com/cocoonstack/cocoon-common/httpx" @@ -22,8 +23,9 @@ import ( ) const ( - defaultUploadSpoolDir = "/var/cache/epoch/uploads" - shutdownTimeout = 15 * time.Second + defaultUploadSpoolDir = "/var/cache/epoch/uploads" + shutdownTimeout = 15 * time.Second + defaultMaxStreamingUploads = 8 ) var _ http.ResponseWriter = (*responseWriter)(nil) @@ -42,6 +44,7 @@ type Server struct { router *mux.Router // runtime uploads *uploadSessions // runtime uiHandler http.Handler // runtime + streamSem chan struct{} // runtime — caps concurrent 256 MiB streaming uploads } // New creates a Server with routes, auth, and upload sessions configured. @@ -62,6 +65,8 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri if blobRedirect { logger.Infof(ctx, "blob redirect enabled, ttl=%s", blobRedirectTTL) } + maxStreams := resolveMaxStreamingUploads(os.Getenv("EPOCH_MAX_STREAMING_UPLOADS")) + logger.Infof(ctx, "max concurrent streaming uploads: %d", maxStreams) s := &Server{ addr: addr, registryToken: regToken, @@ -72,6 +77,7 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri store: st, router: mux.NewRouter(), uploads: newUploadSessions(resolveUploadDir(ctx)), + streamSem: make(chan struct{}, maxStreams), } s.setupRoutes(ctx) return s @@ -226,6 +232,14 @@ func resolveUploadDir(ctx context.Context) string { return fallback } +// resolveMaxStreamingUploads caps concurrent streaming uploads; size to mem_limit / 256 MiB. +func resolveMaxStreamingUploads(raw string) int { + if n, err := strconv.Atoi(raw); err == nil && n > 0 { + return n + } + return defaultMaxStreamingUploads +} + func newHTTPServer(ctx context.Context, addr string, handler http.Handler) *http.Server { srv := commonhttpx.NewServer(addr, handler) srv.IdleTimeout = 60 * time.Second