Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #172 from haizaar/master
Browse files Browse the repository at this point in the history
Python 3.7 and Tornado 5.x support
  • Loading branch information
haizaar authored Nov 4, 2018
2 parents 0047940 + a608ef3 commit f7f394e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 28 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ __pycache__
docs/_build/
*.sublime-project
*.sublime-workspace
*.swp
24 changes: 18 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
language: python

python:
- 2.6
- 2.7
- 3.3
- 3.4
- 3.5
- 3.6
- pypy
- pypy3.5

addons:
postgresql: "9.4"
Expand All @@ -25,13 +24,26 @@ env:
- MOMOKO_TEST_HOST=127.0.0.1
- PGHOST=127.0.0.1
matrix:
- MOMOKO_PSYCOPG2_IMPL=psycopg2
- MOMOKO_PSYCOPG2_IMPL=psycopg2cffi
- MOMOKO_PSYCOPG2_IMPL=psycopg2 TORNADO_VER="<5.0.0"
- MOMOKO_PSYCOPG2_IMPL=psycopg2cffi TORNADO_VER="<5.0.0"
- MOMOKO_PSYCOPG2_IMPL=psycopg2 TORNADO_VER=">=5.0.0"
- MOMOKO_PSYCOPG2_IMPL=psycopg2cffi TORNADO_VER=">=5.0.0"

install: "pip install 'tornado>=4.0.0,<5.0.0' ${MOMOKO_PSYCOPG2_IMPL} unittest2"
install: 'pip install tornado"${TORNADO_VER}" ${MOMOKO_PSYCOPG2_IMPL} unittest2'
script: python setup.py test

matrix:
exclude:
- python: pypy
env: MOMOKO_PSYCOPG2_IMPL=psycopg2
env: MOMOKO_PSYCOPG2_IMPL=psycopg2 TORNADO_VER="<5.0.0"
- python: pypy
env: MOMOKO_PSYCOPG2_IMPL=psycopg2 TORNADO_VER=">=5.0.0"
- python: pypy3.5
env: MOMOKO_PSYCOPG2_IMPL=psycopg2 TORNADO_VER="<5.0.0"
- python: pypy3.5
env: MOMOKO_PSYCOPG2_IMPL=psycopg2 TORNADO_VER=">=5.0.0"
include:
# https://github.com/travis-ci/travis-ci/issues/9815
- python: 3.7
dist: xenial
sudo: true
33 changes: 21 additions & 12 deletions momoko/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@
from psycopg2.extras import register_json as _psy_register_json
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE

import tornado
from tornado.ioloop import IOLoop
from tornado.concurrent import chain_future, Future

from .exceptions import PoolError, PartiallyConnectedError

# Backfill for tornado 5 compatability
# https://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.future_set_exc_info
if tornado.version_info[0] < 5:
def future_set_exc_info(future, exc_info):
future.set_exc_info(exc_info)
else:
from tornado.concurrent import future_set_exc_info

log = logging.getLogger('momoko')


Expand Down Expand Up @@ -414,7 +423,7 @@ def mogrify(self, *args, **kwargs):
See :py:meth:`momoko.Connection.mogrify` for documentation about the
parameters.
"""
return self._operate(Connection.mogrify, args, kwargs, async=False)
return self._operate(Connection.mogrify, args, kwargs, async_=False)

def register_hstore(self, *args, **kwargs):
"""
Expand Down Expand Up @@ -451,7 +460,7 @@ def close(self):
self.conns.empty()
self.closed = True

def _operate(self, method, args=(), kwargs=None, async=True, keep=False, connection=None):
def _operate(self, method, args=(), kwargs=None, async_=True, keep=False, connection=None):
kwargs = kwargs or {}
future = Future()

Expand All @@ -461,7 +470,7 @@ def when_available(fut):
try:
conn = fut.result()
except psycopg2.Error:
future.set_exc_info(sys.exc_info())
future_set_exc_info(future, sys.exc_info())
if retry and not keep:
self.putconn(retry[0])
return
Expand All @@ -473,7 +482,7 @@ def when_available(fut):
log.debug("Method failed synchronously")
return self._retry(retry, when_available, conn, keep, future)

