Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions objectstore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
)

// Streaming-upload tuning used by PutStreaming. Memory held by one upload is
// bounded at blobUploadPartSize*blobUploadThreads, while still leaving enough
// parallelism for multi-GiB blobs.
const (
	// blobUploadPartSize is the multipart part size: 64 MiB.
	blobUploadPartSize uint64 = 64 << 20
	// blobUploadThreads is the number of parts uploaded concurrently.
	blobUploadThreads uint = 4
)

// ErrNotFound is returned when a requested object does not exist.
var ErrNotFound = errors.New("not found")

Expand Down Expand Up @@ -58,6 +65,21 @@ func (c *Client) Put(ctx context.Context, key string, body io.Reader, size int64
return nil
}

// PutStreaming stores body under key using a concurrent multipart upload, so a
// non-seekable source can be sent without holding the whole object in memory.
//
// size is forwarded to the object store client unchanged. Part size and upload
// parallelism come from blobUploadPartSize and blobUploadThreads. Any client
// error is wrapped with the (unprefixed) key for context.
func (c *Client) PutStreaming(ctx context.Context, key string, body io.Reader, size int64) error {
	opts := minio.PutObjectOptions{
		ContentType:           "application/octet-stream",
		PartSize:              blobUploadPartSize,
		NumThreads:            blobUploadThreads,
		ConcurrentStreamParts: true,
	}

	if _, err := c.client.PutObject(ctx, c.cfg.Bucket, c.fullKey(key), body, size, opts); err != nil {
		return fmt.Errorf("put streaming %s: %w", key, err)
	}
	return nil
}

