Skip to content

Commit

Permalink
Add num days to task cleanup (#5453)
Browse files Browse the repository at this point in the history
* Add num days to task cleanup.

See #5422

Co-authored-by: Tobias Soloschenko <tsoloschenko@apache.org>
  • Loading branch information
onobc and klopfdreh authored Sep 4, 2023
1 parent e702147 commit af60a25
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.cloud.dataflow.aggregate.task;

import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -94,10 +95,20 @@ public interface AggregateTaskExplorer {
* Get a list of executions for a task by name and completion status.
*
* @param taskName the name of the task to be searched
* @param completed Indicator to find only completed tasks
* @param onlyCompleted whether to include only completed tasks
* @return list of task executions
*/
List<AggregateTaskExecution> findTaskExecutionsByName(String taskName, boolean completed);
List<AggregateTaskExecution> findTaskExecutions(String taskName, boolean onlyCompleted);

/**
* Get a list of executions for a task by name, completion status and end time.
*
* @param taskName the name of the task to be searched
* @param endTime the tasks that ended before the endTime
* @return list of task executions
* @since 2.11.0
*/
List<AggregateTaskExecution> findTaskExecutionsBeforeEndTime(String taskName, Date endTime);

/**
* Get a collection/page of executions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
package org.springframework.cloud.dataflow.aggregate.task;

import java.util.Collection;
import java.util.Date;
import java.util.List;

import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.lang.NonNull;

/**
* Repository to access {@link TaskExecution}s. Mirrors the {@link TaskExecutionDao}
* but contains Spring Cloud Data Flow specific operations. This functionality might
* be migrated to Spring Cloud Task itself.
*
* @author Corneil du Plessis
* @since 2.11.0
*/
public interface DataflowTaskExecutionQueryDao {
/**
Expand Down Expand Up @@ -59,26 +62,44 @@ public interface DataflowTaskExecutionQueryDao {
List<AggregateTaskExecution> findChildTaskExecutions(Collection<Long> parentIds, String schemaTarget);

/**
* Retrieves a subset of task executions by task name and execution status.
* Find task executions by task name and completion status.
*
* @param taskName the name of the task to search for in the repository.
* @param completed indicator to retrieve only completed task executions.
* @return a list that contains task executions.
* @param completed whether to include only completed task executions.
* @return list of task executions
*/
List<AggregateTaskExecution> findTaskExecutionsByName(String taskName, boolean completed);
List<AggregateTaskExecution> findTaskExecutions(String taskName, boolean completed);

/**
* Find task executions by task name whose end date is before the specified date.
*
* @param taskName the name of the task to search for in the repository.
* @param endTime the time before the task ended.
* @return list of task executions.
*/
List<AggregateTaskExecution> findTaskExecutionsBeforeEndTime(String taskName, @NonNull Date endTime);

/**
* Retrieves current number of task executions for a taskName.
*
* @param taskName the name of the task to search for in the repository.
* @param taskName the name of the task
* @return current number of task executions for the taskName.
*/
long getTaskExecutionCountByTaskName(String taskName);

/**
* Retrieves the number of task execution that have completed.
* Retrieves current number of task executions for a taskName and with a non-null endTime before the specified date.
*
* @param taskName the name of the task to search
* @param taskName the name of the task
* @param endTime the time before task ended
* @return the number of completed task executions
*/
long getCompletedTaskExecutionCountByTaskNameAndBeforeDate(String taskName, @NonNull Date endTime);

/**
* Retrieves current number of task executions for a taskName and with a non-null endTime.
*
* @param taskName the name of the task
* @return the number of completed task executions
*/
long getCompletedTaskExecutionCountByTaskName(String taskName);
Expand All @@ -88,7 +109,7 @@ public interface DataflowTaskExecutionQueryDao {
* null.
*
* @param taskName the name of the task to search for in the repository.
* @return current number of task executions for the taskName.
* @return the number of running task executions
*/
long getRunningTaskExecutionCountByTaskName(String taskName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@
*/
package org.springframework.cloud.dataflow.aggregate.task.impl;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,6 +49,7 @@
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -86,75 +89,65 @@ public class AggregateDataFlowTaskExecutionQueryDao implements DataflowTaskExecu
private static final String FIND_TASK_ARGUMENTS = "SELECT TASK_EXECUTION_ID, "
+ "TASK_PARAM from AGGREGATE_TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID = :taskExecutionId and SCHEMA_TARGET = :schemaTarget";

private static final String GET_EXECUTION_BY_ID = "SELECT TASK_EXECUTION_ID,"
+ "START_TIME, END_TIME, TASK_NAME, EXIT_CODE,"
+ "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID,"
+ "PARENT_EXECUTION_ID, SCHEMA_TARGET"
+ " from AGGREGATE_TASK_EXECUTION where TASK_EXECUTION_ID = :taskExecutionId and SCHEMA_TARGET = :schemaTarget";

private final static String GET_CHILD_EXECUTION_BY_ID = "SELECT TASK_EXECUTION_ID," +
"START_TIME, END_TIME, TASK_NAME, EXIT_CODE," +
"EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," +
"PARENT_EXECUTION_ID, SCHEMA_TARGET" +
" from AGGREGATE_TASK_EXECUTION" +
private static final String GET_EXECUTIONS = "SELECT " + SELECT_CLAUSE +
" from AGGREGATE_TASK_EXECUTION";

private static final String GET_EXECUTION_BY_ID = GET_EXECUTIONS +
" where TASK_EXECUTION_ID = :taskExecutionId and SCHEMA_TARGET = :schemaTarget";

private final static String GET_CHILD_EXECUTION_BY_ID = GET_EXECUTIONS +
" where PARENT_EXECUTION_ID = :taskExecutionId" +
" and (SELECT COUNT(*) FROM AGGREGATE_TASK_EXECUTION_PARAMS P " +
" WHERE P.TASK_EXECUTION_ID=TASK_EXECUTION_ID " +
" AND P.SCHEMA_TARGET=SCHEMA_TARGET" +
" AND P.TASK_PARAM = :schemaTarget) > 0";

private final static String GET_CHILD_EXECUTION_BY_IDS = "SELECT TASK_EXECUTION_ID," +
"START_TIME, END_TIME, TASK_NAME, EXIT_CODE," +
"EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," +
"PARENT_EXECUTION_ID, SCHEMA_TARGET" +
" from AGGREGATE_TASK_EXECUTION" +
private final static String GET_CHILD_EXECUTION_BY_IDS = GET_EXECUTIONS +
" where PARENT_EXECUTION_ID IN (:taskExecutionIds)" +
" and (SELECT COUNT(*) FROM AGGREGATE_TASK_EXECUTION_PARAMS P " +
" WHERE P.TASK_EXECUTION_ID=TASK_EXECUTION_ID " +
" AND P.SCHEMA_TARGET=SCHEMA_TARGET" +
" AND P.TASK_PARAM = :schemaTarget) > 0";

private static final String GET_EXECUTION_BY_EXTERNAL_EXECUTION_ID = "SELECT TASK_EXECUTION_ID,"
+ "START_TIME, END_TIME, TASK_NAME, EXIT_CODE,"
+ "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID,"
+ "PARENT_EXECUTION_ID, SCHEMA_TARGET"
+ " from AGGREGATE_TASK_EXECUTION where EXTERNAL_EXECUTION_ID = :externalExecutionId and TASK_NAME = :taskName";

private static final String GET_EXECUTIONS_BY_NAME_COMPLETED = "SELECT TASK_EXECUTION_ID,"
+ "START_TIME, END_TIME, TASK_NAME, EXIT_CODE,"
+ "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID,"
+ "PARENT_EXECUTION_ID, SCHEMA_TARGET"
+ " from AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NOT NULL";

private static final String GET_EXECUTIONS_BY_NAME = "SELECT TASK_EXECUTION_ID,"
+ "START_TIME, END_TIME, TASK_NAME, EXIT_CODE,"
+ "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID,"
+ "PARENT_EXECUTION_ID, SCHEMA_TARGET"
+ " from AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName";
private static final String GET_EXECUTIONS_COMPLETED = "SELECT TASK_EXECUTION_ID,"
+ "START_TIME, END_TIME, TASK_NAME, EXIT_CODE,"
+ "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID,"
+ "PARENT_EXECUTION_ID, SCHEMA_TARGET"
+ " from AGGREGATE_TASK_EXECUTION where END_TIME IS NOT NULL";

private static final String GET_EXECUTIONS = "SELECT TASK_EXECUTION_ID,"
+ "START_TIME, END_TIME, TASK_NAME, EXIT_CODE,"
+ "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID,"
+ "PARENT_EXECUTION_ID, SCHEMA_TARGET"
+ " from AGGREGATE_TASK_EXECUTION";
private static final String GET_EXECUTION_BY_EXTERNAL_EXECUTION_ID = GET_EXECUTIONS +
" where EXTERNAL_EXECUTION_ID = :externalExecutionId and TASK_NAME = :taskName";

private static final String GET_EXECUTIONS_BY_NAME_COMPLETED = GET_EXECUTIONS +
" where TASK_NAME = :taskName AND END_TIME IS NOT NULL";

private static final String GET_EXECUTIONS_BY_NAME = GET_EXECUTIONS +
" where TASK_NAME = :taskName";

private static final String GET_EXECUTIONS_COMPLETED = GET_EXECUTIONS +
" where END_TIME IS NOT NULL";

private static final String GET_EXECUTION_BY_NAME_COMPLETED_BEFORE_END_TIME = GET_EXECUTIONS +
" where TASK_NAME = :taskName AND END_TIME IS NOT NULL AND END_TIME < :endTime";

private static final String GET_EXECUTIONS_COMPLETED_BEFORE_END_TIME = GET_EXECUTIONS +
" where END_TIME IS NOT NULL AND END_TIME < :endTime";

private static final String TASK_EXECUTION_COUNT = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION ";

private static final String TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName";

private static final String TASK_EXECUTION_COUNT_BY_NAME_AND_BEFORE_END_TIME = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME < :endTime";

private static final String COMPLETED_TASK_EXECUTION_COUNT = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION WHERE END_TIME IS NOT NULL";

private static final String COMPLETED_TASK_EXECUTION_COUNT_AND_BEFORE_END_TIME = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION WHERE END_TIME IS NOT NULL AND END_TIME < :endTime";

private static final String COMPLETED_TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NOT NULL ";

private static final String COMPLETED_TASK_EXECUTION_COUNT_BY_NAME_AND_BEFORE_END_TIME = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NOT NULL AND END_TIME < :endTime ";


private static final String RUNNING_TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM "
+ "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NULL ";
Expand Down Expand Up @@ -281,8 +274,8 @@ public List<AggregateTaskExecution> findChildTaskExecutions(Collection<Long> par
}

@Override
public List<AggregateTaskExecution> findTaskExecutionsByName(String taskName, boolean completed) {
if(StringUtils.hasLength(taskName)) {
public List<AggregateTaskExecution> findTaskExecutions(String taskName, boolean completed) {
if (StringUtils.hasLength(taskName)) {
final SqlParameterSource queryParameters = new MapSqlParameterSource()
.addValue("taskName", taskName);
String query = completed ? GET_EXECUTIONS_BY_NAME_COMPLETED : GET_EXECUTIONS_BY_NAME;
Expand All @@ -292,6 +285,16 @@ public List<AggregateTaskExecution> findTaskExecutionsByName(String taskName, bo
}
}

@Override
public List<AggregateTaskExecution> findTaskExecutionsBeforeEndTime(String taskName, @NonNull Date endTime) {
final SqlParameterSource queryParameters = new MapSqlParameterSource()
.addValue("taskName", taskName)
.addValue("endTime", endTime);
String query;
query = taskName.isEmpty() ? GET_EXECUTIONS_COMPLETED_BEFORE_END_TIME : GET_EXECUTION_BY_NAME_COMPLETED_BEFORE_END_TIME;
return this.jdbcTemplate.query(query, queryParameters, new CompositeTaskExecutionRowMapper());
}

@Override
public long getTaskExecutionCountByTaskName(String taskName) {
Long count;
Expand Down Expand Up @@ -328,6 +331,27 @@ public long getCompletedTaskExecutionCountByTaskName(String taskName) {
return count != null ? count : 0L;
}

@Override
public long getCompletedTaskExecutionCountByTaskNameAndBeforeDate(String taskName, @NonNull Date endTime) {
Long count;
if (StringUtils.hasText(taskName)) {
final SqlParameterSource queryParameters = new MapSqlParameterSource()
.addValue("taskName", taskName, Types.VARCHAR)
.addValue("endTime", endTime, Types.DATE);

try {
count = this.jdbcTemplate.queryForObject(COMPLETED_TASK_EXECUTION_COUNT_BY_NAME_AND_BEFORE_END_TIME, queryParameters, Long.class);
} catch (EmptyResultDataAccessException e) {
count = 0L;
}
} else {
final SqlParameterSource queryParameters = new MapSqlParameterSource()
.addValue("endTime", endTime, Types.DATE);
count = this.jdbcTemplate.queryForObject(COMPLETED_TASK_EXECUTION_COUNT_AND_BEFORE_END_TIME, queryParameters, Long.class);
}
return count != null ? count : 0L;
}

@Override
public long getRunningTaskExecutionCountByTaskName(String taskName) {
Long count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -187,8 +188,13 @@ public long getRunningTaskExecutionCount() {
}

@Override
public List<AggregateTaskExecution> findTaskExecutionsByName(String taskName, boolean completed) {
return this.taskExecutionQueryDao.findTaskExecutionsByName(taskName, completed);
public List<AggregateTaskExecution> findTaskExecutions(String taskName, boolean completed) {
return this.taskExecutionQueryDao.findTaskExecutions(taskName, completed);
}

@Override
public List<AggregateTaskExecution> findTaskExecutionsBeforeEndTime(String taskName, Date endTime) {
return this.taskExecutionQueryDao.findTaskExecutionsBeforeEndTime(taskName, endTime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public RootResource info() {
root.add(unescapeTemplateVariables(linkTo(methodOn(TaskExecutionController.class).view(null,null)).withRel("tasks/executions/execution")));
root.add(unescapeTemplateVariables(entityLinks.linkToItemResource(TaskAppStatusResource.class, "{name}")
.withRel("tasks/validation")));
root.add(linkTo(methodOn(TasksInfoController.class).getInfo(null, null)).withRel("tasks/info/executions"));
root.add(linkTo(methodOn(TasksInfoController.class).getInfo(null, null, null)).withRel("tasks/info/executions"));
root.add(linkTo(methodOn(TaskLogsController.class).getLog(null, null, null)).withRel("tasks/logs"));

if (featuresProperties.isSchedulesEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,18 +337,20 @@ public void cleanup(
* optional {@code actions} and {@code completed} parameters can be used to not only clean up task execution resources,
* but can also trigger the deletion of task execution and job data in the persistence store.
*
* @param actions Defaults to "CLEANUP" if not specified
* @param completed Defaults to cleanup only completed task executions
* @param taskName Optional name of task to clean up.
* @param actions the actions to perform (default 'CLEANUP')
* @param completed whether to include only completed task executions (default false)
* @param taskName name of the task (default '')
* @param days only include tasks that have ended at least this many days ago (default null)
*/
@RequestMapping(method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.OK)
public void cleanupAll(
@RequestParam(defaultValue = "CLEANUP", name = "action") TaskExecutionControllerDeleteAction[] actions,
@RequestParam(defaultValue = "false", name = "completed") boolean completed,
@RequestParam(defaultValue = "", name = "name") String taskName
@RequestParam(defaultValue = "", name = "name") String taskName,
@RequestParam(name="days", required = false) Integer days
) {
this.taskDeleteService.cleanupExecutions(new HashSet<>(Arrays.asList(actions)), taskName, completed);
this.taskDeleteService.cleanupExecutions(new HashSet<>(Arrays.asList(actions)), taskName, completed, days);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public TasksInfoController(TaskExecutionService taskExecutionService) {
@ResponseStatus(HttpStatus.OK)
public TaskExecutionsInfoResource getInfo(
@RequestParam(required = false, defaultValue = "false", name="completed") String completed,
@RequestParam(required = false, defaultValue = "", name="name") String taskName
@RequestParam(required = false, defaultValue = "", name="name") String taskName,
@RequestParam(required = false, name="days") Integer days
) {
return this.taskExecutionsAssembler.toModel(this.taskExecutionService.getAllTaskExecutionsCount(Boolean.parseBoolean(completed), taskName));
return this.taskExecutionsAssembler.toModel(this.taskExecutionService.getAllTaskExecutionsCount(Boolean.parseBoolean(completed), taskName, days));
}

/**
Expand Down
Loading

0 comments on commit af60a25

Please sign in to comment.