Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python xmlsec library support #961

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 207 additions & 58 deletions src/saml2/sigver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" Functions connected to signing and verifying.
Based on the use of xmlsec1 binaries and not the python xmlsec module.
Based on the use of xmlsec1 binaries and/or the python xmlsec module.
"""

import base64
Expand Down Expand Up @@ -878,16 +878,20 @@ class CryptoBackendXMLSecurity(CryptoBackend):
CryptoBackend implementation using pyXMLSecurity to sign and verify
XML documents.

Encrypt and decrypt is currently unsupported by pyXMLSecurity.

pyXMLSecurity uses lxml (libxml2) to parse XML data, but otherwise
try to get by with native Python code. It does native Python RSA
signatures, or alternatively PyKCS11 to offload cryptographic work
to an external PKCS#11 module.
This implementation is hypothetically more efficient than the CryptoBackendXmlSec1
implementation, but is less tested and as such is not yet the default option. To
enable it, set .crypto_backend = "XMLSecurity" in your Saml2Client initializer's
first argument.
"""

def __init__(self):
__DEBUG = 0

def __init__(self, **kwargs):
CryptoBackend.__init__(self)
try:
self.non_xml_crypto = RSACrypto(kwargs["rsa_key"])
except KeyError:
pass

@property
def version(self):
Expand All @@ -897,25 +901,155 @@ def version(self):
except (ImportError, AttributeError):
return "0.0.0"

def encrypt(self, text, recv_key, template, session_key_type, xpath=""):
"""

:param text: The text to be compiled
:param recv_key: Filename of a file where the key resides
:param template: Filename of a file with the pre-encryption part
:param session_key_type: Type and size of a new session key
'des-192' generates a new 192 bits DES key for DES3 encryption
:param xpath: What should be encrypted
:return:
"""
logger.debug("Encryption input len: %d", len(text))

import lxml.etree
import xmlsec

manager = xmlsec.KeysManager()
key = xmlsec.Key.from_file(
recv_key,
xmlsec.constants.KeyDataFormatCertPem,
None
)
manager.add_key(key)

template = lxml.etree.parse(template).getroot()
enc_ctx = xmlsec.EncryptionContext(manager)
if session_key_type == "des-192": # TODO: Will need to be expanded when additional key type support is added
enc_ctx.key = xmlsec.Key.generate(
xmlsec.constants.KeyDataDes,
192,
xmlsec.constants.KeyDataTypeSession
)
data = lxml.etree.fromstring(text)
if xpath:
data = data.xpath(xpath)[0]
enc_data = enc_ctx.encrypt_xml(template, data)

if xpath:
# Hack to fix deletion of duplicated xmlns:ns1 entry
# Could potentially be fixed in CryptoBackendXmlSec1, as duplicated namespace attributes are not recommended
result = lxml.etree.fromstring(text)
result.replace(result.xpath(xpath)[0], lxml.etree.fromstring("<REPLACE/>"))
result = str(lxml.etree.tostring(result), encoding="utf-8")
result = str(lxml.etree.tostring(enc_data), encoding="utf-8").join(result.split("<REPLACE/>"))
else:
result = str(lxml.etree.tostring(enc_data), encoding="utf-8")

return "<?xml version=\"1.0\"?>" + result # Hack to keep version tags identical, otherwise would have encoding attribute

def encrypt_assertion(self, statement, enc_key, template, key_type="des-192", node_xpath=None, node_id=None):
"""
Will encrypt an assertion

:param statement: A XML document that contains the assertion to encrypt
:param enc_key: File name of a file containing the encryption key
:param template: A template for the encryption part to be added.
:param key_type: The type of session key to use.
:return: The encrypted text
"""
import lxml.etree
import xmlsec

if isinstance(statement, SamlBase):
statement = pre_encrypt_assertion(statement)

if not node_xpath:
node_xpath = ASSERT_XPATH

manager = xmlsec.KeysManager()
key = xmlsec.Key.from_file(
enc_key,
xmlsec.constants.KeyDataFormatCertPem,
None
)
manager.add_key(key)

