diff --git a/src/features/file_download_feature.py b/src/features/file_download_feature.py index 4b844346..8ac6bdd9 100644 --- a/src/features/file_download_feature.py +++ b/src/features/file_download_feature.py @@ -11,6 +11,8 @@ from react.observable import read_until_closed from utils.file_utils import create_unique_filename +INLINE_IMAGE_TYPE = 'inline-image' + RESULT_FILES_FOLDER = 'resultFiles' LOGGER = logging.getLogger('script_server.file_download_feature') @@ -22,54 +24,154 @@ def __init__(self, user_file_storage, temp_folder) -> None: self.result_folder = os.path.join(temp_folder, RESULT_FILES_FOLDER) user_file_storage.start_autoclean(self.result_folder, 1000 * 60 * 60 * 24) - self._execution_download_files = {} + self._execution_handlers = {} def subscribe(self, execution_service: ExecutionService): - download_feature = self + def start_listener(execution_id): + handler = _ScriptHandler(execution_id, execution_service, self.result_folder, self.user_file_storage) + self._execution_handlers[execution_id] = handler + + execution_service.add_start_listener(start_listener) - def execution_finished(execution_id): - config = execution_service.get_config(execution_id) - if not download_feature._is_downloadable(config): - return + def get_downloadable_files(self, execution_id): + handler = self._execution_handlers.get(execution_id) + if not handler: + return [] + + return handler.result_files.copy() + + def get_result_files_folder(self): + return self.result_folder - output_stream = execution_service.get_anonymized_output_stream(execution_id) + def allowed_to_download(self, file_path, execution_owner): + return self.user_file_storage.allowed_to_access(file_path, execution_owner) - output_stream_data = read_until_closed(output_stream) - script_output = ''.join(output_stream_data) + def subscribe_on_inline_images(self, execution_id, callback): + handler = self._execution_handlers.get(execution_id) + if not handler: + LOGGER.warn('Failed to find handler for execution #' + execution_id) + return - parameter_values = execution_service.get_user_parameter_values(execution_id) - owner = execution_service.get_owner(execution_id) + handler.add_inline_image_listener(callback) - downloadable_files = download_feature._prepare_downloadable_files( - config, - script_output, - parameter_values, - owner) - download_feature._execution_download_files[execution_id] = downloadable_files - execution_service.add_finish_listener(execution_finished) +class _ScriptHandler: - def get_downloadable_files(self, execution_id): - return self._execution_download_files.get(execution_id, []) + def __init__(self, execution_id, execution_service: ExecutionService, result_folder, file_storage) -> None: + self.execution_id = execution_id + self.execution_service = execution_service - @staticmethod - def _is_downloadable(config): - return not is_empty(config.output_files) + self.config = self.execution_service.get_config(execution_id) - def get_result_files_folder(self): - return self.result_folder + self.result_files_paths = self._get_paths(execution_id, self._is_post_finish_path) + self.inline_image_paths = self._get_paths(execution_id, self._is_inline_image_path) - def _prepare_downloadable_files(self, config, script_output, script_param_values, execution_owner): - output_files = config.output_files + self.prepared_files = {} + self.result_files = [] + self.inline_images = {} - if not output_files: + self.inline_image_listeners = [] + + if not self.result_files_paths and not self.inline_image_paths: + return + + self.output_stream = self.execution_service.get_anonymized_output_stream(execution_id) + + execution_owner = execution_service.get_owner(execution_id) + self.download_folder = file_storage.prepare_new_folder(execution_owner, result_folder) + LOGGER.info('Created download folder for ' + execution_owner + ': ' + self.download_folder) + + if self.result_files_paths: + execution_service.add_finish_listener(self._execution_finished, execution_id) + + if self.inline_image_paths: + self._listen_for_images() + + def add_inline_image_listener(self, callback): + self.inline_image_listeners.append(callback) + + def _get_paths(self, execution_id, predicate): + config = self.config + if is_empty(config.output_files): return [] - output_files = substitute_parameter_values( + paths = [_extract_path(f) for f in config.output_files if predicate(f)] + paths = [p for p in paths if p] + + parameter_values = self.execution_service.get_user_parameter_values(execution_id) + return substitute_parameter_values( config.parameters, - config.output_files, - script_param_values) + paths, + parameter_values) + + @staticmethod + def _is_post_finish_path(file): + if isinstance(file, str): + return True + + if isinstance(file, dict): + return file.get('type') != INLINE_IMAGE_TYPE + + return False + + @staticmethod + def _is_inline_image_path(file): + return isinstance(file, dict) and file.get('type') == INLINE_IMAGE_TYPE + + def _execution_finished(self): + output_stream_data = read_until_closed(self.output_stream) + script_output = ''.join(output_stream_data) + + downloadable_files = self._prepare_downloadable_files( + self.result_files_paths, + self.config, + script_output) + + self.result_files.extend(downloadable_files.values()) + + def _listen_for_images(self): + image_paths = self.inline_image_paths + script_handler = self + + class InlineImageListener: + def __init__(self) -> None: + self.last_buffer = '' + + def on_next(self, output: str): + output = self.last_buffer + output + self.last_buffer = '' + + if '\n' not in output: + self.last_buffer = output + return + + last_newline_index = output.rfind('\n') + self.last_buffer = output[last_newline_index + 1:] + output = output[:last_newline_index] + + if output: + self.prepare_images(output) + + def on_close(self): + if not self.last_buffer: + return + + self.prepare_images(self.last_buffer) + self.last_buffer = '' + + @staticmethod + def prepare_images(output): + images = script_handler._prepare_downloadable_files( + image_paths, + script_handler.config, + output) + + for key, value in images.items(): + script_handler._add_inline_image(key, value) + self.output_stream.subscribe(InlineImageListener()) + + def _prepare_downloadable_files(self, output_files, config, script_output, *, should_exist=True): correct_files = [] for output_file in output_files: @@ -79,23 +181,25 @@ def _prepare_downloadable_files(self, config, script_output, script_param_values for file in files: file_path = file_utils.normalize_path(file, config.working_directory) if not os.path.exists(file_path): - LOGGER.warning('file ' + file + ' (full path = ' + file_path + ') not found') + if should_exist: + LOGGER.warning('file ' + file + ' (full path = ' + file_path + ') not found') elif os.path.isdir(file_path): LOGGER.warning('file ' + file + ' is a directory. Not allowed') elif file_path not in correct_files: correct_files.append(file_path) - else: + elif should_exist: LOGGER.warning("Couldn't find file for " + output_file) if not correct_files: - return [] - - download_folder = self.user_file_storage.prepare_new_folder(execution_owner, self.result_folder) - LOGGER.info('Created download folder for ' + execution_owner + ': ' + download_folder) + return {} - result = [] + result = {} for file in correct_files: - preferred_download_file = os.path.join(download_folder, os.path.basename(file)) + if file in self.prepared_files: + result[file] = self.prepared_files[file] + continue + + preferred_download_file = os.path.join(self.download_folder, os.path.basename(file)) try: download_file = create_unique_filename(preferred_download_file) @@ -105,12 +209,22 @@ def _prepare_downloadable_files(self, config, script_output, script_param_values copyfile(file, download_file) - result.append(download_file) + result[file] = download_file + self.prepared_files[file] = download_file return result - def allowed_to_download(self, file_path, execution_owner): - return self.user_file_storage.allowed_to_access(file_path, execution_owner) + def _add_inline_image(self, original_path, download_path): + if original_path in self.inline_images: + return + + self.inline_images[original_path] = download_path + + for listener in self.inline_image_listeners: + try: + listener(original_path, download_path) + except Exception: + LOGGER.error('Failed to notify image listener') def substitute_parameter_values(parameter_configs, output_files, values): @@ -173,3 +287,13 @@ def find_matching_files(file_pattern, script_output): files.extend(matching_files) return files + + +def _extract_path(output_file): + if isinstance(output_file, str): + return output_file + elif isinstance(output_file, dict): + path = output_file.get('path') + if not string_utils.is_blank(path): + return path.strip() + return None diff --git a/src/tests/file_download_feature_test.py b/src/tests/file_download_feature_test.py index ee5aefe0..632d96bd 100644 --- a/src/tests/file_download_feature_test.py +++ b/src/tests/file_download_feature_test.py @@ -10,7 +10,8 @@ from tests import test_utils from tests.test_utils import create_parameter_model, _MockProcessWrapper, _IdGeneratorMock, create_config_model, \ create_audit_names, create_script_param_config -from utils import file_utils +from utils import file_utils, os_utils +from utils.file_utils import normalize_path class TestFileMatching(unittest.TestCase): @@ -356,3 +357,222 @@ def assert_downloadable_files(self, prepared_files, original_files): original_content = file_utils.read_file(original_file) self.assertEqual(original_content, prepared_content, 'Different content for file ' + filename) + + +def inline_image(path): + return {'type': 'inline-image', 'path': path} + + +class TestInlineImages(unittest.TestCase): + def setUp(self) -> None: + test_utils.setup() + + executor._process_creator = _MockProcessWrapper + self.executor_service = ExecutionService(_IdGeneratorMock()) + + self.file_download_feature = file_download_feature.FileDownloadFeature( + UserFileStorage(b'123456'), test_utils.temp_folder) + self.file_download_feature.subscribe(self.executor_service) + + self.images = [] + + def tearDown(self) -> None: + test_utils.cleanup() + + executions = self.executor_service.get_active_executions('userX') + for execution in executions: + self.executor_service.kill_script(execution) + + def _add_image(self, original_path, new_path): + self.images.append((original_path, new_path)) + + def test_single_static_image(self): + path = test_utils.create_file('test.png') + config = create_config_model('my_script', output_files=[inline_image(path)]) + + execution_id = self.start_execution(config) + + self.write_output(execution_id, '123\n456') + + self.wait_output_chunks(execution_id, chunks_count=1) + + self.assert_images(path) + + def test_multiple_static_images(self): + path1 = test_utils.create_file('test1.png') + path2 = test_utils.create_file('test2.png') + path3 = test_utils.create_file('test3.png') + config = create_config_model('my_script', + output_files=[inline_image(path1), inline_image(path2), inline_image(path3)]) + + execution_id = self.start_execution(config) + + self.write_output(execution_id, '123\n' + '456') + + self.wait_output_chunks(execution_id, chunks_count=1) + + self.assert_images(path1, path2, path3) + + def test_single_static_image_when_multiple_outputs(self): + path = test_utils.create_file('test.png') + config = create_config_model('my_script', output_files=[inline_image(path)]) + + execution_id = self.start_execution(config) + + self.write_output(execution_id, '123\n456') + self.wait_output_chunks(execution_id, chunks_count=1) + + self.write_output(execution_id, '789\n0') + self.wait_output_chunks(execution_id, chunks_count=2) + + self.assert_images(path) + + def test_single_dynamic_image(self): + path = test_utils.create_file('test.png') + config = create_config_model('my_script', output_files=[inline_image('##any_path.png#')]) + + execution_id = self.start_execution(config) + + full_path = file_utils.normalize_path(path) + self.write_output(execution_id, '123\n' + full_path + '\n456') + self.wait_output_chunks(execution_id, chunks_count=1) + + self.assert_images(full_path) + + def test_mixed_images_when_multiple_output(self): + path1 = test_utils.create_file('test123.png') + path2 = test_utils.create_file('images/test.png') + path3 = test_utils.create_file('a.b.c.png') + path4 = test_utils.create_file('test456.png') + path5 = test_utils.create_file('some/long/path/me.jpg') + + config = create_config_model('my_script', output_files=[ + inline_image(test_utils.temp_folder + os_utils.path_sep() + '#test\d+.png#'), + inline_image(path2), + inline_image(path3), + inline_image('##any_path/path/\w+#.jpg') + ]) + + execution_id = self.start_execution(config) + + paths = [normalize_path(p) for p in (path1, path2, path3, path4, path5)] + for index, path in enumerate(paths): + self.write_output(execution_id, '__ ' + path + ' __\n') + self.wait_output_chunks(execution_id, chunks_count=index + 1) + + self.write_output(execution_id, '__ ' + path2 + ' __\n') + self.wait_output_chunks(execution_id, chunks_count=len(paths) + 1) + + self.assert_images(*paths) + + def test_find_multiple_images_by_same_pattern(self): + path1 = test_utils.create_file('test123.png') + test_utils.create_file('images/test.png') + path3 = test_utils.create_file('a.b.c.png') + path4 = test_utils.create_file('some/sub/folder/test456.png') + + config = create_config_model('my_script', output_files=[ + inline_image('##any_path.png#') + ]) + + execution_id = self.start_execution(config) + + paths = [normalize_path(p) for p in (path1, path3, path4)] + for index, path in enumerate(paths): + self.write_output(execution_id, '__ ' + path + ' __\n') + self.wait_output_chunks(execution_id, chunks_count=index + 1) + + self.assert_images(*paths) + + def test_image_path_split_in_chunks(self): + path = test_utils.create_file('test123.png') + + config = create_config_model('my_script', output_files=[inline_image('##any_path.png#')]) + + execution_id = self.start_execution(config) + + normalized = normalize_path(path) + + self.write_output(execution_id, normalized[:4]) + self.wait_output_chunks(execution_id, chunks_count=1) + + self.write_output(execution_id, normalized[4:] + '\n') + self.wait_output_chunks(execution_id, chunks_count=2) + + self.assert_images(path) + + def test_image_path_split_in_chunks_and_no_newlines(self): + path = test_utils.create_file('test123.png') + + config = create_config_model('my_script', output_files=[inline_image('##any_path.png#')]) + + execution_id = self.start_execution(config) + + normalized = normalize_path(path) + + self.write_output(execution_id, normalized[:4]) + self.wait_output_chunks(execution_id, chunks_count=1) + + self.write_output(execution_id, normalized[4:]) + self.wait_output_chunks(execution_id, chunks_count=2) + + self.executor_service.get_active_executor(execution_id).process_wrapper.stop() + self.wait_close(execution_id) + + self.assert_images(path) + + def wait_output_chunks(self, execution_id, *, chunks_count): + waiter = OutputWaiter() + self.executor_service.get_anonymized_output_stream(execution_id).subscribe(waiter) + waiter.wait_chunks(chunks_count, timeout=0.5) + + def wait_close(self, execution_id): + chunk_condition = threading.Condition() + closed = False + + def waiter(): + global closed + closed = True + with chunk_condition: + chunk_condition.notify_all() + + self.executor_service.get_anonymized_output_stream(execution_id).subscribe_on_close(waiter) + with chunk_condition: + chunk_condition.wait_for(lambda: closed, 0.5) + + def write_output(self, execution_id, output): + process_wrapper = self.executor_service.get_active_executor(execution_id).process_wrapper + process_wrapper.write_output(output) + + def start_execution(self, config): + execution_id = self.executor_service.start_script(config, {}, 'userX', {}) + self.file_download_feature.subscribe_on_inline_images(execution_id, self._add_image) + return execution_id + + def assert_images(self, *paths): + normalized_paths = [file_utils.normalize_path(p) for p in paths] + actual_paths = [image[0] for image in self.images] + + self.assertCountEqual(normalized_paths, actual_paths) + + +class OutputWaiter: + def __init__(self) -> None: + self.chunks = [] + self.chunk_condition = threading.Condition() + + def on_next(self, chunk): + self.chunks.append(chunk) + + with self.chunk_condition: + self.chunk_condition.notify_all() + + def on_close(self): + pass + + def wait_chunks(self, chunk_count, *, timeout): + with self.chunk_condition: + result = self.chunk_condition.wait_for(lambda: len(self.chunks) >= chunk_count, timeout) + + if not result: + raise Exception('Chunk count did not reach ' + str(chunk_count)) diff --git a/src/tests/test_utils.py b/src/tests/test_utils.py index 8d21a129..2b8d8b0a 100644 --- a/src/tests/test_utils.py +++ b/src/tests/test_utils.py @@ -16,7 +16,7 @@ _original_env = {} -def create_file(filepath, overwrite=False, text=None): +def create_file(filepath, overwrite=False, text='test text'): if not os.path.exists(temp_folder): os.makedirs(temp_folder) @@ -29,9 +29,6 @@ def create_file(filepath, overwrite=False, text=None): if os.path.exists(file_path) and not overwrite: raise Exception('File ' + file_path + ' already exists') - if text is None: - text = 'test text' - file_utils.write_file(file_path, text) return file_path