Skip to content

Commit

Permalink
Merge pull request #137 from atejshee-tibco/atejshee-be-contribution
Browse files Browse the repository at this point in the history
Implementation of Locking Using mongoDB Stores #134
  • Loading branch information
rakulkar-tibco authored Oct 7, 2024
2 parents 71856d7 + e9a6b79 commit 3b03173
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ public interface MongoDBConstants {
String PROPERTY_KEY_MONGODB_ID = "_id";
String PROPERTY_KEY_MONGODB_TIME_CREATED = "time_created_";
String PROPERTY_KEY_MONGODB_TIME_MODIFIED = "time_last_modified_";
String LOCKS_COLLECTION_NAME = "locks";
String DOCUMENT_LOCKS_KEY_FIELD = "key";
String DOCUMENT_LOCKS_MEMBERID_FIELD = "memberid";
String DOCUMENT_LOCKS_SOCKET_FIELD = "socket";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright © 2023-2025 Cloud Software Group, Inc.
* */
/**
* Locking implementation class which uses MongoDB store as the lock provider
*
* */
package com.tibco.be.mongoDB;

import static com.mongodb.client.model.Filters.eq;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.MongoWriteException;
import com.tibco.cep.kernel.service.logging.Level;
import com.tibco.cep.store.custom.StoreColumnData;
import com.tibco.cep.store.custom.StoreHelper;
import com.tibco.cep.store.custom.StoreRowHolder;
import com.tibco.cep.store.locking.AbstractLockProvider;
import com.tibco.cep.store.locking.LockEntry;

public class MongoDBLockProvider extends AbstractLockProvider {

public final static MongoDBLockProvider INSTANCE = new MongoDBLockProvider();
private MongoDBStoreProvider storeProvider;
MongoDBDataTypeMapper mapper;

private MongoDBLockProvider() {
this.storeProvider = (MongoDBStoreProvider) MongoDBStoreProvider.getStoreProviderInstance();
this.mapper = (MongoDBDataTypeMapper) storeProvider.getStoreDataTypeMapper();
}

@Override
public boolean unlock(Object key) {
logger.log(Level.DEBUG, "MongoDB store lock :: Attempting to unlock");
boolean isUnlocked = false;
// Create StoreRowHolder
Map<String, StoreColumnData> columnDataMap = new HashMap<String, StoreColumnData>();

// Column keyname
StoreColumnData column = StoreHelper.getColumn(mapper.getStringType(), key.toString());
columnDataMap.put(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, column);

// Column memberId
column = StoreHelper.getColumn(mapper.getStringType(), memberId);
columnDataMap.put(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, column);
StoreRowHolder rowHolder = new StoreRowHolder(MongoDBConstants.LOCKS_COLLECTION_NAME, columnDataMap);

try {
storeProvider.delete(rowHolder);
isUnlocked = true;
} catch (Exception e) {
logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlock", e);
isUnlocked = false;
}

return isUnlocked;
}

/**
* This method will be used to release all the locks held by a member when it gets killed
* */
@Override
public void unlockOnMemberId(String memberId) {
logger.log(Level.DEBUG, "MongoDB store lock :: Engine shutting down. Releasing all the locks held by " + memberId);
// Create StoreRowHolder
Map<String, StoreColumnData> columnDataMap = new HashMap<String, StoreColumnData>();

// Column memberId
StoreColumnData column = StoreHelper.getColumn(mapper.getStringType(), memberId);
columnDataMap.put(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, column);
StoreRowHolder rowHolder = new StoreRowHolder(MongoDBConstants.LOCKS_COLLECTION_NAME, columnDataMap);

try {
storeProvider.delete(rowHolder);
} catch (Exception e) {
logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlockOnMemberId", e);
}
}

@Override
protected LockEntry getLockEntry(Object key) throws Exception {
// Fetch entry from locks table based on the key
Document document = new Document();
document.append(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, key.toString());
Document result = storeProvider.readLockEntryWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
if (null != result) {
LockEntry lockEntry = new LockEntry(result.get(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD).toString(),
result.get(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD).toString(),
result.get(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD).toString());
return lockEntry;
} else {
return null;
}

}

@Override
protected boolean addLockEntry(LockEntry entry) throws Exception {
return insertOrUpdateLockEntry(entry, null);
}

@Override
protected boolean updateLockEntry(LockEntry entry, String oldMemberId) throws Exception {
return insertOrUpdateLockEntry(entry, oldMemberId);
}

private boolean insertOrUpdateLockEntry(LockEntry entry, String oldMemberId) throws Exception {
boolean isSuccess = false;

try {
storeProvider.startTransaction();
if (oldMemberId == null || (oldMemberId != null &&
oldMemberId.equalsIgnoreCase(entry.getMemberId())) ) {
// Create the document
Document document = new Document();
document.append(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, entry.getKey());
document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, entry.getMemberId());
document.append(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD, entry.getSocket());

if (oldMemberId == null) {
// insert new entry

isSuccess = storeProvider.addOrUpdateLockEntryWithFilter(document, null, MongoDBConstants.LOCKS_COLLECTION_NAME, true);
} else {
// Create a filter for the update operation
List<Bson> filters = new ArrayList<Bson>();
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, entry.getKey()));
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, oldMemberId));
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD, entry.getSocket()));

