Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add get current job #592 #632

Merged
merged 15 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public final List<ScheduledRepairJobView> getCurrentRepairJobs()
return myDelegateRepairSchedulerImpl.getCurrentRepairJobs();
}

@Override
public final String getCurrentJobStatus()
{
return myScheduleManager.getCurrentJobStatus();
}

@ObjectClassDefinition
public @interface Configuration
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public class ScheduleManagerService implements ScheduleManager
unbind = "unbindRunPolicy")
private final Set<RunPolicy> myRunPolicies = Sets.newConcurrentHashSet();

@Override
public final String getCurrentJobStatus()
{
return myDelegateSchedulerManager.getCurrentJobStatus();
}

@Reference (service = LockFactory.class,
cardinality = ReferenceCardinality.MANDATORY,
policy = ReferencePolicy.STATIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ public interface RepairScheduler
* @return the list of the currently scheduled repair jobs.
*/
List<ScheduledRepairJobView> getCurrentRepairJobs();

/**
* Retrieves the current status of the job being managed by this scheduler.
* <p>
* It's intended for monitoring and logging purposes, allowing users to query the job's current state
* without affecting its execution.
*
* @return A {@code String} representing the current status of the job.
*/
String getCurrentJobStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ private RepairSchedulerImpl(final Builder builder)
myRepairHistory = builder.myRepairHistory;
}

@Override
public String getCurrentJobStatus()
{
return myScheduleManager.getCurrentJobStatus();
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,14 @@ public interface ScheduleManager
* The job to deschedule.
*/
void deschedule(ScheduledJob job);

/**
* Retrieves the current status of the job being managed by this scheduler.
* <p>
* It's intended for monitoring and logging purposes, allowing users to query the job's current state
* without affecting its execution.
*
* @return A {@code String} representing the current status of the job.
*/
String getCurrentJobStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -38,6 +39,7 @@ public final class ScheduleManagerImpl implements ScheduleManager, Closeable
static final long DEFAULT_RUN_DELAY_IN_MS = TimeUnit.SECONDS.toMillis(30);

private final ScheduledJobQueue myQueue = new ScheduledJobQueue(new DefaultJobComparator());
private final AtomicReference<ScheduledJob> currentExecutingJob = new AtomicReference<>();
private final Set<RunPolicy> myRunPolicies = Sets.newConcurrentHashSet();
private final ScheduledFuture<?> myRunFuture;

Expand All @@ -55,6 +57,20 @@ private ScheduleManagerImpl(final Builder builder)
TimeUnit.MILLISECONDS);
}

@Override
public String getCurrentJobStatus()
{
ScheduledJob job = currentExecutingJob.get();
if (job != null)
{
String jobId = job.getId().toString();
return "Running Job - ID: " + jobId + ", Status: " + "Running";
DanielwEriksson marked this conversation as resolved.
Show resolved Hide resolved
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
return "No job is currently running";
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
}
}
public boolean addRunPolicy(final RunPolicy runPolicy)
{
LOG.debug("Run policy {} added", runPolicy);
Expand Down Expand Up @@ -151,11 +167,16 @@ private void tryRunNext()
{
for (ScheduledJob next : myQueue)
{
if (validate(next) && tryRunTasks(next))
if (validate(next))
{
break;
currentExecutingJob.set(next); // Set the currently executing job
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
if (tryRunTasks(next))
{
break;
}
}
}
currentExecutingJob.set(null); // Reset after trying to run a job
DanielwEriksson marked this conversation as resolved.
Show resolved Hide resolved
}

private boolean validate(final ScheduledJob job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.ScheduledJobException;
Expand All @@ -29,6 +30,11 @@ public DummyJob(Priority priority)
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build());
}

public DummyJob(Priority priority, UUID jobId)
{
super(new ConfigurationBuilder().withPriority(priority).build(), jobId);
}

public boolean hasRun()
{
return hasRun;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.*;
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -241,6 +239,25 @@ public void testDescheduleRunningJob() throws InterruptedException
assertThat(myScheduler.getQueueSize()).isEqualTo(0);
}

