Browse Source

Refatorando código

tags/0.1.5
Danimar Ribeiro 10 years ago
parent
commit
79a9339340
  1. 17
      pytrustnfe/Certificado.py
  2. 22
      pytrustnfe/HttpClient.py
  3. 18
      pytrustnfe/Servidores.py
  4. 66
      pytrustnfe/servicos/Assinatura.py
  5. 74
      pytrustnfe/servicos/Comunicacao.py
  6. 28
      pytrustnfe/servicos/nfe_autorizacao.py
  7. 2
      pytrustnfe/utils.py
  8. 29
      setup.py

17
pytrustnfe/Certificado.py

@ -1,4 +1,4 @@
#coding=utf-8
# coding=utf-8
''' '''
Created on Jun 16, 2015 Created on Jun 16, 2015
@ -14,11 +14,14 @@ def converte_pfx_pem(caminho, senha):
stream = open(caminho, 'rb').read() stream = open(caminho, 'rb').read()
try: try:
certificado = crypto.load_pkcs12(stream, senha) certificado = crypto.load_pkcs12(stream, senha)
privada = crypto.dump_privatekey(crypto.FILETYPE_PEM, certificado.get_privatekey())
certificado = crypto.dump_certificate(crypto.FILETYPE_PEM, certificado.get_certificate())
privada = crypto.dump_privatekey(crypto.FILETYPE_PEM,
certificado.get_privatekey())
certificado = crypto.dump_certificate(crypto.FILETYPE_PEM,
certificado.get_certificate())
except Exception as e: except Exception as e:
if len(e.message) == 1 and len(e.message[0])==3 and e.message[0][2] == 'mac verify failure':
raise Exception('Senha inválida')
if len(e.message) == 1 and len(e.message[0]) == 3 and \
e.message[0][2] == 'mac verify failure':
raise Exception('Senha inválida')
raise raise
return privada, certificado
return privada, certificado

22
pytrustnfe/HttpClient.py

@ -1,4 +1,4 @@
#coding=utf-8
# coding=utf-8
''' '''
Created on Jun 16, 2015 Created on Jun 16, 2015
@ -6,24 +6,25 @@ Created on Jun 16, 2015
''' '''
from httplib import HTTPSConnection from httplib import HTTPSConnection
class HttpClient(object): class HttpClient(object):
def __init__(self, url, chave_pem, certificado_pem): def __init__(self, url, chave_pem, certificado_pem):
self.url = url self.url = url
self.chave_pem = chave_pem self.chave_pem = chave_pem
self.certificado_pem = certificado_pem self.certificado_pem = certificado_pem
def _headers(self): def _headers(self):
return {
return {
u'Content-type': u'application/soap+xml; charset=utf-8', u'Content-type': u'application/soap+xml; charset=utf-8',
u'Accept': u'application/soap+xml; charset=utf-8' u'Accept': u'application/soap+xml; charset=utf-8'
} }
def post_xml(self, post, xml): def post_xml(self, post, xml):
conexao = HTTPSConnection(self.url, '443', key_file=self.chave_pem,
conexao = HTTPSConnection(self.url, '443', key_file=self.chave_pem,
cert_file=self.certificado_pem) cert_file=self.certificado_pem)
try: try:
conexao.request(u'POST', post, xml, self._headers()) conexao.request(u'POST', post, xml, self._headers())
response = conexao.getresponse() response = conexao.getresponse()
@ -31,9 +32,6 @@ class HttpClient(object):
return response.read() return response.read()
return response.read() return response.read()
except Exception as e: except Exception as e:
print str(e)
print(str(e))
finally: finally:
conexao.close() conexao.close()

18
pytrustnfe/Servidores.py

@ -9,33 +9,33 @@ def localizar_url(servico, estado):
METODO_WS = { METODO_WS = {
WS_NFE_AUTORIZACAO: {
WS_NFE_AUTORIZACAO:{
'webservice': 'NfeAutorizacao', 'webservice': 'NfeAutorizacao',
'metodo' : 'NfeAutorizacao',
'metodo': 'NfeAutorizacao',
}, },
WS_NFE_CONSULTA_AUTORIZACAO: { WS_NFE_CONSULTA_AUTORIZACAO: {
'webservice': 'NfeRetAutorizacao', 'webservice': 'NfeRetAutorizacao',
'metodo' : 'NfeRetAutorizacao',
'metodo': 'NfeRetAutorizacao',
}, },
WS_NFE_INUTILIZACAO: { WS_NFE_INUTILIZACAO: {
'webservice': 'NfeInutilizacao2', 'webservice': 'NfeInutilizacao2',
'metodo' : 'nfeInutilizacaoNF2',
'metodo': 'nfeInutilizacaoNF2',
}, },
WS_NFE_CONSULTA: { WS_NFE_CONSULTA: {
'webservice': 'NfeConsulta2', 'webservice': 'NfeConsulta2',
'metodo' : 'nfeConsultaNF2',
'metodo': 'nfeConsultaNF2',
}, },
WS_NFE_SITUACAO: { WS_NFE_SITUACAO: {
'webservice': 'NfeStatusServico2', 'webservice': 'NfeStatusServico2',
'metodo' : 'nfeStatusServicoNF2',
'metodo': 'nfeStatusServicoNF2',
}, },
WS_NFE_CONSULTA_CADASTRO: { WS_NFE_CONSULTA_CADASTRO: {
'webservice': 'CadConsultaCadastro2', 'webservice': 'CadConsultaCadastro2',
'metodo' : 'consultaCadastro2',
'metodo': 'consultaCadastro2',
}, },
WS_NFE_RECEPCAO_EVENTO: { WS_NFE_RECEPCAO_EVENTO: {
'webservice': 'RecepcaoEvento', 'webservice': 'RecepcaoEvento',
'metodo' : 'nfeRecepcaoEvento',
'metodo': 'nfeRecepcaoEvento',
}, },
WS_NFE_DOWNLOAD: { WS_NFE_DOWNLOAD: {
'webservice': 'NfeDownloadNF', 'webservice': 'NfeDownloadNF',
@ -529,4 +529,4 @@ ESTADO_WS_CONTINGENCIA = {
'SE': SVC_AN, 'SE': SVC_AN,
'SP': SVC_AN, 'SP': SVC_AN,
'TO': SVC_AN, 'TO': SVC_AN,
}
}

66
pytrustnfe/servicos/Assinatura.py

@ -14,65 +14,71 @@ class Assinatura(object):
def __init__(self, arquivo, senha): def __init__(self, arquivo, senha):
self.arquivo = arquivo self.arquivo = arquivo
self.senha = senha self.senha = senha
def _checar_certificado(self):
def _checar_certificado(self):
if not os.path.isfile(self.arquivo): if not os.path.isfile(self.arquivo):
raise Exception('Caminho do certificado não existe.') raise Exception('Caminho do certificado não existe.')
def _inicializar_cripto(self): def _inicializar_cripto(self):
libxml2.initParser() libxml2.initParser()
libxml2.substituteEntitiesDefault(1) libxml2.substituteEntitiesDefault(1)
xmlsec.init() xmlsec.init()
xmlsec.cryptoAppInit(None) xmlsec.cryptoAppInit(None)
xmlsec.cryptoInit() xmlsec.cryptoInit()
def _finalizar_cripto(self): def _finalizar_cripto(self):
xmlsec.cryptoShutdown() xmlsec.cryptoShutdown()
xmlsec.cryptoAppShutdown() xmlsec.cryptoAppShutdown()
xmlsec.shutdown() xmlsec.shutdown()
libxml2.cleanupParser() libxml2.cleanupParser()
def assina_xml(self, xml): def assina_xml(self, xml):
self._checar_certificado() self._checar_certificado()
self._inicializar_cripto() self._inicializar_cripto()
try: try:
doc_xml = libxml2.parseMemory(xml.encode('utf-8'), len(xml.encode('utf-8')))
signNode = xmlsec.TmplSignature(doc_xml, xmlsec.transformInclC14NId(),
xmlsec.transformRsaSha1Id(), None)
doc_xml.getRootElement().addChild(signNode)
refNode = signNode.addReference(xmlsec.transformSha1Id(),
None, '#NFe43150602261542000143550010000000761792265342', None)
doc_xml = libxml2.parseMemory(xml.encode('utf-8'),
len(xml.encode('utf-8')))
signNode = xmlsec.TmplSignature(doc_xml,
xmlsec.transformInclC14NId(),
xmlsec.transformRsaSha1Id(), None)
doc_xml.getRootElement().addChild(signNode)
refNode = signNode.addReference(
xmlsec.transformSha1Id(),
None, '#NFe43150602261542000143550010000000761792265342', None)
refNode.addTransform(xmlsec.transformEnvelopedId()) refNode.addTransform(xmlsec.transformEnvelopedId())
refNode.addTransform(xmlsec.transformInclC14NId()) refNode.addTransform(xmlsec.transformInclC14NId())
keyInfoNode = signNode.ensureKeyInfo() keyInfoNode = signNode.ensureKeyInfo()
keyInfoNode.addX509Data()
dsig_ctx = xmlsec.DSigCtx()
chave = xmlsec.cryptoAppKeyLoad(filename=str(self.arquivo), format=xmlsec.KeyDataFormatPkcs12,
pwd=str(self.senha), pwdCallback=None, pwdCallbackCtx=None)
keyInfoNode.addX509Data()
dsig_ctx = xmlsec.DSigCtx()
chave = xmlsec.cryptoAppKeyLoad(
filename=str(self.arquivo),
format=xmlsec.KeyDataFormatPkcs12,
pwd=str(self.senha), pwdCallback=None, pwdCallbackCtx=None)
dsig_ctx.signKey = chave dsig_ctx.signKey = chave
dsig_ctx.sign(signNode) dsig_ctx.sign(signNode)
status = dsig_ctx.status status = dsig_ctx.status
dsig_ctx.destroy() dsig_ctx.destroy()
if status != xmlsec.DSigStatusSucceeded: if status != xmlsec.DSigStatusSucceeded:
raise RuntimeError('Erro ao realizar a assinatura do arquivo; status: "' + str(status) + '"')
raise RuntimeError(
'Erro ao realizar a assinatura do arquivo; status: "' +
str(status) + '"')
xpath = doc_xml.xpathNewContext() xpath = doc_xml.xpathNewContext()
xpath.xpathRegisterNs('sig', NAMESPACE_SIG) xpath.xpathRegisterNs('sig', NAMESPACE_SIG)
certificados = xpath.xpathEval('//sig:X509Data/sig:X509Certificate')
for i in range(len(certificados)-1):
certificados[i].unlinkNode()
certificados[i].freeNode()
certs = xpath.xpathEval('//sig:X509Data/sig:X509Certificate')
for i in range(len(certs)-1):
certs[i].unlinkNode()
certs[i].freeNode()
xml = doc_xml.serialize() xml = doc_xml.serialize()
return xml return xml

74
pytrustnfe/servicos/Comunicacao.py

@ -1,4 +1,4 @@
#coding=utf-8
# coding=utf-8
''' '''
Created on Jun 14, 2015 Created on Jun 14, 2015
@ -7,16 +7,13 @@ Created on Jun 14, 2015
from lxml import objectify from lxml import objectify
from uuid import uuid4 from uuid import uuid4
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import tostring
from pytrustnfe.xml.DynamicXml import DynamicXml
from pytrustnfe.HttpClient import HttpClient from pytrustnfe.HttpClient import HttpClient
from pytrustnfe.Certificado import converte_pfx_pem from pytrustnfe.Certificado import converte_pfx_pem
from xml.dom.minidom import parseString from xml.dom.minidom import parseString
from pytrustnfe.Strings import CONSULTA_CADASTRO_COMPLETA
common_namespaces = { 'soap': 'http://www.w3.org/2003/05/soap-envelope' }
common_namespaces = {'soap': 'http://www.w3.org/2003/05/soap-envelope'}
soap_body_path = './soap:Envelope/soap:Body' soap_body_path = './soap:Envelope/soap:Body'
soap_fault_path = './soap:Envelope/soap:Body/soap:Fault' soap_fault_path = './soap:Envelope/soap:Body/soap:Fault'
@ -24,43 +21,43 @@ soap_fault_path = './soap:Envelope/soap:Body/soap:Fault'
class Comunicacao(object): class Comunicacao(object):
url = '' url = ''
web_service = ''
web_service = ''
metodo = '' metodo = ''
tag_retorno = ''
tag_retorno = ''
def __init__(self, certificado, senha): def __init__(self, certificado, senha):
self.certificado = certificado self.certificado = certificado
self.senha = senha
self.senha = senha
def _soap_xml(self, body): def _soap_xml(self, body):
return '<?xml version="1.0" encoding="utf-8"?>'\
'<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">'\
'<soap:Header>'\
'<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/' + self.metodo + '">'\
'<cUF>42</cUF><versaoDados>2.00</versaoDados>'\
'</nfeCabecMsg>'\
'</soap:Header>'\
'<soap:Body>'\
'<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/' + self.metodo + '">'\
+ body + '</nfeDadosMsg>'\
'</soap:Body>'\
'</soap:Envelope>'
xml = '''<?xml version="1.0" encoding="utf-8"?>
<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
<soap:Header>
<nfeCabecMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/'''
xml += self.metodo
xml += '''"><cUF>42</cUF><versaoDados>2.00</versaoDados>
</nfeCabecMsg>
</soap:Header>
<soap:Body>
<nfeDadosMsg xmlns="http://www.portalfiscal.inf.br/nfe/wsdl/'''
xml += self.metodo + '">' + body
xml += '</nfeDadosMsg></soap:Body></soap:Envelope>'
def _preparar_temp_pem(self): def _preparar_temp_pem(self):
chave_temp = '/tmp/' + uuid4().hex chave_temp = '/tmp/' + uuid4().hex
certificado_temp = '/tmp/' + uuid4().hex certificado_temp = '/tmp/' + uuid4().hex
chave, certificado = converte_pfx_pem(self.certificado, self.senha) chave, certificado = converte_pfx_pem(self.certificado, self.senha)
arq_temp = open(chave_temp, 'w') arq_temp = open(chave_temp, 'w')
arq_temp.write(chave) arq_temp.write(chave)
arq_temp.close() arq_temp.close()
arq_temp = open(certificado_temp, 'w') arq_temp = open(certificado_temp, 'w')
arq_temp.write(certificado) arq_temp.write(certificado)
arq_temp.close() arq_temp.close()
return chave_temp, certificado_temp return chave_temp, certificado_temp
def _validar_dados(self): def _validar_dados(self):
assert self.url != '', "Url servidor não configurada" assert self.url != '', "Url servidor não configurada"
assert self.web_service != '', "Web service não especificado" assert self.web_service != '', "Web service não especificado"
@ -68,7 +65,7 @@ class Comunicacao(object):
assert self.senha != '', "Senha não configurada" assert self.senha != '', "Senha não configurada"
assert self.metodo != '', "Método não configurado" assert self.metodo != '', "Método não configurado"
assert self.tag_retorno != '', "Tag de retorno não configurado" assert self.tag_retorno != '', "Tag de retorno não configurado"
def _validar_xml(self, obj): def _validar_xml(self, obj):
xml = None xml = None
if isinstance(obj, DynamicXml): if isinstance(obj, DynamicXml):
@ -77,28 +74,23 @@ class Comunicacao(object):
xml = obj xml = obj
assert xml is not None, "Objeto deve ser do tipo DynamicXml ou string" assert xml is not None, "Objeto deve ser do tipo DynamicXml ou string"
return xml return xml
def _executar_consulta(self, xmlEnviar): def _executar_consulta(self, xmlEnviar):
self._validar_dados() self._validar_dados()
chave, certificado = self._preparar_temp_pem() chave, certificado = self._preparar_temp_pem()
client = HttpClient(self.url, chave, certificado) client = HttpClient(self.url, chave, certificado)
soap_xml = self._soap_xml(xmlEnviar) soap_xml = self._soap_xml(xmlEnviar)
xml_retorno = client.post_xml(self.web_service, soap_xml) xml_retorno = client.post_xml(self.web_service, soap_xml)
dom = parseString(xml_retorno) dom = parseString(xml_retorno)
nodes = dom.getElementsByTagNameNS(common_namespaces['soap'],'Fault')
if len(nodes) > 0:
nodes = dom.getElementsByTagNameNS(common_namespaces['soap'], 'Fault')
if len(nodes) > 0:
return nodes[0].toxml(), None return nodes[0].toxml(), None
nodes = dom.getElementsByTagName(self.tag_retorno)
nodes = dom.getElementsByTagName(self.tag_retorno)
if len(nodes) > 0: if len(nodes) > 0:
obj = objectify.fromstring(nodes[0].toxml()) obj = objectify.fromstring(nodes[0].toxml())
return nodes[0].toxml(), obj return nodes[0].toxml(), obj
return xml_retorno, objectify.fromstring(xml_retorno) return xml_retorno, objectify.fromstring(xml_retorno)

28
pytrustnfe/servicos/NFeAutorizacao.py → pytrustnfe/servicos/nfe_autorizacao.py

@ -1,48 +1,44 @@
#coding=utf-8
# coding=utf-8
''' '''
Created on 21/06/2015 Created on 21/06/2015
@author: danimar @author: danimar
''' '''
from pytrustnfe.servicos.Comunicacao import Comunicacao from pytrustnfe.servicos.Comunicacao import Comunicacao
from pytrustnfe.xml import DynamicXml
from pytrustnfe import utils from pytrustnfe import utils
class NfeAutorizacao(Comunicacao): class NfeAutorizacao(Comunicacao):
def __init__(self, certificado, senha): def __init__(self, certificado, senha):
Comunicacao.__init__(self, certificado, senha) Comunicacao.__init__(self, certificado, senha)
def autorizar_nfe(self, nfe): def autorizar_nfe(self, nfe):
xml = self._validar_xml(nfe) xml = self._validar_xml(nfe)
self.metodo = 'NFeAutorizacao' self.metodo = 'NFeAutorizacao'
self.tag_retorno = 'retEnviNFe' self.tag_retorno = 'retEnviNFe'
self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx'
self.url = 'nfe.sefazrs.rs.gov.br' self.url = 'nfe.sefazrs.rs.gov.br'
return self._executar_consulta(xml) return self._executar_consulta(xml)
def autorizar_nfe_e_recibo(self, nfe): def autorizar_nfe_e_recibo(self, nfe):
xml = self._validar_xml(nfe) xml = self._validar_xml(nfe)
self.metodo = 'NFeAutorizacao' self.metodo = 'NFeAutorizacao'
self.tag_retorno = 'retEnviNFe' self.tag_retorno = 'retEnviNFe'
self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx' self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx'
self.url = 'nfe.sefazrs.rs.gov.br' self.url = 'nfe.sefazrs.rs.gov.br'
xml_recibo, recibo = self._executar_consulta(xml) xml_recibo, recibo = self._executar_consulta(xml)
consulta_recibo = utils.gerar_consulta_recibo(recibo) consulta_recibo = utils.gerar_consulta_recibo(recibo)
xml = self._validar_xml(nfe) xml = self._validar_xml(nfe)
self.metodo = 'NFeRetAutorizacao' self.metodo = 'NFeRetAutorizacao'
self.tag_retorno = 'retConsReciNFe' self.tag_retorno = 'retConsReciNFe'
self.web_service = 'ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx' self.web_service = 'ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx'
self.url = 'nfe.sefazrs.rs.gov.br' self.url = 'nfe.sefazrs.rs.gov.br'
return self._executar_consulta(xml)
return self._executar_consulta(xml), consulta_recibo

2
pytrustnfe/utils.py

@ -19,7 +19,7 @@ def datetime_tostring(data):
return data.strftime("%d-%m-%y %H:%M:%S") return data.strftime("%d-%m-%y %H:%M:%S")
def gerar_consulta_recibo(recibo):
def gerar_consulta_recibo(recibo):
c = DynamicXml('consReciNFe') c = DynamicXml('consReciNFe')
c(xmlns="http://www.portalfiscal.inf.br/nfe", versao="2.00") c(xmlns="http://www.portalfiscal.inf.br/nfe", versao="2.00")
c.tpAmb = recibo.tpAmb c.tpAmb = recibo.tpAmb

29
setup.py

@ -1,28 +1,31 @@
#coding=utf-8
# coding=utf-8
from setuptools import setup, find_packages from setuptools import setup, find_packages
setup( setup(
name = "PyNfeTrust",
version = "0.1",
author = "Danimar Ribeiro",
author_email = 'danimaribeiro@gmail.com',
keywords = ['nfe', 'mdf-e'],
name="PyNfeTrust",
version="0.1",
author="Danimar Ribeiro",
author_email='danimaribeiro@gmail.com',
keywords=['nfe', 'mdf-e'],
classifiers=[ classifiers=[
'Development Status :: 1 - alpha', 'Development Status :: 1 - alpha',
'Environment :: Plugins', 'Environment :: Plugins',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)',
'License :: OSI Approved :: GNU Lesser General Public License v2 or \
later (LGPLv2+)',
'Operating System :: OS Independent', 'Operating System :: OS Independent',
'Programming Language :: Python', 'Programming Language :: Python',
'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: Software Development :: Libraries :: Python Modules',
], ],
packages = find_packages(exclude=['*test*']),
url = 'https://github.com/danimaribeiro/PyNfeTrust',
license = 'LGPL-v2.1+',
description = 'PyNfeTrust é uma biblioteca para envio de NF-e',
long_description = 'PyNfeTrust',
packages=find_packages(exclude=['*test*']),
url='https://github.com/danimaribeiro/PyNfeTrust',
license='LGPL-v2.1+',
description='PyNfeTrust é uma biblioteca para envio de NF-e',
long_description='PyNfeTrust',
install_requires=[ install_requires=[
'PyXMLSec >= 0.3.0'
'PyXMLSec >= 0.3.0',
'Jinja2 >= 2.8',
'signxml >= 1.0.0',
], ],
test_suite='nose.collector', test_suite='nose.collector',
tests_require=[ tests_require=[

Loading…
Cancel
Save