Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): AWS backend using thanos.io/objstore #11221

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions pkg/storage/bucket/azure/bucket_client.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,37 @@
package azure

import (
"net/http"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/azure"
yaml "gopkg.in/yaml.v2"
)

func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
bucketConfig := azure.Config{
StorageAccountName: cfg.StorageAccountName,
StorageAccountKey: cfg.StorageAccountKey.String(),
StorageConnectionString: cfg.ConnectionString.String(),
ContainerName: cfg.ContainerName,
Endpoint: cfg.EndpointSuffix,
MaxRetries: cfg.MaxRetries,
HTTPConfig: azure.HTTPConfig{
IdleConnTimeout: model.Duration(cfg.IdleConnTimeout),
ResponseHeaderTimeout: model.Duration(cfg.ResponseHeaderTimeout),
InsecureSkipVerify: cfg.InsecureSkipVerify,
TLSHandshakeTimeout: model.Duration(cfg.TLSHandshakeTimeout),
ExpectContinueTimeout: model.Duration(cfg.ExpectContinueTimeout),
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.MaxConnsPerHost,
},
return newBucketClient(cfg, name, logger, azure.NewBucketWithConfig)
}

func newBucketClient(cfg Config, name string, logger log.Logger, factory func(log.Logger, azure.Config, string, http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {
// Start with default config to make sure that all parameters are set to sensible values, especially
// HTTP Config field.
bucketConfig := azure.DefaultConfig
bucketConfig.StorageAccountName = cfg.StorageAccountName
bucketConfig.StorageAccountKey = cfg.StorageAccountKey.String()
bucketConfig.StorageConnectionString = cfg.StorageConnectionString.String()
bucketConfig.ContainerName = cfg.ContainerName
bucketConfig.MaxRetries = cfg.MaxRetries
bucketConfig.UserAssignedID = cfg.UserAssignedID

if cfg.Endpoint != "" {
// azure.DefaultConfig has the default Endpoint, overwrite it only if a different one was explicitly provided.
bucketConfig.Endpoint = cfg.Endpoint
}

// Thanos currently doesn't support passing the config as is, but expects a YAML,
// so we're going to serialize it.
serialized, err := yaml.Marshal(bucketConfig)
if err != nil {
return nil, err
var rt http.RoundTripper
if cfg.Transport != nil {
rt = cfg.Transport
}

return azure.NewBucket(logger, serialized, name, nil)
return factory(logger, bucketConfig, name, rt)
}
29 changes: 15 additions & 14 deletions pkg/storage/bucket/azure/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package azure

import (
"flag"
"net/http"

"github.com/grafana/dskit/flagext"

"github.com/grafana/loki/v3/pkg/storage/bucket/http"
)

// Config holds the config options for an Azure backend
type Config struct {
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
ConnectionString flagext.Secret `yaml:"connection_string"`
ContainerName string `yaml:"container_name"`
EndpointSuffix string `yaml:"endpoint_suffix"`
MaxRetries int `yaml:"max_retries"`
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
StorageConnectionString flagext.Secret `yaml:"connection_string"`
ContainerName string `yaml:"container_name"`
Endpoint string `yaml:"endpoint_suffix"`
MaxRetries int `yaml:"max_retries"`
UserAssignedID string `yaml:"user_assigned_id"`

http.Config `yaml:"http"`
// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
}

// RegisterFlags registers the flags for Azure storage
Expand All @@ -28,10 +29,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers the flags for Azure storage
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name")
f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key")
f.Var(&cfg.ConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the values of `account-name` and `endpoint-suffix` values will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "loki", "Azure storage container name")
f.StringVar(&cfg.EndpointSuffix, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN")
f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key. If unset, Azure managed identities will be used for authentication instead.")
f.Var(&cfg.StorageConnectionString, prefix+"azure.connection-string", "If `connection-string` is set, the value of `endpoint-suffix` will not be used. Use this method over `account-key` if you need to authenticate via a SAS token. Or if you use the Azurite emulator.")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "", "Azure storage container name")
f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN. If set to empty string, default endpoint suffix is used.")
f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors")
cfg.Config.RegisterFlagsWithPrefix(prefix+"azure.", f)
f.StringVar(&cfg.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned managed identity. If empty, then System assigned identity is used.")
}
98 changes: 0 additions & 98 deletions pkg/storage/bucket/azure/config_test.go

This file was deleted.

143 changes: 143 additions & 0 deletions pkg/storage/bucket/object_client_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package bucket

import (
"context"
"io"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/thanos-io/objstore"

"github.com/grafana/loki/v3/pkg/storage/chunk/client"
)

type ObjectClientAdapter struct {
bucket, hedgedBucket objstore.Bucket
logger log.Logger
isRetryableErr func(err error) bool
}

func NewObjectClientAdapter(bucket, hedgedBucket objstore.Bucket, logger log.Logger) *ObjectClientAdapter {
if hedgedBucket == nil {
hedgedBucket = bucket
}

return &ObjectClientAdapter{
bucket: bucket,
hedgedBucket: hedgedBucket,
logger: log.With(logger, "component", "bucket_to_object_client_adapter"),
// default to no retryable errors. Override with WithRetryableErrFunc
isRetryableErr: func(_ error) bool {
return false
},
}
}

func WithRetryableErrFunc(f func(err error) bool) func(*ObjectClientAdapter) {
return func(o *ObjectClientAdapter) {
o.isRetryableErr = f
}
}

func (o *ObjectClientAdapter) Stop() {
}

// ObjectExists checks if a given objectKey exists in the bucket
func (o *ObjectClientAdapter) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return o.bucket.Exists(ctx, objectKey)
}

// GetAttributes returns the attributes of the specified object key from the configured bucket.
func (o *ObjectClientAdapter) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
attr := client.ObjectAttributes{}
thanosAttr, err := o.hedgedBucket.Attributes(ctx, objectKey)
if err != nil {
return attr, err
}

attr.Size = thanosAttr.Size
return attr, nil
}

// PutObject puts the specified bytes into the configured bucket at the provided key
func (o *ObjectClientAdapter) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return o.bucket.Upload(ctx, objectKey, object)
}

