Jobman documentation¶
jobman is a job management package designed to submit and monitor jobs on remote machines. It is built on top of the dpdispatcher package.
jobman is designed for the big-data era, where the number of remote jobs is so large that handling them manually is almost impossible. Imagine that you have more than 1000 jobs to run, you have access to 3 remote high-performance computing (HPC) servers with different computing environments, and you need to monitor the progress of each job, check the output files, and download the results. This is a tedious and time-consuming task. The jobman package is designed to automate such tasks: it will prepare the input files, submit the jobs to remote machines, monitor the progress of each job, and download the results to the local machine whenever a job finishes.
Case 1: Distribute jobs to a single remote machine¶
This is a general-purpose usage, where task_list can be defined flexibly. In this case, each Task can have a different command_list, forward_files, and backward_files.
To use it, you only need to:
- Define the task_list as a list of Task objects.
- Use the function submit_job_chunk() to submit jobs to the remote machine.
from thkit.jobman import ConfigRemoteMachines, Task, submit_job_chunk
config = ConfigRemoteMachines("MACHINE.yml") # load the remote machine config
mdict_list = config.select_machines(mdict_prefix="lammps_")
mdict = mdict_list[0] # use the first machine in the list
task_list = [Task(), Task(), ...]  # list of Task objects (see Info 1 below)
forward_common_files = []   # common input files shared by all tasks
backward_common_files = []  # common files to download when the jobs finish
submit_job_chunk(
    mdict=mdict,
    work_dir='./',
    task_list=task_list,
    forward_common_files=forward_common_files,
    backward_common_files=backward_common_files,
)
jobman will handle the rest.
- Info 1: An example of defining task_list:
task = Task.load_from_dict(
    {
        "command": f"mpirun -np 4 lmp_serial -in {runfile}",  # adjust the MPI rank count for your machine
        "task_work_path": "./",
        "forward_files": ["all_input_files"],
        "backward_files": ["all_getback_files/*"],
        "outlog": "out.log",
        "errlog": "err.log",
    }
)
task_list = [task]
- Info 2: Configure the remote machine in the MACHINE.yml file, following the remote machine schema.
- (Optional) Use a launcher script (e.g., launcher.sh) to run the Python code:
#!/bin/bash
source /etc/profile.d/modules.sh
module use /home/tha/app/1modulefiles
module load miniforge
source activate py13
python above_py_script.py
Case 2: Distribute jobs to multiple remote machines¶
This is used for a specific purpose (e.g., the alff package), where all jobs share the same forward_files and backward_files, but the command_list can differ based on the computing environment of each remote machine. You only need to:
- Prepare the task_dirs, all of which have the same forward_files and backward_files.
- Define a prepare_command_list() function to prepare the command_list for each remote machine.
from thkit.jobman import alff_submit_job_multi_remotes
from thkit.config import loadconfig
import asyncio
mdict = loadconfig("remote_machine.yml") # load the remote machine config
### Prepare command_list on each machine
def prepare_command_list(machine: dict) -> list:
    command_list = []
    dft_cmd = machine.get("command", "python")
    dft_cmd = f"{dft_cmd} ../cli_gpaw_optimize.py ../{FILE_ARG_ASE}"  # `../` to run files in the common directory
    command_list.append(dft_cmd)
    return command_list
### Submit to multiple machines
asyncio.run(
    alff_submit_job_multi_remotes(
        multi_mdict=mdict,
        prepare_command_list=prepare_command_list,
        work_dir=work_dir,
        task_dirs=task_dirs,
        forward_files=forward_files,
        backward_files=backward_files,
        forward_common_files=forward_common_files,
        mdict_prefix="dft",
        logger=logger,
    )
)
Note:
- Setting up remote machines follows the remote machine schema.
- These classes can be imported from jobman: Task, Machine, Resources, Submission (see the sketch below).
- To handle cases where some tasks are finished and others are not, see the function handle_submission().
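For illustration, here is a minimal sketch of driving a submission manually with those classes. It follows dpdispatcher conventions; the mdict["machine"]/mdict["resources"] key layout and the empty common-file lists are assumptions, not part of jobman:
from thkit.jobman import Task, Machine, Resources, Submission

machine = Machine.load_from_dict(mdict["machine"])        # assumed key layout
resources = Resources.load_from_dict(mdict["resources"])  # assumed key layout
submission = Submission(
    work_base="./",
    machine=machine,
    resources=resources,
    task_list=task_list,  # list of Task objects, as in Case 1
    forward_common_files=[],
    backward_common_files=[],
)
submission.run_submission()  # blocks until all jobs finish and results are downloaded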
API reference:¶
thkit.jobman¶
Modules:
- #retired_code
- helper
- submit
#retired_code¶
Functions:
- submit_job_chunk – Function to submit jobs to the remote machine.
- async_submit_job_chunk – Convert submit_job_chunk() into an async function that only needs to wait for the completion of the entire for loop (without worrying about the specifics of each operation inside the loop).
- async_submit_job_chunk_tqdm – Revised version of async_submit_job_chunk() with a tqdm progress bar.
Attributes:
- runvar
- global_lock
runvar = {}
module-attribute¶
global_lock = asyncio.Lock()
module-attribute¶
submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, logger: object = None)¶
Function to submit jobs to the remote machine. The function will:
- Prepare the task list
- Make the submission of jobs to the remote machine
- Wait for the jobs to finish and download the results to the local machine
Parameters:
- mdict (dict) – a dictionary containing the settings of the remote machine. The parameters are described in the remote machine schema. This dictionary defines the login information, resources, execution command, etc. on the remote machine.
- task_list (list[Task]) – a list of Task objects. Each Task object contains the command to be executed on the remote machine and the files to be copied to and from the remote machine. The directories of each task must be relative to the work_dir.
- forward_common_files (list[str], default: []) – common files used for all tasks. These files are in the work_dir.
- backward_common_files (list[str], default: []) – common files to download from the remote machine when the jobs are finished.
- machine_index (int, default: 0) – index of the machine in the list of machines.
- logger (object, default: None) – the logger object to be used for logging.
Note
- Split the task_list into chunks to control the number of jobs submitted at once (the chunking idea is sketched below).
- Do not use the Local context: it interferes with the current shell environment, which leads to unexpected behavior on the local machine. Instead, use a separate account to connect to the local machine with the SSH context.
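The chunking itself happens inside submit_job_chunk(); purely to illustrate the idea, a chunk helper could look like this (the name chunks and the chunk_size parameter are hypothetical):
def chunks(task_list: list, chunk_size: int):
    # Yield successive slices of at most chunk_size tasks.
    for i in range(0, len(task_list), chunk_size):
        yield task_list[i : i + chunk_size]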
async_submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, logger: object = None)
async¶
Convert submit_job_chunk() into an async function that only needs to wait for the completion of the entire for loop (without worrying about the specifics of each operation inside the loop).
Note
- An async function normally contains an await ... statement to be awaited (yielding control to the event loop).
- If the event loop is blocked by a synchronous function (one that never yields control back to the event loop), the async function will wait for that synchronous function to complete, so it will not run asynchronously. Use await asyncio.to_thread() to run the synchronous function in a separate thread so that the event loop is not blocked (see the sketch below).
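A minimal, self-contained sketch of this pattern; blocking_submit is a hypothetical stand-in for a synchronous function such as submit_job_chunk():
import asyncio
import time

def blocking_submit(machine_index: int) -> str:
    # Hypothetical synchronous function that blocks (e.g., waiting on remote jobs).
    time.sleep(1.0)
    return f"machine {machine_index} done"

async def main():
    # Run the blocking calls in worker threads so the event loop stays responsive.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_submit, 0),
        asyncio.to_thread(blocking_submit, 1),
    )
    print(results)  # both calls finish after ~1 s total, not ~2 s

asyncio.run(main())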
async_submit_job_chunk_tqdm(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, logger: object = None)
async¶
Revised version of async_submit_job_chunk() with a tqdm progress bar.
helper¶
Classes:
- ConfigRemoteMachines – Class for remote machine configuration files.
Functions:
- change_logfile_dispatcher – Change the logfile of dpdispatcher.
- init_jobman_logger – Initialize the default logger under log/ if not provided.
- log_machine_info – Log remote machine information.
ConfigRemoteMachines(machines_file: str)¶
Bases: Config
Class for remote machine configuration files.
Parameters:
- machines_file (str) – path to the YAML file that contains multiple machine configs.
Methods:
- validate_machine_config – Validate multiple machine configs.
- select_machines – Select machine dicts based on the prefix.
- check_connection – Check whether the connections to all remote machines are valid.
- check_resource_settings – Check whether the resource settings on all remote machines are valid.
- validate – Validate the config file with the schema file.
- loadconfig – Load data from a JSON or YAML file.
Attributes:
- machines_file (str)
- multi_mdicts
machines_file: str = machines_file
instance-attribute¶
multi_mdicts = self.loadconfig(machines_file)
instance-attribute¶
validate_machine_config(schema_file: str | None = None)¶
Validate multiple machine configs.
select_machines(mdict_prefix: str = '') -> list[dict]¶
Select machine dicts based on the prefix.
To specify multiple remote machines for the same purpose, the top-level keys in the machines_file should start with the same prefix. Example:
- train_1, train_2, ... for training jobs
- lammps_1, lammps_2, ... for lammps jobs
- gpaw_1, gpaw_2, ... for gpaw jobs
Parameters:
- mdict_prefix (str, default: '') – the prefix used to select remote machines for the same purpose. Example: 'dft_', 'md_', 'train_'.
Returns:
- list[dict] – the machine dicts whose top-level keys match the prefix.
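A brief usage sketch (the top-level keys mentioned in the comment are hypothetical):
config = ConfigRemoteMachines("MACHINE.yml")
# Suppose MACHINE.yml defines top-level keys train_1, train_2, and lammps_1.
train_mdicts = config.select_machines(mdict_prefix="train_")
print(len(train_mdicts))  # 2: the dicts of train_1 and train_2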
check_connection(mdict_prefix: str = '')¶
Check whether the connections to all remote machines are valid.
Parameters:
- mdict_prefix (str, default: '') – only check the remote machines with this prefix.
check_resource_settings(mdict_prefix: str = '')¶
Check whether the resource settings on all remote machines are valid.
validate(config_dict: dict | None = None, config_file: str | None = None, schema_dict: dict | None = None, schema_file: str | None = None, allow_unknown: bool = False, require_all: bool = False)
staticmethod¶
Validate the config file with the schema file.
Parameters:
- config_dict (dict, default: None) – config dictionary. Defaults to None.
- config_file (str, default: None) – path to the YAML config file; overrides config_dict. Defaults to None.
- schema_dict (dict, default: None) – schema dictionary. Defaults to None.
- schema_file (str, default: None) – path to the YAML schema file; overrides schema_dict. Defaults to None.
- allow_unknown (bool, default: False) – whether to allow unknown fields in the config file. Defaults to False.
- require_all (bool, default: False) – whether to require all fields in the schema file to be present in the config file. Defaults to False.
Raises:
- ValueError – if the config file does not match the schema.
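A hedged usage sketch (both file names are hypothetical):
ConfigRemoteMachines.validate(
    config_file="MACHINE.yml",         # hypothetical config file
    schema_file="machine_schema.yml",  # hypothetical schema file
    allow_unknown=True,
)
# Raises ValueError if MACHINE.yml does not match the schema.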
loadconfig(filename: str | Path) -> dict
staticmethod¶
Load data from a JSON or YAML file.
Parameters:
- filename (str | Path) – the filename to load data from, whose suffix should be .json, .jsonc, .yml, or .yaml.
Returns:
- jdata (dict) – the data loaded from the file.
Notes
- The YAML file can contain variable interpolation, which will be processed by OmegaConf (see the example below).
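An illustrative example of such interpolation, resolved with OmegaConf directly (the keys are hypothetical):
from omegaconf import OmegaConf

yaml_text = """
base_dir: /home/user/project
work_dir: ${base_dir}/runs   # interpolated from base_dir
"""
cfg = OmegaConf.create(yaml_text)
print(OmegaConf.to_container(cfg, resolve=True))
# {'base_dir': '/home/user/project', 'work_dir': '/home/user/project/runs'}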
change_logfile_dispatcher(newlogfile: str)¶
Change the logfile of dpdispatcher.
init_jobman_logger(logfile: str | None = None) -> ColorLogger¶
Initialize the default logger under log/ if not provided.
log_machine_info(num_jobs: int, mdict: dict, machine_index: int, logger: logging.Logger)¶
Log remote machine information.
submit¶
Functions:
- submit_job_chunk – Function to submit jobs to the remote machine.
- async_submit_job_chunk – Convert submit_job_chunk() into an async function.
- alff_submit_job_multi_remotes – Submit jobs to multiple machines asynchronously.
Attributes:
- sync_dict
- global_lock
sync_dict = {}
module-attribute¶
global_lock = asyncio.Lock()
module-attribute¶
submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, logger: ColorLogger | None = None)¶
Function to submit jobs to the remote machine. The function will:
- Prepare the task list
- Make the submission of jobs to the remote machine
- Wait for the jobs to finish and download the results to the local machine
Parameters:
- mdict (dict) – a dictionary containing the settings of the remote machine. The parameters are described in the remote machine schema. This dictionary defines the login information, resources, execution command, etc. on the remote machine.
- work_dir (str) – the base working directory on the local machine. All task directories are relative to this directory.
- task_list (list[Task]) – a list of Task objects. Each Task object contains the command to be executed on the remote machine and the files to be copied to and from the remote machine. The directories of each task must be relative to the work_dir.
- forward_common_files (list[str], default: []) – common files used for all tasks. These files are in the work_dir.
- backward_common_files (list[str], default: []) – common files to download from the remote machine when the jobs are finished.
- machine_index (int, default: 0) – index of the machine in the list of machines.
- logger (object, default: None) – the logger object to be used for logging.
Note
- Split the task_list into chunks to control the number of jobs submitted at once.
- Do not use the Local context: it interferes with the current shell environment, which leads to unexpected behavior on the local machine. Instead, use a separate account to connect to the local machine with the SSH context.
async_submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, logger: ColorLogger | None = None)
async¶
Convert submit_job_chunk() into an async function.
The approach in this function is to wait only for the completion of the entire for loop (without worrying about the specifics of each operation inside the loop).
Note
- An async function normally contains an await ... statement to be awaited (yielding control to the event loop).
- If the event loop is blocked by a synchronous function (one that never yields control back to the event loop), the async function will wait for that synchronous function to complete, so it will not run asynchronously. Use await asyncio.to_thread() to run the synchronous function in a separate thread so that the event loop is not blocked.
- This version uses rich instead of tqdm to better handle progress bars (see the retired codes). Multiple tqdm bars work well when no errors occur during job submission; if a job raises an error, the tqdm bars get messed up. rich's built-in remaining-time column does not work well with multiple progress bars, so I implemented a customized time-remaining column (a sketch follows below).
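A minimal sketch of a customized time-remaining column in rich; the naive average-rate estimate here is an illustration, not the package's actual implementation:
import time
from rich.progress import BarColumn, Progress, ProgressColumn, TextColumn
from rich.text import Text

class NaiveTimeRemainingColumn(ProgressColumn):
    # Estimate remaining time from the overall average rate (illustrative only).
    def render(self, task) -> Text:
        if not task.completed or task.elapsed is None or task.total is None:
            return Text("-:--:--")
        rate = task.completed / task.elapsed               # average tasks per second
        remaining = (task.total - task.completed) / rate   # seconds left at that rate
        return Text(time.strftime("%H:%M:%S", time.gmtime(remaining)))

with Progress(TextColumn("{task.description}"), BarColumn(), NaiveTimeRemainingColumn()) as progress:
    bar = progress.add_task("machine_0", total=50)
    for _ in range(50):
        time.sleep(0.05)
        progress.advance(bar)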
alff_submit_job_multi_remotes(mdict_list: list[dict], commandlist_list: list[list[str]], work_dir: str, task_dirs: list[str], forward_files: list[str], backward_files: list[str], forward_common_files: list[str] = [], backward_common_files: list[str] = [], logger: ColorLogger | None = None)
async¶
Submit jobs to multiple machines asynchronously.
Parameters: