Skip to content

Commit

Permalink
Provide for correctly resolving app registration from version supplie…
Browse files Browse the repository at this point in the history
…d during launch.

Fixes #5467
  • Loading branch information
corneil committed Sep 15, 2023
1 parent fa835fa commit 6f6a096
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ public interface AggregateExecutionSupport {
* @return The {@link SchemaVersionTarget} for the taskName specified.
*/
SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefinitionReader taskDefinitionReader);
SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinitionReader taskDefinitionReader);
SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefinition taskDefinition);
SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinition taskDefinition);

/**
* Retrieve the {@link AppRegistration} for the registeredName.
*/
AppRegistration findTaskAppRegistration(String registeredName);
AppRegistration findTaskAppRegistration(String registeredName, String version);

/**
* Return the {@link AggregateTaskExecution} for the {@link TaskExecution} and Schema Target name specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*/

public class DefaultAggregateExecutionSupport implements AggregateExecutionSupport {
private static Logger logger = LoggerFactory.getLogger(AggregateExecutionSupport.class);
private static final Logger logger = LoggerFactory.getLogger(AggregateExecutionSupport.class);

private final AppRegistryService registryService;

Expand Down Expand Up @@ -80,12 +80,29 @@ public SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefiniti
return findSchemaVersionTarget(taskName, definition);
}

@Override
public SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinitionReader taskDefinitionReader) {
logger.debug("findSchemaVersionTarget:{}:{}", taskName, version);
TaskDefinition definition = taskDefinitionReader.findTaskDefinition(taskName);
return findSchemaVersionTarget(taskName, version, definition);
}

@Override
public SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefinition taskDefinition) {
return findSchemaVersionTarget(taskName, null, taskDefinition);
}

@Override
public SchemaVersionTarget findSchemaVersionTarget(String taskName, String version, TaskDefinition taskDefinition) {
logger.debug("findSchemaVersionTarget:{}:{}", taskName, version);
String registeredName = taskDefinition != null ? taskDefinition.getRegisteredAppName() : taskName;
AppRegistration registration = findTaskAppRegistration(registeredName);
AppRegistration registration = findTaskAppRegistration(registeredName, version);
if (registration == null) {
logger.warn("Cannot find AppRegistration for {}", taskName);
if(StringUtils.hasLength(version)) {
logger.warn("Cannot find AppRegistration for {}:{}", taskName, version);
} else {
logger.warn("Cannot find AppRegistration for {}", taskName);
}
return SchemaVersionTarget.defaultTarget();
}
final AppRegistration finalRegistration = registration;
Expand All @@ -101,16 +118,26 @@ public SchemaVersionTarget findSchemaVersionTarget(String taskName, TaskDefiniti
throw new IllegalStateException("Multiple SchemaVersionTargets for " + registration.getBootVersion());
}
SchemaVersionTarget schemaVersionTarget = versionTargets.get(0);
logger.debug("findSchemaVersionTarget:{}={},{}", taskName, registeredName, schemaVersionTarget);
logger.debug("findSchemaVersionTarget:{}:{}:{}={}", taskName, registeredName, version, schemaVersionTarget);
return schemaVersionTarget;
}

