Source code for pyndn.security.policy.policy_manager

# -*- Mode:python; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
#
# Copyright (C) 2014-2016 Regents of the University of California.
# Author: Jeff Thompson <jefft0@remap.ucla.edu>
# From ndn-cxx security by Yingdi Yu <yingdi@cs.ucla.edu>.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# A copy of the GNU Lesser General Public License is in the file COPYING.

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding, ec
from cryptography.hazmat.primitives.serialization import load_der_public_key
from cryptography.hazmat.primitives import hashes
from cryptography.exceptions import InvalidSignature
from pyndn.digest_sha256_signature import DigestSha256Signature
from pyndn.sha256_with_rsa_signature import Sha256WithRsaSignature
from pyndn.sha256_with_ecdsa_signature import Sha256WithEcdsaSignature
from pyndn.security.security_exception import SecurityException

"""
This module defines the PolicyManager class which is an abstract base class to
represent the policy for verifying data packets. You must create an object of a
subclass.
"""

[docs]class PolicyManager(object):
[docs] def skipVerifyAndTrust(self, dataOrInterest): """ Check if the received data packet or signed interest can escape from verification and be trusted as valid. Your derived class should override. :param dataOrInterest: The received data packet or interest. :type dataOrInterest: Data or Interest :return: True if the data or interest does not need to be verified to be trusted as valid, otherwise False. :rtype: bool :raises RuntimeError: for unimplemented if the derived class does not override. """ raise RuntimeError("skipVerifyAndTrust is not implemented")
[docs] def requireVerify(self, dataOrInterest): """ Check if this PolicyManager has a verification rule for the received data packet or signed interest. Your derived class should override. :param dataOrInterest: The received data packet or interest. :type dataOrInterest: Data or Interest :return: True if the data or interest must be verified, otherwise False. :rtype: bool :raises RuntimeError: for unimplemented if the derived class does not override. """ raise RuntimeError("requireVerify is not implemented")
[docs] def checkVerificationPolicy(self, dataOrInterest, stepCount, onVerified, onVerifyFailed, wireFormat = None): """ Check whether the received data packet complies with the verification policy, and get the indication of the next verification step. Your derived class should override. :param dataOrInterest: The Data object or interest with the signature to check. :type dataOrInterest: Data or Interest :param int stepCount: The number of verification steps that have been done, used to track the verification progress. :param onVerified: If the signature is verified, this calls onVerified(dataOrInterest). NOTE: The library will log any exceptions raised by this callback, but for better error handling the callback should catch and properly handle any exceptions. :type onVerified: function object :param onVerifyFailed: If the signature check fails, this calls onVerifyFailed(dataOrInterest). NOTE: The library will log any exceptions raised by this callback, but for better error handling the callback should catch and properly handle any exceptions. :type onVerifyFailed: function object :return: The indication of next verification step, or None if there is no further step. :rtype: ValidationRequest :raises RuntimeError: for unimplemented if the derived class does not override. """ raise RuntimeError("checkVerificationPolicy is not implemented")
[docs] def checkSigningPolicy(self, dataName, certificateName): """ Check if the signing certificate name and data name satisfy the signing policy. Your derived class should override. :param Name dataName: The name of data to be signed. :param Name certificateName: The name of signing certificate. :return: True if the signing certificate can be used to sign the data, otherwise False. :rtype: bool :raises RuntimeError: for unimplemented if the derived class does not override. """ raise RuntimeError("checkSigningPolicy is not implemented")
[docs] def inferSigningIdentity(self, dataName): """ Infer the signing identity name according to the policy. If the signing identity cannot be inferred, return an empty name. Your derived class should override. :param Name dataName: The name of data to be signed. :return: The signing identity or an empty name if cannot infer. :rtype: Name :raises RuntimeError: for unimplemented if the derived class does not override. """ raise RuntimeError("inferSigningIdentity is not implemented")
@staticmethod
[docs] def verifySignature(signature, signedBlob, publicKeyDer): """ Check the type of signature and use the publicKeyDer to verify the signedBlob using the appropriate signature algorithm. :param Blob signature: An object of a subclass of Signature, e.g. Sha256WithRsaSignature. :param SignedBlob signedBlob: the SignedBlob with the signed portion to verify. :param Blob publicKeyDer: The DER-encoded public key used to verify the signature. This is ignored if the signature type does not require a public key. :return: True if the signature verifies, False if not. :rtype: bool :raises: SecurityException if the signature type is not recognized or if publicKeyDer can't be decoded. """ if isinstance(signature, Sha256WithRsaSignature): if publicKeyDer.isNull(): return False return PolicyManager._verifySha256WithRsaSignature( signature.getSignature(), signedBlob, publicKeyDer) elif isinstance(signature, Sha256WithEcdsaSignature): if publicKeyDer.isNull(): return False return PolicyManager._verifySha256WithEcdsaSignature( signature.getSignature(), signedBlob, publicKeyDer) elif isinstance(signature, DigestSha256Signature): return PolicyManager._verifyDigestSha256Signature( signature.getSignature(), signedBlob) else: raise SecurityException( "PolicyManager.verify: Signature type is unknown")
@staticmethod def _verifySha256WithRsaSignature(signature, signedBlob, publicKeyDer): """ Verify the RSA signature on the SignedBlob using the given public key. :param Blob signature: The signature bits. :param SignedBlob signedBlob: the SignedBlob with the signed portion to verify. :param Blob publicKeyDer: The DER-encoded public key used to verify the signature. :return: True if the signature verifies, False if not. :rtype: bool """ # Get the public key. publicKeyDerBytes = publicKeyDer.toBytes() try: publicKey = load_der_public_key( publicKeyDerBytes, backend = default_backend()) except: raise SecurityException("Cannot decode the RSA public key") # Verify. verifier = publicKey.verifier( signature.toBytes(), padding.PKCS1v15(), hashes.SHA256()) verifier.update(signedBlob.toSignedBytes()) try: verifier.verify() return True except InvalidSignature: return False @staticmethod def _verifySha256WithEcdsaSignature(signature, signedBlob, publicKeyDer): """ Verify the ECDSA signature on the SignedBlob using the given public key. :param Blob signature: The signature bits. :param SignedBlob signedBlob: the SignedBlob with the signed portion to verify. :param Blob publicKeyDer: The DER-encoded public key used to verify the signature. :return: True if the signature verifies, False if not. :rtype: bool """ # Get the public key. publicKeyDerBytes = publicKeyDer.toBytes() try: publicKey = load_der_public_key( publicKeyDerBytes, backend = default_backend()) except: raise SecurityException("Cannot decode the ECDSA public key") # Verify. verifier = publicKey.verifier( signature.toBytes(), ec.ECDSA(hashes.SHA256())) verifier.update(signedBlob.toSignedBytes()) try: verifier.verify() return True except InvalidSignature: return False @staticmethod def _verifyDigestSha256Signature(signature, signedBlob): """ Verify the DigestSha256 signature on the SignedBlob by verifying that the digest of SignedBlob equals the signature. :param Blob signature: The signature bits. :param SignedBlob signedBlob: the SignedBlob with the signed portion to verify. :return: True if the signature verifies, False if not. :rtype: bool """ # Get the hash of the bytes to verify. sha256 = hashes.Hash(hashes.SHA256(), backend=default_backend()) sha256.update(signedBlob.toSignedBytes()) signedPortionDigest = sha256.finalize() return signature.toBytes() == signedPortionDigest