当前位置: 首页 > news >正文

消息队列(12) - 定义服务器类

目录

  • 前言
  • 设计思想

前言

之前,我们写了通信协议的具体设计,接下来我们设计服务器类

设计思想

我们先只考虑一个虚拟主机的情况下, 在一个虚拟主机的情况下,我们需要有一个session会话来帮助我们存储信息,并且既然是网络通信,那么socket关键字肯定也必不可少,我们在引入一个线程池,用来处理多个客户端的请求

  private ServerSocket serverSocket = null;// 当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtuaHost virtualHost = new VirtuaHost("default");// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)// 此处的 key 是 channelId, value 为对应的 Socket 对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一个线程池, 来处理多个客户端的请求.private ExecutorService executorService = null;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable = true;

代码实现

package com.example.demo.mqServer;import com.example.demo.Common.*;
import com.example.demo.mqServer.core.BasicProperties;import javax.websocket.Session;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/* 消息队列的本体服务器, 是TCP服务器 */
public class BrokerServer {private ServerSocket serverSocket = null;// 当前考虑一个 BrokerServer 上只有一个 虚拟主机private VirtuaHost virtualHost = new VirtuaHost("default");// 使用这个 哈希表 表示当前的所有会话(也就是说有哪些客户端正在和咱们的服务器进行通信)// 此处的 key 是 channelId, value 为对应的 Socket 对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>();// 引入一个线程池, 来处理多个客户端的请求.private ExecutorService executorService = null;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable = true;public  BrokerServer (int port) throws IOException {serverSocket = new ServerSocket(port);}// 开始服务器public void start() throws IOException {System.out.println("[BrokerServer] 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();// 把处理连接的逻辑丢给这个线程池.executorService.submit(() -> {processConnection(clientSocket);});}} catch (SocketException e) {System.out.println("[BrokerServer] 服务器停止运行!");// e.printStackTrace();}}// 停止服务器public void stop() throws IOException {runnable=false;executorService.shutdownNow();serverSocket.close();}// 处理一个客户端的链接// 在这一个链接中, 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()){try (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){while (true){// 1 读取请求并解析Request request = readRequest(dataInputStream);// 2 根据请求计算响应Response response = process(request, clientSocket);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}} catch (EOFException | SocketException e) {// 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常.// 需要借助这个异常来结束循环System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString()+ ":" + clientSocket.getPort());} catch (ClassNotFoundException | MqException e) {e.printStackTrace();}} catch (IOException e) {System.out.println("[BrokerServer] connection 出现异常!");e.printStackTrace();} finally {try {// 当连接处理完了, 就需要记得关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错!");}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 这个刷新缓冲区也是重要的操作!!dataOutputStream.flush();}private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析.BasicArguments basicArguments = (BasicArguments) BinaryTool.toObject(request.getPayload());System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId()+ ", type=" + request.getType() + ", length=" + request.getLength());// 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥.boolean ok = true;if (request.getType() == 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId());} else if (request.getType() == 0x3) {// 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了.ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete((arguments.getQueueName()));} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {// 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSockte = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!");}SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBody(body);subScribeReturns.setBasicProperties(basicProperties);byte[] payload = BinaryTool.toBytes(subScribeReturns);// 2. 构造响应数据Response response = new Response();// oxc 服务器给消费者客户端托送的消息数据response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 把数据写回到客户端 , 写入到响应之中DataOutputStream dataOutputStream = new DataOutputStream(clientSockte.getOutputStream());writeResponse(dataOutputStream,response);}});} else if (request.getType() == 0xb) {// 调用 basicAck 确认消息.BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {// 当前的 type 是非法的.throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType());}// 3. 构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId()+ ", type=" + response.getType() + ", length=" + response.getLength());return response;}private void clearClosedSession(Socket clientSocket) {// 这里主要做的事情就是, 将遍历哈希表, 将不用的session 清楚掉// 不能在集合类中变查询, 边删除List<String> toDeleteChannelId = new ArrayList<>();for (Map.Entry<String ,Socket> entry:sessions.entrySet()) {toDeleteChannelId.add(entry.getKey());}for (String s:toDeleteChannelId) {sessions.remove(s);}}}

相关文章:

消息队列(12) - 定义服务器类

目录 前言设计思想 前言 之前,我们写了通信协议的具体设计,接下来我们设计服务器类 设计思想 我们先只考虑一个虚拟主机的情况下, 在一个虚拟主机的情况下,我们需要有一个session会话来帮助我们存储信息,并且既然是网络通信,那么socket关键字肯定也必不可少,我们在引入一个线…...

做正确的事 VS 正确的做事,哪个更重要?

管理大师彼得德鲁克曾在《有效的主管》一书中简明扼要地指出&#xff1a;“效率是以正确的方式做事&#xff0c;而效能则是做正确的事。效率和效能不应偏废&#xff0c;但这并不意味着效率和效能具有同样的重要性。我们当然希望同时提高效率和效能&#xff0c;但在效率与效能无…...

每日一题——寻找旋转排序数组中的最小值(I)

寻找旋转排序数组中的最小值——I 题目链接 思路 首先我们以数组[1,2,3,4,5,6,7]举个例子&#xff0c;经过旋转后它无非就这两种情况&#xff1a; 情况一&#xff1a;旋转过后数组变成两段有序数列&#xff1a; 情况二&#xff1a;旋转过后数组不变&#xff0c;仍然有序&…...

C语言每日一题:16:数对。

思路一&#xff1a;基本思路 1.x,y均不大于n&#xff0c;就是小于等于n。 2.x%y大于等于k。 3.一般的思路使用双for循环去遍历每一对数。 代码实现&#xff1a; #include <stdio.h> int main() {int n 0;int k 0;//输入scanf("%d%d", &n, &k);int x…...

中科亿海微浮点数转换定点数

引言 浮点数转换定点数是一种常见的数值转换技术&#xff0c;用于将浮点数表示转换为定点数表示。浮点数表示采用指数和尾数的形式&#xff0c;可以表示较大范围的数值&#xff0c;但存在精度有限的问题。而定点数表示则采用固定小数点位置的形式&#xff0c;具有固定的精度和范…...

JavaScript激活严格模式

在JavaScript中&#xff0c;严格模式是一种特殊的模式&#xff0c;通过’use strict’;去激活严格模式&#xff01;在 JavaScript 中&#xff0c;“use strict” 是一种指令&#xff0c;表示在代码运行时启用严格模式&#xff0c;从而禁止使用一些不安全或者不规范的语法&#…...

Linux cond_resched()简介

文章目录 简介一、cond_resched1.1 _cond_resched1.2 should_resched1.2.1 __preempt_count&#xff1a;1.2.2 函数说明 1.3 preempt_schedule_common1.3.1 preempt_schedule_common1.3.2 preempt_latency_start/stop 1.3.3 preempt_disable_notrace 参考资料 简介 Linux 内核…...

初出茅庐的小李博客之认识编码器

编码器是什么&#xff1a; 一种将角位移或者角速度转换成一连串电数字脉冲的旋转式传感器&#xff0c;我们可以通过编码器测量到底位移或者速度信息。编码器通常由一个旋转部分和一个固定部分组成&#xff0c;旋转部分随着被测量的物体进行旋转&#xff0c;固定部分则保持不动…...

NVIDIA TX2 NX编译及更新设备树

在NVIDIA官网下载相关文件 官网网址:https://developer.nvidia.com/embedded/jetson-linux-archive 我选择的版本为R32.7.4 需要下载3个文件,BSP、根文件系统、BSP源码: 解压 将Tegra_Linux_Sample-Root-Filesystem_R32.7.4_aarch64文件夹下的内容提取到Jetson_Linux_R32.…...

从零开始学Python(二)运算符、if、循环结构

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于Python的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.运算符 1.基本运算符 2.比较运算符 …...

Sentinel整合Spring Cloud Gateway、Zuul详解

Sentinel 支持对 Spring Cloud Gateway、Zuul 等主流的 API Gateway 进行限流。 Sentinel 1.6.0 引入了 Sentinel API Gateway Adapter Common 模块&#xff0c;此模块中包含网关限流的规则和自定义 API 的实体和管理逻辑&#xff1a; GatewayFlowRule&#xff1a;网关限流规则…...

wsl2安装mysql环境

安装完mysql后通过如下命令启动mysql service mysql start 会显示如下错误&#xff1a; mysql: unrecognized service 实际上上面显示的错误是由于mysql没有启动成功造成的 我们要想办法成功启动mysql才可以 1.通过如下操作就可以跳过密码直接进入mysql环境 2.如果想找到my…...

C#质检工具(StyleCop、SonarLint)

1、StyleCop StyleCop工具主要类似java中的checkStyle,是检查代码样式规范的工具。 1.1、StyleCop安装流程: 图1.1 图1.2 图1.3 安装StyleCop插件时可能会遇到下载特慢或卡住不动的情况,需注意: 1)网上说的关闭IPV6功能不管用 2)网上说的自动指定dns不管用 3)网上…...

