Browse Source
Merge pull request #79 from leogregianin/new_mdfe
Merge pull request #79 from leogregianin/new_mdfe
Comunicação com o webservice do MDF-enew_mdfe
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 2340 additions and 666 deletions
-
3.gitignore
-
2MANIFEST.in
-
2pynfe/entidades/__init__.py
-
16pynfe/entidades/certificado.py
-
109pynfe/entidades/evento.py
-
444pynfe/entidades/manifesto.py
-
34pynfe/entidades/notafiscal.py
-
5pynfe/processamento/__init__.py
-
647pynfe/processamento/comunicacao.py
-
286pynfe/processamento/mdfe.py
-
439pynfe/processamento/nfe.py
-
234pynfe/processamento/nfse.py
-
28pynfe/processamento/resposta.py
-
668pynfe/processamento/serializacao.py
-
12pynfe/utils/__init__.py
-
16pynfe/utils/flags.py
-
47pynfe/utils/webservices.py
-
2requirements-nfse.txt
-
8setup.py
@ -0,0 +1,2 @@ |
|||||
|
graft tests |
||||
|
include AUTHORS PLANEJAMENTO LICENCE *.py *.sh |
||||
@ -0,0 +1,444 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
import random |
||||
|
|
||||
|
from .base import Entidade |
||||
|
from pynfe import get_version |
||||
|
from pynfe.utils.flags import MDFE_STATUS, CODIGO_BRASIL, CODIGOS_ESTADOS |
||||
|
|
||||
|
from pynfe.utils import so_numeros |
||||
|
|
||||
|
from decimal import Decimal |
||||
|
|
||||
|
|
||||
|
class Manifesto(Entidade): |
||||
|
status = MDFE_STATUS[0] |
||||
|
|
||||
|
# - UF - converter para codigos em CODIGOS_ESTADOS |
||||
|
uf = str() |
||||
|
|
||||
|
# tpAmb |
||||
|
|
||||
|
# - Tipo Emitente |
||||
|
# 1=Transportadora; 2=Carga própria; 3=CTe Globalizado |
||||
|
tipo_emitente = int() |
||||
|
|
||||
|
# - Tipo transportador - 0=nenhum; 1=etc; 2=tac; 3=ctc |
||||
|
tipo_transportador = int() |
||||
|
|
||||
|
# Manifesto fixo 58 |
||||
|
# - Modelo (formato: NN) |
||||
|
modelo = 58 |
||||
|
|
||||
|
# - Serie (obrigatorio - formato: NNN) |
||||
|
serie = str() |
||||
|
|
||||
|
# - Numero MDFe (obrigatorio) |
||||
|
numero_mdfe = str() |
||||
|
|
||||
|
# - Código numérico aleatório que compõe a chave de acesso |
||||
|
codigo_numerico_aleatorio = str() |
||||
|
|
||||
|
# - Digito verificador do codigo numerico aleatorio |
||||
|
dv_codigo_numerico_aleatorio = str() |
||||
|
|
||||
|
# - Tipo do modal de transporte |
||||
|
# 1=Rodoviario; 2=Aereo; 3=Aquaviario; 4=Ferroviario |
||||
|
modal = 1 |
||||
|
|
||||
|
# - Data da Emissao (obrigatorio) |
||||
|
data_emissao = None |
||||
|
|
||||
|
# - Forma de emissao (obrigatorio - seleciona de lista) - NF_FORMAS_EMISSAO |
||||
|
forma_emissao = str() |
||||
|
|
||||
|
# - Processo de emissão da NF-e (obrigatorio - seleciona de lista) - NF_PROCESSOS_EMISSAO |
||||
|
processo_emissao = 0 |
||||
|
|
||||
|
# - Versao do processo de emissão do MDF-e |
||||
|
versao_processo_emissao = get_version() |
||||
|
|
||||
|
# - UF inicio. Exemplo SP, MT, PR |
||||
|
UFIni = str() |
||||
|
|
||||
|
# - UF final. Exemplo SP, MT, PR |
||||
|
UFFim = str() |
||||
|
|
||||
|
# - Digest value da NF-e (somente leitura) |
||||
|
digest_value = None |
||||
|
|
||||
|
# - Protocolo (somente leitura) |
||||
|
protocolo = str() |
||||
|
|
||||
|
# - Data (somente leitura) |
||||
|
data = None |
||||
|
|
||||
|
# - Municípios carregamento (lista 1 para * / ManyToManyField) |
||||
|
municipio_carrega = None |
||||
|
|
||||
|
# - Percurso da viagem (lista 1 para * / ManyToManyField) |
||||
|
percurso = None |
||||
|
|
||||
|
# Data inicial da viagem |
||||
|
dhIniViagem = None |
||||
|
|
||||
|
# - Emitente (lista 1 para * / ManyToManyField) |
||||
|
emitente = None |
||||
|
|
||||
|
# - Modal rodoviario (lista 1 para * / ManyToManyField) |
||||
|
modal_rodoviario = None |
||||
|
|
||||
|
# - Documentos vinculados NFe ou CTe (lista 1 para * / ManyToManyField) |
||||
|
documentos = None |
||||
|
|
||||
|
# - Seguradora (lista 1 para * / ManyToManyField) |
||||
|
seguradora = None |
||||
|
|
||||
|
# - Produto predominante |
||||
|
produto = None |
||||
|
|
||||
|
# - Resumo dos Totais do MDF-e |
||||
|
totais = None |
||||
|
|
||||
|
# - Lacres |
||||
|
lacres = None |
||||
|
|
||||
|
# - Informacoes Adicionais |
||||
|
# - Informacoes adicionais de interesse do fisco |
||||
|
informacoes_adicionais_interesse_fisco = str() |
||||
|
|
||||
|
# - Informacoes complementares de interesse do contribuinte |
||||
|
informacoes_complementares_interesse_contribuinte = str() |
||||
|
|
||||
|
def __init__(self, *args, **kwargs): |
||||
|
self.municipio_carrega = [] |
||||
|
self.percurso = [] |
||||
|
self.modal_rodoviario = [] |
||||
|
self.documentos = [] |
||||
|
self.seguradora = [] |
||||
|
self.produto = [] |
||||
|
self.lacres = [] |
||||
|
self.responsavel_tecnico = [] |
||||
|
super(Manifesto, self).__init__(*args, **kwargs) |
||||
|
|
||||
|
def __str__(self): |
||||
|
return ' '.join([str(self.modelo), self.serie, self.numero_mdfe]) |
||||
|
|
||||
|
def adicionar_municipio_carrega(self, **kwargs): |
||||
|
obj = ManifestoMunicipioCarrega(**kwargs) |
||||
|
self.municipio_carrega.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_percurso(self, **kwargs): |
||||
|
obj = ManifestoPercurso(**kwargs) |
||||
|
self.percurso.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_modal_rodoviario(self, **kwargs): |
||||
|
obj = ManifestoRodoviario(**kwargs) |
||||
|
self.modal_rodoviario.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_documentos(self, **kwargs): |
||||
|
obj = ManifestoDocumentos(**kwargs) |
||||
|
self.documentos.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_seguradora(self, **kwargs): |
||||
|
obj = ManifestoSeguradora(**kwargs) |
||||
|
self.seguradora.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_produto(self, **kwargs): |
||||
|
obj = ManifestoProduto(**kwargs) |
||||
|
self.produto.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_totais(self, **kwargs): |
||||
|
obj = ManifestoTotais(**kwargs) |
||||
|
self.totais.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_lacres(self, **kwargs): |
||||
|
obj = ManifestoLacres(**kwargs) |
||||
|
self.lacres.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def adicionar_responsavel_tecnico(self, **kwargs): |
||||
|
""" Adiciona uma instancia de Responsavel Tecnico """ |
||||
|
obj = ManifestoResponsavelTecnico(**kwargs) |
||||
|
self.responsavel_tecnico.append(obj) |
||||
|
return obj |
||||
|
|
||||
|
def _codigo_numerico_aleatorio(self): |
||||
|
self.codigo_numerico_aleatorio = str(random.randint(0, 99999999)).zfill(8) |
||||
|
return self.codigo_numerico_aleatorio |
||||
|
|
||||
|
def _dv_codigo_numerico(self, key): |
||||
|
assert len(key) == 43 |
||||
|
|
||||
|
weights = [2, 3, 4, 5, 6, 7, 8, 9] |
||||
|
weights_size = len(weights) |
||||
|
key_numbers = [int(k) for k in key] |
||||
|
key_numbers.reverse() |
||||
|
|
||||
|
key_sum = 0 |
||||
|
for i, key_number in enumerate(key_numbers): |
||||
|
# cycle though weights |
||||
|
i = i % weights_size |
||||
|
key_sum += key_number * weights[i] |
||||
|
|
||||
|
remainder = key_sum % 11 |
||||
|
if remainder == 0 or remainder == 1: |
||||
|
self.dv_codigo_numerico_aleatorio = '0' |
||||
|
return '0' |
||||
|
self.dv_codigo_numerico_aleatorio = str(11 - remainder) |
||||
|
return str(self.dv_codigo_numerico_aleatorio) |
||||
|
|
||||
|
@property |
||||
|
# @memoize |
||||
|
def identificador_unico(self): |
||||
|
# Monta 'Id' da tag raiz <infMDFe> |
||||
|
# Ex.: MDFe35080599999090910270580010000000011518005123 |
||||
|
key = "%(uf)s%(ano)s%(mes)s%(cnpj)s%(mod)s%(serie)s%(nMDF)s%(tpEmis)s%(cMDF)s"%{ |
||||
|
'uf': CODIGOS_ESTADOS[self.uf], |
||||
|
'ano': self.data_emissao.strftime('%y'), |
||||
|
'mes': self.data_emissao.strftime('%m'), |
||||
|
'cnpj': so_numeros(self.emitente.cpfcnpj).zfill(14), |
||||
|
'mod': self.modelo, |
||||
|
'serie': str(self.serie).zfill(3), |
||||
|
'nMDF': str(self.numero_mdfe).zfill(9), |
||||
|
'tpEmis': str(self.forma_emissao), |
||||
|
'cMDF': self._codigo_numerico_aleatorio(), |
||||
|
} |
||||
|
return "MDFe%(uf)s%(ano)s%(mes)s%(cnpj)s%(mod)s%(serie)s%(nMDF)s%(tpEmis)s%(cMDF)s%(cDV)s"%{ |
||||
|
'uf': CODIGOS_ESTADOS[self.uf], |
||||
|
'ano': self.data_emissao.strftime('%y'), |
||||
|
'mes': self.data_emissao.strftime('%m'), |
||||
|
'cnpj': so_numeros(self.emitente.cpfcnpj).zfill(14), |
||||
|
'mod': self.modelo, |
||||
|
'serie': str(self.serie).zfill(3), |
||||
|
'nMDF': str(self.numero_mdfe).zfill(9), |
||||
|
'tpEmis': str(self.forma_emissao), |
||||
|
'cMDF': str(self.codigo_numerico_aleatorio), |
||||
|
'cDV': self._dv_codigo_numerico(key), |
||||
|
} |
||||
|
|
||||
|
|
||||
|
class ManifestoMunicipioCarrega(Entidade): |
||||
|
# - Codigo municipio |
||||
|
cMunCarrega = str() |
||||
|
|
||||
|
# - Nome do municipio |
||||
|
xMunCarrega = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoPercurso(Entidade): |
||||
|
# - Nome da UF (2 digitos) |
||||
|
UFPer = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoRodoviario(Entidade): |
||||
|
rntrc = str() |
||||
|
ciot = None |
||||
|
pedagio = None |
||||
|
contratante = None |
||||
|
pagamento = None |
||||
|
veiculo_tracao = None |
||||
|
veiculo_reboque = None |
||||
|
|
||||
|
|
||||
|
class ManifestoCIOT(Entidade): |
||||
|
numero_ciot = str() |
||||
|
cpfcnpj = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoPedagio(Entidade): |
||||
|
cnpj_fornecedor = str() |
||||
|
cpfcnpj_pagador = str() |
||||
|
numero_compra = str() |
||||
|
valor_pedagio = Decimal() |
||||
|
|
||||
|
|
||||
|
class ManifestoContratante(Entidade): |
||||
|
nome = str() |
||||
|
cpfcnpj = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoVeiculoTracao(Entidade): |
||||
|
cInt = str() |
||||
|
placa = str() |
||||
|
RENAVAM = str() |
||||
|
tara = str() |
||||
|
capKG = str() |
||||
|
capM3 = str() |
||||
|
proprietario = None |
||||
|
condutor = None |
||||
|
tpRod = str() |
||||
|
tpCar = str() |
||||
|
UF = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoVeiculoReboque(Entidade): |
||||
|
cInt = str() |
||||
|
placa = str() |
||||
|
RENAVAM = str() |
||||
|
tara = str() |
||||
|
capKG = str() |
||||
|
capM3 = str() |
||||
|
proprietario = None |
||||
|
tpCar = str() |
||||
|
UF = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoCondutor(Entidade): |
||||
|
nome_motorista = str() |
||||
|
cpf_motorista = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoDocumentos(Entidade): |
||||
|
|
||||
|
# Código do municipio de descarga |
||||
|
cMunDescarga = str() |
||||
|
# Nome do municipio de descarga |
||||
|
xMunDescarga = str() |
||||
|
|
||||
|
# Documentos vinculados |
||||
|
documentos_nfe = None |
||||
|
documentos_cte = None |
||||
|
|
||||
|
|
||||
|
class ManifestoDocumentosNFe(Entidade): |
||||
|
chave_acesso_nfe = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoDocumentosCTe(Entidade): |
||||
|
chave_acesso_cte = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoSeguradora(Entidade): |
||||
|
|
||||
|
# infResp - Responsavel seguro |
||||
|
# 1=Emitente; 2=Tomador |
||||
|
responsavel_seguro = str() |
||||
|
# - CNPJ do responsavel |
||||
|
cnpj_responsavel = str() |
||||
|
|
||||
|
# infSeg - Seguradora |
||||
|
# - Nome da seguradora |
||||
|
nome_seguradora = str() |
||||
|
# - CNPJ seguradora |
||||
|
cnpj_seguradora = str() |
||||
|
|
||||
|
# Apolice do Seguro |
||||
|
numero_apolice = str() |
||||
|
|
||||
|
# Lista de Averbacoes |
||||
|
averbacoes = None |
||||
|
|
||||
|
|
||||
|
class ManifestoAverbacao(Entidade): |
||||
|
# Numero da Averbacao |
||||
|
numero = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoProduto(Entidade): |
||||
|
|
||||
|
# Tipo de carga |
||||
|
# 01=GranelSolido |
||||
|
# 02=GranelLiquido |
||||
|
# 03=Frigorificada |
||||
|
# 04=Conteinerizada |
||||
|
# 05=CargaGeral |
||||
|
# 06=Neogranel |
||||
|
# 07=PerigosaGranelSolido |
||||
|
# 08=PerigosaGranelLiquido |
||||
|
# 09=PerigosaCargaFrigorificada |
||||
|
# 10=PerigosaConteinerizada |
||||
|
# 11=PerigosaCargaGeral |
||||
|
tipo_carga = str() |
||||
|
|
||||
|
nome_produto = str() |
||||
|
cean = str() |
||||
|
ncm = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoEmitente(Entidade): |
||||
|
# Dados do Emitente |
||||
|
|
||||
|
# - CPF ou CNPJ (obrigatorio) |
||||
|
cpfcnpj = str() |
||||
|
|
||||
|
# - Inscricao Estadual (obrigatorio) |
||||
|
inscricao_estadual = str() |
||||
|
|
||||
|
# - Nome/Razao Social (obrigatorio) |
||||
|
razao_social = str() |
||||
|
|
||||
|
# - Nome Fantasia |
||||
|
nome_fantasia = str() |
||||
|
|
||||
|
# Endereco |
||||
|
# - Logradouro (obrigatorio) |
||||
|
endereco_logradouro = str() |
||||
|
|
||||
|
# - Numero (obrigatorio) |
||||
|
endereco_numero = str() |
||||
|
|
||||
|
# - Complemento |
||||
|
endereco_complemento = str() |
||||
|
|
||||
|
# - Bairro (obrigatorio) |
||||
|
endereco_bairro = str() |
||||
|
|
||||
|
# - Codigo Municipio (opt) |
||||
|
endereco_cod_municipio = str() |
||||
|
|
||||
|
# - Municipio (obrigatorio) |
||||
|
endereco_municipio = str() |
||||
|
|
||||
|
# - CEP |
||||
|
endereco_cep = str() |
||||
|
|
||||
|
# - UF (obrigatorio) |
||||
|
endereco_uf = str() |
||||
|
|
||||
|
# - Telefone |
||||
|
endereco_telefone = str() |
||||
|
|
||||
|
# - Email |
||||
|
endereco_email = str() |
||||
|
|
||||
|
def __str__(self): |
||||
|
return self.cpfcnpj |
||||
|
|
||||
|
|
||||
|
class ManifestoTotais(Entidade): |
||||
|
|
||||
|
# Quantidade total de CT-e relacionados no Manifesto |
||||
|
qCTe = int() |
||||
|
|
||||
|
# Quantidade total de NF-e relacionadas no Manifesto |
||||
|
qNFe = int() |
||||
|
|
||||
|
# Valor total da carga / mercadorias transportadas |
||||
|
vCarga = Decimal() |
||||
|
|
||||
|
# - Código da unidade de medida do Peso Bruto da Carga / Mercadorias transportadas |
||||
|
# Unidades: 01 – KG; 02 - TON |
||||
|
cUnid = str() |
||||
|
|
||||
|
# - Peso Bruto Total da Carga / Mercadorias transportadas |
||||
|
qCarga = Decimal() |
||||
|
|
||||
|
|
||||
|
class ManifestoLacres(Entidade): |
||||
|
nLacre = str() |
||||
|
|
||||
|
|
||||
|
class ManifestoResponsavelTecnico(Entidade): |
||||
|
# NT 2018/003 |
||||
|
cnpj = str() |
||||
|
contato = str() |
||||
|
email = str() |
||||
|
fone = str() |
||||
|
csrt = str() |
||||
@ -0,0 +1,286 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
import time |
||||
|
import re |
||||
|
import requests |
||||
|
from io import StringIO |
||||
|
import base64 |
||||
|
|
||||
|
from pynfe.utils.flags import ( |
||||
|
NAMESPACE_MDFE, |
||||
|
MODELO_MDFE, |
||||
|
VERSAO_MDFE, |
||||
|
NAMESPACE_MDFE_METODO, |
||||
|
NAMESPACE_SOAP, |
||||
|
NAMESPACE_XSI, |
||||
|
NAMESPACE_XSD, |
||||
|
CODIGOS_ESTADOS |
||||
|
) |
||||
|
from pynfe.utils.webservices import MDFE |
||||
|
from pynfe.entidades.certificado import CertificadoA1 |
||||
|
from pynfe.utils import etree, extrai_id_srtxml |
||||
|
from .comunicacao import Comunicacao |
||||
|
from .resposta import analisar_retorno |
||||
|
|
||||
|
MDFE_SITUACAO_JA_ENVIADO = ('100', '101', '132') |
||||
|
|
||||
|
|
||||
|
class ComunicacaoMDFe(Comunicacao): |
||||
|
|
||||
|
_modelo = MODELO_MDFE |
||||
|
_namespace = NAMESPACE_MDFE |
||||
|
_versao = VERSAO_MDFE |
||||
|
_header = 'mdfeCabecMsg' |
||||
|
_envio_mensagem = 'mdfeDadosMsg' |
||||
|
_retorno_mensagem = 'mdfeRecepcaoResult' |
||||
|
_namespace_metodo = NAMESPACE_MDFE_METODO |
||||
|
|
||||
|
_accept = True |
||||
|
_soap_action = False |
||||
|
_namespace_soap = NAMESPACE_SOAP |
||||
|
_namespace_xsi = NAMESPACE_XSI |
||||
|
_namespace_xsd = NAMESPACE_XSD |
||||
|
_soap_version = 'soap12' |
||||
|
_edoc_situacao_ja_enviado = MDFE_SITUACAO_JA_ENVIADO |
||||
|
_edoc_situacao_arquivo_recebido_com_sucesso = '103' |
||||
|
_edoc_situacao_lote_processado = '104' |
||||
|
_edoc_situacao_em_processamento = '105' |
||||
|
_edoc_situacao_servico_em_operacao = '107' |
||||
|
|
||||
|
consulta_servico_ao_enviar = True |
||||
|
maximo_tentativas_consulta_recibo = 5 |
||||
|
|
||||
|
def autorizacao(self, manifesto, id_lote=1, ind_sinc=1): |
||||
|
""" |
||||
|
Método para realizar autorização do manifesto |
||||
|
:param manifesto: XML assinado |
||||
|
:param id_lote: Id do lote - numero autoincremental gerado pelo sistema |
||||
|
:param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono |
||||
|
:return: Uma tupla que em caso de sucesso, retorna xml com manifesto e protocolo de autorização. |
||||
|
Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário. |
||||
|
""" |
||||
|
# url do serviço |
||||
|
if ind_sinc == 0: |
||||
|
url = self._get_url(consulta='RECEPCAO') |
||||
|
elif ind_sinc == 1: |
||||
|
url = self._get_url(consulta='RECEPCAO_SINC') |
||||
|
else: |
||||
|
raise f'ind_sinc deve ser 0=assincrono ou 1=sincrono' |
||||
|
|
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('enviMDFe', xmlns=NAMESPACE_MDFE, versao=VERSAO_MDFE) |
||||
|
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema |
||||
|
raiz.append(manifesto) |
||||
|
|
||||
|
# Monta XML para envio da requisição |
||||
|
if ind_sinc == 0: |
||||
|
xml = self._construir_xml_soap('MDFeRecepcao', raiz) |
||||
|
elif ind_sinc == 1: |
||||
|
xml = self._construir_xml_soap('MDFeRecepcaoSinc', raiz) |
||||
|
|
||||
|
# Faz request no Servidor da Sefaz |
||||
|
retorno = self._post(url, xml) |
||||
|
|
||||
|
# Em caso de sucesso, retorna xml com o mdfe e protocolo de autorização. |
||||
|
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário. |
||||
|
if retorno.status_code == 200: |
||||
|
# namespace |
||||
|
ns = {'ns': NAMESPACE_MDFE} |
||||
|
# Procuta status no xml |
||||
|
try: |
||||
|
prot = etree.fromstring(retorno.text) |
||||
|
except ValueError: |
||||
|
# em SP retorno.text apresenta erro |
||||
|
prot = etree.fromstring(retorno.content) |
||||
|
|
||||
|
if ind_sinc == 1: |
||||
|
try: |
||||
|
# Protocolo com envio OK |
||||
|
inf_prot = prot[1][0] |
||||
|
lote_status = inf_prot.xpath("ns:retEnviMDFe/ns:cStat", namespaces=ns)[0].text |
||||
|
|
||||
|
# Lote processado |
||||
|
if lote_status == self._edoc_situacao_lote_processado: |
||||
|
prot_mdfe = inf_prot.xpath("ns:retEnviMDFe/ns:protMDFe", namespaces=ns)[0] |
||||
|
status = prot_mdfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text |
||||
|
|
||||
|
# autorizado uso do MDF-e |
||||
|
# retorna xml final (protMDFe + MDFe) |
||||
|
if status in self._edoc_situacao_ja_enviado: # if status == '100': |
||||
|
raiz = etree.Element('mdfeProc', xmlns=NAMESPACE_MDFE, versao=VERSAO_MDFE) |
||||
|
raiz.append(manifesto) |
||||
|
raiz.append(prot_mdfe) |
||||
|
return 0, raiz |
||||
|
except IndexError: |
||||
|
# Protocolo com algum erro no Envio |
||||
|
return 1, retorno, manifesto |
||||
|
else: |
||||
|
# Retorna id do protocolo para posterior consulta em caso de sucesso. |
||||
|
rec = prot[1][0] |
||||
|
status = rec.xpath("ns:retEnviMDFe/ns:cStat", namespaces=ns)[0].text |
||||
|
# Lote Recebido com Sucesso! |
||||
|
if status == self._edoc_situacao_arquivo_recebido_com_sucesso: |
||||
|
nrec = rec.xpath("ns:retEnviMDFe/ns:infRec/ns:nRec", namespaces=ns)[0].text |
||||
|
return 0, nrec, manifesto |
||||
|
return 1, retorno, manifesto |
||||
|
|
||||
|
def status_servico(self): |
||||
|
url = self._get_url('STATUS') |
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('consStatServMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(raiz, 'xServ').text = 'STATUS' |
||||
|
xml = self._construir_xml_soap('MDFeStatusServico', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def consulta(self, chave): |
||||
|
url = self._get_url('CONSULTA') |
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('consSitMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR' |
||||
|
etree.SubElement(raiz, 'chMDFe').text = chave |
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('MDFeConsulta', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def consulta_nao_encerrados(self, cpfcnpj): |
||||
|
url = self._get_url('NAO_ENCERRADOS') |
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('consMDFeNaoEnc', xmlns=NAMESPACE_MDFE, versao=self._versao) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR NÃO ENCERRADOS' |
||||
|
if len(cpfcnpj) == 11: |
||||
|
etree.SubElement(raiz, 'CPF').text = cpfcnpj.zfill(11) |
||||
|
else: |
||||
|
etree.SubElement(raiz, 'CNPJ').text = cpfcnpj.zfill(14) |
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('MDFeConsNaoEnc', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def consulta_recibo(self, numero): |
||||
|
url = self._get_url('RET_RECEPCAO') |
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('consReciMDFe', versao=self._versao, xmlns=NAMESPACE_MDFE) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(raiz, 'nRec').text = numero.zfill(15) |
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('MDFeRetRecepcao', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def evento(self, evento): |
||||
|
""" |
||||
|
Envia eventos do MDFe como: |
||||
|
Encerramento |
||||
|
Cancelamento |
||||
|
Inclusao Condutor |
||||
|
Inclusao DF-e |
||||
|
Pagamento Operacao MDF-e |
||||
|
:param evento: Nome do Evento |
||||
|
:return: |
||||
|
""" |
||||
|
# url do serviço |
||||
|
url = self._get_url('EVENTOS') |
||||
|
# Monta XML do corpo da requisição |
||||
|
xml = self._construir_xml_soap('MDFeRecepcaoEvento', evento) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def _construir_xml_soap(self, metodo, dados): |
||||
|
"""Mota o XML para o envio via SOAP""" |
||||
|
|
||||
|
raiz = etree.Element( |
||||
|
'{%s}Envelope' % NAMESPACE_SOAP, |
||||
|
nsmap={ |
||||
|
'xsi': self._namespace_xsi, |
||||
|
'xsd': self._namespace_xsd, |
||||
|
self._soap_version: self._namespace_soap |
||||
|
} |
||||
|
) |
||||
|
|
||||
|
if self._header: |
||||
|
cabecalho = self._cabecalho_soap(metodo) |
||||
|
c = etree.SubElement(raiz, '{%s}Header' % self._namespace_soap) |
||||
|
c.append(cabecalho) |
||||
|
|
||||
|
body = etree.SubElement(raiz, '{%s}Body' % self._namespace_soap) |
||||
|
|
||||
|
a = etree.SubElement( |
||||
|
body, |
||||
|
self._envio_mensagem, |
||||
|
xmlns=self._namespace_metodo+metodo |
||||
|
) |
||||
|
|
||||
|
# if metodo == 'MDFeRecepcaoSinc': |
||||
|
# body_base64 = base64.b16encode(a).decode() |
||||
|
|
||||
|
a.append(dados) |
||||
|
return raiz |
||||
|
|
||||
|
def _post_header(self, soap_webservice_method=False): |
||||
|
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" |
||||
|
header = { |
||||
|
b'content-type': b'text/xml; charset=utf-8;', |
||||
|
} |
||||
|
|
||||
|
# PE é a únca UF que exige SOAPAction no header |
||||
|
if soap_webservice_method: |
||||
|
header[b'SOAPAction'] = \ |
||||
|
(self._namespace_metodo + soap_webservice_method).encode('utf-8') |
||||
|
|
||||
|
if self._accept: |
||||
|
header[b'Accept'] = b'application/soap+xml; charset=utf-8;' |
||||
|
|
||||
|
return header |
||||
|
|
||||
|
def _post(self, url, xml): |
||||
|
certificado_a1 = CertificadoA1(self.certificado) |
||||
|
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True) |
||||
|
chave_cert = (cert, chave) |
||||
|
# Abre a conexão HTTPS |
||||
|
try: |
||||
|
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>' |
||||
|
|
||||
|
# limpa xml com caracteres bugados para infMDFeSupl em NFC-e |
||||
|
xml = re.sub( |
||||
|
'<qrCodMDFe>(.*?)</qrCodMDFe>', |
||||
|
lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('amp;', ''), |
||||
|
etree.tostring(xml, encoding='unicode').replace('\n', '') |
||||
|
) |
||||
|
xml = xml_declaration + xml |
||||
|
xml = xml.encode('utf8') # necessário para o evento "CONSULTAR NÃO ENCERRADOS" |
||||
|
|
||||
|
# Faz o request com o servidor |
||||
|
result = requests.post( |
||||
|
url, |
||||
|
xml, |
||||
|
headers=self._post_header(), |
||||
|
cert=chave_cert, |
||||
|
verify=False |
||||
|
) |
||||
|
result.encoding = 'utf-8' |
||||
|
return result |
||||
|
except requests.exceptions.RequestException as e: |
||||
|
raise e |
||||
|
finally: |
||||
|
certificado_a1.excluir() |
||||
|
|
||||
|
def _cabecalho_soap(self, metodo): |
||||
|
"""Monta o XML do cabeçalho da requisição SOAP""" |
||||
|
|
||||
|
raiz = etree.Element( |
||||
|
self._header, |
||||
|
xmlns=self._namespace_metodo + metodo |
||||
|
) |
||||
|
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] |
||||
|
etree.SubElement(raiz, 'versaoDados').text = '3.00' |
||||
|
return raiz |
||||
|
|
||||
|
def _get_url(self, consulta): |
||||
|
# producao |
||||
|
if self._ambiente == 1: |
||||
|
ambiente = MDFE['SVRS']['HTTPS'] |
||||
|
# homologacao |
||||
|
else: |
||||
|
ambiente = MDFE['SVRS']['HOMOLOGACAO'] |
||||
|
|
||||
|
self.url = ambiente + MDFE['SVRS'][consulta] |
||||
|
return self.url |
||||
@ -0,0 +1,439 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
import datetime |
||||
|
import re |
||||
|
import requests |
||||
|
|
||||
|
from pynfe.utils import etree, so_numeros |
||||
|
from pynfe.utils.flags import ( |
||||
|
NAMESPACE_NFE, |
||||
|
VERSAO_PADRAO, |
||||
|
CODIGOS_ESTADOS, |
||||
|
NAMESPACE_METODO, |
||||
|
NAMESPACE_SOAP, |
||||
|
NAMESPACE_XSI, |
||||
|
NAMESPACE_XSD, |
||||
|
) |
||||
|
|
||||
|
from pynfe.entidades.certificado import CertificadoA1 |
||||
|
from pynfe.utils.webservices import NFE, NFCE |
||||
|
from .assinatura import AssinaturaA1 |
||||
|
from .comunicacao import Comunicacao |
||||
|
|
||||
|
|
||||
|
class ComunicacaoNFe(Comunicacao): |
||||
|
"""Classe de comunicação que segue o padrão definido para as SEFAZ dos Estados.""" |
||||
|
|
||||
|
_versao = VERSAO_PADRAO |
||||
|
_assinatura = AssinaturaA1 |
||||
|
_namespace = NAMESPACE_NFE |
||||
|
_header = False |
||||
|
_envio_mensagem = 'nfeDadosMsg' |
||||
|
_namespace_metodo = NAMESPACE_METODO |
||||
|
_accept = False |
||||
|
_namespace_soap = NAMESPACE_SOAP |
||||
|
_namespace_xsi = NAMESPACE_XSI |
||||
|
_namespace_xsd = NAMESPACE_XSD |
||||
|
_soap_version = 'soap' |
||||
|
|
||||
|
def autorizacao(self, modelo, nota_fiscal, id_lote=1, ind_sinc=1): |
||||
|
""" |
||||
|
Método para realizar autorização da nota de acordo com o modelo |
||||
|
:param modelo: Modelo |
||||
|
:param nota_fiscal: XML assinado |
||||
|
:param id_lote: Id do lote - numero autoincremental gerado pelo sistema |
||||
|
:param ind_sinc: Indicador de sincrono e assincrono, 0 para assincrono, 1 para sincrono |
||||
|
:return: Uma tupla que em caso de sucesso, retorna xml com nfe e protocolo de autorização. Caso contrário, |
||||
|
envia todo o soap de resposta da Sefaz para decisão do usuário. |
||||
|
""" |
||||
|
# url do serviço |
||||
|
url = self._get_url(modelo=modelo, consulta='AUTORIZACAO') |
||||
|
|
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('enviNFe', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO) |
||||
|
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema |
||||
|
etree.SubElement(raiz, 'indSinc').text = str(ind_sinc) # 0 para assincrono, 1 para sincrono |
||||
|
raiz.append(nota_fiscal) |
||||
|
|
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('NFeAutorizacao4', raiz) |
||||
|
# Faz request no Servidor da Sefaz |
||||
|
retorno = self._post(url, xml) |
||||
|
|
||||
|
# Em caso de sucesso, retorna xml com nfe e protocolo de autorização. |
||||
|
# Caso contrário, envia todo o soap de resposta da Sefaz para decisão do usuário. |
||||
|
if retorno.status_code == 200: |
||||
|
# namespace |
||||
|
ns = {'ns': NAMESPACE_NFE} |
||||
|
# Procuta status no xml |
||||
|
try: |
||||
|
prot = etree.fromstring(retorno.text) |
||||
|
except ValueError: |
||||
|
# em SP retorno.text apresenta erro |
||||
|
prot = etree.fromstring(retorno.content) |
||||
|
if ind_sinc == 1: |
||||
|
try: |
||||
|
# Protocolo com envio OK |
||||
|
try: |
||||
|
inf_prot = prot[0][0] # root protNFe |
||||
|
except IndexError: |
||||
|
# Estados como GO vem com a tag header |
||||
|
inf_prot = prot[1][0] |
||||
|
|
||||
|
lote_status = inf_prot.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text |
||||
|
# Lote processado |
||||
|
if lote_status == '104': |
||||
|
prot_nfe = inf_prot.xpath("ns:retEnviNFe/ns:protNFe", namespaces=ns)[0] |
||||
|
status = prot_nfe.xpath('ns:infProt/ns:cStat', namespaces=ns)[0].text |
||||
|
# autorizado usa da NF-e |
||||
|
# retorna xml final (protNFe+NFe) |
||||
|
if status == '100': |
||||
|
raiz = etree.Element('nfeProc', xmlns=NAMESPACE_NFE, versao=VERSAO_PADRAO) |
||||
|
raiz.append(nota_fiscal) |
||||
|
raiz.append(prot_nfe) |
||||
|
return 0, raiz |
||||
|
except IndexError: |
||||
|
# Protocolo com algum erro no Envio |
||||
|
return 1, retorno, nota_fiscal |
||||
|
else: |
||||
|
# Retorna id do protocolo para posterior consulta em caso de sucesso. |
||||
|
rec = prot[0][0] |
||||
|
status = rec.xpath("ns:retEnviNFe/ns:cStat", namespaces=ns)[0].text |
||||
|
# Lote Recebido com Sucesso! |
||||
|
if status == '103': |
||||
|
nrec = rec.xpath("ns:retEnviNFe/ns:infRec/ns:nRec", namespaces=ns)[0].text |
||||
|
return 0, nrec, nota_fiscal |
||||
|
return 1, retorno, nota_fiscal |
||||
|
|
||||
|
def consulta_recibo(self, modelo, numero): |
||||
|
""" |
||||
|
Este método oferece a consulta do resultado do processamento de um lote de NF-e. |
||||
|
O aplicativo do Contribuinte deve ser construído de forma a aguardar um tempo mínimo de |
||||
|
15 segundos entre o envio do Lote de NF-e para processamento e a consulta do resultado |
||||
|
deste processamento, evitando a obtenção desnecessária do status de erro 105 - "Lote em |
||||
|
Processamento". |
||||
|
:param modelo: Modelo da nota |
||||
|
:param numero: Número da nota |
||||
|
:return: |
||||
|
""" |
||||
|
|
||||
|
# url do serviço |
||||
|
url = self._get_url(modelo=modelo, consulta='RECIBO') |
||||
|
|
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('consReciNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(raiz, 'nRec').text = numero |
||||
|
|
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def consulta_nota(self, modelo, chave): |
||||
|
""" |
||||
|
Este método oferece a consulta da situação da NF-e/NFC-e na Base de Dados do Portal |
||||
|
da Secretaria de Fazenda Estadual. |
||||
|
:param modelo: Modelo da nota |
||||
|
:param chave: Chave da nota |
||||
|
:return: |
||||
|
""" |
||||
|
# url do serviço |
||||
|
url = self._get_url(modelo=modelo, consulta='CHAVE') |
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('consSitNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR' |
||||
|
etree.SubElement(raiz, 'chNFe').text = chave |
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def consulta_distribuicao(self, cnpj=None, cpf=None, chave=None, nsu=0): |
||||
|
""" |
||||
|
O XML do pedido de distribuição suporta três tipos de consultas que são definidas de acordo com a tag |
||||
|
informada no XML. As tags são distNSU, consNSU e consChNFe. |
||||
|
a) distNSU – Distribuição de Conjunto de DF-e a Partir do NSU Informado |
||||
|
b) consNSU – Consulta DF-e Vinculado ao NSU Informado |
||||
|
c) consChNFe – Consulta de NF-e por Chave de Acesso Informada |
||||
|
:param cnpj: CNPJ do interessado |
||||
|
:param cpf: CPF do interessado |
||||
|
:param chave: Chave da NF-e a ser consultada |
||||
|
:param nsu: Ultimo nsu ou nsu específico para ser consultado. |
||||
|
:return: |
||||
|
""" |
||||
|
# url |
||||
|
url = self._get_url_an(consulta='DISTRIBUICAO') |
||||
|
# Monta XML para envio da requisição |
||||
|
raiz = etree.Element('distDFeInt', versao='1.01', xmlns=NAMESPACE_NFE) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
if self.uf: |
||||
|
etree.SubElement(raiz, 'cUFAutor').text = CODIGOS_ESTADOS[self.uf.upper()] |
||||
|
if cnpj: |
||||
|
etree.SubElement(raiz, 'CNPJ').text = cnpj |
||||
|
else: |
||||
|
etree.SubElement(raiz, 'CPF').text = cpf |
||||
|
if not chave: |
||||
|
distNSU = etree.SubElement(raiz, 'distNSU') |
||||
|
etree.SubElement(distNSU, 'ultNSU').text = str(nsu).zfill(15) |
||||
|
if chave: |
||||
|
consChNFe = etree.SubElement(raiz, 'consChNFe') |
||||
|
etree.SubElement(consChNFe, 'chNFe').text = chave |
||||
|
#Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('NFeDistribuicaoDFe', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def consulta_cadastro(self, modelo, cnpj): |
||||
|
""" |
||||
|
Consulta de cadastro |
||||
|
:param modelo: Modelo da nota |
||||
|
:param cnpj: CNPJ da empresa |
||||
|
:return: |
||||
|
""" |
||||
|
# UF que utilizam a SVRS - Sefaz Virtual do RS: Para serviço de Consulta Cadastro: AC, RN, PB, SC |
||||
|
lista_svrs = ['AC', 'RN', 'PB', 'SC', 'PA'] |
||||
|
|
||||
|
# RS implementa um método diferente na consulta de cadastro |
||||
|
# usa o mesmo url para produção e homologação |
||||
|
# não tem url para NFCE |
||||
|
if self.uf.upper() == 'RS': |
||||
|
url = NFE['RS']['CADASTRO'] |
||||
|
elif self.uf.upper() in lista_svrs: |
||||
|
url = NFE['SVRS']['CADASTRO'] |
||||
|
elif self.uf.upper() == 'SVC-RS': |
||||
|
url = NFE['SVC-RS']['CADASTRO'] |
||||
|
else: |
||||
|
url = self._get_url(modelo=modelo, consulta='CADASTRO') |
||||
|
|
||||
|
raiz = etree.Element('ConsCad', versao='2.00', xmlns=NAMESPACE_NFE) |
||||
|
info = etree.SubElement(raiz, 'infCons') |
||||
|
etree.SubElement(info, 'xServ').text = 'CONS-CAD' |
||||
|
etree.SubElement(info, 'UF').text = self.uf.upper() |
||||
|
etree.SubElement(info, 'CNPJ').text = cnpj |
||||
|
# etree.SubElement(info, 'CPF').text = cpf |
||||
|
|
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('CadConsultaCadastro4', raiz) |
||||
|
# Chama método que efetua a requisição POST no servidor SOAP |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def evento(self, modelo, evento, id_lote=1): |
||||
|
""" |
||||
|
Envia um evento de nota fiscal (cancelamento e carta de correção) |
||||
|
:param modelo: Modelo da nota |
||||
|
:param evento: Eventro |
||||
|
:param id_lote: Id do lote |
||||
|
:return: |
||||
|
""" |
||||
|
|
||||
|
# url do serviço |
||||
|
try: |
||||
|
# manifestacao url é do AN |
||||
|
if evento[0][5].text.startswith('2'): |
||||
|
url = self._get_url_an(consulta='EVENTOS') |
||||
|
else: |
||||
|
url = self._get_url(modelo=modelo, consulta='EVENTOS') |
||||
|
except Exception: |
||||
|
url = self._get_url(modelo=modelo, consulta='EVENTOS') |
||||
|
|
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE) |
||||
|
etree.SubElement(raiz, 'idLote').text = str(id_lote) # numero autoincremental gerado pelo sistema |
||||
|
raiz.append(evento) |
||||
|
xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def status_servico(self, modelo): |
||||
|
""" |
||||
|
Verifica status do servidor da receita. |
||||
|
:param modelo: modelo é a string com tipo de serviço que deseja consultar, Ex: nfe ou nfce |
||||
|
:return: |
||||
|
""" |
||||
|
url = self._get_url(modelo, 'STATUS') |
||||
|
# Monta XML do corpo da requisição |
||||
|
raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) |
||||
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] |
||||
|
etree.SubElement(raiz, 'xServ').text = 'STATUS' |
||||
|
xml = self._construir_xml_soap('NFeStatusServico4', raiz) |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def inutilizacao(self, modelo, cnpj, numero_inicial, numero_final, justificativa='', ano=None, serie='1'): |
||||
|
""" |
||||
|
Serviço destinado ao atendimento de solicitações de inutilização de numeração. |
||||
|
:param modelo: Modelo da nota |
||||
|
:param cnpj: CNPJda empresa |
||||
|
:param numero_inicial: Número inicial |
||||
|
:param numero_final: Número final |
||||
|
:param justificativa: Justificativa |
||||
|
:param ano: Ano |
||||
|
:param serie: Série |
||||
|
:return: |
||||
|
""" |
||||
|
|
||||
|
# url do servico |
||||
|
url = self._get_url(modelo=modelo, consulta='INUTILIZACAO') |
||||
|
|
||||
|
# Valores default |
||||
|
ano = str(ano or datetime.date.today().year)[-2:] |
||||
|
uf = CODIGOS_ESTADOS[self.uf.upper()] |
||||
|
cnpj = so_numeros(cnpj) |
||||
|
|
||||
|
# Identificador da TAG a ser assinada formada com Código da UF + Ano (2 posições) + |
||||
|
# CNPJ + modelo + série + nro inicial e nro final precedida do literal “ID” |
||||
|
id_unico = 'ID%(uf)s%(ano)s%(cnpj)s%(modelo)s%(serie)s%(num_ini)s%(num_fin)s' % { |
||||
|
'uf': uf, |
||||
|
'ano': ano, |
||||
|
'cnpj': cnpj, |
||||
|
'modelo': '55' if modelo == 'nfe' else '65', # 55=NF-e; 65=NFC-e; |
||||
|
'serie': serie.zfill(3), |
||||
|
'num_ini': str(numero_inicial).zfill(9), |
||||
|
'num_fin': str(numero_final).zfill(9), |
||||
|
} |
||||
|
|
||||
|
# Monta XML do corpo da requisição # FIXME |
||||
|
raiz = etree.Element('inutNFe', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) |
||||
|
inf_inut = etree.SubElement(raiz, 'infInut', Id=id_unico) |
||||
|
etree.SubElement(inf_inut, 'tpAmb').text = str(self._ambiente) |
||||
|
etree.SubElement(inf_inut, 'xServ').text = 'INUTILIZAR' |
||||
|
etree.SubElement(inf_inut, 'cUF').text = uf |
||||
|
etree.SubElement(inf_inut, 'ano').text = ano |
||||
|
etree.SubElement(inf_inut, 'CNPJ').text = cnpj |
||||
|
etree.SubElement(inf_inut, 'mod').text = '55' if modelo == 'nfe' else '65' # 55=NF-e; 65=NFC-e |
||||
|
etree.SubElement(inf_inut, 'serie').text = serie |
||||
|
etree.SubElement(inf_inut, 'nNFIni').text = str(numero_inicial) |
||||
|
etree.SubElement(inf_inut, 'nNFFin').text = str(numero_final) |
||||
|
etree.SubElement(inf_inut, 'xJust').text = justificativa |
||||
|
|
||||
|
# assinatura |
||||
|
a1 = AssinaturaA1(self.certificado, self.certificado_senha) |
||||
|
xml = a1.assinar(raiz) |
||||
|
|
||||
|
# Monta XML para envio da requisição |
||||
|
xml = self._construir_xml_soap('NFeInutilizacao4', xml) |
||||
|
# Faz request no Servidor da Sefaz e retorna resposta |
||||
|
return self._post(url, xml) |
||||
|
|
||||
|
def _get_url_an(self, consulta): |
||||
|
# producao |
||||
|
if self._ambiente == 1: |
||||
|
if consulta == 'DISTRIBUICAO': |
||||
|
ambiente = 'https://www1.' |
||||
|
else: |
||||
|
ambiente = 'https://www.' |
||||
|
# homologacao |
||||
|
else: |
||||
|
ambiente = 'https://hom.' |
||||
|
|
||||
|
self.url = ambiente + NFE['AN'][consulta] |
||||
|
return self.url |
||||
|
|
||||
|
def _get_url(self, modelo, consulta): |
||||
|
""" Retorna a url para comunicação com o webservice """ |
||||
|
# estado que implementam webservices proprios |
||||
|
lista = ['PR', 'MS', 'SP', 'AM', 'CE', 'BA', 'GO', 'MG', 'MT', 'PE', 'RS'] |
||||
|
if self.uf.upper() in lista: |
||||
|
if self._ambiente == 1: |
||||
|
ambiente = 'HTTPS' |
||||
|
else: |
||||
|
ambiente = 'HOMOLOGACAO' |
||||
|
if modelo == 'nfe': |
||||
|
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 |
||||
|
self.url = NFE[self.uf.upper()][ambiente] + NFE[self.uf.upper()][consulta] |
||||
|
elif modelo == 'nfce': |
||||
|
# PE e BA são as únicas UF'sque possuem NFE proprio e SVRS para NFCe |
||||
|
if self.uf.upper() == 'PE' or self.uf.upper() == 'BA': |
||||
|
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] |
||||
|
else: |
||||
|
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 |
||||
|
self.url = NFCE[self.uf.upper()][ambiente] + NFCE[self.uf.upper()][consulta] |
||||
|
else: |
||||
|
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') |
||||
|
# Estados que utilizam outros ambientes |
||||
|
else: |
||||
|
lista_svrs = ['AC', 'AL', 'AP', 'DF', 'ES', 'PB', 'PI', 'RJ', 'RN', 'RO', 'RR', 'SC', 'SE', 'TO', 'PA'] |
||||
|
if self.uf.upper() in lista_svrs: |
||||
|
if self._ambiente == 1: |
||||
|
ambiente = 'HTTPS' |
||||
|
else: |
||||
|
ambiente = 'HOMOLOGACAO' |
||||
|
if modelo == 'nfe': |
||||
|
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 |
||||
|
self.url = NFE['SVRS'][ambiente] + NFE['SVRS'][consulta] |
||||
|
elif modelo == 'nfce': |
||||
|
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 |
||||
|
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] |
||||
|
else: |
||||
|
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') |
||||
|
# unico UF que utiliza SVAN ainda para NF-e |
||||
|
# SVRS para NFC-e |
||||
|
elif self.uf.upper() == 'MA': |
||||
|
if self._ambiente == 1: |
||||
|
ambiente = 'HTTPS' |
||||
|
else: |
||||
|
ambiente = 'HOMOLOGACAO' |
||||
|
if modelo == 'nfe': |
||||
|
# nfe Ex: https://nfe.fazenda.pr.gov.br/nfe/NFeStatusServico3 |
||||
|
self.url = NFE['SVAN'][ambiente] + NFE['SVAN'][consulta] |
||||
|
elif modelo == 'nfce': |
||||
|
# nfce Ex: https://homologacao.nfce.fazenda.pr.gov.br/nfce/NFeStatusServico3 |
||||
|
self.url = NFCE['SVRS'][ambiente] + NFCE['SVRS'][consulta] |
||||
|
else: |
||||
|
raise Exception('Modelo não encontrado! Defina modelo="nfe" ou "nfce"') |
||||
|
else: |
||||
|
raise Exception(f"Url não encontrada para {modelo} e {consulta} {self.uf.upper()}") |
||||
|
return self.url |
||||
|
|
||||
|
def _construir_xml_soap(self, metodo, dados, cabecalho=False): |
||||
|
"""Mota o XML para o envio via SOAP""" |
||||
|
|
||||
|
raiz = etree.Element( |
||||
|
'{%s}Envelope' % NAMESPACE_SOAP, |
||||
|
nsmap={ |
||||
|
'xsi': NAMESPACE_XSI, |
||||
|
'xsd': NAMESPACE_XSD, |
||||
|
'soap': NAMESPACE_SOAP |
||||
|
} |
||||
|
) |
||||
|
body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP) |
||||
|
# distribuição tem um corpo de xml diferente |
||||
|
if metodo == 'NFeDistribuicaoDFe': |
||||
|
x = etree.SubElement(body, 'nfeDistDFeInteresse', xmlns=NAMESPACE_METODO+metodo) |
||||
|
a = etree.SubElement(x, 'nfeDadosMsg') |
||||
|
else: |
||||
|
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo) |
||||
|
a.append(dados) |
||||
|
return raiz |
||||
|
|
||||
|
def _post_header(self): |
||||
|
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" |
||||
|
# PE é a única UF que exige SOAPAction no header |
||||
|
response = { |
||||
|
'content-type': 'application/soap+xml; charset=utf-8;', |
||||
|
'Accept': 'application/soap+xml; charset=utf-8;', |
||||
|
} |
||||
|
if self.uf.upper() == 'PE': |
||||
|
response["SOAPAction"] = "" |
||||
|
return response |
||||
|
|
||||
|
def _post(self, url, xml): |
||||
|
certificado_a1 = CertificadoA1(self.certificado) |
||||
|
chave, cert = certificado_a1.separar_arquivo(self.certificado_senha, caminho=True) |
||||
|
chave_cert = (cert, chave) |
||||
|
# Abre a conexão HTTPS |
||||
|
try: |
||||
|
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>' |
||||
|
|
||||
|
# limpa xml com caracteres bugados para infNFeSupl em NFC-e |
||||
|
xml = re.sub( |
||||
|
'<qrCode>(.*?)</qrCode>', |
||||
|
lambda x: x.group(0).replace('<', '<').replace('>', '>').replace('&', ''), |
||||
|
etree.tostring(xml, encoding='unicode').replace('\n', '') |
||||
|
) |
||||
|
xml = xml_declaration + xml |
||||
|
# Faz o request com o servidor |
||||
|
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False) |
||||
|
result.encoding = 'utf-8' |
||||
|
return result |
||||
|
except requests.exceptions.RequestException as e: |
||||
|
raise e |
||||
|
finally: |
||||
|
certificado_a1.excluir() |
||||
@ -0,0 +1,234 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
from pynfe.utils import etree |
||||
|
from pynfe.utils.flags import ( |
||||
|
NAMESPACE_XSI, |
||||
|
NAMESPACE_BETHA, |
||||
|
) |
||||
|
from pynfe.utils.webservices import NFSE |
||||
|
from pynfe.entidades.certificado import CertificadoA1 |
||||
|
from .comunicacao import Comunicacao |
||||
|
|
||||
|
|
||||
|
class ComunicacaoNfse(Comunicacao): |
||||
|
""" Classe de comunicação que segue o padrão definido para as SEFAZ dos Municípios. """ |
||||
|
|
||||
|
_versao = '' |
||||
|
_namespace = '' |
||||
|
|
||||
|
def __init__(self, certificado, certificado_senha, autorizador, homologacao=False): |
||||
|
self.certificado = certificado |
||||
|
self.certificado_senha = certificado_senha |
||||
|
self._ambiente = 2 if homologacao else 1 |
||||
|
self.autorizador = autorizador.upper() |
||||
|
if self.autorizador == 'GINFES': |
||||
|
self._namespace = 'http://www.ginfes.com.br/cabecalho_v03.xsd' |
||||
|
self._versao = '3' |
||||
|
elif self.autorizador == 'BETHA': |
||||
|
self._namespace = NAMESPACE_BETHA |
||||
|
self._versao = '2.02' |
||||
|
else: |
||||
|
raise Exception('Autorizador não encontrado!') |
||||
|
|
||||
|
def autorizacao(self, nota): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
if self.autorizador == 'BETHA': |
||||
|
# xml |
||||
|
xml = etree.tostring(nota, encoding='unicode', pretty_print=False) |
||||
|
# comunica via wsdl |
||||
|
return self._post(url, xml, 'gerar') |
||||
|
else: |
||||
|
raise Exception('Este método só esta implementado no autorizador betha.') |
||||
|
|
||||
|
def enviar_lote(self, xml): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
if self.autorizador == 'GINFES': |
||||
|
# xml |
||||
|
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml |
||||
|
# comunica via wsdl |
||||
|
return self._post_https(url, xml, 'enviar_lote') |
||||
|
else: |
||||
|
raise Exception('Este método só esta implementado no autorizador ginfes.') |
||||
|
|
||||
|
def consultar(self, xml): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
if self.autorizador == 'GINFES': |
||||
|
# xml |
||||
|
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml |
||||
|
# comunica via wsdl |
||||
|
return self._post_https(url, xml, 'consulta') |
||||
|
else: |
||||
|
raise Exception('Este método só esta implementado no autorizador ginfes.') |
||||
|
|
||||
|
def consultar_rps(self, xml): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
if self.autorizador == 'BETHA': |
||||
|
# comunica via wsdl |
||||
|
return self._post(url, xml, 'consultaRps') |
||||
|
elif self.autorizador == 'GINFES': |
||||
|
return self._post_https(url, xml, 'consultaRps') |
||||
|
# TODO outros autorizadres |
||||
|
else: |
||||
|
raise Exception('Autorizador não encontrado!') |
||||
|
|
||||
|
def consultar_faixa(self, xml): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
if self.autorizador == 'BETHA': |
||||
|
# comunica via wsdl |
||||
|
return self._post(url, xml, 'consultaFaixa') |
||||
|
else: |
||||
|
raise Exception('Este método só esta implementado no autorizador betha.') |
||||
|
|
||||
|
def consultar_lote(self, xml): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
if self.autorizador == 'GINFES': |
||||
|
# xml |
||||
|
xml = '<?xml version="1.0" encoding="UTF-8"?>' + xml |
||||
|
# comunica via wsdl |
||||
|
return self._post_https(url, xml, 'consulta_lote') |
||||
|
else: |
||||
|
raise Exception('Este método só esta implementado no autorizador ginfes.') |
||||
|
|
||||
|
def consultar_situacao_lote(self, xml): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
if self.autorizador == 'GINFES': |
||||
|
# comunica via wsdl |
||||
|
return self._post_https(url, xml, 'consulta_situacao_lote') |
||||
|
else: |
||||
|
raise Exception('Este método só esta implementado no autorizador ginfes.') |
||||
|
|
||||
|
def cancelar(self, xml): |
||||
|
# url do serviço |
||||
|
url = self._get_url() |
||||
|
# Betha |
||||
|
if self.autorizador == 'BETHA': |
||||
|
# comunica via wsdl |
||||
|
return self._post(url, xml, 'cancelar') |
||||
|
# Ginfes |
||||
|
elif self.autorizador == 'GINFES': |
||||
|
# comunica via wsdl com certificado |
||||
|
return self._post_https(url, xml, 'cancelar') |
||||
|
# TODO outros autorizadres |
||||
|
else: |
||||
|
raise Exception('Autorizador não encontrado!') |
||||
|
|
||||
|
def _cabecalho(self, retorna_string=True): |
||||
|
""" Monta o XML do cabeçalho da requisição wsdl |
||||
|
Namespaces padrão homologação (Ginfes) """ |
||||
|
|
||||
|
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>' |
||||
|
# cabecalho = '<ns2:cabecalho versao="3" xmlns:ns2="http://www.ginfes.com.br/cabecalho_v03.xsd" |
||||
|
# xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><versaoDados>3</versaoDados></ns2:cabecalho>' |
||||
|
# cabecalho |
||||
|
raiz = etree.Element('{%s}cabecalho'%self._namespace, nsmap={'ns2':self._namespace, 'xsi':NAMESPACE_XSI}, versao=self._versao) |
||||
|
etree.SubElement(raiz, 'versaoDados').text = self._versao |
||||
|
|
||||
|
if retorna_string: |
||||
|
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n','') |
||||
|
cabecalho = xml_declaration + cabecalho |
||||
|
return cabecalho |
||||
|
else: |
||||
|
return raiz |
||||
|
|
||||
|
def _cabecalho2(self, retorna_string=True): |
||||
|
""" Monta o XML do cabeçalho da requisição wsdl |
||||
|
Namespaces que funcionaram em produção (Ginfes)""" |
||||
|
|
||||
|
xml_declaration = '<?xml version="1.0" encoding="UTF-8"?>' |
||||
|
|
||||
|
# cabecalho |
||||
|
raiz = etree.Element('cabecalho', xmlns=self._namespace, versao=self._versao) |
||||
|
etree.SubElement(raiz, 'versaoDados').text = self._versao |
||||
|
|
||||
|
if retorna_string: |
||||
|
cabecalho = etree.tostring(raiz, encoding='unicode', pretty_print=False).replace('\n', '') |
||||
|
cabecalho = xml_declaration + cabecalho |
||||
|
return cabecalho |
||||
|
else: |
||||
|
return raiz |
||||
|
|
||||
|
def _cabecalho_ginfes(self): |
||||
|
""" Retorna o XML do cabeçalho gerado pelo xsd""" |
||||
|
from pynfe.processamento.autorizador_nfse import SerializacaoGinfes |
||||
|
return SerializacaoGinfes().cabecalho() |
||||
|
|
||||
|
def _get_url(self): |
||||
|
""" Retorna a url para comunicação com o webservice """ |
||||
|
if self._ambiente == 1: |
||||
|
ambiente = 'HTTPS' |
||||
|
else: |
||||
|
ambiente = 'HOMOLOGACAO' |
||||
|
if self.autorizador in NFSE: |
||||
|
self.url = NFSE[self.autorizador][ambiente] |
||||
|
else: |
||||
|
raise Exception('Autorizador nao encontrado!') |
||||
|
return self.url |
||||
|
|
||||
|
def _post(self, url, xml, metodo): |
||||
|
""" Comunicação wsdl (http) sem certificado digital """ |
||||
|
# cabecalho |
||||
|
cabecalho = self._cabecalho() |
||||
|
# comunicacao wsdl |
||||
|
try: |
||||
|
from suds.client import Client |
||||
|
cliente = Client(url) |
||||
|
# gerar nfse |
||||
|
if metodo == 'gerar': |
||||
|
return cliente.service.GerarNfse(cabecalho, xml) |
||||
|
elif metodo == 'consultaRps': |
||||
|
return cliente.service.ConsultarNfsePorRps(cabecalho, xml) |
||||
|
elif metodo == 'consultaFaixa': |
||||
|
return cliente.service.ConsultarNfseFaixa(cabecalho, xml) |
||||
|
elif metodo == 'cancelar': |
||||
|
return cliente.service.CancelarNfse(cabecalho, xml) |
||||
|
# TODO outros metodos |
||||
|
else: |
||||
|
raise Exception('Método não implementado no autorizador.') |
||||
|
except Exception as e: |
||||
|
raise e |
||||
|
|
||||
|
def _post_https(self, url, xml, metodo): |
||||
|
""" Comunicação wsdl (https) utilizando certificado do usuário """ |
||||
|
# cabecalho |
||||
|
cabecalho = self._cabecalho() |
||||
|
# comunicacao wsdl |
||||
|
try: |
||||
|
from suds.client import Client |
||||
|
from pynfe.utils.https_nfse import HttpAuthenticated |
||||
|
|
||||
|
certificadoA1 = CertificadoA1(self.certificado) |
||||
|
chave, cert = certificadoA1.separar_arquivo(self.certificado_senha, caminho=True) |
||||
|
|
||||
|
cliente = Client(url, transport = HttpAuthenticated(key=chave, cert=cert, endereco=url)) |
||||
|
|
||||
|
# gerar nfse |
||||
|
if metodo == 'gerar': |
||||
|
return cliente.service.GerarNfse(cabecalho, xml) |
||||
|
elif metodo == 'enviar_lote': |
||||
|
return cliente.service.RecepcionarLoteRpsV3(cabecalho, xml) |
||||
|
elif metodo == 'consulta': |
||||
|
return cliente.service.ConsultarNfseV3(cabecalho, xml) |
||||
|
elif metodo == 'consulta_lote': |
||||
|
return cliente.service.ConsultarLoteRpsV3(cabecalho, xml) |
||||
|
elif metodo == 'consulta_situacao_lote': |
||||
|
return cliente.service.ConsultarSituacaoLoteRpsV3(cabecalho, xml) |
||||
|
elif metodo == 'consultaRps': |
||||
|
return cliente.service.ConsultarNfsePorRpsV3(cabecalho, xml) |
||||
|
elif metodo == 'consultaFaixa': |
||||
|
return cliente.service.ConsultarNfseFaixa(cabecalho, xml) |
||||
|
elif metodo == 'cancelar': |
||||
|
# versão 2 |
||||
|
return cliente.service.CancelarNfse(xml) |
||||
|
# versão 3 |
||||
|
# return cliente.service.CancelarNfseV3(cabecalho, xml) |
||||
|
# TODO outros metodos |
||||
|
else: |
||||
|
raise Exception('Método não implementado no autorizador.') |
||||
|
except Exception as e: |
||||
|
raise e |
||||
@ -0,0 +1,28 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
import re |
||||
|
from pynfe.utils import etree |
||||
|
|
||||
|
|
||||
|
class RetornoSoap(object): |
||||
|
|
||||
|
def __init__(self, webservice, retorno, resposta): |
||||
|
self.webservice = webservice |
||||
|
self.resposta = resposta |
||||
|
self.retorno = retorno |
||||
|
|
||||
|
|
||||
|
def analisar_retorno(webservice, retorno, classe_resposta): |
||||
|
|
||||
|
# retorno.raise_for_status() |
||||
|
# print(retorno.text) |
||||
|
|
||||
|
match = re.search('<soap:Body>(.*?)</soap:Body>', retorno.text) |
||||
|
|
||||
|
if match: |
||||
|
resultado = etree.tostring(etree.fromstring(match.group(1))[0]) |
||||
|
# classe_resposta.Validate_simpletypes_ = False |
||||
|
# resposta = classe_resposta.parseString(resultado) |
||||
|
resposta = resultado |
||||
|
# resposta = retorno.text |
||||
|
|
||||
|
return RetornoSoap(webservice, retorno, resposta) |
||||
@ -1,3 +1,3 @@ |
|||||
# Opcional para NFS-e |
# Opcional para NFS-e |
||||
suds-jurko |
suds-jurko |
||||
pyxb=1.2.4 |
|
||||
|
pyxb==1.2.4 |
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue