-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SaaS Crawler Module #5095
SaaS Crawler Module #5095
Conversation
…odule for all of the gradle sources Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Maxwell Brown <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
full test coverage for base folder, spotless fixes
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
implementation 'com.fasterxml.jackson.core:jackson-core' | ||
implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
implementation 'com.mashape.unirest:unirest-java:1.4.9' | ||
implementation 'com.google.code.gson:gson:2.8.9' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this dependency? Please remove if possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed them
data-prepper-plugins/saas-source-plugins/jira-source/build.gradle
Outdated
Show resolved
Hide resolved
implementation 'org.projectlombok:lombok:1.18.30' | ||
annotationProcessor 'org.projectlombok:lombok:1.18.30' | ||
|
||
testImplementation platform('org.junit:junit-bom:5.10.0') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need either of these two lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed them
enabled = false | ||
} | ||
|
||
repositories { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this block. You don't need it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
.../opensearch/dataprepper/plugins/source/saas/crawler/SaasCrawlerApplicationContextMarker.java
Outdated
Show resolved
Hide resolved
|
||
@Named | ||
public class SaasPluginExecutorServiceProvider { | ||
Logger log = LoggerFactory.getLogger(SaasPluginExecutorServiceProvider.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be private static final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed it
...ensearch/dataprepper/plugins/source/saas/crawler/base/SaasPluginExecutorServiceProvider.java
Outdated
Show resolved
Hide resolved
.../main/java/org/opensearch/dataprepper/plugins/source/saas/crawler/base/SaasSourceConfig.java
Outdated
Show resolved
Hide resolved
… the review input Signed-off-by: Santhosh Gandhe <[email protected]>
Iterator<ItemInfo> itemInfoIterator = client.listItems(); | ||
log.info("Starting to crawl the source"); | ||
long updatedPollTime = 0; | ||
log.info("Creating Partitions"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a log of logs throughout the PR that are info
logs that look like they should be debug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adjusted the log level for some and removed a few log statements.
updatedPollTime = Math.max(updatedPollTime, niUpdated); | ||
log.info("updated poll time {}", updatedPollTime); | ||
} | ||
createPartition(itemInfoList, coordinator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we passing a full list to this method? Can we just create each partition inline or do they all need to be stored first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are passing maxItemsPerPage
number of items to this method. i.e. the page size in our paginated crawling. All the items in this page will go into one partition (or a work item). Like a partition per page.
} else { | ||
// Unable to acquire other partitions. | ||
// Probably we will introduce Global state in the future but for now, we don't expect to reach here. | ||
throw new RuntimeException("Unable to acquire other partitions. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe print out the partitionType here if we get to this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added to the exception message.
if(leaderPartition != null) { | ||
// Extend the timeout | ||
// will always be a leader until shutdown | ||
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should catch exceptions that can come from this call so your thread doesn't shut down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrapped this statement around try catch now. I don't see that this method is throwing any exception though!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't most of the time but it can. This was a bug in Dynamo at one time (#4850)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dynamo store hit a 5xx
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for clarifying 👍
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
this.buffer = buffer; | ||
|
||
boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition()); | ||
log.info("Leader partition creation status: {}", isPartitionCreated); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will result in one of these logs whenever a new data prepper instance starts
Leader partition creation status: false
and isn't really helpful. Can be debug
processPartition(partition.get(), buffer, sourceConfig); | ||
|
||
} else { | ||
log.info("No partition available. Going to Sleep for a while "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may also be a little noisy. Maybe a metric tracking this would be better?
Signed-off-by: Santhosh Gandhe <[email protected]>
Signed-off-by: Santhosh Gandhe <[email protected]>
Iterator<ItemInfo> listItems(); | ||
|
||
|
||
void setLastPollTime(long lastPollTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add javadoc comments for all API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added 👍
@@ -0,0 +1,81 @@ | |||
package org.opensearch.dataprepper.plugins.source.saas_crawler.base; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you not see a need for common interface for all crawlers? I was thinking there would be an interface with some default implementation and so on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Crawler relay on a source plugin specific iterator implementation and dispatch the work to source plugin specific client implementation. Crawler itself has generic logic for pagination.
} | ||
itemInfoList.add(nextItem); | ||
Map<String, String> metadata = nextItem.getMetadata(); | ||
long niCreated = Long.parseLong(metadata.get(CREATED)!=null? metadata.get(CREATED):"0"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the fallback value of 0 or current time?
createPartition(itemInfoList, coordinator); | ||
}while (itemInfoIterator.hasNext()); | ||
log.debug("Crawling completed in {} ms", System.currentTimeMillis() - startTime); | ||
return updatedPollTime != 0 ? updatedPollTime : startTime; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, looks like it is falling back to current time here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
private final Crawler crawler; | ||
|
||
|
||
@DataPrepperPluginConstructor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think you use @DataPrepperPluginConstructor
for abstract source classes. see ./data-prepper-plugins/http-source-common/src/main/java/org/opensearch/dataprepper/http/BaseHttpSource.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. No use of this annotation here. Removed it.
|
||
@Override | ||
public boolean areAcknowledgementsEnabled() { | ||
return Source.super.areAcknowledgementsEnabled(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a good idea. This should be left to the derived classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. Removed this method. Each source plugin will implement their own version.
* contents itself which can be used to apply regex filtering, change data capture etc. general | ||
* assumption here is that fetching metadata should be faster than fetching entire Item | ||
*/ | ||
Map<String, String> metadata; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably better to make it Map<String, Object>
, it is probably unrealistic to expect all of metadata values to be Strings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering each source may have different needs, agree to make it more generic. Converted the map as suggested.
Signed-off-by: Santhosh Gandhe <[email protected]>
...ensearch/dataprepper/plugins/source/saas_crawler/coordination/state/LeaderProgressState.java
Outdated
Show resolved
Hide resolved
...awler/src/main/java/org/opensearch/dataprepper/plugins/source/saas_crawler/base/Crawler.java
Outdated
Show resolved
Hide resolved
...ler/src/main/java/org/opensearch/dataprepper/plugins/source/saas_crawler/model/ItemInfo.java
Outdated
Show resolved
Hide resolved
...ler/src/main/java/org/opensearch/dataprepper/plugins/source/saas_crawler/model/ItemInfo.java
Outdated
Show resolved
Hide resolved
...ensearch/dataprepper/plugins/source/saas_crawler/base/SaasPluginExecutorServiceProvider.java
Outdated
Show resolved
Hide resolved
data-prepper-plugins/saas-source-plugins/saas-crawler/build.gradle
Outdated
Show resolved
Hide resolved
data-prepper-plugins/saas-source-plugins/saas-crawler/build.gradle
Outdated
Show resolved
Hide resolved
...ira-source/src/main/java/org/opensearch/dataprepper/plugins/source/saas/jira/JiraSource.java
Show resolved
Hide resolved
data-prepper-plugins/saas-source-plugins/jira-source/build.gradle
Outdated
Show resolved
Hide resolved
* JiraConnector connector entry point. | ||
*/ | ||
|
||
public abstract class SaasSourcePlugin implements Source<Record<Event>>, UsesEnhancedSourceCoordination { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also rename this to SourceCrawler
.
...ensearch/dataprepper/plugins/source/saas_crawler/coordination/state/LeaderProgressState.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @san81 for the improvements. This is very good. I think just a few more changes and I'll be good to go.
executorService.submit(workerScheduler); | ||
|
||
//Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute | ||
Thread.sleep(11000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still need to lower this sleep.
executorService.submit(workerScheduler); | ||
|
||
//Wait for more than a minute as the default while loop wait time in leader scheduler is 1 minute | ||
Thread.sleep(11000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still need to lower this sleep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with getting the other changes in a follow-on PR.
Signed-off-by: Santhosh Gandhe <[email protected]> converting last_poll_time to java Instant type Signed-off-by: Santhosh Gandhe <[email protected]> we are now capturing Crawling times Signed-off-by: Santhosh Gandhe <[email protected]> ItemInfo long timestamp is now using Instant type Signed-off-by: Santhosh Gandhe <[email protected]> addressing review comments Signed-off-by: Santhosh Gandhe <[email protected]> Instant conversion Signed-off-by: Santhosh Gandhe <[email protected]> addressing review comments Signed-off-by: Santhosh Gandhe <[email protected]> code formatting Signed-off-by: Santhosh Gandhe <[email protected]> removed long polling by enabling setter on the leader scheduler timer Signed-off-by: Santhosh Gandhe <[email protected]> reducing wait times Signed-off-by: Santhosh Gandhe <[email protected]>
0087491
to
39d98e7
Compare
Description
Introducing SaaS Source Plugins module and a base Jira Source plugin class
Issues Resolved
Resolves #4754
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.