Skip to content

Commit

Permalink
Improved pool stats handling (#15764)
Browse files Browse the repository at this point in the history
* Improved pool stats handling

* Minor update

* Made `increment_pool_stats` free function

* Updated `increment_pool_stats` function args

* Fixed isort issue

* WIP: Adding test cases

* Fixed lint error

* Fixed lint error 2

* Fixed lint error 3

* Added test case
  • Loading branch information
ChiaMineJP committed Jul 17, 2023
1 parent aa3e156 commit e772e4f
Show file tree
Hide file tree
Showing 4 changed files with 858 additions and 28 deletions.
89 changes: 81 additions & 8 deletions chia/farmer/farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import aiohttp
from blspy import AugSchemeMPL, G1Element, G2Element, PrivateKey
Expand Down Expand Up @@ -41,7 +41,7 @@
from chia.util.config import config_path_for_filename, load_config, lock_and_load_config, save_config
from chia.util.errors import KeychainProxyConnectionFailure
from chia.util.hash import std_hash
from chia.util.ints import uint8, uint16, uint64
from chia.util.ints import uint8, uint16, uint32, uint64
from chia.util.keychain import Keychain
from chia.util.logging import TimedDuplicateFilter
from chia.wallet.derive_keys import (
Expand All @@ -61,6 +61,44 @@
UPDATE_POOL_INFO_FAILURE_RETRY_INTERVAL: int = 120
UPDATE_POOL_FARMER_INFO_INTERVAL: int = 300


def strip_old_entries(pairs: List[Tuple[float, Any]], before: float) -> List[Tuple[float, Any]]:
for index, [timestamp, points] in enumerate(pairs):
if timestamp >= before:
if index == 0:
return pairs
if index > 0:
return pairs[index:]
return []


def increment_pool_stats(
pool_states: Dict[bytes32, Any],
p2_singleton_puzzlehash: bytes32,
name: str,
current_time: float,
count: int = 1,
value: Optional[Union[int, Dict[str, Any]]] = None,
) -> None:
if p2_singleton_puzzlehash not in pool_states:
return
pool_state = pool_states[p2_singleton_puzzlehash]
if f"{name}_since_start" in pool_state:
pool_state[f"{name}_since_start"] += count
if f"{name}_24h" in pool_state:
if value is None:
pool_state[f"{name}_24h"].append((uint32(current_time), pool_state["current_difficulty"]))
else:
pool_state[f"{name}_24h"].append((uint32(current_time), value))

# Age out old 24h information for every signage point regardless
# of any failures. Note that this still lets old data remain if
# the client isn't receiving signage points.
cutoff_24h = current_time - (24 * 60 * 60)
pool_state[f"{name}_24h"] = strip_old_entries(pairs=pool_state[f"{name}_24h"], before=cutoff_24h)
return


"""
HARVESTER PROTOCOL (FARMER <-> HARVESTER)
"""
Expand Down Expand Up @@ -256,8 +294,12 @@ def state_changed(self, change: str, data: Dict[str, Any]) -> None:

def handle_failed_pool_response(self, p2_singleton_puzzle_hash: bytes32, error_message: str) -> None:
self.log.error(error_message)
self.pool_state[p2_singleton_puzzle_hash]["pool_errors_24h"].append(
ErrorResponse(uint16(PoolErrorCode.REQUEST_FAILED.value), error_message).to_json_dict()
increment_pool_stats(
self.pool_state,
p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=ErrorResponse(uint16(PoolErrorCode.REQUEST_FAILED.value), error_message).to_json_dict(),
)

def on_disconnect(self, connection: WSChiaConnection) -> None:
Expand Down Expand Up @@ -324,7 +366,13 @@ async def _pool_get_farmer(
log_level = logging.INFO
if "error_code" in response:
log_level = logging.WARNING
self.pool_state[pool_config.p2_singleton_puzzle_hash]["pool_errors_24h"].append(response)
increment_pool_stats(
self.pool_state,
pool_config.p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=response,
)
self.log.log(log_level, f"GET /farmer response: {response}")
return response
else:
Expand Down Expand Up @@ -366,7 +414,13 @@ async def _pool_post_farmer(
log_level = logging.INFO
if "error_code" in response:
log_level = logging.WARNING
self.pool_state[pool_config.p2_singleton_puzzle_hash]["pool_errors_24h"].append(response)
increment_pool_stats(
self.pool_state,
pool_config.p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=response,
)
self.log.log(log_level, f"POST /farmer response: {response}")
return response
else:
Expand Down Expand Up @@ -408,7 +462,13 @@ async def _pool_put_farmer(
log_level = logging.INFO
if "error_code" in response:
log_level = logging.WARNING
self.pool_state[pool_config.p2_singleton_puzzle_hash]["pool_errors_24h"].append(response)
increment_pool_stats(
self.pool_state,
pool_config.p2_singleton_puzzle_hash,
"pool_errors",
time.time(),
value=response,
)
self.log.log(log_level, f"PUT /farmer response: {response}")
else:
self.handle_failed_pool_response(
Expand Down Expand Up @@ -446,6 +506,7 @@ async def update_pool_state(self) -> None:

if p2_singleton_puzzle_hash not in self.pool_state:
self.pool_state[p2_singleton_puzzle_hash] = {
"p2_singleton_puzzle_hash": p2_singleton_puzzle_hash.hex(),
"points_found_since_start": 0,
"points_found_24h": [],
"points_acknowledged_since_start": 0,
Expand All @@ -455,11 +516,23 @@ async def update_pool_state(self) -> None:
"current_points": 0,
"current_difficulty": None,
"pool_errors_24h": [],
"valid_partials_since_start": 0,
"valid_partials_24h": [],
"invalid_partials_since_start": 0,
"invalid_partials_24h": [],
"stale_partials_since_start": 0,
"stale_partials_24h": [],
"missing_partials_since_start": 0,
"missing_partials_24h": [],
"authentication_token_timeout": None,
"plot_count": 0,
"pool_config": pool_config,
}
self.log.info(f"Added pool: {pool_config}")
else:
self.pool_state[p2_singleton_puzzle_hash]["pool_config"] = pool_config

pool_state = self.pool_state[p2_singleton_puzzle_hash]
pool_state["pool_config"] = pool_config

# Skip state update when self pooling
if pool_config.pool_url == "":
Expand Down
Loading

0 comments on commit e772e4f

Please sign in to comment.