From 0d9138a657b6406fbd377948fb478c647a470db8 Mon Sep 17 00:00:00 2001 From: Jack Frain Date: Mon, 10 Jun 2024 16:25:31 +0100 Subject: [PATCH] perf(cu): optimize between cron messages function #774 --- servers/cu/src/domain/lib/loadMessages.js | 178 ++++++++++------- .../cu/src/domain/lib/loadMessages.test.js | 187 ++++++++++++++++-- 2 files changed, 280 insertions(+), 85 deletions(-) diff --git a/servers/cu/src/domain/lib/loadMessages.js b/servers/cu/src/domain/lib/loadMessages.js index 1012ae17a..0a59853f8 100644 --- a/servers/cu/src/domain/lib/loadMessages.js +++ b/servers/cu/src/domain/lib/loadMessages.js @@ -241,96 +241,132 @@ export function cronMessagesBetweenWith ({ const leftBlock = left.block const rightBlock = right.block const leftOrdinate = left.ordinate - /** - * Grab the blocks that are between the left and right boundary, - * according to their timestamp - */ + * Grab the blocks that are between the left and right boundary, + * according to their timestamp + */ const blocksInRange = blocksMeta.filter((b) => b.timestamp > leftBlock.timestamp && - b.timestamp < rightBlock.timestamp + b.timestamp < rightBlock.timestamp ) /** - * Start at the left block timestamp, incrementing one second per iteration. - * - if our current time gets up to the next block, then check for any block-based cron messages to generate - * - Check for any timebased crons to generate on each tick - * - * The curBlock always starts at the leftBlock, then increments as we tick + * Find the GCD of the cron intervals for time-based crons. + * We will iterate through time at + * this interval, as all time crons must be on this interval + */ + const timeGCD = gcdOfArr(timeBased.map((timeCron) => timeCron.value)) + + let timeInterval = timeGCD * 1000 + let nextTimeInterval = null + let currentBlock = leftBlock + /** + * Find the initial offset from the left block to the origin block. + * This allows us to sync up to the origin block initially. + * For example, if the origin timestamp is 1000, the timeInterval is 100, + * and the left block timestamp is 1050, we will start from 1100 as it is + * "in sync with" or "on the interval of" the origin timestamp (1000). + */ + console.log({ timeInterval, max: Number.MAX_SAFE_INTEGER }) + const initialOffset = timeGCD === Number.MAX_SAFE_INTEGER ? 0 : (timeInterval - ((leftBlock.timestamp - originBlock.timestamp) % timeInterval)) % timeInterval + /** + * Iterate through time. Use our GCD as the interval. + * If a cron message is found at the time, yield it */ - let curBlock = leftBlock - for (let curTimestamp = leftBlock.timestamp; curTimestamp < rightBlock.timestamp; curTimestamp += 1000) { + console.log({ timeInterval, initialOffset, l: leftBlock.timestamp }) + for (let t = leftBlock.timestamp + initialOffset; t < rightBlock.timestamp; t += timeInterval) { /** - * We've ticked up to our next block - * so check if it's on a Cron Interval - * - * This way, Block-based messages will always be pushed onto the stream of messages - * before time-based messages + * Check if we need to "resync" with the origin timestamp. + * This can occur if we break from our standard interval due to + * a block timestamp. If this is the case, we will have previously + * set the interval necessary to "resync" in nextTimeInterval */ - const nextBlock = blocksInRange[0] - if (nextBlock && toSeconds(curTimestamp) >= toSeconds(nextBlock.timestamp)) { - /** - * Make sure to remove the block from our range, - * since we've ticked past it, - * - * and save it as the new current block - */ - curBlock = blocksInRange.shift() - - for (let i = 0; i < blockBased.length; i++) { - const cron = blockBased[i] - - if (isBlockOnCron({ height: curBlock.height, originHeight: originBlock.height, cron })) { - yield { - cron: `${i}-${cron.interval}`, - ordinate: leftOrdinate, - name: `Cron Message ${curBlock.timestamp},${leftOrdinate},${i}-${cron.interval}`, - message: { - Owner: processOwner, - Target: processId, - From: processOwner, - Tags: cron.message.tags, - Timestamp: curBlock.timestamp, - 'Block-Height': curBlock.height, - Cron: true - }, - AoGlobal: { - Process: { Id: processId, Owner: processOwner, Tags: processTags }, - Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags } - } - } + if (nextTimeInterval) { + timeInterval = nextTimeInterval + nextTimeInterval = null + } else { + timeInterval = timeGCD + } + /** + * We need to check if we are currently on (or past) a block timestamp. + * If we are, we need to check this block against all block crons + * for cron messages. + */ + let nextBlock = blocksInRange[0] + if (nextBlock && toSeconds(t) >= toSeconds(nextBlock.timestamp)) { + // We shift the blocksInRange to ensure we don't double check a block + currentBlock = blocksInRange.shift() + const offset = currentBlock.height - originBlock.height + // Iterate through block crons + for (let blockIndex = 0; blockIndex < blockBased.length; blockIndex++) { + const blockCron = blockBased[blockIndex] + // If we find a match, yield our cron message + if (offset !== 0 && offset % blockCron.value === 0 && currentBlock.timestamp) { + yield generateCronMessage({ i: blockIndex, cron: blockCron, leftOrdinate, timestamp: currentBlock.timestamp, processOwner, processId, height: currentBlock.height, processTags, moduleId, moduleOwner, moduleTags }) } } } + // Re-initialize our block as the first in list + nextBlock = blocksInRange[0] + /** + * If the block occurs before our next interval check, we want + * to add a "stop" in our iteration at the block timestamp. + * We also set the offset value to "resync" in our nextBlockTimestamp. + */ + if (nextBlock && nextBlock.timestamp - t < timeInterval) { + nextTimeInterval = timeInterval - (nextBlock.timestamp - t) + timeInterval = nextBlock.timestamp - t + } - for (let i = 0; i < timeBased.length; i++) { - const cron = timeBased[i] - - if (isTimestampOnCron({ timestamp: curTimestamp, originTimestamp: originBlock.timestamp, cron })) { - yield { - cron: `${i}-${cron.interval}`, - ordinate: leftOrdinate, - name: `Cron Message ${curTimestamp},${leftOrdinate},${i}-${cron.interval}`, - message: { - Owner: processOwner, - Target: processId, - From: processOwner, - Tags: cron.message.tags, - Timestamp: curTimestamp, - 'Block-Height': curBlock.height, - Cron: true - }, - AoGlobal: { - Process: { Id: processId, Owner: processOwner, Tags: processTags }, - Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags } - } - } + /** + * Finally, iterate through all time-based crons. + * If we find a match, yield our cron message. + */ + const offset = t - originBlock.timestamp + for (let timeIndex = 0; timeIndex < timeBased.length; timeIndex++) { + const timeCron = timeBased[timeIndex] + if (offset !== 0 && offset % (timeCron.value * 1000) === 0) { + yield generateCronMessage({ i: timeIndex, cron: timeCron, leftOrdinate, timestamp: t, processOwner, processId, height: currentBlock.height, processTags, moduleId, moduleOwner, moduleTags }) } } } } } +function gcd (a, b) { + if (b === 0) { + return a + } + return gcd(b, a % b) +} +function gcdOfArr (a) { + // If we have no time-based crons, we want our interval to be the maximum + // so we can simply iterate from block to block. + if (!a.length) return Number.MAX_SAFE_INTEGER + return a.reduce(gcd) +} + +function generateCronMessage ({ i, cron, leftOrdinate, timestamp, processOwner, processId, height, processTags, moduleId, moduleOwner, moduleTags }) { + return { + cron: `${i}-${cron.interval}`, + ordinate: leftOrdinate, + name: `Cron Message ${timestamp},${leftOrdinate},${i}-${cron.interval}`, + message: { + Owner: processOwner, + Target: processId, + From: processOwner, + Tags: cron.message.tags, + Timestamp: timestamp, + 'Block-Height': height, + Cron: true + }, + AoGlobal: { + Process: { Id: processId, Owner: processOwner, Tags: processTags }, + Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags } + } + } +} + function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) { findBlocks = fromPromise(findBlocksSchema.implement(findBlocks)) saveBlocks = fromPromise(saveBlocksSchema.implement(saveBlocks)) diff --git a/servers/cu/src/domain/lib/loadMessages.test.js b/servers/cu/src/domain/lib/loadMessages.test.js index 7cba34902..623ff1813 100644 --- a/servers/cu/src/domain/lib/loadMessages.test.js +++ b/servers/cu/src/domain/lib/loadMessages.test.js @@ -254,8 +254,10 @@ describe('loadMessages', () => { * * SO in order for the test to be accurate, we truncate to floor second, * then convert back to milliseconds, so that we can compare on the second. + * + * Switched to a default value so that actual results can be compared */ - const nowSecond = Math.floor(new Date().getTime() / 1000) * 1000 + const nowSecond = 1718033463000 const originHeight = 125000 const originTime = nowSecond - ms('30d') @@ -300,6 +302,34 @@ describe('loadMessages', () => { message: mockCronMessage } ] + const blockCrons = [ + { + interval: '2-blocks', + unit: 'blocks', + value: 2, + message: mockCronMessage + }, + { + interval: '2-blocks', + unit: 'blocks', + value: 2, + message: mockCronMessage + } + ] + const timeCrons = [ + { + interval: '10-minutes', + unit: 'seconds', + value: ms('10m') / 1000, + message: mockCronMessage + }, + { + interval: '15-minutes', + unit: 'seconds', + value: ms('15m') / 1000, + message: mockCronMessage + } + ] /** * blockRange of 5 blocks and 85 minutes @@ -386,7 +416,8 @@ describe('loadMessages', () => { ] const cronMessagesBetween = cronMessagesBetweenWith({ logger: () => {}, processId, owner, originBlock, crons, blocksMeta }) - + const cronMessagesBetweenBlockCrons = cronMessagesBetweenWith({ logger: () => {}, processId, owner, originBlock, crons: blockCrons, blocksMeta }) + const cronMessagesBetweenTimeCrons = cronMessagesBetweenWith({ logger: () => {}, processId, owner, originBlock, crons: timeCrons, blocksMeta }) const genCronMessages = cronMessagesBetween( // left { @@ -408,23 +439,151 @@ describe('loadMessages', () => { // message } ) + const genCronMessagesBlockCrons = cronMessagesBetweenBlockCrons( + // left + { + block: { + height: originHeight + 11, + timestamp: scheduledMessagesStartTime + }, + ordinate: 1 + // AoGlobal, + // message + }, + // right + { + block: { + height: originHeight + 16, + timestamp: scheduledMessagesStartTime + ms('16m') + ms('29m') + ms('15m') + ms('10m') + } + // AoGlobal, + // message + } + ) + const genCronMessagesTimeCrons = cronMessagesBetweenTimeCrons( + // left + { + block: { + height: originHeight + 11, + timestamp: scheduledMessagesStartTime + }, + ordinate: 1 + // AoGlobal, + // message + }, + // right + { + block: { + height: originHeight + 16, + timestamp: scheduledMessagesStartTime + ms('16m') + ms('29m') + ms('15m') + ms('10m') + } + // AoGlobal, + // message + } + ) - const cronMessages = [] - before(async () => { - for await (const cron of genCronMessages) cronMessages.push(cron) + // These expected cron messages reflect the order the crons should be yielded in. + // They are checked to ensure that the algorithm is yielding crons chronologically. + const expectedCronMessages = { + '1715442663000,0-10-minutes': 1, + '1715443263000,0-10-minutes': 1, + '1715443263000,1-15-minutes': 1, + '1715443563000,0-2-blocks': 1, + '1715443563000,1-2-blocks': 1, + '1715443863000,0-10-minutes': 1, + '1715444163000,1-15-minutes': 1, + '1715444463000,0-10-minutes': 1, + '1715445063000,0-10-minutes': 1, + '1715445063000,1-15-minutes': 1, + '1715445663000,0-10-minutes': 1, + '1715445963000,1-15-minutes': 1, + '1715446263000,0-2-blocks': 1, + '1715446263000,1-2-blocks': 1, + '1715446263000,0-10-minutes': 1 + } + + const expectedBlockCronMessages = { + '1715443563000,0-2-blocks': 1, + '1715443563000,1-2-blocks': 1, + '1715446263000,0-2-blocks': 1, + '1715446263000,1-2-blocks': 1 + } + + const expectedTimeCronMessages = { + '1715442663000,0-10-minutes': 1, + '1715443263000,0-10-minutes': 1, + '1715443263000,1-15-minutes': 1, + '1715443863000,0-10-minutes': 1, + '1715444163000,1-15-minutes': 1, + '1715444463000,0-10-minutes': 1, + '1715445063000,0-10-minutes': 1, + '1715445063000,1-15-minutes': 1, + '1715445663000,0-10-minutes': 1, + '1715445963000,1-15-minutes': 1, + '1715446263000,0-10-minutes': 1 + } + + describe('block and time crons', () => { + const cronMessages = [] + before(async () => { + for await (const cron of genCronMessages) cronMessages.push(cron) + }) + + test('should create cron message according to the crons', async () => { + console.log(countBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages)) + // Two actual messages + 15 cron messages between them + assert.equal(cronMessages.length + 2, 17) + assert.deepStrictEqual(countBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages), expectedCronMessages) + }) + + test('should create a unique cron identifier for each generated message', async () => { + assert.equal( + cronMessages.length, + uniqBy((node) => `${node.message.Timestamp},${node.ordinate},${node.cron}`, cronMessages).length + ) + }) }) - test('should create cron message according to the crons', async () => { - console.log(countBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages)) - // Two actual messages + 15 cron messages between them - assert.equal(cronMessages.length + 2, 17) + describe('block crons only', () => { + const cronMessages = [] + before(async () => { + for await (const cron of genCronMessagesBlockCrons) cronMessages.push(cron) + }) + + test('should create cron message according to the crons', async () => { + console.log(countBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages)) + // Two actual messages + 15 cron messages between them + assert.equal(cronMessages.length + 2, 6) + assert.deepStrictEqual(countBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages), expectedBlockCronMessages) + }) + + test('should create a unique cron identifier for each generated message', async () => { + assert.equal( + cronMessages.length, + uniqBy((node) => `${node.message.Timestamp},${node.ordinate},${node.cron}`, cronMessages).length + ) + }) }) - test('should create a unique cron identifier for each generated message', async () => { - assert.equal( - cronMessages.length, - uniqBy((node) => `${node.message.Timestamp},${node.ordinate},${node.cron}`, cronMessages).length - ) + describe('time crons only', () => { + const cronMessages = [] + before(async () => { + for await (const cron of genCronMessagesTimeCrons) cronMessages.push(cron) + }) + + test('should create cron message according to the crons', async () => { + console.log(countBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages)) + // Two actual messages + 15 cron messages between them + assert.equal(cronMessages.length + 2, 13) + assert.deepStrictEqual(countBy((node) => `${node.message.Timestamp},${node.cron}`, cronMessages), expectedTimeCronMessages) + }) + + test('should create a unique cron identifier for each generated message', async () => { + assert.equal( + cronMessages.length, + uniqBy((node) => `${node.message.Timestamp},${node.ordinate},${node.cron}`, cronMessages).length + ) + }) }) })