Skip to content

Commit

Permalink
Introduce locks in Repair group and repair scheduler
Browse files Browse the repository at this point in the history
- introduced repair lock factory in repair group
  and repair scheduler
  • Loading branch information
sajid riaz committed Nov 21, 2024
1 parent 4554d61 commit 7baebf0
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.ericsson.bss.cassandra.ecchronos.application.config.repair;

import com.ericsson.bss.cassandra.ecchronos.application.spring.AbstractRepairConfigurationProvider;

import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Locale;
Expand Down Expand Up @@ -47,6 +48,18 @@ public final void setRepairConfigurationClass(final Class<? extends AbstractRepa
myRepairConfigurationClass = repairConfigurationClass;
}

/**
 * Get the configured repair lock type.
 *
 * @return The repair lock type, exposed as the {@code lock_type} property.
 */
@JsonProperty("lock_type")
public final RepairLockType getRepairLockType()
{
    return this.myRepairLockType;
}

/**
 * Set the repair lock type from its textual {@code lock_type} value.
 *
 * The value is upper-cased with {@link Locale#US} before it is resolved through
 * {@code RepairLockType.valueOf}, so the name is matched regardless of case.
 *
 * @param repairLockType Name of the repair lock type.
 */
@JsonProperty("lock_type")
public final void setRepairLockType(final String repairLockType)
{
    final String normalizedName = repairLockType.toUpperCase(Locale.US);
    myRepairLockType = RepairLockType.valueOf(normalizedName);
}

