Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(class/eventManagement/~): artefacts of registration #243

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/class/eventManagement/dispatcher.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
else {
for (const incomer of incomers) {
if (incomer.baseUUID === origin) {
await this.dispatcherTransactionStore.deleteTransaction(transactionId);
await relatedTransactionStore.deleteTransaction(transactionId);

throw new Error("Forbidden multiple registration for a same instance");
}
Expand Down Expand Up @@ -982,7 +982,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
redisMetadata: {
mainTransaction: false,
relatedTransaction: transactionId,
eventTransactionId: null,
eventTransactionId: transactionId,
resolved: false
},
event: event as any
Expand Down
5 changes: 3 additions & 2 deletions src/class/eventManagement/dispatcher/events.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export interface DispatchEventOptions<T extends GenericEvent> {
resolved: boolean;
};
store: TransactionStore<"incomer"> | TransactionStore<"dispatcher">;
dispatcherTransactionUUID?: string;
}

export type customValidationCbFn<T extends GenericEvent> = (event: T) => void;
Expand Down Expand Up @@ -137,15 +138,15 @@ export class EventsHandler<T extends GenericEvent> extends EventEmitter {
}

public async dispatch(options: DispatchEventOptions<T>): Promise<void> {
const { channel, store, redisMetadata, event } = options;
const { channel, store, redisMetadata, event, dispatcherTransactionUUID } = options;

const transaction = await store.setTransaction({
...event,
redisMetadata: {
...event.redisMetadata,
...redisMetadata
} as any
});
}, dispatcherTransactionUUID);

await channel.publish({
...event,
Expand Down
183 changes: 117 additions & 66 deletions src/class/eventManagement/dispatcher/transaction-handler.class.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
/* eslint-disable max-lines */
// Import Node.js Dependencies
import { randomUUID } from "node:crypto";

// Import Third-party Dependencies
import { Channel } from "@myunisoft/redis";
import { Mutex } from "@openally/mutex";
Expand Down Expand Up @@ -35,6 +38,12 @@ interface BackupSpreadTransactionsOptions {
incomers: Set<RegisteredIncomer>;
}

interface ResolveTransactions {
incomers: Set<RegisteredIncomer>;
backupIncomerTransactions: Transactions<"incomer">;
dispatcherTransactions: Transactions<"dispatcher">;
}

interface FindISOIncomerOptions {
incomers: RegisteredIncomer[]
incomerName: string;
Expand Down Expand Up @@ -102,10 +111,22 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
const free = await this.resolveTransactionsLock.acquire();

try {
await this.handleBackupIncomerTransactions();
const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([
this.incomerStore.getIncomers(),
this.backupIncomerTransactionStore.getTransactions(),
this.dispatcherTransactionStore.getTransactions()
]);

let options = {
incomers,
backupIncomerTransactions,
dispatcherTransactions
};

options = await this.handleBackupIncomerTransactions(options);

await this.resolveSpreadTransactions();
await this.resolveMainTransactions();
options = await this.resolveSpreadTransactions(options);
await this.resolveMainTransactions(options);
}
finally {
free();
Expand Down Expand Up @@ -422,12 +443,8 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
)("Main transaction redistributed to an Incomer"));
}

private async handleBackupIncomerTransactions() {
const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([
this.incomerStore.getIncomers(),
this.backupIncomerTransactionStore.getTransactions(),
this.dispatcherTransactionStore.getTransactions()
]);
private async handleBackupIncomerTransactions(options: ResolveTransactions) {
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;

const toResolve = [];

Expand Down Expand Up @@ -460,6 +477,8 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId)
);

backupIncomerTransactions.delete(backupTransactionId);

continue;
}

Expand Down Expand Up @@ -490,29 +509,45 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
const concernedIncomerChannel = this.incomerChannelHandler.get(providedUUID) ??
this.incomerChannelHandler.set({ uuid: providedUUID, prefix });

