Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebalance swarm when necessary #34

Merged
merged 12 commits on Oct 12, 2022
Previous commit
Next commit
Fix ModuleContainer.shutdown() and its usages
  • Loading branch information
borzunov committed Oct 11, 2022
commit f3ea120c8194760617acb71bee887f198904e1a9
21 changes: 8 additions & 13 deletions src/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
prefetch_batches: int = 1,
sender_threads: int = 1,
mean_block_selection_delay: float = 0.5,
- mean_balance_check_period: float = 60, # TODO:
+ mean_balance_check_period: float = 300, # TODO:
use_auth_token: Optional[str] = None,
load_in_8bit: bool = False,
*,
Expand Down Expand Up @@ -175,6 +175,7 @@ def run(self):

while True:
timeout = random.random() * 2 * self.mean_balance_check_period
+ # TODO: Follow ModuleContainer status (to restart/stop if it crashes)
if self.stop.wait(timeout):
return
if self._should_choose_other_blocks():
Expand Down Expand Up @@ -251,7 +252,7 @@ def __init__(
def run(self):
"""
Runs ModuleContainer in the current thread. Initializes dht if necessary, starts connection handlers,
- runs hivemind.Runtime (self.runtime) to process incoming requests.
+ runs Runtime (self.runtime) to process incoming requests.
"""
logger.info(f"Serving {len(self.module_backends)} blocks:")
for expert_name, backend in self.module_backends.items():
Expand All @@ -267,15 +268,10 @@ def run(self):
if self.checkpoint_saver is not None:
self.checkpoint_saver.start()

- for process in self.conn_handlers:
- if not process.is_alive():
- process.start()
- process.ready.result()
+ for handler in self.conn_handlers:
+ handler.run_in_background()

- try:
- self.runtime.run()
- finally:
- self.shutdown()
+ self.runtime.run()

# noinspection PyMethodOverriding
@classmethod
Expand Down Expand Up @@ -413,9 +409,8 @@ def shutdown(self):

self.ready.clear()

- for process in self.conn_handlers:
- process.terminate()
- process.join()
+ for handler in self.conn_handlers:
+ handler.shutdown()
logger.debug("Connection handlers terminated")

if self.checkpoint_saver is not None:
Expand Down