Skip to content

Commit

Permalink
add KeyExtractor interface and util
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 24, 2024
1 parent a13684c commit 668baa6
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.oceanbase.connector.flink.OBKVHBaseConnectorOptions;
import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.HTableInfo;
import com.oceanbase.connector.flink.table.OBKVHBaseRowDataSerializationSchema;
import com.oceanbase.connector.flink.table.TableId;
Expand Down Expand Up @@ -51,7 +52,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
connectorOptions.getSchemaName(),
connectorOptions.getTableName()),
physicalSchema)),
null,
DataChangeRecord.KeyExtractor.simple(),
new OBKVHBaseRecordFlusher(connectionProvider)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.connection.CompatibleMode;
import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider;
import com.oceanbase.connector.flink.sink.OBKVHBaseRecordFlusher;
import com.oceanbase.connector.flink.sink.OceanBaseSink;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.HTableInfo;
import com.oceanbase.connector.flink.table.OBKVHBaseRowDataSerializationSchema;
import com.oceanbase.connector.flink.table.TableId;
Expand Down Expand Up @@ -89,10 +89,12 @@ protected Map<String, String> getOptions() {
public void before() throws Exception {
OBKVHBaseConnectorOptions options = new OBKVHBaseConnectorOptions(getOptions());
OBKVHBaseConnectionProvider connectionProvider = new OBKVHBaseConnectionProvider(options);
client =
connectionProvider.getHTableClient(
new TableId(
CompatibleMode.HBASE, OB_SERVER.getDatabaseName(), getTestTable()));
TableId tableId =
new TableId(
connectionProvider.getCompatibleMode(),
options.getSchemaName(),
options.getTableName());
client = connectionProvider.getHTableClient(tableId);
}

@After
Expand Down Expand Up @@ -153,7 +155,7 @@ public void testDataStreamSink() throws Exception {
connectorOptions.getSchemaName(),
connectorOptions.getTableName()),
physicalSchema)),
null,
DataChangeRecord.KeyExtractor.simple(),
new OBKVHBaseRecordFlusher(connectionProvider));

List<RowData> dataSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.util.function.SerializableFunction;

