Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions objectstore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,29 @@ func (c *Client) Put(ctx context.Context, key string, body io.Reader, size int64
return nil
}

// Tuning knobs for PutStreaming's multipart upload. Buffer memory is bounded
// at blobUploadPartSize*blobUploadThreads, while concurrency is kept for
// multi-GiB blobs.
const (
	// blobUploadPartSize is the size of one multipart part (64 MiB).
	blobUploadPartSize uint64 = 64 << 20
	// blobUploadThreads is the thread count handed to the uploader.
	blobUploadThreads uint = 4
)

// PutStreaming writes body to key as application/octet-stream through one
// PutObject call configured for concurrent stream parts, so a non-seekable
// reader can be uploaded without buffering the whole object. Part size and
// thread count come from blobUploadPartSize and blobUploadThreads, which bound
// memory at blobUploadPartSize*blobUploadThreads. A failure is returned wrapped
// with the (unprefixed) key for context.
func (c *Client) PutStreaming(ctx context.Context, key string, body io.Reader, size int64) error {
	opts := minio.PutObjectOptions{
		ContentType:           "application/octet-stream",
		PartSize:              blobUploadPartSize,
		NumThreads:            blobUploadThreads,
		ConcurrentStreamParts: true,
	}

	// The upload info returned by PutObject is not needed here.
	if _, putErr := c.client.PutObject(ctx, c.cfg.Bucket, c.fullKey(key), body, size, opts); putErr != nil {
		return fmt.Errorf("put streaming %s: %w", key, putErr)
	}
	return nil
}

// Get returns a streaming reader and size for the given key.
func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, int64, error) {
obj, err := c.client.GetObject(ctx, c.cfg.Bucket, c.fullKey(key), minio.GetObjectOptions{})
Expand Down
6 changes: 6 additions & 0 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func (r *Registry) PushBlobFromStream(ctx context.Context, digest string, body i
return r.client.Put(ctx, blobKey(digest), body, size)
}

// PushBlobStreaming forwards body to the object store under the blob key
// derived from digest, streaming it rather than buffering it whole. No digest
// check happens here: the caller verifies the digest after the stream drains.
func (r *Registry) PushBlobStreaming(ctx context.Context, digest string, body io.Reader, size int64) error {
	key := blobKey(digest)
	return r.client.PutStreaming(ctx, key, body, size)
}

// BlobExists reports whether a blob with the given digest exists.
func (r *Registry) BlobExists(ctx context.Context, digest string) (bool, error) {
return r.client.Exists(ctx, blobKey(digest))
Expand Down
44 changes: 29 additions & 15 deletions server/registry_v2_uploads.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package server

import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -78,31 +80,40 @@ func (s *Server) v2CompleteBlobUpload(w http.ResponseWriter, r *http.Request) {
s.persistVerifiedBlob(w, r, name, digest, fu)
}

// persistMonolithicUpload streams a single-PUT blob (digest known up front)
// straight to the object store while hashing inline — no disk spool, receive
// and upload overlap. The digest is verified after the stream drains; a
// mismatch deletes the object so the content-addressed key never keeps
// unverified bytes (the verify happens the instant the upload completes, so the
// window is negligible and content-addressing protects readers regardless).
func (s *Server) persistMonolithicUpload(w http.ResponseWriter, r *http.Request, name, digest string) {
id, err := s.uploads.Start()
if err != nil {
v2Error(w, http.StatusInternalServerError, "INTERNAL_ERROR", err.Error())
dgst := stripSHA256Prefix(digest)

if exists, err := s.reg.BlobExists(r.Context(), dgst); err == nil && exists {
drainBody(r.Body)
s.blobCreated(w, name, digest)
return
}
body := io.LimitReader(r.Body, uploadBodyLimit)
if _, appendErr := s.uploads.Append(id, body); appendErr != nil {
drainBody(body)
s.uploads.Cancel(id)
writeUploadAppendError(w, appendErr)

hasher := sha256.New()
body := io.TeeReader(io.LimitReader(r.Body, uploadBodyLimit), hasher)
if err := s.reg.PushBlobStreaming(r.Context(), dgst, body, r.ContentLength); err != nil {
v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error())
return
}
fu, err := s.uploads.Finalize(id)
if err != nil {
drainBody(body)
writeUploadAppendError(w, err)

if got := "sha256:" + hex.EncodeToString(hasher.Sum(nil)); got != digest {
_ = s.reg.DeleteBlob(r.Context(), dgst)
v2Error(w, http.StatusBadRequest, "DIGEST_INVALID",
fmt.Sprintf("digest mismatch: got %s, expected %s", got, digest))
return
}
defer func() { _ = fu.Close() }()

s.persistVerifiedBlob(w, r, name, digest, fu)
s.blobCreated(w, name, digest)
}

// persistVerifiedBlob verifies the digest then streams to the object store.
// Used by the chunked PATCH upload path, where the full blob is spooled to
// disk first so the digest can be checked before it reaches the object store.
func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, name, digest string, fu *FinalizedUpload) {
if got := fu.Digest(); got != digest {
v2Error(w, http.StatusBadRequest, "DIGEST_INVALID",
Expand All @@ -119,7 +130,10 @@ func (s *Server) persistVerifiedBlob(w http.ResponseWriter, r *http.Request, nam
v2Error(w, http.StatusInternalServerError, "BLOB_UPLOAD_INVALID", err.Error())
return
}
s.blobCreated(w, name, digest)
}

func (s *Server) blobCreated(w http.ResponseWriter, name, digest string) {
w.Header().Set("Location", fmt.Sprintf("/v2/%s/blobs/%s", name, digest))
w.Header().Set("Docker-Content-Digest", digest)
w.WriteHeader(http.StatusCreated)
Expand Down
Loading