当前位置: 首页 > news >正文

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.es</groupId><artifactId>es-sparkstreaming</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><!-- elasticsearch的客户端 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><!-- elasticsearch依赖2.x的log4j --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--        <dependency>--><!--            <groupId>com.fasterxml.jackson.core</groupId>--><!--            <artifactId>jackson-databind</artifactId>--><!--            <version>2.11.1</version>--><!--        </dependency>--><!--        &lt;!&ndash; junit单元测试 &ndash;&gt;--><!--        <dependency>--><!--            <groupId>junit</groupId>--><!--            <artifactId>junit</artifactId>--><!--            <version>4.12</version>--><!--        </dependency>--></dependencies>
</project>

2.功能实现

package com.atguigu.esimport org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentTypeobject SparkStreamingESTest {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")val ssc = new StreamingContext(sparkConf, Seconds(3))val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)ds.foreachRDD(rdd => {rdd.foreach(data => {val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost",9200, "http")))val ss = data.split(" ")val request = new IndexRequest()request.index("product").id(ss(0))val json =s"""| {  "data" : "${ss(1)}" }|""".stripMarginrequest.source(json, XContentType.JSON)val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)println(response.getResult)client.close()})})ssc.start()ssc.awaitTermination()}
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

相关文章:

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍 Spark Streaming 是 Spark core API 的扩展&#xff0c;支持实时数据流的处理&#xff0c;并且具有可扩展&#xff0c; 高吞吐量&#xff0c;容错的特点。 数据可以从许多来源获取&#xff0c;如 Kafka &#xff0c; Flume &#xff0c; Kin…...

Kotlin 中的 协程 基础篇

一、什么叫协程 协程可以称为轻量级线程&#xff0c;线程代码块&#xff1b; 二、GlobalScope 协程 CoroutineScope (协程作用域) 的上下文中通过 launch、async 等构造器来启动。GlobalScope ,即全局作用域内启动了一个新的协程&#xff0c;这意味这该协程的生命周期只受整…...

SQL事务

事务的概念&#xff1a; 事务是在数据库上按照一定的逻辑顺序执行的任务序列&#xff0c;既可以由用户手动执行&#xff0c;也可以由某种数据库程序自动执行。事务就是一些SQL语句组&#xff08;每条单独的SQL语句也算一个事务&#xff09;&#xff0c;其中事务中的SQL…...

关于flutter中 initState() 与 setState() 用法

initState()函数是在组件渲染之前执行的。在Flutter中&#xff0c;initState()是StatefulWidget的生命周期方法之一&#xff0c;在调用build()方法之前被调用。当创建一个StatefulWidget并将其添加到组件树中时&#xff0c;Flutter会实例化该组件的状态对象&#xff0c;并在调用…...

智能电话机器人是如何自主学习的

电话机器人主要通过语音识别和针对语意的理解识别客户所说的内容&#xff0c;针对性的回答问题&#xff0c;为企业高效筛选意向客户。除了电话机器人语音识别之外&#xff0c;电话机器人能够自主学习&#xff0c;不断完善产品知识及话术等&#xff0c;是它智能的另一种体现。那…...

【Rust】Rust学习 第十八章模式用来匹配值的结构

模式是 Rust 中特殊的语法&#xff0c;它用来匹配类型中的结构&#xff0c;无论类型是简单还是复杂。结合使用模式和 match 表达式以及其他结构可以提供更多对程序控制流的支配权。模式由如下一些内容组合而成&#xff1a; 字面值解构的数组、枚举、结构体或者元组变量通配符占…...

我的学习笔记:数据处理

数据清洗 对数据进行处理和加工&#xff0c;以使其适合分析和建模。数据清洗包括去除重复数据、填补缺失值、处理异常值和转换数据格式等操作&#xff0c;以提高数据的可靠性和准确性&#xff0c;避免数据分析时出现偏差&#xff0c;提高决策的准确性。 数据去重&#xff1a;通…...

GB28181国标平台测试软件NTV-GBC(包含服务器和模拟客户端)

