Browse Source

Enviando o xml via requests e montando o cabeçalho soap manualmente

não tive sucesso usando suds
tags/0.1.5
Danimar 9 years ago
parent
commit
ad14364765
  1. 26
      pytrustnfe/HttpClient.py
  2. 13
      pytrustnfe/servicos/assinatura.py
  3. 78
      pytrustnfe/servicos/comunicacao.py
  4. 44
      pytrustnfe/servicos/nfe_autorizacao.py

26
pytrustnfe/HttpClient.py

@ -4,34 +4,28 @@ Created on Jun 16, 2015
@author: danimar @author: danimar
''' '''
from httplib import HTTPSConnection
import requests
class HttpClient(object): class HttpClient(object):
def __init__(self, url, chave_pem, certificado_pem):
def __init__(self, url, cert_path, key_path):
self.url = url self.url = url
self.chave_pem = chave_pem
self.certificado_pem = certificado_pem
self.cert_path = cert_path
self.key_path = key_path
def _headers(self): def _headers(self):
return { return {
u'Content-type': u'application/soap+xml; charset=utf-8',
u'Content-type': u'application/soap+xml; charset=utf-8; action="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao/nfeAutorizacaoLote',
u'Accept': u'application/soap+xml; charset=utf-8' u'Accept': u'application/soap+xml; charset=utf-8'
} }
def post_xml(self, post, xml): def post_xml(self, post, xml):
conexao = HTTPSConnection(self.url, '443', key_file=self.chave_pem,
cert_file=self.certificado_pem)
try: try:
conexao.request(u'POST', post, xml, self._headers())
response = conexao.getresponse()
if response.status == 200:
return response.read()
return response.read()
url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx'
res = requests.post(url, data=xml, cert=(self.cert_path, self.key_path),
verify=False, headers=self._headers())
return res.text
except Exception as e: except Exception as e:
print(str(e)) print(str(e))
finally:
conexao.close()

13
pytrustnfe/servicos/assinatura.py

