Source code for py_swf.clients.admin

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals

import functools

from botocore.exceptions import ClientError


__all__ = ['WorkflowRegistrar']


def idempotent_create(func):
    """Make a resource-creating SWF call safe to repeat.

    Wraps ``func`` so that a ``ClientError`` whose error code reports that
    the resource already exists (``DomainAlreadyExistsFault`` or
    ``TypeAlreadyExistsFault``) is swallowed instead of raised. Any other
    ``ClientError`` is re-raised unchanged, and other exception types are
    never caught.

    :param func: The callable to wrap.
    :type func: callable
    :return: A wrapper accepting the same arguments as ``func``. It returns
        whatever ``func`` returns, or None when an "already exists" error
        was suppressed.
    :rtype: callable
    """
    # Built once per decorated function rather than on every failure.
    already_exists_codes = frozenset((
        'DomainAlreadyExistsFault',
        'TypeAlreadyExistsFault',
    ))

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        try:
            # Propagate the result so decorating a function never hides
            # its return value.
            return func(*args, **kwargs)
        except ClientError as e:
            if e.response['Error']['Code'] not in already_exists_codes:
                raise
            # The resource already exists: treat the call as a no-op.
            return None
    return wrapped


class WorkflowRegistrar(object):
    """A client that allows creation of SWF environments.

    Allows you to create domains, task lists, workflow types, and activity
    types. Every ``register_*`` method is wrapped with
    ``idempotent_create``.

    :param boto_client: A raw SWF boto3 client.
    :type boto_client: :class:`~SWF.Client`
    """

    def __init__(self, boto_client):
        self.boto_client = boto_client

    @idempotent_create
    def register_domain(self, name, description=None, retention=90):
        """Creates an swf domain.

        Passthrough to :meth:`~SWF.Client.register_domain`.

        :param name: Name of the domain
        :type name: string
        :param description: An optional description of the domain. Omitted
            from the request when None.
        :type description: string
        :param retention: An optional retention of the domain. Measured in
            days. If None is passed in, the string ``'NONE'`` is sent,
            which SWF documents as equivalent to a retention of 0.
            Defaults to 90 days.
        :type retention: int
        :return: None
        :rtype: NoneType
        """
        if retention is None:
            retention = 'NONE'

        kwargs = dict(
            name=name,
            # SWF models the retention period as a string.
            workflowExecutionRetentionPeriodInDays=str(retention),
        )
        if description is not None:
            kwargs['description'] = description

        self.boto_client.register_domain(
            **kwargs
        )

    @idempotent_create
    def register_activity_type(
        self,
        domain,
        name,
        version,
        task_heartbeat_timeout=None,
        task_list_name=None,
        task_priority=None,
        task_schedule_to_close_timeout=None,
        task_schedule_to_start_timeout=None,
        task_start_to_close_timeout=None,
        description=None,
    ):
        """Creates an activity type.

        Passthrough to :meth:`~SWF.Client.register_activity_type`. Optional
        arguments left as None are omitted from the request. Timeouts and
        the priority are converted with ``str()`` before being sent.

        :param domain: Registers the activity type to this domain.
        :type domain: string
        :param name: Name of the activity type to register.
        :type name: string
        :param version: Freeform text that represents the version of the
            activity type.
        :type version: string
        :param task_heartbeat_timeout: Represents a timeout for subsequent
            heartbeats sent by an activity task. Measured in seconds.
        :type task_heartbeat_timeout: int or string
        :param task_list_name: Registers the activity type to this task
            list name.
        :type task_list_name: string
        :param task_priority: Represents a priority for this task to run.
            Valid within the range of (-2147483648, 2147483647). Higher
            numbers indicate higher priority.
        :type task_priority: int
        :param task_schedule_to_close_timeout: The default maximum duration
            of a task of this type, from scheduling until it is closed.
            Measured in seconds.
        :type task_schedule_to_close_timeout: int or string
        :param task_schedule_to_start_timeout: The default maximum time a
            scheduled task may wait before being assigned to a worker.
            Measured in seconds.
        :type task_schedule_to_start_timeout: int or string
        :param task_start_to_close_timeout: The default maximum time a
            worker may take to process a task of this type. Measured in
            seconds.
        :type task_start_to_close_timeout: int or string
        :param description: Describes what the activity task does
        :type description: string
        :return: None
        :rtype: NoneType
        """
        kwargs = dict(
            domain=domain,
            name=name,
            version=version,
        )
        if task_heartbeat_timeout is not None:
            kwargs['defaultTaskHeartbeatTimeout'] = str(task_heartbeat_timeout)
        if task_list_name is not None:
            kwargs['defaultTaskList'] = {
                "name": task_list_name,
            }
        if task_priority is not None:
            kwargs['defaultTaskPriority'] = str(task_priority)
        if task_schedule_to_close_timeout is not None:
            kwargs['defaultTaskScheduleToCloseTimeout'] = str(task_schedule_to_close_timeout)
        if task_schedule_to_start_timeout is not None:
            kwargs['defaultTaskScheduleToStartTimeout'] = str(task_schedule_to_start_timeout)
        if task_start_to_close_timeout is not None:
            kwargs['defaultTaskStartToCloseTimeout'] = str(task_start_to_close_timeout)
        if description is not None:
            kwargs['description'] = description

        self.boto_client.register_activity_type(
            **kwargs
        )

    @idempotent_create
    def register_workflow_type(
        self,
        domain,
        name,
        version,
        child_policy=None,
        execution_start_to_close_timeout=None,
        lambda_role=None,
        task_list_name=None,
        task_priority=None,
        task_start_to_close_timeout=None,
        description=None,
    ):
        """Creates a workflow type.

        Passthrough to :meth:`~SWF.Client.register_workflow_type`. Optional
        arguments left as None are omitted from the request. Timeouts and
        the priority are converted with ``str()`` before being sent, the
        same way ``register_activity_type`` does.

        :param domain: Registers the workflow type to this domain.
        :type domain: string
        :param name: Name of the workflow type to register.
        :type name: string
        :param version: Freeform text that represents the version of the
            workflow type.
        :type version: string
        :param child_policy: Determines the child workflow policy upon
            termination.
        :type child_policy: string of ('TERMINATE'|'REQUEST_CANCEL'|'ABANDON')
        :param execution_start_to_close_timeout: Represents the end-to-end
            timeout for a workflow. Measured in seconds.
        :type execution_start_to_close_timeout: int or string
        :param lambda_role: The IAM role attached to this workflow
            execution to use when invoking AWS Lambda functions.
        :type lambda_role: string
        :param task_list_name: Registers this workflow type to this task
            list.
        :type task_list_name: string
        :param task_priority: Sets the task priority for this workflow
            type. Valid values are in the range of
            (-2147483648, 2147483647). Higher numbers indicate higher
            priority.
        :type task_priority: int
        :param task_start_to_close_timeout: The default maximum duration
            of decision tasks for this workflow type. Measured in seconds.
        :type task_start_to_close_timeout: int or string
        :param description: Describes what the workflow type does.
        :type description: string
        :return: None
        :rtype: NoneType
        """
        kwargs = dict(
            domain=domain,
            name=name,
            version=version,
        )
        if child_policy is not None:
            kwargs["defaultChildPolicy"] = child_policy
        if execution_start_to_close_timeout is not None:
            # SWF models durations and priorities as strings; convert so
            # numeric arguments are accepted.
            kwargs["defaultExecutionStartToCloseTimeout"] = str(execution_start_to_close_timeout)
        if lambda_role is not None:
            kwargs["defaultLambdaRole"] = lambda_role
        if task_list_name is not None:
            kwargs["defaultTaskList"] = {
                "name": task_list_name,
            }
        if task_priority is not None:
            kwargs["defaultTaskPriority"] = str(task_priority)
        if task_start_to_close_timeout is not None:
            kwargs["defaultTaskStartToCloseTimeout"] = str(task_start_to_close_timeout)
        if description is not None:
            kwargs["description"] = description

        self.boto_client.register_workflow_type(
            **kwargs
        )