Skip to content

Commit

Permalink
Use unique version ids within the same batch (#505)
Browse files Browse the repository at this point in the history
Put requests grouped on a single batch will have a unique version-id.

Semantic of version-id is not changed otherwise, different versions of
same record will have unique and increasing ids.
  • Loading branch information
merlimat authored Aug 27, 2024
1 parent 3cb50ba commit b0988d2
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 37 deletions.
36 changes: 36 additions & 0 deletions oxia/async_client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -848,3 +848,39 @@ func TestAsyncClientImpl_SequenceOrdering(t *testing.T) {
assert.NoError(t, client.Close())
assert.NoError(t, standaloneServer.Close())
}

func TestAsyncClientImpl_versionId(t *testing.T) {
standaloneServer, err := server.NewStandalone(server.NewTestConfig(t.TempDir()))
assert.NoError(t, err)

serviceAddress := fmt.Sprintf("localhost:%d", standaloneServer.RpcPort())
client, err := NewAsyncClient(serviceAddress)
assert.NoError(t, err)

ch0 := client.Put("/a", []byte("0"))
ch1 := client.Put("/a", []byte("1"))
ch2 := client.Put("/a", []byte("2"))

chb0 := client.Put("/b", []byte("0"))

r0 := <-ch0
r1 := <-ch1
r2 := <-ch2
rb0 := <-chb0

assert.NoError(t, r0.Err)
assert.NoError(t, r1.Err)
assert.NoError(t, r2.Err)
assert.NoError(t, rb0.Err)

assert.EqualValues(t, 0, r0.Version.VersionId)
assert.EqualValues(t, 1, r1.Version.VersionId)
assert.EqualValues(t, 2, r2.Version.VersionId)
assert.EqualValues(t, 3, rb0.Version.VersionId)

ch3 := client.Put("/a", []byte("3"))
r3 := <-ch3
assert.EqualValues(t, 4, r3.Version.VersionId)

assert.NoError(t, standaloneServer.Close())
}
4 changes: 2 additions & 2 deletions oxia/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func ExampleAsyncClient() {

// Output:
// First operation complete: version: 0 - error: <nil>
// First operation complete: version: 0 - error: <nil>
// First operation complete: version: 0 - error: <nil>
// First operation complete: version: 1 - error: <nil>
// First operation complete: version: 2 - error: <nil>
}

func ExampleNotifications() {
Expand Down
66 changes: 47 additions & 19 deletions server/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
"log/slog"
"sync/atomic"
"time"

pb "google.golang.org/protobuf/proto"
Expand All @@ -40,8 +41,9 @@ var (
)

const (
commitOffsetKey = common.InternalKeyPrefix + "commit-offset"
termKey = common.InternalKeyPrefix + "term"
commitOffsetKey = common.InternalKeyPrefix + "commit-offset"
commitLastVersionIdKey = common.InternalKeyPrefix + "last-version-id"
termKey = common.InternalKeyPrefix + "term"
)

type UpdateOperationCallback interface {
Expand Down Expand Up @@ -118,13 +120,20 @@ func NewDB(namespace string, shardId int64, factory Factory, notificationRetenti
return nil, err
}

lastVersionId, err := db.readLastVersionId()
if err != nil {
return nil, err
}
db.versionIdTracker.Store(lastVersionId)

db.notificationsTracker = newNotificationsTracker(namespace, shardId, commitOffset, kv, notificationRetentionTime, clock)
return db, nil
}

type db struct {
kv KV
shardId int64
versionIdTracker atomic.Int64
notificationsTracker *notificationsTracker
log *slog.Logger

Expand Down Expand Up @@ -168,7 +177,7 @@ func (d *db) applyWriteRequest(b *proto.WriteRequest, batch WriteBatch, commitOf

d.putCounter.Add(len(b.Puts))
for _, putReq := range b.Puts {
pr, err := d.applyPut(commitOffset, batch, notifications, putReq, timestamp, updateOperationCallback)
pr, err := d.applyPut(batch, notifications, putReq, timestamp, updateOperationCallback, false)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -208,7 +217,11 @@ func (d *db) ProcessWrite(b *proto.WriteRequest, commitOffset int64, timestamp u
return nil, err
}

if err := d.addCommitOffset(commitOffset, batch, timestamp); err != nil {
if err := d.addASCIILong(commitOffsetKey, commitOffset, batch, timestamp); err != nil {
return nil, err
}

if err := d.addASCIILong(commitLastVersionIdKey, d.versionIdTracker.Load(), batch, timestamp); err != nil {
return nil, err
}

Expand Down Expand Up @@ -239,13 +252,13 @@ func (*db) addNotifications(batch WriteBatch, notifications *notifications) erro
return batch.Put(notificationKey(notifications.batch.Offset), value)
}

func (d *db) addCommitOffset(commitOffset int64, batch WriteBatch, timestamp uint64) error {
commitOffsetValue := []byte(fmt.Sprintf("%d", commitOffset))
_, err := d.applyPut(commitOffset, batch, nil, &proto.PutRequest{
Key: commitOffsetKey,
Value: commitOffsetValue,
func (d *db) addASCIILong(key string, value int64, batch WriteBatch, timestamp uint64) error {
asciiValue := []byte(fmt.Sprintf("%d", value))
_, err := d.applyPut(batch, nil, &proto.PutRequest{
Key: key,
Value: asciiValue,
ExpectedVersionId: nil,
}, timestamp, NoOpCallback)
}, timestamp, NoOpCallback, true)
return err
}

Expand Down Expand Up @@ -334,10 +347,18 @@ func (d *db) RangeScan(request *proto.RangeScanRequest) (RangeScanIterator, erro
}

func (d *db) ReadCommitOffset() (int64, error) {
return d.readASCIILong(commitOffsetKey)
}

func (d *db) readLastVersionId() (int64, error) {
return d.readASCIILong(commitLastVersionIdKey)
}

func (d *db) readASCIILong(key string) (int64, error) {
kv := d.kv

getReq := &proto.GetRequest{
Key: commitOffsetKey,
Key: key,
IncludeValue: true,
}
gr, err := applyGet(kv, getReq)
Expand All @@ -348,20 +369,20 @@ func (d *db) ReadCommitOffset() (int64, error) {
return wal.InvalidOffset, nil
}

var commitOffset int64
if _, err = fmt.Sscanf(string(gr.Value), "%d", &commitOffset); err != nil {
var res int64
if _, err = fmt.Sscanf(string(gr.Value), "%d", &res); err != nil {
return wal.InvalidOffset, err
}
return commitOffset, nil
return res, nil
}

func (d *db) UpdateTerm(newTerm int64) error {
batch := d.kv.NewWriteBatch()

if _, err := d.applyPut(wal.InvalidOffset, batch, nil, &proto.PutRequest{
if _, err := d.applyPut(batch, nil, &proto.PutRequest{
Key: termKey,
Value: []byte(fmt.Sprintf("%d", newTerm)),
}, now(), NoOpCallback); err != nil {
}, now(), NoOpCallback, true); err != nil {
return err
}

Expand Down Expand Up @@ -397,7 +418,7 @@ func (d *db) ReadTerm() (term int64, err error) {
return term, nil
}

func (d *db) applyPut(commitOffset int64, batch WriteBatch, notifications *notifications, putReq *proto.PutRequest, timestamp uint64, updateOperationCallback UpdateOperationCallback) (*proto.PutResponse, error) {
func (d *db) applyPut(batch WriteBatch, notifications *notifications, putReq *proto.PutRequest, timestamp uint64, updateOperationCallback UpdateOperationCallback, internal bool) (*proto.PutResponse, error) { //nolint:revive
var se *proto.StorageEntry
var err error
var newKey string
Expand Down Expand Up @@ -428,9 +449,16 @@ func (d *db) applyPut(commitOffset int64, batch WriteBatch, notifications *notif
}, nil
}

var versionId int64
if internal {
versionId = wal.InvalidOffset
} else {
versionId = d.versionIdTracker.Add(1)
}

if se == nil {
se = proto.StorageEntryFromVTPool()
se.VersionId = commitOffset
se.VersionId = versionId
se.ModificationsCount = 0
se.Value = putReq.Value
se.CreationTimestamp = timestamp
Expand All @@ -439,7 +467,7 @@ func (d *db) applyPut(commitOffset int64, batch WriteBatch, notifications *notif
se.ClientIdentity = putReq.ClientIdentity
se.PartitionKey = putReq.PartitionKey
} else {
se.VersionId = commitOffset
se.VersionId = versionId
se.ModificationsCount++
se.Value = putReq.Value
se.ModificationTimestamp = timestamp
Expand Down
18 changes: 9 additions & 9 deletions server/kv/db_notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ func TestDB_Notifications(t *testing.T) {
assert.EqualValues(t, 0, *n.VersionId)

t1 := now()
_, _ = db.ProcessWrite(&proto.WriteRequest{
wr1, _ := db.ProcessWrite(&proto.WriteRequest{
Puts: []*proto.PutRequest{{
Key: "a",
Value: []byte("1"),
}},
}, 1, t1, NoOpCallback)

t2 := now()
_, _ = db.ProcessWrite(&proto.WriteRequest{
wr2, _ := db.ProcessWrite(&proto.WriteRequest{
Puts: []*proto.PutRequest{{
Key: "b",
Value: []byte("0"),
Expand All @@ -85,7 +85,7 @@ func TestDB_Notifications(t *testing.T) {
n, found = nb.Notifications["a"]
assert.True(t, found)
assert.Equal(t, proto.NotificationType_KEY_MODIFIED, n.Type)
assert.EqualValues(t, 1, *n.VersionId)
assert.EqualValues(t, wr1.Puts[0].Version.VersionId, *n.VersionId)

nb = notifications[1]
assert.Equal(t, t2, nb.Timestamp)
Expand All @@ -95,11 +95,11 @@ func TestDB_Notifications(t *testing.T) {
n, found = nb.Notifications["b"]
assert.True(t, found)
assert.Equal(t, proto.NotificationType_KEY_CREATED, n.Type)
assert.EqualValues(t, 2, *n.VersionId)
assert.EqualValues(t, wr2.Puts[0].Version.VersionId, *n.VersionId)

// Write one batch
t3 := now()
_, _ = db.ProcessWrite(&proto.WriteRequest{
wr3, _ := db.ProcessWrite(&proto.WriteRequest{
Puts: []*proto.PutRequest{{
Key: "c",
Value: []byte("0"),
Expand All @@ -124,11 +124,11 @@ func TestDB_Notifications(t *testing.T) {
n, found = nb.Notifications["c"]
assert.True(t, found)
assert.Equal(t, proto.NotificationType_KEY_CREATED, n.Type)
assert.EqualValues(t, 3, *n.VersionId)
assert.EqualValues(t, wr3.Puts[0].Version.VersionId, *n.VersionId)
n, found = nb.Notifications["d"]
assert.True(t, found)
assert.Equal(t, proto.NotificationType_KEY_CREATED, n.Type)
assert.EqualValues(t, 3, *n.VersionId)
assert.EqualValues(t, wr3.Puts[1].Version.VersionId, *n.VersionId)
n, found = nb.Notifications["a"]
assert.True(t, found)
assert.Equal(t, proto.NotificationType_KEY_DELETED, n.Type)
Expand All @@ -137,7 +137,7 @@ func TestDB_Notifications(t *testing.T) {
// When there are multiple keys in one batch, only 1 notification
// is going to get triggered
t4 := now()
_, _ = db.ProcessWrite(&proto.WriteRequest{
wr4, _ := db.ProcessWrite(&proto.WriteRequest{
Puts: []*proto.PutRequest{{
Key: "x1",
Value: []byte("0"),
Expand All @@ -159,7 +159,7 @@ func TestDB_Notifications(t *testing.T) {
n, found = nb.Notifications["x1"]
assert.True(t, found)
assert.Equal(t, proto.NotificationType_KEY_MODIFIED, n.Type)
assert.EqualValues(t, 4, *n.VersionId)
assert.EqualValues(t, wr4.Puts[1].Version.VersionId, *n.VersionId)

assert.NoError(t, db.Close())
assert.NoError(t, factory.Close())
Expand Down
58 changes: 52 additions & 6 deletions server/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestDBSimple(t *testing.T) {
},
{ // Should succeed, the key was inserted in the batch
Key: "c",
ExpectedVersionId: pb.Int64(0),
ExpectedVersionId: pb.Int64(2),
},
{ // Should fail: the key was already just deleted
Key: "c",
Expand All @@ -91,7 +91,7 @@ func TestDBSimple(t *testing.T) {

r1 := res.Puts[1]
assert.Equal(t, proto.Status_OK, r1.Status)
assert.EqualValues(t, 0, r1.Version.VersionId)
assert.EqualValues(t, 1, r1.Version.VersionId)
assert.EqualValues(t, 0, r1.Version.ModificationsCount)

r2 := res.Puts[2]
Expand All @@ -104,7 +104,7 @@ func TestDBSimple(t *testing.T) {

r4 := res.Puts[4]
assert.Equal(t, proto.Status_OK, r4.Status)
assert.EqualValues(t, 0, r4.Version.VersionId)
assert.EqualValues(t, 2, r4.Version.VersionId)
assert.EqualValues(t, 0, r4.Version.ModificationsCount)

assert.Equal(t, 4, len(res.Deletes))
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestDBSameKeyMutations(t *testing.T) {
assert.Equal(t, 2, len(writeRes.Puts))
r0 = writeRes.Puts[0]
assert.Equal(t, proto.Status_OK, r0.Status)
assert.EqualValues(t, 1, r0.Version.VersionId)
assert.EqualValues(t, writeRes.Puts[0].Version.VersionId, r0.Version.VersionId)
assert.EqualValues(t, 1, r0.Version.ModificationsCount)
assert.Equal(t, t0, r0.Version.CreatedTimestamp)
assert.Equal(t, t1, r0.Version.ModifiedTimestamp)
Expand All @@ -241,7 +241,7 @@ func TestDBSameKeyMutations(t *testing.T) {
assert.NoError(t, err)

assert.Equal(t, proto.Status_OK, getRes.Status)
assert.EqualValues(t, 1, getRes.Version.VersionId)
assert.EqualValues(t, writeRes.Puts[0].Version.VersionId, getRes.Version.VersionId)
assert.Equal(t, "v1", string(getRes.Value))
assert.Equal(t, t0, getRes.Version.CreatedTimestamp)
assert.Equal(t, t1, getRes.Version.ModifiedTimestamp)
Expand All @@ -254,7 +254,7 @@ func TestDBSameKeyMutations(t *testing.T) {
assert.NoError(t, err)

assert.Equal(t, proto.Status_OK, getRes.Status)
assert.EqualValues(t, 1, getRes.Version.VersionId)
assert.EqualValues(t, writeRes.Puts[0].Version.VersionId, getRes.Version.VersionId)
assert.Nil(t, getRes.Value)
assert.Equal(t, t0, getRes.Version.CreatedTimestamp)
assert.Equal(t, t1, getRes.Version.ModifiedTimestamp)
Expand Down Expand Up @@ -865,3 +865,49 @@ func TestDBRangeScan(t *testing.T) {
assert.NoError(t, db.Close())
assert.NoError(t, factory.Close())
}

func TestDb_versionId(t *testing.T) {
factory, err := NewPebbleKVFactory(testKVOptions)
assert.NoError(t, err)
db, err := NewDB(common.DefaultNamespace, 1, factory, 0, common.SystemClock)
assert.NoError(t, err)

req := &proto.WriteRequest{
Puts: []*proto.PutRequest{
{
Key: "a",
Value: []byte("0"),
},
{
Key: "a",
Value: []byte("1"),
},
{
Key: "a",
Value: []byte("2"),
},
},
}

res, err := db.ProcessWrite(req, 0, 0, NoOpCallback)
assert.NoError(t, err)

assert.Equal(t, 3, len(res.Puts))
r0 := res.Puts[0]
assert.Equal(t, proto.Status_OK, r0.Status)
assert.EqualValues(t, 0, r0.Version.VersionId)
assert.EqualValues(t, 0, r0.Version.ModificationsCount)

r1 := res.Puts[1]
assert.Equal(t, proto.Status_OK, r1.Status)
assert.EqualValues(t, 1, r1.Version.VersionId)
assert.EqualValues(t, 1, r1.Version.ModificationsCount)

r2 := res.Puts[2]
assert.Equal(t, proto.Status_OK, r2.Status)
assert.EqualValues(t, 2, r2.Version.VersionId)
assert.EqualValues(t, 2, r2.Version.ModificationsCount)

assert.NoError(t, db.Close())
assert.NoError(t, factory.Close())
}
Loading

0 comments on commit b0988d2

Please sign in to comment.