From bf2789eef37ce7d53de26c04496de3914098f3d9 Mon Sep 17 00:00:00 2001 From: Alexandru Pisarenco Date: Mon, 19 Apr 2021 23:07:42 +0200 Subject: [PATCH] update docs, add comments --- README.md | 10 +++++++--- alerter/mem_storage.py | 8 +++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8089d2d..b3e54df 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,11 @@ Then go to either of the endpoints: * http://127.0.0.1:5000/last30m/DJ40J281M584 - list of quotes from the last 30 minutes, grouped by 1 minute intervals, which contain the opening price, closing price, min and max price. #### "Hot" stream -Upon inserting every quote value, a query is run: -``` +Upon inserting every quote value, the value is directed to an in-memory data structure: `alerter.mem_storage` -``` +Uses a dictionary to organize by symbol. For each symbol there's 2 data structures: +* An ordered (by time) queue of entries. When a new entry is added to the right, old entries are removed from the left as long as they're older than 5 minutes. +* An ordered list of prices. This converts the "is it hot" question to an O(1) complexity. Deletions and insertions are slower however, but Python optimizes them well + +Notification is not implemented until the end, but it would probably work as a Kafka producer, and there can be many consumers that react to messages and send out push notifications. In case we run out of memory for this storage, horizontal scaling is possible, by partitioning the flow of data by the ISIN. +If restarts and crashes (data loss that leads to missed notifications) are feared, in-memory storage can be substituted for an ACID-compliant RDBMS with time partitioning, and parallel processes that constantly take care to create new, and remove old partitions. diff --git a/alerter/mem_storage.py b/alerter/mem_storage.py index d463208..da4ea13 100644 --- a/alerter/mem_storage.py +++ b/alerter/mem_storage.py @@ -38,17 +38,23 @@ class MemStorage: now = monotonic() q: deque[Quote] = self.__time_ordered_queues[isin] ol: List[float] = self.__value_ordered_lists[isin] - while now - q[0].time > 600: + # Delete everything that's older than 5 minutes + while now - q[0].time > 300: item = q.popleft() index = bisect.bisect_left(ol, item.price) ol.pop(index) + # Add current record at the end of the queue q.append(Quote(now, price)) + # Search where to add, and add into the ordered price list insert_at = bisect.bisect_left(ol, price) ol.insert(insert_at, price) + # Fast computation to check if instrument is "hot" min_val = ol[0] max_val = ol[-1] is_hot = (max_val - min_val) / min_val > 0.1 + + # Figure out if we need to mark it as hot (and notify), or unmark it if is_hot and isin not in self.__hot_stuff: self.__hot_stuff.add(isin) self.notify_hot(isin)