Skip to content

Commit

Permalink
Merge pull request moby#14442 from cpuguy83/refactor_logdrvier_reader
Browse files Browse the repository at this point in the history
Refactor log driver reader
  • Loading branch information
calavera committed Jul 22, 2015
2 parents 19ba7f9 + d3b3ebc commit 1c6fe58
Show file tree
Hide file tree
Showing 34 changed files with 2,858 additions and 582 deletions.
2 changes: 1 addition & 1 deletion api/client/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
since := cmd.String([]string{"-since"}, "", "Show logs since timestamp")
times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
tail := cmd.String([]string{"-tail"}, "latest", "Number of lines to show from the end of the logs")
tail := cmd.String([]string{"-tail"}, "all", "Number of lines to show from the end of the logs")
cmd.Require(flag.Exact, 1)

cmd.ParseFlags(args, true)
Expand Down
15 changes: 13 additions & 2 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,18 +629,29 @@ func (s *Server) getContainersLogs(version version.Version, w http.ResponseWrite
closeNotifier = notifier.CloseNotify()
}

c, err := s.daemon.Get(vars["name"])
if err != nil {
return err
}

outStream := ioutils.NewWriteFlusher(w)
// write an empty chunk of data (this is to ensure that the
// HTTP Response is sent immediatly, even if the container has
// not yet produced any data)
outStream.Write(nil)

logsConfig := &daemon.ContainerLogsConfig{
Follow: boolValue(r, "follow"),
Timestamps: boolValue(r, "timestamps"),
Since: since,
Tail: r.Form.Get("tail"),
UseStdout: stdout,
UseStderr: stderr,
OutStream: ioutils.NewWriteFlusher(w),
OutStream: outStream,
Stop: closeNotifier,
}

if err := s.daemon.ContainerLogs(vars["name"], logsConfig); err != nil {
if err := s.daemon.ContainerLogs(c, logsConfig); err != nil {
fmt.Fprintf(w, "Error running logs job: %s\n", err)
}

Expand Down
65 changes: 26 additions & 39 deletions daemon/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/docker/docker/pkg/broadcastwriter"
"github.com/docker/docker/pkg/fileutils"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/mount"
"github.com/docker/docker/pkg/nat"
"github.com/docker/docker/pkg/promise"
Expand Down Expand Up @@ -325,25 +324,13 @@ func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {

func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stdout.AddWriter(writer, "")
streamConfig.stdout.AddWriter(writer)
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stderr.AddWriter(writer, "")
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StdoutLogPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stdout.AddWriter(writer, "stdout")
return ioutils.NewBufReader(reader)
}

func (streamConfig *StreamConfig) StderrLogPipe() io.ReadCloser {
reader, writer := io.Pipe()
streamConfig.stderr.AddWriter(writer, "stderr")
streamConfig.stderr.AddWriter(writer)
return ioutils.NewBufReader(reader)
}

Expand Down Expand Up @@ -715,6 +702,9 @@ func (container *Container) getLogConfig() runconfig.LogConfig {
}

func (container *Container) getLogger() (logger.Logger, error) {
if container.logDriver != nil && container.IsRunning() {
return container.logDriver, nil
}
cfg := container.getLogConfig()
if err := logger.ValidateLogOpts(cfg.Type, cfg.Config); err != nil {
return nil, err
Expand Down Expand Up @@ -888,36 +878,33 @@ func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writ
}

func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {

if logs {
logDriver, err := c.getLogger()
if err != nil {
logrus.Errorf("Error obtaining the logger %v", err)
return err
}
if _, ok := logDriver.(logger.Reader); !ok {
logrus.Errorf("cannot read logs for [%s] driver", logDriver.Name())
} else {
if cLog, err := logDriver.(logger.Reader).ReadLog(); err != nil {
logrus.Errorf("Error reading logs %v", err)
} else {
dec := json.NewDecoder(cLog)
for {
l := &jsonlog.JSONLog{}

if err := dec.Decode(l); err == io.EOF {
break
} else if err != nil {
logrus.Errorf("Error streaming logs: %s", err)
break
}
if l.Stream == "stdout" && stdout != nil {
io.WriteString(stdout, l.Log)
}
if l.Stream == "stderr" && stderr != nil {
io.WriteString(stderr, l.Log)
}
cLog, ok := logDriver.(logger.LogReader)
if !ok {
return logger.ErrReadLogsNotSupported
}
logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})

LogLoop:
for {
select {
case msg, ok := <-logs.Msg:
if !ok {
break LogLoop
}
if msg.Source == "stdout" && stdout != nil {
stdout.Write(msg.Line)
}
if msg.Source == "stderr" && stderr != nil {
stderr.Write(msg.Line)
}
case err := <-logs.Err:
logrus.Errorf("Error streaming logs: %v", err)
break LogLoop
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions daemon/logger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Context struct {
LogPath string
}

// Hostname returns the hostname from the underlying OS
func (ctx *Context) Hostname() (string, error) {
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -35,6 +36,7 @@ func (ctx *Context) Hostname() (string, error) {
return hostname, nil
}

// Command returns the command that the container being logged was started with
func (ctx *Context) Command() string {
terms := []string{ctx.ContainerEntrypoint}
for _, arg := range ctx.ContainerArgs {
Expand Down
Loading

0 comments on commit 1c6fe58

Please sign in to comment.