""" Consumer """ import asyncio import json import logging from datetime import datetime import websockets from clickhouse_driver import Client from alerter import mem_storage from consumer.config import CONFIG logger = logging.Logger(__name__) class DB: """ DB functionality """ def __init__(self): self.client: Client = Client(**CONFIG["clickhouse"]) self.client.connection.force_connect() async def add_instrument(self, *, description, isin, sign): """ Add instrument operation to the data back-end """ data = (isin, description, sign, datetime.utcnow()) self.client.execute( """INSERT INTO symbols (isin, description, status, added_on) VALUES""", [data]) async def add_quote(self, *, isin, price): """ Adds a single quote to the data back-end and memory storage """ now = datetime.utcnow() now_minute = now.replace(second=0, microsecond=0) data = { "isin": isin, "price": int(price * 1_000_000), "now": now, "now_minute": now_minute } self.client.execute( """ INSERT INTO quotes SELECT %(isin)s AS isin, %(now_minute)s AS minute_start, maxState(toDateTime(%(now)s)), argMinState(toInt64(%(price)s), toDateTime(%(now)s)), argMaxState(toInt64(%(price)s), toDateTime(%(now)s)), minState(toInt64(%(price)s)), maxState(toInt64(%(price)s)) GROUP BY (isin, minute_start) """, data) async def get_instruments(db: DB): """ Get instrument operations from websocket and process them """ uri = "ws://localhost:8080/instruments" async with websockets.connect(uri) as websocket: while True: instrument_data = await websocket.recv() operation = json.loads(instrument_data) if operation["type"] in {"ADD", "DELETE"}: await db.add_instrument( isin=operation["data"]["isin"], description=operation["data"]["description"], sign=1 if operation["type"] == "ADD" else -1) async def get_quotes(db: DB, mem_store: mem_storage.MemStorage): """ Get quotes information from the websocket and process them """ uri = "ws://localhost:8080/quotes" async with websockets.connect(uri) as websocket: while True: quote_data = await websocket.recv() quote = json.loads(quote_data)["data"] mem_store.process_quote(isin=quote["isin"], price=quote["price"]) await db.add_quote(isin=quote["isin"], price=quote["price"]) async def run(loop): """ Main coroutine """ db = DB() mem_store = mem_storage.MemStorage() futures = [] futures.append(loop.create_task(get_instruments(db))) futures.append(loop.create_task(get_quotes(db, mem_store))) await asyncio.gather(*futures)