@JsonProperty("history_lookback")
public final Interval getRepairHistoryLookback()
{
Expand All @@ -70,17 +83,5 @@ public final void setRepairHistory(final RepairHistory repairHistory)
{
myRepairHistory = repairHistory;
}

/**
 * Get the configured repair lock type.
 *
 * @return The repair lock type, exposed as the {@code lock_type} property.
 */
@JsonProperty("lock_type")
public final RepairLockType getRepairLockType()
{
    return this.myRepairLockType;
}

/**
 * Set the repair lock type from its textual {@code lock_type} value.
 *
 * The value is upper-cased with {@link Locale#US} before it is resolved through
 * {@code RepairLockType.valueOf}, so the name is matched regardless of case.
 *
 * @param repairLockType Name of the repair lock type.
 */
@JsonProperty("lock_type")
public final void setRepairLockType(final String repairLockType)
{
    final String normalizedName = repairLockType.toUpperCase(Locale.US);
    myRepairLockType = RepairLockType.valueOf(normalizedName);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public ECChronos(
.withRepairHistory(repairHistoryService)
.withFaultReporter(repairFaultReporter)
.withTableStorageStates(myECChronosInternals.getTableStorageStates())
.withRepairLockType(configuration.getRepairConfig().getRepairLockType())
.build();

AbstractRepairConfigurationProvider repairConfigurationProvider = new FileBasedRepairConfiguration(applicationContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public enum RepairLockType
myRepairLockingFactoryProvider = repairLockingProvider;
}

/**
 * Get the repair resource factory for this lock type.
 *
 * @return The factory obtained from this lock type's provider via {@code get()}.
 */
RepairResourceFactory getLockFactory()
public RepairResourceFactory getLockFactory()
{
return myRepairLockingFactoryProvider.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental.IncrementalRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.vnode.VnodeRepairTask;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairResource;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairResourceFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledTask;
import com.ericsson.bss.cassandra.ecchronos.core.state.LongTokenRange;
Expand All @@ -29,15 +33,19 @@
import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairPolicy;
import com.ericsson.bss.cassandra.ecchronos.data.repairhistory.RepairHistoryService;
import com.ericsson.bss.cassandra.ecchronos.utils.enums.repair.RepairType;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.ScheduledJobException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +56,8 @@
public class RepairGroup extends ScheduledTask
{
private static final Logger LOG = LoggerFactory.getLogger(RepairGroup.class);
private static final String LOCK_METADATA_KEYSPACE = "keyspace";
private static final String LOCK_METADATA_TABLE = "table";

private final TableReference myTableReference;
private RepairHistory myRepairHistory;
Expand All @@ -59,6 +69,8 @@ public class RepairGroup extends ScheduledTask
private BigInteger myTokensPerRepair;
private final UUID myJobId;
private Node myNode;
private final RepairLockFactory myRepairLockFactory;
private final RepairResourceFactory myRepairResourceFactory;

/**
* Constructs a RepairGroup for a specific node and table.
Expand All @@ -82,6 +94,10 @@ public RepairGroup(final int priority, final Builder builder)
.checkNotNull(builder.myTableRepairMetrics, "Table repair metrics must be set");
myRepairPolicies = new ArrayList<>(Preconditions
.checkNotNull(builder.myRepairPolicies, "Repair policies must be set"));
myRepairLockFactory = Preconditions
.checkNotNull(builder.myRepairLockFactory, "Repair lock factory must be set");
myRepairResourceFactory = Preconditions
.checkNotNull(builder.myRepairResourceFactory, "Repair resource factory must be set");

if (!RepairType.INCREMENTAL.equals(myRepairConfiguration.getRepairType()))
{
Expand Down Expand Up @@ -148,6 +164,24 @@ private boolean shouldContinue()
return myRepairPolicies.stream().allMatch(repairPolicy -> repairPolicy.shouldRun(myTableReference));
}

/**
 * Get lock for the keyspace and table of this repair group.
 *
 * The lock is requested from the repair lock factory for the repair resources that the
 * repair resource factory derives from the replica repair group. The keyspace and table
 * names are attached as lock metadata.
 *
 * @param lockFactory The lock factory to use.
 * @param nodeId The node id handed to the repair lock factory when requesting the lock.
 * @return LockFactory.DistributedLock
 * @throws LockException Lock factory unable to get a lock.
 */
@Override
public LockFactory.DistributedLock getLock(final LockFactory lockFactory, final UUID nodeId) throws LockException
{
    final Map<String, String> lockMetadata = new HashMap<>();
    lockMetadata.put(LOCK_METADATA_KEYSPACE, myTableReference.getKeyspace());
    lockMetadata.put(LOCK_METADATA_TABLE, myTableReference.getTable());

    // Arguments are evaluated left to right, so the resources are still resolved
    // after the metadata has been populated and before the lock is requested.
    return myRepairLockFactory.getLock(
            lockFactory,
            myRepairResourceFactory.getRepairResources(myReplicaRepairGroup),
            lockMetadata,
            myPriority,
            nodeId);
}
/**
* String representation.
*
Expand Down Expand Up @@ -219,7 +253,32 @@ public static class Builder
private RepairHistoryService myRepairHistory;
private Node myNode;
private UUID myJobId;
private RepairLockFactory myRepairLockFactory;
private RepairResourceFactory myRepairResourceFactory;

/**
 * Set the repair lock factory to build with.
 *
 * @param repairLockFactory Repair lock factory stored on this builder.
 * @return This builder, to allow chained calls.
 */
public Builder withRepairLockFactory(final RepairLockFactory repairLockFactory)
{
    this.myRepairLockFactory = repairLockFactory;
    return this;
}

/**
 * Set the repair resource factory to build with.
 *
 * @param repairResourceFactory Repair resource factory stored on this builder.
 * @return This builder, to allow chained calls.
 */
public Builder withRepairResourceFactory(final RepairResourceFactory repairResourceFactory)
{
    this.myRepairResourceFactory = repairResourceFactory;
    return this;
}

/**
* Build with table reference.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler;
package com.ericsson.bss.cassandra.ecchronos.core.impl.repair;

import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.RepairLockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJobView;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableReference;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.table.TableRepairPolicy;
Expand All @@ -26,26 +30,30 @@

public abstract class ScheduledRepairJob extends ScheduledJob
{
protected static final RepairLockFactory REPAIR_LOCK_FACTORY = new RepairLockFactoryImpl();
private final TableReference myTableReference;
private final DistributedJmxProxyFactory myJmxProxyFactory;
private final RepairConfiguration myRepairConfiguration;
private final List<TableRepairPolicy> myRepairPolicies;
private final TableRepairMetrics myTableRepairMetrics;
private final RepairLockType myRepairLockType;

/**
 * Create a scheduled repair job, handing the job configuration to the superclass constructor.
 *
 * Every argument stored by this class is validated with {@code Preconditions.checkNotNull}.
 *
 * @param configuration Job configuration passed to the superclass constructor.
 * @param tableReference Table reference, must be set.
 * @param jmxProxyFactory JMX proxy factory, must be set.
 * @param repairConfiguration Repair configuration, must be set.
 * @param repairPolicies Repair policies, must be set.
 * @param tableRepairMetrics Table repair metrics, must be set.
 * @param repairLockType Repair lock type, must be set.
 */
public ScheduledRepairJob(
final Configuration configuration,
final TableReference tableReference,
final DistributedJmxProxyFactory jmxProxyFactory,
final RepairConfiguration repairConfiguration,
final List<TableRepairPolicy> repairPolicies,
final TableRepairMetrics tableRepairMetrics)
final TableRepairMetrics tableRepairMetrics,
final RepairLockType repairLockType)
{
super(configuration);
myTableReference = Preconditions.checkNotNull(tableReference, "Table reference must be set");
myJmxProxyFactory = Preconditions.checkNotNull(jmxProxyFactory, "JMX proxy factory must be set");
myRepairConfiguration = Preconditions.checkNotNull(repairConfiguration, "Repair configuration must be set");
myRepairPolicies = Preconditions.checkNotNull(repairPolicies, "Repair policies must be set");
myTableRepairMetrics = Preconditions.checkNotNull(tableRepairMetrics, "Table repair metrics must be set");
myRepairLockType = Preconditions.checkNotNull(repairLockType, "Repair lock type must be set");
}

public ScheduledRepairJob(
Expand All @@ -55,14 +63,21 @@ public ScheduledRepairJob(
final DistributedJmxProxyFactory jmxProxyFactory,
final RepairConfiguration repairConfiguration,
final List<TableRepairPolicy> repairPolicies,
final TableRepairMetrics tableRepairMetrics)
final TableRepairMetrics tableRepairMetrics,
final RepairLockType repairLockType)
{
super(configuration, id);
myTableReference = Preconditions.checkNotNull(tableReference, "Table reference must be set");
myJmxProxyFactory = Preconditions.checkNotNull(jmxProxyFactory, "JMX proxy factory must be set");
myRepairConfiguration = Preconditions.checkNotNull(repairConfiguration, "Repair configuration must be set");
myRepairPolicies = Preconditions.checkNotNull(repairPolicies, "Repair policies must be set");
myTableRepairMetrics = Preconditions.checkNotNull(tableRepairMetrics, "Table repair metrics must be set");
myRepairLockType = Preconditions.checkNotNull(repairLockType, "Repair lock type must be set");
}

/**
 * Get the repair lock type this job was created with.
 *
 * @return The repair lock type.
 */
protected final RepairLockType getRepairLockType()
{
    return this.myRepairLockType;
}

/**
Expand Down Expand Up @@ -123,7 +138,7 @@ public boolean equals(final Object o)
myJmxProxyFactory, that.myJmxProxyFactory) && Objects.equals(myRepairConfiguration,
that.myRepairConfiguration) && Objects.equals(
myRepairPolicies, that.myRepairPolicies) && Objects.equals(myTableRepairMetrics,
that.myTableRepairMetrics);
that.myTableRepairMetrics) && myRepairLockType == that.myRepairLockType;
}

/**
Expand All @@ -133,7 +148,7 @@ public boolean equals(final Object o)
public int hashCode()
{
return Objects.hash(super.hashCode(), myTableReference, myJmxProxyFactory, myRepairConfiguration,
myRepairPolicies, myTableRepairMetrics);
myRepairPolicies, myTableRepairMetrics, myRepairLockType);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.RepairGroup;
import com.ericsson.bss.cassandra.ecchronos.core.jmx.DistributedJmxProxyFactory;
import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJob;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.ScheduledRepairJob;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJobView;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledTask;
import com.ericsson.bss.cassandra.ecchronos.core.state.ReplicaRepairGroup;
Expand Down Expand Up @@ -56,7 +57,7 @@ public class IncrementalRepairJob extends ScheduledRepairJob
IncrementalRepairJob(final Builder builder)
{
super(builder.myConfiguration, builder.myTableReference, builder.myJmxProxyFactory,
builder.myRepairConfiguration, builder.myRepairPolicies, builder.myTableRepairMetrics);
builder.myRepairConfiguration, builder.myRepairPolicies, builder.myTableRepairMetrics, builder.myRepairLockType);
myNode = Preconditions.checkNotNull(builder.myNode, "Node must be set");
myReplicationState = Preconditions.checkNotNull(builder.myReplicationState, "Replication state must be set");
myCassandraMetrics = Preconditions.checkNotNull(builder.myCassandraMetrics, "Cassandra metrics must be set");
Expand Down Expand Up @@ -132,6 +133,8 @@ public Iterator<ScheduledTask> iterator()
.withJmxProxyFactory(getJmxProxyFactory())
.withTableRepairMetrics(getTableRepairMetrics())
.withReplicaRepairGroup(replicaRepairGroup)
.withRepairLockFactory(REPAIR_LOCK_FACTORY)
.withRepairResourceFactory(getRepairLockType().getLockFactory())
.withRepairPolicies(getRepairPolicies()).withJobId(getId());
List<ScheduledTask> taskList = new ArrayList<>();
taskList.add(builder.build(getRealPriority()));
Expand Down Expand Up @@ -205,6 +208,20 @@ public static class Builder
private RepairConfiguration myRepairConfiguration = RepairConfiguration.DEFAULT;
private final List<TableRepairPolicy> myRepairPolicies = new ArrayList<>();
private CassandraMetrics myCassandraMetrics;
private RepairLockType myRepairLockType;

/**
 * Set the repair lock type to build with.
 *
 * @param repairLockType Repair lock type stored on this builder.
 * @return This builder, to allow chained calls.
 */
public Builder withRepairLockType(final RepairLockType repairLockType)
{
    this.myRepairLockType = repairLockType;
    return this;
}

/**
* Build with configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.ericsson.bss.cassandra.ecchronos.core.impl.repair.scheduler;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.ericsson.bss.cassandra.ecchronos.core.impl.locks.RepairLockType;
import com.ericsson.bss.cassandra.ecchronos.core.impl.metrics.CassandraMetrics;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.incremental.IncrementalRepairJob;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.state.AlarmPostUpdateHook;
Expand All @@ -23,7 +24,7 @@
import com.ericsson.bss.cassandra.ecchronos.core.repair.config.RepairConfiguration;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.RepairScheduler;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduleManager;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJob;
import com.ericsson.bss.cassandra.ecchronos.core.impl.repair.ScheduledRepairJob;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledRepairJobView;
import com.ericsson.bss.cassandra.ecchronos.core.repair.scheduler.ScheduledJob;
import com.ericsson.bss.cassandra.ecchronos.core.state.RepairState;
Expand Down Expand Up @@ -80,6 +81,7 @@ public final class RepairSchedulerImpl implements RepairScheduler, Closeable
private final CassandraMetrics myCassandraMetrics;
private final List<TableRepairPolicy> myRepairPolicies;
private final TableStorageStates myTableStorageStates;
private final RepairLockType myRepairLockType;

private Set<ScheduledRepairJob> validateScheduleMap(final UUID nodeID, final TableReference tableReference)
{
Expand Down Expand Up @@ -107,6 +109,7 @@ private RepairSchedulerImpl(final Builder builder)
myCassandraMetrics = builder.myCassandraMetrics;
myRepairHistoryService = builder.myRepairHistoryService;
myTableStorageStates = builder.myTableStorageStates;
myRepairLockType = builder.myRepairLockType;
}

@Override
Expand Down Expand Up @@ -305,6 +308,7 @@ private ScheduledRepairJob createScheduledRepairJob(
.withCassandraMetrics(myCassandraMetrics)
.withReplicationState(myReplicationState)
.withRepairPolices(myRepairPolicies)
.withRepairLockType(myRepairLockType)
.build();
}
else
Expand All @@ -325,6 +329,7 @@ private ScheduledRepairJob createScheduledRepairJob(
.withTableStorageStates(myTableStorageStates)
.withRepairPolices(myRepairPolicies)
.withRepairHistory(myRepairHistoryService)
.withRepairLockType(myRepairLockType)
.withNode(node)
.build();
}
Expand Down Expand Up @@ -357,6 +362,19 @@ public static class Builder
private TableRepairMetrics myTableRepairMetrics;
private RepairHistoryService myRepairHistoryService;
private TableStorageStates myTableStorageStates;
private RepairLockType myRepairLockType;

/**
 * Set the repair lock type the RepairSchedulerImpl is built with.
 *
 * @param repairLockType Repair lock type stored on this builder.
 * @return This builder, to allow chained calls.
 */
public Builder withRepairLockType(final RepairLockType repairLockType)
{
    this.myRepairLockType = repairLockType;
    return this;
}

/**
* RepairSchedulerImpl build with fault reporter.
Expand Down
Loading

0 comments on commit 7baebf0

Please sign in to comment.