Skip to content

Commit

Permalink
remove dead code after decoupling from jsonlog
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Goff <[email protected]>
  • Loading branch information
cpuguy83 committed Jul 22, 2015
1 parent c0391bf commit d3b3ebc
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 317 deletions.
16 changes: 2 additions & 14 deletions daemon/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,25 +323,13 @@ func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {

func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stdout.AddWriter(writer, "")
streamConfig.stdout.AddWriter(writer)
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stderr.AddWriter(writer, "")
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stdout.AddWriter(writer, "stdout")
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StderrLogPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stderr.AddWriter(writer, "stderr")
streamConfig.stderr.AddWriter(writer)
return ioutils.NewBufReader(reader)
}

Expand Down
95 changes: 12 additions & 83 deletions pkg/broadcastwriter/broadcastwriter.go
Original file line number Diff line number Diff line change
@@ -1,101 +1,33 @@
package broadcastwriter

import (
"bytes"
"io"
"sync"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/timeutils"
)

// BroadcastWriter accumulate multiple io.WriteCloser by stream.
type BroadcastWriter struct {
sync.Mutex
buf *bytes.Buffer
jsLogBuf *bytes.Buffer
streams map[string](map[io.WriteCloser]struct{})
writers map[io.WriteCloser]struct{}
}

