Skip to content

Commit

Permalink
Implementation of Locking Using mongoDB Stores TIBCOSoftware#134
Browse files Browse the repository at this point in the history
  • Loading branch information
atejshee-tibco committed Sep 11, 2024
1 parent 71856d7 commit a609f4c
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ public interface MongoDBConstants {
String PROPERTY_KEY_MONGODB_ID = "_id";
String PROPERTY_KEY_MONGODB_TIME_CREATED = "time_created_";
String PROPERTY_KEY_MONGODB_TIME_MODIFIED = "time_last_modified_";
String LOCKS_COLLECTION_NAME = "locks";
String DOCUMENT_LOCKS_KEY_FIELD = "key";
String DOCUMENT_LOCKS_MEMBERID_FIELD = "memberid";
String DOCUMENT_LOCKS_SOCKET_FIELD = "socket";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright © 2023-2025 Cloud Software Group, Inc.
* */
/**
* Locking implementation class which uses MongoDB store as the lock provider
*
* */
package com.tibco.be.mongoDB;

import static com.mongodb.client.model.Filters.eq;

import java.util.ArrayList;
import java.util.List;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.MongoWriteException;
import com.mongodb.client.result.DeleteResult;
import com.tibco.cep.kernel.service.logging.Level;
import com.tibco.cep.store.locking.AbstractLockProvider;
import com.tibco.cep.store.locking.LockEntry;

public class MongoDBLockProvider extends AbstractLockProvider {

public final static MongoDBLockProvider INSTANCE = new MongoDBLockProvider();
private MongoDBStoreProvider storeProvider;

private MongoDBLockProvider() {
this.storeProvider = (MongoDBStoreProvider) MongoDBStoreProvider.getStoreProviderInstance();
}

@Override
public boolean unlock(Object key) {
logger.log(Level.DEBUG, "MongoDB store lock :: Attempting to unlock");
boolean isUnlocked = false;
Document document = new Document();
document.append(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, key.toString());
document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, memberId);
DeleteResult result;
try {
result = storeProvider.deleteWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
isUnlocked = result.wasAcknowledged();
} catch (Exception e) {
logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlock", e);
isUnlocked = false;
}
return isUnlocked;
}

/**
* This method will be used to release all the locks held by a member when it gets killed
* */
@Override
public void unlockOnMemberId(String memberId) {
logger.log(Level.DEBUG, "MongoDB store lock :: Engine shutting down. Releasing all the locks held by " + memberId);
Document document = new Document();
document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, memberId);
try {
storeProvider.deleteManyWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
} catch (Exception e) {
logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlockOnMemberId", e);
}
}

@Override
protected LockEntry getLockEntry(Object key) throws Exception {
// Fetch entry from locks table based on the key
Document document = new Document();
document.append(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, key.toString());
Document result = storeProvider.readWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
if (null != result) {
LockEntry lockEntry = new LockEntry(result.get(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD).toString(),
result.get(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD).toString(),
result.get(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD).toString());
return lockEntry;
} else {
return null;
}

}

@Override
protected boolean addLockEntry(LockEntry entry) throws Exception {
return insertOrUpdateLockEntry(entry, null);
}

@Override
protected boolean updateLockEntry(LockEntry entry, String oldMemberId) throws Exception {
return insertOrUpdateLockEntry(entry, oldMemberId);
}

private boolean insertOrUpdateLockEntry(LockEntry entry, String oldMemberId) throws Exception {
boolean isSuccess = false;

try {
storeProvider.startTransaction();
if (oldMemberId == null || (oldMemberId != null &&
oldMemberId.equalsIgnoreCase(entry.getMemberId())) ) {
// Create the document
Document document = new Document();
document.append(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, entry.getKey());
document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, entry.getMemberId());
document.append(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD, entry.getSocket());

if (oldMemberId == null) {
// insert new entry

isSuccess = storeProvider.addOrUpdateEntryWithFilter(document, null, MongoDBConstants.LOCKS_COLLECTION_NAME, true);
} else {
// update old entry only if the filter matches, set isInsert = false
// don't add a new entry if the filter does not match (upsert = false)

// Create a filter for the update operation
List<Bson> filters = new ArrayList<Bson>();
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, entry.getKey()));
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, oldMemberId));
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD, entry.getSocket()));

isSuccess = storeProvider.addOrUpdateEntryWithFilter(document, filters, MongoDBConstants.LOCKS_COLLECTION_NAME, false);
}

if (isSuccess)
storeProvider.commit();
else
storeProvider.rollback();
} else {
isSuccess = false;
}
} catch (MongoWriteException mwe) {
logger.log(Level.DEBUG, "MongoDB store lock :: Cannot insert duplicate entry in Mongo DB store", mwe);
storeProvider.rollback();
isSuccess = false;
} catch (Exception e) {
logger.log(Level.DEBUG, "MongoDB store lock :: Failed while trying to insert/update entry in Mongo DB store", e);
storeProvider.rollback();
isSuccess = false;
} finally {
storeProvider.endTransaction();
}
return isSuccess;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;

import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Updates.set;
Expand Down Expand Up @@ -67,9 +68,13 @@ public class MongoDBStoreProvider extends BaseStoreProvider {

public MongoDBStoreProvider(Cluster cluster, StoreProviderConfig storeConfig) throws Exception {
super(cluster, storeConfig);

lockProvider = MongoDBLockProvider.INSTANCE;
}

public static BaseStoreProvider getStoreProviderInstance() {
return INSTANCE;
}

@Override
public void commit() {
try {
Expand Down Expand Up @@ -413,6 +418,59 @@ public void write(List<StoreRowHolder> storeRowHolder, boolean isUpdate) throws
}

}

/**
* Adds or updates a document in the collection
*
* @param document
* @param filters
* @param collectionName
* @param isInsert
* @return boolean success or failure of the addOrUpdate method
* */
public boolean addOrUpdateEntryWithFilter(Document document, List<Bson> filters, String collectionName, boolean isInsert) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
boolean isSuccess = false;
if (isInsert) {
isSuccess = collection.insertOne(document) == null ? false : true;
} else {
// updateOptions has upsert set as false here because we do not want to add a new entry if it already exists
// The update operation would not succeed if the entry already exists
UpdateOptions updateOptions = new UpdateOptions();

// here, rather than sending the document as is, use Updates.set() and send it to the updateOne() method
isSuccess = collection.updateOne(and(filters), document, updateOptions.upsert(isInsert)) == null ? false : true;
}
return isSuccess;
}

/**
* Fetches a document from the collection
*
* @param document
* @param collectionName
* @return Document fetched based off the filter
* */
public Document readWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
return collection.find(document).first();
}

/**
* Deletes a document from the collection
* */
public DeleteResult deleteWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
return collection.deleteOne(document);
}

/**
* Deletes all the document from the collection based off the filter
* */
public void deleteManyWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
collection.deleteMany(document);
}

private void delete_(StoreRowHolder queryHolder) {
try {
Expand Down

0 comments on commit a609f4c

Please sign in to comment.