public class OceanBaseSink<T> implements Sink<T> {

Expand All @@ -32,14 +31,14 @@ public class OceanBaseSink<T> implements Sink<T> {
private final ConnectorOptions options;
private final TypeSerializer<T> typeSerializer;
private final RecordSerializationSchema<T> recordSerializer;
private final SerializableFunction<DataChangeRecord, Object> keyExtractor;
private final DataChangeRecord.KeyExtractor keyExtractor;
private final RecordFlusher recordFlusher;

public OceanBaseSink(
ConnectorOptions options,
TypeSerializer<T> typeSerializer,
RecordSerializationSchema<T> recordSerializer,
SerializableFunction<DataChangeRecord, Object> keyExtractor,
DataChangeRecord.KeyExtractor keyExtractor,
RecordFlusher recordFlusher) {
this.options = options;
this.typeSerializer = typeSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.SerializableFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,7 +49,7 @@ public class OceanBaseWriter<T> implements SinkWriter<T> {
private final SinkWriterMetricGroup metricGroup;
private final TypeSerializer<T> typeSerializer;
private final RecordSerializationSchema<T> recordSerializer;
private final SerializableFunction<DataChangeRecord, Object> keyExtractor;
private final DataChangeRecord.KeyExtractor keyExtractor;
private final RecordFlusher recordFlusher;

private final List<SchemaChangeRecord> schemaChangeRecords = new ArrayList<>(1);
Expand All @@ -70,7 +69,7 @@ public OceanBaseWriter(
Sink.InitContext initContext,
TypeSerializer<T> typeSerializer,
RecordSerializationSchema<T> recordSerializer,
SerializableFunction<DataChangeRecord, Object> keyExtractor,
DataChangeRecord.KeyExtractor keyExtractor,
RecordFlusher recordFlusher) {
this.options = options;
this.metricGroup = initContext.metricGroup();
Expand Down Expand Up @@ -119,12 +118,11 @@ private void addToBuffer(Record record) {
if (record instanceof SchemaChangeRecord) {
synchronized (schemaChangeRecords) {
schemaChangeRecords.add((SchemaChangeRecord) record);
bufferCount++;
metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
}
} else if (record instanceof DataChangeRecord) {
DataChangeRecord dataChangeRecord = (DataChangeRecord) record;
Object key = keyExtractor == null ? null : keyExtractor.apply(dataChangeRecord);
Object key = keyExtractor == null ? null : keyExtractor.extract(dataChangeRecord);
if (key == null) {
synchronized (dataChangeRecordBuffer) {
dataChangeRecordBuffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.oceanbase.connector.flink.table;

import java.io.Serializable;

public class DataChangeRecord implements Record {

private static final long serialVersionUID = 1L;
Expand All @@ -25,6 +27,19 @@ public enum Type {
DELETE,
}

public interface KeyExtractor extends Serializable {

Object extract(DataChangeRecord record);

static KeyExtractor simple() {
return record ->
new DataChangeRecordData(
record.getTable().getKey().stream()
.map(record::getFieldValue)
.toArray());
}
}

private final Table table;
private final Type type;
private final DataChangeRecordData data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -118,6 +119,11 @@ public TableId getTableId() {
return tableId;
}

@Override
public List<String> getKey() {
return Collections.singletonList(rowKeyName);
}

public String getRowKeyName() {
return rowKeyName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package com.oceanbase.connector.flink.table;

import java.io.Serializable;
import java.util.List;

public interface Table extends Serializable {

TableId getTableId();

List<String> getKey();

Integer getFieldIndex(String fieldName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public TableId getTableId() {
return tableId;
}

@Override
public List<String> getKey() {
return primaryKey;
}

@Override
public Integer getFieldIndex(String fieldName) {
return fieldIndexMap.get(fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.oceanbase.connector.flink;

import com.oceanbase.connector.flink.connection.CompatibleMode;
import com.oceanbase.connector.flink.utils.OptionUtils;

import org.apache.flink.configuration.ConfigOption;
Expand Down Expand Up @@ -89,12 +90,16 @@ public class OceanBaseConnectorOptions extends ConnectorOptions {

public OceanBaseConnectorOptions(Map<String, String> config) {
super(config);
CompatibleMode compatibleMode = getCompatibleMode();
if (CompatibleMode.MYSQL != compatibleMode && CompatibleMode.ORACLE != compatibleMode) {
throw new IllegalArgumentException("Unsupported compatible mode: " + compatibleMode);
}
if (getUrl().contains("mysql")) {
if (!getDriverClassName().contains("mysql")) {
throw new IllegalArgumentException(
"Wrong 'driver-class-name', should use mysql driver for url: " + getUrl());
}
if (!getCompatibleMode().equalsIgnoreCase("mysql")) {
if (!CompatibleMode.MYSQL.equals(compatibleMode)) {
throw new IllegalArgumentException(
"Wrong 'compatible-mode', the mysql driver can only be used on 'mysql' mode.");
}
Expand All @@ -114,8 +119,8 @@ public OceanBaseConnectorOptions(Map<String, String> config) {
}
}

public String getCompatibleMode() {
return allConfig.get(COMPATIBLE_MODE);
public CompatibleMode getCompatibleMode() {
return CompatibleMode.parse(allConfig.get(COMPATIBLE_MODE));
}

public String getDriverClassName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public String toString() {

public OceanBaseConnectionProvider(OceanBaseConnectorOptions options) {
this.options = options;
this.compatibleMode = CompatibleMode.parse(options.getCompatibleMode());
this.compatibleMode = options.getCompatibleMode();
this.dialect =
CompatibleMode.MYSQL.equals(compatibleMode)
? new OceanBaseMySQLDialect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.oceanbase.connector.flink.OceanBaseConnectorOptions;
import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider;
import com.oceanbase.connector.flink.table.DataChangeRecordData;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.OceanBaseRowDataSerializationSchema;
import com.oceanbase.connector.flink.table.TableId;
import com.oceanbase.connector.flink.table.TableInfo;
Expand Down Expand Up @@ -52,12 +52,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
connectorOptions.getSchemaName(),
connectorOptions.getTableName()),
physicalSchema)),
record ->
new DataChangeRecordData(
((TableInfo) record.getTable())
.getPrimaryKey().stream()
.map(record::getFieldValue)
.toArray()),
DataChangeRecord.KeyExtractor.simple(),
new OceanBaseRecordFlusher(connectorOptions, connectionProvider)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect;
import com.oceanbase.connector.flink.sink.OceanBaseRecordFlusher;
import com.oceanbase.connector.flink.sink.OceanBaseSink;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.OceanBaseRowDataSerializationSchema;
import com.oceanbase.connector.flink.table.OceanBaseTestData;
import com.oceanbase.connector.flink.table.OceanBaseTestDataSerializationSchema;
Expand Down Expand Up @@ -56,7 +57,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -93,18 +93,7 @@ public void testMultipleTableSink() throws Exception {
connectorOptions,
null,
new OceanBaseTestDataSerializationSchema(),
record ->
((TableInfo) record.getTable())
.getPrimaryKey().stream()
.map(
key -> {
Object value =
record.getFieldValue(key);
return value == null
? "null"
: value.toString();
})
.collect(Collectors.joining("#")),
DataChangeRecord.KeyExtractor.simple(),
new OceanBaseRecordFlusher(
connectorOptions,
new OceanBaseConnectionProvider(connectorOptions)));
Expand Down Expand Up @@ -213,18 +202,7 @@ public void testDataStreamSink() throws Exception {
connectorOptions.getSchemaName(),
connectorOptions.getTableName()),
physicalSchema)),
record ->
((TableInfo) record.getTable())
.getPrimaryKey().stream()
.map(
key -> {
Object value =
record.getFieldValue(key);
return value == null
? "null"
: value.toString();
})
.collect(Collectors.joining("#")),
DataChangeRecord.KeyExtractor.simple(),
new OceanBaseRecordFlusher(connectorOptions, connectionProvider));

List<RowData> dataSet =
Expand Down

0 comments on commit 668baa6

Please sign in to comment.