当前位置: 首页 > news >正文

如何通过 Apache Camel 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实现一个入门应用程序,逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。

什么是 Apache Camel?

Apache Camel 是一个开源集成框架,可简化不同系统的连接,使开发人员可以专注于业务逻辑,而不必担心系统通信的复杂性。Camel 的核心概念是 “routes - 路由”,它定义了消息从源到目的地所遵循的路径,可能包括转换、验证和过滤等中间步骤。

Apache Camel 架构

Camel 使用 “components- 组件” 连接不同的系统和协议,例如数据库和消息传递服务,并使用 “endpoints- 端点” 表示消息的入口点和出口点。这些概念提供了模块化和灵活的设计,使配置和管理复杂集成变得更加容易,高效且可扩展。

使用 Elasticsearch 和 Apache Camel

我们将演示如何配置一个简单的 Java 应用程序,该应用程序使用 Apache Camel 将数据导入 Elasticsearch 集群。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。

1. 添加依赖项

配置此集成的第一步是将必要的依赖项添加到项目的 pom.xml 文件中。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库,因此我们必须导入 camel-elasticsearch 组件,并且版本必须与 camel-core 库相同。

如果你想使用 Java 低级 Rest 客户端,则必须使用 Elasticsearch 低级 Rest 客户端组件。

<dependency><groupId>org.apache.camel</groupId><artifactId>camel-core</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-elasticsearch</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-jackson</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.14.3</version>
</dependency>

2. 配置和运行 Camel 上下文

配置首先使用 DefaultCamelContext 类创建一个新的 Camel 上下文,该类是定义和执行路由的基础。接下来,我们配置 Elasticsearch 组件,这将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例配置为连接到地址 localhost:9200,这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置,你应该阅读有关如何配置组件和启用基本身份验证的文档,称为 “Configure the component and enable basic authentication - 配置组件和启用基本身份验证”。

public class ESComponent {public static ElasticsearchComponent getInstance() {var elasticsearch = new ElasticsearchComponent();elasticsearch.setHostAddresses("localhost:9200");return elasticsearch;}public static String getName() {return "elasticsearch";}
}

然后将该组件添加到 Camel 上下文中,使得定义的路由能够使用该组件在 Elasticsearch 中执行操作。

try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationBulkRoute());context.start();
}

随后,将路由添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。

3. 配置 Camel 路由

数据索引

我们将配置的第一个路由用于数据索引。我们将使用包含电影目录的 JSON 文件。路由将配置为读取位于 src/main/resources/movies.json 的文件,将 JSON 内容反序列化为 Java 对象,然后应用聚合策略将多条消息合并为一条,从而允许在 Elasticsearch 中进行批量操作。配置了每条消息 500 个项目的大小,即批量将一次索引 500 部电影。

路由 Elasticsearch 操作 bulk:

String URI_BULK_OPERATION = String.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",IndexOperationConfig.BULK_OPERATION,INDEX_NAME);
public class OperationBulkRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(OperationBulkRoute.class);private static final int BULK_SIZE = 500;@Overridepublic void configure() {from("file:src/main/resources?fileName=movies.json&noop=true").routeId("route-bulk-ingest").unmarshal().json().split(body()).aggregate(constant(true), new BulkAggregationStrategy()).completionSize(BULK_SIZE).to(URI_BULK_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();}
}

这批文档将被发送到 Elasticsearch 的批量操作端点。这种方法可确保处理大量数据时的效率和速度。

数据更新

下一个路由是更新文档。我们在上一步中索引了一些电影,现在我们将创建新的路由,通过参考代码搜索文档,然后更新评级字段。

我们设置了一个 Camel 上下文 (DefaultCamelContext),其中注册了一个 Elasticsearch 组件,并添加了一个自定义路由 IngestionRoute。操作首先通过 ProducerTemplate 发送文档代码,然后从 direct:update-ingestion 端点启动路由。

try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new IngestionRoute());context.start();ProducerTemplate producerTemplate = context.createProducerTemplate();producerTemplate.sendBody("direct:update-ingestion", documentCode);Thread.sleep(5000);
}

