Golang | 搜索哨兵-对接分布式gRPC服务
- 哨兵(centennial)负责接待客人,直接与调用方对接。
- 哨兵的核心组件包括service HUB和connection pool。
- service HUB用于与服务中心通信,获取可提供服务的节点信息。
- connection pool用于缓存与index worker的连接,避免每次搜索时重新建立连接。
- 连接池初始化为空map。
- 提供函数获取指定endpoint的GRPC连接。
- 函数首先检查本地缓存中是否有可用连接,若无则创建新连接。
- 创建连接时默认立即返回,可选阻塞模式直到连接可用。
- 连接建立后放入缓存并返回。
- 哨兵提供添加、删除和搜索三个核心功能。
- 添加功能:随机选择一台index worker添加新文档。
- 删除功能:遍历所有endpoint,并行删除指定文档。
- 搜索功能:将搜索请求发送到所有endpoint,合并搜索结果。
- 使用channel进行并发搜索结果的收集。
- 上游并发写入channel,下游读取channel数据到切片。
- 使用wait group等待所有搜索任务完成。
- 关闭channel后仍可读取,确保读取到所有数据。
package index_serviceimport ("context""fmt""github.com/jmh000527/criker-search/index_service/service_hub""github.com/jmh000527/criker-search/types""github.com/jmh000527/criker-search/utils""google.golang.org/grpc""google.golang.org/grpc/connectivity""google.golang.org/grpc/credentials/insecure""sync""sync/atomic""time"
)// Sentinel 哨兵前台,与外部系统对接的接口。
type Sentinel struct {hub service_hub.ServiceHub // 从 Hub 中获取 IndexServiceWorker 的集合。可以直接访问 ServiceHub,也可能通过代理模式进行访问。connPool sync.Map // 与各个 IndexServiceWorker 建立的 gRPC 连接池。缓存连接以避免每次请求都重新建立连接,提升效率。
}// NewSentinel 创建并返回一个 Sentinel 实例。
//
// 参数:
// - etcdServers: 一个字符串数组,包含了 etcd 服务器的地址。
//
// 返回值:
// - *Sentinel: 一个新的 Sentinel 实例。
func NewSentinel(etcdServers []string) *Sentinel {return &Sentinel{// hub: GetServiceHub(etcdServers, 10), // 直接访问 ServiceHubhub: service_hub.GetServiceHubProxy(etcdServers, 3, 100), // 使用代理模式访问 ServiceHubconnPool: sync.Map{}, // 初始化 gRPC 连接池}
}// GetGrpcConn 向指定的 endpoint 建立 gRPC 连接。
// 如果连接已经存在于缓存中且状态可用,则直接返回缓存的连接。
// 如果连接状态不可用或不存在,则重新建立连接并存储到缓存中。
//
// 参数:
// - endpoint: 要连接的 gRPC 服务的地址。
//
// 返回值:
// - *grpc.ClientConn: 返回与 endpoint 建立的 gRPC 连接,如果连接失败则返回 nil。
func (sentinel *Sentinel) GetGrpcConn(endpoint string) *grpc.ClientConn {v, exists := sentinel.connPool.Load(endpoint)// 连接缓存中存在if exists {conn := v.(*grpc.ClientConn)// 如果连接状态不可用,则从连接缓存中删除if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Shutdown {utils.Log.Printf("连接到 endpoint %s 的状态为 %s", endpoint, conn.GetState().String())conn.Close()sentinel.connPool.Delete(endpoint)} else {return conn}}// 连接到服务,控制连接超时ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)defer cancel()// 获取 gRPC 连接// grpc.Dial 是异步连接,连接状态为正在连接。// 如果设置了 grpc.WithBlock 选项,则会阻塞等待(等待握手成功)。// 需要注意的是,当未设置 grpc.WithBlock 时,ctx 超时控制对其无任何效果。grpcConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())if err != nil {utils.Log.Printf("连接到 %s 的 gRPC 失败,错误: %s", endpoint, err.Error())return nil}utils.Log.Printf("连接到 %s 的 gRPC 成功", endpoint)// 将 gRPC 连接缓存到连接池中sentinel.connPool.Store(endpoint, grpcConn)return grpcConn
}// AddDoc 向集群中的 IndexService 添加文档。如果文档已存在,会先删除旧文档再添加新文档。
//
// 参数:
// - doc: 要添加的文档,类型为 types.Document。
//
// 返回值:
// - int: 成功添加的文档数量。
// - error: 如果在添加文档时出现错误,返回相应的错误信息。
func (sentinel *Sentinel) AddDoc(doc types.Document) (int, error) {// 根据负载均衡策略,选择一个 IndexService 节点,将文档添加到该节点endpoint := sentinel.hub.GetServiceEndpoint(IndexService)if len(endpoint) == 0 {return 0, fmt.Errorf("未找到服务 %s 的有效节点", IndexService)}// 创建到该节点的 gRPC 连接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {return 0, fmt.Errorf("连接到 %s 的 gRPC 失败", endpoint)}// 创建 gRPC 客户端并进行调用client := NewIndexServiceClient(grpcConn)affected, err := client.AddDoc(context.Background(), &doc)if err != nil {return 0, err}utils.Log.Printf("成功向 worker %s 添加 %d 个文档", endpoint, affected.Count)return int(affected.Count), nil
}// DeleteDoc 从集群中删除与 docId 对应的文档,返回成功删除的文档数量(通常不会超过 1)。
//
// 参数:
// - docId: 要删除的文档的唯一标识符。
//
// 返回值:
// - int: 成功删除的文档数量。
func (sentinel *Sentinel) DeleteDoc(docId string) int {// 获取该服务的所有 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var n int32wg := sync.WaitGroup{}wg.Add(len(endpoints))for _, endpoint := range endpoints {// 并行地向各个 IndexServiceWorker 删除对应的 docId 的文档。// 正常情况下,只有一个 worker 上有该文档。go func(endpoint string) {defer wg.Done()grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("连接到 %s 的 gRPC 失败", endpoint)return}client := NewIndexServiceClient(grpcConn)affected, err := client.DeleteDoc(context.Background(), &DocId{docId})if err != nil {utils.Log.Printf("从 worker %s 删除文档 %s 失败,错误: %s", endpoint, docId, err)return}if affected.Count > 0 {atomic.AddInt32(&n, affected.Count)utils.Log.Printf("从 worker %s 删除文档 %s 成功", endpoint, docId)}}(endpoint)}wg.Wait()return int(atomic.LoadInt32(&n))
}// Search 执行检索操作,并返回文档列表。
//
// 参数:
// - query: 指定的检索查询条件,类型为 *types.TermQuery。
// - onFlag: 开启的标志位,类型为 uint64。
// - offFlag: 关闭的标志位,类型为 uint64。
// - orFlags: OR 标志位的切片,类型为 []uint64。
//
// 返回值:
// - []*types.Document: 经过检索的文档列表,可能为空。
//
// 详细描述:
// 1. 从服务中心获取所有的 endpoints。
// 2. 使用 goroutines 并行地对每个 endpoint 执行检索操作。
// 3. 将每个检索结果发送到 resultChan 通道中。
// 4. 在另一个 goroutine 中,从 resultChan 通道中读取结果,并将其存储在 docs 切片中。
// 5. 等待所有的检索操作完成后,关闭 resultChan,并等待从 resultChan 中读取完所有结果。
// 6. 返回存储的文档列表。
func (sentinel *Sentinel) Search(query *types.TermQuery, onFlag, offFlag uint64, orFlags []uint64) []*types.Document {// 获取该服务所有的 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return nil}// 用于存储检索结果的切片和通道docs := make([]*types.Document, 0, 1000)resultChan := make(chan *types.Document, 1000)// 使用 WaitGroup 并行开启协程去每个 endpoint 执行检索操作var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()// 获取 gRPC 连接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn == nil {utils.Log.Printf("连接到 %s 的 gRPC 连接失败", endpoint)return}client := NewIndexServiceClient(grpcConn)// 执行检索请求searchResult, err := client.Search(context.Background(), &SearchRequest{Query: query,OnFlag: onFlag,OffFlag: offFlag,OrFlags: orFlags,})if err != nil {utils.Log.Printf("向 worker %s 执行查询 %s 失败,错误: %s", endpoint, query, err)return}if len(searchResult.Results) > 0 {utils.Log.Printf("向 worker %s 执行查询 %s 成功,获取到 %v 个文档", endpoint, query, len(searchResult.Results))for _, result := range searchResult.Results {resultChan <- result}}}(endpoint)}// 启动另一个 goroutine 从 resultChan 中获取结果signalChan := make(chan struct{})go func() {for doc := range resultChan {docs = append(docs, doc)}// 读取完成,通知主 goroutinesignalChan <- struct{}{}}()// 等待所有检索操作完成wg.Wait()// 关闭 resultChan 通道close(resultChan)// 等待结果读取完毕<-signalChanreturn docs
}// Count 获取所有服务中的搜索条目数量。
//
// 参数:
// - 无参数。
//
// 返回值:
// - int: 所有服务中的文档总数量。
//
// 详细描述:
// 1. 从服务中心获取所有的 endpoints。
// 2. 使用 goroutines 并行地对每个 endpoint 执行计数操作。
// 3. 将每个 worker 中的文档数量累加到总计数中。
// 4. 等待所有计数操作完成后,返回文档总数量。
func (sentinel *Sentinel) Count() int {var n int32// 获取所有服务的 endpointsendpoints := sentinel.hub.GetServiceEndpoints(IndexService)if len(endpoints) == 0 {return 0}var wg sync.WaitGroupwg.Add(len(endpoints))for _, endpoint := range endpoints {go func(endpoint string) {defer wg.Done()// 获取 gRPC 连接grpcConn := sentinel.GetGrpcConn(endpoint)if grpcConn != nil {client := NewIndexServiceClient(grpcConn)// 执行计数请求affected, err := client.Count(context.Background(), new(CountRequest))if err != nil {utils.Log.Printf("从 worker %s 获取文档数量失败: %s", endpoint, err)}if affected.Count > 0 {// 累加计数atomic.AddInt32(&n, affected.Count)utils.Log.Printf("worker %s 共有 %d 个文档", endpoint, affected.Count)}}}(endpoint)}// 等待所有计数操作完成wg.Wait()return int(atomic.LoadInt32(&n))
}// Close 关闭各个grpc client连接,关闭etcd client连接
func (sentinel *Sentinel) Close() (err error) {sentinel.connPool.Range(func(key, value any) bool {conn := value.(*grpc.ClientConn)err = conn.Close()return true})sentinel.hub.Close()return
}
相关文章:
Golang | 搜索哨兵-对接分布式gRPC服务
哨兵(centennial)负责接待客人,直接与调用方对接。哨兵的核心组件包括service HUB和connection pool。service HUB用于与服务中心通信,获取可提供服务的节点信息。connection pool用于缓存与index worker的连接,避免每…...

智慧充电桩数字化管理平台:环境监测与动态数据可视化技术有哪些作用?
随着新能源汽车的普及,智慧充电桩作为基础设施的重要组成部分,正逐步向数字化、智能化方向发展。环境监测与动态数据可视化技术的应用,为充电桩的高效管理和运维提供了全新解决方案。通过实时采集环境参数与运行数据,并结合可视化…...
debian12.9或ubuntu,vagrant离线安装插件vagrant-libvirt
系统盘: https://mirror.lzu.edu.cn/debian-cd/12.9.0/amd64/iso-dvd/debian-12.9.0-amd64-DVD-1.iso 需要的依赖包,无需安装ruby( sudo apt install -y ruby-full ruby-dev rubygems ) : apt install -y iptables; apt install -y curl;rootdebian129:~# dpkg -l iptables …...

家政小程序开发,开启便捷生活新篇章
在快节奏的现代生活中,家务琐事常常让人分身乏术,如何高效解决家政服务需求成了众多家庭的难题。家政小程序开发,正是为解决这一痛点而生,它将为您带来前所未有的便捷生活体验。 想象一下,您只需打开手机上的家政小程…...
C++ 重载(Overload)、重写(Override)、隐藏(Hiding) 的区别
C 重载(Overload)、重写(Override)、隐藏(Hiding) 的区别 这三个概念是 C 面向对象的核心知识点,也是面试必问内容。下面我们从定义、发生条件、代码示例、底层原理全方位解析它们的区别。 一、核心区别对比表(速记版) 特性重载(Overload)…...

李臻20242817_安全文件传输系统项目报告_第14周
安全文件传输系统项目报告(第 14 周) 1. 代码链接 Gitee 仓库地址:https://gitee.com/li-zhen1215/homework/tree/master/Secure-file 代码结构说明: SecureFileTransfer/ ├── client/ # 客户端主目…...

20250531MATLAB三维绘图
MATLAB三维绘图 三维曲线:plot3功能介绍代码实现过程plot3实现效果 三维曲面空间曲面作图命令:meshmeshgrid语法示例应用meshgrid实操训练 peakspeaks 的基本用法peaks数学表达式实操训练自定义网格大小使用自定义网格 meshMATLAB代码对齐快捷键Ctrli墨西…...

深入理解C#异步编程:原理、实践与最佳方案
在现代软件开发中,应用程序的性能和响应能力至关重要。特别是在处理I/O密集型操作(如网络请求、文件读写、数据库查询)时,传统的同步编程方式会导致线程阻塞,降低程序的吞吐量。C# 的异步编程模型(async/aw…...

基于千帆大模型的AI体检报告解读系统实战:使用OSS与PDFBox实现PDF内容识别
目录 说明 前言 需求 流程说明 表结构说明 整体流程 百度智能云 注册和实名认证 创建应用 费用说明 大模型API说明 集成大模型 设计Prompt 上传体检报告 读取PDF内容 功能实现 智能评测 抽取大模型工具 功能实现 总结 说明 AI体检报告解读、病例小结或者…...
Redis缓存落地总结
最近在优化电子签系统,涉及到缓存相关的也一并优化了,写个文档做个总结,防止以后开发时又考虑不全 1、避免大key 避免缓存大PDF文件: 💡 经验值:单个Redis Value不超过10KB,集合元素不超过500…...

Spring,SpringMVC,SpringBoot
1.Spring最核心包括aop和ioc概念 AOP 能够将将哪些于业务无关的,并且大量重复的业务逻辑进行封装起来,便于减少重复代码,降低模块之间的耦合度,给未来的系统更好的可用性和可维护性。 Spring中AOP是采用动态代理,JDK代…...
npm、pnpm、yarn使用以及区别
npm 使用 安装包:在项目目录下,npm install <包名> 用于本地安装包到 node_modules 目录,并添加到 package.json 的 dependencies 中;npm install -g <包名> 用于全局安装,适用于命令行工具等。初始化项目…...
flutter加载dll 报错问题
解决flutter加载dll 报错问题 LoadLibrary 报错 126 or 193 明确一点:flutter构建exe 时默认是MSVC的。 1. 先检查dll 的位数是否满足 file ***.dll output: PE32 executable (DLL) (console) x86-64, for MS Windows, 19 sections 这种是64位的机器。 满足的话可…...

数据分析学习笔记——A/B测试
目录 前言 A/B测试中的统计学方法 假设检验 Levenes Test莱文测试 t 检验(两组均值差异) 实战案例 数据来源及参考资料 代码详解 导入数据 计算ROI Request检验 GMV检验 ROI检验 结语 前言 什么是A/B测试?说白了就是中学生物实…...
【python深度学习】Day 41 简单CNN
知识回顾 数据增强卷积神经网络定义的写法batch归一化:调整一个批次的分布,常用与图像数据特征图:只有卷积操作输出的才叫特征图调度器:直接修改基础学习率 卷积操作常见流程如下: 1. 输入 → 卷积层 → Batch归一化层…...

基于RK3568/RK3588/全志H3/飞腾芯片/音视频通话程序/语音对讲/视频对讲/实时性好/极低延迟
一、前言说明 近期收到几个需求都是做音视频通话,很多人会选择用webrtc的方案,这个当然是个不错的方案,但是依赖的东西太多,而且相关组件代码量很大,开发难度大。所以最终选择自己属性的方案,那就是推流拉…...

解决 Win11 睡眠后黑屏无法唤醒的问题
目录 一、问题描述二、解决方法1. 禁用快速启动2. 设置 Management Engine Interface3. 允许混合睡眠其他命令 4. 修复系统文件5. 更新 Windows 或驱动程序6. 其他1)更改电源选项2)刷新 Hiberfil.sys 文件3)重置电源计划4)运行系统…...