GB28181国标平台测试软件NTV-GBC用于对GB28181国标平台进行测试(测试用例需要服务器软件&#xff0c;服务器软件可以是任何标准的国标平台&#xff0c;我们测试使用的是NTV-GBS&#xff09;&#xff0c;软件实现了设备注册、注销、目录查询&#xff0c;消息订阅、INVITE&#x…...

云原生:重塑企业的技术疆界

云原生技术正在重新塑造我们对软件开发、部署和运维的理解。这些技术带来了灵活性、可扩展性以及在复杂环境中保证稳定性的可能性&#xff0c;这些都是企业在云原生场景中比较关注的问题。本文将主要聚焦于云原生场景&#xff0c;探讨其影响和作用。 云原生的定义 云原生计算基…...

华为星闪,一项将 “ 更稳 WiFi ” 和 “ 更好蓝牙 ” 融合起来的通信标准

兼顾多用途和专业化的 AI 大模型、移除安卓代码的 HarmonyOS NEXT 、给折叠屏应用提供适配方向的《 折叠屏/平板应用体验评估标准 》。。。 不过除了这些比较贴近我们普通用户&#xff0c;容易讲清楚的东西&#xff0c;华为还官宣了一个大家可能没注意的黑科技&#xff1a; 星…...

IDEA创建Mybatis格式XML文件

设置位置&#xff1a;File | Settings | Editor | File and Code Templates 选择Files&#xff0c;点击号 Name中输入xml模板名&#xff08;名称自行决定&#xff09;&#xff0c;后缀名extension输入xml&#xff08;固定&#xff09; 内容处输入Mybatis的xml文件模板内容&…...

二叉树中的最大路径和-递归

路径 被定义为一条从树中任意节点出发&#xff0c;沿父节点-子节点连接&#xff0c;达到任意节点的序列。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root…...

Python if-else 速记

文章目录 在 Python 中使用三元运算符作为 if-else 速记总结 编程中经常使用速记符号来简化我们的工作。 速记符号是一种可以更简洁、更省时省力地完成工作的方法。 本文将讨论 Python 中使用的速记符号作为 if-else 语句的快捷方式。 在 Python 中使用三元运算符作为 if-else…...

Python使用内置的json模块来处理JSON数据

目录 1、解释说明&#xff1a; 2、使用示例&#xff1a; 3、注意事项&#xff1a; 1、解释说明&#xff1a; 在Python中&#xff0c;我们可以使用内置的json模块来处理JSON数据。这个模块提供了四个主要的函数&#xff1a;dumps、loads、dump、load。 - dumps&#xff1a;将…...

亿赛通电子文档安全管理系统 RCE漏洞

亿赛通电子文档安全管理系统 RCE漏洞 一、 产品简介二、 漏洞概述三、 复现环境四、 漏洞复现小龙POC检测: 五、 修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失…...

信息安全面试题合集

0x00 前言 本篇会记录一些可能会遇到的面试题&#xff0c;持续更新 0x01 Web SQL注入 sql注入常见的闭合方式有哪些&#xff1f;Mysql5.0上下sql注入有什么区别&#xff1f;SQL注入空格被过滤&#xff0c;有什么绕过方式&#xff1f;过滤了逗号&#xff0c;有什么绕过方式&…...

vue 简单实验 自定义组件 传参数 props

1.代码 <script src"https://unpkg.com/vuenext" rel"external nofollow" ></script> <div id"todo-list-app"><todo-item v-bind:todo"todo1"></todo-item> </div> <script> const ListR…...

目标检测笔记(十一):如何结合特定区域进行目标检测(基于OpenCV的人脸检测实例)

文章目录 背景代码结果 背景 由于我们在做项目的时候可能会涉及到某个指定区域进行目标检测或者人脸识别等任务&#xff0c;所以这篇博客是为了探究如何在传统目标检测的基础上来结合特定区域进行检测&#xff0c;以OpenCV自带的包为例。 一般来说有两种方式实现区域指定&…...

PID直观感受简述

0、仿真控制框图 1、增加p的作用&#xff08;增加响应&#xff09;P 2、增加I的作用&#xff08;消除稳差&#xff09;PI 3、增加D的作用&#xff08;抑制波动&#xff09;PID 加入对噪声很敏 4、综合比对...

Tomcat运行后localhost:8080访问自己编写的网页

主要是注意项目结构&#xff0c;home.html放在src/resources/templates下的home.html下&#xff0c;application.properties可以不做任何配置。还有就是关于web包的位置&#xff0c;作者一开始将web包与tabtab包平行&#xff0c;访问8080出现了此类报错&#xff1a; Whitelabel…...

VS Code终端切换全攻略:从PowerShell到CMD的保姆级教程(含常见问题解决)

VS Code终端切换全攻略&#xff1a;从PowerShell到CMD的保姆级教程&#xff08;含常见问题解决&#xff09; 在开发者的日常工作中&#xff0c;终端是不可或缺的工具。VS Code作为最受欢迎的代码编辑器之一&#xff0c;其内置终端功能强大且高度可定制。然而&#xff0c;许多开…...

zotero-style:智能文献管理在学术研究中的创新实践

zotero-style&#xff1a;智能文献管理在学术研究中的创新实践 【免费下载链接】zotero-style zotero-style - 一个 Zotero 插件&#xff0c;提供了一系列功能来增强 Zotero 的用户体验&#xff0c;如阅读进度可视化和标签管理&#xff0c;适合研究人员和学者。 项目地址: ht…...

STP安全特性实战:如何用bpduguard和bpdufilter防止网络攻击(附真实案例)

STP安全特性实战&#xff1a;如何用bpduguard和bpdufilter防止网络攻击&#xff08;附真实案例&#xff09; 在企业网络架构中&#xff0c;生成树协议&#xff08;STP&#xff09;的安全防护常常被忽视&#xff0c;直到某天凌晨2点&#xff0c;值班工程师突然接到全网瘫痪的告警…...

CREST:如何用5分钟开启分子构象探索之旅?

CREST&#xff1a;如何用5分钟开启分子构象探索之旅&#xff1f; 【免费下载链接】crest Conformer-Rotamer Ensemble Sampling Tool based on the xtb Semiempirical Extended Tight-Binding Program Package 项目地址: https://gitcode.com/gh_mirrors/crest/crest 在…...

Windows 11优化终极指南:一键清理预装软件与提升系统性能

Windows 11优化终极指南&#xff1a;一键清理预装软件与提升系统性能 【免费下载链接】Win11Debloat 一个简单的PowerShell脚本&#xff0c;用于从Windows中移除预装的无用软件&#xff0c;禁用遥测&#xff0c;从Windows搜索中移除Bing&#xff0c;以及执行各种其他更改以简化…...

Llama-3.2V-11B-cot惊艳案例:电影截图角色关系推演与剧情发展预测展示

Llama-3.2V-11B-cot惊艳案例&#xff1a;电影截图角色关系推演与剧情发展预测展示 1. 视觉推理工具简介 Llama-3.2V-11B-cot是基于Meta多模态大模型开发的高性能视觉推理工具&#xff0c;专为双卡4090环境深度优化。该工具不仅修复了视觉权重加载的关键问题&#xff0c;还支持…...

Python多线程真能并行了吗?(GIL绕过技术全图谱:subprocess/numba/multiprocessing/cython/rustpy)

第一章&#xff1a;Python无锁GIL环境下的并发模型面试题汇总Python 的全局解释器锁&#xff08;GIL&#xff09;长期被视为多线程并发的瓶颈&#xff0c;但近年来随着 CPython 3.13 引入实验性无锁 GIL&#xff08;--without-pymalloc 配合 --with-per-object-gil 原型&#x…...

Mac上PPT讲稿一键变文稿:用AppleScript自动化导出备注到TXT(附完整代码)

Mac上PPT讲稿一键变文稿&#xff1a;用AppleScript自动化导出备注到TXT&#xff08;附完整代码&#xff09; 每次做完PPT&#xff0c;看着密密麻麻的备注栏&#xff0c;你是不是也头疼怎么把这些零散的讲稿整理成连贯的文档&#xff1f;作为一位经常需要准备培训材料的讲师&…...

Qt官网抽风连不上?亲测有效的Qt6在线安装网络问题终极解决手册

Qt6在线安装网络问题终极解决手册&#xff1a;从反复失败到一次成功 看着Qt安装器上那个刺眼的"无法连接服务器"提示&#xff0c;我第27次点击了重试按钮。作为一名有十年经验的开发者&#xff0c;我从未想过会在安装环境这一步耗费整整一个下午。这不是个例——根据…...

《数据结构》| 第十章 排序算法实战指南

1. 排序算法入门&#xff1a;为什么我们需要这么多排序方法&#xff1f; 第一次接触排序算法时&#xff0c;很多人都会有这样的疑问&#xff1a;既然都能把数据排好序&#xff0c;为什么还要学这么多种算法&#xff1f;这就像装修时既有电钻又有锤子——每种工具都有最适合的使…...