|
|
@ -41,7 +41,7 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
etree.SubElement(raiz, 'indSinc').text = str(indSinc) # 0 para assincrono, 1 para sincrono |
|
|
etree.SubElement(raiz, 'indSinc').text = str(indSinc) # 0 para assincrono, 1 para sincrono |
|
|
raiz.append(nota_fiscal) |
|
|
raiz.append(nota_fiscal) |
|
|
# Monta XML para envio da requisição |
|
|
# Monta XML para envio da requisição |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeAutorizacao'), metodo='NfeAutorizacao', dados=raiz) |
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NFeAutorizacao4', raiz) |
|
|
# Faz request no Servidor da Sefaz |
|
|
# Faz request no Servidor da Sefaz |
|
|
retorno = self._post(url, xml) |
|
|
retorno = self._post(url, xml) |
|
|
|
|
|
|
|
|
@ -96,8 +96,7 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
|
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
|
|
etree.SubElement(raiz, 'nRec').text = numero |
|
|
etree.SubElement(raiz, 'nRec').text = numero |
|
|
# Monta XML para envio da requisição |
|
|
# Monta XML para envio da requisição |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeRetAutorizacao'), metodo='NfeRetAutorizacao', dados=raiz) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NFeRetAutorizacao4', raiz) |
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
def consulta_nota(self, modelo, chave): |
|
|
def consulta_nota(self, modelo, chave): |
|
|
@ -112,8 +111,7 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR' |
|
|
etree.SubElement(raiz, 'xServ').text = 'CONSULTAR' |
|
|
etree.SubElement(raiz, 'chNFe').text = chave |
|
|
etree.SubElement(raiz, 'chNFe').text = chave |
|
|
# Monta XML para envio da requisição |
|
|
# Monta XML para envio da requisição |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeConsulta2'), metodo='NfeConsulta2', dados=raiz) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NFeConsultaProtocolo4', raiz) |
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
def consulta_notas_cnpj(self, cnpj, nsu=0): |
|
|
def consulta_notas_cnpj(self, cnpj, nsu=0): |
|
|
@ -140,7 +138,7 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
etree.SubElement(raiz, 'ultNSU').text = str(nsu) |
|
|
etree.SubElement(raiz, 'ultNSU').text = str(nsu) |
|
|
|
|
|
|
|
|
# Monta XML para envio da requisição |
|
|
# Monta XML para envio da requisição |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeConsultaDest'), metodo='NfeConsultaDest', dados=raiz) |
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NfeConsultaDest', raiz) |
|
|
|
|
|
|
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
@ -167,7 +165,7 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
etree.SubElement(info, 'CNPJ').text = cnpj |
|
|
etree.SubElement(info, 'CNPJ').text = cnpj |
|
|
#etree.SubElement(info, 'CPF').text = cpf |
|
|
#etree.SubElement(info, 'CPF').text = cpf |
|
|
# Monta XML para envio da requisição |
|
|
# Monta XML para envio da requisição |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='CadConsultaCadastro2'), metodo='CadConsultaCadastro2', dados=raiz) |
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('CadConsultaCadastro4', raiz) |
|
|
# Chama método que efetua a requisição POST no servidor SOAP |
|
|
# Chama método que efetua a requisição POST no servidor SOAP |
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
@ -186,24 +184,19 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE) |
|
|
raiz = etree.Element('envEvento', versao='1.00', xmlns=NAMESPACE_NFE) |
|
|
etree.SubElement(raiz, 'idLote').text = str(idlote) # numero autoincremental gerado pelo sistema |
|
|
etree.SubElement(raiz, 'idLote').text = str(idlote) # numero autoincremental gerado pelo sistema |
|
|
raiz.append(evento) |
|
|
raiz.append(evento) |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='RecepcaoEvento'), metodo='RecepcaoEvento', dados=raiz) |
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NFeRecepcaoEvento4', raiz) |
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
def status_servico(self, modelo): |
|
|
def status_servico(self, modelo): |
|
|
""" Verifica status do servidor da receita. """ |
|
|
""" Verifica status do servidor da receita. """ |
|
|
""" modelo é a string com tipo de serviço que deseja consultar |
|
|
|
|
|
Ex: nfe ou nfce |
|
|
|
|
|
""" |
|
|
|
|
|
url = self._get_url(modelo=modelo, consulta='STATUS') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" modelo é a string com tipo de serviço que deseja consultar Ex: nfe ou nfce """ |
|
|
|
|
|
url = self._get_url(modelo, 'STATUS') |
|
|
# Monta XML do corpo da requisição |
|
|
# Monta XML do corpo da requisição |
|
|
raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) |
|
|
raiz = etree.Element('consStatServ', versao=VERSAO_PADRAO, xmlns=NAMESPACE_NFE) |
|
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
|
|
etree.SubElement(raiz, 'tpAmb').text = str(self._ambiente) |
|
|
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] |
|
|
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] |
|
|
etree.SubElement(raiz, 'xServ').text = 'STATUS' |
|
|
etree.SubElement(raiz, 'xServ').text = 'STATUS' |
|
|
# Monta XML para envio da requisição |
|
|
|
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeStatusServico2'), metodo='NfeStatusServico2', dados=raiz) |
|
|
|
|
|
# Chama método que efetua a requisição POST no servidor SOAP |
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NFeStatusServico4', raiz) |
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
def download(self, cnpj, chave): |
|
|
def download(self, cnpj, chave): |
|
|
@ -222,7 +215,7 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
etree.SubElement(raiz, 'chNFe').text = str(chave) |
|
|
etree.SubElement(raiz, 'chNFe').text = str(chave) |
|
|
|
|
|
|
|
|
# Monta XML para envio da requisição |
|
|
# Monta XML para envio da requisição |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeDownloadNF'), metodo='NfeDownloadNF', dados=raiz) |
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NfeDownloadNF', raiz) |
|
|
|
|
|
|
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
@ -266,7 +259,7 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
xml = a1.assinar(raiz) |
|
|
xml = a1.assinar(raiz) |
|
|
|
|
|
|
|
|
# Monta XML para envio da requisição |
|
|
# Monta XML para envio da requisição |
|
|
xml = self._construir_xml_status_pr(cabecalho=self._cabecalho_soap(metodo='NfeInutilizacao2'), metodo='NfeInutilizacao2', dados=xml) |
|
|
|
|
|
|
|
|
xml = self._construir_xml_soap('NFeInutilizacao4', xml) |
|
|
# Faz request no Servidor da Sefaz e retorna resposta |
|
|
# Faz request no Servidor da Sefaz e retorna resposta |
|
|
return self._post(url, xml) |
|
|
return self._post(url, xml) |
|
|
|
|
|
|
|
|
@ -381,33 +374,19 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] |
|
|
etree.SubElement(raiz, 'cUF').text = CODIGOS_ESTADOS[self.uf.upper()] |
|
|
return raiz |
|
|
return raiz |
|
|
|
|
|
|
|
|
def _construir_xml_soap(self, cabecalho, metodo, dados): |
|
|
|
|
|
|
|
|
def _construir_xml_soap(self, metodo, dados): |
|
|
"""Mota o XML para o envio via SOAP""" |
|
|
"""Mota o XML para o envio via SOAP""" |
|
|
|
|
|
|
|
|
raiz = etree.Element('{%s}Envelope'%NAMESPACE_SOAP, nsmap={'soap12': NAMESPACE_SOAP}) |
|
|
|
|
|
c= etree.SubElement(raiz, '{%s}Header'%NAMESPACE_SOAP) |
|
|
|
|
|
c.append(cabecalho) |
|
|
|
|
|
|
|
|
raiz = etree.Element('{%s}Envelope'%NAMESPACE_SOAP, nsmap={'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP}) |
|
|
body = etree.SubElement(raiz, '{%s}Body'%NAMESPACE_SOAP) |
|
|
body = etree.SubElement(raiz, '{%s}Body'%NAMESPACE_SOAP) |
|
|
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo) |
|
|
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo) |
|
|
a.append(dados) |
|
|
a.append(dados) |
|
|
return raiz |
|
|
return raiz |
|
|
|
|
|
|
|
|
def _construir_xml_status_pr(self, cabecalho, metodo, dados): |
|
|
|
|
|
u"""Mota o XML para o envio via SOAP""" |
|
|
|
|
|
|
|
|
|
|
|
raiz = etree.Element('{%s}Envelope' % NAMESPACE_SOAP, nsmap={'xsi': NAMESPACE_XSI, 'xsd': NAMESPACE_XSD,'soap': NAMESPACE_SOAP}) |
|
|
|
|
|
c = etree.SubElement(raiz, '{%s}Header' % NAMESPACE_SOAP) |
|
|
|
|
|
c.append(cabecalho) |
|
|
|
|
|
body = etree.SubElement(raiz, '{%s}Body' % NAMESPACE_SOAP) |
|
|
|
|
|
a = etree.SubElement(body, 'nfeDadosMsg', xmlns=NAMESPACE_METODO+metodo) |
|
|
|
|
|
a.append(dados) |
|
|
|
|
|
return raiz |
|
|
|
|
|
|
|
|
|
|
|
def _post_header(self): |
|
|
def _post_header(self): |
|
|
u"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" |
|
|
|
|
|
|
|
|
"""Retorna um dicionário com os atributos para o cabeçalho da requisição HTTP""" |
|
|
return { |
|
|
return { |
|
|
u'content-type': u'application/soap+xml; charset=utf-8;', |
|
|
|
|
|
u'Accept': u'application/soap+xml; charset=utf-8;', |
|
|
|
|
|
|
|
|
'content-type': 'application/soap+xml; charset=utf-8;', |
|
|
|
|
|
'Accept': 'application/soap+xml; charset=utf-8;', |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
def _post(self, url, xml): |
|
|
def _post(self, url, xml): |
|
|
@ -422,19 +401,11 @@ class ComunicacaoSefaz(Comunicacao): |
|
|
lambda x:x.group(0).replace('<','<').replace('>','>').replace('amp;',''), |
|
|
lambda x:x.group(0).replace('<','<').replace('>','>').replace('amp;',''), |
|
|
etree.tostring(xml, encoding='unicode').replace('\n','')) |
|
|
etree.tostring(xml, encoding='unicode').replace('\n','')) |
|
|
xml = xml_declaration + xml |
|
|
xml = xml_declaration + xml |
|
|
|
|
|
|
|
|
# adapter para substituir ssl por tls |
|
|
|
|
|
# s = requests.Session() |
|
|
|
|
|
# s.mount(url, AdapterTLSV1()) |
|
|
|
|
|
# print(xml) |
|
|
|
|
|
# # print(self._post_header()) |
|
|
|
|
|
# # Faz o request com o servidor |
|
|
|
|
|
# result = s.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False, timeout=120) |
|
|
|
|
|
# result.encoding = 'utf-8' |
|
|
|
|
|
# return result |
|
|
|
|
|
|
|
|
# debug dev 4.00 |
|
|
|
|
|
print(xml) |
|
|
print(url) |
|
|
print(url) |
|
|
# Faz o request com o servidor |
|
|
# Faz o request com o servidor |
|
|
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False, timeout=120) |
|
|
|
|
|
|
|
|
result = requests.post(url, xml, headers=self._post_header(), cert=chave_cert, verify=False) |
|
|
result.encoding = 'utf-8' |
|
|
result.encoding = 'utf-8' |
|
|
return result |
|
|
return result |
|
|
except requests.exceptions.RequestException as e: |
|
|
except requests.exceptions.RequestException as e: |
|
|
@ -665,15 +636,3 @@ class ComunicacaoNfse(Comunicacao): |
|
|
raise Exception('Método não implementado no autorizador.') |
|
|
raise Exception('Método não implementado no autorizador.') |
|
|
except Exception as e: |
|
|
except Exception as e: |
|
|
raise e |
|
|
raise e |
|
|
|
|
|
|
|
|
""" Adapter para conexão tls """ |
|
|
|
|
|
from requests.adapters import HTTPAdapter |
|
|
|
|
|
from requests.packages.urllib3.poolmanager import PoolManager |
|
|
|
|
|
import ssl |
|
|
|
|
|
|
|
|
|
|
|
class AdapterTLSV1(HTTPAdapter): |
|
|
|
|
|
def init_poolmanager(self, connections, maxsize, block=False): |
|
|
|
|
|
self.poolmanager = PoolManager(num_pools=connections, |
|
|
|
|
|
maxsize=maxsize, |
|
|
|
|
|
block=block, |
|
|
|
|
|
ssl_version=ssl.PROTOCOL_TLSv1_2) |
|
|
|