# coding: utf-8
from collections import OrderedDict
from datetime import datetime
from pprint import pformat
import cPickle as pickle
import inspect
import itertools
import json
import logging
import re
import jsonschema
import zmq
from .schema import (validate, get_connect_request, get_execute_request,
get_execute_reply, decode_content_data, mime_type)
# Create module-level logger.
logger = logging.getLogger(__name__)
[docs]class PluginBase(object):
'''
Plugin which can be connected to a network of other plugin instances
through a central **hub** (i.e., :class:`zmq_plugin.hub.Hub`).
Note
----
**Thread-safety**
All socket configuration, registration, etc. is performed *only* when the
`reset` method is called explicitly. Thus, all sockets are created in the
thread that calls the `reset` method.
By creating sockets in the thread the calls `reset`, it is straightforward
to, for example, run a `Plugin` in a separate process or thread.
Parameters
----------
name : str
Unique name across all plugins.
query_uri : str
The URI address of the **hub** query socket.
subscribe_options : dict, optional
See :data:`subscribe_options`.
Attributes
----------
callbacks : OrderedDict
Registry of functions to call upon receiving ``execute_reply``
messages, keyed by the ``session`` field of the
``execute_request``/``execute_reply`` header.
command_socket : zmq.Socket
Used to send command requests to the **hub** command socket.
execute_reply_id : itertools.count
Reply message count iterator.
Increments by one each time a reply message is sent.
host : str
Host name or IP address.
hub_name : str
Name of hub.
query_socket : zmq.Socket
Connects to the **hub** query socket to register and query information
about other sockets on the **hub**.
query_uri : str
The URI address of the query socket.
subscribe_options : dict
Each ``(key, value)`` item in dictionary is applied to
:attr:`subscribe_socket` using the :meth:`setsockopt` method.
This is useful, for instance, to set the subscription filter.
subscribe_socket : zmq.Socket
Hub broadcasts messages to all plugins over the publish socket.
transport : str
Transport (e.g., "tcp", "inproc").
'''
def __init__(self, name, query_uri, subscribe_options=None):
self.name = name
host_cre = re.compile(r'^(?P<transport>[^:]+)://'
r'(?P<host>[^:]+)'
r'(:(?P<port>\d+)?)')
match = host_cre.search(query_uri)
self.transport = match.group('transport')
self.host = match.group('host')
self.hub_name = 'hub'
self.query_uri = query_uri
self.query_socket = None
self.command_socket = None
self.subscribe_options = subscribe_options or {}
self.subscribe_socket = None
self.execute_reply_id = itertools.count(1)
# Registry of functions to call upon receiving `execute_reply`
# messages, keyed by the `session` field of the
# `execute_request`/`execute_reply` header.
self.callbacks = OrderedDict()
[docs] def close(self):
'''
Close all sockets.
'''
for socket in (self.query_socket, self.command_socket,
self.subscribe_socket):
if socket is not None:
socket.close()
[docs] def reset(self):
'''
Reset the plugin state.
This includes:
- Resetting the execute reply identifier counter.
- Resetting the :attr:`command_socket`, :attr:`query_socket`, and
:attr:`subscribe_socket` sockets.
- Registering with the central **hub**.
'''
self.execute_reply_id = itertools.count(1)
self.reset_query_socket()
# Get socket info and **hub** name.
connect_request = get_connect_request(self.name, self.hub_name)
reply = self.query(connect_request)
self.hub_name = bytes(reply['header']['source'])
self.hub_socket_info = reply['content']
# Initialize sockets using obtained socket info.
self.reset_subscribe_socket()
self.reset_command_socket()
# Explicitly register with the **hub** and retrieve plugin registry.
self.register()
[docs] def register(self):
'''
Register as a plugin with the central **hub**.
Registration also updates the local plugin registry, which contains the
name of all plugins registered with the **hub** at the time of
registration.
Note that this method is safe to execute multiple times. This provides
a mechanism to refresh the local plugin registry.
'''
connect_request = get_execute_request(self.name, self.hub_name,
'register')
reply = self.query(connect_request)
self.plugin_registry = decode_content_data(reply)
self.logger.info('Registered with hub at "%s"', self.query_uri)
###########################################################################
# Query socket methods
[docs] def reset_query_socket(self):
'''
Create and configure :attr:`query_socket` socket (existing socket is
destroyed if it exists).
'''
context = zmq.Context.instance()
if self.query_socket is not None:
self.query_socket = None
self.query_socket = zmq.Socket(context, zmq.REQ)
self.query_socket.connect(self.query_uri)
[docs] def query(self, request, **kwargs):
'''
Send request message to **hub**, receive response, and return decoded
reply message.
Parameters
----------
request dict
``<...>_request`` message.
'''
try:
self.query_socket.send(json.dumps(request))
reply = json.loads(self.query_socket.recv(**kwargs))
validate(reply)
return reply
except:
self.logger.error('Query error', exc_info=True)
self.reset_query_socket()
raise
@property
def logger(self):
'''
Return logger configured with a name in the following form:
<module_name>.<class_name>.<method_name>->"<self.name>"
'''
return logging.getLogger('.'.join((__name__, str(type(self).__name__),
inspect.stack()[1][3]))
+ '->"%s"' % self.name)
###########################################################################
# Command socket methods
[docs] def reset_command_socket(self):
'''
Create and configure :attr:`command_socket` socket (existing socket is
destroyed if it exists).
'''
context = zmq.Context.instance()
if self.command_socket is not None:
self.command_socket = None
# Create command socket and assign name as identity.
self.command_socket = zmq.Socket(context, zmq.ROUTER)
self.command_socket.setsockopt(zmq.IDENTITY, bytes(self.name))
command_uri = '%s://%s:%s' % (self.transport, self.host,
self.hub_socket_info['command']['port'])
self.command_socket.connect(command_uri)
self.logger.info('Connected command socket to "%s"', command_uri)
[docs] def send_command(self, request):
'''
Send command message request through **hub**.
Parameters
----------
request : dict
Command request message.
'''
self.command_socket.send_multipart(map(str, [self.hub_name, '',
json.dumps(request)]))
[docs] def on_command_recv(self, frames):
'''
Process multi-part message from command socket.
This method may, for example, be called asynchronously as a callback in
run loop through a :class:`zmq.eventloop.ZMQStream` configuration.
See `here`_ for more details.
Parameters
----------
msg_frames : list
Multi-part ZeroMQ message.
.. _`here`: http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html
'''
try:
message_str = frames[-1]
message = json.loads(message_str)
validate(message)
except jsonschema.ValidationError:
self.logger.error('unexpected message', exc_info=True)
message_type = message['header']['msg_type']
if message_type == 'execute_request':
self._process__execute_request(message)
elif message_type == 'execute_reply':
self._process__execute_reply(message)
else:
self.logger.error('Unrecognized message type: %s', message_type)
def _process__execute_reply(self, reply):
'''
Process validated `execute_reply` message.
If a callback function was registered during the execution request call
the callback function on the reply message.
Args:
reply (dict) : `execute_reply` message
Returns:
None
'''
try:
session = reply['header']['session']
if session in self.callbacks:
# A callback was registered for the corresponding request.
func = self.callbacks[session]
# Remove callback.
del self.callbacks[session]
# Call callback with reply.
func(reply)
else:
# No callback registered for session.
pass
except:
self.logger.error('Processing error.', exc_info=True)
def _process__execute_request(self, request):
'''
Process validated `execute_request` message, which includes the name of
the command to execute.
If a method with the name `on_execute__<command>` exists, call the
method on the `request` and send the return value wrapped in an
`execute_reply` message to the source of the request.
If the no matching method exists or if an exception is encountered
while processing the command, send `execute_reply` message with
corresponding error information to the source of the request.
Args:
reply (dict) : `execute_request` message
Returns:
None
'''
try:
func = getattr(self, 'on_execute__' +
request['content']['command'], None)
if func is None:
data = None
error = NameError('Unrecognized command: %s' %
request['content']['command'])
mime_type = None
else:
data = func(request)
# If no `mime_type` was specified, pickle data.
mime_type = getattr(func, 'mime_type',
'application/python-pickle')
error = None
reply = get_execute_reply(request, self.execute_reply_id.next(),
data=data, error=error,
mime_type=mime_type)
validate(reply)
reply_str = json.dumps(reply)
except (Exception, ), exception:
import traceback
reply = get_execute_reply(request, self.execute_reply_id.next(),
error=traceback.format_exc())
#error=exception)
reply_str = json.dumps(reply)
self.command_socket.send_multipart([self.hub_name, '', reply_str])
###########################################################################
# Subscribe socket methods
[docs] def reset_subscribe_socket(self):
'''
Create and configure :attr:`subscribe_socket` socket (existing socket
is destroyed if it exists).
'''
context = zmq.Context.instance()
if self.subscribe_socket is not None:
self.subscribe_socket = None
# Create subscribe socket and assign name as identity.
self.subscribe_socket = zmq.Socket(context, zmq.SUB)
if self.subscribe_options:
for k, v in self.subscribe_options.iteritems():
self.subscribe_socket.setsockopt(k, v)
print 'set sock opt', k, v
subscribe_uri = '%s://%s:%s' % (self.transport, self.host,
self.hub_socket_info['publish']
['port'])
self.subscribe_socket.connect(subscribe_uri)
self.logger.info('Connected subscribe socket to "%s"', subscribe_uri)
[docs] def on_subscribe_recv(self, msg_frames):
'''
Process multi-part message from subscribe socket.
This method may, for example, be called asynchronously as a callback in
run loop through a :obj:`zmq.eventloop.ZMQStream` configuration.
See `here`_ for more details.
Parameters
----------
msg_frames : list
Multi-part ZeroMQ message.
.. _`here`: http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html
'''
try:
logger.info(pformat(pickle.loads(msg_frames[0])))
except:
logger.error('Deserialization error', exc_info=True)
###########################################################################
# Execute methods
[docs] def execute_async(self, target_name, command, callback=None, silent=False,
extra_kwargs=None, **kwargs):
'''
Send request to execute the specified command to the identified target.
**N.B.,** this method is non-blocking, i.e., it does not wait for a
response. For a blocking wrapper around this method, see `execute`
method below.
Parameters
----------
target_name : str
Name (i.e., ZeroMQ identity) of the target.
command : str
Name of command to execute.
callback : function, optional
Function to call on received response.
Callback signature is ``callback_func(reply)``, where ``reply`` is
an ``execute_reply`` message.
Callback is added to :attr:`callbacks`, keyed by session identifier
of request.
silent : bool, optional
A boolean flag which, if ``True``, signals the plugin to execute
this code as quietly as possible.
If :data:`silent` is set to ``True``, reply will *not* broadcast
output on the IOPUB channel.
extra_kwargs : dict
Extra keyword arguments to be passed to command.
Useful to, for example, include keyword arguments whose name
conflict with arguments of :meth:`execute_async`/:meth:`execute`.
**kwargs : dict
Keyword arguments for command.
Returns
-------
str
Session identifier for request.
See also
--------
:meth:`execute`
'''
if extra_kwargs is not None:
kwargs.update(extra_kwargs)
request = get_execute_request(self.name, target_name, command,
data=kwargs, silent=silent)
if callback is not None:
self.callbacks[request['header']['session']] = callback
self.send_command(request)
return request['header']['session']
[docs] def execute(self, target_name, command, timeout_s=None, wait_func=None,
silent=False, extra_kwargs=None, **kwargs):
'''
Send request to execute the specified command to the identified target
and return decoded result object.
**N.B.,** this method blocking, i.e., it waits for a response. See
`execute_async` method for non-blocking variant with `callback`
argument.
Parameters
----------
target_name : str
Name (i.e., ZeroMQ identity) of the target.
command : str
Name of command to execute.
timeout_s : float, optional
If :data:`timeout_s` is set, :class:`IOError` is raised if response
is not received within :data:`timeout_s` seconds.
wait_func : function, optional
If :data:`wait_func` is set, the :data:`wait_func` function is
called repeatedly until response is received.
This is useful to prevent :meth:`execute` from completely blocking
thread execution.
silent : bool, optional
A boolean flag which, if ``True``, signals the plugin to execute
this code as quietly as possible.
If :data:`silent` is set to ``True``, reply will *not* broadcast
output on the IOPUB channel.
extra_kwargs : dict
Extra keyword arguments to be passed to command.
Useful to, for example, include keyword arguments whose name
conflict with arguments of :meth:`execute_async`/:meth:`execute`.
**kwargs : dict
Keyword arguments for command.
Returns
-------
object
Result from remotely executed command.
See also
--------
:meth:`execute_async`
'''
# Create result object that will be updated when response is received.
result = {}
def _callback(reply):
try:
result['data'] = decode_content_data(reply)
except (Exception, ), exception:
result['error'] = exception
session = self.execute_async(target_name, command, callback=_callback,
silent=silent, extra_kwargs=extra_kwargs,
**kwargs)
start = datetime.now()
while session in self.callbacks:
try:
msg_frames = self.command_socket.recv_multipart(zmq.NOBLOCK)
except zmq.Again:
wait_duration_s = (datetime.now() - start).total_seconds()
if timeout_s is not None and (wait_duration_s > timeout_s):
raise IOError('Timed out waiting for response for request '
'(session="%s")' % session)
if wait_func is not None:
wait_func(wait_duration_s)
continue
self.on_command_recv(msg_frames)
if 'error' in result:
raise result['error']
return result['data']
[docs]class Plugin(PluginBase):
@mime_type('text/plain')
[docs] def on_execute__ping(self, request):
return 'pong'