Skip to content

Commit

Permalink
Functional confluence version
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Feb 1, 2025
1 parent b358db7 commit 6f40d65
Show file tree
Hide file tree
Showing 25 changed files with 472 additions and 713 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.stream.Collectors;

import static java.util.concurrent.CompletableFuture.supplyAsync;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.PROJECT;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.SPACE;

/**
* This class represents a Jira client.
Expand Down Expand Up @@ -93,7 +93,7 @@ public void executePartition(SaasWorkerProgressState state,
state.getKeyAttributes(), state.getItemIds().size());
List<String> itemIds = state.getItemIds();
Map<String, Object> keyAttributes = state.getKeyAttributes();
String project = (String) keyAttributes.get(PROJECT);
String project = (String) keyAttributes.get(SPACE);
Instant eventTime = state.getExportStartTime();
List<ItemInfo> itemInfos = new ArrayList<>();
for (String itemId : itemIds) {
Expand All @@ -103,7 +103,7 @@ public void executePartition(SaasWorkerProgressState state,
ItemInfo itemInfo = ConfluenceItemInfo.builder()
.withItemId(itemId)
.withId(itemId)
.withProject(project)
.withSpace(project)
.withEventTime(eventTime)
.withMetadata(keyAttributes).build();
itemInfos.add(itemInfo);
Expand All @@ -112,12 +112,12 @@ public void executePartition(SaasWorkerProgressState state,
String eventType = EventType.DOCUMENT.toString();
List<Record<Event>> recordsToWrite = itemInfos
.parallelStream()
.map(t -> (Supplier<String>) (() -> service.getIssue(t.getId())))
.map(t -> (Supplier<String>) (() -> service.getContent(t.getId())))
.map(supplier -> supplyAsync(supplier, this.executorService))
.map(CompletableFuture::join)
.map(ticketJson -> {
.map(contentJson -> {
try {
return objectMapper.readValue(ticketJson, new TypeReference<>() {
return objectMapper.readValue(contentJson, new TypeReference<>() {
});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.plugins.source.confluence.models.IssueBean;
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluenceItem;
import org.opensearch.dataprepper.plugins.source.confluence.utils.ConfluenceContentType;
import org.opensearch.dataprepper.plugins.source.confluence.utils.Constants;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
Expand All @@ -22,33 +22,33 @@
import java.util.Map;
import java.util.UUID;

import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.CONTENT_ID;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.CONTENT_TITLE;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.CREATED;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.ISSUE_KEY;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.PROJECT_KEY;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.PROJECT_NAME;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.UPDATED;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants._ISSUE;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.LAST_MODIFIED;
import static org.opensearch.dataprepper.plugins.source.confluence.utils.Constants.SPACE_KEY;

@Setter
@Getter
public class ConfluenceItemInfo implements ItemInfo {
private String project;
private String issueType;
private String space;
// either a page or a BlogPost
private String contentType;
private String id;
private String itemId;
private Map<String, Object> metadata;
private Instant eventTime;

public ConfluenceItemInfo(String id,
String itemId,
String project,
String issueType,
String space,
String contentType,
Map<String, Object> metadata,
Instant eventTime
) {
this.id = id;
this.project = project;
this.issueType = issueType;
this.space = space;
this.contentType = contentType;
this.itemId = itemId;
this.metadata = metadata;
this.eventTime = eventTime;
Expand All @@ -60,7 +60,7 @@ public static ConfluenceItemInfoBuilder builder() {

@Override
public String getPartitionKey() {
return project + "|" + issueType + "|" + UUID.randomUUID();
return space + "|" + contentType + "|" + UUID.randomUUID();
}

@Override
Expand All @@ -70,12 +70,12 @@ public String getId() {

@Override
public Map<String, Object> getKeyAttributes() {
return Map.of(Constants.PROJECT, project);
return Map.of(Constants.SPACE, space);
}

@Override
public Instant getLastModifiedAt() {
long updatedAtMillis = getMetadataField(Constants.UPDATED);
long updatedAtMillis = getMetadataField(Constants.LAST_MODIFIED);
long createdAtMillis = getMetadataField(Constants.CREATED);
return createdAtMillis > updatedAtMillis ?
Instant.ofEpochMilli(createdAtMillis) : Instant.ofEpochMilli(updatedAtMillis);
Expand All @@ -102,14 +102,14 @@ public static class ConfluenceItemInfoBuilder {
private Instant eventTime;
private String id;
private String itemId;
private String project;
private String issueType;
private String space;
private String contentType;

public ConfluenceItemInfoBuilder() {
}

public ConfluenceItemInfo build() {
return new ConfluenceItemInfo(id, itemId, project, issueType, metadata, eventTime);
return new ConfluenceItemInfo(id, itemId, space, contentType, metadata, eventTime);
}

public ConfluenceItemInfoBuilder withMetadata(Map<String, Object> metadata) {
Expand All @@ -132,25 +132,25 @@ public ConfluenceItemInfoBuilder withId(String id) {
return this;
}

public ConfluenceItemInfoBuilder withProject(String project) {
this.project = project;
public ConfluenceItemInfoBuilder withSpace(String space) {
this.space = space;
return this;
}

public ConfluenceItemInfoBuilder withIssueBean(IssueBean issue) {
Map<String, Object> issueMetadata = new HashMap<>();
issueMetadata.put(PROJECT_KEY, issue.getProject());
issueMetadata.put(PROJECT_NAME, issue.getProjectName());
issueMetadata.put(CREATED, issue.getCreatedTimeMillis());
issueMetadata.put(UPDATED, issue.getUpdatedTimeMillis());
issueMetadata.put(ISSUE_KEY, issue.getKey());
issueMetadata.put(ConfluenceService.CONTENT_TYPE, ConfluenceContentType.ISSUE.getType());

this.project = issue.getProject();
this.id = issue.getKey();
this.issueType = ConfluenceContentType.ISSUE.getType();
this.itemId = _ISSUE + issueMetadata.get(PROJECT_KEY) + "-" + issue.getKey();
this.metadata = issueMetadata;
public ConfluenceItemInfoBuilder withIssueBean(ConfluenceItem contentItem) {
Map<String, Object> contentItemMetadata = new HashMap<>();
contentItemMetadata.put(SPACE_KEY, contentItem.getSpaceItem().getKey());
contentItemMetadata.put(CONTENT_TITLE, contentItem.getTitle());
contentItemMetadata.put(CREATED, contentItem.getCreatedTimeMillis());
contentItemMetadata.put(LAST_MODIFIED, contentItem.getUpdatedTimeMillis());
contentItemMetadata.put(CONTENT_ID, contentItem.getId());
contentItemMetadata.put(ConfluenceService.CONTENT_TYPE, ConfluenceContentType.ISSUE.getType());

this.space = contentItem.getSpaceItem().getKey();
this.id = contentItem.getId();
this.contentType = ConfluenceContentType.ISSUE.getType();
this.itemId = contentItem.getId();
this.metadata = contentItemMetadata;
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private boolean isCrawlerRunning() {

private void startCrawlerThreads() {
futureList.add(crawlerTaskExecutor.submit(() ->
service.getJiraEntities(sourceConfig, lastPollTime, itemInfoQueue), false));
service.getPages(sourceConfig, lastPollTime, itemInfoQueue), false));
}

@Override
Expand Down
Loading

0 comments on commit 6f40d65

Please sign in to comment.