当前位置: 首页 > news >正文

橘子学ES实战操作之管道类型Ingest pipelines的基本使用

简介

我们在使用ES的时候,经常的用法就是把其他数据源比如Mysql的数据灌到ES中。
借用ES的一些功能来提供数据的全文检索以及聚合分析之类的功能。
在这个灌数据的过程中,我们经常会对数据做一些治理,类似ETL的能力。然后把治理后的数据写入ES中。
我们当然可以自己监听MQ的数据在程序中编写代码来实现这个过程。
也可以使用logstash等中间件来实现这个功能。
这些都可以。但是ES中本身也是提供了一个能力来实现这个功能的。
而且基于这个能力你甚至可以扩展出其他的操作。
这个能力就是Ingest pipelines(摄取管道)。

这个系列我们就来看一下摄取管道的能力,并且我会分三个章节来完成。这个章节我们只来看一些他的基本用法,后面我们会依次讲解 enrich processor和自己开发插件实现管道。

一、什么是Ingest pipelines(摄取管道)

我们先来看官方文档的一些信息。本文使用的ES版本为7.17.7。官方文档的位置位于Ingest pipelines官方文档

# 英文:
Ingest pipelines let you perform common transformations on your data before indexing.
For example, you can use pipelines to remove fields, extract values from text, and enrich your data.A pipeline consists of a series of configurable tasks called processors. 
Each processor runs sequentially, making specific changes to incoming documents. 
After the processors have run, Elasticsearch adds the transformed documents to your data stream or index.# 中文:
采集管道可让您在索引之前对数据执行常见的转换。例如,您可以使用管道删除字段、从文本中提取值以及丰富数据。
管道由一系列可配置的任务(称为处理器)组成。每个处理器按顺序运行,对传入的文档进行特定更改。
处理器运行后,Elasticsearch 会将转换后的文档添加到您的数据流或索引中。

而且官方还有一个图示:
在这里插入图片描述
我们从描述和图示可以看到他很像我们在设计模式中说的责任链设计模式,把一个数据从源端拿到之后,在写入目标端的索引之前,经过一系列的processor处理器,最终得到目标数据,写入ES索引中。
实际上他也就是这个功能。

而且你可以使用 Kibana 的摄取管道功能或摄取 API 创建和管理摄取管道。 Elasticsearch 将管道存储在集群状态中。我们说,基本上ES中的一些操作你都可以通过kibana界面或者是DSL命令来实现。他们是等价的,只是一个有界面点点点,一个是写代码。作为一个开发,我们当然要用命令这种"高大上"的东西来实现了,界面点点点还是让那些小白去点吧,没有任何逼格。
而且命令的官方地址位于Ingest pipelines操作命令。
并且我们说这个管道你建立之后,如果你想查看,你可以去集群状态中去查看,具体的操作步骤位于ES查看集群状态

OK,至此我们已经知道了他的一些概念,以及如何去在文档中定位他。下面我们就来看看如何使用。

二、Ingest pipelines基本步骤

1、节点角色

这个后面我会单独写一篇文章来说这个事,这里先提一下,如果你想使用Ingest pipelines功能。要求你的节点角色必须是Ingest,我们这次不纠结这个,因为所有的节点启动默认就有这个角色。我们这次的重点不在节点角色上。
所以我们这个问题目前不存在。

2、操作步骤

我们先来看看他的操作流程,然后我们再探究那些深入的。

2.1、创建一个索引,写入数据

我们先创建一个索引用于操作。

