From b4a415800b1e92d58e6087a3654a09ccdb94483e Mon Sep 17 00:00:00 2001 From: Mariano Date: Sat, 1 May 2021 19:02:29 +0900 Subject: [PATCH 1/5] Improve wallet consistency --- chia/cmds/wallet_funcs.py | 6 +- chia/rpc/wallet_rpc_api.py | 216 +++++++++++++++------------ chia/wallet/cc_wallet/cc_wallet.py | 73 ++++----- chia/wallet/did_wallet/did_wallet.py | 83 +++++----- chia/wallet/wallet.py | 97 ++++++------ chia/wallet/wallet_state_manager.py | 20 +-- 6 files changed, 266 insertions(+), 229 deletions(-) diff --git a/chia/cmds/wallet_funcs.py b/chia/cmds/wallet_funcs.py index d1341b84d8c1..0d596bd193e5 100644 --- a/chia/cmds/wallet_funcs.py +++ b/chia/cmds/wallet_funcs.py @@ -112,7 +112,9 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint if typ != "STANDARD_WALLET": print(f"Wallet ID {wallet_id} type {typ} {summary['name']}") print(f" -Confirmed: " f"{balances['confirmed_wallet_balance']/units['colouredcoin']}") - print(f" -Confirmed - Pending Outgoing: {balances['unconfirmed_wallet_balance']/units['colouredcoin']}") + print( + f" -Confirmed minus Pending Outgoing: {balances['unconfirmed_wallet_balance']/units['colouredcoin']}" + ) print(f" -Spendable: {balances['spendable_balance']/units['colouredcoin']}") print(f" -Pending change: {balances['pending_change']/units['colouredcoin']}") else: @@ -122,7 +124,7 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint f"({balances['confirmed_wallet_balance']/units['chia']} {address_prefix})" ) print( - f" -Unconfirmed: {balances['unconfirmed_wallet_balance']} mojo " + f" -Confirmed - Pending Outgoing:: {balances['unconfirmed_wallet_balance']} mojo " f"({balances['unconfirmed_wallet_balance']/units['chia']} {address_prefix})" ) print( diff --git a/chia/rpc/wallet_rpc_api.py b/chia/rpc/wallet_rpc_api.py index 007c1c4a6c70..bddaa057f086 100644 --- a/chia/rpc/wallet_rpc_api.py +++ b/chia/rpc/wallet_rpc_api.py @@ -335,30 +335,37 @@ async def create_new_wallet(self, request: Dict): host = request["host"] if request["wallet_type"] == "cc_wallet": if request["mode"] == "new": - cc_wallet: CCWallet = await CCWallet.create_new_cc(wallet_state_manager, main_wallet, request["amount"]) - colour = cc_wallet.get_colour() - asyncio.create_task(self._create_backup_and_upload(host)) + async with self.service.wallet_state_manager.lock: + cc_wallet: CCWallet = await CCWallet.create_new_cc( + wallet_state_manager, main_wallet, request["amount"] + ) + colour = cc_wallet.get_colour() + asyncio.create_task(self._create_backup_and_upload(host)) return { "type": cc_wallet.type(), "colour": colour, "wallet_id": cc_wallet.id(), } elif request["mode"] == "existing": - cc_wallet = await CCWallet.create_wallet_for_cc(wallet_state_manager, main_wallet, request["colour"]) - asyncio.create_task(self._create_backup_and_upload(host)) + async with self.service.wallet_state_manager.lock: + cc_wallet = await CCWallet.create_wallet_for_cc( + wallet_state_manager, main_wallet, request["colour"] + ) + asyncio.create_task(self._create_backup_and_upload(host)) return {"type": cc_wallet.type()} elif request["wallet_type"] == "rl_wallet": if request["rl_type"] == "admin": log.info("Create rl admin wallet") - rl_admin: RLWallet = await RLWallet.create_rl_admin(wallet_state_manager) - success = await rl_admin.admin_create_coin( - uint64(int(request["interval"])), - uint64(int(request["limit"])), - request["pubkey"], - uint64(int(request["amount"])), - uint64(int(request["fee"])) if "fee" in request else uint64(0), - ) - asyncio.create_task(self._create_backup_and_upload(host)) + async with self.service.wallet_state_manager.lock: + rl_admin: RLWallet = await RLWallet.create_rl_admin(wallet_state_manager) + success = await rl_admin.admin_create_coin( + uint64(int(request["interval"])), + uint64(int(request["limit"])), + request["pubkey"], + uint64(int(request["amount"])), + uint64(int(request["fee"])) if "fee" in request else uint64(0), + ) + asyncio.create_task(self._create_backup_and_upload(host)) assert rl_admin.rl_info.admin_pubkey is not None return { "success": success, @@ -369,8 +376,9 @@ async def create_new_wallet(self, request: Dict): } elif request["rl_type"] == "user": log.info("Create rl user wallet") - rl_user: RLWallet = await RLWallet.create_rl_user(wallet_state_manager) - asyncio.create_task(self._create_backup_and_upload(host)) + async with self.service.wallet_state_manager.lock: + rl_user: RLWallet = await RLWallet.create_rl_user(wallet_state_manager) + asyncio.create_task(self._create_backup_and_upload(host)) assert rl_user.rl_info.user_pubkey is not None return { "id": rl_user.id(), @@ -385,13 +393,14 @@ async def create_new_wallet(self, request: Dict): backup_dids.append(hexstr_to_bytes(d)) if len(backup_dids) > 0: num_needed = uint64(request["num_of_backup_ids_needed"]) - did_wallet: DIDWallet = await DIDWallet.create_new_did_wallet( - wallet_state_manager, - main_wallet, - int(request["amount"]), - backup_dids, - uint64(num_needed), - ) + async with self.service.wallet_state_manager.lock: + did_wallet: DIDWallet = await DIDWallet.create_new_did_wallet( + wallet_state_manager, + main_wallet, + int(request["amount"]), + backup_dids, + uint64(num_needed), + ) my_did = did_wallet.get_my_DID() return { "success": True, @@ -400,9 +409,10 @@ async def create_new_wallet(self, request: Dict): "wallet_id": did_wallet.id(), } elif request["did_type"] == "recovery": - did_wallet = await DIDWallet.create_new_did_wallet_from_recovery( - wallet_state_manager, main_wallet, request["filename"] - ) + async with self.service.wallet_state_manager.lock: + did_wallet = await DIDWallet.create_new_did_wallet_from_recovery( + wallet_state_manager, main_wallet, request["filename"] + ) assert did_wallet.did_info.temp_coin is not None assert did_wallet.did_info.temp_puzhash is not None assert did_wallet.did_info.temp_pubkey is not None @@ -542,7 +552,7 @@ async def send_transaction(self, request): fee = uint64(request["fee"]) else: fee = uint64(0) - async with self.service.wallet_state_manager.tx_lock: + async with self.service.wallet_state_manager.lock: tx: TransactionRecord = await wallet.generate_signed_transaction(amount, puzzle_hash, fee) await wallet.push_transaction(tx) @@ -594,7 +604,7 @@ async def cc_spend(self, request): fee = uint64(request["fee"]) else: fee = uint64(0) - async with self.service.wallet_state_manager.tx_lock: + async with self.service.wallet_state_manager.lock: tx: TransactionRecord = await wallet.generate_signed_transaction([amount], [puzzle_hash], fee) await wallet.push_transaction(tx) @@ -615,11 +625,12 @@ async def create_offer_for_ids(self, request): offer = request["ids"] file_name = request["filename"] - ( - success, - spend_bundle, - error, - ) = await self.service.wallet_state_manager.trade_manager.create_offer_for_ids(offer, file_name) + async with self.service.wallet_state_manager.lock: + ( + success, + spend_bundle, + error, + ) = await self.service.wallet_state_manager.trade_manager.create_offer_for_ids(offer, file_name) if success: self.service.wallet_state_manager.trade_manager.write_offer_to_disk(Path(file_name), spend_bundle) return {} @@ -629,11 +640,12 @@ async def get_discrepancies_for_offer(self, request): assert self.service.wallet_state_manager is not None file_name = request["filename"] file_path = Path(file_name) - ( - success, - discrepancies, - error, - ) = await self.service.wallet_state_manager.trade_manager.get_discrepancies_for_offer(file_path) + async with self.service.wallet_state_manager.lock: + ( + success, + discrepancies, + error, + ) = await self.service.wallet_state_manager.trade_manager.get_discrepancies_for_offer(file_path) if success: return {"discrepancies": discrepancies} @@ -642,11 +654,12 @@ async def get_discrepancies_for_offer(self, request): async def respond_to_offer(self, request): assert self.service.wallet_state_manager is not None file_path = Path(request["filename"]) - ( - success, - trade_record, - error, - ) = await self.service.wallet_state_manager.trade_manager.respond_to_offer(file_path) + async with self.service.wallet_state_manager.lock: + ( + success, + trade_record, + error, + ) = await self.service.wallet_state_manager.trade_manager.respond_to_offer(file_path) if not success: raise ValueError(error) return {} @@ -683,10 +696,11 @@ async def cancel_trade(self, request: Dict): secure = request["secure"] trade_id = hexstr_to_bytes(request["trade_id"]) - if secure: - await wsm.trade_manager.cancel_pending_offer_safely(trade_id) - else: - await wsm.trade_manager.cancel_pending_offer(trade_id) + async with self.service.wallet_state_manager.lock: + if secure: + await wsm.trade_manager.cancel_pending_offer_safely(trade_id) + else: + await wsm.trade_manager.cancel_pending_offer(trade_id) return {} async def get_backup_info(self, request: Dict): @@ -725,18 +739,20 @@ async def did_update_recovery_ids(self, request): new_amount_verifications_required = uint64(request["num_verifications_required"]) else: new_amount_verifications_required = len(recovery_list) - success = await wallet.update_recovery_list(recovery_list, new_amount_verifications_required) - # Update coin with new ID info - updated_puz = await wallet.get_new_puzzle() - spend_bundle = await wallet.create_spend(updated_puz.get_tree_hash()) + async with self.service.wallet_state_manager.lock: + success = await wallet.update_recovery_list(recovery_list, new_amount_verifications_required) + # Update coin with new ID info + updated_puz = await wallet.get_new_puzzle() + spend_bundle = await wallet.create_spend(updated_puz.get_tree_hash()) if spend_bundle is not None and success: return {"success": True} return {"success": False} async def did_spend(self, request): wallet_id = int(request["wallet_id"]) - wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id] - spend_bundle = await wallet.create_spend(request["puzzlehash"]) + async with self.service.wallet_state_manager.lock: + wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id] + spend_bundle = await wallet.create_spend(request["puzzlehash"]) if spend_bundle is not None: return {"success": True} return {"success": False} @@ -745,7 +761,8 @@ async def did_get_did(self, request): wallet_id = int(request["wallet_id"]) wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id] my_did: str = wallet.get_my_DID() - coins = await wallet.select_coins(1) + async with self.service.wallet_state_manager.lock: + coins = await wallet.select_coins(1) if coins is None or coins == set(): return {"success": True, "wallet_id": wallet_id, "my_did": my_did} else: @@ -772,30 +789,31 @@ async def did_recovery_spend(self, request): if len(request["attest_filenames"]) < wallet.did_info.num_of_backup_ids_needed: return {"success": False, "reason": "insufficient messages"} - ( - info_list, - message_spend_bundle, - ) = await wallet.load_attest_files_for_recovery_spend(request["attest_filenames"]) - - if "pubkey" in request: - pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"])) - else: - assert wallet.did_info.temp_pubkey is not None - pubkey = wallet.did_info.temp_pubkey - - if "puzhash" in request: - puzhash = hexstr_to_bytes(request["puzhash"]) - else: - assert wallet.did_info.temp_puzhash is not None - puzhash = wallet.did_info.temp_puzhash - - success = await wallet.recovery_spend( - wallet.did_info.temp_coin, - puzhash, - info_list, - pubkey, - message_spend_bundle, - ) + async with self.service.wallet_state_manager.lock: + ( + info_list, + message_spend_bundle, + ) = await wallet.load_attest_files_for_recovery_spend(request["attest_filenames"]) + + if "pubkey" in request: + pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"])) + else: + assert wallet.did_info.temp_pubkey is not None + pubkey = wallet.did_info.temp_pubkey + + if "puzhash" in request: + puzhash = hexstr_to_bytes(request["puzhash"]) + else: + assert wallet.did_info.temp_puzhash is not None + puzhash = wallet.did_info.temp_puzhash + + success = await wallet.recovery_spend( + wallet.did_info.temp_coin, + puzhash, + info_list, + pubkey, + message_spend_bundle, + ) return {"success": success} async def did_get_pubkey(self, request): @@ -807,12 +825,13 @@ async def did_get_pubkey(self, request): async def did_create_attest(self, request): wallet_id = int(request["wallet_id"]) wallet: DIDWallet = self.service.wallet_state_manager.wallets[wallet_id] - info = await wallet.get_info_for_recovery() - coin = hexstr_to_bytes(request["coin_name"]) - pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"])) - spend_bundle = await wallet.create_attestment( - coin, hexstr_to_bytes(request["puzhash"]), pubkey, request["filename"] - ) + async with self.service.wallet_state_manager.lock: + info = await wallet.get_info_for_recovery() + coin = hexstr_to_bytes(request["coin_name"]) + pubkey = G1Element.from_bytes(hexstr_to_bytes(request["pubkey"])) + spend_bundle = await wallet.create_attestment( + coin, hexstr_to_bytes(request["puzhash"]), pubkey, request["filename"] + ) if spend_bundle is not None: return { "success": True, @@ -856,14 +875,15 @@ async def rl_set_user_info(self, request): wallet_id = uint32(int(request["wallet_id"])) rl_user = self.service.wallet_state_manager.wallets[wallet_id] origin = request["origin"] - await rl_user.set_user_info( - uint64(request["interval"]), - uint64(request["limit"]), - origin["parent_coin_info"], - origin["puzzle_hash"], - origin["amount"], - request["admin_pubkey"], - ) + async with self.service.wallet_state_manager.lock: + await rl_user.set_user_info( + uint64(request["interval"]), + uint64(request["limit"]), + origin["parent_coin_info"], + origin["puzzle_hash"], + origin["amount"], + request["admin_pubkey"], + ) return {} async def send_clawback_transaction(self, request): @@ -873,7 +893,7 @@ async def send_clawback_transaction(self, request): wallet: RLWallet = self.service.wallet_state_manager.wallets[wallet_id] fee = int(request["fee"]) - async with self.service.wallet_state_manager.tx_lock: + async with self.service.wallet_state_manager.lock: tx = await wallet.clawback_rl_coin_transaction(fee) await wallet.push_transaction(tx) @@ -889,7 +909,8 @@ async def add_rate_limited_funds(self, request): puzzle_hash = wallet.rl_get_aggregation_puzzlehash(wallet.rl_info.rl_puzzle_hash) request["wallet_id"] = 1 request["puzzle_hash"] = puzzle_hash - await wallet.rl_add_funds(request["amount"], puzzle_hash, request["fee"]) + async with self.service.wallet_state_manager.lock: + await wallet.rl_add_funds(request["amount"], puzzle_hash, request["fee"]) return {"status": "SUCCESS"} async def get_farmed_amount(self, request): @@ -948,7 +969,8 @@ async def create_signed_transaction(self, request): if "coins" in request and len(request["coins"]) > 0: coins = set([Coin.from_json_dict(coin_json) for coin_json in request["coins"]]) - signed_tx = await self.service.wallet_state_manager.main_wallet.generate_signed_transaction( - amount_0, puzzle_hash_0, fee, coins=coins, ignore_max_send_amount=True, primaries=additional_outputs - ) + async with self.service.wallet_state_manager.lock: + signed_tx = await self.service.wallet_state_manager.main_wallet.generate_signed_transaction( + amount_0, puzzle_hash_0, fee, coins=coins, ignore_max_send_amount=True, primaries=additional_outputs + ) return {"signed_tx": signed_tx} diff --git a/chia/wallet/cc_wallet/cc_wallet.py b/chia/wallet/cc_wallet/cc_wallet.py index bb5ac1dfbb1a..ab27244b15a7 100644 --- a/chia/wallet/cc_wallet/cc_wallet.py +++ b/chia/wallet/cc_wallet/cc_wallet.py @@ -281,7 +281,7 @@ def get_colour(self) -> str: return bytes(self.cc_info.my_genesis_checker).hex() async def coin_added(self, coin: Coin, header_hash: bytes32, removals: List[Coin], height: uint32): - """ Notification from wallet state manager that wallet has been received. """ + """Notification from wallet state manager that wallet has been received.""" self.log.info(f"CC wallet has been notified that {coin} was added") search_for_parent: bool = True @@ -493,47 +493,50 @@ async def get_cc_spendable_coins(self, records=None) -> List[WalletCoinRecord]: return result async def select_coins(self, amount: uint64) -> Set[Coin]: - """ Returns a set of coins that can be used for generating a new transaction. """ - async with self.wallet_state_manager.lock: - spendable_am = await self.get_confirmed_balance() + """ + Returns a set of coins that can be used for generating a new transaction. + Note: Must be called under wallet state manager lock + """ - if amount > spendable_am: - error_msg = f"Can't select amount higher than our spendable balance {amount}, spendable {spendable_am}" - self.log.warning(error_msg) - raise ValueError(error_msg) + spendable_am = await self.get_confirmed_balance() - self.log.info(f"About to select coins for amount {amount}") - spendable: List[WalletCoinRecord] = await self.get_cc_spendable_coins() + if amount > spendable_am: + error_msg = f"Can't select amount higher than our spendable balance {amount}, spendable {spendable_am}" + self.log.warning(error_msg) + raise ValueError(error_msg) - sum = 0 - used_coins: Set = set() + self.log.info(f"About to select coins for amount {amount}") + spendable: List[WalletCoinRecord] = await self.get_cc_spendable_coins() - # Use older coins first - spendable.sort(key=lambda r: r.confirmed_block_height) + sum = 0 + used_coins: Set = set() - # Try to use coins from the store, if there isn't enough of "unused" - # coins use change coins that are not confirmed yet - unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet( - self.id() + # Use older coins first + spendable.sort(key=lambda r: r.confirmed_block_height) + + # Try to use coins from the store, if there isn't enough of "unused" + # coins use change coins that are not confirmed yet + unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet( + self.id() + ) + for coinrecord in spendable: + if sum >= amount and len(used_coins) > 0: + break + if coinrecord.coin.name() in unconfirmed_removals: + continue + sum += coinrecord.coin.amount + used_coins.add(coinrecord.coin) + self.log.info(f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!") + + # This happens when we couldn't use one of the coins because it's already used + # but unconfirmed, and we are waiting for the change. (unconfirmed_additions) + if sum < amount: + raise ValueError( + "Can't make this transaction at the moment. Waiting for the change from the previous transaction." ) - for coinrecord in spendable: - if sum >= amount and len(used_coins) > 0: - break - if coinrecord.coin.name() in unconfirmed_removals: - continue - sum += coinrecord.coin.amount - used_coins.add(coinrecord.coin) - self.log.info(f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!") - - # This happens when we couldn't use one of the coins because it's already used - # but unconfirmed, and we are waiting for the change. (unconfirmed_additions) - if sum < amount: - raise ValueError( - "Can't make this transaction at the moment. Waiting for the change from the previous transaction." - ) - self.log.info(f"Successfully selected coins: {used_coins}") - return used_coins + self.log.info(f"Successfully selected coins: {used_coins}") + return used_coins async def get_sigs(self, innerpuz: Program, innersol: Program, coin_name: bytes32) -> List[G2Element]: puzzle_hash = innerpuz.get_tree_hash() diff --git a/chia/wallet/did_wallet/did_wallet.py b/chia/wallet/did_wallet/did_wallet.py index befaca8711fd..f1942602515b 100644 --- a/chia/wallet/did_wallet/did_wallet.py +++ b/chia/wallet/did_wallet/did_wallet.py @@ -49,6 +49,9 @@ async def create_new_did_wallet( num_of_backup_ids_needed: uint64 = None, name: str = None, ): + """ + This must be called under the wallet state manager lock + """ self = DIDWallet() self.base_puzzle_program = None self.base_inner_puzzle_hash = None @@ -238,54 +241,53 @@ async def get_unconfirmed_balance(self, record_list=None) -> uint64: return uint64(result) async def select_coins(self, amount, exclude: List[Coin] = None) -> Optional[Set[Coin]]: - """ Returns a set of coins that can be used for generating a new transaction. """ - async with self.wallet_state_manager.lock: - if exclude is None: - exclude = [] - - spendable_amount = await self.get_spendable_balance() - if amount > spendable_amount: - self.log.warning(f"Can't select {amount}, from spendable {spendable_amount} for wallet id {self.id()}") - return None - - self.log.info(f"About to select coins for amount {amount}") - unspent: List[WalletCoinRecord] = list( - await self.wallet_state_manager.get_spendable_coins_for_wallet(self.wallet_info.id) - ) - sum_value = 0 - used_coins: Set = set() + """Returns a set of coins that can be used for generating a new transaction.""" + if exclude is None: + exclude = [] - # Use older coins first - unspent.sort(key=lambda r: r.confirmed_block_height) + spendable_amount = await self.get_spendable_balance() + if amount > spendable_amount: + self.log.warning(f"Can't select {amount}, from spendable {spendable_amount} for wallet id {self.id()}") + return None - # Try to use coins from the store, if there isn't enough of "unused" - # coins use change coins that are not confirmed yet - unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet( - self.wallet_info.id - ) - for coinrecord in unspent: - if sum_value >= amount and len(used_coins) > 0: - break - if coinrecord.coin.name() in unconfirmed_removals: - continue - if coinrecord.coin in exclude: - continue - sum_value += coinrecord.coin.amount - used_coins.add(coinrecord.coin) + self.log.info(f"About to select coins for amount {amount}") + unspent: List[WalletCoinRecord] = list( + await self.wallet_state_manager.get_spendable_coins_for_wallet(self.wallet_info.id) + ) + sum_value = 0 + used_coins: Set = set() - # This happens when we couldn't use one of the coins because it's already used - # but unconfirmed, and we are waiting for the change. (unconfirmed_additions) - if sum_value < amount: - raise ValueError( - "Can't make this transaction at the moment. Waiting for the change from the previous transaction." - ) + # Use older coins first + unspent.sort(key=lambda r: r.confirmed_block_height) + + # Try to use coins from the store, if there isn't enough of "unused" + # coins use change coins that are not confirmed yet + unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet( + self.wallet_info.id + ) + for coinrecord in unspent: + if sum_value >= amount and len(used_coins) > 0: + break + if coinrecord.coin.name() in unconfirmed_removals: + continue + if coinrecord.coin in exclude: + continue + sum_value += coinrecord.coin.amount + used_coins.add(coinrecord.coin) + + # This happens when we couldn't use one of the coins because it's already used + # but unconfirmed, and we are waiting for the change. (unconfirmed_additions) + if sum_value < amount: + raise ValueError( + "Can't make this transaction at the moment. Waiting for the change from the previous transaction." + ) self.log.info(f"Successfully selected coins: {used_coins}") return used_coins # This will be used in the recovery case where we don't have the parent info already async def coin_added(self, coin: Coin, header_hash: bytes32, removals: List[Coin], height: int): - """ Notification from wallet state manager that wallet has been received. """ + """Notification from wallet state manager that wallet has been received.""" self.log.info("DID wallet has been notified that coin was added") inner_puzzle = await self.inner_puzzle_for_did_puzzle(coin.puzzle_hash) new_info = DIDInfo( @@ -752,6 +754,9 @@ async def get_parent_for_coin(self, coin) -> Optional[CCParent]: return parent_info async def generate_new_decentralised_id(self, amount: uint64) -> Optional[SpendBundle]: + """ + This must be called under the wallet state manager lock + """ coins = await self.standard_wallet.select_coins(amount) if coins is None: diff --git a/chia/wallet/wallet.py b/chia/wallet/wallet.py index 2d45ddd078e5..aafcbc8edfb0 100644 --- a/chia/wallet/wallet.py +++ b/chia/wallet/wallet.py @@ -127,6 +127,7 @@ async def get_pending_change_balance(self) -> uint64: for record in unconfirmed_tx: if not record.is_in_mempool(): + self.log.warning(f"Record: {record} not in mempool") continue our_spend = False for coin in record.removals: @@ -229,60 +230,60 @@ def make_solution( return solution_for_conditions(condition_list) async def select_coins(self, amount, exclude: List[Coin] = None) -> Set[Coin]: - """Returns a set of coins that can be used for generating a new transaction.""" - async with self.wallet_state_manager.lock: - if exclude is None: - exclude = [] + """ + Returns a set of coins that can be used for generating a new transaction. + Note: This must be called under a wallet state manager lock + """ + if exclude is None: + exclude = [] - spendable_amount = await self.get_spendable_balance() + spendable_amount = await self.get_spendable_balance() - if amount > spendable_amount: - error_msg = ( - f"Can't select amount higher than our spendable balance. Amount: {amount}, spendable: " - f" {spendable_amount}" - ) - self.log.warning(error_msg) - raise ValueError(error_msg) - - self.log.info(f"About to select coins for amount {amount}") - unspent: List[WalletCoinRecord] = list( - await self.wallet_state_manager.get_spendable_coins_for_wallet(self.id()) + if amount > spendable_amount: + error_msg = ( + f"Can't select amount higher than our spendable balance. Amount: {amount}, spendable: " + f" {spendable_amount}" ) - sum_value = 0 - used_coins: Set = set() + self.log.warning(error_msg) + raise ValueError(error_msg) - # Use older coins first - unspent.sort(reverse=True, key=lambda r: r.coin.amount) + self.log.info(f"About to select coins for amount {amount}") + unspent: List[WalletCoinRecord] = list( + await self.wallet_state_manager.get_spendable_coins_for_wallet(self.id()) + ) + sum_value = 0 + used_coins: Set = set() - # Try to use coins from the store, if there isn't enough of "unused" - # coins use change coins that are not confirmed yet - unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet( - self.id() - ) - for coinrecord in unspent: - if sum_value >= amount and len(used_coins) > 0: - break - if coinrecord.coin.name() in unconfirmed_removals: - continue - if coinrecord.coin in exclude: - continue - sum_value += coinrecord.coin.amount - used_coins.add(coinrecord.coin) - self.log.debug( - f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!" - ) + # Use older coins first + unspent.sort(reverse=True, key=lambda r: r.coin.amount) - # This happens when we couldn't use one of the coins because it's already used - # but unconfirmed, and we are waiting for the change. (unconfirmed_additions) - if sum_value < amount: - raise ValueError( - "Can't make this transaction at the moment. Waiting for the change from the previous transaction." - ) + # Try to use coins from the store, if there isn't enough of "unused" + # coins use change coins that are not confirmed yet + unconfirmed_removals: Dict[bytes32, Coin] = await self.wallet_state_manager.unconfirmed_removals_for_wallet( + self.id() + ) + for coinrecord in unspent: + if sum_value >= amount and len(used_coins) > 0: + break + if coinrecord.coin.name() in unconfirmed_removals: + continue + if coinrecord.coin in exclude: + continue + sum_value += coinrecord.coin.amount + used_coins.add(coinrecord.coin) + self.log.debug(f"Selected coin: {coinrecord.coin.name()} at height {coinrecord.confirmed_block_height}!") + + # This happens when we couldn't use one of the coins because it's already used + # but unconfirmed, and we are waiting for the change. (unconfirmed_additions) + if sum_value < amount: + raise ValueError( + "Can't make this transaction at the moment. Waiting for the change from the previous transaction." + ) self.log.debug(f"Successfully selected coins: {used_coins}") return used_coins - async def generate_unsigned_transaction( + async def _generate_unsigned_transaction( self, amount: uint64, newpuzzlehash: bytes32, @@ -294,6 +295,7 @@ async def generate_unsigned_transaction( ) -> List[CoinSolution]: """ Generates a unsigned transaction in form of List(Puzzle, Solutions) + Note: this must be called under a wallet state manager lock """ if primaries_input is None: primaries: Optional[List[Dict]] = None @@ -377,13 +379,16 @@ async def generate_signed_transaction( primaries: Optional[List[Dict[str, bytes32]]] = None, ignore_max_send_amount: bool = False, ) -> TransactionRecord: - """Use this to generate transaction.""" + """ + Use this to generate transaction. + Note: this must be called under a wallet state manager lock + """ if primaries is None: non_change_amount = amount else: non_change_amount = uint64(amount + sum(p["amount"] for p in primaries)) - transaction = await self.generate_unsigned_transaction( + transaction = await self._generate_unsigned_transaction( amount, puzzle_hash, fee, origin_id, coins, primaries, ignore_max_send_amount ) assert len(transaction) > 0 diff --git a/chia/wallet/wallet_state_manager.py b/chia/wallet/wallet_state_manager.py index 53931353178d..34eeb83c90fb 100644 --- a/chia/wallet/wallet_state_manager.py +++ b/chia/wallet/wallet_state_manager.py @@ -121,7 +121,6 @@ async def create( else: self.log = logging.getLogger(__name__) self.lock = asyncio.Lock() - self.tx_lock = asyncio.Lock() self.log.debug(f"Starting in db path: {db_path}") self.db_connection = await aiosqlite.connect(db_path) @@ -533,12 +532,13 @@ async def unconfirmed_removals_for_wallet(self, wallet_id: int) -> Dict[bytes32, return removals async def coins_of_interest_received(self, removals: List[Coin], additions: List[Coin], height: uint32): - for coin in additions: - await self.puzzle_hash_created(coin) - trade_additions = await self.coins_of_interest_added(additions, height) - trade_removals = await self.coins_of_interest_removed(removals, height) - if len(trade_additions) > 0 or len(trade_removals) > 0: - await self.trade_manager.coins_of_interest_farmed(trade_removals, trade_additions, height) + async with self.lock: + for coin in additions: + await self.puzzle_hash_created(coin) + trade_additions = await self.coins_of_interest_added(additions, height) + trade_removals = await self.coins_of_interest_removed(removals, height) + if len(trade_additions) > 0 or len(trade_removals) > 0: + await self.trade_manager.coins_of_interest_farmed(trade_removals, trade_additions, height) async def coins_of_interest_added(self, coins: List[Coin], height: uint32) -> List[Coin]: ( @@ -785,7 +785,7 @@ async def get_transaction(self, tx_id: bytes32) -> Optional[TransactionRecord]: async def get_filter_additions_removals( self, new_block: HeaderBlock, transactions_filter: bytes, fork_point_with_peak: Optional[uint32] ) -> Tuple[List[bytes32], List[bytes32]]: - """ Returns a list of our coin ids, and a list of puzzle_hashes that positively match with provided filter. """ + """Returns a list of our coin ids, and a list of puzzle_hashes that positively match with provided filter.""" # assert new_block.prev_header_hash in self.blockchain.blocks tx_filter = PyBIP158([b for b in transactions_filter]) @@ -863,7 +863,7 @@ async def get_filter_additions_removals( return additions_of_interest, removals_of_interest async def get_relevant_additions(self, additions: List[Coin]) -> List[Coin]: - """ Returns the list of coins that are relevant to us.(We can spend them) """ + """Returns the list of coins that are relevant to us.(We can spend them)""" result: List[Coin] = [] my_puzzle_hashes: Set[bytes32] = self.puzzle_store.all_puzzle_hashes @@ -891,7 +891,7 @@ async def get_wallet_for_coin(self, coin_id: bytes32) -> Any: return wallet async def get_relevant_removals(self, removals: List[Coin]) -> List[Coin]: - """ Returns a list of our unspent coins that are in the passed list. """ + """Returns a list of our unspent coins that are in the passed list.""" result: List[Coin] = [] wallet_coin_records = await self.coin_store.get_unspent_coins_at_height() From 33817111708517e1f63895e2f24554fa12a5def4 Mon Sep 17 00:00:00 2001 From: Mariano Date: Sat, 1 May 2021 19:59:41 +0900 Subject: [PATCH 2/5] Improve CLI significantly, and fix self-tx balances --- chia/cmds/wallet_funcs.py | 26 ++++++++++---------------- chia/rpc/wallet_rpc_client.py | 3 +++ chia/wallet/wallet_state_manager.py | 16 ++++++++++------ 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/chia/cmds/wallet_funcs.py b/chia/cmds/wallet_funcs.py index 0d596bd193e5..7b4b4fcb97d2 100644 --- a/chia/cmds/wallet_funcs.py +++ b/chia/cmds/wallet_funcs.py @@ -104,6 +104,7 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint address_prefix = config["network_overrides"]["config"][config["selected_network"]]["address_prefix"] print(f"Wallet height: {await wallet_client.get_height_info()}") + print(f"Sync status: {'Synced' if (await wallet_client.get_synced()) else 'Not synced'}") print(f"Balances, fingerprint: {fingerprint}") for summary in summaries_response: wallet_id = summary["id"] @@ -111,29 +112,22 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint typ = WalletType(int(summary["type"])).name if typ != "STANDARD_WALLET": print(f"Wallet ID {wallet_id} type {typ} {summary['name']}") - print(f" -Confirmed: " f"{balances['confirmed_wallet_balance']/units['colouredcoin']}") - print( - f" -Confirmed minus Pending Outgoing: {balances['unconfirmed_wallet_balance']/units['colouredcoin']}" - ) - print(f" -Spendable: {balances['spendable_balance']/units['colouredcoin']}") - print(f" -Pending change: {balances['pending_change']/units['colouredcoin']}") + print(f" -Total Balance: " f"{balances['confirmed_wallet_balance']/units['colouredcoin']}") + print(f" -Pending Total Balance: {balances['unconfirmed_wallet_balance']/units['colouredcoin']}") + print(f" -Spendable Balance: {balances['spendable_balance']/units['colouredcoin']}") else: print(f"Wallet ID {wallet_id} type {typ}") print( - f" -Confirmed: {balances['confirmed_wallet_balance']} mojo " - f"({balances['confirmed_wallet_balance']/units['chia']} {address_prefix})" - ) - print( - f" -Confirmed - Pending Outgoing:: {balances['unconfirmed_wallet_balance']} mojo " - f"({balances['unconfirmed_wallet_balance']/units['chia']} {address_prefix})" + f" -Total Balance: {balances['confirmed_wallet_balance']/units['chia']} {address_prefix} " + f"({balances['confirmed_wallet_balance']} mojo)" ) print( - f" -Spendable: {balances['spendable_balance']} mojo " - f"({balances['spendable_balance']/units['chia']} {address_prefix})" + f" -Pending Total Balance: {balances['unconfirmed_wallet_balance']/units['chia']} {address_prefix} " + f"({balances['unconfirmed_wallet_balance']} mojo)" ) print( - f" -Pending change: {balances['pending_change']} mojo " - f"({balances['pending_change']/units['chia']} {address_prefix})" + f" -Spendable: {balances['spendable_balance']/units['chia']} {address_prefix} " + f"({balances['spendable_balance']} mojo)" ) diff --git a/chia/rpc/wallet_rpc_client.py b/chia/rpc/wallet_rpc_client.py index fccce89a1e09..6b3e40ee6c20 100644 --- a/chia/rpc/wallet_rpc_client.py +++ b/chia/rpc/wallet_rpc_client.py @@ -74,6 +74,9 @@ async def delete_all_keys(self) -> None: async def get_sync_status(self) -> bool: return (await self.fetch("get_sync_status", {}))["syncing"] + async def get_synced(self) -> bool: + return (await self.fetch("get_sync_status", {}))["synced"] + async def get_height_info(self) -> uint32: return (await self.fetch("get_height_info", {}))["height"] diff --git a/chia/wallet/wallet_state_manager.py b/chia/wallet/wallet_state_manager.py index 34eeb83c90fb..db0fa956af33 100644 --- a/chia/wallet/wallet_state_manager.py +++ b/chia/wallet/wallet_state_manager.py @@ -497,14 +497,18 @@ async def get_unconfirmed_balance( """ confirmed = await self.get_confirmed_balance_for_wallet(wallet_id, unspent_coin_records) unconfirmed_tx: List[TransactionRecord] = await self.tx_store.get_unconfirmed_for_wallet(wallet_id) - removal_amount = 0 + removal_amount: int = 0 + addition_amount: int = 0 for record in unconfirmed_tx: - - removal_amount += record.amount - removal_amount += record.fee_amount - - result = confirmed - removal_amount + for removal in record.removals: + removal_amount += removal.amount + for addition in record.additions: + # This change or a self transaction + if self.does_coin_belong_to_wallet(addition, wallet_id): + addition_amount += addition.amount + + result = confirmed - removal_amount + addition_amount return uint128(result) async def unconfirmed_additions_for_wallet(self, wallet_id: int) -> Dict[bytes32, Coin]: From 51cc82e94e3404e4179e845833b00655c1e4e8b5 Mon Sep 17 00:00:00 2001 From: Mariano Date: Sat, 1 May 2021 20:30:03 +0900 Subject: [PATCH 3/5] Fix await --- chia/wallet/wallet_state_manager.py | 2 +- spam.sh | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 spam.sh diff --git a/chia/wallet/wallet_state_manager.py b/chia/wallet/wallet_state_manager.py index db0fa956af33..0fd66881386b 100644 --- a/chia/wallet/wallet_state_manager.py +++ b/chia/wallet/wallet_state_manager.py @@ -505,7 +505,7 @@ async def get_unconfirmed_balance( removal_amount += removal.amount for addition in record.additions: # This change or a self transaction - if self.does_coin_belong_to_wallet(addition, wallet_id): + if await self.does_coin_belong_to_wallet(addition, wallet_id): addition_amount += addition.amount result = confirmed - removal_amount + addition_amount diff --git a/spam.sh b/spam.sh new file mode 100644 index 000000000000..ada60b708c8b --- /dev/null +++ b/spam.sh @@ -0,0 +1,24 @@ + +# Let's countdown before sending a transaction +for ((count=$SEND_QTY; count>0; count--)); do + # Transaction fees + RANDOMISE_FEE="`seq $MIN_FEE 0.01 $MAX_FEE | sort -R | head -n 1`" + printf "Transaction fee: $RANDOMISE_FEE\n" + + # Amount to send + AMOUNT="`seq $MIN_AMOUNT 0.01 $MAX_AMOUNT | sort -R | head -n 1`" + printf "Amount to send: $AMOUNT\n" + + # Send transaction to a random wallet! + WALLET_ADDRESS="`sort -R $WALLETS | head -n 1`" + + #printf "chia wallet send -t $WALLET_ADDRESS -m $RANDOMISE_FEE -a $AMOUNT\n" + chia wallet send -t $WALLET_ADDRESS -m $RANDOMISE_FEE -a $AMOUNT + + # Let's sleep for a moment before the next transaction + TIME_LAG="`seq 0 1 300 | sort -R | head -n 1`" + printf "Waiting time: $TIME_LAG seconds\n" + sleep $TIME_LAG + + printf "\n\n"; +done \ No newline at end of file From a3446b46136b6be19d3b8734c481ec2d53b599ce Mon Sep 17 00:00:00 2001 From: Mariano Date: Sat, 1 May 2021 23:50:38 +0900 Subject: [PATCH 4/5] Fix deadlock and test --- chia/cmds/wallet_funcs.py | 9 +++---- chia/wallet/wallet_blockchain.py | 42 ++++++++++++++++------------- chia/wallet/wallet_state_manager.py | 14 +++++----- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/chia/cmds/wallet_funcs.py b/chia/cmds/wallet_funcs.py index 7b4b4fcb97d2..48912a874f28 100644 --- a/chia/cmds/wallet_funcs.py +++ b/chia/cmds/wallet_funcs.py @@ -132,14 +132,13 @@ async def print_balances(args: dict, wallet_client: WalletRpcClient, fingerprint async def get_wallet(wallet_client: WalletRpcClient, fingerprint: int = None) -> Optional[Tuple[WalletRpcClient, int]]: - fingerprints = await wallet_client.get_public_keys() + if fingerprint is not None: + fingerprints = [fingerprint] + else: + fingerprints = await wallet_client.get_public_keys() if len(fingerprints) == 0: print("No keys loaded. Run 'chia keys generate' or import a key") return None - if fingerprint is not None: - if fingerprint not in fingerprints: - print(f"Fingerprint {fingerprint} does not exist") - return None if len(fingerprints) == 1: fingerprint = fingerprints[0] if fingerprint is not None: diff --git a/chia/wallet/wallet_blockchain.py b/chia/wallet/wallet_blockchain.py index ca3e96109157..7bbea36460ab 100644 --- a/chia/wallet/wallet_blockchain.py +++ b/chia/wallet/wallet_blockchain.py @@ -65,6 +65,7 @@ class WalletBlockchain(BlockchainInterface): coins_of_interest_received: Any reorg_rollback: Any + wallet_state_manager_lock: asyncio.Lock # Whether blockchain is shut down or not _shut_down: bool @@ -79,6 +80,7 @@ async def create( consensus_constants: ConsensusConstants, coins_of_interest_received: Callable, # f(removals: List[Coin], additions: List[Coin], height: uint32) reorg_rollback: Callable, + lock: asyncio.Lock, ): """ Initializes a blockchain with the BlockRecords from disk, assuming they have all been @@ -100,6 +102,7 @@ async def create( self.coins_of_interest_received = coins_of_interest_received self.reorg_rollback = reorg_rollback self.log = logging.getLogger(__name__) + self.wallet_state_manager_lock = lock await self._load_chain_from_store() return self @@ -211,24 +214,27 @@ async def receive_block( ) # Always add the block to the database - async with self.block_store.db_wrapper.lock: - try: - await self.block_store.db_wrapper.begin_transaction() - await self.block_store.add_block_record(header_block_record, block_record) - self.add_block_record(block_record) - self.clean_block_record(block_record.height - self.constants.BLOCKS_CACHE_SIZE) - - fork_height: Optional[uint32] = await self._reconsider_peak(block_record, genesis, fork_point_with_peak) - await self.block_store.db_wrapper.commit_transaction() - except BaseException as e: - self.log.error(f"Error during db transaction: {e}") - await self.block_store.db_wrapper.rollback_transaction() - raise - if fork_height is not None: - self.log.info(f"💰 Updated wallet peak to height {block_record.height}, weight {block_record.weight}, ") - return ReceiveBlockResult.NEW_PEAK, None, fork_height - else: - return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None + async with self.wallet_state_manager_lock: + async with self.block_store.db_wrapper.lock: + try: + await self.block_store.db_wrapper.begin_transaction() + await self.block_store.add_block_record(header_block_record, block_record) + self.add_block_record(block_record) + self.clean_block_record(block_record.height - self.constants.BLOCKS_CACHE_SIZE) + + fork_height: Optional[uint32] = await self._reconsider_peak( + block_record, genesis, fork_point_with_peak + ) + await self.block_store.db_wrapper.commit_transaction() + except BaseException as e: + self.log.error(f"Error during db transaction: {e}") + await self.block_store.db_wrapper.rollback_transaction() + raise + if fork_height is not None: + self.log.info(f"💰 Updated wallet peak to height {block_record.height}, weight {block_record.weight}, ") + return ReceiveBlockResult.NEW_PEAK, None, fork_height + else: + return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None async def _reconsider_peak( self, block_record: BlockRecord, genesis: bool, fork_point_with_peak: Optional[uint32] diff --git a/chia/wallet/wallet_state_manager.py b/chia/wallet/wallet_state_manager.py index 0fd66881386b..185e4f4f2c7b 100644 --- a/chia/wallet/wallet_state_manager.py +++ b/chia/wallet/wallet_state_manager.py @@ -140,6 +140,7 @@ async def create( self.constants, self.coins_of_interest_received, self.reorg_rollback, + self.lock, ) self.weight_proof_handler = WeightProofHandler(self.constants, self.blockchain) @@ -536,13 +537,12 @@ async def unconfirmed_removals_for_wallet(self, wallet_id: int) -> Dict[bytes32, return removals async def coins_of_interest_received(self, removals: List[Coin], additions: List[Coin], height: uint32): - async with self.lock: - for coin in additions: - await self.puzzle_hash_created(coin) - trade_additions = await self.coins_of_interest_added(additions, height) - trade_removals = await self.coins_of_interest_removed(removals, height) - if len(trade_additions) > 0 or len(trade_removals) > 0: - await self.trade_manager.coins_of_interest_farmed(trade_removals, trade_additions, height) + for coin in additions: + await self.puzzle_hash_created(coin) + trade_additions = await self.coins_of_interest_added(additions, height) + trade_removals = await self.coins_of_interest_removed(removals, height) + if len(trade_additions) > 0 or len(trade_removals) > 0: + await self.trade_manager.coins_of_interest_farmed(trade_removals, trade_additions, height) async def coins_of_interest_added(self, coins: List[Coin], height: uint32) -> List[Coin]: ( From eaf200132cb93e9b210b452df69e0fa4781d7be8 Mon Sep 17 00:00:00 2001 From: Mariano Date: Sat, 1 May 2021 23:51:54 +0900 Subject: [PATCH 5/5] Remove spam.sh --- spam.sh | 24 ------------------------ 1 file changed, 24 deletions(-) delete mode 100644 spam.sh diff --git a/spam.sh b/spam.sh deleted file mode 100644 index ada60b708c8b..000000000000 --- a/spam.sh +++ /dev/null @@ -1,24 +0,0 @@ - -# Let's countdown before sending a transaction -for ((count=$SEND_QTY; count>0; count--)); do - # Transaction fees - RANDOMISE_FEE="`seq $MIN_FEE 0.01 $MAX_FEE | sort -R | head -n 1`" - printf "Transaction fee: $RANDOMISE_FEE\n" - - # Amount to send - AMOUNT="`seq $MIN_AMOUNT 0.01 $MAX_AMOUNT | sort -R | head -n 1`" - printf "Amount to send: $AMOUNT\n" - - # Send transaction to a random wallet! - WALLET_ADDRESS="`sort -R $WALLETS | head -n 1`" - - #printf "chia wallet send -t $WALLET_ADDRESS -m $RANDOMISE_FEE -a $AMOUNT\n" - chia wallet send -t $WALLET_ADDRESS -m $RANDOMISE_FEE -a $AMOUNT - - # Let's sleep for a moment before the next transaction - TIME_LAG="`seq 0 1 300 | sort -R | head -n 1`" - printf "Waiting time: $TIME_LAG seconds\n" - sleep $TIME_LAG - - printf "\n\n"; -done \ No newline at end of file