Skip to content

Commit

Permalink
Jira source plugin (#5125)
Browse files Browse the repository at this point in the history
Jira source plugin (#5125)

Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Maxwell Brown <[email protected]>
Signed-off-by: Maxwell Brown <[email protected]>
Co-authored-by: Maxwell Brown <[email protected]>
Co-authored-by: Maxwell Brown <[email protected]>
  • Loading branch information
3 people authored Nov 4, 2024
1 parent 5108528 commit 92428be
Show file tree
Hide file tree
Showing 57 changed files with 3,455 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@

# Metrics

### Counter

- `issuesRequested`: measures total number of issue Requests sent.

### Timer

- `requestProcessDuration`: measures latency of requests processed by the jira source plugin.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
implementation 'org.projectlombok:lombok:1.18.30'
annotationProcessor 'org.projectlombok:lombok:1.18.30'

testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4'

implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
package org.opensearch.dataprepper.plugins.source.jira;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT;

/**
* This class represents a Jira client.
Expand All @@ -21,25 +39,91 @@
public class JiraClient implements CrawlerClient {

private static final Logger log = LoggerFactory.getLogger(JiraClient.class);
private final JiraService service;
private final JiraIterator jiraIterator;
private final ExecutorService executorService;
private final CrawlerSourceConfig configuration;
private final int bufferWriteTimeoutInSeconds = 10;
private ObjectMapper objectMapper = new ObjectMapper();
private Instant lastPollTime;

public JiraClient() {
public JiraClient(JiraService service,
JiraIterator jiraIterator,
PluginExecutorServiceProvider executorServiceProvider,
JiraSourceConfig sourceConfig) {
this.service = service;
this.jiraIterator = jiraIterator;
this.executorService = executorServiceProvider.get();
this.configuration = sourceConfig;
}


@Override
public Iterator<ItemInfo> listItems() {
return null;
jiraIterator.initialize(lastPollTime);
return jiraIterator;
}

@Override
public void setLastPollTime(Instant lastPollTime) {
log.info("Setting the lastPollTime: {}", lastPollTime);
log.trace("Setting the lastPollTime: {}", lastPollTime);
this.lastPollTime = lastPollTime;
}

@VisibleForTesting
void injectObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, CrawlerSourceConfig configuration) {
log.info("Logic for executing the partitions");
public void executePartition(SaasWorkerProgressState state,
Buffer<Record<Event>> buffer,
CrawlerSourceConfig configuration) {
log.trace("Executing the partition: {} with {} ticket(s)",
state.getKeyAttributes(), state.getItemIds().size());
List<String> itemIds = state.getItemIds();
Map<String, Object> keyAttributes = state.getKeyAttributes();
String project = (String) keyAttributes.get(PROJECT);
Instant eventTime = state.getExportStartTime();
List<ItemInfo> itemInfos = new ArrayList<>();
for (String itemId : itemIds) {
if (itemId == null) {
continue;
}
ItemInfo itemInfo = JiraItemInfo.builder()
.withItemId(itemId)
.withId(itemId)
.withProject(project)
.withEventTime(eventTime)
.withMetadata(keyAttributes).build();
itemInfos.add(itemInfo);
}

String eventType = EventType.DOCUMENT.toString();
List<Record<Event>> recordsToWrite = itemInfos
.parallelStream()
.map(t -> (Supplier<String>) (() -> service.getIssue(t.getId())))
.map(supplier -> supplyAsync(supplier, this.executorService))
.map(CompletableFuture::join)
.map(ticketJson -> {
try {
return objectMapper.readValue(ticketJson, new TypeReference<>() {
});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.map(t -> (Event) JacksonEvent.builder()
.withEventType(eventType)
.withData(t)
.build())
.map(event -> new Record<>(event))
.collect(Collectors.toList());

try {
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
} catch (Exception e) {
throw new RuntimeException(e);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package org.opensearch.dataprepper.plugins.source.jira;

import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.plugins.source.jira.models.IssueBean;
import org.opensearch.dataprepper.plugins.source.jira.utils.Constants;
import org.opensearch.dataprepper.plugins.source.jira.utils.JiraContentType;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.opensearch.dataprepper.plugins.source.jira.JiraService.CONTENT_TYPE;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.CREATED;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.ISSUE_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT_NAME;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.UPDATED;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants._ISSUE;

@Setter
@Getter
public class JiraItemInfo implements ItemInfo {
private String project;
private String issueType;
private String id;
private String itemId;
private Map<String, Object> metadata;
private Instant eventTime;

public JiraItemInfo(String id,
String itemId,
String project,
String issueType,
Map<String, Object> metadata,
Instant eventTime
) {
this.id = id;
this.project = project;
this.issueType = issueType;
this.itemId = itemId;
this.metadata = metadata;
this.eventTime = eventTime;
}

public static JiraItemInfoBuilder builder() {
return new JiraItemInfoBuilder();
}

@Override
public String getPartitionKey() {
return project + "|" + issueType + "|" + UUID.randomUUID();
}

@Override
public String getId() {
return id;
}

@Override
public Map<String, Object> getKeyAttributes() {
return Map.of(Constants.PROJECT, project);
}

@Override
public Instant getLastModifiedAt() {
long updatedAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.UPDATED, "0"));
long createdAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.CREATED, "0"));
return createdAtMillis > updatedAtMillis ?
Instant.ofEpochMilli(createdAtMillis) : Instant.ofEpochMilli(updatedAtMillis);
}

public static class JiraItemInfoBuilder {
private Map<String, Object> metadata;
private Instant eventTime;
private String id;
private String itemId;
private String project;
private String issueType;

public JiraItemInfoBuilder() {
}

public JiraItemInfo build() {
return new JiraItemInfo(id, itemId, project, issueType, metadata, eventTime);
}

public JiraItemInfoBuilder withMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
return this;
}

public JiraItemInfoBuilder withEventTime(Instant eventTime) {
this.eventTime = eventTime;
return this;
}

public JiraItemInfoBuilder withItemId(String itemId) {
this.itemId = itemId;
return this;
}

public JiraItemInfoBuilder withId(String id) {
this.id = id;
return this;
}

public JiraItemInfoBuilder withProject(String project) {
this.project = project;
return this;
}

public JiraItemInfoBuilder withIssueType(String issueType) {
this.issueType = issueType;
return this;
}

public JiraItemInfoBuilder withIssueBean(IssueBean issue) {
Map<String, Object> issueMetadata = new HashMap<>();
issueMetadata.put(PROJECT_KEY, issue.getProject());
issueMetadata.put(PROJECT_NAME, issue.getProjectName());
issueMetadata.put(CREATED, issue.getCreatedTimeMillis());
issueMetadata.put(UPDATED, issue.getUpdatedTimeMillis());
issueMetadata.put(ISSUE_KEY, issue.getKey());
issueMetadata.put(CONTENT_TYPE, JiraContentType.ISSUE.getType());

this.project = issue.getProject();
this.id = issue.getKey();
this.issueType = JiraContentType.ISSUE.getType();
this.itemId = _ISSUE + issueMetadata.get(PROJECT_KEY) + "-" + issue.getKey();
this.metadata = issueMetadata;
return this;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.opensearch.dataprepper.plugins.source.jira;


import lombok.Setter;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Instant;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;

@Named
public class JiraIterator implements Iterator<ItemInfo> {

private static final Logger log = LoggerFactory.getLogger(JiraIterator.class);
private final JiraSourceConfig sourceConfig;
private final JiraService service;
private final ExecutorService crawlerTaskExecutor;
@Setter
private long crawlerQWaitTimeMillis = 2000;
private Queue<ItemInfo> itemInfoQueue;
private Instant lastPollTime;
private boolean firstTime = true;

public JiraIterator(final JiraService service,
PluginExecutorServiceProvider executorServiceProvider,
JiraSourceConfig sourceConfig) {
this.service = service;
this.crawlerTaskExecutor = executorServiceProvider.get();
this.sourceConfig = sourceConfig;
}

@Override
public boolean hasNext() {
if (firstTime) {
log.info("Crawling has been started");
itemInfoQueue = service.getJiraEntities(sourceConfig, lastPollTime);
firstTime = false;
}
return !this.itemInfoQueue.isEmpty();
}

@Override
public ItemInfo next() {
return this.itemInfoQueue.remove();
}

/**
* Initialize.
*
* @param jiraChangeLogToken the jira change log token
*/
public void initialize(Instant jiraChangeLogToken) {
this.itemInfoQueue = new ConcurrentLinkedQueue<>();
this.lastPollTime = jiraChangeLogToken;
this.firstTime = true;
}

}
Loading

0 comments on commit 92428be

Please sign in to comment.