如何通过 Apache Camel 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz
使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实现一个入门应用程序,逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。
什么是 Apache Camel?
Apache Camel 是一个开源集成框架,可简化不同系统的连接,使开发人员可以专注于业务逻辑,而不必担心系统通信的复杂性。Camel 的核心概念是 “routes - 路由”,它定义了消息从源到目的地所遵循的路径,可能包括转换、验证和过滤等中间步骤。
Apache Camel 架构
Camel 使用 “components- 组件” 连接不同的系统和协议,例如数据库和消息传递服务,并使用 “endpoints- 端点” 表示消息的入口点和出口点。这些概念提供了模块化和灵活的设计,使配置和管理复杂集成变得更加容易,高效且可扩展。
使用 Elasticsearch 和 Apache Camel
我们将演示如何配置一个简单的 Java 应用程序,该应用程序使用 Apache Camel 将数据导入 Elasticsearch 集群。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。
1. 添加依赖项
配置此集成的第一步是将必要的依赖项添加到项目的 pom.xml 文件中。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库,因此我们必须导入 camel-elasticsearch 组件,并且版本必须与 camel-core 库相同。
如果你想使用 Java 低级 Rest 客户端,则必须使用 Elasticsearch 低级 Rest 客户端组件。
<dependency><groupId>org.apache.camel</groupId><artifactId>camel-core</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-elasticsearch</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-jackson</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.14.3</version>
</dependency>
2. 配置和运行 Camel 上下文
配置首先使用 DefaultCamelContext 类创建一个新的 Camel 上下文,该类是定义和执行路由的基础。接下来,我们配置 Elasticsearch 组件,这将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例配置为连接到地址 localhost:9200,这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置,你应该阅读有关如何配置组件和启用基本身份验证的文档,称为 “Configure the component and enable basic authentication - 配置组件和启用基本身份验证”。
public class ESComponent {public static ElasticsearchComponent getInstance() {var elasticsearch = new ElasticsearchComponent();elasticsearch.setHostAddresses("localhost:9200");return elasticsearch;}public static String getName() {return "elasticsearch";}
}
然后将该组件添加到 Camel 上下文中,使得定义的路由能够使用该组件在 Elasticsearch 中执行操作。
try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationBulkRoute());context.start();
}
随后,将路由添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。
3. 配置 Camel 路由
数据索引
我们将配置的第一个路由用于数据索引。我们将使用包含电影目录的 JSON 文件。路由将配置为读取位于 src/main/resources/movies.json 的文件,将 JSON 内容反序列化为 Java 对象,然后应用聚合策略将多条消息合并为一条,从而允许在 Elasticsearch 中进行批量操作。配置了每条消息 500 个项目的大小,即批量将一次索引 500 部电影。
路由 Elasticsearch 操作 bulk:
String URI_BULK_OPERATION = String.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",IndexOperationConfig.BULK_OPERATION,INDEX_NAME);
public class OperationBulkRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(OperationBulkRoute.class);private static final int BULK_SIZE = 500;@Overridepublic void configure() {from("file:src/main/resources?fileName=movies.json&noop=true").routeId("route-bulk-ingest").unmarshal().json().split(body()).aggregate(constant(true), new BulkAggregationStrategy()).completionSize(BULK_SIZE).to(URI_BULK_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();}
}
这批文档将被发送到 Elasticsearch 的批量操作端点。这种方法可确保处理大量数据时的效率和速度。
数据更新
下一个路由是更新文档。我们在上一步中索引了一些电影,现在我们将创建新的路由,通过参考代码搜索文档,然后更新评级字段。
我们设置了一个 Camel 上下文 (DefaultCamelContext),其中注册了一个 Elasticsearch 组件,并添加了一个自定义路由 IngestionRoute。操作首先通过 ProducerTemplate 发送文档代码,然后从 direct:update-ingestion 端点启动路由。
try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new IngestionRoute());context.start();ProducerTemplate producerTemplate = context.createProducerTemplate();producerTemplate.sendBody("direct:update-ingestion", documentCode);Thread.sleep(5000);
}
接下来,我们有 IngestionRoute,它是此流程的输入端点。该路由执行几个流水线操作。首先,在 Elasticsearch 中进行搜索以按代码 (direct:search-by-id) 定位文档,其中 SearchByCodeProcessor 根据代码组装查询。然后,检索到的文档由 UpdateRatingProcessor 处理,它将结果转换为 Movie 对象,将电影评级(movie rating)更新为特定值,并准备将更新后的文档发送回 Elasticsearch 进行更新。
public class IngestionRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(IngestionRoute.class);@Overridepublic void configure() throws Exception {from("direct:update-ingestion").pipeline().to("direct:search-by-id").to(URI_SEARCH_OPERATION).to("direct:update-rating").to(URI_UPDATE_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();from("direct:search-by-id").process(new SearchByCodeProcessor());from("direct:update-rating").process(new UpdateRatingProcessor());}
}
SearchByCodeProcessor 处理器仅配置为执行搜索查询:
public class SearchByCodeProcessor implements Processor {@Overridepublic void process(Exchange exchange) throws Exception {var code = exchange.getIn().getBody();String query = "{\n" +" \"query\": {\n" +" \"term\": {\n" +" \"code\": {\n" +" \"value\":" + code + "\n" +" }\n" +" }\n" +" }\n" +"}";exchange.setProperty("document_code", code);exchange.getIn().setBody(query);}
}
UpdateRatingProcessor 处理器负责更新评级字段。
public class UpdateRatingProcessor implements Processor {private final ObjectMapper objectMapper;public UpdateRatingProcessor() {this.objectMapper = new ObjectMapper();this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);}@Overridepublic void process(Exchange exchange) throws Exception {HitsMetadata response = exchange.getIn().getBody(HitsMetadata.class);var code = Long.parseLong(exchange.getProperty("document_code").toString());if (response != null && response.hits() != null) {var documents = parseToMovies(response);var optionalMovie = documents.stream().filter(document -> code == (document.getSource().getCode())).findAny();optionalMovie.ifPresent(document -> {document.getSource().setRating(13.0);Map<String, Object> updateMap = new HashMap<>();updateMap.put("doc", document.getSource());exchange.getIn().setHeader("indexId", document.getId());exchange.getIn().setBody(updateMap);});}}
数据删除
最后,配置删除文档的路由。在这里,我们将使用文档的 ID 删除文档。在 Elasticsearch 中,要删除文档,我们需要知道文档标识符、存储文档的索引并执行删除请求。在 Apache Camel 中,我们将通过创建新路由来执行此操作,如下所示。
路由从 direct:op-delete 端点开始,该端点作为入口点。当需要删除文档时,将在消息正文中收到其标识符 (_id)。然后,路由使用 simple("${body}") 将 indexId 标头设置为该标识符的值,这会从消息正文中提取 _id。
public class OperationDeleteRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(OperationDeleteRoute.class);@Overridepublic void configure() {from("direct:op-delete").routeId("route-delete").setHeader("indexId", simple("${body}")).to(URI_DELETE_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();;}
}
String URI_DELETE_OPERATION = String.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",IndexOperationConfig.DELETE_OPERATION,INDEX_NAME);
最后,消息被定向到URI_DELETE_OPERATION指定的端点,该端点连接到 Elasticsearch 以执行相应索引中的文档删除操作。
现在我们已经创建了路由,我们可以创建一个 Camel 上下文(DefaultCamelContext),它被配置为包含 Elasticsearch 组件。
try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationDeleteRoute());context.start();ProducerTemplate producerTemplate = context.createProducerTemplate();producerTemplate.sendBody("direct:op-delete", documentId);
}
接下来,将 OperationDeleteRoute 类定义的删除路由(delete route)添加到上下文中。初始化上下文后,使用 ProducerTemplate 将应删除的文档的标识符传递给 direct:op-delete 端点,从而触发删除路由。
结论
Apache Camel 与 Elasticsearch 之间的集成允许实现强大而高效的数据提取,利用 Camel 的灵活性来定义可以处理不同数据操作场景(例如索引、更新和删除)的路由。通过此设置,你可以以可扩展的方式编排和自动化复杂流程,确保你的数据在 Elasticsearch 中得到有效管理。此示例演示了如何将这些工具一起使用来创建高效且适应性强的数据提取解决方案。
参考资料
- Apache Camel
- Apache Camel 架构
- 聚合 Apache Camel
- 文件组件
- Elasticsearch 组件
准备好自己尝试一下了吗?开始免费试用。
想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训何时开始!
原文:https://www.elastic.co/search-labs/blog/elasticsearch-apache-camel-ingest-data
相关文章:

