Skip to content

Commit

Permalink
Introduce atomic operation result that is guaranteed to be called once.
Browse files Browse the repository at this point in the history
  • Loading branch information
koculu committed Oct 14, 2024
1 parent 1a0a62d commit 4660bf3
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/Playground/Test1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static void TestReverseIterator(
{
x += " ooops!";
return true;
}, out _);
});
}
maintainer.WaitForBackgroundThreads();
}
Expand Down
8 changes: 5 additions & 3 deletions src/ZoneTree.UnitTests/AtomicUpdateTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public void IntIntAtomicIncrement(WriteAheadLogMode walMode)
++y;
return true;
},
out _
(in int _, long _, OperationResult result) =>
{
}
);
Interlocked.Increment(ref off);
}
Expand Down Expand Up @@ -117,7 +119,7 @@ public void IntIntAtomicIncrementForBTree(WriteAheadLogMode walMode)
{
++y;
return true;
}, out _);
});
Interlocked.Increment(ref off);
}
Expand Down Expand Up @@ -183,7 +185,7 @@ public void IntIntMutableSegmentOnlyAtomicIncrement(WriteAheadLogMode walMode)
{
++y;
return true;
}, out _);
});
Interlocked.Increment(ref off);
}
Expand Down
2 changes: 1 addition & 1 deletion src/ZoneTree.UnitTests/ReplicatorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void CreateData()
.SetMutableSegmentMaxItemCount(maxMemory)
.OpenOrCreate();

var replicator = new Replicator<int, int>(replica, dataPath + "/replica-op-index");
using var replicator = new Replicator<int, int>(replica, dataPath + "/replica-op-index");
using var maintainer1 = zoneTree.CreateMaintainer();
using var maintainer2 = replica.CreateMaintainer();
var random = new Random();
Expand Down
2 changes: 1 addition & 1 deletion src/ZoneTree.UnitTests/StringTreeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void HelloWorldTest2()
{
x += "b";
return true;
}, out _);
});
zoneTree.TryGet(39, out value);
Assert.That(value, Is.EqualTo("Hello Zone Tree!b"));
}
Expand Down
8 changes: 5 additions & 3 deletions src/ZoneTree/Core/Replicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,20 @@ public void OnUpsert(TKey key, TValue value, long opIndex)
(ref long newOpIndex) =>
{
newOpIndex = opIndex;
Replica.Upsert(key, value);
return true;
},
(ref long existingOpIndex) =>
{
if (opIndex < existingOpIndex)
return false;
Replica.Upsert(key, value);
existingOpIndex = opIndex;
return true;
},
out _);
(in long _, long _, OperationResult result) =>
{
if (result == OperationResult.Cancelled) return;
Replica.Upsert(key, value);
});
}

public void Dispose()
Expand Down
82 changes: 71 additions & 11 deletions src/ZoneTree/Core/ZoneTree.ReadWrite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,25 @@ public bool TryAtomicUpdate(in TKey key, in TValue value, out long opIndex)
}
}

static OperationResultDelegate<TValue> EmptyOperationResultDelegate =
(in TValue value, long opIndex, OperationResult result) =>
{
};

