DataSource的实现 怎样在MyBatis中实现DataSource
波波烤鸭 人气:0一、DataSource
首先大家要清楚DataSource属于MyBatis三层架构设计的基础层
然后我们来看看具体的实现。
在数据持久层中,数据源是一个非常重要的组件,其性能直接关系到整个数据持久层的性能,在实际开发中我们常用的数据源有 Apache Common DBCP,C3P0,Druid 等,MyBatis不仅可以集成第三方数据源,还提供的有自己实现的数据源。
在MyBatis中提供了两个 javax.sql.DataSource 接口的实现,分别是 PooledDataSource 和 UnpooledDataSource .
二、DataSourceFactory
DataSourceFactory是用来创建DataSource对象的,接口中声明了两个方法,作用如下
public interface DataSourceFactory { // 设置 DataSource 的相关属性,一般紧跟在初始化完成之后 void setProperties(Properties props); // 获取 DataSource 对象 DataSource getDataSource(); }
DataSourceFactory接口的两个具体实现是 UnpooledDataSourceFactory 和 PooledDataSourceFactory 这两个工厂对象的作用通过名称我们也能发现是用来创建不带连接池的数据源对象和创建带连接池的数据源对象,先来看下 UnpooledDataSourceFactory 中的方法
/** * 完成对 UnpooledDataSource 的配置 * @param properties 封装的有 DataSource 所需要的相关属性信息 */ @Override public void setProperties(Properties properties) { Properties driverProperties = new Properties(); // 创建 DataSource 对应的 MetaObject 对象 MetaObject metaDataSource = SystemMetaObject.forObject(dataSource); // 遍历 Properties 集合,该集合中配置了数据源需要的信息 for (Object key : properties.keySet()) { String propertyName = (String) key; // 获取属性名称 if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) { // 以 "driver." 开头的配置项是对 DataSource 的配置 String value = properties.getProperty(propertyName); driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value); } else if (metaDataSource.hasSetter(propertyName)) { // 有该属性的 setter 方法 String value = (String) properties.get(propertyName); Object convertedValue = convertValue(metaDataSource, propertyName, value); // 设置 DataSource 的相关属性值 metaDataSource.setValue(propertyName, convertedValue); } else { throw new DataSourceException("Unknown DataSource property: " + propertyName); } } if (driverProperties.size() > 0) { // 设置 DataSource.driverProperties 的属性值 metaDataSource.setValue("driverProperties", driverProperties); } }
UnpooledDataSourceFactory的getDataSource方法实现比较简单,直接返回DataSource属性记录的 UnpooledDataSource 对象
三、UnpooledDataSource
UnpooledDataSource 是 DataSource接口的其中一个实现,但是 UnpooledDataSource 并没有提供数据库连接池的支持,我们来看下他的具体实现吧
声明的相关属性信息
private ClassLoader driverClassLoader; // 加载Driver的类加载器 private Properties driverProperties; // 数据库连接驱动的相关信息 // 缓存所有已注册的数据库连接驱动 private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>(); private String driver; // 驱动 private String url; // 数据库 url private String username; // 账号 private String password; // 密码 private Boolean autoCommit; // 是否自动提交 private Integer defaultTransactionIsolationLevel; // 事务隔离级别 private Integer defaultNetworkTimeout;
然后在静态代码块中完成了 Driver的复制
static { // 从 DriverManager 中获取 Drivers Enumeration<Driver> drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver driver = drivers.nextElement(); // 将获取的 Driver 记录到 Map 集合中 registeredDrivers.put(driver.getClass().getName(), driver); } }
UnpooledDataSource 中获取Connection的方法最终都会调用 doGetConnection() 方法。
private Connection doGetConnection(Properties properties) throws SQLException { // 初始化数据库驱动 initializeDriver(); // 创建真正的数据库连接 Connection connection = DriverManager.getConnection(url, properties); // 配置Connection的自动提交和事务隔离级别 configureConnection(connection); return connection; }
四、PooledDataSource
有开发经验的小伙伴都知道,在操作数据库的时候数据库连接的创建过程是非常耗时的,数据库能够建立的连接数量也是非常有限的,所以数据库连接池的使用是非常重要的,使用数据库连接池会给我们带来很多好处,比如可以实现数据库连接的重用,提高响应速度,防止数据库连接过多造成数据库假死,避免数据库连接泄漏等等。
首先来看下声明的相关的属性
// 管理状态 private final PoolState state = new PoolState(this); // 记录UnpooledDataSource,用于生成真实的数据库连接对象 private final UnpooledDataSource dataSource; // OPTIONAL CONFIGURATION FIELDS protected int poolMaximumActiveConnections = 10; // 最大活跃连接数 protected int poolMaximumIdleConnections = 5; // 最大空闲连接数 protected int poolMaximumCheckoutTime = 20000; // 最大checkout时间 protected int poolTimeToWait = 20000; // 无法获取连接的线程需要等待的时长 protected int poolMaximumLocalBadConnectionTolerance = 3; // protected String poolPingQuery = "NO PING QUERY SET"; // 测试的SQL语句 protected boolean poolPingEnabled; // 是否允许发送测试SQL语句 // 当连接超过 poolPingConnectionsNotUsedFor毫秒未使用时,会发送一次测试SQL语句,检测连接是否正常 protected int poolPingConnectionsNotUsedFor; // 根据数据库URL,用户名和密码生成的一个hash值。 private int expectedConnectionTypeCode;
然后重点来看下 getConnection 方法,该方法是用来给调用者提供 Connection 对象的。
@Override public Connection getConnection() throws SQLException { return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection(); }
我们会发现其中调用了 popConnection 方法,在该方法中 返回的是 PooledConnection 对象,而 PooledConnection 对象实现了 InvocationHandler 接口,所以会使用到Java的动态代理,其中相关的属性为
private static final String CLOSE = "close"; private static final Class<?>[] IFACES = new Class<?>[] { Connection.class }; private final int hashCode; private final PooledDataSource dataSource; // 真正的数据库连接 private final Connection realConnection; // 数据库连接的代理对象 private final Connection proxyConnection; private long checkoutTimestamp; // 从连接池中取出该连接的时间戳 private long createdTimestamp; // 该连接创建的时间戳 private long lastUsedTimestamp; // 最后一次被使用的时间戳 private int connectionTypeCode; // 又数据库URL、用户名和密码计算出来的hash值,可用于标识该连接所在的连接池 // 连接是否有效的标志 private boolean valid;
重点关注下invoke 方法
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (CLOSE.equals(methodName)) { // 如果是 close 方法被执行则将连接放回连接池中,而不是真正的关闭数据库连接 dataSource.pushConnection(this); return null; } try { if (!Object.class.equals(method.getDeclaringClass())) { // issue #579 toString() should never fail // throw an SQLException instead of a Runtime // 通过上面的 valid 字段来检测 连接是否有效 checkConnection(); } // 调用真正数据库连接对象的对应方法 return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } }
还有就是前面提到的 PoolState 对象,它主要是用来管理 PooledConnection 对象状态的组件,通过两个 ArrayList 集合分别管理空闲状态的连接和活跃状态的连接,定义如下:
protected PooledDataSource dataSource; // 空闲的连接 protected final List<PooledConnection> idleConnections = new ArrayList<>(); // 活跃的连接 protected final List<PooledConnection> activeConnections = new ArrayList<>(); protected long requestCount = 0; // 请求数据库连接的次数 protected long accumulatedRequestTime = 0; // 获取连接累计的时间 // CheckoutTime 表示应用从连接池中取出来,到归还连接的时长 // accumulatedCheckoutTime 记录了所有连接累计的CheckoutTime时长 protected long accumulatedCheckoutTime = 0; // 当连接长时间没有归还连接时,会被认为该连接超时 // claimedOverdueConnectionCount 记录连接超时的个数 protected long claimedOverdueConnectionCount = 0; // 累计超时时间 protected long accumulatedCheckoutTimeOfOverdueConnections = 0; // 累计等待时间 protected long accumulatedWaitTime = 0; // 等待次数 protected long hadToWaitCount = 0; // 无效连接数 protected long badConnectionCount = 0;
再回到 popConnection 方法中来看
private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; long t = System.currentTimeMillis(); int localBadConnectionCount = 0; while (conn == null) { synchronized (state) { // 同步 if (!state.idleConnections.isEmpty()) { // 检测空闲连接 // Pool has available connection 连接池中有空闲的连接 conn = state.idleConnections.remove(0); // 获取连接 if (log.isDebugEnabled()) { log.debug("Checked out connection " + conn.getRealHashCode() + " from pool."); } } else {// 当前连接池 没有空闲连接 // Pool does not have available connection if (state.activeConnections.size() < poolMaximumActiveConnections) { // 活跃数没有达到最大连接数 可以创建新的连接 // Can create new connection 创建新的数据库连接 conn = new PooledConnection(dataSource.getConnection(), this); if (log.isDebugEnabled()) { log.debug("Created connection " + conn.getRealHashCode() + "."); } } else { // 活跃数已经达到了最大数 不能创建新的连接 // Cannot create new connection 获取最先创建的活跃连接 PooledConnection oldestActiveConnection = state.activeConnections.get(0); // 获取该连接的超时时间 long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); // 检查是否超时 if (longestCheckoutTime > poolMaximumCheckoutTime) { // Can claim overdue connection 对超时连接的信息进行统计 state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; // 将超时连接移除 activeConnections state.activeConnections.remove(oldestActiveConnection); if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { // 如果超时连接没有提交 则自动回滚 try { oldestActiveConnection.getRealConnection().rollback(); } catch (SQLException e) { /* Just log a message for debug and continue to execute the following statement like nothing happened. Wrap the bad connection with a new PooledConnection, this will help to not interrupt current executing thread and give current thread a chance to join the next competition for another valid/good database connection. At the end of this loop, bad {@link @conn} will be set as null. */ log.debug("Bad connection. Could not roll back"); } } // 创建 PooledConnection,但是数据库中的真正连接并没有创建 conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp()); // 将超时的 PooledConnection 设置为无效 oldestActiveConnection.invalidate(); if (log.isDebugEnabled()) { log.debug("Claimed overdue connection " + conn.getRealHashCode() + "."); } } else { // Must wait 无空闲连接,无法创建新连接和无超时连接 那就只能等待 try { if (!countedWait) { state.hadToWaitCount++; // 统计等待次数 countedWait = true; } if (log.isDebugEnabled()) { log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection."); } long wt = System.currentTimeMillis(); state.wait(poolTimeToWait); // 阻塞等待 // 统计累计的等待时间 state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } if (conn != null) { // ping to server and check the connection is valid or not // 检查 PooledConnection 是否有效 if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 配置 PooledConnection 的相关属性 conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; // 进行相关的统计 state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection."); } state.badConnectionCount++; localBadConnectionCount++; conn = null; if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Could not get a good connection to the database."); } throw new SQLException("PooledDataSource: Could not get a good connection to the database."); } } } } }
为了更好的理解代码的含义,我们绘制了对应的流程图
然后我们来看下当我们从连接池中使用完成了数据库的相关操作后,是如何来关闭连接的呢?通过前面的 invoke 方法的介绍其实我们能够发现,当我们执行代理对象的 close 方法的时候其实是执行的 pushConnection 方法。
具体的实现代码为
protected void pushConnection(PooledConnection conn) throws SQLException { synchronized (state) { // 从 activeConnections 中移除 PooledConnection 对象 state.activeConnections.remove(conn); if (conn.isValid()) { // 检测 连接是否有效 if (state.idleConnections.size() < poolMaximumIdleConnections // 是否达到上限 && conn.getConnectionTypeCode() == expectedConnectionTypeCode // 该 PooledConnection 是否为该连接池的连接 ) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 累计 checkout 时长 if (!conn.getRealConnection().getAutoCommit()) { // 回滚未提交的事务 conn.getRealConnection().rollback(); } // 为返还连接创建新的 PooledConnection 对象 PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this); // 添加到 空闲连接集合中 state.idleConnections.add(newConn); newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); conn.invalidate(); // 将原来的 PooledConnection 连接设置为无效 if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool."); } // 唤醒阻塞等待的线程 state.notifyAll(); } else { // 空闲连接达到上限或者 PooledConnection不属于当前的连接池 state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 累计 checkout 时长 if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.getRealConnection().close(); // 关闭真正的数据库连接 if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "."); } conn.invalidate(); // 设置 PooledConnection 无线 } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection."); } state.badConnectionCount++; // 统计无效的 PooledConnection 对象个数 } } }
为了便于理解,我们同样的来绘制对应的流程图:
还有就是我们在源码中多处有看到 conn.isValid方法来检测连接是否有效
public boolean isValid() { return valid && realConnection != null && dataSource.pingConnection(this); }
dataSource.pingConnection(this)中会真正的实现数据库的SQL执行操作
最后一点要注意的是在我们修改了任意的PooledDataSource中的属性的时候都会执行forceCloseAll来强制关闭所有的连接。
/** * Closes all active and idle connections in the pool. */ public void forceCloseAll() { synchronized (state) { // 更新 当前的 连接池 标识 expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); for (int i = state.activeConnections.size(); i > 0; i--) {// 处理全部的活跃连接 try { // 获取 获取的连接 PooledConnection conn = state.activeConnections.remove(i - 1); conn.invalidate(); // 标识为无效连接 // 获取真实的 数据库连接 Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); // 回滚未处理的事务 } realConn.close(); // 关闭真正的数据库连接 } catch (Exception e) { // ignore } } // 同样的逻辑处理空闲的连接 for (int i = state.idleConnections.size(); i > 0; i--) { try { PooledConnection conn = state.idleConnections.remove(i - 1); conn.invalidate(); Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); } realConn.close(); } catch (Exception e) { // ignore } } } if (log.isDebugEnabled()) { log.debug("PooledDataSource forcefully closed/removed all connections."); } }
加载全部内容