Fix using ThreadRunner with dataset factories #4093

Merged

ElenaKhaustova merged 25 commits into main from fix/4007-thread-runner-with-dataset-factories on Aug 29, 2024

Conversation

ElenaKhaustova
Contributor

@ElenaKhaustova ElenaKhaustova commented Aug 15, 2024

Description

Fixes #4007

The issue is not deterministic and appears depending on the order in which threads are executed.

The issue appears for ThreadRunner when loading the same pattern dataset from multiple threads. Since patterns are resolved at runtime, the corresponding datasets are added during execution: load() -> _get_dataset() -> add(). Inside _get_dataset() we have

if dataset_name not in self._datasets and matched_pattern:

that is, we check whether the dataset is already in the catalog and then call add() based on the result of that check. The race occurs when the first thread observes that the dataset is not in the catalog, but another thread adds it right after; the first thread then acts on that outdated state and tries to add the dataset to the catalog again, which raises DatasetAlreadyExistsError.
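For illustration, here is a minimal sketch of the check-then-act race, assuming hypothetical helpers _match_pattern() and _resolve_pattern() (this is not the actual Kedro source):

def _get_dataset(self, dataset_name):
    matched_pattern = self._match_pattern(dataset_name)
    if dataset_name not in self._datasets and matched_pattern:
        # Thread A evaluates the check above as True and is then preempted.
        # Thread B runs the same check (also True) and registers the dataset first.
        dataset = self._resolve_pattern(dataset_name, matched_pattern)
        # When thread A resumes, the dataset is already registered,
        # so add() raises DatasetAlreadyExistsError.
        self.add(dataset_name, dataset)
    return self._datasets[dataset_name]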

This issue does not affect ParallelRunner because we never share objects between processes, so in each process, we have separate catalogs.

Development notes

The main source of the issue is that catalog.load() not only reads but also writes, and the catalog itself is not thread-safe at the moment. So it feels like a major problem which we can temporarily address as suggested and fix properly as part of the runners redesign work.

There are two solutions suggested:

  1. The first solution adds a threading.Lock around the dataset load() call (which also writes, by calling add() internally) when running with threads. With the lock, checking whether the dataset is in the catalog and adding it happen atomically, since the whole dataset.load() call is protected; a rough sketch of this approach follows this list. The possible downside is reduced performance, but that should only matter when load() goes through an external API, because otherwise the GIL already prevents truly simultaneous reads.
  2. The second solution resolves all patterns before passing the catalog to the runner, so load() never needs to call add(), as all patterns are already resolved from the pipeline datasets. This solution gives up lazy pattern resolution and will not work with dynamic pipelines.
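A minimal sketch of the first solution, assuming a module-level lock (the helper name _synchronised_load is hypothetical, not the exact code in this PR):

import threading

# Hypothetical module-level lock, used only for illustration.
_catalog_lock = threading.Lock()

def _synchronised_load(catalog, dataset_name):
    # The whole load() call runs under the lock, so the "is the dataset already
    # in the catalog?" check and the subsequent add() happen atomically with
    # respect to other threads.
    with _catalog_lock:
        return catalog.load(dataset_name)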

I prefer the first solution because it feels wrong to write to a shared object without locking it.

Note: when testing, you need to run the pipeline multiple times to reproduce the error, as it is not deterministic.

Developer Certificate of Origin

We need all contributions to comply with the Developer Certificate of Origin (DCO). All commits must be signed off by including a Signed-off-by line in the commit message. See our wiki for guidance.

If your PR is blocked due to unsigned commits, then you must follow the instructions under "Rebase the branch" on the GitHub Checks page for your PR. This will retroactively add the sign-off to all unsigned commits and allow the DCO check to pass.

Checklist

  • Read the contributing guidelines
  • Signed off each commit with a Developer Certificate of Origin (DCO)
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change in the RELEASE.md file
  • Added tests to cover my changes
  • Checked if this change will affect Kedro-Viz, and if so, communicated that with the Viz team

@ElenaKhaustova ElenaKhaustova marked this pull request as ready for review August 15, 2024 10:52
Contributor

@noklam noklam left a comment

The issue appears for both ThreadRunner and ParallelRunner when loading the same pattern dataset.

Does this issue also exist in ParallelRunner? I see all GitHub issues are from ThreadRunner.

I am a bit concerned about the performance penalty; the main issue here is a very simple check that causes duplicate dataset registration.

Even if we have to go with a Lock, I think AbstractRunner is not the right place to add it. See issues like this: in the past we had something called ShelveStore, and simply importing multiprocessing would cause issues in more restricted environments.

@ElenaKhaustova
Contributor Author

ElenaKhaustova commented Aug 19, 2024

The issue appears for both ThreadRunner and ParallelRunner when loading the same pattern dataset.

Does this issue also exist in ParallelRunner? I see all GitHub issues are from ThreadRunner.

I am a bit concerned about the performance penalty; the main issue here is a very simple check that causes duplicate dataset registration.

Even if we have to go with a Lock, I think AbstractRunner is not the right place to add it. See issues like this: in the past we had something called ShelveStore, and simply importing multiprocessing would cause issues in more restricted environments.

Thank you, @noklam!

  1. It looked like it existed for ParallelRunner as well, since it uses the same method as ThreadRunner (_run_node_sequential) where the problem appears. But I looked deeper into the implementation, and after some tests I can say that it does not affect ParallelRunner because we never share objects between processes, so in each process we have separate catalogs.
  2. Based on point 1 and your comment about issues when importing multiprocessing, I replaced it with threading.Lock, as the threading module seems fine for AWS Lambda: https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/run-parallel-reads-of-s3-objects-by-using-python-in-an-aws-lambda-function.html#:~:text=Lambda%20functions%20support%20up%20to,data%2C%20in%20an%20S3%20bucket.
  3. I agree that AbstractRunner might not be the right place for that, but given the current implementation, we can either do that or make a separate implementation of _run_node_sequential for ThreadRunner. However, the main source of the issue is that catalog.load() not only reads but also writes, and the catalog itself is not thread-safe at the moment. So it feels like a major problem which we can temporarily address as suggested and fix properly as part of the runners redesign work.
  4. I see the point about the performance issues, but it feels wrong to write to a shared object without locking it. Theoretically, it should only affect the case when load() calls an external API, because otherwise we still cannot do simultaneous reads because of the GIL. The alternative of replacing the dataset when registering it does not seem good either, since we would have to distinguish the cases where we replace it to bypass this issue from the rest.

Contributor

@noklam noklam left a comment

I see the point about the performance issues, but it feels wrong to write to a shared object without locking it. Theoretically, it should only affect the case when load() calls an external API, because otherwise we still cannot do simultaneous reads because of the GIL. The alternative of replacing the dataset when registering it does not seem good either, since we would have to distinguish the cases where we replace it to bypass this issue from the rest.

I agree it's wrong to write to a shared object without locking it; it's definitely a bug and this is good enough for a temporary fix.

My question would be: would it make sense to resolve dataset patterns slightly earlier? We introduced dataset patterns to make catalog.yml simpler, and they are resolved during pipeline execution. Most of the information is available once the pipeline is ready (roughly at before_pipeline_run). If we resolve the patterns there, it may be sufficient. (That is, of course, ignoring the "dynamic pipeline" case where new datasets are injected. But I think those cases would suffer from the same problem anyway and most likely won't work with ParallelRunner/ThreadRunner in the first place.)

@ElenaKhaustova
Contributor Author

I see the point about the performance issues, but it feels wrong to write to a shared object without locking it. Theoretically, it should only affect the case when load() calls an external API, because otherwise we still cannot do simultaneous reads because of the GIL. The alternative of replacing the dataset when registering it does not seem good either, since we would have to distinguish the cases where we replace it to bypass this issue from the rest.

I agree it's wrong to write to a shared object without locking it; it's definitely a bug and this is good enough for a temporary fix.

My question would be: would it make sense to resolve dataset patterns slightly earlier? We introduced dataset patterns to make catalog.yml simpler, and they are resolved during pipeline execution. Most of the information is available once the pipeline is ready (roughly at before_pipeline_run). If we resolve the patterns there, it may be sufficient. (That is, of course, ignoring the "dynamic pipeline" case where new datasets are injected. But I think those cases would suffer from the same problem anyway and most likely won't work with ParallelRunner/ThreadRunner in the first place.)

Yes, it can be done as well, see the update. Not sure which fix is worse though 🤔

@ElenaKhaustova ElenaKhaustova changed the title from "Lock dataset load when multithreading and multiprocessing" to "Lock dataset load when multithreading" Aug 21, 2024
@astrojuanlu
Member

My worry is that, as already pointed out, both fixes touch parts of Kedro that should be unencumbered by this.

The main source of the issue is that catalog.load() not only reads but also writes, and the catalog itself is not thread-safe at the moment.

What's the cost of keeping the issue unfixed and instead making the upcoming DataCatalog thread-safe?

So it feels like a major problem which we can temporarily address as suggested and fix properly as part of the runners redesign work.

One thing is not clear to me: is it enough to redesign the DataCatalog? Or would the runners need to be redesigned as well?

@ElenaKhaustova
Contributor Author

My worry is that, as already pointed out, both fixes touch parts of Kedro that should be unencumbered by this.

The main source of the issue is that catalog.load() not only reads but also writes, and the catalog itself is not thread-safe at the moment.

What's the cost of keeping the issue unfixed and instead making the upcoming DataCatalog thread-safe?

So it feels like a major problem which we can temporarily address as suggested and fix properly as part of the runners redesign work.

One thing is not clear to me: is it enough to redesign the DataCatalog? Or would the runners need to be redesigned as well?

This particular issue will go away in the new version if we implement this: #4110 (that's basically what we do in the second solution), and splitting reading from writing should address the rest of the cases.

But there might still be cases, if we do some conditional writes within the catalog, that require changes on the runners' side to make them safe. So I would say we will most probably need to redesign the runners as well.

@astrojuanlu
Member

Then my vote goes for the solution that touches the AbstractRunner and leaves the Session alone

@ElenaKhaustova ElenaKhaustova marked this pull request as draft August 21, 2024 17:41
@ElenaKhaustova
Contributor Author

When working on DC2.0 I noticed an issue with the second solution. Converted to draft until it is resolved.

@ElenaKhaustova
Contributor Author

Solution 2 is fixed to consider only datasets from the target pipeline that match patterns.
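Roughly, the resolution step looks like the following sketch (a hypothetical helper; the exact hooks and method names used in this PR may differ, and error handling for names that match no pattern is omitted):

def resolve_patterns(catalog, pipeline):
    # Register every pipeline dataset that currently only exists as a factory
    # pattern, so that later catalog.load() calls are pure reads.
    for dataset_name in pipeline.datasets():
        if dataset_name not in catalog.list():
            # _get_dataset() resolves the matching pattern and adds the dataset
            # to the catalog before any thread starts loading it.
            catalog._get_dataset(dataset_name)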

@noklam
Contributor

noklam commented Aug 23, 2024

Sorry for not realising this earlier😅

With the new concern about Spark, my priorities are:

  1. Solution 2 - eagerly resolve patterns before running the pipeline.
  2. Do nothing (because I assume this actually breaks the usage of parallelised Spark pipelines with 0.19?). Is there any easy way we can confirm this?
  3. Solution 1 (it should run without error, but I suspect it makes ThreadRunner work more or less the same as SequentialRunner).

I checked the telemetry but it's hard to confirm since most likely people disable telemetry on Databricks etc.
[telemetry screenshot]

Cc @astrojuanlu

@astrojuanlu
Member

Ugh... yeah difficult tradeoffs. I see this has been reported by at least 2 different users, but worsening the Spark/Databricks experience is not a great outcome either.

That, on top of the solutions being ugly already, makes me think that maybe we should indeed not do anything for now.

@noklam what are the implications of resolving

@noklam
Contributor

noklam commented Aug 23, 2024

@lrcouto Would it be possible to test this since you are working on the pipeline setup? I think this may be a good test case.

@ElenaKhaustova
Contributor Author

Alright, let's wait until the performance tests are out and then decide on the temporary approach based on the results.

@lrcouto
Contributor

lrcouto commented Aug 26, 2024

I tried running a test project with Spark, just to get an idea. Ran each version 10 times or so.

Running it with the ThreadRunner before the changes:

  • Took around 3 minutes.
  • Used around 10~12% of my CPU.
  • Used around 20~22% of my memory.

Running it with the ThreadRunner after the changes:

  • Took around 4:30 minutes.
  • Used around 12~14% of my CPU.
  • Used around 23~25% of my memory.

I still want to try it again with something a little more complex, but I did perceive a small performance decrease. The test project I'm using is not super realistic and does not use any actual data; it just generates and groups 1,000,000-row dataframes to make the computer do things, so I still want to test it with something more akin to reality.

@astrojuanlu
Member

Thanks a lot @lrcouto for sharing these results. From 3 minutes to 4:30 is still a 50 % performance decrease... do you have any deviation measurements?

@lrcouto
Contributor

lrcouto commented Aug 26, 2024

Thanks a lot @lrcouto for sharing these results. From 3 minutes to 4:30 is still a 50 % performance decrease... do you have any deviation measurements?

I think I still want to test it a few more times just to make sure it wasn't an external factor (another process running on the background or something) that caused that difference, because it does look a bit high for me. I'll run a couple more times and bring the details.

@lrcouto
Contributor

lrcouto commented Aug 26, 2024

I re-did the test making sure that there was nothing running in the background that could affect the results, and ran them in separate environments. The results were similar to what I've posted before.

Running 10x with regular Kedro, without the changes:

  • Run times in seconds:
194.364, 193.176, 204.715, 196.733, 191.605, 195.560, 195.658, 198.238, 194.492, 193.692
  • The longest run took 3m24.715s and the shortest run took 3m11.605s.
  • The mean of all the run times was 195.8233 seconds (3min15.8233s).
  • Standard deviation was 3.451.

Running 10x after the changes:

  • Run times in seconds:
266.357, 266.647, 269.864, 271.820, 264.308, 272.031, 270.103, 265.654, 261.948, 262.988
  • The longest run took 4m32.031s and the shortest took 4m21.948s.
  • The mean of all the run times was 267.172 seconds (4min27.172s).
  • Standard deviation was 3.425.

@astrojuanlu
Member

astrojuanlu commented Aug 26, 2024

In [1]: regular_kedro = [194.364, 193.176, 204.715, 196.733, 191.605, 195.560, 195.658, 198.238, 194.492, 193.692]

In [2]: pr_4093 = [266.357, 266.647, 269.864, 271.820, 264.308, 272.031, 270.103, 265.654, 261.948, 262.988]

In [3]: from scipy import stats

In [4]: stats.ttest_ind(regular_kedro, pr_4093)
Out[4]: TtestResult(statistic=np.float64(-44.027308662700634), pvalue=np.float64(8.798458120158517e-20), df=np.float64(18.0))

🤓 we have enough evidence to reject that the times didn't worsen (read that twice)

Stats jokes aside, it's still a 40 % increase in run time. Can potentially be very annoying for some users.

@ElenaKhaustova ElenaKhaustova changed the title from "Lock dataset load when multithreading" to "Fix using ThreadRunner with dataset factories" Aug 28, 2024
@ElenaKhaustova
Contributor Author

Reverted to solution 2 after the test results for solution 1.

@pytest.mark.usefixtures("mock_settings_context_class")
@pytest.mark.parametrize("fake_pipeline_name", [None, _FAKE_PIPELINE_NAME])
@pytest.mark.parametrize("match_pattern", [True, False])
def test_run_thread_runner(
Member

Does this test also verify that dataset factories are resolved? Or is it not possible to test that specifically because the behaviour is not deterministic?

Contributor Author

It does not test that, because we mock all the calls to the runner, pipelines, catalog, etc., as in the rest of the session tests. It only tests that, when using ThreadRunner, we enter the branches that resolve the patterns.

The resolution logic is covered by the catalog tests, though it could be tested here as well. But for that we would have to properly initialise the runner, pipelines, and catalog, which might be too complex given that the resolution logic is already tested separately.

Member

Makes sense. Thanks for clarifying! 🙂

Member

@merelcht merelcht left a comment

Thanks @ElenaKhaustova , great work investigating the issue and coming up with a suitable solution!

Contributor

@ankatiyar ankatiyar left a comment

Minor comment regarding the release notes but looks good! ⭐

RELEASE.md Outdated
@@ -1,8 +1,10 @@
# Upcoming Release
Contributor

Keep the heading here as it was; it should be changed during the release so the automatic release notes extraction works properly!

Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Contributor

@noklam noklam left a comment

⭐️ Great work on figuring out the mocking, and it is great to see the pipeline testing already helping us make some decisions 🔥 @lrcouto

@ElenaKhaustova ElenaKhaustova enabled auto-merge (squash) August 29, 2024 09:58
@ElenaKhaustova ElenaKhaustova merged commit f6319dd into main Aug 29, 2024
41 checks passed
@ElenaKhaustova ElenaKhaustova deleted the fix/4007-thread-runner-with-dataset-factories branch August 29, 2024 10:17
Successfully merging this pull request may close these issues.

DatasetAlreadyExistsError when using dataset factories and ThreadRunner