当前位置: 首页 > news >正文

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.es</groupId><artifactId>es-sparkstreaming</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><!-- elasticsearch的客户端 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><!-- elasticsearch依赖2.x的log4j --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--        <dependency>--><!--            <groupId>com.fasterxml.jackson.core</groupId>--><!--            <artifactId>jackson-databind</artifactId>--><!--            <version>2.11.1</version>--><!--        </dependency>--><!--        &lt;!&ndash; junit单元测试 &ndash;&gt;--><!--        <dependency>--><!--            <groupId>junit</groupId>--><!--            <artifactId>junit</artifactId>--><!--            <version>4.12</version>--><!--        </dependency>--></dependencies>
</project>

2.功能实现

package com.atguigu.esimport org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentTypeobject SparkStreamingESTest {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")val ssc = new StreamingContext(sparkConf, Seconds(3))val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)ds.foreachRDD(rdd => {rdd.foreach(data => {val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost",9200, "http")))val ss = data.split(" ")val request = new IndexRequest()request.index("product").id(ss(0))val json =s"""| {  "data" : "${ss(1)}" }|""".stripMarginrequest.source(json, XContentType.JSON)val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)println(response.getResult)client.close()})})ssc.start()ssc.awaitTermination()}
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

相关文章:

Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍 Spark Streaming 是 Spark core API 的扩展&#xff0c;支持实时数据流的处理&#xff0c;并且具有可扩展&#xff0c; 高吞吐量&#xff0c;容错的特点。 数据可以从许多来源获取&#xff0c;如 Kafka &#xff0c; Flume &#xff0c; Kin…...

Kotlin 中的 协程 基础篇

一、什么叫协程 协程可以称为轻量级线程&#xff0c;线程代码块&#xff1b; 二、GlobalScope 协程 CoroutineScope (协程作用域) 的上下文中通过 launch、async 等构造器来启动。GlobalScope ,即全局作用域内启动了一个新的协程&#xff0c;这意味这该协程的生命周期只受整…...

SQL事务

事务的概念&#xff1a; 事务是在数据库上按照一定的逻辑顺序执行的任务序列&#xff0c;既可以由用户手动执行&#xff0c;也可以由某种数据库程序自动执行。事务就是一些SQL语句组&#xff08;每条单独的SQL语句也算一个事务&#xff09;&#xff0c;其中事务中的SQL…...

关于flutter中 initState() 与 setState() 用法

initState()函数是在组件渲染之前执行的。在Flutter中&#xff0c;initState()是StatefulWidget的生命周期方法之一&#xff0c;在调用build()方法之前被调用。当创建一个StatefulWidget并将其添加到组件树中时&#xff0c;Flutter会实例化该组件的状态对象&#xff0c;并在调用…...

智能电话机器人是如何自主学习的

电话机器人主要通过语音识别和针对语意的理解识别客户所说的内容&#xff0c;针对性的回答问题&#xff0c;为企业高效筛选意向客户。除了电话机器人语音识别之外&#xff0c;电话机器人能够自主学习&#xff0c;不断完善产品知识及话术等&#xff0c;是它智能的另一种体现。那…...

【Rust】Rust学习 第十八章模式用来匹配值的结构

模式是 Rust 中特殊的语法&#xff0c;它用来匹配类型中的结构&#xff0c;无论类型是简单还是复杂。结合使用模式和 match 表达式以及其他结构可以提供更多对程序控制流的支配权。模式由如下一些内容组合而成&#xff1a; 字面值解构的数组、枚举、结构体或者元组变量通配符占…...

我的学习笔记:数据处理

数据清洗 对数据进行处理和加工&#xff0c;以使其适合分析和建模。数据清洗包括去除重复数据、填补缺失值、处理异常值和转换数据格式等操作&#xff0c;以提高数据的可靠性和准确性&#xff0c;避免数据分析时出现偏差&#xff0c;提高决策的准确性。 数据去重&#xff1a;通…...

GB28181国标平台测试软件NTV-GBC(包含服务器和模拟客户端)

