You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
237 lines
9.1 KiB
237 lines
9.1 KiB
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import division, print_function, unicode_literals
|
|
|
|
from pynfe.utils import etree
|
|
from pynfe.utils.flags import (
|
|
NAMESPACE_XSI,
|
|
NAMESPACE_BETHA,
|
|
)
|
|
from pynfe.utils.webservices import NFSE
|
|
from pynfe.entidades.certificado import CertificadoA1
|
|
from .comunicacao import Comunicacao
|
|
|
|
|
|
class ComunicacaoNFSe(Comunicacao):
|
|
""" Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
|
|
|
|
_versao = ''
|
|
_namespace = ''
|
|
|
|
def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
|
|
self.certificado = certificado
|
|
self.certificado_senha = certificado_senha
|
|
self._ambiente = 2 if homologacao else 1
|
|
self.autorizador = autorizador.upper()
|
|
if self.autorizador == 'GINFES':
|
|
self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
|
|
self._versao = '3'
|
|
elif self.autorizador == 'BETHA':
|
|
self._namespace = NAMESPACE_BETHA
|
|
self._versao = '2.02'
|
|
else:
|
|
raise Exception('Autorizador não encontrado!')
|
|
|
|
def autorizacao(self, nota):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
if self.autorizador == 'BETHA':
|
|
# xml
|
|
xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
|
|
# comunica via wsdl
|
|
return self._post(url, xml, 'gerar')
|
|
else:
|
|
raise Exception('Este método só esta implementado no autorizador betha.')
|
|
|
|
def enviar_lote(self, xml):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
if self.autorizador == 'GINFES':
|
|
# xml
|
|
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
|
|
# comunica via wsdl
|
|
return self._post_https(url, xml, 'enviar_lote')
|
|
else:
|
|
raise Exception('Este método só esta implementado no autorizador ginfes.')
|
|
|
|
def consultar(self, xml):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
if self.autorizador == 'GINFES':
|
|
# xml
|
|
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
|
|
# comunica via wsdl
|
|
return self._post_https(url, xml, 'consulta')
|
|
else:
|
|
raise Exception('Este método só esta implementado no autorizador ginfes.')
|
|
|
|
def consultar_rps(self, xml):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
if self.autorizador == 'BETHA':
|
|
# comunica via wsdl
|
|
return self._post(url, xml, 'consultaRps')
|
|
elif self.autorizador == 'GINFES':
|
|
return self._post_https(url, xml, 'consultaRps')
|
|
# TODO outros autorizadres
|
|
else:
|
|
raise Exception('Autorizador não encontrado!')
|
|
|
|
def consultar_faixa(self, xml):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
if self.autorizador == 'BETHA':
|
|
# comunica via wsdl
|
|
return self._post(url, xml, 'consultaFaixa')
|
|
else:
|
|
raise Exception('Este método só esta implementado no autorizador betha.')
|
|
|
|
def consultar_lote(self, xml):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
if self.autorizador == 'GINFES':
|
|
# xml
|
|
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
|
|
# comunica via wsdl
|
|
return self._post_https(url, xml, 'consulta_lote')
|
|
else:
|
|
raise Exception('Este método só esta implementado no autorizador ginfes.')
|
|
|
|
def consultar_situacao_lote(self, xml):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
if self.autorizador == 'GINFES':
|
|
# comunica via wsdl
|
|
return self._post_https(url, xml, 'consulta_situacao_lote')
|
|
else:
|
|
raise Exception('Este método só esta implementado no autorizador ginfes.')
|
|
|
|
def cancelar(self, xml):
|
|
# url do serviço
|
|
url = self._get_url()
|
|
# Betha
|
|
if self.autorizador == 'BETHA':
|
|
# comunica via wsdl
|
|
return self._post(url, xml, 'cancelar')
|
|
# Ginfes
|
|
elif self.autorizador == 'GINFES':
|
|
# comunica via wsdl com certificado
|
|
return self._post_https(url, xml, 'cancelar')
|
|
# TODO outros autorizadres
|
|
else:
|
|
raise Exception('Autorizador não encontrado!')
|
|
|
|
def _cabecalho(self, retorna_string=True):
|
|
""" Monta o XML do cabeçalho da requisição wsdl
|
|
Namespaces padrão homologação (Ginfes) """
|
|
|
|
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
|
|
# cabecalho = '<ns2:cabecalho versao="3" xmlns:ns2="http://www.ginfes.com.br/cabecalho_v03.xsd"
|
|
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><versaoDados>3</versaoDados></ns2:cabecalho>'
|
|
# cabecalho
|
|
raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
|
|
etree.SubElement(raiz, 'versaoDados').text = self._versao
|
|
|
|
if retorna_string:
|
|
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
|
|
cabecalho = xml_declaration + cabecalho
|
|
return cabecalho
|
|
else:
|
|
return raiz
|
|
|
|
def _cabecalho2(self, retorna_string=True):
|
|
""" Monta o XML do cabeçalho da requisição wsdl
|
|
Namespaces que funcionaram em produção (Ginfes)"""
|
|
|
|
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
|
|
|
|
# cabecalho
|
|
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
|
|
etree.SubElement(raiz, 'versaoDados').text = self._versao
|
|
|
|
if retorna_string:
|
|
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
|
|
cabecalho = xml_declaration + cabecalho
|
|
return cabecalho
|
|
else:
|
|
return raiz
|
|
|
|
def _cabecalho_ginfes(self):
|
|
""" Retorna o XML do cabeçalho gerado pelo xsd"""
|
|
from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
|
|
return SerializacaoGinfes().cabecalho()
|
|
|
|
def _get_url(self):
|
|
""" Retorna a url para comunicação com o webservice """
|
|
if self._ambiente == 1:
|
|
ambiente = 'HTTPS'
|
|
else:
|
|
ambiente = 'HOMOLOGACAO'
|
|
if self.autorizador in NFSE:
|
|
self.url = NFSE[self.autorizador][ambiente]
|
|
else:
|
|
raise Exception('Autorizador nao encontrado!')
|
|
return self.url
|
|
|
|
def _post(self, url, xml, metodo):
|
|
""" Comunicação wsdl (http) sem certificado digital """
|
|
# cabecalho
|
|
cabecalho = self._cabecalho()
|
|
# comunicacao wsdl
|
|
try:
|
|
from suds.client import Client
|
|
cliente = Client(url)
|
|
# gerar nfse
|
|
if metodo == 'gerar':
|
|
return cliente.service.GerarNfse(cabecalho, xml)
|
|
elif metodo == 'consultaRps':
|
|
return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
|
|
elif metodo == 'consultaFaixa':
|
|
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
|
|
elif metodo == 'cancelar':
|
|
return cliente.service.CancelarNfse(cabecalho, xml)
|
|
# TODO outros metodos
|
|
else:
|
|
raise Exception('Método não implementado no autorizador.')
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def _post_https(self, url, xml, metodo):
|
|
""" Comunicação wsdl (https) utilizando certificado do usuário """
|
|
# cabecalho
|
|
cabecalho = self._cabecalho()
|
|
# comunicacao wsdl
|
|
try:
|
|
from suds.client import Client
|
|
from pynfe.utils.https_nfse import HttpAuthenticated
|
|
|
|
certificadoA1 = CertificadoA1(self.certificado)
|
|
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
|
|
|
|
cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
|
|
|
|
# gerar nfse
|
|
if metodo == 'gerar':
|
|
return cliente.service.GerarNfse(cabecalho, xml)
|
|
elif metodo == 'enviar_lote':
|
|
return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
|
|
elif metodo == 'consulta':
|
|
return cliente.service.ConsultarNfseV3(cabecalho, xml)
|
|
elif metodo == 'consulta_lote':
|
|
return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
|
|
elif metodo == 'consulta_situacao_lote':
|
|
return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
|
|
elif metodo == 'consultaRps':
|
|
return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
|
|
elif metodo == 'consultaFaixa':
|
|
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
|
|
elif metodo == 'cancelar':
|
|
# versão 2
|
|
return cliente.service.CancelarNfse(xml)
|
|
# versão 3
|
|
# return cliente.service.CancelarNfseV3(cabecalho, xml)
|
|
# TODO outros metodos
|
|
else:
|
|
raise Exception('Método não implementado no autorizador.')
|
|
except Exception as e:
|
|
raise e
|