cleaning up cs
PooriaNamyar committed Apr 1, 2024
1 parent 0675f16 commit 783b9f6
Showing 15 changed files with 454 additions and 451 deletions.
41 changes: 41 additions & 0 deletions README.md
@@ -0,0 +1,41 @@
# Solving Max-Min Fair Resource Allocations Quickly on Large Graphs

`Soroush` is a scalable and general max-min fair allocator. It provides a set of approximate and heuristic methods that let users control the trade-offs between efficiency, fairness, and speed. For more information, see our NSDI '24 paper ([Solving Max-Min Fair Resource Allocations Quickly on Large Graphs](https://www.usenix.org/conference/nsdi24/presentation/namyar-solving)).


## Code Structure
```
├── cluster_scheduling      # Scripts and implementation for the cluster scheduling (CS) use case.
|   |
|   ├── alg                 # implementation of all the allocators in Soroush.
|   |
|   ├── scripts             # code for generating different problem instances and benchmarking different allocators.
|   |
|   └── utilities           # common utility functions for cluster scheduling.
|
└── traffic_engineering     # Scripts and implementations for the traffic engineering (TE) use case.
    |
    ├── alg                 # implementation of all the allocators in Soroush.
    |
    ├── benchmarks          #
    |
    ├── scripts             #
    |
    └── utilities           # common utility functions for traffic engineering.
```

## Installation

Please refer to the READMEs under `cluster_scheduling` and `traffic_engineering` for problem-specific guidelines.

## Citation
```bibtex
@inproceedings{soroush,
author = {Namyar, Pooria and Arzani, Behnaz and Kandula, Srikanth and Segarra, Santiago and Crankshaw, Daniel and Krishnaswamy, Umesh and Govindan, Ramesh and Raj, Himanshu},
title = {{S}olving {M}ax-{M}in {F}air {R}esource {A}llocations
{Q}uickly on {L}arge {G}raphs},
booktitle = {21st USENIX Symposium on Networked Systems Design and
Implementation (NSDI 24)},
year = {2024},
}
```
3 changes: 1 addition & 2 deletions cluster_scheduling/README.md
@@ -1,6 +1,5 @@
# Cluster Scheduling
### Instructions
1. Download Gavel from https://github.com/stanford-futuredata/gavel.git
-2. In our evaluations, we used Gurobi, so please change the Gavel's solver to point to Gurobi.
-   1. Specifically, in 'scheduler/policies/max_min_fairness_water_filling.py' change line 150 and 229.
+2. In our evaluations, we used Gurobi, so please change Gavel's solver to point to Gurobi. Specifically, change the solvers on lines 150 and 229 of 'scheduler/policies/max_min_fairness_water_filling.py' to cp.GUROBI.
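
To illustrate the kind of change this step asks for, here is a minimal, self-contained CVXPY example that selects Gurobi as the backend solver. The toy problem is purely illustrative; the actual objective, constraints, and variable names in Gavel's `max_min_fairness_water_filling.py` differ:

```python
import cvxpy as cp

# Toy LP; the relevant part is solver=cp.GUROBI, which is the standard CVXPY
# way to hand a problem to Gurobi (requires gurobipy and a Gurobi license).
x = cp.Variable(nonneg=True)
prob = cp.Problem(cp.Maximize(cp.minimum(x, 1.0)))
prob.solve(solver=cp.GUROBI)
print(x.value)  # optimum reaches the cap of 1.0
```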

@@ -4,39 +4,27 @@
import numpy as np
from scipy.sparse import csr_matrix

from utilities import constants
from scripts.problem import Problem
from gavel.scheduler.job_id_pair import JobIdPair
from alg import waterfilling_utils


-def get_scale_matrix(scale_factor_vector, priority_vector, row_list, column_list, num_tokens, num_rows, num_cols,
-                     num_jobs, num_sub_jobs, num_gpus, split_ratio_mapping, priority_aware=False, throughput_aware=False,
-                     biased_toward_lower_norm_eff_thru=False, bias_allocation_weight=None, bias_alpha=None):
-    assert not throughput_aware
-
-    bias_coeff = 1
-    if priority_aware:
-        bias_coeff *= priority_vector.flatten()
-    if biased_toward_lower_norm_eff_thru:
-        max_allocation = np.average(bias_allocation_weight)
-        bias_coeff *= np.repeat(0.000001 + np.power(bias_alpha, bias_allocation_weight / max_allocation), repeats=num_gpus)
-
-    weight_matrix = split_ratio_mapping.flatten() * bias_coeff
-    scale_data_list_1 = weight_matrix
-    weight_matrix = weight_matrix / scale_factor_vector.flatten()
-    scale_data_list_2 = weight_matrix
-    scale_data_list = np.concatenate((scale_data_list_1, scale_data_list_2))
-
-    scale_matrix = csr_matrix((scale_data_list, (row_list, column_list)),
-                              shape=(num_rows, num_cols))
-    return scale_matrix, weight_matrix


-def get_rates(problem: Problem, num_iter_approx_water, num_iter_bet, normalized_throughput_coeff=None,
+def get_rates(problem: Problem, num_iter_approx_water, num_iter_adapt_water, normalized_throughput_coeff=None,
              throughput_coeff=None, scale_factor_vector=None, priority_vector=None, priority_aware=False,
              throughput_aware=False, break_down=False, return_matrix=False, biased_toward_lower_norm_eff_thru=False,
              biased_alpha=None):
""" A fast heuristic to compute an approximate max-min fair solution without any optimizations.
Args:
problem: a description of a cluster scheduling problem (e.g., available GPUs).
num_iter_approx_water: number of iterations for the underlying approx waterfiller
(see approx_waterfiller for more details).
num_iter_adapt_water: number of iterations for adaptive waterfiller.
Returns:
job_id_to_job_rate_mapping: a mapping from job id to its assignment from each GPU.
dur: time to find the allocation.
"""
    assert num_iter_approx_water == 1
    st_time = datetime.now()
    if throughput_coeff is None:
@@ -66,18 +54,17 @@ def get_rates(problem: Problem, num_iter_approx_water, num_iter_bet, normalized_
    computation_dur = 0
    updating_split_ratios_dur = 0

-    # time_cong = np.array([0, 0, 0, 0, 0, 0, 0], dtype=np.float64)
-    for iter_bet_no in range(num_iter_bet):
+    for iter_bet_no in range(num_iter_adapt_water):
        if break_down:
            checkpoint = datetime.now()

-        scale_matrix, weight_matrix = get_scale_matrix(scale_factor_vector, priority_vector, row_list, column_list, num_tokens,
-                                                       num_rows, num_sub_jobs, num_jobs, num_sub_jobs, num_gpu_types,
-                                                       split_ratios, priority_aware=priority_aware,
-                                                       throughput_aware=throughput_aware,
-                                                       biased_toward_lower_norm_eff_thru=biased_toward_lower_norm_eff_thru,
-                                                       bias_allocation_weight=bias_allocation_weight,
-                                                       bias_alpha=biased_alpha)
+        scale_matrix, weight_matrix = _get_scale_matrix(scale_factor_vector, priority_vector, row_list, column_list, num_tokens,
+                                                        num_rows, num_sub_jobs, num_jobs, num_sub_jobs, num_gpu_types,
+                                                        split_ratios, priority_aware=priority_aware,
+                                                        throughput_aware=throughput_aware,
+                                                        biased_toward_lower_norm_eff_thru=biased_toward_lower_norm_eff_thru,
+                                                        bias_allocation_weight=bias_allocation_weight,
+                                                        bias_alpha=biased_alpha)
        if break_down:
            routing_matrix_dur += (datetime.now() - checkpoint).total_seconds()
            checkpoint = datetime.now()
@@ -101,19 +88,9 @@ def get_rates(problem: Problem, num_iter_approx_water, num_iter_bet, normalized_
            computation_dur += (datetime.now() - checkpoint).total_seconds()
            checkpoint = datetime.now()

-        if iter_bet_no == num_iter_bet - 1:
+        if iter_bet_no == num_iter_adapt_water - 1:
            break

-        # split_ratios_2 = np.empty((num_jobs, num_gpu_types))
-        # bias_allocation_weight_2 = np.empty_like(bias_allocation_weight)
-        # for jid, (priority_weight, scale_factor, throughput_list) in list_jobs:
-        #     sub_jid_list = jid_to_sub_jid_mapping[jid]
-        #     throughput = final_job_allocation[sub_jid_list] * throughput_list
-        #     total_allocation = np.add.reduce(throughput)
-        #     if biased_toward_lower_norm_eff_thru:
-        #         bias_allocation_weight_2[jid] = total_allocation * scale_factor / (priority_weight * np.average(throughput_list))
-        #     split_ratios_2[jid] = throughput / total_allocation
-
        throughput = np.reshape(final_job_allocation, (num_jobs, num_gpu_types))
        total_allocation = np.add.reduce(throughput * throughput_coeff, axis=1)
        if biased_toward_lower_norm_eff_thru:
@@ -137,10 +114,33 @@ def get_rates(problem: Problem, num_iter_approx_water, num_iter_bet, normalized_
        sub_jid_list = jid_to_sub_jid_mapping[jid]
        for gid, gpu in enumerate(problem.gpu_list):
            job_id_to_job_rate_mapping[JobIdPair(jid, None)][gpu] = final_job_allocation[sub_jid_list][gid]
-    print(f"approx-waterfilling;", dur)
+    print(f"approx-waterfilling: {dur}")
    return job_id_to_job_rate_mapping, dur
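
For orientation, a hedged sketch of a call site for `get_rates` after this rename. The `Problem` instance and the iteration count are assumptions, not taken from this commit; note the assert above requires `num_iter_approx_water == 1`:

```python
# Hypothetical usage; `problem` is a scripts.problem.Problem describing jobs
# and available GPUs, constructed elsewhere (not shown in this diff).
rate_mapping, dur = get_rates(problem, num_iter_approx_water=1, num_iter_adapt_water=10)
for job_id, gpu_to_rate in rate_mapping.items():
    print(job_id, gpu_to_rate)  # each job's allocation from every GPU type
```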


+def _get_scale_matrix(scale_factor_vector, priority_vector, row_list, column_list, num_tokens, num_rows, num_cols,
+                      num_jobs, num_sub_jobs, num_gpus, split_ratio_mapping, priority_aware=False, throughput_aware=False,
+                      biased_toward_lower_norm_eff_thru=False, bias_allocation_weight=None, bias_alpha=None):
+    assert not throughput_aware
+
+    bias_coeff = 1
+    if priority_aware:
+        bias_coeff *= priority_vector.flatten()
+    if biased_toward_lower_norm_eff_thru:
+        max_allocation = np.average(bias_allocation_weight)
+        bias_coeff *= np.repeat(0.000001 + np.power(bias_alpha, bias_allocation_weight / max_allocation), repeats=num_gpus)
+
+    weight_matrix = split_ratio_mapping.flatten() * bias_coeff
+    scale_data_list_1 = weight_matrix
+    weight_matrix = weight_matrix / scale_factor_vector.flatten()
+    scale_data_list_2 = weight_matrix
+    scale_data_list = np.concatenate((scale_data_list_1, scale_data_list_2))
+
+    scale_matrix = csr_matrix((scale_data_list, (row_list, column_list)),
+                              shape=(num_rows, num_cols))
+    return scale_matrix, weight_matrix
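
An aside on the `csr_matrix((data, (rows, cols)), shape=...)` constructor used in `_get_scale_matrix`: SciPy places `data[k]` at position `(rows[k], cols[k])` and sums duplicate entries. A standalone toy example, not part of the commit:

```python
import numpy as np
from scipy.sparse import csr_matrix

data = np.array([1.0, 2.0, 3.0])
rows = np.array([0, 0, 1])
cols = np.array([0, 2, 1])
m = csr_matrix((data, (rows, cols)), shape=(2, 3))
print(m.toarray())  # [[1. 0. 2.]
                    #  [0. 3. 0.]]
```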


def _apply_congestion(scale_matrix, job_allocation, non_zeros_jids, gpu_cap, update_rate):
    job_allocation_on_gpus = job_allocation[non_zeros_jids]
    scaled_allocation = job_allocation_on_gpus * scale_matrix
@@ -160,38 +160,3 @@ def _apply_congestion(scale_matrix, job_allocation, non_zeros_jids, gpu_cap, upd
    if update_rate:
        job_allocation[non_zeros_jids] = job_allocation_on_gpus
    return fair_share

-#
-# def _apply_congestion(scale_matrix, job_allocation, non_zeros_jids, gpu_cap, update_rate):
-#     # time_1 = time_2 = time_3 = time_4 = time_5 = time_6 = time_7 = 0
-#     # st_time = datetime.now()
-#     job_allocation_on_gpus = job_allocation[non_zeros_jids]
-#     scaled_allocation = job_allocation_on_gpus * scale_matrix
-#     # time_1 = (datetime.now() - st_time).total_seconds()
-#     # checkpoint1 = datetime.now()
-#     if np.add.reduce(scaled_allocation) <= gpu_cap or job_allocation_on_gpus.shape[0] == 0:
-#         return np.inf  # , np.array([time_1, time_2, time_3, time_4, time_5, time_6, time_7])
-#     mask = np.arange(job_allocation_on_gpus.shape[0])
-#     # time_2 = (datetime.now() - checkpoint1).total_seconds()
-#     while mask.shape[0]:
-#         # checkpoint3 = datetime.now()
-#         num_scaled_jobs = np.add.reduce(scale_matrix[mask])
-#         fair_share = gpu_cap / num_scaled_jobs
-#         under_flows = (job_allocation_on_gpus[mask] >= fair_share)
-#         # time_3 += (datetime.now() - checkpoint3).total_seconds()
-#         # checkpoint4 = datetime.now()
-#         if np.logical_and.reduce(under_flows):
-#             job_allocation_on_gpus[mask] = fair_share
-#             # time_4 += (datetime.now() - checkpoint4).total_seconds()
-#             break
-#         else:
-#             # time_5 += (datetime.now() - checkpoint4).total_seconds()
-#             # checkpoint4 = datetime.now()
-#             gpu_cap -= scaled_allocation[mask] @ (1 - under_flows)
-#             # time_6 += (datetime.now() - checkpoint4).total_seconds()
-#             # checkpoint4 = datetime.now()
-#             mask = np.compress(under_flows, mask)
-#             # time_7 += (datetime.now() - checkpoint4).total_seconds()
-#     if update_rate:
-#         job_allocation[non_zeros_jids] = job_allocation_on_gpus
-#     return fair_share  # , np.array([time_1, time_2, time_3, time_4, time_5, time_6, time_7])
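
`_apply_congestion` above implements a single-resource water-filling step (part of its body is collapsed in this view). As a hedged illustration of the underlying progressive-filling idea — simplified, with illustrative names, not the repository's exact logic:

```python
import numpy as np

def waterfill_fair_share(demands: np.ndarray, capacity: float) -> float:
    """Progressive filling: freeze users whose demand is below the current
    fair share and re-split the leftover capacity among the rest."""
    if demands.sum() <= capacity:
        return float("inf")  # capacity is not binding (cf. the np.inf above)
    active = np.ones(len(demands), dtype=bool)
    while active.any():
        fair_share = capacity / active.sum()
        frozen = active & (demands < fair_share)
        if not frozen.any():
            return fair_share  # all remaining users can take the full share
        capacity -= demands[frozen].sum()
        active &= ~frozen
    return 0.0  # unreachable when capacity is binding

# Capacity 10 over demands [1, 4, 8] -> allocations [1, 4, 5]; fair share is 5.
print(waterfill_fair_share(np.array([1.0, 4.0, 8.0]), 10.0))  # 5.0
```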
66 changes: 0 additions & 66 deletions cluster_scheduling/alg/approx_water_bet_plus_mcf.py

This file was deleted.

