Source code for py_swf.clients.decision

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals

from collections import namedtuple

from botocore.vendored.requests.exceptions import ReadTimeout

from py_swf.errors import NoTaskFound


__all__ = ['DecisionClient', 'DecisionTask']


DecisionTask = namedtuple('DecisionTask', 'events task_token workflow_id workflow_run_id workflow_type')
"""Contains the metadata to execute a decision task.

See the response syntax in :meth:`~SWF.Client.poll_for_decision_task`.
"""


def nametuplefy(thing):
    """Recursively turns a dict into namedtuples."""
    if type(thing) == dict:
        # Only supports string keys
        Dict = namedtuple('Dict', ' '.join(thing.keys()))

        nametuplefied_children = {}

        for k, v in thing.items():
            nametuplefied_children[k] = nametuplefy(v)

        return Dict(**nametuplefied_children)
    if type(thing) == list:
        return list(map(nametuplefy, thing))
    else:
        return thing


[docs]class DecisionClient(object): """A client that provides a pythonic API for polling and responding to decision tasks through an SWF boto3 client. :param decision_config: Contains SWF values commonly used when making SWF api calls. :type decision_config: :class:`~py_swf.config_definitions.DecisionConfig` :param boto_client: A raw SWF boto3 client. :type boto_client: :class:`~SWF.Client` """ def __init__(self, decision_config, boto_client): self.decision_config = decision_config self.boto_client = boto_client
[docs] def poll(self, identity=None, use_raw_event_history=False): """Opens a connection to AWS and long-polls for decision tasks. When a decision is available, this function will return with exactly one decision task to execute. Only returns a contiguous subset of the most recent events. If you want to grab the entire history for a workflow, use :meth:`~py_swf.decision.DecisionClient.walk_execution_history` Passthrough to :meth:`~SWF.Client.poll_for_decision_task`. :param identity: A freeform text that identifies the client that performed the longpoll. Useful for debugging history. :type identity: string :param use_raw_event_history: Whether to use the raw dictionary event history returned from AWS. Otherwise attempts to turn dictionaries into namedtuples recursively. :type use_raw_event_history: bool :return: A decision task to execute. :rtype: DecisionTask :raises py_swf.errors.NoTaskFound: Raised when polling for a decision task times out without receiving any tasks. """ kwargs = dict( domain=self.decision_config.domain, reverseOrder=True, taskList={ 'name': self.decision_config.task_list, }, ) # boto doesn't like None values for optional kwargs if identity is not None: kwargs['identity'] = identity try: results = self.boto_client.poll_for_decision_task( **kwargs ) except ReadTimeout as e: raise NoTaskFound(e) # Sometimes SWF gives us an incomplete response, ignore these. if not results.get('taskToken', None): raise NoTaskFound('Received results with no taskToken') events = results['events'] if not use_raw_event_history: events = nametuplefy(events) return DecisionTask( events=events, task_token=results['taskToken'], workflow_id=results['workflowExecution']['workflowId'], workflow_run_id=results['workflowExecution']['runId'], workflow_type=results['workflowType'], )
[docs] def walk_execution_history( self, workflow_id, workflow_run_id, reverse_order=True, use_raw_event_history=False, maximum_page_size=1000, ): """Lazily walks through the entire workflow history for a given workflow_id. This will make successive calls to SWF on demand when pagination is needed. See :meth:`~SWF.Client.get_workflow_execution_history` for more information. :param workflow_id: The workflow_id returned from :meth:`~py_swf.clients.decision.DecisionClient.poll`. :type identity: string :param workflow_run_id: The workflow_run_id returned from :meth:`~py_swf.clients.decision.DecisionClient.poll`. :type identity: string :param reverse_order: Passthru for reverseOrder to :meth:`~SWF.Client.get_workflow_execution_history` :type identity: bool :param use_raw_event_history: Whether to use the raw dictionary event history returned from AWS. Otherwise attempts to turn dictionaries into namedtuples recursively. :type use_raw_event_history: bool :param maximum_page_size: Passthru for maximumPageSize to :meth:`~SWF.Client.get_workflow_execution_history` :type identity: int :return: A generator that returns successive elements in the workflow execution history. :rtype: collections.Iterable """ kwargs = dict( domain=self.decision_config.domain, reverseOrder=reverse_order, execution=dict( workflowId=workflow_id, runId=workflow_run_id, ), maximumPageSize=maximum_page_size, ) while True: results = self.boto_client.get_workflow_execution_history( **kwargs ) next_page_token = results.get('nextPageToken', None) events = results['events'] for event in events: if not use_raw_event_history: event = nametuplefy(event) yield event if next_page_token is None: break kwargs['nextPageToken'] = next_page_token
[docs] def finish_decision_with_activity( self, task_token, activity_id, activity_name, activity_version, activity_input, schedule_to_close_timeout=None, schedule_to_start_timeout=None, start_to_close_timeout=None, heartbeat_timeout=None, ): """Responds to a given decision task's task_token to schedule an activity task to run. Passthrough to :meth:`~SWF.Client.respond_decision_task_completed`. :param task_token: The task_token returned from :meth:`~py_swf.clients.decision.DecisionClient.poll`. :type identity: string :param activity_id: A unique identifier for the activity task. :type identity: string :param activity_name: Which activity name to execute. :type identity: string :param activity_name: Version of the activity name. :type identity: string :param activity_input: Freeform text of the input for the activity :type identity: string :param schedule_to_close_timeout: Override default timeout for activity from schedule to finish More info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ScheduleActivityTaskDecisionAttributes.html :type identity: int :param schedule_to_start_timeout: Override default timeout for activity from schedule to start More info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ScheduleActivityTaskDecisionAttributes.html :type identity: int :param start_to_close_timeout: Override default timeout for activity from start to finish More info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ScheduleActivityTaskDecisionAttributes.html :type identity: int :return: None :rtype: NoneType """ activity_task = build_activity_task( activity_id, activity_name, activity_version, activity_input, self.decision_config, schedule_to_close_timeout, schedule_to_start_timeout, start_to_close_timeout, heartbeat_timeout, ) self.boto_client.respond_decision_task_completed( taskToken=task_token, decisions=[activity_task], )
[docs] def finish_workflow(self, task_token, result): """Responds to a given decision task's task_token to finish and terminate the workflow. Passthrough to :meth:`~SWF.Client.respond_decision_task_completed`. :param task_token: The task_token returned from :meth:`~py_swf.clients.decision.DecisionClient.poll`. :type identity: string :param result: Freeform text that represents the final result of the workflow. :type identity: string :return: None :rtype: NoneType """ workflow_complete = build_workflow_complete(result) self.boto_client.respond_decision_task_completed( taskToken=task_token, decisions=[workflow_complete], )
def build_workflow_complete(result): return { 'decisionType': 'CompleteWorkflowExecution', 'completeWorkflowExecutionDecisionAttributes': { 'result': result, }, } def build_activity_task( activity_id, activity_name, activity_version, input, decision_config, schedule_to_close_timeout, schedule_to_start_timeout, start_to_close_timeout, heartbeat_timeout, ): if schedule_to_close_timeout is None: schedule_to_close_timeout = decision_config.schedule_to_close_timeout if schedule_to_start_timeout is None: schedule_to_start_timeout = decision_config.schedule_to_start_timeout if start_to_close_timeout is None: start_to_close_timeout = decision_config.start_to_close_timeout if heartbeat_timeout is None: heartbeat_timeout = decision_config.heartbeat_timeout return { 'decisionType': 'ScheduleActivityTask', 'scheduleActivityTaskDecisionAttributes': { 'activityType': { 'name': activity_name, 'version': activity_version, }, 'activityId': activity_id, 'input': input, 'taskList': { 'name': decision_config.task_list, }, 'scheduleToCloseTimeout': str(schedule_to_close_timeout), 'scheduleToStartTimeout': str(schedule_to_start_timeout), 'startToCloseTimeout': str(start_to_close_timeout), 'heartbeatTimeout': str(heartbeat_timeout), }, }