Developing MapR-ES Python Applications

This topic includes basic information about how to develop a MapR-ES Python application and an example program that you can run.

Before You Begin

Confirm that your environment meets the following requirements:

Create a MapR-ES Producer Application

In general, you want to create a producer that performs the following steps:
  1. Import the producer class.
  2. Define the producer and its configuration.
  3. Produce data.
  4. Wait for all queued messages to be sent to the stream.
As of MEP 5.0 MapR-ES Python Client: In the following example code, three messages are produced to a topic named mytopic in a stream named my_stream.
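from confluent_kafka import Producer

# Topics produced to without a stream path go to this default stream.
p = Producer({'streams.producer.default.stream': '/my_stream'})
some_data_source = ["msg1", "msg2", "msg3"]
for data in some_data_source:
    # produce() is asynchronous; it only queues the message for delivery.
    p.produce('mytopic', data.encode('utf-8'))
# Block once, after the loop, until all queued messages have been sent.
p.flush()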
As of MEP 3.0 MapR-ES Python Client: In the following example code, three messages are produced to a topic named mytopic in a stream named my_stream.
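from mapr_streams_python import Producer

# Topics produced to without a stream path go to this default stream.
p = Producer({'streams.producer.default.stream': '/my_stream'})
some_data_source = ["msg1", "msg2", "msg3"]
for data in some_data_source:
    # produce() is asynchronous; it only queues the message for delivery.
    p.produce('mytopic', data.encode('utf-8'))
# Block once, after the loop, until all queued messages have been sent.
p.flush()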
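
The four producer steps above can also be wrapped in a reusable function that reports the outcome of each message. The following is a minimal sketch, not part of the MapR example: produce_all and report_delivery are hypothetical names, and the sketch assumes that the MapR-ES Python Client keeps the confluent_kafka behavior in which produce() accepts a callback argument and the callback is invoked with (err, msg) from poll() or flush().

```python
def report_delivery(err, msg):
    """Delivery callback (hypothetical helper): print one message's outcome."""
    if err is not None:
        print('Delivery failed: %s' % err)
    else:
        print('Delivered to %s [%d]' % (msg.topic(), msg.partition()))


def produce_all(producer, topic, messages, callback=report_delivery):
    """Queue every message on `topic`, then wait until all are sent.

    `producer` is any object with produce(), poll(), and flush() methods,
    for example a Producer from confluent_kafka or mapr_streams_python.
    Returns the number of messages queued.
    """
    count = 0
    for data in messages:
        producer.produce(topic, data.encode('utf-8'), callback=callback)
        # Serve delivery callbacks for messages that were already sent.
        producer.poll(0)
        count += 1
    # Block until the remaining queued messages have been sent.
    producer.flush()
    return count
```

With a real client, the call might look like produce_all(Producer({'streams.producer.default.stream': '/my_stream'}), 'mytopic', ["msg1", "msg2", "msg3"]). Passing the producer in as an argument keeps the function independent of which client package is installed.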

Create a MapR-ES Consumer Application

In general, you want to create a consumer that performs the following steps:
  1. Import the consumer class.
  2. Define the consumer and its configuration.
  3. Consume data.
  4. Wait for all messages to be consumed.
As of MEP 5.0 MapR-ES Python Client: In the following example code, the MapR-ES consumer subscribes to the topic /my_stream:mytopic and prints the content of each message that it reads.
from confluent_kafka import Consumer, KafkaError

c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
# Subscribe by full topic path: <stream path>:<topic name>.
c.subscribe(['/my_stream:mytopic'])
running = True
while running:
    msg = c.poll(timeout=1.0)
    if msg is None:
        # No message arrived within the timeout; poll again.
        continue
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # Stop on any error other than reaching the end of a partition.
        print(msg.error())
        running = False
c.close()
As of MEP 3.0 MapR-ES Python Client: In the following example code, the MapR-ES consumer subscribes to the topic /my_stream:mytopic and prints the content of each message that it reads.
from mapr_streams_python import Consumer, KafkaError

c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
# Subscribe by full topic path: <stream path>:<topic name>.
c.subscribe(['/my_stream:mytopic'])
running = True
while running:
    msg = c.poll(timeout=1.0)
    if msg is None:
        # No message arrived within the timeout; poll again.
        continue
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # Stop on any error other than reaching the end of a partition.
        print(msg.error())
        running = False
c.close()
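
The example loop keeps polling until an error occurs, so it does not stop by itself once the topic has been read. One way to approximate step 4 ("wait for all messages to be consumed") is to stop after several consecutive empty polls. The following is a minimal sketch under that assumption, not the documented MapR approach: consume_until_idle, handle, eof_code, and max_idle_polls are hypothetical names, and the consumer is assumed to expose the same poll(), error(), value(), and close() calls used in the examples above.

```python
def consume_until_idle(consumer, handle, eof_code, max_idle_polls=5, timeout=1.0):
    """Poll `consumer` and pass each decoded message to `handle`.

    Stops after `max_idle_polls` consecutive polls return no message, or
    on the first error whose code is not `eof_code`. Always closes the
    consumer. Returns the number of messages handled.
    """
    handled = 0
    idle = 0
    try:
        while idle < max_idle_polls:
            msg = consumer.poll(timeout=timeout)
            if msg is None:
                # Nothing arrived within the timeout.
                idle += 1
                continue
            err = msg.error()
            if err is None:
                idle = 0
                handle(msg.value().decode('utf-8'))
                handled += 1
            elif err.code() != eof_code:
                # Any error other than end-of-partition stops the loop.
                print(err)
                break
    finally:
        # Close the consumer even if handle() raises.
        consumer.close()
    return handled
```

With a real client, the caller would pass KafkaError._PARTITION_EOF as eof_code, for example consume_until_idle(c, print, KafkaError._PARTITION_EOF) after c.subscribe(['/my_stream:mytopic']). An idle-poll limit is only a heuristic: a slow producer can look the same as a finished one, so the limit and timeout should be chosen for the workload.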

Run the Example Applications

To run the sample producer and consumer applications:
  1. Create a stream named my_stream at the path /my_stream, which is the stream that the example code uses.
  2. Create a file named producer.py.
  3. Add the producer example code into the producer.py file.
  4. Create a file named consumer.py.
  5. Add the consumer example code into the consumer.py file.
  6. Verify that you have completed the steps to configure the MapR-ES C client or complete the steps now. See Configuring the MapR-ES C Client.
    Note: The MapR-ES Python Client is dependent on the MapR-ES C Client. Therefore, the MapR-ES C Client must be configured before you can run the application.
  7. Run producer.py from the command line to generate messages.
    $ python producer.py
  8. Run consumer.py from the command line:
    $ python consumer.py