Geoip database update implementation #4105
Signed-off-by: Asif Sohail Mohammed <[email protected]>
5c864af
to
5bc1419
Compare
for (final EntryConfig entry : geoIPProcessorConfig.getEntries()) {
    final String source = entry.getSource();
    final List<String> attributes = entry.getFields();
    final List<GeoIPField> fields = entry.getFields();
    final Set<GeoIPDatabase> databases = entry.getGeoIPDatabases();
    final String ipAddress = event.get(source, String.class);

    //Lookup from DB
Could be in a separate method for readability
This is just a getter in EntryConfig. It is calculated only once, and subsequent calls just return the object.
Not a blocker, but it would still be more readable to wrap L131-151 in a private method.
public String tarFolderPath = tempFolderPath + "/tar";
public String downloadTarFilepath = tarFolderPath + "/out.tar.gz";
public void initiateDownload(List<String> config) throws Exception;
Logger LOG = LoggerFactory.getLogger(DBSource.class);
Does this need to be at interface level?
I will try to refactor this. There is a LOG in line 44, I will try to remove this.
Loggers in interfaces are probably unnecessary.
Thanks @asifsmohammed. This is making a lot of improvements! I made a number of high-level comments.
def gradleName = enterpriseDatabaseName.replaceAll('-', '')
def downloadEnterpriseTask = tasks.register("download${gradleName}", Download) {
    src(url)
    dest "build/resources/test/mmdb-files/geo-ip2/" + enterpriseDatabaseName + ".mmdb"
You can improve this through Groovy's GStrings:
dest "build/resources/test/mmdb-files/geo-ip2/${enterpriseDatabaseName}.mmdb"
}

jacocoTestCoverageVerification {
    dependsOn jacocoTestReport
    violationRules {
        rule {
            limit {
                minimum = 0.1 // temporarily reduce coverage for the builds to pass
                minimum = 0.8 // temporarily reduce coverage for the builds to pass
Nice!
private List<String> includeFields = new ArrayList<>();

@JsonProperty("exclude_fields")
private List<String> excludeFields = new ArrayList<>();
You should be able to use @JsonCreator in the enum class to make this deserializable. Then you can change this to: private List<String> excludeFields.
I didn't want to deserialize to the enum directly because the processor doesn't control the ObjectMapper, so it can't be told to ignore an invalid enum option; it throws an exception instead. So I kept it as a List without any default value. If it's empty, no fields are excluded and all fields are added.
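A minimal sketch of the approach described in the reply above: keep the configured names as strings and convert them to enum values in the processor's own code, so that an invalid name does not fail deserialization. The `Field` enum, its constants, and the `FieldSelector` class are hypothetical stand-ins, not names from the PR, and silently ignoring unknown names is only one possible policy.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical stand-in for the PR's GeoIPField enum.
enum Field { CITY_NAME, COUNTRY_NAME, ASN }

final class FieldSelector {
    // Starts from all fields and removes the excluded ones.
    // An empty exclude list therefore keeps every field.
    static Set<Field> afterExclusions(final List<String> excludeFields) {
        final Set<Field> selected = EnumSet.allOf(Field.class);
        for (final String name : excludeFields) {
            try {
                selected.remove(Field.valueOf(name.toUpperCase(Locale.ROOT)));
            } catch (final IllegalArgumentException e) {
                // Unknown field name: ignored here. Logging it or collecting it
                // for a validation error would be equally reasonable.
            }
        }
        return selected;
    }
}
```

Doing the conversion in one place also gives a natural spot to report invalid names to the user instead of dropping them.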
    return geoIPFields;
}

public Set<GeoIPDatabase> getGeoIPDatabases() {
I think this should be moved to a higher-level class that determines the databases from the fields. The database list is not directly linked to the configuration.
I agree. I had it here because it simplifies getting this list for each entry, instead of calculating it in the processor. But I can move it to processor initialization.
I'd recommend a specific class for database determination. It will have a specific set of concerns and be more testable.
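One way such a class could look is sketched below. `GeoIPField` and `GeoIPDatabase` are type names from the PR, but the constants shown, the field-to-database mapping, and the `GeoIPDatabaseResolver` name are assumptions made for illustration.

```java
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical constants; the PR's real enum values may differ.
enum GeoIPDatabase { CITY, COUNTRY, ASN }

enum GeoIPField {
    // Each field lists the databases assumed to be able to supply it.
    COUNTRY_NAME(EnumSet.of(GeoIPDatabase.CITY, GeoIPDatabase.COUNTRY)),
    CITY_NAME(EnumSet.of(GeoIPDatabase.CITY)),
    ASN(EnumSet.of(GeoIPDatabase.ASN));

    private final Set<GeoIPDatabase> databases;

    GeoIPField(final Set<GeoIPDatabase> databases) {
        this.databases = databases;
    }

    Set<GeoIPDatabase> getDatabases() {
        return databases;
    }
}

// Single responsibility: derive the databases needed for a set of fields.
final class GeoIPDatabaseResolver {
    Set<GeoIPDatabase> resolve(final Collection<GeoIPField> fields) {
        final Set<GeoIPDatabase> result = EnumSet.noneOf(GeoIPDatabase.class);
        for (final GeoIPField field : fields) {
            result.addAll(field.getDatabases());
        }
        return result;
    }
}
```

Because the resolver has no dependency on the configuration classes, it can be unit tested with plain enum values.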
public List<String> getFields() {
    return fields;

public List<GeoIPField> getFields() {
I think you could consider moving this to an abstraction class. But, I don't feel as strongly about that as with the databases. These are more closely related to the configuration than the databases.
@JsonProperty("source")
@NotEmpty
private String source;

@JsonProperty("target")
private String target = DEFAULT_TARGET;

@JsonProperty("fields")
private List<String> fields;
@JsonProperty("include_fields")
I tend to think we should move these up a level. Part of the reason to combine the processing in a single processor is to simplify the configurations. If we move it to the top, then users only have to configure once.
Let's discuss it. I think if we move this, there is no use for entries, and users can configure multiple instances of the geoip processor, since we only download the data once.

package org.opensearch.dataprepper.plugins.processor.exception;

public class DatabaseReaderInitializationException extends RuntimeException {
Per my comments about grouping errors for the purpose of tagging (and I think we can extend that to metrics), let's create a few base exceptions that represent the important main groups.
e.g.
public class EngineFailureException extends RuntimeException
public class DatabaseReaderInitializationException extends EngineFailureException
This comment applies to all the exception types.
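A hedged sketch of how grouped base exceptions could drive tagging. `EngineFailureException` and `DatabaseReaderInitializationException` come from the comment above; `EnrichFailedException`, the `FailureTags` helper, and all tag strings are hypothetical.

```java
import java.util.List;

// Base exception for one main failure group (named in the review comment).
class EngineFailureException extends RuntimeException {
    EngineFailureException(final String message) {
        super(message);
    }
}

// Hypothetical second group, added only to show the grouping idea.
class EnrichFailedException extends RuntimeException {
    EnrichFailedException(final String message) {
        super(message);
    }
}

// A specific exception inherits its group from the base class.
class DatabaseReaderInitializationException extends EngineFailureException {
    DatabaseReaderInitializationException(final String message) {
        super(message);
    }
}

final class FailureTags {
    // Maps an exception to tags by its group, not by its concrete type.
    // The tag names are placeholders.
    static List<String> tagsFor(final RuntimeException e) {
        if (e instanceof EngineFailureException) {
            return List.of("geoip_engine_failure");
        }
        if (e instanceof EnrichFailedException) {
            return List.of("geoip_enrich_failed");
        }
        return List.of("geoip_unknown_failure");
    }
}
```

New exception types then only need to extend the right base class to pick up the correct tag (and, by extension, the correct metric).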
@@ -17,7 +17,8 @@
public enum DBSourceOptions {
    PATH("path"),
    URL("url"),
    S3("s3");
    S3("s3"),
    CDN("cdn");
Generically for the user, this should be configured as an HTTP endpoint.
I think you can make this:
HTTP("http")
The fact that the implementation is a CDN is not relevant for the user. That is a detail of the server itself, not the client (i.e., Data Prepper).
import static org.opensearch.dataprepper.plugins.processor.databaseenrich.GeoIPDatabaseReader.MAXMIND_DATABASE_EXTENSION;

public class CDNDownloadService implements DBSource {
Make this HttpDownloadService.
Also, please consider how we can make the project support two different scenarios:
- An HTTP endpoint using the manifest (what you have now)
- An HTTP endpoint without a manifest. Say a user just wants to host these files in their own HTTP server.
@@ -93,7 +93,7 @@ void verify_enrichment_of_data_from_maxmind_url() throws UnknownHostException {
if (IPValidationCheck.isPublicIpAddress(ipAddress)) {
    InetAddress inetAddress = InetAddress.getByName(ipAddress);
    //All attributes are considered by default with the null value
    geoData = geoIPProcessorService.getGeoData(inetAddress, null);
    // geoData = geoIPProcessorService.getGeoData(inetAddress, null);
Is this meant to be commented out?
I haven't fixed these integration tests yet. I need to change this because the constructor changed.
final String whenCondition = geoIPProcessorConfig.getWhenCondition();

if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) {
    continue;
}
geoIpEventsProcessed.increment();

// TODO: Need to decide the behaviour, right now if all databases are expired we don't enrich the data.
I think just having this processor be a no-op on Events is OK when the database is expired, but we should be sure to tag these Events so that they can be routed appropriately in downstream sinks and processors. I would say we could even default the tag to _expired_geoip_database, and could make it configurable with tags_on_expired_database or something along those lines, given that this is one of the two special cases of errors.
Yeah, tags will be added if the database is expired. The default tags are not being added right now; we will consider a mechanism for adding tags in plugins and get back to it. For now it will be configurable.
final Set<GeoIPDatabase> databases = entryDatabaseMap.get(entry);
String ipAddress = null;
try {
    ipAddress = event.get(source, String.class);
This is just a placeholder for future implementation?
No, but as you mentioned above, I will try to put all of the doExecute logic in a try/catch in upcoming PRs.
final GeoIPDatabaseReader geoIPDatabaseReader = geoIPProcessorService.getGeoIPDatabaseReader();
final boolean databasesExpired = geoIPDatabaseReader.isExpired();

Let's wrap the entirety of the doExecute block in a generic try/catch that will ensure geoip never throws a runtime exception. We should also have a try/catch in the looping of Events, where we can catch any exceptions for one Event, log them, and then continue to the next Event.
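The two levels of protection described above can be sketched as follows. This is a simplified illustration that uses a generic record type instead of Data Prepper's `Record<Event>`; the `SafeBatchProcessor` name and the `System.err` logging are placeholders for the processor's real structure and logger.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

final class SafeBatchProcessor {
    // Applies enrich to every record and never throws.
    // Returns the records that failed so the caller can tag or count them.
    static <T> List<T> processAll(final Collection<T> records, final Consumer<T> enrich) {
        final List<T> failed = new ArrayList<>();
        try {
            for (final T record : records) {
                try {
                    enrich.accept(record);
                } catch (final Exception e) {
                    // Inner guard: one bad record must not stop the batch.
                    System.err.println("Failed to enrich record: " + e.getMessage());
                    failed.add(record);
                }
            }
        } catch (final Exception e) {
            // Outer guard: anything unexpected outside the per-record work.
            System.err.println("Unexpected failure while processing batch: " + e.getMessage());
        }
        return failed;
    }
}
```

In the processor, the failed records would be the ones that receive the failure tags before the full batch is returned.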
settings.gradle
Outdated
@@ -117,7 +117,7 @@ include 'data-prepper-expression'
include 'data-prepper-plugins:mutate-string-processors'
include 'data-prepper-plugins:s3-source'
include 'data-prepper-plugins:s3-sink'
include 'data-prepper-plugins:rss-source'
//include 'data-prepper-plugins:rss-source'
Any reason to remove this in this PR?
I should not have removed it, but the build has failed multiple times in RSS. I will add it back.
@@ -119,21 +160,65 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
            event.getMetadata().addTags(tagsOnFailure);
        }
    }
    geoIPDatabaseReader.close();
This could be missed if you have an exception that isn't caught. I recommend you use try-with-resources.
try(final GeoIPDatabaseReader geoIPDatabaseReader = geoIPProcessorService.getGeoIPDatabaseReader()) {
...
}
The try block will call .close(), so you won't even need this line.
I will address this in a follow-on PR.
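For reference, a small self-contained sketch of the try-with-resources behaviour recommended above: `close()` runs even when the body throws. `TrackedReader` is a hypothetical stand-in for `GeoIPDatabaseReader`, which is assumed here to implement `AutoCloseable`.

```java
// Hypothetical stand-in for a reader that must always be closed.
final class TrackedReader implements AutoCloseable {
    private boolean closed;

    String lookup(final String ip) {
        if (ip == null) {
            throw new IllegalArgumentException("ip must not be null");
        }
        return "geo-data-for-" + ip;
    }

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class TryWithResourcesDemo {
    // Returns true if the reader was closed although the lookup threw.
    static boolean closedAfterFailure() {
        final TrackedReader reader = new TrackedReader();
        try (TrackedReader r = reader) {
            r.lookup(null); // throws inside the try block
        } catch (final IllegalArgumentException e) {
            // By the time this handler runs, close() has already been called.
        }
        return reader.isClosed();
    }
}
```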
public void close() {
    final int count = closeCount.decrementAndGet();
    if (count == 0) {
        LOG.info("Closing old geoip database readers");
Maybe this should be at the DEBUG level.

@Override
public void close() {
    final int count = closeCount.decrementAndGet();
This counting logic is common between both the GeoIp Lite and Enterprise DatabaseReader implementations. I'd consider a decorator which handles the counting.
class AutoCountingDatabaseReader implements DatabaseReader, AutoCloseable {
    private final DatabaseReader delegateDatabaseReader; // pass in through constructor

    @Override
    public Map<String, Object> getGeoData(final InetAddress inetAddress, final List<GeoIPField> fields, final Set<GeoIPDatabase> geoIPDatabases) {
        return delegateDatabaseReader.getGeoData(inetAddress, fields, geoIPDatabases);
    }

    @Override
    public void close() {
        final int count = closeCount.decrementAndGet();
        if (count == 0) {
            LOG.info("Closing old geoip database readers");
            delegateDatabaseReader.close();
        }
    }
}
This could come as a follow-on PR.
I will address this in a follow-on PR.
}

@Override
public void retain() {
Consider finding a way to automatically retain - perhaps on creation. This is not necessary and could be a follow-on PR.
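One possible shape for a counting decorator that retains automatically on creation is sketched below. The `GeoReader` interface and `AutoCountingReader` class are hypothetical simplifications of the PR's reader types; starting the counter at 1 is an assumption about what "retain on creation" could mean.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified reader interface.
interface GeoReader extends AutoCloseable {
    String getGeoData(String ip);

    @Override
    void close();
}

final class AutoCountingReader implements GeoReader {
    private final GeoReader delegate;
    // Starts at 1: the creator holds the first reference,
    // so no explicit retain() call is needed after construction.
    private final AtomicInteger refCount = new AtomicInteger(1);

    AutoCountingReader(final GeoReader delegate) {
        this.delegate = delegate;
    }

    // Each additional user takes a reference before using the reader.
    AutoCountingReader retain() {
        refCount.incrementAndGet();
        return this;
    }

    @Override
    public String getGeoData(final String ip) {
        return delegate.getGeoData(ip);
    }

    @Override
    public void close() {
        // Only the last release closes the underlying reader.
        if (refCount.decrementAndGet() == 0) {
            delegate.close();
        }
    }
}
```

With this shape, both reader implementations can drop their own counting code and be wrapped at the point where they are created.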
}

@Override
public GeoIPProcessorService getGeoIPProcessorService() {
    return new GeoIPProcessorService(geoIpServiceConfig);
    if (geoIpServiceConfig != null)
Consider making this an Optional. Then the code that calls it can look somewhat like:
this.service = supplier.getGeoIpProcessorService().orElseThrow(() -> ... existing exception);
@@ -57,6 +68,19 @@ boolean isAwsAuthenticationOptionsRequired() {
    return true;
}

@AssertTrue(message = "database_paths should be https endpoint if using URL and if insecure is set to false")
public boolean isSecureEndpoint() throws URISyntaxException {
Please consider renaming this method. The logic below looks incorrect based on the name. Perhaps isHttpsEndpointOrInsecure?
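The method body is not shown in this thread, so the following is only a guess at logic that would match the suggested name and the validation message: the check passes when `insecure` is set, or when no configured path uses plain `http`. The `EndpointValidation` class and its parameters are hypothetical.

```java
import java.net.URI;
import java.util.List;

final class EndpointValidation {
    // True when insecure endpoints are allowed, or when none of the
    // configured paths is a plain http:// URL.
    static boolean isHttpsEndpointOrInsecure(final List<String> databasePaths, final boolean insecure) {
        if (insecure) {
            return true;
        }
        for (final String path : databasePaths) {
            // Local paths have no scheme and s3:// URIs have scheme "s3";
            // both pass. URI.create throws IllegalArgumentException on
            // malformed input.
            final String scheme = URI.create(path).getScheme();
            if ("http".equalsIgnoreCase(scheme)) {
                return false;
            }
        }
        return true;
    }
}
```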
 * @since 2.7
 */
public String getDatabaseDestination() {
    return databaseDestination + File.separator + File.separator + "geoip";
Why do you have two File.separators appended?
try {
    final URL url = new URL(CDNEndpoint);
    httpURLConnection = (HttpURLConnection) url.openConnection();
    httpURLConnection.addRequestProperty("User-Agent", "Custom-User-Agent");
Why do we add these? Should they be configurable?
@@ -69,8 +68,10 @@ public GeoIPProcessor(final PluginMetrics pluginMetrics,
        final GeoIpConfigSupplier geoIpConfigSupplier,
        final ExpressionEvaluator expressionEvaluator) {
    super(pluginMetrics);
    this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
    Objects.requireNonNull(geoIPProcessorService, "geoip_service configuration is required when using geoip processor.");
    if (geoIpConfigSupplier.getGeoIPProcessorService().isEmpty()) {
You can simplify this code:
this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService().orElseThrow(() -> new IllegalStateException("geoip_service configuration is required when using geoip processor."));
Perhaps do that in the follow-up PRs.
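A self-contained sketch of the `Optional`-based pattern suggested above. `GeoIpService`, `ServiceSupplier`, and `ProcessorSketch` are hypothetical stand-ins for `GeoIPProcessorService`, `GeoIpConfigSupplier`, and `GeoIPProcessor`; only the error message is taken from the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for the service created from geoip_service config.
final class GeoIpService {
}

final class ServiceSupplier {
    private final GeoIpService service; // null when geoip_service is not configured

    ServiceSupplier(final GeoIpService service) {
        this.service = service;
    }

    // An Optional makes the "not configured" case explicit to callers.
    Optional<GeoIpService> getService() {
        return Optional.ofNullable(service);
    }
}

final class ProcessorSketch {
    final GeoIpService service;

    ProcessorSketch(final ServiceSupplier supplier) {
        // One expression replaces the separate isEmpty() check and assignment.
        this.service = supplier.getService().orElseThrow(() ->
                new IllegalStateException("geoip_service configuration is required when using geoip processor."));
    }
}
```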
bb494de
into
opensearch-project:main
Description
This PR adds implementation for geoip processor.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.