From ad14364765dac0c18cc9bc8a7554514c8f1e2a23 Mon Sep 17 00:00:00 2001 From: Danimar Date: Sun, 7 Aug 2016 04:36:02 -0300 Subject: [PATCH] =?UTF-8?q?Enviando=20o=20xml=20via=20requests=20e=20monta?= =?UTF-8?q?ndo=20o=20cabe=C3=A7alho=20soap=20manualmente=20n=C3=A3o=20tive?= =?UTF-8?q?=20sucesso=20usando=20suds?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytrustnfe/HttpClient.py | 30 +++++-------- pytrustnfe/servicos/assinatura.py | 13 +++--- pytrustnfe/servicos/comunicacao.py | 82 +++++++++------------------------- pytrustnfe/servicos/nfe_autorizacao.py | 46 ++++++++----------- 4 files changed, 59 insertions(+), 112 deletions(-) diff --git a/pytrustnfe/HttpClient.py b/pytrustnfe/HttpClient.py index 80f17eb..a2ee8fe 100644 --- a/pytrustnfe/HttpClient.py +++ b/pytrustnfe/HttpClient.py @@ -4,34 +4,28 @@ Created on Jun 16, 2015 @author: danimar ''' -from httplib import HTTPSConnection +import requests class HttpClient(object): - def __init__(self, url, chave_pem, certificado_pem): + def __init__(self, url, cert_path, key_path): self.url = url - self.chave_pem = chave_pem - self.certificado_pem = certificado_pem + self.cert_path = cert_path + self.key_path = key_path def _headers(self): return { - u'Content-type': u'application/soap+xml; charset=utf-8', - u'Accept': u'application/soap+xml; charset=utf-8' + u'Content-type': u'application/soap+xml; charset=utf-8; action="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao/nfeAutorizacaoLote', + u'Accept': u'application/soap+xml; charset=utf-8' } - def post_xml(self, post, xml): - - conexao = HTTPSConnection(self.url, '443', key_file=self.chave_pem, - cert_file=self.certificado_pem) - + def post_xml(self, post, xml): try: - conexao.request(u'POST', post, xml, self._headers()) - response = conexao.getresponse() - if response.status == 200: - return response.read() - return response.read() + url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx' + res = requests.post(url, data=xml, cert=(self.cert_path, self.key_path), + verify=False, headers=self._headers()) + return res.text except Exception as e: print(str(e)) - finally: - conexao.close() + diff --git a/pytrustnfe/servicos/assinatura.py b/pytrustnfe/servicos/assinatura.py index 6a0a0a6..7b29e6f 100644 --- a/pytrustnfe/servicos/assinatura.py +++ b/pytrustnfe/servicos/assinatura.py @@ -5,7 +5,7 @@ Created on Jun 14, 2015 @author: danimar ''' -from signxml import xmldsig +from signxml import XMLSigner from signxml import methods from lxml import etree from OpenSSL import crypto @@ -36,14 +36,13 @@ def assinar(xml, cert, key, reference): parent.remove(elem) # element = xml.find('{' + xml.nsmap[None] + '}NFe') - signer = xmldsig(xml, digest_algorithm=u'sha1') + signer = XMLSigner(digest_algorithm=u'sha1',signature_algorithm="rsa-sha1", + method=methods.enveloped, + c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315') ns = {} ns[None] = signer.namespaces['ds'] signer.namespaces = ns - signed_root = signer.sign( - key=str(key), cert=cert, reference_uri=reference, - algorithm="rsa-sha1", method=methods.enveloped, - c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315') + signed_root = signer.sign(xml, key=str(key), cert=cert, reference_uri=reference) - xmldsig(signed_root, digest_algorithm=u'sha1').verify(x509_cert=cert) + # XMLSigner(signed_root, digest_algorithm=u'sha1').verify(x509_cert=cert) return etree.tostring(signed_root) diff --git a/pytrustnfe/servicos/comunicacao.py b/pytrustnfe/servicos/comunicacao.py index be5b06b..a987214 100644 --- a/pytrustnfe/servicos/comunicacao.py +++ b/pytrustnfe/servicos/comunicacao.py @@ -13,7 +13,7 @@ from uuid import uuid4 from pytrustnfe.HttpClient import HttpClient from pytrustnfe.Certificado import converte_pfx_pem -from xml.dom.minidom import parseString +from ..xml import sanitize_response common_namespaces = {'soap': 'http://www.w3.org/2003/05/soap-envelope'} @@ -31,63 +31,35 @@ class Comunicacao(object): self.cert = cert self.key = key - def _get_client(self, base_url): - cache_location = '/tmp/suds' - cache = suds.cache.DocumentCache(location=cache_location) - - f = open('/tmp/suds/cert_nfe.cer', 'w') - f.write(self.cert) - f.close() - f = open('/tmp/suds/key_nfe.cer', 'w') - f.write(self.key) - f.close() - session = requests.Session() - session.verify = False - session.cert = ('/tmp/suds/cert_nfe.cer', - '/tmp/suds/key_nfe.cer') - - return suds.client.Client( - base_url, - cache=cache, - transport=suds_requests.RequestsTransport(session) - ) - def _soap_xml(self, body): - xml = ''' - - -422.00 - - - -' + body - xml += '' + xml = '' + xml += '' + xml += '' + xml += '433.10' + xml += '' + xml += body + xml += '' + return xml.rstrip('\n') def _preparar_temp_pem(self): - chave_temp = '/tmp/' + uuid4().hex - certificado_temp = '/tmp/' + uuid4().hex - - chave, certificado = converte_pfx_pem(self.certificado, self.senha) - arq_temp = open(chave_temp, 'w') - arq_temp.write(chave) + cert_path = '/tmp/' + uuid4().hex + key_path = '/tmp/' + uuid4().hex + + arq_temp = open(cert_path, 'w') + arq_temp.write(self.cert) arq_temp.close() - arq_temp = open(certificado_temp, 'w') - arq_temp.write(certificado) + arq_temp = open(key_path, 'w') + arq_temp.write(self.key) arq_temp.close() - return chave_temp, certificado_temp + return cert_path, key_path def _validar_dados(self): assert self.url != '', "Url servidor não configurada" assert self.web_service != '', "Web service não especificado" - assert self.certificado != '', "Certificado não configurado" - assert self.senha != '', "Senha não configurada" assert self.metodo != '', "Método não configurado" - assert self.tag_retorno != '', "Tag de retorno não configurado" + def _validar_nfe(self, obj): if not isinstance(obj, dict): @@ -95,20 +67,10 @@ class Comunicacao(object): def _executar_consulta(self, xmlEnviar): self._validar_dados() - # chave, certificado = self._preparar_temp_pem() + cert_path, key_path = self._preparar_temp_pem() - client = HttpClient(self.url, self.certificado, self.senha) + client = HttpClient(self.url, cert_path, key_path) soap_xml = self._soap_xml(xmlEnviar) xml_retorno = client.post_xml(self.web_service, soap_xml) - - dom = parseString(xml_retorno) - nodes = dom.getElementsByTagNameNS(common_namespaces['soap'], 'Fault') - if len(nodes) > 0: - return nodes[0].toxml(), None - - nodes = dom.getElementsByTagName(self.tag_retorno) - if len(nodes) > 0: - obj = objectify.fromstring(nodes[0].toxml()) - return nodes[0].toxml(), obj - - return xml_retorno, objectify.fromstring(xml_retorno) + + return sanitize_response(xml_retorno) diff --git a/pytrustnfe/servicos/nfe_autorizacao.py b/pytrustnfe/servicos/nfe_autorizacao.py index 700a5c2..78331ba 100644 --- a/pytrustnfe/servicos/nfe_autorizacao.py +++ b/pytrustnfe/servicos/nfe_autorizacao.py @@ -4,6 +4,7 @@ Created on 21/06/2015 @author: danimar ''' +import os from lxml import etree from suds.sax.element import Element from suds.sax.text import Raw @@ -19,31 +20,22 @@ class NfeAutorizacao(Comunicacao): def __init__(self, cert, key): Comunicacao.__init__(self, cert, key) - def autorizar_nfe(self, nfe, id): + def autorizar_nfe(self, nfe, id): + self.url = 'nfe-homologacao.sefazrs.rs.gov.br' + self.web_service = '/ws/NfeAutorizacao/NFeAutorizacao.asmx' + self.metodo = 'nfeAutorizacaoLote' + self._validar_nfe(nfe) - xml = render_xml('nfeEnv.xml', **nfe) - - xml_signed = assinar(xml, self.cert, self.key, '#%s' % id) - - client = self._get_client( - 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx?wsdl') - - cabecalho = client.factory.create('nfeCabecMsg') - cabecalho.cUF = '43' - cabecalho.versaoDados = '3.10' - client.set_options(soapheaders=cabecalho) - - p = Parser() - import ipdb; ipdb.set_trace() - resposta = client.service.nfeAutorizacaoLote( - p.parse(string=xml_signed).root()) - - print client.last_sent() - print client.last_received() - - consulta_recibo = utils.gerar_consulta_recibo(resposta) - - client = self._get_client( - 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx' - ) - return client.service.nfeRetAutorizacao(consulta_recibo) + path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml') + xml = render_xml(path, 'nfeEnv.xml', **nfe) + + #xmlElem = etree.fromstring(xml) TODO Assinar + #xml_signed = assinar(xmlElem, self.cert, self.key, '#%s' % id) + print xml + xml_response, obj = self._executar_consulta(xml) + + return { + 'sent_xml': xml, + 'received_xml': xml_response, + 'object': obj.Body.nfeAutorizacaoLoteResult + }