From e060896e671db7a1a7c6e0570b8ce9e046b3973d Mon Sep 17 00:00:00 2001 From: Ahmed Yasin Koculu Date: Sat, 7 Sep 2024 22:49:43 +0200 Subject: [PATCH] Added operation index to support asynchronous replication and audits --- README.md | 2 +- src/Playground/Test1.cs | 2 +- src/ZoneTree.UnitTests/AtomicUpdateTests.cs | 7 +- .../FixedSizeKeyAndValueTests.cs | 52 ++++++------ src/ZoneTree.UnitTests/OpIndexTests.cs | 51 +++++++++++ src/ZoneTree.UnitTests/StringTreeTests.cs | 2 +- src/ZoneTree.UnitTests/TTLTests.cs | 6 +- src/ZoneTree/Core/ZoneTree.Merge.cs | 1 + src/ZoneTree/Core/ZoneTree.ReadWrite.cs | 84 +++++++++++++------ src/ZoneTree/Core/ZoneTree.cs | 2 + src/ZoneTree/Core/ZoneTreeLoader.cs | 8 +- src/ZoneTree/Core/ZoneTreeMeta.cs | 2 + src/ZoneTree/Core/ZoneTreeMetaWAL.cs | 12 +++ src/ZoneTree/IZoneTree.cs | 39 ++++++--- src/ZoneTree/Segments/IMutableSegment.cs | 4 +- .../Segments/InMemory/MutableSegment.cs | 16 ++-- src/ZoneTree/docs/ZoneTree/README-NUGET.md | 2 +- 17 files changed, 212 insertions(+), 80 deletions(-) create mode 100644 src/ZoneTree.UnitTests/OpIndexTests.cs diff --git a/README.md b/README.md index f7cd6c0..f014e2e 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,7 @@ zoneTree.TryAtomicAddOrUpdate(39, "a", (ref string x) => { x += "b"; return true; -}); +}, out var opIndex); ``` --- diff --git a/src/Playground/Test1.cs b/src/Playground/Test1.cs index 7be4e0d..5b54b7a 100644 --- a/src/Playground/Test1.cs +++ b/src/Playground/Test1.cs @@ -108,7 +108,7 @@ public static void TestReverseIterator( { x += " ooops!"; return true; - }); + }, out _); } maintainer.WaitForBackgroundThreads(); } diff --git a/src/ZoneTree.UnitTests/AtomicUpdateTests.cs b/src/ZoneTree.UnitTests/AtomicUpdateTests.cs index 39147fd..1382481 100644 --- a/src/ZoneTree.UnitTests/AtomicUpdateTests.cs +++ b/src/ZoneTree.UnitTests/AtomicUpdateTests.cs @@ -48,7 +48,8 @@ public void IntIntAtomicIncrement(WriteAheadLogMode walMode) { ++y; return true; - } + }, + out _ ); Interlocked.Increment(ref off); } @@ -116,7 +117,7 @@ public void IntIntAtomicIncrementForBTree(WriteAheadLogMode walMode) { ++y; return true; - }); + }, out _); Interlocked.Increment(ref off); } @@ -182,7 +183,7 @@ public void IntIntMutableSegmentOnlyAtomicIncrement(WriteAheadLogMode walMode) { ++y; return true; - }); + }, out _); Interlocked.Increment(ref off); } diff --git a/src/ZoneTree.UnitTests/FixedSizeKeyAndValueTests.cs b/src/ZoneTree.UnitTests/FixedSizeKeyAndValueTests.cs index 24518bc..16958ac 100644 --- a/src/ZoneTree.UnitTests/FixedSizeKeyAndValueTests.cs +++ b/src/ZoneTree.UnitTests/FixedSizeKeyAndValueTests.cs @@ -75,10 +75,10 @@ public void IntStringDeleteTest() using var data = new ZoneTreeFactory() .SetDataDirectory(dataPath) .OpenOrCreate(); - data.TryAtomicAdd(1, "1"); - data.TryAtomicAdd(2, "2"); - data.TryAtomicAdd(3, "3"); - data.TryDelete(2); + data.TryAtomicAdd(1, "1", out _); + data.TryAtomicAdd(2, "2", out _); + data.TryAtomicAdd(3, "3", out _); + data.TryDelete(2, out _); Assert.That(data.ContainsKey(1), Is.True); Assert.That(data.ContainsKey(2), Is.False); Assert.That(data.ContainsKey(3), Is.True); @@ -95,10 +95,10 @@ public void IntNullableIntDeleteTest() .SetDataDirectory(dataPath) .SetValueSerializer(new NullableInt32Serializer()) .OpenOrCreate(); - data.TryAtomicAdd(1, 1); - data.TryAtomicAdd(2, 2); - data.TryAtomicAdd(3, 3); - data.TryDelete(2); + data.TryAtomicAdd(1, 1, out _); + data.TryAtomicAdd(2, 2, out _); + data.TryAtomicAdd(3, 3, out _); + data.TryDelete(2, out _); Assert.That(data.ContainsKey(1), Is.True); Assert.That(data.ContainsKey(2), Is.False); Assert.That(data.ContainsKey(3), Is.True); @@ -116,13 +116,13 @@ public void IntStringGarbageCollectionTest() using var data = new ZoneTreeFactory() .SetDataDirectory(dataPath) .OpenOrCreate(); - data.TryAtomicAdd(1, "1"); - data.TryAtomicAdd(2, "2"); - data.TryAtomicAdd(3, "3"); - data.TryDelete(2); - data.TryAtomicAdd(4, "4"); - data.TryAtomicUpdate(3, "33"); - data.TryDelete(2); + data.TryAtomicAdd(1, "1", out _); + data.TryAtomicAdd(2, "2", out _); + data.TryAtomicAdd(3, "3", out _); + data.TryDelete(2, out _); + data.TryAtomicAdd(4, "4", out _); + data.TryAtomicUpdate(3, "33", out _); + data.TryDelete(2, out _); Assert.That(data.ContainsKey(1), Is.True); Assert.That(data.ContainsKey(2), Is.False); Assert.That(data.ContainsKey(3), Is.True); @@ -161,13 +161,13 @@ public void IntStringReadOnlySegmentLoadingTest() using var data = new ZoneTreeFactory() .SetDataDirectory(dataPath) .OpenOrCreate(); - data.TryAtomicAdd(1, "1"); - data.TryAtomicAdd(2, "2"); - data.TryAtomicAdd(3, "3"); - data.TryDelete(2); - data.TryAtomicAdd(4, "4"); - data.TryAtomicUpdate(3, "33"); - data.TryDelete(2); + data.TryAtomicAdd(1, "1", out _); + data.TryAtomicAdd(2, "2", out _); + data.TryAtomicAdd(3, "3", out _); + data.TryDelete(2, out _); + data.TryAtomicAdd(4, "4", out _); + data.TryAtomicUpdate(3, "33", out _); + data.TryDelete(2, out _); Assert.That(data.ContainsKey(1), Is.True); Assert.That(data.ContainsKey(2), Is.False); Assert.That(data.ContainsKey(3), Is.True); @@ -207,10 +207,10 @@ public void IntStringDiskSegmentLoadingTest() using var data = new ZoneTreeFactory() .SetDataDirectory(dataPath) .OpenOrCreate(); - data.TryAtomicAdd(1, "1"); - data.TryAtomicAdd(2, "2"); - data.TryAtomicAdd(3, "3"); - data.TryDelete(2); + data.TryAtomicAdd(1, "1", out _); + data.TryAtomicAdd(2, "2", out _); + data.TryAtomicAdd(3, "3", out _); + data.TryDelete(2, out _); Assert.That(data.ContainsKey(1), Is.True); Assert.That(data.ContainsKey(2), Is.False); Assert.That(data.ContainsKey(3), Is.True); diff --git a/src/ZoneTree.UnitTests/OpIndexTests.cs b/src/ZoneTree.UnitTests/OpIndexTests.cs new file mode 100644 index 0000000..4360abd --- /dev/null +++ b/src/ZoneTree.UnitTests/OpIndexTests.cs @@ -0,0 +1,51 @@ +using System.Collections.Concurrent; +using Tenray.ZoneTree.PresetTypes; +using Tenray.ZoneTree.Serializers; + +namespace Tenray.ZoneTree.UnitTests; + +public sealed class OpIndexTests +{ + [Test] + public void TestOpIndex() + { + var dataPath = "data/TestOpIndex"; + if (Directory.Exists(dataPath)) + Directory.Delete(dataPath, true); + var recordCount = 10_000; + var opIndexes = new ConcurrentBag(); + void CreateData() + { + using var zoneTree = new ZoneTreeFactory() + .SetDataDirectory(dataPath) + .SetMutableSegmentMaxItemCount(100) + .OpenOrCreate(); + + using var maintainer = zoneTree.CreateMaintainer(); + Parallel.For(0, recordCount, (i) => + { + var opIndex = zoneTree.Upsert(i, i); + opIndexes.Add(opIndex); + }); + maintainer.EvictToDisk(); + maintainer.WaitForBackgroundThreads(); + } + + void ReloadData() + { + using var zoneTree = new ZoneTreeFactory() + .SetDataDirectory(dataPath) + .SetMutableSegmentMaxItemCount(100) + .Open(); + + var opIndex = zoneTree.Upsert(recordCount + 1, recordCount + 1); + Assert.That(opIndex, Is.EqualTo(recordCount + 1)); + zoneTree.Maintenance.Drop(); + } + CreateData(); + ReloadData(); + Assert.IsTrue( + opIndexes.Order().ToArray() + .SequenceEqual(Enumerable.Range(1, recordCount).Select(x => (long)x))); + } +} diff --git a/src/ZoneTree.UnitTests/StringTreeTests.cs b/src/ZoneTree.UnitTests/StringTreeTests.cs index 0a8a505..030498a 100644 --- a/src/ZoneTree.UnitTests/StringTreeTests.cs +++ b/src/ZoneTree.UnitTests/StringTreeTests.cs @@ -114,7 +114,7 @@ public void HelloWorldTest2() { x += "b"; return true; - }); + }, out _); zoneTree.TryGet(39, out value); Assert.That(value, Is.EqualTo("Hello Zone Tree!b")); } diff --git a/src/ZoneTree.UnitTests/TTLTests.cs b/src/ZoneTree.UnitTests/TTLTests.cs index c909df2..05784ba 100644 --- a/src/ZoneTree.UnitTests/TTLTests.cs +++ b/src/ZoneTree.UnitTests/TTLTests.cs @@ -35,13 +35,15 @@ public void TestTTL() 5, out v1, bool (ref TTLValue v) => - v.SlideExpiration(TimeSpan.FromMilliseconds(300))); + v.SlideExpiration(TimeSpan.FromMilliseconds(300)), + out _); Thread.Sleep(450); // initial expiration (300) + slided expiration (300) - Thread.Sleep(150) f2 = zoneTree.TryGetAndUpdate( 5, out v2, bool (ref TTLValue v) => - v.SlideExpiration(TimeSpan.FromMilliseconds(300))); + v.SlideExpiration(TimeSpan.FromMilliseconds(300)), + out _); Assert.That(f1, Is.True); Assert.That(f2, Is.False); diff --git a/src/ZoneTree/Core/ZoneTree.Merge.cs b/src/ZoneTree/Core/ZoneTree.Merge.cs index 1b14f18..c730736 100644 --- a/src/ZoneTree/Core/ZoneTree.Merge.cs +++ b/src/ZoneTree/Core/ZoneTree.Merge.cs @@ -36,6 +36,7 @@ void MoveMutableSegmentForward(IMutableSegment mutableSegment) mutableSegment.Freeze(); ReadOnlySegmentQueue.Enqueue(mutableSegment); + MetaWal.EnqueueMaximumOpIndex(mutableSegment.MaximumOpIndex); MetaWal.EnqueueReadOnlySegment(mutableSegment.SegmentId); MutableSegment = new MutableSegment( diff --git a/src/ZoneTree/Core/ZoneTree.ReadWrite.cs b/src/ZoneTree/Core/ZoneTree.ReadWrite.cs index 1895297..55c486c 100644 --- a/src/ZoneTree/Core/ZoneTree.ReadWrite.cs +++ b/src/ZoneTree/Core/ZoneTree.ReadWrite.cs @@ -61,15 +61,22 @@ public bool TryGet(in TKey key, out TValue value) return TryGetFromReadonlySegments(in key, out value); } - public bool TryAdd(in TKey key, in TValue value) + public bool TryAdd(in TKey key, in TValue value, out long opIndex) { if (ContainsKey(key)) + { + opIndex = 0; return false; - Upsert(in key, in value); + } + opIndex = Upsert(in key, in value); return true; } - public bool TryGetAndUpdate(in TKey key, out TValue value, ValueUpdaterDelegate valueUpdater) + public bool TryGetAndUpdate( + in TKey key, + out TValue value, + ValueUpdaterDelegate valueUpdater, + out long opIndex) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); @@ -77,22 +84,32 @@ public bool TryGetAndUpdate(in TKey key, out TValue value, ValueUpdaterDelegate< if (MutableSegment.TryGet(key, out value)) { if (IsDeleted(key, value)) + { + opIndex = 0; return false; + } } else if (!TryGetFromReadonlySegments(in key, out value)) + { + opIndex = 0; return false; + } if (!valueUpdater(ref value)) { // return true because // no update happened, but the value is found. + opIndex = 0; return true; } - Upsert(in key, in value); + opIndex = Upsert(in key, in value); return true; } - public bool TryAtomicGetAndUpdate(in TKey key, out TValue value, ValueUpdaterDelegate valueUpdater) + public bool TryAtomicGetAndUpdate( + in TKey key, + out TValue value, + ValueUpdaterDelegate valueUpdater) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); @@ -119,7 +136,7 @@ public bool TryAtomicGetAndUpdate(in TKey key, out TValue value, ValueUpdaterDel } } - public bool TryAtomicAdd(in TKey key, in TValue value) + public bool TryAtomicAdd(in TKey key, in TValue value, out long opIndex) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); @@ -127,13 +144,16 @@ public bool TryAtomicAdd(in TKey key, in TValue value) lock (AtomicUpdateLock) { if (ContainsKey(key)) + { + opIndex = 0; return false; - Upsert(key, value); + } + opIndex = Upsert(key, value); return true; } } - public bool TryAtomicUpdate(in TKey key, in TValue value) + public bool TryAtomicUpdate(in TKey key, in TValue value, out long opIndex) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); @@ -141,18 +161,26 @@ public bool TryAtomicUpdate(in TKey key, in TValue value) lock (AtomicUpdateLock) { if (!ContainsKey(key)) + { + opIndex = 0; return false; - Upsert(key, value); + } + opIndex = Upsert(key, value); return true; } } - public bool TryAtomicAddOrUpdate(in TKey key, in TValue valueToAdd, ValueUpdaterDelegate valueUpdater) + public bool TryAtomicAddOrUpdate( + in TKey key, + in TValue valueToAdd, + ValueUpdaterDelegate valueUpdater, + out long opIndex) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); AddOrUpdateResult status; IMutableSegment mutableSegment; + opIndex = 0; while (true) { lock (AtomicUpdateLock) @@ -164,19 +192,17 @@ public bool TryAtomicAddOrUpdate(in TKey key, in TValue valueToAdd, ValueUpdater } else if (mutableSegment.TryGet(in key, out var existing)) { - if (!valueUpdater(ref existing)) - return false; - status = mutableSegment.Upsert(key, existing); + if (!valueUpdater(ref existing)) return false; + status = mutableSegment.Upsert(key, existing, out opIndex); } else if (TryGetFromReadonlySegments(in key, out existing)) { - if (!valueUpdater(ref existing)) - return false; - status = mutableSegment.Upsert(key, existing); + if (!valueUpdater(ref existing)) return false; + status = mutableSegment.Upsert(key, existing, out opIndex); } else { - status = mutableSegment.Upsert(key, valueToAdd); + status = mutableSegment.Upsert(key, valueToAdd, out opIndex); } } switch (status) @@ -192,24 +218,24 @@ public bool TryAtomicAddOrUpdate(in TKey key, in TValue valueToAdd, ValueUpdater } } - public void AtomicUpsert(in TKey key, in TValue value) + public long AtomicUpsert(in TKey key, in TValue value) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); lock (AtomicUpdateLock) { - Upsert(in key, in value); + return Upsert(in key, in value); } } - public void Upsert(in TKey key, in TValue value) + public long Upsert(in TKey key, in TValue value) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); while (true) { var mutableSegment = MutableSegment; - var status = mutableSegment.Upsert(key, value); + var status = mutableSegment.Upsert(key, value, out var opIndex); switch (status) { case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: @@ -218,29 +244,33 @@ public void Upsert(in TKey key, in TValue value) MoveMutableSegmentForward(mutableSegment); continue; default: - return; + return opIndex; } } } - public bool TryDelete(in TKey key) + public bool TryDelete(in TKey key, out long opIndex) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); if (!ContainsKey(key)) + { + opIndex = 0; return false; - ForceDelete(in key); + } + opIndex = ForceDelete(in key); return true; } - public void ForceDelete(in TKey key) + public long ForceDelete(in TKey key) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); + long opIndex; while (true) { var mutableSegment = MutableSegment; - var status = mutableSegment.Delete(key); + var status = mutableSegment.Delete(key, out opIndex); switch (status) { case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: @@ -248,7 +278,7 @@ public void ForceDelete(in TKey key) case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: MoveMutableSegmentForward(mutableSegment); continue; - default: return; + default: return opIndex; } } } diff --git a/src/ZoneTree/Core/ZoneTree.cs b/src/ZoneTree/Core/ZoneTree.cs index ba966f2..54f4038 100644 --- a/src/ZoneTree/Core/ZoneTree.cs +++ b/src/ZoneTree/Core/ZoneTree.cs @@ -144,6 +144,7 @@ public ZoneTree(ZoneTreeOptions options) options, IncrementalIdProvider.NextId(), new IncrementalIdProvider()); IsDeleted = options.IsDeleted; FillZoneTreeMeta(); + ZoneTreeMeta.MaximumOpIndex = MutableSegment.OpIndexProvider.LastId; MetaWal.SaveMetaData( ZoneTreeMeta, MutableSegment.SegmentId, @@ -209,6 +210,7 @@ public void SaveMetaData() lock (ShortMergerLock) lock (AtomicUpdateLock) { + ZoneTreeMeta.MaximumOpIndex = MutableSegment.OpIndexProvider.LastId; MetaWal.SaveMetaData( ZoneTreeMeta, MutableSegment.SegmentId, diff --git a/src/ZoneTree/Core/ZoneTreeLoader.cs b/src/ZoneTree/Core/ZoneTreeLoader.cs index 37e91cc..6570765 100644 --- a/src/ZoneTree/Core/ZoneTreeLoader.cs +++ b/src/ZoneTree/Core/ZoneTreeLoader.cs @@ -102,6 +102,7 @@ void LoadZoneTreeMetaWAL() var records = metaWal.GetAllRecords(); var readOnlySegments = ZoneTreeMeta.ReadOnlySegments.ToList(); var bottomSegments = ZoneTreeMeta.BottomSegments?.ToList() ?? new List(); + var maximumOpIndex = 0L; foreach (var record in records) { var segmentId = record.SegmentId; @@ -138,10 +139,14 @@ void LoadZoneTreeMetaWAL() case MetaWalOperation.DeleteBottomSegment: bottomSegments.Remove(segmentId); break; + case MetaWalOperation.EnqueueMaximumOpIndex: + maximumOpIndex = Math.Max(maximumOpIndex, record.SegmentId); + break; } } ZoneTreeMeta.ReadOnlySegments = readOnlySegments; ZoneTreeMeta.BottomSegments = bottomSegments; + ZoneTreeMeta.MaximumOpIndex = maximumOpIndex; metaWal.SaveMetaData( ZoneTreeMeta, ZoneTreeMeta.MutableSegment, @@ -257,7 +262,7 @@ public ZoneTree LoadZoneTree() LoadZoneTreeMeta(); LoadZoneTreeMetaWAL(); SetMaximumId(); - var maximumOpIndex = LoadReadOnlySegments(); + var maximumOpIndex = Math.Max(ZoneTreeMeta.MaximumOpIndex, LoadReadOnlySegments()); bool collectGarbage = Options.EnableSingleSegmentGarbageCollection && !ZoneTreeMeta.HasDiskSegment && ReadOnlySegments.Count == 0; var mutableSegmentWal = LoadMutableSegment(maximumOpIndex, collectGarbage); if (collectGarbage) @@ -279,6 +284,7 @@ public ZoneTree LoadZoneTree() } LoadDiskSegment(); LoadBottomSegments(); + var zoneTree = new ZoneTree(Options, ZoneTreeMeta, ReadOnlySegments, MutableSegment, DiskSegment, BottomSegments, maximumSegmentId); return zoneTree; diff --git a/src/ZoneTree/Core/ZoneTreeMeta.cs b/src/ZoneTree/Core/ZoneTreeMeta.cs index 9fd2a10..d255b76 100644 --- a/src/ZoneTree/Core/ZoneTreeMeta.cs +++ b/src/ZoneTree/Core/ZoneTreeMeta.cs @@ -33,6 +33,8 @@ public sealed class ZoneTreeMeta public IReadOnlyList BottomSegments { get; set; } + public long MaximumOpIndex { get; set; } + [JsonIgnore] public bool HasDiskSegment => DiskSegment != 0 || BottomSegments?.Count > 0; } diff --git a/src/ZoneTree/Core/ZoneTreeMetaWAL.cs b/src/ZoneTree/Core/ZoneTreeMetaWAL.cs index 1a3e1ea..891b59e 100644 --- a/src/ZoneTree/Core/ZoneTreeMetaWAL.cs +++ b/src/ZoneTree/Core/ZoneTreeMetaWAL.cs @@ -70,6 +70,16 @@ public static bool Exists(ZoneTreeOptions options) } + public void EnqueueMaximumOpIndex(long maximumOpIndex) + { + var record = new MetaWalRecord + { + Operation = MetaWalOperation.EnqueueMaximumOpIndex, + SegmentId = maximumOpIndex + }; + AppendRecord(record); + } + public void EnqueueReadOnlySegment(long segmentId) { var record = new MetaWalRecord @@ -230,6 +240,7 @@ public void SaveMetaData( MutableSegmentMaxItemCount = zoneTreeMeta.MutableSegmentMaxItemCount, DiskSegmentMaxItemCount = zoneTreeMeta.DiskSegmentMaxItemCount, BottomSegments = bottomSegments, + MaximumOpIndex = zoneTreeMeta.MaximumOpIndex, }; var bytes = JsonSerializeToUtf8Bytes(newZoneTreeMeta); @@ -336,6 +347,7 @@ public enum MetaWalOperation DequeueBottomSegment, InsertBottomSegment, DeleteBottomSegment, + EnqueueMaximumOpIndex, } [StructLayout(LayoutKind.Sequential)] diff --git a/src/ZoneTree/IZoneTree.cs b/src/ZoneTree/IZoneTree.cs index 770c38d..32950af 100644 --- a/src/ZoneTree/IZoneTree.cs +++ b/src/ZoneTree/IZoneTree.cs @@ -46,8 +46,9 @@ public interface IZoneTree : IDisposable /// /// The key of the element to add. /// The value of the element to add. + /// The operation index. /// true if the key and value were added successfully; otherwise, false if the key already exists. - bool TryAdd(in TKey key, in TValue value); + bool TryAdd(in TKey key, in TValue value, out long opIndex); /// /// Tries to get the value of the given key and @@ -56,8 +57,13 @@ public interface IZoneTree : IDisposable /// The key of the element. /// The value of the element associated with the key. /// The delegate function that updates the value. + /// The operation index. /// true if the key is found; otherwise, false - bool TryGetAndUpdate(in TKey key, out TValue value, ValueUpdaterDelegate valueUpdater); + bool TryGetAndUpdate( + in TKey key, + out TValue value, + ValueUpdaterDelegate valueUpdater, + out long opIndex); /// /// Tries to get the value of the given key and @@ -67,25 +73,30 @@ public interface IZoneTree : IDisposable /// The value of the element associated with the key. /// The delegate function that updates the value. /// true if the key is found; otherwise, false - bool TryAtomicGetAndUpdate(in TKey key, out TValue value, ValueUpdaterDelegate valueUpdater); + bool TryAtomicGetAndUpdate( + in TKey key, + out TValue value, + ValueUpdaterDelegate valueUpdater); /// /// Attempts to add the specified key and value atomically across LSM-Tree segments. /// /// The key of the element to add. /// The value of the element to add. It can be null. + /// The operation index. /// true if the key/value pair was added successfully; /// otherwise, false. - bool TryAtomicAdd(in TKey key, in TValue value); + bool TryAtomicAdd(in TKey key, in TValue value, out long opIndex); /// /// Attempts to update the specified key's value atomically across LSM-Tree segments. /// /// The key of the element to update. /// The value of the element to update. It can be null. + /// The operation index. /// true if the key/value pair was updated successfully; /// otherwise, false. - bool TryAtomicUpdate(in TKey key, in TValue value); + bool TryAtomicUpdate(in TKey key, in TValue value, out long opIndex); /// /// Attempts to add or update the specified key and value atomically across LSM-Tree segments. @@ -93,16 +104,22 @@ public interface IZoneTree : IDisposable /// The key of the element to add. /// The value of the element to add. It can be null. /// The delegate function that updates the value. + /// The operation index. /// true if the key/value pair was added; /// false, if the key/value pair was updated. - bool TryAtomicAddOrUpdate(in TKey key, in TValue valueToAdd, ValueUpdaterDelegate valueUpdater); + bool TryAtomicAddOrUpdate( + in TKey key, + in TValue valueToAdd, + ValueUpdaterDelegate valueUpdater, + out long opIndex); /// /// Adds or updates the specified key/value pair atomically across LSM-Tree segments. /// /// The key of the element to upsert. /// The value of the element to upsert. - void AtomicUpsert(in TKey key, in TValue value); + /// The operation index. It can be used to distrubute the operations in stable order. + long AtomicUpsert(in TKey key, in TValue value); /// /// Adds or updates the specified key/value pair. @@ -133,7 +150,8 @@ public interface IZoneTree : IDisposable /// /// The key of the element to upsert. /// The value of the element to upsert. - void Upsert(in TKey key, in TValue value); + /// The operation index. It can be used to distrubute the operations in stable order. + long Upsert(in TKey key, in TValue value); /// /// Attempts to delete the specified key. @@ -141,7 +159,7 @@ public interface IZoneTree : IDisposable /// The key of the element to delete. /// true if the key was found and deleted; /// false if the key was not found. - bool TryDelete(in TKey key); + bool TryDelete(in TKey key, out long opIndex); /// /// Deletes the specified key regardless of existence. (hint: LSM Tree delete is an insert) @@ -149,7 +167,8 @@ public interface IZoneTree : IDisposable /// It increases the data lake size. /// /// The key of the element to delete. - void ForceDelete(in TKey key); + /// The operation index. It can be used to distrubute the operations in stable order. + long ForceDelete(in TKey key); /// /// Counts Keys in the entire database. diff --git a/src/ZoneTree/Segments/IMutableSegment.cs b/src/ZoneTree/Segments/IMutableSegment.cs index 71a0e06..f659de8 100644 --- a/src/ZoneTree/Segments/IMutableSegment.cs +++ b/src/ZoneTree/Segments/IMutableSegment.cs @@ -11,9 +11,9 @@ public interface IMutableSegment : IReadOnlySegment /// bool IsFrozen { get; } - AddOrUpdateResult Upsert(in TKey key, in TValue value); + AddOrUpdateResult Upsert(in TKey key, in TValue value, out long opIndex); - AddOrUpdateResult Delete(in TKey key); + AddOrUpdateResult Delete(in TKey key, out long opIndex); void Freeze(); diff --git a/src/ZoneTree/Segments/InMemory/MutableSegment.cs b/src/ZoneTree/Segments/InMemory/MutableSegment.cs index 6009ab4..748a650 100644 --- a/src/ZoneTree/Segments/InMemory/MutableSegment.cs +++ b/src/ZoneTree/Segments/InMemory/MutableSegment.cs @@ -144,18 +144,24 @@ public bool TryGet(in TKey key, out TValue value) return BTree.TryGetValue(key, out value); } - public AddOrUpdateResult Upsert(in TKey key, in TValue value) + public AddOrUpdateResult Upsert(in TKey key, in TValue value, out long opIndex) { try { Interlocked.Increment(ref WritesInProgress); if (IsFrozenFlag) + { + opIndex = 0; return AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN; + } if (BTree.Length >= MutableSegmentMaxItemCount) + { + opIndex = 0; return AddOrUpdateResult.RETRY_SEGMENT_IS_FULL; - var result = BTree.Upsert(in key, in value, out var opIndex); + } + var result = BTree.Upsert(in key, in value, out opIndex); WriteAheadLog.Append(in key, in value, opIndex); return result ? AddOrUpdateResult.ADDED : AddOrUpdateResult.UPDATED; } @@ -165,12 +171,12 @@ public AddOrUpdateResult Upsert(in TKey key, in TValue value) } } - public AddOrUpdateResult Delete(in TKey key) + public AddOrUpdateResult Delete(in TKey key, out long opIndex) { try { Interlocked.Increment(ref WritesInProgress); - + opIndex = 0; if (IsFrozenFlag) return AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN; @@ -192,7 +198,7 @@ public AddOrUpdateResult Delete(in TKey key) MarkValueDeleted(ref x); insertedValue = x; return AddOrUpdateResult.UPDATED; - }, out var opIndex); + }, out opIndex); WriteAheadLog.Append(in key, in insertedValue, opIndex); return status; } diff --git a/src/ZoneTree/docs/ZoneTree/README-NUGET.md b/src/ZoneTree/docs/ZoneTree/README-NUGET.md index d4a581e..524d8b7 100644 --- a/src/ZoneTree/docs/ZoneTree/README-NUGET.md +++ b/src/ZoneTree/docs/ZoneTree/README-NUGET.md @@ -174,7 +174,7 @@ zoneTree.TryAtomicAddOrUpdate(39, "a", (ref string x) => { x += "b"; return true; -}); +}, out var opIndex); ``` ---