@Test
public void testGetCurrentJobStatus() throws InterruptedException
{
CountDownLatch latch = new CountDownLatch(1);
UUID jobId = UUID.randomUUID();
ScheduledJob testJob = new TestScheduledJob(
new ScheduledJob.ConfigurationBuilder()
.withPriority(ScheduledJob.Priority.LOW)
.withRunInterval(1, TimeUnit.SECONDS)
.build(),
jobId,
latch);
myScheduler.schedule(testJob);
new Thread(() -> myScheduler.run()).start();
Thread.sleep(100);
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
assertThat(myScheduler.getCurrentJobStatus()).contains("Running Job - ID: " + jobId.toString());
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
latch.countDown();

}
private void waitForJobStarted(TestJob job) throws InterruptedException
{
while(!job.hasStarted())
Expand All @@ -266,6 +283,7 @@ private class TestJob extends ScheduledJob
private final int numTasks;
private final Runnable onCompletion;


public TestJob(Priority priority, CountDownLatch cdl)
{
this(priority, cdl, 1, () -> {});
Expand Down Expand Up @@ -350,4 +368,41 @@ public boolean execute()
}
}
}

public class TestScheduledJob extends ScheduledJob
{
private final CountDownLatch taskCompletionLatch;
public TestScheduledJob(Configuration configuration, UUID id, CountDownLatch taskCompletionLatch)
{
super(configuration, id);
this.taskCompletionLatch = taskCompletionLatch;
}
@Override
public Iterator<ScheduledTask> iterator()
{
return Collections.<ScheduledTask> singleton(new ControllableTask(taskCompletionLatch)).iterator();
}
class ControllableTask extends ScheduledTask
{
private final CountDownLatch latch;
public ControllableTask(CountDownLatch latch)
{
this.latch = latch;
}
@Override
public boolean execute()
{
try
{
latch.await();
return true;
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return false;
}
}
}
}
}
17 changes: 17 additions & 0 deletions ecchronos-binary/src/bin/ecctool.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ def get_parser():
add_start_subcommand(sub_parsers)
add_stop_subcommand(sub_parsers)
add_status_subcommand(sub_parsers)
add_running_job_subcommand(sub_parsers)

return parser

def add_running_job_subcommand(sub_parsers):
parser_repairs = sub_parsers.add_parser("running-job", description="Show which (if any) job is currently running ")

parser_repairs.add_argument("-u", "--url", type=str,
help="The ecChronos host to connect to, specified in the format http://<host>:<port>.",
default=None)


