Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment: Improve concurrent merge performance by weakly owning branch updates #8268

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/esti.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,9 @@ jobs:
name: Run latest lakeFS app on AWS S3
needs: [deploy-image, login-to-amazon-ecr]
runs-on: ubuntu-22.04
strategy:
matrix:
branch_ownership: [false, true]
env:
TAG: ${{ needs.deploy-image.outputs.tag }}
REPO: ${{ needs.login-to-amazon-ecr.outputs.registry }}
Expand Down Expand Up @@ -866,6 +869,7 @@ jobs:
LAKEFS_BLOCKSTORE_TYPE: s3
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.ESTI_AWS_ACCESS_KEY_ID }}
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}
LAKEFS_GRAVELER_BRANCH_OWNERSHIP_ENABLED: ${{ matrix.branch_ownership }}
LAKEFS_DATABASE_TYPE: postgres
DOCKER_REG: ${{ needs.login-to-amazon-ecr.outputs.registry }}
ESTI_BLOCKSTORE_TYPE: s3
Expand Down
1 change: 0 additions & 1 deletion buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ plugins:
out: pkg
opt:
- paths=source_relative

139 changes: 139 additions & 0 deletions cmd/lakectl/cmd/abuse_merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package cmd

import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"syscall"
"time"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/api/helpers"
"github.com/treeverse/lakefs/pkg/testutil/stress"
"github.com/treeverse/lakefs/pkg/uri"
)

// abuseMergeCmd stress-tests merge performance: it repeatedly creates a
// side branch, commits one non-conflicting object to it, and merges it
// back into the given branch, with many such flows running in parallel.
var abuseMergeCmd = &cobra.Command{
	Use:               "merge <branch URI>",
	Short:             "Merge non-conflicting objects to the source branch in parallel",
	Hidden:            false,
	Args:              cobra.ExactArgs(1),
	ValidArgsFunction: ValidArgsRepository,
	Run: func(cmd *cobra.Command, args []string) {
		branchURI := MustParseBranchURI("branch URI", args[0])
		numMerges := Must(cmd.Flags().GetInt("amount"))
		workers := Must(cmd.Flags().GetInt("parallelism"))

		fmt.Println("Source branch: ", branchURI)

		generator := stress.NewGenerator("merge", workers, stress.WithSignalHandlersFor(os.Interrupt, syscall.SIGTERM))

		// Feed sequential IDs ("1".."amount") as work items; each
		// worker derives a unique branch and object name from its item.
		generator.Setup(func(add stress.GeneratorAddFn) {
			for i := 1; i <= numMerges; i++ {
				add(strconv.Itoa(i))
			}
		})

		// Verify the repository exists before starting the load.
		client := getClient()
		resp, err := client.GetRepositoryWithResponse(cmd.Context(), branchURI.Repository)
		DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)
		if resp.JSON200 == nil {
			DieFmt("Bad response from server: %+v", resp)
		}

		ctx := cmd.Context()

		// execute ALL the things!
		generator.Run(func(input chan string, output chan stress.Result) {
			workerClient := getClient()
			for work := range input {
				start := time.Now()
				mergeErr := mergeSomething(ctx, workerClient, branchURI, work)
				output <- stress.Result{
					Error: mergeErr,
					Took:  time.Since(start),
				}
				// Don't block or sleep to maximise parallel load.
			}
		})
	},
}