如何通过 Apache Camel 将数据导入 Elasticsearch
作者:来自 Elastic Andre Luiz 使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实…...

打造民国风格炫酷个人网页:用HTML和CSS3传递民国风韵
附源码!!! 感谢支持 小弟不断创作网站demo感兴趣的可以关注支持一下 对了 俺在结尾带上了自己用的 背景 大家可以尝试换一下效果更好哦~~~ 如何创建一个民国风格的炫酷网页 在这篇博客中,我们将展示如何制作一个结合民国风格和…...

豆包MarsCode编程助手:产品功能解析与应用场景探索!
随着现代技术的不断进化升级,人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目,代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率,许多智能编程工具应运而生,豆包MarsCode 编程助手就是…...

爬虫全网抓取
爬虫全网抓取是指利用网络爬虫技术,通过自动化的方式遍历互联网上各个网站、论坛、博客等,从这些网页中提取所需的数据。它通常涉及以下几个步骤: 目标设定:确定要抓取哪些类型的网页内容,比如新闻、商品信息、用户评论…...

【计算机组成原理】详细解读带符号整数在计算机中的运算
有符号整数的运算 导读一、补码的优势二、补码的加法运算三、补码的减法运算四、原码、反码、补码的特性结语 导读 大家好,很高兴又和大家见面啦!!! 经过前面的介绍,我们已经初步认识了有符号整数的三种表示形式&…...

