当前位置: 首页 > news >正文

使用rabbitmq进行支付之后的消息通知

订单服务完成支付后将支付结果发给每一个与订单服务对接的微服务,订单服务将消息发给交换机,由交换机广播消息,每个订阅消息的微服务都可以接收到支付结果.

微服务收到支付结果根据订单的类型去更新自己的业务数据。

相关技术方案

使用消息队列进行异步通知需要保证消息的可靠性,即生产端将消息成功通知到消费端。

消息从生产端发送到消费端经历了如下过程:

1、消息发送到交换机

2、消息由交换机发送到队列

3、消息者收到消息进行处理

保证消息的可靠性需要保证以上过程的可靠性,本项目使用RabbitMQ可以通过如下方面保证消息的可靠性。

1、生产者确认机制

发送消息前使用数据库事务将消息保证到数据库表中

成功发送到交换机将消息从数据库中删除

2、mq持久化

mq收到消息进行持久化,当mq重启即使消息没有消费完也不会丢失。

需要配置交换机持久化、队列持久化、发送消息时设置持久化。

3、消费者确认机制

消费者消费成功自动发送ack,否则重试消费。

订单服务通过消息队列将支付结果发给学习中心服务,消息队列采用发布订阅模式。

1、订单服务创建支付结果通知交换机。

2、学习中心服务绑定队列到交换机。

首先需要在学习中心服务和订单服务工程配置连接消息队列。

1、首先在订单服务添加消息队列依赖

XML

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、在nacos配置rabbitmq-dev.yaml为通用配置文件

YAML
spring:
  rabbitmq:
    host: 192.168.101.65
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: false #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback
    template:
      mandatory: false #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    listener:
      simple:
        acknowledge-mode: none #出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq
        retry:
          enabled: true #开启消费者失败重试
          initial-interval: 1000ms #初识的失败等待时长为1秒
          multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 #最大重试次数
          stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false

3、在订单服务接口工程引入rabbitmq-dev.yaml配置文件

YAML
shared-configs:
  - data-id: rabbitmq-${spring.profiles.active}.yaml
    group: xuecheng-plus-common
    refresh: true

4、在订单服务service工程编写MQ配置类,配置交换机

Java
package com.xuecheng.orders.config;

import com.alibaba.fastjson.JSON;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MqMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Mr.M
 * @version 1.0
 * @description TODO
 * @date 2023/2/23 16:59
 */
@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {

    //交换机
    public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
    //支付结果通知消息类型
    public static final String MESSAGE_TYPE = "payresult_notify";
    //支付通知队列
    public static final String PAYNOTIFY_QUEUE = "paynotify_queue";

    //声明交换机,且持久化
    @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
    public FanoutExchange paynotify_exchange_fanout() {
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
    }
    //支付通知队列,且持久化
    @Bean(PAYNOTIFY_QUEUE)
    public Queue course_publish_queue() {
        return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
    }

    //交换机和支付通知队列绑定
    @Bean
    public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //消息处理service
        MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
            //将消息再添加到消息表
            mqMessageService.addMessage(mqMessage.getMessageType(),mqMessage.getBusinessKey1(),mqMessage.getBusinessKey2(),mqMessage.getBusinessKey3());

        });
    }
}

重启订单服务,登录rabbitmq,查看交换机自动创建成功

查看队列自动成功

4.3.2 发送支付结果

在OrderService中定义接口

Java
/**
 * 发送通知结果
 * @param message
 */
public void notifyPayResult(MqMessage message);

编写接口实现方法:

Java
@Override
public void notifyPayResult(MqMessage message) {

    //1、消息体,转json
    String msg = JSON.toJSONString(message);
    //设置消息持久化
    Message msgObj = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(message.getId().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){
                    // 3.1.ack,消息成功
                    log.debug("通知支付结果消息发送成功, ID:{}", correlationData.getId());
                    //删除消息表中的记录
                    mqMessageService.completed(message.getId());
                }else{
                    // 3.2.nack,消息失败
                    log.error("通知支付结果消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息
    rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj,correlationData);

}

订单服务收到第三方平台的支付结果时,在saveAliPayStatus方法中添加代码,向数据库消息表添加消息并进行发送消息,如下所示:

Java
@Transactional
@Override
public void saveAliPayStatus(PayStatusDto payStatusDto) {
        .......
        //保存消息记录,参数1:支付结果通知类型,2: 业务id,3:业务类型
        MqMessage mqMessage = mqMessageService.addMessage("payresult_notify", orders.getOutBusinessId(), orders.getOrderType(), null);
        //通知消息
        notifyPayResult(mqMessage);
    }
}

配置交换机和队列

在order-service工程配置

消息发送方法

Java
/**
 * 发送通知结果
 * @param message
 */
public void notifyPayResult(MqMessage message);
 

4.4 接收支付结果

4.4.1 学习中心服务集成MQ

1、在学习中心服务添加消息队列依赖

XML
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、在学习中心服务接口工程引入rabbitmq-dev.yaml配置文件

YAML
shared-configs:
  - data-id: rabbitmq-${spring.profiles.active}.yaml
    group: xuecheng-plus-common
    refresh: true

3、添加配置类

Java
package com.xuecheng.learning.config;

import com.alibaba.fastjson.JSON;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MqMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Mr.M
 * @version 1.0
 * @description TODO
 * @date 2023/2/23 16:59
 */
@Slf4j
@Configuration
public class PayNotifyConfig {

    //交换机
    public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
    //支付结果通知消息类型
    public static final String MESSAGE_TYPE = "payresult_notify";
    //支付通知队列
    public static final String PAYNOTIFY_QUEUE = "paynotify_queue";

    //声明交换机,且持久化
    @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
    public FanoutExchange paynotify_exchange_fanout() {
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
    }
    //支付通知队列,且持久化
    @Bean(PAYNOTIFY_QUEUE)
    public Queue course_publish_queue() {
        return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
    }

    //交换机和支付通知队列绑定
    @Bean
    public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

}

4.4.2 接收支付结果

监听MQ,接收支付结果,定义ReceivePayNotifyService类如下:

Java
package com.xuecheng.learning.service.impl;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.xuecheng.base.exception.XueChengPlusException;
import com.xuecheng.learning.config.PayNotifyConfig;
import com.xuecheng.learning.service.MyCourseTablesService;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MqMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @author Mr.M
 * @version 1.0
 * @description 接收支付结果
 * @date 2023/2/23 19:04
 */
@Slf4j
@Service
public class ReceivePayNotifyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    MqMessageService mqMessageService;

    @Autowired
    MyCourseTablesService myCourseTablesService;


    //监听消息队列接收支付结果通知
    @RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE)
    public void receive(Message message, Channel channel) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        //获取消息
        MqMessage mqMessage = JSON.parseObject(message.getBody(), MqMessage.class);
        log.debug("学习中心服务接收支付结果:{}", mqMessage);

        //消息类型
        String messageType = mqMessage.getMessageType();
        //订单类型,60201表示购买课程
        String businessKey2 = mqMessage.getBusinessKey2();
        //这里只处理支付结果通知
        if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType) && "60201".equals(businessKey2)) {
            //选课记录id
            String choosecourseId = mqMessage.getBusinessKey1();
            //添加选课
            boolean b = myCourseTablesService.saveChooseCourseStauts(choosecourseId);
            if(!b){
                //添加选课失败,抛出异常,消息重回队列
                XueChengPlusException.cast("收到支付结果,添加选课失败");
            }
        }


    }


}

相关文章:

使用rabbitmq进行支付之后的消息通知

订单服务完成支付后将支付结果发给每一个与订单服务对接的微服务&#xff0c;订单服务将消息发给交换机&#xff0c;由交换机广播消息&#xff0c;每个订阅消息的微服务都可以接收到支付结果. 微服务收到支付结果根据订单的类型去更新自己的业务数据。 相关技术方案 使用消息…...

