FivetranOperator fails when a connector goes to rescheduled state #107

Open
SouravBhowmikDE opened this issue Sep 4, 2024 · 4 comments

@SouravBhowmikDE

SouravBhowmikDE commented Sep 4, 2024

Hi,
We have a Google connector that goes into the rescheduled state every now and then.
Whenever the connector goes into the rescheduled state, the FivetranOperator Airflow task fails.
It seems that FivetranOperator has a bug and does not handle the rescheduled state correctly.
Can someone please fix this issue as soon as possible?

Below are the logs from the failed FivetranOperator Airflow task:

[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1361}} INFO - Starting attempt 1 of 1
[2024-09-04, 07:37:09 CDT] {{taskinstance.py:1382}} INFO - Executing <Task(FivetranOperator): fivetran_sync_google_ads_dcr> on 2024-09-04 10:37:00+00:00
[2024-09-04, 07:37:09 CDT] {{standard_task_runner.py:57}} INFO - Started process 21551 to run task
[2024-09-04, 07:37:09 CDT] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'fivetran_sync_google_ads_dcr', 'fivetran_sync_google_ads_dcr', 'scheduled__2024-09-04T10:37:00+00:00', '--job-id', '10888165', '--raw', '--subdir', 'DAGS_FOLDER/fivetran/fivetran_ingress.py', '--cfg-path', '/tmp/tmplk3d8cgu']
[2024-09-04, 07:37:09 CDT] {{standard_task_runner.py:85}} INFO - Job 10888165: Subtask fivetran_sync_google_ads_dcr
[2024-09-04, 07:37:09 CDT] {{task_command.py:416}} INFO - Running <TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [running]> on host ip-172-22-27-20.ec2.internal
[2024-09-04, 07:37:10 CDT] {{taskinstance.py:1662}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='fivetran_sync_google_ads_dcr' AIRFLOW_CTX_TASK_ID='fivetran_sync_google_ads_dcr' AIRFLOW_CTX_EXECUTION_DATE='2024-09-04T10:37:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-09-04T10:37:00+00:00'
[2024-09-04, 07:37:10 CDT] {{base.py:73}} INFO - Using connection ID 'data_fivetran' for task execution.
[2024-09-04, 07:37:10 CDT] {{hooks.py:320}} INFO - Connector type: google_ads, connector schema: google_ads_dcr
[2024-09-04, 07:37:10 CDT] {{hooks.py:321}} INFO - Connectors logs at https://fivetran.com/dashboard/connectors/google_ads/google_ads_dcr/logs
[2024-09-04, 07:37:11 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = scheduled
[2024-09-04, 07:37:11 CDT] {{taskinstance.py:1526}} INFO - Pausing task as DEFERRED. dag_id=fivetran_sync_google_ads_dcr, task_id=fivetran_sync_google_ads_dcr, execution_date=20240904T103700, start_date=20240904T123709
[2024-09-04, 07:37:12 CDT] {{local_task_job_runner.py:225}} INFO - Task exited with return code 100 (task deferral)
[2024-09-04, 07:37:13 CDT] {{base.py:73}} INFO - Using connection ID 'data_fivetran' for task execution.
[2024-09-04, 07:37:13 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = syncing
[2024-09-04, 07:37:13 CDT] {{triggers.py:89}} INFO - sync is still running...
[2024-09-04, 07:37:13 CDT] {{triggers.py:90}} INFO - sleeping for 30 seconds.
[2024-09-04, 07:37:43 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = syncing
[2024-09-04, 07:37:43 CDT] {{triggers.py:89}} INFO - sync is still running...
[2024-09-04, 07:37:43 CDT] {{triggers.py:90}} INFO - sleeping for 30 seconds.
[2024-09-04, 07:38:13 CDT] {{hooks.py:487}} INFO - Connector hard_stockholder: sync_state = rescheduled
[2024-09-04, 07:38:13 CDT] {{hooks.py:514}} INFO - Connector is in "rescheduled" state and needs to be manually restarted
[2024-09-04, 07:38:13 CDT] {{hooks.py:556}} INFO - Starting connector again in 0 seconds
[2024-09-04, 07:38:13 CDT] {{hooks.py:570}} INFO - Restarting connector now
[2024-09-04, 07:38:13 CDT] {{triggerer_job_runner.py:599}} INFO - Trigger fivetran_sync_google_ads_dcr/scheduled__2024-09-04T10:37:00+00:00/fivetran_sync_google_ads_dcr/-1/1 (ID 4971) fired: TriggerEvent<{'status': 'error', 'message': "'BasicAuth' object is not callable"}>
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1159}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [queued]>
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1359}} INFO - Resuming after deferral
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1382}} INFO - Executing <Task(FivetranOperator): fivetran_sync_google_ads_dcr> on 2024-09-04 10:37:00+00:00
[2024-09-04, 07:38:22 CDT] {{standard_task_runner.py:57}} INFO - Started process 22523 to run task
[2024-09-04, 07:38:22 CDT] {{standard_task_runner.py:84}} INFO - Running: ['airflow', 'tasks', 'run', 'fivetran_sync_google_ads_dcr', 'fivetran_sync_google_ads_dcr', 'scheduled__2024-09-04T10:37:00+00:00', '--job-id', '10888173', '--raw', '--subdir', 'DAGS_FOLDER/fivetran/fivetran_ingress.py', '--cfg-path', '/tmp/tmpkx044i49']
[2024-09-04, 07:38:22 CDT] {{standard_task_runner.py:85}} INFO - Job 10888173: Subtask fivetran_sync_google_ads_dcr
[2024-09-04, 07:38:22 CDT] {{task_command.py:416}} INFO - Running <TaskInstance: fivetran_sync_google_ads_dcr.fivetran_sync_google_ads_dcr scheduled__2024-09-04T10:37:00+00:00 [running]> on host ip-172-22-24-53.ec2.internal
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1937}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1606, in resume_execution
return execute_callable(context)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/fivetran_provider_async/operators.py", line 170, in execute_complete
raise AirflowException(msg)
airflow.exceptions.AirflowException: error: 'BasicAuth' object is not callable
[2024-09-04, 07:38:22 CDT] {{taskinstance.py:1400}} INFO - Marking task as FAILED. dag_id=fivetran_sync_google_ads_dcr, task_id=fivetran_sync_google_ads_dcr, execution_date=20240904T103700, start_date=20240904T123709, end_date=20240904T123822
[2024-09-04, 07:38:22 CDT] {{base.py:73}} INFO - Using connection ID 'slack_conn_id' for task execution.
[2024-09-04, 07:38:22 CDT] {{base.py:73}} INFO - Using connection ID 'slack_conn_id' for task execution.
[2024-09-04, 07:38:23 CDT] {{standard_task_runner.py:104}} ERROR - Failed to execute job 10888173 for task fivetran_sync_google_ads_dcr (error: 'BasicAuth' object is not callable; 22523)
[2024-09-04, 07:38:23 CDT] {{local_task_job_runner.py:228}} INFO - Task exited with return code 1

@pankajastro
Contributor

Hi @SouravBhowmikDE, would you be interested in submitting a fix? I’d be happy to review and merge it.

@SouravBhowmikDE
Author

Hi @pankajastro Sure, I will give this a try in a few days. I have never done this before.

@sean-rose

If you do this, please take care to not have the operator continue waiting if the connector is set to manual schedule mode. For connectors with a manual schedule, the RESCHEDULED state should continue to be treated like the sync failed so the Airflow operator can fail and potentially get retried to actually re-trigger the sync.

IMO Fivetran should never have connectors with a manual schedule end in a RESCHEDULED state, and I submitted a Fivetran feature request to that effect, but I have little hope Fivetran will actually take action on that.

@JeremyDOwens

JeremyDOwens commented Dec 10, 2024

Overview

I'm not sure that failing is correct, @sean-rose. There is already logic in place to wait and trigger the sync again after a configurable amount of time:

if sync_state == "rescheduled" and connector_details["schedule_type"] == "manual":

This works properly once the task retries. Side note: this happens synchronously, and the operator does not defer while waiting to reschedule.

My theory

The undesirable behavior is that the operator hits an unhandled exception: airflow.exceptions.AirflowException: error: 'BasicAuth' object is not callable. The underlying error is presumably not handled where it is raised, so it reaches this line in the trigger:

yield TriggerEvent({"status": "error", "message": str(e)})

because we aren't handling the "rescheduled" case.

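The TriggerEvent error is then passed back to the operator, which raises it as an AirflowException in execute_complete (the raise AirflowException(msg) line shown in the traceback above).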

Deeper dive:

The synchronous _do_api_call method uses requests.request and passes in a tuple for auth.
The async _do_api_call_async method uses aiohttp.ClientSession.request and passes in an aiohttp.BasicAuth object.

If the auth key in the kwargs object that gets passed around has, for any reason, been set to a BasicAuth object, any call to _do_api_call (sync) will fail, because the superclass does not overwrite an existing kwargs["auth"]. We would end up passing the BasicAuth object to requests.request.
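To make the error message concrete, here is a minimal sketch of why a tuple works and a BasicAuth object does not. It uses stand-ins only: the BasicAuth namedtuple below imitates aiohttp.BasicAuth, and prepare_auth is a hypothetical simplification of how requests treats the auth argument (a two-item tuple becomes basic auth, anything else is assumed to be a callable handler). Neither is the real library code.

```python
from collections import namedtuple

# Stand-in for aiohttp.BasicAuth (assumption: a three-field, tuple-like
# value object with no __call__ method).
BasicAuth = namedtuple("BasicAuth", ["login", "password", "encoding"])


def prepare_auth(auth):
    """Hypothetical simplification of how requests handles `auth`."""
    if isinstance(auth, tuple) and len(auth) == 2:
        # A (user, password) pair is turned into basic auth.
        return f"basic:{auth[0]}"
    # Anything else is assumed to be a callable auth handler and is invoked.
    return auth("request")


# The sync path passes a plain two-item tuple, which works.
ok = prepare_auth(("api_key", "api_secret"))

# A BasicAuth object has three fields and is not callable, so it fails.
try:
    prepare_auth(BasicAuth("api_key", "api_secret", "latin1"))
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(ok)
print(error_message)
```

If this matches what requests does internally, it would explain why the message in the TriggerEvent is exactly "'BasicAuth' object is not callable".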

This case occurs when the pause_and_restart function is called, which leads to start_fivetran_sync:

return self.start_fivetran_sync(connector_id)

See this excerpt:

[2024-12-10, 17:26:21 UTC] {hooks.py:502} INFO - Connector ninth_desert: sync_state = syncing
[2024-12-10, 17:26:21 UTC] {triggers.py:90} INFO - sync is still running...
[2024-12-10, 17:26:21 UTC] {triggers.py:91} INFO - sleeping for 15 seconds.
[2024-12-10, 17:26:36 UTC] {hooks.py:502} INFO - Connector ninth_desert: sync_state = syncing
[2024-12-10, 17:26:36 UTC] {triggers.py:90} INFO - sync is still running...
[2024-12-10, 17:26:36 UTC] {triggers.py:91} INFO - sleeping for 15 seconds.
[2024-12-10, 17:26:51 UTC] {hooks.py:502} INFO - Connector ninth_desert: sync_state = rescheduled
[2024-12-10, 17:26:51 UTC] {hooks.py:529} INFO - Connector is in "rescheduled" state and needs to be manually restarted
[2024-12-10, 17:26:51 UTC] {hooks.py:571} INFO - Starting connector again in 0 seconds
[2024-12-10, 17:26:51 UTC] {hooks.py:585} INFO - Restarting connector now
[2024-12-10, 17:26:51 UTC] {triggerer_job_runner.py:631} INFO - Trigger digital_ads_cosmos/manual__2024-12-09T19:26:22.093579+00:00/fivetran_syncs.run_fivetran_google_ads_sync/-1/2 (ID 3) fired: TriggerEvent<{'status': 'error', 'message': "'BasicAuth' object is not callable"}>

Edit:

The problem actually appears to be that pause_and_restart, when called on an instance of the subclass FivetranHookAsync, calls start_fivetran_sync. Within that call, it executes _do_api_call, which calls _prepare_api_call_kwargs. Because the hook instance is of the subclass, the overridden method is used, so kwargs["auth"] ends up as a BasicAuth instance that is erroneously passed to requests.request.

The root cause is that, because FivetranHookAsync overrides _prepare_api_call_kwargs, all of the synchronous methods will fail when called from an instance of the async subclass.
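The override problem can be reproduced in isolation. This is a rough sketch with hypothetical stand-in classes (SyncHook, AsyncHook, start_sync) rather than the provider's real code; the only things taken from the discussion are the method names _prepare_api_call_kwargs and _do_api_call, and the assumption that the base class uses setdefault while the subclass sets a BasicAuth before delegating.

```python
from collections import namedtuple

# Stand-in for aiohttp.BasicAuth (assumption, not the real class).
BasicAuth = namedtuple("BasicAuth", ["login", "password", "encoding"])


class SyncHook:
    """Hypothetical stand-in for the synchronous hook."""

    def _prepare_api_call_kwargs(self, kwargs):
        # setdefault keeps whatever is already stored under "auth".
        kwargs.setdefault("auth", ("key", "secret"))
        return kwargs

    def _do_api_call(self):
        # The sync path expects a (user, password) tuple here.
        return self._prepare_api_call_kwargs({})["auth"]

    def start_sync(self):
        return self._do_api_call()


class AsyncHook(SyncHook):
    """Hypothetical stand-in for the async subclass."""

    def _prepare_api_call_kwargs(self, kwargs):
        # The override sets an async-style auth object first, so the
        # superclass setdefault call leaves it in place.
        kwargs["auth"] = BasicAuth("key", "secret", "latin1")
        return super()._prepare_api_call_kwargs(kwargs)


sync_auth = SyncHook().start_sync()
inherited_auth = AsyncHook().start_sync()

print(type(sync_auth).__name__)       # plain tuple on the base class
print(type(inherited_auth).__name__)  # BasicAuth leaks into the sync path
```

The sync method start_sync is inherited unchanged, but method resolution picks the subclass's _prepare_api_call_kwargs, so the sync call path receives the async auth object.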

My suggestion

Suggested fix:

  • Adjust the logic in the superclass implementation of _prepare_api_call_kwargs to always set the auth value to the tuple, so that the behavior is deterministic (no risk of a bad object passing through via setdefault).
  • Adjust the logic in the subclass implementation of _prepare_api_call_kwargs to avoid the call to the superclass if there is already an instance of the BasicAuth object.
  • Rename the overridden _prepare_api_call_kwargs to _prepare_api_call_kwargs_async.
  • Update _do_api_call_async to reference the renamed method.
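As a rough illustration of the direction these bullets point in (not the contents of the sample PR), the sketch below uses hypothetical stand-in classes. The base class always overwrites auth with a tuple, and the async variant lives under its own name, _prepare_api_call_kwargs_async, so the sync path can no longer pick it up. The class names, credentials, and the BasicAuth namedtuple are all assumptions for the example.

```python
import asyncio
from collections import namedtuple

# Stand-in for aiohttp.BasicAuth (assumption, not the real class).
BasicAuth = namedtuple("BasicAuth", ["login", "password", "encoding"])


class SyncHook:
    """Hypothetical stand-in for the synchronous hook."""

    def _prepare_api_call_kwargs(self, kwargs):
        # Always overwrite, so a stray BasicAuth can never pass through.
        kwargs["auth"] = ("key", "secret")
        return kwargs

    def _do_api_call(self):
        return self._prepare_api_call_kwargs({})["auth"]

    def start_sync(self):
        return self._do_api_call()


class AsyncHook(SyncHook):
    """Hypothetical stand-in for the async subclass."""

    def _prepare_api_call_kwargs_async(self, kwargs):
        # Separate name: the sync method above is no longer overridden.
        kwargs["auth"] = BasicAuth("key", "secret", "latin1")
        return kwargs

    async def _do_api_call_async(self):
        return self._prepare_api_call_kwargs_async({})["auth"]


hook = AsyncHook()
sync_auth = hook.start_sync()                      # sync path gets a tuple
async_auth = asyncio.run(hook._do_api_call_async())  # async path gets BasicAuth

print(type(sync_auth).__name__, type(async_auth).__name__)
```

With the two preparation methods separated, a sync call such as the restart after a "rescheduled" state would get the tuple even when made from the async hook instance.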

@pankajastro Do you have any thoughts?

Sample PR is up - let me know what you think