vue3常见的bug 修复bug
Vue 3 作为 Vue.js 的最新版本,在性能、开发体验以及代码可维护性等方面带来了显著的提升。然而,就像任何软件框架一样,Vue 3 在使用过程中也可能遇到一些典型的bug或问题。以下是一些可能遇到的典型问题: 响应式系统相关的问题&…...

C++课程笔记 类和对象
类概念 结构体:只要属性 类:有属性也有方法 c可以省略struct c不行 #include<iostream> using namespace std;typedef struct queue1 {int a;queue1 q() {queue1 q(2);return q;};queue1(){}queue1(int qa){a qa;} }q1; int main() {queue1 Q1;…...

提问即创作:用Prompt提示词引领AI灵感爆发
文章目录 🍊AI内容创作的精髓:提示词Prompt1 什么是提示词工程?1.1 提示词是如何影响AI的输出结果?1.2 提示词的原理是什么1.3 提示词工程师的前景1.4 谁能成为提示词工程师?1.5 提示词的未来前景 2 提示词的基本书写技巧3 常见的提示词框架…...

一码空传临时网盘PHP源码,支持提取码功能
源码介绍 一码空传临时网盘源码V2.0免费授权,该源码提供了一个简单易用的无数据库版临时网盘解决方案。前端采用了layui开发框架,后端使用原生PHP编写,没有引入任何开发框架,保持了代码的简洁和高效。 这个程序使用了一个无数据…...

自然语言处理实战项目
自然语言处理实战项目 自然语言处理(NLP, Natural Language Processing)是人工智能的重要分支之一,致力于让计算机理解、生成并与人类进行语言交互。随着深度学习、神经网络和大数据的发展,NLP技术在近年来取得了飞跃性的进展&am…...

人工智能物联网的去中心化和分布式学习:全面综述、新兴挑战和机遇
这篇论文的标题是《Decentralized and Distributed Learning for AIoT: A Comprehensive Review, Emerging Challenges, and Opportunities》,作者是Hanyue Xu, Kah Phooi Seng, Li Minn Ang, 和 Jeremy Smith。论文发表在IEEE Access期刊上,接收日期为2…...

滑动窗口算法—最小覆盖子串
题目 ”最小覆盖子串“问题,难度为Hard,题目如下: 给你两个字符串 S 和 T,请你在 S 中找到包含 T 中全部字母的最短子串。如果 S 中没有这样一个子串,则算法返回空串,如果存在这样一个子串,则可…...

应用案例|开源 PolarDB-X 在互联网安全场景的应用实践
背景介绍 中盾数科集团始创于2012年,是由网络安全服务而发展起来的科技型、多元化的企业集团。旗下包括网络安全服务、信创一体化服务、箱式液冷、区块链、位置服务、视觉服务等六大板块,业务覆盖湖南、甘肃、贵州等多个省份。 业务挑战 中盾集团基于A…...

【大数据】MapReduce的“内存增强版”——Spark
【大数据】MapReduce的“内存增强版”——Spark 文章脉络 Spark架构 Spark-core SparkConf 和 SparkContext RDD Spark集群 Spark-sql 在大数据时代,数据处理和分析成为企业竞争的重要手段。Hadoop作为大数据处理的基石,其核心组件MapReduce在众多…...

o1模型:引领AI技术在STEM领域的突破与应用
o1模型是OpenAI最新推出的大型语言模型,它在多个领域展现出了卓越的能力,被认为是AI技术发展的一个重要里程碑。以下是对o1模型的详细介绍和分析: o1模型的简介和性能评估 o1模型在物理、化学、生物学等领域的基准任务上达到了博士生水平&…...

数据库系统 第57节 数据库迁移
数据库迁移是一个复杂的过程,涉及到将数据从一个数据库系统转移到另一个数据库系统。这个过程通常需要仔细规划和执行,以确保数据的完整性和可用性。以下是数据库迁移的一些关键方面: 数据迁移工具: 这些工具可以帮助自动化迁移过…...

【主机入侵检测】Wazuh规则详解
前言 Wazuh 规则是一组用XML格式编写的条件,它们定义了应该如何解释日志数据。这些规则由Wazuh Manager使用,用于在日志消息中检测特定的模式或行为,并相应地生成警报或响应。它们在威胁检测中扮演着至关重要的角色,因为它们允许系…...

