# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Adeola Bannis <thecodemaiden@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
import math
from pyndn.encoding.der.der import Der
from pyndn.util.blob import Blob
from pyndn.encoding.der.der_exceptions import NegativeLengthException, DerEncodingException, DerDecodingException
from datetime import datetime
"""
This module defines the implemented DER node types used in encoding/decoding DER formatted data.
"""
[docs]class DerNode (object):
def __init__(self, nodeType):
"""
Create an untyped DER node. This class should never be instantiated directly: instead, use one of the node classesdefined below.
:param nodeType: The DER node type
:type nodeType: An int defined in the Der class
"""
self._parent = None
self._nodeType = nodeType
self._header = bytearray()
self._payload = bytearray()
[docs] def getSize(self):
"""
Get the total length of the encoding.
:return: The total (header + payload) length
:rtype: int
"""
return len(self._header) + len(self._payload)
def _encodeHeader(self, size):
"""
Encode the given size and update the header.
:param size: The payload size to encode
:type size: int
"""
self._header = bytearray()
self._header.append(self._nodeType)
if size < 0:
raise NegativeLengthException("DER object has negative length")
elif size <= 127:
self._header.append(size & 0xff)
else:
tempBuf = bytearray()
n = 0
val = size
while val != 0:
tempBuf.insert(0, (val & 0xff))
val >>= 8
n += 1
tempBuf.insert(0, ((1<<7) |n ) & 0xff)
self._header.extend(tempBuf)
def _decodeHeader(self, inputBuf, startIdx=0):
"""
Extracts the header from an input buffer.
:param inputBuf: The input buffer to read from.
:type inputBuf: bytearray or Blob
:param startIdx: (optional) An offset into the buffer.
:type startIdx: int
"""
if isinstance(inputBuf, Blob):
inputBuf = inputBuf.buf()
idx = startIdx
nodeType = inputBuf[idx]
idx += 1
self._nodeType = nodeType
sizeLen = inputBuf[idx]
idx += 1
self._header = bytearray([nodeType, sizeLen])
size = sizeLen
isLongFormat = sizeLen & (1 << 7)
if isLongFormat:
lenCount = sizeLen & ((1<<7) -1)
size = 0
while lenCount > 0:
b = inputBuf[idx]
idx += 1
self._header.append(b)
size = 256*size + int(b)
lenCount -= 1
return size
[docs] def encode(self):
"""
:return: The raw data encoding for this node
:rtype: Blob
"""
val = self._header+self._payload
return Blob(val)
[docs] def decode(self, inputBuf, startIdx=0):
"""
Decode and store the data from an input buffer.
:param inputBuf: The input buffer to read from.
:type inputBuf: bytearray or Blob
:param startIdx: (optional) An offset into the buffer.
:type startIdx: int
"""
if isinstance(inputBuf, Blob):
inputBuf = inputBuf.buf()
idx = startIdx
payloadSize = self._decodeHeader(inputBuf, idx)
skipBytes = len(self._header)
if payloadSize > 0:
idx += skipBytes
self._payload.extend(inputBuf[idx:idx+payloadSize])
@staticmethod
[docs] def parse(inputBuf, startIdx=0):
"""
Parse the data from the input buffer recursively and return the root as a subclass of DerNode.
:param inputBuf: The input buffer to read from.
:type inputBuf: bytearray or Blob
:param startIdx: (optional) An offset into the buffer.
:type startIdx: int
"""
if isinstance(inputBuf, Blob):
inputBuf = inputBuf.buf()
nodeType = inputBuf[startIdx] # don't increment, we're just peeking
outputType = None
if nodeType == Der.Boolean:
outputType = DerBoolean
elif nodeType == Der.Integer:
outputType = DerInteger
elif nodeType == Der.BitString:
outputType = DerBitString
elif nodeType == Der.OctetString:
outputType = DerOctetString
elif nodeType == Der.Null:
outputType = DerNull
elif nodeType == Der.ObjectIdentifier:
outputType = DerOid
elif nodeType == Der.Sequence:
outputType = DerSequence
elif nodeType == Der.PrintableString:
outputType = DerPrintableString
elif nodeType == Der.GeneralizedTime:
outputType = DerGeneralizedTime
else:
raise DerDecodingException("Unimplemented DER type {}".format(nodeType))
newNode = outputType()
newNode.decode(inputBuf, startIdx)
return newNode
[docs] def toVal(self):
"""
Convert the encoded data to a standard representation. Overridden by some subclasses (e.g. DerBoolean)
:return: The encoded data
:rtype: Blob
"""
return self.encode()
[docs] def getPayload(self):
"""
Get a copy of the payload bytes.
:return: A copy of the payload.
:rtype: Blob
"""
return Blob(self._payload, True)
[docs] def getChildren(self):
"""
If this object is a DerSequence, get the children of this node. Otherwise,
raise an exception. (DerSequence overrides to implement this method.)
:return: The children of this node
:rtype: array of DerNode
:raises: DerDecodingException if this object is not a DerSequence.
"""
raise DerDecodingException("getChildren: This DerNode is not DerSequence")
@staticmethod
[docs] def getSequence(children, index):
"""
Check that index is in bounds for the children list, and return
children[index].
:param children: The list of DerNode, usually returned by another
call to getChildren.
:type children: array of DerNode
:param int index: The index of the children.
:return: children[index] which is a DerSequence
:rtype: DerSequence
:raises: DerDecodingException if index is out of bounds or if
children[index] is not a DerSequence.
"""
if index < 0 or index >= len(children):
raise DerDecodingException("getSequence: Child index is out of bounds")
if not (type(children[index]) is DerSequence):
raise DerDecodingException(
"getSequence: Child DerNode is not a DerSequence")
return children[index]
[docs]class DerStructure(DerNode):
def __init__(self, nodeType):
"""
Create a DerNode that can hold other DerNodes. Do not instantiate this directly: instead use a DerSequence.
:param nodeType: The DER node type
:type nodeType: An int defined in the Der class
"""
super(DerStructure, self).__init__(nodeType)
self._childChanged = False
self._nodeList = []
self._size = 0
[docs] def getSize(self):
"""
Get the total length of the encoding, including children
:return: The total (header + payload) length
:rtype: int
"""
if self._childChanged:
self.updateSize()
self._childChanged = False
self._header = bytearray()
self._encodeHeader(self._size)
return self._size + len(self._header)
[docs] def getChildren(self):
"""
:return: The children of this node
:rtype: array of DerNode
"""
return self._nodeList
[docs] def updateSize(self):
"""
Returns a Blob
"""
newSize = 0
for n in self._nodeList:
newSize += n.getSize()
self._size = newSize
self._childChanged = False
[docs] def addChild(self, node, notifyParent=False):
"""
Add a child to this node.
:param node: The child node to add.
:type node: DerNode
:param notifyParent: (optional) Set to true to cause any containing nodes to update their size
:type notifyParent: boolean
"""
node._parent = self
self._nodeList.append(node)
if notifyParent:
if self._parent is not None:
self._parent.setChildChanged()
self._childChanged = True
[docs] def setChildChanged(self):
"""
Mark the child list as dirty, so that we update size when necessary.
"""
if self._parent is not None:
self._parent.setChildChanged()
self._childChanged = True
[docs] def encode(self):
"""
:return: The raw data encoding for this node and its children
:rtype: Blob
"""
temp = bytearray()
self.updateSize()
self._header = bytearray()
self._encodeHeader(self._size)
temp.extend(self._header)
for n in self._nodeList:
encodedChild = n.encode()
temp.extend(encodedChild.buf())
return Blob(temp)
[docs] def decode(self, inputBuf, startIdx = 0):
"""
Decode and store the data from an input buffer. Recursively populates child nodes.
:param inputBuf: The input buffer to read from.
:type inputBuf: bytearray or Blob
:param startIdx: (optional) An offset into the buffer.
:type startIdx: int
"""
idx = startIdx
self._size = self._decodeHeader(inputBuf, idx)
idx += len(self._header)
accSize = 0
while accSize < self._size:
node = self.parse(inputBuf, idx)
idx += node.getSize()
accSize += node.getSize()
self.addChild(node, False)
########
# Now for all the node types...
########
[docs]class DerByteString(DerNode):
def __init__(self, inputData, nodeType):
"""
Create a node that handles byte strings. Do not instantiate this type directly: instead use a subclass such as DerOctetString or DerPrintableString.
:param inputData: An input buffer containing the string to encode.
:type inputData: Blob or bytearray
:param nodeType: The specific DER node type.
:type nodeType: An int defined in the Der class.
"""
super(DerByteString, self).__init__(nodeType)
if inputData is not None:
if isinstance(inputData, Blob):
inputData = inputData.buf()
else:
inputData = bytearray(inputData)
self._payload.extend(inputData)
self._encodeHeader(len(self._payload))
[docs] def toVal(self):
"""
For byte string types, the payload encodes the string directly, so it is used as a representation.
:return: The encoded string
:rtype: bytearray
"""
return self._payload # already a byte string
[docs]class DerBoolean(DerNode):
def __init__(self, val=None):
"""
Create a DerNode that encodes a boolean value.
:param val: (optional) The value to encode
:type val: boolean
"""
super(DerBoolean, self).__init__(Der.Boolean)
if val is not None:
val = 0xff if val else 0x00
self._payload.append(val)
self._encodeHeader(len(self._payload))
[docs] def toVal(self):
val = self._payload[0]
return val != 0x00
[docs]class DerInteger(DerNode):
def __init__(self, integer=None):
"""
Create a DerNode that encodes a integer value.
:param integer: (optional) The value to encode
:type integer: int
"""
super(DerInteger, self).__init__(Der.Integer)
if integer is not None:
if integer < 0:
raise DerEncodingException(
"DerInteger: Negative integers are not currently supported");
# convert the integer to bytes the easy/slow way
temp = bytearray()
while True:
temp.insert(0, integer & 0xff)
integer >>= 8
if integer <= 0:
# We check for 0 at the end so we encode one byte if it is 0.
break
if temp[0] >= 0x80:
# Make it a non-negative integer.
temp.insert(0, 0)
self._payload.extend(temp)
self._encodeHeader(len(self._payload))
[docs] def toVal(self):
if len(self._payload) > 0 and self._payload[0] >= 0x80:
raise DerDecodingException(
"DerInteger: Negative integers are not currently supported")
result = 0
for i in range(len(self._payload)):
result *= 256
result += self._payload[i]
return result
[docs]class DerBitString(DerNode):
def __init__(self, inputBuf=None, padding=None):
"""
Create a DerNode that encodes a bit string value.
:param inputBuf: (optional) A buffer containing the bits to encode
:type inputBuf: Blob or bytearray
:param padding: (optional) The number of bits of padding at the end of the bit string
:type padding: int < 8
"""
super(DerBitString, self).__init__(Der.BitString)
if inputBuf is not None:
if isinstance(inputBuf, Blob):
inputBuf = inputBuf.buf()
self._payload.append(padding)
self._payload.extend(inputBuf)
self._encodeHeader(len(self._payload))
[docs]class DerOctetString(DerByteString):
def __init__(self, inputData = None):
"""
Create a DerNode to encode a string of bytes
:param inputData: An input buffer containing the string to encode.
:type inputData: Blob or bytearray
"""
super(DerOctetString, self).__init__(inputData, Der.OctetString)
[docs]class DerNull(DerNode):
def __init__(self):
"""
Create a DerNode to encode a null value
"""
super(DerNull, self).__init__(Der.Null)
self._encodeHeader(0)
[docs]class DerOid(DerNode):
def __init__(self, oid = None):
"""
Create a DerNode to encode an object identifier.
The object identifier string must begin with 0,1, or 2 and must contain at least 2 digits.
:param oid: The OID to encode
:type oid: string or OID
"""
super(DerOid, self).__init__(Der.ObjectIdentifier)
if oid is not None:
if type(oid) is str:
parts = [int(p) for p in oid.split('.')]
self.prepareEncoding(parts)
else:
# Assume oid is of type OID.
self.prepareEncoding(oid.getIntegerList())
[docs] def prepareEncoding(self, value):
"""
Encode a sequence of integers into an OID object.
"""
firstNumber = 0
if len(value) == 0:
raise DerEncodingException("No integer in OID")
else:
if value[0] >= 0 and value[0] <= 2:
firstNumber = value[0]*40
else:
raise DerEncodingException("First integer in OID is out of range")
if len(value) >= 2:
if value[1] >= 0 and value[1] <= 39:
firstNumber += value[1]
else:
raise DerEncodingException("Second integer in OID is out of range")
encodedStr = self._encode128(firstNumber)
if len(value) > 2:
for i in range(2,len(value)):
encodedStr.extend(self._encode128(value[i]))
self._encodeHeader(len(encodedStr))
self._payload.extend(encodedStr)
def _encode128(self, value):
"""
Compute the encoding for one part of an OID, where values greater than 128 must be encoded as multiple bytes.
:param value: A component of an OID
:type value: int
"""
mask = (1 << 7) - 1
outBytes = bytearray()
if value < 128:
outBytes.append(value & mask)
else:
outBytes.insert(0, value & mask)
value >>= 7
while value != 0:
outBytes.insert(0, (value & mask) | (1 << 7))
value >>= 7
return outBytes
def _decode128(self, offset):
"""
Convert an encoded component of the OID to the original integer.
:param offset: The offset into this node's payload
:type offset: int
"""
flagMask = 0x80
result = 0
oldOffset = offset
while self._payload[offset] & flagMask:
result = 128 * result + self._payload[offset]-128
offset += 1
result = result * 128 + self._payload[offset]
return (result, offset-oldOffset+1)
[docs] def toVal(self):
"""
:return: The string representation of the OID
:rtype: string
"""
offset = 0
components = []
while offset < len(self._payload):
nextVal,skip = self._decode128(offset)
offset += skip
components.append(nextVal)
# for some odd reason, the first digits are represented in one byte
firstByte = components[0]
firstDigit = int(math.floor(firstByte/40))
secondDigit = firstByte%40
components = [firstDigit, secondDigit]+components[1:]
return '.'.join([str(b) for b in components])
[docs]class DerSequence(DerStructure):
def __init__(self):
"""
Create a DerNode that contains an ordered sequence of other nodes.
"""
super(DerSequence, self).__init__(Der.Sequence)
[docs]class DerPrintableString(DerByteString):
def __init__(self, inputData = None):
"""
Create a DerNode to encode a printable string
No escaping or other modification is done to the string
:param inputData: An input buffer containing the string to encode.
:type inputData: Blob or bytearray
"""
super(DerPrintableString, self).__init__(inputData, Der.PrintableString)
[docs] def toVal(self):
"""
:return: The string encoded in the node
:rtype: string
"""
return Blob(self._payload, False).toRawStr()
[docs]class DerGeneralizedTime(DerNode):
def __init__(self, msSince1970 = None):
super(DerGeneralizedTime, self).__init__(Der.GeneralizedTime)
"""
Create a DerNode representing a date and time, with millisecond accuracy.
:param msSince1970: (optional) Timestamp as milliseconds since Jan 1, 1970
:type msSince1970: float
"""
if msSince1970 is not None:
derTime = self.toDerTimeString(msSince1970)
self._payload.extend(bytearray(derTime, 'ascii'))
self._encodeHeader(len(self._payload))
@staticmethod
[docs] def toDerTimeString(msSince1970):
"""
Convert a UNIX timestamp to the internal string representation
:param msSince1970: Timestamp as milliseconds since Jan 1, 1970
:type msSince1970: float
:return: The time string
:rtype: str
"""
secondsSince1970 = msSince1970/1000.0
utcTime = datetime.utcfromtimestamp(secondsSince1970)
derTime = utcTime.strftime("%Y%m%d%H%M%SZ")
return derTime
[docs] def toVal(self):
# return the milliseconds since 1970
"""
:return: The timestamp encoded in this node as milliseconds since 1970
:rtype: float
"""
timeStr = Blob(self._payload, False).toRawStr()
dt = datetime.strptime(timeStr, "%Y%m%d%H%M%SZ")
epochStart = datetime(1970, 1,1)
msSince1970 = (dt-epochStart).total_seconds()*1000
return msSince1970