Working with Pravega: Basic Reader and Writer

Lets examine how to build Pravega applications. The simplest kind of Pravega application uses a Reader to read from a Stream or a Writer that writes to a Stream. A simple sample application of both can be found in the Pravega samples repository (HelloWorldReader and HelloWorldWriter) applications. These samples give a sense on how a Java application could use the Pravega's Java Client Library to access Pravega functionality.

Instructions for running the sample applications can be found in the Pravega Samples. Get familiar with the Pravega Concepts before executing the sample applications.

HelloWorldWriter

The HelloWorldWriter application demonstrates the usage of EventStreamWriter to write an Event to Pravega.

The key part of HelloWorldWriter is in the run() method. The purpose of the run() method is to create a Stream and output the given Event to that Stream.

public void run(String routingKey, String message) {
    StreamManager streamManager = StreamManager.create(controllerURI);

    final boolean scopeCreation = streamManager.createScope(scope);
    StreamConfiguration streamConfig = StreamConfiguration.builder()
                                                          .scalingPolicy(ScalingPolicy.fixed(1))
                                                          .build();
    final boolean streamCreation = streamManager.createStream(scope, streamName, streamConfig);

    try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
         EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName,
                                                          new JavaSerializer<String>(),
                                                          EventWriterConfig.builder().build())) {

         System.out.format("Writing message: '%s' with routing-key: '%s' to stream '%s / %s'%n",
                                                        message, routingKey, scope, streamName);
         final CompletableFuture<Void> writeFuture = writer.writeEvent(routingKey, message);
    }
}

Creating a Stream and the StreamManager Interface

Scopes and Streams are created and manipulated via the StreamManager interface to the Pravega Controller. An URI to any of the Pravega Controller instance(s) in your cluster is required to create a StreamManager object. In the setup for the HelloWorld sample applications, the controllerURI is configured as a command line parameter when the sample application is launched.

Note: For the "standalone" deployment of Pravega, the Controller is listening on localhost, port 9090.

The StreamManager provides access to various control plane functions in Pravega related to Scopes and Streams:

Method Parameters Discussion
(static) create (URI controller) Given a URI to one of the Pravega Controller instances in the Pravega Cluster, create a Stream Manager object.
createScope (String scopeName) Creates a Scope with the given name.
Returns True if the Scope is created, returns False if the Scope already exists.
This method can be called even if the Stream is already existing.
deleteScope (String scopeName) Deletes a Scope with the given name.
Returns True if the scope was deleted, returns False otherwise.
If the Scope contains Streams, the deleteScope operation will fail with an exception.
If we delete a non-existent Scope, the method will succeed and return False.
createStream (String scopeName, String streamName, StreamConfiguration config) Create a Stream within a given Scope.
Both Scope name and Stream name are limited using the following characters: Letters (a-z A-Z), numbers (0-9) and delimiters: "." and "-" are allowed.
The Scope must exist, an exception is thrown if we create a Stream in a non-existent Scope.
A Stream Configuration is built using a builder pattern.
Returns True if the Stream is created, returns False if the Stream already exists.
This method can be called even if the Stream is already existing.
updateStream (String scopeName, String streamName, StreamConfiguration config) Swap out the Stream's configuration.  
The Stream must already exist, an exception is thrown if we update a non-existent Stream.
Returns True if the Stream was changed.
sealStream (String scopeName, String streamName) Prevent any further writes to a Stream.
The Stream must already exist, an exception is thrown if you seal a non-existent Stream.
Returns True if the Stream is successfully sealed.
deleteStream (String scopeName, String streamName) Remove the Stream from Pravega and recover any resources used by that Stream.
Returns False if the Stream is non-existent.
Returns True if the Stream was deleted.

The execution of API createScope(scope) establishes that the Scope exists. Then we can create the Stream using the API createStream(scope, streamName, streamConfig). The StreamManager requires three parameters to create a Stream:

  • Scope Name.
  • Stream Name.
  • Stream Configuration.

The most interesting task is to create the Stream Configuration (streamConfig). Like many objects in Pravega, a Stream takes a configuration object that allows a developer to control various behaviors of the Stream. All configuration object instantiated via builder pattern. That allows a developer to control various aspects of a Stream's behavior in terms of policies; Retention Policy and Scaling Policy are the most important ones related to Streams. For the sake of simplicity, in our sample application we instantiate a Stream with a single segment (ScalingPolicy.fixed(1)) and using the default (infinite) retention policy.

Once the Stream Configuration (streamConfig) object is built, creating the Stream is straight forward using createStream(). After the Stream is created, we are all set to start writing Event(s) to the Stream.

Writing an Event using EventWriter

Applications use an EventStreamWriter object to write Events to a Stream. The EventStreamWriter is created using the ClientFactory object.  The ClientFactory is used to create Readers, Writers and other types of Pravega Client objects such as the State Synchronizer (see Working with Pravega: State Synchronizer).

