Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
#339

日志归集优化,对于没有开启归集的日志文件不进行读取
  • Loading branch information
wenqiangzhu committed Jul 13, 2018
1 parent 14699c3 commit d20dc37
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.alibaba.fastjson.JSON;
Expand Down Expand Up @@ -76,6 +78,9 @@ public class LogAgent extends AgentFeatureComponent {
@SuppressWarnings("rawtypes")
private Map<String, Map> logCfgMapping = new ConcurrentHashMap<>();

// 存储刚刚开启归集的日志文件绝对路径,用于调整文件读取开始位置
private Set<String> newTailFiles = Collections.synchronizedSet(new HashSet<String>());

private int spantime = 100;

/**
Expand Down Expand Up @@ -728,6 +733,11 @@ public AppLogPatternInfoCollection getLatestLogProfileDataMap() {
return LatestLogProfileDataMap;
}

/**
 * Exposes the set of absolute paths of log files whose collection was just
 * switched on (entries are added in updateAllStrategy). Consumers use this
 * set to detect a freshly-enabled file and skip its read position to the
 * end before tailing, removing the entry once handled.
 *
 * NOTE(review): the underlying set is a synchronized wrapper, but a caller's
 * contains-then-remove sequence is not atomic across the two calls — confirm
 * a single consumer thread is assumed.
 *
 * @return the live (mutable) set of newly-enabled tail file paths
 */
public Set<String> getNewTailFileSet() {

return newTailFiles;
}

public AppLogPatternInfoCollection getIssueLogProfileDataMap() {

return IssueLogProfileDataMap;
Expand Down Expand Up @@ -807,6 +817,7 @@ private void updateAllStrategy(String stragetyJson) {
mapping.put("absPath", logPath);
logCfgMapping.put(lcfg.getUUID(), mapping);

newTailFiles.add(new File(logPath).getAbsolutePath());
LogFilterAndRule lfar = new DefaultLogFilterAndRule(filter, separator, JSON.parseObject(fields), 0, 0);
RuleFilterFactory.getInstance().pubLogFilterAndRule(logPath, lfar);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -401,6 +402,24 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) {
long inode = getInode(f);
removeInvalidTFInode(f, inode);
TailFile tf = tailFiles.get(inode);

/**
* 如果是刚刚开启日志归集那么跳到文件尾部进行归集
*/
Set<String> newTailFileSet = logagent.getNewTailFileSet();
if (newTailFileSet.contains(f.getAbsolutePath())) {
newTailFileSet.remove(f.getAbsolutePath());
if (tf != null && tf.getRaf() != null) {
tf.getRaf().seek(f.length());
}
else if (tf != null) {
tf.setPos(f.length());
}
else {
skipToEnd = true;
}
}

if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
long startPos = skipToEnd ? f.length() : 0;// 第一次读取从头开始读
// how to get line's number ?
Expand All @@ -414,7 +433,7 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) {
tf.setAppUrl(appurl);
}
else {
boolean updated = tf.getLastUpdated() < f.lastModified();
boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() < f.length();
if (updated) {
if (tf.getRaf() == null) {// 获取文件的读取手柄
tf = openFile(serverid, appid, logid, f, headers, inode, tf.getPos(), tf.getNum());
Expand All @@ -423,7 +442,7 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) {
if (f.length() < tf.getPos()) { // 文件的长度小于上次读取的指针说明文件内容被删除了,改成从0读取
logger.info(this, "Pos " + tf.getPos() + " is larger than file size! "
+ "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
tf.updatePos(tf.getPath(), inode, 0, 0);
tf.updatePos(tf.getPath(), inode, 0, tf.getNum());
}
}
tf.setNeedTail(updated);// 设置是否需要监控指标
Expand All @@ -442,12 +461,16 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) {
/**
* @param f
* @param inodeCurrent
* @throws IOException
*/
private void removeInvalidTFInode(File f, long inodeCurrent) {
private void removeInvalidTFInode(File f, long inodeCurrent) throws IOException {
for (Long inodeKey : tailFiles.keySet()) {
TailFile tf = tailFiles.get(inodeKey);
if (tf.getPath().equals(f.getAbsolutePath()) && inodeKey != inodeCurrent) {
tailFiles.remove(inodeKey);
if (tf.getRaf() != null) {
tf.getRaf().close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@
import com.creditease.agent.feature.LogAgent;
import com.creditease.agent.feature.logagent.api.LogFilterAndRule;
import com.creditease.agent.feature.logagent.event.Event;
import com.creditease.agent.feature.logagent.far.DefaultLogFilterAndRule;
import com.creditease.agent.feature.logagent.objects.LogDataElement;
import com.creditease.agent.feature.logagent.objects.LogDataFrame;
import com.creditease.agent.feature.logagent.objects.LogPatternInfo;
import com.creditease.agent.helpers.NetworkHelper;
import com.creditease.agent.helpers.StringHelper;
import com.creditease.agent.log.api.ISystemLogger;
import com.creditease.agent.monitor.api.MonitorDataFrame;
import com.creditease.agent.monitor.api.NotificationEvent;
Expand Down Expand Up @@ -374,6 +376,15 @@ public Map<TailFile, List<Map>> tailFileProcessSeprate(TailFile tf, boolean back
public void tailFileCommon(TailFile tf, boolean backoffWithoutNL, Map<TailFile, List<Map>> serverlogs)
throws IOException, InterruptedException {

LogFilterAndRule main = RuleFilterFactory.getInstance().getLogFilterAndRule(tf.getPath());

if (main instanceof DefaultLogFilterAndRule) {
DefaultLogFilterAndRule defaultMain = (DefaultLogFilterAndRule) main;
if (StringHelper.isEmpty(defaultMain.getFilterPattern().pattern())) {
log.info(this, tf.getPath() + " collection not enable!");
return;
}
}
long current = System.currentTimeMillis();
boolean isEvents = false;
// while (true) {
Expand All @@ -385,7 +396,6 @@ public void tailFileCommon(TailFile tf, boolean backoffWithoutNL, Map<TailFile,
}

try {
LogFilterAndRule main = RuleFilterFactory.getInstance().getLogFilterAndRule(tf.getPath());
List<LogFilterAndRule> aids = RuleFilterFactory.getInstance().getAidLogFilterAndRuleList(tf.getPath());
List<Map> datalog = RuleFilterFactory.getInstance().createChain(reader, batchSize)
.setMainLogFilterAndRule(main).setAidLogFilterAndRuleList(aids).doProcess(events, backoffWithoutNL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,21 @@ public int getVersion() {
return version;
}

/**
 * @return the compiled main filter pattern for this rule; an empty pattern
 *         string is treated elsewhere (see LogAgent tailFileCommon) as the
 *         marker that collection is not enabled for the file
 */
public Pattern getFilterPattern() {

return filterPattern;
}

/**
 * @param filterPattern
 *            the compiled main filter pattern to apply; a pattern compiled
 *            from an empty string marks collection as disabled for this rule
 */
public void setFilterPattern(Pattern filterPattern) {

this.filterPattern = filterPattern;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,16 @@ else if (logPatternInfo2.getFlag() == StateFlag.EXIST) {
/**
* @param f
* @param inodeCurrent
* @throws IOException
*/
private void removeInvalidTFInode(File f, long inodeCurrent) {
private void removeInvalidTFInode(File f, long inodeCurrent) throws IOException {
for (Long inodeKey : tailFiles.keySet()) {
TailFile tf = tailFiles.get(inodeKey);
if (tf.getPath().equals(f.getAbsolutePath()) && inodeKey != inodeCurrent) {
tailFiles.remove(inodeKey);
if (tf.getRaf() != null) {
tf.getRaf().close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ private void pushLogLineToBulkRequest(MonitorDataFrame mdf, String appid, String
*/
String logFileType = logFileName;
StringBuilder uuidStr = new StringBuilder();
uuidStr.append(ipport).append(mdf.getServerId()).append("-").append(appid).append("-").append(logid);
uuidStr.append(ipport).append(mdf.getServerId()).append("-").append(appid).append("-").append(logid)
.append("-").append(lnum);
if (line.containsKey("content")) {
logFileType += "_def";
uuidStr.append("-").append(line.get("content"));
Expand All @@ -215,7 +216,7 @@ private void pushLogLineToBulkRequest(MonitorDataFrame mdf, String appid, String
}

/**
* 保证不重复:IP+SvrID+AppID+LogFileName+日志内容(def下为content)
* 保证不重复:IP+SvrID+AppID+LogFileName+lineNum+日志内容(def下为content)
*/
String uuid = EncodeHelper.encodeMD5(uuidStr.toString());

Expand Down

0 comments on commit d20dc37

Please sign in to comment.