当前位置: 首页 > article >正文

go-zero(十八)结合Elasticsearch实现高效数据检索

go-zero结合Elasticsearch实现高效数据检索

1. Elasticsearch简单介绍

Elasticsearch(简称 ES) 是一个基于 Lucene 库 构建的 分布式、开源、实时搜索与分析引擎,采用 Apache 2.0 协议。它支持水平扩展,能高效处理大规模数据的存储、搜索、分析和可视化,是 ELK 栈(Elasticsearch、Logstash、Kibana)EFK 栈(Elasticsearch、Fluentd、Kibana) 的核心组件。

1.1核心特点

1. 分布式架构,天生可扩展
  • 自动分片(Sharding):数据自动分片到多个节点,单个索引可拆分为多个分片(Shard),支持水平扩展,轻松处理 PB 级数据。
  • 高可用性(Replication):每个分片支持多个副本(Replica),节点故障时自动切换,保证服务不中断。
  • 去中心化:无主节点依赖(7.x 后引入选举机制),节点自动发现和管理,简化集群运维。
2. 实时搜索与分析
  • 近实时(Near Real-Time):文档写入后默认 1 秒内可被搜索到,满足实时数据查询需求。
  • 全文搜索能力:基于 Lucene 实现高效的全文索引,支持分词、模糊搜索、短语匹配、高亮显示等。
3. 灵活的数据模型
  • JSON 文档存储:数据以 JSON 格式存储,支持动态映射(Dynamic Mapping),无需预定义严格 Schema(但推荐显式定义以优化性能)。
  • 多数据类型支持:处理文本、数值、日期、地理位置、二进制等数据类型,支持复杂嵌套结构。
4. 强大的查询与聚合 DSL
  • Query DSL(Domain-Specific Language):通过 JSON 格式的查询语言,支持布尔逻辑、范围查询、正则匹配、地理空间查询(如附近搜索)等。
  • 聚合分析(Aggregation):支持分组(Terms Aggregation)、统计(Avg/Max/Min)、桶分析(Bucket Aggregation)、管道聚合(Pipeline Aggregation),用于数据统计和可视化。
5. 生态系统丰富
  • 数据摄入:支持 Logstash(ETL 管道)、Beats(轻量级数据采集器)、Kafka 等数据源,以及直接通过 REST API 写入。
  • 可视化与分析:集成 Kibana 实现仪表盘、图表、日志分析;支持与 Grafana、Tableau 等工具对接。
  • 插件扩展:支持分词器(如中文分词 IK Analyzer)、安防(X-Pack Security)、机器学习(X-Pack ML)等插件。
6. 高性能与高可靠
  • 倒排索引优化:基于 Lucene 的倒排索引,查询速度随数据量增长保持稳定。
  • 分布式协调:通过集群节点自动负载均衡,支持自动故障转移和恢复。

1.2 应用场景

1. 日志管理与分析(最经典场景)
  • 场景:收集、存储、分析海量日志(如服务器日志、应用日志、微服务日志)。
  • 实现:通过 Logstash/Beats 采集日志,写入 ES,用 Kibana 进行日志检索、统计(如错误日志高频分析)、异常检测。
  • 优势:秒级查询亿级日志,支持按时间、服务名、错误码等多维度过滤和聚合。
2. 电商搜索与推荐
  • 场景:商品搜索(如淘宝、京东的搜索栏)、智能补全、相关商品推荐。
  • 功能:支持商品名称全文搜索、价格范围筛选、品牌 / 类目过滤、销量排序,结合地理位置搜索附近门店。
  • 技术点:分词器优化(如中文分词)、拼音搜索(解决输入错误)、搜索相关性排序(BM25 算法)。
3. 企业级搜索(内部知识库、文档检索)
  • 场景:企业内部文档搜索(如 Confluence、SharePoint 集成)、代码搜索、法律合同检索。
  • 优势:支持多语言文本处理、文档元数据过滤(如作者、创建时间)、权限控制(通过 X-Pack Security)。
4. 实时数据分析与仪表盘
  • 场景:业务指标监控(如网站 PV/UV、订单量实时统计)、用户行为分析(漏斗模型、留存率)。
  • 实现:将业务数据实时写入 ES,通过 Kibana 生成动态仪表盘,支持下钻分析(Drill Down)和预警通知。
5. 地理空间分析
  • 场景:物流轨迹追踪、共享单车位置查询、疫情传播热力图。
  • 功能:支持地理坐标(Geo-point)、地理区域(Geo-rectangle)、地理距离(Distance Query)查询,结合聚合生成热力图。
6. 安全与 SIEM(安全信息与事件管理)
  • 场景:网络安全日志分析、入侵检测、合规审计。
  • 实现:采集防火墙、WAF、IDS 等设备的日志,通过 ES 进行关联分析(如多维度事件关联)、异常流量检测。
7. 监控与运维(APM、基础设施监控)
  • 场景:应用性能监控(APM)、服务器指标(CPU / 内存 / 磁盘)监控、微服务链路追踪。
  • 工具链:结合 Elastic APM 采集指标数据,用 ES 存储,Kibana 可视化服务调用链、慢查询定位。

2. 环境部署

2.1部署 elasticsearch 和kibana

注意:elasticsearch 和kibana 版本号尽量一致, elasticsearch用8.x版本,kibana 也要用8.x版本,不然无法一起使用

