如何写一个读写分离中间件

发表于 2年以前  | 总阅读数:339 次

公司 DBA 一直埋怨 Atlas 的难用,希望从客户端层出一个读写分离的方案。开源市场上在客户端做读写分离的开源软件基本上没有。业务方利用 Spring 自带的路由数据源能实现部分读写分离的功能,但是功能不够完善。部分参考 Sharing-JDBC 源码思想,利用部分业余时间,写了这个 Robustdb,总共只使用了十多个类,两千多行代码左右。

一、背景

随着业务量的增长,所有公司都不是直接选择分库分表设计方案的。很长一段时间内,会采用 库垂直拆分和分区表 来解决库表数据量比较大的问题,采用读写分离来解决访问压力比较大的问题。我们公司也是一样。目前绝大部分业务还是使用读写分离的方案。我相信很多公司和我们公司的架构一样,采用中间代理层做读写分离。结构如下:

第一层是 VIP 曾。通过 VIP 做中间映射层,避免了应用绑定数据库的真实 IP,这样在数据库故障时,可以通过 VIP 飘移来将流量打到另一个库。但是 VIP 无法跨机房,为未来的异地多活设计埋下绕不过去的坎。

VIP 下面一层是读写分离代理,我们公司使用的是 360 的 Atlas。Atlas 通过将 SQL 解析为 DML(Data Modify Language)和 DQL(Data Query Language),DML 的请求全部发到主库,DQL 根据配置比例分发到读库(读库包括主库和从库)。

使用 Atlas 有以下不足:

  • Altas 不再维护更新,现存一些 bug,bug 网上很多描述;
  • Altas 中没有具体应用请求 IP 与具体数据库 IP 之间的映射数据,所以无法准确查到访问DB的请求是来自哪个应用;
  • Altas 控制的粒度是 SQL 语句,只能指定某条查询 SQL 语句走主库,不能根据场景指定;
  • DB 在自动关闭某个与 Altas 之间的连接时,Altas 不会刷新,它仍有可能把这个失效的连接给下次请求的应用使用;
  • 使用 Altas,对后期增加其他功能模会比较麻烦。

基于 Atlas 以上问题,以及我们需要将数据库账号和连接配置集中管控。我们设计了下面这套方案:

通过在客户端做读写分离可以解决 Atlas 上面存在的不足。整个流程如下图所示:

二、Robustdb 原理

1、读写分离设计核心点——路由

支持每条 SQL 按照 DML、DQL 类型的默认路由。

需求描述

目前公司采用读写分离的方案来增强数据库的性能,所有的 DML(insert、updata、delete)操作在主库,通过 MySQL 的 binlog 同步,将数据同步到多个读库。所有的 DQL(select) 操作主库或从库,从而增强数据的读能力。

支持方法级别的指定路由

需求描述

在 Service 中指定方法中所有 DB 操作方法操作同一个数据库(主要是主库),保证方法中的 DB 读写都操作主库,避免数据同步延迟导致读从库数据异常。从而保证整个方法的事务属性。

解决思路

我们将获取真实数据库(主库还是哪个从库)放到需要建立连接时的地方,为此我们创建了 BackendConnection(传统是先连接数据库,然后再创建连接)。

在获取数据库连接时,通过对请求的 SQL 进行解析和类型判别,识别为 DML 和 DQL。如果是 DML,则在线程的单 SQL 线程本地变量上设置为 master,DQL 则设置为 slave,为后续选择数据库提供选择参考。

如果要支持方法级别的事务(也就是整个方法的 SQL 请求都发送到主库),需要借助拦截器,我们采用的是 AspectJ 方式的拦截器。会拦截所有带有类型为 dataSourceType 的 annotation 的方法。在执行方法前,在线程的多 SQL 线程本地变量上设置 dataSourceType 的 name 值(name 值为 master 代表走主库,name 值为 slave 代表走从库)。线程的多 SQL 线程本地变量为后续选择数据库提供选择参考。在方法执行完后,清理本地线程变量。

多 SQL 线程本地变量的优先级高于单 SQL 线程本地变量的优先级。

注意点

本地线程变量要使用阿里包装的 Ttl,防止用户在方法内部启动线程池,导致普通的线程本地变量丢失,从而导致选库异常。

