Skip to content

Commit

Permalink
Add support for Job instance and execution dao tests
Browse files Browse the repository at this point in the history
Updated the MariaDB schema to use sequences instead of tables for Boot 3 batch tables
  • Loading branch information
cppwfs committed Nov 28, 2023
1 parent 0398437 commit a0aa8a0
Show file tree
Hide file tree
Showing 11 changed files with 590 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public int countJobExecutionsForJob(String name, BatchStatus status) throws NoSu
}

private int countJobExecutions(String jobName, BatchStatus status) throws NoSuchJobException {
if (StringUtils.isEmpty(jobName)) {
if (!StringUtils.hasText(jobName)) {
if (status != null) {
return jobExecutionDao.countJobExecutions(status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,38 +141,6 @@ CREATE TABLE BOOT3_BATCH_JOB_EXECUTION_CONTEXT
);


CREATE TABLE BOOT3_BATCH_STEP_EXECUTION_SEQ
(
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
);

INSERT INTO BOOT3_BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BOOT3_BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BOOT3_BATCH_JOB_EXECUTION_SEQ
(
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
);

INSERT INTO BOOT3_BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BOOT3_BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BOOT3_BATCH_JOB_SEQ
(
ID BIGINT NOT NULL,
UNIQUE_KEY CHAR(1) NOT NULL,
constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
);

INSERT INTO BOOT3_BATCH_JOB_SEQ (ID, UNIQUE_KEY)
select *
from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
where not exists(select * from BOOT3_BATCH_JOB_SEQ);
-- ID generation for the Boot 3 batch tables now uses native database sequences
-- (replacing the legacy single-row table emulation).
-- MAXVALUE 9223372036854775806 is Long.MAX_VALUE - 1, so the incremented value
-- always fits in a Java long; NOCACHE/NOCYCLE keep values gapless-ish and strictly
-- increasing, matching what the Spring Batch sequence incrementers expect.
CREATE SEQUENCE BOOT3_BATCH_STEP_EXECUTION_SEQ START WITH 1 MINVALUE 1 MAXVALUE 9223372036854775806 INCREMENT BY 1 NOCACHE NOCYCLE;
CREATE SEQUENCE BOOT3_BATCH_JOB_EXECUTION_SEQ START WITH 1 MINVALUE 1 MAXVALUE 9223372036854775806 INCREMENT BY 1 NOCACHE NOCYCLE;
CREATE SEQUENCE BOOT3_BATCH_JOB_SEQ START WITH 1 MINVALUE 1 MAXVALUE 9223372036854775806 INCREMENT BY 1 NOCACHE NOCYCLE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.dataflow.server.batch;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.ext.ScriptUtils;
import org.testcontainers.jdbc.JdbcDatabaseDelegate;

/**
 * Shared test infrastructure for DAO tests that run against a
 * Testcontainers-managed database: builds a {@link DataSource} for a container
 * and installs the Data Flow schema from classpath SQL scripts.
 */
public class AbstractDaoTests {

	/**
	 * Creates a simple, non-pooled {@link DataSource} that connects to the given
	 * database container.
	 *
	 * @param dbContainer the running database container to connect to
	 * @return a {@link DriverManagerDataSource} configured with the container's
	 * driver class, JDBC URL and credentials
	 */
	protected DataSource createDataSourceForContainer(JdbcDatabaseContainer<?> dbContainer) {
		DriverManagerDataSource dataSource = new DriverManagerDataSource();
		dataSource.setDriverClassName(dbContainer.getDriverClassName());
		dataSource.setUrl(dbContainer.getJdbcUrl());
		dataSource.setUsername(dbContainer.getUsername());
		dataSource.setPassword(dbContainer.getPassword());
		return dataSource;
	}

	/**
	 * Drops any existing tables and installs the Data Flow schema inside the
	 * container by executing the classpath SQL scripts for the given schema name.
	 *
	 * @param dbContainer the running database container to initialize
	 * @param schemaName  the schema directory under {@code schemas/} to run
	 * @throws IOException if the schema resource directory cannot be read
	 */
	protected void createDataFlowSchema(JdbcDatabaseContainer<?> dbContainer, String schemaName) throws IOException {
		JdbcDatabaseDelegate containerDelegate = new JdbcDatabaseDelegate(dbContainer, "");
		ScriptUtils.runInitScript(containerDelegate, "schemas/drop-table-schema-" + schemaName + ".sql");

		// Only the "dataflow" scripts are executed; other files in the directory are skipped.
		getResourceFiles("schemas/" + schemaName).forEach(str -> {
			if (str.contains("dataflow")) {
				ScriptUtils.runInitScript(containerDelegate, "schemas/" + schemaName + "/" + str);
			}
		});
	}

	/**
	 * Lists the entry names under the given classpath directory by reading the
	 * directory resource line by line.
	 *
	 * @param path classpath directory to list
	 * @return the entry names, in the order the classloader reports them
	 * @throws IOException if the directory is missing from the classpath or cannot be read
	 */
	private List<String> getResourceFiles(String path) throws IOException {
		InputStream stream = getResourceFileAsStream(path);
		if (stream == null) {
			// Fail fast with a clear message instead of an NPE from InputStreamReader.
			throw new IOException("Resource directory not found on classpath: " + path);
		}
		List<String> fileNames = new ArrayList<>();
		// Explicit charset: resource listings should not depend on the platform default.
		try (BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
			String fileName;
			while ((fileName = br.readLine()) != null) {
				fileNames.add(fileName);
			}
		}
		return fileNames;
	}

	/**
	 * Opens the resource via the thread context classloader first (works in
	 * container/managed environments), falling back to this class's loader.
	 * Returns {@code null} when the resource cannot be found by either.
	 */
	private InputStream getResourceFileAsStream(String resourceFile) {
		InputStream stream = Thread.currentThread().getContextClassLoader().getResourceAsStream(resourceFile);
		return stream == null ? getClass().getResourceAsStream(resourceFile) : stream;
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.dataflow.server.batch;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.repository.dao.AbstractJdbcBatchMetadataDao;
import org.springframework.batch.core.repository.dao.JdbcJobExecutionDao;
import org.springframework.batch.core.repository.dao.JdbcStepExecutionDao;
import org.springframework.batch.item.database.support.DataFieldMaxValueIncrementerFactory;
import org.springframework.cloud.dataflow.core.database.support.DatabaseType;
import org.springframework.cloud.dataflow.core.database.support.MultiSchemaIncrementerFactory;
import org.springframework.cloud.task.batch.listener.support.JdbcTaskBatchDao;
import org.springframework.cloud.task.repository.TaskRepository;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.containers.JdbcDatabaseContainer;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

/**
 * Base class for testing {@code JdbcSearchableJobExecutionDao} against a
 * Testcontainers-managed database. Subclasses call
 * {@link #setupSearchableExecutionDaoTest} with a concrete container, schema
 * name and {@code DatabaseType} before the tests run.
 *
 * NOTE(review): the tests assume the batch tables start empty for each test
 * run (several assertions count executions globally) — confirm that subclasses
 * recreate the schema per test.
 */
public abstract class AbstractJdbcJobSearchableExecutionDaoTests extends AbstractDaoTests {

// Prefix used for all job instance names created by these tests.
private static final String BASE_JOB_INSTANCE_NAME = "JOB_INST_";

// DAO under test: searchable job execution queries.
protected JdbcSearchableJobExecutionDao jdbcSearchableJobExecutionDao;

// DAO under test: searchable job instance queries; also used to create fixtures.
protected JdbcSearchableJobInstanceDao jdbcSearchableJobInstanceDao;

protected DataSource dataSource;

// Produces sequence-backed incrementers matching the schema installed by setup.
protected DataFieldMaxValueIncrementerFactory incrementerFactory;

// Plain Spring Batch DAO used to persist job execution fixtures.
protected JdbcJobExecutionDao jobExecutionDao;

// Plain Spring Batch DAO used to persist step execution fixtures.
protected JdbcStepExecutionDao stepExecutionDao;

protected JdbcTemplate jdbcTemplate;

// NOTE(review): not referenced in this class; presumably wired/used by subclasses — confirm.
protected TaskRepository taskRepository;

// NOTE(review): not referenced in this class; presumably wired/used by subclasses — confirm.
protected JdbcTaskBatchDao jdbcTaskBatchDao;

/**
 * Creates the schema inside the container and wires all DAOs against it.
 * The wiring order matters: the schema must exist before the incrementer
 * factory and DAOs are initialized.
 *
 * @param dbContainer  running database container
 * @param schemaName   schema directory name passed to createDataFlowSchema
 * @param databaseType database type used to select the matching incrementer
 * @throws Exception if schema creation or DAO initialization fails
 */
public void setupSearchableExecutionDaoTest(JdbcDatabaseContainer dbContainer, String schemaName,
DatabaseType databaseType) throws Exception {
this.dataSource = createDataSourceForContainer(dbContainer);
this.jdbcTemplate = new JdbcTemplate(this.dataSource);
createDataFlowSchema(dbContainer, schemaName);

this.jdbcSearchableJobExecutionDao = new JdbcSearchableJobExecutionDao();
this.jdbcSearchableJobExecutionDao.setDataSource(this.dataSource);
this.jdbcSearchableJobExecutionDao.afterPropertiesSet();
this.jdbcSearchableJobInstanceDao = new JdbcSearchableJobInstanceDao();
this.jdbcSearchableJobInstanceDao.setJdbcTemplate(this.jdbcTemplate);
incrementerFactory = new MultiSchemaIncrementerFactory(dataSource);

// Incrementers use the Spring Batch default table prefix; the suffixes must
// match the *_SEQ sequences created by the schema scripts.
this.jdbcSearchableJobInstanceDao.setJobIncrementer(incrementerFactory.getIncrementer(databaseType.name(),
AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX + "JOB_SEQ"));
jobExecutionDao = new JdbcJobExecutionDao();
jobExecutionDao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(databaseType.name(),
AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX + "JOB_EXECUTION_SEQ"));
this.jobExecutionDao.setJdbcTemplate(new JdbcTemplate(this.dataSource));
jobExecutionDao.afterPropertiesSet();
this.stepExecutionDao = new JdbcStepExecutionDao();
this.stepExecutionDao.setJdbcTemplate(this.jdbcTemplate);
this.stepExecutionDao.setStepExecutionIncrementer(incrementerFactory.getIncrementer(databaseType.name(),
AbstractJdbcBatchMetadataDao.DEFAULT_TABLE_PREFIX + "STEP_EXECUTION_SEQ"));
this.stepExecutionDao.afterPropertiesSet();
}

// Global execution count goes from 0 to 1 after saving a single execution.
@Test
public void testJobExecutionCount() {
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions()).isEqualTo(0);
JobInstance jobInstance = jdbcSearchableJobInstanceDao.createJobInstance(BASE_JOB_INSTANCE_NAME,
new JobParameters());
jobExecutionDao.saveJobExecution(new JobExecution(jobInstance, new JobParameters()));
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions()).isEqualTo(1);
}

// Name + status query returns only COMPLETED executions for that name,
// capped by the requested page size; FAILED executions under another name are excluded.
@Test
public void testGetCompletedJobExecutionsByType() {
String suffix = "_BY_NAME";
assertThat(this.jdbcSearchableJobExecutionDao.getJobExecutions(BASE_JOB_INSTANCE_NAME + suffix,
BatchStatus.COMPLETED, 0, 5).size()).isEqualTo(0);
createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix, 7);
createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix + "_FAILED", BatchStatus.FAILED, 5);

assertThat(this.jdbcSearchableJobExecutionDao.getJobExecutions(BASE_JOB_INSTANCE_NAME + suffix,
BatchStatus.COMPLETED, 0, 7).size()).isEqualTo(7);
assertThat(this.jdbcSearchableJobExecutionDao.getJobExecutions(BASE_JOB_INSTANCE_NAME + suffix,
BatchStatus.COMPLETED, 0, 5).size()).isEqualTo(5);
}

// Name-only query returns all executions for that exact name regardless of status.
@Test
public void testGetJobExecutions() {
String suffix = "_BY_NAME";
String suffixFailed = suffix + "_FAILED";
assertThat(this.jdbcSearchableJobExecutionDao.getJobExecutions(BASE_JOB_INSTANCE_NAME + suffix,
BatchStatus.COMPLETED, 0, 5).size()).isEqualTo(0);
createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix, 5);
createJobExecutions(BASE_JOB_INSTANCE_NAME + suffixFailed, BatchStatus.FAILED, 7);

assertThat(this.jdbcSearchableJobExecutionDao.getJobExecutions(BASE_JOB_INSTANCE_NAME + suffix,
0, 20).size()).isEqualTo(5);
assertThat(this.jdbcSearchableJobExecutionDao.getJobExecutions(BASE_JOB_INSTANCE_NAME + suffixFailed,
0, 20).size()).isEqualTo(7);

}

// Count by job name only.
@Test
public void testJobExecutionCountByName() {
String suffix = "COUNT_BY_NAME";
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions(BASE_JOB_INSTANCE_NAME + suffix))
.isEqualTo(0);
createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix, 5);
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions(BASE_JOB_INSTANCE_NAME + suffix))
.isEqualTo(5);
}

