forked from jackc/pgx
-
Notifications
You must be signed in to change notification settings - Fork 1
/
conn_pool.go
602 lines (507 loc) · 17.4 KB
/
conn_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
package pgx
import (
"context"
"io"
"sync"
"time"
"github.com/pkg/errors"
"github.com/jackc/pgx/pgtype"
)
// ConnPoolConfig holds the settings NewConnPool uses to build a ConnPool. The
// embedded ConnConfig is the configuration the pool uses when it establishes
// connections.
type ConnPoolConfig struct {
	ConnConfig
	MaxConnections int               // max simultaneous connections to use, default 5 (when 0), must be at least 1
	AfterConnect   func(*Conn) error // function to call on every new connection
	AcquireTimeout time.Duration     // max wait time when all connections are busy (0 means no timeout); must not be negative
}
// ConnPool is a pool of connections created by NewConnPool. All mutable
// fields are read and written while holding cond.L.
type ConnPool struct {
	allConnections       []*Conn     // connections tracked by the pool, whether idle or checked out
	availableConnections []*Conn     // idle connections; used as a queue (acquire takes the head, Release appends to the tail)
	cond                 *sync.Cond  // cond.L guards the pool state; waiters in acquire block on cond
	config               ConnConfig  // config used when establishing connection
	inProgressConnects   int         // connects currently running in createConnectionUnlocked with the lock released
	maxConnections       int         // upper bound on len(allConnections) + inProgressConnects
	resetCount           int         // bumped by Close, Reset and invalidateAcquired; Release closes connections whose poolResetCount differs
	afterConnect         func(*Conn) error // optional callback run on every newly created connection
	logger               Logger      // destination for pool log messages; may be nil
	logLevel             LogLevel    // forced to LogLevelNone by NewConnPool when logger is nil
	closed               bool        // set by Close; acquire then returns ErrClosedPool
	preparedStatements   map[string]*PreparedStatement // statements prepared through the pool, keyed by name; re-prepared on each new connection
	acquireTimeout       time.Duration // default wait limit applied by acquire when no deadline is given (0 means no timeout)
	connInfo             *pgtype.ConnInfo // passed to connect; replaced by a deep copy of the first connection's ConnInfo in NewConnPool
}
// ConnPoolStat is a snapshot of connection pool usage counters.
type ConnPoolStat struct {
	MaxConnections       int // max simultaneous connections to use
	CurrentConnections   int // current live connections
	AvailableConnections int // unused live connections
}

