当前位置: 首页 > article >正文

Spark-streaming核心编程

1.导入依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>

<version>3.0.0</version>

</dependency>

2.编写代码

创建SparkConfStreamingContext

定义Kafka相关参数,如bootstrap serversgroup idkeyvaluedeserializer

使用KafkaUtils.createDirectStream方法创建DStream,该方法接受StreamingContext、位置策略、消费者策略等参数。

提取数据中的value部分,并进行word count计算。

启动StreamingContext并等待其终止。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectAPI {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //定义kafka相关参数

    val kafkaPara :Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG

      ->"node01:9092,node02:9092,node03:9092",

      ConsumerConfig.GROUP_ID_CONFIG->"kafka",

      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",

      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

    )

    //通过读取kafka数据,创建DStream

    val kafkaDStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](

      ssc,LocationStrategies.PreferConsistent,

      ConsumerStrategies.Subscribe[String,String](Set("kafka"),kafkaPara)

    )

    //提取出数据中的value部分

    val valueDStream :DStream[String] = kafkaDStream.map(record=>record.value())

    //wordCount计算逻辑

    valueDStream.flatMap(_.split(" "))

      .map((_,1))

      .reduceByKey(_+_)

      .print()

    ssc.start()

    ssc.awaitTermination()

  }

  }

3.运行程序

开启Kafka集群。

4.使用Kafka生产者产生数据。

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

​5运行Spark Streaming程序,接收Kafka生产的数据并进行处理。

6.查看消费进度

使用Kafka提供的kafka-consumer-groups.sh脚本查看消费组的消费进度。

相关文章:

Spark-streaming核心编程

1.导入依赖‌&#xff1a; <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>3.0.0</version> </dependency> 2.编写代码‌&#xff1a; 创建Sp…...

Exposure Adjusted Incidence Rate (EAIR) 暴露调整发病率:精准量化疾病风险

1. 核心概念 1.1 传统发病率的局限性 1.1.1 公式与定义 传统发病率公式为新发病例数除以总人口数乘以观察时间。例如在某社区观察1年,有10例新发病例,总人口1000人,发病率即为10/10001=0.01。 此公式假设所有个体暴露时间和风险相同,但实际中个体差异大,如部分人暴露时间…...

vue3+TS+echarts 折线图

需要实现的效果如下 <script setup lang"ts" name"RepsSingleLineChart">import * as echarts from echartsimport { getInitecharts } from /utils/echartimport type { EChartsOption } from echarts// 定义 props 类型interface Props {id: strin…...

MYSQL中为什么不建议delete数据

在 MySQL 中不建议频繁使用 delete 删除数据的原因主要在于性能、数据安全等方面的问题&#xff0c;以下是具体介绍&#xff1a; 性能问题 磁盘空间与碎片&#xff1a;delete 操作只是将数据标记为 “已删除”&#xff0c;并不会立即释放磁盘空间&#xff0c;频繁执行会导致大量…...

Linux多线程技术

什么是线程 在一个程序里的多执行路线就是线程。线程是进程中的最小执行单元&#xff0c;可理解为 “进程内的一条执行流水线”。 进程和线程的区别 进程是资源分配的基本单位&#xff0c;线程是CPU调度的基本单位。 fork创建出一个新的进程&#xff0c;会创建出一个新的拷贝&…...

12个HPC教程汇总!从入门到实战,覆盖分子模拟/材料计算/生物信息分析等多个领域

在科学研究、工程仿真、人工智能和大数据分析等领域&#xff0c;高性能计算 (High Performance Computing, HPC) 正扮演着越来越重要的角色。它通过并行处理、大规模计算资源的整合&#xff0c;极大提升了计算效率&#xff0c;使原本耗时数日的任务能够在数小时内完成。 随着计…...

[OpenGL] Lambertian材质漫反射BRDF方程的解释与推导

一、简介 本文简单的介绍了 Physical Based Rendering, PBR 中的 Lambertian 材质漫反射BRDF公式 f r l a m b e r t i a n c d i f f π fr_{lambertian}\frac{c_{diff}}{\pi} frlambertian​πcdiff​​的推导。 二、漫反射项 根据 渲染方程&#xff1a; L o ( v ) ∫ …...

小火电视桌面TV版下载-小火桌面纯净版下载-官方历史版本安装包

别再费心地寻找小火桌面的官方历史版本安装包啦&#xff0c;试试乐看家桌面吧&#xff0c;它作为纯净版本的第三方桌面&#xff0c;具有诸多优点。 界面简洁纯净&#xff1a;乐看家桌面设计简洁流畅&#xff0c;页面简洁、纯净无广告&#xff0c;为用户打造了一个干净的电视操…...

