This repository was archived by the owner on Oct 13, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 36
Expand file tree
/
Copy pathorder_book.py
More file actions
174 lines (152 loc) · 7.84 KB
/
order_book.py
File metadata and controls
174 lines (152 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/python
# -*- coding: utf-8 -*-
# bittrex_websocket/order_book.py
# Stanislav Lazarov
from bittrex_websocket.websocket_client import BittrexSocket
from bittrex_websocket.constants import BittrexMethods
import logging
from .constants import EventTypes
from collections import deque
from time import sleep, time
from events import Events
# Module-level logger named after this module; OrderBook uses it to report
# successful subscriptions and out-of-sync/resync events.
logger = logging.getLogger(__name__)
class OrderBook(BittrexSocket):
    """Maintain a locally synced order book for every subscribed ticker.

    For each ticker the class combines one ``QUERY_EXCHANGE_STATE`` snapshot
    with the stream of ``SUBSCRIBE_TO_EXCHANGE_DELTAS`` messages:

    * deltas that arrive before the snapshot are queued in ``order_nounces``;
    * once the snapshot is stored in ``order_books`` the queue is replayed;
    * afterwards every delta is applied directly.

    A delta is only applied when its nounce (``'N'``) is exactly one ahead of
    the book's nounce; a larger gap discards the local state so that the next
    delta starts a fresh snapshot/queue cycle.

    Attributes:
        order_nounces: dict mapping ticker -> ``deque`` of queued deltas while
            the snapshot is pending, or ``False`` once the queue was replayed.
        order_books: dict mapping ticker -> snapshot message that is kept in
            sync (``'Z'`` buy side, ``'S'`` sell side, ``'f'`` trade history,
            ``'N'`` nounce, ``'timestamp'`` time of the last applied delta).
    """

    # Trade-history retention windows in milliseconds.
    # NOTE(review): assumes the 'T' field of completed orders is a numeric
    # millisecond timestamp -- confirm against the exchange payload.
    #
    # Old entries are only dropped once they are older than this window.
    TRADE_HISTORY_MIN_MS = 20 * 60 * 1000
    # When the history reaches this age it is cut back to
    # TRADE_HISTORY_TRIM_TO_MS.
    TRADE_HISTORY_MAX_MS = 120 * 60 * 1000
    TRADE_HISTORY_TRIM_TO_MS = 30 * 60 * 1000

    def __init__(self, url=None, retry_timeout=None, max_retries=None):
        """Create the socket client and the empty per-ticker state.

        All arguments are forwarded unchanged to ``BittrexSocket.__init__``.
        """
        super(OrderBook, self).__init__(url, retry_timeout, max_retries)
        # Event hub that calls ``on_ping(ticker)`` after each applied delta.
        self._on_ping = Events()
        self._on_ping.on_change += self.on_ping
        self.order_nounces = {}
        self.order_books = {}

    def get_order_book(self, ticker):
        """Return the synced order book for ``ticker``, or ``None`` if absent."""
        return self.order_books.get(ticker)

    def control_queue_handler(self):
        """Consume control events until a ``CLOSE`` event is received.

        Every item fetched from ``control_queue`` is acknowledged with
        ``task_done()`` -- including ``None`` items and the final ``CLOSE``
        event -- so that ``control_queue.join()`` cannot block forever.
        """
        while True:
            event = self.control_queue.get()
            try:
                if event is None:
                    continue
                event_type = event.type
                if event_type == EventTypes.CONNECT:
                    self._handle_connect()
                elif event_type == EventTypes.SUBSCRIBE:
                    self._handle_subscribe(event)
                elif event_type == EventTypes.RECONNECT:
                    self._handle_reconnect(event.error_message)
                elif event_type == EventTypes.CONFIRM_OB:
                    self._confirm_order_book(event.ticker, event.order_nounces)
                elif event_type == EventTypes.SYNC_OB:
                    self._sync_order_book(event.ticker, event.order_nounces)
                elif event_type == EventTypes.CLOSE:
                    self.connection.conn.close()
                    break
            finally:
                self.control_queue.task_done()

    def subscribe_to_orderbook(self, ticker):
        """Subscribe to order book updates by subscribing to exchange deltas."""
        self.subscribe_to_exchange_deltas(ticker)

    def _handle_subscribe_for_ticker(self, invoke, payload):
        """Invoke a per-ticker hub method for every ticker in ``payload[0]``.

        Returns:
            ``True`` when ``invoke`` is one of the two order-book methods (all
            tickers were invoked and recorded in ``self.invokes``) or when
            ``invoke`` is ``None``; ``False`` for any other method.
        """
        if invoke in [BittrexMethods.SUBSCRIBE_TO_EXCHANGE_DELTAS, BittrexMethods.QUERY_EXCHANGE_STATE]:
            for ticker in payload[0]:
                self.connection.corehub.server.invoke(invoke, ticker)
                # Recorded only after the hub call succeeded.
                self.invokes.append({'invoke': invoke, 'ticker': ticker})
                logger.info('Successfully subscribed to [{}] for [{}].'.format(invoke, ticker))
            return True
        elif invoke is None:
            return True
        return False

    def on_public(self, msg):
        """Route a public message into the snapshot/delta state machine.

        ``msg['invoke_type']`` selects the branch: exchange deltas are queued
        or applied, an exchange-state snapshot is stored as the ticker's book.
        """
        invoke_type = msg['invoke_type']
        if invoke_type == BittrexMethods.SUBSCRIBE_TO_EXCHANGE_DELTAS:
            ticker = msg['M']
            if ticker not in self.order_nounces:
                # First delta for this ticker: start queueing and request the
                # snapshot the queued deltas will later be replayed against.
                self.order_nounces[ticker] = deque()
                self.query_exchange_state([ticker])
            elif self.order_nounces[ticker] is not False:
                # ``False`` marks "queue already replayed"; anything else is
                # the deque that is still collecting deltas.
                self.order_nounces[ticker].append(msg)

            if ticker in self.order_books:
                pending = self.order_nounces[ticker]
                if pending:
                    self._confirm_order_book(ticker, pending)
                    # A replay that ran out of sync has dropped the ticker's
                    # state; only mark the queue as consumed when the book
                    # survived, otherwise no new snapshot would ever be
                    # requested for this ticker.
                    if ticker in self.order_books:
                        self.order_nounces[ticker] = False
                else:
                    self._sync_order_book(ticker, msg)
        elif invoke_type == BittrexMethods.QUERY_EXCHANGE_STATE:
            ticker = msg['ticker']
            self.order_books[ticker] = msg
            # Clear the first matching recorded invoke for this snapshot.
            for recorded in self.invokes:
                if recorded['ticker'] == ticker and recorded['invoke'] == invoke_type:
                    recorded['invoke'] = None
                    break

    def _confirm_order_book(self, ticker, order_nounces):
        """Replay every queued delta, in order, against the ticker's book.

        Deltas at or behind the snapshot nounce are skipped by
        ``_sync_order_book``; the following ones are applied one by one.
        Replay stops as soon as a delta is found to be out of sequence,
        because the book has then been discarded for a resync.
        """
        for delta in order_nounces:
            self._sync_order_book(ticker, delta)
            if ticker not in self.order_books:
                return

    def _sync_order_book(self, ticker, order_data):
        """Apply one delta to the ticker's order book.

        Returns:
            ``True`` if the delta was applied, or if its nounce equals the
            book's nounce (nothing to do, the next delta will sync).
            ``False`` if the delta is older than the book, if there is no book
            for ``ticker``, or if the delta is out of sequence -- in that last
            case the ticker's queue and book are dropped to force a resync.
        """
        book = self.order_books.get(ticker)
        if book is None:
            # No snapshot to sync against (e.g. a resync is in progress).
            return False

        nounce_diff = order_data['N'] - book['N']
        if nounce_diff == 1:
            book['N'] = order_data['N']
            # 'Z' is the buy side (sorted descending by price 'R'),
            # 'S' the sell side (sorted ascending).
            for side, descending in (('Z', True), ('S', False)):
                orders = book[side]
                new_orders_inserted = False
                for item in order_data[side]:
                    change_type = item['TY']
                    if change_type == 0:
                        # TYPE 0: new entry at this price -> ADD
                        orders.append({'Q': item['Q'], 'R': item['R']})
                        new_orders_inserted = True
                    elif change_type == 1:
                        # TYPE 1: cancelled / filled entry -> DELETE
                        for index, existing_order in enumerate(orders):
                            if existing_order['R'] == item['R']:
                                del orders[index]
                                break
                    elif change_type == 2:
                        # TYPE 2: changed entry (partial fill, ...) -> EDIT
                        for existing_order in orders:
                            if existing_order['R'] == item['R']:
                                existing_order['Q'] = item['Q']
                                break
                if new_orders_inserted:
                    # Appends break the price ordering; restore it.
                    book[side] = sorted(orders, key=lambda k: k['R'], reverse=descending)

            # Unix timestamp of the most recently applied nounce.
            book['timestamp'] = time()

            history = book.get('f')
            if history is None:
                history = book['f'] = []
            self._merge_completed_orders(history, order_data.get('f') or [])

            self._on_ping.on_change(ticker)
            return True
        elif nounce_diff == 0:
            # Same nounce as the book: the next nounce will trigger a sync.
            return True
        elif nounce_diff < 0:
            # The order book snapshot nounce is ahead. Discard this nounce.
            return False
        else:
            # Gap in the sequence: drop the local state so that the next
            # delta requests a new snapshot (experimental resync).
            logger.error(
                '[Subscription][{}][{}]: Out of sync. Trying to resync...'.format(BittrexMethods.QUERY_EXCHANGE_STATE,
                                                                                  ticker))
            self.order_nounces.pop(ticker, None)
            self.order_books.pop(ticker, None)
            return False

    def _merge_completed_orders(self, history, new_orders):
        """Trim ``history`` in place and prepend ``new_orders`` to it.

        Both lists are assumed to be ordered newest first, and ``'T'`` is
        assumed to be a millisecond timestamp (TODO confirm against the
        exchange payload).

        * If the entries that would be displaced by the new batch are older
          than ``TRADE_HISTORY_MIN_MS``, as many old entries as there are new
          ones are dropped from the tail.
        * If the history still reaches back ``TRADE_HISTORY_MAX_MS`` or more,
          everything older than ``TRADE_HISTORY_TRIM_TO_MS`` is dropped.
        """
        if not new_orders:
            return

        newest_ts = new_orders[0]['T']
        incoming = len(new_orders)

        if history:
            # Clamp so a history shorter than the new batch cannot raise
            # IndexError.
            drop = min(incoming, len(history))
            if newest_ts - history[-drop]['T'] > self.TRADE_HISTORY_MIN_MS:
                del history[-drop:]

        if history:
            probe = history[-min(incoming, len(history))]
            if newest_ts - probe['T'] >= self.TRADE_HISTORY_MAX_MS:
                for index, old_order in enumerate(history):
                    # Compare the entry's *age*, not its absolute timestamp,
                    # and cut from this entry to the (older) end of the list.
                    if newest_ts - old_order['T'] > self.TRADE_HISTORY_TRIM_TO_MS:
                        del history[index:]
                        break

        # Prepend the new batch, keeping its own order.
        history[:0] = new_orders

    def on_ping(self, msg):
        """Hook called with the ticker after each applied delta; no-op here.

        Subclasses override this to react to order book updates.
        """
        pass