Skip to content

Commit

Permalink
chia|tests: Introduce PlotManager + some plot loading improvements …
Browse files Browse the repository at this point in the history
…and fixes (#7848)

* harvester|plotting|tests: Introduce `PlotManager` class

This moves all plot-related and plot-directory-related code from the
harvester into the class `PlotManager`, adjusts all related code
accordingly and adds some extra wrappers there.

* harvester|plotting|tests: Return how many new plots were loaded

* plotting: Fix `failed_to_open_filenames` re-try interval

With `< 1200` it just retries on the next refresh.

* plotting: Fix and improve duplicates handling

* harvester|plotting: Thread locks for `PlotManager.plots`

* chia|tests: Load plots in batches

* chia|tests: Move plot refreshing into a separate thread

* plotting: Properly handle removed plots

And fix tests accordingly. It seems like the fix in Chia-Network/chia-blockchain#3350 wasn't really a fix, but rather an adjustment to allow for a bug?

* plotting|harvester|tests: Introduce `PlotRefreshResult`

* tests: Expand `test_farmer_harvester_rpc.py`

* chia|tests: Move some stuff from `plot_tools.py` into new file `util.py`

* refactor: Rename `plot_tools.py` to `manager.py`

* chia|tests: Use pure dataclass for `PlotsRefreshParameter`

With `uint16` as the type, saving to config doesn't work; this is
preparation for the next commit.

* harvester: Adjust deprecation message, use `info` instead of `warning`

* plotting: Fix typo

* refactor: Rename `filename` to `file_path`

Fits better and avoids shadowing `filename` from the outer scope.

* chia|tests: Move some methods from `plotting.manager` to `plotting.util`

* plotting: Make `refresh_callback` mandatory

(cherry picked from commit 53149f2e57743e203aba1d25a89786edcf1f78ab)
  • Loading branch information
xdustinface authored and miosukakura committed Oct 15, 2021
1 parent 8e4ca5f commit 4dd09ba
Show file tree
Hide file tree
Showing 11 changed files with 900 additions and 246 deletions.
12 changes: 5 additions & 7 deletions chaingreen/cmds/plots.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


def show_plots(root_path: Path):
from chaingreen.plotting.plot_tools import get_plot_directories
from chia.plotting.util import get_plot_directories

print("Directories where plots are being searched for:")
print("Note that subdirectories must be added manually")
Expand Down Expand Up @@ -185,10 +185,9 @@ def check_cmd(
)
@click.pass_context
def add_cmd(ctx: click.Context, final_dir: str):
from chaingreen.plotting.plot_tools import add_plot_directory
from chia.plotting.util import add_plot_directory

add_plot_directory(Path(final_dir), ctx.obj["root_path"])
print(f'Added plot directory "{final_dir}".')
add_plot_directory(ctx.obj["root_path"], final_dir)


@plots_cmd.command("remove", short_help="Removes a directory of plots from config.yaml")
Expand All @@ -202,10 +201,9 @@ def add_cmd(ctx: click.Context, final_dir: str):
)
@click.pass_context
def remove_cmd(ctx: click.Context, final_dir: str):
from chaingreen.plotting.plot_tools import remove_plot_directory
from chia.plotting.util import remove_plot_directory

remove_plot_directory(Path(final_dir), ctx.obj["root_path"])
print(f'Removed plot directory "{final_dir}".')
remove_plot_directory(ctx.obj["root_path"], final_dir)


@plots_cmd.command("show", short_help="Shows the directory of current plots")
Expand Down
154 changes: 73 additions & 81 deletions chaingreen/harvester/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,65 +3,68 @@
import logging
from concurrent.futures.thread import ThreadPoolExecutor
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set, Tuple

from blspy import G1Element
from typing import Callable, Dict, List, Optional, Tuple

import chaingreen.server.ws_connection as ws # lgtm [py/import-and-import-from]
from chaingreen.consensus.constants import ConsensusConstants
from chaingreen.plotting.plot_tools import PlotInfo
from chaingreen.plotting.plot_tools import add_plot_directory as add_plot_directory_pt
from chaingreen.plotting.plot_tools import get_plot_directories as get_plot_directories_pt
from chaingreen.plotting.plot_tools import load_plots
from chaingreen.plotting.plot_tools import remove_plot_directory as remove_plot_directory_pt
from chaingreen.plotting.manager import PlotManager
from chaingreen.plotting.util import (
add_plot_directory,
get_plot_directories,
remove_plot_directory,
remove_plot,
PlotsRefreshParameter,
PlotRefreshResult,
)
from chaingreen.util.streamable import dataclass_from_dict

