# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the ThreadsafeFace class which extends Face to provide the
main methods for NDN communication in a thread-safe manner.
"""
from pyndn.util.common import Common
from pyndn.transport.async_tcp_transport import AsyncTcpTransport
from pyndn.transport.async_unix_transport import AsyncUnixTransport
from pyndn.name import Name
from pyndn.interest_filter import InterestFilter
from pyndn.face import Face
[docs]class ThreadsafeFace(Face):
"""
Create a new ThreadsafeFace to use the asyncio loop to process events and
schedule communication calls. You must start the loop on the thread in which
you want the library to call communication callbacks such as onData and
onInterest.
In Python <= 3.2, you must have the prerequisite Trollius library. See the
INSTALL file for installation details. For usage, see the example
test_get_async_threadsafe.py.
This constructor has the forms ThreadsafeFace(loop),
ThreadsafeFace(loop, transport, connectionInfo) or
ThreadsafeFace(loop, host, port). If the default Face(loop) constructor is
used, if the forwarder's Unix socket file exists then connect using
AsyncUnixTransport, otherwise connect to "localhost" on port 6363 using
AsyncTcpTransport. You do not need to call processEvents since the
asyncio loop does all processing. (Exception: If you pass a transport that
is not an async transport like AsyncTcpTransport, then your application
needs to call processEvents.)
:param loop: The event loop, for example from asyncio.get_event_loop(). It
is the responsibility of the application to start and stop the loop.
:param Transport transport: An object of a subclass of Transport used for
communication. If you do not want to call processEvents, then the
transport should be an async transport like AsyncTcpTransport, in which
case the transport should use the same loop.
:param Transport.ConnectionInfo connectionInfo: An object of a subclass of
Transport.ConnectionInfo to be used to connect to the transport.
:param str host: In the Face(host, port) form of the constructor, host is
the host of the NDN hub to connect using TcpTransport.
:param int port: (optional) In the Face(host, port) form of the constructor,
port is the port of the NDN hub. If omitted. use 6363.
"""
def __init__(self, loop, arg1 = None, arg2 = None):
self._loop = loop
# Imitate the Face constructor, but use AsyncTcpTransport, etc.
if arg1 == None or Common.typeIsString(arg1):
filePath = ""
if arg1 == None and arg2 == None:
# Check if we can connect using UnixSocket.
filePath = self._getUnixSocketFilePathForLocalhost()
if filePath == "":
transport = AsyncTcpTransport(loop)
host = arg1 if arg1 != None else "localhost"
connectionInfo = AsyncTcpTransport.ConnectionInfo(
host, arg2 if type(arg2) is int else 6363)
else:
transport = AsyncUnixTransport(loop)
connectionInfo = AsyncUnixTransport.ConnectionInfo(filePath)
else:
transport = arg1
connectionInfo = arg2
super(ThreadsafeFace, self).__init__(transport, connectionInfo)
[docs] def expressInterest(
self, interestOrName, arg2, arg3 = None, arg4 = None, arg5 = None,
arg6 = None):
"""
Override to use the event loop given to the constructor to schedule
expressInterest to be called in a thread-safe manner. See
Face.expressInterest for calling details.
"""
args = self._getExpressInterestArgs(
interestOrName, arg2, arg3, arg4, arg5, arg6)
self._loop.call_soon_threadsafe(
self._node.expressInterest, args['pendingInterestId'],
args['interestCopy'], args['onData'], args['onTimeout'],
args['onNetworkNack'], args['wireFormat'], self)
return args['pendingInterestId']
[docs] def removePendingInterest(self, pendingInterestId):
"""
Override to use the event loop given to the constructor to schedule
removePendingInterest to be called in a thread-safe manner. See
Face.removePendingInterest for calling details.
"""
self._loop.call_soon_threadsafe(
self._node.removePendingInterest, pendingInterestId)
[docs] def registerPrefix(
self, prefix, onInterest, onRegisterFailed, onRegisterSuccess = None,
flags = None, wireFormat = None):
"""
Override to use the event loop given to the constructor to schedule
registerPrefix to be called in a thread-safe manner. See
Face.registerPrefix for calling details.
"""
registeredPrefixId = self._node.getNextEntryId()
# Node.registerPrefix requires a copy of the prefix.
# We make a copy so that the caller can change the original object while
# call_soon_threadsafe is waiting to process.
self._loop.call_soon_threadsafe(
self._registerPrefixHelper, registeredPrefixId, Name(prefix),
onInterest, onRegisterFailed, onRegisterSuccess, flags, wireFormat)
return registeredPrefixId
[docs] def removeRegisteredPrefix(self, registeredPrefixId):
"""
Override to use the event loop given to the constructor to schedule
removeRegisteredPrefix to be called in a thread-safe manner. See
Face.removeRegisteredPrefix for calling details.
"""
self._loop.call_soon_threadsafe(
self._node.removeRegisteredPrefix, registeredPrefixId)
[docs] def setInterestFilter(self, filterOrPrefix, onInterest):
"""
Override to use the event loop given to the constructor to schedule
setInterestFilter to be called in a thread-safe manner. See
Face.setInterestFilter for calling details.
"""
interestFilterId = self._node.getNextEntryId()
# If filterOrPrefix is already an InterestFilter, the InterestFilter
# constructor will make a copy as required by Node.setInterestFilter.
# We make a copy so that the caller can change the original object while
# call_soon_threadsafe is waiting to process.
filterCopy = InterestFilter(filterOrPrefix)
self._loop.call_soon_threadsafe(
self._node.setInterestFilter, interestFilterId, filterCopy,
onInterest, self)
return interestFilterId
[docs] def unsetInterestFilter(self, interestFilterId):
"""
Override to use the event loop given to the constructor to schedule
unsetInterestFilter to be called in a thread-safe manner. See
Face.unsetInterestFilter for calling details.
"""
self._loop.call_soon_threadsafe(
self._node.unsetInterestFilter, interestFilterId)
[docs] def send(self, encoding):
"""
Override to use the event loop given to the constructor to schedule
send to be called in a manner. See
Face.send for calling details.
"""
self._loop.call_soon_threadsafe(
super(ThreadsafeFace, self).send, encoding)
[docs] def callLater(self, delayMilliseconds, callback):
"""
Override to call callback() after the given delay, using
self._loop.call_later. This means that processEvents() is not needed to
handle interest timeouts. Even though this is public, it is not part of
the public API of Face.
:param float delayMilliseconds: The delay in milliseconds.
:param callback: This calls callback() after the delay.
:type callback: function object
"""
# Convert milliseconds to seconds.
self._loop.call_later(delayMilliseconds / 1000.0, callback)