Skip to content

Commit

Permalink
Add get current job #592 (#632)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielwEriksson authored Feb 28, 2024
1 parent 505b399 commit 53c42d5
Show file tree
Hide file tree
Showing 16 changed files with 259 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public final List<ScheduledRepairJobView> getCurrentRepairJobs()
return myDelegateRepairSchedulerImpl.getCurrentRepairJobs();
}

@Override
public final String getCurrentJobStatus()
{
return myScheduleManager.getCurrentJobStatus();
}

@ObjectClassDefinition
public @interface Configuration
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public class ScheduleManagerService implements ScheduleManager
unbind = "unbindRunPolicy")
private final Set<RunPolicy> myRunPolicies = Sets.newConcurrentHashSet();

@Override
public final String getCurrentJobStatus()
{
return myDelegateSchedulerManager.getCurrentJobStatus();
}

@Reference (service = LockFactory.class,
cardinality = ReferenceCardinality.MANDATORY,
policy = ReferencePolicy.STATIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ public interface RepairScheduler
* @return the list of the currently scheduled repair jobs.
*/
List<ScheduledRepairJobView> getCurrentRepairJobs();

/**
* Retrieves the current status of the job being managed by this scheduler.
* <p>
* It's intended for monitoring and logging purposes, allowing users to query the job's current state
* without affecting its execution.
*
* @return A {@code String} representing the current status of the job.
*/
String getCurrentJobStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ private RepairSchedulerImpl(final Builder builder)
myRepairHistory = builder.myRepairHistory;
}

@Override
public String getCurrentJobStatus()
{
return myScheduleManager.getCurrentJobStatus();
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,14 @@ public interface ScheduleManager
* The job to deschedule.
*/
void deschedule(ScheduledJob job);

/**
* Retrieves the current status of the job being managed by this scheduler.
* <p>
* It's intended for monitoring and logging purposes, allowing users to query the job's current state
* without affecting its execution.
*
* @return A {@code String} representing the current status of the job.
*/
String getCurrentJobStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -37,7 +38,10 @@ public final class ScheduleManagerImpl implements ScheduleManager, Closeable

static final long DEFAULT_RUN_DELAY_IN_MS = TimeUnit.SECONDS.toMillis(30);

private static final String NO_RUNNING_JOB = "No job is currently running";

private final ScheduledJobQueue myQueue = new ScheduledJobQueue(new DefaultJobComparator());
private final AtomicReference<ScheduledJob> currentExecutingJob = new AtomicReference<>();
private final Set<RunPolicy> myRunPolicies = Sets.newConcurrentHashSet();
private final ScheduledFuture<?> myRunFuture;

Expand All @@ -55,6 +59,20 @@ private ScheduleManagerImpl(final Builder builder)
TimeUnit.MILLISECONDS);
}

@Override
public String getCurrentJobStatus()
{
ScheduledJob job = currentExecutingJob.get();
if (job != null)
{
String jobId = job.getId().toString();
return "Job ID: " + jobId + ", Status: Running";
}
else
{
return ScheduleManagerImpl.NO_RUNNING_JOB;
}
}
public boolean addRunPolicy(final RunPolicy runPolicy)
{
LOG.debug("Run policy {} added", runPolicy);
Expand Down Expand Up @@ -151,11 +169,16 @@ private void tryRunNext()
{
for (ScheduledJob next : myQueue)
{
if (validate(next) && tryRunTasks(next))
if (validate(next))
{
break;
currentExecutingJob.set(next);
if (tryRunTasks(next))
{
break;
}
}
}
currentExecutingJob.set(null);
}

private boolean validate(final ScheduledJob job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.ericsson.bss.cassandra.ecchronos.core.exceptions.ScheduledJobException;
Expand All @@ -29,6 +30,11 @@ public DummyJob(Priority priority)
super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build());
}

public DummyJob(Priority priority, UUID jobId)
{
super(new ConfigurationBuilder().withPriority(priority).build(), jobId);
}