@Override
public AppRegistration findTaskAppRegistration(String registeredAppName) {
AppRegistration registration = registryService.find(registeredAppName, ApplicationType.task);
public AppRegistration findTaskAppRegistration(String registeredName) {
return findTaskAppRegistration(registeredName, null);
}

@Override
public AppRegistration findTaskAppRegistration(String registeredAppName, String version) {
AppRegistration registration = StringUtils.hasLength(version) ?
registryService.find(registeredAppName, ApplicationType.task, version) :
registryService.find(registeredAppName, ApplicationType.task);
if (registration == null) {
registration = registryService.find(registeredAppName, ApplicationType.app);
registration = StringUtils.hasLength(version) ?
registryService.find(registeredAppName, ApplicationType.app, version) :
registryService.find(registeredAppName, ApplicationType.app);
}
logger.debug("findTaskAppRegistration:{}:{}={}", registeredAppName, version, registration);
return registration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public ApplicationType getApplicationType() {
return applicationType;
}

public AppDefinition getAppDefinition() { return appDefinition; }

@Override
public boolean equals(Object obj) {
if (this == obj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public AppRegistration find(String name, ApplicationType type) {

@Override
public AppRegistration find(String name, ApplicationType type, String version) {
return this.appRegistrationRepository.findAppRegistrationByNameAndTypeAndVersion(name, type, version);
AppRegistration registration = this.appRegistrationRepository.findAppRegistrationByNameAndTypeAndVersion(name, type, version);
logger.debug("find:{}:{}:{}={}", name, type, version, registration);
return registration;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ public interface TaskExecutionCreationService {
* @param taskName the name to be associated with the {@link TaskExecution}
* @return {@link TaskExecution}
*/
TaskExecution createTaskExecution(String taskName);
TaskExecution createTaskExecution(String taskName, String version);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ TaskExecutionInformation findTaskExecutionInformation(String taskName,
List<AppDeploymentRequest> createTaskDeploymentRequests(String taskName, String dslText);

Set<String> composedTaskChildNames(String taskName);
Set<String> taskNames(String taskName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,12 @@ public void schedule(
}

String taskAppName = taskDefinition.getRegisteredAppName();
SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskAppName, taskDefinitionReader);
String taskLabel = taskDefinition.getAppDefinition().getName();
if(!StringUtils.hasText(taskLabel)) {
taskLabel = taskAppName;
}
String version = taskDeploymentProperties.get("version." + taskLabel);
SchemaVersionTarget schemaVersionTarget = aggregateExecutionSupport.findSchemaVersionTarget(taskAppName, version, taskDefinition);
Assert.notNull(schemaVersionTarget, "schemaVersionTarget not found for " + taskAppName);
TaskParser taskParser = new TaskParser(taskDefinition.getName(), taskDefinition.getDslText(), true, true);
TaskNode taskNode = taskParser.parse();
Expand Down Expand Up @@ -258,8 +263,15 @@ public void schedule(
if (names.size() > 1) {
appId = names.get(1);
}
SchemaVersionTarget appSchemaTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(registeredName, taskDefinitionReader);
logger.debug("ctr:{}:registeredName={}, schemaTarget={}", names, registeredName, appSchemaTarget.getName());
String appVersion = taskDeploymentProperties.get("version." + taskAppName + "-" + appId + "." + appId);
if(!StringUtils.hasText(appVersion)) {
appVersion = taskDeploymentProperties.get("version." + taskAppName + "-" + appId);
}
if(!StringUtils.hasText(appVersion)) {
appVersion = taskDeploymentProperties.get("version." + appId);
}
SchemaVersionTarget appSchemaTarget = this.aggregateExecutionSupport.findSchemaVersionTarget(registeredName, appVersion, taskDefinitionReader);
logger.debug("ctr:{}:registeredName={}, version={}, schemaTarget={}", names, registeredName, appVersion, appSchemaTarget.getName());
taskDeploymentProperties.put("app.composed-task-runner.composed-task-app-properties.app." + scheduleName + "-" + appId + ".spring.cloud.task.tablePrefix",
appSchemaTarget.getTaskPrefix());
taskDeploymentProperties.put("app.composed-task-runner.composed-task-app-properties.app." + appId + ".spring.cloud.task.tablePrefix",
Expand Down Expand Up @@ -437,9 +449,7 @@ private Launcher getTaskLauncher(String platformName) {
private List<Launcher> getLaunchers() {
List<Launcher> launchers = new ArrayList<>();
for (TaskPlatform taskPlatform : this.taskPlatforms) {
for (Launcher launcher : taskPlatform.getLaunchers()) {
launchers.add(launcher);
}
launchers.addAll(taskPlatform.getLaunchers());
}
return launchers;
}
Expand Down
Loading

0 comments on commit 6f6a096

Please sign in to comment.