From 3f40c402a1da1edb8974a7ad6b04e2e0e00a7e8b Mon Sep 17 00:00:00 2001 From: James Melvin Date: Mon, 5 Jun 2023 19:56:55 +0530 Subject: [PATCH 1/9] fix: batch or partition DB access and commits for user streak updation --- backend/oasst_backend/config.py | 2 ++ backend/oasst_backend/scheduled_tasks.py | 38 +++++++++++++++++++----- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/backend/oasst_backend/config.py b/backend/oasst_backend/config.py index 44721a76ab..14aa1123c3 100644 --- a/backend/oasst_backend/config.py +++ b/backend/oasst_backend/config.py @@ -277,6 +277,8 @@ def validate_user_stats_intervals(cls, v: int): DISCORD_API_KEY: str | None = None DISCORD_CHANNEL_ID: str | None = None + USER_STREAK_BATCH_SIZE: int = 1000 + class Config: env_file = ".env" env_file_encoding = "utf-8" diff --git a/backend/oasst_backend/scheduled_tasks.py b/backend/oasst_backend/scheduled_tasks.py index 407835c492..bc32a64a5b 100644 --- a/backend/oasst_backend/scheduled_tasks.py +++ b/backend/oasst_backend/scheduled_tasks.py @@ -7,6 +7,7 @@ from celery import shared_task from loguru import logger from oasst_backend.celery_worker import app +from oasst_backend.config import settings from oasst_backend.models import ApiClient, Message from oasst_backend.models.db_payload import MessagePayload from oasst_backend.prompt_repository import PromptRepository @@ -14,7 +15,7 @@ from oasst_backend.utils.database_utils import db_lang_to_postgres_ts_lang, default_session_factory from oasst_backend.utils.hugging_face import HfClassificationModel, HfEmbeddingModel, HfUrl, HuggingFaceAPI from oasst_shared.utils import utcnow -from sqlalchemy import func +from sqlalchemy import func, update from sqlmodel import select startup_time: datetime = utcnow() @@ -102,10 +103,17 @@ def update_user_streak() -> None: if timedelta.days > 0: # Update only greater than 24 hours . Do nothing logger.info("Process timedelta greater than 24h") - statement = select(User) - result = session.exec(statement).all() - if result is not None: - for user in result: + batch_size = settings.USER_STREAK_BATCH_SIZE # Adjust the batch size as per your requirements + + statement = select(User).execution_options(yield_per=batch_size) + num_of_partitions = 0 + for partition in session.scalars(statement).partitions(): + num_users = 0 + user_list = [] + num_of_partitions = num_of_partitions + 1 + for i, user in enumerate(partition): + num_users += 1 + user.streak_days += 1 last_activity_date = user.last_activity_date streak_last_day_date = user.streak_last_day_date # set NULL streak_days to 0 @@ -127,9 +135,23 @@ def update_user_streak() -> None: if streak_delta.days > 0: user.streak_days += 1 user.streak_last_day_date = current_time - session.add(user) - session.commit() - + user_list.append(user) + + logger.info(f"Total users in partition {i } : {num_users}") + # Bulk update the streak information + update_statement = ( + update(User) + .where(User.id.in_([user.id for user in user_list])) + .values( + streak_days=User.streak_days, + streak_last_day_date=User.streak_last_day_date, + ) + ) + session.execute(update_statement) + session.flush() + logger.info(f"Flushed partition {i}") + session.commit() + logger.info("User streak updated successfully! for {num_of_partitions} partitions") else: logger.info("Not yet 24hours since the process started! ...") logger.info("User streak end...") From 11c6a5da394c50321f847c5e364f6053709e8e36 Mon Sep 17 00:00:00 2001 From: James Melvin Date: Tue, 6 Jun 2023 20:54:24 +0530 Subject: [PATCH 2/9] fix: New logic for user streak updates --- backend/oasst_backend/api/v1/tasks.py | 2 +- backend/oasst_backend/scheduled_tasks.py | 77 ++++++------------------ backend/oasst_backend/user_repository.py | 16 ++++- 3 files changed, 32 insertions(+), 63 deletions(-) diff --git a/backend/oasst_backend/api/v1/tasks.py b/backend/oasst_backend/api/v1/tasks.py index b2b5aff74f..6e4a6b630e 100644 --- a/backend/oasst_backend/api/v1/tasks.py +++ b/backend/oasst_backend/api/v1/tasks.py @@ -174,7 +174,7 @@ async def interaction_tx(session: deps.Session): ur = UserRepository(session, api_client) task = await tm.handle_interaction(interaction) if type(task) is protocol_schema.TaskDone: - ur.update_user_last_activity(user=pr.user) + ur.update_user_last_activity(user=pr.user, update_streak=True) return task try: diff --git a/backend/oasst_backend/scheduled_tasks.py b/backend/oasst_backend/scheduled_tasks.py index bc32a64a5b..643e5643cc 100644 --- a/backend/oasst_backend/scheduled_tasks.py +++ b/backend/oasst_backend/scheduled_tasks.py @@ -7,16 +7,13 @@ from celery import shared_task from loguru import logger from oasst_backend.celery_worker import app -from oasst_backend.config import settings from oasst_backend.models import ApiClient, Message from oasst_backend.models.db_payload import MessagePayload from oasst_backend.prompt_repository import PromptRepository -from oasst_backend.user_repository import User from oasst_backend.utils.database_utils import db_lang_to_postgres_ts_lang, default_session_factory from oasst_backend.utils.hugging_face import HfClassificationModel, HfEmbeddingModel, HfUrl, HuggingFaceAPI from oasst_shared.utils import utcnow -from sqlalchemy import func, update -from sqlmodel import select +from sqlalchemy import func startup_time: datetime = utcnow() @@ -95,65 +92,25 @@ def update_search_vectors(batch_size: int) -> None: @shared_task(name="update_user_streak") def update_user_streak() -> None: + # check if user has been active in the last 24h and update streak accordingly logger.info("update_user_streak start...") try: with default_session_factory() as session: current_time = utcnow() - timedelta = current_time - startup_time - if timedelta.days > 0: - # Update only greater than 24 hours . Do nothing - logger.info("Process timedelta greater than 24h") - batch_size = settings.USER_STREAK_BATCH_SIZE # Adjust the batch size as per your requirements - - statement = select(User).execution_options(yield_per=batch_size) - num_of_partitions = 0 - for partition in session.scalars(statement).partitions(): - num_users = 0 - user_list = [] - num_of_partitions = num_of_partitions + 1 - for i, user in enumerate(partition): - num_users += 1 - user.streak_days += 1 - last_activity_date = user.last_activity_date - streak_last_day_date = user.streak_last_day_date - # set NULL streak_days to 0 - if user.streak_days is None: - user.streak_days = 0 - # if the user had completed a task - if last_activity_date is not None: - lastactitvitydelta = current_time - last_activity_date - # if the user missed consecutive days of completing a task - # reset the streak_days to 0 and set streak_last_day_date to the current_time - if lastactitvitydelta.days > 1 or user.streak_days is None: - user.streak_days = 0 - user.streak_last_day_date = current_time - # streak_last_day_date has a current timestamp in DB. Ideally should not be NULL. - if streak_last_day_date is not None: - streak_delta = current_time - streak_last_day_date - # if user completed tasks on consecutive days then increment the streak days - # update the streak_last_day_date to current time for the next calculation - if streak_delta.days > 0: - user.streak_days += 1 - user.streak_last_day_date = current_time - user_list.append(user) - - logger.info(f"Total users in partition {i } : {num_users}") - # Bulk update the streak information - update_statement = ( - update(User) - .where(User.id.in_([user.id for user in user_list])) - .values( - streak_days=User.streak_days, - streak_last_day_date=User.streak_last_day_date, - ) - ) - session.execute(update_statement) - session.flush() - logger.info(f"Flushed partition {i}") - session.commit() - logger.info("User streak updated successfully! for {num_of_partitions} partitions") - else: - logger.info("Not yet 24hours since the process started! ...") - logger.info("User streak end...") + logger.info("Process timedelta greater than 24h") + + # Reset streak_days to 0 for users with more than one day of inactivity + reset_query = f""" + UPDATE "user" + SET streak_days = 0, + streak_last_day_date = '{current_time}' + WHERE ('{current_time}' - last_activity_date) > interval '1 day' + OR streak_days IS NULL + OR last_activity_date IS NULL + """ + session.execute(reset_query) + session.commit() + + logger.info("User streak reset successfully!") except Exception as e: logger.error(str(e)) diff --git a/backend/oasst_backend/user_repository.py b/backend/oasst_backend/user_repository.py index 13c84074a9..acd74bcfcc 100644 --- a/backend/oasst_backend/user_repository.py +++ b/backend/oasst_backend/user_repository.py @@ -332,6 +332,18 @@ def query_users_ordered_by_display_name( return qry.all() @managed_tx_method(CommitMode.FLUSH) - def update_user_last_activity(self, user: User) -> None: - user.last_activity_date = utcnow() + def update_user_last_activity(self, user: User, update_streak: bool = False) -> None: + current_time = utcnow() + user.last_activity_date = current_time + streak_last_day_date = user.streak_last_day_date + + if user.streak_last_day_date is None: + # this should only happen when the user is first created + user.streak_last_day_date = user.last_activity_date + else: + # if the user has not been active for more than a day increment it by 1 + if current_time.days != streak_last_day_date.days: + user.streak_last_day_date = user.last_activity_date + user.streak_days += 1 + self.db.add(user) From 04a63ad98e0ef4d30f62650b15d6f7d304d97222 Mon Sep 17 00:00:00 2001 From: James Melvin Date: Tue, 6 Jun 2023 23:05:50 +0530 Subject: [PATCH 3/9] fix: added update_streak param --- backend/oasst_backend/user_repository.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/backend/oasst_backend/user_repository.py b/backend/oasst_backend/user_repository.py index acd74bcfcc..ffe6206563 100644 --- a/backend/oasst_backend/user_repository.py +++ b/backend/oasst_backend/user_repository.py @@ -1,6 +1,7 @@ from typing import Optional from uuid import UUID +from loguru import logger from oasst_backend.config import settings from oasst_backend.models import ApiClient, User from oasst_backend.utils.database_utils import CommitMode, managed_tx_method @@ -337,13 +338,17 @@ def update_user_last_activity(self, user: User, update_streak: bool = False) -> user.last_activity_date = current_time streak_last_day_date = user.streak_last_day_date - if user.streak_last_day_date is None: - # this should only happen when the user is first created - user.streak_last_day_date = user.last_activity_date - else: - # if the user has not been active for more than a day increment it by 1 - if current_time.days != streak_last_day_date.days: - user.streak_last_day_date = user.last_activity_date - user.streak_days += 1 + if update_streak: + try: + if user.streak_last_day_date is None: + # this should only happen when the user is first created + user.streak_last_day_date = user.last_activity_date + else: + # if the user has not been active for more than a day increment it by 1 + if current_time.days != streak_last_day_date.days: + user.streak_last_day_date = user.last_activity_date + user.streak_days += 1 + except Exception as e: + logger.error(f"Error updating user streak for user {user.id}: {e}") self.db.add(user) From 7b1331a92123a7913886261d3dc879f575a33724 Mon Sep 17 00:00:00 2001 From: Andreas Koepf Date: Tue, 6 Jun 2023 21:33:42 +0200 Subject: [PATCH 4/9] minor cleanup --- backend/oasst_backend/celery_worker.py | 6 ++-- backend/oasst_backend/config.py | 2 -- backend/oasst_backend/scheduled_tasks.py | 42 ++++++++++-------------- backend/oasst_backend/user_repository.py | 20 ++++------- 4 files changed, 27 insertions(+), 43 deletions(-) diff --git a/backend/oasst_backend/celery_worker.py b/backend/oasst_backend/celery_worker.py index fe99527b48..1bcc641407 100644 --- a/backend/oasst_backend/celery_worker.py +++ b/backend/oasst_backend/celery_worker.py @@ -19,9 +19,9 @@ # see https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html app.conf.beat_schedule = { - "update-user-streak": { - "task": "update_user_streak", - "schedule": 60.0 * 60.0 * 4, # seconds + "reset-user-streak": { + "task": "reset_user_streak", + "schedule": 60.0 * 60.0 * 4, # in seconds, every 4h }, "update-search-vectors": { "task": "update_search_vectors", diff --git a/backend/oasst_backend/config.py b/backend/oasst_backend/config.py index 14aa1123c3..44721a76ab 100644 --- a/backend/oasst_backend/config.py +++ b/backend/oasst_backend/config.py @@ -277,8 +277,6 @@ def validate_user_stats_intervals(cls, v: int): DISCORD_API_KEY: str | None = None DISCORD_CHANNEL_ID: str | None = None - USER_STREAK_BATCH_SIZE: int = 1000 - class Config: env_file = ".env" env_file_encoding = "utf-8" diff --git a/backend/oasst_backend/scheduled_tasks.py b/backend/oasst_backend/scheduled_tasks.py index 643e5643cc..a0ff2f2ca0 100644 --- a/backend/oasst_backend/scheduled_tasks.py +++ b/backend/oasst_backend/scheduled_tasks.py @@ -1,6 +1,6 @@ from __future__ import absolute_import, unicode_literals -from datetime import datetime +from datetime import timedelta from typing import Any, Dict, List from asgiref.sync import async_to_sync @@ -12,10 +12,8 @@ from oasst_backend.prompt_repository import PromptRepository from oasst_backend.utils.database_utils import db_lang_to_postgres_ts_lang, default_session_factory from oasst_backend.utils.hugging_face import HfClassificationModel, HfEmbeddingModel, HfUrl, HuggingFaceAPI -from oasst_shared.utils import utcnow -from sqlalchemy import func - -startup_time: datetime = utcnow() +from oasst_shared.utils import log_timing, utcnow +from sqlalchemy import func, text async def useHFApi(text, url, model_name): @@ -91,26 +89,20 @@ def update_search_vectors(batch_size: int) -> None: @shared_task(name="update_user_streak") -def update_user_streak() -> None: - # check if user has been active in the last 24h and update streak accordingly - logger.info("update_user_streak start...") +@log_timing(level="INFO") +def reset_user_streak() -> None: try: with default_session_factory() as session: - current_time = utcnow() - logger.info("Process timedelta greater than 24h") - - # Reset streak_days to 0 for users with more than one day of inactivity - reset_query = f""" - UPDATE "user" - SET streak_days = 0, - streak_last_day_date = '{current_time}' - WHERE ('{current_time}' - last_activity_date) > interval '1 day' - OR streak_days IS NULL - OR last_activity_date IS NULL - """ - session.execute(reset_query) + # Reset streak_days to 0 for users with more than 1.5 days of inactivity + streak_timeout = utcnow() - timedelta(hours=36) + sql_reset_query = """ +UPDATE "user" +SET streak_days = 0, + streak_last_day_date = NULL +WHERE last_activity_date < :streak_timeout + AND streak_last_day_date IS NOT NULL +""" + session.execute(text(sql_reset_query), {"streak_timeout": streak_timeout}) session.commit() - - logger.info("User streak reset successfully!") - except Exception as e: - logger.error(str(e)) + except Exception: + logger.exception("Error during user streak reset") diff --git a/backend/oasst_backend/user_repository.py b/backend/oasst_backend/user_repository.py index ffe6206563..a6ed3afb4c 100644 --- a/backend/oasst_backend/user_repository.py +++ b/backend/oasst_backend/user_repository.py @@ -1,7 +1,6 @@ from typing import Optional from uuid import UUID -from loguru import logger from oasst_backend.config import settings from oasst_backend.models import ApiClient, User from oasst_backend.utils.database_utils import CommitMode, managed_tx_method @@ -336,19 +335,14 @@ def query_users_ordered_by_display_name( def update_user_last_activity(self, user: User, update_streak: bool = False) -> None: current_time = utcnow() user.last_activity_date = current_time - streak_last_day_date = user.streak_last_day_date if update_streak: - try: - if user.streak_last_day_date is None: - # this should only happen when the user is first created - user.streak_last_day_date = user.last_activity_date - else: - # if the user has not been active for more than a day increment it by 1 - if current_time.days != streak_last_day_date.days: - user.streak_last_day_date = user.last_activity_date - user.streak_days += 1 - except Exception as e: - logger.error(f"Error updating user streak for user {user.id}: {e}") + if user.streak_last_day_date is None or user.streak_last_day_date > current_time: + # beginning new streak + user.streak_last_day_date = user.last_activity_date + user.streak_days = 0 + else: + # update streak day count + user.streak_days = (current_time - user.last_activity_date).days self.db.add(user) From 4e37c53993af41f3be0e5f9cea2f3cb2c8608823 Mon Sep 17 00:00:00 2001 From: Andreas Koepf Date: Tue, 6 Jun 2023 21:35:43 +0200 Subject: [PATCH 5/9] use correct ref date --- backend/oasst_backend/user_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/oasst_backend/user_repository.py b/backend/oasst_backend/user_repository.py index a6ed3afb4c..c13afd94f6 100644 --- a/backend/oasst_backend/user_repository.py +++ b/backend/oasst_backend/user_repository.py @@ -343,6 +343,6 @@ def update_user_last_activity(self, user: User, update_streak: bool = False) -> user.streak_days = 0 else: # update streak day count - user.streak_days = (current_time - user.last_activity_date).days + user.streak_days = (current_time - user.streak_last_day_date).days self.db.add(user) From 04db76ac10b5afb1507fcdfb5d2489a943a30800 Mon Sep 17 00:00:00 2001 From: Andreas Koepf Date: Tue, 6 Jun 2023 21:53:52 +0200 Subject: [PATCH 6/9] use sqlalchemy for update --- backend/oasst_backend/celery_worker.py | 2 +- backend/oasst_backend/scheduled_tasks.py | 25 ++++++++++++------------ 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/backend/oasst_backend/celery_worker.py b/backend/oasst_backend/celery_worker.py index 1bcc641407..714772e443 100644 --- a/backend/oasst_backend/celery_worker.py +++ b/backend/oasst_backend/celery_worker.py @@ -20,7 +20,7 @@ # see https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html app.conf.beat_schedule = { "reset-user-streak": { - "task": "reset_user_streak", + "task": "periodic_user_streak_reset", "schedule": 60.0 * 60.0 * 4, # in seconds, every 4h }, "update-search-vectors": { diff --git a/backend/oasst_backend/scheduled_tasks.py b/backend/oasst_backend/scheduled_tasks.py index a0ff2f2ca0..4ae1ea1264 100644 --- a/backend/oasst_backend/scheduled_tasks.py +++ b/backend/oasst_backend/scheduled_tasks.py @@ -7,13 +7,14 @@ from celery import shared_task from loguru import logger from oasst_backend.celery_worker import app -from oasst_backend.models import ApiClient, Message +from oasst_backend.models import ApiClient, Message, User from oasst_backend.models.db_payload import MessagePayload from oasst_backend.prompt_repository import PromptRepository from oasst_backend.utils.database_utils import db_lang_to_postgres_ts_lang, default_session_factory from oasst_backend.utils.hugging_face import HfClassificationModel, HfEmbeddingModel, HfUrl, HuggingFaceAPI from oasst_shared.utils import log_timing, utcnow -from sqlalchemy import func, text +from sqlalchemy import func +from sqlmodel import update async def useHFApi(text, url, model_name): @@ -88,21 +89,19 @@ def update_search_vectors(batch_size: int) -> None: logger.error(f"update_search_vectors failed with error: {str(e)}") -@shared_task(name="update_user_streak") +@shared_task(name="periodic_user_streak_reset") @log_timing(level="INFO") -def reset_user_streak() -> None: +def periodic_user_streak_reset() -> None: try: with default_session_factory() as session: # Reset streak_days to 0 for users with more than 1.5 days of inactivity streak_timeout = utcnow() - timedelta(hours=36) - sql_reset_query = """ -UPDATE "user" -SET streak_days = 0, - streak_last_day_date = NULL -WHERE last_activity_date < :streak_timeout - AND streak_last_day_date IS NOT NULL -""" - session.execute(text(sql_reset_query), {"streak_timeout": streak_timeout}) + reset_query = ( + update(User) + .filter(User.last_activity_date < streak_timeout, User.streak_last_day_date.is_not(None)) + .values(streak_days=0, streak_last_day_date=None) + ) + session.execute(reset_query) session.commit() except Exception: - logger.exception("Error during user streak reset") + logger.exception("Error during periodic user streak reset") From 024b1623576493520791dc2b0e191a49344d1ce9 Mon Sep 17 00:00:00 2001 From: Andreas Koepf Date: Tue, 6 Jun 2023 21:56:24 +0200 Subject: [PATCH 7/9] minor formatting --- backend/oasst_backend/user_repository.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/oasst_backend/user_repository.py b/backend/oasst_backend/user_repository.py index c13afd94f6..3c326afb4a 100644 --- a/backend/oasst_backend/user_repository.py +++ b/backend/oasst_backend/user_repository.py @@ -338,8 +338,8 @@ def update_user_last_activity(self, user: User, update_streak: bool = False) -> if update_streak: if user.streak_last_day_date is None or user.streak_last_day_date > current_time: - # beginning new streak - user.streak_last_day_date = user.last_activity_date + # begin new streak + user.streak_last_day_date = current_time user.streak_days = 0 else: # update streak day count From f5f946c7099094f0ba4b3f11477d55873de03d95 Mon Sep 17 00:00:00 2001 From: Andreas Koepf Date: Tue, 6 Jun 2023 22:14:29 +0200 Subject: [PATCH 8/9] remove unused line from main.py --- backend/main.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/main.py b/backend/main.py index 168de5cbc1..8cef801bd7 100644 --- a/backend/main.py +++ b/backend/main.py @@ -32,8 +32,6 @@ from sqlmodel import Session from starlette.middleware.cors import CORSMiddleware -# from worker.scheduled_tasks import create_task - app = fastapi.FastAPI(title=settings.PROJECT_NAME, openapi_url=f"{settings.API_V1_STR}/openapi.json") startup_time: datetime = utcnow() From 95ff5af2b07ebb05f8cc6ee0abbe71bdc81c7433 Mon Sep 17 00:00:00 2001 From: James Melvin Date: Wed, 7 Jun 2023 06:14:47 +0530 Subject: [PATCH 9/9] fix: Minor typo in backend ReadMe --- backend/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/README.md b/backend/README.md index 2cf7fb34ac..f22f0ca827 100644 --- a/backend/README.md +++ b/backend/README.md @@ -26,7 +26,7 @@ Next, to install all requirements, You can run 1. `pip install -r requirements.txt` inside the `backend` folder; and 2. `pip install -e .` inside the `oasst-shared` folder. 3. `pip install -e .` inside the `oasst-data` folder. -4. `./scripts/backend-development/run-local.sh` to run the backend. This will +4. `../scripts/backend-development/run-local.sh` to run the backend. This will start the backend server at `http://localhost:8080`. ## REST Server Configuration