当前位置: 首页 > article >正文

TDengine 语言连接器(Go)

简介

driver-go 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 database/sql 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。

Go 版本兼容性

支持 Go 1.14 及以上版本。

支持的平台

  • 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。
  • WebSocket/REST 连接支持所有能运行 Go 的平台。

版本历史

driver-go 版本主要变化TDengine 版本
v3.7.0支持 decimal 类型3.3.6.0 及更高版本
v3.6.0stmt2 原生接口,DSN 支持密码包含特殊字符(url.QueryEscape)3.3.5.0 及更高版本
v3.5.8修复空指针异常-
v3.5.7taosWS 和 taosRestful 支持传入 request id-
v3.5.6提升 websocket 查询和写入性能3.3.2.0 及更高版本
v3.5.5restful 支持跳过 ssl 证书检查-
v3.5.4兼容 TDengine 3.3.0.0 tmq raw data-
v3.5.3重构 taosWS-
v3.5.2websocket 压缩和优化消息订阅性能3.2.3.0 及更高版本
v3.5.1原生 stmt 查询和 geometry 类型支持3.2.1.0 及更高版本
v3.5.0获取消费进度及按照指定进度开始消费3.0.5.0 及更高版本
v3.3.1基于 websocket 的 schemaless 协议写入3.0.4.1 及更高版本
v3.1.0提供贴近 kafka 的订阅 api-
v3.0.4新增 request id 相关接口3.0.2.2 及更高版本
v3.0.3基于 websocket 的 statement 写入-
v3.0.2基于 websocket 的数据查询和写入3.0.1.5 及更高版本
v3.0.1基于 websocket 的消息订阅-
v3.0.0适配 TDengine 3.0 查询和写入3.0.0.0 及更高版本

异常和错误码

如果是 TDengine 错误可以通过以下方式获取错误码和错误信息。

// import "github.com/taosdata/driver-go/v3/errors"if err != nil {tError, is := err.(*errors.TaosError)if is {fmt.Println("errorCode:", int(tError.Code))fmt.Println("errorMessage:", tError.ErrStr)} else {fmt.Println(err.Error())}}

TDengine 其他功能模块的报错,请参考 错误码

数据类型映射

TDengine DataTypeGo Type
TIMESTAMPtime.Time
TINYINTint8
SMALLINTint16
INTint32
BIGINTint64
TINYINT UNSIGNEDuint8
SMALLINT UNSIGNEDuint16
INT UNSIGNEDuint32
BIGINT UNSIGNEDuint64
FLOATfloat32
DOUBLEfloat64
BOOLbool
BINARYstring
NCHARstring
JSON[]byte
GEOMETRY[]byte
VARBINARY[]byte
DECIMALstring

注意:JSON 类型仅在 tag 中支持。
GEOMETRY 类型是 little endian 字节序的二进制数据,符合 WKB 规范。详细信息请参考 数据类型
WKB 规范请参考Well-Known Binary (WKB)

示例程序汇总

示例程序源码请参考:示例程序

常见问题

  1. database/sql 中 stmt(参数绑定)相关接口崩溃

    REST 不支持参数绑定相关接口,建议使用db.Execdb.Query

  2. 使用 use db 语句后执行其他语句报错 [0x217] Database not specified or available

    在 REST 接口中 SQL 语句的执行无上下文关联,使用 use db 语句不会生效,解决办法见上方使用限制章节。

  3. 使用 taosSql 不报错使用 taosRestful 报错 [0x217] Database not specified or available

    因为 REST 接口无状态,使用 use db 语句不会生效,解决办法见上方使用限制章节。

  4. readBufferSize 参数调大后无明显效果

    readBufferSize 调大后会减少获取结果时 syscall 的调用。如果查询结果的数据量不大,修改该参数不会带来明显提升,如果该参数修改过大,瓶颈会在解析 JSON 数据。如果需要优化查询速度,需要根据实际情况调整该值来达到查询效果最优。

  5. disableCompression 参数设置为 false 时查询效率降低

    disableCompression 参数设置为 false 时查询结果会使用 gzip 压缩后传输,拿到数据后要先进行 gzip 解压。

  6. go get 命令无法获取包,或者获取包超时

设置 Go 代理 go env -w GOPROXY=https://goproxy.cn,direct

API 参考

database/sql 驱动

driver-go 实现了 Go 的 database/sql/driver 接口,可以直接使用 Go 的 database/sql 包。提供了三个驱动:github.com/taosdata/driver-go/v3/taosSqlgithub.com/taosdata/driver-go/v3/taosRestfulgithub.com/taosdata/driver-go/v3/taosWS 分别对应 原生连接REST 连接WebSocket 连接

DSN 规范

数据源名称具有通用格式,例如 PEAR DB,但没有类型前缀(方括号表示可选):

[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]

完整形式的 DSN:

username:password@protocol(address)/dbname?param=value

当密码中包含特殊字符时,需要使用 url.QueryEscape 进行转义。

原生连接

导入驱动:

import ("database/sql"_ "github.com/taosdata/driver-go/v3/taosSql"
)

