构建一个实时流式生成 AI 应用程序,使用 Amazon Bedrock、Amazon Manage

使用 Amazon Bedrock、Apache Flink 管理服务 和 Amazon Kinesis 数据流构建实时流生成 AI 应用程序

作者 Felix John 和 Michelle MeiLi Pfister 2024年6月27日

关键要点

生成型 AI 基于大语言模型的生成型 AI 自 2024 年以来发展迅速 可以进行智能对话。实时数据处理 利用 Apache Flink 和 Kinesis 数据流,实现数据的实时处理和分析。应用场景 通过分析在线商店客户评论的情感,获取实时情感分析数据。架构设计 本文提供了一个参考架构及基础设施设置的分步指导,包括使用 AWS 构建解决方案的方法。

在 2024 年,生成型人工智能AI获得了广泛关注,尤其是在大型语言模型LLM领域。这些模型支持高效的智能聊天解决方案。Amazon Bedrock 是一种完全管理的服务,它提供来自各大 AI 公司的高性能基础模型FM,并通过单一 API 提供可扩展的能力,帮助构建符合安全性、隐私和责任要求的生成 AI 应用程序。生成 AI 的应用场景广泛,远不止于聊天机器人,例如可以用于进行输入数据的分析,例如评论的情感分析。

许多企业实时生成数据,包括物联网IoT传感器数据、应用程序日志和用户网页点击流数据。在许多情况下,快速处理这些数据实时或近实时可以帮助企业提高从数据中获取洞察的价值。

处理实时数据的一种选择是使用流处理框架,例如 Apache Flink。Flink 是一种用于处理数据流的框架和分布式处理引擎,AWS 通过 Amazon Managed Service for Apache Flink 提供完全管理的服务,无需设置基础设施或管理资源,就能构建和部署复杂的流应用程序。

数据流处理使生成 AI 能够利用实时数据并为企业提供快速的洞察。本文探讨如何在 AWS 上实现流式架构时整合生成 AI 功能,采用管理服务,例如 Apache Flink 管理服务和 Amazon Kinesis 数据流 进行流数据处理,并通过 Amazon Bedrock 利用生成 AI 能力。我们将重点讨论如何实时推导在线商店客户评论中的情感。本文章还包括参考架构和基础设施设置的分步指南,以及使用 AWS Cloud Development Kit AWS CDK实施解决方案的示例代码。您可以在 GitHub 仓库 中找到代码进行尝试。

解决方案概述

以下图示展示了解决方案架构。架构图的上半部分展示了实时流管道,下半部分则展示了如何访问 Amazon OpenSearch Service 仪表板的细节。

实时流管道的构成之一是一个生产者,通过在本地运行 Python 脚本将评论发送至 Kinesis 数据流。这些评论来自 大型电影评论数据集,其中包含正面或负面的情感。接下来是将数据引入 Apache Flink 管理服务应用程序。在 Flink 中,我们异步调用 Amazon Bedrock使用 Anthropic Claude 3 Haiku处理评论数据。处理结果随后被引入 OpenSearch Service 集群,用于通过 OpenSearch Dashboards 进行可视化。为了简化流程,我们直接在 Python 脚本中调用 Kinesis 数据流的 PutRecords API。在生产环境中,建议使用 Amazon API Gateway 的 REST API 作为 Kinesis 数据流的代理,正如 Amazon Kinesis 的流式数据解决方案 中所描述的。

要访问 OpenSearch 仪表板,需要使用在您的虚拟私有云VPC内与 OpenSearch 服务集群位于同一私有子网中的堡垒主机。为了连接堡垒主机,我们使用 会话管理器,这是 Amazon Systems Manager 的一项功能,允许我们安全连接到堡垒主机,而无需打开入站端口。要访问它,我们使用会话管理器将 OpenSearch 仪表板端口转发到本地计算机。

实施步骤包括:

创建 Flink 应用程序并构建 JAR 文件。部署 AWS CDK 堆栈。设置并连接到 OpenSearch Dashboards。设置流生产者。

前提条件

在继续之前,您应具备以下前提条件:

一个 AWS 账户。Java 11 或更高版本。Apache Maven 396 或更高版本。安装 AWS 命令行工具。有关安装说明,请参见 快速开始 AWS CLI。安装 AWS CDK。有关安装说明,请参见 安装 AWS CDK。Python 39 或更高版本。安装会话管理器插件。该插件允许通过会话管理器访问 OpenSearch Dashboards。有关安装说明,请参见 安装 AWS CLI 的会话管理器插件。访问 Amazon Bedrock 上 Anthropic 的 Claude 模型。有关如何添加模型访问的说明,请参见 添加模型访问。使用的数据集为 大型电影评论数据集,原始论文为:Andrew L Maas Raymond E Daly Peter T Pham Dan Huang Andrew Y Ng 和 Christopher Potts (2011) 用于情感分析的学习词向量。 第49届计算语言学协会年会ACL 2011。

实施细节

本节将重点介绍解决方案的 Flink 应用程序代码。您可以在 GitHub 中找到相关代码。flinkasyncbedrock 目录下的 StreamingJobjava 文件是该应用程序的入口点。该应用程序使用 FlinkKinesisConsumer,这是一个 连接器 ,用于读取来自 Kinesis 数据流的流式数据。它应用了映射转换,将每个输入字符串转换为 Review 类对象的实例,得到了 DataStreamltReviewgt ,以方便处理。

Flink 应用程序使用在 StreamingJobjava 文件中定义的辅助类 AsyncDataStream 来将异步外部操作集成到 Flink 中。更具体地说,以下代码通过将 AsyncBedrockRequest 函数应用于 inputReviewStream 中的每个元素,创建异步数据流。该程序使用 unorderedWait来 提高吞吐量并减少空闲时间,因为事件顺序并不是必需的。超时时间设置为 25000 毫秒,以便给予 Amazon Bedrock API 足够的时间处理较长的评论。最大并发模式或容量限制为每次 1000 个请求。具体代码如下所示:

javaDataStreamltProcessedReviewgt processedReviewStream = AsyncDataStreamunorderedWait(inputReviewStream new AsyncBedrockRequest(applicationProperties) 25000 TimeUnitMILLISECONDS 1000)uid(processedReviewStream)

构建一个实时流式生成 AI 应用程序,使用 Amazon Bedrock、Amazon Manage

Flink 应用程序发起对 Amazon Bedrock API 的异步调用,为每个传入事件调用 Anthropic Claude 3 Haiku 基础模型。我们之所以选择 Anthropic Claude 3 Haiku,是因为它是 Anthropic 的最快、最小的模型,能够迅速响应。以下代码片段来自 AsyncBedrockRequestjava 文件,展示了如何设置呼叫 Anthropic Claude 消息 API 的必要配置:

java@Overridepublic void asyncInvoke(Review review final ResultFuture resultFuture) throws Exception { // [] JSONObject usermessage = new JSONObject() put(role user) put(content reviewText )

JSONObject assistantmessage = new JSONObject()    put(role assistant)    put(content {)JSONArray messages = new JSONArray()        put(usermessage)        put(assistantmessage)String payload = new JSONObject()        put(system systemPrompt)        put(anthropicversion bedrock20230531)        put(temperature 00)        put(maxtokens 4096)        put(messages messages)        toString()InvokeModelRequest request = InvokeModelRequestbuilder()        body(SdkBytesfromUtf8String(payload))        modelId(anthropicclaude3haiku20240307v10)        build()CompletableFutureltInvokeModelResponsegt completableFuture = clientinvokeModel(request)        whenComplete((response exception) gt {            if (exception != null) {                LOGerror(模型调用失败   exception)            }        })        orTimeout(250000 TimeUnitMILLISECONDS)

提示工程

该应用程序使用先进的提示工程技术来引导生成 AI 模型的响应并提供一致的输出。以下提示旨在从单个评论中提取总结和情感:

javaString systemPrompt = 将ltreviewgt标签中的评论总结为一句简洁的句子,并提供情感可为正面或负面。以有效的 JSON 对象返回,包含以下键:summary sentiment。 ltexamplegt {summary 评论者非常不喜欢电影,认为其不切实际、说教,并且观看非常无聊。 sentiment 负面} lt/examplegt

该提示指导 Anthropic Claude 模型以 JSON 格式返回提取的情感和总结。为了保持生成 AI 模型输出的一致性和良好结构,提示使用了多种 提示工程技术 以改善输出。例如,提示使用 XML 标签为 Anthropic Claude 提供更清晰的结构。此外,提示包含一个示例,以增强 Anthropic Claude 的表现,并指导其生成所需的输出。此外,提示预填充了助手消息,以帮助提供一致的输出格式。代码如下:

飞跃加速器官网入口

javaJSONObject assistantmessage = new JSONObject() put(role assistant) put(content {)

构建 Flink 应用程序

第一步是下载该代码库并构建 Flink 应用程序的 JAR 文件。请按照以下步骤进行操作

将代码库克隆到您的工作区:

bashgit clone https//githubcom/awssamples/awsstreaminggenerativeaiapplicationgit

进入下载的代码库的正确目录并构建 Flink 应用程序:

bashcd flinkasyncbedrock ampamp mvn clean package

Maven 将编译 Java 源代码并将其打包为可分发的 JAR 格式,在目录 flinkasyncbedrock/target/ 下命名为 flinkasyncbedrock01jar。部署 AWS CDK 堆栈后,该 JAR 文件将上传到 Amazon Simple Storage ServiceAmazon S3来创建您的 Apache Flink 管理服务应用程序。

部署 AWS CDK 堆栈

构建完 Flink 应用程序后,可以部署 AWS CDK 堆栈并创建所需资源:

进入正确的目录 cdk 并部署堆栈:

bashcd cdk ampamp npm install ampamp cdk deploy

这将创建您 AWS 账户中所需的资源,包括 Apache Flink 管理服务应用程序、Kinesis 数据流、OpenSearch 服务集群和在您的 VPC 私有子网上快速连接到 OpenSearch Dashboards 的堡垒主机。

记录输出值,输出将类似于以下内容:

plaintext StreamingGenerativeAIStack

部署时间 141426s

输出StreamingGenerativeAIStackBastionHostBastionHostIdC743CBD6 = i0970816fa778f9821StreamingGenerativeAIStackaccessOpenSearchClusterOutput = aws ssm startsession target i0970816fa778f9821 documentname AWSStartPortForwardingSessionToRemoteHost parameters {portNumber[443]localPortNumber[8157] host[vpcgenerativeaiopensearchqfssmne2lwpzpzheoue7rkylmiuseast1esamazonawscom]}StreamingGenerativeAIStackbastionHostIdOutput = i0970816fa778f9821StreamingGenerativeAIStackdomainEndpoint = vpcgenerativeaiopensearchqfssmne2lwpzpzheoue7rkylmiuseast1esamazonawscomStreamingGenerativeAIStackregionOutput = useast1堆栈 ARNarnawscloudformationuseast1stack/StreamingGenerativeAIStack/3dec75f0cc9e11ee9b1612348a4fbf87

总时间 141861s

设置并连接到 OpenSearch Dashboards

接下来,您可以设置并连接到 OpenSearch Dashboards。这是 Flink 应用程序将提取的情感和总结数据写入的地方。请按照以下步骤进行:

在单独的终端窗口中运行以下命令以从本地工作区建立到 OpenSearch 的连接。该命令可以在名为 accessOpenSearchClusterOutput 的输出中找到。

对于 Mac/Linux,使用以下命令:

bashaws ssm startsession target ltBastionHostIdgt documentname AWSStartPortForwardingSessionToRemoteHost parameters {portNumber[443]localPortNumber[8157] host[ltOpenSearchDomainHostgt]}

对于 Windows,使用以下命令:

bashaws ssm startsession target ltBastionHostIdgt documentname AWSStartPortForwardingSessionToRemoteHost parameters host=ltOpenSearchDomainHostgtportNumber=443localPortNumber=8157

运行后应该得到类似于以下的输出:

通过发出以下命令在 OpenSearch 中创建所需的索引:

对于 Mac/Linux,使用以下命令:

bashcurl location k request PUT https//localhost8157/processedreviews header ContentType application/json dataraw { mappings { properties { reviewId {type integer} userId {type keyword} summary {type keyword} sentiment {type keyword} dateTime {type date}} }}

对于 Windows,使用以下命令:

powershellurl = https//localhost8157/processedreviews