From d20dc379ff7353e7b33103efa8d8f4acc2dc882e Mon Sep 17 00:00:00 2001 From: wenqiangzhu Date: Fri, 13 Jul 2018 11:24:05 +0800 Subject: [PATCH] #339 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://github.com/uavorg/uavstack/issues/339 日志归集优化,对于没有开启归集的日志文件不进行读取 --- .../creditease/agent/feature/LogAgent.java | 11 +++++++ .../logagent/ReliableTaildirEventReader.java | 29 +++++++++++++++++-- .../feature/logagent/TaildirLogComponent.java | 12 +++++++- .../logagent/far/DefaultLogFilterAndRule.java | 17 +++++++++++ .../ReliableTaildirEventReader.java | 6 +++- .../handlers/NewLogDataMessageHandler.java | 5 ++-- 6 files changed, 73 insertions(+), 7 deletions(-) diff --git a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/LogAgent.java b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/LogAgent.java index 5405f0a7..e9ac29bf 100644 --- a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/LogAgent.java +++ b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/LogAgent.java @@ -28,11 +28,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import com.alibaba.fastjson.JSON; @@ -76,6 +78,9 @@ public class LogAgent extends AgentFeatureComponent { @SuppressWarnings("rawtypes") private Map logCfgMapping = new ConcurrentHashMap<>(); + // 存储刚刚开启归集的日志文件绝对路径,用于调整文件读取开始位置 + private Set newTailFiles = Collections.synchronizedSet(new HashSet()); + private int spantime = 100; /** @@ -728,6 +733,11 @@ public AppLogPatternInfoCollection getLatestLogProfileDataMap() { return LatestLogProfileDataMap; } + public Set getNewTailFileSet() { + + return newTailFiles; + } + public AppLogPatternInfoCollection getIssueLogProfileDataMap() { return IssueLogProfileDataMap; @@ -807,6 +817,7 @@ private void updateAllStrategy(String stragetyJson) { mapping.put("absPath", logPath); logCfgMapping.put(lcfg.getUUID(), mapping); + newTailFiles.add(new File(logPath).getAbsolutePath()); LogFilterAndRule lfar = new DefaultLogFilterAndRule(filter, separator, JSON.parseObject(fields), 0, 0); RuleFilterFactory.getInstance().pubLogFilterAndRule(logPath, lfar); } diff --git a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/ReliableTaildirEventReader.java b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/ReliableTaildirEventReader.java index 44987c64..ed6dbf45 100644 --- a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/ReliableTaildirEventReader.java +++ b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/ReliableTaildirEventReader.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -401,6 +402,24 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) { long inode = getInode(f); removeInvalidTFInode(f, inode); TailFile tf = tailFiles.get(inode); + + /** + * 如果是刚刚开启日志归集那么跳到文件尾部进行归集 + */ + Set newTailFileSet = logagent.getNewTailFileSet(); + if (newTailFileSet.contains(f.getAbsolutePath())) { + newTailFileSet.remove(f.getAbsolutePath()); + if (tf != null && tf.getRaf() != null) { + tf.getRaf().seek(f.length()); + } + else if (tf != null) { + tf.setPos(f.length()); + } + else { + skipToEnd = true; + } + } + if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { long startPos = skipToEnd ? f.length() : 0;// 第一次读取从头开始读 // how to get line's number ? @@ -414,7 +433,7 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) { tf.setAppUrl(appurl); } else { - boolean updated = tf.getLastUpdated() < f.lastModified(); + boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() < f.length(); if (updated) { if (tf.getRaf() == null) {// 获取文件的读取手柄 tf = openFile(serverid, appid, logid, f, headers, inode, tf.getPos(), tf.getNum()); @@ -423,7 +442,7 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) { if (f.length() < tf.getPos()) { // 文件的长度小于上次读取的指针说明文件内容被删除了,改成从0读取 logger.info(this, "Pos " + tf.getPos() + " is larger than file size! " + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode); - tf.updatePos(tf.getPath(), inode, 0, 0); + tf.updatePos(tf.getPath(), inode, 0, tf.getNum()); } } tf.setNeedTail(updated);// 设置是否需要监控指标 @@ -442,12 +461,16 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) { /** * @param f * @param inodeCurrent + * @throws IOException */ - private void removeInvalidTFInode(File f, long inodeCurrent) { + private void removeInvalidTFInode(File f, long inodeCurrent) throws IOException { for (Long inodeKey : tailFiles.keySet()) { TailFile tf = tailFiles.get(inodeKey); if (tf.getPath().equals(f.getAbsolutePath()) && inodeKey != inodeCurrent) { tailFiles.remove(inodeKey); + if (tf.getRaf() != null) { + tf.getRaf().close(); + } } } } diff --git a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/TaildirLogComponent.java b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/TaildirLogComponent.java index c81ba5e6..fc46433d 100644 --- a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/TaildirLogComponent.java +++ b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/TaildirLogComponent.java @@ -54,10 +54,12 @@ import com.creditease.agent.feature.LogAgent; import com.creditease.agent.feature.logagent.api.LogFilterAndRule; import com.creditease.agent.feature.logagent.event.Event; +import com.creditease.agent.feature.logagent.far.DefaultLogFilterAndRule; import com.creditease.agent.feature.logagent.objects.LogDataElement; import com.creditease.agent.feature.logagent.objects.LogDataFrame; import com.creditease.agent.feature.logagent.objects.LogPatternInfo; import com.creditease.agent.helpers.NetworkHelper; +import com.creditease.agent.helpers.StringHelper; import com.creditease.agent.log.api.ISystemLogger; import com.creditease.agent.monitor.api.MonitorDataFrame; import com.creditease.agent.monitor.api.NotificationEvent; @@ -374,6 +376,15 @@ public Map> tailFileProcessSeprate(TailFile tf, boolean back public void tailFileCommon(TailFile tf, boolean backoffWithoutNL, Map> serverlogs) throws IOException, InterruptedException { + LogFilterAndRule main = RuleFilterFactory.getInstance().getLogFilterAndRule(tf.getPath()); + + if (main instanceof DefaultLogFilterAndRule) { + DefaultLogFilterAndRule defaultMain = (DefaultLogFilterAndRule) main; + if (StringHelper.isEmpty(defaultMain.getFilterPattern().pattern())) { + log.info(this, tf.getPath() + " collection not enable!"); + return; + } + } long current = System.currentTimeMillis(); boolean isEvents = false; // while (true) { @@ -385,7 +396,6 @@ public void tailFileCommon(TailFile tf, boolean backoffWithoutNL, Map aids = RuleFilterFactory.getInstance().getAidLogFilterAndRuleList(tf.getPath()); List datalog = RuleFilterFactory.getInstance().createChain(reader, batchSize) .setMainLogFilterAndRule(main).setAidLogFilterAndRuleList(aids).doProcess(events, backoffWithoutNL); diff --git a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/far/DefaultLogFilterAndRule.java b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/far/DefaultLogFilterAndRule.java index 391d5ac1..d0d19fd4 100644 --- a/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/far/DefaultLogFilterAndRule.java +++ b/com.creditease.uav.agent/src/main/java/com/creditease/agent/feature/logagent/far/DefaultLogFilterAndRule.java @@ -184,4 +184,21 @@ public int getVersion() { return version; } + /** + * @return the filterPattern + */ + public Pattern getFilterPattern() { + + return filterPattern; + } + + /** + * @param filterPattern + * the filterPattern to set + */ + public void setFilterPattern(Pattern filterPattern) { + + this.filterPattern = filterPattern; + } + } diff --git a/com.creditease.uav.collect/src/main/java/com/creditease/uav/collect/client/copylogagent/ReliableTaildirEventReader.java b/com.creditease.uav.collect/src/main/java/com/creditease/uav/collect/client/copylogagent/ReliableTaildirEventReader.java index 252e6b7a..72fc41f5 100644 --- a/com.creditease.uav.collect/src/main/java/com/creditease/uav/collect/client/copylogagent/ReliableTaildirEventReader.java +++ b/com.creditease.uav.collect/src/main/java/com/creditease/uav/collect/client/copylogagent/ReliableTaildirEventReader.java @@ -219,12 +219,16 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) { /** * @param f * @param inodeCurrent + * @throws IOException */ - private void removeInvalidTFInode(File f, long inodeCurrent) { + private void removeInvalidTFInode(File f, long inodeCurrent) throws IOException { for (Long inodeKey : tailFiles.keySet()) { TailFile tf = tailFiles.get(inodeKey); if (tf.getPath().equals(f.getAbsolutePath()) && inodeKey != inodeCurrent) { tailFiles.remove(inodeKey); + if (tf.getRaf() != null) { + tf.getRaf().close(); + } } } } diff --git a/com.creditease.uav.invokechain/src/main/java/com/creditease/uav/healthmanager/newlog/handlers/NewLogDataMessageHandler.java b/com.creditease.uav.invokechain/src/main/java/com/creditease/uav/healthmanager/newlog/handlers/NewLogDataMessageHandler.java index 3a620b0f..4a6facd8 100644 --- a/com.creditease.uav.invokechain/src/main/java/com/creditease/uav/healthmanager/newlog/handlers/NewLogDataMessageHandler.java +++ b/com.creditease.uav.invokechain/src/main/java/com/creditease/uav/healthmanager/newlog/handlers/NewLogDataMessageHandler.java @@ -202,7 +202,8 @@ private void pushLogLineToBulkRequest(MonitorDataFrame mdf, String appid, String */ String logFileType = logFileName; StringBuilder uuidStr = new StringBuilder(); - uuidStr.append(ipport).append(mdf.getServerId()).append("-").append(appid).append("-").append(logid); + uuidStr.append(ipport).append(mdf.getServerId()).append("-").append(appid).append("-").append(logid) + .append("-").append(lnum); if (line.containsKey("content")) { logFileType += "_def"; uuidStr.append("-").append(line.get("content")); @@ -215,7 +216,7 @@ private void pushLogLineToBulkRequest(MonitorDataFrame mdf, String appid, String } /** - * 保证不重复:IP+SvrID+AppID+LogFileName+日志内容(def下为content) + * 保证不重复:IP+SvrID+AppID+LogFileName+lineNum+日志内容(def下为content) */ String uuid = EncodeHelper.encodeMD5(uuidStr.toString());