使用 Ttl 之后,需要在公司的 JVM 启动参数中增加

-javaagent:/{Path}/transmittable-thread-local-2.6.0-SNAPSHOT.jar

原理就是在 JVM 启动时,加载 transmittable-thread-local 中的类替换逻辑,将以后的 Runnable、Callable、ExecuteService 等线程池相关类替换成增强后的 TtlRunnable、TtlCallable、TtlExecuteService 等。

下面展示一下时序图中类的核心代码,仅供参考:

DataSoueceAspect


@Aspect
@Component
public class DataSourceAspect{
    @Around("execution(* *(..)) && @annotation(dataSourceType)")
    public Object aroundMethod(ProceedingJoinPoint pjd, DataSourceType dataSourceType) throws Throwable {      DataSourceContextHolder.setMultiSqlDataSourceType(dataSourceType.name());
        Object result = pjd.proceed();
        DataSourceContextHolder.clearMultiSqlDataSourceType();
        return result;
    }
}

BackendConnection


public final class BackendConnection extends AbstractConnectionAdapter {

    private AbstractRoutingDataSource abstractRoutingDataSource;

    // 用于缓存一条sql(可能对应多个statement)或者一次事务中的连接
    private final Map<String, Connection> connectionMap = new HashMap<String, Connection>();

    //构造函数
    public BackendConnection(AbstractRoutingDataSource abstractRoutingDataSource) {
        this.abstractRoutingDataSource = abstractRoutingDataSource;
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return getConnectionInternal(sql).prepareStatement(sql);
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        if(connectionMap == null || connectionMap.isEmpty()){
            return abstractRoutingDataSource.getResolvedDefaultDataSource().getConnection().getMetaData();
        }
        return fetchCachedConnection(connectionMap.keySet().iterator().next().toString()).get().getMetaData();
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
            throws SQLException {
        return getConnectionInternal(sql).prepareStatement(sql,resultSetType,resultSetConcurrency);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
            int resultSetHoldability) throws SQLException {
        return getConnectionInternal(sql).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return getConnectionInternal(sql).prepareStatement(sql, autoGeneratedKeys);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return getConnectionInternal(sql).prepareStatement(sql, columnIndexes);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return getConnectionInternal(sql).prepareStatement(sql, columnNames);
    }

    @Override
    protected Collection<Connection> getConnections() {
        return connectionMap.values();
    }

    /**
     * 根据sql获取连接,对连接进行缓存
     * @param sql
     * @return
     * @throws SQLException
     */
    private Connection getConnectionInternal(final String sql) throws SQLException {
        //设置线程环境遍历
        if (ExecutionEventUtil.isDML(sql)) {
            DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.MASTER);
        } else if (ExecutionEventUtil.isDQL(sql)) {
            DataSourceContextHolder.setSingleSqlDataSourceType(DataSourceType.SLAVE);
        }
        //根据上面设置的环境变量,选择相应的数据源
        Object dataSourceKey = abstractRoutingDataSource.determineCurrentLookupKey();
        String dataSourceName = dataSourceKey.toString();

        //看缓存中是否已经含有相应数据源的连接
        Optional<Connection> connectionOptional = fetchCachedConnection(dataSourceName);
        if (connectionOptional.isPresent()) {
            return connectionOptional.get();
        }
        //缓存中没有相应连接,建立相应连接,并放入缓存
        Connection connection = abstractRoutingDataSource.getTargetDataSource(dataSourceKey).getConnection();
        connection.setAutoCommit(super.getAutoCommit());
        connection.setTransactionIsolation(super.getTransactionIsolation());

        connectionMap.put(dataSourceKey.toString(), connection);

        return connection;
    }

    /**
     * 从缓存中取数据源
     * @param dataSourceName
     * @return
     */
    private Optional<Connection> fetchCachedConnection(final String dataSourceName) {
        if (connectionMap.containsKey(dataSourceName)) {
            return Optional.of(connectionMap.get(dataSourceName));
        }
        return Optional.absent();
    }

}

AbstractRoutingDataSource


/**
 * 
 * @Type AbstractRoutingDataSource
 * @Desc 数据源路由器(spring的AbstractRoutingDataSource将resolvedDataSources的注入放在bean初始化)
 * @Version V1.0
 */
