From 7ab9afd84106e31af8f1ead76c7d5b2d7205eebc Mon Sep 17 00:00:00 2001 From: ihippik Date: Mon, 12 Sep 2022 22:28:11 +0400 Subject: [PATCH] Add some OID types --- listener/pg_type.go | 21 +++++++++++++++++ listener/wal_transaction.go | 45 ++++++++++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 10 deletions(-) create mode 100644 listener/pg_type.go diff --git a/listener/pg_type.go b/listener/pg_type.go new file mode 100644 index 00000000..7387c6bd --- /dev/null +++ b/listener/pg_type.go @@ -0,0 +1,21 @@ +package listener + +// PostgreSQL OIDs +// https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.dat +const ( + Int2OID = 21 + Int4OID = 23 + Int8OID = 20 + + TextOID = 25 + VarcharOID = 1043 + + TimestampOID = 1114 + TimestamptzOID = 1184 + DateOID = 1082 + TimeOID = 1083 + + JSONBOID = 3802 + UUIDOID = 2950 + BoolOID = 16 +) diff --git a/listener/wal_transaction.go b/listener/wal_transaction.go index f4ce0f8a..fc644978 100644 --- a/listener/wal_transaction.go +++ b/listener/wal_transaction.go @@ -7,7 +7,6 @@ import ( "time" "github.com/google/uuid" - "github.com/jackc/pgx/pgtype" "github.com/sirupsen/logrus" ) @@ -67,26 +66,52 @@ type Column struct { // AssertValue converts bytes to a specific type depending // on the type of this data in the database table. func (c *Column) AssertValue(src []byte) { - var val interface{} + var ( + val any + err error + ) + if src == nil { c.value = nil return } + strSrc := string(src) + + const ( + timestampLayout = "2006-01-02 15:04:05" + timestampWithTZLayout = "2006-01-02 15:04:05.000000-07" + ) + switch c.valueType { - case pgtype.BoolOID: - val, _ = strconv.ParseBool(strSrc) - case pgtype.Int4OID: - val, _ = strconv.Atoi(strSrc) - case pgtype.TextOID, pgtype.VarcharOID: + case BoolOID: + val, err = strconv.ParseBool(strSrc) + case Int2OID, Int4OID: + val, err = strconv.Atoi(strSrc) + case Int8OID: + val, err = strconv.ParseInt(strSrc, 10, 64) + case TextOID, VarcharOID: val = strSrc - case pgtype.TimestampOID, pgtype.TimestamptzOID: + case TimestampOID: + val, err = time.Parse(timestampLayout, strSrc) + case TimestamptzOID: + val, err = time.Parse(timestampWithTZLayout, strSrc) + case DateOID, TimeOID: + val = strSrc + case UUIDOID: + val, err = uuid.Parse(strSrc) + case JSONBOID: val = strSrc default: - logrus.WithField("pgtype", c.valueType). - Warnln("unknown oid type") + logrus.WithFields(logrus.Fields{"pgtype": c.valueType, "column_name": c.name}).Warnln("unknown oid type") val = strSrc } + + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{"pgtype": c.valueType, "column_name": c.name}). + Errorln("column data parse error") + } + c.value = val }