Source code for pyndn.encoding.tlv.tlv_structure_decoder
# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
This module defines the TlvStructureDecoder class.
"""
from pyndn.encoding.tlv.tlv_decoder import TlvDecoder
[docs]class TlvStructureDecoder(object):
"""
Create and initialize a TlvStructureDecoder.
"""
def __init__(self):
self._gotElementEnd = False
self._offset = 0
self._state = self.READ_TYPE
self._headerLength = 0
self._useHeaderBuffer = False
# 8 bytes is enough to hold the extended bytes in the length encoding
# where it is an 8-byte number.
self._headerBuffer = bytearray(8)
self._nBytesToRead = 0
READ_TYPE = 0
READ_TYPE_BYTES = 1
READ_LENGTH = 2
READ_LENGTH_BYTES = 3
READ_VALUE_BYTES = 4
[docs] def findElementEnd(self, input):
"""
Continue scanning input starting from self._offset to find the element
end. If the end of the element which started at offset 0 is found,
this returns True and getOffset() is the length of the element.
Otherwise, this returns False which means you should read more into
input and call again.
:param input: The input buffer. You have to pass in input each time
because the buffer could be reallocated.
:type input: An array type with int elements
:return: True if found the element end, False if not.
:rtype: bool
"""
if self._gotElementEnd:
# Someone is calling when we already got the end.
return True
decoder = TlvDecoder(input)
while True:
if self._offset >= len(input):
# All the cases assume we have some input. Return and wait
# for more.
return False
if self._state == self.READ_TYPE:
firstOctet = input[self._offset]
self._offset += 1
if firstOctet < 253:
# The value is simple, so we can skip straight to reading
# the length.
self._state = self.READ_LENGTH
else:
# Set up to skip the type bytes.
if firstOctet == 253:
self._nBytesToRead = 2
elif firstOctet == 254:
self._nBytesToRead = 4
else:
# value == 255.
self._nBytesToRead = 8
self._state = self.READ_TYPE_BYTES
elif self._state == self.READ_TYPE_BYTES:
nRemainingBytes = len(input) - self._offset
if nRemainingBytes < self._nBytesToRead:
# Need more.
self._offset += nRemainingBytes
self._nBytesToRead -= nRemainingBytes
return False
# Got the type bytes. Move on to read the length.
self._offset += self._nBytesToRead
self._state = self.READ_LENGTH
elif self._state == self.READ_LENGTH:
firstOctet = input[self._offset]
self._offset += 1
if firstOctet < 253:
# The value is simple, so we can skip straight to reading
# the value bytes.
self._nBytesToRead = firstOctet
if self._nBytesToRead == 0:
# No value bytes to read. We're finished.
self._gotElementEnd = True
return True
self._state = self.READ_VALUE_BYTES
else:
# We need to read the bytes in the extended encoding of
# the length.
if firstOctet == 253:
self._nBytesToRead = 2
elif firstOctet == 254:
self._nBytesToRead = 4
else:
# value == 255.
self._nBytesToRead = 8
# We need to use firstOctet in the next state.
self._firstOctet = firstOctet
self._state = self.READ_LENGTH_BYTES
elif self._state == self.READ_LENGTH_BYTES:
nRemainingBytes = len(input) - self._offset
if (not self._useHeaderBuffer and
nRemainingBytes >= self._nBytesToRead):
# We don't have to use the headerBuffer. Set nBytesToRead.
decoder.seek(self._offset)
self._nBytesToRead = decoder.readExtendedVarNumber(
self._firstOctet)
# Update self._offset to the decoder's offset after reading.
self._offset = decoder.getOffset()
else:
self._useHeaderBuffer = True
nNeededBytes = self._nBytesToRead - self._headerLength
if nNeededBytes > nRemainingBytes:
# We can't get all of the header bytes from this input.
# Save in headerBuffer.
if (self._headerLength + nRemainingBytes >
len(self._headerBuffer)):
# We don't expect this to happen.
raise RuntimeError(
"Cannot store more header bytes than the size of headerBuffer")
self._headerBuffer[
self._headerLength:self._headerLength + nRemainingBytes] = \
input[self._offset:self._offset + nRemainingBytes]
self._offset += nRemainingBytes
self._headerLength += nRemainingBytes
return False
# Copy the remaining bytes into headerBuffer, read the
# length and set nBytesToRead.
if (self._headerLength + nNeededBytes >
len(self._headerBuffer)):
# We don't expect this to happen.
raise RuntimeError(
"Cannot store more header bytes than the size of headerBuffer")
self._headerBuffer[
self._headerLength:self._headerLength + nNeededBytes] = \
input[self._offset:self._offset + nNeededBytes]
self._offset += nNeededBytes
# Use a local decoder just for the headerBuffer.
bufferDecoder = TlvDecoder(self._headerBuffer)
# Replace nBytesToRead with the length of the value.
self._nBytesToRead = bufferDecoder.readExtendedVarNumber(
self._firstOctet)
if self._nBytesToRead == 0:
# No value bytes to read. We're finished.
self._gotElementEnd = True
return True
# Get ready to read the value bytes.
self._state = self.READ_VALUE_BYTES
elif self._state == self.READ_VALUE_BYTES:
nRemainingBytes = len(input) - self._offset
if nRemainingBytes < self._nBytesToRead:
# Need more.
self._offset += nRemainingBytes
self._nBytesToRead -= nRemainingBytes
return False
# Got the bytes. We're finished.
self._offset += self._nBytesToRead
self._gotElementEnd = True
return True
else:
# We don't expect this to happen.
raise RuntimeError("findElementEnd: unrecognized state")
[docs] def getOffset(self):
"""
Get the current offset into the input buffer.
:return: The offset.
:rtype: int
"""
return self._offset
[docs] def seek(self, offset):
"""
Set the offset into the input, used for the next read.
:param int offset: The new offset.
"""
self._offset = offset