这篇文章由 Pascal Vogel 和 Philipp Klose 编写,旨在探讨 Apache Kafka 事件串流的当前趋势以及如何有效地利用 Amazon EventBridge Pipes 进行数据转换。
事件串流与 Apache Kafka 已成为现代 以数据为中心 和事件驱动架构EDAs的核心部分,它为用户行为的实时分析、异常与欺诈检测、物联网事件处理等应用场景提供了解决方案。Kafka 的流生产者与消费者通常会使用模式注册表,确保各组件在发送序列化和处理反序列化事件时遵循约定的事件结构,从而避免应用程序的错误和崩溃。
在 Kafka 中,一种常见的模式格式是 Apache Avro,它支持以紧凑的二进制格式表示复杂数据结构。为了更轻松地将 Kafka 集成到其他 AWS 和第三方服务中,AWS 提供了 Amazon EventBridge Pipes,这是一种无服务器的点对点集成服务。然而,许多下游服务通常期待 JSON 编码的事件,因此在每个下游服务中都需要实现自定义的和重复的从 Avro 到 JSON 的模式验证和转换逻辑。
本文将展示如何可靠地消费、验证、转换并发送来自 Kafka 的 Avro 事件到 AWS 和第三方服务,以利用 EventBridge Pipes,减少下游服务中的自定义反序列化逻辑。此外,您还可以使用 EventBridge 事件总线作为 Pipes 中的目标,从 Pipes 筛选和分发事件到多个目标,包括跨帐户和跨区域交付。
本文分为两个场景:
使用 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 和 AWS Glue Schema Registry。使用 Confluent Cloud 和 Confluent Schema Registry。可以查看相关的 GitHub 仓库以获取完整的源代码及详细的部署说明,具体请访问 Glue Schema Registry 或 Confluent Schema Registry。
飞跃vnp加速器在 AWS 上构建事件流应用程序时,可以使用 Amazon MSK、Confluent Cloud 或在 Amazon Elastic Compute Cloud (Amazon EC2) 实例上自托管的 Kafka。
为了避免 事件流和事件驱动架构中的常见问题,如数据不一致和不兼容,建议在事件生产者和消费者之间定义和共享事件模式。在 Kafka 中,模式注册表用于管理、演变和强制执行事件生产者和消费者的模式。 AWS Glue Schema Registry 提供了一个中心位置来发现、管理和演变模式。以 Confluent Cloud 为例,Confluent Schema Registry 也可以承担同样的角色。Glue Schema Registry 和 Confluent Schema Registry 都支持常见的模式格式,如 Avro、Protobuf 和 JSON。
要将 Kafka 与 AWS 服务、第三方服务及自有应用程序进行集成,可以使用 EventBridge Pipes。EventBridge Pipes 帮助您创建事件 源 和 目标 之间的点对点集成,同时支持可选的筛选、转换和丰富。EventBridge Pipes 使您在构建 EDA 时,能够大幅减少需要编写和维护的集成代码。
许多 AWS 和第三方服务需要 JSON 编码的有效负载事件作为输入,意味着它们不能直接消费 Avro 或 Protobuf 负载。为了解决每个消费者中的重复 Avro 到 JSON 的验证和转换逻辑,您可以使用 EventBridge Pipes 的 丰富 步骤。该解决方案利用 AWS Lambda 函数在丰富步骤中反序列化并验证 Kafka 事件的模式,处理错误并将事件转换为 JSON,然后再传递给下游服务。
本博客中介绍的解决方案包含以下关键要素:
管道源是使用 MSK 或 Confluent Cloud 部署的 Kafka 集群。EventBridge Pipes 从 Kafka 流中批量读取事件,并将其发送到丰富函数具体示例如 此处。丰富步骤Lambda 函数根据配置的模式注册表Glue 或 Confluent反序列化并验证事件,将 Avro 事件转换为 JSON,并实现集成的错误处理,然后将结果返回给管道。本示例解决方案的目标是一个通过 EventBridge Pipes 被调用的 EventBridge 自定义事件总线,该事件总线将从丰富 Lambda 函数返回的 JSON 编码事件传输至。EventBridge Pipes 支持多种目标,包括 Lambda、AWS Step Functions、Amazon API Gateway、API 目的地等,能够实现 EDA,而无需编写集成代码。在本示例解决方案中,事件总线通过一个 EventBridge 规则 将所有事件发送到 Amazon CloudWatch Logs。您可以扩展该示例以调用 其他 EventBridge 目标。可选地,您还可以为事件在 EventBridge 模式注册表 中添加 OpenAPI 3 或 JSONSchema Draft 4 的模式,您可以手动从 Avro 模式生成,或使用 EventBridge 的 模式发现。这使您可以下载各种编程语言如 JavaScript、Python 和 Java的 JSON 转换事件的代码绑定,以便在 EventBridge 目标中正确使用它们。
文章的其余部分将具体描述 Glue 和 Confluent 模式注册表的解决方案及其代码示例。
这一部分描述如何使用 EventBridge Pipes 和 Glue 模式注册表实现事件模式验证和从 Avro 到 JSON 的转换。您可以在 GitHub 找到源代码和详细的部署说明。
您需要一个运行中的 Amazon MSK 无服务器 集群,并配置 Glue 模式注册表。此示例包含一个 Avro 模式和 Glue 模式注册表。有关使用 Glue 模式注册表进行模式验证的介绍,请参阅相关的 AWS 博客文章:在 Amazon MSK 和 Amazon Kinesis 数据流中使用 AWS Glue 模式注册表验证、演变和控制模式。
使用 /AWS Cloud Development Kit (AWS CDK) 提供的模板来部署:
连接到您现有的 Amazon MSK 无服务器 Kafka 主题作为源的 EventBridge 管道,使用 AWS 身份与访问管理 (IAM) 进行身份验证。EventBridge Pipes 使用 Amazon MSK 源类型 从您的 Kafka 主题读取事件。一个用 Java 编写的丰富 Lambda 函数,用于执行事件反序列化、验证和从 Avro 到 JSON 的转换。一个 Amazon Simple Queue Service (Amazon SQS) 死信队列,用于存放反序列化失败的事件。EventBridge 自定义事件总线作为管道目标。EventBridge 规则将所有传入事件写入一个 CloudWatch Logs 日志组。对于基于 MSK 的源,EventBridge 支持配置参数,如批次窗口、批次大小和起始位置,您可以使用在示例 CDK 堆栈中的 [CfnPipe](https//docsawsamazoncom/cdk/api/v2/python/awscdkawspipes/CfnPipehtml) 类的参数进行设置。
示例的 EventBridge 管道以 10 个事件为一批从 Kafka 中消费事件,因为其目标是 EventBridge 事件总线,而事件总线的最大批次大小为 10。有关其他目标的最佳配置选择,请参阅 EventBridge Pipes 用户指南中的 批处理和并发。
此部分描述如何使用 EventBridge Pipes 和 Confluent 模式注册表实施事件模式验证和从 Avro 到 JSON 的转换。您可以在 GitHub 找到源代码和详细的部署说明。
设置此解决方案时,您需要在 Confluent Cloud 上运行的 Kafka 流以及设置好的 Confluent 模式注册表。请参考 Confluent Cloud 的相关模式注册表教程,以设置您的 Confluent Kafka 流的模式注册表。
要连接到您的 Confluent Cloud Kafka 集群,您需要 Confluent Cloud 和 Confluent 模式注册表的 API 密钥。 AWS Secrets Manager 用于安全存储您的 Confluent 秘密。
使用 AWS CDK 提供的模板来部署:
一个 EventBridge 管道,通过存储在 Secrets Manager 中的 API 密钥,连接到您现有的 Confluent Kafka 主题作为源。EventBridge Pipes 使用 自管理Apache Kafka 流源类型,从您的 Confluent Kafka 主题读取事件。一个用 Python 编写的丰富 Lambda 函数,用于执行事件反序列化、验证和从 Avro 到 JSON 的转换。一个 SQS 死信队列,用于存放反序列化失败的事件。EventBridge 自定义事件总线作为管道目标。EventBridge 规则将所有传入事件写入 CloudWatch Logs 日志组。对于自管理的 Kafka 源,EventBridge 支持配置参数,如批次窗口、批次大小和起始位置,您可以利用在示例 CDK 堆栈中 [CfnPipe](https//docsawsamazoncom/cdk/api/v2/python/awscdkawspipes/CfnPipehtml) 类的参数进行设置。

示例的 EventBridge 管道以10个事件为一批从 Kafka 中消费事件,因为其目标是 EventBridge 事件总线,最大批次大小为10。有关其他目标的最佳配置选择,请参阅 EventBridge Pipes 用户指南中的 批处理和并发。
之前描述的解决方案都包括了一个用于模式验证和从 Avro 转换为 JSON 的丰富 Lambda 函数。
Java Lambda 函数使用 AWS Glue Schema Registry Library 集成Glue 模式注册表。Python Lambda 函数则使用 confluentkafka 库,与 Confluent 模式注册表进行集成,并利用 Powertools for AWS Lambda (Python) 实现无服务器最佳实践,如日志记录和追踪。
丰富的 Lambda 函数执行以下任务:
从 Kafka 流中轮询的事件数据,键和值都是经过 base64 编码的。因此,对于传递给函数的每个事件,键和值都需进行解码。假定事件键已由生产者序列化为字符串类型。事件值利用 Glue 模式注册表 Serde Java或 confluentkafka 的 [AvroDeserializer](https//docsconfluentio/platform/current/clients/confluentkafkapython/html/indexhtml#schemaregistryavrodeserializer) Python进行反序列化。函数成功转换后的 JSON 事件将返回给 EventBridge 管道,然后该管道针对每一个事件调用目标。对于 Avro 反序列化失败的事件,则会发送到 SQS 死信队列。本文展示了如何使用 Amazon EventBridge Pipes、Glue 模式注册表和 Confluent 模式注册表实现事件消费、Avro 模式验证和转换为 JSON 的过程。
示例代码可在 AWS Samples GitHub 仓库 中找到,链接为 Glue 模式注册表 和 Confluent 模式注册表。欲了解更多模式,欢迎访问 Serverless Patterns Collection。
有关无服务器学习资源,请访问 Serverless Land。
标签: 贡献者,无服务器