diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateTaskExplorer.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateTaskExplorer.java index 8d1184ea6f..3eadd28b25 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateTaskExplorer.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/AggregateTaskExplorer.java @@ -16,6 +16,7 @@ package org.springframework.cloud.dataflow.aggregate.task; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Set; @@ -94,10 +95,20 @@ public interface AggregateTaskExplorer { * Get a list of executions for a task by name and completion status. * * @param taskName the name of the task to be searched - * @param completed Indicator to find only completed tasks + * @param onlyCompleted whether to include only completed tasks * @return list of task executions */ - List findTaskExecutionsByName(String taskName, boolean completed); + List findTaskExecutions(String taskName, boolean onlyCompleted); + + /** + * Get a list of executions for a task by name, completion status and end time. + * + * @param taskName the name of the task to be searched + * @param endTime the tasks that ended before the endTime + * @return list of task executions + * @since 2.11.0 + */ + List findTaskExecutionsBeforeEndTime(String taskName, Date endTime); /** * Get a collection/page of executions. diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/DataflowTaskExecutionQueryDao.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/DataflowTaskExecutionQueryDao.java index 61999648d9..6432a94926 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/DataflowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/DataflowTaskExecutionQueryDao.java @@ -16,6 +16,7 @@ package org.springframework.cloud.dataflow.aggregate.task; import java.util.Collection; +import java.util.Date; import java.util.List; import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; @@ -23,6 +24,7 @@ import org.springframework.cloud.task.repository.dao.TaskExecutionDao; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; +import org.springframework.lang.NonNull; /** * Repository to access {@link TaskExecution}s. Mirrors the {@link TaskExecutionDao} @@ -30,6 +32,7 @@ * be migrated to Spring Cloud Task itself. * * @author Corneil du Plessis + * @since 2.11.0 */ public interface DataflowTaskExecutionQueryDao { /** @@ -59,26 +62,44 @@ public interface DataflowTaskExecutionQueryDao { List findChildTaskExecutions(Collection parentIds, String schemaTarget); /** - * Retrieves a subset of task executions by task name and execution status. + * Find task executions by task name and completion status. * * @param taskName the name of the task to search for in the repository. - * @param completed indicator to retrieve only completed task executions. - * @return a list that contains task executions. + * @param completed whether to include only completed task executions. + * @return list of task executions */ - List findTaskExecutionsByName(String taskName, boolean completed); + List findTaskExecutions(String taskName, boolean completed); + + /** + * Find task executions by task name whose end date is before the specified date. + * + * @param taskName the name of the task to search for in the repository. + * @param endTime the time before the task ended. + * @return list of task executions. + */ + List findTaskExecutionsBeforeEndTime(String taskName, @NonNull Date endTime); /** * Retrieves current number of task executions for a taskName. * - * @param taskName the name of the task to search for in the repository. + * @param taskName the name of the task * @return current number of task executions for the taskName. */ long getTaskExecutionCountByTaskName(String taskName); /** - * Retrieves the number of task execution that have completed. + * Retrieves current number of task executions for a taskName and with a non-null endTime before the specified date. * - * @param taskName the name of the task to search + * @param taskName the name of the task + * @param endTime the time before task ended + * @return the number of completed task executions + */ + long getCompletedTaskExecutionCountByTaskNameAndBeforeDate(String taskName, @NonNull Date endTime); + + /** + * Retrieves current number of task executions for a taskName and with a non-null endTime. + * + * @param taskName the name of the task * @return the number of completed task executions */ long getCompletedTaskExecutionCountByTaskName(String taskName); @@ -88,7 +109,7 @@ public interface DataflowTaskExecutionQueryDao { * null. * * @param taskName the name of the task to search for in the repository. - * @return current number of task executions for the taskName. + * @return the number of running task executions */ long getRunningTaskExecutionCountByTaskName(String taskName); diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java index 1cb048e8c7..fa2169b0ea 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/AggregateDataFlowTaskExecutionQueryDao.java @@ -15,19 +15,21 @@ */ package org.springframework.cloud.dataflow.aggregate.task.impl; -import javax.sql.DataSource; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import javax.sql.DataSource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +49,7 @@ import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.core.namedparam.SqlParameterSource; +import org.springframework.lang.NonNull; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -86,62 +89,43 @@ public class AggregateDataFlowTaskExecutionQueryDao implements DataflowTaskExecu private static final String FIND_TASK_ARGUMENTS = "SELECT TASK_EXECUTION_ID, " + "TASK_PARAM from AGGREGATE_TASK_EXECUTION_PARAMS where TASK_EXECUTION_ID = :taskExecutionId and SCHEMA_TARGET = :schemaTarget"; - private static final String GET_EXECUTION_BY_ID = "SELECT TASK_EXECUTION_ID," - + "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," - + "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," - + "PARENT_EXECUTION_ID, SCHEMA_TARGET" - + " from AGGREGATE_TASK_EXECUTION where TASK_EXECUTION_ID = :taskExecutionId and SCHEMA_TARGET = :schemaTarget"; - - private final static String GET_CHILD_EXECUTION_BY_ID = "SELECT TASK_EXECUTION_ID," + - "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," + - "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," + - "PARENT_EXECUTION_ID, SCHEMA_TARGET" + - " from AGGREGATE_TASK_EXECUTION" + + private static final String GET_EXECUTIONS = "SELECT " + SELECT_CLAUSE + + " from AGGREGATE_TASK_EXECUTION"; + + private static final String GET_EXECUTION_BY_ID = GET_EXECUTIONS + + " where TASK_EXECUTION_ID = :taskExecutionId and SCHEMA_TARGET = :schemaTarget"; + + private final static String GET_CHILD_EXECUTION_BY_ID = GET_EXECUTIONS + " where PARENT_EXECUTION_ID = :taskExecutionId" + " and (SELECT COUNT(*) FROM AGGREGATE_TASK_EXECUTION_PARAMS P " + " WHERE P.TASK_EXECUTION_ID=TASK_EXECUTION_ID " + " AND P.SCHEMA_TARGET=SCHEMA_TARGET" + " AND P.TASK_PARAM = :schemaTarget) > 0"; - private final static String GET_CHILD_EXECUTION_BY_IDS = "SELECT TASK_EXECUTION_ID," + - "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," + - "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," + - "PARENT_EXECUTION_ID, SCHEMA_TARGET" + - " from AGGREGATE_TASK_EXECUTION" + + private final static String GET_CHILD_EXECUTION_BY_IDS = GET_EXECUTIONS + " where PARENT_EXECUTION_ID IN (:taskExecutionIds)" + " and (SELECT COUNT(*) FROM AGGREGATE_TASK_EXECUTION_PARAMS P " + " WHERE P.TASK_EXECUTION_ID=TASK_EXECUTION_ID " + " AND P.SCHEMA_TARGET=SCHEMA_TARGET" + " AND P.TASK_PARAM = :schemaTarget) > 0"; - private static final String GET_EXECUTION_BY_EXTERNAL_EXECUTION_ID = "SELECT TASK_EXECUTION_ID," - + "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," - + "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," - + "PARENT_EXECUTION_ID, SCHEMA_TARGET" - + " from AGGREGATE_TASK_EXECUTION where EXTERNAL_EXECUTION_ID = :externalExecutionId and TASK_NAME = :taskName"; - - private static final String GET_EXECUTIONS_BY_NAME_COMPLETED = "SELECT TASK_EXECUTION_ID," - + "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," - + "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," - + "PARENT_EXECUTION_ID, SCHEMA_TARGET" - + " from AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NOT NULL"; - - private static final String GET_EXECUTIONS_BY_NAME = "SELECT TASK_EXECUTION_ID," - + "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," - + "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," - + "PARENT_EXECUTION_ID, SCHEMA_TARGET" - + " from AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName"; - private static final String GET_EXECUTIONS_COMPLETED = "SELECT TASK_EXECUTION_ID," - + "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," - + "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," - + "PARENT_EXECUTION_ID, SCHEMA_TARGET" - + " from AGGREGATE_TASK_EXECUTION where END_TIME IS NOT NULL"; - - private static final String GET_EXECUTIONS = "SELECT TASK_EXECUTION_ID," - + "START_TIME, END_TIME, TASK_NAME, EXIT_CODE," - + "EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID," - + "PARENT_EXECUTION_ID, SCHEMA_TARGET" - + " from AGGREGATE_TASK_EXECUTION"; + private static final String GET_EXECUTION_BY_EXTERNAL_EXECUTION_ID = GET_EXECUTIONS + + " where EXTERNAL_EXECUTION_ID = :externalExecutionId and TASK_NAME = :taskName"; + + private static final String GET_EXECUTIONS_BY_NAME_COMPLETED = GET_EXECUTIONS + + " where TASK_NAME = :taskName AND END_TIME IS NOT NULL"; + + private static final String GET_EXECUTIONS_BY_NAME = GET_EXECUTIONS + + " where TASK_NAME = :taskName"; + + private static final String GET_EXECUTIONS_COMPLETED = GET_EXECUTIONS + + " where END_TIME IS NOT NULL"; + + private static final String GET_EXECUTION_BY_NAME_COMPLETED_BEFORE_END_TIME = GET_EXECUTIONS + + " where TASK_NAME = :taskName AND END_TIME IS NOT NULL AND END_TIME < :endTime"; + + private static final String GET_EXECUTIONS_COMPLETED_BEFORE_END_TIME = GET_EXECUTIONS + + " where END_TIME IS NOT NULL AND END_TIME < :endTime"; private static final String TASK_EXECUTION_COUNT = "SELECT COUNT(*) FROM " + "AGGREGATE_TASK_EXECUTION "; @@ -149,12 +133,21 @@ public class AggregateDataFlowTaskExecutionQueryDao implements DataflowTaskExecu private static final String TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM " + "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName"; + private static final String TASK_EXECUTION_COUNT_BY_NAME_AND_BEFORE_END_TIME = "SELECT COUNT(*) FROM " + + "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME < :endTime"; + private static final String COMPLETED_TASK_EXECUTION_COUNT = "SELECT COUNT(*) FROM " + "AGGREGATE_TASK_EXECUTION WHERE END_TIME IS NOT NULL"; + private static final String COMPLETED_TASK_EXECUTION_COUNT_AND_BEFORE_END_TIME = "SELECT COUNT(*) FROM " + + "AGGREGATE_TASK_EXECUTION WHERE END_TIME IS NOT NULL AND END_TIME < :endTime"; + private static final String COMPLETED_TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM " + "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NOT NULL "; + private static final String COMPLETED_TASK_EXECUTION_COUNT_BY_NAME_AND_BEFORE_END_TIME = "SELECT COUNT(*) FROM " + + "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NOT NULL AND END_TIME < :endTime "; + private static final String RUNNING_TASK_EXECUTION_COUNT_BY_NAME = "SELECT COUNT(*) FROM " + "AGGREGATE_TASK_EXECUTION where TASK_NAME = :taskName AND END_TIME IS NULL "; @@ -281,8 +274,8 @@ public List findChildTaskExecutions(Collection par } @Override - public List findTaskExecutionsByName(String taskName, boolean completed) { - if(StringUtils.hasLength(taskName)) { + public List findTaskExecutions(String taskName, boolean completed) { + if (StringUtils.hasLength(taskName)) { final SqlParameterSource queryParameters = new MapSqlParameterSource() .addValue("taskName", taskName); String query = completed ? GET_EXECUTIONS_BY_NAME_COMPLETED : GET_EXECUTIONS_BY_NAME; @@ -292,6 +285,16 @@ public List findTaskExecutionsByName(String taskName, bo } } + @Override + public List findTaskExecutionsBeforeEndTime(String taskName, @NonNull Date endTime) { + final SqlParameterSource queryParameters = new MapSqlParameterSource() + .addValue("taskName", taskName) + .addValue("endTime", endTime); + String query; + query = taskName.isEmpty() ? GET_EXECUTIONS_COMPLETED_BEFORE_END_TIME : GET_EXECUTION_BY_NAME_COMPLETED_BEFORE_END_TIME; + return this.jdbcTemplate.query(query, queryParameters, new CompositeTaskExecutionRowMapper()); + } + @Override public long getTaskExecutionCountByTaskName(String taskName) { Long count; @@ -328,6 +331,27 @@ public long getCompletedTaskExecutionCountByTaskName(String taskName) { return count != null ? count : 0L; } + @Override + public long getCompletedTaskExecutionCountByTaskNameAndBeforeDate(String taskName, @NonNull Date endTime) { + Long count; + if (StringUtils.hasText(taskName)) { + final SqlParameterSource queryParameters = new MapSqlParameterSource() + .addValue("taskName", taskName, Types.VARCHAR) + .addValue("endTime", endTime, Types.DATE); + + try { + count = this.jdbcTemplate.queryForObject(COMPLETED_TASK_EXECUTION_COUNT_BY_NAME_AND_BEFORE_END_TIME, queryParameters, Long.class); + } catch (EmptyResultDataAccessException e) { + count = 0L; + } + } else { + final SqlParameterSource queryParameters = new MapSqlParameterSource() + .addValue("endTime", endTime, Types.DATE); + count = this.jdbcTemplate.queryForObject(COMPLETED_TASK_EXECUTION_COUNT_AND_BEFORE_END_TIME, queryParameters, Long.class); + } + return count != null ? count : 0L; + } + @Override public long getRunningTaskExecutionCountByTaskName(String taskName) { Long count; diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java index b7aa10408b..805b54f0d6 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -187,8 +188,13 @@ public long getRunningTaskExecutionCount() { } @Override - public List findTaskExecutionsByName(String taskName, boolean completed) { - return this.taskExecutionQueryDao.findTaskExecutionsByName(taskName, completed); + public List findTaskExecutions(String taskName, boolean completed) { + return this.taskExecutionQueryDao.findTaskExecutions(taskName, completed); + } + + @Override + public List findTaskExecutionsBeforeEndTime(String taskName, Date endTime) { + return this.taskExecutionQueryDao.findTaskExecutionsBeforeEndTime(taskName, endTime); } @Override diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/RootController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/RootController.java index 447c7585b9..2d7fc501ea 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/RootController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/RootController.java @@ -155,7 +155,7 @@ public RootResource info() { root.add(unescapeTemplateVariables(linkTo(methodOn(TaskExecutionController.class).view(null,null)).withRel("tasks/executions/execution"))); root.add(unescapeTemplateVariables(entityLinks.linkToItemResource(TaskAppStatusResource.class, "{name}") .withRel("tasks/validation"))); - root.add(linkTo(methodOn(TasksInfoController.class).getInfo(null, null)).withRel("tasks/info/executions")); + root.add(linkTo(methodOn(TasksInfoController.class).getInfo(null, null, null)).withRel("tasks/info/executions")); root.add(linkTo(methodOn(TaskLogsController.class).getLog(null, null, null)).withRel("tasks/logs")); if (featuresProperties.isSchedulesEnabled()) { diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionController.java index e5564dc5b1..3bde9bbfd3 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionController.java @@ -337,18 +337,20 @@ public void cleanup( * optional {@code actions} and {@code completed} parameters can be used to not only clean up task execution resources, * but can also trigger the deletion of task execution and job data in the persistence store. * - * @param actions Defaults to "CLEANUP" if not specified - * @param completed Defaults to cleanup only completed task executions - * @param taskName Optional name of task to clean up. + * @param actions the actions to perform (default 'CLEANUP') + * @param completed whether to include only completed task executions (default false) + * @param taskName name of the task (default '') + * @param days only include tasks that have ended at least this many days ago (default null) */ @RequestMapping(method = RequestMethod.DELETE) @ResponseStatus(HttpStatus.OK) public void cleanupAll( @RequestParam(defaultValue = "CLEANUP", name = "action") TaskExecutionControllerDeleteAction[] actions, @RequestParam(defaultValue = "false", name = "completed") boolean completed, - @RequestParam(defaultValue = "", name = "name") String taskName + @RequestParam(defaultValue = "", name = "name") String taskName, + @RequestParam(name="days", required = false) Integer days ) { - this.taskDeleteService.cleanupExecutions(new HashSet<>(Arrays.asList(actions)), taskName, completed); + this.taskDeleteService.cleanupExecutions(new HashSet<>(Arrays.asList(actions)), taskName, completed, days); } /** diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TasksInfoController.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TasksInfoController.java index fada8417d2..32feee37e8 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TasksInfoController.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/controller/TasksInfoController.java @@ -62,9 +62,10 @@ public TasksInfoController(TaskExecutionService taskExecutionService) { @ResponseStatus(HttpStatus.OK) public TaskExecutionsInfoResource getInfo( @RequestParam(required = false, defaultValue = "false", name="completed") String completed, - @RequestParam(required = false, defaultValue = "", name="name") String taskName + @RequestParam(required = false, defaultValue = "", name="name") String taskName, + @RequestParam(required = false, name="days") Integer days ) { - return this.taskExecutionsAssembler.toModel(this.taskExecutionService.getAllTaskExecutionsCount(Boolean.parseBoolean(completed), taskName)); + return this.taskExecutionsAssembler.toModel(this.taskExecutionService.getAllTaskExecutionsCount(Boolean.parseBoolean(completed), taskName, days)); } /** diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskDeleteService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskDeleteService.java index 5fa742a279..85ad7bfc24 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskDeleteService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskDeleteService.java @@ -45,13 +45,24 @@ public interface TaskDeleteService { void cleanupExecutions(Set actionsAsSet, Set ids, String schemaTarget); /** - * Clean up the resources that resuled from running the task with the given name and actions. + * Clean up the resources that resulted from running the task with the given name. * - * @param actionsAsSet the actions + * @param actionsAsSet the actions to perform + * @param taskName the task name + * @param onlyCompleted whether to include only completed tasks + */ + void cleanupExecutions(Set actionsAsSet, String taskName, boolean onlyCompleted); + + /** + * Clean up the resources that resulted from running the task with the given name. + * + * @param actionsAsSet the actions to perform * @param taskName the task name - * @param completed the completion state of the task executions + * @param onlyCompleted whether to include only completed tasks (ignored when {@code includeTasksEndedMinDaysAgo} is specified) + * @param includeTasksEndedMinDaysAgo only include tasks that have ended at least this many days ago + * @since 2.11.0 */ - void cleanupExecutions(Set actionsAsSet, String taskName, boolean completed); + void cleanupExecutions(Set actionsAsSet, String taskName, boolean onlyCompleted, Integer includeTasksEndedMinDaysAgo); /** * Delete one or more Task executions. diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionService.java index 592924fd18..07d1b99f29 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/TaskExecutionService.java @@ -21,7 +21,6 @@ import org.springframework.cloud.dataflow.core.LaunchResponse; import org.springframework.cloud.dataflow.core.TaskManifest; -import org.springframework.cloud.dataflow.schema.AggregateTaskExecution; /** * Provides Task related services. @@ -88,10 +87,20 @@ public interface TaskExecutionService { /** * Returns the count of all the task execution IDs with the option to include only the completed task executions. - * @param onlyCompleted filter by completed task executions + * @param onlyCompleted whether to include only completed task executions * @param taskName the task name, if null then retrieve all the tasks * @return the number of executions * @since 2.8 */ Integer getAllTaskExecutionsCount(boolean onlyCompleted, String taskName); + + /** + * Returns the count of all the task execution IDs with the option to include only the completed task executions. + * @param onlyCompleted whether to include only completed task executions (ignored when {@code includeTasksEndedMinDaysAgo} is specified) + * @param taskName the task name, if null then retrieve all the tasks + * @param includeTasksEndedMinDaysAgo only include tasks that have ended at least this many days ago + * @return the number of executions, 0 if no data, never null + * @since 2.11.0 + */ + Integer getAllTaskExecutionsCount(boolean onlyCompleted, String taskName, Integer includeTasksEndedMinDaysAgo); } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteService.java index 8a63fe868f..8059e31c30 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskDeleteService.java @@ -16,7 +16,6 @@ package org.springframework.cloud.dataflow.server.service.impl; -import javax.sql.DataSource; import java.util.Collection; import java.util.HashSet; import java.util.LinkedHashMap; @@ -30,6 +29,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import javax.sql.DataSource; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,7 +194,18 @@ private void performCleanupExecution(long id, String schemaTarget) { @Override @Transactional public void cleanupExecutions(Set actionsAsSet, String taskName, boolean completed) { - List tasks = this.taskExplorer.findTaskExecutionsByName(taskName, completed); + cleanupExecutions(actionsAsSet, taskName, completed, null); + } + + @Override + @Transactional + public void cleanupExecutions(Set actionsAsSet, String taskName, boolean completed, Integer days) { + List tasks; + if (days != null) { + tasks = this.taskExplorer.findTaskExecutionsBeforeEndTime(taskName, TaskServicesDateUtils.numDaysAgoFromLocalMidnightToday(days)); + } else { + tasks = this.taskExplorer.findTaskExecutions(taskName, completed); + } final Set parentExecutions = new HashSet<>(); final Set childExecutions = new HashSet<>(); boolean removeData = actionsAsSet.contains(TaskExecutionControllerDeleteAction.REMOVE_DATA); @@ -310,7 +322,7 @@ public void deleteTaskExecutions(Set taskExecutionIds, String schemaTarget @Override public void deleteTaskExecutions(String taskName, boolean onlyCompleted) { - Map> tasks = this.taskExplorer.findTaskExecutionsByName(taskName, onlyCompleted) + Map> tasks = this.taskExplorer.findTaskExecutions(taskName, onlyCompleted) .stream().collect(Collectors.groupingBy(AggregateTaskExecution::getSchemaTarget)); for (String schemaTarget : tasks.keySet()) { Set executionIds = tasks.get(schemaTarget) diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java index 546df03882..c51652da55 100644 --- a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionService.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -1030,9 +1031,17 @@ public Set getAllTaskExecutionIds(boolean onlyCompleted, String taskName) @Override public Integer getAllTaskExecutionsCount(boolean onlyCompleted, String taskName) { - return (int) ( - onlyCompleted ? dataflowTaskExecutionQueryDao.getCompletedTaskExecutionCountByTaskName(taskName) - : dataflowTaskExecutionQueryDao.getTaskExecutionCountByTaskName(taskName) - ); + return getAllTaskExecutionsCount(onlyCompleted, taskName, null); + } + + @Override + public Integer getAllTaskExecutionsCount(boolean onlyCompleted, String taskName, Integer days) { + if (days != null) { + Date dateBeforeDays = TaskServicesDateUtils.numDaysAgoFromLocalMidnightToday(days); + return (int) dataflowTaskExecutionQueryDao.getCompletedTaskExecutionCountByTaskNameAndBeforeDate(taskName, dateBeforeDays); + } else { + return (int) (onlyCompleted ? dataflowTaskExecutionQueryDao.getCompletedTaskExecutionCountByTaskName(taskName) + : dataflowTaskExecutionQueryDao.getTaskExecutionCountByTaskName(taskName)); + } } } diff --git a/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/TaskServicesDateUtils.java b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/TaskServicesDateUtils.java new file mode 100644 index 0000000000..ae88a243e1 --- /dev/null +++ b/spring-cloud-dataflow-server-core/src/main/java/org/springframework/cloud/dataflow/server/service/impl/TaskServicesDateUtils.java @@ -0,0 +1,47 @@ +/* + * Copyright 2016-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.dataflow.server.service.impl; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.Date; + +import org.springframework.lang.NonNull; + +/** + * Provides date functionality for the task services. + * + * @author Tobias Soloschenko + */ +final class TaskServicesDateUtils { + + private TaskServicesDateUtils() { + } + + /** + * Gets the date representation for the given number of days in the past. + * + * @param numDaysAgo the number of days ago + * @return the date for {@code numDaysAgo} from today at midnight (locally) + */ + public static Date numDaysAgoFromLocalMidnightToday(@NonNull Integer numDaysAgo) { + LocalDateTime localDateTime = LocalDateTime.of(LocalDate.now(), LocalTime.MIDNIGHT).minusDays(numDaysAgo); + return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()); + } +}