Managing Streams with Java

Provides Java code snippets for performing CRUD operations on MapR-ES streams.

Creating Streams

StreamDescriptor is used to set attributes for streams that you want to create.

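// Imports used by the snippets on this page (assuming the com.mapr.streams client API).
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;

public StreamDescriptor createStreamDescriptor(int numPartitions, String adminUsers, String producerUsers, String consumerUsers, String copyUsers, String topicUsers) {
        StreamDescriptor desc = Streams.newStreamDescriptor();
        desc.setDefaultPartitions(numPartitions);   // default partition count for new topics
        desc.setCompressionAlgo("zlib");
        desc.setAutoCreateTopics(false);            // topics must be created explicitly
        desc.setAdminPerms(adminUsers);
        desc.setProducePerms(producerUsers);
        desc.setConsumePerms(consumerUsers);
        desc.setCopyPerms(copyUsers);
        desc.setTopicPerms(topicUsers);

        return desc;
}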
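
The permission arguments above (adminUsers, producerUsers, and so on) are passed as plain strings. The following is a minimal sketch of one way to build such a string from a list of user names, assuming the permissions are written as access control expressions of the form u:<user> joined with the | (OR) operator. The class name StreamPermissions and the method usersToAce are hypothetical and are not part of the MapR-ES API.

```java
import java.util.List;
import java.util.stream.Collectors;

public class StreamPermissions {

    // Hypothetical helper, not part of the MapR-ES API.
    // Assumption: a permission string is an access control expression
    // in which each user is written as "u:<name>" and alternatives are
    // joined with the "|" (OR) operator.
    public static String usersToAce(List<String> users) {
        if (users == null || users.isEmpty()) {
            throw new IllegalArgumentException("at least one user is required");
        }
        return users.stream()
                .map(String::trim)          // drop surrounding whitespace
                .map(u -> "u:" + u)         // prefix each name with the user marker
                .collect(Collectors.joining(" | "));
    }

    public static void main(String[] args) {
        // Prints: u:alice | u:bob
        System.out.println(usersToAce(List.of("alice", "bob")));
    }
}
```

The resulting string could then be passed as, for example, the producerUsers argument of createStreamDescriptor.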

Admin is used with the createStream method to create the stream with the pre-established attribute values.

public void createStreamUtilFunction(String streamPathAndName, StreamDescriptor desc) throws IllegalArgumentException, IOException {
        Configuration conf = new Configuration();
        Admin streamAdmin = Streams.newAdmin(conf);
        try {
                streamAdmin.createStream(streamPathAndName, desc);
        } finally {
                streamAdmin.close();   // release the Admin even if createStream throws
        }
}
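
The streamPathAndName argument combines the stream's location with its name. The following is a minimal sketch of assembling that value, assuming it is an absolute path in the cluster file system whose final component is the stream name (for example, /apps/streams/orders). The class name StreamPaths, the method streamPath, and the example path are hypothetical.

```java
public class StreamPaths {

    // Hypothetical helper, not part of the MapR-ES API.
    // Assumption: a stream is addressed by an absolute path whose last
    // component is the stream name.
    public static String streamPath(String parentDir, String streamName) {
        if (parentDir == null || !parentDir.startsWith("/")) {
            throw new IllegalArgumentException("parentDir must be an absolute path");
        }
        if (streamName == null || streamName.isEmpty() || streamName.contains("/")) {
            throw new IllegalArgumentException("streamName must be a single path component");
        }
        // Avoid a doubled separator when parentDir already ends with "/".
        return parentDir.endsWith("/")
                ? parentDir + streamName
                : parentDir + "/" + streamName;
    }

    public static void main(String[] args) {
        // Prints: /apps/streams/orders
        System.out.println(streamPath("/apps/streams", "orders"));
    }
}
```

Validating the path before calling createStream surfaces a malformed name as an IllegalArgumentException on the client side.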

Editing Stream Attributes

StreamDescriptor is used to hold the stream's attribute values. Admin is used with the getStreamDescriptor method to retrieve the current values and with the editStream method to set or modify them.

The editStream method has the following signature:

Admin.editStream(String streamPathAndName, StreamDescriptor desc)

Retrieve the stream's current attribute values:

Configuration conf = new Configuration();
Admin streamAdmin = Streams.newAdmin(conf);
StreamDescriptor desc = streamAdmin.getStreamDescriptor(streamPathAndName);

Apply the modified attribute values:

public void editStreamUtilFunction(String streamPathAndName, StreamDescriptor desc) throws IllegalArgumentException, IOException {
        Configuration conf = new Configuration();
        Admin streamAdmin = Streams.newAdmin(conf);
        try {
                streamAdmin.editStream(streamPathAndName, desc);
        } finally {
                streamAdmin.close();   // release the Admin even if editStream throws
        }
}

Deleting Streams

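Admin is used with the deleteStream method to delete the stream.

public void deleteStreamUtilFunction(String streamPathAndName) throws IllegalArgumentException, IOException {
        Configuration conf = new Configuration();
        Admin streamAdmin = Streams.newAdmin(conf);
        try {
                streamAdmin.deleteStream(streamPathAndName);
        } finally {
                streamAdmin.close();   // release the Admin even if deleteStream throws
        }
}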