Skip to content

Commit

Permalink
Merge pull request #97 from koculu/96-enhancement-add-new-methods-to-…
Browse files Browse the repository at this point in the history
…expose-opindexes-for-async-replication

Add new methods to expose opIndexes for async replication.
  • Loading branch information
koculu authored Oct 14, 2024
2 parents ac3c9b2 + 4660bf3 commit c58504f
Show file tree
Hide file tree
Showing 12 changed files with 482 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/Playground/Test1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static void TestReverseIterator(
{
x += " ooops!";
return true;
}, out _);
});
}
maintainer.WaitForBackgroundThreads();
}
Expand Down
8 changes: 5 additions & 3 deletions src/ZoneTree.UnitTests/AtomicUpdateTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public void IntIntAtomicIncrement(WriteAheadLogMode walMode)
++y;
return true;
},
out _
(in int _, long _, OperationResult result) =>
{
}
);
Interlocked.Increment(ref off);
}
Expand Down Expand Up @@ -117,7 +119,7 @@ public void IntIntAtomicIncrementForBTree(WriteAheadLogMode walMode)
{
++y;
return true;
}, out _);
});
Interlocked.Increment(ref off);
}
Expand Down Expand Up @@ -183,7 +185,7 @@ public void IntIntMutableSegmentOnlyAtomicIncrement(WriteAheadLogMode walMode)
{
++y;
return true;
}, out _);
});
Interlocked.Increment(ref off);
}
Expand Down
78 changes: 78 additions & 0 deletions src/ZoneTree.UnitTests/ReplicatorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using Tenray.ZoneTree.Core;

namespace Tenray.ZoneTree.UnitTests;

public sealed class ReplicatorTests
{
[Test]
public void TestReplicator()
{
var dataPath = "data/TestReplicator";
if (Directory.Exists(dataPath))
Directory.Delete(dataPath, true);
var recordCount = 50_000;
var keyCount = 15_000;
var maxMemory = 10_000;
void CreateData()
{
using var zoneTree = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/source")
.SetMutableSegmentMaxItemCount(maxMemory)
.OpenOrCreate();

using var replica = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/replica")
.SetMutableSegmentMaxItemCount(maxMemory)
.OpenOrCreate();

using var replicator = new Replicator<int, int>(replica, dataPath + "/replica-op-index");
using var maintainer1 = zoneTree.CreateMaintainer();
using var maintainer2 = replica.CreateMaintainer();
var random = new Random();
int replicated = 0;
Parallel.For(0, recordCount, (i) =>
{
var key = i % keyCount;
var value = random.Next();
var opIndex = zoneTree.Upsert(key, value);
Task.Run(() =>
{
replicator.OnUpsert(key, value, opIndex);
Interlocked.Increment(ref replicated);
});
});
while (replicated < recordCount) Task.Delay(500).Wait();
maintainer1.EvictToDisk();
maintainer2.EvictToDisk();
maintainer1.WaitForBackgroundThreads();
maintainer2.WaitForBackgroundThreads();
}

void TestEqual()
{
using var zoneTree = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/source")
.Open();

using var replica = new ZoneTreeFactory<int, int>()
.SetDataDirectory(dataPath + "/replica")
.Open();

using var iterator1 = zoneTree.CreateIterator();
using var iterator2 = replica.CreateIterator();
while (true)
{
var n1 = iterator1.Next();
var n2 = iterator2.Next();
Assert.That(n2, Is.EqualTo(n1));
if (!n1) break;
Assert.That(iterator2.Current, Is.EqualTo(iterator1.Current));
}
zoneTree.Maintenance.Drop();
replica.Maintenance.Drop();
}

CreateData();
TestEqual();
}
}
2 changes: 1 addition & 1 deletion src/ZoneTree.UnitTests/StringTreeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void HelloWorldTest2()
{
x += "b";
return true;
}, out _);
});
zoneTree.TryGet(39, out value);
Assert.That(value, Is.EqualTo("Hello Zone Tree!b"));
}
Expand Down
6 changes: 1 addition & 5 deletions src/ZoneTree/Collections/BTree/BTree.NodeIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public sealed class NodeIterator

public TValue[] Values { get; }

public long[] OpIndexes { get; }

public TKey CurrentKey => Keys[CurrentIndex];

public TValue CurrentValue => Values[CurrentIndex];
Expand All @@ -28,14 +26,12 @@ public NodeIterator(
BTree<TKey, TValue> tree,
LeafNode leafNode,
TKey[] keys,
TValue[] values,
long[] opIndexes = null)
TValue[] values)
{
Tree = tree;
Node = leafNode;
Keys = keys;
Values = values;
OpIndexes = opIndexes;
}

