Skip to content

Commit

Permalink
fix in _handle_sts_session to refresh sts token at 80% of TTL for issue
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxghub committed Jun 23, 2024
1 parent afe18fd commit fd0377a
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
import socket
import string
import uuid
from datetime import datetime
from datetime import datetime, timezone, timedelta
from queue import Empty

from botocore.client import Config
Expand Down Expand Up @@ -734,23 +734,21 @@ def sqs(self, queue=None):
return c

def _handle_sts_session(self, queue, q):
if not hasattr(self, 'sts_expiration'): # STS token - token init
datetime_now_utc = datetime.now(timezone.utc).replace(
tzinfo=None
)
sts_token_timeout = self.transport_options.get('sts_token_timeout', 900)
# STS token is generated only if it is not present or
# the time reaches 80% of the token TTL
if (not hasattr(self, 'sts_expiration')) or (
self.sts_expiration.replace(tzinfo=None)
- timedelta(seconds=int(sts_token_timeout * 0.2))
< datetime_now_utc
):
sts_creds = self.generate_sts_session_token(
self.transport_options.get('sts_role_arn'),
self.transport_options.get('sts_token_timeout', 900))
self.sts_expiration = sts_creds['Expiration']
c = self._predefined_queue_clients[queue] = self.new_sqs_client(
region=q.get('region', self.region),
access_key_id=sts_creds['AccessKeyId'],
secret_access_key=sts_creds['SecretAccessKey'],
session_token=sts_creds['SessionToken'],
sts_token_timeout,
)
return c
# STS token - refresh if expired
elif self.sts_expiration.replace(tzinfo=None) < datetime.utcnow():
sts_creds = self.generate_sts_session_token(
self.transport_options.get('sts_role_arn'),
self.transport_options.get('sts_token_timeout', 900))
self.sts_expiration = sts_creds['Expiration']
c = self._predefined_queue_clients[queue] = self.new_sqs_client(
region=q.get('region', self.region),
Expand All @@ -759,7 +757,7 @@ def _handle_sts_session(self, queue, q):
session_token=sts_creds['SessionToken'],
)
return c
else: # STS token - ruse existing
else: # STS token still valid - reuse existing
return self._predefined_queue_clients[queue]

def generate_sts_session_token(self, role_arn, token_expiry_seconds):
Expand Down

0 comments on commit fd0377a

Please sign in to comment.