Skip to content

Commit

Permalink
limit ffmpeg sample video creation threads and other minor changes
Browse files Browse the repository at this point in the history
Signed-off-by: anasty17 <e.anastayyar@gmail.com>
  • Loading branch information
anasty17 committed Nov 16, 2023
1 parent 6031aac commit da14a76
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 102 deletions.
1 change: 1 addition & 0 deletions bot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
queue_dict_lock = Lock()
qb_listener_lock = Lock()
cpu_eater_lock = Lock()
subprocess_lock = Lock()
status_dict = {}
task_dict = {}
rss_dict = {}
Expand Down
8 changes: 7 additions & 1 deletion bot/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from bot import (
bot,
user,
botStartTime,
LOGGER,
Interval,
Expand Down Expand Up @@ -120,6 +121,8 @@ async def restart(_, message):
proc1 = await create_subprocess_exec(
"pkill", "-9", "-f", "gunicorn|aria2c|qbittorrent-nox|ffmpeg|rclone"
)
if user:
await user.stop()
proc2 = await create_subprocess_exec("python3", "update.py")
await gather(proc1.wait(), proc2.wait())
async with aiopen(".restartmsg", "w") as f:
Expand Down Expand Up @@ -228,7 +231,10 @@ async def send_incompelete_task_message(cid, msg):

async def main():
await gather(
start_cleanup(), torrent_search.initiate_search_tools(), restart_notification(), initiate_help_messages()
start_cleanup(),
torrent_search.initiate_search_tools(),
restart_notification(),
initiate_help_messages(),
)
await sync_to_async(start_aria2_listener, wait=False)

Expand Down
32 changes: 15 additions & 17 deletions bot/helper/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
task_dict,
GLOBAL_EXTENSION_FILTER,
cpu_eater_lock,
subprocess_lock,
)
from bot.helper.telegram_helper.bot_commands import BotCommands
from bot.helper.ext_utils.bot_utils import new_task, sync_to_async
Expand Down Expand Up @@ -428,13 +429,10 @@ async def proceedExtract(self, dl_path, size, gid):
]
if not pswd:
del cmd[2]
if (
self.suproc == "cancelled"
or self.suproc is not None
and self.suproc.returncode == -9
):
return False
self.suproc = await create_subprocess_exec(*cmd)
async with subprocess_lock:
if self.suproc == "cancelled":
return False
self.suproc = await create_subprocess_exec(*cmd)
_, stderr = await self.suproc.communicate()
code = self.suproc.returncode
if code == -9:
Expand Down Expand Up @@ -473,9 +471,10 @@ async def proceedExtract(self, dl_path, size, gid):
]
if not pswd:
del cmd[2]
if self.suproc == "cancelled":
return False
self.suproc = await create_subprocess_exec(*cmd)
async with subprocess_lock:
if self.suproc == "cancelled":
return False
self.suproc = await create_subprocess_exec(*cmd)
_, stderr = await self.suproc.communicate()
code = self.suproc.returncode
if code == -9:
Expand Down Expand Up @@ -538,9 +537,10 @@ async def proceedCompress(self, dl_path, size, gid):
if not pswd:
del cmd[3]
LOGGER.info(f"Zip: orig_path: {dl_path}, zip_path: {up_path}")
if self.suproc == "cancelled":
return False
self.suproc = await create_subprocess_exec(*cmd)
async with subprocess_lock:
if self.suproc == "cancelled":
return False
self.suproc = await create_subprocess_exec(*cmd)
_, stderr = await self.suproc.communicate()
code = self.suproc.returncode
if code == -9:
Expand All @@ -567,7 +567,7 @@ async def proceedSplit(self, up_dir, m_size, o_files, size, gid):
task_dict[self.mid] = SplitStatus(self, size, gid)
LOGGER.info(f"Splitting: {self.name}")
res = await split_file(
f_path, f_size, file_, dirpath, self.splitSize, self
f_path, f_size, dirpath, self.splitSize, self
)
if not res:
return False
Expand Down Expand Up @@ -612,9 +612,7 @@ async def generateSampleVideo(self, dl_path, size, gid):
)
return res
else:
for dirpath, _, files in await sync_to_async(
walk, dl_path, topdown=False
):
for dirpath, _, files in await sync_to_async(walk, dl_path, topdown=False):
for file_ in files:
f_path = ospath.join(dirpath, file_)
if (await get_document_type(f_path))[0]:
Expand Down
101 changes: 42 additions & 59 deletions bot/helper/ext_utils/media_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from os import path as ospath
from aiofiles.os import remove as aioremove, path as aiopath, mkdir
from os import path as ospath, cpu_count
from aiofiles.os import remove as aioremove, path as aiopath, makedirs
from time import time
from re import search as re_search
from asyncio import create_subprocess_exec, gather
from asyncio.subprocess import PIPE
from PIL import Image
from aioshutil import move