【100天精通python】Day47:python网络编程_Web开发:web服务器,前端基础以及静态服务器

目录 1 网络编程与web编程 1.1 网络编程 1.2 web编程 1.3 前后端交互的基本原理 2 Web开发基础 2.1 HTTP协议 2.2 Web服务器 2.3 前端基础 2.3.1 HTML&#xff08;超文本标记语言&#xff09; 2. 3.2 CSS&#xff08;层叠样式表&#xff09; 2.3.3 JavaScript 2.…...

Web框架Beego

beego简介第一个beego项目beego项目结构分析bee 工具简介beego参数配置beego路由设置beego控制器介绍beego获取参数beego ORMbeego orm高级查询beego 原生sql查询beego 模板语法指南beego模板处理...

Kubernetes(K8s)基本环境部署

此处只做学习使用&#xff0c;配置单master环境。 一、环境准备 1、ip主机规划&#xff08;准备五台新机&#xff09;>修改各个节点的主机名 注意&#xff1a;关闭防火墙与selinux 节点主机名ip身份joshua1 kubernetes-master.openlab.cn 192.168.134.151masterjoshua2k…...

antd5:form组件底层封装库field-form-1.37.0启动

一开始node版本是18.16.0 npm install发现安装依赖成功 npm start发现启动出错 node:internal/crypto/hash:71this[kHandle] new _Hash(algorithm, xofLen);^Error: error:0308010C:digital envelope routines::unsupportedat new Hash (node:internal/crypto/hash:71:19)…...

深度学习经典检测方法的概述

深度学习经典的检测方法 two-stage&#xff08;两阶段&#xff09;&#xff1a;Faster-rcnn Mask-Rcnn系列 两阶段&#xff08;two-stage&#xff09;是指先通过一个区域提取网络&#xff08;region proposal network&#xff0c;RPN&#xff09;生成候选框&#xff0c;再通过…...

viewpager2导致的mViews下标越界问题

viewpager2种在嵌套一个RecyclerView场景&#xff1a;左右滑动&#xff0c;上下滑动&#xff0c;出现mViews为null问题。 //RecyclerView布局为 new StaggeredGridLayoutManager(2,StaggeredGridLayoutManager.VERTICAL) 由于使用viewpager2导致布局缓存的销毁&#xff0c;会…...

无涯教程-JavaScript - NORMSDIST函数

NORMSDIST函数替代Excel 2010中的NORM.S.DIST函数。 描述 该函数返回标准正态累积分布函数。分布的平均值为0(零),标准偏差为1。使用此功能代替标准法线区域的表格。 语法 NORMSDIST (z)争论 Argument描述Required/OptionalZThe value for which you want the distributio…...

Mysql查询(SELECT)

基本查询&#xff1a;SELECT FROM SELECT 查询字段 FROM 表名; SELECT * FROM userinfo; 条件查询&#xff1a;用where表示查询条件 SELECT 查询字段 FROM 表名 WHERE 条件; 模糊查询&#xff1a;like %匹配0或多个字符&#xff0c;一般不用左模糊&#xff08;%放在左边&…...

基于JAVAEE技术的ssm校园车辆管理系统源码和论文

基于JAVAEE技术的ssm校园车辆管理系统源码和论文105 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 1.选题背景和意义 背景&#xff1a; 随着第二次工业革命后&#xff0c;内燃机的发明与完善&#xff0c;解…...

opencv-人脸识别

对https://blog.csdn.net/weixin_46291251/article/details/117996591这哥们代码的一些修改 import cv2 import numpy as np import os import shutil import threading import tkinter as tk from PIL import Image, ImageTkchoice 0# 首先读取config文件&#xff0c;第一行…...

九、idSpanMap使用基数树代替原本的unordered_map 十、使用基数树前后性能对比

