Skip to content

Commit

Permalink
Update tsconfig (#287)
Browse files Browse the repository at this point in the history
* refactor(tsconfig): use @openally/config.typescript

* fix(TS): multiple null checks & bugs with Promises
  • Loading branch information
fraxken authored Aug 14, 2024
1 parent 5f877d9 commit 4210522
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 94 deletions.
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@
},
"devDependencies": {
"@nodesecure/eslint-config": "^1.9.0",
"@openally/config.typescript": "^1.0.3",
"@types/jest": "^29.5.12",
"c8": "^10.1.2",
"dotenv": "^16.4.5",
"glob": "^11.0.0",
"jest": "^29.7.0",
"testcontainers": "^10.10.0",
"ts-jest": "^29.2.4",
"typescript": "^5.5.4",
"tsx": "^4.7.0",
"glob": "^11.0.0",
"c8": "^10.1.2"
"typescript": "^5.5.4"
}
}
46 changes: 26 additions & 20 deletions src/class/eventManagement/dispatcher.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,20 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
private backupDispatcherTransactionStore: TransactionStore<"dispatcher">;
private backupIncomerTransactionStore: TransactionStore<"incomer">;

private transactionHandler: TransactionHandler;
private transactionHandler: TransactionHandler<T>;

private logger: Logger;
private incomerChannelHandler: IncomerChannelHandler<T>;
private activeChannels = new Set<string>();

private pingInterval: number;
private pingIntervalTimer: NodeJS.Timeout;
private pingIntervalTimer: NodeJS.Timeout | undefined;
private checkLastActivityInterval: number;
private checkLastActivityIntervalTimer: NodeJS.Timeout;
private checkLastActivityIntervalTimer: NodeJS.Timeout | undefined;
private resolveTransactionInterval: number;
private checkDispatcherStateInterval: NodeJS.Timeout;
private resetCheckLastActivityTimeout: NodeJS.Timeout;
private resolveTransactionsInterval: NodeJS.Timeout;
private checkDispatcherStateInterval: NodeJS.Timeout | undefined;
private resetCheckLastActivityTimeout: NodeJS.Timeout | undefined;
private resolveTransactionsInterval: NodeJS.Timeout | undefined;
private idleTime: number;
private minTimeout = 0;
// Arbitrary value according to fastify default pluginTimeout
Expand Down Expand Up @@ -288,6 +288,10 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
}