redis有序集合写入和求交集的速度
背景 团队小伙伴做了一个需求。大概的需求是有很多的图片作品,图片作品有一些类别,每个人进入到每个类别的作品业,根据权重优先查看权重最高的的作品,权重大概是基于每个人对该作品的浏览计算,浏览过的作品放在最后展…...

微服务之服务注册与发现:Etcd、Zookeeper、Consul 与 Nacos 比较
在微服务架构中,服务注册与发现是实现服务动态管理和负载均衡的关键。本文将对四款主流的服务注册与发现工具——Etcd、Zookeeper、Consul、Nacos进行深入对比,从功能、性能、一致性、生态集成、应用场景等多个维度展开分析,帮助您选择最适合…...

桥接模式详解和分析JDBC中的应用
🎯 设计模式专栏,持续更新中, 欢迎订阅:JAVA实现设计模式 🛠️ 希望小伙伴们一键三连,有问题私信都会回复,或者在评论区直接发言 桥接模式 文章目录 桥接模式桥接模式的四个核心组成:…...

【python - 函数】
一、交互式会话 在与 Python 的交互式会话中,你可以在提示符 >>> 后键入一些 Python 代码,Python 解释器会读取并执行你键入的各种命令。 要启动交互式会话,请在终端 (Mac/Unix/Linux) 中键入 python3 或在 Windows 中打开 Python…...

scipy中稀疏矩阵特征值问题概述
在Python的scipy库中,这三种算法——ARPACK、LOBPCG、和AMG——都是用于求解稀疏矩阵特征值问题的数值方法。它们各自有不同的特性和适用场景,以下是详细说明: 1. ARPACK (Arnoldi Package) ARPACK(Arnoldi Package)…...

浅谈线性表——队列
文章目录 一、什么是队列?二、队列底层三、自我实现一个队列3.1、链式存储3.1.1、单向链表实现队列的实现代码3.1.2、双向链表实现队列的实现代码 3.2、顺序存储3.2.1、循环队列的实现代码 一、什么是队列? 队列是只允许在一端进行插入数据操作…...

2-94 基于matlab的最佳维纳滤波器的盲解卷积算法
基于matlab的最佳维纳滤波器的盲解卷积算法。维纳滤波将地震子波转换为任意所需要的形态。维纳滤波不同于反滤波,它是在最小平方的意义上为最 佳。基于最佳纳滤波理论的滤波器算法是莱文逊(Wiener—Levinson)算法。程序提供了4种子波和4种期望输出:零延迟…...

【提示词】浅谈GPT等大模型中的Prompt
Prompt是人工智能(AI)提示词,是一种利用自然语言来指导或激发人工智能模型完成特定任务的方法。在AI语境中,Prompt是一种自然语言输入,通常指的是向模型提出的一个请求或问题,这个请求或问题的形式和内容会…...

最强AI照片说话Windows一体包下载地址,口型合成音频驱动图片,免安装,下载即用
照片数字一键整合包:点击下载 一键安装包,简单一键启动,即刻使用,秒级体验。 目前效果最好的音频驱动图片说话的软件,比sadtalker、MuseTalk更清晰,效果更好,可以作为DID heygen的开源平替。原…...

Windows下使用cmake编译OpenCV
Windows下使用cmake编译OpenCV cmake下载OpenCV下载编译OpenCV cmake下载 下载地址:https://cmake.org/download/ 下载完成,点击选择路径安装即可 OpenCV下载 下载地址:https://github.com/opencv/opencv/releases/tag/4.8.1因为我们是编译…...

设计模式---中介者模式
设计模式---中介者模式 定义与设计思路中介者模式的引入:机场控制塔中介者模式的设计框架 定义与设计思路 定义:用一个中介对象来封装一系列对象交互。中介者使各对象不需要相互引用,从而使其耦合松散,而且可以独立地改变它们之间…...

六氟化硫密度微水在线监测配套5孔M12格兰头航空插头插座
我们将为大家介绍如何使用六氟化硫密度微水在线监测配套5孔M12格兰头连接器。在本教程中,我们将向您展示简单易懂的步骤,让您轻松掌握。 所需材料: 1. 六氟化硫密度微水在线监测器 2. 5孔M12格兰头连接器 3. 电源线 4. 符合要求的电缆 5…...

linux -L4.linux 暂停和启动进程
接第3课,L3 第3课-查看进程 通过端口号,查看对应的进程 netstat -tulnp | grep :9513暂停这个进程 Kill -STOP 5376重启这个进程 Kill -CONT 5376要查看特定PID对应的端口,你可以使用netstat命令结合grep工具来过滤输出。以下是一个基于L…...