A ClientFactory is created in the context of a Scope, since all Readers, Writers and other Clients created by the ClientFactory are created in the context of that Scope. The ClientFactory also needs a URI to one of the Pravega Controllers (ClientFactory.withScope(scope, controllerURI)) , just like StreamManager.

As the ClientFactory and the objects it creates consume resources from Pravega and implement AutoCloseable, it is a good practice to create these objects using a try-with-resources. By doing this, we make sure that, regardless of how the application ends, the Pravega resources will be properly closed in the right order.

Once the ClientFactory is instantiated, we can use it to create a Writer. There are several things a developer needs to know before creating a Writer:

  1. What is the name of the Stream to write to?  (The Scope has already been determined when the ClientFactory was created.)

  2. What Type of Event objects will be written to the Stream?

  3. What serializer will be used to convert an Event object to bytes? (Recall that Pravega only knows about sequences of bytes, it is unaware about Java objects.)

  4. Does the Writer need to be configured with any special behavior?

     EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName,
                                                      new JavaSerializer<String>(),
                                                      EventWriterConfig.builder().build()))

The EventStreamWriter writes to the Stream specified in the configuration of the HelloWorldWriter sample application (by default the stream is named "helloStream" in the "examples" Scope). The Writer processes Java String objects as Events and uses the built in Java serializer for Strings.

Note: Pravega allows users to write their own serializer. For more information and example, please refer to Pravega Serializer

The EventWriterConfig allows the developer to specify things like the number of attempts to retry a request before giving up and associated exponential back-off settings. Pravega takes care to retry requests in the presence of connection failures or Pravega component outages, which may temporarily prevent a request from succeeding, so application logic doesn't need to be complicated by dealing with intermittent cluster failures. In the sample application, EventWriterConfig was considered as the default settings.

EventStreamWriter provides a writeEvent() operation that writes the given non-null Event object to the Stream using a given Routing key to determine which Stream Segment it should written to.  Many operations in Pravega, such as writeEvent(), are asynchronous and return some sort of Future object. If the application needed to make sure the Event was durably written to Pravega and available for Readers, it could wait on the Future before proceeding. In the case of Pravega's HelloWorld example, it does wait on the Future.

EventStreamWriter can also be used to begin a Transaction.  We cover Transactions in more detail in Working with Pravega: Transactions.

HelloWorldReader

The HelloWorldReader is a simple demonstration of using the EventStreamReader. The application reads Events from the given Stream and prints a string representation of those Events onto the console.

Just like the HelloWorldWriter example, the key part of the HelloWorldReader application is in the run() method:

public void run() {
   StreamManager streamManager = StreamManager.create(controllerURI);

   final boolean scopeIsNew = streamManager.createScope(scope);
   StreamConfiguration streamConfig = StreamConfiguration.builder()
                                                         .scalingPolicy(ScalingPolicy.fixed(1))
                                                         .build();
   final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig);

   final String readerGroup = UUID.randomUUID().toString().replace("-", "");
   final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
                                                                .stream(Stream.of(scope, streamName))
                                                                .build();
   try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI))
    {
       readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
   }

   try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
        EventStreamReader<String> reader = clientFactory.createReader("reader",
                                                                      readerGroup,
                                                                      new JavaSerializer<String>(),
                                                                      ReaderConfig.builder().build()))
        {
        System.out.format("Reading all the events from %s/%s%n", scope, streamName);
        EventRead<String> event = null;
        do {
           try {
               event = reader.readNextEvent(READER_TIMEOUT_MS);
               if (event.getEvent() != null) {
                   System.out.format("Read event '%s'%n", event.getEvent());
               }
           } catch (ReinitializationRequiredException e) {
               //There are certain circumstances where the reader needs to be reinitialized
               e.printStackTrace();
           }
       } while (event.getEvent() != null);
       System.out.format("No more events from %s/%s%n", scope, streamName);
   }

The API streamManager.createScope() and streamManager.createStream() set up the Scope and Stream just like in the HelloWorldWriter application. The API ReaderGroupConfig set up the Reader Group as the prerequisite to creating the EventStreamReader and using it to read Events from the Stream (createReader(),reader.readNextEvent()).

Reader Group Basics

Any Reader in Pravega belongs to some ReaderGroup.  A ReaderGroup is a grouping of one or more Readers that consume from a Stream in parallel. Before we create a Reader, we need to either create a ReaderGroup (or be aware of the name of an existing ReaderGroup). This application only uses the basics from Reader Group.

ReaderGroup objects are created from a ReaderGroupManager object. The ReaderGroupManager object, in turn, is created on a given Scope with a URI to one of the Pravega Controllers, very much like a ClientFactory is created. Note that, the createReaderGroup is also in a try-with-resources statement to make sure that the ReaderGroupManager is properly cleaned up. The ReaderGroupManager allows a developer to create, delete and retrieve ReaderGroup objects using the name.

