Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parallel replaygain analysis #3478

Merged
merged 20 commits into from
Dec 14, 2020
Merged

Conversation

ybnd
Copy link
Contributor

@ybnd ybnd commented Jan 28, 2020

Implements #2224

  • Add --jobs or -j to replaygain to set the pool size

  • Single-threaded execution by default, if --jobs is unset

  • If multithreaded, calls Backend.compute_album_gain or Backend.compute_track_gain asynchronously with metadata storing/writing in the callback

* Add `--jobs` or `-j` to `replaygain`-> set the pool size
* Single-threaded execution by default, if `--jobs` is unset

* If multithreaded, calls `Backend.compute_album_gain` or `Backend.compute_track_gain` asynchronously with metadata storing/writing in the callback
Copy link
Member

@sampsyo sampsyo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome; thanks for tackling this!!

I have a few comments in here about how we can reduce some code duplication by introducing a "maybe run in parallel" utility.

Could I ask you to please also add to the documentation for the plugin (and add a changelog entry)?

beetsplug/replaygain.py Show resolved Hide resolved
beetsplug/replaygain.py Show resolved Hide resolved
self._log.debug(u'done analyzing {0}', item)

try:
if hasattr(self, 'pool'):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could simplify these conditionals by introducing a utility like _apply that takes an optional pool argument? If the pool is None, then this utility would just call the function directly. Otherwise, it would use a proper call to run the function asynchronously. Then we can centralize the "maybe do this parallel" logic rather than replicating it for the individual calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that would be much better. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in 388d2d2, with the exception of the optional pool argument, do the "pool or no pool" check in _has_pool instead. The plugin should still have pool as an attribute for parallel execution to work at import ~ 79c5535.

@@ -1381,17 +1433,30 @@ def commands(self):
def func(lib, opts, args):
write = ui.should_write(opts.write)
force = opts.force
jobs = opts.jobs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of defaulting to off when jobs = 0, let's use our cpu_count utility:

def cpu_count():

Copy link
Contributor Author

@ybnd ybnd Jan 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do something like default = math.ceil(cpu_count() / 4) to keep CPU usage reasonable by default for large queries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And maybe have it default to no parallelism if configured with threaded: no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the best policy would be to match the behavior of the convert plugin:
https://github.com/beetbox/beets/blob/master/beetsplug/convert.py#L119

The option is named threads there, and it does default to the CPU count. I think it's probably OK to default to that? If people want to run in the background without disruption, it's easy to turn down the count…

beetsplug/replaygain.py Outdated Show resolved Hide resolved
@ybnd
Copy link
Contributor Author

ybnd commented Jan 30, 2020

Ok, so I remembered why I made it default to non-parallel :)

While it does work with my on-disk library, it fails in test_replaygain.py on my machine, maybe because there the library is :in_memory:? For some reason it works in CI though.

The Travis build fails because I'm not propagating exceptions to the main thread. Apparently it's a common issue, didn't know that...

Copy link
Member

@sampsyo sampsyo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good! A couple more comments within.

beetsplug/replaygain.py Show resolved Hide resolved
write = ui.should_write(opts.write)
force = opts.force

if opts.threads != 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this condition might be supposed to be != 1? Or better yet, the condition should come after checking the config file too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I wanted to include the option to completely bypass ThreadPool from the CLI with --threads 0, seems like a useful feature down the line in case we ever suspect the parallel processing is messing something up
  • ThreadPool(1) still provides one worker thread that can work asynchronously which may be useful in some cases? I don’t know.

beetsplug/replaygain.py Outdated Show resolved Hide resolved
@sampsyo
Copy link
Member

sampsyo commented Jan 31, 2020

Looking pretty good! I assume we should probably figure out the exception logging thing before merging?

* ExceptionWatcher instance running in parallel to the pool, monitoring a queue for exceptions
* Pooled threads push exceptions to this queue, log non-fatal exceptions
* Application exits on fatal exception in pooled thread

* More front end info logs in the CLI
@ybnd
Copy link
Contributor Author

ybnd commented Feb 4, 2020

Added an exception handling thread.

ReplayGainError exceptions raised in the worker threads jost get logged, while FatalReplayGainError and other exceptions are reraised in the main thread.

