diff --git a/dyn/add_colomn_joined2old_staus_table.py b/dyn/add_colomn_joined2old_staus_table.py new file mode 100644 index 0000000..fa48c09 --- /dev/null +++ b/dyn/add_colomn_joined2old_staus_table.py @@ -0,0 +1,53 @@ +"""这是为了把第一代数据库迁移到新的数据。 +请更新本次commit后,在确定data.db的存在时,首先运行一次此文件。 +可以重复运行,不会出错但是没必要反复运行。 +""" +import sys +import sqlite3 +from os import path + + +path_db = f'{path.dirname(path.realpath(__file__))}/data.db' +if not path.isfile(path_db): + print('未找到数据库,请仔细查看本代码开头注释内容', file=sys.stderr) + sys.exit(-1) + +conn = sqlite3.connect(path_db) +colomns = conn.execute('PRAGMA table_info("dynraffle_status")').fetchall() +for colomn in colomns: + if colomn[1] == 'handle_status': + print('您已经是最新数据库无需迁移') + break +else: + print('检测到老数据库,正在准备迁移,请稍后') + with conn: + sql_rename_old_table = 'ALTER TABLE dynraffle_status RENAME TO tmp_dynraffle_status' + sql_create_new_table = ( + 'CREATE TABLE dynraffle_status (' + 'dyn_id TEXT NOT NULL,' + 'doc_id TEXT NOT NULL UNIQUE,' + 'describe TEXT NOT NULL,' + 'uid TEXT NOT NULL,' + 'post_time INTEGER NOT NULL,' # 时间这里很简单就能比较 + 'lottery_time INTEGER NOT NULL, ' + + 'at_num INTEGER NOT NULL,' + 'feed_limit INTEGER NOT NULL,' # 0/1 表示bool型 + 'handle_status INTEGER NOT NULL,' + + 'prize_cmt_1st TEXT NOT NULL,' + 'prize_cmt_2nd TEXT,' + 'prize_cmt_3rd TEXT,' + 'PRIMARY KEY (dyn_id)' + '); ' + ) + sql_move = 'INSERT INTO dynraffle_status ' \ + 'SELECT dyn_id, doc_id, describe, uid, post_time, lottery_time, at_num, feed_limit, ?, prize_cmt_1st, prize_cmt_2nd, prize_cmt_3rd FROM tmp_dynraffle_status' + sql_drop_old_table = 'DROP TABLE tmp_dynraffle_status' + conn.execute(sql_rename_old_table) + conn.execute(sql_create_new_table) + conn.execute(sql_move, (1,)) + conn.execute(sql_drop_old_table) + +conn.close() +print('DONE') diff --git a/dyn/bili_data_types.py b/dyn/bili_data_types.py index 7da85bf..fbfd15f 100644 --- a/dyn/bili_data_types.py +++ b/dyn/bili_data_types.py @@ -16,6 +16,7 @@ class DynRaffleStatus: # 参与抽奖使用 at_num = attr.ib(converter=int) feed_limit = attr.ib(converter=bool) + handle_status = attr.ib(validator=attr.validators.in_([-1, 0, 1])) # -1 表示未参与,0表示正在参与, 1表示已经参与 # 一些其他信息 prize_cmt_1st = attr.ib(validator=attr.validators.instance_of(str)) # 奖品描述这里必须str,下同,且不提供type转换 @@ -33,6 +34,7 @@ def as_sql_values(self): at_num = self.at_num feed_limit = int(self.feed_limit) + handle_status = self.handle_status prize_cmt_1st = self.prize_cmt_1st[:20] prize_cmt_2nd = self.prize_cmt_2nd[:20] @@ -40,7 +42,7 @@ def as_sql_values(self): return \ dyn_id, doc_id, describe, uid, post_time, lottery_time,\ - at_num, feed_limit, prize_cmt_1st, prize_cmt_2nd, prize_cmt_3rd + at_num, feed_limit, handle_status, prize_cmt_1st, prize_cmt_2nd, prize_cmt_3rd @attr.s(frozen=True) diff --git a/dyn/dyn_raffle_sql.py b/dyn/dyn_raffle_sql.py index a9b41b0..0e86afc 100644 --- a/dyn/dyn_raffle_sql.py +++ b/dyn/dyn_raffle_sql.py @@ -44,6 +44,7 @@ def __init__(self): 'at_num INTEGER NOT NULL,' 'feed_limit INTEGER NOT NULL,' # 0/1 表示bool型 + 'handle_status INTEGER NOT NULL,' 'prize_cmt_1st TEXT NOT NULL,' 'prize_cmt_2nd TEXT,' @@ -60,7 +61,7 @@ def as_bili_data(self, row): def insert_element(self, dyn_raffle_status: DynRaffleStatus): # ?,?,?这种可以对应type,否则很难折腾 with self.conn: - self.conn.execute('INSERT INTO dynraffle_status VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', + self.conn.execute('INSERT INTO dynraffle_status VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', dyn_raffle_status.as_sql_values()) def select_all(self): @@ -80,9 +81,25 @@ def del_by_primary_key(self, dyn_id): with self.conn: self.conn.execute('DELETE FROM dynraffle_status WHERE dyn_id=?', (str(dyn_id),)) - def select_bytime(self, curr_time): + def select(self, handle_status, lottery_time_l, lottery_time_r): + if handle_status is None: + return [] + results = [] - for row in self.conn.execute(f'SELECT * FROM dynraffle_status WHERE lottery_time < ?', (int(curr_time),)): + if lottery_time_l is None and lottery_time_r is not None: + sql = 'SELECT * FROM dynraffle_status WHERE lottery_time <= ? AND handle_status = ?' + parameters = (int(lottery_time_r), int(handle_status)) + elif lottery_time_l is not None and lottery_time_r is None: + sql = 'SELECT * FROM dynraffle_status WHERE lottery_time >= ? AND handle_status = ?' + parameters = (int(lottery_time_l), int(handle_status)) + elif lottery_time_l is not None and lottery_time_r is not None: + sql = 'SELECT * FROM dynraffle_status WHERE (lottery_time BETWEEN ? AND ?) AND (handle_status = ?)' + parameters = (int(lottery_time_l), int(lottery_time_r), int(handle_status)) + else: + sql = 'SELECT * FROM dynraffle_status WHERE handle_status = ?' + parameters = (int(handle_status),) + + for row in self.conn.execute(sql, parameters): results.append(self.as_bili_data(row)) return results @@ -298,8 +315,19 @@ def should_del_from_dynraffle_status_table(orig_dynid): return not cursor.fetchone() -def select_bytime(curr_time): - return dynraffle_status_table.select_bytime(curr_time) +def can_rafflestatus_be_handled(dyn_id): + cursor = conn.execute( + 'SELECT 1 FROM dynraffle_status WHERE dyn_id = ? AND handle_status = ?', (str(dyn_id), 0)) + return bool(cursor.fetchone()) + + +def set_rafflestatus_handle_status(handle_status: int, dyn_id): + with conn: + conn.execute('UPDATE dynraffle_status SET handle_status = ? WHERE dyn_id = ?', (handle_status, str(dyn_id))) + + +def select_rafflestatus(handle_status, lottery_time_l=None, lottery_time_r=None): + return dynraffle_status_table.select(handle_status, lottery_time_l, lottery_time_r) # 从三个表里查最新数据 diff --git a/dyn/monitor_dyn_raffle.py b/dyn/monitor_dyn_raffle.py index cd82fc0..3c146e7 100644 --- a/dyn/monitor_dyn_raffle.py +++ b/dyn/monitor_dyn_raffle.py @@ -9,32 +9,21 @@ class DynRaffleMonitor: - def __init__(self, dyn_raffle_description_filter=None, dyn_prize_cmt_filter=None, init_docid=None): + def __init__(self, should_join_immediately: bool, init_docid=None): self.init_docid = init_docid - if dyn_raffle_description_filter is None: - self.dyn_raffle_description_filter = [] - else: - self.dyn_raffle_description_filter = dyn_raffle_description_filter - if dyn_prize_cmt_filter is None: - self.dyn_prize_cmt_filter = [] - else: - self.dyn_prize_cmt_filter = dyn_prize_cmt_filter - - async def fetch_latest_docid(self): - doc_id = await notifier.exec_func(-1, DynRaffleHandlerTask.create_dyn) - await notifier.exec_func(-1, DynRaffleHandlerTask.del_dyn_by_docid, doc_id) - return doc_id - - # 打算设置两步保险,一个是status表中最新doc_id,一个是开一个专门的表专门去定时更新doc_id - async def get_latest_docid(self) -> int: - return dyn_raffle_sql.init_docid() + + self.dyn_raffle_description_filter = [] + self.dyn_prize_cmt_filter = [] + + self.should_join_immediately = should_join_immediately + self._init_handle_status = -1 if not self.should_join_immediately else 0 # 获取dyn_raffle抽奖更多信息并且进行过滤 async def dig_and_filter(self, doc_id: int, uid: int, post_time: int, describe: str): dyn_raffle_status: DynRaffleStatus = await notifier.exec_func( -1, DynRaffleHandlerTask.fetch_dyn_raffle_status, - doc_id, uid, post_time, describe) - if dyn_raffle_status.lottery_time <= utils.curr_time() + 60: + doc_id, uid, post_time, describe, self._init_handle_status) + if dyn_raffle_status.lottery_time <= utils.curr_time() + 180: printer.info([f'{doc_id}的动态抽奖已经开奖或马上开奖,不再参与'], True) return for key_word in self.dyn_raffle_description_filter: @@ -51,14 +40,30 @@ async def dig_and_filter(self, doc_id: int, uid: int, post_time: int, describe: if dyn_raffle_status.post_time >= utils.curr_time() - 150: printer.info([f'{doc_id}的动态抽奖触发时间约束,休眠150秒后再正式参与'], True) await asyncio.sleep(150) - printer.info([f'{doc_id}的动态抽奖通过时间和关键词过滤'], True) - notifier.exec_task(-1, DynRaffleHandlerTask, 0, dyn_raffle_status, delay_range=(0, 0)) - async def run(self): + if dyn_raffle_sql.is_raffleid_duplicate(dyn_raffle_status.dyn_id): + printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖触发重复性过滤'], True) + return + dyn_raffle_sql.insert_dynraffle_status_table(dyn_raffle_status) + + printer.info([f'{doc_id}的动态抽奖通过过滤与验证,正式处理'], True) + + if dyn_raffle_status.handle_status == -1: + printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖暂不参与,仅记录数据库中等候轮询'], True) + return + printer.info([f'{doc_id}的动态抽奖正在参与'], True) + await notifier.exec_task_awaitable(-1, DynRaffleHandlerTask, 1, dyn_raffle_status, + delay_range=(0, 30)) + dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id) + printer.info([f'{doc_id}的动态抽奖参与完毕'], True) + + async def check_raffle(self): if self.init_docid is None: - init_docid = await self.get_latest_docid() + init_docid = dyn_raffle_sql.init_docid() # 1.数据库查询 if init_docid < 0: - init_docid = await self.fetch_latest_docid() - 1000 - 1 # 最后保险,必须是有效doc_id + doc_id = await notifier.exec_func(-1, DynRaffleHandlerTask.create_dyn) # 2.动态获取最新的id + await notifier.exec_func(-1, DynRaffleHandlerTask.del_dyn_by_docid, doc_id) + init_docid = doc_id - 1000 - 1 dyn_raffle_sql.insert_or_replace_other_able('init_docid', init_docid) self.init_docid = init_docid + 1 curr_docid = self.init_docid @@ -89,18 +94,57 @@ async def run(self): async def check_result(self): while True: - results = dyn_raffle_sql.select_bytime(utils.curr_time() + 900) # 延迟15min处理抽奖 + results = dyn_raffle_sql.select_rafflestatus(1, None, utils.curr_time() - 900) # 延迟15min处理抽奖 + results += dyn_raffle_sql.select_rafflestatus(-1, None, utils.curr_time() - 900) + printer.info(['正在查找已经结束的动态抽奖:', results], True) for dyn_raffle_status in results: dyn_raffle_results: Optional[DynRaffleResults] = await notifier.exec_func( -1, DynRaffleHandlerTask.fetch_dyn_raffle_results, dyn_raffle_status) print(dyn_raffle_status, dyn_raffle_results) - future = asyncio.Future() - notifier.exec_task(-1, DynRaffleHandlerTask, 2, dyn_raffle_status, dyn_raffle_results, future, delay_range=(0, 30)) - await future + + await notifier.exec_task_awaitable(-1, DynRaffleHandlerTask, 2, dyn_raffle_status, dyn_raffle_results, delay_range=(0, 30)) if dyn_raffle_results is not None: dyn_raffle_sql.insert_dynraffle_results_table(dyn_raffle_results) + dyn_raffle_sql.del_from_dynraffle_status_table(dyn_raffle_status.dyn_id) + + await asyncio.sleep(120) + + # TODO: + async def check_handle(self): + while True: + curr_time = utils.curr_time() + results = dyn_raffle_sql.select_rafflestatus(-1, curr_time + 300, curr_time + 1200)[:5] # 20分钟到5分钟 + printer.info(['正在查找需要参与的动态抽奖:', results], True) + for dyn_raffle_status in results: + print(dyn_raffle_status) + is_exist = await notifier.exec_func( + -1, DynRaffleHandlerTask.check, dyn_raffle_status.doc_id) + if not is_exist: + dyn_raffle_sql.del_from_dynraffle_status_table(dyn_raffle_status.dyn_id) + continue + printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖正在参与'], True) + await notifier.exec_task_awaitable(-1, DynRaffleHandlerTask, 1, dyn_raffle_status) + dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id) + printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖参与完毕'], True) + if not results: + await asyncio.sleep(60) + + async def run(self): + results = dyn_raffle_sql.select_rafflestatus(0) + for dyn_raffle_status in results: + print(dyn_raffle_status) + printer.info([f'正在暴力处理上次中断的{dyn_raffle_status.doc_id}的动态抽奖后续'], True) + dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id) - print('Dnone') - await asyncio.sleep(3600) + printer.info([f'欢迎使用动态抽奖'], True) + tasks = [] + task_check_raffle = asyncio.ensure_future(self.check_raffle()) + tasks.append(task_check_raffle) + task_check_result = asyncio.ensure_future(self.check_result()) + tasks.append(task_check_result) + if not self.should_join_immediately: + task_check_join = asyncio.ensure_future(self.check_handle()) + tasks.append(task_check_join) + await asyncio.wait(tasks) diff --git a/run.py b/run.py index 2aff397..17331af 100644 --- a/run.py +++ b/run.py @@ -90,13 +90,10 @@ async def get_printer_danmu(): notifier.exec_task(-2, BiliMainTask, 0, delay_range=(0, 5)) -dyn_raffle_moitor = DynRaffleMonitor() - other_tasks = [ raffle_handler.run(), - # SubstanceRaffleMonitor().run() - dyn_raffle_moitor.run(), - dyn_raffle_moitor.check_result() + # SubstanceRaffleMonitor().run(), + # DynRaffleMonitor(should_join_immediately=True).run(), ] loop.run_until_complete(asyncio.wait(other_tasks)) diff --git a/tasks/dyn_raffle_handler.py b/tasks/dyn_raffle_handler.py index 6a757f2..e2ce6a0 100644 --- a/tasks/dyn_raffle_handler.py +++ b/tasks/dyn_raffle_handler.py @@ -135,11 +135,11 @@ async def is_dyn_raffle(user, doc_id): return -1, None # 目前未发现其他code user.warn(f'互动抽奖初步查询 {json_rsp}') - return 3, None + return -1, None @staticmethod async def fetch_dyn_raffle_status( - user, doc_id: int, uid: int, post_time: int, describe: str) -> Optional[DynRaffleStatus]: + user, doc_id: int, uid: int, post_time: int, describe: str, handle_status: int) -> Optional[DynRaffleStatus]: json_rsp = await user.req_s(DynRaffleHandlerReq.fetch_dyn_raffle, user, doc_id) code = json_rsp['code'] if not code: @@ -167,6 +167,7 @@ async def fetch_dyn_raffle_status( lottery_time=lottery_time, at_num=at_num, feed_limit=feed_limit, + handle_status=handle_status, prize_cmt_1st=first_prize_cmt, prize_cmt_2nd=second_prize_cmt, prize_cmt_3rd=third_prize_cmt @@ -216,16 +217,6 @@ async def fetch_dyn_raffle_results( print(f'抽奖动态{dyn_raffle_status.doc_id}已经删除') return None - @staticmethod - async def check(user, dyn_raffle_status: DynRaffleStatus): - if not dyn_raffle_sql.is_raffleid_duplicate(dyn_raffle_status.dyn_id): - user.info([f'{dyn_raffle_status.doc_id}的动态抽奖通过重复性过滤'], True) - dyn_raffle_sql.insert_dynraffle_status_table(dyn_raffle_status) - max_sleeptime = max(min(35, dyn_raffle_status.lottery_time-utils.curr_time() - 10), 0) - return (1, (0, max_sleeptime), -2, dyn_raffle_status), - user.info([f'{dyn_raffle_status.doc_id}的动态抽奖未通过重复性过滤'], True) - return None - @staticmethod async def follow_raffle_organizer(user, uid): is_following, group_ids = await UtilsTask.check_follow(user, uid) @@ -246,8 +237,20 @@ async def unfollow_raffle_organizer(user, uid): if group_id in group_ids: await UtilsTask.unfollow(user, uid) + @staticmethod + async def check(user, doc_id: int): + # 确认dyn存在性 + json_rsp = await user.req_s(DynRaffleHandlerReq.fetch_dyn_raffle, user, doc_id) + code = json_rsp['code'] + if not code: + return True + user.info([f'{doc_id}的动态抽奖不存在'], True) + return False + @staticmethod async def join(user, dyn_raffle_status: DynRaffleStatus): + if dyn_raffle_status.lottery_time - utils.curr_time() < 15: + user.info([f'动态{dyn_raffle_status.dyn_id}马上或已经开奖,放弃参与'], True) async with user.repost_del_lock: if dyn_raffle_status.feed_limit: # 关注 await DynRaffleHandlerTask.follow_raffle_organizer(user, dyn_raffle_status.uid) @@ -270,7 +273,7 @@ async def join(user, dyn_raffle_status: DynRaffleStatus): return @staticmethod - async def notice(user, dyn_raffle_status: DynRaffleStatus, dyn_raffle_results: Optional[DynRaffleResults], all_done_future=None): + async def notice(user, dyn_raffle_status: DynRaffleStatus, dyn_raffle_results: Optional[DynRaffleResults]): int_user_uid = int(user.dict_bili['uid']) async with user.repost_del_lock: dyn_raffle_joined = dyn_raffle_sql.select_by_primary_key_from_dynraffle_joined_table( @@ -306,8 +309,3 @@ async def notice(user, dyn_raffle_status: DynRaffleStatus, dyn_raffle_results: O orig_dynid=dyn_raffle_joined.orig_dynid, following_uid=following_uid )) - - if dyn_raffle_sql.should_del_from_dynraffle_status_table(dyn_raffle_status.dyn_id): - dyn_raffle_sql.del_from_dynraffle_status_table(dyn_raffle_status.dyn_id) - if all_done_future is not None: - all_done_future.set_result(True)