Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add batch size field for jira source #5348

Merged
merged 6 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,19 @@ public class JiraSourceConfig implements CrawlerSourceConfig {
@Valid
private AuthenticationConfig authenticationConfig;


/**
* Filter Config to filter what tickets get ingested
* Batch size for fetching tickets
*/
@JsonProperty("filter")
private FilterConfig filterConfig;
@JsonProperty("batch_size")
private int batchSize = 50;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest you add a private static final variable at the top of the class and set it there. Assign that variable here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok



/**
* Number of worker threads to spawn to parallel source fetching
* Filter Config to filter what tickets get ingested
*/
@JsonProperty("workers")
private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;
@JsonProperty("filter")
private FilterConfig filterConfig;

/**
* Default time to wait (with exponential backOff) in the case of
* waiting for the source service to respond
*/
@JsonProperty("backoff_time")
private Duration backOff = DEFAULT_BACKOFF_MILLIS;

public String getAccountUrl() {
return this.getHosts().get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.BASIC;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.OAUTH2;
import static org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig.DEFAULT_NUMBER_OF_WORKERS;

public class JiraSourceConfigTest {
private final PluginConfigVariable accessToken = new MockPluginConfigVariableImpl("access token test");
Expand Down Expand Up @@ -115,11 +113,9 @@ private JiraSourceConfig createJiraSourceConfig(String authtype, boolean hasToke
void testGetters() throws Exception {
    // Verifies that every configured value round-trips through the config getters.
    jiraSourceConfig = createJiraSourceConfig(BASIC, false);
    var filter = jiraSourceConfig.getFilterConfig();
    assertEquals(filter.getIssueTypeConfig().getInclude(), issueTypeList);
    assertEquals(filter.getProjectConfig().getNameConfig().getInclude(), projectList);
    assertEquals(filter.getStatusConfig().getInclude(), statusList);
    assertEquals(jiraSourceConfig.getAccountUrl(), accountUrl);
    var basic = jiraSourceConfig.getAuthenticationConfig().getBasicConfig();
    assertEquals(basic.getPassword(), password);
    assertEquals(basic.getUsername(), username);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
@Named
public class Crawler {
private static final Logger log = LoggerFactory.getLogger(Crawler.class);
private static final int maxItemsPerPage = 50;
private final Timer crawlingTimer;
private final PluginMetrics pluginMetrics =
PluginMetrics.fromNames("sourceCrawler", "crawler");
Expand All @@ -36,14 +35,14 @@ public Crawler(CrawlerClient client) {
}

public Instant crawl(Instant lastPollTime,
EnhancedSourceCoordinator coordinator) {
EnhancedSourceCoordinator coordinator, int batchSize) {
long startTime = System.currentTimeMillis();
client.setLastPollTime(lastPollTime);
Iterator<ItemInfo> itemInfoIterator = client.listItems();
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
do {
final List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) {
for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {
ItemInfo nextItem = itemInfoIterator.next();
if (nextItem == null) {
//we don't expect null items, but just in case, we'll skip them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@
/**
 * Base configuration contract for crawler-backed source plugins.
 */
public interface CrawlerSourceConfig {

    // Default worker-thread count used when a source does not configure its own.
    int DEFAULT_NUMBER_OF_WORKERS = 1;

    /**
     * @return maximum number of items fetched and dispatched per crawl batch
     */
    int getBatchSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class CrawlerSourcePlugin implements Source<Record<Event>>, Uses
private final CrawlerSourceConfig sourceConfig;
private final Crawler crawler;
private final String sourcePluginName;
private final int batchSize;
private EnhancedSourceCoordinator coordinator;
private Buffer<Record<Event>> buffer;

Expand All @@ -59,6 +60,7 @@ public CrawlerSourcePlugin(final String sourcePluginName,
this.sourceConfig = sourceConfig;
this.pluginFactory = pluginFactory;
this.crawler = crawler;
this.batchSize = sourceConfig.getBatchSize();

this.acknowledgementSetManager = acknowledgementSetManager;
this.executorService = executorServiceProvider.get();
Expand All @@ -74,7 +76,7 @@ public void start(Buffer<Record<Event>> buffer) {
boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
log.debug("Leader partition creation status: {}", isPartitionCreated);

Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler);
Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize);
this.executorService.submit(leaderScheduler);
        // Register worker threads
for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ public class LeaderScheduler implements Runnable {
@Setter
private Duration leaseInterval;
private LeaderPartition leaderPartition;
private final int batchSize;

/**
 * Creates the leader scheduler.
 *
 * @param coordinator  source coordinator used to acquire and save the leader partition
 * @param sourcePlugin owning crawler source plugin
 * @param crawler      crawler invoked on each leader cycle
 * @param batchSize    maximum number of items handed out per crawl batch
 */
public LeaderScheduler(EnhancedSourceCoordinator coordinator,
                       CrawlerSourcePlugin sourcePlugin,
                       Crawler crawler,
                       int batchSize) {
    this.sourcePlugin = sourcePlugin;
    this.crawler = crawler;
    this.coordinator = coordinator;
    this.batchSize = batchSize;
    // Lease renewal cadence starts at the default; tests may override via the setter.
    this.leaseInterval = DEFAULT_LEASE_INTERVAL;
}

@Override
Expand All @@ -65,7 +68,7 @@ public void run() {
Instant lastPollTime = leaderProgressState.getLastPollTime();

//Start crawling and create child partitions
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
leaderProgressState.setLastPollTime(updatedPollTime);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class CrawlerSourcePluginTest {

private testCrawlerSourcePlugin saasSourcePlugin;

private final int batchSize = 50;

@BeforeEach
void setUp() {
when(executorServiceProvider.get()).thenReturn(executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;

import java.lang.reflect.Field;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -68,20 +67,21 @@ public void executePartitionTest() {
void testCrawlWithEmptyList() {
    // An empty item iterator must never trigger partition creation.
    final Instant pollTime = Instant.ofEpochMilli(0);
    final int pageSize = 50;
    when(client.listItems()).thenReturn(Collections.emptyIterator());
    crawler.crawl(pollTime, coordinator, pageSize);
    verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
int maxItemsPerPage = 50;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved out of the test and directly reference the default from the class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList.add(new TestItemInfo("itemId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));

}
Expand All @@ -90,27 +90,50 @@ void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessExcep
void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
int maxItemsPerPage = 50;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved out of the test and directly reference the default from the class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for (int i = 0; i < maxItemsPerPage + 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testBatchSize() {
    // First crawl: 50 items with batch size 50 -> a single partition.
    final Instant lastPollTime = Instant.ofEpochMilli(0);
    final int firstBatchSize = 50;
    final List<ItemInfo> firstBatchItems = new ArrayList<>();
    for (int i = 0; i < firstBatchSize; i++) {
        firstBatchItems.add(new TestItemInfo("testId"));
    }
    when(client.listItems()).thenReturn(firstBatchItems.iterator());
    crawler.crawl(lastPollTime, coordinator, firstBatchSize);
    int expectedNumberOfInvocations = 1;
    verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));

    // Second crawl: the same number of items (50) with batch size 25 -> two more partitions.
    // Bug fix: the original stubbed listItems() with the FIRST list's iterator,
    // leaving itemInfoList2 dead code; stub with the second list instead.
    final int secondBatchSize = 25;
    final List<ItemInfo> secondBatchItems = new ArrayList<>();
    for (int i = 0; i < firstBatchSize; i++) {
        secondBatchItems.add(new TestItemInfo("testId"));
    }
    when(client.listItems()).thenReturn(secondBatchItems.iterator());
    crawler.crawl(lastPollTime, coordinator, secondBatchSize);
    expectedNumberOfInvocations += 2;
    verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
int maxItemsPerPage = 50;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved out of the test and directly reference the default from the class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

itemInfoList.add(null);
for (int i = 0; i < maxItemsPerPage - 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
}

Expand All @@ -121,7 +144,8 @@ void testUpdatingPollTimeNullMetaData() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
int maxItemsPerPage = 50;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved out of the test and directly reference the default from the class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
}

Expand All @@ -132,17 +156,11 @@ void testUpdatedPollTimeNiCreatedLarger() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
int maxItemsPerPage = 50;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved out of the test and directly reference the default from the class.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
assertNotEquals(lastPollTime, updatedPollTime);
}


// Reflection helper that reads Crawler's private static `maxItemsPerPage` default.
// NOTE(review): obsolete once the batch size is passed explicitly to crawl();
// this is the helper the PR deletes — safe to remove along with the Field import.
private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException {
    Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage");
    maxItemsPerPageField.setAccessible(true);
    return (int) maxItemsPerPageField.get(null);
}

// Builds a minimal ItemInfo fixture: given id, empty metadata, current timestamp.
private ItemInfo createTestItemInfo(String id) {
    final HashMap<String, Object> emptyMetadata = new HashMap<>();
    return new TestItemInfo(id, emptyMetadata, Instant.now());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.Executors;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.atLeast;
Expand All @@ -37,9 +38,11 @@ public class LeaderSchedulerTest {
@Mock
private Crawler crawler;

private final int batchSize = 50;

@Test
void testUnableToAcquireLeaderPartition() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty());

ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -52,7 +55,7 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(initializationState);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
Expand All @@ -66,15 +69,15 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(initializationState);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
Expand All @@ -92,12 +95,12 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr

@Test
void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
leaderScheduler.setLeaseInterval(Duration.ofMillis(10));
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(false);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10));
when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
.thenReturn(Optional.of(leaderPartition))
.thenThrow(RuntimeException.class);
Expand All @@ -110,6 +113,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class));
verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
}
}
Loading