In this project we are going to sync a table in PostgreSQL with an Elasticsearch index, so that every change to the table is reflected in the index too. For the sync we will use Kafka Connect.
Dependencies:
- Docker
- curl or any other tool for sending HTTP requests
Clone the repo, cd into the directory, and check out the appropriate branch.
From the kafka-connect-CDC-example directory:
$ docker-compose up -d
And that is it! Wait a few minutes and then you can check that all the services are up.
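One quick check from the terminal:
$ docker-compose ps
Every container should show a State of "Up".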
I also recommend opening the Control Center and checking that everything is OK. It should look like this:
Note that the cluster is marked as "Unhealthy" because it has fewer than 3 brokers.
Next, let's verify that Postgres and ES started fine.
For ES, use:
$ curl http://localhost:9200
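If ES is up, this returns a small JSON document with the cluster name and version. For a quick health check you can also hit the cluster health API:
$ curl "http://localhost:9200/_cluster/health?pretty"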
For Postgres, we will try to connect to the DB using another container:
$ docker run -it --network kafkaconnectcdcexample_default --name psql \
--link postgres:postgres --rm postgres sh -c 'psql -h postgres -U postgres -d cdc';
When prompted with Password for user postgres:, enter postgres as the password. You should then see the psql prompt:
psql (12.3 (Debian 12.3-1.pgdg100+1), server 11.8 (Debian 11.8-1.pgdg90+1))
Type "help" for help.
cdc=#
We are going to have two connectors: a source connector that reads from Postgres, and a sink connector that inserts the data into ES. To create them we will use curl:
Postgres source:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @postgres-source.json http://localhost:8083/connectors;
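For reference, here is a minimal sketch of what postgres-source.json might contain, assuming the Debezium Postgres connector (which matches the dbserver.public.customers topic name used later). The repo's actual file may differ:
# illustrative only -- hostname, credentials, and server name are assumptions
$ cat postgres-source.json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "cdc",
    "database.server.name": "dbserver"
  }
}
Here database.server.name is the prefix Debezium uses when naming topics, which is how we end up with a topic called dbserver.public.customers.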
And the ES sink:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" -d @es-sink.json http://localhost:8083/connectors;
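Similarly, a sketch of what es-sink.json might look like, assuming Confluent's Elasticsearch sink connector plus Debezium's unwrap transform so that updates and deletes are applied as plain documents (again, the repo's actual file may differ):
# illustrative only -- the ES hostname and transform setup are assumptions
$ cat es-sink.json
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "dbserver.public.customers",
    "key.ignore": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "behavior.on.null.values": "delete"
  }
}
Either way, you can confirm both connectors were created:
$ curl http://localhost:8083/connectors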
Finally, we can try adding and editing records and watch the changes show up in ES. Let's connect to Postgres again:
$ docker run -it --network kafkaconnectcdcexample_default --name psql \
--link postgres:postgres --rm postgres sh -c 'psql -h postgres -U postgres -d cdc';
When prompted with Password for user postgres:, enter postgres as the password. You should then see the psql prompt:
psql (12.3 (Debian 12.3-1.pgdg100+1), server 11.8 (Debian 11.8-1.pgdg90+1))
Type "help" for help.
cdc=#
Next, create the 'customers' table and add some records:
CREATE TABLE customers (
user_id serial PRIMARY KEY,
username VARCHAR ( 50 ) UNIQUE NOT NULL,
password VARCHAR ( 50 ) NOT NULL,
email VARCHAR ( 255 ) UNIQUE NOT NULL,
created_on TIMESTAMP NOT NULL,
last_login TIMESTAMP
);
INSERT INTO customers (username, password, email, created_on, last_login)
VALUES ('user1','pass3','user1@example.com',current_timestamp,current_timestamp),
       ('user2','pass3','user2@example.com',current_timestamp,current_timestamp),
       ('david','pass4','david@example.com',current_timestamp,current_timestamp);
Now we can see them in ES:
$ curl "http://localhost:9200/dbserver.public.customers/_search?pretty"
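You can also query for a single record with ES's query string syntax, e.g. to fetch david (the field names match the table columns):
$ curl "http://localhost:9200/dbserver.public.customers/_search?q=username:david&pretty"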
And if we change some records:
UPDATE customers
SET password = 'newpass2'
WHERE username = 'david';
DELETE FROM customers
WHERE username = 'user1';
The corresponding documents will be updated (and deleted) in ES!
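To confirm, re-run the search from above: the david document should now contain newpass2, and the user1 document should be gone (deletes work because the sink turns Debezium tombstones into ES deletes, as in the sketch above):
$ curl "http://localhost:9200/dbserver.public.customers/_search?pretty"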
Service | Address |
---|---|
Control Center | http://localhost:9021 |
Kafka | localhost:9092 |
Zookeeper | localhost:2181 |
Kafka Connect | http://localhost:8083 |
Postgres | localhost:5432 |
Elasticsearch | http://localhost:9200 |