public NodeIterator GetPreviousNodeIterator()
Expand Down
101 changes: 101 additions & 0 deletions src/ZoneTree/Collections/BTree/BTree.Write.OpIndex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using Tenray.ZoneTree.Exceptions;

namespace Tenray.ZoneTree.Collections.BTree;

public delegate TValue GetValueDelegate<TKey, TValue>(long opIndex);

/// <summary>
/// In memory B+Tree.
/// This class is thread-safe.
/// </summary>
/// <typeparam name="TKey">Key Type</typeparam>
/// <typeparam name="TValue">Value Type</typeparam>
public sealed partial class BTree<TKey, TValue>
{
public bool Upsert(in TKey key, GetValueDelegate<TKey, TValue> valueGetter, out TValue value, out long opIndex)
{
if (IsReadOnly)
throw new BTreeIsReadOnlyException();
try
{
WriteLock();
while (true)
{
var root = Root;
root.WriteLock();
if (root != Root)
{
root.WriteUnlock();
continue;
}

if (!root.IsFull)
{
return UpsertNonFull(root, in key, valueGetter, out value, out opIndex);
}
var newRoot = new Node(GetNodeLocker(), NodeSize);
newRoot.Children[0] = root;
newRoot.WriteLock();
TrySplitChild(newRoot, 0, root);
var result = UpsertNonFull(newRoot, in key, valueGetter, out value, out opIndex);
Root = newRoot;
root.WriteUnlock();
return result;
}
}
catch (Exception)
{
Root.WriteUnlock();
throw;
}
finally
{
WriteUnlock();
}
}

bool UpsertNonFull(Node node, in TKey key, GetValueDelegate<TKey, TValue> valueGetter, out TValue value, out long opIndex)
{
while (true)
{
var found = node.TryGetPosition(Comparer, in key, out var position);
if (node is LeafNode leaf)
{
opIndex = OpIndexProvider.NextId();
if (found)
{
value = valueGetter(opIndex);
leaf.Update(position, in key, value);
node.WriteUnlock();
return false;
}
value = valueGetter(opIndex);
leaf.Insert(position, in key, value);
Interlocked.Increment(ref _length);
node.WriteUnlock();
return true;
}
if (found)
++position;
var child = node.Children[position];
child.WriteLock();
if (child.IsFull)
{
var splitted = TrySplitChild(node, position, child);
child.WriteUnlock();
if (!splitted)
{
continue;
}

if (Comparer.Compare(in key, in node.Keys[position]) >= 0)
++position;

child = node.Children[position];
child.WriteLock();
}
node.WriteUnlock();
node = child;
}
}
}
63 changes: 63 additions & 0 deletions src/ZoneTree/Core/Replicator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
namespace Tenray.ZoneTree.Core;

using System.Collections.Concurrent;
using System.Collections.Generic;
using Tenray.ZoneTree.Collections;

public sealed class Replicator<TKey, TValue> : IDisposable
{
readonly IZoneTree<TKey, TValue> Replica;

readonly IZoneTree<TKey, long> LatestOpIndexes;

readonly IMaintainer Maintainer;

bool isDisposed;

public Replicator(
IZoneTree<TKey, TValue> replica,
string dataPath,
Action<ZoneTreeFactory<TKey, long>> configure = null)
{
this.Replica = replica;
var factory = new ZoneTreeFactory<TKey, long>()
.SetDataDirectory(dataPath);
if (configure != null) configure(factory);
LatestOpIndexes = factory.OpenOrCreate();
Maintainer = LatestOpIndexes.CreateMaintainer();
Maintainer.EnableJobForCleaningInactiveCaches = true;
}

public void OnUpsert(TKey key, TValue value, long opIndex)
{
LatestOpIndexes.TryAtomicAddOrUpdate(
key,
(ref long newOpIndex) =>
{
newOpIndex = opIndex;
return true;
},
(ref long existingOpIndex) =>
{
if (opIndex < existingOpIndex)
return false;
existingOpIndex = opIndex;
return true;
},
(in long _, long _, OperationResult result) =>
{
if (result == OperationResult.Cancelled) return;
Replica.Upsert(key, value);
});
}

public void Dispose()
{
if (isDisposed) return;
isDisposed = true;
Maintainer.EvictToDisk();
Maintainer.WaitForBackgroundThreads();
Maintainer.Dispose();
LatestOpIndexes.Dispose();
}
}
Loading

0 comments on commit c58504f

Please sign in to comment.