Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rosetta: Fast canonical lookups chain_status #9797

Merged
merged 2 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 64 additions & 93 deletions src/app/rosetta/lib/account.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,11 @@ end

module Sql = struct
module Balance_from_last_relevant_command = struct
let query =
Caqti_request.find_opt
Caqti_type.(tup2 string int64)
Caqti_type.(tup2 int64 int64)
{sql|
WITH RECURSIVE chain AS (
(SELECT id, state_hash, parent_id, height, global_slot_since_genesis, timestamp
FROM blocks b
WHERE height = (select MAX(height) from blocks)
ORDER BY timestamp ASC
LIMIT 1)

UNION ALL

SELECT b.id, b.state_hash, b.parent_id, b.height, b.global_slot_since_genesis, b.timestamp FROM blocks b
INNER JOIN chain
ON b.id = chain.parent_id AND chain.id <> chain.parent_id
),

(* This is SQL is the common chunk shared between the query_recent and
* query_old that retreives the relevant balance changes from transactions
* at a block. *)
let common_sql =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a comment here how this is shared by query_old and query_recent would be helpful for future audiences

{sql|
relevant_internal_block_balances AS (
SELECT
block_internal_command.block_id,
Expand Down Expand Up @@ -114,6 +100,28 @@ relevant_block_balances AS (
UNION
(SELECT block_id, sequence_no, 0 AS secondary_sequence_no, balance FROM relevant_user_block_balances)
)
|sql}

let query_recent =
Caqti_request.find_opt
Caqti_type.(tup2 string int64)
Caqti_type.(tup2 int64 int64)
(sprintf {sql|
WITH RECURSIVE chain AS (
(SELECT id, state_hash, parent_id, height, global_slot_since_genesis, timestamp
FROM blocks b
WHERE height = (select MAX(height) from blocks)
ORDER BY timestamp ASC
LIMIT 1)

UNION ALL

SELECT b.id, b.state_hash, b.parent_id, b.height, b.global_slot_since_genesis, b.timestamp FROM blocks b
INNER JOIN chain
ON b.id = chain.parent_id AND chain.id <> chain.parent_id
),

%s

SELECT
chain.global_slot_since_genesis AS block_global_slot_since_genesis,
Expand All @@ -124,11 +132,35 @@ JOIN relevant_block_balances rbb ON chain.id = rbb.block_id
WHERE chain.height <= $2
ORDER BY (chain.height, sequence_no, secondary_sequence_no) DESC
LIMIT 1
|sql}
|sql} common_sql)

let query_old =
Caqti_request.find_opt
Caqti_type.(tup2 string int64)
Caqti_type.(tup2 int64 int64)
(sprintf {sql|
%s

SELECT
b.global_slot_since_genesis AS block_global_slot_since_genesis,
balance
FROM
blocks b
JOIN relevant_block_balances rbb ON b.id = rbb.block_id
WHERE b.height <= $2 AND b.chain_status = 'canonical'
ORDER BY (chain.height, sequence_no, secondary_sequence_no) DESC
LIMIT 1
|sql} common_sql)

let run (module Conn : Caqti_async.CONNECTION) requested_block_height
address =
Conn.find_opt query (address, requested_block_height)
let open Deferred.Result.Let_syntax in
(* buffer an epsilon of 5 just in case archive node isn't caught up *)
let%bind is_old_height = Sql.Block.run_is_old_height (module Conn) ~height:requested_block_height in
if is_old_height then
Conn.find_opt query_old (address, requested_block_height)
else
Conn.find_opt query_recent (address, requested_block_height)
end

let run (module Conn : Caqti_async.CONNECTION) block_query address =
Expand Down Expand Up @@ -387,79 +419,18 @@ module Balance = struct
in
{amount with metadata= Some metadata}
in
let find_via_db ~block_query ~address =
let%map block_identifier, {liquid_balance; total_balance} =
env.db_block_identifier_and_balance_info ~block_query ~address
in
{ Account_balance_response.block_identifier
; balances=
[ make_balance_amount
~liquid_balance:(Unsigned.UInt64.of_int64 liquid_balance)
~total_balance:(Unsigned.UInt64.of_int64 total_balance) ]
; metadata=Some (`Assoc [ ("created_via_historical_lookup", `Bool true ) ]) }
let%bind block_query =
Query.of_partial_identifier' req.block_identifier
in
match req.block_identifier with
| None ->
(* First try via GraphQL but fallback to archive database (and then
* omit the nonce!) if there was an issue *)
(match%bind
env.gql
?token_id:(Option.map token_id ~f:Unsigned.UInt64.to_string)
~address ()
with
| `Failed e ->
[%log' warn env.logger] "/account/balance : GraphQL request failed, trying again via the archive database" ~metadata:[("error", Errors.erase e |> Rosetta_models.Error.to_yojson)];
find_via_db ~block_query:None ~address
| `Successful res ->
let%bind account =
match res#account with
| None ->
M.fail (Errors.create (`Account_not_found address))
| Some account ->
M.return account
in
let%bind state_hash =
match (account#balance)#stateHash with
| None ->
M.fail
(Errors.create
~context:
"Failed accessing state hash from GraphQL \
communication with the Mina Daemon."
`Chain_info_missing)
| Some state_hash ->
M.return state_hash
in
let%map liquid_balance =
match (account#balance)#liquid with
| None ->
M.fail
(Errors.create
~context:
"Unable to access liquid balance since your Mina \
daemon isn't fully bootstrapped."
`Chain_info_missing)
| Some liquid_balance ->
M.return liquid_balance
in
let metadata =
Option.map
~f:(fun nonce -> `Assoc [("nonce", `Intlit nonce)])
account#nonce
in
let total_balance = (account#balance)#total in
{ Account_balance_response.block_identifier=
{ Block_identifier.index=
Unsigned.UInt32.to_int64 (account#balance)#blockHeight
; hash= state_hash }
; balances= [make_balance_amount ~liquid_balance ~total_balance]
; metadata })
| Some partial_identifier ->
(* TODO: Once multiple token_ids are possible we may need to add handling for that here *)
let%bind block_query =
Query.of_partial_identifier partial_identifier
in
find_via_db ~block_query ~address
let%map block_identifier, {liquid_balance; total_balance} =
env.db_block_identifier_and_balance_info ~block_query ~address
in
{ Account_balance_response.block_identifier
; balances=
[ make_balance_amount
~liquid_balance:(Unsigned.UInt64.of_int64 liquid_balance)
~total_balance:(Unsigned.UInt64.of_int64 total_balance) ]
; metadata=Some (`Assoc [ ("created_via_historical_lookup", `Bool true ) ]) }
end

module Real = Impl (Deferred.Result)
Expand Down
48 changes: 45 additions & 3 deletions src/app/rosetta/lib/block.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ module Block_query = struct
| Some index, Some hash ->
M.return (Some (`Those (`Height index, `Hash hash)))

let of_partial_identifier' (identifier : Partial_block_identifier.t option) =
of_partial_identifier (Option.value identifier ~default:{Partial_block_identifier.index = None; hash = None })

let is_genesis ~hash = function
| Some (`This (`Height index)) ->
Int64.equal index Network.genesis_block_height
Expand Down Expand Up @@ -241,15 +244,39 @@ module Sql = struct

let typ = Caqti_type.(tup3 int Archive_lib.Processor.Block.typ Extras.typ)

let query_height =
let query_max_height =
Caqti_request.find Caqti_type.unit Caqti_type.int64
{| SELECT MAX(height) FROM blocks |}

let query_height_old =
Caqti_request.find_opt Caqti_type.int64 typ
(* The archive database will only reconcile the canonical columns for
* blocks older than k + epsilon
*)
{|
SELECT c.id, c.state_hash, c.parent_id, c.parent_hash, c.creator_id, c.block_winner_id, c.snarked_ledger_hash_id, c.staking_epoch_data_id, c.next_epoch_data_id, c.ledger_hash, c.height, c.global_slot, c.global_slot_since_genesis, c.timestamp, pk.value as creator, bw.value as winner FROM blocks c
INNER JOIN public_keys pk
ON pk.id = c.creator_id
INNER JOIN public_keys bw
ON bw.id = c.block_winner_id
WHERE c.height = ? AND c.chain_status = 'canonical'
|}


let query_height_recent =
Caqti_request.find_opt Caqti_type.int64 typ
(* According to the clarification of the Rosetta spec here
* https://community.rosetta-api.org/t/querying-block-by-just-its-index/84/3 ,
* it is important to select only the block on the canonical chain for a
* given height, and not an arbitrary one.
*
* This query recursively traverses the blockchain from the longest tip
* backwards until it reaches a block of the given height. *)
* backwards until it reaches a block of the given height.
*
* This query is best used only for _short_ (around ~k blocks back
* + epsilon)
* requests since recursive queries stress PostgreSQL.
*)
{|
WITH RECURSIVE chain AS (
(SELECT id, state_hash, parent_id, parent_hash, creator_id, block_winner_id, snarked_ledger_hash_id, staking_epoch_data_id, next_epoch_data_id, ledger_hash, height, global_slot, global_slot_since_genesis, timestamp FROM blocks b WHERE height = (select MAX(height) from blocks)
Expand Down Expand Up @@ -312,9 +339,24 @@ WITH RECURSIVE chain AS (
let run_by_id (module Conn : Caqti_async.CONNECTION) id =
Conn.find_opt query_by_id id

let run_is_old_height (module Conn : Caqti_async.CONNECTION) ~height =
let open Deferred.Result.Let_syntax in
let open Int64 in
let%map max_height =
Conn.find query_max_height ()
in
(* buffer an epsilon of 5 just in case archive node isn't caught up *)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the criterion to distinguish old from new queries based just on the current height, or should it examine whether blocks have marked as canonical?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I was thinking we should just rely on height here -- we know performing the long recursive queries in rosetta cause Postgres to choke, so I'd rather the queries fail in the case that we haven't marked things as canonical as opposed to trying the big recursive queries in that case and then fail.

let epsilon = of_int 5 in
height < (max_height - (of_int Genesis_constants.k)) - epsilon

let run (module Conn : Caqti_async.CONNECTION) = function
| Some (`This (`Height h)) ->
Conn.find_opt query_height h
let open Deferred.Result.Let_syntax in
let%bind is_old_height = run_is_old_height (module Conn) ~height:h in
if is_old_height then
Conn.find_opt query_height_old h
else
Conn.find_opt query_height_recent h
| Some (`That (`Hash h)) ->
Conn.find_opt query_hash h
| Some (`Those (`Height height, `Hash hash)) ->
Expand Down