# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the MemoryContentCache class which holds a set of Data
packets and answers an Interest to return the correct Data packet. The cache is
periodically cleaned up to remove each stale Data packet based on its
FreshnessPeriod (if it has one).
Note: This class is an experimental feature. See the API docs for more detail at
http://named-data.net/doc/ndn-ccl-api/memory-content-cache.html .
"""
import logging
import collections
from pyndn.forwarding_flags import ForwardingFlags
from pyndn.interest_filter import InterestFilter
from pyndn.encoding.wire_format import WireFormat
from pyndn.name import Name
from pyndn.util.common import Common
[docs]class MemoryContentCache(object):
"""
Create a new MemoryContentCache to use the given Face.
:param Face face: The Face to use to call registerPrefix and
setInterestFilter, and which will call this object's OnInterest callback.
:param float cleanupIntervalMilliseconds: (optional) The interval in
milliseconds between each check to clean up stale content in the cache. If
omitted, use a default of 1000 milliseconds. If this is a large number,
then effectively the stale content will not be removed from the cache.
"""
def __init__(self, face, cleanupIntervalMilliseconds = None):
if cleanupIntervalMilliseconds == None:
cleanupIntervalMilliseconds = 1000.0
self._face = face
self._cleanupIntervalMilliseconds = cleanupIntervalMilliseconds
self._nextCleanupTime = (Common.getNowMilliseconds() +
cleanupIntervalMilliseconds)
# The map key is the prefix.toUri(). The value is an OnInterest function.
self._onDataNotFoundForPrefix = {}
# elements are int
self._interestFilterIdList = []
# elements are int
self._registeredPrefixIdList = []
# elements are MemoryContentCache._Content
self._noStaleTimeCache = []
# elements are MemoryContentCache.StaleTimeContent
self._staleTimeCache = []
self._emptyComponent = Name.Component()
self._pendingInterestTable = [] # of PendingInterest
[docs] def registerPrefix(
self, prefix, onRegisterFailed, onRegisterSuccess = None,
onDataNotFound = None, flags = None, wireFormat = None):
"""
Call registerPrefix on the Face given to the constructor so that this
MemoryContentCache will answer interests whose name has the prefix.
Alternatively, if the Face's registerPrefix has already been called,
then you can call this object's setInterestFilter.
:param Name prefix: The Name for the prefix to register. This copies the
Name.
:param onRegisterFailed: If this fails to register the prefix for any
reason, this calls onRegisterFailed(prefix) where prefix is the prefix
given to registerPrefix.
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onRegisterFailed: function object
:param onRegisterSuccess: (optional) This calls
onRegisterSuccess[0](prefix, registeredPrefixId) when this receives a
success message from the forwarder. If onRegisterSuccess is omitted or
[None], this does not use it. (As a special case, this optional
parameter is supplied as a list of one function object, instead of
just a function object, in order to detect when it is used instead of
the following optional onDataNotFound function object.)
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onRegisterSuccess: list of one function object
:param onDataNotFound: (optional) If a data packet for an interest is
not found in the cache, this forwards the interest by calling
onDataNotFound(prefix, interest, face, interestFilterId, filter). Your
callback can find the Data packet for the interest and call
face.putData(data). If your callback cannot find the Data packet, it can
optionally call storePendingInterest(interest, face) to store the
pending interest in this object to be satisfied by a later call to
add(data). If you want to automatically store all pending interests,
you can simply use getStorePendingInterest() for onDataNotFound. If
onDataNotFound is omitted or None, this does not use it.
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onDataNotFound: function object
:param ForwardingFlags flags: (optional) See Face.registerPrefix.
:param wireFormat: (optional) See Face.registerPrefix.
:type wireFormat: A subclass of WireFormat
"""
arg3 = onRegisterSuccess
arg4 = onDataNotFound
arg5 = flags
arg6 = wireFormat
# arg3, arg4, arg5, arg6 may be:
# [OnRegisterSuccess], OnDataNotFound, ForwardingFlags, WireFormat
# [OnRegisterSuccess], OnDataNotFound, ForwardingFlags, None
# [OnRegisterSuccess], OnDataNotFound, WireFormat, None
# [OnRegisterSuccess], OnDataNotFound, None, None
# [OnRegisterSuccess], ForwardingFlags, WireFormat, None
# [OnRegisterSuccess], ForwardingFlags, None, None
# [OnRegisterSuccess], WireFormat, None, None
# [OnRegisterSuccess], None, None, None
# OnDataNotFound, ForwardingFlags, WireFormat, None
# OnDataNotFound, ForwardingFlags, None, None
# OnDataNotFound, WireFormat, None, None
# OnDataNotFound, None, None, None
# ForwardingFlags, WireFormat, None, None
# ForwardingFlags, None, None, None
# WireFormat, None, None, None
# None, None, None, None
if type(arg3) is list and len(arg3) == 1:
onRegisterSuccess = arg3[0]
else:
onRegisterSuccess = None
if isinstance(arg3, collections.Callable):
onDataNotFound = arg3
elif isinstance(arg4, collections.Callable):
onDataNotFound = arg4
else:
onDataNotFound = None
if isinstance(arg3, ForwardingFlags):
flags = arg3
elif isinstance(arg4, ForwardingFlags):
flags = arg4
elif isinstance(arg5, ForwardingFlags):
flags = arg5
else:
flags = ForwardingFlags()
if isinstance(arg3, WireFormat):
wireFormat = arg3
elif isinstance(arg4, WireFormat):
wireFormat = arg4
elif isinstance(arg5, WireFormat):
wireFormat = arg5
elif isinstance(arg6, WireFormat):
wireFormat = arg6
else:
# Don't use a default argument since getDefaultWireFormat can change.
wireFormat = WireFormat.getDefaultWireFormat()
if onDataNotFound != None:
self._onDataNotFoundForPrefix[prefix.toUri()] = onDataNotFound
registeredPrefixId = self._face.registerPrefix(
prefix, self._onInterest, onRegisterFailed, onRegisterSuccess,
flags, wireFormat)
self._registeredPrefixIdList.append(registeredPrefixId)
[docs] def setInterestFilter(self, filterOrPrefix, onDataNotFound = None):
"""
Call setInterestFilter on the Face given to the constructor so that this
MemoryContentCache will answer interests whose name matches the filter.
There are two forms of setInterestFilter.
The first form uses the exact given InterestFilter:
setInterestFilter(filter, [onDataNotFound]).
The second form creates an InterestFilter from the given prefix Name:
setInterestFilter(prefix, [onDataNotFound]).
:param InterestFilter filter: The InterestFilter with a prefix and
optional regex filter used to match the name of an incoming Interest.
This makes a copy of filter.
:param Name prefix: The Name prefix used to match the name of an
incoming Interest. This makes a copy of the Name.
:param onDataNotFound: (optional) If a data packet for an interest is
not found in the cache, this forwards the interest by calling
onDataNotFound(prefix, interest, face, interestFilterId, filter). Your
callback can find the Data packet for the interest and call
face.putData(data). If your callback cannot find the Data packet, it can
optionally call storePendingInterest(interest, face) to store the
pending interest in this object to be satisfied by a later call to
add(data). If you want to automatically store all pending interests,
you can simply use getStorePendingInterest() for onDataNotFound. If
onDataNotFound is omitted or None, this does not use it.
NOTE: The library will log any exceptions raised by this callback, but
for better error handling the callback should catch and properly
handle any exceptions.
:type onDataNotFound: function object
"""
if onDataNotFound != None:
if type(filterOrPrefix) is InterestFilter:
prefix = filterOrPrefix.getPrefix()
else:
prefix = filterOrPrefix
self._onDataNotFoundForPrefix[prefix.toUri()] = onDataNotFound
interestFilterId = self._face.setInterestFilter(
filterOrPrefix, self._onInterest)
self._interestFilterIdList.append(interestFilterId)
[docs] def unregisterAll(self):
"""
Call Face.unsetInterestFilter and Face.removeRegisteredPrefix for all
the prefixes given to the setInterestFilter and registerPrefix method on
this MemoryContentCache object so that it will not receive interests any
more. You can call this if you want to "shut down" this
MemoryContentCache while your application is still running.
"""
for interestFilterId in self._interestFilterIdList:
self._face.unsetInterestFilter(interestFilterId)
self._interestFilterIdList = []
for registeredPrefixId in self._registeredPrefixIdList:
self._face.removeRegisteredPrefix(registeredPrefixId)
self._registeredPrefixIdList = []
# Also clear each onDataNotFoundForPrefix given to registerPrefix.
self._onDataNotFoundForPrefix = {}
[docs] def add(self, data):
"""
Add the Data packet to the cache so that it is available to use to
answer interests. If data.getMetaInfo().getFreshnessPeriod() is not None,
set the staleness time to now plus data.getMetaInfo().getFreshnessPeriod(),
which is checked during cleanup to remove stale content. This also
checks if cleanupIntervalMilliseconds milliseconds have passed and
removes stale content from the cache. After removing stale content,
remove timed-out pending interests from storePendingInterest(), then if
the added Data packet satisfies any interest, send it through the
face and remove the interest from the pending interest table.
:param Data data: The Data packet object to put in the cache. This
copies the fields from the object.
"""
self._doCleanup()
if (data.getMetaInfo().getFreshnessPeriod() != None and
data.getMetaInfo().getFreshnessPeriod() >= 0.0):
# The content will go stale, so use staleTimeCache.
content = MemoryContentCache._StaleTimeContent(data)
# Insert into _staleTimeCache, sorted on content._staleTimeMilliseconds.
# Search from the back since we expect it to go there.
i = len(self._staleTimeCache) - 1
while i >= 0:
if (self._staleTimeCache[i]._staleTimeMilliseconds <=
content._staleTimeMilliseconds):
break
i -= 1
# Element i is the greatest less than or equal to
# content._staleTimeMilliseconds, so insert after it.
self._staleTimeCache.insert(i + 1, content)
else:
# The data does not go stale, so use _noStaleTimeCache.
self._noStaleTimeCache.append(MemoryContentCache._Content(data))
# Remove timed-out interests and check if the data packet matches any
# pending interest.
# Go backwards through the list so we can erase entries.
nowMilliseconds = Common.getNowMilliseconds()
for i in range(len(self._pendingInterestTable) - 1, -1, -1):
pendingInterest = self._pendingInterestTable[i]
if pendingInterest.isTimedOut(nowMilliseconds):
self._pendingInterestTable.pop(i)
continue
if pendingInterest.getInterest().matchesName(data.getName()):
try:
# Send to the same face from the original call to onInterest.
# wireEncode returns the cached encoding if available.
pendingInterest.getFace().send(data.wireEncode())
except Exception as ex:
logging.getLogger(__name__).error(
"Error in face.send: %s", str(ex))
return
# The pending interest is satisfied, so remove it.
self._pendingInterestTable.pop(i)
[docs] def storePendingInterest(self, interest, face):
"""
Store an interest from an OnInterest callback in the internal pending
interest table (normally because there is no Data packet available yet
to satisfy the interest). add(data) will check if the added Data packet
satisfies any pending interest and send it through the face.
:param Interest interest: The Interest for which we don't have a Data
packet yet. You should not modify the interest after calling this.
:param Face face: The Face with the connection which
received the interest. This comes from the OnInterest callback.
"""
self._pendingInterestTable.append(
self._PendingInterest(interest, face))
[docs] def getStorePendingInterest(self):
"""
Return a callback to use for onDataNotFound in registerPrefix which
simply calls storePendingInterest() to store the interest that doesn't
match a Data packet. add(data) will check if the added Data packet
satisfies any pending interest and send it.
:return: A callback to use for onDataNotFound in registerPrefix().
:rtype: function object
"""
return self._storePendingInterestCallback
def _storePendingInterestCallback(
self, prefix, interest, face, interestFilterId, filter):
"""
This is a private method to return from getStorePendingInterest(). We
need a separate method because the arguments are different from
storePendingInterest.
"""
self.storePendingInterest(interest, face)
def _onInterest(self, prefix, interest, face, interestFilterId, filter):
"""
This is the OnInterest callback which is called when the library
receives an interest whose name has the prefix given to registerPrefix.
First check if cleanupIntervalMilliseconds milliseconds have passed and
remove stale content from the cache. Then search the cache for the Data
packet, matching any interest selectors including ChildSelector, and
send the Data packet to the face. If no matching Data packet is in
the cache, call the callback in onDataNotFoundForPrefix (if defined).
"""
self._doCleanup()
selectedComponent = 0
selectedEncoding = None
# We need to iterate over both arrays.
totalSize = len(self._staleTimeCache) + len(self._noStaleTimeCache)
for i in range(totalSize):
if i < len(self._staleTimeCache):
content = self._staleTimeCache[i]
else:
# We have iterated over the first array. Get from the second.
content = self._noStaleTimeCache[i - len(self._staleTimeCache)]
if (interest.matchesName(content.getName())):
if (interest.getChildSelector() < 0):
# No child selector, so send the first match that we have found.
face.send(content.getDataEncoding())
return
else:
# Update selectedEncoding based on the child selector.
if (content.getName().size() > interest.getName().size()):
component = content.getName().get(interest.getName().size())
else:
component = self._emptyComponent
gotBetterMatch = False
if selectedEncoding == None:
# Save the first match.
gotBetterMatch = True
else:
if interest.getChildSelector() == 0:
# Leftmost child.
if component.compare(selectedComponent) < 0:
gotBetterMatch = True
else:
# Rightmost child.
if component.compare(selectedComponent) > 0:
gotBetterMatch = True
if gotBetterMatch:
selectedComponent = component
selectedEncoding = content.getDataEncoding()
if selectedEncoding != None:
# We found the leftmost or rightmost child.
face.send(selectedEncoding)
else:
# Call the onDataNotFound callback (if defined).
if prefix.toUri() in self._onDataNotFoundForPrefix:
try:
self._onDataNotFoundForPrefix[prefix.toUri()](
prefix, interest, face, interestFilterId, filter)
except:
logging.exception("Error in onDataNotFound")
def _doCleanup(self):
"""
Check if now is greater than nextCleanupTime and, if so, remove stale
content from staleTimeCache and reset nextCleanupTime based on
cleanupIntervalMilliseconds. Since add(Data) does a sorted insert into
staleTimeCache, the check for stale data is quick and does not require
searching the entire staleTimeCache.
"""
now = Common.getNowMilliseconds()
if now >= self._nextCleanupTime:
# staleTimeCache is sorted on staleTimeMilliseconds, so we only need
# to erase the stale entries at the front, then quit.
while (len(self._staleTimeCache) > 0 and
self._staleTimeCache[0].isStale(now)):
del self._staleTimeCache[0]
self._nextCleanupTime = now + self._cleanupIntervalMilliseconds
"""
_Content is a private class to hold the name and encoding for each entry in
the cache. This base class is for a Data packet without a FreshnessPeriod.
"""
class _Content(object):
"""
Create a new Content entry to hold data's name and wire encoding.
:param Data data: The Data packet whose name and wire encoding are
copied.
"""
def __init__(self, data):
# Copy the name.
self._name = Name(data.getName())
# wireEncode returns the cached encoding if available.
self._dataEncoding = data.wireEncode().buf()
def getName(self):
return self._name
def getDataEncoding(self):
return self._dataEncoding
"""
_StaleTimeContent extends _Content to include the staleTimeMilliseconds for
when this entry should be cleaned up from the cache.
"""
class _StaleTimeContent(_Content):
"""
Create a new StaleTimeContent to hold data's name and wire encoding as
well as the staleTimeMilliseconds which is now plus
data.getMetaInfo().getFreshnessPeriod().
:param Data data: The Data packet whose name and wire encoding are
copied.
"""
def __init__(self, data):
super(MemoryContentCache._StaleTimeContent, self).__init__(data)
# Set up staleTimeMilliseconds which is The time when the content
# becomse stale in milliseconds according to
# Common.getNowMilliseconds().
self._staleTimeMilliseconds = (Common.getNowMilliseconds() +
data.getMetaInfo().getFreshnessPeriod())
def isStale(self, nowMilliseconds):
"""
Check if this content is stale.
:param float nowMilliseconds: The current time in milliseconds from
Common.getNowMilliseconds().
:return: True if this content is stale, otherwise False.
:rtype: bool
"""
return self._staleTimeMilliseconds <= nowMilliseconds
class _PendingInterest(object):
"""
A PendingInterest holds an interest which onInterest received but could
not satisfy. When we add a new data packet to the cache, we will also
check if it satisfies a pending interest.
Create a new PendingInterest and set the _timeoutTime based on the
current time and the interest lifetime.
:param Interest interest: The interest.
:param Face face: The face from the onInterest callback.
If the interest is satisfied later by a new data packet, we will send
the data packet to the face.
"""
def __init__(self, interest, face):
self._interest = interest
self._face = face
# Set up _timeoutTimeMilliseconds.
if self._interest.getInterestLifetimeMilliseconds() >= 0.0:
self._timeoutTimeMilliseconds = (Common.getNowMilliseconds() +
self._interest.getInterestLifetimeMilliseconds())
else:
# No timeout.
self._timeoutTimeMilliseconds = -1.0
def getInterest(self):
"""
Return the interest given to the constructor.
"""
return self._interest
def getFace(self):
"""
Return the face given to the constructor.
"""
return self._face
def isTimedOut(self, nowMilliseconds):
"""
Check if this interest is timed out.
:param float nowMilliseconds: The current time in milliseconds from
Common.getNowMilliseconds.
:return: True if this interest timed out, otherwise False.
:rtype: bool
"""
return (self._timeoutTimeMilliseconds >= 0.0 and
nowMilliseconds >= self._timeoutTimeMilliseconds)