Skip to content

Commit

Permalink
feat(blobs): http range
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim authored and t1mt committed Sep 3, 2024
1 parent af8ab58 commit 00e2515
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 27 deletions.
6 changes: 6 additions & 0 deletions internal/mux/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ type ResponseWriter interface {
Size() int64
}

func NewResponseWriter(w http.ResponseWriter) ResponseWriter {

Check failure on line 18 in internal/mux/response.go

View workflow job for this annotation

GitHub Actions / lint

NewResponseWriter returns interface (github.com/spegel-org/spegel/internal/mux.ResponseWriter) (ireturn)
return &response{
ResponseWriter: w,
}
}

var (
_ http.ResponseWriter = &response{}
_ http.Flusher = &response{}
Expand Down
14 changes: 6 additions & 8 deletions pkg/oci/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (c *Containerd) GetManifest(ctx context.Context, dgst digest.Digest) ([]byt
return b, mt, nil
}

func (c *Containerd) GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
func (c *Containerd) GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
if c.contentPath != "" {
path := filepath.Join(c.contentPath, "blobs", dgst.Algorithm().String(), dgst.Encoded())
file, err := os.Open(path)
Expand All @@ -270,13 +270,11 @@ func (c *Containerd) GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadCl
if err != nil {
return nil, err
}
return struct {
io.Reader
io.Closer
}{
Reader: content.NewReader(ra),
Closer: ra,
}, nil
rs, err := newHTTPReadSeeker(logr.FromContextOrDiscard(ctx), ra)
if err != nil {
return nil, err
}
return rs, nil
}

func getEventImage(e typeurl.Any) (string, EventType, error) {
Expand Down
152 changes: 152 additions & 0 deletions pkg/oci/httpreadseeker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package oci

import (
"io"

"github.com/containerd/containerd/content"
"github.com/containerd/errdefs"
"github.com/go-logr/logr"
"github.com/pkg/errors"
)

const maxRetry = 3

type httpReadSeeker struct {

Check failure on line 14 in pkg/oci/httpreadseeker.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct with 64 pointer bytes could be 32 (govet)
size int64
offset int64
rc content.ReaderAt
closed bool

errsWithNoProgress int
log logr.Logger
}

var _ io.ReadSeekCloser = (*httpReadSeeker)(nil)

func newHTTPReadSeeker(log logr.Logger, reader content.ReaderAt) (io.ReadSeekCloser, error) {
if reader == nil {
return nil, errors.Errorf("httpReadSeeker: reader cannot be nil")
}
return &httpReadSeeker{
log: log,
rc: reader,
size: reader.Size(),
}, nil
}

func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
if hrs.closed {
return 0, io.EOF
}

n, err = hrs.rc.ReadAt(p, hrs.offset)
hrs.offset += int64(n)
if n > 0 || err == nil {
hrs.errsWithNoProgress = 0
}
if err == io.ErrUnexpectedEOF {
// connection closed unexpectedly. try reconnecting.
if n == 0 {
hrs.errsWithNoProgress++
if hrs.errsWithNoProgress > maxRetry {
return // too many retries for this offset with no progress
}
}
if hrs.rc != nil {
if clsErr := hrs.rc.Close(); clsErr != nil {
hrs.log.Error(clsErr, "httpReadSeeker: failed to close ReadCloser")
}
hrs.rc = nil
}

} else if err == io.EOF {
// The CRI's imagePullProgressTimeout relies on responseBody.Close to
// update the process monitor's status. If the err is io.EOF, close
// the connection since there is no more available data.
if hrs.rc != nil {
if clsErr := hrs.rc.Close(); clsErr != nil {
hrs.log.Error(clsErr, "httpReadSeeker: failed to close ReadCloser after io.EOF")
}
hrs.rc = nil
}
}
return
}

func (hrs *httpReadSeeker) Close() error {
if hrs.closed {
return nil
}
hrs.closed = true
if hrs.rc != nil {
return hrs.rc.Close()
}

return nil
}

func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
if hrs.closed {
return 0, errors.Errorf("Fetcher.Seek: closed: %v", errdefs.ErrUnavailable)
}

abs := hrs.offset
switch whence {
case io.SeekStart:
abs = offset
case io.SeekCurrent:
abs += offset
case io.SeekEnd:
if hrs.size == -1 {
return 0, errors.Errorf("Fetcher.Seek: unknown size, cannot seek from end: %v", errdefs.ErrUnavailable)
}
abs = hrs.size + offset
default:
return 0, errors.Errorf("Fetcher.Seek: invalid whence: %v", errdefs.ErrInvalidArgument)
}

if abs < 0 {
return 0, errors.Errorf("Fetcher.Seek: negative offset: %v", errdefs.ErrInvalidArgument)
}

if abs != hrs.offset {
if hrs.rc != nil {
if err := hrs.rc.Close(); err != nil {
hrs.log.Error(err, "Fetcher.Seek: failed to close ReadCloser")
}

hrs.rc = nil
}

hrs.offset = abs
}

return hrs.offset, nil
}

