Skip to content

Commit

Permalink
feat: added MILLIS_PER_BLOCK Streaming mode to the simulator (#248)
Browse files Browse the repository at this point in the history
Signed-off-by: Alfredo Gutierrez <alfredo@swirldslabs.com>
  • Loading branch information
AlfredoG87 authored Oct 16, 2024
1 parent 1e0288c commit 611e91b
Show file tree
Hide file tree
Showing 11 changed files with 308 additions and 36 deletions.
20 changes: 11 additions & 9 deletions simulator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ There are 2 configuration sets:
### BlockStreamConfig
Uses the prefix `blockStream` so all properties should start with `blockStream.`

| Key | Description | Default Value |
|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` |
| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` |
| `delayBetweenBlockItems` | The delay between each block item in nanoseconds | `1_500_000` |
| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` |
| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` |
| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 |
| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` |
| Key | Description | Default Value |
|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------|
| `generationMode` | The desired generation Mode to use, it can only be `DIR` or `AD_HOC` | `DIR` |
| `folderRootPath` | If the generationMode is DIR this will be used as the source of the recording to stream to the Block-Node | `` |
| `delayBetweenBlockItems` | The delay between each block item in nanoseconds, only applicable when streamingMode=CONSTANT_RATE | `1_500_000` |
| `managerImplementation` | The desired implementation of the BlockStreamManager to use, it can only be `BlockAsDirBlockStreamManager`, `BlockAsFileBlockStreamManager` or `BlockAsFileLargeDataSets` | `BlockAsFileBlockStreamManager` |
| `maxBlockItemsToStream` | exit condition for the simulator and the circular implementations such as `BlockAsDir` or `BlockAsFile` implementations | `10_000` |
| `paddedLength` | on the `BlockAsFileLargeDataSets` implementation, the length of the padded left zeroes `000001.blk.gz` | 36 |
| `fileExtension` | on the `BlockAsFileLargeDataSets` implementation, the extension of the files to be streamed | `.blk.gz` |
| `streamingMode` | can either be `CONSTANT_RATE` or `MILLIS_PER_BLOCK`, if `CONSTANT_RATE` | `CONSTANT_RATE` |
| `millisecondsPerBlock` | if streamingMode is `MILLIS_PER_BLOCK` this will be the time to wait between blocks in milliseconds | `1_000` |

### GrpcConfig
Uses the prefix `grpc` so all properties should start with `grpc.`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package com.hedera.block.simulator;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
import com.hedera.block.simulator.generator.BlockStreamManager;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.hapi.block.stream.Block;
import com.hedera.hapi.block.stream.BlockItem;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -33,13 +35,13 @@ public class BlockStreamSimulatorApp {
private static final System.Logger LOGGER =
System.getLogger(BlockStreamSimulatorApp.class.getName());

Configuration configuration;
BlockStreamManager blockStreamManager;
PublishStreamGrpcClient publishStreamGrpcClient;
BlockStreamConfig blockStreamConfig;
private final BlockStreamManager blockStreamManager;
private final PublishStreamGrpcClient publishStreamGrpcClient;
private final BlockStreamConfig blockStreamConfig;
private final StreamingMode streamingMode;

private final int delayBetweenBlockItems;

private final int millisecondsPerBlock;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

/**
Expand All @@ -54,12 +56,13 @@ public BlockStreamSimulatorApp(
@NonNull Configuration configuration,
@NonNull BlockStreamManager blockStreamManager,
@NonNull PublishStreamGrpcClient publishStreamGrpcClient) {
this.configuration = configuration;
this.blockStreamManager = blockStreamManager;
this.publishStreamGrpcClient = publishStreamGrpcClient;

blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class);

streamingMode = blockStreamConfig.streamingMode();
millisecondsPerBlock = blockStreamConfig.millisecondsPerBlock();
delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems();
}

Expand All @@ -71,12 +74,48 @@ public BlockStreamSimulatorApp(
* @throws IOException if an I/O error occurs
*/
public void start() throws InterruptedException, BlockSimulatorParsingException, IOException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;

isRunning.set(true);
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started");

if (streamingMode == StreamingMode.MILLIS_PER_BLOCK) {
millisPerBlockStreaming();
} else {
constantRateStreaming();
}

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void millisPerBlockStreaming()
throws IOException, InterruptedException, BlockSimulatorParsingException {

final long secondsPerBlockNanos = millisecondsPerBlock * 1_000_000L;

Block nextBlock = blockStreamManager.getNextBlock();
while (nextBlock != null) {
long startTime = System.nanoTime();
publishStreamGrpcClient.streamBlock(nextBlock);
long elapsedTime = System.nanoTime() - startTime;
long timeToDelay = secondsPerBlockNanos - elapsedTime;
if (timeToDelay > 0) {
Thread.sleep(timeToDelay / 1_000_000, (int) (timeToDelay % 1_000_000));
} else {
LOGGER.log(
System.Logger.Level.WARNING,
"Block Server is running behind, Streaming took longer than max expected: "
+ millisecondsPerBlock
+ " milliseconds");
}
nextBlock = blockStreamManager.getNextBlock();
}
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

private void constantRateStreaming()
throws InterruptedException, IOException, BlockSimulatorParsingException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;
boolean streamBlockItem = true;
int blockItemsStreamed = 0;

Expand Down Expand Up @@ -104,8 +143,6 @@ public void start() throws InterruptedException, BlockSimulatorParsingException,
streamBlockItem = false;
}
}

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@

package com.hedera.block.simulator;

/** The Constants class defines the constants for the block simulator. */
public class Constants {

// The file extension for block files.
/** Constructor to prevent instantiation. this is only a utility class */
private Constants() {}

/** The file extension for block files. */
public static final String RECORD_EXTENSION = "blk";
// postfix for gzipped files

/** postfix for gzip files */
public static final String GZ_EXTENSION = ".gz";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hedera.block.simulator.config.data;

import com.hedera.block.simulator.config.types.GenerationMode;
import com.hedera.block.simulator.config.types.StreamingMode;
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import java.nio.file.Files;
Expand All @@ -33,6 +34,8 @@
* @param maxBlockItemsToStream the maximum number of block items to stream
* @param paddedLength the padded length of 0 the block file format
* @param fileExtension the file extension of the block file format
* @param streamingMode the mode of streaming for the block stream
* @param millisecondsPerBlock the milliseconds per block
*/
@ConfigData("blockStream")
public record BlockStreamConfig(
Expand All @@ -43,7 +46,9 @@ public record BlockStreamConfig(
String managerImplementation,
@ConfigProperty(defaultValue = "10_000") int maxBlockItemsToStream,
@ConfigProperty(defaultValue = "36") int paddedLength,
@ConfigProperty(defaultValue = ".blk.gz") String fileExtension) {
@ConfigProperty(defaultValue = ".blk.gz") String fileExtension,
@ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
@ConfigProperty(defaultValue = "1000") int millisecondsPerBlock) {

/**
* Constructor to set the default root path if not provided, it will be set to the data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.block.simulator.config.types;

/** The StreamingMode enum defines the different modes for streaming blocks. */
public enum StreamingMode {

/** It will wait X Nanos between each block. */
CONSTANT_RATE,

/** It will attempt to send a block each X Millis. */
MILLIS_PER_BLOCK;

/**
* Converts a string to a StreamingMode.
*
* @param mode the string to convert
* @return the StreamingMode
*/
public static StreamingMode fromString(String mode) {
return switch (mode) {
case "CONSTANT_RATE" -> CONSTANT_RATE;
case "MILLIS_PER_BLOCK" -> MILLIS_PER_BLOCK;
default -> throw new IllegalArgumentException("Invalid mode: " + mode);
};
}
}
Loading

0 comments on commit 611e91b

Please sign in to comment.