public boolean hasRun()
{
return hasRun;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -241,6 +243,42 @@ public void testDescheduleRunningJob() throws InterruptedException
assertThat(myScheduler.getQueueSize()).isEqualTo(0);
}

@Test
public void testGetCurrentJobStatus() throws InterruptedException
{
CountDownLatch latch = new CountDownLatch(1);
UUID jobId = UUID.randomUUID();
ScheduledJob testJob = new TestScheduledJob(
new ScheduledJob.ConfigurationBuilder()
.withPriority(ScheduledJob.Priority.LOW)
.withRunInterval(1, TimeUnit.SECONDS)
.build(),
jobId,
latch);
myScheduler.schedule(testJob);
new Thread(() -> myScheduler.run()).start();
Thread.sleep(50);
assertThat(myScheduler.getCurrentJobStatus()).isEqualTo("Job ID: " + jobId.toString() + ", Status: Running");
latch.countDown();
}

@Test
public void testGetCurrentJobStatusNoRunning() throws InterruptedException
{
CountDownLatch latch = new CountDownLatch(1);
UUID jobId = UUID.randomUUID();
ScheduledJob testJob = new TestScheduledJob(
new ScheduledJob.ConfigurationBuilder()
.withPriority(ScheduledJob.Priority.LOW)
.withRunInterval(1, TimeUnit.SECONDS)
.build(),
jobId,
latch);
myScheduler.schedule(testJob);
new Thread(() -> myScheduler.run()).start();
assertThat(myScheduler.getCurrentJobStatus()).isNotEqualTo("Job ID: " + jobId.toString() + ", Status: Running");
latch.countDown();
}
private void waitForJobStarted(TestJob job) throws InterruptedException
{
while(!job.hasStarted())
Expand All @@ -266,6 +304,7 @@ private class TestJob extends ScheduledJob
private final int numTasks;
private final Runnable onCompletion;


public TestJob(Priority priority, CountDownLatch cdl)
{
this(priority, cdl, 1, () -> {});
Expand Down Expand Up @@ -350,4 +389,41 @@ public boolean execute()
}
}
}

public class TestScheduledJob extends ScheduledJob
{
private final CountDownLatch taskCompletionLatch;
public TestScheduledJob(Configuration configuration, UUID id, CountDownLatch taskCompletionLatch)
{
super(configuration, id);
this.taskCompletionLatch = taskCompletionLatch;
}
@Override
public Iterator<ScheduledTask> iterator()
{
return Collections.<ScheduledTask> singleton(new ControllableTask(taskCompletionLatch)).iterator();
}
class ControllableTask extends ScheduledTask
{
private final CountDownLatch latch;
public ControllableTask(CountDownLatch latch)
{
this.latch = latch;
}
@Override
public boolean execute()
{
try
{
latch.await();
return true;
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
return false;
}
}
}
}
}
16 changes: 15 additions & 1 deletion docs/ECCTOOL_EXAMPLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,18 @@ Looking at the example output above, the columns are:
`Repair time taken` - the time taken for the Cassandra to finish the repairs.

By default, repair-info fetches the information on a cluster level.
To check the repair information for the local node use `--local` flag.
To check the repair information for the local node use `--local` flag.

## running-job

In this example we will use `ecctool running-job` to check if any job is currently running. It will give one of these
two responses

```bash
No job is currently running
```
or

```bash
Job ID: x-x-x-x-x, Status: Running
```
21 changes: 21 additions & 0 deletions docs/autogenerated/ECCTOOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,24 @@ Stops the ecChronos instance by pid fetched from the specified pid file.
# Examples

For example usage and explanation about output refer to [ECCTOOL_EXAMPLES.md](../ECCTOOL_EXAMPLES.md)

## ecctool running-job

Show which (if any) job that is currently running.

```console
usage: ecctool running-job [-h] [-u URL]

Show which (if any) job is currently running

optional arguments:
-h, --help show this help message and exit
-u URL, --url URL The ecChronos host to connect to, specified in the format http://<host>:<port>.
```

