diff --git a/pynfe/entidades/evento.py b/pynfe/entidades/evento.py
index ffe7410..473f810 100644
--- a/pynfe/entidades/evento.py
+++ b/pynfe/entidades/evento.py
@@ -4,8 +4,10 @@
@author: Junior Tada, Leonardo Tada
"""
+from decimal import Decimal
from .base import Entidade
+
class Evento(Entidade):
# - Identificador da TAG a ser assinada, a regra de formação do Id é: “ID” + tpEvento + chave da NF-e + nSeqEvento
id = str()
@@ -114,4 +116,111 @@ class EventoManifestacaoDest(Evento):
# - Informar a justificativa porque a operação não foi realizada, este campo deve ser informado somente no evento de Operação não Realizada. (min 15 max 255 caracteres)
justificativa = str()
-
\ No newline at end of file
+
+
+class EventoEncerramento(Evento):
+
+ def __init__(self, *args, **kwargs):
+ super(EventoEncerramento, self).__init__(*args, **kwargs)
+ # - Código do evento = 110112
+ self.tp_evento = '110112'
+ # - "Encerramento"
+ self.descricao = 'Encerramento'
+
+ # - Informar o número do Protocolo de Autorização da MDF-e a ser Encerrada
+ protocolo = str()
+ # - Data e hora do evento no formato AAAA-MM-DDThh:mm:ssTZD
+ dtenc = None
+ # - uf de onde a manifesto foi encerrado
+ cuf = str()
+ # - minicipio onde o manifesto foi encerrado
+ cmun = str()
+
+
+class EventoInclusaoCondutor(Evento):
+
+ def __init__(self, *args, **kwargs):
+ super(EventoInclusaoCondutor, self).__init__(*args, **kwargs)
+ # - Código do evento = 110114
+ self.tp_evento = '110114'
+ # - "Encerramento"
+ self.descricao = 'Inclusão Condutor'
+
+ # - Nome do motorista
+ nome_motorista = str()
+ # - CPF do motorista
+ cpf_motorista = str()
+
+
+class EventoInclusaoDFe(Evento):
+
+ def __init__(self, *args, **kwargs):
+ super(EventoInclusaoDFe, self).__init__(*args, **kwargs)
+ # - Código do evento = 110115
+ self.tp_evento = '110115'
+ # - "Inclusao DF-e"
+ self.descricao = 'Inclusao DF-e'
+
+ # - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e
+ protocolo = str()
+ # - Código IBGE do Município de Carregamento
+ cmun_carrega = str()
+ # - Nome do Município de Carregamento
+ xmun_carrega = str()
+ # - Código IBGE do Município de Descarga
+ cmun_descarga = str()
+ # - Nome do Município de Descarga
+ xmun_descarga = str()
+ # - Chave de Acesso da NF-e a ser incluída no MDFe
+ chave_nfe = str()
+
+
+class EventoInclusaoPagamento(Evento):
+
+ def __init__(self, *args, **kwargs):
+ super(EventoInclusaoPagamento, self).__init__(*args, **kwargs)
+ # - Código do evento = 110116
+ self.tp_evento = '110116'
+ # - "Pagamento Operacao MDF-e"
+ self.descricao = 'Pagamento Operacao MDF-e'
+
+ # - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e
+ protocolo = str()
+
+ # - Quantidade de viagens
+ qtd_viagens = str()
+ # - Número da viagem
+ nro_viagens = str()
+
+ # Informações do pagamento
+ # - Nome do Contratante
+ nome_contratante = str()
+ # - CPF/CNPJ do Contratante
+ cpfcnpj_contratante = str()
+
+ # Componentes do Pagamento
+ # - Tipo do pagamento
+ tpComp = str()
+ # - Valor
+ vComp = Decimal()
+
+ # - Valor total do contrato
+ vContrato = Decimal()
+ # - Tipo do pagamento (0=a vista e 1=a prazo)
+ indPag = str()
+
+ # Se o pagamento for a prazo
+ # - Numero da parcela
+ nParcela = str()
+ # - Data vencimento
+ dVenc = None
+ # - Valor da parcela
+ vParcela = Decimal()
+
+ # Informações bancárias
+ # - CNPJ da Instituição de Pagamento eletrônico do Frete
+ CNPJIPEF = str()
+ # - Código do Banco
+ codBanco = str()
+ # - Código da Agência
+ codAgencia = str()
diff --git a/pynfe/processamento/__init__.py b/pynfe/processamento/__init__.py
index e63097d..7dfb5f6 100644
--- a/pynfe/processamento/__init__.py
+++ b/pynfe/processamento/__init__.py
@@ -2,5 +2,8 @@ from .serializacao import SerializacaoXML
from .serializacao import SerializacaoNfse
from .validacao import Validacao
from .assinatura import AssinaturaA1
-from .comunicacao import ComunicacaoSefaz
+from pynfe.entidades.certificado import CertificadoA1
+from .nfe import ComunicacaoNFe
+from .mdfe import ComunicacaoMDFe
+from .nfse import ComunicacaoNfse
from .danfe import DanfeNfce
diff --git a/pynfe/processamento/comunicacao.py b/pynfe/processamento/comunicacao.py
index 2a9bd41..41cc426 100644
--- a/pynfe/processamento/comunicacao.py
+++ b/pynfe/processamento/comunicacao.py
@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
-import re
import ssl
import datetime
-import requests
+
from pynfe.utils import etree, so_numeros
from pynfe.utils.flags import (
NAMESPACE_NFE,
@@ -15,14 +14,13 @@ from pynfe.utils.flags import (
NAMESPACE_METODO
)
from pynfe.utils.webservices import NFE, NFCE, NFSE
-from pynfe.entidades.certificado import CertificadoA1
from .assinatura import AssinaturaA1
class Comunicacao(object):
"""
Classe abstrata responsavel por definir os metodos e logica das classes
- de comunicação com os webservices da NF-e.
+ de comunicação com os webservices.
"""
_ambiente = 1 # 1 = Produção, 2 = Homologação
@@ -30,637 +28,22 @@ class Comunicacao(object):
certificado = None
certificado_senha = None
url = None
+ _versao = False
+ _assinatura = AssinaturaA1
+ _namespace = False
+ _header = False
+ _envio_mensagem = False
+ _namespace_metodo = False
+ _accept = False
+ _soap_action = False
+ _ws_url = False
+ _namespace_soap = NAMESPACE_SOAP
+ _namespace_xsi = NAMESPACE_XSI
+ _namespace_xsd = NAMESPACE_XSD
+ _soap_version = 'soap'
def __init__(self, uf, certificado, certificado_senha, homologacao=False):
self.uf = uf
self.certificado = certificado
self.certificado_senha = certificado_senha
self._ambiente = 2 if homologacao else 1
-
-
-class ComunicacaoSefaz(Comunicacao):
- """Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
-
- _versao = VERSAO_PADRAO
- _assinatura = AssinaturaA1
-
- def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
- """
- Método para realizar autorização da nota de acordo com o modelo
- :param modelo: Modelo
- :param nota_fiscal: XML assinado
- :param id_lote: Id do lote - numero autoincremental gerado pelo sistema
- :param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
- :return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
- envia todo o soap de resposta da Sefaz para decisão do usuário.
- """
- # url do serviço
- url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
- etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
- etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
- raiz.append(nota_fiscal)
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
- # Faz request no Servidor da Sefaz
- retorno = self._post(url, xml)
-
- # Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
- # Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
- if retorno.status_code == 200:
- # namespace
- ns = {'ns': NAMESPACE_NFE}
- # Procuta status no xml
- try:
- prot = etree.fromstring(retorno.text)
- except ValueError:
- # em SP retorno.text apresenta erro
- prot = etree.fromstring(retorno.content)
- if ind_sinc == 1:
- try:
- # Protocolo com envio OK
- try:
- inf_prot = prot[0][0] # root protNFe
- except IndexError:
- # Estados como GO vem com a tag header
- inf_prot = prot[1][0]
-
- lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
- # Lote processado
- if lote_status == '104':
- prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
- status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
- # autorizado usa da NF-e
- # retorna xml final (protNFe+NFe)
- if status == '100':
- raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
- raiz.append(nota_fiscal)
- raiz.append(prot_nfe)
- return 0, raiz
- except IndexError:
- # Protocolo com algum erro no Envio
- return 1, retorno, nota_fiscal
- else:
- # Retorna id do protocolo para posterior consulta em caso de sucesso.
- rec = prot[0][0]
- status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
- # Lote Recebido com Sucesso!
- if status == '103':
- nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
- return 0, nrec, nota_fiscal
- return 1, retorno, nota_fiscal
-
- def consulta_recibo(self, modelo, numero):
- """
- Este método oferece a consulta do resultado do processamento de um lote de NF-e.
- O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
- 15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
- deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
- Processamento".
- :param modelo: Modelo da nota
- :param numero: Número da nota
- :return:
- """
-
- # url do serviço
- url = self._get_url(modelo=modelo, consulta='RECIBO')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(raiz, 'nRec').text = numero
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
- return self._post(url, xml)
-
- def consulta_nota(self, modelo, chave):
- """
- Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
- da Secretaria de Fazenda Estadual.
- :param modelo: Modelo da nota
- :param chave: Chave da nota
- :return:
- """
- # url do serviço
- url = self._get_url(modelo=modelo, consulta='CHAVE')
- # Monta XML do corpo da requisição
- raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
- etree.SubElement(raiz, 'chNFe').text = chave
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
- return self._post(url, xml)
-
- def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0):
- """
- O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag
- informada no XML. As tags são distNSU, consNSU e consChNFe.
- a) distNSU – Distribuição de Conjunto de DF-e a Partir do NSU Informado
- b) consNSU – Consulta DF-e Vinculado ao NSU Informado
- c) consChNFe – Consulta de NF-e por Chave de Acesso Informada
- :param cnpj: CNPJ do interessado
- :param cpf: CPF do interessado
- :param chave: Chave da NF-e a ser consultada
- :param nsu: Ultimo nsu ou nsu específico para ser consultado.
- :return:
- """
- # url
- url = self._get_url_an(consulta='DISTRIBUICAO')
- # Monta XML para envio da requisição
- raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- if self.uf:
- etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()]
- if cnpj:
- etree.SubElement(raiz, 'CNPJ').text = cnpj
- else:
- etree.SubElement(raiz, 'CPF').text = cpf
- if not chave:
- distNSU = etree.SubElement(raiz, 'distNSU')
- etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15)
- if chave:
- consChNFe = etree.SubElement(raiz, 'consChNFe')
- etree.SubElement(consChNFe, 'chNFe').text = chave
- #Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz)
-
-
- return self._post(url, xml)
-
- def consulta_cadastro(self, modelo, cnpj):
- """
- Consulta de cadastro
- :param modelo: Modelo da nota
- :param cnpj: CNPJ da empresa
- :return:
- """
- # UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
- lista_svrs = ['AC', 'RN', 'PB', 'SC']
-
- # RS implementa um método diferente na consulta de cadastro
- if self.uf.upper() == 'RS':
- url = NFE['RS']['CADASTRO']
- elif self.uf.upper() in lista_svrs:
- url = NFE['SVRS']['CADASTRO']
- elif self.uf.upper() == 'SVC-RS':
- url = NFE['SVC-RS']['CADASTRO']
- else:
- url = self._get_url(modelo=modelo, consulta='CADASTRO')
-
- raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
- info = etree.SubElement(raiz, 'infCons')
- etree.SubElement(info, 'xServ').text = 'CONS-CAD'
- etree.SubElement(info, 'UF').text = self.uf.upper()
- etree.SubElement(info, 'CNPJ').text = cnpj
- # etree.SubElement(info, 'CPF').text = cpf
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
- # Chama método que efetua a requisição POST no servidor SOAP
- return self._post(url, xml)
-
- def evento(self, modelo, evento, id_lote=1):
- """
- Envia um evento de nota fiscal (cancelamento e carta de correção)
- :param modelo: Modelo da nota
- :param evento: Eventro
- :param id_lote: Id do lote
- :return:
- """
-
- # url do serviço
- try:
- # manifestacao url é do AN
- if evento[0][5].text.startswith('2'):
- url = self._get_url_an(consulta='EVENTOS')
- else:
- url = self._get_url(modelo=modelo, consulta='EVENTOS')
- except Exception:
- url = self._get_url(modelo=modelo, consulta='EVENTOS')
-
- # Monta XML do corpo da requisição
- raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
- raiz.append(evento)
- xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
- return self._post(url, xml)
-
- def status_servico(self, modelo):
- """
- Verifica status do servidor da receita.
- :param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
- :return:
- """
- url = self._get_url(modelo, 'STATUS')
- # Monta XML do corpo da requisição
- raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
- etree.SubElement(raiz, 'xServ').text = 'STATUS'
- xml = self._construir_xml_soap('NFeStatusServico4', raiz)
- return self._post(url, xml)
-
- def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
- """
- Serviço destinado ao atendimento de solicitações de inutilização de numeração.
- :param modelo: Modelo da nota
- :param cnpj: CNPJda empresa
- :param numero_inicial: Número inicial
- :param numero_final: Número final
- :param justificativa: Justificativa
- :param ano: Ano
- :param serie: Série
- :return:
- """
-
- # url do servico
- url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
-
- # Valores default
- ano = str(ano or datetime.date.today().year)[-2:]
- uf = CODIGOS_ESTADOS[self.uf.upper()]
- cnpj = so_numeros(cnpj)
-
- # Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
- # CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
- id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
- 'uf': uf,
- 'ano': ano,
- 'cnpj': cnpj,
- 'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e;
- 'serie': serie.zfill(3),
- 'num_ini': str(numero_inicial).zfill(9),
- 'num_fin': str(numero_final).zfill(9),
- }
-
- # Monta XML do corpo da requisição # FIXME
- raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
- inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
- etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
- etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
- etree.SubElement(inf_inut, 'cUF').text = uf
- etree.SubElement(inf_inut, 'ano').text = ano
- etree.SubElement(inf_inut, 'CNPJ').text = cnpj
- etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
- etree.SubElement(inf_inut, 'serie').text = serie
- etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
- etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
- etree.SubElement(inf_inut, 'xJust').text = justificativa
-
- # assinatura
- a1 = AssinaturaA1(self.certificado, self.certificado_senha)
- xml = a1.assinar(raiz)
-
- # Monta XML para envio da requisição
- xml = self._construir_xml_soap('NFeInutilizacao4', xml)
- # Faz request no Servidor da Sefaz e retorna resposta
- return self._post(url, xml)
-
- def _get_url_an(self, consulta):
- # producao
- if self._ambiente == 1:
- if consulta == 'DISTRIBUICAO':
- ambiente = 'https://www1.'
- else:
- ambiente = 'https://www.'
- # homologacao
- else:
- ambiente = 'https://hom.'
-
- self.url = ambiente + NFE['AN'][consulta]
- return self.url
-
- def _get_url(self, modelo, consulta):
- """ Retorna a url para comunicação com o webservice """
- # estado que implementam webservices proprios
- lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
- if self.uf.upper() in lista:
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
- self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
- elif modelo == 'nfce':
- # PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe
- if self.uf.upper() == 'PE' or self.uf.upper() == 'BA':
- self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
- else:
- # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
- self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
- else:
- raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
- # Estados que utilizam outros ambientes
- else:
- lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO']
- if self.uf.upper() in lista_svrs:
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
- self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
- elif modelo == 'nfce':
- # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
- self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
- else:
- raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
- # unico UF que utiliza SVAN ainda para NF-e
- # SVRS para NFC-e
- elif self.uf.upper() == 'MA':
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if modelo == 'nfe':
- # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
- self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
- elif modelo == 'nfce':
- # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
- self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
- else:
- raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
- else:
- raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}")
- return self.url
-
- def _construir_xml_soap(self, metodo, dados, cabecalho=False):
- """Mota o XML para o envio via SOAP"""
- raiz = etree.Element('{%s}Envelope' % NAMESPACE_SOAP, nsmap={
- 'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP})
- body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP)
- ## distribuição tem um corpo de xml diferente
- if metodo == 'NFeDistribuicaoDFe':
- x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo)
- a = etree.SubElement(x, 'nfeDadosMsg')
- else:
- a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo)
- a.append(dados)
- return raiz
-
- def _post_header(self):
- """Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
- # PE é a única UF que exige SOAPAction no header
- response = {
- 'content-type': 'application/soap+xml; charset=utf-8;',
- 'Accept': 'application/soap+xml; charset=utf-8;',
- }
- if self.uf.upper() == 'PE':
- response["SOAPAction"] = ""
- return response
-
- def _post(self, url, xml):
- certificado_a1 = CertificadoA1(self.certificado)
- chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
- chave_cert = (cert, chave)
- # Abre a conexão HTTPS
- try:
- xml_declaration = ''
-
- # limpa xml com caracteres bugados para infNFeSupl em NFC-e
- xml = re.sub(
- '(.*?)',
- lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('&', ''),
- etree.tostring(xml, encoding='unicode').replace('\n', '')
- )
- xml = xml_declaration + xml
- # Faz o request com o servidor
- result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
- result.encoding = 'utf-8'
- return result
- except requests.exceptions.RequestException as e:
- raise e
- finally:
- certificado_a1.excluir()
-
-
-class ComunicacaoNfse(Comunicacao):
- """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
-
- _versao = ''
- _namespace = ''
-
- def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
- self.certificado = certificado
- self.certificado_senha = certificado_senha
- self._ambiente = 2 if homologacao else 1
- self.autorizador = autorizador.upper()
- if self.autorizador == 'GINFES':
- self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
- self._versao = '3'
- elif self.autorizador == 'BETHA':
- self._namespace = NAMESPACE_BETHA
- self._versao = '2.02'
- else:
- raise Exception('Autorizador não encontrado!')
-
- def autorizacao(self, nota):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'BETHA':
- # xml
- xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
- # comunica via wsdl
- return self._post(url, xml, 'gerar')
- else:
- raise Exception('Este método só esta implementado no autorizador betha.')
-
- def enviar_lote(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # xml
- xml = '' + xml
- # comunica via wsdl
- return self._post_https(url, xml, 'enviar_lote')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def consultar(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # xml
- xml = '' + xml
- # comunica via wsdl
- return self._post_https(url, xml, 'consulta')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def consultar_rps(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'BETHA':
- # comunica via wsdl
- return self._post(url, xml, 'consultaRps')
- elif self.autorizador == 'GINFES':
- return self._post_https(url, xml, 'consultaRps')
- # TODO outros autorizadres
- else:
- raise Exception('Autorizador não encontrado!')
-
- def consultar_faixa(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'BETHA':
- # comunica via wsdl
- return self._post(url, xml, 'consultaFaixa')
- else:
- raise Exception('Este método só esta implementado no autorizador betha.')
-
- def consultar_lote(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # xml
- xml = '' + xml
- # comunica via wsdl
- return self._post_https(url, xml, 'consulta_lote')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def consultar_situacao_lote(self, xml):
- # url do serviço
- url = self._get_url()
- if self.autorizador == 'GINFES':
- # comunica via wsdl
- return self._post_https(url, xml, 'consulta_situacao_lote')
- else:
- raise Exception('Este método só esta implementado no autorizador ginfes.')
-
- def cancelar(self, xml):
- # url do serviço
- url = self._get_url()
- # Betha
- if self.autorizador == 'BETHA':
- # comunica via wsdl
- return self._post(url, xml, 'cancelar')
- # Ginfes
- elif self.autorizador == 'GINFES':
- # comunica via wsdl com certificado
- return self._post_https(url, xml, 'cancelar')
- # TODO outros autorizadres
- else:
- raise Exception('Autorizador não encontrado!')
-
- def _cabecalho(self, retorna_string=True):
- """ Monta o XML do cabeçalho da requisição wsdl
- Namespaces padrão homologação (Ginfes) """
-
- xml_declaration = ''
- # cabecalho = '3'
- # cabecalho
- raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
- etree.SubElement(raiz, 'versaoDados').text = self._versao
-
- if retorna_string:
- cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
- cabecalho = xml_declaration + cabecalho
- return cabecalho
- else:
- return raiz
-
- def _cabecalho2(self, retorna_string=True):
- """ Monta o XML do cabeçalho da requisição wsdl
- Namespaces que funcionaram em produção (Ginfes)"""
-
- xml_declaration = ''
-
- # cabecalho
- raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
- etree.SubElement(raiz, 'versaoDados').text = self._versao
-
- if retorna_string:
- cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
- cabecalho = xml_declaration + cabecalho
- return cabecalho
- else:
- return raiz
-
- def _cabecalho_ginfes(self):
- """ Retorna o XML do cabeçalho gerado pelo xsd"""
- from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
- return SerializacaoGinfes().cabecalho()
-
- def _get_url(self):
- """ Retorna a url para comunicação com o webservice """
- if self._ambiente == 1:
- ambiente = 'HTTPS'
- else:
- ambiente = 'HOMOLOGACAO'
- if self.autorizador in NFSE:
- self.url = NFSE[self.autorizador][ambiente]
- else:
- raise Exception('Autorizador nao encontrado!')
- return self.url
-
- def _post(self, url, xml, metodo):
- """ Comunicação wsdl (http) sem certificado digital """
- # cabecalho
- cabecalho = self._cabecalho()
- # comunicacao wsdl
- try:
- from suds.client import Client
- cliente = Client(url)
- # gerar nfse
- if metodo == 'gerar':
- return cliente.service.GerarNfse(cabecalho, xml)
- elif metodo == 'consultaRps':
- return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
- elif metodo == 'consultaFaixa':
- return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
- elif metodo == 'cancelar':
- return cliente.service.CancelarNfse(cabecalho, xml)
- # TODO outros metodos
- else:
- raise Exception('Método não implementado no autorizador.')
- except Exception as e:
- raise e
-
- def _post_https(self, url, xml, metodo):
- """ Comunicação wsdl (https) utilizando certificado do usuário """
- # cabecalho
- cabecalho = self._cabecalho()
- # comunicacao wsdl
- try:
- from suds.client import Client
- from pynfe.utils.https_nfse import HttpAuthenticated
-
- certificadoA1 = CertificadoA1(self.certificado)
- chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
-
- cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
-
- # gerar nfse
- if metodo == 'gerar':
- return cliente.service.GerarNfse(cabecalho, xml)
- elif metodo == 'enviar_lote':
- return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
- elif metodo == 'consulta':
- return cliente.service.ConsultarNfseV3(cabecalho, xml)
- elif metodo == 'consulta_lote':
- return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
- elif metodo == 'consulta_situacao_lote':
- return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
- elif metodo == 'consultaRps':
- return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
- elif metodo == 'consultaFaixa':
- return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
- elif metodo == 'cancelar':
- # versão 2
- return cliente.service.CancelarNfse(xml)
- # versão 3
- # return cliente.service.CancelarNfseV3(cabecalho, xml)
- # TODO outros metodos
- else:
- raise Exception('Método não implementado no autorizador.')
- except Exception as e:
- raise e
diff --git a/pynfe/processamento/mdfe.py b/pynfe/processamento/mdfe.py
new file mode 100644
index 0000000..ab5e153
--- /dev/null
+++ b/pynfe/processamento/mdfe.py
@@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+import time
+import re
+import requests
+import collections
+from io import StringIO
+
+
+from pynfe.utils.flags import (
+ NAMESPACE_MDFE,
+ MODELO_MDFE,
+ VERSAO_MDFE,
+ NAMESPACE_MDFE_METODO,
+ NAMESPACE_SOAP,
+ NAMESPACE_XSI,
+ NAMESPACE_XSD,
+ CODIGOS_ESTADOS
+)
+from pynfe.utils.webservices import MDFE
+from pynfe.entidades.certificado import CertificadoA1
+from pynfe.utils import etree, extrai_id_srtxml
+from .comunicacao import Comunicacao
+from .resposta import analisar_retorno
+
+MDFE_SITUACAO_JA_ENVIADO = ('100', '101', '132')
+
+
+class ComunicacaoMDFe(Comunicacao):
+
+ _modelo = MODELO_MDFE
+ _namespace = NAMESPACE_MDFE
+ _versao = VERSAO_MDFE
+ _header = 'mdfeCabecMsg'
+ _envio_mensagem = 'mdfeDadosMsg'
+ _retorno_mensagem = 'mdfeRecepcaoResult'
+ _namespace_metodo = NAMESPACE_MDFE_METODO
+
+ _accept = True
+ _soap_action = False
+ _namespace_soap = NAMESPACE_SOAP
+ _namespace_xsi = NAMESPACE_XSI
+ _namespace_xsd = NAMESPACE_XSD
+ _soap_version = 'soap12'
+ _edoc_situacao_ja_enviado = MDFE_SITUACAO_JA_ENVIADO
+ _edoc_situacao_arquivo_recebido_com_sucesso = '103'
+ _edoc_situacao_em_processamento = '105'
+ _edoc_situacao_servico_em_operacao = '107'
+
+ consulta_servico_ao_enviar = True
+ maximo_tentativas_consulta_recibo = 5
+
+ def status_servico(self):
+ url = self._get_url('STATUS')
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consStatServMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'xServ').text = 'STATUS'
+ xml = self._construir_xml_soap('MDFeStatusServico', raiz)
+ return self._post(url, xml)
+
+ def consulta(self, chave):
+ url = self._get_url('CONSULTA')
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consSitMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
+ etree.SubElement(raiz, 'chMDFe').text = chave
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('MDFeConsulta', raiz)
+ return self._post(url, xml)
+
+ def consulta_nao_encerrados(self, cpfcnpj):
+ url = self._get_url('NAO_ENCERRADOS')
+ # Monta XML do corpo da requisição
+ # raiz = etree.Element('consMDFeNaoEnc', xmlns=NAMESPACE_MDFE, versao=self._versao)
+ attr = collections.OrderedDict()
+ attr['xmlns'] = NAMESPACE_MDFE
+ attr['versao'] = self._versao
+ raiz = etree.Element('consMDFeNaoEnc', attr)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NÃO ENCERRADOS'
+ if len(cpfcnpj) == 11:
+ etree.SubElement(raiz, 'CPF').text = cpfcnpj.zfill(11)
+ else:
+ etree.SubElement(raiz, 'CNPJ').text = cpfcnpj.zfill(14)
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('MDFeConsNaoEnc', raiz)
+ return self._post(url, xml)
+
+ def consulta_recibo(self, numero):
+ url = self._get_url('RET_RECEPCAO')
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consReciMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'nRec').text = numero.zfill(15)
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('MDFeRetRecepcao', raiz)
+ return self._post(url, xml)
+
+ def evento(self, evento):
+ """
+ Envia eventos do MDFe como:
+ Encerramento
+ Cancelamento
+ Inclusao Condutor
+ Inclusao DF-e
+ Pagamento Operacao MDF-e
+ :param evento: Nome do Evento
+ :return:
+ """
+ # url do serviço
+ url = self._get_url('EVENTOS')
+ # Monta XML do corpo da requisição
+ xml = self._construir_xml_soap('MDFeRecepcaoEvento', evento)
+ return self._post(url, xml)
+
+ def _construir_xml_soap(self, metodo, dados):
+ """Mota o XML para o envio via SOAP"""
+
+ ns = collections.OrderedDict()
+ ns['xsi'] = self._namespace_xsi
+ ns['xsd'] = self._namespace_xsd
+ ns[self._soap_version] = self._namespace_soap
+ raiz = etree.Element(
+ '{%s}Envelope' % self._namespace_soap,
+ nsmap=ns
+ )
+
+ if self._header:
+ cabecalho = self._cabecalho_soap(metodo)
+ c = etree.SubElement(raiz, '{%s}Header' % self._namespace_soap)
+ c.append(cabecalho)
+
+ body = etree.SubElement(raiz, '{%s}Body' % self._namespace_soap)
+
+ a = etree.SubElement(
+ body,
+ self._envio_mensagem,
+ xmlns=self._namespace_metodo+metodo
+ )
+ a.append(dados)
+ return raiz
+
+ def _post_header(self, soap_webservice_method=False):
+ """Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
+ header = {
+ b'content-type': b'text/xml; charset=utf-8;',
+ }
+
+ # PE é a únca UF que exige SOAPAction no header
+ if soap_webservice_method:
+ header[b'SOAPAction'] = \
+ (self._namespace_metodo + soap_webservice_method).encode('utf-8')
+
+ if self._accept:
+ header[b'Accept'] = b'application/soap+xml; charset=utf-8;'
+
+ return header
+
+ def _post(self, url, xml):
+ certificado_a1 = CertificadoA1(self.certificado)
+ chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
+ chave_cert = (cert, chave)
+ # Abre a conexão HTTPS
+ try:
+ xml_declaration = ''
+
+ # limpa xml com caracteres bugados para infNFeSupl em NFC-e
+ xml = re.sub(
+ '(.*?)',
+ lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('&', ''),
+ etree.tostring(xml, encoding='unicode').replace('\n', '')
+ )
+ xml = xml_declaration + xml
+
+ # print(xml)
+ # print('-' * 20)
+
+ # Faz o request com o servidor
+ result = requests.post(
+ url,
+ xml,
+ headers=self._post_header(),
+ cert=chave_cert,
+ verify=False
+ )
+ result.encoding = 'utf-8'
+ return result
+ except requests.exceptions.RequestException as e:
+ raise e
+ finally:
+ certificado_a1.excluir()
+
+ def _cabecalho_soap(self, metodo):
+ """Monta o XML do cabeçalho da requisição SOAP"""
+
+ raiz = etree.Element(
+ self._header,
+ xmlns=self._namespace_metodo + metodo
+ )
+ etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
+ etree.SubElement(raiz, 'versaoDados').text = '3.00'
+ return raiz
+
+ def _get_url(self, consulta):
+ # producao
+ if self._ambiente == 1:
+ ambiente = MDFE['SVRS']['HTTPS']
+ # homologacao
+ else:
+ ambiente = MDFE['SVRS']['HOMOLOGACAO']
+
+ self.url = ambiente + MDFE['SVRS'][consulta]
+ return self.url
diff --git a/pynfe/processamento/nfe.py b/pynfe/processamento/nfe.py
new file mode 100644
index 0000000..b15da6c
--- /dev/null
+++ b/pynfe/processamento/nfe.py
@@ -0,0 +1,437 @@
+# -*- coding: utf-8 -*-
+import datetime
+import re
+import requests
+
+from pynfe.utils import etree, so_numeros
+from pynfe.utils.flags import (
+ NAMESPACE_NFE,
+ VERSAO_PADRAO,
+ CODIGOS_ESTADOS,
+ NAMESPACE_METODO,
+ NAMESPACE_SOAP,
+ NAMESPACE_XSI,
+ NAMESPACE_XSD,
+)
+
+from pynfe.entidades.certificado import CertificadoA1
+from pynfe.utils.webservices import NFE, NFCE
+from .assinatura import AssinaturaA1
+from .comunicacao import Comunicacao
+
+
+class ComunicacaoNFe(Comunicacao):
+ """Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
+
+ _versao = VERSAO_PADRAO
+ _assinatura = AssinaturaA1
+ _namespace = NAMESPACE_NFE
+ _header = False
+ _envio_mensagem = 'nfeDadosMsg'
+ _namespace_metodo = NAMESPACE_METODO
+ _accept = False
+ _namespace_soap = NAMESPACE_SOAP
+ _namespace_xsi = NAMESPACE_XSI
+ _namespace_xsd = NAMESPACE_XSD
+ _soap_version = 'soap'
+
+ def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
+ """
+ Método para realizar autorização da nota de acordo com o modelo
+ :param modelo: Modelo
+ :param nota_fiscal: XML assinado
+ :param id_lote: Id do lote - numero autoincremental gerado pelo sistema
+ :param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
+ :return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
+ envia todo o soap de resposta da Sefaz para decisão do usuário.
+ """
+ # url do serviço
+ url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
+ etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
+ etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
+ raiz.append(nota_fiscal)
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
+ # Faz request no Servidor da Sefaz
+ retorno = self._post(url, xml)
+
+ # Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
+ # Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
+ if retorno.status_code == 200:
+ # namespace
+ ns = {'ns': NAMESPACE_NFE}
+ # Procuta status no xml
+ try:
+ prot = etree.fromstring(retorno.text)
+ except ValueError:
+ # em SP retorno.text apresenta erro
+ prot = etree.fromstring(retorno.content)
+ if ind_sinc == 1:
+ try:
+ # Protocolo com envio OK
+ try:
+ inf_prot = prot[0][0] # root protNFe
+ except IndexError:
+ # Estados como GO vem com a tag header
+ inf_prot = prot[1][0]
+
+ lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
+ # Lote processado
+ if lote_status == '104':
+ prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
+ status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
+ # autorizado usa da NF-e
+ # retorna xml final (protNFe+NFe)
+ if status == '100':
+ raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
+ raiz.append(nota_fiscal)
+ raiz.append(prot_nfe)
+ return 0, raiz
+ except IndexError:
+ # Protocolo com algum erro no Envio
+ return 1, retorno, nota_fiscal
+ else:
+ # Retorna id do protocolo para posterior consulta em caso de sucesso.
+ rec = prot[0][0]
+ status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
+ # Lote Recebido com Sucesso!
+ if status == '103':
+ nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
+ return 0, nrec, nota_fiscal
+ return 1, retorno, nota_fiscal
+
+ def consulta_recibo(self, modelo, numero):
+ """
+ Este método oferece a consulta do resultado do processamento de um lote de NF-e.
+ O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
+ 15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
+ deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
+ Processamento".
+ :param modelo: Modelo da nota
+ :param numero: Número da nota
+ :return:
+ """
+
+ # url do serviço
+ url = self._get_url(modelo=modelo, consulta='RECIBO')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'nRec').text = numero
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
+ return self._post(url, xml)
+
+ def consulta_nota(self, modelo, chave):
+ """
+ Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
+ da Secretaria de Fazenda Estadual.
+ :param modelo: Modelo da nota
+ :param chave: Chave da nota
+ :return:
+ """
+ # url do serviço
+ url = self._get_url(modelo=modelo, consulta='CHAVE')
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
+ etree.SubElement(raiz, 'chNFe').text = chave
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
+ return self._post(url, xml)
+
+ def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0):
+ """
+ O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag
+ informada no XML. As tags são distNSU, consNSU e consChNFe.
+ a) distNSU – Distribuição de Conjunto de DF-e a Partir do NSU Informado
+ b) consNSU – Consulta DF-e Vinculado ao NSU Informado
+ c) consChNFe – Consulta de NF-e por Chave de Acesso Informada
+ :param cnpj: CNPJ do interessado
+ :param cpf: CPF do interessado
+ :param chave: Chave da NF-e a ser consultada
+ :param nsu: Ultimo nsu ou nsu específico para ser consultado.
+ :return:
+ """
+ # url
+ url = self._get_url_an(consulta='DISTRIBUICAO')
+ # Monta XML para envio da requisição
+ raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ if self.uf:
+ etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()]
+ if cnpj:
+ etree.SubElement(raiz, 'CNPJ').text = cnpj
+ else:
+ etree.SubElement(raiz, 'CPF').text = cpf
+ if not chave:
+ distNSU = etree.SubElement(raiz, 'distNSU')
+ etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15)
+ if chave:
+ consChNFe = etree.SubElement(raiz, 'consChNFe')
+ etree.SubElement(consChNFe, 'chNFe').text = chave
+ #Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz)
+ return self._post(url, xml)
+
+ def consulta_cadastro(self, modelo, cnpj):
+ """
+ Consulta de cadastro
+ :param modelo: Modelo da nota
+ :param cnpj: CNPJ da empresa
+ :return:
+ """
+ # UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
+ lista_svrs = ['AC', 'RN', 'PB', 'SC']
+
+ # RS implementa um método diferente na consulta de cadastro
+ if self.uf.upper() == 'RS':
+ url = NFE['RS']['CADASTRO']
+ elif self.uf.upper() in lista_svrs:
+ url = NFE['SVRS']['CADASTRO']
+ elif self.uf.upper() == 'SVC-RS':
+ url = NFE['SVC-RS']['CADASTRO']
+ else:
+ url = self._get_url(modelo=modelo, consulta='CADASTRO')
+
+ raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
+ info = etree.SubElement(raiz, 'infCons')
+ etree.SubElement(info, 'xServ').text = 'CONS-CAD'
+ etree.SubElement(info, 'UF').text = self.uf.upper()
+ etree.SubElement(info, 'CNPJ').text = cnpj
+ # etree.SubElement(info, 'CPF').text = cpf
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
+ # Chama método que efetua a requisição POST no servidor SOAP
+ return self._post(url, xml)
+
+ def evento(self, modelo, evento, id_lote=1):
+ """
+ Envia um evento de nota fiscal (cancelamento e carta de correção)
+ :param modelo: Modelo da nota
+ :param evento: Eventro
+ :param id_lote: Id do lote
+ :return:
+ """
+
+ # url do serviço
+ try:
+ # manifestacao url é do AN
+ if evento[0][5].text.startswith('2'):
+ url = self._get_url_an(consulta='EVENTOS')
+ else:
+ url = self._get_url(modelo=modelo, consulta='EVENTOS')
+ except Exception:
+ url = self._get_url(modelo=modelo, consulta='EVENTOS')
+
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
+ raiz.append(evento)
+ xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
+ return self._post(url, xml)
+
+ def status_servico(self):
+ """
+ Verifica status do servidor da receita.
+ :param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
+ :return:
+ """
+ url = self._get_url('mdfe', 'STATUS')
+ # Monta XML do corpo da requisição
+ raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
+ etree.SubElement(raiz, 'xServ').text = 'STATUS'
+ xml = self._construir_xml_soap('NFeStatusServico4', raiz)
+ return self._post(url, xml)
+
+ def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
+ """
+ Serviço destinado ao atendimento de solicitações de inutilização de numeração.
+ :param modelo: Modelo da nota
+ :param cnpj: CNPJda empresa
+ :param numero_inicial: Número inicial
+ :param numero_final: Número final
+ :param justificativa: Justificativa
+ :param ano: Ano
+ :param serie: Série
+ :return:
+ """
+
+ # url do servico
+ url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
+
+ # Valores default
+ ano = str(ano or datetime.date.today().year)[-2:]
+ uf = CODIGOS_ESTADOS[self.uf.upper()]
+ cnpj = so_numeros(cnpj)
+
+ # Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
+ # CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
+ id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
+ 'uf': uf,
+ 'ano': ano,
+ 'cnpj': cnpj,
+ 'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e;
+ 'serie': serie.zfill(3),
+ 'num_ini': str(numero_inicial).zfill(9),
+ 'num_fin': str(numero_final).zfill(9),
+ }
+
+ # Monta XML do corpo da requisição # FIXME
+ raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
+ inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
+ etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
+ etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
+ etree.SubElement(inf_inut, 'cUF').text = uf
+ etree.SubElement(inf_inut, 'ano').text = ano
+ etree.SubElement(inf_inut, 'CNPJ').text = cnpj
+ etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
+ etree.SubElement(inf_inut, 'serie').text = serie
+ etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
+ etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
+ etree.SubElement(inf_inut, 'xJust').text = justificativa
+
+ # assinatura
+ a1 = AssinaturaA1(self.certificado, self.certificado_senha)
+ xml = a1.assinar(raiz)
+
+ # Monta XML para envio da requisição
+ xml = self._construir_xml_soap('NFeInutilizacao4', xml)
+ # Faz request no Servidor da Sefaz e retorna resposta
+ return self._post(url, xml)
+
+ def _get_url_an(self, consulta):
+ # producao
+ if self._ambiente == 1:
+ if consulta == 'DISTRIBUICAO':
+ ambiente = 'https://www1.'
+ else:
+ ambiente = 'https://www.'
+ # homologacao
+ else:
+ ambiente = 'https://hom.'
+
+ self.url = ambiente + NFE['AN'][consulta]
+ return self.url
+
+ def _get_url(self, modelo, consulta):
+ """ Retorna a url para comunicação com o webservice """
+ # estado que implementam webservices proprios
+ lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
+ if self.uf.upper() in lista:
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
+ self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
+ elif modelo == 'nfce':
+ # PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe
+ if self.uf.upper() == 'PE' or self.uf.upper() == 'BA':
+ self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
+ else:
+ # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
+ self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
+ else:
+ raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
+ # Estados que utilizam outros ambientes
+ else:
+ lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO']
+ if self.uf.upper() in lista_svrs:
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
+ self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
+ elif modelo == 'nfce':
+ # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
+ self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
+ else:
+ raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
+ # unico UF que utiliza SVAN ainda para NF-e
+ # SVRS para NFC-e
+ elif self.uf.upper() == 'MA':
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if modelo == 'nfe':
+ # nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
+ self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
+ elif modelo == 'nfce':
+ # nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
+ self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
+ else:
+ raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
+ else:
+ raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}")
+ return self.url
+
+ def _construir_xml_soap(self, metodo, dados, cabecalho=False):
+ """Mota o XML para o envio via SOAP"""
+
+ raiz = etree.Element(
+ '{%s}Envelope' % NAMESPACE_SOAP,
+ nsmap={
+ 'xsi': NAMESPACE_XSI,
+ 'xsd': NAMESPACE_XSD,
+ 'soap': NAMESPACE_SOAP
+ }
+ )
+ body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP)
+ # distribuição tem um corpo de xml diferente
+ if metodo == 'NFeDistribuicaoDFe':
+ x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo)
+ a = etree.SubElement(x, 'nfeDadosMsg')
+ else:
+ a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo)
+ a.append(dados)
+ return raiz
+
+ def _post_header(self):
+ """Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
+ # PE é a única UF que exige SOAPAction no header
+ response = {
+ 'content-type': 'application/soap+xml; charset=utf-8;',
+ 'Accept': 'application/soap+xml; charset=utf-8;',
+ }
+ if self.uf.upper() == 'PE':
+ response["SOAPAction"] = ""
+ return response
+
+ def _post(self, url, xml):
+ certificado_a1 = CertificadoA1(self.certificado)
+ chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
+ chave_cert = (cert, chave)
+ # Abre a conexão HTTPS
+ try:
+ xml_declaration = ''
+
+ # limpa xml com caracteres bugados para infNFeSupl em NFC-e
+ xml = re.sub(
+ '(.*?)',
+ lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('&', ''),
+ etree.tostring(xml, encoding='unicode').replace('\n', '')
+ )
+ xml = xml_declaration + xml
+ # Faz o request com o servidor
+ result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
+ result.encoding = 'utf-8'
+ return result
+ except requests.exceptions.RequestException as e:
+ raise e
+ finally:
+ certificado_a1.excluir()
diff --git a/pynfe/processamento/nfse.py b/pynfe/processamento/nfse.py
new file mode 100644
index 0000000..5d75a52
--- /dev/null
+++ b/pynfe/processamento/nfse.py
@@ -0,0 +1,234 @@
+# -*- coding: utf-8 -*-
+from pynfe.utils import etree
+from pynfe.utils.flags import (
+ NAMESPACE_XSI,
+ NAMESPACE_BETHA,
+)
+from pynfe.utils.webservices import NFSE
+from pynfe.entidades.certificado import CertificadoA1
+from .comunicacao import Comunicacao
+
+
+class ComunicacaoNfse(Comunicacao):
+ """ Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
+
+ _versao = ''
+ _namespace = ''
+
+ def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
+ self.certificado = certificado
+ self.certificado_senha = certificado_senha
+ self._ambiente = 2 if homologacao else 1
+ self.autorizador = autorizador.upper()
+ if self.autorizador == 'GINFES':
+ self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
+ self._versao = '3'
+ elif self.autorizador == 'BETHA':
+ self._namespace = NAMESPACE_BETHA
+ self._versao = '2.02'
+ else:
+ raise Exception('Autorizador não encontrado!')
+
+ def autorizacao(self, nota):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'BETHA':
+ # xml
+ xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
+ # comunica via wsdl
+ return self._post(url, xml, 'gerar')
+ else:
+ raise Exception('Este método só esta implementado no autorizador betha.')
+
+ def enviar_lote(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # xml
+ xml = '' + xml
+ # comunica via wsdl
+ return self._post_https(url, xml, 'enviar_lote')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def consultar(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # xml
+ xml = '' + xml
+ # comunica via wsdl
+ return self._post_https(url, xml, 'consulta')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def consultar_rps(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'BETHA':
+ # comunica via wsdl
+ return self._post(url, xml, 'consultaRps')
+ elif self.autorizador == 'GINFES':
+ return self._post_https(url, xml, 'consultaRps')
+ # TODO outros autorizadres
+ else:
+ raise Exception('Autorizador não encontrado!')
+
+ def consultar_faixa(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'BETHA':
+ # comunica via wsdl
+ return self._post(url, xml, 'consultaFaixa')
+ else:
+ raise Exception('Este método só esta implementado no autorizador betha.')
+
+ def consultar_lote(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # xml
+ xml = '' + xml
+ # comunica via wsdl
+ return self._post_https(url, xml, 'consulta_lote')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def consultar_situacao_lote(self, xml):
+ # url do serviço
+ url = self._get_url()
+ if self.autorizador == 'GINFES':
+ # comunica via wsdl
+ return self._post_https(url, xml, 'consulta_situacao_lote')
+ else:
+ raise Exception('Este método só esta implementado no autorizador ginfes.')
+
+ def cancelar(self, xml):
+ # url do serviço
+ url = self._get_url()
+ # Betha
+ if self.autorizador == 'BETHA':
+ # comunica via wsdl
+ return self._post(url, xml, 'cancelar')
+ # Ginfes
+ elif self.autorizador == 'GINFES':
+ # comunica via wsdl com certificado
+ return self._post_https(url, xml, 'cancelar')
+ # TODO outros autorizadres
+ else:
+ raise Exception('Autorizador não encontrado!')
+
+ def _cabecalho(self, retorna_string=True):
+ """ Monta o XML do cabeçalho da requisição wsdl
+ Namespaces padrão homologação (Ginfes) """
+
+ xml_declaration = ''
+ # cabecalho = '3'
+ # cabecalho
+ raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
+ etree.SubElement(raiz, 'versaoDados').text = self._versao
+
+ if retorna_string:
+ cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
+ cabecalho = xml_declaration + cabecalho
+ return cabecalho
+ else:
+ return raiz
+
+ def _cabecalho2(self, retorna_string=True):
+ """ Monta o XML do cabeçalho da requisição wsdl
+ Namespaces que funcionaram em produção (Ginfes)"""
+
+ xml_declaration = ''
+
+ # cabecalho
+ raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
+ etree.SubElement(raiz, 'versaoDados').text = self._versao
+
+ if retorna_string:
+ cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
+ cabecalho = xml_declaration + cabecalho
+ return cabecalho
+ else:
+ return raiz
+
+ def _cabecalho_ginfes(self):
+ """ Retorna o XML do cabeçalho gerado pelo xsd"""
+ from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
+ return SerializacaoGinfes().cabecalho()
+
+ def _get_url(self):
+ """ Retorna a url para comunicação com o webservice """
+ if self._ambiente == 1:
+ ambiente = 'HTTPS'
+ else:
+ ambiente = 'HOMOLOGACAO'
+ if self.autorizador in NFSE:
+ self.url = NFSE[self.autorizador][ambiente]
+ else:
+ raise Exception('Autorizador nao encontrado!')
+ return self.url
+
+ def _post(self, url, xml, metodo):
+ """ Comunicação wsdl (http) sem certificado digital """
+ # cabecalho
+ cabecalho = self._cabecalho()
+ # comunicacao wsdl
+ try:
+ from suds.client import Client
+ cliente = Client(url)
+ # gerar nfse
+ if metodo == 'gerar':
+ return cliente.service.GerarNfse(cabecalho, xml)
+ elif metodo == 'consultaRps':
+ return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
+ elif metodo == 'consultaFaixa':
+ return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
+ elif metodo == 'cancelar':
+ return cliente.service.CancelarNfse(cabecalho, xml)
+ # TODO outros metodos
+ else:
+ raise Exception('Método não implementado no autorizador.')
+ except Exception as e:
+ raise e
+
+ def _post_https(self, url, xml, metodo):
+ """ Comunicação wsdl (https) utilizando certificado do usuário """
+ # cabecalho
+ cabecalho = self._cabecalho()
+ # comunicacao wsdl
+ try:
+ from suds.client import Client
+ from pynfe.utils.https_nfse import HttpAuthenticated
+
+ certificadoA1 = CertificadoA1(self.certificado)
+ chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
+
+ cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
+
+ # gerar nfse
+ if metodo == 'gerar':
+ return cliente.service.GerarNfse(cabecalho, xml)
+ elif metodo == 'enviar_lote':
+ return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
+ elif metodo == 'consulta':
+ return cliente.service.ConsultarNfseV3(cabecalho, xml)
+ elif metodo == 'consulta_lote':
+ return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
+ elif metodo == 'consulta_situacao_lote':
+ return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
+ elif metodo == 'consultaRps':
+ return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
+ elif metodo == 'consultaFaixa':
+ return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
+ elif metodo == 'cancelar':
+ # versão 2
+ return cliente.service.CancelarNfse(xml)
+ # versão 3
+ # return cliente.service.CancelarNfseV3(cabecalho, xml)
+ # TODO outros metodos
+ else:
+ raise Exception('Método não implementado no autorizador.')
+ except Exception as e:
+ raise e
diff --git a/pynfe/processamento/resposta.py b/pynfe/processamento/resposta.py
new file mode 100644
index 0000000..40f1529
--- /dev/null
+++ b/pynfe/processamento/resposta.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+import re
+from pynfe.utils import etree
+
+
+class RetornoSoap(object):
+
+ def __init__(self, webservice, retorno, resposta):
+ self.webservice = webservice
+ self.resposta = resposta
+ self.retorno = retorno
+
+
+def analisar_retorno(webservice, retorno, classe_resposta):
+
+ # retorno.raise_for_status()
+ # print(retorno.text)
+
+ match = re.search('(.*?)', retorno.text)
+
+ if match:
+ resultado = etree.tostring(etree.fromstring(match.group(1))[0])
+ # classe_resposta.Validate_simpletypes_ = False
+ # resposta = classe_resposta.parseString(resultado)
+ resposta = resultado
+ # resposta = retorno.text
+
+ return RetornoSoap(webservice, retorno, resposta)
diff --git a/pynfe/processamento/serializacao.py b/pynfe/processamento/serializacao.py
index c571654..9f0ddc2 100644
--- a/pynfe/processamento/serializacao.py
+++ b/pynfe/processamento/serializacao.py
@@ -1,9 +1,19 @@
# -*- coding: utf-8 -*-
from pynfe.entidades import NotaFiscal
-from pynfe.utils import etree, so_numeros, obter_municipio_por_codigo, \
- obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal, \
+from pynfe.utils import (
+ etree, so_numeros, obter_municipio_por_codigo,
+ obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal,
remover_acentos, obter_uf_por_codigo, obter_codigo_por_municipio
-from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO, NAMESPACE_NFE, NAMESPACE_SIG, VERSAO_QRCODE
+)
+from pynfe.utils.flags import (
+ CODIGOS_ESTADOS,
+ VERSAO_PADRAO,
+ VERSAO_MDFE,
+ NAMESPACE_NFE,
+ NAMESPACE_MDFE,
+ NAMESPACE_SIG,
+ VERSAO_QRCODE
+)
from pynfe.utils.webservices import NFCE
import base64
import hashlib
@@ -674,7 +684,6 @@ class SerializacaoXML(Serializacao):
raiz.append(self._serializar_responsavel_tecnico(
nota_fiscal.responsavel_tecnico[0], retorna_string=False))
-
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
@@ -712,6 +721,93 @@ class SerializacaoXML(Serializacao):
else:
return raiz
+ def serializar_evento_mdfe(self, evento, tag_raiz='eventoMDFe', retorna_string=False):
+ tz = datetime.now().astimezone().strftime('%z')
+ tz = "{}:{}".format(tz[:-2], tz[-2:])
+ raiz = etree.Element(tag_raiz, versao=VERSAO_MDFE, xmlns=NAMESPACE_MDFE)
+ e = etree.SubElement(raiz, 'infEvento', Id=evento.identificador)
+ etree.SubElement(e, 'cOrgao').text = CODIGOS_ESTADOS[evento.uf.upper()]
+ etree.SubElement(e, 'tpAmb').text = str(self._ambiente)
+ if len(so_numeros(evento.cnpj)) == 11:
+ etree.SubElement(e, 'CPF').text = evento.cnpj
+ else:
+ etree.SubElement(e, 'CNPJ').text = evento.cnpj
+ etree.SubElement(e, 'chMDFe').text = evento.chave
+ etree.SubElement(e, 'dhEvento').text = evento.data_emissao.strftime('%Y-%m-%dT%H:%M:%S') + tz
+ etree.SubElement(e, 'tpEvento').text = evento.tp_evento
+ etree.SubElement(e, 'nSeqEvento').text = str(evento.n_seq_evento)
+ det = etree.SubElement(e, 'detEvento', versaoEvento=VERSAO_MDFE)
+ if evento.descricao == 'Encerramento':
+ encerramento = etree.SubElement(det, 'evEncMDFe')
+ etree.SubElement(encerramento, 'descEvento').text = evento.descricao
+ etree.SubElement(encerramento, 'nProt').text = evento.protocolo
+ etree.SubElement(encerramento, 'dtEnc').text = evento.dtenc.strftime('%Y-%m-%d')
+ etree.SubElement(encerramento, 'cUF').text = evento.cuf
+ etree.SubElement(encerramento, 'cMun').text = evento.cmun
+ elif evento.descricao == 'Inclusão Condutor':
+ inclusao = etree.SubElement(det, 'evIncCondutorMDFe')
+ etree.SubElement(inclusao, 'descEvento').text = evento.descricao
+ condutor = etree.SubElement(inclusao, 'condutor')
+ etree.SubElement(condutor, 'xNome').text = evento.nome_motorista
+ etree.SubElement(condutor, 'CPF').text = evento.cpf_motorista
+ elif evento.descricao == 'Inclusao DF-e':
+ inclusao = etree.SubElement(det, 'evIncDFeMDFe')
+ etree.SubElement(inclusao, 'descEvento').text = evento.descricao
+ etree.SubElement(inclusao, 'nProt').text = evento.protocolo
+ etree.SubElement(inclusao, 'cMunCarrega').text = evento.cmun_carrega
+ etree.SubElement(inclusao, 'xMunCarrega').text = evento.xmun_carrega
+ infDoc = etree.SubElement(inclusao, 'infDoc')
+ etree.SubElement(infDoc, 'cMunDescarga').text = evento.cmun_descarga
+ etree.SubElement(infDoc, 'xMunDescarga').text = evento.xmun_descarga
+ etree.SubElement(infDoc, 'chNFe').text = evento.chave_nfe
+ elif evento.descricao == 'Pagamento Operacao MDF-e':
+ pagamento = etree.SubElement(det, 'evPagtoOperMDFe')
+ etree.SubElement(pagamento, 'descEvento').text = evento.descricao
+ etree.SubElement(pagamento, 'nProt').text = evento.protocolo
+
+ # Viagens
+ infViagens = etree.SubElement(pagamento, 'infViagens')
+ etree.SubElement(infViagens, 'qtdViagens').text = evento.qtd_viagens.zfill(5)
+ etree.SubElement(infViagens, 'nroViagem').text = evento.nro_viagens.zfill(5)
+
+ # Informações do pagamento
+ infPag = etree.SubElement(pagamento, 'infPag')
+ etree.SubElement(infPag, 'xNome').text = evento.nome_contratante
+ if len(evento.cpfcnpj_contratante) == 11:
+ etree.SubElement(infPag, 'CPF').text = evento.cpfcnpj_contratante
+ else:
+ etree.SubElement(infPag, 'CNPJ').text = evento.cpfcnpj_contratante
+
+ # Componentes de Pagamento do Frete
+ Comp = etree.SubElement(infPag, 'Comp')
+ etree.SubElement(Comp, 'tpComp').text = evento.tpComp.zfill(2)
+ etree.SubElement(Comp, 'vComp').text = '{:.2f}'.format(evento.vComp)
+
+ # Continuação das Informações do pagamento
+ etree.SubElement(infPag, 'vContrato').text = '{:.2f}'.format(evento.vContrato)
+ etree.SubElement(infPag, 'indPag').text = evento.indPag
+
+ # Se indPag == 1 (0=A vista e 1=A prazo)
+ if evento.indPag != '':
+ if int(evento.indPag) == 1:
+ infPrazo = etree.SubElement(infPag, 'infPrazo')
+ etree.SubElement(infPrazo, 'nParcela').text = evento.nParcela.zfill(3)
+ etree.SubElement(infPrazo, 'dVenc').text = evento.dVenc.strftime('%Y-%m-%d')
+ etree.SubElement(infPrazo, 'vParcela').text = '{:.2f}'.format(evento.vParcela)
+
+ # Informações bancárias
+ infBanc = etree.SubElement(infPag, 'infBanc')
+ if evento.CNPJIPEF != '':
+ etree.SubElement(infBanc, 'CNPJIPEF').text = evento.CNPJIPEF.zfill(14)
+ else:
+ etree.SubElement(infBanc, 'codBanco').text = evento.codBanco
+ etree.SubElement(infBanc, 'codAgencia').text = evento.codAgencia
+
+ if retorna_string:
+ return etree.tostring(raiz, encoding="unicode", pretty_print=True)
+ else:
+ return raiz
+
class SerializacaoQrcode(object):
""" Classe que gera e serializa o qrcode de NFC-e no xml """
diff --git a/pynfe/utils/__init__.py b/pynfe/utils/__init__.py
index 2aeeab6..a971aa1 100644
--- a/pynfe/utils/__init__.py
+++ b/pynfe/utils/__init__.py
@@ -3,13 +3,17 @@
import os
import codecs
from unicodedata import normalize
+import re
try:
from lxml import etree
except ImportError:
raise Exception('Falhou ao importar lxml/ElementTree')
-from io import StringIO
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
try:
from . import flags
@@ -146,3 +150,11 @@ def obter_uf_por_codigo(codigo_uf):
def remover_acentos(txt):
return normalize('NFKD', txt).encode('ASCII','ignore').decode('ASCII')
+
+
+def extrai_id_srtxml(edoc):
+ result = ''
+ match = re.search('Id=[^0-9]+(\d+)"', edoc)
+ if match:
+ result = match.group(1)
+ return result
\ No newline at end of file
diff --git a/pynfe/utils/flags.py b/pynfe/utils/flags.py
index 084389d..d43df33 100644
--- a/pynfe/utils/flags.py
+++ b/pynfe/utils/flags.py
@@ -10,6 +10,11 @@ NAMESPACE_METODO = 'http://www.portalfiscal.inf.br/nfe/wsdl/'
NAMESPACE_SOAP_NFSE = 'http://schemas.xmlsoap.org/soap/envelope/'
NAMESPACE_BETHA = 'http://www.betha.com.br/e-nota-contribuinte-ws'
+NAMESPACE_MDFE = 'http://www.portalfiscal.inf.br/mdfe'
+NAMESPACE_MDFE_METODO = 'http://www.portalfiscal.inf.br/mdfe/wsdl/'
+MODELO_MDFE = '58'
+VERSAO_MDFE = '3.00'
+
VERSAO_PADRAO = '4.00'
VERSAO_QRCODE = '2'
diff --git a/pynfe/utils/webservices.py b/pynfe/utils/webservices.py
index 49c6029..acc1585 100644
--- a/pynfe/utils/webservices.py
+++ b/pynfe/utils/webservices.py
@@ -498,3 +498,18 @@ NFSE = {
'HOMOLOGACAO':'https://homologacao.ginfes.com.br/ServiceGinfesImpl?wsdl'
}
}
+
+# MDF-e
+MDFE = {
+ # unico autorizador de MDF-e
+ 'SVRS': {
+ 'RECEPCAO': 'mdferecepcao/MDFeRecepcao.asmx',
+ 'RET_RECEPCAO': 'mdferetrecepcao/MDFeRetRecepcao.asmx',
+ 'EVENTOS': 'mdferecepcaoevento/MDFeRecepcaoEvento.asmx',
+ 'CONSULTA': 'mdfeconsulta/MDFeConsulta.asmx',
+ 'STATUS': 'mdfestatusservico/MDFeStatusServico.asmx',
+ 'NAO_ENCERRADOS': 'mdfeconsnaoenc/MDFeConsNaoEnc.asmx',
+ 'HTTPS': 'https://mdfe.svrs.rs.gov.br/ws/',
+ 'HOMOLOGACAO': 'https://mdfe-homologacao.svrs.rs.gov.br/ws/'
+ }
+}