OceanBase 配置项系统变量实现及应用详解(3):新增配置项的方法
本专题的第一篇文章,配置项的定义及使用方法,详细阐述了配置项的基础用法。对于那些对源码抱有浓厚兴趣的同学来说,或许还希望深入了解配置项的实现原理,甚至渴望亲自添加新的配置项,以满足个性化的功能需求。
本文通过剖析“如何新增配置项”这一话题,并结合配置项源码的具体实现,来详细讲解配置项的定义、初始化、内部访问及同步机制。
如何新增配置项?
要新增一个配置项,首先在 src/share/parameter/ob_parameter_seed.ipp 文件中,按照下面的格式定义配置项。
// 定义一个集群级别的、INT类型的配置项,动态生效
DEF_INT(cpu_count, OB_CLUSTER_PARAMETER, "0", "[0,]","the number of CPU\\'s in the system. ""If this parameter is set to zero, the number will be set according to sysconf; ""otherwise, this parameter is used. Range: [0,+∞) in integer",ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));// 定义一个租户级别的、TIME类型的配置项,动态生效,并附带一个合法性检查类 ObConfigStaleTimeChecker
DEF_TIME_WITH_CHECKER(max_stale_time_for_weak_consistency, OB_TENANT_PARAMETER, "5s",common::ObConfigStaleTimeChecker,"[5s,)","the max data stale time that cluster weak read version behind current timestamp,""no smaller than weak_read_version_refresh_interval, range: [5s, +∞)",ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
其中每个参数的含义如下:
参数 | 说明 | 举例 |
配置项宏 | 定义配置的宏,可以选择附带合法性检查函数 | DEF_XXX、DEF_XXX_WITH_CHECKER |
配置项名 | 配置项的名字 | 一般格式:xxx_xxx_xxx |
生效范围 | 租户配置项 or 集群配置项 | OB_CLUSTER_PARAMETER、OB_TENANT_PARAMETER |
默认值 | 默认值必须在取值范围中 | "0", "True", "100ms", "20GB", "random" |
取值范围 | 布尔类型和字符串类型不需要取值范围 | "[0,]", "[0M,]", "[1s, 180s]" |
描述 | 介绍该配置项的功能和注意事项 | "enable xxx" |
配置项属性 | ① Section:所属模块; ② Source:来源; ③ EditLevel:生效方式; | ① OBSERVER、TENANT、TRANS ② DEFAULT ③ DYNAMIC_EFFECTIVE、STATIC_EFFECTIVE |
DEF_XXX 宏最终展开是一个类的声明,并同时定义一个对象,对象名就是配置项名。
比如 DEF_TIME_WITH_CHECKER 宏,最终展开是一个 ObConfigTimeItem 类,重载 operator= 操作符让该类可以像基础数据类型一样进行赋值操作。
#define _DEF_PARAMETER_CHECKER_EASY(access_specifier, param, scope, name, def, checker, args...) \
access_specifier: \class ObConfig ## param ## Item ## _ ## name \: public common::ObConfig ## param ## Item \{ \public: \ObConfig ## param ## Item ## _ ## name() \: common::ObConfig ## param ## Item( \local_container(), scope, #name, def, args) \{ \add_checker(OB_NEW(checker, g_config_mem_attr)); \} \template <class T> \ObConfig ## param ## Item ## _ ## name& operator=(T value) \{ \common::ObConfig ## param ## Item::operator=(value); \return *this; \} \} name;
ObConfigTimeItemXxx 继承的是一个 ObConfigIntegralItem 类,所以 time 类型的数据本质上是一个 int64_t 变量。
class ObConfigTimeItem: public ObConfigIntegralItem
{
public:......static const uint64_t VALUE_BUF_SIZE = 32UL;char value_str_[VALUE_BUF_SIZE];char value_reboot_str_[VALUE_BUF_SIZE];
};
ObConfigIntegralItem 类重载了操作符 operator const int64_t &() const,在代码中访问该类对象的对象名时,实际上返回的是对象的 value_值。
class ObConfigIntegralItem: public ObConfigItem
{// get_value() return the real-time valueint64_t get_value() const { return value_; }// get() return the real-time value if it does not need reboot, otherwise it return initial_valueint64_t get() const { return value_; }operator const int64_t &() const { return value_; }private:int64_t value_;
配置项定义完成后,重新编译部署 OBServer,就可以对这个配置项进行查询和修改了。不过现在这个配置项没有任何功能,还需要在代码中用起来才能生效。
配置项初始化
集群配置项 ObServerConfig
集群配置项在 OBServer 启动时进行初始化,首先会从配置项的定义中获取默认值,然后从持久化配置文件中获取历史值(非首次启动),最后从 OBServer 的执行参数中获取最新值。它们的优先级是:执行参数 > 持久化配置文件 > 默认值。
配置项对象构造
集群配置项定义在 ObServerConfig 类中,通过在 ObConfigManager 类中定义了一个 ObServerConfig 实例,当 OBServer 启动时,它们及其成员变量的构造函数就会被调用。
class ObServerConfig : public ObCommonConfig
{
public:friend class ObServerMemoryConfig;int init(const ObSystemConfig &config);static ObServerConfig &get_instance();#undef OB_CLUSTER_PARAMETER
#define OB_CLUSTER_PARAMETER(args...) args
#include "share/parameter/ob_parameter_seed.ipp"
#undef OB_CLUSTER_PARAMETER
}
每个配置项都有默认值,在配置项的构造函数中会将 value_ 置为默认值。
以下是相关函数的调用流程,以及关键函数的代码解析。有的函数只是省略了参数列表,不代表没有参数。
- ObConfigTimeItem::ObConfigTimeItem()
- ObConfigIntegralItem::init()
- ObConfigItem::init()
- ObConfigItem::set_value(const char *str)
- ObConfigIntegralItem::set(const char *str)
其中 str 就是一开始定义的配置项默认值,将 str 中的字符串转化为对应类型的数据,然后赋值给 value_。
inline bool ObConfigIntegralItem::set(const char *str)
{bool valid = true;const int64_t value = parse(str, valid);if (valid) {value_ = value;}return valid;
}
配置文件解析
当集群配置项都构造完成后,再从持久化的配置文件中 etc/observer.config.bin 加载配置项。(在 OBServer 初次启动时,配置文件为空,所以不会执行这一步)
- ObServer::init()
- ObServer::init_config()
- ObConfigManager::load_config(const char *path)
- ObServerConfig::deserialize_with_compat()
- OB_DEF_DESERIALIZE(ObServerConfig)
先调用 ObCommonConfig::deserialize() 函数解析集群配置项,再调用 OTC_MGR.deserialize() 函数解析租户配置项。
OB_DEF_DESERIALIZE(ObServerConfig)
{} else if (OB_FAIL(ObCommonConfig::deserialize(buf, data_len, pos))) {LOG_ERROR("deserialize cluster config failed", K(ret));} else if (OB_FAIL(OTC_MGR.deserialize(buf, data_len, pos))){LOG_ERROR("deserialize tenant config failed", K(ret));}
- OB_DEF_DESERIALIZE(ObCommonConfig)
- ObCommonConfig::add_extra_config()
将文件中集群配置项的值加载到内存数据结构中。
int ObCommonConfig::add_extra_config(const char *config_str,int64_t version /* = 0 */ ,bool check_name /* = false */,bool check_unit /* = true */)
{......while (OB_SUCC(ret) && OB_NOT_NULL(token)) {if (strncmp(token, "enable_production_mode=", 23) == 0) {......} else if (OB_ISNULL(pp_item = container_.get(ObConfigStringKey(name)))) {......if (OB_FAIL(ret) || OB_ISNULL(pp_item)) {} else if (check_unit && !(*pp_item)->check_unit(value)) {ret = OB_INVALID_CONFIG;LOG_ERROR("Invalid config value", K(name), K(value), K(ret));} else if (!(*pp_item)->set_value(value)) {ret = OB_INVALID_CONFIG;LOG_ERROR("Invalid config value", K(name), K(value), K(ret));} else if (!(*pp_item)->check()) {ret = OB_INVALID_CONFIG;const char* range = (*pp_item)->range();if (OB_ISNULL(range) || strlen(range) == 0) {LOG_ERROR("Invalid config, value out of range", K(name), K(value), K(ret));} else {_LOG_ERROR("Invalid config, value out of %s (for reference only). name=%s, value=%s, ret=%d", range, name, value, ret);}} else {(*pp_item)->set_version(version);LOG_INFO("Load config succ", K(name), K(value));}
执行参数解析
当配置文件中的数据加载完成后,再解析 OBServer 执行命令中的配置项参数。
- ObServer::init_config()
其中 config_.add_extra_config(opts_.optstr_, start_time_) 函数将参数解析为配置项的值,然后加载到配置项结构中。最后调用 dump2file() 函数,将前面解析出来的配置项全部持久化到"etc/observer.config.bin"文件中。
int ObServer::init_config()
{int ret = OB_SUCCESS;bool has_config_file = true;// set dump pathconst char *dump_path = "etc/observer.config.bin";config_mgr_.set_dump_path(dump_path);if (OB_FILE_NOT_EXIST == (ret = config_mgr_.load_config())) {has_config_file = false;ret = OB_SUCCESS;}......if (opts_.optstr_ && strlen(opts_.optstr_) > 0) {if (OB_FAIL(config_.add_extra_config(opts_.optstr_, start_time_))) {LOG_ERROR("invalid config from cmdline options", K(opts_.optstr_), KR(ret));}}......if (is_arbitration_mode()) {// arbitration mode, dump config params to file directlyif (OB_FAIL(config_mgr_.dump2file())) {LOG_ERROR("config_mgr_ dump2file failed", KR(ret));} else {LOG_INFO("config_mgr_ dump2file success", KR(ret));}
如果是用 OBD 部署集群,OBServer 第一次启动时,命令中会带上"xxx.yaml"文件中的启动参数。例如:
/data/user/observer1/bin/observer -p 23400 -P 23401 -z zone1 -c 1 -d /data/user/observer1/store -i lo -r 127.0.0.1:23401:23400 -o __min_full_resource_pool_memory=268435456,major_freeze_duty_time=Disable,datafile_size=20G,memory_limit=10G,system_memory=5G,cpu_count=24,stack_size=512K,cache_wash_threshold=1G,workers_per_cpu_quota=10,schema_history_expire_time=1d,net_thread_count=4,minor_freeze_times=10,enable_separate_sys_clog=False,enable_merge_by_turn=False,syslog_io_bandwidth_limit=10G,enable_async_syslog=False
集群第一次部署完成之后,"etc/observer.config.bin"文件才会被创建,之后重启 OBServer 就可以不带参数了,进程会从该文件中读取修改后的配置项。
租户配置项 ObTenantConfig
创建租户时初始化
租户级别配置项首先在创建租户时进行初始化,在代码中是一个 ObTenantConfig 对象。
(不只这一条调用路径,但最终会调用 add_tenant_config() 函数)
- ObRpcNotifyTenantServerUnitResourceP::process()
- ObTenantNodeBalancer::notify_create_tenant(oceanbase::obrpc::TenantServerUnitConfig const&)
- ObTenantNodeBalancer::check_new_tenant(oceanbase::share::ObUnitInfoGetter::ObTenantConfig const&, long)
- ObMultiTenant::create_tenant(oceanbase::omt::ObTenantMeta const&, bool, long)
- ObTenantConfigMgr::add_tenant_config(uint64_t tenant_id)
申请一个新的 ObTenantConfig 对象,调用 init() 初始化,然后添加到 config_map_中。
int ObTenantConfigMgr::add_tenant_config(uint64_t tenant_id)
{int ret = OB_SUCCESS;ObTenantConfig *const *config = nullptr;DRWLock::WRLockGuard guard(rwlock_);if (is_virtual_tenant_id(tenant_id)|| OB_NOT_NULL(config = config_map_.get(ObTenantID(tenant_id)))) {if (nullptr != config) {ObTenantConfig *new_config = *config;new_config->set_deleting(false);}} else {ObTenantConfig *new_config = nullptr;new_config = OB_NEW(ObTenantConfig, SET_USE_UNEXPECTED_500("TenantConfig"), tenant_id);if (OB_NOT_NULL(new_config)) {if(OB_FAIL(new_config->init(this))) {LOG_WARN("new tenant config init failed", K(ret));} else if (OB_FAIL(config_map_.set_refactored(ObTenantID(tenant_id),new_config, 0))) {LOG_WARN("add new tenant config failed", K(ret));}......
因为 ObTenantConfig 引入了租户配置项的定义,因此在构造 ObTenantConfig 对象时就完成了配置项的构造。
class ObTenantConfig : public ObCommonConfig
{#undef OB_TENANT_PARAMETER
#define OB_TENANT_PARAMETER(args...) args
#include "share/parameter/ob_parameter_seed.ipp"
#undef OB_TENANT_PARAMETER
};
重启时初始化
在 OBServer 重新启动时,已有租户的配置项也会进行初始化。
- OB_DEF_DESERIALIZE(ObServerConfig)
调用 OTC_MGR.deserialize() 函数。
OB_DEF_DESERIALIZE(ObServerConfig)
{} else if (OB_FAIL(ObCommonConfig::deserialize(buf, data_len, pos))) {LOG_ERROR("deserialize cluster config failed", K(ret));} else if (OB_FAIL(OTC_MGR.deserialize(buf, data_len, pos))){LOG_ERROR("deserialize tenant config failed", K(ret));}
- OB_DEF_DESERIALIZE(ObTenantConfigMgr)
读取文件中租户配置项,根据 tenant_id 获取 config,然后将buf中的配置项解析到 config 中。
OB_DEF_DESERIALIZE(ObTenantConfigMgr)
{int ret = OB_SUCCESS;if (data_len == 0 || pos >= data_len) {} else {while(OB_SUCC(ret) && pos < data_len) {......ObTenantConfig *config = nullptr;if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {if (ret != OB_HASH_NOT_EXIST || OB_FAIL(add_tenant_config(tenant_id))) {LOG_ERROR("get tenant config failed", K(tenant_id), K(ret));break;}ret = config_map_.get_refactored(ObTenantID(tenant_id), config);}if (OB_SUCC(ret)) {pos = saved_pos;ret = config->deserialize(buf, data_len, pos);}......
在代码中查询配置项
外部查询
当用户使用"show ..."类型的命令时,OBServer 内部有统一的处理函数:ObShowResolver::resolve()。查询配置项的命令会被解析为 T_SHOW_PARAMETERS 类型,通过查询 __all_virtual_tenant_parameter_stat 表获取配置项的值。
int ObShowResolver::resolve(const ParseNode &parse_tree)
{......case T_SHOW_PARAMETERS: {......if (params_.show_seed_) {char local_ip[OB_MAX_SERVER_ADDR_SIZE] = "";if (OB_UNLIKELY(true != GCONF.self_addr_.ip_to_string(local_ip, sizeof(local_ip)))) {ret = OB_CONVERT_ERROR;} else {GEN_SQL_STEP_1(ObShowSqlSet::SHOW_PARAMETERS_SEED);GEN_SQL_STEP_2(ObShowSqlSet::SHOW_PARAMETERS_SEED, REAL_NAME(OB_SYS_DATABASE_NAME, OB_ORA_SYS_SCHEMA_NAME), REAL_NAME(OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME, OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_ORA_TNAME),local_ip, GCONF.self_addr_.get_port());}} else if (OB_SYS_TENANT_ID == show_tenant_id) {GEN_SQL_STEP_1(ObShowSqlSet::SHOW_PARAMETERS);GEN_SQL_STEP_2(ObShowSqlSet::SHOW_PARAMETERS,REAL_NAME(OB_SYS_DATABASE_NAME, OB_ORA_SYS_SCHEMA_NAME),REAL_NAME(OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME, OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_ORA_TNAME),show_tenant_id);} else {GEN_SQL_STEP_1(ObShowSqlSet::SHOW_PARAMETERS);GEN_SQL_STEP_2(ObShowSqlSet::SHOW_PARAMETERS,REAL_NAME(OB_SYS_DATABASE_NAME, OB_ORA_SYS_SCHEMA_NAME),REAL_NAME(OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME, OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_ORA_TNAME),show_tenant_id);}
}// 最终查询的虚拟表为 __all_virtual_tenant_parameter_stat
const char *const OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME = "__all_virtual_tenant_parameter_stat";
查询该虚拟表需要先初始化一个 ObAllVirtualTenantParameterStat 对象,然后调用 inner_open() 函数,再循环调用 inner_get_next_row() 函数遍历虚拟表。
- ObVirtualTableIterator::get_next_row(common::ObNewRow*&)
- ObAllVirtualTenantParameterStat::inner_get_next_row(common::ObNewRow*&)
其中 inner_sys_get_next_row() 函数获取集群配置项,inner_tenant_get_next_row() 函数获取租户配置项,最后根据查询条件进行筛选,返回满足条件的配置项。
int ObAllVirtualTenantParameterStat::inner_get_next_row(ObNewRow *&row)
{int ret = OB_SUCCESS;if (OB_UNLIKELY(!inited_)) {ret = OB_NOT_INIT;SERVER_LOG(WARN, "not inited", K(inited_), KR(ret));} else if (show_seed_) {ret = inner_seed_get_next_row(row);} else {if (OB_SUCC(inner_sys_get_next_row(row))) {} else if (OB_ITER_END == ret) {ret = inner_tenant_get_next_row(row);}}return ret;
}
- ObAllVirtualTenantParameterStat::inner_sys_get_next_row(common::ObNewRow*&)
从 GCONF 中获取集群配置项。
int ObAllVirtualTenantParameterStat::inner_sys_get_next_row(common::ObNewRow *&row)
{/*cluster parameter does not belong to any tenant*/return fill_row_(row, sys_iter_, GCONF.get_container(), NULL);
}// 集群配置项迭代器也就是 GCONF 中一个 hashmap 的迭代器sys_iter_ = GCONF.get_container().begin();
- ObAllVirtualTenantParameterStat::inner_tenant_get_next_row(common::ObNewRow *&row)
从 tenant_config_中获取租户配置项。
int ObAllVirtualTenantParameterStat::inner_tenant_get_next_row(common::ObNewRow *&row)
{// 租户配置项迭代器也就是 tenant_config_ 中一个 hashmap 的迭代器if (cur_tenant_idx_ < 0 // first come-in// current tenant is over|| (tenant_config_.is_valid() && tenant_iter_ == tenant_config_->get_container().end())) {// find next valid tenantwhile (OB_SUCC(ret) && ++cur_tenant_idx_ < tenant_id_list_.count()) {uint64_t tenant_id = tenant_id_list_.at(cur_tenant_idx_);tenant_config_.set_config(TENANT_CONF(tenant_id));if (tenant_config_.is_valid()) {tenant_iter_ = tenant_config_->get_container().begin();......}......} else {const uint64_t tenant_id = tenant_id_list_.at(cur_tenant_idx_);if (OB_FAIL(fill_row_(row,tenant_iter_,tenant_config_->get_container(),&tenant_id))) {SERVER_LOG(WARN, "fill row fail", KR(ret), K(tenant_id), K(tenant_config_->get_tenant_id()),K(cur_tenant_idx_), K(tenant_id_list_));}
内部获取
集群配置项
因为配置项重载了操作符 operator &(),所以集群配置项直接通过 GCONF.xxx 的形式访问即可。
#include "share/config/ob_server_config.h"GCONF.enable_sql_audit
在代码中有需要的地方,可以用“if (GCONF.enable_xxx)”来控制分支的走向,或者用“GCONF.xxx_time”来进行时间的计算,这样就可以把配置项使用起来了。
租户配置项
访问租户配置项需要先调用 OTC_MGR.read_tenant_config() 函数获取租户的 config,然后从 config 中获取指定的配置项。以租户配置项 max_stale_time_for_weak_consistency 为例,为其封装一个取值函数。
- ObWeakReadUtil::max_stale_time_for_weak_consistency(const uint64_t tenant_id, int64_t ignore_warn)
如果获取租户 config 成功,则从 config 中获取配置项的值,否则返回配置项的默认值并打印日志。
int64_t ObWeakReadUtil::max_stale_time_for_weak_consistency(const uint64_t tenant_id, int64_t ignore_warn)
{int64_t max_stale_time = 0;OTC_MGR.read_tenant_config(tenant_id,oceanbase::omt::ObTenantConfigMgr::default_fallback_tenant_id(),/* success */ [&max_stale_time](const omt::ObTenantConfig &config) mutable {max_stale_time = config.max_stale_time_for_weak_consistency;},/* failure */ [tenant_id, ignore_warn, &max_stale_time]() mutable {max_stale_time = DEFAULT_MAX_STALE_TIME_FOR_WEAK_CONSISTENCY;if (IGNORE_TENANT_EXIST_WARN != ignore_warn && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "tenant not exist when get max stale time for weak consistency,"" use default max stale time instead",K(tenant_id), K(max_stale_time), K(lbt()));}});return max_stale_time;
}
- ObTenantConfigMgr::read_tenant_config()
从 config_map_中获取 tenant_id 对应的config,成功则调用 SuccessFunctor,失败则调用 FailureFunctor。
int ObTenantConfigMgr::read_tenant_config(const uint64_t tenant_id,const uint64_t fallback_tenant_id,const SuccessFunctor &on_success,const FailureFunctor &on_failure) const
{int ret = OB_SUCCESS;ObTenantConfig *config = nullptr;DRWLock::RDLockGuard guard(rwlock_);if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {if (fallback_tenant_id > 0 && OB_INVALID_ID != fallback_tenant_id) {if (OB_FAIL(config_map_.get_refactored(ObTenantID(fallback_tenant_id), config))) {LOG_WARN("failed to get tenant config", K(fallback_tenant_id), K(ret), K(lbt()));}} else {LOG_WARN("failed to get tenant config", K(tenant_id), K(ret));}}if (OB_SUCC(ret) && OB_NOT_NULL(config)) {on_success(*config);} else {on_failure();LOG_WARN("fail read tenant config", K(tenant_id), K(ret));}return ret;
}
在代码中修改配置项
外部修改
修改集群配置项
系统租户执行修改集群配置项命令时,内部会向 __all_sys_parameter 表中插入一条记录(该表只记录增量数据),实际执行的是以下sql命令。
select config_version, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, section, scope, source, edit_level from __all_sys_parameter
INSERT INTO __all_sys_parameter (zone, svr_type, svr_ip, svr_port, name, data_type, value, info, config_version, gmt_modified, section, scope, source, edit_level) VALUES ('', 'observer', 'ANY', 0, 'cpu_count', 'varchar', '9', '', 1689734424757370, usec_to_time(1689734424757370), 'OBSERVER', 'CLUSTER', 'DEFAULT', 'DYNAMIC_EFFECTIVE') ON DUPLICATE KEY UPDATE data_type = 'varchar', value = '9', info = '', config_version = 1689734424757370, gmt_modified = usec_to_time(1689734424757370), section = 'OBSERVER', scope = 'CLUSTER', source = 'DEFAULT', edit_level = 'DYNAMIC_EFFECTIVE'
select config_version, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, section, scope, source, edit_level from __all_sys_parameter
select config_version, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, section, scope, source, edit_level from __all_sys_parameter
将修改后的值插入内部表之后,修改配置项命令就执行完成了。之后内部会将表中的增量数据刷新到各节点的本地数据结构中,此时才算真正完成了集群配置项更新。
修改租户配置项
执行租户配置项修改命令后,内部会执行两条sql,先往 __tenant_parameter 内部表中插入一行数据(该表只记录增量数据),然后往 __all_rootservice_event_history 内部表中插入一条修改记录。
INSERT INTO __tenant_parameter (tenant_id, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, config_version, gmt_modified, section, scope, source, edit_level) VALUES (1004, '', 'observer', 'ANY', 0, 'max_stale_time_for_weak_consistency', 'varchar', '7s', '', 1689732907024711, usec_to_time(1689732907024711), 'TENANT', 'TENANT', 'DEFAULT', 'DYNAMIC_EFFECTIVE') ON DUPLICATE KEY UPDATE data_type = 'varchar', value = '7s', info = '', config_version = 1689732907024711, gmt_modified = usec_to_time(1689732907024711), section = 'TENANT', scope = 'TENANT', source = 'DEFAULT', edit_level = 'DYNAMIC_EFFECTIVE'
INSERT INTO __all_rootservice_event_history (gmt_create, module, event, name1, value1, name2, value2, rs_svr_ip, rs_svr_port) VALUES (usec_to_time(1689732907031822), 'root_service', 'admin_set_config', 'ret', 0, 'arg', '{items:[{name:"max_stale_time_for_weak_consistency", value:"7s", comment:"", zone:"", server:"0.0.0.0:0", tenant_name:"", exec_tenant_id:1004, tenant_ids:[1004]}], is_inner:false}', '127.0.0.1', 23401
同样的,修改内部表之后SQL命令就会返回,之后再由后台线程刷新各节点的租户配置项。
内部更新(同步机制)
内部主动更新配置项,就是把 __tenant_parameter 和 __all_sys_parameter 内部表中的增量配置项同步到本地的过程。
集群配置项同步
- ObLeaseStateMgr::start_heartbeat()
后台任务 hb_ 每2s执行一次。
int ObLeaseStateMgr::start_heartbeat()
{int ret = OB_SUCCESS;if (!inited_) {ret = OB_NOT_INIT;LOG_WARN("not init", K(ret));} else {const bool repeat = false;if (OB_FAIL(hb_timer_.schedule(hb_, DELAY_TIME, repeat))) {LOG_WARN("schedule failed", LITERAL_K(DELAY_TIME), K(repeat), K(ret));}}return ret;
}static const int64_t DELAY_TIME = 2 * 1000 * 1000;//2s
- ObLeaseStateMgr::HeartBeat::runTimerTask()
......
- ObHeartBeatProcess::do_heartbeat_event(oceanbase::share::ObLeaseResponse const&)
- ObHeartBeatProcess::ObZoneLeaseInfoUpdateTask::runTimerTask()
......
- ObConfigManager::got_version(long, bool)
- ObConfigManager::UpdateTask::runTimerTask()
该函数是刷新配置项的后台定时任务,会调用 update_local() 函数,将内部表中的数据同步到本地配置项中。
void ObConfigManager::UpdateTask::runTimerTask()
{......} else if (update_local_) {config_mgr_->current_version_ = version;if (OB_FAIL(config_mgr_->system_config_.clear())) {// just print log, ignore retLOG_WARN("Clear system config map failed", K(ret));} else {// do nothing}if (OB_FAIL(config_mgr_->update_local(version))) {LOG_WARN("Update local config failed", K(ret));// recovery current_version_config_mgr_->current_version_ = old_current_version;// retry update local config in 1s laterif (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, *this, 1000 * 1000L, false))) {LOG_WARN("Reschedule update local config failed", K(ret));}
- ObConfigManager::update_local(int64_t expected_version)
该函数主要做了以下操作:
- sql_client_retry_weak.read():从 __all_sys_parameter 内部表中读取增量配置项;
- system_config_.update():将配置项的新值更新到本地;
- reload_config():重新加载和校验配置项;
- dump2file():将配置项同步到 observer.config.bin 文件中;
int ObConfigManager::update_local(int64_t expected_version)
{int ret = OB_SUCCESS;if (OB_ISNULL(sql_proxy_)) {ret = OB_NOT_INIT;LOG_WARN("sql proxy is null", K(ret));} else {ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);SMART_VAR(ObMySQLProxy::MySQLResult, result) {int64_t start = ObTimeUtility::current_time();const char *sqlstr = "select config_version, zone, svr_type, svr_ip, svr_port, name, ""data_type, value, info, section, scope, source, edit_level ""from __all_sys_parameter";if (OB_FAIL(sql_client_retry_weak.read(result, sqlstr))) {LOG_WARN("read config from __all_sys_parameter failed", K(sqlstr), K(ret));} else if (OB_FAIL(system_config_.update(result))) {LOG_WARN("failed to load system config", K(ret));......if (OB_SUCC(ret)) {if ('\0' == dump_path_[0]) {ret = OB_NOT_INIT;LOG_ERROR("Dump path doesn't set, stop read config", K(ret));} else if (OB_FAIL(server_config_.read_config())) {LOG_ERROR("Read server config failed", K(ret));} else if (OB_FAIL(reload_config())) {LOG_WARN("Reload configuration failed", K(ret));} else {DRWLock::RDLockGuard guard(OTC_MGR.rwlock_); // need protect tenant config because it will also serialize tenant configif (OB_FAIL(dump2file())) {LOG_WARN("Dump to file failed", K_(dump_path), K(ret));......
租户配置项同步
- ObLeaseStateMgr::HeartBeat::runTimerTask()
......
- ObTenantConfig::got_version(long, bool)
- ObTenantConfig::TenantConfigUpdateTask::runTimerTask()
租户配置项同样有一个后台定时任务,只要配置项的最新版本大于本地版本,就会触发更新操作。
void ObTenantConfig::TenantConfigUpdateTask::runTimerTask()
{int ret = OB_SUCCESS;if (OB_ISNULL(config_mgr_)) {ret = OB_NOT_INIT;LOG_WARN("invalid argument", K_(config_mgr), K(ret));} else if (OB_ISNULL(tenant_config_)){ret = OB_NOT_INIT;LOG_WARN("invalid argument", K_(tenant_config), K(ret));} else {const int64_t saved_current_version = tenant_config_->current_version_;const int64_t version = version_;THIS_WORKER.set_timeout_ts(INT64_MAX);if (tenant_config_->current_version_ >= version) {ret = OB_ALREADY_DONE;} else if (update_local_) {tenant_config_->current_version_ = version;if (OB_FAIL(tenant_config_->system_config_.clear())) {LOG_WARN("Clear system config map failed", K(ret));} else if (OB_FAIL(config_mgr_->update_local(tenant_config_->tenant_id_, version))) {LOG_WARN("ObTenantConfigMgr update_local failed", K(ret), K(tenant_config_));} else {config_mgr_->notify_tenant_config_changed(tenant_config_->tenant_id_);}
- ObTenantConfigMgr::update_local(uint64_t tenant_id, int64_t expected_version)
查询 __tenant_parameter 内部表获取当前租户的 config,然后将新的数据更新到 config中。
int ObTenantConfigMgr::update_local(uint64_t tenant_id, int64_t expected_version)
{SMART_VAR(ObMySQLProxy::MySQLResult, result) {if (OB_FAIL(sql.assign_fmt("select config_version, zone, svr_type, svr_ip, svr_port, name, ""data_type, value, info, section, scope, source, edit_level ""from %s where tenant_id = '%lu'", OB_TENANT_PARAMETER_TNAME, tenant_id))) {} else if (OB_FAIL(sql_client_retry_weak.read(result, exec_tenant_id, sql.ptr()))) {LOG_WARN("read config from __tenant_parameter failed",KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));} else {DRWLock::WRLockGuard guard(rwlock_);ret = config_map_.get_refactored(ObTenantID(tenant_id), config);if (OB_FAIL(ret)) {LOG_ERROR("failed to get tenant config", K(tenant_id), K(ret));} else {ret = config->update_local(expected_version, result);}
- ObTenantConfig::update_local()
调用 system_config_.update() 函数将配置项更新到本地,然后调用 dump2file() 将配置项持久化到文件中。
int ObTenantConfig::update_local(int64_t expected_version, ObMySQLProxy::MySQLResult &result,bool save2file /* = true */)
{int ret = OB_SUCCESS;if (OB_FAIL(system_config_.update(result))) {LOG_WARN("failed to load system config", K(ret));if (OB_SUCC(ret)) {if (OB_FAIL(read_config())) {LOG_ERROR("Read tenant config failed", K_(tenant_id), K(ret));} else if (save2file && OB_FAIL(config_mgr_->dump2file())) {LOG_WARN("Dump to file failed", K(ret));
小结
新增配置项并不复杂,代码中已经实现了成熟的访问和同步机制,只需要使用合适的宏并填上一些参数就可以定义新配置项了,而后续如何使用这个配置项才是实现新功能的关键。在修改配置项的过程中,实际上还会进行一些合法性检查,这部分会在后面的文章中与系统变量一起进行说明。
本专题下一篇文章是关于“系统变量的定义和源码解析”,系统变量的机制有别于配置项,还有全局级和会话级的区分,感兴趣的同学欢迎继续关注。
参考资料
- 配置项总览
- OceanBase源码
相关文章:
OceanBase 配置项系统变量实现及应用详解(3):新增配置项的方法
本专题的第一篇文章,配置项的定义及使用方法,详细阐述了配置项的基础用法。对于那些对源码抱有浓厚兴趣的同学来说,或许还希望深入了解配置项的实现原理,甚至渴望亲自添加新的配置项,以满足个性化的功能需求。 本文通…...

PCI PTS 硬件安全模块(HSM)模块化安全要求 v5.0
符合条件的 PCI SSC 利益相关者在 30 天的意见征询 (RFC) 期间审查 PCI PTS 硬件安全模块 (HSM) 模块化安全要求 v5.0 草案并提供反馈。 PCI PTS 硬件安全模块(HSM)模块化安全要求 v5.0图 从 7 月 8 日到 8 月 8 日,邀请符合条件的 PCI SSC 利益相关者在 30 天的意见…...

javaweb中的请求与响应--基于postman工具的应用(附带postman的详细安装步骤)
一、前言 后端的第一天感觉难度就上来了,可能是基础太过薄弱了吧。目前看视频已经有点跟不上了,果然15天想要拿下还是太勉强了点。30天还差不多。不知道读者们有没有好好的去学这方面的知识,没有什么是学不会的,关键是坚持。 Po…...

StarRocks下载使用说明和基础操作
简介 StarRocks 是一款高性能分析型数据仓库,使用向量化、MPP 架构、CBO、智能物化视图、可实时更新的列式存储引擎等技术实现多维、实时、高并发的数据分析。StarRocks 既支持从各类实时和离线的数据源高效导入数据,也支持直接分析数据湖上各种格式的数…...
桥接模式案例
桥接模式(Bridge Pattern)是一种结构型设计模式,它将抽象部分与实现部分分离,使它们可以独立变化。桥接模式通过创 建一个桥接接口,将抽象部分和实现部分连接起来,从而实现两者的解耦。下面是一个详细的桥接…...

Spring源码二十二:Bean实例化流程五
上一篇Spring源码二十一:Bean实例化流程四,咱们主要分析里createBeanInstance方法Spring给我们提供给的FactoryMethod方法,举例说明了factoryMethod属性如何使用,同时简单讨论了具体实现逻辑。 这一篇咱们将进入反射实例化Bean&am…...
Unity3D中UI层级改变详解
在Unity3D开发中,UI层级的调整是常见的需求,它直接关系到用户界面(UI)元素的显示顺序。在Unity的UI系统中,主要使用UGUI(Unitys Graphical User Interface)来实现界面布局和元素展示。本文将详细讲解Unity3D中如何改变…...

centos安装数据库同步工具sqoop并导入数据,导出数据,添加定时任务
目录 1.安装jdk 1.1上传jdk安装包到/opt目录下并解压 1.2解压 1.3配置环境变量 2.安装hadoop 2.1.下载hadoop 2.2.解压hadoop 2.3配置环境变量 3.安装sqoop 3.1下载 3.2解压 3.3下载依赖包并复制到指定位置 3.3.1下载commons-lang-2.6-bin.tar.gz 3.3.2将mysql-c…...
asp .net core 避免请求body数据量过大
方法1, 全局避免 引入包 dotnet add package Microsoft.AspNetCore.Http.Features using Microsoft.AspNetCore.Http.Features;public void ConfigureServices(IServiceCollection services) {services.Configure<FormOptions>(options >{// 设置允许的最…...
搭建discuz论坛(lvs+nginx+http+mysql+nfs)8台服务器
搭建discuz论坛(lvsnginxhttpmysqlnfs) 一、IP规划 服务名IP地址服务LVS1192.168.100.110keepalivedipvsadmLVS2192.168.100.111keepalivedipvsadmnginx1192.168.100.113nginxnginx2192.168.100.114nginxnfs192.168.100.116nfs-utilweb1192.168.100.11…...

就业平台小程序的设计
管理员账户功能包括:系统首页,个人中心,学生管理,企业管理,企业类型管理,留言板管理,系统管理 微信端账号功能包括:系统首页,招聘信息,简历,我的 …...
hid-ft260驱动学习笔记 5 - ft260_i2c_probe
目录 1. 保存ft260_device到私有数据 2. 初始化I2C设备分配属性 3. 添加I2C适配器 4. 初始化GPIO 5. ft260_i2c_algo 5.1 ft260_functionality 5.2 ft260_i2c_xfer 5.3 ft260_smbus_xfer 6. ft260_i2c_quirks 这个函数是i2c的接口probe函数。 1. 保存ft260_device到私…...
Android上如何使用perfetto分析systrace
Android上如何使用perfetto分析systrace Perfetto 是一个用于性能分析的工具,提供了对 Android 系统内部工作情况的详细视图。它可以用来替代传统的 systrace 工具,提供更加全面的性能分析功能。以下是如何使用 Perfetto 分析 Systrace 数据的详细指南&…...

React Hooks学习笔记
一、usestate的使用方法-初始化state函数 import React, { useState } from "react"; function App() {const [count, setCount] useState(0);return (<div><p>点击{count}次</p><button onClick{() > setCount(count 1)}>点击</bu…...

BGP第二日
上图为今日所用拓扑 ,其中R1和R4,R3和R5为EBGP邻居,R1和R3为IBGP邻居,AS200区域做OSPF动态路由 一.BGP建立邻居的六种状态 1.idle 空闲状态:建立邻居最初的状态 2.Connect 连接状态:在…...

rabbitmq集群创建admin用户之后,提示can access virtual hosts是No access状态
问题描述: 因业务需要使用的rabbitmq是3.7.8版本的,rabbitmq在3.3.0之后就允许使用guest账号的权限了,所以需要创建一个administrator标签的用户。 如下操作创建的用户: 创建完成之后就提示如下的报错: 注:…...

ARM功耗管理之多核处理器启动
安全之安全(security)博客目录导读 思考:SecureBoot?多核处理器启动流程?PSCI启动方式? 一般嵌入式系统使用的都是对称多处理器(Symmetric Multi-Processor, SMP)系统,包含了多个cpu, 这几个cp…...

java使用easypoi模版导出word详细步骤
文章目录 第一步、引入pom依赖第二步、新建导出工具类WordUtil第三步、创建模版word4.编写接口代码5.导出结果示例 第一步、引入pom依赖 <dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-spring-boot-starter</artifactId><…...
Android 内部保持数据的方式
Android内部保持数据的方式主要有五种,每种方式都有其特定的用途和优点。以下是详细的介绍: SQLite数据库 定义:SQLite是一个轻量级的、跨平台的数据库,所有的信息都存储在单一文件内,占用内存小,并且支持…...

uniapp 表格,动态表头表格封装渲染
1.接口表格数据: {"headers": [{"label": "实例名","name": "v1","order": 1,"hide": false,"dateTypeValue": null},{"label": "所属科室","name&quo…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业
6月9日,国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解,“超级…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...
【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)
1.获取 authorizationCode: 2.利用 authorizationCode 获取 accessToken:文档中心 3.获取手机:文档中心 4.获取昵称头像:文档中心 首先创建 request 若要获取手机号,scope必填 phone,permissions 必填 …...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案
目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后,迭代器会失效,因为顺序迭代器在内存中是连续存储的,元素删除后,后续元素会前移。 但一些场景中,我们又需要在执行删除操作…...
HybridVLA——让单一LLM同时具备扩散和自回归动作预测能力:训练时既扩散也回归,但推理时则扩散
前言 如上一篇文章《dexcap升级版之DexWild》中的前言部分所说,在叠衣服的过程中,我会带着团队对比各种模型、方法、策略,毕竟针对各个场景始终寻找更优的解决方案,是我个人和我司「七月在线」的职责之一 且个人认为,…...