// Count by status only. NOTE(review): counts across ALL job names — relies on
// no other COMPLETED executions existing in the schema when the test starts.
@Test
public void testJobExecutionCountByStatus() {
String suffix = "_COUNT_BY_NAME";
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions(BatchStatus.COMPLETED)).isEqualTo(0);
createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix, 5);
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions(BatchStatus.COMPLETED)).isEqualTo(5);
}

// Count by job name and status combined.
@Test
public void testJobExecutionCountByNameAndStatus() {
String suffix = "_COUNT_BY_NAME_STATUS";
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions(BASE_JOB_INSTANCE_NAME + suffix,
BatchStatus.COMPLETED)).isEqualTo(0);
createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix, 5);
assertThat(this.jdbcSearchableJobExecutionDao.countJobExecutions(BASE_JOB_INSTANCE_NAME + suffix,
BatchStatus.COMPLETED)).isEqualTo(5);
}

// Each fixture execution is created with 3 persisted steps (see createJobExecutions),
// so every returned row should report a step count of 3.
@Test
public void testJobExecutionsWithStepCount() {
String suffix = "_JOB_EXECUTIONS_WITH_STEP_COUNT";

createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix, 5);

List<JobExecutionWithStepCount> jobExecutionsWithStepCount =
this.jdbcSearchableJobExecutionDao.getJobExecutionsWithStepCount(0, 20);
assertThat(jobExecutionsWithStepCount.size()).isEqualTo(5);
assertThat(jobExecutionsWithStepCount.get(0).getStepCount()).isEqualTo(3);
}

