Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #108 from jchumnanvech/master
Browse files Browse the repository at this point in the history
Ability to add auto shrinking of connection pool size
  • Loading branch information
haizaar committed Jul 8, 2015
2 parents 0769a43 + b2d570a commit bf020b1
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
29 changes: 25 additions & 4 deletions momoko/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
from functools import partial
from collections import deque
import time
import datetime
from functools import wraps
from contextlib import contextmanager
Expand All @@ -44,7 +45,7 @@ def __init__(self):
self.empty()

def empty(self):
self.free = set()
self.free = deque()
self.busy = set()
self.dead = set()
self.pending = set()
Expand All @@ -56,7 +57,8 @@ def add_free(self, conn):

if not self.waiting_queue:
log.debug("No outstanding requests - adding to free pool")
self.free.add(conn)
conn.last_used_time = time.time()
self.free.append(conn)
return

log.debug("There are outstanding requests - resumed future from waiting queue")
Expand Down Expand Up @@ -101,10 +103,16 @@ def abort_waiting_queue(self, error):
future.set_exception(error)

def close_alive(self):
for conn in self.free.union(self.busy):
for conn in self.busy.union(self.free):
if not conn.closed:
conn.close()

def shrink(self, target_size, delay_in_seconds):
now = time.time()
while len(self.free) > target_size and now - self.free[0].last_used_time > delay_in_seconds:
conn = self.free.popleft()
conn.close()

@property
def all_dead(self):
return not (self.free or self.busy)
Expand All @@ -124,7 +132,11 @@ def __init__(self,
ioloop=None,
raise_connect_errors=True,
reconnect_interval=500,
setsession=()):
setsession=(),
auto_shrink=False,
shrink_delay=datetime.timedelta(minutes=2),
shrink_period=datetime.timedelta(minutes=2)
):

assert size > 0, "The connection pool size must be a number above 0."

Expand All @@ -149,6 +161,15 @@ def __init__(self,

self._last_connect_time = 0
self._no_conn_availble_error = psycopg2.DatabaseError("No database connection available")
self.shrink_period = shrink_period
self.shrink_delay = shrink_delay
self.auto_shrink = auto_shrink
if auto_shrink:
self._auto_shrink()

def _auto_shrink(self):
self.conns.shrink(self.size, self.shrink_delay.seconds)
self.ioloop.add_timeout(self.shrink_period, self._auto_shrink)

def connect(self):
"""
Expand Down
69 changes: 67 additions & 2 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from itertools import chain
import inspect
import logging
import datetime
import threading
from tornado.concurrent import Future

from tornado import gen
from tornado.testing import unittest, AsyncTestCase, gen_test
Expand Down Expand Up @@ -349,7 +352,8 @@ class PoolBaseTest(BaseTest):
max_size = None
raise_connect_errors = False

def build_pool(self, dsn=None, setsession=(), con_factory=None, cur_factory=None, size=None):
def build_pool(self, dsn=None, setsession=(), con_factory=None, cur_factory=None, size=None, auto_shrink=False,
shrink_delay=datetime.timedelta(seconds=1), shrink_period=datetime.timedelta(milliseconds=500)):
db = momoko.Pool(
dsn=(dsn or self.dsn),
size=(size or self.pool_size),
Expand All @@ -359,6 +363,9 @@ def build_pool(self, dsn=None, setsession=(), con_factory=None, cur_factory=None
raise_connect_errors=self.raise_connect_errors,
connection_factory=con_factory,
cursor_factory=cur_factory,
auto_shrink=auto_shrink,
shrink_period=shrink_period,
shrink_delay=shrink_delay
)
return db.connect()

Expand All @@ -374,7 +381,7 @@ def runner(x):

def kill_connections(self, db, amount=None):
amount = amount or (len(db.conns.free) + len(db.conns.busy))
for conn in db.conns.free.union(db.conns.busy):
for conn in db.conns.busy.union(db.conns.free):
if not amount:
break
if not conn.closed:
Expand Down Expand Up @@ -722,6 +729,64 @@ def test_partially_connected(self):
self.assertRaises(exp, self.build_pool_sync, dsn=bad_dsn)


class MomokoPoolShrinkTest(MomokoPoolParallelTest):
pool_size = 2
max_size = 5

@gen_test
def test_pool_shrinking(self):
db = yield self.build_pool(auto_shrink=True, shrink_delay=datetime.timedelta(seconds=1),
shrink_period=datetime.timedelta(milliseconds=500))
f1 = db.execute("Select 1")
f2 = db.execute("Select 2")
f3 = db.execute("Select 3")
f4 = db.execute("Select 4")
f5 = db.execute("Select 5")
cursors = yield [f1, f2, f3, f4, f5]
yield gen.sleep(.7)

self.assertEqual(db.conns.total, 5)
self.assertEqual(cursors[0].fetchone()[0], 1)
self.assertEqual(cursors[1].fetchone()[0], 2)
self.assertEqual(cursors[2].fetchone()[0], 3)
self.assertEqual(cursors[3].fetchone()[0], 4)
self.assertEqual(cursors[4].fetchone()[0], 5)

yield gen.sleep(1)

self.assertEqual(db.conns.total, 2)

@gen_test
def test_pool_shrinking_with_shrink_delay(self):
db = yield self.build_pool(auto_shrink=True, shrink_delay=datetime.timedelta(seconds=1),
shrink_period=datetime.timedelta(milliseconds=500))
f1 = db.execute("Select 1")
f2 = db.execute("Select 2")
f3 = db.execute("Select 3")
f4 = db.execute("Select 4")
f5 = db.execute("Select 5")
cursors = yield [f1, f2, f3, f4, f5]
yield gen.sleep(.7)

self.assertEqual(db.conns.total, 5)
self.assertEqual(cursors[0].fetchone()[0], 1)
self.assertEqual(cursors[1].fetchone()[0], 2)
self.assertEqual(cursors[2].fetchone()[0], 3)
self.assertEqual(cursors[3].fetchone()[0], 4)
self.assertEqual(cursors[4].fetchone()[0], 5)

f1 = db.execute("Select 1")
f2 = db.execute("Select 2")
f3 = db.execute("Select 3")
cursors = yield [f1, f2, f3]
self.assertEqual(cursors[0].fetchone()[0], 1)
self.assertEqual(cursors[1].fetchone()[0], 2)
self.assertEqual(cursors[2].fetchone()[0], 3)

yield gen.sleep(1)

self.assertEqual(db.conns.total, 3)

if __name__ == '__main__':
if debug:
FORMAT = '%(asctime)-15s %(levelname)s:%(name)s %(funcName)-15s: %(message)s'
Expand Down

0 comments on commit bf020b1

Please sign in to comment.