-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Jira source plugin #5125
Jira source plugin #5125
Changes from 71 commits
57b8ae4
9a7e8fb
1e4abc2
ad57856
f512f68
4677ddc
8c8a269
ad18c94
6539bb9
d18b212
9da83ae
14a9328
deb8198
6199581
2b41090
053a7d3
702ad64
7fff2a2
2679c8e
13e3fc6
1542658
25ca17b
00fb4ee
ea456e4
c29d656
8fa8cee
0ad89b2
f1c265f
844d20c
6ad0ac1
86778af
391fe37
cf805ad
a067769
95a1ffd
df6eeca
205ffc1
2cee7bc
dd90582
b323f0a
17202dc
e848c3a
6cd8279
249c0c7
9db74e2
e644fe3
2bf5e0e
782ea38
97b55c7
d4e0cdb
f6dd991
3a615e6
e3ac8bc
e18a2c1
cf2684f
a0c854b
ab615d5
1dd613f
cc7ddec
658bbf5
28aaa06
9d698e8
a8d8dcc
1553022
06d64ef
6e2cc23
58dd7e7
d45dfc5
4f5bf1f
d33c493
e3843d5
8386377
a719188
e760d3a
4270988
6d4e3cf
1600e9c
3e5d83a
d9c555d
db0d26e
d3c3a33
1590886
bea45c8
6e70427
13f56d2
a8c3218
3c15223
51c24c1
80f6e19
7f1dfae
9a7b3ad
21a7c3a
9336a1b
f564a98
545bdf1
58675e7
8237a45
1b59110
816f5b8
b5945a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,9 @@ | ||
|
||
# Metrics | ||
|
||
### Counter | ||
|
||
- `issuesRequested`: measures total number of issue Requests sent. | ||
|
||
### Timer | ||
|
||
- `requestProcessDuration`: measures latency of requests processed by the jira source plugin. |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -1,18 +1,35 @@ | ||||
package org.opensearch.dataprepper.plugins.source.jira; | ||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException; | ||||
import com.fasterxml.jackson.core.type.TypeReference; | ||||
import com.fasterxml.jackson.databind.ObjectMapper; | ||||
import com.google.common.annotations.VisibleForTesting; | ||||
import org.opensearch.dataprepper.model.buffer.Buffer; | ||||
import org.opensearch.dataprepper.model.event.Event; | ||||
import org.opensearch.dataprepper.model.event.JacksonEvent; | ||||
import org.opensearch.dataprepper.model.record.Record; | ||||
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerClient; | ||||
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig; | ||||
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; | ||||
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState; | ||||
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; | ||||
import org.slf4j.Logger; | ||||
import org.slf4j.LoggerFactory; | ||||
|
||||
import javax.inject.Named; | ||||
import java.time.Duration; | ||||
import java.time.Instant; | ||||
import java.util.ArrayList; | ||||
import java.util.Iterator; | ||||
import java.util.List; | ||||
import java.util.Map; | ||||
import java.util.concurrent.CompletableFuture; | ||||
import java.util.concurrent.ExecutorService; | ||||
import java.util.function.Supplier; | ||||
import java.util.stream.Collectors; | ||||
|
||||
import static java.util.concurrent.CompletableFuture.supplyAsync; | ||||
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT; | ||||
|
||||
/** | ||||
* This class represents a Jira client. | ||||
|
@@ -21,25 +38,90 @@ | |||
public class JiraClient implements CrawlerClient { | ||||
|
||||
private static final Logger log = LoggerFactory.getLogger(JiraClient.class); | ||||
private final JiraService service; | ||||
private final JiraIterator jiraIterator; | ||||
private final ExecutorService executorService; | ||||
private final CrawlerSourceConfig configuration; | ||||
private final int bufferWriteTimeoutInSeconds = 10; | ||||
private ObjectMapper objectMapper = new ObjectMapper(); | ||||
private Instant lastPollTime; | ||||
|
||||
public JiraClient() { | ||||
public JiraClient(JiraService service, | ||||
JiraIterator jiraIterator, | ||||
PluginExecutorServiceProvider executorServiceProvider, | ||||
JiraSourceConfig sourceConfig) { | ||||
this.service = service; | ||||
this.jiraIterator = jiraIterator; | ||||
this.executorService = executorServiceProvider.get(); | ||||
this.configuration = sourceConfig; | ||||
} | ||||
|
||||
|
||||
@Override | ||||
public Iterator<ItemInfo> listItems() { | ||||
return null; | ||||
jiraIterator.initialize(lastPollTime); | ||||
return jiraIterator; | ||||
} | ||||
|
||||
@Override | ||||
public void setLastPollTime(Instant lastPollTime) { | ||||
log.info("Setting the lastPollTime: {}", lastPollTime); | ||||
log.trace("Setting the lastPollTime: {}", lastPollTime); | ||||
this.lastPollTime = lastPollTime; | ||||
} | ||||
|
||||
@VisibleForTesting | ||||
void injectObjectMapper(ObjectMapper objectMapper) { | ||||
this.objectMapper = objectMapper; | ||||
} | ||||
|
||||
@Override | ||||
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, CrawlerSourceConfig configuration) { | ||||
log.info("Logic for executing the partitions"); | ||||
public void executePartition(SaasWorkerProgressState state, | ||||
Buffer<Record<Event>> buffer, | ||||
CrawlerSourceConfig configuration) { | ||||
log.trace("Executing the partition: {} with {} ticket(s)", | ||||
state.getKeyAttributes(), state.getItemIds().size()); | ||||
List<String> itemIds = state.getItemIds(); | ||||
Map<String, Object> keyAttributes = state.getKeyAttributes(); | ||||
String project = (String) keyAttributes.get(PROJECT); | ||||
Instant eventTime = state.getExportStartTime(); | ||||
List<ItemInfo> itemInfos = new ArrayList<>(); | ||||
for (String itemId : itemIds) { | ||||
if (itemId == null) { | ||||
continue; | ||||
} | ||||
ItemInfo itemInfo = JiraItemInfo.builder() | ||||
.withItemId(itemId) | ||||
.withId(itemId) | ||||
.withProject(project) | ||||
.withEventTime(eventTime) | ||||
.withMetadata(keyAttributes).build(); | ||||
itemInfos.add(itemInfo); | ||||
} | ||||
|
||||
List<Record<Event>> recordsToWrite = itemInfos | ||||
.parallelStream() | ||||
.map(t -> (Supplier<String>) (() -> service.getIssue(t.getId()))) | ||||
.map(supplier -> supplyAsync(supplier, this.executorService)) | ||||
.map(CompletableFuture::join) | ||||
.map(ticketJson -> { | ||||
try { | ||||
return objectMapper.readValue(ticketJson, new TypeReference<>() { | ||||
}); | ||||
} catch (JsonProcessingException e) { | ||||
throw new RuntimeException(e); | ||||
} | ||||
}) | ||||
.map(t -> (Event) JacksonEvent.builder() | ||||
.withEventType("Ticket") | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use Document here. And use the constant value. data-prepper/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventType.java Line 23 in 5108528
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed it to use |
||||
.withData(t) | ||||
.build()) | ||||
.map(event -> new Record<>(event)) | ||||
.collect(Collectors.toList()); | ||||
|
||||
try { | ||||
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis()); | ||||
} catch (Exception e) { | ||||
throw new RuntimeException(e); | ||||
} | ||||
|
||||
} | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package org.opensearch.dataprepper.plugins.source.jira; | ||
|
||
import lombok.Getter; | ||
import lombok.Setter; | ||
import org.opensearch.dataprepper.plugins.source.jira.utils.Constants; | ||
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; | ||
|
||
import java.time.Instant; | ||
import java.util.Map; | ||
import java.util.UUID; | ||
|
||
@Setter | ||
@Getter | ||
public class JiraItemInfo implements ItemInfo { | ||
private String project; | ||
private String issueType; | ||
private String id; | ||
private String itemId; | ||
private Map<String, Object> metadata; | ||
private Instant eventTime; | ||
|
||
public JiraItemInfo(String id, | ||
String itemId, | ||
String project, | ||
String issueType, | ||
Map<String, Object> metadata, | ||
Instant eventTime | ||
) { | ||
this.id = id; | ||
this.project = project; | ||
this.issueType = issueType; | ||
this.itemId = itemId; | ||
this.metadata = metadata; | ||
this.eventTime = eventTime; | ||
} | ||
|
||
public static JiraItemInfoBuilder builder() { | ||
return new JiraItemInfoBuilder(); | ||
} | ||
|
||
@Override | ||
public String getPartitionKey() { | ||
return project + "|" + issueType + "|" + UUID.randomUUID(); | ||
} | ||
|
||
@Override | ||
public String getId() { | ||
return id; | ||
} | ||
|
||
@Override | ||
public Map<String, Object> getKeyAttributes() { | ||
return Map.of(Constants.PROJECT, project); | ||
} | ||
|
||
@Override | ||
public Instant getLastModifiedAt() { | ||
long updatedAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.UPDATED, "0")); | ||
long createdAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.CREATED, "0")); | ||
return createdAtMillis > updatedAtMillis ? | ||
Instant.ofEpochMilli(createdAtMillis) : Instant.ofEpochMilli(updatedAtMillis); | ||
} | ||
|
||
public static class JiraItemInfoBuilder { | ||
private Map<String, Object> metadata; | ||
private Instant eventTime; | ||
private String id; | ||
private String itemId; | ||
private String project; | ||
private String issueType; | ||
|
||
public JiraItemInfoBuilder() { | ||
} | ||
|
||
public JiraItemInfo build() { | ||
return new JiraItemInfo(id, itemId, project, issueType, metadata, eventTime); | ||
} | ||
|
||
public JiraItemInfoBuilder withMetadata(Map<String, Object> metadata) { | ||
this.metadata = metadata; | ||
return this; | ||
} | ||
|
||
public JiraItemInfoBuilder withEventTime(Instant eventTime) { | ||
this.eventTime = eventTime; | ||
return this; | ||
} | ||
|
||
public JiraItemInfoBuilder withItemId(String itemId) { | ||
this.itemId = itemId; | ||
return this; | ||
} | ||
|
||
public JiraItemInfoBuilder withId(String id) { | ||
this.id = id; | ||
return this; | ||
} | ||
|
||
public JiraItemInfoBuilder withProject(String project) { | ||
this.project = project; | ||
return this; | ||
} | ||
|
||
public JiraItemInfoBuilder withIssueType(String issueType) { | ||
this.issueType = issueType; | ||
return this; | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,99 @@ | ||||||
package org.opensearch.dataprepper.plugins.source.jira; | ||||||
|
||||||
|
||||||
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider; | ||||||
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; | ||||||
import org.slf4j.Logger; | ||||||
import org.slf4j.LoggerFactory; | ||||||
|
||||||
import javax.inject.Named; | ||||||
import java.time.Instant; | ||||||
import java.util.ArrayList; | ||||||
import java.util.Iterator; | ||||||
import java.util.List; | ||||||
import java.util.Queue; | ||||||
import java.util.concurrent.ConcurrentLinkedQueue; | ||||||
import java.util.concurrent.ExecutorService; | ||||||
import java.util.concurrent.Future; | ||||||
|
||||||
@Named | ||||||
public class JiraIterator implements Iterator<ItemInfo> { | ||||||
|
||||||
private static final int HAS_NEXT_TIMEOUT = 60; | ||||||
private static final Logger log = LoggerFactory.getLogger(JiraIterator.class); | ||||||
private final JiraSourceConfig sourceConfig; | ||||||
private final JiraService service; | ||||||
private final ExecutorService crawlerTaskExecutor; | ||||||
private final List<Future<Boolean>> futureList = new ArrayList<>(); | ||||||
private Queue<ItemInfo> itemInfoQueue; | ||||||
private Instant lastPollTime; | ||||||
private boolean firstTime = true; | ||||||
|
||||||
public JiraIterator(final JiraService service, | ||||||
PluginExecutorServiceProvider executorServiceProvider, | ||||||
JiraSourceConfig sourceConfig) { | ||||||
this.service = service; | ||||||
this.crawlerTaskExecutor = executorServiceProvider.get(); | ||||||
this.sourceConfig = sourceConfig; | ||||||
} | ||||||
|
||||||
@Override | ||||||
public boolean hasNext() { | ||||||
if (firstTime) { | ||||||
log.info("Crawling has been started"); | ||||||
startCrawlerThreads(); | ||||||
firstTime = Boolean.FALSE; | ||||||
} | ||||||
int timeout = HAS_NEXT_TIMEOUT; | ||||||
while (isCrawlerRunning() | ||||||
&& itemInfoQueue.isEmpty() | ||||||
&& (timeout != 0)) { | ||||||
try { | ||||||
log.trace("Waiting for crawling queue to be filled for next 2 seconds."); | ||||||
Thread.sleep(2000); | ||||||
timeout--; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this "timeout" or "number of retries"? Looks like number of retries to me There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not retries. It is waiting for the Crawler thread to complete |
||||||
} catch (InterruptedException e) { | ||||||
log.error("An exception has occurred while checking for next document in crawling queue."); | ||||||
Thread.currentThread().interrupt(); | ||||||
} | ||||||
} | ||||||
|
||||||
return !this.itemInfoQueue.isEmpty(); | ||||||
} | ||||||
|
||||||
private boolean isCrawlerRunning() { | ||||||
boolean isRunning = Boolean.FALSE; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Also, change the lines below to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed |
||||||
if (!futureList.isEmpty()) { | ||||||
for (Future<Boolean> future : futureList) { | ||||||
if (!future.isDone()) { | ||||||
isRunning = Boolean.TRUE; | ||||||
break; | ||||||
} | ||||||
} | ||||||
} | ||||||
return isRunning; | ||||||
} | ||||||
|
||||||
|
||||||
private void startCrawlerThreads() { | ||||||
futureList.add(crawlerTaskExecutor.submit( | ||||||
() -> service.getJiraEntities(sourceConfig, lastPollTime, itemInfoQueue), false)); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public ItemInfo next() { | ||||||
return this.itemInfoQueue.remove(); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Initialize. | ||||||
* | ||||||
* @param jiraChangeLogToken the jira change log token | ||||||
*/ | ||||||
public void initialize(Instant jiraChangeLogToken) { | ||||||
this.itemInfoQueue = new ConcurrentLinkedQueue<>(); | ||||||
this.lastPollTime = jiraChangeLogToken; | ||||||
this.firstTime = true; | ||||||
} | ||||||
|
||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are moving toward EventFactory. Please fix this in a follow-on PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok 👍