使用 taosSql 作为 driverName 并且使用一个正确的 DSN 作为 dataSourceName 如下:

var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)

支持的 DSN 参数:

  • cfg 指定 taos.cfg 目录
  • cgoThread 指定 cgo 同时执行的数量,默认为系统核数
  • cgoAsyncHandlerPoolSize 指定异步函数的 handle 大小,默认为 10000
Rest 连接

导入驱动:

import ("database/sql"_ "github.com/taosdata/driver-go/v3/taosRestful"
)

使用 taosRestful 作为 driverName 并且使用一个正确的 DSN 作为 dataSourceName 如下:

var taosUri = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosUri)

支持的 DSN 参数:

  • disableCompression 是否接受压缩数据,默认为 true 不接受压缩数据,如果传输数据使用 gzip 压缩设置为 false。
  • readBufferSize 读取数据的缓存区大小默认为 4K(4096),当查询结果数据量多时可以适当调大该值。
  • token 连接云服务时使用的 token。
  • skipVerify 是否跳过证书验证,默认为 false 不跳过证书验证,如果连接的是不安全的服务设置为 true。
WebSocket 连接

导入驱动:

import ("database/sql"_ "github.com/taosdata/driver-go/v3/taosWS"
)

使用 taosWS 作为 driverName 并且使用一个正确的 DSN 作为 dataSourceName 如下:

var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)

支持的 DSN 参数:

  • enableCompression 是否发送压缩数据,默认为 false 不发送压缩数据,如果传输数据使用压缩设置为 true。
  • readTimeout 读取数据的超时时间,默认为 5m。
  • writeTimeout 写入数据的超时时间,默认为 10s。

:::note

  • 与原生连接方式不同,REST 接口是无状态的。在使用 REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。
  • 如果在 DSN 中指定了 dbname,那么,REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。

:::

连接功能

Go 驱动支持创建连接,返回支持 sql/driver 标准的 Connector 接口的对象,还提供了 af 包,扩充了一些无模式写入接口。

标准接口

database/sql 包中创建连接的接口

  • func Open(driverName, dataSourceName string) (*DB, error)
    • 接口说明:(database/sql) 连接数据库
    • 参数说明
      • driverName:驱动名称。
      • dataSourceName:连接参数 DSN。
    • 返回值:连接对象,错误信息。
扩展接口

af 包中创建连接的接口

  • func Open(host, user, pass, db string, port int) (*Connector, error)
    • 接口说明:连接数据库。
    • 参数说明
      • host:主机地址。
      • user:用户名。
      • pass:密码。
      • db:数据库名称。
      • port:端口号。
    • 返回值:连接对象,错误信息。
无模式写入

af 包中使用原生连接进行无模式写入的接口。

  • func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error

    • 接口说明:无模式写入 influxDB 格式数据。
    • 参数说明
      • lines:写入的数据。
      • precision:时间精度。
    • 返回值:错误信息。
  • func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error

    • 接口说明:无模式写入 OpenTSDB JSON 格式数据。
    • 参数说明
      • payload:写入的数据。
    • 返回值:错误信息。
  • func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error

    • 接口说明:无模式写入 OpenTSDB Telnet 格式数据。
    • 参数说明
      • lines:写入的数据。
    • 返回值:错误信息。

ws/schemaless 包中使用 WebSocket 无模式写入的接口

  • func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error
    • 接口说明:无模式写入数据。
    • 参数说明
      • lines:写入的数据。
      • protocol:写入的数据协议支持的协议 InfluxDBLineProtocol = 1 OpenTSDBTelnetLineProtocol = 2 OpenTSDBJsonFormatProtocol = 3
      • precision:时间精度。
      • ttl:数据过期时间,0 表示不过期。
      • reqID:请求 ID。
    • 返回值:错误信息。

执行 SQL

Go 驱动提供了符合 database/sql 标准的接口,支持以下功能:

  1. 执行 SQL 语句:执行静态 SQL 语句,并返回其生成的结果对象。
  2. 查询执行:可以执行返回数据集的查询(SELECT 语句)。
  3. 更新执行:可以执行影响行数的 SQL 语句,如 INSERTUPDATEDELETE 等。
  4. 获取结果:可以获取查询执行后返回的结果集,并遍历查询返回的数据。
  5. 获取更新计数:对于非查询 SQL 语句,可以获取执行后影响的行数。
  6. 关闭资源:释放数据库资源。