public abstract class AbstractRoutingDataSource extends AbstractDataSource {

    private boolean lenientFallback = true;
    private Map<Object, Object> targetDataSources;
    private Object defaultTargetDataSource;
    private Map<Object, DataSource> resolvedDataSources = new HashMap<Object, DataSource>();
    private DataSource resolvedDefaultDataSource;
    private Logger logger = LoggerFactory.getLogger(AbstractRoutingDataSource.class);

    public BackendConnection getConnection() throws SQLException {
        return new BackendConnection(this);
    }

    public BackendConnection getConnection(String username, String password)
            throws SQLException {
        return new BackendConnection(this);
    }

    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property 'targetDataSources' is required");
        }
        this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());
        for (Map.Entry entry : this.targetDataSources.entrySet()) {
            Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
            DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
            this.resolvedDataSources.put(lookupKey, dataSource);
        }
        if (this.defaultTargetDataSource != null) {
            this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
        }
    }

    public void putNewDataSource(Object key, DataSource dataSource){
        if(this.resolvedDataSources == null){
            this.resolvedDataSources = new HashMap<Object, DataSource>();
        }
        if(this.resolvedDataSources.containsKey(key)){
            this.resolvedDataSources.remove(key);
            logger.info("remove old key:" + key);
        }
        logger.info("add key:" + key + ", value=" + dataSource);
        this.resolvedDataSources.put(key, dataSource);
    }

    /**
     * 数据源选择逻辑
     */
    public DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");

        Object lookupKey = determineCurrentLookupKey();
        DataSourceContextHolder.clearSingleSqlDataSourceType();

        int index = 0;
        for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) {
            logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
            index++;
        }
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        logger.debug("myAbstractDS, hit DS is " + dataSource.toString());
        return dataSource;
    }

    public DataSource getTargetDataSource(Object lookupKey) {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");

        if(lookupKey == null){
            lookupKey = determineCurrentLookupKey();
        }
        DataSourceContextHolder.clearSingleSqlDataSourceType();

        int index = 0;
        for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) {
            logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
            index++;
        }
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        logger.debug("myAbstractDS, hit DS is " + dataSource.toString());
        return dataSource;
    }

    public abstract Object determineCurrentLookupKey();
    public abstract Object getCurrentSlaveKey();

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)){
            return (T) this;
        }
        return determineTargetDataSource().unwrap(iface);
    }

    protected Object resolveSpecifiedLookupKey(Object lookupKey) {
        return lookupKey;
    }

    protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
        if (dataSource instanceof DataSource) {
            return (DataSource) dataSource;
        }
        else {
            throw new IllegalArgumentException(
                    "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
        }
    }
    //get set方法省略
}

AbstractRoutingDataSource


/**
 * 
 * @Type AbstractRoutingDataSource
 * @Desc 数据源路由器(spring的AbstractRoutingDataSource将resolvedDataSources的注入放在bean初始化)
 * @Version V1.0
 */
public abstract class AbstractRoutingDataSource extends AbstractDataSource {

    private boolean lenientFallback = true;
    private Map<Object, Object> targetDataSources;
    private Object defaultTargetDataSource;
    private Map<Object, DataSource> resolvedDataSources = new HashMap<Object, DataSource>();
    private DataSource resolvedDefaultDataSource;
    private Logger logger = LoggerFactory.getLogger(AbstractRoutingDataSource.class);

    public BackendConnection getConnection() throws SQLException {
        return new BackendConnection(this);
    }

