Kafka Connect 2.0.1: JDBC Examples: Using Whitelists and Custom Queries

This section provides common usage scenarios including using whitelists and custom queries.

Using Whitelists

Use a whitelist to limit changes to a subset of tables in a MySQL database, using id and modifiedcolumns that are standard on all whitelisted tables to detect rows that have been modified. This mode is the most robust because it can combine the unique, immutable row IDs with modification timestamps to guarantee modifications are not missed even if the process dies in the middle of an incremental update query.

The following is an example of a whitelist.

Note: Before running this example, you need to create the stream /kafka-connect
name=mysql-whitelist-timestamp-source 
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=10  
connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=alice&password=secret 
table.whitelist=users,products,transactions  
mode=timestamp+incrementing 
timestamp.column.name=modified 
incrementing.column.name=id  
topic.prefix=/kafka-connect:mysql- 

Using Custom Queries

Use a custom query instead of loading tables to join data from multiple tables. As long as the query does not include its own filtering, you can still use the built-in modes for incremental queries (in this case, using a timestamp column).
Note: This limits you to a single output per connector and because there is no table name, the topic “prefix” is actually the full topic name in this case.

The following is an example of a custom query.

Note: Before running this example, you need to create the stream /kafka-connect
name=mysql-whitelist-timestamp-source 
        connector.class=io.confluent.connect.jdbc.JdbcSourceConnector 
        tasks.max=10  
        connection.url=jdbc:postgresql://postgres.example.com/test_db?user=bob&password=secret&ssl=true 
        query=SELECT users.id, 
        users.name, 
        transactions.timestamp, 
        transactions.user_id, 
        transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id) 
        mode=timestamp 
        timestamp.column.name=timestamp  
        topic.prefix=/kafka-connect:mysql-joined-data