Skip to content

Commit

Permalink
refactoreo clase BaseWS con metodos unificados para todos los webserv…
Browse files Browse the repository at this point in the history
…ices de AFIP
  • Loading branch information
reingart committed Dec 27, 2013
1 parent a5bdf0d commit 278801b
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 350 deletions.
115 changes: 114 additions & 1 deletion utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
__license__ = "GPL 3.0"

import inspect
import socket
import sys
import os
import traceback
from cStringIO import StringIO

from pysimplesoap.client import SoapFault
from pysimplesoap.client import SimpleXMLElement, SoapClient, SoapFault, parse_proxy, set_http_wrapper

try:
import dbf
Expand Down Expand Up @@ -150,6 +152,117 @@ def capturar_errores_wrapper(self, *args, **kwargs):
return capturar_errores_wrapper


class BaseWS:
"Infraestructura basica para interfaces webservices de AFIP"

def Conectar(self, cache=None, wsdl=None, proxy="", wrapper=None, cacert=None, timeout=30):
"Conectar cliente soap del web service"
# analizar transporte y servidor proxy:
if wrapper:
Http = set_http_wrapper(wrapper)
self.Version = self.Version + " " + Http._wrapper_version
if isinstance(proxy, dict):
proxy_dict = proxy
else:
proxy_dict = parse_proxy(proxy)
if self.HOMO or not wsdl:
wsdl = self.WSDL
# agregar sufijo para descargar descripción del servicio ?WSDL o ?wsdl
if not wsdl.endswith(self.WSDL[-5:]) and wsdl.startswith("http"):
wsdl += self.WSDL[-5:]
if not cache or self.HOMO:
# use 'cache' from installation base directory
cache = os.path.join(self.InstallDir, 'cache')
self.log("Conectando a wsdl=%s cache=%s proxy=%s" % (wsdl, cache, proxy_dict))
# analizar espacio de nombres (axis vs .net):
ns = 'ser' if self.WSDL[-5:] == "?wsdl" else None
self.client = SoapClient(
wsdl = wsdl,
cache = cache,
proxy = proxy_dict,
cacert = cacert,
timeout = timeout,
ns = ns,
trace = "--trace" in sys.argv)
return True

def log(self, msg):
"Dejar mensaje en bitacora de depuración (método interno)"
if not isinstance(msg, unicode):
msg = unicode(msg, 'utf8', 'ignore')
if not self.Log:
self.Log = StringIO()
self.Log.write(msg)
self.Log.write('\n\r')

def DebugLog(self):
"Devolver y limpiar la bitácora de depuración"
if self.Log:
msg = self.Log.getvalue()
# limpiar log
self.Log.close()
self.Log = None
else:
msg = u''
return msg

def LoadTestXML(self, xml):
class DummyHTTP:
def __init__(self, xml_response):
self.xml_response = xml_response
def request(self, location, method, body, headers):
return {}, self.xml_response
self.client.http = DummyHTTP(xml)

@property
def xml_request(self):
return self.XmlRequest

@property
def xml_response(self):
return self.XmlResponse

def AnalizarXml(self, xml=""):
"Analiza un mensaje XML (por defecto la respuesta)"
try:
if not xml or xml=='XmlResponse':
xml = self.XmlResponse
elif xml=='XmlRequest':
xml = self.XmlRequest
self.xml = SimpleXMLElement(xml)
return True
except Exception, e:
self.Excepcion = u"%s" % (e)
return False

def ObtenerTagXml(self, *tags):
"Busca en el Xml analizado y devuelve el tag solicitado"
# convierto el xml a un objeto
try:
if self.xml:
xml = self.xml
# por cada tag, lo busco segun su nombre o posición
for tag in tags:
xml = xml(tag) # atajo a getitem y getattr
# vuelvo a convertir a string el objeto xml encontrado
return str(xml)
except Exception, e:
self.Excepcion = u"%s" % (e)

def SetParametros(self, cuit, token, sign):
"Establece un parámetro general"
self.Token = token
self.Sign = sign
self.Cuit = cuit
return True

def SetTicketAcceso(self, ta_string):
"Establecer el token y sign desde un ticket de acceso XML"
ta = SimpleXMLElement(ta_string)
self.Token = str(ta.credentials.token)
self.Sign = str(ta.credentials.sign)
return True

# Funciones para manejo de archivos de texto de campos de ancho fijo:


Expand Down
83 changes: 9 additions & 74 deletions wscdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
__license__ = "GPL 3.0"
__version__ = "1.01a"

import sys, os, traceback
from pysimplesoap.client import SimpleXMLElement, SoapClient, SoapFault, parse_proxy, set_http_wrapper
import sys, os
from utils import inicializar_y_capturar_excepciones, BaseWS

# Constantes (si se usa el script de linea de comandos)
WSDL = "https://wswhomo.afip.gov.ar/WSCDC/service.asmx?WSDL"
Expand All @@ -27,7 +27,7 @@
# No debería ser necesario modificar nada despues de esta linea


class WSCDC:
class WSCDC(BaseWS):
"Interfaz para el WebService de Constatación de Comprobantes"
_public_methods_ = ['Conectar',
'AnalizarXml', 'ObtenerTagXml', 'Expirado',
Expand All @@ -45,46 +45,19 @@ class WSCDC:
_reg_progid_ = "WSCDC"
_reg_clsid_ = "{6206DF5E-3EEF-47E9-A532-CD81EBBAF3AA}"

# Variables globales para BaseWS:
HOMO = HOMO
WSDL = WSDL
Version = "%s %s" % (__version__, HOMO and 'Homologación' or '')

def __init__(self):
self.Token = self.Sign = None
self.InstallDir = INSTALL_DIR
self.Excepcion = self.Traceback = ""
self.LanzarExcepciones = True
self.XmlRequest = self.XmlResponse = ""
self.xml = None

def Conectar(self, cache=None, wsdl=None, proxy="", wrapper=None, cacert=None):
# cliente soap del web service
try:
if wrapper:
Http = set_http_wrapper(wrapper)
self.Version = WSAA.Version + " " + Http._wrapper_version
if not isinstance(proxy,dict):
proxy_dict = parse_proxy(proxy)
else:
proxy_dict = proxy
if HOMO or not wsdl:
wsdl = WSDL
if not cache or HOMO:
# use 'cache' from installation base directory
cache = os.path.join(self.InstallDir, 'cache')
self.client = SoapClient(
wsdl = wsdl,
cache = cache,
proxy = proxy_dict,
cacert = cacert,
trace = "--trace" in sys.argv)
return True
except Exception, e:
ex = traceback.format_exception( sys.exc_type, sys.exc_value, sys.exc_traceback)
self.Traceback = ''.join(ex)
self.Excepcion = traceback.format_exception_only( sys.exc_type, sys.exc_value)[0]
if self.LanzarExcepciones:
raise
return False

self.reintentos = 0
self.xml = self.client = self.Log = None

def Dummy(self):
"Método Dummy para verificación de funcionamiento de infraestructura"
Expand All @@ -94,44 +67,6 @@ def Dummy(self):
self.AuthServerStatus = result['AuthServer']
return True


def AnalizarXml(self, xml=""):
"Analiza un mensaje XML (por defecto el ticket de acceso)"
try:
if not xml or xml=='XmlResponse':
xml = self.XmlResponse
elif xml=='XmlRequest':
xml = self.XmlRequest
self.xml = SimpleXMLElement(xml)
return True
except Exception, e:
self.Excepcion = traceback.format_exception_only( sys.exc_type, sys.exc_value)[0]
return False

def ObtenerTagXml(self, *tags):
"Busca en el Xml analizado y devuelve el tag solicitado"
# convierto el xml a un objeto
try:
if self.xml:
xml = self.xml
# por cada tag, lo busco segun su nombre o posición
for tag in tags:
xml = xml(tag) # atajo a getitem y getattr
# vuelvo a convertir a string el objeto xml encontrado
return str(xml)
except Exception, e:
self.Excepcion = traceback.format_exception_only( sys.exc_type, sys.exc_value)[0]

def Expirado(self, fecha=None):
"Comprueba la fecha de expiración, devuelve si ha expirado"
try:
if not fecha:
fecha = self.ObtenerTagXml('expirationTime')
now = datetime.datetime.now()
d = datetime.datetime.strptime(fecha[:19], '%Y-%m-%dT%H:%M:%S')
return now > d
except Exception, e:
self.Excepcion = traceback.format_exception_only( sys.exc_type, sys.exc_value)[0]


# busco el directorio de instalación (global para que no cambie si usan otra dll)
Expand Down
Loading

0 comments on commit 278801b

Please sign in to comment.