diff --git a/pynfe/processamento/comunicacao.py b/pynfe/processamento/comunicacao.py index 13d4d29..5f59baf 100644 --- a/pynfe/processamento/comunicacao.py +++ b/pynfe/processamento/comunicacao.py @@ -551,228 +551,3 @@ class ComunicacaoSefaz(Comunicacao): raise e finally: certificado_a1.excluir() - - -class ComunicacaoNfse(Comunicacao): - """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """ - - _versao = '' - _namespace = '' - - def __init__(self, certificado, certificado_senha, autorizador, homologacao=False): - self.certificado = certificado - self.certificado_senha = certificado_senha - self._ambiente = 2 if homologacao else 1 - self.autorizador = autorizador.upper() - if self.autorizador == 'GINFES': - self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd' - self._versao = '3' - elif self.autorizador == 'BETHA': - self._namespace = NAMESPACE_BETHA - self._versao = '2.02' - else: - raise Exception('Autorizador não encontrado!') - - def autorizacao(self, nota): - # url do serviço - url = self._get_url() - if self.autorizador == 'BETHA': - # xml - xml = etree.tostring(nota, encoding='unicode', pretty_print=False) - # comunica via wsdl - return self._post(url, xml, 'gerar') - else: - raise Exception('Este método só esta implementado no autorizador betha.') - - def enviar_lote(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # xml - xml = '' + xml - # comunica via wsdl - return self._post_https(url, xml, 'enviar_lote') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def consultar(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # xml - xml = '' + xml - # comunica via wsdl - return self._post_https(url, xml, 'consulta') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def consultar_rps(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'BETHA': - # comunica via wsdl - return self._post(url, xml, 'consultaRps') - elif self.autorizador == 'GINFES': - return self._post_https(url, xml, 'consultaRps') - # TODO outros autorizadres - else: - raise Exception('Autorizador não encontrado!') - - def consultar_faixa(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'BETHA': - # comunica via wsdl - return self._post(url, xml, 'consultaFaixa') - else: - raise Exception('Este método só esta implementado no autorizador betha.') - - def consultar_lote(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # xml - xml = '' + xml - # comunica via wsdl - return self._post_https(url, xml, 'consulta_lote') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def consultar_situacao_lote(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # comunica via wsdl - return self._post_https(url, xml, 'consulta_situacao_lote') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def cancelar(self, xml): - # url do serviço - url = self._get_url() - # Betha - if self.autorizador == 'BETHA': - # comunica via wsdl - return self._post(url, xml, 'cancelar') - # Ginfes - elif self.autorizador == 'GINFES': - # comunica via wsdl com certificado - return self._post_https(url, xml, 'cancelar') - # TODO outros autorizadres - else: - raise Exception('Autorizador não encontrado!') - - def _cabecalho(self, retorna_string=True): - """ Monta o XML do cabeçalho da requisição wsdl - Namespaces padrão homologação (Ginfes) """ - - xml_declaration = '' - # cabecalho = '3' - # cabecalho - raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao) - etree.SubElement(raiz, 'versaoDados').text = self._versao - - if retorna_string: - cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','') - cabecalho = xml_declaration + cabecalho - return cabecalho - else: - return raiz - - def _cabecalho2(self, retorna_string=True): - """ Monta o XML do cabeçalho da requisição wsdl - Namespaces que funcionaram em produção (Ginfes)""" - - xml_declaration = '' - - # cabecalho - raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao) - etree.SubElement(raiz, 'versaoDados').text = self._versao - - if retorna_string: - cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '') - cabecalho = xml_declaration + cabecalho - return cabecalho - else: - return raiz - - def _cabecalho_ginfes(self): - """ Retorna o XML do cabeçalho gerado pelo xsd""" - from pynfe.processamento.autorizador_nfse import SerializacaoGinfes - return SerializacaoGinfes().cabecalho() - - def _get_url(self): - """ Retorna a url para comunicação com o webservice """ - if self._ambiente == 1: - ambiente = 'HTTPS' - else: - ambiente = 'HOMOLOGACAO' - if self.autorizador in NFSE: - self.url = NFSE[self.autorizador][ambiente] - else: - raise Exception('Autorizador nao encontrado!') - return self.url - - def _post(self, url, xml, metodo): - """ Comunicação wsdl (http) sem certificado digital """ - # cabecalho - cabecalho = self._cabecalho() - # comunicacao wsdl - try: - from suds.client import Client - cliente = Client(url) - # gerar nfse - if metodo == 'gerar': - return cliente.service.GerarNfse(cabecalho, xml) - elif metodo == 'consultaRps': - return cliente.service.ConsultarNfsePorRps(cabecalho, xml) - elif metodo == 'consultaFaixa': - return cliente.service.ConsultarNfseFaixa(cabecalho, xml) - elif metodo == 'cancelar': - return cliente.service.CancelarNfse(cabecalho, xml) - # TODO outros metodos - else: - raise Exception('Método não implementado no autorizador.') - except Exception as e: - raise e - - def _post_https(self, url, xml, metodo): - """ Comunicação wsdl (https) utilizando certificado do usuário """ - # cabecalho - cabecalho = self._cabecalho() - # comunicacao wsdl - try: - from suds.client import Client - from pynfe.utils.https_nfse import HttpAuthenticated - - certificadoA1 = CertificadoA1(self.certificado) - chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True) - - cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url)) - - # gerar nfse - if metodo == 'gerar': - return cliente.service.GerarNfse(cabecalho, xml) - elif metodo == 'enviar_lote': - return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml) - elif metodo == 'consulta': - return cliente.service.ConsultarNfseV3(cabecalho, xml) - elif metodo == 'consulta_lote': - return cliente.service.ConsultarLoteRpsV3(cabecalho, xml) - elif metodo == 'consulta_situacao_lote': - return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml) - elif metodo == 'consultaRps': - return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml) - elif metodo == 'consultaFaixa': - return cliente.service.ConsultarNfseFaixa(cabecalho, xml) - elif metodo == 'cancelar': - # versão 2 - return cliente.service.CancelarNfse(xml) - # versão 3 - # return cliente.service.CancelarNfseV3(cabecalho, xml) - # TODO outros metodos - else: - raise Exception('Método não implementado no autorizador.') - except Exception as e: - raise e \ No newline at end of file diff --git a/pynfe/processamento/nfse.py b/pynfe/processamento/nfse.py new file mode 100644 index 0000000..5d75a52 --- /dev/null +++ b/pynfe/processamento/nfse.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +from pynfe.utils import etree +from pynfe.utils.flags import ( + NAMESPACE_XSI, + NAMESPACE_BETHA, +) +from pynfe.utils.webservices import NFSE +from pynfe.entidades.certificado import CertificadoA1 +from .comunicacao import Comunicacao + + +class ComunicacaoNfse(Comunicacao): + """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """ + + _versao = '' + _namespace = '' + + def __init__(self, certificado, certificado_senha, autorizador, homologacao=False): + self.certificado = certificado + self.certificado_senha = certificado_senha + self._ambiente = 2 if homologacao else 1 + self.autorizador = autorizador.upper() + if self.autorizador == 'GINFES': + self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd' + self._versao = '3' + elif self.autorizador == 'BETHA': + self._namespace = NAMESPACE_BETHA + self._versao = '2.02' + else: + raise Exception('Autorizador não encontrado!') + + def autorizacao(self, nota): + # url do serviço + url = self._get_url() + if self.autorizador == 'BETHA': + # xml + xml = etree.tostring(nota, encoding='unicode', pretty_print=False) + # comunica via wsdl + return self._post(url, xml, 'gerar') + else: + raise Exception('Este método só esta implementado no autorizador betha.') + + def enviar_lote(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # xml + xml = '' + xml + # comunica via wsdl + return self._post_https(url, xml, 'enviar_lote') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def consultar(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # xml + xml = '' + xml + # comunica via wsdl + return self._post_https(url, xml, 'consulta') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def consultar_rps(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'BETHA': + # comunica via wsdl + return self._post(url, xml, 'consultaRps') + elif self.autorizador == 'GINFES': + return self._post_https(url, xml, 'consultaRps') + # TODO outros autorizadres + else: + raise Exception('Autorizador não encontrado!') + + def consultar_faixa(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'BETHA': + # comunica via wsdl + return self._post(url, xml, 'consultaFaixa') + else: + raise Exception('Este método só esta implementado no autorizador betha.') + + def consultar_lote(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # xml + xml = '' + xml + # comunica via wsdl + return self._post_https(url, xml, 'consulta_lote') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def consultar_situacao_lote(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # comunica via wsdl + return self._post_https(url, xml, 'consulta_situacao_lote') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def cancelar(self, xml): + # url do serviço + url = self._get_url() + # Betha + if self.autorizador == 'BETHA': + # comunica via wsdl + return self._post(url, xml, 'cancelar') + # Ginfes + elif self.autorizador == 'GINFES': + # comunica via wsdl com certificado + return self._post_https(url, xml, 'cancelar') + # TODO outros autorizadres + else: + raise Exception('Autorizador não encontrado!') + + def _cabecalho(self, retorna_string=True): + """ Monta o XML do cabeçalho da requisição wsdl + Namespaces padrão homologação (Ginfes) """ + + xml_declaration = '' + # cabecalho = '3' + # cabecalho + raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao) + etree.SubElement(raiz, 'versaoDados').text = self._versao + + if retorna_string: + cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','') + cabecalho = xml_declaration + cabecalho + return cabecalho + else: + return raiz + + def _cabecalho2(self, retorna_string=True): + """ Monta o XML do cabeçalho da requisição wsdl + Namespaces que funcionaram em produção (Ginfes)""" + + xml_declaration = '' + + # cabecalho + raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao) + etree.SubElement(raiz, 'versaoDados').text = self._versao + + if retorna_string: + cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '') + cabecalho = xml_declaration + cabecalho + return cabecalho + else: + return raiz + + def _cabecalho_ginfes(self): + """ Retorna o XML do cabeçalho gerado pelo xsd""" + from pynfe.processamento.autorizador_nfse import SerializacaoGinfes + return SerializacaoGinfes().cabecalho() + + def _get_url(self): + """ Retorna a url para comunicação com o webservice """ + if self._ambiente == 1: + ambiente = 'HTTPS' + else: + ambiente = 'HOMOLOGACAO' + if self.autorizador in NFSE: + self.url = NFSE[self.autorizador][ambiente] + else: + raise Exception('Autorizador nao encontrado!') + return self.url + + def _post(self, url, xml, metodo): + """ Comunicação wsdl (http) sem certificado digital """ + # cabecalho + cabecalho = self._cabecalho() + # comunicacao wsdl + try: + from suds.client import Client + cliente = Client(url) + # gerar nfse + if metodo == 'gerar': + return cliente.service.GerarNfse(cabecalho, xml) + elif metodo == 'consultaRps': + return cliente.service.ConsultarNfsePorRps(cabecalho, xml) + elif metodo == 'consultaFaixa': + return cliente.service.ConsultarNfseFaixa(cabecalho, xml) + elif metodo == 'cancelar': + return cliente.service.CancelarNfse(cabecalho, xml) + # TODO outros metodos + else: + raise Exception('Método não implementado no autorizador.') + except Exception as e: + raise e + + def _post_https(self, url, xml, metodo): + """ Comunicação wsdl (https) utilizando certificado do usuário """ + # cabecalho + cabecalho = self._cabecalho() + # comunicacao wsdl + try: + from suds.client import Client + from pynfe.utils.https_nfse import HttpAuthenticated + + certificadoA1 = CertificadoA1(self.certificado) + chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True) + + cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url)) + + # gerar nfse + if metodo == 'gerar': + return cliente.service.GerarNfse(cabecalho, xml) + elif metodo == 'enviar_lote': + return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml) + elif metodo == 'consulta': + return cliente.service.ConsultarNfseV3(cabecalho, xml) + elif metodo == 'consulta_lote': + return cliente.service.ConsultarLoteRpsV3(cabecalho, xml) + elif metodo == 'consulta_situacao_lote': + return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml) + elif metodo == 'consultaRps': + return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml) + elif metodo == 'consultaFaixa': + return cliente.service.ConsultarNfseFaixa(cabecalho, xml) + elif metodo == 'cancelar': + # versão 2 + return cliente.service.CancelarNfse(xml) + # versão 3 + # return cliente.service.CancelarNfseV3(cabecalho, xml) + # TODO outros metodos + else: + raise Exception('Método não implementado no autorizador.') + except Exception as e: + raise e