diff --git a/.travis.yml b/.travis.yml index 6a9366323..28bb4b0f2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -86,11 +86,6 @@ script: - cargo test -p rayon - cargo test -p rayon-core - ./ci/highlander.sh - - | - if [ -n "$RUSTFLAGS" ]; then - cargo build -p rayon-futures && - cargo test -p rayon-futures - fi branches: only: diff --git a/Cargo.toml b/Cargo.toml index 676fba52a..541bd6f50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ categories = ["concurrency"] exclude = ["/ci/*", "/scripts/*", "/.travis.yml", "/appveyor.yml", "/bors.toml"] [workspace] -members = ["rayon-demo", "rayon-core", "rayon-futures"] +members = ["rayon-demo", "rayon-core"] exclude = ["ci"] [dependencies] diff --git a/appveyor.yml b/appveyor.yml index a223c7e6f..f3bf4ec96 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -29,10 +29,6 @@ test_script: - cargo build - cargo test -p rayon - cargo test -p rayon-core - - if not "%RUSTFLAGS%"=="%^RUSTFLAGS%" ( - cargo build -p rayon-futures && - cargo test -p rayon-futures - ) branches: only: diff --git a/ci/compat-Cargo.lock b/ci/compat-Cargo.lock index 707883b23..35201a99b 100644 --- a/ci/compat-Cargo.lock +++ b/ci/compat-Cargo.lock @@ -1,3 +1,5 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. [[package]] name = "aho-corasick" version = "0.7.6" @@ -276,11 +278,6 @@ name = "fuchsia-cprng" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "futures" -version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "getrandom" version = "0.1.13" @@ -906,14 +903,6 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rayon-futures" -version = "0.1.0" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon-core 1.6.1", -] - [[package]] name = "rdrand" version = "0.4.0" @@ -1311,7 +1300,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" "checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" -"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" "checksum getrandom 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "e7db7ca94ed4cd01190ceee0d8a8052f08a247aa1b469a7f68c6a3b71afcf407" "checksum gl_generator 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "39a23d5e872a275135d66895d954269cf5e8661d234eb1c2480f4ce0d586acbd" "checksum gl_generator 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ca98bbde17256e02d17336a6bdb5a50f7d0ccacee502e191d3e3d0ec2f96f84a" diff --git a/rayon-futures/Cargo.toml b/rayon-futures/Cargo.toml deleted file mode 100644 index 971ccdb3f..000000000 --- a/rayon-futures/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "rayon-futures" -version = "0.1.0" # reminder to update html_root_url attribute -authors = ["Niko Matsakis ", - "Josh Stone "] -description = "Futures integration into Rayon" -edition = "2018" -license = "Apache-2.0/MIT" -repository = "https://github.com/rayon-rs/rayon" -documentation = "https://docs.rs/rayon-futures/" -readme = "README.md" -keywords = ["parallel", "thread", "concurrency", "join", "performance"] -categories = ["concurrency"] - -[dependencies] -rayon-core = { version = "1.3", path = "../rayon-core" } -futures = "0.1.16" diff --git a/rayon-futures/LICENSE-APACHE b/rayon-futures/LICENSE-APACHE deleted file mode 100644 index 16fe87b06..000000000 --- a/rayon-futures/LICENSE-APACHE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - -2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - -4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS - -APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - -Copyright [yyyy] [name of copyright owner] - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. diff --git a/rayon-futures/LICENSE-MIT b/rayon-futures/LICENSE-MIT deleted file mode 100644 index 25597d583..000000000 --- a/rayon-futures/LICENSE-MIT +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2010 The Rust Project Developers - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/rayon-futures/README.md b/rayon-futures/README.md deleted file mode 100644 index f595a5b6b..000000000 --- a/rayon-futures/README.md +++ /dev/null @@ -1,274 +0,0 @@ -# Future integration into Rayon - -**NOTE:** `rayon-futures` currently requires unstable features of -`rayon-core`, which may only be enabled with `rustc --cfg`, -e.g. by setting `RUSTFLAGS=--cfg rayon_unstable` in the environment. - -## How futures work - -Let's start with a brief coverage of how futures work. Our example will -be a simple chain of futures: - - F_map -> F_socket - -Here `F_socket` is a future that maps to a TCP socket. It returns a -`Vec` of data read from that socket. `F_map` is a future will take -that data and do some transformation. (Note that the real futures for -reading from sockets etc do not work in this way, this is just an -example.) - -The idea of futures is that each future offers a `poll()` method. When -`poll()` is invoked, the future will attempt to execute. Typically, -this often involves recursively calling `poll()` on other futures. So, -in our example, `F_map` when it starts would call `F_socket.poll()` to -see if the data is ready. The idea is that `poll()` returns one of -three values: - -- `Ok(Async::Ready(R))` -- the future has completed, here is the result `R`. -- `Err(E)` -- the future has completed and resulted in an error `E`. -- `Ok(Async::NotReady)` -- the future is not yet complete. - -The last one is the most interesting. It means that the future is -blocked on *some event X*, typically an I/O event (i.e., we are -waiting for more data to arrive on a TCP socket). - -When a future returns `NotReady`, it also has one additional job. It -must register the "current task" (think for now of the current thread) -to be re-awoken when the event X has occurred. For most futures, this -job is delegated to another future: e.g., in our example, `F_map` -invokes `F_socket.poll()`. So if `F_socket.poll()` returns not-ready, -then it will have registered the current thread already, and hence -`F_map` can merely propagates the `NotReady` result further up. - -### The current task and executor - -A key concept of the futures.rs library is that of an *executor*. The -executor is the runtime that first invokes the top-level future -(`F_map`, in our example). This is precisely the role that Rayon -plays. Note that in any futures system there may be many -interoperating executors though. - -Part of an executor's job is to maintain some thread-local storage -(TLS) when a future is executing. In particular, it must setup the -"current task" (basically a unique integer, although it's an opaque -type) as well as an "unpark object" of type -`Arc`. [The `Unpark` trait][unpark] offers a single method -(`unpark()`) which can be invoked when the task should be -re-awoken. So `F_socket` might, for example, get the current -`Arc` object and store it for use by an I/O thread. The I/O -thread might invoke `epoll()` or `select()` or whatever and, when it -detects the socket has more data, invoke the `unpark()` method. - -[unpark]: https://docs.rs/futures/0.1/futures/executor/trait.Unpark.html - -## Rayon's futures integration - -When you spawn a future of type `F` into rayon, the idea is that it is -going to start independently executing in the thread-pool. Meanwhile, -the `spawn_future()` method returns to you your own future (let's call -it `F'`) that you can use to poll and monitor its progress. Internally -within Rayon, however, we only allocate a single `Arc` to represent -both of these things -- an `Arc>`, to be precise -- and -this `Arc` hence serves two distinct roles. - -The operations on `F'` (the handle returned to the user) are specified -by the trait `ScopeFutureTrait` and are very simple. The user can -either `poll()` the future, which is checking to see if rayon is done -executing it yet, or `cancel()` the future. `cancel()` occurs when -`F'` is dropped, which indicates that the user no longer has interest -in the result. - -### Future reference counting - -Each spawned future is represented by an `Arc`. This `Arc` actually has -some interesting structure. Each of the edges in the diagram below -represents something that is "kept alive" by holding a ref count (in -some way, usually via an `Arc`): - - F' ---+ [ deque ] --+ - | | - v v - +---> /---------------------\ - | | registry: | ------> [rayon registry] - | | contents: --------\ | - | | | scope | | ------> [spawning scope] - | | | this | | --+ (self references) - | | | ... | | | - | | \-----------------/ | | - | \---------------------/ | - +-------------------------------+ - -Let's walk through them: - -- The incoming edge from `F'` represents the edge from the future that was returned - to the caller of `spawn_future`. This ensures that the future arc will - not be freed so long as the caller is still interested in looking at - its result. -- The incoming edge from `[ deque ]` represents the fact that when the - future is enqueued into a thread-local deque (which it only - sometimes is), that deque holds a ref. This is done by transmuting - the `Arc` into a `*const Job` object (and hence the `*const` - logically holds the ref that was owned by the `Arc`). When the job - is executed, it is transmuted back and the resulting `Arc` is - eventually dropped, releasing the ref. -- The `registry` field holds onto an `Arc` and hence keeps - some central registry alive. This doesn't really do much but prevent - the `Registry` from being dropped. In particular, this doesn't - prevent the threads in a registry from terminating while the future - is unscheduled etc (though other fields in the future do). -- The `scope` field (of type `S`) is the "enclosing scope". This scope - is an abstract value that implements the `FutureScope<'scope>` trait - -- this means that it is responsible for ensuring that `'scope` does - not end until one of the `FutureScope` methods are invoked (which - occurs when the future has finished executing). For example, if the - future is spawned inside a `scope()` call, then the `S` will be a - wrapper (`ScopeFutureScope`) around a `*const Scope<'scope>`. When - the future is created one job is allocated for this future in the - scope, and the scope counter is decremented once the future is - marked as completing. - - In general, the job of the `scope` field is to ensure that the - future type (`F`) remains valid. After all, since `F: 'scope`, `F` - is known to be valid until the lifetime `'scope` ends, and that - lifetime cannot end until the `scope` methods are invoked, so we - know that `F` must stay valid until one of those methods are - invoked. - - All of our data of type `F` is stored in the field `spawn` (not - shown here). This field is always set to `None` before the scope - counter is decremented. See the section on lifetime safety for more - details. -- The `this` field stores an `Arc` which is actually - this same future. Thus the future has a ref count cycle - and cannot be freed until this cycle is broken. That field - is actually an `Option>` and will be set - to `None` once the future is complete, breaking the cycle and - allowing it to be freed when other references are dropped. - -### The future state machine - -Internally, futures go through various states, depicted here: - - PARKED <----+ - | | - v | - UNPARKED | - | | - v | - EXECUTING --+ - | | ^ - | v | - | EXECUTING_UNPARKED - | - v - COMPLETE - -When they are first created, futures begin as *PARKED*. A *PARKED* -future is one that is waiting for something to happen. It is not -scheduled in the deque of any thread. Even before we return from -`spawn_future()`, however, we will transition into *UNPARKED*. An -*UNPARKED* future is one that is waiting to be executed. It is -enqueued in the deque of some Rayon thread and hence will execute when -the thread gets around to it. - -Once the future begins to execute (it itself is a Rayon job), it -transitions into the *EXECUTING* state. This means that it is busy -calling `F.poll()`, basically. While it calls `poll()`, it also sets -up its `contents.this` field as the current "notify" instance. Hence -if `F` returns `NotReady`, it will clone the `this` field and hold -onto it to signal us the future is ready to execute again. - -For now let's assume that `F` is complete and hence returns either -`Ok(Ready(_))` or `Err(_)`. In that case, the future can transition to -`COMPLETE`. At this point, many bits of state that are no longer -needed (e.g., the future itself, but also the `this` field) -are set to `None` and dropped, and the result is stored in the -`result` field. (Moreover, we may have to signal other tasks, but that -is discussed in a future section.) - -If `F` returns `Ok(Async::NotReady)`, then we would typically -transition to the `PARKED` state and await the call to -`notify()`. When `notify()` is called, it would move the future into -the `UNPARK` state and inject it into the registry. - -However, due to the vagaries of thread-scheduling, it *can* happen -that `notify()` is called before we exit the `EXECUTING` state. For -example, we might invoke `F.poll()`, which sends the `Unpark` instance -to the I/O thread, which detects I/O, and invokes `notify()`, all -before `F.poll()` has returned. In that case, the `notify()` method -will transition the state (atomically, of course) to -`EXECUTING_UNPARKED`. In that case, instead of transitioning to -`PARKED` when `F.poll()` returns, the future will simply transition -right back to `EXECUTING` and try calling `poll()` again. This can -repeat a few times. - -### Lifetime safety - -Of course, Rayon's signature feature is that it allows you to use a -future `F` that includes references, so long as those references -outlive the lifetime of the scope `'scope`. So why is this safe? - -The basic idea of why this is safe is as follows. The `ScopeFuture` -struct holds a ref on the scope itself (via the field `scope`). -Until this ref is decremented, the scope will not end (and hence -`'scope` is still active). This ref is only decremented while the -future transitions into the *COMPLETE* state -- so anytime before -then, we know we don't have to worry, the references are still valid. - -As we transition into the *COMPLETE* state is where things get more -interesting. You'll notice that signaling the `self.scope` job as done -is the *last* thing that happens during that transition. Importantly, -before that is done, we drop all access that we have to the type `F`: -that is, we store `None` into the fields that might reference values -of type `F`. This implies that we know that, whatever happens after we -transition into *COMPLETE*, we can't access any of the references -found in `F` anymore. - -This is good, because there *are* still active refs to the -`ScopeFuture` after we enter the *COMPLETE* state. There are two -sources of these: unpark values and the future result. - -**NotifyHandle values.** We may have given away `NotifyHandle` values -- -these contain trait objects that are actually refs to our -`ScopeFuture`. Note that `NotifyHandle: 'static`, so these could be -floating about for any length of time (we had to transmute away the -lifetimes to give them out). This is ok because (a) the `Arc` keeps -the `ScopeFuture` alive and (b) the only thing you can do is to call -`notify()`, which will promptly return since the state is *COMPLETE* -(and, anyhow, as we saw above, it doesn't have access to any -references anyhow). - -**Future result.** The other, more interesting reference to the -`ScopeFuture` is the value that we gave back to the user when we -spawned the future in the first place. This value is more interesting -because it can be used to do non-trivial things, unlike the -`NotifyHandle`. If you look carefully at this handle, you will see that -its type has been designed to hide the type `F`. In fact, it only -reveals the types `T` and `E` which are the ok/err result types of the -future `F`. This is intentonal: suppose that the type `F` includes -some references, but those references don't appear in the result. We -want the "result" future to be able to escape the scope, then, to any -place where the types `T` and `E` are still in scope. If we exposed -`F` here that would not be possible. (Hiding `F` also requires a -transmute to an object type, in this case an internal trait called -`ScopeFutureTrait`.) Note though that it is possible for `T` and `E` -to have references in them. They could even be references tied to the -scope. - -So what can a user do with this result future? They have two -operations available: poll and cancel. Let's look at cancel first, -since it's simpler. If the state is *COMPLETE*, then `cancel()` is an -immediate no-op, so we know that it can't be used to access any -references that may be invalid. In any case, the only thing it does is -to set a field to true and invoke `notify()`, and we already examined -the possible effects of `notify()` in the previous section. - -So what about `poll()`? This is how the user gets the final result out -of the future. The important thing that it does is to access (and -effectively nullify) the field `result`, which stores the result of -the future and hence may have access to `T` and `E` values. These -values may contain references...so how do we know that they are still in -scope? The answer is that those types are exposed in the user's type -of the future, and hence the basic Rust type system should guarantee -that any references are still valid, or else the user shouldn't be -able to call `poll()`. (The same is true at the time of cancellation, -but that's not important, since `cancel()` doesn't do anything of -interest.) diff --git a/rayon-futures/src/compile_fail/future_escape.rs b/rayon-futures/src/compile_fail/future_escape.rs deleted file mode 100644 index 6cda4226b..000000000 --- a/rayon-futures/src/compile_fail/future_escape.rs +++ /dev/null @@ -1,46 +0,0 @@ -/*! ```compile_fail,E0382,E0501,E0503,E0716 - -extern crate futures; -extern crate rayon_core; -extern crate rayon_futures; - -use futures::future::lazy; -use rayon_futures::ScopeFutureExt; - -fn a() { - let data = &mut [format!("Hello, ")]; - - let mut future = None; - rayon_core::scope(|s| { - let data = &mut *data; - future = Some(s.spawn_future(lazy(move || Ok::<_, ()>(&mut data[0])))); - }); - - // `data` is still borrowed as part of future here: - assert_eq!(data[0], "Hello, world!"); //~ ERROR -} - -fn b() { - let data = &mut [format!("Hello, ")]; - - let mut future = None; - rayon_core::scope(|s| { - future = Some(s.spawn_future(lazy(move || Ok::<_, ()>(&mut data[0])))); - }); - - // `data` is moved into the scope above, can't use here - assert_eq!(data[0], "Hello, world!"); //~ ERROR -} - -fn c() { - let mut future = None; - // borrowed value does not live long enough - let data = &mut [format!("Hello, ")]; //~ ERROR - rayon_core::scope(|s| { - future = Some(s.spawn_future(lazy(move || Ok::<_, ()>(&mut data[0])))); - }); -} - -fn main() { } - -``` */ diff --git a/rayon-futures/src/compile_fail/mod.rs b/rayon-futures/src/compile_fail/mod.rs deleted file mode 100644 index 9e08a7bd2..000000000 --- a/rayon-futures/src/compile_fail/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -// These modules contain `compile_fail` doc tests. -mod future_escape; diff --git a/rayon-futures/src/lib.rs b/rayon-futures/src/lib.rs deleted file mode 100644 index 1f82cde40..000000000 --- a/rayon-futures/src/lib.rs +++ /dev/null @@ -1,624 +0,0 @@ -//! Future support in Rayon. -//! -//! See `README.md` for details. -#![deny(missing_debug_implementations)] -#![doc(html_root_url = "https://docs.rs/rayon-futures/0.1")] - -use futures::future::CatchUnwind; -use futures::task::{self, Spawn, Task}; -use futures::{Async, Future, Poll}; -use rayon_core::internal::worker; // May need `RUSTFLAGS='--cfg rayon_unstable'` to compile - -use futures::executor::{self, Notify, NotifyHandle, UnsafeNotify}; -use rayon_core::internal::task::{ScopeHandle, Task as RayonTask, ToScopeHandle}; -use std::any::Any; -use std::fmt; -use std::marker::PhantomData; -use std::mem; -use std::panic::{self, AssertUnwindSafe}; -use std::ptr; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::*; -use std::sync::Arc; -use std::sync::Mutex; - -const STATE_PARKED: usize = 0; -const STATE_UNPARKED: usize = 1; -const STATE_EXECUTING: usize = 2; -const STATE_EXECUTING_UNPARKED: usize = 3; -const STATE_COMPLETE: usize = 4; - -pub trait ScopeFutureExt<'scope> { - fn spawn_future(&self, future: F) -> RayonFuture - where - F: Future + Send + 'scope; -} - -impl<'scope, T> ScopeFutureExt<'scope> for T -where - T: ToScopeHandle<'scope>, -{ - fn spawn_future(&self, future: F) -> RayonFuture - where - F: Future + Send + 'scope, - { - let inner = ScopeFuture::spawn(future, self.to_scope_handle()); - - // We assert that it is safe to hide the type `F` (and, in - // particular, the lifetimes in it). This is true because the API - // offered by a `RayonFuture` only permits access to the result of - // the future (of type `F::Item` or `F::Error`) and those types - // *are* exposed in the `RayonFuture` type. See - // README.md for details. - unsafe { - return RayonFuture { - inner: hide_lifetime(inner), - }; - } - - unsafe fn hide_lifetime<'l, T, E>( - x: Arc + 'l>, - ) -> Arc> { - mem::transmute(x) - } - } -} - -/// Represents the result of a future that has been spawned in the -/// Rayon threadpool. -/// -/// # Panic behavior -/// -/// Any panics that occur while computing the spawned future will be -/// propagated when this future is polled. -pub struct RayonFuture { - inner: Arc, Box>>, -} - -impl RayonFuture { - pub fn rayon_wait(mut self) -> Result { - worker::if_in_worker_thread(|worker_thread| { - // In Rayon worker thread: spin. Unsafe because we must be - // sure that `self.inner.probe()` will trigger some Rayon - // event once it becomes true -- and it will, as when the - // future moves to the complete state, we will invoke - // either `ScopeHandle::panicked()` or `ScopeHandle::ok()` - // on our scope handle. - unsafe { - worker_thread.wait_until_true(|| self.inner.probe()); - } - self.poll().map(|a_v| match a_v { - Async::Ready(v) => v, - Async::NotReady => panic!("probe() returned true but poll not ready"), - }) - }) - .unwrap_or_else(|| self.wait()) - } -} - -impl Future for RayonFuture { - type Item = T; - type Error = E; - - fn wait(self) -> Result { - worker::if_in_worker_thread(|_| { - panic!("using `wait()` in a Rayon thread is unwise; try `rayon_wait()`") - }); - executor::spawn(self).wait_future() - } - - fn poll(&mut self) -> Poll { - match self.inner.poll() { - Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(v)), - Ok(Async::Ready(Err(e))) => Err(e), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => panic::resume_unwind(e), - } - } -} - -impl Drop for RayonFuture { - fn drop(&mut self) { - self.inner.cancel(); - } -} - -impl fmt::Debug for RayonFuture { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("RayonFuture").finish() - } -} -/// //////////////////////////////////////////////////////////////////////// -#[derive(Debug)] -struct ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - state: AtomicUsize, - contents: Mutex>, -} - -type CU = CatchUnwind>; -type CUItem = as Future>::Item; -type CUError = as Future>::Error; - -struct ScopeFutureContents<'scope, F, S> -where - F: Future + Send, - S: ScopeHandle<'scope>, -{ - spawn: Option>>, - - // Pointer to ourselves. We `None` this out when we are finished - // executing, but it's convenient to keep around normally. - this: Option>, - - // the counter in the scope; since the scope doesn't terminate until - // counter reaches zero, and we hold a ref in this counter, we are - // assured that this pointer remains valid - scope: Option, - - waiting_task: Option, - result: Poll, CUError>, - - canceled: bool, -} - -impl<'scope, F, S> fmt::Debug for ScopeFutureContents<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("ScopeFutureContents").finish() - } -} - -// Newtype so we can implement Into even though the contents are not 'static. -#[derive(Debug)] -struct ArcScopeFuture<'scope, F, S>(Arc>) -where - F: Future + Send, - S: ScopeHandle<'scope>; - -impl<'scope, F, S> Clone for ArcScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn clone(&self) -> Self { - ArcScopeFuture(self.0.clone()) - } -} - -impl<'scope, F, S> Notify for ArcScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn notify(&self, id: usize) { - self.0.notify(id) - } - - fn clone_id(&self, id: usize) -> usize { - self.0.clone_id(id) - } - - fn drop_id(&self, id: usize) { - self.0.drop_id(id) - } -} - -// This is adapted from the implementation of Into for -// Arc in futures-rs, we need to roll our own to drop the 'static bound. -// A ScopeFuture that is inside a ArcScopeFuture. -#[derive(Debug)] -struct ScopeFutureWrapped<'scope, F, S>(PhantomData<(&'scope F, S)>); - -unsafe impl<'scope, F, S> Send for ScopeFutureWrapped<'scope, F, S> {} -unsafe impl<'scope, F, S> Sync for ScopeFutureWrapped<'scope, F, S> {} - -impl<'scope, F, S> Notify for ScopeFutureWrapped<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn notify(&self, id: usize) { - unsafe { - let me: *const ScopeFutureWrapped<'scope, F, S> = self; - ArcScopeFuture::notify( - &*(&me as *const *const ScopeFutureWrapped<'scope, F, S> - as *const ArcScopeFuture<'scope, F, S>), - id, - ) - } - } - - fn clone_id(&self, id: usize) -> usize { - unsafe { - let me: *const ScopeFutureWrapped<'scope, F, S> = self; - ArcScopeFuture::clone_id( - &*(&me as *const *const ScopeFutureWrapped<'scope, F, S> - as *const ArcScopeFuture<'scope, F, S>), - id, - ) - } - } - - fn drop_id(&self, id: usize) { - unsafe { - let me: *const ScopeFutureWrapped<'scope, F, S> = self; - ArcScopeFuture::drop_id( - &*(&me as *const *const ScopeFutureWrapped<'scope, F, S> - as *const ArcScopeFuture<'scope, F, S>), - id, - ) - } - } -} - -unsafe impl<'scope, F, S> UnsafeNotify for ScopeFutureWrapped<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - unsafe fn clone_raw(&self) -> NotifyHandle { - let me: *const ScopeFutureWrapped<'scope, F, S> = self; - let arc = (*(&me as *const *const ScopeFutureWrapped<'scope, F, S> - as *const ArcScopeFuture<'scope, F, S>)) - .clone(); - NotifyHandle::from(arc) - } - - unsafe fn drop_raw(&self) { - let mut me: *const ScopeFutureWrapped<'scope, F, S> = self; - let me = &mut me as *mut *const ScopeFutureWrapped<'scope, F, S> - as *mut ArcScopeFuture<'scope, F, S>; - ptr::drop_in_place(me); - } -} - -impl<'scope, F, S> From> for NotifyHandle -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn from(rc: ArcScopeFuture<'scope, F, S>) -> NotifyHandle { - unsafe { - let ptr = mem::transmute::< - ArcScopeFuture<'scope, F, S>, - *mut ScopeFutureWrapped<'scope, F, S>, - >(rc); - // Hide any lifetimes in `self`. This is safe because, until - // `self` is dropped, the counter is not decremented, and so - // the `'scope` lifetimes cannot end. - // - // Here we assert that hiding the lifetimes in this fashion is - // safe: we claim this is true because the lifetimes we are - // hiding are part of `F`, and we now that any lifetimes in - // `F` outlive `counter`. And we can see from `complete()` - // that we drop all values of type `F` before decrementing - // `counter`. - NotifyHandle::new(mem::transmute(ptr as *mut dyn UnsafeNotify)) - } - } -} - -// Assert that the `*const` is safe to transmit between threads: -unsafe impl<'scope, F, S> Send for ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ -} -unsafe impl<'scope, F, S> Sync for ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ -} - -impl<'scope, F, S> ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn spawn(future: F, scope: S) -> Arc { - // Using `AssertUnwindSafe` is valid here because (a) the data - // is `Send + Sync`, which is our usual boundary and (b) - // panics will be propagated when the `RayonFuture` is polled. - let spawn = task::spawn(AssertUnwindSafe(future).catch_unwind()); - - let future: Arc = Arc::new(ScopeFuture:: { - state: AtomicUsize::new(STATE_PARKED), - contents: Mutex::new(ScopeFutureContents { - spawn: None, - this: None, - scope: Some(scope), - waiting_task: None, - result: Ok(Async::NotReady), - canceled: false, - }), - }); - - // Make the two self-cycles. Note that these imply the future - // cannot be freed until these fields are set to `None` (which - // occurs when it is finished executing). - { - let mut contents = future.contents.try_lock().unwrap(); - contents.spawn = Some(spawn); - contents.this = Some(ArcScopeFuture(future.clone())); - } - - future.notify(0); - - future - } - - fn unpark_inherent(&self) { - loop { - match self.state.load(Relaxed) { - STATE_PARKED => { - if { - self.state - .compare_exchange_weak(STATE_PARKED, STATE_UNPARKED, Release, Relaxed) - .is_ok() - } { - // Contention here is unlikely but possible: a - // previous execution might have moved us to the - // PARKED state but not yet released the lock. - let contents = self.contents.lock().unwrap(); - let task_ref = contents.this.clone().expect("this ref already dropped"); - - // We assert that `contents.scope` will be not - // be dropped until the task is executed. This - // is true because we only drop - // `contents.scope` from within `RayonTask::execute()`. - unsafe { - contents - .scope - .as_ref() - .expect("scope already dropped") - .spawn_task(task_ref.0); - } - return; - } - } - - STATE_EXECUTING => { - if { - self.state - .compare_exchange_weak( - STATE_EXECUTING, - STATE_EXECUTING_UNPARKED, - Release, - Relaxed, - ) - .is_ok() - } { - return; - } - } - - state => { - debug_assert!( - state == STATE_UNPARKED - || state == STATE_EXECUTING_UNPARKED - || state == STATE_COMPLETE - ); - return; - } - } - } - } - - fn begin_execute_state(&self) { - // When we are put into the unparked state, we are enqueued in - // a worker thread. We should then be executed exactly once, - // at which point we transiition to STATE_EXECUTING. Nobody - // should be contending with us to change the state here. - let state = self.state.load(Acquire); - debug_assert_eq!(state, STATE_UNPARKED); - let result = self - .state - .compare_exchange(state, STATE_EXECUTING, Release, Relaxed); - debug_assert_eq!(result, Ok(STATE_UNPARKED)); - } - - fn end_execute_state(&self) -> bool { - loop { - match self.state.load(Relaxed) { - STATE_EXECUTING => { - if { - self.state - .compare_exchange_weak(STATE_EXECUTING, STATE_PARKED, Release, Relaxed) - .is_ok() - } { - // We put ourselves into parked state, no need to - // re-execute. We'll just wait for the Notify. - return true; - } - } - - state => { - debug_assert_eq!(state, STATE_EXECUTING_UNPARKED); - if { - self.state - .compare_exchange_weak(state, STATE_EXECUTING, Release, Relaxed) - .is_ok() - } { - // We finished executing, but an unpark request - // came in the meantime. We need to execute - // again. Return false as we failed to end the - // execution phase. - return false; - } - } - } - } - } -} - -impl<'scope, F, S> Notify for ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn notify(&self, _: usize) { - self.unpark_inherent(); - } -} - -impl<'scope, F, S> RayonTask for ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn execute(this: Arc) { - // *generally speaking* there should be no contention for the - // lock, but it is possible -- we can end execution, get re-enqeueud, - // and re-executed, before we have time to return from this fn - let mut contents = this.contents.lock().unwrap(); - - this.begin_execute_state(); - loop { - if contents.canceled { - return contents.complete(Ok(Async::NotReady)); - } else { - match contents.poll() { - Ok(Async::Ready(v)) => { - return contents.complete(Ok(Async::Ready(v))); - } - Ok(Async::NotReady) => { - if this.end_execute_state() { - return; - } - } - Err(err) => { - return contents.complete(Err(err)); - } - } - } - } - } -} - -impl<'scope, F, S> ScopeFutureContents<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn poll(&mut self) -> Poll, CUError> { - let notify = self.this.as_ref().unwrap(); - self.spawn.as_mut().unwrap().poll_future_notify(notify, 0) - } - - fn complete(&mut self, value: Poll, CUError>) { - // So, this is subtle. We know that the type `F` may have some - // data which is only valid until the end of the scope, and we - // also know that the scope doesn't end until `self.counter` - // is decremented below. So we want to be sure to drop - // `self.future` first, lest its dtor try to access some of - // that state or something! - mem::drop(self.spawn.take().unwrap()); - - self.result = value; - let this = self.this.take().unwrap(); - if cfg!(debug_assertions) { - let state = this.0.state.load(Relaxed); - debug_assert!( - state == STATE_EXECUTING || state == STATE_EXECUTING_UNPARKED, - "cannot complete when not executing (state = {})", - state - ); - } - this.0.state.store(STATE_COMPLETE, Release); - - // `notify()` here is arbitrary user-code, so it may well - // panic. We try to capture that panic and forward it - // somewhere useful if we can. - let mut err = None; - if let Some(waiting_task) = self.waiting_task.take() { - match panic::catch_unwind(AssertUnwindSafe(|| waiting_task.notify())) { - Ok(()) => {} - Err(e) => { - err = Some(e); - } - } - } - - // Allow the enclosing scope to end. Asserts that - // `self.counter` is still valid, which we know because caller - // to `new_rayon_future()` ensures it for us. - let scope = self.scope.take().unwrap(); - if let Some(err) = err { - scope.panicked(err); - } else { - scope.ok(); - } - } -} - -trait ScopeFutureTrait: Send + Sync { - /// Returns true when future is in the COMPLETE state. - fn probe(&self) -> bool; - - /// Execute the `poll` operation of a future: read the result if - /// it is ready, return `Async::NotReady` otherwise. - fn poll(&self) -> Poll; - - /// Indicate that we no longer care about the result of the future. - /// Corresponds to `Drop` in the future trait. - fn cancel(&self); -} - -impl<'scope, F, S> ScopeFutureTrait, CUError> for ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ - fn probe(&self) -> bool { - self.state.load(Acquire) == STATE_COMPLETE - } - - fn poll(&self) -> Poll, CUError> { - // Important: due to transmute hackery, not all the fields are - // truly known to be valid at this point. In particular, the - // type F is erased. But the `state` and `result` fields - // should be valid. - let mut contents = self.contents.lock().unwrap(); - let state = self.state.load(Relaxed); - if state == STATE_COMPLETE { - let r = mem::replace(&mut contents.result, Ok(Async::NotReady)); - return r; - } else { - contents.waiting_task = Some(task::current()); - Ok(Async::NotReady) - } - } - - fn cancel(&self) { - // Fast-path: check if this is already complete and return if - // so. A relaxed load suffices since we are not going to - // access any data as a result of this action. - if self.state.load(Relaxed) == STATE_COMPLETE { - return; - } - - // Slow-path. Get the lock and set the canceled flag to - // true. Also grab the `this` instance (which may be `None`, - // if the future completes before we get the lack). - let mut contents = self.contents.lock().unwrap(); - contents.canceled = true; - - // If the `this` we grabbed was not `None`, then notify it. - // This will schedule the future. - if let Some(ref u) = contents.this { - u.notify(0); - } - } -} - -mod compile_fail; -mod test; diff --git a/rayon-futures/src/test.rs b/rayon-futures/src/test.rs deleted file mode 100644 index 32a0011e9..000000000 --- a/rayon-futures/src/test.rs +++ /dev/null @@ -1,288 +0,0 @@ -#![cfg(test)] - -use super::ScopeFutureExt; -use futures::executor::Notify; -use futures::future::lazy; -use futures::sync::oneshot; -use futures::task; -use futures::{self, Async, Future}; -use rayon_core::{scope, ThreadPool, ThreadPoolBuilder}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; - -/// Basic test of using futures to data on the stack frame. -#[test] -fn future_test() { - let data = &[0, 1]; - - // Here we call `wait` on a select future, which will block at - // least one thread. So we need a second thread to ensure no - // deadlock. - ThreadPoolBuilder::new() - .num_threads(2) - .build() - .unwrap() - .install(|| { - scope(|s| { - let a = s.spawn_future(futures::future::ok::<_, ()>(&data[0])); - let b = s.spawn_future(futures::future::ok::<_, ()>(&data[1])); - let (item1, next) = a.select(b).wait().ok().unwrap(); - let item2 = next.wait().unwrap(); - assert!(*item1 == 0 || *item1 == 1); - assert!(*item2 == 1 - *item1); - }); - }); -} - -/// Test using `map` on a Rayon future. The `map` closure is eecuted -/// for side-effects, and modifies the `data` variable that is owned -/// by enclosing stack frame. -#[test] -fn future_map() { - let data = &mut [format!("Hello, ")]; - - let mut future = None; - scope(|s| { - let a = s.spawn_future(lazy(|| Ok::<_, ()>(&mut data[0]))); - future = Some(s.spawn_future(a.map(|v| { - v.push_str("world!"); - }))); - }); - - // future must have executed for the scope to have ended, even - // though we never invoked `wait` to observe its result - assert_eq!(data[0], "Hello, world!"); - assert!(future.is_some()); -} - -/// Test that we can create a future that returns an `&mut` to data, -/// so long as it outlives the scope. -#[test] -fn future_escape_ref() { - let data = &mut [format!("Hello, ")]; - - { - let mut future = None; - scope(|s| { - let data = &mut *data; - future = Some(s.spawn_future(lazy(move || Ok::<_, ()>(&mut data[0])))); - }); - let s = future.unwrap().wait().unwrap(); - s.push_str("world!"); - } - - assert_eq!(data[0], "Hello, world!"); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn future_panic_prop() { - scope(|s| { - let future = s.spawn_future(lazy(move || Ok::<(), ()>(argh()))); - let _ = future.rayon_wait(); // should panic, not return a value - }); - - fn argh() -> () { - if true { - panic!("Hello, world!"); - } - } -} - -/// Test that, even if we have only one thread, invoke `rayon_wait` -/// will not panic. -#[test] -fn future_rayon_wait_1_thread() { - // run with only 1 worker thread; this would deadlock if we couldn't make progress - let mut result = None; - ThreadPoolBuilder::new() - .num_threads(1) - .build() - .unwrap() - .install(|| { - scope(|s| { - use std::sync::mpsc::channel; - let (tx, rx) = channel(); - let a = s.spawn_future(lazy(move || Ok::(rx.recv().unwrap()))); - // ^^^^ FIXME: why is this needed? - let b = s.spawn_future(a.map(|v| v + 1)); - let c = s.spawn_future(b.map(|v| v + 1)); - s.spawn(move |_| tx.send(20).unwrap()); - result = Some(c.rayon_wait().unwrap()); - }); - }); - assert_eq!(result, Some(22)); -} - -/// Test that invoking `wait` on a `RayonFuture` will panic, if it is inside -/// a Rayon worker thread. -#[test] -#[should_panic] -fn future_wait_panics_inside_rayon_thread() { - scope(|s| { - let future = s.spawn_future(lazy(move || Ok::<(), ()>(()))); - let _ = future.wait(); // should panic, not return a value - }); -} - -/// Test that invoking `wait` on a `RayonFuture` will not panic if we -/// are outside a Rayon worker thread. -#[test] -fn future_wait_works_outside_rayon_threads() { - let mut future = None; - scope(|s| { - future = Some(s.spawn_future(lazy(move || Ok::<(), ()>(())))); - }); - assert_eq!(Ok(()), future.unwrap().wait()); -} - -/// Test that invoking `wait` on a `RayonFuture` will not panic if we -/// are outside a Rayon worker thread. -#[test] -#[should_panic(expected = "Hello, world!")] -fn panicy_unpark() { - scope(|s| { - let (a_tx, a_rx) = oneshot::channel::(); - let rf = s.spawn_future(a_rx); - - // invoke `poll_future` with a `PanicUnpark` instance; - // this should get installed as a 'waiting task' on the - // Rayon future `rf` - let mut spawn = task::spawn(rf); - match spawn.poll_future_notify(&PANIC_UNPARK, 0) { - Ok(Async::NotReady) => { - // good, we expect not to be ready yet - } - r => panic!("spawn poll returned: {:?}", r), - } - - // this should trigger the future `a_rx` to be awoken - // and executing in a Rayon background thread - a_tx.send(22).unwrap(); - - // now we wait for `rf` to complete; when it does, it will - // also signal the `PanicUnpark` to wake up (that is - // *supposed* to be what triggers us to `poll` again, but - // we are sidestepping that) - let v = spawn.into_inner().rayon_wait().unwrap(); - assert_eq!(v, 22); - }); - panic!("scope failed to panic!"); - - #[derive(Clone)] - struct PanicUnpark; - - impl Notify for PanicUnpark { - fn notify(&self, _: usize) { - panic!("Hello, world!"); - } - } - - const PANIC_UNPARK: &'static PanicUnpark = &PanicUnpark; -} - -#[test] -fn double_unpark() { - let unpark0 = Arc::new(TrackUnpark { - value: AtomicUsize::new(0), - }); - let unpark1 = Arc::new(TrackUnpark { - value: AtomicUsize::new(0), - }); - let mut _tag = None; - scope(|s| { - let (a_tx, a_rx) = oneshot::channel::(); - let rf = s.spawn_future(a_rx); - - let mut spawn = task::spawn(rf); - - // test that we don't panic if people try to install a task many times; - // even if they are different tasks - for i in 0..22 { - let u = if i % 2 == 0 { - unpark0.clone() - } else { - unpark1.clone() - }; - match spawn.poll_future_notify(&u, 0) { - Ok(Async::NotReady) => { - // good, we expect not to be ready yet - } - r => panic!("spawn poll returned: {:?}", r), - } - } - - a_tx.send(22).unwrap(); - - // just hold onto `rf` to ensure that nothing is cancelled - _tag = Some(spawn.into_inner()); - }); - - // Since scope is done, our spawned future must have completed. It - // should have signalled the unpark value we gave it -- but - // exactly once, even though we called `poll` many times. - assert_eq!(unpark1.value.load(Ordering::SeqCst), 1); - - // unpark0 was not the last unpark supplied, so it will never be signalled - assert_eq!(unpark0.value.load(Ordering::SeqCst), 0); - - struct TrackUnpark { - value: AtomicUsize, - } - - impl Notify for TrackUnpark { - fn notify(&self, _: usize) { - self.value.fetch_add(1, Ordering::SeqCst); - } - } -} - -#[test] -fn async_future_map() { - let data = Arc::new(Mutex::new(format!("Hello, "))); - - let pool = ThreadPool::global(); - let a = pool.spawn_future(lazy({ - let data = data.clone(); - move || Ok::<_, ()>(data) - })); - let future = pool.spawn_future(a.map(|data| { - let mut v = data.lock().unwrap(); - v.push_str("world!"); - })); - let () = future.wait().unwrap(); - - // future must have executed for the scope to have ended, even - // though we never invoked `wait` to observe its result - assert_eq!(&data.lock().unwrap()[..], "Hello, world!"); -} - -#[test] -#[should_panic(expected = "Hello, world!")] -fn async_future_panic_prop() { - let pool = ThreadPool::global(); - let future = pool.spawn_future(lazy(move || Ok::<(), ()>(argh()))); - let _ = future.rayon_wait(); // should panic, not return a value - - fn argh() -> () { - if true { - panic!("Hello, world!"); - } - } -} - -#[test] -fn async_future_scope_interact() { - let pool = ThreadPool::global(); - let future = pool.spawn_future(lazy(move || Ok::(22))); - - let mut vec = vec![]; - scope(|s| { - let future = s.spawn_future(future.map(|x| x * 2)); - s.spawn(|_| { - vec.push(future.rayon_wait().unwrap()); - }); // just because - }); - - assert_eq!(vec![44], vec); -} diff --git a/rayon-futures/tests/compile-fail/README.md b/rayon-futures/tests/compile-fail/README.md deleted file mode 100644 index 58a148514..000000000 --- a/rayon-futures/tests/compile-fail/README.md +++ /dev/null @@ -1,11 +0,0 @@ -This directory contains test files that are **not expected to -compile**. It is useful for writing tests that things which ought not -to type-check do not, in fact, type-check. - -To write a test, just write a `.rs` file using Rayon. Then, for each -compilation error, write a `//~ ERROR E123` annotation on the line -where the error occurs. `E123` should be the error code that is issued -by rustc. This should be reasonably robust against future compiler -changes, though in some cases the errors may start showing up on -different lines etc as compiler heuristics change. -