Source code for py_swf.clients.activity_task
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from collections import namedtuple
from botocore.vendored.requests.exceptions import ReadTimeout
from py_swf.errors import NoTaskFound
__all__ = ['ActivityTaskClient', 'ActivityTask']
ActivityTask = namedtuple('ActivityTask', 'activity_id type version input task_token workflow_id workflow_run_id')
"""Contains the metadata to execute an activity task.
See the response syntax in :meth:`~SWF.Client.poll_for_activity_task`.
"""
[docs]class ActivityTaskClient(object):
"""A client that provides a pythonic API for polling and responding to activity tasks through an SWF boto3 client.
:param activity_task_config: Contains SWF values commonly used when making SWF api calls.
:type activity_task_config: :class:`~py_swf.config_definitions.ActivityTaskConfig`
:param boto_client: A raw SWF boto3 client.
:type boto_client: :class:`~SWF.Client`
"""
def __init__(self, activity_task_config, boto_client):
self.activity_task_config = activity_task_config
self.boto_client = boto_client
[docs] def poll(self, identity=None):
"""Opens a connection to AWS and long-polls for activity tasks.
When an activity is available, this function will return with exactly one activity task to execute.
Passthrough to :meth:`~SWF.Client.poll_for_activity_task`.
:param identity: A freeform text that identifies the client that performed the longpoll. Useful for debugging history.
:type identity: string
:return: An activity task to execute.
:rtype: ActivityTask
:raises py_swf.errors.NoTaskFound: Raised when polling for an activity times out without receiving any tasks.
"""
kwargs = dict(
domain=self.activity_task_config.domain,
taskList={
'name': self.activity_task_config.task_list,
},
)
if identity is not None:
kwargs['identity'] = identity
try:
results = self.boto_client.poll_for_activity_task(
**kwargs
)
except ReadTimeout as e:
raise NoTaskFound(e)
# Sometimes SWF gives us an incomplete response, ignore these.
if not results.get('taskToken', None):
raise NoTaskFound('Received results with no taskToken')
return ActivityTask(
activity_id=results['activityId'],
type=results['activityType']['name'],
version=results['activityType']['version'],
input=results['input'],
task_token=results['taskToken'],
workflow_id=results['workflowExecution']['workflowId'],
workflow_run_id=results['workflowExecution']['runId'],
)
[docs] def finish(self, task_token, result):
"""Responds to an activity task with a success.
Passthrough to :meth:`~SWF.Client.respond_activity_task_completed`.
:param task_token: The task_token returned from :meth:`~py_swf.clients.activity_task.ActivityTaskClient.poll`.
:type task_token: string
:param result: The result of the executed activity task.
:type result: string
:return: None
:rtype: NoneType
"""
self.boto_client.respond_activity_task_completed(
result=result,
taskToken=task_token,
)
[docs] def fail(self, task_token, reason, details=None):
"""Responds to an activity task with a failure.
Passthrough to :meth:`~SWF.Client.respond_activity_task_failed`.
:param task_token: The task_token returned from :meth:`~py_swf.clients.activity_task.ActivityTaskClient.poll`.
:type task_token: string
:param reason: Description of the error that may assist in diagnostics
:type reason: string
:param details: Optional. Detailed information about the failure
:type details: string
:return: None
:rtype: NoneType
"""
kwargs = dict(
reason=reason,
)
if details is not None:
kwargs["details"] = details
self.boto_client.respond_activity_task_failed(
taskToken=task_token,
**kwargs
)