diff --git a/server/db/src/main/resources/com/walmartlabs/concord/server/db/v2.10.0.xml b/server/db/src/main/resources/com/walmartlabs/concord/server/db/v2.10.0.xml index a06d93fc23..5be58ee844 100644 --- a/server/db/src/main/resources/com/walmartlabs/concord/server/db/v2.10.0.xml +++ b/server/db/src/main/resources/com/walmartlabs/concord/server/db/v2.10.0.xml @@ -119,4 +119,10 @@ onDelete="SET NULL"/> + + + create index concurrently IDX_WAIT_CONDITIONS on PROCESS_WAIT_CONDITIONS using gin (WAIT_CONDITIONS) + + + diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessModule.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessModule.java index 8a944573cb..990032ef5c 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessModule.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/ProcessModule.java @@ -84,6 +84,7 @@ public void configure(Binder binder) { newSetBinder(binder, ProcessStatusListener.class).addBinding().to(WaitProcessStatusListener.class); newSetBinder(binder, ProcessStatusListener.class).addBinding().to(ExternalProcessListenerHandler.class); + newSetBinder(binder, ProcessStatusListener.class).addBinding().to(WaitConditionUpdater.class); newSetBinder(binder, Filter.class).addBinding().to(ConcurrentProcessFilter.class); newSetBinder(binder, Filter.class).addBinding().to(ExclusiveProcessFilter.class); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitConditionUpdater.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitConditionUpdater.java new file mode 100644 index 0000000000..3b4e16b39d --- /dev/null +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitConditionUpdater.java @@ -0,0 +1,67 @@ +package com.walmartlabs.concord.server.process.waits; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc. + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.walmartlabs.concord.server.process.queue.ProcessStatusListener; +import com.walmartlabs.concord.server.sdk.ProcessKey; +import com.walmartlabs.concord.server.sdk.ProcessStatus; +import org.jooq.DSLContext; + +public class WaitConditionUpdater implements ProcessStatusListener { + + @Override + public void onStatusChange(DSLContext tx, ProcessKey processKey, ProcessStatus status) { + switch (status) { + case SUSPENDED: + case FINISHED: + case FAILED: + case CANCELLED: + case TIMED_OUT: + updateWaitConditions(tx, processKey, status); + break; + default: + // do nothing + } + } + + private static void updateWaitConditions(DSLContext tx, ProcessKey processKey, ProcessStatus status) { + String sql = """ + UPDATE process_wait_conditions + SET wait_conditions = + (SELECT jsonb_agg( + CASE + WHEN obj->>'type' = 'PROCESS_COMPLETION' and obj->>'completeCondition' = 'ALL' + THEN jsonb_set(obj, '{processes}', (obj->'processes') - ?) + WHEN obj->>'type' = 'PROCESS_COMPLETION' and obj->>'completeCondition' = 'ONE_OF' and obj->'processes' ?? ? + THEN jsonb_set(obj, '{processes}', '[]') + ELSE obj + END + ) + FROM jsonb_array_elements(wait_conditions) obj), + version = version + 1 + WHERE wait_conditions @> ?::jsonb; + """; + + String jsonMatch = String.format("[{\"type\": \"PROCESS_COMPLETION\", \"finalStatuses\": [\"%s\"], \"processes\": [\"%s\"]}]", status.name(), processKey.getInstanceId().toString()); + + tx.execute(sql, processKey.getInstanceId().toString(), processKey.getInstanceId().toString(), jsonMatch); + } +} diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitProcessFinishHandler.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitProcessFinishHandler.java index 9f50d5031e..0dee10d3d6 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitProcessFinishHandler.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/waits/WaitProcessFinishHandler.java @@ -20,9 +20,7 @@ * ===== */ -import com.walmartlabs.concord.db.AbstractDao; import com.walmartlabs.concord.db.MainDB; -import com.walmartlabs.concord.server.jooq.tables.ProcessQueue; import com.walmartlabs.concord.server.process.queue.ProcessQueueManager; import com.walmartlabs.concord.server.sdk.ProcessKey; import com.walmartlabs.concord.server.sdk.ProcessStatus; @@ -31,26 +29,19 @@ import javax.inject.Inject; import javax.inject.Singleton; -import java.util.ArrayList; -import java.util.List; import java.util.Set; import java.util.UUID; -import static com.walmartlabs.concord.server.jooq.tables.ProcessQueue.PROCESS_QUEUE; -import static com.walmartlabs.concord.server.process.waits.ProcessCompletionCondition.CompleteCondition; - /** * Handles the processes that are waiting for other processes to finish. */ @Singleton public class WaitProcessFinishHandler implements ProcessWaitHandler { - private final Dao dao; private final ProcessQueueManager processQueueManager; @Inject - public WaitProcessFinishHandler(@MainDB Configuration dbCfg, ProcessQueueManager processQueueManager) { - this.dao = new Dao(dbCfg); + public WaitProcessFinishHandler(ProcessQueueManager processQueueManager) { this.processQueueManager = processQueueManager; } @@ -62,64 +53,15 @@ public WaitType getType() { @Override @WithTimer public Result process(ProcessKey processKey, ProcessCompletionCondition wait) { - Set finishedStatuses = wait.finalStatuses(); Set awaitProcesses = wait.processes(); - if (awaitProcesses.isEmpty()) { - return Result.resume(wait.resumeEvent()); - } - - Set finishedProcesses = dao.findFinished(awaitProcesses, finishedStatuses); - if (finishedProcesses.isEmpty()) { + if (!awaitProcesses.isEmpty()) { return Result.of(wait); } - boolean completed = isCompleted(wait.completeCondition(), awaitProcesses, finishedProcesses); - if (completed) { - if (wait.resumeEvent() != null) { - return Result.resume(wait.resumeEvent()); - } else { - return Result.action(tx -> processQueueManager.updateExpectedStatus(tx, processKey, ProcessStatus.WAITING, ProcessStatus.ENQUEUED)); - } - } - - List processes = new ArrayList<>(awaitProcesses); - processes.removeAll(finishedProcesses); - - return Result.of( - ProcessCompletionCondition.builder().from(wait) - .processes(processes) - .reason(wait.reason()) - .build()); - } - - private static boolean isCompleted(CompleteCondition condition, Set awaitProcesses, Set finishedProcesses) { - switch (condition) { - case ALL: { - return awaitProcesses.size() == finishedProcesses.size(); - } - case ONE_OF: { - return !finishedProcesses.isEmpty(); - } - default: - throw new IllegalArgumentException("Unknown condition type: " + condition); - } - } - - private static final class Dao extends AbstractDao { - - private Dao(@MainDB Configuration cfg) { - super(cfg); - } - - public Set findFinished(Set awaitProcesses, Set finishedStatuses) { - return txResult(tx -> { - ProcessQueue q = PROCESS_QUEUE.as("q"); - return tx.select(q.INSTANCE_ID) - .from(q) - .where(q.INSTANCE_ID.in(awaitProcesses) - .and(q.CURRENT_STATUS.in(finishedStatuses))) - .fetchSet(q.INSTANCE_ID); - }); + if (wait.resumeEvent() != null) { + return Result.resume(wait.resumeEvent()); + } else { + return Result.action(tx -> processQueueManager.updateExpectedStatus(tx, processKey, ProcessStatus.WAITING, ProcessStatus.ENQUEUED)); } } }