Developing MapR Event Store For Apache Kafka C#/.NET Applications

Describes general tasks for developing C#/.NET applications.

Before You Begin

Confirm that your environment meets the following requirements:

  • MapR cluster version 6.0.1 or greater.
  • MapR Event Store For Apache Kafka C Client (mapr-librdkafka 0.11.3) is installed and configured on the node. See Configuring the MapR Event Store For Apache Kafka C Client.
  • MapR Event Store For Apache Kafka C#/.NET Client (mapr-streams-dotnet) is installed on the node.
  • .NET SDK 4.5.x or 4.6.x
  • .NET Core SDK 1.1
  • nuget.exe

Create a Producer Application

In general, you want to create a producer that performs the following steps:
  1. Import the producer class.
  2. Define the producer and its configuration.
  3. Produce data.
  4. Wait for all messages to be delivered to the stream.

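In the following example code, three messages are produced to a topic named mytopic in a stream named mystream.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

    class Producer
    {
        // Returns a Task so that callers can wait for delivery to finish.
        public static async Task Produce()
        {
            // Path of the stream that contains the topic.
            string stream = "/mystream";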
            string topicName = "mytopic";

            var config = new Dictionary<string, object> { { "streams.producer.default.stream", stream } };
            var messages = new string[] { "Msg1", "Msg2", "Msg3" };

            using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
            {
                foreach (var msg in messages)
                {
                    var deliveryReport = await producer.ProduceAsync(topicName, null, msg);
                    Console.WriteLine($"Delivery report:{deliveryReport.TopicPartitionOffset}");
                }

                // Wait up to one second for any outstanding messages to be delivered.
                producer.Flush(TimeSpan.FromSeconds(1));
            }
        }
    }
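
The delivery report returned by ProduceAsync can also be checked for an error before its offset is printed. The following is a minimal sketch rather than part of the original example. It assumes that the delivery report exposes an Error property with Code and Reason members, as in the Confluent.Kafka 0.11.x API that the examples on this page import. The class name ProducerWithCheck is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

// Hypothetical variant of the producer that inspects the delivery report.
class ProducerWithCheck
{
    public static async Task Produce()
    {
        var config = new Dictionary<string, object>
        {
            { "streams.producer.default.stream", "/mystream" }
        };

        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
            var report = await producer.ProduceAsync("mytopic", null, "Msg1");

            // Assumption: Error.Code equals ErrorCode.NoError when delivery succeeded.
            if (report.Error.Code != ErrorCode.NoError)
            {
                Console.WriteLine($"Delivery failed: {report.Error.Reason}");
            }
            else
            {
                Console.WriteLine($"Delivered to: {report.TopicPartitionOffset}");
            }
        }
    }
}
```

Checking the report for each message makes a failed delivery visible at the point where it happens, instead of relying only on the final Flush call.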

Create a Consumer Application

In general, you want to create a consumer that performs the following steps:
  1. Import the consumer class.
  2. Define the consumer and its configuration.
  3. Consume data.
  4. Wait for all messages to be consumed.

In the following example code, the MapR Event Store For Apache Kafka consumer reads partition 0 of the topic mytopic in the stream mystream, starting at offset 0, and prints the content of each message that it reads.

using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

    class Consumer
    {
        public static void Consume()
        {
            var stream = "/mystream";
            var topic = "mytopic";

            var config = new Dictionary<string, object>
            {
                { "group.id", "simple-csharp-consumer" },
                { "streams.consumer.default.stream", stream }
            };

            bool running = true;

            using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                // Read partition 0 of the topic, starting at offset 0.
                var partitions = new List<TopicPartitionOffset> { new TopicPartitionOffset(topic, 0, 0) };
                consumer.Assign(partitions);

                // Raised on critical errors, e.g. connection failures.
                consumer.OnError += (_, error) =>
                {
                    Console.WriteLine($"Error: {error}");
                    running = false;
                };

                // Raised on deserialization errors or when a consumed message has an error != NoError.
                consumer.OnConsumeError += (_, error) =>
                {
                    Console.WriteLine($"Consume error: {error}");
                    running = false;
                };

                while (running)
                {
                    Message<Ignore, string> msg;
                    if (consumer.Consume(out msg, TimeSpan.FromSeconds(10)))
                    {
                        Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
                    }
                }
            }
        }
    }
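
The loop in the consumer example ends only when one of the error handlers sets running to false. If the application should also stop on request, one possible approach is to cancel the loop when Ctrl+C is pressed. The following self-contained sketch uses only standard .NET types. The class name ShutdownSketch is hypothetical, and the Thread.Sleep call is a placeholder for the call to consumer.Consume.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: the sleep stands in for one poll of the consumer.
class ShutdownSketch
{
    public static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();

        // Request a clean stop on Ctrl+C instead of terminating the process.
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cts.Cancel();
        };

        while (!cts.IsCancellationRequested)
        {
            // Placeholder for consumer.Consume(out msg, timeout).
            Thread.Sleep(TimeSpan.FromSeconds(1));
        }

        Console.WriteLine("Consumer loop stopped.");
    }
}
```

Leaving the loop normally, rather than killing the process, lets the enclosing using block dispose of the consumer.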

Run the Example Applications

To run the sample producer and consumer applications:

  1. Create a stream named mystream.
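  2. Create a folder for the application.
  3. In that folder, create a file named example.cs.
  4. Add the producer example code to the example.cs file.
  5. Add the consumer example code to the example.cs file. Keep all using directives at the top of the file, before the first class.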
  6. Add an entry point for your application:
        class Demo
        {
            public static void Main(string[] args)
            {
                Producer.Produce();
                Consumer.Consume();
            }
        }
  7. Create a project file named example.csproj.
  8. Add the following content to the example.csproj file:
    <?xml version="1.0" encoding="utf-8"?> 
    <Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> 
      <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> 
      <PropertyGroup> 
        <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> 
        <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> 
        <ProjectGuid>{99EDBA4B-D7DA-48BB-8D0C-AF4B12387935}</ProjectGuid> 
        <OutputType>Exe</OutputType> 
        <RuntimeIdentifiers>win10-x64</RuntimeIdentifiers>     
        <RootNamespace>app</RootNamespace> 
        <AssemblyName>app</AssemblyName> 
        <TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion> 
        <FileAlignment>512</FileAlignment> 
        <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects> 
      </PropertyGroup> 
      <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> 
        <PlatformTarget>AnyCPU</PlatformTarget> 
        <DebugSymbols>true</DebugSymbols> 
        <DebugType>full</DebugType> 
        <Optimize>false</Optimize> 
        <OutputPath>bin\Debug\</OutputPath> 
        <DefineConstants>DEBUG;TRACE</DefineConstants> 
        <ErrorReport>prompt</ErrorReport> 
        <WarningLevel>4</WarningLevel> 
      </PropertyGroup> 
      <ItemGroup> 
        <Compile Include="example.cs" />
      </ItemGroup> 
      <ItemGroup> 
        <PackageReference Include="mapr-streams-dotnet" Version="0.11.3" /> 
      </ItemGroup> 
      <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
    </Project>
  9. Verify that you have completed the steps to configure the MapR Event Store For Apache Kafka C client or complete the steps now. See Configuring the MapR Event Store For Apache Kafka C Client.
    Note: The MapR Event Store For Apache Kafka C#/.NET Client is dependent on the MapR Event Store For Apache Kafka C Client. Therefore, the MapR Event Store For Apache Kafka C Client must be configured before you can run the application.
  10. Open your project folder on the command line and run:
    dotnet run
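
The entry point in step 6 starts the producer and then immediately starts the consumer. If the consumer should start only after the producer has finished, the entry point can block on the producer's task first. The following self-contained sketch shows that ordering under the assumption that the producer method returns a Task. ProduceAsync and Consume here are hypothetical stand-ins for the example methods, not the real producer and consumer.

```csharp
using System;
using System.Threading.Tasks;

class DemoSketch
{
    // Hypothetical stand-in for a producer method that returns a Task.
    static async Task ProduceAsync()
    {
        await Task.Delay(100);
        Console.WriteLine("produce finished");
    }

    // Hypothetical stand-in for the consumer method.
    static void Consume()
    {
        Console.WriteLine("consume started");
    }

    public static void Main(string[] args)
    {
        // Block until the producer task completes, then start the consumer.
        ProduceAsync().GetAwaiter().GetResult();
        Consume();
    }
}
```

GetAwaiter().GetResult() is used instead of an async Main method so that the sketch does not depend on a newer C# language version.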