diff --git a/src/app/archive/add_unique_constraints.sql b/src/app/archive/add_unique_constraints.sql new file mode 100644 index 00000000000..96edc1ef184 --- /dev/null +++ b/src/app/archive/add_unique_constraints.sql @@ -0,0 +1,2 @@ +ALTER TABLE zkapp_field_array ADD CONSTRAINT zkapp_field_array_element_ids_key UNIQUE (element_ids); +ALTER TABLE zkapp_events ADD CONSTRAINT zkapp_events_element_ids_key UNIQUE (element_ids); \ No newline at end of file diff --git a/src/app/archive/lib/dune b/src/app/archive/lib/dune index 385408aca97..b5897d8c6fa 100644 --- a/src/app/archive/lib/dune +++ b/src/app/archive/lib/dune @@ -74,6 +74,7 @@ error_json ppx_deriving_yojson.runtime ppx_version.runtime + o1trace ) (inline_tests (flags -verbose -show-counts)) (modes native) diff --git a/src/app/archive/lib/processor.ml b/src/app/archive/lib/processor.ml index cbbef5942ae..eb7744c84cc 100644 --- a/src/app/archive/lib/processor.ml +++ b/src/app/archive/lib/processor.ml @@ -1417,20 +1417,83 @@ module Zkapp_events = struct let table_name = "zkapp_events" + module Field_array_map = Map.Make (struct + type t = int array [@@deriving sexp] + + let compare = Array.compare Int.compare + end) + + (* Account_update.Body.Events'.t is defined as `field array list`, + which is ismorphic to a list of list of fields. + + We are batching the insertion of field and field_array to optimize + the speed of archiving max-cost zkapps. + + 1. we flatten the list of list of fields to get all the field elements + 2. insert all the field elements in one query + 3. construct a map "M" from `field_id` to `field` by querying against the zkapp_field table + 4. use "M" and the list of list of fields to compute the list of list of field_ids + 5. insert all list of `list of field_ids` in one query + 6. construct a map "M'" from `field_array_id` to `field_id array` by querying against + the zkapp_field_array table + 7. use "M'" and the list of list of field_ids to compute the list of field_array_ids + 8. insert the list of field_arrays + *) let add_if_doesn't_exist (module Conn : CONNECTION) (events : Account_update.Body.Events'.t) = let open Deferred.Result.Let_syntax in - let%bind (element_ids : int array) = - Mina_caqti.deferred_result_list_map events - ~f:(Zkapp_field_array.add_if_doesn't_exist (module Conn)) - >>| Array.of_list + let%bind field_array_id_list = + if not @@ List.is_empty events then + let field_list_list = + List.map events ~f:(fun field_array -> + Array.map field_array ~f:Pickles.Backend.Tick.Field.to_string + |> Array.to_list ) + in + let fields = field_list_list |> List.concat in + let%bind field_id_list_list = + if not @@ List.is_empty fields then + let%map field_map = + Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field" + ~col:("field", Caqti_type.string) + (module Conn) + fields + >>| String.Map.of_alist_exn + in + let field_id_list_list = + List.map field_list_list ~f:(List.map ~f:(Map.find_exn field_map)) + in + field_id_list_list + else + (* if there's no fields, then we must have some list of empty lists *) + return @@ List.map field_list_list ~f:(fun _ -> []) + in + (* this conversion should be done by caqti using `typ`, FIX this in the future *) + let field_array_list = + List.map field_id_list_list ~f:(fun id_list -> + List.map id_list ~f:Int.to_string + |> String.concat ~sep:", " |> sprintf "{%s}" ) + in + let%map field_array_map = + Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field_array" + ~col:("element_ids", Mina_caqti.array_int_typ) + (module Conn) + field_array_list + >>| Field_array_map.of_alist_exn + in + let field_array_id_list = + List.map field_id_list_list ~f:(fun field_id_list -> + Map.find_exn field_array_map (Array.of_list field_id_list) ) + |> Array.of_list + in + field_array_id_list + else return @@ Array.of_list [] in Mina_caqti.select_insert_into_cols ~select:("id", Caqti_type.int) ~table_name ~cols:([ "element_ids" ], Mina_caqti.array_int_typ) ~tannot:(function "element_ids" -> Some "int[]" | _ -> None) (module Conn) - element_ids + field_array_id_list let load (module Conn : CONNECTION) id = Conn.find @@ -1491,14 +1554,17 @@ module Zkapp_account_update_body = struct Account_identifiers.add_if_doesn't_exist (module Conn) account_identifier in let%bind update_id = - Zkapp_updates.add_if_doesn't_exist (module Conn) body.update + Metrics.time ~label:"zkapp_updates.add" + @@ fun () -> Zkapp_updates.add_if_doesn't_exist (module Conn) body.update in let increment_nonce = body.increment_nonce in let%bind events_id = - Zkapp_events.add_if_doesn't_exist (module Conn) body.events + Metrics.time ~label:"Zkapp_events.add" + @@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.events in let%bind actions_id = - Zkapp_events.add_if_doesn't_exist (module Conn) body.actions + Metrics.time ~label:"Zkapp_actions.add" + @@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.actions in let%bind call_data_id = Zkapp_field.add_if_doesn't_exist (module Conn) body.call_data @@ -1600,6 +1666,8 @@ module Zkapp_account_update = struct (account_update : Account_update.Simple.t) = let open Deferred.Result.Let_syntax in let%bind body_id = + Metrics.time ~label:"Zkapp_account_update_body.add" + @@ fun () -> Zkapp_account_update_body.add_if_doesn't_exist (module Conn) account_update.body @@ -1897,11 +1965,15 @@ module User_command = struct let open Deferred.Result.Let_syntax in let zkapp_command = Zkapp_command.to_simple ps in let%bind zkapp_fee_payer_body_id = + Metrics.time ~label:"Zkapp_fee_payer_body.add" + @@ fun () -> Zkapp_fee_payer_body.add_if_doesn't_exist (module Conn) zkapp_command.fee_payer.body in let%bind zkapp_account_updates_ids = + Metrics.time ~label:"Zkapp_account_update.add" + @@ fun () -> Mina_caqti.deferred_result_list_map zkapp_command.account_updates ~f:(Zkapp_account_update.add_if_doesn't_exist (module Conn)) >>| Array.of_list @@ -2909,6 +2981,8 @@ module Block = struct (failed_str, Some display) in let%bind _seq_no = + Metrics.time ~label:"adding_transactions" + @@ fun () -> Mina_caqti.deferred_result_list_fold transactions ~init:0 ~f:(fun sequence_no -> function | { Mina_base.With_status.status @@ -2934,6 +3008,9 @@ module Block = struct let status, failure_reasons = failure_reasons user_command.status in + Metrics.time + ~label:"block_and_zkapp_command.add_if_doesn't_exist" + @@ fun () -> Block_and_zkapp_command.add_if_doesn't_exist (module Conn) ~block_id ~zkapp_command_id:id ~sequence_no ~status @@ -3473,9 +3550,12 @@ module Block = struct then (* a new block, allows marking some pending blocks as canonical *) let%bind subchain_blocks = - get_subchain - (module Conn) - ~start_block_id:highest_canonical_block_id ~end_block_id:block_id + Metrics.time ~label:"get_subchain (> canonical_height + k)" + (fun () -> + get_subchain + (module Conn) + ~start_block_id:highest_canonical_block_id + ~end_block_id:block_id ) in let block_height_less_k_int64 = Int64.( - ) block.height k_int64 in (* mark canonical, orphaned blocks in subchain at least k behind the new block *) @@ -3483,38 +3563,45 @@ module Block = struct List.filter subchain_blocks ~f:(fun subchain_block -> Int64.( <= ) subchain_block.height block_height_less_k_int64 ) in - Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() - ~f:(fun () block -> - let%bind () = - mark_as_canonical (module Conn) ~state_hash:block.state_hash - in - mark_as_orphaned - (module Conn) - ~state_hash:block.state_hash ~height:block.height ) + Metrics.time ~label:"mark_as_canonical (> canonical_height + k)" + (fun () -> + Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() + ~f:(fun () block -> + let%bind () = + mark_as_canonical (module Conn) ~state_hash:block.state_hash + in + mark_as_orphaned + (module Conn) + ~state_hash:block.state_hash ~height:block.height ) ) else if Int64.( < ) block.height greatest_canonical_height then (* a missing block added in the middle of canonical chain *) let%bind canonical_block_above_id, _above_height = - get_nearest_canonical_block_above (module Conn) block.height + Metrics.time ~label:"get_nearest_canonical_block_above" (fun () -> + get_nearest_canonical_block_above (module Conn) block.height ) in let%bind canonical_block_below_id, _below_height = - get_nearest_canonical_block_below (module Conn) block.height + Metrics.time ~label:"get_neareast_canonical_block_below" (fun () -> + get_nearest_canonical_block_below (module Conn) block.height ) in (* we can always find this chain: the genesis block should be marked as canonical, and we've found a canonical block above this one *) let%bind canonical_blocks = - get_subchain - (module Conn) - ~start_block_id:canonical_block_below_id - ~end_block_id:canonical_block_above_id + Metrics.time ~label:"get_subchain (< canonical_height)" (fun () -> + get_subchain + (module Conn) + ~start_block_id:canonical_block_below_id + ~end_block_id:canonical_block_above_id ) in - Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() - ~f:(fun () block -> - let%bind () = - mark_as_canonical (module Conn) ~state_hash:block.state_hash - in - mark_as_orphaned - (module Conn) - ~state_hash:block.state_hash ~height:block.height ) + Metrics.time ~label:"mark_as_canonical (< canonical_height)" + (fun () -> + Mina_caqti.deferred_result_list_fold canonical_blocks ~init:() + ~f:(fun () block -> + let%bind () = + mark_as_canonical (module Conn) ~state_hash:block.state_hash + in + mark_as_orphaned + (module Conn) + ~state_hash:block.state_hash ~height:block.height ) ) else (* a block at or above highest canonical block, not high enough to mark any blocks as canonical *) Deferred.Result.return () @@ -3644,7 +3731,12 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash [%log info] "Attempting to add block data for $state_hash" ~metadata: [ ("state_hash", Mina_base.State_hash.to_yojson state_hash) ] ; - let%bind block_id = add_block (module Conn : CONNECTION) block in + let%bind block_id = + O1trace.thread "archive_processor.add_block" + @@ fun () -> + Metrics.time ~label:"add_block" + @@ fun () -> add_block (module Conn : CONNECTION) block + in (* if an existing block has a parent hash that's for the block just added, set its parent id *) @@ -3654,7 +3746,10 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash ~parent_hash:(hash block) ~parent_id:block_id in (* update chain status for existing blocks *) - let%bind () = Block.update_chain_status (module Conn) ~block_id in + let%bind () = + Metrics.time ~label:"update_chain_status" (fun () -> + Block.update_chain_status (module Conn) ~block_id ) + in let%bind () = match delete_older_than with | Some num_blocks -> diff --git a/src/app/archive/zkapp_tables.sql b/src/app/archive/zkapp_tables.sql index e22af933ed4..4551cf6029f 100644 --- a/src/app/archive/zkapp_tables.sql +++ b/src/app/archive/zkapp_tables.sql @@ -30,7 +30,7 @@ CREATE TABLE zkapp_field */ CREATE TABLE zkapp_field_array ( id serial PRIMARY KEY -, element_ids int[] NOT NULL +, element_ids int[] NOT NULL UNIQUE ); CREATE INDEX idx_zkapp_field_array_element_ids ON zkapp_field_array(element_ids); @@ -80,7 +80,7 @@ CREATE TABLE zkapp_action_states */ CREATE TABLE zkapp_events ( id serial PRIMARY KEY -, element_ids int[] NOT NULL +, element_ids int[] NOT NULL UNIQUE ); CREATE INDEX idx_zkapp_events_element_ids ON zkapp_events(element_ids); diff --git a/src/lib/mina_caqti/mina_caqti.ml b/src/lib/mina_caqti/mina_caqti.ml index 8377968c98f..b1e49d9bcb8 100644 --- a/src/lib/mina_caqti/mina_caqti.ml +++ b/src/lib/mina_caqti/mina_caqti.ml @@ -297,6 +297,36 @@ let select_insert_into_cols ~(select : string * 'select Caqti_type.t) ~cols:(fst cols) () ) value +let sep_by_comma ?(parenthesis = false) xs = + List.map xs ~f:(if parenthesis then sprintf "('%s')" else sprintf "'%s'") + |> String.concat ~sep:", " + +let insert_multi_into_col ~(table_name : string) + ~(col : string * 'col Caqti_type.t) (module Conn : CONNECTION) + (values : string list) = + let open Deferred.Result.Let_syntax in + let insert = + sprintf + {sql| INSERT INTO %s (%s) VALUES %s + ON CONFLICT (%s) + DO NOTHING |sql} + table_name (fst col) + (sep_by_comma ~parenthesis:true values) + (fst col) + in + let%bind () = Conn.exec (Caqti_request.exec Caqti_type.unit insert) () in + let search = + sprintf + {sql| SELECT %s, id FROM %s + WHERE %s in (%s) |sql} + (fst col) table_name (fst col) (sep_by_comma values) + in + Conn.collect_list + (Caqti_request.collect Caqti_type.unit + Caqti_type.(tup2 (snd col) int) + search ) + () + let query ~f pool = match%bind Caqti_async.Pool.use f pool with | Ok v ->