func (hrs *httpReadSeeker) Seek2(offset int64, whence int) (int64, error) {
if hrs.closed {
return 0, errors.Errorf("Seek closed: %v", errdefs.ErrUnavailable)
}

var err error
newOffset := hrs.offset

switch whence {
case io.SeekCurrent:
newOffset += offset
case io.SeekEnd:
newOffset = hrs.size + offset
case io.SeekStart:
newOffset = offset
}

if newOffset < 0 {
err = errors.Errorf("cannot seek to negative position")
} else {
// No problems, set the offset.
hrs.offset = newOffset
}

return hrs.offset, err
}
12 changes: 9 additions & 3 deletions pkg/oci/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,22 @@ func (m *Memory) GetManifest(ctx context.Context, dgst digest.Digest) ([]byte, s
return b, mt, nil
}

func (m *Memory) GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
func (m *Memory) GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
m.mx.RLock()
defer m.mx.RUnlock()

b, ok := m.blobs[dgst]
if !ok {
return nil, errors.Join(ErrNotFound, fmt.Errorf("blob with digest %s not found", dgst))
}
rc := io.NopCloser(bytes.NewBuffer(b))
return rc, nil
rc := bytes.NewReader(b)
return struct {
io.ReadSeeker
io.Closer
}{
ReadSeeker: rc,
Closer: io.NopCloser(rc),
}, nil
}

func (m *Memory) AddImage(img Image) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Client interface {

// GetBlob returns a stream of the blob content for the given digest.
// Will return ErrNotFound if the digest cannot be found.
GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
GetBlob(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error)
}

type UnknownDocument struct {
Expand Down
10 changes: 3 additions & 7 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
Expand Down Expand Up @@ -345,7 +344,7 @@ func (r *Registry) handleBlob(rw mux.ResponseWriter, req *http.Request, ref refe
if req.Method == http.MethodHead {
return
}
var w io.Writer = rw
var w mux.ResponseWriter = rw
if r.throttler != nil {
w = r.throttler.Writer(rw)
}
Expand All @@ -355,11 +354,8 @@ func (r *Registry) handleBlob(rw mux.ResponseWriter, req *http.Request, ref refe
return
}
defer rc.Close()
_, err = io.Copy(w, rc)
if err != nil {
r.log.Error(err, "error occurred when copying blob")
return
}

http.ServeContent(w, req, ref.dgst.String(), time.Time{}, rc)
}

func (r *Registry) isExternalRequest(req *http.Request) bool {
Expand Down
12 changes: 6 additions & 6 deletions pkg/throttle/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package throttle

import (
"fmt"
"io"
"time"

"github.com/spegel-org/spegel/internal/mux"
"golang.org/x/time/rate"
)

Expand All @@ -22,20 +22,20 @@ func NewThrottler(br Byterate) *Throttler {
}
}

func (t *Throttler) Writer(w io.Writer) io.Writer {
func (t *Throttler) Writer(w mux.ResponseWriter) mux.ResponseWriter {

Check failure on line 25 in pkg/throttle/throttle.go

View workflow job for this annotation

GitHub Actions / lint

Writer returns interface (github.com/spegel-org/spegel/internal/mux.ResponseWriter) (ireturn)
return &writer{
limiter: t.limiter,
writer: w,
limiter: t.limiter,
ResponseWriter: w,
}
}

type writer struct {
mux.ResponseWriter
limiter *rate.Limiter
writer io.Writer
}

func (w *writer) Write(p []byte) (int, error) {
n, err := w.writer.Write(p)
n, err := w.ResponseWriter.Write(p)
if err != nil {
return 0, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/throttle/throttle_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package throttle

import (
"bytes"
"net/http/httptest"
"testing"
"time"

"github.com/spegel-org/spegel/internal/mux"
"github.com/stretchr/testify/require"
)

Expand All @@ -13,7 +14,7 @@ func TestThrottler(t *testing.T) {

br := 500 * Bps
throttler := NewThrottler(br)
w := throttler.Writer(bytes.NewBuffer([]byte{}))
w := throttler.Writer(mux.NewResponseWriter(httptest.NewRecorder()))
chunkSize := 100
start := time.Now()
for i := 0; i < 10; i++ {
Expand Down

0 comments on commit 00e2515

Please sign in to comment.