    public BackendConnection getConnection(String username, String password)
            throws SQLException {
        return new BackendConnection(this);
    }

    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property 'targetDataSources' is required");
        }
        this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());
        for (Map.Entry entry : this.targetDataSources.entrySet()) {
            Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
            DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
            this.resolvedDataSources.put(lookupKey, dataSource);
        }
        if (this.defaultTargetDataSource != null) {
            this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
        }
    }

    public void putNewDataSource(Object key, DataSource dataSource){
        if(this.resolvedDataSources == null){
            this.resolvedDataSources = new HashMap<Object, DataSource>();
        }
        if(this.resolvedDataSources.containsKey(key)){
            this.resolvedDataSources.remove(key);
            logger.info("remove old key:" + key);
        }
        logger.info("add key:" + key + ", value=" + dataSource);
        this.resolvedDataSources.put(key, dataSource);
    }

    /**
     * 数据源选择逻辑
     */
    public DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");

        Object lookupKey = determineCurrentLookupKey();
        DataSourceContextHolder.clearSingleSqlDataSourceType();

        int index = 0;
        for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) {
            logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
            index++;
        }
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        logger.debug("myAbstractDS, hit DS is " + dataSource.toString());
        return dataSource;
    }

    public DataSource getTargetDataSource(Object lookupKey) {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");

        if(lookupKey == null){
            lookupKey = determineCurrentLookupKey();
        }
        DataSourceContextHolder.clearSingleSqlDataSourceType();

        int index = 0;
        for (Entry<Object, DataSource> element : resolvedDataSources.entrySet()) {
            logger.debug("myAbstractDS, index:" + index + ", key:" + element.getKey() + ", value:" + element.getValue().toString());
            index++;
        }
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        logger.debug("myAbstractDS, hit DS is " + dataSource.toString());
        return dataSource;
    }

    public abstract Object determineCurrentLookupKey();
    public abstract Object getCurrentSlaveKey();

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)){
            return (T) this;
        }
        return determineTargetDataSource().unwrap(iface);
    }

    protected Object resolveSpecifiedLookupKey(Object lookupKey) {
        return lookupKey;
    }

    protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
        if (dataSource instanceof DataSource) {
            return (DataSource) dataSource;
        }
        else {
            throw new IllegalArgumentException(
                    "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
        }
    }
    //get set方法省略
}

DataSourceContextHolder

public class DataSourceContextHolder {

    private static final TransmittableThreadLocal<String> singleSqlContextHolder = new TransmittableThreadLocal<String>();
    private static final TransmittableThreadLocal<String> multiSqlContextHolder = new TransmittableThreadLocal<String>();

    /**
     * @Description: 设置单条sql数据源类型
     * @param dataSourceType  数据库类型
     * @return void
     * @throws
     */
    public static void setSingleSqlDataSourceType(String dataSourceType) {
        singleSqlContextHolder.set(dataSourceType);
    }

    /**
     * @Description: 获取单条sql数据源类型
     * @param 
     * @return String
     * @throws
     */
    public static String getSingleSqlDataSourceType() {
        return singleSqlContextHolder.get();
    }

    /**
     * @Description: 清除单条sql数据源类型
     * @param 
     * @return void
     * @throws
     */
    public static void clearSingleSqlDataSourceType() {
        singleSqlContextHolder.remove();
    }

    /**
     * @Description: 设置多条sql数据源类型
     * @param dataSourceType  数据库类型
     * @return void
     * @throws
     */
    public static void setMultiSqlDataSourceType(String dataSourceType) {
        multiSqlContextHolder.set(dataSourceType);
    }

    /**
     * @Description: 获取多条sql数据源类型
     * @param 
     * @return String
     * @throws
     */
    public static String getMultiSqlDataSourceType() {
        return multiSqlContextHolder.get();
    }

    /**
     * @Description: 清除多条sql数据源类型
     * @param 
     * @return void
     * @throws
     */
    public static void clearMultiSqlDataSourceType() {
        multiSqlContextHolder.remove();
    }

    /**
     * 判断当前线程是否为使用从库为数据源. 最外层service有slave的aop标签  或者 service没有aop标签且单条sql为DQL
     * 
     * @return
     */
    public static boolean isSlave() {
        return "slave".equals(multiSqlContextHolder.get()) || (multiSqlContextHolder.get()==null && "slave".equals(singleSqlContextHolder.get())) ;
    }  
}

DynamicDataSource


public class DynamicDataSource extends AbstractRoutingDataSource implements InitializingBean{  

    private static final Logger logger = LoggerFactory.getLogger(DynamicDataSource.class); 
    private Integer slaveCount = 0;  

    // 轮询计数,初始为-1,AtomicInteger是线程安全的  
    private AtomicInteger counter = new AtomicInteger(-1); 

    // 记录读库的key  
    private List<Object> slaveDataSources = new ArrayList<Object>(0); 

    // slave库的权重
    private  Map<Object,Integer>  slaveDataSourcesWeight;

    private Object currentSlaveKey;

    public DynamicDataSource() {
        super();
    }

