Skip to content

Commit

Permalink
支持在可以选择开奖前几分钟才抽奖(20分钟);加入参与的状态,防止参与与删除凑到一起了
Browse files Browse the repository at this point in the history
默认关闭;先前此版本的动态请先运行add_colomn_joined2old_staus_table.py
  • Loading branch information
yjqiang committed Mar 16, 2019
1 parent 45540fd commit 93fb545
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 60 deletions.
53 changes: 53 additions & 0 deletions dyn/add_colomn_joined2old_staus_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""这是为了把第一代数据库迁移到新的数据。
请更新本次commit后,在确定data.db的存在时,首先运行一次此文件。
可以重复运行,不会出错但是没必要反复运行。
"""
import sys
import sqlite3
from os import path


path_db = f'{path.dirname(path.realpath(__file__))}/data.db'
if not path.isfile(path_db):
print('未找到数据库,请仔细查看本代码开头注释内容', file=sys.stderr)
sys.exit(-1)

conn = sqlite3.connect(path_db)
colomns = conn.execute('PRAGMA table_info("dynraffle_status")').fetchall()
for colomn in colomns:
if colomn[1] == 'handle_status':
print('您已经是最新数据库无需迁移')
break
else:
print('检测到老数据库,正在准备迁移,请稍后')
with conn:
sql_rename_old_table = 'ALTER TABLE dynraffle_status RENAME TO tmp_dynraffle_status'
sql_create_new_table = (
'CREATE TABLE dynraffle_status ('
'dyn_id TEXT NOT NULL,'
'doc_id TEXT NOT NULL UNIQUE,'
'describe TEXT NOT NULL,'
'uid TEXT NOT NULL,'
'post_time INTEGER NOT NULL,' # 时间这里很简单就能比较
'lottery_time INTEGER NOT NULL, '

'at_num INTEGER NOT NULL,'
'feed_limit INTEGER NOT NULL,' # 0/1 表示bool型
'handle_status INTEGER NOT NULL,'

'prize_cmt_1st TEXT NOT NULL,'
'prize_cmt_2nd TEXT,'
'prize_cmt_3rd TEXT,'
'PRIMARY KEY (dyn_id)'
'); '
)
sql_move = 'INSERT INTO dynraffle_status ' \
'SELECT dyn_id, doc_id, describe, uid, post_time, lottery_time, at_num, feed_limit, ?, prize_cmt_1st, prize_cmt_2nd, prize_cmt_3rd FROM tmp_dynraffle_status'
sql_drop_old_table = 'DROP TABLE tmp_dynraffle_status'
conn.execute(sql_rename_old_table)
conn.execute(sql_create_new_table)
conn.execute(sql_move, (1,))
conn.execute(sql_drop_old_table)

conn.close()
print('DONE')
4 changes: 3 additions & 1 deletion dyn/bili_data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class DynRaffleStatus:
# 参与抽奖使用
at_num = attr.ib(converter=int)
feed_limit = attr.ib(converter=bool)
handle_status = attr.ib(validator=attr.validators.in_([-1, 0, 1])) # -1 表示未参与,0表示正在参与, 1表示已经参与

# 一些其他信息
prize_cmt_1st = attr.ib(validator=attr.validators.instance_of(str)) # 奖品描述这里必须str,下同,且不提供type转换
Expand All @@ -33,14 +34,15 @@ def as_sql_values(self):

at_num = self.at_num
feed_limit = int(self.feed_limit)
handle_status = self.handle_status

prize_cmt_1st = self.prize_cmt_1st[:20]
prize_cmt_2nd = self.prize_cmt_2nd[:20]
prize_cmt_3rd = self.prize_cmt_3rd[:20]

return \
dyn_id, doc_id, describe, uid, post_time, lottery_time,\
at_num, feed_limit, prize_cmt_1st, prize_cmt_2nd, prize_cmt_3rd
at_num, feed_limit, handle_status, prize_cmt_1st, prize_cmt_2nd, prize_cmt_3rd


@attr.s(frozen=True)
Expand Down
38 changes: 33 additions & 5 deletions dyn/dyn_raffle_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self):

'at_num INTEGER NOT NULL,'
'feed_limit INTEGER NOT NULL,' # 0/1 表示bool型
'handle_status INTEGER NOT NULL,'

'prize_cmt_1st TEXT NOT NULL,'
'prize_cmt_2nd TEXT,'
Expand All @@ -60,7 +61,7 @@ def as_bili_data(self, row):
def insert_element(self, dyn_raffle_status: DynRaffleStatus):
# ?,?,?这种可以对应type,否则很难折腾
with self.conn:
self.conn.execute('INSERT INTO dynraffle_status VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
self.conn.execute('INSERT INTO dynraffle_status VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
dyn_raffle_status.as_sql_values())

def select_all(self):
Expand All @@ -80,9 +81,25 @@ def del_by_primary_key(self, dyn_id):
with self.conn:
self.conn.execute('DELETE FROM dynraffle_status WHERE dyn_id=?', (str(dyn_id),))

def select_bytime(self, curr_time):
def select(self, handle_status, lottery_time_l, lottery_time_r):
if handle_status is None:
return []

results = []
for row in self.conn.execute(f'SELECT * FROM dynraffle_status WHERE lottery_time < ?', (int(curr_time),)):
if lottery_time_l is None and lottery_time_r is not None:
sql = 'SELECT * FROM dynraffle_status WHERE lottery_time <= ? AND handle_status = ?'
parameters = (int(lottery_time_r), int(handle_status))
elif lottery_time_l is not None and lottery_time_r is None:
sql = 'SELECT * FROM dynraffle_status WHERE lottery_time >= ? AND handle_status = ?'
parameters = (int(lottery_time_l), int(handle_status))
elif lottery_time_l is not None and lottery_time_r is not None:
sql = 'SELECT * FROM dynraffle_status WHERE (lottery_time BETWEEN ? AND ?) AND (handle_status = ?)'
parameters = (int(lottery_time_l), int(lottery_time_r), int(handle_status))
else:
sql = 'SELECT * FROM dynraffle_status WHERE handle_status = ?'
parameters = (int(handle_status),)

for row in self.conn.execute(sql, parameters):
results.append(self.as_bili_data(row))
return results

Expand Down Expand Up @@ -298,8 +315,19 @@ def should_del_from_dynraffle_status_table(orig_dynid):
return not cursor.fetchone()


def select_bytime(curr_time):
return dynraffle_status_table.select_bytime(curr_time)
def can_rafflestatus_be_handled(dyn_id):
cursor = conn.execute(
'SELECT 1 FROM dynraffle_status WHERE dyn_id = ? AND handle_status = ?', (str(dyn_id), 0))
return bool(cursor.fetchone())


def set_rafflestatus_handle_status(handle_status: int, dyn_id):
with conn:
conn.execute('UPDATE dynraffle_status SET handle_status = ? WHERE dyn_id = ?', (handle_status, str(dyn_id)))


def select_rafflestatus(handle_status, lottery_time_l=None, lottery_time_r=None):
return dynraffle_status_table.select(handle_status, lottery_time_l, lottery_time_r)


# 从三个表里查最新数据
Expand Down
106 changes: 75 additions & 31 deletions dyn/monitor_dyn_raffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,21 @@


class DynRaffleMonitor:
def __init__(self, dyn_raffle_description_filter=None, dyn_prize_cmt_filter=None, init_docid=None):
def __init__(self, should_join_immediately: bool, init_docid=None):
self.init_docid = init_docid
if dyn_raffle_description_filter is None:
self.dyn_raffle_description_filter = []
else:
self.dyn_raffle_description_filter = dyn_raffle_description_filter
if dyn_prize_cmt_filter is None:
self.dyn_prize_cmt_filter = []
else:
self.dyn_prize_cmt_filter = dyn_prize_cmt_filter

async def fetch_latest_docid(self):
doc_id = await notifier.exec_func(-1, DynRaffleHandlerTask.create_dyn)
await notifier.exec_func(-1, DynRaffleHandlerTask.del_dyn_by_docid, doc_id)
return doc_id

# 打算设置两步保险,一个是status表中最新doc_id,一个是开一个专门的表专门去定时更新doc_id
async def get_latest_docid(self) -> int:
return dyn_raffle_sql.init_docid()

self.dyn_raffle_description_filter = []
self.dyn_prize_cmt_filter = []

self.should_join_immediately = should_join_immediately
self._init_handle_status = -1 if not self.should_join_immediately else 0

# 获取dyn_raffle抽奖更多信息并且进行过滤
async def dig_and_filter(self, doc_id: int, uid: int, post_time: int, describe: str):
dyn_raffle_status: DynRaffleStatus = await notifier.exec_func(
-1, DynRaffleHandlerTask.fetch_dyn_raffle_status,
doc_id, uid, post_time, describe)
if dyn_raffle_status.lottery_time <= utils.curr_time() + 60:
doc_id, uid, post_time, describe, self._init_handle_status)
if dyn_raffle_status.lottery_time <= utils.curr_time() + 180:
printer.info([f'{doc_id}的动态抽奖已经开奖或马上开奖,不再参与'], True)
return
for key_word in self.dyn_raffle_description_filter:
Expand All @@ -51,14 +40,30 @@ async def dig_and_filter(self, doc_id: int, uid: int, post_time: int, describe:
if dyn_raffle_status.post_time >= utils.curr_time() - 150:
printer.info([f'{doc_id}的动态抽奖触发时间约束,休眠150秒后再正式参与'], True)
await asyncio.sleep(150)
printer.info([f'{doc_id}的动态抽奖通过时间和关键词过滤'], True)
notifier.exec_task(-1, DynRaffleHandlerTask, 0, dyn_raffle_status, delay_range=(0, 0))

async def run(self):
if dyn_raffle_sql.is_raffleid_duplicate(dyn_raffle_status.dyn_id):
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖触发重复性过滤'], True)
return
dyn_raffle_sql.insert_dynraffle_status_table(dyn_raffle_status)

printer.info([f'{doc_id}的动态抽奖通过过滤与验证,正式处理'], True)

if dyn_raffle_status.handle_status == -1:
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖暂不参与,仅记录数据库中等候轮询'], True)
return
printer.info([f'{doc_id}的动态抽奖正在参与'], True)
await notifier.exec_task_awaitable(-1, DynRaffleHandlerTask, 1, dyn_raffle_status,
delay_range=(0, 30))
dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id)
printer.info([f'{doc_id}的动态抽奖参与完毕'], True)

async def check_raffle(self):
if self.init_docid is None:
init_docid = await self.get_latest_docid()
init_docid = dyn_raffle_sql.init_docid() # 1.数据库查询
if init_docid < 0:
init_docid = await self.fetch_latest_docid() - 1000 - 1 # 最后保险,必须是有效doc_id
doc_id = await notifier.exec_func(-1, DynRaffleHandlerTask.create_dyn) # 2.动态获取最新的id
await notifier.exec_func(-1, DynRaffleHandlerTask.del_dyn_by_docid, doc_id)
init_docid = doc_id - 1000 - 1
dyn_raffle_sql.insert_or_replace_other_able('init_docid', init_docid)
self.init_docid = init_docid + 1
curr_docid = self.init_docid
Expand Down Expand Up @@ -89,18 +94,57 @@ async def run(self):

async def check_result(self):
while True:
results = dyn_raffle_sql.select_bytime(utils.curr_time() + 900) # 延迟15min处理抽奖
results = dyn_raffle_sql.select_rafflestatus(1, None, utils.curr_time() - 900) # 延迟15min处理抽奖
results += dyn_raffle_sql.select_rafflestatus(-1, None, utils.curr_time() - 900)
printer.info(['正在查找已经结束的动态抽奖:', results], True)
for dyn_raffle_status in results:

dyn_raffle_results: Optional[DynRaffleResults] = await notifier.exec_func(
-1, DynRaffleHandlerTask.fetch_dyn_raffle_results,
dyn_raffle_status)
print(dyn_raffle_status, dyn_raffle_results)
future = asyncio.Future()
notifier.exec_task(-1, DynRaffleHandlerTask, 2, dyn_raffle_status, dyn_raffle_results, future, delay_range=(0, 30))
await future

await notifier.exec_task_awaitable(-1, DynRaffleHandlerTask, 2, dyn_raffle_status, dyn_raffle_results, delay_range=(0, 30))
if dyn_raffle_results is not None:
dyn_raffle_sql.insert_dynraffle_results_table(dyn_raffle_results)
dyn_raffle_sql.del_from_dynraffle_status_table(dyn_raffle_status.dyn_id)

await asyncio.sleep(120)

# TODO:
async def check_handle(self):
while True:
curr_time = utils.curr_time()
results = dyn_raffle_sql.select_rafflestatus(-1, curr_time + 300, curr_time + 1200)[:5] # 20分钟到5分钟
printer.info(['正在查找需要参与的动态抽奖:', results], True)
for dyn_raffle_status in results:
print(dyn_raffle_status)
is_exist = await notifier.exec_func(
-1, DynRaffleHandlerTask.check, dyn_raffle_status.doc_id)
if not is_exist:
dyn_raffle_sql.del_from_dynraffle_status_table(dyn_raffle_status.dyn_id)
continue
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖正在参与'], True)
await notifier.exec_task_awaitable(-1, DynRaffleHandlerTask, 1, dyn_raffle_status)
dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id)
printer.info([f'{dyn_raffle_status.doc_id}的动态抽奖参与完毕'], True)
if not results:
await asyncio.sleep(60)

async def run(self):
results = dyn_raffle_sql.select_rafflestatus(0)
for dyn_raffle_status in results:
print(dyn_raffle_status)
printer.info([f'正在暴力处理上次中断的{dyn_raffle_status.doc_id}的动态抽奖后续'], True)
dyn_raffle_sql.set_rafflestatus_handle_status(1, dyn_raffle_status.dyn_id)

print('Dnone')
await asyncio.sleep(3600)
printer.info([f'欢迎使用动态抽奖'], True)
tasks = []
task_check_raffle = asyncio.ensure_future(self.check_raffle())
tasks.append(task_check_raffle)
task_check_result = asyncio.ensure_future(self.check_result())
tasks.append(task_check_result)
if not self.should_join_immediately:
task_check_join = asyncio.ensure_future(self.check_handle())
tasks.append(task_check_join)
await asyncio.wait(tasks)
7 changes: 2 additions & 5 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,10 @@ async def get_printer_danmu():
notifier.exec_task(-2, BiliMainTask, 0, delay_range=(0, 5))


dyn_raffle_moitor = DynRaffleMonitor()

other_tasks = [
raffle_handler.run(),
# SubstanceRaffleMonitor().run()
dyn_raffle_moitor.run(),
dyn_raffle_moitor.check_result()
# SubstanceRaffleMonitor().run(),
# DynRaffleMonitor(should_join_immediately=True).run(),
]

loop.run_until_complete(asyncio.wait(other_tasks))
Expand Down
34 changes: 16 additions & 18 deletions tasks/dyn_raffle_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ async def is_dyn_raffle(user, doc_id):
return -1, None
# 目前未发现其他code
user.warn(f'互动抽奖初步查询 {json_rsp}')
return 3, None
return -1, None

@staticmethod
async def fetch_dyn_raffle_status(
user, doc_id: int, uid: int, post_time: int, describe: str) -> Optional[DynRaffleStatus]:
user, doc_id: int, uid: int, post_time: int, describe: str, handle_status: int) -> Optional[DynRaffleStatus]:
json_rsp = await user.req_s(DynRaffleHandlerReq.fetch_dyn_raffle, user, doc_id)
code = json_rsp['code']
if not code:
Expand Down Expand Up @@ -167,6 +167,7 @@ async def fetch_dyn_raffle_status(
lottery_time=lottery_time,
at_num=at_num,
feed_limit=feed_limit,
handle_status=handle_status,
prize_cmt_1st=first_prize_cmt,
prize_cmt_2nd=second_prize_cmt,
prize_cmt_3rd=third_prize_cmt
Expand Down Expand Up @@ -216,16 +217,6 @@ async def fetch_dyn_raffle_results(
print(f'抽奖动态{dyn_raffle_status.doc_id}已经删除')
return None

@staticmethod
async def check(user, dyn_raffle_status: DynRaffleStatus):
if not dyn_raffle_sql.is_raffleid_duplicate(dyn_raffle_status.dyn_id):
user.info([f'{dyn_raffle_status.doc_id}的动态抽奖通过重复性过滤'], True)
dyn_raffle_sql.insert_dynraffle_status_table(dyn_raffle_status)
max_sleeptime = max(min(35, dyn_raffle_status.lottery_time-utils.curr_time() - 10), 0)
return (1, (0, max_sleeptime), -2, dyn_raffle_status),
user.info([f'{dyn_raffle_status.doc_id}的动态抽奖未通过重复性过滤'], True)
return None

@staticmethod
async def follow_raffle_organizer(user, uid):
is_following, group_ids = await UtilsTask.check_follow(user, uid)
Expand All @@ -246,8 +237,20 @@ async def unfollow_raffle_organizer(user, uid):
if group_id in group_ids:
await UtilsTask.unfollow(user, uid)

@staticmethod
async def check(user, doc_id: int):
# 确认dyn存在性
json_rsp = await user.req_s(DynRaffleHandlerReq.fetch_dyn_raffle, user, doc_id)
code = json_rsp['code']
if not code:
return True
user.info([f'{doc_id}的动态抽奖不存在'], True)
return False

@staticmethod
async def join(user, dyn_raffle_status: DynRaffleStatus):
if dyn_raffle_status.lottery_time - utils.curr_time() < 15:
user.info([f'动态{dyn_raffle_status.dyn_id}马上或已经开奖,放弃参与'], True)
async with user.repost_del_lock:
if dyn_raffle_status.feed_limit: # 关注
await DynRaffleHandlerTask.follow_raffle_organizer(user, dyn_raffle_status.uid)
Expand All @@ -270,7 +273,7 @@ async def join(user, dyn_raffle_status: DynRaffleStatus):
return

@staticmethod
async def notice(user, dyn_raffle_status: DynRaffleStatus, dyn_raffle_results: Optional[DynRaffleResults], all_done_future=None):
async def notice(user, dyn_raffle_status: DynRaffleStatus, dyn_raffle_results: Optional[DynRaffleResults]):
int_user_uid = int(user.dict_bili['uid'])
async with user.repost_del_lock:
dyn_raffle_joined = dyn_raffle_sql.select_by_primary_key_from_dynraffle_joined_table(
Expand Down Expand Up @@ -306,8 +309,3 @@ async def notice(user, dyn_raffle_status: DynRaffleStatus, dyn_raffle_results: O
orig_dynid=dyn_raffle_joined.orig_dynid,
following_uid=following_uid
))

if dyn_raffle_sql.should_del_from_dynraffle_status_table(dyn_raffle_status.dyn_id):
dyn_raffle_sql.del_from_dynraffle_status_table(dyn_raffle_status.dyn_id)
if all_done_future is not None:
all_done_future.set_result(True)

0 comments on commit 93fb545

Please sign in to comment.