template = lxml.etree.parse(template).getroot()
enc_ctx = xmlsec.EncryptionContext(manager)
enc_ctx.key = xmlsec.Key.generate(
xmlsec.constants.KeyDataAes if key_type.startswith("aes") else xmlsec.constants.KeyDataDes,
int(key_type[-3:]) if len(key_type) >= 3 and key_type[-3:].isdigit() else 192,
xmlsec.constants.KeyDataTypeSession
)
data = lxml.etree.fromstring(statement).xpath(node_xpath)[0]
enc_data = enc_ctx.encrypt_xml(template, data)

# Hack to fix deletion of duplicated xmlns:ns1 entry
# Could potentially be fixed in CryptoBackendXmlSec1, as duplicated namespace attributes are not recommended
result = lxml.etree.fromstring(statement)
result.replace(result.xpath(node_xpath)[0], lxml.etree.fromstring("<REPLACE/>"))
result = str(lxml.etree.tostring(result), encoding="utf-8")
result = str(lxml.etree.tostring(enc_data), encoding="utf-8").join(result.split("<REPLACE/>"))

return "<?xml version=\"1.0\"?>" + result # Hack to keep version tags identical, otherwise would have encoding attribute

def decrypt(self, enctext, key_file):
"""

:param enctext: XML document containing an encrypted part
:param key_file: The key to use for the decryption
:return: The decrypted document
"""
logger.debug("Decrypt input len: %d", len(enctext))

import lxml.etree
import xmlsec

manager = xmlsec.KeysManager()
key = xmlsec.Key.from_file(
key_file,
xmlsec.constants.KeyDataFormatPem,
None
)
manager.add_key(key)

enc_ctx = xmlsec.EncryptionContext(manager)
enc_data = xmlsec.tree.find_child(lxml.etree.fromstring(enctext), xmlsec.constants.NodeEncryptedData, xmlsec.constants.EncNs)
decrypted = enc_ctx.decrypt(enc_data)
result = lxml.etree.fromstring(enctext)
result.replace(xmlsec.tree.find_child(result, xmlsec.constants.NodeEncryptedData, xmlsec.constants.EncNs), decrypted)
result = str(lxml.etree.tostring(result), encoding="utf-8")

return "<?xml version=\"1.0\"?>" + result # Hack to keep version tags identical, otherwise would have encoding attribute

def sign_statement(self, statement, node_name, key_file, node_id):
"""
Sign an XML statement.

The parameters actually used in this CryptoBackend
implementation are :

:param statement: XML as string
:param node_name: Name of the node to sign
:param key_file: xmlsec key_spec string(), filename,
'pkcs11://' URI or PEM data
:returns: Signed XML as string
:param statement: The statement to be signed
:param node_name: string like 'urn:oasis:names:...:Assertion'
:param key_file: The file where the key can be found
:param node_id: (not needed given xmlsec.tree.find_node)
:return: The signed statement
"""
import lxml.etree
import xmlsec

xml = xmlsec.parse_xml(statement)
signed = xmlsec.sign(xml, key_file)
signed_str = lxml.etree.tostring(signed, xml_declaration=False, encoding="UTF-8")
if isinstance(statement, SamlBase):
statement = str(statement)
template = lxml.etree.fromstring(statement)

source_node = xmlsec.tree.find_node(template, node_name.split(':')[-1], ":".join(node_name.split(":")[:-1]))
signature_node = xmlsec.tree.find_node(source_node, xmlsec.constants.NodeSignature)
ctx = xmlsec.SignatureContext()
ctx.key = xmlsec.Key.from_file(key_file, xmlsec.constants.KeyDataFormatPem)
ctx.register_id(source_node, "ID")

