diff --git a/packages/caliper-core/lib/worker/caliper-worker.js b/packages/caliper-core/lib/worker/caliper-worker.js index 10d34a2d6..d43e820b0 100644 --- a/packages/caliper-core/lib/worker/caliper-worker.js +++ b/packages/caliper-core/lib/worker/caliper-worker.js @@ -84,9 +84,10 @@ class CaliperWorker { * @param {Object} rateController rate controller object * @async */ - async runFixedNumber(workloadModule, number, rateController) { + async _runFixedNumber(workloadModule, number, rateController) { const stats = this.internalTxObserver.getCurrentStatistics(); let error = undefined; + while (stats.getTotalSubmittedTx() < number && !error) { await rateController.applyRateControl(); @@ -107,6 +108,7 @@ class CaliperWorker { await CaliperWorker._waitForTxsToFinish(stats); } + /** * Perform test with specified test duration * @param {object} workloadModule The user test module. @@ -114,11 +116,11 @@ class CaliperWorker { * @param {Object} rateController rate controller object * @async */ - async runDuration(workloadModule, duration, rateController) { + async _runDuration(workloadModule, duration, rateController) { const stats = this.internalTxObserver.getCurrentStatistics(); let startTime = stats.getRoundStartTime(); let error = undefined; - while ((Date.now() - startTime) < (duration * 1000) && !error) { + while ((Date.now() - startTime) < (duration * 1000) && !error) { await rateController.applyRateControl(); // If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises, @@ -161,7 +163,7 @@ class CaliperWorker { await this.workloadModule.initializeWorkloadModule(this.workerIndex, prepareTestMessage.getWorkersNumber(), roundIndex, prepareTestMessage.getWorkloadSpec().arguments, this.connector, context); await CaliperUtils.sleep(this.txUpdateTime); } catch (err) { - Logger.info(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`); + Logger.warn(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`); throw err; } finally { await this.connector.releaseContext(context); @@ -201,12 +203,11 @@ class CaliperWorker { if (testMessage.getRoundDuration()) { const duration = testMessage.getRoundDuration(); // duration in seconds - await this.runDuration(this.workloadModule, duration, rateController); + await this._runDuration(this.workloadModule, duration, rateController); } else { const number = testMessage.getNumberOfTxs(); - await this.runFixedNumber(this.workloadModule, number, rateController); + await this._runFixedNumber(this.workloadModule, number, rateController); } - Logger.debug(`Worker #${this.workerIndex} finished round #${roundIndex}`, this.internalTxObserver.getCurrentStatistics().getCumulativeTxStatistics()); return this.internalTxObserver.getCurrentStatistics(); } catch (err) { diff --git a/packages/caliper-core/test/worker/caliper-worker.js b/packages/caliper-core/test/worker/caliper-worker.js index ab989b847..990240fda 100644 --- a/packages/caliper-core/test/worker/caliper-worker.js +++ b/packages/caliper-core/test/worker/caliper-worker.js @@ -35,6 +35,7 @@ mockStats.getTotalSubmittedTx.onFirstCall().returns(0); mockStats.getTotalSubmittedTx.onSecondCall().returns(1); const deactivateMethod = sinon.stub(); let logwarningMethod = sinon.stub(); +let logerrorMethod = sinon.stub(); class MockCaliperUtils { static resolvePath(path) { @@ -54,7 +55,7 @@ class MockCaliperUtils { static getLogger() { return { debug: sinon.stub(), - error: sinon.stub(), + error: logerrorMethod, warn: logwarningMethod, info: sinon.stub() }; @@ -72,6 +73,8 @@ class MockInternalTxObserver { class MockTxObserverDispatch { activate() {} } + + MockTxObserverDispatch.prototype.deactivate = deactivateMethod; mockery.enable({ @@ -84,13 +87,14 @@ mockery.registerMock('./tx-observers/tx-observer-dispatch', MockTxObserverDispat const loggerSandbox = sinon.createSandbox(); const CaliperUtils = require('../../lib/common/utils/caliper-utils'); -loggerSandbox.replace(CaliperUtils, "getLogger", MockCaliperUtils.getLogger); +loggerSandbox.replace(CaliperUtils, 'getLogger', MockCaliperUtils.getLogger); const CaliperWorker = require('../../lib/worker/caliper-worker'); describe('Caliper worker', () => { after(() => { loggerSandbox.restore(); + sinon.reset(); }); describe('When executing a round', () => { @@ -98,19 +102,13 @@ describe('Caliper worker', () => { const sandbox = sinon.createSandbox(); beforeEach(() => { - logwarningMethod.reset(); - mockRate.end.reset(); - mockWorkload.cleanupWorkloadModule.reset(); - mockWorkload.submitTransaction.reset(); - mockStats.getTotalSubmittedTx.resetHistory(); - deactivateMethod.reset(); - mockConnector = sinon.createStubInstance(ConnectorInterface); mockConnector.getContext.resolves(1); mockMessenger = sinon.createStubInstance(MessengerInterface); mockTestMessage = sinon.createStubInstance(TestMessage); mockTestMessage.getRateControlSpec.returns({type: '1zero-rate'}); mockTestMessage.getWorkloadSpec.returns({module: 'test/workload'}); + mockWorkload.initializeWorkloadModule.resolves() mockTestMessage.getNumberOfTxs.returns(1); sandbox.replace(CaliperUtils, 'resolvePath', MockCaliperUtils.resolvePath); sandbox.replace(CaliperUtils, 'loadModuleFunction', MockCaliperUtils.loadModuleFunction); @@ -119,7 +117,15 @@ describe('Caliper worker', () => { afterEach(() => { sandbox.restore(); - }) + logwarningMethod.reset(); + mockRate.end.reset(); + mockWorkload.cleanupWorkloadModule.reset(); + mockWorkload.submitTransaction.reset(); + mockStats.getTotalSubmittedTx.resetHistory(); + deactivateMethod.reset(); + mockWorkload.initializeWorkloadModule.resetHistory(); + logerrorMethod.resetHistory(); + }); const validateCallsAndWarnings = (warnings) => { sinon.assert.calledOnce(mockWorkload.submitTransaction); @@ -145,7 +151,7 @@ describe('Caliper worker', () => { await worker.prepareTest(mockTestMessage); mockWorkload.submitTransaction.rejects(new Error('failure')); - await worker.executeRound(mockTestMessage).should.be.rejectedWith(/failure/); + await worker.executeRound(mockTestMessage).should.be.rejected; validateCallsAndWarnings(0); }); @@ -161,5 +167,303 @@ describe('Caliper worker', () => { await worker.executeRound(mockTestMessage); validateCallsAndWarnings(4); }); + + [5, 10].forEach(numberOfTxs => { + it(`should run ${numberOfTxs} transactions and wait for completion when no errors occur`, async () => { + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + + mockTestMessage.getNumberOfTxs.returns(numberOfTxs); + mockTestMessage.getRoundDuration.returns(null); + + mockWorkload.submitTransaction.resetHistory(); + mockStats.getTotalSubmittedTx.resetHistory(); + mockStats.getTotalFinishedTx.resetHistory(); + mockStats.getCumulativeTxStatistics.resetHistory(); + + let submittedTx = 0; + let finishedTx = 0; + + // Stub the methods + mockStats.getTotalSubmittedTx.callsFake(() => submittedTx); + mockStats.getTotalFinishedTx.callsFake(() => finishedTx); + mockStats.getCumulativeTxStatistics.returns({}); + + worker.internalTxObserver.getCurrentStatistics = () => mockStats; + + mockWorkload.submitTransaction.callsFake(async () => { + submittedTx += 1; + finishedTx += 1; + return Promise.resolve(); + }); + + await worker.executeRound(mockTestMessage); + + sinon.assert.callCount(mockWorkload.submitTransaction, numberOfTxs); + sinon.assert.calledOnce(deactivateMethod); + sinon.assert.calledOnce(mockRate.end); + sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule); + sinon.assert.called(mockConnector.releaseContext); + }); + }); + + [5, 10].forEach(numberOfTxs => { + it(`should stop the round when an error occurs while running ${numberOfTxs} transactions`, async () => { + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + + mockTestMessage.getNumberOfTxs.returns(numberOfTxs); + mockTestMessage.getRoundDuration.returns(null); + + let submittedTx = 0; + + mockWorkload.submitTransaction.resetHistory(); + + // Stub the methods + mockStats.getTotalSubmittedTx.callsFake(() => submittedTx); + mockStats.getTotalFinishedTx.callsFake(() => submittedTx) + + + mockWorkload.submitTransaction.callsFake(async () => { + submittedTx += 1; + if (submittedTx === 3) { + return Promise.reject(new Error('Transaction submission failed')); + } + return Promise.resolve(); + }); + + await worker.executeRound(mockTestMessage).should.be.rejectedWith('Transaction submission failed'); + + // Ensure transactions stop after the error and resources are cleaned up + sinon.assert.callCount(mockWorkload.submitTransaction, 4); + sinon.assert.calledOnce(deactivateMethod); + sinon.assert.calledOnce(mockRate.end); + sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule); + sinon.assert.called(mockConnector.releaseContext); + + mockTestMessage.getRoundDuration.reset(); + mockWorkload.submitTransaction.reset(); + mockStats.getTotalFinishedTx.reset(); + }); + }); + + it('should execute the round for a specified duration', async function() { + this.timeout(5000); + + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + + mockWorkload.submitTransaction.resolves(); + + mockTestMessage.getRoundDuration.returns(1); + mockTestMessage.getNumberOfTxs.returns(null); + + const startTime = Date.now(); + const mockStats = { + getRoundStartTime: () => startTime, + getTotalSubmittedTx: sinon.stub(), + getTotalFinishedTx: sinon.stub(), + getCumulativeTxStatistics: sinon.stub().returns({}) + }; + worker.internalTxObserver.getCurrentStatistics = () => mockStats; + + await worker.executeRound(mockTestMessage); + + const endTime = Date.now(); + const elapsedTime = endTime - startTime; + + elapsedTime.should.be.within(900, 1200); + + const callCount = mockWorkload.submitTransaction.callCount; + callCount.should.be.greaterThan(0); + + sinon.assert.calledOnce(deactivateMethod); + sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule); + sinon.assert.called(mockConnector.releaseContext); + }); + + it('should stop the round and perform cleanup when an error occurs during a duration-based round', async function() { + this.timeout(5000); // Allow enough time for the test to run + + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + + mockTestMessage.getRoundDuration.returns(1); // Set duration to 1 second + mockTestMessage.getNumberOfTxs.returns(null); // Ensure we're using duration, not a fixed number + + const startTime = Date.now(); + + // Mock statistics object to simulate submitted and finished transactions + const mockStats = { + getRoundStartTime: () => startTime, + getTotalSubmittedTx: sinon.stub(), + getTotalFinishedTx: sinon.stub(), + getCumulativeTxStatistics: sinon.stub().returns({}) + }; + + // Inject mock stats into the worker + worker.internalTxObserver.getCurrentStatistics = () => mockStats; + + let submittedTx = 0; + + // Simulate an error after 2 successful transaction submissions + mockWorkload.submitTransaction.callsFake(async () => { + submittedTx += 1; + if (submittedTx === 2) { // After 2 transactions, simulate an error + throw new Error('Transaction error during duration round'); + } + return Promise.resolve(); // Successful submission before the error + }); + + // Expect the round to be rejected with the error that occurs during the transaction submission + await worker.executeRound(mockTestMessage).should.be.rejectedWith('Transaction error during duration round'); + + // Ensure that 2 transactions were submitted before the error was thrown + sinon.assert.callCount(mockWorkload.submitTransaction, 2); + + // Ensure that cleanup operations were performed despite the error + sinon.assert.calledOnce(deactivateMethod); + sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule); + sinon.assert.called(mockConnector.releaseContext); + }); + + it('should log a warning and propagate the error when an error occurs during prepareTest', async () => { + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + const errorMessage = 'Initialization error'; + mockConnector.getContext.rejects(new Error(errorMessage)); + mockTestMessage.getRoundIndex.returns(1); + mockTestMessage.getWorkloadSpec.returns({ module: 'test/workload' }); + mockTestMessage.getWorkerArguments.returns([]); + + await worker.prepareTest(mockTestMessage).should.be.rejectedWith(errorMessage); + + sinon.assert.calledOnce(mockConnector.getContext); + sinon.assert.calledOnce(logwarningMethod); + }); + + it('should call initializeWorkloadModule and releaseContext during successful prepareTest execution', async () => { + // Arrange + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + + // Set up the mocks to resolve successfully + mockConnector.getContext.resolves(1); + mockConnector.releaseContext.resolves(); + mockWorkload.initializeWorkloadModule.resolves(); + + // Set up test message + mockTestMessage.getRoundIndex.returns(1); + mockTestMessage.getWorkloadSpec.returns({ module: 'test/workload', arguments: { arg1: 'value1' } }); + mockTestMessage.getWorkersNumber.returns(3); + mockTestMessage.getWorkerArguments.returns([]); + + // Act + await worker.prepareTest(mockTestMessage); + + // Assert + // Ensure getContext was called once with correct arguments + sinon.assert.calledOnce(mockConnector.getContext); + sinon.assert.calledWithExactly(mockConnector.getContext, 1, []); + + // Ensure initializeWorkloadModule was called once with correct arguments + sinon.assert.calledOnce(mockWorkload.initializeWorkloadModule); + sinon.assert.calledWithExactly( + mockWorkload.initializeWorkloadModule, + 1, // workerIndex + 3, // totalWorkers + 1, // roundIndex + { arg1: 'value1' }, // workload arguments + mockConnector, // connector + 1 // context + ); + + // Ensure releaseContext was called once with the correct context + sinon.assert.calledOnce(mockConnector.releaseContext); + sinon.assert.calledWithExactly(mockConnector.releaseContext, 1); + + // Ensure no warnings or errors were logged + sinon.assert.notCalled(logwarningMethod); + sinon.assert.notCalled(logerrorMethod); + + // Reset mocks if necessary + mockTestMessage.getWorkloadSpec.reset(); + }); + + it('should handle errors during initializeWorkloadModule and ensure releaseContext is called', async () => { + // Arrange + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + const errorMessage = 'Workload module initialization error'; + + // Set up the mocks + mockConnector.getContext.resolves(1); + mockConnector.releaseContext.resolves(); + mockWorkload.initializeWorkloadModule.rejects(new Error(errorMessage)); + + // Set up test message + mockTestMessage.getRoundIndex.returns(1); + mockTestMessage.getWorkloadSpec.returns({ module: 'test/workload', arguments: {} }); + mockTestMessage.getWorkersNumber.returns(1); + mockTestMessage.getWorkerArguments.returns([]); + + // Act & Assert + await worker.prepareTest(mockTestMessage).should.be.rejectedWith(errorMessage); + + // Assert + // Ensure getContext was called once with correct arguments + sinon.assert.calledOnce(mockConnector.getContext); + sinon.assert.calledWithExactly(mockConnector.getContext, 1, []); + + // Ensure initializeWorkloadModule was called once + sinon.assert.calledOnce(mockWorkload.initializeWorkloadModule); + + // Ensure releaseContext was called once with the correct context + sinon.assert.calledOnce(mockConnector.releaseContext); + sinon.assert.calledWithExactly(mockConnector.releaseContext, 1); + + // Ensure a warning was logged with the correct error message + sinon.assert.calledOnce(logwarningMethod); + sinon.assert.calledWithMatch(logwarningMethod, sinon.match(errorMessage)); + // Ensure no errors were logged + sinon.assert.notCalled(logerrorMethod); + + // Reset mocks if necessary + mockTestMessage.getWorkloadSpec.reset(); + }); + + it('should not submit transactions after the duration ends', async function() { + this.timeout(5000); + + const startTime = 0; + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + + const clock = sinon.useFakeTimers(); + mockWorkload.submitTransaction.resolves(); + + const mockStats = { + getRoundStartTime: () => startTime, + getTotalSubmittedTx: sinon.stub(), + getTotalFinishedTx: sinon.stub(), + getCumulativeTxStatistics: sinon.stub().returns({}) + }; + worker.internalTxObserver.getCurrentStatistics = () => mockStats; + + mockTestMessage.getRoundDuration.returns(1); + mockTestMessage.getNumberOfTxs.returns(null); + + const executePromise = worker.executeRound(mockTestMessage); + + await clock.tickAsync(1000); // Advance time by 1 second + // Yield to the event loop to allow pending microtasks to complete + await Promise.resolve(); + + const callCountAtDurationEnd = mockWorkload.submitTransaction.callCount; + + await clock.tickAsync(1000); // Advance time by another second + await executePromise; + + clock.restore(); + + sinon.assert.callCount(mockWorkload.submitTransaction, callCountAtDurationEnd); + }); }); });