Skip to content

Commit

Permalink
Merge branch 'rl-1.0.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
amenezes committed Dec 11, 2022
2 parents 3a432d9 + 0f08059 commit 4ba78e4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
2 changes: 1 addition & 1 deletion discovery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)
from .client import Consul

__version__ = "1.0.0"
__version__ = "1.0.1"
__all__ = [
"Consul",
"HealthState",
Expand Down
20 changes: 14 additions & 6 deletions discovery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def __init__(
)
self._leader_id: Optional[str] = None

@property
def current_leader_id(self) -> Optional[str]:
return self._leader_id

async def leader_ip(self, *args, **kwargs) -> str:
try:
current_leader = await self.status.leader(*args, **kwargs)
Expand Down Expand Up @@ -89,6 +93,7 @@ async def register(
**kwargs,
) -> None:
self._leader_id = await self.leader_id(**kwargs)
log.debug(f"Consul leader id: [current='{self.current_leader_id}']")
try:
await self.agent.service.register(service.dict(), **kwargs)
except Exception as err:
Expand All @@ -97,7 +102,7 @@ async def register(
if enable_watch:
loop = asyncio.get_running_loop()
loop.create_task(
self._watch_connection(service, enable_watch, **kwargs),
self._watch_connection(service, **kwargs),
name="discovery-client-watch-connection",
)

Expand All @@ -108,15 +113,18 @@ async def reconnect(self, service: Service, *args, **kwargs) -> None:
await self.deregister(service.id) # type: ignore
await self.register(service, *args, **kwargs)

async def _watch_connection(self, service: Service, *args, **kwargs) -> None:
async def _watch_connection(self, service: Service, **kwargs) -> None:
while True:
await asyncio.sleep(self.reconnect_timeout)
try:
await asyncio.sleep(self.reconnect_timeout)
current_id = await self.leader_id()
if current_id != self._leader_id:
await self.reconnect(service, *args, **kwargs)
if current_id != self.current_leader_id:
log.debug(
f"Consul leader id changed: [current='{self.current_leader_id}', new='{current_id}']"
)
await self.reconnect(service, **kwargs)
except Exception:
log.error(
log.info(
f"Failed to connect to Consul, trying again at {self.reconnect_timeout}/s"
)

Expand Down

0 comments on commit 4ba78e4

Please sign in to comment.