Browse Source

nfse post_https

pull/7/head
Leonardo Tada 10 years ago
parent
commit
84a5063610
  1. 90
      pynfe/processamento/comunicacao.py

90
pynfe/processamento/comunicacao.py

@ -8,6 +8,10 @@ NAMESPACE_SOAP_NFSE, NAMESPACE_BETHA
from pynfe.utils.webservices import NFCE, NFE, NFSE
from .assinatura import AssinaturaA1
from pynfe.entidades.certificado import CertificadoA1
import requests
from suds.transport.http import HttpAuthenticated
from suds.transport import Reply, TransportError
class Comunicacao(object):
u"""Classe abstrata responsavel por definir os metodos e logica das classes
@ -43,7 +47,7 @@ class ComunicacaoSefaz(Comunicacao):
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeAutorizacao'), metodo='NfeAutorizacao', dados=raiz)
# Faz request no Servidor da Sefaz
retorno = self._post(url, xml)
# Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
if retorno.status_code == 200:
@ -96,7 +100,7 @@ class ComunicacaoSefaz(Comunicacao):
etree.SubElement(raiz, 'nRec').text = numero
# Monta XML para envio da requisição
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeRetAutorizacao'), metodo='NfeRetAutorizacao', dados=raiz)
return self._post(url, xml)
def consulta_nota(self, modelo, chave):
@ -112,12 +116,12 @@ class ComunicacaoSefaz(Comunicacao):
etree.SubElement(raiz, 'chNFe').text = chave
# Monta XML para envio da requisição
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeConsulta2'), metodo='NfeConsulta2', dados=raiz)
return self._post(url, xml)
def consulta_notas_cnpj(self, cnpj, nsu=0):
"""
Serviço de Consulta da Relação de Documentos Destinados para um determinado CNPJ de destinatário informado na NF-e.
Serviço de Consulta da Relação de Documentos Destinados para um determinado CNPJ de destinatário informado na NF-e.
"""
# url do serviço
url = self._get_url(modelo='nfe', consulta='DESTINADAS')
@ -126,16 +130,16 @@ class ComunicacaoSefaz(Comunicacao):
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NFE DEST'
etree.SubElement(raiz, 'CNPJ').text = cnpj
# Indicador de NF-e consultada:
# 0=Todas as NF-e;
# Indicador de NF-e consultada:
# 0=Todas as NF-e;
# 1=Somente as NF-e que ainda não tiveram manifestação do destinatário (Desconhecimento da operação, Operação não Realizada ou Confirmação da Operação);
# 2=Idem anterior, incluindo as NF-e que também não tiveram a Ciência da Operação.
# 2=Idem anterior, incluindo as NF-e que também não tiveram a Ciência da Operação.
etree.SubElement(raiz, 'indNFe').text = '0'
# Indicador do Emissor da NF-e:
# 0=Todos os Emitentes / Remetentes;
# 1=Somente as NF-e emitidas por emissores / remetentes que não tenham o mesmo CNPJ-Base do destinatário (para excluir as notas fiscais de transferência entre filiais).
# 1=Somente as NF-e emitidas por emissores / remetentes que não tenham o mesmo CNPJ-Base do destinatário (para excluir as notas fiscais de transferência entre filiais).
etree.SubElement(raiz, 'indEmi').text = '0'
# Último NSU recebido pela Empresa. Caso seja informado com zero, ou com um NSU muito antigo, a consulta retornará unicamente as notas fiscais que tenham sido recepcionadas nos últimos 15 dias.
# Último NSU recebido pela Empresa. Caso seja informado com zero, ou com um NSU muito antigo, a consulta retornará unicamente as notas fiscais que tenham sido recepcionadas nos últimos 15 dias.
etree.SubElement(raiz, 'ultNSU').text = str(nsu)
# Monta XML para envio da requisição
@ -160,7 +164,7 @@ class ComunicacaoSefaz(Comunicacao):
def status_servico(self, modelo):
""" Verifica status do servidor da receita. """
""" modelo é a string com tipo de serviço que deseja consultar
Ex: nfe ou nfce
Ex: nfe ou nfce
"""
url = self._get_url(modelo=modelo, consulta='STATUS')
@ -411,7 +415,7 @@ class ComunicacaoNfse(Comunicacao):
# cabecalho
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = '2.02'
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
cabecalho = xml_declaration + cabecalho
@ -431,39 +435,38 @@ class ComunicacaoNfse(Comunicacao):
raise Exception('Autorizador nao encontrado!')
return self.url
def _post(self, url, xml):
certificadoA1 = CertificadoA1(self.certificado)
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
# Abre a conexão HTTPS
def _post(self, url, xml, metodo):
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
xml_declaration='<?xml version="1.0" encoding="utf-8"?>'
#xml = etree.tostring(xml, encoding='unicode', pretty_print=False).replace('\n','').replace('ns0:','soapenv:').replace(':ns0',':soapenv')
xml = etree.tostring(xml, encoding='unicode', pretty_print=False).replace('\n','').replace('ns0:','').replace(':ns0','')
xml = xml_declaration + xml
print (url)
print (xml)
import ipdb
ipdb.set_trace()
# Faz o request com o servidor
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
result.encoding='utf-8'
return result
except requests.exceptions.ConnectionError as e:
from suds.client import Client
cliente = Client(url)
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
return cliente.service.CancelarNfse(cabecalho, xml)
# TODO outros metodos
else:
pass
except Exception as e:
raise e
finally:
certificadoA1.excluir()
def _post2(self, url, xml, metodo):
def _post_https(self, url, xml, metodo):
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
cliente = Client(url)
headers = {"Content-Type": "text/xml;charset=UTF-8", "SOAPAction": ""}
t = self.RequestsTransport(cert='/home/leonardo/Documentos/certificado/TADA_SOFTWARE_LTDA_ME.pfx')
cliente = Client(url, headers=headers, transport=t)
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
@ -477,4 +480,17 @@ class ComunicacaoNfse(Comunicacao):
else:
pass
except Exception as e:
raise e
raise e
class RequestsTransport(HttpAuthenticated):
def __init__(self, **kwargs):
self.cert = kwargs.pop('cert', None)
# super won't work because not using new style class
HttpAuthenticated.__init__(self, **kwargs)
def send(self, request):
self.addcredentials(request)
resp = requests.post(request.url, data=request.message,
headers=request.headers, cert=self.cert)
result = Reply(resp.status_code, resp.headers, resp.content)
return result
Loading…
Cancel
Save