From 66f76f54da12834d853cc8a0d48c64399e425947 Mon Sep 17 00:00:00 2001 From: Rossb0b Date: Thu, 6 Jun 2024 21:36:28 +0200 Subject: [PATCH 1/2] fix(class/eventManagement/~): artefacts of registration --- src/class/eventManagement/dispatcher.class.ts | 4 +- .../dispatcher/transaction-handler.class.ts | 66 ++++++++++--------- src/class/eventManagement/incomer.class.ts | 2 +- 3 files changed, 38 insertions(+), 34 deletions(-) diff --git a/src/class/eventManagement/dispatcher.class.ts b/src/class/eventManagement/dispatcher.class.ts index 456f662..bb46c81 100644 --- a/src/class/eventManagement/dispatcher.class.ts +++ b/src/class/eventManagement/dispatcher.class.ts @@ -938,7 +938,7 @@ export class Dispatcher extends EventEmit else { for (const incomer of incomers) { if (incomer.baseUUID === origin) { - await this.dispatcherTransactionStore.deleteTransaction(transactionId); + await relatedTransactionStore.deleteTransaction(transactionId); throw new Error("Forbidden multiple registration for a same instance"); } @@ -982,7 +982,7 @@ export class Dispatcher extends EventEmit redisMetadata: { mainTransaction: false, relatedTransaction: transactionId, - eventTransactionId: null, + eventTransactionId: transactionId, resolved: false }, event: event as any diff --git a/src/class/eventManagement/dispatcher/transaction-handler.class.ts b/src/class/eventManagement/dispatcher/transaction-handler.class.ts index dd003b9..3327daa 100644 --- a/src/class/eventManagement/dispatcher/transaction-handler.class.ts +++ b/src/class/eventManagement/dispatcher/transaction-handler.class.ts @@ -556,10 +556,10 @@ export class TransactionHandler { const incomerStateToUpdate = new Set(); for (const [dispatcherTransactionId, dispatcherTransaction] of dispatcherTransactions.entries()) { - const transactionRecipient = dispatcherTransaction.redisMetadata.to; + const relatedIncomer = [...incomers].find((incomer) => incomer.providedUUID === dispatcherTransaction.redisMetadata.to || + incomer.baseUUID === dispatcherTransaction.redisMetadata.to); - const relatedIncomer = [...incomers].find((incomer) => incomer.providedUUID === transactionRecipient || - incomer.baseUUID === transactionRecipient); + const transactionRecipient = relatedIncomer ? relatedIncomer.providedUUID : dispatcherTransaction.redisMetadata.to; const [relatedBackupIncomerTransactionId, relatedBackupIncomerTransaction] = [...backupIncomerTransactions.entries()] .find(([__, incomerTransaction]) => incomerTransaction.redisMetadata.relatedTransaction === dispatcherTransactionId) || @@ -594,49 +594,53 @@ export class TransactionHandler { const relatedIncomerTransactions = await relatedIncomerTransactionStore.getTransactions(); - const [relatedIncomerTransactionId, relatedIncomerTransaction] = [...relatedIncomerTransactions.entries()] - .find(([__, incomerTransaction]) => incomerTransaction.redisMetadata.relatedTransaction === dispatcherTransactionId && + const filteredIncomerTransactions = [...relatedIncomerTransactions.values()] + .filter((incomerTransaction) => incomerTransaction.redisMetadata.relatedTransaction === dispatcherTransactionId && incomerTransaction.redisMetadata.resolved) || []; // Event not resolved yet - if (!relatedIncomerTransactionId) { + if (filteredIncomerTransactions.length === 0) { continue; } - if (dispatcherTransaction.redisMetadata.mainTransaction) { - // Only in case of ping event - incomerStateToUpdate.add(relatedIncomerTransaction.redisMetadata.origin); - toResolve.push(Promise.all([ - relatedIncomerTransactionStore.deleteTransaction(relatedIncomerTransactionId), - this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId) - ])); + for (const filteredIncomerTransaction of filteredIncomerTransactions) { + const filteredIncomerTransactionId = filteredIncomerTransaction.redisMetadata.transactionId; - continue; - } + if (dispatcherTransaction.redisMetadata.mainTransaction) { + // Only in case of ping event + incomerStateToUpdate.add(filteredIncomerTransaction.redisMetadata.origin); + toResolve.push(Promise.all([ + relatedIncomerTransactionStore.deleteTransaction(filteredIncomerTransactionId), + this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId) + ])); + + continue; + } + + if (dispatcherTransaction.name === "APPROVEMENT") { + if (!filteredIncomerTransaction || !filteredIncomerTransaction.redisMetadata.resolved) { + continue; + } + + toResolve.push(Promise.all([ + relatedIncomerTransactionStore.deleteTransaction(filteredIncomerTransactionId), + this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId) + ])); - if (dispatcherTransaction.name === "APPROVEMENT") { - if (!relatedIncomerTransaction || !relatedIncomerTransaction.redisMetadata.resolved) { continue; } + dispatcherTransaction.redisMetadata.resolved = true; + incomerStateToUpdate.add((filteredIncomerTransaction.redisMetadata as any).to); toResolve.push(Promise.all([ - relatedIncomerTransactionStore.deleteTransaction(relatedIncomerTransactionId), - this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId) + relatedIncomerTransactionStore.deleteTransaction(filteredIncomerTransactionId), + this.dispatcherTransactionStore.updateTransaction( + dispatcherTransactionId, + dispatcherTransaction + ) ])); - - continue; } - - dispatcherTransaction.redisMetadata.resolved = true; - incomerStateToUpdate.add((relatedIncomerTransaction.redisMetadata as any).to); - toResolve.push(Promise.all([ - relatedIncomerTransactionStore.deleteTransaction(relatedIncomerTransactionId), - this.dispatcherTransactionStore.updateTransaction( - dispatcherTransactionId, - dispatcherTransaction - ) - ])); } toResolve.push([...incomerStateToUpdate.values()].map( diff --git a/src/class/eventManagement/incomer.class.ts b/src/class/eventManagement/incomer.class.ts index adf006c..2b957cd 100644 --- a/src/class/eventManagement/incomer.class.ts +++ b/src/class/eventManagement/incomer.class.ts @@ -765,7 +765,7 @@ export class Incomer < published: true, resolved: true } - } as Transaction<"incomer">) + } as Transaction<"incomer">, transactionId) ]) ]); From bfe9bd3bf990428b284bbe9aac9f3b453ae8d5f1 Mon Sep 17 00:00:00 2001 From: Rossb0b Date: Mon, 17 Jun 2024 23:42:37 +0200 Subject: [PATCH 2/2] refactor(class/eventManagement/dispatcher/transaction-handler): avoid multiple call to redis --- .../dispatcher/events.class.ts | 5 +- .../dispatcher/transaction-handler.class.ts | 119 ++++++++++++------ 2 files changed, 86 insertions(+), 38 deletions(-) diff --git a/src/class/eventManagement/dispatcher/events.class.ts b/src/class/eventManagement/dispatcher/events.class.ts index 21f04e5..e499c28 100644 --- a/src/class/eventManagement/dispatcher/events.class.ts +++ b/src/class/eventManagement/dispatcher/events.class.ts @@ -94,6 +94,7 @@ export interface DispatchEventOptions { resolved: boolean; }; store: TransactionStore<"incomer"> | TransactionStore<"dispatcher">; + dispatcherTransactionUUID?: string; } export type customValidationCbFn = (event: T) => void; @@ -137,7 +138,7 @@ export class EventsHandler extends EventEmitter { } public async dispatch(options: DispatchEventOptions): Promise { - const { channel, store, redisMetadata, event } = options; + const { channel, store, redisMetadata, event, dispatcherTransactionUUID } = options; const transaction = await store.setTransaction({ ...event, @@ -145,7 +146,7 @@ export class EventsHandler extends EventEmitter { ...event.redisMetadata, ...redisMetadata } as any - }); + }, dispatcherTransactionUUID); await channel.publish({ ...event, diff --git a/src/class/eventManagement/dispatcher/transaction-handler.class.ts b/src/class/eventManagement/dispatcher/transaction-handler.class.ts index 3327daa..98159cf 100644 --- a/src/class/eventManagement/dispatcher/transaction-handler.class.ts +++ b/src/class/eventManagement/dispatcher/transaction-handler.class.ts @@ -1,4 +1,7 @@ /* eslint-disable max-lines */ +// Import Node.js Dependencies +import { randomUUID } from "node:crypto"; + // Import Third-party Dependencies import { Channel } from "@myunisoft/redis"; import { Mutex } from "@openally/mutex"; @@ -35,6 +38,12 @@ interface BackupSpreadTransactionsOptions { incomers: Set; } +interface ResolveTransactions { + incomers: Set; + backupIncomerTransactions: Transactions<"incomer">; + dispatcherTransactions: Transactions<"dispatcher">; +} + interface FindISOIncomerOptions { incomers: RegisteredIncomer[] incomerName: string; @@ -102,10 +111,22 @@ export class TransactionHandler { const free = await this.resolveTransactionsLock.acquire(); try { - await this.handleBackupIncomerTransactions(); + const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([ + this.incomerStore.getIncomers(), + this.backupIncomerTransactionStore.getTransactions(), + this.dispatcherTransactionStore.getTransactions() + ]); + + let options = { + incomers, + backupIncomerTransactions, + dispatcherTransactions + }; - await this.resolveSpreadTransactions(); - await this.resolveMainTransactions(); + options = await this.handleBackupIncomerTransactions(options); + + options = await this.resolveSpreadTransactions(options); + await this.resolveMainTransactions(options); } finally { free(); @@ -422,12 +443,8 @@ export class TransactionHandler { )("Main transaction redistributed to an Incomer")); } - private async handleBackupIncomerTransactions() { - const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([ - this.incomerStore.getIncomers(), - this.backupIncomerTransactionStore.getTransactions(), - this.dispatcherTransactionStore.getTransactions() - ]); + private async handleBackupIncomerTransactions(options: ResolveTransactions) { + const { incomers, backupIncomerTransactions, dispatcherTransactions } = options; const toResolve = []; @@ -460,6 +477,8 @@ export class TransactionHandler { this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId) ); + backupIncomerTransactions.delete(backupTransactionId); + continue; } @@ -490,29 +509,45 @@ export class TransactionHandler { const concernedIncomerChannel = this.incomerChannelHandler.get(providedUUID) ?? this.incomerChannelHandler.set({ uuid: providedUUID, prefix }); + const dispatcherTransactionUUID = randomUUID(); + const event = { + ...backupIncomerTransaction as IncomerHandlerTransaction["incomerDistributedEventTransaction"], + redisMetadata: { + ...backupIncomerTransaction.redisMetadata, + origin: this.privateUUID, + to: isoListenerIncomer.providedUUID + } + } as any; + + const redisMetadata = { + mainTransaction: backupIncomerTransaction.redisMetadata.mainTransaction, + relatedTransaction: backupIncomerTransaction.redisMetadata.relatedTransaction, + eventTransactionId: null, + resolved: backupIncomerTransaction.redisMetadata.resolved + }; + toResolve.push([ this.eventsHandler.dispatch({ channel: concernedIncomerChannel, store: this.dispatcherTransactionStore, - redisMetadata: { - mainTransaction: backupIncomerTransaction.redisMetadata.mainTransaction, - relatedTransaction: backupIncomerTransaction.redisMetadata.relatedTransaction, - eventTransactionId: null, - resolved: backupIncomerTransaction.redisMetadata.resolved - }, - event: { - ...backupIncomerTransaction as IncomerHandlerTransaction["incomerDistributedEventTransaction"], - redisMetadata: { - ...backupIncomerTransaction.redisMetadata, - origin: this.privateUUID, - to: isoListenerIncomer.providedUUID - } - } as any + redisMetadata, + event, + dispatcherTransactionUUID }), this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId), this.dispatcherTransactionStore.deleteTransaction(relatedDispatcherTransactionId) ]); + dispatcherTransactions.set(dispatcherTransactionUUID, { + ...event, + redisMetadata: { + ...event.redisMetadata, + ...redisMetadata + } + }); + backupIncomerTransactions.delete(backupTransactionId); + dispatcherTransactions.delete(relatedDispatcherTransactionId); + continue; } @@ -539,18 +574,24 @@ export class TransactionHandler { } as Transaction<"dispatcher">), this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId) ); + + dispatcherTransactions.set(relatedDispatcherTransactionId, { + ...relatedDispatcherTransaction, + redisMetadata: { + ...relatedDispatcherTransaction.redisMetadata, + to: isoListenerIncomer.providedUUID + } + } as Transaction<"dispatcher">); } } await Promise.all(toResolve); + + return { incomers, backupIncomerTransactions, dispatcherTransactions }; } - private async resolveSpreadTransactions() { - const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([ - this.incomerStore.getIncomers(), - this.backupIncomerTransactionStore.getTransactions(), - this.dispatcherTransactionStore.getTransactions() - ]); + private async resolveSpreadTransactions(options: ResolveTransactions) { + const { incomers, backupIncomerTransactions, dispatcherTransactions } = options; const toResolve = []; const incomerStateToUpdate = new Set(); @@ -576,10 +617,12 @@ export class TransactionHandler { ) ])); + backupIncomerTransactions.delete(relatedBackupIncomerTransactionId); + backupIncomerTransactions.set(dispatcherTransactionId, dispatcherTransaction); + continue; } - // Event not resolved yet continue; } @@ -615,6 +658,8 @@ export class TransactionHandler { this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId) ])); + dispatcherTransactions.delete(dispatcherTransactionId); + continue; } @@ -628,6 +673,8 @@ export class TransactionHandler { this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId) ])); + dispatcherTransactions.delete(dispatcherTransactionId); + continue; } @@ -640,6 +687,8 @@ export class TransactionHandler { dispatcherTransaction ) ])); + + dispatcherTransactions.set(dispatcherTransactionId, dispatcherTransaction); } } @@ -648,14 +697,12 @@ export class TransactionHandler { ); await Promise.all(toResolve); + + return { incomers, backupIncomerTransactions, dispatcherTransactions }; } - private async resolveMainTransactions() { - const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([ - this.incomerStore.getIncomers(), - this.backupIncomerTransactionStore.getTransactions(), - this.dispatcherTransactionStore.getTransactions() - ]); + private async resolveMainTransactions(options: ResolveTransactions) { + const { incomers, backupIncomerTransactions, dispatcherTransactions } = options; const toResolve = []; const incomerStateToUpdate = new Set();