Skip to content

Commit

Permalink
Polish resubmit.
Browse files Browse the repository at this point in the history
mesos cluster master/slaves maybe busy, do not make it worse

1. not resubmit too many in a round.
2. use history max stage_time for its timeout.
  • Loading branch information
youngsofun committed Nov 20, 2018
1 parent 7e263e5 commit 137dbe2
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions dpark/taskset.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def readable(size):


LOCALITY_WAIT = 0
WAIT_FOR_RUNNING = 10
WAIT_FOR_RUNNING = 15
MAX_TASK_FAILURES = 4
MAX_TASK_MEMORY = 20 << 10 # 20GB

Expand Down Expand Up @@ -101,6 +101,7 @@ def __init__(self, sched, tasks, cpus=1, mem=100, gpus=0,
self.id_retry_host = {}
self.task_local_set = set()
self.mem_digest = TDigest()
self.max_stage_time = 0
self.mem90 = 0 # TODO: move to stage

@property
Expand Down Expand Up @@ -217,6 +218,7 @@ def statusUpdate(self, task_id, num_try, status, reason=None, message=None,

if status == TaskState.running:
task.start_time = time.time()
self.max_stage_time = max(self.max_stage_time, task.start_time - task.stage_time)
elif status == TaskState.finished:
if stats:
self.mem_digest.add(stats.bytes_max_rss / (1024. ** 2))
Expand Down Expand Up @@ -363,6 +365,8 @@ def _task_lost(self, task_id, num_try, status, reason, message, exception=None):
self.counter.launched -= 1

def check_task_timeout(self):
"""In lock, so be fast!"""

now = time.time()
if self.last_check + 5 > now:
return False
Expand All @@ -376,22 +380,30 @@ def check_task_timeout(self):
n)
self.counter.launched = n

# staged but not run for too long
# mesos may be busy.
num_resubmit = 0
for i in range(self.counter.n):
task = self.tasks[i]
if (self.launched[i] and task.status == TaskState.staging
and task.stage_time + WAIT_FOR_RUNNING < now):
and task.stage_time + self.max_stage_time + WAIT_FOR_RUNNING < now):
logger.warning('task %s timeout %.1f (at %s), re-assign it',
task.id, now - task.stage_time, task.host)
self.counter.staging_timeout += 1

self.launched[i] = False
self.counter.launched -= 1
num_resubmit += 1
if num_resubmit > 3:
break

# running for too long
num_resubmit = 0
if self.counter.finished > self.counter.n * 2.0 / 3:
scale = 1.0 * self.counter.n / self.counter.finished
tasks = sorted((task.start_time, i, task)
for i, task in enumerate(self.tasks)
if self.launched[i] and not self.finished[i])
if self.launched[i] and not self.finished[i] and task.status == TaskState.running)
for _t, idx, task in tasks:
time_used = now - task.start_time
if time_used > self.max_task_time * (2 ** task.num_try) * scale:
Expand All @@ -401,7 +413,8 @@ def check_task_timeout(self):
logger.info('re-submit task %s for timeout %.1f, '
'try %d', task.id, time_used, task.num_try)
task.time_used += time_used
task.start_time = now
task.stage_time = 0
task.start_time = 0
self.launched[idx] = False
self.counter.launched -= 1
else:
Expand All @@ -410,6 +423,9 @@ def check_task_timeout(self):
self._abort('task %s timeout' % task)
else:
break
num_resubmit += 1
if num_resubmit > 3:
break
return self.counter.launched < n

def _abort(self, message):
Expand Down

0 comments on commit 137dbe2

Please sign in to comment.