VSFTPD+虚拟用户+SSL/TLS部署安装全过程(踩坑全通)

Author : Spinach | GHB Link : http://blog.csdn.net/bocai8058文章目录 前言准备配置虚拟用户1.创建虚拟用户列表文件2.生成数据库文件3.设置虚拟用户独立访问权限 配置PAM认证1.创建PAM配置文件2.测试PAM认证 创建虚拟用户映射的系统用户生成SSL/TLS证书配置VSFTPD服务1…...

07 Python 字符串全解析

文章目录 一. 字符串的定义二. 字符串的基本用法1. 访问字符串中的字符2. 字符串切片3. 字符串拼接4. 字符串重复5.字符串比较6.字符串成员运算 三. 字符串的常用方法1. len() 函数2. upper() 和 lower() 方法3. strip() 方法4. replace() 方法5. split() 方法 四. 字符串的进阶…...

androidstudio安装配置

B站配置视频AndroidStudio安装配置教程&#xff08;最新版本教程&#xff09;3分钟搞定 快速安装使用_哔哩哔哩_bilibili 1、环境变量 D:\AndroidSdk ANDROID_HOME ANDROID_SDK_HOME 2、新建 3、配置 distributionUrlhttps://mirrors.cloud.tencent.com/gradle/gradle-8.11.1-…...

全面解析 MCP(Model Context Protocol):AI 大模型的“万能连接器”

一、MCP 的定义与技术定位 **MCP(Model Context Protocol,模型上下文协议)**是由 Anthropic 公司于 2024 年 11 月推出的开源协议,旨在为 AI 大模型与外部数据源、工具之间建立标准化连接通道。它被业界称为 “AI 的 USB-C 接口”,通过统一的通信协议和数据结构,解决大模…...

《AI大模型趣味实战》基于RAG向量数据库的知识库AI问答助手设计与实现

基于RAG向量数据库的知识库AI问答助手设计与实现 引言 随着大语言模型&#xff08;LLM&#xff09;技术的快速发展&#xff0c;构建本地知识库AI问答助手已成为许多企业级应用的需求。本研究报告将详细介绍如何基于FLASK开发一个使用本地OLLAMA大模型底座的知识库AI问答助手&…...

Lua 第8部分 补充知识

8.1 局部变量和代码块 Lua 语言中的变量在默认情况下是全局变量 &#xff0c;所有的局部变量在使用前必须声明 。 与全局变量不同&#xff0c;局部变量的生效范围仅限于声明它的代码块。一个代码块&#xff08; block &#xff09;是一个控制结构的主体&#xff0c;或是一个函…...

正则表达式三剑客之——awk命令

目录 一.什么是awk 二.awk的语法格式 1.选项 2. 模式&#xff08;Pattern&#xff09; 3. 操作&#xff08;Action&#xff09; 4. 输入文件&#xff08;file&#xff09; 5.总结 三.awk的工作原理 1. 逐行扫描输入 2. 匹配模式 1.正则表达式&#xff1a; 2.逻辑…...

BeeWorks Meet:私有化部署视频会议的高效选择

在数字化时代&#xff0c;视频会议已成为企业沟通协作的重要工具。然而&#xff0c;对于金融、政务、医疗等对数据安全和隐私保护要求极高的行业来说&#xff0c;传统的公有云视频会议解决方案往往难以满足其严格的安全标准。此时&#xff0c;BeeWorks Meet 私有化部署视频会议…...

[Mybatis-plus]

简介 MyBatis-Plus &#xff08;简称 MP&#xff09;是一个 MyBatis的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变。Mybatis-plus官网地址 注意&#xff0c;在引入了mybatis-plus之后&#xff0c;不要再额外引入mybatis和mybatis-spring&#xff0c;避免因为版本…...

IPv6 技术细节 | 源 IP 地址选择 / Anycast / 地址自动配置 / 地址聚类分配

注&#xff1a;本文为 “IPv6 技术细节” 相关文章合集。 部分文章中提到的其他文章&#xff0c;一并引入。 略作重排&#xff0c;未整理去重。 如有内容异常&#xff0c;请看原文。 闲谈 IPv6 - 典型特征的一些技术细节 iteye_21199 于 2012-11-10 20:54:00 发布 0. 巨大的…...

【高频考点精讲】ES6 String的新增方法,处理字符串更方便了

ES6 String的新增方法:处理字符串从未如此优雅 【初级】前端开发工程师面试100题(一) 【初级】前端开发工程师面试100题(二) 【初级】前端开发工程师的面试100题(速记版) 作为天天和字符串打交道的码农,谁还没被indexOf和substring折磨过?ES6给String对象新增的几个方…...

