Merge pull request 'Refactor settings in batcher' (#2) from refactor-batcher-settings into dev
Reviewed-on: http://79.137.198.26/danyadjan/db_kyc_project/pulls/2
This commit is contained in:
commit
b7e066b525
|
@ -1,7 +1,7 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
for i in $(seq 1 "${CELERY_WORKER_COUNT}"); do
|
for i in $(seq 1 "${CELERY_WORKER_COUNT}"); do
|
||||||
celery -A clicker worker -l info --concurrency=10 -n "worker${i}@$(%h)"
|
celery -A clicker worker -l info --concurrency=10 -n "worker${i}"
|
||||||
done
|
done
|
||||||
|
|
||||||
celery -A clicker beat -l info
|
celery -A clicker beat -l info
|
||||||
|
|
|
@ -7,7 +7,7 @@ from starlette.exceptions import HTTPException
|
||||||
from app.src.routers.api import router as router_api
|
from app.src.routers.api import router as router_api
|
||||||
from app.src.routers.handlers import http_error_handler
|
from app.src.routers.handlers import http_error_handler
|
||||||
from app.src.domain.setting import launch_consumer
|
from app.src.domain.setting import launch_consumer
|
||||||
from app.src.db import connect_pg, get_connection, get_channel, get_rmq
|
from app.src.db import connect_pg, get_connection, get_channel, get_rmq, get_pg
|
||||||
|
|
||||||
|
|
||||||
def get_application() -> FastAPI:
|
def get_application() -> FastAPI:
|
||||||
|
@ -31,7 +31,7 @@ app = get_application()
|
||||||
|
|
||||||
@app.on_event("startup")
|
@app.on_event("startup")
|
||||||
async def startup():
|
async def startup():
|
||||||
launch_consumer(get_connection)
|
launch_consumer(connect_pg, get_connection)
|
||||||
|
|
||||||
app.state.pg_pool = await connect_pg()
|
app.state.pg_pool = await connect_pg()
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,8 @@ DROP VIEW coefficients;
|
||||||
|
|
||||||
DROP TABLE clicks;
|
DROP TABLE clicks;
|
||||||
|
|
||||||
|
DROP TABLE settings;
|
||||||
|
|
||||||
DROP TABLE users;
|
DROP TABLE users;
|
||||||
|
|
||||||
DROP TABLE global_stat;
|
DROP TABLE global_stat;
|
||||||
|
|
|
@ -13,6 +13,11 @@ CREATE TABLE clicks(
|
||||||
);
|
);
|
||||||
CREATE INDEX clicks_user_id_time_idx ON clicks(user_id, time);
|
CREATE INDEX clicks_user_id_time_idx ON clicks(user_id, time);
|
||||||
|
|
||||||
|
CREATE TABLE settings(
|
||||||
|
name VARCHAR(255) PRIMARY KEY,
|
||||||
|
value DECIMAL(100, 2) NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
CREATE MATERIALIZED VIEW coefficients AS
|
CREATE MATERIALIZED VIEW coefficients AS
|
||||||
SELECT
|
SELECT
|
||||||
user_id,
|
user_id,
|
||||||
|
|
|
@ -8,7 +8,6 @@ from ..config import RMQ_HOST, RMQ_PORT, RMQ_USER, RMQ_PASSWORD
|
||||||
|
|
||||||
|
|
||||||
fqdn = f'amqp://{RMQ_USER}:{str(RMQ_PASSWORD)}@{RMQ_HOST}:{RMQ_PORT}/'
|
fqdn = f'amqp://{RMQ_USER}:{str(RMQ_PASSWORD)}@{RMQ_HOST}:{RMQ_PORT}/'
|
||||||
logger = logging.getLogger("uvicorn")
|
|
||||||
|
|
||||||
async def get_connection() -> AbstractRobustConnection:
|
async def get_connection() -> AbstractRobustConnection:
|
||||||
return await aio_pika.connect_robust(fqdn)
|
return await aio_pika.connect_robust(fqdn)
|
||||||
|
|
|
@ -43,10 +43,10 @@ async def delete_user_info(pg: asyncpg.Connection, user_id: int) -> None:
|
||||||
|
|
||||||
|
|
||||||
async def click_value(pg: asyncpg.Connection, user_id: int) -> decimal.Decimal:
|
async def click_value(pg: asyncpg.Connection, user_id: int) -> decimal.Decimal:
|
||||||
price_per_click = get_setting('PRICE_PER_CLICK')
|
price_per_click = await get_setting(pg, 'PRICE_PER_CLICK')
|
||||||
day_multiplier = get_setting('DAY_MULT')
|
day_multiplier = await get_setting(pg, 'DAY_MULT')
|
||||||
week_multiplier = get_setting('WEEK_MULT')
|
week_multiplier = await get_setting(pg, 'WEEK_MULT')
|
||||||
progress_multiplier = get_setting('PROGRESS_MULT')
|
progress_multiplier = await get_setting(pg, 'PROGRESS_MULT')
|
||||||
|
|
||||||
# period coefficients
|
# period coefficients
|
||||||
day_coef = await period_coefficient(pg, user_id, 24, day_multiplier)
|
day_coef = await period_coefficient(pg, user_id, 24, day_multiplier)
|
||||||
|
@ -86,15 +86,15 @@ async def _get_refresh_energy(pg: asyncpg.Connection, user_id: int, req_token: s
|
||||||
new_auth_date = _auth_date_from_token(req_token)
|
new_auth_date = _auth_date_from_token(req_token)
|
||||||
current_token = await get_user_session(pg, user_id)
|
current_token = await get_user_session(pg, user_id)
|
||||||
if current_token is None:
|
if current_token is None:
|
||||||
session_energy = int(get_setting('SESSION_ENERGY'))
|
session_energy = int(await get_setting(pg, 'SESSION_ENERGY'))
|
||||||
await add_user(pg, user_id, req_token, session_energy)
|
await add_user(pg, user_id, req_token, session_energy)
|
||||||
return session_energy
|
return session_energy
|
||||||
if current_token != req_token:
|
if current_token != req_token:
|
||||||
last_auth_date = _auth_date_from_token(current_token)
|
last_auth_date = _auth_date_from_token(current_token)
|
||||||
session_cooldown = get_setting('SESSION_COOLDOWN')
|
session_cooldown = await get_setting(pg, 'SESSION_COOLDOWN')
|
||||||
if new_auth_date - last_auth_date < session_cooldown:
|
if new_auth_date - last_auth_date < session_cooldown:
|
||||||
raise HTTPException(status_code=403, detail='Unauthorized')
|
raise HTTPException(status_code=403, detail='Unauthorized')
|
||||||
session_energy = int(get_setting('SESSION_ENERGY'))
|
session_energy = int(await get_setting(pg, 'SESSION_ENERGY'))
|
||||||
await set_new_session(pg, user_id, req_token, session_energy)
|
await set_new_session(pg, user_id, req_token, session_energy)
|
||||||
return session_energy
|
return session_energy
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -1,21 +0,0 @@
|
||||||
import decimal
|
|
||||||
import threading
|
|
||||||
|
|
||||||
_settings = dict()
|
|
||||||
mx = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def get_setting(name: str) -> decimal.Decimal:
|
|
||||||
try:
|
|
||||||
mx.acquire()
|
|
||||||
return _settings[name]
|
|
||||||
finally:
|
|
||||||
mx.release()
|
|
||||||
|
|
||||||
|
|
||||||
def set_setting(name: str, value: decimal.Decimal):
|
|
||||||
try:
|
|
||||||
mx.acquire()
|
|
||||||
_settings[name] = value
|
|
||||||
finally:
|
|
||||||
mx.release()
|
|
16
batcher/app/src/domain/setting/repos/pg.py
Normal file
16
batcher/app/src/domain/setting/repos/pg.py
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
from decimal import Decimal
|
||||||
|
from asyncpg import Connection
|
||||||
|
|
||||||
|
|
||||||
|
async def get_setting(conn: Connection, name: str) -> Decimal:
|
||||||
|
return await conn.fetchval('SELECT value FROM settings WHERE name=$1', name)
|
||||||
|
|
||||||
|
|
||||||
|
async def set_setting(conn: Connection, name: str, value: Decimal):
|
||||||
|
query = '''
|
||||||
|
INSERT INTO settings (name, value)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
ON CONFLICT(name) DO UPDATE
|
||||||
|
SET value=$2
|
||||||
|
'''
|
||||||
|
await conn.execute(query, name, value)
|
|
@ -2,16 +2,18 @@ import decimal
|
||||||
import json
|
import json
|
||||||
import aio_pika
|
import aio_pika
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
import asyncpg
|
||||||
|
|
||||||
SETTING_QUEUE_NAME = "settings"
|
SETTING_QUEUE_NAME = "settings"
|
||||||
|
|
||||||
async def consume_setting_updates(set_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.abc.AbstractChannel):
|
|
||||||
|
async def consume_setting_updates(pg_pool: asyncpg.Pool, set_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.abc.AbstractChannel):
|
||||||
queue = await chan.declare_queue(SETTING_QUEUE_NAME, durable=True)
|
queue = await chan.declare_queue(SETTING_QUEUE_NAME, durable=True)
|
||||||
|
|
||||||
async with queue.iterator() as queue_iter:
|
async with queue.iterator() as queue_iter:
|
||||||
async for msg in queue_iter:
|
async for msg in queue_iter:
|
||||||
async with msg.process():
|
async with msg.process():
|
||||||
settings = json.loads(msg.body.decode('utf-8'))
|
settings = json.loads(msg.body.decode('utf-8'))
|
||||||
|
async with pg_pool.acquire() as pg_conn:
|
||||||
for name, value in settings.items():
|
for name, value in settings.items():
|
||||||
set_setting_func(name, decimal.Decimal(value))
|
await set_setting_func(pg_conn, name, decimal.Decimal(value))
|
||||||
|
|
|
@ -3,22 +3,28 @@ import threading
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections.abc import Callable, Awaitable
|
from collections.abc import Callable, Awaitable
|
||||||
import aio_pika
|
import aio_pika
|
||||||
|
import asyncpg
|
||||||
|
|
||||||
from .repos.in_memory_storage import set_setting, get_setting as ims_get_setting
|
from .repos.pg import set_setting, get_setting as pg_get_setting
|
||||||
from .repos.rmq import consume_setting_updates
|
from .repos.rmq import consume_setting_updates
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_setting(name: str) -> decimal.Decimal:
|
def get_setting(pg: asyncpg.Connection, name: str) -> decimal.Decimal:
|
||||||
return ims_get_setting(name)
|
return pg_get_setting(pg, name)
|
||||||
|
|
||||||
async def start_thread(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]], *args):
|
|
||||||
|
async def start_thread(connect_pg: Callable[[], Awaitable[asyncpg.Pool]], rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]], *args):
|
||||||
|
pg_pool = await connect_pg()
|
||||||
conn = await rmq_connect_func()
|
conn = await rmq_connect_func()
|
||||||
|
try:
|
||||||
async with conn:
|
async with conn:
|
||||||
chan = await conn.channel()
|
chan = await conn.channel()
|
||||||
await consume_setting_updates(set_setting, chan)
|
await consume_setting_updates(pg_pool, set_setting, chan)
|
||||||
|
finally:
|
||||||
|
await pg_pool.close()
|
||||||
|
|
||||||
|
|
||||||
def launch_consumer(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]]):
|
def launch_consumer(connect_pg: Callable[[], Awaitable[asyncpg.Pool]], rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]]):
|
||||||
t = threading.Thread(target=asyncio.run, args=(start_thread(rmq_connect_func),))
|
t = threading.Thread(target=asyncio.run, args=(start_thread(connect_pg, rmq_connect_func),))
|
||||||
t.start()
|
t.start()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user