@ -5,7 +5,7 @@ Created on Jun 14, 2015
@author: danimar @author: danimar
''' '''
from signxml import xmldsig
from signxml import XMLSigner
from signxml import methods from signxml import methods
from lxml import etree from lxml import etree
from OpenSSL import crypto from OpenSSL import crypto
@ -36,14 +36,13 @@ def assinar(xml, cert, key, reference):
parent.remove(elem) parent.remove(elem)
# element = xml.find('{' + xml.nsmap[None] + '}NFe') # element = xml.find('{' + xml.nsmap[None] + '}NFe')
signer = xmldsig(xml, digest_algorithm=u'sha1')
signer = XMLSigner(digest_algorithm=u'sha1',signature_algorithm="rsa-sha1",
method=methods.enveloped,
c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
ns = {} ns = {}
ns[None] = signer.namespaces['ds'] ns[None] = signer.namespaces['ds']
signer.namespaces = ns signer.namespaces = ns
signed_root = signer.sign(
key=str(key), cert=cert, reference_uri=reference,
algorithm="rsa-sha1", method=methods.enveloped,
c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
signed_root = signer.sign(xml, key=str(key), cert=cert, reference_uri=reference)
xmldsig(signed_root, digest_algorithm=u'sha1').verify(x509_cert=cert)
# XMLSigner(signed_root, digest_algorithm=u'sha1').verify(x509_cert=cert)
return etree.tostring(signed_root) return etree.tostring(signed_root)

78
pytrustnfe/servicos/comunicacao.py

@ -13,7 +13,7 @@ from uuid import uuid4
from pytrustnfe.HttpClient import HttpClient from pytrustnfe.HttpClient import HttpClient
from pytrustnfe.Certificado import converte_pfx_pem from pytrustnfe.Certificado import converte_pfx_pem
from xml.dom.minidom import parseString
from ..xml import sanitize_response
common_namespaces = {'soap': 'http://www.w3.org/2003/05/soap-envelope'} common_namespaces = {'soap': 'http://www.w3.org/2003/05/soap-envelope'}
@ -31,63 +31,35 @@ class Comunicacao(object):
self.cert = cert self.cert = cert
self.key = key self.key = key
def _get_client(self, base_url):
cache_location = '/tmp/suds'
cache = suds.cache.DocumentCache(location=cache_location)
f = open('/tmp/suds/cert_nfe.cer', 'w')
f.write(self.cert)
f.close()
f = open('/tmp/suds/key_nfe.cer', 'w')
f.write(self.key)
f.close()
session = requests.Session()
session.verify = False
session.cert = ('/tmp/suds/cert_nfe.cer',
'/tmp/suds/key_nfe.cer')
return suds.client.Client(
base_url,
cache=cache,
transport=suds_requests.RequestsTransport(session)
)
def _soap_xml(self, body): def _soap_xml(self, body):
xml = '''<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
<soap:Header>
<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/'''
xml += self.metodo
xml += '''"><cUF>42</cUF><versaoDados>2.00</versaoDados>
</nfeCabecMsg>
</soap:Header>
<soap:Body>
<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/'''
xml += self.metodo + '">' + body
xml += '</nfeDadosMsg></soap:Body></soap:Envelope>'
xml = '<?xml version="1.0" encoding="utf-8"?>'
xml += '<soap12:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap12="http://www.w3.org/2003/05/soap-envelope"><soap12:Header>'
xml += '<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += '<cUF>43</cUF><versaoDados>3.10</versaoDados></nfeCabecMsg></soap12:Header><soap12:Body>'
xml += '<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao">'
xml += body
xml += '</nfeDadosMsg></soap12:Body></soap12:Envelope>'
return xml.rstrip('\n')
def _preparar_temp_pem(self): def _preparar_temp_pem(self):
chave_temp = '/tmp/' + uuid4().hex
certificado_temp = '/tmp/' + uuid4().hex
cert_path = '/tmp/' + uuid4().hex
key_path = '/tmp/' + uuid4().hex
chave, certificado = converte_pfx_pem(self.certificado, self.senha)
arq_temp = open(chave_temp, 'w')
arq_temp.write(chave)
arq_temp = open(cert_path, 'w')
arq_temp.write(self.cert)
arq_temp.close() arq_temp.close()
arq_temp = open(certificado_temp, 'w')
arq_temp.write(certificado)
arq_temp = open(key_path, 'w')
arq_temp.write(self.key)
arq_temp.close() arq_temp.close()
return chave_temp, certificado_temp
return cert_path, key_path
def _validar_dados(self): def _validar_dados(self):
assert self.url != '', "Url servidor não configurada" assert self.url != '', "Url servidor não configurada"
assert self.web_service != '', "Web service não especificado" assert self.web_service != '', "Web service não especificado"
assert self.certificado != '', "Certificado não configurado"
assert self.senha != '', "Senha não configurada"
assert self.metodo != '', "Método não configurado" assert self.metodo != '', "Método não configurado"
assert self.tag_retorno != '', "Tag de retorno não configurado"
def _validar_nfe(self, obj): def _validar_nfe(self, obj):
if not isinstance(obj, dict): if not isinstance(obj, dict):
@ -95,20 +67,10 @@ class Comunicacao(object):
def _executar_consulta(self, xmlEnviar): def _executar_consulta(self, xmlEnviar):
self._validar_dados() self._validar_dados()
# chave, certificado = self._preparar_temp_pem()
cert_path, key_path = self._preparar_temp_pem()
client = HttpClient(self.url, self.certificado, self.senha)
client = HttpClient(self.url, cert_path, key_path)
soap_xml = self._soap_xml(xmlEnviar) soap_xml = self._soap_xml(xmlEnviar)
xml_retorno = client.post_xml(self.web_service, soap_xml) xml_retorno = client.post_xml(self.web_service, soap_xml)
dom = parseString(xml_retorno)
nodes = dom.getElementsByTagNameNS(common_namespaces['soap'], 'Fault')
if len(nodes) > 0:
return nodes[0].toxml(), None
nodes = dom.getElementsByTagName(self.tag_retorno)
if len(nodes) > 0:
obj = objectify.fromstring(nodes[0].toxml())
return nodes[0].toxml(), obj
return xml_retorno, objectify.fromstring(xml_retorno)
return sanitize_response(xml_retorno)

44
pytrustnfe/servicos/nfe_autorizacao.py

@ -4,6 +4,7 @@ Created on 21/06/2015
@author: danimar @author: danimar
''' '''
import os
from lxml import etree from lxml import etree
from suds.sax.element import Element from suds.sax.element import Element
from suds.sax.text import Raw from suds.sax.text import Raw
@ -20,30 +21,21 @@ class NfeAutorizacao(Comunicacao):
Comunicacao.__init__(self, cert, key) Comunicacao.__init__(self, cert, key)
def autorizar_nfe(self, nfe, id): def autorizar_nfe(self, nfe, id):
self._validar_nfe(nfe)
xml = render_xml('nfeEnv.xml', **nfe)
xml_signed = assinar(xml, self.cert, self.key, '#%s' % id)
client = self._get_client(
'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx?wsdl')
cabecalho = client.factory.create('nfeCabecMsg')
cabecalho.cUF = '43'
cabecalho.versaoDados = '3.10'
client.set_options(soapheaders=cabecalho)
self.url = 'nfe-homologacao.sefazrs.rs.gov.br'
self.web_service = '/ws/NfeAutorizacao/NFeAutorizacao.asmx'
self.metodo = 'nfeAutorizacaoLote'
p = Parser()
import ipdb; ipdb.set_trace()
resposta = client.service.nfeAutorizacaoLote(
p.parse(string=xml_signed).root())
print client.last_sent()
print client.last_received()
consulta_recibo = utils.gerar_consulta_recibo(resposta)
client = self._get_client(
'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx'
)
return client.service.nfeRetAutorizacao(consulta_recibo)
self._validar_nfe(nfe)
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml')
xml = render_xml(path, 'nfeEnv.xml', **nfe)
#xmlElem = etree.fromstring(xml) TODO Assinar
#xml_signed = assinar(xmlElem, self.cert, self.key, '#%s' % id)
print xml
xml_response, obj = self._executar_consulta(xml)
return {
'sent_xml': xml,
'received_xml': xml_response,
'object': obj.Body.nfeAutorizacaoLoteResult
}
Loading…
Cancel
Save