当前位置: 首页 > news >正文

如何通过 Apache Camel 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中,我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能,我们将实现一个入门应用程序,逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。

什么是 Apache Camel?

Apache Camel 是一个开源集成框架,可简化不同系统的连接,使开发人员可以专注于业务逻辑,而不必担心系统通信的复杂性。Camel 的核心概念是 “routes - 路由”,它定义了消息从源到目的地所遵循的路径,可能包括转换、验证和过滤等中间步骤。

Apache Camel 架构

Camel 使用 “components- 组件” 连接不同的系统和协议,例如数据库和消息传递服务,并使用 “endpoints- 端点” 表示消息的入口点和出口点。这些概念提供了模块化和灵活的设计,使配置和管理复杂集成变得更加容易,高效且可扩展。

使用 Elasticsearch 和 Apache Camel

我们将演示如何配置一个简单的 Java 应用程序,该应用程序使用 Apache Camel 将数据导入 Elasticsearch 集群。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。

1. 添加依赖项

配置此集成的第一步是将必要的依赖项添加到项目的 pom.xml 文件中。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库,因此我们必须导入 camel-elasticsearch 组件,并且版本必须与 camel-core 库相同。

如果你想使用 Java 低级 Rest 客户端,则必须使用 Elasticsearch 低级 Rest 客户端组件。

<dependency><groupId>org.apache.camel</groupId><artifactId>camel-core</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-elasticsearch</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>org.apache.camel</groupId><artifactId>camel-jackson</artifactId><version>4.7.0</version>
</dependency><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.14.3</version>
</dependency>

2. 配置和运行 Camel 上下文

配置首先使用 DefaultCamelContext 类创建一个新的 Camel 上下文,该类是定义和执行路由的基础。接下来,我们配置 Elasticsearch 组件,这将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例配置为连接到地址 localhost:9200,这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置,你应该阅读有关如何配置组件和启用基本身份验证的文档,称为 “Configure the component and enable basic authentication - 配置组件和启用基本身份验证”。

public class ESComponent {public static ElasticsearchComponent getInstance() {var elasticsearch = new ElasticsearchComponent();elasticsearch.setHostAddresses("localhost:9200");return elasticsearch;}public static String getName() {return "elasticsearch";}
}

然后将该组件添加到 Camel 上下文中,使得定义的路由能够使用该组件在 Elasticsearch 中执行操作。

try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationBulkRoute());context.start();
}

随后,将路由添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。

3. 配置 Camel 路由

数据索引

我们将配置的第一个路由用于数据索引。我们将使用包含电影目录的 JSON 文件。路由将配置为读取位于 src/main/resources/movies.json 的文件,将 JSON 内容反序列化为 Java 对象,然后应用聚合策略将多条消息合并为一条,从而允许在 Elasticsearch 中进行批量操作。配置了每条消息 500 个项目的大小,即批量将一次索引 500 部电影。

路由 Elasticsearch 操作 bulk:

String URI_BULK_OPERATION = String.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",IndexOperationConfig.BULK_OPERATION,INDEX_NAME);
public class OperationBulkRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(OperationBulkRoute.class);private static final int BULK_SIZE = 500;@Overridepublic void configure() {from("file:src/main/resources?fileName=movies.json&noop=true").routeId("route-bulk-ingest").unmarshal().json().split(body()).aggregate(constant(true), new BulkAggregationStrategy()).completionSize(BULK_SIZE).to(URI_BULK_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();}
}

这批文档将被发送到 Elasticsearch 的批量操作端点。这种方法可确保处理大量数据时的效率和速度。

数据更新

下一个路由是更新文档。我们在上一步中索引了一些电影,现在我们将创建新的路由,通过参考代码搜索文档,然后更新评级字段。

我们设置了一个 Camel 上下文 (DefaultCamelContext),其中注册了一个 Elasticsearch 组件,并添加了一个自定义路由 IngestionRoute。操作首先通过 ProducerTemplate 发送文档代码,然后从 direct:update-ingestion 端点启动路由。

try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new IngestionRoute());context.start();ProducerTemplate producerTemplate = context.createProducerTemplate();producerTemplate.sendBody("direct:update-ingestion", documentCode);Thread.sleep(5000);
}

接下来,我们有 IngestionRoute,它是此流程的输入端点。该路由执行几个流水线操作。首先,在 Elasticsearch 中进行搜索以按代码 (direct:search-by-id) 定位文档,其中 SearchByCodeProcessor 根据代码组装查询。然后,检索到的文档由 UpdateRatingProcessor 处理,它将结果转换为 Movie 对象,将电影评级(movie rating)更新为特定值,并准备将更新后的文档发送回 Elasticsearch 进行更新。

