API

thkit

The Python package of general utilities.

Developed and maintained by C.Thang Nguyen

Modules:

Attributes:

THKIT_ROOT = Path(__file__).parent module-attribute

__author__ = 'C.Thang Nguyen' module-attribute

__contact__ = 'http://thangckt.github.io/email' module-attribute

config

Functions:

  • validate_config

    Validate the config file with the schema file.

  • load_config

Load data from a JSON or YAML file. A YAML file can contain variable interpolation, which will be processed by OmegaConf.

  • load_jsonc

Load data from a JSON file that allows comments.

validate_config(config_dict=None, config_file=None, schema_dict=None, schema_file=None, allow_unknown=False, require_all=False)

Validate the config file with the schema file.

Parameters:

  • config_dict (dict, default: None ) –

    config dictionary. Defaults to None.

  • config_file (str, default: None ) –

path to the YAML config file; overrides config_dict. Defaults to None.

  • schema_dict (dict, default: None ) –

    schema dictionary. Defaults to None.

  • schema_file (str, default: None ) –

path to the YAML schema file; overrides schema_dict. Defaults to None.

  • allow_unknown (bool, default: False ) –

    whether to allow unknown fields in the config file. Defaults to False.

  • require_all (bool, default: False ) –

    whether to require all fields in the schema file to be present in the config file. Defaults to False.

Raises:

  • ValueError

    if the config file does not match the schema
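
For example, a minimal sketch of validating an in-memory config against an in-memory schema. The rule keys below (type, min) assume a cerberus-style schema, which the allow_unknown/require_all options suggest; treat them as an illustration, not the package's guaranteed schema syntax.

from thkit.config import validate_config

# hypothetical config and schema dictionaries
config = {"n_jobs": 4, "work_dir": "runs"}
schema = {
    "n_jobs": {"type": "integer", "min": 1},  # assumed cerberus-style rules
    "work_dir": {"type": "string"},
}

# raises ValueError if the config does not match the schema
validate_config(config_dict=config, schema_dict=schema)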

load_config(filename: Union[str, Path]) -> dict

Load data from a JSON or YAML file. A YAML file can contain variable interpolation, which will be processed by OmegaConf.

Parameters:

  • filename (Union[str, Path]) –

    The filename to load data from; its suffix should be .json, .jsonc, .yaml, or .yml.

Returns:

  • jdata ( dict ) –

The data loaded from the file
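
A hedged usage sketch; the file contents shown in the comments are hypothetical:

from thkit.config import load_config

# contents of a hypothetical config.yml:
#   root_dir: /data
#   out_dir: ${root_dir}/results   # OmegaConf variable interpolation
cfg = load_config("config.yml")
print(cfg["out_dir"])  # expected: /data/results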

load_jsonc(filename: str) -> dict

Load data from a JSON file that allows comments.

io

Functions:

  • write_yaml

    Write data to a YAML file.

  • read_yaml

    Read data from a YAML file.

  • combine_text_files

Combine text files into a single file in a memory-efficient way. Reads and writes in chunks to avoid loading large files into memory.

  • unpack_dict

    Unpack one level of nested dictionary.

  • download_rawtext

    Download raw text from a URL.

write_yaml(jdata: dict, filename: Union[str, Path])

Write data to a YAML file.

read_yaml(filename: Union[str, Path]) -> dict

Read data from a YAML file.

combine_text_files(files: list[str], output_file: str, chunk_size: int = 1024)

Combine text files into a single file in a memory-efficient way. Reads and writes in chunks to avoid loading large files into memory.

Parameters:

  • files (list[str]) –

    List of file paths to combine.

  • output_file (str) –

    Path to the output file.

  • chunk_size (int, default: 1024 ) –

    Size of each chunk in KB to read/write. Defaults to 1024 KB.
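
The chunked pattern this function describes can be sketched with the standard library alone (a minimal sketch under the documented semantics, not the actual implementation):

def combine_text_files_sketch(files: list[str], output_file: str, chunk_size: int = 1024):
    # read/write chunk_size KB at a time so large files never sit fully in memory
    with open(output_file, "w") as out:
        for path in files:
            with open(path) as src:
                while chunk := src.read(chunk_size * 1024):
                    out.write(chunk)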

unpack_dict(nested_dict: dict) -> dict

Unpack one level of nested dictionary.
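
A minimal sketch of what "one level" of unpacking means; how thkit resolves key collisions is an assumption here:

def unpack_dict_sketch(nested_dict: dict) -> dict:
    flat = {}
    for key, value in nested_dict.items():
        if isinstance(value, dict):
            flat.update(value)  # promote inner keys one level up (assumed behavior)
        else:
            flat[key] = value
    return flat

print(unpack_dict_sketch({"a": {"x": 1, "y": 2}, "b": 3}))  # {'x': 1, 'y': 2, 'b': 3}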

download_rawtext(url: str, outfile: str = None) -> str

Download raw text from a URL.
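
The behavior can be approximated with the standard library (a sketch assuming the function fetches the URL as UTF-8 text and writes outfile when one is given):

from urllib.request import urlopen

def download_rawtext_sketch(url: str, outfile: str = None) -> str:
    with urlopen(url) as resp:
        text = resp.read().decode("utf-8")
    if outfile:
        with open(outfile, "w") as f:
            f.write(text)
    return text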

jobman

jobman is a job management package designed to submit and monitor jobs on remote machines. It is built on top of the dpdispatcher package.

jobman is designed for the big-data era, where the number of remote jobs is so large that handling them manually is almost impossible. Imagine that you have more than 1000 jobs to run, you have access to 3 remote high-performance computing (HPC) servers with different computing environments, and you need to monitor the progress of each job, check the output files, and download the results. This is a tedious and time-consuming task. The jobman package is designed to automate such tasks. jobman will handle the input files, submit the jobs to remote machines, monitor the progress of each job, and download the results to the local machine whenever jobs finish.

Case 1: Distribute jobs to a single remote machine

This is for general-purpose use: the task_list can be defined flexibly, and each task can have a different command_list, forward_files, and backward_files. You just need to:

  • Define the task_list as a list of Task objects.
  • Use function submit_job_chunk() to submit jobs to remote machines.
from thkit.jobman import submit_job_chunk, Task
from thkit.config import load_config

mdict = load_config("remote_machine.yml")  # load the remote machine config
task_list = [Task(...), Task(...), ...]    # list of Task objects
submit_job_chunk(
    mdict=mdict,
    work_dir=work_dir,
    task_list=task_list,
    forward_common_files=forward_common_files,
    backward_common_files=backward_common_files,
)

Case 2: Distribute jobs to multiple remote machines

This is for a specific purpose (e.g., the alff package), where the jobs have the same forward_files and backward_files, but the command_list can differ based on the computing environment of each remote machine. You just need to:

  • Prepare the task_dirs, where all of them have the same forward_files, backward_files.
  • Define a prepare_command_list() function to prepare the command_list for each remote machine.
from thkit.jobman import alff_submit_job_multi_remotes
from thkit.config import load_config
import asyncio

mdict = load_config("remote_machine.yml")  # load the remote machine config

### Prepare command_list on each machine
def prepare_command_list(machine: dict) -> list:
    command_list = []
    dft_cmd = machine.get("command", "python")
    dft_cmd = f"{dft_cmd} ../cli_gpaw_optimize.py ../{FILE_ASE_ARG}"  # `../` to run file in common directory
    command_list.append(dft_cmd)
    return command_list

### Submit to multiple machines
asyncio.run(
    alff_submit_job_multi_remotes(
        multi_mdict=mdict,
        prepare_command_list=prepare_command_list,
        work_dir=work_dir,
        task_dirs=task_dirs,
        forward_files=forward_files,
        backward_files=backward_files,
        forward_common_files=forward_common_files,
        mdict_prefix="dft",
        Logger=Logger,
    )
)

Functions:

_machine_locks = {} module-attribute

_DEFAULT_LOG_FILE = f"{time.strftime('%y%b%d_%H%M%S')}_dispatch.log" module-attribute

_COLOR_MAP = {0: 'blue', 1: 'green', 2: 'yellow', 3: 'magenta', 4: 'cyan', 5: 'red', 6: 'white', 7: 'white', 8: 'white', 9: 'white', 10: 'white'} module-attribute

_prepare_submission(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = []) -> Submission

Function to simplify the preparation of the Submission object for dispatching jobs.

submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, Logger: object = None)

Function to submit a chunk of jobs to the remote machine. The function will:

  • Prepare the task list
  • Make the submission of jobs to remote machines
  • Wait for the jobs to finish and download the results to the local machine

Parameters:

  • mdict (dict) –

a dictionary containing settings of the remote machine, as described in the remote machine schema. This dictionary defines the login information, resources, execution command, etc. on the remote machine.

  • task_list (list[Task]) –

    a list of Task objects. Each task object contains the command to be executed on the remote machine, and the files to be copied to and from the remote machine. The dirs of each task must be relative to the work_dir.

  • forward_common_files (list[str], default: [] ) –

common files used for all tasks. These files are in the work_dir.

  • backward_common_files (list[str], default: [] ) –

    common files to download from the remote machine when the jobs are finished.

  • machine_index (int, default: 0 ) –

    index of the machine in the list of machines.

  • Logger (object, default: None ) –

    the logger object to be used for logging.

Note
  • Split the task_list into chunks to control the number of jobs submitted at once.
Do not use the Local context; it will interfere with the current shell environment, which leads to unexpected behavior on the local machine. Instead, use another account to connect to the local machine with the SSH context.

async_submit_job_chunk(mdict: dict, work_dir: str, task_list: list[Task], forward_common_files: list[str] = [], backward_common_files: list[str] = [], machine_index: int = 0, Logger: object = None) async

Convert submit_job_chunk() into an async function, so callers only need to await the completion of the entire loop (without worrying about the specifics of each operation inside the loop).

Note
  • An async function normally contains an await ... statement to be awaited (yielding control to the event loop).
  • If the event loop is blocked by a synchronous function (which does not yield control to the event loop), the async function must wait for the synchronous function to complete, so it will not actually be executed asynchronously. Use await asyncio.to_thread() to run the synchronous function in a separate thread, so that the event loop is not blocked.
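
The to_thread pattern mentioned above looks like this (a self-contained sketch, independent of thkit internals):

import asyncio
import time

def blocking_submit():
    time.sleep(2)  # stand-in for a synchronous call such as run_submission

async def async_submit():
    # run the blocking call in a worker thread so the event loop stays free
    await asyncio.to_thread(blocking_submit)

async def main():
    # both submissions proceed concurrently: ~2 s total instead of ~4 s
    await asyncio.gather(async_submit(), async_submit())

asyncio.run(main())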

_get_machine_lock(machine_index)

_run_submission_wrapper(submission, check_interval=30, machine_index=0) async

Ensure only one instance of 'submission.run_submission' runs at a time per machine. If one global lock were used for all machines, it would prevent concurrent execution of submissions on different machines. Therefore, each machine must have its own lock, so different machines can process jobs in parallel.
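
The per-machine lock design can be sketched as follows (an illustration of the stated idea, not the actual implementation; the run_submission call is an assumed blocking method):

import asyncio

_machine_locks: dict[int, asyncio.Lock] = {}

def get_machine_lock_sketch(machine_index: int) -> asyncio.Lock:
    # one lock per machine: same-machine submissions are serialized,
    # while different machines can proceed in parallel
    if machine_index not in _machine_locks:
        _machine_locks[machine_index] = asyncio.Lock()
    return _machine_locks[machine_index]

async def run_submission_sketch(submission, machine_index: int = 0):
    async with get_machine_lock_sketch(machine_index):
        await asyncio.to_thread(submission.run_submission)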

_alff_prepare_task_list(command_list: list[str], task_dirs: list[str], forward_files: list[str], backward_files: list[str], outlog: str, errlog: str) -> list[Task]

Prepare the task list for the alff package.

Jobs in the alff package all share the same command_list, forward_files, and backward_files, so this function is a shorthand to prepare the list of Task objects for the alff package. For general usage, prepare the task list from scratch.

Parameters:

  • command_list (list[str]) –

    the list of commands to be executed on the remote machine.

  • task_dirs (list[str]) –

    the list of directories for each task. They must be relative to the work_dir in function _prepare_submission

  • forward_files (list[str]) –

the list of files to be copied to the remote machine. These files must exist in each task_dir.

  • backward_files (list[str]) –

    the list of files to be copied back from the remote machine.

  • outlog (str) –

    the name of the output log file.

  • errlog (str) –

    the name of the error log file.

Returns:

  • list[Task]

    list[Task]: a list of Task objects.

alff_submit_job_multi_remotes(multi_mdict: dict, prepare_command_list: callable, work_dir: str, task_dirs: list[str], forward_files: list[str], backward_files: list[str], forward_common_files: list[str] = [], backward_common_files: list[str] = [], mdict_prefix: str = 'dft', Logger: object = None) async

Submit jobs to multiple machines asynchronously.

Parameters:

  • multi_mdict (dict) –

a dictionary containing multiple mdicts. Each mdict contains the parameters of one remote machine, as described in the remote machine schema.

  • prepare_command_list (callable) –

    a function to prepare the command list based on each remote machine.

  • mdict_prefix (str, default: 'dft' ) –

    the prefix to select remote machines for the same purpose. Example: 'dft', 'md', 'train'.

change_logpath_dispatcher(newlogfile: str = _DEFAULT_LOG_FILE)

Change the logfile of dpdispatcher.

_info_current_dispatch(num_tasks: int, num_tasks_current_chunk: int, job_limit, chunk_index, old_time=None, new_time=None, machine_index=0) -> str

Return the information of the current chunk of tasks.

_remote_info(machine_dict) -> str

Return the remote machine information.

Parameters:

  • machine_dict (dict) –

    the machine dictionary

_init_default_logger(logfile: str = _DEFAULT_LOG_FILE)

Initialize the default logger if none is provided.

validate_machine_config(machine_file: str)

Validate the YAML file containing the machine config.

path

Functions:

  • make_dir

    Create a directory with a backup option.

  • make_dir_ask_backup

    Make a directory and ask for backup if the directory already exists.

  • ask_yes_no

    Asks a yes/no/backup question and returns the response.

  • list_paths

    List all files/folders in given directories and their subdirectories that match the given patterns.

  • collect_files

    Collect files from a list of paths (files/folders). Will search files in folders and their subdirectories.

  • change_pathname

Change path names.

  • remove_files

    Remove files from a given list of file paths.

  • remove_dirs

    Remove a list of directories.

  • remove_files_in_paths

Remove the files in the files list from each path in the paths list.

  • remove_dirs_in_paths

Remove the directories in the dirs list from each path in the paths list.

  • copy_file

    Copy a file/folder from the source path to the destination path.

  • move_file

    Move a file/folder from the source path to the destination path.

  • scan_dirs

    Return directories containing with_files and none of without_files.

make_dir(path: str, backup: bool = True)

Create a directory with a backup option.

make_dir_ask_backup(dir_path: str)

Make a directory and ask for backup if the directory already exists.

ask_yes_no(question: str) -> str

Asks a yes/no/backup question and returns the response.

list_paths(paths: list[str], patterns: list[str], recursive=True) -> list[str]

List all files/folders in given directories and their subdirectories that match the given patterns.

Parameters

paths : list[str]
    The list of paths to search for files/folders.
patterns : list[str]
    The list of patterns to apply to the files. Each pattern can be a file extension or a glob pattern.

Returns:

List[str]: A list of matching paths.

Example:
folders = ["path1", "path2", "path3"]
patterns = ["*.ext1", "*.ext2", "something*.ext3", "*folder/"]
files = list_paths(folders, patterns)
Note:
  • glob() does not list hidden files by default. To include hidden files, use glob(".*", recursive=True).
  • When using recursive=True, you must include ** in the pattern to search subdirectories.
    • glob("*", recursive=True) will search all FILES & FOLDERS in the CURRENT directory.
    • glob("*/", recursive=True) will search all FOLDERS in the CURRENT directory.
    • glob("**", recursive=True) will search all FILES & FOLDERS in the CURRENT directory & SUBDIRECTORIES.
    • glob("**/", recursive=True) will search all FOLDERS in the CURRENT directory & SUBDIRECTORIES.
    • "**/*" is equivalent to "**".
    • "**/*/" is equivalent to "**/".
  • IMPORTANT: "**/**" will replicate the behavior of "**", and can give unexpected results.

collect_files(paths: list[str], patterns: list[str]) -> list[str]

Collect files from a list of paths (files/folders). Will search files in folders and their subdirectories.

Parameters

paths : list[str]
    The list of paths to collect files from.
patterns : list[str]
    The list of patterns to apply to the files. Each pattern can be a file extension or a glob pattern.

Returns:

List[str]: A list of matching file paths.

change_pathname(paths: list[str], old_string: str, new_string: str, replace: bool = False) -> None

Change path names.

Parameters:

  • paths (list[str]) –

    paths to the files/dirs

  • old_string (str) –

    old string in path name

  • new_string (str) –

    new string in path name

  • replace (bool, default: False ) –

    replace the old path name if the new one exists. Defaults to False.

remove_files(files: list[str]) -> None

Remove files from a given list of file paths.

Parameters:

  • files (list[str]) –

    list of file paths

remove_dirs(dirs: list[str]) -> None

Remove a list of directories.

Parameters:

  • dirs (list[str]) –

    list of directories to remove.

remove_files_in_paths(files: list, paths: list) -> None

Remove the files in the files list from each path in the paths list.

remove_dirs_in_paths(dirs: list, paths: list) -> None

Remove the directories in the dirs list from each path in the paths list.

copy_file(src_path: str, dest_path: str)

Copy a file/folder from the source path to the destination path.

move_file(src_path: str, dest_path: str)

Move a file/folder from the source path to the destination path.

scan_dirs(dirs: list[str], with_files: list[str] = None, without_files: list[str] = None) -> list[str]

Return directories containing with_files and none of without_files.

Parameters:

  • dirs (list[str]) –

    List of directory paths to scan.

  • with_files (list[str], default: None ) –

    Files that must exist in the directory. Defaults to [].

  • without_files (list[str], default: None ) –

    Files that must not exist in the directory. Defaults to [].

Returns:

  • list[str]

    List of directory paths meeting the conditions.
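
For example (the file names here are hypothetical):

from thkit.path import scan_dirs

# keep job dirs that produced OUTCAR but have not been archived yet
done_dirs = scan_dirs(
    ["job_001", "job_002", "job_003"],
    with_files=["OUTCAR"],
    without_files=["archived.flag"],
)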

pkg

Functions:

create_logger(logger_name: str = None, log_file: str = None, level: str = 'INFO', level_logfile: str = None, format_: str = 'info') -> logging.Logger

Create and configure a logger with console and optional file handlers.
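
A usage sketch; the handler behavior is inferred from the parameter names:

from thkit.pkg import create_logger

logger = create_logger(
    logger_name="demo",
    log_file="demo.log",    # optional file handler
    level="INFO",           # console level
    level_logfile="DEBUG",  # log-file level
)
logger.info("job started")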

check_package(package_name: str, auto_install: bool = False, git_repo: str = None, conda_channel: str = None)

Check if the required package is installed.

_install_package(package_name: str, git_repo: str = None, conda_channel: str = None)

Install the required package
  • Default using: pip install -U {package_name}
  • If git_repo is provided: pip install -U git+{git_repo}
  • If conda_channel is provided: conda install -c {conda_channel} {package_name}

Parameters:

  • package_name (str) –

    package name

  • git_repo (str, default: None ) –

    git URL of the package. E.g., http://something.git

  • conda_channel (str, default: None ) –

    conda channel of the package. E.g., conda-forge
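
The documented command choices translate roughly to the following sketch (not the actual implementation):

import subprocess
import sys

def install_package_sketch(package_name: str, git_repo: str = None, conda_channel: str = None):
    if conda_channel:
        cmd = ["conda", "install", "-y", "-c", conda_channel, package_name]
    elif git_repo:
        cmd = [sys.executable, "-m", "pip", "install", "-U", f"git+{git_repo}"]
    else:
        cmd = [sys.executable, "-m", "pip", "install", "-U", package_name]
    subprocess.run(cmd, check=True)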

get_func_args(func)

Get the arguments of a function

dependency_info(modules=['numpy', 'polars', 'thkit', 'ase']) -> str

Get the dependency information

sth2sth

Functions:

txt2str(file_path: Union[str, Path]) -> str

str2txt(text: str, file_path: Union[str, Path]) -> None

txt2list(file_path: Union[str, Path]) -> list[str]

list2txt(text_list: list, file_path: Union[str, Path]) -> None

float2str(floatnum, decimals=6)

Convert a float number to a string. REF: https://stackoverflow.com/questions/2440692/formatting-floats-without-trailing-zeros

Parameters:

  • floatnum (float) –

    float number

  • decimals (int, default: 6 ) –

    number of decimal places in the output string

Returns:

  • s ( str ) –

    string of the float number

stuff

Functions:

chunk_list(input_list: list, n: int) -> Generator

Yield successive n-sized chunks from input_list.
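
For example:

from thkit.stuff import chunk_list

for chunk in chunk_list(list(range(7)), 3):
    print(chunk)  # [0, 1, 2] then [3, 4, 5] then [6]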

unpack_indices(list_inputs: list[int | str]) -> list[int]

Expand the input list of indices to a list of integers. E.g., list_inputs = [1, 2, "3-5:2", "6-10"]
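
A hedged example, assuming "start-end[:step]" ranges are inclusive as the docstring example suggests:

from thkit.stuff import unpack_indices

print(unpack_indices([1, 2, "3-5:2", "6-10"]))
# e.g. [1, 2, 3, 5, 6, 7, 8, 9, 10] under the inclusive-range assumption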

text_fill_center(input_text='example', fill='-', max_length=60)

Create a line with centered text.

text_fill_left(input_text='example', left_margin=15, fill='-', max_length=60)

Create a line with left-aligned text.

text_fill_box(input_text='', fill=' ', sp='|', max_length=60)

Put the string at the center of | |.

text_repeat(input_str: str, length: int) -> str

Repeat the input string to a specified length.

text_color(text: str, color: str = 'blue') -> str

Color the text using ANSI escape codes.

time_uuid() -> str

simple_uuid()

Generate a simple random UUID of 4 digits.