log = logging.getLogger(__name__)


class Harvester:
provers: Dict[Path, PlotInfo]
failed_to_open_filenames: Dict[Path, int]
no_key_filenames: Set[Path]
farmer_public_keys: List[G1Element]
pool_public_keys: List[G1Element]
plot_manager: PlotManager
root_path: Path
_is_shutdown: bool
executor: ThreadPoolExecutor
state_changed_callback: Optional[Callable]
cached_challenges: List
constants: ConsensusConstants
_refresh_lock: asyncio.Lock
event_loop: asyncio.events.AbstractEventLoop

def __init__(self, root_path: Path, config: Dict, constants: ConsensusConstants):
self.log = log
self.root_path = root_path
# TODO, remove checks below later after some versions / time
refresh_parameter: PlotsRefreshParameter = PlotsRefreshParameter()
if "plot_loading_frequency_seconds" in config:
self.log.info(
"`harvester.plot_loading_frequency_seconds` is deprecated. Consider replacing it with the new section "
"`harvester.plots_refresh_parameter`. See `initial-config.yaml`."
)
if "plots_refresh_parameter" in config:
refresh_parameter = dataclass_from_dict(PlotsRefreshParameter, config["plots_refresh_parameter"])

# From filename to prover
self.provers = {}
self.failed_to_open_filenames = {}
self.no_key_filenames = set()

self.plot_manager = PlotManager(
root_path, refresh_parameter=refresh_parameter, refresh_callback=self._plot_refresh_callback
)
self._is_shutdown = False
self.farmer_public_keys = []
self.pool_public_keys = []
self.match_str = None
self.show_memo: bool = False
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=config["num_threads"])
self.state_changed_callback = None
self.server = None
self.constants = constants
self.cached_challenges = []
self.log = log
self.state_changed_callback: Optional[Callable] = None
self.last_load_time: float = 0
self.plot_load_frequency = config.get("plot_loading_frequency_seconds", 120)
self.parallel_read: bool = config.get("parallel_read", True)

async def _start(self):
self._refresh_lock = asyncio.Lock()
self.event_loop = asyncio.get_event_loop()

def _close(self):
self._is_shutdown = True
self.executor.shutdown(wait=True)
self.plot_manager.stop_refreshing()

async def _await_closed(self):
pass
Expand All @@ -73,79 +76,68 @@ def _state_changed(self, change: str):
if self.state_changed_callback is not None:
self.state_changed_callback(change)

def on_disconnect(self, connection: ws.WSChaingreenConnection):
def _plot_refresh_callback(self, update_result: PlotRefreshResult):
self.log.info(
f"refresh_batch: loaded_plots {update_result.loaded_plots}, "
f"loaded_size {update_result.loaded_size / (1024 ** 4):.2f} TiB, "
f"removed_plots {update_result.removed_plots}, processed_plots {update_result.processed_files}, "
f"remaining_plots {update_result.remaining_files}, "
f"duration: {update_result.duration:.2f} seconds"
)
if update_result.loaded_plots > 0:
self.event_loop.call_soon_threadsafe(self._state_changed, "plots")

def on_disconnect(self, connection: ws.WSChiaConnection):
self.log.info(f"peer disconnected {connection.get_peer_info()}")
self._state_changed("close_connection")

def get_plots(self) -> Tuple[List[Dict], List[str], List[str]]:
self.log.debug(f"get_plots prover items: {len(self.provers)}")
self.log.debug(f"get_plots prover items: {self.plot_manager.plot_count()}")
response_plots: List[Dict] = []
for path, plot_info in self.provers.items():
prover = plot_info.prover
response_plots.append(
{
"filename": str(path),
"size": prover.get_size(),
"plot-seed": prover.get_id(), # Deprecated
"plot_id": prover.get_id(),
"pool_public_key": plot_info.pool_public_key,
"pool_contract_puzzle_hash": plot_info.pool_contract_puzzle_hash,
"plot_public_key": plot_info.plot_public_key,
"file_size": plot_info.file_size,
"time_modified": plot_info.time_modified,
}
)
self.log.debug(
f"get_plots response: plots: {len(response_plots)}, "
f"failed_to_open_filenames: {len(self.failed_to_open_filenames)}, "
f"no_key_filenames: {len(self.no_key_filenames)}"
)
return (
response_plots,
[str(s) for s, _ in self.failed_to_open_filenames.items()],
[str(s) for s in self.no_key_filenames],
)

