Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink

November 23, 2022 By Mark Otto

Apache Flink is a popular open source framework for stateful computations over data streams. It allows you to formulate queries that are continuously evaluated in near real time against an incoming stream of events. To persist derived insights from these queries in downstream systems, Apache Flink comes with a rich connector ecosystem that supports a wide range of sources and destinations. However, the existing connectors may not always be enough to support all conceivable use cases. Our customers and the community kept asking for more connectors and better integrations with various open source tools and services.

But that’s not an easy problem to solve. Creating and maintaining production-ready sinks for a new destination is a lot of work. For critical use cases, it’s undesirable to lose messages or to compromise on performance when writing into a destination. Yet sinks have commonly been developed and maintained independently of each other, so the same functionality had to be reimplemented and optimized for each sink. This adds to the complexity and cost of adding sinks to Apache Flink.

To better support our customers and the entire Apache Flink community, we set out to make it easier and less time consuming to build and maintain sinks. We contributed the Async Sink to the Flink 1.15 release, which improved cloud interoperability and added more sink connectors and formats, among other updates. The Async Sink is an abstraction for building sinks with at-least-once semantics. Instead of reimplementing the same core functionality for every new sink, developers can build on the common sink functionality that the Async Sink provides. In the remainder of this post, we’ll explain how the Async Sink works and how you can build a new sink based on it, and we’ll discuss our plans to continue our contributions to Apache Flink.

Abstracting away common components with the Async Sink

Although sinks have commonly been developed in isolation, their basic functionality is often similar. Sinks buffer multiple messages to send them in a single batch request to improve efficiency. They check completed requests for success and resend messages that were not persisted successfully at a later point. They participate in Flink’s checkpointing mechanism to avoid losing any messages in case the Flink application fails and needs to recover. Lastly, sinks monitor and control the throughput to the destination so as not to overload it and to divide the capacity fairly amongst multiple concurrent producers. Usually, only two main things differ between destinations: the structure and content of the requests sent to the destination, and of the responses it returns.

Instead of creating independent sinks and duplicating all of this common functionality for every sink, we can abstract away and implement these common requirements once. To implement a new sink, developers then only need to specify those aspects that are specific to the sink: how to build and send requests, and how to identify from the response which records were not persisted successfully and need to be resent. In this way, building a new sink just requires the creation of a lightweight shim that is specific to the destination.
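The following is a minimal, synchronous sketch of that split, written in plain Java so it runs without Flink. All names in it (`MiniAsyncSinkWriter`, `ListSinkWriter`, `flushOnce`, `bufferedEntries`) are hypothetical; only the `submitRequestEntries` shape mirrors the method discussed later in this post. It is far simpler than Flink's actual `AsyncSinkWriter`, which sends requests asynchronously and also handles checkpointing, buffering hints, and rate limiting. The base class owns buffering, batching, and requeuing of failed entries; the subclass is the lightweight, destination-specific shim.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified base class; NOT Flink's AsyncSinkWriter API.
// It owns the common parts: converting events, buffering, batching, and requeuing failures.
abstract class MiniAsyncSinkWriter<InputT, RequestEntryT> {
    private final Function<InputT, RequestEntryT> elementConverter;
    private final Deque<RequestEntryT> buffer = new ArrayDeque<>();
    private final int maxBatchSize;

    MiniAsyncSinkWriter(Function<InputT, RequestEntryT> elementConverter, int maxBatchSize) {
        this.elementConverter = elementConverter;
        this.maxBatchSize = maxBatchSize;
    }

    // Convert the event into a request entry and buffer it; flush once a batch is full.
    public void write(InputT event) {
        buffer.addLast(elementConverter.apply(event));
        if (buffer.size() >= maxBatchSize) {
            flushOnce();
        }
    }

    // Submit up to maxBatchSize buffered entries as a single batch request.
    public void flushOnce() {
        List<RequestEntryT> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(buffer.pollFirst());
        }
        if (batch.isEmpty()) {
            return;
        }
        // Failed entries go back to the front of the buffer, in their original order.
        submitRequestEntries(batch, failed -> {
            for (int i = failed.size() - 1; i >= 0; i--) {
                buffer.addFirst(failed.get(i));
            }
        });
    }

    public int bufferedEntries() {
        return buffer.size();
    }

    // Destination-specific: build and send the batch request, then report failed entries.
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntriesToSend,
            Consumer<List<RequestEntryT>> requestEntriesToRetry);
}

// Hypothetical shim for a toy destination: stores trimmed strings in a list and
// rejects empty entries, which the base class then keeps buffered for a retry.
class ListSinkWriter extends MiniAsyncSinkWriter<String, String> {
    final List<String> persisted = new ArrayList<>();

    ListSinkWriter(int maxBatchSize) {
        super(String::trim, maxBatchSize);
    }

    @Override
    protected void submitRequestEntries(
            List<String> requestEntriesToSend, Consumer<List<String>> requestEntriesToRetry) {
        List<String> failed = new ArrayList<>();
        for (String entry : requestEntriesToSend) {
            if (entry.isEmpty()) {
                failed.add(entry);
            } else {
                persisted.add(entry);
            }
        }
        requestEntriesToRetry.accept(failed);
    }
}
```

In a real sink the response arrives asynchronously, so the retry callback would run later, possibly on another thread; this sketch invokes it synchronously to keep the control flow easy to follow.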

Building a new sink with the Async Sink abstraction

Let’s look at what it takes to build a new sink based on the Async Sink abstraction. For this example, we’ll implement a simplified sink for Amazon Kinesis Data Streams. Kinesis Data Streams is a streaming data service to capture and store data streams. Data is persisted into a Kinesis data stream by means of the PutRecords API, which can persist multiple records with a single batch request.

There are three main aspects that are specific to our sink that we need to implement. First, we need to specify how to extract the information required to make a batch request from the event. In Kinesis Data Streams, this includes the actual payload and a partition key. Second, we need to specify how to construct and make a batch request. And third, we need to inspect the response of the request to know whether all elements of the batch request have been persisted successfully.

Let’s start with extracting the required information from an event. We need to specify how to convert an event to a so-called request entry; several request entries together form a batch request. The following code example shows what this looks like for our Kinesis Data Streams sink. The code simply specifies how to extract the actual payload and a partition key from the event and returns a PutRecordsRequestEntry object. In this simplified example, we use the string representation of the event as the payload and the hash code of the event as the partition key. For a more sophisticated implementation, it may be preferable to use a configurable serializer that gives end users of the sink more flexibility in how the payload and partition key are constructed.

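@Override
public PutRecordsRequestEntry apply(InputT event, SinkWriter.Context context) {
    // simplified: the string representation of the event is the payload,
    // and the hash code of the event is the partition key
    return PutRecordsRequestEntry.builder()
            .data(SdkBytes.fromUtf8String(event.toString()))
            .partitionKey(String.valueOf(event.hashCode()))
            .build();
}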
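One way to realize the more configurable approach described above is to let users of the sink pass in the payload serialization and the partition key extraction as functions. The sketch below assumes hypothetical `ConfigurableConverter` and `RequestEntry` types as stand-ins for Flink's `ElementConverter` and the SDK's `PutRecordsRequestEntry`, so that it runs with the JDK alone; `defaults()` reproduces the simplified behavior of the example above.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for PutRecordsRequestEntry: a payload plus a partition key.
record RequestEntry(byte[] data, String partitionKey) {}

// Hypothetical configurable converter: end users decide how the payload and key are built.
final class ConfigurableConverter<InputT> implements Function<InputT, RequestEntry> {
    private final Function<InputT, byte[]> serializer;
    private final Function<InputT, String> partitionKeyGenerator;

    ConfigurableConverter(Function<InputT, byte[]> serializer,
                          Function<InputT, String> partitionKeyGenerator) {
        this.serializer = serializer;
        this.partitionKeyGenerator = partitionKeyGenerator;
    }

    // Mirrors the simplified example: toString() as payload, hashCode() as partition key.
    static <T> ConfigurableConverter<T> defaults() {
        return new ConfigurableConverter<T>(
                event -> event.toString().getBytes(StandardCharsets.UTF_8),
                event -> String.valueOf(event.hashCode()));
    }

    @Override
    public RequestEntry apply(InputT event) {
        return new RequestEntry(serializer.apply(event), partitionKeyGenerator.apply(event));
    }
}
```

With this design, a user could, for instance, derive the partition key from a customer ID so that all events of one customer share a partition key, without changing the sink itself.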

The sink will buffer these objects until it has collected enough of them according to the buffering hints. These buffering hints include a limit on the number of messages, total size of messages, and a timeout condition.
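The buffering hints can be read as three flush triggers: a flush is due as soon as any one of them is met. The sketch below expresses them in plain Java; the class `HintedBuffer`, its parameter names, and passing the current time in explicitly are assumptions made for illustration, not Flink's configuration API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical buffer showing the three buffering hints as flush triggers.
final class HintedBuffer {
    private record Buffered(byte[] data, long addedAtMs) {}

    private final int maxBatchSize;         // limit on the number of messages per batch
    private final long maxBatchSizeInBytes; // limit on the total size of messages per batch
    private final long maxTimeInBufferMs;   // timeout for the oldest buffered message
    private final Deque<Buffered> entries = new ArrayDeque<>();
    private long bufferedBytes = 0;

    HintedBuffer(int maxBatchSize, long maxBatchSizeInBytes, long maxTimeInBufferMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }

    // The current time is passed in explicitly so the timeout logic is easy to test.
    void add(byte[] data, long nowMs) {
        entries.addLast(new Buffered(data, nowMs));
        bufferedBytes += data.length;
    }

    // A flush is due if any one of the three hints is met.
    boolean shouldFlush(long nowMs) {
        if (entries.isEmpty()) {
            return false;
        }
        return entries.size() >= maxBatchSize
                || bufferedBytes >= maxBatchSizeInBytes
                || nowMs - entries.peekFirst().addedAtMs() >= maxTimeInBufferMs;
    }

    // Takes the oldest entries that fit within both batch limits (always at least one
    // entry, so the buffer keeps making progress); the remaining entries stay buffered.
    List<byte[]> drainBatch() {
        List<byte[]> batch = new ArrayList<>();
        long batchBytes = 0;
        while (!entries.isEmpty() && batch.size() < maxBatchSize) {
            byte[] next = entries.peekFirst().data();
            if (!batch.isEmpty() && batchBytes + next.length > maxBatchSizeInBytes) {
                break;
            }
            entries.pollFirst();
            batch.add(next);
            batchBytes += next.length;
        }
        bufferedBytes -= batchBytes;
        return batch;
    }
}
```

Draining only as many entries as fit within both limits is what lets the same hints double as a guard against a destination's per-request limits.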

Next, we need to specify how to construct and make the actual batch request. This is, again, specific to the destination we are writing to, and therefore something we need to implement as part of the submitRequestEntries method that you can see in the code example below. The Async Sink invokes this method with a set of buffered request entries that should form the batch request.

For the Kinesis Data Streams sink, we need to specify how to construct and run the PutRecords request from a set of PutRecordsRequestEntry objects (Lines 6-9 in the example below). In addition to making the batch request, we also need to check the response of the PutRecords request for entries that were not persisted successfully. These entries need to be requeued in the internal buffer so the Async Sink can retry them at a later point (Lines 11-31).

@Override
protected void submitRequestEntries(
        List<PutRecordsRequestEntry> requestEntriesToSend,
        Consumer<List<PutRecordsRequestEntry>> requestEntriesToRetry) {

    // construct and run the PutRecords request
    PutRecordsRequest batchRequest =
            PutRecordsRequest.builder().records(requestEntriesToSend).streamName(streamName).build();
    CompletableFuture<PutRecordsResponse> future = kinesisClient.putRecords(batchRequest);

    // check the response of the PutRecords request
    future.whenComplete(
            (response, err) -> {
                if (err != null) {
                    // entire batch request failed, all request entries need to be retried
                    requestEntriesToRetry.accept(requestEntriesToSend);
                } else if (response.failedRecordCount() > 0) {
                    // some request entries in the batch request were not persisted and need to be retried
                    List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
                    List<PutRecordsResultEntry> records = response.records();
                    for (int i = 0; i < records.size(); i++) {
                        if (records.get(i).errorCode() != null) {
                            failedRequestEntries.add(requestEntriesToSend.get(i));
                        }
                    }
                    requestEntriesToRetry.accept(failedRequestEntries);
                } else {
                    // all request entries of the batch request have been successfully persisted
                    requestEntriesToRetry.accept(Collections.emptyList());
                }
            });
}

That’s basically it. These are the main components of the sink you need to implement for a basic Kinesis Data Streams sink. These are parts that are specific to the destination and cannot be abstracted away.

For each event the sink receives, it applies the conversion and buffers the result. Once the conditions of the buffering hints are met, the sink constructs and sends a batch request. The buffering hints also help to satisfy constraints of the destination. For instance, a single PutRecords request supports up to 500 records with a total size of 5 MiB, and the buffering hints help to enforce these limits. From the response of the request, the sink identifies which request entries were not persisted successfully and requeues them in the internal buffer. In addition, the sink automatically adapts the throughput to the limits of the destination and slows down the entire Flink application by applying back pressure in case the destination becomes overloaded.
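Back pressure can be pictured as a cap on the number of batch requests in flight: once the cap is reached, the writer stops accepting work until an outstanding request completes, which in turn stalls the operators upstream of the sink. The sketch below models this with a `java.util.concurrent.Semaphore`; `BoundedInFlightSender` and `maxInFlightRequests` are hypothetical names, and Flink's own implementation is not necessarily structured this way.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sender that caps the number of concurrent batch requests.
final class BoundedInFlightSender<EntryT> {
    private final Semaphore inFlightPermits;
    private final Function<List<EntryT>, CompletableFuture<Void>> sendBatch;

    BoundedInFlightSender(int maxInFlightRequests,
                          Function<List<EntryT>, CompletableFuture<Void>> sendBatch) {
        this.inFlightPermits = new Semaphore(maxInFlightRequests);
        this.sendBatch = sendBatch;
    }

    // Blocks the caller while maxInFlightRequests requests are outstanding. Because the
    // caller cannot hand over more data until a slot frees up, a slow destination
    // slows down everything upstream of the sink: this is the back pressure.
    CompletableFuture<Void> send(List<EntryT> batch) {
        inFlightPermits.acquireUninterruptibly();
        CompletableFuture<Void> request;
        try {
            request = sendBatch.apply(batch);
        } catch (RuntimeException e) {
            inFlightPermits.release(); // the request never started, so free the slot
            throw e;
        }
        // Free the slot once the request completes, whether it succeeded or failed.
        return request.whenComplete((ignored, error) -> inFlightPermits.release());
    }

    int availableSlots() {
        return inFlightPermits.availablePermits();
    }
}
```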

However, we left out a couple of details for the sake of simplicity. Some additional boilerplate code is required to assemble these individual pieces into a complete sink. For a production-ready sink, we would also need to extract the message size to support size-based buffering hints, implement serialization for request entries so that entries that are still buffered can be included in checkpoints (which is what provides at-least-once semantics), and add support for Flink’s Python and Table APIs. We also strongly recommend adding tests to the implementation.
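To illustrate the size extraction and the serialization of request entries, the sketch below computes an entry's size and writes the entry to bytes and back, as a sink would need to do to store still-buffered entries in a checkpoint and restore them after a failure. The `RequestEntry` type, the size rule, and the byte layout are assumptions; a real sink would plug equivalent logic into the serializer interfaces Flink expects for sink writer state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical request entry: a payload plus a partition key.
record RequestEntry(byte[] data, String partitionKey) {
    // Size reported to size-based buffering hints. Counting the payload plus the UTF-8
    // partition key is an assumption; use whatever the destination counts toward its limits.
    long sizeInBytes() {
        return data.length + partitionKey.getBytes(StandardCharsets.UTF_8).length;
    }
}

// Hypothetical serializer for buffered entries, e.g. for storing them in a checkpoint.
final class RequestEntrySerializer {
    static byte[] serialize(RequestEntry entry) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entry.data().length); // length prefix for the payload
            out.write(entry.data());
            out.writeUTF(entry.partitionKey());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static RequestEntry deserialize(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            byte[] data = new byte[in.readInt()];
            in.readFully(data);
            return new RequestEntry(data, in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```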

We have just used Kinesis Data Streams as an example here to explain the basic components that are required to create a simplified sink. We have implemented a complete and production-ready Kinesis Data Streams sink in Flink 1.15. If you want to sink data into a Kinesis data stream or are interested in a complete example, you can find the sources in the official Apache Flink GitHub repository. If you are curious to see additional examples, you can refer to the Amazon Kinesis Data Firehose sink that is also part of Flink 1.15 or a sample implementation of an Amazon CloudWatch sink.

What’s next?

We started work on the Async Sink to make it easier to build integrations with AWS services, but we soon realized that our contributions could be generalized to a much wider set of use cases. We are excited to see how the community is already using the Async Sink since it became available with the Flink 1.15 release. In addition to the sinks for Kinesis Data Streams and Amazon Kinesis Data Firehose that we have contributed, the community has been working on sinks for Amazon DynamoDB and Redis Streams. There are also plans to refactor the Apache Cassandra sink so that it is based on the Async Sink.

We have been working on additional improvements to the Async Sink since the initial release. We’ve implemented a rate-limiting strategy that slows down the sink (and the entire Flink application) if the destination becomes overloaded. In the initial release, this strategy cannot be adapted easily, and we are currently working to make it configurable (FLIP-242: Introduce configurable RateLimitingStrategy for Async Sink). We are also seeking feedback from the community on potential future extensions.
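To illustrate what a pluggable strategy could look like, the sketch below defines a small interface and an additive-increase/multiplicative-decrease (AIMD) implementation, which raises the number of messages allowed in flight after each successful request and cuts it after a failed one. The names are hypothetical and do not reproduce the interface proposed in FLIP-242; AIMD is used here as one common choice of strategy, not as a description of Flink's default.

```java
// Hypothetical pluggable strategy deciding how many messages may be in flight at once.
interface InFlightLimitStrategy {
    int currentLimit();

    void onSuccess();

    void onFailure();
}

// Additive increase, multiplicative decrease (AIMD): probe for more capacity while
// requests succeed, and back off sharply when the destination signals overload.
final class AimdInFlightLimitStrategy implements InFlightLimitStrategy {
    private final int increaseBy;
    private final double decreaseFactor;
    private final int minLimit;
    private final int maxLimit;
    private int limit;

    AimdInFlightLimitStrategy(int initialLimit, int increaseBy, double decreaseFactor,
                              int minLimit, int maxLimit) {
        if (decreaseFactor <= 0 || decreaseFactor >= 1) {
            throw new IllegalArgumentException("decreaseFactor must be between 0 and 1");
        }
        if (minLimit < 1 || minLimit > maxLimit) {
            throw new IllegalArgumentException("require 1 <= minLimit <= maxLimit");
        }
        this.increaseBy = increaseBy;
        this.decreaseFactor = decreaseFactor;
        this.minLimit = minLimit;
        this.maxLimit = maxLimit;
        this.limit = Math.max(minLimit, Math.min(maxLimit, initialLimit));
    }

    @Override
    public int currentLimit() {
        return limit;
    }

    @Override
    public void onSuccess() {
        limit = Math.min(maxLimit, limit + increaseBy); // additive increase, capped
    }

    @Override
    public void onFailure() {
        limit = Math.max(minLimit, (int) (limit * decreaseFactor)); // multiplicative decrease
    }
}
```

A sink writer would consult `currentLimit()` before sending the next batch and call `onSuccess()` or `onFailure()` from the request callback, for example when the destination throttles a request. Swapping in a different implementation of the interface changes the behavior without touching the writer.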

Beyond connectors, we want to continue contributing to Apache Flink. There have been efforts in the community to create a Flink Kubernetes operator. We are currently looking to extend the capabilities of that operator with support for additional deployment modes (FLIP-225: Implement standalone mode support in the kubernetes operator). These efforts will help to improve the security posture of Flink deployments in a multi-tenant environment. Moreover, we are adding support for asynchronous job submission (FLIP-236: Asynchronous Job Submission). This will help to reduce friction when deploying Flink applications with expensive initialization work as part of their main method.

We are excited to continue to work with the open source community to improve Apache Flink. It’s great to be part of the journey to make Apache Flink even more powerful and to enable more stream processing use cases. We are curious to see how the contributions will be used by others to get value from their streaming data. If you are using the Async Sink to create a sink of your own, please let us know on the Flink mailing list or by creating a ticket on the Apache Flink Jira. We’d love to get your feedback and thoughts.