# Source code for pyndn.security.identity.identity_storage

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*-
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
# From ndn-cxx security by Yingdi Yu <yingdi@cs.ucla.edu>.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the IdentityStorage abstract class which is a base class for
the storage of identity, public keys and certificates.  Private keys are stored
in PrivateKeyStorage. This is an abstract base class.  A subclass must implement
the methods.
"""

import math
from pyndn.name import Name
from pyndn.util.common import Common
from pyndn.security.security_exception import SecurityException

class IdentityStorage(object):
    """
    Abstract base class for the storage of identities, public keys and
    certificates. Most methods here only raise RuntimeError and must be
    implemented by a subclass; getNewKeyName,
    getDefaultCertificateNameForIdentity and getDefaultCertificate are
    implemented in terms of the other methods.
    """

    def doesIdentityExist(self, identityName):
        """
        Check if the specified identity already exists.

        :param Name identityName: The identity name.
        :return: True if the identity exists, otherwise False.
        :rtype: bool
        """
        raise RuntimeError("doesIdentityExist is not implemented")

    def addIdentity(self, identityName):
        """
        Add a new identity. Do nothing if the identity already exists.

        :param Name identityName: The identity name.
        """
        raise RuntimeError("addIdentity is not implemented")

    def revokeIdentity(self):
        """
        Revoke the identity.

        :return: True if the identity was revoked, False if not.
        :rtype: bool
        """
        raise RuntimeError("revokeIdentity is not implemented")

    def getNewKeyName(self, identityName, useKsk):
        """
        Generate a name for a new key belonging to the identity.

        :param Name identityName: The identity name.
        :param bool useKsk: If True, generate a KSK name, otherwise a DSK name.
        :return: The generated key name.
        :rtype: Name
        :raises SecurityException: if the generated key name already exists.
        """
        timestamp = math.floor(Common.getNowMilliseconds() / 1000.0)
        while timestamp <= self._lastTimestamp:
            # Make the timestamp unique.
            timestamp += 1
        # Assigning through self binds an instance attribute, so after the
        # first call the last timestamp is tracked per instance.
        self._lastTimestamp = timestamp

        # math.floor returns a float on Python 2 and an int on Python 3.
        # Convert through int so the string never has a fractional part.
        nowString = str(int(timestamp))

        keyIdStr = ("ksk-" if useKsk else "dsk-") + nowString
        keyName = Name(identityName).append(keyIdStr)

        if self.doesKeyExist(keyName):
            raise SecurityException("Key name already exists")

        return keyName

    def doesKeyExist(self, keyName):
        """
        Check if the specified key already exists.

        :param Name keyName: The name of the key.
        :return: True if the key exists, otherwise False.
        :rtype: bool
        """
        raise RuntimeError("doesKeyExist is not implemented")

    def addKey(self, keyName, keyType, publicKeyDer):
        """
        Add a public key to the identity storage. Also call addIdentity to
        ensure that the identityName for the key exists. However, if the key
        already exists, do nothing.

        :param Name keyName: The name of the public key to be added.
        :param keyType: Type of the public key to be added.
        :type keyType: int from KeyType
        :param Blob publicKeyDer: A blob of the public key DER to be added.
        """
        raise RuntimeError("addKey is not implemented")

    def getKey(self, keyName):
        """
        Get the public key DER blob from the identity storage.

        :param Name keyName: The name of the requested public key.
        :return: The DER Blob.
        :rtype: Blob
        :raises SecurityException: if the key doesn't exist.
        """
        raise RuntimeError("getKey is not implemented")

    def activateKey(self, keyName):
        """
        Activate a key. If a key is marked as inactive, its private part will
        not be used in packet signing.

        :param Name keyName: The name of the key.
        """
        raise RuntimeError("activateKey is not implemented")

    def deactivateKey(self, keyName):
        """
        Deactivate a key. If a key is marked as inactive, its private part will
        not be used in packet signing.

        :param Name keyName: The name of the key.
        """
        raise RuntimeError("deactivateKey is not implemented")

    def deletePublicKeyInfo(self, keyName):
        """
        Remove the key and all certificates associated with it.

        :param Name keyName: The name of the key.
        """
        raise RuntimeError("deletePublicKeyInfo is not implemented")

    def doesCertificateExist(self, certificateName):
        """
        Check if the specified certificate already exists.

        :param Name certificateName: The name of the certificate.
        :return: True if the certificate exists, otherwise False.
        :rtype: bool
        """
        raise RuntimeError("doesCertificateExist is not implemented")

    def addCertificate(self, certificate):
        """
        Add a certificate to the identity storage. Also call addKey to ensure
        that the certificate key exists. If the certificate is already
        installed, don't replace it.

        :param IdentityCertificate certificate: The certificate to be added.
          This makes a copy of the certificate.
        """
        raise RuntimeError("addCertificate is not implemented")

    def getCertificate(self, certificateName):
        """
        Get a certificate from the identity storage.

        :param Name certificateName: The name of the requested certificate.
        :return: The requested certificate.
        :rtype: IdentityCertificate
        :raises SecurityException: if the certificate doesn't exist.
        """
        raise RuntimeError("getCertificate is not implemented")

    def deleteCertificateInfo(self, certificateName):
        """
        Remove a certificate from associated keys.

        :param Name certificateName: The name of the certificate.
        """
        raise RuntimeError("deleteCertificateInfo is not implemented")

    def deleteIdentityInfo(self, identity):
        """
        Delete an identity and related public keys and certificates.

        :param Name identity: The identity name.
        """
        raise RuntimeError("deleteIdentityInfo is not implemented")

    #
    # Get/Set Default
    #

    def getDefaultIdentity(self):
        """
        Get the default identity.

        :return: The name of default identity.
        :rtype: Name
        :raises SecurityException: if the default identity is not set.
        """
        raise RuntimeError("getDefaultIdentity is not implemented")

    def getDefaultKeyNameForIdentity(self, identityName):
        """
        Get the default key name for the specified identity.

        :param Name identityName: The identity name.
        :return: The default key name.
        :rtype: Name
        :raises SecurityException: if the default key name for the identity is
          not set.
        """
        raise RuntimeError("getDefaultKeyNameForIdentity is not implemented")

    def getDefaultCertificateNameForIdentity(self, identityName):
        """
        Get the default certificate name for the specified identity. This
        looks up the default key name for the identity, then the default
        certificate name for that key.

        :param Name identityName: The identity name.
        :return: The default certificate name.
        :rtype: Name
        :raises SecurityException: if the default key name for the identity is
          not set or the default certificate name for the key name is not set.
        """
        keyName = self.getDefaultKeyNameForIdentity(identityName)
        return self.getDefaultCertificateNameForKey(keyName)

    def getDefaultCertificateNameForKey(self, keyName):
        """
        Get the default certificate name for the specified key.

        :param Name keyName: The key name.
        :return: The default certificate name.
        :rtype: Name
        :raises SecurityException: if the default certificate name for the key
          name is not set.
        """
        raise RuntimeError(
          "getDefaultCertificateNameForKey is not implemented")

    def getAllIdentities(self, nameList, isDefault):
        """
        Append all the identity names to the nameList.

        :param Array<Name> nameList: Append result names to nameList.
        :param bool isDefault: If True, add only the default identity name. If
          False, add only the non-default identity names.
        """
        raise RuntimeError("getAllIdentities is not implemented")

    def getAllKeyNamesOfIdentity(self, identityName, nameList, isDefault):
        """
        Append all the key names of a particular identity to the nameList.

        :param Name identityName: The identity name to search for.
        :param Array<Name> nameList: Append result names to nameList.
        :param bool isDefault: If True, add only the default key name. If
          False, add only the non-default key names.
        """
        raise RuntimeError("getAllKeyNamesOfIdentity is not implemented")

    def getAllCertificateNamesOfKey(self, keyName, nameList, isDefault):
        """
        Append all the certificate names of a particular key name to the
        nameList.

        :param Name keyName: The key name to search for.
        :param Array<Name> nameList: Append result names to nameList.
        :param bool isDefault: If True, add only the default certificate name.
          If False, add only the non-default certificate names.
        """
        raise RuntimeError("getAllCertificateNamesOfKey is not implemented")

    def setDefaultIdentity(self, identityName):
        """
        Set the default identity. If the identityName does not exist, then
        clear the default identity so that getDefaultIdentity() raises an
        exception.

        :param Name identityName: The default identity name.
        """
        raise RuntimeError("setDefaultIdentity is not implemented")

    def setDefaultKeyNameForIdentity(self, keyName, identityNameCheck = None):
        """
        Set a key as the default key of an identity. The identity name is
        inferred from keyName.

        :param Name keyName: The name of the key.
        :param Name identityNameCheck: (optional) The identity name to check
          that the keyName contains the same identity name. If an empty name,
          it is ignored.
        """
        raise RuntimeError("setDefaultKeyNameForIdentity is not implemented")

    def setDefaultCertificateNameForKey(self, keyName, certificateName):
        """
        Set the default certificate name for the specified key.

        :param Name keyName: The key name.
        :param Name certificateName: The certificate name.
        """
        raise RuntimeError(
          "setDefaultCertificateNameForKey is not implemented")

    def getDefaultCertificate(self):
        """
        Get the certificate of the default identity.

        :return: The requested certificate. If not found, return None.
        :rtype: IdentityCertificate
        """
        try:
            certName = self.getDefaultCertificateNameForIdentity(
              self.getDefaultIdentity())
        except Exception:
            # The default is not defined. Catch Exception rather than using a
            # bare except so that KeyboardInterrupt and SystemExit propagate.
            return None

        return self.getCertificate(certName)

    # The initial "last timestamp" in whole seconds, computed once when the
    # class is defined. getNewKeyName compares against it to make each
    # timestamp unique among calls.
    _lastTimestamp = math.floor(Common.getNowMilliseconds() / 1000.0)