Skip to content

Commit

Permalink
fix : add sync-write option & fix tableId and buffer key conflicts (#51)
Browse files Browse the repository at this point in the history
* fix : add sync-write option and fix tableId and buffer key conflicts

* check non-pk table record type

* disable scheduled flushing when buffer-flush.interval is zero value
  • Loading branch information
whhe authored Jan 26, 2024
1 parent 0177cf0 commit 7554fb6
Show file tree
Hide file tree
Showing 28 changed files with 465 additions and 259 deletions.
3 changes: 2 additions & 1 deletion docs/sink/flink-connector-obkv-hbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ Once executed, the records should have been written to OceanBase.
| sys.username | Yes | | String | The username of sys tenant. |
| sys.password | Yes | | String | The password of sys tenant. |
| hbase.properties | No | | String | Properties to configure 'obkv-hbase-client-java', multiple values are separated by semicolons. |
| buffer-flush.interval | No | 1s | Duration | Buffer flush interval. |
| sync-write | No | false | Boolean | Whether to write data synchronously, will not use buffer if it's set to 'true'. |
| buffer-flush.interval | No | 1s | Duration | Buffer flush interval. Set '0' to disable scheduled flushing. |
| buffer-flush.buffer-size | No | 1000 | Integer | Buffer size. |
| max-retries | No | 3 | Integer | Max retry times on failure. |

Expand Down
27 changes: 14 additions & 13 deletions docs/sink/flink-connector-obkv-hbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,20 @@ VALUES ('1', ROW ('r1f1c1', 'r1f1c2')),

## 配置项

| 参数名 | 是否必需 | 默认值 | 类型 | 描述 |
|--------------------------|------|------|----------|---------------------------------------------------------------------------|
| url || | String | 集群的 config url,可以通过 <code>SHOW PARAMETERS LIKE 'obconfig_url'</code> 查询。 |
| schema-name || | String | OceanBase 的 db 名。 |
| table-name || | String | HBase 表名,注意在 OceanBase 中表名的结构是 <code>hbase_table_name$family_name</code>。 |
| username || | String | 非 sys 租户的用户名。 |
| password || | String | 非 sys 租户的密码。 |
| sys.username || | String | sys 租户的用户名。 |
| sys.password || | String | sys 租户用户的密码。 |
| hbase.properties || | String | 配置 'obkv-hbase-client-java' 的属性,多个值用分号分隔。 |
| buffer-flush.interval || 1s | Duration | 缓冲区刷新周期。 |
| buffer-flush.buffer-size || 1000 | Integer | 缓冲区大小。 |
| max-retries || 3 | Integer | 失败重试次数。 |
| 参数名 | 是否必需 | 默认值 | 类型 | 描述 |
|--------------------------|------|-------|----------|---------------------------------------------------------------------------|
| url || | String | 集群的 config url,可以通过 <code>SHOW PARAMETERS LIKE 'obconfig_url'</code> 查询。 |
| schema-name || | String | OceanBase 的 db 名。 |
| table-name || | String | HBase 表名,注意在 OceanBase 中表名的结构是 <code>hbase_table_name$family_name</code>。 |
| username || | String | 非 sys 租户的用户名。 |
| password || | String | 非 sys 租户的密码。 |
| sys.username || | String | sys 租户的用户名。 |
| sys.password || | String | sys 租户用户的密码。 |
| hbase.properties || | String | 配置 'obkv-hbase-client-java' 的属性,多个值用分号分隔。 |
| sync-write || false | Boolean | 是否开启同步写,设置为 true 时将不使用 buffer 直接写入数据库。 |
| buffer-flush.interval || 1s | Duration | 缓冲区刷新周期。设置为 '0' 时将关闭定期刷新。 |
| buffer-flush.buffer-size || 1000 | Integer | 缓冲区大小。 |
| max-retries || 3 | Integer | 失败重试次数。 |

## 参考信息

Expand Down
3 changes: 2 additions & 1 deletion docs/sink/flink-connector-oceanbase.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ Once executed, the records should have been written to OceanBase.
| cluster-name | No | No | | String | The cluster name of OceanBase, required when 'partition.enabled' is 'true'. |
| tenant-name | No | No | | String | The tenant name of OceanBase, required when 'partition.enabled' is 'true'. |
| druid-properties | No | No | | String | Druid connection pool properties, multiple values are separated by semicolons. |
| buffer-flush.interval | No | No | 1s | Duration | Buffer flush interval. |
| sync-write | No | No | false | Boolean | Whether to write data synchronously, will not use buffer if it's set to 'true'. |
| buffer-flush.interval | No | No | 1s | Duration | Buffer flush interval. Set '0' to disable scheduled flushing. |
| buffer-flush.buffer-size | No | No | 1000 | Integer | Buffer size. |
| max-retries | No | No | 3 | Integer | Max retry times on failure. |
| memstore-check.enabled | No | No | true | Boolean | Whether enable memstore check. |
Expand Down
3 changes: 2 additions & 1 deletion docs/sink/flink-connector-oceanbase_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ VALUES (1, 'Tom', 99),
| cluster-name ||| | String | 集群名,'partition.enabled' 为 true 时为必填。 |
| tenant-name ||| | String | 租户名,'partition.enabled' 为 true 时为必填。 |
| druid-properties ||| | String | Druid 连接池属性,多个值用分号分隔。 |
| buffer-flush.interval ||| 1s | Duration | 缓冲区刷新周期。 |
| sync-write ||| false | Boolean | 是否开启同步写,设置为 true 时将不使用 buffer 直接写入数据库。 |
| buffer-flush.interval ||| 1s | Duration | 缓冲区刷新周期。设置为 '0' 时将关闭定期刷新。 |
| buffer-flush.buffer-size ||| 1000 | Integer | 缓冲区大小。 |
| max-retries ||| 3 | Integer | 失败重试次数。 |
| memstore-check.enabled ||| true | Boolean | 是否开启内存检查。 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(OBKVHBaseConnectorOptions.SYNC_WRITE);
options.add(OBKVHBaseConnectorOptions.BUFFER_FLUSH_INTERVAL);
options.add(OBKVHBaseConnectorOptions.BUFFER_SIZE);
options.add(OBKVHBaseConnectorOptions.MAX_RETRIES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.oceanbase.connector.flink.connection;

import com.oceanbase.connector.flink.OBKVHBaseConnectorOptions;
import com.oceanbase.connector.flink.table.TableId;
import com.oceanbase.connector.flink.utils.TableCache;

import com.alipay.oceanbase.hbase.OHTableClient;
Expand Down Expand Up @@ -49,15 +50,16 @@ private TableCache<HTableInterface> getHTableCache() {
return hTableCache;
}

public HTableInterface getHTableClient(String databaseName, String tableName) {
String tableId = databaseName + "." + tableName;
public HTableInterface getHTableClient(TableId tableId) {
return getHTableCache()
.get(
tableId,
tableId.identifier(),
() -> {
try {
OHTableClient tableClient =
new OHTableClient(tableName, getConfig(databaseName));
new OHTableClient(
tableId.getTableName(),
getConfig(tableId.getSchemaName()));
tableClient.init();
return tableClient;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package com.oceanbase.connector.flink.sink;

import com.oceanbase.connector.flink.OBKVHBaseConnectorOptions;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.HTableInfo;
import com.oceanbase.connector.flink.table.OBKVHBaseRowDataSerializationSchema;
import com.oceanbase.connector.flink.table.TableId;

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
Expand All @@ -42,10 +44,11 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
typeSerializer,
new OBKVHBaseRowDataSerializationSchema(
new HTableInfo(
connectorOptions.getSchemaName(),
connectorOptions.getTableName(),
new TableId(
connectorOptions.getSchemaName(),
connectorOptions.getTableName()),
physicalSchema)),
null,
DataChangeRecord.KeyExtractor.simple(),
new OBKVHBaseRecordFlusher(connectorOptions)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,17 @@

public class OBKVHBaseRecordFlusher implements RecordFlusher {

private final OBKVHBaseConnectorOptions options;
private final OBKVHBaseConnectionProvider connectionProvider;

public OBKVHBaseRecordFlusher(OBKVHBaseConnectorOptions options) {
this.connectionProvider = new OBKVHBaseConnectionProvider(options);
this(options, new OBKVHBaseConnectionProvider(options));
}

public OBKVHBaseRecordFlusher(
OBKVHBaseConnectorOptions options, OBKVHBaseConnectionProvider connectionProvider) {
this.options = options;
this.connectionProvider = connectionProvider;
}

@Override
Expand Down Expand Up @@ -98,8 +105,7 @@ public void flush(List<DataChangeRecord> batch) throws Exception {
}

flush(
connectionProvider.getHTableClient(
tableInfo.getDatabaseName(), tableInfo.getTableName()),
connectionProvider.getHTableClient(tableInfo.getTableId()),
familyPutListMap,
familyDeleteListMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider;
import com.oceanbase.connector.flink.sink.OBKVHBaseRecordFlusher;
import com.oceanbase.connector.flink.sink.OceanBaseSink;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.HTableInfo;
import com.oceanbase.connector.flink.table.OBKVHBaseRowDataSerializationSchema;
import com.oceanbase.connector.flink.table.TableId;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
Expand Down Expand Up @@ -87,7 +89,8 @@ protected Map<String, String> getOptions() {
public void before() throws Exception {
OBKVHBaseConnectorOptions options = new OBKVHBaseConnectorOptions(getOptions());
OBKVHBaseConnectionProvider connectionProvider = new OBKVHBaseConnectionProvider(options);
client = connectionProvider.getHTableClient(OB_SERVER.getDatabaseName(), getTestTable());
TableId tableId = new TableId(options.getSchemaName(), options.getTableName());
client = connectionProvider.getHTableClient(tableId);
}

@After
Expand Down Expand Up @@ -141,10 +144,11 @@ public void testDataStreamSink() throws Exception {
null,
new OBKVHBaseRowDataSerializationSchema(
new HTableInfo(
connectorOptions.getSchemaName(),
connectorOptions.getTableName(),
new TableId(
connectorOptions.getSchemaName(),
connectorOptions.getTableName()),
physicalSchema)),
null,
DataChangeRecord.KeyExtractor.simple(),
new OBKVHBaseRecordFlusher(connectorOptions));

List<RowData> dataSet =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,19 @@ public abstract class ConnectorOptions implements Serializable {
.noDefaultValue()
.withDescription("The table name.");

public static final ConfigOption<Boolean> SYNC_WRITE =
ConfigOptions.key("sync-write")
.booleanType()
.defaultValue(false)
.withDescription("Whether to write synchronously.");

public static final ConfigOption<Duration> BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription(
"The flush interval, over this time, asynchronous threads will flush data. Default value is '1s'.");
"The flush interval, over this time, asynchronous threads will flush data. Default value is '1s'. "
+ "If it's set to zero value like '0', scheduled flushing will be disabled.");

public static final ConfigOption<Integer> BUFFER_SIZE =
ConfigOptions.key("buffer-flush.buffer-size")
Expand Down Expand Up @@ -103,6 +110,10 @@ public String getTableName() {
return allConfig.get(TABLE_NAME);
}

public boolean getSyncWrite() {
return allConfig.get(SYNC_WRITE);
}

public long getBufferFlushInterval() {
return allConfig.get(BUFFER_FLUSH_INTERVAL).toMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.util.function.SerializableFunction;

public class OceanBaseSink<T> implements Sink<T> {

Expand All @@ -32,14 +31,14 @@ public class OceanBaseSink<T> implements Sink<T> {
private final ConnectorOptions options;
private final TypeSerializer<T> typeSerializer;
private final RecordSerializationSchema<T> recordSerializer;
private final SerializableFunction<DataChangeRecord, String> keyExtractor;
private final DataChangeRecord.KeyExtractor keyExtractor;
private final RecordFlusher recordFlusher;

public OceanBaseSink(
ConnectorOptions options,
TypeSerializer<T> typeSerializer,
RecordSerializationSchema<T> recordSerializer,
SerializableFunction<DataChangeRecord, String> keyExtractor,
DataChangeRecord.KeyExtractor keyExtractor,
RecordFlusher recordFlusher) {
this.options = options;
this.typeSerializer = typeSerializer;
Expand Down
Loading

0 comments on commit 7554fb6

Please sign in to comment.