const dispatcherTransactionUUID = randomUUID();
const event = {
...backupIncomerTransaction as IncomerHandlerTransaction["incomerDistributedEventTransaction"],
redisMetadata: {
...backupIncomerTransaction.redisMetadata,
origin: this.privateUUID,
to: isoListenerIncomer.providedUUID
}
} as any;

const redisMetadata = {
mainTransaction: backupIncomerTransaction.redisMetadata.mainTransaction,
relatedTransaction: backupIncomerTransaction.redisMetadata.relatedTransaction,
eventTransactionId: null,
resolved: backupIncomerTransaction.redisMetadata.resolved
};

toResolve.push([
this.eventsHandler.dispatch({
channel: concernedIncomerChannel,
store: this.dispatcherTransactionStore,
redisMetadata: {
mainTransaction: backupIncomerTransaction.redisMetadata.mainTransaction,
relatedTransaction: backupIncomerTransaction.redisMetadata.relatedTransaction,
eventTransactionId: null,
resolved: backupIncomerTransaction.redisMetadata.resolved
},
event: {
...backupIncomerTransaction as IncomerHandlerTransaction["incomerDistributedEventTransaction"],
redisMetadata: {
...backupIncomerTransaction.redisMetadata,
origin: this.privateUUID,
to: isoListenerIncomer.providedUUID
}
} as any
redisMetadata,
event,
dispatcherTransactionUUID
}),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId),
this.dispatcherTransactionStore.deleteTransaction(relatedDispatcherTransactionId)
]);

dispatcherTransactions.set(dispatcherTransactionUUID, {
...event,
redisMetadata: {
...event.redisMetadata,
...redisMetadata
}
});
backupIncomerTransactions.delete(backupTransactionId);
dispatcherTransactions.delete(relatedDispatcherTransactionId);

continue;
}

Expand All @@ -539,27 +574,33 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
} as Transaction<"dispatcher">),
this.backupIncomerTransactionStore.deleteTransaction(backupTransactionId)
);

dispatcherTransactions.set(relatedDispatcherTransactionId, {
...relatedDispatcherTransaction,
redisMetadata: {
...relatedDispatcherTransaction.redisMetadata,
to: isoListenerIncomer.providedUUID
}
} as Transaction<"dispatcher">);
}
}

await Promise.all(toResolve);

return { incomers, backupIncomerTransactions, dispatcherTransactions };
}

private async resolveSpreadTransactions() {
const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([
this.incomerStore.getIncomers(),
this.backupIncomerTransactionStore.getTransactions(),
this.dispatcherTransactionStore.getTransactions()
]);
private async resolveSpreadTransactions(options: ResolveTransactions) {
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;

const toResolve = [];
const incomerStateToUpdate = new Set<string>();

for (const [dispatcherTransactionId, dispatcherTransaction] of dispatcherTransactions.entries()) {
const transactionRecipient = dispatcherTransaction.redisMetadata.to;
const relatedIncomer = [...incomers].find((incomer) => incomer.providedUUID === dispatcherTransaction.redisMetadata.to ||
incomer.baseUUID === dispatcherTransaction.redisMetadata.to);

const relatedIncomer = [...incomers].find((incomer) => incomer.providedUUID === transactionRecipient ||
incomer.baseUUID === transactionRecipient);
const transactionRecipient = relatedIncomer ? relatedIncomer.providedUUID : dispatcherTransaction.redisMetadata.to;

const [relatedBackupIncomerTransactionId, relatedBackupIncomerTransaction] = [...backupIncomerTransactions.entries()]
.find(([__, incomerTransaction]) => incomerTransaction.redisMetadata.relatedTransaction === dispatcherTransactionId) ||
Expand All @@ -576,10 +617,12 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
)
]));

backupIncomerTransactions.delete(relatedBackupIncomerTransactionId);
backupIncomerTransactions.set(dispatcherTransactionId, dispatcherTransaction);

continue;
}

// Event not resolved yet
continue;
}