func mergeSomething(ctx context.Context, client *apigen.ClientWithResponses, base *uri.URI, name string) error {
createBranchResponse, err := client.CreateBranchWithResponse(ctx, base.Repository,
apigen.CreateBranchJSONRequestBody{
Name: name,
Source: base.Ref,
},
)
if err != nil || !apiutil.IsStatusCodeOK(createBranchResponse.StatusCode()) {
if err == nil {
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved
err = helpers.ResponseAsError(createBranchResponse)
}
return fmt.Errorf("create branch %s: %w", name, err)
}

u := base.WithRef(name)
// Use a different name on each branch, to avoid conflicts.
path := fmt.Sprintf("object-%s", name)
u.Path = &path

getResponse, err := client.GetPhysicalAddressWithResponse(ctx, u.Repository, u.Ref, &apigen.GetPhysicalAddressParams{Path: *u.Path})
if err != nil || getResponse.JSON200 == nil {
if err == nil {
err = helpers.ResponseAsError(getResponse)
}
return fmt.Errorf("get physical address for %s: %w", name, err)
}
// Link the object but do not actually upload anything - it is not
// important for merging, and would only reduce load.
stagingLocation := getResponse.JSON200
linkResponse, err := client.LinkPhysicalAddressWithResponse(ctx, u.Repository, u.Ref,
&apigen.LinkPhysicalAddressParams{
Path: *u.Path,
},
apigen.LinkPhysicalAddressJSONRequestBody{
Checksum: "deadbeef0000cafe",
Staging: apigen.StagingLocation{
PhysicalAddress: stagingLocation.PhysicalAddress,
},
UserMetadata: nil,
})
if err != nil || linkResponse.JSON200 == nil {
if err == nil {
err = helpers.ResponseAsError(linkResponse)
}
return fmt.Errorf("link physical address for %s: %w", name, err)
}

commitResponse, err := client.CommitWithResponse(ctx, u.Repository, u.Ref, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: fmt.Sprintf("commit %s", name)})
if err != nil || commitResponse.JSON201 == nil {
if err == nil {
err = helpers.ResponseAsError(commitResponse)
}
return fmt.Errorf("commit for %s: %w", name, err)
}

mergeResponse, err := client.MergeIntoBranchWithResponse(ctx, u.Repository, u.Ref, base.Ref, apigen.MergeIntoBranchJSONRequestBody{})
if err != nil || mergeResponse.JSON200 == nil {
if err == nil {
err = helpers.ResponseAsError(mergeResponse)
}
return fmt.Errorf("merge from %s: %w", name, err)
}

return nil
}

//nolint:gochecknoinits,mnd
func init() {
	// Register tuning flags, then attach the command under "abuse".
	flags := abuseMergeCmd.Flags()
	flags.Int("amount", 1000, "amount of merges to perform")
	flags.Int("parallelism", abuseDefaultParallelism, "number of merges to perform in parallel")

	abuseCmd.AddCommand(abuseMergeCmd)
}
19 changes: 19 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -3031,6 +3031,25 @@ lakectl abuse list <source ref URI> [flags]



### lakectl abuse merge

Merge non-conflicting objects to the source branch in parallel

```
lakectl abuse merge <branch URI> [flags]
```

#### Options
{:.no_toc}

```
--amount int amount of merges to perform (default 1000)
-h, --help help for merge
--parallelism int number of merges to perform in parallel (default 100)
```



### lakectl abuse random-delete

Delete keys from a file and generate random delete from the source ref for those keys.
Expand Down
26 changes: 19 additions & 7 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,17 @@ func (c *ctxCloser) Close() error {
return nil
}

// makeWeakBranchOwnershipParams translates the branch-ownership
// configuration into the parameters the ref manager expects. When the
// feature is disabled it returns the zero value: zero durations mean no
// branch ownership.
func makeWeakBranchOwnershipParams(cfg config.WeakOwnership) ref.WeakBranchOwnershipParams {
	params := ref.WeakBranchOwnershipParams{}
	if cfg.Enabled {
		params.AcquireInterval = cfg.Acquire
		params.RefreshInterval = cfg.Refresh
	}
	return params
}

func New(ctx context.Context, cfg Config) (*Catalog, error) {
ctx, cancelFn := context.WithCancel(ctx)
adapter, err := factory.BuildBlockAdapter(ctx, nil, cfg.Config)
Expand Down Expand Up @@ -364,13 +375,14 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
addressProvider := ident.NewHexAddressProvider()
refManager := ref.NewRefManager(
ref.ManagerConfig{
Executor: executor,
KVStore: cfg.KVStore,
KVStoreLimited: storeLimiter,
AddressProvider: addressProvider,
RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
CommitCacheConfig: ref.CacheConfig(cfg.Config.Graveler.CommitCache),
MaxBatchDelay: cfg.Config.Graveler.MaxBatchDelay,
Executor: executor,
KVStore: cfg.KVStore,
KVStoreLimited: storeLimiter,
AddressProvider: addressProvider,
RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache),
CommitCacheConfig: ref.CacheConfig(cfg.Config.Graveler.CommitCache),
MaxBatchDelay: cfg.Config.Graveler.MaxBatchDelay,
WeakBranchOwnershipParams: makeWeakBranchOwnershipParams(cfg.Config.Graveler.BranchOwnership),
})
gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, cfg.Config.Committed.BlockStoragePrefix)
settingManager := settings.NewManager(refManager, cfg.KVStore)
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ type Database struct {
} `mapstructure:"cosmosdb"`
}

