pool.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. package client
  2. import (
  3. "context"
  4. "math"
  5. "math/rand"
  6. "sync"
  7. "time"
  8. "github.com/pingcap/errors"
  9. )
/*
Pool for efficient reuse of connections.

Usage:

	pool := client.NewPool(log.Debugf, 100, 400, 5, `127.0.0.1:3306`, `username`, `userpwd`, `dbname`)
	...
	conn, err := pool.GetConn(ctx)
	if err != nil {
		return err // no connection was obtained, so there is nothing to put back
	}
	defer pool.PutConn(conn)
	conn.Execute/conn.Begin/etc...
*/
  19. type (
  20. Timestamp int64
  21. LogFunc func(format string, args ...interface{})
  22. Pool struct {
  23. logFunc LogFunc
  24. minAlive int
  25. maxAlive int
  26. maxIdle int
  27. idleCloseTimeout Timestamp
  28. idlePingTimeout Timestamp
  29. connect func() (*Conn, error)
  30. synchro struct {
  31. sync.Mutex
  32. idleConnections []Connection
  33. stats ConnectionStats
  34. }
  35. readyConnection chan Connection
  36. }
  37. ConnectionStats struct {
  38. // Uses internally
  39. TotalCount int
  40. // Only for stats
  41. IdleCount int
  42. CreatedCount int64
  43. }
  44. Connection struct {
  45. conn *Conn
  46. lastUseAt Timestamp
  47. }
  48. )
  49. var (
  50. // MaxIdleTimeoutWithoutPing - If the connection has been idle for more than this time,
  51. // then ping will be performed before use to check if it alive
  52. MaxIdleTimeoutWithoutPing = 10 * time.Second
  53. // DefaultIdleTimeout - If the connection has been idle for more than this time,
  54. // we can close it (but we should remember about Pool.minAlive)
  55. DefaultIdleTimeout = 30 * time.Second
  56. // MaxNewConnectionAtOnce - If we need to create new connections,
  57. // then we will create no more than this number of connections at a time.
  58. // This restriction will be ignored on pool initialization.
  59. MaxNewConnectionAtOnce = 5
  60. )
  61. // NewPool initializes new connection pool and uses params: addr, user, password, dbName and options.
  62. // minAlive specifies the minimum number of open connections that the pool will try to maintain.
  63. // maxAlive specifies the maximum number of open connections
  64. // (for internal reasons, may be greater by 1 inside newConnectionProducer).
  65. // maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
  66. func NewPool(
  67. logFunc LogFunc,
  68. minAlive int,
  69. maxAlive int,
  70. maxIdle int,
  71. addr string,
  72. user string,
  73. password string,
  74. dbName string,
  75. options ...func(conn *Conn),
  76. ) *Pool {
  77. if minAlive > maxAlive {
  78. minAlive = maxAlive
  79. }
  80. if maxIdle > maxAlive {
  81. maxIdle = maxAlive
  82. }
  83. if maxIdle <= minAlive {
  84. maxIdle = minAlive
  85. }
  86. pool := &Pool{
  87. logFunc: logFunc,
  88. minAlive: minAlive,
  89. maxAlive: maxAlive,
  90. maxIdle: maxIdle,
  91. idleCloseTimeout: Timestamp(math.Ceil(DefaultIdleTimeout.Seconds())),
  92. idlePingTimeout: Timestamp(math.Ceil(MaxIdleTimeoutWithoutPing.Seconds())),
  93. connect: func() (*Conn, error) {
  94. return Connect(addr, user, password, dbName, options...)
  95. },
  96. readyConnection: make(chan Connection),
  97. }
  98. pool.synchro.idleConnections = make([]Connection, 0, pool.maxIdle)
  99. go pool.newConnectionProducer()
  100. if pool.minAlive > 0 {
  101. pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, pool.minAlive)
  102. pool.startNewConnections(pool.minAlive)
  103. }
  104. go pool.closeOldIdleConnections()
  105. return pool
  106. }
  107. func (pool *Pool) GetStats(stats *ConnectionStats) {
  108. pool.synchro.Lock()
  109. *stats = pool.synchro.stats
  110. stats.IdleCount = len(pool.synchro.idleConnections)
  111. pool.synchro.Unlock()
  112. }
  113. // GetConn returns connection from the pool or create new
  114. func (pool *Pool) GetConn(ctx context.Context) (*Conn, error) {
  115. for {
  116. connection, err := pool.getConnection(ctx)
  117. if err != nil {
  118. return nil, err
  119. }
  120. // For long time idle connections, we do a ping check
  121. if delta := pool.nowTs() - connection.lastUseAt; delta > pool.idlePingTimeout {
  122. if err := pool.ping(connection.conn); err != nil {
  123. pool.closeConn(connection.conn)
  124. continue
  125. }
  126. }
  127. return connection.conn, nil
  128. }
  129. }
  130. // PutConn returns working connection back to pool
  131. func (pool *Pool) PutConn(conn *Conn) {
  132. pool.putConnection(Connection{
  133. conn: conn,
  134. lastUseAt: pool.nowTs(),
  135. })
  136. }
  137. // DropConn closes the connection without any checks
  138. func (pool *Pool) DropConn(conn *Conn) {
  139. pool.closeConn(conn)
  140. }
  141. func (pool *Pool) putConnection(connection Connection) {
  142. pool.synchro.Lock()
  143. defer pool.synchro.Unlock()
  144. // If someone is already waiting for a connection, then we return it to him
  145. select {
  146. case pool.readyConnection <- connection:
  147. return
  148. default:
  149. }
  150. // Nobody needs this connection
  151. pool.putConnectionUnsafe(connection)
  152. }
  153. func (pool *Pool) nowTs() Timestamp {
  154. return Timestamp(time.Now().Unix())
  155. }
  156. func (pool *Pool) getConnection(ctx context.Context) (Connection, error) {
  157. pool.synchro.Lock()
  158. connection := pool.getIdleConnectionUnsafe()
  159. if connection.conn != nil {
  160. pool.synchro.Unlock()
  161. return connection, nil
  162. }
  163. pool.synchro.Unlock()
  164. // No idle connections are available
  165. select {
  166. case connection := <-pool.readyConnection:
  167. return connection, nil
  168. case <-ctx.Done():
  169. return Connection{}, ctx.Err()
  170. }
  171. }
  172. func (pool *Pool) putConnectionUnsafe(connection Connection) {
  173. if len(pool.synchro.idleConnections) == cap(pool.synchro.idleConnections) {
  174. pool.synchro.stats.TotalCount--
  175. _ = connection.conn.Close() // Could it be more effective to close older connections?
  176. } else {
  177. pool.synchro.idleConnections = append(pool.synchro.idleConnections, connection)
  178. }
  179. }
  180. func (pool *Pool) newConnectionProducer() {
  181. var connection Connection
  182. var err error
  183. for {
  184. connection.conn = nil
  185. pool.synchro.Lock()
  186. connection = pool.getIdleConnectionUnsafe()
  187. if connection.conn == nil {
  188. if pool.synchro.stats.TotalCount >= pool.maxAlive {
  189. // Can't create more connections
  190. pool.synchro.Unlock()
  191. time.Sleep(10 * time.Millisecond)
  192. continue
  193. }
  194. pool.synchro.stats.TotalCount++ // "Reserving" new connection
  195. }
  196. pool.synchro.Unlock()
  197. if connection.conn == nil {
  198. connection, err = pool.createNewConnection()
  199. if err != nil {
  200. pool.synchro.Lock()
  201. pool.synchro.stats.TotalCount-- // Bad luck, should try again
  202. pool.synchro.Unlock()
  203. time.Sleep(time.Duration(10+rand.Intn(90)) * time.Millisecond)
  204. continue
  205. }
  206. }
  207. pool.readyConnection <- connection
  208. }
  209. }
  210. func (pool *Pool) createNewConnection() (Connection, error) {
  211. var connection Connection
  212. var err error
  213. connection.conn, err = pool.connect()
  214. if err != nil {
  215. return Connection{}, errors.Errorf(`Could not connect to mysql: %s`, err)
  216. }
  217. connection.lastUseAt = pool.nowTs()
  218. pool.synchro.Lock()
  219. pool.synchro.stats.CreatedCount++
  220. pool.synchro.Unlock()
  221. return connection, nil
  222. }
  223. func (pool *Pool) getIdleConnectionUnsafe() Connection {
  224. cnt := len(pool.synchro.idleConnections)
  225. if cnt == 0 {
  226. return Connection{}
  227. }
  228. last := cnt - 1
  229. connection := pool.synchro.idleConnections[last]
  230. pool.synchro.idleConnections[last].conn = nil
  231. pool.synchro.idleConnections = pool.synchro.idleConnections[:last]
  232. return connection
  233. }
  234. func (pool *Pool) closeOldIdleConnections() {
  235. var toPing []Connection
  236. ticker := time.NewTicker(5 * time.Second)
  237. for range ticker.C {
  238. toPing = pool.getOldIdleConnections(toPing[:0])
  239. if len(toPing) == 0 {
  240. continue
  241. }
  242. pool.recheckConnections(toPing)
  243. if !pool.spawnConnectionsIfNeeded() {
  244. pool.closeIdleConnectionsIfCan()
  245. }
  246. }
  247. }
  248. func (pool *Pool) getOldIdleConnections(dst []Connection) []Connection {
  249. dst = dst[:0]
  250. pool.synchro.Lock()
  251. synchro := &pool.synchro
  252. idleCnt := len(synchro.idleConnections)
  253. checkBefore := pool.nowTs() - pool.idlePingTimeout
  254. for i := idleCnt - 1; i >= 0; i-- {
  255. if synchro.idleConnections[i].lastUseAt > checkBefore {
  256. continue
  257. }
  258. dst = append(dst, synchro.idleConnections[i])
  259. last := idleCnt - 1
  260. if i < last {
  261. // Removing an item from the middle of a slice
  262. synchro.idleConnections[i], synchro.idleConnections[last] = synchro.idleConnections[last], synchro.idleConnections[i]
  263. }
  264. synchro.idleConnections[last].conn = nil
  265. synchro.idleConnections = synchro.idleConnections[:last]
  266. idleCnt--
  267. }
  268. pool.synchro.Unlock()
  269. return dst
  270. }
  271. func (pool *Pool) recheckConnections(connections []Connection) {
  272. const workerCnt = 2 // Heuristic :)
  273. queue := make(chan Connection, len(connections))
  274. for _, connection := range connections {
  275. queue <- connection
  276. }
  277. close(queue)
  278. var wg sync.WaitGroup
  279. wg.Add(workerCnt)
  280. for worker := 0; worker < workerCnt; worker++ {
  281. go func() {
  282. defer wg.Done()
  283. for connection := range queue {
  284. if err := pool.ping(connection.conn); err != nil {
  285. pool.closeConn(connection.conn)
  286. } else {
  287. pool.putConnection(connection)
  288. }
  289. }
  290. }()
  291. }
  292. wg.Wait()
  293. }
  294. // spawnConnectionsIfNeeded creates new connections if there are not enough of them and returns true in this case
  295. func (pool *Pool) spawnConnectionsIfNeeded() bool {
  296. pool.synchro.Lock()
  297. totalCount := pool.synchro.stats.TotalCount
  298. idleCount := len(pool.synchro.idleConnections)
  299. needSpanNew := pool.minAlive - totalCount
  300. pool.synchro.Unlock()
  301. if needSpanNew <= 0 {
  302. return false
  303. }
  304. // Не хватает соединений, нужно создать еще
  305. if needSpanNew > MaxNewConnectionAtOnce {
  306. needSpanNew = MaxNewConnectionAtOnce
  307. }
  308. pool.logFunc(`Pool: Setup %d new connections (total: %d idle: %d)...`, needSpanNew, totalCount, idleCount)
  309. pool.startNewConnections(needSpanNew)
  310. return true
  311. }
  312. func (pool *Pool) closeIdleConnectionsIfCan() {
  313. pool.synchro.Lock()
  314. canCloseCnt := pool.synchro.stats.TotalCount - pool.minAlive
  315. canCloseCnt-- // -1 to account for an open but unused connection (pool.readyConnection <- connection in newConnectionProducer)
  316. idleCnt := len(pool.synchro.idleConnections)
  317. inFly := pool.synchro.stats.TotalCount - idleCnt
  318. // We can close no more than 10% connections at a time, but at least 1, if possible
  319. idleCanCloseCnt := idleCnt / 10
  320. if idleCanCloseCnt == 0 {
  321. idleCanCloseCnt = 1
  322. }
  323. if canCloseCnt > idleCanCloseCnt {
  324. canCloseCnt = idleCanCloseCnt
  325. }
  326. if canCloseCnt <= 0 {
  327. pool.synchro.Unlock()
  328. return
  329. }
  330. closeFromIdx := idleCnt - canCloseCnt
  331. if closeFromIdx < 0 {
  332. // If there are enough requests in the "flight" now, then we can close all unnecessary
  333. closeFromIdx = 0
  334. }
  335. toClose := append([]Connection{}, pool.synchro.idleConnections[closeFromIdx:]...)
  336. for i := closeFromIdx; i < idleCnt; i++ {
  337. pool.synchro.idleConnections[i].conn = nil
  338. }
  339. pool.synchro.idleConnections = pool.synchro.idleConnections[:closeFromIdx]
  340. pool.synchro.Unlock()
  341. pool.logFunc(`Pool: Close %d idle connections (in fly %d)`, len(toClose), inFly)
  342. for _, connection := range toClose {
  343. pool.closeConn(connection.conn)
  344. }
  345. }
  346. func (pool *Pool) closeConn(conn *Conn) {
  347. pool.synchro.Lock()
  348. pool.synchro.stats.TotalCount--
  349. pool.synchro.Unlock()
  350. _ = conn.Close() // Closing is not an instant action, so do it outside the lock
  351. }
  352. func (pool *Pool) startNewConnections(count int) {
  353. connections := make([]Connection, 0, count)
  354. for i := 0; i < count; i++ {
  355. if conn, err := pool.createNewConnection(); err == nil {
  356. pool.synchro.Lock()
  357. pool.synchro.stats.TotalCount++
  358. pool.synchro.Unlock()
  359. connections = append(connections, conn)
  360. }
  361. }
  362. pool.synchro.Lock()
  363. for _, connection := range connections {
  364. pool.putConnectionUnsafe(connection)
  365. }
  366. pool.synchro.Unlock()
  367. }
  368. func (pool *Pool) ping(conn *Conn) error {
  369. deadline := time.Now().Add(100 * time.Millisecond)
  370. _ = conn.SetDeadline(deadline)
  371. err := conn.Ping()
  372. if err != nil {
  373. pool.logFunc(`Pool: ping query fail: %s`, err.Error())
  374. } else {
  375. _ = conn.SetDeadline(time.Time{})
  376. }
  377. return err
  378. }