From 504adfb2636f39751aca863fb83191be9f69a576 Mon Sep 17 00:00:00 2001
From: Michail Kostochka
Date: Sun, 20 Oct 2024 13:27:14 +0300
Subject: [PATCH] Batcher repos & usecase for click

---
 .gitignore                                  |   1 +
 batcher/Pipfile                             |  11 ++
 batcher/app/__init__.py                     |   0
 batcher/app/src/__init__.py                 |   0
 batcher/app/src/domain/__init__.py          |   0
 batcher/app/src/domain/click/__init__.py    |   0
 batcher/app/src/domain/click/models.py      |   9 +
 .../app/src/domain/click/repos/__init__.py  |   0
 batcher/app/src/domain/click/repos/pg.py    |  50 ++++++
 batcher/app/src/domain/click/repos/redis.py | 169 ++++++++++++++++++
 batcher/app/src/domain/click/repos/rmq.py   |  29 +++
 batcher/app/src/domain/click/schemas.py     |  18 ++
 batcher/app/src/domain/click/usecase.py     | 137 ++++++++++++++
 13 files changed, 424 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 batcher/Pipfile
 create mode 100644 batcher/app/__init__.py
 create mode 100644 batcher/app/src/__init__.py
 create mode 100644 batcher/app/src/domain/__init__.py
 create mode 100644 batcher/app/src/domain/click/__init__.py
 create mode 100644 batcher/app/src/domain/click/models.py
 create mode 100644 batcher/app/src/domain/click/repos/__init__.py
 create mode 100644 batcher/app/src/domain/click/repos/pg.py
 create mode 100644 batcher/app/src/domain/click/repos/redis.py
 create mode 100644 batcher/app/src/domain/click/repos/rmq.py
 create mode 100644 batcher/app/src/domain/click/schemas.py
 create mode 100644 batcher/app/src/domain/click/usecase.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
diff --git a/batcher/Pipfile b/batcher/Pipfile
new file mode 100644
index 0000000..645a67e
--- /dev/null
+++ b/batcher/Pipfile
@@ -0,0 +1,11 @@
+[[source]]
+url = "https://pypi.org/simple"
+verify_ssl = true
+name = "pypi"
+
+[packages]
+
+[dev-packages]
+
+[requires]
+python_version = "3.12"
diff --git a/batcher/app/__init__.py b/batcher/app/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/batcher/app/src/__init__.py b/batcher/app/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/batcher/app/src/domain/__init__.py b/batcher/app/src/domain/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/batcher/app/src/domain/click/__init__.py b/batcher/app/src/domain/click/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/batcher/app/src/domain/click/models.py b/batcher/app/src/domain/click/models.py
new file mode 100644
index 0000000..300b121
--- /dev/null
+++ b/batcher/app/src/domain/click/models.py
@@ -0,0 +1,9 @@
+import datetime
+import decimal
+import pydantic
+
+
+class Click(pydantic.BaseModel):
+    UserID: int
+    DateTime: datetime.datetime
+    Value: decimal.Decimal
diff --git a/batcher/app/src/domain/click/repos/__init__.py b/batcher/app/src/domain/click/repos/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/batcher/app/src/domain/click/repos/pg.py b/batcher/app/src/domain/click/repos/pg.py
new file mode 100644
index 0000000..b1ad83c
--- /dev/null
+++ b/batcher/app/src/domain/click/repos/pg.py
@@ -0,0 +1,50 @@
+from datetime import datetime, timedelta
+import decimal
+from asyncpg import Connection
+
+from ..models import Click
+
+
+async def update_click_expiry(conn: Connection, user_id: int, period: int) -> decimal.Decimal:
+    cur_time = datetime.now()
+    cutoff_time = cur_time - timedelta(hours=period)
+    query = '''
+    WITH expired_values AS(
+        UPDATE clicks
+        SET expiry_info=jsonb_set(expiry_info, $1, 'true')
+        WHERE 1=1
+        AND time < $2
+        AND user_id = $3
+        AND not (expiry_info->>$4)::bool
+        RETURNING value
+    )
+    SELECT COALESCE(SUM(value), 0)
+    FROM expired_values
+    ;
+    '''
+    period_key = f'period_{period}'
+    return await conn.fetchval(query, [period_key], cutoff_time, user_id, period_key)
+
+
+async def store(conn: Connection, click: Click) -> int:
+    query = '''
+    INSERT INTO clicks(user_id, time, value, expiry_info)
+    VALUES($1, $2, $3, '{"period_24": false, "period_168": false}')
+    RETURNING id
+    ;
+    '''
+    return await conn.fetchval(query, click.UserID, click.DateTime, click.Value)
+
+
+async def bulk_store_copy(conn: Connection, click: Click, count: int) -> None:
+    args = [(click.UserID, click.DateTime, click.Value) for _ in range(count)]
+    query = '''
+    INSERT INTO clicks(user_id, time, value, expiry_info)
+    VALUES($1, $2, $3, '{"period_24": false, "period_168": false}')
+    ;
+    '''
+    await conn.executemany(query, args)
+
+
+async def delete_by_user_id(conn: Connection, user_id: int):
+    await conn.execute('DELETE FROM clicks WHERE user_id=$1', user_id)
diff --git a/batcher/app/src/domain/click/repos/redis.py b/batcher/app/src/domain/click/repos/redis.py
new file mode 100644
index 0000000..5a64365
--- /dev/null
+++ b/batcher/app/src/domain/click/repos/redis.py
@@ -0,0 +1,169 @@
+import decimal
+from typing import Optional, List, Tuple
+
+import redis.asyncio as redis
+
+
+async def get_period_sum(r: redis.Redis, user_id: int, period: int) -> decimal.Decimal:
+    sum_str = await r.get(f'period_{period}_user_{user_id}')
+    if sum_str is None:
+        return decimal.Decimal(0)
+    return decimal.Decimal(sum_str)
+
+
+async def incr_period_sum(r: redis.Redis, user_id: int, _period: int, value: decimal.Decimal) -> decimal.Decimal:
+    return await r.incrbyfloat(f'period_{_period}_user_{user_id}', float(value))
+
+
+async def get_max_period_sum(r: redis.Redis, _period: int) -> decimal.Decimal:
+    max_sum_str = await r.get(f'max_period_{_period}')
+    if max_sum_str is None:
+        return decimal.Decimal(0)
+    return decimal.Decimal(max_sum_str)
+
+
+async def compare_max_period_sum(r: redis.Redis, _period: int, _sum: decimal.Decimal) -> None:
+    _script = r.register_script('''
+    local currentValue = tonumber(redis.call('GET', KEYS[1]))
+    local cmpValue = tonumber(ARGV[1])
+    if not currentValue or cmpValue > currentValue then
+        redis.call('SET', KEYS[1], ARGV[1])
+        return cmpValue
+    else
+        return currentValue
+    end
+    ''')
+    await _script(keys=[f'max_period_{_period}'], args=[str(_sum)])
+
+
+async def get_energy(r: redis.Redis, user_id: int) -> int:
+    energy_str = await r.get(f'energy_{user_id}')
+    if energy_str is None:
+        return 0
+    return int(energy_str)
+
+
+async def set_energy(r: redis.Redis, user_id: int, energy: int) -> None:
+    await r.set(f'energy_{user_id}', energy)
+
+
+async def decr_energy(r: redis.Redis, user_id: int, amount: int) -> Tuple[int, int]:
+    _script = r.register_script('''
+    local energy = tonumber(redis.call('GET', KEYS[1]))
+    local delta = tonumber(ARGV[1])
+    if (not energy) or energy < delta then
+        redis.call('SET', KEYS[1], 0)
+        return {0, energy or 0}
+    else
+        local newEnergy = tonumber(redis.call('DECRBY', KEYS[1], ARGV[1]))
+        return {newEnergy, delta}
+    end
+    ''')
+    new_energy, spent = map(int, await _script(keys=[f'energy_{user_id}'], args=[amount]))
+    return new_energy, spent
+
+
+async def get_global_average(r: redis.Redis) -> decimal.Decimal:
+    avg_str = await r.get('global_average')
+    if avg_str is None:
+        return decimal.Decimal(0)
+    return decimal.Decimal(avg_str)
+
+
+async def update_global_average(r: redis.Redis, value_to_add: decimal.Decimal) -> decimal.Decimal:
+    _script = r.register_script('''
+    local delta = tonumber(ARGV[1]) / tonumber(redis.call('GET', KEYS[1]))
+    return redis.call('INCRBYFLOAT', KEYS[2], delta)
+    ''')
+    return decimal.Decimal(await _script(keys=["user_count", "global_average"], args=[float(value_to_add)]))
+
+
+async def get_user_total(r: redis.Redis, user_id: int) -> decimal.Decimal:
+    total_str = await r.get(f'total_{user_id}')
+    if total_str is None:
+        return decimal.Decimal(0)
+    return decimal.Decimal(total_str)
+
+
+async def incr_user_count_if_no_clicks(r: redis.Redis, user_id: int) -> int:
+    _script = r.register_script('''
+    local clickCount = tonumber(redis.call('GET', KEYS[1]))
+    local userCount = tonumber(redis.call('GET', KEYS[2]))
+    if (not clickCount) then
+        local oldUserCount = redis.call('GET', KEYS[2])
+        if (not oldUserCount) then
+            redis.call('SET', KEYS[2], 1)
+            redis.call('SET', KEYS[3], 0)
+            return 1
+        end
+        userCount = tonumber(redis.call('INCR', KEYS[2]))
+        oldUserCount = tonumber(oldUserCount)
+        local globalAverage = tonumber(redis.call('GET', KEYS[3]))
+        redis.call('SET', KEYS[3], globalAverage / userCount * oldUserCount)
+    end
+    return userCount
+    ''')
+    return int(await _script(keys=[f'total_{user_id}', 'user_count', 'global_average'], args=[]))
+
+
+async def incr_user_total(r: redis.Redis, user_id: int, value: decimal.Decimal) -> decimal.Decimal:
+    return await r.incrbyfloat(f'total_{user_id}', float(value))
+
+
+async def get_user_session(r: redis.Redis, user_id: int) -> Optional[str]:
+    return await r.get(f'session_{user_id}')
+
+
+async def set_user_session(r: redis.Redis, user_id: int, token: str) -> None:
+    await r.set(f'session_{user_id}', token, ex=30 * 60)
+
+
+async def get_user_count(r: redis.Redis) -> int:
+    user_count_str = await r.get('user_count')
+    if user_count_str is None:
+        return 0
+    return int(user_count_str)
+
+
+async def incr_user_count(r: redis.Redis) -> int:
+    _script = r.register_script('''
+    local oldCount = redis.call('GET', KEYS[1])
+    if (not oldCount) then
+        redis.call('SET', KEYS[1], 1)
+        redis.call('SET', KEYS[2], 0)
+        return 1
+    end
+    local newCount = tonumber(redis.call('INCR', KEYS[1]))
+    local globalAverage = tonumber(redis.call('GET', KEYS[2]))
+    redis.call('SET', KEYS[2], globalAverage / newCount * oldCount)
+    return newCount
+    ''')
+    return int(await _script(keys=['user_count', 'global_average'], args=[]))
+
+
+async def delete_user_info(r: redis.Redis, user_id: int, periods: List[int]):
+    _script = r.register_script('''
+    local userTotal = redis.call('GET', KEYS[3])
+    if (not userTotal) then
+        return
+    end
+    local oldUserCount = tonumber(redis.call('GET', KEYS[1]))
+    local newUserCount = tonumber(redis.call('DECR', KEYS[1]))
+    local globalAverage = tonumber(redis.call('GET', KEYS[2]))
+    redis.call('SET', KEYS[2], (globalAverage * oldUserCount - userTotal) / newUserCount)
+    for i, v in ipairs(KEYS) do
+        if (i > 2) then
+            redis.call('DEL', v)
+        end
+    end
+    ''')
+    keys = [
+        'user_count',
+        'global_average',
+        f'total_{user_id}',
+        f'energy_{user_id}',
+        f'session_{user_id}',
+    ]
+    for period in periods:
+        keys.append(f'period_{period}_user_{user_id}')
+    await _script(keys=keys, args=[])
diff --git a/batcher/app/src/domain/click/repos/rmq.py b/batcher/app/src/domain/click/repos/rmq.py
new file mode 100644
index 0000000..01cdece
--- /dev/null
+++ b/batcher/app/src/domain/click/repos/rmq.py
@@ -0,0 +1,29 @@
+import json
+import kombu
+import uuid
+
+from ..models import Click
+
+
+CELERY_QUEUE_NAME = "celery"
+SETTING_QUEUE_NAME = "settings"
+CLICK_TASK_NAME = "clicks.celery.click.handle_click"
+SETTING_TASK_NAME = "misc.celery.deliver_setting.deliver_setting"
+
+
+def send_click_batch_copy(conn: kombu.Connection, click: Click, count: int):
+    producer = kombu.Producer(conn)
+    producer.publish(
+        json.dumps({
+            'id': str(uuid.uuid4()),
+            'task': CLICK_TASK_NAME,
+            'args': [click.UserID, int(click.DateTime.timestamp() * 1e3), str(click.Value), count],
+            'kwargs': dict(),
+        }),
+        routing_key=CELERY_QUEUE_NAME,
+        delivery_mode='persistent',
+        mandatory=False,
+        immediate=False,
+        content_type='application/json',
+        serializer='json',
+    )
diff --git a/batcher/app/src/domain/click/schemas.py b/batcher/app/src/domain/click/schemas.py
new file mode 100644
index 0000000..717c377
--- /dev/null
+++ b/batcher/app/src/domain/click/schemas.py
@@ -0,0 +1,18 @@
+import decimal
+
+import pydantic
+
+from .models import Click
+
+
+class ClickResponse(pydantic.BaseModel):
+    click: Click
+    energy: int
+
+
+class ClickValueResponse(pydantic.BaseModel):
+    value: decimal.Decimal
+
+
+class BatchClickRequest(pydantic.BaseModel):
+    count: int
diff --git a/batcher/app/src/domain/click/usecase.py b/batcher/app/src/domain/click/usecase.py
new file mode 100644
index 0000000..4c87148
--- /dev/null
+++ b/batcher/app/src/domain/click/usecase.py
@@ -0,0 +1,137 @@
+from datetime import datetime
+import decimal
+from typing import Tuple
+import aiohttp
+import redis.asyncio as redis
+import kombu
+import asyncpg
+
+from .repos.redis import (
+    get_period_sum, incr_period_sum, get_max_period_sum, get_user_total, get_global_average,
+    incr_user_count_if_no_clicks, update_global_average, incr_user_total, compare_max_period_sum,
+    delete_user_info as r_delete_user_info, get_user_session, set_user_session, set_energy, get_energy as r_get_energy,
+    decr_energy,
+)
+from .repos.pg import update_click_expiry, bulk_store_copy, delete_by_user_id
+from .repos.rmq import send_click_batch_copy
+
+from .models import Click
+
+
+SETTING_DICT = {
+    'PRICE_PER_CLICK': decimal.Decimal(1),
+    'DAY_MULT': decimal.Decimal(1),
+    'WEEK_MULT': decimal.Decimal(1),
+    'PROGRESS_MULT': decimal.Decimal(1),
+    'SESSION_ENERGY': decimal.Decimal(500),
+}
+
+PRECISION = 2
+
+
+async def add_click_batch_copy(r: redis.Redis, pg: asyncpg.Connection, rmq: kombu.Connection, user_id: int, count: int) -> Click:
+    _click_value = await click_value(r, pg, user_id)
+    click_value_sum = _click_value * count
+
+    # update variables
+    await incr_user_count_if_no_clicks(r, user_id)
+    await update_global_average(r, click_value_sum)
+    await incr_user_total(r, user_id, click_value_sum)
+
+    for period in (24, 24*7):
+        new_period_sum = await incr_period_sum(r, user_id, period, click_value_sum)
+        await compare_max_period_sum(r, period, new_period_sum)
+
+    click = Click(
+        UserID=user_id,
+        DateTime=datetime.now(),
+        Value=_click_value,
+
+    )
+
+    # insert click
+    await bulk_store_copy(pg, click, count)
+
+    # send click to backend
+    send_click_batch_copy(rmq, click, count)
+
+    return click
+
+
+async def delete_user_info(r: redis.Redis, pg: asyncpg.Connection, user_id: int) -> None:
+    await r_delete_user_info(r, user_id, [24, 168])
+    await delete_by_user_id(pg, user_id)
+
+
+async def click_value(r: redis.Redis, pg: asyncpg.Connection, user_id: int) -> decimal.Decimal:
+    price_per_click = get_setting('PRICE_PER_CLICK')
+    day_multiplier = get_setting('DAY_MULT')
+    week_multiplier = get_setting('WEEK_MULT')
+    progress_multiplier = get_setting('PROGRESS_MULT')
+
+    # period coefficients
+    day_coef = await period_coefficient(r, pg, user_id, 24, day_multiplier)
+    week_coef = await period_coefficient(r, pg, user_id, 24*7, week_multiplier)
+
+    # progress coefficient
+    user_total = await get_user_total(r, user_id)
+    global_avg = await get_global_average(r)
+    progress_coef = progress_coefficient(user_total, global_avg, progress_multiplier)
+
+    return round(price_per_click * day_coef * week_coef * progress_coef, PRECISION)
+
+
+async def period_coefficient(r: redis.Redis, pg: asyncpg.Connection, user_id: int, period: int, multiplier: decimal.Decimal) -> decimal.Decimal:
+    current_sum = await get_period_sum(r, user_id, period)
+    expired_sum = await update_click_expiry(pg, user_id, period)
+    new_sum = current_sum - expired_sum
+    await incr_period_sum(r, user_id, period, -expired_sum)
+    max_period_sum = await get_max_period_sum(r, period)
+    if max_period_sum == decimal.Decimal(0):
+        return decimal.Decimal(1)
+    return new_sum * multiplier / max_period_sum + 1
+
+
+def progress_coefficient(user_total: decimal.Decimal, global_avg: decimal.Decimal, multiplier: decimal.Decimal) -> decimal.Decimal:
+    if user_total == decimal.Decimal(0):
+        return decimal.Decimal(1)
+    return min(global_avg * multiplier / user_total + 1, decimal.Decimal(2))
+
+
+async def check_registration(r: redis.Redis, user_id: int, _token: str, backend_url: str) -> bool:
+    if await _has_any_clicks(r, user_id):
+        return True
+    async with aiohttp.ClientSession() as session:
+        async with session.get(f'{backend_url}/api/v1/users/{user_id}', headers={'Authorization': _token}) as resp:
+            return resp.status == 200
+
+
+async def _has_any_clicks(r: redis.Redis, user_id: int) -> bool:
+    total_value = await get_user_total(r, user_id)
+    return total_value > decimal.Decimal(0)
+
+
+async def _get_refresh_energy(r: redis.Redis, user_id: int, req_token: str) -> int:
+    current_token = await get_user_session(r, user_id)
+    if current_token != req_token:
+        session_energy = int(get_setting('SESSION_ENERGY'))
+        await set_user_session(r, user_id, req_token)
+        await set_energy(r, user_id, session_energy)
+        return session_energy
+    else:
+        return await r_get_energy(r, user_id)
+
+
+async def check_energy(r: redis.Redis, user_id: int, amount: int, _token: str) -> Tuple[int, int]:
+    _energy = await _get_refresh_energy(r, user_id, _token)
+    if _energy == 0:
+        return 0, 0
+    return await decr_energy(r, user_id, amount)
+
+
+async def get_energy(r: redis.Redis, user_id: int, _token: str) -> int:
+    return await _get_refresh_energy(r, user_id, _token)
+
+
+def get_setting(name: str) -> decimal.Decimal:
+    return SETTING_DICT[name]
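
Note (not part of the diff above): the queries in batcher/app/src/domain/click/repos/pg.py target a `clicks` table that this patch does not create. The snippet below is a minimal sketch of the schema those queries appear to assume; the column types, the `expiry_info` default, the index, and the DSN are guesses rather than anything this patch defines.

# schema_sketch.py -- hypothetical helper inferred from repos/pg.py; not shipped by this patch.
import asyncio

import asyncpg

# `value` holds decimals, `expiry_info` must be jsonb for jsonb_set() and the ->> operator,
# and `time` is compared against naive datetime.now() cutoffs in update_click_expiry().
CREATE_CLICKS_TABLE = '''
CREATE TABLE IF NOT EXISTS clicks (
    id          bigserial PRIMARY KEY,
    user_id     bigint    NOT NULL,
    time        timestamp NOT NULL,
    value       numeric   NOT NULL,
    expiry_info jsonb     NOT NULL DEFAULT '{"period_24": false, "period_168": false}'
);
CREATE INDEX IF NOT EXISTS clicks_user_id_time_idx ON clicks (user_id, time);
'''


async def create_schema(dsn: str) -> None:
    conn = await asyncpg.connect(dsn)  # the DSN is a placeholder
    try:
        await conn.execute(CREATE_CLICKS_TABLE)
    finally:
        await conn.close()


if __name__ == '__main__':
    asyncio.run(create_schema('postgresql://localhost/batcher'))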
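
Also not part of the diff: a rough wiring sketch showing how the usecase functions could be driven end to end. Only the function signatures come from this patch; the connection URLs, user id, token, and the import path (which assumes batcher/ as the working directory) are illustrative assumptions. `decode_responses=True` matters because the Redis repo feeds raw GET results straight into `decimal.Decimal()` and `int()` and compares the stored session token against a str.

# usage_sketch.py -- illustrative only; URLs and values are placeholders.
import asyncio

import asyncpg
import kombu
import redis.asyncio as redis

from app.src.domain.click import usecase  # assumes batcher/ is on sys.path


async def main() -> None:
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pg = await asyncpg.connect('postgresql://localhost/batcher')
    rmq = kombu.Connection('amqp://guest:guest@localhost//')

    user_id, token = 1, 'session-token'

    # Refresh session energy for this token, then try to spend it on a click batch.
    new_energy, spent = await usecase.check_energy(r, user_id, amount=10, _token=token)
    if spent:
        # Updates the Redis aggregates, stores the batch in Postgres and
        # publishes a handle_click task for the backend worker via RabbitMQ.
        click = await usecase.add_click_batch_copy(r, pg, rmq, user_id, spent)
        print(f'{spent} clicks of value {click.Value} registered, energy left: {new_energy}')

    rmq.release()
    await pg.close()
    await r.aclose()


if __name__ == '__main__':
    asyncio.run(main())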