[ElasticSearch] RestAPI
🌸个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 🏵️热门专栏: 🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 🍕 Collection与…...

Linux中的shell脚本
什么是shell脚本 shell脚本是文本的一种shell脚本是可以运行的文本shell脚本的内容是由逻辑和数据组成shell脚本是解释型语言 用file命令可以查看文件是否是一个脚本文件 file filename 脚本书写规范 注释 单行注释 使用#号来进行单行注释 多行注释 使用 : " 注释内容…...

dvwa3——CSRF
LOW: 先尝试change一组密码:123456 修改成功,我们观察上面的url代码 http://localhost/DVWA/vulnerabilities/csrf/?password_new123456&password_conf123456&ChangeChange# 将password_new部分与password_conf部分改成我们想要的…...

【学习笔记】Transformer
学习的博客(在此致谢): 初识CV - Transformer模型详解(图解最完整版) 1 整体结构 Transformer由Encoder和Decoder组成,分别包含6个block。 Transformer的工作流程大体如下: 获取每个单词的em…...

欢乐熊大话蓝牙知识12:用 BLE 打造家庭 IoT 网络的三种方式
🏠 用 BLE 打造家庭 IoT 网络的三种方式 不止是“蓝牙耳机”,BLE 还能把你家“点亮成精”! 👋 前言:BLE 不只是蓝牙耳机的“代名词” 蓝牙?很多人一听就联想到“耳机连接失败请重试”。但你知道吗?现在 BLE(Bluetooth Low Energy)在智能家居中已经偷偷搞起了大事情。…...

