Skip to content

Commit

Permalink
Update pipelinecontroller class interface
Browse files Browse the repository at this point in the history
  • Loading branch information
peterMuriuki committed Jan 23, 2023
1 parent efaae2e commit 926ee8e
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 40 deletions.
4 changes: 2 additions & 2 deletions apps/web/src/hooks.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { pipelineController } from '$lib/server/appConfig';
import { configDir } from '$lib/server/constants';
import { watch } from 'node:fs/promises';

pipelineController.evaluateOnSchedule();
pipelineController.runOnSchedule();

(async () => {
const watcher = watch(configDir);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _event of watcher) {
if (_event.eventType === 'change' && _event.filename === 'local.json') {
pipelineController.reEvaluatedScheduled();
pipelineController.refreshConfigRunners();
}
}
})();
84 changes: 48 additions & 36 deletions packages/core/src/evaluator/pipelinesController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class PipelinesController {
/** Setup cron schedule tasks for all configs or single config if id is provided
* @param configId - id for the config whose pipeline should be queed to run on a schedule.
*/
evaluateOnSchedule(configId?: string) {
runOnSchedule(configId?: string) {
if (configId) {
const interestingPipeline = this.pipelines[configId];
const task = interestingPipeline.schedule();
Expand All @@ -45,47 +45,75 @@ export class PipelinesController {
/** stops and removes the cron tasks that run pipelines.
* @param configId - stop only the pipeline that is keyed by this configid
*/
stopScheduledEvaluations(configId?: string) {
stopAndRemoveTasks(configId?: string) {
if (configId) {
const interestingTask = this.tasks[configId];
const interestingPipeline = this.pipelines[configId];
interestingPipeline.cancel();
interestingTask.stop();
interestingTask?.stop();
delete this.tasks[configId];
} else {
for (const pipeline of Object.values(this.pipelines)) {
pipeline.cancel();
}
for (const task of Object.values(this.tasks)) {
Object.values(this.tasks).forEach((task) => {
task.stop();
});
this.tasks = {};
}
}

/** cancel all running pipelines
* @param configId - cancel single pipeline keyed by this configId
*/
cancelPipelines(configId?: string) {
if (configId) {
const interestingPipeline = this.pipelines[configId];
return interestingPipeline.cancel();
} else {
const cancelResults = [];
for (const pipelines of Object.values(this.pipelines)) {
cancelResults.push(pipelines.cancel());
}
return cancelResults;
}
}

removePipelines(configId?: string) {
if (configId) {
delete this.pipelines[configId];
} else {
this.pipelines = {};
}
}

/** Called to restart pipeline's evaluation should configs change */
reEvaluatedScheduled() {
refreshConfigRunners() {
const configs = this.getConfigs();
const existingPipelinesCounter = {
...this.pipelines
};

for (const config of configs) {
const thisConfigId = config.uuid;
delete existingPipelinesCounter[thisConfigId];
const existingPipeline = this.pipelines[thisConfigId];
delete existingPipelinesCounter[thisConfigId];

if (!existingPipeline) {
// new config was added.
this.pipelines[thisConfigId] = new ConfigRunner(config);
} else if (!isEqual(this.pipelines.config, config)) {
this.stopScheduledEvaluations(thisConfigId);
// update pipeline
existingPipeline.updateConfig(config);
existingPipeline.schedule();
this.runOnSchedule(thisConfigId);
continue;
}
if (isEqual(existingPipeline.config, config)) {
continue;
} else {
this.cancelPipelines(thisConfigId);
this.removePipelines(thisConfigId);
this.stopAndRemoveTasks(thisConfigId);
this.pipelines[thisConfigId] = new ConfigRunner(config);
this.runOnSchedule(thisConfigId);
}
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for (const [uuid, _] of Object.entries(existingPipelinesCounter)) {
this.stopScheduledEvaluations(uuid);
delete this.pipelines[uuid];
delete this.tasks[uuid];
this.cancelPipelines(uuid);
this.removePipelines(uuid);
this.stopAndRemoveTasks(uuid);
}
}

Expand All @@ -101,22 +129,6 @@ export class PipelinesController {
return Result.ok('Pipeline triggered successfully, running in the background');
}

/** cancel all running pipelines
* @param configId - cancel single pipeline keyed by this configId
*/
cancel(configId?: string) {
if (configId) {
const interestingPipeline = this.pipelines[configId];
return interestingPipeline.cancel();
} else {
const cancelResults = [];
for (const pipelines of Object.values(this.pipelines)) {
cancelResults.push(pipelines.cancel());
}
return cancelResults;
}
}

/** returns all pipelines created so far */
getPipelines(configId?: string) {
if (configId) {
Expand Down
40 changes: 38 additions & 2 deletions packages/core/src/evaluator/tests/pipelinesController.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ it('works correctly nominal case', async () => {
expect(pipelinesController.getPipelines()).toMatchObject({});
expect(pipelinesController.getTasks()).toMatchObject({});

pipelinesController.evaluateOnSchedule();
pipelinesController.runOnSchedule();
expect(pipelinesController.getPipelines()).toMatchObject({});
expect(pipelinesController.getTasks()).toMatchObject({});

pipelinesController.cancel();
pipelinesController.cancelPipelines();
expect(pipelinesController.getPipelines()).toMatchObject({});
expect(pipelinesController.getTasks()).toMatchObject({});

Expand Down Expand Up @@ -161,4 +161,40 @@ it('error when fetching the registration form', async () => {
expect(nock.pendingMocks()).toEqual([]);
});

test('updates configs from empty configs', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let startingConfigs: any[] = [[]];
const pipelinesController = new PipelinesController(() => startingConfigs[0]);

expect(pipelinesController.getPipelines()).toEqual([]);
expect(pipelinesController.getTasks()).toEqual([]);

const sampleConfig = {
baseUrl: 'https://stage-api.ona.io',
regFormId: '3623',
visitFormId: '3624',
apiToken: '<Replace with api token>',
symbolConfig: [
{
priorityLevel: 'Very_High',
frequency: '8',
symbologyOnOverflow: [
{
overFlowDays: '3',
color: '#ff0000'
}
]
}
],
schedule: '0 5 */7 * *',
uuid: 'fcbae261-780d-4fd5-abcf-766f51af085e'
};

startingConfigs = [[sampleConfig]];

pipelinesController.refreshConfigRunners();
expect(pipelinesController.getPipelines()).toMatchObject([expect.any(ConfigRunner)]);
expect(pipelinesController.getTasks()).toHaveLength(1);
});

// can cancel evaluation.

0 comments on commit 926ee8e

Please sign in to comment.