Table Connector
The Flink connector library for Pravega provides a table source and table sink for use with the Flink Table API. The Table API offers a unified table source API for both the Flink streaming and batch environments, as well as a table sink for the Flink streaming environment.
With this connector, Pravega streams can be treated as Flink tables.
See the below sections for details.
Table of Contents
- Introduction
- How to create a table
- Connector options
- Features
- Batch and Streaming read
- Specify start and end streamcut
- Changelog Source
- Routing key by column
- Consistency guarantees
- Useful Flink links
Introduction
Up to the Flink 1.10 connector, the connector implemented Flink's legacy TableFactory interface to support table mapping, and provided FlinkPravegaTableSource and FlinkPravegaTableSink to read and write Pravega streams as Flink tables via a Pravega descriptor.
Since the Flink 1.11 connector, Flink has introduced a new Table API with FLIP-95. The connector now implements the Flink Factory interface and provides FlinkPravegaDynamicTableSource and FlinkPravegaDynamicTableSink to simplify application coding.
Note that the legacy table API is deprecated and will be removed in future releases; we strongly suggest that users switch to the new table API. The documentation below focuses on the new table API; please refer to the documentation of older versions for the legacy table API.
The Pravega table source supports both the Flink streaming and batch environments. The Pravega table sink is an append-only table sink; it does NOT support upsert/retract output.
How to create a table
A Pravega stream can be used as a table source/sink within a Flink table program. The example below shows how to create a table that connects to a Pravega stream as both source and sink:
create table pravega (
user_id STRING,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
log_ts TIMESTAMP(3),
ts as log_ts + INTERVAL '1' SECOND,
watermark for ts as ts
)
with (
'connector' = 'pravega',
'controller-uri' = 'tcp://localhost:9090',
'scope' = 'scope',
'scan.execution.type' = 'streaming',
'scan.reader-group.name' = 'group1',
'scan.streams' = 'stream',
'sink.stream' = 'stream',
'sink.routing-key.field.name' = 'user_id',
'format' = 'json'
)
Connector options
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be 'pravega' |
controller-uri | required | (none) | String | Pravega controller URI |
security.auth-type | optional | (none) | String | Static authentication/authorization type for security |
security.auth-token | optional | (none) | String | Static authentication/authorization token for security |
security.validate-hostname | optional | (none) | Boolean | Whether hostname validation should be enabled when TLS is enabled |
security.trust-store | optional | (none) | String | Trust Store for Pravega client |
scan.execution.type | optional | streaming | String | Execution type for scan source. Valid values are 'streaming', 'batch'. |
scan.reader-group.name | required for streaming source | (none) | String | Pravega reader group name |
scan.streams | required for source | (none) | List | Semicolon-separated list of stream names from which the table is read. |
scan.start-streamcuts | optional | (none) | List | Semicolon-separated list of base64-encoded strings for start streamcuts; reads from the beginning of the stream if not specified |
scan.end-streamcuts | optional | (none) | List | Semicolon-separated list of base64-encoded strings for end streamcuts; unbounded read if not specified |
scan.reader-group.max-outstanding-checkpoint-request | optional | 3 | Integer | Maximum outstanding checkpoint requests to Pravega |
scan.reader-group.refresh.interval | optional | 3 s | Duration | Refresh interval for reader group |
scan.event-read.timeout.interval | optional | 1 s | Duration | Timeout for the call to read events from Pravega |
scan.reader-group.checkpoint-initiate-timeout.interval | optional | 5 s | Duration | Timeout for call that initiates the Pravega checkpoint |
sink.stream | required for sink | (none) | String | Stream name to which the table is written |
sink.semantic | optional | at-least-once | String | Semantics for committing writes. Valid values are 'at-least-once', 'exactly-once', 'best-effort' |
sink.txn-lease-renewal.interval | optional | 30 s | Duration | Transaction lease renewal period, valid for exactly-once semantic. |
sink.enable.watermark-propagation | optional | false | Boolean | If watermark propagation should be enabled from Flink table to Pravega stream |
sink.routing-key.field.name | optional | (none) | String | Field name to use as a Pravega event routing key, field type must be STRING, random routing if not specified. |
Features
Batch and Streaming read
scan.execution.type can be set to 'streaming' or 'batch' according to the user's choice of a streaming read or a batch read.
In the streaming environment, the table source uses a FlinkPravegaReader connector.
In the batch environment, the table source uses a FlinkPravegaInputFormat connector.
Please see the documentation of the Streaming Connector and the Batch Connector for a better understanding of the parameters mentioned above.
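As a sketch, a bounded batch read over the same table can also be requested without redefining it, using Flink's dynamic table options from FLIP-113 (depending on the Flink version, table.dynamic-table-options.enabled may need to be set to true first):

-- read the stream as a bounded batch source for this query only
SELECT user_id, item_id, behavior
FROM pravega /*+ OPTIONS('scan.execution.type' = 'batch') */;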
Specify start and end streamcut
A StreamCut represents a consistent position in the stream. It can be obtained from other applications that use the Pravega client, for example through checkpoints or a custom-defined index.
scan.start-streamcuts and scan.end-streamcuts can be specified to perform bounded reads and "start-at-some-point" reads of Pravega streams.
The Pravega source supports reading from multiple streams; when reading from multiple streams, make sure the order of the streamcuts matches the order of the streams, as in the sketch below.
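For example, a bounded read over two streams might be declared as follows (a sketch; the stream names are hypothetical and the base64 streamcut values are placeholders that would come from the Pravega client):

create table pravega_bounded (
user_id STRING,
item_id BIGINT
)
with (
'connector' = 'pravega',
'controller-uri' = 'tcp://localhost:9090',
'scope' = 'scope',
'scan.execution.type' = 'batch',
'scan.streams' = 'stream1;stream2',
-- one streamcut per stream, in the same order as 'scan.streams'
'scan.start-streamcuts' = '<base64-streamcut-a>;<base64-streamcut-b>',
'scan.end-streamcuts' = '<base64-streamcut-c>;<base64-streamcut-d>',
'format' = 'json'
)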
Changelog Source
If the messages in a Pravega stream are change events captured from other databases using CDC tools, you can use a CDC format to interpret the messages as INSERT/UPDATE/DELETE rows in the Flink SQL system.
Flink provides two CDC formats, debezium-json and canal-json, to interpret change events captured by Debezium and Canal.
The changelog source is a very useful feature in many cases, such as synchronizing incremental data from databases to other systems, auditing logs, materializing views on databases, temporal joins on the changing history of a database table, and so on.
See the debezium-json and canal-json documentation for more about how to use the CDC formats.
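As a sketch, a source table over a stream that carries Debezium change events differs from the earlier example mainly in its format option (the schema, stream, and reader group names here are hypothetical; since the Pravega table sink is append-only, such a table can only be used as a source):

create table pravega_cdc (
user_id STRING,
item_id BIGINT,
behavior STRING
)
with (
'connector' = 'pravega',
'controller-uri' = 'tcp://localhost:9090',
'scope' = 'scope',
'scan.execution.type' = 'streaming',
'scan.reader-group.name' = 'group2',
'scan.streams' = 'cdc-stream',
-- interpret Debezium change events as INSERT/UPDATE/DELETE rows
'format' = 'debezium-json'
)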
Routing key by column
Pravega writers can use domain-specific, meaningful routing keys (like customer ID, timestamp, machine ID, etc.) to group similar events together and gain parallelism through segment scaling.
Pravega makes ordering guarantees in terms of routing keys.
The Pravega sink supports routing events by a given event field, specified via sink.routing-key.field.name. The field type must be STRING; routing is random if the option is not specified.
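Continuing the table from the example above, which sets 'sink.routing-key.field.name' = 'user_id', a write might look like the following sketch (source_table is a hypothetical upstream table; the computed column ts is not written, as computed columns are excluded from the sink schema):

-- events with the same user_id share a routing key, so their relative order is preserved
INSERT INTO pravega
SELECT user_id, item_id, category_id, behavior, log_ts
FROM source_table;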
Consistency guarantees
By default, a Pravega sink ingests data with at-least-once guarantees if the query is executed with checkpointing enabled.
'sink.semantic' = 'exactly-once' can be specified to turn on transactional writes with exactly-once guarantees.
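For example, the sink options from the earlier table definition could be adjusted as follows to enable transactional writes (a sketch; the lease renewal interval shown is simply the default value):

'sink.stream' = 'stream',
'sink.semantic' = 'exactly-once',
-- transaction lease renewal period, relevant only for exactly-once
'sink.txn-lease-renewal.interval' = '30 s',
'sink.routing-key.field.name' = 'user_id',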
Useful Flink links
Users can quickly try out the Pravega table API through the Flink SQL Client. Here is a tutorial for setting up the environment: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
The usage and definition of time attributes and the WATERMARK syntax are described here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html