Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fixed should_reload behaviour, close PostgreSQL connections, block until PostgresqlWatcher is ready, refactorings #29

Merged
merged 33 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3edbf37
chore: updated dev requirements
trbtm Jul 8, 2024
8101bcd
chore: format code with black
trbtm Jul 8, 2024
98ed568
chore: updated .gitignore
trbtm Jul 8, 2024
bb99917
fix: type hint, multiprocessing.Pipe is a Callable and not a type
trbtm Jul 8, 2024
df1b804
fix: make Watcher.should_reload return value consistent
trbtm Jul 8, 2024
c6fb087
fix: Handle Connection and Process objects consistenly and close them…
trbtm Jul 8, 2024
434a84c
feat: Customize the postgres channel name
trbtm Jul 8, 2024
90689ef
chore: Some code reorg
trbtm Jul 8, 2024
772a261
docs: added doc string for PostgresqlWatcher.update
trbtm Jul 8, 2024
f94294e
refactor: PostgresqlWatcher.set_update_callback
trbtm Jul 8, 2024
c26887c
refactor!: Rename 'start_process' flag to 'start_listening'
trbtm Jul 8, 2024
bc33cf5
docs: Added doc string to PostgresqlWatcher.__init__
trbtm Jul 8, 2024
9aba002
fix: Added proper destructor for PostgresqlWatcher
trbtm Jul 8, 2024
eb5c8f3
chore: fix type hints and proper handling of the channel_name argumen…
trbtm Jul 8, 2024
94e23d7
test: fix tests
trbtm Jul 8, 2024
1db8e23
feat: Setup logging module for unit tests
trbtm Jul 8, 2024
f0e6479
fix: typo
trbtm Jul 8, 2024
1640e75
feat: channel subscription with proper resource cleanup
trbtm Jul 8, 2024
b86a400
chore: removed unnecessary tests
trbtm Jul 8, 2024
e0a6337
feat: Wait for Process to be ready to receive messages from PostgreSQL
trbtm Jul 8, 2024
87056d6
test: multiple instances of the watcher
trbtm Jul 8, 2024
af31dff
test: make sure every test case uses its own channel
trbtm Jul 8, 2024
f31ba26
test: no update
trbtm Jul 8, 2024
b464dcb
refactor: moved code into with block
trbtm Jul 8, 2024
b130463
feat: automaticall call the update handler if it is provided
trbtm Jul 8, 2024
5ffe939
refactor: sorted imports
trbtm Jul 8, 2024
e6a7bdc
docs: updated README
trbtm Jul 8, 2024
09b2e8d
refactor: improved readibility
trbtm Jul 9, 2024
6f3c031
refactor: resolve a potential infinite loop with a custom Exception
trbtm Jul 9, 2024
241ac5c
refactor: make timeout configurable by the user
trbtm Jul 9, 2024
c053715
fix: docs
trbtm Jul 9, 2024
63a78c3
fix: ensure type hint compatibility with Python 3.9
trbtm Jul 10, 2024
8939531
feat: make sure multiple calls of update() get resolved by one call o…
trbtm Jul 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,4 @@ dmypy.json

.idea/
*.iml
.vscode
44 changes: 36 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pip install casbin-postgresql-watcher
```

## Basic Usage Example
### With Flask-authz

```python
from flask_authz import CasbinEnforcer
from postgresql_watcher import PostgresqlWatcher
Expand All @@ -25,23 +25,51 @@ from casbin.persist.adapters import FileAdapter

casbin_enforcer = CasbinEnforcer(app, adapter)
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME)
watcher.set_update_callback(casbin_enforcer.e.load_policy)
watcher.set_update_callback(casbin_enforcer.load_policy)
casbin_enforcer.set_watcher(watcher)
```

## Basic Usage Example With SSL Enabled
# Call should_reload before every call of enforce to make sure
# the policy is update to date
watcher.should_reload()
if casbin_enforcer.enforce("alice", "data1", "read"):
# permit alice to read data1
pass
else:
# deny the request, show an error
pass
```

See [PostgresQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS) for full details of SSL parameters.
alternatively, if you need more control

