Kafka Connect 2.0.1: Configuring in Distributed Mode

This section describes how to configure and run workers in distributed mode.

Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, multiple worker processes are started using the same group.id. These processes automatically coordinate to schedule execution of connectors and tasks across all available workers. If a worker is added, shuts down, or fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers.

Note: Distributed mode is available for MEP 2.0.1 and MEP 3.0.0.
The following diagram illustrates a three-node Kafka Connect distributed mode cluster where:
  • Connectors monitor the source or sink system for changes that require reconfiguring tasks.
  • Tasks, each of which copies a subset of a connector’s data, are automatically balanced across the active workers.
  • The division of work between tasks is shown by the partitions that each task is assigned.

You interact with a distributed-mode cluster through the REST API rather than the command line. To create a connector, start the workers and then send a REST request to any of them. See Kafka Connect 2.0.1: REST API.

Note: Kafka Connect workers do not have a special “leader” process that you have to interact with to use the REST API. All nodes can respond to REST requests, including creating, listing, modifying, and destroying connectors.
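
As a rough illustration of the REST interaction described above, the following shell sketch posts a connector definition to a worker and then lists the registered connectors. It assumes a worker is reachable at localhost on port 8083 (adjust to your rest.port value) and uses the FileStreamSourceConnector class that ships with Apache Kafka. The connector name, file path, and topic name are hypothetical placeholders, not values from this documentation.

```shell
# Sketch only: host, port, connector name, file, and topic are assumptions.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print a JSON connector definition (hypothetical example values).
connector_payload() {
  cat <<'EOF'
{
  "name": "example-file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/example-input.txt",
    "topic": "/example-stream:example-topic"
  }
}
EOF
}

# POST the definition to the worker's /connectors endpoint.
create_connector() {
  connector_payload | curl -sS -X POST \
    -H "Content-Type: application/json" \
    --data @- "$CONNECT_URL/connectors"
}

# GET the names of all connectors known to the cluster.
list_connectors() {
  curl -sS "$CONNECT_URL/connectors"
}
```

Once the workers are running, calling create_connector followed by list_connectors should show the new connector. Because no worker acts as a special leader, CONNECT_URL can point at any worker in the cluster.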

To run the worker in distributed mode:

  1. In the connect-distributed.properties file, define the topics that will store the connector state, task configuration state, and connector offset state.

    In distributed mode, the workers need to be able to discover each other and have shared storage for connector configuration and offset data. In addition to the usual worker settings, ensure you have configured the following for the cluster:

    • group.id - ID that uniquely identifies the cluster these workers belong to. Ensure this is unique for all groups that work with a cluster.
    • config.storage.topic - Topic to store the connector and task configuration state in. Although this topic can be auto-created if your cluster has auto topic creation enabled, it is highly recommended that you create it before starting the cluster. This topic should always have a single partition and be highly replicated (3x or more).
    • offset.storage.topic - Topic to store the connector offset state in. To support large MapR-ES clusters, this topic should have a large number of partitions (for example, 25 or 50 partitions) and be highly replicated (3x or more).
    • rest.port - Port where the REST interface listens for HTTP requests. If you run more than one worker per host (for example, if you are testing distributed mode locally during development), this setting must have different values for each instance.
  2. Set the group.id value for all of the workers in the cluster.
    Note: All workers that belong to the same cluster must have the same group.id value.
  3. Run the distributed worker command, connect-distributed.sh.
    For example:
    cd /opt/mapr/kafka/kafka-0.9.0/
    ./bin/connect-distributed.sh ./config/connect-distributed.properties
Note: Distributed mode does not have any additional command line parameters. If other instances are already running, new workers either start a new group or join an existing one, and then wait for work to do. For information on managing the connectors running in the cluster, see Kafka Connect 2.0.1: REST API.
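
The cluster settings from step 1 can be sketched as follows for a local test in which two workers share one host. This is a minimal sketch and not a complete worker configuration: it omits the usual worker settings, and the group name, topic names (written in stream:topic form as an assumption), ports, and output directory are all hypothetical. It shows that group.id and the storage topics are identical for both workers, while rest.port differs.

```shell
# Sketch only: every value below is a hypothetical placeholder.
CONF_DIR="${CONF_DIR:-/tmp/connect-example}"
mkdir -p "$CONF_DIR"

# Write the cluster-related settings for one worker.
# $1 = worker number, $2 = REST port for that worker
write_worker_config() {
  cat > "$CONF_DIR/worker-$1.properties" <<EOF
# Identical on every worker in the same cluster
group.id=example-connect-cluster
config.storage.topic=/example-stream:connect-configs
offset.storage.topic=/example-stream:connect-offsets
# Must differ between workers that run on the same host
rest.port=$2
EOF
}

write_worker_config 1 8083
write_worker_config 2 8084
```

Each generated fragment would still need to be merged with the remaining worker settings before being passed to connect-distributed.sh, with one worker process started per file.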