Skip to content

Commit

Permalink
Switch tcplog to sync to simpify usage
Browse files Browse the repository at this point in the history
  • Loading branch information
evanphx committed Apr 10, 2015
1 parent bdfd1a6 commit dfa3ad6
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 30 deletions.
40 changes: 38 additions & 2 deletions plugins/lib/tcplog/tcplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Logger struct {
PumpClosed bool
PumpDropped uint64
ConnDropped uint64

syncConn net.Conn
}

func NewLogger(address string, ssl bool, formatter Formatter) *Logger {
Expand Down Expand Up @@ -62,10 +64,44 @@ func (l *Logger) Read(message interface{}) (err error) {
return errors.New("Unable to read message type")
}

return l.write(data)
retry:
for l.syncConn == nil {
c, err := l.dial()
if err != nil {
time.Sleep(1 * time.Second)
continue
}

l.syncConn = c
}

_, err = l.syncConn.Write(data)

if err != nil {
goto retry
}

return nil
}

func (l *Logger) ReadAsync(message interface{}) (err error) {
var data []byte

switch m := message.(type) {
case []byte:
data = m
case string:
data = []byte(m)
case *cypress.Message:
data, _ = l.Format(m)
default:
return errors.New("Unable to read message type")
}

return l.writeAsync(data)
}

func (l *Logger) write(line []byte) (err error) {
func (l *Logger) writeAsync(line []byte) (err error) {
if l.PumpClosed == true {
return errors.New("Pump is closed")
}
Expand Down
27 changes: 21 additions & 6 deletions plugins/lib/tcplog/tcplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,26 @@ func (tf *TestFormatter) Format(m *cypress.Message) ([]byte, error) {
func TestRead(t *testing.T) {
n := neko.Start(t)

var l *Logger
var (
l *Logger
s *TcpServer
)

n.Setup(func() {
l = NewLogger("", false, &TestFormatter{})
s = NewTcpServer()

go s.Run("127.0.0.1")

l = NewLogger(<-s.Address, false, &TestFormatter{})
})

n.It("reads a byte slice", func() {
ok := l.Read([]byte("This is a long line"))
assert.NoError(t, ok)

m := <-s.Messages

assert.Equal(t, string("This is a long line"), string(m))
})

n.It("reads a string", func() {
Expand Down Expand Up @@ -59,11 +70,15 @@ func TestWrite(t *testing.T) {
)

n.Setup(func() {
l = NewLogger("", false, &TestFormatter{})
s := NewTcpServer()

go s.Run("127.0.0.1")

l = NewLogger(<-s.Address, false, &TestFormatter{})
})

n.It("adds a log line to the pump", func() {
l.write(line)
l.writeAsync(line)

select {
case pumpLine := <-l.Pump:
Expand All @@ -79,7 +94,7 @@ func TestWrite(t *testing.T) {

n.It("adds an error line to the pump if lines were dropped", func() {
l.PumpDropped = 1
l.write(line)
l.writeAsync(line)

select {
case <-l.Pump:
Expand All @@ -98,7 +113,7 @@ func TestWrite(t *testing.T) {

n.It("does not add a log line and increments dropped counter if pump is full ", func() {
l.Pump = make(chan []byte, 0)
l.write(line)
l.writeAsync(line)

select {
case <-l.Pump:
Expand Down
2 changes: 0 additions & 2 deletions plugins/logentries/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ func (p *Send) Execute(args []string) error {
return err
}

logentries.Run()

return cypress.Glue(dec, logentries)
}

Expand Down
1 change: 0 additions & 1 deletion plugins/logentries/logentries_send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestLogentriesRunWithTestServer(t *testing.T) {
go s.Run("127.0.0.1")

l := NewLogger(<-s.Address, false, "token")
go l.Run()

message := tcplog.NewMessage(t)
l.Read(message)
Expand Down
15 changes: 0 additions & 15 deletions plugins/logstash/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package logstash
import (
"fmt"
"os"
"sync"

"github.com/vektra/cypress"
"github.com/vektra/cypress/cli/commands"
Expand All @@ -25,20 +24,6 @@ func (p *Send) Execute(args []string) error {
return err
}

var wg sync.WaitGroup

defer wg.Wait()

wg.Add(1)

go func() {
defer wg.Done()

logstash.Run()
}()

defer logstash.Close()

return cypress.Glue(dec, logstash)
}

Expand Down
1 change: 0 additions & 1 deletion plugins/logstash/logstash_send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func TestLogstashRunWithTestServer(t *testing.T) {
go s.Run("127.0.0.1")

l := NewLogger(<-s.Address, false)
go l.Run()

message := tcplog.NewMessage(t)
l.Read(message)
Expand Down
2 changes: 0 additions & 2 deletions plugins/papertrail/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ func (p *Send) Execute(args []string) error {
return err
}

papertrail.Run()

return cypress.Glue(dec, papertrail)
}

Expand Down
1 change: 0 additions & 1 deletion plugins/papertrail/papertrail_send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func TestPapertrailRunWithTestServer(t *testing.T) {
go s.Run("127.0.0.1")

l := NewLogger(<-s.Address, false)
go l.Run()

message := tcplog.NewMessage(t)
l.Read(message)
Expand Down

0 comments on commit dfa3ad6

Please sign in to comment.