ctx.sign(signature_node)
signed_str = lxml.etree.tostring(template, xml_declaration=False, encoding="UTF-8")
if not isinstance(signed_str, str):
signed_str = signed_str.decode("utf-8")
return signed_str
Expand All @@ -924,24 +1058,35 @@ def validate_signature(self, signedtext, cert_file, cert_type, node_name, node_i
"""
Validate signature on XML document.

The parameters actually used in this CryptoBackend
implementation are :

:param signedtext: The signed XML data as string
:param cert_file: xmlsec key_spec string(), filename,
'pkcs11://' URI or PEM data
:param cert_type: string, must be 'pem' for now
:returns: True on successful validation, False otherwise
:param signedtext: The XML document as a string
:param cert_file: The public key that was used to sign the document
:param cert_type: The file type of the certificate
:param node_name: The name of the class that is signed
:param node_id: The identifier of the node (not needed given xmlsec.tree.find_node)
:return: Boolean True if the signature was correct otherwise False.
"""
if cert_type != "pem":
raise Unsupported("Only PEM certs supported here")

import lxml.etree
import xmlsec

xml = xmlsec.parse_xml(signedtext)
if not isinstance(signedtext, bytes):
signedtext = signedtext.encode("utf-8")
template = lxml.etree.fromstring(signedtext)
xmlsec.tree.add_ids(template, ["ID"])
source_node = xmlsec.tree.find_node(template, node_name.split(':')[-1], ":".join(node_name.split(":")[:-1]))
signature_node = xmlsec.tree.find_node(source_node, xmlsec.constants.NodeSignature)

ctx = xmlsec.SignatureContext()
if cert_type == "pem":
ctx.key = xmlsec.Key.from_file(cert_file, xmlsec.constants.KeyDataFormatCertPem)
elif cert_type == "der":
ctx.key = xmlsec.Key.from_file(cert_file, xmlsec.constants.KeyDataFormatCertDer)
else:
ctx.key = xmlsec.Key.from_file(cert_file, xmlsec.constants.KeyDataFormatUnknown)
ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])

try:
return xmlsec.verify(xml, cert_file)
ctx.verify(signature_node)
return True
except xmlsec.XMLSigException:
return False

Expand Down Expand Up @@ -992,7 +1137,23 @@ def security_context(conf):
sec_backend = RSACrypto(rsa_key)
elif conf.crypto_backend == "XMLSecurity":
# new and somewhat untested pyXMLSecurity crypto backend.
try:
import xmlsec
except ImportError:
logger.error(f"Python xmlsec library not found")
raise

crypto = CryptoBackendXMLSecurity()

_file_name = conf.getattr("key_file", "")
if _file_name:
try:
rsa_key = import_rsa_key_from_file(_file_name)
except Exception as err:
logger.error(f"Cannot import key from {_file_name}: {err}")
raise
else:
sec_backend = RSACrypto(rsa_key)
else:
err_msg = "Unknown crypto_backend {backend}"
err_msg = err_msg.format(backend=conf.crypto_backend)
Expand Down Expand Up @@ -1814,32 +1975,20 @@ def pre_signature_part(
return signature


# <?xml version="1.0" encoding="UTF-8"?>
# <EncryptedData Id="ED" Type="http://www.w3.org/2001/04/xmlenc#Element"
# xmlns="http://www.w3.org/2001/04/xmlenc#">
# <EncryptionMethod Algorithm="http://www.w3
# .org/2001/04/xmlenc#tripledes-cbc"/>
# <ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
# <EncryptedKey Id="EK" xmlns="http://www.w3.org/2001/04/xmlenc#">
# <EncryptionMethod Algorithm="http://www.w3
# .org/2001/04/xmlenc#rsa-1_5"/>
# <ds:KeyInfo xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
# <ds:KeyName>my-rsa-key</ds:KeyName>
# </ds:KeyInfo>
# <CipherData>
# <CipherValue>
# </CipherValue>
# </CipherData>
# <ReferenceList>
# <DataReference URI="#ED"/>
# </ReferenceList>
# </EncryptedKey>
# </ds:KeyInfo>
# <CipherData>
# <CipherValue>
# </CipherValue>
# </CipherData>
# </EncryptedData>
#<ns0:EncryptedData xmlns:ns0="http://www.w3.org/2001/04/xmlenc#" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" Id="ED_..." Type="http://www.w3.org/2001/04/xmlenc#Element">
# <ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#tripledes-cbc" />
# <ns1:KeyInfo>
# <ns0:EncryptedKey Id="EK_...">
# <ns0:EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p" />
# <ns0:CipherData>
# <ns0:CipherValue />
# </ns0:CipherData>
# </ns0:EncryptedKey>
# </ns1:KeyInfo>
# <ns0:CipherData>
# <ns0:CipherValue />
# </ns0:CipherData>
#</ns0:EncryptedData>


def pre_encryption_part(
Expand Down
Loading