@jtpavlock
Copy link
Contributor

Pinging @sampsyo in case he didn't see the last request for review.

Did you have anything else to add to this?

@ybnd
Copy link
Contributor Author

ybnd commented Aug 12, 2020

Fixed the conflicts on my end.

Some tests are failing though, fixing those now.

Comment on lines 1312 to 1319
def _store(self, item):
try:
item.store()
except OperationalError:
# test_replaygain.py :memory: library can fail with
# `sqlite3.OperationalError: no such table: items`
# but the second attempt succeeds
item.store()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, the exception is never thrown when working with an 'actual' database.

@stale
Copy link

stale bot commented Dec 11, 2020

Is this still relevant? If so, what is blocking it? Is there anything you can do to help move it forward?

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Dec 11, 2020
@sampsyo
Copy link
Member

sampsyo commented Dec 11, 2020

So sorry for the delay on this. Everything looks awesome, for the most part—I have no specific code-level comments on the actual parallelization.

However, I do think we need to do something about the OperationalError handler that @ybnd highlighted. OperationalError is a very generic exception and can arise in lots of cases where we really do want to know something went wrong with the database—for example, if we make an error in generating the SQL query. Is there any way we can move this handler/suppression to the test itself instead of putting it here in the plugin?

@stale stale bot removed the stale label Dec 11, 2020
@ybnd
Copy link
Contributor Author

ybnd commented Dec 12, 2020

@sampsyo No problem, I know you've been super busy :)

Yeah, that would be way more sensible. I'll look into it!

Copy link
Contributor Author

@ybnd ybnd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pulled the sqlite3.OperationalError thing into test_replaygain.py as suggested.

Somewhat annoyingly this whole workaround is not "necessary" for the CI builds since they just skip all of the real tests...
I double-checked with the master branch and it's this PR that's causing this, for sure.

How do you think I should proceed?

  • Leave it like this and submit an issue (which you could assign to me so I could keep looking into it)
  • Look into it before going forward with this PR

in test_replaygain.py, so that it can still fail appropriately
outside of these tests.
"""
item.store()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the OperationalError handler, added an explanation as to why this seemingly useless method is there

item.store()


@patch.object(ReplayGainPlugin, '_store', _store_retry_once)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can see there's no way to try/except on a higher level; the "false alarm" happens at every item, so it looks like we still need to ignore it for every item...

@sampsyo
Copy link
Member

sampsyo commented Dec 14, 2020

Awesome. This is really mysterious, but I like your temporary workaround to make the tests go through. Since you're comfortable with it, I'd like to forge ahead and merge this. Would you mind opening a new issue to get to the bottom of what's going on?

Many thanks once again!!

@skapazzo
Copy link

This seems to give me problems: replaygain metadata doesn't get written to file on import.

If I import an album with replaygain: auto: yes and any backend that has do_parallel = True replaygain values get calculated and saved in the db but not written to the files.
If I set do_parallel = False for the backend I'm using the metadata gets written as expected; setting threads = 0 doesn't help.

@jackwilsdon
Copy link
Member

@skapazzo could you please open a new issue for this?

@skapazzo
Copy link

@skapazzo could you please open a new issue for this?

Sure, I'll do it straight away.

sampsyo added a commit that referenced this pull request Oct 1, 2022
The parallelism strategy in #3478, in retrospect, used a pretty funky
way to deal with exceptions in the asynchronous work---since
`apply_async` has an `error_callback` parameter that's meant for exactly
this. The problem is that the wrapped function would correctly log the
exception *and then return `None`*, confusing any downstream code.
Instead of just adding `None`-awareness to the callback, let's just
avoid running the callback altogether in the case of an error.
JOJ0 pushed a commit to JOJ0/beets that referenced this pull request Oct 30, 2022
The parallelism strategy in beetbox#3478, in retrospect, used a pretty funky
way to deal with exceptions in the asynchronous work---since
`apply_async` has an `error_callback` parameter that's meant for exactly
this. The problem is that the wrapped function would correctly log the
exception *and then return `None`*, confusing any downstream code.
Instead of just adding `None`-awareness to the callback, let's just
avoid running the callback altogether in the case of an error.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants