Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resurrect PR #157

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ lint:
.PHONY: mock
mock:
@go get github.com/golang/mock/gomock
@go install github.com/golang/mock/mockgen
@~/go/bin/mockgen -source=plugin/output/s3/s3.go -destination=plugin/output/s3/mock/s3.go
@~/go/bin/mockgen -source=plugin/output/postgres/postgres.go -destination=plugin/output/postgres/mock/postgres.go
@mockgen -source=plugin/output/s3/structs.go -destination=plugin/output/s3/mock/s3.go
@mockgen -source=plugin/output/postgres/postgres.go -destination=plugin/output/postgres/mock/postgres.go
@mockgen -source=pipeline/plugin.go -destination=pipeline/mocks/pipeline_plugin_mock/plugin_mock.go -package=pipeline_plugin_mock
53 changes: 53 additions & 0 deletions offset/json_offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package offset

import (
"bytes"
"encoding/json"
"io"
)

type jsonValue struct {
value interface{}
}

func (o *jsonValue) Load(r io.Reader) error {
b, err := io.ReadAll(r)
if err != nil {
return err
}

d := json.NewDecoder(bytes.NewReader(b))
d.UseNumber()

return d.Decode(o.value)
}

func (o *jsonValue) Save(w io.Writer) error {
b, err := json.Marshal(o.value)
if err != nil {
return err
}
_, err = w.Write(b)
if err != nil {
return err
}
_, err = w.Write([]byte("\n"))
if err != nil {
return err
}
return nil
}

// newJSONOffset builds an Offset bound to path whose load/save callback
// serializes value as JSON.
func newJSONOffset(path string, value interface{}) *Offset {
	off := NewOffset(path)
	off.Callback = &jsonValue{value: value}
	return off
}

// SaveJSON persists value to the file at path as a JSON document.
func SaveJSON(path string, value interface{}) error {
	off := newJSONOffset(path, value)
	return off.Save()
}

// LoadJSON reads the JSON document stored at path into value.
func LoadJSON(path string, value interface{}) error {
	off := newJSONOffset(path, value)
	return off.Load()
}
17 changes: 16 additions & 1 deletion offset/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestYAML(t *testing.T) {
assert.Equal(t, offset, loaded)
}

