Skip to content

Commit

Permalink
Merge pull request #2007 from keboola/mj-rm-body-string
Browse files Browse the repository at this point in the history
feat: Remove recordctx.BodyString(), skip []byte -> string conv
  • Loading branch information
Matovidlo authored Sep 11, 2024
2 parents b452c43 + c3b3794 commit 591a741
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 109 deletions.
18 changes: 15 additions & 3 deletions internal/pkg/encoding/jsonnet/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package jsonnet

import (
"unsafe"

"github.com/google/go-jsonnet/ast"
"github.com/keboola/go-utils/pkg/orderedmap"
"github.com/spf13/cast"
Expand Down Expand Up @@ -48,19 +50,29 @@ func ValueToLiteral(v any) ast.Node {

// ValueToJSONType converts Go value to a Json value for the Jsonnet VM.
func ValueToJSONType(in any) any {
if v, ok := in.(*orderedmap.OrderedMap); ok {
switch v := in.(type) {
case []byte:
return bytesToStr(v)
case *orderedmap.OrderedMap:
m := make(map[string]any)
for _, k := range v.Keys() {
m[k], _ = v.Get(k)
m[k] = ValueToJSONType(m[k])
}
return m
}
if v, ok := in.([]any); ok {
case []any:
for i, arrVal := range v {
v[i] = ValueToJSONType(arrVal)
}
return v
}

return in
}

func bytesToStr(b []byte) string {
if len(b) == 0 {
return ""
}
return unsafe.String(unsafe.SliceData(b), len(b))
}
8 changes: 7 additions & 1 deletion internal/pkg/service/stream/api/mapper/source_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
api "github.com/keboola/keboola-as-code/internal/pkg/service/stream/api/gen/stream"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/csvfmt"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/table/column"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
Expand Down Expand Up @@ -79,9 +80,14 @@ func (m *Mapper) NewTestResultResponse(sourceKey key.SourceKey, sinks []definiti
return nil, err
}

csvValueBytes, err := csvfmt.Format(csvValue)
if err != nil {
return nil, err
}

row.Columns = append(row.Columns, &api.TestResultColumn{
Name: c.ColumnName(),
Value: csvValue,
Value: string(csvValueBytes),
})
}

Expand Down
52 changes: 52 additions & 0 deletions internal/pkg/service/stream/mapping/csvfmt/csvfmt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package csvfmt

import (
"strconv"
"unsafe"

"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

func Format(v any) ([]byte, error) {
// Inspired by cast.ToStringE(), but without slow reflection
switch s := v.(type) {
case []byte:
return s, nil
case string:
return []byte(s), nil
case bool:
return strToBytes(strconv.FormatBool(s)), nil
case float64:
return strToBytes(strconv.FormatFloat(s, 'f', -1, 64)), nil
case float32:
return strToBytes(strconv.FormatFloat(float64(s), 'f', -1, 32)), nil
case int:
return strToBytes(strconv.FormatInt(int64(s), 10)), nil
case int64:
return strToBytes(strconv.FormatInt(s, 10)), nil
case int32:
return strToBytes(strconv.FormatInt(int64(s), 10)), nil
case int16:
return strToBytes(strconv.FormatInt(int64(s), 10)), nil
case int8:
return strToBytes(strconv.FormatInt(int64(s), 10)), nil
case uint:
return strToBytes(strconv.FormatUint(uint64(s), 10)), nil
case uint64:
return strToBytes(strconv.FormatUint(s, 10)), nil
case uint32:
return strToBytes(strconv.FormatUint(uint64(s), 10)), nil
case uint16:
return strToBytes(strconv.FormatUint(uint64(s), 10)), nil
case uint8:
return strToBytes(strconv.FormatUint(uint64(s), 10)), nil
case nil:
return nil, nil
default:
return nil, errors.Errorf("unable to cast %#v of type %T to string", v, v)
}
}

func strToBytes(s string) []byte {
return unsafe.Slice(unsafe.StringData(s), len(s))
}
2 changes: 1 addition & 1 deletion internal/pkg/service/stream/mapping/jsonnet/jsonnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func bodyStrFn(fnName string, vm *jsonnet.VM[recordctx.Context]) *jsonnet.Native
return nil, errors.Errorf("no parameter expected, found %d", len(params))
}

body, err := vm.Payload().BodyString()
body, err := vm.Payload().BodyBytes()
if err != nil {
return nil, err
}
Expand Down
22 changes: 0 additions & 22 deletions internal/pkg/service/stream/mapping/recordctx/fasthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type fastHTTPContext struct {
clientIP net.IP
headersMap *orderedmap.OrderedMap
headersString *string
bodyString *string
bodyStringErr error
bodyMap *orderedmap.OrderedMap
bodyMapErr error
}
Expand Down Expand Up @@ -81,26 +79,6 @@ func (c *fastHTTPContext) HeadersMap() *orderedmap.OrderedMap {
return c.headersMap
}

func (c *fastHTTPContext) BodyString() (string, error) {
c.lock.Lock()
defer c.lock.Unlock()

if c.bodyString == nil && c.bodyStringErr == nil {
if bytes, err := c.BodyBytes(); err == nil {
v := string(bytes)
c.bodyString = &v
} else {
c.bodyStringErr = err
}
}

if c.bodyStringErr != nil {
return "", c.bodyStringErr
}

return *c.bodyString, nil
}

func (c *fastHTTPContext) BodyBytes() ([]byte, error) {
return c.req.Request.Body(), nil // returned buffer is valid until the request is released
}
Expand Down
22 changes: 0 additions & 22 deletions internal/pkg/service/stream/mapping/recordctx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type httpContext struct {
clientIP net.IP
headersMap *orderedmap.OrderedMap
headersString *string
bodyString *string
bodyStringErr error
bodyBytes []byte
bodyBytesErr error
bodyMap *orderedmap.OrderedMap
Expand Down Expand Up @@ -81,26 +79,6 @@ func (c *httpContext) HeadersMap() *orderedmap.OrderedMap {
return c.headersMap
}

func (c *httpContext) BodyString() (string, error) {
c.lock.Lock()
defer c.lock.Unlock()

if c.bodyString == nil && c.bodyStringErr == nil {
if bytes, err := c.bodyBytesWithoutLock(); err == nil {
v := string(bytes)
c.bodyString = &v
} else {
c.bodyStringErr = err
}
}

if c.bodyStringErr != nil {
return "", c.bodyStringErr
}

return *c.bodyString, nil
}

func (c *httpContext) BodyBytes() ([]byte, error) {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/service/stream/mapping/recordctx/recordctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type Context interface {
ClientIP() net.IP
HeadersString() string
HeadersMap() *orderedmap.OrderedMap
BodyString() (string, error)
BodyBytes() ([]byte, error)
BodyMap() (*orderedmap.OrderedMap, error)
}
4 changes: 2 additions & 2 deletions internal/pkg/service/stream/mapping/table/column/renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func NewRenderer() *Renderer {
}
}

func (r *Renderer) CSVValue(c Column, ctx recordctx.Context) (string, error) {
func (r *Renderer) CSVValue(c Column, ctx recordctx.Context) (any, error) {
switch c := c.(type) {
case Body:
return ctx.BodyString()
return ctx.BodyBytes()
case Datetime:
// Time is always in UTC, time format has fixed length
return ctx.Timestamp().UTC().Format(TimeFormat), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/keboola/keboola-as-code/internal/pkg/service/common/ptr"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/recordctx"
Expand All @@ -23,7 +24,9 @@ func TestRenderer_UUID(t *testing.T) {

val, err := renderer.CSVValue(c, recordctx.FromHTTP(time.Now(), &http.Request{}))
assert.NoError(t, err)
id, err := uuid.FromString(val)
valStr, ok := val.(string)
require.True(t, ok)
id, err := uuid.FromString(valStr)
assert.NoError(t, err)
assert.Equal(t, uuid.V7, id.Version())
}
Expand Down Expand Up @@ -60,7 +63,7 @@ func TestRenderer_Body(t *testing.T) {
body := "a,b,c"
val, err := renderer.CSVValue(c, recordctx.FromHTTP(time.Now(), &http.Request{Body: io.NopCloser(strings.NewReader(body))}))
assert.NoError(t, err)
assert.Equal(t, "a,b,c", val)
assert.Equal(t, []byte("a,b,c"), val)
}

func TestRenderer_Headers(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func (w *Encoder) WriteRecord(record recordctx.Context) error {

// Map the record to tabular data
for i, col := range w.columns {
str, err := columnRenderer.CSVValue(col, record)
value, err := columnRenderer.CSVValue(col, record)
if err != nil {
return errors.PrefixErrorf(err, "cannot convert column %q to CSV value", col)
}
(*values)[i] = str
(*values)[i] = value
}

// Encode the values to CSV format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package fastcsv
import (
"bytes"
"io"
"strconv"
"strings"

"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/mapping/csvfmt"
)

// writer is simplified and optimized version of CSV writer.
Expand All @@ -26,7 +24,7 @@ func (w *writer) WriteRow(cols *[]any) error {
// Write each column
for n, col := range *cols {
// Cast the value to string
toWrite, err := w.toCSVValue(col)
toWrite, err := csvfmt.Format(col)
if err != nil {
return ValueError{ColumnIndex: n, err: err}
}
Expand All @@ -46,13 +44,13 @@ func (w *writer) WriteRow(cols *[]any) error {
// Write all until a special character
for len(toWrite) > 0 {
// Search for special characters
stop := strings.IndexRune(toWrite, '"')
stop := bytes.IndexRune(toWrite, '"')
if stop < 0 {
stop = len(toWrite)
}

// Copy verbatim everything before the special character.
if _, err := w.row.WriteString(toWrite[:stop]); err != nil {
if _, err := w.row.Write(toWrite[:stop]); err != nil {
return err
}

Expand Down Expand Up @@ -87,43 +85,3 @@ func (w *writer) WriteRow(cols *[]any) error {

return nil
}

func (w *writer) toCSVValue(v any) (string, error) {
// Inspired by cast.ToStringE(), but without slow reflection
switch s := v.(type) {
case string:
return s, nil
case bool:
return strconv.FormatBool(s), nil
case float64:
return strconv.FormatFloat(s, 'f', -1, 64), nil
case float32:
return strconv.FormatFloat(float64(s), 'f', -1, 32), nil
case int:
return strconv.FormatInt(int64(s), 10), nil
case int64:
return strconv.FormatInt(s, 10), nil
case int32:
return strconv.FormatInt(int64(s), 10), nil
case int16:
return strconv.FormatInt(int64(s), 10), nil
case int8:
return strconv.FormatInt(int64(s), 10), nil
case uint:
return strconv.FormatUint(uint64(s), 10), nil
case uint64:
return strconv.FormatUint(s, 10), nil
case uint32:
return strconv.FormatUint(uint64(s), 10), nil
case uint16:
return strconv.FormatUint(uint64(s), 10), nil
case uint8:
return strconv.FormatUint(uint64(s), 10), nil
case []byte:
return string(s), nil
case nil:
return "", nil
default:
return "", errors.Errorf("unable to cast %#v of type %T to string", v, v)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,8 @@ func (p *pipeline) WriteRecord(record recordctx.Context) error {
// Increments number of high-level writes in progress
p.acceptedWrites.Add(timestamp, 1)

// Set sync timeout
ctx, cancel := context.WithTimeout(record.Ctx(), 20*time.Second)
defer cancel()

// Wait for sync and return sync error, if any
if err := notifier.Wait(ctx); err != nil {
if err := notifier.Wait(record.Ctx()); err != nil {
return errors.PrefixError(err, "error when waiting for sync")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,12 +598,14 @@ func newDummyEncoder(out io.Writer, writeDone chan struct{}) *dummyEncoder {
}

func (w *dummyEncoder) WriteRecord(record recordctx.Context) error {
body, err := record.BodyString()
body, err := record.BodyBytes()
if err != nil {
return err
}

_, err = w.out.Write([]byte(body + "\n"))
body = append(body, '\n')

_, err = w.out.Write(body)
if err == nil && w.writeDone != nil {
w.writeDone <- struct{}{}
}
Expand Down

0 comments on commit 591a741

Please sign in to comment.