qcg.pilotjob.api.manager module

class qcg.pilotjob.api.manager.TimeStamp(manager, timeout_secs=None)

Bases: object

Timestamp utility to trace timeouts and compute the poll times that do not exceed defined timeouts.

Create a timestamp. During initialization the start moment of the timestamp is set to the current time.

Parameters
  • manager (Manager) – the manager instance that defines the default poll and publisher timeout settings

  • timeout_secs (int|float) – the timeout in seconds for the operation tracked by this timestamp; the default value None means that no timeout is defined (infinity)

property secs_from_start

Return number of seconds elapsed since start

check_timeout()

Check whether the timeout has been reached. If timeout_secs is defined, check whether timeout_secs seconds have elapsed since the start moment. If timeout_secs is not defined, always return False.

Returns

True if timeout reached, False otherwise

get_poll_time()

Return a poll time that does not exceed the timeout. If the timeout has already been reached, 0 is returned.

Returns

poll time in seconds

get_events_timeout()

Return a subscribe timeout that does not exceed the total operation timeout. If the timeout has already been reached, 0 is returned.

Returns

the timeout time in seconds
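The timeout bookkeeping described for TimeStamp can be illustrated with a small stand-in class. This is a minimal sketch and not the library's implementation: the class name TimeStampSketch, the default_poll_delay argument (used here in place of a Manager instance) and the injectable clock are assumptions made for the example.

```python
import time


class TimeStampSketch:
    """Illustrative stand-in for TimeStamp; not the library's implementation."""

    def __init__(self, default_poll_delay, timeout_secs=None, clock=time.monotonic):
        self._clock = clock                    # injectable clock (assumption, eases testing)
        self._start = clock()                  # start moment is set at creation
        self._poll_delay = default_poll_delay  # assumed to come from the manager settings
        self._timeout_secs = timeout_secs      # None means no timeout (infinity)

    @property
    def secs_from_start(self):
        """Seconds elapsed since the start moment."""
        return self._clock() - self._start

    def check_timeout(self):
        """True once timeout_secs have elapsed; always False without a timeout."""
        if self._timeout_secs is None:
            return False
        return self.secs_from_start >= self._timeout_secs

    def get_poll_time(self):
        """Poll delay capped by the remaining time, never below 0."""
        if self._timeout_secs is None:
            return self._poll_delay
        remaining = self._timeout_secs - self.secs_from_start
        return max(0, min(self._poll_delay, remaining))
```

Capping the poll time by the remaining time keeps the last sleep of a wait loop from overshooting the requested timeout.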

class qcg.pilotjob.api.manager.Manager(address=None, cfg=None)

Bases: object

The Manager class is used to communicate with a single QCG-PilotJob manager instance.

We assume that the QCG-PilotJob manager instance is already running with the ZMQ interface enabled. The communication with QCG-PilotJob is fully synchronous.

Initialize instance.

Parameters
  • address (str) –

    [proto://]host[:port]; the default values for ‘proto’ and ‘port’ are ‘tcp’ and ‘5555’ respectively. If ‘address’ is not defined, it is resolved as follows:

    1. if the environment contains QCG_PM_ZMQ_ADDRESS, the value of this variable is used,

    2. otherwise the default address tcp://127.0.0.1:5555 is used

  • cfg (dict) –

    ‘default_poll_delay’ - the default delay between successive status polls in wait methods

    ‘default_pub_timeout’ - the default timeout for waiting on published events

    ‘log_file’ - the location of the log file

    ‘log_level’ - the log level (e.g. ‘DEBUG’); by default the log level is set to INFO
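The address resolution procedure described for the ‘address’ parameter can be sketched as a standalone function. This is only an illustration under stated assumptions: the function name resolve_address and its environ argument are hypothetical, the library may normalize addresses differently (for example, it may not normalize the value read from the environment), and IPv6 literals are not handled.

```python
import os

# Values copied from the class constants documented for Manager.
DEFAULT_ADDRESS_ENV = 'QCG_PM_ZMQ_ADDRESS'
DEFAULT_ADDRESS = 'tcp://127.0.0.1:5555'
DEFAULT_PROTO = 'tcp'
DEFAULT_PORT = '5555'


def resolve_address(address=None, environ=None):
    """Return a full proto://host:port address (hypothetical helper)."""
    environ = os.environ if environ is None else environ

    # No explicit address: use the environment variable, else the default.
    if address is None:
        address = environ.get(DEFAULT_ADDRESS_ENV) or DEFAULT_ADDRESS

    # Fill in the protocol when it is missing.
    if '://' in address:
        proto, rest = address.split('://', 1)
    else:
        proto, rest = DEFAULT_PROTO, address

    # Fill in the port when it is missing.
    if ':' in rest:
        host, port = rest.rsplit(':', 1)
    else:
        host, port = rest, DEFAULT_PORT

    return f'{proto}://{host}:{port}'
```

With an empty environment, `resolve_address('example-host', environ={})` fills in both defaults, while `resolve_address(None, environ={})` falls back to the default address.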

DEFAULT_ADDRESS_ENV = 'QCG_PM_ZMQ_ADDRESS'

DEFAULT_ADDRESS = 'tcp://127.0.0.1:5555'

DEFAULT_PROTO = 'tcp'

DEFAULT_PORT = '5555'

DEFAULT_POLL_DELAY = 5

DEFAULT_PUB_TIMEOUT = 300

send_request(request)

Method for testing purposes: it allows sending an arbitrary request to the QCG PJM. The received response is validated for correct format.

Parameters

request (dict) – the request data to send

Returns

validated response

Return type

dict

resources()

Return available resources.

Return information about current resource status of QCG PJM.

Returns

data in format described in ‘resourceInfo’ method of QCG PJM.

Return type

dict

Raises

see _send_and_validate_result

submit(jobs)

Submit jobs.

Parameters

jobs (Jobs) – the job descriptions to submit

Returns

list of submitted job names

Return type

list(str)

Raises
  • InternalError - in case of unexpected result format

  • see _send_and_validate_result

list()

List all jobs.

Return a list of all job names registered in the QCG PJM. Besides the name, each job entry contains additional data, such as:

  • status (str) - current job status

  • messages (str, optional) - error message generated during job processing

  • inQueue (int, optional) - current job position in the scheduling queue

Returns

dictionary with job names and attributes

Return type

dict

Raises
  • InternalError - in case of unexpected result format

  • see _send_and_validate_result

status(names)

Return current status of jobs.

Parameters

names (str|list(str)) – list of job names to get status for

Returns

dictionary with job names and status data, where each entry is a dictionary with the following keys:

  • status (int): 0 - job found, other value - job not found

  • message (str): an error description

  • data (dict):

    • jobName: job name

    • status: current job status

Return type

dict

Raises

see _send_and_validate_result
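A caller usually wants to separate jobs that were found from jobs that were not. The sketch below shows one way to do this for entries in the format documented above. It assumes that the top level of the result maps each job name directly to its entry (the real response may wrap this mapping differently), and both the helper name split_status_response and the sample data are made up for illustration.

```python
def split_status_response(response):
    """Split a status-like mapping into found job states and lookup errors."""
    found, missing = {}, {}
    for name, entry in response.items():
        if entry.get('status') == 0:
            # status 0: the job was found, its state is stored under data.status
            found[name] = entry.get('data', {}).get('status')
        else:
            # any other value: the job was not found, keep the error description
            missing[name] = entry.get('message', '')
    return found, missing


# Hand-written sample in the documented shape, not captured from a real manager.
sample = {
    'job-a': {'status': 0, 'data': {'jobName': 'job-a', 'status': 'SUCCEED'}},
    'job-b': {'status': 1, 'message': 'job not found'},
}
found, missing = split_status_response(sample)
```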

info(names, **kwargs)

Return detailed information about jobs.

Parameters
  • names (str|list(str)) – list of job names to get detailed information about

  • kwargs (**dict) –

    additional keyword arguments to the info method; currently the following attribute is supported:

    withChilds (bool): if True the detailed information about all job’s iterations will be returned

Returns

dictionary with job names and detailed information, where each entry is a dictionary with the following keys:

  • status (int): 0 - job found, other value - job not found

  • message (str): an error description

  • data (dict):

    • jobName (str): job name

    • status (str): current job status

    • iterations (dict, optional): information about an iteration job

      • start: start index of iterations

      • stop: stop index of iterations

      • total: total number of iterations

      • finished: number of iterations already finished

      • failed: number of iterations already failed

    • childs (list(dict), optional): only when the ‘withChilds’ option has been used; each entry contains:

      • iteration (int): the iteration index

      • state (str): current state of the iteration

      • runtime (dict): runtime information

    • messages (str, optional): error description

    • runtime (dict, optional): runtime information, see below

    • history (str): history of status changes, see below

The runtime information can contain the following keys:

  • allocation (str): information about allocated resources in the form:

    NODE_NAME0[CORE_ID0[:CORE_ID1+]][,NODE_NAME1[CORE_ID0[:CORE_ID1+]]…..]

    the nodes are separated by commas, and each node name is followed by its CPU identifiers, separated by colons (:) and enclosed in square brackets

  • wd (str): path to the working directory

  • rtime (str): the running time (set when the job or job iteration finishes)

  • exit_code (int): the exit code (set when the job or job iteration finishes)

The history information contains multiple lines, where each line has the format:

YEAR-MONTH-DAY HOUR:MINUTE:SECOND.MILLIS: STATE

The first part is the timestamp of a status change of the job or job iteration, and the second part is the new state.

Return type

dict

Raises
  • InternalError – in case the response format is invalid

  • ConnectionError – in case of non zero exit code, or if connection has not been established yet
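The allocation and history strings returned by info are plain text, so a caller may want to parse them. The following sketch shows one possible approach based only on the formats described above. The helper names parse_allocation and parse_history are hypothetical, core identifiers are kept as strings, and the fractional part of the timestamp is assumed to be parseable with `%f`.

```python
import re
from datetime import datetime

# One node entry: a name followed by colon-separated core ids in brackets.
_NODE_RE = re.compile(r'([^,\[\]]+)\[([^\]]*)\]')


def parse_allocation(allocation):
    """Map each node name to the list of its core identifiers (as strings)."""
    return {
        node.strip(): [core for core in cores.split(':') if core]
        for node, cores in _NODE_RE.findall(allocation)
    }


def parse_history(history):
    """Return (timestamp, state) tuples, one per non-empty history line."""
    events = []
    for line in history.splitlines():
        line = line.strip()
        if not line:
            continue
        # The state follows the last ': ' separator on the line.
        stamp, state = line.rsplit(': ', 1)
        events.append((datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S.%f'), state))
    return events
```

For example, `parse_allocation('node1[0:1:2],node2[4]')` yields one entry per node, and the difference between the first and last timestamps from `parse_history` gives the time between the first and last recorded state change.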

info_parsed(names, **kwargs)

Return detailed and parsed information about jobs.

The request sent to the QCG-PilotJob manager instance is the same as in info, but the result is parsed into a JobInfo object that is simpler to use.

Parameters
  • names (str|list(str)) – list of job names to get detailed information about

  • kwargs (**dict) –

    additional keyword arguments to the info method; currently the following attribute is supported:

    withChilds (bool): if True the detailed information about all job’s iterations will be returned

Returns

a dictionary with job names and information parsed into JobInfo object

Return type

dict(str, JobInfo)

Raises
  • InternalError – in case the response format is invalid

  • ConnectionError – in case of non zero exit code, or if connection has not been established yet

remove(names)

Remove jobs from QCG-PilotJob manager instance.

This function might be useful if we want to submit jobs with the same names as previously used, or to release memory allocated for storing information about already finished jobs. After removal, it will not be possible to get any information about the removed jobs.

Parameters

names (str|list(str)) – list of job names to remove from QCG-PilotJob manager

Raises
  • InternalError – in case the response format is invalid

  • ConnectionError – in case of non zero exit code, or if connection has not been established yet

cancel(names)

Cancel jobs execution.

This method is currently not supported.

Parameters

names (str|list(str)) – list of job names to cancel

Raises

InternalError – always

finish()

Send a finish request to the QCG-PilotJob manager and close the connection.

Sending a finish request to the QCG-PilotJob manager results in closing the QCG-PilotJob manager instance (with some delay). It will not be possible to send any new requests to this instance of the QCG-PilotJob manager.

Raises
  • InternalError – in case the response format is invalid

  • ConnectionError – in case of non zero exit code, or if connection has not been established yet

cleanup()

Clean up resources.

The custom logging handlers are removed from top logger.

system_status()

wait4(names, timeout=None)

Wait for specific jobs to finish.

This method waits until all specified jobs finish their execution (successfully or not). The QCG-PilotJob manager is periodically polled for the status of unfinished jobs. The poll interval (2 sec by default) can be changed by defining a ‘poll_delay’ key with an appropriate value (in seconds) in the configuration of the instance.

Parameters
  • names (str|list(str)) – list of job names to wait for

  • timeout (int|float) – maximum number of seconds to wait

Returns

dict - a map with job names and their terminal status

Raises

wait4all(timeout_secs=None)

Wait for all submitted jobs to finish.

This method waits until all jobs submitted to the service finish their execution (successfully or not).

Parameters

timeout_secs (int|float) – optional timeout in seconds

Raises

TimeoutElapsed – when the timeout has elapsed (if defined as argument)

wait4_any_job_finish(timeout_secs=None)

Wait for any one of the submitted jobs to finish.

This method waits until any one of the jobs submitted to the service finishes its execution (successfully or not).

Parameters

timeout_secs (float|int) – timeout in seconds; by default (None) the method waits endlessly

Returns

identifier of the finished job and its status, or None, None if the timeout has been reached

Return type

(str, str)

Raises

TimeoutElapsed – when the timeout has elapsed (if defined as argument)

static is_status_finished(status)

Check if status of a job is a terminal status.

Parameters

status (str) – a job status

Returns

True if the given status is a terminal status

Return type

bool
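The polling behaviour described for wait4, combined with a terminal status check like is_status_finished, can be sketched as a generic loop. This is an illustration rather than the library's code: the function wait_for_jobs, the get_status callable and the set of terminal state names are assumptions, and the built-in TimeoutError stands in for TimeoutElapsed.

```python
import time

# Assumed terminal states; the authoritative rule is Manager.is_status_finished.
ASSUMED_TERMINAL = {'SUCCEED', 'FAILED', 'CANCELED', 'OMITTED'}


def wait_for_jobs(get_status, names, poll_delay=2, timeout=None,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll get_status(name) until every job reaches a terminal state."""
    start = clock()
    pending = set(names)
    finished = {}
    while pending:
        for name in sorted(pending):
            state = get_status(name)
            if state in ASSUMED_TERMINAL:
                finished[name] = state
        pending.difference_update(finished)
        if not pending:
            break
        if timeout is None:
            sleep(poll_delay)
            continue
        remaining = timeout - (clock() - start)
        if remaining <= 0:
            raise TimeoutError(f'jobs still running: {sorted(pending)}')
        # Never sleep past the requested timeout.
        sleep(min(poll_delay, remaining))
    return finished
```

Passing sleep and clock as arguments keeps the loop easy to exercise without real delays; in normal use the defaults apply.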

class qcg.pilotjob.api.manager.LocalManager(server_args=None, cfg=None)

Bases: Manager

The Manager class that launches an instance of the QCG-PilotJob manager locally (in a separate thread).

The communication model, as well as all functionality, is the same as in the Manager class.

Initialize instance.

Launch a QCG-PilotJob manager instance in a background thread and connect to it. The port number for the ZMQ interface of the QCG-PilotJob manager instance is randomly selected.

Parameters
  • server_args (list(str)) –

    the command line arguments for QCG-PilotJob manager instance

    --net

    enable network interface

    --net-port NET_PORT

    port to listen for network interface (implies --net)

    --net-port-min NET_PORT_MIN

    minimum port range to listen for network interface if exact port number is not defined (implies --net)

    --net-port-max NET_PORT_MAX

    maximum port range to listen for network interface if exact port number is not defined (implies --net)

    --file

    enable file interface

    --file-path FILE_PATH

    path to the request file (implies --file)

    --wd WD

    working directory for the service

    --envschema ENVSCHEMA

    job environment schema [auto|slurm]

    --resources RESOURCES

    source of information about available resources [auto|slurm|local] as well as a method of job execution (through local processes or as a Slurm sub jobs)

    --report-format REPORT_FORMAT

    format of job report file [text|json]

    --report-file REPORT_FILE

    name of the job report file

    --nodes NODES

    configuration of available resources (implies --resources local)

    --log {critical,error,warning,info,debug,notset}

    log level

    --system-core

    reserve one of the cores for the QCG-PJM

    --disable-nl

    disable custom launching method

    --show-progress

    print information about executing tasks

    --governor

    run manager in the governor mode, where jobs will be scheduled for execution on the dependent managers

    --parent PARENT

    address of the parent manager, current instance will receive jobs from the parent manager

    --id ID

    optional manager instance identifier - will be generated automatically when not defined

    --tags TAGS

    optional manager instance tags separated by commas

    --slurm-partition-nodes SLURM_PARTITION_NODES

    split Slurm allocation by given number of nodes, where each group will be controlled by separate manager (implies --governor)

    --slurm-limit-nodes-range-begin SLURM_LIMIT_NODES_RANGE_BEGIN

    limit Slurm allocation to specified range of nodes (starting node)

    --slurm-limit-nodes-range-end SLURM_LIMIT_NODES_RANGE_END

    limit Slurm allocation to specified range of nodes (ending node)

    each command line argument and (optionally) its value should be passed as a separate entry in the list

  • cfg (dict) –

    ‘init_timeout’ - the timeout (in seconds) the client should wait for the QCG-PilotJob manager to start before raising an error, 300 by default

    ‘poll_delay’ - the delay between successive status polls in wait methods

    ‘log_file’ - the location of the log file

    ‘log_level’ - the log level (e.g. ‘DEBUG’); by default the log level is set to INFO
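Because every option and every value has to be a separate list entry, it can help to build server_args from a shell-style string. The sketch below uses only options listed above. The chosen values and the working directory path are examples, and the final LocalManager call is left as a comment because it requires an installed QCG-PilotJob.

```python
import shlex

# A command line as it might be typed in a shell; the values are examples only.
command_line = '--log debug --wd /tmp/qcg-demo --report-format json'

# Each option and each value becomes its own list entry.
server_args = shlex.split(command_line)

# Keys taken from the cfg description above; the values are illustrative.
cfg = {'init_timeout': 300, 'log_level': 'DEBUG'}

print(server_args)

# With QCG-PilotJob installed, the two objects would be passed like this:
# manager = LocalManager(server_args=server_args, cfg=cfg)
```

Writing the list by hand works equally well; the point is that `'--log debug'` as a single string entry would not match the documented convention.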

finish()

Send a finish control message to the manager and stop the manager’s process.

Sending a finish request to the QCG-PilotJob manager results in closing the QCG-PilotJob manager instance (with some delay). It will not be possible to send any new requests to this instance of the QCG-PilotJob manager.

If the manager process won’t stop in 10 seconds it will be terminated. We also call the ‘cleanup’ method.

Raises
  • InternalError – in case the response format is invalid

  • ConnectionError – in case of non zero exit code, or if connection has not been established yet

kill_manager_process()

Terminate the manager’s process with the SIGTERM signal.

Under normal conditions the finish method should be called instead.

static is_notebook()