Expand All @@ -594,64 +637,72 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {

const relatedIncomerTransactions = await relatedIncomerTransactionStore.getTransactions();

const [relatedIncomerTransactionId, relatedIncomerTransaction] = [...relatedIncomerTransactions.entries()]
.find(([__, incomerTransaction]) => incomerTransaction.redisMetadata.relatedTransaction === dispatcherTransactionId &&
const filteredIncomerTransactions = [...relatedIncomerTransactions.values()]
.filter((incomerTransaction) => incomerTransaction.redisMetadata.relatedTransaction === dispatcherTransactionId &&
incomerTransaction.redisMetadata.resolved) ||
[];

// Event not resolved yet
if (!relatedIncomerTransactionId) {
if (filteredIncomerTransactions.length === 0) {
continue;
}

if (dispatcherTransaction.redisMetadata.mainTransaction) {
// Only in case of ping event
incomerStateToUpdate.add(relatedIncomerTransaction.redisMetadata.origin);
toResolve.push(Promise.all([
relatedIncomerTransactionStore.deleteTransaction(relatedIncomerTransactionId),
this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId)
]));
for (const filteredIncomerTransaction of filteredIncomerTransactions) {
const filteredIncomerTransactionId = filteredIncomerTransaction.redisMetadata.transactionId;

continue;
}
if (dispatcherTransaction.redisMetadata.mainTransaction) {
// Only in case of ping event
incomerStateToUpdate.add(filteredIncomerTransaction.redisMetadata.origin);
toResolve.push(Promise.all([
relatedIncomerTransactionStore.deleteTransaction(filteredIncomerTransactionId),
this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId)
]));

dispatcherTransactions.delete(dispatcherTransactionId);

if (dispatcherTransaction.name === "APPROVEMENT") {
if (!relatedIncomerTransaction || !relatedIncomerTransaction.redisMetadata.resolved) {
continue;
}

if (dispatcherTransaction.name === "APPROVEMENT") {
if (!filteredIncomerTransaction || !filteredIncomerTransaction.redisMetadata.resolved) {
continue;
}

toResolve.push(Promise.all([
relatedIncomerTransactionStore.deleteTransaction(filteredIncomerTransactionId),
this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId)
]));

dispatcherTransactions.delete(dispatcherTransactionId);

continue;
}

dispatcherTransaction.redisMetadata.resolved = true;
incomerStateToUpdate.add((filteredIncomerTransaction.redisMetadata as any).to);
toResolve.push(Promise.all([
relatedIncomerTransactionStore.deleteTransaction(relatedIncomerTransactionId),
this.dispatcherTransactionStore.deleteTransaction(dispatcherTransactionId)
relatedIncomerTransactionStore.deleteTransaction(filteredIncomerTransactionId),
this.dispatcherTransactionStore.updateTransaction(
dispatcherTransactionId,
dispatcherTransaction
)
]));

continue;
dispatcherTransactions.set(dispatcherTransactionId, dispatcherTransaction);
}

dispatcherTransaction.redisMetadata.resolved = true;
incomerStateToUpdate.add((relatedIncomerTransaction.redisMetadata as any).to);
toResolve.push(Promise.all([
relatedIncomerTransactionStore.deleteTransaction(relatedIncomerTransactionId),
this.dispatcherTransactionStore.updateTransaction(
dispatcherTransactionId,
dispatcherTransaction
)
]));
}

toResolve.push([...incomerStateToUpdate.values()].map(
(incomerId) => this.incomerStore.updateIncomerState(incomerId))
);

await Promise.all(toResolve);

return { incomers, backupIncomerTransactions, dispatcherTransactions };
}

private async resolveMainTransactions() {
const [incomers, backupIncomerTransactions, dispatcherTransactions] = await Promise.all([
this.incomerStore.getIncomers(),
this.backupIncomerTransactionStore.getTransactions(),
this.dispatcherTransactionStore.getTransactions()
]);
private async resolveMainTransactions(options: ResolveTransactions) {
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;

const toResolve = [];
const incomerStateToUpdate = new Set<string>();
Expand Down
2 changes: 1 addition & 1 deletion src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ export class Incomer <
published: true,
resolved: true
}
} as Transaction<"incomer">)
} as Transaction<"incomer">, transactionId)
])
]);

Expand Down
Loading