async def refresh_plots(self):
locked: bool = self._refresh_lock.locked()
changed: bool = False
if not locked:
async with self._refresh_lock:
# Avoid double refreshing of plots
(changed, self.provers, self.failed_to_open_filenames, self.no_key_filenames,) = load_plots(
self.provers,
self.failed_to_open_filenames,
self.farmer_public_keys,
self.pool_public_keys,
self.match_str,
self.show_memo,
self.root_path,
with self.plot_manager:
for path, plot_info in self.plot_manager.plots.items():
prover = plot_info.prover
response_plots.append(
{
"filename": str(path),
"size": prover.get_size(),
"plot-seed": prover.get_id(), # Deprecated
"plot_id": prover.get_id(),
"pool_public_key": plot_info.pool_public_key,
"pool_contract_puzzle_hash": plot_info.pool_contract_puzzle_hash,
"plot_public_key": plot_info.plot_public_key,
"file_size": plot_info.file_size,
"time_modified": plot_info.time_modified,
}
)
if changed:
self._state_changed("plots")
self.log.debug(
f"get_plots response: plots: {len(response_plots)}, "
f"failed_to_open_filenames: {len(self.plot_manager.failed_to_open_filenames)}, "
f"no_key_filenames: {len(self.plot_manager.no_key_filenames)}"
)
return (
response_plots,
[str(s) for s, _ in self.plot_manager.failed_to_open_filenames.items()],
[str(s) for s in self.plot_manager.no_key_filenames],
)

def delete_plot(self, str_path: str):
path = Path(str_path).resolve()
if path in self.provers:
del self.provers[path]

# Remove absolute and relative paths
if path.exists():
path.unlink()

remove_plot(Path(str_path))
self.plot_manager.trigger_refresh()
self._state_changed("plots")
return True

async def add_plot_directory(self, str_path: str) -> bool:
add_plot_directory_pt(str_path, self.root_path)
await self.refresh_plots()
add_plot_directory(self.root_path, str_path)
self.plot_manager.trigger_refresh()
return True

async def get_plot_directories(self) -> List[str]:
return get_plot_directories_pt(self.root_path)
return get_plot_directories(self.root_path)

async def remove_plot_directory(self, str_path: str) -> bool:
remove_plot_directory_pt(str_path, self.root_path)
remove_plot_directory(self.root_path, str_path)
self.plot_manager.trigger_refresh()
return True

def set_server(self, server):
Expand Down
80 changes: 37 additions & 43 deletions chaingreen/harvester/harvester_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from chaingreen.consensus.pot_iterations import calculate_iterations_quality, calculate_sp_interval_iters
from chaingreen.harvester.harvester import Harvester
from chaingreen.plotting.plot_tools import PlotInfo, parse_plot_info
from chaingreen.plotting.util import PlotInfo, parse_plot_info
from chaingreen.protocols import harvester_protocol
from chaingreen.protocols.farmer_protocol import FarmingInfo
from chaingreen.protocols.harvester_protocol import Plot
Expand Down Expand Up @@ -37,14 +37,11 @@ async def harvester_handshake(self, harvester_handshake: harvester_protocol.Harv
as well as the farmer pks, which must be put into the plots, before the plotting process begins.
We cannot use any plots which have different keys in them.
"""
self.harvester.farmer_public_keys = harvester_handshake.farmer_public_keys
self.harvester.pool_public_keys = harvester_handshake.pool_public_keys

await self.harvester.refresh_plots()
self.harvester.plot_manager.set_public_keys(
harvester_handshake.farmer_public_keys, harvester_handshake.pool_public_keys
)

if len(self.harvester.provers) == 0:
self.harvester.log.warning("Not farming any plots on this harvester. Check your configuration.")
return None
self.harvester.plot_manager.start_refreshing()

@peer_required
@api_request
Expand All @@ -63,18 +60,13 @@ async def new_signage_point_harvester(
4. Looks up the full proof of space in the plot for each quality, approximately 64 reads per quality
5. Returns the proof of space to the farmer
"""
if len(self.harvester.pool_public_keys) == 0 or len(self.harvester.farmer_public_keys) == 0:
if not self.harvester.plot_manager.public_keys_available():
# This means that we have not received the handshake yet
return None

start = time.time()
assert len(new_challenge.challenge_hash) == 32

# Refresh plots to see if there are any new ones
if start - self.harvester.last_load_time > self.harvester.plot_load_frequency:
await self.harvester.refresh_plots()
self.harvester.last_load_time = time.time()

loop = asyncio.get_running_loop()

def blocking_lookup(filename: Path, plot_info: PlotInfo) -> List[Tuple[bytes32, ProofOfSpace]]:
Expand Down Expand Up @@ -189,22 +181,23 @@ async def lookup_challenge(
awaitables = []
passed = 0
total = 0
for try_plot_filename, try_plot_info in self.harvester.provers.items():
try:
if try_plot_filename.exists():
# Passes the plot filter (does not check sp filter yet though, since we have not reached sp)
# This is being executed at the beginning of the slot
total += 1
if ProofOfSpace.passes_plot_filter(
self.harvester.constants,
try_plot_info.prover.get_id(),
new_challenge.challenge_hash,
new_challenge.sp_hash,
):
passed += 1
awaitables.append(lookup_challenge(try_plot_filename, try_plot_info))
except Exception as e:
self.harvester.log.error(f"Error plot file {try_plot_filename} may no longer exist {e}")
with self.harvester.plot_manager:
for try_plot_filename, try_plot_info in self.harvester.plot_manager.plots.items():
try:
if try_plot_filename.exists():
# Passes the plot filter (does not check sp filter yet though, since we have not reached sp)
# This is being executed at the beginning of the slot
total += 1
if ProofOfSpace.passes_plot_filter(
self.harvester.constants,
try_plot_info.prover.get_id(),
new_challenge.challenge_hash,
new_challenge.sp_hash,
):
passed += 1
awaitables.append(lookup_challenge(try_plot_filename, try_plot_info))
except Exception as e:
self.harvester.log.error(f"Error plot file {try_plot_filename} may no longer exist {e}")

# Concurrently executes all lookups on disk, to take advantage of multiple disk parallelism
total_proofs_found = 0
Expand Down Expand Up @@ -239,7 +232,7 @@ async def lookup_challenge(
self.harvester.log.info(
f"{len(awaitables)} plots were eligible for farming {new_challenge.challenge_hash.hex()[:10]}..."
f" Found {total_proofs_found} proofs. Time: {time.time() - start:.5f} s. "
f"Total {len(self.harvester.provers)} plots"
f"Total {self.harvester.plot_manager.plot_count()} plots"
)

@api_request
Expand All @@ -250,19 +243,20 @@ async def request_signatures(self, request: harvester_protocol.RequestSignatures
be used for pooling.
"""
plot_filename = Path(request.plot_identifier[64:]).resolve()
try:
plot_info = self.harvester.provers[plot_filename]
except KeyError:
self.harvester.log.warning(f"KeyError plot {plot_filename} does not exist.")
return None
with self.harvester.plot_manager:
try:
plot_info = self.harvester.plot_manager.plots[plot_filename]
except KeyError:
self.harvester.log.warning(f"KeyError plot {plot_filename} does not exist.")
return None

# Look up local_sk from plot to save locked memory
(
pool_public_key_or_puzzle_hash,
farmer_public_key,
local_master_sk,
) = parse_plot_info(plot_info.prover.get_memo())
local_sk = master_sk_to_local_sk(local_master_sk)
# Look up local_sk from plot to save locked memory
(
pool_public_key_or_puzzle_hash,
farmer_public_key,
local_master_sk,
) = parse_plot_info(plot_info.prover.get_memo())
local_sk = master_sk_to_local_sk(local_master_sk)

if isinstance(pool_public_key_or_puzzle_hash, G1Element):
include_taproot = False
Expand Down
Loading

0 comments on commit 4dd09ba

Please sign in to comment.