diff --git a/CHANGES.md b/CHANGES.md index f157dce6..ed3fa83f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## Version 1.0.0 (Not yet Released) +* Create On Demand Repair Job on Agent - Issue #775 * Modify DistributedNativeConnectionProvider to Return a Map - Issue #778 * Bump Spring, Tomcat, Jackson and other dependencies to Remove Vulnerabilities in Agent - Issue #776 * Add Locks In SchedulerManager - Issue #768 diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java index fb9c10b7..b25e89f1 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java @@ -19,9 +19,12 @@ import com.ericsson.bss.cassandra.ecchronos.connection.DistributedJmxConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.DefaultRepairConfigurationProvider; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.OnDemandRepairSchedulerImpl; import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler.RepairSchedulerImpl; import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.RepairStateFactoryImpl; import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TimeBasedRunPolicy; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairScheduler; import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RepairScheduler; import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; import com.ericsson.bss.cassandra.ecchronos.core.table.ReplicatedTableProvider; @@ -44,6 +47,7 @@ public class ECChronos implements 
Closeable private final ECChronosInternals myECChronosInternals; private final RepairSchedulerImpl myRepairSchedulerImpl; private final TimeBasedRunPolicy myTimeBasedRunPolicy; + private final OnDemandRepairSchedulerImpl myOnDemandRepairSchedulerImpl; public ECChronos( final Config configuration, @@ -90,6 +94,18 @@ public ECChronos( .withRepairLockType(configuration.getRepairConfig().getRepairLockType()) .build(); + myOnDemandRepairSchedulerImpl = OnDemandRepairSchedulerImpl.builder() + .withScheduleManager(myECChronosInternals.getScheduleManager()) + .withTableRepairMetrics(myECChronosInternals.getTableRepairMetrics()) + .withJmxProxyFactory(myECChronosInternals.getJmxProxyFactory()) + .withReplicationState(replicationState) + .withRepairLockType(configuration.getRepairConfig().getRepairLockType()) + .withSession(session) + .withRepairConfiguration(configuration.getRepairConfig().asRepairConfiguration()) + .withRepairHistory(repairHistoryService) + .withOnDemandStatus(new OnDemandStatus(nativeConnectionProvider)) + .build(); + AbstractRepairConfigurationProvider repairConfigurationProvider = new FileBasedRepairConfiguration(applicationContext); defaultRepairConfigurationProvider.fromBuilder(DefaultRepairConfigurationProvider.newBuilder() @@ -124,6 +140,12 @@ public ReplicatedTableProvider replicatedTableProvider() return myECChronosInternals.getReplicatedTableProvider(); } + @Bean + public OnDemandRepairScheduler onDemandRepairScheduler() + { + return myOnDemandRepairSchedulerImpl; + } + @Override public final void close() { @@ -131,6 +153,7 @@ public final void close() myTimeBasedRunPolicy.close(); myRepairSchedulerImpl.close(); myECChronosInternals.close(); + myOnDemandRepairSchedulerImpl.close(); } } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/IncrementalOnDemandRepairJob.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/IncrementalOnDemandRepairJob.java new file mode 100644 index 
00000000..37fcf7f4
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/IncrementalOnDemandRepairJob.java
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ericsson.bss.cassandra.ecchronos.core.impl.repair;
+
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledTask;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicaRepairGroup;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * On demand job for incremental repair of a single table.
+ *
+ * <p>The job holds one {@link RepairGroup} task that is created when the job is constructed. A successfully
+ * executed task is removed from the job, while an unsuccessful execution marks the job as failed.
+ */
+public final class IncrementalOnDemandRepairJob extends OnDemandRepairJob
+{
+    private static final Logger LOG = LoggerFactory.getLogger(IncrementalOnDemandRepairJob.class);
+    private final ReplicationState myReplicationState;
+    private final List<ScheduledTask> myTasks;
+    private final int myTotalTasks;
+
+    /**
+     * Create the job from the values collected by a builder and prepare the task to run.
+     *
+     * @param builder The builder to read the settings of the job from.
+     * @throws NullPointerException If no replication state has been set on the builder.
+     */
+    public IncrementalOnDemandRepairJob(final Builder builder)
+    {
+        super(builder.myConfiguration, builder.myJmxProxyFactory, builder.myRepairConfiguration,
+                builder.myRepairLockType, builder.myOnFinishedHook, builder.myTableRepairMetrics, builder.myOngoingJob,
+                builder.myCurrentNode);
+        myReplicationState = Preconditions.checkNotNull(builder.myReplicationState,
+                "Replication state must be set");
+        myTasks = initializeTasks();
+        myTotalTasks = myTasks.size();
+    }
+
+    /**
+     * Create the tasks of this job: a single repair group built from the replicas that the replication state
+     * returns for the table and the current node.
+     *
+     * @return A mutable list containing the created task.
+     */
+    private List<ScheduledTask> initializeTasks()
+    {
+        ReplicaRepairGroup replicaRepairGroup = new ReplicaRepairGroup(
+                myReplicationState.getReplicas(getTableReference(), getCurrentNode()),
+                ImmutableList.of(), -1L);
+
+        RepairGroup.Builder groupBuilder = createRepairGroupBuilder(replicaRepairGroup);
+        List<ScheduledTask> taskList = new ArrayList<>();
+        taskList.add(groupBuilder.build(Priority.HIGHEST.getValue()));
+        return taskList;
+    }
+
+    /**
+     * Create a repair group builder that carries the settings of this job.
+     *
+     * @param replicaRepairGroup The replicas to repair.
+     * @return The prepared builder.
+     */
+    private RepairGroup.Builder createRepairGroupBuilder(final ReplicaRepairGroup replicaRepairGroup)
+    {
+        return RepairGroup.newBuilder()
+                .withTableReference(getTableReference())
+                .withRepairConfiguration(getRepairConfiguration())
+                .withReplicaRepairGroup(replicaRepairGroup)
+                .withJmxProxyFactory(getJmxProxyFactory())
+                .withTableRepairMetrics(getTableRepairMetrics())
+                .withRepairResourceFactory(getRepairLockType().getLockFactory())
+                .withRepairLockFactory(REPAIR_LOCK_FACTORY)
+                .withJobId(getId());
+    }
+
+    /**
+     * Iterate over the tasks that remain to be executed.
+     *
+     * @return An iterator over a snapshot of the remaining tasks.
+     */
+    @Override
+    public Iterator<ScheduledTask> iterator()
+    {
+        return new ArrayList<>(myTasks).iterator();
+    }
+
+    /**
+     * Create a view of this job with its identity, status, progress, completion time and repair type.
+     *
+     * @return The view of this job.
+     */
+    @Override
+    public OnDemandRepairJobView getView()
+    {
+        return new OnDemandRepairJobView(
+                getId(),
+                getOngoingJob().getHostId(),
+                getOngoingJob().getTableReference(),
+                getStatus(),
+                getProgress(),
+                getOngoingJob().getCompletedTime(), getOngoingJob().getRepairType());
+    }
+
+    /**
+     * Get the progress of this job.
+     *
+     * @return 1 when the job has no tasks or the ongoing job is finished, otherwise the fraction of the tasks
+     *         that have completed successfully.
+     */
+    public double getProgress()
+    {
+        int finishedTasks = myTotalTasks - myTasks.size();
+        return myTotalTasks == 0 || OngoingJob.Status.finished.equals(getOngoingJob().getStatus())
+                ? 1
+                : (double) finishedTasks / myTotalTasks;
+    }
+
+    /**
+     * Record the outcome of an executed task before handing over to the parent class.
+     *
+     * @param successful True if the task completed successfully.
+     * @param task The task that was executed.
+     */
+    @Override
+    public void postExecute(final boolean successful, final ScheduledTask task)
+    {
+        if (!successful)
+        {
+            LOG.error("Error running {}", task);
+            setFailed(true);
+        }
+        else
+        {
+            myTasks.remove(task);
+        }
+        super.postExecute(successful, task);
+    }
+
+    /**
+     * Invoke the on finished hook with the job id, then mark the ongoing job as finished when no task remains
+     * and as failed when a task has failed.
+     */
+    @Override
+    public void finishJob()
+    {
+        UUID id = getId();
+        getOnFinishedHook().accept(id);
+        if (myTasks.isEmpty())
+        {
+            getOngoingJob().finishJob();
+            LOG.info("Completed incremental on demand repair: {}", id);
+        }
+        if (hasFailed())
+        {
+            getOngoingJob().failJob();
+            LOG.error("Failed incremental on demand repair: {}", id);
+        }
+        super.finishJob();
+    }
+
+    /**
+     * Get the state of this job.
+     *
+     * @return {@code FAILED} if a task has failed, {@code FINISHED} when no task remains, otherwise
+     *         {@code RUNNABLE}.
+     */
+    @Override
+    public State getState()
+    {
+        if (hasFailed())
+        {
+            return State.FAILED;
+        }
+        return myTasks.isEmpty() ? State.FINISHED : State.RUNNABLE;
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Incremental On Demand Repair job of %s", getTableReference());
+    }
+
+    /**
+     * Builder of {@link IncrementalOnDemandRepairJob}.
+     *
+     * <p>By default the job has the highest priority, a run interval of zero and a repair configuration with
+     * the incremental repair type.
+     */
+    public static class Builder
+    {
+        private final Configuration myConfiguration = new ConfigurationBuilder()
+                .withPriority(Priority.HIGHEST)
+                .withRunInterval(0, TimeUnit.DAYS)
+                .build();
+        private DistributedJmxProxyFactory myJmxProxyFactory;
+        private TableRepairMetrics myTableRepairMetrics = null;
+        private RepairConfiguration myRepairConfiguration = RepairConfiguration.newBuilder().withRepairType(RepairType.INCREMENTAL).build();
+        private RepairLockType myRepairLockType;
+        private Consumer<UUID> myOnFinishedHook = jobId ->
+        {
+        };
+        private Node myCurrentNode;
+        private OngoingJob myOngoingJob;
+        private ReplicationState myReplicationState;
+
+        /**
+         * Set the current node of the job.
+         *
+         * @param node The node.
+         * @return This builder.
+         */
+        public final Builder withNode(final Node node)
+        {
+            this.myCurrentNode = node;
+            return this;
+        }
+
+        /**
+         * Set the JMX proxy factory of the job.
+         *
+         * @param jmxProxyFactory The JMX proxy factory.
+         * @return This builder.
+         */
+        public final Builder withJmxProxyFactory(final DistributedJmxProxyFactory jmxProxyFactory)
+        {
+            this.myJmxProxyFactory = jmxProxyFactory;
+            return this;
+        }
+
+        /**
+         * Set the table repair metrics of the job.
+         *
+         * @param tableRepairMetrics The table repair metrics.
+         * @return This builder.
+         */
+        public final Builder withTableRepairMetrics(final TableRepairMetrics tableRepairMetrics)
+        {
+            this.myTableRepairMetrics = tableRepairMetrics;
+            return this;
+        }
+
+        /**
+         * Set the repair lock type of the job.
+         *
+         * @param repairLockType The repair lock type.
+         * @return This builder.
+         */
+        public final Builder withRepairLockType(final RepairLockType repairLockType)
+        {
+            this.myRepairLockType = repairLockType;
+            return this;
+        }
+
+        /**
+         * Set the hook that receives the job id when the job is finished.
+         *
+         * @param onFinishedHook The hook, replacing the default hook that does nothing.
+         * @return This builder.
+         */
+        public final Builder withOnFinished(final Consumer<UUID> onFinishedHook)
+        {
+            this.myOnFinishedHook = onFinishedHook;
+            return this;
+        }
+
+        /**
+         * Set the repair configuration of the job.
+         *
+         * @param repairConfiguration The repair configuration, replacing the incremental default.
+         * @return This builder.
+         */
+        public final Builder withRepairConfiguration(final RepairConfiguration repairConfiguration)
+        {
+            this.myRepairConfiguration = repairConfiguration;
+            return this;
+        }
+
+        /**
+         * Set the ongoing job that this job works on.
+         *
+         * @param ongoingJob The ongoing job.
+         * @return This builder.
+         */
+        public final Builder withOngoingJob(final OngoingJob ongoingJob)
+        {
+            this.myOngoingJob = ongoingJob;
+            return this;
+        }
+
+        /**
+         * Set the replication state used to look up the replicas to repair.
+         *
+         * @param replicationState The replication state.
+         * @return This builder.
+         */
+        public final Builder withReplicationState(final ReplicationState replicationState)
+        {
+            this.myReplicationState = replicationState;
+            return this;
+        }
+
+        /**
+         * Build the job.
+         *
+         * @return The job.
+         * @throws NullPointerException If a required value, such as the replication state, has not been set.
+         */
+        public final IncrementalOnDemandRepairJob build()
+        {
+            return new IncrementalOnDemandRepairJob(this);
+        }
+    }
+}
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OnDemandRepairJob.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OnDemandRepairJob.java
new file mode 100644
index 00000000..27340c9b
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OnDemandRepairJob.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.repair;
+
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import java.util.UUID;
+import java.util.function.Consumer;
+import com.google.common.base.Preconditions;
+
+/**
+ * Base class of the on demand repair jobs.
+ *
+ * <p>It keeps the collaborators shared by the jobs, tracks whether the job has failed and maps the state of
+ * the job to the status used by {@link OnDemandRepairJobView}. The id of the job is taken from the ongoing job.
+ */
+public abstract class OnDemandRepairJob extends ScheduledJob
+{
+    protected static final RepairLockFactory REPAIR_LOCK_FACTORY = new RepairLockFactoryImpl();
+    private final DistributedJmxProxyFactory myJmxProxyFactory;
+    private final RepairConfiguration myRepairConfiguration;
+    private final RepairLockType myRepairLockType;
+    private final Consumer<UUID> myOnFinishedHook;
+    private final TableRepairMetrics myTableRepairMetrics;
+    private final OngoingJob myOngoingJob;
+    private final Node myCurrentNode;
+
+    private boolean hasFailed;
+
+    /**
+     * Create an on demand repair job.
+     *
+     * @param configuration The scheduled job configuration.
+     * @param jmxProxyFactory The JMX proxy factory.
+     * @param repairConfiguration The repair configuration.
+     * @param repairLockType The repair lock type.
+     * @param onFinishedHook The on finished hook.
+     * @param tableRepairMetrics The table repair metrics.
+     * @param ongoingJob The ongoing job, which also provides the id of this job.
+     * @param currentNode The current node.
+     * @throws NullPointerException If a required argument is null.
+     */
+    public OnDemandRepairJob(final Configuration configuration, final DistributedJmxProxyFactory jmxProxyFactory,
+            final RepairConfiguration repairConfiguration, final RepairLockType repairLockType,
+            final Consumer<UUID> onFinishedHook, final TableRepairMetrics tableRepairMetrics,
+            final OngoingJob ongoingJob,
+            final Node currentNode)
+    {
+        // The ongoing job is verified before it is dereferenced, otherwise a missing job would surface as a
+        // message-less NullPointerException and the check below could never trigger.
+        super(configuration, Preconditions.checkNotNull(ongoingJob, "Ongoing job must be set").getJobId());
+
+        myOngoingJob = ongoingJob;
+        myJmxProxyFactory = Preconditions.checkNotNull(jmxProxyFactory,
+                "JMX Proxy Factory must be set");
+        myTableRepairMetrics = Preconditions.checkNotNull(tableRepairMetrics,
+                "Table repair metrics must be set");
+        myRepairConfiguration = Preconditions.checkNotNull(repairConfiguration,
+                "Repair configuration must be set");
+        myRepairLockType = Preconditions.checkNotNull(repairLockType,
+                "Repair lock type must be set");
+        myOnFinishedHook = Preconditions.checkNotNull(onFinishedHook,
+                "On finished hook must be set");
+        myCurrentNode = Preconditions.checkNotNull(currentNode,
+                "On current node must be set");
+    }
+
+    /**
+     * Get the table reference for this job.
+     * @return Table reference
+     */
+    public TableReference getTableReference()
+    {
+        return myOngoingJob.getTableReference();
+    }
+
+    /**
+     * Get the JMX proxy factory for this job.
+     * @return JMX proxy factory
+     */
+    protected final DistributedJmxProxyFactory getJmxProxyFactory()
+    {
+        return myJmxProxyFactory;
+    }
+
+    /**
+     * Get the repair configuration for this job.
+     * @return Repair configuration
+     */
+    public RepairConfiguration getRepairConfiguration()
+    {
+        return myRepairConfiguration;
+    }
+
+    /**
+     * Get the repair lock type for this job.
+     * @return Repair lock type
+     */
+    protected final RepairLockType getRepairLockType()
+    {
+        return myRepairLockType;
+    }
+
+    /**
+     * Get the table repair metrics for this job.
+     * @return Table repair metrics
+     */
+    protected final TableRepairMetrics getTableRepairMetrics()
+    {
+        return myTableRepairMetrics;
+    }
+
+    /**
+     * Get the on finished hook for this job.
+     * @return On finished hook
+     */
+    protected final Consumer<UUID> getOnFinishedHook()
+    {
+        return myOnFinishedHook;
+    }
+
+    /**
+     * Get the ongoing job that this job works on.
+     * @return Ongoing job
+     */
+    public final OngoingJob getOngoingJob()
+    {
+        return myOngoingJob;
+    }
+
+    /**
+     * Get the current node for this job.
+     * @return Current node
+     */
+    protected final Node getCurrentNode()
+    {
+        return myCurrentNode;
+    }
+
+    /**
+     * Set whether this job has failed.
+     * @param failed True if the job has failed
+     */
+    protected final void setFailed(final boolean failed)
+    {
+        hasFailed = failed;
+    }
+
+    /**
+     * Check whether this job has been marked as failed.
+     * @return True if the job has been marked as failed
+     */
+    protected final boolean hasFailed()
+    {
+        return hasFailed;
+    }
+
+    /**
+     * Get the status of this job as used by the job view.
+     *
+     * @return {@code ERROR} if this job or the ongoing job has failed, {@code COMPLETED} if the ongoing job is
+     *         finished, otherwise {@code IN_QUEUE}.
+     */
+    protected final OnDemandRepairJobView.Status getStatus()
+    {
+        if (hasFailed || getOngoingJob().getStatus() == OngoingJob.Status.failed)
+        {
+            return OnDemandRepairJobView.Status.ERROR;
+        }
+        else if (getOngoingJob().getStatus() == OngoingJob.Status.finished)
+        {
+            return OnDemandRepairJobView.Status.COMPLETED;
+        }
+        return OnDemandRepairJobView.Status.IN_QUEUE;
+    }
+
+    /**
+     * Get a view of this job.
+     * @return The view of this job
+     */
+    public abstract OnDemandRepairJobView getView();
+
+    /**
+     * Get the time of the last successful run, which is not tracked for on demand jobs.
+     * @return Always -1
+     */
+    @Override
+    public final long getLastSuccessfulRun()
+    {
+        return -1;
+    }
+
+    /**
+     * Check whether this job can be run.
+     * @return True if the state of the job is {@code RUNNABLE}
+     */
+    @Override
+    public final boolean runnable()
+    {
+        return getState().equals(State.RUNNABLE);
+    }
+}
+
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OnDemandStatus.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OnDemandStatus.java
new file mode 100644
index 00000000..0294dd8b
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OnDemandStatus.java
@@ -0,0 +1,448 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.repair;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.data.UdtValue;
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.type.UserDefinedType;
+import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.metadata.NodeResolverImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.ReplicationStateImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.NodeResolver;
+import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.update;
+
+/**
+ * Reads and writes the on demand repair jobs stored in the table {@code ecchronos.on_demand_repair_status}.
+ *
+ * <p>Every statement is prepared once in the constructor and executed with the consistency level
+ * {@code LOCAL_QUORUM}. Rows are keyed on the host id and the job id.
+ */
+public final class OnDemandStatus
+{
+    private static final Logger LOG = LoggerFactory.getLogger(OnDemandStatus.class);
+
+    private static final String KEYSPACE_NAME = "ecchronos";
+    private static final String TABLE_NAME = "on_demand_repair_status";
+    private static final String HOST_ID_COLUMN_NAME = "host_id";
+    private static final String STATUS_COLUMN_NAME = "status";
+    private static final String JOB_ID_COLUMN_NAME = "job_id";
+    private static final String REPAIR_TYPE_COLUMN_NAME = "repair_type";
+    private static final String TABLE_REFERENCE_COLUMN_NAME = "table_reference";
+    private static final String TOKEN_MAP_HASH_COLUMN_NAME = "token_map_hash";
+    private static final String REPAIRED_TOKENS_COLUMN_NAME = "repaired_tokens";
+    private static final String UDT_TOKEN_RANGE_NAME = "token_range";
+    private static final String UDT_START_TOKEN_NAME = "start";
+    private static final String UDT_END_TOKEN_NAME = "end";
+    private static final String UDT_TABLE_REFERENCE_NAME = "table_reference";
+    private static final String UDT_ID_NAME = "id";
+    private static final String UDT_KEYSPACE_NAME = "keyspace_name";
+    private static final String UDT_TABLE_NAME = "table_name";
+    private static final String COMPLETED_TIME_COLUMN_NAME = "completed_time";
+
+    private final CqlSession mySession;
+    private final List<Node> myNodeList;
+    private final UserDefinedType myUDTTokenType;
+    private final UserDefinedType myUDTTableReferenceType;
+    private final PreparedStatement myGetStatusStatement;
+    private final PreparedStatement myInsertNewJobStatement;
+    private final PreparedStatement myUpdateRepairedTokenForJobStatement;
+    private final PreparedStatement myUpdateJobToFinishedStatement;
+    private final PreparedStatement myUpdateJobToFailedStatement;
+    private final TableReferenceFactory myTableReferenceFactory;
+
+    /**
+     * Constructor.
+     *
+     * <p>Looks up the user defined types of the keyspace and prepares the statements used by this class. The
+     * nodes of the connection provider are copied, so nodes added to the provider later are not seen.
+     *
+     * @param nativeConnectionProvider The native connection provider.
+     * @throws IllegalArgumentException If the keyspace or one of its user defined types is missing.
+     */
+    public OnDemandStatus(final DistributedNativeConnectionProvider nativeConnectionProvider)
+    {
+        mySession = nativeConnectionProvider.getCqlSession();
+        myNodeList = new ArrayList<>(nativeConnectionProvider.getNodes().values());
+        myTableReferenceFactory = new TableReferenceFactoryImpl(mySession);
+        myUDTTokenType = mySession.getMetadata()
+                .getKeyspace(KEYSPACE_NAME)
+                .flatMap(ks -> ks.getUserDefinedType(UDT_TOKEN_RANGE_NAME))
+                .orElseThrow(() -> new IllegalArgumentException("Missing UDT " + UDT_TOKEN_RANGE_NAME));
+        myUDTTableReferenceType = mySession.getMetadata()
+                .getKeyspace(KEYSPACE_NAME)
+                .flatMap(ks -> ks.getUserDefinedType(UDT_TABLE_REFERENCE_NAME))
+                .orElseThrow(() -> new IllegalArgumentException("Missing UDT " + UDT_TABLE_REFERENCE_NAME));
+
+        SimpleStatement getStatusStatement = selectFrom(KEYSPACE_NAME, TABLE_NAME)
+                .all()
+                .whereColumn(HOST_ID_COLUMN_NAME)
+                .isEqualTo(bindMarker())
+                .build()
+                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        SimpleStatement insertNewJobStatement = insertInto(KEYSPACE_NAME, TABLE_NAME)
+                .value(HOST_ID_COLUMN_NAME, bindMarker())
+                .value(JOB_ID_COLUMN_NAME, bindMarker())
+                .value(TABLE_REFERENCE_COLUMN_NAME, bindMarker())
+                .value(TOKEN_MAP_HASH_COLUMN_NAME, bindMarker())
+                .value(REPAIRED_TOKENS_COLUMN_NAME, bindMarker())
+                .value(REPAIR_TYPE_COLUMN_NAME, bindMarker())
+                .value(STATUS_COLUMN_NAME, literal("started"))
+                .build()
+                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        SimpleStatement updateRepairedTokenForJobStatement = update(KEYSPACE_NAME, TABLE_NAME)
+                .setColumn(REPAIRED_TOKENS_COLUMN_NAME, bindMarker())
+                .whereColumn(HOST_ID_COLUMN_NAME)
+                .isEqualTo(bindMarker())
+                .whereColumn(JOB_ID_COLUMN_NAME)
+                .isEqualTo(bindMarker())
+                .build()
+                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        SimpleStatement updateJobToFinishedStatement = update(KEYSPACE_NAME, TABLE_NAME)
+                .setColumn(STATUS_COLUMN_NAME, literal("finished"))
+                .setColumn(COMPLETED_TIME_COLUMN_NAME, bindMarker())
+                .whereColumn(HOST_ID_COLUMN_NAME)
+                .isEqualTo(bindMarker())
+                .whereColumn(JOB_ID_COLUMN_NAME)
+                .isEqualTo(bindMarker())
+                .build()
+                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+        SimpleStatement updateJobToFailedStatement = update(KEYSPACE_NAME, TABLE_NAME)
+                .setColumn(STATUS_COLUMN_NAME, literal("failed"))
+                .setColumn(COMPLETED_TIME_COLUMN_NAME, bindMarker())
+                .whereColumn(HOST_ID_COLUMN_NAME)
+                .isEqualTo(bindMarker())
+                .whereColumn(JOB_ID_COLUMN_NAME)
+                .isEqualTo(bindMarker())
+                .build()
+                .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+
+        myGetStatusStatement = mySession.prepare(getStatusStatement);
+        myInsertNewJobStatement = mySession.prepare(insertNewJobStatement);
+        myUpdateRepairedTokenForJobStatement = mySession.prepare(updateRepairedTokenForJobStatement);
+        myUpdateJobToFinishedStatement = mySession.prepare(updateJobToFinishedStatement);
+        myUpdateJobToFailedStatement = mySession.prepare(updateJobToFailedStatement);
+    }
+
+    /**
+     * Get the nodes that were known to the connection provider when this object was created.
+     *
+     * @return An unmodifiable view of the nodes.
+     */
+    public List<Node> getNodes()
+    {
+        return Collections.unmodifiableList(myNodeList);
+    }
+
+    /**
+     * Retrieves the ongoing repair jobs with a 'started' status for all nodes in the cluster.
+     *
+     * <p>This method iterates over the list of nodes and fetches ongoing repair jobs for each node.
+     * It uses the {@link #processResultSet(ReplicationState, UUID)} method to process the result set
+     * for each node, filtering only those jobs whose status is 'started'. The results are aggregated
+     * into a map, where the key is the host ID of the node, and the value is a set of ongoing jobs
+     * associated with that node.</p>
+     *
+     * @param replicationState The replication state used to determine the repair status for each node.
+     *                         This parameter is required to evaluate the ongoing jobs accurately.
+     * @return A map where each key represents a node's host ID and each value is a set of ongoing
+     *         repair jobs for that node. The map provides a comprehensive view of ongoing jobs with
+     *         status 'started' across the entire cluster.
+     */
+    public Map<UUID, Set<OngoingJob>> getOngoingStartedJobsForAllNodes(final ReplicationState replicationState)
+    {
+        Map<UUID, Set<OngoingJob>> allOngoingJobs = new HashMap<>();
+        for (Node node : getNodes())
+        {
+            Set<OngoingJob> ongoingJobs = processResultSet(replicationState, node.getHostId());
+            allOngoingJobs.put(node.getHostId(), ongoingJobs);
+        }
+        return allOngoingJobs;
+    }
+
+    /**
+     * Get all jobs for a host that have the status 'started'.
+     *
+     * @param replicationState The replication state.
+     * @param hostId The host id to get the jobs for.
+     * @return Set of ongoing jobs
+     */
+    public Set<OngoingJob> getOngoingJobs(final ReplicationState replicationState, final UUID hostId)
+    {
+        return processResultSet(replicationState, hostId);
+    }
+
+    /**
+     * Read the jobs of a host and keep the ones with the status 'started'. Rows with a status that cannot be
+     * parsed are logged and ignored.
+     *
+     * @param replicationState The replication state.
+     * @param hostId The host id to get the jobs for.
+     * @return Set of ongoing jobs
+     */
+    private Set<OngoingJob> processResultSet(final ReplicationState replicationState, final UUID hostId)
+    {
+        ResultSet result = mySession.execute(myGetStatusStatement.bind(hostId));
+
+        Set<OngoingJob> ongoingJobs = new HashSet<>();
+        for (Row row : result.all())
+        {
+            OngoingJob.Status status;
+            try
+            {
+                status = OngoingJob.Status.valueOf(row.getString(STATUS_COLUMN_NAME));
+            }
+            catch (IllegalArgumentException e)
+            {
+                LOG.warn("Ignoring table repair job with id {}, unable to parse status",
+                        row.getUuid(JOB_ID_COLUMN_NAME));
+                continue;
+            }
+
+            if (status.equals(OngoingJob.Status.started))
+            {
+                createOngoingJob(replicationState, ongoingJobs, row, status, hostId);
+            }
+        }
+        return ongoingJobs;
+    }
+
+    /**
+     * Get jobs for all the nodes, independent of their status.
+     *
+     * @return Set of ongoing jobs
+     */
+    public Set<OngoingJob> getAllClusterWideJobs()
+    {
+        NodeResolver nodeResolver = new NodeResolverImpl(mySession);
+        Set<OngoingJob> ongoingJobs = new HashSet<>();
+        for (Node node : getNodes())
+        {
+            ReplicationState replState = new ReplicationStateImpl(nodeResolver, mySession);
+            ongoingJobs.addAll(getAllJobsForHost(replState, node.getHostId()));
+        }
+        return ongoingJobs;
+    }
+
+    /**
+     * Get all jobs for this host, independent of the status.
+     *
+     * @param replicationState The replication state
+     * @param hostId The host id to get the jobs for.
+     * @return Set of ongoing jobs
+     */
+    public Set<OngoingJob> getAllJobs(final ReplicationState replicationState, final UUID hostId)
+    {
+        return getAllJobsForHost(replicationState, hostId);
+    }
+
+    /**
+     * Read every job of a host. Rows with a status that cannot be parsed are logged and ignored.
+     *
+     * @param replicationState The replication state.
+     * @param hostId The host id to get the jobs for.
+     * @return Set of ongoing jobs
+     */
+    private Set<OngoingJob> getAllJobsForHost(final ReplicationState replicationState, final UUID hostId)
+    {
+        ResultSet result = mySession.execute(myGetStatusStatement.bind(hostId));
+
+        Set<OngoingJob> ongoingJobs = new HashSet<>();
+        for (Row row : result.all())
+        {
+            OngoingJob.Status status;
+            try
+            {
+                status = OngoingJob.Status.valueOf(row.getString(STATUS_COLUMN_NAME));
+            }
+            catch (IllegalArgumentException e)
+            {
+                LOG.warn("Ignoring table repair job with id {} and hostId {}, unable to parse status",
+                        row.getUuid(JOB_ID_COLUMN_NAME),
+                        hostId);
+                continue;
+            }
+
+            createOngoingJob(replicationState, ongoingJobs, row, status, hostId);
+        }
+
+        return ongoingJobs;
+    }
+
+    /**
+     * Convert a row into an ongoing job and add it to the provided set.
+     *
+     * <p>The row is ignored, with a log entry, when its repair type cannot be parsed or when the table id
+     * stored in the row does not match the id of the table currently resolved for the stored keyspace and
+     * table name. A missing or empty repair type is read as {@code VNODE}.
+     *
+     * @param replicationState The replication state.
+     * @param ongoingJobs The set to add the job to.
+     * @param row The row to convert.
+     * @param status The already parsed status of the row.
+     * @param hostId The host id that the row was read for.
+     */
+    private void createOngoingJob(final ReplicationState replicationState,
+                                  final Set<OngoingJob> ongoingJobs,
+                                  final Row row,
+                                  final OngoingJob.Status status,
+                                  final UUID hostId)
+    {
+        UUID jobId = row.getUuid(JOB_ID_COLUMN_NAME);
+        int tokenMapHash = row.getInt(TOKEN_MAP_HASH_COLUMN_NAME);
+        Set<UdtValue> repairedTokens = row.getSet(REPAIRED_TOKENS_COLUMN_NAME, UdtValue.class);
+        UdtValue uDTTableReference = row.getUdtValue(TABLE_REFERENCE_COLUMN_NAME);
+        String keyspace = uDTTableReference.getString(UDT_KEYSPACE_NAME);
+        String table = uDTTableReference.getString(UDT_TABLE_NAME);
+        TableReference tableReference = myTableReferenceFactory.forTable(keyspace, table);
+        Instant completed = row.get(COMPLETED_TIME_COLUMN_NAME, Instant.class);
+        RepairType repairType = RepairType.VNODE;
+        String repairTypeStr = row.getString(REPAIR_TYPE_COLUMN_NAME);
+        if (repairTypeStr != null && !repairTypeStr.isEmpty())
+        {
+            try
+            {
+                repairType = RepairType.valueOf(repairTypeStr);
+            }
+            catch (IllegalArgumentException e)
+            {
+                // Treated like an unparsable status: one bad row must not break the listing of all jobs.
+                LOG.warn("Ignoring table repair job with id {} and hostId {}, unable to parse repair type {}",
+                        jobId,
+                        hostId,
+                        repairTypeStr);
+                return;
+            }
+        }
+        Long completedTime = null;
+        if (completed != null)
+        {
+            completedTime = completed.toEpochMilli();
+        }
+
+        // NOTE(review): the null guard assumes forTable may return null for a table that no longer exists;
+        // confirm against the TableReferenceFactory contract.
+        if (tableReference != null && uDTTableReference.getUuid(UDT_ID_NAME).equals(tableReference.getId()))
+        {
+            OngoingJob ongoingJob = new OngoingJob.Builder()
+                    .withOnDemandStatus(this)
+                    .withTableReference(tableReference)
+                    .withReplicationState(replicationState)
+                    .withOngoingJobInfo(jobId, tokenMapHash, repairedTokens, status, completedTime, repairType)
+                    .withHostId(hostId)
+                    .withRepairType(repairType)
+                    .build();
+            ongoingJobs.add(ongoingJob);
+        }
+        else
+        {
+            LOG.info("Ignoring table repair job with id {} of table {} as it was for table {}.{}({})",
+                    jobId,
+                    tableReference,
+                    keyspace,
+                    table,
+                    uDTTableReference.getUuid(UDT_ID_NAME));
+        }
+    }
+
+    /**
+     * Add a new job without any repaired ranges.
+     *
+     * @param hostId The host id.
+     * @param jobId The job id.
+     * @param tableReference The table reference.
+     * @param tokenMapHash The token map hash.
+     * @param repairType The repair type.
+     */
+    public void addNewJob(final UUID hostId,
+                          final UUID jobId,
+                          final TableReference tableReference,
+                          final int tokenMapHash,
+                          final RepairType repairType)
+    {
+        addNewJob(hostId, jobId, tableReference, tokenMapHash, Collections.emptySet(), repairType);
+    }
+
+    /**
+     * Add a new job for a specific host. The job is stored with the status 'started'.
+     *
+     * @param jobId The job id.
+     * @param host The host.
+     * @param tableReference The table reference.
+     * @param tokenMapHash The token map hash.
+     * @param repairedRanges The ranges, null is treated as no ranges.
+     * @param repairType The repair type.
+     */
+    public void addNewJob(final UUID host,
+                          final UUID jobId,
+                          final TableReference tableReference,
+                          final int tokenMapHash,
+                          final Set<LongTokenRange> repairedRanges,
+                          final RepairType repairType)
+    {
+        Set<UdtValue> repairedRangesUDT = new HashSet<>();
+        if (repairedRanges != null)
+        {
+            repairedRanges.forEach(t -> repairedRangesUDT.add(createUDTTokenRangeValue(t.start, t.end)));
+        }
+        UdtValue uDTTableReference = myUDTTableReferenceType.newValue()
+                .setUuid(UDT_ID_NAME, tableReference.getId())
+                .setString(UDT_KEYSPACE_NAME, tableReference.getKeyspace())
+                .setString(UDT_TABLE_NAME, tableReference.getTable());
+        BoundStatement statement = myInsertNewJobStatement.bind(host, jobId, uDTTableReference, tokenMapHash,
+                repairedRangesUDT, repairType.toString());
+        mySession.execute(statement);
+    }
+
+    /**
+     * Update job with repaired tokens.
+     *
+     * @param hostId Host id of the job.
+     * @param jobId Job id.
+     * @param repairedTokens Repaired tokens.
+     */
+    public void updateJob(final UUID hostId, final UUID jobId, final Set<UdtValue> repairedTokens)
+    {
+        mySession.execute(myUpdateRepairedTokenForJobStatement.bind(repairedTokens, hostId, jobId));
+    }
+
+    /**
+     * Update a job as finished with current time stamp.
+     *
+     * <p>Note that the job id comes before the host id here, unlike in
+     * {@link #updateJob(UUID, UUID, Set)}.
+     *
+     * @param jobId Id of the job set as finished.
+     * @param hostId Host id of the job.
+     */
+    public void finishJob(final UUID jobId, final UUID hostId)
+    {
+        mySession.execute(myUpdateJobToFinishedStatement.bind(Instant.ofEpochMilli(System.currentTimeMillis()),
+                hostId, jobId));
+    }
+
+    /**
+     * Update a job to failed status with current timestamp.
+     *
+     * <p>Note that the job id comes before the host id here, unlike in
+     * {@link #updateJob(UUID, UUID, Set)}.
+     *
+     * @param jobId Id of the job to set as failed.
+     * @param hostId Host id of the job.
+     */
+    public void failJob(final UUID jobId, final UUID hostId)
+    {
+        mySession.execute(myUpdateJobToFailedStatement.bind(Instant.ofEpochMilli(System.currentTimeMillis()),
+                hostId, jobId));
+    }
+
+    /**
+     * Create a new UDT token range value from the given start and end tokens.
+     *
+     * <p>The tokens are stored as their decimal string representation.
+     *
+     * @param start Start token.
+     * @param end End token.
+     * @return UdtValue
+     */
+    public UdtValue createUDTTokenRangeValue(final Long start, final Long end)
+    {
+        return myUDTTokenType.newValue()
+                .setString(UDT_START_TOKEN_NAME, start.toString())
+                .setString(UDT_END_TOKEN_NAME, end.toString());
+    }
+
+    /**
+     * Get the start token from a UDT.
+     *
+     * @param t The UDT value.
+     * @return long
+     * @throws NumberFormatException If the stored start token is not a valid long.
+     */
+    public long getStartTokenFrom(final UdtValue t)
+    {
+        return Long.parseLong(t.getString(UDT_START_TOKEN_NAME));
+    }
+
+    /**
+     * Get the end token from a UDT.
+     *
+     * @param t The UDT value.
+     * @return long
+     * @throws NumberFormatException If the stored end token is not a valid long.
+     */
+    public long getEndTokenFrom(final UdtValue t)
+    {
+        return Long.parseLong(t.getString(UDT_END_TOKEN_NAME));
+    }
+}
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OngoingJob.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OngoingJob.java
new file mode 100644
index 00000000..552a8aba
--- /dev/null
+++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/OngoingJob.java
@@ -0,0 +1,359 @@
+/*
+ * Copyright 2024 Telefonaktiebolaget LM Ericsson
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.repair;
+
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
+import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import com.datastax.oss.driver.api.core.data.UdtValue;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * State of a single on demand repair job for one host.
+ *
+ * The token ranges of the job are read from the {@link ReplicationState} when the instance is created.
+ * Progress and status changes are written through the {@link OnDemandStatus} the job was built with.
+ */
+@SuppressWarnings ("FinalClass")
+public final class OngoingJob
+{
+    public enum Status
+    {
+        started, finished, failed
+    }
+
+    private final UUID myJobId;
+    private final UUID myHostId;
+    private final TableReference myTableReference;
+    private final Map<LongTokenRange, ImmutableSet<DriverNode>> myTokens;
+    private final Set<UdtValue> myRepairedTokens;
+    private final OnDemandStatus myOnDemandStatus;
+    private final ReplicationState myReplicationState;
+    private final Integer myTokenHash;
+    private final Status myStatus;
+    private final long myCompletedTime;
+    private final RepairType myRepairType;
+    private final Node myCurrentNode;
+
+    /**
+     * Create the job from the builder.
+     *
+     * A random job id is generated when none was given. When no token map hash was given the job is
+     * treated as new and is stored through {@link OnDemandStatus#addNewJob}.
+     *
+     * @param builder The builder holding the job properties.
+     * @throws java.util.NoSuchElementException if no known node has the configured host id.
+     */
+    private OngoingJob(final Builder builder)
+    {
+        myOnDemandStatus = builder.myOnDemandStatus;
+        myJobId = builder.myJobId == null ? UUID.randomUUID() : builder.myJobId;
+        myHostId = builder.myHostId;
+        // Same exception type as the former unchecked Optional.get(), but the message names the host id.
+        myCurrentNode = myOnDemandStatus.getNodes()
+                .stream()
+                .filter(node -> myHostId.equals(node.getHostId()))
+                .findFirst()
+                .orElseThrow(() -> new java.util.NoSuchElementException(
+                        "No node found with host ID: " + builder.myHostId));
+        myTableReference = builder.myTableReference;
+        myReplicationState = builder.myReplicationState;
+        myTokens = myReplicationState.getTokenRangeToReplicas(myTableReference, myCurrentNode);
+        myRepairedTokens = builder.myRepairedTokens;
+        myTokenHash = builder.myTokenMapHash;
+        myStatus = builder.myStatus;
+        myCompletedTime = builder.myCompletedTime;
+        myRepairType = builder.myRepairType;
+
+        // Only jobs restored through withOngoingJobInfo carry a hash; anything else is a new job.
+        if (myTokenHash == null)
+        {
+            myOnDemandStatus.addNewJob(myHostId, myJobId, myTableReference, myTokens.keySet().hashCode(), myRepairType);
+        }
+    }
+
+    public UUID getJobId()
+    {
+        return myJobId;
+    }
+
+    public UUID getHostId()
+    {
+        return myHostId;
+    }
+
+    public Status getStatus()
+    {
+        return myStatus;
+    }
+
+    /**
+     * Get the completion time of the job.
+     *
+     * @return The completion time, or -1 when none was provided to the builder.
+     */
+    public long getCompletedTime()
+    {
+        return myCompletedTime;
+    }
+
+    public TableReference getTableReference()
+    {
+        return myTableReference;
+    }
+
+    public RepairType getRepairType()
+    {
+        return myRepairType;
+    }
+
+    /**
+     * Get the repaired token ranges, converted from their UDT form.
+     *
+     * @return A new set holding the repaired ranges.
+     */
+    public Set<LongTokenRange> getRepairedTokens()
+    {
+        Set<LongTokenRange> repairedLongTokenRanges = new HashSet<>();
+        myRepairedTokens.forEach(t -> repairedLongTokenRanges
+                .add(new LongTokenRange(myOnDemandStatus.getStartTokenFrom(t), myOnDemandStatus.getEndTokenFrom(t))));
+        return repairedLongTokenRanges;
+    }
+
+    /**
+     * Mark the given ranges as repaired and store the updated set of repaired tokens.
+     *
+     * @param ranges The ranges that have been repaired.
+     */
+    public void finishRanges(final Set<LongTokenRange> ranges)
+    {
+        ranges.forEach(t -> myRepairedTokens.add(myOnDemandStatus.createUDTTokenRangeValue(t.start, t.end)));
+        myOnDemandStatus.updateJob(myHostId, myJobId, myRepairedTokens);
+    }
+
+    public Map<LongTokenRange, ImmutableSet<DriverNode>> getTokens()
+    {
+        return myTokens;
+    }
+
+    /**
+     * Check if the token ranges differ from the ones the job was created with.
+     *
+     * @return True if the current token ranges differ from the ones read at creation, or if a stored
+     *         hash matches neither the hash of the range keys nor the hash of the whole map.
+     */
+    public boolean hasTopologyChanged()
+    {
+        return !myTokens.equals(myReplicationState.getTokenRangeToReplicas(myTableReference, myCurrentNode))
+                || (myTokenHash != null
+                && (myTokenHash != myTokens.keySet().hashCode()
+                && myTokenHash != myTokens.hashCode()));
+    }
+
+    /**
+     * Store one job entry, sharing this job's id, for every node that has ranges left to repair.
+     *
+     * @param repairType The repair type of the created jobs.
+     */
+    public void startClusterWideJob(final RepairType repairType)
+    {
+        Map<LongTokenRange, ImmutableSet<DriverNode>> allTokenRanges =
+                myReplicationState.getTokenRanges(myTableReference, myCurrentNode);
+
+        Map<DriverNode, Set<LongTokenRange>> repairedRangesPerNode = new HashMap<>();
+        Map<DriverNode, Set<LongTokenRange>> remainingRangesPerNode = new HashMap<>();
+
+        distributeTokenRanges(allTokenRanges, repairedRangesPerNode, remainingRangesPerNode);
+
+        for (Map.Entry<DriverNode, Set<LongTokenRange>> entry : remainingRangesPerNode.entrySet())
+        {
+            DriverNode node = entry.getKey();
+            Set<LongTokenRange> remainingRanges = entry.getValue();
+            // May be null when the node has no ranges counted as repaired.
+            Set<LongTokenRange> repairedRanges = repairedRangesPerNode.get(node);
+            createClusterWideJob(node, remainingRanges, repairedRanges, repairType);
+        }
+    }
+
+    /** Sort every token range into the repaired or remaining map of each of its nodes. */
+    private void distributeTokenRanges(
+            final Map<LongTokenRange, ImmutableSet<DriverNode>> allTokenRanges,
+            final Map<DriverNode, Set<LongTokenRange>> repairedRangesPerNode,
+            final Map<DriverNode, Set<LongTokenRange>> remainingRangesPerNode)
+    {
+
+        for (Map.Entry<LongTokenRange, ImmutableSet<DriverNode>> range : allTokenRanges.entrySet())
+        {
+            processTokenRange(range.getKey(),
+                    range.getValue(),
+                    repairedRangesPerNode,
+                    remainingRangesPerNode);
+        }
+    }
+
+    /** Assign one range: at most one node gets it as remaining, every other node gets it as repaired. */
+    private void processTokenRange(final LongTokenRange tokenRange,
+                                   final Set<DriverNode> nodes,
+                                   final Map<DriverNode, Set<LongTokenRange>> repairedRangesPerNode,
+                                   final Map<DriverNode, Set<LongTokenRange>> remainingRangesPerNode)
+    {
+
+        // Ranges already part of this job's own token map are counted as repaired for every node.
+        boolean isRepaired = myTokens.containsKey(tokenRange);
+
+        for (DriverNode node : nodes)
+        {
+            if (isRepaired)
+            {
+                repairedRangesPerNode
+                        .computeIfAbsent(node, k -> new HashSet<>())
+                        .add(tokenRange);
+            }
+            else
+            {
+                remainingRangesPerNode
+                        .computeIfAbsent(node, k -> new HashSet<>())
+                        .add(tokenRange);
+                isRepaired = true; // Ensure only one node repairs the range
+            }
+        }
+    }
+
+    /** Store the job entry of a single node for a cluster wide job. */
+    private void createClusterWideJob(final DriverNode node,
+                                      final Set<LongTokenRange> remainingRanges,
+                                      final Set<LongTokenRange> repairedRanges,
+                                      final RepairType repairType)
+    {
+
+        Set<LongTokenRange> allTokenRanges = new HashSet<>();
+        if (remainingRanges != null)
+        {
+            allTokenRanges.addAll(remainingRanges);
+        }
+        if (repairedRanges != null)
+        {
+            allTokenRanges.addAll(repairedRanges);
+        }
+
+        if (repairType == RepairType.INCREMENTAL)
+        {
+            // Incremental jobs are stored with a zero hash and without repaired ranges.
+            myOnDemandStatus.addNewJob(
+                    node.getId(),
+                    myJobId,
+                    myTableReference,
+                    0,
+                    Collections.emptySet(),
+                    repairType);
+        }
+        else
+        {
+            myOnDemandStatus.addNewJob(
+                    node.getId(),
+                    myJobId,
+                    myTableReference,
+                    allTokenRanges.hashCode(),
+                    repairedRanges,
+                    repairType);
+        }
+    }
+
+    public void finishJob()
+    {
+        myOnDemandStatus.finishJob(myJobId, myHostId);
+    }
+
+    public void failJob()
+    {
+        myOnDemandStatus.failJob(myJobId, myHostId);
+    }
+
+    public static class Builder
+    {
+        private UUID myJobId = null;
+        private UUID myHostId;
+        private TableReference myTableReference;
+        private Set<UdtValue> myRepairedTokens = new HashSet<>();
+        private OnDemandStatus myOnDemandStatus;
+        private ReplicationState myReplicationState;
+        private Integer myTokenMapHash = null;
+        private Status myStatus = Status.started;
+        private long myCompletedTime = -1;
+        private RepairType myRepairType = RepairType.VNODE;
+
+        /**
+         * Ongoing job build with ongoing job info.
+         *
+         * @param theJobId The job id.
+         * @param theTokenMapHash Token map hash.
+         * @param theRepairedTokens Repaired tokens, {@code null} is treated as empty.
+         * @param theStatus Status.
+         * @param theCompletedTime Completion time, {@code null} keeps the default of -1.
+         * @param repairType The repair type, {@code null} keeps the current value.
+         * @return The builder
+         */
+        public Builder withOngoingJobInfo(final UUID theJobId,
+                                          final int theTokenMapHash,
+                                          final Set<UdtValue> theRepairedTokens,
+                                          final Status theStatus,
+                                          final Long theCompletedTime,
+                                          final RepairType repairType)
+        {
+            this.myJobId = theJobId;
+            this.myTokenMapHash = theTokenMapHash;
+            // Copy the set: finishRanges adds to it, so it must be mutable and must not alias the caller's set.
+            this.myRepairedTokens = theRepairedTokens == null ? new HashSet<>() : new HashSet<>(theRepairedTokens);
+            this.myStatus = theStatus;
+            if (theCompletedTime != null)
+            {
+                this.myCompletedTime = theCompletedTime;
+            }
+            if (repairType != null)
+            {
+                this.myRepairType = repairType;
+            }
+            return this;
+        }
+
+        /**
+         * Ongoing job build with table reference.
+         *
+         * @param aTableReference Table reference.
+         * @return The builder
+         */
+        public Builder withTableReference(final TableReference aTableReference)
+        {
+            this.myTableReference = aTableReference;
+            return this;
+        }
+
+        /**
+         * Ongoing job build with on demand status.
+         *
+         * @param theOnDemandStatus Status.
+         * @return The builder
+         */
+        public Builder withOnDemandStatus(final OnDemandStatus theOnDemandStatus)
+        {
+            this.myOnDemandStatus = theOnDemandStatus;
+            return this;
+        }
+
+        /**
+         * Ongoing job build with replication state.
+         *
+         * @param aReplicationState Replication state.
+         * @return The builder
+         */
+        public Builder withReplicationState(final ReplicationState aReplicationState)
+        {
+            this.myReplicationState = aReplicationState;
+            return this;
+        }
+
+        /**
+         * Ongoing job build with host ID.
+         *
+         * @param aHostId Host id.
+         * @return The builder
+         */
+        public Builder withHostId(final UUID aHostId)
+        {
+            this.myHostId = aHostId;
+            return this;
+        }
+
+        /**
+         * Ongoing job with repairType.
+         *
+         * @param repairType The repair type, {@code null} keeps the current value (VNODE by default).
+         * @return The builder
+         */
+        public Builder withRepairType(final RepairType repairType)
+        {
+            // Same null handling as withOngoingJobInfo, so the job never ends up without a repair type.
+            if (repairType != null)
+            {
+                this.myRepairType = repairType;
+            }
+            return this;
+        }
+
+        /**
+         * Ongoing job build.
+         *
+         * @return The job
+         */
+        public OngoingJob build()
+        {
+            return new OngoingJob(this);
+        }
+    }
+}
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java index 322df5c0..7558f7f9 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/RepairGroup.java @@ -31,7 +31,6 @@ import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairPolicy; -import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService; import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; import 
com.ericsson.bss.cassandra.ecchronos.utils.exceptions.ScheduledJobException; @@ -250,7 +249,7 @@ public static class Builder private TableRepairMetrics myTableRepairMetrics; private List myRepairPolicies = new ArrayList<>(); private BigInteger myTokensPerRepair = LongTokenRange.FULL_RANGE; - private RepairHistoryService myRepairHistory; + private RepairHistory myRepairHistory; private Node myNode; private UUID myJobId; private RepairLockFactory myRepairLockFactory; @@ -370,7 +369,7 @@ public Builder withTokensPerRepair(final BigInteger tokensPerRepair) * @param repairHistory Repair history. * @return Builder */ - public Builder withRepairHistory(final RepairHistoryService repairHistory) + public Builder withRepairHistory(final RepairHistory repairHistory) { myRepairHistory = repairHistory; return this; diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/VnodeOnDemandRepairJob.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/VnodeOnDemandRepairJob.java new file mode 100644 index 00000000..a631974d --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/VnodeOnDemandRepairJob.java @@ -0,0 +1,303 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.ericsson.bss.cassandra.ecchronos.core.impl.repair;
+
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode.VnodeRepairGroupFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledTask;
+import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
+import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistory;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicaRepairGroup;
+import com.ericsson.bss.cassandra.ecchronos.core.state.VnodeRepairState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A Job that will schedule and run vnode repair on one table once. It creates one repair task per
+ * replica repair group to fully repair the table for the current node. Once all repair tasks are completed,
+ * the repair is finished and the job will be descheduled.
+ */
+public final class VnodeOnDemandRepairJob extends OnDemandRepairJob
+{
+    private static final Logger LOG = LoggerFactory.getLogger(VnodeOnDemandRepairJob.class);
+    private final RepairHistory myRepairHistory;
+    private final Map<ScheduledTask, Set<LongTokenRange>> myTasks;
+    private final int myTotalTokens;
+
+    /**
+     * Create the job from the builder and prepare one task per replica repair group.
+     *
+     * @param builder The builder holding the job properties.
+     * @throws NullPointerException if no repair history was set on the builder.
+     */
+    private VnodeOnDemandRepairJob(final Builder builder)
+    {
+        super(builder.configuration, builder.jmxProxyFactory, builder.repairConfiguration,
+                builder.repairLockType, builder.onFinishedHook, builder.tableRepairMetrics, builder.ongoingJob,
+                builder.currentNode);
+        myRepairHistory = Preconditions.checkNotNull(builder.repairHistory,
+                "Repair history must be set");
+        myTotalTokens = getOngoingJob().getTokens().size();
+        myTasks = createRepairTasks(getOngoingJob().getTokens(), getOngoingJob().getRepairedTokens(),
+                builder.currentNode);
+    }
+
+    /** Build the tasks for all token ranges that are not repaired yet. */
+    private Map<ScheduledTask, Set<LongTokenRange>> createRepairTasks(
+            final Map<LongTokenRange, ImmutableSet<DriverNode>> tokenRanges,
+            final Set<LongTokenRange> repairedTokens,
+            final Node currentNode)
+    {
+
+        Map<LongTokenRange, ImmutableSet<DriverNode>> remainingTokenRanges =
+                filterRemainingTokenRanges(tokenRanges, repairedTokens);
+        List<VnodeRepairState> vnodeRepairStates = createVnodeRepairStates(remainingTokenRanges);
+        List<ReplicaRepairGroup> repairGroups = generateRepairGroups(vnodeRepairStates);
+
+        return mapRepairGroupsToTasks(repairGroups, currentNode);
+    }
+
+    /** Return the token ranges without the repaired ones; the input map itself when nothing is repaired. */
+    private Map<LongTokenRange, ImmutableSet<DriverNode>>
+            filterRemainingTokenRanges(final Map<LongTokenRange, ImmutableSet<DriverNode>> tokenRanges,
+                                       final Set<LongTokenRange> repairedTokens)
+    {
+        if (repairedTokens.isEmpty())
+        {
+            return tokenRanges;
+        }
+        Map<LongTokenRange, ImmutableSet<DriverNode>> remainingTokenRanges = new HashMap<>(tokenRanges);
+        repairedTokens.forEach(remainingTokenRanges::remove);
+        return remainingTokenRanges;
+    }
+
+    /** Wrap every remaining range in a vnode repair state; -1 is passed as its third constructor argument. */
+    private List<VnodeRepairState> createVnodeRepairStates(
+            final Map<LongTokenRange, ImmutableSet<DriverNode>> remainingTokenRanges)
+    {
+        List<VnodeRepairState> vnodeRepairStates = new ArrayList<>();
+        for (Map.Entry<LongTokenRange, ImmutableSet<DriverNode>> entry : remainingTokenRanges.entrySet())
+        {
+            vnodeRepairStates.add(new VnodeRepairState(entry.getKey(), entry.getValue(), -1));
+        }
+        return vnodeRepairStates;
+    }
+
+    /** Group the vnode repair states into replica repair groups. */
+    private List<ReplicaRepairGroup> generateRepairGroups(final List<VnodeRepairState> vnodeRepairStates)
+    {
+        return VnodeRepairGroupFactory.INSTANCE.generateReplicaRepairGroups(vnodeRepairStates);
+    }
+
+    /** Create one task per repair group and remember which token ranges each task covers. */
+    private Map<ScheduledTask, Set<LongTokenRange>> mapRepairGroupsToTasks(
+            final List<ReplicaRepairGroup> repairGroups,
+            final Node currentNode)
+    {
+        Map<ScheduledTask, Set<LongTokenRange>> taskMap = new ConcurrentHashMap<>();
+        for (ReplicaRepairGroup replicaRepairGroup : repairGroups)
+        {
+            Set<LongTokenRange> groupTokenRange = new HashSet<>();
+            replicaRepairGroup.iterator().forEachRemaining(groupTokenRange::add);
+            ScheduledTask task = createScheduledTask(replicaRepairGroup, currentNode);
+            taskMap.put(task, groupTokenRange);
+        }
+        return taskMap;
+    }
+
+    /** Build the repair group task for one replica repair group with the highest priority. */
+    private ScheduledTask createScheduledTask(final ReplicaRepairGroup replicaRepairGroup, final Node currentNode)
+    {
+        return RepairGroup.newBuilder()
+                .withTableReference(getTableReference())
+                .withRepairConfiguration(getRepairConfiguration())
+                .withReplicaRepairGroup(replicaRepairGroup)
+                .withJmxProxyFactory(getJmxProxyFactory())
+                .withTableRepairMetrics(getTableRepairMetrics())
+                .withRepairResourceFactory(getRepairLockType().getLockFactory())
+                .withRepairLockFactory(REPAIR_LOCK_FACTORY)
+                .withRepairHistory(myRepairHistory)
+                .withJobId(getId())
+                .withNode(currentNode)
+                .build(ScheduledJob.Priority.HIGHEST.getValue());
+    }
+
+    @Override
+    public OnDemandRepairJobView getView()
+    {
+        return new OnDemandRepairJobView(
+                getId(),
+                getOngoingJob().getHostId(),
+                getTableReference(),
+                getStatus(),
+                getProgress(),
+                getOngoingJob().getCompletedTime(), getOngoingJob().getRepairType());
+    }
+
+    @Override
+    public Iterator<ScheduledTask> iterator()
+    {
+        return myTasks.keySet().iterator();
+    }
+
+    /**
+     * Record the outcome of a task. A failed task marks the whole job as failed, a successful task is
+     * removed and its token ranges are stored as repaired.
+     *
+     * @param successful If the task ran successfully.
+     * @param task The task that ran.
+     */
+    @Override
+    public void postExecute(final boolean successful, final ScheduledTask task)
+    {
+        if (!successful)
+        {
+            LOG.error("Error running {}", task);
+            setFailed(true);
+        }
+        else
+        {
+            Set<LongTokenRange> repairedTokenSet = myTasks.remove(task);
+            // remove() returns null for a task this job does not (or no longer) track.
+            if (repairedTokenSet != null)
+            {
+                getOngoingJob().finishRanges(repairedTokenSet);
+            }
+        }
+
+        super.postExecute(successful, task);
+    }
+
+    /**
+     * Run the on finished hook and store the final status: finished when no tasks remain,
+     * failed when the job has been marked as failed.
+     */
+    @Override
+    public void finishJob()
+    {
+        UUID id = getId();
+        getOnFinishedHook().accept(id);
+        if (myTasks.isEmpty())
+        {
+            getOngoingJob().finishJob();
+            LOG.info("Completed on demand repair: {}", id);
+        }
+
+        if (hasFailed())
+        {
+            getOngoingJob().failJob();
+            LOG.error("Failed on demand repair: {}", id);
+        }
+        super.finishJob();
+    }
+
+    /**
+     * Get the state of the job.
+     *
+     * @return FAILED if the job has failed or the topology has changed, FINISHED if no tasks remain,
+     *         otherwise RUNNABLE.
+     */
+    @Override
+    public ScheduledJob.State getState()
+    {
+        if (hasFailed())
+        {
+            LOG.error("Repair job with id {} failed", getId());
+            return ScheduledJob.State.FAILED;
+        }
+        if (getOngoingJob().hasTopologyChanged())
+        {
+            LOG.error("Repair job with id {} failed. Token ranges have changed since repair has was triggered",
+                    getId());
+            setFailed(true);
+            return ScheduledJob.State.FAILED;
+        }
+        return myTasks.isEmpty() ? ScheduledJob.State.FINISHED : ScheduledJob.State.RUNNABLE;
+    }
+
+    /**
+     * Get the progress of the job.
+     *
+     * @return 0 when the job has no token ranges, 1.0 when the stored status is finished, otherwise the
+     *         fraction of repaired token ranges.
+     */
+    public double getProgress()
+    {
+        if (myTotalTokens == 0)
+        {
+            LOG.debug("Total tokens for this job are 0");
+            return 0;
+        }
+
+        OngoingJob ongoingJob = getOngoingJob();
+        OngoingJob.Status state = ongoingJob.getStatus();
+        int repairedTokens = ongoingJob.getRepairedTokens().size();
+        return state == OngoingJob.Status.finished ? 1.0 : (double) repairedTokens / myTotalTokens;
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Vnode On Demand Repair job of %s", getTableReference());
+    }
+
+    public static class Builder
+    {
+        private final ScheduledJob.Configuration configuration = new ScheduledJob.ConfigurationBuilder()
+                .withPriority(ScheduledJob.Priority.HIGHEST)
+                .withRunInterval(0, TimeUnit.DAYS)
+                .build();
+        private DistributedJmxProxyFactory jmxProxyFactory;
+        private TableRepairMetrics tableRepairMetrics = null;
+        private RepairConfiguration repairConfiguration = RepairConfiguration.DEFAULT;
+        private RepairLockType repairLockType;
+        private Consumer<UUID> onFinishedHook = table ->
+        {
+        };
+        private RepairHistory repairHistory;
+        private OngoingJob ongoingJob;
+        private Node currentNode;
+
+        public final Builder withNode(final Node node)
+        {
+            this.currentNode = node;
+            return this;
+        }
+
+        public final Builder withJmxProxyFactory(final DistributedJmxProxyFactory aJMXProxyFactory)
+        {
+            this.jmxProxyFactory = aJMXProxyFactory;
+            return this;
+        }
+
+        public final Builder withTableRepairMetrics(final TableRepairMetrics theTableRepairMetrics)
+        {
+            this.tableRepairMetrics = theTableRepairMetrics;
+            return this;
+        }
+
+        public final Builder withRepairLockType(final RepairLockType aRepairLockType)
+        {
+            this.repairLockType = aRepairLockType;
+            return this;
+        }
+
+        public final Builder withOnFinished(final Consumer<UUID> theOnFinishedHook)
+        {
+            this.onFinishedHook = theOnFinishedHook;
+            return this;
+        }
+
+        public final Builder withRepairConfiguration(final RepairConfiguration aRepairConfiguration)
+        {
+            this.repairConfiguration = aRepairConfiguration;
+            return this;
+        }
+
+        public final Builder withRepairHistory(final RepairHistory aRepairHistory)
+        {
+            this.repairHistory = aRepairHistory;
+            return this;
+        }
+
+        public final Builder withOngoingJob(final OngoingJob anOngoingJob)
+        {
+            this.ongoingJob = anOngoingJob;
+            return this;
+        }
+
+        /**
+         * Build the job.
+         *
+         * @return The job
+         * @throws NullPointerException if no repair history was set.
+         */
+        public final VnodeOnDemandRepairJob build()
+        {
+            return new VnodeOnDemandRepairJob(this);
+        }
+    }
+}
diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/OnDemandRepairSchedulerImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/OnDemandRepairSchedulerImpl.java new file mode 100644 index 00000000..68594286 --- /dev/null +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/OnDemandRepairSchedulerImpl.java @@ -0,0 +1,523 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler;
+
+import com.datastax.oss.driver.api.core.metadata.Node;
+import com.datastax.oss.driver.api.core.metadata.NodeState;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.IncrementalOnDemandRepairJob;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandRepairJob;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OngoingJob;
+import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.VnodeOnDemandRepairJob;
+import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
+import com.ericsson.bss.cassandra.ecchronos.core.metadata.Metadata;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairScheduler;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduleManager;
+import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob;
+import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistory;
+import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
+import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
+import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
+import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException;
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A factory creating {@link OnDemandRepairJob}'s for tables.
+ *
+ * Access to the scheduled jobs is guarded by an internal lock. A single threaded executor looks for
+ * ongoing started jobs of all nodes at a fixed rate and schedules the ones not yet scheduled.
+ */
+public final class OnDemandRepairSchedulerImpl implements OnDemandRepairScheduler, Closeable
+{
+    private static final Logger LOG = LoggerFactory.getLogger(OnDemandRepairSchedulerImpl.class);
+    private static final String ON_DEMAND_JOB_FAIL = "Failed to get ongoing on demand jobs: {}, automatic retry in {}s";
+    private static final int ONGOING_JOBS_PERIOD_SECONDS = 10;
+
+    private final Map<UUID, OnDemandRepairJob> myScheduledJobs = new HashMap<>();
+    private final Object myLock = new Object();
+
+    private final DistributedJmxProxyFactory myJmxProxyFactory;
+    private final TableRepairMetrics myTableRepairMetrics;
+    private final ScheduleManager myScheduleManager;
+    private final ReplicationState myReplicationState;
+    private final RepairLockType myRepairLockType;
+    private final CqlSession mySession;
+    private final RepairConfiguration myRepairConfiguration;
+    private final RepairHistory myRepairHistory;
+    private final OnDemandStatus myOnDemandStatus;
+    private final ScheduledExecutorService myExecutor = Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat("OngoingJobsScheduler-%d").build());
+
+    /**
+     * Create the scheduler and start the periodic lookup of ongoing jobs.
+     *
+     * @param builder The builder holding the scheduler dependencies.
+     */
+    private OnDemandRepairSchedulerImpl(final Builder builder)
+    {
+        myJmxProxyFactory = builder.myJmxProxyFactory;
+        myTableRepairMetrics = builder.myTableRepairMetrics;
+        myScheduleManager = builder.myScheduleManager;
+        myReplicationState = builder.myReplicationState;
+        myRepairLockType = builder.repairLockType;
+        mySession = builder.session;
+        myRepairConfiguration = builder.repairConfiguration;
+        myRepairHistory = builder.repairHistory;
+        myOnDemandStatus = builder.onDemandStatus;
+        // The first lookup runs immediately, later ones every ONGOING_JOBS_PERIOD_SECONDS.
+        myExecutor.scheduleAtFixedRate(() -> getOngoingStartedJobsForAllNodes(), 0, ONGOING_JOBS_PERIOD_SECONDS,
+                TimeUnit.SECONDS);
+    }
+
+    /** Schedule the ongoing started jobs of all nodes; failures are logged and retried on the next run. */
+    private void getOngoingStartedJobsForAllNodes()
+    {
+        try
+        {
+            myOnDemandStatus.getOngoingStartedJobsForAllNodes(myReplicationState)
+                    .values()
+                    .forEach(jobs -> jobs.forEach(this::scheduleOngoingJob));
+        }
+        catch (Exception e)
+        {
+            // Caught broadly so that a failed lookup never cancels the periodic task.
+            logFailureMessage(e);
+        }
+    }
+
+    /** Log a failed lookup of ongoing jobs together with its cause. */
+    private static void logFailureMessage(final Exception e)
+    {
+        // SLF4J treats a trailing Throwable as the exception to log, which keeps the stack trace.
+        LOG.warn(ON_DEMAND_JOB_FAIL,
+                e.getMessage(),
+                ONGOING_JOBS_PERIOD_SECONDS,
+                e);
+    }
+
+    /**
+     * Retrieves and schedules ongoing on-demand repair jobs for a specific host.
+     * Failures are logged and not propagated to the caller.
+     *
+     * @param hostId The {@link UUID} representing the host for which to schedule ongoing jobs.
+     */
+    public void scheduleOngoingJobs(final UUID hostId)
+    {
+        try
+        {
+            Set<OngoingJob> ongoingJobs = myOnDemandStatus.getOngoingJobs(myReplicationState, hostId);
+            ongoingJobs.forEach(this::scheduleOngoingJob);
+        }
+        catch (Exception e)
+        {
+            logFailureMessage(e);
+        }
+    }
+
+    /**
+     * Deschedule all jobs, forget them and shut down the periodic lookup of ongoing jobs.
+     */
+    @Override
+    public void close()
+    {
+        synchronized (myLock)
+        {
+            for (OnDemandRepairJob job : myScheduledJobs.values())
+            {
+                descheduleTable(job);
+            }
+            myScheduledJobs.clear();
+            myExecutor.shutdown();
+        }
+    }
+
+    /**
+     * Schedule cluster wide job on any random available up node.
+     *
+     * @param tableReference
+     *            The table to schedule a job on.
+     * @param repairType The repair type for the on demand repair.
+     * @return Repair job view list
+     * @throws EcChronosException if no node is up or the table does not exist.
+     */
+    @Override
+    public List<OnDemandRepairJobView> scheduleClusterWideJob(final TableReference tableReference,
+                                                              final RepairType repairType) throws EcChronosException
+    {
+        UUID randomAvailableNodeId = selectRandomAvailableNode();
+        OnDemandRepairJobView jobView = scheduleJob(tableReference, true, repairType, randomAvailableNodeId);
+        // All entries of a cluster wide job share the id of the job that was just scheduled.
+        return getAllClusterWideRepairJobs().stream()
+                .filter(j -> j.getId().equals(jobView.getId()))
+                .collect(Collectors.toList());
+    }
+
+    /** Pick the host id of a random node that is in state UP. */
+    private UUID selectRandomAvailableNode() throws EcChronosException
+    {
+        List<Node> availableNodes = myOnDemandStatus.getNodes()
+                .stream()
+                .filter(node -> node.getState() == NodeState.UP)
+                .collect(Collectors.toList());
+
+        if (availableNodes.isEmpty())
+        {
+            throw new EcChronosException("No available nodes are up and connected to act as coordinator");
+        }
+
+        Random random = new Random();
+        Node selectedNode = availableNodes.get(random.nextInt(availableNodes.size()));
+
+        return selectedNode.getHostId();
+    }
+
+    /**
+     * Schedule job on particular node.
+     *
+     * @param tableReference
+     *            The table to schedule a job on.
+     * @param repairType The repair type for the on demand repair.
+     * @param nodeId The id of the node to schedule the job on.
+     * @return RepairJobView
+     * @throws EcChronosException if the table reference is null or the table does not exist.
+     */
+    @Override
+    public OnDemandRepairJobView
+            scheduleJob(final TableReference tableReference, final RepairType repairType, final UUID nodeId)
+            throws EcChronosException
+    {
+        return scheduleJob(tableReference, false, repairType, nodeId);
+    }
+
+    /** Validate the table, create the job, remember it and hand it to the schedule manager. */
+    private OnDemandRepairJobView scheduleJob(final TableReference tableReference,
+                                              final boolean isClusterWide,
+                                              final RepairType repairType,
+                                              final UUID nodeId) throws EcChronosException
+    {
+        synchronized (myLock)
+        {
+            validateTableReference(tableReference);
+            OnDemandRepairJob job = getRepairJob(tableReference, isClusterWide, repairType, nodeId);
+            myScheduledJobs.put(job.getId(), job);
+            myScheduleManager.schedule(nodeId, job);
+            return job.getView();
+        }
+    }
+
+    /** Reject null table references and tables that are unknown to the session metadata. */
+    private void validateTableReference(final TableReference tableReference) throws EcChronosException
+    {
+        if (tableReference == null)
+        {
+            throw new EcChronosException("Table reference cannot be null");
+        }
+
+        Optional<KeyspaceMetadata> keyspace = Metadata.getKeyspace(mySession, tableReference.getKeyspace());
+        if (keyspace.isEmpty() || Metadata.getTable(keyspace.get(), tableReference.getTable()).isEmpty())
+        {
+            throw new EcChronosException("Keyspace and/or table does not exist");
+        }
+    }
+
+    /** Schedule an ongoing job unless a job with the same id is already scheduled. */
+    private void scheduleOngoingJob(final OngoingJob ongoingJob)
+    {
+        synchronized (myLock)
+        {
+            OnDemandRepairJob job = getOngoingRepairJob(ongoingJob);
+            if (myScheduledJobs.putIfAbsent(job.getId(), job) == null)
+            {
+                LOG.info("Scheduling ongoing job: {}", job.getId());
+                myScheduleManager.schedule(ongoingJob.getHostId(), job);
+            }
+        }
+    }
+
+    /**
+     * Get the views of all jobs currently scheduled by this scheduler.
+     *
+     * @return Repair job view list
+     */
+    public List<OnDemandRepairJobView> getActiveRepairJobs()
+    {
+        synchronized (myLock)
+        {
+            return myScheduledJobs.values()
+                    .stream()
+                    .map(OnDemandRepairJob::getView)
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * Get all cluster wide repair jobs.
+     *
+     * @return Repair job view list
+     */
+    @Override
+    public List<OnDemandRepairJobView> getAllClusterWideRepairJobs()
+    {
+        return myOnDemandStatus.getAllClusterWideJobs()
+                .stream()
+                .map(this::getOngoingRepairJob)
+                .map(OnDemandRepairJob::getView)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Get all repair jobs for specific host.
+     *
+     * @param hostId The id of the host to get the jobs for.
+     * @return Repair job view list
+     */
+    @Override
+    public List<OnDemandRepairJobView> getAllRepairJobs(final UUID hostId)
+    {
+        return myOnDemandStatus.getAllJobs(myReplicationState, hostId)
+                .stream()
+                .map(this::getOngoingRepairJob)
+                .map(OnDemandRepairJob::getView)
+                .collect(Collectors.toList());
+    }
+
+    /** Forget a finished job and deschedule it; used as the on finished hook of the created jobs. */
+    private void removeScheduledJob(final UUID id, final UUID hostId)
+    {
+        synchronized (myLock)
+        {
+            ScheduledJob job = myScheduledJobs.remove(id);
+            // The job is unknown when it was never scheduled here or close() already cleared the map.
+            if (job != null)
+            {
+                myScheduleManager.deschedule(hostId, job);
+            }
+        }
+    }
+
+    /** Deschedule a job from the host of its ongoing job; null jobs are ignored. */
+    private void descheduleTable(final OnDemandRepairJob job)
+    {
+        if (job != null)
+        {
+            myScheduleManager.deschedule(job.getOngoingJob().getHostId(), job);
+        }
+    }
+
+    /** Create a new ongoing job, start it cluster wide if requested, and wrap it in a repair job. */
+    private OnDemandRepairJob getRepairJob(final TableReference tableReference,
+                                           final boolean isClusterWide,
+                                           final RepairType repairType,
+                                           final UUID hostId)
+    {
+        OngoingJob ongoingJob = createOngoingJob(tableReference, repairType, hostId);
+        if (isClusterWide)
+        {
+            ongoingJob.startClusterWideJob(repairType);
+        }
+        return getOngoingRepairJob(ongoingJob);
+    }
+
+    /** Build the ongoing job for the given table, repair type and host. */
+    private OngoingJob createOngoingJob(final TableReference tableReference,
+                                        final RepairType repairType,
+                                        final UUID hostId)
+    {
+        return new OngoingJob.Builder()
+                .withOnDemandStatus(myOnDemandStatus)
+                .withTableReference(tableReference)
+                .withReplicationState(myReplicationState)
+                .withHostId(hostId)
+                .withRepairType(repairType)
+                .build();
+    }
+
+    /** Wrap an ongoing job in the repair job implementation matching its repair type. */
+    private OnDemandRepairJob getOngoingRepairJob(final OngoingJob ongoingJob)
+    {
+        OnDemandRepairJob job;
+        Node node = getNodeByHostId(ongoingJob.getHostId());
+
+        RepairConfiguration repairConfiguration = RepairConfiguration.newBuilder(myRepairConfiguration)
+                .withRepairType(ongoingJob.getRepairType())
+                .build();
+        // Constant-first comparison so that a missing repair type falls through to the vnode job.
+        if (RepairType.INCREMENTAL.equals(ongoingJob.getRepairType()))
+        {
+            job = buildIncrementalOnDemandRepairJob(ongoingJob, repairConfiguration, node);
+        }
+        else
+        {
+            job = buildVnodeOnDemandRepairJob(ongoingJob, repairConfiguration, node);
+        }
+        return job;
+    }
+
+    /** Build a vnode repair job that removes itself from this scheduler when finished. */
+    private VnodeOnDemandRepairJob buildVnodeOnDemandRepairJob(final OngoingJob ongoingJob,
+                                                               final RepairConfiguration repairConfiguration,
+                                                               final Node node)
+    {
+        return new VnodeOnDemandRepairJob.Builder()
+                .withJmxProxyFactory(myJmxProxyFactory)
+                .withTableRepairMetrics(myTableRepairMetrics)
+                .withRepairLockType(myRepairLockType)
+                .withOnFinished(id -> removeScheduledJob(id, ongoingJob.getHostId()))
+                .withRepairConfiguration(repairConfiguration)
+                .withRepairHistory(myRepairHistory)
+                .withOngoingJob(ongoingJob)
+                .withNode(node)
+                .build();
+    }
+
+    /** Build an incremental repair job that removes itself from this scheduler when finished. */
+    private IncrementalOnDemandRepairJob buildIncrementalOnDemandRepairJob(final OngoingJob ongoingJob,
+                                                                           final RepairConfiguration repairConfiguration,
+                                                                           final Node node)
+    {
+        return new IncrementalOnDemandRepairJob.Builder()
+                .withJmxProxyFactory(myJmxProxyFactory)
+                .withTableRepairMetrics(myTableRepairMetrics)
+                .withRepairLockType(myRepairLockType)
+                .withOnFinished(id -> removeScheduledJob(id, ongoingJob.getHostId()))
+                .withRepairConfiguration(repairConfiguration)
+                .withReplicationState(myReplicationState)
+                .withOngoingJob(ongoingJob)
+                .withNode(node)
+                .build();
+    }
+
+    /**
+     * Find the node with the given host id.
+     *
+     * @throws NoSuchElementException if no known node has the host id.
+     */
+    private Node getNodeByHostId(final UUID hostId)
+    {
+        return myOnDemandStatus.getNodes()
+                .stream()
+                .filter(node -> node.getHostId().equals(hostId))
+                .findFirst()
+                .orElseThrow(() -> new NoSuchElementException("No node found with host ID: " + hostId));
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static class Builder
+    {
+        private DistributedJmxProxyFactory myJmxProxyFactory;
+        private TableRepairMetrics myTableRepairMetrics;
+        private ScheduleManager myScheduleManager;
+        private 
ReplicationState myReplicationState; + private RepairLockType repairLockType; + private CqlSession session; + private RepairConfiguration repairConfiguration; + private RepairHistory repairHistory; + private OnDemandStatus onDemandStatus; + + /** + * Build on demand repair scheduler with JMX proxy factory. + * + * @param theJMXProxyFactory JMX proxy factory. + * @return Builder + */ + public Builder withJmxProxyFactory(final DistributedJmxProxyFactory theJMXProxyFactory) + { + myJmxProxyFactory = theJMXProxyFactory; + return this; + } + + /** + * Build on demand repair scheduler with table repair metrics. + * + * @param theTableRepairMetrics Table repair metrics. + * @return Builder + */ + public Builder withTableRepairMetrics(final TableRepairMetrics theTableRepairMetrics) + { + myTableRepairMetrics = theTableRepairMetrics; + return this; + } + + /** + * Build on demand repair scheduler with schedule manager. + * + * @param theScheduleManager Schedule manager. + * @return Builder + */ + public Builder withScheduleManager(final ScheduleManager theScheduleManager) + { + myScheduleManager = theScheduleManager; + return this; + } + + /** + * Build on demand repair scheduler with replication state. + * + * @param theReplicationState Replication state. + * @return Builder + */ + public Builder withReplicationState(final ReplicationState theReplicationState) + { + myReplicationState = theReplicationState; + return this; + } + + /** + * Build on demand repair scheduler with repair lock type. + * + * @param theRepairLockType Repair lock type. + * @return Builder + */ + public Builder withRepairLockType(final RepairLockType theRepairLockType) + { + this.repairLockType = theRepairLockType; + return this; + } + + /** + * Build on demand repair scheduler with session. + * + * @param theSession Session.
+ * @return Builder + */ + public Builder withSession(final CqlSession theSession) + { + this.session = theSession; + return this; + } + + /** + * Build on demand repair scheduler with repair configuration. + * + * @param theRepairConfiguration Repair configuration. + * @return Builder + */ + public Builder withRepairConfiguration(final RepairConfiguration theRepairConfiguration) + { + this.repairConfiguration = theRepairConfiguration; + return this; + } + + /** + * Build on demand repair scheduler with repair history. + * + * @param theRepairHistory Repair history. + * @return Builder + */ + public Builder withRepairHistory(final RepairHistory theRepairHistory) + { + this.repairHistory = theRepairHistory; + return this; + } + + /** + * Build on demand repair scheduler with on demand status. + * + * @param theOnDemandStatus Status. + * @return Builder + */ + public Builder withOnDemandStatus(final OnDemandStatus theOnDemandStatus) + { + this.onDemandStatus = theOnDemandStatus; + return this; + } + + /** + * Build on demand repair scheduler. 
+ * + * @return OnDemandRepairSchedulerImpl + */ + public OnDemandRepairSchedulerImpl build() + { + return new OnDemandRepairSchedulerImpl(this); + } + } +} diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java index 6d20e248..9bacba1e 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java @@ -276,7 +276,7 @@ private boolean tryRunTask( { LOG.debug("Lock has been acquired on node with Id {} with lock {}", nodeID, lock); boolean successful = runTask(task); - job.postExecute(successful); + job.postExecute(successful, task); return true; } catch (Exception e) diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java index d6314ee4..a54966c3 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java @@ -186,7 +186,7 @@ public Iterator iterator() * If the job ran successfully. 
*/ @Override - public void postExecute(final boolean successful) + public void postExecute(final boolean successful, final ScheduledTask task) { try { @@ -197,7 +197,7 @@ public void postExecute(final boolean successful) LOG.warn("Unable to check repair history, {}", this, e); } - super.postExecute(successful); + super.postExecute(successful, task); } /** diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestOnDemandStatus.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestOnDemandStatus.java new file mode 100644 index 00000000..1c88a8c0 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/TestOnDemandStatus.java @@ -0,0 +1,592 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.repair; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.data.UdtValue; +import com.datastax.oss.driver.api.core.type.UserDefinedType; +import com.ericsson.bss.cassandra.ecchronos.core.impl.AbstractCassandraContainerTest; +import com.ericsson.bss.cassandra.ecchronos.core.impl.table.TableReferenceFactoryImpl; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange; +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReferenceFactory; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import com.google.common.collect.ImmutableSet; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.time.Instant; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class TestOnDemandStatus extends AbstractCassandraContainerTest +{ + private static final String STATUS_FAILED = "failed"; + private static final String STATUS_FINISHED = "finished"; + private static final String STATUS_STARTED = "started"; + private static final String KEYSPACE_NAME = "ecchronos"; + private static final String TABLE_NAME = "on_demand_repair_status"; + private static final String TEST_TABLE_NAME = "test_table"; + private static final String HOST_ID_COLUMN_NAME = "host_id"; + private static final 
String STATUS_COLUMN_NAME = "status"; + private static final String JOB_ID_COLUMN_NAME = "job_id"; + private static final String REPAIR_TYPE_COLUMN_NAME = "repair_type"; + private static final String TABLE_REFERENCE_COLUMN_NAME = "table_reference"; + private static final String TOKEN_MAP_HASH_COLUMN_NAME = "token_map_hash"; + private static final String REPAIRED_TOKENS_COLUMN_NAME = "repaired_tokens"; + private static final String UDT_TABLE_REFERENCE_NAME = "table_reference"; + private static final String UDT_ID_NAME = "id"; + private static final String UDT_KAYSPACE_NAME = "keyspace_name"; + private static final String UDT_TABLE_NAME = "table_name"; + private static final String COMPLETED_TIME_COLUMN_NAME = "completed_time"; + + private UUID myHostId; + private TableReferenceFactory myTableReferenceFactory; + private UserDefinedType myUDTTableReferenceType; + + @Mock + private ReplicationState myReplicationState; + + @Before + public void startup() + { + myHostId = getNativeConnectionProvider().getNodes().entrySet().iterator().next().getKey(); + + mySession.execute(String.format( + "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 1}", + KEYSPACE_NAME)); + mySession.execute( + String.format("CREATE TYPE IF NOT EXISTS %s.token_range (start text, end text)", KEYSPACE_NAME)); + mySession.execute(String.format( + "CREATE TYPE IF NOT EXISTS %s.table_reference (id uuid, keyspace_name text, table_name text)", + KEYSPACE_NAME)); + mySession.execute(String.format( + "CREATE TABLE IF NOT EXISTS %s.%s (host_id uuid, job_id uuid, table_reference frozen<table_reference>, token_map_hash int, repaired_tokens frozen<set<frozen<token_range>>>, status text, completed_time timestamp, repair_type text, PRIMARY KEY(host_id, job_id)) WITH default_time_to_live = 2592000 AND gc_grace_seconds = 0", + KEYSPACE_NAME, TABLE_NAME)); + mySession.execute( + String.format("CREATE TABLE IF NOT EXISTS %s.%s (col1 int, col2 int, PRIMARY KEY(col1))", KEYSPACE_NAME, + TEST_TABLE_NAME)); + +
myTableReferenceFactory = new TableReferenceFactoryImpl(mySession); + myUDTTableReferenceType = mySession.getMetadata().getKeyspace(KEYSPACE_NAME).get() + .getUserDefinedType(UDT_TABLE_REFERENCE_NAME).get(); + } + + @After + public void testCleanup() + { + mySession.execute(String.format("TRUNCATE %s.%s", KEYSPACE_NAME, TABLE_NAME)); + } + + @Test + public void testOndemandStatusIsCreated() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + assertThat(onDemandStatus).isNotNull(); + } + + @Test + public void testUDTToken() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + Long start = -10L; + Long end = 100L; + UdtValue token = onDemandStatus.createUDTTokenRangeValue(start, end); + assertThat(onDemandStatus.getStartTokenFrom(token)).isEqualTo(start); + assertThat(onDemandStatus.getEndTokenFrom(token)).isEqualTo(end); + } + + @Test + public void testAddNewJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + ResultSet result = mySession.execute("SELECT * FROM " + KEYSPACE_NAME + "." 
+ TABLE_NAME); + + List rows = result.all(); + assertThat(rows.size()).isEqualTo(1); + + Row row = rows.get(0); + assertThat(row.getUuid(HOST_ID_COLUMN_NAME)).isEqualByComparingTo(myHostId); + assertThat(row.getUuid(JOB_ID_COLUMN_NAME)).isEqualByComparingTo(jobId); + UdtValue uDTTableReference = row.getUdtValue(TABLE_REFERENCE_COLUMN_NAME); + assertThat(uDTTableReference).isNotNull(); + assertThat(uDTTableReference.getType()).isEqualTo(myUDTTableReferenceType); + assertThat(uDTTableReference.getUuid(UDT_ID_NAME)).isEqualTo(tableReference.getId()); + assertThat(uDTTableReference.getString(UDT_KAYSPACE_NAME)).isEqualTo(tableReference.getKeyspace()); + assertThat(uDTTableReference.getString(UDT_TABLE_NAME)).isEqualTo(tableReference.getTable()); + assertThat(row.getInt(TOKEN_MAP_HASH_COLUMN_NAME)).isEqualTo(hashValue); + assertThat(row.getSet(REPAIRED_TOKENS_COLUMN_NAME, UdtValue.class)).isEmpty(); + assertThat(row.getString(STATUS_COLUMN_NAME)).isEqualTo(STATUS_STARTED); + assertThat(row.get(COMPLETED_TIME_COLUMN_NAME, Instant.class)).isNull(); + assertThat(row.getString(REPAIR_TYPE_COLUMN_NAME)).isEqualTo(RepairType.VNODE.toString()); + } + + @Test + public void testUpdateRepairedTokens() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + + ResultSet result = mySession.execute("SELECT * FROM " + KEYSPACE_NAME + "." 
+ TABLE_NAME); + + List rows = result.all(); + assertThat(rows.size()).isEqualTo(1); + + Row row = rows.get(0); + assertThat(row.getUuid(HOST_ID_COLUMN_NAME)).isEqualByComparingTo(myHostId); + assertThat(row.getUuid(JOB_ID_COLUMN_NAME)).isEqualByComparingTo(jobId); + UdtValue uDTTableReference = row.getUdtValue(TABLE_REFERENCE_COLUMN_NAME); + assertThat(uDTTableReference).isNotNull(); + assertThat(uDTTableReference.getType()).isEqualTo(myUDTTableReferenceType); + assertThat(uDTTableReference.getUuid(UDT_ID_NAME)).isEqualTo(tableReference.getId()); + assertThat(uDTTableReference.getString(UDT_KAYSPACE_NAME)).isEqualTo(tableReference.getKeyspace()); + assertThat(row.getInt(TOKEN_MAP_HASH_COLUMN_NAME)).isEqualTo(hashValue); + assertThat(row.getSet(REPAIRED_TOKENS_COLUMN_NAME, UdtValue.class)).isEqualTo(repairedTokens); + assertThat(row.getString(STATUS_COLUMN_NAME)).isEqualTo(STATUS_STARTED); + assertThat(row.get(COMPLETED_TIME_COLUMN_NAME, Instant.class)).isNull(); + assertThat(row.getString(REPAIR_TYPE_COLUMN_NAME)).isEqualTo(RepairType.VNODE.toString()); + } + + @Test + public void testUpdateToFinished() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + + onDemandStatus.finishJob(jobId, myHostId); + + ResultSet result = mySession.execute("SELECT * FROM " + KEYSPACE_NAME + "." 
+ TABLE_NAME); + + List rows = result.all(); + assertThat(rows.size()).isEqualTo(1); + + Row row = rows.get(0); + assertThat(row.getUuid(HOST_ID_COLUMN_NAME)).isEqualByComparingTo(myHostId); + assertThat(row.getUuid(JOB_ID_COLUMN_NAME)).isEqualByComparingTo(jobId); + UdtValue uDTTableReference = row.getUdtValue(TABLE_REFERENCE_COLUMN_NAME); + assertThat(uDTTableReference).isNotNull(); + assertThat(uDTTableReference.getType()).isEqualTo(myUDTTableReferenceType); + assertThat(uDTTableReference.getUuid(UDT_ID_NAME)).isEqualTo(tableReference.getId()); + assertThat(uDTTableReference.getString(UDT_KAYSPACE_NAME)).isEqualTo(tableReference.getKeyspace()); + assertThat(row.getInt(TOKEN_MAP_HASH_COLUMN_NAME)).isEqualTo(hashValue); + assertThat(row.getSet(REPAIRED_TOKENS_COLUMN_NAME, UdtValue.class)).isEqualTo(repairedTokens); + assertThat(row.getString(STATUS_COLUMN_NAME)).isEqualTo(STATUS_FINISHED); + assertThat(row.get(COMPLETED_TIME_COLUMN_NAME, Instant.class)).isNotNull(); + assertThat(row.getString(REPAIR_TYPE_COLUMN_NAME)).isEqualTo(RepairType.VNODE.toString()); + } + + @Test + public void testUpdateToFailed() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + + onDemandStatus.failJob(jobId, myHostId); + + ResultSet result = mySession.execute("SELECT * FROM " + KEYSPACE_NAME + "." 
+ TABLE_NAME); + + List<Row> rows = result.all(); + assertThat(rows.size()).isEqualTo(1); + + Row row = rows.get(0); + assertThat(row.getUuid(HOST_ID_COLUMN_NAME)).isEqualByComparingTo(myHostId); + assertThat(row.getUuid(JOB_ID_COLUMN_NAME)).isEqualByComparingTo(jobId); + UdtValue uDTTableReference = row.getUdtValue(TABLE_REFERENCE_COLUMN_NAME); + assertThat(uDTTableReference).isNotNull(); + assertThat(uDTTableReference.getType()).isEqualTo(myUDTTableReferenceType); + assertThat(uDTTableReference.getUuid(UDT_ID_NAME)).isEqualTo(tableReference.getId()); + assertThat(uDTTableReference.getString(UDT_KAYSPACE_NAME)).isEqualTo(tableReference.getKeyspace()); + assertThat(row.getInt(TOKEN_MAP_HASH_COLUMN_NAME)).isEqualTo(hashValue); + assertThat(row.getSet(REPAIRED_TOKENS_COLUMN_NAME, UdtValue.class)).isEqualTo(repairedTokens); + assertThat(row.getString(STATUS_COLUMN_NAME)).isEqualTo(STATUS_FAILED); + assertThat(row.get(COMPLETED_TIME_COLUMN_NAME, Instant.class)).isNotNull(); + } + + @Test + public void testGetAllClusterWideJobsNoJobs() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + Set<OngoingJob> clusterWideJobs = onDemandStatus.getAllClusterWideJobs(); + + assertThat(clusterWideJobs).isEmpty(); + } + + @Test + public void testGetAllClusterWideJobs() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getAllClusterWideJobs(); + + assertThat(ongoingJobs.size()).isEqualTo(1); + + OngoingJob ongoingJob = ongoingJobs.iterator().next(); +
assertThat(ongoingJob.getJobId()).isEqualTo(jobId); + assertThat(ongoingJob.getTableReference().getKeyspace()).isEqualTo(KEYSPACE_NAME); + assertThat(ongoingJob.getTableReference().getTable()).isEqualTo(TEST_TABLE_NAME); + assertThat(ongoingJob.getRepairedTokens()).isEmpty(); + assertThat(ongoingJob.getStatus()).isEqualTo(OngoingJob.Status.started); + assertThat(ongoingJob.getCompletedTime()).isEqualTo(-1L); + assertThat(ongoingJob.getRepairType()).isEqualTo(RepairType.VNODE); + } + + @Test + public void testGetOngoingJobsNoJobs() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getOngoingJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs).isEmpty(); + } + + @Test + public void testGetOngoingJobsWithNewJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getOngoingJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs.size()).isEqualTo(1); + + OngoingJob ongoingJob = ongoingJobs.iterator().next(); + assertThat(ongoingJob.getJobId()).isEqualTo(jobId); + assertThat(ongoingJob.getTableReference().getKeyspace()).isEqualTo(KEYSPACE_NAME); + assertThat(ongoingJob.getTableReference().getTable()).isEqualTo(TEST_TABLE_NAME); + assertThat(ongoingJob.getRepairedTokens()).isEmpty(); + assertThat(ongoingJob.getStatus()).isEqualTo(OngoingJob.Status.started); + assertThat(ongoingJob.getCompletedTime()).isEqualTo(-1L); + assertThat(ongoingJob.getRepairType()).isEqualTo(RepairType.VNODE); + } +
+ @Test + public void testGetOngoingJobsWithNewTable() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + mySession.execute("DROP TABLE " + KEYSPACE_NAME + "." + TEST_TABLE_NAME); + mySession.execute(String.format("CREATE TABLE %s.%s (col1 int, col2 int, PRIMARY KEY(col1))", KEYSPACE_NAME, + TEST_TABLE_NAME)); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getOngoingJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs).isEmpty(); + } + + @Test + public void testGetOngoingJobsWithUpdatedJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set<LongTokenRange> expectedRepairedTokens = new HashSet<>(); + expectedRepairedTokens.add(new LongTokenRange(-50L, 700L)); + Set<UdtValue> repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getOngoingJobs(myReplicationState, myHostId); + + OngoingJob ongoingJob = ongoingJobs.iterator().next(); + assertThat(ongoingJob.getJobId()).isEqualTo(jobId); +
assertThat(ongoingJob.getTableReference().getKeyspace()).isEqualTo(KEYSPACE_NAME); + assertThat(ongoingJob.getTableReference().getTable()).isEqualTo(TEST_TABLE_NAME); + assertThat(ongoingJob.getRepairedTokens()).isEqualTo(expectedRepairedTokens); + assertThat(ongoingJob.getStatus()).isEqualTo(OngoingJob.Status.started); + assertThat(ongoingJob.getCompletedTime()).isEqualTo(-1L); + assertThat(ongoingJob.getRepairType()).isEqualTo(RepairType.VNODE); + } + + @Test + public void testGetOngoingJobsWithFinishedJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set<LongTokenRange> expectedRepairedTokens = new HashSet<>(); + expectedRepairedTokens.add(new LongTokenRange(-50L, 700L)); + Set<UdtValue> repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + onDemandStatus.finishJob(jobId, myHostId); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getOngoingJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs).isEmpty(); + } + + @Test + public void testGetOngoingJobsWithFailedJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId,
jobId, tableReference, hashValue, RepairType.VNODE); + + Set<LongTokenRange> expectedRepairedTokens = new HashSet<>(); + expectedRepairedTokens.add(new LongTokenRange(-50L, 700L)); + Set<UdtValue> repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + onDemandStatus.failJob(jobId, myHostId); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getOngoingJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs).isEmpty(); + } + + @Test + public void testGetAllJobsNoJobs() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getAllJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs).isEmpty(); + } + + @Test + public void testGetAllJobsWithNewJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getAllJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs.size()).isEqualTo(1); + + OngoingJob ongoingJob = ongoingJobs.iterator().next(); + assertThat(ongoingJob.getJobId()).isEqualTo(jobId); + assertThat(ongoingJob.getTableReference().getKeyspace()).isEqualTo(KEYSPACE_NAME); + assertThat(ongoingJob.getTableReference().getTable()).isEqualTo(TEST_TABLE_NAME); + assertThat(ongoingJob.getRepairedTokens()).isEmpty(); + assertThat(ongoingJob.getStatus()).isEqualTo(OngoingJob.Status.started); + assertThat(ongoingJob.getCompletedTime()).isEqualTo(-1L); +
assertThat(ongoingJob.getRepairType()).isEqualTo(RepairType.VNODE); + } + + @Test + public void testGetAllJobsWithNewTable() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + mySession.execute("DROP TABLE " + KEYSPACE_NAME + "." + TEST_TABLE_NAME); + mySession.execute(String.format("CREATE TABLE %s.%s (col1 int, col2 int, PRIMARY KEY(col1))", KEYSPACE_NAME, + TEST_TABLE_NAME)); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getAllJobs(myReplicationState, myHostId); + + assertThat(ongoingJobs).isEmpty(); + } + + @Test + public void testGetAllJobsWithUpdatedJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set<LongTokenRange> expectedRepairedTokens = new HashSet<>(); + expectedRepairedTokens.add(new LongTokenRange(-50L, 700L)); + Set<UdtValue> repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getAllJobs(myReplicationState, myHostId); + + OngoingJob ongoingJob = ongoingJobs.iterator().next(); +
assertThat(ongoingJob.getJobId()).isEqualTo(jobId); + assertThat(ongoingJob.getTableReference().getKeyspace()).isEqualTo(KEYSPACE_NAME); + assertThat(ongoingJob.getTableReference().getTable()).isEqualTo(TEST_TABLE_NAME); + assertThat(ongoingJob.getRepairedTokens()).isEqualTo(expectedRepairedTokens); + assertThat(ongoingJob.getStatus()).isEqualTo(OngoingJob.Status.started); + assertThat(ongoingJob.getCompletedTime()).isEqualTo(-1L); + assertThat(ongoingJob.getRepairType()).isEqualTo(RepairType.VNODE); + } + + @Test + public void testGetAllJobsWithFinishedJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(myHostId))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set<LongTokenRange> expectedRepairedTokens = new HashSet<>(); + expectedRepairedTokens.add(new LongTokenRange(-50L, 700L)); + Set<UdtValue> repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + onDemandStatus.finishJob(jobId, myHostId); + + Set<OngoingJob> ongoingJobs = onDemandStatus.getAllJobs(myReplicationState, myHostId); + + OngoingJob ongoingJob = ongoingJobs.iterator().next(); + assertThat(ongoingJob.getJobId()).isEqualTo(jobId); + assertThat(ongoingJob.getTableReference().getKeyspace()).isEqualTo(KEYSPACE_NAME); + assertThat(ongoingJob.getTableReference().getTable()).isEqualTo(TEST_TABLE_NAME); + assertThat(ongoingJob.getRepairedTokens()).isEqualTo(expectedRepairedTokens); + assertThat(ongoingJob.getStatus()).isEqualTo(OngoingJob.Status.finished); + assertThat(ongoingJob.getCompletedTime()).isPositive(); +
assertThat(ongoingJob.getRepairType()).isEqualTo(RepairType.VNODE); + } + + @Test + public void testGetAllJobsWithFailedJob() + { + OnDemandStatus onDemandStatus = new OnDemandStatus(getNativeConnectionProvider()); + + UUID jobId = UUID.randomUUID(); + int hashValue = 1; + TableReference tableReference = myTableReferenceFactory.forTable(KEYSPACE_NAME, TEST_TABLE_NAME); + Map> tokenMap = new HashMap<>(); + when(myReplicationState.getTokenRangeToReplicas(tableReference, getNativeConnectionProvider().getNodes().get(0))).thenReturn(tokenMap); + onDemandStatus.addNewJob(myHostId, jobId, tableReference, hashValue, RepairType.VNODE); + + Set expectedRepairedTokens = new HashSet<>(); + expectedRepairedTokens.add(new LongTokenRange(-50L, 700L)); + Set repairedTokens = new HashSet<>(); + repairedTokens.add(onDemandStatus.createUDTTokenRangeValue(-50L, 700L)); + onDemandStatus.updateJob(myHostId, jobId, repairedTokens); + onDemandStatus.failJob(jobId, myHostId); + + Set ongoingJobs = onDemandStatus.getAllJobs(myReplicationState, myHostId); + + OngoingJob ongoingJob = ongoingJobs.iterator().next(); + assertThat(ongoingJob.getJobId()).isEqualTo(jobId); + assertThat(ongoingJob.getTableReference().getKeyspace()).isEqualTo(KEYSPACE_NAME); + assertThat(ongoingJob.getTableReference().getTable()).isEqualTo(TEST_TABLE_NAME); + assertThat(ongoingJob.getRepairedTokens()).isEqualTo(expectedRepairedTokens); + assertThat(ongoingJob.getStatus()).isEqualTo(OngoingJob.Status.failed); + assertThat(ongoingJob.getCompletedTime()).isPositive(); + assertThat(ongoingJob.getRepairType()).isEqualTo(RepairType.VNODE); + } +} + + diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/incremental/TestIncrementalOnDemandRepairJob.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/incremental/TestIncrementalOnDemandRepairJob.java new file mode 100644 index 00000000..b21133da --- /dev/null +++ 
b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/incremental/TestIncrementalOnDemandRepairJob.java @@ -0,0 +1,243 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.IncrementalOnDemandRepairJob; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OngoingJob; +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledTask; +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Iterator; +import 
java.util.UUID; +import java.util.function.Consumer; +import static com.ericsson.bss.cassandra.ecchronos.core.impl.table.MockTableReferenceFactory.tableReference; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.ignoreStubs; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestIncrementalOnDemandRepairJob +{ + private static final String keyspaceName = "keyspace"; + private static final String tableName = "table"; + + @Mock + private DistributedJmxProxyFactory myJmxProxyFactory; + + @Mock + private TableRepairMetrics myTableRepairMetrics; + + @Mock + private OngoingJob myOngoingJob; + + @Mock + private ReplicationState myReplicationState; + + @Mock + private Consumer myHook; + + @Mock + private Node myNode; + + private final TableReference myTableReference = tableReference(keyspaceName, tableName); + private final UUID myHostId = UUID.randomUUID(); + + @Before + public void setup() + { + when(myOngoingJob.getTableReference()).thenReturn(myTableReference); + UUID uuid = UUID.randomUUID(); + when(myOngoingJob.getJobId()).thenReturn(uuid); + when(myOngoingJob.getHostId()).thenReturn(myHostId); + when(myOngoingJob.getRepairType()).thenReturn(RepairType.INCREMENTAL); + } + + @After + public void finalVerification() + { + verifyNoMoreInteractions(ignoreStubs(myJmxProxyFactory)); + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + } + + @Test + public void testCurrentJobCorrectlyReturned() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference, + OnDemandRepairJobView.Status.IN_QUEUE, 0, System.currentTimeMillis(), RepairType.INCREMENTAL); + 
assertThat(repairJob.getId()).isEqualTo(repairJob.getId()); + assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1); + assertThat(repairJob.getTableReference()).isEqualTo(myTableReference); + assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference()); + assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus()); + assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId()); + assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType()); + } + + @Test + public void testCurrentFailedJobCorrectlyReturned() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + Iterator it = repairJob.iterator(); + repairJob.postExecute(false, it.next()); + OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference, + OnDemandRepairJobView.Status.ERROR, 0, System.currentTimeMillis(), RepairType.INCREMENTAL); + assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1); + assertThat(repairJob.getTableReference()).isEqualTo(myTableReference); + assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference()); + assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus()); + assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId()); + assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType()); + } + + @Test + public void testFailedJobCorrectlyReturned() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + when(myOngoingJob.getStatus()).thenReturn(OngoingJob.Status.failed); + OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference, + OnDemandRepairJobView.Status.ERROR, 0, System.currentTimeMillis(), RepairType.INCREMENTAL); + assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1); + 
assertThat(repairJob.getTableReference()).isEqualTo(myTableReference); + assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference()); + assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus()); + assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId()); + assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType()); + } + + @Test + public void testFinishedJobCorrectlyReturned() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + when(myOngoingJob.getStatus()).thenReturn(OngoingJob.Status.finished); + OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference, + OnDemandRepairJobView.Status.COMPLETED, 0, System.currentTimeMillis(), RepairType.INCREMENTAL); + assertThat(repairJob.getId()).isEqualTo(repairJob.getId()); + assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1); + assertThat(repairJob.getTableReference()).isEqualTo(myTableReference); + assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference()); + assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus()); + assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId()); + assertThat(repairJob.getView().getRepairType()).isEqualTo(expectedView.getRepairType()); + } + + @Test + public void testJobFinishedAfterExecution() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + Iterator it = repairJob.iterator(); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.RUNNABLE); + repairJob.postExecute(true, it.next()); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.FINISHED); + } + + @Test + public void testJobFinishedAfterRestart() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + Iterator it = repairJob.iterator(); + 
assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.RUNNABLE); + repairJob.postExecute(true, it.next()); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.FINISHED); + } + + @Test + public void testJobUnsuccessful() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + Iterator it = repairJob.iterator(); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.RUNNABLE); + repairJob.postExecute(false, it.next()); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.FAILED); + } + + @Test + public void testGetProgress() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + assertThat(repairJob.getProgress()).isEqualTo(0); + Iterator it = repairJob.iterator(); + repairJob.postExecute(true, it.next()); + assertThat(repairJob.getProgress()).isEqualTo(1); + } + + @Test + public void testGetProgressWhenJobFinished() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + when(myOngoingJob.getStatus()).thenReturn(OngoingJob.Status.finished); + assertThat(repairJob.getProgress()).isEqualTo(1); + } + + @Test + public void testGetProgressWhenJobFailed() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + when(myOngoingJob.getStatus()).thenReturn(OngoingJob.Status.failed); + assertThat(repairJob.getProgress()).isEqualTo(0); + } + + @Test + public void testFinishJobSuccessful() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + Iterator it = repairJob.iterator(); + repairJob.postExecute(true, it.next()); + repairJob.finishJob(); + verify(myHook).accept(any(UUID.class)); + verify(myOngoingJob).finishJob(); + } + + @Test + public void testFinishJobFailed() + { + IncrementalOnDemandRepairJob repairJob = createIncrementalOnDemandRepairJob(); + Iterator it = repairJob.iterator(); + repairJob.postExecute(false, it.next()); + repairJob.finishJob(); + 
verify(myHook).accept(any(UUID.class)); + verify(myOngoingJob).failJob(); + } + + private IncrementalOnDemandRepairJob createIncrementalOnDemandRepairJob() + { + return new IncrementalOnDemandRepairJob.Builder() + .withJmxProxyFactory(myJmxProxyFactory) + .withTableRepairMetrics(myTableRepairMetrics) + .withRepairLockType(RepairLockType.VNODE) + .withReplicationState(myReplicationState) + .withOngoingJob(myOngoingJob) + .withOnFinished(myHook) + .withNode(myNode) + .build(); + } +} + diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestOnDemandRepairSchedulerImpl.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestOnDemandRepairSchedulerImpl.java new file mode 100644 index 00000000..8f729a1d --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestOnDemandRepairSchedulerImpl.java @@ -0,0 +1,353 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.IncrementalOnDemandRepairJob; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OnDemandStatus; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OngoingJob; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.VnodeOnDemandRepairJob; +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView; + +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduleManager; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob; +import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistory; +import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicationState; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException; +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.HashMap; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import 
org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import static com.ericsson.bss.cassandra.ecchronos.core.impl.table.MockTableReferenceFactory.tableReference; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.*; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +@RunWith(MockitoJUnitRunner.class) +public class TestOnDemandRepairSchedulerImpl +{ + private static final TableReference TABLE_REFERENCE = tableReference("keyspace1", "table1"); + + @Mock + private DistributedJmxProxyFactory jmxProxyFactory; + + @Mock + private ScheduleManager scheduleManager; + + @Mock + private TableRepairMetrics myTableRepairMetrics; + + @Mock + private ReplicationState replicationState; + + @Mock + private RepairHistory repairHistory; + + @Mock + private Metadata metadata; + + @Mock + private CqlSession session; + + @Mock + private OnDemandStatus myOnDemandStatus; + + @Mock + private KeyspaceMetadata myKeyspaceMetadata; + + @Mock + private TableMetadata myTableMetadata; + + @Mock + private OngoingJob myOngingJob; + + @Mock + private Node myNode; + + private final UUID myNodeId = UUID.randomUUID(); + + @Before + public void setup() + { + when(session.getMetadata()).thenReturn(metadata); + when(myOngingJob.getTableReference()).thenReturn(TABLE_REFERENCE); + when(myOngingJob.getHostId()).thenReturn(myNodeId); + when(myOngingJob.getJobId()).thenReturn(UUID.randomUUID()); + when(myNode.getHostId()).thenReturn(myNodeId); + when(myOngingJob.getRepairType()).thenReturn(RepairType.VNODE); + when(myOnDemandStatus.getNodes()).thenReturn(Arrays.asList(myNode)); + } + + @Test + public void testScheduleVnodeRepairOnTable() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + 
when(metadata.getKeyspace(TABLE_REFERENCE.getKeyspace())).thenReturn(Optional.of(myKeyspaceMetadata)); + when(myKeyspaceMetadata.getTable(TABLE_REFERENCE.getTable())).thenReturn(Optional.of(myTableMetadata)); + + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + OnDemandRepairJobView repairJobView = repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.VNODE, myNode.getHostId()); + verify(scheduleManager).schedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + assertTableViewExist(repairScheduler, repairJobView); + + repairScheduler.close(); + verify(scheduleManager).deschedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test + public void testScheduleIncrementalRepairOnTable() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + when(metadata.getKeyspace(TABLE_REFERENCE.getKeyspace())).thenReturn(Optional.of(myKeyspaceMetadata)); + when(myKeyspaceMetadata.getTable(TABLE_REFERENCE.getTable())).thenReturn(Optional.of(myTableMetadata)); + + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + OnDemandRepairJobView repairJobView = repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.INCREMENTAL, myNode.getHostId()); + verify(scheduleManager).schedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + + assertTableViewExist(repairScheduler, repairJobView); + + repairScheduler.close(); + verify(scheduleManager).deschedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test + public void testScheduleTwoVnodeRepairOnTable() throws EcChronosException + { + 
OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + when(metadata.getKeyspace(TABLE_REFERENCE.getKeyspace())).thenReturn(Optional.of(myKeyspaceMetadata)); + when(myKeyspaceMetadata.getTable(TABLE_REFERENCE.getTable())).thenReturn(Optional.of(myTableMetadata)); + + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + OnDemandRepairJobView repairJobView = repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.VNODE, myNode.getHostId()); + OnDemandRepairJobView repairJobView2 = repairScheduler.scheduleJob(TABLE_REFERENCE,RepairType.VNODE, myNode.getHostId()); + verify(scheduleManager, times(2)).schedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + assertTableViewExist(repairScheduler, repairJobView, repairJobView2); + + repairScheduler.close(); + verify(scheduleManager, times(2)).deschedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test + public void testScheduleTwoIncrementalRepairOnTable() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + when(metadata.getKeyspace(TABLE_REFERENCE.getKeyspace())).thenReturn(Optional.of(myKeyspaceMetadata)); + when(myKeyspaceMetadata.getTable(TABLE_REFERENCE.getTable())).thenReturn(Optional.of(myTableMetadata)); + + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + OnDemandRepairJobView repairJobView = repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.INCREMENTAL, myNode.getHostId()); + OnDemandRepairJobView repairJobView2 = repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.INCREMENTAL, myNode.getHostId()); + verify(scheduleManager, times(2)).schedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + + 
assertTableViewExist(repairScheduler, repairJobView, repairJobView2); + + repairScheduler.close(); + verify(scheduleManager, times(2)).deschedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test + public void testRestartVnodeRepairOnTable() + { + when(myOngingJob.getRepairType()).thenReturn(RepairType.VNODE); + Map> allOnGoingJobs = new HashMap<>(); + Set ongoingJobs = new HashSet<>(); + ongoingJobs.add(myOngingJob); + allOnGoingJobs.put(myNodeId, ongoingJobs); + when(myOnDemandStatus.getOngoingStartedJobsForAllNodes(replicationState)).thenReturn(allOnGoingJobs); + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + verify(scheduleManager, timeout(1000)).schedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + repairScheduler.close(); + verify(scheduleManager).deschedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test + public void testRestartIncrementalRepairOnTable() + { + when(myOngingJob.getRepairType()).thenReturn(RepairType.INCREMENTAL); + when(replicationState.getReplicas(TABLE_REFERENCE, myNode)).thenReturn(ImmutableSet.of(mock(DriverNode.class))); + Map> allOnGoingJobs = new HashMap<>(); + Set ongoingJobs = new HashSet<>(); + ongoingJobs.add(myOngingJob); + allOnGoingJobs.put(myNodeId, ongoingJobs); + when(myOnDemandStatus.getOngoingStartedJobsForAllNodes(replicationState)).thenReturn(allOnGoingJobs); + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + + verify(scheduleManager, timeout(1000)).schedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + + repairScheduler.close(); + verify(scheduleManager).deschedule(eq(myNode.getHostId()), 
any(IncrementalOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test + public void testRestartVnodeRepairOnTableWithException() + { + Set ongoingJobs = new HashSet<>(); + ongoingJobs.add(myOngingJob); + List> errors = new ArrayList<>(); + when(myOngingJob.getRepairType()).thenReturn(RepairType.VNODE); + Map> allOnGoingJobs = new HashMap<>(); + allOnGoingJobs.put(myNodeId, ongoingJobs); + when(myOnDemandStatus.getOngoingStartedJobsForAllNodes(replicationState)).thenReturn(allOnGoingJobs); + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + + verify(scheduleManager, timeout(15000)).schedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + repairScheduler.close(); + verify(scheduleManager).deschedule(eq(myNode.getHostId()), any(VnodeOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test + public void testRestartIncrementalRepairOnTableWithException() + { + Set ongoingJobs = new HashSet<>(); + ongoingJobs.add(myOngingJob); + List> errors = new ArrayList<>(); + when(myOngingJob.getRepairType()).thenReturn(RepairType.INCREMENTAL); + Map> allOnGoingJobs = new HashMap<>(); + allOnGoingJobs.put(myNodeId, ongoingJobs); + when(myOnDemandStatus.getOngoingStartedJobsForAllNodes(replicationState)).thenReturn(allOnGoingJobs); + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + + verify(scheduleManager, timeout(15000)).schedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + + repairScheduler.close(); + verify(scheduleManager).deschedule(eq(myNode.getHostId()), any(IncrementalOnDemandRepairJob.class)); + + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + verifyNoMoreInteractions(scheduleManager); + } + + @Test (expected = 
EcChronosException.class) + public void testScheduleVnodeRepairOnNonExistentKeyspaceTable() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(ScheduledJob.class)); + repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.VNODE, myNode.getHostId()); + } + + @Test (expected = EcChronosException.class) + public void testScheduleIncrementalRepairOnNonExistentKeyspaceTable() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(ScheduledJob.class)); + repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.INCREMENTAL, myNode.getHostId()); + } + + @Test (expected = EcChronosException.class) + public void testScheduleVnodeRepairOnNonExistentTable() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + when(metadata.getKeyspace(TABLE_REFERENCE.getKeyspace())).thenReturn(Optional.of(myKeyspaceMetadata)); + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(ScheduledJob.class)); + repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.VNODE, myNode.getHostId()); + } + + @Test (expected = EcChronosException.class) + public void testScheduleIncrementalRepairOnNonExistentTable() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + when(metadata.getKeyspace(TABLE_REFERENCE.getKeyspace())).thenReturn(Optional.of(myKeyspaceMetadata)); + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(ScheduledJob.class)); + repairScheduler.scheduleJob(TABLE_REFERENCE, RepairType.INCREMENTAL, myNode.getHostId()); + } + + @Test (expected = EcChronosException.class) + public void 
testScheduleVnodeRepairOnNull() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(ScheduledJob.class)); + repairScheduler.scheduleJob(null, RepairType.VNODE, myNode.getHostId()); + } + + @Test (expected = EcChronosException.class) + public void testScheduleIncrementalRepairOnNull() throws EcChronosException + { + OnDemandRepairSchedulerImpl repairScheduler = defaultOnDemandRepairSchedulerImplBuilder().build(); + verify(scheduleManager, never()).schedule(eq(myNode.getHostId()), any(ScheduledJob.class)); + repairScheduler.scheduleJob(null, RepairType.INCREMENTAL, myNode.getHostId()); + } + + private void assertTableViewExist(OnDemandRepairSchedulerImpl repairScheduler, OnDemandRepairJobView... expectedViews) + { + List repairJobViews = repairScheduler.getActiveRepairJobs(); + assertThat(repairJobViews).containsExactlyInAnyOrder(expectedViews); + } + + private OnDemandRepairSchedulerImpl.Builder defaultOnDemandRepairSchedulerImplBuilder() + { + return OnDemandRepairSchedulerImpl.builder() + .withJmxProxyFactory(jmxProxyFactory) + .withTableRepairMetrics(myTableRepairMetrics) + .withScheduleManager(scheduleManager) + .withReplicationState(replicationState) + .withSession(session) + .withRepairLockType(RepairLockType.VNODE) + .withRepairConfiguration(RepairConfiguration.DEFAULT) + .withRepairHistory(repairHistory) + .withOnDemandStatus(myOnDemandStatus); + } +} + diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduledJobQueue.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduledJobQueue.java index f41ab2c0..b0a85b7d 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduledJobQueue.java +++ 
b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduledJobQueue.java @@ -77,7 +77,7 @@ public void testNonRunnableQueueIsEmpty() throws ScheduledJobException for (ScheduledJob job : queue) { - job.postExecute(true); + job.postExecute(true, null); } assertThat(queue.iterator()).toIterable().isEmpty(); diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/TestVnodeOnDemandRepairJob.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/TestVnodeOnDemandRepairJob.java new file mode 100644 index 00000000..ffe333d0 --- /dev/null +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/vnode/TestVnodeOnDemandRepairJob.java @@ -0,0 +1,246 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode; + + +import com.datastax.oss.driver.api.core.metadata.Node; +import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.OngoingJob; +import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.VnodeOnDemandRepairJob; +import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory; +import com.ericsson.bss.cassandra.ecchronos.core.metadata.DriverNode; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.OnDemandRepairJobView; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob; +import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledTask; +import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange; +import com.ericsson.bss.cassandra.ecchronos.core.state.RepairHistory; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import com.google.common.collect.ImmutableSet; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static com.ericsson.bss.cassandra.ecchronos.core.impl.table.MockTableReferenceFactory.tableReference; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.ignoreStubs; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TestVnodeOnDemandRepairJob +{ + private static final String keyspaceName = "keyspace"; + private 
static final String tableName = "table"; + + @Mock + private DistributedJmxProxyFactory myJmxProxyFactory; + + @Mock + private TableRepairMetrics myTableRepairMetrics; + + @Mock + private RepairHistory myRepairHistory; + + @Mock + private DriverNode mockReplica1; + + @Mock + private DriverNode mockReplica2; + + @Mock + private DriverNode mockReplica3; + + @Mock + private OngoingJob myOngoingJob; + + @Mock + private Node myNode; + + private final TableReference myTableReference = tableReference(keyspaceName, tableName); + private final UUID myHostId = UUID.randomUUID(); + + @Before + public void setup() + { + when(myOngoingJob.getTableReference()).thenReturn(myTableReference); + UUID uuid = UUID.randomUUID(); + when(myOngoingJob.getJobId()).thenReturn(uuid); + when(myOngoingJob.getHostId()).thenReturn(myHostId); + } + + @After + public void finalVerification() + { + verifyNoMoreInteractions(ignoreStubs(myJmxProxyFactory)); + verifyNoMoreInteractions(ignoreStubs(myTableRepairMetrics)); + } + + @Test + public void testJobCorrectlyReturned() + { + VnodeOnDemandRepairJob repairJob = createVnodeOnDemandRepairJob(0); + OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference, + OnDemandRepairJobView.Status.IN_QUEUE, 0, System.currentTimeMillis(), RepairType.VNODE); + assertThat(repairJob.getId()).isEqualTo(repairJob.getId()); + assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1); + assertThat(repairJob.getTableReference()).isEqualTo(myTableReference); + assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference()); + assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus()); + assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId()); + } + + @Test + public void testFailedJobCorrectlyReturned() + { + VnodeOnDemandRepairJob repairJob = createVnodeOnDemandRepairJob(0); + Iterator<ScheduledTask> it = repairJob.iterator(); + repairJob.postExecute(false, 
it.next()); + OnDemandRepairJobView expectedView = new OnDemandRepairJobView(repairJob.getId(), myHostId, myTableReference, + OnDemandRepairJobView.Status.ERROR, 0, System.currentTimeMillis(), RepairType.VNODE); + assertThat(repairJob.getLastSuccessfulRun()).isEqualTo(-1); + assertThat(repairJob.getTableReference()).isEqualTo(myTableReference); + assertThat(repairJob.getView().getTableReference()).isEqualTo(expectedView.getTableReference()); + assertThat(repairJob.getView().getStatus()).isEqualTo(expectedView.getStatus()); + assertThat(repairJob.getView().getHostId()).isEqualTo(expectedView.getHostId()); + } + + @Test + public void testJobFinishedAfterExecution() + { + VnodeOnDemandRepairJob repairJob = createVnodeOnDemandRepairJob(0); + Iterator<ScheduledTask> it = repairJob.iterator(); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.RUNNABLE); + repairJob.postExecute(true, it.next()); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.RUNNABLE); + repairJob.postExecute(true, it.next()); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.FINISHED); + } + + @Test + public void testJobFinishedAfterRestart() + { + VnodeOnDemandRepairJob repairJob = createRestartedOnDemandRepairJob(); + Iterator<ScheduledTask> it = repairJob.iterator(); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.RUNNABLE); + repairJob.postExecute(true, it.next()); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.FINISHED); + } + + @Test + public void testJobFailedWhenTopologyChange() + { + VnodeOnDemandRepairJob repairJob = createVnodeOnDemandRepairJob(0); + when(myOngoingJob.hasTopologyChanged()).thenReturn(true); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.FAILED); + } + + @Test + public void testJobUnsuccessful() + { + VnodeOnDemandRepairJob repairJob = createVnodeOnDemandRepairJob(0); + Iterator<ScheduledTask> it = repairJob.iterator(); + repairJob.postExecute(true, it.next()); + 
assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.RUNNABLE); + repairJob.postExecute(false, it.next()); + assertThat(repairJob.getState()).isEqualTo(ScheduledJob.State.FAILED); + } + + @Test + public void testGetProgress() + { + VnodeOnDemandRepairJob repairJobZeroProgress = createVnodeOnDemandRepairJob(0); + assertThat(repairJobZeroProgress.getProgress()).isEqualTo(0); + + VnodeOnDemandRepairJob repairJobHalfProgress = createVnodeOnDemandRepairJob(50); + assertThat(repairJobHalfProgress.getProgress()).isEqualTo(0.5); + + VnodeOnDemandRepairJob repairJobFullProgress = createVnodeOnDemandRepairJob(100); + assertThat(repairJobFullProgress.getProgress()).isEqualTo(1.0); + + when(repairJobHalfProgress.getOngoingJob().getStatus()).thenReturn(OngoingJob.Status.finished); + assertThat(repairJobHalfProgress.getProgress()).isEqualTo(1.0); + } + + private VnodeOnDemandRepairJob createVnodeOnDemandRepairJob(int repairedTokenPercentage) + { + LongTokenRange range1 = new LongTokenRange(1, 2); + LongTokenRange range2 = new LongTokenRange(1, 3); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenRangeToReplicas = new HashMap<>(); + tokenRangeToReplicas.put(range1, + ImmutableSet.of(mockReplica1, mockReplica2, mockReplica3)); + tokenRangeToReplicas.put(range2, + ImmutableSet.of(mockReplica1, mockReplica2)); + + Set<LongTokenRange> repairedTokens = new HashSet<>(); + if (repairedTokenPercentage >= 50) + { + repairedTokens.add(range1); + } + if (repairedTokenPercentage == 100) + { + repairedTokens.add(range2); + } + + when(myOngoingJob.getTokens()).thenReturn(tokenRangeToReplicas); + when(myOngoingJob.getRepairedTokens()).thenReturn(repairedTokens); + + return new VnodeOnDemandRepairJob.Builder() + .withJmxProxyFactory(myJmxProxyFactory) + .withTableRepairMetrics(myTableRepairMetrics) + .withRepairLockType(RepairLockType.VNODE) + .withRepairHistory(myRepairHistory) + .withOngoingJob(myOngoingJob) + .withNode(myNode) + .build(); + } + + private VnodeOnDemandRepairJob createRestartedOnDemandRepairJob() + { + LongTokenRange 
range1 = new LongTokenRange(1, 2); + LongTokenRange range2 = new LongTokenRange(1, 3); + Map<LongTokenRange, ImmutableSet<DriverNode>> tokenRangeToReplicas = new HashMap<>(); + tokenRangeToReplicas.put(range1, + ImmutableSet.of(mockReplica1, mockReplica2, mockReplica3)); + tokenRangeToReplicas.put(range2, + ImmutableSet.of(mockReplica1, mockReplica2)); + when(myOngoingJob.getTokens()).thenReturn(tokenRangeToReplicas); + + Set<LongTokenRange> repairedTokens = new HashSet<>(); + repairedTokens.add(range1); + when(myOngoingJob.getRepairedTokens()).thenReturn(repairedTokens); + + return new VnodeOnDemandRepairJob.Builder() + .withJmxProxyFactory(myJmxProxyFactory) + .withTableRepairMetrics(myTableRepairMetrics) + .withRepairLockType(RepairLockType.VNODE) + .withRepairHistory(myRepairHistory) + .withOngoingJob(myOngoingJob) + .withNode(myNode) + .build(); + } +} + diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java index 17efb259..9f358d39 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/locks/LockFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Telefonaktiebolaget LM Ericsson + * Copyright 2024 Telefonaktiebolaget LM Ericsson * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ public interface LockFactory * @param hostId * The hostId. * @return The lock if able to lock the resource. + * @throws LockException If an error occurs while trying to acquire the lock. 
*/ DistributedLock tryLock(String dataCenter, String resource, int priority, Map metadata, UUID hostId) throws LockException; @@ -54,7 +55,7 @@ DistributedLock tryLock(String dataCenter, String resource, int priority, Map getLockMetadata(String dataCenter, String resource) throws LockException; diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/OnDemandRepairJobView.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/OnDemandRepairJobView.java new file mode 100644 index 00000000..595e6793 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/OnDemandRepairJobView.java @@ -0,0 +1,199 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler; + +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import java.util.Objects; +import java.util.UUID; + +/** + * Represents an on-demand repair job view. + * This class encapsulates the status and progress of an on-demand repair job, + * along with related metadata like the host ID, completion time, and repair type. + */ +public class OnDemandRepairJobView +{ + /** + * Enum representing the possible statuses of an on-demand repair job. 
+ */ + public enum Status + { + /** + * Represents a completed repair job. + */ + COMPLETED, + + /** + * Represents a repair job that is currently in the queue and waiting to be processed. + */ + IN_QUEUE, + + /** + * Represents a repair job that has encountered a warning during execution. + */ + WARNING, + + /** + * Represents a repair job that has encountered an error during execution. + */ + ERROR, + + /** + * Represents a repair job that is blocked and cannot proceed. + */ + BLOCKED + } + + private final UUID myId; + private final TableReference myTableReference; + private final Status myStatus; + private final double myProgress; + private final UUID myHostId; + private final long myCompletionTime; + private final RepairType myRepairType; + + /** + * Constructor for OnDemandRepairJobView. + * + * @param id The UUID representing the repair job ID. + * @param hostId The UUID representing the host ID associated with the repair. + * @param tableReference The table reference for the repair job. + * @param status The status of the repair job. + * @param progress The progress of the repair job, between 0.0 and 1.0. + * @param completionTime The completion time of the repair job in milliseconds. + * @param repairType The type of repair being performed. + */ + public OnDemandRepairJobView(final UUID id, + final UUID hostId, + final TableReference tableReference, + final Status status, + final double progress, + final long completionTime, + final RepairType repairType) + { + myId = id; + myTableReference = tableReference; + myStatus = status; + myProgress = progress; + myHostId = hostId; + myCompletionTime = completionTime; + myRepairType = repairType; + } + + /** + * Get id. + * + * @return UUID + */ + public UUID getId() + { + return myId; + } + + /** + * Get table reference. + * + * @return TableReference + */ + public TableReference getTableReference() + { + return myTableReference; + } + + /** + * Get status. 
+ * + * @return Status + */ + public Status getStatus() + { + return myStatus; + } + + /** + * Get progress. + * + * @return double + */ + public double getProgress() + { + return myProgress; + } + + /** + * Get host id. + * + * @return UUID + */ + public UUID getHostId() + { + return myHostId; + } + + /** + * Get completion time. + * + * @return long + */ + public long getCompletionTime() + { + return myCompletionTime; + } + + /** + * Get repair type. + * + * @return RepairType + */ + public RepairType getRepairType() + { + return myRepairType; + } + + /** + * Equality check. + * + * @return boolean + */ + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + OnDemandRepairJobView that = (OnDemandRepairJobView) o; + return Double.compare(that.myProgress, myProgress) == 0 && myCompletionTime == that.myCompletionTime + && Objects.equals(myId, that.myId) && Objects.equals(myTableReference, that.myTableReference) + && myStatus == that.myStatus && Objects.equals(myHostId, that.myHostId) + && Objects.equals(myRepairType, that.myRepairType); + } + + /** + * Hash representation of the object. 
+ * + * @return int + */ + @Override + public int hashCode() + { + return Objects.hash(myId, myTableReference, myStatus, myProgress, myHostId, myCompletionTime, myRepairType); + } +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/OnDemandRepairScheduler.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/OnDemandRepairScheduler.java new file mode 100644 index 00000000..979c4f66 --- /dev/null +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/OnDemandRepairScheduler.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Telefonaktiebolaget LM Ericsson + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler; + +import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference; +import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.EcChronosException; +import java.util.List; +import java.util.UUID; + +/** + * A factory that creates a {@link OnDemandRepairJobView} based on a {@link TableReference}. The job will deschedule itself + * automatically when it completes. + */ +public interface OnDemandRepairScheduler +{ + /** + * Create a repair that is slated to run once for a specified table. + * + * @param tableReference The table to schedule a job on. + * @param repairType The type of the repair. 
+ * @param nodeId The ID of the node on which the repair should run. + * @return A view of the scheduled job. + * @throws EcChronosException Thrown when the keyspace/table doesn't exist. + */ + OnDemandRepairJobView scheduleJob(TableReference tableReference, + RepairType repairType, + UUID nodeId) throws EcChronosException; + + /** + * Create a repair that is slated to run once for a specified table for all replicas. + * + * @param tableReference The table to schedule a job on. + * @param repairType The type of the repair. + * @return Views of the scheduled jobs. + * @throws EcChronosException Thrown when the keyspace/table doesn't exist. + */ + List<OnDemandRepairJobView> scheduleClusterWideJob(TableReference tableReference, + RepairType repairType) throws EcChronosException; + + /** + * Retrieves all cluster-wide repair jobs. + * + * @return A list of all cluster-wide repair jobs. + */ + List<OnDemandRepairJobView> getAllClusterWideRepairJobs(); + + /** + * Retrieves all repair jobs for a specific host. + * + * @param hostId The ID of the host to retrieve repair jobs for. + * @return A list of all repair jobs for the specified host. + */ + List<OnDemandRepairJobView> getAllRepairJobs(UUID hostId); +} diff --git a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledJob.java b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledJob.java index d0d1abaf..fcf58643 100644 --- a/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledJob.java +++ b/core/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/repair/scheduler/ScheduledJob.java @@ -59,7 +59,7 @@ public ScheduledJob(final Configuration configuration, final UUID id) * @param successful * If the job ran successfully. */ - public void postExecute(final boolean successful) + public void postExecute(final boolean successful, final ScheduledTask task) { if (successful) {