# Source code for pyndn.security.certificate.certificate
# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Adeola Bannis <thecodemaiden@gmail.com>
# From ndn-cxx security by Yingdi Yu <yingdi@cs.ucla.edu>.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
from pyndn.encoding.oid import OID
from pyndn.encoding.der.der_node import *
from pyndn.encoding.der.der import *
from pyndn.security.certificate.public_key import PublicKey
from pyndn.util.blob import Blob
from pyndn.data import Data
from pyndn.meta_info import ContentType
from datetime import datetime
import base64
class Certificate(Data):
    """
    A Data packet whose content is a DER encoded certificate made of a
    validity period, a list of subject descriptions, a public key and an
    optional list of extensions.
    """
    # Reference point for converting a datetime to seconds since 1970.
    epochStart = datetime(1970, 1, 1)

    def __init__(self, other=None):
        """
        Create a new certificate, optionally copying the
        contents of another Data object.

        :param other: (optional) A Data packet to copy the content from
        :type other: Data
        """
        super(Certificate, self).__init__(other)
        self._subjectDescriptionList = []
        self._extensionList = []
        if isinstance(other, Data):
            # The copied content already holds the DER encoding.
            self.decode()
        else:
            # Until real values are set or decoded, notBefore lies far in
            # the future and notAfter far in the past.
            self._notBefore = 1e37
            self._notAfter = -1e37
            self._publicKey = None

    def isTooEarly(self):
        """
        Check if the certificate start date is in the future.

        :return: True if the certificate cannot be used yet
        :rtype: boolean
        """
        # epochStart is the UTC epoch, so the current time must be taken in
        # UTC as well. total_seconds is a method and has to be called.
        elapsed = datetime.utcnow() - self.epochStart
        secondsSince1970 = elapsed.total_seconds()
        # _notBefore is in milliseconds; 1000.0 avoids integer division.
        return secondsSince1970 < self._notBefore / 1000.0

    def __str__(self):
        """
        Build a multi-line, human readable description of the certificate:
        name, validity period, subject descriptions, base64 public key
        bits and any extensions.

        :return: The description of this certificate.
        :rtype: str
        """
        dateFormat = "%Y%m%dT%H%M%S"
        # The validity times are stored as milliseconds since 1970.
        notBeforeStr = datetime.utcfromtimestamp(
            self._notBefore / 1000).strftime(dateFormat)
        notAfterStr = datetime.utcfromtimestamp(
            self._notAfter / 1000).strftime(dateFormat)

        parts = [
            "Certificate name:\n",
            " " + self.getName().toUri() + "\n",
            "Validity:\n",
            " NotBefore: " + notBeforeStr + "\n",
            " NotAfter: " + notAfterStr + "\n",
        ]

        for sd in self._subjectDescriptionList:
            parts.append("Subject Description:\n")
            parts.append(" " + str(sd.getOid()) + ": " +
                         sd.getValue().toRawStr() + "\n")

        parts.append("Public key bits:\n")
        keyDer = self._publicKey.getKeyDer()
        encodedKey = base64.b64encode(keyDer.toBytes())
        # Emit the base64 text in lines of at most 64 characters.
        for idx in range(0, len(encodedKey), 64):
            # Use Blob to convert to a str.
            parts.append(
                Blob(encodedKey[idx:idx + 64], False).toRawStr() + "\n")

        if self._extensionList:
            parts.append("Extensions:\n")
            for ext in self._extensionList:
                # getOid() may return an OID object, so convert explicitly.
                parts.append(" OID: " + str(ext.getOid()) + "\n")
                parts.append(" Is critical: " +
                             ('Y' if ext.getIsCritical() else 'N') + "\n")
                # str.encode('hex') only exists on Python 2; Blob.toHex
                # produces the hex text on both Python 2 and 3.
                parts.append(" Value: " + ext.getValue().toHex() + "\n")

        return "".join(parts)

    def addSubjectDescription(self, descr):
        """
        Add a subject description field to the certificate.

        :param descr: The CertificateSubjectDescription object to add
        """
        self._subjectDescriptionList.append(descr)

    def addExtension(self, ext):
        """
        Add an extension field to the certificate.

        :param ext: The CertificateExtension object to add
        """
        self._extensionList.append(ext)

    def toDer(self):
        """
        Encode the certificate fields in DER format.

        :return: The DER encoded contents of the certificate.
        :rtype: DerNode
        """
        root = DerSequence()

        # 1st: validity (notBefore, notAfter)
        validity = DerSequence()
        validity.addChild(DerGeneralizedTime(self._notBefore))
        validity.addChild(DerGeneralizedTime(self._notAfter))
        root.addChild(validity)

        # 2nd: subject list (added even when it is empty)
        subjectList = DerSequence()
        for sd in self._subjectDescriptionList:
            subjectList.addChild(sd.toDer())
        root.addChild(subjectList)

        # 3rd: public key
        root.addChild(self._publicKey.toDer())

        # 4th: extension list, only when there is at least one extension
        if self._extensionList:
            extnList = DerSequence()
            for ext in self._extensionList:
                extnList.addChild(ext.toDer())
            root.addChild(extnList)

        return root

    def encode(self):
        """
        Encode the contents of the certificate in DER format and set the
        Content and MetaInfo fields.
        """
        root = self.toDer()
        outVal = root.encode()
        self.setContent(Blob(outVal))
        self.getMetaInfo().setType(ContentType.KEY)

    def decode(self):
        """
        Populates the fields by decoding DER data from the Content.

        Subject descriptions and extensions decoded or added earlier are
        discarded first, so decoding more than once does not duplicate
        entries.
        """
        root = DerNode.parse(self.getContent())

        # we need to ensure that there are:
        #   validity (notBefore, notAfter)
        #   subject list
        #   public key
        #   (optional) extension list
        rootChildren = root.getChildren()

        # Reset only after a successful parse so that a parse failure
        # leaves the previously decoded lists untouched.
        self._subjectDescriptionList = []
        self._extensionList = []

        # 1st: validity info
        validityChildren = DerNode.getSequence(rootChildren, 0).getChildren()
        self._notBefore = validityChildren[0].toVal()
        self._notAfter = validityChildren[1].toVal()

        # 2nd: subjectList
        subjectChildren = DerNode.getSequence(rootChildren, 1).getChildren()
        for sd in subjectChildren:
            descriptionChildren = sd.getChildren()
            oidStr = descriptionChildren[0].toVal()
            value = descriptionChildren[1].toVal()
            self.addSubjectDescription(
                CertificateSubjectDescription(oidStr, value))

        # 3rd: public key
        publicKeyInfo = rootChildren[2].encode()
        self._publicKey = PublicKey(publicKeyInfo)

        # 4th: optional extension list
        if len(rootChildren) > 3:
            extensionChildren = DerNode.getSequence(
                rootChildren, 3).getChildren()
            for extInfo in extensionChildren:
                children = extInfo.getChildren()
                oidStr = children[0].toVal()
                isCritical = children[1].toVal()
                value = children[2].toVal()
                self.addExtension(
                    CertificateExtension(oidStr, isCritical, value))

    def wireDecode(self, buf, wireFormat=None):
        """
        Decode the Data packet from the wire encoding, then make sure the
        certificate fields are populated from the decoded content.

        :param buf: The wire encoding, passed through to Data.wireDecode.
        :param wireFormat: (optional) Passed through to Data.wireDecode.
        """
        Data.wireDecode(self, buf, wireFormat)
        self.decode()

    def getNotBefore(self):
        """
        Returns the earliest date the certificate is valid at.

        :return: Timestamp as milliseconds since 1970.
        :rtype: float
        """
        return self._notBefore

    def getNotAfter(self):
        """
        Returns the latest date the certificate is valid at.

        :return: Timestamp as milliseconds since 1970.
        :rtype: float
        """
        return self._notAfter

    def getPublicKeyInfo(self):
        """
        :return: The PublicKey object stored in the certificate.
        :rtype: PublicKey
        """
        return self._publicKey

    def setNotBefore(self, notBefore):
        """
        Set the earliest date the certificate is valid at.

        :param notBefore: Timestamp as milliseconds since 1970.
        :type notBefore: float
        """
        self._notBefore = notBefore

    def setNotAfter(self, notAfter):
        """
        Set the latest date the certificate is valid at.

        :param notAfter: Timestamp as milliseconds since 1970.
        :type notAfter: float
        """
        self._notAfter = notAfter

    def setPublicKeyInfo(self, publicKey):
        """
        Assign a new public key to the certificate.

        :param publicKey: The new public key
        :type publicKey: PublicKey
        """
        self._publicKey = publicKey

    def getSubjectDescriptions(self):
        """
        :return: The subject description fields of the certificate.
        :rtype: list of CertificateSubjectDescription
        """
        return self._subjectDescriptionList

    def getExtensionList(self):
        """
        :return: The extension fields of the certificate.
        :rtype: list of CertificateExtension
        """
        return self._extensionList

    def getExtensions(self):
        """
        :deprecated: Use getExtensionList.
        """
        return self.getExtensionList()