from bot import LOGGER
from bot import LOGGER, subprocess_lock
from bot.helper.ext_utils.bot_utils import cmd_exec
from bot.helper.ext_utils.bot_utils import sync_to_async
from bot.helper.ext_utils.files_utils import ARCH_EXT, get_mime_type
Expand All @@ -30,8 +30,7 @@ async def createThumb(msg, _id=""):
if not _id:
_id = msg.id
path = "Thumbnails/"
if not await aiopath.isdir(path):
await mkdir(path)
await makedirs(path, exist_ok=True)
photo_dir = await msg.download()
des_dir = f"{path}{_id}.jpg"
await sync_to_async(Image.open(photo_dir).convert("RGB").save, des_dir, "JPEG")
Expand Down Expand Up @@ -154,21 +153,20 @@ async def take_ss(video_file, ss_nb) -> list:
duration = (await get_media_info(video_file))[0]
if duration != 0:
dirpath, name = video_file.rsplit("/", 1)
name = name.rsplit(".", 1)[0]
name, _ = ospath.splitext(name)
dirpath = f"{dirpath}/screenshots/"
if not await aiopath.exists(dirpath):
await mkdir(dirpath)
await makedirs(dirpath, exist_ok=True)
interval = duration // ss_nb + 1
cap_time = interval
outputs = []
cmd = ''
cmd = ""
for i in range(ss_nb):
output = f"{dirpath}SS.{name}_{i:02}.png"
outputs.append(output)
cmd += f'ffmpeg -hide_banner -loglevel error -ss {cap_time} -i "{video_file}" -q:v 1 -frames:v 1 "{output}"'
cap_time += interval
if i + 1 != ss_nb:
cmd += ' && '
cmd += " && "
_, err, code = await cmd_exec(cmd, True)
if code != 0:
LOGGER.error(
Expand All @@ -183,8 +181,7 @@ async def take_ss(video_file, ss_nb) -> list:

async def get_audio_thumb(audio_file):
des_dir = "Thumbnails/"
if not await aiopath.exists(des_dir):
await mkdir(des_dir)
await makedirs(des_dir, exist_ok=True)
des_dir = f"Thumbnails/{time()}.jpg"
cmd = [
"ffmpeg",
Expand All @@ -209,8 +206,7 @@ async def get_audio_thumb(audio_file):

async def create_thumbnail(video_file, duration):
des_dir = "Thumbnails"
if not await aiopath.exists(des_dir):
await mkdir(des_dir)
await makedirs(des_dir, exist_ok=True)
des_dir = ospath.join(des_dir, f"{time()}.jpg")
if duration is None:
duration = (await get_media_info(video_file))[0]
Expand Down Expand Up @@ -244,7 +240,6 @@ async def create_thumbnail(video_file, duration):
async def split_file(
path,
size,
file_,
dirpath,
split_size,
listener,
Expand All @@ -253,28 +248,20 @@ async def split_file(
inLoop=False,
multi_streams=True,
):
if (
listener.suproc == "cancelled"
or listener.suproc is not None
and listener.suproc.returncode == -9
):
return False
if listener.seed and not listener.newDir:
dirpath = f"{dirpath}/splited_files_mltb"
if not await aiopath.exists(dirpath):
await mkdir(dirpath)
await makedirs(dirpath, exist_ok=True)
parts = -(-size // listener.splitSize)
if listener.equalSplits and not inLoop:
split_size = (size // parts) + (size % parts)
if not listener.as_doc and (await get_document_type(path))[0]:
if multi_streams:
multi_streams = await is_multi_streams(path)
duration = (await get_media_info(path))[0]
base_name, extension = ospath.splitext(file_)
base_name, extension = ospath.splitext(path)
split_size -= 5000000
while i <= parts or start_time < duration - 4:
parted_name = f"{base_name}.part{i:03}{extension}"
out_path = ospath.join(dirpath, parted_name)
out_path = f"{base_name}.part{i:03}{extension}"
cmd = [
"ffmpeg",
"-hide_banner",
Expand All @@ -301,13 +288,10 @@ async def split_file(
if not multi_streams:
del cmd[10]
del cmd[10]
if (
listener.suproc == "cancelled"
or listener.suproc is not None
and listener.suproc.returncode == -9
):
return False
listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE)
async with subprocess_lock:
if listener.suproc == "cancelled":
return False
listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE)
_, stderr = await listener.suproc.communicate()
code = listener.suproc.returncode
if code == -9:
Expand All @@ -325,7 +309,6 @@ async def split_file(
return await split_file(
path,
size,
file_,
dirpath,
split_size,
listener,
Expand All @@ -347,7 +330,6 @@ async def split_file(
return await split_file(
path,
size,
file_,
dirpath,
split_size,
listener,
Expand All @@ -373,16 +355,19 @@ async def split_file(
start_time += lpd - 3
i += 1
else:
out_path = ospath.join(dirpath, f"{file_}.")
listener.suproc = await create_subprocess_exec(
"split",
"--numeric-suffixes=1",
"--suffix-length=3",
f"--bytes={split_size}",
path,
out_path,
stderr=PIPE,
)
out_path = f"{path}."
async with subprocess_lock:
if listener.suproc == "cancelled":
return False
listener.suproc = await create_subprocess_exec(
"split",
"--numeric-suffixes=1",
"--suffix-length=3",
f"--bytes={split_size}",
path,
out_path,
stderr=PIPE,
)
_, stderr = await listener.suproc.communicate()
code = listener.suproc.returncode
if code == -9:
Expand All @@ -394,13 +379,13 @@ async def split_file(


async def createSampleVideo(
listener, input_file, sample_duration, part_duration, oneFile=False
listener, video_file, sample_duration, part_duration, oneFile=False
):
filter_complex = ""
dir, name = input_file.rsplit("/", 1)
dir, name = video_file.rsplit("/", 1)
output_file = f"{dir}/SAMPLE.{name}"
segments = [(0, part_duration)]
duration = (await get_media_info(input_file))[0]
duration = (await get_media_info(video_file))[0]
remaining_duration = duration - (part_duration * 2)
parts = (sample_duration - (part_duration * 2)) // part_duration
time_interval = remaining_duration // parts
Expand All @@ -426,7 +411,7 @@ async def createSampleVideo(
cmd = [
"ffmpeg",
"-i",
input_file,
video_file,
"-filter_complex",
filter_complex,
"-map",
Expand All @@ -437,14 +422,12 @@ async def createSampleVideo(
"libx264",
"-c:a",
"aac",
"-threads",
f"{cpu_count()//2}",
output_file,
]

if (
listener.suproc == "cancelled"
or listener.suproc is not None
and listener.suproc.returncode == -9
):
if listener.suproc == "cancelled":
return False
listener.suproc = await create_subprocess_exec(*cmd, stderr=PIPE)
_, stderr = await listener.suproc.communicate()
Expand All @@ -454,15 +437,15 @@ async def createSampleVideo(
elif code != 0:
stderr = stderr.decode().strip()
LOGGER.error(
f"{stderr}. Something went wrong while creating sample video, mostly file is corrupted. Path: {input_file}"
f"{stderr}. Something went wrong while creating sample video, mostly file is corrupted. Path: {video_file}"
)
return input_file
return video_file
else:
if oneFile:
newDir = input_file.rsplit(".", 1)[0]
await mkdir(newDir)
newDir, _ = ospath.splitext(video_file)
await makedirs(newDir, exist_ok=True)
await gather(
move(input_file, f"{newDir}/{name}"),
move(video_file, f"{newDir}/{name}"),
move(output_file, f"{newDir}/SAMPLE.{name}"),
)
return newDir
Expand Down
7 changes: 3 additions & 4 deletions bot/helper/mirror_utils/rclone_utils/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from asyncio.subprocess import PIPE
from re import findall as re_findall
from json import loads
from aiofiles.os import path as aiopath, mkdir, listdir
from aiofiles.os import path as aiopath, makedirs, listdir
from aiofiles import open as aiopen
from configparser import ConfigParser
from random import randrange
Expand Down Expand Up @@ -95,10 +95,9 @@ def _switchServiceAccount(self):
async def _create_rc_sa(self, remote, remote_opts):
sa_conf_dir = "rclone_sa"
sa_conf_file = f"{sa_conf_dir}/{remote}.conf"
if not await aiopath.isdir(sa_conf_dir):
await mkdir(sa_conf_dir)
elif await aiopath.isfile(sa_conf_file):
if await aiopath.isfile(sa_conf_file):
return sa_conf_file
await makedirs(sa_conf_dir, exist_ok=True)

if gd_id := remote_opts.get("team_drive"):
option = "team_drive"
Expand Down
14 changes: 9 additions & 5 deletions bot/helper/mirror_utils/status_utils/extract_status.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from time import time

from bot import LOGGER
from bot import LOGGER, subprocess_lock
from bot.helper.ext_utils.status_utils import (
get_readable_file_size,
MirrorStatus,
Expand Down Expand Up @@ -65,8 +65,12 @@ def task(self):

async def cancel_task(self):
LOGGER.info(f"Cancelling Extract: {self.listener.name}")
if self.listener.suproc is not None:
self.listener.suproc.kill()
else:
self.listener.suproc = "cancelled"
async with subprocess_lock:
if (
self.listener.suproc is not None
and self.listener.suproc.returncode is None
):
self.listener.suproc.kill()
else:
self.listener.suproc = "cancelled"
await self.listener.onUploadError("extracting stopped by user!")
Loading

0 comments on commit da14a76

Please sign in to comment.