# PGReadWriteSplitting.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
if TYPE_CHECKING:
from aws_advanced_python_wrapper.hostinfo import HostInfo
from aws_advanced_python_wrapper.pep249 import Connection
import psycopg
from aws_advanced_python_wrapper import AwsWrapperConnection
from aws_advanced_python_wrapper.connection_provider import \
ConnectionProviderManager
from aws_advanced_python_wrapper.errors import (
FailoverFailedError, FailoverSuccessError,
TransactionResolutionUnknownError)
from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \
SqlAlchemyPooledConnectionProvider
def configure_pool(host_info: HostInfo, props: Dict[str, Any]) -> Dict[str, Any]:
    """Return the pool settings handed to the pooled connection provider.

    Neither argument is consulted: the same settings, a ``pool_size`` of 5,
    are returned for every host and every set of connection properties.

    Args:
        host_info: Host the pool is being created for (unused).
        props: Connection properties for that host (unused).

    Returns:
        A new dict containing the pool settings.
    """
    pool_settings: Dict[str, Any] = dict(pool_size=5)
    return pool_settings
def get_pool_key(host_info: HostInfo, props: Dict[str, Any]) -> str:
    """Build the key that selects which connection pool a connection uses.

    Args:
        host_info: Host being connected to; its ``url`` attribute is read.
        props: Connection properties; must contain ``"user"`` and ``"dbname"``.

    Returns:
        A string that differs whenever the URL, user, or database differs.

    Raises:
        KeyError: If ``props`` has no ``"user"`` or ``"dbname"`` entry.
    """
    # Include the URL, user, and database in the connection pool key so that a new
    # connection pool will be opened for each different instance-user-database combination.
    key_parts = (host_info.url, props["user"], props["dbname"])
    # Plain concatenation would map ("ab", "c") and ("a", "bc") to the same key
    # and make two different user/database pairs share one pool. The tuple's
    # repr quotes and delimits each field, so distinct combinations stay distinct.
    return repr(key_parts)
def configure_initial_session_states(conn: Connection) -> None:
    """Apply the session settings a fresh connection should start with.

    Sets the session time zone to UTC. This is called right after a
    connection is opened and again after a successful failover, because the
    session state of the previous connection is not carried over.

    Args:
        conn: Open connection whose session should be configured.
    """
    # Scope the cursor to this statement so it is closed even if execute()
    # raises; this function may run many times over a connection's life.
    with conn.cursor() as awscursor:
        awscursor.execute("SET TIME ZONE 'UTC'")
def execute_queries_with_failover_handling(conn: Connection, sql: str, params: Optional[Union[Dict, Tuple]] = None):
    """Execute one SQL statement, retrying it once after a successful failover.

    Args:
        conn: Open connection to run the statement on.
        sql: Statement text, optionally containing placeholders.
        params: Values bound to the placeholders in ``sql``, or ``None`` when
            the statement takes no parameters.

    Returns:
        The cursor the statement was executed on. The caller is responsible
        for fetching from it and closing it.

    Raises:
        FailoverFailedError: Re-raised unchanged; no connection could be
            re-established.
        TransactionResolutionUnknownError: Re-raised unchanged; the outcome
            of the in-flight transaction is unknown.

    Any error raised by the single retry attempt also propagates to the caller.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params)
        return cursor
    except FailoverSuccessError:
        # Query execution failed and AWS Advanced Python Driver successfully failed over to an available instance.
        # https://github.com/awslabs/aws-advanced-python-wrapper/blob/main/docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md#FailoverFailedError---successful-failover
        # The old cursor is no longer reusable and the application needs to reconfigure session states.
        configure_initial_session_states(conn)
        # Retry the query with the same bound parameters as the first attempt;
        # omitting them would make every parameterized statement fail here.
        cursor = conn.cursor()
        cursor.execute(sql, params)
        return cursor
    except FailoverFailedError:
        # User application should open a new connection, check the results of the failed transaction and re-run it if needed. See:
        # https://github.com/awslabs/aws-advanced-python-wrapper/blob/main/docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md#FailoverFailedError---unable-to-establish-sql-connection
        raise
    except TransactionResolutionUnknownError:
        # User application should check the status of the failed transaction and restart it if needed. See:
        # https://github.com/awslabs/aws-advanced-python-wrapper/blob/main/docs/using-the-python-driver/using-plugins/UsingTheFailoverPlugin.md#TransactionResolutionUnknownError---transaction-resolution-unknown
        raise
def main() -> None:
    """Run the read/write splitting example end to end.

    Registers the pooled connection provider, creates and populates the
    ``bank_test`` table, performs two updates on the writer, reads the rows
    back after switching the connection to read-only, then drops the table
    and releases the connection pools.
    """
    params = {
        "host": "database.cluster-xyz.us-east-1.rds.amazonaws.com",
        "dbname": "postgres",
        "user": "john",
        "password": "pwd",
        "plugins": "read_write_splitting,failover,host_monitoring",
        "wrapper_dialect": "aurora-pg",
        "autocommit": True
    }

    # Optional: configure read/write splitting to use internal connection pools.
    # The arguments passed to SqlAlchemyPooledConnectionProvider are optional, see
    # UsingTheReadWriteSplittingPlugin.md for more info.
    provider = SqlAlchemyPooledConnectionProvider(configure_pool, get_pool_key)
    ConnectionProviderManager.set_connection_provider(provider)

    try:
        # Setup step: open connection and create tables
        with AwsWrapperConnection.connect(psycopg.Connection.connect, **params) as conn:
            configure_initial_session_states(conn)
            execute_queries_with_failover_handling(
                conn, "CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)")
            execute_queries_with_failover_handling(
                conn, "INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200))
            execute_queries_with_failover_handling(
                conn, "INSERT INTO bank_test VALUES (%s, %s, %s)", (1, "John Smith", 200))

        # Example step: open connection and perform transaction
        try:
            with AwsWrapperConnection.connect(psycopg.Connection.connect, **params) as conn:
                configure_initial_session_states(conn)

                execute_queries_with_failover_handling(
                    conn, "UPDATE bank_test SET account_balance=account_balance - 100 WHERE name=%s", ("Jane Doe",))
                execute_queries_with_failover_handling(
                    conn, "UPDATE bank_test SET account_balance=account_balance + 100 WHERE name=%s", ("John Smith",))

                # Internally switch to a reader connection
                conn.read_only = True
                for i in range(2):
                    # Each SELECT returns its own cursor; close it as soon as
                    # its rows have been printed instead of leaving it open.
                    with execute_queries_with_failover_handling(
                            conn, "SELECT * FROM bank_test WHERE id = %s", (i,)) as cursor:
                        for record in cursor.fetchall():
                            print(record)
        finally:
            # Drop the table on a fresh connection so cleanup still runs when
            # the example connection above has failed.
            with AwsWrapperConnection.connect(psycopg.Connection.connect, **params) as conn:
                execute_queries_with_failover_handling(conn, "DROP TABLE bank_test")
    finally:
        # If connection pools were enabled, close them here. Doing it in the
        # outermost finally releases them even when the setup step raises.
        ConnectionProviderManager.release_resources()


if __name__ == "__main__":
    main()