接下来,我们有 IngestionRoute,它是此流程的输入端点。该路由执行几个流水线操作。首先,在 Elasticsearch 中进行搜索以按代码 (direct:search-by-id) 定位文档,其中 SearchByCodeProcessor 根据代码组装查询。然后,检索到的文档由 UpdateRatingProcessor 处理,它将结果转换为 Movie 对象,将电影评级(movie rating)更新为特定值,并准备将更新后的文档发送回 Elasticsearch 进行更新。

public class IngestionRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(IngestionRoute.class);@Overridepublic void configure() throws Exception {from("direct:update-ingestion").pipeline().to("direct:search-by-id").to(URI_SEARCH_OPERATION).to("direct:update-rating").to(URI_UPDATE_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();from("direct:search-by-id").process(new SearchByCodeProcessor());from("direct:update-rating").process(new UpdateRatingProcessor());}
}

SearchByCodeProcessor 处理器仅配置为执行搜索查询:

public class SearchByCodeProcessor implements Processor {@Overridepublic void process(Exchange exchange) throws Exception {var code = exchange.getIn().getBody();String query = "{\n" +"  \"query\": {\n" +"   \"term\": {\n" +"     \"code\": {\n" +"       \"value\":" + code + "\n" +"     }\n" +"   }\n" +"  }\n" +"}";exchange.setProperty("document_code", code);exchange.getIn().setBody(query);}
}

UpdateRatingProcessor 处理器负责更新评级字段。

public class UpdateRatingProcessor implements Processor {private final ObjectMapper objectMapper;public UpdateRatingProcessor() {this.objectMapper = new ObjectMapper();this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);}@Overridepublic void process(Exchange exchange) throws Exception {HitsMetadata response = exchange.getIn().getBody(HitsMetadata.class);var code = Long.parseLong(exchange.getProperty("document_code").toString());if (response != null && response.hits() != null) {var documents = parseToMovies(response);var optionalMovie = documents.stream().filter(document -> code == (document.getSource().getCode())).findAny();optionalMovie.ifPresent(document -> {document.getSource().setRating(13.0);Map<String, Object> updateMap = new HashMap<>();updateMap.put("doc", document.getSource());exchange.getIn().setHeader("indexId", document.getId());exchange.getIn().setBody(updateMap);});}}

数据删除

最后,配置删除文档的路由。在这里,我们将使用文档的 ID 删除文档。在 Elasticsearch 中,要删除文档,我们需要知道文档标识符、存储文档的索引并执行删除请求。在 Apache Camel 中,我们将通过创建新路由来执行此操作,如下所示。

路由从 direct:op-delete 端点开始,该端点作为入口点。当需要删除文档时,将在消息正文中收到其标识符 (_id)。然后,路由使用 simple("${body}") 将 indexId 标头设置为该标识符的值,这会从消息正文中提取 _id。

public class OperationDeleteRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(OperationDeleteRoute.class);@Overridepublic void configure() {from("direct:op-delete").routeId("route-delete").setHeader("indexId", simple("${body}")).to(URI_DELETE_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();;}
}
String URI_DELETE_OPERATION = String.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",IndexOperationConfig.DELETE_OPERATION,INDEX_NAME);

最后,消息被定向到URI_DELETE_OPERATION指定的端点,该端点连接到 Elasticsearch 以执行相应索引中的文档删除操作。
现在我们已经创建了路由,我们可以创建一个 Camel 上下文(DefaultCamelContext),它被配置为包含 Elasticsearch 组件。

try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationDeleteRoute());context.start();ProducerTemplate producerTemplate = context.createProducerTemplate();producerTemplate.sendBody("direct:op-delete", documentId);
}

接下来,将 OperationDeleteRoute 类定义的删除路由(delete route)添加到上下文中。初始化上下文后,使用 ProducerTemplate 将应删除的文档的标识符传递给 direct:op-delete 端点,从而触发删除路由。

结论

Apache Camel 与 Elasticsearch 之间的集成允许实现强大而高效的数据提取,利用 Camel 的灵活性来定义可以处理不同数据操作场景(例如索引、更新和删除)的路由。通过此设置,你可以以可扩展的方式编排和自动化复杂流程,确保你的数据在 Elasticsearch 中得到有效管理。此示例演示了如何将这些工具一起使用来创建高效且适应性强的数据提取解决方案。