### -h, --help
show this help message and exit


### -u &lt;url&gt;, --url &lt;url&gt;
The ecChronos host to connect to, specified in the format [http:/](http:/)/&lt;host&gt;:&lt;port&gt;.
17 changes: 17 additions & 0 deletions ecchronos-binary/src/bin/ecctool.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ def get_parser():
add_start_subcommand(sub_parsers)
add_stop_subcommand(sub_parsers)
add_status_subcommand(sub_parsers)
add_running_job_subcommand(sub_parsers)

return parser

def add_running_job_subcommand(sub_parsers):
parser_repairs = sub_parsers.add_parser("running-job", description="Show which (if any) job is currently running ")

parser_repairs.add_argument("-u", "--url", type=str,
help="The ecChronos host to connect to, specified in the format http://<host>:<port>.",
default=None)


def add_repairs_subcommand(sub_parsers):
parser_repairs = sub_parsers.add_parser("repairs",
Expand Down Expand Up @@ -369,6 +377,12 @@ def status(arguments, print_running=False):
print("ecChronos is not running")
sys.exit(1)

def running_job(arguments):
request = rest.V2RepairSchedulerRequest(base_url=arguments.url)
result = request.running_job()
print(result)



def run_subcommand(arguments):
if arguments.subcommand == "repairs":
Expand All @@ -377,6 +391,9 @@ def run_subcommand(arguments):
elif arguments.subcommand == "schedules":
status(arguments)
schedules(arguments)
elif arguments.subcommand == "running-job":
status(arguments)
running_job(arguments)
elif arguments.subcommand == "run-repair":
status(arguments)
run_repair(arguments)
Expand Down
40 changes: 40 additions & 0 deletions ecchronos-binary/src/pylib/ecchronoslib/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,36 @@ def request(self, url, method='GET'):
except Exception as e: # pylint: disable=broad-except
return RequestResult(exception=e,
message="Unable to retrieve resource {0}".format(request_url))
def basic_request(self, url, method='GET'):
request_url = "{0}/{1}".format(self.base_url, url)
try:
request = Request(request_url)
request.get_method = lambda: method
cert_file = os.getenv("ECCTOOL_CERT_FILE")
key_file = os.getenv("ECCTOOL_KEY_FILE")
ca_file = os.getenv("ECCTOOL_CA_FILE")
if cert_file and key_file and ca_file:
context = ssl.create_default_context(cafile=ca_file)
context.load_cert_chain(cert_file, key_file)
response = urlopen(request, context=context)
else:
response = urlopen(request)

data = response.read()

response.close()
return data.decode('UTF-8')
except HTTPError as e:
return RequestResult(status_code=e.code,
message="Unable to retrieve resource {0}".format(request_url),
exception=e)
except URLError as e:
return RequestResult(status_code=404,
message="Unable to connect to {0}".format(request_url),
exception=e)
except Exception as e: # pylint: disable=broad-except
return RequestResult(exception=e,
message="Unable to retrieve resource {0}".format(request_url))


class V2RepairSchedulerRequest(RestRequest):
Expand All @@ -128,6 +158,8 @@ class V2RepairSchedulerRequest(RestRequest):

repair_info_url = PROTOCOL + 'repairInfo'

running_job_url = PROTOCOL + 'running-job'

def __init__(self, base_url=None):
RestRequest.__init__(self, base_url)

Expand Down Expand Up @@ -233,3 +265,11 @@ def get_repair_info(self, keyspace=None, table=None, since=None, # pylint: disa
if result.is_successful():
result = result.transform_with_data(new_data=RepairInfo(result.data))
return result

def running_job(self):
request_url = "{0}/{1}".format(self.base_url, V2RepairSchedulerRequest.running_job_url)
request_url = V2RepairSchedulerRequest.running_job_url

result = self.basic_request(request_url)

return result
Loading

0 comments on commit 53c42d5

Please sign in to comment.