// PresignGet returns a time-limited URL for a direct GET, bypassing this process.
func (c *Client) PresignGet(ctx context.Context, key string, ttl time.Duration) (string, error) {
u, err := c.client.PresignedGetObject(ctx, c.cfg.Bucket, c.fullKey(key), ttl, nil)
Expand Down
6 changes: 6 additions & 0 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func (r *Registry) PushBlobFromStream(ctx context.Context, digest string, body i
return r.client.Put(ctx, blobKey(digest), body, size)
}

// PushBlobStreaming sends a blob directly to the object store as it is read,
// never holding the whole blob in memory. Digest verification is left to the
// caller, which checks it once the stream has been fully consumed.
func (r *Registry) PushBlobStreaming(ctx context.Context, digest string, body io.Reader, size int64) error {
	key := blobKey(digest)
	return r.client.PutStreaming(ctx, key, body, size)
}

// BlobExists reports whether a blob with the given digest exists.
func (r *Registry) BlobExists(ctx context.Context, digest string) (bool, error) {
return r.client.Exists(ctx, blobKey(digest))
Expand Down
94 changes: 80 additions & 14 deletions server/registry_v2_uploads.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package server

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/projecteru2/core/log"
)

// NOTE(review): uploadBodyLimit is declared twice here — once on its own line
// and once inside the grouped block that follows. This looks like the removed
// and added sides of a rendered diff; confirm only one declaration is live.
const uploadBodyLimit = defaultUploadMaxBytes
const (
// uploadBodyLimit is the byte cap applied to an upload request body through
// io.LimitReader; it takes its value from defaultUploadMaxBytes.
uploadBodyLimit = defaultUploadMaxBytes

// discardBlobTimeout bounds the detached corrupt-blob delete; the object
// store layer imposes no request timeout of its own.
discardBlobTimeout = 30 * time.Second
)

func (s *Server) v2InitBlobUpload(w http.ResponseWriter, r *http.Request) {
name := urlVar(r, "name")
Expand All @@ -20,6 +32,7 @@ func (s *Server) v2InitBlobUpload(w http.ResponseWriter, r *http.Request) {

id, err := s.uploads.Start()
if err != nil {
log.WithFunc("server.v2InitBlobUpload").Errorf(r.Context(), err, "start upload session failed")
v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error())
return
}
Expand Down Expand Up @@ -78,31 +91,79 @@ func (s *Server) v2CompleteBlobUpload(w http.ResponseWriter, r *http.Request) {
s.persistVerifiedBlob(w, r, name, digest, fu)
}

// persistMonolithicUpload streams a single-PUT blob straight to the digest key
// while hashing inline (no disk spool), then verifies. A mismatch discards the
// object: epoch's read path does not re-hash, so the digest key must only ever
// hold verified bytes.
func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, name, digest string) {
// NOTE(review): this body mixes statements from the spool-based flow
// (s.uploads.Start/Append/Finalize, persistVerifiedBlob) with the streaming
// flow (BlobExists, acquireStreamSlot, PushBlobStreaming). It looks like both
// sides of a rendered diff and does not compile as written (body is declared
// twice, several braces are unbalanced) — confirm against the real file.
id, err := s.uploads.Start()
dgst := stripSHA256Prefix(digest)

// Dedup check: a blob already stored under this digest is reported as created
// without reading it again.
exists, err := s.reg.BlobExists(r.Context(), dgst)
if err != nil {
// fail closed: streaming on could overwrite then delete an existing blob.
log.WithFunc("server.persistMonolithicUpload").Errorf(r.Context(), err, "blob exists check for sha256:%s failed", dgst)
v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error())
return
}
body := io.LimitReader(r.Body, uploadBodyLimit)
if _, appendErr := s.uploads.Append(id, body); appendErr != nil {
drainBody(body)
s.uploads.Cancel(id)
writeUploadAppendError(w, appendErr)
if exists {
drainBody(r.Body) // keep the connection reusable
s.blobCreated(w, name, digest)
return
}
fu, err := s.uploads.Finalize(id)
if err != nil {
drainBody(body)
writeUploadAppendError(w, err)

// Wait for one of the bounded streaming slots. acquireStreamSlot reports !ok
// when the request context ends first; no response is written in that case.
release, ok := s.acquireStreamSlot(r.Context())
if !ok {
log.WithFunc("server.persistMonolithicUpload").Debugf(r.Context(), "upload canceled while queued for a streaming slot")
return
}
defer func() { _ = fu.Close() }()
defer release()

s.persistVerifiedBlob(w, r, name, digest, fu)
// Every byte the object store reads through the TeeReader is also written to
// hasher, so the digest is computed in the same pass as the upload. The
// LimitReader stops reading after uploadBodyLimit bytes.
hasher := sha256.New()
body := io.TeeReader(io.LimitReader(r.Body, uploadBodyLimit), hasher)
if err := s.reg.PushBlobStreaming(r.Context(), dgst, body, r.ContentLength); err != nil {
log.WithFunc("server.persistMonolithicUpload").Errorf(r.Context(), err, "stream blob sha256:%s (content-length=%d) failed", dgst, r.ContentLength)
v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error())
return
}

// Compare the digest of the bytes actually streamed with the digest the
// client supplied; on mismatch delete the stored object before answering 400.
if got := "sha256:" + hex.EncodeToString(hasher.Sum(nil)); got != digest {
s.discardCorruptBlob(r.Context(), dgst)
v2Error(w, http.StatusBadRequest, "DIGEST_INVALID",
fmt.Sprintf("digest mismatch: got %s, expected %s", got, digest))
return
}
s.blobCreated(w, name, digest)
}

// discardCorruptBlob removes a blob whose content did not match its digest.
//
// The delete runs on a context detached from the request's cancellation and
// capped at discardBlobTimeout: if a canceled request suppressed the delete,
// unverified bytes would remain under the digest key for the dedup path to
// trust. A failed delete is only logged, leaving cleanup to a backstop sweep.
func (s *Server) discardCorruptBlob(ctx context.Context, digest string) {
	detached, cancel := context.WithTimeout(context.WithoutCancel(ctx), discardBlobTimeout)
	defer cancel()

	err := s.reg.DeleteBlob(detached, digest)
	if err == nil {
		return
	}
	log.WithFunc("server.discardCorruptBlob").Errorf(detached, err,
		"delete corrupt blob sha256:%s failed; digest key holds unverified bytes the dedup path will trust", digest)
}

// acquireStreamSlot waits for one of the bounded streaming-upload slots, so
// that concurrent 256 MiB uploads cannot exhaust memory.
//
// On success it returns ok=true and a release func that frees the slot. If ctx
// ends before a slot becomes free it returns (nil, false). A Server with no
// semaphore (one not built through New) is unbounded: it always succeeds and
// release is a no-op.
func (s *Server) acquireStreamSlot(ctx context.Context) (release func(), ok bool) {
	sem := s.streamSem
	if sem == nil {
		return func() {}, true
	}

	select {
	case <-ctx.Done():
		return nil, false
	case sem <- struct{}{}:
		return func() { <-sem }, true
	}
}

// persistVerifiedBlob verifies the digest then streams to the object store.
// Used by the chunked PATCH upload path, where the full blob is spooled to
// disk first so the digest can be checked before it reaches the object store.
func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, name, digest string, fu *FinalizedUpload) {
if got := fu.Digest(); got != digest {
v2Error(w, http.StatusBadRequest, "DIGEST_INVALID",
Expand All @@ -112,14 +173,19 @@ func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, nam

rdr, err := fu.Reader()
if err != nil {
log.WithFunc("server.persistVerifiedBlob").Errorf(r.Context(), err, "open spooled blob %s failed", digest)
v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error())
return
}
if err := s.reg.PushBlobFromStream(r.Context(), stripSHA256Prefix(digest), rdr, fu.Size()); err != nil {
v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error())
log.WithFunc("server.persistVerifiedBlob").Errorf(r.Context(), err, "push spooled blob %s (size=%d) failed", digest, fu.Size())
v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error())
return
}
s.blobCreated(w, name, digest)
}

