Browse Source
Removendo arquivos extras
Removendo arquivos extras
Assinatura através do signxml sendo chamada corretamentetags/0.1.5
23 changed files with 349 additions and 401 deletions
-
31pytrustnfe/HttpClient.py
-
35pytrustnfe/Servidores.py
-
29pytrustnfe/certificado.py
-
23pytrustnfe/client.py
-
51pytrustnfe/nfe/__init__.py
-
130pytrustnfe/nfe/assinatura.py
-
16pytrustnfe/nfe/comunicacao.py
-
21pytrustnfe/servicos/NFeDistribuicaoDFe.py
-
21pytrustnfe/servicos/NFeRetAutorizacao.py
-
25pytrustnfe/servicos/NfeConsultaCadastro.py
-
21pytrustnfe/servicos/NfeConsultaProtocolo.py
-
21pytrustnfe/servicos/NfeInutilizacao.py
-
19pytrustnfe/servicos/NfeStatusServico.py
-
20pytrustnfe/servicos/RecepcaoEvento.py
-
15pytrustnfe/servicos/Validacao.py
-
48pytrustnfe/servicos/assinatura.py
-
41pytrustnfe/servicos/nfe_autorizacao.py
-
33pytrustnfe/utils.py
-
4pytrustnfe/xml/__init__.py
-
0pytrustnfe/xml/consultar_cadastro.xml
-
18pytrustnfe/xml/nfeEnv.xml
@ -1,31 +0,0 @@ |
|||
# coding=utf-8 |
|||
''' |
|||
Created on Jun 16, 2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
import requests |
|||
|
|||
|
|||
class HttpClient(object): |
|||
|
|||
def __init__(self, url, cert_path, key_path): |
|||
self.url = url |
|||
self.cert_path = cert_path |
|||
self.key_path = key_path |
|||
|
|||
def _headers(self): |
|||
return { |
|||
u'Content-type': u'application/soap+xml; charset=utf-8; action="http://www.portalfiscal.inf.br/nfe/wsdl/NfeAutorizacao/nfeAutorizacaoLote', |
|||
u'Accept': u'application/soap+xml; charset=utf-8' |
|||
} |
|||
|
|||
def post_xml(self, post, xml): |
|||
try: |
|||
url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx' |
|||
res = requests.post(url, data=xml, cert=(self.cert_path, self.key_path), |
|||
verify=False, headers=self._headers()) |
|||
return res.text |
|||
except Exception as e: |
|||
print(str(e)) |
|||
|
|||
@ -0,0 +1,51 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# © 2016 Danimar Ribeiro, Trustcode |
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|||
|
|||
|
|||
import os |
|||
from lxml import etree |
|||
from .comunicacao import Comunicacao |
|||
from .assinatura import assinar, Assinatura |
|||
from pytrustnfe import utils |
|||
from pytrustnfe.xml import render_xml |
|||
|
|||
|
|||
class NFe(Comunicacao): |
|||
|
|||
def __init__(self, cert, key): |
|||
Comunicacao.__init__(self, cert, key) |
|||
|
|||
def consultar_cadastro(self, cadastro, estado): |
|||
self.url = 'https://cad.sefazrs.rs.gov.br/ws/cadconsultacadastro/cadconsultacadastro2.asmx' |
|||
self.metodo = 'NfeConsultaCadastro' |
|||
|
|||
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml') |
|||
xml = render_xml(path, 'consultar_cadastro.xml', **cadastro) |
|||
|
|||
xml_response, obj = self._executar_consulta(xml) |
|||
|
|||
return { |
|||
'sent_xml': xml, |
|||
'received_xml': xml_response, |
|||
'object': obj.Body.nfeAutorizacaoLoteResult |
|||
} |
|||
|
|||
|
|||
def autorizar_nfe(self, nfe, nfe_id): |
|||
self.url = 'https://nfe-homologacao.sefazrs.rs.gov.br/ws/NfeAutorizacao/NFeAutorizacao.asmx' |
|||
self.metodo = 'NfeAutorizacao/nfeAutorizacaoLote' |
|||
|
|||
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml') |
|||
xml = render_xml(path, 'nfeEnv.xml', **nfe) |
|||
|
|||
xmlElem = etree.fromstring(xml) |
|||
xml_signed = assinar(xmlElem, self.cert, self.key, '#%s' % nfe_id) |
|||
|
|||
xml_response, obj = self._executar_consulta(xml_signed) |
|||
|
|||
return { |
|||
'sent_xml': xml_signed, |
|||
'received_xml': xml_response, |
|||
'object': obj.Body.nfeAutorizacaoLoteResult |
|||
} |
|||
@ -0,0 +1,130 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# © 2016 Danimar Ribeiro, Trustcode |
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|||
|
|||
import xmlsec |
|||
import libxml2 |
|||
import os.path |
|||
|
|||
from signxml import XMLSigner |
|||
from signxml import methods |
|||
from lxml import etree |
|||
from OpenSSL import crypto |
|||
|
|||
NAMESPACE_SIG = 'http://www.w3.org/2000/09/xmldsig#' |
|||
|
|||
|
|||
def extract_cert_and_key_from_pfx(pfx, password): |
|||
pfx = crypto.load_pkcs12(pfx, password) |
|||
# PEM formatted private key |
|||
key = crypto.dump_privatekey(crypto.FILETYPE_PEM, |
|||
pfx.get_privatekey()) |
|||
# PEM formatted certificate |
|||
cert = crypto.dump_certificate(crypto.FILETYPE_PEM, |
|||
pfx.get_certificate()) |
|||
return cert, key |
|||
|
|||
|
|||
def recursively_empty(e): |
|||
if e.text: |
|||
return False |
|||
return all((recursively_empty(c) for c in e.iterchildren())) |
|||
|
|||
|
|||
def assinar(xml, cert, key, reference): |
|||
context = etree.iterwalk(xml) |
|||
for dummy, elem in context: |
|||
parent = elem.getparent() |
|||
if recursively_empty(elem): |
|||
parent.remove(elem) |
|||
|
|||
element = xml.find('{' + xml.nsmap[None] + '}NFe') |
|||
signer = XMLSigner(digest_algorithm=u'sha1',signature_algorithm="rsa-sha1", |
|||
method=methods.enveloped, |
|||
c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315') |
|||
ns = {} |
|||
ns[None] = signer.namespaces['ds'] |
|||
signer.namespaces = ns |
|||
signed_root = signer.sign(element, key=str(key), cert=cert, reference_uri=reference) |
|||
|
|||
xml.remove(element) |
|||
xml.append(signed_root) |
|||
return etree.tostring(xml) |
|||
|
|||
|
|||
class Assinatura(object): |
|||
|
|||
def __init__(self, arquivo, senha): |
|||
self.arquivo = arquivo |
|||
self.senha = senha |
|||
|
|||
def _checar_certificado(self): |
|||
if not os.path.isfile(self.arquivo): |
|||
raise Exception('Caminho do certificado não existe.') |
|||
|
|||
def _inicializar_cripto(self): |
|||
libxml2.initParser() |
|||
libxml2.substituteEntitiesDefault(1) |
|||
|
|||
xmlsec.init() |
|||
xmlsec.cryptoAppInit(None) |
|||
xmlsec.cryptoInit() |
|||
|
|||
def _finalizar_cripto(self): |
|||
xmlsec.cryptoShutdown() |
|||
xmlsec.cryptoAppShutdown() |
|||
xmlsec.shutdown() |
|||
|
|||
libxml2.cleanupParser() |
|||
|
|||
def assina_xml(self, xml, reference): |
|||
self._checar_certificado() |
|||
self._inicializar_cripto() |
|||
try: |
|||
doc_xml = libxml2.parseMemory( |
|||
xml.encode('utf-8'), len(xml.encode('utf-8'))) |
|||
|
|||
signNode = xmlsec.TmplSignature(doc_xml, xmlsec.transformInclC14NId(), |
|||
xmlsec.transformRsaSha1Id(), None) |
|||
|
|||
doc_xml.getLastChild().addChild(signNode) |
|||
refNode = signNode.addReference(xmlsec.transformSha1Id(), |
|||
None, reference, None) |
|||
|
|||
refNode.addTransform(xmlsec.transformEnvelopedId()) |
|||
refNode.addTransform(xmlsec.transformInclC14NId()) |
|||
keyInfoNode = signNode.ensureKeyInfo() |
|||
keyInfoNode.addX509Data() |
|||
|
|||
dsig_ctx = xmlsec.DSigCtx() |
|||
chave = xmlsec.cryptoAppKeyLoad(filename=str(self.arquivo), |
|||
format=xmlsec.KeyDataFormatPkcs12, |
|||
pwd=str(self.senha), |
|||
pwdCallback=None, |
|||
pwdCallbackCtx=None) |
|||
|
|||
dsig_ctx.signKey = chave |
|||
dsig_ctx.sign(signNode) |
|||
|
|||
status = dsig_ctx.status |
|||
dsig_ctx.destroy() |
|||
|
|||
if status != xmlsec.DSigStatusSucceeded: |
|||
raise RuntimeError( |
|||
'Erro ao realizar a assinatura do arquivo; status: "' + |
|||
str(status) + |
|||
'"') |
|||
|
|||
xpath = doc_xml.xpathNewContext() |
|||
xpath.xpathRegisterNs('sig', NAMESPACE_SIG) |
|||
certificados = xpath.xpathEval( |
|||
'//sig:X509Data/sig:X509Certificate') |
|||
for i in range(len(certificados) - 1): |
|||
certificados[i].unlinkNode() |
|||
certificados[i].freeNode() |
|||
|
|||
xml = doc_xml.serialize() |
|||
return xml |
|||
finally: |
|||
doc_xml.freeDoc() |
|||
self._finalizar_cripto() |
|||
@ -1,21 +0,0 @@ |
|||
#coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
from pytrustnfe.servicos.Comunicacao import Comunicacao |
|||
from pytrustnfe.xml import DynamicXml |
|||
|
|||
|
|||
class NfeDistribuicaoDFe(Comunicacao): |
|||
|
|||
def distribuicao(self, dfe): |
|||
xml = self._validar_xml(recibo) |
|||
|
|||
self.metodo = 'NFeDistribuicaoDFe' |
|||
self.tag_retorno = 'retDistDFeInt' |
|||
self.web_service = 'NFeDistribuicaoDFe/NFeDistribuicaoDFe.asmx' |
|||
self.url = 'www1.nfe.fazenda.gov.br' |
|||
|
|||
return self._executar_consulta(xml) |
|||
@ -1,21 +0,0 @@ |
|||
#coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
from pytrustnfe.servicos.Comunicacao import Comunicacao |
|||
from pytrustnfe.xml import DynamicXml |
|||
|
|||
|
|||
class NfeRetAutorizacao(Comunicacao): |
|||
|
|||
def consulta_autorizacao(self, recibo): |
|||
xml = self._validar_xml(recibo) |
|||
|
|||
self.metodo = 'NFeRetAutorizacao' |
|||
self.tag_retorno = 'retConsReciNFe' |
|||
self.web_service = 'ws/NfeRetAutorizacao/NFeRetAutorizacao.asmx' |
|||
self.url = 'nfe.sefazrs.rs.gov.br' |
|||
|
|||
return self._executar_consulta(xml) |
|||
@ -1,25 +0,0 @@ |
|||
#coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
from pytrustnfe.servicos.Comunicacao import Comunicacao |
|||
from pytrustnfe.xml.DynamicXml import DynamicXml |
|||
|
|||
|
|||
class NfeConsultaCadastro(Comunicacao): |
|||
|
|||
def __init__(self, certificado, senha): |
|||
super(NfeConsultaCadastro, self).__init__(certificado, senha) |
|||
self.metodo = 'CadConsultaCadastro2' |
|||
self.tag_retorno = 'retConsCad' |
|||
|
|||
|
|||
def consultar_cadastro(self, cadastro, estado): |
|||
xml = self._validar_xml(cadastro) |
|||
|
|||
self.web_service = '/ws/cadconsultacadastro/cadconsultacadastro2.asmx' |
|||
self.url = 'cad.svrs.rs.gov.br' |
|||
|
|||
return self._executar_consulta(xml) |
|||
@ -1,21 +0,0 @@ |
|||
#coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
from pytrustnfe.servicos.Comunicacao import Comunicacao |
|||
from pytrustnfe.xml import DynamicXml |
|||
|
|||
|
|||
class NfeConsultaProtocolo(Comunicacao): |
|||
|
|||
def consultar_protocolo(self, recibo): |
|||
xml = self._validar_xml(recibo) |
|||
|
|||
self.metodo = 'NfeConsulta2' |
|||
self.tag_retorno = 'retConsSitNFe' |
|||
self.web_service = 'ws/NfeConsulta/NfeConsulta2.asmx' |
|||
self.url = 'nfe.sefazrs.rs.gov.br' |
|||
|
|||
return self._executar_consulta(xml) |
|||
@ -1,21 +0,0 @@ |
|||
#coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
from pytrustnfe.servicos.Comunicacao import Comunicacao |
|||
from pytrustnfe.xml import DynamicXml |
|||
|
|||
|
|||
class NfeInutilizacao(Comunicacao): |
|||
|
|||
def inutilizar(self, inutilizacao): |
|||
xml = self._validar_xml(recibo) |
|||
|
|||
self.metodo = 'nfeinutilizacao2' |
|||
self.tag_retorno = 'retInutNFe' |
|||
self.web_service = 'ws/nfeinutilizacao/nfeinutilizacao2.asmx' |
|||
self.url = 'nfe.sefazrs.rs.gov.br' |
|||
|
|||
return self._executar_consulta(xml) |
|||
@ -1,19 +0,0 @@ |
|||
# coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
from pytrustnfe.servicos.comunicacao import Comunicacao |
|||
|
|||
class NfeStatusServico(Comunicacao): |
|||
|
|||
def status(self, consulta): |
|||
xml = self._validar_xml(recibo) |
|||
|
|||
self.metodo = 'NfeStatusServico2' |
|||
self.tag_retorno = 'retConsStatServ' |
|||
self.web_service = 'ws/NfeStatusServico/NfeStatusServico2.asmx' |
|||
self.url = 'nfe.sefazrs.rs.gov.br' |
|||
|
|||
return self._executar_consulta(xml) |
|||
@ -1,20 +0,0 @@ |
|||
#coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
from pytrustnfe.servicos.Comunicacao import Comunicacao |
|||
|
|||
|
|||
class RecepcaoEvento(Comunicacao): |
|||
|
|||
def registrar_evento(self, evento): |
|||
xml = self._validar_xml(recibo) |
|||
|
|||
self.metodo = 'RecepcaoEvento' |
|||
self.tag_retorno = 'retEnvEvento' |
|||
self.web_service = 'ws/recepcaoevento/recepcaoevento.asmx' |
|||
self.url = 'nfe.sefazrs.rs.gov.br' |
|||
|
|||
return self._executar_consulta(xml) |
|||
@ -1,15 +0,0 @@ |
|||
''' |
|||
Created on 24/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
|
|||
def validar_schema(): |
|||
arquivo_esquema = '' |
|||
xml = tira_abertura(self.xml).encode('utf-8') |
|||
|
|||
esquema = etree.XMLSchema(etree.parse(arquivo_esquema)) |
|||
esquema.validate(etree.fromstring(xml)) |
|||
|
|||
namespace = '{http://www.portalfiscal.inf.br/nfe}' |
|||
return "\n".join([x.message.replace(namespace, '') for x in esquema.error_log]) |
|||
@ -1,48 +0,0 @@ |
|||
# coding=utf-8 |
|||
''' |
|||
Created on Jun 14, 2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
|
|||
from signxml import XMLSigner |
|||
from signxml import methods |
|||
from lxml import etree |
|||
from OpenSSL import crypto |
|||
|
|||
|
|||
def extract_cert_and_key_from_pfx(pfx, password): |
|||
pfx = crypto.load_pkcs12(pfx, password) |
|||
# PEM formatted private key |
|||
key = crypto.dump_privatekey(crypto.FILETYPE_PEM, |
|||
pfx.get_privatekey()) |
|||
# PEM formatted certificate |
|||
cert = crypto.dump_certificate(crypto.FILETYPE_PEM, |
|||
pfx.get_certificate()) |
|||
return cert, key |
|||
|
|||
|
|||
def recursively_empty(e): |
|||
if e.text: |
|||
return False |
|||
return all((recursively_empty(c) for c in e.iterchildren())) |
|||
|
|||
|
|||
def assinar(xml, cert, key, reference): |
|||
context = etree.iterwalk(xml) |
|||
for action, elem in context: |
|||
parent = elem.getparent() |
|||
if recursively_empty(elem): |
|||
parent.remove(elem) |
|||
|
|||
# element = xml.find('{' + xml.nsmap[None] + '}NFe') |
|||
signer = XMLSigner(digest_algorithm=u'sha1',signature_algorithm="rsa-sha1", |
|||
method=methods.enveloped, |
|||
c14n_algorithm='http://www.w3.org/TR/2001/REC-xml-c14n-20010315') |
|||
ns = {} |
|||
ns[None] = signer.namespaces['ds'] |
|||
signer.namespaces = ns |
|||
signed_root = signer.sign(xml, key=str(key), cert=cert, reference_uri=reference) |
|||
|
|||
# XMLSigner(signed_root, digest_algorithm=u'sha1').verify(x509_cert=cert) |
|||
return etree.tostring(signed_root) |
|||
@ -1,41 +0,0 @@ |
|||
# coding=utf-8 |
|||
''' |
|||
Created on 21/06/2015 |
|||
|
|||
@author: danimar |
|||
''' |
|||
import os |
|||
from lxml import etree |
|||
from suds.sax.element import Element |
|||
from suds.sax.text import Raw |
|||
from suds.sax.parser import Parser |
|||
from pytrustnfe.servicos.comunicacao import Comunicacao |
|||
from pytrustnfe import utils |
|||
from pytrustnfe.xml import render_xml |
|||
from pytrustnfe.servicos.assinatura import assinar |
|||
|
|||
|
|||
class NfeAutorizacao(Comunicacao): |
|||
|
|||
def __init__(self, cert, key): |
|||
Comunicacao.__init__(self, cert, key) |
|||
|
|||
def autorizar_nfe(self, nfe, id): |
|||
self.url = 'nfe-homologacao.sefazrs.rs.gov.br' |
|||
self.web_service = '/ws/NfeAutorizacao/NFeAutorizacao.asmx' |
|||
self.metodo = 'nfeAutorizacaoLote' |
|||
|
|||
self._validar_nfe(nfe) |
|||
path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'xml') |
|||
xml = render_xml(path, 'nfeEnv.xml', **nfe) |
|||
|
|||
#xmlElem = etree.fromstring(xml) TODO Assinar |
|||
#xml_signed = assinar(xmlElem, self.cert, self.key, '#%s' % id) |
|||
print xml |
|||
xml_response, obj = self._executar_consulta(xml) |
|||
|
|||
return { |
|||
'sent_xml': xml, |
|||
'received_xml': xml_response, |
|||
'object': obj.Body.nfeAutorizacaoLoteResult |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue