Skip to content

Commit

Permalink
Add lock, keep cache consistent (Chia-Network#3051)
Browse files Browse the repository at this point in the history
* execute task decorator

* use blockchain lock

* indentation

* lint

* execute_task
  • Loading branch information
Yostra committed Apr 29, 2021
1 parent 7cfc07d commit 5f9f631
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 20 deletions.
14 changes: 9 additions & 5 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def create(cls, db_wrapper: DBWrapper, cache_size: uint32 = uint32(60000))

await self.coin_record_db.execute("CREATE INDEX IF NOT EXISTS coin_spent on coin_record(spent)")

await self.coin_record_db.execute("CREATE INDEX IF NOT EXISTS coin_spent on coin_record(puzzle_hash)")
await self.coin_record_db.execute("CREATE INDEX IF NOT EXISTS coin_puzzle_hash on coin_record(puzzle_hash)")

await self.coin_record_db.commit()
self.coin_record_cache = LRUCache(cache_size)
Expand Down Expand Up @@ -106,15 +106,17 @@ async def new_block(self, block: FullBlock, tx_additions: List[Coin], tx_removal

# Checks DB and DiffStores for CoinRecord with coin_name and returns it
async def get_coin_record(self, coin_name: bytes32) -> Optional[CoinRecord]:
cached = self.coin_record_cache.get(coin_name.hex())
cached = self.coin_record_cache.get(coin_name)
if cached is not None:
return cached
cursor = await self.coin_record_db.execute("SELECT * from coin_record WHERE coin_name=?", (coin_name.hex(),))
row = await cursor.fetchone()
await cursor.close()
if row is not None:
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
return CoinRecord(coin, row[1], row[2], row[3], row[4], row[8])
record = CoinRecord(coin, row[1], row[2], row[3], row[4], row[8])
self.coin_record_cache.put(record.coin.name(), record)
return record
return None

async def get_coins_added_at_height(self, height: uint32) -> List[CoinRecord]:
Expand Down Expand Up @@ -205,7 +207,7 @@ async def rollback_to_block(self, block_index: int):
coin_record.coinbase,
coin_record.timestamp,
)
self.coin_record_cache.put(coin_record.coin.name().hex(), new_record)
self.coin_record_cache.put(coin_record.coin.name(), new_record)
if int(coin_record.confirmed_block_index) > block_index:
delete_queue.append(coin_name)

Expand All @@ -223,6 +225,9 @@ async def rollback_to_block(self, block_index: int):

# Store CoinRecord in DB and ram cache
async def _add_coin_record(self, record: CoinRecord, allow_replace: bool) -> None:
if self.coin_record_cache.get(record.coin.name()) is not None:
self.coin_record_cache.remove(record.coin.name())

cursor = await self.coin_record_db.execute(
f"INSERT {'OR REPLACE ' if allow_replace else ''}INTO coin_record VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
Expand All @@ -238,7 +243,6 @@ async def _add_coin_record(self, record: CoinRecord, allow_replace: bool) -> Non
),
)
await cursor.close()
self.coin_record_cache.put(record.coin.name().hex(), record)

