Skip to content

Commit

Permalink
Implementation of Locking Using mongoDB Stores TIBCOSoftware#134
Browse files Browse the repository at this point in the history
  • Loading branch information
atejshee-tibco committed Oct 7, 2024
1 parent a609f4c commit e9a6b79
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,56 @@
import static com.mongodb.client.model.Filters.eq;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.MongoWriteException;
import com.mongodb.client.result.DeleteResult;
import com.tibco.cep.kernel.service.logging.Level;
import com.tibco.cep.store.custom.StoreColumnData;
import com.tibco.cep.store.custom.StoreHelper;
import com.tibco.cep.store.custom.StoreRowHolder;
import com.tibco.cep.store.locking.AbstractLockProvider;
import com.tibco.cep.store.locking.LockEntry;

public class MongoDBLockProvider extends AbstractLockProvider {

public final static MongoDBLockProvider INSTANCE = new MongoDBLockProvider();
private MongoDBStoreProvider storeProvider;
MongoDBDataTypeMapper mapper;

private MongoDBLockProvider() {
this.storeProvider = (MongoDBStoreProvider) MongoDBStoreProvider.getStoreProviderInstance();
this.mapper = (MongoDBDataTypeMapper) storeProvider.getStoreDataTypeMapper();
}

@Override
public boolean unlock(Object key) {
logger.log(Level.DEBUG, "MongoDB store lock :: Attempting to unlock");
boolean isUnlocked = false;
Document document = new Document();
document.append(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, key.toString());
document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, memberId);
DeleteResult result;
// Create StoreRowHolder
Map<String, StoreColumnData> columnDataMap = new HashMap<String, StoreColumnData>();

// Column keyname
StoreColumnData column = StoreHelper.getColumn(mapper.getStringType(), key.toString());
columnDataMap.put(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, column);

// Column memberId
column = StoreHelper.getColumn(mapper.getStringType(), memberId);
columnDataMap.put(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, column);
StoreRowHolder rowHolder = new StoreRowHolder(MongoDBConstants.LOCKS_COLLECTION_NAME, columnDataMap);

try {
result = storeProvider.deleteWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
isUnlocked = result.wasAcknowledged();
storeProvider.delete(rowHolder);
isUnlocked = true;
} catch (Exception e) {
logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlock", e);
isUnlocked = false;
}

return isUnlocked;
}

Expand All @@ -54,10 +69,16 @@ public boolean unlock(Object key) {
@Override
public void unlockOnMemberId(String memberId) {
logger.log(Level.DEBUG, "MongoDB store lock :: Engine shutting down. Releasing all the locks held by " + memberId);
Document document = new Document();
document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, memberId);
// Create StoreRowHolder
Map<String, StoreColumnData> columnDataMap = new HashMap<String, StoreColumnData>();

// Column memberId
StoreColumnData column = StoreHelper.getColumn(mapper.getStringType(), memberId);
columnDataMap.put(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, column);
StoreRowHolder rowHolder = new StoreRowHolder(MongoDBConstants.LOCKS_COLLECTION_NAME, columnDataMap);

try {
storeProvider.deleteManyWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
storeProvider.delete(rowHolder);
} catch (Exception e) {
logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlockOnMemberId", e);
}
Expand All @@ -68,7 +89,7 @@ protected LockEntry getLockEntry(Object key) throws Exception {
// Fetch entry from locks table based on the key
Document document = new Document();
document.append(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, key.toString());
Document result = storeProvider.readWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
Document result = storeProvider.readLockEntryWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME);
if (null != result) {
LockEntry lockEntry = new LockEntry(result.get(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD).toString(),
result.get(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD).toString(),
Expand Down Expand Up @@ -106,18 +127,15 @@ private boolean insertOrUpdateLockEntry(LockEntry entry, String oldMemberId) thr
if (oldMemberId == null) {
// insert new entry

isSuccess = storeProvider.addOrUpdateEntryWithFilter(document, null, MongoDBConstants.LOCKS_COLLECTION_NAME, true);
isSuccess = storeProvider.addOrUpdateLockEntryWithFilter(document, null, MongoDBConstants.LOCKS_COLLECTION_NAME, true);
} else {
// update old entry only if the filter matches, set isInsert = false
// don't add a new entry if the filter does not match (upsert = false)

// Create a filter for the update operation
List<Bson> filters = new ArrayList<Bson>();
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, entry.getKey()));
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, oldMemberId));
filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD, entry.getSocket()));

isSuccess = storeProvider.addOrUpdateEntryWithFilter(document, filters, MongoDBConstants.LOCKS_COLLECTION_NAME, false);
isSuccess = storeProvider.addOrUpdateLockEntryWithFilter(document, filters, MongoDBConstants.LOCKS_COLLECTION_NAME, false);
}

if (isSuccess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public void write(List<StoreRowHolder> storeRowHolder, boolean isUpdate) throws
* @param isInsert
* @return boolean success or failure of the addOrUpdate method
* */
public boolean addOrUpdateEntryWithFilter(Document document, List<Bson> filters, String collectionName, boolean isInsert) throws Exception {
public boolean addOrUpdateLockEntryWithFilter(Document document, List<Bson> filters, String collectionName, boolean isInsert) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
boolean isSuccess = false;
if (isInsert) {
Expand All @@ -451,23 +451,23 @@ public boolean addOrUpdateEntryWithFilter(Document document, List<Bson> filters,
* @param collectionName
* @return Document fetched based off the filter
* */
public Document readWithFilter(Document document, String collectionName) throws Exception {
public Document readLockEntryWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
return collection.find(document).first();
}

/**
* Deletes a document from the collection
* */
public DeleteResult deleteWithFilter(Document document, String collectionName) throws Exception {
public DeleteResult deleteLockEntryWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
return collection.deleteOne(document);
}

/**
* Deletes all the document from the collection based off the filter
* */
public void deleteManyWithFilter(Document document, String collectionName) throws Exception {
public void deleteManyLockEntryWithFilter(Document document, String collectionName) throws Exception {
MongoCollection<Document> collection = mongodatabase.getCollection(collectionName);
collection.deleteMany(document);
}
Expand All @@ -484,11 +484,11 @@ private void delete_(StoreRowHolder queryHolder) {

if (filter != null) {
getLogger().log(Level.DEBUG, " ****** Filter delete query :" + filter.toString());
collection.deleteOne(filter);
collection.deleteMany(filter);
getLogger().log(Level.DEBUG, "Deleted Record: " + filter.toBsonDocument().toJson());

} else {
collection.deleteOne(query);
collection.deleteMany(query);
getLogger().log(Level.DEBUG, "Deleted All Records: " + query.toJson());
}

Expand Down

0 comments on commit e9a6b79

Please sign in to comment.