// Same as above but filtered by a single job instance id.
// NOTE(review): getInstanceId() returns a long that is narrowed to int here —
// fine for test data, but confirm the DAO's int parameter is intentional.
@Test
public void testJobExecutionsWithStepCountByJobInstance() {
String suffix = "_JOB_EXECUTIONS_WITH_STEP_COUNT_BY_JOB_INSTANCE";

createJobExecutions(BASE_JOB_INSTANCE_NAME + suffix, 5);
JobInstance jobInstance = this.jdbcSearchableJobInstanceDao.getJobInstances(
BASE_JOB_INSTANCE_NAME + suffix, 0, 5).get(0);

List<JobExecutionWithStepCount> jobExecutionsWithStepCount =
this.jdbcSearchableJobExecutionDao.getJobExecutionsWithStepCountFilteredByJobInstanceId((int) jobInstance
.getInstanceId(), 0, 10);
assertThat(jobExecutionsWithStepCount.size()).isEqualTo(5);
assertThat(jobExecutionsWithStepCount.get(0).getStepCount()).isEqualTo(3);
}

/**
 * Convenience overload: creates {@code numberOfJobs} COMPLETED executions.
 */
private List<JobExecution> createJobExecutions(String name, int numberOfJobs) {
return createJobExecutions(name, BatchStatus.COMPLETED, numberOfJobs);
}

