From e9a6b79c6d784829b75d59c921afefc6d58e0d0e Mon Sep 17 00:00:00 2001 From: atejshee-tibco Date: Mon, 7 Oct 2024 14:39:14 +0530 Subject: [PATCH] Implementation of Locking Using mongoDB Stores #134 --- .../tibco/be/mongoDB/MongoDBLockProvider.java | 50 +++++++++++++------ .../be/mongoDB/MongoDBStoreProvider.java | 12 ++--- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBLockProvider.java b/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBLockProvider.java index c4b963a9..671f2c6e 100644 --- a/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBLockProvider.java +++ b/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBLockProvider.java @@ -10,14 +10,18 @@ import static com.mongodb.client.model.Filters.eq; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.MongoWriteException; -import com.mongodb.client.result.DeleteResult; import com.tibco.cep.kernel.service.logging.Level; +import com.tibco.cep.store.custom.StoreColumnData; +import com.tibco.cep.store.custom.StoreHelper; +import com.tibco.cep.store.custom.StoreRowHolder; import com.tibco.cep.store.locking.AbstractLockProvider; import com.tibco.cep.store.locking.LockEntry; @@ -25,26 +29,37 @@ public class MongoDBLockProvider extends AbstractLockProvider { public final static MongoDBLockProvider INSTANCE = new MongoDBLockProvider(); private MongoDBStoreProvider storeProvider; + MongoDBDataTypeMapper mapper; private MongoDBLockProvider() { this.storeProvider = (MongoDBStoreProvider) MongoDBStoreProvider.getStoreProviderInstance(); + this.mapper = (MongoDBDataTypeMapper) storeProvider.getStoreDataTypeMapper(); } @Override public boolean unlock(Object key) { logger.log(Level.DEBUG, "MongoDB store lock :: Attempting to unlock"); boolean isUnlocked = false; - Document document = new Document(); - document.append(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, key.toString()); - document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, memberId); - DeleteResult result; + // Create StoreRowHolder + Map columnDataMap = new HashMap(); + + // Column keyname + StoreColumnData column = StoreHelper.getColumn(mapper.getStringType(), key.toString()); + columnDataMap.put(MongoDBConstants.PROPERTY_KEY_MONGODB_ID, column); + + // Column memberId + column = StoreHelper.getColumn(mapper.getStringType(), memberId); + columnDataMap.put(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, column); + StoreRowHolder rowHolder = new StoreRowHolder(MongoDBConstants.LOCKS_COLLECTION_NAME, columnDataMap); + try { - result = storeProvider.deleteWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME); - isUnlocked = result.wasAcknowledged(); + storeProvider.delete(rowHolder); + isUnlocked = true; } catch (Exception e) { logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlock", e); isUnlocked = false; } + return isUnlocked; } @@ -54,10 +69,16 @@ public boolean unlock(Object key) { @Override public void unlockOnMemberId(String memberId) { logger.log(Level.DEBUG, "MongoDB store lock :: Engine shutting down. Releasing all the locks held by " + memberId); - Document document = new Document(); - document.append(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, memberId); + // Create StoreRowHolder + Map columnDataMap = new HashMap(); + + // Column memberId + StoreColumnData column = StoreHelper.getColumn(mapper.getStringType(), memberId); + columnDataMap.put(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, column); + StoreRowHolder rowHolder = new StoreRowHolder(MongoDBConstants.LOCKS_COLLECTION_NAME, columnDataMap); + try { - storeProvider.deleteManyWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME); + storeProvider.delete(rowHolder); } catch (Exception e) { logger.log(Level.ERROR, "MongoDB store lock :: Failed while trying to unlockOnMemberId", e); } @@ -68,7 +89,7 @@ protected LockEntry getLockEntry(Object key) throws Exception { // Fetch entry from locks table based on the key Document document = new Document(); document.append(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, key.toString()); - Document result = storeProvider.readWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME); + Document result = storeProvider.readLockEntryWithFilter(document, MongoDBConstants.LOCKS_COLLECTION_NAME); if (null != result) { LockEntry lockEntry = new LockEntry(result.get(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD).toString(), result.get(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD).toString(), @@ -106,18 +127,15 @@ private boolean insertOrUpdateLockEntry(LockEntry entry, String oldMemberId) thr if (oldMemberId == null) { // insert new entry - isSuccess = storeProvider.addOrUpdateEntryWithFilter(document, null, MongoDBConstants.LOCKS_COLLECTION_NAME, true); + isSuccess = storeProvider.addOrUpdateLockEntryWithFilter(document, null, MongoDBConstants.LOCKS_COLLECTION_NAME, true); } else { - // update old entry only if the filter matches, set isInsert = false - // don't add a new entry if the filter does not match (upsert = false) - // Create a filter for the update operation List filters = new ArrayList(); filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_KEY_FIELD, entry.getKey())); filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_MEMBERID_FIELD, oldMemberId)); filters.add(eq(MongoDBConstants.DOCUMENT_LOCKS_SOCKET_FIELD, entry.getSocket())); - isSuccess = storeProvider.addOrUpdateEntryWithFilter(document, filters, MongoDBConstants.LOCKS_COLLECTION_NAME, false); + isSuccess = storeProvider.addOrUpdateLockEntryWithFilter(document, filters, MongoDBConstants.LOCKS_COLLECTION_NAME, false); } if (isSuccess) diff --git a/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBStoreProvider.java b/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBStoreProvider.java index 2b00f9b3..e6d193f7 100644 --- a/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBStoreProvider.java +++ b/store/mongoDB/src/main/java/com/tibco/be/mongoDB/MongoDBStoreProvider.java @@ -428,7 +428,7 @@ public void write(List storeRowHolder, boolean isUpdate) throws * @param isInsert * @return boolean success or failure of the addOrUpdate method * */ - public boolean addOrUpdateEntryWithFilter(Document document, List filters, String collectionName, boolean isInsert) throws Exception { + public boolean addOrUpdateLockEntryWithFilter(Document document, List filters, String collectionName, boolean isInsert) throws Exception { MongoCollection collection = mongodatabase.getCollection(collectionName); boolean isSuccess = false; if (isInsert) { @@ -451,7 +451,7 @@ public boolean addOrUpdateEntryWithFilter(Document document, List filters, * @param collectionName * @return Document fetched based off the filter * */ - public Document readWithFilter(Document document, String collectionName) throws Exception { + public Document readLockEntryWithFilter(Document document, String collectionName) throws Exception { MongoCollection collection = mongodatabase.getCollection(collectionName); return collection.find(document).first(); } @@ -459,7 +459,7 @@ public Document readWithFilter(Document document, String collectionName) throws /** * Deletes a document from the collection * */ - public DeleteResult deleteWithFilter(Document document, String collectionName) throws Exception { + public DeleteResult deleteLockEntryWithFilter(Document document, String collectionName) throws Exception { MongoCollection collection = mongodatabase.getCollection(collectionName); return collection.deleteOne(document); } @@ -467,7 +467,7 @@ public DeleteResult deleteWithFilter(Document document, String collectionName) t /** * Deletes all the document from the collection based off the filter * */ - public void deleteManyWithFilter(Document document, String collectionName) throws Exception { + public void deleteManyLockEntryWithFilter(Document document, String collectionName) throws Exception { MongoCollection collection = mongodatabase.getCollection(collectionName); collection.deleteMany(document); } @@ -484,11 +484,11 @@ private void delete_(StoreRowHolder queryHolder) { if (filter != null) { getLogger().log(Level.DEBUG, " ****** Filter delete query :" + filter.toString()); - collection.deleteOne(filter); + collection.deleteMany(filter); getLogger().log(Level.DEBUG, "Deleted Record: " + filter.toBsonDocument().toJson()); } else { - collection.deleteOne(query); + collection.deleteMany(query); getLogger().log(Level.DEBUG, "Deleted All Records: " + query.toJson()); }