diff --git a/src/main/java/org/jmxtrans/agent/DiscoveryQuery.java b/src/main/java/org/jmxtrans/agent/DiscoveryQuery.java
new file mode 100644
index 00000000..2830bbe8
--- /dev/null
+++ b/src/main/java/org/jmxtrans/agent/DiscoveryQuery.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2010-2013 the original author or authors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ */
+package org.jmxtrans.agent;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.jmxtrans.agent.util.StringUtils2;
+import org.jmxtrans.agent.util.json.JsonArray;
+import org.jmxtrans.agent.util.json.JsonObject;
+import org.jmxtrans.agent.util.logging.Logger;
+
+/**
+ * DiscoveryQuery : Used to discover a list of JMX beans matching a specific naming pattern.
+ * Used with the Zabbix server.
+ *
+ *
+ * For example, the following discovery rule :
+ *
+ *
+ * {@code}
+ * <discoveryQuery
+ * objectName="java.lang:type=GarbageCollector,name=*"
+ * attributes="name,type"
+ * resultAlias="discovery[garbageCollector]"
+ * collectIntervalInSeconds="300"/>
+ *
+ * <query objectName="java.lang:type=GarbageCollector,name=*"
+ * attributes="CollectionTime,CollectionCount"
+ * resultAlias="discovery[GarbageCollector.%name%.#attribute#]" />
+ *
+ *
+ * May yield the following discovery output (formatted for readability) :
+ *
+ *
+ * {@code}
+ * {"data":[
+ * {"{#name}":"PS Scavenge","{#type}":"GarbageCollector"},
+ * {"{#name}":"PS MarkSweep","{#type}":"GarbageCollector"}
+ * ]}
+ *
+ *
+ * On the Zabbix side, create a "Discovery Rule" of type "Zabbix trapper"
+ * with a "Key" that matches the result alias. You can then create "Item prototypes" that use the values.
+ *
+ * Sample Zabbix configuration that matches the example above :
+ *
+ *
+ * {@code}
+ * Discovery rule :
+ * Name : Discover Garbage Collectors
+ * Key : discovery[garbageCollector]
+ *
+ * Item Prototype
+ * Name : Object {#TYPE} named {#NAME}
+ * Key : discovery[{#TYPE}.{#NAME}.CollectionTime]
+ *
+ * Item Prototype
+ * Name : Object {#TYPE} named {#NAME}
+ * Key : discovery[{#TYPE}.{#NAME}.CollectionCount]
+ *
+ *
+ *
+ * NOTE : It can take a few minutes for Zabbix to enable newly created discovery
+ * rules and item prototypes.
+ *
+ * @author Steve McDuff
+ */
+public class DiscoveryQuery extends Query
+{
+ private final Logger logger = Logger.getLogger(getClass().getName());
+
+ public DiscoveryQuery(String objectName, List attributes, String key, Integer position, String type,
+ String resultAlias, ResultNameStrategy resultNameStrategy, Integer collectInterval)
+ {
+ super(objectName, attributes, key, position, type, resultAlias, resultNameStrategy, collectInterval);
+ }
+
+ @Override
+ public void collectAndExport(MBeanServer mbeanServer, OutputWriter outputWriter)
+ {
+ if (resultNameStrategy == null)
+ throw new IllegalStateException(
+ "resultNameStrategy is not defined, query object is not properly initialized");
+
+ try
+ {
+ Set objectNames = mbeanServer.queryNames(objectName, null);
+
+ String discoveryResult = formatDiscoveryValue(objectNames);
+
+ String resultName = resultNameStrategy.getResultName(this, objectName, null, null, null);
+ String type = getType();
+ outputWriter.writeQueryResult(resultName, type, discoveryResult);
+ }
+ catch (Exception ex)
+ {
+ logger.log(Level.WARNING, "DiscoveryQuery. Exception collecting " + objectName + "#" + getAttributes() +
+ (key == null ? "" : "#" + key), ex);
+ }
+ }
+
+ private String formatDiscoveryValue(Set objectNames)
+ {
+ JsonObject result = new JsonObject();
+ JsonArray data = new JsonArray();
+ result.add("data", data);
+
+ for (ObjectName on : objectNames)
+ {
+ try
+ {
+ JsonObject discoveredObject = new JsonObject();
+ for (String attribute : attributes)
+ {
+ // Generate the discovered property with the format {"{#NAME}":"value"}
+ String keyProperty = on.getKeyProperty(attribute);
+ // skip nulls
+ if (keyProperty != null)
+ {
+ String formattedKey = formatDiscoveryKey(attribute);
+ String formattedValue = formatDiscoveryValue(keyProperty);
+ discoveredObject.add(formattedKey, formattedValue);
+ }
+ }
+ data.add(discoveredObject);
+ }
+ catch (Exception ex)
+ {
+ logger.log(Level.WARNING, "DiscoveryQuery.formatDiscoveryValue. Exception collecting " + objectName +
+ "#" + getAttributes() + (key == null ? "" : "#" + key), ex);
+ }
+ }
+
+ String discoveryResult = result.toString();
+ return discoveryResult;
+ }
+
+ public String formatDiscoveryValue(String keyProperty)
+ {
+ // transform the property values to match the way JMXTransAgent
+ // will format them in the default naming strategy.
+ StringBuilder builder = new StringBuilder();
+ StringUtils2.appendEscapedNonAlphaNumericChars(keyProperty, builder);
+ keyProperty = builder.toString();
+ return keyProperty;
+ }
+
+ private String formatDiscoveryKey(String attribute)
+ {
+ return "{#" + attribute.toUpperCase() + "}";
+ }
+
+}
diff --git a/src/main/java/org/jmxtrans/agent/JmxTransConfigurationXmlLoader.java b/src/main/java/org/jmxtrans/agent/JmxTransConfigurationXmlLoader.java
index 9b55a31f..f68f87da 100644
--- a/src/main/java/org/jmxtrans/agent/JmxTransConfigurationXmlLoader.java
+++ b/src/main/java/org/jmxtrans/agent/JmxTransConfigurationXmlLoader.java
@@ -121,6 +121,7 @@ protected JmxTransExporterConfiguration build(Document document) {
buildResultNameStrategy(rootElement, jmxTransExporterConfiguration, resolver);
buildInvocations(rootElement, jmxTransExporterConfiguration);
buildQueries(rootElement, jmxTransExporterConfiguration);
+ buildDiscoveryQueries(rootElement, jmxTransExporterConfiguration);
buildOutputWriters(rootElement, jmxTransExporterConfiguration, resolver);
@@ -190,6 +191,29 @@ private void buildQueries(Element rootElement, JmxTransExporterConfiguration con
configuration.withQuery(objectName, attributes, key, position, type, resultAlias, collectInterval);
}
}
+
+ private void buildDiscoveryQueries(Element rootElement, JmxTransExporterConfiguration configuration) {
+ NodeList queries = rootElement.getElementsByTagName("discoveryQuery");
+ for (int i = 0; i < queries.getLength(); i++) {
+ Element queryElement = (Element) queries.item(i);
+ String objectName = queryElement.getAttribute("objectName");
+ List attributes = getAttributes(queryElement, objectName);
+ String key = queryElement.hasAttribute("key") ? queryElement.getAttribute("key") : null;
+ String resultAlias = queryElement.getAttribute("resultAlias");
+ String type = queryElement.getAttribute("type");
+ Integer position;
+ try {
+ position = queryElement.hasAttribute("position") ? Integer.parseInt(queryElement.getAttribute("position")) : null;
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid 'position' attribute for query objectName=" + objectName +
+ ", attributes=" + attributes + ", resultAlias=" + resultAlias);
+
+ }
+ Integer collectInterval = intAttributeOrNull(queryElement, COLLECT_INTERVAL_NAME);
+
+ configuration.withDiscoveryQuery(objectName, attributes, key, position, type, resultAlias, collectInterval);
+ }
+ }
private List getAttributes(Element queryElement, String objectName) {
String attribute = queryElement.getAttribute("attribute");
diff --git a/src/main/java/org/jmxtrans/agent/JmxTransExporterConfiguration.java b/src/main/java/org/jmxtrans/agent/JmxTransExporterConfiguration.java
index 30eedd10..7f3bafb4 100644
--- a/src/main/java/org/jmxtrans/agent/JmxTransExporterConfiguration.java
+++ b/src/main/java/org/jmxtrans/agent/JmxTransExporterConfiguration.java
@@ -148,4 +148,13 @@ public void destroy() {
getOutputWriter().preDestroy();
}
+ public JmxTransExporterConfiguration withDiscoveryQuery(@Nonnull String objectName,
+ @Nonnull List attributes, @Nullable String key, @Nullable Integer position, @Nullable String type,
+ @Nullable String resultAlias, @Nullable Integer collectInterval) {
+ Query query = new DiscoveryQuery(objectName, attributes, key, position, type, resultAlias, this.resultNameStrategy,
+ collectInterval);
+ queries.add(query);
+ return this;
+ }
+
}
diff --git a/src/main/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriter.java b/src/main/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriter.java
index 80b6d8fb..90245a27 100644
--- a/src/main/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriter.java
+++ b/src/main/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriter.java
@@ -1,311 +1,311 @@
-/*
- * Copyright (c) 2010-2013 the original author or authors
- *
- * Permission is hereby granted, free of charge, to any person obtaining
- * a copy of this software and associated documentation files (the
- * "Software"), to deal in the Software without restriction, including
- * without limitation the rights to use, copy, modify, merge, publish,
- * distribute, sublicense, and/or sell copies of the Software, and to
- * permit persons to whom the Software is furnished to do so, subject to
- * the following conditions:
- *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- *
- */
-package org.jmxtrans.agent.zabbix;
-
-import static org.jmxtrans.agent.util.ConfigurationUtils.getInt;
-import static org.jmxtrans.agent.util.ConfigurationUtils.getString;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import org.jmxtrans.agent.AbstractOutputWriter;
-import org.jmxtrans.agent.OutputWriter;
-import org.jmxtrans.agent.util.io.IoUtils;
-import org.jmxtrans.agent.util.net.HostAndPort;
-
-/**
- * @author Steve McDuff
- */
-public class ZabbixTcpOutputWriter extends AbstractOutputWriter implements OutputWriter
-{
-
- private static final int ZABBIX_HEADER_LENGTH = 13;
- public final static String SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS = "socket.connectTimeoutInMillis";
- public final static int SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE = 500;
-
- protected HostAndPort zabbixServerHostAndPort;
- private Socket socket;
- private ByteArrayOutputStream out = new ByteArrayOutputStream();
- private byte[] readBuffer = new byte[10000];
- private int socketConnectTimeoutInMillis = SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE;
- private ZabbixMetricMessageBuilder messageBuilder;
-
- private int printMetricCount = 0;
- private int metricBatchSize;
-
- private boolean failedConnection = false;
-
- @Override
- public void postConstruct(Map settings)
- {
- super.postConstruct(settings);
-
- zabbixServerHostAndPort = new HostAndPort(getString(settings, ZabbixOutputWriterCommonSettings.SETTING_HOST),
- getInt(settings, ZabbixOutputWriterCommonSettings.SETTING_PORT,
- ZabbixOutputWriterCommonSettings.SETTING_PORT_DEFAULT_VALUE));
- messageBuilder = new ZabbixMetricMessageBuilder(
- ZabbixOutputWriterCommonSettings.getConfiguredHostName(settings));
- socketConnectTimeoutInMillis = getInt(settings, SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS,
- SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE);
-
- metricBatchSize = ZabbixOutputWriterCommonSettings.getMetricBatchSize(settings);
-
- logger.log(getInfoLevel(),
- "ZabbixTcpOutputWriter is configured with " + zabbixServerHostAndPort + ", serverName=" +
- messageBuilder.getHostName() + ", socketConnectTimeoutInMillis=" + socketConnectTimeoutInMillis +
- ", mmetricBatchSize=" + metricBatchSize);
- }
-
- @Override
- public void writeInvocationResult(@Nonnull String invocationName, @Nullable Object value) throws IOException
- {
- writeQueryResult(invocationName, null, value);
- }
-
- private byte[] messageHeader = "{\"request\":\"sender data\",\"data\":[".getBytes(StandardCharsets.UTF_8);
- private byte[] comma = ",".getBytes(StandardCharsets.UTF_8);
- private byte[] messageFooter = "]}".getBytes(StandardCharsets.UTF_8);
-
-
- @Override
- public void writeQueryResult(@Nonnull String metricName, @Nullable String type, @Nullable Object value)
- throws IOException
- {
- String msg = messageBuilder.buildMessage(metricName, value,
- TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
-
- try
- {
- if (logger.isLoggable(getTraceLevel()))
- {
- logger.log(getTraceLevel(), "Send '" + msg + "' to " + zabbixServerHostAndPort);
- }
-
- if (printMetricCount == 0)
- {
- // print the header
- writeMessage(messageHeader);
- }
- else
- {
- writeMessage(comma);
- }
-
- printMetricCount += 1;
- writeMessage(msg);
-
- if (printMetricCount >= metricBatchSize)
- {
- if (logger.isLoggable(Level.FINE))
- {
- logger.fine(
- "Reached batch size maximum of " + metricBatchSize + " .Forcing message output to Zabbix.");
- }
- postCollect();
- }
-
- }
- catch (IOException e)
- {
- logger.log(Level.WARNING,
- "Exception sending '" + msg + "' of size : " + out.size() + "bytes to " + zabbixServerHostAndPort, e);
-
- releaseZabbixConnection();
- throw e;
- }
- }
-
- private void writeMessage(String msg) throws IOException
- {
- if (logger.isLoggable(Level.FINEST))
- {
- logger.finest("Print : " + msg);
- }
-
- byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
- writeMessage(bytes);
- }
-
- protected void writeMessage(byte[] data) throws IOException
- {
- out.write(data);
- }
-
- private void writeZabbixHeader(int length) throws IOException
- {
- byte[] zabbixHeader = new byte[]
- { 'Z', 'B', 'X', 'D', '\1', (byte) (length & 0xFF), (byte) ((length >> 8) & 0x00FF),
- (byte) ((length >> 16) & 0x0000FF), (byte) ((length >> 24) & 0x000000FF), '\0', '\0', '\0', '\0' };
-
- socket.getOutputStream().write(zabbixHeader);
- }
-
- private void releaseZabbixConnection()
- {
- IoUtils.closeQuietly(out);
- IoUtils.closeQuietly(socket);
- }
-
- private void ensureZabbixConnection() throws IOException
- {
- boolean socketIsValid;
- try
- {
- socketIsValid = socket != null && socket.isConnected() && socket.isBound() && !socket.isClosed() &&
- !socket.isInputShutdown() && !socket.isOutputShutdown();
- }
- catch (Exception e)
- {
- socketIsValid = false;
- }
- if (!socketIsValid)
- {
- long start = System.currentTimeMillis();
- try
- {
- socket = new Socket();
- socket.setKeepAlive(true);
- socket.connect(
- new InetSocketAddress(zabbixServerHostAndPort.getHost(), zabbixServerHostAndPort.getPort()),
- socketConnectTimeoutInMillis);
- }
- catch (IOException e)
- {
- ConnectException ce = new ConnectException("Exception connecting to " + zabbixServerHostAndPort);
- ce.initCause(e);
- throw ce;
- }
- long end = System.currentTimeMillis();
-
- if (logger.isLoggable(Level.FINE))
- {
- logger.fine("Connect time : " + (end - start));
- }
- }
- }
-
- @Override
- public void postCollect() throws IOException
- {
- if (printMetricCount == 0)
- {
- // nothing to print
- return;
- }
-
- try
- {
- writeMessage(messageFooter);
-
- byte[] byteArray = out.toByteArray();
- out.reset();
-
- if (logger.isLoggable(Level.FINEST))
- {
- String msg = new String(byteArray, StandardCharsets.UTF_8);
- logger.finest("message : " + msg);
- }
-
- ensureZabbixConnection();
-
- writeZabbixHeader(byteArray.length);
- socket.getOutputStream().write(byteArray);
-
- printMetricCount = 0;
-
- drainInputStream();
-
- // Zabbix seems to drop connections on every message.
- // Forcing a drop prevents timeout exceptions for subsequent messages.
- releaseZabbixConnection();
- failedConnection = false;
- }
- catch (IOException e)
- {
- // log subsequent failures at a lower log level.
- if( failedConnection ) {
- logger.log(Level.FINE, "Exception flushing the stream to " + zabbixServerHostAndPort, e);
- }
- else {
- logger.log(Level.WARNING, "Exception flushing the stream to " + zabbixServerHostAndPort, e);
- }
- releaseZabbixConnection();
- throw e;
- }
- }
-
- private void drainInputStream() throws IOException
- {
- try
- {
- int readSize = socket.getInputStream().read(readBuffer);
- if (logger.isLoggable(Level.FINE))
- {
-
- String message = extractZabbixResponseString(readSize, readBuffer);
- logger.log(Level.FINE, message);
- }
- }
- catch (Exception ex)
- {
- logger.log(Level.WARNING, "Failed to read from scoket " + zabbixServerHostAndPort, ex);
- }
- }
-
- private String extractZabbixResponseString(int readSize, byte[] buffer)
- {
- String msg = "";
- if (readSize > ZABBIX_HEADER_LENGTH)
- {
- // skip the zabbix header
- msg = new String(buffer, ZABBIX_HEADER_LENGTH, readSize - ZABBIX_HEADER_LENGTH, StandardCharsets.UTF_8);
- }
- return msg;
- }
-
- @Override
- public String toString()
- {
- return "ZabbixTcpOutputWriter{" + ", " + zabbixServerHostAndPort + ", serverName='" +
- messageBuilder.getHostName() + '\'' + '}';
- }
-
- @Override
- public void preDestroy()
- {
- super.preDestroy();
- releaseZabbixConnection();
- }
-
-}
+/*
+ * Copyright (c) 2010-2013 the original author or authors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ */
+package org.jmxtrans.agent.zabbix;
+
+import static org.jmxtrans.agent.util.ConfigurationUtils.getInt;
+import static org.jmxtrans.agent.util.ConfigurationUtils.getString;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.jmxtrans.agent.AbstractOutputWriter;
+import org.jmxtrans.agent.OutputWriter;
+import org.jmxtrans.agent.util.io.IoUtils;
+import org.jmxtrans.agent.util.net.HostAndPort;
+
+/**
+ * @author Steve McDuff
+ */
+public class ZabbixTcpOutputWriter extends AbstractOutputWriter implements OutputWriter
+{
+
+ private static final int ZABBIX_HEADER_LENGTH = 13;
+ public final static String SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS = "socket.connectTimeoutInMillis";
+ public final static int SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE = 500;
+
+ protected HostAndPort zabbixServerHostAndPort;
+ private Socket socket;
+ private ByteArrayOutputStream out = new ByteArrayOutputStream();
+ private byte[] readBuffer = new byte[10000];
+ private int socketConnectTimeoutInMillis = SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE;
+ private ZabbixMetricMessageBuilder messageBuilder;
+
+ private int printMetricCount = 0;
+ private int metricBatchSize;
+
+ private boolean failedConnection = false;
+
+ @Override
+ public void postConstruct(Map settings)
+ {
+ super.postConstruct(settings);
+
+ zabbixServerHostAndPort = new HostAndPort(getString(settings, ZabbixOutputWriterCommonSettings.SETTING_HOST),
+ getInt(settings, ZabbixOutputWriterCommonSettings.SETTING_PORT,
+ ZabbixOutputWriterCommonSettings.SETTING_PORT_DEFAULT_VALUE));
+ messageBuilder = new ZabbixMetricMessageBuilder(
+ ZabbixOutputWriterCommonSettings.getConfiguredHostName(settings));
+ socketConnectTimeoutInMillis = getInt(settings, SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS,
+ SETTING_SOCKET_CONNECT_TIMEOUT_IN_MILLIS_DEFAULT_VALUE);
+
+ metricBatchSize = ZabbixOutputWriterCommonSettings.getMetricBatchSize(settings);
+
+ logger.log(getInfoLevel(),
+ "ZabbixTcpOutputWriter is configured with " + zabbixServerHostAndPort + ", serverName=" +
+ messageBuilder.getHostName() + ", socketConnectTimeoutInMillis=" + socketConnectTimeoutInMillis +
+ ", metricBatchSize=" + metricBatchSize);
+ }
+
+ @Override
+ public void writeInvocationResult(@Nonnull String invocationName, @Nullable Object value) throws IOException
+ {
+ writeQueryResult(invocationName, null, value);
+ }
+
+ private byte[] messageHeader = "{\"request\":\"sender data\",\"data\":[".getBytes(StandardCharsets.UTF_8);
+ private byte[] comma = ",".getBytes(StandardCharsets.UTF_8);
+ private byte[] messageFooter = "]}".getBytes(StandardCharsets.UTF_8);
+
+
+ @Override
+ public void writeQueryResult(@Nonnull String metricName, @Nullable String type, @Nullable Object value)
+ throws IOException
+ {
+ String msg = messageBuilder.buildMessage(metricName, value,
+ TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+
+ try
+ {
+ if (logger.isLoggable(getTraceLevel()))
+ {
+ logger.log(getTraceLevel(), "Send '" + msg + "' to " + zabbixServerHostAndPort);
+ }
+
+ if (printMetricCount == 0)
+ {
+ // print the header
+ writeMessage(messageHeader);
+ }
+ else
+ {
+ writeMessage(comma);
+ }
+
+ printMetricCount += 1;
+ writeMessage(msg);
+
+ if (printMetricCount >= metricBatchSize)
+ {
+ if (logger.isLoggable(Level.FINE))
+ {
+ logger.fine(
+ "Reached batch size maximum of " + metricBatchSize + " .Forcing message output to Zabbix.");
+ }
+ postCollect();
+ }
+
+ }
+ catch (IOException e)
+ {
+ logger.log(Level.WARNING,
+ "Exception sending '" + msg + "' of size : " + out.size() + "bytes to " + zabbixServerHostAndPort, e);
+
+ releaseZabbixConnection();
+ throw e;
+ }
+ }
+
+ private void writeMessage(String msg) throws IOException
+ {
+ if (logger.isLoggable(Level.FINEST))
+ {
+ logger.finest("Print : " + msg);
+ }
+
+ byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
+ writeMessage(bytes);
+ }
+
+ protected void writeMessage(byte[] data) throws IOException
+ {
+ out.write(data);
+ }
+
+ private void writeZabbixHeader(int length) throws IOException
+ {
+ byte[] zabbixHeader = new byte[]
+ { 'Z', 'B', 'X', 'D', '\1', (byte) (length & 0xFF), (byte) ((length >> 8) & 0x00FF),
+ (byte) ((length >> 16) & 0x0000FF), (byte) ((length >> 24) & 0x000000FF), '\0', '\0', '\0', '\0' };
+
+ socket.getOutputStream().write(zabbixHeader);
+ }
+
+ private void releaseZabbixConnection()
+ {
+ IoUtils.closeQuietly(out);
+ IoUtils.closeQuietly(socket);
+ }
+
+ private void ensureZabbixConnection() throws IOException
+ {
+ boolean socketIsValid;
+ try
+ {
+ socketIsValid = socket != null && socket.isConnected() && socket.isBound() && !socket.isClosed() &&
+ !socket.isInputShutdown() && !socket.isOutputShutdown();
+ }
+ catch (Exception e)
+ {
+ socketIsValid = false;
+ }
+ if (!socketIsValid)
+ {
+ long start = System.currentTimeMillis();
+ try
+ {
+ socket = new Socket();
+ socket.setKeepAlive(true);
+ socket.connect(
+ new InetSocketAddress(zabbixServerHostAndPort.getHost(), zabbixServerHostAndPort.getPort()),
+ socketConnectTimeoutInMillis);
+ }
+ catch (IOException e)
+ {
+ ConnectException ce = new ConnectException("Exception connecting to " + zabbixServerHostAndPort);
+ ce.initCause(e);
+ throw ce;
+ }
+ long end = System.currentTimeMillis();
+
+ if (logger.isLoggable(Level.FINE))
+ {
+ logger.fine("Connect time : " + (end - start));
+ }
+ }
+ }
+
+ @Override
+ public void postCollect() throws IOException
+ {
+ if (printMetricCount == 0)
+ {
+ // nothing to print
+ return;
+ }
+
+ try
+ {
+ writeMessage(messageFooter);
+
+ byte[] byteArray = out.toByteArray();
+ out.reset();
+
+ if (logger.isLoggable(Level.FINEST))
+ {
+ String msg = new String(byteArray, StandardCharsets.UTF_8);
+ logger.finest("message : " + msg);
+ }
+
+ ensureZabbixConnection();
+
+ writeZabbixHeader(byteArray.length);
+ socket.getOutputStream().write(byteArray);
+
+ printMetricCount = 0;
+
+ drainInputStream();
+
+ // Zabbix seems to drop connections on every message.
+ // Forcing a drop prevents timeout exceptions for subsequent messages.
+ releaseZabbixConnection();
+ failedConnection = false;
+ }
+ catch (IOException e)
+ {
+ // log subsequent failures at a lower log level.
+ if( failedConnection ) {
+ logger.log(Level.FINE, "Exception flushing the stream to " + zabbixServerHostAndPort, e);
+ }
+ else {
+ logger.log(Level.WARNING, "Exception flushing the stream to " + zabbixServerHostAndPort, e);
+ }
+ releaseZabbixConnection();
+ throw e;
+ }
+ }
+
+ private void drainInputStream() throws IOException
+ {
+ try
+ {
+ int readSize = socket.getInputStream().read(readBuffer);
+ if (logger.isLoggable(Level.FINE))
+ {
+
+ String message = extractZabbixResponseString(readSize, readBuffer);
+ logger.log(Level.FINE, message);
+ }
+ }
+ catch (Exception ex)
+ {
+ logger.log(Level.WARNING, "Failed to read from scoket " + zabbixServerHostAndPort, ex);
+ }
+ }
+
+ private String extractZabbixResponseString(int readSize, byte[] buffer)
+ {
+ String msg = "";
+ if (readSize > ZABBIX_HEADER_LENGTH)
+ {
+ // skip the zabbix header
+ msg = new String(buffer, ZABBIX_HEADER_LENGTH, readSize - ZABBIX_HEADER_LENGTH, StandardCharsets.UTF_8);
+ }
+ return msg;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ZabbixTcpOutputWriter{" + ", " + zabbixServerHostAndPort + ", serverName='" +
+ messageBuilder.getHostName() + '\'' + '}';
+ }
+
+ @Override
+ public void preDestroy()
+ {
+ super.preDestroy();
+ releaseZabbixConnection();
+ }
+
+}
diff --git a/src/test/java/org/jmxtrans/agent/zabbix/DiscoveryQueryTest.java b/src/test/java/org/jmxtrans/agent/zabbix/DiscoveryQueryTest.java
new file mode 100644
index 00000000..8474f2b2
--- /dev/null
+++ b/src/test/java/org/jmxtrans/agent/zabbix/DiscoveryQueryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2010-2016 the original author or authors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ */
+package org.jmxtrans.agent.zabbix;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.management.MBeanServer;
+
+import org.jmxtrans.agent.DiscoveryQuery;
+import org.jmxtrans.agent.ResultNameStrategyImpl;
+import org.jmxtrans.agent.util.json.Json;
+import org.jmxtrans.agent.util.json.JsonArray;
+import org.jmxtrans.agent.util.json.JsonObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * DiscoveryQueryTest
+ *
+ * @author Steve McDuff
+ */
+public class DiscoveryQueryTest
+{
+ public DiscoveryQueryTest()
+ {
+ super();
+ }
+
+ /**
+ * Extract the Garbage Collector JMX data. Validate the format
+ * Of the Zabbix discovery output.
+ * @throws Exception
+ */
+ @Test
+ public void fetchDiscoveryQuery() throws Exception
+ {
+ TestOutputWriter output = new TestOutputWriter();
+ ResultNameStrategyImpl strategy = new ResultNameStrategyImpl();
+ List attributes = new ArrayList<>();
+ attributes.add("name");
+ attributes.add("type");
+
+ DiscoveryQuery query = new DiscoveryQuery("java.lang:type=GarbageCollector,name=*", attributes, null, null, null, "discovery[garbageCollector]", strategy, 200);
+
+
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+ query.collectAndExport(mbeanServer, output);
+
+ Assert.assertNotNull(output.lastValue);
+ Assert.assertEquals("discovery[garbageCollector]",output.lastMetricName);
+
+ JsonObject parse = (JsonObject) Json.parse((String)output.lastValue);
+ JsonArray data = (JsonArray) parse.get("data");
+ Assert.assertTrue("Must have at least 2 garbage collectors", data.size() >=2 );
+
+ JsonObject gc1 = (JsonObject) data.get(0);
+ String name1 = gc1.getString("{#NAME}", null);
+ Assert.assertNotNull(name1);
+ String type1 = gc1.getString("{#TYPE}", null);
+ Assert.assertEquals("GarbageCollector",type1);
+
+
+ JsonObject gc2 = (JsonObject) data.get(0);
+ String name2 = gc2.getString("{#NAME}", null);
+ Assert.assertNotNull(name2);
+ String type2 = gc1.getString("{#TYPE}", null);
+ Assert.assertEquals("GarbageCollector",type2);
+
+ }
+
+ @Test
+ public void testFormatValue() {
+
+ ResultNameStrategyImpl strategy = new ResultNameStrategyImpl();
+ List attributes = new ArrayList<>();
+ attributes.add("name");
+ attributes.add("type");
+
+ DiscoveryQuery query = new DiscoveryQuery("java.lang:type=GarbageCollector,name=*", attributes, null, null, null, "discovery[garbageCollector]", strategy, 200);
+
+ String formatDiscoveryValue = query.formatDiscoveryValue("\"Operation : public void my.package.MyClass(my.package.String value, my.other.Value other) throws somethign, and.another\"");
+
+ Assert.assertEquals("Operation___public_void_my_package_MyClass_my_package_String_value__my_other_Value_other__throws_somethign__and_another", formatDiscoveryValue);
+
+
+ }
+
+}
diff --git a/src/test/java/org/jmxtrans/agent/zabbix/TestOutputWriter.java b/src/test/java/org/jmxtrans/agent/zabbix/TestOutputWriter.java
new file mode 100644
index 00000000..023eee77
--- /dev/null
+++ b/src/test/java/org/jmxtrans/agent/zabbix/TestOutputWriter.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2010-2016 the original author or authors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ */
+package org.jmxtrans.agent.zabbix;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.jmxtrans.agent.OutputWriter;
+
+/**
+ * TestOutputWriter
+ *
+ * @author Steve McDuff
+ */
+
+public class TestOutputWriter implements OutputWriter
+{
+
+ public String lastMetricName;
+
+ public Object lastValue;
+
+ @Override
+ public void postConstruct(Map settings)
+ {
+
+ }
+
+ @Override
+ public void preDestroy()
+ {
+
+ }
+
+ @Override
+ public void preCollect() throws IOException
+ {
+
+ }
+
+ @Override
+ public void writeQueryResult(String metricName, String metricType, Object value) throws IOException
+ {
+ lastMetricName = metricName;
+ lastValue = value;
+ }
+
+ @Override
+ public void postCollect() throws IOException
+ {
+
+ }
+
+ @Override
+ public void writeInvocationResult(String invocationName, Object value) throws IOException
+ {
+
+ }
+
+}
diff --git a/src/test/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriterTest.java b/src/test/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriterTest.java
index e5e6c541..07259b25 100644
--- a/src/test/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriterTest.java
+++ b/src/test/java/org/jmxtrans/agent/zabbix/ZabbixTcpOutputWriterTest.java
@@ -49,12 +49,12 @@ public ZabbixTcpOutputWriterTest()
super();
}
-
@Rule
public TcpByteLineServer tcpByteServer = new TcpByteLineServer();
@Test
- public void reconnectsAfterServerClosesConnection() throws Exception {
+ public void reconnectsAfterServerClosesConnection() throws Exception
+ {
ZabbixTcpOutputWriter zabbixWriter = new ZabbixTcpOutputWriter();
Map config = new HashMap<>();
config.put(ZabbixOutputWriterCommonSettings.SETTING_HOST, "127.0.0.1");
@@ -68,55 +68,49 @@ public void reconnectsAfterServerClosesConnection() throws Exception {
// Write one metric and verify that it is received
writeTestMetric(zabbixWriter);
assertEventuallyReceived(tcpByteServer, hasSize(greaterThan(1)));
-
+
tcpByteServer.stop();
}
-
- public void testZabbixTrapper() throws Exception {
- ZabbixTcpOutputWriter zabbixWriter = new ZabbixTcpOutputWriter();
- Map config = new HashMap<>();
- config.put(ZabbixOutputWriterCommonSettings.SETTING_HOST, "myhost");
- config.put(ZabbixOutputWriterCommonSettings.SETTING_PORT, "10051");
- config.put(ZabbixOutputWriterCommonSettings.SETTING_SERVER_NAME, "jmxtransagenttest");
- zabbixWriter.postConstruct(config);
- // Write one metric to see it is received
-
- for( int i=0;i<100;++i) {
- writeTestMetric(zabbixWriter);
- Thread.sleep(10000);
- }
- }
+ public int value = 1;
- public int value=1;
-
- public void switchValue() {
- if( value < 10 ) {
+ public void switchValue()
+ {
+ if (value < 10)
+ {
value += 1;
}
- else {
+ else
+ {
value = 1;
}
}
- private void writeTestMetric(AbstractOutputWriter writer) {
+ private void writeTestMetric(AbstractOutputWriter writer)
+ {
switchValue();
- try {
- writer.writeQueryResult("jmxtransagentinputtest", null, value);
- writer.writeQueryResult("second", null, value+20);
-
- tcpByteServer.readResponse ="ZBXA100000000{\"result\":\"success\"}".getBytes(StandardCharsets.UTF_8);
+ try
+ {
+ writer.writeQueryResult("jmxtransagentinputtest", null, value);
+ writer.writeQueryResult("second", null, value + 20);
+
+ tcpByteServer.readResponse = "ZBXA100000000{\"result\":\"success\"}".getBytes(StandardCharsets.UTF_8);
writer.postCollect();
- } catch (Exception e) {
+ }
+ catch (Exception e)
+ {
e.printStackTrace();
}
}
public void assertEventuallyReceived(TcpByteLineServer server, Matcher> matcher)
- throws Exception {
- for (int i = 0; i < 100; i++) {
- if (matcher.matches(server.getReceivedLines())) {
+ throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ if (matcher.matches(server.getReceivedLines()))
+ {
return;
}
Thread.sleep(10);