# Update coin_record to be spent in DB
async def _set_spent(self, coin_name: bytes32, index: uint32) -> uint64:
Expand Down
3 changes: 2 additions & 1 deletion chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from chia.types.mempool_item import MempoolItem
from chia.types.peer_info import PeerInfo
from chia.types.unfinished_block import UnfinishedBlock
from chia.util.api_decorators import api_request, peer_required, bytes_required
from chia.util.api_decorators import api_request, peer_required, bytes_required, execute_task
from chia.util.generator_tools import get_block_header
from chia.util.hash import std_hash
from chia.util.ints import uint8, uint32, uint64, uint128
Expand Down Expand Up @@ -92,6 +92,7 @@ async def respond_peers_introducer(
await peer.close()
return None

@execute_task
@peer_required
@api_request
async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaConnection) -> Optional[Message]:
Expand Down
9 changes: 9 additions & 0 deletions chia/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ def __init__(
self.app_shut_down_task: Optional[asyncio.Task] = None
self.received_message_callback: Optional[Callable] = None
self.api_tasks: Dict[bytes32, asyncio.Task] = {}
self.execute_tasks: Set[bytes32] = set()

self.tasks_from_peer: Dict[bytes32, Set[bytes32]] = {}
self.banned_peers: Dict[str, float] = {}
self.invalid_protocol_ban_seconds = 10
Expand Down Expand Up @@ -465,6 +467,8 @@ def cancel_tasks_from_peer(self, peer_id: bytes32):

task_ids = self.tasks_from_peer[peer_id]
for task_id in task_ids:
if task_id in self.execute_tasks:
continue
task = self.api_tasks[task_id]
task.cancel()

Expand Down Expand Up @@ -501,6 +505,9 @@ async def api_call(full_message: Message, connection: WSChiaConnection, task_id)
if self.api.api_ready is False:
return None

if hasattr(f, "execute_task"):
self.execute_tasks.add(task_id)

if hasattr(f, "peer_required"):
coroutine = f(full_message.data, connection)
else:
Expand Down Expand Up @@ -542,6 +549,8 @@ async def wrapped_coroutine() -> Optional[Message]:
self.api_tasks.pop(task_id)
if task_id in self.tasks_from_peer[connection.peer_node_id]:
self.tasks_from_peer[connection.peer_node_id].remove(task_id)
if task_id in self.execute_tasks:
self.execute_tasks.remove(task_id)

task_id = token_bytes()
api_task = asyncio.create_task(api_call(payload_inc, connection_inc, task_id))
Expand Down
17 changes: 4 additions & 13 deletions chia/simulator/full_node_simulator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio
import traceback
from typing import List, Optional

from chia.consensus.block_record import BlockRecord
Expand All @@ -16,7 +14,6 @@ def __init__(self, full_node, block_tools) -> None:
super().__init__(full_node)
self.bt = block_tools
self.full_node = full_node
self.lock = asyncio.Lock()
self.config = full_node.config
self.time_per_block = None
if "simulation" in self.config and self.config["simulation"] is True:
Expand Down Expand Up @@ -47,8 +44,7 @@ async def get_all_full_blocks(self) -> List[FullBlock]:

@api_request
async def farm_new_transaction_block(self, request: FarmNewBlockProtocol):
await self.lock.acquire()
try:
async with self.full_node.blockchain.lock:
self.log.info("Farming new block!")
current_blocks = await self.get_all_full_blocks()
if len(current_blocks) == 0:
Expand Down Expand Up @@ -77,16 +73,11 @@ async def farm_new_transaction_block(self, request: FarmNewBlockProtocol):
previous_generator=self.full_node.full_node_store.previous_generator,
)
rr = RespondBlock(more[-1])
await self.full_node.respond_block(rr)
except Exception as e:
error_stack = traceback.format_exc()
self.log.error(f"Error while farming block: {error_stack} {e}")
finally:
self.lock.release()
await self.full_node.respond_block(rr)

@api_request
async def farm_new_block(self, request: FarmNewBlockProtocol):
async with self.lock:
async with self.full_node.blockchain.lock:
self.log.info("Farming new block!")
current_blocks = await self.get_all_full_blocks()
if len(current_blocks) == 0:
Expand All @@ -111,7 +102,7 @@ async def farm_new_block(self, request: FarmNewBlockProtocol):
current_time=self.use_current_time,
)
rr: RespondBlock = RespondBlock(more[-1])
await self.full_node.respond_block(rr)
await self.full_node.respond_block(rr)

@api_request
async def reorg_from_index_to_new_index(self, request: ReorgProtocol):
Expand Down
8 changes: 8 additions & 0 deletions chia/util/api_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ def inner():
return func

return inner()


def execute_task(func):
def inner():
setattr(func, "execute_task", True)
return func

return inner()
3 changes: 2 additions & 1 deletion chia/wallet/wallet_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from chia.server.outbound_message import NodeType
from chia.server.ws_connection import WSChiaConnection
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.util.api_decorators import api_request, peer_required
from chia.util.api_decorators import api_request, peer_required, execute_task
from chia.util.errors import Err
from chia.wallet.wallet_node import WalletNode

Expand Down Expand Up @@ -39,6 +39,7 @@ async def reject_additions_request(self, response: wallet_protocol.RejectAdditio
"""
pass

@execute_task
@peer_required
@api_request
async def new_peak_wallet(self, peak: wallet_protocol.NewPeakWallet, peer: WSChiaConnection):
Expand Down

0 comments on commit 5f9f631

Please sign in to comment.