Skip to content

Commit

Permalink
Task executions cleanup propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
claudiahub committed Aug 2, 2023
1 parent 2b14e42 commit 86db491
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
27 changes: 20 additions & 7 deletions ui/src/app/shared/api/task.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,29 +117,42 @@ export class TaskService {
}

executionsClean(taskExecutions: TaskExecution[]): Observable<any> {
return new Observable<any>(subscriber => {
this.executionsCleanAll(taskExecutions)
.then(value => {
subscriber.next(taskExecutions.length);
})
.catch(reason => {
subscriber.error(reason);
});
});
}

private async executionsCleanAll(taskExecutions: TaskExecution[]): Promise<any> {
const taskExecutionsChildren = taskExecutions.filter(taskExecution => taskExecution.parentExecutionId);
const taskExecutionsParents = taskExecutions.filter(taskExecution => !taskExecution.parentExecutionId);
return this.executionsCleanBySchema(taskExecutionsChildren).pipe(
mergeMap(result => this.executionsCleanBySchema(taskExecutionsParents))
);
const result = (await this.executionsCleanBySchema(taskExecutionsChildren)) as number;
return result + (await this.executionsCleanBySchema(taskExecutionsParents));
}

executionsCleanBySchema(taskExecutions: TaskExecution[]): Observable<any> {
private async executionsCleanBySchema(taskExecutions: TaskExecution[]): Promise<any> {
const groupBySchemaTarget = taskExecutions.reduce((group, task) => {
const schemaTarget = task.schemaTarget;
group[schemaTarget] = group[schemaTarget] ?? [];
group[schemaTarget].push(task);
return group;
}, {});
const observables: Observable<any>[] = [];
let result = 0;
for (const schemaTarget in groupBySchemaTarget) {
if (schemaTarget) {
const group: TaskExecution[] = groupBySchemaTarget[schemaTarget];
const ids = group.map(task => task.executionId);
observables.push(this.taskExecutionsCleanByIds(ids, schemaTarget));
if (ids.length > 0) {
result = (await this.taskExecutionsCleanByIds(ids, schemaTarget).toPromise()) as number;
}
}
}
return forkJoin(observables);
return result;
}

taskExecutionsCleanByIds(ids: number[], schemaTarget: string): Observable<any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class CleanupComponent {
data => {
this.notificationService.success(
this.translate.instant('executions.cleanup.message.successTitle'),
this.translate.instant('executions.cleanup.message.successContent', {count: data.length})
this.translate.instant('executions.cleanup.message.successContent', {count: data})
);
this.onCleaned.emit(data);
this.isOpen = false;
Expand Down

0 comments on commit 86db491

Please sign in to comment.