class CertificateSubjectDescription:
    """
    A subject description field of a certificate: an object identifier
    paired with a value.
    """

    def __init__(self, oid, value):
        """
        Create a subject description field for a certificate.

        :param oid: The object identifier
        :type oid: str or OID
        :param value: The value of the description field
        :type value: Blob or bytearray
        """
        # isinstance (rather than an exact type comparison) also converts
        # instances of str subclasses.
        if isinstance(oid, str):
            self._oid = OID(oid)
        else:
            # Assume oid is already an OID.
            self._oid = oid
        self._value = Blob(value)

    def getOid(self):
        """
        :return: The object identifier of the subject description field.
        :rtype: OID
        """
        return self._oid

    def getValue(self):
        """
        :return: The value of the subject description field.
        :rtype: Blob
        """
        return self._value

    def toDer(self):
        """
        Encode this field as a DerNode: a sequence holding the OID followed
        by the value as a printable string.

        :return: Encoded subject description
        :rtype: DerSequence
        """
        root = DerSequence()
        root.addChild(DerOid(self._oid))
        root.addChild(DerPrintableString(self._value))
        return root
class CertificateExtension:
    """
    An extension field of a certificate: an object identifier, a
    criticality flag and a value.
    """

    def __init__(self, oid, isCritical, value):
        """
        Create a certificate extension field.

        :param oid: The object identifier for the extension
        :type oid: str or OID
        :param isCritical: Whether this extension is critical to the certificate
        :type isCritical: boolean
        :param value: The value of the extension field
        :type value: bytearray or Blob
        """
        # isinstance (rather than an exact type comparison) also converts
        # instances of str subclasses.
        if isinstance(oid, str):
            self._oid = OID(oid)
        else:
            # Assume oid is already an OID.
            self._oid = oid
        self._isCritical = isCritical
        self._value = Blob(value)

    def toDer(self):
        """
        Encode this field as a DerNode: a sequence holding the OID, the
        criticality flag and the value as an octet string, in that order.

        :return: Encoded certificate extension
        :rtype: DerSequence
        """
        root = DerSequence()
        root.addChild(DerOid(self._oid))
        root.addChild(DerBoolean(self._isCritical))
        root.addChild(DerOctetString(self._value))
        return root

    def getOid(self):
        """
        :return: The object identifier of the extension field.
        :rtype: OID
        """
        return self._oid

    def getIsCritical(self):
        """
        :return: Whether the extension is critical to the certificate
        :rtype: boolean
        """
        return self._isCritical

    def isCritical(self):
        """
        :deprecated: Use getIsCritical.
        """
        return self.getIsCritical()

    def getValue(self):
        """
        :return: The value of the extension field
        :rtype: Blob
        """
        return self._value