Skip to content

Commit

Permalink
feat: winston logger with discord
Browse files Browse the repository at this point in the history
  • Loading branch information
meomeocoj committed Aug 31, 2024
1 parent 4adc486 commit aa04a2e
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 58 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"build": "lerna run build --concurrency 1",
"build-tsc": "tsc -p",
"deploy": "yarn lerna publish",
"build:watch": "lerna run build --stream",
"prepare": "husky install",
"build:docker": "lerna run build --concurrency 1"
},
Expand Down
79 changes: 60 additions & 19 deletions packages/cw-to-ton/src/PacketProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { sleep } from "./utils";
import { ACK } from "./dtos/packets/AckPacket";
import { TonHandler } from "./services";
import { TransferPacket } from "./dtos/packets/TransferPacket";
import { Logger } from "winston";
import { getExistenceProofSnakeCell } from "@oraichain/ton-bridge-contracts";
import { ExistenceProof } from "cosmjs-types/cosmos/ics23/v1/proofs";

//@ts-ignore
BigInt.prototype.toJSON = function () {
Expand All @@ -19,13 +22,15 @@ export class PacketProcessorArgs {
cosmosProofHandler: CosmosProofHandler;
tonHandler: TonHandler;
pollingInterval: number;
logger: Logger;
}

export class PacketProcessor {
cosmosBlockOffset: CosmosBlockOffset;
cosmosProofHandler: CosmosProofHandler;
tonHandler: TonHandler;
pollingInterval: number;
logger: Logger;
// Memory packets
lock: boolean = false;
pendingRelayPackets: (
Expand All @@ -41,13 +46,17 @@ export class PacketProcessor {
this.cosmosProofHandler = args.cosmosProofHandler;
this.tonHandler = args.tonHandler;
this.pollingInterval = args.pollingInterval || 5000;
this.logger = args.logger;
}

addPendingTransferPackets(transferPackets: TransferPacketWithBasicInfo[]) {
if (transferPackets.length === 0) {
return;
}
this.pendingRelayPackets.push(...transferPackets);
this.logger.info(
`PacketProcessor:Added ${transferPackets.length} TransferPackets`
);
}

addPendingAckPackets(ackPackets: AckPacketWithBasicInfo[]) {
Expand All @@ -60,25 +69,26 @@ export class PacketProcessor {
this.pendingRelayPackets.push(packet);
}
});
this.logger.info(`PacketProcessor:Added ${ackPackets.length} AckPackets`);
}

async run() {
logger.info("PacketProcessor:Start running");
this.logger.info("PacketProcessor:Start running");
while (true) {
try {
this.lock = true;
logger.debug(
this.logger.debug(
`PacketProcessor:Before pop all pending packets, ${JSON.stringify(this.getPendingRelayPackets())}`
);
logger.debug(
this.logger.debug(
`PacketProcessor:Before pop all pending packets, ${JSON.stringify(this.getPendingAckSuccessPackets())}`
);
const pendingPackets = this._popAllPendingRelayPackets();
const pendingAckSuccessPacket = this._popAllPendingAckSuccessPackets();
logger.debug(
this.logger.debug(
`PacketProcessor:After pop all pending packets, ${JSON.stringify(this.getPendingRelayPackets())}`
);
logger.debug(
this.logger.debug(
`PacketProcessor:After pop all pending packets, ${JSON.stringify(this.getPendingAckSuccessPackets())}`
);

Expand All @@ -89,10 +99,11 @@ export class PacketProcessor {
];
if (pendingPackets.length === 0) {
this.addPendingAckPackets(pendingAckSuccessPacket);
this.logger.info("PacketProcessor:No pending packets");
await sleep(this.pollingInterval);
continue;
}
logger.info(
this.logger.info(
`PacketProcessor:Processing ${this.processingPackets.length} packets`
);
let heightForQueryProof = this.getHeightLatestPackets([
Expand All @@ -106,27 +117,42 @@ export class PacketProcessor {
latestLightClientHeight,
neededUpdateHeight
);
logger.debug(
this.logger.debug(
`PacketProcessor:heightForQueryProof ${heightForQueryProof}`
);
logger.debug(
this.logger.debug(
`PacketProcessor:neededUpdateHeight ${neededUpdateHeight}`
);
logger.debug(
this.logger.debug(
`PacketProcessor:latestLightClientHeight ${latestLightClientHeight}`
);

if (finalUpdateHeight === neededUpdateHeight) {
if (
finalUpdateHeight === neededUpdateHeight &&
finalUpdateHeight !== latestLightClientHeight
) {
this.logger.info(
`PacketProcessor:Update light client to ${finalUpdateHeight}`
);
const clientData =
await this.cosmosProofHandler.createUpdateClientData(
finalUpdateHeight
);
await this.tonHandler.updateLightClient(clientData);
} else if (finalUpdateHeight == latestLightClientHeight) {
heightForQueryProof = latestLightClientHeight - 1;
this.logger.info(
`PacketProcessor:Light client height is larger than neededUpdateHeight. Update heightForQueryProof ${heightForQueryProof}`
);
}
// FIXME: This may reach rate limit if the number of packets is too large
const packetProof = await Promise.all(
this.logger.info(
`PacketProcessor:Get proofs at ${heightForQueryProof}`
);
this.logger.debug(
"PacketProcessor:packet.data" + JSON.stringify(this.processingPackets)
);
const serializedProofs = await Promise.allSettled(
this.processingPackets.map((packet) => {
if (packet.data instanceof TransferPacket) {
return this.cosmosProofHandler.getPacketProofs(
Expand All @@ -141,30 +167,45 @@ export class PacketProcessor {
})
);

if (packetProof.length !== this.processingPackets.length) {
const packetProofs = serializedProofs.map((proofs) => {
if (proofs.status === "rejected") {
return [];
}
return proofs.value.map((proof) => {
if (proof) return ExistenceProof.fromJSON(proof);
});
});

if (packetProofs.length !== this.processingPackets.length) {
throw new Error(
"`PacketProcessor:Packet proof length not match with processing packets length"
"PacketProcessor:Packet proof length not match with processing packets length"
);
}

// Get proof from minProvenHeight
while (this.processingPackets.length > 1) {
while (this.processingPackets.length > 0) {
const packet = this.processingPackets.shift();
logger.debug(`PacketProcessor:packet.data ${packet.data}`);
const proof = packetProof.shift();
this.logger.debug(`PacketProcessor:packet.data ${packet.data}`);
const proof = packetProofs.shift();
const data = packet.data;
// TODO: should change to highload_wallet contract
if (proof.length === 0) {
this.logger.error(
`PacketProcessor:NotFound proof for ${data.getName()} at ${packet.hash}`
);
continue;
}
await this.tonHandler.sendPacket(finalUpdateHeight, data, proof);
logger.info(
this.logger.info(
`PacketProcessor:Send packet ${data.getName()} to TON successfully`
);
}
await this.cosmosBlockOffset.updateBlockOffset(finalUpdateHeight);
logger.info(
this.logger.info(
`PacketProcessor:Update block offset to ${finalUpdateHeight}`
);
} catch (error) {
logger.error(`PacketProcessor:Error when run:${error}`);
this.logger.error(`PacketProcessor:Error when run`, error);
throw new Error(`PacketProcessor:Error when run:${error}`);
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cw-to-ton/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as dotenv from "dotenv";
dotenv.config();

dotenv.config();
export type Config = {
tonMnemonic: string;
cosmosRpcUrl: string;
Expand Down
34 changes: 23 additions & 11 deletions packages/cw-to-ton/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
} from "@oraichain/ton-bridge-contracts";
import { Address } from "@ton/core";
import { PacketProcessor } from "./PacketProcessor";

import {
CosmosProofHandler,
CosmwasmWatcherEvent,
Expand All @@ -17,14 +16,23 @@ import {
} from "./services";
import { Packets } from "./@types";
import { CosmosBlockOffset } from "./models";
import { Logger } from "winston";

export async function createCwToTonRelayerWithConfig(config: Config) {
export async function createCwToTonRelayerWithConfig(
config: Config,
injectLogger: Logger
) {
const logger = injectLogger;
const duckDb = await DuckDb.getInstance(config.connectionString);
const cosmosBlockOffset = new CosmosBlockOffset(duckDb);
await cosmosBlockOffset.createTable();
const startOffset = await cosmosBlockOffset.mayLoadBlockOffset(
config.syncBlockOffSet
);
logger.info(`CW_TO_TON start at: ${startOffset}`);
if (config.wasmBridge === "") {
throw new Error("WASM_BRIDGE is required");
}

const {
walletContract,
client: tonClient,
Expand All @@ -35,7 +43,6 @@ export async function createCwToTonRelayerWithConfig(config: Config) {
config.tonCenter,
config.tonApiKey
);

const lightClientMaster = tonClient.open(
LightClientMaster.createFromAddress(
Address.parse(TonDefaultConfig.cosmosLightClientMaster)
Expand All @@ -48,7 +55,6 @@ export async function createCwToTonRelayerWithConfig(config: Config) {
config.cosmosRpcUrl,
config.wasmBridge
);

const tonHandler = new TonHandler(
walletContract,
tonClient,
Expand All @@ -57,25 +63,31 @@ export async function createCwToTonRelayerWithConfig(config: Config) {
bridgeAdapter,
config.syncInterval
);

const packetProcessor = new PacketProcessor({
cosmosBlockOffset,
cosmosProofHandler,
tonHandler,
pollingInterval: config.syncInterval,
logger,
});

const watcher = await createCosmosBridgeWatcher(config);
packetProcessor.run();

watcher.on(
CosmwasmWatcherEvent.DATA,
async (data: Packets & { offset: number }) => {
const { transferPackets, ackPackets } = data;
packetProcessor.addPendingTransferPackets(transferPackets);
packetProcessor.addPendingAckPackets(ackPackets);
const { transferPackets, ackPackets, offset } = data;
logger.info(`CosmosWatcher synced at block: ${offset}`);
if (transferPackets && transferPackets.length > 0) {
logger.info(`Found ${transferPackets.length} TransferPackets`);
packetProcessor.addPendingTransferPackets(transferPackets);
}
if (ackPackets && ackPackets.length > 0) {
logger.info(`Found ${ackPackets.length} AckPackets`);
packetProcessor.addPendingAckPackets(ackPackets);
}
}
);

return watcher;
}

Expand Down
4 changes: 3 additions & 1 deletion packages/cw-to-ton/src/services/cosmos.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ export class CosmwasmWatcher<T> extends EventEmitter {
(parsedData.transferPackets.length > 0 ||
parsedData.ackPackets.length > 0)
) {
this.emit(CosmwasmWatcherEvent.DATA, { parsedData, offset });
this.emit(CosmwasmWatcherEvent.DATA, { ...parsedData, offset });
} else {
this.emit(CosmwasmWatcherEvent.DATA, { offset });
}
} catch (e) {
this.emit("error", `CosmwasmWatcher:Error when parsing data:${e}`);
Expand Down
2 changes: 0 additions & 2 deletions packages/cw-to-ton/src/services/ton.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ export class TonHandler {
};

async updateLightClient(clientData: LightClientData) {
logger.debug(`TonHandler:updateLightClient:${JSON.stringify(clientData)}`);
const header = deserializeHeader(clientData.header);
const height = BigInt(header.height);
await retry(
Expand Down Expand Up @@ -125,7 +124,6 @@ export class TonHandler {
packet: IntoCell,
proofs: ExistenceProof[]
) {
logger.debug(`TonHandler:sendPacket:${JSON.stringify(packet)}`);
try {
const seqno = await retry(async () => {
try {
Expand Down
20 changes: 12 additions & 8 deletions packages/orchestrator/.env.example
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
NODE_ENV=mainnet
REDIS_HOST="localhost"
REDIS_PORT=6379
TON_MNEMONIC=
COSMOS_MNEMONIC=
COSMOS_RPC_URL=
SYNC_BLOCK_OFFSET=
COSMOS_RPC_URL="https://rpc.orai.io/"
SYNC_BLOCK_OFFSET=31548100
SYNC_LIMIT=100
SYNC_THREADS=4
SYNC_INTERVAL=5000
TON_CENTER="https://toncenter.orai.io/jsonRPC"
TON_API_KEY=
TON_LITE_CLIENT_LIST="https://ton.org/global.config.json"
CONNECTION_STRING="relayer.duckdb"
WASM_BRIDGE=
WASM_VALIDATORS=
COSMOS_LIGHT_CLIENT_MASTER=
TON_BRIDGE=
WASM_VALIDATORS=orai16crw7g2rcvuga7vlnyxgwtdxtan46k8qqjjwhjqdjvjgk96n95es35q8vm
COSMOS_LIGHT_CLIENT_MASTER=EQDzy_POlimFDyzrHd3OQsb9sZCngyG3O7Za4GRFzM-rrO93
TON_BRIDGE=EQC-aFP0rJXwTgKZQJPbPfTSpBFc8wxOgKHWD9cPvOl_DnaY
WASM_BRIDGE=orai159l8l9c5ckhqpuwdfgs9p4v599nqt3cjlfahalmtrhfuncnec2ms5mz60e
# App configuration
LOG_LEVEL="info"
HEALTH_CHECK_PORT=3001
WEBHOOK_URL=


3 changes: 2 additions & 1 deletion packages/orchestrator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"@oraichain/tonbridge-relayer-to-cw": "^1.0.9",
"@oraichain/tonbridge-relayer-to-ton": "^1.1.0",
"bullmq": "^5.8.7",
"express": "^4.19.2"
"express": "^4.19.2",
"winston-transport-discord": "^1.0.3"
},
"devDependencies": {
"@types/express": "^4",
Expand Down
10 changes: 8 additions & 2 deletions packages/orchestrator/src/@types/global.d.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Logger } from "winston";

declare global {
// eslint-disable-next-line no-var
var logger: Logger;
namespace NodeJS {
interface ProcessEnv {
NODE_ENV: string;
REDIS_HOST: string;
REDIS_PORT: number;
TON_MNEMONIC: string;
COSMOS_MNEMONIC: string;
// SYNC_OPTS
Expand All @@ -23,6 +25,10 @@ declare global {
WASM_VALIDATORS: string;
TON_BRIDGE: string;
COSMOS_LIGHT_CLIENT_MASTER: string;

// APP CONFIG
WEBHOOK_URL: string;
HEALTH_CHECK_PORT: number;
}
interface BigInt {
toJSON(): string;
Expand Down
Loading

0 comments on commit aa04a2e

Please sign in to comment.