创建 docker-compose.yml 文件:

version: '3'
services:elasticsearch:container_name: elasticsearchimage: bitnami/elasticsearch:8.9.0environment:- TZ=Asia/Shanghai- discovery.type=single-node- "ES_JAVA_OPTS=-Xms512m -Xmx512m"privileged: trueports:- "9200:9200"restart: alwaysnetworks:- go_zero_netkibana:container_name: kibanaimage: bitnami/kibana:8.9.0restart: alwaysenvironment:- TZ=Asia/Shanghai- I18N_LOCALE=zh-CN- ELASTICSEARCH_HOSTS=http://elasticsearch:9200  # 修正服务名引用ports:- "5601:5601"depends_on:- elasticsearch  # 显式声明依赖networks:- go_zero_netnetworks:go_zero_net:driver: bridgevolumes:esdata:driver: local

环境部署完成后,验证 Elasticsearch 是否启动成功:

curl http://localhost:9200

应该会返回类似以下的 JSON 响应:

{"name" : "d95962b2abfe","cluster_name" : "elasticsearch","cluster_uuid" : "TtO-vhldRGmlrZ6U1cIgQw","version" : {"number" : "8.9.0","build_flavor" : "default","build_type" : "tar","build_hash" : "8aa461beb06aa0417a231c345a1b8c38fb498a0d","build_date" : "2023-07-19T14:43:58.555259655Z","build_snapshot" : false,"lucene_version" : "9.7.0","minimum_wire_compatibility_version" : "7.17.0","minimum_index_compatibility_version" : "7.0.0"},"tagline" : "You Know, for Search"
}

使用浏览器访问 http://localhost:5601/ 如果访问成功,说明kibana正常启动

在这里插入图片描述

3.项目构建

3.1 编写 API 文件

创建 api/search/search.api 文件,定义搜索服务 API:

syntax = "v1"type (SearchRequest {Keyword   string  `json:"keyword"`Page      int     `json:"page,optional,default=1"`PageSize  int     `json:"pageSize,optional,default=10"`Category  string  `json:"category,optional"`MinPrice  float64 `json:"minPrice,optional"`MaxPrice  float64 `json:"maxPrice,optional"`SortField string  `json:"sortField,optional"`SortOrder string  `json:"sortOrder,optional,options=asc|desc"`}ProductItem {ID          string   `json:"id"`Name        string   `json:"name"`Description string   `json:"description"`Price       float64  `json:"price"`Category    string   `json:"category"`Tags        []string `json:"tags"`CreatedAt   int64    `json:"createdAt"`}SearchResponse {Total    int64         `json:"total"`Products []ProductItem `json:"products"`}IndexProductRequest {Product ProductItem `json:"product"`}IndexProductResponse {Success bool   `json:"success"`Message string `json:"message,optional"`}deleteProductRequest {ID string `json:"id"`}
)service search-api {@handler SearchProductspost /api/search/products (SearchRequest) returns (SearchResponse)@handler IndexProductpost /api/products/index (IndexProductRequest) returns (IndexProductResponse)@handler DeleteProductpost /api/delete/products (deleteProductRequest) returns (IndexProductResponse)
}

这个 API 定义文件包含了三个主要 API :

  • 搜索商品
  • 索引(创建/更新)商品
  • 删除商品

现在,切换到项目目录,使用 goctl 工具根据 API 定义生成代码:

# 生成代码
goctl api go -api api/search/search.api -dir .

3.2 封装 Elasticsearch 服务

接下来,我们需要将 Elasticsearch 集成到生成的代码中。

安装 Elasticsearch Go 客户端,因为我们环境部署的是8.X版本,所以这里go-elasticsearch也选择v8版本

go get github.com/elastic/go-elasticsearch/v8

添加 Elasticsearch 配置

修改 internal/config/config.go 文件,添加 Elasticsearch 配置:

package configimport ("github.com/zeromicro/go-zero/rest"
)type Config struct {rest.RestConfElasticsearch struct {Addresses []stringUsername  stringPassword  string}
}

同时,修改 etc/search-api.yaml 配置文件,添加 Elasticsearch 配置:

Name: search-api
Host: 0.0.0.0
Port: 8888Elasticsearch:Addresses:- http://localhost:9200Username: ""Password: ""

创建 internal/pkg/es/es.go 文件,实现 Elasticsearch 客户端的封装:

package esimport ("context""encoding/json""errors""log""strings""github.com/elastic/go-elasticsearch/v8""github.com/elastic/go-elasticsearch/v8/esapi"
)// ElasticsearchClient 封装ES客户端
type ElasticsearchClient struct {client *elasticsearch.Client
}// NewElasticsearchClient 创建新的ES客户端
func NewElasticsearchClient(addresses []string, username, password string) (*ElasticsearchClient, error) {cfg := elasticsearch.Config{Addresses: addresses,Username:  username,Password:  password,}client, err := elasticsearch.NewClient(cfg)if err != nil {return nil, err}// 测试连接res, err := client.Info()if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {return nil, errors.New("Elasticsearch connection failed")}return &ElasticsearchClient{client: client,}, nil
}// CreateIndex 创建索引
func (e *ElasticsearchClient) CreateIndex(index string, mapping string) error {res, err := e.client.Indices.Create(index,e.client.Indices.Create.WithBody(strings.NewReader(mapping)),)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to create index")}return nil
}// IndexExists 检查索引是否存在
func (e *ElasticsearchClient) IndexExists(index string) (bool, error) {res, err := e.client.Indices.Exists([]string{index})if err != nil {return false, err}defer res.Body.Close()return res.StatusCode == 200, nil
}// IndexDocument 索引单个文档
func (e *ElasticsearchClient) IndexDocument(index, id string, document interface{}) error {data, err := json.Marshal(document)if err != nil {return err}req := esapi.IndexRequest{Index:      index,DocumentID: id,Body:       strings.NewReader(string(data)),Refresh:    "true",}res, err := req.Do(context.Background(), e.client)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to index document")}return nil
}// Search 执行搜索请求
func (e *ElasticsearchClient) Search(index string, query map[string]interface{}) (map[string]interface{}, error) {var buf strings.Builderif err := json.NewEncoder(&buf).Encode(query); err != nil {return nil, err}res, err := e.client.Search(e.client.Search.WithContext(context.Background()),e.client.Search.WithIndex(index),e.client.Search.WithBody(strings.NewReader(buf.String())),e.client.Search.WithTrackTotalHits(true),)if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {return nil, err}log.Printf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"])return nil, errors.New("search error")}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {return nil, err}return r, nil
}// DeleteDocument 删除文档
func (e *ElasticsearchClient) DeleteDocument(index, id string) error {req := esapi.DeleteRequest{Index:      index,DocumentID: id,Refresh:    "true",}res, err := req.Do(context.Background(), e.client)if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to delete document")}return nil
}// BulkIndex 批量索引文档
func (e *ElasticsearchClient) BulkIndex(index string, documents []map[string]interface{}) error {var buf strings.Builderfor _, doc := range documents {// 获取文档IDid, ok := doc["id"].(string)if !ok {id = ""}// 从文档中删除ID字段,避免重复delete(doc, "id")// 创建索引操作元数据meta := map[string]interface{}{"index": map[string]interface{}{"_index": index,"_id":    id,},}// 将元数据写入缓冲区if err := json.NewEncoder(&buf).Encode(meta); err != nil {return err}// 将文档数据写入缓冲区if err := json.NewEncoder(&buf).Encode(doc); err != nil {return err}}// 执行批量请求res, err := e.client.Bulk(strings.NewReader(buf.String()), e.client.Bulk.WithIndex(index), e.client.Bulk.WithRefresh("true"))if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to bulk index documents")}return nil
}

3.3定义商品索引映射

创建 internal/model/product.go 文件,定义商品索引映射:

package modelconst (ProductIndex = "products"
)// ProductIndexMapping 商品索引映射
var ProductIndexMapping = `{"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": {"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding"]}}}},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text","analyzer": "text_analyzer","fields": {"keyword": {"type": "keyword"}}},"description": {"type": "text","analyzer": "text_analyzer"},"price": {"type": "double"},"category": {"type": "keyword"},"tags": {"type": "keyword"},"createdAt": {"type": "date","format": "epoch_millis"}}}
}`// Product 商品模型
type Product struct {ID          string   `json:"id"`Name        string   `json:"name"`Description string   `json:"description"`Price       float64  `json:"price"`Category    string   `json:"category"`Tags        []string `json:"tags"`CreatedAt   int64    `json:"createdAt"`
}

Mapping介绍

  1. Settings(索引设置)
"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": { ... }
}
  • 分片设置

    • number_of_shards: 1 个主分片(适用于小规模或开发环境)。
    • number_of_replicas: 0 个副本(无冗余,生产环境建议至少设为 1)。
  • 分析器配置

    "analysis": {"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding"]}}
    }
    
    • 文本分析器text_analyzer):
      • 使用 standard 分词器(按词边界分词,适合大多数语言)。
      • 应用 lowercase 过滤器(转为小写)和 asciifolding 过滤器(将非 ASCII 字符转为等效 ASCII,如 ée)。
  1. Mappings(字段映射)
"mappings": {"properties": { ... }
}
  • 字段类型与用途
字段名类型用途与特点
idkeyword精确匹配(如商品 ID),不分析,用于过滤、排序或聚合。
nametext全文搜索字段,使用 text_analyzer 处理(支持分词、小写和 ASCII 折叠)。
.keyword子字段,保留原始值,用于精确匹配(如聚合品牌名)。
descriptiontext长文本描述,同样使用 text_analyzer 进行全文搜索。
pricedouble浮点型数值,支持范围查询(如 price > 100)。
categorykeyword分类标签(如 “electronics”),用于过滤和聚合。
tagskeyword标签数组(如 ["popular", "new"]),支持多值精确匹配。
createdAtdate日期类型(Unix 毫秒时间戳),支持范围查询(如按时间筛选)。

3.4 扩展 ServiceContext

修改 internal/svc/servicecontext.go 文件,添加 Elasticsearch 客户端:

package svcimport ("go-zero-es-demo/internal/config""go-zero-es-demo/internal/pkg/es"
)type ServiceContext struct {Config   config.ConfigEsClient *es.ElasticsearchClient
}func NewServiceContext(c config.Config) *ServiceContext {esClient, err := es.NewElasticsearchClient(c.Elasticsearch.Addresses,c.Elasticsearch.Username,c.Elasticsearch.Password,)if err != nil {panic(err)}return &ServiceContext{Config:   c,EsClient: esClient,}
}
//定义索引初始化函数
func InitElasticsearch(client *es.ElasticsearchClient) error {// 检查商品索引是否存在exists, err := client.IndexExists(model.ProductIndex)if err != nil {return err}// 如果索引不存在,则创建if !exists {err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)if err != nil {return err}}return nil
}

3.5 实现搜索逻辑

修改 internal/logic/searchproductslogic.go 文件,实现商品搜索功能:

package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type SearchProductsLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewSearchProductsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SearchProductsLogic {return &SearchProductsLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *SearchProductsLogic) SearchProducts(req *types.SearchRequest) (resp *types.SearchResponse, err error) {// 构建搜索查询from := (req.Page - 1) * req.PageSizesize := req.PageSize// 基本查询结构query := map[string]interface{}{"from": from,"size": size,}// 构建搜索条件boolQuery := map[string]interface{}{}mustClauses := []map[string]interface{}{}// 关键词搜索if req.Keyword != "" {mustClauses = append(mustClauses, map[string]interface{}{"multi_match": map[string]interface{}{"query":  req.Keyword,"fields": []string{"name^3", "description", "tags"},},})}// 分类过滤if req.Category != "" {mustClauses = append(mustClauses, map[string]interface{}{"term": map[string]interface{}{"category": req.Category,},})}// 价格范围过滤if req.MinPrice > 0 || req.MaxPrice > 0 {rangeQuery := map[string]interface{}{}if req.MinPrice > 0 {rangeQuery["gte"] = req.MinPrice}if req.MaxPrice > 0 {rangeQuery["lte"] = req.MaxPrice}mustClauses = append(mustClauses, map[string]interface{}{"range": map[string]interface{}{"price": rangeQuery,},})}// 添加bool查询if len(mustClauses) > 0 {boolQuery["must"] = mustClausesquery["query"] = map[string]interface{}{"bool": boolQuery,}} else {query["query"] = map[string]interface{}{"match_all": map[string]interface{}{},}}// 排序if req.SortField != "" {order := "asc"if req.SortOrder == "desc" {order = "desc"}query["sort"] = []map[string]interface{}{{req.SortField: map[string]interface{}{"order": order,},},}}// 执行搜索请求result, err := l.svcCtx.EsClient.Search(model.ProductIndex, query)if err != nil {return nil, err}// 解析搜索结果total := int64(result["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64))hits := result["hits"].(map[string]interface{})["hits"].([]interface{})products := make([]types.ProductItem, 0, len(hits))for _, hit := range hits {source := hit.(map[string]interface{})["_source"].(map[string]interface{})// 提取标签数组tags := []string{}if tagsRaw, ok := source["tags"].([]interface{}); ok {for _, tag := range tagsRaw {if tagStr, ok := tag.(string); ok {tags = append(tags, tagStr)}}}product := types.ProductItem{ID:          source["id"].(string),Name:        source["name"].(string),Description: source["description"].(string),Price:       source["price"].(float64),Category:    source["category"].(string),Tags:        tags,CreatedAt:   int64(source["createdAt"].(float64)),}products = append(products, product)}return &types.SearchResponse{Total:    total,Products: products,}, nil
}

3.6 实现索引和删除逻辑

修改 internal/logic/indexproductlogic.go 文件,实现商品索引功能:

package logicimport ("context""time""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type IndexProductLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewIndexProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *IndexProductLogic {return &IndexProductLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *IndexProductLogic) IndexProduct(req *types.IndexProductRequest) (resp *types.IndexProductResponse, err error) {// 如果未提供创建时间,则使用当前时间if req.Product.CreatedAt == 0 {req.Product.CreatedAt = time.Now().UnixMilli()}// 索引文档err = l.svcCtx.EsClient.IndexDocument(model.ProductIndex, req.Product.ID, req.Product)if err != nil {return &types.IndexProductResponse{Success: false,Message: "Failed to index product: " + err.Error(),}, nil}return &types.IndexProductResponse{Success: true,Message: "Product indexed successfully",}, nil
}

修改 internal/logic/deleteproductlogic.go 文件,实现商品删除功能:

package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type DeleteProductLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewDeleteProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteProductLogic {return &DeleteProductLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *DeleteProductLogic) DeleteProduct(req *types.DeleteProductRequest) (resp *types.IndexProductResponse, err error) {// todo: add your logic here and delete this line// 删除文档err = l.svcCtx.EsClient.DeleteDocument(model.ProductIndex, req.ID)if err != nil {return &types.IndexProductResponse{Success: false,Message: "Failed to delete product: " + err.Error(),}, nil}return &types.IndexProductResponse{Success: true,Message: "Product deleted successfully",}, nil
}

最后,修改主程序入口 search.go 文件,添加 Elasticsearch 索引初始化逻辑:


func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()ctx := svc.NewServiceContext(c)// 初始化 Elasticsearch 索引if err := svc.InitElasticsearch(ctx.EsClient); err != nil {panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))}handler.RegisterHandlers(server, ctx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)server.Start()
}

4. 项目测试

现在开始启动项目来测试

go run search.go

测试添加索引,我这里只粘贴了一个,其他数据自行补充:

curl --location '127.0.0.1:8888/api/products/index' \
--header 'Content-Type: application/json' \
--data '{"product":{"id": "1","name": "陶瓷马克杯","description": "简约北欧风格陶瓷马克杯,采用高温烧制,釉面光滑易清洗,适合日常咖啡或茶饮。","price": 39.9,"category": "家居","tags": ["厨房用品", "创意礼品", "陶瓷制品"],"createdAt": 1689321456}
}

运行结果:


{"success": true,"message": "Product indexed successfully"
}

测试索引搜索

curl --location '127.0.0.1:8888/api/search/products' \
--header 'Content-Type: application/json' \
--data '{"keyword" :"面包"
}
'

得到以下类似的结果:

{"total": 4,"products": [{"id": "5","name": "全麦面包","description": "无添加全麦面包,富含膳食纤维和蛋白质,口感松软,适合早餐搭配果酱或奶酪食用。","price": 19.8,"category": "食品","tags": ["健康食品","早餐必备","全麦谷物"],"createdAt": 1690001234},{"id": "4","name": "保湿面霜","description": "深层保湿面霜,含透明质酸和胶原蛋白成分,有效改善干燥肌肤,适合所有肤质日常护理。","price": 129,"category": "美妆","tags": ["护肤用品","保湿补水","温和配方"],"createdAt": 1687654321}]
}

测试索引删除

curl --location '127.0.0.1:8888/api/delete/products' \
--header 'Content-Type: application/json' \
--data '
{"id" :"666"
}'

5 功能拓展

完成基本功能后,我们可以添加一些高级功能,使搜索服务更加强大。

6.1 聚合查询

首先,在 API 定义文件 api/search/search.api 中添加新的类型和接口:

type (CategoryStat {Category  string  `json:"category"`Count     int64   `json:"count"`AvgPrice  float64 `json:"avgPrice"`MaxPrice  float64 `json:"maxPrice"`MinPrice  float64 `json:"minPrice"`}CategoryStatsResponse {CategoryStats []CategoryStat `json:"categoryStats"`}
)service search-api {// ...existing endpoints...@handler GetCategoryStatsget /api/stats/categories returns (CategoryStatsResponse)
}

使用 goctl 更新生成的代码:

goctl api go -api api/search/search.api -dir .

internal/pkg/es/es.go 中添加聚合查询方法:

// Aggregate 执行聚合查询
func (e *ElasticsearchClient) Aggregate(index string, query map[string]interface{}) (map[string]interface{}, error) {var buf strings.Builderif err := json.NewEncoder(&buf).Encode(query); err != nil {return nil, err}res, err := e.client.Search(e.client.Search.WithContext(context.Background()),e.client.Search.WithIndex(index),e.client.Search.WithBody(strings.NewReader(buf.String())),e.client.Search.WithSize(0), // 聚合查询通常不需要返回文档)if err != nil {return nil, err}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {return nil, err}return nil, errors.New("aggregate error")}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {return nil, err}return r, nil
}

实现统计逻辑,在 internal/logic/getcategorystatslogic.go 文件中:

package logicimport ("context""go-zero-es-demo/internal/model""go-zero-es-demo/internal/svc""go-zero-es-demo/internal/types""github.com/zeromicro/go-zero/core/logx"
)type GetCategoryStatsLogic struct {logx.Loggerctx    context.ContextsvcCtx *svc.ServiceContext
}func NewGetCategoryStatsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCategoryStatsLogic {return &GetCategoryStatsLogic{Logger: logx.WithContext(ctx),ctx:    ctx,svcCtx: svcCtx,}
}func (l *GetCategoryStatsLogic) GetCategoryStats() (resp *types.CategoryStatsResponse, err error) {// 构建聚合查询query := map[string]interface{}{"aggs": map[string]interface{}{"categories": map[string]interface{}{"terms": map[string]interface{}{"field": "category","size":  20,},"aggs": map[string]interface{}{"avg_price": map[string]interface{}{"avg": map[string]interface{}{"field": "price",},},"max_price": map[string]interface{}{"max": map[string]interface{}{"field": "price",},},"min_price": map[string]interface{}{"min": map[string]interface{}{"field": "price",},},},},},}// 执行聚合查询result, err := l.svcCtx.EsClient.Aggregate(model.ProductIndex, query)if err != nil {return nil, err}// 解析结果aggregations := result["aggregations"].(map[string]interface{})categories := aggregations["categories"].(map[string]interface{})buckets := categories["buckets"].([]interface{})stats := make([]types.CategoryStat, 0, len(buckets))for _, bucket := range buckets {b := bucket.(map[string]interface{})key := b["key"].(string)docCount := int64(b["doc_count"].(float64))avgPrice := b["avg_price"].(map[string]interface{})["value"].(float64)maxPrice := b["max_price"].(map[string]interface{})["value"].(float64)minPrice := b["min_price"].(map[string]interface{})["value"].(float64)stats = append(stats, types.CategoryStat{Category:  key,Count:     docCount,AvgPrice:  avgPrice,MaxPrice:  maxPrice,MinPrice:  minPrice,})}return &types.CategoryStatsResponse{CategoryStats: stats,}, nil
}

运行测试


curl --location '127.0.0.1:8888/api/stats/categories'

会得到如下的类似的结果:

{"categoryStats": [{"category": "家居","count": 2,"avgPrice": 26.4,"maxPrice": 39.9,"minPrice": 12.9},{"category": "服饰","count": 1,"avgPrice": 59.99,"maxPrice": 59.99,"minPrice": 59.99},{"category": "电子","count": 1,"avgPrice": 89.5,"maxPrice": 89.5,"minPrice": 89.5},{"category": "美妆","count": 1,"avgPrice": 129,"maxPrice": 129,"minPrice": 129},{"category": "运动","count": 1,"avgPrice": 79.5,"maxPrice": 79.5,"minPrice": 79.5},{"category": "食品","count": 1,"avgPrice": 19.8,"maxPrice": 19.8,"minPrice": 19.8}]
}

6.2 同义词搜索

要启用同义词搜索,需要修改索引映射。首先,修改 internal/model/product.go 文件中的索引映射:

// 修改 ProductIndexMapping 变量
var ProductIndexMapping = `{"settings": {"number_of_shards": 1,"number_of_replicas": 0,"analysis": {"filter": {"synonym_filter": {"type": "synonym","synonyms": ["音响, 音箱, 音像","衣服, 服装, 服饰","首饰, 手饰, 饰品"]}},"analyzer": {"text_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "asciifolding", "synonym_filter"]}}}},"mappings": {"properties": {"id": {"type": "keyword"},"name": {"type": "text","analyzer": "text_analyzer","fields": {"keyword": {"type": "keyword"}}},"description": {"type": "text","analyzer": "text_analyzer"},"price": {"type": "double"},"category": {"type": "keyword"},"tags": {"type": "keyword"},"createdAt": {"type": "date","format": "epoch_millis"}}}
}`

增加ynonym_filter: 同义词过滤器,定义了3组同义词:

"音响, 音箱, 音像",
"衣服, 服装, 服饰",
"首饰, 手饰, 饰品"

当修改索引映射后,需要重建索引。因此在 internal/pkg/es/es.go 中添加删除索引的方法:

// DeleteIndex 删除索引
func (e *ElasticsearchClient) DeleteIndex(index string) error {res, err := e.client.Indices.Delete([]string{index})if err != nil {return err}defer res.Body.Close()if res.IsError() {return errors.New("failed to delete index")}return nil
}

/svc/servicecontext.go 中添加更新映射的逻辑:

// UpdateElasticsearchIndex 更新索引映射(需要重建索引)
func UpdateElasticsearchIndex(client *es.ElasticsearchClient) error {// 检查索引是否存在exists, err := client.IndexExists(model.ProductIndex)if err != nil {return err}// 如果索引已存在,则删除并重建if exists {// 获取原索引的所有文档query := map[string]interface{}{"query": map[string]interface{}{"match_all": map[string]interface{}{},},"size": 10000, // 注意:实际应用中应使用滚动API处理大量数据}result, err := client.Search(model.ProductIndex, query)if err != nil {return err}// 提取文档hits := result["hits"].(map[string]interface{})["hits"].([]interface{})documents := make([]map[string]interface{}, 0, len(hits))for _, hit := range hits {hitMap := hit.(map[string]interface{})source := hitMap["_source"].(map[string]interface{})id := hitMap["_id"].(string)// 确保文档有IDsource["id"] = iddocuments = append(documents, source)}// 删除索引err = client.DeleteIndex(model.ProductIndex)if err != nil {return err}// 创建新索引err = client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)if err != nil {return err}// 如果有文档,重新索引它们if len(documents) > 0 {err = client.BulkIndex(model.ProductIndex, documents)if err != nil {return err}}return nil}// 如果索引不存在,则创建return client.CreateIndex(model.ProductIndex, model.ProductIndexMapping)
}

修改mian函数:


func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()ctx := svc.NewServiceContext(c)//更新并初始化索引err := svc.UpdateElasticsearchIndex(ctx.EsClient)if err != nil {return}/*// 初始化 Elasticsearch 索引if err := svc.InitElasticsearch(ctx.EsClient); err != nil {panic(fmt.Sprintf("初始化 Elasticsearch 失败: %v", err))}*/handler.RegisterHandlers(server, ctx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)server.Start()
}

为了方便测试,我们增加几个新的数据:

[{"id": "1","name": "豪华音像","description": "","price": 999.99,"category": "数码","tags": ["smartphone", "popular", "new"],"createdAt": 1715788800000  // 2025-05-14},{"id": "2","name": "智能音箱","description": "续航长达18小时","price": 1199.99,"category": "数码","tags": ["laptop", "apple", "portable"],"createdAt": 1715702400000  // 2025-05-13},{"id": "3","name": "轻便音响","description": "","price": 1799.99,"category": "数码","tags": ["tv", "4k", "oled"],"createdAt": 1715616000000  // 2025-05-12}
]

相关文章:

go-zero(十八)结合Elasticsearch实现高效数据检索

go-zero结合Elasticsearch实现高效数据检索 1. Elasticsearch简单介绍 Elasticsearch(简称 ES) 是一个基于 Lucene 库 构建的 分布式、开源、实时搜索与分析引擎,采用 Apache 2.0 协议。它支持水平扩展,能高效处理大规模数据的存…...

AM32电调学习解读九:ESC上电启动关闭全流程波形分析

这是第九篇,前面的文章把各个模块的实现都介绍了一轮,本章是从运行的角度结合波形图,把整个流程走一遍。 先看下一运行的配置,我把一些配置关闭了,这样跑起来会好分析一些,不同配置跑起来效果会有差异。使用…...

怎么打包发布到npm?——从零到一的详细指南

怎么打包发布到npm?——从零到一的详细指南 目录 怎么打包发布到npm?——从零到一的详细指南一、准备工作1. 注册 npm 账号2. 安装 Node.js 和 npm 二、初始化项目三、编写你的代码四、配置 package.json五、打包你的项目六、登录 npm七、发布到 npm八、…...

NX二次开发C#---遍历当前工作部件实体并设置颜色

该代码片段展示了如何在Siemens NX软件中使用C#进行自动化操作。通过NXOpen和UFSession API,代码首先获取当前工作部件,并遍历其中的所有实体。对于每个实体,代码检查其类型和子类型是否为“实体”,如果是,则将其颜色设…...

如何用体育数据做分析:从基础统计到AI驱动的决策科学

一、体育数据分析的演进与价值创造 体育数据分析已从简单的比分记录发展为融合统计学、计算机科学和运动科学的交叉学科。现代体育组织通过数据分析可以实现: 竞技表现提升:勇士队利用投篮热图优化战术布置 商业价值挖掘:曼联通过球迷行为数…...

09、底层注解-@Import导入组件

09、底层注解-Import导入组件 Import是Spring框架中的一个注解,用于将组件导入到Spring的应用上下文中。以下是Import注解的详细介绍: #### 基本用法 - **导入配置类** java Configuration public class MainConfig { // 配置内容 } Configuration Impo…...

【notes】VScode 使用总结

文章目录 扩展 c/cwindows7 系统下 c/c 自动升级导致的插件无法正常使用 设置 文件格式设置打开文件的默认格式 扩展 c/c windows7 系统下 c/c 自动升级导致的插件无法正常使用 问题 1. c/c扩展的1.25.x版本不再支持windows7 系统,当设置VScode自动升级拓展插件时…...

【论文阅读】KIMI K1.5: SCALING REINFORCEMENT LEARNING WITH LLMS

KIMI K1.5: SCALING REINFORCEMENT LEARNING WITH LLMS Scaling的解释: 通过系统性的方法扩展强化学习算法的能力,使其能够处理更复杂的问题、更大的状态/动作空间、更长的训练周期或更高效的资源利用 原文摘要: 研究背景与问题定位 传统预训…...

云服务器开发软件操作步骤

云服务器开发软件的主要步骤。通常,这包括选择云服务提供商、配置服务器环境、开发、测试、部署、维护等阶段。每个阶段都需要详细解释,可能需要分步骤说明。例如,选择云服务提供商时,需要考虑AWS、阿里云、腾讯云等,比…...

Qwen3 - 0.6B与Bert文本分类实验:深度见解与性能剖析

Changelog [25/04/28] 新增Qwen3-0.6B在Ag_news数据集Zero-Shot的效果。新增Qwen3-0.6B线性层分类方法的效果。调整Bert训练参数(epoch、eval_steps),以实现更细致的观察,避免严重过拟合的情况。 TODO: 利用Qwen3-0.6…...

4.6 sys模块

sys --- 仅作了解 面试之前冲击一下 python的垃圾回收机制 import sys # 1. api_version : 获取python的内部版本号 print(sys.api_version) #1013 # 2. copyright: 获取cpython的版本 print(sys.copyright) #3.getfilesystemencoding() getdefaultencoding():获…...

UWB定位方案在水力发电站人员安全的应用推荐

一、行业应用背景‌ 水力发电站具有‌环境复杂‌(金属设备密集、高温高压区域多)、‌安全风险高‌(人员误入高危区域易引发事故)等特点,传统定位技术难以满足精度与可靠性要求。品铂科技基于UWB的高精度定位系统已在多…...

青少年编程与数学 02-019 Rust 编程基础 16课题、包、单元包及模块

青少年编程与数学 02-019 Rust 编程基础 16课题、包、单元包及模块 一、包1. **什么是 Crate?**2. **Crate 的类型**3. **Crate 的结构**4. **使用 Crate**5. **创建和管理 Crate**6. **发布 Crate**7. **Crate 的优势**8. **示例**创建一个 library crate 二、单元…...

bat 批处理获取日期、时间

在Windows批处理脚本编程中,获取当前日期和时间是一项常见且重要的操作。 1. 获取当前日期和时间的基本脚本 echo off for /F "tokens2" %%i in (date /t) do set mydate%%i set mytime%time% echo Current time is %mydate%:%mytime%输出示例&#xff…...

手写tomcat:基本功能实现(3)

TomcatRoute类 TomcatRoute类是Servlet容器,是Tomcat中最核心的部分,其本身是一个HashMap,其功能为:将路径和对象写入Servlet容器中。 package com.qcby.config;import com.qcby.Util.SearchClassUtil; import com.qcby.servlet…...

Spring Cloud Seata 快速入门及生产实战指南

文章目录 前言一、快速入门(AT模式)二、生产环境实战要点总结 前言 上一篇博客带大家深入解析Seata的核心原理及架构,理解了“为什么需要分布式事务”以及“Seata如何解决数据一致性问题”,相信大家已经对分布式事务的理论框架有…...

电商平台自动化

为什么要进行独立站自动化 纯人工测试人力成本高,相对效率低 回归测试在通用模块重复进行人工测试,测试效率低 前期调研备选自动化框架(工具): Katalon Applitools Testim 阿里云EMAS Playwright Appium Cypress 相关…...

Java微服务架构实战:Spring Boot与Spring Cloud的完美结合

Java微服务架构实战:Spring Boot与Spring Cloud的完美结合 引言 随着云计算和分布式系统的快速发展,微服务架构已成为现代软件开发的主流模式。Java作为企业级应用开发的核心语言,结合Spring Boot和Spring Cloud,为开发者提供了…...

王树森推荐系统公开课 召回11:地理位置召回、作者召回、缓存召回

GeoHash 召回 属于地理位置召回,用户可能对附近发生的事情感兴趣。GeoHash 是一种对经纬度的编码,地图上每个单位矩形的 GeoHash 的前几位是相同的,GeoHash 编码截取前几位后,将相同编码发布的内容按时间顺序(先是时间…...

无刷直流水泵构成及工作原理详解--【其利天下技术】

无刷直流水泵是相对于有刷直流泵而言的。 一:无刷直流水泵简介 无刷直流水泵即BLDC PUMP,其中“BL”意为“无刷”,DC即直流电机。 无刷直流水泵(BLDC PUMP)以电子换向器取代了机械换向器,所以无刷直流水泵既具有直流电机良好的调…...

less中使用 @supports

在Less中使用supports supports 是CSS的条件规则,用于检测浏览器是否支持特定的CSS属性或值。在Less中,你可以像在普通CSS中一样使用supports,同时还能利用Less的特性来增强它。 基本用法 /* 检测浏览器是否支持display: flex */ supports …...

大数据:新能源汽车宇宙的未来曲率引擎

** 发布日期:2025-05-14** 关键词:大数据、新能源、机器学习、碳中和、CSDN爆款 1. 大数据科普:定义、特征与技术核心 1.1 什么是大数据? 大数据(Big Data)指规模巨大、类型多样、生成速度快且价值密度低…...

【Java ee】关于抓包软件Fiddler Classic的安装与使用

Web Debugging Proxy Tool | Fiddler Classic 安装网站↑ 下载好安装包之后,双击一路next就可以了 一、抓包软件 电脑上安装了抓包软件之后,抓包软件就可以监听你的网卡上通过的数据。 本来是你的客户端通过网卡,把数据发给目标服务器&a…...

第五部分:第五节 - Express 路由与中间件进阶:厨房的分工与异常处理

随着你的 Express 应用变得越来越大,所有的路由和中间件都写在一个文件里会变得难以管理。这时候就需要将代码进行拆分和组织。此外,一个健壮的后端应用必须能够优雅地处理错误和一些常见的 Web 开发问题,比如跨域。 路由模块化 (express.Ro…...

在 CentOS 7.9 上部署 node_exporter 并接入 Prometheus + Grafana 实现主机监控

文章目录 在 CentOS 7.9 上部署 node_exporter 并接入 Prometheus Grafana 实现主机监控环境说明node_exporter 安装与配置下载并解压 node_exporter创建 Systemd 启动服务验证服务状态验证端口监听 Prometheus 配置 node_exporter 监控项修改 prometheus.yml重新加载 Prometh…...

C++--内存管理

内存管理 1. C/C内存分布 在C语言阶段,常说局部变量存储在栈区,动态内存中的数据存储在堆区,静态变量存储在静态区(数据段),常量存储在常量区(代码段),其实这里所说的栈…...

Java实现PDF加水印功能:技术解析与实践指南

Java实现PDF加水印功能:技术解析与实践指南 在当今数字化办公环境中,PDF文件因其跨平台兼容性和格式稳定性而被广泛应用。然而,为了保护文档的版权、标记文档状态(如“草稿”“机密”等)或增加文档的可追溯性&#xf…...

Django + Celery 打造企业级大模型异步任务管理平台 —— 从需求到完整实践(含全模板源码)

如需完整工程文件(含所有模板),可回复获取详细模板代码。 面向人群:自动化测试工程师、企业中后台开发人员、希望提升效率的 AI 业务从业者 核心收获:掌握 Django 三表关系设计、Celery 异步任务实践、基础 Web 交互与前后端分离思路,源码可直接落地,方便二次扩展 一、系…...

TC3xx学习笔记-UCB BMHD使用详解(二)

文章目录 前言Confirmation的定义Dual UCB: Confirmation StatesDual UCB: Errored State or ECC Error in the UCB Confirmation CodesECC Error in the UCB ContentDual Password UCB ORIG and COPY Re-programming UCB_BMHDx_ORIG and UCB_BMHDx_COPY (x 0-3)BMHD Protecti…...

用Python实现数据库数据自动化导出PDF报告:从MySQL到个性化文档的全流程实践

本文将介绍如何使用Python构建一个自动化工具,实现从MySQL数据库提取员工数据,并为每位员工生成包含定制化表格的PDF报告。通过该方案,可显著提升数据导出效率,避免手动操作误差,同时支持灵活的格式定制。 需求&#…...