Skip to content

Commit

Permalink
server: synchronous processing of process completion wait conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
brig committed Mar 20, 2024
1 parent e56be84 commit 3e5e262
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,6 @@
<include file="v1.104.0.xml" relativeToChangelogFile="true"/>
<include file="v2.8.0.xml" relativeToChangelogFile="true"/>
<include file="v2.9.0.xml" relativeToChangelogFile="true"/>
<include file="v2.10.0.xml" relativeToChangelogFile="true"/>

</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<changeSet id="210000" author="[email protected]" runInTransaction="false">
<sql>
create index concurrently IDX_WAIT_CONDITIONS on PROCESS_WAIT_CONDITIONS using gin (WAIT_CONDITIONS)
</sql>
</changeSet>

</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void configure(Binder binder) {

newSetBinder(binder, ProcessStatusListener.class).addBinding().to(WaitProcessStatusListener.class);
newSetBinder(binder, ProcessStatusListener.class).addBinding().to(ExternalProcessListenerHandler.class);
newSetBinder(binder, ProcessStatusListener.class).addBinding().to(XYZProcessStatusListener.class);

newSetBinder(binder, Filter.class).addBinding().to(ConcurrentProcessFilter.class);
newSetBinder(binder, Filter.class).addBinding().to(ExclusiveProcessFilter.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
* =====
*/

import com.walmartlabs.concord.db.AbstractDao;
import com.walmartlabs.concord.db.MainDB;
import com.walmartlabs.concord.server.jooq.tables.ProcessQueue;
import com.walmartlabs.concord.server.process.queue.ProcessQueueManager;
import com.walmartlabs.concord.server.sdk.ProcessKey;
import com.walmartlabs.concord.server.sdk.ProcessStatus;
Expand All @@ -31,26 +29,19 @@

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import static com.walmartlabs.concord.server.jooq.tables.ProcessQueue.PROCESS_QUEUE;
import static com.walmartlabs.concord.server.process.waits.ProcessCompletionCondition.CompleteCondition;

/**
* Handles the processes that are waiting for other processes to finish.
*/
@Singleton
public class WaitProcessFinishHandler implements ProcessWaitHandler<ProcessCompletionCondition> {

private final Dao dao;
private final ProcessQueueManager processQueueManager;

@Inject
public WaitProcessFinishHandler(@MainDB Configuration dbCfg, ProcessQueueManager processQueueManager) {
this.dao = new Dao(dbCfg);
public WaitProcessFinishHandler(ProcessQueueManager processQueueManager) {
this.processQueueManager = processQueueManager;
}

Expand All @@ -62,64 +53,15 @@ public WaitType getType() {
@Override
@WithTimer
public Result<ProcessCompletionCondition> process(ProcessKey processKey, ProcessCompletionCondition wait) {
Set<ProcessStatus> finishedStatuses = wait.finalStatuses();
Set<UUID> awaitProcesses = wait.processes();
if (awaitProcesses.isEmpty()) {
return Result.resume(wait.resumeEvent());
}

Set<UUID> finishedProcesses = dao.findFinished(awaitProcesses, finishedStatuses);
if (finishedProcesses.isEmpty()) {
if (!awaitProcesses.isEmpty()) {
return Result.of(wait);
}

boolean completed = isCompleted(wait.completeCondition(), awaitProcesses, finishedProcesses);
if (completed) {
if (wait.resumeEvent() != null) {
return Result.resume(wait.resumeEvent());
} else {
return Result.action(tx -> processQueueManager.updateExpectedStatus(tx, processKey, ProcessStatus.WAITING, ProcessStatus.ENQUEUED));
}
}

List<UUID> processes = new ArrayList<>(awaitProcesses);
processes.removeAll(finishedProcesses);

return Result.of(
ProcessCompletionCondition.builder().from(wait)
.processes(processes)
.reason(wait.reason())
.build());
}

private static boolean isCompleted(CompleteCondition condition, Set<UUID> awaitProcesses, Set<UUID> finishedProcesses) {
switch (condition) {
case ALL: {
return awaitProcesses.size() == finishedProcesses.size();
}
case ONE_OF: {
return !finishedProcesses.isEmpty();
}
default:
throw new IllegalArgumentException("Unknown condition type: " + condition);
}
}

private static final class Dao extends AbstractDao {

private Dao(@MainDB Configuration cfg) {
super(cfg);
}

public Set<UUID> findFinished(Set<UUID> awaitProcesses, Set<ProcessStatus> finishedStatuses) {
return txResult(tx -> {
ProcessQueue q = PROCESS_QUEUE.as("q");
return tx.select(q.INSTANCE_ID)
.from(q)
.where(q.INSTANCE_ID.in(awaitProcesses)
.and(q.CURRENT_STATUS.in(finishedStatuses)))
.fetchSet(q.INSTANCE_ID);
});
if (wait.resumeEvent() != null) {
return Result.resume(wait.resumeEvent());
} else {
return Result.action(tx -> processQueueManager.updateExpectedStatus(tx, processKey, ProcessStatus.WAITING, ProcessStatus.ENQUEUED));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.walmartlabs.concord.server.process.waits;

/*-
* *****
* Concord
* -----
* Copyright (C) 2017 - 2024 Walmart Inc.
* -----
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =====
*/

import com.walmartlabs.concord.server.process.queue.ProcessStatusListener;
import com.walmartlabs.concord.server.sdk.ProcessKey;
import com.walmartlabs.concord.server.sdk.ProcessStatus;
import org.jooq.DSLContext;

public class XYZProcessStatusListener implements ProcessStatusListener {

@Override
public void onStatusChange(DSLContext tx, ProcessKey processKey, ProcessStatus status) {
switch (status) {
case SUSPENDED:
case FINISHED:
case FAILED:
case CANCELLED:
case TIMED_OUT:
updateWaitConditions(tx, processKey, status);
break;
default:
// do nothing
}
}

private static void updateWaitConditions(DSLContext tx, ProcessKey processKey, ProcessStatus status) {
String sql = """
UPDATE process_wait_conditions
SET wait_conditions =
(SELECT jsonb_agg(
CASE
WHEN obj->>'type' = 'PROCESS_COMPLETION' and obj->>'completeCondition' = 'ALL'
THEN jsonb_set(obj, '{processes}', (obj->'processes') - ?)
WHEN obj->>'type' = 'PROCESS_COMPLETION' and obj->>'completeCondition' = 'ONE_OF' and obj->'processes' ?? ?
THEN jsonb_set(obj, '{processes}', '[]')
ELSE obj
END
)
FROM jsonb_array_elements(wait_conditions) obj),
version = version + 1
WHERE wait_conditions @> ?::jsonb;
""";

String jsonMatch = String.format("[{\"type\": \"PROCESS_COMPLETION\", \"finalStatuses\": [\"%s\"], \"processes\": [\"%s\"]}]", status.name(), processKey.getInstanceId().toString());

tx.execute(sql, processKey.getInstanceId().toString(), processKey.getInstanceId().toString(), jsonMatch);
}
}

0 comments on commit 3e5e262

Please sign in to comment.