qcg.pilotjob.launcher.launcher module

class qcg.pilotjob.launcher.launcher.Launcher(config, wdir, aux_dir, manager)

Bases: object

The launcher service used to launch applications on remote nodes.

All nodes should share a file system.

work_dir

path to the working directory

Type

str

aux_dir

path to the auxiliary directory

Type

str

zmq_ctx

ZMQ context

Type

zmq.Context

agents

requested agent instances

Type

dict

nodes

registered agent instances

Type

dict

jobs_def_cb

application finish default callback

Type

callable

jobs_cb

application finish callbacks

Type

dict

node_local_agent_cmd

list of command arguments to start agent on local node

Type

list

node_ssh_agent_cmd

list of command arguments to start agent on remote node via ssh

Type

str

in_socket

Type

zmq.Socket

local_address

Type

str

local_export_address

Type

str

iface_task

Type

asyncio.Future

Initialize instance.

Parameters
  • config (dict) – configuration dictionary

  • wdir (str) – path to the working directory (the same on all nodes)

  • aux_dir (str) – path to the auxiliary directory (the same on all nodes)

  • manager (Manager) – manager used to call scheduler loop on certain events

MIN_PORT_RANGE = 10000
MAX_PORT_RANGE = 40000
START_TIMEOUT_SECS = 600
SHUTDOWN_TIMEOUT_SECS = 30
MAXIMUM_CONCURRENT_CONNECTIONS = 1000

set_job_finish_callback(jobs_finish_cb, *jobs_finish_cb_args)

Set the default function for notifying about finished jobs.

Parameters
  • jobs_finish_cb – optional default job finish callback

  • jobs_finish_cb_args – job finish callback parameters
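The relation between the default callback (jobs_def_cb) and the per-application callbacks (jobs_cb) can be sketched as below. This is only an illustration under assumptions: CallbackRegistry, register and notify_finished are hypothetical names that are not part of qcg.pilotjob, and the convention of passing the stored arguments first and the finish message last is inferred from the signatures of the test helpers finish_callback_default(text, message) and finish_callback(text, jobid, message).

```python
class CallbackRegistry:
    """Hypothetical stand-in mirroring the documented jobs_def_cb / jobs_cb attributes."""

    def __init__(self):
        self.jobs_def_cb = None  # default callback as (function, args), or None
        self.jobs_cb = {}        # per-application callbacks keyed by app_id

    def set_job_finish_callback(self, jobs_finish_cb, *jobs_finish_cb_args):
        # Same signature as the documented method; the stored layout is an assumption.
        if jobs_finish_cb is None:
            self.jobs_def_cb = None
        else:
            self.jobs_def_cb = (jobs_finish_cb, jobs_finish_cb_args)

    def register(self, app_id, finish_cb, finish_cb_args=()):
        # Hypothetical helper: remember a callback for a single application.
        self.jobs_cb[app_id] = (finish_cb, tuple(finish_cb_args))

    def notify_finished(self, app_id, message):
        # Prefer the application's own callback, otherwise fall back to the default.
        entry = self.jobs_cb.pop(app_id, None)
        if entry is None:
            entry = self.jobs_def_cb
        if entry is not None:
            callback, args = entry
            callback(*args, message)
```

With this layout, an application submitted without its own callback is reported through the default one, and an application with its own callback never triggers the default.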

async start(instances, local_port=None)

Initialize the launcher with the given agent instances. The instances data must contain a list of places along with the data needed to initialize the instance services.

Parameters
  • instances (list of dict) – agent instance descriptions, each with the keys: agent_id - the agent identifier; ssh - if the instance should be run via ssh (account?, host?, remote_python?); slurm - if the instance should be run via slurm (); local - if the instance should be run on a local machine

  • local_port – optional local port for the incoming connections; if not defined, an available random port will be chosen from the range
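When local_port is not given, start() picks an available random port. A minimal sketch of one way to do such a selection is shown below, assuming the range is the one bounded by the MIN_PORT_RANGE and MAX_PORT_RANGE class constants and that both ends are inclusive. The helper pick_free_port is hypothetical and uses only the standard library; it is not the launcher's own code.

```python
import random
import socket

# Values copied from the documented class constants.
MIN_PORT_RANGE = 10000
MAX_PORT_RANGE = 40000


def pick_free_port(host="127.0.0.1", max_tries=100):
    """Return a port from the range that could be bound at the time of the call.

    Hypothetical helper, not part of qcg.pilotjob.
    """
    for _ in range(max_tries):
        port = random.randint(MIN_PORT_RANGE, MAX_PORT_RANGE)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # port is busy, try another one
            return port
    raise RuntimeError("no free port found in the configured range")
```

The probe socket is closed before the port is returned, so another process could still take the port in the meantime. Binding the real listening socket directly and retrying on failure avoids that race.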

async stop()

Stop all agents and release resources.

async submit(agent_id, app_id, jname, args, stdin=None, stdout=None, stderr=None, env=None, wdir=None, cores=None, finish_cb=None, finish_cb_args=None)

Submit application to be launched by the selected agent.

Parameters
  • agent_id – agent that should launch the application

  • app_id – application identifier

  • jname – job name

  • args – application arguments

  • stdin – path to the standard input file

  • stdout – path to the standard output file

  • stderr – path to the standard error file

  • env – environment variables

  • wdir – working directory

  • cores – the list of cores the application should be bound to

  • finish_cb – job finish callback

  • finish_cb_args – job finish callback arguments

async cancel(agent_id, app_id)

Cancel a submitted application on the selected agent.

Parameters
  • agent_id – agent that launched the application

  • app_id – application identifier
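The call order of the coroutine methods (start, submit, cancel, stop) can be illustrated with a stand-in object. StubLauncher below is hypothetical: it only copies the documented method signatures and records the calls, so the sketch runs without a cluster or the real Launcher. The instance description, the identifiers and the file name are made-up values, and the empty dictionary under the local key is an assumption about the expected format.

```python
import asyncio


class StubLauncher:
    """Hypothetical stand-in that records calls; mirrors the documented signatures."""

    def __init__(self):
        self.calls = []

    async def start(self, instances, local_port=None):
        self.calls.append(("start", [inst["agent_id"] for inst in instances]))

    async def submit(self, agent_id, app_id, jname, args, stdin=None, stdout=None,
                     stderr=None, env=None, wdir=None, cores=None,
                     finish_cb=None, finish_cb_args=None):
        self.calls.append(("submit", agent_id, app_id))

    async def cancel(self, agent_id, app_id):
        self.calls.append(("cancel", agent_id, app_id))

    async def stop(self):
        self.calls.append(("stop",))


async def run_once(launcher):
    # One agent on the local machine; the value format is an assumption.
    instances = [{"agent_id": "agent-0", "local": {}}]
    await launcher.start(instances)
    try:
        await launcher.submit("agent-0", "app-1", "job-1", ["/bin/date"],
                              stdout="date.out", cores=[0])
        await launcher.cancel("agent-0", "app-1")
    finally:
        # Always stop the agents, even if submit or cancel raised.
        await launcher.stop()
    return launcher.calls


calls = asyncio.run(run_once(StubLauncher()))
```

Wrapping the work in try/finally keeps stop() on every path, which matters for a service that starts processes on other nodes.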

qcg.pilotjob.launcher.launcher.finish_callback_default(text, message)

Just for testing.

qcg.pilotjob.launcher.launcher.finish_callback(text, jobid, message)

Just for testing.

async qcg.pilotjob.launcher.launcher.run_job(launcher, agent_id, appid, args, stdin=None, stdout=None, stderr=None, env=None)

Just for testing.

async qcg.pilotjob.launcher.launcher.test()

A simple test.