A Simple Database Connection Pool in Scala

ncqingchuan1976

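When working with JDBC, database connections are a scarce and expensive resource. To reuse them, connections can be kept in a queue. A caller that needs a connection takes an unused one from the queue. If none is available, the caller waits for a limited time until a connection is returned to the queue; if the wait times out, an exception is thrown.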
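The queue-with-timeout idea described above can be sketched with `java.util.concurrent.LinkedBlockingQueue`, whose `poll(timeout, unit)` waits for an element up to a time limit. This is only a minimal sketch and not the implementation shown in this article: the names `SimplePool`, `borrow` and `giveBack` are hypothetical, and any reference type can stand in for a connection.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical minimal pool; SimplePool, borrow and giveBack are illustrative names.
final class SimplePool[A <: AnyRef](resources: Seq[A]) {
  // Idle resources wait in a thread-safe FIFO queue.
  private val idle = new LinkedBlockingQueue[A]()
  resources.foreach(r => idle.put(r))

  // Takes an idle resource, waiting up to timeoutMillis for one to be returned.
  def borrow(timeoutMillis: Long): A =
    // poll returns null on timeout, which Option turns into None.
    Option(idle.poll(timeoutMillis, TimeUnit.MILLISECONDS)).getOrElse(
      throw new IllegalStateException(s"no resource available within $timeoutMillis ms")
    )

  // Puts a resource back so that another caller can reuse it.
  def giveBack(r: A): Unit = idle.put(r)
}
```

With `new SimplePool(Seq("conn-1", "conn-2"))`, two calls to `borrow` drain the pool, a third call fails after its timeout, and a resource handed back through `giveBack` is what the next `borrow` receives. Blocking on the queue avoids polling in a loop while waiting.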

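Below is the code for DataSource. DataSource manages the connections and hands them out, and it also holds the pool size, the wait time, and the user name and password used to connect.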

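The core method is getConenction() (spelled this way in the code). The class also implements the AutoCloseable interface so that it can later be closed automatically with the using method. The pool holds DbConnection objects, each of which wraps a JDBC Connection.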

package pool
import scala.util.control.Breaks._
import scala.collection.mutable.ArrayBuffer
import java.{util => ju}
import scala.collection.mutable.Buffer
import scala.util.control.Breaks
 
class DataSource(
    val driverName: String,
    val url: String,
    val user: String,
    val password: String,
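    val minSize: Int = 1, // connections opened eagerly when the pool is created
    val maxSize: Int = 10, // upper bound on the number of pooled connections
    val keepAliveTimeout: Long = 1000 // maximum wait in getConenction(), in milliseconds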
) extends AutoCloseable {
 
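  // Fail fast on inconsistent sizes or a negative wait time.
  if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) {
    throw new IllegalArgumentException(
      s"Invalid pool settings: minSize=$minSize, maxSize=$maxSize, keepAliveTimeout=$keepAliveTimeout"
    )
  }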
 
  Class.forName(driverName)
  private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]()
  private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true)
 
  for (i <- 0 until minSize) {
    pool += new DbConnection(url, user, password)
  }
 
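  // Hands out an idle connection, opening a new one while the pool is below
  // maxSize. Retries for up to keepAliveTimeout milliseconds, then fails.
  def getConenction(): DbConnection = {
    val startEntry = System.currentTimeMillis()
    while (System.currentTimeMillis() - startEntry <= keepAliveTimeout) {
      lock.lock()
      try {
        // Reuse an idle connection if one exists.
        pool.find(c => !c.used) match {
          case Some(con) =>
            con.used = true
            return con
          case None => // fall through and try to grow the pool
        }
        if (pool.size < maxSize) {
          val con = new DbConnection(url, user, password)
          con.used = true
          pool.append(con)
          return con
        }
      } finally {
        lock.unlock()
      }
      // Every connection is busy: back off briefly instead of spinning on the lock.
      Thread.sleep(1)
    }
    throw new IllegalStateException(s"No connection became available within $keepAliveTimeout ms")
  }

  // Closes every physical connection and empties the pool.
  def close(): Unit = {
    lock.lock()
    try {
      pool.foreach(t => t.innerConnection.close())
      pool.clear()
    } finally {
      lock.unlock()
    }
  }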
}

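Below is the DbConnection class. It implements the AutoCloseable interface and provides three methods:

BeginTransaction: starts a transaction and returns a DbTransaction that wraps it

close: releases the connection back to the pool (the underlying JDBC connection stays open)

CreateCommand: creates a DbCommand, the class that operates on the connection, for example to submit SQL and read data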

package pool
 
import java.sql.Connection
import java.sql.DriverManager
 
class DbConnection(
    val url: String,
    val user: String,
    val password: String
) extends AutoCloseable {
 
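  // Read and written by several threads; close() resets it without taking the
  // pool lock, so it is marked volatile to keep the change visible.
  @volatile private[pool] var used: Boolean = false
  // The physical JDBC connection, opened eagerly and kept open for reuse.
  private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password)

  // Returns the connection to the pool; the physical connection stays open.
  def close(): Unit = {
    used = false
  }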
 
  def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = {
    if (innerConnection.getAutoCommit()) {
      innerConnection.setAutoCommit(false)
    }
    innerConnection.setTransactionIsolation(isolationLevel)
    new DbTransaction(this)
  }
 
  def CreateCommand(): DbCommand = {
    new DbCommand(this)
  }
}

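Below is the code for the DbCommand class, which performs the database operations through methods such as ExecuteResultSet and ExecuteScalar.

ExecuteScalar: runs a query and returns the first column of the first row.

ExecuteResultSet: this method has two overloads.

The overload that takes callBack: ResultSet => Unit accepts a callback function, so the parsing of the data can be done inside the callback.

The parameterless overload uses reflection to map each row of the ResultSet, by column position, to the type you need.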

package pool
 
import java.sql.CallableStatement
import java.sql.ResultSet
import java.sql.SQLType
import java.sql.Statement
import java.sql.Types
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
 
import Dispose.using
import java.{util => ju}
 
class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable {
  // JDBC treats a timeout of 0 as "no limit", so only negative values are rejected.
  if (queryTimeout < 0) {
    throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must not be negative.")
  }
  val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]()
  private val mirror = ru.runtimeMirror(getClass().getClassLoader())
  private var statement: CallableStatement = null
 
  /** Runs the query and returns the first column of the first row, or `None` if the result set is empty.
    *
    * @author qingchuan
    */
  def ExecuteScalar(): Any = {
    var obj: Any = None
    ExecuteResultSet(t => {
      if (t.next()) {
        if (t.getMetaData().getColumnCount() > 0)
          obj = t.getObject(1)
      }
    })
    obj
  }
 
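  /** Runs the query and passes the open ResultSet to `callBack`; the ResultSet is closed afterwards.
    *
    * @author qingchuan
    * @version 1.0
    *
    * @param callBack function that reads the rows
    */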
  def ExecuteResultSet(callBack: ResultSet => Unit): Unit = {
    if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.")
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    using(statement.executeQuery()) { t =>
      callBack(t)
      if (!t.isClosed())
        getOutParameterValue()
    }
  }
 
  def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = {
    val classSymbol = mirror.symbolOf[T].asClass
    val classMirror = mirror.reflectClass(classSymbol)
    val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod)
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    val result = new ArrayBuffer[T]()
    ExecuteResultSet(t => {
      while (t.next()) {
        var i = 1
        val values: Buffer[Any] = ArrayBuffer()
        for (f <- fields) {
          values += t.getObject(i)
          i += 1
        }
        result += consMethodMirror.apply(values: _*).asInstanceOf[T]
      }
    })
    result
  }
 
  def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = {
    statement = connection.innerConnection.prepareCall(commandText)
    var trans: DbTransaction = null
    val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
    for (t <- values) {
      var i = 1
      val filedMirror = mirror.reflect(t)
      for (f <- fields) {
        val instance = filedMirror.reflectField(f)
        statement.setObject(i, instance.get)
        i += 1
      }
      statement.addBatch()
    }
 
    try {
      trans = connection.BeginTransaction()
      val obj = statement.executeBatch()
      trans.Commit()
      statement.clearBatch()
      obj.sum
    } catch {
      case e: Exception => {
        if (trans != null)
          trans.RollBack()
        throw e
      }
    }
  }
 
  def ExecuteNoneQuery(): Integer = {
    statement = connection.innerConnection.prepareCall(commandText)
    statement.setQueryTimeout(queryTimeout)
    addParatemetrs()
    val obj = statement.executeUpdate()
    getOutParameterValue()
    obj
  }
 
  def CreateParameter(): DbParameter = {
    new DbParameter();
  }
 
  private def getOutParameterValue(): Unit = {
    for (i <- 1 to Parameters.size) {
      val parameter: DbParameter = Parameters(i - 1);
      if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) {
        parameter.value = statement.getObject(i);
      }
    }
  }
 
  // Binds the parameters by position, in the order they were added to Parameters.
  private def addParatemetrs(): Unit = {
    statement.clearParameters()
    for (i <- 1 to Parameters.size) {
      val p = Parameters(i - 1)
      if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) {
        statement.setObject(i, p.value)
      }
      if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) {
        // Registered by position as well, to match setObject(i, ...) and getObject(i);
        // JDBC advises against mixing named and positional access on one statement.
        statement.registerOutParameter(i, p.sqlType, p.scale)
      }
    }
  }
  def close(): Unit = {
    if (statement != null) {
      statement.close()
    }
  }
}
 
case class DbParameter(
    var parameterName: String = null,
    var value: Any = null,
    var parameterDirection: Integer = ParameterDirection.Input,
    var scale: Integer = 0,
    var sqlType: Integer = null
) {}
 
object ParameterDirection {
  val Input = 1
  val InputOutput = 2
  val Output = 3
}
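The position-based mapping used by the parameterless ExecuteResultSet can be illustrated in isolation. The sketch below swaps in plain Java reflection through a `ClassTag` instead of the scala-reflect mirrors used in DbCommand, and it takes an in-memory `Seq[Any]` in place of a ResultSet row. `RowMappingSketch`, `fromRow` and `Person` are hypothetical names, and the code assumes the target type has exactly one public constructor, as a plain case class does.

```scala
import scala.reflect.{ClassTag, classTag}

object RowMappingSketch {
  // Example target type; its fields are made up for the illustration.
  final case class Person(name: String, age: Int)

  // Builds a T from values listed in the same order as T's constructor parameters.
  def fromRow[T: ClassTag](values: Seq[Any]): T = {
    // Assumption: T has a single public constructor.
    val ctor = classTag[T].runtimeClass.getConstructors.head
    // Values are passed as objects; the reflective call unboxes them for primitive parameters.
    val args = values.map(_.asInstanceOf[AnyRef])
    ctor.newInstance(args: _*).asInstanceOf[T]
  }
}
```

For example, `RowMappingSketch.fromRow[RowMappingSketch.Person](Seq("Ann", 30))` builds `Person("Ann", 30)`. As with the reflective version in DbCommand, the column order and the runtime types of the values have to match the constructor parameters, otherwise the call fails at run time.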

The following code is DbTransaction, the class that provides the transaction operations such as commit and rollback.

package pool
 
class DbTransaction(private val connection: DbConnection) {
 
  def Commit(): Unit = {
    connection.innerConnection.commit()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true);
    }
  }
  def RollBack(): Unit = {
    connection.innerConnection.rollback()
    if (!connection.innerConnection.getAutoCommit()) {
      connection.innerConnection.setAutoCommit(true)
    }
  }
 
  def getConnection(): DbConnection = {
    connection
  }
 
  def getTransactionIsolation(): Int = {
    connection.innerConnection.getTransactionIsolation()
  }
 
}
 
// Mirrors the isolation-level constants defined on java.sql.Connection.
object IsolationLevel {
  val TRANSACTION_NONE = 0
  val TRANSACTION_READ_UNCOMMITTED = 1
  val TRANSACTION_READ_COMMITTED = 2
  val TRANSACTION_REPEATABLE_READ = 4
  val TRANSACTION_SERIALIZABLE = 8
}
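The commit-or-rollback flow that DbTransaction exposes can also be written as a single helper over a raw `java.sql.Connection`. The following is a sketch under stated assumptions rather than part of the article's library: `TransactionSketch` and `inTransaction` are hypothetical names, and only standard JDBC calls (`getAutoCommit`, `setAutoCommit`, `commit`, `rollback`) are used.

```scala
import java.sql.Connection

object TransactionSketch {
  // Runs `work` inside a transaction: commit on success, roll back on failure.
  def inTransaction[A](con: Connection)(work: Connection => A): A = {
    val previousAutoCommit = con.getAutoCommit
    con.setAutoCommit(false)
    try {
      val result = work(con)
      con.commit()
      result
    } catch {
      case e: Exception =>
        con.rollback()
        throw e
    } finally {
      // Restore the original mode so a pooled connection is handed back unchanged.
      con.setAutoCommit(previousAutoCommit)
    }
  }
}
```

Restoring the auto-commit mode in `finally` matters for pooled connections, because the next borrower would otherwise inherit a connection that is still in manual-commit mode.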

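Finally, the using method. It combines currying with a try block and a finally clause to automatically close any resource that implements the AutoCloseable interface.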

package pool
 
object Dispose {
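  // Runs op on the resource and always closes it afterwards, even if op throws.
  // Exceptions propagate unchanged, so no catch clause is needed.
  def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = {
    try {
      op(cls)
    } finally {
      cls.close()
    }
  }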
}
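The using helper above returns Unit, so a value computed inside the block has to be carried out through a variable. A variant with a second type parameter can return the block's result directly, and Scala 2.13 ships a comparable facility as `scala.util.Using.resource`. The sketch below shows both; `UsingSketch`, `Probe` and `readWithStdlib` are hypothetical names introduced only to make the behaviour observable.

```scala
import scala.util.Using

object UsingSketch {
  // Hypothetical resource that records whether it has been closed.
  final class Probe extends AutoCloseable {
    var closed = false
    def read(): Int = 42
    override def close(): Unit = { closed = true }
  }

  // Variant of the helper that returns whatever the block computes.
  def using[T <: AutoCloseable, R](resource: T)(op: T => R): R =
    try op(resource)
    finally resource.close()

  // The same idea with the standard library (Scala 2.13 and later).
  def readWithStdlib(p: Probe): Int = Using.resource(p)(_.read())
}
```

Both forms close the resource whether or not the block throws. `Using.resource` rethrows the exception, while `Using(resource)(f)` wraps the outcome in a `Try` instead.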

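Below is the client code. It simulates 15 threads accessing the database concurrently while the pool holds at most 3 connections, which demonstrates that the pool reuses its connections.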

import pool.DataSource
import pool.DbCommand
import pool.DbParameter
import pool.Dispose.using
import pool.ParameterDirection

import java.sql.Types

object App {
  def main(args: Array[String]): Unit = {
    // At most 3 pooled connections; each caller waits up to 3 seconds for one.
    val pool = new DataSource(
      "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true",
      "account", // placeholder user name
      "password", // placeholder password
      minSize = 1,
      maxSize = 3,
      keepAliveTimeout = 3000
    )

    // 15 threads compete for the 3 connections.
    val threads = for (i <- 1 to 15) yield {
      val thread = new Thread(() => {
        // using hands the connection back to the pool when the block exits.
        using(pool.getConenction()) { con =>
          using(new DbCommand(con)) { cmd =>
            cmd.commandText = "{call p_get_out(?,?)}"
            val p1 = new DbParameter("@id", i)
            val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20)
            cmd.Parameters.append(p1)
            cmd.Parameters.append(p2)
            val result = cmd.ExecuteScalar()
            println(s"result=${result},output=${p2.value},parameter=${i}")
          }
        }
      })
      thread.start()
      thread
    }

    // Wait for all workers, then close the physical connections.
    threads.foreach(_.join())
    pool.close()
  }
}

The development environment is VS Code with a SQL Server database. The third-party libraries referenced are listed below.

version := "1.0"
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8"
libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"
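The settings above do not state a Scala version. A fuller build.sbt might look like the sketch below, assuming Scala 2.13.8 (inferred from the scala-reflect version, not stated in the article) and a placeholder project name.

```scala
// build.sbt (sketch; the project name is a placeholder)
name := "scala-connection-pool"
version := "1.0"
// Assumption: matches the scala-reflect 2.13.8 dependency listed above.
scalaVersion := "2.13.8"

libraryDependencies ++= Seq(
  "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8",
  // Keep scala-reflect on the same version as the compiler.
  "org.scala-lang" % "scala-reflect" % scalaVersion.value
)
```

Tying scala-reflect to `scalaVersion.value` keeps the reflection library and the compiler from drifting apart when the Scala version is upgraded.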

The execution result is shown below. [Output screenshot not preserved.]