public async initialize() {
if (!this.subscriber) {
throw new Error(`redis subscriber not available`);
}

await this.subscriber.subscribe(this.dispatcherChannelName);
this.subscriber.on("message", (channel, message) => {
this.handleMessages(channel, message).catch((error) => this.logger.error({ error: error.stack }));
Expand Down Expand Up @@ -333,7 +337,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
this.eventsHandler.removeAllListeners();
this.removeAllListeners();

await this.subscriber.unsubscribe(
await this.subscriber?.unsubscribe(
this.dispatcherChannelName,
...this.incomerChannelHandler.channels.keys(),
...this.activeChannels.values()
Expand All @@ -351,7 +355,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
try {
parsedMessage = JSON.parse(message);
}
catch (error) {
catch (error: any) {
this.logger.error({ channel, error: error.message });

return;
Expand Down Expand Up @@ -383,7 +387,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit

await this.eventsHandler.handleEvents(channel, parsedMessage);
}
catch (error) {
catch (error: any) {
this.logger.error({
channel,
message: parsedMessage,
Expand Down Expand Up @@ -485,7 +489,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
dispatcherToRemove ? this.removeNonActives([dispatcherToRemove]) : () => void 0
]);
}
catch (error) {
catch (error: any) {
this.logger.error({ error: error.stack }, `${this.selfProvidedUUID} failed while taking back the lead`);

return;
Expand Down Expand Up @@ -529,7 +533,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit

const inactiveIncomers = await this.incomerStore.getNonActives();

const toResolve = [];
const toResolve: Promise<void>[] = [];

let index = 0;
for (const inactive of inactiveIncomers) {
Expand All @@ -545,10 +549,10 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
.map(([transactionId]) => transactionId);

if (recentPingTransactionKeys.length > 0) {
toResolve.push(Promise.all([
toResolve.push(...[
this.incomerStore.updateIncomerState(inactive.providedUUID),
transactionStore.deleteTransactions(recentPingTransactionKeys)
]));
]);

inactiveIncomers.splice(index, 1);
}
Expand All @@ -563,7 +567,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit

private async ping() {
const incomers = await this.incomerStore.getIncomers();
const pingToResolve = [];
const pingToResolve: Promise<void>[] = [];
const concernedIncomers: string[] = [];
for (const incomer of incomers) {
if (incomer === null) {
Expand Down Expand Up @@ -615,14 +619,16 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
}

private async removeNonActives(inactiveIncomers: RegisteredIncomer[]) {
const toHandle = [];
const toHandle: Promise<void>[] = [];

for (const inactive of inactiveIncomers) {
toHandle.push(Promise.all([
toHandle.push(...[
this.incomerStore.deleteIncomer(inactive.providedUUID),
this.transactionHandler.resolveInactiveIncomerTransactions(inactive),
this.activeChannels.add(`${inactive.prefix ? `${inactive.prefix}-` : ""}${inactive.providedUUID}`)
]));
this.transactionHandler.resolveInactiveIncomerTransactions(inactive)
]);
this.activeChannels.add(
`${inactive.prefix ? `${inactive.prefix}-` : ""}${inactive.providedUUID}`
);
}

await Promise.all(toHandle);
Expand Down Expand Up @@ -845,7 +851,7 @@ export class Dispatcher<T extends GenericEvent = GenericEvent> extends EventEmit
for (const incomer of concernedIncomers) {
const relatedEvent = incomer.eventsSubscribe.find((subscribedEvent) => subscribedEvent.name === eventName);

if (!relatedEvent.horizontalScale &&
if (relatedEvent && !relatedEvent.horizontalScale &&
filteredConcernedIncomers.find(
(filteredConcernedIncomer) => filteredConcernedIncomer.eventsSubscribe.find(
(subscribedEvent) => subscribedEvent.name === relatedEvent.name && filteredConcernedIncomer.name === incomer.name
Expand Down
4 changes: 2 additions & 2 deletions src/class/eventManagement/dispatcher/events.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export class EventsHandler<T extends GenericEvent> extends EventEmitter {
readonly dispatcherChannelName: string;

#eventsValidationFn: eventsValidationFn<T>;
#customValidationCbFn: customValidationCbFn<T>;
#customValidationCbFn: customValidationCbFn<T> | undefined;

#logger: Logger;
#standardLogFn: StandardLog<T>;
Expand Down Expand Up @@ -169,7 +169,7 @@ export class EventsHandler<T extends GenericEvent> extends EventEmitter {
try {
this.dispatcherChannelMessagesSchemaValidation(event);
}
catch (error) {
catch (error: any) {
this.#logger.error(this.#standardLogFn({
...event,
redisMetadata: {
Expand Down
4 changes: 2 additions & 2 deletions src/class/eventManagement/dispatcher/incomer-channel.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class IncomerChannelHandler<
get(
uuid: string
): ChannelMessages<T> | null {
return this.channels.get(uuid);
return this.channels.get(uuid) ?? null;
}

async remove(
Expand All @@ -67,7 +67,7 @@ export class IncomerChannelHandler<
return;
}

await this.subscriber.unsubscribe(uuid);
await this.subscriber?.unsubscribe(uuid);
this.channels.delete(uuid);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
return this.dispatcherTransactionStore.deleteTransaction(relatedPingTransactionId);
}

const relatedIncomerPingTransaction = incomerPingTransactions.get(relatedPingTransactionId);
const relatedIncomerPingTransaction = incomerPingTransactions.get(relatedPingTransactionId)!;

return Promise.all([
inactiveIncomerTransactionStore.deleteTransaction(relatedIncomerPingTransaction.redisMetadata.transactionId),
Expand Down Expand Up @@ -336,7 +336,9 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
relatedHandlerTransaction,
relatedHandlerTransaction.redisMetadata.transactionId
),
incomerToRemoveTransactionStore.deleteTransaction(relatedHandlerTransaction.redisMetadata.transactionId)
incomerToRemoveTransactionStore.deleteTransaction(
relatedHandlerTransaction.redisMetadata.transactionId
)
]);

this.logger.debug(
Expand Down Expand Up @@ -683,7 +685,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
private async resolveSpreadTransactions(options: ResolveTransactions): Promise<ResolveTransactions> {
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;

const toResolve = [];
const toResolve: any[] = [];
const incomerStateToUpdate = new Set<string>();

for (const [dispatcherTransactionId, dispatcherTransaction] of dispatcherTransactions.entries()) {
Expand Down Expand Up @@ -738,7 +740,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
}

for (const filteredIncomerTransaction of filteredIncomerTransactions) {
const filteredIncomerTransactionId = filteredIncomerTransaction.redisMetadata.transactionId;
const filteredIncomerTransactionId = filteredIncomerTransaction.redisMetadata.transactionId!;

if (dispatcherTransaction.redisMetadata.mainTransaction) {
// Only in case of ping event
Expand Down Expand Up @@ -794,7 +796,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
private async resolveMainTransactions(options: ResolveTransactions) {
const { incomers, backupIncomerTransactions, dispatcherTransactions } = options;

const toResolve = [];
const toResolve: Promise<void>[] = [];
const incomerStateToUpdate = new Set<string>();

for (const incomer of incomers) {
Expand Down Expand Up @@ -856,7 +858,7 @@ export class TransactionHandler<T extends GenericEvent = GenericEvent> {
}
}

toResolve.push([...incomerStateToUpdate.values()].map(
toResolve.push(...[...incomerStateToUpdate.values()].map(
(incomerId) => this.incomerStore.updateIncomerState(incomerId))
);

Expand Down
1 change: 0 additions & 1 deletion src/class/eventManagement/externals.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export class Externals<T extends GenericEvent = GenericEvent> {
});

this.dispatcher = new Dispatcher({
name: "pulsar",
...options,
pingInterval: Number(process.env.MYUNISOFT_DISPATCHER_PING) || undefined,
checkLastActivityInterval: Number(process.env.MYUNISOFT_DISPATCHER_ACTIVITY_CHECK) || undefined,
Expand Down
40 changes: 19 additions & 21 deletions src/class/eventManagement/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ export class Incomer <
private checkTransactionsStateInterval: NodeJS.Timeout;
private checkDispatcherStateTimeout: NodeJS.Timeout;
private lastPingDate: number;
private eventsValidationFn: Map<string, ValidateFunction<Record<string, any>> | NestedValidationFunctions>;
private customValidationCbFn: (event: T) => void;
private eventsValidationFn: Map<string, ValidateFunction<Record<string, any>> | NestedValidationFunctions> | undefined;
private customValidationCbFn: ((event: T) => void) | undefined;

public externals: Externals<T> | undefined;

Expand Down Expand Up @@ -557,7 +557,7 @@ export class Incomer <
await this.handleIncomerMessages(channel, formattedMessage);
}
}
catch (error) {
catch (error: any) {
this.logger.error({ channel, message: formattedMessage, error: error.stack });
}
}
Expand Down Expand Up @@ -588,7 +588,7 @@ export class Incomer <
throw new Error("Unknown event");
});
}
catch (error) {
catch (error: any) {
this.logger.error({
channel: "dispatcher",
error: error.stack,
Expand Down Expand Up @@ -770,29 +770,27 @@ export class Incomer <
instance: "incomer"
});

const transactionToUpdate = [];
const transactionToUpdate: Promise<any>[] = [];
for (const [transactionId, transaction] of oldTransactions.entries()) {
if (transaction.name === "REGISTER") {
transactionToUpdate.push([
Promise.all([
this.defaultIncomerTransactionStore.deleteTransaction(transactionId),
this.newTransactionStore.setTransaction({
...transaction,
redisMetadata: {
...transaction.redisMetadata,
origin: this.providedUUID,
relatedTransaction: message.redisMetadata.transactionId,
published: true,
resolved: true
}
} as Transaction<"incomer">, transactionId)
])
transactionToUpdate.push(...[
this.defaultIncomerTransactionStore.deleteTransaction(transactionId),
this.newTransactionStore.setTransaction({
...transaction,
redisMetadata: {
...transaction.redisMetadata,
origin: this.providedUUID,
relatedTransaction: message.redisMetadata.transactionId,
published: true,
resolved: true
}
} as Transaction<"incomer">, transactionId)
]);

continue;
}

transactionToUpdate.push(Promise.all([
transactionToUpdate.push(...[
this.defaultIncomerTransactionStore.deleteTransaction(transactionId),
this.newTransactionStore.setTransaction({
...transaction,
Expand All @@ -801,7 +799,7 @@ export class Incomer <
origin: this.providedUUID
}
})
]));
]);
}

await Promise.all(transactionToUpdate);
Expand Down
6 changes: 4 additions & 2 deletions src/class/store/incomer.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,17 @@ export class IncomerStore extends KVPeer<RegisteredIncomer> {
const foundIncomers = await Promise.all(incomerKeys.map(
(incomerKey) => this.getValue(incomerKey)
));
foundIncomers.forEach((incomer) => incomers.add(incomer));
foundIncomers
.filter((incomer) => incomer !== null)
.forEach((incomer) => incomers.add(incomer));
}

return incomers;
}

getIncomer(
incomerId: string
): Promise<RegisteredIncomer> {
): Promise<RegisteredIncomer | null> {
return this.getValue(
this.#buildIncomerKey(incomerId)
);
Expand Down
2 changes: 1 addition & 1 deletion src/class/store/transaction.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class TransactionStore<
transaction !== null &&
(transaction.redisMetadata && "transactionId" in transaction.redisMetadata)
) {
mappedTransactions.set(transaction.redisMetadata.transactionId, transaction);
mappedTransactions.set(transaction.redisMetadata.transactionId!, transaction);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ export * from "./defaultStandardLog.js";
export * from "./ajv.js";

export function handleLoggerMode(
mode: string
mode: string | undefined
): string {
return (mode === "info" || mode === "debug" || mode === "warn" || mode === "silent") ? mode : "info";
}
Loading

0 comments on commit 4210522

Please sign in to comment.