GB28181国标平台测试软件NTV-GBC用于对GB28181国标平台进行测试(测试用例需要服务器软件&#xff0c;服务器软件可以是任何标准的国标平台&#xff0c;我们测试使用的是NTV-GBS&#xff09;&#xff0c;软件实现了设备注册、注销、目录查询&#xff0c;消息订阅、INVITE&#x…...

云原生:重塑企业的技术疆界

云原生技术正在重新塑造我们对软件开发、部署和运维的理解。这些技术带来了灵活性、可扩展性以及在复杂环境中保证稳定性的可能性&#xff0c;这些都是企业在云原生场景中比较关注的问题。本文将主要聚焦于云原生场景&#xff0c;探讨其影响和作用。 云原生的定义 云原生计算基…...

华为星闪,一项将 “ 更稳 WiFi ” 和 “ 更好蓝牙 ” 融合起来的通信标准

兼顾多用途和专业化的 AI 大模型、移除安卓代码的 HarmonyOS NEXT 、给折叠屏应用提供适配方向的《 折叠屏/平板应用体验评估标准 》。。。 不过除了这些比较贴近我们普通用户&#xff0c;容易讲清楚的东西&#xff0c;华为还官宣了一个大家可能没注意的黑科技&#xff1a; 星…...

IDEA创建Mybatis格式XML文件

设置位置&#xff1a;File | Settings | Editor | File and Code Templates 选择Files&#xff0c;点击号 Name中输入xml模板名&#xff08;名称自行决定&#xff09;&#xff0c;后缀名extension输入xml&#xff08;固定&#xff09; 内容处输入Mybatis的xml文件模板内容&…...

二叉树中的最大路径和-递归

路径 被定义为一条从树中任意节点出发&#xff0c;沿父节点-子节点连接&#xff0c;达到任意节点的序列。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root…...

Python if-else 速记

文章目录 在 Python 中使用三元运算符作为 if-else 速记总结 编程中经常使用速记符号来简化我们的工作。 速记符号是一种可以更简洁、更省时省力地完成工作的方法。 本文将讨论 Python 中使用的速记符号作为 if-else 语句的快捷方式。 在 Python 中使用三元运算符作为 if-else…...

Python使用内置的json模块来处理JSON数据

目录 1、解释说明&#xff1a; 2、使用示例&#xff1a; 3、注意事项&#xff1a; 1、解释说明&#xff1a; 在Python中&#xff0c;我们可以使用内置的json模块来处理JSON数据。这个模块提供了四个主要的函数&#xff1a;dumps、loads、dump、load。 - dumps&#xff1a;将…...

亿赛通电子文档安全管理系统 RCE漏洞

亿赛通电子文档安全管理系统 RCE漏洞 一、 产品简介二、 漏洞概述三、 复现环境四、 漏洞复现小龙POC检测: 五、 修复建议 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失…...

信息安全面试题合集

0x00 前言 本篇会记录一些可能会遇到的面试题&#xff0c;持续更新 0x01 Web SQL注入 sql注入常见的闭合方式有哪些&#xff1f;Mysql5.0上下sql注入有什么区别&#xff1f;SQL注入空格被过滤&#xff0c;有什么绕过方式&#xff1f;过滤了逗号&#xff0c;有什么绕过方式&…...

vue 简单实验 自定义组件 传参数 props

1.代码 <script src"https://unpkg.com/vuenext" rel"external nofollow" ></script> <div id"todo-list-app"><todo-item v-bind:todo"todo1"></todo-item> </div> <script> const ListR…...

目标检测笔记(十一):如何结合特定区域进行目标检测(基于OpenCV的人脸检测实例)

文章目录 背景代码结果 背景 由于我们在做项目的时候可能会涉及到某个指定区域进行目标检测或者人脸识别等任务&#xff0c;所以这篇博客是为了探究如何在传统目标检测的基础上来结合特定区域进行检测&#xff0c;以OpenCV自带的包为例。 一般来说有两种方式实现区域指定&…...

PID直观感受简述

0、仿真控制框图 1、增加p的作用&#xff08;增加响应&#xff09;P 2、增加I的作用&#xff08;消除稳差&#xff09;PI 3、增加D的作用&#xff08;抑制波动&#xff09;PID 加入对噪声很敏 4、综合比对...

Tomcat运行后localhost:8080访问自己编写的网页

主要是注意项目结构&#xff0c;home.html放在src/resources/templates下的home.html下&#xff0c;application.properties可以不做任何配置。还有就是关于web包的位置&#xff0c;作者一开始将web包与tabtab包平行&#xff0c;访问8080出现了此类报错&#xff1a; Whitelabel…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

可靠性+灵活性:电力载波技术在楼宇自控中的核心价值

可靠性灵活性&#xff1a;电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中&#xff0c;电力载波技术&#xff08;PLC&#xff09;凭借其独特的优势&#xff0c;正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据&#xff0c;无需额外布…...

Go 语言接口详解

Go 语言接口详解 核心概念 接口定义 在 Go 语言中&#xff0c;接口是一种抽象类型&#xff0c;它定义了一组方法的集合&#xff1a; // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的&#xff1a; // 矩形结构体…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

排序算法总结(C++)

目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指&#xff1a;同样大小的样本 **&#xff08;同样大小的数据&#xff09;**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;已成为技术领域的焦点。从智能写作到代码生成&#xff0c;LLM 的应用场景不断扩展&#xff0c;深刻改变了我们的工作和生活方式。然而&#xff0c;理解这些模型的内部…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...

算法打卡第18天

从中序与后序遍历序列构造二叉树 (力扣106题) 给定两个整数数组 inorder 和 postorder &#xff0c;其中 inorder 是二叉树的中序遍历&#xff0c; postorder 是同一棵树的后序遍历&#xff0c;请你构造并返回这颗 二叉树 。 示例 1: 输入&#xff1a;inorder [9,3,15,20,7…...

解析两阶段提交与三阶段提交的核心差异及MySQL实现方案

引言 在分布式系统的事务处理中&#xff0c;如何保障跨节点数据操作的一致性始终是核心挑战。经典的两阶段提交协议&#xff08;2PC&#xff09;通过准备阶段与提交阶段的协调机制&#xff0c;以同步决策模式确保事务原子性。其改进版本三阶段提交协议&#xff08;3PC&#xf…...