Source code for pyndn.util.segment_fetcher

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2015-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
# Author: From ndn-cxx util/segment-fetcher https://github.com/named-data/ndn-cxx
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the SegmentFetcher class which is a utility class to fetch
the latest version of segmented data.

SegmentFetcher assumes that the data is named /<prefix>/<version>/<segment>,
where:

- <prefix> is the specified name prefix,
- <version> is an unknown version that needs to be discovered, and
- <segment> is a segment number. (The number of segments is unknown and is
  controlled by the `FinalBlockId` field in at least the last Data packet.)

The following logic is implemented in SegmentFetcher:

1. Express the first Interest to discover the version:

   >> Interest: /<prefix>?ChildSelector=1&MustBeFresh=true

2. Infer the latest version of the Data: <version> = Data.getName().get(-2)

3. If the segment number in the retrieved packet == 0, go to step 5.

4. Send an Interest for segment 0:

   >> Interest: /<prefix>/<version>/<segment=0>

5. Keep sending Interests for the next segment while the retrieved Data does
   not have a FinalBlockId or the FinalBlockId != Data.getName().get(-1).

   >> Interest: /<prefix>/<version>/<segment=(N+1)>

6. Call the onComplete callback with a Blob that concatenates the content
   from all the segmented objects.

If an error occurs during the fetching process, the onError callback is called
with a proper error code.  The following errors are possible:

- `INTEREST_TIMEOUT`: if any of the Interests times out
- `DATA_HAS_NO_SEGMENT`: if any of the retrieved Data packets don't have a segment
  as the last component of the name (not counting the implicit digest)
- `SEGMENT_VERIFICATION_FAILED`: if any retrieved segment fails
  the user-provided VerifySegment callback

In order to validate individual segments, a verifySegment callback needs to
be specified. If the callback returns False, the fetching process is aborted
with SEGMENT_VERIFICATION_FAILED. If data validation is not required, the
provided DontVerifySegment object can be used.

Example:
    def onComplete(content):
        ...

    def onError(errorCode, message):
        ...

    interest = Interest(Name("/data/prefix"))
    interest.setInterestLifetimeMilliseconds(1000)

    SegmentFetcher.fetch(
      face, interest, SegmentFetcher.DontVerifySegment, onComplete, onError)