/**
 * Creates one job instance named {@code name} and {@code numberOfJobs}
 * executions for it with the given status. Each execution gets three
 * in-memory step executions and three persisted ones (StepOne/Two/Three).
 *
 * @return the saved executions, in creation order
 */
private List<JobExecution> createJobExecutions(String name, BatchStatus batchStatus, int numberOfJobs) {
List<JobExecution> jobExecutions = new ArrayList<>();
JobInstance jobInstance = jdbcSearchableJobInstanceDao.createJobInstance(name, new JobParameters());

for (int i = 0; i < numberOfJobs; i++) {
JobExecution jobExecution = new JobExecution(jobInstance, new JobParameters());
jobExecution.setStatus(batchStatus);
jobExecution.createStepExecution("StepOne");
jobExecution.createStepExecution("StepTwo");
jobExecution.createStepExecution("StepThree");
jobExecutionDao.saveJobExecution(jobExecution);
// Persist the step rows separately so step counts are visible to the queries.
StepExecution stepExecution = new StepExecution("StepOne", jobExecution);
this.stepExecutionDao.saveStepExecution(stepExecution);
stepExecution = new StepExecution("StepTwo", jobExecution);
this.stepExecutionDao.saveStepExecution(stepExecution);
stepExecution = new StepExecution("StepThree", jobExecution);
this.stepExecutionDao.saveStepExecution(stepExecution);
jobExecutions.add(jobExecution);
}
return jobExecutions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.dataflow.server.batch;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.JobParameters;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testcontainers.containers.JdbcDatabaseContainer;

import javax.sql.DataSource;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

/**
 * Base class for testing {@code JdbcSearchableJobInstanceDao} against a
 * Testcontainers-managed database. Subclasses invoke
 * {@link #setupSearchableExecutionDaoTest} with a concrete container and
 * schema name before the tests run.
 */
public abstract class AbstractJdbcJobSearchableInstanceDaoTests extends AbstractDaoTests {

	// Job instance name used by every test in this class.
	private static final String BASE_JOB_INST_NAME = "JOB_INST_";

	private DataSource dataSource;

	public JdbcSearchableJobInstanceDao jdbcSearchableJobInstanceDao;

	/**
	 * Installs the Data Flow schema into the container and wires the DAO under
	 * test against a {@link DataSource} pointing at that container.
	 *
	 * @param dbContainer running database container
	 * @param schemaName  schema directory name to install
	 * @throws Exception if schema creation or DAO initialization fails
	 */
	public void setupSearchableExecutionDaoTest(JdbcDatabaseContainer dbContainer, String schemaName) throws Exception {
		this.dataSource = createDataSourceForContainer(dbContainer);
		createDataFlowSchema(dbContainer, schemaName);

		JdbcSearchableJobInstanceDao dao = new JdbcSearchableJobInstanceDao();
		dao.setJdbcTemplate(new JdbcTemplate(this.dataSource));
		dao.afterPropertiesSet();
		this.jdbcSearchableJobInstanceDao = dao;
	}

	// The count for a name is zero before any instance exists and one after a
	// single instance with that name has been created.
	@Test
	public void testCountJobInstances() {
		assertThat(this.jdbcSearchableJobInstanceDao.countJobInstances(BASE_JOB_INST_NAME)).isEqualTo(0);

		this.jdbcSearchableJobInstanceDao.createJobInstance(BASE_JOB_INST_NAME, new JobParameters());

		assertThat(this.jdbcSearchableJobInstanceDao.countJobInstances(BASE_JOB_INST_NAME)).isEqualTo(1);
	}
}
Loading

0 comments on commit a0aa8a0

Please sign in to comment.