九、idSpanMap使用基数树代替原本的unordered_map 我们原本的idSpanMap用的是STL容器中的unordered_map哈希桶&#xff0c;因为STL的容器本身是不保证线程安全的&#xff0c;所以我们在访问时需要加锁保证线程安全&#xff0c;这也就是我们写的内存池的性能的瓶颈点。因为我做…...

政府科技项目验收全流程分享

科技验收测试 &#xff08;验收申请→主管部门初审→科技厅审核→组织验收→归档备案→信用管理&#xff09;&#xff1a; &#xff08;一&#xff09;验收申请 项目承担单位通过省科技业务管理系统提交验收申请。 按期完成的项目&#xff0c;项目承担单位应当在项目合同书…...

基于Matlab实现生活中的图像信号分类(附上源码+数据集)

在我们的日常生活中&#xff0c;我们经常会遇到各种各样的图像信号&#xff0c;例如照片、视频、图标等等。对这些图像信号进行分类和识别对于我们来说是非常有用的。在本文中&#xff0c;我将介绍如何使用Matlab来实现生活中的图像信号分类。 文章目录 介绍源码数据集下载 介…...

YOLOv5算法改进(12)— 替换主干网络之Swin Transformer

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。Swin Transformer是一种基于Transformer的深度学习模型&#xff0c;它在视觉任务中表现出色。与之前的Vision Transformer&#xff08;ViT&#xff09;不同&#xff0c;Swin Transformer具有高效和精确的特性&#xff0c;并…...

php 权限节点的位运算

一&#xff0c;概述 在 PHP 中&#xff0c;位运算可以用来进行权限节点的判断。通常&#xff0c;每个权限节点都会用一个不同的位表示&#xff08;2的n次方&#xff0c;从0开始&#xff09;&#xff0c;可以将这些位组合成一个权限值。然后&#xff0c;可以使用位运算符来检查…...

ClickHouse进阶(六):副本与分片-2-Distributed引擎

进入正文前&#xff0c;感谢宝子们订阅专题、点赞、评论、收藏&#xff01;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; &#x1f3e1;个人主页&#xff1a;含各种IT体系技术,IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &#x1f4cc;订阅…...

Git和Github的基本用法

目录 背景 下载安装 安装 git for windows 安装 tortoise git 使用 Github 创建项目 注册账号 创建项目 下载项目到本地 Git 操作的三板斧 放入代码 三板斧第一招: git add 三板斧第二招: git commit 三板斧第三招: git push 小结 &#x1f388;个人主页&#xf…...

279. 完全平方数

279.完全平方数 给你一个整数 n &#xff0c;返回 和为 n 的完全平方数的最少数量 。 完全平方数 是一个整数&#xff0c;其值等于另一个整数的平方&#xff1b;换句话说&#xff0c;其值等于一个整数自乘的积。例如&#xff0c;1、4、9 和 16 都是完全平方数&#xff0c;而 …...

一篇文章学会C#的正则表达式

https://blog.csdn.net/qq_38507850/article/details/79179128 正则表达式 一句话概括就是用来对字符串根据自己的规则进行匹配的&#xff0c;可以匹配(返回)出符合自己要求的匹配结果&#xff0c;有人说字符串类的函数也可以&#xff0c;确实是这样&#xff0c;但是字符串的函…...

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

黑马Mybatis

Mybatis 表现层&#xff1a;页面展示 业务层&#xff1a;逻辑处理 持久层&#xff1a;持久数据化保存 在这里插入图片描述 Mybatis快速入门 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6501c2109c4442118ceb6014725e48e4.png //logback.xml <?xml ver…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

JDK 17 新特性

#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持&#xff0c;不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的&#xff…...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念&#xff0c;确保一个租户&#xff08;在这个系统中可能是一个公司或一个独立的客户&#xff09;的数据对其他租户是不可见的。在 RuoYi 框架&#xff08;您当前项目所使用的基础框架&#xff09;中&#xff0c;这通常是通过在数据表中增加一个…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...