jobman

thkit.jobman

jobman is a job management package designed to submit and monitor jobs on remote machines. It is built on top of the dpdispatcher package.

jobman is designed for the big data era, where the number of remote jobs is so large that handling them manually is almost impossible. Imagine that you have more than 1000 jobs to run, you have access to 3 remote high-performance computing (HPC) servers with different computing environments, and you need to monitor the progress of each job, check the output files, and download the results. This is a tedious and time-consuming task. The jobman package is designed to automate such tasks: it will handle the input files, submit the jobs to remote machines, monitor the progress of each job, and download the results to the local machine whenever the jobs finish.

Case 1: Distribute jobs to a single remote machine

This covers the general-purpose case: the task_list can be defined flexibly, and each task can have its own command_list, forward_files, and backward_files. You only need to:

  • Define the task_list as a list of Task objects (a construction sketch follows the example below).
  • Use the function submit_job_chunk() to submit the jobs to the remote machine.
from thkit.jobman import submit_job_chunk, Task
from thkit.config import load_config

mdict = load_config("remote_machine.yml")  # load the remote machine config
task_list = [Task(...), Task(...), ...]    # list of Task objects
work_dir = "./work"                        # placeholder: parent dir containing all task dirs
forward_common_files = []                  # placeholder: files in work_dir shared by all tasks
backward_common_files = []                 # placeholder: common files to download when jobs finish
submit_job_chunk(
    mdict=mdict,
    work_dir=work_dir,
    task_list=task_list,
    forward_common_files=forward_common_files,
    backward_common_files=backward_common_files,
)
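
For illustration, one way to build the task_list is sketched below. This is a minimal sketch that assumes Task mirrors the Task class of the underlying dpdispatcher package (command, task_work_path, forward_files, backward_files); the task directories and file names here are hypothetical.

from thkit.jobman import Task

task_list = [
    Task(
        command="python run.py",                 # command executed inside the task dir
        task_work_path=f"task_{i:03d}/",         # hypothetical task dir, relative to work_dir
        forward_files=["run.py", "input.json"],  # hypothetical per-task files to upload
        backward_files=["result.json"],          # hypothetical per-task files to download
    )
    for i in range(3)
]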

Case 2: Distribute jobs to multiple remote machines

This is used for a specific purpose (e.g., the alff package), where all jobs share the same forward_files and backward_files, but the command_list can differ depending on the computing environment of each remote machine. You only need to:

  • Prepare the task_dirs, all of which have the same forward_files and backward_files.
  • Define a prepare_command_list() function to build the command_list for each remote machine.
from thkit.jobman import alff_submit_job_multi_remotes
from thkit.config import load_config
import asyncio

mdict = load_config("remote_machine.yml")  # load the remote machine config

### Prepare the command_list for each machine
def prepare_command_list(machine: dict) -> list:
    command_list = []
    dft_cmd = machine.get("command", "python")
    # `../` because the common files (script and argument file) are staged in the parent directory
    dft_cmd = f"{dft_cmd} ../cli_gpaw_optimize.py ../{FILE_ASE_ARG}"  # FILE_ASE_ARG is defined elsewhere
    command_list.append(dft_cmd)
    return command_list

### Submit to multiple machines
asyncio.run(
    alff_submit_job_multi_remotes(
        multi_mdict=mdict,
        prepare_command_list=prepare_command_list,
        work_dir=work_dir,
        task_dirs=task_dirs,
        forward_files=forward_files,
        backward_files=backward_files,
        forward_common_files=forward_common_files,
        mdict_prefix="dft",
        Logger=Logger,
    )
)

Functions:

submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, Logger: object = None)

Function to submit jobs to the remote machine. The function will:

  • Prepare the task list
  • Make the submission of jobs to remote machines
  • Wait for the jobs to finish and download the results to the local machine

Parameters:

  • mdict (dict) –

    a dictionary containing the settings of the remote machine, with the parameters described in the remote machine schema. This dictionary defines the login information, resources, execution command, etc. on the remote machine.

  • task_list (list[Task]) –

    a list of Task objects. Each Task object contains the command to be executed on the remote machine, and the files to be copied to and from the remote machine. The dir of each task must be relative to the work_dir.

  • forward_common_files (list[str], default: [] ) –

    common files used by all tasks. These files are in the work_dir.

  • backward_common_files (list[str], default: [] ) –

    common files to download from the remote machine when the jobs are finished.

  • machine_index (int, default: 0 ) –

    index of the machine in the list of machines.

  • Logger (object, default: None ) –

    the logger object to be used for logging.

Note
  • Split the task_list into chunks to control the number of jobs submitted at once.
  • Do not use the Local context: it interferes with the current shell environment, which leads to unexpected behavior on the local machine. Instead, use another account to connect to the local machine via the SSH context.

async_submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, Logger: object = None) async

Convert submit_job_chunk() into an async function, so the caller only needs to await the completion of the entire for loop (without worrying about the specifics of each operation inside the loop).

Note
  • An async function normally contains an await ... statement to be awaited (yielding control to the event loop).
  • If the event loop is blocked by a synchronous function (one that does not yield control to the event loop), the async function will simply wait for that synchronous call to complete, so it will not actually run asynchronously. Use await asyncio.to_thread() to run the synchronous function in a separate thread, so that the event loop is not blocked.
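
As an illustration of this pattern, the sketch below uses a hypothetical blocking_submit() as a stand-in for a synchronous call such as submit_job_chunk(); asyncio.to_thread() moves each call to a worker thread so the event loop can run both concurrently.

import asyncio
import time

def blocking_submit(machine_index: int) -> str:
    time.sleep(2)  # simulate a long synchronous submission
    return f"machine {machine_index} done"

async def main():
    # Both blocking calls run in worker threads, so they overlap instead of
    # executing one after the other and blocking the event loop.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_submit, 0),
        asyncio.to_thread(blocking_submit, 1),
    )
    print(results)

asyncio.run(main())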

alff_submit_job_multi_remotes(multi_mdict: dict, prepare_command_list: callable, work_dir: str, task_dirs: list[str], forward_files: list[str], backward_files: list[str], forward_common_files: list[str] = [], backward_common_files: list[str] = [], mdict_prefix: str = 'dft', Logger: object = None) async

Submit jobs to multiple machines asynchronously.

Parameters:

  • multi_mdict (dict) –

    the big dict containing multiple mdicts. Each mdict holds the parameters of one remote machine, as described in the remote machine schema.

  • prepare_command_list (callable) –

    a function that prepares the command list for each remote machine.

  • mdict_prefix (str, default: 'dft') –

    the prefix used to select remote machines serving the same purpose. Examples: 'dft', 'md', 'train'.

change_logpath_dispatcher(newlogfile: str = _DEFAULT_LOG_FILE)

Change the logfile of dpdispatcher.
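
A minimal usage sketch (the log path below is hypothetical):

from thkit.jobman import change_logpath_dispatcher

change_logpath_dispatcher("./logs/dispatcher.log")  # hypothetical path; redirects dpdispatcher's log file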

validate_machine_config(machine_file: str)

Validate the YAML file that contains the machine config.
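
A minimal usage sketch; how validation failures are reported depends on the implementation:

from thkit.jobman import validate_machine_config

validate_machine_config("remote_machine.yml")  # checks the file against the remote machine schema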