Scala数据库连接池的简单实现
ncqingchuan1976 人气:0在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。如果没有可用连接,则可以在一定时间内等待,直到队列中有可用的连接,否则将抛出异常。
以下是DataSoucre的代码,DataSoucre负责对连接的管理以及分发,同时设置队列的大小,等待时间,连接的账号、密码等。
核心方法为getConenction()方法。且实现AutoCloseable接口,以便后面可以使用using方法自动关闭资源。队列中的连接为封装了conenction的DbConnection类。
package pool import scala.util.control.Breaks._ import scala.collection.mutable.ArrayBuffer import java.{util => ju} import scala.collection.mutable.Buffer import scala.util.control.Breaks class DataSource( val driverName: String, val url: String, val user: String, val password: String, val minSize: Integer = 1, val maxSize: Integer = 10, val keepAliveTimeout: Long = 1000 ) extends AutoCloseable { if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) { throw new IllegalArgumentException("These arguments are Illegal") } Class.forName(driverName) private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]() private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true) for (i <- 0 until minSize) { pool += new DbConnection(url, user, password) } def getConenction(): DbConnection = { val starEntry = System.currentTimeMillis() Breaks.breakable { while (true) { lock.lock() try { for (con <- pool) { if (!con.used) { con.used = true return con; } } if (pool.size < maxSize) { var con = new DbConnection(url, user, password) { used = true } pool.append(con) return con } } finally { lock.unlock() } if (System.currentTimeMillis() - starEntry > keepAliveTimeout) { break() } } } throw new IllegalArgumentException("Connection Pool is empty") } def close(): Unit = { lock.lock() try { if (pool != null) { pool.foreach(t => t.innerConnection.close()) pool.clear() } } finally { lock.unlock() } } }
以下是Dbconnection类,该类提供了三个方法且实现了AutoCloseable接口
BeginTransaction:开启事务,并返回封装了的DbTransaction类
close:将连接释放
CreateCommand:创建DbCommand类,该类是负责操作连接的类,比如提交sql,读取数据等
package pool import java.sql.Connection import java.sql.DriverAction import java.sql.DriverManager class DbConnection( val url: String, val user: String, val password: String ) extends AutoCloseable { private[pool] var used: Boolean = false private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password) def close(): Unit = { if (used) { used = false } } def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = { if (innerConnection.getAutoCommit()) { innerConnection.setAutoCommit(false) } innerConnection.setTransactionIsolation(isolationLevel) new DbTransaction(this) } def CreateCommand(): DbCommand = { new DbCommand(this) } }
以下是DbCommand类的代码,该类负责操作数据库。如ExecuteResultSet,ExecuteScalar等。
ExecuteScalar:查询数据库并返回第一行第一个值的方法。
ExecuteResultSet:该方法有两个重载方法。
参数为callBack: ResultSet => Unit的方法,提供了一个回调函数,解析数据的操作可以在回调中实现。
无参的版本则通过反射直接将ResultSet通过字段位置映射,转换成你需要的类型。
package pool import java.sql.CallableStatement import java.sql.ResultSet import java.sql.SQLType import java.sql.Statement import java.sql.Types import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Buffer import scala.language.implicitConversions import scala.reflect.ClassTag import scala.reflect.runtime.{universe => ru} import Dispose.using import java.{util => ju} class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable { if (queryTimeout < 0) { throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.") } val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]() private val mirror = ru.runtimeMirror(getClass().getClassLoader()) private var statement: CallableStatement = null /** @author:qingchuan * * @return */ def ExecuteScalar(): Any = { var obj: Any = None ExecuteResultSet(t => { if (t.next()) { if (t.getMetaData().getColumnCount() > 0) obj = t.getObject(1) } }) obj } /** @author * qingchuan * @version 1.0 * * @param callBack */ def ExecuteResultSet(callBack: ResultSet => Unit): Unit = { if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.") statement = connection.innerConnection.prepareCall(commandText) statement.setQueryTimeout(queryTimeout) addParatemetrs() using(statement.executeQuery()) { t => callBack(t) if (!t.isClosed()) getOutParameterValue() } } def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = { val classSymbol = mirror.symbolOf[T].asClass val classMirror = mirror.reflectClass(classSymbol) val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod) val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm) val result = new ArrayBuffer[T]() ExecuteResultSet(t => { while (t.next()) { var i = 1 val values: Buffer[Any] = ArrayBuffer() for (f <- fields) { values += t.getObject(i) i += 1 } result += consMethodMirror.apply(values: _*).asInstanceOf[T] } }) result } def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = { statement = connection.innerConnection.prepareCall(commandText) var trans: DbTransaction = null val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm) for (t <- values) { var i = 1 val filedMirror = mirror.reflect(t) for (f <- fields) { val instance = filedMirror.reflectField(f) statement.setObject(i, instance.get) i += 1 } statement.addBatch() } try { trans = connection.BeginTransaction() val obj = statement.executeBatch() trans.Commit() statement.clearBatch() obj.sum } catch { case e: Exception => { if (trans != null) trans.RollBack() throw e } } } def ExecuteNoneQuery(): Integer = { statement = connection.innerConnection.prepareCall(commandText) statement.setQueryTimeout(queryTimeout) addParatemetrs() val obj = statement.executeUpdate() getOutParameterValue() obj } def CreateParameter(): DbParameter = { new DbParameter(); } private def getOutParameterValue(): Unit = { for (i <- 1 to Parameters.size) { val parameter: DbParameter = Parameters(i - 1); if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) { parameter.value = statement.getObject(i); } } } private def addParatemetrs(): Unit = { statement.clearParameters() for (i <- 1 to Parameters.size) { val p = Parameters(i - 1); if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) { statement.setObject(i, p.value) } if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) { statement.registerOutParameter(p.parameterName, p.sqlType, p.scale) } } } def close() { if (statement != null) { statement.close() } } } case class DbParameter( var parameterName: String = null, var value: Any = null, var parameterDirection: Integer = ParameterDirection.Input, var scale: Integer = 0, var sqlType: Integer = null ) {} object ParameterDirection { val Input = 1 val InputOutput = 2 val Output = 3 }
以下代码是DbTransaction,该类提供了事务的操作如提交、回滚。
package pool class DbTransaction(private val connection: DbConnection) { def Commit(): Unit = { connection.innerConnection.commit() if (!connection.innerConnection.getAutoCommit()) { connection.innerConnection.setAutoCommit(true); } } def RollBack(): Unit = { connection.innerConnection.rollback() if (!connection.innerConnection.getAutoCommit()) { connection.innerConnection.setAutoCommit(true) } } def getConnection(): DbConnection = { connection } def getTransactionIsolation(): Int = { connection.innerConnection.getTransactionIsolation() } } object IsolationLevel { val TRANSACTION_NONE = 0 val TRANSACTION_READ_UNCOMMITTED = 1; val TRANSACTION_READ_COMMITTED = 2; val TRANSACTION_REPEATABLE_READ = 4; val TRANSACTION_SERIALIZABLE = 8; }
最后是using的方法。通过柯里化以及Try-catch-finally的方式 自动关闭实现了AutoCloseable接口的资源。
package pool object Dispose { def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = { try { op(cls) } catch { case e: Exception => throw e } finally { cls.close() } } }
以下是客户端调用,代码模拟了15个线程并发访问数据库,连接池最多3个资源,从而说明连接池是可以复用这些连接的。
import pool.DataSource import pool.DbCommand import pool.DbParameter import pool.DbTransaction import pool.Dispose.using import pool.IsolationLevel import pool.ParameterDirection import java.sql.Date import java.sql.ResultSet import java.sql.Types import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime import javax.xml.crypto.Data import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Buffer import scala.language.experimental.macros import scala.reflect.ClassTag import scala.reflect.runtime.{universe => ru} import com.nimbusds.oauth2.sdk.util.date.SimpleDate import java.text.SimpleDateFormat object App { def main(args: Array[String]): Unit = { val pool = new DataSource( "com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true", "账号", "密码", minSize = 1, maxSize = 3, keepAliveTimeout = 3000 ) val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS"); for (i <- 1 to 15) { val thread: Thread = new Thread(() => { val date = new Date(System.currentTimeMillis()) using(pool.getConenction()) { con => { using(new DbCommand(con)) { cmd => { cmd.commandText = "{call p_get_out(?,?)}" val p1 = new DbParameter("@id", i) val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20) cmd.Parameters.append(p1) cmd.Parameters.append(p2) val result = cmd.ExecuteScalar() println(s"result=${result},output=${p2.value},parameter=${i}") } } } } }) thread.start() } } }
开发环境VsCode,SQL Server数据库。以下是引用的第三方库。
version := "1.0" libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8" libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"
以下是执行结果。
加载全部内容