Source code for pyndn.encoding.tlv_0_1_1_wire_format

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

from random import SystemRandom
from pyndn.name import Name
from pyndn.exclude import Exclude
from pyndn.name import Name
from pyndn.meta_info import ContentType
from pyndn.forwarding_flags import ForwardingFlags
from pyndn.key_locator import KeyLocatorType
from pyndn.digest_sha256_signature import DigestSha256Signature
from pyndn.sha256_with_rsa_signature import Sha256WithRsaSignature
from pyndn.util.blob import Blob
from pyndn.encoding.wire_format import WireFormat
from pyndn.encoding.tlv.tlv_encoder import TlvEncoder
from pyndn.encoding.tlv.tlv_decoder import TlvDecoder
from pyndn.encoding.tlv.tlv import Tlv

# The Python documentation says "Use SystemRandom if you require a
#   cryptographically secure pseudo-random number generator."
# http://docs.python.org/2/library/random.html
_systemRandom = SystemRandom()

"""
This module defines the Tlv0_1_1WireFormat class which extends WireFormat to
override its methods to implement encoding and decoding Interest, Data, etc.
with the NDN-TLV wire format, version 0.1.1.
"""

class Tlv0_1_1WireFormat(WireFormat):
    """
    A WireFormat whose methods encode and decode Name, Interest, Data,
    ControlParameters, SignatureInfo and EncryptedContent using the NDN-TLV
    wire format, version 0.1.1.

    All encoders in this class write into a TlvEncoder "backwards": the last
    field of a TLV is written first and the enclosing type and length are
    written last, once the length of the value is known.
    """

    # The singleton returned by get(). It is created on first use.
    _instance = None

    def encodeName(self, name):
        """
        Encode name in NDN-TLV and return the encoding.

        :param Name name: The Name object to encode.
        :return: A Blob containing the encoding.
        :rtype: Blob
        """
        encoder = TlvEncoder(256)
        self._encodeName(name, encoder)
        return Blob(encoder.getOutput(), False)

    def decodeName(self, name, input):
        """
        Decode input as an NDN-TLV name and set the fields of the Name object.

        :param Name name: The Name object whose fields are updated.
        :param input: The array with the bytes to decode.
        :type input: An array type with int elements
        """
        decoder = TlvDecoder(input)
        self._decodeName(name, decoder)

    def encodeInterest(self, interest):
        """
        Encode interest in NDN-TLV and return the encoding.

        :param Interest interest: The Interest object to encode.
        :return: A Tuple of (encoding, signedPortionBeginOffset,
          signedPortionEndOffset) where encoding is a Blob containing the
          encoding, signedPortionBeginOffset is the offset in the encoding of
          the beginning of the signed portion, and signedPortionEndOffset is
          the offset in the encoding of the end of the signed portion. The
          signed portion starts from the first name component and ends just
          before the final name component (which is assumed to be a signature
          for a signed interest).
        :rtype: (Blob, int, int)
        """
        encoder = TlvEncoder(256)
        saveLength = len(encoder)

        # Encode backwards.
        encoder.writeOptionalNonNegativeIntegerTlvFromFloat(
            Tlv.InterestLifetime, interest.getInterestLifetimeMilliseconds())

        # Encode the Nonce as exactly 4 bytes.
        nonceBlob = interest.getNonce()
        nonceSize = nonceBlob.size()
        if nonceSize == 4:
            # Use the nonce as-is.
            encoder.writeBlobTlv(Tlv.Nonce, nonceBlob.buf())
        elif nonceSize > 4:
            # Truncate.
            encoder.writeBlobTlv(Tlv.Nonce, nonceBlob.buf()[:4])
        else:
            # The nonce is missing (the most common case) or too short.
            nonce = bytearray(4)
            if nonceSize > 0:
                # Copy existing nonce bytes.
                nonce[:nonceSize] = nonceBlob.buf()
            # Generate random bytes for the remaining bytes in the nonce.
            for i in range(nonceSize, 4):
                nonce[i] = _systemRandom.randint(0, 0xff)
            encoder.writeBlobTlv(Tlv.Nonce, nonce)

        self._encodeSelectors(interest, encoder)
        (tempSignedPortionBeginOffset, tempSignedPortionEndOffset) = \
            self._encodeName(interest.getName(), encoder)
        # The encoder grows towards the front, so remember the offsets
        # relative to the back until the outer header has been written.
        signedPortionBeginOffsetFromBack = (
            len(encoder) - tempSignedPortionBeginOffset)
        signedPortionEndOffsetFromBack = (
            len(encoder) - tempSignedPortionEndOffset)

        encoder.writeTypeAndLength(Tlv.Interest, len(encoder) - saveLength)
        signedPortionBeginOffset = (
            len(encoder) - signedPortionBeginOffsetFromBack)
        signedPortionEndOffset = len(encoder) - signedPortionEndOffsetFromBack

        return (Blob(encoder.getOutput(), False), signedPortionBeginOffset,
                signedPortionEndOffset)

    def decodeInterest(self, interest, input):
        """
        Decode input as an NDN-TLV interest and set the fields of the interest
        object. If the input has no Selectors TLV, the selector fields of the
        interest are reset so that values from a previous use of the object
        do not remain.

        :param Interest interest: The Interest object whose fields are updated.
        :param input: The array with the bytes to decode.
        :type input: An array type with int elements
        :return: A Tuple of (signedPortionBeginOffset, signedPortionEndOffset)
          where signedPortionBeginOffset is the offset in the encoding of the
          beginning of the signed portion, and signedPortionEndOffset is the
          offset in the encoding of the end of the signed portion. The signed
          portion starts from the first name component and ends just before
          the final name component (which is assumed to be a signature for a
          signed interest).
        :rtype: (int, int)
        """
        decoder = TlvDecoder(input)

        endOffset = decoder.readNestedTlvsStart(Tlv.Interest)
        offsets = self._decodeName(interest.getName(), decoder)
        if decoder.peekType(Tlv.Selectors, endOffset):
            self._decodeSelectors(interest, decoder)
        else:
            # There is no Selectors TLV, so set the selectors to none.
            interest.setMinSuffixComponents(None)
            interest.setMaxSuffixComponents(None)
            interest.getKeyLocator().clear()
            interest.getExclude().clear()
            interest.setChildSelector(None)
            interest.setMustBeFresh(False)

        # Require a Nonce, but don't force it to be 4 bytes.
        nonce = Blob(decoder.readBlobTlv(Tlv.Nonce))
        interest.setInterestLifetimeMilliseconds(
            decoder.readOptionalNonNegativeIntegerTlvAsFloat(
                Tlv.InterestLifetime, endOffset))

        # Set the nonce last because setting other interest fields clears it.
        interest.setNonce(nonce)

        decoder.finishNestedTlvs(endOffset)
        return offsets

    def encodeData(self, data):
        """
        Encode data in NDN-TLV and return the encoding and signed offsets.

        :param Data data: The Data object to encode.
        :return: A Tuple of (encoding, signedPortionBeginOffset,
          signedPortionEndOffset) where encoding is a Blob containing the
          encoding, signedPortionBeginOffset is the offset in the encoding of
          the beginning of the signed portion, and signedPortionEndOffset is
          the offset in the encoding of the end of the signed portion.
        :rtype: (Blob, int, int)
        """
        encoder = TlvEncoder(1500)
        saveLength = len(encoder)

        # Encode backwards.
        encoder.writeBlobTlv(
            Tlv.SignatureValue, data.getSignature().getSignature().buf())
        # The signed portion ends just before the SignatureValue TLV.
        signedPortionEndOffsetFromBack = len(encoder)

        self._encodeSignatureInfo(data.getSignature(), encoder)
        encoder.writeBlobTlv(Tlv.Content, data.getContent().buf())
        self._encodeMetaInfo(data.getMetaInfo(), encoder)
        self._encodeName(data.getName(), encoder)
        # The signed portion begins at the Name TLV.
        signedPortionBeginOffsetFromBack = len(encoder)

        encoder.writeTypeAndLength(Tlv.Data, len(encoder) - saveLength)
        signedPortionBeginOffset = (
            len(encoder) - signedPortionBeginOffsetFromBack)
        signedPortionEndOffset = len(encoder) - signedPortionEndOffsetFromBack

        return (Blob(encoder.getOutput(), False), signedPortionBeginOffset,
                signedPortionEndOffset)

    def decodeData(self, data, input):
        """
        Decode input as an NDN-TLV data packet, set the fields in the data
        object, and return the signed offsets.

        :param Data data: The Data object whose fields are updated.
        :param input: The array with the bytes to decode.
        :type input: An array type with int elements
        :return: A Tuple of (signedPortionBeginOffset, signedPortionEndOffset)
          where signedPortionBeginOffset is the offset in the encoding of the
          beginning of the signed portion, and signedPortionEndOffset is the
          offset in the encoding of the end of the signed portion.
        :rtype: (int, int)
        """
        decoder = TlvDecoder(input)

        endOffset = decoder.readNestedTlvsStart(Tlv.Data)
        signedPortionBeginOffset = decoder.getOffset()

        self._decodeName(data.getName(), decoder)
        self._decodeMetaInfo(data.getMetaInfo(), decoder)
        data.setContent(Blob(decoder.readBlobTlv(Tlv.Content)))
        self._decodeSignatureInfo(data, decoder)

        # The signed portion ends just before the SignatureValue TLV.
        signedPortionEndOffset = decoder.getOffset()
        data.getSignature().setSignature(
            Blob(decoder.readBlobTlv(Tlv.SignatureValue)))

        decoder.finishNestedTlvs(endOffset)
        return (signedPortionBeginOffset, signedPortionEndOffset)

    def encodeControlParameters(self, controlParameters):
        """
        Encode controlParameters and return the encoding.

        :param controlParameters: The ControlParameters object to encode.
        :type controlParameters: ControlParameters
        :return: A Blob containing the encoding.
        :rtype: Blob
        """
        encoder = TlvEncoder(256)
        saveLength = len(encoder)

        # Encode backwards.
        encoder.writeOptionalNonNegativeIntegerTlvFromFloat(
            Tlv.ControlParameters_ExpirationPeriod,
            controlParameters.getExpirationPeriod())

        if controlParameters.getStrategy().size() > 0:
            # The Strategy TLV wraps a Name TLV.
            strategySaveLength = len(encoder)
            self._encodeName(controlParameters.getStrategy(), encoder)
            encoder.writeTypeAndLength(
                Tlv.ControlParameters_Strategy,
                len(encoder) - strategySaveLength)

        flags = controlParameters.getForwardingFlags().getNfdForwardingFlags()
        if flags != ForwardingFlags().getNfdForwardingFlags():
            # The flags are not the default value.
            encoder.writeNonNegativeIntegerTlv(
                Tlv.ControlParameters_Flags, flags)

        encoder.writeOptionalNonNegativeIntegerTlv(
            Tlv.ControlParameters_Cost, controlParameters.getCost())
        encoder.writeOptionalNonNegativeIntegerTlv(
            Tlv.ControlParameters_Origin, controlParameters.getOrigin())
        encoder.writeOptionalNonNegativeIntegerTlv(
            Tlv.ControlParameters_LocalControlFeature,
            controlParameters.getLocalControlFeature())

        if len(controlParameters.getUri()) != 0:
            encoder.writeBlobTlv(
                Tlv.ControlParameters_Uri,
                Blob(controlParameters.getUri()).buf())

        encoder.writeOptionalNonNegativeIntegerTlv(
            Tlv.ControlParameters_FaceId, controlParameters.getFaceId())
        if controlParameters.getName() is not None:
            self._encodeName(controlParameters.getName(), encoder)

        encoder.writeTypeAndLength(
            Tlv.ControlParameters_ControlParameters,
            len(encoder) - saveLength)

        return Blob(encoder.getOutput(), False)

    def decodeControlParameters(self, controlParameters, input):
        """
        Decode input as an NDN-TLV ControlParameters and set the fields of the
        controlParameters object. The object is cleared before decoding.

        :param ControlParameters controlParameters: The ControlParameters
          object whose fields are updated.
        :param input: The array with the bytes to decode.
        :type input: An array type with int elements
        """
        controlParameters.clear()

        decoder = TlvDecoder(input)
        endOffset = decoder.readNestedTlvsStart(
            Tlv.ControlParameters_ControlParameters)

        # Decode the name.
        if decoder.peekType(Tlv.Name, endOffset):
            name = Name()
            self._decodeName(name, decoder)
            controlParameters.setName(name)

        # Decode the face ID.
        controlParameters.setFaceId(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.ControlParameters_FaceId, endOffset))

        # Decode the URI.
        if decoder.peekType(Tlv.ControlParameters_Uri, endOffset):
            uri = Blob(
                decoder.readOptionalBlobTlv(
                    Tlv.ControlParameters_Uri, endOffset),
                False)
            controlParameters.setUri(str(uri))

        # Decode the integers.
        controlParameters.setLocalControlFeature(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.ControlParameters_LocalControlFeature, endOffset))
        controlParameters.setOrigin(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.ControlParameters_Origin, endOffset))
        controlParameters.setCost(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.ControlParameters_Cost, endOffset))

        # Set the forwarding flags.
        if decoder.peekType(Tlv.ControlParameters_Flags, endOffset):
            flags = ForwardingFlags()
            flags.setNfdForwardingFlags(
                decoder.readNonNegativeIntegerTlv(
                    Tlv.ControlParameters_Flags))
            controlParameters.setForwardingFlags(flags)

        # Decode the strategy, which wraps a Name TLV.
        if decoder.peekType(Tlv.ControlParameters_Strategy, endOffset):
            strategyEndOffset = decoder.readNestedTlvsStart(
                Tlv.ControlParameters_Strategy)
            self._decodeName(controlParameters.getStrategy(), decoder)
            decoder.finishNestedTlvs(strategyEndOffset)

        # Decode the expiration period.
        controlParameters.setExpirationPeriod(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.ControlParameters_ExpirationPeriod, endOffset))

        decoder.finishNestedTlvs(endOffset)

    def encodeSignatureInfo(self, signature):
        """
        Encode signature as an NDN-TLV SignatureInfo and return the encoding.

        :param signature: An object of a subclass of Signature to encode.
        :type signature: An object of a subclass of Signature
        :return: A Blob containing the encoding.
        :rtype: Blob
        """
        encoder = TlvEncoder(256)
        self._encodeSignatureInfo(signature, encoder)
        return Blob(encoder.getOutput(), False)

    # SignatureHolder is used by decodeSignatureInfoAndValue.
    class SignatureHolder(object):
        """
        A minimal object with setSignature and getSignature, which are the
        only methods that _decodeSignatureInfo calls on its holder argument.
        """

        def setSignature(self, signature):
            """
            Store the signature object.

            :param signature: The signature object to hold.
            """
            self._signature = signature

        def getSignature(self):
            """
            Get the signature object given to setSignature.

            :return: The held signature object.
            """
            return self._signature

    def decodeSignatureInfoAndValue(self, signatureInfo, signatureValue):
        """
        Decode signatureInfo as a signature info and signatureValue as the
        related SignatureValue, and return a new object which is a subclass of
        Signature.

        :param signatureInfo: The array with the signature info input buffer
          to decode.
        :type signatureInfo: An array type with int elements
        :param signatureValue: The array with the signature value input buffer
          to decode.
        :type signatureValue: An array type with int elements
        :return: A new object which is a subclass of Signature.
        :rtype: a subclass of Signature
        """
        # Use a SignatureHolder to imitate a Data object for
        # _decodeSignatureInfo.
        signatureHolder = self.SignatureHolder()
        decoder = TlvDecoder(signatureInfo)
        self._decodeSignatureInfo(signatureHolder, decoder)

        decoder = TlvDecoder(signatureValue)
        signatureHolder.getSignature().setSignature(
            Blob(decoder.readBlobTlv(Tlv.SignatureValue)))

        return signatureHolder.getSignature()

    def encodeSignatureValue(self, signature):
        """
        Encode the signatureValue in the Signature object as an NDN-TLV
        SignatureValue (the signature bits) and return the encoding.

        :param signature: An object of a subclass of Signature with the
          signature value to encode.
        :type signature: An object of a subclass of Signature
        :return: A Blob containing the encoding.
        :rtype: Blob
        """
        encoder = TlvEncoder(256)
        encoder.writeBlobTlv(
            Tlv.SignatureValue, signature.getSignature().buf())
        return Blob(encoder.getOutput(), False)

    def encodeEncryptedContent(self, encryptedContent):
        """
        Encode the EncryptedContent in NDN-TLV and return the encoding.

        :param EncryptedContent encryptedContent: The EncryptedContent object
          to encode.
        :return: A Blob containing the encoding.
        :rtype: Blob
        """
        encoder = TlvEncoder(256)
        saveLength = len(encoder)

        # Encode backwards.
        encoder.writeBlobTlv(
            Tlv.Encrypt_EncryptedPayload, encryptedContent.getPayload().buf())
        encoder.writeOptionalBlobTlv(
            Tlv.Encrypt_InitialVector,
            encryptedContent.getInitialVector().buf())
        # Assume the algorithmType value is the same as the TLV type.
        encoder.writeNonNegativeIntegerTlv(
            Tlv.Encrypt_EncryptionAlgorithm,
            encryptedContent.getAlgorithmType())
        Tlv0_1_1WireFormat._encodeKeyLocator(
            Tlv.KeyLocator, encryptedContent.getKeyLocator(), encoder)

        encoder.writeTypeAndLength(
            Tlv.Encrypt_EncryptedContent, len(encoder) - saveLength)

        return Blob(encoder.getOutput(), False)

    def decodeEncryptedContent(self, encryptedContent, input):
        """
        Decode input as an EncryptedContent in NDN-TLV and set the fields of
        the encryptedContent object.

        :param EncryptedContent encryptedContent: The EncryptedContent object
          whose fields are updated.
        :param input: The array with the bytes to decode.
        :type input: An array type with int elements
        """
        decoder = TlvDecoder(input)
        endOffset = decoder.readNestedTlvsStart(Tlv.Encrypt_EncryptedContent)

        Tlv0_1_1WireFormat._decodeKeyLocator(
            Tlv.KeyLocator, encryptedContent.getKeyLocator(), decoder)
        encryptedContent.setAlgorithmType(
            decoder.readNonNegativeIntegerTlv(
                Tlv.Encrypt_EncryptionAlgorithm))
        encryptedContent.setInitialVector(
            Blob(decoder.readOptionalBlobTlv(
                Tlv.Encrypt_InitialVector, endOffset), True))
        encryptedContent.setPayload(
            Blob(decoder.readBlobTlv(Tlv.Encrypt_EncryptedPayload), True))

        decoder.finishNestedTlvs(endOffset)

    @classmethod
    def get(cls):
        """
        Get a singleton instance of a Tlv0_1_1WireFormat. To always use the
        preferred version NDN-TLV, you should use TlvWireFormat.get().

        :return: The singleton instance.
        :rtype: Tlv0_1_1WireFormat
        """
        if cls._instance is None:
            cls._instance = Tlv0_1_1WireFormat()
        return cls._instance

    @staticmethod
    def _encodeName(name, encoder):
        """
        Encode the name to the encoder.

        :param Name name: The name to encode.
        :param TlvEncoder encoder: The encoder to receive the encoding.
        :return: A Tuple of (signedPortionBeginOffset, signedPortionEndOffset)
          where signedPortionBeginOffset is the offset in the encoding of the
          beginning of the signed portion, and signedPortionEndOffset is the
          offset in the encoding of the end of the signed portion. The signed
          portion starts from the first name component and ends just before
          the final name component (which is assumed to be a signature for a
          signed interest).
        :rtype: (int, int)
        """
        saveLength = len(encoder)
        lastIndex = len(name) - 1
        # This is only assigned (and only read) when the name has components.
        signedPortionEndOffsetFromBack = None

        # Encode the components backwards.
        for i in range(lastIndex, -1, -1):
            encoder.writeBlobTlv(Tlv.NameComponent, name[i].getValue().buf())
            if i == lastIndex:
                # The final component was just written, so the signed portion
                # ends at its start.
                signedPortionEndOffsetFromBack = len(encoder)

        signedPortionBeginOffsetFromBack = len(encoder)
        encoder.writeTypeAndLength(Tlv.Name, len(encoder) - saveLength)

        signedPortionBeginOffset = (
            len(encoder) - signedPortionBeginOffsetFromBack)
        if len(name) == 0:
            # There is no "final component", so set signedPortionEndOffset
            # arbitrarily.
            signedPortionEndOffset = signedPortionBeginOffset
        else:
            signedPortionEndOffset = (
                len(encoder) - signedPortionEndOffsetFromBack)

        return (signedPortionBeginOffset, signedPortionEndOffset)

    @staticmethod
    def _decodeName(name, decoder):
        """
        Clear the name, decode a Name from the decoder and set the fields of
        the name object.

        :param Name name: The name object whose fields are updated.
        :param TlvDecoder decoder: The decoder with the input.
        :return: A Tuple of (signedPortionBeginOffset, signedPortionEndOffset)
          where signedPortionBeginOffset is the offset in the encoding of the
          beginning of the signed portion, and signedPortionEndOffset is the
          offset in the encoding of the end of the signed portion. The signed
          portion starts from the first name component and ends just before
          the final name component (which is assumed to be a signature for a
          signed interest).
        :rtype: (int, int)
        """
        name.clear()

        endOffset = decoder.readNestedTlvsStart(Tlv.Name)
        signedPortionBeginOffset = decoder.getOffset()
        # In case there are no components, set signedPortionEndOffset
        # arbitrarily.
        signedPortionEndOffset = signedPortionBeginOffset

        while decoder.getOffset() < endOffset:
            # Updated before each component, so after the loop this is the
            # start of the final component.
            signedPortionEndOffset = decoder.getOffset()
            name.append(decoder.readBlobTlv(Tlv.NameComponent))

        decoder.finishNestedTlvs(endOffset)
        return (signedPortionBeginOffset, signedPortionEndOffset)

    @staticmethod
    def _encodeSelectors(interest, encoder):
        """
        Encode the interest selectors. If no selectors are written, do not
        output a Selectors TLV.

        :param Interest interest: The Interest with the selectors to encode.
        :param TlvEncoder encoder: The encoder to receive the encoding.
        """
        saveLength = len(encoder)

        # Encode backwards.
        if interest.getMustBeFresh():
            encoder.writeTypeAndLength(Tlv.MustBeFresh, 0)
        encoder.writeOptionalNonNegativeIntegerTlv(
            Tlv.ChildSelector, interest.getChildSelector())
        if interest.getExclude().size() > 0:
            Tlv0_1_1WireFormat._encodeExclude(interest.getExclude(), encoder)

        if interest.getKeyLocator().getType() is not None:
            Tlv0_1_1WireFormat._encodeKeyLocator(
                Tlv.PublisherPublicKeyLocator, interest.getKeyLocator(),
                encoder)

        encoder.writeOptionalNonNegativeIntegerTlv(
            Tlv.MaxSuffixComponents, interest.getMaxSuffixComponents())
        encoder.writeOptionalNonNegativeIntegerTlv(
            Tlv.MinSuffixComponents, interest.getMinSuffixComponents())

        # Only output the type and length if values were written.
        if len(encoder) != saveLength:
            encoder.writeTypeAndLength(
                Tlv.Selectors, len(encoder) - saveLength)

    @staticmethod
    def _decodeSelectors(interest, decoder):
        """
        Decode a Selectors TLV from the decoder and set every selector field
        of the interest. A selector which is not in the input is set to the
        value the decoder returns for a missing optional TLV, or is cleared.

        :param Interest interest: The Interest whose selectors are updated.
        :param TlvDecoder decoder: The decoder with the input.
        """
        endOffset = decoder.readNestedTlvsStart(Tlv.Selectors)

        interest.setMinSuffixComponents(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.MinSuffixComponents, endOffset))
        interest.setMaxSuffixComponents(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.MaxSuffixComponents, endOffset))

        if decoder.peekType(Tlv.PublisherPublicKeyLocator, endOffset):
            Tlv0_1_1WireFormat._decodeKeyLocator(
                Tlv.PublisherPublicKeyLocator, interest.getKeyLocator(),
                decoder)
        else:
            interest.getKeyLocator().clear()

        if decoder.peekType(Tlv.Exclude, endOffset):
            Tlv0_1_1WireFormat._decodeExclude(interest.getExclude(), decoder)
        else:
            interest.getExclude().clear()

        interest.setChildSelector(
            decoder.readOptionalNonNegativeIntegerTlv(
                Tlv.ChildSelector, endOffset))
        interest.setMustBeFresh(
            decoder.readBooleanTlv(Tlv.MustBeFresh, endOffset))

        decoder.finishNestedTlvs(endOffset)

    @staticmethod
    def _encodeExclude(exclude, encoder):
        """
        Encode the exclude entries as an Exclude TLV.

        :param Exclude exclude: The Exclude object to encode.
        :param TlvEncoder encoder: The encoder to receive the encoding.
        :raises RuntimeError: If an entry is neither Exclude.COMPONENT nor
          Exclude.ANY.
        """
        saveLength = len(encoder)

        # TODO: Do we want to order the components (except for ANY)?
        # Encode the entries backwards.
        for i in range(len(exclude) - 1, -1, -1):
            entry = exclude[i]

            if entry.getType() == Exclude.COMPONENT:
                encoder.writeBlobTlv(
                    Tlv.NameComponent, entry.getComponent().getValue().buf())
            elif entry.getType() == Exclude.ANY:
                encoder.writeTypeAndLength(Tlv.Any, 0)
            else:
                # We don't expect this to happen, but check anyway.
                raise RuntimeError(
                    "Unrecognized Exclude type " + str(entry.getType()))

        encoder.writeTypeAndLength(Tlv.Exclude, len(encoder) - saveLength)

    @staticmethod
    def _decodeExclude(exclude, decoder):
        """
        Clear the exclude, decode an Exclude TLV from the decoder and append
        each decoded entry.

        :param Exclude exclude: The Exclude object whose entries are updated.
        :param TlvDecoder decoder: The decoder with the input.
        """
        endOffset = decoder.readNestedTlvsStart(Tlv.Exclude)

        exclude.clear()
        while True:
            if decoder.peekType(Tlv.NameComponent, endOffset):
                exclude.appendComponent(
                    decoder.readBlobTlv(Tlv.NameComponent))
            elif decoder.readBooleanTlv(Tlv.Any, endOffset):
                exclude.appendAny()
            else:
                # Else no more entries.
                break

        decoder.finishNestedTlvs(endOffset)

    @staticmethod
    def _encodeMetaInfo(metaInfo, encoder):
        """
        Encode the metaInfo as a MetaInfo TLV.

        :param MetaInfo metaInfo: The MetaInfo object to encode.
        :param TlvEncoder encoder: The encoder to receive the encoding.
        :raises RuntimeError: If the content type is not BLOB, LINK or KEY.
        """
        saveLength = len(encoder)

        # Encode backwards.
        finalBlockIdBuf = metaInfo.getFinalBlockId().getValue().buf()
        if finalBlockIdBuf is not None and len(finalBlockIdBuf) > 0:
            # FinalBlockId has an inner NameComponent.
            finalBlockIdSaveLength = len(encoder)
            encoder.writeBlobTlv(Tlv.NameComponent, finalBlockIdBuf)
            encoder.writeTypeAndLength(
                Tlv.FinalBlockId, len(encoder) - finalBlockIdSaveLength)

        encoder.writeOptionalNonNegativeIntegerTlvFromFloat(
            Tlv.FreshnessPeriod, metaInfo.getFreshnessPeriod())
        if metaInfo.getType() != ContentType.BLOB:
            # Not the default, so we need to encode the type.
            if (metaInfo.getType() == ContentType.LINK or
                    metaInfo.getType() == ContentType.KEY):
                # The ContentType enum is set up with the correct integer for
                # each NDN-TLV ContentType.
                encoder.writeNonNegativeIntegerTlv(
                    Tlv.ContentType, metaInfo.getType())
            else:
                raise RuntimeError("unrecognized TLV ContentType")

        encoder.writeTypeAndLength(Tlv.MetaInfo, len(encoder) - saveLength)

    @staticmethod
    def _decodeMetaInfo(metaInfo, decoder):
        """
        Decode a MetaInfo TLV from the decoder and set the fields of the
        metaInfo object.

        :param MetaInfo metaInfo: The MetaInfo object whose fields are updated.
        :param TlvDecoder decoder: The decoder with the input.
        """
        endOffset = decoder.readNestedTlvsStart(Tlv.MetaInfo)

        # The ContentType enum is set up with the correct integer for each
        # NDN-TLV ContentType. If readOptionalNonNegativeIntegerTlv returns
        # None, then setType will convert it to BLOB.
        metaInfo.setType(decoder.readOptionalNonNegativeIntegerTlv(
            Tlv.ContentType, endOffset))
        metaInfo.setFreshnessPeriod(
            decoder.readOptionalNonNegativeIntegerTlvAsFloat(
                Tlv.FreshnessPeriod, endOffset))
        if decoder.peekType(Tlv.FinalBlockId, endOffset):
            # FinalBlockId has an inner NameComponent.
            finalBlockIdEndOffset = decoder.readNestedTlvsStart(
                Tlv.FinalBlockId)
            metaInfo.setFinalBlockId(decoder.readBlobTlv(Tlv.NameComponent))
            decoder.finishNestedTlvs(finalBlockIdEndOffset)
        else:
            metaInfo.setFinalBlockId(None)

        decoder.finishNestedTlvs(endOffset)

    @staticmethod
    def _encodeSignatureInfo(signature, encoder):
        """
        An internal method to encode signature as the appropriate form of
        SignatureInfo in NDN-TLV.

        :param Signature signature: An object of a subclass of Signature to
          encode.
        :param TlvEncoder encoder: The TlvEncoder to receive the encoding.
        :raises RuntimeError: If the type of signature is not exactly
          Sha256WithRsaSignature or DigestSha256Signature.
        """
        saveLength = len(encoder)

        # The exact type is checked, so subclasses are not accepted.
        if type(signature) is Sha256WithRsaSignature:
            # Encode backwards.
            Tlv0_1_1WireFormat._encodeKeyLocator(
                Tlv.KeyLocator, signature.getKeyLocator(), encoder)
            encoder.writeNonNegativeIntegerTlv(
                Tlv.SignatureType, Tlv.SignatureType_SignatureSha256WithRsa)
        elif type(signature) is DigestSha256Signature:
            encoder.writeNonNegativeIntegerTlv(
                Tlv.SignatureType, Tlv.SignatureType_DigestSha256)
        else:
            raise RuntimeError(
                "encodeSignatureInfo: Unrecognized Signature object type")

        encoder.writeTypeAndLength(
            Tlv.SignatureInfo, len(encoder) - saveLength)

    @staticmethod
    def _decodeSignatureInfo(signatureHolder, decoder):
        """
        Decode a SignatureInfo TLV from the decoder, create a signature object
        of the matching class and give it to signatureHolder.setSignature.

        :param signatureHolder: An object with setSignature and getSignature
          methods, such as a Data object or a SignatureHolder.
        :param TlvDecoder decoder: The decoder with the input.
        :raises RuntimeError: If the SignatureType is not recognized.
        """
        endOffset = decoder.readNestedTlvsStart(Tlv.SignatureInfo)

        signatureType = decoder.readNonNegativeIntegerTlv(Tlv.SignatureType)
        if signatureType == Tlv.SignatureType_SignatureSha256WithRsa:
            signatureHolder.setSignature(Sha256WithRsaSignature())
            # Modify signatureHolder's signature object because if we create
            # an object and set it, then signatureHolder will have to copy
            # all the fields.
            signatureInfo = signatureHolder.getSignature()
            Tlv0_1_1WireFormat._decodeKeyLocator(
                Tlv.KeyLocator, signatureInfo.getKeyLocator(), decoder)
        elif signatureType == Tlv.SignatureType_DigestSha256:
            signatureHolder.setSignature(DigestSha256Signature())
        else:
            raise RuntimeError(
                "decodeSignatureInfo: unrecognized SignatureInfo type " +
                str(signatureType))

        decoder.finishNestedTlvs(endOffset)

    @staticmethod
    def _encodeKeyLocator(type, keyLocator, encoder):
        """
        Encode the keyLocator as a TLV with the given type. If the key locator
        type is None, a TLV with a zero-length value is written.

        :param int type: The TLV type to write, such as Tlv.KeyLocator or
          Tlv.PublisherPublicKeyLocator.
        :param KeyLocator keyLocator: The KeyLocator object to encode.
        :param TlvEncoder encoder: The encoder to receive the encoding.
        :raises RuntimeError: If the key locator type is not KEYNAME and is
          not KEY_LOCATOR_DIGEST with non-empty key data.
        """
        saveLength = len(encoder)

        # Encode backwards.
        if keyLocator.getType() is not None:
            if keyLocator.getType() == KeyLocatorType.KEYNAME:
                Tlv0_1_1WireFormat._encodeName(
                    keyLocator.getKeyName(), encoder)
            elif (keyLocator.getType() == KeyLocatorType.KEY_LOCATOR_DIGEST
                  and len(keyLocator.getKeyData()) > 0):
                encoder.writeBlobTlv(
                    Tlv.KeyLocatorDigest, keyLocator.getKeyData().buf())
            else:
                raise RuntimeError(
                    "Unrecognized KeyLocatorType " +
                    str(keyLocator.getType()))

        encoder.writeTypeAndLength(type, len(encoder) - saveLength)

    @staticmethod
    def _decodeKeyLocator(expectedType, keyLocator, decoder):
        """
        Clear the keyLocator, decode a TLV of the expected type from the
        decoder and set the fields of the keyLocator object.

        :param int expectedType: The expected TLV type, such as Tlv.KeyLocator
          or Tlv.PublisherPublicKeyLocator.
        :param KeyLocator keyLocator: The KeyLocator object whose fields are
          updated.
        :param TlvDecoder decoder: The decoder with the input.
        :raises RuntimeError: If the value is not empty and does not start
          with a Name or KeyLocatorDigest TLV.
        """
        endOffset = decoder.readNestedTlvsStart(expectedType)

        keyLocator.clear()

        if decoder.getOffset() == endOffset:
            # The KeyLocator is omitted, so leave the fields as none.
            return

        if decoder.peekType(Tlv.Name, endOffset):
            # KeyLocator is a Name.
            keyLocator.setType(KeyLocatorType.KEYNAME)
            Tlv0_1_1WireFormat._decodeName(keyLocator.getKeyName(), decoder)
        elif decoder.peekType(Tlv.KeyLocatorDigest, endOffset):
            # KeyLocator is a KeyLocatorDigest.
            keyLocator.setType(KeyLocatorType.KEY_LOCATOR_DIGEST)
            keyLocator.setKeyData(
                Blob(decoder.readBlobTlv(Tlv.KeyLocatorDigest)))
        else:
            raise RuntimeError(
                "decodeKeyLocator: Unrecognized key locator type")

        decoder.finishNestedTlvs(endOffset)