Batcher repos & usecase for click
This commit is contained in:
parent
faf0edc497
commit
504adfb263
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
||||||
|
.idea
|
11
batcher/Pipfile
Normal file
11
batcher/Pipfile
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
[[source]]
|
||||||
|
url = "https://pypi.org/simple"
|
||||||
|
verify_ssl = true
|
||||||
|
name = "pypi"
|
||||||
|
|
||||||
|
[packages]
|
||||||
|
|
||||||
|
[dev-packages]
|
||||||
|
|
||||||
|
[requires]
|
||||||
|
python_version = "3.12"
|
0
batcher/app/__init__.py
Normal file
0
batcher/app/__init__.py
Normal file
0
batcher/app/src/__init__.py
Normal file
0
batcher/app/src/__init__.py
Normal file
0
batcher/app/src/domain/__init__.py
Normal file
0
batcher/app/src/domain/__init__.py
Normal file
0
batcher/app/src/domain/click/__init__.py
Normal file
0
batcher/app/src/domain/click/__init__.py
Normal file
9
batcher/app/src/domain/click/models.py
Normal file
9
batcher/app/src/domain/click/models.py
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
import datetime
|
||||||
|
import decimal
|
||||||
|
import pydantic
|
||||||
|
|
||||||
|
|
||||||
|
class Click(pydantic.BaseModel):
|
||||||
|
UserID: int
|
||||||
|
DateTime: datetime.datetime
|
||||||
|
Value: decimal.Decimal
|
0
batcher/app/src/domain/click/repos/__init__.py
Normal file
0
batcher/app/src/domain/click/repos/__init__.py
Normal file
50
batcher/app/src/domain/click/repos/pg.py
Normal file
50
batcher/app/src/domain/click/repos/pg.py
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
import decimal
|
||||||
|
from asyncpg import Connection
|
||||||
|
|
||||||
|
from ..models import Click
|
||||||
|
|
||||||
|
|
||||||
|
async def update_click_expiry(conn: Connection, user_id: int, period: int) -> decimal.Decimal:
|
||||||
|
cur_time = datetime.now()
|
||||||
|
cutoff_time = cur_time - timedelta(hours=period)
|
||||||
|
query = '''
|
||||||
|
WITH expired_values AS(
|
||||||
|
UPDATE clicks
|
||||||
|
SET expiry_info=jsonb_set(expiry_info, $1, 'true')
|
||||||
|
WHERE 1=1
|
||||||
|
AND time < $2
|
||||||
|
AND user_id =$3
|
||||||
|
AND not (expiry_info->>$4)::bool
|
||||||
|
RETURNING value
|
||||||
|
)
|
||||||
|
SELECT COALESCE(SUM(value), 0)
|
||||||
|
FROM expired_values
|
||||||
|
;
|
||||||
|
'''
|
||||||
|
period_key = f'period_{period}'
|
||||||
|
return await conn.fetchval(query, [period_key], cutoff_time, user_id, period_key)
|
||||||
|
|
||||||
|
|
||||||
|
async def store(conn: Connection, click: Click) -> int:
|
||||||
|
query = '''
|
||||||
|
INSERT INTO clicks(user_id, time, value, expiry_info)
|
||||||
|
VALUES($1, $2, $3, '{"period_24": false, "period_168": false}')
|
||||||
|
RETURNING id
|
||||||
|
;
|
||||||
|
'''
|
||||||
|
return await conn.fetchval(query, click.UserID, click.DateTime, click.Value)
|
||||||
|
|
||||||
|
|
||||||
|
async def bulk_store_copy(conn: Connection, click: Click, count: int) -> None:
|
||||||
|
args = [(click.UserID, click.DateTime. click.Value) for _ in range(count)]
|
||||||
|
query = '''
|
||||||
|
INSERT INTO clicks(user_id, time, values, expiry_info)
|
||||||
|
VALUES($1, $2, $3, '{"period_24": false, "period_168": false}')
|
||||||
|
;
|
||||||
|
'''
|
||||||
|
await conn.executemany(query, args)
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_by_user_id(conn: Connection, user_id: int):
|
||||||
|
await conn.execute('DELETE FROM clicks WHERE user_id=$1', user_id)
|
169
batcher/app/src/domain/click/repos/redis.py
Normal file
169
batcher/app/src/domain/click/repos/redis.py
Normal file
|
@ -0,0 +1,169 @@
|
||||||
|
import decimal
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
import redis.asyncio as redis
|
||||||
|
|
||||||
|
|
||||||
|
async def get_period_sum(r: redis.Redis, user_id: int, period: int) -> decimal.Decimal:
|
||||||
|
sum_str = await r.get(f'period_{period}_user_{user_id}')
|
||||||
|
if sum_str is None:
|
||||||
|
return decimal.Decimal(0)
|
||||||
|
return decimal.Decimal(sum_str)
|
||||||
|
|
||||||
|
|
||||||
|
async def incr_period_sum(r: redis.Redis, user_id: int, _period: int, value: decimal.Decimal) -> decimal.Decimal:
|
||||||
|
return await r.incrbyfloat(f'period_{_period}_user_{user_id}', float(value))
|
||||||
|
|
||||||
|
|
||||||
|
async def get_max_period_sum(r: redis.Redis, _period: int) -> decimal.Decimal:
|
||||||
|
max_sum_str = await r.get(f'max_period_{_period}')
|
||||||
|
if max_sum_str is None:
|
||||||
|
return decimal.Decimal(0)
|
||||||
|
return decimal.Decimal(max_sum_str)
|
||||||
|
|
||||||
|
|
||||||
|
async def compare_max_period_sum(r: redis.Redis, _period: int, _sum: decimal.Decimal) -> None:
|
||||||
|
_script = r.register_script('''
|
||||||
|
local currentValue = tonumber(redis.call('GET', KEYS[1]))
|
||||||
|
local cmpValue = tonumber(ARGV[1])
|
||||||
|
if not currentValue or cmpValue > currentValue then
|
||||||
|
redis.call('SET', KEYS[1], ARGV[1])
|
||||||
|
return cmpValue
|
||||||
|
else
|
||||||
|
return currentValue
|
||||||
|
end
|
||||||
|
''')
|
||||||
|
await _script(keys=[f'max_period_{_period}'], args=[str(_sum)])
|
||||||
|
|
||||||
|
|
||||||
|
async def get_energy(r: redis.Redis, user_id: int) -> int:
|
||||||
|
energy_str = await r.get(f'energy_{user_id}')
|
||||||
|
if energy_str is None:
|
||||||
|
return 0
|
||||||
|
return int(energy_str)
|
||||||
|
|
||||||
|
|
||||||
|
async def set_energy(r: redis.Redis, user_id: int, energy: int) -> int:
|
||||||
|
await r.set(f'energy_{user_id}', energy)
|
||||||
|
|
||||||
|
|
||||||
|
async def decr_energy(r: redis.Redis, user_id: int, amount: int) -> (int, int):
|
||||||
|
_script = r.register_script('''
|
||||||
|
local energy = tonumber(redis.call('GET', KEYS[1]))
|
||||||
|
local delta = tonumber(ARGV[1])
|
||||||
|
if energy < delta then
|
||||||
|
redis.call('SET', KEYS[1], 0)
|
||||||
|
return {0, energy}
|
||||||
|
else
|
||||||
|
local newEnergy = tonumber(redis.call('DECRBY', KEYS[1], ARGV[1]))
|
||||||
|
return {newEnergy, delta}
|
||||||
|
end
|
||||||
|
''')
|
||||||
|
new_energy, spent= map(int, await _script(keys=[f'energy_{user_id}'], args=[amount]))
|
||||||
|
return new_energy, spent
|
||||||
|
|
||||||
|
|
||||||
|
async def get_global_average(r: redis.Redis) -> decimal.Decimal:
|
||||||
|
avg_str = await r.get('global_average')
|
||||||
|
if avg_str is None:
|
||||||
|
return decimal.Decimal(0)
|
||||||
|
return decimal.Decimal(avg_str)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_global_average(r: redis.Redis, value_to_add: decimal.Decimal) -> decimal.Decimal:
|
||||||
|
_script = r.register_script('''
|
||||||
|
local delta = tonumber(ARGV[1]) / tonumber(redis.call('GET', KEYS[1]))
|
||||||
|
return redis.call('INCRBYFLOAT', KEYS[2], delta)
|
||||||
|
''')
|
||||||
|
return decimal.Decimal(await _script(keys=["user_count", "global_average"], args=[float(value_to_add)]))
|
||||||
|
|
||||||
|
|
||||||
|
async def get_user_total(r: redis.Redis, user_id: int) -> decimal.Decimal:
|
||||||
|
total_str = await r.get(f'total_{user_id}')
|
||||||
|
if total_str is None:
|
||||||
|
return decimal.Decimal(0)
|
||||||
|
return decimal.Decimal(total_str)
|
||||||
|
|
||||||
|
|
||||||
|
async def incr_user_count_if_no_clicks(r: redis.Redis, user_id: int) -> int:
|
||||||
|
_script = r.register_script('''
|
||||||
|
local clickCount = tonumber(redis.call('GET', KEYS[1]))
|
||||||
|
local userCount = tonumber(redis.call('GET', KEYS[2]))
|
||||||
|
if (not clickCount) then
|
||||||
|
local oldUserCount = redis.call('GET', KEYS[2])
|
||||||
|
if (not oldUserCount) then
|
||||||
|
redis.call('SET', KEYS[2], 1)
|
||||||
|
redis.call('SET', KEYS[3], 0)
|
||||||
|
return 1
|
||||||
|
end
|
||||||
|
userCount = tonumber(redis.call('INCR', KEYS[2]))
|
||||||
|
oldUserCount = tonumber(oldUserCount)
|
||||||
|
local globalAverage = tonumber(redis.call('GET', KEYS[3]))
|
||||||
|
redis.call('SET', KEYS[3], globalAverage / userCount * oldUserCount)
|
||||||
|
end
|
||||||
|
return userCount
|
||||||
|
''')
|
||||||
|
return int(await _script(keys=[f'total_{user_id}', 'user_count', 'global_average'], args=[]))
|
||||||
|
|
||||||
|
|
||||||
|
async def incr_user_total(r: redis.Redis, user_id: int, value: decimal.Decimal) -> decimal.Decimal:
|
||||||
|
return await r.incrbyfloat(f'total_{user_id}', float(value))
|
||||||
|
|
||||||
|
|
||||||
|
async def get_user_session(r: redis.Redis, user_id: int) -> Optional[str]:
|
||||||
|
return await r.get(f'session_{user_id}')
|
||||||
|
|
||||||
|
|
||||||
|
async def set_user_session(r: redis.Redis, user_id: int, token: str) -> None:
|
||||||
|
await r.set(f'session_{user_id}', token, ex=30 * 60)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_user_count(r: redis.Redis) -> int:
|
||||||
|
user_count_str = await r.get('user_count')
|
||||||
|
if user_count_str is None:
|
||||||
|
return 0
|
||||||
|
return int(user_count_str)
|
||||||
|
|
||||||
|
|
||||||
|
async def incr_user_count(r: redis.Redis) -> int:
|
||||||
|
_script = r.register_script('''
|
||||||
|
local oldCount = redis.call('GET', KEYS[1])
|
||||||
|
if (not oldCount) then
|
||||||
|
redis.call('SET', KEYS[1], 1)
|
||||||
|
redis.call('SET', KEYS[2], 0)
|
||||||
|
return 1
|
||||||
|
end
|
||||||
|
local newCount = tonumber(redis.call('INCR', KEYS[1]))
|
||||||
|
local globalAverage = tonumber(redis.call('GET', KEYS[2]))
|
||||||
|
redis.call('SET', KEYS[2], globalAverage / newCount * oldCount)
|
||||||
|
return newCount
|
||||||
|
''')
|
||||||
|
return int(await _script(keys=['user_count', 'global_average'], args=[]))
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_user_info(r: redis.Redis, user_id: int, periods: List[int]):
|
||||||
|
_script = r.register_script('''
|
||||||
|
local userTotal = redis.call('GET', KEYS[3])
|
||||||
|
if (not userTotal) then
|
||||||
|
return
|
||||||
|
end
|
||||||
|
local oldUserCount = tonumber(redis.call('GET', KEYS[1]))
|
||||||
|
local newUserCount = tonumber(redis.call('DECR', KEYS[1]))
|
||||||
|
local globalAverage = tonumber(redis.call('GET', KEYS[2]))
|
||||||
|
redis.call('SET', KEYS[2], (globalAverage * oldUserCount - userTotal) / newUserCount)
|
||||||
|
for i, v in ipairs(KEYS) do
|
||||||
|
if (i > 2) then
|
||||||
|
redis.call('DEL', v)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
''')
|
||||||
|
keys = [
|
||||||
|
'user_count',
|
||||||
|
'global_average',
|
||||||
|
f'total_{user_id}'
|
||||||
|
f'energy_{user_id}',
|
||||||
|
f'session_{user_id}',
|
||||||
|
]
|
||||||
|
for period in periods:
|
||||||
|
keys.append(f'period_{period}_user_{user_id}')
|
||||||
|
await _script(keys=keys, args=[])
|
29
batcher/app/src/domain/click/repos/rmq.py
Normal file
29
batcher/app/src/domain/click/repos/rmq.py
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
import json
|
||||||
|
import kombu
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from ..models import Click
|
||||||
|
|
||||||
|
|
||||||
|
CELERY_QUEUE_NAME = "celery"
|
||||||
|
SETTING_QUEUE_NAME = "settings"
|
||||||
|
CLICK_TASK_NAME = "clicks.celery.click.handle_click"
|
||||||
|
SETTING_TASK_NAME = "misc.celery.deliver_setting.deliver_setting"
|
||||||
|
|
||||||
|
|
||||||
|
def send_click_batch_copy(conn: kombu.Connection, click: Click, count: int):
|
||||||
|
producer = kombu.Producer(conn)
|
||||||
|
producer.publish(
|
||||||
|
json.dumps({
|
||||||
|
'id': str(uuid.uuid4()),
|
||||||
|
'task': CLICK_TASK_NAME,
|
||||||
|
'args': [click.UserID, int(click.DateTime.timestamp() * 1e3), str(click.Value), count],
|
||||||
|
'kwargs': dict(),
|
||||||
|
}),
|
||||||
|
routing_key=CELERY_QUEUE_NAME,
|
||||||
|
delivery_mode='persistent',
|
||||||
|
mandatory=False,
|
||||||
|
immediate=False,
|
||||||
|
content_type='application/json',
|
||||||
|
serializer='json',
|
||||||
|
)
|
18
batcher/app/src/domain/click/schemas.py
Normal file
18
batcher/app/src/domain/click/schemas.py
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
import decimal
|
||||||
|
|
||||||
|
import pydantic
|
||||||
|
|
||||||
|
from .models import Click
|
||||||
|
|
||||||
|
|
||||||
|
class ClickResponse(pydantic.BaseModel):
|
||||||
|
click: Click
|
||||||
|
energy: int
|
||||||
|
|
||||||
|
|
||||||
|
class ClickValueResponse(pydantic.BaseModel):
|
||||||
|
value: decimal.Decimal
|
||||||
|
|
||||||
|
|
||||||
|
class BatchClickRequest(pydantic.BaseModel):
|
||||||
|
count: int
|
137
batcher/app/src/domain/click/usecase.py
Normal file
137
batcher/app/src/domain/click/usecase.py
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
from datetime import datetime
|
||||||
|
import decimal
|
||||||
|
from typing import Tuple
|
||||||
|
import aiohttp
|
||||||
|
import redis.asyncio as redis
|
||||||
|
import kombu
|
||||||
|
import asyncpg
|
||||||
|
|
||||||
|
from .repos.redis import (
|
||||||
|
get_period_sum, incr_period_sum, get_max_period_sum, get_user_total, get_global_average,
|
||||||
|
incr_user_count_if_no_clicks, update_global_average, incr_user_total, compare_max_period_sum,
|
||||||
|
delete_user_info as r_delete_user_info, get_user_session, set_user_session, set_energy, get_energy as r_get_energy,
|
||||||
|
decr_energy,
|
||||||
|
)
|
||||||
|
from .repos.pg import update_click_expiry, bulk_store_copy, delete_by_user_id
|
||||||
|
from .repos.rmq import send_click_batch_copy
|
||||||
|
|
||||||
|
from .models import Click
|
||||||
|
|
||||||
|
|
||||||
|
SETTING_DICT = {
|
||||||
|
'PRICE_PER_CLICK': decimal.Decimal(1),
|
||||||
|
'DAY_MULT': decimal.Decimal(1),
|
||||||
|
'WEEK_MULT': decimal.Decimal(1),
|
||||||
|
'PROGRESS_MULT': decimal.Decimal(1),
|
||||||
|
'SESSION_ENERGY': decimal.Decimal(500),
|
||||||
|
}
|
||||||
|
|
||||||
|
PRECISION = 2
|
||||||
|
|
||||||
|
|
||||||
|
async def add_click_batch_copy(r: redis.Redis, pg: asyncpg.Connection, rmq: kombu.Connection, user_id: int, count: int) -> Click:
|
||||||
|
_click_value = await click_value(r, pg, user_id)
|
||||||
|
click_value_sum = _click_value * count
|
||||||
|
|
||||||
|
# update variables
|
||||||
|
await incr_user_count_if_no_clicks(r, user_id)
|
||||||
|
await update_global_average(r, click_value_sum)
|
||||||
|
await incr_user_total(r, user_id, click_value_sum)
|
||||||
|
|
||||||
|
for period in (24, 24*7):
|
||||||
|
new_period_sum = await incr_period_sum(r, user_id, period, click_value_sum)
|
||||||
|
await compare_max_period_sum(r, period, new_period_sum)
|
||||||
|
|
||||||
|
click = Click(
|
||||||
|
UserID=user_id,
|
||||||
|
DateTime=datetime.now(),
|
||||||
|
Value=_click_value,
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
# insert click
|
||||||
|
await bulk_store_copy(pg, click, count)
|
||||||
|
|
||||||
|
# send click to backend
|
||||||
|
send_click_batch_copy(rmq, click, count)
|
||||||
|
|
||||||
|
return click
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_user_info(r: redis.Redis, pg: asyncpg.Connection, user_id: int) -> None:
|
||||||
|
await r_delete_user_info(r, user_id, [24, 168])
|
||||||
|
await delete_by_user_id(pg, user_id)
|
||||||
|
|
||||||
|
|
||||||
|
async def click_value(r: redis.Redis, pg: asyncpg.Connection, user_id: int) -> decimal.Decimal:
|
||||||
|
price_per_click = get_setting('PRICE_PER_CLICK')
|
||||||
|
day_multiplier = get_setting('DAY_MULT')
|
||||||
|
week_multiplier = get_setting('WEEK_MULT')
|
||||||
|
progress_multiplier = get_setting('PROGRESS_MULT')
|
||||||
|
|
||||||
|
# period coefficients
|
||||||
|
day_coef = await period_coefficient(r, pg, user_id, 24, day_multiplier)
|
||||||
|
week_coef = await period_coefficient(r, pg, user_id, 24*7, week_multiplier)
|
||||||
|
|
||||||
|
# progress coefficient
|
||||||
|
user_total = await get_user_total(r, user_id)
|
||||||
|
global_avg = await get_global_average(r)
|
||||||
|
progress_coef = progress_coefficient(user_total, global_avg, progress_multiplier)
|
||||||
|
|
||||||
|
return round(price_per_click * day_coef * week_coef * progress_coef, PRECISION)
|
||||||
|
|
||||||
|
|
||||||
|
async def period_coefficient(r: redis.Redis, pg: asyncpg.Connection, user_id: int, period: int, multiplier: decimal.Decimal) -> decimal.Decimal:
|
||||||
|
current_sum = await get_period_sum(r, user_id, period)
|
||||||
|
expired_sum = await update_click_expiry(pg, user_id, period)
|
||||||
|
new_sum = current_sum - expired_sum
|
||||||
|
await incr_period_sum(r, user_id, period, -expired_sum)
|
||||||
|
max_period_sum = await get_max_period_sum(r, period)
|
||||||
|
if max_period_sum == decimal.Decimal(0):
|
||||||
|
return decimal.Decimal(1)
|
||||||
|
return new_sum * multiplier / max_period_sum + 1
|
||||||
|
|
||||||
|
|
||||||
|
def progress_coefficient(user_total: decimal.Decimal, global_avg: decimal.Decimal, multiplier: decimal.Decimal) -> decimal.Decimal:
|
||||||
|
if user_total == decimal.Decimal(0):
|
||||||
|
return decimal.Decimal(1)
|
||||||
|
return min(global_avg * multiplier / user_total + 1, decimal.Decimal(2))
|
||||||
|
|
||||||
|
|
||||||
|
async def check_registration(r: redis.Redis, user_id: int, _token: str, backend_url: str) -> bool:
|
||||||
|
if await _has_any_clicks(r, user_id):
|
||||||
|
return True
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
async with session.get(f'{backend_url}/api/v1/users/{user_id}', headers={'Authorization': _token}) as resp:
|
||||||
|
return resp.status == 200
|
||||||
|
|
||||||
|
|
||||||
|
async def _has_any_clicks(r: redis.Redis, user_id: int) -> bool:
|
||||||
|
total_value = await get_user_total(r, user_id)
|
||||||
|
return total_value > decimal.Decimal(0)
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_refresh_energy(r: redis.Redis, user_id: int, req_token: str) -> int:
|
||||||
|
current_token = await get_user_session(r, user_id)
|
||||||
|
if current_token != req_token:
|
||||||
|
session_energy = int(get_setting('SESSION_ENERGY'))
|
||||||
|
await set_user_session(r, user_id, req_token)
|
||||||
|
await set_energy(r, user_id, session_energy)
|
||||||
|
return session_energy
|
||||||
|
else:
|
||||||
|
return await r_get_energy(r, user_id)
|
||||||
|
|
||||||
|
|
||||||
|
async def check_energy(r: redis.Redis, user_id: int, amount: int, _token: str) -> Tuple[int, int]:
|
||||||
|
_energy = await _get_refresh_energy(r, user_id, _token)
|
||||||
|
if _energy == 0:
|
||||||
|
return 0, 0
|
||||||
|
return await decr_energy(r, user_id, amount)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_energy(r: redis.Redis, user_id: int, _token: str) -> int:
|
||||||
|
return await _get_refresh_energy(r, user_id, _token)
|
||||||
|
|
||||||
|
|
||||||
|
def get_setting(name: str) -> decimal.Decimal:
|
||||||
|
return SETTING_DICT[name]
|
Loading…
Reference in New Issue
Block a user