|
|
|
@ -5,44 +5,30 @@ |
|
|
|
import os |
|
|
|
import logging |
|
|
|
import suds |
|
|
|
from OpenSSL import crypto |
|
|
|
from base64 import b64encode, b64decode |
|
|
|
from uuid import uuid4 |
|
|
|
from lxml import etree |
|
|
|
from pytrustnfe.xml import render_xml, valida_schema, sanitize_response |
|
|
|
from pytrustnfe.client import get_authenticated_client |
|
|
|
from pytrustnfe.certificado import extract_cert_and_key_from_pfx, save_cert_key |
|
|
|
from pytrustnfe.nfe.assinatura import Assinatura |
|
|
|
|
|
|
|
|
|
|
|
from signxml import XMLSigner |
|
|
|
from signxml import methods |
|
|
|
|
|
|
|
|
|
|
|
def sign_xml(xml, cert, key): |
|
|
|
parser = etree.XMLParser(remove_blank_text=True, remove_comments=True) |
|
|
|
elem = etree.fromstring(xml, parser=parser) |
|
|
|
|
|
|
|
root = etree.Element('root') |
|
|
|
rps = elem.find('RPS') |
|
|
|
|
|
|
|
signer = XMLSigner( |
|
|
|
digest_algorithm=u'sha1', signature_algorithm="rsa-sha1", |
|
|
|
method=methods.enveloped, |
|
|
|
c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315') |
|
|
|
ns = {} |
|
|
|
ns[None] = signer.namespaces['ds'] |
|
|
|
signer.namespaces = ns |
|
|
|
signed_root = signer.sign(rps, key=str(key), cert=cert) |
|
|
|
|
|
|
|
root.append( |
|
|
|
signed_root.find('{http://www.w3.org/2000/09/xmldsig#}Signature')) |
|
|
|
elem.remove(rps) |
|
|
|
elem.append(signed_root) |
|
|
|
elem.append(root.find('{http://www.w3.org/2000/09/xmldsig#}Signature')) |
|
|
|
return etree.tostring(elem) |
|
|
|
def sign_tag(certificado, **kwargs): |
|
|
|
pkcs12 = crypto.load_pkcs12(certificado.pfx, certificado.password) |
|
|
|
key = pkcs12.get_privatekey() |
|
|
|
for item in kwargs['nfse']['lista_rps']: |
|
|
|
signed = crypto.sign(key, item['assinatura'], 'SHA1') |
|
|
|
item['assinatura'] = b64encode(signed) |
|
|
|
|
|
|
|
|
|
|
|
def _send(certificado, method, **kwargs): |
|
|
|
# A little hack to test |
|
|
|
path = os.path.join(os.path.dirname(__file__), 'templates') |
|
|
|
if method == 'TesteEnvioLoteRPS' or method == 'EnvioLoteRPS': |
|
|
|
sign_tag(certificado, **kwargs) |
|
|
|
|
|
|
|
if method == 'TesteEnvioLoteRPS': |
|
|
|
xml = render_xml(path, 'EnvioLoteRPS.xml', **kwargs) |
|
|
|
else: |
|
|
|
@ -54,7 +40,9 @@ def _send(certificado, method, **kwargs): |
|
|
|
cert_path, key_path = save_cert_key(cert, key) |
|
|
|
client = get_authenticated_client(base_url, cert_path, key_path) |
|
|
|
|
|
|
|
xml_signed = sign_xml(xml, cert, key) |
|
|
|
pfx_path = certificado.save_pfx() |
|
|
|
signer = Assinatura(pfx_path, certificado.password) |
|
|
|
xml_signed = signer.assina_xml(xml, '') |
|
|
|
|
|
|
|
try: |
|
|
|
response = getattr(client.service, method)(1, xml_signed) |
|
|
|
|