Skip to content

Commit

Permalink
[apache#2148] improvement(server): Rename ShuffleBufferWithLinkedList…
Browse files Browse the repository at this point in the history
… to DefaultShuffleBuffer
  • Loading branch information
Junfan Zhang committed Sep 26, 2024
1 parent a364f2a commit d6759d4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleFlushManager;

public class ShuffleBufferWithLinkedList extends AbstractShuffleBuffer {
public class DefaultShuffleBuffer extends AbstractShuffleBuffer {
// blocks will be added to inFlushBlockMap as <eventId, blocks> pair
// it will be removed after flush to storage
// the strategy ensure that shuffle is in memory or storage
private Set<ShufflePartitionedBlock> blocks;
private Map<Long, Set<ShufflePartitionedBlock>> inFlushBlockMap;

public ShuffleBufferWithLinkedList() {
public DefaultShuffleBuffer() {
this.blocks = new LinkedHashSet<>();
this.inFlushBlockMap = JavaUtils.newConcurrentMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public StatusCode registerBuffer(
if (shuffleBufferType == ShuffleBufferType.SKIP_LIST) {
shuffleBuffer = new ShuffleBufferWithSkipList();
} else {
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
}
bufferRangeMap.put(Range.closed(startPartition, endPartition), shuffleBuffer);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleBufferWithLinkedListTest extends BufferTestBase {
public class DefaultShuffleBufferTest extends BufferTestBase {

private static AtomicInteger atomSequenceNo = new AtomicInteger(0);

@Test
public void appendTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
shuffleBuffer.append(createData(10));
// ShufflePartitionedBlock has constant 32 bytes overhead
assertEquals(42, shuffleBuffer.getSize());
Expand All @@ -59,7 +59,7 @@ public void appendTest() {

@Test
public void appendMultiBlocksTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShufflePartitionedData data1 = createData(10);
ShufflePartitionedData data2 = createData(10);
ShufflePartitionedBlock[] dataCombine = new ShufflePartitionedBlock[2];
Expand All @@ -71,7 +71,7 @@ public void appendMultiBlocksTest() {

@Test
public void toFlushEventTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShuffleDataFlushEvent event = shuffleBuffer.toFlushEvent("appId", 0, 0, 1, null);
assertNull(event);
shuffleBuffer.append(createData(10));
Expand All @@ -85,7 +85,7 @@ public void toFlushEventTest() {
@Test
public void getShuffleDataWithExpectedTaskIdsFilterTest() {
/** case1: all blocks in cached(or in flushed map) and size < readBufferSize */
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 55);
Expand Down Expand Up @@ -197,7 +197,7 @@ public void getShuffleDataWithExpectedTaskIdsFilterTest() {

@Test
public void getShuffleDataWithLocalOrderTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
ShufflePartitionedData spd1 = createData(1, 1, 15);
ShufflePartitionedData spd2 = createData(1, 0, 15);
ShufflePartitionedData spd3 = createData(1, 2, 15);
Expand Down Expand Up @@ -248,7 +248,7 @@ public void getShuffleDataWithLocalOrderTest() {

@Test
public void getShuffleDataTest() {
ShuffleBuffer shuffleBuffer = new ShuffleBufferWithLinkedList();
ShuffleBuffer shuffleBuffer = new DefaultShuffleBuffer();
// case1: cached data only, blockId = -1, readBufferSize > buffer size
ShufflePartitionedData spd1 = createData(10);
ShufflePartitionedData spd2 = createData(20);
Expand All @@ -261,7 +261,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case2: cached data only, blockId = -1, readBufferSize = buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(20);
spd2 = createData(20);
shuffleBuffer.append(spd1);
Expand All @@ -273,7 +273,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case3-1: cached data only, blockId = -1, readBufferSize < buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(20);
spd2 = createData(21);
shuffleBuffer.append(spd1);
Expand All @@ -285,7 +285,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case3-2: cached data only, blockId = -1, readBufferSize < buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
ShufflePartitionedData spd3 = createData(15);
Expand All @@ -307,7 +307,7 @@ public void getShuffleDataTest() {
assertArrayEquals(expectedData, sdr.getData());

// case5: flush data only, blockId = -1, readBufferSize < buffer size
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
shuffleBuffer.append(spd1);
Expand All @@ -328,13 +328,13 @@ public void getShuffleDataTest() {
assertEquals(0, sdr.getBufferSegments().size());

// case6: no data in buffer & flush buffer
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
sdr = shuffleBuffer.getShuffleData(Constants.INVALID_BLOCK_ID, 10);
assertEquals(0, sdr.getBufferSegments().size());
assertEquals(0, sdr.getDataLength());

// case7: get data with multiple flush buffer and cached buffer
shuffleBuffer = new ShuffleBufferWithLinkedList();
shuffleBuffer = new DefaultShuffleBuffer();
spd1 = createData(15);
spd2 = createData(15);
spd3 = createData(15);
Expand Down

0 comments on commit d6759d4

Please sign in to comment.