Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server) Avoid duplicate registration of StoreEventListener #2620

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private void listenChanges() {
}
return false;
};
this.store().provider().listen(this.storeEventListener);
this.store().provider().listenDataCacheClear(this.storeEventListener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I have read the issue, but I still don't quite understand in what scenarios.the problem is encountered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simply put,i found that registering a listener for each transaction instance is redundant because each transaction receives events and then performs a clear operation on the schema/vertex/edge cache , i think that this is redundant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note each provider has multiple listeners, which is our design intention, because each listener wants to get notifications.
This modification looks a bit special at the moment, and we will propose a solution after we figure out the problem. Could you post the error stack if there is an error?


// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ private void listenChanges() {
}
return false;
};
this.graphParams().loadGraphStore().provider().listen(this.storeEventListener);
this.graphParams().loadGraphStore().provider()
.listenSchemaCacheClear(this.storeEventListener);

// Listen cache event: "cache"(invalid cache item)
this.cacheEventListener = event -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imbajin can you share some context why we added CachedSchemaTransactionV2.java? not sure it's used in what scenarios, and can we merge V1+V2 into one class?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CachedSchemaTransactionV2 is the CachedSchemaTransaction corresponding to the SchemaTransaction of the pd-store version. We added a V2 to distinguish it from the previous version. Due to the inconsistency of the parameters required for construction, it is currently not easy to reuse, will be merged later...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
haohao0103 marked this conversation as resolved.
Show resolved Hide resolved

import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.raft.StoreSnapshotFile;
Expand All @@ -44,6 +45,8 @@ public abstract class AbstractBackendStoreProvider
private final EventHub storeEventHub = new EventHub("store");

protected Map<String, BackendStore> stores = null;
private static volatile Boolean schemaCacheClearListened = Boolean.FALSE;
haohao0103 marked this conversation as resolved.
Show resolved Hide resolved
private static volatile Boolean vertexEdgeCacheClearListened = Boolean.FALSE;

protected final void notifyAndWaitEvent(String event) {
Future<?> future = this.storeEventHub.notify(event, this);
Expand All @@ -70,6 +73,30 @@ public void listen(EventListener listener) {
this.storeEventHub.listen(EventHub.ANY_EVENT, listener);
}

@Override
public void listenSchemaCacheClear(EventListener listener) {
if (!schemaCacheClearListened) {
synchronized (this) {
if (!schemaCacheClearListened) {
listen(listener);
schemaCacheClearListened = Boolean.TRUE;
}
}
}
}

@Override
public void listenDataCacheClear(EventListener listener) {
if (!vertexEdgeCacheClearListened) {
synchronized (this) {
if (!vertexEdgeCacheClearListened) {
listen(listener);
vertexEdgeCacheClearListened = Boolean.TRUE;
}
}
}
}

@Override
public void unlisten(EventListener listener) {
this.storeEventHub.unlisten(EventHub.ANY_EVENT, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public interface BackendStoreProvider {

void listen(EventListener listener);

default void listenSchemaCacheClear(EventListener listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer listenSchemaCacheAtMostOnce

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks , you mean to update the method name to listenSchemaCacheAtMostOnce?

}

default void listenDataCacheClear(EventListener listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer listenGraphCacheAtMostOnce

}

void unlisten(EventListener listener);

EventHub storeEventHub();
Expand Down
Loading