Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch insertion events in archive database #14779

Merged
merged 20 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/app/archive/add_unique_constraints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE zkapp_field_array ADD CONSTRAINT zkapp_field_array_element_ids_key UNIQUE (element_ids);
ALTER TABLE zkapp_events ADD CONSTRAINT zkapp_events_element_ids_key UNIQUE (element_ids);
1 change: 1 addition & 0 deletions src/app/archive/lib/dune
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
error_json
ppx_deriving_yojson.runtime
ppx_version.runtime
o1trace
)
(inline_tests (flags -verbose -show-counts))
(modes native)
Expand Down
165 changes: 130 additions & 35 deletions src/app/archive/lib/processor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1417,20 +1417,83 @@ module Zkapp_events = struct

let table_name = "zkapp_events"

module Field_array_map = Map.Make (struct
type t = int array [@@deriving sexp]

let compare = Array.compare Int.compare
end)

(* Account_update.Body.Events'.t is defined as `field array list`,
which is ismorphic to a list of list of fields.

We are batching the insertion of field and field_array to optimize
the speed of archiving max-cost zkapps.

1. we flatten the list of list of fields to get all the field elements
2. insert all the field elements in one query
3. construct a map "M" from `field_id` to `field` by querying against the zkapp_field table
4. use "M" and the list of list of fields to compute the list of list of field_ids
5. insert all list of `list of field_ids` in one query
6. construct a map "M'" from `field_array_id` to `field_id array` by querying against
the zkapp_field_array table
7. use "M'" and the list of list of field_ids to compute the list of field_array_ids
8. insert the list of field_arrays
*)
let add_if_doesn't_exist (module Conn : CONNECTION)
(events : Account_update.Body.Events'.t) =
let open Deferred.Result.Let_syntax in
let%bind (element_ids : int array) =
Mina_caqti.deferred_result_list_map events
~f:(Zkapp_field_array.add_if_doesn't_exist (module Conn))
>>| Array.of_list
let%bind field_array_id_list =
if not @@ List.is_empty events then
let field_list_list =
List.map events ~f:(fun field_array ->
Array.map field_array ~f:Pickles.Backend.Tick.Field.to_string
|> Array.to_list )
in
let fields = field_list_list |> List.concat in
let%bind field_id_list_list =
if not @@ List.is_empty fields then
let%map field_map =
Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field"
~col:("field", Caqti_type.string)
(module Conn)
fields
>>| String.Map.of_alist_exn
in
let field_id_list_list =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let field_id_list_list =

field_id_list_list is not needed at all

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a let binding here to make the code more readable. Just to give a name to the result of that computation.

List.map field_list_list ~f:(List.map ~f:(Map.find_exn field_map))
in
field_id_list_list
else
(* if there's no fields, then we must have some list of empty lists *)
return @@ List.map field_list_list ~f:(fun _ -> [])
in
(* this conversion should be done by caqti using `typ`, FIX this in the future *)
let field_array_list =
List.map field_id_list_list ~f:(fun id_list ->
List.map id_list ~f:Int.to_string
|> String.concat ~sep:", " |> sprintf "{%s}" )
in
let%map field_array_map =
Mina_caqti.insert_multi_into_col ~table_name:"zkapp_field_array"
~col:("element_ids", Mina_caqti.array_int_typ)
(module Conn)
field_array_list
>>| Field_array_map.of_alist_exn
in
let field_array_id_list =
List.map field_id_list_list ~f:(fun field_id_list ->
Map.find_exn field_array_map (Array.of_list field_id_list) )
|> Array.of_list
in
field_array_id_list
else return @@ Array.of_list []
in
Mina_caqti.select_insert_into_cols ~select:("id", Caqti_type.int)
~table_name
~cols:([ "element_ids" ], Mina_caqti.array_int_typ)
~tannot:(function "element_ids" -> Some "int[]" | _ -> None)
(module Conn)
element_ids
field_array_id_list

let load (module Conn : CONNECTION) id =
Conn.find
Expand Down Expand Up @@ -1491,14 +1554,17 @@ module Zkapp_account_update_body = struct
Account_identifiers.add_if_doesn't_exist (module Conn) account_identifier
in
let%bind update_id =
Zkapp_updates.add_if_doesn't_exist (module Conn) body.update
Metrics.time ~label:"zkapp_updates.add"
@@ fun () -> Zkapp_updates.add_if_doesn't_exist (module Conn) body.update
in
let increment_nonce = body.increment_nonce in
let%bind events_id =
Zkapp_events.add_if_doesn't_exist (module Conn) body.events
Metrics.time ~label:"Zkapp_events.add"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need all of these Metrics.time?

@@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.events
in
let%bind actions_id =
Zkapp_events.add_if_doesn't_exist (module Conn) body.actions
Metrics.time ~label:"Zkapp_actions.add"
@@ fun () -> Zkapp_events.add_if_doesn't_exist (module Conn) body.actions
in
let%bind call_data_id =
Zkapp_field.add_if_doesn't_exist (module Conn) body.call_data
Expand Down Expand Up @@ -1600,6 +1666,8 @@ module Zkapp_account_update = struct
(account_update : Account_update.Simple.t) =
let open Deferred.Result.Let_syntax in
let%bind body_id =
Metrics.time ~label:"Zkapp_account_update_body.add"
@@ fun () ->
Zkapp_account_update_body.add_if_doesn't_exist
(module Conn)
account_update.body
Expand Down Expand Up @@ -1897,11 +1965,15 @@ module User_command = struct
let open Deferred.Result.Let_syntax in
let zkapp_command = Zkapp_command.to_simple ps in
let%bind zkapp_fee_payer_body_id =
Metrics.time ~label:"Zkapp_fee_payer_body.add"
@@ fun () ->
Zkapp_fee_payer_body.add_if_doesn't_exist
(module Conn)
zkapp_command.fee_payer.body
in
let%bind zkapp_account_updates_ids =
Metrics.time ~label:"Zkapp_account_update.add"
@@ fun () ->
Mina_caqti.deferred_result_list_map zkapp_command.account_updates
~f:(Zkapp_account_update.add_if_doesn't_exist (module Conn))
>>| Array.of_list
Expand Down Expand Up @@ -2909,6 +2981,8 @@ module Block = struct
(failed_str, Some display)
in
let%bind _seq_no =
Metrics.time ~label:"adding_transactions"
@@ fun () ->
Mina_caqti.deferred_result_list_fold transactions ~init:0
~f:(fun sequence_no -> function
| { Mina_base.With_status.status
Expand All @@ -2934,6 +3008,9 @@ module Block = struct
let status, failure_reasons =
failure_reasons user_command.status
in
Metrics.time
~label:"block_and_zkapp_command.add_if_doesn't_exist"
@@ fun () ->
Block_and_zkapp_command.add_if_doesn't_exist
(module Conn)
~block_id ~zkapp_command_id:id ~sequence_no ~status
Expand Down Expand Up @@ -3473,48 +3550,58 @@ module Block = struct
then
(* a new block, allows marking some pending blocks as canonical *)
let%bind subchain_blocks =
get_subchain
(module Conn)
~start_block_id:highest_canonical_block_id ~end_block_id:block_id
Metrics.time ~label:"get_subchain (> canonical_height + k)"
(fun () ->
get_subchain
(module Conn)
~start_block_id:highest_canonical_block_id
~end_block_id:block_id )
in
let block_height_less_k_int64 = Int64.( - ) block.height k_int64 in
(* mark canonical, orphaned blocks in subchain at least k behind the new block *)
let canonical_blocks =
List.filter subchain_blocks ~f:(fun subchain_block ->
Int64.( <= ) subchain_block.height block_height_less_k_int64 )
in
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height )
Metrics.time ~label:"mark_as_canonical (> canonical_height + k)"
(fun () ->
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height ) )
else if Int64.( < ) block.height greatest_canonical_height then
(* a missing block added in the middle of canonical chain *)
let%bind canonical_block_above_id, _above_height =
get_nearest_canonical_block_above (module Conn) block.height
Metrics.time ~label:"get_nearest_canonical_block_above" (fun () ->
get_nearest_canonical_block_above (module Conn) block.height )
in
let%bind canonical_block_below_id, _below_height =
get_nearest_canonical_block_below (module Conn) block.height
Metrics.time ~label:"get_neareast_canonical_block_below" (fun () ->
get_nearest_canonical_block_below (module Conn) block.height )
in
(* we can always find this chain: the genesis block should be marked as canonical, and we've found a
canonical block above this one *)
let%bind canonical_blocks =
get_subchain
(module Conn)
~start_block_id:canonical_block_below_id
~end_block_id:canonical_block_above_id
Metrics.time ~label:"get_subchain (< canonical_height)" (fun () ->
get_subchain
(module Conn)
~start_block_id:canonical_block_below_id
~end_block_id:canonical_block_above_id )
in
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height )
Metrics.time ~label:"mark_as_canonical (< canonical_height)"
(fun () ->
Mina_caqti.deferred_result_list_fold canonical_blocks ~init:()
~f:(fun () block ->
let%bind () =
mark_as_canonical (module Conn) ~state_hash:block.state_hash
in
mark_as_orphaned
(module Conn)
~state_hash:block.state_hash ~height:block.height ) )
else
(* a block at or above highest canonical block, not high enough to mark any blocks as canonical *)
Deferred.Result.return ()
Expand Down Expand Up @@ -3644,7 +3731,12 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash
[%log info] "Attempting to add block data for $state_hash"
~metadata:
[ ("state_hash", Mina_base.State_hash.to_yojson state_hash) ] ;
let%bind block_id = add_block (module Conn : CONNECTION) block in
let%bind block_id =
O1trace.thread "archive_processor.add_block"
@@ fun () ->
Metrics.time ~label:"add_block"
@@ fun () -> add_block (module Conn : CONNECTION) block
in
(* if an existing block has a parent hash that's for the block just added,
set its parent id
*)
Expand All @@ -3654,7 +3746,10 @@ let add_block_aux ?(retries = 3) ~logger ~pool ~add_block ~hash
~parent_hash:(hash block) ~parent_id:block_id
in
(* update chain status for existing blocks *)
let%bind () = Block.update_chain_status (module Conn) ~block_id in
let%bind () =
Metrics.time ~label:"update_chain_status" (fun () ->
Block.update_chain_status (module Conn) ~block_id )
in
let%bind () =
match delete_older_than with
| Some num_blocks ->
Expand Down
4 changes: 2 additions & 2 deletions src/app/archive/zkapp_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ CREATE TABLE zkapp_field
*/
CREATE TABLE zkapp_field_array
( id serial PRIMARY KEY
, element_ids int[] NOT NULL
, element_ids int[] NOT NULL UNIQUE
);

CREATE INDEX idx_zkapp_field_array_element_ids ON zkapp_field_array(element_ids);
Expand Down Expand Up @@ -80,7 +80,7 @@ CREATE TABLE zkapp_action_states
*/
CREATE TABLE zkapp_events
( id serial PRIMARY KEY
, element_ids int[] NOT NULL
, element_ids int[] NOT NULL UNIQUE
);

CREATE INDEX idx_zkapp_events_element_ids ON zkapp_events(element_ids);
Expand Down
30 changes: 30 additions & 0 deletions src/lib/mina_caqti/mina_caqti.ml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,36 @@ let select_insert_into_cols ~(select : string * 'select Caqti_type.t)
~cols:(fst cols) () )
value

let sep_by_comma ?(parenthesis = false) xs =
List.map xs ~f:(if parenthesis then sprintf "('%s')" else sprintf "'%s'")
|> String.concat ~sep:", "

let insert_multi_into_col ~(table_name : string)
~(col : string * 'col Caqti_type.t) (module Conn : CONNECTION)
(values : string list) =
let open Deferred.Result.Let_syntax in
let insert =
sprintf
{sql| INSERT INTO %s (%s) VALUES %s
ON CONFLICT (%s)
DO NOTHING |sql}
table_name (fst col)
(sep_by_comma ~parenthesis:true values)
(fst col)
in
let%bind () = Conn.exec (Caqti_request.exec Caqti_type.unit insert) () in
let search =
sprintf
{sql| SELECT %s, id FROM %s
WHERE %s in (%s) |sql}
(fst col) table_name (fst col) (sep_by_comma values)
in
Conn.collect_list
(Caqti_request.collect Caqti_type.unit
Caqti_type.(tup2 (snd col) int)
search )
()

let query ~f pool =
match%bind Caqti_async.Pool.use f pool with
| Ok v ->
Expand Down