Skip to content

Commit

Permalink
Remove strict_types flag - we will always try to add node.data
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed May 18, 2023
1 parent 9a2cf3c commit 0d89c8a
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 290 deletions.
103 changes: 62 additions & 41 deletions e2e/file_clickhouse/clickhouse_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ type Config struct {
}

func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
r := require.New(t)
c.ctx, c.cancel = context.WithTimeout(context.Background(), time.Minute*2)

conn, err := ch.Dial(c.ctx, ch.Options{
Address: "127.0.0.1:9001",
})
require.NoError(t, err)
r.NoError(err)
c.conn = conn

err = conn.Do(c.ctx, ch.Query{
Body: `DROP TABLE IF EXISTS test_table_insert`})
require.NoError(t, err)
r.NoError(err)

err = conn.Do(c.ctx, ch.Query{
Body: `CREATE TABLE IF NOT EXISTS test_table_insert
Expand All @@ -56,10 +57,13 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
ts_with_tz DateTime('Europe/Moscow'),
ts64 DateTime64(3),
ts64_auto DateTime64(9, 'UTC'),
ts_rfc3339nano DateTime64(9),
f32 Float32,
f64 Float64,
created_at DateTime64(6, 'UTC') DEFAULT now()
) ENGINE = Memory`,
})
require.NoError(t, err)
r.NoError(err)

c.inputDir = t.TempDir()
offsetsDir := t.TempDir()
Expand Down Expand Up @@ -109,29 +113,36 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
TS: c.sampleTime,
TSWithTZ: c.sampleTime,
TS64: c.sampleTime,
F32: 542.12345454545454,
F64: 0.5555555555555555,
},
}
}

func (c *Config) Send(t *testing.T) {
r := require.New(t)

file, err := os.Create(path.Join(c.inputDir, "input.log"))
require.NoError(t, err)
r.NoError(err)
defer func() {
_ = file.Close()
}()

for i := range c.samples {
sampleRaw, err := json.Marshal(&c.samples[i])
require.NoError(t, err)
r.NoError(err)
_, err = file.Write(sampleRaw)
require.NoError(t, err)
r.NoError(err)
_, err = file.WriteString("\n")
require.NoError(t, err)
r.NoError(err)
}
_ = file.Sync()
}

func (c *Config) Validate(t *testing.T) {
a := assert.New(t)
r := require.New(t)

var rows int
for i := 0; i < 1000; i++ {
cnt := proto.ColUInt64{}
Expand All @@ -141,7 +152,7 @@ func (c *Config) Validate(t *testing.T) {
{Name: "count()", Data: &cnt},
},
})
require.NoError(t, err)
r.NoError(err)

rows = int(cnt.Row(0))
if rows != len(c.samples) {
Expand All @@ -150,27 +161,30 @@ func (c *Config) Validate(t *testing.T) {
}
}

assert.Equal(t, len(c.samples), rows)
a.Equal(len(c.samples), rows)

var (
c1 = new(proto.ColStr)
c2 = new(proto.ColInt8)
c3 = new(proto.ColInt16)
c4 = new(proto.ColInt16).Nullable()
c5 = new(proto.ColStr).Nullable()
level = new(proto.ColEnum8)
ipv4 = new(proto.ColIPv4).Nullable()
ipv6 = new(proto.ColIPv6).Nullable()
ts = new(proto.ColDateTime)
tsWithTz = new(proto.ColDateTime)
ts64 = new(proto.ColDateTime64)
ts64Auto = new(proto.ColDateTime64)
c1 = new(proto.ColStr)
c2 = new(proto.ColInt8)
c3 = new(proto.ColInt16)
c4 = new(proto.ColInt16).Nullable()
c5 = new(proto.ColStr).Nullable()
level = new(proto.ColEnum8)
ipv4 = new(proto.ColIPv4).Nullable()
ipv6 = new(proto.ColIPv6).Nullable()
ts = new(proto.ColDateTime)
tsWithTz = new(proto.ColDateTime)
ts64 = new(proto.ColDateTime64)
ts64Auto = new(proto.ColDateTime64)
ts3339nano = new(proto.ColDateTime64)
f32 = new(proto.ColFloat32)
f64 = new(proto.ColFloat64)
)
ts64.WithPrecision(proto.PrecisionMilli)
ts64Auto.WithPrecision(proto.PrecisionNano)

