Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup dependencies #4

Merged
merged 3 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions modules/siddhi-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
<artifactId>siddhi-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.log4j.wso2</groupId>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.orbit.com.lmax</groupId>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<dependency>
Expand Down Expand Up @@ -77,14 +77,6 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.osgi</groupId>
<artifactId>org.eclipse.osgi.services</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
<dependency>
<groupId>org.atteo.classindex</groupId>
<artifactId>classindex</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

/**
* Holder object for context information of {@link org.wso2.siddhi.query.api.SiddhiApp}.
Expand All @@ -50,6 +51,7 @@ public class SiddhiAppContext {
private boolean enforceOrder;
private StatisticsManager statisticsManager = null;

private ThreadFactory executorThreadFactory;
private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutorService;
private List<EternalReferencedHolder> eternalReferencedHolders;
Expand Down Expand Up @@ -139,6 +141,14 @@ public void setThreadBarrier(ThreadBarrier threadBarrier) {
this.threadBarrier = threadBarrier;
}

public ThreadFactory getExecutorThreadFactory() {
return this.executorThreadFactory;
}

public void setExecutorThreadFactory(ThreadFactory threadFactory) {
this.executorThreadFactory = threadFactory;
}

public ExecutorService getExecutorService() {
return executorService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
localStreamJunctionMap.putIfAbsent(id, outputStreamJunction);
Expand All @@ -148,7 +148,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id, outputStreamJunction);
Expand All @@ -166,7 +166,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id, outputStreamJunction);
Expand Down Expand Up @@ -291,7 +291,7 @@ private synchronized void clonePartition(String key) {
StreamJunction streamJunction = localStreamJunctionMap.get(streamId + key);
if (streamJunction == null) {
streamJunction = new StreamJunction(streamDefinition, siddhiAppContext
.getExecutorService(),
.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
localStreamJunctionMap.put(streamId + key, streamJunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public void addStreamJunction(String key, List<QueryRuntime> queryRuntimeList) {
}

private StreamJunction createStreamJunction() {
return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorService(),
return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;

/**
* Stream Junction is the place where streams are collected and distributed. There will be an Stream Junction per
Expand All @@ -69,7 +69,7 @@ public class StreamJunction implements EventBufferHolder {
private int bufferSize;
private List<Receiver> receivers = new CopyOnWriteArrayList<Receiver>();
private List<Publisher> publishers = Collections.synchronizedList(new LinkedList<>());
private ExecutorService executorService;
private ThreadFactory threadFactory;
private boolean async = false;
private Disruptor<EventExchangeHolder> disruptor;
private RingBuffer<EventExchangeHolder> ringBuffer;
Expand All @@ -80,12 +80,12 @@ public class StreamJunction implements EventBufferHolder {
private OnErrorAction onErrorAction = OnErrorAction.LOG;
private ExceptionListener exceptionListener;

public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int bufferSize,
public StreamJunction(StreamDefinition streamDefinition, ThreadFactory threadFactory, int bufferSize,
StreamJunction faultStreamJunction, SiddhiAppContext siddhiAppContext) {
this.streamDefinition = streamDefinition;
this.bufferSize = bufferSize;
this.batchSize = bufferSize;
this.executorService = executorService;
this.threadFactory = threadFactory;
this.siddhiAppContext = siddhiAppContext;
if (siddhiAppContext.getStatisticsManager() != null) {
this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext,
Expand Down Expand Up @@ -285,7 +285,7 @@ public synchronized void startProcessing() {
ProducerType producerType = ProducerType.MULTI;
disruptor = new Disruptor<EventExchangeHolder>(
new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()),
bufferSize, executorService, producerType,
bufferSize, threadFactory, producerType,
new BlockingWaitStrategy());
disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler());
break;
Expand All @@ -294,7 +294,7 @@ public synchronized void startProcessing() {
if (disruptor == null) {
disruptor = new Disruptor<EventExchangeHolder>(
new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()),
bufferSize, executorService);
bufferSize, threadFactory);
disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler());
}
if (workers > 0) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public String addQuery(QueryRuntime queryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction);
}
Expand All @@ -217,7 +217,7 @@ public String addQuery(QueryRuntime queryRuntime) {

if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@

import org.apache.log4j.Logger;
import org.atteo.classindex.ClassIndex;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.wiring.BundleWiring;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.executor.incremental.IncrementalAggregateBaseTimeFunctionExecutor;
import org.wso2.siddhi.core.executor.incremental.IncrementalShouldUpdateFunctionExecutor;
import org.wso2.siddhi.core.executor.incremental.IncrementalStartTimeEndTimeFunctionExecutor;
import org.wso2.siddhi.core.executor.incremental.IncrementalTimeGetTimeZone;
import org.wso2.siddhi.core.executor.incremental.IncrementalUnixTimeFunctionExecutor;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -49,22 +43,6 @@ public class SiddhiExtensionLoader {
*/
public static void loadSiddhiExtensions(Map<String, Class> siddhiExtensionsMap) {
loadLocalExtensions(siddhiExtensionsMap);
BundleContext bundleContext = ReferenceHolder.getInstance().getBundleContext();
if (bundleContext != null) {
loadExtensionOSGI(bundleContext, siddhiExtensionsMap);
}
}

