From 384774ce4155bd87f08bbebd73a0f3cd99728916 Mon Sep 17 00:00:00 2001 From: gsidhwani_nr Date: Mon, 14 Oct 2024 13:30:13 +0530 Subject: [PATCH] feat: first functional draft log4jAppender --- custom-log4j-appender/build.gradle | 42 +++++ .../main/java/com/newrelic/labs/LogEntry.java | 37 ++++ .../java/com/newrelic/labs/LogForwarder.java | 156 +++++++++++++++ .../com/newrelic/labs/LowercaseKeyMap.java | 19 ++ .../labs/NewRelicBatchingAppender.java | 177 ++++++++++++++++++ settings.gradle | 1 + 6 files changed, 432 insertions(+) create mode 100644 custom-log4j-appender/build.gradle create mode 100644 custom-log4j-appender/src/main/java/com/newrelic/labs/LogEntry.java create mode 100644 custom-log4j-appender/src/main/java/com/newrelic/labs/LogForwarder.java create mode 100644 custom-log4j-appender/src/main/java/com/newrelic/labs/LowercaseKeyMap.java create mode 100644 custom-log4j-appender/src/main/java/com/newrelic/labs/NewRelicBatchingAppender.java diff --git a/custom-log4j-appender/build.gradle b/custom-log4j-appender/build.gradle new file mode 100644 index 0000000..8cc3311 --- /dev/null +++ b/custom-log4j-appender/build.gradle @@ -0,0 +1,42 @@ +plugins { + id 'java' + id 'com.github.johnrengelman.shadow' version '6.1.0' +} + +repositories { + mavenCentral() // Ensure you are using Maven Central repository +} + +dependencies { + // OkHttp for HTTP requests + implementation 'com.squareup.okhttp3:okhttp:4.9.3' + + // Jackson for JSON processing + implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.1' + + // Log4j Core for logging + implementation 'org.apache.logging.log4j:log4j-core:2.14.1' + implementation 'org.apache.logging.log4j:log4j-api:2.14.1' +} + +jar { + manifest { + attributes( + 'Implementation-Title': 'com.newrelic.labs.custom-log4j-appender', + 'Implementation-Vendor': 'New Relic Labs', + 'Implementation-Vendor-Id': 'com.newrelic.labs', + 'Implementation-Version': '1.0' + ) + } +} + +shadowJar { + mergeServiceFiles() + manifest { + 
// NOTE(review): no Main-Class attribute — this artifact is a log4j appender library, not an executable jar (the previously referenced com.newrelic.labs.logforwarder.MAIN does not exist) + } +} + +tasks.withType(JavaCompile) { + options.encoding = 'UTF-8' +} \ No newline at end of file diff --git a/custom-log4j-appender/src/main/java/com/newrelic/labs/LogEntry.java b/custom-log4j-appender/src/main/java/com/newrelic/labs/LogEntry.java new file mode 100644 index 0000000..b9f1332 --- /dev/null +++ b/custom-log4j-appender/src/main/java/com/newrelic/labs/LogEntry.java @@ -0,0 +1,37 @@ +package com.newrelic.labs; + +public class LogEntry { + private final String message; + private final String applicationName; + private final String name; + private final String logtype; + private final long timestamp; + + public LogEntry(String message, String applicationName, String name, String logtype, long timestamp) { + this.message = message; + this.applicationName = applicationName; + this.name = name; + this.logtype = logtype; + this.timestamp = timestamp; + } + + public String getMessage() { + return message; + } + + public String getApplicationName() { + return applicationName; + } + + public String getName() { + return name; + } + + public String getLogType() { + return logtype; + } + + public long getTimestamp() { + return timestamp; + } +} \ No newline at end of file diff --git a/custom-log4j-appender/src/main/java/com/newrelic/labs/LogForwarder.java b/custom-log4j-appender/src/main/java/com/newrelic/labs/LogForwarder.java new file mode 100644 index 0000000..24cd668 --- /dev/null +++ b/custom-log4j-appender/src/main/java/com/newrelic/labs/LogForwarder.java @@ -0,0 +1,156 @@ +package com.newrelic.labs; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.zip.GZIPOutputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import 
okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +public class LogForwarder { + private final BlockingQueue<LogEntry> logQueue; + private final String apiKey; + private final String apiURL; + private final OkHttpClient client = new OkHttpClient(); + private final ObjectMapper objectMapper = new ObjectMapper(); + private final long maxMessageSize; + + public LogForwarder(String apiKey, String apiURL, long maxMessageSize, BlockingQueue<LogEntry> logQueue) { + this.apiKey = apiKey; + this.apiURL = apiURL; + this.maxMessageSize = maxMessageSize; + this.logQueue = logQueue; + } + + public void addToQueue(List<String> lines, String applicationName, String name, String logtype) { + for (String line : lines) { + logQueue.add(new LogEntry(line, applicationName, name, logtype, System.currentTimeMillis())); + } + } + + public boolean isInitialized() { + return apiKey != null && apiURL != null; + } + + public void flush(List<LogEntry> logEntries) { + InetAddress localhost = null; + try { + localhost = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + System.err.println("Unable to resolve local hostname: " + e.getMessage()); // non-fatal: "unknown" is used below + } + + String hostname = localhost != null ? 
localhost.getHostName() : "unknown"; + + // NOTE(review): the request MediaType is built in sendLogs(); flush() itself performs no HTTP call + // (an unused @SuppressWarnings("unused") MediaType local was removed here) + + try { + List<Map<String, Object>> logEvents = new ArrayList<>(); + for (LogEntry entry : logEntries) { + Map<String, Object> logEvent = objectMapper.convertValue(entry, LowercaseKeyMap.class); + logEvent.put("hostname", hostname); + logEvent.put("logtype", entry.getLogType()); + logEvent.put("timestamp", entry.getTimestamp()); // Use log creation timestamp + logEvent.put("applicationName", entry.getApplicationName()); + logEvent.put("name", entry.getName()); + logEvent.put("source", "NRBatchingAppender"); // Add custom field + logEvents.add(logEvent); + } + + String jsonPayload = objectMapper.writeValueAsString(logEvents); + byte[] compressedPayload = gzipCompress(jsonPayload); + + if (compressedPayload.length > maxMessageSize) { // Configurable size limit + System.err.println("Batch size exceeds limit, splitting batch..."); + List<LogEntry> subBatch = new ArrayList<>(); + int currentSize = 0; + for (LogEntry entry : logEntries) { + String entryJson = objectMapper.writeValueAsString(entry); + int entrySize = gzipCompress(entryJson).length; + if (!subBatch.isEmpty() && currentSize + entrySize > maxMessageSize) { // guard: never flush an empty sub-batch (first entry may itself exceed the limit) + sendLogs(subBatch); + subBatch.clear(); + currentSize = 0; + } + subBatch.add(entry); + currentSize += entrySize; + } + if (!subBatch.isEmpty()) { + sendLogs(subBatch); + } + } else { + sendLogs(logEntries); + } + } catch (IOException e) { + System.err.println("Error during log forwarding: " + e.getMessage()); + } + } + + private void sendLogs(List<LogEntry> logEntries) throws IOException { + InetAddress localhost = InetAddress.getLocalHost(); + String hostname = localhost.getHostName(); + + List<Map<String, Object>> logEvents = new ArrayList<>(); + for (LogEntry entry : logEntries) { + Map<String, Object> logEvent = objectMapper.convertValue(entry, LowercaseKeyMap.class); + logEvent.put("hostname", hostname); + logEvent.put("logtype", entry.getLogType()); + logEvent.put("applicationName", entry.getApplicationName()); + 
logEvent.put("name", entry.getName()); + logEvent.put("timestamp", entry.getTimestamp()); // Use log creation timestamp + logEvent.put("source", "NRBatchingAppender"); // Add custom field + logEvents.add(logEvent); + } + + String jsonPayload = objectMapper.writeValueAsString(logEvents); + byte[] compressedPayload = gzipCompress(jsonPayload); + + MediaType mediaType = MediaType.parse("application/json"); + + RequestBody requestBody = RequestBody.create(compressedPayload, mediaType); + Request request = new Request.Builder().url(apiURL).post(requestBody).addHeader("X-License-Key", apiKey) + .addHeader("Content-Type", "application/json").addHeader("Content-Encoding", "gzip").build(); + + try (Response response = client.newCall(request).execute()) { + if (!response.isSuccessful()) { + System.err.println("Failed to send logs to New Relic: " + response.code() + " - " + response.message()); + System.err.println("Response body: " + response.body().string()); + } else { + LocalDateTime timestamp = LocalDateTime.now(); + System.out.println("Logs sent to New Relic successfully: " + "at " + timestamp + " size: " + + compressedPayload.length + " Bytes"); + System.out.println("Response: " + response.body().string()); + } + } + } + + private byte[] gzipCompress(String input) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOS = new GZIPOutputStream(bos)) { + gzipOS.write(input.getBytes(java.nio.charset.StandardCharsets.UTF_8)); // explicit charset: JSON must be UTF-8 regardless of platform default + } + return bos.toByteArray(); + } + + public void close() { + List<LogEntry> remainingLogs = new ArrayList<>(); + logQueue.drainTo(remainingLogs); + if (!remainingLogs.isEmpty()) { + System.out.println("Flushing remaining " + remainingLogs.size() + " log events to New Relic..."); + flush(remainingLogs); + } + } +} \ No newline at end of file diff --git a/custom-log4j-appender/src/main/java/com/newrelic/labs/LowercaseKeyMap.java b/custom-log4j-appender/src/main/java/com/newrelic/labs/LowercaseKeyMap.java new file mode 100644 index 
0000000..b7c0df2 --- /dev/null +++ b/custom-log4j-appender/src/main/java/com/newrelic/labs/LowercaseKeyMap.java @@ -0,0 +1,19 @@ +package com.newrelic.labs; + +import java.util.HashMap; +import java.util.Map; + +@SuppressWarnings("serial") +public class LowercaseKeyMap extends HashMap { + @Override + public Object put(String key, Object value) { + return super.put(key.toLowerCase(), value); + } + + @Override + public void putAll(Map m) { + for (Map.Entry entry : m.entrySet()) { + this.put(entry.getKey().toLowerCase(), entry.getValue()); + } + } +} \ No newline at end of file diff --git a/custom-log4j-appender/src/main/java/com/newrelic/labs/NewRelicBatchingAppender.java b/custom-log4j-appender/src/main/java/com/newrelic/labs/NewRelicBatchingAppender.java new file mode 100644 index 0000000..10fb011 --- /dev/null +++ b/custom-log4j-appender/src/main/java/com/newrelic/labs/NewRelicBatchingAppender.java @@ -0,0 +1,177 @@ +package com.newrelic.labs; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.apache.logging.log4j.status.StatusLogger; + +@SuppressWarnings("unused") +@Plugin(name = 
"NewRelicBatchingAppender", category = "Core", elementType = "appender", printObject = true) +public class NewRelicBatchingAppender extends AbstractAppender { + + private final BlockingQueue queue; + + private final String apiKey; + private final String apiUrl; + private final String applicationName; + private final String logType; + private final String name; + private final LogForwarder logForwarder; + private static final Logger logger = StatusLogger.getLogger(); + + private final int batchSize; + private final long maxMessageSize; + private final long flushInterval; + + private static final int DEFAULT_BATCH_SIZE = 5000; + private static final long DEFAULT_MAX_MESSAGE_SIZE = 1048576; // 1 MB + private static final long DEFAULT_FLUSH_INTERVAL = 120000; // 2 minutes + private static final String LOG_TYPE = "muleLog"; // defaultType + + protected NewRelicBatchingAppender(String name, Filter filter, Layout layout, + final boolean ignoreExceptions, String apiKey, String apiUrl, String applicationName, Integer batchSize, + Long maxMessageSize, Long flushInterval, String logType) { + super(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY); + this.queue = new LinkedBlockingQueue<>(); + this.apiKey = apiKey; + this.apiUrl = apiUrl; + this.applicationName = applicationName; + this.name = name; + this.batchSize = batchSize != null ? batchSize : DEFAULT_BATCH_SIZE; + this.maxMessageSize = maxMessageSize != null ? maxMessageSize : DEFAULT_MAX_MESSAGE_SIZE; + this.flushInterval = flushInterval != null ? flushInterval : DEFAULT_FLUSH_INTERVAL; + this.logType = ((logType != null) && (logType.length() > 0)) ? 
logType : LOG_TYPE; + this.logForwarder = new LogForwarder(apiKey, apiUrl, this.maxMessageSize, this.queue); + startFlushingTask(); + } + + @PluginFactory + public static NewRelicBatchingAppender createAppender(@PluginAttribute("name") String name, + @PluginElement("Layout") Layout layout, + @PluginElement("Filter") final Filter filter, @PluginAttribute("apiKey") String apiKey, + @PluginAttribute("apiUrl") String apiUrl, @PluginAttribute("applicationName") String applicationName, + @PluginAttribute(value = "batchSize") Integer batchSize, + @PluginAttribute(value = "maxMessageSize") Long maxMessageSize, @PluginAttribute("logType") String logType, + @PluginAttribute(value = "flushInterval") Long flushInterval) { + + if (name == null) { + logger.error("No name provided for NewRelicBatchingAppender"); + return null; + } + + if (layout == null) { + layout = PatternLayout.createDefaultLayout(); + } + + if (apiKey == null || apiUrl == null || applicationName == null) { + logger.error("API key, API URL, and application name must be provided for NewRelicBatchingAppender"); + return null; + } + + return new NewRelicBatchingAppender(name, filter, layout, true, apiKey, apiUrl, applicationName, batchSize, + maxMessageSize, flushInterval, logType); + } + + @Override + public void append(LogEvent event) { + if (!checkEntryConditions()) { + logger.warn("Appender not initialized. Dropping log entry"); + return; + } + + String message = new String(getLayout().toByteArray(event)); + String loggerName = event.getLoggerName(); + long timestamp = event.getTimeMillis(); // Capture the log creation timestamp + + // Extract MuleAppName from the message + String muleAppName = extractMuleAppName(message); + + logger.debug("Queueing message for New Relic: " + message); + + try { + // Directly add to the queue + queue.add(new LogEntry(message, applicationName, muleAppName, logType, timestamp)); + } catch (Exception e) { + logger.error("Unable to insert log entry into log queue. 
", e); + } + } + + private String extractMuleAppName(String message) { + Pattern pattern = Pattern.compile("\\[.*?\\]\\..*?\\[([^\\]]+)\\]"); // NOTE(review): consider hoisting to a static final Pattern — recompiled on every log line + Matcher matcher = pattern.matcher(message); + if (matcher.find()) { + return matcher.group(1); + } + return "generic"; + } + + private boolean checkEntryConditions() { + boolean initialized = logForwarder != null && logForwarder.isInitialized(); + logger.debug("Check entry conditions: " + initialized); + return initialized; + } + + private void startFlushingTask() { + Runnable flushTask = new Runnable() { + @Override + public void run() { + while (true) { + try { + logger.debug("Flushing task running..."); + List<LogEntry> batch = new ArrayList<>(); + queue.drainTo(batch, batchSize); + if (!batch.isEmpty()) { + logger.debug("Flushing {} log entries to New Relic", batch.size()); + logForwarder.flush(batch); + } + Thread.sleep(flushInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Flushing task interrupted", e); return; // exit the loop: with the interrupt flag set, sleep() would rethrow immediately and spin + } + } + } + }; + + Thread flushThread = new Thread(flushTask); + flushThread.setDaemon(true); + flushThread.start(); + + // Log the configuration settings in use + logger.info( + "NewRelicBatchingAppender initialized with settings: batchSize={}, maxMessageSize={}, flushInterval={}", + batchSize, maxMessageSize, flushInterval); + } + + @Override + public boolean stop(final long timeout, final TimeUnit timeUnit) { + logger.debug("Stopping NewRelicBatchingAppender {}", getName()); + setStopping(); + final boolean stopped = super.stop(timeout, timeUnit, false); + try { + logForwarder.close(); + } catch (Exception e) { + logger.error("Unable to close appender", e); + } + setStopped(); + logger.debug("NewRelicBatchingAppender {} has been stopped", getName()); + return stopped; + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 67070a5..902c7fa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1,2 @@ rootProject.name = 
'java-instrumentation-template' +include 'custom-log4j-appender'