// WeakOwnership configures an approximate (weak) ownership.
type WeakOwnership struct {
	// Enabled turns weak ownership on; when false the interval
	// fields are ignored.
	Enabled bool `mapstructure:"enabled"`
	// Refresh is the interval at which a holder re-asserts
	// ownership while it is held.
	Refresh time.Duration `mapstructure:"refresh"`
	// Acquire is the interval at which a waiter retries acquiring
	// ownership.
	Acquire time.Duration `mapstructure:"acquire"`
}

// Config - Output struct of configuration, used to validate. If you read a key using a viper accessor
// rather than accessing a field of this struct, that key will *not* be validated. So don't
// do that.
Expand Down Expand Up @@ -330,6 +337,13 @@ type Config struct {
RateLimit int `mapstructure:"rate_limit"`
} `mapstructure:"background"`
MaxBatchDelay time.Duration `mapstructure:"max_batch_delay"`
// Parameters for tuning performance of concurrent branch
// update operations. These do not affect correctness or
// liveness. Internally this is "*weak* branch ownership"
// because this ownership may safely fail. This distinction
// is unimportant during configuration, so use a shorter
// name.
BranchOwnership WeakOwnership `mapstructure:"branch_ownership"`
} `mapstructure:"graveler"`
Gateways struct {
S3 struct {
Expand Down
17 changes: 17 additions & 0 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,23 @@ func setDefaults(cfgType string) {
// 3ms of delay with ~300 requests/second per resource sounds like a reasonable tradeoff.
viper.SetDefault("graveler.max_batch_delay", 3*time.Millisecond)

viper.SetDefault("graveler.branch_ownership.enabled", false)
// ... but if branch ownership is enabled, set up some useful defaults!

// The single concurrent branch updater has these requirements from
// KV with these settings:
//
// - Cleanly acquiring ownership performs 1 read operation and 1
// write operation.
//
// - While ownership is held, add 2.5 read and 2.5 write operation
// per second, and an additional ~7 read and write operations
// per second per branch operation waiting to acquire ownership.

// See comments on WeakOwner for how to compute these numbers.
viper.SetDefault("graveler.branch_ownership.refresh", 400*time.Millisecond)
viper.SetDefault("graveler.branch_ownership.acquire", 150*time.Millisecond)

viper.SetDefault("ugc.prepare_interval", time.Minute)
viper.SetDefault("ugc.prepare_max_file_size", 20*1024*1024)

Expand Down
74 changes: 67 additions & 7 deletions pkg/graveler/ref/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/treeverse/lakefs/pkg/batch"
"github.com/treeverse/lakefs/pkg/cache"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/httputil"
"github.com/treeverse/lakefs/pkg/ident"
"github.com/treeverse/lakefs/pkg/kv"
"github.com/treeverse/lakefs/pkg/kv/util"
"github.com/treeverse/lakefs/pkg/logging"
)

Expand All @@ -37,6 +39,7 @@ type Manager struct {
repoCache cache.Cache
commitCache cache.Cache
maxBatchDelay time.Duration
branchOwnership *util.WeakOwner
}

func branchFromProto(pb *graveler.BranchData) *graveler.Branch {
Expand Down Expand Up @@ -66,17 +69,56 @@ func protoFromBranch(branchID graveler.BranchID, b *graveler.Branch) *graveler.B
return branch
}

// WeakBranchOwnershipParams configures weak ownership of branches. Branch
// correctness is safe _regardless_ of the values of these parameters. They
// exist solely to reduce expensive operations when multiple concurrent
// updates race on the same branch. Only one update can win a race, and
// ownership prevents others from interfering by consuming resources on the
// instance.
type WeakBranchOwnershipParams struct {
	// AcquireInterval is the interval at which to attempt to acquire
	// ownership of a branch. It is a bound on the latency of the time
	// for one worker to acquire a branch when multiple operations race
	// on that branch. Reducing it increases read load on the branch
	// ownership record when concurrent operations occur.
	AcquireInterval time.Duration
	// RefreshInterval is the interval for which to assert ownership of a
	// branch. It is a bound on the time to perform an operation on a
	// branch IF a previous worker crashed while owning that branch. It
	// has no effect when there are no crashes. Reducing it increases
	// write load on the branch ownership record when concurrent
	// operations occur.
	//
	// If zero or negative, ownership will not be asserted and branch
	// operations will race. This is safe but can be slow.
	RefreshInterval time.Duration
}

type ManagerConfig struct {
Executor batch.Batcher
KVStore kv.Store
KVStoreLimited kv.Store
AddressProvider ident.AddressProvider
RepositoryCacheConfig CacheConfig
CommitCacheConfig CacheConfig
MaxBatchDelay time.Duration
Executor batch.Batcher
KVStore kv.Store
KVStoreLimited kv.Store
AddressProvider ident.AddressProvider
RepositoryCacheConfig CacheConfig
CommitCacheConfig CacheConfig
MaxBatchDelay time.Duration
WeakBranchOwnershipParams WeakBranchOwnershipParams
}

func NewRefManager(cfg ManagerConfig) *Manager {
var branchOwnership *util.WeakOwner
if cfg.WeakBranchOwnershipParams.RefreshInterval > 0 {
log := logging.ContextUnavailable().WithField("component", "RefManager weak branch ownership")
branchOwnership = util.NewWeakOwner(
log,
cfg.KVStore,
"run-refs/weak-branch-owner",
cfg.WeakBranchOwnershipParams.AcquireInterval,
cfg.WeakBranchOwnershipParams.RefreshInterval,
)
log.Info("Initialized")
}

return &Manager{
kvStore: cfg.KVStore,
kvStoreLimited: cfg.KVStoreLimited,
Expand All @@ -85,6 +127,7 @@ func NewRefManager(cfg ManagerConfig) *Manager {
repoCache: newCache(cfg.RepositoryCacheConfig),
commitCache: newCache(cfg.CommitCacheConfig),
maxBatchDelay: cfg.MaxBatchDelay,
branchOwnership: branchOwnership,
}
}

Expand Down Expand Up @@ -395,6 +438,23 @@ func (m *Manager) SetBranch(ctx context.Context, repository *graveler.Repository
}

func (m *Manager) BranchUpdate(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, f graveler.BranchUpdateFunc) error {
// TODO(ariels): Get request ID in a nicer way.
requestIDPtr := httputil.RequestIDFromContext(ctx)
// Grab ownership if configured. Also check we actually have a
// request-ID on the request. (lakeFS middleware should *always*
// place a request ID anyways.)
if m.branchOwnership != nil && requestIDPtr != nil {
requestID := *requestIDPtr
release, err := m.branchOwnership.Own(ctx, requestID, string(branchID))
if err != nil {
logging.FromContext(ctx).
WithFields(logging.Fields{}).
WithError(err).
Warn("Failed to get ownership on branch; continuing but may be slow")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you check the error value? For instance, I don't think we should continue the update flow for context cancellation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? An error like that will keep happening, and the very next operation on the context will fail. I prefer not to complicate this code with handling of context cancellation (which should really examine either `err` or `if ctx.Done() { ...ctx.Err()... }`). All this code does is emit a log

... error=start owning key... context cancelled... message=Failed to get ownership on branch; continuing but may be slow

which will be followed by a log somewhat like

... error=get item: context cancelled... message=...

I don't think it's worth adding confusion here.

} else {
defer release()
}
}
b, pred, err := m.getBranchWithPredicate(ctx, repository, branchID)
if err != nil {
return err
Expand Down
Loading
Loading