// AddWriter adds new io.WriteCloser for stream.
// If stream is "", then all writes proceed as is. Otherwise every line from
// input will be packed to serialized jsonlog.JSONLog.
func (w *BroadcastWriter) AddWriter(writer io.WriteCloser, stream string) {
// AddWriter adds new io.WriteCloser.
func (w *BroadcastWriter) AddWriter(writer io.WriteCloser) {
w.Lock()
if _, ok := w.streams[stream]; !ok {
w.streams[stream] = make(map[io.WriteCloser]struct{})
}
w.streams[stream][writer] = struct{}{}
w.writers[writer] = struct{}{}
w.Unlock()
}

// Write writes bytes to all writers. Failed writers will be evicted during
// this call.
func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
w.Lock()
if writers, ok := w.streams[""]; ok {
for sw := range writers {
if n, err := sw.Write(p); err != nil || n != len(p) {
// On error, evict the writer
delete(writers, sw)
}
}
if len(w.streams) == 1 {
if w.buf.Len() >= 4096 {
w.buf.Reset()
} else {
w.buf.Write(p)
}
w.Unlock()
return len(p), nil
for sw := range w.writers {
if n, err := sw.Write(p); err != nil || n != len(p) {
// On error, evict the writer
delete(w.writers, sw)
}
}
if w.jsLogBuf == nil {
w.jsLogBuf = new(bytes.Buffer)
w.jsLogBuf.Grow(1024)
}
var timestamp string
created := time.Now().UTC()
w.buf.Write(p)
for {
if n := w.buf.Len(); n == 0 {
break
}
i := bytes.IndexByte(w.buf.Bytes(), '\n')
if i < 0 {
break
}
lineBytes := w.buf.Next(i + 1)
if timestamp == "" {
timestamp, err = timeutils.FastMarshalJSON(created)
if err != nil {
continue
}
}

for stream, writers := range w.streams {
if stream == "" {
continue
}
jsonLog := jsonlog.JSONLogBytes{Log: lineBytes, Stream: stream, Created: timestamp}
err = jsonLog.MarshalJSONBuf(w.jsLogBuf)
if err != nil {
logrus.Errorf("Error making JSON log line: %s", err)
continue
}
w.jsLogBuf.WriteByte('\n')
b := w.jsLogBuf.Bytes()
for sw := range writers {
if _, err := sw.Write(b); err != nil {
delete(writers, sw)
}
}
}
w.jsLogBuf.Reset()
}
w.jsLogBuf.Reset()
w.Unlock()
return len(p), nil
}
Expand All @@ -104,19 +36,16 @@ func (w *BroadcastWriter) Write(p []byte) (n int, err error) {
// will be saved.
func (w *BroadcastWriter) Clean() error {
w.Lock()
for _, writers := range w.streams {
for w := range writers {
w.Close()
}
for w := range w.writers {
w.Close()
}
w.streams = make(map[string](map[io.WriteCloser]struct{}))
w.writers = make(map[io.WriteCloser]struct{})
w.Unlock()
return nil
}

func New() *BroadcastWriter {
return &BroadcastWriter{
streams: make(map[string](map[io.WriteCloser]struct{})),
buf: bytes.NewBuffer(nil),
writers: make(map[io.WriteCloser]struct{}),
}
}
44 changes: 7 additions & 37 deletions pkg/broadcastwriter/broadcastwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func TestBroadcastWriter(t *testing.T) {

// Test 1: Both bufferA and bufferB should contain "foo"
bufferA := &dummyWriter{}
writer.AddWriter(bufferA, "")
writer.AddWriter(bufferA)
bufferB := &dummyWriter{}
writer.AddWriter(bufferB, "")
writer.AddWriter(bufferB)
writer.Write([]byte("foo"))

if bufferA.String() != "foo" {
Expand All @@ -48,7 +48,7 @@ func TestBroadcastWriter(t *testing.T) {
// Test2: bufferA and bufferB should contain "foobar",
// while bufferC should only contain "bar"
bufferC := &dummyWriter{}
writer.AddWriter(bufferC, "")
writer.AddWriter(bufferC)
writer.Write([]byte("bar"))

if bufferA.String() != "foobar" {
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestRaceBroadcastWriter(t *testing.T) {
writer := New()
c := make(chan bool)
go func() {
writer.AddWriter(devNullCloser(0), "")
writer.AddWriter(devNullCloser(0))
c <- true
}()
writer.Write([]byte("hello"))
Expand All @@ -111,9 +111,9 @@ func BenchmarkBroadcastWriter(b *testing.B) {
writer := New()
setUpWriter := func() {
for i := 0; i < 100; i++ {
writer.AddWriter(devNullCloser(0), "stdout")
writer.AddWriter(devNullCloser(0), "stderr")
writer.AddWriter(devNullCloser(0), "")
writer.AddWriter(devNullCloser(0))
writer.AddWriter(devNullCloser(0))
writer.AddWriter(devNullCloser(0))
}
}
testLine := "Line that thinks that it is log line from docker"
Expand Down Expand Up @@ -142,33 +142,3 @@ func BenchmarkBroadcastWriter(b *testing.B) {
b.StartTimer()
}
}

func BenchmarkBroadcastWriterWithoutStdoutStderr(b *testing.B) {
writer := New()
setUpWriter := func() {
for i := 0; i < 100; i++ {
writer.AddWriter(devNullCloser(0), "")
}
}
testLine := "Line that thinks that it is log line from docker"
var buf bytes.Buffer
for i := 0; i < 100; i++ {
buf.Write([]byte(testLine + "\n"))
}
// line without eol
buf.Write([]byte(testLine))
testText := buf.Bytes()
b.SetBytes(int64(5 * len(testText)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
setUpWriter()

for j := 0; j < 5; j++ {
if _, err := writer.Write(testText); err != nil {
b.Fatal(err)
}
}

writer.Clean()
}
}
26 changes: 0 additions & 26 deletions pkg/jsonlog/jsonlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package jsonlog
import (
"encoding/json"
"fmt"
"io"
"time"
)

Expand All @@ -29,28 +28,3 @@ func (jl *JSONLog) Reset() {
jl.Stream = ""
jl.Created = time.Time{}
}

func WriteLog(src io.Reader, dst io.Writer, format string, since time.Time) error {
dec := json.NewDecoder(src)
l := &JSONLog{}
for {
l.Reset()
if err := dec.Decode(l); err != nil {
if err == io.EOF {
return nil
}
return err
}
if !since.IsZero() && l.Created.Before(since) {
continue
}

line, err := l.Format(format)
if err != nil {
return err
}
if _, err := io.WriteString(dst, line); err != nil {
return err
}
}
}
Loading

0 comments on commit d3b3ebc

Please sign in to comment.