
CC-231: Add JDBC Sink Connector documentation
Restructure the docs into separate pages for the source and the sink

Also adding some 0.10.0 fields to the ConfigDef
shikhar committed Aug 1, 2016
1 parent babe78d commit ce2ecfe
Showing 9 changed files with 444 additions and 51 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
# Kafka Connect JDBC Connector

kafka-connect-jdbc is a [Kafka Connector](http://kafka.apache.org/documentation.html#connect)
for loading data from any JDBC-compatible database.
for loading data to and from any JDBC-compatible database.

# Development

30 changes: 30 additions & 0 deletions config/sink-quickstart-sqlite.properties
@@ -0,0 +1,30 @@
#
# Copyright 2016 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A simple example that copies from a topic to a SQLite database.
# The first few settings are required for all connectors:
# a name, the connector class to run, and the maximum number of tasks to create:
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# The topics to consume from - required for sink connectors like this one
topics=orders

# Configuration specific to the JDBC sink connector.
# We want to connect to a SQLite database stored in the file test.db and auto-create tables.
connection.url=jdbc:sqlite:test.db
auto.create=true
@@ -17,10 +17,10 @@
# A simple example that copies all tables from a SQLite database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-sqlite-jdbc-autoincrement
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC connector. In this example, we connect to a
# The remaining configs are specific to the JDBC source connector. In this example, we connect to a
# SQLite database stored in the file test.db, use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'test-sqlite-jdbc-', e.g.
# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.
6 changes: 4 additions & 2 deletions docs/index.rst
@@ -8,6 +8,8 @@ Contents:
.. toctree::
:maxdepth: 3

jdbc_connector
configuration_options
source_connector
source_config_options
sink_connector
sink_config_options
changelog
146 changes: 146 additions & 0 deletions docs/sink_config_options.rst
@@ -0,0 +1,146 @@
.. _sink-config-options:

JDBC Sink Configuration Options
-------------------------------

Connection
^^^^^^^^^^

``connection.url``
JDBC connection URL.

* Type: string
* Default: ""
* Importance: high

``connection.user``
JDBC connection user.

* Type: string
* Importance: high

``connection.password``
JDBC connection password.

* Type: password
* Importance: high
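
For example, a minimal connection block for a hypothetical local PostgreSQL database (the host, database name, and credentials below are placeholders) could look like::

    connection.url=jdbc:postgresql://localhost:5432/test
    connection.user=connect_user
    connection.password=connect_password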

Writes
^^^^^^

``table.name.format``
A format string for the destination table name, which may contain '${topic}' as a placeholder for the originating topic name.

For example, ``kafka_${topic}`` for the topic 'orders' will map to the table name 'kafka_orders'.

* Type: string
* Default: "${topic}"
* Importance: medium
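
As an illustrative snippet, prefixing every destination table name with ``kafka_`` would be configured as::

    table.name.format=kafka_${topic}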

``batch.size``
Specifies how many records to attempt to batch together for insertion, when possible.

* Type: int
* Default: 3000
* Importance: high


``insert.mode``
The insertion mode to use. Supported modes are:

`insert`

Use standard SQL ``INSERT`` statements.

`upsert`

Use the appropriate upsert semantics for the target database if it is supported by the connector, e.g. ``INSERT OR REPLACE``.

* Type: string
* Default: "insert"
* Importance: medium
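
For instance, switching from plain inserts to upserts is a one-line change, though it generally only makes sense together with a ``pk.mode`` other than `none` (see the primary key options below)::

    insert.mode=upsert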

.. _sink-pk-config-options:

Primary Keys
^^^^^^^^^^^^

``pk.mode``
The primary key mode; also refer to the ``pk.fields`` documentation for how the two interact. Supported modes are:

`none`

No keys utilized.

`kafka`

Kafka coordinates are used as the PK.

`record_key`

Field(s) from the record key are used, which may be a primitive or a struct.

`record_value`

Field(s) from the record value are used, which must be a struct.

* Type: string
* Default: "none"
* Importance: high

``pk.fields``
Comma-separated list of primary key field names. The runtime interpretation of this config depends on the ``pk.mode``:

`none`

Ignored as no fields are used as primary key in this mode.

`kafka`

Must be a trio of field names representing the Kafka coordinates; defaults to ``__connect_topic,__connect_partition,__connect_offset`` if empty.

`record_key`

If empty, all fields from the key struct will be used; otherwise, used to whitelist the desired fields. For a primitive key, only a single field name must be configured.

`record_value`

If empty, all fields from the value struct will be used; otherwise, used to whitelist the desired fields.

* Type: list
* Default: []
* Importance: medium
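
To illustrate the interplay of the two settings, a sketch that takes the primary key from a field named ``id`` in the record value (the field name is purely an example) would be::

    pk.mode=record_value
    pk.fields=id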

DDL Support
^^^^^^^^^^^

``auto.create``
Whether to automatically create the destination table, based on the record schema, by issuing ``CREATE`` if the table is found to be missing.

* Type: boolean
* Default: false
* Importance: medium

``auto.evolve``
Whether to automatically add columns to the table schema, when they are found to be missing relative to the record schema, by issuing ``ALTER``.

* Type: boolean
* Default: false
* Importance: medium
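
As a sketch, opting into both DDL features at once looks like the following; both settings default to false, so no DDL is issued unless explicitly enabled::

    auto.create=true
    auto.evolve=true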

Retries
^^^^^^^

``max.retries``
The maximum number of times to retry on errors before failing the task.

* Type: int
* Default: 10
* Importance: medium

``retry.backoff.ms``
The time in milliseconds to wait following an error before a retry attempt is made.

* Type: int
* Default: 3000
* Importance: medium
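
For example, a configuration that fails faster than the defaults (the values here are arbitrary) could use::

    max.retries=3
    retry.backoff.ms=1000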
152 changes: 152 additions & 0 deletions docs/sink_connector.rst
@@ -0,0 +1,152 @@
JDBC Sink Connector
===================

The JDBC sink connector allows you to export data from Kafka topics to any relational database with a JDBC driver.
By using JDBC, this connector can support a wide variety of databases without requiring a dedicated connector for each one.
The connector polls data from Kafka and writes it to the database based on the topics it subscribes to.
It is possible to achieve idempotent writes with upserts.
Auto-creation of tables and limited auto-evolution are also supported.

Quickstart
----------

To see the basic functionality of the connector, we'll be copying Avro data from a single topic to a local SQLite database.
This example assumes you are running Kafka and Schema Registry locally on the default ports.

.. note::
We use SQLite in these examples, but you can use your favorite database.
Follow the same steps, but adjust the ``connection.url`` setting for your database.
Confluent Platform includes JDBC drivers for SQLite and PostgreSQL,
but if you're using a different database you'll also need to make sure the JDBC driver is available on the Kafka Connect process's ``CLASSPATH``.

Let's create a configuration file for the connector.
This file is included with the connector in ``./etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties`` and contains the following settings::

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=orders
connection.url=jdbc:sqlite:test.db
auto.create=true

The first few settings are common settings you'll specify for all connectors, except for ``topics`` which is specific to sink connectors like this one.
The ``connection.url`` specifies the database to connect to, in this case a local SQLite database file.
Enabling ``auto.create`` allows us to rely on the connector for creating the table.

Now we can run the connector with this configuration.

.. sourcecode:: bash

$ ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-sqlite.properties

Now, we will produce a record into the `orders` topic.

.. sourcecode:: bash

$ bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product","type":"string"},{"name":"quantity","type":"int"},{"name":"price","type":"float"}]}'

The console producer is waiting for input. Copy and paste the following record into the terminal:

.. sourcecode:: bash

{"id": 999, "product": "foo", "quantity": 100, "price": 50}

Now if we query the database, we will see that the `orders` table was automatically created and contains the record.

.. sourcecode:: bash

$ sqlite3 test.db
sqlite> select * from orders;
foo|50.0|100|999

Features
--------

Data mapping
^^^^^^^^^^^^

The sink connector requires knowledge of schemas, so you should use a suitable converter, e.g. the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled.
Kafka record keys, if present, can be primitive types or a Connect struct, and the record value must be a Connect struct.
Fields being selected from Connect structs must be of primitive types.
If the data in the topic is not in a compatible format, implementing a custom ``Converter`` may be necessary.
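
For reference, a worker configured for the Avro converter with a locally running Schema Registry would typically contain settings along these lines (the URL assumes the default local Schema Registry port)::

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081

The quickstart above relies on ``connect-avro-standalone.properties``, which ships with equivalent converter settings.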

Key handling
^^^^^^^^^^^^

The default, with ``pk.mode`` set to `none`, is for no primary keys to be extracted,
which is not suitable for advanced usage such as upsert semantics, or when the connector is responsible for auto-creating the destination table.
There are different modes that enable using fields from the Kafka record key, the Kafka record value, or the Kafka coordinates of the record.

Refer to :ref:`primary key configuration options <sink-pk-config-options>` for further detail.
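
For instance, a sketch that builds the primary key from the record key, assuming the key is a primitive that should land in a column named ``order_id`` (a hypothetical name), would be::

    pk.mode=record_key
    pk.fields=order_id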

Idempotent writes
^^^^^^^^^^^^^^^^^

The default ``insert.mode`` is `insert`. If it is configured as `upsert`, the connector will use upsert semantics rather than plain `INSERT` statements.
Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, which provides idempotence.

If there are failures, the Kafka offset used for recovery may not be up-to-date with what was committed as of the time of the failure, which can lead to re-processing during recovery.
The upsert mode is highly recommended as it helps avoid constraint violations or duplicate data if records need to be re-processed.

Aside from failure recovery, the source topic may also naturally contain multiple records over time with the same primary key, making upserts desirable.
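
Putting these pieces together, a minimal sketch of an idempotent sink configuration, again using an illustrative ``id`` field from the record value as the primary key, would be::

    insert.mode=upsert
    pk.mode=record_value
    pk.fields=id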

As there is no standard syntax for upsert, the following table describes the database-specific DML that is used.

=========== ================================================
Database Upsert style
=========== ================================================
MySQL       `INSERT .. ON DUPLICATE KEY UPDATE ..`
Oracle `MERGE ..`
PostgreSQL `INSERT .. ON CONFLICT .. DO UPDATE SET ..`
SQLite `INSERT OR REPLACE ..`
SQL Server `MERGE ..`
Other *not supported*
=========== ================================================

Auto-creation and Auto-evolution
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. tip:: Make sure the JDBC user has the appropriate permissions for DDL.

If ``auto.create`` is enabled, the connector can `CREATE` the destination table if it is found to be missing.
The creation takes place online with records being consumed from the topic, since the connector uses the record schema as a basis for the table definition.
Primary keys are specified based on the key configuration settings.

If ``auto.evolve`` is enabled, the connector can perform limited auto-evolution by issuing `ALTER` on the destination table when it encounters a record for which a column is found to be missing.
Since data-type changes and removal of columns can be dangerous, the connector does not attempt to perform such evolutions on the table.
Addition of primary key constraints is also not attempted.

For both auto-creation and auto-evolution, the nullability of a column is based on the optionality of the corresponding field in the schema,
and default values are also specified based on the default value of the corresponding field if applicable.
We use the following mapping from Connect schema types to database-specific types:

+-------------+-----------------+-----------------+------------------+---------+----------------+
| Schema Type | MySQL | Oracle | PostgreSQL | SQLite | SQL Server |
+=============+=================+=================+==================+=========+================+
| INT8 | TINYINT | NUMBER(3,0) | SMALLINT | NUMERIC | TINYINT |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| INT16 | SMALLINT | NUMBER(5,0) | SMALLINT | NUMERIC | SMALLINT |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| INT32 | INT | NUMBER(10,0) | INT | NUMERIC | INT |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| INT64 | BIGINT | NUMBER(19,0) | BIGINT | NUMERIC | BIGINT |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| FLOAT32 | FLOAT | BINARY_FLOAT | REAL | REAL | REAL |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| FLOAT64 | DOUBLE | BINARY_DOUBLE | DOUBLE PRECISION | REAL | FLOAT |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| BOOLEAN | TINYINT | NUMBER(1,0) | BOOLEAN | NUMERIC | BIT |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| STRING | VARCHAR(256) | NVARCHAR2(4000) | TEXT | TEXT | VARCHAR(MAX) |
+-------------+-----------------+-----------------+------------------+---------+----------------+
| BYTES | VARBINARY(1024) | BLOB | BYTEA | BLOB | VARBINARY(MAX) |
+-------------+-----------------+-----------------+------------------+---------+----------------+

Auto-creation or auto-evolution is not supported for databases not mentioned here.

.. important::
For backwards-compatible table schema evolution, new fields in record schemas should be optional or have a default value. If you need to delete a field, the table schema should be manually
altered to either drop the corresponding column, assign it a default value, or make it nullable.
@@ -1,5 +1,5 @@
Configuration Options
---------------------
JDBC Source Configuration Options
---------------------------------

``connection.url``
JDBC connection URL for the database to load.
