diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java
index ed873fc49b..3cb7b9501c 100644
--- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java
+++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java
@@ -17,13 +17,12 @@
 import org.opensearch.dataprepper.plugins.source.jira.configuration.FilterConfig;
 import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
 
-import java.time.Duration;
 import java.util.List;
 
 @Getter
 public class JiraSourceConfig implements CrawlerSourceConfig {
 
-    private static final Duration DEFAULT_BACKOFF_MILLIS = Duration.ofMinutes(2);
+    private static final int DEFAULT_BATCH_SIZE = 50;
 
     /**
      * Jira account url
@@ -38,26 +37,19 @@ public class JiraSourceConfig implements CrawlerSourceConfig {
     @Valid
     private AuthenticationConfig authenticationConfig;
-
     /**
-     * Filter Config to filter what tickets get ingested
+     * Batch size for fetching tickets
      */
-    @JsonProperty("filter")
-    private FilterConfig filterConfig;
+    @JsonProperty("batch_size")
+    private int batchSize = DEFAULT_BATCH_SIZE;
 
     /**
-     * Number of worker threads to spawn to parallel source fetching
+     * Filter Config to filter what tickets get ingested
      */
-    @JsonProperty("workers")
-    private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;
+    @JsonProperty("filter")
+    private FilterConfig filterConfig;
 
-    /**
-     * Default time to wait (with exponential backOff) in the case of
-     * waiting for the source service to respond
-     */
-    @JsonProperty("backoff_time")
-    private Duration backOff = DEFAULT_BACKOFF_MILLIS;
 
     /**
      * Boolean property indicating end to end acknowledgments state
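Editor's note: because `batch_size` is bound with `@JsonProperty` and carries a field initializer, a pipeline that omits the key keeps the default of 50, while an explicit value overrides it. Below is a minimal, runnable sketch of that Jackson behavior; `BatchSizeDefaultDemo` and `ConfigSketch` are illustrative stand-ins, not classes from this PR.

    // Sketch: a @JsonProperty-annotated field keeps its initializer when the key is absent.
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class BatchSizeDefaultDemo {
        static class ConfigSketch {
            @JsonProperty("batch_size")
            private int batchSize = 50; // mirrors DEFAULT_BATCH_SIZE in the diff

            int getBatchSize() {
                return batchSize;
            }
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Key absent: the field initializer survives deserialization.
            System.out.println(mapper.readValue("{}", ConfigSketch.class).getBatchSize()); // 50
            // Key present: Jackson writes the configured value into the annotated field.
            System.out.println(mapper.readValue("{\"batch_size\": 25}", ConfigSketch.class).getBatchSize()); // 25
        }
    }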
diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java
index b7b30af5f6..94aa04c554 100644
--- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java
+++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java
@@ -23,11 +23,9 @@
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.BASIC;
 import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.OAUTH2;
-import static org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig.DEFAULT_NUMBER_OF_WORKERS;
 
 public class JiraSourceConfigTest {
 
     private final PluginConfigVariable accessToken = new MockPluginConfigVariableImpl("access token test");
@@ -115,11 +113,9 @@ private JiraSourceConfig createJiraSourceConfig(String authtype, boolean hasToke
     void testGetters() throws Exception {
         jiraSourceConfig = createJiraSourceConfig(BASIC, false);
         assertEquals(jiraSourceConfig.getFilterConfig().getIssueTypeConfig().getInclude(), issueTypeList);
-        assertEquals(jiraSourceConfig.getNumWorkers(), DEFAULT_NUMBER_OF_WORKERS);
         assertEquals(jiraSourceConfig.getFilterConfig().getProjectConfig().getNameConfig().getInclude(), projectList);
         assertEquals(jiraSourceConfig.getFilterConfig().getStatusConfig().getInclude(), statusList);
         assertEquals(jiraSourceConfig.getAccountUrl(), accountUrl);
-        assertNotNull(jiraSourceConfig.getBackOff());
         assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getPassword(), password);
         assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getUsername(), username);
     }
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java
index 9634289344..ea834eba3b 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java
@@ -24,7 +24,6 @@
 @Named
 public class Crawler {
     private static final Logger log = LoggerFactory.getLogger(Crawler.class);
-    private static final int maxItemsPerPage = 100;
     private final Timer crawlingTimer;
     private final PluginMetrics pluginMetrics =
             PluginMetrics.fromNames("sourceCrawler", "crawler");
@@ -37,14 +36,14 @@ public Crawler(CrawlerClient client) {
     }
 
     public Instant crawl(Instant lastPollTime,
-                         EnhancedSourceCoordinator coordinator) {
+                         EnhancedSourceCoordinator coordinator, int batchSize) {
         long startTime = System.currentTimeMillis();
         client.setLastPollTime(lastPollTime);
         Iterator<ItemInfo> itemInfoIterator = client.listItems();
         log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
         do {
             final List<ItemInfo> itemInfoList = new ArrayList<>();
-            for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) {
+            for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {
                 ItemInfo nextItem = itemInfoIterator.next();
                 if (nextItem == null) {
                     //we don't expect null items, but just in case, we'll skip them
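Editor's note: for reviewers, here is a minimal, self-contained sketch of the paging behavior `crawl` now has (names simplified; the real loop wraps each non-empty page in a `SaasSourcePartition` via `coordinator.createPartition`). With 51 items and a batch size of 50 it yields two pages, which is exactly the arithmetic `testCrawlWithMultiplePartitions` below asserts.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class PagingSketch {
        // Counts how many partitions a crawl would create for a given batch size.
        static <T> int countPages(Iterator<T> items, int batchSize) {
            int pages = 0;
            do {
                List<T> page = new ArrayList<>();
                for (int i = 0; i < batchSize && items.hasNext(); i++) {
                    page.add(items.next());
                }
                if (!page.isEmpty()) {
                    pages++; // stand-in for coordinator.createPartition(...)
                }
            } while (items.hasNext());
            return pages;
        }

        public static void main(String[] args) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < 51; i++) {
                items.add(i);
            }
            System.out.println(countPages(items.iterator(), 50)); // 2, one extra page for the 51st item
            System.out.println(countPages(new ArrayList<Integer>().iterator(), 50)); // 0, no partition for an empty source
        }
    }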
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java
index 77902bdbc7..de5ebe3f7c 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java
@@ -7,6 +7,8 @@
 public interface CrawlerSourceConfig {
 
     int DEFAULT_NUMBER_OF_WORKERS = 1;
 
+    int getBatchSize();
+
     /**
      * Boolean to indicate if acknowledgments enabled for this source
      *
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java
index 70c0182e27..34f19d4107 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java
@@ -42,6 +42,7 @@ public abstract class CrawlerSourcePlugin implements Source<Record<Event>>, Uses
     private final CrawlerSourceConfig sourceConfig;
     private final Crawler crawler;
     private final String sourcePluginName;
+    private final int batchSize;
     private EnhancedSourceCoordinator coordinator;
     private Buffer<Record<Event>> buffer;
 
@@ -59,6 +60,7 @@ public CrawlerSourcePlugin(final String sourcePluginName,
         this.sourceConfig = sourceConfig;
         this.pluginFactory = pluginFactory;
         this.crawler = crawler;
+        this.batchSize = sourceConfig.getBatchSize();
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.executorService = executorServiceProvider.get();
@@ -74,7 +76,7 @@ public void start(Buffer<Record<Event>> buffer) {
         boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
         log.debug("Leader partition creation status: {}", isPartitionCreated);
 
-        Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler);
+        Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize);
         this.executorService.submit(leaderScheduler);
         //Register worker threaders
         for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java
index 615237e8e5..79705206f1 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java
@@ -34,14 +34,17 @@
     @Setter
     private Duration leaseInterval;
     private LeaderPartition leaderPartition;
+    private final int batchSize;
 
     public LeaderScheduler(EnhancedSourceCoordinator coordinator,
                            CrawlerSourcePlugin sourcePlugin,
-                           Crawler crawler) {
+                           Crawler crawler,
+                           int batchSize) {
         this.coordinator = coordinator;
         this.leaseInterval = DEFAULT_LEASE_INTERVAL;
         this.sourcePlugin = sourcePlugin;
         this.crawler = crawler;
+        this.batchSize = batchSize;
     }
 
     @Override
@@ -65,7 +68,7 @@ public void run() {
                 Instant lastPollTime = leaderProgressState.getLastPollTime();
 
                 //Start crawling and create child partitions
-                Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
+                Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
                 leaderProgressState.setLastPollTime(updatedPollTime);
                 leaderPartition.setLeaderProgressState(leaderProgressState);
                 coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
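Editor's note: the main-path changes above are plumbing: the batch size is read once from the source config in `CrawlerSourcePlugin`, handed to `LeaderScheduler` through its constructor, and passed along on every `crawl` call. A hedged sketch of that flow, using simplified stand-in types rather than the real Data Prepper interfaces:

    import java.time.Instant;

    public class PlumbingSketch {
        interface SourceConfig { int getBatchSize(); }
        interface Crawler { Instant crawl(Instant lastPollTime, int batchSize); }

        static class LeaderLoop {
            private final Crawler crawler;
            private final int batchSize; // mirrors the new LeaderScheduler constructor argument

            LeaderLoop(Crawler crawler, int batchSize) {
                this.crawler = crawler;
                this.batchSize = batchSize;
            }

            Instant runOnce(Instant lastPollTime) {
                return crawler.crawl(lastPollTime, batchSize); // the batch size reaches every crawl
            }
        }

        public static void main(String[] args) {
            SourceConfig config = () -> 50;                   // e.g. the jira source default
            Crawler crawler = (time, batch) -> Instant.now(); // stand-in crawler
            LeaderLoop loop = new LeaderLoop(crawler, config.getBatchSize());
            System.out.println(loop.runOnce(Instant.EPOCH));
        }
    }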
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java
index 8edfa66f71..397078fcf4 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java
@@ -62,6 +62,8 @@ public class CrawlerSourcePluginTest {
 
     private testCrawlerSourcePlugin saasSourcePlugin;
 
+    private final int batchSize = 50;
+
     @BeforeEach
     void setUp() {
         when(executorServiceProvider.get()).thenReturn(executorService);
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java
index 2b3aab7fcc..afac9f3e54 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java
@@ -15,7 +15,6 @@
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
 
-import java.lang.reflect.Field;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,6 +48,8 @@
 
     private Crawler crawler;
 
+    private static final int DEFAULT_BATCH_SIZE = 50;
+
     @BeforeEach
     public void setup() {
         crawler = new Crawler(client);
@@ -69,49 +70,69 @@ public void executePartitionTest() {
     void testCrawlWithEmptyList() {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         when(client.listItems()).thenReturn(Collections.emptyIterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
-    void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException {
+    void testCrawlWithNonEmptyList() {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
-        int maxItemsPerPage = getMaxItemsPerPage();
-        for (int i = 0; i < maxItemsPerPage; i++) {
+        for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
             itemInfoList.add(new TestItemInfo("itemId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
-    void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException {
+    void testCrawlWithMultiplePartitions() {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
-        int maxItemsPerPage = getMaxItemsPerPage();
-        for (int i = 0; i < maxItemsPerPage + 1; i++) {
+        for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) {
             itemInfoList.add(new TestItemInfo("testId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
+    }
+
+    @Test
+    void testBatchSize() {
+        Instant lastPollTime = Instant.ofEpochMilli(0);
+        List<ItemInfo> itemInfoList = new ArrayList<>();
+        int maxItemsPerPage = 50;
+        for (int i = 0; i < maxItemsPerPage; i++) {
+            itemInfoList.add(new TestItemInfo("testId"));
+        }
+        when(client.listItems()).thenReturn(itemInfoList.iterator());
+        crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
+        int expectedNumberOfInvocations = 1;
+        verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
+
+        List<ItemInfo> itemInfoList2 = new ArrayList<>();
+        int maxItemsPerPage2 = 25;
+        for (int i = 0; i < maxItemsPerPage; i++) {
+            itemInfoList2.add(new TestItemInfo("testId"));
+        }
+        when(client.listItems()).thenReturn(itemInfoList2.iterator());
+        crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2);
+        expectedNumberOfInvocations += 2;
+        verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
     void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
-        int maxItemsPerPage = getMaxItemsPerPage();
         itemInfoList.add(null);
-        for (int i = 0; i < maxItemsPerPage - 1; i++) {
+        for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) {
             itemInfoList.add(new TestItemInfo("testId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
     }
@@ -122,7 +143,7 @@ void testUpdatingPollTimeNullMetaData() {
         ItemInfo testItem = createTestItemInfo("1");
         itemInfoList.add(testItem);
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
+        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
     }
 
@@ -133,17 +154,10 @@ void testUpdatedPollTimeNiCreatedLarger() {
         ItemInfo testItem = createTestItemInfo("1");
         itemInfoList.add(testItem);
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
+        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         assertNotEquals(lastPollTime, updatedPollTime);
     }
 
-
-    private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException {
-        Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage");
-        maxItemsPerPageField.setAccessible(true);
-        return (int) maxItemsPerPageField.get(null);
-    }
-
     private ItemInfo createTestItemInfo(String id) {
         return new TestItemInfo(id, new HashMap<>(), Instant.now());
     }
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java
index 6390008b21..d5ef0cecde 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java
@@ -18,6 +18,7 @@
 import java.util.concurrent.Executors;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.atLeast;
@@ -37,9 +38,11 @@ public class LeaderSchedulerTest {
     @Mock
     private Crawler crawler;
 
+    private final int batchSize = 50;
+
     @Test
     void testUnableToAcquireLeaderPartition() throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty());
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -52,7 +55,7 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         LeaderPartition leaderPartition = new LeaderPartition();
         leaderPartition.getProgressState().get().setInitialized(initializationState);
         leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
@@ -66,7 +69,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte
         executorService.shutdownNow();
 
         // Check if crawler was invoked and updated leader lease renewal time
-        verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
+        verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
         verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
     }
 
@@ -74,7 +77,7 @@
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         LeaderPartition leaderPartition = new LeaderPartition();
         leaderPartition.getProgressState().get().setInitialized(initializationState);
         leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
@@ -92,12 +95,12 @@
 
     @Test
     void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         leaderScheduler.setLeaseInterval(Duration.ofMillis(10));
         LeaderPartition leaderPartition = new LeaderPartition();
         leaderPartition.getProgressState().get().setInitialized(false);
         leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
-        when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10));
+        when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
         when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
                 .thenReturn(Optional.of(leaderPartition))
                 .thenThrow(RuntimeException.class);
@@ -110,6 +113,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
         executorService.shutdownNow();
 
         // Check if crawler was invoked and updated leader lease renewal time
-        verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class));
+        verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
     }
 }