qcg.pilotjob.api.manager module

class qcg.pilotjob.api.manager.TimeStamp(manager, timeout_secs=None)

Bases: object

Timestamp utility that tracks timeouts and computes poll times that do not exceed the defined timeouts.

Create a timestamp. During initialization the start moment is set to the current time.

Parameters:
  • manager (Manager) – the manager instance that defines the default poll and publisher timeout settings
  • timeout_secs (int|float) – the timeout in seconds for the operation tracked by this timestamp; the default value None means no timeout is defined (wait indefinitely)
secs_from_start

Return number of seconds elapsed since start

check_timeout()

Check whether the timeout has been reached. If timeout_secs has been defined, check whether timeout_secs seconds have elapsed since the start moment. If timeout_secs is not defined, always return False.

Returns:True if timeout reached, False otherwise
get_poll_time()

Return a poll time that does not exceed the timeout. If the timeout has already been reached, 0 is returned.

Returns:poll time in seconds
get_events_timeout()

Return a subscribe timeout that does not exceed the total operation timeout. If the timeout has already been reached, 0 is returned.

Returns:the timeout time in seconds
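The way a poll time can be capped by the remaining timeout is easier to see in code. The following is a minimal sketch, not the library's implementation: the class name TimeStampSketch, the poll_delay argument and the injectable clock are assumptions made for illustration, and the real TimeStamp takes its defaults from a Manager instance.

```python
import time


class TimeStampSketch:
    """Hypothetical stand-in for TimeStamp; not the library's implementation."""

    def __init__(self, poll_delay, timeout_secs=None, clock=time.monotonic):
        self._clock = clock              # injectable clock (assumption, eases testing)
        self._start = clock()            # start moment is fixed at creation
        self._poll_delay = poll_delay
        self._timeout_secs = timeout_secs

    @property
    def secs_from_start(self):
        """Seconds elapsed since the start moment."""
        return self._clock() - self._start

    def check_timeout(self):
        """True once the timeout elapsed; always False without a timeout."""
        if self._timeout_secs is None:
            return False
        return self.secs_from_start >= self._timeout_secs

    def get_poll_time(self):
        """Poll delay capped by the remaining time, never below 0."""
        if self._timeout_secs is None:
            return self._poll_delay
        remaining = self._timeout_secs - self.secs_from_start
        return max(0, min(self._poll_delay, remaining))


# Drive the sketch with a fake clock so the result is deterministic.
now = [0.0]
stamp = TimeStampSketch(poll_delay=5, timeout_secs=12, clock=lambda: now[0])
now[0] = 10.0
print(stamp.get_poll_time())  # 2.0 (only two seconds remain before the timeout)
```

With no timeout defined, the sketch simply returns the configured poll delay on every call.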
class qcg.pilotjob.api.manager.Manager(address=None, cfg=None)

Bases: object

The Manager class is used to communicate with a single QCG-PilotJob manager instance.

We assume that the QCG-PilotJob manager instance is already running with the ZMQ interface enabled. Communication with QCG-PilotJob is fully synchronous.

Initialize instance.

Parameters:
  • address (str) –

    [proto://]host[:port]; the default values for ‘proto’ and ‘port’ are ‘tcp’ and ‘5555’ respectively. If ‘address’ is not defined, the following procedure is performed:

    1. if the environment contains QCG_PM_ZMQ_ADDRESS, the value of this variable is used,
    2. otherwise the default address tcp://127.0.0.1:5555 is used
  • cfg (dict) – the configuration; the following keys are supported:

    ‘default_poll_delay’ - the default delay between successive status polls in wait methods
    ‘default_pub_timeout’ - the default timeout for waiting on published events
    ‘log_file’ - the location of the log file
    ‘log_level’ - the log level (e.g. ‘DEBUG’); by default the log level is set to INFO
DEFAULT_ADDRESS_ENV = 'QCG_PM_ZMQ_ADDRESS'
DEFAULT_ADDRESS = 'tcp://127.0.0.1:5555'
DEFAULT_PROTO = 'tcp'
DEFAULT_PORT = '5555'
DEFAULT_POLL_DELAY = 5
DEFAULT_PUB_TIMEOUT = 300
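The address resolution described for the ‘address’ parameter can be sketched as follows. This is an illustration built only from the rules and constants listed above; the function name resolve_address and its environ argument are hypothetical, and the library's own parsing may differ in details (for example, IPv6 literals are not handled here).

```python
import os

DEFAULT_ADDRESS_ENV = 'QCG_PM_ZMQ_ADDRESS'
DEFAULT_ADDRESS = 'tcp://127.0.0.1:5555'
DEFAULT_PROTO = 'tcp'
DEFAULT_PORT = '5555'


def resolve_address(address=None, environ=None):
    """Hypothetical helper: apply the documented defaults to an address."""
    environ = os.environ if environ is None else environ
    if address is None:
        # 1. use the environment variable if present, 2. else the default
        address = environ.get(DEFAULT_ADDRESS_ENV, DEFAULT_ADDRESS)
    if '://' not in address:
        address = f'{DEFAULT_PROTO}://{address}'   # missing proto -> tcp
    proto, rest = address.split('://', 1)
    if ':' not in rest:
        rest = f'{rest}:{DEFAULT_PORT}'            # missing port -> 5555
    return f'{proto}://{rest}'


print(resolve_address('myhost', environ={}))  # tcp://myhost:5555
```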
send_request(request)

Method for testing purposes; it allows sending any request to the QCG PJM. The received response is validated for correct format.

Parameters:request (dict) – the request data to send
Returns:validated response
Return type:dict
resources()

Return available resources.

Return information about current resource status of QCG PJM.

Returns:data in format described in ‘resourceInfo’ method of QCG PJM.
Return type:dict
Raises:see _send_and_validate_result
submit(jobs)

Submit jobs.

Parameters:

jobs (Jobs) – the job descriptions to submit

Returns:

list of submitted job names

Return type:

list(str)

Raises:
  • InternalError - in case of unexpected result format
  • see _send_and_validate_result
list()

List all jobs.

Return a list of all job names registered in the QCG PJM. Besides the name, each job contains additional data, such as:

status (str) - current job status
messages (str, optional) - error message generated during job processing
inQueue (int, optional) - current job position in the scheduling queue
Returns:

dictionary with job names and attributes

Return type:

dict

Raises:
  • InternalError - in case of unexpected result format
  • see _send_and_validate_result
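A result of this shape is convenient to regroup by status. The sketch below assumes the returned dictionary maps each job name to a dictionary of the attributes listed above; the sample data, the status names in it and the helper group_by_status are illustrative and not taken from the library.

```python
from collections import defaultdict


def group_by_status(jobs):
    """Hypothetical helper: map each status to the job names that have it."""
    groups = defaultdict(list)
    for name, attrs in jobs.items():
        groups[attrs['status']].append(name)
    return dict(groups)


# Hand-written sample in the assumed shape of the list() result.
sample = {
    'prep': {'status': 'SUCCEED'},
    'sim_1': {'status': 'QUEUED', 'inQueue': 0},
    'sim_2': {'status': 'FAILED', 'messages': 'exit code 1'},
}
print(group_by_status(sample))
```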
status(names)

Return current status of jobs.

Parameters:names (str|list(str)) – list of job names to get status for
Returns:
dictionary with job names and status data, where each entry is a dictionary with the following keys:
status (int): 0 - job found, other value - job not found
message (str): an error description
data (dict):
  jobName: job name
  status: current job status
Return type:dict
Raises:see _send_and_validate_result
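The per-job ‘status’ code has to be checked before ‘data’ is read. A minimal sketch, assuming the result maps job names directly to entries with the keys described above; the helper split_status_result and the sample values are hypothetical.

```python
def split_status_result(status_result):
    """Hypothetical helper: separate found jobs from jobs that were not found."""
    found, missing = {}, {}
    for name, entry in status_result.items():
        if entry.get('status') == 0:
            # 0 means the job was found; 'data' carries its current status
            found[name] = entry['data']['status']
        else:
            missing[name] = entry.get('message', '')
    return found, missing


# Hand-written sample in the assumed shape of the status() result.
sample = {
    'sim_1': {'status': 0, 'data': {'jobName': 'sim_1', 'status': 'EXECUTING'}},
    'typo_job': {'status': 1, 'message': 'job not found'},
}
found, missing = split_status_result(sample)
print(found)    # {'sim_1': 'EXECUTING'}
print(missing)  # {'typo_job': 'job not found'}
```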
info(names, **kwargs)

Return detailed information about jobs.

Parameters:
  • names (str|list(str)) – list of job names to get detailed information about
  • kwargs (**dict) –

    additional keyword arguments to the info method; currently the following attributes are supported:

    withChilds (bool): if True the detailed information about all job’s iterations will be returned
Returns:

dictionary with job names and detailed information, where each entry is a dictionary with the following keys:

status (int): 0 - job found, other value - job not found
message (str): an error description
data (dict):

  jobName (str): job name
  status (str): current job status
  iterations (dict, optional): information about an iteration job

    start: start index of iterations
    stop: stop index of iterations
    total: total number of iterations
    finished: number of iterations already finished
    failed: number of iterations already failed

  childs (list(dict), optional): only when the ‘withChilds’ option has been used; each entry contains:

    iteration (int): the iteration index
    state (str): current state of the iteration
    runtime (dict): runtime information

  messages (str, optional): error description
  runtime (dict, optional): runtime information, see below
  history (str): history of status changes, see below

The runtime information can contain the following keys:
allocation (str): information about allocated resources in the form:

NODE_NAME0[CORE_ID0[:CORE_ID1+]][,NODE_NAME1[CORE_ID0[:CORE_ID1+]]…..]

the nodes are separated by commas, and each node name is followed by its CPU identifiers, separated by colons (:) and enclosed in square brackets

wd (str): path to the working directory
rtime (str): the running time (set when the job or job iteration finishes)
exit_code (int): the exit code (set when the job or job iteration finishes)

The history information contains multiple lines, where each line has format:

YEAR-MONTH-DAY HOUR:MINUTE:SECOND.MILLIS: STATE

The first part is the timestamp of the status change of a job or job iteration, and the second is the new state.

Return type:

dict

Raises:
  • InternalError – in case the response format is invalid
  • ConnectionError – in case of non zero exit code, or if connection has not been established yet
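The ‘allocation’ and ‘history’ strings described for info follow simple textual formats, so they can be split with the standard library. The sketch below is based only on the formats quoted above; the helper names parse_allocation and parse_history are hypothetical, core identifiers are kept as strings, and the node and state names in the usage lines are made up.

```python
import re
from datetime import datetime

# NODE_NAME[CORE_ID0:CORE_ID1...] items, separated by commas
_NODE_RE = re.compile(r'([^,\[\]]+)\[([^\]]*)\]')


def parse_allocation(allocation):
    """Hypothetical helper: map node name -> list of core identifiers."""
    return {node: cores.split(':') if cores else []
            for node, cores in _NODE_RE.findall(allocation)}


def parse_history(history):
    """Hypothetical helper: return (datetime, state) pairs, one per line."""
    entries = []
    for line in history.splitlines():
        line = line.strip()
        if not line:
            continue
        # The timestamp itself contains colons, so split on the last ': '
        stamp, state = line.rsplit(': ', 1)
        entries.append((datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S.%f'), state))
    return entries


print(parse_allocation('node1[0:1:2],node2[4]'))
# {'node1': ['0', '1', '2'], 'node2': ['4']}
print(parse_history('2021-03-04 10:15:30.250: QUEUED')[0][1])  # QUEUED
```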
info_parsed(names, **kwargs)

Return detailed and parsed information about jobs.

The request sent to the QCG-PilotJob manager instance is the same as in info, but the result is parsed into a JobInfo object that is simpler to use.

Parameters:
  • names (str|list(str)) – list of job names to get detailed information about
  • kwargs (**dict) –

    additional keyword arguments to the info method; currently the following attributes are supported:

    withChilds (bool): if True the detailed information about all job’s iterations will be returned
Returns:

a dictionary with job names and information parsed into JobInfo object

Return type:

dict(str, JobInfo)

Raises:
  • InternalError – in case the response format is invalid
  • ConnectionError – in case of non zero exit code, or if connection has not been established yet
remove(names)

Remove jobs from QCG-PilotJob manager instance.

This function might be useful if we want to submit jobs with the same names as previously used, or to release memory allocated for storing information about already finished jobs. After removal, it is not possible to get any information about the removed jobs.

Parameters:

names (str|list(str)) – list of job names to remove from QCG-PilotJob manager

Raises:
  • InternalError – in case the response format is invalid
  • ConnectionError – in case of non zero exit code, or if connection has not been established yet
cancel(names)

Cancel jobs execution.

This method is currently not supported.

Parameters:names (str|list(str)) – list of job names to cancel
Raises:InternalError – always
finish()

Send a finish request to the QCG-PilotJob manager and close the connection.

Sending a finish request to the QCG-PilotJob manager results in closing the QCG-PilotJob manager instance (with some delay). It will not be possible to send any new requests to this instance.

Raises:
  • InternalError – in case the response format is invalid
  • ConnectionError – in case of non zero exit code, or if connection has not been established yet
cleanup()

Clean up resources.

The custom logging handlers are removed from the top logger.

system_status()
wait4(names, timeout=None)

Wait for specific jobs to finish.

This method waits until all specified jobs finish their execution (successfully or not). The QCG-PilotJob manager is periodically polled for the status of unfinished jobs. The poll interval (DEFAULT_POLL_DELAY, 5 seconds by default) can be changed by defining the ‘default_poll_delay’ key with an appropriate value (in seconds) in the configuration of the instance.

Parameters:
  • names (str|list(str)) – list of job names to wait for
  • timeout (int|float) – maximum number of seconds to wait
Returns:

dict - a map with job names and their terminal status

Raises:
  • TimeoutElapsed – in case of timeout elapsed
  • InternalError – in case the response format is invalid
  • ConnectionError – in case of non zero exit code, or if connection has not been established yet
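The polling behaviour described for wait4 can be illustrated with a generic loop. This is a sketch under stated assumptions, not the library's code: wait_for, TimeoutElapsedSketch, the get_states callback and the terminal state names are all hypothetical, and the status source is a fake so the example runs without a manager.

```python
import time


class TimeoutElapsedSketch(Exception):
    """Hypothetical local stand-in for the TimeoutElapsed error."""


def wait_for(names, get_states, is_finished, poll_delay=5, timeout=None,
             sleep=time.sleep, clock=time.monotonic):
    """Poll get_states until every job is terminal or the timeout elapses."""
    start = clock()
    pending = set(names)
    result = {}
    while True:
        # Ask only about jobs that have not reached a terminal state yet.
        for name, state in get_states(sorted(pending)).items():
            if is_finished(state):
                result[name] = state
                pending.discard(name)
        if not pending:
            return result
        delay = poll_delay
        if timeout is not None:
            remaining = timeout - (clock() - start)
            if remaining <= 0:
                raise TimeoutElapsedSketch(f'still running: {sorted(pending)}')
            delay = min(poll_delay, remaining)  # never sleep past the timeout
        sleep(delay)


# Assumed terminal state names, for illustration only.
TERMINAL = {'SUCCEED', 'FAILED', 'CANCELED'}
calls = {'n': 0}


def fake_states(names):
    """Fake status source: jobs finish on the third poll."""
    calls['n'] += 1
    state = 'SUCCEED' if calls['n'] >= 3 else 'EXECUTING'
    return {name: state for name in names}


print(wait_for(['a', 'b'], fake_states, TERMINAL.__contains__,
               poll_delay=0, sleep=lambda secs: None))
# {'a': 'SUCCEED', 'b': 'SUCCEED'}
```

Capping each sleep by the remaining time keeps the total wait close to the requested timeout even when the poll delay is larger than what is left.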
wait4all(timeout_secs=None)

Wait for all submitted jobs to finish.

This method waits until all jobs submitted to the service finish their execution (successfully or not).

Parameters:timeout_secs (int|float) – optional timeout setting in seconds
Raises:TimeoutElapsed – when the timeout elapsed (if defined as argument)

wait4_any_job_finish(timeout_secs=None)

Wait for any one of the submitted jobs to finish.

This method waits until any one of the jobs submitted to the service finishes its execution (successfully or not).

Parameters:timeout_secs (float|int) – timeout in seconds; by default (None) wait endlessly
Raises:TimeoutElapsed – when the timeout elapsed (if defined as argument)
Returns:(str, str) identifier of the finished job and its status, or None, None if the timeout has been reached

static is_status_finished(status)

Check if status of a job is a terminal status.

Parameters:status (str) – a job status
Returns:true if a given status is a terminal status
Return type:bool
class qcg.pilotjob.api.manager.LocalManager(server_args=None, cfg=None)

Bases: qcg.pilotjob.api.manager.Manager

The Manager class which launches locally (in a separate thread) an instance of the QCG-PilotJob manager.

The communication model, as well as all functionality, is the same as in the Manager class.

Initialize instance.

Launch a QCG-PilotJob manager instance in a background thread and connect to it. The port number for the ZMQ interface of the QCG-PilotJob manager instance is randomly selected.

Parameters:
  • server_args (list(str)) –

    the command line arguments for QCG-PilotJob manager instance

    --net enable network interface
    --net-port NET_PORT
     port to listen for network interface (implies --net)
    --net-port-min NET_PORT_MIN
     minimum port range to listen for network interface if exact port number is not defined (implies --net)
    --net-port-max NET_PORT_MAX
     maximum port range to listen for network interface if exact port number is not defined (implies --net)
    --file enable file interface
    --file-path FILE_PATH
     path to the request file (implies --file)
    --wd WD working directory for the service
    --envschema ENVSCHEMA
     job environment schema [auto|slurm]
    --resources RESOURCES
     source of information about available resources [auto|slurm|local] as well as a method of job execution (through local processes or as Slurm sub jobs)
    --report-format REPORT_FORMAT
     format of job report file [text|json]
    --report-file REPORT_FILE
     name of the job report file
    --nodes NODES configuration of available resources (implies --resources local)
    --log {critical,error,warning,info,debug,notset}
     log level
    --system-core reserve one of the cores for the QCG-PJM
    --disable-nl disable custom launching method
    --show-progress
     print information about executing tasks
    --governor run manager in the governor mode, where jobs will be scheduled for execution on the dependent managers
    --parent PARENT
     address of the parent manager, current instance will receive jobs from the parent manager
    --id ID optional manager instance identifier - will be generated automatically when not defined
    --tags TAGS optional manager instance tags separated by commas
    --slurm-partition-nodes SLURM_PARTITION_NODES
     split Slurm allocation by given number of nodes, where each group will be controlled by separate manager (implies --governor)
    --slurm-limit-nodes-range-begin SLURM_LIMIT_NODES_RANGE_BEGIN
     limit Slurm allocation to specified range of nodes (starting node)
    --slurm-limit-nodes-range-end SLURM_LIMIT_NODES_RANGE_END
     limit Slurm allocation to specified range of nodes (ending node)

    each command line argument and (optionally) its value should be passed as a separate entry in the list

  • cfg (dict) –
    ‘init_timeout’ - the timeout (in seconds) the client should wait for the QCG-PilotJob manager to start before it raises an error, 300 by default
    ‘poll_delay’ - the delay between successive status polls in wait methods
    ‘log_file’ - the location of the log file
    ‘log_level’ - the log level (e.g. ‘DEBUG’); by default the log level is set to INFO
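Because each command line argument and its value must be a separate list entry, a shell-style string has to be split before it is passed as server_args. A minimal sketch using only options listed above; the working directory path and the cfg values are made-up examples, and the LocalManager call is left commented out because it requires the qcg-pilotjob package.

```python
import shlex

# A shell-style option string, as it might be typed on the command line.
command_line = '--log debug --wd /tmp/qcg-work --report-format json'

# shlex.split yields one list entry per option and per value.
server_args = shlex.split(command_line)
cfg = {'init_timeout': 60, 'log_level': 'DEBUG'}

print(server_args)
# ['--log', 'debug', '--wd', '/tmp/qcg-work', '--report-format', 'json']

# With the package installed, the two objects would be passed on like this:
# from qcg.pilotjob.api.manager import LocalManager
# manager = LocalManager(server_args, cfg)
```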

finish()

Send a finish control message to the manager and stop the manager’s process.

Sending a finish request to the QCG-PilotJob manager results in closing the QCG-PilotJob manager instance (with some delay). It will not be possible to send any new requests to this instance.

If the manager process does not stop within 10 seconds, it is terminated. The ‘cleanup’ method is also called.

Raises:
  • InternalError – in case the response format is invalid
  • ConnectionError – in case of non zero exit code, or if connection has not been established yet
kill_manager_process()

Terminate the manager’s process with the SIGTERM signal.

Under normal conditions the finish method should be called instead.

static is_notebook()