From 186f4f11be0a40f37cc53973d44e200e65957eb4 Mon Sep 17 00:00:00 2001 From: Alexandru Pisarenco Date: Mon, 19 Apr 2021 22:55:28 +0200 Subject: [PATCH] first commit --- .DS_Store | Bin 0 -> 6148 bytes .flake8 | 2 + .gitignore | 6 ++ .pylintrc | 27 +++++++++ .vscode/launch.json | 15 +++++ .vscode/settings.json | 3 + LICENSE | 21 +++++++ Makefile | 42 +++++++++++++ README.md | 134 +++++++++++++++++++++++++++++++++++++++++ alerter/__init__.py | 0 alerter/mem_storage.py | 56 +++++++++++++++++ clickhouse/ddl.sql | 22 +++++++ config.json.example | 9 +++ consumer/__init__.py | 0 consumer/config.py | 6 ++ consumer/consumer.py | 93 ++++++++++++++++++++++++++++ log/.gitignore | 1 + requirements.txt | 8 +++ run_consumer.py | 9 +++ test/__init__.py | 0 web/app.py | 117 +++++++++++++++++++++++++++++++++++ 21 files changed, 571 insertions(+) create mode 100644 .DS_Store create mode 100644 .flake8 create mode 100644 .gitignore create mode 100644 .pylintrc create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.md create mode 100644 alerter/__init__.py create mode 100644 alerter/mem_storage.py create mode 100644 clickhouse/ddl.sql create mode 100644 config.json.example create mode 100644 consumer/__init__.py create mode 100644 consumer/config.py create mode 100644 consumer/consumer.py create mode 100644 log/.gitignore create mode 100644 requirements.txt create mode 100644 run_consumer.py create mode 100644 test/__init__.py create mode 100644 web/app.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..002d2194fbab9f3ef491fbdfd4a6ed8e47b8673c GIT binary patch literal 6148 zcmeHK!EVz)5S?uUY=RIuAkpJqxJ4+A5K<3FR@xpaE(lj}0My#Gl~^*~C~+D=2=Z^h zAMpiz4!qeNP*LGV5sG%KnKzl8@noO1UM~@;@iLEy`b4CmjGZ3LZ-mELugH!>c7e*> zqaZ^UWT^%Q2@C`V{&xm=?LMTZShF)K>GJwLp*f|LyV!Hs87(QJ3R&k^-A7pC6Zkbo z3S_c3LHvNGsH$)kE>av<_B^X>hCcj~xy$-Z-A_q8klGlZ3bq&KN=3q<44hE)7AGML{E;--oAUk z{;>J@38X-j^5_T?*THAlYkgoB$-J&?Jwcxz|KW1~;BO5B@_6;_t(j4nNMIl^@V6M? z_k)2l#)7R!_v(OgM*v_4VJGl;o97xM7z?%@F#=J(6zEHp95IwHN8CtWEZBPV<)q~B zp`^2t6N+-(aew2|NyQ$64GaVZer15a{ULqc|BwEB{&z{i1O@^F+sT0H943cj%uV*z xE3@Ri)F5lHwDa5MxH82G0Qd log/partner-service.log 2> log/partner-service.error.log & + sleep 1 + +consumer: stop-consumer + venv/bin/python run_consumer.py > log/consumer.log 2> log/consumer.error.log & + +flask: stop-flask + FLASK_APP=web.app flask run > log/flask.log 2> log/flask.error.log & + +stop-all: stop-generator stop-consumer + +flake8: + python -m flake8 web consumer test alerter + +pylint: + python -m pylint --rcfile .pylintrc web consumer test alerter + +isort: + isort web consumer test alerter + +reformat: + yapf --parallel --recursive --in-place web consumer test alerter + +pre-commit: reformat isort pylint flake8 + +venv: + virtualenv -p python3 venv + venv/bin/pip install -r requirements.txt diff --git a/README.md b/README.md new file mode 100644 index 0000000..8089d2d --- /dev/null +++ b/README.md @@ -0,0 +1,134 @@ +# Homework assignment + +## Potential solutions +How do I do this in an easy but scalable way? (spoiler alert: it's ClickHouse) + +### DIY +Write an in-memory data aggregator that is basically a HashMap with symbols and their primary aggregated KPIs for the running minute. 
Pseudocode in Python, because I'm comfortable with Python:
```python
symbols = {}
while True:
    if minute_is_up():
        # a parallel thread reads from the queue (regular in-memory queue) and writes to the RDBMS
        send_to_queue(symbols)
    quote = endpoint.get()
    if quote.symbol not in symbols:
        symbol = Symbol(open=quote.value, min=quote.value, max=quote.value)
        symbols[quote.symbol] = symbol
    else:
        symbol = symbols[quote.symbol]
    symbol.close = quote.value  # the latest quote in the minute is the close
    if symbol.max < quote.value:
        symbol.max = quote.value
    if symbol.min > quote.value:
        symbol.min = quote.value

@route("/candlestick", method="get")
def get_candlestick(symbol, minute):
    candlestick = None
    if minute_not_in_queue(minute) and minute_is_not_now(minute):
        candlestick = get_from_postgres(symbol, minute)
    if not candlestick and minute_is_not_now(minute):
        candlestick = get_from_queue(symbol, minute)
    if not minute_is_not_now(minute):
        candlestick = get_current(symbol)
    return candlestick
```
Potential issues appear if database writes become a bottleneck: memory usage grows, leading to a myriad of problems that make this approach unappealing right now.

### Time Series DB
Time series databases are very good at storing timed events: writes are fast, queries on time ranges are fast, and they scale linearly, thanks to intelligent automatic partitioning on time fields.

TimescaleDB can be set up from a Docker image and hit the ground running. The problem is that it would fill the database with useless high-resolution data. It would work for this particular problem, but I don't think it's the best solution.

### ClickHouse
Why: if it's good enough for CERN, and it runs on a potato VM, then it's probably good enough for me.

ClickHouse supports a large variety of table engines, and for exactly the purpose described in the DIY solution there is one called [AggregatingMergeTree](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/aggregatingmergetree/). Storage requirements are modest, queries are instant, it scales well (vertically AND horizontally), and it's good enough for production.

Bonus reason: I have played around with ClickHouse enough to make people happy with fast dashboards.

### Others that crossed my mind

Flink and Spark, but they're slower and I'm rusty with the Java ecosystem.

## We need some basic infrastructure

### Scaleway Cloud (cheap and good)
Set up the Scaleway CLI on my Mac:
```bash
brew install scw
scw init
```

### Create new instance
Choosing the `DEV1-M` instance size: 4 GB of RAM at 15€ per month should be more than enough for the purpose.
```bash
# note the UUID of the instance to use later
scw instance server create type=DEV1-M zone=nl-ams-1 image=ubuntu_bionic name=tr-clickhouse ip=new
# keep checking the server status
scw instance server get zone=nl-ams-1 f40ff8bc-988c-40dd-912c-5de21032a30b -o json | jq -r ".state"
# once it's running, get the public ip address
scw instance server get zone=nl-ams-1 f40ff8bc-988c-40dd-912c-5de21032a30b -o json | jq -r ".public_ip.address"
# Good to go!
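# optionally, script the wait: poll the same status check until the instance reports "running"
# (a sketch, not part of the original steps; reuses the UUID/zone from above)
until [ "$(scw instance server get zone=nl-ams-1 f40ff8bc-988c-40dd-912c-5de21032a30b -o json | jq -r ".state")" = "running" ]; do
  sleep 5
done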
ssh root@51.15.116.117
```

### Set up ClickHouse
Following [the guide](https://clickhouse.tech/#quick-start), with some minor changes:
```bash
apt-get -y install apt-transport-https ca-certificates dirmngr
apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
echo "deb https://repo.clickhouse.tech/deb/stable/ main/" | tee /etc/apt/sources.list.d/clickhouse.list
apt-get update
apt-get install -y clickhouse-server clickhouse-client
# specify password
systemctl start clickhouse-server
```

## Solution

### Running ClickHouse

ClickHouse offers a variety of features that are really useful here:
* The `AggregatingMergeTree` table family, which keeps a state of the current aggregated data and updates it when new data arrives, without keeping all the rows.
* The `CollapsingMergeTree` table family, which lets rows cancel themselves out when marked for deletion; this is useful when symbols get deleted. A log can be kept on S3 if necessary.
* Aggregation states for all aggregation functions.

It comes with a few limitations, mainly around the stability of the Python client library in async mode and the obscure error messages it can return.

The quotes table is created by the [DDL in clickhouse/ddl.sql](clickhouse/ddl.sql).

The open/close values for each symbol are kept as `argMin` and `argMax` aggregation states keyed on the quote timestamp, within each `minute_start` bucket. The values themselves are treated as "micros" (the value * 1,000,000) and stored as integers for simplicity and speed. Alternatively, ClickHouse `Decimal` types could be used.

A similar construction is used for the `symbols` table.

### Running the consumer

* Rename `config.json.example` to `config.json` and point it at the ClickHouse native protocol port (9000, used by `clickhouse_driver`), not the HTTP one.
* Set up a virtualenv and install the requirements.
* Start the sample data generator.
* Start the consumer.

```bash
# virtualenv
make venv
# start sample data generator
make generator
# Start consumer
make consumer
```
Data should be pouring in.

### Running the web service

```bash
make flask
```
Then go to either of the endpoints:
* http://127.0.0.1:5000/instruments - a list of all instruments and their most recent quote (or null)
* http://127.0.0.1:5000/last30m/DJ40J281M584 - quotes from the last 30 minutes, grouped into 1-minute intervals, each containing the opening price, closing price, min and max price.
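For a quick sanity check, the endpoints can be queried with `curl`. This is only a sketch: it assumes the Flask dev server is running on the default port shown above and reuses the sample ISIN; the response fields are the ones built in `web/app.py`.
```bash
# list all instruments; each entry has isin, description and last_value (null until a quote arrives)
curl -s http://127.0.0.1:5000/instruments | jq .

# candlesticks for one instrument over the last 30 minutes;
# each entry has time_start, open, close, min and max
curl -s http://127.0.0.1:5000/last30m/DJ40J281M584 | jq .
```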
+ +#### "Hot" stream +Upon inserting every quote value, a query is run: +``` + +``` diff --git a/alerter/__init__.py b/alerter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alerter/mem_storage.py b/alerter/mem_storage.py new file mode 100644 index 0000000..d463208 --- /dev/null +++ b/alerter/mem_storage.py @@ -0,0 +1,56 @@ +""" In-memory fast temporary storage +""" +import bisect +from collections import defaultdict, deque +from time import monotonic +from typing import List + + +class Quote: + """ How much something costs at which time since the process started + """ + __slots__ = ["time", "price"] + + def __init__(self, time: float, price: float): + self.time = time + self.price = price + + +class MemStorage: + """ In-memory storage and notifier class + """ + def __init__(self): + # FIFO queue where old records get discarded + self.__time_ordered_queues = defaultdict(deque) + # Discarded records get looked up in an ordered list with O(log(n)) and value is removed + # ordered list is maintained ordered + self.__value_ordered_lists = defaultdict(list) + # we remember what's hot + self.__hot_stuff = set() + + def notify_hot(self, isin: str): + """ Symbol isin is "hot". Send real-time notifications + """ + + def process_quote(self, isin: str, price=float): + """ Fast in-memory storage of values + """ + now = monotonic() + q: deque[Quote] = self.__time_ordered_queues[isin] + ol: List[float] = self.__value_ordered_lists[isin] + while now - q[0].time > 600: + item = q.popleft() + index = bisect.bisect_left(ol, item.price) + ol.pop(index) + q.append(Quote(now, price)) + insert_at = bisect.bisect_left(ol, price) + ol.insert(insert_at, price) + + min_val = ol[0] + max_val = ol[-1] + is_hot = (max_val - min_val) / min_val > 0.1 + if is_hot and isin not in self.__hot_stuff: + self.__hot_stuff.add(isin) + self.notify_hot(isin) + elif not is_hot and isin in self.__hot_stuff: + self.__hot_stuff.remove(isin) diff --git a/clickhouse/ddl.sql b/clickhouse/ddl.sql new file mode 100644 index 0000000..40ed8e4 --- /dev/null +++ b/clickhouse/ddl.sql @@ -0,0 +1,22 @@ +CREATE TABLE IF NOT EXISTS traderepublic.quotes +( + isin String, + minute_start DateTime, + last_record_timestamp AggregateFunction(max, DateTime), + first_value AggregateFunction(argMin, Int64, DateTime), + last_value AggregateFunction(argMax, Int64, DateTime), + min_value AggregateFunction(min, Int64), + max_value AggregateFunction(max, Int64) +) ENGINE = AggregatingMergeTree() +PARTITION BY isin +ORDER BY (isin, minute_start); + +CREATE TABLE IF NOT EXISTS traderepublic.symbols +( + isin String, + description String, + status Int8, + added_on DateTime +) ENGINE = CollapsingMergeTree(status) +PARTITION BY isin +ORDER BY (isin, added_on) diff --git a/config.json.example b/config.json.example new file mode 100644 index 0000000..76f89c8 --- /dev/null +++ b/config.json.example @@ -0,0 +1,9 @@ +{ + "clickhouse": { + "host": "123.123.123.123", + "port": "9000", + "database": "traderepublic", + "user": "default", + "password": "" + } +} diff --git a/consumer/__init__.py b/consumer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/consumer/config.py b/consumer/config.py new file mode 100644 index 0000000..75750ec --- /dev/null +++ b/consumer/config.py @@ -0,0 +1,6 @@ +""" Simple config module +""" +import json + +with open("config.json", "r") as fp: + CONFIG = json.load(fp) diff --git a/consumer/consumer.py b/consumer/consumer.py new file mode 100644 index 0000000..9aef491 --- /dev/null +++ 
b/consumer/consumer.py @@ -0,0 +1,93 @@ +""" Consumer +""" +import asyncio +import json +import logging +from datetime import datetime + +import websockets +from clickhouse_driver import Client + +from alerter import mem_storage +from consumer.config import CONFIG + +logger = logging.Logger(__name__) + + +class DB: + """ DB functionality + """ + def __init__(self): + self.client: Client = Client(**CONFIG["clickhouse"]) + self.client.connection.force_connect() + + async def add_instrument(self, *, description, isin, sign): + """ Add instrument operation to the data back-end + """ + data = (isin, description, sign, datetime.utcnow()) + self.client.execute( + """INSERT INTO symbols (isin, description, status, added_on) VALUES""", + [data]) + + async def add_quote(self, *, isin, price): + """ Adds a single quote to the data back-end and memory storage + """ + now = datetime.utcnow() + now_minute = now.replace(second=0, microsecond=0) + data = { + "isin": isin, + "price": int(price * 1_000_000), + "now": now, + "now_minute": now_minute + } + self.client.execute( + """ + INSERT INTO quotes + SELECT + %(isin)s AS isin, + %(now_minute)s AS minute_start, + maxState(toDateTime(%(now)s)), + argMinState(toInt64(%(price)s), toDateTime(%(now)s)), + argMaxState(toInt64(%(price)s), toDateTime(%(now)s)), + minState(toInt64(%(price)s)), + maxState(toInt64(%(price)s)) + GROUP BY (isin, minute_start) + """, data) + + +async def get_instruments(db: DB): + """ Get instrument operations from websocket and process them + """ + uri = "ws://localhost:8080/instruments" + async with websockets.connect(uri) as websocket: + while True: + instrument_data = await websocket.recv() + operation = json.loads(instrument_data) + if operation["type"] in {"ADD", "DELETE"}: + await db.add_instrument( + isin=operation["data"]["isin"], + description=operation["data"]["description"], + sign=1 if operation["type"] == "ADD" else -1) + + +async def get_quotes(db: DB, mem_store: mem_storage.MemStorage): + """ Get quotes information from the websocket and process them + """ + uri = "ws://localhost:8080/quotes" + async with websockets.connect(uri) as websocket: + while True: + quote_data = await websocket.recv() + quote = json.loads(quote_data)["data"] + mem_store.process_quote(isin=quote["isin"], price=quote["price"]) + await db.add_quote(isin=quote["isin"], price=quote["price"]) + + +async def run(loop): + """ Main coroutine + """ + db = DB() + mem_store = mem_storage.MemStorage() + futures = [] + futures.append(loop.create_task(get_instruments(db))) + futures.append(loop.create_task(get_quotes(db, mem_store))) + await asyncio.gather(*futures) diff --git a/log/.gitignore b/log/.gitignore new file mode 100644 index 0000000..bf0824e --- /dev/null +++ b/log/.gitignore @@ -0,0 +1 @@ +*.log \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2f118a5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +aioch==0.0.2 +flake8==3.9.1 +flask==1.1.2 +isort==5.8.0 +pylint==2.7.4 +pylint-quotes==0.2.1 +websockets==8.1 +yapf==0.31.0 diff --git a/run_consumer.py b/run_consumer.py new file mode 100644 index 0000000..8869e37 --- /dev/null +++ b/run_consumer.py @@ -0,0 +1,9 @@ +""" Main entrypoint +""" +import asyncio + +from consumer import consumer + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(consumer.run(loop)) diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/web/app.py b/web/app.py new file 
mode 100644 index 0000000..92aafd9 --- /dev/null +++ b/web/app.py @@ -0,0 +1,117 @@ +""" Web service app +""" +import json +from datetime import datetime, timedelta + +import pytz +from clickhouse_driver import Client +from flask import Flask + +app = Flask(__name__) + +with open("config.json", "r") as fp: + config = json.load(fp) +client = Client(**config["clickhouse"]) + + +@app.route("/instruments") +def get_instruments(): + """ Get list of instruments with the latest quote + """ + data = client.execute(""" + SELECT + isin, + description, + last_value + FROM ( + SELECT + isin, + argMax(description, added_on) AS description, + argMax(status, added_on) AS status + FROM symbols FINAL + GROUP BY isin + ) ndsymbols + LEFT JOIN ( + SELECT + isin, + toString(argMax(last_value, minute_start)) AS last_value + FROM ( + SELECT + isin, minute_start, + maxMerge(last_record_timestamp) AS last_record_timestamp, + argMinMerge(first_value)/1000000.0 AS first_value, + argMaxMerge(last_value)/1000000.0 AS last_value, + minMerge(min_value)/1000000.0 AS min_value, + maxMerge(max_value)/1000000.0 AS max_value + FROM traderepublic.quotes + GROUP BY (isin, minute_start) + ) aquotes + GROUP BY isin + ) vals + ON vals.isin=ndsymbols.isin +WHERE ndsymbols.status=1 +ORDER BY isin + """) + response_rows = [] + for row in data: + response_rows.append({ + "isin": + row[0], + "description": + row[1], + "last_value": + float(row[2]) if row[2] != "" else None + }) + return {"data": response_rows} + + +@app.route("/last30m/") +def get_last30m(isin: str): + """ Get last 30 minutes of candlestick data with 1 minute resolution + """ + time30m = datetime.utcnow() - timedelta(minutes=30) + time30m = time30m.replace(second=0, microsecond=0) + data = client.execute( + """ + SELECT + minute_start, + first_value, + last_value, + min_value, + max_value + FROM ( + SELECT + isin, minute_start, + argMinMerge(first_value)/1000000.0 AS first_value, + argMaxMerge(last_value)/1000000.0 AS last_value, + minMerge(min_value)/1000000.0 AS min_value, + maxMerge(max_value)/1000000.0 AS max_value + FROM traderepublic.quotes + WHERE isin=%(isin)s AND minute_start > %(time30m)s + GROUP BY (isin, minute_start) + ORDER BY minute_start + ) aquotes + """, { + "isin": isin, + "time30m": time30m, + }) + if not data: + return {} + result = {"isin": isin, "candlesticks": []} + candlesticks = result["candlesticks"] + timezone = pytz.timezone("UTC") + for row in data: + time_start = row[0].replace(tzinfo=timezone) + candlesticks.append({ + "time_start": + time_start.strftime("%Y-%m-%d %H:%M:%S%z"), + "open": + row[1], + "close": + row[2], + "min": + row[3], + "max": + row[4] + }) + return result