标准接口
  • func (db *DB) Close() error

    • 接口说明:关闭连接。
    • 返回值:错误信息。
  • func (db *DB) Exec(query string, args ...any) (Result, error)

    • 接口说明:执行查询但不返回任何行。
    • 参数说明
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:Result 对象(只有影响行数),错误信息。
  • func (db *DB) Query(query string, args ...any) (*Rows, error)

    • 接口说明:执行查询并返回行的结果。
    • 参数说明
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:Rows 对象,错误信息。
  • func (db *DB) QueryRow(query string, args ...any) *Row

    • 接口说明:执行查询并返回一行结果。
    • 参数说明
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:Row 对象。
扩展接口
  • func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)

    • 接口说明:执行查询但不返回任何行。
    • 参数说明
      • ctx:上下文,使用 Value 传递请求 id 进行链路追踪,key 为 taos_req_id value 为 int64 类型值。
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:结果 Result 对象(只有影响行数),错误信息。
  • func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

    • 接口说明:执行查询并返回行结果。
    • 参数说明
      • ctx:上下文,使用 Value 传递请求 id 进行链路追踪,key 为 taos_req_id value 为 int64 类型值。
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:结果集 Rows 对象,错误信息。
  • func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row

    • 接口说明:执行查询并返回一行结果,错误信息会在扫描 Row 时延迟返回。
    • 参数说明
      • ctx:上下文,使用 Value 传递请求 id 进行链路追踪,key 为 taos_req_id value 为 int64 类型值。
      • query:要执行的命令。
      • args:命令参数。
    • 返回值:单行结果 Row 对象。

结果获取

Go 驱动支持获取查询结果集,以及对应的结果集元数据,提供了用于读取结果集中元数据和数据的方法。

结果集

通过 Rows 对象获取查询结果集,提供了以下方法:

  • func (rs *Rows) Next() bool

    • 接口说明:准备下一行数据。
    • 返回值:是否有下一行数据。
  • func (rs *Rows) Columns() ([]string, error)

    • 接口说明:返回列名。
    • 返回值:列名,错误信息。
  • func (rs *Rows) Scan(dest ...any) error

    • 接口说明:将当前行的列值复制到 dest 指向的值中。
    • 参数说明
      • dest:目标值。
    • 返回值:错误信息。
  • func (rs *Rows) Close() error

    • 接口说明:关闭行。
    • 返回值:错误信息。
  • func (r *Row) Scan(dest ...any) error

    • 接口说明:将当前行的列值复制到 dest 指向的值中。
    • 参数说明
      • dest:目标值。
    • 返回值:错误信息。

通过 Result 对象获取更新结果集,提供了以下方法:

  • func (dr driverResult) RowsAffected() (int64, error)
    • 接口说明:返回受影响的行数。
    • 返回值:受影响的行数,错误信息。
结果集元数据

通过 Rows 对象获取查询结果集元数据,提供了以下方法:

  • func (rs *Rows) ColumnTypes() ([]*ColumnType, error)

    • 接口说明:返回列类型。
    • 返回值:列类型,错误信息。
  • func (ci *ColumnType) Name() string

    • 接口说明:返回列名。
    • 返回值:列名。
  • func (ci *ColumnType) Length() (length int64, ok bool)

    • 接口说明:返回列长度。
    • 返回值:列长度,是否有长度。
  • func (ci *ColumnType) ScanType() reflect.Type

    • 接口说明:返回列类型对应的 Go 类型。
    • 返回值:列类型。
  • func (ci *ColumnType) DatabaseTypeName() string

    • 接口说明:返回列类型数据库名称。
    • 返回值:列类型名称。

参数绑定

Prepare 允许使用预编译的 SQL 语句,可以提高性能并提供参数化查询的能力,从而增加安全性。

标准接口

使用 sql/driverConn 接口中的 Prepare 方法准备一个与此连接绑定的准备好的语句,返回 Stmt 对象,使用。

  • Prepare(query string) (Stmt, error)

    • 接口说明:准备返回一个与此连接绑定的准备好的语句 (statement)。
    • 参数说明
      • query:要进行参数绑定的语句。
    • 返回值:Stmt 对象,错误信息。
  • func (s *Stmt) Exec(args ...any) (Result, error)

    • 接口说明:使用给定的参数执行准备好的语句并返回总结该语句效果的结果(只可以绑定列值,不支持绑定表名和 tag)。
    • 参数说明
      • args:命令参数,Go 原始类型会自动转换数据库类型,类型不匹配可能会丢精度,建议使用与数据库相同的类型,时间类型使用 int64 或 RFC3339Nano 格式化后的字符串。
    • 返回值:结果 Result 对象(只有影响行数),错误信息。
  • func (s *Stmt) Query(args ...any) (*Rows, error)

    • 接口说明:使用给定的参数执行准备好的语句并返回行的结果。
    • 参数说明
      • args:命令参数,Go 原始类型会自动转换数据库类型,类型不匹配可能会丢精度,建议使用与数据库相同的类型,时间类型使用 int64 或 RFC3339Nano 格式化后的字符串。
    • 返回值:结果集 Rows 对象,错误信息。
  • func (s *Stmt) Close() error

    • 接口说明:关闭语句。
    • 返回值:错误信息。
扩展接口