02.上帝之心算法用GPU计算提速50倍
本文介绍了上帝之心的算法及其Python实现,使用Python语言的性能分析工具测算性能瓶颈,将算法最耗时的部分重构至CUDA C语言在纯GPU上运行,利用GPU核心更多并行更快的优势显著提高算法运算速度,实现了结果不变的情况下将耗时缩短五…...

MES管理系统:Java+Vue,含源码与文档,实现生产过程实时监控、调度与优化,提升制造企业效能
前言: 在当今竞争激烈的制造业环境中,企业面临着提高生产效率、降低成本、提升产品质量以及快速响应市场变化等多重挑战。MES管理系统作为连接企业上层计划管理系统与底层工业控制之间的桥梁,扮演着至关重要的角色。它能够实时收集、分析和处…...

LeetCode算法题 (搜索二维矩阵)Day18!!!C/C++
https://leetcode.cn/problems/search-a-2d-matrix/description/ 一、题目分析 给你一个满足下述两条属性的 m x n 整数矩阵: 每行中的整数从左到右按非严格递增顺序排列。每行的第一个整数大于前一行的最后一个整数。 给你一个整数 target ,如果 ta…...

VectorStore 组件深入学习与检索方法
考虑到目前市面上的向量数据库众多,每个数据库的操作方式也无统一标准,但是仍然存在着一些公共特征,LangChain 基于这些通用的特征封装了 VectorStore 基类,在这个基类下,可以将方法划分成 6 种: 相似性搜…...

