Developing a MapR Streams Python Application

This topic includes basic information about how to develop a MapR Streams Python application and an example program that you can run.

Before you Begin

Confirm that your environment meets the following requirements:
  • MapR cluster version 5.2.1 or greater.
  • MapR Streams C Client (mapr-librdkafka) is installed and configured on the node. See Configuring the MapR Streams C Client.
  • MapR Streams Python Client (mapr-streams-python) is installed on the node. See Installing MapR-ES Python Client.
  • Python 2.7.x and greater or Python 3.x and greater is installed on the node.

Create a MapR Streams Producer Application

In general, you want to create a producer that performs the following steps:
  1. Import the producer class.
  2. Define the producer and its configuration.
  3. Produce data.
  4. Wait for all messages to be sent to consumer.

The following example code produces three messages to a topic named mytopic in a stream named my_stream.

from mapr_streams_python import Producer
p = Producer({'streams.producer.default.stream': '/my_stream'})
some_data_source= ["msg1", "msg2", "msg3"]
for data in some_data_source:
     p.produce('mytopic', data.encode('utf-8'))
p.flush()

Create a MapR Streams Consumer Application

In general, you want to create a consumer that performs the following steps:
  1. Import the consumer class.
  2. Define the consumer and its configuration.
  3. Consume data.
  4. Wait for all messages to be consumed.
In the following example, the MapR Streams consumer is subscribed to my_stream/mytopic and it prints the content of each message that it reads.
from mapr_streams_python import Consumer, KafkaError
c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])
running = True
while running:
  msg = c.poll(timeout=1.0)
  if msg is None: continue
  if not msg.error():
    print('Received message: %s' % msg.value().decode('utf-8'))
  elif msg.error().code() != KafkaError._PARTITION_EOF:
    print(msg.error())
    running = False
c.close()

Run the Example Applications

To run the sample producer and consumer applications:
  1. Create a stream named mystream.
  2. Create a file named producer.py.
  3. Add the producer example code into the producer.py file.
  4. Create a file named consumer.py.
  5. Add the consumer example code into the consumer.py file.
  6. Verify that you have completed the steps to configure the MapR Streams C client or complete the steps now. See Configuring the MapR Streams C Client.
    Note: The MapR Streams Python Client is dependent on the MapR Streams C Client. Therefore, the MapR Streams C Client must be configured before you can run the application.
  7. Run producer.py from the command line to generate messages.
    $ python producer.py
  8. Run consumer.py from the command line:
    $ python consumer.py