af 包中提供了使用原生连接进行参数绑定的更多接口

  • func (conn *Connector) Stmt() *Stmt

    • 接口说明:返回一个与此连接绑定的 Stmt 对象。
    • 返回值:Stmt 对象。
  • func (s *Stmt) Prepare(sql string) error

    • 接口说明:准备一个 sql。
    • 参数说明
      • sql:要进行参数绑定的语句。
    • 返回值:错误信息。
  • func (s *Stmt) NumParams() (int, error)

    • 接口说明:返回参数数量。
    • 返回值:参数数量,错误信息。
  • func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param)

    • 接口说明:设置表名和 tag。
    • 参数说明
      • tableName:表名。
      • tags:tag。
    • 返回值:错误信息。
  • func (s *Stmt) SetTableName(tableName string) error

    • 接口说明:设置表名。
    • 参数说明
      • tableName:表名。
    • 返回值:错误信息。
  • func (s *Stmt) BindRow(row *param.Param) error

    • 接口说明:绑定行。
    • 参数说明
      • row:行数据。
    • 返回值:错误信息。
  • func (s *Stmt) GetAffectedRows() int

    • 接口说明:获取受影响的行数。
    • 返回值:受影响的行数。
  • func (s *Stmt) AddBatch() error

    • 接口说明:添加批处理。
    • 返回值:错误信息。
  • func (s *Stmt) Execute() error

    • 接口说明:执行批处理。
    • 返回值:错误信息。
  • func (s *Stmt) UseResult() (driver.Rows, error)

    • 接口说明:使用结果。
    • 返回值:结果集 Rows 对象,错误信息。
  • func (s *Stmt) Close() error

    • 接口说明:关闭语句。
    • 返回值:错误信息。

从 3.6.0 版本开始,提供 stmt2 绑定参数的接口

  • func (conn *Connector) Stmt2(reqID int64, singleTableBindOnce bool) *Stmt2
    • 接口说明:从连接创建 stmt2。
    • 参数说明
      • reqID:请求 ID。
      • singleTableBindOnce:单个子表在单次执行中只有一次数据绑定。
    • 返回值:stmt2 对象。
  • func (s *Stmt2) Prepare(sql string) error
    • 接口说明:绑定 sql 语句。
    • 参数说明
      • sql:要绑定的 sql 语句。
    • 返回值:错误信息。
  • func (s *Stmt2) Bind(params []*stmt.TaosStmt2BindData) error
    • 接口说明:绑定数据。
    • 参数说明
      • params 要绑定的数据。
    • 返回值:错误信息。
  • func (s *Stmt2) Execute() error
    • 接口说明:执行语句。
    • 返回值:错误信息。
  • func (s *Stmt2) GetAffectedRows() int
    • 接口说明:获取受影响行数(只在插入语句有效)。
    • 返回值:受影响行数。
  • func (s *Stmt2) UseResult() (driver.Rows, error)
    • 接口说明:获取结果集(只在查询语句有效)。
    • 返回值:结果集 Rows 对象,错误信息。
  • func (s *Stmt2) Close() error
    • 接口说明:关闭 stmt2。
    • 返回值:错误信息。

ws/stmt 包提供了通过 WebSocket 进行参数绑定的接口

  • func (c *Connector) Init() (*Stmt, error)

    • 接口说明:初始化。
    • 返回值:Stmt 对象,错误信息。
  • func (s *Stmt) Prepare(sql string) error

    • 接口说明:准备一个 sql。
    • 参数说明
      • sql:要进行参数绑定的语句。
    • 返回值:错误信息。
  • func (s *Stmt) SetTableName(name string) error

    • 接口说明:设置表名。
    • 参数说明
      • name:表名。
    • 返回值:错误信息。
  • func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)

    • 接口说明:设置 tag。
    • 参数说明
      • tags:tag。
      • bindType:类型信息。
    • 返回值:错误信息。
  • func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error

    • 接口说明:绑定参数。
    • 参数说明
      • params:参数。
      • bindType:类型信息。
    • 返回值:错误信息。
  • func (s *Stmt) AddBatch() error

    • 接口说明:添加批处理。
    • 返回值:错误信息。
  • func (s *Stmt) Exec() error

    • 接口说明:执行批处理。
    • 返回值:错误信息。
  • func (s *Stmt) GetAffectedRows() int

    • 接口说明:获取受影响的行数。
    • 返回值:受影响的行数。
  • func (s *Stmt) UseResult() (*Rows, error)

    • 接口说明:使用结果。
    • 返回值:Rows 对象,错误信息。
  • func (s *Stmt) Close() error

    • 接口说明:关闭语句。
    • 返回值:错误信息。

Rows 行结果参考 sql/driver 包中的 Rows 接口,提供以下接口

  • func (rs *Rows) Columns() []string

    • 接口说明:返回列名。
    • 返回值:列名。
  • func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string

    • 接口说明:返回列类型数据库名称。
    • 参数说明
      • i:列索引。
    • 返回值:列类型名称。
  • func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)

    • 接口说明:返回列长度。
    • 参数说明
      • i:列索引。
    • 返回值:列长度,是否有长度。
  • func (rs *Rows) ColumnTypeScanType(i int) reflect.Type

    • 接口说明:返回列类型对应的 Go 类型。
    • 参数说明
      • i:列索引。
    • 返回值:列类型。
  • func (rs *Rows) Next(dest []driver.Value) error

    • 接口说明:准备下一行数据,并赋值给目标。
    • 参数说明
      • dest:目标值。
    • 返回值:错误信息。
  • func (rs *Rows) Close() error

    • 接口说明:关闭行。
    • 返回值:错误信息。

common/param 包中提供了参数绑定数据结构

以下是按照偏移设置参数的接口:

  • func NewParam(size int) *Param

    • 接口说明:创建一个参数绑定数据结构。
    • 参数说明
      • size:参数数量。
    • 返回值:Param 对象。
  • func (p *Param) SetBool(offset int, value bool)

    • 接口说明:设置布尔值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:布尔值。
  • func (p *Param) SetNull(offset int)

    • 接口说明:设置空值。
    • 参数说明
      • offset:偏移量 (列或标签)。
  • func (p *Param) SetTinyint(offset int, value int)

    • 接口说明:设置 Tinyint 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Tinyint 值。
  • func (p *Param) SetSmallint(offset int, value int)

    • 接口说明:设置 Smallint 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Smallint 值。
  • func (p *Param) SetInt(offset int, value int)

    • 接口说明:设置 Int 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Int 值。
  • func (p *Param) SetBigint(offset int, value int)

    • 接口说明:设置 Bigint 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Bigint 值。
  • func (p *Param) SetUTinyint(offset int, value uint)

    • 接口说明:设置 UTinyint 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:UTinyint 值。
  • func (p *Param) SetUSmallint(offset int, value uint)

    • 接口说明:设置 USmallint 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:USmallint 值。
  • func (p *Param) SetUInt(offset int, value uint)

    • 接口说明:设置 UInt 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:UInt 值。
  • func (p *Param) SetUBigint(offset int, value uint)

    • 接口说明:设置 UBigint 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:UBigint 值。
  • func (p *Param) SetFloat(offset int, value float32)

    • 接口说明:设置 Float 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Float 值。
  • func (p *Param) SetDouble(offset int, value float64)

    • 接口说明:设置 Double 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Double 值。
  • func (p *Param) SetBinary(offset int, value []byte)

    • 接口说明:设置 Binary 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Binary 值。
  • func (p *Param) SetVarBinary(offset int, value []byte)

    • 接口说明:设置 VarBinary 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:VarBinary 值。
  • func (p *Param) SetNchar(offset int, value string)

    • 接口说明:设置 Nchar 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Nchar 值。
  • func (p *Param) SetTimestamp(offset int, value time.Time, precision int)

    • 接口说明:设置 Timestamp 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Timestamp 值。
      • precision:时间精度。
  • func (p *Param) SetJson(offset int, value []byte)

    • 接口说明:设置 Json 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Json 值。
  • func (p *Param) SetGeometry(offset int, value []byte)

    • 接口说明:设置 Geometry 值。
    • 参数说明
      • offset:偏移量 (列或标签)。
      • value:Geometry 值。

以下是链式调用设置参数的接口:

  • func (p *Param) AddBool(value bool) *Param
    • 接口说明:添加布尔值。
    • 参数说明
      • value:布尔值。
    • 返回值:Param 对象。

其他类型与布尔值类似,具体接口如下:

  • AddNull
  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

以下是设置列类型信息的接口:

  • func NewColumnType(size int) *ColumnType

    • 接口说明:创建一个列类型信息数据结构。
    • 参数说明
      • size:列数量。
    • 返回值:ColumnType 对象。
  • func (c *ColumnType) AddBool() *ColumnType

    • 接口说明:添加布尔类型。
    • 返回值:ColumnType 对象。

其他类型与布尔类型类似,具体接口如下:

  • AddTinyint
  • AddSmallint
  • AddInt
  • AddBigint
  • AddUTinyint
  • AddUSmallint
  • AddUInt
  • AddUBigint
  • AddFloat
  • AddDouble
  • AddBinary
  • AddVarBinary
  • AddNchar
  • AddTimestamp
  • AddJson
  • AddGeometry

数据订阅

Go 驱动支持数据订阅功能,提供了基于原生连接和 WebSocket 连接的数据订阅接口。原生实现在 af/tmq 包中,WebSocket 实现在 ws/tmq 包中。

消费者
  • func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
    • 接口说明:创建一个消费者。
    • 参数说明
      • conf:配置信息。
    • 返回值:Consumer 对象,错误信息。

配置信息定义为:

type ConfigValue interface{}
type ConfigMap map[string]ConfigValue

创建消费者支持属性列表:

  • ws.url:WebSocket 连接地址。
  • ws.message.channelLen:WebSocket 消息通道缓存长度,默认 0。
  • ws.message.timeout:WebSocket 消息超时时间,默认 5m。
  • ws.message.writeWait:WebSocket 写入消息超时时间,默认 10s。
  • ws.message.enableCompression:WebSocket 是否启用压缩,默认 false。
  • ws.autoReconnect:WebSocket 是否自动重连,默认 false。
  • ws.reconnectIntervalMs:WebSocket 重连间隔时间毫秒,默认 2000。
  • ws.reconnectRetryCount:WebSocket 重连重试次数,默认 3。

其他参数请参考:Consumer 参数列表,注意 TDengine 服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。

  • func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

    • 接口说明:订阅主题。
    • 参数说明
      • topic:主题。
      • rebalanceCb:平衡回调(未使用)。
    • 返回值:错误信息。
  • func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

    • 接口说明:订阅主题列表。
    • 参数说明
      • topics:主题列表。
      • rebalanceCb:平衡回调(未使用)。
    • 返回值:错误信息。
  • func (c *Consumer) Unsubscribe() error

    • 接口说明:取消订阅。
    • 返回值:错误信息。
  • func (c *Consumer) Poll(timeoutMs int) tmq.Event

    • 接口说明:轮询事件。
    • 参数说明
      • timeoutMs:超时时间。
    • 返回值:事件。
  • func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

    • 接口说明:提交偏移量。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)

    • 接口说明:获取分配信息。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error

    • 接口说明:跳转到偏移量。
    • 参数说明
      • partition:分区和偏移信息。
      • ignoredTimeoutMs:超时时间(未使用)。
    • 返回值:错误信息。
  • func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)

    • 接口说明:获取提交的偏移量。
    • 参数说明
      • partitions:分区列表。
      • timeoutMs:超时时间。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)

    • 接口说明:提交偏移量。
    • 参数说明
      • offsets:偏移量列表。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)

    • 接口说明:获取当前偏移量。
    • 参数说明
      • partitions:分区列表。
    • 返回值:TopicPartition 列表,错误信息。
  • func (c *Consumer) Close() error

    • 接口说明:关闭消费者。
    • 返回值:错误信息。
消费记录

Poll 返回 tmq.Event 事件时,可以通过判断 tmq.Event 的类型获取消费记录或错误信息。当类型为 *tmq.DataMessage 时,可以获取消费记录。

  • func (m *DataMessage) Topic() string

    • 接口说明:获取主题。
    • 返回值:主题。
  • func (m *DataMessage) DBName() string

    • 接口说明:获取数据库名称。
    • 返回值:数据库名称。
  • func (m *DataMessage) Offset() Offset

    • 接口说明:获取偏移量。
    • 返回值:偏移量。
  • func (m *DataMessage) Value() interface{}

    • 接口说明:获取值,具体值为 []*tmq.data
    • 返回值:消费到的值。

tmq.data 结构如下:

type Data struct {TableName stringData      [][]driver.Value
}
  • TableName 为表名
  • Data 为数据,每个元素为一行数据,每行数据为一个数组,数组元素为列值。

当 Poll 返回类型为 tmq.Error 时,可以使用 func (e Error) Error() string 获取错误信息。

分区信息

当消费到数据类型为 *tmq.DataMessage 时,可以从 TopicPartition 属性中获取分区信息。

type TopicPartition struct {Topic     *stringPartition int32Offset    OffsetMetadata  *stringError     error
}
  • Topic:主题。
  • Partition:分区。
  • Offset:偏移量。
  • Metadata:元数据(未使用)。
  • Error:错误信息。

可以使用 func (p TopicPartition) String() string 获取分区信息。

偏移量元数据

TopicPartition 中获取的偏移量信息,可以通过 Offset 属性获取偏移量元数据。当偏移量为 -2147467247 时表示未设置偏移量。

反序列化

当消费到数据类型为 *tmq.DataMessage 时,可以使用 func (m *DataMessage) Value() interface{} 获取数据,数据类型为 []*tmq.data

附录

  • driver-go 文档。
  • 视频教程。

访问官网

更多内容欢迎访问 TDengine 官网

相关文章:

TDengine 语言连接器(Go)

简介 driver-go 是 TDengine 的官方 Go 语言连接器,实现了 Go 语言 database/sql 包的接口。Go 开发人员可以通过它开发存取 TDengine 集群数据的应用软件。 Go 版本兼容性 支持 Go 1.14 及以上版本。 支持的平台 原生连接支持的平台和 TDengine 客户端驱动支持…...

【AI大模型】大模型RAG技术Langchain4j 核心组件深入详解

目录 一、前言 二、Langchain4j概述 2.1 Langchain4j 是什么 2.2 Langchain4j 主要特点 2.3 Langchain4j 核心组件 2.4 Langchain4j 核心优势 三、Langchanin4j组件应用实战 3.1 前置准备 3.1.1 导入如下依赖 3.1.2 获取apikey 3.1.3 获取官方文档 3.2 聊天组件 3.…...

汉化进度100%

P3834 #include<bits/stdc.h> #define int long long #define 定义整型变量 int #define 这是一个常量 const #define 无返回值函数 void #define 这是一个循环条件在后面 for #define 定义结构体 struct #define 如果 if #define 否则 else #define 定义无返回值的 sig…...

最新如何在服务器中解决FFmpeg下载、安装和配置问题教程(Linux|Windows|Mac|Ubuntu)

最新如何在服务器中解决FFmpeg下载、安装和配置问题教程&#xff08;Linux&#xff5c;Windows&#xff5c;Mac&#xff5c;Ubuntu&#xff09; 摘要&#xff1a; FFmpeg是一个强大的开源工具&#xff0c;广泛应用于音视频处理&#xff0c;支持格式转换、视频剪辑、流媒体推送…...

Tkinter图像和多媒体处理

Tkinter不仅支持图形界面的构建,还能处理图像和多媒体内容。通过Canvas控件、PIL(Python Imaging Library)库和tkinter的内置功能,您可以在Tkinter应用中展示图像、处理图像并播放简单的多媒体内容。掌握这些技术可以帮助您创建更丰富的图形界面。 10.1 显示图像 Tkinter…...

【C语言】结构体 (深入)

前言&#xff1a; 在上一张讲解了结构体的基本知识&#xff0c;在本章深入讲解一下结构体。 如内存对齐&#xff0c;传参&#xff0c;实现尾段。 首先提一个问题吧&#xff0c;如下的代码结果输出是多少&#xff1f; #include <stdio.h> struct s1 {char name;int id…...

苍穹外卖day03

店铺状态接口 引入Redis&#xff0c;因为像存储店铺状态这种只有一个字段&#xff08;没必要存储在数据库&#xff09;&#xff0c;且登录后台就要被访问的数据&#xff08;加快查询速度&#xff0c;减少数据库压力&#xff09; 使用步骤&#xff1a;导入相关maven依赖、配置…...

文件流---------获取文件的内容到控制台

总流程&#xff1a;先创建一个文本文件------->里面写入一些内容&#xff08;纯字母和字母加文字&#xff09;-----------> 然后通过输入流获取文件里面的内容&#xff0c;两种方式。 1.第一种&#xff0c;获取单个的字符 &#xff0c;先创建文件 &#xff0c;java.txt…...

【PyTorch项目实战】反卷积(Deconvolution)

文章目录 一、卷积&#xff08;Convolution&#xff09;二、反卷积&#xff08;Deconvolution&#xff09; —— 又称去卷积1. 反卷积&#xff08;Richardson-Lucy&#xff0c;RL&#xff09; —— —— 通过不断迭代更新图像估计值2. 转置卷积&#xff08;Transpose Convoluti…...

SpringBoot无法访问静态资源文件CSS、Js问题

在做一个关于基于IDEASpringBootMaveThymeleaf的系统实现实验时候遇到了这个问题一直无法解决 后来看到一篇博客终于解决了。 springboot项目在自动生成的时候会有两个文件夹&#xff0c;一个是static,一个是templates&#xff0c;如果我们使用 <dependency><groupI…...

powerbi制作中国式复杂报表

今天主要想实现的功能是使用powerbi制作一个中国式的复杂报表&#xff0c;其中需要多表头&#xff0c;另外需要多个度量值如图我们最终要实现的样式是这样的&#xff1a; 错误示范 因为这些作为多表头的维度需要在同一行上作为不同的列显示所以他们需要来自于同一个字段&#…...

CMake中set_property接口及属性作用详解

在 CMake 中&#xff0c;set_property 是一个用于设置 属性&#xff08;Property&#xff09; 的核心命令。属性是 CMake 中用于控制构建过程的核心机制之一&#xff0c;可以理解为与特定对象&#xff08;如目标、目录、源文件等&#xff09;关联的键值对&#xff0c;用于存储配…...

设计模式——抽象工厂模式总结

理解了前面的工厂模式后&#xff0c;再理解抽象工厂模式就很容易了。 工厂模式&#xff1a;https://blog.csdn.net/inside802/article/details/147170118?spm1011.2415.3001.10575&sharefrommp_manage_link 抽象工厂模式就是工厂模式的更加抽象化&#xff0c;父类不仅不承…...

ChatGPT-如何让AI写作不那么生硬!

在使用聊天机器人撰写文章时&#xff0c;可能会遇到频繁使用“首先”、“其次”、“再次”等转折连接词&#xff0c;这会让文章显得呆板和机械&#xff0c;降低了阅读体验。 解决这个问题可以尝试以下方式&#xff01; 多样化连接词&#xff1a; 使用更多多样的连接词和过渡短…...

禁止页面滚动的方法-微信小程序

在微信小程序中&#xff0c;有几种方法可以禁止页面滚动&#xff1a; 一、通过页面配置禁止滚动 在页面的JSON配置文件中设置&#xff0c;此方法完全禁止页面的滚动行为&#xff1a; {"disableScroll": true }二、通过 CSS 样式禁止滚动 在页面的WXSS文件中添加&…...

C++——继承、权限对继承的影响

目录 继承基本概念 编程示例 1.基类&#xff08;父类&#xff09;Person 代码特点说明 权限对类的影响 ​编辑 编程示例 1. 公有继承 (public inheritance) 2. 保护继承 (protected inheritance) 3. 私有继承 (private inheritance) 重要规则 实际应用 继承基本概…...

js中 剩余运算符(Rest Operator )(...)和展开运算符(Spread Operator)(...)的区别及用法

1、基本说明 在JavaScript中&#xff0c;剩余运算符&#xff08;Rest Operator&#xff09;和展开运算符&#xff08;Spread Operator&#xff09;虽然在某些方面有相似之处&#xff0c;但它们各自有不同的用途和功能。下面详细解释这两种运算符的区别&#xff1a; 1.1. 剩余…...

雅思练习总结(二十六)

雅思练习总结(二十六) 本文章是雅思练习总结(二十六),总结了文章《MAKING EVERYDROP COUNT》,内容包括原文精翻,文章脉络总结,单词扩展学习3个部分 1 文章原文及翻译 MAKING EVERYDROP COUNT 翻译:让每一滴水,都充满价值 A The history of human civilisation i…...

华为手机清理大数据的方法

清理手机最大的问题是&#xff0c;手动和自动清理了多次&#xff0c;花费了很长时间&#xff0c;但是只腾挪出来了一点点空间&#xff0c;还是有很大空间无法使用&#xff0c;这篇文章就告诉你怎样做&#xff0c;以花瓣剪辑为例&#xff0c;如下&#xff1a; 删除数据&#xff…...

单元测试原则之——不要过度模拟

什么是过度模拟? 过度模拟(over-mocking)是指在单元测试中,模拟了太多依赖项,甚至模拟了本不需要模拟的简单对象或行为。过度模拟会导致: 测试代码变得复杂,难以阅读和维护。测试逻辑偏离了实际业务逻辑,无法验证真实代码的行为。忽略了被测单元与依赖项之间的真实交互…...

操作系统基础:07 我们的任务

课程回顾与后续规划 上节课我们探讨了操作系统的历史。了解历史能让我们明智&#xff0c;从操作系统的发展历程中&#xff0c;我们总结出两个核心的里程碑式图像&#xff1a;多进程&#xff08;多任务切换&#xff09;图像和文件操作图像 。Unix和Windows等系统的成功&#xf…...

微服务的服务调用详解以及常见解决方案对比

微服务服务调用详解 1. 服务调用分类 服务调用根据通信方式、同步性、实现模式可分为以下类型&#xff1a; 按通信协议分类 类型典型协议/框架特点RPC&#xff08;远程过程调用&#xff09;Dubbo、gRPC、Apache Thrift高性能、二进制协议、强类型定义HTTP/RESTSpring RestTe…...

Verilog:LED呼吸灯

模块接口说明 信号方向描述clk输入系统时钟&#xff08;100MHz&#xff0c;周期10ns&#xff09;rst_n输入低电平有效的异步复位信号led_en输入总使能信号&#xff08;1开启呼吸灯&#xff0c;0关闭&#xff09;speed_en输入呼吸速度调节使能信号speed[2:0]输入呼吸速度分级&a…...

一个很好用的vue2在线签名组件

在前端开发的日常工作中&#xff0c;我们常常会遇到需要用户进行在线签名的需求&#xff0c;比如电子合同签署、表单确认等场景。最近&#xff0c;我在项目里使用了一款极为好用的 Vue2 在线签名组件&#xff0c;今天就来和大家分享一下使用心得。 效果图 上代码 在 views 下…...

C语言实现TcpDump

一、 在 C 语言中实现 TCP 抓包功能&#xff0c;通常可以使用 libpcap 库。libpcap 是一个广泛使用的网络抓包库&#xff0c;它提供了捕获网络数据包的接口。 libpcap 是一个广泛使用的 C 语言库&#xff0c;用于捕获和过滤网络数据包。它提供了一个通用接口&#xff0c;用于访…...

吴恩达深度学习复盘(14)迁移学习|项目基本周期

迁移学习 迁移学习是一种机器学习技术&#xff0c;它允许我们将从一个任务中学习到的知识应用到另一个相关的任务中。其核心思想在于&#xff0c;很多情况下&#xff0c;从头开始训练一个模型需要大量的数据和计算资源&#xff0c;而迁移学习能够复用在已有数据上训练好的模型…...

【STM32】STemWin库,使用template API

目录 CubeMX配置 工程文件配置 Keil配置 STemwin配置 GUIConf.c LCDConf.c 打点函数 修改屏幕分辨率 GUI_X.c 主函数 添加区域填充函数 移植过程中需要一些参考手册&#xff0c;如下 STemwin使用指南 emWin User Guide & Reference Manual CubeMX配置 参考驱…...

Matlab Add Legend To Graph-图例添加到图

Add Legeng To Graph: Matlab的legend&#xff08;&#xff09;函数-图例添加到图 将图例添加到图 ,图例是标记绘制在图上的数据序列的有用方法。 下列示例说明如何创建图例并进行一些常见修改&#xff0c;例如更改位置、设置字体大小以及添加标题。您还可以创建具有多列的图…...

AI基础04-日志数据采集

上篇文章我们学习了视频的数据采集&#xff0c;今天主要了解一下日志数据采集的方法。日志数据采集的目的通常是&#xff1a;调试、运维监控和业务分析。调试主要是工程师在程序异常时针对关键环节把相关参数通过日志打印出来&#xff0c;找出哪个环节出现了问题。运维监控主要…...

文章记单词 | 第29篇(六级)

一&#xff0c;单词释义 AI /ˌeɪ ˈaɪ/ abbr. 人工智能&#xff08;Artificial Intelligence&#xff09;inventory /ˈɪnvəntri/ n. 存货清单&#xff1b;财产清单&#xff1b;库存货物&#xff1b;存货&#xff1b;v. 编制目录&#xff1b;开列清单&#xff1b;盘存cha…...