From 55a7855f03329dbc99aee44e234b28f1128714b3 Mon Sep 17 00:00:00 2001 From: Ivan Bodrov Date: Sun, 22 Dec 2024 17:18:53 -0500 Subject: [PATCH] concord-server-sdk: initial support for pipeline events --- .../concord/server/events/EventModule.java | 2 + .../pipelines/EnqueueProcessPipeline.java | 47 +++++++++++++++-- .../server/sdk/events/PipelineEvent.java | 52 +++++++++++++++++++ .../sdk/events/PipelineEventListener.java | 29 +++++++++++ 4 files changed, 127 insertions(+), 3 deletions(-) create mode 100644 server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEvent.java create mode 100644 server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEventListener.java diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/events/EventModule.java b/server/impl/src/main/java/com/walmartlabs/concord/server/events/EventModule.java index 4e7931c3b2..b2db3a4316 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/events/EventModule.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/events/EventModule.java @@ -26,6 +26,7 @@ import com.walmartlabs.concord.server.events.externalevent.ExternalEventTriggerV1Processor; import com.walmartlabs.concord.server.events.externalevent.ExternalEventTriggerV2Processor; import com.walmartlabs.concord.server.events.github.GithubTriggerProcessor; +import com.walmartlabs.concord.server.sdk.events.PipelineEventListener; import com.walmartlabs.concord.server.sdk.events.ProcessEventListener; import static com.google.inject.Scopes.SINGLETON; @@ -42,6 +43,7 @@ public void configure(Binder binder) { binder.bind(TriggerProcessExecutor.class).in(SINGLETON); newSetBinder(binder, ProcessEventListener.class); + newSetBinder(binder, PipelineEventListener.class); binder.bind(GithubTriggerProcessor.class).in(SINGLETON); newSetBinder(binder, GithubTriggerProcessor.EventEnricher.class).addBinding().to(GithubTriggerProcessor.RepositoryInfoEnricher.class); diff --git a/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/EnqueueProcessPipeline.java b/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/EnqueueProcessPipeline.java index 0fdd52d24b..8c270168a1 100644 --- a/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/EnqueueProcessPipeline.java +++ b/server/impl/src/main/java/com/walmartlabs/concord/server/process/pipelines/EnqueueProcessPipeline.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,11 +21,19 @@ */ import com.google.inject.Injector; +import com.walmartlabs.concord.server.process.Payload; import com.walmartlabs.concord.server.process.pipelines.processors.*; +import com.walmartlabs.concord.server.sdk.events.PipelineEvent; +import com.walmartlabs.concord.server.sdk.events.PipelineEventListener; import javax.inject.Inject; import javax.inject.Named; import java.util.List; +import java.util.Set; + +import static com.walmartlabs.concord.server.sdk.events.PipelineEvent.EventType.ENQUEUE_PROCESS_PIPELINE_END; +import static com.walmartlabs.concord.server.sdk.events.PipelineEvent.EventType.ENQUEUE_PROCESS_PIPELINE_START; +import static java.util.Objects.requireNonNull; /** * Handles NEW "regular" processes. Puts the processes into the ENQUEUED status. @@ -36,10 +44,12 @@ public class EnqueueProcessPipeline extends Pipeline { private final ExceptionProcessor exceptionProcessor; private final FinalizerProcessor finalizerProcessor; + private final Set eventListeners; @Inject public EnqueueProcessPipeline(Injector injector, - CustomEnqueueProcessors customProcessors) { + CustomEnqueueProcessors customProcessors, + Set eventListeners) { super(List.of( injector.getInstance(LoggingMDCProcessor.class), injector.getInstance(PayloadRestoreProcessor.class), @@ -76,6 +86,37 @@ public EnqueueProcessPipeline(Injector injector, this.exceptionProcessor = injector.getInstance(FailProcessor.class); this.finalizerProcessor = injector.getInstance(CleanupProcessor.class); + + this.eventListeners = requireNonNull(eventListeners); + } + + @Override + public Payload process(Payload payload) { + var startEvent = PipelineEvent.builder() + .eventType(ENQUEUE_PROCESS_PIPELINE_START) + .processKey(payload.getProcessKey()) + .build(); + eventListeners.forEach(l -> l.onPipelineEvent(startEvent)); + + try { + var result = super.process(payload); + + var endEvent = PipelineEvent.builder() + .eventType(ENQUEUE_PROCESS_PIPELINE_END) + .processKey(payload.getProcessKey()) + .build(); + eventListeners.forEach(l -> l.onPipelineEvent(endEvent)); + + return result; + } catch (Throwable error) { + var errorEvent = PipelineEvent.builder() + .eventType(ENQUEUE_PROCESS_PIPELINE_END) + .processKey(payload.getProcessKey()) + .build(); + eventListeners.forEach(l -> l.onPipelineError(errorEvent, error)); + + throw error; + } } @Override diff --git a/server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEvent.java b/server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEvent.java new file mode 100644 index 0000000000..bcf562ec6f --- /dev/null +++ b/server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEvent.java @@ -0,0 +1,52 @@ +package com.walmartlabs.concord.server.sdk.events; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc. + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.walmartlabs.concord.server.sdk.ProcessKey; +import org.immutables.value.Value; + +import java.io.Serial; +import java.io.Serializable; + +@Value.Immutable +@Value.Style(jdkOnly = true) +@JsonSerialize(as = ImmutablePipelineEvent.class) +@JsonDeserialize(as = ImmutablePipelineEvent.class) +public interface PipelineEvent extends Serializable { + + @Serial + long serialVersionUID = 1L; + + ProcessKey processKey(); + + EventType eventType(); + + enum EventType { + ENQUEUE_PROCESS_PIPELINE_START, + ENQUEUE_PROCESS_PIPELINE_END + } + + static ImmutablePipelineEvent.Builder builder() { + return ImmutablePipelineEvent.builder(); + } +} diff --git a/server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEventListener.java b/server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEventListener.java new file mode 100644 index 0000000000..570734e719 --- /dev/null +++ b/server/sdk/src/main/java/com/walmartlabs/concord/server/sdk/events/PipelineEventListener.java @@ -0,0 +1,29 @@ +package com.walmartlabs.concord.server.sdk.events; + +/*- + * ***** + * Concord + * ----- + * Copyright (C) 2017 - 2024 Walmart Inc. + * ----- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ===== + */ + +public interface PipelineEventListener { + + void onPipelineEvent(PipelineEvent event); + + default void onPipelineError(PipelineEvent event, Throwable error) { + } +}