Source code for dpdispatcher.entrypoints.submission

from pathlib import Path

from dpdispatcher.dlog import dlog
from dpdispatcher.submission import Submission
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.record import record


[docs] def handle_submission( *, submission_hash: str, download_terminated_log: bool = False, download_finished_task: bool = False, clean: bool = False, reset_fail_count: bool = False, ): """Handle terminated submission. Parameters ---------- submission_hash : str Submission hash to download. download_terminated_log : bool, optional Download log files of terminated tasks. download_finished_task : bool, optional Download finished tasks. clean : bool, optional Clean submission. reset_fail_count : bool, optional Reset fail count of all jobs to zero. Raises ------ ValueError At least one action should be specified. """ if ( int(download_terminated_log) + int(download_finished_task) + int(clean) + int(reset_fail_count) == 0 ): raise ValueError("At least one action should be specified.") submission_file = record.get_submission(submission_hash) submission = Submission.submission_from_json(str(submission_file)) submission.belonging_tasks = [ task for job in submission.belonging_jobs for task in job.job_task_list ] # TODO: for unclear reason, the submission_hash may be changed submission.submission_hash = submission_hash submission.machine.context.bind_submission(submission) if reset_fail_count: for job in submission.belonging_jobs: job.fail_count = 0 # save to remote and local submission.submission_to_json() record.write(submission) if int(download_terminated_log) + int(download_finished_task) + int(clean) == 0: # if only reset_fail_count, no need to update submission state (expensive) return submission.update_submission_state() submission.submission_to_json() record.write(submission) terminated_tasks = [] finished_tasks = [] for task in submission.belonging_tasks: task.get_task_state(submission.machine.context) if task.task_state == JobStatus.terminated: terminated_tasks.append(task) elif task.task_state == JobStatus.finished: finished_tasks.append(task) submission.belonging_tasks = [] if download_terminated_log: for task in terminated_tasks: task.backward_files = [task.outlog, task.errlog] submission.belonging_tasks += terminated_tasks if download_finished_task: submission.belonging_tasks += finished_tasks submission.download_jobs() if download_terminated_log: terminated_log_files = [] for task in terminated_tasks: assert submission.local_root is not None terminated_log_files.append( Path(submission.local_root) / task.task_work_path / task.outlog ) terminated_log_files.append( Path(submission.local_root) / task.task_work_path / task.errlog ) dlog.info( "Terminated logs are downloaded into:\n " + "\n ".join([str(f) for f in terminated_log_files]) ) if clean: submission.clean_jobs()