Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions epoch-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ spec:
# (tmpfs in most clusters) and OOMs on multi-GiB pushes.
- name: EPOCH_UPLOAD_DIR
value: /var/cache/epoch/uploads
# Serve blob GETs as redirects to presigned object-store URLs so
# multi-GiB bytes flow client<->GCS directly instead of being
# proxied (and TLS-re-encrypted) through this pod.
- name: EPOCH_BLOB_REDIRECT
value: "true"
# Optional: absolute base URL clients reach the server at. Used
# to anchor the OCI WWW-Authenticate realm + /v2/token. Required
# when fronting epoch with a proxy that does NOT set
Expand Down Expand Up @@ -131,11 +136,11 @@ spec:
periodSeconds: 10
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: "1"
memory: 512Mi
memory: 1Gi
limits:
cpu: "3"
memory: 4Gi
volumeMounts:
- name: upload-spool
mountPath: /var/cache/epoch/uploads
Expand Down
10 changes: 10 additions & 0 deletions objectstore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ func (c *Client) Put(ctx context.Context, key string, body io.Reader, size int64
return nil
}

// PresignGet returns a time-limited URL that lets a client GET the object
// directly from the backing store, bypassing this process.
func (c *Client) PresignGet(ctx context.Context, key string, ttl time.Duration) (string, error) {
u, err := c.client.PresignedGetObject(ctx, c.cfg.Bucket, c.fullKey(key), ttl, nil)
if err != nil {
return "", fmt.Errorf("presign get %s: %w", key, err)
}
return u.String(), nil
}

// Get returns a streaming reader and size for the given key.
func (c *Client) Get(ctx context.Context, key string) (io.ReadCloser, int64, error) {
obj, err := c.client.GetObject(ctx, c.cfg.Bucket, c.fullKey(key), minio.GetObjectOptions{})
Expand Down
6 changes: 6 additions & 0 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (r *Registry) StreamBlob(ctx context.Context, digest string) (io.ReadCloser
return r.client.Get(ctx, blobKey(digest))
}

// PresignBlobGet returns a time-limited URL for fetching a blob directly from
// the object store, bypassing this process.
func (r *Registry) PresignBlobGet(ctx context.Context, digest string, ttl time.Duration) (string, error) {
return r.client.PresignGet(ctx, blobKey(digest), ttl)
}

// BlobSize returns the size of a blob in bytes.
func (r *Registry) BlobSize(ctx context.Context, digest string) (int64, error) {
return r.client.Head(ctx, blobKey(digest))
Expand Down
47 changes: 47 additions & 0 deletions server/registry_v2_blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,35 @@ import (
"io"
"net/http"
"strconv"
"time"

"github.com/projecteru2/core/log"

"github.com/cocoonstack/epoch/manifest"
)

const defaultBlobRedirectTTL = time.Hour

// resolveBlobRedirectTTL parses EPOCH_BLOB_REDIRECT_TTL, falling back to the
// default for empty, unparseable, or non-positive values.
func resolveBlobRedirectTTL(raw string) time.Duration {
if raw == "" {
return defaultBlobRedirectTTL
}
d, err := time.ParseDuration(raw)
if err != nil || d <= 0 {
return defaultBlobRedirectTTL
}
return d
}

func (s *Server) v2GetBlob(w http.ResponseWriter, r *http.Request) {
dgst := stripSHA256Prefix(urlVar(r, "digest"))

if s.blobRedirect && s.redirectBlob(w, r, dgst) {
return
}

body, size, err := s.reg.StreamBlob(r.Context(), dgst)
if err != nil {
if isNotFound(err) {
Expand All @@ -31,6 +53,31 @@ func (s *Server) v2GetBlob(w http.ResponseWriter, r *http.Request) {
_, _ = io.Copy(w, body)
}

// redirectBlob points the client at a presigned object-store URL so blob bytes
// flow directly from storage instead of being proxied through this process.
// Returns false without writing a response when the blob can't be redirected,
// so v2GetBlob falls back to streaming.
func (s *Server) redirectBlob(w http.ResponseWriter, r *http.Request, dgst string) bool {
logger := log.WithFunc("server.redirectBlob")
exists, err := s.reg.BlobExists(r.Context(), dgst)
if err != nil {
logger.Warnf(r.Context(), "blob exists check for %s failed, falling back to proxy: %v", dgst, err)
return false
}
if !exists {
v2Error(w, http.StatusNotFound, "BLOB_UNKNOWN", "blob not found")
return true
}
url, err := s.reg.PresignBlobGet(r.Context(), dgst, s.blobRedirectTTL)
if err != nil {
logger.Warnf(r.Context(), "presign blob %s failed, falling back to proxy: %v", dgst, err)
return false
}
w.Header().Set("Docker-Content-Digest", "sha256:"+dgst)
http.Redirect(w, r, url, http.StatusTemporaryRedirect)
return true
}

func (s *Server) v2HeadBlob(w http.ResponseWriter, r *http.Request) {
dgst := stripSHA256Prefix(urlVar(r, "digest"))

Expand Down
25 changes: 25 additions & 0 deletions server/registry_v2_blobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package server

import (
"testing"
"time"
)

func TestResolveBlobRedirectTTL(t *testing.T) {
cases := []struct {
raw string
want time.Duration
}{
{"", defaultBlobRedirectTTL},
{"30m", 30 * time.Minute},
{"2h", 2 * time.Hour},
{"garbage", defaultBlobRedirectTTL},
{"0", defaultBlobRedirectTTL},
{"-5m", defaultBlobRedirectTTL},
}
for _, c := range cases {
if got := resolveBlobRedirectTTL(c.raw); got != c.want {
t.Errorf("resolveBlobRedirectTTL(%q) = %s, want %s", c.raw, got, c.want)
}
}
}
30 changes: 20 additions & 10 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"os"
"strconv"
"time"

commonhttpx "github.com/cocoonstack/cocoon-common/httpx"
Expand All @@ -29,9 +30,11 @@ var _ http.ResponseWriter = (*responseWriter)(nil)

// Server is the Epoch HTTP server providing OCI Distribution and control plane APIs.
type Server struct {
addr string // config
registryToken string // config — Bearer token for /v2/ (empty = no token required)
sso *SSOConfig // config — nil = UI auth disabled
addr string // config
registryToken string // config — Bearer token for /v2/ (empty = no token required)
sso *SSOConfig // config — nil = UI auth disabled
blobRedirect bool // config — redirect blob GETs to presigned object-store URLs
blobRedirectTTL time.Duration

reg *registry.Registry // resources
store *store.Store // resources
Expand All @@ -54,14 +57,21 @@ func New(ctx context.Context, reg *registry.Registry, st *store.Store, addr stri
if regToken != "" {
logger.Info(ctx, "registry token auth enabled")
}
blobRedirect, _ := strconv.ParseBool(os.Getenv("EPOCH_BLOB_REDIRECT"))
blobRedirectTTL := resolveBlobRedirectTTL(os.Getenv("EPOCH_BLOB_REDIRECT_TTL"))
if blobRedirect {
logger.Infof(ctx, "blob redirect enabled, ttl=%s", blobRedirectTTL)
}
s := &Server{
addr: addr,
registryToken: regToken,
sso: sso,
reg: reg,
store: st,
router: mux.NewRouter(),
uploads: newUploadSessions(resolveUploadDir(ctx)),
addr: addr,
registryToken: regToken,
sso: sso,
blobRedirect: blobRedirect,
blobRedirectTTL: blobRedirectTTL,
reg: reg,
store: st,
router: mux.NewRouter(),
uploads: newUploadSessions(resolveUploadDir(ctx)),
}
s.setupRoutes(ctx)
return s
Expand Down