SIO Workers

The sioworkers module exists because systems sometimes need to run relatively long computations as batch tasks. It provides a set of convenience classes and functions that help with implementing the batch tasks themselves. It is not a batch scheduler.

This mission is accomplished by providing a unified pythonic interface for representing parameters, input and output of batch jobs and running the jobs once these parameters are available.

The environ

This mysterious “pythonic interface” is actually a dictionary. Its keys are strings, and its values are Python primitive types such as lists, dictionaries and strings. In practice a value may be anything serializable to JSON. This dictionary is called environ everywhere. The environ is the only argument passed to the sio.workers.runner.run() function and the only thing returned by it.
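To make the shape of an environ concrete, here is a minimal sketch. The ping key is taken from the ping job described later in this document; everything else is plain standard-library Python, and the JSON round trip is only an illustration of the "serializable to JSON" rule, not something the runner is known to do this way.

```python
import json

# A small environ: string keys, JSON-serializable values.
# Only job_type is mandatory; "ping" is what the ping job expects.
environ = {
    "job_type": "ping",
    "ping": "hello",
}

# An environ should survive a JSON round trip unchanged.
encoded = json.dumps(environ)
decoded = json.loads(encoded)
```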

Many jobs use the filetracker module, so you may be happier if you learn about it somewhat.

environ keys common to all jobs

Keys that must be present to run a job:

job_type
name of the job to run.

Keys affected by all jobs:

result
SUCCESS if the job finished without throwing an exception, FAILURE otherwise,
exception
(set only if an exception was thrown) the exception, converted to string,
traceback
(set only if an exception was thrown) the traceback, converted to string.

Refer to the documentation of a particular job to learn what other arguments are expected and what information is returned back in the environ.

In general, regular errors that may occur as a result of a job (for example compilation errors in a compilation job) should not be signalled by throwing an exception. Exceptions should indicate potentially serious system problems, such as sandbox misconfiguration or running out of disk space.
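The difference between a regular error and an exception can be sketched as follows. This is not the actual runner code: run_job, compile_job, broken_job and the compilation_ok key are hypothetical names used for illustration, and the sketch assumes that SUCCESS and FAILURE are stored as plain strings.

```python
import traceback

def run_job(job, environ):
    """Hypothetical wrapper: call job(environ) and record the outcome."""
    try:
        environ = job(environ)
        environ["result"] = "SUCCESS"
    except Exception as e:
        # The job raised, so environ still refers to the dictionary passed in.
        environ["result"] = "FAILURE"
        environ["exception"] = str(e)
        environ["traceback"] = traceback.format_exc()
    return environ

def compile_job(environ):
    # A regular error (bad source code) is reported through the environ.
    environ["compilation_ok"] = "syntax error" not in environ["source"]
    return environ

def broken_job(environ):
    # A system problem is signalled with an exception.
    raise RuntimeError("sandbox misconfigured")

ok = run_job(compile_job, {"job_type": "compile", "source": "syntax error here"})
bad = run_job(broken_job, {"job_type": "broken"})
```

Note that ok still ends with result set to SUCCESS: the job itself finished, and the failed compilation is ordinary output.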

Running jobs

From Python:

sio.workers.runner.run(environ)

Performs the work passed in environ.

Returns the modified environ. It might be modified in-place by work implementations.

The following keys in environ have special meaning:

job_type
Mandatory key naming the job to be run.
prefilters
Optional list of filter names to apply before performing the work.
postfilters
Optional list of filter names to apply after performing the work.

Refer to Creating filters for more information about filters.
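The order in which prefilters, the job and postfilters are applied can be sketched like this. The JOBS and FILTERS dictionaries, the filter names and run_sketch are all hypothetical stand-ins; the real runner resolves names through setuptools entry points and may differ in detail.

```python
# Hypothetical in-memory registries standing in for entry point lookup.
JOBS = {
    "ping": lambda env: dict(env, pong=env["ping"]),
}
FILTERS = {
    "strip_ping": lambda env: dict(env, ping=env["ping"].strip()),
    "mark_done": lambda env: dict(env, done=True),
}

def run_sketch(environ):
    """Apply prefilters, then the job named by job_type, then postfilters."""
    for name in environ.get("prefilters", []):
        environ = FILTERS[name](environ)
    environ = JOBS[environ["job_type"]](environ)
    for name in environ.get("postfilters", []):
        environ = FILTERS[name](environ)
    return environ

result = run_sketch({
    "job_type": "ping",
    "ping": "  hello ",
    "prefilters": ["strip_ping"],
    "postfilters": ["mark_done"],
})
```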

There are also bindings for Celery in sio.celery.

From the shell, you may use the sio-batch script, which expects the environment variable environ to contain the environ encoded as JSON. After running the job, the resulting environ is printed to the standard output in the following format:

--- BEGIN ENVIRON ---
<jsonified environ>
--- END ENVIRON ---
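A caller that captures this output can recover the environ with a few lines of Python. The sketch below assumes that each marker sits alone on its own line and that other text may precede the block; parse_sio_batch_output is a hypothetical helper, not part of sioworkers.

```python
import json

BEGIN = "--- BEGIN ENVIRON ---"
END = "--- END ENVIRON ---"

def parse_sio_batch_output(text):
    """Return the environ dict found between the BEGIN and END markers."""
    lines = text.splitlines()
    start = lines.index(BEGIN)          # raises ValueError if missing
    end = lines.index(END, start + 1)
    return json.loads("\n".join(lines[start + 1:end]))

# Made-up sample output, used only to exercise the parser.
sample = "\n".join([
    "some log line printed before the environ",
    BEGIN,
    '{"job_type": "ping", "ping": "x", "pong": "x", "result": "SUCCESS"}',
    END,
])
parsed = parse_sio_batch_output(sample)
```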

For developers

Hi, developer! Nice to meet you!

Creating jobs

Creating jobs is super easy.

You just need to define a function with one argument... the environ, returning one thing... the environ. You may define it in any module, provided that it is registered with pkg_resources aka setuptools as an entry point, under the key sio.jobs.

The function may use the current directory in any way — it will be run from inside a temporary directory which will be deleted automatically.

For example, the following setup.py defines a module with a job named szescblotastop:

from setuptools import setup, find_packages
setup(
    name = "mymud",
    version = '0.1',
    packages = find_packages(),
    entry_points = {
        'sio.jobs': [
            'szescblotastop = mudmodule.mudsubmodule.mud.mud.mud:mud_fun',
        ]
    }
)
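The function behind such an entry point could look like the sketch below. The body of mud_fun, the who key and the greeting_size key are invented for illustration; only the "environ in, environ out" contract and the temporary working directory come from the description above. The chdir call stands in for what the runner does before calling the job.

```python
import os
import tempfile

def mud_fun(environ):
    """Hypothetical job body for the szescblotastop entry point."""
    # Relative paths land in the current directory, which the runner
    # replaces with a temporary directory that is deleted afterwards.
    with open("greeting.txt", "w") as f:
        f.write("Hello, %s!" % environ.get("who", "world"))
    environ["greeting_size"] = os.path.getsize("greeting.txt")
    return environ

# Stand-in for the runner: switch to a scratch directory, then call the job.
os.chdir(tempfile.mkdtemp())
result = mud_fun({"job_type": "szescblotastop", "who": "SIO"})
```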

Sandboxes

class sio.workers.sandbox.Sandbox(name)

Represents a sandbox, that is, a place in the filesystem where a previously prepared package with some software is extracted.

This class deals only with using sandboxes, not with creating, changing or uploading them. Each sandbox is uniquely identified by its name. The moment you create an instance of Sandbox, the appropriate archive is downloaded and extracted (if it does not exist yet; a check for a newer version is also performed). The path to the extracted sandbox is in the path attribute. This path is valid as long as the Sandbox instance exists (is not garbage collected).

Sandbox images are looked up in the following order:

  • from Filetracker, at path /sandboxes/<name>,
  • if not found there, the URL from SIO_SANDBOXES_URL environment variable is used,
  • if such environment variable is not defined, some default URL is used.

Sandboxes are extracted to the folder named in SIO_SANDBOXES_BASEDIR environment variable (or in ~/.sio-sandboxes if the variable is not in the environment).
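The choice of extraction folder can be sketched in a couple of lines. sandboxes_basedir is a hypothetical helper, not a function exported by sioworkers; it only mirrors the rule stated above.

```python
import os

def sandboxes_basedir(env=os.environ):
    """Sketch: SIO_SANDBOXES_BASEDIR if set, otherwise ~/.sio-sandboxes."""
    return env.get("SIO_SANDBOXES_BASEDIR",
                   os.path.expanduser("~/.sio-sandboxes"))

custom = sandboxes_basedir({"SIO_SANDBOXES_BASEDIR": "/var/cache/sandboxes"})
default = sandboxes_basedir({})
```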

Note

Processes must not modify the content of the extracted sandbox in any way. It is safe for multiple processes to use the same sandbox concurrently, as the folder is locked to ensure that no problems arise if an upgrade is needed.

Note

Sandbox is a context manager, so it should be used in a with statement. Upon entering, the sandbox is downloaded, extracted and locked, to prevent other processes from performing an upgrade.

Note

Do not construct instances of this class yourself; use get_sandbox() instead. Otherwise you may encounter deadlocks when two Sandbox instances have the same name.

Executing external programs

sio.workers.execute.execute(command, env=None, split_lines=False, ignore_errors=False, extra_ignore_errors=(), stdin='', mem_limit=None, time_limit=None, real_time_limit=None, environ=None, environ_prefix='')

Utility function to execute a command and return the output.

command
The command to execute; it may be a list or a string. If it is a list, all the arguments will be shell-quoted unless wrapped in sio.workers.execute.noquote. If it is a string, it will be passed directly to subprocess.Popen with shell=True.
env
The dictionary passed as environment. Non-string values are automatically converted to strings. If not present, the current process’ environment is used. In all cases, the environment is augmented by adding LC_ALL and LANGUAGE set to en_US.UTF-8.
split_lines
If True, the output from the called program is returned as a list of lines, otherwise just one big string.
ignore_errors
Do not throw ExecError if the program exits with non-zero code.
extra_ignore_errors
Do not throw ExecError if the program exits with one of the error codes in extra_ignore_errors.
stdin
Data to pass to the standard input of the program.
mem_limit
Memory limit (ulimit -v), in MB.
time_limit
CPU time limit (ulimit -t), in seconds.
real_time_limit
Wall clock time limit, in seconds.
environ
If present, this should be the environ dictionary. It’s used to extract values for mem_limit, time_limit and real_time_limit from it.
environ_prefix
Prefix for mem_limit, time_limit and real_time_limit keys in environ.

The function returns the tuple (retcode, output), where retcode is the program’s return code and output is the program’s stdout and stderr.
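The quoting rule for list commands can be illustrated without sioworkers installed. In the sketch below, the noquote class is a local stand-in for the one in sio.workers.execute, and build_shell_command is a hypothetical helper; the real implementation may build the command differently. The sketch uses shlex.quote, so it assumes Python 3.

```python
import shlex

class noquote(str):
    """Local stand-in marking an argument that must not be shell-quoted."""

def build_shell_command(command):
    """Sketch: turn a list of arguments into a single shell command string."""
    if isinstance(command, str):
        # Strings are passed to the shell as they are.
        return command
    return " ".join(
        arg if isinstance(arg, noquote) else shlex.quote(str(arg))
        for arg in command
    )

cmd = build_shell_command(
    ["./exe", "my file", noquote("<"), "in", noquote(">"), "out"])
# cmd == "./exe 'my file' < in > out"
```

Without noquote, the redirection operators would be quoted and passed to the program as ordinary arguments.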

Interacting with Filetracker

Filetracker should be your friend if you are coding for sio-workers. We can somewhat help you interact with it by providing the most demanded functions in the world:

sio.workers.ft.download(environ, key, dest=None, skip_if_exists=False, **kwargs)

Downloads the file from environ[key] and saves it to dest.

dest
A filename, directory name or None. In the two latter cases, the file is named the same as in environ[key].
skip_if_exists
If True and dest points to an existing file (not a directory or None), then the file is not downloaded.
**kwargs
Passed directly to filetracker.Client.get_file().
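How dest and skip_if_exists interact can be sketched as follows. resolve_dest and should_skip are hypothetical helpers written for this illustration; they assume the remote name is simply the last path component of environ[key], which ignores any Filetracker-specific details such as version suffixes.

```python
import os
import tempfile

def resolve_dest(remote_path, dest=None):
    """Sketch: choose the local filename for a download."""
    name = os.path.basename(remote_path)
    if dest is None:
        return name                      # same name, current directory
    if os.path.isdir(dest):
        return os.path.join(dest, name)  # same name, inside the directory
    return dest                          # explicit filename

def should_skip(dest, skip_if_exists):
    """Skip only when dest names an existing regular file."""
    return bool(skip_if_exists and dest is not None and os.path.isfile(dest))

scratch = tempfile.mkdtemp()
by_default = resolve_dest("/tasks/1/in.txt")
into_dir = resolve_dest("/tasks/1/in.txt", scratch)
explicit = resolve_dest("/tasks/1/in.txt", "input.dat")
```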

The value under environ['use_filetracker'] affects downloading in the following way:

  • if True, nothing special happens
  • if False, the file is not downloaded from filetracker, but the passed path is assumed to be a regular filesystem path
  • if 'auto', the file is assumed to be a local filename only if it is a relative path (this is usually the case when developers play).

Returns the path to the saved file.
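The three use_filetracker cases listed above boil down to a small decision function. is_local_path is a hypothetical name, and the sketch takes the setting as an explicit argument because the default used when the key is absent is not stated here.

```python
import os

def is_local_path(path, use_filetracker):
    """Sketch: True if path should be read as a regular filesystem path."""
    # 'auto' must be checked first, because the string is truthy.
    if use_filetracker == "auto":
        return not os.path.isabs(path)
    return not use_filetracker

remote = is_local_path("/tasks/1/in.txt", True)
forced_local = is_local_path("/tasks/1/in.txt", False)
auto_relative = is_local_path("tests/in.txt", "auto")
auto_absolute = is_local_path("/tasks/1/in.txt", "auto")
```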

sio.workers.ft.upload(environ, key, source, dest=None, **kwargs)

Uploads the file from source to Filetracker under the name given in environ[key].

source
Filename to upload.
dest
A filename, directory name or None. In the two latter cases, the file is named the same as in environ[key].
**kwargs
Passed directly to filetracker.Client.put_file().

See the note about environ['use_filetracker'] in sio.workers.ft.download().

Returns the filetracker path to the saved file.

sio.workers.ft.instance()

Returns a singleton instance of filetracker.Client.

There is also a convenience function for starting the Filetracker server, but this is only useful in complex setups when one wants to configure the worker machines to share cached files between themselves.

sio.workers.ft.launch_filetracker_server()

Launches the Filetracker server if FILETRACKER_PUBLIC_URL is present in os.environ and the server does not appear to be running.

The server is run in the background and the function returns once the server is up and running.

There is also a command-line script called sio-run-filetracker which calls this function.

Example

Here’s an example of a job running the specified binary file in a controlled environment (beware, as this is not the actual implementation of the exec job from sio-exec package):

import os

from sio.workers import ft, Failure
from sio.workers.execute import execute, noquote
from sio.workers.sandbox import get_sandbox

def run(environ):
    # Fetch the executable and its input into the job's working directory.
    exe_file = ft.download(environ, 'exe_file', 'exe', add_to_cache=True)
    os.chmod(exe_file, 0o700)
    in_file = ft.download(environ, 'in_file', 'in', add_to_cache=True)
    sandbox = get_sandbox('exec-sandbox')
    env = os.environ.copy()
    env['MEM_LIMIT'] = 256000
    # Keep the sandbox locked while the supervisor from it is running.
    with sandbox:
        retcode, output = execute(
                [os.path.join(sandbox.path, 'bin', 'supervisor'), '-f', '3',
                    './exe',
                    noquote('<'), 'in',
                    noquote('3>'), 'supervisor_result',
                    noquote('>'), 'out'],
                env=env)
    # The supervisor writes its verdict to file descriptor 3.
    with open('supervisor_result') as result_file:
        environ['status_line'] = result_file.readline().strip()
    ft.upload(environ, 'out_file', 'out')
    return environ

Creating filters

Filters are boring. There are no filters at the moment.

Filters are functions with one argument... the environ, returning one thing... the environ. They may be defined in any modules, provided that they are registered with pkg_resources aka setuptools as entry points, under the key sio.workers.filters.

For example, the following setup.py defines a module with a filter:

from setuptools import setup, find_packages
setup(
    name = "mypackage",
    version = '0.1',
    packages = find_packages(),
    entry_points = {
        'sio.workers.filters': [
            'superfilter = mypackage.submodule:superfilter_function',
        ]
    }
)
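A filter registered this way is just another environ-to-environ function. The body below is invented for illustration, as is the tags key; only the function name comes from the setup.py above.

```python
def superfilter_function(environ):
    """Hypothetical filter: record that it ran in a 'tags' list."""
    environ.setdefault("tags", []).append("superfiltered")
    return environ

filtered = superfilter_function({"job_type": "ping", "ping": "x"})
```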

The ping job

There is also a single job called ping available for testing. It expects a ping key in the environ and basically does:

environ['pong'] = environ['ping']

Integration with Celery

There is also a script sio-celery-worker which starts the Celery daemon with the default configuration. The configuration is available in sio.celery.default_config, so a custom celeryconfig.py (for use with a stock celeryd) may look like this:

from sio.celery.default_config import *
BROKER_URL = 'amqp://foo:bar@server/vhost'

Available jobs