Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse domains rather than spawning them for each parallel test #457

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
42 changes: 27 additions & 15 deletions lib/STM_domain.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ module Make (Spec: Spec) = struct
let res_arr = Array.map (fun c -> Domain.cpu_relax(); Spec.run c sut) cs_arr in
List.combine cs (Array.to_list res_arr)

let agree_prop_par (seq_pref,cmds1,cmds2) =
let agree_prop_par ~pool (seq_pref,cmds1,cmds2) =
let sut = Spec.init_sut () in
let pref_obs = interp_sut_res sut seq_pref in
let wait = Atomic.make true in
let dom1 = Domain.spawn (fun () -> while Atomic.get wait do Domain.cpu_relax() done; try Ok (interp_sut_res sut cmds1) with exn -> Error exn) in
let dom2 = Domain.spawn (fun () -> Atomic.set wait false; try Ok (interp_sut_res sut cmds2) with exn -> Error exn) in
let obs1 = Domain.join dom1 in
let obs2 = Domain.join dom2 in
let prom1 = Domain_pair.async_d1 pool (fun () -> while Atomic.get wait do Domain.cpu_relax () done; try Ok (interp_sut_res sut cmds1) with exn -> Error exn) in
let prom2 = Domain_pair.async_d2 pool (fun () -> Atomic.set wait false; try Ok (interp_sut_res sut cmds2) with exn -> Error exn) in
let obs1 = Domain_pair.await prom1 in
let obs2 = Domain_pair.await prom2 in
let () = Spec.cleanup sut in
let obs1 = match obs1 with Ok v -> v | Error exn -> raise exn in
let obs2 = match obs2 with Ok v -> v | Error exn -> raise exn in
Expand All @@ -39,20 +39,20 @@ module Make (Spec: Spec) = struct
(fun (c,r) -> Printf.sprintf "%s : %s" (Spec.show_cmd c) (show_res r))
(pref_obs,obs1,obs2)

let agree_prop_par_asym (seq_pref, cmds1, cmds2) =
let agree_prop_par_asym ~pool (seq_pref, cmds1, cmds2) =
let sut = Spec.init_sut () in
let pref_obs = interp_sut_res sut seq_pref in
let wait = Atomic.make 2 in
let child_dom =
Domain.spawn (fun () ->
let prom =
Domain_pair.async_d1 pool (fun () ->
Atomic.decr wait;
while Atomic.get wait <> 0 do Domain.cpu_relax() done;
while Atomic.get wait <> 0 do () done;
try Ok (interp_sut_res sut cmds2) with exn -> Error exn)
in
Atomic.decr wait;
while Atomic.get wait <> 0 do Domain.cpu_relax() done;
while Atomic.get wait <> 0 do () done;
let parent_obs = try Ok (interp_sut_res sut cmds1) with exn -> Error exn in
let child_obs = Domain.join child_dom in
let child_obs = Domain_pair.await prom in
let () = Spec.cleanup sut in
let parent_obs = match parent_obs with Ok v -> v | Error exn -> raise exn in
let child_obs = match child_obs with Ok v -> v | Error exn -> raise exn in
Expand All @@ -70,7 +70,10 @@ module Make (Spec: Spec) = struct
(arb_cmds_triple seq_len par_len)
(fun triple ->
assume (all_interleavings_ok triple);
repeat rep_count agree_prop_par triple) (* 25 times each, then 25 * 10 times when shrinking *)
let pool = Util.Domain_pair.init () in
Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> repeat rep_count (agree_prop_par ~pool) triple)) (* 25 times each, then 25 * 10 times when shrinking *)

let neg_agree_test_par ~count ~name =
let rep_count = 25 in
Expand All @@ -80,7 +83,10 @@ module Make (Spec: Spec) = struct
(arb_cmds_triple seq_len par_len)
(fun triple ->
assume (all_interleavings_ok triple);
repeat rep_count agree_prop_par triple) (* 25 times each, then 25 * 10 times when shrinking *)
let pool = Util.Domain_pair.init () in
Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> repeat rep_count (agree_prop_par ~pool) triple)) (* 25 times each, then 25 * 10 times when shrinking *)

let agree_test_par_asym ~count ~name =
let rep_count = 25 in
Expand All @@ -90,7 +96,10 @@ module Make (Spec: Spec) = struct
(arb_cmds_triple seq_len par_len)
(fun triple ->
assume (all_interleavings_ok triple);
repeat rep_count agree_prop_par_asym triple) (* 25 times each, then 25 * 10 times when shrinking *)
let pool = Util.Domain_pair.init () in
Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> repeat rep_count (agree_prop_par_asym ~pool) triple)) (* 25 times each, then 25 * 10 times when shrinking *)

let neg_agree_test_par_asym ~count ~name =
let rep_count = 25 in
Expand All @@ -100,5 +109,8 @@ module Make (Spec: Spec) = struct
(arb_cmds_triple seq_len par_len)
(fun triple ->
assume (all_interleavings_ok triple);
repeat rep_count agree_prop_par_asym triple) (* 25 times each, then 25 * 10 times when shrinking *)
let pool = Util.Domain_pair.init () in
Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> repeat rep_count (agree_prop_par_asym ~pool) triple)) (* 25 times each, then 25 * 10 times when shrinking *)
end
4 changes: 2 additions & 2 deletions lib/STM_domain.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ module Make : functor (Spec : STM.Spec) ->
(** [interp_sut_res sut cs] interprets the commands [cs] over the system {!Spec.sut}
and returns the list of corresponding {!Spec.cmd} and result pairs. *)

val agree_prop_par : Spec.cmd list * Spec.cmd list * Spec.cmd list -> bool
val agree_prop_par : pool:Util.Domain_pair.t -> Spec.cmd list * Spec.cmd list * Spec.cmd list -> bool
(** Parallel agreement property based on {!Stdlib.Domain}.
[agree_prop_par (seq_pref, tl1, tl2)] first interprets [seq_pref]
and then spawns two parallel, symmetric domains interpreting [tl1] and
Expand All @@ -49,7 +49,7 @@ module Make : functor (Spec : STM.Spec) ->
@return [true] if there exists a sequential interleaving of the results
which agrees with a model interpretation. *)

val agree_prop_par_asym : Spec.cmd list * Spec.cmd list * Spec.cmd list -> bool
val agree_prop_par_asym : pool:Util.Domain_pair.t -> Spec.cmd list * Spec.cmd list * Spec.cmd list -> bool
(** Asymmetric parallel agreement property based on {!Stdlib.Domain}.
[agree_prop_par_asym (seq_pref, tl1, tl2)] first interprets [seq_pref],
and then interprets [tl1] while a spawned domain interprets [tl2]
Expand Down
51 changes: 38 additions & 13 deletions lib/lin_domain.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
open Lin
open Util

module Make_internal (Spec : Internal.CmdSpec [@alert "-internal"]) = struct
module M = Internal.Make(Spec) [@alert "-internal"]
Expand All @@ -7,25 +8,25 @@ module Make_internal (Spec : Internal.CmdSpec [@alert "-internal"]) = struct
(* operate over arrays to avoid needless allocation underway *)
let interp sut cs =
let cs_arr = Array.of_list cs in
let res_arr = Array.map (fun c -> Domain.cpu_relax(); Spec.run c sut) cs_arr in
let res_arr = Array.map (fun c -> Spec.run c sut) cs_arr in
List.combine cs (Array.to_list res_arr)

let run_parallel (seq_pref,cmds1,cmds2) =
let run_parallel ~pool (seq_pref,cmds1,cmds2) =
let sut = Spec.init () in
let pref_obs = interp sut seq_pref in
let wait = Atomic.make true in
let dom1 = Domain.spawn (fun () -> while Atomic.get wait do Domain.cpu_relax() done; try Ok (interp sut cmds1) with exn -> Error exn) in
let dom2 = Domain.spawn (fun () -> Atomic.set wait false; try Ok (interp sut cmds2) with exn -> Error exn) in
let obs1 = Domain.join dom1 in
let obs2 = Domain.join dom2 in
let prom1 = Domain_pair.async_d1 pool (fun () -> while Atomic.get wait do Domain.cpu_relax () done; try Ok (interp sut cmds1) with exn -> Error exn) in
let prom2 = Domain_pair.async_d2 pool (fun () -> Atomic.set wait false; try Ok (interp sut cmds2) with exn -> Error exn) in
let obs1 = Domain_pair.await prom1 in
let obs2 = Domain_pair.await prom2 in
Spec.cleanup sut ;
let obs1 = match obs1 with Ok v -> v | Error exn -> raise exn in
let obs2 = match obs2 with Ok v -> v | Error exn -> raise exn in
(pref_obs,obs1,obs2)

(* Linearization property based on [Domain] and an Atomic flag *)
let lin_prop (seq_pref,cmds1,cmds2) =
let pref_obs,obs1,obs2 = run_parallel (seq_pref,cmds1,cmds2) in
let lin_prop ~pool (seq_pref,cmds1,cmds2) =
let pref_obs,obs1,obs2 = run_parallel ~pool (seq_pref,cmds1,cmds2) in
let seq_sut = Spec.init () in
check_seq_cons pref_obs obs1 obs2 seq_sut []
|| QCheck.Test.fail_reportf " Results incompatible with sequential execution\n\n%s"
Expand All @@ -34,18 +35,42 @@ module Make_internal (Spec : Internal.CmdSpec [@alert "-internal"]) = struct
(pref_obs,obs1,obs2)

(* "Don't crash under parallel usage" property *)
let stress_prop (seq_pref,cmds1,cmds2) =
let _ = run_parallel (seq_pref,cmds1,cmds2) in
let stress_prop ~pool (seq_pref,cmds1,cmds2) =
let _ = run_parallel ~pool (seq_pref,cmds1,cmds2) in
true

let lin_test ~count ~name =
M.lin_test ~rep_count:50 ~count ~retries:3 ~name ~lin_prop:lin_prop
let rep_count, retries = 50,3 in
let arb_cmd_triple = arb_cmds_triple 20 12 in
QCheck.Test.make ~count ~retries ~name
arb_cmd_triple
(fun triple ->
let pool = Util.Domain_pair.init () in
Stdlib.Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> repeat rep_count (lin_prop ~pool) triple))

let neg_lin_test ~count ~name =
neg_lin_test ~rep_count:50 ~count ~retries:3 ~name ~lin_prop:lin_prop
let rep_count, retries = 50,3 in
let arb_cmd_triple = arb_cmds_triple 20 12 in
QCheck.Test.make_neg ~count ~retries ~name
arb_cmd_triple
(fun triple ->
let pool = Util.Domain_pair.init () in
Stdlib.Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> repeat rep_count (lin_prop ~pool) triple))

let stress_test ~count ~name =
M.lin_test ~rep_count:25 ~count ~retries:5 ~name ~lin_prop:stress_prop
let rep_count, retries = 25,5 in
let arb_cmd_triple = arb_cmds_triple 20 12 in
QCheck.Test.make ~count ~retries ~name
arb_cmd_triple
(fun triple ->
let pool = Util.Domain_pair.init () in
Stdlib.Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> repeat rep_count (stress_prop ~pool) triple))
end

module Make (Spec : Spec) = Make_internal(MakeCmd(Spec))
14 changes: 11 additions & 3 deletions lib/lin_domain.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@ open Lin

(** functor to build an internal module representing parallel tests *)
module Make_internal (Spec : Internal.CmdSpec [@alert "-internal"]) : sig
val arb_cmds_triple : int -> int -> (Spec.cmd list * Spec.cmd list * Spec.cmd list) QCheck.arbitrary
val lin_prop : (Spec.cmd list * Spec.cmd list * Spec.cmd list) -> bool
val stress_prop : (Spec.cmd list * Spec.cmd list * Spec.cmd list) -> bool
val arb_cmds_triple
: int
-> int
-> (Spec.cmd list * Spec.cmd list * Spec.cmd list) QCheck.arbitrary

val lin_prop : pool:Util.Domain_pair.t -> (Spec.cmd list * Spec.cmd list * Spec.cmd list) -> bool

val stress_prop : pool:Util.Domain_pair.t -> (Spec.cmd list * Spec.cmd list * Spec.cmd list) -> bool

val lin_test : count:int -> name:string -> QCheck.Test.t

val neg_lin_test : count:int -> name:string -> QCheck.Test.t

val stress_test : count:int -> name:string -> QCheck.Test.t
end
[@@alert internal "This module is exposed for internal uses only, its API may change at any time"]
Expand Down
104 changes: 104 additions & 0 deletions lib/util.ml
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,107 @@ module Equal = struct
| _ -> false
let equal_array eq x y = equal_seq eq (Array.to_seq x) (Array.to_seq y)
end

module Domain_pair = struct
type 'a promise =
{ mutable result : 'a option;
mutex : Mutex.t;
fulfilled : Condition.t;
}

