Table Connector
The Flink connector library for Pravega provides a table source and table sink for use with the Flink Table API. The Table API provides a unified API for both the Flink streaming and batch environments. See the sections below for details.
The FlinkPravegaJsonTableSource and FlinkPravegaJsonTableSink implementations have been deprecated and replaced with the ConnectorDescriptor / TableFactory based implementation introduced in Flink 1.6. With these changes, it is possible to use the Pravega Table API either programmatically (using the Pravega descriptor) or declaratively through YAML configuration files for the SQL client.
Table of Contents
- Table Source
- Parameters
- Custom Formats
- Time Attribute Support
- Pravega watermark (Evolving)
- Table Sink
- Parameters
- Custom Formats
- Using SQL Client
- Environment File
Table Source
A Pravega Stream may be used as a table source within a Flink table program. The Flink Table API is oriented around Flink's TableSchema class, which describes the table fields. A concrete subclass of FlinkPravegaTableSource is then used to parse raw stream data as Row objects that conform to the table schema.
Example
The following example uses the provided table source to read JSON-formatted events from a Pravega Stream:
// define the table schema
Schema schema = new Schema()
.field("user", Types.STRING())
.field("uri", Types.STRING())
.field("accessTime", Types.SQL_TIMESTAMP()).rowtime(
new Rowtime().timestampsFromField("accessTime")
.watermarksPeriodicBounded(30000L));
// define pravega reader configurations using Pravega descriptor
Pravega pravega = new Pravega();
pravega.tableSourceReaderBuilder()
.withReaderGroupScope(stream.getScope())
.forStream(stream)
.withPravegaConfig(pravegaConfig);
// (Option-1) Streaming Source
StreamExecutionEnvironment execEnvRead = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnvRead);
StreamTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true).deriveSchema())
.withSchema(schema)
.inAppendMode();
final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(desc);
final TableSource<?> source = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
.createStreamTableSource(propertiesMap);
tableEnv.registerTableSource("MyTableRow", source);
String sqlQuery = "SELECT user, count(uri) from MyTableRow GROUP BY user";
Table result = tableEnv.sqlQuery(sqlQuery);
...
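As a sketch (not part of the original example), one way to consume the continuously updating result of the GROUP BY query in the streaming environment is as a retract stream:
// convert the updating result into a retract stream of (retract-flag, row) pairs
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(result, Row.class);
resultStream.print();
execEnvRead.execute();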
// (Option-2) Batch Source
ExecutionEnvironment execEnvRead = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnvRead);
execEnvRead.setParallelism(1);
BatchTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true).deriveSchema())
.withSchema(schema);
final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(desc);
final TableSource<?> source = TableFactoryService.find(BatchTableSourceFactory.class, propertiesMap)
.createBatchTableSource(propertiesMap);
tableEnv.registerTableSource("MyTableRow", source);
String sqlQuery = "SELECT ...";
Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
...
@deprecated: the following example uses the deprecated FlinkPravegaJsonTableSource API.
// Create a Flink Table environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Load the Pravega configuration
PravegaConfig pravegaConfig = PravegaConfig.fromParams(params);
String[] fieldNames = {"user", "uri", "accessTime"};
// Read data from the stream using Table reader
TableSchema tableSchema = TableSchema.builder()
.field("user", Types.STRING())
.field("uri", Types.STRING())
.field("accessTime", Types.SQL_TIMESTAMP())
.build();
FlinkPravegaJsonTableSource source = FlinkPravegaJsonTableSource.builder()
.forStream(stream)
.withPravegaConfig(pravegaConfig)
.failOnMissingField(true)
.withRowtimeAttribute("accessTime",
new ExistingField("accessTime"),
new BoundedOutOfOrderTimestamps(30000L))
.withSchema(tableSchema)
.withReaderGroupScope(stream.getScope())
.build();
// (Option-1) Read table as stream data (requires a StreamExecutionEnvironment)
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamEnv);
tableEnv.registerTableSource("MyTableRow", source);
String sqlQuery = "SELECT user, count(uri) from MyTableRow GROUP BY user";
Table result = tableEnv.sqlQuery(sqlQuery);
...
// (Option-2) Read table as batch data (use tumbling window as part of the query)
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerTableSource("MyTableRow", source);
String sqlQuery = "SELECT user, " +
"TUMBLE_END(accessTime, INTERVAL '5' MINUTE) AS accessTime, " +
"COUNT(uri) AS cnt " +
"from MyTableRow GROUP BY " +
"user, TUMBLE(accessTime, INTERVAL '5' MINUTE)";
Table result = tableEnv.sqlQuery(sqlQuery);
...
Parameters
A builder API is provided to construct a concrete subclass of FlinkPravegaTableSource. See the table below for a summary of builder properties; a short usage sketch follows the table. Note that the builder accepts an instance of PravegaConfig for common configuration properties. See the configurations page for more information.
Note that the table source supports both the Flink streaming and batch environments. In the streaming environment, the table source uses a FlinkPravegaReader connector. In the batch environment, the table source uses a FlinkPravegaInputFormat connector. Please see the documentation of the Streaming Connector and the Batch Connector for a better understanding of the parameters listed below.
Method | Description |
---|---|
withPravegaConfig | The Pravega client configuration, which includes connection info, security info, and a default scope. |
forStream | The stream to be read from, with optional start and/or end position. May be called repeatedly to read numerous streams in parallel. |
uid | The uid to identify the checkpoint state of this source. Applies only to streaming API. |
withReaderGroupScope | The scope to store the Reader Group synchronization stream into. Applies only to streaming API. |
withReaderGroupName | The Reader Group name for display purposes. Applies only to streaming API. |
withReaderGroupRefreshTime | The interval for synchronizing the Reader Group state across parallel source instances. Applies only to streaming API. |
withCheckpointInitiateTimeout | The timeout for executing a checkpoint of the Reader Group state. Applies only to streaming API. |
withTimestampAssigner | (Evolving) The AssignerWithTimeWindows implementation which describes the event timestamp extraction and the Pravega watermark strategy for event time semantics. Applies only to streaming API. |
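As a usage sketch only, the reader builder options above might be combined as follows. The stream and pravegaConfig objects are assumed from the earlier example, Flink's org.apache.flink.api.common.time.Time is assumed for the interval arguments, and the uid and Reader Group name values are placeholders; exact signatures may vary by connector version.
// sketch: combining the streaming reader builder options listed above
Pravega pravega = new Pravega();
pravega.tableSourceReaderBuilder()
    .forStream(stream)                                // stream to read from
    .uid("pravega-table-source")                      // identifies the checkpoint state (streaming only)
    .withReaderGroupScope(stream.getScope())          // scope for the Reader Group synchronization stream
    .withReaderGroupName("flink-table-reader-group")  // Reader Group display name (placeholder value)
    .withReaderGroupRefreshTime(Time.seconds(5))      // Reader Group state refresh interval
    .withCheckpointInitiateTimeout(Time.seconds(20))  // Reader Group checkpoint timeout
    .withPravegaConfig(pravegaConfig);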
The configurations below apply only to the deprecated FlinkPravegaJsonTableSource implementation.
Method | Description |
---|---|
withSchema | The table schema which describes which JSON fields to expect. |
withProctimeAttribute | The name of the processing time attribute in the supplied table schema. |
withRowtimeAttribute | Supplies the name of the rowtime attribute in the table schema, a TimestampExtractor instance to extract the rowtime attribute value from the event, and a WatermarkStrategy to generate watermarks for the rowtime attribute. |
failOnMissingField | A flag indicating whether to fail if a JSON field is missing. |
Custom Formats
@deprecated: the steps outlined in this section apply only to the FlinkPravegaJsonTableSource based implementation. Please use the Pravega descriptor instead.
To work with stream events in a format other than JSON, extend FlinkPravegaTableSource. Please see the implementation of FlinkPravegaJsonTableSource for more details.
Time Attribute Support
@deprecated: the steps outlined in this section apply only to the FlinkPravegaJsonTableSource based implementation. Please use the Pravega descriptor instead.
Using the withProctimeAttribute or withRowtimeAttribute builder methods, you can supply the time attribute information of the event. The configured field must be present in the table schema and be of type Types.SQL_TIMESTAMP(), as in the sketch below.
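A minimal sketch of declaring a processing-time attribute with the deprecated builder (the field names and the procTime attribute are assumptions for illustration):
// sketch: the time attribute must appear in the table schema with type SQL_TIMESTAMP
TableSchema tableSchema = TableSchema.builder()
    .field("user", Types.STRING())
    .field("uri", Types.STRING())
    .field("procTime", Types.SQL_TIMESTAMP())   // assumed processing-time field
    .build();
FlinkPravegaJsonTableSource source = FlinkPravegaJsonTableSource.builder()
    .forStream(stream)
    .withPravegaConfig(pravegaConfig)
    .withSchema(tableSchema)
    .withProctimeAttribute("procTime")          // marks "procTime" as the processing time attribute
    .build();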
Pravega watermark (Evolving)
The Pravega watermark for the Table API reader depends on the underlying DataStream settings. The following example shows how to read data with a watermark using a table source.
// A user-defined implementation of `AssignerWithTimeWindows`, the event type should be `Row`
public static class MyAssigner extends LowerBoundAssigner<Row> {
public MyAssigner() {}
@Override
public long extractTimestamp(Row element, long previousElementTimestamp) {
// The third attribute of the element is the event timestamp
return (long) element.getField(2);
}
}
Pravega pravega = new Pravega();
pravega.tableSourceReaderBuilder()
// Assign the watermark in the source
.withTimestampAssigner(new MyAssigner())
.withReaderGroupScope(stream.getScope())
.forStream(stream)
.withPravegaConfig(pravegaConfig);
final StreamTableDescriptor tableDesc = tableEnv.connect(pravega)
.withFormat(...)
.withSchema(
new Schema()
.field(...)
// Use the timestamp and Pravega watermark defined in the source
.rowtime(new Rowtime()
.timestampsFromSource()
.watermarksFromSource()
))
.inAppendMode();
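To use the descriptor, it can be registered in the same way as in the earlier streaming-source example (a sketch; tableEnv is assumed to be a StreamTableEnvironment):
// register the descriptor-based source; queries on "MyTableRow" can then use the rowtime attribute
final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(tableDesc);
final TableSource<?> source = TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
    .createStreamTableSource(propertiesMap);
tableEnv.registerTableSource("MyTableRow", source);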
Table Sink
A Pravega Stream may be used as an append-only table within a Flink table program. The Flink Table API is oriented around Flink's TableSchema class, which describes the table fields. A concrete subclass of FlinkPravegaTableSink is then used to write table rows to a Pravega Stream in a particular format.
Example
The following example uses the provided table sink to write JSON-formatted events to a Pravega Stream:
// (Option-1) Streaming Sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table table = tableEnv.fromDataStream(env.fromCollection(Arrays.asList(...)));
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.forStream(stream)
.withPravegaConfig(pravegaConfig);
StreamTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true).deriveSchema())
.withSchema(new Schema().field("category", Types.STRING()).field("value", Types.INT()))
.inAppendMode();
desc.registerTableSink("test");
final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(desc);
final TableSink<?> sink = TableFactoryService.find(StreamTableSinkFactory.class, propertiesMap)
.createStreamTableSink(propertiesMap);
table.writeToSink(sink);
env.execute();
// (Option-2) Batch Sink
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table table = tableEnv.fromDataSet(env.fromCollection(Arrays.asList(...)));
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.forStream(stream)
.withPravegaConfig(pravegaConfig);
BatchTableDescriptor desc = tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true).deriveSchema())
.withSchema(new Schema().field("category", Types.STRING()).field("value", Types.INT()));
desc.registerTableSink("test");
final Map<String, String> propertiesMap = DescriptorProperties.toJavaMap(desc);
final TableSink<?> sink = TableFactoryService.find(BatchTableSinkFactory.class, propertiesMap)
.createBatchTableSink(propertiesMap);
table.writeToSink(sink);
env.execute();
@deprecated: the following example uses the deprecated FlinkPravegaJsonTableSink API.
// Create a Flink Table environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Load the Pravega configuration
PravegaConfig config = PravegaConfig.fromParams(ParameterTool.fromArgs(args));
// Define a table (see Flink documentation)
Table table = ...
// Write the table to a Pravega Stream
FlinkPravegaJsonTableSink sink = FlinkPravegaJsonTableSink.builder()
.forStream("sensor_stream")
.withPravegaConfig(config)
.withRoutingKeyField("sensor_id")
.withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
.build();
table.writeToSink(sink);
Parameters
A builder API is provided to construct a concrete subclass of FlinkPravegaTableSink. See the table below for a summary of builder properties; a short usage sketch follows the table. Note that the builder accepts an instance of PravegaConfig for common configuration properties. See the configurations page for more information.
Note that the table sink supports both the Flink streaming and batch environments. In the streaming environment, the table sink uses a FlinkPravegaWriter connector. In the batch environment, the table sink uses a FlinkPravegaOutputFormat connector. Please see the documentation of the Streaming Connector and the Batch Connector for a better understanding of the parameters listed below.
Method | Description |
---|---|
withPravegaConfig | The Pravega client configuration, which includes connection info, security info, and a default scope. |
forStream | The stream to be written to. |
withWriterMode | The writer mode to provide Best-effort, At-least-once, or Exactly-once guarantees. |
withTxnTimeout | The timeout for the Pravega transaction that supports the exactly-once writer mode. |
withRoutingKeyField | The table field to use as the Routing Key for written events. |
enableWatermark | true or false to enable/disable emitting the event-time watermark into the Pravega stream. |
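As a usage sketch only, the writer builder options above might be combined as follows. The stream and pravegaConfig objects are assumed from the earlier examples, and the argument types (for example, a Time value for the transaction timeout) are assumptions that may vary by connector version.
// sketch: combining the writer builder options listed above
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
    .forStream(stream)                               // stream to write to
    .withRoutingKeyField("category")                 // table field used as the Routing Key
    .withWriterMode(PravegaWriterMode.EXACTLY_ONCE)  // delivery guarantee
    .withTxnTimeout(Time.seconds(30))                // transaction timeout for exactly-once (assumed argument type)
    .enableWatermark(true)                           // emit event-time watermarks into the Pravega stream
    .withPravegaConfig(pravegaConfig);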
The configurations below apply only to the deprecated FlinkPravegaJsonTableSink implementation.
Method | Description |
---|---|
withSchema | The table schema which describes which JSON fields to expect. |
Custom Formats
@deprecated: the steps outlined in this section apply only to the FlinkPravegaJsonTableSink based implementation. Please use the Pravega descriptor instead.
To work with stream events in a format other than JSON, extend FlinkPravegaTableSink. Please see the implementation of FlinkPravegaJsonTableSink for more details.
Using SQL Client
The Flink SQL Client was introduced in Flink 1.6 to provide an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows retrieving and visualizing real-time results from the running distributed application on the command line.
It is now possible to access Pravega streams using standard SQL commands through Flink's SQL client. To do so, the following files have to be copied to the Flink cluster library path $FLINK_HOME/lib:
- Pravega connector jar
- Flink JSON jar (to serialize/deserialize data in JSON format)
- Flink Avro jar (to serialize/deserialize data in Avro format)
The Flink format jars can be downloaded from the Maven central repository.
In a nutshell, here is what we need to do to use the Flink SQL client with Pravega.
1. Download a Flink binary version supported by the connector.
2. Copy flink-table.jar and flink-sql-client.jar from $FLINK_HOME/opt/ to $FLINK_HOME/lib/.
3. Copy the Flink format jars (JSON, Avro) from Maven central to $FLINK_HOME/lib/.
4. Copy the Flink Pravega connector jar to $FLINK_HOME/lib/.
5. Prepare the SQL client configuration file (which contains the Pravega connector descriptor configuration). Make sure to create any Pravega streams that you will be accessing from the SQL client shell ahead of time.
6. Run the SQL client shell in embedded mode using the command $FLINK_HOME/bin/sql-client.sh embedded -d <SQL_configuration_file>.
7. Run SELECT 'Hello World' from the SQL client shell and make sure it does not throw any errors. If there are no errors, the results screen is displayed.
8. After these steps, you can run SQL commands from the SQL client shell prompt to interact with Pravega.
For more details on how to set up, configure, and access the SQL client shell, please follow the getting started documentation.
Environment File
The schema of the YAML configuration file for providing Pravega Table API specific connector configuration is shown below.
tables:
  - name: sample        # name the new table
    type: source        # declare if the table should be "source", "sink", or "both". If "both", provide both reader and writer configurations
    update-mode: append # specify the update-mode *only* for streaming tables

    # declare the external system to connect to
    connector:
      type: pravega
      version: "1"
      metrics:                      # optional (true|false)
      connection-config:
        controller-uri:             # mandatory
        default-scope:              # optional (assuming reader or writer provides scope)
        security:                   # optional
          auth-type:                # optional (base64 encoded string)
          auth-token:               # optional (base64 encoded string)
          validate-hostname:        # optional (true|false)
          trust-store:              # optional (truststore filename)
      reader:                       # required only if type: source
        stream-info:
          - scope: test             # optional (uses default-scope value or else throws error)
            stream: stream1         # mandatory
            start-streamcut:        # optional (base64 encoded string)
            end-streamcut:          # optional (base64 encoded string)
          - scope: test             # repeating info to provide multiple stream configurations
            stream: stream2
            start-streamcut:
            end-streamcut:
        reader-group:               # optional
          uid:                      # optional
          scope:                    # optional (uses default-scope or else throws error)
          name:                     # optional
          refresh-interval:         # optional (long milliseconds)
          event-read-timeout-interval:          # optional (long milliseconds)
          checkpoint-initiate-timeout-interval: # optional (long milliseconds)
      writer:                       # required only if type: sink
        scope: foo                  # optional (uses default-scope value)
        stream: bar                 # mandatory
        mode:                       # optional (exactly_once | atleast_once)
        txn-lease-renewal-interval: # optional (long milliseconds)
        routingkey-field-name:      # mandatory (provide field name from schema that has to be used as routing key)

    # declare a format for this system (refer to the Flink documentation for details)
    format:

    # declare the schema of the table (refer to the Flink documentation for details)
    schema:
Sample Environment File
Here is a sample environment file for reference, which can be used as both a source and a sink to read data from and write data to Pravega as table records.
tables:
  - name: sample
    type: both
    update-mode: append

    # declare the external system to connect to
    connector:
      type: pravega
      version: "1"
      metrics: true
      connection-config:
        controller-uri: "tcp://localhost:9090"
        default-scope: wVamQsOSaCxvYiHQVhRl
      reader:
        stream-info:
          - stream: streamX
      writer:
        stream: streamX
        mode: atleast_once
        txn-lease-renewal-interval: 10000
        routingkey-field-name: category

    format:
      type: json
      fail-on-missing-field: true
      derive-schema: true

    schema:
      - name: category
        type: VARCHAR
      - name: value
        type: INT

functions: []

execution:
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0