
Use deterministic job_ids to avoid retrying successful queries #977

Closed
wants to merge 17 commits

Conversation

@McKnight-42 (Contributor) commented Oct 23, 2023

resolves #949
docs dbt-labs/docs.getdbt.com/#

Problem

Currently, if we experience a transient exception like RemoteDisconnected, we can sometimes end up re-running a query that has already been successfully kicked off.

Solution

On retry, poll by a deterministic job_id to see whether a successful job has already been kicked off.
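
For context, a minimal sketch of that flow against the google-cloud-bigquery client; the helper name poll_or_resubmit is hypothetical and not part of this PR:

from google.api_core import exceptions as api_exceptions
from google.cloud import bigquery


def poll_or_resubmit(client: bigquery.Client, sql: str, job_id: str):
    # On retry, look up the deterministic job_id first; if that job was already
    # submitted, wait on it instead of re-running the query.
    try:
        existing = client.get_job(job_id)
        return existing.result()
    except api_exceptions.NotFound:
        return client.query(sql, job_id=job_id).result()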

Checklist

  • I have read the contributing guide and understand what's expected of me
  • I have run this code in development and it appears to resolve the stated issue
  • This PR includes tests, or tests are not required/relevant for this PR
  • This PR has no interface changes (e.g. macros, cli, logs, json artifacts, config files, adapter interface, etc) or this PR has already received feedback and approval from Product or DX

@McKnight-42 added the Skip Changelog (Skips GHA to check for changelog file) label Oct 23, 2023
@McKnight-42 self-assigned this Oct 23, 2023
@cla-bot added the cla:yes label Oct 23, 2023
Comment on lines 290 to 310
def cancel_open(self) -> None:
    names = []
    this_connection = self.get_if_exists()
    with self.lock:
        for thread_id, connection in self.thread_connections.items():
            if connection is this_connection:
                continue

            if connection.handle is not None and connection.state == ConnectionState.OPEN:
                client = connection.handle
                for job_id in self.jobs_by_thread.get(thread_id, []):

                    def fn():
                        return client.cancel_job(job_id)

                    self._retry_and_handle(msg=f"Cancel job: {job_id}", conn=connection, fn=fn)

                self.close(connection)

            if connection.name is not None:
                names.append(connection.name)
@McKnight-42 (Contributor Author) Oct 23, 2023

Do we need to return `names` here? If we do, we need to re-add the return and change the function's return type to match what's in dbt-core on the SQLConnectionManager.

Within dbt-bigquery, the only place we call this is in unit tests.

@McKnight-42 (Contributor Author)

I'm having to modify two unit tests to account for object() not having the attributes newly referenced by cancel_open:

    adapter.connections.thread_connections[0] = object()

For test_cancel_open_connections_master in test_bigquery_adapter.py, would it be best to create a new mock?

@McKnight-42 (Contributor Author) commented Oct 25, 2023

All integration/functional tests are failing now with:

dbt.exceptions.DbtRuntimeError: Runtime Error
E             'Client' object has no attribute 'connection'

in raw_execute

@McKnight-42 (Contributor Author)

> All integration/functional tests are failing now with:
>
>     dbt.exceptions.DbtRuntimeError: Runtime Error
>     E             'Client' object has no attribute 'connection'
>
> in raw_execute

Solved this part of it by not going through the defined client and just using conn.name instead.

@McKnight-42 (Contributor Author) commented Oct 25, 2023

@colin-rogers-dbt it's possible we may still have to use uuid like the original PR, as I don't think invocation_id is active on a fresh run during raw_execute, so we may need to define a unique identifier instead. Thoughts?

This seems to pass locally, but I'm not a fan of the interaction when you add a breakpoint around line 524 in connections.py; it seems to loop too many times in pdb.


    self.close(connection)

if connection.name is not None:
Contributor

Do we specifically want all connections which are not this_connection? Or do we only want connections in which we cancelled jobs? In this current flow, a connection for which connection.state == ConnectionState.CLOSED will show up in names, which doesn't feel like an intuitive list to get from `cancel_open`.

Contributor Author

If I backtrack cancel_open to core, it looks like one of the only places we call it is cancel_open_connections, which does make me think the desired result is that all closed connections are accounted for?

Contributor

In other words, we should only be returning the connections which we cancelled during this call, right?

if connection is this_connection:
    continue

if connection.handle is not None and connection.state == ConnectionState.OPEN:
Contributor

This could be over-engineering, but I would consider putting the contents of this if block into its own method.

Contributor Author

To be clear, are you referring to lines 308-321?

Contributor

I'm referring to lines 310-318. Then this would look like:

names = []
this_connection = self.get_if_exists()

with self.lock:
    for thread_id, connection in self.thread_connections.items():

        if connection is this_connection:
            continue

        if connection.handle and connection.state == ConnectionState.OPEN:
            self.close_thread(thread_id, connection)  # or whatever name you choose

        if name := connection.name:
            names.append(name)

return names

Something else worth considering is whether we want to handle all of the threads within a connection. I don't know if there's more than one thread for a connection, but I feel like there is. If there's a connection with more than one thread, you'll close that connection in the second condition above when you get to the first thread. Then you'll skip past the second condition for every other thread since connection.state should be closed at that point.

I think what you probably want is a list of job_ids by connection. Then for each connection you would cancel the job. Once all jobs are cancelled, then close the connection.
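
A rough sketch of that shape, purely illustrative rather than the PR's implementation: jobs_by_connection is a hypothetical mapping of connection name to tracked job ids, and connections are de-duplicated so one shared across threads is only cancelled and closed once.

def cancel_open(self):
    names = []
    this_connection = self.get_if_exists()
    with self.lock:
        # De-duplicate connections that may be shared across threads.
        connections = {
            id(connection): connection
            for connection in self.thread_connections.values()
            if connection is not this_connection
        }
        for connection in connections.values():
            if connection.handle is not None and connection.state == ConnectionState.OPEN:
                client = connection.handle
                # Cancel every job tracked for this connection, then close it once.
                for job_id in self.jobs_by_connection.get(connection.name, []):
                    self._retry_and_handle(
                        msg=f"Cancel job: {job_id}",
                        conn=connection,
                        fn=lambda job_id=job_id: client.cancel_job(job_id),
                    )
                self.close(connection)
            if connection.name is not None:
                names.append(connection.name)
    return names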

# build out deterministic job_id
model_name = conn.credentials.schema  # use schema name, as the model name is not available here
invocation_id = str(uuid.uuid4())
job_id = define_job_id(model_name, invocation_id)
Contributor

I don't think uuid.uuid4() is deterministic, which means job_id is not either. Have you considered an md5 hash of sufficient attributes (model, connection name, etc.)?
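
For illustration, a small sketch of that md5 approach; the specific attributes hashed (model name, connection name, query text) and the dbt_ prefix are assumptions rather than anything defined in this PR:

import hashlib


def define_job_id(model_name, connection_name, sql):
    # Deterministic for the same model / connection / query; the hex digest
    # also avoids characters that might not be accepted in a job id.
    raw = f"{model_name}|{connection_name}|{sql}".encode("utf-8")
    return f"dbt_{hashlib.md5(raw).hexdigest()}"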

Contributor Author

Currently calling uuid directly as part of getting the unit tests swapped over to the new functionality. I think the initial/current plan was to use the invocation_id we define via tracking in core (https://docs.getdbt.com/reference/dbt-jinja-functions/invocation_id), which is itself a uuid based on the docs.

Contributor

Instead of using invocation_id (which we only sometimes have) should we just use the actual query text (which we have to have)?

@@ -0,0 +1,3 @@
def define_job_id(model_name, invocation_id):
    job_id = f"{model_name}_{invocation_id}"
Contributor

What's the constraint on job_id? Is there a max length? Can all characters that go into a model name also be used in a job id?

Contributor Author

Will definitely have to test this. I think all characters are fine, as we should just be combining two strings, but the length may hit a limit.

Contributor

I would want to make sure that we can submit a job_id to BQ with some weird characters. People put all kinds of things in their model names. An alternative is to hash the model_name so that it's only alpha-numeric.

Contributor

+1, if we want to stick with uuid we can just generate a deterministic one with uuid.uuid5.

Comment on lines +6 to +12
def define_job_id(sql, invocation_id=None):
    if invocation_id:
        job_id = str(uuid.uuid5(invocation_id, sql))
    else:
        job_id = str(uuid.uuid5(_INVOCATION_ID, sql))
    job_id = job_id.replace("-", "_")
    return job_id
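
One Python-level note on the snippet above: uuid.uuid5 expects its namespace argument to be a uuid.UUID, so if invocation_id arrives as a plain string it would need converting first. A possible variant, as a sketch only; the placeholder _INVOCATION_ID below stands in for however dbt would supply it:

import uuid

_INVOCATION_ID = uuid.uuid4()  # placeholder namespace; the real value would come from dbt


def define_job_id(sql, invocation_id=None):
    # uuid5 requires a uuid.UUID namespace, so convert a string invocation_id first.
    namespace = uuid.UUID(invocation_id) if invocation_id else _INVOCATION_ID
    return str(uuid.uuid5(namespace, sql)).replace("-", "_")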
Contributor

I would leverage a macro to let end users override that logic to make it unique across invocations if needed

github-actions bot commented Jun 5, 2024

This PR has been marked as Stale because it has been open with no activity as of late. If you would like the PR to remain open, please comment on the PR or else it will be closed in 7 days.

@github-actions bot added the Stale label Jun 5, 2024
github-actions bot

Although we are closing this PR as stale, it can still be reopened to continue development. Just add a comment to notify the maintainers.

@github-actions bot closed this Jun 12, 2024
@mikealfare deleted the mcknight/adap-924 branch July 17, 2024 23:51
Labels: cla:yes, Skip Changelog, Stale
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[ADAP-924] [Feature] Use deterministic job_ids to avoid retrying successful queries
4 participants