参考资料

  • Apache Camel
  • Apache Camel 架构
  • 聚合 Apache Camel
  • 文件组件
  • Elasticsearch 组件


准备好自己尝试一下了吗?开始免费试用。
想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训何时开始!

原文:https://www.elastic.co/search-labs/blog/elasticsearch-apache-camel-ingest-data

相关文章:

如何通过 Apache Camel 将数据导入 Elasticsearch

作者&#xff1a;来自 Elastic Andre Luiz 使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中&#xff0c;我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能&#xff0c;我们将实…...

打造民国风格炫酷个人网页:用HTML和CSS3传递民国风韵

附源码&#xff01;&#xff01;&#xff01; 感谢支持 小弟不断创作网站demo感兴趣的可以关注支持一下 对了 俺在结尾带上了自己用的 背景 大家可以尝试换一下效果更好哦~~~ 如何创建一个民国风格的炫酷网页 在这篇博客中&#xff0c;我们将展示如何制作一个结合民国风格和…...

豆包MarsCode编程助手:产品功能解析与应用场景探索!

随着现代技术的不断进化升级&#xff0c;人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目&#xff0c;代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率&#xff0c;许多智能编程工具应运而生&#xff0c;豆包MarsCode 编程助手就是…...

爬虫全网抓取

爬虫全网抓取是指利用网络爬虫技术&#xff0c;通过自动化的方式遍历互联网上各个网站、论坛、博客等&#xff0c;从这些网页中提取所需的数据。它通常涉及以下几个步骤&#xff1a; 目标设定&#xff1a;确定要抓取哪些类型的网页内容&#xff0c;比如新闻、商品信息、用户评论…...

【计算机组成原理】详细解读带符号整数在计算机中的运算

有符号整数的运算 导读一、补码的优势二、补码的加法运算三、补码的减法运算四、原码、反码、补码的特性结语 导读 大家好&#xff0c;很高兴又和大家见面啦&#xff01;&#xff01;&#xff01; 经过前面的介绍&#xff0c;我们已经初步认识了有符号整数的三种表示形式&…...

vue3常见的bug 修复bug

Vue 3 作为 Vue.js 的最新版本&#xff0c;在性能、开发体验以及代码可维护性等方面带来了显著的提升。然而&#xff0c;就像任何软件框架一样&#xff0c;Vue 3 在使用过程中也可能遇到一些典型的bug或问题。以下是一些可能遇到的典型问题&#xff1a; 响应式系统相关的问题&…...

C++课程笔记 类和对象

类概念 结构体&#xff1a;只要属性 类&#xff1a;有属性也有方法 c可以省略struct c不行 #include<iostream> using namespace std;typedef struct queue1 {int a;queue1 q() {queue1 q(2);return q;};queue1(){}queue1(int qa){a qa;} }q1; int main() {queue1 Q1;…...

提问即创作:用Prompt提示词引领AI灵感爆发

文章目录 &#x1f34a;AI内容创作的精髓&#xff1a;提示词Prompt1 什么是提示词工程?1.1 提示词是如何影响AI的输出结果?1.2 提示词的原理是什么1.3 提示词工程师的前景1.4 谁能成为提示词工程师&#xff1f;1.5 提示词的未来前景 2 提示词的基本书写技巧3 常见的提示词框架…...

一码空传临时网盘PHP源码,支持提取码功能

源码介绍 一码空传临时网盘源码V2.0免费授权&#xff0c;该源码提供了一个简单易用的无数据库版临时网盘解决方案。前端采用了layui开发框架&#xff0c;后端使用原生PHP编写&#xff0c;没有引入任何开发框架&#xff0c;保持了代码的简洁和高效。 这个程序使用了一个无数据…...

自然语言处理实战项目

自然语言处理实战项目 自然语言处理&#xff08;NLP, Natural Language Processing&#xff09;是人工智能的重要分支之一&#xff0c;致力于让计算机理解、生成并与人类进行语言交互。随着深度学习、神经网络和大数据的发展&#xff0c;NLP技术在近年来取得了飞跃性的进展&am…...

人工智能物联网的去中心化和分布式学习:全面综述、新兴挑战和机遇

