On November 11, 2024, the Apache Flink community released a new version of AWS services connectors, an AWS open source contribution. This new release, version 5.0.0, introduces a new source connector to read data from Amazon Kinesis Data Streams. In this post, we explain how the new features of this connector can improve performance and reliability of your Apache Flink application.
Apache Flink has both a source and sink connector, to read from and write to Kinesis Data Streams. In this post, we focus on the new source connector, because version 5.0.0 does not introduce new functionality for the sink.
Apache Flink is a framework and distributed stream processing engine designed to perform computation at in-memory speed and at any scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless experience to run your Flink applications, implemented in Java, Python or SQL, and using all the APIs available in Flink: SQL, Table, DataStream, and ProcessFunction API.
Apache Flink connectors
Flink supports reading and writing data to external systems through connectors, which are components that allow your application to interact with stream-storage message brokers, databases, or object stores. Kinesis Data Streams is a popular source and destination for streaming applications. Flink provides both source and sink connectors for Kinesis Data Streams.
The following diagram illustrates a sample architecture.
Before proceeding further, it’s important to clarify three terms often used interchangeably in data streaming and in the Apache Flink documentation:
- Kinesis Data Streams refers to the Amazon service
- Kinesis source and Kinesis consumer refer to the Apache Flink components, in particular the source connectors, that allow reading data from Kinesis Data Streams
- In this post, we use the term stream to refer to a single Kinesis data stream
Introducing the new Flink Kinesis source connector
The launch of version 5.0.0 of the AWS connectors introduces a new connector for reading events from Kinesis Data Streams. The new connector is called Kinesis Streams Source and supersedes the Kinesis Consumer as the source connector for Kinesis Data Streams.
The new connector introduces several new features, adheres to the new Flink Source interface, and is compatible with Flink 2.x, the first major version release by the Flink community. Flink 2.x introduces a number of breaking changes, including removing the SourceFunction interface used by legacy connectors. The legacy Kinesis Consumer will no longer work with Flink 2.x.
Setting up the connector is slightly different than with the legacy Kinesis connector. Let’s start with the DataStream API.
How to use the new connector with the DataStream API
To add the new connector to your application, you need to update the connector dependency. For the DataStream API, the dependency has changed its name to flink-connector-aws-kinesis-streams.
At the time of writing, the latest connector version is 5.0.0 and it supports the most recent stable Flink versions, 1.19 and 1.20. The connector is also compatible with Flink 2.0, but no connector has been officially released for Flink 2.x yet. Assuming you are using Flink 1.20, the new dependency is the following:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>
The connector uses the new Flink Source interface. This interface implements the new FLIP-27 standard, and replaces the legacy SourceFunction interface that has been deprecated. SourceFunction will be completely removed in Flink 2.x.
In your application, you can now use a fluent and expressive builder interface to instantiate and configure the source. The minimal setup only requires the stream Amazon Resource Name (ARN) and the deserialization schema:
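import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;

// Minimal setup: stream ARN plus a deserialization schema
KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
        .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
        .setDeserializationSchema(new SimpleStringSchema())
        .build();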
The new source class is called KinesisStreamsSource. Not to be confused with the legacy source, FlinkKinesisConsumer.
You can then add the source to the execution environment using the new fromSource() method. This method requires explicitly specifying the watermark strategy, along with a name for the source:
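import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
        kdsSource,
        // Event-time watermarks; shards idle for more than 1 second do not hold back the watermark
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(1)),
        "Kinesis source");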
These few lines of code introduce some of the main changes in the interface of the connector, which we discuss in the following sections.
Stream ARN
You can now define the Kinesis data stream ARN, as opposed to the stream name. This makes it simpler to consume from streams cross-Region and cross-account.
When running in Amazon Managed Service for Apache Flink, you only need to add permissions to access the stream to the application’s AWS Identity and Access Management (IAM) role. The ARN allows pointing to a stream located in a different AWS Region or account, without assuming roles or passing any external credentials.
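To see why the ARN alone is enough to locate a stream in another Region or account, it helps to look at its structure. The following is a minimal sketch in plain Java and is not part of the connector: StreamArnParts and its parse method are hypothetical names used only for illustration.

```java
/** Illustrative helper (not part of the connector): splits a Kinesis stream ARN into its parts. */
final class StreamArnParts {
    final String region;
    final String accountId;
    final String streamName;

    private StreamArnParts(String region, String accountId, String streamName) {
        this.region = region;
        this.accountId = accountId;
        this.streamName = streamName;
    }

    /** Expected shape: arn:<partition>:kinesis:<region>:<account-id>:stream/<stream-name> */
    static StreamArnParts parse(String arn) {
        String[] parts = arn.split(":", 6);
        boolean valid = parts.length == 6
                && "arn".equals(parts[0])
                && "kinesis".equals(parts[2])
                && parts[5].startsWith("stream/");
        if (!valid) {
            throw new IllegalArgumentException("Not a Kinesis stream ARN: " + arn);
        }
        return new StreamArnParts(parts[3], parts[4], parts[5].substring("stream/".length()));
    }
}
```

For the ARN used in the earlier example, the Region is us-east-1, the account is 123456789012, and the stream name is test-stream. With a stream name only, the Region and account would have to come from somewhere else.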
Explicit watermark
One of the most important characteristics of the new Source interface is that you have to explicitly define a watermark strategy when you attach the source to the execution environment. If your application only implements processing-time semantics, you can specify WatermarkStrategy.noWatermarks().
This is an improvement in terms of code readability. Looking at the source, you know immediately which type of watermark you have, or if you don’t have any. Previously, many connectors provided some type of default watermarks that the user could override. However, the default watermark of each connector was slightly different and confusing for the user.
With the new connector, you can achieve the same behavior as the legacy FlinkKinesisConsumer default watermarks, using WatermarkStrategy.forMonotonousTimestamps(), as shown in the previous example. This strategy generates watermarks based on the approximateArrivalTimestamp returned by Kinesis Data Streams. This timestamp corresponds to the time when the record was published to Kinesis Data Streams.
Idleness and watermark alignment
With the watermark strategy, you can additionally define an idleness, which allows the watermark to progress even when some shards of the stream are idle and receiving no records. Refer to Dealing With Idle Sources for more details about idleness and watermark generators.
A feature introduced by the new Source interface, and fully supported by the new Kinesis source, is watermark alignment. Watermark alignment works in the opposite direction of idleness. It slows down consuming from a shard that is progressing faster than others. This is particularly useful when replaying data from a stream, to reduce the volume of data buffered in the application state. Refer to Watermark alignment for more details.
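The two mechanisms can be pictured with a simplified model. The following plain-Java sketch is not Flink’s internal implementation: WatermarkSketch, ShardProgress, combinedWatermark, and shouldPause are hypothetical names, and the rules are reduced to their essence (idle shards are ignored when taking the minimum watermark; a shard is paused when it runs ahead of the slowest one by more than an allowed drift).

```java
import java.time.Duration;
import java.util.List;
import java.util.OptionalLong;

/** Simplified model of idleness and alignment; not Flink's internal implementation. */
final class WatermarkSketch {

    /** Per-shard progress: current watermark and arrival time of the last record (epoch millis). */
    static final class ShardProgress {
        final long watermark;
        final long lastRecordAt;

        ShardProgress(long watermark, long lastRecordAt) {
            this.watermark = watermark;
            this.lastRecordAt = lastRecordAt;
        }
    }

    /** Idleness: minimum watermark across shards that received a record within the timeout. */
    static OptionalLong combinedWatermark(List<ShardProgress> shards, long now, Duration idleness) {
        return shards.stream()
                .filter(s -> now - s.lastRecordAt <= idleness.toMillis())
                .mapToLong(s -> s.watermark)
                .min(); // empty when every shard is idle
    }

    /** Alignment: pause a shard that is ahead of the slowest one by more than maxDrift. */
    static boolean shouldPause(long shardWatermark, long slowestWatermark, Duration maxDrift) {
        return shardWatermark > slowestWatermark + maxDrift.toMillis();
    }
}
```

In this model, a shard that has received nothing for longer than the idleness timeout no longer holds back the combined watermark, while a shard that is far ahead of the others is asked to wait.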
Set up the connector with the Table API and SQL
Assuming you are using Flink 1.20, the dependency containing both Kinesis source and sink for the Table API and SQL is the following (both Flink 1.19 and 1.20 are supported, adjust the version accordingly):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>5.0.0-1.20</version>
</dependency>
This dependency contains both the new source and the legacy source. Refer to Versioning in case you are planning to use both in the same application.
When defining the source in SQL or the Table API, you use the connector name kinesis, as it was with the legacy source. However, many parameters have changed with the new source:
CREATE TABLE KinesisTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `category_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
    'aws.region' = 'us-east-1',
    'source.init.position' = 'LATEST',
    'format' = 'csv'
);
A couple of notable connector options changed from the legacy source are:
- stream.arn specifies the stream ARN, as opposed to the stream name used in the legacy source.
- source.init.position defines the starting position. This option works similarly to the legacy source, but the option name is different. It was previously scan.stream.initpos.
For the full list of connector options, refer to Connector Options.
New features and improvements
In this section, we discuss the most important features introduced by the new connector. These features are available in the DataStream API, and also the Table API and SQL.
Ordering guarantees
The most important improvement introduced by the new connector is about ordering guarantees.
With Kinesis Data Streams, the order of the messages is retained per partitionId. This is achieved by putting all records with the same partitionId in the same shard. However, when the stream scales, splitting or merging shards, records with the same partitionId end up in a new shard. Kinesis keeps track of the parent-child lineage when resharding happens.
One known limitation of the legacy Kinesis source is that it was unable to follow the parent-child shard lineage. As a consequence, ordering could not be guaranteed when resharding happens. The problem was particularly relevant when the application replayed old messages from a stream that had been resharded, because ordering would be lost. This also made watermark generation and event-time processing non-deterministic.
With the new connector, ordering is retained also when resharding happens. This is achieved by following the parent-child shard lineage, and consuming all records from a parent shard before proceeding with the child shard.
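The parent-before-child rule can be illustrated with a small ordering exercise. The sketch below is plain Java and only models the idea, it is not the connector’s code: ShardLineage and parentsFirst are hypothetical names, and the input is assumed to be a map from each shard ID to the IDs of its parent shards (one parent after a split, two after a merge, none for an original shard).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative only: orders shards so that every parent precedes its children. */
final class ShardLineage {

    static List<String> parentsFirst(Map<String, List<String>> parentsByShard) {
        List<String> ordered = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        // Sorted iteration keeps the result deterministic
        for (String shardId : new TreeSet<>(parentsByShard.keySet())) {
            visit(shardId, parentsByShard, visited, ordered);
        }
        return ordered;
    }

    private static void visit(String shardId, Map<String, List<String>> parentsByShard,
                              Set<String> visited, List<String> ordered) {
        if (!visited.add(shardId)) {
            return; // already handled
        }
        for (String parentId : parentsByShard.get(shardId)) {
            // Parents that are no longer listed (for example, expired shards) are skipped
            if (parentsByShard.containsKey(parentId)) {
                visit(parentId, parentsByShard, visited, ordered);
            }
        }
        ordered.add(shardId); // emitted only after all known parents
    }
}
```

A real source does not compute a static list up front, because shards are discovered while the application runs. The invariant is the same, though: records of a child shard are read only after the parent shards have been fully consumed.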
A better default shard assigner
Each Kinesis data stream is composed of many shards. Also, the Flink source operator runs in multiple parallel subtasks. The shard assigner is the component that decides how to assign the shards of the stream across the source subtasks. The shard assigner’s job is non-trivial, because shard split or merge operations (resharding) might happen when the stream scales up or down.
The new connector comes with a new default assigner, UniformShardAssigner. This assigner maintains uniform distribution of the stream partitionId across parallel subtasks, also when resharding happens. This is achieved by looking at the range of partition keys (HashKeyRange) of each shard.
This shard assigner was already available in the previous connector version, but for backward compatibility, it was not the default and you had to set it up explicitly. This is no longer the case with the new source. The old default shard assigner of the legacy FlinkKinesisConsumer evenly distributed shards (not partitionId) across subtasks. In this case, the actual data distribution might become uneven in the case of resharding, because of the combination of open and closed shards in the stream. Refer to Shard Assignment Strategy for more details.
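To give an intuition of how assigning by key range differs from assigning by shard count, consider the following simplified sketch. It is not the actual UniformShardAssigner code: HashRangeAssignment and subtaskFor are hypothetical names, and the rule used here (map the midpoint of the shard’s hash key range proportionally onto the subtasks) is an assumption made for illustration. It relies only on the fact that Kinesis hash keys span the range from 0 to 2^128 - 1.

```java
import java.math.BigInteger;

/** Illustrative only: picks a subtask from the position of a shard's hash key range. */
final class HashRangeAssignment {
    // Kinesis hash keys are 128-bit values: 0 .. 2^128 - 1
    private static final BigInteger HASH_KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    static int subtaskFor(BigInteger startingHashKey, BigInteger endingHashKey, int parallelism) {
        // Midpoint of the shard's range, always below 2^128
        BigInteger midpoint = startingHashKey.add(endingHashKey).shiftRight(1);
        // Scale the midpoint onto [0, parallelism - 1]
        return midpoint.multiply(BigInteger.valueOf(parallelism))
                .divide(HASH_KEY_SPACE)
                .intValueExact();
    }
}
```

With this rule, when a shard is split, both child shards cover sub-ranges of the parent’s range and tend to land on the same or a neighboring subtask, so the share of the key space handled by each subtask stays roughly constant regardless of how many open and closed shards exist.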
Reduced JAR size
The size of the JAR file has been reduced by 99%, from about 60 MB down to 200 KB. This substantially reduces the size of the fat-JAR of your application using the connector. A smaller JAR can speed up many operations that require redeploying the application.
AWS SDK for Java 2.x
The new connector is based on the newer AWS SDK for Java 2.x, which adds several features and improves support for non-blocking I/O. This makes the connector future-proof because the AWS SDK v1 will reach end-of-support by end of 2025.
AWS SDK built-in retry strategy
The new connector relies on the AWS SDK built-in retry strategy, as opposed to a custom strategy implemented by the legacy connector. Relying on the AWS SDK improves the classification of some errors as retriable or non-retriable.
Removed dependency on the Kinesis Client Library and Kinesis Producer Library
The new connector package no longer includes the Kinesis Client Library (KCL) and Kinesis Producer Library (KPL), contributing to the substantial reduction of the JAR size that we have mentioned.
An implication of this change is that the new connector no longer supports de-aggregation out of the box. Unless you are publishing records to the stream using the KPL and you enabled aggregation, this will not make any difference for you. If your producers use KPL aggregation, you might consider implementing a custom DeserializationSchema to de-aggregate the records in the source.
Migrating from the legacy connector
Flink sources typically save the position in checkpoints and savepoints, called snapshots in Amazon Managed Service for Apache Flink. When you stop and restart the application, or when you update the application to deploy a change, the default behavior is saving the source position in the snapshot just before stopping the application, and restoring the position when the application restarts. This allows Flink to provide exactly-once guarantees on the source.
However, due to the major changes introduced by the new KinesisStreamsSource, the saved state is no longer compatible with the legacy FlinkKinesisConsumer. This means that when you upgrade the source of an existing application, you can’t directly restore the source position from the snapshot.
For this reason, migrating your application to the new source requires some consideration. The exact migration process depends on your use case. There are two general scenarios:
- Your application uses the DataStream API and you are following Flink best practices defining a UID on each operator
- Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator
Let’s cover each of these scenarios.
Your application uses the DataStream API and you are defining a UID on each operator
In this case, you might consider selectively resetting the state of the source operator, retaining any other application state. The general approach is as follows:
- Update your application dependencies and code, replacing the FlinkKinesisConsumer with the new KinesisStreamsSource.
- Change the UID of the source operator (use a different string). Leave all other operators’ UIDs unchanged. This will selectively reset the state of the source while retaining the state of all other operators.
- Configure the source starting position using AT_TIMESTAMP and set the timestamp to just before the moment you will deploy the change. See Configuring Starting Position to learn how to set the starting position. We recommend passing the timestamp as a runtime property to make this more flexible. The configured source starting position is used only when the application can’t restore the state from a savepoint (or snapshot). In this case, we are deliberately forcing this by changing the UID of the source operator.
- Update the Amazon Managed Service for Apache Flink application, selecting the new JAR containing the modified application. Restart from the latest snapshot (default behavior) and select allowNonRestoredState = true. Without this flag, Flink would prevent restarting the application, not being able to restore the state of the old source that was saved in the snapshot. See Savepointing for more details about allowNonRestoredState.
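As a rough illustration of passing the starting timestamp as a runtime property, the following plain-Java sketch reads an ISO-8601 timestamp from a Properties object. Everything named here is an assumption for illustration: StartTimestamp, resolve, and the property key source.start.timestamp are hypothetical, and how runtime properties are loaded and how the resulting timestamp is handed to the connector are intentionally left out (see Configuring Starting Position for the actual options).

```java
import java.time.Instant;
import java.util.Properties;

/** Illustrative only: resolves the AT_TIMESTAMP starting point from a runtime property. */
final class StartTimestamp {
    // Hypothetical property key; choose any name that fits your application
    static final String PROPERTY_KEY = "source.start.timestamp";

    static Instant resolve(Properties runtimeProperties) {
        String configured = runtimeProperties.getProperty(PROPERTY_KEY);
        if (configured == null) {
            // Fail fast rather than silently starting from an unintended position
            throw new IllegalStateException("Missing runtime property: " + PROPERTY_KEY);
        }
        // Expects ISO-8601 in UTC, for example 2024-11-11T10:00:00Z
        return Instant.parse(configured.trim());
    }
}
```

Keeping the timestamp outside the code means you can set it right before deploying the change, without rebuilding the JAR.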
This approach will cause the reprocessing of some records from the source, and the exactly-once consistency of the internal state could be broken. Carefully evaluate the impact of reprocessing on your application, and the impact of duplicates on the downstream systems.
Your application uses the Table API or SQL, or your application uses the DataStream API and you are not defining a UID on each operator
In this case, you can’t selectively reset the state of the source operator.
Why does this happen? When using the Table API or SQL, or the DataStream API without defining the operator’s UID explicitly, Flink automatically generates identifiers for all operators based on the structure of the job graph of your application. These identifiers are used to identify the state of each operator when saved in the snapshots, and to restore it to the correct operator when you restart the application.
Changes to the application might cause changes in the underlying data flow. This changes the auto-generated identifiers. If you are using the DataStream API and you are specifying the UID, Flink uses your identifiers instead of the auto-generated identifiers, and is able to map back the state to the operator, even when you make changes to the application. This is an intrinsic limitation of Flink, explained in Set UUIDs For All Operators. Enabling allowNonRestoredState does not solve this problem, because Flink is not able to map the state saved in the snapshot to the actual operators after the changes.
In our migration scenario, the only option is resetting the state of your application. You can achieve this in Amazon Managed Service for Apache Flink by selecting Skip restore from snapshot (SKIP_RESTORE_FROM_SNAPSHOT) when you deploy the change that replaces the source connector.
After the application using the new source is up and running, you can switch back to the default behavior when restarting the application, using the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT). This way, no data loss happens when the application is restarted.
Choosing the right connector package and version
The dependency version you need to pick is normally <connector-version>-<flink-version>. For example, the latest Kinesis connector version is 5.0.0. If you are using a Flink runtime version 1.20.x, your dependency version for the DataStream API is 5.0.0-1.20.
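The naming rule can be written down as a tiny helper. This is only a sketch of the convention described above, under the assumption that the Flink part of the version is always the major and minor number of the runtime; ConnectorVersion and dependencyVersion are hypothetical names.

```java
/** Illustrative only: builds <connector-version>-<flink-version> from a Flink runtime version. */
final class ConnectorVersion {

    static String dependencyVersion(String connectorVersion, String flinkRuntimeVersion) {
        String[] parts = flinkRuntimeVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch]: " + flinkRuntimeVersion);
        }
        // Only major.minor of the Flink runtime is part of the dependency version
        return connectorVersion + "-" + parts[0] + "." + parts[1];
    }
}
```

For connector 5.0.0 on a Flink 1.20.0 runtime this yields 5.0.0-1.20, and on Flink 1.19 it yields 5.0.0-1.19.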
For the most up-to-date connector versions, see Use Apache Flink connectors with Managed Service for Apache Flink.
Connector artifact
In previous versions of the connector (4.x and before), there were separate packages for the source and sink. This additional level of complexity has been removed with version 5.x.
For your Java application, or Python applications where you package JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository, the following dependency contains the new version of both source and sink connectors:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>
Make sure you’re using the latest available version. At the time of writing, this is 5.0.0. You can verify the available artifact versions in Maven Central. Also, use the correct version depending on your Flink runtime version. The previous example is for Flink 1.20.0.
Connector artifacts for Python application
If you use Python, we recommend packaging JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository. However, if you’re passing a single JAR directly to your Amazon Managed Service for Apache Flink application, you need to use the artifact that includes all transitive dependencies. In the case of the new Kinesis source and sink, this is called flink-sql-connector-aws-kinesis-streams. This artifact includes only the new source. Refer to Amazon Kinesis Data Streams SQL Connector for the right package, in case you want to use both the new and the legacy source.
Conclusion
The new Flink Kinesis source connector introduces many new features that improve stability and performance, and prepares your application for Flink 2.x. Support for watermark idleness and alignment is a particularly important feature if your application uses event-time semantics. The ability to retain record ordering improves data consistency, in particular when stream resharding happens, and when you replay old data from a stream that has been resharded.
You should carefully plan the change if you’re migrating your application from the legacy Kinesis source connector, and make sure you follow Flink’s best practices like specifying a UID on all DataStream operators.
You can find a working example of a Java DataStream API application using the new connector in the Amazon Managed Service for Apache Flink samples GitHub repository.
To learn more about the new Flink Kinesis source connector, refer to Amazon Kinesis Data Streams Connector and Amazon Kinesis Data Streams SQL Connector.
About the Author
Lorenzo Nicora works as a Senior Streaming Solutions Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open source technologies extensively and contributed to several projects, including Apache Flink.