require.NoError(t, c.conn.Do(c.ctx, ch.Query{
Body: `select c1, c2, c3, c4, c5, level, ipv4, ipv6, ts, ts_with_tz, ts64, ts64_auto
Body: `select c1, c2, c3, c4, c5, level, ipv4, ipv6, ts, ts_with_tz, ts64, ts64_auto, ts_rfc3339nano, f32, f64
from test_table_insert
order by c1`,
Result: proto.Results{
Expand All @@ -186,35 +200,42 @@ func (c *Config) Validate(t *testing.T) {
proto.ResultColumn{Name: "ts_with_tz", Data: tsWithTz},
proto.ResultColumn{Name: "ts64", Data: ts64},
proto.ResultColumn{Name: "ts64_auto", Data: ts64Auto},
proto.ResultColumn{Name: "ts_rfc3339nano", Data: ts3339nano},
proto.ResultColumn{Name: "f32", Data: f32},
proto.ResultColumn{Name: "f64", Data: f64},
},
OnResult: func(_ context.Context, _ proto.Block) error {
return nil
},
}))

assert.Equal(t, len(c.samples), c1.Rows())
a.Equal(len(c.samples), c1.Rows())
for i := 0; i < c1.Rows(); i++ {
sample := c.samples[i]

c1Expected, _ := json.Marshal(sample.C1)
assert.Equal(t, string(trim(c1Expected)), c1.Row(i))

assert.Equal(t, sample.C2, c2.Row(i))
assert.Equal(t, sample.C3, c3.Row(i))
assert.Equal(t, sample.C4, c4.Row(i))
assert.Equal(t, sample.C5, c5.Row(i))
assert.Equal(t, sample.IPv4, ipv4.Row(i))
assert.Equal(t, sample.IPv6, ipv6.Row(i))
assert.Equal(t, sample.TS.Unix(), ts.Row(i).Unix())

assert.Equal(t, sample.TSWithTZ.Unix(), tsWithTz.Row(i).Unix())
assert.Equal(t, "Europe/Moscow", tsWithTz.Row(i).Location().String())

assert.Equal(t, sample.TS64.UnixMilli()*1e6, ts64.Row(i).UnixNano())
assert.Equal(t, "Local", ts64.Row(i).Location().String())

assert.True(t, ts64Auto.Row(i).After(sample.TS64), "%s before %s", ts64Auto.Row(i).String(), sample.TS64.String()) // we are use set_time plugin and override this value
assert.Equal(t, "UTC", ts64Auto.Row(i).Location().String())
a.Equal(string(trim(c1Expected)), c1.Row(i))

a.Equal(sample.C2, c2.Row(i))
a.Equal(sample.C3, c3.Row(i))
a.Equal(sample.C4, c4.Row(i))
a.Equal(sample.C5, c5.Row(i))
a.Equal(sample.IPv4, ipv4.Row(i))
a.Equal(sample.IPv6, ipv6.Row(i))
a.Equal(sample.TS.Unix(), ts.Row(i).Unix())
a.False(ts3339nano.Row(i).IsZero())
a.Greater(ts3339nano.Row(0), time.Now().Add(-time.Second*20))
a.Equal(sample.F32, f32.Row(i))
a.Equal(sample.F64, f64.Row(i))

a.Equal(sample.TSWithTZ.Unix(), tsWithTz.Row(i).Unix())
a.Equal("Europe/Moscow", tsWithTz.Row(i).Location().String())

a.Equal(sample.TS64.UnixMilli()*1e6, ts64.Row(i).UnixNano())
a.Equal("Local", ts64.Row(i).Location().String())

a.True(ts64Auto.Row(i).After(sample.TS64), "%s before %s", ts64Auto.Row(i).String(), sample.TS64.String()) // we are use set_time plugin and override this value
a.Equal("UTC", ts64Auto.Row(i).Location().String())
}
}

Expand Down
15 changes: 14 additions & 1 deletion e2e/file_clickhouse/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@ pipelines:
input:
type: file
actions:
- type: debug
- type: set_time
format: unixtime
field: ts
override: false
- type: set_time
format: unixtime
field: ts_with_tz
override: false
- type: set_time
format: timestampnano
field: ts64_auto
override: true
- type: set_time
format: rfc3339nano
field: ts_rfc3339nano
override: true
- type: debug
output:
type: clickhouse
addresses:
Expand Down Expand Up @@ -44,3 +51,9 @@ pipelines:
type: DateTime64(3, 'UTC')
- name: ts64_auto
type: DateTime64(9, 'UTC')
- name: ts_rfc3339nano
type: DateTime64(9)
- name: f32
type: Float32
- name: f64
type: Float64
11 changes: 10 additions & 1 deletion e2e/file_clickhouse/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ type Sample struct {
TS time.Time `json:"ts"`
TSWithTZ time.Time `json:"ts_with_tz"`
TS64 time.Time `json:"ts_64"`
TS64Auto time.Time `json:"ts_64_auto"`
F32 float32 `json:"f32"`
F64 float64 `json:"f64"`

// we are set this in the set_time action
TS64Auto time.Time `json:"ts_64_auto"`
TSRFC3339Nano time.Time `json:"ts_rfc3339nano"`
}

var _ json.Marshaler = (*Sample)(nil)
Expand Down Expand Up @@ -52,6 +57,8 @@ func (s *Sample) MarshalJSON() ([]byte, error) {
Level string `json:"level,omitempty"`
Ipv4 string `json:"ipv4,omitempty"`
Ipv6 string `json:"ipv6,omitempty"`
F32 float32 `json:"f32"`
F64 float64 `json:"f64"`
TS int64 `json:"ts"`
TSWithTZ int64 `json:"ts_with_tz"`
TS64 int64 `json:"ts64"`
Expand All @@ -64,6 +71,8 @@ func (s *Sample) MarshalJSON() ([]byte, error) {
Level: levelToString[s.Level],
Ipv4: ipv4,
Ipv6: ipv6,
F32: s.F32,
F64: s.F64,
TS: s.TS.Unix(),
TSWithTZ: s.TS.Unix(),
TS64: s.TS64.UnixMilli(),
Expand Down
15 changes: 3 additions & 12 deletions plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,6 @@ type Config struct {
// > If you need more types, please, create an issue.
Columns []Column `json:"columns" required:"true"` // *

// > @3@4@5@6
// >
// > If false, file.d will try to cast event type to column type.
// For example, if an event value is an object, but the column type is a String,
// the object will be encoded to string.
// > If true, file.d will fall when types are mismatched.
StrictTypes bool `json:"strict_types" default:"true"` // *

// > @3@4@5@6
// >
// > The level of the Compression.
Expand Down Expand Up @@ -281,7 +273,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.logger.Fatal("'db_request_timeout' can't be <1")
}

schema, err := inferInsaneColInputs(p.config.Columns, p.config.StrictTypes)
schema, err := inferInsaneColInputs(p.config.Columns, p.logger)
if err != nil {
p.logger.Fatal("invalid database schema", zap.Error(err))
}
Expand Down Expand Up @@ -385,7 +377,7 @@ func (d data) reset() {
func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
if *workerData == nil {
// we don't check the error, schema already validated in the Start
columns, _ := inferInsaneColInputs(p.config.Columns, p.config.StrictTypes)
columns, _ := inferInsaneColInputs(p.config.Columns, p.logger)
input := inputFromColumns(columns)
*workerData = data{
cols: columns,
Expand All @@ -398,9 +390,8 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {

for _, event := range batch.Events {
for _, col := range data.cols {
node, _ := event.Root.DigStrict(col.Name)
node := event.Root.Dig(col.Name)
if err := col.ColInput.Append(node); err != nil {
// TODO: handle case when we can't append in the batch, e.g. columns is not nullable, but value it is
p.logger.Fatal("can't append value in the batch",
zap.Error(err),
zap.String("column", col.Name),
Expand Down
2 changes: 2 additions & 0 deletions plugin/output/clickhouse/colgenerator/colgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ func clickhouseTypes() []Type {
Type{
ChTypeName: "Float32",
GoName: "float32",
CustomImpl: true,
},
Type{
ChTypeName: "Float64",
GoName: "float64",
CustomImpl: true,
},
Type{
ChTypeName: "DateTime",
Expand Down
15 changes: 3 additions & 12 deletions plugin/output/clickhouse/colgenerator/insane_column.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func New{{- $type.ColumnTypeName }}(nullable bool) *{{ $type.ColumnTypeName }} {
{{ end }}

// Append the insaneJSON.Node to the batch.
func (t *{{ $type.ColumnTypeName }}) Append(node *insaneJSON.StrictNode) error {
func (t *{{ $type.ColumnTypeName }}) Append(node *insaneJSON.Node) error {
if node == nil || node.IsNull() {
{{- if $type.CannotBeNull }}
return ErrNodeIsNil
Expand All @@ -60,18 +60,9 @@ func (t *{{ $type.ColumnTypeName }}) Append(node *insaneJSON.StrictNode) error {
{{- end }}
}
{{ if $type.CannotConvert -}}
val, err := node.{{- $type.InsaneConvertFunc }}()
if err != nil {
return err
}
val := node.{{- $type.InsaneConvertFunc }}()
{{ else -}}
v, err := node.{{- $type.InsaneConvertFunc }}()
if err != nil {
return err
}

val := {{ $type.ConvertInsaneJSONValue }}(v)

val := {{ $type.ConvertInsaneJSONValue }}(node.{{- $type.InsaneConvertFunc }}())
{{ end }}
{{- if not $type.CannotBeNull -}}
if t.nullable {
Expand Down
Loading

0 comments on commit 0d89c8a

Please sign in to comment.