API for MapR Streams Python Client

MapR Streams Python Client is binding for librdkafka and it supports the following APIs.

class mapr_streams_python.Consumer

A high-level Kafka Consumer.

Method Behavior
Consumer(**kwargs) Create new Consumer instance using provided configuration dictionary.
assign(partitions) Set consumer partition assignment to the provided list of TopicPartition and starts consuming.
  • partitions (list(TopicPartition)) – List of topic+partitions and optionally initial offsets to start consuming
close() Close down and terminate the Kafka Consumer.
  • Stops consuming
  • Commits offsets
  • Leave consumer group
commit([message=None][, offsets=None][, async=True])

Commit a message or a list of offsets.

Message and offsets are mutually exclusive, if neither is set the current partition assignment’s offsets are used instead.

  • message (confluent_kafka.Message) – Commit message’s offset+1.
  • offsets (list(TopicPartition)) – List of topic+partitions+offsets to commit.
  • async (bool) – Asynchronous commit, return immediately.
committed(partitions[, timeout=None])

Retrieve committed offsets for the list of partitions.

  • partitions (list(TopicPartition)) - List of topic+partitions to query for stored offsets.
  • timeout (float) – Request timeout

Returns: List of topic+partitions with offset and possibly error set.

Return type: list(TopicPartition)

Raises: KafkaException

Note: MapR Streams offset starts at 1.
on_commit(err, partitions)

A callback for Consumer.commit() that triggers custom actions when a commit request completes.

  • err (KafkaError) – Commit error object, or None on success.
  • Partitions (list(TopicPartition)) – List of partitions with their committed offsets or per-partition errors

Consume messages, calls callbacks and returns events.

The application must check the returned Message object’s Message.error() method to distinguish between proper messages (error() returns None), or an event or error (see error().code() for specifics).

Parameter(s): timeout (float) – Maximum time to block waiting for message, event or callback

Returns: A Message object or None on timeout

Return type: Message or None

position(partitions[, timeout=None])

Retrieve current positions (offsets) for the list of partitions.

Parameter(s): partitions (list(TopicPartition)) – List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1

Returns: List of topic+partitions with offset and possibly error set.

Return type: list(TopicPartition)

Raises: KafkaException

This function returns 0 when the messages have not yet been consumed from partitions. librdkafka returns -1001 instead.
subscribe(topics[, listener=None])

Set subscription to supplied list of topics This replaces a previous subscription.

  • topics (list(str)) – List of topics (strings) to subscribe to.

  • on_assign (callable) – callback to provide handling of customized offsets on completion of a successful partition re-assignment.

  • on_revoke (callable) – callback to provide handling of offset commits to a customized store on the start of a rebalance operation.

Raises: KafkaException
Note: You cannot use the rd_kafka_subscribe API to subscribe a consumer to topics when that consumer is already assigned to topics. If you call this API for an assigned consumer, error RD_KAFKA_RESP_ERR__CONFLICT is returned.
on_assign(consumer, partitions) Same as librdkafka.
unsubscribe() Same as librdkafka.
on_revoke(consumer, partitions) Parameter(s):
  • consumer (Consumer) – Consumer instance.

  • partitions (list(TopicPartition)) – Absolute list of partitions being assigned or revoked.

сlass mapr_streams_python.Producer

Asynchronous Kafka Producer.

Method Behavior

Create new Producer instance using provided configuration dict.

len() This API returns a positive number to indicate that messages are waiting to be produced to a MapR streams topic but the value does not indicate the actual number of messages. librdkafka returns the actual number of messages that are waiting to be sent to or acknowledged by the broker.

Return type: int


Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len()is zero.


Polls the producer for events and calls the corresponding callbacks (if registered).

  • timeout (float) – Maximum time to block waiting for events

Returns: Number of events processed (callbacks served).

Return type: int

produce(topic[, value][, key][, partition][, callback])

Produce message to topic. This is an asynchronous operation, an application may use the callback( alias on_delivery) argument to pass a function (or lambda) that will be called from poll() when the message has been successfully delivered or permanently fails delivery.

  • topic (str) – Topic to produce message to

  • value (str|bytes) – Message payload
  • key (str|bytes) – Message key
  • partition (int) – Partition to produce to, else uses the configured partitioner.
  • on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed deliver
  • BufferError – if the internal producer message queue is full (queue.buffering.max.messages exceeded)
  • KafkaException – for other errors, see exception code
Note: When this function is called with NULL payload, an invalid argument error is sent to the callback. librdkafka creates a message with NULL payload and key value instead.

class mapr_streams_python.Message

The Message object represents either a single consumed or produced message, or an event . An application must check with error() to see if the object is a proper message (error() returns None) or an error/event. This class is not user-instantiable.

Method Behavior

Returns: Message value (payload) size in bytes.

Return type: int


The message object is also used to propagate errors and events. Applications must check error() to determine if the Message is a proper message (error() returns None) or an error or event (error() returns a KafkaError object)

Return type: None or KafkaError


Returns: message key or None if not available

Return type: str|bytes or None


Returns: message offset or None if not available

Return type: int or None


Returns: partition number or None if not available

Return type: int or None


Returns: topic name or None if not available

Return type: str or None


Returns: message value (payload) or None if not available

Return type: str|bytes or None

class mapr_streams_python.TopicPartition

TopicPartition is a generic type to hold a single partition and various information about it. It is typically used to provide a list of topics or partitions for various operations, such as Consumer.assign().

Method Behavior
TopicPartition(topic[, partition][, offset])

Instantiate a TopicPartition object.

  • topic (string) – Topic name

  • partition (int) – Partition id

  • offset (int) – Initial partition offset

Return type: TopicPartition


Attribute that indicates an error (with KafkaError) unless None.

offset Attribute for offset.
partition Attribute for partition number.
topic Attribute for topic name.

class mapr_streams_python.KafkaError

Kafka error and event object.

The KafkaError class serves multiple purposes:
  • Propagation of errors
  • Propagation of events
  • Exceptions

This class is not user-instantiable.

Method Behavior

Returns the error/event code for comparison toKafkaError.<ERR_CONSTANTS>.

Returns: error/event code

Return type: int


Returns the enum name for error/event.

Returns: error/event enum name string

Return type: str


Returns the human-readable error/event string.

Returns: error/event enum message string

Return type: str