public class IngestionRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(IngestionRoute.class);@Overridepublic void configure() throws Exception {from("direct:update-ingestion").pipeline().to("direct:search-by-id").to(URI_SEARCH_OPERATION).to("direct:update-rating").to(URI_UPDATE_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();from("direct:search-by-id").process(new SearchByCodeProcessor());from("direct:update-rating").process(new UpdateRatingProcessor());}
}

SearchByCodeProcessor 处理器仅配置为执行搜索查询:

public class SearchByCodeProcessor implements Processor {@Overridepublic void process(Exchange exchange) throws Exception {var code = exchange.getIn().getBody();String query = "{\n" +"  \"query\": {\n" +"   \"term\": {\n" +"     \"code\": {\n" +"       \"value\":" + code + "\n" +"     }\n" +"   }\n" +"  }\n" +"}";exchange.setProperty("document_code", code);exchange.getIn().setBody(query);}
}

UpdateRatingProcessor 处理器负责更新评级字段。

public class UpdateRatingProcessor implements Processor {private final ObjectMapper objectMapper;public UpdateRatingProcessor() {this.objectMapper = new ObjectMapper();this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);}@Overridepublic void process(Exchange exchange) throws Exception {HitsMetadata response = exchange.getIn().getBody(HitsMetadata.class);var code = Long.parseLong(exchange.getProperty("document_code").toString());if (response != null && response.hits() != null) {var documents = parseToMovies(response);var optionalMovie = documents.stream().filter(document -> code == (document.getSource().getCode())).findAny();optionalMovie.ifPresent(document -> {document.getSource().setRating(13.0);Map<String, Object> updateMap = new HashMap<>();updateMap.put("doc", document.getSource());exchange.getIn().setHeader("indexId", document.getId());exchange.getIn().setBody(updateMap);});}}

数据删除

最后,配置删除文档的路由。在这里,我们将使用文档的 ID 删除文档。在 Elasticsearch 中,要删除文档,我们需要知道文档标识符、存储文档的索引并执行删除请求。在 Apache Camel 中,我们将通过创建新路由来执行此操作,如下所示。

路由从 direct:op-delete 端点开始,该端点作为入口点。当需要删除文档时,将在消息正文中收到其标识符 (_id)。然后,路由使用 simple("${body}") 将 indexId 标头设置为该标识符的值,这会从消息正文中提取 _id。

public class OperationDeleteRoute extends RouteBuilder {private static final Log log = LogFactory.getLog(OperationDeleteRoute.class);@Overridepublic void configure() {from("direct:op-delete").routeId("route-delete").setHeader("indexId", simple("${body}")).to(URI_DELETE_OPERATION).process(exchange -> {var body = exchange.getIn().getBody(String.class);log.info(String.format("Response: %s", body));}).end();;}
}
String URI_DELETE_OPERATION = String.format("elasticsearch://elasticsearch?operation=%s&indexName=%s",IndexOperationConfig.DELETE_OPERATION,INDEX_NAME);

最后,消息被定向到URI_DELETE_OPERATION指定的端点,该端点连接到 Elasticsearch 以执行相应索引中的文档删除操作。
现在我们已经创建了路由,我们可以创建一个 Camel 上下文(DefaultCamelContext),它被配置为包含 Elasticsearch 组件。

try (var context = new DefaultCamelContext()) {context.addComponent(ESComponent.getName(), ESComponent.getInstance());context.addRoutes(new OperationDeleteRoute());context.start();ProducerTemplate producerTemplate = context.createProducerTemplate();producerTemplate.sendBody("direct:op-delete", documentId);
}

接下来,将 OperationDeleteRoute 类定义的删除路由(delete route)添加到上下文中。初始化上下文后,使用 ProducerTemplate 将应删除的文档的标识符传递给 direct:op-delete 端点,从而触发删除路由。

结论

Apache Camel 与 Elasticsearch 之间的集成允许实现强大而高效的数据提取,利用 Camel 的灵活性来定义可以处理不同数据操作场景(例如索引、更新和删除)的路由。通过此设置,你可以以可扩展的方式编排和自动化复杂流程,确保你的数据在 Elasticsearch 中得到有效管理。此示例演示了如何将这些工具一起使用来创建高效且适应性强的数据提取解决方案。

参考资料

  • Apache Camel
  • Apache Camel 架构
  • 聚合 Apache Camel
  • 文件组件
  • Elasticsearch 组件


准备好自己尝试一下了吗?开始免费试用。
想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训何时开始!

