Skip to content

Commit

Permalink
GC: Read commit explicitly when it is missing from prescanned commits (
Browse files Browse the repository at this point in the history
…#8282)

* GC: Read commit explicitly when it is missing from prescanned commits

The issue describes why this is a good idea.

Fixes #8261.

* Add CommitsMap test

* make gofmt

* [CR] Observe number of commits map misses

Observing as a log for now, not a metric, as it is unclear what an
interesting aggregation would be.  OTOH logs are good to understand how this
behaves.
  • Loading branch information
arielshaqed authored Oct 14, 2024
1 parent bb9315c commit 43a1c9b
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 38 deletions.
129 changes: 95 additions & 34 deletions pkg/graveler/retention/active_commits.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package retention

import (
"context"
"errors"
"fmt"
"time"

"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/logging"
)

type CommitNode struct {
Expand All @@ -15,52 +15,115 @@ type CommitNode struct {
MetaRangeID graveler.MetaRangeID
}

func NewCommitNode(creationDate time.Time, mainParent graveler.CommitID, metaRangeID graveler.MetaRangeID) CommitNode {
// CommitsMap is an immutable cache of commits. Each commit can be set
// once, and will be read if needed. It is *not* thread-safe.
type CommitsMap struct {
ctx context.Context
Log logging.Logger
NumMisses int64
CommitGetter RepositoryCommitGetter
Map map[graveler.CommitID]CommitNode
}

func NewCommitsMap(ctx context.Context, commitGetter RepositoryCommitGetter) (CommitsMap, error) {
initialMap := make(map[graveler.CommitID]CommitNode)
it, err := commitGetter.List(ctx)
if err != nil {
return CommitsMap{}, fmt.Errorf("list existing commits into map: %w", err)
}
defer it.Close()
for it.Next() {
commit := it.Value()
initialMap[commit.CommitID] = nodeFromCommit(commit.Commit)
}
return CommitsMap{
ctx: ctx,
Log: logging.FromContext(ctx),
NumMisses: int64(0),
CommitGetter: commitGetter,
Map: initialMap,
}, nil
}

// Set sets a commit. It will not be looked up again in CommitGetter.
func (c *CommitsMap) Set(id graveler.CommitID, node CommitNode) {
c.Map[id] = node
}

// Get gets a commit. If the commit has not been Set it uses CommitGetter
// to read it.
func (c *CommitsMap) Get(id graveler.CommitID) (CommitNode, error) {
ret, ok := c.Map[id]
if ok {
return ret, nil
}
// Unlikely: id raced with initial bunch of Sets.
commit, err := c.CommitGetter.Get(c.ctx, id)
if err != nil {
return CommitNode{}, fmt.Errorf("get missing commit ID %s: %w", id, err)
}
ret = nodeFromCommit(commit)
c.Map[id] = ret
c.NumMisses++
c.Log.WithFields(logging.Fields{
"commit_id": id,
"created": ret.CreationDate,
"age": time.Since(ret.CreationDate),
}).Warn("Loaded single commit, probably new")
return ret, nil
}

// GetMap returns the entire map of commits. It is probably incorrect to
// modify it.
func (c *CommitsMap) GetMap() map[graveler.CommitID]CommitNode {
return c.Map
}

// nodeFromCommit returns a new CommitNode for a Commit.
func nodeFromCommit(commit *graveler.Commit) CommitNode {
var mainParent graveler.CommitID
if len(commit.Parents) > 0 {
// every branch retains only its main ancestry, acquired by recursively taking the first parent:
mainParent = commit.Parents[0]
if commit.Version < graveler.CommitVersionParentSwitch {
mainParent = commit.Parents[len(commit.Parents)-1]
}
}
return CommitNode{
CreationDate: creationDate,
CreationDate: commit.CreationDate,
MainParent: mainParent,
MetaRangeID: metaRangeID,
MetaRangeID: commit.MetaRangeID,
}
}

var ErrCommitNotFound = errors.New("commit not found")

// GetGarbageCollectionCommits returns the sets of active commits, according to the repository's garbage collection rules.
// See https://github.com/treeverse/lakeFS/issues/1932 for more details.
// Upon completion, the given startingPointIterator is closed.
func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCStartingPointIterator, commitGetter *RepositoryCommitGetter, rules *graveler.GarbageCollectionRules) (map[graveler.CommitID]graveler.MetaRangeID, error) {
func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCStartingPointIterator, commitGetter RepositoryCommitGetter, rules *graveler.GarbageCollectionRules) (map[graveler.CommitID]graveler.MetaRangeID, error) {
// From each starting point in the given startingPointIterator, it iterates through its main ancestry.
// All commits reached are added to the active set, until and including the first commit performed before the start of the retention period.
processed := make(map[graveler.CommitID]time.Time)
activeMap := make(map[graveler.CommitID]struct{})

commitsIterator, err := commitGetter.ListCommits(ctx)
commitsMap, err := NewCommitsMap(ctx, commitGetter)
if err != nil {
return nil, err
}
commitsMap := make(map[graveler.CommitID]CommitNode)
defer commitsIterator.Close()
for commitsIterator.Next() {
commitRecord := commitsIterator.Value()
var mainParent graveler.CommitID
if len(commitRecord.Commit.Parents) > 0 {
// every branch retains only its main ancestry, acquired by recursively taking the first parent:
mainParent = commitRecord.Commit.Parents[0]
if commitRecord.Commit.Version < graveler.CommitVersionParentSwitch {
mainParent = commitRecord.Commit.Parents[len(commitRecord.Commit.Parents)-1]
}
}
commitsMap[commitRecord.CommitID] = NewCommitNode(commitRecord.Commit.CreationDate, mainParent, commitRecord.MetaRangeID)
return nil, fmt.Errorf("initial read commits: %w", err)
}
// Observe NumMisses. This should not be a metric unless we see it happen a _lot_.
defer func() {
logging.FromContext(ctx).
WithField("num_misses", commitsMap.NumMisses).
Info("Commits map - misses are due to concurrent commits")
}()

now := time.Now()
defer startingPointIterator.Close()
for startingPointIterator.Next() {
startingPoint := startingPointIterator.Value()
retentionDays := int(rules.DefaultRetentionDays)
commitNode, ok := commitsMap[startingPoint.CommitID]
if !ok {
return nil, fmt.Errorf("%w: %s", ErrCommitNotFound, startingPoint.CommitID)
commitNode, err := commitsMap.Get(startingPoint.CommitID)
if err != nil {
return nil, fmt.Errorf("%w: %s", err, startingPoint.CommitID)
}
if startingPoint.BranchID == "" {
// If the current commit is NOT a branch HEAD (a dangling commit) - add a hypothetical HEAD as its child
Expand All @@ -70,8 +133,7 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS
}
} else {
// If the current commit IS a branch HEAD - fetch and retention rules for this branch and...
var branchRetentionDays int32
if branchRetentionDays, ok = rules.BranchRetentionDays[string(startingPoint.BranchID)]; ok {
if branchRetentionDays, ok := rules.BranchRetentionDays[string(startingPoint.BranchID)]; ok {
retentionDays = int(branchRetentionDays)
}
activeMap[startingPoint.CommitID] = struct{}{}
Expand All @@ -86,8 +148,7 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS
// Start traversing the commit's ancestors (path):
for commitNode.MainParent != "" {
nextCommitID := commitNode.MainParent
var previousThreshold time.Time
if previousThreshold, ok = processed[nextCommitID]; ok && !previousThreshold.After(branchExpirationThreshold) {
if previousThreshold, ok := processed[nextCommitID]; ok && !previousThreshold.After(branchExpirationThreshold) {
// If the parent commit was already processed and its threshold was longer than the current threshold,
// i.e. the current threshold doesn't hold for it, stop processing it because the other path decision
// wins
Expand All @@ -100,9 +161,9 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS
activeMap[nextCommitID] = struct{}{}
}
// Continue down the rabbit hole.
commitNode, ok = commitsMap[nextCommitID]
if !ok {
return nil, fmt.Errorf("%w: %s", ErrCommitNotFound, nextCommitID)
commitNode, err = commitsMap.Get(nextCommitID)
if err != nil {
return nil, fmt.Errorf("%w: %s", err, nextCommitID)
}
// Set the parent commit ID's expiration threshold as the current (this is true because this one is the
// longest, because we wouldn't have gotten here otherwise)
Expand All @@ -112,7 +173,7 @@ func GetGarbageCollectionCommits(ctx context.Context, startingPointIterator *GCS
if startingPointIterator.Err() != nil {
return nil, startingPointIterator.Err()
}
return makeCommitMap(commitsMap, activeMap), nil
return makeCommitMap(commitsMap.GetMap(), activeMap), nil
}

func makeCommitMap(commitNodes map[graveler.CommitID]CommitNode, commitSet map[graveler.CommitID]struct{}) map[graveler.CommitID]graveler.MetaRangeID {
Expand Down
109 changes: 108 additions & 1 deletion pkg/graveler/retention/active_commits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package retention

import (
"context"
"errors"
"fmt"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -58,6 +60,111 @@ func findMainAncestryLeaves(now time.Time, heads map[string]int32, commits map[s
return res
}

// fakeRepositoryCommitGetter is a RepositoryCommitGetter used to test CommitsMap.
type fakeRepositoryCommitGetter struct {
// Commits is the list of pre-existing commits.
Commits []*graveler.CommitRecord
// AnotherCommit, if not nil, is returned from Get (but not List).
// It can only be returned once.
AnotherCommit *graveler.CommitRecord
// GetCalled is true after the first call to Get.
GetCalled bool
}

var errTooManyGets = errors.New("more than one Get on fakeRepositoryCommitGetter")

func (c *fakeRepositoryCommitGetter) List(_ context.Context) (graveler.CommitIterator, error) {
return testutil.NewFakeCommitIterator(c.Commits), nil
}

func (c *fakeRepositoryCommitGetter) Get(_ context.Context, id graveler.CommitID) (*graveler.Commit, error) {
if c.AnotherCommit == nil {
return nil, fmt.Errorf("%w: Get(%s) when no other commits", graveler.ErrNotFound, id)
}
if id != c.AnotherCommit.CommitID {
return nil, fmt.Errorf("%w: Get(%s) when expecting Get(%s)", graveler.ErrNotFound, id, c.AnotherCommit.CommitID)
}
if c.GetCalled {
return nil, fmt.Errorf("%s: %w", id, errTooManyGets)
}
c.GetCalled = true
return c.AnotherCommit.Commit, nil
}

func TestCommitsMap(t *testing.T) {
cases := []struct {
Name string
CommitGetter *fakeRepositoryCommitGetter
}{
{
Name: "FromList",
CommitGetter: &fakeRepositoryCommitGetter{
Commits: []*graveler.CommitRecord{
{
CommitID: "a",
Commit: &graveler.Commit{
MetaRangeID: graveler.MetaRangeID("metarange:A"),
},
},
{
CommitID: "b",
Commit: &graveler.Commit{
MetaRangeID: graveler.MetaRangeID("metarange:B"),
},
},
},
AnotherCommit: nil,
},
}, {
Name: "FromGet",
CommitGetter: &fakeRepositoryCommitGetter{
Commits: []*graveler.CommitRecord{
{
CommitID: "a",
Commit: &graveler.Commit{
MetaRangeID: graveler.MetaRangeID("metarange:A"),
},
},
},
AnotherCommit: &graveler.CommitRecord{
CommitID: "b",
Commit: &graveler.Commit{
MetaRangeID: graveler.MetaRangeID("metarange:B"),
},
},
},
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
commitsMap, err := NewCommitsMap(context.Background(), tc.CommitGetter)
if err != nil {
t.Fatal(err)
}

a, err := commitsMap.Get(graveler.CommitID("a"))
if err != nil {
t.Errorf("Failed to get a: %s", err)
}
if a.MetaRangeID != "metarange:A" {
t.Errorf("Got metarange %s for a, expected \"metarange:A\"", a.MetaRangeID)
}
b, err := commitsMap.Get(graveler.CommitID("b"))
if err != nil {
t.Errorf("Failed to get b: %s", err)
}
if b.MetaRangeID != "metarange:B" {
t.Errorf("Got metarange %s for b, expected \"metarange:B\"", b.MetaRangeID)
}
c, err := commitsMap.Get(graveler.CommitID("c"))
if !errors.Is(err, graveler.ErrNotFound) {
t.Errorf("Got node %+v, error %s for c, expected not found", c.MetaRangeID, err)
}
})
}
}

func TestActiveCommits(t *testing.T) {
tests := map[string]struct {
commits map[string]testCommit
Expand Down Expand Up @@ -264,7 +371,7 @@ func TestActiveCommits(t *testing.T) {

gcCommits, err := GetGarbageCollectionCommits(ctx, NewGCStartingPointIterator(
testutil.NewFakeCommitIterator(findMainAncestryLeaves(now, tst.headsRetentionDays, tst.commits)),
testutil.NewFakeBranchIterator(branches)), &RepositoryCommitGetter{
testutil.NewFakeBranchIterator(branches)), &repositoryCommitGetter{
refManager: refManagerMock,
repository: repositoryRecord,
}, garbageCollectionRules)
Expand Down
15 changes: 12 additions & 3 deletions pkg/graveler/retention/garbage_collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,24 @@ func (m *GarbageCollectionManager) SaveGarbageCollectionUncommitted(ctx context.
}, stat.Size(), fd, block.PutOpts{})
}

type RepositoryCommitGetter struct {
type RepositoryCommitGetter interface {
List(ctx context.Context) (graveler.CommitIterator, error)
Get(ctx context.Context, id graveler.CommitID) (*graveler.Commit, error)
}

type repositoryCommitGetter struct {
refManager graveler.RefManager
repository *graveler.RepositoryRecord
}

func (r *RepositoryCommitGetter) ListCommits(ctx context.Context) (graveler.CommitIterator, error) {
func (r *repositoryCommitGetter) List(ctx context.Context) (graveler.CommitIterator, error) {
return r.refManager.ListCommits(ctx, r.repository)
}

func (r *repositoryCommitGetter) Get(ctx context.Context, id graveler.CommitID) (*graveler.Commit, error) {
return r.refManager.GetCommit(ctx, r.repository, id)
}

func NewGarbageCollectionManager(blockAdapter block.Adapter, refManager graveler.RefManager, committedBlockStoragePrefix string) *GarbageCollectionManager {
return &GarbageCollectionManager{
blockAdapter: blockAdapter,
Expand Down Expand Up @@ -149,7 +158,7 @@ func (m *GarbageCollectionManager) SaveRules(ctx context.Context, storageNamespa
}

func (m *GarbageCollectionManager) SaveGarbageCollectionCommits(ctx context.Context, repository *graveler.RepositoryRecord, rules *graveler.GarbageCollectionRules) (string, error) {
commitGetter := &RepositoryCommitGetter{
commitGetter := &repositoryCommitGetter{
refManager: m.refManager,
repository: repository,
}
Expand Down

0 comments on commit 43a1c9b

Please sign in to comment.