You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

94 lines
3.0 KiB

""" Consumer
"""
import asyncio
import json
import logging
from datetime import datetime
import websockets
from clickhouse_driver import Client
from alerter import mem_storage
from consumer.config import CONFIG
# Obtain the module logger through the logging registry. A directly
# instantiated ``logging.Logger`` is not attached to the logger hierarchy,
# so handlers and levels configured on its ancestors would not apply to it.
logger = logging.getLogger(__name__)
class DB:
    """ DB functionality

    Wraps a ``clickhouse_driver`` client built from ``CONFIG["clickhouse"]``
    and exposes the two write operations used by the consumer.
    """

    # Prices are stored as integers: the incoming price multiplied by this
    # factor.
    PRICE_SCALE = 1_000_000

    def __init__(self):
        self.client: Client = Client(**CONFIG["clickhouse"])
        # Connect right away instead of on first query
        # (presumably so connection problems surface at construction time).
        self.client.connection.force_connect()

    async def add_instrument(self, *, description, isin, sign):
        """ Add instrument operation to the data back-end

        Inserts one row into ``symbols``.

        :param description: value for the ``description`` column
        :param isin: value for the ``isin`` column
        :param sign: value for the ``status`` column (callers in this module
            pass 1 for "ADD" and -1 for "DELETE")
        """
        # NOTE(review): ``execute`` is called without ``await``, so the insert
        # completes before this coroutine yields control.
        row = (isin, description, sign, datetime.utcnow())
        self.client.execute(
            """INSERT INTO symbols (isin, description, status, added_on) VALUES""",
            [row])

    async def add_quote(self, *, isin, price):
        """ Adds a single quote to the data back-end

        The quote is inserted into ``quotes`` as aggregate-function states
        keyed by instrument and by the start of the current minute.

        :param isin: instrument identifier of the quote
        :param price: quote price; stored as an integer scaled by
            ``PRICE_SCALE``
        """
        now = datetime.utcnow()
        now_minute = now.replace(second=0, microsecond=0)
        data = {
            "isin": isin,
            # Round to nearest rather than truncate: the float product can
            # land just below the intended integer, and int() alone would
            # then store a value that is one unit too low.
            "price": int(round(price * self.PRICE_SCALE)),
            "now": now,
            "now_minute": now_minute
        }
        # NOTE(review): ``execute`` is called without ``await``, so the insert
        # completes before this coroutine yields control.
        self.client.execute(
            """
INSERT INTO quotes
SELECT
%(isin)s AS isin,
%(now_minute)s AS minute_start,
maxState(toDateTime(%(now)s)),
argMinState(toInt64(%(price)s), toDateTime(%(now)s)),
argMaxState(toInt64(%(price)s), toDateTime(%(now)s)),
minState(toInt64(%(price)s)),
maxState(toInt64(%(price)s))
GROUP BY (isin, minute_start)
""", data)
async def get_instruments(db: DB):
    """ Stream instrument operations from the websocket into the data back-end

    Runs until the connection fails or the task is cancelled. Messages whose
    "type" is neither "ADD" nor "DELETE" are ignored.
    """
    endpoint = "ws://localhost:8080/instruments"
    async with websockets.connect(endpoint) as connection:
        while True:
            message = json.loads(await connection.recv())
            kind = message["type"]
            if kind not in {"ADD", "DELETE"}:
                continue
            payload = message["data"]
            # An addition is recorded with sign 1, a deletion with sign -1.
            if kind == "ADD":
                sign = 1
            else:
                sign = -1
            await db.add_instrument(
                isin=payload["isin"],
                description=payload["description"],
                sign=sign)
async def get_quotes(db: DB, mem_store: mem_storage.MemStorage):
    """ Stream quotes from the websocket into memory storage and the data back-end

    Runs until the connection fails or the task is cancelled. Each quote is
    handed to ``mem_store`` first and then written through ``db``.
    """
    endpoint = "ws://localhost:8080/quotes"
    async with websockets.connect(endpoint) as connection:
        while True:
            payload = json.loads(await connection.recv())["data"]
            isin, price = payload["isin"], payload["price"]
            mem_store.process_quote(isin=isin, price=price)
            await db.add_quote(isin=isin, price=price)
async def run(loop):
    """ Main coroutine

    Creates the shared DB wrapper and memory storage, schedules the
    instrument and quote consumers on ``loop`` and waits for both.
    """
    db = DB()
    mem_store = mem_storage.MemStorage()
    tasks = [
        loop.create_task(get_instruments(db)),
        loop.create_task(get_quotes(db, mem_store)),
    ]
    await asyncio.gather(*tasks)