diff --git a/Makefile b/Makefile index ac3f8a9f9..c2b509c42 100644 --- a/Makefile +++ b/Makefile @@ -87,6 +87,6 @@ lint: .PHONY: mock mock: @go get github.com/golang/mock/gomock - @go install github.com/golang/mock/mockgen - @~/go/bin/mockgen -source=plugin/output/s3/s3.go -destination=plugin/output/s3/mock/s3.go - @~/go/bin/mockgen -source=plugin/output/postgres/postgres.go -destination=plugin/output/postgres/mock/postgres.go + @mockgen -source=plugin/output/s3/structs.go -destination=plugin/output/s3/mock/s3.go + @mockgen -source=plugin/output/postgres/postgres.go -destination=plugin/output/postgres/mock/postgres.go + @mockgen -source=pipeline/plugin.go -destination=pipeline/mocks/pipeline_plugin_mock/plugin_mock.go -package=pipeline_plugin_mock diff --git a/offset/json_offset.go b/offset/json_offset.go new file mode 100644 index 000000000..6abbd44a6 --- /dev/null +++ b/offset/json_offset.go @@ -0,0 +1,53 @@ +package offset + +import ( + "bytes" + "encoding/json" + "io" +) + +type jsonValue struct { + value interface{} +} + +func (o *jsonValue) Load(r io.Reader) error { + b, err := io.ReadAll(r) + if err != nil { + return err + } + + d := json.NewDecoder(bytes.NewReader(b)) + d.UseNumber() + + return d.Decode(o.value) +} + +func (o *jsonValue) Save(w io.Writer) error { + b, err := json.Marshal(o.value) + if err != nil { + return err + } + _, err = w.Write(b) + if err != nil { + return err + } + _, err = w.Write([]byte("\n")) + if err != nil { + return err + } + return nil +} + +func newJSONOffset(path string, value interface{}) *Offset { + res := NewOffset(path) + res.Callback = &jsonValue{value} + return res +} + +func SaveJSON(path string, value interface{}) error { + return newJSONOffset(path, value).Save() +} + +func LoadJSON(path string, value interface{}) error { + return newJSONOffset(path, value).Load() +} diff --git a/offset/offset_test.go b/offset/offset_test.go index 283187348..8e3b19494 100644 --- a/offset/offset_test.go +++ b/offset/offset_test.go @@ -43,7 +43,7 @@ func TestYAML(t *testing.T) { assert.Equal(t, offset, loaded) } -func TestSaveLoad(t *testing.T) { +func TestSaveLoadYaml(t *testing.T) { path := getTmpPath(t, "offset.yaml") offset := testOffset{} offset.set("some_name", 123) @@ -58,6 +58,21 @@ func TestSaveLoad(t *testing.T) { assert.Equal(t, offset, loaded) } +func TestSaveLoadJson(t *testing.T) { + path := getTmpPath(t, "offset.json") + offset := testOffset{} + offset.set("some_name", 12345) + + err := SaveJSON(path, &offset) + assert.NoError(t, err) + + loaded := testOffset{} + err = LoadYAML(path, &loaded) + assert.NoError(t, err) + + assert.Equal(t, offset, loaded) +} + func TestAppendFile(t *testing.T) { path := getTmpPath(t, "offset.yaml") for i := 1; i < 5; i++ { diff --git a/offset/simple_offset.go b/offset/yaml_offset.go similarity index 100% rename from offset/simple_offset.go rename to offset/yaml_offset.go diff --git a/plugin/output/file/file.go b/plugin/output/file/file.go index 336fbac1f..537cec56c 100644 --- a/plugin/output/file/file.go +++ b/plugin/output/file/file.go @@ -14,7 +14,6 @@ import ( "github.com/ozontech/file.d/logger" "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/pipeline" - "go.uber.org/zap" "golang.org/x/net/context" ) @@ -41,9 +40,13 @@ type Plugin struct { fileName string tsFileName string - SealUpCallback func(string) + SealUpCallback func(string, string) + FileMetaCallback func(filename string) (fileOuterDestination, ext string) mu *sync.RWMutex + + metaWriter *meta + pairOfTimestamps *pair } type data 
struct { @@ -90,6 +93,26 @@ type Config struct { //> File mode for log files FileMode cfg.Base8 `json:"file_mode" default:"0666" parse:"base8"` //* FileMode_ int64 + + //> MetaCfg describes the meta file that stores the sealed file description + MetaCfg MetaConfig `json:"meta_config" child:"true"` //* +} + +type MetaConfig struct { + //> Stores metadata of sealed files. + EnableMetaFiles bool `json:"enable_meta_files"` //* + //> MetaDataDir is the directory where meta information about sealed files is stored. + MetaDataDir string `json:"metadata_dir" default:"/var/log/file-d.meta"` //* + //> SealedMetaPrefix defines the prefix of a sealed meta file + SealedMetaPrefix string `json:"sealed_metafile_prefix" default:"sealed_meta_"` //* + //> TimestampField is the event field that contains a valid timestamp. + TimestampField string `json:"timestamp_field" default:"timestamp"` //* + //> StaticMeta is the static part of the JSON meta. + StaticMeta string `json:"static_meta" default:""` //* + //> SealedFileNameField stores the name of the sealed file + SealedFileNameField string `json:"sealed_filename_field" default:"sealed_name"` //* + //> SealedFilePathFieldName stores the name of the sealed file with its path + SealedFilePathFieldName string `json:"sealed_filepath_name_field" default:"sealed_path"` //* } func init() { @@ -114,29 +137,85 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.fileName = file[0 : len(file)-len(p.fileExtension)] p.tsFileName = "%s" + "-" + p.fileName - p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{ - PipelineName: params.PipelineName, - OutputType: outPluginType, - OutFn: p.out, - Controller: p.controller, - Workers: p.config.WorkersCount_, - BatchSizeCount: p.config.BatchSize_, - BatchSizeBytes: p.config.BatchSizeBytes_, - FlushTimeout: p.config.BatchFlushTimeout_, - }) - - p.mu = &sync.RWMutex{} ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel + if p.config.MetaCfg.EnableMetaFiles { + metaWriter, err := newMeta( + metaInit{ + config: p.config, + fileName: p.fileName, + separator: fileNameSeparator, + filePrefix: metaFilePrefix, + }, + ) + if err != nil { + params.Logger.Fatalf("can't create meta: %s", err.Error()) + } + + p.metaWriter = metaWriter + + p.pairOfTimestamps = NewPair() + } + + p.batcher = pipeline.NewBatcher( + pipeline.BatcherOptions{ + PipelineName: params.PipelineName, + OutputType: outPluginType, + OutFn: p.out, + MaintenanceFn: p.maintenance, + Controller: p.controller, + Workers: p.config.WorkersCount_, + BatchSizeCount: p.config.BatchSize_, + BatchSizeBytes: p.config.BatchSizeBytes_, + FlushTimeout: p.config.BatchFlushTimeout_, + MaintenanceInterval: pipeline.DefaultMaintenanceInterval, + }, + /* params.PipelineName, + outPluginType, + p.out, + nil, + p.controller, + p.config.WorkersCount_, + p.config.BatchSize_, + p.config.BatchFlushTimeout_, + 0, */ + ) + + p.mu = &sync.RWMutex{} + if err := os.MkdirAll(p.targetDir, os.ModePerm); err != nil { p.logger.Fatalf("could not create target dir: %s, error: %s", p.targetDir, err.Error()) } p.idx = p.getStartIdx() - p.createNew() + if err := p.startUpCreateNewFile(); err != nil { + p.logger.Panic(err) + } p.setNextSealUpTime() + if p.config.MetaCfg.EnableMetaFiles { + longpanic.Go(func() { + for { + select { + case <-ctx.Done(): + return + //case <-time.After(p.config.RetentionInterval_ / 10): + case <-time.After(time.Second * 2): + p.mu.RLock() + min, max := p.pairOfTimestamps.Get() + p.mu.RUnlock() + if min == 0 && max == 0 { + continue + } + if err := p.metaWriter.updateMetaFileWithLock(min, max); err != nil { + p.logger.Errorf("can't update meta: %s",
err.Error()) + } + } + } + }) + } + if p.file == nil { p.logger.Panic("file struct is nil!") } @@ -150,6 +229,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.batcher.Start(ctx) } +func (p *Plugin) maintenance(workerData *pipeline.WorkerData) {} + func (p *Plugin) Stop() { // we MUST NOT close file, through p.file.Close(), fileSealUpTicker already do this duty. p.batcher.Stop() @@ -175,12 +256,43 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) { outBuf := data.outBuf[:0] + minTs, maxTs := int64(0), int64(0) for _, event := range batch.Events { outBuf, _ = event.Encode(outBuf) outBuf = append(outBuf, byte('\n')) + + if p.config.MetaCfg.EnableMetaFiles { + ts, err := event.Root.DigStrict(p.config.MetaCfg.TimestampField) + if err != nil { + p.logger.Errorf("doesn't have timestamp field: %s", err.Error()) + continue + } + tsInt, err := ts.AsInt64() + if err != nil { + p.logger.Errorf("time field isn't int64: %s", err.Error()) + continue + } + if tsInt < minTs || minTs == 0 { + minTs = tsInt + } + if tsInt > maxTs { + maxTs = tsInt + } + } } data.outBuf = outBuf + if p.config.MetaCfg.EnableMetaFiles { + // lock prevents values leaking from new log file to old meta file. + // it's possible without lock: + // 1. log file sealed + // 2. new batch processed and pair updated + // 3. meta file sealed + p.mu.Lock() + p.pairOfTimestamps.UpdatePair(minTs, maxTs) + p.mu.Unlock() + } + p.write(outBuf) } @@ -215,24 +327,56 @@ func (p *Plugin) write(data []byte) { } } -func (p *Plugin) createNew() { - p.tsFileName = fmt.Sprintf("%d%s%s%s", time.Now().Unix(), fileNameSeparator, p.fileName, p.fileExtension) +// startUpCreateNewFile creates of restores log and meta files during startUp. +func (p *Plugin) startUpCreateNewFile() error { + return p.createNew("", "") +} + +// createNew creates new or appends to existing log file +func (p *Plugin) createNew(sealingLogFile, sealingOuterPath string) error { + timestamp := time.Now().Unix() + // file name like: 1654785468_file-d.log + p.tsFileName = fmt.Sprintf("%d%s%s%s", timestamp, fileNameSeparator, p.fileName, p.fileExtension) logger.Infof("tsFileName in createNew=%s", p.tsFileName) - f := fmt.Sprintf("%s%s", p.targetDir, p.tsFileName) + // full path like: /var/log/1654785468_file-d.log + fileName := fmt.Sprintf("%s%s", p.targetDir, p.tsFileName) + // search for old unsealed file pattern := fmt.Sprintf("%s*%s%s%s", p.targetDir, fileNameSeparator, p.fileName, p.fileExtension) matches, err := filepath.Glob(pattern) if err != nil { p.logger.Fatalf("can't glob: pattern=%s, err=%ss", pattern, err.Error()) } + // existense of file means crash of prev run before it was sealed. Pull up old file instead of new if len(matches) == 1 { p.tsFileName = path.Base(matches[0]) - f = fmt.Sprintf("%s%s", p.targetDir, p.tsFileName) + fileName = fmt.Sprintf("%s%s", p.targetDir, p.tsFileName) } - file, err := os.OpenFile(f, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.FileMode(p.config.FileMode_)) + file, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.FileMode(p.config.FileMode_)) if err != nil { - p.logger.Panicf("could not open or create file: %s, error: %s", f, err.Error()) + return fmt.Errorf("could not open or create file: %s, error: %s", fileName, err.Error()) } p.file = file + + // new meta file + if p.config.MetaCfg.EnableMetaFiles { + // createNew() guarded by mutex on upper layer. 
+ f, l := p.pairOfTimestamps.Reset() + if err := p.metaWriter.newMetaFile( + p.fileName, + sealUpDTO{ + sealingOuterPath: sealingOuterPath, + sealingLogFile: sealingLogFile, + firstTimestamp: f, + lastTimestamp: l, + }, + timestamp, + p.pairOfTimestamps, + ); err != nil { + return err + } + } + + return nil } // sealUp manages current file: renames, closes, and creates new. @@ -246,19 +390,33 @@ func (p *Plugin) sealUp() { } // newFileName will be like: ".var/log/log_1_01-02-2009_15:04.log - newFileName := filepath.Join(p.targetDir, fmt.Sprintf("%s%s%d%s%s%s", p.fileName, fileNameSeparator, p.idx, fileNameSeparator, time.Now().Format(p.config.Layout), p.fileExtension)) - p.rename(newFileName) + now := time.Now().Format(p.config.Layout) + finalOldFileName := filepath.Join(p.targetDir, fmt.Sprintf("%s%s%d%s%s%s", p.fileName, fileNameSeparator, p.idx, fileNameSeparator, now, p.fileExtension)) + p.rename(finalOldFileName) oldFile := p.file p.mu.Lock() - p.createNew() + + outerPath, ext := "", "" + if p.FileMetaCallback != nil { + outerPath, ext = p.FileMetaCallback(filepath.Base(finalOldFileName)) + } + + if err := p.createNew(fmt.Sprintf("%s%s", filepath.Base(finalOldFileName), ext), outerPath); err != nil { + p.logger.Panic(err) + } + p.nextSealUpTime = time.Now().Add(p.config.RetentionInterval_) p.mu.Unlock() if err := oldFile.Close(); err != nil { p.logger.Panicf("could not close file: %s, error: %s", oldFile.Name(), err.Error()) } - logger.Errorf("sealing in %d, newFile: %s", time.Now().Unix(), newFileName) + + logger.Errorf("sealing in %d, newFile: %s", time.Now().Unix(), finalOldFileName) if p.SealUpCallback != nil { - longpanic.Go(func() { p.SealUpCallback(newFileName) }) + if outerPath != "" { + outerPath = filepath.Base(outerPath) + } + longpanic.Go(func() { p.SealUpCallback(finalOldFileName, outerPath) }) } } diff --git a/plugin/output/file/file_test.go b/plugin/output/file/file_test.go index 4dd73b2b1..9a7fdd87c 100644 --- a/plugin/output/file/file_test.go +++ b/plugin/output/file/file_test.go @@ -1,7 +1,9 @@ package file import ( + "encoding/json" "fmt" + "io/ioutil" "os" "path" "path/filepath" @@ -20,6 +22,7 @@ import ( const ( targetFile = "filetests/log.log" targetFileThreshold = "filetests/%d%slog.log" + metaFileDir = "filetests/meta" ) var ( @@ -138,6 +141,122 @@ func TestSealUpHasContent(t *testing.T) { } } +func TestSealUpHasContentMetaOn(t *testing.T) { + cfg := Config{ + TargetFile: targetFile, + RetentionInterval_: 200 * time.Millisecond, + Layout: "01", + FileMode_: 0o666, + MetaCfg: MetaConfig{ + EnableMetaFiles: true, + MetaDataDir: metaFileDir, + SealedMetaPrefix: "sealed_meta_", + SealedFilePathFieldName: "sealed_file_path", + SealedFileNameField: "sealed_file_name", + }, + } + + dir, file := filepath.Split(cfg.TargetFile) + + extension := filepath.Ext(file) + test.ClearDir(t, dir) + createDir(t, dir) + defer test.ClearDir(t, dir) + + d := []byte("some data") + testFileName := fmt.Sprintf(targetFileThreshold, time.Now().Unix(), fileNameSeparator) + f := createFile(t, testFileName, &d) + defer f.Close() + infoInitial, _ := f.Stat() + assert.NotZero(t, infoInitial.Size()) + + pair := NewPair() + min := int64(10) + max := int64(9999) + pair.UpdatePair(max, min) + + p := Plugin{ + config: &cfg, + mu: &sync.RWMutex{}, + file: f, + targetDir: dir, + fileExtension: extension, + fileName: file[0 : len(file)-len(extension)], + tsFileName: path.Base(testFileName), + pairOfTimestamps: pair, + } + + objName := filepath.Join(metaFileDir, "heh12345.log.zip") + ext := ".zip" + 
p.FileMetaCallback = func(in string) (string, string) { + return objName, ext + } + + metaWriter, err := newMeta( + metaInit{ + config: &cfg, + fileName: p.fileName, + separator: fileNameSeparator, + filePrefix: metaFilePrefix, + }, + ) + require.NoError(t, err) + now := time.Now().Unix() + err = metaWriter.newMetaFile(metaWriter.fileName, sealUpDTO{}, now, pair) + require.NoError(t, err) + p.metaWriter = metaWriter + + // call func + p.sealUp() + + // check work result + pattern := fmt.Sprintf("%s/*%s", p.targetDir, p.fileExtension) + matches := test.GetMatches(t, pattern) + assert.Equal(t, 2, len(matches)) + + // check new file was created and it is empty + info, err := p.file.Stat() + assert.EqualValues(t, 0, info.Size()) + assert.NoError(t, err) + + // check old file was renamed. And renamed file is not empty and contains data + for _, v := range matches { + if v != fmt.Sprintf("%s%s", dir, p.tsFileName) { + info, err := os.Stat(v) + assert.NoError(t, err) + assert.EqualValues(t, len(d), info.Size()) + } + } + + // check that meta files exists + metaPattern := fmt.Sprintf("%s/*", p.config.MetaCfg.MetaDataDir) + metaMatches := test.GetMatches(t, metaPattern) + fmt.Println(metaPattern, metaMatches) + assert.Equal(t, 2, len(metaMatches)) + + sealedMeta, err := os.Open(metaMatches[1]) + require.NoError(t, err) + defer sealedMeta.Close() + fileBytes, _ := ioutil.ReadAll(sealedMeta) + dest := make(map[string]interface{}) + json.Unmarshal(fileBytes, &dest) + require.NoError(t, err) + + // check that sealed meta contains correct data + val, ok := dest["first_timestamp"] + assert.True(t, ok) + assert.Equal(t, float64(min), val) + val, ok = dest["last_timestamp"] + assert.True(t, ok) + assert.Equal(t, float64(max), val) + val, ok = dest[cfg.MetaCfg.SealedFileNameField] + assert.True(t, ok) + assert.Equal(t, filepath.Base(objName), val) + val, ok = dest[cfg.MetaCfg.SealedFilePathFieldName] + assert.True(t, ok) + assert.Equal(t, objName, val) +} + func TestSealUpNoContent(t *testing.T) { cfg := Config{ TargetFile: targetFile, diff --git a/plugin/output/file/files.go b/plugin/output/file/files.go index 42048f78e..58eb94a01 100644 --- a/plugin/output/file/files.go +++ b/plugin/output/file/files.go @@ -73,6 +73,7 @@ func (p *Plugins) Add(plugName string, plug Plugable) { } // Exists asks if such file.Plugin exists in Plugins. +// Concurrent safe. 
func (p *Plugins) Exists(plugName string) (exists bool) { return p.IsStatic(plugName) || p.IsDynamic(plugName) } diff --git a/plugin/output/file/meta.go b/plugin/output/file/meta.go new file mode 100644 index 000000000..b0e4c39fe --- /dev/null +++ b/plugin/output/file/meta.go @@ -0,0 +1,179 @@ +package file + +import ( + "encoding/json" + "fmt" + "os" + "path" + "path/filepath" + "sync" + + "github.com/ozontech/file.d/logger" + "github.com/ozontech/file.d/offset" +) + +const ( + metaFilePrefix = "meta_" + firstTimestampField = "first_timestamp" + lastTimestampField = "last_timestamp" +) + +type meta struct { + filePrefix, + fileName, + separator, + dir, + sealedFileFieldName, + sealedFilePathFieldName string + staticMeta map[string]interface{} + + fileMode int64 + mu sync.Mutex + + sealedFilePrefix string + tsFileName string + file *os.File +} + +type metaInit struct { + config *Config + fileName, + separator, + filePrefix string +} + +type sealUpDTO struct { + firstTimestamp, + lastTimestamp int64 + sealingLogFile, + sealingOuterPath string +} + +func newMeta(init metaInit) (*meta, error) { + staticMetaMap := make(map[string]interface{}) + metaString := init.config.MetaCfg.StaticMeta + if metaString != "" { + if err := json.Unmarshal([]byte(metaString), &staticMetaMap); err != nil { + return nil, err + } + } + + return &meta{ + fileName: init.fileName, + separator: init.separator, + dir: init.config.MetaCfg.MetaDataDir, + filePrefix: init.filePrefix, + sealedFilePrefix: init.config.MetaCfg.SealedMetaPrefix, + sealedFileFieldName: init.config.MetaCfg.SealedFileNameField, + sealedFilePathFieldName: init.config.MetaCfg.SealedFilePathFieldName, + staticMeta: staticMetaMap, + fileMode: init.config.FileMode_, + }, nil +} + +// sealUpCurrentMeta creates finalized meta for sealed file. 
+func (m *meta) sealUpCurrentMeta(sealupDTO sealUpDTO) error { + // ignore during startup + if m.tsFileName == "" { + return nil + } + sealedName := m.sealedFilePrefix + sealupDTO.sealingLogFile + + if err := m.updateMetaFile(sealupDTO.firstTimestamp, sealupDTO.lastTimestamp, sealupDTO.sealingOuterPath); err != nil { + return err + } + + currentMetaFileName := filepath.Join(m.dir, m.tsFileName) + if err := os.Rename(currentMetaFileName, filepath.Join(m.dir, sealedName)); err != nil { + return fmt.Errorf("can't rename metafile: %s", err.Error()) + } + + return nil +} + +// newMetaFile creates new or pulls up old file +func (m *meta) newMetaFile( + filename string, + sealUp sealUpDTO, + timestamp int64, + intPair *pair, +) error { + m.mu.Lock() + defer m.mu.Unlock() + + if err := m.sealUpCurrentMeta(sealUp); err != nil { + return err + } + + m.tsFileName = fmt.Sprintf("%s%d%s%s", m.filePrefix, timestamp, m.separator, filename) + pattern := filepath.Join(m.dir, m.filePrefix+"*"+m.fileName+".json") + matches, err := filepath.Glob(pattern) + if err != nil { + return fmt.Errorf("can't glob: pattern=%s, err=%ss", pattern, err.Error()) + } + // Reuse old file + if len(matches) == 1 { + m.tsFileName = path.Base(matches[0]) + + var result map[string]interface{} + err := offset.LoadJSON(filepath.Join(m.dir, m.tsFileName), &result) + if err != nil { + logger.Errorf("can't load json meta: %s", err.Error()) + } + + // update pair after fail + if f, ok := result[firstTimestampField]; ok { + if err := intPair.UpdatePairJsonNumber(f); err != nil { + logger.Errorf("candidate isn't json.Number: %s", err.Error()) + } + } + if f, ok := result[lastTimestampField]; ok { + if err := intPair.UpdatePairJsonNumber(f); err != nil { + logger.Errorf("candidate isn't json.Number: %s", err.Error()) + } + } + } else { + m.tsFileName += ".json" + } + metaName := filepath.Join(m.dir, m.tsFileName) + + _, err = os.Stat(m.dir) + if err != nil { + if os.IsNotExist(err) { + if err = os.MkdirAll(m.dir, os.ModePerm); err != nil { + return fmt.Errorf("can't create meta dir: %s", err.Error()) + } + } else { + return fmt.Errorf("can't check if meta dir exists: %s", err.Error()) + } + } + + metaFile, err := os.OpenFile(metaName, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm) + if err != nil { + return fmt.Errorf("could not open or create file: %s, error: %s", metaName, err.Error()) + } + m.file = metaFile + + return nil +} + +func (m *meta) updateMetaFile(firstTimestamp, lastTimestamp int64, sealUpOuterPath string) error { + values := make(map[string]interface{}) + for k, v := range m.staticMeta { + values[k] = v + } + values[firstTimestampField] = firstTimestamp + values[lastTimestampField] = lastTimestamp + values[m.sealedFileFieldName] = filepath.Base(sealUpOuterPath) + values[m.sealedFilePathFieldName] = sealUpOuterPath + + return offset.SaveJSON(filepath.Join(m.dir, m.tsFileName), values) +} + +// updateMetaFileWithLock keeps meta in actual state +func (m *meta) updateMetaFileWithLock(firstTimestamp, lastTimestamp int64) error { + m.mu.Lock() + defer m.mu.Unlock() + + return m.updateMetaFile(firstTimestamp, lastTimestamp, "") +} diff --git a/plugin/output/file/pair.go b/plugin/output/file/pair.go new file mode 100644 index 000000000..39a677acf --- /dev/null +++ b/plugin/output/file/pair.go @@ -0,0 +1,50 @@ +package file + +import "encoding/json" + +// pair represents pair of given ints +type pair struct { + min, max int64 +} + +// NewPair creates float pair +func NewPair() *pair { + return &pair{} +} + +// UpdatePair compares 
and replaces min and max with candidates. +func (p *pair) UpdatePair(candidates ...int64) { + for _, candidate := range candidates { + if p.min == 0 || candidate < p.min { + p.min = candidate + } + if candidate > p.max { + p.max = candidate + } + } +} + +// UpdatePairJsonNumber updates pair if value is valid json number. +func (p *pair) UpdatePairJsonNumber(candidate interface{}) error { + if fInt, ok := candidate.(json.Number); ok { + res, err := fInt.Int64() + if err != nil { + return err + } + p.UpdatePair(res) + } + + return nil +} + +// Reset returns min, max and resets +func (p *pair) Reset() (min int64, max int64) { + min, max = p.min, p.max + p.min, p.max = 0, 0 + return min, max +} + +// Get returns current pair values +func (p *pair) Get() (min int64, max int64) { + return p.min, p.max +} diff --git a/plugin/output/file/pair_test.go b/plugin/output/file/pair_test.go new file mode 100644 index 000000000..4de11ec1d --- /dev/null +++ b/plugin/output/file/pair_test.go @@ -0,0 +1,61 @@ +package file + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPairUpdate(t *testing.T) { + pair := NewPair() + + min := int64(1) + max := int64(949289429149812319) + candidates := []int64{ + max, + rand.Int63n(max), + rand.Int63n(max), + rand.Int63n(max), + min, + rand.Int63n(max), + rand.Int63n(max), + rand.Int63n(max), + } + + pair.UpdatePair(candidates...) + + require.Equal(t, min, pair.min) + require.Equal(t, max, pair.max) +} + +func TestPairGet(t *testing.T) { + pair := NewPair() + + min := int64(33) + max := int64(6666) + pair.min = min + pair.max = max + + getMin, getMax := pair.Get() + + require.Equal(t, min, getMin) + require.Equal(t, max, getMax) +} + +func TestPairReset(t *testing.T) { + pair := NewPair() + + min := int64(444) + max := int64(66666) + pair.min = min + pair.max = max + + getMin, getMax := pair.Reset() + + require.Equal(t, min, getMin) + require.Equal(t, max, getMax) + + require.Equal(t, int64(0), pair.min) + require.Equal(t, int64(0), pair.max) +} diff --git a/plugin/output/s3/mock/s3.go b/plugin/output/s3/mock/s3.go index f9a0497bf..1d20f9d71 100644 --- a/plugin/output/s3/mock/s3.go +++ b/plugin/output/s3/mock/s3.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: plugin/output/s3/s3.go +// Source: plugin/output/s3/structs.go // Package mock_s3 is a generated GoMock package. 
package mock_s3 diff --git a/plugin/output/s3/object_store_test.go b/plugin/output/s3/object_store_test.go new file mode 100644 index 000000000..dc9c10381 --- /dev/null +++ b/plugin/output/s3/object_store_test.go @@ -0,0 +1,25 @@ +package s3 + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestObjectStoreClientLimiterCanCreate(t *testing.T) { + t.Run("can", func(t *testing.T) { + limiter := NewObjectStoreClientLimiter(1) + require.True(t, limiter.CanCreate()) + }) + t.Run("can't", func(t *testing.T) { + limiter := NewObjectStoreClientLimiter(0) + require.False(t, limiter.CanCreate()) + }) +} + +func TestObjectStoreClientIncrement(t *testing.T) { + limiter := NewObjectStoreClientLimiter(1) + require.Equal(t, 0, limiter.created) + limiter.Increment() + require.Equal(t, 1, limiter.created) +} diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go index 558822ec5..a687a97f0 100644 --- a/plugin/output/s3/s3.go +++ b/plugin/output/s3/s3.go @@ -18,6 +18,7 @@ import ( "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" + "github.com/ozontech/file.d/offset" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/plugin/output/file" "go.uber.org/zap" @@ -136,6 +137,24 @@ type compressor interface { getName(fileName string) string } +type fileDTO struct { + fileName, + bucketName, + objectName string +} + +type singleBucketConfig struct { + // s3 section + Endpoint string `json:"endpoint" required:"true"` + AccessKey string `json:"access_key" required:"true"` + SecretKey string `json:"secret_key" required:"true"` + Bucket string `json:"bucket" required:"true"` + Secure bool `json:"secure" default:"false"` + FilePluginInfo file.Config `json:"file_plugin" required:"true"` +} + +type MultiBuckets []singleBucketConfig + type Plugin struct { controller pipeline.OutputPluginController logger *zap.SugaredLogger @@ -153,26 +172,9 @@ type Plugin struct { compressCh chan fileDTO uploadCh chan fileDTO - compressor compressor } -type fileDTO struct { - fileName string - bucketName string -} - -type singleBucketConfig struct { - // s3 section - Endpoint string `json:"endpoint" required:"true"` - AccessKey string `json:"access_key" required:"true"` - SecretKey string `json:"secret_key" required:"true"` - Bucket string `json:"bucket" required:"true"` - Secure bool `json:"secure" default:"false"` - FilePluginInfo file.Config `json:"file_plugin" required:"true"` -} -type MultiBuckets []singleBucketConfig - //! config-params //^ config-params type Config struct { @@ -185,7 +187,6 @@ type Config struct { CompressionType string `json:"compression_type" default:"zip" options:"zip"` //* // s3 section - //> @3@4@5@6 //> Endpoint address of default bucket. Endpoint string `json:"endpoint" required:"true"` //* @@ -216,20 +217,6 @@ type Config struct { DynamicBucketsLimit int `json:"dynamic_buckets_limit" default:"32"` //* } -func (c *Config) IsMultiBucketExists(bucketName string) bool { - if c.MultiBuckets == nil { - return false - } - - for _, bucket := range c.MultiBuckets { - if bucketName == bucket.Bucket { - return true - } - } - - return false -} - func init() { fd.DefaultPluginRegistry.RegisterOutput(&pipeline.PluginStaticInfo{ Type: outPluginType, @@ -241,8 +228,9 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { return &Plugin{}, &Config{} } +// Start initializes and runs plugin. This function calls by pipeline and have hardcoded dependencies. 
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) { - p.StartWithMinio(config, params, p.minioClientsFactory) + p.StartInner(config, params, p.minioClientsFactory) } func (p *Plugin) registerPluginMetrics() { @@ -253,7 +241,8 @@ func (p *Plugin) registerPluginMetrics() { }) } -func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.OutputPluginParams, factory objStoreFactory) { +// StartInner initializes and runs the plugin. Unlike `Start`, its dependencies are passed as arguments, which allows testing with mocks. +func (p *Plugin) StartInner(config pipeline.AnyConfig, params *pipeline.OutputPluginParams, objectStoreFactory objStoreFactory) { p.registerPluginMetrics() p.controller = params.Controller @@ -279,16 +268,16 @@ func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.Outp } // initialize minio clients. - defaultClient, clients, err := factory(p.config) + defaultClient, clients, err := objectStoreFactory(p.config) if err != nil { p.logger.Panicf("could not create minio client, error: %s", err.Error()) } p.defaultClient = defaultClient p.clients = clients - // dynamicDirs needs defaultClient set. + // dynamicDirs needs defaultClient set dynamicDirs := p.getDynamicDirsArtifacts(targetDirs) - // file for each bucket. + // file for each bucket fileNames := p.getFileNames(outPlugCount) p.uploadCh = make(chan fileDTO, p.config.FileConfig.WorkersCount_*4) @@ -309,10 +298,12 @@ func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.Outp p.uploadExistingFiles(targetDirs, dynamicDirs, fileNames) } +// Stop stops the underlying file plugins. func (p *Plugin) Stop() { p.outPlugins.Stop() } +// Out routes the event to the file plugin of the matching bucket. func (p *Plugin) Out(event *pipeline.Event) { p.outPlugins.Out(event, pipeline.PluginSelector{ CondType: pipeline.ByNameSelector, @@ -320,6 +311,20 @@ func (p *Plugin) Out(event *pipeline.Event) { }) } +// IsMultiBucketExists checks whether the bucket is declared in multi_buckets. +func (c *Config) IsMultiBucketExists(bucketName string) bool { + if c.MultiBuckets == nil { + return false + } + + for _, bucket := range c.MultiBuckets { + if bucketName == bucket.Bucket { + return true + } + } + return false +} + // getBucketName decides which s3 bucket shall receive event. func (p *Plugin) getBucketName(event *pipeline.Event) string { bucketName := event.Root.Dig(p.config.BucketEventField).AsString() @@ -376,14 +381,14 @@ func (p *Plugin) getDynamicDirsArtifacts(targetDirs map[string]string) map[strin // creates new dynamic plugin if such s3 bucket exists. func (p *Plugin) tryRunNewPlugin(bucketName string) (isCreated bool) { - // To avoid concurrent creation of bucketName plugin. + // to avoid concurrent creation of bucketName plugin p.dynamicPlugCreationMu.Lock() defer p.dynamicPlugCreationMu.Unlock() - // Probably other worker created plugin concurrently, need check dynamic bucket one more time. + // probably other worker created plugin concurrently, need check dynamic bucket one more time if p.outPlugins.IsDynamic(bucketName) { return true } - // If limit of dynamic buckets reached fallback to DefaultBucket. + // if limit of dynamic buckets reached fallback to DefaultBucket if !p.limiter.CanCreate() { p.logger.Warn( "Limit of %d dynamic buckets reached, can't create new.
Fallback to %s.", @@ -414,6 +419,9 @@ func (p *Plugin) tryRunNewPlugin(bucketName string) (isCreated bool) { anyPlugin, _ := file.Factory() outPlugin := anyPlugin.(*file.Plugin) outPlugin.SealUpCallback = p.addFileJobWithBucket(bucketName) + if p.config.FileConfig.MetaCfg.EnableMetaFiles { + outPlugin.FileMetaCallback = p.genObjInfo(bucketName) + } localBucketConfig := p.config.FileConfig localBucketConfig.TargetFile = fmt.Sprintf("%s%s%s", bucketDir, bucketName, p.fileExtension) @@ -435,18 +443,22 @@ func (p *Plugin) uploadExistingFiles(targetDirs, dynamicDirs, fileNames map[stri allDirs[k] = v } for bucketName, dir := range allDirs { - // get all compressed files. + // get all compressed files pattern := fmt.Sprintf("%s*%s", dir, p.compressor.getExtension()) compressedFiles, err := filepath.Glob(pattern) if err != nil { p.logger.Panicf("could not read dir: %s", dir) } - // sort compressed files by creation time. + // sort compressed files by creation time sort.Slice(compressedFiles, p.getSortFunc(compressedFiles)) - // upload archive. + // upload archive for _, z := range compressedFiles { - p.logger.Infof("uploaded file: %s, bucket: %s", z, bucketName) - p.uploadCh <- fileDTO{fileName: z, bucketName: bucketName} + objName, ok := p.restoreObjectName(z) + if !ok { + objName = p.generateObjectName(z) + } + + p.uploadCh <- fileDTO{fileName: z, bucketName: bucketName, objectName: objName} } // compress all files that we have in the dir p.compressFilesInDir(bucketName, targetDirs, fileNames) @@ -461,10 +473,14 @@ func (p *Plugin) compressFilesInDir(bucketName string, targetDirs, fileNames map if err != nil { p.logger.Panicf("could not read dir: %s", targetDirs[bucketName]) } - // sort files by creation time. + // sort files by creation time sort.Slice(files, p.getSortFunc(files)) for _, f := range files { - p.compressCh <- fileDTO{fileName: f, bucketName: bucketName} + objName, ok := p.restoreObjectName(fmt.Sprintf("%s%s", f, p.compressor.getExtension())) + if !ok { + objName = p.generateObjectName(f) + } + p.compressCh <- fileDTO{fileName: f, bucketName: bucketName, objectName: objName} } } @@ -483,9 +499,22 @@ func (p *Plugin) getSortFunc(files []string) func(i, j int) bool { } } -func (p *Plugin) addFileJobWithBucket(bucketName string) func(filename string) { - return func(filename string) { - p.compressCh <- fileDTO{fileName: filename, bucketName: bucketName} +// addFileJobWithBucket receives sealed log files +func (p *Plugin) addFileJobWithBucket(bucketName string) func(filename, objectName string) { + return func(filename, objectName string) { + if objectName == "" { + objectName = p.generateObjectName(filename) + } + p.compressCh <- fileDTO{fileName: filename, objectName: objectName, bucketName: bucketName} + } +} + +// genObjInfo returns new s3 object name, path and compressor extention +func (p *Plugin) genObjInfo(bucket string) func(name string) (objectPath, ext string) { + return func(name string) (objectPath, ext string) { + objectName := p.generateObjectName(name + p.compressor.getExtension()) + + return filepath.Join(p.config.Endpoint, bucket, objectName), p.compressor.getExtension() } } @@ -495,14 +524,16 @@ func (p *Plugin) uploadWork() { for { err := p.uploadToS3(compressed) if err == nil { - p.logger.Infof("successfully uploaded object: %s", compressed) + p.logger.Infof("successfully uploaded object: %v", compressed) // delete archive after uploading err = os.Remove(compressed.fileName) if err != nil { p.logger.Panicf("could not delete file: %s, err: %s", compressed, 
err.Error()) } + break } + sleepTime += sleepTime p.logger.Errorf("could not upload object: %s, next attempt in %s, error: %s", compressed, sleepTime.String(), err.Error()) time.Sleep(sleepTime) @@ -520,7 +551,8 @@ func (p *Plugin) compressWork() { p.logger.Panicf("could not delete file: %s, error: %s", dto, err.Error()) } dto.fileName = compressedName - p.uploadCh <- fileDTO{fileName: dto.fileName, bucketName: dto.bucketName} + + p.uploadCh <- fileDTO{fileName: dto.fileName, objectName: dto.objectName, bucketName: dto.bucketName} } } @@ -533,7 +565,8 @@ func (p *Plugin) uploadToS3(compressedDTO fileDTO) error { } _, err := cl.FPutObject( - compressedDTO.bucketName, p.generateObjectName(compressedDTO.fileName), + compressedDTO.bucketName, + compressedDTO.objectName, compressedDTO.fileName, p.compressor.getObjectOptions(), ) @@ -541,6 +574,7 @@ func (p *Plugin) uploadToS3(compressedDTO fileDTO) error { metric.GetCounter(subsystemName, sendErrorCounter).Inc() return fmt.Errorf("could not upload file: %s into bucket: %s, error: %s", compressedDTO.fileName, compressedDTO.bucketName, err.Error()) } + return nil } @@ -549,6 +583,30 @@ func (p *Plugin) generateObjectName(name string) string { n := strconv.FormatInt(r.Int63n(math.MaxInt64), 16) n = n[len(n)-8:] objectName := path.Base(name) + objectName = objectName[0 : len(objectName)-len(p.compressor.getExtension())] return fmt.Sprintf("%s.%s%s", objectName, n, p.compressor.getExtension()) } + +// restoreObjectName tries to restore the object name from a sealed meta file +func (p *Plugin) restoreObjectName(file string) (objectName string, success bool) { + if !p.config.FileConfig.MetaCfg.EnableMetaFiles { + return "", false + } + + var result map[string]interface{} + if err := offset.LoadJSON(filepath.Join(p.config.FileConfig.MetaCfg.MetaDataDir, fmt.Sprintf( + "%s%s", p.config.FileConfig.MetaCfg.SealedMetaPrefix, filepath.Base(file), + )), &result); err != nil { + p.logger.Errorf("can't restore s3 object name: %s", err.Error()) + return "", false + } + + if objectName, ok := result[p.config.FileConfig.MetaCfg.SealedFileNameField]; ok { + if strVal, success := objectName.(string); success { + return strVal, success + } + } + + return "", false +} diff --git a/plugin/output/s3/s3_internals.go b/plugin/output/s3/s3_internals.go index 6f8821c2a..91b330a9a 100644 --- a/plugin/output/s3/s3_internals.go +++ b/plugin/output/s3/s3_internals.go @@ -18,9 +18,10 @@ var ( type objStoreFactory func(cfg *Config) (ObjectStoreClient, map[string]ObjectStoreClient, error) +// minioClientsFactory creates s3 clients for the default bucket and multi buckets func (p *Plugin) minioClientsFactory(cfg *Config) (ObjectStoreClient, map[string]ObjectStoreClient, error) { minioClients := make(map[string]ObjectStoreClient) - // initialize minio clients object for main bucket. + // initialize minio clients object for main bucket defaultClient, err := minio.New(cfg.Endpoint, cfg.AccessKey, cfg.SecretKey, cfg.Secure) if err != nil { return nil, nil, err @@ -38,15 +39,16 @@ func (p *Plugin) minioClientsFactory(cfg *Config) (ObjectStoreClient, map[string return defaultClient, minioClients, nil } +// getStaticDirs returns the target dir for each static bucket func (p *Plugin) getStaticDirs(outPlugCount int) (map[string]string, error) { - // dir for all bucket files. + // dir for all bucket files dir, _ := filepath.Split(p.config.FileConfig.TargetFile) targetDirs := make(map[string]string, outPlugCount) targetDirs[p.config.DefaultBucket] = dir - // multi_buckets from config are sub dirs on in Config.FileConfig.TargetFile dir.
+ // multi_buckets from config are sub dirs in the Config.FileConfig.TargetFile dir for _, singleBucket := range p.config.MultiBuckets { - // todo bucket names can't intersect, add ability to have equal bucket names in different s3 servers. + // todo bucket names can't intersect, add ability to have equal bucket names in different s3 servers if _, ok := targetDirs[singleBucket.Bucket]; ok { return nil, fmt.Errorf("bucket name %s has duplicated", singleBucket.Bucket) } @@ -55,6 +57,7 @@ func (p *Plugin) getStaticDirs(outPlugCount int) (map[string]string, error) { return targetDirs, nil } +// getFileNames returns the file name for each bucket func (p *Plugin) getFileNames(outPlugCount int) map[string]string { fileNames := make(map[string]string, outPlugCount) @@ -69,7 +72,7 @@ func (p *Plugin) getFileNames(outPlugCount int) map[string]string { return fileNames } -// Try to create buckets from dirs lying in dynamic_dirs route +// Try to create buckets from dirs lying in dynamic_dirs route. func (p *Plugin) createPlugsFromDynamicBucketArtifacts(targetDirs map[string]string) { dynamicDirsPath := filepath.Join(targetDirs[p.config.DefaultBucket], DynamicBucketDir) dynamicDir, err := ioutil.ReadDir(dynamicDirsPath) @@ -78,12 +81,12 @@ func (p *Plugin) createPlugsFromDynamicBucketArtifacts(targetDirs map[string]str return } for _, f := range dynamicDir { - // dir name == bucket. Were interested only in dirs. + // dir name == bucket. We're interested only in dirs if !f.IsDir() { continue } - // If bucket was dynamic and now declared as static, we just ignore it. + // if bucket was dynamic and now declared as static, we just ignore it if p.config.IsMultiBucketExists(f.Name()) { continue } @@ -95,6 +98,7 @@ func (p *Plugin) createPlugsFromDynamicBucketArtifacts(targetDirs map[string]str } } +// createOutPlugin creates and returns a file plugin for the given bucket func (p *Plugin) createOutPlugin(bucketName string) (*file.Plugin, error) { exists, err := p.clients[bucketName].BucketExists(bucketName) if err != nil { @@ -107,10 +111,14 @@ func (p *Plugin) createOutPlugin(bucketName string) (*file.Plugin, error) { anyPlugin, _ := file.Factory() outPlugin := anyPlugin.(*file.Plugin) outPlugin.SealUpCallback = p.addFileJobWithBucket(bucketName) + if p.config.FileConfig.MetaCfg.EnableMetaFiles { + outPlugin.FileMetaCallback = p.genObjInfo(bucketName) + } return outPlugin, nil } +// startPlugins creates and runs all underlying file plugins func (p *Plugin) startPlugins(Params *pipeline.OutputPluginParams, outPlugCount int, targetDirs, fileNames map[string]string) error { outPlugins := make(map[string]file.Plugable, outPlugCount) outPlugin, err := p.createOutPlugin(p.config.DefaultBucket) @@ -120,7 +128,7 @@ func (p *Plugin) startPlugins(Params *pipeline.OutputPluginParams, outPlugCount outPlugins[p.config.DefaultBucket] = outPlugin p.logger.Infof("bucket %s exists", p.config.DefaultBucket) - // If multi_buckets described on file.d config, check each of them as well. + // if multi_buckets described on file.d config, check each of them as well for _, singleBucket := range p.config.MultiBuckets { outPlugin, err := p.createOutPlugin(singleBucket.Bucket) if err != nil { @@ -144,8 +152,8 @@ func (p *Plugin) startPlugins(Params *pipeline.OutputPluginParams, outPlugCount Params: Params, } } else { - // For multi_buckets copy main config and replace file path with bucket sub dir path.
- // Example /var/log/file.d.log => /var/log/static_bucket/bucketName/bucketName.log + // for multi_buckets copy main config and replace file path with bucket sub dir path + // example /var/log/file.d.log => /var/log/static_bucket/bucketName/bucketName.log localBucketConfig := p.config.FileConfig localBucketConfig.TargetFile = fmt.Sprintf("%s%s%s", targetDirs[bucketName], fileNames[bucketName], p.fileExtension) starterData = pipeline.PluginsStarterData{ diff --git a/plugin/output/s3/s3_test.go b/plugin/output/s3/s3_test.go index d6aae4773..2bef8515f 100644 --- a/plugin/output/s3/s3_test.go +++ b/plugin/output/s3/s3_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "go.uber.org/atomic" + "github.com/golang/mock/gomock" "github.com/minio/minio-go" "github.com/ozontech/file.d/cfg" @@ -21,7 +23,6 @@ import ( "github.com/ozontech/file.d/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" - "go.uber.org/atomic" "golang.org/x/net/context" ) @@ -42,7 +43,7 @@ type testS3Plugin struct { } func (p *testS3Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) { - p.StartWithMinio(config, params, p.objStoreF) + p.StartInner(config, params, p.objStoreF) } func fPutObjectOk(bucketName, objectName, filePath string, opts minio.PutObjectOptions) (n int64, err error) { diff --git a/plugin/output/s3/structs.go b/plugin/output/s3/structs.go new file mode 100644 index 000000000..3ed7f9723 --- /dev/null +++ b/plugin/output/s3/structs.go @@ -0,0 +1 @@ +package s3
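To make the new meta-file flow easier to follow, below is a small standalone Go sketch of the pattern this change introduces: per log file the plugin tracks a min/max timestamp pair, persists it as a single-line JSON meta document, and reads it back with a decoder in UseNumber mode so timestamps keep full int64 precision (this mirrors pair, meta.go and offset.SaveJSON/LoadJSON from the diff, but does not import file.d packages). The names timestampPair, saveMeta and loadMeta are illustrative only; the field names first_timestamp, last_timestamp and sealed_name are taken from meta.go and the plugin config defaults.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// timestampPair mirrors the plugin's pair type: it tracks the smallest and
// largest event timestamps seen in the current log file.
type timestampPair struct{ min, max int64 }

func (p *timestampPair) update(candidates ...int64) {
	for _, c := range candidates {
		if p.min == 0 || c < p.min {
			p.min = c
		}
		if c > p.max {
			p.max = c
		}
	}
}

// saveMeta writes the meta document as a single JSON object plus a trailing
// newline, the same layout offset.SaveJSON produces.
func saveMeta(path string, values map[string]interface{}) error {
	b, err := json.Marshal(values)
	if err != nil {
		return err
	}
	return os.WriteFile(path, append(b, '\n'), 0o666)
}

// loadMeta decodes with UseNumber, like offset.LoadJSON, so timestamps stay
// json.Number instead of float64 and do not lose int64 precision.
func loadMeta(path string, dst *map[string]interface{}) error {
	b, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	d := json.NewDecoder(bytes.NewReader(b))
	d.UseNumber()
	return d.Decode(dst)
}

func main() {
	dir, err := os.MkdirTemp("", "meta-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Track timestamps of a batch, as the file plugin does in out().
	pair := &timestampPair{}
	pair.update(1654785468, 1654785470, 1654785469)

	metaPath := filepath.Join(dir, "meta_1654785468_file-d.json")
	if err := saveMeta(metaPath, map[string]interface{}{
		"first_timestamp": pair.min, // field names taken from meta.go
		"last_timestamp":  pair.max,
		"sealed_name":     "", // filled in when the log file is sealed
	}); err != nil {
		panic(err)
	}

	var restored map[string]interface{}
	if err := loadMeta(metaPath, &restored); err != nil {
		panic(err)
	}
	first, _ := restored["first_timestamp"].(json.Number).Int64()
	fmt.Println("restored first_timestamp:", first) // 1654785468
}

After a crash, the same read path lets newMetaFile pull the stored pair back into memory and lets the s3 plugin restore the object name from the sealed meta, which is why the numbers are kept as json.Number rather than float64.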