PyTorch翻译官网教程-NLP FROM SCRATCH: GENERATING NAMES WITH A CHARACTER-LEVEL RNN

官网链接 NLP From Scratch: Generating Names with a Character-Level RNN — PyTorch Tutorials 2.0.1cu117 documentation 使用字符级RNN生成名字 这是我们关于“NLP From Scratch”的三篇教程中的第二篇。在第一个教程中</intermediate/char_rnn_classification_tutor…...

【C语言】结构体详解

现实生活中一个事物&#xff0c;会有许多属性连接起来。而C语言引入一种构造数据类型——结构体 将属于一个事物的多个数据组织起来以体现其内部联系。 一、结构体类型的定义 结构体类型 是一种 构造类型&#xff0c;它是由若干成员组成的&#xff0c;每个成员可以是一个基本…...

leetcode242. 有效的字母异位词

题目&#xff1a;leetcode242. 有效的字母异位词 描述&#xff1a; 给定两个字符串 s 和 t &#xff0c;编写一个函数来判断 t 是否是 s 的字母异位词。 注意&#xff1a;若 s 和 t 中每个字符出现的次数都相同&#xff0c;则称 s 和 t 互为字母异位词。 示例 1: 输入: s “…...

Unity 编辑器资源导入处理函数 OnPostprocessAudio :深入解析与实用案例