HackMyVM-First
信息搜集 主机发现 ┌──(kali㉿kali)-[~] └─$ nmap -sn 192.168.43.0/24 Starting Nmap 7.95 ( https://nmap.org ) at 2025-05-31 06:13 EDT Nmap scan report for 192.168.43.1 Host is up (0.0080s latency). MAC Address: C6:45:66:05:91:88 (Unknown) …...
30V/150A MOSFET 150N03在无人机驱动动力系统中的性能边界与热设计挑战
产品技术概述 150N03 是一款基于沟槽式工艺(Trench Technology)的N沟道功率MOSFET,其核心价值在于: 电压/电流规格:VDSS30V, ID150A (Tc25℃) 工艺特征:高密度元胞设计实现超低导通电阻 双面散热架构:顶部裸露铜架底…...
数据共享交换平台之数据资源目录
依据信息资源体系规范,构建多维度、多层级的资源目录体系,完整的展示和管理资源目录。资源目录提供以下功能: 多层级资源目录展示,能够将资源目录按照技术维度和管理维度进行分类管理,并能够将资源目录按照数据湖、基础…...

跨平台浏览器集成库JxBrowser 支持 Chrome 扩展程序,高效赋能 Java 桌面应用
JxBrowser 是 TeamDev 开发的跨平台库,用于在 Java 应用程序中集成 Chromium 浏览器。它支持 HTML5、CSS3、JavaScript 等,具备硬件加速渲染、双向 Java 与 JavaScript 连接、丰富的事件监听等功能,能处理网页保存、打印等操作,助…...