diff --git a/pytrustnfe/servicos/assinatura.py b/pytrustnfe/servicos/assinatura.py index 2d3b9c9..a42adab 100644 --- a/pytrustnfe/servicos/assinatura.py +++ b/pytrustnfe/servicos/assinatura.py @@ -5,14 +5,21 @@ Created on Jun 14, 2015 @author: danimar ''' -import xmlsec -import libxml2 -import os.path from signxml import xmldsig from signxml import methods from lxml import etree +from OpenSSL import crypto -NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#' + +def extract_cert_and_key_from_pfx(pfx, password): + pfx = crypto.load_pkcs12(pfx, password) + # PEM formatted private key + key = crypto.dump_privatekey(crypto.FILETYPE_PEM, + pfx.get_privatekey()) + # PEM formatted certificate + cert = crypto.dump_certificate(crypto.FILETYPE_PEM, + pfx.get_certificate()) + return cert, key def recursively_empty(e): @@ -21,7 +28,7 @@ def recursively_empty(e): return all((recursively_empty(c) for c in e.iterchildren())) -def assinar(xml, cert, key, reference, pfx, senha): +def assinar(xml, cert, key, reference): context = etree.iterwalk(xml) for action, elem in context: parent = elem.getparent() @@ -43,87 +50,4 @@ def assinar(xml, cert, key, reference, pfx, senha): c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315') xmldsig(signed_root, digest_algorithm=u'sha1').verify(x509_cert=cert) - # signature = Assinatura(pfx, senha) - # xmlsec = signature.assina_xml(not_signed, reference) - # xmlsec = xmlsec.replace(""" -# ]>\n""", "") return etree.tostring(signed_root) - # , xmlsec - - -class Assinatura(object): - - def __init__(self, arquivo, senha): - self.arquivo = arquivo - self.senha = senha - - def _checar_certificado(self): - if not os.path.isfile(self.arquivo): - raise Exception('Caminho do certificado não existe.') - - def _inicializar_cripto(self): - libxml2.initParser() - libxml2.substituteEntitiesDefault(1) - - xmlsec.init() - xmlsec.cryptoAppInit(None) - xmlsec.cryptoInit() - - def _finalizar_cripto(self): - xmlsec.cryptoShutdown() - xmlsec.cryptoAppShutdown() - xmlsec.shutdown() - - libxml2.cleanupParser() - - def assina_xml(self, xml, reference): - self._checar_certificado() - self._inicializar_cripto() - try: - doc_xml = libxml2.parseMemory(xml.encode('utf-8'), - len(xml.encode('utf-8'))) - import ipdb; ipdb.set_trace() - signNode = xmlsec.TmplSignature(doc_xml, - xmlsec.transformInclC14NId(), - xmlsec.transformRsaSha1Id(), None) - - doc_xml.getRootElement().addChild(signNode) - refNode = signNode.addReference( - xmlsec.transformSha1Id(), - None, reference, None) - - refNode.addTransform(xmlsec.transformEnvelopedId()) - refNode.addTransform(xmlsec.transformInclC14NId()) - keyInfoNode = signNode.ensureKeyInfo() - keyInfoNode.addX509Data() - - dsig_ctx = xmlsec.DSigCtx() - chave = xmlsec.cryptoAppKeyLoad( - filename=str(self.arquivo), - format=xmlsec.KeyDataFormatPkcs12, - pwd=str(self.senha), pwdCallback=None, pwdCallbackCtx=None) - - dsig_ctx.signKey = chave - dsig_ctx.sign(signNode) - - status = dsig_ctx.status - dsig_ctx.destroy() - - if status != xmlsec.DSigStatusSucceeded: - raise RuntimeError( - 'Erro ao realizar a assinatura do arquivo; status: "' + - str(status) + '"') - - xpath = doc_xml.xpathNewContext() - xpath.xpathRegisterNs('sig', NAMESPACE_SIG) - certs = xpath.xpathEval('//sig:X509Data/sig:X509Certificate') - for i in range(len(certs)-1): - certs[i].unlinkNode() - certs[i].freeNode() - - xml = doc_xml.serialize() - return xml - finally: - doc_xml.freeDoc() - self._finalizar_cripto() diff --git a/pytrustnfe/servicos/comunicacao.py b/pytrustnfe/servicos/comunicacao.py index 99ed9d9..be5b06b 100644 --- a/pytrustnfe/servicos/comunicacao.py +++ b/pytrustnfe/servicos/comunicacao.py @@ -5,6 +5,9 @@ Created on Jun 14, 2015 @author: danimar ''' +import suds.client +import suds_requests +import requests from lxml import objectify from uuid import uuid4 from pytrustnfe.HttpClient import HttpClient @@ -25,8 +28,29 @@ class Comunicacao(object): tag_retorno = '' def __init__(self, cert, key): - self.certificado = cert - self.senha = key + self.cert = cert + self.key = key + + def _get_client(self, base_url): + cache_location = '/tmp/suds' + cache = suds.cache.DocumentCache(location=cache_location) + + f = open('/tmp/suds/cert_nfe.cer', 'w') + f.write(self.cert) + f.close() + f = open('/tmp/suds/key_nfe.cer', 'w') + f.write(self.key) + f.close() + session = requests.Session() + session.verify = False + session.cert = ('/tmp/suds/cert_nfe.cer', + '/tmp/suds/key_nfe.cer') + + return suds.client.Client( + base_url, + cache=cache, + transport=suds_requests.RequestsTransport(session) + ) def _soap_xml(self, body): xml = ''' diff --git a/pytrustnfe/servicos/nfe_autorizacao.py b/pytrustnfe/servicos/nfe_autorizacao.py index d7595d6..dfc95b4 100644 --- a/pytrustnfe/servicos/nfe_autorizacao.py +++ b/pytrustnfe/servicos/nfe_autorizacao.py @@ -5,6 +5,7 @@ Created on 21/06/2015 @author: danimar ''' from lxml import etree +from suds.sax.element import Element from pytrustnfe.servicos.comunicacao import Comunicacao from pytrustnfe import utils from pytrustnfe.xml import render_xml @@ -13,43 +14,30 @@ from pytrustnfe.servicos.assinatura import assinar class NfeAutorizacao(Comunicacao): - def __init__(self, cert, key, certificado, senha): - Comunicacao.__init__(self, certificado, senha) - self.cert = cert - self.key = key + def __init__(self, cert, key): + Comunicacao.__init__(self, cert, key) - def autorizar_nfe(self, nfe): + def autorizar_nfe(self, nfe, id): self._validar_nfe(nfe) xml = render_xml('nfeEnv.xml', **nfe) - self.metodo = 'NFeAutorizacao' - self.tag_retorno = 'retEnviNFe' - self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' - self.url = 'nfe.sefazrs.rs.gov.br' + xml_signed = assinar(xml, self.cert, self.key, '#%s' % id) - return self._executar_consulta(xml) + client = self._get_client( + 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx?wsdl') - def autorizar_nfe_e_recibo(self, nfe, id): - self._validar_nfe(nfe) - xml = render_xml('nfeEnv.xml', **nfe) - - return assinar(xml, self.cert, self.key, - '#%s' % id, - self.certificado, self.senha) + cabecalho = client.factory.create('nfeCabecMsg') + cabecalho.cUF = '43' + cabecalho.versaoDados = '3.10' + client.set_options(soapheaders=cabecalho) - self.metodo = 'NFeAutorizacao' - self.tag_retorno = 'retEnviNFe' - self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' - self.url = 'nfe.sefazrs.rs.gov.br' - - xml_recibo, recibo = self._executar_consulta(xml) - - consulta_recibo = utils.gerar_consulta_recibo(recibo) - self._validar_nfe(nfe) + resposta = client.service.nfeAutorizacaoLote(xml_signed) + print client.last_sent() + print client.last_received() - self.metodo = 'NFeRetAutorizacao' - self.tag_retorno = 'retConsReciNFe' - self.web_service = 'ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx' - self.url = 'nfe.sefazrs.rs.gov.br' + consulta_recibo = utils.gerar_consulta_recibo(resposta) - return self._executar_consulta(xml), consulta_recibo + client = self._get_client( + 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx' + ) + return client.service.nfeRetAutorizacao(consulta_recibo) diff --git a/pytrustnfe/utils.py b/pytrustnfe/utils.py index 933e4e3..264e658 100644 --- a/pytrustnfe/utils.py +++ b/pytrustnfe/utils.py @@ -19,11 +19,7 @@ def datetime_tostring(data): def gerar_consulta_recibo(recibo): - c = DynamicXml('consReciNFe') - c(xmlns="http://www.portalfiscal.inf.br/nfe", versao="2.00") - c.tpAmb = recibo.tpAmb - c.nRec = recibo.infRec.nRec - return c + return {'tpAmb': recibo.tpAmb, 'nRec': recibo.infRec.nRec} def gerar_chave(obj_chave, prefix=None):