Skip to content

Working with Pravega: ReaderGroup Notifications

The ReaderGroup api supports different types of notifications. Currently, we have two types implemented, but we plan to add more over time. The types we currently support are the following:

  1. Segment Notification

A segment notification is triggered when the total number of segments managed by the reader group changes. During a scale operation segments can be split into multiple or merged into some other segment causing the total number of segments to change. The total number of segments can also change when the configuration of the reader group changes, for example, when it adds or removes a stream.

The method for subscribing to segment notifications is shown below

@Cleanup
ReaderGroupManager groupManager = new ReaderGroupManagerImpl(SCOPE, controller, clientFactory,
        connectionFactory);
groupManager.createReaderGroup(GROUP_NAME, ReaderGroupConfig.builder().
                                                            .stream(Stream.of(SCOPE, STREAM))
                                                            .build());

groupManager.getReaderGroup(GROUP_NAME).getSegmentNotifier(executor).registerListener(segmentNotification -> {
       int numOfReaders = segmentNotification.getNumOfReaders();
       int segments = segmentNotification.getNumOfSegments();
       if (numOfReaders < segments) {
          //Scale up number of readers based on application capacity
       } else {
         //More readers available time to shut down some
       }
});
The application can register a listener to be notified of SegmentNotification using the registerListener api. This api takes io.pravega.client.stream.notifications.Listener as a parameter. Here the application can add custom logic to change the set of online readers according to the number of segments. For example, if the number of segments increases, then application might consider increasing the number of online readers. If the number of segments instead decreases according to a segment notification, then the application might want to change the set of online readers accordingly.

  1. EndOfData Notification

An end of data notifier is triggered when the readers have read all the data of the stream(s) managed by the reader group. This is useful to process the stream data with a batch job where the application wants to read data of sealed stream(s).

The method for subscribing to end of data notifications is shown below

@Cleanup
ReaderGroupManager groupManager = new ReaderGroupManagerImpl(SCOPE, controller, clientFactory,
        connectionFactory);
groupManager.createReaderGroup(GROUP_NAME, ReaderGroupConfig.builder()
                                                            .stream(Stream.of(SCOPE, SEALED_STREAM))
                                                            .build());

groupManager.getReaderGroup(GROUP_NAME).getEndOfDataNotifier(executor).registerListener(notification -> {
      //custom action e.g: close all readers
});
The application can register a listener to be notified of EndOfDataNotification using the registerListener api. This api takes io.pravega.client.stream.notifications.Listener as a parameter. Here the application can add custom logic that can be invoked once all the data of the sealed streams are read.