Skip to content

Commit

Permalink
Rebased to latest
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jun 13, 2024
1 parent af7d1b5 commit 6102091
Show file tree
Hide file tree
Showing 8 changed files with 472 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.peerforwarder;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
import org.opensearch.dataprepper.peerforwarder.discovery.DiscoveryMode;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DefaultPeerForwarderProvider implements PeerForwarderProvider {

private final PeerForwarderClientFactory peerForwarderClientFactory;
private final PeerForwarderClient peerForwarderClient;
private final PeerForwarderConfiguration peerForwarderConfiguration;
private final PluginMetrics pluginMetrics;
private final Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
private HashRing hashRing;

DefaultPeerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
final PeerForwarderClient peerForwarderClient,
final PeerForwarderConfiguration peerForwarderConfiguration,
final PluginMetrics pluginMetrics) {
this.peerForwarderClientFactory = peerForwarderClientFactory;
this.peerForwarderClient = peerForwarderClient;
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.pluginMetrics = pluginMetrics;
}

public PeerForwarder register(final String pipelineName, final Processor processor, final String pluginId, final Set<String> identificationKeys,
final Integer pipelineWorkerThreads) {
if (pipelinePeerForwarderReceiveBufferMap.containsKey(pipelineName) &&
pipelinePeerForwarderReceiveBufferMap.get(pipelineName).containsKey(pluginId)) {
throw new RuntimeException("Data Prepper 2.0 will only support a single peer-forwarder per pipeline/plugin type");
}

final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = createBufferPerPipelineProcessor(pipelineName, pluginId);

if (isPeerForwardingRequired()) {
if (hashRing == null) {
hashRing = peerForwarderClientFactory.createHashRing();
}
return new RemotePeerForwarder(
peerForwarderClient,
hashRing,
peerForwarderReceiveBuffer,
pipelineName,
pluginId,
identificationKeys,
pluginMetrics,
peerForwarderConfiguration.getBatchDelay(),
peerForwarderConfiguration.getFailedForwardingRequestLocalWriteTimeout(),
peerForwarderConfiguration.getForwardingBatchSize(),
peerForwarderConfiguration.getForwardingBatchQueueDepth(),
peerForwarderConfiguration.getForwardingBatchTimeout(),
pipelineWorkerThreads
);
}
else {
return new LocalPeerForwarder();
}
}

private PeerForwarderReceiveBuffer<Record<Event>> createBufferPerPipelineProcessor(final String pipelineName, final String pluginId) {
final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = new
PeerForwarderReceiveBuffer<>(peerForwarderConfiguration.getBufferSize(), peerForwarderConfiguration.getBatchSize(), pipelineName, pluginId);

final Map<String, PeerForwarderReceiveBuffer<Record<Event>>> pluginsBufferMap =
pipelinePeerForwarderReceiveBufferMap.computeIfAbsent(pipelineName, k -> new HashMap<>());

pluginsBufferMap.put(pluginId, peerForwarderReceiveBuffer);

return peerForwarderReceiveBuffer;
}

public boolean isPeerForwardingRequired() {
return arePeersConfigured() && pipelinePeerForwarderReceiveBufferMap.size() > 0;
}

public boolean arePeersConfigured() {
final DiscoveryMode discoveryMode = peerForwarderConfiguration.getDiscoveryMode();
if (discoveryMode.equals(DiscoveryMode.LOCAL_NODE)) {
return false;
}
else if (discoveryMode.equals(DiscoveryMode.STATIC) && peerForwarderConfiguration.getStaticEndpoints().size() <= 1) {
return false;
}
return true;
}

public Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap() {
return pipelinePeerForwarderReceiveBufferMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.peerforwarder;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;

import java.util.Map;
import java.util.Set;

public class LocalModePeerForwarderProvider implements PeerForwarderProvider {

private final PeerForwarderProvider peerForwarderProvider;
private boolean isRemotePeerForwarderRegistered;

public LocalModePeerForwarderProvider(final PeerForwarderProvider peerForwarderProvider) {
this.peerForwarderProvider = peerForwarderProvider;
this.isRemotePeerForwarderRegistered = false;
}

@Override
public PeerForwarder register(final String pipelineName, final Processor processor, final String pluginId, final Set<String> identificationKeys, final Integer pipelineWorkerThreads) {
if (((RequiresPeerForwarding)processor).isForLocalProcessingOnly(null)) {
return new LocalPeerForwarder();
}
isRemotePeerForwarderRegistered = true;
return peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads);
}

@Override
public boolean isPeerForwardingRequired() {
return isRemotePeerForwarderRegistered;
}

@Override
public Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap() {
return (isRemotePeerForwarderRegistered) ?
peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap() :
Map.of();
}

@Override
public boolean arePeersConfigured() {
return isRemotePeerForwarderRegistered ? peerForwarderProvider.arePeersConfigured() : false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.inject.Named;


@Configuration
class PeerForwarderAppConfig {
Expand Down Expand Up @@ -71,12 +75,18 @@ public PeerForwarderClient peerForwarderClient(final PeerForwarderConfiguration
peerForwarderConfiguration, peerForwarderClientFactory, peerForwarderCodec, pluginMetrics);
}

@Bean
public PeerForwarderProvider peerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
@Bean(name = "defaultPeerForwarder")
public DefaultPeerForwarderProvider peerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
final PeerForwarderClient peerForwarderClient,
final PeerForwarderConfiguration peerForwarderConfiguration,
@Qualifier("peerForwarderMetrics") final PluginMetrics pluginMetrics) {
return new PeerForwarderProvider(peerForwarderClientFactory, peerForwarderClient, peerForwarderConfiguration, pluginMetrics);
return new DefaultPeerForwarderProvider(peerForwarderClientFactory, peerForwarderClient, peerForwarderConfiguration, pluginMetrics);
}

@Bean
@Primary
public PeerForwarderProvider peerForwarderProvider(@Named("defaultPeerForwarder") final PeerForwarderProvider peerForwarderProvider) {
return new LocalModePeerForwarderProvider(peerForwarderProvider);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,97 +5,49 @@

package org.opensearch.dataprepper.peerforwarder;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
import org.opensearch.dataprepper.peerforwarder.discovery.DiscoveryMode;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PeerForwarderProvider {

private final PeerForwarderClientFactory peerForwarderClientFactory;
private final PeerForwarderClient peerForwarderClient;
private final PeerForwarderConfiguration peerForwarderConfiguration;
private final PluginMetrics pluginMetrics;
private final Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
private HashRing hashRing;

PeerForwarderProvider(final PeerForwarderClientFactory peerForwarderClientFactory,
final PeerForwarderClient peerForwarderClient,
final PeerForwarderConfiguration peerForwarderConfiguration,
final PluginMetrics pluginMetrics) {
this.peerForwarderClientFactory = peerForwarderClientFactory;
this.peerForwarderClient = peerForwarderClient;
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.pluginMetrics = pluginMetrics;
}

public PeerForwarder register(final String pipelineName, final String pluginId, final Set<String> identificationKeys,
final Integer pipelineWorkerThreads) {
if (pipelinePeerForwarderReceiveBufferMap.containsKey(pipelineName) &&
pipelinePeerForwarderReceiveBufferMap.get(pipelineName).containsKey(pluginId)) {
throw new RuntimeException("Data Prepper 2.0 will only support a single peer-forwarder per pipeline/plugin type");
}

final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = createBufferPerPipelineProcessor(pipelineName, pluginId);

if (isPeerForwardingRequired()) {
if (hashRing == null) {
hashRing = peerForwarderClientFactory.createHashRing();
}
return new RemotePeerForwarder(
peerForwarderClient,
hashRing,
peerForwarderReceiveBuffer,
pipelineName,
pluginId,
identificationKeys,
pluginMetrics,
peerForwarderConfiguration.getBatchDelay(),
peerForwarderConfiguration.getFailedForwardingRequestLocalWriteTimeout(),
peerForwarderConfiguration.getForwardingBatchSize(),
peerForwarderConfiguration.getForwardingBatchQueueDepth(),
peerForwarderConfiguration.getForwardingBatchTimeout(),
pipelineWorkerThreads
);
}
else {
return new LocalPeerForwarder();
}
}

private PeerForwarderReceiveBuffer<Record<Event>> createBufferPerPipelineProcessor(final String pipelineName, final String pluginId) {
final PeerForwarderReceiveBuffer<Record<Event>> peerForwarderReceiveBuffer = new
PeerForwarderReceiveBuffer<>(peerForwarderConfiguration.getBufferSize(), peerForwarderConfiguration.getBatchSize(), pipelineName, pluginId);

final Map<String, PeerForwarderReceiveBuffer<Record<Event>>> pluginsBufferMap =
pipelinePeerForwarderReceiveBufferMap.computeIfAbsent(pipelineName, k -> new HashMap<>());

pluginsBufferMap.put(pluginId, peerForwarderReceiveBuffer);

return peerForwarderReceiveBuffer;
}

public boolean isPeerForwardingRequired() {
return arePeersConfigured() && pipelinePeerForwarderReceiveBufferMap.size() > 0;
}

private boolean arePeersConfigured() {
final DiscoveryMode discoveryMode = peerForwarderConfiguration.getDiscoveryMode();
if (discoveryMode.equals(DiscoveryMode.LOCAL_NODE)) {
return false;
}
else if (discoveryMode.equals(DiscoveryMode.STATIC) && peerForwarderConfiguration.getStaticEndpoints().size() <= 1) {
return false;
}
return true;
}

public Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap() {
return pipelinePeerForwarderReceiveBufferMap;
}
public interface PeerForwarderProvider {
/**
* Registers a pipeline and identification keys
*
* @param pipelineName pipeline name
* @param processor processor
* @param pluginId plugin id
* @param identificationKeys identification keys
* @param pipelineWorkerThreads number of pipeline worker threads
* @return peer forwarder
* @since 2.9
*/
PeerForwarder register(final String pipelineName, final Processor processor, final String pluginId, final Set<String> identificationKeys, final Integer pipelineWorkerThreads);

/**
* Returns if peer forwarding required
*
* @return returns if peer forwarding required or nto
* @since 2.9
*/
boolean isPeerForwardingRequired();

/**
* Returns if peers configured
*
* @return returns if peers configured
* @since 2.9
*/
boolean arePeersConfigured();

/**
* Returns pipeline peer forwarder receive buffer map
*
* @return Map of buffer per pipeline per pluginId
* @since 2.9
*/
Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> getPipelinePeerForwarderReceiveBufferMap();
}

Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ public static List<Processor> decorateProcessors(
"Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId);
}

final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, pluginId, identificationKeys, pipelineWorkerThreads);

return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor))
.collect(Collectors.toList());
return processors.stream()
.map(processor -> {
PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads);
return new PeerForwardingProcessorDecorator(peerForwarder, processor);
})
.collect(Collectors.toList());
}

private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) {
Expand Down
Loading

0 comments on commit 6102091

Please sign in to comment.