if not async:
if not async_:
future.set_result(future_or_result)
if not keep:
self.putconn(conn)
Expand Down Expand Up @@ -509,7 +518,7 @@ def _retry(self, retry, what, conn, keep, future):
else:
future.set_exception(self._no_conn_available_error)
else:
future.set_exc_info(sys.exc_info())
future_set_exc_info(future, sys.exc_info())
if not keep:
self.putconn(conn)
return
Expand Down Expand Up @@ -599,7 +608,7 @@ def on_ping_done(ping_fut):
if conn.closed:
ping_future.set_exception(self._no_conn_available_error)
else:
ping_future.set_exc_info(sys.exc_info())
future_set_exc_info(ping_future, sys.exc_info())
self.putconn(conn)
else:
ping_future.set_result(conn)
Expand Down Expand Up @@ -667,7 +676,7 @@ def connect(self):
Initiate asynchronous connect.
Returns future that resolves to this connection object.
"""
kwargs = {"async": True}
kwargs = {"async_": True}
if self.connection_factory:
kwargs["connection_factory"] = self.connection_factory
if self.cursor_factory:
Expand All @@ -680,7 +689,7 @@ def connect(self):
self.connection = psycopg2.connect(self.dsn, **kwargs)
except psycopg2.Error:
self.connection = None
future.set_exc_info(sys.exc_info())
future_set_exc_info(future, sys.exc_info())
return future

self.fileno = self.connection.fileno()
Expand Down Expand Up @@ -716,9 +725,9 @@ def _close_on_fail(self, future):
def _io_callback(self, future, result, fd=None, events=None):
try:
state = self.connection.poll()
except (psycopg2.Warning, psycopg2.Error):
except (psycopg2.Warning, psycopg2.Error) as err:
self.ioloop.remove_handler(self.fileno)
future.set_exc_info(sys.exc_info())
future_set_exc_info(future, sys.exc_info())
else:
try:
if state == POLL_OK:
Expand Down Expand Up @@ -881,7 +890,7 @@ def exec_statements(future):
if auto_rollback and not self.closed:
self._rollback(transaction_future, error)
else:
transaction_future.set_exc_info(sys.exc_info())
future_set_exc_info(transaction_future, sys.exc_info())
return

try:
Expand Down Expand Up @@ -917,7 +926,7 @@ def _register(self, future, registrator, fut):
try:
cursor = fut.result()
except Exception:
future.set_exc_info(sys.exc_info())
future_set_exc_info(future, sys.exc_info())
return

oid, array_oid = cursor.fetchone()
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from distutils.core import setup, Extension, Command


dependencies = ['tornado >= 4.0, <5.0', ]
dependencies = ['tornado >= 4.0, <6.0', ]
psycopg2_impl = os.environ.get('MOMOKO_PSYCOPG2_IMPL', 'psycopg2')

if psycopg2_impl == 'psycopg2cffi':
Expand Down Expand Up @@ -50,12 +50,12 @@
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: Implementation :: PyPy',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Topic :: Database',
'Topic :: Database :: Front-Ends'
]
Expand Down
57 changes: 49 additions & 8 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import datetime
import subprocess

import tornado
from tornado import gen
from tornado.testing import unittest, AsyncTestCase, gen_test

Expand Down Expand Up @@ -58,6 +59,20 @@
from psycopg2ct import compat
compat.register()

# Some compatibility with python 2.7
try:
ProcessLookupError
except NameError:
class ProcessLookupError(Exception): pass
try:
from subprocess import TimeoutExpired
def pwait(process, timeout=None):
process.wait(timeout)
except ImportError:
class TimeoutExpired(Exception): pass
def pwait(process, timeout=None):
process.wait()


import momoko
import momoko.exceptions
Expand Down Expand Up @@ -374,6 +389,14 @@ def test_ping_with_named_cursor(self):
conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, cursor_factory=RealDictCursor)
yield conn.ping()

@gen_test
def test_cursor_factory_with_json(self):
"""Testing that register_json works with RealDictCursor"""
conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, cursor_factory=RealDictCursor)
yield conn.register_json()
cursor = yield conn.execute("SELECT 1 AS a")
self.assertEqual(cursor.fetchone(), {"a": 1})


#
# Pool tests
Expand Down Expand Up @@ -453,14 +476,24 @@ class ProxyMixIn(object):

def start_proxy(self):
# Dirty way to make sure there are no proxies leftovers
subprocess.call(("killall", TCPPROXY_PATH), stderr=open("/dev/null", "w"))
with open("/dev/null", "w") as stderr:
subprocess.call(("killall", TCPPROXY_PATH), stderr=stderr)

proxy_conf = "127.0.0.1:%s -> %s:%s" % (db_proxy_port, db_host, db_port)
self.proxy = subprocess.Popen((TCPPROXY_PATH, proxy_conf,))
time.sleep(0.1)

def terminate_proxy(self):
self.proxy.terminate()
try:
self.proxy.terminate()
try:
pwait(self.proxy, 1)
except TimeoutExpired:
log.warn("Proxy didn't die within a second")
self.proxy.kill()
# ProcessLookupError == Exception on Py2.7, but OSError still slips through :?
except (ProcessLookupError, OSError):
pass

def kill_connections(self, db, amount=None):
self.terminate_proxy()
Expand Down Expand Up @@ -648,19 +681,26 @@ def test_connection_factory(self):
cursor = yield db.execute("SELECT 1 AS a")
self.assertEqual(cursor.fetchone(), {"a": 1})

@unittest.skipIf(not test_hstore, "Skipping test as requested")
@gen_test
def test_cursor_factory_with_extensions(self):
"""Testing that NamedTupleCursor factory is working with hstore and json"""
def test_cursor_factory_with_hstore_extension(self):
"""Testing that NamedTupleCursor factory is working with hstore"""
db = yield self.build_pool(cur_factory=NamedTupleCursor)

yield db.register_hstore()
yield db.register_json()

cursor = yield self.db.execute("SELECT 'a=>b, c=>d'::hstore;")
cursor = yield db.execute("SELECT 'a=>b, c=>d'::hstore;")
self.assertEqual(cursor.fetchall(), [({"a": "b", "c": "d"},)])

cursor = yield self.db.execute("SELECT %s;", ({'e': 'f', 'g': 'h'},))
self.assertEqual(cursor.fetchall(), [({"e": "f", "g": "h"},)])
@gen_test
def test_cursor_factory_with_json_extension(self):
"""Testing that NamedTupleCursor factory is working with json"""
db = yield self.build_pool(cur_factory=NamedTupleCursor)

yield db.register_json()

cursor = yield db.execute('SELECT \'{"a": "b", "c": "d"}\'::json;')
self.assertEqual(cursor.fetchall(), [({"a": "b", "c": "d"},)])


class MomokoPoolParallelTest(PoolBaseTest):
Expand Down Expand Up @@ -822,6 +862,7 @@ def test_ping_error(self):
except db.DatabaseNotAvailable:
pass

@unittest.skipIf(tornado.version_info[0] == 5, "This test does not work with Tornado 5.x for reasons yet known")
@gen_test
def test_abort_waiting_queue(self, final_exception=momoko.Pool.DatabaseNotAvailable):
"""Testing that waiting queue is aborted properly when all connections are dead"""
Expand Down

0 comments on commit f7f394e

Please sign in to comment.