Skip to content

Commit

Permalink
stress: logs to graph test results (#1227)
Browse files Browse the repository at this point in the history
Closes: TOWNS-12692

I still need to play more with this to check if it's enough.
Usually, if something fails, it will be on the join chat step.
  • Loading branch information
miguel-nascimento authored Oct 15, 2024
1 parent 4b47dee commit d82d1a3
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 53 deletions.
40 changes: 28 additions & 12 deletions packages/stress/src/mode/chat/root_chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { kickoffChat } from './kickoffChat'
import { joinChat } from './joinChat'
import { updateProfile } from './updateProfile'
import { chitChat } from './chitChat'
import { sumarizeChat } from './sumarizeChat'
import { summarizeChat } from './summarizeChat'
import { statsReporter } from './statsReporter'
import { getChatConfig } from '../common/common'
import { getLogger } from '../../utils/logger'
Expand Down Expand Up @@ -45,10 +45,11 @@ export async function startStressChat(opts: {
`clients.length !== chatConfig.clientsPerProcess ${clients.length} !== ${chatConfig.clientsPerProcess}`,
)

let cancelStatsReporting: (() => void) | undefined
let cancelReactionCounter: (() => void) | undefined
const { reactionCounter, logStep } = statsReporter(chatConfig)

if (chatConfig.processIndex === 0) {
cancelStatsReporting = statsReporter(clients[0], chatConfig)
cancelReactionCounter = reactionCounter(clients[0])

for (
let i = chatConfig.clientsCount;
Expand Down Expand Up @@ -77,11 +78,16 @@ export async function startStressChat(opts: {
const span = clients.slice(i, i + PARALLEL_UPDATES)
const results = await Promise.allSettled(span.map((client) => joinChat(client, chatConfig)))
results.forEach((r, index) => {
const client = span[index]
if (r.status === 'rejected') {
const client = span[index]
client.logger.error(r, 'error joinChat')
errors.push(r.reason)
}
logStep(
client,
'JOIN_CHAT',
r.status === 'fulfilled',
r.status === 'fulfilled' ? { span } : { reason: r.reason },
)
})
}

Expand All @@ -92,30 +98,40 @@ export async function startStressChat(opts: {
span.map((client) => updateProfile(client, chatConfig)),
)
results.forEach((r, index) => {
const client = span[index]
if (r.status === 'rejected') {
const client = span[index]
client.logger.error(r, 'error updateProfile')
errors.push(r.reason)
}
logStep(
client,
'UPDATE_PROFILE',
r.status === 'fulfilled',
r.status === 'fulfilled' ? { span } : { reason: r.reason },
)
})
}

logger.info('chitChat')
const results = await Promise.allSettled(clients.map((client) => chitChat(client, chatConfig)))
results.forEach((r, index) => {
const client = clients[index]
if (r.status === 'rejected') {
const client = clients[index]
client.logger.error(r, 'error chitChat')
errors.push(r.reason)
}
logStep(
client,
'CHIT_CHAT',
r.status === 'fulfilled',
r.status === 'fulfilled' ? {} : { reason: r.reason },
)
})

logger.info('sumarizeChat')
const summary = await sumarizeChat(clients, chatConfig, errors)
logger.info('summarizeChat')
const summary = await summarizeChat(clients, chatConfig, errors)

logger.info({ summary }, 'done')

cancelStatsReporting?.()
cancelReactionCounter?.()

for (let i = 0; i < clients.length; i += 1) {
const client = clients[i]
Expand Down
96 changes: 56 additions & 40 deletions packages/stress/src/mode/chat/statsReporter.ts
Original file line number Diff line number Diff line change
@@ -1,57 +1,73 @@
import type { StressClient } from '../../utils/stressClient'
import { ChatConfig } from '../common/types'
import { getLogger } from '../../utils/logger'

const logger = getLogger('stress:statsReporter')

export function statsReporter(rootClient: StressClient, chatConfig: ChatConfig) {
let canceled = false
let lastReactionCount = 0
const interval = setInterval(() => {
if (canceled) {
return
}
void (async () => {
if (chatConfig.kickoffMessageEventId && chatConfig.countClientsMessageEventId) {
const reactionCount = countReactions(
rootClient,
chatConfig.announceChannelId,
chatConfig.kickoffMessageEventId,
)
export function statsReporter(chatConfig: ChatConfig) {
return {
logStep: (
client: StressClient,
step: string,
isSuccess: boolean,
metadata?: Record<string, unknown>,
) => {
client.logger.info(
{
sequence: 'STRESS_RESULT',
step,
result: isSuccess ? 'PASS' : 'FAIL',
metadata,
},
'stress step result',
)
},
reactionCounter: (rootClient: StressClient) => {
let canceled = false
let lastReactionCount = 0
const interval = setInterval(() => {
if (canceled) {
return
}
if (lastReactionCount === reactionCount) {
return
}
lastReactionCount = reactionCount
await updateCountClients(
rootClient,
chatConfig.announceChannelId,
chatConfig.countClientsMessageEventId,
chatConfig.clientsCount,
reactionCount,
)
void (async () => {
if (chatConfig.kickoffMessageEventId && chatConfig.countClientsMessageEventId) {
const reactionCount = countReactions(
rootClient,
chatConfig.announceChannelId,
chatConfig.kickoffMessageEventId,
)
if (canceled) {
return
}
if (lastReactionCount === reactionCount) {
return
}
lastReactionCount = reactionCount
await updateCountClients(
rootClient,
chatConfig.announceChannelId,
chatConfig.countClientsMessageEventId,
chatConfig.clientsCount,
reactionCount,
)
}
})()
}, 5000)
return () => {
rootClient.logger.debug('canceled')
clearInterval(interval)
canceled = true
}
})()
}, 5000)

return () => {
logger.debug('stat reporter canceled')
clearInterval(interval)
canceled = true
},
}
}

export const updateCountClients = async (
client: StressClient,
rootClient: StressClient,
announceChannelId: string,
countClientsMessageEventId: string,
totalClients: number,
reactionCounts: number,
) => {
logger.info(`Clients: ${reactionCounts}/${totalClients} 🤖`)
return await client.streamsClient.sendChannelMessage_Edit_Text(
rootClient.logger.info(`Clients: ${reactionCounts}/${totalClients} 🤖`)
return await rootClient.streamsClient.sendChannelMessage_Edit_Text(
announceChannelId,
countClientsMessageEventId,
{
Expand All @@ -65,11 +81,11 @@ export const updateCountClients = async (
}

export const countReactions = (
client: StressClient,
rootClient: StressClient,
announceChannelId: string,
rootMessageId: string,
) => {
const channel = client.streamsClient.stream(announceChannelId)
const channel = rootClient.streamsClient.stream(announceChannelId)
if (!channel) {
return 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { channelMessagePostWhere } from '../../utils/timeline'
import { isDefined } from '@river-build/sdk'
import { makeCodeBlock } from '../../utils/messages'

export async function sumarizeChat(
export async function summarizeChat(
localClients: StressClient[],
cfg: ChatConfig,
errors: unknown[],
Expand Down
1 change: 1 addition & 0 deletions packages/stress/src/utils/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const baseLogger = pino(pinoOptions)
export function getLogger(name: string, meta: Record<string, unknown> = {}) {
return baseLogger.child({
name,
sessionId: process.env.SESSION_ID,
containerIndex: process.env.CONTAINER_INDEX,
processIndex: process.env.PROCESS_INDEX,
...meta,
Expand Down

0 comments on commit d82d1a3

Please sign in to comment.