func TestSaveLoad(t *testing.T) {
func TestSaveLoadYaml(t *testing.T) {
path := getTmpPath(t, "offset.yaml")
offset := testOffset{}
offset.set("some_name", 123)
Expand All @@ -58,6 +58,21 @@ func TestSaveLoad(t *testing.T) {
assert.Equal(t, offset, loaded)
}

// TestSaveLoadJson checks that a value saved with SaveJSON round-trips
// through LoadJSON unchanged.
func TestSaveLoadJson(t *testing.T) {
	path := getTmpPath(t, "offset.json")
	offset := testOffset{}
	offset.set("some_name", 12345)

	err := SaveJSON(path, &offset)
	assert.NoError(t, err)

	loaded := testOffset{}
	// Fix: the original loaded via LoadYAML (copy-paste from the YAML
	// test), which left LoadJSON entirely untested.
	err = LoadJSON(path, &loaded)
	assert.NoError(t, err)

	assert.Equal(t, offset, loaded)
}

func TestAppendFile(t *testing.T) {
path := getTmpPath(t, "offset.yaml")
for i := 1; i < 5; i++ {
Expand Down
File renamed without changes.
210 changes: 184 additions & 26 deletions plugin/output/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/pipeline"

"go.uber.org/zap"
"golang.org/x/net/context"
)
Expand All @@ -41,9 +40,13 @@ type Plugin struct {
fileName string
tsFileName string

SealUpCallback func(string)
SealUpCallback func(string, string)
FileMetaCallback func(filename string) (fileOuterDestination, ext string)

mu *sync.RWMutex

metaWriter *meta
pairOfTimestamps *pair
}

type data struct {
Expand Down Expand Up @@ -90,6 +93,26 @@ type Config struct {
//> File mode for log files
FileMode cfg.Base8 `json:"file_mode" default:"0666" parse:"base8"` //*
FileMode_ int64

//> MetaCfg describes the metafile with the sealed-file description
MetaCfg MetaConfig `json:"meta_config" child:"true"` //*
}

type MetaConfig struct {
//> EnableMetaFiles turns on writing of metadata files for sealed log files.
EnableMetaFiles bool `json:"enable_meta_files"` //*
//> MetaDataDir is the directory where metainformation about sealed files is saved.
MetaDataDir string `json:"metadata_dir" default:"/var/log/file-d.meta"` //*
//> SealedMetaPrefix defines the filename prefix of a sealed meta file.
SealedMetaPrefix string `json:"sealed_metafile_prefix" default:"sealed_meta_"` //*
//> TimestampField is the event field that contains a valid timestamp.
TimestampField string `json:"timestamp_field" default:"timestamp"` //*
//> StaticMeta is the static part of the JSON meta.
StaticMeta string `json:"static_meta" default:""` //*
//> SealedFileNameField stores the name of the sealed file.
SealedFileNameField string `json:"sealed_filename_field" default:"sealed_name"` //*
//> SealedFilePathFieldName stores the name of the sealed file with its path.
SealedFilePathFieldName string `json:"sealed_filepath_name_field" default:"sealed_path"` //*
}

func init() {
Expand All @@ -114,29 +137,85 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.fileName = file[0 : len(file)-len(p.fileExtension)]
p.tsFileName = "%s" + "-" + p.fileName

p.batcher = pipeline.NewBatcher(pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
})

p.mu = &sync.RWMutex{}
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

if p.config.MetaCfg.EnableMetaFiles {
metaWriter, err := newMeta(
metaInit{
config: p.config,
fileName: p.fileName,
separator: fileNameSeparator,
filePrefix: metaFilePrefix,
},
)
if err != nil {
params.Logger.Fatalf("can't create meta: %s", err.Error())
}

p.metaWriter = metaWriter

p.pairOfTimestamps = NewPair()
}

p.batcher = pipeline.NewBatcher(
pipeline.BatcherOptions{
PipelineName: params.PipelineName,
OutputType: outPluginType,
OutFn: p.out,
MaintenanceFn: p.maintenance,
Controller: p.controller,
Workers: p.config.WorkersCount_,
BatchSizeCount: p.config.BatchSize_,
BatchSizeBytes: p.config.BatchSizeBytes_,
FlushTimeout: p.config.BatchFlushTimeout_,
MaintenanceInterval: pipeline.DefaultMaintenanceInterval,
},
/* params.PipelineName,
outPluginType,
p.out,
nil,
p.controller,
p.config.WorkersCount_,
p.config.BatchSize_,
p.config.BatchFlushTimeout_,
0, */
)

p.mu = &sync.RWMutex{}

if err := os.MkdirAll(p.targetDir, os.ModePerm); err != nil {
p.logger.Fatalf("could not create target dir: %s, error: %s", p.targetDir, err.Error())
}

p.idx = p.getStartIdx()
p.createNew()
if err := p.startUpCreateNewFile(); err != nil {
p.logger.Panic(err)
}
p.setNextSealUpTime()

if p.config.MetaCfg.EnableMetaFiles {
longpanic.Go(func() {
for {
select {
case <-ctx.Done():
return
//case <-time.After(p.config.RetentionInterval_ / 10):
case <-time.After(time.Second * 2):
p.mu.RLock()
min, max := p.pairOfTimestamps.Get()
p.mu.RUnlock()
if min == 0 && max == 0 {
continue
}
if err := p.metaWriter.updateMetaFileWithLock(min, max); err != nil {
p.logger.Errorf("can't update meta: %s", err.Error())
}
}
}
})
}

if p.file == nil {
p.logger.Panic("file struct is nil!")
}
Expand All @@ -150,6 +229,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.batcher.Start(ctx)
}

// maintenance is a deliberate no-op; it exists only to satisfy the
// MaintenanceFn hook passed to pipeline.NewBatcher in Start.
func (p *Plugin) maintenance(workerData *pipeline.WorkerData) {}

func (p *Plugin) Stop() {
// we MUST NOT close file, through p.file.Close(), fileSealUpTicker already do this duty.
p.batcher.Stop()
Expand All @@ -175,12 +256,43 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

outBuf := data.outBuf[:0]

minTs, maxTs := int64(0), int64(0)
for _, event := range batch.Events {
outBuf, _ = event.Encode(outBuf)
outBuf = append(outBuf, byte('\n'))

if p.config.MetaCfg.EnableMetaFiles {
ts, err := event.Root.DigStrict(p.config.MetaCfg.TimestampField)
if err != nil {
p.logger.Errorf("doesn't have timestamp field: %s", err.Error())
continue
}
tsInt, err := ts.AsInt64()
if err != nil {
p.logger.Errorf("time field isn't int64: %s", err.Error())
continue
}
if tsInt < minTs || minTs == 0 {
minTs = tsInt
}
if tsInt > maxTs {
maxTs = tsInt
}
}
}
data.outBuf = outBuf

if p.config.MetaCfg.EnableMetaFiles {
// lock prevents values leaking from new log file to old meta file.
// it's possible without lock:
// 1. log file sealed
// 2. new batch processed and pair updated
// 3. meta file sealed
p.mu.Lock()
p.pairOfTimestamps.UpdatePair(minTs, maxTs)
p.mu.Unlock()
}

p.write(outBuf)
}

Expand Down Expand Up @@ -215,24 +327,56 @@ func (p *Plugin) write(data []byte) {
}
}

func (p *Plugin) createNew() {
p.tsFileName = fmt.Sprintf("%d%s%s%s", time.Now().Unix(), fileNameSeparator, p.fileName, p.fileExtension)
// startUpCreateNewFile creates or restores the log and meta files during start-up.
// It calls createNew with empty sealing arguments because nothing is being sealed yet.
func (p *Plugin) startUpCreateNewFile() error {
return p.createNew("", "")
}

// createNew creates a new log file or re-opens an existing unsealed one,
// and — when meta files are enabled — starts a fresh meta file for it.
// sealingLogFile and sealingOuterPath describe the file currently being
// sealed; both are empty during start-up.
func (p *Plugin) createNew(sealingLogFile, sealingOuterPath string) error {
	timestamp := time.Now().Unix()
	// file name like: 1654785468_file-d.log
	p.tsFileName = fmt.Sprintf("%d%s%s%s", timestamp, fileNameSeparator, p.fileName, p.fileExtension)
	// NOTE(review): the global logger is used here while the rest of the
	// plugin logs through p.logger — looks like leftover debug output;
	// confirm before release.
	logger.Infof("tsFileName in createNew=%s", p.tsFileName)
	// full path like: /var/log/1654785468_file-d.log
	fileName := fmt.Sprintf("%s%s", p.targetDir, p.tsFileName)
	// search for an old unsealed file
	pattern := fmt.Sprintf("%s*%s%s%s", p.targetDir, fileNameSeparator, p.fileName, p.fileExtension)
	matches, err := filepath.Glob(pattern)
	if err != nil {
		// fix: format string previously read "err=%ss" (stray 's')
		p.logger.Fatalf("can't glob: pattern=%s, err=%s", pattern, err.Error())
	}
	// existence of such a file means the previous run crashed before the
	// file was sealed. Pull up the old file instead of creating a new one.
	if len(matches) == 1 {
		p.tsFileName = path.Base(matches[0])
		fileName = fmt.Sprintf("%s%s", p.targetDir, p.tsFileName)
	}
	file, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_RDWR, os.FileMode(p.config.FileMode_))
	if err != nil {
		// wrap with %w so callers can inspect the underlying error
		return fmt.Errorf("could not open or create file %s: %w", fileName, err)
	}
	p.file = file

	// start a new meta file describing the (re)opened log file
	if p.config.MetaCfg.EnableMetaFiles {
		// createNew() is guarded by a mutex on the upper layer.
		first, last := p.pairOfTimestamps.Reset()
		if err := p.metaWriter.newMetaFile(
			p.fileName,
			sealUpDTO{
				sealingOuterPath: sealingOuterPath,
				sealingLogFile:   sealingLogFile,
				firstTimestamp:   first,
				lastTimestamp:    last,
			},
			timestamp,
			p.pairOfTimestamps,
		); err != nil {
			return err
		}
	}

	return nil
}

// sealUp manages current file: renames, closes, and creates new.
Expand All @@ -246,19 +390,33 @@ func (p *Plugin) sealUp() {
}

// newFileName will be like: ".var/log/log_1_01-02-2009_15:04.log
newFileName := filepath.Join(p.targetDir, fmt.Sprintf("%s%s%d%s%s%s", p.fileName, fileNameSeparator, p.idx, fileNameSeparator, time.Now().Format(p.config.Layout), p.fileExtension))
p.rename(newFileName)
now := time.Now().Format(p.config.Layout)
finalOldFileName := filepath.Join(p.targetDir, fmt.Sprintf("%s%s%d%s%s%s", p.fileName, fileNameSeparator, p.idx, fileNameSeparator, now, p.fileExtension))
p.rename(finalOldFileName)
oldFile := p.file
p.mu.Lock()
p.createNew()

outerPath, ext := "", ""
if p.FileMetaCallback != nil {
outerPath, ext = p.FileMetaCallback(filepath.Base(finalOldFileName))
}

if err := p.createNew(fmt.Sprintf("%s%s", filepath.Base(finalOldFileName), ext), outerPath); err != nil {
p.logger.Panic(err)
}

p.nextSealUpTime = time.Now().Add(p.config.RetentionInterval_)
p.mu.Unlock()
if err := oldFile.Close(); err != nil {
p.logger.Panicf("could not close file: %s, error: %s", oldFile.Name(), err.Error())
}
logger.Errorf("sealing in %d, newFile: %s", time.Now().Unix(), newFileName)

logger.Errorf("sealing in %d, newFile: %s", time.Now().Unix(), finalOldFileName)
if p.SealUpCallback != nil {
longpanic.Go(func() { p.SealUpCallback(newFileName) })
if outerPath != "" {
outerPath = filepath.Base(outerPath)
}
longpanic.Go(func() { p.SealUpCallback(finalOldFileName, outerPath) })
}
}

Expand Down
Loading