Skip to content

Commit

Permalink
Merge branch 'bugfix/issue-1'
Browse files Browse the repository at this point in the history
* bugfix/issue-1:
  fix: maxCap does not restrict created connections #1
  • Loading branch information
buraksezer committed Jun 10, 2021
2 parents fbd91d0 + 3d94e98 commit c18fbd7
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 9 deletions.
46 changes: 39 additions & 7 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
)

// channelPool implements the Pool interface based on buffered channels.
Expand All @@ -13,6 +14,10 @@ type channelPool struct {
mu sync.RWMutex
conns chan net.Conn

maxCap int
cond *sync.Cond
aliveConnections int32

// net.Conn generator
factory Factory
}
Expand All @@ -33,6 +38,8 @@ func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {

c := &channelPool{
conns: make(chan net.Conn, maxCap),
cond: sync.NewCond(&sync.Mutex{}),
maxCap: maxCap,
factory: factory,
}

Expand All @@ -42,8 +49,10 @@ func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
return nil, fmt.Errorf("factory is not able to fill the pool: %w", err)
}

atomic.AddInt32(&c.aliveConnections, 1)
c.conns <- conn
}

Expand All @@ -58,6 +67,23 @@ func (c *channelPool) getConnsAndFactory() (chan net.Conn, Factory) {
return conns, factory
}

func (c *channelPool) makeNewConn(factory Factory) (net.Conn, error) {
c.cond.L.Lock()
defer c.cond.L.Unlock()

for atomic.LoadInt32(&c.aliveConnections) >= int32(c.maxCap) {
c.cond.Wait()
}

conn, err := factory()
if err != nil {
return nil, err
}

atomic.AddInt32(&c.aliveConnections, 1)
return c.wrapConn(conn), nil
}

// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
Expand All @@ -77,12 +103,7 @@ func (c *channelPool) Get() (net.Conn, error) {

return c.wrapConn(conn), nil
default:
conn, err := factory()
if err != nil {
return nil, err
}

return c.wrapConn(conn), nil
return c.makeNewConn(factory)
}
}

Expand All @@ -105,8 +126,15 @@ func (c *channelPool) put(conn net.Conn) error {
// block and the default case will be executed.
select {
case c.conns <- conn:
c.cond.L.Lock()
if atomic.LoadInt32(&c.aliveConnections) < int32(c.maxCap) {
c.cond.Signal()
}
c.cond.L.Unlock()

return nil
default:
atomic.AddInt32(&c.aliveConnections, -1)
// pool is full, close passed connection
return conn.Close()
}
Expand All @@ -133,3 +161,7 @@ func (c *channelPool) Len() int {
conns, _ := c.getConnsAndFactory()
return len(conns)
}

func (c *channelPool) NumberOfConns() int {
return int(atomic.LoadInt32(&c.aliveConnections))
}
81 changes: 81 additions & 0 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"testing"
"time"
)
Expand Down Expand Up @@ -290,3 +291,83 @@ func simpleTCPServer() {
}()
}
}

func TestPoolMaximumCapacity(t *testing.T) {
var successful int32
p, _ := NewChannelPool(InitialCap, MaximumCap, func() (net.Conn, error) {
return net.Dial(network, address)
})
defer p.Close()

var wg sync.WaitGroup
numWorkers := MaximumCap * 2
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
done := make(chan struct{})
go func() {
defer close(done)

_, err := p.Get()
if err != nil {
t.Error(err)
}
atomic.AddInt32(&successful, 1)
}()

select {
case <-time.After(time.Second):
case <-done:
}
wg.Done()
}()
}
wg.Wait()

if atomic.LoadInt32(&successful) != int32(MaximumCap) {
t.Errorf("expected successfull Get calls: %d, got: %d", MaximumCap, successful)
}

if p.NumberOfConns() != MaximumCap {
t.Errorf("expected connection count %d, got %d", MaximumCap, p.NumberOfConns())
}
}

func TestPoolMaximumCapacity_Close(t *testing.T) {
var successfull int32
p, _ := NewChannelPool(InitialCap, MaximumCap, func() (net.Conn, error) {
return net.Dial(network, address)
})
defer p.Close()

var wg sync.WaitGroup
numWorkers := MaximumCap * 2
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go func() {
done := make(chan struct{})
go func() {
defer close(done)

c, err := p.Get()
if err != nil {
t.Error(err)
}
atomic.AddInt32(&successfull, 1)
c.Close()
}()

select {
case <-time.After(time.Second):
case <-done:
}
wg.Done()
}()
}
wg.Wait()

if atomic.LoadInt32(&successfull) != int32(numWorkers) {
t.Errorf("expected successful Get calls: %d, got: %d",
numWorkers, atomic.LoadInt32(&successfull))
}
}
7 changes: 5 additions & 2 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var (
)

// Pool interface describes a pool implementation. A pool should have maximum
// capacity. An ideal pool is threadsafe and easy to use.
// capacity. An ideal pool is thread-safe and easy to use.
type Pool interface {
// Get returns a new connection from the pool. Closing the connections puts
// it back to the Pool. Closing it when the pool is destroyed or full will
Expand All @@ -23,6 +23,9 @@ type Pool interface {
// no longer usable.
Close()

// Len returns the current number of connections of the pool.
// Len returns the current number of idle connections of the pool.
Len() int

// NumberOfConns returns the total number of alive connections of the pool.
NumberOfConns() int
}

0 comments on commit c18fbd7

Please sign in to comment.