Browse Source

feat: Criada a comunicação com o webservice do MDF-e. Separação das classes de NFe, NFSe e MDFe.

pull/79/head
Leonardo Gregianin 6 years ago
parent
commit
1d38a8cfd6
  1. 109
      pynfe/entidades/evento.py
  2. 5
      pynfe/processamento/__init__.py
  3. 647
      pynfe/processamento/comunicacao.py
  4. 214
      pynfe/processamento/mdfe.py
  5. 437
      pynfe/processamento/nfe.py
  6. 234
      pynfe/processamento/nfse.py
  7. 28
      pynfe/processamento/resposta.py
  8. 104
      pynfe/processamento/serializacao.py
  9. 14
      pynfe/utils/__init__.py
  10. 5
      pynfe/utils/flags.py
  11. 15
      pynfe/utils/webservices.py

109
pynfe/entidades/evento.py

@ -4,8 +4,10 @@
@author: Junior Tada, Leonardo Tada @author: Junior Tada, Leonardo Tada
""" """
from decimal import Decimal
from .base import Entidade from .base import Entidade
class Evento(Entidade): class Evento(Entidade):
# - Identificador da TAG a ser assinada, a regra de formação do Id é: “ID” + tpEvento + chave da NF-e + nSeqEvento # - Identificador da TAG a ser assinada, a regra de formação do Id é: “ID” + tpEvento + chave da NF-e + nSeqEvento
id = str() id = str()
@ -115,3 +117,110 @@ class EventoManifestacaoDest(Evento):
# - Informar a justificativa porque a operação não foi realizada, este campo deve ser informado somente no evento de Operação não Realizada. (min 15 max 255 caracteres) # - Informar a justificativa porque a operação não foi realizada, este campo deve ser informado somente no evento de Operação não Realizada. (min 15 max 255 caracteres)
justificativa = str() justificativa = str()
class EventoEncerramento(Evento):
def __init__(self, *args, **kwargs):
super(EventoEncerramento, self).__init__(*args, **kwargs)
# - Código do evento = 110112
self.tp_evento = '110112'
# - "Encerramento"
self.descricao = 'Encerramento'
# - Informar o número do Protocolo de Autorização da MDF-e a ser Encerrada
protocolo = str()
# - Data e hora do evento no formato AAAA-MM-DDThh:mm:ssTZD
dtenc = None
# - uf de onde a manifesto foi encerrado
cuf = str()
# - minicipio onde o manifesto foi encerrado
cmun = str()
class EventoInclusaoCondutor(Evento):
def __init__(self, *args, **kwargs):
super(EventoInclusaoCondutor, self).__init__(*args, **kwargs)
# - Código do evento = 110114
self.tp_evento = '110114'
# - "Encerramento"
self.descricao = 'Inclusão Condutor'
# - Nome do motorista
nome_motorista = str()
# - CPF do motorista
cpf_motorista = str()
class EventoInclusaoDFe(Evento):
def __init__(self, *args, **kwargs):
super(EventoInclusaoDFe, self).__init__(*args, **kwargs)
# - Código do evento = 110115
self.tp_evento = '110115'
# - "Inclusao DF-e"
self.descricao = 'Inclusao DF-e'
# - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e
protocolo = str()
# - Código IBGE do Município de Carregamento
cmun_carrega = str()
# - Nome do Município de Carregamento
xmun_carrega = str()
# - Código IBGE do Município de Descarga
cmun_descarga = str()
# - Nome do Município de Descarga
xmun_descarga = str()
# - Chave de Acesso da NF-e a ser incluída no MDFe
chave_nfe = str()
class EventoInclusaoPagamento(Evento):
def __init__(self, *args, **kwargs):
super(EventoInclusaoPagamento, self).__init__(*args, **kwargs)
# - Código do evento = 110116
self.tp_evento = '110116'
# - "Pagamento Operacao MDF-e"
self.descricao = 'Pagamento Operacao MDF-e'
# - Informar o número do Protocolo de Autorização da MDF-e a ser Incluida nova NF-e
protocolo = str()
# - Quantidade de viagens
qtd_viagens = str()
# - Número da viagem
nro_viagens = str()
# Informações do pagamento
# - Nome do Contratante
nome_contratante = str()
# - CPF/CNPJ do Contratante
cpfcnpj_contratante = str()
# Componentes do Pagamento
# - Tipo do pagamento
tpComp = str()
# - Valor
vComp = Decimal()
# - Valor total do contrato
vContrato = Decimal()
# - Tipo do pagamento (0=a vista e 1=a prazo)
indPag = str()
# Se o pagamento for a prazo
# - Numero da parcela
nParcela = str()
# - Data vencimento
dVenc = None
# - Valor da parcela
vParcela = Decimal()
# Informações bancárias
# - CNPJ da Instituição de Pagamento eletrônico do Frete
CNPJIPEF = str()
# - Código do Banco
codBanco = str()
# - Código da Agência
codAgencia = str()

5
pynfe/processamento/__init__.py

@ -2,5 +2,8 @@ from .serializacao import SerializacaoXML
from .serializacao import SerializacaoNfse from .serializacao import SerializacaoNfse
from .validacao import Validacao from .validacao import Validacao
from .assinatura import AssinaturaA1 from .assinatura import AssinaturaA1
from .comunicacao import ComunicacaoSefaz
from pynfe.entidades.certificado import CertificadoA1
from .nfe import ComunicacaoNFe
from .mdfe import ComunicacaoMDFe
from .nfse import ComunicacaoNfse
from .danfe import DanfeNfce from .danfe import DanfeNfce

647
pynfe/processamento/comunicacao.py