原文:https://www.elastic.co/search-labs/blog/elasticsearch-apache-camel-ingest-data

相关文章:

如何通过 Apache Camel 将数据导入 Elasticsearch

作者&#xff1a;来自 Elastic Andre Luiz 使用 Apache Camel 将数据提取到 Elasticsearch 的过程将搜索引擎的稳健性与集成框架的灵活性相结合。在本文中&#xff0c;我们将探讨 Apache Camel 如何简化和优化将数据提取到 Elasticsearch。为了说明此功能&#xff0c;我们将实…...

打造民国风格炫酷个人网页:用HTML和CSS3传递民国风韵

附源码&#xff01;&#xff01;&#xff01; 感谢支持 小弟不断创作网站demo感兴趣的可以关注支持一下 对了 俺在结尾带上了自己用的 背景 大家可以尝试换一下效果更好哦~~~ 如何创建一个民国风格的炫酷网页 在这篇博客中&#xff0c;我们将展示如何制作一个结合民国风格和…...

豆包MarsCode编程助手:产品功能解析与应用场景探索!

随着现代技术的不断进化升级&#xff0c;人工智能正在逐步改变着我们的日常工作方式。特别是对于复杂的项目&#xff0c;代码编写、优化、调试、测试等环节充满挑战。为了简化这些环节、提高开发效率&#xff0c;许多智能编程工具应运而生&#xff0c;豆包MarsCode 编程助手就是…...

爬虫全网抓取

爬虫全网抓取是指利用网络爬虫技术&#xff0c;通过自动化的方式遍历互联网上各个网站、论坛、博客等&#xff0c;从这些网页中提取所需的数据。它通常涉及以下几个步骤&#xff1a; 目标设定&#xff1a;确定要抓取哪些类型的网页内容&#xff0c;比如新闻、商品信息、用户评论…...

【计算机组成原理】详细解读带符号整数在计算机中的运算

有符号整数的运算 导读一、补码的优势二、补码的加法运算三、补码的减法运算四、原码、反码、补码的特性结语 导读 大家好&#xff0c;很高兴又和大家见面啦&#xff01;&#xff01;&#xff01; 经过前面的介绍&#xff0c;我们已经初步认识了有符号整数的三种表示形式&…...

vue3常见的bug 修复bug

Vue 3 作为 Vue.js 的最新版本&#xff0c;在性能、开发体验以及代码可维护性等方面带来了显著的提升。然而&#xff0c;就像任何软件框架一样&#xff0c;Vue 3 在使用过程中也可能遇到一些典型的bug或问题。以下是一些可能遇到的典型问题&#xff1a; 响应式系统相关的问题&…...

C++课程笔记 类和对象

类概念 结构体&#xff1a;只要属性 类&#xff1a;有属性也有方法 c可以省略struct c不行 #include<iostream> using namespace std;typedef struct queue1 {int a;queue1 q() {queue1 q(2);return q;};queue1(){}queue1(int qa){a qa;} }q1; int main() {queue1 Q1;…...

提问即创作:用Prompt提示词引领AI灵感爆发

文章目录 &#x1f34a;AI内容创作的精髓&#xff1a;提示词Prompt1 什么是提示词工程?1.1 提示词是如何影响AI的输出结果?1.2 提示词的原理是什么1.3 提示词工程师的前景1.4 谁能成为提示词工程师&#xff1f;1.5 提示词的未来前景 2 提示词的基本书写技巧3 常见的提示词框架…...

一码空传临时网盘PHP源码,支持提取码功能

源码介绍 一码空传临时网盘源码V2.0免费授权&#xff0c;该源码提供了一个简单易用的无数据库版临时网盘解决方案。前端采用了layui开发框架&#xff0c;后端使用原生PHP编写&#xff0c;没有引入任何开发框架&#xff0c;保持了代码的简洁和高效。 这个程序使用了一个无数据…...

自然语言处理实战项目

自然语言处理实战项目 自然语言处理&#xff08;NLP, Natural Language Processing&#xff09;是人工智能的重要分支之一&#xff0c;致力于让计算机理解、生成并与人类进行语言交互。随着深度学习、神经网络和大数据的发展&#xff0c;NLP技术在近年来取得了飞跃性的进展&am…...

人工智能物联网的去中心化和分布式学习:全面综述、新兴挑战和机遇

这篇论文的标题是《Decentralized and Distributed Learning for AIoT: A Comprehensive Review, Emerging Challenges, and Opportunities》&#xff0c;作者是Hanyue Xu, Kah Phooi Seng, Li Minn Ang, 和 Jeremy Smith。论文发表在IEEE Access期刊上&#xff0c;接收日期为2…...

