forked from asavinov/intelligent-trading-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector.py
211 lines (170 loc) · 7.82 KB
/
collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import os
import sys
import argparse
import math, time
from datetime import datetime
from decimal import *
import pandas as pd
import asyncio
from binance.client import Client
from binance.exceptions import *
from binance.helpers import date_to_milliseconds, interval_to_milliseconds
from binance.enums import *
from service.App import *
from common.utils import *
from service.analyzer import *
import logging
log = logging.getLogger('collector')
logging.basicConfig(
filename="collector.log", # parameter in App
level=logging.DEBUG,
#format = "%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
format = "%(asctime)s %(levelname)s %(message)s",
#datefmt = '%Y-%m-%d %H:%M:%S',
)
async def main_collector_task():
"""
It is a highest level task which is added to the event loop and executed normally every 1 minute and then it calls other tasks.
"""
symbol = App.config["symbol"]
startTime, endTime = get_interval("1m")
now_ts = now_timestamp()
log.info(f"===> Start collector task. Timestamp {now_ts}. Interval [{startTime},{endTime}].")
#
# 0. Check server state (if necessary)
#
if data_provider_problems_exist():
await data_provider_health_check()
if data_provider_problems_exist():
log.error(f"Problems with the data provider server found. No signaling, no trade. Will try next time.")
return 1
#
# 1. Ensure that we are up-to-date with klines
#
res = await sync_data_collector_task()
if res > 0:
log.error(f"Problem getting data from the server. No signaling, no trade. Will try next time.")
return 1
log.info(f"<=== End collector task.")
return 0
#
# Request/update market data
#
async def sync_data_collector_task():
"""
Collect latest data.
After executing this task our local (in-memory) data state is up-to-date.
Hence, we can do something useful like data analysis and trading.
Limitations and notes:
- Currently, we can work only with one symbol
- We update only local state by loading latest data. If it is necessary to initialize the db then another function should be used.
"""
data_sources = App.config.get("data_sources", [])
symbols = [x.get("folder") for x in data_sources]
if not symbols:
symbols = [App.config["symbol"]]
# How many records are missing (and to be requested) for each symbol
missing_klines_counts = [App.analyzer.get_missing_klines_count(sym) for sym in symbols]
# Create a list of tasks for retrieving data
#coros = [request_klines(sym, "1m", 5) for sym in symbols]
tasks = [asyncio.create_task(request_klines(s, "1m", c)) for c, s in zip(missing_klines_counts, symbols)]
results = {}
timeout = 10 # Seconds to wait for the result
# Process responses in the order of arrival
for fut in asyncio.as_completed(tasks, timeout=timeout):
# Get the results
res = None
try:
res = await fut
except TimeoutError as te:
log.warning(f"Timeout {timeout} seconds when requesting kline data.")
return 1
except Exception as e:
log.warning(f"Exception when requesting kline data.")
return 1
# Add to the database (will overwrite existing klines if any)
if res and res.keys():
# res is dict for symbol, which is a list of record lists of 12 fields
# ==============================
# TODO: We need to check these fields for validity (presence, non-null)
# TODO: We can load maximum 999 latest klines, so if more 1600, then some other method
# TODO: Print somewhere diagnostics about how many lines are in history buffer of db, and if nans are found
results.update(res)
try:
added_count = App.analyzer.store_klines(res)
except Exception as e:
log.error(f"Error storing kline result in the database. Exception: {e}")
return 1
else:
log.error("Received empty or wrong result from klines request.")
return 1
return 0
async def request_klines(symbol, freq, limit):
"""
Request klines data from the service for one symbol. Maximum the specified number of klines will be returned.
:return: Dict with the symbol as a key and a list of klines as a value. One kline is also a list.
"""
klines_per_request = 400
now_ts = now_timestamp()
startTime, endTime = get_interval(freq)
klines = []
try:
if limit <= klines_per_request: # Server will return these number of klines in one request
# INFO:
# - startTime: include all intervals (ids) with same or greater id: if within interval then excluding this interval; if is equal to open time then include this interval
# - endTime: include all intervals (ids) with same or smaller id: if equal to left border then return this interval, if within interval then return this interval
# - It will return also incomplete current interval (in particular, we could collect approximate klines for higher frequencies by requesting incomplete intervals)
klines = App.client.get_klines(symbol=symbol, interval=freq, limit=limit, endTime=now_ts)
# Return: list of lists, that is, one kline is a list (not dict) with items ordered: timestamp, open, high, low, close etc.
else:
# https://sammchardy.github.io/binance/2018/01/08/historical-data-download-binance.html
# get_historical_klines(symbol, interval, start_str, end_str=None, limit=500)
start_ts = now_ts - (limit+1) * 60_000 # Subtract the number of minutes from now ts
klines = App.client.get_historical_klines(symbol=symbol, interval=freq, start_str=start_ts, end_str=now_ts)
except BinanceRequestException as bre:
# {"code": 1103, "msg": "An unknown parameter was sent"}
log.error(f"BinanceRequestException while requesting klines: {bre}")
return {}
except BinanceAPIException as bae:
# {"code": 1002, "msg": "Invalid API call"}
log.error(f"BinanceAPIException while requesting klines: {bae}")
return {}
except Exception as e:
log.error(f"Exception while requesting klines: {e}")
return {}
#
# Post-process
#
# Find latest *full* (completed) interval in the result list.
# The problem is that the result also contains the current (still running) interval which we want to exclude
klines_full = [kl for kl in klines if kl[0] < startTime]
last_full_kline = klines_full[-1]
last_full_kline_ts = last_full_kline[0]
if last_full_kline_ts != startTime - 60_000:
log.error(f"UNEXPECTED RESULT: Last full kline timestamp {last_full_kline_ts} is not equal to previous full interval start {startTime - 60_000}. Maybe some results are missing and there are gaps.")
# Return all received klines with the symbol as a key
return {symbol: klines_full}
#
# Server and account info
#
async def data_provider_health_check():
"""
Request information about the data provider server state.
"""
symbol = App.config["symbol"]
# Get server state (ping) and trade status (e.g., trade can be suspended on some symbol)
system_status = App.client.get_system_status()
#{
# "status": 0, # 0: normal,1:system maintenance
# "msg": "normal" # normal or System maintenance.
#}
if not system_status or system_status.get("status") != 0:
App.server_status = 1
return 1
App.server_status = 0
# Ping the server
# Check time synchronization
#server_time = App.client.get_server_time()
#time_diff = int(time.time() * 1000) - server_time['serverTime']
# TODO: Log large time differences (or better trigger time synchronization procedure)
return 0