Source code design is not as complicated as you think. If you don't believe me, take a look at how MyBatis implements DataSource

In this article, we introduce the design and implementation of DataSource in MyBatis. It's actually quite easy.

DataSource

First of all, be clear that DataSource belongs to the foundation (basic support) layer of MyBatis's three-layer architecture:

[Figure: MyBatis three-layer architecture]

Now let's take a look at the specific implementation.
In the data persistence layer, the data source is a very important component, and its performance directly affects the performance of the whole layer. In real projects we usually use third-party data sources such as Apache Commons DBCP, C3P0, or Druid. MyBatis can integrate these third-party data sources, and it also ships its own data source implementations.

Two implementations of javax.sql.DataSource interface are provided in MyBatis, namely PooledDataSource and UnpooledDataSource.

1 DataSourceFactory

DataSourceFactory is used to create DataSource objects. The interface declares two methods:

public interface DataSourceFactory {
  // Sets the DataSource properties; usually called right after the factory is instantiated
  void setProperties(Properties props);

  // Returns the DataSource object
  DataSource getDataSource();

}

The DataSourceFactory interface has two concrete implementations, UnpooledDataSourceFactory and PooledDataSourceFactory. As the names suggest, the former creates data sources without a connection pool and the latter creates data sources backed by a connection pool. Let's first look at the setProperties method in UnpooledDataSourceFactory:

  /**
   * Configures the UnpooledDataSource
   * @param properties the properties the DataSource needs
   */
  @Override
  public void setProperties(Properties properties) {
    Properties driverProperties = new Properties();
    // create the MetaObject that wraps the DataSource
    MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
    // iterate over the Properties, which hold the data source configuration
    for (Object key : properties.keySet()) {
      String propertyName = (String) key; // property name
      if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
        // entries prefixed with "driver." are JDBC driver properties; strip the prefix and collect them
        String value = properties.getProperty(propertyName);
        driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
      } else if (metaDataSource.hasSetter(propertyName)) {
        // the DataSource has a setter for this property
        String value = (String) properties.get(propertyName);
        Object convertedValue = convertValue(metaDataSource, propertyName, value);
        // set the property on the DataSource
        metaDataSource.setValue(propertyName, convertedValue);
      } else {
        throw new DataSourceException("Unknown DataSource property: " + propertyName);
      }
    }
    if (driverProperties.size() > 0) {
      // set DataSource.driverProperties
      metaDataSource.setValue("driverProperties", driverProperties);
    }
  }

The getDataSource method of UnpooledDataSourceFactory is simple: it directly returns the UnpooledDataSource object held in the dataSource field.
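To make the "driver." prefix handling concrete, here is a minimal standalone sketch that uses only java.util.Properties. The class name DriverPropertySplitter and the sample keys are hypothetical and are not part of MyBatis; the real factory additionally routes the remaining keys to setters through MetaObject.

```java
import java.util.Properties;

// Hypothetical helper, not a MyBatis class: isolates the "driver." prefix rule.
final class DriverPropertySplitter {
    private static final String DRIVER_PROPERTY_PREFIX = "driver.";

    // Returns only the entries whose key starts with "driver.", with the prefix removed.
    static Properties extractDriverProperties(Properties all) {
        Properties driverProperties = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(DRIVER_PROPERTY_PREFIX)) {
                driverProperties.setProperty(
                    name.substring(DRIVER_PROPERTY_PREFIX.length()),
                    all.getProperty(name));
            }
        }
        return driverProperties;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("url", "jdbc:h2:mem:demo");   // would be applied through a setter
        props.setProperty("driver.encoding", "UTF8");   // would end up in driverProperties
        System.out.println(extractDriverProperties(props)); // prints {encoding=UTF8}
    }
}
```

Stripping the prefix means the JDBC driver sees the key it expects ("encoding") rather than the MyBatis-specific one ("driver.encoding").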

2 UnpooledDataSource

UnpooledDataSource is one implementation of the DataSource interface, but it provides no connection pooling: every request opens a new connection. Let's look at its implementation.

It declares the following fields:

  private ClassLoader driverClassLoader; // class loader used to load the Driver
  private Properties driverProperties; // extra properties passed to the JDBC driver
  // cache of all registered JDBC drivers
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();

  private String driver; // driver class name
  private String url; // database URL
  private String username; // user name
  private String password; // password

  private Boolean autoCommit; // whether to auto-commit
  private Integer defaultTransactionIsolationLevel; // transaction isolation level
  private Integer defaultNetworkTimeout; // default network timeout, in milliseconds

The static initializer then copies the drivers already registered with DriverManager into registeredDrivers:

  static {
    // fetch the drivers registered with DriverManager
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      // record each driver in the map, keyed by class name
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

Every getConnection variant in UnpooledDataSource ends up calling the doGetConnection() method:

  private Connection doGetConnection(Properties properties) throws SQLException {
    // initialize the database driver
    initializeDriver();
    // create the real database connection
    Connection connection = DriverManager.getConnection(url, properties);
    // configure the Connection's auto-commit and transaction isolation level
    configureConnection(connection);
    return connection;
  }
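The Properties argument passed to doGetConnection has to carry the credentials as well as the driver properties. The sketch below shows one plausible way to assemble it. ConnectionProps is a hypothetical helper, and the merge order is an assumption rather than a quote from MyBatis; the "user" and "password" keys are the ones DriverManager.getConnection(String, Properties) documents.

```java
import java.util.Properties;

// Hypothetical helper: builds the Properties handed to DriverManager.getConnection(url, props).
final class ConnectionProps {
    static Properties build(Properties driverProperties, String username, String password) {
        Properties props = new Properties();
        if (driverProperties != null) {
            props.putAll(driverProperties);            // driver-specific settings first
        }
        if (username != null) {
            props.setProperty("user", username);       // standard JDBC key
        }
        if (password != null) {
            props.setProperty("password", password);   // standard JDBC key
        }
        return props;
    }
}
```

Because the credentials are written last, they win over any "user" or "password" entry that happened to be present in the driver properties.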

3 PooledDataSource

Anyone with development experience knows that creating a database connection is expensive and that a database can only accept a limited number of connections, which is why connection pools matter. A pool lets us reuse connections, improves response time, keeps the number of connections bounded so the database is not overwhelmed, and helps avoid connection leaks.

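First, the fields it declares:

  // tracks the state of the pool
  private final PoolState state = new PoolState(this);

  // the UnpooledDataSource used to create the real database connections
  private final UnpooledDataSource dataSource;

  // OPTIONAL CONFIGURATION FIELDS
  protected int poolMaximumActiveConnections = 10; // maximum number of active connections
  protected int poolMaximumIdleConnections = 5; // maximum number of idle connections
  protected int poolMaximumCheckoutTime = 20000; // maximum checkout time, in milliseconds
  protected int poolTimeToWait = 20000; // how long a thread waits when no connection is available, in milliseconds
  protected int poolMaximumLocalBadConnectionTolerance = 3; // extra bad connections a thread tolerates before giving up
  protected String poolPingQuery = "NO PING QUERY SET"; // SQL statement used to test a connection
  protected boolean poolPingEnabled; // whether the test SQL statement may be sent
  // when a connection has been unused for more than poolPingConnectionsNotUsedFor milliseconds, a test SQL statement is sent to check that it still works
  protected int poolPingConnectionsNotUsedFor;
  // hash value generated from the database URL, user name, and password
  private int expectedConnectionTypeCode;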

Then focus on the getConnection method, which is used to provide the caller with a Connection object.

  @Override
  public Connection getConnection() throws SQLException {
    return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
  }

getConnection delegates to popConnection, which returns a PooledConnection. PooledConnection implements the InvocationHandler interface, so the Connection handed to the caller by getProxyConnection is a JDK dynamic proxy. Its fields are:

  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

  private final int hashCode;
  private final PooledDataSource dataSource;
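  // the real database connection
  private final Connection realConnection;
  // the proxy object for the database connection
  private final Connection proxyConnection;
  private long checkoutTimestamp; // timestamp when the connection was taken from the pool
  private long createdTimestamp; // timestamp when the connection was created
  private long lastUsedTimestamp; // timestamp when the connection was last used
  private int connectionTypeCode; // hash computed from the database URL, user name, and password; identifies the pool the connection belongs to
  // flag indicating whether the connection is valid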
  private boolean valid;

Now focus on the invoke method:

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.equals(methodName)) {
      // close() returns the connection to the pool instead of really closing the database connection
      dataSource.pushConnection(this);
      return null;
    }
    try {
      if (!Object.class.equals(method.getDeclaringClass())) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        // checks the valid field above to make sure the connection is still usable
        checkConnection();
      }
      // call the corresponding method on the real database connection
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }

  }
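The close-interception trick can be tried without a database. The following sketch uses a tiny hypothetical Resource interface in place of java.sql.Connection and a plain list in place of the pool; all class names here are illustrative assumptions, and only java.lang.reflect.Proxy and InvocationHandler are real APIs.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

final class CloseInterceptDemo {
    // Hypothetical stand-in for java.sql.Connection.
    interface Resource {
        String query(String sql);
        void close();
    }

    static final class RealResource implements Resource {
        boolean closed = false;
        public String query(String sql) { return "result of " + sql; }
        public void close() { closed = true; }
    }

    // Plays the role of PooledConnection: close() returns the resource to the idle list.
    static final class PooledHandler implements InvocationHandler {
        private final RealResource real;
        private final List<RealResource> idle;

        PooledHandler(RealResource real, List<RealResource> idle) {
            this.real = real;
            this.idle = idle;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if ("close".equals(method.getName())) {
                idle.add(real);   // hand back to the pool; the real resource stays open
                return null;
            }
            return method.invoke(real, args);   // everything else is delegated
        }
    }

    // Wraps a real resource in a dynamic proxy, as getProxyConnection does for a Connection.
    static Resource wrap(RealResource real, List<RealResource> idle) {
        return (Resource) Proxy.newProxyInstance(
            Resource.class.getClassLoader(),
            new Class<?>[] { Resource.class },
            new PooledHandler(real, idle));
    }
}
```

Callers keep writing the usual close() call (or try-with-resources in the JDBC case), and the pool decides what closing really means.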

Then there is the PoolState object mentioned earlier, which manages the state of the PooledConnection objects. It keeps idle connections and active connections in two ArrayList collections and also records pool statistics. It is defined as follows:

  protected PooledDataSource dataSource;
  // idle connections
  protected final List<PooledConnection> idleConnections = new ArrayList<>();
  // active connections
  protected final List<PooledConnection> activeConnections = new ArrayList<>();
  protected long requestCount = 0; // number of connection requests
  protected long accumulatedRequestTime = 0; // total time spent obtaining connections
  // checkout time is how long the application holds a connection between taking it from the pool and returning it
  // accumulatedCheckoutTime is the total checkout time across all connections
  protected long accumulatedCheckoutTime = 0;
  // a connection that is not returned for too long is considered overdue
  // claimedOverdueConnectionCount is the number of overdue connections claimed
  protected long claimedOverdueConnectionCount = 0;
  // total checkout time of overdue connections
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  // total time spent waiting
  protected long accumulatedWaitTime = 0;
  // number of times a thread had to wait
  protected long hadToWaitCount = 0;
  // number of bad connections
  protected long badConnectionCount = 0;

Let's go back to the popConnection method

  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      synchronized (state) { // lock the pool state
        if (!state.idleConnections.isEmpty()) { // is there an idle connection?
          // Pool has available connection
          conn = state.idleConnections.remove(0); // take the first idle connection
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else { // the pool has no idle connection
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) { // below the active limit, so a new connection may be created
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else { // the active limit has been reached, so no new connection may be created
            // Cannot create new connection; look at the oldest active connection (first in the list)
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            // how long that connection has been checked out
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            // has it exceeded the maximum checkout time?
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Can claim overdue connection; update the overdue statistics
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              // remove the overdue connection from activeConnections
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                // roll back if the overdue connection is not in auto-commit mode
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              // wrap the same real connection in a new PooledConnection; no new database connection is created
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              // invalidate the overdue PooledConnection
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // Must wait: no idle connection, cannot create one, and none is overdue
              try {
                if (!countedWait) {
                  state.hadToWaitCount++; // count the wait once per request
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait); // block until notified or timed out
                // accumulate the time spent waiting
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // ping to server and check the connection is valid or not
          // check whether the PooledConnection is valid
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            // set the PooledConnection's bookkeeping fields
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++; // update the statistics
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
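      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }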
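The order in which popConnection tries its options can be condensed into a small pure function. This is only a sketch of the branch order in the loop above; BorrowDecision and its Action enum are hypothetical names, and the validity check and statistics are left out.

```java
// Hypothetical condensation of the branch order in popConnection.
final class BorrowDecision {
    enum Action { REUSE_IDLE, CREATE_NEW, CLAIM_OVERDUE, WAIT }

    static Action decide(int idleCount, int activeCount, int maxActive,
                         long oldestCheckoutMillis, long maxCheckoutMillis) {
        if (idleCount > 0) {
            return Action.REUSE_IDLE;        // 1. an idle connection is available
        }
        if (activeCount < maxActive) {
            return Action.CREATE_NEW;        // 2. still below poolMaximumActiveConnections
        }
        if (oldestCheckoutMillis > maxCheckoutMillis) {
            return Action.CLAIM_OVERDUE;     // 3. oldest active connection exceeded poolMaximumCheckoutTime
        }
        return Action.WAIT;                  // 4. block for up to poolTimeToWait and retry
    }
}
```

With the default settings, a call such as decide(0, 10, 10, 25000, 20000) falls through to the third branch and reclaims the overdue connection.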

To make the code easier to follow, the flow is drawn as a flowchart:

[Figure: flowchart of popConnection]

Next, what happens when we finish using a connection obtained from the pool and close it? As the invoke method above shows, calling close on the proxy object does not close the real connection; it actually runs the pushConnection method. The implementation is:

  protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      // remove the PooledConnection from activeConnections
      state.activeConnections.remove(conn);
      if (conn.isValid()) { // is the connection still valid?
        if (state.idleConnections.size() < poolMaximumIdleConnections // idle list below its limit?
            && conn.getConnectionTypeCode() == expectedConnectionTypeCode // does this PooledConnection belong to this pool?
        ) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime(); // accumulate checkout time
          if (!conn.getRealConnection().getAutoCommit()) { // roll back any uncommitted transaction
            conn.getRealConnection().rollback();
          }
          // wrap the returned real connection in a new PooledConnection
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          // add it to the idle list
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          conn.invalidate(); // invalidate the original PooledConnection
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          // wake up threads blocked waiting for a connection
          state.notifyAll();
        } else { // the idle list is full, or the PooledConnection does not belong to this pool
          state.accumulatedCheckoutTime += conn.getCheckoutTime(); // accumulate checkout time
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close(); // close the real database connection
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate(); // invalidate the PooledConnection
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++; // count the invalid PooledConnection
      }
    }
  }
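The borrow and return paths cooperate through wait and notifyAll on a shared lock. The toy pool below illustrates that pairing with strings standing in for connections. MiniPool is a hypothetical class written for this article; it deliberately omits overdue-connection claiming, validation, rollback, and statistics, and it throws on timeout instead of retrying as popConnection does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy pool: strings stand in for connections. Not MyBatis code.
final class MiniPool {
    private final Object lock = new Object();
    private final List<String> idle = new ArrayList<>();
    private final List<String> active = new ArrayList<>();
    private final int maxActive;
    private final int maxIdle;
    private int created = 0;
    private int closed = 0;

    MiniPool(int maxActive, int maxIdle) {
        this.maxActive = maxActive;
        this.maxIdle = maxIdle;
    }

    // Borrow path: reuse an idle entry, else create one, else wait for a return.
    String borrow(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (lock) {
            while (true) {
                String conn = null;
                if (!idle.isEmpty()) {
                    conn = idle.remove(0);
                } else if (active.size() < maxActive) {
                    conn = "conn-" + (++created);
                }
                if (conn != null) {
                    active.add(conn);
                    return conn;
                }
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new IllegalStateException("timed out waiting for a connection");
                }
                try {
                    lock.wait(remaining);   // releases the lock until notifyAll or timeout
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
        }
    }

    // Return path: keep the entry if the idle list has room, otherwise "close" it.
    void giveBack(String conn) {
        synchronized (lock) {
            if (!active.remove(conn)) {
                return;                     // not checked out from this pool
            }
            if (idle.size() < maxIdle) {
                idle.add(conn);
                lock.notifyAll();           // wake threads blocked in borrow()
            } else {
                closed++;                   // a real pool would close the physical connection
            }
        }
    }

    int idleCount() { synchronized (lock) { return idle.size(); } }
    int closedCount() { synchronized (lock) { return closed; } }
}
```

A short session: with new MiniPool(1, 1), borrowing, returning, and borrowing again yields the same "conn-1" both times, which is exactly the reuse the pool exists for.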

Again, a flowchart makes this easier to follow:

[Figure: flowchart of pushConnection]

The source also calls conn.isValid() in many places to check whether a connection is still usable:

  public boolean isValid() {
    return valid && realConnection != null && dataSource.pingConnection(this);
  }

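dataSource.pingConnection(this) is where the pool actually talks to the database: if the real connection is not closed and pinging is enabled, it executes poolPingQuery to confirm that the connection still works.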
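The body of pingConnection is not reproduced in this article, so the following is only a sketch of the decision implied by the poolPingEnabled and poolPingConnectionsNotUsedFor fields described earlier. PingPolicy is a hypothetical name, and the exact condition is an assumption based on those field descriptions.

```java
// Hypothetical policy check: should the pool send the test query for this connection?
final class PingPolicy {
    static boolean shouldPing(boolean pingEnabled, int notUsedForMillis, long idleMillis) {
        // Ping only when enabled and the connection has been unused for longer than the threshold.
        return pingEnabled && notUsedForMillis >= 0 && idleMillis > notUsedForMillis;
    }
}
```

Under this reading, a threshold of 0 means practically every checkout triggers a ping once pinging is enabled, while a larger threshold skips the round trip for connections that were used recently.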

One last point: whenever a PooledDataSource property is modified, forceCloseAll is executed to close every connection in the pool.

  /**
   * Closes all active and idle connections in the pool.
   */
  public void forceCloseAll() {
    synchronized (state) {
      // refresh the identifier (type code) of the current pool
      expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      for (int i = state.activeConnections.size(); i > 0; i--) { // handle every active connection
        try {
          // take the connection out of the list
          PooledConnection conn = state.activeConnections.remove(i - 1);
          conn.invalidate(); // mark it invalid
          // get the real database connection
          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback(); // roll back any uncommitted transaction
          }
          realConn.close(); // close the real database connection
        } catch (Exception e) {
          // ignore
        }
      }
      // handle the idle connections the same way
      for (int i = state.idleConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.idleConnections.remove(i - 1);
          conn.invalidate();

          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
    }
    if (log.isDebugEnabled()) {
      log.debug("PooledDataSource forcefully closed/removed all connections.");
    }
  }
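forceCloseAll also refreshes expectedConnectionTypeCode, which pushConnection later compares against each returning connection. The body of assembleConnectionTypeCode is not shown in this article, so the sketch below assumes a simple hash over the concatenated URL, user name, and password, matching the field comments above. ConnectionTypeCode is a hypothetical class.

```java
// Hypothetical illustration of a pool identifier derived from URL, user name, and password.
final class ConnectionTypeCode {
    static int assemble(String url, String username, String password) {
        return ("" + url + username + password).hashCode();
    }

    // Mirrors the check in pushConnection: only connections carrying the current code are kept.
    static boolean belongsToPool(int connectionTypeCode, int expectedConnectionTypeCode) {
        return connectionTypeCode == expectedConnectionTypeCode;
    }
}
```

If a connection was checked out before the password changed from "a" to "b", its stored code no longer matches the refreshed one, so on return it is closed rather than put back on the idle list.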

That concludes our look at the DataSource module.