Fix backend & batcher
This commit is contained in:
parent
c668cb5487
commit
c6e60fab39
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -3,3 +3,5 @@
|
||||||
.DS_Store
|
.DS_Store
|
||||||
__pycache__/
|
__pycache__/
|
||||||
*.py[cod]
|
*.py[cod]
|
||||||
|
celerybeat-schedule
|
||||||
|
backend/static
|
||||||
|
|
|
@ -25,7 +25,10 @@ SECRET_KEY = os.getenv('SECRET_KEY', 'django-insecure-4nww@d-th@7^(chggt5q+$e*d_
|
||||||
DEBUG = int(os.getenv('DEBUG', 0))
|
DEBUG = int(os.getenv('DEBUG', 0))
|
||||||
PROD = 1 - DEBUG
|
PROD = 1 - DEBUG
|
||||||
|
|
||||||
ALLOWED_HOSTS = ['crowngame.ru', 'backend', '127.0.0.1']
|
ALLOWED_HOSTS = ['backend', '127.0.0.1']
|
||||||
|
if app_url := os.getenv('APP_URL', None):
|
||||||
|
ALLOWED_HOSTS.append(app_url)
|
||||||
|
|
||||||
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
|
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')
|
||||||
USE_X_FORWARDED_HOST = True
|
USE_X_FORWARDED_HOST = True
|
||||||
USE_X_FORWARDED_PORT = True
|
USE_X_FORWARDED_PORT = True
|
||||||
|
|
|
@ -6,9 +6,5 @@ class MiscConfig(AppConfig):
|
||||||
name = "misc"
|
name = "misc"
|
||||||
|
|
||||||
def ready(self):
|
def ready(self):
|
||||||
from .celery import deliver_setting as deliver_setting_celery
|
|
||||||
from .signals import deliver_setting
|
from .signals import deliver_setting
|
||||||
from misc.models import Setting
|
|
||||||
|
|
||||||
for setting in Setting.objects.all():
|
|
||||||
deliver_setting_celery.delay(setting.name)
|
|
||||||
|
|
0
backend/misc/management/__init__.py
Normal file
0
backend/misc/management/__init__.py
Normal file
0
backend/misc/management/commands/__init__.py
Normal file
0
backend/misc/management/commands/__init__.py
Normal file
10
backend/misc/management/commands/send_settings.py
Normal file
10
backend/misc/management/commands/send_settings.py
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
from django.core.management.base import BaseCommand, CommandError
|
||||||
|
from misc.celery import deliver_setting
|
||||||
|
from misc.models import Setting
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
help = 'Sends all settings to rmq for batcher to consume'
|
||||||
|
|
||||||
|
def handle(self, *args, **options):
|
||||||
|
for setting in Setting.objects.all():
|
||||||
|
deliver_setting.delay(setting.name)
|
|
@ -5,5 +5,6 @@ set -o pipefail
|
||||||
set -o nounset
|
set -o nounset
|
||||||
|
|
||||||
python manage.py migrate
|
python manage.py migrate
|
||||||
|
python manage.py send_settings
|
||||||
python manage.py collectstatic --noinput --verbosity 0
|
python manage.py collectstatic --noinput --verbosity 0
|
||||||
gunicorn clicker.wsgi -b 0.0.0.0:8000 -w 17 --timeout 600 --chdir=/app --access-logfile -
|
gunicorn clicker.wsgi -b 0.0.0.0:8000 -w 17 --timeout 600 --chdir=/app --access-logfile -
|
||||||
|
|
|
@ -6,5 +6,6 @@ set -o nounset
|
||||||
set -o xtrace
|
set -o xtrace
|
||||||
|
|
||||||
python manage.py migrate
|
python manage.py migrate
|
||||||
|
python manage.py send_settings
|
||||||
python manage.py collectstatic --noinput --verbosity 0
|
python manage.py collectstatic --noinput --verbosity 0
|
||||||
python manage.py runserver 0.0.0.0:8000
|
python manage.py runserver 0.0.0.0:8000
|
||||||
|
|
|
@ -33,7 +33,7 @@ class TelegramValidationAuthentication(authentication.BaseAuthentication):
|
||||||
split_res = base64.b64decode(token).decode('utf-8').split(':')
|
split_res = base64.b64decode(token).decode('utf-8').split(':')
|
||||||
try:
|
try:
|
||||||
data_check_string = ':'.join(split_res[:-1]).strip().replace('/', '\\/')
|
data_check_string = ':'.join(split_res[:-1]).strip().replace('/', '\\/')
|
||||||
hash = split_res[-1]
|
_hash = split_res[-1]
|
||||||
except IndexError:
|
except IndexError:
|
||||||
raise exceptions.AuthenticationFailed('Invalid token format')
|
raise exceptions.AuthenticationFailed('Invalid token format')
|
||||||
secret = hmac.new(
|
secret = hmac.new(
|
||||||
|
@ -46,7 +46,7 @@ class TelegramValidationAuthentication(authentication.BaseAuthentication):
|
||||||
msg=data_check_string.encode('utf-8'),
|
msg=data_check_string.encode('utf-8'),
|
||||||
digestmod=hashlib.sha256
|
digestmod=hashlib.sha256
|
||||||
).hexdigest()
|
).hexdigest()
|
||||||
if hash != actual_hash:
|
if _hash != actual_hash:
|
||||||
raise exceptions.AuthenticationFailed('Invalid token (hash check failed)')
|
raise exceptions.AuthenticationFailed('Invalid token (hash check failed)')
|
||||||
|
|
||||||
data_dict = dict([x.split('=') for x in data_check_string.split('\n')])
|
data_dict = dict([x.split('=') for x in data_check_string.split('\n')])
|
||||||
|
|
|
@ -8,4 +8,6 @@ RUN pip install --no-cache-dir --upgrade -r /batcher/requirements.txt
|
||||||
|
|
||||||
COPY ./app /batcher/app
|
COPY ./app /batcher/app
|
||||||
|
|
||||||
CMD ["fastapi", "run", "app/main.py", "--port", "$HTTP_PORT"]
|
ENV PYTHONPATH="${PYTHONPATH}:/batcher/app"
|
||||||
|
|
||||||
|
CMD uvicorn app.main:app --host 0.0.0.0 --port "${HTTP_PORT}"
|
|
@ -1,11 +0,0 @@
|
||||||
[[source]]
|
|
||||||
url = "https://pypi.org/simple"
|
|
||||||
verify_ssl = true
|
|
||||||
name = "pypi"
|
|
||||||
|
|
||||||
[packages]
|
|
||||||
|
|
||||||
[dev-packages]
|
|
||||||
|
|
||||||
[requires]
|
|
||||||
python_version = "3.12"
|
|
|
@ -1,9 +1,13 @@
|
||||||
|
import aio_pika
|
||||||
from fastapi import Depends, FastAPI, Request, Response
|
from fastapi import Depends, FastAPI, Request, Response
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
from functools import partial
|
||||||
from starlette.exceptions import HTTPException
|
from starlette.exceptions import HTTPException
|
||||||
|
|
||||||
from .src.routers.api import router as router_api
|
from app.src.routers.api import router as router_api
|
||||||
from .src.routers.handlers import http_error_handler
|
from app.src.routers.handlers import http_error_handler
|
||||||
|
from app.src.domain.setting import launch_consumer
|
||||||
|
from app.src.db import connect_pg, connect_redis, get_connection, get_channel, get_rmq
|
||||||
|
|
||||||
|
|
||||||
def get_application() -> FastAPI:
|
def get_application() -> FastAPI:
|
||||||
|
@ -23,5 +27,23 @@ def get_application() -> FastAPI:
|
||||||
|
|
||||||
return application
|
return application
|
||||||
|
|
||||||
|
app = get_application()
|
||||||
|
|
||||||
|
@app.on_event("startup")
|
||||||
|
async def startup():
|
||||||
|
launch_consumer(get_connection)
|
||||||
|
|
||||||
|
app.state.pg_pool = await connect_pg()
|
||||||
|
|
||||||
|
app.state.redis_pool = connect_redis()
|
||||||
|
|
||||||
|
rmq_conn_pool = aio_pika.pool.Pool(get_connection, max_size=2)
|
||||||
|
rmq_chan_pool = aio_pika.pool.Pool(partial(get_channel, conn_pool=rmq_conn_pool), max_size=10)
|
||||||
|
app.state.rmq_chan_pool = rmq_chan_pool
|
||||||
|
|
||||||
|
|
||||||
|
@app.on_event("shutdown")
|
||||||
|
async def shutdown():
|
||||||
|
await app.state.pg_pool.close()
|
||||||
|
await app.state.redis.close()
|
||||||
|
|
||||||
app = get_application()
|
|
|
@ -1,6 +1,6 @@
|
||||||
import sys
|
import sys
|
||||||
import asyncio
|
import asyncio
|
||||||
from .src.db.pg import migrate
|
from app.src.db.pg import migrate
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
|
@ -2,7 +2,7 @@ from starlette.config import Config
|
||||||
from starlette.datastructures import Secret
|
from starlette.datastructures import Secret
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
config = Config('.env')
|
config = Config()
|
||||||
|
|
||||||
|
|
||||||
REDIS_USER = config('REDIS_USER')
|
REDIS_USER = config('REDIS_USER')
|
||||||
|
@ -11,8 +11,6 @@ REDIS_PORT = config('REDIS_PORT', cast=int)
|
||||||
REDIS_HOST = config('REDIS_HOST')
|
REDIS_HOST = config('REDIS_HOST')
|
||||||
REDIS_DB = config('REDIS_DB')
|
REDIS_DB = config('REDIS_DB')
|
||||||
|
|
||||||
HTTP_PORT = config('HTTP_PORT', cast=int)
|
|
||||||
|
|
||||||
PG_HOST = config('POSTGRES_HOST')
|
PG_HOST = config('POSTGRES_HOST')
|
||||||
PG_PORT = config('POSTGRES_PORT', cast=int)
|
PG_PORT = config('POSTGRES_PORT', cast=int)
|
||||||
PG_USER = config('POSTGRES_USER')
|
PG_USER = config('POSTGRES_USER')
|
||||||
|
@ -22,7 +20,7 @@ PG_DB = config('POSTGRES_DB')
|
||||||
RMQ_HOST = config('RABBITMQ_HOST')
|
RMQ_HOST = config('RABBITMQ_HOST')
|
||||||
RMQ_PORT = config('RABBITMQ_PORT', cast=int)
|
RMQ_PORT = config('RABBITMQ_PORT', cast=int)
|
||||||
RMQ_USER = config('RABBITMQ_DEFAULT_USER')
|
RMQ_USER = config('RABBITMQ_DEFAULT_USER')
|
||||||
RMQ_PASSWORD = config('RABBITMQ_DEFAULT_PASSWORD', cast=Secret)
|
RMQ_PASSWORD = config('RABBITMQ_DEFAULT_PASS', cast=Secret)
|
||||||
|
|
||||||
TG_TOKEN = config('TG_TOKEN', cast=Secret)
|
TG_TOKEN = config('TG_TOKEN', cast=Secret)
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
from .pg import get_pg
|
from .pg import get_pg, connect_pg
|
||||||
from .redis import get_redis
|
from .redis import get_redis, connect_redis
|
||||||
from .rmq import get_rmq
|
from .rmq import get_rmq, get_channel, get_connection
|
|
@ -1 +1 @@
|
||||||
from .pg import get_pg, migrate
|
from .pg import get_pg, migrate, connect_pg
|
||||||
|
|
|
@ -1,32 +1,29 @@
|
||||||
from batcher.app.src.config import PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB
|
from app.src.config import PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DB
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import asyncio
|
from starlette.requests import Request
|
||||||
import asyncpg
|
import asyncpg
|
||||||
from asyncpg_trek import plan, execute, Direction
|
from asyncpg_trek import plan, execute, Direction
|
||||||
from asyncpg_trek.asyncpg import AsyncpgBackend
|
from asyncpg_trek.asyncpg import AsyncpgBackend
|
||||||
|
|
||||||
|
|
||||||
DB_URL = f'postgresql://{PG_USER}:{str(PG_PASSWORD)}@{PG_HOST}:{PG_PORT}/{PG_DB}'
|
DB_URL = f'postgresql://{PG_USER}:{str(PG_PASSWORD)}@{PG_HOST}:{PG_PORT}/{PG_DB}'
|
||||||
MIGRATIONS_DIR = Path(__file__) / "migrations"
|
MIGRATIONS_DIR = Path(__file__).parent.resolve() / "migrations"
|
||||||
|
|
||||||
|
|
||||||
async def connect_db() -> asyncpg.Pool:
|
async def connect_pg() -> asyncpg.Pool:
|
||||||
return await asyncpg.create_pool(DB_URL)
|
return await asyncpg.create_pool(DB_URL)
|
||||||
|
|
||||||
|
|
||||||
pool = asyncio.run(connect_db())
|
async def get_pg(request: Request) -> asyncpg.Connection:
|
||||||
|
async with request.app.state.pg_pool.acquire() as conn:
|
||||||
|
|
||||||
async def get_pg() -> asyncpg.Connection:
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
yield conn
|
yield conn
|
||||||
|
|
||||||
|
|
||||||
async def migrate(
|
async def migrate(
|
||||||
target_revision: str,
|
target_revision: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
pool = await connect_pg()
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
backend = AsyncpgBackend(conn)
|
backend = AsyncpgBackend(conn)
|
||||||
async with backend.connect() as conn:
|
planned = await plan(backend, MIGRATIONS_DIR, target_revision=target_revision, direction=Direction.up)
|
||||||
planned = await plan(conn, backend, MIGRATIONS_DIR, target_revision=target_revision, direction=Direction.up)
|
await execute(backend, planned)
|
||||||
await execute(conn, backend, planned)
|
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
import asyncio
|
from starlette.requests import Request
|
||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
|
|
||||||
from ..config import REDIS_HOST, REDIS_PORT, REDIS_USER, REDIS_PASSWORD, REDIS_DB
|
from ..config import REDIS_HOST, REDIS_PORT, REDIS_USER, REDIS_PASSWORD, REDIS_DB
|
||||||
|
|
||||||
|
|
||||||
r = asyncio.run(redis.Redis(host=REDIS_HOST, port=REDIS_PORT, username=REDIS_USER, password=REDIS_PASSWORD, db=REDIS_DB))
|
def connect_redis() -> redis.ConnectionPool:
|
||||||
|
return redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, username=REDIS_USER, password=str(REDIS_PASSWORD), db=REDIS_DB)
|
||||||
|
|
||||||
|
|
||||||
def get_redis() -> redis.Redis:
|
async def get_redis(request: Request) -> redis.Redis:
|
||||||
|
r = redis.Redis(connection_pool=request.app.state.redis_pool)
|
||||||
yield r
|
yield r
|
||||||
|
await r.aclose()
|
||||||
|
|
|
@ -1,26 +1,28 @@
|
||||||
import aio_pika
|
|
||||||
from aio_pika.abc import AbstractRobustConnection
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import aio_pika
|
||||||
|
from starlette.requests import Request
|
||||||
|
from aio_pika.abc import AbstractRobustConnection
|
||||||
|
|
||||||
from ..config import RMQ_HOST, RMQ_PORT, RMQ_USER, RMQ_PASSWORD
|
from ..config import RMQ_HOST, RMQ_PORT, RMQ_USER, RMQ_PASSWORD
|
||||||
|
|
||||||
|
|
||||||
|
fqdn = f'amqp://{RMQ_USER}:{str(RMQ_PASSWORD)}@{RMQ_HOST}:{RMQ_PORT}/'
|
||||||
|
|
||||||
async def get_connection() -> AbstractRobustConnection:
|
async def get_connection() -> AbstractRobustConnection:
|
||||||
return await aio_pika.connect_robust(f'amqp://{RMQ_USER}:{RMQ_PASSWORD}@{RMQ_HOST}:{RMQ_PORT}/')
|
while True:
|
||||||
|
try:
|
||||||
|
conn = await aio_pika.connect_robust(fqdn)
|
||||||
|
return conn
|
||||||
|
except ConnectionError:
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
conn_pool = aio_pika.pool.Pool(get_connection, max_size=2)
|
async def get_channel(conn_pool: AbstractRobustConnection) -> aio_pika.Channel:
|
||||||
|
|
||||||
|
|
||||||
async def get_channel() -> aio_pika.Channel:
|
|
||||||
async with conn_pool.acquire() as connection:
|
async with conn_pool.acquire() as connection:
|
||||||
return await connection.channel()
|
return await connection.channel()
|
||||||
|
|
||||||
|
|
||||||
chan_pool = aio_pika.pool.Pool(get_channel, max_size=10)
|
async def get_rmq(request: Request) -> aio_pika.Channel:
|
||||||
|
async with request.app.state.rmq_chan_pool.acquire() as chan:
|
||||||
|
|
||||||
async def get_rmq() -> aio_pika.Channel:
|
|
||||||
async with chan_pool.acquire() as chan:
|
|
||||||
yield chan
|
yield chan
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ async def get_token_header(authorization: str = Header()) -> (int, str):
|
||||||
raise HTTPException(status_code=403, detail='Unauthorized')
|
raise HTTPException(status_code=403, detail='Unauthorized')
|
||||||
secret = hmac.new(
|
secret = hmac.new(
|
||||||
'WebAppData'.encode(),
|
'WebAppData'.encode(),
|
||||||
TG_TOKEN.encode('utf-8'),
|
str(TG_TOKEN).encode('utf-8'),
|
||||||
digestmod=hashlib.sha256
|
digestmod=hashlib.sha256
|
||||||
).digest()
|
).digest()
|
||||||
actual_hash = hmac.new(
|
actual_hash = hmac.new(
|
||||||
|
@ -33,7 +33,7 @@ async def get_token_header(authorization: str = Header()) -> (int, str):
|
||||||
msg=data_check_string.encode('utf-8'),
|
msg=data_check_string.encode('utf-8'),
|
||||||
digestmod=hashlib.sha256
|
digestmod=hashlib.sha256
|
||||||
).hexdigest()
|
).hexdigest()
|
||||||
if hash != actual_hash:
|
if _hash != actual_hash:
|
||||||
raise HTTPException(status_code=403, detail='Unauthorized')
|
raise HTTPException(status_code=403, detail='Unauthorized')
|
||||||
|
|
||||||
data_dict = dict([x.split('=') for x in data_check_string.split('\n')])
|
data_dict = dict([x.split('=') for x in data_check_string.split('\n')])
|
||||||
|
|
|
@ -4,6 +4,6 @@ import pydantic
|
||||||
|
|
||||||
|
|
||||||
class Click(pydantic.BaseModel):
|
class Click(pydantic.BaseModel):
|
||||||
UserID: int
|
userId: int
|
||||||
DateTime: datetime.datetime
|
dateTime: datetime.datetime
|
||||||
Value: decimal.Decimal
|
value: decimal.Decimal
|
||||||
|
|
|
@ -33,13 +33,13 @@ async def store(conn: Connection, click: Click) -> int:
|
||||||
RETURNING id
|
RETURNING id
|
||||||
;
|
;
|
||||||
'''
|
'''
|
||||||
return await conn.fetchval(query, click.UserID, click.DateTime, click.Value)
|
return await conn.fetchval(query, click.userId, click.dateTime, click.value)
|
||||||
|
|
||||||
|
|
||||||
async def bulk_store_copy(conn: Connection, click: Click, count: int) -> None:
|
async def bulk_store_copy(conn: Connection, click: Click, count: int) -> None:
|
||||||
args = [(click.UserID, click.DateTime. click.Value) for _ in range(count)]
|
args = [(click.userId, click.dateTime, click.value) for _ in range(count)]
|
||||||
query = '''
|
query = '''
|
||||||
INSERT INTO clicks(user_id, time, values, expiry_info)
|
INSERT INTO clicks(user_id, time, value, expiry_info)
|
||||||
VALUES($1, $2, $3, '{"period_24": false, "period_168": false}')
|
VALUES($1, $2, $3, '{"period_24": false, "period_168": false}')
|
||||||
;
|
;
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -5,10 +5,10 @@ import redis.asyncio as redis
|
||||||
|
|
||||||
|
|
||||||
async def get_period_sum(r: redis.Redis, user_id: int, period: int) -> decimal.Decimal:
|
async def get_period_sum(r: redis.Redis, user_id: int, period: int) -> decimal.Decimal:
|
||||||
sum_str = await r.get(f'period_{period}_user_{user_id}')
|
sum_bytes = await r.get(f'period_{period}_user_{user_id}')
|
||||||
if sum_str is None:
|
if sum_bytes is None:
|
||||||
return decimal.Decimal(0)
|
return decimal.Decimal(0)
|
||||||
return decimal.Decimal(sum_str)
|
return decimal.Decimal(sum_bytes.decode())
|
||||||
|
|
||||||
|
|
||||||
async def incr_period_sum(r: redis.Redis, user_id: int, _period: int, value: decimal.Decimal) -> decimal.Decimal:
|
async def incr_period_sum(r: redis.Redis, user_id: int, _period: int, value: decimal.Decimal) -> decimal.Decimal:
|
||||||
|
@ -16,10 +16,10 @@ async def incr_period_sum(r: redis.Redis, user_id: int, _period: int, value: dec
|
||||||
|
|
||||||
|
|
||||||
async def get_max_period_sum(r: redis.Redis, _period: int) -> decimal.Decimal:
|
async def get_max_period_sum(r: redis.Redis, _period: int) -> decimal.Decimal:
|
||||||
max_sum_str = await r.get(f'max_period_{_period}')
|
max_sum_bytes = await r.get(f'max_period_{_period}')
|
||||||
if max_sum_str is None:
|
if max_sum_bytes is None:
|
||||||
return decimal.Decimal(0)
|
return decimal.Decimal(0)
|
||||||
return decimal.Decimal(max_sum_str)
|
return decimal.Decimal(max_sum_bytes.decode())
|
||||||
|
|
||||||
|
|
||||||
async def compare_max_period_sum(r: redis.Redis, _period: int, _sum: decimal.Decimal) -> None:
|
async def compare_max_period_sum(r: redis.Redis, _period: int, _sum: decimal.Decimal) -> None:
|
||||||
|
@ -64,10 +64,10 @@ async def decr_energy(r: redis.Redis, user_id: int, amount: int) -> (int, int):
|
||||||
|
|
||||||
|
|
||||||
async def get_global_average(r: redis.Redis) -> decimal.Decimal:
|
async def get_global_average(r: redis.Redis) -> decimal.Decimal:
|
||||||
avg_str = await r.get('global_average')
|
avg_bytes = await r.get('global_average')
|
||||||
if avg_str is None:
|
if avg_bytes is None:
|
||||||
return decimal.Decimal(0)
|
return decimal.Decimal(0)
|
||||||
return decimal.Decimal(avg_str)
|
return decimal.Decimal(avg_bytes.decode())
|
||||||
|
|
||||||
|
|
||||||
async def update_global_average(r: redis.Redis, value_to_add: decimal.Decimal) -> decimal.Decimal:
|
async def update_global_average(r: redis.Redis, value_to_add: decimal.Decimal) -> decimal.Decimal:
|
||||||
|
@ -75,14 +75,14 @@ async def update_global_average(r: redis.Redis, value_to_add: decimal.Decimal) -
|
||||||
local delta = tonumber(ARGV[1]) / tonumber(redis.call('GET', KEYS[1]))
|
local delta = tonumber(ARGV[1]) / tonumber(redis.call('GET', KEYS[1]))
|
||||||
return redis.call('INCRBYFLOAT', KEYS[2], delta)
|
return redis.call('INCRBYFLOAT', KEYS[2], delta)
|
||||||
''')
|
''')
|
||||||
return decimal.Decimal(await _script(keys=["user_count", "global_average"], args=[float(value_to_add)]))
|
return decimal.Decimal((await _script(keys=["user_count", "global_average"], args=[float(value_to_add)])).decode())
|
||||||
|
|
||||||
|
|
||||||
async def get_user_total(r: redis.Redis, user_id: int) -> decimal.Decimal:
|
async def get_user_total(r: redis.Redis, user_id: int) -> decimal.Decimal:
|
||||||
total_str = await r.get(f'total_{user_id}')
|
total_bytes = await r.get(f'total_{user_id}')
|
||||||
if total_str is None:
|
if total_bytes is None:
|
||||||
return decimal.Decimal(0)
|
return decimal.Decimal(0)
|
||||||
return decimal.Decimal(total_str)
|
return decimal.Decimal(total_bytes.decode())
|
||||||
|
|
||||||
|
|
||||||
async def incr_user_count_if_no_clicks(r: redis.Redis, user_id: int) -> int:
|
async def incr_user_count_if_no_clicks(r: redis.Redis, user_id: int) -> int:
|
||||||
|
@ -111,7 +111,10 @@ async def incr_user_total(r: redis.Redis, user_id: int, value: decimal.Decimal)
|
||||||
|
|
||||||
|
|
||||||
async def get_user_session(r: redis.Redis, user_id: int) -> Optional[str]:
|
async def get_user_session(r: redis.Redis, user_id: int) -> Optional[str]:
|
||||||
return await r.get(f'session_{user_id}')
|
session_bytes = await r.get(f'session_{user_id}')
|
||||||
|
if session_bytes is None:
|
||||||
|
return None
|
||||||
|
return session_bytes.decode()
|
||||||
|
|
||||||
|
|
||||||
async def set_user_session(r: redis.Redis, user_id: int, token: str) -> None:
|
async def set_user_session(r: redis.Redis, user_id: int, token: str) -> None:
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import aio_pika
|
import aio_pika
|
||||||
import uuid
|
import uuid
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
from ..models import Click
|
from ..models import Click
|
||||||
|
|
||||||
|
@ -9,14 +10,28 @@ CELERY_QUEUE_NAME = "celery"
|
||||||
CLICK_TASK_NAME = "clicks.celery.click.handle_click"
|
CLICK_TASK_NAME = "clicks.celery.click.handle_click"
|
||||||
|
|
||||||
|
|
||||||
def send_click_batch_copy(chan: aio_pika.Channel, click: Click, count: int):
|
async def send_click_batch_copy(chan: aio_pika.Channel, click: Click, count: int):
|
||||||
|
args = (click.userId, int(click.dateTime.timestamp() * 1e3), str(click.value), count)
|
||||||
await chan.default_exchange.publish(
|
await chan.default_exchange.publish(
|
||||||
message=aio_pika.Message(json.dumps({
|
message=aio_pika.Message(
|
||||||
'id': str(uuid.uuid4()),
|
body=json.dumps([
|
||||||
'task': CLICK_TASK_NAME,
|
args,
|
||||||
'args': [click.UserID, int(click.DateTime.timestamp() * 1e3), str(click.Value), count],
|
{},
|
||||||
'kwargs': dict(),
|
{"callbacks": None, "errbacks": None, "chain": None, "chord": None},
|
||||||
}).encode('utf-8')),
|
]).encode('utf-8'),
|
||||||
|
headers={
|
||||||
|
'task': CLICK_TASK_NAME,
|
||||||
|
'lang': 'py',
|
||||||
|
'argsrepr': repr(args),
|
||||||
|
'kwargsrepr': '{}',
|
||||||
|
'id': str(uuid.uuid4()),
|
||||||
|
'eta': datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
|
||||||
|
'retries': 0,
|
||||||
|
},
|
||||||
|
content_type='application/json',
|
||||||
|
content_encoding='utf-8',
|
||||||
|
),
|
||||||
routing_key=CELERY_QUEUE_NAME,
|
routing_key=CELERY_QUEUE_NAME,
|
||||||
mandatory=False,
|
mandatory=False,
|
||||||
|
immediate=False,
|
||||||
)
|
)
|
||||||
|
|
|
@ -6,7 +6,7 @@ import redis.asyncio as redis
|
||||||
import aio_pika
|
import aio_pika
|
||||||
import asyncpg
|
import asyncpg
|
||||||
|
|
||||||
from batcher.app.src.domain.setting import get_setting
|
from app.src.domain.setting import get_setting
|
||||||
from .repos.redis import (
|
from .repos.redis import (
|
||||||
get_period_sum, incr_period_sum, get_max_period_sum, get_user_total, get_global_average,
|
get_period_sum, incr_period_sum, get_max_period_sum, get_user_total, get_global_average,
|
||||||
incr_user_count_if_no_clicks, update_global_average, incr_user_total, compare_max_period_sum,
|
incr_user_count_if_no_clicks, update_global_average, incr_user_total, compare_max_period_sum,
|
||||||
|
@ -35,17 +35,16 @@ async def add_click_batch_copy(r: redis.Redis, pg: asyncpg.Connection, rmq: aio
|
||||||
await compare_max_period_sum(r, period, new_period_sum)
|
await compare_max_period_sum(r, period, new_period_sum)
|
||||||
|
|
||||||
click = Click(
|
click = Click(
|
||||||
UserID=user_id,
|
userId=user_id,
|
||||||
DateTime=datetime.now(),
|
dateTime=datetime.now(),
|
||||||
Value=_click_value,
|
value=_click_value,
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# insert click
|
# insert click
|
||||||
await bulk_store_copy(pg, click, count)
|
await bulk_store_copy(pg, click, count)
|
||||||
|
|
||||||
# send click to backend
|
# send click to backend
|
||||||
send_click_batch_copy(rmq, click, count)
|
await send_click_batch_copy(rmq, click, count)
|
||||||
|
|
||||||
return click
|
return click
|
||||||
|
|
||||||
|
|
|
@ -1,15 +1,12 @@
|
||||||
import decimal
|
import decimal
|
||||||
import json
|
import json
|
||||||
|
|
||||||
import aio_pika
|
import aio_pika
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
|
|
||||||
SETTING_QUEUE_NAME = "settings"
|
SETTING_QUEUE_NAME = "settings"
|
||||||
SETTING_TASK_NAME = "misc.celery.deliver_setting.deliver_setting"
|
|
||||||
|
|
||||||
|
async def consume_setting_updates(set_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.abc.AbstractChannel):
|
||||||
async def consume_setting_updates(update_setting_func: Callable[[str, decimal.Decimal], None], chan: aio_pika.Channel):
|
|
||||||
queue = await chan.get_queue(SETTING_QUEUE_NAME)
|
queue = await chan.get_queue(SETTING_QUEUE_NAME)
|
||||||
|
|
||||||
async with queue.iterator() as queue_iter:
|
async with queue.iterator() as queue_iter:
|
||||||
|
@ -17,4 +14,4 @@ async def consume_setting_updates(update_setting_func: Callable[[str, decimal.De
|
||||||
async with msg.process():
|
async with msg.process():
|
||||||
settings = json.loads(msg.body.decode('utf-8'))
|
settings = json.loads(msg.body.decode('utf-8'))
|
||||||
for name, value in settings.items():
|
for name, value in settings.items():
|
||||||
update_setting_func(name, value)
|
set_setting_func(name, decimal.Decimal(value))
|
||||||
|
|
|
@ -1,16 +1,24 @@
|
||||||
import decimal
|
import decimal
|
||||||
import threading
|
import threading
|
||||||
|
import asyncio
|
||||||
|
from collections.abc import Callable, Awaitable
|
||||||
import aio_pika
|
import aio_pika
|
||||||
|
|
||||||
from .repos.in_memory_storage import get_setting as ims_get_setting
|
from .repos.in_memory_storage import set_setting, get_setting as ims_get_setting
|
||||||
from .repos.rmq import consume_setting_updates
|
from .repos.rmq import consume_setting_updates
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_setting(name: str) -> decimal.Decimal:
|
def get_setting(name: str) -> decimal.Decimal:
|
||||||
return ims_get_setting(name)
|
return ims_get_setting(name)
|
||||||
|
|
||||||
|
async def start_thread(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]], *args):
|
||||||
|
conn = await rmq_connect_func()
|
||||||
|
async with conn:
|
||||||
|
chan = await conn.channel()
|
||||||
|
await consume_setting_updates(set_setting, chan)
|
||||||
|
|
||||||
def launch_consumer(rmq: aio_pika.Connection):
|
|
||||||
t = threading.Thread(target=consume_setting_updates, args=(ims_get_setting, rmq))
|
def launch_consumer(rmq_connect_func: Callable[[], Awaitable[aio_pika.abc.AbstractRobustConnection]]):
|
||||||
|
t = threading.Thread(target=asyncio.run, args=(start_thread(rmq_connect_func),))
|
||||||
t.start()
|
t.start()
|
||||||
|
|
|
@ -2,7 +2,7 @@ import aio_pika
|
||||||
import asyncpg
|
import asyncpg
|
||||||
import redis
|
import redis
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
from typing import Annotated
|
from typing import Annotated, Tuple
|
||||||
from ..domain.click import (
|
from ..domain.click import (
|
||||||
ClickResponse, BatchClickRequest, EnergyResponse, ClickValueResponse,
|
ClickResponse, BatchClickRequest, EnergyResponse, ClickValueResponse,
|
||||||
add_click_batch_copy, check_registration, check_energy, get_energy, click_value, delete_user_info
|
add_click_batch_copy, check_registration, check_energy, get_energy, click_value, delete_user_info
|
||||||
|
@ -22,9 +22,9 @@ router = APIRouter(
|
||||||
|
|
||||||
|
|
||||||
@router.post("/batch-click/", response_model=ClickResponse, status_code=200)
|
@router.post("/batch-click/", response_model=ClickResponse, status_code=200)
|
||||||
async def batch_click(req: BatchClickRequest, auth_info: Annotated[(int, str), Depends(get_token_header)], pg: Annotated[asyncpg.Connection, Depends(get_pg)], r: Annotated[redis.Redis, Depends(get_redis)], rmq: Annotated[aio_pika.Channel, Depends(get_rmq)]):
|
async def batch_click(req: BatchClickRequest, auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], pg: Annotated[asyncpg.Connection, Depends(get_pg)], r: Annotated[redis.Redis, Depends(get_redis)], rmq: Annotated[aio_pika.Channel, Depends(get_rmq)]):
|
||||||
user_id, token = auth_info
|
user_id, token = auth_info
|
||||||
if not check_registration(r, user_id, token, BACKEND_URL):
|
if not await check_registration(r, user_id, token, BACKEND_URL):
|
||||||
raise HTTPException(status_code=403, detail='Unauthorized')
|
raise HTTPException(status_code=403, detail='Unauthorized')
|
||||||
|
|
||||||
_energy, spent = await check_energy(r, user_id, req.count, token)
|
_energy, spent = await check_energy(r, user_id, req.count, token)
|
||||||
|
@ -39,9 +39,9 @@ async def batch_click(req: BatchClickRequest, auth_info: Annotated[(int, str), D
|
||||||
|
|
||||||
|
|
||||||
@router.get("/energy", response_model=EnergyResponse, status_code=200)
|
@router.get("/energy", response_model=EnergyResponse, status_code=200)
|
||||||
async def energy(auth_info: Annotated[(int, str), Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)]):
|
async def energy(auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)]):
|
||||||
user_id, token = auth_info
|
user_id, token = auth_info
|
||||||
if not check_registration(r, user_id, token, BACKEND_URL):
|
if not await check_registration(r, user_id, token, BACKEND_URL):
|
||||||
raise HTTPException(status_code=403, detail='Unauthorized')
|
raise HTTPException(status_code=403, detail='Unauthorized')
|
||||||
|
|
||||||
_energy = await get_energy(r, user_id, token)
|
_energy = await get_energy(r, user_id, token)
|
||||||
|
@ -51,9 +51,9 @@ async def energy(auth_info: Annotated[(int, str), Depends(get_token_header)], r:
|
||||||
|
|
||||||
|
|
||||||
@router.get('/coefficient', response_model=ClickValueResponse, status_code=200)
|
@router.get('/coefficient', response_model=ClickValueResponse, status_code=200)
|
||||||
async def coefficient(auth_info: Annotated[(int, str), Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]):
|
async def coefficient(auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]):
|
||||||
user_id, token = auth_info
|
user_id, token = auth_info
|
||||||
if not check_registration(r, user_id, token, BACKEND_URL):
|
if not await check_registration(r, user_id, token, BACKEND_URL):
|
||||||
raise HTTPException(status_code=403, detail='Unauthorized')
|
raise HTTPException(status_code=403, detail='Unauthorized')
|
||||||
|
|
||||||
value = await click_value(r, pg, user_id)
|
value = await click_value(r, pg, user_id)
|
||||||
|
@ -63,9 +63,9 @@ async def coefficient(auth_info: Annotated[(int, str), Depends(get_token_header)
|
||||||
|
|
||||||
|
|
||||||
@router.delete('/internal/user', status_code=204)
|
@router.delete('/internal/user', status_code=204)
|
||||||
async def delete_user(auth_info: Annotated[(int, str), Depends(get_token_header())], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]):
|
async def delete_user(auth_info: Annotated[Tuple[int, str], Depends(get_token_header)], r: Annotated[redis.Redis, Depends(get_redis)], pg: Annotated[asyncpg.Connection, Depends(get_pg)]):
|
||||||
user_id, token = auth_info
|
user_id, token = auth_info
|
||||||
if not check_registration(r, user_id, token, BACKEND_URL):
|
if not await check_registration(r, user_id, token, BACKEND_URL):
|
||||||
raise HTTPException(status_code=403, detail='Unauthorized')
|
raise HTTPException(status_code=403, detail='Unauthorized')
|
||||||
|
|
||||||
await delete_user_info(r, pg, user_id)
|
await delete_user_info(r, pg, user_id)
|
0
batcher/migrate.sh
Normal file → Executable file
0
batcher/migrate.sh
Normal file → Executable file
|
@ -27,5 +27,6 @@ starlette==0.40.0
|
||||||
typing_extensions==4.12.2
|
typing_extensions==4.12.2
|
||||||
tzdata==2024.2
|
tzdata==2024.2
|
||||||
urllib3==2.2.3
|
urllib3==2.2.3
|
||||||
|
uvicorn==0.32.1
|
||||||
vine==5.1.0
|
vine==5.1.0
|
||||||
yarl==1.15.5
|
yarl==1.15.5
|
||||||
|
|
BIN
bot/.DS_Store
vendored
BIN
bot/.DS_Store
vendored
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -5,7 +5,7 @@ token = os.getenv('TG_TOKEN', '7748003961:AAEIXu8NFICPabNaQP5JQ3AcY79nZdUbKdI')
|
||||||
api_token = os.getenv('API_TOKEN', 'b43fa8ccea5b6dd5e889a8ad3890ce14ce36a8bc') # TODO: remove
|
api_token = os.getenv('API_TOKEN', 'b43fa8ccea5b6dd5e889a8ad3890ce14ce36a8bc') # TODO: remove
|
||||||
backend_url = os.getenv('BACKEND_URL', 'http://backend:8000')
|
backend_url = os.getenv('BACKEND_URL', 'http://backend:8000')
|
||||||
request_url = f'{backend_url}/api'
|
request_url = f'{backend_url}/api'
|
||||||
url = os.getenv('URL', 'https://google.com')
|
url = os.getenv('APP_URL', 'https://google.com')
|
||||||
bot_name = os.getenv('BOT_NAME', 'https://t.me/danyadjan_test_bot')
|
bot_name = os.getenv('BOT_NAME', 'https://t.me/danyadjan_test_bot')
|
||||||
|
|
||||||
bucket_name = 'brawny-basket'
|
bucket_name = 'brawny-basket'
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
bot/pictures/.DS_Store
vendored
BIN
bot/pictures/.DS_Store
vendored
Binary file not shown.
|
@ -1,5 +1,3 @@
|
||||||
version: '3.9'
|
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
db_data: {}
|
db_data: {}
|
||||||
batcher_db_data: {}
|
batcher_db_data: {}
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
version: '3.9'
|
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
db_data: {}
|
db_data: {}
|
||||||
batcher_db_data: {}
|
batcher_db_data: {}
|
||||||
|
|
BIN
frontend/.DS_Store
vendored
BIN
frontend/.DS_Store
vendored
Binary file not shown.
BIN
frontend/public/.DS_Store
vendored
BIN
frontend/public/.DS_Store
vendored
Binary file not shown.
BIN
frontend/public/assets/.DS_Store
vendored
BIN
frontend/public/assets/.DS_Store
vendored
Binary file not shown.
BIN
frontend/src/.DS_Store
vendored
BIN
frontend/src/.DS_Store
vendored
Binary file not shown.
BIN
frontend/src/assets/.DS_Store
vendored
BIN
frontend/src/assets/.DS_Store
vendored
Binary file not shown.
|
@ -14,7 +14,7 @@ import { useNavigate } from 'react-router-dom';
|
||||||
export function StoragePage() {
|
export function StoragePage() {
|
||||||
const userId = useAppSelector<string>(state => state.userTg.id);
|
const userId = useAppSelector<string>(state => state.userTg.id);
|
||||||
const [page, setPage] = useState('storage');
|
const [page, setPage] = useState('storage');
|
||||||
const refLink = `https://t.me/sapphirecrown_bot?start=user_${userId}`;
|
const refLink = `https://t.me/kyc_clicker_bot?start=user_${userId}`;
|
||||||
const [showNotif, setShow] = useState(false);
|
const [showNotif, setShow] = useState(false);
|
||||||
const navigate = useNavigate();
|
const navigate = useNavigate();
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ export function WrongSourcePage() {
|
||||||
<div className={styles.container} style={{ height: `${height}px` }}>
|
<div className={styles.container} style={{ height: `${height}px` }}>
|
||||||
<div className={styles.innerContainer}>
|
<div className={styles.innerContainer}>
|
||||||
<h1 style={ETextStyles.RwSb24100} className={styles.title}>Похоже вы вошли не по той ссылке...</h1>
|
<h1 style={ETextStyles.RwSb24100} className={styles.title}>Похоже вы вошли не по той ссылке...</h1>
|
||||||
<Button text='Войти через Telegram' onClick={() => document.location.href = 'https://t.me/sapphirecrown_bot'}/>
|
<Button text='Войти через Telegram' onClick={() => document.location.href = 'https://t.me/kyc_clicker_bot'}/>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
|
|
|
@ -42,32 +42,32 @@ http {
|
||||||
access_log /var/log/nginx/access.log upstreamlog;
|
access_log /var/log/nginx/access.log upstreamlog;
|
||||||
error_log /var/log/nginx/error.log;
|
error_log /var/log/nginx/error.log;
|
||||||
listen 80;
|
listen 80;
|
||||||
listen 443 ssl http2;
|
; listen 443 ssl http2;
|
||||||
charset utf-8;
|
charset utf-8;
|
||||||
server_name crowngame.ru www.crowngame.ru;
|
; server_name kyc_clicker.ru www.kyc_clicker.ru;
|
||||||
|
|
||||||
root /dist/;
|
root /dist/;
|
||||||
index index.html;
|
index index.html;
|
||||||
|
|
||||||
ssl_certificate /etc/letsencrypt/live/crowngame.ru/fullchain.pem;
|
; ssl_certificate /etc/letsencrypt/live/kyc_clicker.ru/fullchain.pem;
|
||||||
ssl_certificate_key /etc/letsencrypt/live/crowngame.ru/privkey.pem;
|
; ssl_certificate_key /etc/letsencrypt/live/kyc_clicker.ru/privkey.pem;
|
||||||
|
|
||||||
include /etc/letsencrypt/options-ssl-nginx.conf;
|
; include /etc/letsencrypt/options-ssl-nginx.conf;
|
||||||
ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem;
|
; ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem;
|
||||||
|
|
||||||
if ($server_port = 80) {
|
; if ($server_port = 80) {
|
||||||
set $https_redirect 1;
|
; set $https_redirect 1;
|
||||||
}
|
; }
|
||||||
if ($host ~ '^www\.') {
|
; if ($host ~ '^www\.') {
|
||||||
set $https_redirect 1;
|
; set $https_redirect 1;
|
||||||
}
|
; }
|
||||||
if ($https_redirect = 1) {
|
; if ($https_redirect = 1) {
|
||||||
return 301 https://crowngame.ru$request_uri;
|
; return 301 https://crowngame.ru$request_uri;
|
||||||
}
|
; }
|
||||||
|
|
||||||
location /.well-known/acme-challenge/ {
|
; location /.well-known/acme-challenge/ {
|
||||||
root /var/www/certbot;
|
; root /var/www/certbot;
|
||||||
}
|
; }
|
||||||
|
|
||||||
# frontend
|
# frontend
|
||||||
location / {
|
location / {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user