From 668baa650668907d8c8e62a7962b988e46f01396 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 24 Jan 2024 19:10:15 +0800 Subject: [PATCH] add KeyExtractor interface and util --- .../flink/sink/OBKVHBaseDynamicTableSink.java | 3 +- .../flink/OBKVHBaseConnectorITCase.java | 14 ++++++---- .../connector/flink/sink/OceanBaseSink.java | 5 ++-- .../connector/flink/sink/OceanBaseWriter.java | 8 ++---- .../flink/table/DataChangeRecord.java | 15 ++++++++++ .../connector/flink/table/HTableInfo.java | 6 ++++ .../connector/flink/table/Table.java | 3 ++ .../connector/flink/table/TableInfo.java | 5 ++++ .../flink/OceanBaseConnectorOptions.java | 11 ++++++-- .../OceanBaseConnectionProvider.java | 2 +- .../flink/sink/OceanBaseDynamicTableSink.java | 9 ++---- .../flink/OceanBaseConnectorITCase.java | 28 ++----------------- 12 files changed, 58 insertions(+), 51 deletions(-) diff --git a/flink-connector-obkv-hbase/src/main/java/com/oceanbase/connector/flink/sink/OBKVHBaseDynamicTableSink.java b/flink-connector-obkv-hbase/src/main/java/com/oceanbase/connector/flink/sink/OBKVHBaseDynamicTableSink.java index 691bc7e2..04082e3a 100644 --- a/flink-connector-obkv-hbase/src/main/java/com/oceanbase/connector/flink/sink/OBKVHBaseDynamicTableSink.java +++ b/flink-connector-obkv-hbase/src/main/java/com/oceanbase/connector/flink/sink/OBKVHBaseDynamicTableSink.java @@ -18,6 +18,7 @@ import com.oceanbase.connector.flink.OBKVHBaseConnectorOptions; import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider; +import com.oceanbase.connector.flink.table.DataChangeRecord; import com.oceanbase.connector.flink.table.HTableInfo; import com.oceanbase.connector.flink.table.OBKVHBaseRowDataSerializationSchema; import com.oceanbase.connector.flink.table.TableId; @@ -51,7 +52,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { connectorOptions.getSchemaName(), connectorOptions.getTableName()), physicalSchema)), - null, + DataChangeRecord.KeyExtractor.simple(), new OBKVHBaseRecordFlusher(connectionProvider))); } diff --git a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java index 136c5e3e..064271c9 100644 --- a/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java +++ b/flink-connector-obkv-hbase/src/test/java/com/oceanbase/connector/flink/OBKVHBaseConnectorITCase.java @@ -16,10 +16,10 @@ package com.oceanbase.connector.flink; -import com.oceanbase.connector.flink.connection.CompatibleMode; import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider; import com.oceanbase.connector.flink.sink.OBKVHBaseRecordFlusher; import com.oceanbase.connector.flink.sink.OceanBaseSink; +import com.oceanbase.connector.flink.table.DataChangeRecord; import com.oceanbase.connector.flink.table.HTableInfo; import com.oceanbase.connector.flink.table.OBKVHBaseRowDataSerializationSchema; import com.oceanbase.connector.flink.table.TableId; @@ -89,10 +89,12 @@ protected Map getOptions() { public void before() throws Exception { OBKVHBaseConnectorOptions options = new OBKVHBaseConnectorOptions(getOptions()); OBKVHBaseConnectionProvider connectionProvider = new OBKVHBaseConnectionProvider(options); - client = - connectionProvider.getHTableClient( - new TableId( - CompatibleMode.HBASE, OB_SERVER.getDatabaseName(), getTestTable())); + TableId tableId = + new TableId( + connectionProvider.getCompatibleMode(), + options.getSchemaName(), + options.getTableName()); + client = connectionProvider.getHTableClient(tableId); } @After @@ -153,7 +155,7 @@ public void testDataStreamSink() throws Exception { connectorOptions.getSchemaName(), connectorOptions.getTableName()), physicalSchema)), - null, + DataChangeRecord.KeyExtractor.simple(), new OBKVHBaseRecordFlusher(connectionProvider)); List dataSet = diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseSink.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseSink.java index 8e7b779a..4efac062 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseSink.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseSink.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.util.function.SerializableFunction; public class OceanBaseSink implements Sink { @@ -32,14 +31,14 @@ public class OceanBaseSink implements Sink { private final ConnectorOptions options; private final TypeSerializer typeSerializer; private final RecordSerializationSchema recordSerializer; - private final SerializableFunction keyExtractor; + private final DataChangeRecord.KeyExtractor keyExtractor; private final RecordFlusher recordFlusher; public OceanBaseSink( ConnectorOptions options, TypeSerializer typeSerializer, RecordSerializationSchema recordSerializer, - SerializableFunction keyExtractor, + DataChangeRecord.KeyExtractor keyExtractor, RecordFlusher recordFlusher) { this.options = options; this.typeSerializer = typeSerializer; diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java index bf4da87d..e0109767 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseWriter.java @@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.util.concurrent.ExecutorThreadFactory; -import org.apache.flink.util.function.SerializableFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +49,7 @@ public class OceanBaseWriter implements SinkWriter { private final SinkWriterMetricGroup metricGroup; private final TypeSerializer typeSerializer; private final RecordSerializationSchema recordSerializer; - private final SerializableFunction keyExtractor; + private final DataChangeRecord.KeyExtractor keyExtractor; private final RecordFlusher recordFlusher; private final List schemaChangeRecords = new ArrayList<>(1); @@ -70,7 +69,7 @@ public OceanBaseWriter( Sink.InitContext initContext, TypeSerializer typeSerializer, RecordSerializationSchema recordSerializer, - SerializableFunction keyExtractor, + DataChangeRecord.KeyExtractor keyExtractor, RecordFlusher recordFlusher) { this.options = options; this.metricGroup = initContext.metricGroup(); @@ -119,12 +118,11 @@ private void addToBuffer(Record record) { if (record instanceof SchemaChangeRecord) { synchronized (schemaChangeRecords) { schemaChangeRecords.add((SchemaChangeRecord) record); - bufferCount++; metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc(); } } else if (record instanceof DataChangeRecord) { DataChangeRecord dataChangeRecord = (DataChangeRecord) record; - Object key = keyExtractor == null ? null : keyExtractor.apply(dataChangeRecord); + Object key = keyExtractor == null ? null : keyExtractor.extract(dataChangeRecord); if (key == null) { synchronized (dataChangeRecordBuffer) { dataChangeRecordBuffer diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java index ebb72fde..d2503534 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/DataChangeRecord.java @@ -16,6 +16,8 @@ package com.oceanbase.connector.flink.table; +import java.io.Serializable; + public class DataChangeRecord implements Record { private static final long serialVersionUID = 1L; @@ -25,6 +27,19 @@ public enum Type { DELETE, } + public interface KeyExtractor extends Serializable { + + Object extract(DataChangeRecord record); + + static KeyExtractor simple() { + return record -> + new DataChangeRecordData( + record.getTable().getKey().stream() + .map(record::getFieldValue) + .toArray()); + } + } + private final Table table; private final Type type; private final DataChangeRecordData data; diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/HTableInfo.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/HTableInfo.java index d8343d18..08ce6538 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/HTableInfo.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/HTableInfo.java @@ -22,6 +22,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RowType; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -118,6 +119,11 @@ public TableId getTableId() { return tableId; } + @Override + public List getKey() { + return Collections.singletonList(rowKeyName); + } + public String getRowKeyName() { return rowKeyName; } diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/Table.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/Table.java index 8de859ac..711893ed 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/Table.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/Table.java @@ -17,10 +17,13 @@ package com.oceanbase.connector.flink.table; import java.io.Serializable; +import java.util.List; public interface Table extends Serializable { TableId getTableId(); + List getKey(); + Integer getFieldIndex(String fieldName); } diff --git a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/TableInfo.java b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/TableInfo.java index dcdf4a62..2613d3e6 100644 --- a/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/TableInfo.java +++ b/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/table/TableInfo.java @@ -73,6 +73,11 @@ public TableId getTableId() { return tableId; } + @Override + public List getKey() { + return primaryKey; + } + @Override public Integer getFieldIndex(String fieldName) { return fieldIndexMap.get(fieldName); diff --git a/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/OceanBaseConnectorOptions.java b/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/OceanBaseConnectorOptions.java index 3dee88fd..337649fd 100644 --- a/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/OceanBaseConnectorOptions.java +++ b/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/OceanBaseConnectorOptions.java @@ -16,6 +16,7 @@ package com.oceanbase.connector.flink; +import com.oceanbase.connector.flink.connection.CompatibleMode; import com.oceanbase.connector.flink.utils.OptionUtils; import org.apache.flink.configuration.ConfigOption; @@ -89,12 +90,16 @@ public class OceanBaseConnectorOptions extends ConnectorOptions { public OceanBaseConnectorOptions(Map config) { super(config); + CompatibleMode compatibleMode = getCompatibleMode(); + if (CompatibleMode.MYSQL != compatibleMode && CompatibleMode.ORACLE != compatibleMode) { + throw new IllegalArgumentException("Unsupported compatible mode: " + compatibleMode); + } if (getUrl().contains("mysql")) { if (!getDriverClassName().contains("mysql")) { throw new IllegalArgumentException( "Wrong 'driver-class-name', should use mysql driver for url: " + getUrl()); } - if (!getCompatibleMode().equalsIgnoreCase("mysql")) { + if (!CompatibleMode.MYSQL.equals(compatibleMode)) { throw new IllegalArgumentException( "Wrong 'compatible-mode', the mysql driver can only be used on 'mysql' mode."); } @@ -114,8 +119,8 @@ public OceanBaseConnectorOptions(Map config) { } } - public String getCompatibleMode() { - return allConfig.get(COMPATIBLE_MODE); + public CompatibleMode getCompatibleMode() { + return CompatibleMode.parse(allConfig.get(COMPATIBLE_MODE)); } public String getDriverClassName() { diff --git a/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/connection/OceanBaseConnectionProvider.java b/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/connection/OceanBaseConnectionProvider.java index 9fae8397..f8b64bb7 100644 --- a/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/connection/OceanBaseConnectionProvider.java +++ b/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/connection/OceanBaseConnectionProvider.java @@ -73,7 +73,7 @@ public String toString() { public OceanBaseConnectionProvider(OceanBaseConnectorOptions options) { this.options = options; - this.compatibleMode = CompatibleMode.parse(options.getCompatibleMode()); + this.compatibleMode = options.getCompatibleMode(); this.dialect = CompatibleMode.MYSQL.equals(compatibleMode) ? new OceanBaseMySQLDialect() diff --git a/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseDynamicTableSink.java b/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseDynamicTableSink.java index accfe560..38345f41 100644 --- a/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseDynamicTableSink.java +++ b/flink-connector-oceanbase/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseDynamicTableSink.java @@ -18,7 +18,7 @@ import com.oceanbase.connector.flink.OceanBaseConnectorOptions; import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider; -import com.oceanbase.connector.flink.table.DataChangeRecordData; +import com.oceanbase.connector.flink.table.DataChangeRecord; import com.oceanbase.connector.flink.table.OceanBaseRowDataSerializationSchema; import com.oceanbase.connector.flink.table.TableId; import com.oceanbase.connector.flink.table.TableInfo; @@ -52,12 +52,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { connectorOptions.getSchemaName(), connectorOptions.getTableName()), physicalSchema)), - record -> - new DataChangeRecordData( - ((TableInfo) record.getTable()) - .getPrimaryKey().stream() - .map(record::getFieldValue) - .toArray()), + DataChangeRecord.KeyExtractor.simple(), new OceanBaseRecordFlusher(connectorOptions, connectionProvider))); } diff --git a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java index a9b3e3ea..22eceb66 100644 --- a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java +++ b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseConnectorITCase.java @@ -21,6 +21,7 @@ import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect; import com.oceanbase.connector.flink.sink.OceanBaseRecordFlusher; import com.oceanbase.connector.flink.sink.OceanBaseSink; +import com.oceanbase.connector.flink.table.DataChangeRecord; import com.oceanbase.connector.flink.table.OceanBaseRowDataSerializationSchema; import com.oceanbase.connector.flink.table.OceanBaseTestData; import com.oceanbase.connector.flink.table.OceanBaseTestDataSerializationSchema; @@ -56,7 +57,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.junit.Assert.assertTrue; @@ -93,18 +93,7 @@ public void testMultipleTableSink() throws Exception { connectorOptions, null, new OceanBaseTestDataSerializationSchema(), - record -> - ((TableInfo) record.getTable()) - .getPrimaryKey().stream() - .map( - key -> { - Object value = - record.getFieldValue(key); - return value == null - ? "null" - : value.toString(); - }) - .collect(Collectors.joining("#")), + DataChangeRecord.KeyExtractor.simple(), new OceanBaseRecordFlusher( connectorOptions, new OceanBaseConnectionProvider(connectorOptions))); @@ -213,18 +202,7 @@ public void testDataStreamSink() throws Exception { connectorOptions.getSchemaName(), connectorOptions.getTableName()), physicalSchema)), - record -> - ((TableInfo) record.getTable()) - .getPrimaryKey().stream() - .map( - key -> { - Object value = - record.getFieldValue(key); - return value == null - ? "null" - : value.toString(); - }) - .collect(Collectors.joining("#")), + DataChangeRecord.KeyExtractor.simple(), new OceanBaseRecordFlusher(connectorOptions, connectionProvider)); List dataSet =