diff --git a/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/RepairSchedulerService.java b/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/RepairSchedulerService.java index 75098dd7e..3ee2bf3be 100644 --- a/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/RepairSchedulerService.java +++ b/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/RepairSchedulerService.java @@ -127,6 +127,12 @@ public final List getCurrentRepairJobs() return myDelegateRepairSchedulerImpl.getCurrentRepairJobs(); } + @Override + public final String getCurrentJobStatus() + { + return myScheduleManager.getCurrentJobStatus(); + } + @ObjectClassDefinition public @interface Configuration { diff --git a/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/ScheduleManagerService.java b/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/ScheduleManagerService.java index fef48ef72..3258c818d 100644 --- a/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/ScheduleManagerService.java +++ b/core.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/osgi/ScheduleManagerService.java @@ -51,6 +51,12 @@ public class ScheduleManagerService implements ScheduleManager unbind = "unbindRunPolicy") private final Set myRunPolicies = Sets.newConcurrentHashSet(); + @Override + public final String getCurrentJobStatus() + { + return myDelegateSchedulerManager.getCurrentJobStatus(); + } + @Reference (service = LockFactory.class, cardinality = ReferenceCardinality.MANDATORY, policy = ReferencePolicy.STATIC) diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairScheduler.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairScheduler.java index 2acdb0791..926711d5a 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairScheduler.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairScheduler.java @@ -45,4 +45,14 @@ public interface RepairScheduler * @return the list of the currently scheduled repair jobs. */ List getCurrentRepairJobs(); + + /** + * Retrieves the current status of the job being managed by this scheduler. + *

+ * It's intended for monitoring and logging purposes, allowing users to query the job's current state + * without affecting its execution. + * + * @return A {@code String} representing the current status of the job. + */ + String getCurrentJobStatus(); } diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairSchedulerImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairSchedulerImpl.java index a1439fd2b..c465faa9a 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairSchedulerImpl.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/RepairSchedulerImpl.java @@ -86,6 +86,12 @@ private RepairSchedulerImpl(final Builder builder) myRepairHistory = builder.myRepairHistory; } + @Override + public String getCurrentJobStatus() + { + return myScheduleManager.getCurrentJobStatus(); + } + @Override public void close() { diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManager.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManager.java index 055399ab1..8e2a85142 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManager.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManager.java @@ -31,4 +31,14 @@ public interface ScheduleManager * The job to deschedule. */ void deschedule(ScheduledJob job); + + /** + * Retrieves the current status of the job being managed by this scheduler. + *

+ * It's intended for monitoring and logging purposes, allowing users to query the job's current state + * without affecting its execution. + * + * @return A {@code String} representing the current status of the job. + */ + String getCurrentJobStatus(); } diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManagerImpl.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManagerImpl.java index e686007d8..65a531488 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManagerImpl.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/ScheduleManagerImpl.java @@ -20,6 +20,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import com.ericsson.bss.cassandra.ecchronos.core.exceptions.LockException; import com.google.common.annotations.VisibleForTesting; @@ -37,7 +38,10 @@ public final class ScheduleManagerImpl implements ScheduleManager, Closeable static final long DEFAULT_RUN_DELAY_IN_MS = TimeUnit.SECONDS.toMillis(30); + private static final String NO_RUNNING_JOB = "No job is currently running"; + private final ScheduledJobQueue myQueue = new ScheduledJobQueue(new DefaultJobComparator()); + private final AtomicReference currentExecutingJob = new AtomicReference<>(); private final Set myRunPolicies = Sets.newConcurrentHashSet(); private final ScheduledFuture myRunFuture; @@ -55,6 +59,20 @@ private ScheduleManagerImpl(final Builder builder) TimeUnit.MILLISECONDS); } + @Override + public String getCurrentJobStatus() + { + ScheduledJob job = currentExecutingJob.get(); + if (job != null) + { + String jobId = job.getId().toString(); + return "Job ID: " + jobId + ", Status: Running"; + } + else + { + return ScheduleManagerImpl.NO_RUNNING_JOB; + } + } public boolean addRunPolicy(final RunPolicy runPolicy) { LOG.debug("Run policy {} added", runPolicy); @@ -151,11 +169,16 @@ private void tryRunNext() { for (ScheduledJob next : myQueue) { - if (validate(next) && tryRunTasks(next)) + if (validate(next)) { - break; + currentExecutingJob.set(next); + if (tryRunTasks(next)) + { + break; + } } } + currentExecutingJob.set(null); } private boolean validate(final ScheduledJob job) diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/DummyJob.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/DummyJob.java index f01bd119a..e5e84d4e1 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/DummyJob.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/DummyJob.java @@ -16,6 +16,7 @@ import java.util.Arrays; import java.util.Iterator; +import java.util.UUID; import java.util.concurrent.TimeUnit; import com.ericsson.bss.cassandra.ecchronos.core.exceptions.ScheduledJobException; @@ -29,6 +30,11 @@ public DummyJob(Priority priority) super(new ConfigurationBuilder().withPriority(priority).withRunInterval(1, TimeUnit.SECONDS).build()); } + public DummyJob(Priority priority, UUID jobId) + { + super(new ConfigurationBuilder().withPriority(priority).build(), jobId); + } + public boolean hasRun() { return hasRun; diff --git a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/TestScheduleManager.java b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/TestScheduleManager.java index 8cbf27780..56b7b0ad5 100644 --- a/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/TestScheduleManager.java +++ b/core/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/scheduling/TestScheduleManager.java @@ -25,8 +25,10 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -241,6 +243,42 @@ public void testDescheduleRunningJob() throws InterruptedException assertThat(myScheduler.getQueueSize()).isEqualTo(0); } + @Test + public void testGetCurrentJobStatus() throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + UUID jobId = UUID.randomUUID(); + ScheduledJob testJob = new TestScheduledJob( + new ScheduledJob.ConfigurationBuilder() + .withPriority(ScheduledJob.Priority.LOW) + .withRunInterval(1, TimeUnit.SECONDS) + .build(), + jobId, + latch); + myScheduler.schedule(testJob); + new Thread(() -> myScheduler.run()).start(); + Thread.sleep(50); + assertThat(myScheduler.getCurrentJobStatus()).isEqualTo("Job ID: " + jobId.toString() + ", Status: Running"); + latch.countDown(); + } + + @Test + public void testGetCurrentJobStatusNoRunning() throws InterruptedException + { + CountDownLatch latch = new CountDownLatch(1); + UUID jobId = UUID.randomUUID(); + ScheduledJob testJob = new TestScheduledJob( + new ScheduledJob.ConfigurationBuilder() + .withPriority(ScheduledJob.Priority.LOW) + .withRunInterval(1, TimeUnit.SECONDS) + .build(), + jobId, + latch); + myScheduler.schedule(testJob); + new Thread(() -> myScheduler.run()).start(); + assertThat(myScheduler.getCurrentJobStatus()).isNotEqualTo("Job ID: " + jobId.toString() + ", Status: Running"); + latch.countDown(); + } private void waitForJobStarted(TestJob job) throws InterruptedException { while(!job.hasStarted()) @@ -266,6 +304,7 @@ private class TestJob extends ScheduledJob private final int numTasks; private final Runnable onCompletion; + public TestJob(Priority priority, CountDownLatch cdl) { this(priority, cdl, 1, () -> {}); @@ -350,4 +389,41 @@ public boolean execute() } } } + + public class TestScheduledJob extends ScheduledJob + { + private final CountDownLatch taskCompletionLatch; + public TestScheduledJob(Configuration configuration, UUID id, CountDownLatch taskCompletionLatch) + { + super(configuration, id); + this.taskCompletionLatch = taskCompletionLatch; + } + @Override + public Iterator iterator() + { + return Collections. singleton(new ControllableTask(taskCompletionLatch)).iterator(); + } + class ControllableTask extends ScheduledTask + { + private final CountDownLatch latch; + public ControllableTask(CountDownLatch latch) + { + this.latch = latch; + } + @Override + public boolean execute() + { + try + { + latch.await(); + return true; + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + } + } + } } diff --git a/docs/ECCTOOL_EXAMPLES.md b/docs/ECCTOOL_EXAMPLES.md index 6ee801af8..33968e2d1 100644 --- a/docs/ECCTOOL_EXAMPLES.md +++ b/docs/ECCTOOL_EXAMPLES.md @@ -168,4 +168,18 @@ Looking at the example output above, the columns are: `Repair time taken` - the time taken for the Cassandra to finish the repairs. By default, repair-info fetches the information on a cluster level. -To check the repair information for the local node use `--local` flag. \ No newline at end of file +To check the repair information for the local node use `--local` flag. + +## running-job + +In this example we will use `ecctool running-job` to check if any job is currently running. It will give one of these +two responses + +```bash +No job is currently running +``` +or + +```bash +Job ID: x-x-x-x-x, Status: Running +``` diff --git a/docs/autogenerated/ECCTOOL.md b/docs/autogenerated/ECCTOOL.md index 708bc241a..6dd973488 100644 --- a/docs/autogenerated/ECCTOOL.md +++ b/docs/autogenerated/ECCTOOL.md @@ -215,3 +215,24 @@ Stops the ecChronos instance by pid fetched from the specified pid file. # Examples For example usage and explanation about output refer to [ECCTOOL_EXAMPLES.md](../ECCTOOL_EXAMPLES.md) + +## ecctool running-job + +Show which (if any) job that is currently running. + +```console +usage: ecctool running-job [-h] [-u URL] + +Show which (if any) job is currently running + +optional arguments: + -h, --help show this help message and exit + -u URL, --url URL The ecChronos host to connect to, specified in the format http://:. +``` + +### -h, --help +show this help message and exit + + +### -u <url>, --url <url> +The ecChronos host to connect to, specified in the format [http:/](http:/)/<host>:<port>. diff --git a/ecchronos-binary/src/bin/ecctool.py b/ecchronos-binary/src/bin/ecctool.py index 97fe2773a..a5d370c4f 100755 --- a/ecchronos-binary/src/bin/ecctool.py +++ b/ecchronos-binary/src/bin/ecctool.py @@ -50,9 +50,17 @@ def get_parser(): add_start_subcommand(sub_parsers) add_stop_subcommand(sub_parsers) add_status_subcommand(sub_parsers) + add_running_job_subcommand(sub_parsers) return parser +def add_running_job_subcommand(sub_parsers): + parser_repairs = sub_parsers.add_parser("running-job", description="Show which (if any) job is currently running ") + + parser_repairs.add_argument("-u", "--url", type=str, + help="The ecChronos host to connect to, specified in the format http://:.", + default=None) + def add_repairs_subcommand(sub_parsers): parser_repairs = sub_parsers.add_parser("repairs", @@ -369,6 +377,12 @@ def status(arguments, print_running=False): print("ecChronos is not running") sys.exit(1) +def running_job(arguments): + request = rest.V2RepairSchedulerRequest(base_url=arguments.url) + result = request.running_job() + print(result) + + def run_subcommand(arguments): if arguments.subcommand == "repairs": @@ -377,6 +391,9 @@ def run_subcommand(arguments): elif arguments.subcommand == "schedules": status(arguments) schedules(arguments) + elif arguments.subcommand == "running-job": + status(arguments) + running_job(arguments) elif arguments.subcommand == "run-repair": status(arguments) run_repair(arguments) diff --git a/ecchronos-binary/src/pylib/ecchronoslib/rest.py b/ecchronos-binary/src/pylib/ecchronoslib/rest.py index 805c5b9d6..99de38d6e 100644 --- a/ecchronos-binary/src/pylib/ecchronoslib/rest.py +++ b/ecchronos-binary/src/pylib/ecchronoslib/rest.py @@ -109,6 +109,36 @@ def request(self, url, method='GET'): except Exception as e: # pylint: disable=broad-except return RequestResult(exception=e, message="Unable to retrieve resource {0}".format(request_url)) + def basic_request(self, url, method='GET'): + request_url = "{0}/{1}".format(self.base_url, url) + try: + request = Request(request_url) + request.get_method = lambda: method + cert_file = os.getenv("ECCTOOL_CERT_FILE") + key_file = os.getenv("ECCTOOL_KEY_FILE") + ca_file = os.getenv("ECCTOOL_CA_FILE") + if cert_file and key_file and ca_file: + context = ssl.create_default_context(cafile=ca_file) + context.load_cert_chain(cert_file, key_file) + response = urlopen(request, context=context) + else: + response = urlopen(request) + + data = response.read() + + response.close() + return data.decode('UTF-8') + except HTTPError as e: + return RequestResult(status_code=e.code, + message="Unable to retrieve resource {0}".format(request_url), + exception=e) + except URLError as e: + return RequestResult(status_code=404, + message="Unable to connect to {0}".format(request_url), + exception=e) + except Exception as e: # pylint: disable=broad-except + return RequestResult(exception=e, + message="Unable to retrieve resource {0}".format(request_url)) class V2RepairSchedulerRequest(RestRequest): @@ -128,6 +158,8 @@ class V2RepairSchedulerRequest(RestRequest): repair_info_url = PROTOCOL + 'repairInfo' + running_job_url = PROTOCOL + 'running-job' + def __init__(self, base_url=None): RestRequest.__init__(self, base_url) @@ -233,3 +265,11 @@ def get_repair_info(self, keyspace=None, table=None, since=None, # pylint: disa if result.is_successful(): result = result.transform_with_data(new_data=RepairInfo(result.data)) return result + + def running_job(self): + request_url = "{0}/{1}".format(self.base_url, V2RepairSchedulerRequest.running_job_url) + request_url = V2RepairSchedulerRequest.running_job_url + + result = self.basic_request(request_url) + + return result diff --git a/ecchronos-binary/src/pylib/ecchronoslib/table_formatter.py b/ecchronos-binary/src/pylib/ecchronoslib/table_formatter.py index d4d622ed0..54f70f86b 100644 --- a/ecchronos-binary/src/pylib/ecchronoslib/table_formatter.py +++ b/ecchronos-binary/src/pylib/ecchronoslib/table_formatter.py @@ -20,8 +20,9 @@ def calculate_max_len(data, i): max_len = 0 for array in data: current_len = len(str(array[i])) - if current_len > max_len: - max_len = current_len + max_len = max(max_len, current_len) + # if current_len > max_len: + # max_len = current_len return max_len diff --git a/rest.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/osgi/RepairManagementScheduleRESTComponent.java b/rest.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/osgi/RepairManagementScheduleRESTComponent.java index 7fb9e837a..036fd43c7 100644 --- a/rest.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/osgi/RepairManagementScheduleRESTComponent.java +++ b/rest.osgi/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/osgi/RepairManagementScheduleRESTComponent.java @@ -38,6 +38,12 @@ public class RepairManagementScheduleRESTComponent implements ScheduleRepairMana policy = ReferencePolicy.STATIC) private volatile RepairScheduler myRepairScheduler; + @Override + public final ResponseEntity getCurrentJobStatus() + { + return myDelegateScheduleRESTImpl.getCurrentJobStatus(); + } + private volatile ScheduleRepairManagementREST myDelegateScheduleRESTImpl; @Activate diff --git a/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementREST.java b/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementREST.java index 213d0f998..e03ecb834 100644 --- a/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementREST.java +++ b/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementREST.java @@ -43,4 +43,10 @@ public interface ScheduleRepairManagementREST * @return A JSON representation of {@link Schedule} */ ResponseEntity getSchedules(String id, boolean full); + + /** + * Retrieves the current status of the job being managed by this scheduler. + *@return A {@code String} representing the current status of the job. + */ + ResponseEntity getCurrentJobStatus(); } diff --git a/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementRESTImpl.java b/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementRESTImpl.java index d8112b9b2..cced1caf0 100644 --- a/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementRESTImpl.java +++ b/rest/src/main/java/com/ericsson/bss/cassandra/ecchronos/rest/ScheduleRepairManagementRESTImpl.java @@ -56,6 +56,12 @@ public ScheduleRepairManagementRESTImpl(final RepairScheduler repairScheduler) myRepairScheduler = repairScheduler; } + @Override + @GetMapping(value = REPAIR_MANAGEMENT_ENDPOINT_PREFIX + "/running-job", produces = MediaType.APPLICATION_JSON_VALUE) + public final ResponseEntity getCurrentJobStatus() + { + return ResponseEntity.ok(myRepairScheduler.getCurrentJobStatus()); + } @Override @GetMapping(value = REPAIR_MANAGEMENT_ENDPOINT_PREFIX + "/schedules", produces = MediaType.APPLICATION_JSON_VALUE)