Skip to content

Commit

Permalink
Merge pull request #168 from ar-io/fix-arns-emitter
Browse files Browse the repository at this point in the history
fix(arns): update event emitter to provide more events and logs while…
  • Loading branch information
dtfiedler authored Jul 12, 2024
2 parents dd46eff + 1d67dfe commit 9a54bc3
Showing 1 changed file with 44 additions and 6 deletions.
50 changes: 44 additions & 6 deletions src/utils/processes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { pLimit } from 'plimit-lit';

import { ANT } from '../common/ant.js';
import { IO } from '../common/io.js';
import { ILogger } from '../common/logger.js';
import { ILogger, Logger } from '../common/logger.js';
import { IO_TESTNET_PROCESS_ID } from '../constants.js';
import {
AoANTState,
Expand Down Expand Up @@ -94,35 +94,46 @@ export class ArNSEventEmitter extends EventEmitter {
protected contract: AoIORead;
private timeoutMs: number; // timeout for each request to 3 seconds
private throttle;
private logger: ILogger;
constructor({
contract = IO.init({
processId: IO_TESTNET_PROCESS_ID,
}),
timeoutMs = 60_000,
concurrency = 30,
logger = Logger.default,
}: {
contract?: AoIORead;
timeoutMs?: number;
concurrency?: number;
}) {
logger?: ILogger;
} = {}) {
super();
this.contract = contract;
this.timeoutMs = timeoutMs;
this.throttle = pLimit(concurrency);
this.logger = logger;
}

async fetchProcessesOwnedByWallet({ address }: { address: WalletAddress }) {
const uniqueContractProcessIds: Record<
string,
{ state: AoANTState | undefined; names: Record<string, AoArNSNameData> }
{
state: AoANTState | undefined;
names: Record<string, AoArNSNameData>;
}
> = {};

await timeout(
this.timeoutMs,
fetchAllArNSRecords({ contract: this.contract }),
fetchAllArNSRecords({ contract: this.contract, emitter: this }),
)
.catch((e) => {
this.emit('error', `Error getting ArNS records: ${e}`);
this.logger.error(`Error getting ArNS records`, {
message: e?.message,
stack: e?.stack,
});
return {};
})
.then((records) => {
Expand All @@ -142,8 +153,8 @@ export class ArNSEventEmitter extends EventEmitter {
});

const idCount = Object.keys(uniqueContractProcessIds).length;
// check the contract owner and controllers
this.emit('progress', 0, idCount);
// check the contract owner and controllers
await Promise.all(
Object.keys(uniqueContractProcessIds).map(async (processId, i) =>
this.throttle(async () => {
Expand Down Expand Up @@ -188,12 +199,15 @@ export const fetchAllArNSRecords = async ({
contract = IO.init({
processId: IO_TESTNET_PROCESS_ID,
}),
logger,
emitter,
logger = Logger.default,
}: {
contract?: AoIORead;
emitter?: EventEmitter;
logger?: ILogger;
}): Promise<Record<string, AoArNSNameData>> => {
let cursor: string | undefined;
const startTimestamp = Date.now();
const records: Record<string, AoArNSNameData> = {};
do {
const pageResult = await contract
Expand All @@ -203,6 +217,9 @@ export const fetchAllArNSRecords = async ({
message: e?.message,
stack: e?.stack,
});

emitter?.emit('arns:error', `Error getting ArNS records: ${e}`);

return undefined;
});

Expand All @@ -214,8 +231,29 @@ export const fetchAllArNSRecords = async ({
const { name, ...recordDetails } = record;
records[name] = recordDetails;
});

logger.debug('Fetched page of ArNS records', {
totalRecordCount: pageResult.totalItems,
fetchedRecordCount: Object.keys(records).length,
cursor: pageResult.nextCursor,
});

emitter?.emit('arns:pageLoaded', {
totalRecordCount: pageResult.totalItems,
fetchedRecordCount: Object.keys(records).length,
records: pageResult.items,
cursor: pageResult.nextCursor,
});

cursor = pageResult.nextCursor;
} while (cursor !== undefined);

emitter?.emit('arns:end', records);

logger.debug('Fetched all ArNS records', {
totalRecordCount: Object.keys(records).length,
durationMs: Date.now() - startTimestamp,
});

return records;
};

0 comments on commit 9a54bc3

Please sign in to comment.