滑动窗口算法—最小覆盖子串

题目 ”最小覆盖子串“问题&#xff0c;难度为Hard&#xff0c;题目如下&#xff1a; 给你两个字符串 S 和 T&#xff0c;请你在 S 中找到包含 T 中全部字母的最短子串。如果 S 中没有这样一个子串&#xff0c;则算法返回空串&#xff0c;如果存在这样一个子串&#xff0c;则可…...

应用案例|开源 PolarDB-X 在互联网安全场景的应用实践

背景介绍 中盾数科集团始创于2012年&#xff0c;是由网络安全服务而发展起来的科技型、多元化的企业集团。旗下包括网络安全服务、信创一体化服务、箱式液冷、区块链、位置服务、视觉服务等六大板块&#xff0c;业务覆盖湖南、甘肃、贵州等多个省份。 业务挑战 中盾集团基于A…...

【大数据】MapReduce的“内存增强版”——Spark

【大数据】MapReduce的“内存增强版”——Spark 文章脉络 Spark架构 Spark-core SparkConf 和 SparkContext RDD Spark集群 Spark-sql 在大数据时代&#xff0c;数据处理和分析成为企业竞争的重要手段。Hadoop作为大数据处理的基石&#xff0c;其核心组件MapReduce在众多…...

o1模型:引领AI技术在STEM领域的突破与应用

o1模型是OpenAI最新推出的大型语言模型&#xff0c;它在多个领域展现出了卓越的能力&#xff0c;被认为是AI技术发展的一个重要里程碑。以下是对o1模型的详细介绍和分析&#xff1a; o1模型的简介和性能评估 o1模型在物理、化学、生物学等领域的基准任务上达到了博士生水平&…...

数据库系统 第57节 数据库迁移

数据库迁移是一个复杂的过程&#xff0c;涉及到将数据从一个数据库系统转移到另一个数据库系统。这个过程通常需要仔细规划和执行&#xff0c;以确保数据的完整性和可用性。以下是数据库迁移的一些关键方面&#xff1a; 数据迁移工具&#xff1a; 这些工具可以帮助自动化迁移过…...

【主机入侵检测】Wazuh规则详解

前言 Wazuh 规则是一组用XML格式编写的条件&#xff0c;它们定义了应该如何解释日志数据。这些规则由Wazuh Manager使用&#xff0c;用于在日志消息中检测特定的模式或行为&#xff0c;并相应地生成警报或响应。它们在威胁检测中扮演着至关重要的角色&#xff0c;因为它们允许系…...

redis有序集合写入和求交集的速度

背景 团队小伙伴做了一个需求。大概的需求是有很多的图片作品&#xff0c;图片作品有一些类别&#xff0c;每个人进入到每个类别的作品业&#xff0c;根据权重优先查看权重最高的的作品&#xff0c;权重大概是基于每个人对该作品的浏览计算&#xff0c;浏览过的作品放在最后展…...

微服务之服务注册与发现:Etcd、Zookeeper、Consul 与 Nacos 比较

在微服务架构中&#xff0c;服务注册与发现是实现服务动态管理和负载均衡的关键。本文将对四款主流的服务注册与发现工具——Etcd、Zookeeper、Consul、Nacos进行深入对比&#xff0c;从功能、性能、一致性、生态集成、应用场景等多个维度展开分析&#xff0c;帮助您选择最适合…...

桥接模式详解和分析JDBC中的应用

&#x1f3af; 设计模式专栏&#xff0c;持续更新中&#xff0c; 欢迎订阅&#xff1a;JAVA实现设计模式 &#x1f6e0;️ 希望小伙伴们一键三连&#xff0c;有问题私信都会回复&#xff0c;或者在评论区直接发言 桥接模式 文章目录 桥接模式桥接模式的四个核心组成&#xff1a…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中&#xff0c;其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下&#xff1a; 初始判断与哈希计算&#xff1a; 首先&#xff0c;putVal 方法会检查当前的 table&#xff08;也就…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库&#xff0c;提供了高效、安全的文本格式化功能&#xff0c;是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...

什么是VR全景技术

VR全景技术&#xff0c;全称为虚拟现实全景技术&#xff0c;是通过计算机图像模拟生成三维空间中的虚拟世界&#xff0c;使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验&#xff0c;结合图文、3D、音视频等多媒体元素…...

篇章二 论坛系统——系统设计

目录 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 1. 数据库设计 1.1 数据库名: forum db 1.2 表的设计 1.3 编写SQL 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 通过需求分析获得概念类并结合业务实现过程中的技术需要&#x…...