# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
# Author: Adeola Bannis <thecodemaiden@gmail.com>
# From ndn-cxx security by Yingdi Yu <yingdi@cs.ucla.edu>.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.
"""
BasicIdentityStorage extends IdentityStorage to implement a basic storage of
identity, public keys and certificates using SQLite.
"""
import os
import math
import sqlite3
from pyndn import Name, KeyLocator
from pyndn.util.blob import Blob
from pyndn.security.security_exception import SecurityException
from pyndn.security.identity.identity_storage import IdentityStorage
from pyndn.security.certificate.identity_certificate import IdentityCertificate
INIT_ID_TABLE = ["""
CREATE TABLE IF NOT EXISTS
Identity(
identity_name BLOB NOT NULL,
default_identity INTEGER DEFAULT 0,
PRIMARY KEY (identity_name)
);
""",
"CREATE INDEX identity_index ON Identity(identity_name);"]
INIT_KEY_TABLE = ["""
CREATE TABLE IF NOT EXISTS
Key(
identity_name BLOB NOT NULL,
key_identifier BLOB NOT NULL,
key_type INTEGER,
public_key BLOB,
default_key INTEGER DEFAULT 0,
active INTEGER DEFAULT 0,
PRIMARY KEY (identity_name, key_identifier)
);
""",
"CREATE INDEX key_index ON Key(identity_name);"]
INIT_CERT_TABLE = ["""
CREATE TABLE IF NOT EXISTS
Certificate(
cert_name BLOB NOT NULL,
cert_issuer BLOB NOT NULL,
identity_name BLOB NOT NULL,
key_identifier BLOB NOT NULL,
not_before TIMESTAMP,
not_after TIMESTAMP,
certificate_data BLOB NOT NULL,
valid_flag INTEGER DEFAULT 1,
default_cert INTEGER DEFAULT 0,
PRIMARY KEY (cert_name)
);
""",
"CREATE INDEX cert_index ON Certificate(cert_name);",
"CREATE INDEX subject ON Certificate(identity_name);"]
[docs]class BasicIdentityStorage(IdentityStorage):
"""
Create a new BasicIdentityStorage to work with an SQLite file.
:param str databaseFilePath: (optional) The path of the SQLite file. If
omitted, use the default location.
"""
def __init__(self, databaseFilePath = None):
super(BasicIdentityStorage, self).__init__()
if databaseFilePath == None or databaseFilePath == "":
if not "HOME" in os.environ:
# Don't expect this to happen
home = "."
else:
home = os.environ["HOME"]
identityDirectory = os.path.join(home, ".ndn")
if not os.path.exists(identityDirectory):
os.makedirs(identityDirectory)
databaseFilePath = os.path.join(identityDirectory, "ndnsec-public-info.db")
self._database = sqlite3.connect(databaseFilePath)
# Check if the ID table exists.
cursor = self._database.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' And name='Identity'")
if cursor.fetchone() == None:
for command in INIT_ID_TABLE:
self._database.execute(command)
cursor.close()
# Check if the Key table exists.
cursor = self._database.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' And name='Key'")
if cursor.fetchone() == None:
for command in INIT_KEY_TABLE:
self._database.execute(command)
cursor.close()
# Check if the Certificate table exists.
cursor = self._database.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' And name='Certificate'")
if cursor.fetchone() == None:
for command in INIT_CERT_TABLE:
self._database.execute(command)
cursor.close()
self._database.commit()
[docs] def doesIdentityExist(self, identityName):
"""
Check if the specified identity already exists.
:param Name identityName: The identity name.
:return: True if the identity exists, otherwise False.
:rtype: bool
"""
result = False
cursor = self._database.cursor()
cursor.execute(
"SELECT count(*) FROM Identity WHERE identity_name=?",
(identityName.toUri(),))
(count,) = cursor.fetchone()
if count > 0:
result = True
cursor.close()
return result
[docs] def addIdentity(self, identityName):
"""
Add a new identity. Do nothing if the identity already exists.
:param Name identityName: The identity name.
"""
identityUri = identityName.toUri()
if self.doesIdentityExist(identityName):
return
cursor = self._database.cursor()
cursor.execute("INSERT INTO Identity (identity_name) VALUES(?)",
(identityUri,))
self._database.commit()
cursor.close()
[docs] def revokeIdentity(self):
"""
Revoke the identity.
:return: True if the identity was revoked, False if not.
:rtype: bool
"""
return False
[docs] def doesKeyExist(self, keyName):
"""
Check if the specified key already exists.
:param Name keyName: The name of the key.
:return: True if the key exists, otherwise False.
:rtype: bool
"""
keyId = keyName[-1].toEscapedString()
identityName = keyName[:-1]
cursor = self._database.cursor()
cursor.execute(
"SELECT count(*) FROM Key WHERE identity_name=? AND key_identifier=?",
(identityName.toUri(), keyId))
keyIdExists = False
(count,) = cursor.fetchone()
if count > 0:
keyIdExists = True
cursor.close()
return keyIdExists
[docs] def addKey(self, keyName, keyType, publicKeyDer):
"""
Add a public key to the identity storage. Also call addIdentity to ensure
that the identityName for the key exists. However, if the key already
exists, do nothing.
:param Name keyName: The name of the public key to be added.
:param keyType: Type of the public key to be added.
:type keyType: int from KeyType
:param Blob publicKeyDer: A blob of the public key DER to be added.
"""
if keyName.size() == 0:
return
if self.doesKeyExist(keyName):
return
identityName = keyName[:-1]
identityUri = identityName.toUri()
self.addIdentity(identityName)
keyId = keyName[-1].toEscapedString()
keyBuffer = sqlite3.Binary(bytearray(publicKeyDer.buf()))
cursor = self._database.cursor()
cursor.execute(
"INSERT INTO Key (identity_name, key_identifier, key_type, public_key) VALUES(?,?,?,?)",
(identityUri, keyId, keyType, keyBuffer))
self._database.commit()
cursor.close()
[docs] def getKey(self, keyName):
"""
Get the public key DER blob from the identity storage.
:param Name keyName: The name of the requested public key.
:return: The DER Blob.
:rtype: Blob
:raises SecurityException: if the key doesn't exist.
"""
if keyName.size() == 0:
raise SecurityException(
"BasicIdentityStorage::getKey: Empty keyName")
identityUri = keyName[:-1].toUri()
keyId = keyName[-1].toEscapedString()
cursor = self._database.cursor()
cursor.execute("SELECT public_key FROM Key WHERE identity_name=? AND key_identifier=?",
(identityUri, keyId))
row = cursor.fetchone()
if row != None:
(keyData, ) = row
cursor.close()
return Blob(bytearray(keyData), False)
else:
cursor.close()
raise SecurityException(
"BasicIdentityStorage::getKey: The key does not exist")
[docs] def activateKey(self, keyName):
"""
Activate a key. If a key is marked as inactive, its private part will
not be used in packet signing.
:param Name keyName: The name of the key.
"""
self._updateKeyStatus(keyName, True)
[docs] def deactivateKey(self, keyName):
"""
Deactivate a key. If a key is marked as inactive, its private part will
not be used in packet signing.
:param Name keyName: The name of the key.
"""
self._updateKeyStatus(keyName, False)
[docs] def deletePublicKeyInfo(self, keyName):
"""
Remove the key and all certificates associated with it.
:param Name keyName: The name of the key.
"""
if keyName.size() == 0:
return
keyId = keyName[-1].toEscapedString()
identityName = keyName[:-1]
cursor = self._database.cursor()
cursor.execute("DELETE FROM Certificate WHERE identity_name=? AND key_identifier=?",
(identityName.toUri(), keyId))
cursor.execute("DELETE FROM Key WHERE identity_name=? and key_identifier=?",
(identityName.toUri(), keyId))
self._database.commit()
cursor.close()
[docs] def doesCertificateExist(self, certificateName):
"""
Check if the specified certificate already exists.
:param Name certificateName: The name of the certificate.
:return: True if the certificate exists, otherwise False.
:rtype: bool
"""
cursor = self._database.cursor()
cursor.execute(
"SELECT count(*) FROM Certificate WHERE cert_name=?",
(certificateName.toUri(),))
certExists = False
(count,) = cursor.fetchone()
if count > 0:
certExists = True
cursor.close()
return certExists
[docs] def addCertificate(self, certificate):
"""
Add a certificate to the identity storage. Also call addKey to ensure
that the certificate key exists. If the certificate is already
installed, don't replace it.
:param IdentityCertificate certificate: The certificate to be added.
This makes a copy of the certificate.
"""
certificateName = certificate.getName()
keyName = certificate.getPublicKeyName()
self.addKey(keyName, certificate.getPublicKeyInfo().getKeyType(),
certificate.getPublicKeyInfo().getKeyDer())
if self.doesCertificateExist(certificateName):
return
keyId = keyName.get(-1).toEscapedString()
identity = keyName[:-1]
# Insert the certificate.
signature = certificate.getSignature()
signerName = KeyLocator.getFromSignature(signature).getKeyName()
# Convert from milliseconds to seconds since 1/1/1970.
notBefore = int(math.floor(certificate.getNotBefore() / 1000.0))
notAfter = int(math.floor(certificate.getNotAfter() / 1000.0))
encodedCert = sqlite3.Binary(bytearray(certificate.wireEncode().buf()))
cursor = self._database.cursor()
cursor.execute(
"INSERT INTO Certificate (cert_name, cert_issuer, identity_name, key_identifier, not_before, not_after, certificate_data) " +
"VALUES (?,?,?,?,?,?,?)",
(certificateName.toUri(), signerName.toUri(), identity.toUri(), keyId,
notBefore, notAfter, encodedCert))
self._database.commit()
cursor.close()
[docs] def getCertificate(self, certificateName):
"""
Get a certificate from the identity storage.
:param Name certificateName: The name of the requested certificate.
:return: The requested certificate.
:rtype: IdentityCertificate
:raises SecurityException: if the certificate doesn't exist.
"""
cursor = self._database.cursor()
cursor.execute("SELECT certificate_data FROM Certificate WHERE cert_name=?",
(certificateName.toUri(), ))
row = cursor.fetchone()
if row != None:
(certData, ) = row
cursor.close()
certificate = IdentityCertificate()
try:
certificate.wireDecode(bytearray(certData))
except ValueError:
raise SecurityException(
"BasicIdentityStorage::getCertificate: The certificate cannot be decoded")
return certificate
else:
cursor.close()
raise SecurityException(
"BasicIdentityStorage::getCertificate: The certificate does not exist")
[docs] def deleteCertificateInfo(self, certificateName):
"""
Remove a certificate from associated keys.
:param Name keyName: The name of the key.
"""
if certificateName.size() == 0:
return
cursor = self._database.cursor()
cursor.execute("DELETE FROM Certificate WHERE cert_name=?",
(certificateName.toUri(),))
self._database.commit()
cursor.close()
[docs] def deleteIdentityInfo(self, identityName):
"""
Delete an identity and related public keys and certificates.
:param Name identity: The identity name.
"""
identity = identityName.toUri()
cursor = self._database.cursor()
cursor.execute("DELETE FROM Certificate WHERE identity_name=?",
(identity,))
cursor.execute("DELETE FROM Key WHERE identity_name=?",
(identity,))
cursor.execute("DELETE FROM Identity WHERE identity_name=?",
(identity,))
self._database.commit()
cursor.close()
#
# Get/Set Default
#
[docs] def getDefaultIdentity(self):
"""
Get the default identity.
:return: The name of default identity.
:rtype: Name
:raises SecurityException: if the default identity is not set.
"""
cursor = self._database.cursor()
cursor.execute(
"SELECT identity_name FROM Identity WHERE default_identity=1")
row = cursor.fetchone()
if row != None:
(identity,) = row
cursor.close()
return Name(identity)
else:
cursor.close()
raise SecurityException(
"BasicIdentityStorage::getDefaultIdentity: The default identity is not defined")
[docs] def getDefaultKeyNameForIdentity(self, identityName):
"""
Get the default key name for the specified identity.
:param Name identityName: The identity name.
:return: The default key name.
:rtype: Name
:raises SecurityException: if the default key name for the identity is
not set.
"""
cursor = self._database.cursor()
cursor.execute(
"SELECT key_identifier FROM Key WHERE identity_name=? AND default_key=1",
(identityName.toUri(),))
row = cursor.fetchone()
if row != None:
(keyName,) = row
cursor.close()
return Name(identityName).append(keyName)
else:
cursor.close()
raise SecurityException(
"BasicIdentityStorage::getDefaultKeyNameForIdentity: The default key for the identity is not defined")
[docs] def getDefaultCertificateNameForKey(self, keyName):
"""
Get the default certificate name for the specified key.
:param Name keyName: The key name.
:return: The default certificate name.
:rtype: Name
:raises SecurityException: if the default certificate name for the key
name is not set.
"""
keyId = keyName[-1].toEscapedString()
identityName = keyName[:-1]
cursor = self._database.cursor()
cursor.execute(
"SELECT cert_name FROM Certificate WHERE identity_name=? AND key_identifier=? AND default_cert=1",
(identityName.toUri(), keyId))
row = cursor.fetchone()
if row != None:
(certName,) = row
cursor.close()
return Name(certName)
else:
cursor.close()
raise SecurityException(
"BasicIdentityStorage::getDefaultCertificateNameForKey: The default certificate for the key name is not defined")
[docs] def getAllIdentities(self, nameList, isDefault):
"""
Append all the identity names to the nameList.
:param Array<Name> nameList: Append result names to nameList.
:param bool isDefault: If True, add only the default identity name. If
false, add only the non-default identity names.
"""
if isDefault:
query = "SELECT identity_name FROM Identity WHERE default_identity=1"
else:
query = "SELECT identity_name FROM Identity WHERE default_identity=0"
cursor = self._database.cursor()
cursor.execute(query)
keyIds = cursor.fetchall()
for (keyId, ) in keyIds:
nameList.append(Name(keyId))
cursor.close()
[docs] def getAllKeyNamesOfIdentity(self, identityName, nameList, isDefault):
"""
Append all the key names of a particular identity to the nameList.
:param Name identityName: The identity name to search for.
:param Array<Name> nameList: Append result names to nameList.
:param bool isDefault: If True, add only the default key name. If False,
add only the non-default key names.
"""
if isDefault:
query = "SELECT key_identifier FROM Key WHERE default_key=1 and identity_name=?"
else:
query = "SELECT key_identifier FROM Key WHERE default_key=0 and identity_name=?"
cursor = self._database.cursor()
cursor.execute(query, (identityName.toUri(), ))
keyIds = cursor.fetchall()
for (keyId, ) in keyIds:
nameList.append(Name(identityName).append(keyId))
cursor.close()
[docs] def getAllCertificateNamesOfKey(self, keyName, nameList, isDefault):
"""
Append all the certificate names of a particular key name to the nameList.
:param Name keyName: The key name to search for.
:param Array<Name> nameList: Append result names to nameList.
:param bool isDefault: If True, add only the default certificate name.
If False, add only the non-default certificate names.
"""
if isDefault:
query = """SELECT cert_name FROM Certificate
WHERE default_cert=1 and identity_name=? and key_identifier=?"""
else:
query = """SELECT cert_name FROM Certificate
WHERE default_cert=0 and identity_name=? and key_identifier=?"""
cursor = self._database.cursor()
cursor.execute(query, (keyName.getPrefix(-1).toUri(),
keyName.get(-1).toEscapedString()))
keyIds = cursor.fetchall()
for (keyId, ) in keyIds:
nameList.append(Name(keyId))
cursor.close()
[docs] def setDefaultIdentity(self, identityName):
"""
Set the default identity. If the identityName does not exist, then clear
the default identity so that getDefaultIdentity() raises an exception.
:param Name identityName: The default identity name.
"""
# Reset the previous default identity.
cursor = self._database.cursor()
cursor.execute(
"UPDATE Identity SET default_identity=0 WHERE default_identity=1")
# Set the current default identity.
cursor.execute(
"UPDATE Identity SET default_identity=1 WHERE identity_name=?",
(identityName.toUri(), ))
self._database.commit()
cursor.close()
[docs] def setDefaultKeyNameForIdentity(self, keyName, identityNameCheck = None):
"""
Set a key as the default key of an identity. The identity name is
inferred from keyName.
:param Name keyName: The name of the key.
:param Name identityNameCheck: (optional) The identity name to check
that the keyName contains the same identity name. If an empty name, it
is ignored.
"""
keyId = keyName[-1].toEscapedString()
identityName = keyName[:-1]
if (not (identityNameCheck is None) and
identityNameCheck.size() != 0 and
not identityNameCheck.equals(identityName)):
raise SecurityException(
"Specified identity name does not match the key name")
# Reset the previous default key.
identityUri = identityName.toUri()
cursor = self._database.cursor()
cursor.execute(
"UPDATE Key SET default_key=0 WHERE default_key=1 and identity_name=?",
(identityUri, ))
# Set the current default Key.
cursor.execute(
"UPDATE Key SET default_key=1 WHERE identity_name=? AND key_identifier=?",
(identityUri, keyId))
self._database.commit()
cursor.close()
[docs] def setDefaultCertificateNameForKey(self, keyName, certificateName):
"""
Set the default key name for the specified identity.
:param Name keyName: The key name.
:param Name certificateName: The certificate name.
"""
keyId = keyName[-1].toEscapedString()
identityName = keyName[:-1]
# Reset the previous default certificate.
identityUri = identityName.toUri()
cursor = self._database.cursor()
cursor.execute(
"UPDATE Certificate SET default_cert=0 WHERE default_cert=1 AND identity_name=? AND key_identifier=?",
(identityUri, keyId))
# Set the current default Certificate.
cursor.execute(
"UPDATE Certificate SET default_cert=1 WHERE identity_name=? AND key_identifier=? AND cert_name=?",
(identityUri, keyId, certificateName.toUri()))
self._database.commit()
cursor.close()
def _updateKeyStatus(self, keyName, isActive):
"""
Update the active flag of Key.
:param Name keyName: The key name.
:param bool isActive: The active flag.
"""
keyId = keyName[-1].toEscapedString()
identityName = keyName[:-1]
cursor = self._database.cursor()
cursor.execute(
"UPDATE Key SET active=? WHERE identity_name=? AND key_identifier=?",
((1 if isActive else 0), identityName.toUri(), keyId))
self._database.commit()
cursor.close()