isSuccess = storeProvider.addOrUpdateLockEntryWithFilter(document, filters, MongoDBConstants.LOCKS_COLLECTION_NAME, false);
}

if (isSuccess)
storeProvider.commit();
else
storeProvider.rollback();
} else {
isSuccess = false;
}
} catch (MongoWriteException mwe) {
logger.log(Level.DEBUG, "MongoDB store lock :: Cannot insert duplicate entry in Mongo DB store", mwe);
storeProvider.rollback();
isSuccess = false;
} catch (Exception e) {
logger.log(Level.DEBUG, "MongoDB store lock :: Failed while trying to insert/update entry in Mongo DB store", e);
storeProvider.rollback();
isSuccess = false;
} finally {
storeProvider.endTransaction();
}
return isSuccess;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;

import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Updates.set;
Expand Down Expand Up @@ -67,9 +68,13 @@ public class MongoDBStoreProvider extends BaseStoreProvider {

public MongoDBStoreProvider(Cluster cluster, StoreProviderConfig storeConfig) throws Exception {
super(cluster, storeConfig);

lockProvider = MongoDBLockProvider.INSTANCE;
}

public static BaseStoreProvider getStoreProviderInstance() {
return INSTANCE;
}

@Override
public void commit() {
try {
Expand Down Expand Up @@ -413,6 +418,59 @@ public void write(List<StoreRowHolder> storeRowHolder, boolean isUpdate) throws
}

}

/**
* Adds or updates a document in the collection
*
* @param document
* @param filters
* @param collectionName
* @param isInsert
* @return boolean success or failure of the addOrUpdate method
* */
public boolean addOrUpdateLockEntryWithFilter(Document document, List<Bson> filters, String collectionName, boolean isInsert) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
boolean isSuccess = false;
if (isInsert) {
isSuccess = collection.insertOne(document) == null ? false : true;
} else {
// updateOptions has upsert set as false here because we do not want to add a new entry if it already exists
// The update operation would not succeed if the entry already exists
UpdateOptions updateOptions = new UpdateOptions();

// here, rather than sending the document as is, use Updates.set() and send it to the updateOne() method
isSuccess = collection.updateOne(and(filters), document, updateOptions.upsert(isInsert)) == null ? false : true;
}
return isSuccess;
}

/**
* Fetches a document from the collection
*
* @param document
* @param collectionName
* @return Document fetched based off the filter
* */
public Document readLockEntryWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
return collection.find(document).first();
}

/**
* Deletes a document from the collection
* */
public DeleteResult deleteLockEntryWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
return collection.deleteOne(document);
}

/**
* Deletes all the document from the collection based off the filter
* */
public void deleteManyLockEntryWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
collection.deleteMany(document);
}

private void delete_(StoreRowHolder queryHolder) {
try {
Expand All @@ -426,11 +484,11 @@ private void delete_(StoreRowHolder queryHolder) {

if (filter != null) {
getLogger().log(Level.DEBUG, " ****** Filter delete query :" + filter.toString());
collection.deleteOne(filter);
collection.deleteMany(filter);
getLogger().log(Level.DEBUG, "Deleted Record: " + filter.toBsonDocument().toJson());

} else {
collection.deleteOne(query);
collection.deleteMany(query);
getLogger().log(Level.DEBUG, "Deleted All Records: " + query.toJson());
}

Expand Down

0 comments on commit 3b03173

Please sign in to comment.