# 我们创建一个索引,他有两个字段,一个是text类型的message,一个是类似数组类型的tags,
# 这里之所以说类似数组,是因为ES没有专门的数组类型,这个我们后面说。
PUT my-index-01
{"settings": {"number_of_replicas": 0 # 我是单节点,设置副本为0,}, "mappings": {"properties": {"message":{"type": "text"},"tags":{"type": "keyword"}}}
}# 然后往里写两条数据
PUT my-index-01/_doc/1
{"message": "手机","tags":  [ "xiao mi ", " hua wei" ]
}PUT my-index-01/_doc/2
{"message": "语言","tags":  [ "java ", " python" ]
}# 查到此时的数据为,我们看到数据没毛病
GET my-index-01/_search
{"_index" : "my-index-01","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"message" : "手机","tags" : ["xiao mi "," hua wei"]}
},
{"_index" : "my-index-01","_type" : "_doc","_id" : "2","_score" : 1.0,"_source" : {"message" : "语言","tags" : ["java "," python"]}
}

但是我们发现,我写进去的数据的tags字段,有的是后面有空格,有的是前面有空格,反正就是对不上。所以我们要把他处理掉。

2.2、创建摄取管道


PUT _ingest/pipeline/my-pipeline-01
{"description": "我的第一个pipeline","processors": [{"foreach": {"field": "tags","processor": {"trim": {"field": "_ingest._value"}}}}]
}

我们看到我创建了一个摄取管道,其中processors部分可以传入多个processor,如同我们前面说的责任链。这里每一个processor都是责任链中的一个处理节点。
我们加入了第一个processor叫做foreach,用来遍历我们的每一条数据的每一个tags字段,然后他又嵌套了一个processor叫做trim,就是对于每个tags的遍历的属性都做去除前后空格的操作,trim在java也常见。而_ingest._value是固定写法,表示的就是每一个tags中的元素。取出来做trim。
这就是一个摄取管道的最基本的要素。
这个DSL如果你觉得抽象,我可以用java伪代码来表示一下。其实就干了这么点事。

public static void main(String[] args) {List<MyIndex01> list = Arrays.asList(new MyIndex01("手机", Arrays.asList("xiao mi ", " hua wei")), new MyIndex01("语言", Arrays.asList("java ", " python")));for (MyIndex01 myIndex01 : list) {List<String> tags = new ArrayList<>();for (String tag : myIndex01.getTags()) {tags.add(tag.trim());}myIndex01.setTags(tags);}
}@Data
@AllArgsConstructor
@NoArgsConstructor
static class MyIndex01 {private String message;private List<String> tags;}

2.2、创建一个目标索引

我们创建目标索引用来清洗我们的数据之后存进去。其实和源索引一样。

PUT my-index-01_target
{"settings": {"number_of_replicas": 0}, "mappings": {"properties": {"message":{"type": "text"},"tags":{"type": "keyword"}}}
}

2.3、pipeline+reindex清洗数据

下面我们就执行reindex把我们源端索引的带着空格的不合理的数据,通过摄取管道,转换到目标索引中。

POST _reindex
{"source": { "index": "my-index-01"},"dest": {"index": "my-index-01_target","pipeline": "my-pipeline-01"  # 在这里指定我们的摄取管道。}
}

执行之后,我们可以来查看一下我们清洗之后写入目标索引my-index-01_target的数据。

GET my-index-01_target/_search
此时我们就看到,tags里面数据的前后的空格都被去掉了。
{"_index" : "my-index-01_target","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"message" : "手机","tags" : ["xiao mi","hua wei"]}
},
{"_index" : "my-index-01_target","_type" : "_doc","_id" : "2","_score" : 1.0,"_source" : {"message" : "语言","tags" : ["java","python"]}
}

OK,基于以上你大概知道了摄取管道如何使用,但是这只是他的一种使用方式,就是pipeline+reindex,可以从给reindex打配合,让数据从一个索引经过摄取管道清洗,然后转移到另一个索引。

2.4、直接写入

还记得我们一开始说的那个话题吗,我们在写入数据的时候,就想清洗之后再写入,而不是写完了再转移。这时候pipeline也可以使用。

POST my-index-01/_doc?pipeline=my-pipeline-01
{"message": "电脑","tags": [ "weiruan    ", "  pingguo" ]
}

我们在写入数据的时候可以直接作用,不需要reindex。完全取决于你的场景。
我们来查看一下这个数据,我们看到没毛病。

{"_index" : "my-index-01","_type" : "_doc","_id" : "q0qiEJEBqSzQT13RTnJx","_score" : 1.0,"_source" : {"message" : "电脑","tags" : ["weiruan","pingguo"]}
}

其余诸如_update_by_query,update数据都可以配合使用。具体都可以参考官网。

2.5、查看摄取管道

见操作文档摄取管道操作

2.6、修改摄取管道

修改很简单,你原来是啥样的,然后改一下,在创建一遍,他就会把你同名的摄取管道给覆盖掉,达到修改的目的。

2.7、删除摄取管道

见操作文档摄取管道操作

2.8、测试摄取管道

这个我单独说一下,他的作用就是一个测试而不真正给你执行创建,就是模拟了一下,但是没有实际执行。
位置位于Simulate pipeline API

三、内置管道

es内部为我们内置了很多管道,我们可以开箱即用,实际上上文中我们使用的foreach和trim本身就是ES内置的摄取管道。
而这些内置管道可以参考文档位置为ES内置摄取管道文档位置
在这里插入图片描述
从图中可以看到有这么多,这我还没截全了。实际上比图中的要多。那么我们这里来尝试几个比较常用的,其余的你看文档可以自己上手。

0、Foreach processor

这个我们上面说过了,就是一个遍历数组元素的作用。

1、Append processor

Append processor文档位置
我们来看官网对他的介绍:

Appends one or more values to an existing array if the field already exists and it is an array. 
Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar. 
Creates an array containing the provided valuesif the field doesn’t exist. Accepts a single value or an array of values.文档的大意就是如果字段已存在并且是数组,则将一个或多个值追加到现有数组。
将标量转换为数组,如果该字段存在并且它是标量,则向其中追加一个或多个值。
如果该字段不存在,则创建一个包含提供的值的数组。接受单个值或值数组。

怎么解释呢,官网永远不说人话,我们来解释一下。
数组:就是我们前面用的那个tags那种类型。
标量:就是非复杂类型的,比如字符串这种。
所以翻译一下这段话的意思就是:
1、如果你索引里面存在这个字段了,那么就看你这个字段是啥类型的。
如果是数组类型,那么Append processor的作用就是给你这个数组add一个值进去。
如果不是数组类型,是简单类型,那就会把你的这个简单类型转换成数组类型。然后再给你添加进去。

2、如果你索引里面不存在我操作这个字段,那直接就是给你创建成数组类型的,写进去的也是数组格式。

于是你有了这个概念之后我们就来试试,是不是这样呢。

  • 我们还用我们之前的索引my-index-01
PUT my-index-01
{"settings": {"number_of_replicas": 0}, "mappings": {"properties": {"message":{"type": "text"},"tags":{"type": "keyword"}}}
}

我们来看一下这个管道的语法如下:Append processor

{"append": {"field": "tags","value": ["production", "{{{app}}}", "{{{owner}}}"]}
}

我来解释一下,“field”: “tags”,就是你要操作的字段。你要给哪个字段添加内容,这里就写哪个字段。
“value”: [“production”, “{{{app}}}”, “{{{owner}}}”]这一句里面有两种属性,一个是"production",表示你要给每个tags里面添加一个production字符串,而"{{{app}}}“这种语法表示你要把你索引里面的app字段的内容也添加进去。如果没有这个app字段,那添加进去的就是个空字符串”"。
我们来试试。

1.1、给数组字段使用Append processor

PUT _ingest/pipeline/my-pipeline-01
{"description": "我的第一个pipeline","processors": [{"append": {"field": "tags","value": ["levi","{{{message}}}"]}}]
}

我们给tags这个数组类型的字段添加内容,添加的值是字符串"levi",同时我们还给tags字段添加了message属性的值。
此时我们写入一条数据。我们预期的就是我们这条数据加进去之后,tags里面除了本身的"wuya", “zhuomuniao” ,还有我们摄取管道添加的"levi"以及他自己的message字段的值"鸟类"。

POST my-index-01/_doc?pipeline=my-pipeline-01
{"message": "鸟类","tags": [ "wuya", "zhuomuniao" ]
}

我们来看一下这个数据。发现没毛病。
在这里插入图片描述

1.2、给标量字段使用Append processor

标量字段就是我们的message这个字符串字段。我们按照上面的理论可以知道,当我们写入的时候,他会把我们这个字符串字段转变为数组类型。并且把值写进数组。

PUT _ingest/pipeline/my-pipeline-02
{"description": "我的第二个pipeline","processors": [{"append": {"field": "message", # 操作的标量字段"value": ["levi","{{{message}}}"]}}]
}

我们这里是给message字段添加值,而且添加的内容是"levi"字符串,以及message本身这个内容,等于是把message添加两次。


POST my-index-01/_doc?pipeline=my-pipeline-02
{"message": "人类","tags": [ "man", "woman" ]
}

在这里插入图片描述

我们看到,他把我们的message转成数组了。而且确实message里面的人类被写进去两次。

1.3、Append processor操作一个不存在的字段

PUT _ingest/pipeline/my-pipeline-03
{"description": "我的第三个pipeline","processors": [{"append": {"field": "message","value": ["production","{{{123}}}"]}}]
}

我们操作了一个不存在的字段123,这时候我们看看会发生什么。
在这里插入图片描述
其实还是符合我们的预期的。

2、Date processor

Date processor文档位置
我们来看官网对他的介绍:

Parses dates from fields, and then uses the date or timestamp as the timestamp for the document. 
By default, the date processor adds the parsed date as a new field called @timestamp. 
You can specify a different field by setting the target_field configuration parameter. 
Multiple date formats are supported as part of the same date processor definition. 
They will be used sequentially to attempt parsing the date field, in the same order they were defined as part of the processor definition.意思就是:
解析字段中的日期,然后使用日期或时间戳作为文档的时间戳。
默认情况下,日期处理器将解析的日期添加为名为 @timestamp 的新字段。
您可以通过设置 target_field 配置参数来指定不同的字段。支持多种日期格式作为同一日期处理器定义的一部分。
它们将按顺序使用来尝试解析日期字段,其顺序与它们被定义为处理器定义的一部分的顺序相同。换句话说就是他会把你数据中的某个表示日期的字段,给解析成为一个你指定的日期格式的值,
然后赋值给你指定的那个字段。如果你没指定这个字段,那就他会给你自己创建一个叫做@timestamp的字段。
并且这个转换的日期格式可以是多种,实际上我们一般只需要一种,整个系统应该统一日期格式,不然未来
整出什么花活来就难受了。

3、Set processor

Set processor文档位置
我们来看官网对他的介绍:

Sets one field and associates it with the specified value. If the field already exists,its value will be replaced with the provided one.意思就是:
设置一个字段并将其与指定值关联。如果该字段已存在,则其值将替换为提供的值。
换句话说就是为某一个字段设置值,这个值可以是你指定写死的,也可以是来自索引中的某个其他字段。

3.1、创建一个索引

PUT my-index-01
{"settings": {"number_of_replicas": 0}, "mappings": {"properties": {"firstname":{"type": "keyword"},"lastname":{"type": "keyword"},"fullname":{"type": "keyword"}}}
}

3.2、创建set processor

PUT _ingest/pipeline/my-set-pipeline-01
{"description": "我的第一个set pipeline","processors": [{"set": {"field": "fullname","value": "{{{lastname}}} {{{firstname}}}  ok"}}]
}

我们为我们的fullname字段通过set processor设置了值,值的内容分别是文档中的lastname字段和firstname字段的值拼起来的。并且我还在后面固定了一个字符串ok。

3.3、通过摄取管道写入数据

POST my-index-01/_doc?pipeline=my-set-pipeline-01
{"firstname": "levi","lastname": "jack"
}

查看结果:符合我们的预期。
在这里插入图片描述

4、Script processor

这是我们最后要介绍的一个摄取管道,其余的后面有时间慢慢补上,这个比较强大,我们就来看看这个。
Script processor官方文档
我们来看官网对他的介绍:

Runs an inline or stored script on incoming documents. The script runs in the ingest context.The script processor uses the script cache to avoid recompiling the script for each incoming document. 
To improve performance, ensure the script cache is properly sized before using a script processor in production.意思就是:
对传入文档运行内联或存储的脚本。该脚本在摄取管道上下文中运行。
脚本处理器使用脚本缓存来避免为每个传入文档重新编译脚本。
为了提高性能,请在生产中使用脚本处理器之前确保脚本缓存的大小正确。换句话说,以前我们那些管道都是一些内置的,你无外乎就是配置一些参数,但是这个不一样,这个你可以编辑
一些脚本逻辑来控制这个生效逻辑。

他的可选参数如下:其实其他管道也有可选参数,大差不差,之所以前面的没写是因为不难,看文档都能看懂,这个我这里写一下。
在这里插入图片描述
我们先来使用一下,实际上这里很多东西都和脚本有关,具体可以参考文档。

4.1、创建一个索引

PUT my-index-01
{"settings": {"number_of_replicas": 0}, "mappings": {"properties": {"tags":{"type": "keyword"}}}
}

索引很简单,就是一个tags,我们里面准备存一些数组类型。

4.2、创建摄取管道

PUT _ingest/pipeline/my-script-pipeline-01
{"description": "我的第一个script pipeline","processors": [{"script": {"description": "把tags中包含huawei的元素移除","lang": "painless","source": """def tags = ctx['tags'];if(tags != null){for (int i = 0; i < tags.length; i++) {if(tags[i] == params.removetag){tags.remove(i);}}}""","params": {"removetag": "huawei"}}}]
}

你可以看到我们这个脚本实际上和java很像,他的ctx就是你每条数据过管道的时候他获取到这个数据上下文,然后取出你的tags字段遍历,发现和参数一样的就移除。

4.3、通过摄取管道写入数据

POST my-index-01/_doc?pipeline=my-script-pipeline-01
{"tags": ["xiaomi","huawei","meizu"]
}

查看数据我们看到huawei确实被移除了。
在这里插入图片描述

四、来个需求?

1、需求梳理

在这个实操中,我们会用java结合DSL的方式来实现实际开发中如何使用摄取管道的操作。

假如此时我们有个需求,客户提供了一个数据表是mysql表叫做newstable,领导要你把这个表接进来,然后写入你的ES。
这个表就两个字段,一个是name一个是tags

但是呢,他的表里面,
name字段有的前后有空格,有的还各种大小写不统一,领导要你都变成小写。
最离谱的是name字段里面的内容有html标签,这你完全不能忍,展示出来成啥了。
tags字段,你想在你的ES里面存成数组,结果mysql还是用逗号隔开的字符串。
而且tags字段数据是有一些敏感词,对于你们公司来说"tnd"这属于敏感词,你要给他过滤掉。

基于这样的"脏数据",你要在数据进入ES之前,给他治理一波。
开发之前,我们先来梳理一下这些需求的点:

1、前后有空格,问题不大,我们用trim processor来去掉。
2、大小写不统一,都要变为小写。问题也不大,我们用Lowercase processor来处理。
3、还有一个tags字段,你要存数组,但是源端是逗号隔开的字符串,你要切割开,我们使用Split processor。
4、html标签这个问题也不大,我们用HTML strip processor来处理。
5、敏感词过滤,我们用Script processor来处理移除掉。

于是我们选好了我们使用的管道之后,我们就来进行开发,我的环境很简单。

2、工程环境

1、jdk1.8
2、springboot2.3.4.RELEASE
3、maven 3.6.1
4、elasticsearch 7.17.7

以下是我的pom依赖配置。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version></parent><groupId>com.leiv</groupId><artifactId>springboot-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-demo</name><description>springboot-demo</description><packaging>jar</packaging><properties><java.version>1.8</java.version><elasticsearch.version>7.17.7</elasticsearch.version></properties><dependencies><!-- web场景启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!--引入ES客户端依赖--><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>${elasticsearch.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

我的yaml文件配置。

elasticsearch:nodes: 127.0.0.1:9200cluster-name: my-applicationcluster-port: 9300

好了,我们已经有了环境,就可以进行开发了。

3、代码编写

3.1、es客户端类配置

/*** 方法描述:es基础配置绑定,映射配置文件*/
@Data
@ConfigurationProperties(prefix = "elasticsearch")
public class EsConfigProperties {private String clusterName;private String[] nodes;private Integer clusterPort;
}@Slf4j
@Configuration
@EnableConfigurationProperties(EsConfigProperties.class)
public class ElasticsearchConfiguration {private EsConfigProperties esConfigProperties;public ElasticsearchConfiguration(EsConfigProperties esConfigProperties) {this.esConfigProperties = esConfigProperties;}/*** 方法描述:初始化es客户端 因为我们还没做安全管理,所以这里使用无密码的客户端* @return*/@Bean(destroyMethod = "close",name = "restHighLevelClient")public RestHighLevelClient initRestClient() {RestHighLevelClient restHighLevelClient = null;try {String[] nodes = esConfigProperties.getNodes();if(Objects.isNull(nodes) || nodes.length == 0){log.info("es客户端初始化失败,未发现es节点配置{}", JSON.toJSONString(nodes));}List<HttpHost> hostList = new ArrayList<>();for (String node : nodes) {String[] nodeAttr = node.split(":");HttpHost httpHost = new HttpHost(nodeAttr[0], Integer.parseInt(nodeAttr[1]));hostList.add(httpHost);}HttpHost [] httpHostArray = hostList.toArray(new HttpHost[]{});RestClientBuilder builder = RestClient.builder(httpHostArray).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {return httpAsyncClientBuilder;}});restHighLevelClient = new RestHighLevelClient(builder);log.info("ES高级客户端初始化完成......");}catch (Exception e){e.printStackTrace();log.info("ES高级客户端初始化失败{}",e.getMessage());}return restHighLevelClient;}/*** 方法描述:解决es和netty版本冲突问题*/@PostConstructvoid init() {System.setProperty("es.set.netty.runtime.available.processors", "false");}}

3.2、在kibana创建管道

实际上这种东西你也可以用java客户端来做,但是一般我们用脚本创建好保存起来就行,java代码有了问题你想改还得发版,所以我们选在在kibana端编写DSL。
我们把之前的梳理搬出来看看我们要创建哪几个管道。

1、前后有空格,问题不大,我们用trim processor来去掉。
2、大小写不统一,都要变为小写。问题也不大,我们用Lowercase processor来处理。
3、还有一个tags字段,你要存数组,但是源端是逗号隔开的字符串,你要切割开,我们使用Split processor。
4、html标签这个问题也不大,我们用HTML strip processor来处理。
5、敏感词过滤,我们用Script processor来处理移除掉。

根据这几个需求我们去挨个创建管道,注意哈,我们的管道是一个链,你完全可以在一个pipline里面配置多个管道,他会让你的每条数据,挨个去执行这些管道的。pipline翻译就是流水线,其实见名知义了。

我们编写我们的管道流水线。注意,我们的管道顺序编织是有顺序的,我们选择先去除html标签,在转小写,因为如果你先转小写可能把html标签给转成别的了。这样会影响你的处理,所以我们在编写的时候要考虑执行顺序带来的影响。

PUT _ingest/pipeline/my-etl-newstable-pipeline
{"description": "我的清洗newstable表数据的pipeline","processors": [{"html_strip": {"field": "name"}},{"trim": {"field": "name"}},{"lowercase": {"field": "name"}},{"split": {"field": "tags","separator": ",","preserve_trailing": true}},{"script": {"description": "把tags中包含tmd,tnd的元素移除","lang": "painless","source": """def tags = ctx['tags'];if(tags != null){for (int i = 0; i < tags.length; i++) {if(tags[i] == params.tndparam){tags.remove(i);}}}""","params": {"tndparam": "tnd"}}}]
}

所以我们最后的处理流程就是:
在这里插入图片描述
OK,至此,我们的管道就创建完毕了。我们接下来模拟在接口中写入数据,使用该管道。

3.3、创建索引

创建我们要写入的索引。

PUT newstable-index
{"settings": {"number_of_replicas": 0}, "mappings": {"properties": {"name":{"type": "keyword"},"tags":{"type": "keyword"}}}
}

3.4、创建实体类和接口

// 模拟mysql表的实体映射
@Data
@AllArgsConstructor
@NoArgsConstructor
public class NewsTableEntity {// mysql中的name映射private String name;// mysql中的tags映射private String tags;
}@RestController
@RequestMapping("/doc/es")
public class EsController {@AutowiredRestHighLevelClient restHighLevelClient;// 写入的ES索引名称private static final String INDEX_NAME = "newstable-index";// 指定pipeline名称,这里要和你在kibana创建的pipeline对上private static final String PIPELINE_NAME = "my-etl-newstable-pipeline";@GetMapping("/addDoc")public boolean addDoc() throws IOException {List<NewsTableEntity> tableEntities = new ArrayList<>();// 模拟mysql表数据,name字段都是有html标签的,而且前后都有空格,而且tags字段都是逗号分隔的字符串,有的还有敏感词tndtableEntities.add(new NewsTableEntity(" <H1> Hello world!</H1> ","HUAWEI,小米,苹果,tnd"));tableEntities.add(new NewsTableEntity(" <P>  你好</P> ","su7,bmw"));// 这里构造批量请求,你单个请求也一样的,实际开发一般都是批量,性能好一些BulkRequest bulkRequest = new BulkRequest();for (NewsTableEntity tableEntity : tableEntities) {IndexRequest request = new IndexRequest(INDEX_NAME).source(JSONObject.toJSONString(tableEntity), XContentType.JSON)// 指定pipeline,这里只是示例,实际开发中根据业务需求来.setPipeline(PIPELINE_NAME);bulkRequest.add(request);}BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);return bulkResponse.hasFailures();}
}

3.5、测试接口

在这里插入图片描述
查看ES中的数据,发现我们已经清洗成功了。name去除了空格和html标签,tags变为了数组,并且移除了敏感词。
在这里插入图片描述

相关文章:

橘子学ES实战操作之管道类型Ingest pipelines的基本使用

简介 我们在使用ES的时候&#xff0c;经常的用法就是把其他数据源比如Mysql的数据灌到ES中。 借用ES的一些功能来提供数据的全文检索以及聚合分析之类的功能。 在这个灌数据的过程中&#xff0c;我们经常会对数据做一些治理&#xff0c;类似ETL的能力。然后把治理后的数据写入…...

VScode:前端开发中的常用快捷键和技巧

1.菜单栏 2.内容相关&#xff1a; 格式化文档 搜索文件名 代码双开对比 右上角选择拆分...

Radmin-同一局域网只需IP就可以控制电脑

Radmin小编十多年前就在用&#xff0c;它是一款非常好用的局域网控制工具&#xff0c;可以完全替代Windows自带的远程桌面&#xff0c;它的安全性和便于操作性都比Windows的远程桌面好用。 Radmin还有一个好处&#xff0c;就是远程别人电脑时&#xff0c;对方那边毫无察觉&…...

【附答案】C/C++ 最常见50道面试题

文章目录 面试题 1&#xff1a;深入探讨变量的声明与定义的区别面试题 2&#xff1a;编写比较“零值”的if语句面试题 3&#xff1a;深入理解sizeof与strlen的差异面试题 4&#xff1a;解析C与C中static关键字的不同用途面试题 5&#xff1a;比较C语言的malloc与C的new面试题 6…...

C++音视频开发笔记目录

目录 &#x1f315;基础知识&#x1f319;详解FFmpeg&#x1f319;播放音视频时发生了什么&#xff1f; & 视频的编解码 & H264是什么&#xff1f; & MP4是什么&#xff1f; &#x1f315;流媒体环境搭建&#x1f319;windows安装FFMpeg&#x1f319;docker一键部署…...

spring项目整合log4j2日志框架(含log4j无法打印出日志的情况,含解决办法)

Spring整合Log4j2的整体流程 Lo 1&#xff09;导入log4j-core依赖 <!--导入日志框架--> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <ver…...

Linux网络:应用层协议http/https

认识URL URL是我们平时说的网址 eg&#xff1a;http常见的URL http://user:passwww.example.jp:80/dir/index.htm?uid1#ch1 注意&#xff1a; 服务器地址就是域名&#xff0c;相当于服务器ip地址 像http服务绑定80端口号&#xff0c;https服务绑定443端口。ssh服务端口绑定…...

transforemr网络理解

1.transformer网络中数据的流动过程&#xff1a; 2.transformer中残差的理解&#xff1a; 残差连接&#xff08;Residual Connection&#xff09; 的核心思想就是通过将输入与经过变化的输出相加&#xff0c;来最大限度地保留原始信息。 transforemr中注意力层网络和前馈神经…...

C++插件管理系统

插件加载目录结构 execute plug.exe plugify.dll plugify.pconfig res cpp-lang-module.pmodule example_plugin.pplugin bin cpp-lang-module.dll example_plugin.dll plugify.pconfig { "baseDir&…...

MyBatis 方法重载的陷阱及解决方案

在使用 MyBatis 进行开发时&#xff0c;尤其是使用注解模式&#xff08;如 Select、Insert 等&#xff09;时&#xff0c;开发者常常会遇到这样一个问题&#xff1a;为什么我的方法重载不能正常工作&#xff1f; 即使在 Java 中允许方法名相同但参数不同的重载&#xff0c;MyBa…...

STM32 ADC+DMA导致写FLASH失败

最近用STM32G070系列的ADCDMA采样时&#xff0c;遇到了一些小坑记录一下&#xff1b; 一、ADCDMA采样时进入死循环&#xff1b; 解决方法&#xff1a;ADC-dma死循环问题_stm32 adc dma死机-CSDN博客 将ADC的DMA中断调整为最高&#xff0c;且增大ADCHAL_ADC_Start_DMA(&ha…...

Python AttributeError: ‘dict_values’ object has no attribute ‘index’

Python AttributeError: ‘dict_values’ object has no attribute ‘index’ 在Python编程中&#xff0c;AttributeError 是一个常见的异常类型&#xff0c;通常发生在尝试访问对象没有的属性或方法时。今天&#xff0c;我们将深入探讨一个具体的 AttributeError&#xff1a;“…...

三丰云免费虚拟主机和免费云服务器评测

三丰云是一家提供免费虚拟主机和免费云服务器的知名服务提供商&#xff0c;深受用户好评。在这篇评测文章中&#xff0c;我们将对三丰云的免费虚拟主机和免费云服务器进行细致评测。 首先&#xff0c;我们来看看三丰云的免费虚拟主机服务。三丰云的免费虚拟主机提供稳定的服务器…...

iOS18更新暂停卡住?iOS18升级失败解决办法分享

最近&#xff0c;苹果发布了iOS 18&#xff0c;许多用户都迫不及待更新更新系统体验新功能。然而&#xff0c;一些用户在网上反馈在iOS 18 更新在安装过程中会卡住或暂停&#xff0c;无法正常升级成功。 如果遇到“iOS 18更新暂停或卡住”问题&#xff0c;不用担心。在本文中&a…...

单片机软件工程师确认硬件

文章目录 简介流程确认能连接usb和调试器确认芯片信息确认芯片存储是否正常确认屏幕是否能点亮确认其他硬件 方式方法 简介 硬件工程师给出板子后&#xff0c;后面就是软件工程师的事儿了。 通常来说并不会很顺利。 流程 确认能连接usb和调试器 也是在“计算机管理”中 或者…...

乐鑫无线WiFi芯片模组,家电设备智能联网新体验,启明云端乐鑫代理商

在当今这个数字化飞速发展的时代&#xff0c;智能家居和物联网(IoT)设备已经成为我们生活中不可或缺的一部分。随着技术的进步&#xff0c;我们对于设备联网的需求也在不断提升。 智能家居、智能门锁、智能医疗设备等&#xff0c;这些设备通过联网实现了数据的实时传输和远程控…...

小米嵌入式面试题目RTOS面试题目 嵌入式面试题目

第一章-非RTOS bootloader工作流程 MCU启动流程 通信协议&#xff0c;SPI IIC MCU怎么选型&#xff0c;STM32F1和F4有什么区别 外部RAM和内部RAM区别&#xff0c;怎么分配 外部总线和内部总线区别 MCU上的固件&#xff0c;数据是怎么分配的 MCU启动流程 IAP是怎么升级的…...

Iceberg与SparkSQL写操作整合

前言 spark操作iceberg之前先要配置spark catalogs&#xff0c;详情参考Iceberg与Spark整合环境配置。 有些操作需要在spark3中开启iceberg sql扩展。 Iceberg使用Apache Spark的DataSourceV2 API来实现数据源和catalog。Spark DSv2是一个不断发展的API&#xff0c;在Spark版…...

MYSQL1

一、为什么学习数据库 1、岗位技能需求 2、现在的世界,得数据者得天下 3、存储数据的方法 4、程序,网站中,大量数据如何长久保存? 5、数据库是几乎软件体系中最核心的一个存在。 二、数据库相关概念 (一)数据库DB 数据库是将大量数据保存起来&#xff0c;通过计算机加…...

一文解答Swin Transformer + 代码【详解】

文章目录 1、Swin Transformer的介绍1.1 Swin Transformer解决图像问题的挑战1.2 Swin Transformer解决图像问题的方法 2、Swin Transformer的具体过程2.1 Patch Partition 和 Linear Embedding2.2 W-MSA、SW-MSA2.3 Swin Transformer代码解析2.3.1 代码解释 2.4 W-MSA和SW-MSA…...

Vue3:<Teleport>传送门组件的使用和注意事项

你好&#xff0c;我是沐爸&#xff0c;欢迎点赞、收藏、评论和关注。 Vue3 引入了一个新的内置组件 <Teleport>&#xff0c;它允许你将子组件树渲染到 DOM 中的另一个位置&#xff0c;而不是在父组件的模板中直接渲染。这对于需要跳出当前组件的 DOM 层级结构进行渲染的…...

项目之家:又一家项目信息发布合作对接及一手接单平台

这几天“小三劝退师时薪700”的消息甚嚣尘上&#xff0c;只能说从某一侧面来看心理咨询师这个职业的前景还是可以的&#xff0c;有兴趣的朋友可以关注下。话说上一篇文章给大家介绍了U客直谈&#xff0c;今天趁热打铁再给大家分享一个地推拉新项目合作平台~项目之家&#xff1a…...

02-java实习工作一个多月-经历分享

一、描述一下最近不写博客的原因 离我发java实习的工作的第一天的博客已经过去了一个多月了&#xff0c;本来还没入职的情况是打算每天工作都要写一份博客来记录一下的&#xff08;最坏的情况也是每周至少总结一下的&#xff09;&#xff0c;其实这个第一天的博客都是在公司快…...

JVM 调优篇2 jvm的内存结构以及堆栈参数设置与查看

一 jvm的内存模型 2.1 jvm内存模型概览 二 实操案例 2.1 设置和查看栈大小 1.代码 /*** 演示栈中的异常:StackOverflowError** author shkstart* create 2020 下午 9:08** 设置栈的大小&#xff1a; -Xss (-XX:ThreadStackSize)** -XX:PrintFlagsFinal*/ public class S…...

微信可以设置自动回复吗?

在日常的微信聊天中&#xff0c;我们或许会频繁地遭遇客户提出的相同问题&#xff0c;尤其是对于从事销售工作的朋友们来说&#xff0c;客户在添加好友后的第一句话往往是“在吗”或者“你好”。当我们的好友数量众多时&#xff0c;手动逐个回复可能会耗费大量的时间。因此&…...

同样数据源走RTMP播放延迟低还是RTSP低?

背景 在比较同一个数据源&#xff0c;是RTMP播放延迟低还是RTSP延迟低之前&#xff0c;我们先看看RTMP和RTSP的区别&#xff0c;我们知道&#xff0c;RTMP&#xff08;Real-Time Messaging Protocol&#xff09;和RTSP&#xff08;Real Time Streaming Protocol&#xff09;是…...

@开发者极客们,网易2024低代码大赛来啦

极客们&#xff0c;网易云信拍了拍你 9月6日起&#xff0c;2024网易低代码大赛正式开启啦&#xff01; 低代码大赛是由网易主办的权威赛事&#xff0c;鼓励开发者们用低代码开发的方式快速搭建应用&#xff0c;并最终以作品决出优胜。 从2022年11月起&#xff0c;网易低代码大赛…...

数据分析-16-时间序列分析的常用模型

1 什么是时间序列 时间序列是一组按时间顺序排列的数据点的集合,通常以固定的时间间隔进行观测。这些数据点可以是按小时、天、月甚至年进行采样的。时间序列在许多领域中都有广泛应用,例如金融、经济学、气象学和工程等。 时间序列的分析可以帮助我们理解和预测未来的趋势和…...

SpringMVC使用:类型转换数据格式化数据验证

01-类型转换器 先在pom.xml里面导入依赖&#xff0c;一个是mvc框架的依赖&#xff0c;一个是junit依赖 然后在web.xml里面导入以下配置&#xff08;配置的详细说明和用法我在前面文章中有写到&#xff09; 创建此测试类的方法用于测试springmvc是具备自动类型转换功能的 user属…...

多语言ASO – 本地化的10个技巧

ASO优化是一个复杂的领域&#xff0c;即使你只关注讲英语的用户。如果您想面向国际受众并在全球范围内发展您的应用程序业务&#xff0c;您必须在App Store和Google Play Store上本地化应用程序的产品页面。不过&#xff0c;应用程序商店本地化的过程也有很多陷阱。 应用商店本…...