@ -1,8 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import re
import ssl import ssl
import datetime import datetime
import requests
from pynfe.utils import etree, so_numeros from pynfe.utils import etree, so_numeros
from pynfe.utils.flags import ( from pynfe.utils.flags import (
NAMESPACE_NFE, NAMESPACE_NFE,
@ -15,14 +14,13 @@ from pynfe.utils.flags import (
NAMESPACE_METODO NAMESPACE_METODO
) )
from pynfe.utils.webservices import NFE, NFCE, NFSE from pynfe.utils.webservices import NFE, NFCE, NFSE
from pynfe.entidades.certificado import CertificadoA1
from .assinatura import AssinaturaA1 from .assinatura import AssinaturaA1
class Comunicacao(object): class Comunicacao(object):
""" """
Classe abstrata responsavel por definir os metodos e logica das classes Classe abstrata responsavel por definir os metodos e logica das classes
de comunicação com os webservices da NF-e.
de comunicação com os webservices.
""" """
_ambiente = 1 # 1 = Produção, 2 = Homologação _ambiente = 1 # 1 = Produção, 2 = Homologação
@ -30,637 +28,22 @@ class Comunicacao(object):
certificado = None certificado = None
certificado_senha = None certificado_senha = None
url = None url = None
_versao = False
_assinatura = AssinaturaA1
_namespace = False
_header = False
_envio_mensagem = False
_namespace_metodo = False
_accept = False
_soap_action = False
_ws_url = False
_namespace_soap = NAMESPACE_SOAP
_namespace_xsi = NAMESPACE_XSI
_namespace_xsd = NAMESPACE_XSD
_soap_version = 'soap'
def __init__(self, uf, certificado, certificado_senha, homologacao=False): def __init__(self, uf, certificado, certificado_senha, homologacao=False):
self.uf = uf self.uf = uf
self.certificado = certificado self.certificado = certificado
self.certificado_senha = certificado_senha self.certificado_senha = certificado_senha
self._ambiente = 2 if homologacao else 1 self._ambiente = 2 if homologacao else 1
class ComunicacaoSefaz(Comunicacao):
"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
_versao = VERSAO_PADRAO
_assinatura = AssinaturaA1
def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
"""
Método para realizar autorização da nota de acordo com o modelo
:param modelo: Modelo
:param nota_fiscal: XML assinado
:param id_lote: Id do lote - numero autoincremental gerado pelo sistema
:param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
:return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
envia todo o soap de resposta da Sefaz para decisão do usuário.
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
# Monta XML do corpo da requisição
raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
raiz.append(nota_fiscal)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
# Faz request no Servidor da Sefaz
retorno = self._post(url, xml)
# Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
if retorno.status_code == 200:
# namespace
ns = {'ns': NAMESPACE_NFE}
# Procuta status no xml
try:
prot = etree.fromstring(retorno.text)
except ValueError:
# em SP retorno.text apresenta erro
prot = etree.fromstring(retorno.content)
if ind_sinc == 1:
try:
# Protocolo com envio OK
try:
inf_prot = prot[0][0] # root protNFe
except IndexError:
# Estados como GO vem com a tag header
inf_prot = prot[1][0]
lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote processado
if lote_status == '104':
prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
# autorizado usa da NF-e
# retorna xml final (protNFe+NFe)
if status == '100':
raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
raiz.append(nota_fiscal)
raiz.append(prot_nfe)
return 0, raiz
except IndexError:
# Protocolo com algum erro no Envio
return 1, retorno, nota_fiscal
else:
# Retorna id do protocolo para posterior consulta em caso de sucesso.
rec = prot[0][0]
status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote Recebido com Sucesso!
if status == '103':
nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
return 0, nrec, nota_fiscal
return 1, retorno, nota_fiscal
def consulta_recibo(self, modelo, numero):
"""
Este método oferece a consulta do resultado do processamento de um lote de NF-e.
O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
Processamento".
:param modelo: Modelo da nota
:param numero: Número da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='RECIBO')
# Monta XML do corpo da requisição
raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'nRec').text = numero
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
return self._post(url, xml)
def consulta_nota(self, modelo, chave):
"""
Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
da Secretaria de Fazenda Estadual.
:param modelo: Modelo da nota
:param chave: Chave da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='CHAVE')
# Monta XML do corpo da requisição
raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
etree.SubElement(raiz, 'chNFe').text = chave
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
return self._post(url, xml)
def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0):
"""
O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag
informada no XML. As tags são distNSU, consNSU e consChNFe.
a) distNSU Distribuição de Conjunto de DF-e a Partir do NSU Informado
b) consNSU Consulta DF-e Vinculado ao NSU Informado
c) consChNFe Consulta de NF-e por Chave de Acesso Informada
:param cnpj: CNPJ do interessado
:param cpf: CPF do interessado
:param chave: Chave da NF-e a ser consultada
:param nsu: Ultimo nsu ou nsu específico para ser consultado.
:return:
"""
# url
url = self._get_url_an(consulta='DISTRIBUICAO')
# Monta XML para envio da requisição
raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
if self.uf:
etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()]
if cnpj:
etree.SubElement(raiz, 'CNPJ').text = cnpj
else:
etree.SubElement(raiz, 'CPF').text = cpf
if not chave:
distNSU = etree.SubElement(raiz, 'distNSU')
etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15)
if chave:
consChNFe = etree.SubElement(raiz, 'consChNFe')
etree.SubElement(consChNFe, 'chNFe').text = chave
#Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz)
return self._post(url, xml)
def consulta_cadastro(self, modelo, cnpj):
"""
Consulta de cadastro
:param modelo: Modelo da nota
:param cnpj: CNPJ da empresa
:return:
"""
# UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
lista_svrs = ['AC', 'RN', 'PB', 'SC']
# RS implementa um método diferente na consulta de cadastro
if self.uf.upper() == 'RS':
url = NFE['RS']['CADASTRO']
elif self.uf.upper() in lista_svrs:
url = NFE['SVRS']['CADASTRO']
elif self.uf.upper() == 'SVC-RS':
url = NFE['SVC-RS']['CADASTRO']
else:
url = self._get_url(modelo=modelo, consulta='CADASTRO')
raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
info = etree.SubElement(raiz, 'infCons')
etree.SubElement(info, 'xServ').text = 'CONS-CAD'
etree.SubElement(info, 'UF').text = self.uf.upper()
etree.SubElement(info, 'CNPJ').text = cnpj
# etree.SubElement(info, 'CPF').text = cpf
# Monta XML para envio da requisição
xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
# Chama método que efetua a requisição POST no servidor SOAP
return self._post(url, xml)
def evento(self, modelo, evento, id_lote=1):
"""
Envia um evento de nota fiscal (cancelamento e carta de correção)
:param modelo: Modelo da nota
:param evento: Eventro
:param id_lote: Id do lote
:return:
"""
# url do serviço
try:
# manifestacao url é do AN
if evento[0][5].text.startswith('2'):
url = self._get_url_an(consulta='EVENTOS')
else:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
except Exception:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
# Monta XML do corpo da requisição
raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
raiz.append(evento)
xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
return self._post(url, xml)
def status_servico(self, modelo):
"""
Verifica status do servidor da receita.
:param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
:return:
"""
url = self._get_url(modelo, 'STATUS')
# Monta XML do corpo da requisição
raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
etree.SubElement(raiz, 'xServ').text = 'STATUS'
xml = self._construir_xml_soap('NFeStatusServico4', raiz)
return self._post(url, xml)
def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
"""
Serviço destinado ao atendimento de solicitações de inutilização de numeração.
:param modelo: Modelo da nota
:param cnpj: CNPJda empresa
:param numero_inicial: Número inicial
:param numero_final: Número final
:param justificativa: Justificativa
:param ano: Ano
:param serie: Série
:return:
"""
# url do servico
url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
# Valores default
ano = str(ano or datetime.date.today().year)[-2:]
uf = CODIGOS_ESTADOS[self.uf.upper()]
cnpj = so_numeros(cnpj)
# Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
# CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
'uf': uf,
'ano': ano,
'cnpj': cnpj,
'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e;
'serie': serie.zfill(3),
'num_ini': str(numero_inicial).zfill(9),
'num_fin': str(numero_final).zfill(9),
}
# Monta XML do corpo da requisição # FIXME
raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
etree.SubElement(inf_inut, 'cUF').text = uf
etree.SubElement(inf_inut, 'ano').text = ano
etree.SubElement(inf_inut, 'CNPJ').text = cnpj
etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
etree.SubElement(inf_inut, 'serie').text = serie
etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
etree.SubElement(inf_inut, 'xJust').text = justificativa
# assinatura
a1 = AssinaturaA1(self.certificado, self.certificado_senha)
xml = a1.assinar(raiz)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeInutilizacao4', xml)
# Faz request no Servidor da Sefaz e retorna resposta
return self._post(url, xml)
def _get_url_an(self, consulta):
# producao
if self._ambiente == 1:
if consulta == 'DISTRIBUICAO':
ambiente = 'https://www1.'
else:
ambiente = 'https://www.'
# homologacao
else:
ambiente = 'https://hom.'
self.url = ambiente + NFE['AN'][consulta]
return self.url
def _get_url(self, modelo, consulta):
""" Retorna a url para comunicação com o webservice """
# estado que implementam webservices proprios
lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
if self.uf.upper() in lista:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
elif modelo == 'nfce':
# PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe
if self.uf.upper() == 'PE' or self.uf.upper() == 'BA':
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# Estados que utilizam outros ambientes
else:
lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO']
if self.uf.upper() in lista_svrs:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# unico UF que utiliza SVAN ainda para NF-e
# SVRS para NFC-e
elif self.uf.upper() == 'MA':
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
else:
raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}")
return self.url
def _construir_xml_soap(self, metodo, dados, cabecalho=False):
"""Mota o XML para o envio via SOAP"""
raiz = etree.Element('{%s}Envelope' % NAMESPACE_SOAP, nsmap={
'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP})
body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP)
## distribuição tem um corpo de xml diferente
if metodo == 'NFeDistribuicaoDFe':
x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo)
a = etree.SubElement(x, 'nfeDadosMsg')
else:
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo)
a.append(dados)
return raiz
def _post_header(self):
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
# PE é a única UF que exige SOAPAction no header
response = {
'content-type': 'application/soap+xml; charset=utf-8;',
'Accept': 'application/soap+xml; charset=utf-8;',
}
if self.uf.upper() == 'PE':
response["SOAPAction"] = ""
return response
def _post(self, url, xml):
certificado_a1 = CertificadoA1(self.certificado)
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
# Abre a conexão HTTPS
try:
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# limpa xml com caracteres bugados para infNFeSupl em NFC-e
xml = re.sub(
'<qrCode>(.*?)</qrCode>',
lambda x: x.group(0).replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', ''),
etree.tostring(xml, encoding='unicode').replace('\n', '')
)
xml = xml_declaration + xml
# Faz o request com o servidor
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
result.encoding = 'utf-8'
return result
except requests.exceptions.RequestException as e:
raise e
finally:
certificado_a1.excluir()
class ComunicacaoNfse(Comunicacao):
""" Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
_versao = ''
_namespace = ''
def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
self.certificado = certificado
self.certificado_senha = certificado_senha
self._ambiente = 2 if homologacao else 1
self.autorizador = autorizador.upper()
if self.autorizador == 'GINFES':
self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
self._versao = '3'
elif self.autorizador == 'BETHA':
self._namespace = NAMESPACE_BETHA
self._versao = '2.02'
else:
raise Exception('Autorizador não encontrado!')
def autorizacao(self, nota):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# xml
xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
# comunica via wsdl
return self._post(url, xml, 'gerar')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def enviar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'enviar_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_rps(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaRps')
elif self.autorizador == 'GINFES':
return self._post_https(url, xml, 'consultaRps')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def consultar_faixa(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaFaixa')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def consultar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_situacao_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# comunica via wsdl
return self._post_https(url, xml, 'consulta_situacao_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def cancelar(self, xml):
# url do serviço
url = self._get_url()
# Betha
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'cancelar')
# Ginfes
elif self.autorizador == 'GINFES':
# comunica via wsdl com certificado
return self._post_https(url, xml, 'cancelar')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def _cabecalho(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces padrão homologação (Ginfes) """
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho = '<ns2:cabecalho versao="3" xmlns:ns2="http://www.ginfes.com.br/cabecalho_v03.xsd"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><versaoDados>3</versaoDados></ns2:cabecalho>'
# cabecalho
raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho2(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces que funcionaram em produção (Ginfes)"""
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho_ginfes(self):
""" Retorna o XML do cabeçalho gerado pelo xsd"""
from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
return SerializacaoGinfes().cabecalho()
def _get_url(self):
""" Retorna a url para comunicação com o webservice """
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if self.autorizador in NFSE:
self.url = NFSE[self.autorizador][ambiente]
else:
raise Exception('Autorizador nao encontrado!')
return self.url
def _post(self, url, xml, metodo):
""" Comunicação wsdl (http) sem certificado digital """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
cliente = Client(url)
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
return cliente.service.CancelarNfse(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e
def _post_https(self, url, xml, metodo):
""" Comunicação wsdl (https) utilizando certificado do usuário """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
from pynfe.utils.https_nfse import HttpAuthenticated
certificadoA1 = CertificadoA1(self.certificado)
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'enviar_lote':
return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta':
return cliente.service.ConsultarNfseV3(cabecalho, xml)
elif metodo == 'consulta_lote':
return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta_situacao_lote':
return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
# versão 2
return cliente.service.CancelarNfse(xml)
# versão 3
# return cliente.service.CancelarNfseV3(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e

214
pynfe/processamento/mdfe.py

@ -0,0 +1,214 @@
# -*- coding: utf-8 -*-
import time
import re
import requests
import collections
from io import StringIO
from pynfe.utils.flags import (
NAMESPACE_MDFE,
MODELO_MDFE,
VERSAO_MDFE,
NAMESPACE_MDFE_METODO,
NAMESPACE_SOAP,
NAMESPACE_XSI,
NAMESPACE_XSD,
CODIGOS_ESTADOS
)
from pynfe.utils.webservices import MDFE
from pynfe.entidades.certificado import CertificadoA1
from pynfe.utils import etree, extrai_id_srtxml
from .comunicacao import Comunicacao
from .resposta import analisar_retorno
MDFE_SITUACAO_JA_ENVIADO = ('100', '101', '132')
class ComunicacaoMDFe(Comunicacao):
_modelo = MODELO_MDFE
_namespace = NAMESPACE_MDFE
_versao = VERSAO_MDFE
_header = 'mdfeCabecMsg'
_envio_mensagem = 'mdfeDadosMsg'
_retorno_mensagem = 'mdfeRecepcaoResult'
_namespace_metodo = NAMESPACE_MDFE_METODO
_accept = True
_soap_action = False
_namespace_soap = NAMESPACE_SOAP
_namespace_xsi = NAMESPACE_XSI
_namespace_xsd = NAMESPACE_XSD
_soap_version = 'soap12'
_edoc_situacao_ja_enviado = MDFE_SITUACAO_JA_ENVIADO
_edoc_situacao_arquivo_recebido_com_sucesso = '103'
_edoc_situacao_em_processamento = '105'
_edoc_situacao_servico_em_operacao = '107'
consulta_servico_ao_enviar = True
maximo_tentativas_consulta_recibo = 5
def status_servico(self):
url = self._get_url('STATUS')
# Monta XML do corpo da requisição
raiz = etree.Element('consStatServMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'STATUS'
xml = self._construir_xml_soap('MDFeStatusServico', raiz)
return self._post(url, xml)
def consulta(self, chave):
url = self._get_url('CONSULTA')
# Monta XML do corpo da requisição
raiz = etree.Element('consSitMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
etree.SubElement(raiz, 'chMDFe').text = chave
# Monta XML para envio da requisição
xml = self._construir_xml_soap('MDFeConsulta', raiz)
return self._post(url, xml)
def consulta_nao_encerrados(self, cpfcnpj):
url = self._get_url('NAO_ENCERRADOS')
# Monta XML do corpo da requisição
# raiz = etree.Element('consMDFeNaoEnc', xmlns=NAMESPACE_MDFE, versao=self._versao)
attr = collections.OrderedDict()
attr['xmlns'] = NAMESPACE_MDFE
attr['versao'] = self._versao
raiz = etree.Element('consMDFeNaoEnc', attr)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NÃO ENCERRADOS'
if len(cpfcnpj) == 11:
etree.SubElement(raiz, 'CPF').text = cpfcnpj.zfill(11)
else:
etree.SubElement(raiz, 'CNPJ').text = cpfcnpj.zfill(14)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('MDFeConsNaoEnc', raiz)
return self._post(url, xml)
def consulta_recibo(self, numero):
url = self._get_url('RET_RECEPCAO')
# Monta XML do corpo da requisição
raiz = etree.Element('consReciMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'nRec').text = numero.zfill(15)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('MDFeRetRecepcao', raiz)
return self._post(url, xml)
def evento(self, evento):
"""
Envia eventos do MDFe como:
Encerramento
Cancelamento
Inclusao Condutor
Inclusao DF-e
Pagamento Operacao MDF-e
:param evento: Nome do Evento
:return:
"""
# url do serviço
url = self._get_url('EVENTOS')
# Monta XML do corpo da requisição
xml = self._construir_xml_soap('MDFeRecepcaoEvento', evento)
return self._post(url, xml)
def _construir_xml_soap(self, metodo, dados):
"""Mota o XML para o envio via SOAP"""
ns = collections.OrderedDict()
ns['xsi'] = self._namespace_xsi
ns['xsd'] = self._namespace_xsd
ns[self._soap_version] = self._namespace_soap
raiz = etree.Element(
'{%s}Envelope' % self._namespace_soap,
nsmap=ns
)
if self._header:
cabecalho = self._cabecalho_soap(metodo)
c = etree.SubElement(raiz, '{%s}Header' % self._namespace_soap)
c.append(cabecalho)
body = etree.SubElement(raiz, '{%s}Body' % self._namespace_soap)
a = etree.SubElement(
body,
self._envio_mensagem,
xmlns=self._namespace_metodo+metodo
)
a.append(dados)
return raiz
def _post_header(self, soap_webservice_method=False):
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
header = {
b'content-type': b'text/xml; charset=utf-8;',
}
# PE é a únca UF que exige SOAPAction no header
if soap_webservice_method:
header[b'SOAPAction'] = \
(self._namespace_metodo + soap_webservice_method).encode('utf-8')
if self._accept:
header[b'Accept'] = b'application/soap+xml; charset=utf-8;'
return header
def _post(self, url, xml):
certificado_a1 = CertificadoA1(self.certificado)
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
# Abre a conexão HTTPS
try:
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# limpa xml com caracteres bugados para infNFeSupl em NFC-e
xml = re.sub(
'<qrCode>(.*?)</qrCode>',
lambda x: x.group(0).replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', ''),
etree.tostring(xml, encoding='unicode').replace('\n', '')
)
xml = xml_declaration + xml
# print(xml)
# print('-' * 20)
# Faz o request com o servidor
result = requests.post(
url,
xml,
headers=self._post_header(),
cert=chave_cert,
verify=False
)
result.encoding = 'utf-8'
return result
except requests.exceptions.RequestException as e:
raise e
finally:
certificado_a1.excluir()
def _cabecalho_soap(self, metodo):
"""Monta o XML do cabeçalho da requisição SOAP"""
raiz = etree.Element(
self._header,
xmlns=self._namespace_metodo + metodo
)
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
etree.SubElement(raiz, 'versaoDados').text = '3.00'
return raiz
def _get_url(self, consulta):
# producao
if self._ambiente == 1:
ambiente = MDFE['SVRS']['HTTPS']
# homologacao
else:
ambiente = MDFE['SVRS']['HOMOLOGACAO']
self.url = ambiente + MDFE['SVRS'][consulta]
return self.url

437
pynfe/processamento/nfe.py

@ -0,0 +1,437 @@
# -*- coding: utf-8 -*-
import datetime
import re
import requests
from pynfe.utils import etree, so_numeros
from pynfe.utils.flags import (
NAMESPACE_NFE,
VERSAO_PADRAO,
CODIGOS_ESTADOS,
NAMESPACE_METODO,
NAMESPACE_SOAP,
NAMESPACE_XSI,
NAMESPACE_XSD,
)
from pynfe.entidades.certificado import CertificadoA1
from pynfe.utils.webservices import NFE, NFCE
from .assinatura import AssinaturaA1
from .comunicacao import Comunicacao
class ComunicacaoNFe(Comunicacao):
"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados."""
_versao = VERSAO_PADRAO
_assinatura = AssinaturaA1
_namespace = NAMESPACE_NFE
_header = False
_envio_mensagem = 'nfeDadosMsg'
_namespace_metodo = NAMESPACE_METODO
_accept = False
_namespace_soap = NAMESPACE_SOAP
_namespace_xsi = NAMESPACE_XSI
_namespace_xsd = NAMESPACE_XSD
_soap_version = 'soap'
def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1):
"""
Método para realizar autorização da nota de acordo com o modelo
:param modelo: Modelo
:param nota_fiscal: XML assinado
:param id_lote: Id do lote - numero autoincremental gerado pelo sistema
:param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono
:return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário,
envia todo o soap de resposta da Sefaz para decisão do usuário.
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='AUTORIZACAO')
# Monta XML do corpo da requisição
raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono
raiz.append(nota_fiscal)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeAutorizacao4', raiz)
# Faz request no Servidor da Sefaz
retorno = self._post(url, xml)
# Em caso de sucesso, retorna xml com nfe e protocolo de autorização.
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário.
if retorno.status_code == 200:
# namespace
ns = {'ns': NAMESPACE_NFE}
# Procuta status no xml
try:
prot = etree.fromstring(retorno.text)
except ValueError:
# em SP retorno.text apresenta erro
prot = etree.fromstring(retorno.content)
if ind_sinc == 1:
try:
# Protocolo com envio OK
try:
inf_prot = prot[0][0] # root protNFe
except IndexError:
# Estados como GO vem com a tag header
inf_prot = prot[1][0]
lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote processado
if lote_status == '104':
prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0]
status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text
# autorizado usa da NF-e
# retorna xml final (protNFe+NFe)
if status == '100':
raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO)
raiz.append(nota_fiscal)
raiz.append(prot_nfe)
return 0, raiz
except IndexError:
# Protocolo com algum erro no Envio
return 1, retorno, nota_fiscal
else:
# Retorna id do protocolo para posterior consulta em caso de sucesso.
rec = prot[0][0]
status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text
# Lote Recebido com Sucesso!
if status == '103':
nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text
return 0, nrec, nota_fiscal
return 1, retorno, nota_fiscal
def consulta_recibo(self, modelo, numero):
"""
Este método oferece a consulta do resultado do processamento de um lote de NF-e.
O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de
15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado
deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em
Processamento".
:param modelo: Modelo da nota
:param numero: Número da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='RECIBO')
# Monta XML do corpo da requisição
raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'nRec').text = numero
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz)
return self._post(url, xml)
def consulta_nota(self, modelo, chave):
"""
Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal
da Secretaria de Fazenda Estadual.
:param modelo: Modelo da nota
:param chave: Chave da nota
:return:
"""
# url do serviço
url = self._get_url(modelo=modelo, consulta='CHAVE')
# Monta XML do corpo da requisição
raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR'
etree.SubElement(raiz, 'chNFe').text = chave
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz)
return self._post(url, xml)
def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0):
"""
O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag
informada no XML. As tags são distNSU, consNSU e consChNFe.
a) distNSU Distribuição de Conjunto de DF-e a Partir do NSU Informado
b) consNSU Consulta DF-e Vinculado ao NSU Informado
c) consChNFe Consulta de NF-e por Chave de Acesso Informada
:param cnpj: CNPJ do interessado
:param cpf: CPF do interessado
:param chave: Chave da NF-e a ser consultada
:param nsu: Ultimo nsu ou nsu específico para ser consultado.
:return:
"""
# url
url = self._get_url_an(consulta='DISTRIBUICAO')
# Monta XML para envio da requisição
raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
if self.uf:
etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()]
if cnpj:
etree.SubElement(raiz, 'CNPJ').text = cnpj
else:
etree.SubElement(raiz, 'CPF').text = cpf
if not chave:
distNSU = etree.SubElement(raiz, 'distNSU')
etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15)
if chave:
consChNFe = etree.SubElement(raiz, 'consChNFe')
etree.SubElement(consChNFe, 'chNFe').text = chave
#Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz)
return self._post(url, xml)
def consulta_cadastro(self, modelo, cnpj):
"""
Consulta de cadastro
:param modelo: Modelo da nota
:param cnpj: CNPJ da empresa
:return:
"""
# UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC
lista_svrs = ['AC', 'RN', 'PB', 'SC']
# RS implementa um método diferente na consulta de cadastro
if self.uf.upper() == 'RS':
url = NFE['RS']['CADASTRO']
elif self.uf.upper() in lista_svrs:
url = NFE['SVRS']['CADASTRO']
elif self.uf.upper() == 'SVC-RS':
url = NFE['SVC-RS']['CADASTRO']
else:
url = self._get_url(modelo=modelo, consulta='CADASTRO')
raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE)
info = etree.SubElement(raiz, 'infCons')
etree.SubElement(info, 'xServ').text = 'CONS-CAD'
etree.SubElement(info, 'UF').text = self.uf.upper()
etree.SubElement(info, 'CNPJ').text = cnpj
# etree.SubElement(info, 'CPF').text = cpf
# Monta XML para envio da requisição
xml = self._construir_xml_soap('CadConsultaCadastro4', raiz)
# Chama método que efetua a requisição POST no servidor SOAP
return self._post(url, xml)
def evento(self, modelo, evento, id_lote=1):
"""
Envia um evento de nota fiscal (cancelamento e carta de correção)
:param modelo: Modelo da nota
:param evento: Eventro
:param id_lote: Id do lote
:return:
"""
# url do serviço
try:
# manifestacao url é do AN
if evento[0][5].text.startswith('2'):
url = self._get_url_an(consulta='EVENTOS')
else:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
except Exception:
url = self._get_url(modelo=modelo, consulta='EVENTOS')
# Monta XML do corpo da requisição
raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema
raiz.append(evento)
xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz)
return self._post(url, xml)
def status_servico(self):
"""
Verifica status do servidor da receita.
:param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce
:return:
"""
url = self._get_url('mdfe', 'STATUS')
# Monta XML do corpo da requisição
raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente)
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()]
etree.SubElement(raiz, 'xServ').text = 'STATUS'
xml = self._construir_xml_soap('NFeStatusServico4', raiz)
return self._post(url, xml)
def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'):
"""
Serviço destinado ao atendimento de solicitações de inutilização de numeração.
:param modelo: Modelo da nota
:param cnpj: CNPJda empresa
:param numero_inicial: Número inicial
:param numero_final: Número final
:param justificativa: Justificativa
:param ano: Ano
:param serie: Série
:return:
"""
# url do servico
url = self._get_url(modelo=modelo, consulta='INUTILIZACAO')
# Valores default
ano = str(ano or datetime.date.today().year)[-2:]
uf = CODIGOS_ESTADOS[self.uf.upper()]
cnpj = so_numeros(cnpj)
# Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) +
# CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID”
id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % {
'uf': uf,
'ano': ano,
'cnpj': cnpj,
'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e;
'serie': serie.zfill(3),
'num_ini': str(numero_inicial).zfill(9),
'num_fin': str(numero_final).zfill(9),
}
# Monta XML do corpo da requisição # FIXME
raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE)
inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico)
etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente)
etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR'
etree.SubElement(inf_inut, 'cUF').text = uf
etree.SubElement(inf_inut, 'ano').text = ano
etree.SubElement(inf_inut, 'CNPJ').text = cnpj
etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e
etree.SubElement(inf_inut, 'serie').text = serie
etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial)
etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final)
etree.SubElement(inf_inut, 'xJust').text = justificativa
# assinatura
a1 = AssinaturaA1(self.certificado, self.certificado_senha)
xml = a1.assinar(raiz)
# Monta XML para envio da requisição
xml = self._construir_xml_soap('NFeInutilizacao4', xml)
# Faz request no Servidor da Sefaz e retorna resposta
return self._post(url, xml)
def _get_url_an(self, consulta):
# producao
if self._ambiente == 1:
if consulta == 'DISTRIBUICAO':
ambiente = 'https://www1.'
else:
ambiente = 'https://www.'
# homologacao
else:
ambiente = 'https://hom.'
self.url = ambiente + NFE['AN'][consulta]
return self.url
def _get_url(self, modelo, consulta):
""" Retorna a url para comunicação com o webservice """
# estado que implementam webservices proprios
lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS']
if self.uf.upper() in lista:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta]
elif modelo == 'nfce':
# PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe
if self.uf.upper() == 'PE' or self.uf.upper() == 'BA':
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# Estados que utilizam outros ambientes
else:
lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO']
if self.uf.upper() in lista_svrs:
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
# unico UF que utiliza SVAN ainda para NF-e
# SVRS para NFC-e
elif self.uf.upper() == 'MA':
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if modelo == 'nfe':
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3
self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta]
elif modelo == 'nfce':
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta]
else:
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"')
else:
raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}")
return self.url
def _construir_xml_soap(self, metodo, dados, cabecalho=False):
"""Mota o XML para o envio via SOAP"""
raiz = etree.Element(
'{%s}Envelope' % NAMESPACE_SOAP,
nsmap={
'xsi': NAMESPACE_XSI,
'xsd': NAMESPACE_XSD,
'soap': NAMESPACE_SOAP
}
)
body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP)
# distribuição tem um corpo de xml diferente
if metodo == 'NFeDistribuicaoDFe':
x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo)
a = etree.SubElement(x, 'nfeDadosMsg')
else:
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo)
a.append(dados)
return raiz
def _post_header(self):
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP"""
# PE é a única UF que exige SOAPAction no header
response = {
'content-type': 'application/soap+xml; charset=utf-8;',
'Accept': 'application/soap+xml; charset=utf-8;',
}
if self.uf.upper() == 'PE':
response["SOAPAction"] = ""
return response
def _post(self, url, xml):
certificado_a1 = CertificadoA1(self.certificado)
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True)
chave_cert = (cert, chave)
# Abre a conexão HTTPS
try:
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# limpa xml com caracteres bugados para infNFeSupl em NFC-e
xml = re.sub(
'<qrCode>(.*?)</qrCode>',
lambda x: x.group(0).replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', ''),
etree.tostring(xml, encoding='unicode').replace('\n', '')
)
xml = xml_declaration + xml
# Faz o request com o servidor
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False)
result.encoding = 'utf-8'
return result
except requests.exceptions.RequestException as e:
raise e
finally:
certificado_a1.excluir()

234
pynfe/processamento/nfse.py

@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
from pynfe.utils import etree
from pynfe.utils.flags import (
NAMESPACE_XSI,
NAMESPACE_BETHA,
)
from pynfe.utils.webservices import NFSE
from pynfe.entidades.certificado import CertificadoA1
from .comunicacao import Comunicacao
class ComunicacaoNfse(Comunicacao):
""" Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """
_versao = ''
_namespace = ''
def __init__(self, certificado, certificado_senha, autorizador, homologacao=False):
self.certificado = certificado
self.certificado_senha = certificado_senha
self._ambiente = 2 if homologacao else 1
self.autorizador = autorizador.upper()
if self.autorizador == 'GINFES':
self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd'
self._versao = '3'
elif self.autorizador == 'BETHA':
self._namespace = NAMESPACE_BETHA
self._versao = '2.02'
else:
raise Exception('Autorizador não encontrado!')
def autorizacao(self, nota):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# xml
xml = etree.tostring(nota, encoding='unicode', pretty_print=False)
# comunica via wsdl
return self._post(url, xml, 'gerar')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def enviar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'enviar_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_rps(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaRps')
elif self.autorizador == 'GINFES':
return self._post_https(url, xml, 'consultaRps')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def consultar_faixa(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'consultaFaixa')
else:
raise Exception('Este método só esta implementado no autorizador betha.')
def consultar_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# xml
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml
# comunica via wsdl
return self._post_https(url, xml, 'consulta_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def consultar_situacao_lote(self, xml):
# url do serviço
url = self._get_url()
if self.autorizador == 'GINFES':
# comunica via wsdl
return self._post_https(url, xml, 'consulta_situacao_lote')
else:
raise Exception('Este método só esta implementado no autorizador ginfes.')
def cancelar(self, xml):
# url do serviço
url = self._get_url()
# Betha
if self.autorizador == 'BETHA':
# comunica via wsdl
return self._post(url, xml, 'cancelar')
# Ginfes
elif self.autorizador == 'GINFES':
# comunica via wsdl com certificado
return self._post_https(url, xml, 'cancelar')
# TODO outros autorizadres
else:
raise Exception('Autorizador não encontrado!')
def _cabecalho(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces padrão homologação (Ginfes) """
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho = '<ns2:cabecalho versao="3" xmlns:ns2="http://www.ginfes.com.br/cabecalho_v03.xsd"
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><versaoDados>3</versaoDados></ns2:cabecalho>'
# cabecalho
raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho2(self, retorna_string=True):
""" Monta o XML do cabeçalho da requisição wsdl
Namespaces que funcionaram em produção (Ginfes)"""
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>'
# cabecalho
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao)
etree.SubElement(raiz, 'versaoDados').text = self._versao
if retorna_string:
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '')
cabecalho = xml_declaration + cabecalho
return cabecalho
else:
return raiz
def _cabecalho_ginfes(self):
""" Retorna o XML do cabeçalho gerado pelo xsd"""
from pynfe.processamento.autorizador_nfse import SerializacaoGinfes
return SerializacaoGinfes().cabecalho()
def _get_url(self):
""" Retorna a url para comunicação com o webservice """
if self._ambiente == 1:
ambiente = 'HTTPS'
else:
ambiente = 'HOMOLOGACAO'
if self.autorizador in NFSE:
self.url = NFSE[self.autorizador][ambiente]
else:
raise Exception('Autorizador nao encontrado!')
return self.url
def _post(self, url, xml, metodo):
""" Comunicação wsdl (http) sem certificado digital """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
cliente = Client(url)
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRps(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
return cliente.service.CancelarNfse(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e
def _post_https(self, url, xml, metodo):
""" Comunicação wsdl (https) utilizando certificado do usuário """
# cabecalho
cabecalho = self._cabecalho()
# comunicacao wsdl
try:
from suds.client import Client
from pynfe.utils.https_nfse import HttpAuthenticated
certificadoA1 = CertificadoA1(self.certificado)
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True)
cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url))
# gerar nfse
if metodo == 'gerar':
return cliente.service.GerarNfse(cabecalho, xml)
elif metodo == 'enviar_lote':
return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta':
return cliente.service.ConsultarNfseV3(cabecalho, xml)
elif metodo == 'consulta_lote':
return cliente.service.ConsultarLoteRpsV3(cabecalho, xml)
elif metodo == 'consulta_situacao_lote':
return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml)
elif metodo == 'consultaRps':
return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml)
elif metodo == 'consultaFaixa':
return cliente.service.ConsultarNfseFaixa(cabecalho, xml)
elif metodo == 'cancelar':
# versão 2
return cliente.service.CancelarNfse(xml)
# versão 3
# return cliente.service.CancelarNfseV3(cabecalho, xml)
# TODO outros metodos
else:
raise Exception('Método não implementado no autorizador.')
except Exception as e:
raise e

