Skip to content

Commit

Permalink
Check for purged outputs before reusing a run, as part of #438.
Browse files Browse the repository at this point in the history
Also shut down cleanly after a keyboard interrupt.
  • Loading branch information
donkirkby committed Apr 16, 2018
1 parent 6e2292c commit b7938c9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 15 deletions.
5 changes: 5 additions & 0 deletions micall/monitor/kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ def find_other_runs(self):
continue
try:
self.kive_retry(run.is_complete)
outputs = self.kive_retry(run.get_results)
if any(output.dataset_id is None
for output in outputs.values()):
# Output has been purged, can't reuse the run.
continue
input_list = run.raw['inputs']
inputs = sorted(input_list, key=itemgetter('index'))
input_ids = tuple(inp['dataset'] for inp in inputs)
Expand Down
37 changes: 35 additions & 2 deletions micall/tests/test_kive_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,10 +639,43 @@ def test_poll_first_sample_with_other_running(raw_data_with_two_samples,
dataset_id=quality_dataset_id)
dataset1.name = '140101_M01234_quality.csv'
mock_session.find_datasets.side_effect = [[dataset1], [], []]
filter_run = MagicMock(
name='filter_run',
other_run = MagicMock(
name='other_run',
pipeline_id=default_config.micall_main_pipeline_id,
raw=dict(inputs=[dict(index=1, dataset=quality_dataset_id)]))
mock_session.find_runs.return_value = [other_run]
kive_watcher = KiveWatcher(default_config)

kive_watcher.add_sample_group(
base_calls=base_calls,
sample_group=SampleGroup('2110A',
('2110A-V3LOOP_S13_L001_R1_001.fastq.gz',
None)))
kive_watcher.poll_runs()

mock_session.run_pipeline.assert_called_once()


def test_poll_first_sample_with_other_purged(raw_data_with_two_samples,
mock_open_kive,
default_config):
""" A matching run finished recently, but it was purged. """
base_calls = (raw_data_with_two_samples /
"MiSeq/runs/140101_M01234/Data/Intensities/BaseCalls")
mock_session = mock_open_kive.return_value
quality_dataset_id = 100
dataset1 = Mock(name='quality_csv',
groups_allowed=ALLOWED_GROUPS,
dataset_id=quality_dataset_id)
dataset1.name = '140101_M01234_quality.csv'
purged_dataset = Mock(name='purged_csv',
dataset_id=None)
mock_session.find_datasets.side_effect = [[dataset1], [], []]
filter_run = MagicMock(
name='filter_run',
pipeline_id=default_config.micall_filter_quality_pipeline_id,
raw=dict(inputs=[dict(index=1, dataset=quality_dataset_id)]),
**{'get_results.return_value': dict(purged_csv=purged_dataset)})
mock_session.find_runs.return_value = [filter_run]
kive_watcher = KiveWatcher(default_config)

Expand Down
34 changes: 21 additions & 13 deletions micall_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,13 @@ def parse_args(argv=None):
return args


def main():
args = parse_args()
logger.info('Starting up.')
def main_loop(args, sample_queue):
result_handler = partial(update_qai.process_folder,
qai_server=args.qai_server,
qai_user=args.qai_user,
qai_password=args.qai_password,
pipeline_version=args.pipeline_version)
kive_watcher = KiveWatcher(args, result_handler, retry=True)

sample_queue = Queue(maxsize=2)
wait = True
finder_thread = Thread(target=find_samples,
args=(args.raw_data,
args.pipeline_version,
sample_queue,
wait),
daemon=True)
finder_thread.start()
while True:
kive_watcher.poll_runs()
if kive_watcher.is_full():
Expand All @@ -134,5 +122,25 @@ def main():
break


def main():
args = parse_args()
logger.info('Starting up.')

sample_queue = Queue(maxsize=2)
wait = True
finder_thread = Thread(target=find_samples,
args=(args.raw_data,
args.pipeline_version,
sample_queue,
wait),
daemon=True)
finder_thread.start()

try:
main_loop(args, sample_queue)
except KeyboardInterrupt:
logger.info('Shutting down.')


if __name__ == '__main__':
main()

0 comments on commit b7938c9

Please sign in to comment.