Skip to content

Commit

Permalink
Update runner logic that shows errors and running status
Browse files Browse the repository at this point in the history
  • Loading branch information
peterMuriuki committed Jan 23, 2023
1 parent eee3a29 commit 7947d40
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 74 deletions.
14 changes: 1 addition & 13 deletions apps/web/src/lib/server/appConfig/index.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
import { PipelinesController, validateConfigs } from '@onaio/symbology-calc-core';
import { PipelinesController } from '@onaio/symbology-calc-core';
import type { SingleApiSymbolConfig } from 'src/lib/shared/types';
import { geoSymbolLogger } from '../logger/winston';
import { allSymbolConfigsAccessor } from '$lib/server/constants';
import { uniqWith } from 'lodash-es';
import yup from 'yup';
import type { Config } from '@onaio/symbology-calc-core';
import { getConfig } from './utils';
import { readMetricOverride, writePripelineMetrics } from '../logger/configMetrics';

export function webValidateConfigs(config: Config) {
const extraSchema = yup.object().shape({
uuid: yup.string().uuid().required('Symbology config does not have a uuid')
});

validateConfigs(config);
extraSchema.validateSync(config);
}

export function getAllSymbologyConfigs() {
const rawSymbologyConfigs = getConfig(
allSymbolConfigsAccessor,
Expand All @@ -31,7 +20,6 @@ export function getAllSymbologyConfigs() {
readMetric: readMetricOverride
};
});
allSymbologyConfigs.forEach(webValidateConfigs);
const uniqAllSymbolConfigs = uniqWith(allSymbologyConfigs, (config1, config2) => {
return config1.uuid === config2.uuid;
});
Expand Down
1 change: 0 additions & 1 deletion apps/web/src/routes/configs/+server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ export async function PUT({ request }) {
...configsByUuid,
[payload.uuid]: { ...payload, apiToken: configToBeReplaced.apiToken }
};
console.log({ newConfigs }, configToBeReplaced.apiToken);
const newDataConfigs = {
...data,
allSymbologyConfigs: Object.values(newConfigs)
Expand Down
9 changes: 6 additions & 3 deletions apps/web/src/routes/workflows/+page.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import type { ConfigRunner } from '@onaio/symbology-calc-core';
export function load() {
const configs = getClientSideSymbologyConfigs();
const ConfigsWithMetrics = configs.map((config) => {
const metricForThisConfig = getLastPipelineMetricForConfig(config.uuid);
const isRunning = (pipelineController.getPipelines(config.uuid) as ConfigRunner)?.isRunning();
const configId = config.uuid;
const metricForThisConfig = getLastPipelineMetricForConfig(configId);
const pipeLineRunner = pipelineController.getPipelines(configId) as ConfigRunner;
const isRunning = pipeLineRunner?.isRunning();
return {
...config,
metric: metricForThisConfig,
isRunning
isRunning,
invalidityErrror: pipeLineRunner.invalidError
};
});
return {
Expand Down
7 changes: 7 additions & 0 deletions apps/web/src/routes/workflows/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@
</button>
</div>
<div class="card-body">
{#if config.invalidityErrror !== null}
<div class="card mb-1">
<div class="card-body">
<span class="text-danger">{config.invalidityErrror}</span>
</div>
</div>
{/if}
<dl class="row">
<dt class="col-sm-3">API Base url</dt>
<dd class="col-sm-9">{config.baseUrl}</dd>
Expand Down
4 changes: 4 additions & 0 deletions packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export interface Config {
// cron-like syntax that represents when the pipeline represented by this config runs.
// this can be more easily generated using an online tool like https://crontab.cronhub.io/
schedule: CronTabString;
// how many registration form submissions to process at a time.
regFormSubmissionChunks?: number;
// store metric; progress information regarding a running pipeline or the last run of an pipeline
writeMetric: WriteMetric;
}

```
Expand Down
40 changes: 27 additions & 13 deletions packages/core/src/evaluator/configRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import {
createInfoLog,
createErrorLog,
defaultWriteMetric,
colorDeciderFactory
colorDeciderFactory,
validateConfigs
} from '../helpers/utils';
import cron from 'node-cron';
import { createMetricFactory } from './helpers/utils';
Expand All @@ -22,9 +23,19 @@ export class ConfigRunner {
public config: Config;
/** Whether pipeline/runner is currently evaluating */
private running = false;
/** request abort controller */
private abortController;
/** stores validity of config */
public invalidError: string | null = null;

constructor(config: Config) {
this.config = config;
this.abortController = new AbortController();
try {
validateConfigs(config);
} catch (err: unknown) {
this.invalidError = (err as Error).message;
}
}

/** Runs the pipeline, generator that yields metric information regarding the current run */
Expand All @@ -37,14 +48,11 @@ export class ConfigRunner {
logger,
baseUrl,
apiToken,
requestController,
uuid: configId,
regFormSubmissionChunks: facilityProcessingChunks
} = config;
const regFormSubmissionChunks = facilityProcessingChunks ?? 1000;

this.running = true;

const startTime = Date.now();
const createMetric = createMetricFactory(startTime, configId);
let evaluatedSubmissions = 0;
Expand All @@ -62,20 +70,21 @@ export class ConfigRunner {
totalRegFormSubmissions
);

const service = new OnaApiService(baseUrl, apiToken, logger, requestController);
const service = new OnaApiService(baseUrl, apiToken, logger, this.abortController);
const colorDecider = colorDeciderFactory(symbolConfig, logger);

abortableBlock: {
const regForm = await service.fetchSingleForm(regFormId);
if (regForm.isFailure) {
return createMetric(
yield createMetric(
evaluatedSubmissions,
notModifiedWithoutError,
notModifiedWithError,
modified,
totalRegFormSubmissions,
true
);
return;
}
const regFormSubmissionsNum = regForm.getValue()[numOfSubmissionsAccessor];
totalRegFormSubmissions = regFormSubmissionsNum;
Expand Down Expand Up @@ -140,24 +149,28 @@ export class ConfigRunner {
totalRegFormSubmissions,
true
);
this.running = false;
}

/** Wrapper around the transform generator, collates the metrics and calls a callback that
* inverts the control of writing the metric information to the configs writeMetric method.
*/
async transform() {
// create a function that parses the config and supplies default values.
const config = this.config;
if (this.invalidError) {
return Result.fail(`Configuration is not valid, ${this.invalidError}`);
}
// create a function that parses the config and supplies default values.
const WriteMetric = config.writeMetric ?? defaultWriteMetric;
if (this.running) {
return Result.fail('Pipeline is already running.');
} else {
this.running = true;
let finalMetric;
for await (const metric of this.transformGenerator()) {
WriteMetric(metric);
finalMetric = metric;
}
this.running = false;
return Result.ok(finalMetric);
}
}
Expand All @@ -176,11 +189,7 @@ export class ConfigRunner {

/** Cancel evaluation using a configured abortController */
cancel() {
const config = this.config;
const abortController = config.requestController;
if (!abortController) {
return Result.fail(`Abort controller not set for config : ${config.uuid}`);
}
const abortController = this.abortController;
if (!this.running) {
return;
}
Expand All @@ -198,4 +207,9 @@ export class ConfigRunner {
isRunning() {
return this.running;
}

/** Whether config for this runner is valid */
isValid() {
return this.invalidError === null;
}
}
2 changes: 0 additions & 2 deletions packages/core/src/evaluator/pipelinesController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import { ConfigRunner } from './configRunner';
import { isEqual } from 'lodash-es';
import { Result } from '../helpers/Result';

console.log({ cron });

interface GetConfigs {
(): Config[];
}
Expand Down
8 changes: 2 additions & 6 deletions packages/core/src/evaluator/tests/fixtures/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ import { defaultWriteMetric } from '../../../helpers/utils';
export { form3623, form3623Submissions, form3624Submissions };

export const apiToken = 'apiToken';
export const createConfigs = (
loggerMock: jest.Mock,
controller = new AbortController()
): Config => ({
export const createConfigs = (loggerMock: jest.Mock): Config => ({
uuid: 'uuid',
regFormId: '3623',
visitFormId: '3624',
Expand Down Expand Up @@ -80,6 +77,5 @@ export const createConfigs = (
apiToken,
baseUrl: 'https://test-api.ona.io',
schedule: '* * * * *',
writeMetric: defaultWriteMetric,
requestController: controller
writeMetric: defaultWriteMetric
});
1 change: 0 additions & 1 deletion packages/core/src/evaluator/tests/helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ describe('transform facility tests', () => {
);

expect(response.modified).toBeFalsy();
console.log(typeof response.error);
expect(response.error).toEqual('400: {"message":"error"}: Network request failed.');
});

Expand Down
10 changes: 6 additions & 4 deletions packages/core/src/evaluator/tests/pipelinesController.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ jest.mock('node-cron', () => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
schedule: (cronString: string, _callback: () => unknown) => {
return `task-${cronString}`;
}
},
validate: () => true
};
});

Expand Down Expand Up @@ -112,10 +113,11 @@ it('works correctly nominal case', async () => {
const configRunner = pipelinesController.getPipelines(configs.uuid);
const runner = configRunner as ConfigRunner;
expect(runner.isRunning()).toBeFalsy();
expect(runner.isValid()).toBeTruthy();
const response = runner.transform();
expect(runner.isRunning()).toBeTruthy();
const metric = await response;
console.log({ configRunner });

expect(runner.isRunning()).toBeFalsy();
expect(loggerMock.mock.calls).toEqual(logCalls);
expect(metric.getValue()).toEqual({
Expand All @@ -140,12 +142,12 @@ it('error when fetching the registration form', async () => {
nock(configs.baseUrl).get(`/${formEndpoint}/3623`).replyWithError('Could not find form with id');

const pipelinesController = new PipelinesController(() => [configs]);
const configRunner = pipelinesController.getPipelines(configs.uuid);
const configRunner = pipelinesController.getPipelines(configs.uuid) as ConfigRunner;
const runner = configRunner as ConfigRunner;
await runner.transform().catch((err) => {
throw err;
});

expect(configRunner.isRunning()).toBeFalsy();
expect(loggerMock.mock.calls).toEqual([
[
{
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/helpers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ export interface Config {
schedule: CronTabString;
// how many registration form submissions to process at a time.
regFormSubmissionChunks?: number;
// store metric; progress information regarding a running or the last run of an pipeline
// store metric; progress information regarding a running pipeline or the last run of an pipeline
writeMetric: WriteMetric;
// Abort controller to abort evaluation of this pipeline
requestController: AbortController;
}

export enum PriorityLevel {
Expand Down
28 changes: 0 additions & 28 deletions packages/core/src/services/onaApi/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export const customFetch = async (input: RequestInfo, init?: RequestInit) => {
}
};
const response = await persistentFetch(input, requestOptionsWithRetry).catch((err) => {
console.log('===>', err);
throw Error(`${err.type}: ${err.name}: ${err.message}.`);
});
if (response?.ok) {
Expand Down Expand Up @@ -156,11 +155,8 @@ export class OnaApiService {
const paginatedSubmissionsUrl = `${fullSubmissionsUrl}?${sParams.toString()}`;

page = page + 1;
// const stop1StartFetchPage = performance.now(); //
console.log('We also got here');
yield await customFetch(paginatedSubmissionsUrl, { ...this.getCommonFetchOptions() })
.then((res) => {
console.log({ res });
return (res.json() as Promise<FormSubmissionT[]>).then((res) => {
this.logger?.(
createInfoLog(
Expand All @@ -178,18 +174,6 @@ export class OnaApiService {
);
return Result.fail<FormSubmissionT[]>(err.message);
});
// .finally(() => {
// const stop2EndFetchPage = performance.now();
// if (pageSize > 100) {
// this.logger?.(
// createInfoLog(
// `Fetched page: ${page}: from ${stop1StartFetchPage} to ${stop2EndFetchPage} i.e in: ${
// stop2EndFetchPage - stop1StartFetchPage
// }`
// )
// );
// }
// });
} while (page * pageSize <= totalSubmissions);
}

Expand Down Expand Up @@ -217,7 +201,6 @@ export class OnaApiService {
};
const fullEditSubmissionUrl = `${this.baseUrl}/${editSubmissionPath}`;

// const stop1StartEdit = performance.now();
return await customFetch(fullEditSubmissionUrl, {
...this.getCommonFetchOptions(),
method: 'POST',
Expand All @@ -234,24 +217,13 @@ export class OnaApiService {
});
})
.catch((err) => {
console.log('==>', { err });
this.logger?.(
createErrorLog(
`Failed to edit sumbission with _id: ${submissionPayload._id} for form with id: ${formId} with err: ${err.message}`
)
);
return Result.fail(err);
});
// .finally(() => {
// const stop1StopEdit = performance.now();
// this.logger?.(
// createInfoLog(
// `Editing submission with _id: ${submissionPayload._id} took ${
// stop1StopEdit - stop1StartEdit
// }`
// )
// );
// });
}
}

Expand Down

0 comments on commit 7947d40

Please sign in to comment.