28
pynfe/processamento/resposta.py

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
import re
from pynfe.utils import etree
class RetornoSoap(object):
def __init__(self, webservice, retorno, resposta):
self.webservice = webservice
self.resposta = resposta
self.retorno = retorno
def analisar_retorno(webservice, retorno, classe_resposta):
# retorno.raise_for_status()
# print(retorno.text)
match = re.search('<soap:Body>(.*?)</soap:Body>', retorno.text)
if match:
resultado = etree.tostring(etree.fromstring(match.group(1))[0])
# classe_resposta.Validate_simpletypes_ = False
# resposta = classe_resposta.parseString(resultado)
resposta = resultado
# resposta = retorno.text
return RetornoSoap(webservice, retorno, resposta)

104
pynfe/processamento/serializacao.py

@ -1,9 +1,19 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from pynfe.entidades import NotaFiscal from pynfe.entidades import NotaFiscal
from pynfe.utils import etree, so_numeros, obter_municipio_por_codigo, \
obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal, \
from pynfe.utils import (
etree, so_numeros, obter_municipio_por_codigo,
obter_pais_por_codigo, obter_municipio_e_codigo, formatar_decimal,
remover_acentos, obter_uf_por_codigo, obter_codigo_por_municipio remover_acentos, obter_uf_por_codigo, obter_codigo_por_municipio
from pynfe.utils.flags import CODIGOS_ESTADOS, VERSAO_PADRAO, NAMESPACE_NFE, NAMESPACE_SIG, VERSAO_QRCODE
)
from pynfe.utils.flags import (
CODIGOS_ESTADOS,
VERSAO_PADRAO,
VERSAO_MDFE,
NAMESPACE_NFE,
NAMESPACE_MDFE,
NAMESPACE_SIG,
VERSAO_QRCODE
)
from pynfe.utils.webservices import NFCE from pynfe.utils.webservices import NFCE
import base64 import base64
import hashlib import hashlib
@ -674,7 +684,6 @@ class SerializacaoXML(Serializacao):
raiz.append(self._serializar_responsavel_tecnico( raiz.append(self._serializar_responsavel_tecnico(
nota_fiscal.responsavel_tecnico[0], retorna_string=False)) nota_fiscal.responsavel_tecnico[0], retorna_string=False))
if retorna_string: if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True) return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else: else:
@ -712,6 +721,93 @@ class SerializacaoXML(Serializacao):
else: else:
return raiz return raiz
def serializar_evento_mdfe(self, evento, tag_raiz='eventoMDFe', retorna_string=False):
tz = datetime.now().astimezone().strftime('%z')
tz = "{}:{}".format(tz[:-2], tz[-2:])
raiz = etree.Element(tag_raiz, versao=VERSAO_MDFE, xmlns=NAMESPACE_MDFE)
e = etree.SubElement(raiz, 'infEvento', Id=evento.identificador)
etree.SubElement(e, 'cOrgao').text = CODIGOS_ESTADOS[evento.uf.upper()]
etree.SubElement(e, 'tpAmb').text = str(self._ambiente)
if len(so_numeros(evento.cnpj)) == 11:
etree.SubElement(e, 'CPF').text = evento.cnpj
else:
etree.SubElement(e, 'CNPJ').text = evento.cnpj
etree.SubElement(e, 'chMDFe').text = evento.chave
etree.SubElement(e, 'dhEvento').text = evento.data_emissao.strftime('%Y-%m-%dT%H:%M:%S') + tz
etree.SubElement(e, 'tpEvento').text = evento.tp_evento
etree.SubElement(e, 'nSeqEvento').text = str(evento.n_seq_evento)
det = etree.SubElement(e, 'detEvento', versaoEvento=VERSAO_MDFE)
if evento.descricao == 'Encerramento':
encerramento = etree.SubElement(det, 'evEncMDFe')
etree.SubElement(encerramento, 'descEvento').text = evento.descricao
etree.SubElement(encerramento, 'nProt').text = evento.protocolo
etree.SubElement(encerramento, 'dtEnc').text = evento.dtenc.strftime('%Y-%m-%d')
etree.SubElement(encerramento, 'cUF').text = evento.cuf
etree.SubElement(encerramento, 'cMun').text = evento.cmun
elif evento.descricao == 'Inclusão Condutor':
inclusao = etree.SubElement(det, 'evIncCondutorMDFe')
etree.SubElement(inclusao, 'descEvento').text = evento.descricao
condutor = etree.SubElement(inclusao, 'condutor')
etree.SubElement(condutor, 'xNome').text = evento.nome_motorista
etree.SubElement(condutor, 'CPF').text = evento.cpf_motorista
elif evento.descricao == 'Inclusao DF-e':
inclusao = etree.SubElement(det, 'evIncDFeMDFe')
etree.SubElement(inclusao, 'descEvento').text = evento.descricao
etree.SubElement(inclusao, 'nProt').text = evento.protocolo
etree.SubElement(inclusao, 'cMunCarrega').text = evento.cmun_carrega
etree.SubElement(inclusao, 'xMunCarrega').text = evento.xmun_carrega
infDoc = etree.SubElement(inclusao, 'infDoc')
etree.SubElement(infDoc, 'cMunDescarga').text = evento.cmun_descarga
etree.SubElement(infDoc, 'xMunDescarga').text = evento.xmun_descarga
etree.SubElement(infDoc, 'chNFe').text = evento.chave_nfe
elif evento.descricao == 'Pagamento Operacao MDF-e':
pagamento = etree.SubElement(det, 'evPagtoOperMDFe')
etree.SubElement(pagamento, 'descEvento').text = evento.descricao
etree.SubElement(pagamento, 'nProt').text = evento.protocolo
# Viagens
infViagens = etree.SubElement(pagamento, 'infViagens')
etree.SubElement(infViagens, 'qtdViagens').text = evento.qtd_viagens.zfill(5)
etree.SubElement(infViagens, 'nroViagem').text = evento.nro_viagens.zfill(5)
# Informações do pagamento
infPag = etree.SubElement(pagamento, 'infPag')
etree.SubElement(infPag, 'xNome').text = evento.nome_contratante
if len(evento.cpfcnpj_contratante) == 11:
etree.SubElement(infPag, 'CPF').text = evento.cpfcnpj_contratante
else:
etree.SubElement(infPag, 'CNPJ').text = evento.cpfcnpj_contratante
# Componentes de Pagamento do Frete
Comp = etree.SubElement(infPag, 'Comp')
etree.SubElement(Comp, 'tpComp').text = evento.tpComp.zfill(2)
etree.SubElement(Comp, 'vComp').text = '{:.2f}'.format(evento.vComp)
# Continuação das Informações do pagamento
etree.SubElement(infPag, 'vContrato').text = '{:.2f}'.format(evento.vContrato)
etree.SubElement(infPag, 'indPag').text = evento.indPag
# Se indPag == 1 (0=A vista e 1=A prazo)
if evento.indPag != '':
if int(evento.indPag) == 1:
infPrazo = etree.SubElement(infPag, 'infPrazo')
etree.SubElement(infPrazo, 'nParcela').text = evento.nParcela.zfill(3)
etree.SubElement(infPrazo, 'dVenc').text = evento.dVenc.strftime('%Y-%m-%d')
etree.SubElement(infPrazo, 'vParcela').text = '{:.2f}'.format(evento.vParcela)
# Informações bancárias
infBanc = etree.SubElement(infPag, 'infBanc')
if evento.CNPJIPEF != '':
etree.SubElement(infBanc, 'CNPJIPEF').text = evento.CNPJIPEF.zfill(14)
else:
etree.SubElement(infBanc, 'codBanco').text = evento.codBanco
etree.SubElement(infBanc, 'codAgencia').text = evento.codAgencia
if retorna_string:
return etree.tostring(raiz, encoding="unicode", pretty_print=True)
else:
return raiz
class SerializacaoQrcode(object): class SerializacaoQrcode(object):
""" Classe que gera e serializa o qrcode de NFC-e no xml """ """ Classe que gera e serializa o qrcode de NFC-e no xml """