    /**
     * 构造函数
     * @param defaultTargetDataSource
     * @param targetDataSources
     * @param slaveDataSourcesWeight
     */
    public DynamicDataSource(Object defaultTargetDataSource, Map<Object,Object> targetDataSources, Map<Object,Integer> slaveDataSourcesWeight) {
        this.setResolvedDataSources(new HashMap<Object, DataSource>(targetDataSources.size()));
        for (Map.Entry<Object, Object> entry : targetDataSources.entrySet()) {
            DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
            this.putNewDataSource(entry.getKey(), dataSource);
        }
        if (defaultTargetDataSource != null) {
            this.setResolvedDefaultDataSource(resolveSpecifiedDataSource(defaultTargetDataSource));
        }
        this.setSlaveDataSourcesWeight(slaveDataSourcesWeight);
        this.afterPropertiesSet();
    }

    @Override  
    public Object determineCurrentLookupKey() {  
        // 使用DataSourceContextHolder保证线程安全,并且得到当前线程中的数据源key  
        if (DataSourceContextHolder.isSlave()) {  
            currentSlaveKey = getSlaveKey();  
            return currentSlaveKey;  
        }  
        //TODO
        Object key = "master";  
        return key;  
    }  

    @Override  
    public void afterPropertiesSet() {  
        try {  
            super.afterPropertiesSet();
            Map<Object, DataSource> resolvedDataSources = this.getResolvedDataSources();  
            //清空从库节点,重新生成
            slaveDataSources.clear();
            slaveCount = 0;
            for (Map.Entry<Object, DataSource> entry : resolvedDataSources.entrySet()) { 
                if(slaveDataSourcesWeight.get(entry.getKey())==null){
                    continue;
                }
                for(int i=0; i<slaveDataSourcesWeight.get(entry.getKey());i++){
                    slaveDataSources.add(entry.getKey());  
                    slaveCount++;
                } 
            }  
        } catch (Exception e) {  
            logger.error("afterPropertiesSet error! ", e);  
        }  
    }  

    /** 
     * 轮询算法实现 
     *  
     * @return 
     */  
    public Object getSlaveKey() {  
        if(slaveCount <= 0 || slaveDataSources == null || slaveDataSources.size() <= 0){
            return null;
        }
        Integer index = counter.incrementAndGet() % slaveCount;  
        if (counter.get() > 9999) { // 以免超出Integer范围  
            counter.set(-1); // 还原  
        }  
        return slaveDataSources.get(index);  
    }

    public Map<Object, Integer> getSlaveDataSourcesWeight() {
        return slaveDataSourcesWeight;
    }

    public void setSlaveDataSourcesWeight(Map<Object, Integer> slaveDataSourcesWeight) {
        this.slaveDataSourcesWeight = slaveDataSourcesWeight;
    }

    public Object getCurrentSlaveKey() {
        return currentSlaveKey;
    }
}

2、读库流量分配策略设计

我们所有的数据库连接都是管控起来的,包括每个库的流量配置都是支持动态分配的。

支持读库按不同比例承接读请求。通过配置页面动态调整应用的数据库连接以及比例,支持随机或者顺序的方式将流量分配到相应的读库中去。

这里我们使用的配置管理下发中心是我们公司自己开发的 gconfig,当然替换成开源的 diamond 或者 applo 也是可以的。

当接收到配管中心的调整指令,会动态更新应用数据源连接,然后更新 beanFactory 中的 datasource。核心函数如下:

/**
 * 更新beanFactory
 * @param properties
 */
public void refreshDataSource(String properties) {
    YamlDynamicDataSource dataSource;
    try {
        dataSource = new YamlDynamicDataSource(properties);
    } catch (IOException e) {
        throw new RuntimeException("convert datasource config failed!");
    }

    // 验证必须字段是否存在
    if (dataSource == null && dataSource.getResolvedDataSources() == null
            || dataSource.getResolvedDefaultDataSource() == null || dataSource.getSlaveDataSourcesWeight() == null) {
        throw new RuntimeException("datasource config error!");
    }
    ConcurrentHashMap<Object, DataSource> newDataSource = new ConcurrentHashMap<Object, DataSource>(
            dataSource.getResolvedDataSources());

    //更新数据源的bean
    DynamicDataSource dynamicDataSource = (DynamicDataSource) ((DefaultListableBeanFactory) beanFactory)
            .getBean(dataSourceName);
    dynamicDataSource.setResolvedDefaultDataSource(dataSource.getResolvedDefaultDataSource());
    dynamicDataSource.setResolvedDataSources(new HashMap<Object, DataSource>());//将数据源清空,重新添加
    for (Entry<Object, DataSource> element : newDataSource.entrySet()) {
        dynamicDataSource.putNewDataSource(element.getKey(), element.getValue());
    }
    dynamicDataSource.setSlaveDataSourcesWeight(dataSource.getSlaveDataSourcesWeight());
    dynamicDataSource.afterPropertiesSet();

}