To create a ReaderGroup, the developer needs a name for the Reader Group and a configuration with a set of one or more Streams to read from. The Reader Group's name (alphanumeric) might be meaningful to the application, like "WebClickStreamReaders".  In cases where we require multiple Readers reading in parallel and each Reader in a separate process, it is helpful to have a human readable name for the Reader Group. In this example, we have one Reader, reading in isolation, so a UUID is a safe way to name the Reader Group. The Reader Group is created via the ReaderGroupManager and since the ReaderGroupManager is created within the context of a Scope, we can safely conclude that Reader Group names are namespaced by that Scope.  

The developer specifies the Stream which should be the part of the Reader Group and its lower and upper bounds. In the sample application, we start at the beginning of the Stream as follows:

final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
                                                             .stream(Stream.of(scope, streamName))
                                                             .build();
Other configuration items, such as Checkpointing are options that will be available through the ReaderGroupConfig.

The Reader Group can be configured to read from multiple Streams. For example, imagine a situation where there is a collection of Stream of sensor data coming from a factory floor, each machine has its own Stream of sensor data.  We can build applications that uses a Reader Group per Stream so that the application reasons about data from exactly one machine. We can build other applications that use a Reader Group configured to read from all of the Streams. To keep it simple, in the sample application the Reader Group only reads from one Stream.

We can call createReaderGroup with the same parameters multiple times and the same Reader Group will be returned each time after it is initially created (idempotent operation). Note that in other cases, if the developer knows the name of the Reader Group to use and knows it has already been created, they can use getReaderGroup() on ReaderGroupManager to retrieve the ReaderGroup object by name.

At this point, we have the Scope and Stream is set up and the ReaderGroup object created. Next, we need to create a Reader and start reading Events.

Reading Event using an EventStreamReader

First, we create a ClientFactory object, the same way we did it in the HelloWorldWriter application. Then we use the ClientFactory to create an EventStreamReader object. The following are the four parameters to create a Reader:

  • Name for the Reader.
  • Reader Group it should be part of.
  • The type of object expected on the Stream.
  • Serializer to convert from the bytes stored in Pravega into the Event objects and a ReaderConfig.
EventStreamReader<String> reader = clientFactory.createReader("reader",
                                                              readerGroup,
                                                              new JavaSerializer<String>(),
                                                              ReaderConfig.builder().build()))

The name of the Reader can be any valid Pravega naming convention (numbers and letters). Note that the name of the Reader is namespaced within the Scope. EventStreamWriter and EventStreamReader uses Java generic types to allow a developer to specify a type safe Reader. In the sample application, we read Strings from the stream and use the standard Java String Serializer to convert the bytes read from the stream into String objects.

Note: Pravega allows users to write their own serializer. For more information and example, please refer to Pravega Serializer

Finally, we use a ReaderConfig object with default values. Note that you cannot create the same Reader multiple times. That is, an application may call createReader() to add new Readers to the Reader Group. But if the Reader Group already contains a Reader with that name, an exception is thrown.

After creating an EventStreamReader, we can use it to read Events from the Stream. The readNextEvent() operation returns the next Event available on the Stream, or if there is no such Event, blocks for a specified time. After the expiry of the timeout period, if no Event is available for reading, then Null is returned. The null check (EventRead<String> event = null) is used to avoid printing out a spurious Null event message to the console and also used to terminate the loop. Note that the Event itself is wrapped in an EventRead object.

It is worth noting that readNextEvent() may throw an exception ReinitializationRequiredException and the object is reinitialized. This exception would be handled in cases where the Readers in the Reader Group need to reset to a Checkpoint or the Reader Group itself has been altered and the set of Streams being read has been therefore changed. TruncatedDataException is thrown when we try to read the deleted data. It is however possible to recover from the later by calling readNextEvent() again (it will just skip forward).

Thus, the simple HelloWorldReader loops, reading Events from a Stream until there are no more Events, and then the application terminates.

Experimental Batch Reader

BatchClient is used for applications that require parallel, unordered reads of historical stream data. Using the Batch Reader all the segments in a Stream can be listed and read from. Hence, the Events for a given Routing Key which can reside on multiple segments are not read in order.

Obviously this API is not for every application, the main advantage is that it allows for low level integration with batch processing frameworks such as MapReduce.

Example

To iterate over all the segments in the stream:

//Passing null to fromStreamCut and toStreamCut will result in using the current start of stream and the current end of stream respectively.
Iterator<SegmentRange> segments = client.listSegments(stream, null, null).getIterator();
SegmentRange segmentInfo = segments.next();
To read the events from a segment:

SegmentIterator<T> events = client.readSegment(segmentInfo, deserializer);
while (events.hasNext())
{
    processEvent(events.next());
}