diff --git a/objectstore/client.go b/objectstore/client.go index f789844..ef41657 100644 --- a/objectstore/client.go +++ b/objectstore/client.go @@ -58,6 +58,29 @@ func (c *Client) Put(ctx context.Context, key string, body io.Reader, size int64 return nil } +// blobUploadPartSize/blobUploadThreads bound PutStreaming memory at +// partSize*threads while keeping concurrency for multi-GiB blobs. +const ( + blobUploadPartSize = uint64(64) << 20 + blobUploadThreads = uint(4) +) + +// PutStreaming uploads body to key with concurrent multipart, reading from a +// non-seekable source without buffering the whole object. Memory is bounded at +// blobUploadPartSize*blobUploadThreads. +func (c *Client) PutStreaming(ctx context.Context, key string, body io.Reader, size int64) error { + _, err := c.client.PutObject(ctx, c.cfg.Bucket, c.fullKey(key), body, size, minio.PutObjectOptions{ + ContentType: "application/octet-stream", + PartSize: blobUploadPartSize, + NumThreads: blobUploadThreads, + ConcurrentStreamParts: true, + }) + if err != nil { + return fmt.Errorf("put streaming %s: %w", key, err) + } + return nil +} + // Get returns a streaming reader and size for the given key. func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, int64, error) { obj, err := c.client.GetObject(ctx, c.cfg.Bucket, c.fullKey(key), minio.GetObjectOptions{}) diff --git a/registry/registry.go b/registry/registry.go index 5dc7799..cbf8f44 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -64,6 +64,12 @@ func (r *Registry) PushBlobFromStream(ctx context.Context, digest string, body i return r.client.Put(ctx, blobKey(digest), body, size) } +// PushBlobStreaming streams a blob straight to the object store without +// buffering it whole. The caller verifies the digest after the stream drains. +func (r *Registry) PushBlobStreaming(ctx context.Context, digest string, body io.Reader, size int64) error { + return r.client.PutStreaming(ctx, blobKey(digest), body, size) +} + // BlobExists reports whether a blob with the given digest exists. func (r *Registry) BlobExists(ctx context.Context, digest string) (bool, error) { return r.client.Exists(ctx, blobKey(digest)) diff --git a/server/registry_v2_uploads.go b/server/registry_v2_uploads.go index 5f6c004..5206e68 100644 --- a/server/registry_v2_uploads.go +++ b/server/registry_v2_uploads.go @@ -1,6 +1,8 @@ package server import ( + "crypto/sha256" + "encoding/hex" "errors" "fmt" "io" @@ -78,31 +80,40 @@ func (s *Server) v2CompleteBlobUpload(w http.ResponseWriter, r *http.Request) { s.persistVerifiedBlob(w, r, name, digest, fu) } +// persistMonolithicUpload streams a single-PUT blob (digest known up front) +// straight to the object store while hashing inline — no disk spool, receive +// and upload overlap. The digest is verified after the stream drains; a +// mismatch deletes the object so the content-addressed key never keeps +// unverified bytes (the verify happens the instant the upload completes, so the +// window is negligible and content-addressing protects readers regardless). func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, name, digest string) { - id, err := s.uploads.Start() - if err != nil { - v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error()) + dgst := stripSHA256Prefix(digest) + + if exists, err := s.reg.BlobExists(r.Context(), dgst); err == nil && exists { + drainBody(r.Body) + s.blobCreated(w, name, digest) return } - body := io.LimitReader(r.Body, uploadBodyLimit) - if _, appendErr := s.uploads.Append(id, body); appendErr != nil { - drainBody(body) - s.uploads.Cancel(id) - writeUploadAppendError(w, appendErr) + + hasher := sha256.New() + body := io.TeeReader(io.LimitReader(r.Body, uploadBodyLimit), hasher) + if err := s.reg.PushBlobStreaming(r.Context(), dgst, body, r.ContentLength); err != nil { + v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error()) return } - fu, err := s.uploads.Finalize(id) - if err != nil { - drainBody(body) - writeUploadAppendError(w, err) + + if got := "sha256:" + hex.EncodeToString(hasher.Sum(nil)); got != digest { + _ = s.reg.DeleteBlob(r.Context(), dgst) + v2Error(w, http.StatusBadRequest, "DIGEST_INVALID", + fmt.Sprintf("digest mismatch: got %s, expected %s", got, digest)) return } - defer func() { _ = fu.Close() }() - - s.persistVerifiedBlob(w, r, name, digest, fu) + s.blobCreated(w, name, digest) } // persistVerifiedBlob verifies the digest then streams to the object store. +// Used by the chunked PATCH upload path, where the full blob is spooled to +// disk first so the digest can be checked before it reaches the object store. func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, name, digest string, fu *FinalizedUpload) { if got := fu.Digest(); got != digest { v2Error(w, http.StatusBadRequest, "DIGEST_INVALID", @@ -119,7 +130,10 @@ func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, nam v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error()) return } + s.blobCreated(w, name, digest) +} +func (s *Server) blobCreated(w http.ResponseWriter, name, digest string) { w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/%s", name, digest)) w.Header().Set("Docker-Content-Digest", digest) w.WriteHeader(http.StatusCreated)