type command = Task : (unit -> 'a) * 'a promise -> command

type task_mvar =
{ mutable task : command option;
task_mutex : Mutex.t;
new_task : Condition.t;
promise_mutex : Mutex.t;
promise_fulfilled : Condition.t;
}

type t =
{ task1 : task_mvar;
task2 : task_mvar;
d1 : unit Domain.t;
d2 : unit Domain.t;
done_ : bool Atomic.t;
}

let async runner f =
let promise =
{ result = None; mutex = runner.promise_mutex; fulfilled = runner.promise_fulfilled }
in
Mutex.lock runner.task_mutex;
runner.task <- (Some (Task (f, promise)));
Condition.signal runner.new_task;
Mutex.unlock runner.task_mutex;
promise

let async_d1 pair f =
async pair.task1 f

let async_d2 pair f =
async pair.task2 f

let await promise =
Mutex.lock promise.mutex;
while Option.is_none promise.result do
Condition.wait promise.fulfilled promise.mutex
done;
let result = Option.get promise.result in
promise.result <- None;
Mutex.unlock promise.mutex;
result

let domain_fun done_ runner =
while not (Atomic.get done_) do
Mutex.lock runner.task_mutex;
while Option.is_none runner.task && not (Atomic.get done_) do
Condition.wait runner.new_task runner.task_mutex
done;
if not (Atomic.get done_) then (
let Task (f, promise) = Option.get runner.task in
runner.task <- None;
Mutex.unlock runner.task_mutex;
Mutex.lock promise.mutex;
promise.result <- Some (f ());
Condition.signal promise.fulfilled;
Mutex.unlock promise.mutex;
)
done

let init () =
let done_ = Atomic.make false in
let task1 =
{ task = None;
task_mutex = Mutex.create ();
new_task = Condition.create ();
promise_mutex = Mutex.create ();
promise_fulfilled = Condition.create ();
}
and task2 =
{ task = None;
task_mutex = Mutex.create ();
new_task = Condition.create ();
promise_mutex = Mutex.create ();
promise_fulfilled = Condition.create ();
}
in
{ task1;
task2;
done_;
d1 = Domain.spawn (fun () -> domain_fun done_ task1);
d2 = Domain.spawn (fun () -> domain_fun done_ task2);
}

let takedown pair =
Atomic.set pair.done_ true;
Mutex.lock pair.task1.task_mutex;
Condition.signal pair.task1.new_task;
Mutex.unlock pair.task1.task_mutex;
Mutex.lock pair.task2.task_mutex;
Condition.signal pair.task2.new_task;
Mutex.unlock pair.task2.task_mutex;
Domain.join pair.d1;
Domain.join pair.d2
end
10 changes: 10 additions & 0 deletions lib/util.mli
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,13 @@ module Equal : sig
val equal_seq : 'a t -> 'a Seq.t t
val equal_array : 'a t -> 'a array t
end

module Domain_pair : sig
type t
type 'a promise
val async_d1 : t -> (unit -> 'a) -> 'a promise
val async_d2 : t -> (unit -> 'a) -> 'a promise
val await : 'a promise -> 'a
val init : unit -> t
val takedown : t -> unit
end
5 changes: 4 additions & 1 deletion src/domain/stm_tests_dls.ml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ let neg_agree_test_par ~count ~name =
Test.make_neg ~retries:10 ~count ~name
(DLS_STM_dom.arb_cmds_triple seq_len par_len)
(fun triple ->
let pool = Util.Domain_pair.init () in
assume (DLS_STM_dom.all_interleavings_ok triple);
agree_prop_par triple) (* just repeat 1 * 10 times when shrinking *)
Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> agree_prop_par ~pool triple)) (* just repeat 1 * 10 times when shrinking *)
;;
QCheck_base_runner.run_tests_main [
agree_test ~count:1000 ~name:"STM Domain.DLS test sequential";
Expand Down
8 changes: 5 additions & 3 deletions test/cleanup_lin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ Test.check_exn
Test.make ~count:1000 ~name:("exactly one-cleanup test")
(RT.arb_cmds_triple seq_len par_len)
(fun input ->
let pool = Util.Domain_pair.init () in
try
ignore (RT.lin_prop input);
ignore (RT.lin_prop ~pool input);
Util.Domain_pair.takedown pool;
Atomic.get cleanup_counter = 0
with
| RConf.Already_cleaned -> failwith "Already cleaned"
| _ -> Atomic.get cleanup_counter = 0
| RConf.Already_cleaned -> (Util.Domain_pair.takedown pool; failwith "Already cleaned")
| _ -> (Util.Domain_pair.takedown pool; Atomic.get cleanup_counter = 0)
))
7 changes: 6 additions & 1 deletion test/cleanup_stm.ml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ for _i=1 to 100 do
try
Test.check_exn ~rand
(Test.make ~count:1000 ~name:"STM ensure cleanup test parallel"
(RT_dom.arb_cmds_triple 20 12) RT_dom.agree_prop_par) (* without retries *)
(RT_dom.arb_cmds_triple 20 12)
(fun triple ->
let pool = Util.Domain_pair.init () in
Fun.protect
~finally:(fun () -> Util.Domain_pair.takedown pool)
(fun () -> RT_dom.agree_prop_par ~pool triple))) (* without retries *)
with _e -> incr i; assert (!status = Some Cleaned);
done;
assert (!i = 100);
Expand Down
Loading