Springboot使用Redis发布订阅自动更新缓存数据源
背景
当项目有很多数据源的时候,通常会在启动的时候就把数据源连接加载缓存上,当数据源进行变更后如何自动实时将缓存的数据源进行更新呢?如果是单个项目直接调接口方法就行了,但是涉及到分布式多个系统呢?
解决方案:
使用Redis轻量级消息队列,它可以实现实时通知,实时状态更新等功能,配合AOP实现自动更新数据源状态。
下面结合代码写一个使用示例:
1.首先创建数据源对象
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;/**** @author ws* @since 2022-08-12*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
@TableName("ed_datasource_info")
public class DatasourceInfo implements Serializable {private static final long serialVersionUID = 1L;@TableId(value = "id", type = IdType.AUTO)private Integer id;/*** 数据源编码*/@TableField("datasource_code")private String datasourceCode;/*** 数据源名称*/@TableField("datasource_name")private String datasourceName;/*** 数据源类型*/@TableField("datasource_type")private String datasourceType;/*** 类型 0:数据库 1:Rest-api*/@TableField("type")private Integer type;/*** 创建人*/@TableField("creator")private String creator;/*** 模式*/@TableField("schema_name")private String schemaName;@TableField("create_time")private Date createTime;@TableField("update_time")private Date updateTime;/*** 数据源连接信息*/@TableField("link_json")private String linkJson;}
2.初始化启动加载数据源
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.sztech.common.constant.DataSourceTypeEnum;
import com.sztech.entity.DatasourceInfo;
import com.sztech.service.DatasourceInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class DataSourceRecovery implements InitializingBean {@Resourceprivate DatasourceInfoService datasourceInfoService;@Overridepublic void afterPropertiesSet() throws Exception {refresh();}private void refresh() throws Exception{this.refresh(null);}public void refresh(String sourceCode){QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();queryWrapper.eq("type", DataSourceTypeEnum.DB.getKey());if(StringUtils.isNotBlank(sourceCode)){queryWrapper.eq("datasource_code",sourceCode);}List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);if(CollectionUtils.isEmpty(list)){return;}CountDownLatch countDownLatch = new CountDownLatch(list.size());for(DatasourceInfo datasourceInfo : list){new Thread(new ReadloadThread(datasourceInfo, countDownLatch)).start();}try {countDownLatch.await(1,TimeUnit.MINUTES);} catch (InterruptedException e) {log.error("数据源加载等待超时",e);}}/*** 多线程加载数据源,提高启动速度*/static class ReadloadThread implements Runnable {private DatasourceInfo datasourceInfo;private CountDownLatch countDownLatch;public ReadloadThread() {}public ReadloadThread(DatasourceInfo datasourceInfo,CountDownLatch countDownLatch) {this.datasourceInfo = datasourceInfo;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {DataSourceContext.setClientMap(datasourceInfo);DataSourceContext.setConfigMap(datasourceInfo.getDatasourceCode(),datasourceInfo);}catch (Exception e){log.error("datasource:{},加载失败",datasourceInfo.getDatasourceCode(),e);}finally {countDownLatch.countDown();}}}
}
3.创建DataSourceContext,用于数据源缓存数据源连接
import com.sztech.core.tool.DBTool;
import com.sztech.entity.DatasourceInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** User: wangsheng* Date: 2022-02-11* Time: 14:05*/
public class DataSourceContext {/*** 客户端缓存*/private final static Map<String, IClient> clientMap = new ConcurrentHashMap<>();/*** 数据源配置缓存*/private final static Map<String, DatasourceInfo> configMap = new ConcurrentHashMap<>();public static void setClientMap(DatasourceInfo datasourceInfo) {if(clientMap.containsKey(datasourceInfo.getDatasourceCode())){try {clientMap.get(datasourceInfo.getDatasourceCode()).close();}catch (Exception ignored){}}clientMap.put(datasourceInfo.getDatasourceCode(),DBTool.buildClient(datasourceInfo));}public static void setConfigMap(String key, DatasourceInfo datasourceInfo) {configMap.put(key, datasourceInfo);}public static void removeClientMap(String key) {if(clientMap.containsKey(key)){try {clientMap.get(key).close();}catch (Exception ignored){}}clientMap.remove(key);}public static void removeConfigMap(String key) {configMap.remove(key);}public static IClient getClientMap(String key) {IClient client = clientMap.get(key);if(null == client){throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));}return client;}public static DatasourceInfo getConfigMap(String key) {DatasourceInfo datasourceInfo = configMap.get(key);if(null == datasourceInfo){throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));}return datasourceInfo;}
}
package com.sztech.core.tool;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Instance;
import com.sztech.common.constant.ResultEnum;
import com.sztech.common.exception.BizException;
import com.sztech.common.utils.ReflectionUtils;
import com.sztech.common.utils.SpringUtils;
import com.sztech.common.utils.ThreadPoolUtil;
import com.sztech.core.datasource.DataSourceContext;
import com.sztech.core.datasource.IClient;
import com.sztech.core.datasource.rdbms.RdbmsConfig;
import com.sztech.entity.*;
import com.sztech.pojo.dto.ColumnDto;
import com.sztech.pojo.dto.QueryTableDto;
import com.sztech.pojo.dto.TableDto;
import com.sztech.pojo.node.PartitionColumn;
import com.sztech.pojo.vo.*;
import com.sztech.service.CreateTableLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;/*** Description:* User: wangsheng* Date: 2022-08-12* Time: 16:59*/
@Slf4j
public class DBTool {/*** 建立客户端*/public static IClient buildClient(DatasourceInfo datasourceInfo) {IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);return client.open(datasourceInfo);}/*** 测试数据源** @return*/public static boolean testSource(DatasourceInfo datasourceInfo) {IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);return client.testSource(datasourceInfo);}public static List<String> getSchemas(DatasourceInfo datasourceInfo) {List<String> schemas = new ArrayList<>();Connection conn = null;try {IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);Class.forName(client.driverName());String linkJson = datasourceInfo.getLinkJson();RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);conn = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getDecodePassword());DatabaseMetaData metadata = conn.getMetaData();try (ResultSet resultSet = metadata.getSchemas()) {while (resultSet.next()) {String schemaName = resultSet.getString("TABLE_SCHEM");schemas.add(schemaName);}}} catch (SQLException e) {throw new RuntimeException(e);} catch (ClassNotFoundException e) {throw new RuntimeException(e);} finally {if (conn != null) {try {conn.close();} catch (SQLException ex) {ex.printStackTrace();}}}return schemas;}/*** 获取驱动名称*/public static String getDriverName(String datasourceType) {IClient client = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class);return client.driverName();}/*** 获取表中列信息*/public static List<ColumnDto> getColumns(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getColumns(tableName);}/*** 获取表中分区列信息*/public static List<String> getPartitionColumns(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getPartitionColumns(tableName);}/*** 获取表信息*/public static List<String> getTableNames(String datasourceCode, String tableNameLike) {return DataSourceContext.getClientMap(datasourceCode).getTableNames(tableNameLike);}/*** 获取表信息*/public static List<TableDto> getTables(String datasourceCode) {return DataSourceContext.getClientMap(datasourceCode).getTables();}/*** 获取单个表信息*/public static TableDto getTableByName(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableByName(tableName);}/*** 获取单个表信息(创建时间,字段数)*/public static TableDto getTableField(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableField(tableName);}/*** 获取表信息(获取创建时间)** @param dto* @return*/public static TableInfoVo getTableData(QueryTableDto dto) {IClient client = DataSourceContext.getClientMap(dto.getDataSourceCode());return client.getTableInfo(dto.getTableName());}/*** 根据字段type建表*/public static void createTableByColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {IClient client = DataSourceContext.getClientMap(datasourceCode);List<String> sqls = client.buildTableSql(columnDtos, tableName, true);log.info("执行建表语句为:" + JSON.toJSONString(sqls));sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));}/*** 根据字段type建表*/public static void createTableByNotTransformedColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {IClient client = DataSourceContext.getClientMap(datasourceCode);List<String> sqls = client.buildTableSql(columnDtos, tableName, false);log.info("执行建表语句为:" + JSON.toJSONString(sqls));sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));}/*** 创建索引* 注: oracle 索引名在整个库里必须唯一 否则建立失败** @param datasourceCode 数据源编码* @param tableName 表名* @param filedNames filed1,filed2...* @param unique 唯一*/public static void createIndex(String datasourceCode, String tableName, String filedNames, Boolean unique) {DataSourceContext.getClientMap(datasourceCode).createIndex(tableName, filedNames, unique);}/*** sql校验** @param datasourceCode* @param sql* @param sourceType* @return*/public static Map<String, Object> checkSql(String datasourceCode, String sql, String sourceType) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.checkSql(sql, sourceType);}/*** 根据sql创建表** @param datasourceCode* @param sql*/public static void createTableWithSql(String datasourceCode, String sql) {IClient client = DataSourceContext.getClientMap(datasourceCode);log.info("执行建表语句为:" + JSON.toJSONString(sql));client.executeCommandSyn(sql, new HashMap<>());
// DataSourceContext.getClientMap(datasourceCode).createTableWithSql(sql);}/*** 删除表** @param datasourceCode* @param tableName*/public static void dropTable(String datasourceCode, String tableName) {DataSourceContext.getClientMap(datasourceCode).dropTable(tableName);}/*** 单表查询数据*/public static List<Map<String, Object>> selectDataFromTable(String datasourceCode, List<DataTableColumn> columns, String tableName, String search, Integer limit) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql = client.getSelectSql(columns, tableName, search, limit);log.info("执行语句:" + selectSql);return client.selectDataFromTable(selectSql, null);}/*** 单表查询数据*/public static List<Map<String, Object>> selectFromTable(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql = client.getFormSelectSql(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info("执行语句:" + selectSql);return client.selectDataFromTable(selectSql, params);}/*** 单表查询数据*/public static List<Map<String, Object>> selectFromForBackUp(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql = client.selectFromForBackUp(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info("执行语句:" + selectSql);return client.selectDataFromTable(selectSql, params);}/*** 单表查询数据*/public static List<Map<String, Object>> selectFromFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql = client.getFormSelectSqlForFile(columns, searchColumns, tableName, search, pageNum, pageSize, params);log.info("执行语句:" + selectSql);return client.selectDataFromTable(selectSql, params);}/*** 查询单表是否存在文件名*/public static List<Map<String, Object>> getExistOldName(String datasourceCode, String tableName, String search) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql = client.getExistOldName( tableName, search);log.info("执行语句:" + selectSql);return client.selectDataFromTable(selectSql, null);}/*** 单表查询数据(查询归集表专门使用)*/public static List<Map<String, Object>> selectCollectTable(CollectConditionVo vo) {IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());// 获取查询语句String selectSql = client.getCollectTable(vo);log.info("执行语句:" + selectSql);return client.selectDataFromTable(selectSql, vo.getParams());}/*** 单表查询数据量*/public static Map<String, Object> getFormCount(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql = client.getCountSql(columns, searchColumns, tableName, search, params);log.info("执行语句:" + selectSql);return client.getCount(selectSql, params);}/*** 查询区县库表的数据量*/public static Map<String, Object> getCountryCount(String datasourceCode, String tableName, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql ="select count(1) as count from "+tableName;log.info("执行语句:" + selectSql);return client.getCount(selectSql, params);}public static Map<String, Object> getFormCountForFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {IClient client = DataSourceContext.getClientMap(datasourceCode);// 获取查询语句String selectSql = client.getCountSqlForFile(columns, searchColumns, tableName, search, params);log.info("执行语句:" + selectSql);return client.getCount(selectSql, params);}/*** 查询表数据量*/public static Long getTableRows(String datasourceCode, String tableName) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.getTableRows(tableName);}/*** 查询表对应分区数据量*/public static Long getTablePartitionRows(String datasourceCode, String tableName, List<PartitionColumn> partitionColumns) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.getTablePartitionRows(tableName, partitionColumns);}/*** 查询表数据量*/public static Integer getTablePhysicalSize(String datasourceCode, String tableName) {IClient client = DataSourceContext.getClientMap(datasourceCode);return client.getPhysicalSize(tableName);}/*** 获取表最大值** @param datasourceCode 数据源编码* @param tableName 表名* @param incColumnName 自增列名* @return {@link Integer}*/public static Object getMaxValue(String datasourceCode, String tableName, String incColumnName, String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxValue(tableName, incColumnName, condition);}public static Object getMaxValue(String datasourceCode, String schema, String tableName, String incColumnName, String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxValue(schema, tableName, incColumnName, condition);}public static Object getMaxTime(String datasourceCode, String schema, String tableName, String incColumnName, String tongId,String condition) {return DataSourceContext.getClientMap(datasourceCode).getMaxTime(schema, tableName, incColumnName,tongId, condition);}/*** 字段存在** @param datasourceCode 数据源编码* @param tableName 表名* @param fieldName 字段名* @return {@link Boolean}*/public static Boolean fieldExist(String datasourceCode, String tableName, String fieldName) {List<ColumnDto> columns = getColumns(datasourceCode, tableName);return columns.stream().anyMatch(s -> s.getName().equalsIgnoreCase(fieldName));}/*** 数据预览 获取前十条** @return*/public static String dataView(String datasourceCode, String tableName, String condition) {return DataSourceContext.getClientMap(datasourceCode).dataView(tableName, condition);}/*** 创建分区临时表* odps适用*/public static void createPartitionedTableByColumns(List<ColumnDto> columnDtos, String tableName, String tableComment, String partitionedField, String datasourceCode) {DataSourceContext.getClientMap(datasourceCode).createPartitionedTableByColumns(columnDtos, tableName, tableComment, partitionedField);}/*** 同步执行命令*/public static void executeCommandSyn(String datasourceCode, String command, Map<String, Object> params) {DataSourceContext.getClientMap(datasourceCode).executeCommandSyn(command, params);}/*** 异步执行命令* odps适用*/public static Instance executeCommandASyn(String datasourceCode, String command, Map<String, Object> params) {return DataSourceContext.getClientMap(datasourceCode).executeCommandASyn(command, params);}/*** 是否有导出权限* odps适用** @param datasourceCode 数据源编码* @param tableName 表名* @return {@link Boolean}*/public static Boolean exportEnable(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).exportEnable(tableName);}/*** 插入单条数据** @param datasourceCode* @param vo* @return*/public static Integer insert(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).insert(vo);}/*** 批量插入数据** @param datasourceCode* @param vo* @return*/public static Integer[] betchInsert(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsert(vo);}/*** 批量插入数据** @param datasourceCode* @param vo* @return*/public static Integer[] betchInsertByConnection(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsertByConnection(vo);}/*** 这个方法不需要分装参数,直接传字段名称list就好了* @param datasourceCode* @param vo* @return*/public static Integer[] betchInsertForCommom(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).betchInsertForCommom(vo);}/*** 删除数据** @param datasourceCode* @param vo* @return*/public static Integer delete(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).delete(vo);}/*** 这个删除方法可以自定义条件服号* @param datasourceCode* @param vo* @return*/public static Integer deleteForCommon(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForCommon(vo);}public static Integer deleteForFile(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForFile(vo);}public static String deleteForPre(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).deleteForPre(vo);}/*** 修改数据** @param datasourceCode* @param vo* @return*/public static Integer update(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).update(vo);}/*** 修改数据** @param datasourceCode* @param vo* @return*/public static Integer updateForFile(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).updateForFile(vo);}/*** 获取表单基本信息** @param vo* @return*/public static TableMetaDataVo getTableBasicInfo(String datasourceCode, FormTableVo vo) {return DataSourceContext.getClientMap(datasourceCode).getTableBasicInfo(vo);}/*** 根据字段type建表*/public static void createCollectTable(List<CatalogColumnInfo> columnDtos, String tableName, String datasourceCode, String tableComment, Boolean ifPartition) {IClient client = DataSourceContext.getClientMap(datasourceCode);List<String> sqls = client.buildTableSqlForCollect(columnDtos, tableName, tableComment, ifPartition);log.info("执行建表语句为:" + JSON.toJSONString(sqls));try {sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));} catch (Exception e) {e.printStackTrace();String message = e.getMessage();if (e instanceof BizException) {BizException exception = (BizException) e;message = exception.getMsg();}log.error("建表错误=======================>{}:", message);ThreadPoolExecutor instance = ThreadPoolUtil.instance();String finalMessage = message;instance.submit(() -> {CreateTableLog createTableLog = new CreateTableLog();createTableLog.setErrorLog(finalMessage);createTableLog.setParams(JSON.toJSONString(sqls));createTableLog.setCode(tableName);CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);createTableLogService.save(createTableLog);});throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员");}}/*** 根据字段type建表*/public static void updateCollectTable(CreateCollectVo vo) {IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());List<String> sqls = client.buildTableSqlForUpdate(vo);log.info("执行更新表语句为:" + JSON.toJSONString(sqls));try {sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));} catch (Exception e) {e.printStackTrace();String message = e.getMessage();if (e instanceof BizException) {BizException exception = (BizException) e;message = exception.getMsg();}log.error("建表错误=======================>{}:", message);ThreadPoolExecutor instance = ThreadPoolUtil.instance();String finalMessage = message;instance.submit(() -> {CreateTableLog createTableLog = new CreateTableLog();createTableLog.setErrorLog(finalMessage);createTableLog.setParams(JSON.toJSONString(sqls));createTableLog.setCode(vo.getTableName());CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);createTableLogService.save(createTableLog);});log.info("建表失败了开始准备抛出了-------------------------------------->");throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员");}}/*** 获取数据源下所有表信息(包括表名,表字段数,表创建时间)** @param datasourceCode* @param tableNameLike* @return*/public static List<TableDto> getTablesDetail(String datasourceCode, String tableNameLike, Integer start, Integer pageSize, String specifyTableName) {return DataSourceContext.getClientMap(datasourceCode).getTablesDetail(tableNameLike, start, pageSize, specifyTableName);}/*** 获取表数量* @param datasourceCode* @param tableName* @return*/public static Long getTableCountSchema(String datasourceCode, String tableName) {return DataSourceContext.getClientMap(datasourceCode).getTableCountSchema(tableName);}public static Integer getTableColumnCount(String dataSourceCode, String tableName) {return DataSourceContext.getClientMap(dataSourceCode).getTableColumnCount(tableName);}public static Integer getPreTableColumnCount(String dataSourceCode, String tableName) {return DataSourceContext.getClientMap(dataSourceCode).getPreTableColumnCount(tableName);}/*** 获取符号* @return*/public static String getSymbol(String datasourceCode) {return DataSourceContext.getClientMap(datasourceCode).getSymbol();}}
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;@Slf4j
public class ReflectionUtils {private static final Map<String, Set<?>> clazzMap = new ConcurrentHashMap<>();private static final ReentrantLock clazzLock = new ReentrantLock();/*** 通过反射获取接口/抽象类的所有实现类* 通过缓存类信息减少查找时间* 接口与抽象类必须放在实现类的同级目录或者父目录*/@SuppressWarnings("unchecked")public static <T> Set<Class<? extends T>> getReflections(Class<T> clazz) {if (clazzMap.containsKey(clazz.getName())) {return (Set<Class<? extends T>>) clazzMap.get(clazz.getName());}try {clazzLock.lock();if (clazzMap.containsKey(clazz.getName())) {return (Set<Class<? extends T>>) clazzMap.get(clazz.getName());}Reflections reflections = new Reflections(clazz.getPackage().getName());Set<Class<? extends T>> subTypesOf = reflections.getSubTypesOf(clazz);clazzMap.put(clazz.getName(), subTypesOf);return subTypesOf;} catch (Exception e) {log.error("getReflections error", e);} finally {clazzLock.unlock();}return new HashSet<>();}/*** 通过反射获取新对象* @param type type* @param methodName methodName* @param clazz clazz* @return <T>*/public static <T> T getInstance(String type, String methodName, Class<T> clazz) {Set<Class<? extends T>> set = getReflections(clazz);for (Class<? extends T> t : set) {try {//排除抽象类if (Modifier.isAbstract(t.getModifiers())) {continue;}Object obj = t.getMethod(methodName).invoke(t.newInstance());if (type.equalsIgnoreCase(obj.toString())) {return t.newInstance();}} catch (Exception e) {log.error("getInstance error", e);}}throw new RuntimeException("implement class not exist");}/*** 通过反射获取新对象* @param type type* @param methodName methodName* @param clazz clazz* @return <T>*/public static <T> T getInstanceFromCache(String type, String methodName, Class<T> clazz) {return getInstance(type, methodName, clazz);}}
client客户接口端适配多种数据源
import com.ws.websocket.entity.DatasourceInfo;/*** Description:* User: wangsheng* Date: 2022-12-30* Time: 10:31*/
public interface IClient {/*** 连接数据源** @param dataSourceInfo 数据源信息* @return {@link IClient}*/IClient open(DatasourceInfo dataSourceInfo);/*** 关闭数据源*/void close();/*** 驱动类型** @return*/String driverName();/*** 数据源类型** @return {@link String}*/String type();/*** 测试数据源** @param datasourceInfo* @return*/boolean testSource(DatasourceInfo datasourceInfo);}
import com.ws.websocket.entity.DatasourceInfo;
//公共查询
public abstract class AbsClient implements IClient {protected DatasourceInfo datasourceInfo;
}
package com.ws.websocket.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;@Slf4j
public abstract class AbsRdbmsClient extends AbsClient {protected DruidDataSource druidDataSource;@Overridepublic IClient open(DatasourceInfo datasourceInfo) {RdbmsConfig rdbmsConfig = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);DruidDataSource druidDataSource = new DruidDataSource();druidDataSource.setInitialSize(5);druidDataSource.setMinIdle(30);druidDataSource.setMaxActive(300);druidDataSource.setMaxWait(10000);druidDataSource.setBreakAfterAcquireFailure(true);// 跳出重试循环druidDataSource.setConnectionErrorRetryAttempts(3);// 重试三次druidDataSource.setTimeBetweenConnectErrorMillis(3000);druidDataSource.setLoginTimeout(3);druidDataSource.setUrl(rdbmsConfig.getJdbcUrl());druidDataSource.setDriverClassName(driverName());druidDataSource.setUsername(rdbmsConfig.getUsername());//解密// druidDataSource.setPassword(RsaUtils.decode(rdbmsConfig.getPassword()));druidDataSource.setPassword(rdbmsConfig.getPassword());// 设置 MetaUtil 工具类所需参数Properties properties = new Properties();properties.put("remarks", "true");properties.put("useInformationSchema", "true");druidDataSource.setConnectProperties(properties);this.druidDataSource = druidDataSource;this.datasourceInfo = datasourceInfo;return this;}@Overridepublic void close() {druidDataSource.close();}@Overridepublic boolean testSource(DatasourceInfo datasourceInfo) {Connection connection = null;try {Class.forName(driverName());String linkJson = datasourceInfo.getLinkJson();RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);connection = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getPassword());// 有效if (connection.isValid(3)) {return true;} else {return false;}} catch (SQLException e) {log.error("数据源测试失败", e);return false;} catch (ClassNotFoundException e) {log.error("未找到驱动信息:{}", driverName());return false;} finally {if (connection != null) {try {connection.close();} catch (SQLException ex) {ex.printStackTrace();}}}}@Dataclass RdbmsConfig {private String jdbcUrl;private String username;private String password;public void setSSL() {String lowerCase = this.jdbcUrl.toLowerCase();if (!lowerCase.contains("usessl")) {if (this.jdbcUrl.contains("?")) {this.jdbcUrl = this.jdbcUrl + "&useSSL=false";} else {this.jdbcUrl = this.jdbcUrl + "?useSSL=false";}}}}
}
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;@Slf4j
public class DmClient extends AbsRdbmsClient {private String schema;@Overridepublic String type() {return "DMDB";}@Overridepublic String driverName() {return "dm.jdbc.driver.DmDriver";}@Overridepublic IClient open(DatasourceInfo datasourceInfo) {RdbmsConfig commonLinkParams = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);this.schema = StringUtils.isNotBlank(datasourceInfo.getSchemaName()) ? datasourceInfo.getSchemaName() : commonLinkParams.getUsername().toUpperCase();datasourceInfo.setSchemaName(schema);return super.open(datasourceInfo);}@Overridepublic void close() {}@Overridepublic boolean testSource(DatasourceInfo datasourceInfo) {return false;}
}
4.创建redis订阅数据源操作频道配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;/*** @Author: wangsheng* @Data: 2022/8/16 16:40*/
@Slf4j
@Configuration
public class RedisListenerConfig {/*** 订阅数据源操作频道** @param connectionFactory connectionFactory* @param dataSourceMonitor 数据源监视器* @return RedisMessageListenerContainer*/@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,DataSourceMonitor dataSourceMonitor){RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(dataSourceMonitor, new PatternTopic("DATASOURCE_CHANNEL"));log.info(dataSourceMonitor.getClass().getName() + " 订阅频道 {}", "DATASOURCE_CHANNEL");return container;}
}
5.redis监听数据源操作
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;/*** Description: redis监听数据源操作* User: wangsheng* Date: 2022-08-12* Time: 17:07*/
@Slf4j
@Component
public class DataSourceMonitor implements MessageListener {@Overridepublic void onMessage(Message message, byte[] bytes) {JSONObject box = JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));String operation = box.getString("key");if ("SAVE_OR_UPDATE".equals(operation)) {// 更新 DataSourceContextDatasourceInfo datasourceInfo = box.getObject("value", DatasourceInfo.class);if (datasourceInfo.getType().equals(0)) {String datasourceCode = datasourceInfo.getDatasourceCode();DataSourceContext.setConfigMap(datasourceCode, datasourceInfo);DataSourceContext.setClientMap(datasourceInfo);log.info("redis 监听到数据源 {} 新增或更新,更新 DataSourceContext 完成", datasourceCode);}} else {String datasourceCode = box.getString("value");// 更新 DataSourceContextDataSourceContext.removeConfigMap(datasourceCode);DataSourceContext.removeClientMap(datasourceCode);log.info("redis 监听到数据源 {} 删除,更新 DataSourceContext 完成", datasourceCode);}}}
6.创建AOP自动监听数据源变化
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** @Author: wangsheng* @Data: 2022/8/15 16:37*/
@Slf4j
@Aspect
@Component
public class DatasourceAspect {@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 新增或编辑数据源时发布 Redis 消息*/@AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.saveOrUpdateDatasourceInfo(..))", returning = "datasourceInfo")public void saveOrUpdate(JoinPoint joinPoint, DatasourceInfo datasourceInfo) {HashMap<String, Object> box = new HashMap<>(4);box.put("key", "SAVE_OR_UPDATE");box.put("value", datasourceInfo);// 发布 Redis 消息stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL",JSONObject.toJSONString(box));log.info("新增或更新数据源 {} 方法切面发布 Redis 消息完成", datasourceInfo.getDatasourceCode());}/*** 删除数据源时发布 Redis 消息*/@AfterReturning(value = "execution(* com.ws.service.DatasourceInfoService.deleteDatasourceInfo(..))", returning = "datasourceCode")public void delete(JoinPoint joinPoint, String datasourceCode) {Map<String, Object> box = new HashMap<>(4);box.put("key", "DELETE");box.put("value", datasourceCode);// 发布 Redis 消息stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));log.info("删除数据源 {} 方法切面发布Redis消息完成", datasourceCode);}
}
这样就解决了数据源连接信息自动加载更新同步的问题,但还是有个问题,当数据源重启后,缓存的连接信息会失效,且AOP无法监听到数据源重启变动,这个时候还需要一个定时任务对数据源进行连接测试,如果失效则重新连接缓存上。
7.创建定时任务
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ws.websocket.entity.DatasourceInfo;
import com.ws.websocket.service.DatasourceInfoService;
import com.ws.websocket.util.DBTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;@Component
@RequiredArgsConstructor
@Slf4j
public class DataSourceRetryConnectSchedule {@Resourceprivate DatasourceInfoService datasourceInfoService;@Resourceprivate StringRedisTemplate stringRedisTemplate;//每2小时执行一次@Scheduled(cron = "0 0 */2 * * ?")public void RetryConnect() {log.info("开始监测数据源连接");QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();queryWrapper.eq("type", 0);List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);if (CollectionUtils.isEmpty(list)) {return;}for (DatasourceInfo datasourceInfo : list) {Boolean bb = DBTool.testSource(datasourceInfo);if (!bb) {log.info("数据源重连{}"+datasourceInfo.getDatasourceName());HashMap<String, Object> box = new HashMap<>(4);box.put("key", "SAVE_OR_UPDATE");box.put("value", datasourceInfo);// 发布 Redis 消息stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));}}}
}
相关文章:
Springboot使用Redis发布订阅自动更新缓存数据源
背景 当项目有很多数据源的时候,通常会在启动的时候就把数据源连接加载缓存上,当数据源进行变更后如何自动实时将缓存的数据源进行更新呢?如果是单个项目直接调接口方法就行了,但是涉及到分布式多个系统呢? 解决方案…...

rust学习六、简单的struct结构
一、结构定义 struct-翻译为结构/结构体 总体上有两种定义方式:带有详细属性名的;不带属性名(元组) 从工程角度出发,并不推荐不带属性的定义方式,因为不友好。希望rust后面不要搞类似好像很友好ÿ…...
.NET周刊【2月第2期 2025-02-09】
国内文章 开箱即用的.NET MAUI组件库 V-Control 发布了! https://www.cnblogs.com/jevonsflash/p/18701494 文章介绍了V-Control,一个适用于.NET MAUI的组件库。作者计划将其开源,强调.NET MAUI是生产力强的跨平台移动开发工具。V-Control提供多种组件…...

Linux的基础指令和环境部署,项目部署实战(下)
目录 上一篇:Linxu的基础指令和环境部署,项目部署实战(上)-CSDN博客 1. 搭建Java部署环境 1.1 apt apt常用命令 列出所有的软件包 更新软件包数据库 安装软件包 移除软件包 1.2 JDK 1.2.1. 更新 1.2.2. 安装openjdk&am…...

【分布式理论15】分布式调度1:分布式资源调度的由来与过程
文章目录 一、操作系统的资源调度:从单核到多核二、 分布式系统的资源调度:从单台服务器到集群三、 固定资源映射四、 动态资源分配:灵活的任务-资源匹配五、 资源调度过程:从申请到执行 本文主要讨论主题: 从操作系统…...
Python常见面试题的详解12
1. hasattr ()、getattr ()、setattr () 函数是如何使用的? 要点 这三个函数用于对对象的属性进行检查、获取和设置操作,是 Python 中进行对象属性动态操作的重要工具。 hasattr():用于检查对象是否具有指定属性或方法。 getattr()&#x…...
未来AI方向落地场景:小语言模型,super_private_agent
未来AI方向落地场景:小语言模型,super_private_agent 目录 未来AI方向落地场景:小语言模型,super_private_agent小语言模型super - private - agent(注重隐私的智能代理)碳基生命和硅基生命交互界面面向agent的专用交互协议和数据接口从web平台经济到网络平台举例说明社交…...
使用 PyTorch 实现标准卷积神经网络(CNN)
卷积神经网络(CNN)是深度学习中的重要组成部分,广泛应用于图像处理、语音识别、视频分析等任务。在这篇博客中,我们将使用 PyTorch 实现一个标准的卷积神经网络(CNN),并介绍各个部分的作用。 什…...
开题报告——基于Spring Boot的垃圾分类预约回收系统
关于本科毕业设计(论文)开题报告的规定 为切实做好本科毕业设计(论文)的开题报告工作,保证论文质量,特作如下规定: 一、开题报告是本科毕业设计(论文)的必经过程,所有本科生在写作毕业设计(论文)之前都必须作开题报告。 二、开题报告主要检验学生对专业知识的驾驭能…...
YOLOv5 目标检测优化:降低误检与漏检
1. 引言 在目标检测任务中,误检(False Positive, FP)和漏检(False Negative, FN)是影响检测性能的两个主要问题。误检意味着模型检测到了不存在的目标,而漏检则指模型未能检测到真实存在的目标。本文将介绍…...

网络安全治理模型
0x02 知识点 安全的目标是提供 可用性 Avialability机密性 confidentiality完整性 Integrity真实性 Authenticity不可否认性 Nonrepudiation 安全治理是一个提供监督、问责和合规性的框架 信息安全系统 Information Security Management System ISMS 策略,工作程…...

网络原理-
文章目录 协议应用层传输层网络层 数据链路层 协议 在网络通信中,协议是非常重要的概念.协议就是一种约定. 在网络通信过程中,对协议进行了分层 接下来就按照顺序向大家介绍每一种核心的协议. 应用层 应用层是咱们程序员打交道最多的一层协议.应用层里有很多现成的协议,但…...
HTML/CSS中交集选择器
1.作用:选中同时符合多个条件的元素 交集就是或的意思 2.语法:选择器1选择器2选择器3......选择器n{} 3.举例: /* 选中:类名为beauty的p元素,此种写法用的非常的多 */p.beauty{color: red;}/* 选中:类名包含rich和beauty的元素 */.rich.beauty{color: blue;} 4.注意: 1.有标签…...
机器学习(1)安装Pytorch
1.安装命令 pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 2.安装过程Log: Looking in indexes: https://download.pytorch.org/whl/cu118 Co…...
Spring Boot过滤器链:从入门到精通
文章目录 一、过滤器链是什么?二、为什么需要过滤器链?三、Spring Boot中的过滤器链是如何工作的?(一)过滤器的生命周期(二)过滤器链的执行流程 四、如何在Spring Boot中定义自己的过滤器&#…...

vue3之echarts3D圆柱
vue3之echarts3D圆柱 效果: 版本 "echarts": "^5.1.2" 核心代码: <template><div ref"charts" class"charts"></div><svg><linearGradient id"labColor" x1"0&q…...
Redux中间件redux-thunk和redux-saga的具体区别是什么?
Redux 中间件是增强 Redux 功能的重要工具,redux-thunk 和 redux-saga 是两个常用的中间件,它们在处理异步操作和副作用时提供了不同的方式和理念。以下是两者的具体区别: 1. 概念与设计理念 redux-thunk 简洁:redux-thunk 是一…...
代码随想录算法训练营第四十三天| 动态规划06
322. 零钱兑换 如果求组合数就是外层for循环遍历物品,内层for遍历背包。 如果求排列数就是外层for遍历背包,内层for循环遍历物品。 这句话结合本题 大家要好好理解。 视频讲解:动态规划之完全背包,装满背包最少的物品件数是多少&…...

UI自动化教程 —— 元素定位技巧:精确找到你需要的页面元素
引言 在UI自动化测试中,准确地定位页面元素是至关重要的。无论是点击按钮、填写表单还是验证页面内容,都需要首先找到相应的页面元素。Playwright 提供了多种方法来实现这一点,包括使用CSS选择器和XPath进行元素定位,以及利用文本…...
MySQL六大日志的功能介绍。
前言 首先,MySQL的日志应该包括二进制日志(Binary Log)、错误日志(Error Log)、查询日志(General Query Log)、慢查询日志(Slow Query Log)、重做日志(Redo …...

C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...

关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...

ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
uniapp中使用aixos 报错
问题: 在uniapp中使用aixos,运行后报如下错误: AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...