diff --git a/simulator/README.md b/simulator/README.md index 0851d64c..99084be7 100644 --- a/simulator/README.md +++ b/simulator/README.md @@ -38,15 +38,17 @@ There are 2 configuration sets: ### BlockStreamConfig Uses the prefix `blockStream` so all properties should start with `blockStream.` -| Key | Description | Default Value | -|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------| -| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` | -| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` | -| `delayBetweenBlockItems` | The delay between each block item in nanoseconds | `1_500_000` | -| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` | -| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` | -| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 | -| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` | +| Key | Description | Default Value | +|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------| +| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` | +| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` | +| `delayBetweenBlockItems` | The delay between each block item in nanoseconds, only applicable when streamingMode=CONSTANT_RATE | `1_500_000` | +| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` | +| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` | +| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 | +| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` | +| `streamingMode` | can either be `CONSTANT_RATE` or `MILLIS_PER_BLOCK`, if `CONSTANT_RATE` | `CONSTANT_RATE` | +| `millisecondsPerBlock` | if streamingMode is `MILLIS_PER_BLOCK` this will be the time to wait between blocks in milliseconds | `1_000` | ### GrpcConfig Uses the prefix `grpc` so all properties should start with `grpc.` diff --git a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java index 7e7859d3..96bec9f9 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java +++ b/simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java @@ -17,9 +17,11 @@ package com.hedera.block.simulator; import com.hedera.block.simulator.config.data.BlockStreamConfig; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; import com.swirlds.config.api.Configuration; import edu.umd.cs.findbugs.annotations.NonNull; @@ -33,13 +35,13 @@ public class BlockStreamSimulatorApp { private static final System.Logger LOGGER = System.getLogger(BlockStreamSimulatorApp.class.getName()); - Configuration configuration; - BlockStreamManager blockStreamManager; - PublishStreamGrpcClient publishStreamGrpcClient; - BlockStreamConfig blockStreamConfig; + private final BlockStreamManager blockStreamManager; + private final PublishStreamGrpcClient publishStreamGrpcClient; + private final BlockStreamConfig blockStreamConfig; + private final StreamingMode streamingMode; private final int delayBetweenBlockItems; - + private final int millisecondsPerBlock; private final AtomicBoolean isRunning = new AtomicBoolean(false); /** @@ -54,12 +56,13 @@ public BlockStreamSimulatorApp( @NonNull Configuration configuration, @NonNull BlockStreamManager blockStreamManager, @NonNull PublishStreamGrpcClient publishStreamGrpcClient) { - this.configuration = configuration; this.blockStreamManager = blockStreamManager; this.publishStreamGrpcClient = publishStreamGrpcClient; blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class); + streamingMode = blockStreamConfig.streamingMode(); + millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock(); delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems(); } @@ -71,12 +74,48 @@ public BlockStreamSimulatorApp( * @throws IOException if an I/O error occurs */ public void start() throws InterruptedException, BlockSimulatorParsingException, IOException { - int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000; - int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000; isRunning.set(true); LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started"); + if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) { + millisPerBlockStreaming(); + } else { + constantRateStreaming(); + } + + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); + } + + private void millisPerBlockStreaming() + throws IOException, InterruptedException, BlockSimulatorParsingException { + + final long secondsPerBlockNanos = millisecondsPerBlock * 1_000_000L; + + Block nextBlock = blockStreamManager.getNextBlock(); + while (nextBlock != null) { + long startTime = System.nanoTime(); + publishStreamGrpcClient.streamBlock(nextBlock); + long elapsedTime = System.nanoTime() - startTime; + long timeToDelay = secondsPerBlockNanos - elapsedTime; + if (timeToDelay > 0) { + Thread.sleep(timeToDelay / 1_000_000, (int) (timeToDelay % 1_000_000)); + } else { + LOGGER.log( + System.Logger.Level.WARNING, + "Block Server is running behind, Streaming took longer than max expected: " + + millisecondsPerBlock + + " milliseconds"); + } + nextBlock = blockStreamManager.getNextBlock(); + } + LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); + } + + private void constantRateStreaming() + throws InterruptedException, IOException, BlockSimulatorParsingException { + int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000; + int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000; boolean streamBlockItem = true; int blockItemsStreamed = 0; @@ -104,8 +143,6 @@ public void start() throws InterruptedException, BlockSimulatorParsingException, streamBlockItem = false; } } - - LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped"); } /** diff --git a/simulator/src/main/java/com/hedera/block/simulator/Constants.java b/simulator/src/main/java/com/hedera/block/simulator/Constants.java index 9b4bda17..28e4da33 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/Constants.java +++ b/simulator/src/main/java/com/hedera/block/simulator/Constants.java @@ -16,10 +16,15 @@ package com.hedera.block.simulator; +/** The Constants class defines the constants for the block simulator. */ public class Constants { - // The file extension for block files. + /** Constructor to prevent instantiation. this is only a utility class */ + private Constants() {} + + /** The file extension for block files. */ public static final String RECORD_EXTENSION = "blk"; - // postfix for gzipped files + + /** postfix for gzip files */ public static final String GZ_EXTENSION = ".gz"; } diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java index 7c97934a..ecc6d50a 100644 --- a/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java @@ -17,6 +17,7 @@ package com.hedera.block.simulator.config.data; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.swirlds.config.api.ConfigData; import com.swirlds.config.api.ConfigProperty; import java.nio.file.Files; @@ -33,6 +34,8 @@ * @param maxBlockItemsToStream the maximum number of block items to stream * @param paddedLength the padded length of 0 the block file format * @param fileExtension the file extension of the block file format + * @param streamingMode the mode of streaming for the block stream + * @param millisecondsPerBlock the milliseconds per block */ @ConfigData("blockStream") public record BlockStreamConfig( @@ -43,7 +46,9 @@ public record BlockStreamConfig( String managerImplementation, @ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream, @ConfigProperty(defaultValue = "36") int paddedLength, - @ConfigProperty(defaultValue = ".blk.gz") String fileExtension) { + @ConfigProperty(defaultValue = ".blk.gz") String fileExtension, + @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode, + @ConfigProperty(defaultValue = "1000") int millisecondsPerBlock) { /** * Constructor to set the default root path if not provided, it will be set to the data diff --git a/simulator/src/main/java/com/hedera/block/simulator/config/types/StreamingMode.java b/simulator/src/main/java/com/hedera/block/simulator/config/types/StreamingMode.java new file mode 100644 index 00000000..39ccd46d --- /dev/null +++ b/simulator/src/main/java/com/hedera/block/simulator/config/types/StreamingMode.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config.types; + +/** The StreamingMode enum defines the different modes for streaming blocks. */ +public enum StreamingMode { + + /** It will wait X Nanos between each block. */ + CONSTANT_RATE, + + /** It will attempt to send a block each X Millis. */ + MILLIS_PER_BLOCK; + + /** + * Converts a string to a StreamingMode. + * + * @param mode the string to convert + * @return the StreamingMode + */ + public static StreamingMode fromString(String mode) { + return switch (mode) { + case "CONSTANT_RATE" -> CONSTANT_RATE; + case "MILLIS_PER_BLOCK" -> MILLIS_PER_BLOCK; + default -> throw new IllegalArgumentException("Invalid mode: " + mode); + }; + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java index 24ba0705..f0833822 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/BlockStreamSimulatorTest.java @@ -22,11 +22,18 @@ import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.block.simulator.generator.BlockStreamManager; import com.hedera.block.simulator.grpc.PublishStreamGrpcClient; +import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.hapi.block.stream.output.BlockHeader; import com.swirlds.config.api.Configuration; import java.io.IOException; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.logging.Handler; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,8 +45,6 @@ @ExtendWith(MockitoExtension.class) class BlockStreamSimulatorTest { - private Configuration configuration; - @Mock private BlockStreamManager blockStreamManager; @Mock private PublishStreamGrpcClient publishStreamGrpcClient; @@ -49,8 +54,13 @@ class BlockStreamSimulatorTest { @BeforeEach void setUp() throws IOException { - configuration = - TestUtils.getTestConfiguration(Map.of("blockStream.maxBlockItemsToStream", "100")); + Configuration configuration = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.maxBlockItemsToStream", + "100", + "blockStream.streamingMode", + "CONSTANT_RATE")); blockStreamSimulator = new BlockStreamSimulatorApp( @@ -84,7 +94,9 @@ void start_exitByBlockNull() "blockStream.BlockAsFileBlockStreamManager", "BlockAsFileLargeDataSets", "blockStream.rootPath", - getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"))); + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "CONSTANT_RATE")); BlockStreamSimulatorApp blockStreamSimulator = new BlockStreamSimulatorApp( @@ -102,4 +114,121 @@ private String getAbsoluteFolder(String relativePath) { void stop_doesNotThrowException() { assertDoesNotThrow(() -> blockStreamSimulator.stop()); } + + @Test + void start_millisPerSecond() + throws InterruptedException, IOException, BlockSimulatorParsingException { + BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); + BlockItem blockItem = + BlockItem.newBuilder() + .blockHeader(BlockHeader.newBuilder().number(1L).build()) + .build(); + Block block = Block.newBuilder().items(blockItem).build(); + when(blockStreamManager.getNextBlock()).thenReturn(block, block, null); + + Configuration configuration = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.maxBlockItemsToStream", + "2", + "blockStream.BlockAsFileBlockStreamManager", + "BlockAsFileLargeDataSets", + "blockStream.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "MILLIS_PER_BLOCK")); + + BlockStreamSimulatorApp blockStreamSimulator = + new BlockStreamSimulatorApp( + configuration, blockStreamManager, publishStreamGrpcClient); + + blockStreamSimulator.start(); + assertTrue(blockStreamSimulator.isRunning()); + } + + @Test + void start_millisPerSecond_streamingLagVerifyWarnLog() + throws InterruptedException, IOException, BlockSimulatorParsingException { + List logRecords = captureLogs(); + + BlockStreamManager blockStreamManager = Mockito.mock(BlockStreamManager.class); + BlockItem blockItem = + BlockItem.newBuilder() + .blockHeader(BlockHeader.newBuilder().number(1L).build()) + .build(); + Block block = Block.newBuilder().items(blockItem).build(); + when(blockStreamManager.getNextBlock()).thenReturn(block, block, null); + PublishStreamGrpcClient publishStreamGrpcClient = + Mockito.mock(PublishStreamGrpcClient.class); + + // simulate that the first block takes 15ms to stream, when the limit is 10, to force to go + // over WARN Path. + when(publishStreamGrpcClient.streamBlock(block)) + .thenAnswer( + invocation -> { + Thread.sleep(15); + return true; + }) + .thenReturn(true); + + Configuration configuration = + TestUtils.getTestConfiguration( + Map.of( + "blockStream.maxBlockItemsToStream", + "2", + "blockStream.BlockAsFileBlockStreamManager", + "BlockAsFileLargeDataSets", + "blockStream.rootPath", + getAbsoluteFolder("src/test/resources/block-0.0.3-blk/"), + "blockStream.streamingMode", + "MILLIS_PER_BLOCK", + "blockStream.millisecondsPerBlock", + "10")); + + BlockStreamSimulatorApp blockStreamSimulator = + new BlockStreamSimulatorApp( + configuration, blockStreamManager, publishStreamGrpcClient); + + blockStreamSimulator.start(); + assertTrue(blockStreamSimulator.isRunning()); + + // Assert log exists + boolean found_log = + logRecords.stream() + .anyMatch( + logRecord -> + logRecord + .getMessage() + .contains( + "Block Server is running behind, Streaming" + + " took longer than max expected: 10" + + " milliseconds")); + assertTrue(found_log); + } + + private List captureLogs() { + // Capture logs + Logger logger = Logger.getLogger(BlockStreamSimulatorApp.class.getName()); + final List logRecords = new ArrayList<>(); + + // Custom handler to capture logs + Handler handler = + new Handler() { + @Override + public void publish(LogRecord record) { + logRecords.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + }; + + // Add handler to logger + logger.addHandler(handler); + + return logRecords; + } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java index 36c7f412..7f7bf49f 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/config/data/BlockStreamConfigTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.*; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -31,6 +32,8 @@ class BlockStreamConfigTest { private final int maxBlockItemsToStream = 10_000; private final int paddedLength = 36; private final String fileExtension = ".blk"; + private final StreamingMode streamingMode = StreamingMode.CONSTANT_RATE; + private final int millisPerBlock = 1000; private String getAbsoluteFolder(String relativePath) { return Paths.get(relativePath).toAbsolutePath().toString(); @@ -56,7 +59,9 @@ void testValidAbsolutePath() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension); + fileExtension, + streamingMode, + millisPerBlock); assertEquals(folderRootPath, config.folderRootPath()); assertEquals(GenerationMode.DIR, config.generationMode()); @@ -77,7 +82,9 @@ void testEmptyFolderRootPath() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension); + fileExtension, + streamingMode, + millisPerBlock); // Verify that the path is set to the default Path expectedPath = Paths.get("src/main/resources/block-0.0.3/").toAbsolutePath(); @@ -103,7 +110,9 @@ void testRelativeFolderPathThrowsException() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension)); + fileExtension, + streamingMode, + millisPerBlock)); // Verify the exception message assertEquals(relativeFolderPath + " Root path must be absolute", exception.getMessage()); @@ -131,7 +140,9 @@ void testNonExistentFolderThrowsException() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension)); + fileExtension, + streamingMode, + millisPerBlock)); // Verify the exception message assertEquals("Folder does not exist: " + path, exception.getMessage()); @@ -152,7 +163,9 @@ void testGenerationModeNonDirDoesNotCheckFolderExistence() { blockStreamManagerImplementation, maxBlockItemsToStream, paddedLength, - fileExtension); + fileExtension, + streamingMode, + millisPerBlock); // Verify that the configuration was created successfully assertEquals(folderRootPath, config.folderRootPath()); diff --git a/simulator/src/test/java/com/hedera/block/simulator/config/types/StreamingModeTest.java b/simulator/src/test/java/com/hedera/block/simulator/config/types/StreamingModeTest.java new file mode 100644 index 00000000..ca723e51 --- /dev/null +++ b/simulator/src/test/java/com/hedera/block/simulator/config/types/StreamingModeTest.java @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2024 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hedera.block.simulator.config.types; + +import static org.junit.jupiter.api.Assertions.*; + +class StreamingModeTest { + + @org.junit.jupiter.api.Test + void fromString() { + assertEquals(StreamingMode.CONSTANT_RATE, StreamingMode.fromString("CONSTANT_RATE")); + assertEquals(StreamingMode.MILLIS_PER_BLOCK, StreamingMode.fromString("MILLIS_PER_BLOCK")); + assertThrows(IllegalArgumentException.class, () -> StreamingMode.fromString("INVALID")); + } +} diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java index 6ea25a54..be19eb0a 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java @@ -22,6 +22,7 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import java.io.IOException; import java.nio.file.Paths; @@ -82,7 +83,9 @@ private BlockStreamManager getBlockAsDirBlockStreamManager(String rootFolder) { "BlockAsDirBlockStreamManager", 10_000, 36, - ".blk"); + ".blk", + StreamingMode.CONSTANT_RATE, + 1000); return new BlockAsDirBlockStreamManager(blockStreamConfig); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java index b833a57d..b5a95023 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java @@ -20,6 +20,7 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import java.io.IOException; import java.nio.file.Paths; @@ -84,7 +85,9 @@ private BlockAsFileBlockStreamManager getBlockAsFileBlockStreamManager(String ro "BlockAsFileBlockStreamManager", 10_000, 36, - ".blk"); + ".blk", + StreamingMode.CONSTANT_RATE, + 1000); return new BlockAsFileBlockStreamManager(blockStreamConfig); } } diff --git a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java index 01b3006b..4aeb049e 100644 --- a/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java +++ b/simulator/src/test/java/com/hedera/block/simulator/generator/BlockAsFileLargeDataSetsTest.java @@ -20,6 +20,7 @@ import com.hedera.block.simulator.config.data.BlockStreamConfig; import com.hedera.block.simulator.config.types.GenerationMode; +import com.hedera.block.simulator.config.types.StreamingMode; import com.hedera.block.simulator.exception.BlockSimulatorParsingException; import com.hedera.hapi.block.stream.BlockItem; import java.io.File; @@ -103,7 +104,9 @@ void gettingNextBlockItemThrowsParsingException(@TempDir Path tempDir) throws IO "BlockAsFileBlockStreamManager", 10_000, 36, - ".blk"); + ".blk", + StreamingMode.CONSTANT_RATE, + 1000); BlockAsFileLargeDataSets blockStreamManager = new BlockAsFileLargeDataSets(blockStreamConfig); @@ -123,7 +126,9 @@ private BlockAsFileLargeDataSets getBlockAsFileLargeDatasetsBlockStreamManager( "BlockAsFileBlockStreamManager", 10_000, 36, - ".blk"); + ".blk", + StreamingMode.CONSTANT_RATE, + 1000); return new BlockAsFileLargeDataSets(blockStreamConfig); }