// GetObject returns a reader and the size for the specified object key from the configured bucket.
// size is set to -1 if it cannot be succefully determined, it is up to the caller to check this value before using it.
func (o *ObjectClientAdapter) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
reader, err := o.hedgedBucket.Get(ctx, objectKey)
if err != nil {
return nil, 0, err
}

size, err := objstore.TryToGetSize(reader)
if err != nil {
size = -1
level.Warn(o.logger).Log("msg", "failed to get size of object", "err", err)
}

return reader, size, err
}

func (o *ObjectClientAdapter) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
return o.hedgedBucket.GetRange(ctx, objectKey, offset, length)
}

// List objects with given prefix.
func (o *ObjectClientAdapter) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
var iterParams []objstore.IterOption

// If delimiter is empty we want to list all files
if delimiter == "" {
iterParams = append(iterParams, objstore.WithRecursiveIter)
}

err := o.bucket.Iter(ctx, prefix, func(objectKey string) error {
// CommonPrefixes are keys that have the prefix and have the delimiter
// as a suffix
if delimiter != "" && strings.HasSuffix(objectKey, delimiter) {
commonPrefixes = append(commonPrefixes, client.StorageCommonPrefix(objectKey))
return nil
}

// TODO: remove this once thanos support IterWithAttributes
attr, err := o.bucket.Attributes(ctx, objectKey)
if err != nil {
return errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}

storageObjects = append(storageObjects, client.StorageObject{
Key: objectKey,
ModifiedAt: attr.LastModified,
})

return nil
}, iterParams...)
if err != nil {
return nil, nil, err
}

return storageObjects, commonPrefixes, nil
}

// DeleteObject deletes the specified object key from the configured bucket.
func (o *ObjectClientAdapter) DeleteObject(ctx context.Context, objectKey string) error {
return o.bucket.Delete(ctx, objectKey)
}

// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
func (o *ObjectClientAdapter) IsObjectNotFoundErr(err error) bool {
return o.bucket.IsObjNotFoundErr(err)
}

// IsRetryableErr returns true if the request failed due to some retryable server-side scenario
func (o *ObjectClientAdapter) IsRetryableErr(err error) bool {
return o.isRetryableErr(err)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package gcp
package bucket

import (
"bytes"
Expand All @@ -12,7 +12,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
)

func TestGCSThanosObjStore_List(t *testing.T) {
func TestObjectClientAdapter_List(t *testing.T) {
tests := []struct {
name string
prefix string
Expand Down Expand Up @@ -95,10 +95,10 @@ func TestGCSThanosObjStore_List(t *testing.T) {
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/b", buff))
require.NoError(t, newBucket.Upload(context.Background(), "depply/nested/folder/c", buff))

gcpClient := &GCSThanosObjectClient{}
gcpClient.client = newBucket
client := NewObjectClientAdapter(newBucket, nil, nil)
client.bucket = newBucket

storageObj, storageCommonPref, err := gcpClient.List(context.Background(), tt.prefix, tt.delimiter)
storageObj, storageCommonPref, err := client.List(context.Background(), tt.prefix, tt.delimiter)
if tt.wantErr != nil {
require.Equal(t, tt.wantErr.Error(), err.Error())
continue
Expand Down
Loading
Loading