Table Connector
The Flink connector library for Pravega provides a table source and table sink for use with the Flink Table API. The Table API provides a unified API for both the Flink streaming and batch environments.
The Pravega Table API can be used either programmatically (using the Pravega descriptor) or declaratively through YAML configuration files for the SQL client.
See the sections below for details.
Table of Contents
- Table Source
- Parameters
- Pravega watermark (Evolving)
- Table Sink
- Parameters
- Using SQL Client
- Environment File
Table Source
A Pravega Stream may be used as a table source within a Flink table program. The Flink Table API is oriented around Flink's `TableSchema` classes which describe the table fields. A concrete subclass of `FlinkPravegaTableSource` is then used to parse raw stream data as `Row` objects that conform to the table schema.
Example
The following example uses the provided table source to read JSON-formatted events from a Pravega Stream:
// define table schema definition
Schema schema = new Schema()
.field("user", DataTypes.STRING())
.field("uri", DataTypes.STRING())
.field("accessTime", DataTypes.TIMESTAMP(3)).rowtime(
new Rowtime().timestampsFromField("accessTime")
.watermarksPeriodicBounded(30000L));
// define pravega reader configurations using Pravega descriptor
Pravega pravega = new Pravega();
pravega.tableSourceReaderBuilder()
.withReaderGroupScope(stream.getScope())
.forStream(stream)
.withPravegaConfig(pravegaConfig);
// (Option-1) Streaming Source
StreamExecutionEnvironment execEnvRead = StreamExecutionEnvironment.getExecutionEnvironment();
// Old Planner
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnvRead);
// Blink Planner (recommended). For the differences between the two planners, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(execEnvRead,
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build());
tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true))
.withSchema(schema)
.inAppendMode()
// In Flink 1.10, createTemporaryTable can be used here as well, unless time attributes are added to the schema
// (there are some known issues in Flink, see https://issues.apache.org/jira/browse/FLINK-16160)
.registerTableSource("MyTableRow");
String sqlQuery = "SELECT user, count(uri) from MyTableRow GROUP BY user";
Table result = tableEnv.sqlQuery(sqlQuery);
...
// (Option-2) Batch Source
ExecutionEnvironment execEnvRead = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnvRead);
execEnvRead.setParallelism(1);
tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true))
.withSchema(schema)
.inAppendMode()
.registerTableSource("MyTableRow");
String sqlQuery = "SELECT ...";
Table result = tableEnv.sqlQuery(sqlQuery);
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
...
### Parameters
A builder API is provided to construct a concrete subclass of `FlinkPravegaTableSource`. See the table below for a summary of builder properties. Note that the builder accepts an instance of `PravegaConfig` for common configuration properties. See the [configurations](configurations.md) page for more information.
Note that the table source supports both the Flink **streaming** and **batch environments**. In the streaming environment, the table source uses a [`FlinkPravegaReader`](streaming.md#flinkpravegareader) connector. In the batch environment, the table source uses a [`FlinkPravegaInputFormat`](batch.md#flinkpravegainputformat) connector. Please see the [Streaming Connector](streaming.md) and [Batch Connector](batch.md) documentation for a better understanding of the parameters listed below.
|Method |Description|
|----------------------|-----------------------------------------------------------------------|
|`withPravegaConfig`|The Pravega client configuration, which includes connection info, security info, and a default scope.|
|`forStream`|The stream to be read from, with optional start and/or end position. May be called repeatedly to read numerous streams in parallel.|
|`uid`|The uid to identify the checkpoint state of this source. _Applies only to streaming API._|
|`withReaderGroupScope`|The scope to store the Reader Group synchronization stream into. _Applies only to streaming API._|
|`withReaderGroupName`|The Reader Group name for display purposes. _Applies only to streaming API._|
|`withReaderGroupRefreshTime`|The interval for synchronizing the Reader Group state across parallel source instances. _Applies only to streaming API._|
|`withCheckpointInitiateTimeout`|The timeout for executing a checkpoint of the Reader Group state. _Applies only to streaming API._|
|`withTimestampAssigner`| (Evolving) The `AssignerWithTimeWindows` implementation which describes the event timestamp extraction and the Pravega watermark strategy under event-time semantics. _Applies only to streaming API._|
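As an illustrative sketch (not taken verbatim from the connector documentation), the snippet below combines several of these builder options. It assumes the same `pravegaConfig` and imports as the example above, plus Flink's `org.apache.flink.api.common.time.Time`; the scope, stream, uid, and reader group names and the timing values are placeholder assumptions.

```java
// Hedged sketch: combines several builder options from the table above.
// "examples", "sensor-events", the uid/name strings, and the timing values are placeholders.
Pravega pravega = new Pravega();
pravega.tableSourceReaderBuilder()
    .withPravegaConfig(pravegaConfig)                 // common client configuration
    .forStream("sensor-events")                       // stream to read from (placeholder name)
    .uid("pravega-table-source")                      // checkpoint state id (streaming only)
    .withReaderGroupScope("examples")                 // scope for the reader group state (streaming only)
    .withReaderGroupName("flink-table-reader")        // reader group display name (streaming only)
    .withReaderGroupRefreshTime(Time.seconds(10))     // reader group state sync interval (streaming only)
    .withCheckpointInitiateTimeout(Time.seconds(30)); // reader group checkpoint timeout (streaming only)
```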
### Pravega watermark (Evolving)
Pravega watermarks for the Table API reader depend on the underlying DataStream settings. The following example shows how to read data with watermarks using a table source.
```java
// A user-defined implementation of `AssignerWithTimeWindows`, the event type should be `Row`
public static class MyAssigner extends LowerBoundAssigner<Row> {
public MyAssigner() {}
@Override
public long extractTimestamp(Row element, long previousElementTimestamp) {
// The third attribute of the element is the event timestamp
return (long) element.getField(2);
}
}
Pravega pravega = new Pravega();
pravega.tableSourceReaderBuilder()
// Assign the watermark in the source
.withTimestampAssigner(new MyAssigner())
.withReaderGroupScope(stream.getScope())
.forStream(stream)
.withPravegaConfig(pravegaConfig);
final ConnectTableDescriptor tableDesc = new TestTableDescriptor(pravega)
.withFormat(...)
.withSchema(
new Schema()
.field(...)
// Use the timestamp and Pravega watermark defined in the source
.rowtime(new Rowtime()
.timestampsFromSource()
.watermarksFromSource()
))
.inAppendMode();
```
Table Sink
A Pravega Stream may be used as an append-only table within a Flink table program. The Flink Table API is oriented around Flink's `TableSchema` classes which describe the table fields. A concrete subclass of `FlinkPravegaTableSink` is then used to write table rows to a Pravega Stream in a particular format.
Example
The following example uses the provided table sink to write JSON-formatted events to a Pravega Stream:
// (Option-1) Streaming Sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
// Old Planner
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Blink Planner (recommended). For the differences between the two planners, see https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build());
Table table = tableEnv.fromDataStream(env.fromCollection(Arrays.asList(...)));
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.forStream(stream)
.withPravegaConfig(pravegaConfig);
tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true))
.withSchema(new Schema()
.field("category", DataTypes.STRING())
.field("value", DataTypes.INT()))
.registerTableSink("PravegaSink");
table.insertInto("PravegaSink");
env.execute();
// (Option-2) Batch Sink
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
Table table = tableEnv.fromDataSet(env.fromCollection(Arrays.asList(...)));
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
.withRoutingKeyField("category")
.forStream(stream)
.withPravegaConfig(pravegaConfig);
tableEnv.connect(pravega)
.withFormat(new Json().failOnMissingField(true))
.withSchema(new Schema()
.field("category", DataTypes.STRING())
.field("value", DataTypes.INT()))
.registerTableSink("PravegaSink");
table.insertInto("PravegaSink");
env.execute();
### Parameters
A builder API is provided to construct a concrete subclass of `FlinkPravegaTableSink`. See the table below for a summary of builder properties. Note that the builder accepts an instance of `PravegaConfig` for common configuration properties. See the [configurations](configurations.md) page for more information.
Note that the table sink supports both the Flink streaming and batch environments. In the streaming environment, the table sink uses a [`FlinkPravegaWriter`](streaming.md#flinkpravegawriter) connector. In the batch environment, the table sink uses a [`FlinkPravegaOutputFormat`](batch.md#flinkpravegaoutputformat) connector. Please see the [Streaming Connector](streaming.md) and [Batch Connector](batch.md) documentation for a better understanding of the parameters listed below.
|Method |Description|
|----------------------|-----------------------------------------------------------------------|
|`withPravegaConfig`|The Pravega client configuration, which includes connection info, security info, and a default scope.|
|`forStream`|The stream to be written to.|
|`withWriterMode`|The writer mode to provide best-effort, at-least-once, or exactly-once guarantees.|
|`withTxnTimeout`|The timeout for the Pravega transaction that supports the exactly-once writer mode.|
|`withRoutingKeyField`|The table field to use as the Routing Key for written events.|
|`enableWatermark`|`true` or `false` to enable/disable emitting event-time watermarks into the Pravega stream.|
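For illustration, here is a similar hedged sketch of a sink writer configuration combining several of these options. It assumes the same `pravegaConfig` and `stream` as the example above; `PravegaWriterMode.EXACTLY_ONCE` and the `Time`-typed timeout argument are assumptions about the connector API, and the concrete values are placeholders.

```java
// Hedged sketch: combines several sink builder options from the table above.
// PravegaWriterMode.EXACTLY_ONCE and Time.seconds(30) are assumed values/types.
Pravega pravega = new Pravega();
pravega.tableSinkWriterBuilder()
    .withPravegaConfig(pravegaConfig)               // common client configuration
    .forStream(stream)                              // stream to write to
    .withWriterMode(PravegaWriterMode.EXACTLY_ONCE) // best-effort / at-least-once / exactly-once
    .withTxnTimeout(Time.seconds(30))               // transaction timeout used by exactly-once mode
    .enableWatermark(true)                          // emit event-time watermarks into the Pravega stream
    .withRoutingKeyField("category");               // table field used as the routing key
```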
Using SQL Client
The Flink SQL Client was introduced in Flink 1.6 and aims at providing an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code. The SQL Client CLI allows for retrieving and visualizing real-time results from the running distributed application on the command line.
It is now possible to access Pravega streams using standard SQL commands through Flink's SQL client. To do so, the following files have to be copied to the Flink cluster library path $FLINK_HOME/lib:
- Pravega connector jar
- Flink JSON jar (to serialize/deserialize data in json format)
- Flink Avro jar (to serialize/deserialize data in avro format)
Flink format jars can be downloaded from the Maven central repository.
In a nutshell, here is what we need to do to use the Flink SQL client with Pravega.
1. Download the Flink binary version supported by the connector.
2. Copy flink-table.jar and flink-sql-client.jar from $FLINK_HOME/opt/ to $FLINK_HOME/lib/.
3. Copy the Flink format jars (json, avro) from Maven central to $FLINK_HOME/lib/.
4. Copy the Flink Pravega connector jar file to $FLINK_HOME/lib/.
5. Prepare the SQL client configuration file (containing the Pravega connector descriptor configurations). Make sure to create any Pravega streams that you will be accessing from the SQL client shell ahead of time.
6. Run the SQL client shell in embedded mode using the command $FLINK_HOME/bin/sql-client.sh embedded -d <SQL_configuration_file>.
7. Run SELECT 'Hello World' from the SQL client shell and make sure it does not throw any errors. It should show an empty results screen if there are no errors.
8. After these steps, you can run SQL commands from the SQL client shell prompt to interact with Pravega.
For more details on how to setup, configure and access the SQL client shell, please follow the getting started documentation.
Environment File
The YAML configuration file schema for providing Pravega Table API-specific connector configuration is shown below.
tables:
- name: sample # name the new table
type: source-table # declare if the table should be "source-table", "sink-table", or "source-sink-table". If "source-sink-table" provide both reader and writer configurations
update-mode: append # specify the update-mode *only* for streaming tables
# declare the external system to connect to
connector:
type: pravega
version: "1"
metrics: # optional (true|false)
connection-config:
controller-uri: # mandatory
default-scope: # optional (assuming reader or writer provides scope)
security: # optional
auth-type: # optional (base64 encoded string)
auth-token: # optional (base64 encoded string)
validate-hostname: # optional (true|false)
trust-store: # optional (truststore filename)
reader: # required only if type: source-table
stream-info:
- scope: test # optional (uses default-scope value or else throws error)
stream: stream1 # mandatory
start-streamcut: # optional (base64 encoded string)
end-streamcut: # optional (base64 encoded string)
- scope: test # repeating info to provide multiple stream configurations
stream: stream2
start-streamcut:
end-streamcut:
reader-group: # optional
uid: # optional
scope: # optional (uses default-scope or else throws error)
name: # optional
refresh-interval: # optional (long milliseconds)
event-read-timeout-interval: # optional (long milliseconds)
checkpoint-initiate-timeout-interval: # optional (long milliseconds)
writer: # required only if type: sink-table
scope: foo # optional (uses default-scope value)
stream: bar # mandatory
mode: # optional (exactly_once | atleast_once)
txn-lease-renewal-interval: # optional (long milliseconds)
routingkey-field-name: # mandatory (provide field name from schema that has to be used as routing key)
# declare a format for this system (refer flink documentation for details)
format:
# declare the schema of the table (refer flink documentation for details)
schema:
Sample Environment File
Here is a sample environment file for reference, which can be used as both a source and a sink to read table records from and write them to Pravega. Note that the stream `streamX` must be created in advance for it to work.
tables:
- name: sample
type: source-sink-table
update-mode: append
# declare the external system to connect to
connector:
type: pravega
version: "1"
metrics: true
connection-config:
controller-uri: "tcp://localhost:9090"
default-scope: test-scope
reader:
stream-info:
- stream: streamX
writer:
stream: streamX
mode: atleast_once
txn-lease-renewal-interval: 10000
routingkey-field-name: category
format:
type: json
fail-on-missing-field: true
schema:
- name: category
data-type: STRING
- name: value
data-type: INT
- name: timestamp
data-type: TIMESTAMP(3)
functions: []
execution:
# either 'old' (default) or 'blink'; the blink planner is recommended. See https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners
planner: blink
# 'batch' or 'streaming' execution
type: streaming
# allow 'event-time' or only 'processing-time' in sources
time-characteristic: event-time
# interval in ms for emitting periodic watermarks
periodic-watermarks-interval: 200
# 'changelog' or 'table' presentation of results
result-mode: table
# parallelism of the program
parallelism: 1
# maximum parallelism
max-parallelism: 128
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
deployment:
# general cluster communication timeout in ms
response-timeout: 5000
# (optional) address from cluster to gateway
gateway-address: ""
# (optional) port from cluster to gateway
gateway-port: 0
Sample DDL
Here is a sample DDL for reference, which can be used as both a source and a sink to read table records from and write them to Pravega. Note that the stream `streamX` must be created in advance for it to work.
CREATE TABLE sample (
category STRING,
cnt INT,
ts TIMESTAMP(3),
WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
'update-mode' = 'append',
'connector.type' = 'pravega',
'connector.version' = '1',
'connector.metrics' = 'true',
'connector.connection-config.controller-uri' = 'tcp://localhost:9090',
'connector.connection-config.default-scope' = 'test',
'connector.reader.stream-info.0.stream' = 'streamX',
'connector.writer.stream' = 'streamX',
'connector.writer.mode' = 'atleast_once',
'connector.writer.txn-lease-renewal-interval' = '10000',
'connector.writer.routingkey-field-name' = 'category',
'format.type' = 'json',
'format.fail-on-missing-field' = 'false'
)
The usage and definition of time attributes in DDL and the usage of the WATERMARK clause are described in:
- Time Attribute
- DDL Usage: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
Timestamp format issue with Flink SQL
Please refer to the Table formats documentation.
Here is a valid data example matching the above sample YAML:
{"category": "test-category", "value": 310884, "timestamp": "2017-11-27T00:00:00Z"}