当前位置: 首页 > news >正文

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.es</groupId><artifactId>es-sparkstreaming</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><!-- elasticsearch的客户端 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><!-- elasticsearch依赖2.x的log4j --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--        <dependency>--><!--            <groupId>com.fasterxml.jackson.core</groupId>--><!--            <artifactId>jackson-databind</artifactId>--><!--            <version>2.11.1</version>--><!--        </dependency>--><!--        &lt;!&ndash; junit单元测试 &ndash;&gt;--><!--        <dependency>--><!--            <groupId>junit</groupId>--><!--            <artifactId>junit</artifactId>--><!--            <version>4.12</version>--><!--        </dependency>--></dependencies>
</project>

2.功能实现

package com.atguigu.esimport org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentTypeobject SparkStreamingESTest {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")val ssc = new StreamingContext(sparkConf, Seconds(3))val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)ds.foreachRDD(rdd => {rdd.foreach(data => {val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost",9200, "http")))val ss = data.split(" ")val request = new IndexRequest()request.index("product").id(ss(0))val json =s"""| {  "data" : "${ss(1)}" }|""".stripMarginrequest.source(json, XContentType.JSON)val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)println(response.getResult)client.close()})})ssc.start()ssc.awaitTermination()}
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

相关文章:

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍 Spark Streaming 是 Spark core API 的扩展&#xff0c;支持实时数据流的处理&#xff0c;并且具有可扩展&#xff0c; 高吞吐量&#xff0c;容错的特点。 数据可以从许多来源获取&#xff0c;如 Kafka &#xff0c; Flume &#xff0c; Kin…...

Kotlin 中的 协程 基础篇

一、什么叫协程 协程可以称为轻量级线程&#xff0c;线程代码块&#xff1b; 二、GlobalScope 协程 CoroutineScope (协程作用域) 的上下文中通过 launch、async 等构造器来启动。GlobalScope ,即全局作用域内启动了一个新的协程&#xff0c;这意味这该协程的生命周期只受整…...

SQL事务

事务的概念&#xff1a; 事务是在数据库上按照一定的逻辑顺序执行的任务序列&#xff0c;既可以由用户手动执行&#xff0c;也可以由某种数据库程序自动执行。事务就是一些SQL语句组&#xff08;每条单独的SQL语句也算一个事务&#xff09;&#xff0c;其中事务中的SQL…...

关于flutter中 initState() 与 setState() 用法

initState()函数是在组件渲染之前执行的。在Flutter中&#xff0c;initState()是StatefulWidget的生命周期方法之一&#xff0c;在调用build()方法之前被调用。当创建一个StatefulWidget并将其添加到组件树中时&#xff0c;Flutter会实例化该组件的状态对象&#xff0c;并在调用…...

智能电话机器人是如何自主学习的

电话机器人主要通过语音识别和针对语意的理解识别客户所说的内容&#xff0c;针对性的回答问题&#xff0c;为企业高效筛选意向客户。除了电话机器人语音识别之外&#xff0c;电话机器人能够自主学习&#xff0c;不断完善产品知识及话术等&#xff0c;是它智能的另一种体现。那…...

【Rust】Rust学习 第十八章模式用来匹配值的结构

模式是 Rust 中特殊的语法&#xff0c;它用来匹配类型中的结构&#xff0c;无论类型是简单还是复杂。结合使用模式和 match 表达式以及其他结构可以提供更多对程序控制流的支配权。模式由如下一些内容组合而成&#xff1a; 字面值解构的数组、枚举、结构体或者元组变量通配符占…...

我的学习笔记:数据处理

数据清洗 对数据进行处理和加工&#xff0c;以使其适合分析和建模。数据清洗包括去除重复数据、填补缺失值、处理异常值和转换数据格式等操作&#xff0c;以提高数据的可靠性和准确性&#xff0c;避免数据分析时出现偏差&#xff0c;提高决策的准确性。 数据去重&#xff1a;通…...

GB28181国标平台测试软件NTV-GBC(包含服务器和模拟客户端)