【工具】使用 MCP Inspector 调试服务的完全指南

Model Context Protocol (MCP) Inspector 是一个交互式开发工具&#xff0c;专为测试和调试 MCP 服务器而设计。本文将详细介绍如何使用 Inspector 工具有效地调试和测试 MCP 服务。 1. MCP Inspector 简介 MCP Inspector 提供了直观的界面&#xff0c;让开发者能够&#xff…...

【音视频】AVIO输入模式

内存IO模式 AVIOContext *avio_alloc_context( unsigned char *buffer, int buffer_size, int write_flag, void *opaque, int (*read_packet)(void *opaque, uint8_t *buf, int buf_size), int (*write_packet)(void *opaque, uint8_t *buf, int buf_size), int64_t (*seek)(…...

AI与思维模型【76】——SWOT思维模型

一、定义 SWOT思维模型是一种用于分析事物内部和外部因素的战略规划工具。其中&#xff0c;S代表优势&#xff08;Strengths&#xff09;&#xff0c;是指事物自身所具备的独特能力、资源或特点&#xff0c;这些因素有助于其在竞争中取得优势&#xff1b;W代表劣势&#xff08…...

大模型提示词如何编写

一、提示词的核心三要素 明确目标&#xff08;What&#xff09; 告诉 AI「你要它做什么」&#xff0c;越具体越好。 ❌ 模糊&#xff1a;写一篇文章 ✅ 清晰&#xff1a;写一篇 800 字的高考作文&#xff0c;主题 “坚持与创新”&#xff0c;结构分引言、三个论点&#xff08;…...

python如何取消word中的缩进

在python-docx中&#xff0c;取消缩进可以通过将相应的缩进属性设置为None或0来实现。以下是取消不同类型缩进的方法&#xff1a; 取消左缩进 from docx import Documentdoc Document(existing_document.docx)for paragraph in doc.paragraphs:# 取消左缩进paragraph.paragr…...

DDL小练习

1.创建一张t_user表 要求属性有id(INT),name(VARCHAR),sex(VARCHAR),birthday(DATE) 其中id和name不能为空&#xff0c;添加数据并测试。 创建数据库 create database spt2503; 创建数据库中的t_user表 create table t_user (id int not null, name varchar(20) not…...

Uniapp:scroll-view(区域滑动视图)

目录 一、基本概述二、属性说明三、基本使用3.1 纵向滚动3.2 横向滚动一、基本概述 scroll-view,可滚动视图区域。用于区域滚动。 二、属性说明 属性名类型默认值说明平台差异说明scroll-xBooleanfalse允许横向滚动scroll-yBooleanfalse允许纵向滚动三、基本使用 3.1 纵向滚…...

【前端】【面试】在前端开发中,如何实现图片的渐进式加载,以及这样做的好处是什么?

题目&#xff1a;在前端开发中&#xff0c;如何实现图片的渐进式加载&#xff0c;以及这样做的好处是什么&#xff1f; 在浏览器端实现图片的“渐进式加载”&#xff08;Progressive Image Loading&#xff09;常用的三种方式 方法思路典型实现要点适用场景优缺点简述1. 使…...

单精度浮点运算/定点运算下 MATLAB (VS) VIVADO

VIVADO中单精度浮点数IP核计算结果与MATLAB单精度浮点数计算结果的对比 MATLAB定点运算仿真&#xff0c;对比VIVADO计算的结果 目录 前言 一、VIVADO与MATLAB单精度浮点数运算结果对比 二、MATLAB定点运算仿真 总结 前言 本文介绍了怎么在MATLAB中使用单精度浮点数进行运算…...

基于大模型对先天性巨结肠全流程预测及医疗方案研究报告

目录 一、引言 1.1 研究背景与意义 1.2 研究目的与创新点 二、大模型在先天性巨结肠预测中的理论基础 2.1 大模型概述 2.2 大模型预测先天性巨结肠的可行性分析 三、术前预测与准备方案 3.1 大模型对术前病情的预测 3.1.1 疾病确诊预测 3.1.2 病情严重程度评估 3.2 …...

【AI插件开发】Notepad++ AI插件开发1.0发布和使用说明

一、产品简介 AiCoder是一款为Notepad设计的轻量级AI辅助插件&#xff0c;提供以下核心功能&#xff1a; 嵌入式提问&#xff1a;对选中的文本内容进行AI分析&#xff0c;通过侧边栏聊天界面与AI交互&#xff0c;实现多轮对话、问题解答或代码生成。对话式提问&#xff1a;独…...