这篇论文的标题是《Decentralized and Distributed Learning for AIoT: A Comprehensive Review, Emerging Challenges, and Opportunities》&#xff0c;作者是Hanyue Xu, Kah Phooi Seng, Li Minn Ang, 和 Jeremy Smith。论文发表在IEEE Access期刊上&#xff0c;接收日期为2…...

滑动窗口算法—最小覆盖子串

题目 ”最小覆盖子串“问题&#xff0c;难度为Hard&#xff0c;题目如下&#xff1a; 给你两个字符串 S 和 T&#xff0c;请你在 S 中找到包含 T 中全部字母的最短子串。如果 S 中没有这样一个子串&#xff0c;则算法返回空串&#xff0c;如果存在这样一个子串&#xff0c;则可…...

应用案例|开源 PolarDB-X 在互联网安全场景的应用实践

背景介绍 中盾数科集团始创于2012年&#xff0c;是由网络安全服务而发展起来的科技型、多元化的企业集团。旗下包括网络安全服务、信创一体化服务、箱式液冷、区块链、位置服务、视觉服务等六大板块&#xff0c;业务覆盖湖南、甘肃、贵州等多个省份。 业务挑战 中盾集团基于A…...

【大数据】MapReduce的“内存增强版”——Spark

【大数据】MapReduce的“内存增强版”——Spark 文章脉络 Spark架构 Spark-core SparkConf 和 SparkContext RDD Spark集群 Spark-sql 在大数据时代&#xff0c;数据处理和分析成为企业竞争的重要手段。Hadoop作为大数据处理的基石&#xff0c;其核心组件MapReduce在众多…...

o1模型:引领AI技术在STEM领域的突破与应用

o1模型是OpenAI最新推出的大型语言模型&#xff0c;它在多个领域展现出了卓越的能力&#xff0c;被认为是AI技术发展的一个重要里程碑。以下是对o1模型的详细介绍和分析&#xff1a; o1模型的简介和性能评估 o1模型在物理、化学、生物学等领域的基准任务上达到了博士生水平&…...

数据库系统 第57节 数据库迁移

数据库迁移是一个复杂的过程&#xff0c;涉及到将数据从一个数据库系统转移到另一个数据库系统。这个过程通常需要仔细规划和执行&#xff0c;以确保数据的完整性和可用性。以下是数据库迁移的一些关键方面&#xff1a; 数据迁移工具&#xff1a; 这些工具可以帮助自动化迁移过…...

【主机入侵检测】Wazuh规则详解

前言 Wazuh 规则是一组用XML格式编写的条件&#xff0c;它们定义了应该如何解释日志数据。这些规则由Wazuh Manager使用&#xff0c;用于在日志消息中检测特定的模式或行为&#xff0c;并相应地生成警报或响应。它们在威胁检测中扮演着至关重要的角色&#xff0c;因为它们允许系…...

redis有序集合写入和求交集的速度

背景 团队小伙伴做了一个需求。大概的需求是有很多的图片作品&#xff0c;图片作品有一些类别&#xff0c;每个人进入到每个类别的作品业&#xff0c;根据权重优先查看权重最高的的作品&#xff0c;权重大概是基于每个人对该作品的浏览计算&#xff0c;浏览过的作品放在最后展…...

微服务之服务注册与发现:Etcd、Zookeeper、Consul 与 Nacos 比较

在微服务架构中&#xff0c;服务注册与发现是实现服务动态管理和负载均衡的关键。本文将对四款主流的服务注册与发现工具——Etcd、Zookeeper、Consul、Nacos进行深入对比&#xff0c;从功能、性能、一致性、生态集成、应用场景等多个维度展开分析&#xff0c;帮助您选择最适合…...

桥接模式详解和分析JDBC中的应用

&#x1f3af; 设计模式专栏&#xff0c;持续更新中&#xff0c; 欢迎订阅&#xff1a;JAVA实现设计模式 &#x1f6e0;️ 希望小伙伴们一键三连&#xff0c;有问题私信都会回复&#xff0c;或者在评论区直接发言 桥接模式 文章目录 桥接模式桥接模式的四个核心组成&#xff1a…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)

Aspose.PDF 限制绕过方案&#xff1a;Java 字节码技术实战分享&#xff08;仅供学习&#xff09; 一、Aspose.PDF 简介二、说明&#xff08;⚠️仅供学习与研究使用&#xff09;三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...