三、性能

我们经过性能测试,发现 Robustdb 的性能在一定层度上比 Atlas 性能更好。压测结果如下:

参考

  • https://tech.meituan.com/mtddl.html
  • https://tech.meituan.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E9%AB%98%E5%8F%AF%E7%94%A8%E6%9E%B6%E6%9E%84%E7%9A%84%E6%BC%94%E8%BF%9B%E4%B8%8E%E8%AE%BE%E6%83%B3.html

本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/OqPcavESojpy920K5DIGtg

 相关推荐

刘强东夫妇:“移民美国”传言被驳斥

京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。

发布于:1年以前  |  808次阅读  |  详细内容 »

博主曝三大运营商,将集体采购百万台华为Mate60系列

日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。

发布于:1年以前  |  770次阅读  |  详细内容 »

ASML CEO警告:出口管制不是可行做法,不要“逼迫中国大陆创新”

据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。

发布于:1年以前  |  756次阅读  |  详细内容 »

抖音中长视频App青桃更名抖音精选,字节再发力对抗B站

今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。

发布于:1年以前  |  648次阅读  |  详细内容 »

威马CDO:中国每百户家庭仅17户有车

日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。

发布于:1年以前  |  589次阅读  |  详细内容 »

研究发现维生素 C 等抗氧化剂会刺激癌症生长和转移

近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。

发布于:1年以前  |  449次阅读  |  详细内容 »

苹果据称正引入3D打印技术,用以生产智能手表的钢质底盘

据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。

发布于:1年以前  |  446次阅读  |  详细内容 »

千万级抖音网红秀才账号被封禁

9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...

发布于:1年以前  |  445次阅读  |  详细内容 »

亚马逊股东起诉公司和贝索斯,称其在购买卫星发射服务时忽视了 SpaceX

9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。

发布于:1年以前  |  444次阅读  |  详细内容 »

苹果上线AppsbyApple网站,以推广自家应用程序

据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。

发布于:1年以前  |  442次阅读  |  详细内容 »

特斯拉美国降价引发投资者不满:“这是短期麻醉剂”

特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。

发布于:1年以前  |  441次阅读  |  详细内容 »

光刻机巨头阿斯麦:拿到许可,继续对华出口

据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。

发布于:1年以前  |  437次阅读  |  详细内容 »

马斯克与库克首次隔空合作:为苹果提供卫星服务

近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。

发布于:1年以前  |  430次阅读  |  详细内容 »

𝕏(推特)调整隐私政策,可拿用户发布的信息训练 AI 模型

据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。

发布于:1年以前  |  428次阅读  |  详细内容 »

荣耀CEO谈华为手机回归:替老同事们高兴,对行业也是好事

9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI操控无人机能力超越人类冠军

《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。

发布于:1年以前  |  423次阅读  |  详细内容 »

AI生成的蘑菇科普书存在可致命错误

近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。

发布于:1年以前  |  420次阅读  |  详细内容 »

社交媒体平台𝕏计划收集用户生物识别数据与工作教育经历

社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”

发布于:1年以前  |  411次阅读  |  详细内容 »

国产扫地机器人热销欧洲,国产割草机器人抢占欧洲草坪

2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。

发布于:1年以前  |  406次阅读  |  详细内容 »

罗永浩吐槽iPhone15和14不会有区别,除了序列号变了

罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。

发布于:1年以前  |  398次阅读  |  详细内容 »
 相关文章
Android插件化方案 5年以前  |  237231次阅读
vscode超好用的代码书签插件Bookmarks 2年以前  |  8065次阅读
 目录