"""

import logging
from pyndn.interest import Interest
from pyndn.util.blob import Blob

class SegmentFetcher(object):
    """
    SegmentFetcher is a utility class to fetch the latest version of segmented
    data named /<prefix>/<version>/<segment>. Applications should not
    construct it directly; use the static method SegmentFetcher.fetch.
    """
    def __init__(self, face, verifySegment, onComplete, onError):
        """
        A private constructor to create a new SegmentFetcher to use the Face.
        An application should use SegmentFetcher.fetch.

        :param Face face: This calls face.expressInterest to fetch more
          segments.
        :param verifySegment: When a Data packet is received this calls
          verifySegment(data) where data is a Data object. If it returns False
          then abort fetching and call onError with
          SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED.
        :type verifySegment: function object
        :param onComplete: When all segments are received, call
          onComplete(content) where content is a Blob which has the
          concatenation of the content of all the segments.
          NOTE: The library will log any exceptions raised by this callback,
          but for better error handling the callback should catch and properly
          handle any exceptions.
        :type onComplete: function object
        :param onError: Call onError(errorCode, message) for timeout or an
          error processing segments. errorCode is a value from
          SegmentFetcher.ErrorCode and message is a related string.
          NOTE: The library will log any exceptions raised by this callback,
          but for better error handling the callback should catch and properly
          handle any exceptions.
        :type onError: function object
        """
        self._face = face
        self._verifySegment = verifySegment
        self._onComplete = onComplete
        self._onError = onError
        # The content of each segment received so far, in segment order. Its
        # length is therefore the number of the next segment we expect.
        self._contentParts = [] # of Blob

    class ErrorCode(object):
        """
        An ErrorCode value is passed in the onError callback.
        """
        INTEREST_TIMEOUT = 1
        DATA_HAS_NO_SEGMENT = 2
        SEGMENT_VERIFICATION_FAILED = 3

    @staticmethod
    def DontVerifySegment(data):
        """
        DontVerifySegment may be used in fetch to skip validation of Data
        packets.

        :param Data data: The received Data packet. It is ignored.
        :return: Always True.
        :rtype: bool
        """
        return True

    @staticmethod
    def fetch(face, baseInterest, verifySegment, onComplete, onError):
        """
        Initiate segment fetching. For more details, see the documentation for
        the module.

        :param Face face: This calls face.expressInterest to fetch more
          segments.
        :param Interest baseInterest: An Interest for the initial segment of
          the requested data, where baseInterest.getName() has the name prefix.
          This interest may include a custom InterestLifetime and selectors
          that will propagate to all subsequent Interests. The only exception
          is that the initial Interest will be forced to include selectors
          "ChildSelector=1" and "MustBeFresh=true" which will be turned off in
          subsequent Interests.
        :param verifySegment: When a Data packet is received this calls
          verifySegment(data) where data is a Data object. If it returns False
          then abort fetching and call onError with
          SegmentFetcher.ErrorCode.SEGMENT_VERIFICATION_FAILED. If data
          validation is not required, use SegmentFetcher.DontVerifySegment.
        :type verifySegment: function object
        :param onComplete: When all segments are received, call
          onComplete(content) where content is a Blob which has the
          concatenation of the content of all the segments.
          NOTE: The library will log any exceptions raised by this callback,
          but for better error handling the callback should catch and properly
          handle any exceptions.
        :type onComplete: function object
        :param onError: Call onError(errorCode, message) for timeout or an
          error processing segments. errorCode is a value from
          SegmentFetcher.ErrorCode and message is a related string.
          NOTE: The library will log any exceptions raised by this callback,
          but for better error handling the callback should catch and properly
          handle any exceptions.
        :type onError: function object
        """
        SegmentFetcher(face, verifySegment, onComplete, onError)._fetchFirstSegment(
          baseInterest)

    def _fetchFirstSegment(self, baseInterest):
        """
        Express the version-discovery Interest: a copy of baseInterest with
        ChildSelector=1 and MustBeFresh=true.

        :param Interest baseInterest: The Interest given to fetch.
        """
        interest = Interest(baseInterest)
        interest.setChildSelector(1)
        interest.setMustBeFresh(True)
        self._face.expressInterest(interest, self._onData, self._onTimeout)

    def _fetchNextSegment(self, originalInterest, dataName, segment):
        """
        Express an Interest for the given segment under the version of
        dataName.

        :param Interest originalInterest: The Interest that fetched the last
          Data. It is copied to preserve any special selectors.
        :param Name dataName: The name of the last Data. Its final (segment)
          component is replaced by the requested segment number.
        :param int segment: The segment number to request.
        """
        interest = Interest(originalInterest)
        # originalInterest may be the version-discovery Interest, so turn off
        # the selectors that _fetchFirstSegment forced (as documented in fetch,
        # and as ndn-cxx does). Changing a field also clears the nonce so that
        # the library will generate a new one.
        interest.setChildSelector(0)
        interest.setMustBeFresh(False)
        interest.setName(dataName.getPrefix(-1).appendSegment(segment))
        self._face.expressInterest(interest, self._onData, self._onTimeout)

    def _onData(self, originalInterest, data):
        """
        Handle a received Data packet. Verify it, store its content if it is
        the expected segment, then either request the next segment or call
        onComplete when the FinalBlockId segment has arrived.

        :param Interest originalInterest: The Interest that fetched data.
        :param Data data: The received Data packet.
        """
        if not self._verifySegment(data):
            self._reportError(
              self.ErrorCode.SEGMENT_VERIFICATION_FAILED,
              "Segment verification failed")
            return

        dataName = data.getName()
        if not self._endsWithSegmentNumber(dataName):
            # We don't expect a name without a segment number. Treat it as a
            # bad packet.
            self._reportError(
              self.ErrorCode.DATA_HAS_NO_SEGMENT,
              "Got an unexpected packet without a segment number: " +
                dataName.toUri())
            return

        try:
            currentSegment = dataName.get(-1).toSegment()
        except RuntimeError as ex:
            self._reportError(
              self.ErrorCode.DATA_HAS_NO_SEGMENT,
              "Error decoding the name segment number " +
                dataName.get(-1).toEscapedString() + ": " + str(ex))
            return

        expectedSegmentNumber = len(self._contentParts)
        if currentSegment != expectedSegmentNumber:
            # Try again to get the expected segment. This also includes the
            # case where the first segment is not segment 0.
            self._fetchNextSegment(
              originalInterest, dataName, expectedSegmentNumber)
            return

        # Save the content and check if we are finished.
        self._contentParts.append(data.getContent())

        finalBlockId = data.getMetaInfo().getFinalBlockId()
        if finalBlockId.getValue().size() > 0:
            try:
                finalSegmentNumber = finalBlockId.toSegment()
            except RuntimeError as ex:
                self._reportError(
                  self.ErrorCode.DATA_HAS_NO_SEGMENT,
                  "Error decoding the FinalBlockId segment number " +
                    finalBlockId.toEscapedString() + ": " + str(ex))
                return

            if currentSegment == finalSegmentNumber:
                # We are finished.
                content = self._concatenateContent()
                try:
                    self._onComplete(Blob(content, False))
                except Exception:
                    logging.exception("Error in onComplete")
                return

        # Fetch the next segment.
        self._fetchNextSegment(
          originalInterest, dataName, expectedSegmentNumber + 1)

    def _onTimeout(self, interest):
        """
        Report INTEREST_TIMEOUT for the Interest that timed out.

        :param Interest interest: The Interest that timed out.
        """
        self._reportError(
          self.ErrorCode.INTEREST_TIMEOUT,
          "Time out for interest " + interest.getName().toUri())

    def _reportError(self, errorCode, message):
        """
        Call the application's onError(errorCode, message). Log any Exception
        it raises so that a faulty callback cannot break the fetcher. Do not
        swallow BaseException subclasses such as KeyboardInterrupt or
        SystemExit.

        :param int errorCode: A value from SegmentFetcher.ErrorCode.
        :param str message: A related description.
        """
        try:
            self._onError(errorCode, message)
        except Exception:
            logging.exception("Error in onError")

    def _concatenateContent(self):
        """
        Concatenate the content of all received segments in order.

        :return: A new buffer holding all the content.
        :rtype: bytearray
        """
        # Pre-size the buffer, then copy each part in place.
        content = bytearray(sum(part.size() for part in self._contentParts))
        offset = 0
        for part in self._contentParts:
            partSize = part.size()
            content[offset:offset + partSize] = part.buf()
            offset += partSize
        return content

    @staticmethod
    def _endsWithSegmentNumber(name):
        """
        Check if the last component in the name is a segment number.

        :param Name name: The name to check.
        :return: True if the name ends with a segment number, otherwise False.
        :rtype: bool
        """
        # A segment component is identified by a leading 0x00 marker byte.
        return (name.size() >= 1 and
                name.get(-1).getValue().size() >= 1 and
                name.get(-1).getValue().buf()[0] == 0)