Skip to content

Commit

Permalink
concord-server-sdk: initial support for pipeline events
Browse files Browse the repository at this point in the history
  • Loading branch information
ibodrov committed Dec 22, 2024
1 parent 5ab8d98 commit 55a7855
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.walmartlabs.concord.server.events.externalevent.ExternalEventTriggerV1Processor;
import com.walmartlabs.concord.server.events.externalevent.ExternalEventTriggerV2Processor;
import com.walmartlabs.concord.server.events.github.GithubTriggerProcessor;
import com.walmartlabs.concord.server.sdk.events.PipelineEventListener;
import com.walmartlabs.concord.server.sdk.events.ProcessEventListener;

import static com.google.inject.Scopes.SINGLETON;
Expand All @@ -42,6 +43,7 @@ public void configure(Binder binder) {
binder.bind(TriggerProcessExecutor.class).in(SINGLETON);

newSetBinder(binder, ProcessEventListener.class);
newSetBinder(binder, PipelineEventListener.class);

binder.bind(GithubTriggerProcessor.class).in(SINGLETON);
newSetBinder(binder, GithubTriggerProcessor.EventEnricher.class).addBinding().to(GithubTriggerProcessor.RepositoryInfoEnricher.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -21,11 +21,19 @@
*/

import com.google.inject.Injector;
import com.walmartlabs.concord.server.process.Payload;
import com.walmartlabs.concord.server.process.pipelines.processors.*;
import com.walmartlabs.concord.server.sdk.events.PipelineEvent;
import com.walmartlabs.concord.server.sdk.events.PipelineEventListener;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.List;
import java.util.Set;

import static com.walmartlabs.concord.server.sdk.events.PipelineEvent.EventType.ENQUEUE_PROCESS_PIPELINE_END;
import static com.walmartlabs.concord.server.sdk.events.PipelineEvent.EventType.ENQUEUE_PROCESS_PIPELINE_START;
import static java.util.Objects.requireNonNull;

/**
* Handles NEW "regular" processes. Puts the processes into the ENQUEUED status.
Expand All @@ -36,10 +44,12 @@ public class EnqueueProcessPipeline extends Pipeline {

private final ExceptionProcessor exceptionProcessor;
private final FinalizerProcessor finalizerProcessor;
private final Set<PipelineEventListener> eventListeners;

@Inject
public EnqueueProcessPipeline(Injector injector,
CustomEnqueueProcessors customProcessors) {
CustomEnqueueProcessors customProcessors,
Set<PipelineEventListener> eventListeners) {
super(List.of(
injector.getInstance(LoggingMDCProcessor.class),
injector.getInstance(PayloadRestoreProcessor.class),
Expand Down Expand Up @@ -76,6 +86,37 @@ public EnqueueProcessPipeline(Injector injector,

this.exceptionProcessor = injector.getInstance(FailProcessor.class);
this.finalizerProcessor = injector.getInstance(CleanupProcessor.class);

this.eventListeners = requireNonNull(eventListeners);
}

@Override
public Payload process(Payload payload) {
var startEvent = PipelineEvent.builder()
.eventType(ENQUEUE_PROCESS_PIPELINE_START)
.processKey(payload.getProcessKey())
.build();
eventListeners.forEach(l -> l.onPipelineEvent(startEvent));

try {
var result = super.process(payload);

var endEvent = PipelineEvent.builder()
.eventType(ENQUEUE_PROCESS_PIPELINE_END)
.processKey(payload.getProcessKey())
.build();
eventListeners.forEach(l -> l.onPipelineEvent(endEvent));

return result;
} catch (Throwable error) {
var errorEvent = PipelineEvent.builder()
.eventType(ENQUEUE_PROCESS_PIPELINE_END)
.processKey(payload.getProcessKey())
.build();
eventListeners.forEach(l -> l.onPipelineError(errorEvent, error));

throw error;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.walmartlabs.concord.server.sdk.events;

/*-
* *****
* Concord
* -----
* Copyright (C) 2017 - 2024 Walmart Inc.
* -----
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =====
*/

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.walmartlabs.concord.server.sdk.ProcessKey;
import org.immutables.value.Value;

import java.io.Serial;
import java.io.Serializable;

@Value.Immutable
@Value.Style(jdkOnly = true)
@JsonSerialize(as = ImmutablePipelineEvent.class)
@JsonDeserialize(as = ImmutablePipelineEvent.class)
public interface PipelineEvent extends Serializable {

@Serial
long serialVersionUID = 1L;

ProcessKey processKey();

EventType eventType();

enum EventType {
ENQUEUE_PROCESS_PIPELINE_START,
ENQUEUE_PROCESS_PIPELINE_END
}

static ImmutablePipelineEvent.Builder builder() {
return ImmutablePipelineEvent.builder();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.walmartlabs.concord.server.sdk.events;

/*-
* *****
* Concord
* -----
* Copyright (C) 2017 - 2024 Walmart Inc.
* -----
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =====
*/

public interface PipelineEventListener {

void onPipelineEvent(PipelineEvent event);

default void onPipelineError(PipelineEvent event, Throwable error) {
}
}

0 comments on commit 55a7855

Please sign in to comment.