def add_repairs_subcommand(sub_parsers):
parser_repairs = sub_parsers.add_parser("repairs",
Expand Down Expand Up @@ -369,6 +377,12 @@ def status(arguments, print_running=False):
print("ecChronos is not running")
sys.exit(1)

def running_job(arguments):
request = rest.V2RepairSchedulerRequest(base_url=arguments.url)
result = request.running_job()
print(result)



def run_subcommand(arguments):
if arguments.subcommand == "repairs":
Expand All @@ -377,6 +391,9 @@ def run_subcommand(arguments):
elif arguments.subcommand == "schedules":
status(arguments)
schedules(arguments)
elif arguments.subcommand == "running-job":
status(arguments)
running_job(arguments)
elif arguments.subcommand == "run-repair":
status(arguments)
run_repair(arguments)
Expand Down
40 changes: 40 additions & 0 deletions ecchronos-binary/src/pylib/ecchronoslib/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,36 @@ def request(self, url, method='GET'):
except Exception as e: # pylint: disable=broad-except
return RequestResult(exception=e,
message="Unable to retrieve resource {0}".format(request_url))
def basic_request(self, url, method='GET'):
request_url = "{0}/{1}".format(self.base_url, url)
try:
request = Request(request_url)
request.get_method = lambda: method
cert_file = os.getenv("ECCTOOL_CERT_FILE")
key_file = os.getenv("ECCTOOL_KEY_FILE")
ca_file = os.getenv("ECCTOOL_CA_FILE")
if cert_file and key_file and ca_file:
context = ssl.create_default_context(cafile=ca_file)
context.load_cert_chain(cert_file, key_file)
response = urlopen(request, context=context)
else:
response = urlopen(request)

data = response.read()

response.close()
return data.decode('UTF-8')
except HTTPError as e:
return RequestResult(status_code=e.code,
message="Unable to retrieve resource {0}".format(request_url),
exception=e)
except URLError as e:
return RequestResult(status_code=404,
message="Unable to connect to {0}".format(request_url),
exception=e)
except Exception as e: # pylint: disable=broad-except
return RequestResult(exception=e,
message="Unable to retrieve resource {0}".format(request_url))


class V2RepairSchedulerRequest(RestRequest):
Expand All @@ -128,6 +158,8 @@ class V2RepairSchedulerRequest(RestRequest):

repair_info_url = PROTOCOL + 'repairInfo'

running_job_url = PROTOCOL + 'running-job'

def __init__(self, base_url=None):
RestRequest.__init__(self, base_url)

Expand Down Expand Up @@ -233,3 +265,11 @@ def get_repair_info(self, keyspace=None, table=None, since=None, # pylint: disa
if result.is_successful():
result = result.transform_with_data(new_data=RepairInfo(result.data))
return result

def running_job(self):
request_url = "{0}/{1}".format(self.base_url, V2RepairSchedulerRequest.running_job_url)
request_url = V2RepairSchedulerRequest.running_job_url

result = self.basic_request(request_url)

return result
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public class RepairManagementScheduleRESTComponent implements ScheduleRepairMana
policy = ReferencePolicy.STATIC)
private volatile RepairScheduler myRepairScheduler;

@Override
public final ResponseEntity<String> getCurrentJobStatus()
{
return myDelegateScheduleRESTImpl.getCurrentJobStatus();
}

private volatile ScheduleRepairManagementREST myDelegateScheduleRESTImpl;

@Activate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,10 @@ public interface ScheduleRepairManagementREST
* @return A JSON representation of {@link Schedule}
*/
ResponseEntity<Schedule> getSchedules(String id, boolean full);

/**
* Retrieves the current status of the job being managed by this scheduler.
*@return A {@code String} representing the current status of the job.
*/
ResponseEntity<String> getCurrentJobStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public ScheduleRepairManagementRESTImpl(final RepairScheduler repairScheduler)
myRepairScheduler = repairScheduler;
}

@Override
@GetMapping(value = REPAIR_MANAGEMENT_ENDPOINT_PREFIX + "/running-job", produces = MediaType.APPLICATION_JSON_VALUE)
public final ResponseEntity<String> getCurrentJobStatus()
{
return ResponseEntity.ok(myRepairScheduler.getCurrentJobStatus());
}

@Override
@GetMapping(value = REPAIR_MANAGEMENT_ENDPOINT_PREFIX + "/schedules", produces = MediaType.APPLICATION_JSON_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ public void setupMocks()
ScheduleREST = new ScheduleRepairManagementRESTImpl(myRepairScheduler);
}

@Test
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
public void testGetCurrentJobOneExist()
{
UUID jobId = UUID.randomUUID();
String t = "Running Job - ID: " + jobId + ", Status: " + "Running";
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
when(myRepairScheduler.getCurrentJobStatus()).thenReturn(t);
assertThat( myRepairScheduler.getCurrentJobStatus()).isEqualTo(t);
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testGetCurrentJobNoneExist()
{
VictorCavichioli marked this conversation as resolved.
Show resolved Hide resolved
String t = "No job is currently running";
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
when(myRepairScheduler.getCurrentJobStatus()).thenReturn(t);
assertThat( myRepairScheduler.getCurrentJobStatus()).isEqualTo(t);
jwaeab marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
public void testGetNoSchedules()
{
Expand Down
Loading