### With Flask-authz
```python
from flask_authz import CasbinEnforcer
from postgresql_watcher import PostgresqlWatcher
from flask import Flask
from casbin.persist.adapters import FileAdapter

casbin_enforcer = CasbinEnforcer(app, adapter)
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME, sslmode="verify_full", sslcert=SSLCERT, sslrootcert=SSLROOTCERT, sslkey=SSLKEY)
watcher.set_update_callback(casbin_enforcer.e.load_policy)
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME)
casbin_enforcer.set_watcher(watcher)

# Call should_reload before every call of enforce to make sure
# the policy is update to date
if watcher.should_reload():
casbin_enforcer.load_policy()

if casbin_enforcer.enforce("alice", "data1", "read"):
# permit alice to read data1
pass
else:
# deny the request, show an error
pass
```

## Basic Usage Example With SSL Enabled

See [PostgresQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS) for full details of SSL parameters.

```python
...
watcher = PostgresqlWatcher(host=HOST, port=PORT, user=USER, password=PASSWORD, dbname=DBNAME, sslmode="verify_full", sslcert=SSLCERT, sslrootcert=SSLROOTCERT, sslkey=SSLKEY)
...
```
2 changes: 1 addition & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
black==20.8b1
black==24.4.2
2 changes: 1 addition & 1 deletion postgresql_watcher/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .watcher import PostgresqlWatcher
from .watcher import PostgresqlWatcher, PostgresqlWatcherChannelSubscriptionTimeoutError
108 changes: 108 additions & 0 deletions postgresql_watcher/casbin_channel_subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from enum import IntEnum
from logging import Logger
from multiprocessing.connection import Connection
from select import select
from signal import signal, SIGINT, SIGTERM
from time import sleep
from typing import Optional

from psycopg2 import connect, extensions, InterfaceError


CASBIN_CHANNEL_SELECT_TIMEOUT = 1 # seconds


def casbin_channel_subscription(
process_conn: Connection,
logger: Logger,
host: str,
user: str,
password: str,
channel_name: str,
port: int = 5432,
dbname: str = "postgres",
delay: int = 2,
sslmode: Optional[str] = None,
sslrootcert: Optional[str] = None,
sslcert: Optional[str] = None,
sslkey: Optional[str] = None,
):
# delay connecting to postgresql (postgresql connection failure)
sleep(delay)
db_connection = connect(
host=host,
port=port,
user=user,
password=password,
dbname=dbname,
sslmode=sslmode,
sslrootcert=sslrootcert,
sslcert=sslcert,
sslkey=sslkey,
)
# Can only receive notifications when not in transaction, set this for easier usage
db_connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
db_cursor = db_connection.cursor()
context_manager = _ConnectionManager(db_connection, db_cursor)

with context_manager:
db_cursor.execute(f"LISTEN {channel_name};")
logger.debug("Waiting for casbin policy update")
process_conn.send(_ChannelSubscriptionMessage.IS_READY)

while not db_cursor.closed:
try:
select_result = select(
[db_connection],
[],
[],
CASBIN_CHANNEL_SELECT_TIMEOUT,
)
if select_result != ([], [], []):
logger.debug("Casbin policy update identified")
db_connection.poll()
while db_connection.notifies:
notify = db_connection.notifies.pop(0)
logger.debug(f"Notify: {notify.payload}")
process_conn.send(_ChannelSubscriptionMessage.RECEIVED_UPDATE)
except (InterfaceError, OSError) as e:
# Log an exception if these errors occurred without the context beeing closed
if not context_manager.connections_were_closed:
logger.critical(e, exc_info=True)
break


class _ChannelSubscriptionMessage(IntEnum):
IS_READY = 1
RECEIVED_UPDATE = 2


class _ConnectionManager:
"""
You can not use 'with' and a connection / cursor directly in this setup.
For more details see this issue: https://github.com/psycopg/psycopg2/issues/941#issuecomment-864025101.
As a workaround this connection manager / context manager class is used, that also handles SIGINT and SIGTERM and
closes the database connection.
"""

def __init__(self, connection, cursor) -> None:
self.connection = connection
self.cursor = cursor
self.connections_were_closed = False

def __enter__(self):
signal(SIGINT, self._close_connections)
signal(SIGTERM, self._close_connections)
return self

def _close_connections(self, *_):
if self.cursor is not None:
self.cursor.close()
self.cursor = None
if self.connection is not None:
self.connection.close()
self.connection = None
self.connections_were_closed = True

def __exit__(self, *_):
self._close_connections()
Loading
Loading