diff --git a/epoch-server.yaml b/epoch-server.yaml index 9c1b374..51fcdd0 100644 --- a/epoch-server.yaml +++ b/epoch-server.yaml @@ -68,6 +68,11 @@ spec: # (tmpfs in most clusters) and OOMs on multi-GiB pushes. - name: EPOCH_UPLOAD_DIR value: /var/cache/epoch/uploads + # Serve blob GETs as redirects to presigned object-store URLs so + # multi-GiB bytes flow client<->GCS directly instead of being + # proxied (and TLS-re-encrypted) through this pod. + - name: EPOCH_BLOB_REDIRECT + value: "true" # Optional: absolute base URL clients reach the server at. Used # to anchor the OCI WWW-Authenticate realm + /v2/token. Required # when fronting epoch with a proxy that does NOT set @@ -131,11 +136,11 @@ spec: periodSeconds: 10 resources: requests: - cpu: 100m - memory: 128Mi - limits: cpu: "1" - memory: 512Mi + memory: 1Gi + limits: + cpu: "3" + memory: 4Gi volumeMounts: - name: upload-spool mountPath: /var/cache/epoch/uploads diff --git a/objectstore/client.go b/objectstore/client.go index f789844..3db8db4 100644 --- a/objectstore/client.go +++ b/objectstore/client.go @@ -58,6 +58,16 @@ func (c *Client) Put(ctx context.Context, key string, body io.Reader, size int64 return nil } +// PresignGet returns a time-limited URL that lets a client GET the object +// directly from the backing store, bypassing this process. +func (c *Client) PresignGet(ctx context.Context, key string, ttl time.Duration) (string, error) { + u, err := c.client.PresignedGetObject(ctx, c.cfg.Bucket, c.fullKey(key), ttl, nil) + if err != nil { + return "", fmt.Errorf("presign get %s: %w", key, err) + } + return u.String(), nil +} + // Get returns a streaming reader and size for the given key. func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, int64, error) { obj, err := c.client.GetObject(ctx, c.cfg.Bucket, c.fullKey(key), minio.GetObjectOptions{}) diff --git a/registry/registry.go b/registry/registry.go index 5dc7799..4c6cb1b 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -74,6 +74,12 @@ func (r *Registry) StreamBlob(ctx context.Context, digest string) (io.ReadCloser return r.client.Get(ctx, blobKey(digest)) } +// PresignBlobGet returns a time-limited URL for fetching a blob directly from +// the object store, bypassing this process. +func (r *Registry) PresignBlobGet(ctx context.Context, digest string, ttl time.Duration) (string, error) { + return r.client.PresignGet(ctx, blobKey(digest), ttl) +} + // BlobSize returns the size of a blob in bytes. func (r *Registry) BlobSize(ctx context.Context, digest string) (int64, error) { return r.client.Head(ctx, blobKey(digest)) diff --git a/server/registry_v2_blobs.go b/server/registry_v2_blobs.go index 34463d4..7f27088 100644 --- a/server/registry_v2_blobs.go +++ b/server/registry_v2_blobs.go @@ -4,13 +4,35 @@ import ( "io" "net/http" "strconv" + "time" + + "github.com/projecteru2/core/log" "github.com/cocoonstack/epoch/manifest" ) +const defaultBlobRedirectTTL = time.Hour + +// resolveBlobRedirectTTL parses EPOCH_BLOB_REDIRECT_TTL, falling back to the +// default for empty, unparseable, or non-positive values. +func resolveBlobRedirectTTL(raw string) time.Duration { + if raw == "" { + return defaultBlobRedirectTTL + } + d, err := time.ParseDuration(raw) + if err != nil || d <= 0 { + return defaultBlobRedirectTTL + } + return d +} + func (s *Server) v2GetBlob(w http.ResponseWriter, r *http.Request) { dgst := stripSHA256Prefix(urlVar(r, "digest")) + if s.blobRedirect && s.redirectBlob(w, r, dgst) { + return + } + body, size, err := s.reg.StreamBlob(r.Context(), dgst) if err != nil { if isNotFound(err) { @@ -31,6 +53,31 @@ func (s *Server) v2GetBlob(w http.ResponseWriter, r *http.Request) { _, _ = io.Copy(w, body) } +// redirectBlob points the client at a presigned object-store URL so blob bytes +// flow directly from storage instead of being proxied through this process. +// Returns false without writing a response when the blob can't be redirected, +// so v2GetBlob falls back to streaming. +func (s *Server) redirectBlob(w http.ResponseWriter, r *http.Request, dgst string) bool { + logger := log.WithFunc("server.redirectBlob") + exists, err := s.reg.BlobExists(r.Context(), dgst) + if err != nil { + logger.Warnf(r.Context(), "blob exists check for %s failed, falling back to proxy: %v", dgst, err) + return false + } + if !exists { + v2Error(w, http.StatusNotFound, "BLOB_UNKNOWN", "blob not found") + return true + } + url, err := s.reg.PresignBlobGet(r.Context(), dgst, s.blobRedirectTTL) + if err != nil { + logger.Warnf(r.Context(), "presign blob %s failed, falling back to proxy: %v", dgst, err) + return false + } + w.Header().Set("Docker-Content-Digest", "sha256:"+dgst) + http.Redirect(w, r, url, http.StatusTemporaryRedirect) + return true +} + func (s *Server) v2HeadBlob(w http.ResponseWriter, r *http.Request) { dgst := stripSHA256Prefix(urlVar(r, "digest")) diff --git a/server/registry_v2_blobs_test.go b/server/registry_v2_blobs_test.go new file mode 100644 index 0000000..0a31c0e --- /dev/null +++ b/server/registry_v2_blobs_test.go @@ -0,0 +1,25 @@ +package server + +import ( + "testing" + "time" +) + +func TestResolveBlobRedirectTTL(t *testing.T) { + cases := []struct { + raw string + want time.Duration + }{ + {"", defaultBlobRedirectTTL}, + {"30m", 30 * time.Minute}, + {"2h", 2 * time.Hour}, + {"garbage", defaultBlobRedirectTTL}, + {"0", defaultBlobRedirectTTL}, + {"-5m", defaultBlobRedirectTTL}, + } + for _, c := range cases { + if got := resolveBlobRedirectTTL(c.raw); got != c.want { + t.Errorf("resolveBlobRedirectTTL(%q) = %s, want %s", c.raw, got, c.want) + } + } +} diff --git a/server/server.go b/server/server.go index c734e63..50108e3 100644 --- a/server/server.go +++ b/server/server.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "strconv" "time" commonhttpx "github.com/cocoonstack/cocoon-common/httpx" @@ -29,9 +30,11 @@ var _ http.ResponseWriter = (*responseWriter)(nil) // Server is the Epoch HTTP server providing OCI Distribution and control plane APIs. type Server struct { - addr string // config - registryToken string // config — Bearer token for /v2/ (empty = no token required) - sso *SSOConfig // config — nil = UI auth disabled + addr string // config + registryToken string // config — Bearer token for /v2/ (empty = no token required) + sso *SSOConfig // config — nil = UI auth disabled + blobRedirect bool // config — redirect blob GETs to presigned object-store URLs + blobRedirectTTL time.Duration reg *registry.Registry // resources store *store.Store // resources @@ -54,14 +57,21 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri if regToken != "" { logger.Info(ctx, "registry token auth enabled") } + blobRedirect, _ := strconv.ParseBool(os.Getenv("EPOCH_BLOB_REDIRECT")) + blobRedirectTTL := resolveBlobRedirectTTL(os.Getenv("EPOCH_BLOB_REDIRECT_TTL")) + if blobRedirect { + logger.Infof(ctx, "blob redirect enabled, ttl=%s", blobRedirectTTL) + } s := &Server{ - addr: addr, - registryToken: regToken, - sso: sso, - reg: reg, - store: st, - router: mux.NewRouter(), - uploads: newUploadSessions(resolveUploadDir(ctx)), + addr: addr, + registryToken: regToken, + sso: sso, + blobRedirect: blobRedirect, + blobRedirectTTL: blobRedirectTTL, + reg: reg, + store: st, + router: mux.NewRouter(), + uploads: newUploadSessions(resolveUploadDir(ctx)), } s.setupRoutes(ctx) return s