¿Cómo extraer mediante programación datos en tiempo real de software de stock gratuito?
Si escribe su propio programa, una forma es obtener datos de una fuente de información proporcionada, como un servicio web. Otra forma es conectarse al análisis duro de páginas web que proporcionan información en tiempo real.
El ejemplo de código es el siguiente:
Creado el jueves 23 de julio a las 09:17:27 de 2015
@autor: jet
"""
DAY_PRICE_COLS = ['fecha', 'apertura', 'alto', 'cierre', 'bajo', 'volumen',
'cambio', '% chg', 'ma5', 'ma10', 'ma20',
'vma5', 'vma10', 'vma20', 'facturación']
DAY_PRICE_URL = '%sapi .finanzas.%s/%s/?code=%s&type=last'
INDEX_KEY = ['SH', 'SZ', 'HS300', 'SZ50', 'GEB', 'SMEB' ] p>
INDEX_LIST = {'SH': 'sh000001', 'SZ': 'sz399001', 'HS300': 'sz399300',
'SZ50': 'sh000016', 'GEB' : 'sz399006', 'SMEB': 'sz399005'}
INDEX_DAY_PRICE_COLS= ['fecha', 'apertura', 'alta', 'cerrada', 'baja', 'volumen',
'chg', '%chg', 'ma5', 'ma10', 'ma20',
'vma5', 'vma10', 'vma20']
K_TYPE_KEY = ['D', 'W', 'M']
K_TYPE_MIN_KEY = ['5', '15', '30', '60']
K_TYPE = {'D': 'akdaily', 'W': 'akweekly', 'M': 'akmonthly'}
MIN_PRICE_URL = '%sapi.finance.%s/akmin?scode =%s&type =%s'
PAGE_TYPE = {'', 'ifeng': 'ifeng.com'}
URL_ERROR_MSG = 'Error al obtener, verifique el estado de la red, o la URL del puerto API ha sido No coincide '
get_hist_data.py
# -*- codificación: utf-8 -*-
"""
Creado el jueves 23 de julio 09:15:40 2015
@autor: jet
"""
importar const como ct
importar pandas como pd
importar json
desde urllib2 importar urlopen,Solicitud
def get_hist_data(código = Ninguno, inicio = Ninguno , end = Ninguno, ktype = 'D'):
"""
Función:
Obtener datos históricos de transacciones de acciones individuales
------- -
Entrada:
--------
código:cadena
Código de stock como: 601989
start:string
Formato de fecha de inicio: AAAA-MM-DD Si está vacío, obtenga los datos de fecha más antiguos proporcionados por la API p><
p>end:string
Formato de fecha de finalización: AAAA-MM-DD, si está vacío, obtenga los datos del último día de negociación
ktype:string(default=D, unificado automáticamente dentro de la función (en mayúsculas)
Tipo de datos D=línea K diaria, W=línea K semanal, M=línea K mensual, 5=5 minutos, 15=15 minutos
30=30 minutos, 60=60 minutos
Salida:
--------
DataFrame
fecha fecha
precio de apertura de apertura
precio más alto más alto
precio de cierre de cierre
precio más bajo más bajo
cambiar cantidad de cambio
p_chg Aumentar y disminuir
ma5 Precio promedio de 5 días
ma10 Precio promedio de 10 días
ma20 20 precio promedio en días
vma5 volumen promedio en 5 días
vma10 volumen promedio en 10 días
vma20 volumen promedio en 20 días
tasa de rotación (el índice no tiene este elemento)
"""
code = code_to_APIcode(code.upper())
ktype = ktype. superior()
url = ''
url = get_url(ktipo, código)
imprimir(url)
js = json .loads(ping_API(url))
cols = []
if len(js['record'][0]) == 14:
cols = ct.INDEX_DAY_PRICE_COLS
otro:
cols = ct.DAY_PRICE_COLS
df = pd.DataFrame(js['record'], columnas=cols)
si ktype en ct.K_TYPE_KEY:
df = df.applymap(lambda x:x.replace(u',', u''))
para col en cols[1:]:
df[col]=df[col].astype(float)
si el inicio no es Ninguno:
df = df [df.date >= start]
si el final no es Ninguno:
df = df[df.date <= end]
df = df.set_index('fecha')
return df
def code_to_APIcode(código):
"""
Función :
Verificar si el código de stock ingresado es correcto, si es correcto devolver el código de stock correspondiente a la API
"""
print(code)
si el código está en ct.INDEX_KEY:
devuelve ct.INDEX_LIST[código]
si no:
si len(código) != 6:
raise IOError('¡error de entrada de código! ')
de lo contrario:
devuelve 'sh%s'%code si el código[:1] está en ['5', '6'] else 'sz%s'%code
def get_url(ktype, código):
"""
Función:
Verificar si el tipo de línea K ingresado es correcto, si es correcto, devolver la URL
"""
si ktype en ct.K_TYPE_KEY:
url = ct.DAY_PRICE_URL % (ct.PAGE_TYPE['http'], ct.PAGE_DOMAIN['ifeng'],
ct.K_TYPE[ktype], código)
url de retorno
elif ktype en ct.K_TYPE_MIN_KEY:
url = ct.MIN_PRICE_URL % (ct .PAGE_TYPE['http'], ct.PAGE_DOMAIN['ifeng'],
código, ktype)
URL de retorno
más:
raise IOError('ktype error de entrada!')
def ping_API(url):
"""
Función: p>
Envía una solicitud de datos a la API, y si el enlace es normal, se devolverán los datos
"""
text = ''
pruebe:
req = Solicitud(url)
texto = urlopen(req,timeout=10).read()
if len(texto ) < 15:
raise IOError('¡sin datos!')
excepto excepción como e:
print(e)
else:
devolver texto
#Entrada de prueba
print(get_hist_data('601989','2015-07-11','2015-07-22 '))