public bool TryAtomicAddOrUpdate(
in TKey key,
in TValue valueToAdd,
ValueUpdaterDelegate<TValue> valueUpdater,
out long opIndex)
OperationResultDelegate<TValue> result)
{
if (IsReadOnly)
throw new ZoneTreeIsReadOnlyException();
AddOrUpdateResult status;
IMutableSegment<TKey, TValue> mutableSegment;
opIndex = 0;
var opIndex = 0L;
if (result == null)
result = EmptyOperationResultDelegate;

while (true)
{
lock (AtomicUpdateLock)
Expand All @@ -193,17 +201,40 @@ public bool TryAtomicAddOrUpdate(
}
else if (mutableSegment.TryGet(in key, out var existing))
{
if (!valueUpdater(ref existing)) return false;
if (!valueUpdater(ref existing))
{
result(in existing, 0, OperationResult.Cancelled);
return false;
}
status = mutableSegment.Upsert(key, existing, out opIndex);
if (status == AddOrUpdateResult.UPDATED)
{
result(in existing, opIndex, OperationResult.Updated);
return false;
}
}
else if (TryGetFromReadonlySegments(in key, out existing))
{
if (!valueUpdater(ref existing)) return false;
if (!valueUpdater(ref existing))
{
result(in existing, 0, OperationResult.Cancelled);
return false;
}
status = mutableSegment.Upsert(key, existing, out opIndex);
if (status == AddOrUpdateResult.ADDED)
{
result(in existing, opIndex, OperationResult.Updated);
return false;
}
}
else
{
status = mutableSegment.Upsert(key, valueToAdd, out opIndex);
if (status == AddOrUpdateResult.ADDED)
{
result(in existing, opIndex, OperationResult.Added);
return true;
}
}
}
switch (status)
Expand All @@ -214,7 +245,7 @@ public bool TryAtomicAddOrUpdate(
MoveMutableSegmentForward(mutableSegment);
continue;
default:
return status == AddOrUpdateResult.ADDED;
throw new Exception("Impossible.");
}
}
}
Expand All @@ -223,13 +254,15 @@ public bool TryAtomicAddOrUpdate(
in TKey key,
ValueAdderDelegate<TValue> valueAdder,
ValueUpdaterDelegate<TValue> valueUpdater,
out long opIndex)
OperationResultDelegate<TValue> result)
{
if (IsReadOnly)
throw new ZoneTreeIsReadOnlyException();
AddOrUpdateResult status;
IMutableSegment<TKey, TValue> mutableSegment;
opIndex = 0;
var opIndex = 0L;
if (result == null)
result = EmptyOperationResultDelegate;
while (true)
{
lock (AtomicUpdateLock)
Expand All @@ -241,18 +274,45 @@ public bool TryAtomicAddOrUpdate(
}
else if (mutableSegment.TryGet(in key, out var existing))
{
if (!valueUpdater(ref existing)) return false;
if (!valueUpdater(ref existing))
{
result(in existing, 0, OperationResult.Cancelled);
return false;
}
status = mutableSegment.Upsert(key, existing, out opIndex);
if (status == AddOrUpdateResult.UPDATED)
{
result(in existing, opIndex, OperationResult.Updated);
return false;
}
}
else if (TryGetFromReadonlySegments(in key, out existing))
{
if (!valueUpdater(ref existing)) return false;
if (!valueUpdater(ref existing))
{
result(in existing, 0, OperationResult.Cancelled);
return false;
}
status = mutableSegment.Upsert(key, existing, out opIndex);
if (status == AddOrUpdateResult.ADDED)
{
result(in existing, opIndex, OperationResult.Updated);
return false;
}
}
else
{
if (!valueAdder(ref existing)) return false;
if (!valueAdder(ref existing))
{
result(in existing, 0, OperationResult.Cancelled);
return false;
}
status = mutableSegment.Upsert(key, existing, out opIndex);
if (status == AddOrUpdateResult.ADDED)
{
result(in existing, opIndex, OperationResult.Added);
return true;
}
}
}
switch (status)
Expand All @@ -263,7 +323,7 @@ public bool TryAtomicAddOrUpdate(
MoveMutableSegmentForward(mutableSegment);
continue;
default:
return status == AddOrUpdateResult.ADDED;
throw new Exception("Impossible.");
}
}
}
Expand Down
34 changes: 26 additions & 8 deletions src/ZoneTree/IZoneTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,35 +100,37 @@ bool TryAtomicGetAndUpdate(
bool TryAtomicUpdate(in TKey key, in TValue value, out long opIndex);

/// <summary>
/// Attempts to add or update the specified key and value atomically across LSM-Tree segments.
/// Attempts to add or update the specified key and value atomically across LSM-Tree segments and calls the result delegate atomically.
/// valueUpdater can be called one or more times.
/// </summary>
/// <param name="key">The key of the element to add.</param>
/// <param name="valueToAdd">The value of the element to add. It can be null.</param>
/// <param name="valueUpdater">The delegate function that updates the value.</param>
/// <param name="opIndex">The operation index.</param>
/// <param name="result">The operation result delegate.</param>
/// <returns>true if the key/value pair was added;
/// false, if the key/value pair was updated.</returns>
bool TryAtomicAddOrUpdate(
in TKey key,
in TValue valueToAdd,
ValueUpdaterDelegate<TValue> valueUpdater,
out long opIndex);
OperationResultDelegate<TValue> result = null);

/// <summary>
/// Attempts to add or update the specified key and value atomically across LSM-Tree segments.
/// Attempts to add or update the specified key and value atomically across LSM-Tree segments and calls the result delegate atomically.
/// valueAdder can be called one or more times.
/// valueUpdater can be called one or more times.
/// </summary>
/// <param name="key">The key of the element to add.</param>
/// <param name="valueAdder">he delegate function that adds the value.</param>
/// <param name="valueUpdater">The delegate function that updates the value.</param>
/// <param name="opIndex">The operation index.</param>
/// <param name="result">The operation result delegate.</param>
/// <returns>true if the key/value pair was added;
/// false, if the key/value pair was updated.</returns>
bool TryAtomicAddOrUpdate(
in TKey key,
ValueAdderDelegate<TValue> valueAdder,
ValueUpdaterDelegate<TValue> valueUpdater,
out long opIndex);

OperationResultDelegate<TValue> result = null);

/// <summary>
/// Adds or updates the specified key/value pair atomically across LSM-Tree segments.
Expand Down Expand Up @@ -301,4 +303,20 @@ IZoneTreeIterator<TKey, TValue> CreateReverseIterator(
/// <typeparam name="TValue">The value type</typeparam>
/// <param name="value">The value as a reference to be added.</param>
/// <returns>true if the value is added, false otherwise.</returns>
public delegate bool ValueAdderDelegate<TValue>(ref TValue value);
public delegate bool ValueAdderDelegate<TValue>(ref TValue value);

public enum OperationResult
{
Added,
Updated,
Cancelled
}

/// <summary>
/// Value adder delegate.
/// </summary>
/// <typeparam name="TValue">The value type</typeparam>
/// <param name="value">The value that has been updated or added.</param>
/// <param name="opIndex">The operation index.</param>
/// <param name="operationResult">The operation result.</param>
public delegate void OperationResultDelegate<TValue>(in TValue value, long opIndex, OperationResult operationResult);

0 comments on commit 4660bf3

Please sign in to comment.