Skip to content

Commit

Permalink
Fix mega download
Browse files Browse the repository at this point in the history
Signed-off-by: anasty17 <e.anastayyar@gmail.com>
  • Loading branch information
anasty17 committed Nov 6, 2023
1 parent 203934a commit 1d8ea0d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 36 deletions.
6 changes: 3 additions & 3 deletions bot/helper/ext_utils/bot_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ def bt_selection_buttons(id_):
async def initiate_help_messages():
mirror, yt, clone = await gather(
telegraph.create_page(
title="Mirror-Leech-Bot Drive Search", content=MIRROR_HELP_MESSAGE
title="Mirror-Leech Command Usage", content=MIRROR_HELP_MESSAGE
),
telegraph.create_page(
title="Mirror-Leech-Bot Drive Search", content=YT_HELP_MESSAGE
title="YTDLP Command Usage", content=YT_HELP_MESSAGE
),
telegraph.create_page(
title="Mirror-Leech-Bot Drive Search", content=CLONE_HELP_MESSAGE
title="Clone Command Usage", content=CLONE_HELP_MESSAGE
),
)
buttons = ButtonMaker()
Expand Down
67 changes: 34 additions & 33 deletions bot/helper/mirror_utils/download_utils/mega_download.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from secrets import token_urlsafe
from aiofiles.os import makedirs
from asyncio import Event
from threading import Event
from mega import MegaApi, MegaListener, MegaRequest, MegaTransfer, MegaError

from bot import (
Expand All @@ -10,10 +10,9 @@
task_dict,
non_queued_dl,
queue_dict_lock,
bot_loop,
)
from bot.helper.telegram_helper.message_utils import sendMessage, sendStatusMessage
from bot.helper.ext_utils.bot_utils import async_to_sync, sync_to_async
from bot.helper.ext_utils.bot_utils import sync_to_async
from bot.helper.ext_utils.links_utils import get_mega_link_type
from bot.helper.mirror_utils.status_utils.mega_download_status import MegaDownloadStatus
from bot.helper.mirror_utils.status_utils.queue_status import QueueStatus
Expand All @@ -31,6 +30,7 @@ def __init__(self, continue_event: Event, listener):
self.listener = listener
self.is_cancelled = False
self.error = None
self.completed = False
self.isFile = False
self._bytes_transferred = 0
self._speed = 0
Expand All @@ -49,7 +49,7 @@ def onRequestFinish(self, api, request, error):
if str(error).lower() != "no error":
self.error = error.copy()
LOGGER.error(f"Mega onRequestFinishError: {self.error}")
bot_loop.create_task(self.continue_event.set())
self.continue_event.set()
return
request_type = request.getType()
if request_type == MegaRequest.TYPE_LOGIN:
Expand All @@ -67,22 +67,19 @@ def onRequestFinish(self, api, request, error):
or self.node
and "cloud drive" not in self._name.lower()
):
bot_loop.create_task(self.continue_event.set())
self.continue_event.set()

def onRequestTemporaryError(self, api, request, error: MegaError):
LOGGER.error(f"Mega Request error in {error}")
if not self.is_cancelled:
self.is_cancelled = True
async_to_sync(
self.listener.onDownloadError, f"RequestTempError: {error.toString()}"
)
self.error = error.toString()
bot_loop.create_task(self.continue_event.set())
self.error = f"RequestTempError: {error.toString()}"
self.continue_event.set()

def onTransferUpdate(self, api: MegaApi, transfer: MegaTransfer):
if self.is_cancelled:
api.cancelTransfer(transfer, None)
bot_loop.create_task(self.continue_event.set())
self.continue_event.set()
return
self._speed = transfer.getSpeed()
self._bytes_transferred = transfer.getTransferredBytes()
Expand All @@ -92,8 +89,8 @@ def onTransferFinish(self, api: MegaApi, transfer: MegaTransfer, error):
if self.is_cancelled:
self.continue_event.set()
elif transfer.isFinished() and (transfer.isFolderTransfer() or self.isFile):
async_to_sync(self.listener.onDownloadComplete)
bot_loop.create_task(self.continue_event.set())
self.completed = True
self.continue_event.set()
except Exception as e:
LOGGER.error(e)

