Source code for pyndn.encrypt.algo.encryptor

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
# Author: From ndn-group-encrypt src/encryptor https://github.com/named-data/ndn-group-encrypt
#
# Copyright (C) 2015-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

"""
This module defines the Encryptor class which has static constants and utility
methods for encryption, such as encryptData.
Note: This class is an experimental feature. The API may change.
"""

from random import SystemRandom
from pyndn.name import Name
from pyndn.util.blob import Blob
from pyndn.key_locator import KeyLocator, KeyLocatorType
from pyndn.encoding.tlv_wire_format import TlvWireFormat
from pyndn.encrypt.encrypted_content import EncryptedContent
from pyndn.encrypt.algo.encrypt_params import EncryptParams, EncryptAlgorithmType

# The Python documentation says "Use SystemRandom if you require a
#   cryptographically secure pseudo-random number generator."
# http://docs.python.org/2/library/random.html
_systemRandom = SystemRandom()

[docs]class Encryptor(object): NAME_COMPONENT_FOR = Name.Component("FOR") NAME_COMPONENT_READ = Name.Component("READ") NAME_COMPONENT_SAMPLE = Name.Component("SAMPLE") NAME_COMPONENT_ACCESS = Name.Component("ACCESS") NAME_COMPONENT_E_KEY = Name.Component("E-KEY") NAME_COMPONENT_D_KEY = Name.Component("D-KEY") NAME_COMPONENT_C_KEY = Name.Component("C-KEY") @staticmethod
[docs] def encryptData(data, payload, keyName, key, params): """ Prepare an encrypted data packet by encrypting the payload using the key according to the params. In addition, this prepares the encoded EncryptedContent with the encryption result using keyName and params. The encoding is set as the content of the data packet. If params defines an asymmetric encryption algorithm and the payload is larger than the maximum plaintext size, this encrypts the payload with a symmetric key that is asymmetrically encrypted and provided as a nonce in the content of the data packet. The packet's <dataName>/ is updated to be <dataName>/FOR/<keyName> :param Data data: The data packet which is updated. :param Blob payload: The payload to encrypt. :param Name keyName: The key name for the EncryptedContent. :param Blob key: The encryption key value. :param EncryptParams params: The parameters for encryption. """ dataName = data.getName() dataName.append(Encryptor.NAME_COMPONENT_FOR).append(keyName) data.setName(dataName) algorithmType = params.getAlgorithmType() if (algorithmType == EncryptAlgorithmType.AesCbc or algorithmType == EncryptAlgorithmType.AesEcb): content = Encryptor._encryptSymmetric(payload, key, keyName, params) data.setContent(content.wireEncode(TlvWireFormat.get())) elif (algorithmType == EncryptAlgorithmType.RsaPkcs or algorithmType == EncryptAlgorithmType.RsaOaep): # Cryptography doesn't have a direct way to get the maximum plain text # size, so try to encrypt the payload first and catch the error if # it is too big. try: content = Encryptor._encryptAsymmetric(payload, key, keyName, params) data.setContent(content.wireEncode(TlvWireFormat.get())) return except ValueError as ex: message = ex.args[0] if not ("Data too long for key size" in message): raise ex # Else the payload is larger than the maximum plaintext size. Continue. # 128-bit nonce. nonceKeyBuffer = bytearray(16) for i in range(16): nonceKeyBuffer[i] = _systemRandom.randint(0, 0xff) nonceKey = Blob(nonceKeyBuffer, False) nonceKeyName = Name(keyName) nonceKeyName.append("nonce") symmetricParams = EncryptParams( EncryptAlgorithmType.AesCbc, AesAlgorithm.BLOCK_SIZE) nonceContent = Encryptor._encryptSymmetric( payload, nonceKey, nonceKeyName, symmetricParams) payloadContent = Encryptor._encryptAsymmetric( nonceKey, key, keyName, params) nonceContentEncoding = nonceContent.wireEncode() payloadContentEncoding = payloadContent.wireEncode() content = bytearray( nonceContentEncoding.size() + payloadContentEncoding.size()) content[0:payloadContentEncoding.size()] = payloadContentEncoding.buf() content[payloadContentEncoding.size():] = nonceContentEncoding.buf() data.setContent(Blob(content, False)) else: raise RuntimeError("Unsupported encryption method")
@staticmethod def _encryptSymmetric(payload, key, keyName, params): """ Encrypt the payload using the symmetric key according to params, and return an EncryptedContent. :param Blob payload: The data to encrypt. :param Blob key: The key value. :param Name keyName: The key name for the EncryptedContent key locator. :param EncryptParams params: The parameters for encryption. :return: A new EncryptedContent. :rtype: EncryptedContent """ algorithmType = params.getAlgorithmType() initialVector = params.getInitialVector() keyLocator = KeyLocator() keyLocator.setType(KeyLocatorType.KEYNAME) keyLocator.setKeyName(keyName) if (algorithmType == EncryptAlgorithmType.AesCbc or algorithmType == EncryptAlgorithmType.AesEcb): if (algorithmType == EncryptAlgorithmType.AesCbc): if initialVector.size() != AesAlgorithm.BLOCK_SIZE: raise RuntimeError("incorrect initial vector size") encryptedPayload = AesAlgorithm.encrypt(key, payload, params) result = EncryptedContent() result.setAlgorithmType(algorithmType) result.setKeyLocator(keyLocator) result.setPayload(encryptedPayload) result.setInitialVector(initialVector) return result else: raise RuntimeError("Unsupported encryption method") @staticmethod def _encryptAsymmetric(payload, key, keyName, params): """ Encrypt the payload using the asymmetric key according to params, and return an EncryptedContent. :param Blob payload: The data to encrypt. The size should be within range of the key. :param Blob key: The key value. :param Name keyName: The key name for the EncryptedContent key locator. :param EncryptParams params: The parameters for encryption. :return: A new EncryptedContent. :rtype: EncryptedContent """ algorithmType = params.getAlgorithmType() keyLocator = KeyLocator() keyLocator.setType(KeyLocatorType.KEYNAME) keyLocator.setKeyName(keyName) if (algorithmType == EncryptAlgorithmType.RsaPkcs or algorithmType == EncryptAlgorithmType.RsaOaep): encryptedPayload = RsaAlgorithm.encrypt(key, payload, params) result = EncryptedContent() result.setAlgorithmType(algorithmType) result.setKeyLocator(keyLocator) result.setPayload(encryptedPayload) return result else: raise RuntimeError("Unsupported encryption method") # Import these at the end of the file to avoid circular references.
from pyndn.encrypt.algo.aes_algorithm import AesAlgorithm from pyndn.encrypt.algo.rsa_algorithm import RsaAlgorithm