Experiment with Kafka-Connect with PostgreSQL Connector Source and MongoDb Connector sink. I'm using a raw confluentinc kafka connect image, so I'm added the plugins in the volumes section of the image.
- make a POST request to http://localhost:8083/connectors with the next configuration bodies
{
"name": "pg-source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "commerce",
"topic.prefix": "commerce",
"table.include.list": "public.products"
}
}
{
"name": "mongodb-sink-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max": "1",
"topics": "commerce.public.products",
"connection.uri": "mongodb://mongouser:mongopass@mongodb:27017",
"database": "commerce",
"collection": "products",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list": "id",
"document.id.strategy.partial.value.projection.type": "AllowList",
"transforms": "extractPayload,extractAfter",
"transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractPayload.field": "payload",
"transforms.extractAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractAfter.field": "after"
}
}
- Connect:
mongosh "mongodb://mongouser:mongopass@mongodb:27017/commerce?authSource=admin"
- see dbs:
show databases
- see collections:
show collections
- use a db:
use <db-name>
- find records:
db.<collection>.find()