Kafka Connect 2.0.1: HDFS Configuration Options

Use the following parameters to configure the Kafka Connect for MapR-ES HDFS connector.

Note: For the HDFS connector, both Avro and Parquet files can be written.

In standalone mode, specify the HDFS connector configuration in the quickstart-hdfs.properties file. You can also configure the offset storage location and the port for the REST interface, which are specified in the connect-standalone.properties file. See Kafka Connect 2.0.1: Configuring in Standalone Mode.

/opt/mapr/kafka-connect-hdfs/kafka-connect-hdfs-2.0.1/etc/kafka-connect-hdfs/quickstart-hdfs.properties
/opt/mapr/kafka/kafka-0.9.0/config/connect-standalone.properties
In distributed mode, HDFS connector configuration is provided in the POST and PUT requests when creating or modifying the connector. See Kafka Connect 2.0.1: POST /connectors and Kafka Connect 2.0.1: PUT /connectors/(string:name)/config for more information about using the REST API. Additional configurations such as defining the topics that will store the connector state, task configuration state, and connector offset state are specified in the connect-distributed.properties file. See Kafka Connect 2.0.1: Configuring in Distributed Mode .
/opt/mapr/kafka/kafka-0.9.0/config/connect-distributed.properties
Table 1. HDFS Configuration Parameters
Parameter Description

flush.size

Number of records written to MapR file system before invoking file commits.

  • Type: int
  • Default: ""

hdfs.url

The MapR file system connection URL. This configuration has the format of maprfs:://hostname:port and specifies the MapR file system to export data to.

  • Type: string
  • Default: ""

connect.hdfs.keytab

The path to the keytab file for the HDFS connector principal. This keytab file should only be readable by the connector user.

  • Type: string
  • Default: ""

connect.hdfs.principal

The principal used when MapR file system is using Kerberos for authentication.

  • Type: string
  • Default: ""

format.class

The format class used when writing data to MapR file system.

  • Type: string
  • Default: "io.confluent.connect.hdfs.avro.AvroFormat"
    Note: If you want to write to a Parquet set, use "io.confluent.connect.hdfs.parquet.ParquetFormat"

hadoop.conf.dir

The Hadoop configuration directory.

  • Type: string
  • Default: ""

hadoop.home

The Hadoop home directory.

  • Type: string
  • Default: ""

hdfs.authentication.kerberos

Specifies whether MapR file system uses Kerberos for authentication.

  • Type: boolean
  • Default: false

hdfs.namenode.principal

The Kerberos principal for CLDB.

  • Type: string
  • Default: ""

hive.conf.dir

The Hive configuration directory.

  • Type: string
  • Default: ""

hive.database

The database used when the connector creates tables in Hive.

  • Type: string
  • Default: "default"

hive.home

The Hive home directory.

  • Type: string
  • Default: ""

hive.integration

Specifies whether Hive is integrated when running the connector.

  • Type: boolean
  • Default: false

hive.metastore.uris

The Hive metastore URIs. Can be an IP address or fully-qualified domain name and port of the metastore host.

  • Type: string
  • Default: ""

logs.dir

Top-level MapR file system directory to store the write ahead logs.

  • Type: string
  • Default: "logs"

partitioner.class

The partitioner used when writing data to MapR file system. You can use DefaultPartitioner, which preserves the Kafka partitions; FieldPartitioner, which partitions the data to different directories according to the value of the partitioning field specified in partition.field.name; TimeBasedPartitioner, which partitions data according to the time ingested to MapR file system.

  • Type: string
  • Default: "io.confluent.connect.hdfs.partitioner.DefaultPartitioner"

rotate.interval.ms

The time interval (milliseconds) before invoking file commits. This configuration ensures that file commits are invoked every configured interval. This configuration is useful when data ingestion rate is low and the connector didn't write enough messages to commit files. The default value -1 means that this feature is disabled.

  • Type: long
  • Default: -1

schema.compatibility

The schema compatibility rule used when the connector is observing schema changes. The supported configurations are NONE, BACKWARD, FORWARD and FULL.

  • Type: string
  • Default: "NONE"

topics

A list of topics to use as input for the HDFS connector.
  • Type: string
  • Default: ""

topics.dir

Top-level MapR file system directory to store the data ingested from Kafka.

  • Type: string
  • Default: "topics"
locale

The locale used when partitioning with TimeBasedPartitioner.

  • Type: string
  • Default: ""

partition.duration.ms

The duration of a partition (milliseconds) used by TimeBasedPartitioner. The default value -1 means that TimeBasedPartitioner is not being used.

  • Type: long
  • Default: -1

partition.field.name

The name of the partitioning field when FieldPartitioner is used.

  • Type: string
  • Default: ""

path.format

This configuration is used to set the format of the data directories when partitioning with TimeBasedPartitioner. The format set in this configuration converts the Unix timestamp to proper directories strings. For example, if you setpath.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/, the data directories will have the format /year=2015/month=12/day=07/hour=15

  • Type: string
  • Default: ""

shutdown.timeout.ms

Clean shutdown timeout. This makes sure that asynchronous Hive metastore updates are completed during connector shutdown.

  • Type: long
  • Default: 3000

timezone

The timezone to use when partitioning with TimeBasedPartitioner.

  • Type: string
  • Default: ""

filename.offset.zero.pad.width

Sets the width to the zero-pad offsets in MapR file system filenames. If the offsets are too short it provides fixed width filenames that can be ordered by simple lexicographic sorting.

  • Type: int
  • Default: 10

kerberos.ticket.renew.period.ms

The period in milliseconds to renew the Kerberos ticket.

  • Type: long
  • Default: 3600000 (milliseconds)

retry.backoff.ms

Used to notify Kafka Connect to retry delivering a message batch or performing recovery in case of transient exceptions. The retry backoff is in milliseconds.

  • Type: long
  • Default: 5000 (milliseconds)

schema.cache.size

The sized of the schema cache used in the Avro converter.

  • Type: int
  • Default: 1000

storage.class

The underlying storage layer. The default is MapR file system.

  • Type: string
  • Default: "io.confluent.connect.hdfs.storage.HdfsStorage"