Skip to content

Commit

Permalink
cshared: remove printing from tests in favor of assertions.
Browse files Browse the repository at this point in the history
Signed-off-by: Phillip Whelan <[email protected]>
  • Loading branch information
pwhelan committed Apr 16, 2024
1 parent 1dad5e4 commit ae5d13a
Showing 1 changed file with 79 additions and 39 deletions.
118 changes: 79 additions & 39 deletions cshared_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package plugin

import (
"bytes"
"context"
"fmt"
"reflect"
Expand Down Expand Up @@ -436,6 +435,8 @@ type testOutputHandlerReflect struct {
param string
flushCounter metric.Counter
log Logger
Test *testing.T
Check func(Message) error
}

func (plug *testOutputHandlerReflect) Init(ctx context.Context, fbit *Fluentbit) error {
Expand All @@ -447,35 +448,27 @@ func (plug *testOutputHandlerReflect) Init(ctx context.Context, fbit *Fluentbit)
}

func (plug *testOutputHandlerReflect) Flush(ctx context.Context, ch <-chan Message) error {
// Iterate Records
plug.Test.Helper()
count := 0
printout := bytes.NewBuffer([]byte{})

for {
select {
case msg := <-ch:
rec := reflect.ValueOf(msg.Record)
printout.WriteString("[{")
if rec.Kind() == reflect.Map {
keyCount := 0
for _, key := range rec.MapKeys() {
if keyCount > 0 {
printout.WriteString(", ")
}
strct := rec.MapIndex(key)
printout.WriteString(
fmt.Sprintf("\"%s\":\"%v\"",
key.Interface(), strct.Interface()))
keyCount++
if rec.Kind() != reflect.Map {
return fmt.Errorf("incorrect record type in flush")
}

if plug.Check != nil {
if err := plug.Check(msg); err != nil {
return err
}
}
printout.WriteString("}]")
count++
case <-ctx.Done():
if count <= 0 {
return fmt.Errorf("no records flushed")
}
fmt.Print(printout.String())
return nil
}
}
Expand All @@ -497,38 +490,25 @@ func (plug *testOutputHandlerMapString) Init(ctx context.Context, fbit *Fluentbi

func (plug *testOutputHandlerMapString) Flush(ctx context.Context, ch <-chan Message) error {
count := 0
printout := bytes.NewBuffer([]byte{})

for {
select {
case msg := <-ch:
printout.WriteString("[{")
keyCount := 0
record, ok := msg.Record.(map[string]interface{})
if !ok {
panic("unable to convert record")
return fmt.Errorf("unable to convert record to map[string]")
}
for key, value := range record {
if keyCount > 0 {
printout.WriteString(", ")
}

val, ok := value.(string)
for _, value := range record {
_, ok := value.(string)
if !ok {
panic("unable to convert value")
return fmt.Errorf("unable to convert value")
}

printout.WriteString(
fmt.Sprintf("\"%s\":\"%v\"", key, val))
keyCount++
}
printout.WriteString("}]")
count++
case <-ctx.Done():
if count <= 0 {
return fmt.Errorf("no records flushed")
}
fmt.Print(printout.String())
return nil
}
}
Expand All @@ -547,23 +527,22 @@ func TestOutputSimulated(t *testing.T) {
ch := make(chan Message)
tag := "tag"

outputReflect := testOutputHandlerReflect{}
outputReflect := testOutputHandlerReflect{Test: t}

wg.Add(1)
go func(ctxt context.Context, wg *sync.WaitGroup, ch <-chan Message) {
err := outputReflect.Flush(ctxt, ch)
if err != nil {
t.Error(err)
t.Fail()
}
wg.Done()
}(ctxt, &wg, ch)

ch <- Message{
Time: time.Now(),
Record: map[string]interface{}{
"foo": "bar",
"foobar": "1",
"foo": "bar",
"bar": "1",
},
tag: &tag,
}
Expand Down Expand Up @@ -600,7 +579,65 @@ func TestOutputSimulated(t *testing.T) {
}

func TestOutputFlush(t *testing.T) {
out := testOutputHandlerReflect{}
var wg sync.WaitGroup

out := testOutputHandlerReflect{
Test: t,
Check: func(msg Message) error {
wg.Done()

record, ok := msg.Record.(map[string]interface{})
if !ok {
t.Errorf("unable to decode record")
return nil
}

foo, ok := record["foo"]
if !ok {
t.Errorf("unable to access value for foo")
return nil
}

// this is a little bit disconcerting. does the msgpack
// decoder default to using []uint8 for strings?
if foostr, okfoo := foo.([]uint8); !okfoo {
t.Errorf("unable to get value for foo")
return nil
} else if string(foostr) != "bar" {
t.Errorf("invalid value for foo")
return nil
}

bar, ok := record["bar"]
if !ok {
t.Errorf("unable to access value for bar")
return nil
}

if barnum, okbar := bar.(int64); !okbar {
t.Errorf("unable to get value for bar")
return nil
} else if barnum != 0 {
t.Errorf("invalid value for bar")
return nil
}

foobar, ok := record["foobar"]
if !ok {
t.Errorf("unable to access value for foobar")
return nil
}

if foobarfloat, okfoobar := foobar.(float64); !okfoobar {
t.Errorf("unable to get value for foobar")
return nil
} else if foobarfloat != 1.337 {
t.Errorf("invalid value for foobar")
return nil
}
return nil
},
}
_ = prepareOutputFlush(&out)

msg := Message{
Expand All @@ -618,8 +655,11 @@ func TestOutputFlush(t *testing.T) {
t.Fail()
}

wg.Add(1)
if err := pluginFlush("foobar", b); err == output.FLB_ERROR {
t.Error(err)
t.Fail()
}

wg.Wait()
}

0 comments on commit ae5d13a

Please sign in to comment.