Unity 编辑器资源导入处理函数 OnPostprocessAudio 用法 点击封面跳转下载页面 简介 在Unity中&#xff0c;我们可以使用编辑器资源导入处理函数&#xff08;OnPostprocessAudio&#xff09;来自定义处理音频资源的导入过程。这个函数是继承自AssetPostprocessor类的&#xff…...

uniapp开发(由浅到深)

文章目录 1. 项目构建1.1 脚手架构建1.2 HBuilderX创建 uni-app项目步骤&#xff1a; 2 . 包依赖2.1 uView2.2 使用uni原生ui插件2.3 uni-modules2.4 vuex使用 3.跨平台兼容3.1 条件编译 4.API 使用4.1 正逆参数传递 5. 接口封装6. 多端打包3.1 微信小程序3.2 打包App3.2.1 自有…...

QT-基于Buildroot构建系统镜像下实现QT开发

QT-基于Buildroot构建系统镜像下实现QT开发 BuildRootUboot的仓库地址和commit idKernel 的仓库地址和commit id BuildRoot已编译库在Windows上的Create上创建项目编译QT项目 BuildRoot 这部分按照100ask官网的教程走即可: Uboot的仓库地址和commit id https://e.coding.net/…...

优雅地处理RabbitMQ中的消息丢失

目录 一、异常处理 二、消息重试机制 三、错误日志记录 四、死信队列 五、监控与告警 优雅地处理RabbitMQ中的消息丢失对于构建可靠的消息系统至关重要。下面将介绍一些优雅处理消息丢失的方案&#xff0c;包括异常处理、重试机制、错误日志记录、死信队列和监控告警等。…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站&#xff1a;https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本&#xff1a; Windows版&#xff08;推荐下载标准版&#xff09; Windows系统安装步骤 运行安装程序&#xff1a; 双击下载的.exe安装文件 如果出现安全提示&…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

【Go语言基础【13】】函数、闭包、方法

文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数&#xff08;函数作为参数、返回值&#xff09; 三、匿名函数与闭包1. 匿名函数&#xff08;Lambda函…...

三分算法与DeepSeek辅助证明是单峰函数

前置 单峰函数有唯一的最大值&#xff0c;最大值左侧的数值严格单调递增&#xff0c;最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值&#xff0c;最小值左侧的数值严格单调递减&#xff0c;最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...

AI语音助手的Python实现

引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...