Skip to content

Commit

Permalink
Restore strict_types flag
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed May 22, 2023
1 parent e9a8227 commit aed89a2
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 101 deletions.
23 changes: 22 additions & 1 deletion plugin/output/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,19 @@ type Config struct {
// > If you need more types, please, create an issue.
Columns []Column `json:"columns" required:"true"` // *

// > @3@4@5@6
// >
// > If true, file.d will fall when types are mismatched.
// >
// > If false, file.d will cast any JSON type to the column type.
// > For example, if an event value is a Number, but the column type is a Bool,
// > the Number will be converted to the "true" if the value is "1".
// > But if the value is an Object and the column is an Int
// > File.d converts the Object to "0" to prevent fall.
// >
// > Note: String column accepts any json type - the value will be encoded to JSON.
StrictTypes bool `json:"strict_types" default:"true"` // *

// > @3@4@5@6
// >
// > The level of the Compression.
Expand Down Expand Up @@ -391,7 +404,15 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) {
for _, event := range batch.Events {
for _, col := range data.cols {
node := event.Root.Dig(col.Name)
if err := col.ColInput.Append(node); err != nil {

var insaneNode InsaneNode
if node != nil && p.config.StrictTypes {
insaneNode = node.MutateToStrict()
} else if node != nil {
insaneNode = NonStrictNode{node}
}

if err := col.ColInput.Append(insaneNode); err != nil {
p.logger.Fatal("can't append value in the batch",
zap.Error(err),
zap.String("column", col.Name),
Expand Down
20 changes: 11 additions & 9 deletions plugin/output/clickhouse/colgenerator/insane_column.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package clickhouse

import (
"github.com/ClickHouse/ch-go/proto"
insaneJSON "github.com/vitkovskii/insane-json"
)

{{ range $type := .Types }}
Expand Down Expand Up @@ -47,7 +46,7 @@ func New{{- $type.ColumnTypeName }}(nullable bool) *{{ $type.ColumnTypeName }} {
{{ end }}

// Append the insaneJSON.Node to the batch.
func (t *{{ $type.ColumnTypeName }}) Append(node *insaneJSON.Node) error {
func (t *{{ $type.ColumnTypeName }}) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
{{- if $type.CannotBeNull }}
return ErrNodeIsNil
Expand All @@ -60,15 +59,18 @@ func (t *{{ $type.ColumnTypeName }}) Append(node *insaneJSON.Node) error {
{{- end }}
}
{{ if $type.CannotConvert -}}
val := node.{{- $type.InsaneConvertFunc }}()
val, err := node.{{- $type.InsaneConvertFunc }}()
if err != nil {
return err
}
{{ else -}}
val := {{ $type.ConvertInsaneJSONValue }}(node.{{- $type.InsaneConvertFunc }}())
{{ end }}
{{- if not $type.CannotBeNull -}}
if t.nullable {
t.nullCol.Append(proto.NewNullable(val))
return nil
v, err := node.{{- $type.InsaneConvertFunc }}()
if err != nil {
return err
}

val := {{ $type.ConvertInsaneJSONValue }}(v)

{{ end -}}

t.col.Append(val)
Expand Down
89 changes: 66 additions & 23 deletions plugin/output/clickhouse/column_custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/ClickHouse/ch-go/proto"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)

Expand All @@ -28,20 +27,25 @@ func NewColDateTime(col *proto.ColDateTime) *ColDateTime {
}
}

func (t *ColDateTime) Append(node *insaneJSON.Node) error {
func (t *ColDateTime) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
return ErrNodeIsNil
}

var val time.Time
switch {
case node.IsNumber():
val = time.Unix(int64(node.AsInt()), 0)
nodeVal, err := node.AsInt64()
if err != nil {
return err
}

val = time.Unix(nodeVal, 0)
case node.IsString():
var err error
val, err = time.Parse(time.RFC3339Nano, node.AsString())
val, err = parseRFC3339Nano(node)
if err != nil {
return fmt.Errorf("parse time as RFC3339Nano: %w", err)
return err
}
default:
return fmt.Errorf("value=%q is not a string or number", node.EncodeToString())
Expand All @@ -65,22 +69,27 @@ func NewColDateTime64(col *proto.ColDateTime64, scale int64) *ColDateTime64 {
}
}

func (t *ColDateTime64) Append(node *insaneJSON.Node) error {
func (t *ColDateTime64) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
return ErrNodeIsNil
}

var val time.Time
switch {
case node.IsNumber():
v := node.AsInt64()
v, err := node.AsInt64()
if err != nil {
return err
}

// convert to nanoseconds
nsec := v * t.scale
val = time.Unix(nsec/1e9, nsec%1e9)
case node.IsString():
var err error
val, err = time.Parse(time.RFC3339Nano, node.AsString())
val, err = parseRFC3339Nano(node)
if err != nil {
return fmt.Errorf("parse time as RFC3339Nano: %w", err)
return err
}
default:
return fmt.Errorf("value=%q is not a string or number", node.EncodeToString())
Expand All @@ -106,7 +115,7 @@ func NewColIPv4(nullable bool) *ColIPv4 {
}
}

func (t *ColIPv4) Append(node *insaneJSON.Node) error {
func (t *ColIPv4) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
if !t.nullable {
return ErrNodeIsNil
Expand Down Expand Up @@ -150,7 +159,7 @@ func NewColIPv6(nullable bool) *ColIPv6 {
}
}

func (t *ColIPv6) Append(node *insaneJSON.Node) error {
func (t *ColIPv6) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
if !t.nullable {
return ErrNodeIsNil
Expand Down Expand Up @@ -190,11 +199,15 @@ func NewColEnum8(col *proto.ColEnum) *ColEnum8 {
}
}

func (t *ColEnum8) Append(node *insaneJSON.Node) error {
func (t *ColEnum8) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
return ErrNodeIsNil
}
val := node.AsString()
val, err := node.AsString()
if err != nil {
return err
}

t.col.Append(val)

return nil
Expand All @@ -211,11 +224,15 @@ func NewColEnum16(col *proto.ColEnum) *ColEnum8 {
}
}

func (t *ColEnum16) Append(node *insaneJSON.Node) error {
func (t *ColEnum16) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
return ErrNodeIsNil
}
val := node.AsString()
val, err := node.AsString()
if err != nil {
return err
}

t.col.Append(val)

return nil
Expand All @@ -239,7 +256,7 @@ func NewColString(nullable bool, logger *zap.Logger) *ColString {
}

// Append the insaneJSON.Node to the batch.
func (t *ColString) Append(node *insaneJSON.Node) error {
func (t *ColString) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
if !t.nullable {
return ErrNodeIsNil
Expand All @@ -250,7 +267,11 @@ func (t *ColString) Append(node *insaneJSON.Node) error {

var val string
if node.IsString() {
val = node.AsString()
var err error
val, err = node.AsString()
if err != nil {
return err
}
} else {
val = node.EncodeToString()
t.logger.Warn("appending non-string value to the String column", zap.String("value", val))
Expand Down Expand Up @@ -281,15 +302,18 @@ func NewColFloat32(nullable bool) *ColFloat32 {
}

// Append the insaneJSON.Node to the batch.
func (t *ColFloat32) Append(node *insaneJSON.Node) error {
func (t *ColFloat32) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
if !t.nullable {
return ErrNodeIsNil
}
t.nullCol.Append(proto.Null[float32]())
return nil
}
v := node.AsString()
v, err := node.AsString()
if err != nil {
return err
}

val, err := strconv.ParseFloat(v, 32)
if err != nil {
Expand Down Expand Up @@ -321,15 +345,18 @@ func NewColFloat64(nullable bool) *ColFloat64 {
}

// Append the insaneJSON.Node to the batch.
func (t *ColFloat64) Append(node *insaneJSON.Node) error {
func (t *ColFloat64) Append(node InsaneNode) error {
if node == nil || node.IsNull() {
if !t.nullable {
return ErrNodeIsNil
}
t.nullCol.Append(proto.Null[float64]())
return nil
}
v := node.AsString()
v, err := node.AsString()
if err != nil {
return err
}

val, err := strconv.ParseFloat(v, 64)
if err != nil {
Expand All @@ -345,8 +372,11 @@ func (t *ColFloat64) Append(node *insaneJSON.Node) error {
return nil
}

func ipFromNode(node *insaneJSON.Node) (netip.Addr, error) {
v := node.AsString()
func ipFromNode(node InsaneNode) (netip.Addr, error) {
v, err := node.AsString()
if err != nil {
return netip.Addr{}, err
}

addr, err := netip.ParseAddr(v)
if err != nil {
Expand All @@ -355,3 +385,16 @@ func ipFromNode(node *insaneJSON.Node) (netip.Addr, error) {

return addr, nil
}

func parseRFC3339Nano(node InsaneNode) (time.Time, error) {
nodeVal, err := node.AsString()
if err != nil {
return time.Time{}, err
}

t, err := time.Parse(time.RFC3339Nano, nodeVal)
if err != nil {
return time.Time{}, fmt.Errorf("parse time as RFC3339Nano: %w", err)
}
return t, nil
}
Loading

0 comments on commit aed89a2

Please sign in to comment.