Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Apr 22, 2024
1 parent f42225b commit e0363e3
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,13 @@ public interface SourceCoordinator<T> {

/**
* Should be called by the source when it is shutting down to indicate that it will no longer be able to perform work on partitions,
* or can be called to give up ownership of its partitions in order to pick up new ones with {@link #getNextPartition(Function)} ()}.
* or can be called to give up ownership of a partition in order to pick up new ones with {@link #getNextPartition(Function)} ()}.
* This is used when source coordinator is shared by multiple threads.
* @param sourcePartition source partition to giveup ownership
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException if the partition could not be given up due to some failure
* @since 2.2
* @since 2.8
*/
void giveUpPartitions();
void giveUpPartition(SourcePartition<T> sourcePartition);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.concurrent.locks.ReentrantLock;

public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {

Expand Down Expand Up @@ -66,7 +67,6 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {

private final SourceCoordinationConfig sourceCoordinationConfig;
private final SourceCoordinationStore sourceCoordinationStore;
private final PartitionManager<T> partitionManager;

private final Class<T> partitionProgressStateClass;
private final String ownerId;
Expand All @@ -89,6 +89,7 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
private final Counter saveStatePartitionUpdateErrorCounter;
private final Counter closePartitionUpdateErrorCounter;
private final Counter completePartitionUpdateErrorCounter;
private final ReentrantLock lock;

static {
try {
Expand All @@ -101,13 +102,11 @@ public class LeaseBasedSourceCoordinator<T> implements SourceCoordinator<T> {
public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
final SourceCoordinationStore sourceCoordinationStore,
final SourceCoordinationConfig sourceCoordinationConfig,
final PartitionManager<T> partitionManager,
final String sourceIdentifier,
final PluginMetrics pluginMetrics) {
this.sourceCoordinationConfig = sourceCoordinationConfig;
this.sourceCoordinationStore = sourceCoordinationStore;
this.partitionProgressStateClass = partitionProgressStateClass;
this.partitionManager = partitionManager;
this.sourceIdentifier = Objects.nonNull(sourceCoordinationConfig.getPartitionPrefix()) ?
sourceCoordinationConfig.getPartitionPrefix() + "|" + sourceIdentifier :
sourceIdentifier;
Expand All @@ -128,6 +127,7 @@ public LeaseBasedSourceCoordinator(final Class<T> partitionProgressStateClass,
this.saveStatePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, SAVE_STATE_ACTION);
this.closePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, CLOSE_ACTION);
this.completePartitionUpdateErrorCounter = pluginMetrics.counter(PARTITION_UPDATE_ERROR_COUNT, COMPLETE_ACTION);
this.lock = new ReentrantLock();
}

@Override
Expand All @@ -141,26 +141,31 @@ public void initialize() {
public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier) {
validateIsInitialized();

if (partitionManager.getActivePartition().isPresent()) {
return partitionManager.getActivePartition();
}
Optional<SourcePartitionStoreItem> ownedPartitions = Optional.empty();
try {
if (lock.tryLock()) {
ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);

Optional<SourcePartitionStoreItem> ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
if (ownedPartitions.isEmpty()) {

if (ownedPartitions.isEmpty()) {
final Optional<SourcePartitionStoreItem> acquiredGlobalStateForPartitionCreation = acquireGlobalStateForPartitionCreation();

final Optional<SourcePartitionStoreItem> acquiredGlobalStateForPartitionCreation = acquireGlobalStateForPartitionCreation();
if (acquiredGlobalStateForPartitionCreation.isPresent()) {
final Map<String, Object> globalStateMap = convertStringToGlobalStateMap(acquiredGlobalStateForPartitionCreation.get().getPartitionProgressState());
LOG.info("Partition owner {} did not acquire any partitions. Running partition creation supplier to create more partitions", ownerId);
createPartitions(partitionCreationSupplier.apply(globalStateMap));
partitionCreationSupplierInvocationsCounter.increment();

if (acquiredGlobalStateForPartitionCreation.isPresent()) {
final Map<String, Object> globalStateMap = convertStringToGlobalStateMap(acquiredGlobalStateForPartitionCreation.get().getPartitionProgressState());
LOG.info("Partition owner {} did not acquire any partitions. Running partition creation supplier to create more partitions", ownerId);
createPartitions(partitionCreationSupplier.apply(globalStateMap));
partitionCreationSupplierInvocationsCounter.increment();
giveUpAndSaveGlobalStateForPartitionCreation(acquiredGlobalStateForPartitionCreation.get(), globalStateMap);
}

giveUpAndSaveGlobalStateForPartitionCreation(acquiredGlobalStateForPartitionCreation.get(), globalStateMap);
ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
}
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}

ownedPartitions = sourceCoordinationStore.tryAcquireAvailablePartition(sourceIdentifierWithPartitionType, ownerId, DEFAULT_LEASE_TIMEOUT);
}

if (ownedPartitions.isEmpty()) {
Expand All @@ -175,7 +180,6 @@ public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String,
.withPartitionClosedCount(ownedPartitions.get().getClosedCount())
.build();

partitionManager.setActivePartition(sourcePartition);

LOG.debug("Partition key {} was acquired by owner {}", sourcePartition.getPartitionKey(), ownerId);
partitionsAcquiredCounter.increment();
Expand Down Expand Up @@ -225,10 +229,6 @@ public void completePartition(final String partitionKey, final Boolean fromAckno
throw e;
}

if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was completed by owner {}.", partitionKey, ownerId);
partitionsCompletedCounter.increment();
}
Expand Down Expand Up @@ -269,10 +269,6 @@ public void closePartition(final String partitionKey, final Duration reopenAfter
partitionsClosedCounter.increment();
}

if (!fromAcknowledgmentsCallback) {
partitionManager.removeActivePartition();
}

LOG.info("Partition key {} was closed by owner {}. The resulting status of the partition is now {}", partitionKey, ownerId, itemToUpdate.getSourcePartitionStatus());
}

Expand Down Expand Up @@ -309,40 +305,34 @@ public void updatePartitionForAcknowledgmentWait(final String partitionKey, fina
itemToUpdate.setPartitionOwnershipTimeout(Instant.now().plus(ackowledgmentTimeout));

sourceCoordinationStore.tryUpdateSourcePartitionItem(itemToUpdate);

partitionManager.removeActivePartition();
}

@Override
public void giveUpPartitions() {
public void giveUpPartition(SourcePartition<T> sourcePartition) {

if (!initialized) {
return;
}

final Optional<SourcePartition<T>> activePartition = partitionManager.getActivePartition();
if (activePartition.isPresent()) {
final Optional<SourcePartitionStoreItem> optionalItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, activePartition.get().getPartitionKey());
if (optionalItem.isPresent()) {
final SourcePartitionStoreItem updateItem = optionalItem.get();
validatePartitionOwnership(updateItem);

updateItem.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);
updateItem.setPartitionOwner(null);
updateItem.setPartitionOwnershipTimeout(null);

try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to explicitly give up partition {}. Partition can be considered given up.", updateItem.getSourcePartitionKey());
}
final Optional<SourcePartitionStoreItem> optionalItem = sourceCoordinationStore.getSourcePartitionItem(sourceIdentifierWithPartitionType, sourcePartition.getPartitionKey());
if (optionalItem.isPresent()) {
final SourcePartitionStoreItem updateItem = optionalItem.get();
validatePartitionOwnership(updateItem);

updateItem.setSourcePartitionStatus(SourcePartitionStatus.UNASSIGNED);
updateItem.setPartitionOwner(null);
updateItem.setPartitionOwnershipTimeout(null);

LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
try {
sourceCoordinationStore.tryUpdateSourcePartitionItem(updateItem);
} catch (final PartitionUpdateException e) {
LOG.info("Unable to explicitly give up partition {}. Partition can be considered given up.", updateItem.getSourcePartitionKey());
}
partitionManager.removeActivePartition();
partitionsGivenUpCounter.increment();


LOG.info("Partition key {} was given up by owner {}", updateItem.getSourcePartitionKey(), ownerId);
}
partitionsGivenUpCounter.increment();
}

private T convertStringToPartitionProgressStateClass(final String serializedPartitionProgressState) {
Expand Down Expand Up @@ -380,14 +370,8 @@ private Map<String, Object> convertStringToGlobalStateMap(final String serialize
}
}

private boolean isActivelyOwnedPartition(final String partitionKey) {
final Optional<SourcePartition<T>> activePartition = partitionManager.getActivePartition();
return activePartition.isPresent() && activePartition.get().getPartitionKey().equals(partitionKey);
}

private void validatePartitionOwnership(final SourcePartitionStoreItem item) {
if (Objects.isNull(item.getPartitionOwner()) || !ownerId.equals(item.getPartitionOwner())) {
partitionManager.removeActivePartition();
partitionNotOwnedErrorCounter.increment();
throw new PartitionNotOwnedException(String.format("The partition is no longer owned by this instance of Data Prepper. " +
"The partition ownership timeout most likely expired and was grabbed by another instance of Data Prepper for partition owner %s and partition key %s.",
Expand All @@ -396,12 +380,6 @@ private void validatePartitionOwnership(final SourcePartitionStoreItem item) {
}

private SourcePartitionStoreItem validateAndGetSourcePartitionStoreItem(final String partitionKey, final String action) {
if (!isActivelyOwnedPartition(partitionKey)) {
partitionNotOwnedErrorCounter.increment();
throw new PartitionNotOwnedException(
String.format("Unable to %s for the partition because partition key %s is not owned by this instance of Data Prepper for owner %s", action, partitionKey, ownerId)
);
}

return getSourcePartitionStoreItem(partitionKey, action);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public <T> SourceCoordinator<T> provideSourceCoordinator(final Class<T> clazz, f

LOG.info("Creating LeaseBasedSourceCoordinator with coordination store {} for sub-pipeline {}",
sourceCoordinationConfig.getSourceCoordinationStoreConfig().getName(), subPipelineName);
return new LeaseBasedSourceCoordinator<T>(clazz, sourceCoordinationStore, sourceCoordinationConfig, new PartitionManager<>(), subPipelineName, sourceCoordinatorMetrics);
return new LeaseBasedSourceCoordinator<T>(clazz, sourceCoordinationStore, sourceCoordinationConfig, subPipelineName, sourceCoordinatorMetrics);
}

public EnhancedSourceCoordinator provideEnhancedSourceCoordinator(final Function<SourcePartitionStoreItem, EnhancedSourcePartition> partitionFactory,
Expand Down
Loading

0 comments on commit e0363e3

Please sign in to comment.