GB28181国标平台测试软件NTV-GBC用于对GB28181国标平台进行测试(测试用例需要服务器软件&#xff0c;服务器软件可以是任何标准的国标平台&#xff0c;我们测试使用的是NTV-GBS&#xff09;&#xff0c;软件实现了设备注册、注销、目录查询&#xff0c;消息订阅、INVITE&#x…...

云原生:重塑企业的技术疆界

云原生技术正在重新塑造我们对软件开发、部署和运维的理解。这些技术带来了灵活性、可扩展性以及在复杂环境中保证稳定性的可能性&#xff0c;这些都是企业在云原生场景中比较关注的问题。本文将主要聚焦于云原生场景&#xff0c;探讨其影响和作用。 云原生的定义 云原生计算基…...

华为星闪,一项将 “ 更稳 WiFi ” 和 “ 更好蓝牙 ” 融合起来的通信标准

兼顾多用途和专业化的 AI 大模型、移除安卓代码的 HarmonyOS NEXT 、给折叠屏应用提供适配方向的《 折叠屏/平板应用体验评估标准 》。。。 不过除了这些比较贴近我们普通用户&#xff0c;容易讲清楚的东西&#xff0c;华为还官宣了一个大家可能没注意的黑科技&#xff1a; 星…...

IDEA创建Mybatis格式XML文件

设置位置&#xff1a;File | Settings | Editor | File and Code Templates 选择Files&#xff0c;点击号 Name中输入xml模板名&#xff08;名称自行决定&#xff09;&#xff0c;后缀名extension输入xml&#xff08;固定&#xff09; 内容处输入Mybatis的xml文件模板内容&…...

二叉树中的最大路径和-递归

路径 被定义为一条从树中任意节点出发&#xff0c;沿父节点-子节点连接&#xff0c;达到任意节点的序列。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root…...

Python if-else 速记

文章目录 在 Python 中使用三元运算符作为 if-else 速记总结 编程中经常使用速记符号来简化我们的工作。 速记符号是一种可以更简洁、更省时省力地完成工作的方法。 本文将讨论 Python 中使用的速记符号作为 if-else 语句的快捷方式。 在 Python 中使用三元运算符作为 if-else…...

Python使用内置的json模块来处理JSON数据

目录 1、解释说明&#xff1a; 2、使用示例&#xff1a; 3、注意事项&#xff1a; 1、解释说明&#xff1a; 在Python中&#xff0c;我们可以使用内置的json模块来处理JSON数据。这个模块提供了四个主要的函数&#xff1a;dumps、loads、dump、load。 - dumps&#xff1a;将…...

亿赛通电子文档安全管理系统 RCE漏洞

亿赛通电子文档安全管理系统 RCE漏洞 一、 产品简介二、 漏洞概述三、 复现环境四、 漏洞复现小龙POC检测: 五、 修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失…...

信息安全面试题合集

0x00 前言 本篇会记录一些可能会遇到的面试题&#xff0c;持续更新 0x01 Web SQL注入 sql注入常见的闭合方式有哪些&#xff1f;Mysql5.0上下sql注入有什么区别&#xff1f;SQL注入空格被过滤&#xff0c;有什么绕过方式&#xff1f;过滤了逗号&#xff0c;有什么绕过方式&…...

vue 简单实验 自定义组件 传参数 props

1.代码 <script src"https://unpkg.com/vuenext" rel"external nofollow" ></script> <div id"todo-list-app"><todo-item v-bind:todo"todo1"></todo-item> </div> <script> const ListR…...

目标检测笔记(十一):如何结合特定区域进行目标检测(基于OpenCV的人脸检测实例)

文章目录 背景代码结果 背景 由于我们在做项目的时候可能会涉及到某个指定区域进行目标检测或者人脸识别等任务&#xff0c;所以这篇博客是为了探究如何在传统目标检测的基础上来结合特定区域进行检测&#xff0c;以OpenCV自带的包为例。 一般来说有两种方式实现区域指定&…...

PID直观感受简述

0、仿真控制框图 1、增加p的作用&#xff08;增加响应&#xff09;P 2、增加I的作用&#xff08;消除稳差&#xff09;PI 3、增加D的作用&#xff08;抑制波动&#xff09;PID 加入对噪声很敏 4、综合比对...

Tomcat运行后localhost:8080访问自己编写的网页

主要是注意项目结构&#xff0c;home.html放在src/resources/templates下的home.html下&#xff0c;application.properties可以不做任何配置。还有就是关于web包的位置&#xff0c;作者一开始将web包与tabtab包平行&#xff0c;访问8080出现了此类报错&#xff1a; Whitelabel…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包&#xff1a; for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

聊一聊接口测试的意义有哪些?

目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开&#xff0c;首…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?

uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件&#xff0c;用于在原生应用中加载 HTML 页面&#xff1a; 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit&#xff08;传感器服务&#xff09;# 前言 在运动类应用中&#xff0c;运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据&#xff0c;如配速、距离、卡路里消耗等&#xff0c;用户可以更清晰…...