Browse Source

Utilizando o suds para efetuar consultas via webservice

tags/0.1.5
Danimar Ribeiro 10 years ago
parent
commit
809fe39b84
  1. 100
      pytrustnfe/servicos/assinatura.py
  2. 28
      pytrustnfe/servicos/comunicacao.py
  3. 50
      pytrustnfe/servicos/nfe_autorizacao.py
  4. 6
      pytrustnfe/utils.py

100
pytrustnfe/servicos/assinatura.py

@ -5,14 +5,21 @@ Created on Jun 14, 2015
@author: danimar
'''
import xmlsec
import libxml2
import os.path
from signxml import xmldsig
from signxml import methods
from lxml import etree
from OpenSSL import crypto
NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#'
def extract_cert_and_key_from_pfx(pfx, password):
pfx = crypto.load_pkcs12(pfx, password)
# PEM formatted private key
key = crypto.dump_privatekey(crypto.FILETYPE_PEM,
pfx.get_privatekey())
# PEM formatted certificate
cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
pfx.get_certificate())
return cert, key
def recursively_empty(e):
@ -21,7 +28,7 @@ def recursively_empty(e):
return all((recursively_empty(c) for c in e.iterchildren()))
def assinar(xml, cert, key, reference, pfx, senha):
def assinar(xml, cert, key, reference):
context = etree.iterwalk(xml)
for action, elem in context:
parent = elem.getparent()
@ -43,87 +50,4 @@ def assinar(xml, cert, key, reference, pfx, senha):
c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315')
xmldsig(signed_root, digest_algorithm=u'sha1').verify(x509_cert=cert)
# signature = Assinatura(pfx, senha)
# xmlsec = signature.assina_xml(not_signed, reference)
# xmlsec = xmlsec.replace("""<!DOCTYPE NFe [
# <!ATTLIST infNFe Id ID #IMPLIED>
# ]>\n""", "")
return etree.tostring(signed_root)
# , xmlsec
class Assinatura(object):
def __init__(self, arquivo, senha):
self.arquivo = arquivo
self.senha = senha
def _checar_certificado(self):
if not os.path.isfile(self.arquivo):
raise Exception('Caminho do certificado não existe.')
def _inicializar_cripto(self):
libxml2.initParser()
libxml2.substituteEntitiesDefault(1)
xmlsec.init()
xmlsec.cryptoAppInit(None)
xmlsec.cryptoInit()
def _finalizar_cripto(self):
xmlsec.cryptoShutdown()
xmlsec.cryptoAppShutdown()
xmlsec.shutdown()
libxml2.cleanupParser()
def assina_xml(self, xml, reference):
self._checar_certificado()
self._inicializar_cripto()
try:
doc_xml = libxml2.parseMemory(xml.encode('utf-8'),
len(xml.encode('utf-8')))
import ipdb; ipdb.set_trace()
signNode = xmlsec.TmplSignature(doc_xml,
xmlsec.transformInclC14NId(),
xmlsec.transformRsaSha1Id(), None)
doc_xml.getRootElement().addChild(signNode)
refNode = signNode.addReference(
xmlsec.transformSha1Id(),
None, reference, None)
refNode.addTransform(xmlsec.transformEnvelopedId())
refNode.addTransform(xmlsec.transformInclC14NId())
keyInfoNode = signNode.ensureKeyInfo()
keyInfoNode.addX509Data()
dsig_ctx = xmlsec.DSigCtx()
chave = xmlsec.cryptoAppKeyLoad(
filename=str(self.arquivo),
format=xmlsec.KeyDataFormatPkcs12,
pwd=str(self.senha), pwdCallback=None, pwdCallbackCtx=None)
dsig_ctx.signKey = chave
dsig_ctx.sign(signNode)
status = dsig_ctx.status
dsig_ctx.destroy()
if status != xmlsec.DSigStatusSucceeded:
raise RuntimeError(
'Erro ao realizar a assinatura do arquivo; status: "' +
str(status) + '"')
xpath = doc_xml.xpathNewContext()
xpath.xpathRegisterNs('sig', NAMESPACE_SIG)
certs = xpath.xpathEval('//sig:X509Data/sig:X509Certificate')
for i in range(len(certs)-1):
certs[i].unlinkNode()
certs[i].freeNode()
xml = doc_xml.serialize()
return xml
finally:
doc_xml.freeDoc()
self._finalizar_cripto()

28
pytrustnfe/servicos/comunicacao.py

@ -5,6 +5,9 @@ Created on Jun 14, 2015
@author: danimar
'''
import suds.client
import suds_requests
import requests
from lxml import objectify
from uuid import uuid4
from pytrustnfe.HttpClient import HttpClient
@ -25,8 +28,29 @@ class Comunicacao(object):
tag_retorno = ''
def __init__(self, cert, key):
self.certificado = cert
self.senha = key
self.cert = cert
self.key = key
def _get_client(self, base_url):
cache_location = '/tmp/suds'
cache = suds.cache.DocumentCache(location=cache_location)
f = open('/tmp/suds/cert_nfe.cer', 'w')
f.write(self.cert)
f.close()
f = open('/tmp/suds/key_nfe.cer', 'w')
f.write(self.key)
f.close()
session = requests.Session()
session.verify = False
session.cert = ('/tmp/suds/cert_nfe.cer',
'/tmp/suds/key_nfe.cer')
return suds.client.Client(
base_url,
cache=cache,
transport=suds_requests.RequestsTransport(session)
)
def _soap_xml(self, body):
xml = '''<?xml version="1.0" encoding="utf-8"?>

50
pytrustnfe/servicos/nfe_autorizacao.py

@ -5,6 +5,7 @@ Created on 21/06/2015
@author: danimar
'''
from lxml import etree
from suds.sax.element import Element
from pytrustnfe.servicos.comunicacao import Comunicacao
from pytrustnfe import utils
from pytrustnfe.xml import render_xml
@ -13,43 +14,30 @@ from pytrustnfe.servicos.assinatura import assinar
class NfeAutorizacao(Comunicacao):
def __init__(self, cert, key, certificado, senha):
Comunicacao.__init__(self, certificado, senha)
self.cert = cert
self.key = key
def __init__(self, cert, key):
Comunicacao.__init__(self, cert, key)
def autorizar_nfe(self, nfe):
def autorizar_nfe(self, nfe, id):
self._validar_nfe(nfe)
xml = render_xml('nfeEnv.xml', **nfe)
self.metodo = 'NFeAutorizacao'
self.tag_retorno = 'retEnviNFe'
self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx'
self.url = 'nfe.sefazrs.rs.gov.br'
xml_signed = assinar(xml, self.cert, self.key, '#%s' % id)
return self._executar_consulta(xml)
client = self._get_client(
'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx?wsdl')
def autorizar_nfe_e_recibo(self, nfe, id):
self._validar_nfe(nfe)
xml = render_xml('nfeEnv.xml', **nfe)
return assinar(xml, self.cert, self.key,
'#%s' % id,
self.certificado, self.senha)
cabecalho = client.factory.create('nfeCabecMsg')
cabecalho.cUF = '43'
cabecalho.versaoDados = '3.10'
client.set_options(soapheaders=cabecalho)
self.metodo = 'NFeAutorizacao'
self.tag_retorno = 'retEnviNFe'
self.web_service = 'ws/NfeAutorizacao/NFeAutorizacao.asmx'
self.url = 'nfe.sefazrs.rs.gov.br'
xml_recibo, recibo = self._executar_consulta(xml)
consulta_recibo = utils.gerar_consulta_recibo(recibo)
self._validar_nfe(nfe)
resposta = client.service.nfeAutorizacaoLote(xml_signed)
print client.last_sent()
print client.last_received()
self.metodo = 'NFeRetAutorizacao'
self.tag_retorno = 'retConsReciNFe'
self.web_service = 'ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx'
self.url = 'nfe.sefazrs.rs.gov.br'
consulta_recibo = utils.gerar_consulta_recibo(resposta)
return self._executar_consulta(xml), consulta_recibo
client = self._get_client(
'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx'
)
return client.service.nfeRetAutorizacao(consulta_recibo)

6
pytrustnfe/utils.py

@ -19,11 +19,7 @@ def datetime_tostring(data):
def gerar_consulta_recibo(recibo):
c = DynamicXml('consReciNFe')
c(xmlns="http://www.portalfiscal.inf.br/nfe", versao="2.00")
c.tpAmb = recibo.tpAmb
c.nRec = recibo.infRec.nRec
return c
return {'tpAmb': recibo.tpAmb, 'nRec': recibo.infRec.nRec}
def gerar_chave(obj_chave, prefix=None):

Loading…
Cancel
Save