From 1d38a8cfd623c17a70dc39524a956d2ffd79e8c7 Mon Sep 17 00:00:00 2001 From: Leonardo Gregianin Date: Thu, 25 Jun 2020 10:57:48 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Criada=20a=20comunica=C3=A7=C3=A3o=20co?= =?UTF-8?q?m=20o=20webservice=20do=20MDF-e.=20Separa=C3=A7=C3=A3o=20das=20?= =?UTF-8?q?classes=20de=20NFe,=20NFSe=20e=20MDFe.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pynfe/entidades/evento.py | 111 ++++++- pynfe/processamento/__init__.py | 5 +- pynfe/processamento/comunicacao.py | 647 +----------------------------------- pynfe/processamento/mdfe.py | 214 ++++++++++++ pynfe/processamento/nfe.py | 437 ++++++++++++++++++++++++ pynfe/processamento/nfse.py | 234 +++++++++++++ pynfe/processamento/resposta.py | 28 ++ pynfe/processamento/serializacao.py | 104 +++++- pynfe/utils/__init__.py | 14 +- pynfe/utils/flags.py | 5 + pynfe/utils/webservices.py | 15 + 11 files changed, 1175 insertions(+), 639 deletions(-) create mode 100644 pynfe/processamento/mdfe.py create mode 100644 pynfe/processamento/nfe.py create mode 100644 pynfe/processamento/nfse.py create mode 100644 pynfe/processamento/resposta.py diff --git a/pynfe/entidades/evento.py b/pynfe/entidades/evento.py index ffe7410..473f810 100644 --- a/pynfe/entidades/evento.py +++ b/pynfe/entidades/evento.py @@ -4,8 +4,10 @@ @author: Junior Tada, Leonardo Tada """ +from decimal import Decimal from .base import Entidade + class Evento(Entidade): # - Identificador da TAG a ser assinada, a regra de formação do Id é: “ID” + tpEvento + chave da NF-e + nSeqEvento id = str() @@ -114,4 +116,111 @@ class EventoManifestacaoDest(Evento): # - Informar a justificativa porque a operação não foi realizada, este campo deve ser informado somente no evento de Operação não Realizada. (min 15 max 255 caracteres) justificativa = str() - \ No newline at end of file + + +class EventoEncerramento(Evento): + + def __init__(self, *args, **kwargs): + super(EventoEncerramento, self).__init__(*args, **kwargs) + # - Código do evento = 110112 + self.tp_evento = '110112' + # - "Encerramento" + self.descricao = 'Encerramento' + + # - Informar o número do Protocolo de Autorização da MDF-e a ser Encerrada + protocolo = str() + # - Data e hora do evento no formato AAAA-MM-DDThh:mm:ssTZD + dtenc = None + # - uf de onde a manifesto foi encerrado + cuf = str() + # - minicipio onde o manifesto foi encerrado + cmun = str() + + +class EventoInclusaoCondutor(Evento): + + def __init__(self, *args, **kwargs): + super(EventoInclusaoCondutor, self).__init__(*args, **kwargs) + # - Código do evento = 110114 + self.tp_evento = '110114' + # - "Encerramento" + self.descricao = 'Inclusão Condutor' + + # - Nome do motorista + nome_motorista = str() + # - CPF do motorista + cpf_motorista = str() + + +class EventoInclusaoDFe(Evento): + + def __init__(self, *args, **kwargs): + super(EventoInclusaoDFe, self).__init__(*args, **kwargs) + # - Código do evento = 110115 + self.tp_evento = '110115' + # - "Inclusao DF-e" + self.descricao = 'Inclusao DF-e' + + # - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e + protocolo = str() + # - Código IBGE do Município de Carregamento + cmun_carrega = str() + # - Nome do Município de Carregamento + xmun_carrega = str() + # - Código IBGE do Município de Descarga + cmun_descarga = str() + # - Nome do Município de Descarga + xmun_descarga = str() + # - Chave de Acesso da NF-e a ser incluída no MDFe + chave_nfe = str() + + +class EventoInclusaoPagamento(Evento): + + def __init__(self, *args, **kwargs): + super(EventoInclusaoPagamento, self).__init__(*args, **kwargs) + # - Código do evento = 110116 + self.tp_evento = '110116' + # - "Pagamento Operacao MDF-e" + self.descricao = 'Pagamento Operacao MDF-e' + + # - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e + protocolo = str() + + # - Quantidade de viagens + qtd_viagens = str() + # - Número da viagem + nro_viagens = str() + + # Informações do pagamento + # - Nome do Contratante + nome_contratante = str() + # - CPF/CNPJ do Contratante + cpfcnpj_contratante = str() + + # Componentes do Pagamento + # - Tipo do pagamento + tpComp = str() + # - Valor + vComp = Decimal() + + # - Valor total do contrato + vContrato = Decimal() + # - Tipo do pagamento (0=a vista e 1=a prazo) + indPag = str() + + # Se o pagamento for a prazo + # - Numero da parcela + nParcela = str() + # - Data vencimento + dVenc = None + # - Valor da parcela + vParcela = Decimal() + + # Informações bancárias + # - CNPJ da Instituição de Pagamento eletrônico do Frete + CNPJIPEF = str() + # - Código do Banco + codBanco = str() + # - Código da Agência + codAgencia = str() diff --git a/pynfe/processamento/__init__.py b/pynfe/processamento/__init__.py index e63097d..7dfb5f6 100644 --- a/pynfe/processamento/__init__.py +++ b/pynfe/processamento/__init__.py @@ -2,5 +2,8 @@ from .serializacao import SerializacaoXML from .serializacao import SerializacaoNfse from .validacao import Validacao from .assinatura import AssinaturaA1 -from .comunicacao import ComunicacaoSefaz +from pynfe.entidades.certificado import CertificadoA1 +from .nfe import ComunicacaoNFe +from .mdfe import ComunicacaoMDFe +from .nfse import ComunicacaoNfse from .danfe import DanfeNfce diff --git a/pynfe/processamento/comunicacao.py b/pynfe/processamento/comunicacao.py index 2a9bd41..41cc426 100644 --- a/pynfe/processamento/comunicacao.py +++ b/pynfe/processamento/comunicacao.py @@ -1,8 +1,7 @@ # -*- coding: utf-8 -*- -import re import ssl import datetime -import requests + from pynfe.utils import etree, so_numeros from pynfe.utils.flags import ( NAMESPACE_NFE, @@ -15,14 +14,13 @@ from pynfe.utils.flags import ( NAMESPACE_METODO ) from pynfe.utils.webservices import NFE, NFCE, NFSE -from pynfe.entidades.certificado import CertificadoA1 from .assinatura import AssinaturaA1 class Comunicacao(object): """ Classe abstrata responsavel por definir os metodos e logica das classes - de comunicação com os webservices da NF-e. + de comunicação com os webservices. """ _ambiente = 1 # 1 = Produção, 2 = Homologação @@ -30,637 +28,22 @@ class Comunicacao(object): certificado = None certificado_senha = None url = None + _versao = False + _assinatura = AssinaturaA1 + _namespace = False + _header = False + _envio_mensagem = False + _namespace_metodo = False + _accept = False + _soap_action = False + _ws_url = False + _namespace_soap = NAMESPACE_SOAP + _namespace_xsi = NAMESPACE_XSI + _namespace_xsd = NAMESPACE_XSD + _soap_version = 'soap' def __init__(self, uf, certificado, certificado_senha, homologacao=False): self.uf = uf self.certificado = certificado self.certificado_senha = certificado_senha self._ambiente = 2 if homologacao else 1 - - -class ComunicacaoSefaz(Comunicacao): - """Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados.""" - - _versao = VERSAO_PADRAO - _assinatura = AssinaturaA1 - - def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1): - """ - Método para realizar autorização da nota de acordo com o modelo - :param modelo: Modelo - :param nota_fiscal: XML assinado - :param id_lote: Id do lote - numero autoincremental gerado pelo sistema - :param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono - :return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário, - envia todo o soap de resposta da Sefaz para decisão do usuário. - """ - # url do serviço - url = self._get_url(modelo=modelo, consulta='AUTORIZACAO') - - # Monta XML do corpo da requisição - raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO) - etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema - etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono - raiz.append(nota_fiscal) - - # Monta XML para envio da requisição - xml = self._construir_xml_soap('NFeAutorizacao4', raiz) - # Faz request no Servidor da Sefaz - retorno = self._post(url, xml) - - # Em caso de sucesso, retorna xml com nfe e protocolo de autorização. - # Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário. - if retorno.status_code == 200: - # namespace - ns = {'ns': NAMESPACE_NFE} - # Procuta status no xml - try: - prot = etree.fromstring(retorno.text) - except ValueError: - # em SP retorno.text apresenta erro - prot = etree.fromstring(retorno.content) - if ind_sinc == 1: - try: - # Protocolo com envio OK - try: - inf_prot = prot[0][0] # root protNFe - except IndexError: - # Estados como GO vem com a tag header - inf_prot = prot[1][0] - - lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text - # Lote processado - if lote_status == '104': - prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0] - status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text - # autorizado usa da NF-e - # retorna xml final (protNFe+NFe) - if status == '100': - raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO) - raiz.append(nota_fiscal) - raiz.append(prot_nfe) - return 0, raiz - except IndexError: - # Protocolo com algum erro no Envio - return 1, retorno, nota_fiscal - else: - # Retorna id do protocolo para posterior consulta em caso de sucesso. - rec = prot[0][0] - status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text - # Lote Recebido com Sucesso! - if status == '103': - nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text - return 0, nrec, nota_fiscal - return 1, retorno, nota_fiscal - - def consulta_recibo(self, modelo, numero): - """ - Este método oferece a consulta do resultado do processamento de um lote de NF-e. - O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de - 15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado - deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em - Processamento". - :param modelo: Modelo da nota - :param numero: Número da nota - :return: - """ - - # url do serviço - url = self._get_url(modelo=modelo, consulta='RECIBO') - - # Monta XML do corpo da requisição - raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) - etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) - etree.SubElement(raiz, 'nRec').text = numero - - # Monta XML para envio da requisição - xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz) - return self._post(url, xml) - - def consulta_nota(self, modelo, chave): - """ - Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal - da Secretaria de Fazenda Estadual. - :param modelo: Modelo da nota - :param chave: Chave da nota - :return: - """ - # url do serviço - url = self._get_url(modelo=modelo, consulta='CHAVE') - # Monta XML do corpo da requisição - raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) - etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) - etree.SubElement(raiz, 'xServ').text = 'CONSULTAR' - etree.SubElement(raiz, 'chNFe').text = chave - # Monta XML para envio da requisição - xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz) - return self._post(url, xml) - - def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0): - """ - O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag - informada no XML. As tags são distNSU, consNSU e consChNFe. - a) distNSU – Distribuição de Conjunto de DF-e a Partir do NSU Informado - b) consNSU – Consulta DF-e Vinculado ao NSU Informado - c) consChNFe – Consulta de NF-e por Chave de Acesso Informada - :param cnpj: CNPJ do interessado - :param cpf: CPF do interessado - :param chave: Chave da NF-e a ser consultada - :param nsu: Ultimo nsu ou nsu específico para ser consultado. - :return: - """ - # url - url = self._get_url_an(consulta='DISTRIBUICAO') - # Monta XML para envio da requisição - raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE) - etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) - if self.uf: - etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()] - if cnpj: - etree.SubElement(raiz, 'CNPJ').text = cnpj - else: - etree.SubElement(raiz, 'CPF').text = cpf - if not chave: - distNSU = etree.SubElement(raiz, 'distNSU') - etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15) - if chave: - consChNFe = etree.SubElement(raiz, 'consChNFe') - etree.SubElement(consChNFe, 'chNFe').text = chave - #Monta XML para envio da requisição - xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz) - - - return self._post(url, xml) - - def consulta_cadastro(self, modelo, cnpj): - """ - Consulta de cadastro - :param modelo: Modelo da nota - :param cnpj: CNPJ da empresa - :return: - """ - # UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC - lista_svrs = ['AC', 'RN', 'PB', 'SC'] - - # RS implementa um método diferente na consulta de cadastro - if self.uf.upper() == 'RS': - url = NFE['RS']['CADASTRO'] - elif self.uf.upper() in lista_svrs: - url = NFE['SVRS']['CADASTRO'] - elif self.uf.upper() == 'SVC-RS': - url = NFE['SVC-RS']['CADASTRO'] - else: - url = self._get_url(modelo=modelo, consulta='CADASTRO') - - raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE) - info = etree.SubElement(raiz, 'infCons') - etree.SubElement(info, 'xServ').text = 'CONS-CAD' - etree.SubElement(info, 'UF').text = self.uf.upper() - etree.SubElement(info, 'CNPJ').text = cnpj - # etree.SubElement(info, 'CPF').text = cpf - - # Monta XML para envio da requisição - xml = self._construir_xml_soap('CadConsultaCadastro4', raiz) - # Chama método que efetua a requisição POST no servidor SOAP - return self._post(url, xml) - - def evento(self, modelo, evento, id_lote=1): - """ - Envia um evento de nota fiscal (cancelamento e carta de correção) - :param modelo: Modelo da nota - :param evento: Eventro - :param id_lote: Id do lote - :return: - """ - - # url do serviço - try: - # manifestacao url é do AN - if evento[0][5].text.startswith('2'): - url = self._get_url_an(consulta='EVENTOS') - else: - url = self._get_url(modelo=modelo, consulta='EVENTOS') - except Exception: - url = self._get_url(modelo=modelo, consulta='EVENTOS') - - # Monta XML do corpo da requisição - raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE) - etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema - raiz.append(evento) - xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz) - return self._post(url, xml) - - def status_servico(self, modelo): - """ - Verifica status do servidor da receita. - :param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce - :return: - """ - url = self._get_url(modelo, 'STATUS') - # Monta XML do corpo da requisição - raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) - etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) - etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] - etree.SubElement(raiz, 'xServ').text = 'STATUS' - xml = self._construir_xml_soap('NFeStatusServico4', raiz) - return self._post(url, xml) - - def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'): - """ - Serviço destinado ao atendimento de solicitações de inutilização de numeração. - :param modelo: Modelo da nota - :param cnpj: CNPJda empresa - :param numero_inicial: Número inicial - :param numero_final: Número final - :param justificativa: Justificativa - :param ano: Ano - :param serie: Série - :return: - """ - - # url do servico - url = self._get_url(modelo=modelo, consulta='INUTILIZACAO') - - # Valores default - ano = str(ano or datetime.date.today().year)[-2:] - uf = CODIGOS_ESTADOS[self.uf.upper()] - cnpj = so_numeros(cnpj) - - # Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) + - # CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID” - id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % { - 'uf': uf, - 'ano': ano, - 'cnpj': cnpj, - 'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e; - 'serie': serie.zfill(3), - 'num_ini': str(numero_inicial).zfill(9), - 'num_fin': str(numero_final).zfill(9), - } - - # Monta XML do corpo da requisição # FIXME - raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) - inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico) - etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente) - etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR' - etree.SubElement(inf_inut, 'cUF').text = uf - etree.SubElement(inf_inut, 'ano').text = ano - etree.SubElement(inf_inut, 'CNPJ').text = cnpj - etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e - etree.SubElement(inf_inut, 'serie').text = serie - etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial) - etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final) - etree.SubElement(inf_inut, 'xJust').text = justificativa - - # assinatura - a1 = AssinaturaA1(self.certificado, self.certificado_senha) - xml = a1.assinar(raiz) - - # Monta XML para envio da requisição - xml = self._construir_xml_soap('NFeInutilizacao4', xml) - # Faz request no Servidor da Sefaz e retorna resposta - return self._post(url, xml) - - def _get_url_an(self, consulta): - # producao - if self._ambiente == 1: - if consulta == 'DISTRIBUICAO': - ambiente = 'https://www1.' - else: - ambiente = 'https://www.' - # homologacao - else: - ambiente = 'https://hom.' - - self.url = ambiente + NFE['AN'][consulta] - return self.url - - def _get_url(self, modelo, consulta): - """ Retorna a url para comunicação com o webservice """ - # estado que implementam webservices proprios - lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS'] - if self.uf.upper() in lista: - if self._ambiente == 1: - ambiente = 'HTTPS' - else: - ambiente = 'HOMOLOGACAO' - if modelo == 'nfe': - # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 - self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta] - elif modelo == 'nfce': - # PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe - if self.uf.upper() == 'PE' or self.uf.upper() == 'BA': - self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] - else: - # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 - self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta] - else: - raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') - # Estados que utilizam outros ambientes - else: - lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO'] - if self.uf.upper() in lista_svrs: - if self._ambiente == 1: - ambiente = 'HTTPS' - else: - ambiente = 'HOMOLOGACAO' - if modelo == 'nfe': - # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 - self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta] - elif modelo == 'nfce': - # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 - self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] - else: - raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') - # unico UF que utiliza SVAN ainda para NF-e - # SVRS para NFC-e - elif self.uf.upper() == 'MA': - if self._ambiente == 1: - ambiente = 'HTTPS' - else: - ambiente = 'HOMOLOGACAO' - if modelo == 'nfe': - # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 - self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta] - elif modelo == 'nfce': - # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 - self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] - else: - raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') - else: - raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}") - return self.url - - def _construir_xml_soap(self, metodo, dados, cabecalho=False): - """Mota o XML para o envio via SOAP""" - raiz = etree.Element('{%s}Envelope' % NAMESPACE_SOAP, nsmap={ - 'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP}) - body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP) - ## distribuição tem um corpo de xml diferente - if metodo == 'NFeDistribuicaoDFe': - x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo) - a = etree.SubElement(x, 'nfeDadosMsg') - else: - a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo) - a.append(dados) - return raiz - - def _post_header(self): - """Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" - # PE é a única UF que exige SOAPAction no header - response = { - 'content-type': 'application/soap+xml; charset=utf-8;', - 'Accept': 'application/soap+xml; charset=utf-8;', - } - if self.uf.upper() == 'PE': - response["SOAPAction"] = "" - return response - - def _post(self, url, xml): - certificado_a1 = CertificadoA1(self.certificado) - chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True) - chave_cert = (cert, chave) - # Abre a conexão HTTPS - try: - xml_declaration = '' - - # limpa xml com caracteres bugados para infNFeSupl em NFC-e - xml = re.sub( - '(.*?)', - lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('&', ''), - etree.tostring(xml, encoding='unicode').replace('\n', '') - ) - xml = xml_declaration + xml - # Faz o request com o servidor - result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False) - result.encoding = 'utf-8' - return result - except requests.exceptions.RequestException as e: - raise e - finally: - certificado_a1.excluir() - - -class ComunicacaoNfse(Comunicacao): - """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """ - - _versao = '' - _namespace = '' - - def __init__(self, certificado, certificado_senha, autorizador, homologacao=False): - self.certificado = certificado - self.certificado_senha = certificado_senha - self._ambiente = 2 if homologacao else 1 - self.autorizador = autorizador.upper() - if self.autorizador == 'GINFES': - self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd' - self._versao = '3' - elif self.autorizador == 'BETHA': - self._namespace = NAMESPACE_BETHA - self._versao = '2.02' - else: - raise Exception('Autorizador não encontrado!') - - def autorizacao(self, nota): - # url do serviço - url = self._get_url() - if self.autorizador == 'BETHA': - # xml - xml = etree.tostring(nota, encoding='unicode', pretty_print=False) - # comunica via wsdl - return self._post(url, xml, 'gerar') - else: - raise Exception('Este método só esta implementado no autorizador betha.') - - def enviar_lote(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # xml - xml = '' + xml - # comunica via wsdl - return self._post_https(url, xml, 'enviar_lote') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def consultar(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # xml - xml = '' + xml - # comunica via wsdl - return self._post_https(url, xml, 'consulta') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def consultar_rps(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'BETHA': - # comunica via wsdl - return self._post(url, xml, 'consultaRps') - elif self.autorizador == 'GINFES': - return self._post_https(url, xml, 'consultaRps') - # TODO outros autorizadres - else: - raise Exception('Autorizador não encontrado!') - - def consultar_faixa(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'BETHA': - # comunica via wsdl - return self._post(url, xml, 'consultaFaixa') - else: - raise Exception('Este método só esta implementado no autorizador betha.') - - def consultar_lote(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # xml - xml = '' + xml - # comunica via wsdl - return self._post_https(url, xml, 'consulta_lote') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def consultar_situacao_lote(self, xml): - # url do serviço - url = self._get_url() - if self.autorizador == 'GINFES': - # comunica via wsdl - return self._post_https(url, xml, 'consulta_situacao_lote') - else: - raise Exception('Este método só esta implementado no autorizador ginfes.') - - def cancelar(self, xml): - # url do serviço - url = self._get_url() - # Betha - if self.autorizador == 'BETHA': - # comunica via wsdl - return self._post(url, xml, 'cancelar') - # Ginfes - elif self.autorizador == 'GINFES': - # comunica via wsdl com certificado - return self._post_https(url, xml, 'cancelar') - # TODO outros autorizadres - else: - raise Exception('Autorizador não encontrado!') - - def _cabecalho(self, retorna_string=True): - """ Monta o XML do cabeçalho da requisição wsdl - Namespaces padrão homologação (Ginfes) """ - - xml_declaration = '' - # cabecalho = '3' - # cabecalho - raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao) - etree.SubElement(raiz, 'versaoDados').text = self._versao - - if retorna_string: - cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','') - cabecalho = xml_declaration + cabecalho - return cabecalho - else: - return raiz - - def _cabecalho2(self, retorna_string=True): - """ Monta o XML do cabeçalho da requisição wsdl - Namespaces que funcionaram em produção (Ginfes)""" - - xml_declaration = '' - - # cabecalho - raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao) - etree.SubElement(raiz, 'versaoDados').text = self._versao - - if retorna_string: - cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '') - cabecalho = xml_declaration + cabecalho - return cabecalho - else: - return raiz - - def _cabecalho_ginfes(self): - """ Retorna o XML do cabeçalho gerado pelo xsd""" - from pynfe.processamento.autorizador_nfse import SerializacaoGinfes - return SerializacaoGinfes().cabecalho() - - def _get_url(self): - """ Retorna a url para comunicação com o webservice """ - if self._ambiente == 1: - ambiente = 'HTTPS' - else: - ambiente = 'HOMOLOGACAO' - if self.autorizador in NFSE: - self.url = NFSE[self.autorizador][ambiente] - else: - raise Exception('Autorizador nao encontrado!') - return self.url - - def _post(self, url, xml, metodo): - """ Comunicação wsdl (http) sem certificado digital """ - # cabecalho - cabecalho = self._cabecalho() - # comunicacao wsdl - try: - from suds.client import Client - cliente = Client(url) - # gerar nfse - if metodo == 'gerar': - return cliente.service.GerarNfse(cabecalho, xml) - elif metodo == 'consultaRps': - return cliente.service.ConsultarNfsePorRps(cabecalho, xml) - elif metodo == 'consultaFaixa': - return cliente.service.ConsultarNfseFaixa(cabecalho, xml) - elif metodo == 'cancelar': - return cliente.service.CancelarNfse(cabecalho, xml) - # TODO outros metodos - else: - raise Exception('Método não implementado no autorizador.') - except Exception as e: - raise e - - def _post_https(self, url, xml, metodo): - """ Comunicação wsdl (https) utilizando certificado do usuário """ - # cabecalho - cabecalho = self._cabecalho() - # comunicacao wsdl - try: - from suds.client import Client - from pynfe.utils.https_nfse import HttpAuthenticated - - certificadoA1 = CertificadoA1(self.certificado) - chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True) - - cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url)) - - # gerar nfse - if metodo == 'gerar': - return cliente.service.GerarNfse(cabecalho, xml) - elif metodo == 'enviar_lote': - return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml) - elif metodo == 'consulta': - return cliente.service.ConsultarNfseV3(cabecalho, xml) - elif metodo == 'consulta_lote': - return cliente.service.ConsultarLoteRpsV3(cabecalho, xml) - elif metodo == 'consulta_situacao_lote': - return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml) - elif metodo == 'consultaRps': - return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml) - elif metodo == 'consultaFaixa': - return cliente.service.ConsultarNfseFaixa(cabecalho, xml) - elif metodo == 'cancelar': - # versão 2 - return cliente.service.CancelarNfse(xml) - # versão 3 - # return cliente.service.CancelarNfseV3(cabecalho, xml) - # TODO outros metodos - else: - raise Exception('Método não implementado no autorizador.') - except Exception as e: - raise e diff --git a/pynfe/processamento/mdfe.py b/pynfe/processamento/mdfe.py new file mode 100644 index 0000000..ab5e153 --- /dev/null +++ b/pynfe/processamento/mdfe.py @@ -0,0 +1,214 @@ +# -*- coding: utf-8 -*- +import time +import re +import requests +import collections +from io import StringIO + + +from pynfe.utils.flags import ( + NAMESPACE_MDFE, + MODELO_MDFE, + VERSAO_MDFE, + NAMESPACE_MDFE_METODO, + NAMESPACE_SOAP, + NAMESPACE_XSI, + NAMESPACE_XSD, + CODIGOS_ESTADOS +) +from pynfe.utils.webservices import MDFE +from pynfe.entidades.certificado import CertificadoA1 +from pynfe.utils import etree, extrai_id_srtxml +from .comunicacao import Comunicacao +from .resposta import analisar_retorno + +MDFE_SITUACAO_JA_ENVIADO = ('100', '101', '132') + + +class ComunicacaoMDFe(Comunicacao): + + _modelo = MODELO_MDFE + _namespace = NAMESPACE_MDFE + _versao = VERSAO_MDFE + _header = 'mdfeCabecMsg' + _envio_mensagem = 'mdfeDadosMsg' + _retorno_mensagem = 'mdfeRecepcaoResult' + _namespace_metodo = NAMESPACE_MDFE_METODO + + _accept = True + _soap_action = False + _namespace_soap = NAMESPACE_SOAP + _namespace_xsi = NAMESPACE_XSI + _namespace_xsd = NAMESPACE_XSD + _soap_version = 'soap12' + _edoc_situacao_ja_enviado = MDFE_SITUACAO_JA_ENVIADO + _edoc_situacao_arquivo_recebido_com_sucesso = '103' + _edoc_situacao_em_processamento = '105' + _edoc_situacao_servico_em_operacao = '107' + + consulta_servico_ao_enviar = True + maximo_tentativas_consulta_recibo = 5 + + def status_servico(self): + url = self._get_url('STATUS') + # Monta XML do corpo da requisição + raiz = etree.Element('consStatServMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + etree.SubElement(raiz, 'xServ').text = 'STATUS' + xml = self._construir_xml_soap('MDFeStatusServico', raiz) + return self._post(url, xml) + + def consulta(self, chave): + url = self._get_url('CONSULTA') + # Monta XML do corpo da requisição + raiz = etree.Element('consSitMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + etree.SubElement(raiz, 'xServ').text = 'CONSULTAR' + etree.SubElement(raiz, 'chMDFe').text = chave + # Monta XML para envio da requisição + xml = self._construir_xml_soap('MDFeConsulta', raiz) + return self._post(url, xml) + + def consulta_nao_encerrados(self, cpfcnpj): + url = self._get_url('NAO_ENCERRADOS') + # Monta XML do corpo da requisição + # raiz = etree.Element('consMDFeNaoEnc', xmlns=NAMESPACE_MDFE, versao=self._versao) + attr = collections.OrderedDict() + attr['xmlns'] = NAMESPACE_MDFE + attr['versao'] = self._versao + raiz = etree.Element('consMDFeNaoEnc', attr) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NÃO ENCERRADOS' + if len(cpfcnpj) == 11: + etree.SubElement(raiz, 'CPF').text = cpfcnpj.zfill(11) + else: + etree.SubElement(raiz, 'CNPJ').text = cpfcnpj.zfill(14) + # Monta XML para envio da requisição + xml = self._construir_xml_soap('MDFeConsNaoEnc', raiz) + return self._post(url, xml) + + def consulta_recibo(self, numero): + url = self._get_url('RET_RECEPCAO') + # Monta XML do corpo da requisição + raiz = etree.Element('consReciMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + etree.SubElement(raiz, 'nRec').text = numero.zfill(15) + # Monta XML para envio da requisição + xml = self._construir_xml_soap('MDFeRetRecepcao', raiz) + return self._post(url, xml) + + def evento(self, evento): + """ + Envia eventos do MDFe como: + Encerramento + Cancelamento + Inclusao Condutor + Inclusao DF-e + Pagamento Operacao MDF-e + :param evento: Nome do Evento + :return: + """ + # url do serviço + url = self._get_url('EVENTOS') + # Monta XML do corpo da requisição + xml = self._construir_xml_soap('MDFeRecepcaoEvento', evento) + return self._post(url, xml) + + def _construir_xml_soap(self, metodo, dados): + """Mota o XML para o envio via SOAP""" + + ns = collections.OrderedDict() + ns['xsi'] = self._namespace_xsi + ns['xsd'] = self._namespace_xsd + ns[self._soap_version] = self._namespace_soap + raiz = etree.Element( + '{%s}Envelope' % self._namespace_soap, + nsmap=ns + ) + + if self._header: + cabecalho = self._cabecalho_soap(metodo) + c = etree.SubElement(raiz, '{%s}Header' % self._namespace_soap) + c.append(cabecalho) + + body = etree.SubElement(raiz, '{%s}Body' % self._namespace_soap) + + a = etree.SubElement( + body, + self._envio_mensagem, + xmlns=self._namespace_metodo+metodo + ) + a.append(dados) + return raiz + + def _post_header(self, soap_webservice_method=False): + """Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" + header = { + b'content-type': b'text/xml; charset=utf-8;', + } + + # PE é a únca UF que exige SOAPAction no header + if soap_webservice_method: + header[b'SOAPAction'] = \ + (self._namespace_metodo + soap_webservice_method).encode('utf-8') + + if self._accept: + header[b'Accept'] = b'application/soap+xml; charset=utf-8;' + + return header + + def _post(self, url, xml): + certificado_a1 = CertificadoA1(self.certificado) + chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True) + chave_cert = (cert, chave) + # Abre a conexão HTTPS + try: + xml_declaration = '' + + # limpa xml com caracteres bugados para infNFeSupl em NFC-e + xml = re.sub( + '(.*?)', + lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('&', ''), + etree.tostring(xml, encoding='unicode').replace('\n', '') + ) + xml = xml_declaration + xml + + # print(xml) + # print('-' * 20) + + # Faz o request com o servidor + result = requests.post( + url, + xml, + headers=self._post_header(), + cert=chave_cert, + verify=False + ) + result.encoding = 'utf-8' + return result + except requests.exceptions.RequestException as e: + raise e + finally: + certificado_a1.excluir() + + def _cabecalho_soap(self, metodo): + """Monta o XML do cabeçalho da requisição SOAP""" + + raiz = etree.Element( + self._header, + xmlns=self._namespace_metodo + metodo + ) + etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] + etree.SubElement(raiz, 'versaoDados').text = '3.00' + return raiz + + def _get_url(self, consulta): + # producao + if self._ambiente == 1: + ambiente = MDFE['SVRS']['HTTPS'] + # homologacao + else: + ambiente = MDFE['SVRS']['HOMOLOGACAO'] + + self.url = ambiente + MDFE['SVRS'][consulta] + return self.url diff --git a/pynfe/processamento/nfe.py b/pynfe/processamento/nfe.py new file mode 100644 index 0000000..b15da6c --- /dev/null +++ b/pynfe/processamento/nfe.py @@ -0,0 +1,437 @@ +# -*- coding: utf-8 -*- +import datetime +import re +import requests + +from pynfe.utils import etree, so_numeros +from pynfe.utils.flags import ( + NAMESPACE_NFE, + VERSAO_PADRAO, + CODIGOS_ESTADOS, + NAMESPACE_METODO, + NAMESPACE_SOAP, + NAMESPACE_XSI, + NAMESPACE_XSD, +) + +from pynfe.entidades.certificado import CertificadoA1 +from pynfe.utils.webservices import NFE, NFCE +from .assinatura import AssinaturaA1 +from .comunicacao import Comunicacao + + +class ComunicacaoNFe(Comunicacao): + """Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados.""" + + _versao = VERSAO_PADRAO + _assinatura = AssinaturaA1 + _namespace = NAMESPACE_NFE + _header = False + _envio_mensagem = 'nfeDadosMsg' + _namespace_metodo = NAMESPACE_METODO + _accept = False + _namespace_soap = NAMESPACE_SOAP + _namespace_xsi = NAMESPACE_XSI + _namespace_xsd = NAMESPACE_XSD + _soap_version = 'soap' + + def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1): + """ + Método para realizar autorização da nota de acordo com o modelo + :param modelo: Modelo + :param nota_fiscal: XML assinado + :param id_lote: Id do lote - numero autoincremental gerado pelo sistema + :param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono + :return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário, + envia todo o soap de resposta da Sefaz para decisão do usuário. + """ + # url do serviço + url = self._get_url(modelo=modelo, consulta='AUTORIZACAO') + + # Monta XML do corpo da requisição + raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO) + etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema + etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono + raiz.append(nota_fiscal) + + # Monta XML para envio da requisição + xml = self._construir_xml_soap('NFeAutorizacao4', raiz) + # Faz request no Servidor da Sefaz + retorno = self._post(url, xml) + + # Em caso de sucesso, retorna xml com nfe e protocolo de autorização. + # Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário. + if retorno.status_code == 200: + # namespace + ns = {'ns': NAMESPACE_NFE} + # Procuta status no xml + try: + prot = etree.fromstring(retorno.text) + except ValueError: + # em SP retorno.text apresenta erro + prot = etree.fromstring(retorno.content) + if ind_sinc == 1: + try: + # Protocolo com envio OK + try: + inf_prot = prot[0][0] # root protNFe + except IndexError: + # Estados como GO vem com a tag header + inf_prot = prot[1][0] + + lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text + # Lote processado + if lote_status == '104': + prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0] + status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text + # autorizado usa da NF-e + # retorna xml final (protNFe+NFe) + if status == '100': + raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO) + raiz.append(nota_fiscal) + raiz.append(prot_nfe) + return 0, raiz + except IndexError: + # Protocolo com algum erro no Envio + return 1, retorno, nota_fiscal + else: + # Retorna id do protocolo para posterior consulta em caso de sucesso. + rec = prot[0][0] + status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text + # Lote Recebido com Sucesso! + if status == '103': + nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text + return 0, nrec, nota_fiscal + return 1, retorno, nota_fiscal + + def consulta_recibo(self, modelo, numero): + """ + Este método oferece a consulta do resultado do processamento de um lote de NF-e. + O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de + 15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado + deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em + Processamento". + :param modelo: Modelo da nota + :param numero: Número da nota + :return: + """ + + # url do serviço + url = self._get_url(modelo=modelo, consulta='RECIBO') + + # Monta XML do corpo da requisição + raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + etree.SubElement(raiz, 'nRec').text = numero + + # Monta XML para envio da requisição + xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz) + return self._post(url, xml) + + def consulta_nota(self, modelo, chave): + """ + Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal + da Secretaria de Fazenda Estadual. + :param modelo: Modelo da nota + :param chave: Chave da nota + :return: + """ + # url do serviço + url = self._get_url(modelo=modelo, consulta='CHAVE') + # Monta XML do corpo da requisição + raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + etree.SubElement(raiz, 'xServ').text = 'CONSULTAR' + etree.SubElement(raiz, 'chNFe').text = chave + # Monta XML para envio da requisição + xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz) + return self._post(url, xml) + + def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0): + """ + O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag + informada no XML. As tags são distNSU, consNSU e consChNFe. + a) distNSU – Distribuição de Conjunto de DF-e a Partir do NSU Informado + b) consNSU – Consulta DF-e Vinculado ao NSU Informado + c) consChNFe – Consulta de NF-e por Chave de Acesso Informada + :param cnpj: CNPJ do interessado + :param cpf: CPF do interessado + :param chave: Chave da NF-e a ser consultada + :param nsu: Ultimo nsu ou nsu específico para ser consultado. + :return: + """ + # url + url = self._get_url_an(consulta='DISTRIBUICAO') + # Monta XML para envio da requisição + raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + if self.uf: + etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()] + if cnpj: + etree.SubElement(raiz, 'CNPJ').text = cnpj + else: + etree.SubElement(raiz, 'CPF').text = cpf + if not chave: + distNSU = etree.SubElement(raiz, 'distNSU') + etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15) + if chave: + consChNFe = etree.SubElement(raiz, 'consChNFe') + etree.SubElement(consChNFe, 'chNFe').text = chave + #Monta XML para envio da requisição + xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz) + return self._post(url, xml) + + def consulta_cadastro(self, modelo, cnpj): + """ + Consulta de cadastro + :param modelo: Modelo da nota + :param cnpj: CNPJ da empresa + :return: + """ + # UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC + lista_svrs = ['AC', 'RN', 'PB', 'SC'] + + # RS implementa um método diferente na consulta de cadastro + if self.uf.upper() == 'RS': + url = NFE['RS']['CADASTRO'] + elif self.uf.upper() in lista_svrs: + url = NFE['SVRS']['CADASTRO'] + elif self.uf.upper() == 'SVC-RS': + url = NFE['SVC-RS']['CADASTRO'] + else: + url = self._get_url(modelo=modelo, consulta='CADASTRO') + + raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE) + info = etree.SubElement(raiz, 'infCons') + etree.SubElement(info, 'xServ').text = 'CONS-CAD' + etree.SubElement(info, 'UF').text = self.uf.upper() + etree.SubElement(info, 'CNPJ').text = cnpj + # etree.SubElement(info, 'CPF').text = cpf + + # Monta XML para envio da requisição + xml = self._construir_xml_soap('CadConsultaCadastro4', raiz) + # Chama método que efetua a requisição POST no servidor SOAP + return self._post(url, xml) + + def evento(self, modelo, evento, id_lote=1): + """ + Envia um evento de nota fiscal (cancelamento e carta de correção) + :param modelo: Modelo da nota + :param evento: Eventro + :param id_lote: Id do lote + :return: + """ + + # url do serviço + try: + # manifestacao url é do AN + if evento[0][5].text.startswith('2'): + url = self._get_url_an(consulta='EVENTOS') + else: + url = self._get_url(modelo=modelo, consulta='EVENTOS') + except Exception: + url = self._get_url(modelo=modelo, consulta='EVENTOS') + + # Monta XML do corpo da requisição + raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE) + etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema + raiz.append(evento) + xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz) + return self._post(url, xml) + + def status_servico(self): + """ + Verifica status do servidor da receita. + :param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce + :return: + """ + url = self._get_url('mdfe', 'STATUS') + # Monta XML do corpo da requisição + raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) + etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) + etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] + etree.SubElement(raiz, 'xServ').text = 'STATUS' + xml = self._construir_xml_soap('NFeStatusServico4', raiz) + return self._post(url, xml) + + def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'): + """ + Serviço destinado ao atendimento de solicitações de inutilização de numeração. + :param modelo: Modelo da nota + :param cnpj: CNPJda empresa + :param numero_inicial: Número inicial + :param numero_final: Número final + :param justificativa: Justificativa + :param ano: Ano + :param serie: Série + :return: + """ + + # url do servico + url = self._get_url(modelo=modelo, consulta='INUTILIZACAO') + + # Valores default + ano = str(ano or datetime.date.today().year)[-2:] + uf = CODIGOS_ESTADOS[self.uf.upper()] + cnpj = so_numeros(cnpj) + + # Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) + + # CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID” + id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % { + 'uf': uf, + 'ano': ano, + 'cnpj': cnpj, + 'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e; + 'serie': serie.zfill(3), + 'num_ini': str(numero_inicial).zfill(9), + 'num_fin': str(numero_final).zfill(9), + } + + # Monta XML do corpo da requisição # FIXME + raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) + inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico) + etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente) + etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR' + etree.SubElement(inf_inut, 'cUF').text = uf + etree.SubElement(inf_inut, 'ano').text = ano + etree.SubElement(inf_inut, 'CNPJ').text = cnpj + etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e + etree.SubElement(inf_inut, 'serie').text = serie + etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial) + etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final) + etree.SubElement(inf_inut, 'xJust').text = justificativa + + # assinatura + a1 = AssinaturaA1(self.certificado, self.certificado_senha) + xml = a1.assinar(raiz) + + # Monta XML para envio da requisição + xml = self._construir_xml_soap('NFeInutilizacao4', xml) + # Faz request no Servidor da Sefaz e retorna resposta + return self._post(url, xml) + + def _get_url_an(self, consulta): + # producao + if self._ambiente == 1: + if consulta == 'DISTRIBUICAO': + ambiente = 'https://www1.' + else: + ambiente = 'https://www.' + # homologacao + else: + ambiente = 'https://hom.' + + self.url = ambiente + NFE['AN'][consulta] + return self.url + + def _get_url(self, modelo, consulta): + """ Retorna a url para comunicação com o webservice """ + # estado que implementam webservices proprios + lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS'] + if self.uf.upper() in lista: + if self._ambiente == 1: + ambiente = 'HTTPS' + else: + ambiente = 'HOMOLOGACAO' + if modelo == 'nfe': + # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 + self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta] + elif modelo == 'nfce': + # PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe + if self.uf.upper() == 'PE' or self.uf.upper() == 'BA': + self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] + else: + # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 + self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta] + else: + raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') + # Estados que utilizam outros ambientes + else: + lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO'] + if self.uf.upper() in lista_svrs: + if self._ambiente == 1: + ambiente = 'HTTPS' + else: + ambiente = 'HOMOLOGACAO' + if modelo == 'nfe': + # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 + self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta] + elif modelo == 'nfce': + # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 + self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] + else: + raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') + # unico UF que utiliza SVAN ainda para NF-e + # SVRS para NFC-e + elif self.uf.upper() == 'MA': + if self._ambiente == 1: + ambiente = 'HTTPS' + else: + ambiente = 'HOMOLOGACAO' + if modelo == 'nfe': + # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 + self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta] + elif modelo == 'nfce': + # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 + self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] + else: + raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') + else: + raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}") + return self.url + + def _construir_xml_soap(self, metodo, dados, cabecalho=False): + """Mota o XML para o envio via SOAP""" + + raiz = etree.Element( + '{%s}Envelope' % NAMESPACE_SOAP, + nsmap={ + 'xsi': NAMESPACE_XSI, + 'xsd': NAMESPACE_XSD, + 'soap': NAMESPACE_SOAP + } + ) + body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP) + # distribuição tem um corpo de xml diferente + if metodo == 'NFeDistribuicaoDFe': + x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo) + a = etree.SubElement(x, 'nfeDadosMsg') + else: + a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo) + a.append(dados) + return raiz + + def _post_header(self): + """Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" + # PE é a única UF que exige SOAPAction no header + response = { + 'content-type': 'application/soap+xml; charset=utf-8;', + 'Accept': 'application/soap+xml; charset=utf-8;', + } + if self.uf.upper() == 'PE': + response["SOAPAction"] = "" + return response + + def _post(self, url, xml): + certificado_a1 = CertificadoA1(self.certificado) + chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True) + chave_cert = (cert, chave) + # Abre a conexão HTTPS + try: + xml_declaration = '' + + # limpa xml com caracteres bugados para infNFeSupl em NFC-e + xml = re.sub( + '(.*?)', + lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('&', ''), + etree.tostring(xml, encoding='unicode').replace('\n', '') + ) + xml = xml_declaration + xml + # Faz o request com o servidor + result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False) + result.encoding = 'utf-8' + return result + except requests.exceptions.RequestException as e: + raise e + finally: + certificado_a1.excluir() diff --git a/pynfe/processamento/nfse.py b/pynfe/processamento/nfse.py new file mode 100644 index 0000000..5d75a52 --- /dev/null +++ b/pynfe/processamento/nfse.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +from pynfe.utils import etree +from pynfe.utils.flags import ( + NAMESPACE_XSI, + NAMESPACE_BETHA, +) +from pynfe.utils.webservices import NFSE +from pynfe.entidades.certificado import CertificadoA1 +from .comunicacao import Comunicacao + + +class ComunicacaoNfse(Comunicacao): + """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """ + + _versao = '' + _namespace = '' + + def __init__(self, certificado, certificado_senha, autorizador, homologacao=False): + self.certificado = certificado + self.certificado_senha = certificado_senha + self._ambiente = 2 if homologacao else 1 + self.autorizador = autorizador.upper() + if self.autorizador == 'GINFES': + self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd' + self._versao = '3' + elif self.autorizador == 'BETHA': + self._namespace = NAMESPACE_BETHA + self._versao = '2.02' + else: + raise Exception('Autorizador não encontrado!') + + def autorizacao(self, nota): + # url do serviço + url = self._get_url() + if self.autorizador == 'BETHA': + # xml + xml = etree.tostring(nota, encoding='unicode', pretty_print=False) + # comunica via wsdl + return self._post(url, xml, 'gerar') + else: + raise Exception('Este método só esta implementado no autorizador betha.') + + def enviar_lote(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # xml + xml = '' + xml + # comunica via wsdl + return self._post_https(url, xml, 'enviar_lote') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def consultar(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # xml + xml = '' + xml + # comunica via wsdl + return self._post_https(url, xml, 'consulta') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def consultar_rps(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'BETHA': + # comunica via wsdl + return self._post(url, xml, 'consultaRps') + elif self.autorizador == 'GINFES': + return self._post_https(url, xml, 'consultaRps') + # TODO outros autorizadres + else: + raise Exception('Autorizador não encontrado!') + + def consultar_faixa(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'BETHA': + # comunica via wsdl + return self._post(url, xml, 'consultaFaixa') + else: + raise Exception('Este método só esta implementado no autorizador betha.') + + def consultar_lote(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # xml + xml = '' + xml + # comunica via wsdl + return self._post_https(url, xml, 'consulta_lote') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def consultar_situacao_lote(self, xml): + # url do serviço + url = self._get_url() + if self.autorizador == 'GINFES': + # comunica via wsdl + return self._post_https(url, xml, 'consulta_situacao_lote') + else: + raise Exception('Este método só esta implementado no autorizador ginfes.') + + def cancelar(self, xml): + # url do serviço + url = self._get_url() + # Betha + if self.autorizador == 'BETHA': + # comunica via wsdl + return self._post(url, xml, 'cancelar') + # Ginfes + elif self.autorizador == 'GINFES': + # comunica via wsdl com certificado + return self._post_https(url, xml, 'cancelar') + # TODO outros autorizadres + else: + raise Exception('Autorizador não encontrado!') + + def _cabecalho(self, retorna_string=True): + """ Monta o XML do cabeçalho da requisição wsdl + Namespaces padrão homologação (Ginfes) """ + + xml_declaration = '' + # cabecalho = '3' + # cabecalho + raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao) + etree.SubElement(raiz, 'versaoDados').text = self._versao + + if retorna_string: + cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','') + cabecalho = xml_declaration + cabecalho + return cabecalho + else: + return raiz + + def _cabecalho2(self, retorna_string=True): + """ Monta o XML do cabeçalho da requisição wsdl + Namespaces que funcionaram em produção (Ginfes)""" + + xml_declaration = '' + + # cabecalho + raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao) + etree.SubElement(raiz, 'versaoDados').text = self._versao + + if retorna_string: + cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '') + cabecalho = xml_declaration + cabecalho + return cabecalho + else: + return raiz + + def _cabecalho_ginfes(self): + """ Retorna o XML do cabeçalho gerado pelo xsd""" + from pynfe.processamento.autorizador_nfse import SerializacaoGinfes + return SerializacaoGinfes().cabecalho() + + def _get_url(self): + """ Retorna a url para comunicação com o webservice """ + if self._ambiente == 1: + ambiente = 'HTTPS' + else: + ambiente = 'HOMOLOGACAO' + if self.autorizador in NFSE: + self.url = NFSE[self.autorizador][ambiente] + else: + raise Exception('Autorizador nao encontrado!') + return self.url + + def _post(self, url, xml, metodo): + """ Comunicação wsdl (http) sem certificado digital """ + # cabecalho + cabecalho = self._cabecalho() + # comunicacao wsdl + try: + from suds.client import Client + cliente = Client(url) + # gerar nfse + if metodo == 'gerar': + return cliente.service.GerarNfse(cabecalho, xml) + elif metodo == 'consultaRps': + return cliente.service.ConsultarNfsePorRps(cabecalho, xml) + elif metodo == 'consultaFaixa': + return cliente.service.ConsultarNfseFaixa(cabecalho, xml) + elif metodo == 'cancelar': + return cliente.service.CancelarNfse(cabecalho, xml) + # TODO outros metodos + else: + raise Exception('Método não implementado no autorizador.') + except Exception as e: + raise e + + def _post_https(self, url, xml, metodo): + """ Comunicação wsdl (https) utilizando certificado do usuário """ + # cabecalho + cabecalho = self._cabecalho() + # comunicacao wsdl + try: + from suds.client import Client + from pynfe.utils.https_nfse import HttpAuthenticated + + certificadoA1 = CertificadoA1(self.certificado) + chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True) + + cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url)) + + # gerar nfse + if metodo == 'gerar': + return cliente.service.GerarNfse(cabecalho, xml) + elif metodo == 'enviar_lote': + return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml) + elif metodo == 'consulta': + return cliente.service.ConsultarNfseV3(cabecalho, xml) + elif metodo == 'consulta_lote': + return cliente.service.ConsultarLoteRpsV3(cabecalho, xml) + elif metodo == 'consulta_situacao_lote': + return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml) + elif metodo == 'consultaRps': + return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml) + elif metodo == 'consultaFaixa': + return cliente.service.ConsultarNfseFaixa(cabecalho, xml) + elif metodo == 'cancelar': + # versão 2 + return cliente.service.CancelarNfse(xml) + # versão 3 + # return cliente.service.CancelarNfseV3(cabecalho, xml) + # TODO outros metodos + else: + raise Exception('Método não implementado no autorizador.') + except Exception as e: + raise e diff --git a/pynfe/processamento/resposta.py b/pynfe/processamento/resposta.py new file mode 100644 index 0000000..40f1529 --- /dev/null +++ b/pynfe/processamento/resposta.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +import re +from pynfe.utils import etree + + +class RetornoSoap(object): + + def __init__(self, webservice, retorno, resposta): + self.webservice = webservice + self.resposta = resposta + self.retorno = retorno + + +def analisar_retorno(webservice, retorno, classe_resposta): + + # retorno.raise_for_status() + # print(retorno.text) + + match = re.search('(.*?)', retorno.text) + + if match: + resultado = etree.tostring(etree.fromstring(match.group(1))[0]) + # classe_resposta.Validate_simpletypes_ = False + # resposta = classe_resposta.parseString(resultado) + resposta = resultado + # resposta = retorno.text + + return RetornoSoap(webservice, retorno, resposta) diff --git a/pynfe/processamento/serializacao.py b/pynfe/processamento/serializacao.py index c571654..9f0ddc2 100644 --- a/pynfe/processamento/serializacao.py +++ b/pynfe/processamento/serializacao.py @@ -1,9 +1,19 @@ # -*- coding: utf-8 -*- from pynfe.entidades import NotaFiscal -from pynfe.utils import etree, so_numeros, obter_municipio_por_codigo, \ - obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal, \ +from pynfe.utils import ( + etree, so_numeros, obter_municipio_por_codigo, + obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal, remover_acentos, obter_uf_por_codigo, obter_codigo_por_municipio -from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO, NAMESPACE_NFE, NAMESPACE_SIG, VERSAO_QRCODE +) +from pynfe.utils.flags import ( + CODIGOS_ESTADOS, + VERSAO_PADRAO, + VERSAO_MDFE, + NAMESPACE_NFE, + NAMESPACE_MDFE, + NAMESPACE_SIG, + VERSAO_QRCODE +) from pynfe.utils.webservices import NFCE import base64 import hashlib @@ -674,7 +684,6 @@ class SerializacaoXML(Serializacao): raiz.append(self._serializar_responsavel_tecnico( nota_fiscal.responsavel_tecnico[0], retorna_string=False)) - if retorna_string: return etree.tostring(raiz, encoding="unicode", pretty_print=True) else: @@ -712,6 +721,93 @@ class SerializacaoXML(Serializacao): else: return raiz + def serializar_evento_mdfe(self, evento, tag_raiz='eventoMDFe', retorna_string=False): + tz = datetime.now().astimezone().strftime('%z') + tz = "{}:{}".format(tz[:-2], tz[-2:]) + raiz = etree.Element(tag_raiz, versao=VERSAO_MDFE, xmlns=NAMESPACE_MDFE) + e = etree.SubElement(raiz, 'infEvento', Id=evento.identificador) + etree.SubElement(e, 'cOrgao').text = CODIGOS_ESTADOS[evento.uf.upper()] + etree.SubElement(e, 'tpAmb').text = str(self._ambiente) + if len(so_numeros(evento.cnpj)) == 11: + etree.SubElement(e, 'CPF').text = evento.cnpj + else: + etree.SubElement(e, 'CNPJ').text = evento.cnpj + etree.SubElement(e, 'chMDFe').text = evento.chave + etree.SubElement(e, 'dhEvento').text = evento.data_emissao.strftime('%Y-%m-%dT%H:%M:%S') + tz + etree.SubElement(e, 'tpEvento').text = evento.tp_evento + etree.SubElement(e, 'nSeqEvento').text = str(evento.n_seq_evento) + det = etree.SubElement(e, 'detEvento', versaoEvento=VERSAO_MDFE) + if evento.descricao == 'Encerramento': + encerramento = etree.SubElement(det, 'evEncMDFe') + etree.SubElement(encerramento, 'descEvento').text = evento.descricao + etree.SubElement(encerramento, 'nProt').text = evento.protocolo + etree.SubElement(encerramento, 'dtEnc').text = evento.dtenc.strftime('%Y-%m-%d') + etree.SubElement(encerramento, 'cUF').text = evento.cuf + etree.SubElement(encerramento, 'cMun').text = evento.cmun + elif evento.descricao == 'Inclusão Condutor': + inclusao = etree.SubElement(det, 'evIncCondutorMDFe') + etree.SubElement(inclusao, 'descEvento').text = evento.descricao + condutor = etree.SubElement(inclusao, 'condutor') + etree.SubElement(condutor, 'xNome').text = evento.nome_motorista + etree.SubElement(condutor, 'CPF').text = evento.cpf_motorista + elif evento.descricao == 'Inclusao DF-e': + inclusao = etree.SubElement(det, 'evIncDFeMDFe') + etree.SubElement(inclusao, 'descEvento').text = evento.descricao + etree.SubElement(inclusao, 'nProt').text = evento.protocolo + etree.SubElement(inclusao, 'cMunCarrega').text = evento.cmun_carrega + etree.SubElement(inclusao, 'xMunCarrega').text = evento.xmun_carrega + infDoc = etree.SubElement(inclusao, 'infDoc') + etree.SubElement(infDoc, 'cMunDescarga').text = evento.cmun_descarga + etree.SubElement(infDoc, 'xMunDescarga').text = evento.xmun_descarga + etree.SubElement(infDoc, 'chNFe').text = evento.chave_nfe + elif evento.descricao == 'Pagamento Operacao MDF-e': + pagamento = etree.SubElement(det, 'evPagtoOperMDFe') + etree.SubElement(pagamento, 'descEvento').text = evento.descricao + etree.SubElement(pagamento, 'nProt').text = evento.protocolo + + # Viagens + infViagens = etree.SubElement(pagamento, 'infViagens') + etree.SubElement(infViagens, 'qtdViagens').text = evento.qtd_viagens.zfill(5) + etree.SubElement(infViagens, 'nroViagem').text = evento.nro_viagens.zfill(5) + + # Informações do pagamento + infPag = etree.SubElement(pagamento, 'infPag') + etree.SubElement(infPag, 'xNome').text = evento.nome_contratante + if len(evento.cpfcnpj_contratante) == 11: + etree.SubElement(infPag, 'CPF').text = evento.cpfcnpj_contratante + else: + etree.SubElement(infPag, 'CNPJ').text = evento.cpfcnpj_contratante + + # Componentes de Pagamento do Frete + Comp = etree.SubElement(infPag, 'Comp') + etree.SubElement(Comp, 'tpComp').text = evento.tpComp.zfill(2) + etree.SubElement(Comp, 'vComp').text = '{:.2f}'.format(evento.vComp) + + # Continuação das Informações do pagamento + etree.SubElement(infPag, 'vContrato').text = '{:.2f}'.format(evento.vContrato) + etree.SubElement(infPag, 'indPag').text = evento.indPag + + # Se indPag == 1 (0=A vista e 1=A prazo) + if evento.indPag != '': + if int(evento.indPag) == 1: + infPrazo = etree.SubElement(infPag, 'infPrazo') + etree.SubElement(infPrazo, 'nParcela').text = evento.nParcela.zfill(3) + etree.SubElement(infPrazo, 'dVenc').text = evento.dVenc.strftime('%Y-%m-%d') + etree.SubElement(infPrazo, 'vParcela').text = '{:.2f}'.format(evento.vParcela) + + # Informações bancárias + infBanc = etree.SubElement(infPag, 'infBanc') + if evento.CNPJIPEF != '': + etree.SubElement(infBanc, 'CNPJIPEF').text = evento.CNPJIPEF.zfill(14) + else: + etree.SubElement(infBanc, 'codBanco').text = evento.codBanco + etree.SubElement(infBanc, 'codAgencia').text = evento.codAgencia + + if retorna_string: + return etree.tostring(raiz, encoding="unicode", pretty_print=True) + else: + return raiz + class SerializacaoQrcode(object): """ Classe que gera e serializa o qrcode de NFC-e no xml """ diff --git a/pynfe/utils/__init__.py b/pynfe/utils/__init__.py index 2aeeab6..a971aa1 100644 --- a/pynfe/utils/__init__.py +++ b/pynfe/utils/__init__.py @@ -3,13 +3,17 @@ import os import codecs from unicodedata import normalize +import re try: from lxml import etree except ImportError: raise Exception('Falhou ao importar lxml/ElementTree') -from io import StringIO +try: + from StringIO import StringIO +except ImportError: + from io import StringIO try: from . import flags @@ -146,3 +150,11 @@ def obter_uf_por_codigo(codigo_uf): def remover_acentos(txt): return normalize('NFKD', txt).encode('ASCII','ignore').decode('ASCII') + + +def extrai_id_srtxml(edoc): + result = '' + match = re.search('Id=[^0-9]+(\d+)"', edoc) + if match: + result = match.group(1) + return result \ No newline at end of file diff --git a/pynfe/utils/flags.py b/pynfe/utils/flags.py index 084389d..d43df33 100644 --- a/pynfe/utils/flags.py +++ b/pynfe/utils/flags.py @@ -10,6 +10,11 @@ NAMESPACE_METODO = 'http://www.portalfiscal.inf.br/nfe/wsdl/' NAMESPACE_SOAP_NFSE = 'http://schemas.xmlsoap.org/soap/envelope/' NAMESPACE_BETHA = 'http://www.betha.com.br/e-nota-contribuinte-ws' +NAMESPACE_MDFE = 'http://www.portalfiscal.inf.br/mdfe' +NAMESPACE_MDFE_METODO = 'http://www.portalfiscal.inf.br/mdfe/wsdl/' +MODELO_MDFE = '58' +VERSAO_MDFE = '3.00' + VERSAO_PADRAO = '4.00' VERSAO_QRCODE = '2' diff --git a/pynfe/utils/webservices.py b/pynfe/utils/webservices.py index 49c6029..acc1585 100644 --- a/pynfe/utils/webservices.py +++ b/pynfe/utils/webservices.py @@ -498,3 +498,18 @@ NFSE = { 'HOMOLOGACAO':'https://homologacao.ginfes.com.br/ServiceGinfesImpl?wsdl' } } + +# MDF-e +MDFE = { + # unico autorizador de MDF-e + 'SVRS': { + 'RECEPCAO': 'mdferecepcao/MDFeRecepcao.asmx', + 'RET_RECEPCAO': 'mdferetrecepcao/MDFeRetRecepcao.asmx', + 'EVENTOS': 'mdferecepcaoevento/MDFeRecepcaoEvento.asmx', + 'CONSULTA': 'mdfeconsulta/MDFeConsulta.asmx', + 'STATUS': 'mdfestatusservico/MDFeStatusServico.asmx', + 'NAO_ENCERRADOS': 'mdfeconsnaoenc/MDFeConsNaoEnc.asmx', + 'HTTPS': 'https://mdfe.svrs.rs.gov.br/ws/', + 'HOMOLOGACAO': 'https://mdfe-homologacao.svrs.rs.gov.br/ws/' + } +}