Skip to content

Commit

Permalink
Polish "Add async task execution cleanup"
Browse files Browse the repository at this point in the history
  • Loading branch information
onobc committed Nov 1, 2023
1 parent 79845a0 commit ecd86af
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 74 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/ci-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ jobs:
- name: Build
run: |
mvn -B -s .github/settings.xml clean install
- name: Test Report
uses: dorny/test-reporter@v1
if: ${{ success() || failure() }}
- name: Capture Test Results
if: failure()
uses: actions/upload-artifact@v3
with:
name: Unit Tests
path: '**/surefire-reports/*.xml'
reporter: java-junit
list-tests: failed
retention-days: 3
# clean m2 cache
- name: Clean cache
run: |
Expand Down
5 changes: 5 additions & 0 deletions spring-cloud-dataflow-server-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mariadb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,51 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.cloud.dataflow.core.DataFlowPropertyKeys;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import static org.springframework.cloud.dataflow.server.config.DataflowAsyncConfiguration.ASYNC_PREFIX;
import static org.springframework.cloud.dataflow.server.config.DataflowAsyncAutoConfiguration.ASYNC_PROPS_PREFIX;

/**
* Class to override the executor at the application level. It also enables async executions for the Spring Cloud Data Flow Server.
* Enables async executions for the Spring Cloud Dataflow server.
* Uses the Spring Boot autoconfigured {@code TaskExecutorBuilder} to create an async executor and register it
* with name {@link #DATAFLOW_ASYNC_EXECUTOR}.
*
* @author Tobias Soloschenko
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = ASYNC_PREFIX, name = "enabled")
@ConditionalOnBean({EnableDataFlowServerConfiguration.Marker.class})
@ConditionalOnProperty(prefix = ASYNC_PROPS_PREFIX, name = "enabled", havingValue = "true")
@AutoConfigureAfter(TaskExecutionAutoConfiguration.class)
@EnableAsync
class DataflowAsyncConfiguration implements AsyncConfigurer {
public class DataflowAsyncAutoConfiguration implements AsyncConfigurer {

private static final Logger logger = LoggerFactory.getLogger(DataflowAsyncConfiguration.class);
private static final Logger logger = LoggerFactory.getLogger(DataflowAsyncAutoConfiguration.class);

public static final String ASYNC_PREFIX = DataFlowPropertyKeys.PREFIX + "async";
public static final String ASYNC_PROPS_PREFIX = DataFlowPropertyKeys.PREFIX + "async";

public static final String DATAFLOW_ASYNC_EXECUTOR = "dataflowAsyncExecutor";

private static final String THREAD_NAME_PREFIX = "scdf-async-";

@Bean(name = "asyncExecutor")
Executor getAsyncExecutor(TaskExecutorBuilder taskExecutorBuilder) {
return taskExecutorBuilder.threadNamePrefix(THREAD_NAME_PREFIX).build();
private final TaskExecutorBuilder taskExecutorBuilder;

public DataflowAsyncAutoConfiguration(TaskExecutorBuilder taskExecutorBuilder) {
this.taskExecutorBuilder = taskExecutorBuilder;
}

@Bean(name = DATAFLOW_ASYNC_EXECUTOR)
@Override
public Executor getAsyncExecutor() {
return this.taskExecutorBuilder.threadNamePrefix(THREAD_NAME_PREFIX).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.springframework.cloud.dataflow.rest.util.TaskSanitizer;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.config.DataflowAsyncAutoConfiguration;
import org.springframework.cloud.dataflow.server.controller.support.TaskExecutionControllerDeleteAction;
import org.springframework.cloud.dataflow.server.repository.NoSuchTaskDefinitionException;
import org.springframework.cloud.dataflow.server.repository.NoSuchTaskExecutionException;
Expand Down Expand Up @@ -337,6 +338,9 @@ public void cleanup(
* Cleanup resources associated with one or more task executions. The
* optional {@code actions} and {@code completed} parameters can be used to not only clean up task execution resources,
* but can also trigger the deletion of task execution and job data in the persistence store.
* <p>
* When the {@code spring.cloud.dataflow.async.enabled} property is set to {@code true} the cleanup will happen
* asynchronously.
*
* @param actions the actions to perform (default 'CLEANUP')
* @param completed whether to include only completed task executions (default false)
Expand All @@ -345,7 +349,7 @@ public void cleanup(
*/
@RequestMapping(method = RequestMethod.DELETE)
@ResponseStatus(HttpStatus.OK)
@Async
@Async(DataflowAsyncAutoConfiguration.DATAFLOW_ASYNC_EXECUTOR)
public void cleanupAll(
@RequestParam(defaultValue = "CLEANUP", name = "action") TaskExecutionControllerDeleteAction[] actions,
@RequestParam(defaultValue = "false", name = "completed") boolean completed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.dataflow.server.config.DataFlowServerAutoConfiguration,\
org.springframework.cloud.dataflow.server.config.DataFlowControllerAutoConfiguration, \
org.springframework.cloud.dataflow.server.config.SpringDocAutoConfiguration, \
org.springframework.cloud.dataflow.server.config.DataflowAsyncConfiguration
org.springframework.cloud.dataflow.server.config.DataflowAsyncAutoConfiguration

org.springframework.context.ApplicationContextInitializer=\
org.springframework.cloud.dataflow.common.flyway.FlywayVendorReplacingApplicationContextInitializer
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.dataflow.server.controller;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.batch.BatchProperties;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase.Replace;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.dataflow.aggregate.task.AggregateExecutionSupport;
import org.springframework.cloud.dataflow.aggregate.task.TaskDefinitionReader;
import org.springframework.cloud.dataflow.core.Launcher;
import org.springframework.cloud.dataflow.core.TaskDefinition;
import org.springframework.cloud.dataflow.core.TaskDeployment;
import org.springframework.cloud.dataflow.core.TaskPlatform;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;
import org.springframework.cloud.dataflow.server.config.DataflowAsyncAutoConfiguration;
import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties;
import org.springframework.cloud.dataflow.server.configuration.JobDependencies;
import org.springframework.cloud.dataflow.server.job.LauncherRepository;
import org.springframework.cloud.dataflow.server.repository.TaskDefinitionRepository;
import org.springframework.cloud.dataflow.server.repository.TaskDeploymentRepository;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.dao.TaskExecutionDao;
import org.springframework.http.MediaType;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

/**
* Unit tests for the {@link TaskExecutionController#cleanupAll async cleanup} API.
*
* @author Chris Bono
*/
@SpringBootTest(
properties = "spring.cloud.dataflow.async.enabled=true",
classes = { JobDependencies.class, TaskExecutionAutoConfiguration.class, DataflowAsyncAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class, BatchProperties.class})
@EnableConfigurationProperties({CommonApplicationProperties.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@AutoConfigureTestDatabase(replace = Replace.ANY)
@EnableDataFlowServer
public class TaskExecutionControllerCleanupAsyncTests {

@Autowired
private TaskExecutionDaoContainer daoContainer;

@Autowired
private TaskDefinitionRepository taskDefinitionRepository;

private MockMvc mockMvc;

@Autowired
private WebApplicationContext wac;

@Autowired
private AggregateExecutionSupport aggregateExecutionSupport;

@Autowired
private TaskLauncher taskLauncher;

@Autowired
private LauncherRepository launcherRepository;

@Autowired
private TaskPlatform taskPlatform;

@Autowired
private TaskDeploymentRepository taskDeploymentRepository;

@Autowired
TaskDefinitionReader taskDefinitionReader;

@BeforeEach
public void setupMockMVC() {
assertThat(this.launcherRepository.findByName("default")).isNull();
Launcher launcher = new Launcher("default", "local", taskLauncher);
launcherRepository.save(launcher);
taskPlatform.setLaunchers(Collections.singletonList(launcher));
this.mockMvc = MockMvcBuilders.webAppContextSetup(wac)
.defaultRequest(get("/").accept(MediaType.APPLICATION_JSON)).build();
}

@Test
void cleanupAll() throws Exception {
String taskExecutionId = "asyncCleanupAllTaskExecId";
setupTaskExecutions("asyncCleanupAllTaskName", taskExecutionId);
mockMvc.perform(delete("/tasks/executions"))
.andDo(print())
.andExpect(status().is(200));
verify(taskLauncher, times(0)).cleanup(taskExecutionId);
Awaitility.await()
.atMost(Duration.ofSeconds(3))
.untilAsserted(() -> verify(taskLauncher, times(2)).cleanup(taskExecutionId));
}

private void setupTaskExecutions(String taskName, String taskExecutionId) {
taskDefinitionRepository.save(new TaskDefinition(taskName, "taskDslGoesHere"));
SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskName, taskDefinitionReader);
TaskExecutionDao taskExecutionDao = daoContainer.get(schemaVersionTarget.getName());

List<String> taskArgs = new ArrayList<>();
taskArgs.add("foo=bar");
TaskExecution taskExecution1 = taskExecutionDao.createTaskExecution(taskName, new Date(), taskArgs, taskExecutionId);
taskExecutionDao.createTaskExecution(taskName, new Date(), taskArgs, taskExecutionId, taskExecution1.getExecutionId());

TaskDeployment taskDeployment = new TaskDeployment();
taskDeployment.setTaskDefinitionName(taskName);
taskDeployment.setTaskDeploymentId(taskExecutionId);
taskDeployment.setPlatformName("default");
taskDeployment.setCreatedOn(Instant.now());
taskDeploymentRepository.save(taskDeployment);
}

}
Loading

0 comments on commit ecd86af

Please sign in to comment.