Compare commits

...

2 Commits

Author SHA1 Message Date
b7e066b525 Merge pull request 'Refactor settings in batcher' (#2) from refactor-batcher-settings into dev
All checks were successful
CI/CD Pipeline / lint-sast (push) Successful in 2m35s
CI/CD Pipeline / deploy (push) Successful in 1m54s
Reviewed-on: http://79.137.198.26/danyadjan/db_kyc_project/pulls/2
2025-05-27 20:54:41 +03:00
9a052700dd Refactor settings in batcher 2025-05-11 10:49:39 +03:00
10 changed files with 55 additions and 46 deletions

View File

@ -1,7 +1,7 @@
#!/bin/sh
for i in $(seq 1 "${CELERY_WORKER_COUNT}"); do
celery -A clicker worker -l info --concurrency=10 -n "worker${i}@$(%h)"
celery -A clicker worker -l info --concurrency=10 -n "worker${i}"
done
celery -A clicker beat -l info

View File

@ -7,7 +7,7 @@ from starlette.exceptions import HTTPException
from app.src.routers.api import router as router_api
from app.src.routers.handlers import http_error_handler
from app.src.domain.setting import launch_consumer
from app.src.db import connect_pg, get_connection, get_channel, get_rmq
from app.src.db import connect_pg, get_connection, get_channel, get_rmq, get_pg
def get_application() -> FastAPI:
@ -31,7 +31,7 @@ app = get_application()
@app.on_event("startup")
async def startup():
launch_consumer(get_connection)
launch_consumer(connect_pg, get_connection)
app.state.pg_pool = await connect_pg()

View File

@ -2,6 +2,8 @@ DROP VIEW coefficients;
DROP TABLE clicks;
DROP TABLE settings;
DROP TABLE users;
DROP TABLE global_stat;

View File

@ -13,6 +13,11 @@ CREATE TABLE clicks(
);
CREATE INDEX clicks_user_id_time_idx ON clicks(user_id, time);
CREATE TABLE settings(
name VARCHAR(255) PRIMARY KEY,
value DECIMAL(100, 2) NOT NULL
);
CREATE MATERIALIZED VIEW coefficients AS
SELECT
user_id,

View File

@ -8,7 +8,6 @@ from ..config import RMQ_HOST, RMQ_PORT, RMQ_USER, RMQ_PASSWORD
fqdn = f'amqp://{RMQ_USER}:{str(RMQ_PASSWORD)}@{RMQ_HOST}:{RMQ_PORT}/'
logger = logging.getLogger("uvicorn")
async def get_connection() -> AbstractRobustConnection:
return await aio_pika.connect_robust(fqdn)

View File

@ -43,10 +43,10 @@ async def delete_user_info(pg: asyncpg.Connection, user_id: int) -> None:
async def click_value(pg: asyncpg.Connection, user_id: int) -> decimal.Decimal:
price_per_click = get_setting('PRICE_PER_CLICK')
day_multiplier = get_setting('DAY_MULT')
week_multiplier = get_setting('WEEK_MULT')
progress_multiplier = get_setting('PROGRESS_MULT')
price_per_click = await get_setting(pg, 'PRICE_PER_CLICK')
day_multiplier = await get_setting(pg, 'DAY_MULT')
week_multiplier = await get_setting(pg, 'WEEK_MULT')
progress_multiplier = await get_setting(pg, 'PROGRESS_MULT')
# period coefficients
day_coef = await period_coefficient(pg, user_id, 24, day_multiplier)
@ -86,15 +86,15 @@ async def _get_refresh_energy(pg: asyncpg.Connection, user_id: int, req_token: s
new_auth_date = _auth_date_from_token(req_token)
current_token = await get_user_session(pg, user_id)
if current_token is None:
session_energy = int(get_setting('SESSION_ENERGY'))
session_energy = int(await get_setting(pg, 'SESSION_ENERGY'))
await add_user(pg, user_id, req_token, session_energy)
return session_energy
if current_token != req_token:
last_auth_date = _auth_date_from_token(current_token)
session_cooldown = get_setting('SESSION_COOLDOWN')
session_cooldown = await get_setting(pg, 'SESSION_COOLDOWN')
if new_auth_date - last_auth_date < session_cooldown:
raise HTTPException(status_code=403, detail='Unauthorized')
session_energy = int(get_setting('SESSION_ENERGY'))
session_energy = int(await get_setting(pg, 'SESSION_ENERGY'))
await set_new_session(pg, user_id, req_token, session_energy)
return session_energy
else:

View File

@ -1,21 +0,0 @@
import decimal
import threading
_settings = dict()
mx = threading.Lock()
def get_setting(name: str) -> decimal.Decimal:
try:
mx.acquire()
return _settings[name]
finally:
mx.release()
def set_setting(name: str, value: decimal.Decimal):
try:
mx.acquire()
_settings[name] = value
finally:
mx.release()

View File

@ -0,0 +1,16 @@
from decimal import Decimal
from asyncpg import Connection
async def get_setting(conn: Connection, name: str) -> Decimal:
return await conn.fetchval('SELECT value FROM settings WHERE name=$1', name)
async def set_setting(conn: Connection, name: str, value: Decimal):
query = '''
INSERT INTO settings (name, value)
VALUES ($1, $2)
ON CONFLICT(name) DO UPDATE
SET value=$2
'''
await conn.execute(query, name, value)

View File

@ -2,16 +2,18 @@ import decimal
import json
import aio_pika
from typing import Callable
import asyncpg
SETTING_QUEUE_NAME = "settings"
async def consume_setting_updates(set_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.abc.AbstractChannel):
async def consume_setting_updates(pg_pool: asyncpg.Pool, set_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.abc.AbstractChannel):
queue = await chan.declare_queue(SETTING_QUEUE_NAME, durable=True)
async with queue.iterator() as queue_iter:
async for msg in queue_iter:
async with msg.process():
settings = json.loads(msg.body.decode('utf-8'))
for name, value in settings.items():
set_setting_func(name, decimal.Decimal(value))
async with pg_pool.acquire() as pg_conn:
for name, value in settings.items():
await set_setting_func(pg_conn, name, decimal.Decimal(value))

View File

@ -3,22 +3,28 @@ import threading
import asyncio
from collections.abc import Callable, Awaitable
import aio_pika
import asyncpg
from .repos.in_memory_storage import set_setting, get_setting as ims_get_setting
from .repos.pg import set_setting, get_setting as pg_get_setting
from .repos.rmq import consume_setting_updates
def get_setting(name: str) -> decimal.Decimal:
return ims_get_setting(name)
def get_setting(pg: asyncpg.Connection, name: str) -> decimal.Decimal:
return pg_get_setting(pg, name)
async def start_thread(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]], *args):
async def start_thread(connect_pg: Callable[[], Awaitable[asyncpg.Pool]], rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]], *args):
pg_pool = await connect_pg()
conn = await rmq_connect_func()
async with conn:
chan = await conn.channel()
await consume_setting_updates(set_setting, chan)
try:
async with conn:
chan = await conn.channel()
await consume_setting_updates(pg_pool, set_setting, chan)
finally:
await pg_pool.close()
def launch_consumer(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]]):
t = threading.Thread(target=asyncio.run, args=(start_thread(rmq_connect_func),))
def launch_consumer(connect_pg: Callable[[], Awaitable[asyncpg.Pool]], rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]]):
t = threading.Thread(target=asyncio.run, args=(start_thread(connect_pg, rmq_connect_func),))
t.start()