You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
94 lines
3.0 KiB
94 lines
3.0 KiB
""" Consumer
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
import websockets
|
|
from clickhouse_driver import Client
|
|
|
|
from alerter import mem_storage
|
|
from consumer.config import CONFIG
|
|
|
|
logger = logging.Logger(__name__)
|
|
|
|
|
|
class DB:
|
|
""" DB functionality
|
|
"""
|
|
def __init__(self):
|
|
self.client: Client = Client(**CONFIG["clickhouse"])
|
|
self.client.connection.force_connect()
|
|
|
|
async def add_instrument(self, *, description, isin, sign):
|
|
""" Add instrument operation to the data back-end
|
|
"""
|
|
data = (isin, description, sign, datetime.utcnow())
|
|
self.client.execute(
|
|
"""INSERT INTO symbols (isin, description, status, added_on) VALUES""",
|
|
[data])
|
|
|
|
async def add_quote(self, *, isin, price):
|
|
""" Adds a single quote to the data back-end and memory storage
|
|
"""
|
|
now = datetime.utcnow()
|
|
now_minute = now.replace(second=0, microsecond=0)
|
|
data = {
|
|
"isin": isin,
|
|
"price": int(price * 1_000_000),
|
|
"now": now,
|
|
"now_minute": now_minute
|
|
}
|
|
self.client.execute(
|
|
"""
|
|
INSERT INTO quotes
|
|
SELECT
|
|
%(isin)s AS isin,
|
|
%(now_minute)s AS minute_start,
|
|
maxState(toDateTime(%(now)s)),
|
|
argMinState(toInt64(%(price)s), toDateTime(%(now)s)),
|
|
argMaxState(toInt64(%(price)s), toDateTime(%(now)s)),
|
|
minState(toInt64(%(price)s)),
|
|
maxState(toInt64(%(price)s))
|
|
GROUP BY (isin, minute_start)
|
|
""", data)
|
|
|
|
|
|
async def get_instruments(db: DB):
|
|
""" Get instrument operations from websocket and process them
|
|
"""
|
|
uri = "ws://localhost:8080/instruments"
|
|
async with websockets.connect(uri) as websocket:
|
|
while True:
|
|
instrument_data = await websocket.recv()
|
|
operation = json.loads(instrument_data)
|
|
if operation["type"] in {"ADD", "DELETE"}:
|
|
await db.add_instrument(
|
|
isin=operation["data"]["isin"],
|
|
description=operation["data"]["description"],
|
|
sign=1 if operation["type"] == "ADD" else -1)
|
|
|
|
|
|
async def get_quotes(db: DB, mem_store: mem_storage.MemStorage):
|
|
""" Get quotes information from the websocket and process them
|
|
"""
|
|
uri = "ws://localhost:8080/quotes"
|
|
async with websockets.connect(uri) as websocket:
|
|
while True:
|
|
quote_data = await websocket.recv()
|
|
quote = json.loads(quote_data)["data"]
|
|
mem_store.process_quote(isin=quote["isin"], price=quote["price"])
|
|
await db.add_quote(isin=quote["isin"], price=quote["price"])
|
|
|
|
|
|
async def run(loop):
|
|
""" Main coroutine
|
|
"""
|
|
db = DB()
|
|
mem_store = mem_storage.MemStorage()
|
|
futures = []
|
|
futures.append(loop.create_task(get_instruments(db)))
|
|
futures.append(loop.create_task(get_quotes(db, mem_store)))
|
|
await asyncio.gather(*futures)
|