14
pynfe/utils/__init__.py

@ -3,13 +3,17 @@
import os import os
import codecs import codecs
from unicodedata import normalize from unicodedata import normalize
import re
try: try:
from lxml import etree from lxml import etree
except ImportError: except ImportError:
raise Exception('Falhou ao importar lxml/ElementTree') raise Exception('Falhou ao importar lxml/ElementTree')
from io import StringIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
try: try:
from . import flags from . import flags
@ -146,3 +150,11 @@ def obter_uf_por_codigo(codigo_uf):
def remover_acentos(txt): def remover_acentos(txt):
return normalize('NFKD', txt).encode('ASCII','ignore').decode('ASCII') return normalize('NFKD', txt).encode('ASCII','ignore').decode('ASCII')
def extrai_id_srtxml(edoc):
result = ''
match = re.search('Id=[^0-9]+(\d+)"', edoc)
if match:
result = match.group(1)
return result

5
pynfe/utils/flags.py

@ -10,6 +10,11 @@ NAMESPACE_METODO = 'http://www.portalfiscal.inf.br/nfe/wsdl/'
NAMESPACE_SOAP_NFSE = 'http://schemas.xmlsoap.org/soap/envelope/' NAMESPACE_SOAP_NFSE = 'http://schemas.xmlsoap.org/soap/envelope/'
NAMESPACE_BETHA = 'http://www.betha.com.br/e-nota-contribuinte-ws' NAMESPACE_BETHA = 'http://www.betha.com.br/e-nota-contribuinte-ws'
NAMESPACE_MDFE = 'http://www.portalfiscal.inf.br/mdfe'
NAMESPACE_MDFE_METODO = 'http://www.portalfiscal.inf.br/mdfe/wsdl/'
MODELO_MDFE = '58'
VERSAO_MDFE = '3.00'
VERSAO_PADRAO = '4.00' VERSAO_PADRAO = '4.00'
VERSAO_QRCODE = '2' VERSAO_QRCODE = '2'

15
pynfe/utils/webservices.py

@ -498,3 +498,18 @@ NFSE = {
'HOMOLOGACAO':'https://homologacao.ginfes.com.br/ServiceGinfesImpl?wsdl' 'HOMOLOGACAO':'https://homologacao.ginfes.com.br/ServiceGinfesImpl?wsdl'
} }
} }
# MDF-e
MDFE = {
# unico autorizador de MDF-e
'SVRS': {
'RECEPCAO': 'mdferecepcao/MDFeRecepcao.asmx',
'RET_RECEPCAO': 'mdferetrecepcao/MDFeRetRecepcao.asmx',
'EVENTOS': 'mdferecepcaoevento/MDFeRecepcaoEvento.asmx',
'CONSULTA': 'mdfeconsulta/MDFeConsulta.asmx',
'STATUS': 'mdfestatusservico/MDFeStatusServico.asmx',
'NAO_ENCERRADOS': 'mdfeconsnaoenc/MDFeConsNaoEnc.asmx',
'HTTPS': 'https://mdfe.svrs.rs.gov.br/ws/',
'HOMOLOGACAO': 'https://mdfe-homologacao.svrs.rs.gov.br/ws/'
}
}
Loading…
Cancel
Save