Serialization

Serialization refers to converting a data element in your Flink program to/from a message in a Pravega stream.

Flink defines a standard interface for data serialization to/from byte messages delivered by various connectors. The core interfaces are:

- org.apache.flink.streaming.util.serialization.SerializationSchema
- org.apache.flink.streaming.util.serialization.DeserializationSchema

Built-in serializers include:

- org.apache.flink.streaming.util.serialization.SimpleStringSchema
- org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema

The Pravega connector is designed to use Flink's serialization interfaces. For example, to read each stream event as a UTF-8 string:

DeserializationSchema<String> schema = new SimpleStringSchema();
FlinkPravegaReader<String> reader = new FlinkPravegaReader<>(..., schema);
// The stream's element type matches the schema's type parameter.
DataStream<String> stream = env.addSource(reader);

Interoperability with Other Applications

A common scenario is using Flink to process Pravega stream data produced by a non-Flink application. The Pravega client library used by such applications defines the io.pravega.client.stream.Serializer interface for working with event data. Implementations of Serializer can be used directly in a Flink program via the built-in adapters:

- io.pravega.connectors.flink.serialization.PravegaSerializationSchema
- io.pravega.connectors.flink.serialization.PravegaDeserializationSchema

To use an adapter, pass an instance of the appropriate Pravega serializer class to its constructor, as in the following example:

import io.pravega.client.stream.impl.JavaSerializer;
...
DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
    MyEvent.class, new JavaSerializer<MyEvent>());
FlinkPravegaReader<MyEvent> reader = new FlinkPravegaReader<>(..., adapter);
DataStream<MyEvent> stream = env.addSource(reader);

Note that the Pravega serializer must implement java.io.Serializable to be usable in a Flink program.
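The Serializable requirement can be checked before a job is submitted. The sketch below is illustrative only: EventSerializer is a local stand-in that mirrors the two methods of io.pravega.client.stream.Serializer so the example compiles without the Pravega client on the classpath, and Utf8StringSerializer and SerializerShipping are hypothetical names. It copies a serializer instance through Java serialization, which is roughly what happens when operator state is shipped to workers, and the copy should still encode and decode events.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Local stand-in (assumption) with the same two methods as
// io.pravega.client.stream.Serializer.
interface EventSerializer<T> {
    ByteBuffer serialize(T value);
    T deserialize(ByteBuffer serializedValue);
}

// Hypothetical serializer that treats every event as a UTF-8 string.
// It also implements java.io.Serializable so the instance itself can be copied.
class Utf8StringSerializer implements EventSerializer<String>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public ByteBuffer serialize(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String deserialize(ByteBuffer serializedValue) {
        // duplicate() leaves the caller's buffer position untouched.
        return StandardCharsets.UTF_8.decode(serializedValue.duplicate()).toString();
    }
}

class SerializerShipping {
    // Writes the object with Java serialization and reads it back.
    // Fails with IllegalStateException if the object cannot be serialized.
    @SuppressWarnings("unchecked")
    static <S extends Serializable> S copyViaJavaSerialization(S original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (S) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serializer is not Java-serializable", e);
        }
    }
}
```

A serializer that holds a non-serializable field (for example an open connection) would fail in copyViaJavaSerialization, which is an early signal that it would likely fail in a Flink job as well.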

Deserialize with Metadata

The Pravega reader client wraps each event, together with its metadata, in an EventRead data structure. Some Flink jobs need the stream position of the event, which is carried in EventRead, for example for indexing purposes.

PravegaDeserializationSchema offers a method for extracting the event along with its metadata:

public T extractEvent(EventRead<T> eventRead) {
    return eventRead.getEvent();
}

The default implementation can be overridden in a custom subclass of PravegaDeserializationSchema to fold metadata such as the EventPointer into the event. For example:

private static class MyJsonDeserializationSchema extends PravegaDeserializationSchema<JsonNode> {
    // When true, the event pointer is attached to every deserialized event.
    private final boolean includeMetadata;

    public MyJsonDeserializationSchema(boolean includeMetadata) {
        // JSONSerializer is assumed to be a user-supplied Serializer<JsonNode> (not shown).
        super(JsonNode.class, new JSONSerializer());
        this.includeMetadata = includeMetadata;
    }

    @Override
    public JsonNode extractEvent(EventRead<JsonNode> eventRead) {
        JsonNode node = eventRead.getEvent();
        if (includeMetadata) {
            // Store the serialized EventPointer as a binary field on the JSON object.
            return ((ObjectNode) node).put("eventpointer", eventRead.getEventPointer().toBytes().array());
        }
        return node;
    }
}
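A downstream consumer of such events may need to turn the "eventpointer" field back into bytes. The sketch below assumes the JSON is written out with Jackson's default settings, under which a binary field is rendered as standard Base64 text; EventPointerField is a hypothetical helper name, and only JDK classes are used.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical helper for the Base64 text form of the "eventpointer" field.
class EventPointerField {
    // Encodes the remaining bytes of the buffer as standard Base64 text,
    // without changing the caller's buffer position.
    static String encode(ByteBuffer pointerBytes) {
        ByteBuffer copy = pointerBytes.duplicate();
        byte[] raw = new byte[copy.remaining()];
        copy.get(raw);
        return Base64.getEncoder().encodeToString(raw);
    }

    // Decodes the Base64 text back into a buffer holding the original bytes.
    static ByteBuffer decode(String base64Text) {
        return ByteBuffer.wrap(Base64.getDecoder().decode(base64Text));
    }
}
```

The buffer returned by decode is in the same form that toBytes() produces, so it should be possible to hand it to the Pravega client to rebuild an EventPointer and re-read the event; check the client's EventPointer documentation for the exact call.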