Skip to content

Commit

Permalink
Addressed feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed committed Feb 13, 2024
1 parent 9776fa2 commit 0cdbdad
Show file tree
Hide file tree
Showing 28 changed files with 370 additions and 257 deletions.
6 changes: 3 additions & 3 deletions data-prepper-plugins/geoip-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ databaseNames.forEach { databaseName -> {
def gradleName = databaseName.replaceAll('-', '')
def downloadTask = tasks.register("download${gradleName}", Download) {
src(url)
dest "build/resources/test/mmdb-files/geo-lite2/" + databaseName + ".mmdb"
dest "build/resources/test/mmdb-files/geo-lite2/${databaseName}.mmdb"
overwrite true
}
downloadFiles.get().dependsOn downloadTask
Expand All @@ -63,7 +63,7 @@ enterpriseDatabaseNames.forEach { enterpriseDatabaseName -> {
def gradleName = enterpriseDatabaseName.replaceAll('-', '')
def downloadEnterpriseTask = tasks.register("download${gradleName}", Download) {
src(url)
dest "build/resources/test/mmdb-files/geo-ip2/" + enterpriseDatabaseName + ".mmdb"
dest "build/resources/test/mmdb-files/geo-ip2/${enterpriseDatabaseName}.mmdb"
overwrite true
}
downloadFiles.get().dependsOn downloadEnterpriseTask
Expand All @@ -84,7 +84,7 @@ jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 0.8 // temporarily reduce coverage for the builds to pass
minimum = 0.85
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.GeoIPDatabaseReader;
import org.opensearch.dataprepper.plugins.processor.exception.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.exception.InvalidIPAddressException;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIPProcessorService;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationCheck;
Expand All @@ -26,7 +27,10 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -43,14 +47,15 @@ public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Even
static final String GEO_IP_EVENTS_PROCESSED = "eventsProcessed";
static final String GEO_IP_EVENTS_FAILED_LOOKUP = "eventsFailedLookup";
static final String GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION = "eventsFailedEngineException";
static final List<String> DATABASE_EXPIRED_TAGS = List.of("database_expired");
private final Counter geoIpEventsProcessed;
private final Counter geoIpEventsFailedLookup;
private final Counter geoIpEventsFailedEngineException;
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final List<String> tagsOnFailure;
private final GeoIPProcessorService geoIPProcessorService;
private final ExpressionEvaluator expressionEvaluator;
private final Map<EntryConfig, List<GeoIPField>> entryFieldsMap;
final Map<EntryConfig, Set<GeoIPDatabase>> entryDatabaseMap;

/**
* GeoIPProcessor constructor for initialization of required attributes
Expand All @@ -73,6 +78,9 @@ public GeoIPProcessor(final PluginMetrics pluginMetrics,
this.geoIpEventsFailedLookup = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_LOOKUP);
//TODO: Use the exception metric for exceptions from service
this.geoIpEventsFailedEngineException = pluginMetrics.counter(GEO_IP_EVENTS_FAILED_ENGINE_EXCEPTION);

this.entryFieldsMap = populateGeoIPFields();
this.entryDatabaseMap = populateGeoIPDatabases();
}

/**
Expand All @@ -99,17 +107,25 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor

// TODO: Need to decide the behaviour, right now if all databases are expired we don't enrich the data.
if (databasesExpired) {
event.getMetadata().addTags(DATABASE_EXPIRED_TAGS);
// TODO: Finalize the tags
continue;
}

boolean isEventFailedLookup = false;

for (final EntryConfig entry : geoIPProcessorConfig.getEntries()) {
final String source = entry.getSource();
final List<GeoIPField> fields = entry.getFields();
final Set<GeoIPDatabase> databases = entry.getGeoIPDatabases();
final String ipAddress = event.get(source, String.class);
final List<GeoIPField> fields = entryFieldsMap.get(entry);
final Set<GeoIPDatabase> databases = entryDatabaseMap.get(entry);
String ipAddress = null;
try {
ipAddress = event.get(source, String.class);
} catch (final Exception e) {
// add tags
LOG.error(DataPrepperMarkers.EVENT, "Failed to get IP address from [{}] in event: [{}]. Caused by:[{}]",
source, event, e.getMessage());
}


//Lookup from DB
if (ipAddress != null && !ipAddress.isEmpty()) {
Expand All @@ -124,13 +140,13 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
} else {
isEventFailedLookup = true;
}
} catch (final EnrichFailedException ex) {
isEventFailedLookup = true;
LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]. Caused by:{}",
event, ipAddress, ex.getMessage());
} catch (final UnknownHostException e) {
} catch (final InvalidIPAddressException | UnknownHostException e) {
isEventFailedLookup = true;
LOG.error(DataPrepperMarkers.EVENT, "Failed to validate IP address: [{}] in event: [{}]. Caused by:[{}]",
ipAddress, event, e.getMessage());
} catch (final EnrichFailedException e) {
isEventFailedLookup = true;
LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]. Caused by:{}",
event, ipAddress, e.getMessage());
}
} else {
Expand All @@ -148,6 +164,49 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
return records;
}

private Map<EntryConfig, List<GeoIPField>> populateGeoIPFields() {
final Map<EntryConfig, List<GeoIPField>> entryConfigFieldsMap = new HashMap<>();
for (final EntryConfig entry: geoIPProcessorConfig.getEntries()) {
final List<String> includeFields = entry.getIncludeFields();
final List<String> excludeFields = entry.getExcludeFields();
List<GeoIPField> geoIPFields = new ArrayList<>();
if (includeFields != null && !includeFields.isEmpty()) {
for (final String field : includeFields) {
final GeoIPField geoIPField = GeoIPField.findByName(field);
if (geoIPField != null) {
geoIPFields.add(geoIPField);
}
}
} else if (excludeFields != null) {
final List<GeoIPField> excludeGeoIPFields = new ArrayList<>();
for (final String field : excludeFields) {
final GeoIPField geoIPField = GeoIPField.findByName(field);
if (geoIPField != null) {
excludeGeoIPFields.add(geoIPField);
}
}
geoIPFields = new ArrayList<>(List.of(GeoIPField.values()));
geoIPFields.removeAll(excludeGeoIPFields);
}
entryConfigFieldsMap.put(entry, geoIPFields);
}
return entryConfigFieldsMap;
}

private Map<EntryConfig, Set<GeoIPDatabase>> populateGeoIPDatabases() {
final Map<EntryConfig, Set<GeoIPDatabase>> entryConfigGeoIPDatabaseMap = new HashMap<>();
for (final EntryConfig entry : geoIPProcessorConfig.getEntries()) {
final List<GeoIPField> geoIPFields = entryFieldsMap.get(entry);
final Set<GeoIPDatabase> geoIPDatabasesToUse = new HashSet<>();
for (final GeoIPField geoIPField : geoIPFields) {
final Set<GeoIPDatabase> geoIPDatabases = geoIPField.getGeoIPDatabases();
geoIPDatabasesToUse.addAll(geoIPDatabases);
}
entryConfigGeoIPDatabaseMap.put(entry, geoIPDatabasesToUse);
}
return entryConfigGeoIPDatabaseMap;
}

@Override
public void prepareForShutdown() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.configuration;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import org.opensearch.dataprepper.plugins.processor.GeoIPDatabase;
import org.opensearch.dataprepper.plugins.processor.GeoIPField;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EntryConfig {
static final String DEFAULT_TARGET = "geo";
Expand All @@ -32,13 +26,6 @@ public class EntryConfig {
@JsonProperty("exclude_fields")
private List<String> excludeFields;

@JsonIgnore
private List<GeoIPField> geoIPFields;

@JsonIgnore
private Set<GeoIPDatabase> geoIPDatabasesToUse;


public String getSource() {
return source;
}
Expand All @@ -47,45 +34,12 @@ public String getTarget() {
return target;
}


public List<GeoIPField> getFields() {
if (geoIPFields != null) {
return geoIPFields;
}
geoIPFields = new ArrayList<>();
if (includeFields != null) {
for(final String field: includeFields) {
final GeoIPField geoIPField = GeoIPField.findByName(field);
if (geoIPField != null) {
geoIPFields.add(geoIPField);
}
}
return geoIPFields;
} else if (excludeFields != null) {
final List<GeoIPField> excludeGeoIPFields = new ArrayList<>();
for(final String field: excludeFields) {
final GeoIPField geoIPField = GeoIPField.findByName(field);
if (geoIPField != null) {
excludeGeoIPFields.add(geoIPField);
}
}
geoIPFields = new ArrayList<>(List.of(GeoIPField.values()));
geoIPFields.removeAll(excludeGeoIPFields);
return geoIPFields;
}
return geoIPFields;
public List<String> getIncludeFields() {
return includeFields;
}

public Set<GeoIPDatabase> getGeoIPDatabases() {
if (geoIPDatabasesToUse != null) {
return geoIPDatabasesToUse;
}
geoIPDatabasesToUse = new HashSet<>();
for (final GeoIPField geoIPField: getFields()) {
final Set<GeoIPDatabase> geoIPDatabases = geoIPField.getGeoIPDatabases();
geoIPDatabasesToUse.addAll(geoIPDatabases);
}
return geoIPDatabasesToUse;
public List<String> getExcludeFields() {
return excludeFields;
}

@AssertTrue(message = "include_fields and exclude_fields are mutually exclusive. include_fields or exclude_fields is required.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper.plugins.processor.exception;

public class DatabaseReaderInitializationException extends RuntimeException {
public class DatabaseReaderInitializationException extends EngineFailureException {
public DatabaseReaderInitializationException(final String exceptionMsg) {
super(exceptionMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* Implementation class for DownloadFailedException Custom exception
*/
public class DownloadFailedException extends RuntimeException {
public class DownloadFailedException extends EngineFailureException {
public DownloadFailedException(final String exceptionMsg) {
super(exceptionMsg);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.exception;

public class EngineFailureException extends RuntimeException {
public EngineFailureException(final String exceptionMsg) {
super(exceptionMsg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.exception;

public class InvalidIPAddressException extends EnrichFailedException {
public InvalidIPAddressException(final String exceptionMsg) {
super(exceptionMsg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.dataprepper.plugins.processor.exception;

public class NoValidDatabaseFoundException extends RuntimeException {
public class NoValidDatabaseFoundException extends EngineFailureException {
public NoValidDatabaseFoundException(final String exceptionMsg) {
super(exceptionMsg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@

package org.opensearch.dataprepper.plugins.processor.extension.databasedownload;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.TrustManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSession;
import javax.net.ssl.HostnameVerifier;
import java.io.File;
import java.io.UncheckedIOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
Expand All @@ -24,28 +19,8 @@
import java.util.List;

public interface DBSource {

Logger LOG = LoggerFactory.getLogger(DBSource.class);
void initiateDownload(List<String> config) throws Exception;

/**
* create Folder If Not Exist
* @param outputFilePath Output File Path
* @return File
*/
static File createFolderIfNotExist(String outputFilePath) {
final File destFile = new File(outputFilePath);
try {
if (!destFile.exists()) {
destFile.mkdirs();
}
}
catch (UncheckedIOException ex) {
LOG.info("Create Folder If NotExist Exception {0}", ex);
}
return destFile;
}

/**
* initiateSSL
* @throws NoSuchAlgorithmException NoSuchAlgorithmException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public enum DBSourceOptions {
PATH("path"),
URL("url"),
S3("s3"),
CDN("cdn");
HTTP_MANIFEST("http_manifest");

private final String option;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ private void downloadDatabases() throws Exception {
switchDirectory();

final String destinationPath = maxMindConfig.getDatabaseDestination() + File.separator + currentDatabaseDir;
geoIPFileManager.createDirectoryIfNotExist(destinationPath);
switch (dbSourceOptions) {
case CDN:
dbSource = new CDNDownloadService(destinationPath);
case HTTP_MANIFEST:
dbSource = new ManifestDownloadService(destinationPath);
dbSource.initiateDownload(databasePaths);
downloadReady =true;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,11 @@ public void deleteFile(final File file) {
file.delete();
}

public void createDirectoryIfNotExist(final String outputFilePath) {
final File destFile = new File(outputFilePath);
if (!destFile.exists()) {
destFile.mkdirs();
}
}

}
Loading

0 comments on commit 0cdbdad

Please sign in to comment.