Skip to content

Commit

Permalink
Merge pull request #14779 from MinaProtocol/fix/archive-db-slowness
Browse files Browse the repository at this point in the history
batch insertion events in archive database
  • Loading branch information
ghost-not-in-the-shell authored Jan 10, 2024
2 parents bb41ad3 + 49e407d commit ceaae53
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 37 deletions.
2 changes: 2 additions & 0 deletions src/app/archive/add_unique_constraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE zkapp_field_array ADD CONSTRAINT zkapp_field_array_element_ids_key UNIQUE (element_ids);
ALTER TABLE zkapp_events ADD CONSTRAINT zkapp_events_element_ids_key UNIQUE (element_ids);
1 change: 1 addition & 0 deletions src/app/archive/lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
error_json
ppx_deriving_yojson.runtime
ppx_version.runtime
o1trace
)
(inline_tests (flags -verbose -show-counts))
(modes native)
Expand Down
165 changes: 130 additions & 35 deletions src/app/archive/lib/processor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1417,20 +1417,83 @@ module Zkapp_events = struct

let table_name = "zkapp_events"

module Field_array_map = Map.Make (struct
type t = int array [@@deriving sexp]

let compare = Array.compare Int.compare
end)

(* Account_update.Body.Events'.t is defined as `field array list`,
which is ismorphic to a list of list of fields.
We are batching the insertion of field and field_array to optimize
the speed of archiving max-cost zkapps.
1. we flatten the list of list of fields to get all the field elements
2. insert all the field elements in one query
3. construct a map "M" from `field_id` to `field` by querying against the zkapp_field table
4. use "M" and the list of list of fields to compute the list of list of field_ids
5. insert all list of `list of field_ids` in one query
6. construct a map "M'" from `field_array_id` to `field_id array` by querying against
the zkapp_field_array table
7. use "M'" and the list of list of field_ids to compute the list of field_array_ids
8. insert the list of field_arrays
*)
let add_if_doesn't_exist (module Conn : CONNECTION)
(events : Account_update.Body.Events'.t) =
let open Deferred.Result.Let_syntax in
let%bind (element_ids : int array) =
Mina_caqti.deferred_result_list_map events
~f:(Zkapp_field_array.add_if_doesn't_exist (module Conn))
>>| Array.of_list
let%bind field_array_id_list =
if not @@ List.is_empty events then
let field_list_list =
List.map events ~f:(fun field_array ->
Array.map field_array ~f:Pickles.Backend.Tick.Field.to_string
|> Array.to_list )
in
let fields = field_list_list |> List.concat in
let%bind field_id_list_list =
if not @@ List.is_empty fields then
let%map field_map =
Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field"
~col:("field", Caqti_type.string)
(module Conn)
fields
>>| String.Map.of_alist_exn
in
let field_id_list_list =
List.map field_list_list ~f:(List.map ~f:(Map.find_exn field_map))
in
field_id_list_list
else
(* if there's no fields, then we must have some list of empty lists *)
return @@ List.map field_list_list ~f:(fun _ -> [])
in
(* this conversion should be done by caqti using `typ`, FIX this in the future *)
let field_array_list =
List.map field_id_list_list ~f:(fun id_list ->
List.map id_list ~f:Int.to_string
|> String.concat ~sep:", " |> sprintf "{%s}" )
in
let%map field_array_map =
Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field_array"
~col:("element_ids", Mina_caqti.array_int_typ)
(module Conn)
field_array_list
>>| Field_array_map.of_alist_exn
in
let field_array_id_list =
List.map field_id_list_list ~f:(fun field_id_list ->
Map.find_exn field_array_map (Array.of_list field_id_list) )
|> Array.of_list
in
field_array_id_list
else return @@ Array.of_list []
in
Mina_caqti.select_insert_into_cols ~select:("id", Caqti_type.int)
~table_name
~cols:([ "element_ids" ], Mina_caqti.array_int_typ)
~tannot:(function "element_ids" -> Some "int[]" | _ -> None)
(module Conn)
element_ids
field_array_id_list

let load (module Conn : CONNECTION) id =
Conn.find
Expand Down Expand Up @@ -1491,14 +1554,17 @@ module Zkapp_account_update_body = struct
Account_identifiers.add_if_doesn't_exist (module Conn) account_identifier
in
let%bind update_id =
Zkapp_updates.add_if_doesn't_exist (module Conn) body.update
Metrics.time ~label:"zkapp_updates.add"
@@ fun () -> Zkapp_updates.add_if_doesn't_exist (module Conn) body.update
in
let increment_nonce = body.increment_nonce in
let%bind events_id =
Zkapp_events.add_if_doesn't_exist (module Conn) body.events
Metrics.time ~label:"Zkapp_events.add"
@@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.events
in
let%bind actions_id =
Zkapp_events.add_if_doesn't_exist (module Conn) body.actions
Metrics.time ~label:"Zkapp_actions.add"
@@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.actions
in
let%bind call_data_id =
Zkapp_field.add_if_doesn't_exist (module Conn) body.call_data
Expand Down Expand Up @@ -1600,6 +1666,8 @@ module Zkapp_account_update = struct
(account_update : Account_update.Simple.t) =
let open Deferred.Result.Let_syntax in
let%bind body_id =
Metrics.time ~label:"Zkapp_account_update_body.add"
@@ fun () ->
Zkapp_account_update_body.add_if_doesn't_exist
(module Conn)
account_update.body
Expand Down Expand Up @@ -1897,11 +1965,15 @@ module User_command = struct
let open Deferred.Result.Let_syntax in
let zkapp_command = Zkapp_command.to_simple ps in
let%bind zkapp_fee_payer_body_id =
Metrics.time ~label:"Zkapp_fee_payer_body.add"
@@ fun () ->
Zkapp_fee_payer_body.add_if_doesn't_exist
(module Conn)
zkapp_command.fee_payer.body
in
let%bind zkapp_account_updates_ids =
Metrics.time ~label:"Zkapp_account_update.add"
@@ fun () ->
Mina_caqti.deferred_result_list_map zkapp_command.account_updates
~f:(Zkapp_account_update.add_if_doesn't_exist (module Conn))
>>| Array.of_list
Expand Down Expand Up @@ -2909,6 +2981,8 @@ module Block = struct
(failed_str, Some display)
in
let%bind _seq_no =
Metrics.time ~label:"adding_transactions"
@@ fun () ->
Mina_caqti.deferred_result_list_fold transactions ~init:0
~f:(fun sequence_no -> function
| { Mina_base.With_status.status
Expand All @@ -2934,6 +3008,9 @@ module Block = struct
let status, failure_reasons =
failure_reasons user_command.status
in
Metrics.time
~label:"block_and_zkapp_command.add_if_doesn't_exist"
@@ fun () ->
Block_and_zkapp_command.add_if_doesn't_exist
(module Conn)
~block_id ~zkapp_command_id:id ~sequence_no ~status
Expand Down Expand Up @@ -3473,48 +3550,58 @@ module Block = struct
then
(* a new block, allows marking some pending blocks as canonical *)
let%bind subchain_blocks =
get_subchain
(module Conn)
~start_block_id:highest_canonical_block_id ~end_block_id:block_id
Metrics.time ~label:"get_subchain (> canonical_height + k)"
(fun () ->
get_subchain
(module Conn)
~start_block_id:highest_canonical_block_id
~end_block_id:block_id )
in
let block_height_less_k_int64 = Int64.( - ) block.height k_int64 in
(* mark canonical, orphaned blocks in subchain at least k behind the new block *)
let canonical_blocks =
List.filter subchain_blocks ~f:(fun subchain_block ->
Int64.( <= ) subchain_block.height block_height_less_k_int64 )
in
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height )
Metrics.time ~label:"mark_as_canonical (> canonical_height + k)"
(fun () ->
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height ) )
else if Int64.( < ) block.height greatest_canonical_height then
(* a missing block added in the middle of canonical chain *)
let%bind canonical_block_above_id, _above_height =
get_nearest_canonical_block_above (module Conn) block.height
Metrics.time ~label:"get_nearest_canonical_block_above" (fun () ->
get_nearest_canonical_block_above (module Conn) block.height )
in
let%bind canonical_block_below_id, _below_height =
get_nearest_canonical_block_below (module Conn) block.height
Metrics.time ~label:"get_neareast_canonical_block_below" (fun () ->
get_nearest_canonical_block_below (module Conn) block.height )
in
(* we can always find this chain: the genesis block should be marked as canonical, and we've found a
canonical block above this one *)
let%bind canonical_blocks =
get_subchain
(module Conn)
~start_block_id:canonical_block_below_id
~end_block_id:canonical_block_above_id
Metrics.time ~label:"get_subchain (< canonical_height)" (fun () ->
get_subchain
(module Conn)
~start_block_id:canonical_block_below_id
~end_block_id:canonical_block_above_id )
in
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height )
Metrics.time ~label:"mark_as_canonical (< canonical_height)"
(fun () ->
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height ) )
else
(* a block at or above highest canonical block, not high enough to mark any blocks as canonical *)
Deferred.Result.return ()
Expand Down Expand Up @@ -3644,7 +3731,12 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash
[%log info] "Attempting to add block data for $state_hash"
~metadata:
[ ("state_hash", Mina_base.State_hash.to_yojson state_hash) ] ;
let%bind block_id = add_block (module Conn : CONNECTION) block in
let%bind block_id =
O1trace.thread "archive_processor.add_block"
@@ fun () ->
Metrics.time ~label:"add_block"
@@ fun () -> add_block (module Conn : CONNECTION) block
in
(* if an existing block has a parent hash that's for the block just added,
set its parent id
*)
Expand All @@ -3654,7 +3746,10 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash
~parent_hash:(hash block) ~parent_id:block_id
in
(* update chain status for existing blocks *)
let%bind () = Block.update_chain_status (module Conn) ~block_id in
let%bind () =
Metrics.time ~label:"update_chain_status" (fun () ->
Block.update_chain_status (module Conn) ~block_id )
in
let%bind () =
match delete_older_than with
| Some num_blocks ->
Expand Down
4 changes: 2 additions & 2 deletions src/app/archive/zkapp_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ CREATE TABLE zkapp_field
*/
CREATE TABLE zkapp_field_array
( id serial PRIMARY KEY
, element_ids int[] NOT NULL
, element_ids int[] NOT NULL UNIQUE
);

CREATE INDEX idx_zkapp_field_array_element_ids ON zkapp_field_array(element_ids);
Expand Down Expand Up @@ -80,7 +80,7 @@ CREATE TABLE zkapp_action_states
*/
CREATE TABLE zkapp_events
( id serial PRIMARY KEY
, element_ids int[] NOT NULL
, element_ids int[] NOT NULL UNIQUE
);

CREATE INDEX idx_zkapp_events_element_ids ON zkapp_events(element_ids);
Expand Down
30 changes: 30 additions & 0 deletions src/lib/mina_caqti/mina_caqti.ml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,36 @@ let select_insert_into_cols ~(select : string * 'select Caqti_type.t)
~cols:(fst cols) () )
value

let sep_by_comma ?(parenthesis = false) xs =
List.map xs ~f:(if parenthesis then sprintf "('%s')" else sprintf "'%s'")
|> String.concat ~sep:", "

let insert_multi_into_col ~(table_name : string)
~(col : string * 'col Caqti_type.t) (module Conn : CONNECTION)
(values : string list) =
let open Deferred.Result.Let_syntax in
let insert =
sprintf
{sql| INSERT INTO %s (%s) VALUES %s
ON CONFLICT (%s)
DO NOTHING |sql}
table_name (fst col)
(sep_by_comma ~parenthesis:true values)
(fst col)
in
let%bind () = Conn.exec (Caqti_request.exec Caqti_type.unit insert) () in
let search =
sprintf
{sql| SELECT %s, id FROM %s
WHERE %s in (%s) |sql}
(fst col) table_name (fst col) (sep_by_comma values)
in
Conn.collect_list
(Caqti_request.collect Caqti_type.unit
Caqti_type.(tup2 (snd col) int)
search )
()

let query ~f pool =
match%bind Caqti_async.Pool.use f pool with
| Ok v ->
Expand Down

0 comments on commit ceaae53

Please sign in to comment.