# Homework assignment

## Potential solutions

How do I do this in an easy but scalable way? (Spoiler alert: it's ClickHouse.)

### DIY

Write an in-memory data aggregator that is basically a HashMap of symbols to their primary aggregated KPIs (open, close, min, max) for the running minute.

Pseudocode in Python, because I'm comfortable with Python:

```python
from dataclasses import dataclass

# minute_is_up, minute_is_now, send_to_queue, endpoint, route, get_current,
# get_from_queue and get_from_postgres are placeholders for the clock,
# queue, quote feed, web framework and storage layers.


@dataclass
class Symbol:
    open: float
    close: float
    min: float
    max: float


symbols = {}
while True:
    if minute_is_up():
        # parallel thread reads from queue (regular in-memory queue) and writes to RDBMS
        send_to_queue(symbols)
        symbols = {}  # start aggregating the new minute from scratch

    quote = endpoint.get()
    symbol = symbols.get(quote.symbol)
    if symbol is None:
        # the first quote of the minute is its open, close, min and max
        symbols[quote.symbol] = Symbol(quote.value, quote.value, quote.value, quote.value)
    else:
        symbol.close = quote.value  # the latest quote so far is the close
        symbol.max = max(symbol.max, quote.value)
        symbol.min = min(symbol.min, quote.value)


@route("/candlestick", method="get")
def get_candlestick(symbol, minute):
    if minute_is_now(minute):
        return get_current(symbol)  # the running minute only lives in memory
    # finished minutes wait in the queue until they are written to the database
    candlestick = get_from_queue(symbol, minute)
    if candlestick is None:
        candlestick = get_from_postgres(symbol, minute)
    return candlestick
```

If database writes become a bottleneck, the queue grows and memory usage grows with it, leading to a myriad of problems that make this approach unappealing for now.

### Time Series DB

Time series databases are very good at storing timed events. Writes are fast, and so are queries on time ranges. They scale well, thanks to automatic partitioning on the time field.

TimescaleDB can be set up from a Docker image, so you can hit the ground running.

The problem is that it would fill up the database with raw, high-resolution data that we don't need. It would work well for this particular problem, but I don't think it's the best solution.

### ClickHouse

Why: if it's good enough for CERN, and it runs on a potato VM, then it's probably good enough for me.
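What makes ClickHouse a good fit is that a one-minute candlestick can be stored as a small aggregation state that merges with other states, instead of as raw quotes. As a rough illustration in plain Python (not ClickHouse code; `CandleState`, `from_quote` and `merge` are hypothetical names), with prices as integer micros: the open and close behave like `argMin`/`argMax` of the price by timestamp, and min/max merge trivially.

```python
from __future__ import annotations

from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class CandleState:
    """Hypothetical mergeable state for one symbol's one-minute candlestick."""

    first: tuple[int, int]  # (timestamp, price) with the smallest timestamp -> open
    last: tuple[int, int]   # (timestamp, price) with the largest timestamp -> close
    low: int                # min price, in micros
    high: int               # max price, in micros

    @staticmethod
    def from_quote(ts: int, price_micros: int) -> CandleState:
        # a single quote is its own open, close, min and max
        point = (ts, price_micros)
        return CandleState(point, point, price_micros, price_micros)

    def merge(self, other: CandleState) -> CandleState:
        # argMin/argMax by timestamp for open/close, plain min/max for the range
        return CandleState(
            first=min(self.first, other.first, key=lambda p: p[0]),
            last=max(self.last, other.last, key=lambda p: p[0]),
            low=min(self.low, other.low),
            high=max(self.high, other.high),
        )


# (timestamp, price in micros), arriving out of order
quotes = [(3, 101_500_000), (1, 100_000_000), (2, 99_250_000), (4, 100_750_000)]

# two partial states built from separate batches, merged later
batch_a = reduce(CandleState.merge, (CandleState.from_quote(t, p) for t, p in quotes[:2]))
batch_b = reduce(CandleState.merge, (CandleState.from_quote(t, p) for t, p in quotes[2:]))
candle = batch_a.merge(batch_b)
print(candle.first[1], candle.last[1], candle.low, candle.high)
# prints: 100000000 100750000 99250000 101500000
```

Because `merge` is associative, partial states can be combined in any grouping and still yield the same candlestick, which is what lets a database fold states together in background merges instead of keeping every row.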
ClickHouse supports a large variety of table engines, and for the aggregation described in the Python DIY solution there is one called [AggregatingMergeTree](https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/aggregatingmergetree/). Storage requirements are modest, queries are fast, it scales well (vertically AND horizontally), and it's good enough for production.

Bonus reason: I've played around with ClickHouse enough to make people happy with fast dashboards.

### Others that crossed my mind

Flink and Spark, but they're slower and I'm rusty with the Java ecosystem.

## We need some basic infrastructure

### Scaleway Cloud (cheap and good)

Set up the Scaleway CLI on my Mac:

```bash
brew install scw
scw init
```

### Create a new instance

I chose the `DEV1-M` instance size: 4 GB of RAM for 15€ per month should be more than enough for the purpose.

```bash
# note the UUID of the instance to use later
scw instance server create type=DEV1-M zone=nl-ams-1 image=ubuntu_bionic name=tr-clickhouse ip=new

# keep checking the server status
scw instance server get zone=nl-ams-1 f40ff8bc-988c-40dd-912c-5de21032a30b -o json | jq -r ".state"

# once it's running, get the public IP address
scw instance server get zone=nl-ams-1 f40ff8bc-988c-40dd-912c-5de21032a30b -o json | jq -r ".public_ip.address"

# Good to go!
ssh root@51.15.116.117
```

### Set up ClickHouse

Following [the guide](https://clickhouse.tech/#quick-start), with some minor changes:

```bash
apt-get -y install apt-transport-https ca-certificates dirmngr
apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
echo "deb https://repo.clickhouse.tech/deb/stable/ main/" | tee /etc/apt/sources.list.d/clickhouse.list
apt-get update
apt-get install -y clickhouse-server clickhouse-client  # specify a password when prompted
systemctl start clickhouse-server
```

## Solution

### Running ClickHouse

ClickHouse offers a variety of features that will be really useful:

* The `AggregatingMergeTree` table engine, which stores intermediate aggregation states instead of raw rows: rows with the same sorting key are combined into a single row of merged states during background merges, so not all the individual rows are kept.
* The `CollapsingMergeTree` table engine, which deletes rows by collapsing them: inserting a row with the same sorting key and its sign column set to `-1` cancels out the matching `1` row during background merges. This can be useful when symbols get deleted. A log can be kept on S3 if necessary.
* Aggregation states for all aggregate functions (via the `-State` and `-Merge` combinators).

It comes with a few limitations, mainly to do with the stability of the Python client libraries in async mode, and with obscure error messages.

The [DDL](clickhouse/ddl.sql) creates the quotes table. The open/close values for each symbol are kept as `argMin` and `argMax` aggregation states based on the value of the `minute_start` field. The values themselves are treated as "micros" (the value × 1,000,000) and stored as integers for simplicity and speed. Alternatively, numeric (`Decimal`) types can be used.

A similar construction is used for the `symbols` table.

### Running the consumer

* Rename and set up `config.json.example` to connect to ClickHouse through the DB API (not HTTP).
* Set up a virtualenv and install the requirements
* Start the sample data generator
* Start the consumer

```bash
# virtualenv
make venv
# start the sample data generator
make generator
# start the consumer
make consumer
```

Data should be pouring in.

### Running the web service

```bash
make flask
```

Then go to either of the endpoints:

* http://127.0.0.1:5000/instruments - list of all instruments and their most recent quote (or null)
* http://127.0.0.1:5000/last30m/DJ40J281M584 - list of quotes from the last 30 minutes, grouped into 1-minute intervals, each with the opening price, closing price, min and max price

#### "Hot" stream

On insertion, every quote value is also directed to an in-memory data structure: [alerter.mem_storage](alerter/mem_storage.py). It uses a dictionary to organize entries by symbol, and for each symbol there are 2 data structures:

* An ordered (by time) queue of entries. When a new entry is added on the right, old entries are removed from the left as long as they're older than 5 minutes.
* An ordered list of prices. This turns the "is it hot" question into an O(1) check, since the extremes sit at the two ends of the list. Insertions and deletions are slower (O(n) each, because elements have to shift), but Python handles them well.

Notification is not fully implemented, but it would probably work as a Kafka producer, with many consumers reacting to messages and sending out push notifications.

If we run out of memory for this storage, horizontal scaling is possible by partitioning the flow of data by ISIN.

If restarts and crashes (data loss that leads to missed notifications) are a concern, the in-memory storage can be replaced with an ACID-compliant RDBMS with time partitioning, plus parallel processes that continually create new partitions and remove old ones.
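The per-symbol "hot" storage described above can be sketched with a `deque` for the time-ordered window and `bisect` for the sorted price list. This is a minimal sketch, not the code in `alerter/mem_storage.py`: `SymbolWindow`, `storage` and the definition of "hot" (more than a 10% spread between min and max within the window, `HOT_THRESHOLD`) are hypothetical assumptions, and timestamps are assumed to arrive in non-decreasing order.

```python
from bisect import bisect_left, insort
from collections import defaultdict, deque

WINDOW_SECONDS = 5 * 60
HOT_THRESHOLD = 0.10  # hypothetical: "hot" = more than a 10% spread within the window


class SymbolWindow:
    def __init__(self) -> None:
        self.entries = deque()  # (timestamp, price) in time order
        self.prices = []        # the same prices, kept sorted

    def add(self, ts: float, price: int) -> None:
        self.entries.append((ts, price))
        insort(self.prices, price)  # O(log n) search, O(n) shift
        # drop entries older than 5 minutes from the left
        while self.entries and self.entries[0][0] < ts - WINDOW_SECONDS:
            _, old_price = self.entries.popleft()
            # remove exactly one occurrence of the expired price
            del self.prices[bisect_left(self.prices, old_price)]

    def is_hot(self) -> bool:
        if not self.prices or self.prices[0] <= 0:
            return False
        # O(1): min and max are the two ends of the sorted list
        low, high = self.prices[0], self.prices[-1]
        return (high - low) / low > HOT_THRESHOLD


# one window per symbol, created on first use
storage = defaultdict(SymbolWindow)

w = storage["DJ40J281M584"]
w.add(0, 100_000_000)
w.add(60, 105_000_000)
print(w.is_hot())  # False: 5% spread
w.add(120, 112_000_000)
print(w.is_hot())  # True: 12% spread
w.add(500, 111_000_000)  # expires the entries at t=0, 60 and 120
print(w.is_hot())  # False: only one price left in the window
```

Keeping the sorted list alongside the queue trades slower inserts and deletes for a constant-time check on every incoming quote, which is the trade-off described above.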