// CheckedOutConnections reports how many connections are currently checked
// out of the pool, i.e. the live connections that are not sitting idle.
func (stat *ConnPoolStat) CheckedOutConnections() int {
	inUse := stat.CurrentConnections
	inUse -= stat.AvailableConnections
	return inUse
}
// Sentinel errors returned when a connection cannot be acquired.
var (
	// ErrAcquireTimeout is returned when the deadline for acquiring a
	// connection passes before one becomes available.
	ErrAcquireTimeout = errors.New("timeout acquiring connection from pool")

	// ErrClosedPool is returned when a connection is requested from a pool
	// that has already been closed.
	ErrClosedPool = errors.New("cannot acquire from closed pool")
)
// NewConnPool creates a new ConnPool. config.ConnConfig is used for every
// connection the pool establishes.
//
// A MaxConnections of 0 selects the default of 5; values below 1 are
// rejected, as is a negative AcquireTimeout. If no Logger is configured,
// logging is disabled regardless of LogLevel; otherwise a zero LogLevel
// defaults to LogLevelDebug.
//
// One connection is established before returning. Its ConnInfo is deep-copied
// and used when establishing later connections. On any error the returned
// pool is nil.
func NewConnPool(config ConnPoolConfig) (p *ConnPool, err error) {
	p = new(ConnPool)
	p.config = config.ConnConfig
	p.connInfo = minimalConnInfo
	p.maxConnections = config.MaxConnections
	if p.maxConnections == 0 {
		p.maxConnections = 5
	}
	if p.maxConnections < 1 {
		return nil, errors.New("MaxConnections must be at least 1")
	}
	p.acquireTimeout = config.AcquireTimeout
	if p.acquireTimeout < 0 {
		return nil, errors.New("AcquireTimeout must be equal to or greater than 0")
	}

	p.afterConnect = config.AfterConnect

	if config.LogLevel != 0 {
		p.logLevel = config.LogLevel
	} else {
		// Preserve pre-LogLevel behavior by defaulting to LogLevelDebug
		p.logLevel = LogLevelDebug
	}
	p.logger = config.Logger
	if p.logger == nil {
		p.logLevel = LogLevelNone
	}

	p.allConnections = make([]*Conn, 0, p.maxConnections)
	p.availableConnections = make([]*Conn, 0, p.maxConnections)
	p.preparedStatements = make(map[string]*PreparedStatement)
	p.cond = sync.NewCond(new(sync.Mutex))

	// Initially establish one connection
	c, err := p.createConnection()
	if err != nil {
		// Return nil rather than a pool with no connections, matching the
		// validation failures above.
		return nil, err
	}
	p.allConnections = append(p.allConnections, c)
	p.availableConnections = append(p.availableConnections, c)
	p.connInfo = c.ConnInfo.DeepCopy()

	return p, nil
}
// Acquire hands the caller exclusive use of a pooled connection. The
// connection stays checked out until it is passed back to Release.
func (p *ConnPool) Acquire() (*Conn, error) {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	return p.acquire(nil)
}
// AcquireEx takes exclusive use of a connection like Acquire, but bounds the
// wait by the earlier of the pool's acquire timeout and the deadline carried
// by ctx. Only ctx.Deadline is consulted here; cancellation of ctx is not
// observed by this method.
func (p *ConnPool) AcquireEx(ctx context.Context) (*Conn, error) {
	var deadline *time.Time

	// Start from the pool-wide timeout, if one is configured.
	if p.acquireTimeout > 0 {
		poolDeadline := time.Now().Add(p.acquireTimeout)
		deadline = &poolDeadline
	}

	// Tighten it with the context deadline when that one comes first.
	if ctxDeadline, hasDeadline := ctx.Deadline(); hasDeadline {
		if deadline == nil || ctxDeadline.Before(*deadline) {
			deadline = &ctxDeadline
		}
	}

	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	return p.acquire(deadline)
}
// deadlinePassed reports whether deadline is set and already lies in the
// past. A nil deadline never passes.
func (p *ConnPool) deadlinePassed(deadline *time.Time) bool {
	if deadline == nil {
		return false
	}
	return time.Now().After(*deadline)
}
// acquire performs acquisition assuming the pool is already locked (the
// caller holds p.cond.L).
//
// deadline is the point in time after which the call gives up with
// ErrAcquireTimeout; nil means "use the pool's acquireTimeout, if any".
// It returns ErrClosedPool when the pool has been closed.
//
// The lock is held on return, but it may be released and re-taken while the
// call is in progress: createConnectionUnlocked drops it around the connect,
// and p.cond.Wait releases it while blocked.
func (p *ConnPool) acquire(deadline *time.Time) (*Conn, error) {
	if p.closed {
		return nil, ErrClosedPool
	}

	// A connection is available
	// The pool works like a queue. Available connection will be returned
	// from the head. A new connection will be added to the tail.
	numAvailable := len(p.availableConnections)
	if numAvailable > 0 {
		c := p.availableConnections[0]
		// Stamp the connection with the current generation so Release can
		// tell whether the pool was reset while it was checked out.
		c.poolResetCount = p.resetCount
		copy(p.availableConnections, p.availableConnections[1:])
		p.availableConnections = p.availableConnections[:numAvailable-1]
		return c, nil
	}

	// Set initial timeout/deadline value. If the method (acquire) happens to
	// recursively call itself the deadline should retain its value.
	if deadline == nil && p.acquireTimeout > 0 {
		tmp := time.Now().Add(p.acquireTimeout)
		deadline = &tmp
	}

	// Make sure the deadline (if there is one) has not passed yet
	if p.deadlinePassed(deadline) {
		return nil, ErrAcquireTimeout
	}

	// If there is a deadline then start a timeout timer. When it fires it
	// wakes every waiter so the wait loop below can re-check the deadline.
	var timer *time.Timer
	if deadline != nil {
		timer = time.AfterFunc(deadline.Sub(time.Now()), func() {
			p.cond.Broadcast()
		})
		defer timer.Stop()
	}

	// No connections are available, but we can create more
	if len(p.allConnections)+p.inProgressConnects < p.maxConnections {
		// Create a new connection.
		// Careful here: createConnectionUnlocked() removes the current lock,
		// creates a connection and then locks it back.
		c, err := p.createConnectionUnlocked()
		if err != nil {
			return nil, err
		}
		c.poolResetCount = p.resetCount
		p.allConnections = append(p.allConnections, c)
		return c, nil
	}

	// All connections are in use and we cannot create more
	if p.logLevel >= LogLevelWarn {
		p.logger.Log(LogLevelWarn, "waiting for available connection", nil)
	}

	// Wait until there is an available connection OR room to create a new connection
	for len(p.availableConnections) == 0 && len(p.allConnections)+p.inProgressConnects == p.maxConnections {
		if p.deadlinePassed(deadline) {
			return nil, ErrAcquireTimeout
		}
		p.cond.Wait()
	}

	// Stop the timer so that we do not spawn it on every acquire call.
	if timer != nil {
		timer.Stop()
	}

	// Retry from the top with the same deadline.
	return p.acquire(deadline)
}
// Release returns a connection to the pool. Before the connection is handed
// back it is cleaned up: an open transaction is rolled back, all LISTEN
// channels are dropped and pending notifications are discarded. Connections
// from an older pool generation are closed, dead connections are removed
// from the pool, and everything else becomes available again.
//
// Release panics if the connection still has a context operation in progress.
func (p *ConnPool) Release(conn *Conn) {
	if conn.ctxInProgress {
		panic("should never release when context is in progress")
	}

	// Anything other than the idle status means a transaction is still open.
	if conn.txStatus != 'I' {
		conn.Exec("rollback")
	}

	if len(conn.channels) > 0 {
		if unlistenErr := conn.Unlisten("*"); unlistenErr != nil {
			conn.die(unlistenErr)
		}
		conn.channels = make(map[string]struct{})
	}
	conn.notifications = nil

	p.cond.L.Lock()
	switch {
	case conn.poolResetCount != p.resetCount:
		// The pool was reset or closed while this connection was out.
		conn.Close()
	case conn.IsAlive():
		p.availableConnections = append(p.availableConnections, conn)
	default:
		p.removeFromAllConnections(conn)
	}
	p.cond.L.Unlock()

	// Wake one waiter: there is now either an idle connection or room for
	// a new one.
	p.cond.Signal()
}
// removeFromAllConnections drops conn from p.allConnections. It reports
// whether the connection was present (and therefore removed).
func (p *ConnPool) removeFromAllConnections(conn *Conn) bool {
	for idx := 0; idx < len(p.allConnections); idx++ {
		if p.allConnections[idx] != conn {
			continue
		}
		p.allConnections = append(p.allConnections[:idx], p.allConnections[idx+1:]...)
		return true
	}
	return false
}
// Close shuts the pool down. No further connections can be acquired, every
// idle connection is closed immediately, and connections that are currently
// checked out are closed when they are released.
func (p *ConnPool) Close() {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	p.closed = true

	for i := range p.availableConnections {
		// Errors from closing idle connections are deliberately ignored.
		_ = p.availableConnections[i].Close()
	}

	// Advancing the generation makes Release close any connection that is
	// still checked out.
	p.resetCount++
}
// Reset discards every connection the pool currently knows about while
// keeping the pool itself usable. Use it after an event that invalidates all
// connections at once, such as a network interruption or a server state
// change.
//
// Resetting is safe while connections are checked out: those connections are
// closed when they are released.
func (p *ConnPool) Reset() {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	// A new generation marks all checked-out connections as stale.
	p.resetCount++
	p.allConnections = p.allConnections[:0]

	for i := range p.availableConnections {
		p.availableConnections[i].Close()
	}
	p.availableConnections = p.availableConnections[:0]
}
// invalidateAcquired marks every checked-out connection as stale so that it
// is closed on release, while idle connections are carried over into the new
// generation. The caller must hold the pool lock.
func (p *ConnPool) invalidateAcquired() {
	p.resetCount++

	idle := p.availableConnections
	for i := range idle {
		idle[i].poolResetCount = p.resetCount
	}

	// From now on the pool tracks only the idle connections.
	p.allConnections = p.allConnections[:len(idle)]
	copy(p.allConnections, idle)
}
// Stat returns a snapshot of the pool's connection counters, taken under the
// pool lock.
func (p *ConnPool) Stat() (s ConnPoolStat) {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	return ConnPoolStat{
		MaxConnections:       p.maxConnections,
		CurrentConnections:   len(p.allConnections),
		AvailableConnections: len(p.availableConnections),
	}
}
// createConnection establishes a new connection with the pool's config and
// connInfo and then runs the post-connect setup on it. It does not touch the
// pool lock.
func (p *ConnPool) createConnection() (*Conn, error) {
	conn, connectErr := connect(p.config, p.connInfo)
	if connectErr != nil {
		return nil, connectErr
	}

	return p.afterConnectionCreated(conn)
}
// createConnectionUnlocked releases the pool lock, establishes a new
// connection, and re-takes the lock before returning. The caller must hold
// the lock on entry and holds it again on return.
//
// Connecting outside the lock matters when the server is unreachable. Say
// the dialer's open timeout is 3 seconds and 20 connections are requested at
// startup: with the lock held, each attempt would block all the others for
// its full 3 seconds, so the last one would fail only after 3 * 20 = 60
// seconds. With the lock released the attempts run (more or less) in
// parallel. inProgressConnects accounts for the connects that are in flight
// so the pool does not exceed its connection limit meanwhile.
func (p *ConnPool) createConnectionUnlocked() (*Conn, error) {
	p.inProgressConnects++
	p.cond.L.Unlock()

	// Each connect gets its own copy of the connInfo.
	conn, connectErr := connect(p.config, p.connInfo.DeepCopy())

	p.cond.L.Lock()
	p.inProgressConnects--

	if connectErr != nil {
		return nil, connectErr
	}

	return p.afterConnectionCreated(conn)
}
// afterConnectionCreated finishes setting up a freshly opened connection: it
// runs the afterConnect callback, when one is configured, and prepares every
// statement already known to the pool. If any step fails the connection is
// killed and the error is returned.
func (p *ConnPool) afterConnectionCreated(c *Conn) (*Conn, error) {
	if hook := p.afterConnect; hook != nil {
		if hookErr := hook(c); hookErr != nil {
			c.die(hookErr)
			return nil, hookErr
		}
	}

	for _, stmt := range p.preparedStatements {
		prepareOpts := &PrepareExOptions{ParameterOIDs: stmt.ParameterOIDs}
		_, prepareErr := c.PrepareEx(context.Background(), stmt.Name, stmt.SQL, prepareOpts)
		if prepareErr != nil {
			c.die(prepareErr)
			return nil, prepareErr
		}
	}

	return c, nil
}
// Exec runs sql on a connection borrowed from the pool and returns the
// connection to the pool before returning.
func (p *ConnPool) Exec(sql string, arguments ...interface{}) (commandTag CommandTag, err error) {
	conn, err := p.Acquire()
	if err != nil {
		return commandTag, err
	}
	defer p.Release(conn)

	return conn.Exec(sql, arguments...)
}
// ExecEx runs sql with ctx and options on a connection borrowed from the pool
// and returns the connection to the pool before returning. ctx is passed to
// the connection's ExecEx; acquiring the connection goes through Acquire.
func (p *ConnPool) ExecEx(ctx context.Context, sql string, options *QueryExOptions, arguments ...interface{}) (commandTag CommandTag, err error) {
	conn, err := p.Acquire()
	if err != nil {
		return commandTag, err
	}
	defer p.Release(conn)

	return conn.ExecEx(ctx, sql, options, arguments...)
}
// Query runs sql on a connection borrowed from the pool. The connection is
// released automatically when the returned *Rows is closed; if the query
// itself fails, the connection is released right away.
func (p *ConnPool) Query(sql string, args ...interface{}) (*Rows, error) {
	conn, acquireErr := p.Acquire()
	if acquireErr != nil {
		// Callers may defer error checking to the *Rows, so hand back a
		// closed Rows that carries the error.
		return &Rows{closed: true, err: acquireErr}, acquireErr
	}

	rows, queryErr := conn.Query(sql, args...)
	if queryErr != nil {
		p.Release(conn)
		return rows, queryErr
	}

	// Record the pool on the rows so the connection can be returned to it.
	rows.connPool = p
	return rows, nil
}
// QueryEx runs sql with ctx and options on a connection borrowed from the
// pool. The connection is released automatically when the returned *Rows is
// closed; if the query itself fails, the connection is released right away.
func (p *ConnPool) QueryEx(ctx context.Context, sql string, options *QueryExOptions, args ...interface{}) (*Rows, error) {
	conn, acquireErr := p.Acquire()
	if acquireErr != nil {
		// Callers may defer error checking to the *Rows, so hand back a
		// closed Rows that carries the error.
		return &Rows{closed: true, err: acquireErr}, acquireErr
	}

	rows, queryErr := conn.QueryEx(ctx, sql, options, args...)
	if queryErr != nil {
		p.Release(conn)
		return rows, queryErr
	}

	// Record the pool on the rows so the connection can be returned to it.
	rows.connPool = p
	return rows, nil
}
// QueryRow runs sql through Query and exposes the result as a *Row. The
// borrowed connection is released automatically after Scan is called on the
// returned *Row.
func (p *ConnPool) QueryRow(sql string, args ...interface{}) *Row {
	// The error return is dropped here; the *Rows from Query is what the
	// caller gets, converted to a *Row.
	rows, _ := p.Query(sql, args...)
	row := (*Row)(rows)
	return row
}
// QueryRowEx runs sql through QueryEx and exposes the result as a *Row.
func (p *ConnPool) QueryRowEx(ctx context.Context, sql string, options *QueryExOptions, args ...interface{}) *Row {
	// The error return is dropped here; the *Rows from QueryEx is what the
	// caller gets, converted to a *Row.
	rows, _ := p.QueryEx(ctx, sql, options, args...)
	row := (*Row)(rows)
	return row
}
// Begin starts a transaction on a connection borrowed from the pool, using a
// background context and default transaction options. The connection is
// released automatically when the transaction is closed.
func (p *ConnPool) Begin() (*Tx, error) {
	ctx := context.Background()
	return p.BeginEx(ctx, nil)
}
// Prepare registers a prepared statement with the pool. The statement is
// first prepared on one pooled connection to check that it is valid; once
// that succeeds, every connection obtained through the pool has the statement
// available.
//
// name identifies the statement and sql is its text. sql may contain
// placeholders for bound parameters, referenced positionally as $1, $2, etc.
//
// Prepare is idempotent: calling it again with the same name and sql is safe,
// so a code path can Prepare and then Query/Exec/PrepareEx without tracking
// whether the statement has already been prepared.
func (p *ConnPool) Prepare(name, sql string) (*PreparedStatement, error) {
	ctx := context.Background()
	return p.PrepareEx(ctx, name, sql, nil)
}
// PrepareEx registers a prepared statement with the pool. The statement is
// first prepared on one pooled connection to check that it is valid; once
// that succeeds, every connection obtained through the pool has the statement
// available.
//
// name identifies the statement and sql is its text. sql may contain
// placeholders for bound parameters, referenced positionally as $1, $2, etc.
// Unlike Prepare, additional options (such as parameter OIDs) can be supplied
// through opts.
//
// PrepareEx is idempotent: calling it again with the same name and sql is
// safe, so a code path can PrepareEx and then Query/Exec/Prepare without
// tracking whether the statement has already been prepared.
func (p *ConnPool) PrepareEx(ctx context.Context, name, sql string, opts *PrepareExOptions) (*PreparedStatement, error) {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	if known, found := p.preparedStatements[name]; found && known.SQL == sql {
		return known, nil
	}

	probe, err := p.acquire(nil)
	if err != nil {
		return nil, err
	}
	// Put the connection straight back into the idle queue; the lock is
	// held, so nobody else can take it while we work on it.
	p.availableConnections = append(p.availableConnections, probe)

	// acquire may have released the lock while connecting (see
	// createConnectionUnlocked), so another caller could have prepared the
	// same statement in the meantime. Check again.
	if known, found := p.preparedStatements[name]; found && known.SQL == sql {
		return known, nil
	}

	stmt, err := probe.PrepareEx(ctx, name, sql, opts)
	if err != nil {
		return nil, err
	}

	// Prepare the statement on every idle connection.
	for _, idle := range p.availableConnections {
		if _, prepareErr := idle.PrepareEx(ctx, name, sql, opts); prepareErr != nil {
			return nil, prepareErr
		}
	}

	// Checked-out connections do not have the statement; have them closed
	// on release so replacements pick it up on creation.
	p.invalidateAcquired()
	p.preparedStatements[name] = stmt

	return stmt, nil
}
// Deallocate removes the prepared statement called name from the pool. It is
// deallocated on every idle connection, checked-out connections are
// invalidated so they are closed on release, and the statement is dropped
// from the pool's registry. The first deallocation error aborts the call.
func (p *ConnPool) Deallocate(name string) (err error) {
	p.cond.L.Lock()
	defer p.cond.L.Unlock()

	for i := range p.availableConnections {
		if deallocErr := p.availableConnections[i].Deallocate(name); deallocErr != nil {
			return deallocErr
		}
	}

	p.invalidateAcquired()
	delete(p.preparedStatements, name)

	return nil
}
// BeginEx starts a transaction on a connection borrowed from the pool, with
// txOptions selecting the transaction mode. The connection is released
// automatically when the transaction is closed.
//
// If beginning the transaction fails because the connection is dead, another
// connection is acquired and the attempt is repeated, unless ctx is done.
func (p *ConnPool) BeginEx(ctx context.Context, txOptions *TxOptions) (*Tx, error) {
	for {
		conn, acquireErr := p.Acquire()
		if acquireErr != nil {
			return nil, acquireErr
		}

		tx, beginErr := conn.BeginEx(ctx, txOptions)
		if beginErr == nil {
			tx.connPool = p
			return tx, nil
		}

		// Capture liveness before Release.
		wasAlive := conn.IsAlive()
		p.Release(conn)

		// A live connection means the failure is not something a fresh
		// connection would fix, so report it. Only a dead connection is
		// worth another attempt, and only while ctx is still usable.
		if wasAlive || ctx.Err() != nil {
			return nil, beginErr
		}
	}
}
// CopyFrom borrows a connection from the pool, forwards the call to that
// connection's CopyFrom, and returns the connection to the pool afterwards.
func (p *ConnPool) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int, error) {
	conn, acquireErr := p.Acquire()
	if acquireErr != nil {
		return 0, acquireErr
	}
	defer p.Release(conn)

	return conn.CopyFrom(tableName, columnNames, rowSrc)
}
// CopyFromReader borrows a connection from the pool, forwards the call to
// that connection's CopyFromReader, and returns the connection to the pool
// afterwards.
func (p *ConnPool) CopyFromReader(r io.Reader, sql string) (CommandTag, error) {
	conn, acquireErr := p.Acquire()
	if acquireErr != nil {
		return "", acquireErr
	}
	defer p.Release(conn)

	return conn.CopyFromReader(r, sql)
}
// CopyToWriter borrows a connection from the pool, forwards the call to that
// connection's CopyToWriter, and returns the connection to the pool
// afterwards.
func (p *ConnPool) CopyToWriter(w io.Writer, sql string, args ...interface{}) (CommandTag, error) {
	conn, acquireErr := p.Acquire()
	if acquireErr != nil {
		return "", acquireErr
	}
	defer p.Release(conn)

	return conn.CopyToWriter(w, sql, args...)
}
// BeginBatch borrows a connection from the pool and starts a batch on it.
// The connection is released automatically once the *Batch is finished. A
// failure to acquire the connection is not returned here; it is stored in
// the returned Batch.
func (p *ConnPool) BeginBatch() *Batch {
	conn, acquireErr := p.Acquire()

	batch := &Batch{
		conn:     conn,
		connPool: p,
		err:      acquireErr,
	}
	return batch
}