/**
* Load Extensions in OSGi environment.
*
* @param bundleContext OSGi bundleContext
* @param siddhiExtensionsMap reference map for the Siddhi extension
*/
private static void loadExtensionOSGI(BundleContext bundleContext, Map<String, Class> siddhiExtensionsMap) {
ExtensionBundleListener extensionBundleListener = new ExtensionBundleListener(siddhiExtensionsMap);
bundleContext.addBundleListener(extensionBundleListener);
extensionBundleListener.loadAllExtensions(bundleContext);
}

/**
Expand Down Expand Up @@ -151,52 +129,4 @@ private static void addExtensionToMap(String fqExtensionName, Class extensionCla
"loaded with the same namespace and name '" + fqExtensionName + "'");
}
}

/**
* Class to listen to Bundle changes to update available extensions.
*/
private static class ExtensionBundleListener implements BundleListener {

private Map<Class, Integer> bundleExtensions = new HashMap<Class, Integer>();
private Map<String, Class> siddhiExtensionsMap;

ExtensionBundleListener(Map<String, Class> siddhiExtensionsMap) {
this.siddhiExtensionsMap = siddhiExtensionsMap;
}

@Override
public void bundleChanged(BundleEvent bundleEvent) {
if (bundleEvent.getType() == BundleEvent.STARTED) {
addExtensions(bundleEvent.getBundle());
} else {
removeExtensions(bundleEvent.getBundle());
}
}

private void addExtensions(Bundle bundle) {
ClassLoader classLoader = bundle.adapt(BundleWiring.class).getClassLoader();
Iterable<Class<?>> extensions = ClassIndex.getAnnotated(Extension.class, classLoader);
for (Class extension : extensions) {
addExtensionToMap(extension, siddhiExtensionsMap);
bundleExtensions.put(extension, (int) bundle.getBundleId());
}
}

private void removeExtensions(Bundle bundle) {
bundleExtensions.entrySet().stream().filter(entry -> entry.getValue() ==
bundle.getBundleId()).forEachOrdered(entry -> {
siddhiExtensionsMap.remove(entry.getKey());
});
bundleExtensions.entrySet().removeIf(entry -> entry.getValue() ==
bundle.getBundleId());
}

void loadAllExtensions(BundleContext bundleContext) {
for (Bundle b : bundleContext.getBundles()) {
if (b.getState() == Bundle.ACTIVE) {
addExtensions(b);
}
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public static OutputCallback constructOutputCallback(OutputStream outStream, Str
StreamJunction outputStreamJunction = streamJunctionMap.get(id + key);
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(outputStreamDefinition,
siddhiAppContext.getExecutorService(),
siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id + key, outputStreamJunction);
}
Expand Down
Loading