Expand All @@ -107,13 +104,10 @@ def onTransferTemporaryError(self, api, transfer, error):
# Don't break the transfer queue if transfer's in queued (1) or retrying (4) state [causes seg fault]
return

self.error = errStr
self.error = f"TransferTempError: {errStr} ({filen}"
if not self.is_cancelled:
self.is_cancelled = True
async_to_sync(
self.listener.onDownloadError, f"TransferTempError: {errStr} ({filen})"
)
bot_loop.create_task(self.continue_event.set())
self.continue_event.set()

async def cancel_task(self):
self.is_cancelled = True
Expand All @@ -124,10 +118,10 @@ class AsyncExecutor:
def __init__(self):
self.continue_event = Event()

async def do(self, function, args):
def do(self, function, args):
self.continue_event.clear()
await sync_to_async(function, *args)
await self.continue_event.wait()
function(*args)
self.continue_event.wait()


async def add_mega_download(listener, path):
Expand All @@ -142,31 +136,31 @@ async def add_mega_download(listener, path):
api.addListener(mega_listener)

if MEGA_EMAIL and MEGA_PASSWORD:
await executor.do(api.login, (MEGA_EMAIL, MEGA_PASSWORD))
await sync_to_async(executor.do, api.login, (MEGA_EMAIL, MEGA_PASSWORD))

if get_mega_link_type(listener.link) == "file":
await executor.do(api.getPublicNode, (listener.link,))
await sync_to_async(executor.do, api.getPublicNode, (listener.link,))
node = mega_listener.public_node
mega_listener.isFile = True
else:
folder_api = MegaApi(None, None, None, "mirror-leech-telegram-bot")
folder_api.addListener(mega_listener)
await executor.do(folder_api.loginToFolder, (listener.link,))
await sync_to_async(executor.do, folder_api.loginToFolder, (listener.link,))
node = await sync_to_async(folder_api.authorizeNode, mega_listener.node)
if mega_listener.error is not None:
await sendMessage(listener.message, str(mega_listener.error))
await executor.do(api.logout, ())
await sync_to_async(executor.do, api.logout, ())
if folder_api is not None:
await executor.do(folder_api.logout, ())
await sync_to_async(executor.do, folder_api.logout, ())
return

listener.name = listener.name or node.getName()
msg, button = await stop_duplicate_check(listener)
if msg:
await sendMessage(listener.message, msg, button)
await executor.do(api.logout, ())
await sync_to_async(executor.do, api.logout, ())
if folder_api is not None:
await executor.do(folder_api.logout, ())
await sync_to_async(executor.do, folder_api.logout, ())
return

gid = token_urlsafe(8)
Expand All @@ -183,9 +177,9 @@ async def add_mega_download(listener, path):
await event.wait()
async with task_dict_lock:
if listener.mid not in task_dict:
await executor.do(api.logout, ())
await sync_to_async(executor.do, api.logout, ())
if folder_api is not None:
await executor.do(folder_api.logout, ())
await sync_to_async(executor.do, folder_api.logout, ())
return
from_queue = True
LOGGER.info(f"Start Queued Download from Mega: {listener.name}")
Expand All @@ -206,7 +200,14 @@ async def add_mega_download(listener, path):
LOGGER.info(f"Download from Mega: {listener.name}")

await makedirs(path, exist_ok=True)
await executor.do(api.startDownload, (node, path, listener.name, None, False, None))
await executor.do(api.logout, ())
await sync_to_async(
executor.do, api.startDownload, (node, path, listener.name, None, False, None)
)
await sync_to_async(executor.do, api.logout, ())
if folder_api is not None:
await executor.do(folder_api.logout, ())
await sync_to_async(executor.do, folder_api.logout, ())

if mega_listener.completed:
await listener.onDownloadComplete()
elif (error := mega_listener.error) and mega_listener.is_cancelled:
await listener.onDownloadError(error)

0 comments on commit 1d8ea0d

Please sign in to comment.