
Freezing During Shutdown: Exception Handling in finalize_visit_id Causes Controller to Halt #1068

Closed · MohammadMahdiJavid opened this issue Nov 17, 2023 · 9 comments · Fixed by #1073

MohammadMahdiJavid commented Nov 17, 2023

Hi,

When the storage controller receives a shutdown signal, it waits for all tasks associated with each visit ID. However, during the shutdown process, if an exception is thrown while processing records in finalize_visit_id, the controller freezes and doesn't continue.
Why does it freeze? Are there any ways to overcome this issue? In the case of an exception, shouldn't it log the error and continue?

The exception (a KeyError) is thrown at

del self.store_record_tasks[visit_id]

which is being called from

t = await self.finalize_visit_id(visit_id, success=False)

and ultimately causes an infinite loop at

while True:
    if self.closing and not self.unsaved_command_sequences:
        # we're shutting down and have no unprocessed callbacks
        break
    visit_id_list = self.storage_controller_handle.get_new_completed_visits()
    if not visit_id_list:
        time.sleep(1)
        continue

because it is waiting for the visit_id to appear in the completion queue.

In my logs I already see that this visit_id has been awaited:

storage_controller - INFO - Awaited all tasks for visit_id 5095731113063583 while finalizing

which indicates it has already been finalized. That log line comes from

self.logger.debug(
    "Awaited all tasks for visit_id %d while finalizing", visit_id
)

but three lines below that I get the exception above, which says it didn't find the visit_id. That makes sense, since the visit was already finalized three lines up and its entry was deleted afterwards.
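To illustrate the mechanism, here is a minimal, self-contained asyncio sketch (toy names, not the actual OpenWPM code) of how two overlapping calls to an await-then-delete finalizer for the same visit_id end in exactly this KeyError: the second caller passes the dict lookup before the first one deletes the key, and whichever call reaches the del last blows up.

import asyncio

async def main():
    # Two pending store_record-style tasks registered for the same visit.
    store_record_tasks = {42: [asyncio.create_task(asyncio.sleep(0.1)) for _ in range(2)]}

    async def finalize(visit_id):
        for task in store_record_tasks[visit_id]:
            await task                      # suspension point: the other finalize() runs here
        del store_record_tasks[visit_id]    # whichever call gets here second raises KeyError

    results = await asyncio.gather(finalize(42), finalize(42), return_exceptions=True)
    print(results)  # typically [None, KeyError(42)]

asyncio.run(main())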

vringar (Contributor) commented Nov 22, 2023

Hey,

the function gets called outside of the handler, during shutdown. See here:

async def shutdown(self, completion_queue_task: Task[None]) -> None:
    self.logger.info("Entering self.shutdown")
    completion_tokens = {}
    visit_ids = list(self.store_record_tasks.keys())
    for visit_id in visit_ids:
        t = await self.finalize_visit_id(visit_id, success=False)
        if t is not None:
            completion_tokens[visit_id] = t
    await self.structured_storage.flush_cache()

I didn't expect storage providers to throw exceptions, but if this is an issue you are encountering, please submit a PR to catch errors that might occur.
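Something along these lines would be the shape of that change; a minimal, self-contained sketch with toy stand-ins (not the actual controller code), where one failing visit is logged and skipped instead of halting the whole shutdown loop:

import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("storage_controller")

# Toy stand-in for finalize_visit_id: one visit raises to simulate the reported KeyError.
async def finalize_visit_id(visit_id: int, success: bool):
    if visit_id == 2:
        raise KeyError(visit_id)
    return None

async def shutdown(visit_ids):
    completion_tokens = {}
    for visit_id in visit_ids:
        try:
            t = await finalize_visit_id(visit_id, success=False)
        except Exception:
            # Log and keep going so one bad visit doesn't freeze the shutdown.
            logger.exception("Failed to finalize visit_id %d during shutdown, skipping", visit_id)
            continue
        if t is not None:
            completion_tokens[visit_id] = t
    return completion_tokens

asyncio.run(shutdown([1, 2, 3]))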

vringar (Contributor) commented Nov 23, 2023

What is currently confusing me is that there should be no concurrent access to this list, because we make sure the server is fully shut down before we call self.shutdown.

However, from your description we enter finalize_visit_id with the same visit_id multiple times in close succession (because it doesn't get caught by this check), which shouldn't be possible, because we are iterating over the keys of a dict.
That means you should see two "Awaiting all tasks for visit_id 5095731113063583" lines before you see the "Awaited all tasks for visit_id 5095731113063583 while finalizing" line.

This points to a deeper logic bug, so while your fix in #1070 will prevent the error, it would also put the same visit_id into the completion queue twice, which would break other things.

vringar added the bug label and removed the enhancement (Not a bug or a feature request) label on Nov 23, 2023
vringar (Contributor) commented Nov 23, 2023

As I can't reproduce your issue, could you add something like

self.logger.error(traceback.format_stack(file=sys.stdout, limit=2))

just before the del so that we can figure out where the second caller is coming from?
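(As a side note, traceback.format_stack returns a list of strings rather than printing anything, so in practice one would join the frames before logging. A tiny standalone sketch, with illustrative names:)

import logging
import traceback

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("storage_controller")

def log_call_site() -> None:
    # format_stack() returns a list of frame strings; join them so the log entry
    # reads like a normal traceback instead of a Python list repr.
    logger.error("Second caller stack:\n%s", "".join(traceback.format_stack(limit=5)))

log_call_site()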

MohammadMahdiJavid (Author) commented Nov 26, 2023

In the logs, "Awaiting all tasks for visit_id 1780613490525049" appears twice. Here is a new log with the requested format_stack output:

'File "python3.10/asyncio/events.py", line 80, in _run\n    self._context.run(self._callback, *self._args)\n', 

'File "storage/storage_controller.py", line 430, in _run\n    await self.shutdown(update_completion_queue)\n', 
                               
'File "storage/storage_controller.py", line 319, in shutdown\n    t = await self.finalize_visit_id(visit_id, success=False)\n', 
                               
'File "storage/storage_controller.py", line 253, in finalize_visit_id\n    {traceback.format_stack()}\n'

Additional logs which might be useful:

browser_manager      - INFO     - Starting to work on CommandSequence with visit_id 1780613490525049 on browser with id 3578416253

browser_manager      - INFO     - Finished working on CommandSequence with visit_id 1780613490525049 on browser with id 3578416253
storage_controller   - INFO     - Terminating handler, because the underlying socket closed
storage_controller   - INFO     - Awaiting all tasks for visit_id 1780613490525049
storage_controller   - INFO     - ========================================
storage_controller   - INFO     - Received shutdown signal! in async def should_shutdown in StorageController
storage_controller   - INFO     - Entering self.shutdown
storage_controller   - INFO     - Awaiting all tasks for visit_id 1200
storage_controller   - INFO     - Awaited all tasks for visit_id 1200 while finalizing
sql_provider         - WARNING  - Visit with visit_id 1200 got interrupted

storage_controller   - INFO     - Awaiting all tasks for visit_id 1339
storage_controller   - INFO     - Awaited all tasks for visit_id 1339 while finalizing
sql_provider         - WARNING  - Visit with visit_id 1339 got interrupted
storage_controller   - INFO     - MY DEBUG: completion token achieved

storage_controller   - INFO     - Awaiting all tasks for visit_id 1780613490525049

Executing <Task pending name='Task-1' coro=<StorageController._run() running at storage/storage_controller.py:430> wait_for=<Task pending name='Task-69839' coro=<BCSQLiteStorageProvider.store_record() running at storage/sql_provider.py:81> cb=[Task.task_wakeup(), Task.task_wakeup()] created at python3.10/asyncio/tasks.py:337> cb=[_run_until_complete_cb() at python3.10/asyncio/base_events.py:184] created at python3.10/asyncio/tasks.py:636> took 0.951 seconds

Executing <Task finished name='Task-69966' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 1.971 seconds

Executing <Task finished name='Task-69968' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.168 seconds

Executing <Task finished name='Task-69969' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.112 seconds

Executing <Task finished name='Task-70038' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 1.976 seconds
Executing <Task finished name='Task-70046' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.166 seconds
Executing <Task finished name='Task-70048' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.112 seconds
Executing <Task finished name='Task-70053' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.168 seconds

storage_controller   - INFO     - Awaited all tasks for visit_id 1780613490525049 while finalizing
storage_controller   - ERROR    - visit_id 1780613490525049, is already awaited, skipping...
storage_controller   - ERROR    - ['File "python3.10/asyncio/events.py", line 80, in _run\n    self._context.run(self._callback, *self._args)\n',
'File "storage/storage_controller.py", line 430, in _run\n    await self.shutdown(update_completion_queue)\n',
'File "storage/storage_controller.py", line 319, in shutdown\n    t = await self.finalize_visit_id(visit_id, success=False)\n',
'File "storage/storage_controller.py", line 253, in finalize_visit_id\n    {traceback.format_stack()}\n']
storage_controller   - INFO     - Awaiting all tasks for visit_id 1340
storage_controller   - INFO     - Awaited all tasks for visit_id 1340 while finalizing
sql_provider         - WARNING  - Visit with visit_id 1340 got interrupted

MohammadMahdiJavid (Author) commented:

And at the top of the logs I have:

- ERROR    - Traceback (most recent call last):
    manager.execute_command_sequence(command_sequence)
  File "openwpm/task_manager.py", line 428, in execute_command_sequence
    agg_queue_size = self.storage_controller_handle.get_most_recent_status()
  File "storage/storage_controller.py", line 614, in get_most_recent_status
    raise RuntimeError(
RuntimeError: No status update from the storage controller process for 131 seconds.

After this exception I cannot run any manager.execute_command_sequence anymore.
Are the two issues related, or not?

MohammadMahdiJavid (Author) commented:

But since the function returns None

        if visit_id not in self.store_record_tasks:
            self.logger.error(
                "visit_id %d, is already awaited, skipping...",
                visit_id,
            )
            return None

https://github.com/openwpm/OpenWPM/pull/1070/commits/57adefc07167035faa1cdf9351f927d328d48d93#diff-5c57c8ae8ef4f387b82ff0868d46bde18281d7b78861bf03c0f7dd440effcb22R225#L225

the visit_id never gets added to completion_tokens, which means it also never gets put into the completion_queue at

self.completion_queue.put((visit_id, False))

    t = await self.finalize_visit_id(visit_id, success=False)
    if t is not None:
        completion_tokens[visit_id] = t
await self.structured_storage.flush_cache()
await completion_queue_task
for visit_id, token in completion_tokens.items():
    await token
    self.completion_queue.put((visit_id, False))
await self.structured_storage.shutdown()

vringar (Contributor) commented Nov 30, 2023

🚀 thank you so much for discovering this bug!

     t = await self.finalize_visit_id(visit_id, success=False) 
-    if t is not None: 
-        completion_tokens[visit_id] = t
+    completion_tokens[visit_id] = t

This would be the first part of the fix: it puts items into the completion queue during shutdown even when they don't have a completion token. (We can await None and it will instantly return None, so this is not a problem.)

As for the original issue, I now think the correct patch is

- for task in self.store_record_tasks[visit_id]:
-     await task
- del self.store_record_tasks[visit_id]
+ store_record_tasks = self.store_record_tasks[visit_id]
+ del self.store_record_tasks[visit_id]
+ for task in store_record_tasks:
+     await task

This way, when we re-enter the function a second time after yielding at the await task, we won't see the tasks in the dict anymore.
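As a self-contained illustration of that reordering (toy names; dict.pop is used here as a compact stand-in for the membership check plus del, not the actual controller code):

import asyncio

async def main():
    store_record_tasks = {42: [asyncio.create_task(asyncio.sleep(0.1)) for _ in range(2)]}

    async def finalize(visit_id):
        # Remove the entry before the first suspension point, so a second
        # entrant sees the visit as already handled instead of racing to the del.
        tasks = store_record_tasks.pop(visit_id, None)
        if tasks is None:
            return
        for task in tasks:
            await task

    await asyncio.gather(finalize(42), finalize(42))
    print(store_record_tasks)  # {} and no KeyError

asyncio.run(main())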

vringar (Contributor) commented Dec 5, 2023

Hey @MohammadMahdiJavid,

I deployed the fix I described as PR #1073.

Could you test on the latest master branch and confirm everything is fixed?

MohammadMahdiJavid added a commit to MohammadMahdiJavid/OpenWPM that referenced this issue Dec 6, 2023
MohammadMahdiJavid added a commit to MohammadMahdiJavid/OpenWPM that referenced this issue Dec 6, 2023
MohammadMahdiJavid (Author) commented Dec 6, 2023

Hi,

Apologies for the delayed response; each experiment takes a few days to finish.
I changed things as shown in the PR and ran the experiment twice, but couldn't reproduce the error.
I believe the fix works.

(Although I messed up by forgetting to put some visit_ids into the completion_queue.)

I'm going to test the deployed changes and come back if anything goes wrong.

Just a quick note for a possible enhancement: since we raise an exception, when handling it we should set self._last_status = None so that the previous status isn't reused for a new command, right? (Applied in the PR.)
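For clarity, a hypothetical standalone sketch of that idea (illustrative class name and timeout value, not the actual OpenWPM implementation): drop the cached status before raising so a later command can't pick up a stale value.

import time

class StatusHandleSketch:
    STATUS_TIMEOUT = 120  # seconds; illustrative value

    def __init__(self):
        self._last_status = None
        self._last_status_received = None

    def get_most_recent_status(self):
        if self._last_status_received is None or (
            time.time() - self._last_status_received > self.STATUS_TIMEOUT
        ):
            # Reset so the next command doesn't reuse a stale status.
            self._last_status = None
            raise RuntimeError("No status update from the storage controller process")
        return self._last_status

handle = StatusHandleSketch()
try:
    handle.get_most_recent_status()
except RuntimeError as e:
    print(e)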
