Skip to content

Commit

Permalink
perf(cu): optimize between cron messages function #774
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrain99 committed Jun 12, 2024
1 parent 3a8ef59 commit 0d9138a
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 85 deletions.
178 changes: 107 additions & 71 deletions servers/cu/src/domain/lib/loadMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,96 +241,132 @@ export function cronMessagesBetweenWith ({
const leftBlock = left.block
const rightBlock = right.block
const leftOrdinate = left.ordinate

/**
* Grab the blocks that are between the left and right boundary,
* according to their timestamp
*/
* Grab the blocks that are between the left and right boundary,
* according to their timestamp
*/
const blocksInRange = blocksMeta.filter((b) =>
b.timestamp > leftBlock.timestamp &&
b.timestamp < rightBlock.timestamp
b.timestamp < rightBlock.timestamp
)

/**
* Start at the left block timestamp, incrementing one second per iteration.
* - if our current time gets up to the next block, then check for any block-based cron messages to generate
* - Check for any timebased crons to generate on each tick
*
* The curBlock always starts at the leftBlock, then increments as we tick
* Find the GCD of the cron intervals for time-based crons.
* We will iterate through time at
* this interval, as all time crons must be on this interval
*/
const timeGCD = gcdOfArr(timeBased.map((timeCron) => timeCron.value))

let timeInterval = timeGCD * 1000
let nextTimeInterval = null
let currentBlock = leftBlock
/**
* Find the initial offset from the left block to the origin block.
* This allows us to sync up to the origin block initially.
* For example, if the origin timestamp is 1000, the timeInterval is 100,
* and the left block timestamp is 1050, we will start from 1100 as it is
* "in sync with" or "on the interval of" the origin timestamp (1000).
*/
console.log({ timeInterval, max: Number.MAX_SAFE_INTEGER })
const initialOffset = timeGCD === Number.MAX_SAFE_INTEGER ? 0 : (timeInterval - ((leftBlock.timestamp - originBlock.timestamp) % timeInterval)) % timeInterval
/**
* Iterate through time. Use our GCD as the interval.
* If a cron message is found at the time, yield it
*/
let curBlock = leftBlock
for (let curTimestamp = leftBlock.timestamp; curTimestamp < rightBlock.timestamp; curTimestamp += 1000) {
console.log({ timeInterval, initialOffset, l: leftBlock.timestamp })
for (let t = leftBlock.timestamp + initialOffset; t < rightBlock.timestamp; t += timeInterval) {
/**
* We've ticked up to our next block
* so check if it's on a Cron Interval
*
* This way, Block-based messages will always be pushed onto the stream of messages
* before time-based messages
* Check if we need to "resync" with the origin timestamp.
* This can occur if we break from our standard interval due to
* a block timestamp. If this is the case, we will have previously
* set the interval necessary to "resync" in nextTimeInterval
*/
const nextBlock = blocksInRange[0]
if (nextBlock && toSeconds(curTimestamp) >= toSeconds(nextBlock.timestamp)) {
/**
* Make sure to remove the block from our range,
* since we've ticked past it,
*
* and save it as the new current block
*/
curBlock = blocksInRange.shift()

for (let i = 0; i < blockBased.length; i++) {
const cron = blockBased[i]

if (isBlockOnCron({ height: curBlock.height, originHeight: originBlock.height, cron })) {
yield {
cron: `${i}-${cron.interval}`,
ordinate: leftOrdinate,
name: `Cron Message ${curBlock.timestamp},${leftOrdinate},${i}-${cron.interval}`,
message: {
Owner: processOwner,
Target: processId,
From: processOwner,
Tags: cron.message.tags,
Timestamp: curBlock.timestamp,
'Block-Height': curBlock.height,
Cron: true
},
AoGlobal: {
Process: { Id: processId, Owner: processOwner, Tags: processTags },
Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags }
}
}
if (nextTimeInterval) {
timeInterval = nextTimeInterval
nextTimeInterval = null
} else {
timeInterval = timeGCD
}
/**
* We need to check if we are currently on (or past) a block timestamp.
* If we are, we need to check this block against all block crons
* for cron messages.
*/
let nextBlock = blocksInRange[0]
if (nextBlock && toSeconds(t) >= toSeconds(nextBlock.timestamp)) {
// We shift the blocksInRange to ensure we don't double check a block
currentBlock = blocksInRange.shift()
const offset = currentBlock.height - originBlock.height
// Iterate through block crons
for (let blockIndex = 0; blockIndex < blockBased.length; blockIndex++) {
const blockCron = blockBased[blockIndex]
// If we find a match, yield our cron message
if (offset !== 0 && offset % blockCron.value === 0 && currentBlock.timestamp) {
yield generateCronMessage({ i: blockIndex, cron: blockCron, leftOrdinate, timestamp: currentBlock.timestamp, processOwner, processId, height: currentBlock.height, processTags, moduleId, moduleOwner, moduleTags })
}
}
}
// Re-initialize our block as the first in list
nextBlock = blocksInRange[0]
/**
* If the block occurs before our next interval check, we want
* to add a "stop" in our iteration at the block timestamp.
* We also set the offset value to "resync" in our nextBlockTimestamp.
*/
if (nextBlock && nextBlock.timestamp - t < timeInterval) {
nextTimeInterval = timeInterval - (nextBlock.timestamp - t)
timeInterval = nextBlock.timestamp - t
}

for (let i = 0; i < timeBased.length; i++) {
const cron = timeBased[i]

if (isTimestampOnCron({ timestamp: curTimestamp, originTimestamp: originBlock.timestamp, cron })) {
yield {
cron: `${i}-${cron.interval}`,
ordinate: leftOrdinate,
name: `Cron Message ${curTimestamp},${leftOrdinate},${i}-${cron.interval}`,
message: {
Owner: processOwner,
Target: processId,
From: processOwner,
Tags: cron.message.tags,
Timestamp: curTimestamp,
'Block-Height': curBlock.height,
Cron: true
},
AoGlobal: {
Process: { Id: processId, Owner: processOwner, Tags: processTags },
Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags }
}
}
/**
* Finally, iterate through all time-based crons.
* If we find a match, yield our cron message.
*/
const offset = t - originBlock.timestamp
for (let timeIndex = 0; timeIndex < timeBased.length; timeIndex++) {
const timeCron = timeBased[timeIndex]
if (offset !== 0 && offset % (timeCron.value * 1000) === 0) {
yield generateCronMessage({ i: timeIndex, cron: timeCron, leftOrdinate, timestamp: t, processOwner, processId, height: currentBlock.height, processTags, moduleId, moduleOwner, moduleTags })
}
}
}
}
}

function gcd (a, b) {
if (b === 0) {
return a
}
return gcd(b, a % b)
}
function gcdOfArr (a) {
// If we have no time-based crons, we want our interval to be the maximum
// so we can simply iterate from block to block.
if (!a.length) return Number.MAX_SAFE_INTEGER
return a.reduce(gcd)
}

function generateCronMessage ({ i, cron, leftOrdinate, timestamp, processOwner, processId, height, processTags, moduleId, moduleOwner, moduleTags }) {
return {
cron: `${i}-${cron.interval}`,
ordinate: leftOrdinate,
name: `Cron Message ${timestamp},${leftOrdinate},${i}-${cron.interval}`,
message: {
Owner: processOwner,
Target: processId,
From: processOwner,
Tags: cron.message.tags,
Timestamp: timestamp,
'Block-Height': height,
Cron: true
},
AoGlobal: {
Process: { Id: processId, Owner: processOwner, Tags: processTags },
Module: { Id: moduleId, Owner: moduleOwner, Tags: moduleTags }
}
}
}

function reconcileBlocksWith ({ loadBlocksMeta, findBlocks, saveBlocks }) {
findBlocks = fromPromise(findBlocksSchema.implement(findBlocks))
saveBlocks = fromPromise(saveBlocksSchema.implement(saveBlocks))
Expand Down
Loading

0 comments on commit 0d9138a

Please sign in to comment.