
Batch Connector

The Flink Connector library for Pravega makes it possible to use a Pravega Stream as a data source and a data sink in a batch program. See the sections below for details.


FlinkPravegaInputFormat

A Pravega Stream may be used as a data source within a Flink batch program using an instance of io.pravega.connectors.flink.FlinkPravegaInputFormat. The input format reads the events of a stream as a DataSet (the basic abstraction of the Flink Batch API). It opens the stream for batch reading: stream segments are processed in parallel, so events are not delivered in Routing Key order.

Use the ExecutionEnvironment::createInput method to open a Pravega Stream as a DataSet.

Example

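import io.pravega.connectors.flink.FlinkPravegaInputFormat;
import io.pravega.connectors.flink.PravegaConfig;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.utils.ParameterTool;

// Define the Pravega configuration (params is a ParameterTool, e.g. ParameterTool.fromArgs(args))
PravegaConfig config = PravegaConfig.fromParams(params);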

// Define the event deserializer
DeserializationSchema<EventType> deserializer = ...

// Define the input format based on a Pravega stream
FlinkPravegaInputFormat<EventType> inputFormat = FlinkPravegaInputFormat.<EventType>builder()
    .forStream(...)
    .withPravegaConfig(config)
    .withDeserializationSchema(deserializer)
    .build();

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSource<EventType> dataSet = env.createInput(inputFormat, TypeInformation.of(EventType.class)).setParallelism(2);

Parameters

A builder API is provided to construct an instance of FlinkPravegaInputFormat. See the table below for a summary of builder properties. Note that the builder accepts an instance of PravegaConfig for common configuration properties. See the configurations page for more information.

Method | Description
withPravegaConfig | The Pravega client configuration, which includes connection info, security info, and a default scope.
forStream | The stream to read from, with an optional start and/or end position. May be called repeatedly to read multiple streams in parallel.
withDeserializationSchema | The deserialization schema, which describes how to turn byte messages into events.

Input Stream(s)

Each Pravega stream exists within a scope. A scope defines a namespace for streams: stream names must be unique within a scope, but streams in different scopes may share a name. For example, scopes A and B can each contain a stream called myStream, but scope A cannot contain two streams called myStream. The builder API accepts both qualified and unqualified stream names.

  • In a qualified stream name, the scope is specified explicitly, e.g. my-scope/my-stream.
  • An unqualified stream name is assumed to refer to the default scope as set in the PravegaConfig. See the configurations page for more information on the default scope.

A stream may be specified in one of three ways:

  1. As a string containing a qualified name, in the form scope/stream.
  2. As a string containing an unqualified name, in the form stream. Such streams are resolved to the default scope.
  3. As an instance of io.pravega.client.stream.Stream, e.g. Stream.of("my-scope", "my-stream").

Multiple streams can be specified by calling forStream repeatedly on the builder. The underlying BatchClient can read from multiple streams in parallel, even across scopes.
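To make the qualified/unqualified rule concrete, here is a minimal, self-contained sketch of how a stream name could be resolved to a (scope, stream) pair. It is a hypothetical illustration only: ScopedStreamName and resolve are invented names, not part of the connector, and the connector's own parsing may differ (for example in how it validates names).

```java
import java.util.Objects;

/**
 * Hypothetical (scope, stream) pair used to illustrate name resolution.
 * This is NOT the connector's implementation.
 */
final class ScopedStreamName {
    final String scope;
    final String stream;

    ScopedStreamName(String scope, String stream) {
        this.scope = Objects.requireNonNull(scope);
        this.stream = Objects.requireNonNull(stream);
    }

    /** Resolves "scope/stream" as given, and "stream" against the default scope. */
    static ScopedStreamName resolve(String name, String defaultScope) {
        int slash = name.indexOf('/');
        if (slash < 0) {
            // Unqualified name: fall back to the default scope (the role PravegaConfig plays).
            if (defaultScope == null) {
                throw new IllegalArgumentException("No default scope for unqualified stream: " + name);
            }
            return new ScopedStreamName(defaultScope, name);
        }
        // Qualified name: the scope is given explicitly before the slash.
        return new ScopedStreamName(name.substring(0, slash), name.substring(slash + 1));
    }

    // Two names are equal only if both scope and stream match,
    // so "A/myStream" and "B/myStream" are different streams.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ScopedStreamName)) return false;
        ScopedStreamName other = (ScopedStreamName) o;
        return scope.equals(other.scope) && stream.equals(other.stream);
    }

    @Override
    public int hashCode() { return Objects.hash(scope, stream); }

    @Override
    public String toString() { return scope + "/" + stream; }
}
```

With a default scope of "default-scope", resolving "my-scope/my-stream" keeps the explicit scope, while resolving "my-stream" yields default-scope/my-stream. Including the scope in equality is what lets two scopes each hold a stream named myStream.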

StreamCuts

A StreamCut represents a specific position in a Pravega Stream and can be obtained through various Pravega client APIs. The BatchClient accepts a StreamCut as the start and/or end position of a given stream. For further reading, refer to the Pravega documentation on StreamCut and the accompanying sample code.

If no StreamCuts are provided, reading starts at the earliest available data in the stream and ends at the data available in the stream when job execution begins.
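The default bounds can be pictured with a simplified model. The sketch below is hypothetical and treats a stream as a single append-only list with integer offsets; real StreamCuts record a position per segment. The point it shows is that the end position is captured when the job starts, so events appended afterwards fall outside the batch.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of one stream as an append-only list of events.
 * Positions are plain list offsets here; real StreamCuts track per-segment positions.
 */
final class BoundedReadSketch {
    private final List<String> events = new ArrayList<>();

    void append(String event) { events.add(event); }

    /** Default start position: the earliest available event. */
    int earliest() { return 0; }

    /** Default end position: everything available at the moment this is called (job start). */
    int tailNow() { return events.size(); }

    /** Returns a copy of the events in [start, end), like a batch read bounded by two positions. */
    List<String> read(int start, int end) {
        return new ArrayList<>(events.subList(start, end));
    }
}
```

For example, after appending e1 and e2, capturing start = earliest() and end = tailNow(), and then appending e3, read(start, end) returns only e1 and e2.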

Parallelism

FlinkPravegaInputFormat supports parallelization. Use the setParallelism method of the DataSource returned by createInput to configure the number of parallel instances to execute. The parallel instances consume the stream in a coordinated manner, each consuming one or more stream segments.
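As a rough mental model of "each consuming one or more stream segments", the following sketch distributes segment IDs across parallel instances round-robin. This is a simplification chosen for illustration: Flink may hand out input splits lazily as instances become free, so the actual assignment can differ, and with more instances than segments some instances receive nothing. SegmentAssignmentSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical round-robin distribution of stream segments across parallel instances.
 * Illustrative only; not how Flink or the connector necessarily assigns work.
 */
final class SegmentAssignmentSketch {
    static List<List<Integer>> assign(List<Integer> segmentIds, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // One (possibly empty) list of segments per parallel instance.
        List<List<Integer>> perInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perInstance.add(new ArrayList<>());
        }
        // The i-th segment goes to instance (i mod parallelism).
        for (int i = 0; i < segmentIds.size(); i++) {
            perInstance.get(i % parallelism).add(segmentIds.get(i));
        }
        return perInstance;
    }
}
```

With five segments (0 to 4) and a parallelism of 2, one instance gets segments 0, 2 and 4 and the other gets 1 and 3.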

FlinkPravegaOutputFormat

A Pravega Stream may be used as a data sink within a Flink batch program using an instance of io.pravega.connectors.flink.FlinkPravegaOutputFormat. The FlinkPravegaOutputFormat can be supplied as a sink to a DataSet (the basic abstraction of the Flink Batch API) via the DataSet::output method.

Example

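import io.pravega.connectors.flink.FlinkPravegaOutputFormat;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.PravegaEventRouter;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.ExecutionEnvironment;
import java.util.Arrays;
import java.util.Collection;

// Define the Pravega configuration (params is a ParameterTool, e.g. ParameterTool.fromArgs(args))
PravegaConfig config = PravegaConfig.fromParams(params);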

// Define the event serializer
SerializationSchema<EventType> serializer = ...

// Define the event router for selecting the Routing Key
PravegaEventRouter<EventType> router = ...

// Define the output format based on a Pravega Stream
FlinkPravegaOutputFormat<EventType> outputFormat = FlinkPravegaOutputFormat.<EventType>builder()
    .forStream(...)
    .withPravegaConfig(config)
    .withSerializationSchema(serializer)
    .withEventRouter(router)
    .build();

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Collection<EventType> inputData = Arrays.asList(...);
env.fromCollection(inputData)
   .output(outputFormat);
env.execute("...");

Parameters

A builder API is provided to construct an instance of FlinkPravegaOutputFormat. See the table below for a summary of builder properties. Note that the builder accepts an instance of PravegaConfig for common configuration properties. See the configurations page for more information.

Method | Description
withPravegaConfig | The Pravega client configuration, which includes connection info, security info, and a default scope.
forStream | The stream to write to.
withSerializationSchema | The serialization schema, which describes how to turn events into byte messages.
withEventRouter | The router function, which determines the Routing Key for a given event.

Output Stream

Each Pravega stream is contained in a scope. A scope acts as a namespace for one or more streams. The builder API accepts both qualified and unqualified stream names.

  • In a qualified stream name, the scope is specified explicitly, e.g. my-scope/my-stream.
  • An unqualified stream name is assumed to refer to the default scope as set in the PravegaConfig.

A stream may be specified in one of three ways:

  1. As a string containing a qualified name, in the form scope/stream.
  2. As a string containing an unqualified name, in the form stream. Such streams are resolved to the default scope.
  3. As an instance of io.pravega.client.stream.Stream, e.g. Stream.of("my-scope", "my-stream").

Parallelism

FlinkPravegaOutputFormat supports parallelization. Use the setParallelism method of the DataSink returned by DataSet::output to configure the number of parallel instances to execute.

Event Routing

Every event written to a Pravega Stream has an associated Routing Key. The Routing Key is the basis for event ordering. See Pravega Concepts in the Pravega documentation for details.

To establish the Routing Key for each event, provide an implementation of io.pravega.connectors.flink.PravegaEventRouter to the builder's withEventRouter method when constructing the output format.
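A router is simply a function from an event to its Routing Key. The sketch below assumes that PravegaEventRouter declares a single String getRoutingKey(T event) method and is Serializable; confirm this against the connector's API reference. To keep the example compilable without the connector on the classpath, it uses a local stand-in interface, RoutingKeyFunction, and a hypothetical SensorReading event type. In a real job you would implement PravegaEventRouter<SensorReading> directly.

```java
import java.io.Serializable;

/** Local stand-in for the assumed shape of PravegaEventRouter: event -> Routing Key. */
interface RoutingKeyFunction<T> extends Serializable {
    String getRoutingKey(T event);
}

/** Hypothetical event type used only for this illustration. */
final class SensorReading implements Serializable {
    final String sensorId;
    final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

/** Uses the sensor id as the Routing Key, so all readings from one sensor share a key. */
final class SensorIdRouter implements RoutingKeyFunction<SensorReading> {
    @Override
    public String getRoutingKey(SensorReading event) {
        return event.sensorId;
    }
}
```

Keying by sensor id is one reasonable design: because ordering in Pravega is based on the Routing Key, readings from the same sensor keep their relative order, while readings from different sensors can be spread across segments.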

Serialization

See the serialization page for more information on how to use the serializer and deserializer.