func (s *Server) blobCreated(w http.ResponseWriter, name, digest string) {
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/%s", name, digest))
w.Header().Set("Docker-Content-Digest", digest)
w.WriteHeader(http.StatusCreated)
Expand Down
18 changes: 16 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"os"
"strconv"
"time"

commonhttpx "github.com/cocoonstack/cocoon-common/httpx"
Expand All @@ -22,8 +23,9 @@ import (
)

// NOTE(review): defaultUploadSpoolDir and shutdownTimeout each appear twice in
// this group — presumably the old and re-aligned new lines of a rendered diff;
// confirm only one copy of each exists in the real file.
const (
defaultUploadSpoolDir = "/var/cache/epoch/uploads"
shutdownTimeout = 15 * time.Second
defaultUploadSpoolDir = "/var/cache/epoch/uploads"
shutdownTimeout = 15 * time.Second
// defaultMaxStreamingUploads is the concurrent streaming-upload cap used when
// EPOCH_MAX_STREAMING_UPLOADS does not hold a positive integer.
defaultMaxStreamingUploads = 8
)

var _ http.ResponseWriter = (*responseWriter)(nil)
Expand All @@ -42,6 +44,7 @@ type Server struct {
router *mux.Router // runtime
uploads *uploadSessions // runtime
uiHandler http.Handler // runtime
streamSem chan struct{} // runtime — caps concurrent 256 MiB streaming uploads
}

// New creates a Server with routes, auth, and upload sessions configured.
Expand All @@ -62,6 +65,8 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri
if blobRedirect {
logger.Infof(ctx, "blob redirect enabled, ttl=%s", blobRedirectTTL)
}
maxStreams := resolveMaxStreamingUploads(os.Getenv("EPOCH_MAX_STREAMING_UPLOADS"))
logger.Infof(ctx, "max concurrent streaming uploads: %d", maxStreams)
s := &Server{
addr: addr,
registryToken: regToken,
Expand All @@ -72,6 +77,7 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri
store: st,
router: mux.NewRouter(),
uploads: newUploadSessions(resolveUploadDir(ctx)),
streamSem: make(chan struct{}, maxStreams),
}
s.setupRoutes(ctx)
return s
Expand Down Expand Up @@ -226,6 +232,14 @@ func resolveUploadDir(ctx context.Context) string {
return fallback
}

// resolveMaxStreamingUploads turns the raw configuration string into the cap on
// concurrent streaming uploads; size it to mem_limit / 256 MiB.
//
// raw is used only when it parses as a strictly positive integer. Anything
// else (empty, non-numeric, zero, negative) yields defaultMaxStreamingUploads.
func resolveMaxStreamingUploads(raw string) int {
	limit, err := strconv.Atoi(raw)
	if err != nil || limit <= 0 {
		return defaultMaxStreamingUploads
	}
	return limit
}

func newHTTPServer(ctx context.Context, addr string, handler http.Handler) *http.Server {
srv := commonhttpx.NewServer(addr, handler)
srv.IdleTimeout = 60 * time.Second
Expand Down