From 1bff124f2ec279032e2093d68b62057ab4459087 Mon Sep 17 00:00:00 2001 From: Christoph Wurst Date: Fri, 3 Jul 2020 13:43:46 +0200 Subject: [PATCH 1/2] Build threads after each IMAP sync Signed-off-by: Christoph Wurst --- lib/AppInfo/Application.php | 3 + lib/Db/MessageMapper.php | 66 ++++++++++- lib/Events/SynchronizationEvent.php | 45 ++++++++ lib/IMAP/Threading/DatabaseMessage.php | 91 +++++++++++++++ ...countSynchronizedThreadUpdaterListener.php | 106 ++++++++++++++++++ lib/Service/Sync/ImapToDbSynchronizer.php | 22 +++- 6 files changed, 327 insertions(+), 6 deletions(-) create mode 100644 lib/Events/SynchronizationEvent.php create mode 100644 lib/IMAP/Threading/DatabaseMessage.php create mode 100644 lib/Listener/AccountSynchronizedThreadUpdaterListener.php diff --git a/lib/AppInfo/Application.php b/lib/AppInfo/Application.php index c64d6130b9..aeb49f13c4 100644 --- a/lib/AppInfo/Application.php +++ b/lib/AppInfo/Application.php @@ -32,6 +32,7 @@ use OCA\Mail\Contracts\IUserPreferences; use OCA\Mail\Events\BeforeMessageDeletedEvent; use OCA\Mail\Events\DraftSavedEvent; +use OCA\Mail\Events\SynchronizationEvent; use OCA\Mail\Events\MessageDeletedEvent; use OCA\Mail\Events\MessageFlaggedEvent; use OCA\Mail\Events\MessageSentEvent; @@ -44,6 +45,7 @@ use OCA\Mail\Listener\DraftMailboxCreatorListener; use OCA\Mail\Listener\FlagRepliedMessageListener; use OCA\Mail\Listener\InteractionListener; +use OCA\Mail\Listener\AccountSynchronizedThreadUpdaterListener; use OCA\Mail\Listener\MessageCacheUpdaterListener; use OCA\Mail\Listener\NewMessageClassificationListener; use OCA\Mail\Listener\SaveSentMessageListener; @@ -111,6 +113,7 @@ private function registerEvents(IAppContainer $container): void { $dispatcher->addServiceListener(MessageSentEvent::class, SaveSentMessageListener::class); $dispatcher->addServiceListener(NewMessagesSynchronized::class, NewMessageClassificationListener::class); $dispatcher->addServiceListener(SaveDraftEvent::class, DraftMailboxCreatorListener::class); + $dispatcher->addServiceListener(SynchronizationEvent::class, AccountSynchronizedThreadUpdaterListener::class); $dispatcher->addServiceListener(UserDeletedEvent::class, UserDeletedListener::class); } } diff --git a/lib/Db/MessageMapper.php b/lib/Db/MessageMapper.php index 1c1b57407a..a6e3438fbf 100644 --- a/lib/Db/MessageMapper.php +++ b/lib/Db/MessageMapper.php @@ -26,8 +26,10 @@ namespace OCA\Mail\Db; use Horde_Imap_Client; +use OCA\Mail\Account; use OCA\Mail\Address; use OCA\Mail\AddressList; +use OCA\Mail\IMAP\Threading\DatabaseMessage; use OCA\Mail\Service\Search\SearchQuery; use OCP\AppFramework\Db\QBMapper; use OCP\AppFramework\Utility\ITimeFactory; @@ -86,6 +88,66 @@ public function findAllUids(Mailbox $mailbox): array { return $this->findUids($query); } + /** + * @param Account $account + * + * @return DatabaseMessage[] + */ + public function findThreadingData(Account $account): array { + $mailboxesQuery = $this->db->getQueryBuilder(); + $messagesQuery = $this->db->getQueryBuilder(); + + $mailboxesQuery->select('id') + ->from('mail_mailboxes') + ->where($mailboxesQuery->expr()->eq('account_id', $messagesQuery->createNamedParameter($account->getId(), IQueryBuilder::PARAM_INT), IQueryBuilder::PARAM_INT)); + $messagesQuery->select('id', 'subject', 'message_id', 'in_reply_to', 'references', 'thread_root_id') + ->from($this->getTableName()) + ->where($messagesQuery->expr()->in('mailbox_id', $messagesQuery->createFunction($mailboxesQuery->getSQL()), IQueryBuilder::PARAM_INT_ARRAY)) + ->andWhere($messagesQuery->expr()->isNotNull('message_id')); + + $result = $messagesQuery->execute(); + $messages = array_map(function (array $row) { + return DatabaseMessage::fromRowData( + (int)$row['id'], + $row['subject'], + $row['message_id'], + $row['references'], + $row['in_reply_to'], + $row['thread_root_id'] + ); + }, $result->fetchAll()); + $result->closeCursor(); + + return $messages; + } + + /** + * @param DatabaseMessage[] $messages + * + * @todo combine threads and send just one query per thread, like UPDATE ... SET thread_root_id = xxx where UID IN (...) + */ + public function writeThreadIds(array $messages): void { + $this->db->beginTransaction(); + + $query = $this->db->getQueryBuilder(); + $query->update($this->getTableName()) + ->set('thread_root_id', $query->createParameter('thread_root_id')) + ->where($query->expr()->eq('id', $query->createParameter('id'))); + + foreach ($messages as $message) { + $query->setParameter( + 'thread_root_id', + $message->getThreadRootId(), + $message->getThreadRootId() === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR + ); + $query->setParameter('id', $message->getDatabaseId(), IQueryBuilder::PARAM_INT); + + $query->execute(); + } + + $this->db->commit(); + } + public function insertBulk(Message ...$messages): void { $this->db->beginTransaction(); @@ -124,7 +186,7 @@ public function insertBulk(Message ...$messages): void { $references = $message->getReferences(); $qb1->setParameter('references', $references, $references === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR); $threadRootId = $message->getThreadRootId(); - $qb1->setParameter('thread_root_id', $threadRootId,$threadRootId === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR); + $qb1->setParameter('thread_root_id', $threadRootId, $threadRootId === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR); $qb1->setParameter('mailbox_id', $message->getMailboxId(), IQueryBuilder::PARAM_INT); $qb1->setParameter('subject', $message->getSubject(), IQueryBuilder::PARAM_STR); $qb1->setParameter('sent_at', $message->getSentAt(), IQueryBuilder::PARAM_INT); @@ -335,7 +397,7 @@ public function findUidsByQuery(Mailbox $mailbox, SearchQuery $query, ?int $limi $select->andWhere( $qb->expr()->iLike( 'subject', - $qb->createNamedParameter('%' . $this->db->escapeLikeParameter($query-> $query->getSubject()) . '%', IQueryBuilder::PARAM_STR), + $qb->createNamedParameter('%' . $this->db->escapeLikeParameter($query->$query->getSubject()) . '%', IQueryBuilder::PARAM_STR), IQueryBuilder::PARAM_STR ) ); diff --git a/lib/Events/SynchronizationEvent.php b/lib/Events/SynchronizationEvent.php new file mode 100644 index 0000000000..12e51968a7 --- /dev/null +++ b/lib/Events/SynchronizationEvent.php @@ -0,0 +1,45 @@ + + * + * @author 2020 Christoph Wurst + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +namespace OCA\Mail\Events; + +use OCA\Mail\Account; +use OCP\EventDispatcher\Event; + +class SynchronizationEvent extends Event { + + /** @var Account */ + private $account; + + public function __construct(Account $account) { + parent::__construct(); + + $this->account = $account; + } + + public function getAccount(): Account { + return $this->account; + } +} diff --git a/lib/IMAP/Threading/DatabaseMessage.php b/lib/IMAP/Threading/DatabaseMessage.php new file mode 100644 index 0000000000..d85654a4f3 --- /dev/null +++ b/lib/IMAP/Threading/DatabaseMessage.php @@ -0,0 +1,91 @@ + + * + * @author 2020 Christoph Wurst + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +namespace OCA\Mail\IMAP\Threading; + +use function json_decode; + +class DatabaseMessage extends Message { + + /** @var int */ + private $databaseId; + + /** @var string|null */ + private $threadRootId; + + /** @var bool */ + private $dirty = false; + + public function __construct(int $databaseId, + string $subject, + string $id, + array $references, + ?string $threadRootId) { + parent::__construct($subject, $id, $references); + + $this->databaseId = $databaseId; + $this->threadRootId = $threadRootId; + } + + public static function fromRowData(int $id, + string $subject, + ?string $messageId, + ?string $references, + ?string $inReplyTo, + ?string $threadRootId): self { + $referencesForThreading = $references !== null ? json_decode($references, true) : []; + if (!empty($inReplyTo)) { + $referencesForThreading[] = $inReplyTo; + } + + return new self( + $id, + $subject, + $messageId, + $referencesForThreading, + $threadRootId + ); + } + + public function getDatabaseId(): int { + return $this->databaseId; + } + + public function getThreadRootId(): ?string { + return $this->threadRootId; + } + + public function setThreadRootId(?string $threadRootId): void { + // Only update the thread ID if it has a value, is different and we haven't set one before + if ($threadRootId !== null && $this->threadRootId !== $threadRootId && !$this->dirty) { + $this->dirty = true; + $this->threadRootId = $threadRootId; + } + } + + public function isDirty(): bool { + return $this->dirty; + } +} diff --git a/lib/Listener/AccountSynchronizedThreadUpdaterListener.php b/lib/Listener/AccountSynchronizedThreadUpdaterListener.php new file mode 100644 index 0000000000..aa167059b7 --- /dev/null +++ b/lib/Listener/AccountSynchronizedThreadUpdaterListener.php @@ -0,0 +1,106 @@ + + * + * @author 2020 Christoph Wurst + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +namespace OCA\Mail\Listener; + +use Generator; +use OCA\Mail\Db\MessageMapper; +use OCA\Mail\Events\SynchronizationEvent; +use OCA\Mail\IMAP\Threading\Container; +use OCA\Mail\IMAP\Threading\DatabaseMessage; +use OCA\Mail\IMAP\Threading\ThreadBuilder; +use OCP\EventDispatcher\Event; +use OCP\EventDispatcher\IEventListener; +use OCP\ILogger; +use function array_chunk; +use function array_filter; +use function iterator_to_array; + +class AccountSynchronizedThreadUpdaterListener implements IEventListener { + + /** @var MessageMapper */ + private $mapper; + + /** @var ThreadBuilder */ + private $builder; + + /** @var ILogger */ + private $logger; + + public function __construct(MessageMapper $mapper, + ThreadBuilder $builder, + ILogger $logger) { + $this->mapper = $mapper; + $this->builder = $builder; + $this->logger = $logger; + } + + public function handle(Event $event): void { + if (!($event instanceof SynchronizationEvent)) { + // Unrelated + return; + } + + $accountId = $event->getAccount()->getId(); + $messages = $this->mapper->findThreadingData($event->getAccount()); + $this->logger->debug("Account $accountId has " . count($messages) . " messages for threading"); + $threads = $this->builder->build($messages); + $this->logger->debug("Account $accountId has " . count($threads) . " threads"); + $flattened = iterator_to_array($this->flattenThreads($threads), false); + $needUpdate = array_filter($flattened, function (DatabaseMessage $msg) { + return $msg->isDirty(); + }); + $this->logger->debug("Account $accountId has " . count($needUpdate) . " messages with a new thread ID"); + foreach (array_chunk($needUpdate, 500) as $chunk) { + $this->mapper->writeThreadIds($chunk); + } + } + + /** + * @param Container[] $threads + * + * @return DatabaseMessage[]|Generator + */ + private function flattenThreads(array $threads, + ?string $threadId = null): Generator { + foreach ($threads as $thread) { + if (($message = $thread->getMessage()) !== null) { + /** @var DatabaseMessage $message */ + if ($threadId === null) { + // No parent -> let's use own ID + $message->setThreadRootId($message->getId()); + } else { + $message->setThreadRootId($threadId); + } + yield $message; + } + + yield from $this->flattenThreads( + $thread->getChildren(), + $message === null ? null : $message->getId() + ); + } + } +} diff --git a/lib/Service/Sync/ImapToDbSynchronizer.php b/lib/Service/Sync/ImapToDbSynchronizer.php index 2f17ed779c..b1a643b0dc 100644 --- a/lib/Service/Sync/ImapToDbSynchronizer.php +++ b/lib/Service/Sync/ImapToDbSynchronizer.php @@ -31,6 +31,7 @@ use OCA\Mail\Db\Mailbox; use OCA\Mail\Db\MailboxMapper; use OCA\Mail\Db\MessageMapper as DatabaseMessageMapper; +use OCA\Mail\Events\SynchronizationEvent; use OCA\Mail\Events\NewMessagesSynchronized; use OCA\Mail\Exception\ClientException; use OCA\Mail\Exception\IncompleteSyncException; @@ -115,9 +116,15 @@ public function syncAccount(Account $account, $mailbox, $criteria, null, - $force + $force, + true ); } + $this->dispatcher->dispatchTyped( + new SynchronizationEvent( + $account + ) + ); } /** @@ -166,8 +173,6 @@ private function resetCache(Account $account, Mailbox $mailbox): void { } /** - * @param int[] $knownUids - * * @throws ClientException * @throws MailboxNotCachedException * @throws ServiceException @@ -176,7 +181,8 @@ public function sync(Account $account, Mailbox $mailbox, int $criteria = Horde_Imap_Client::SYNC_NEWMSGSUIDS | Horde_Imap_Client::SYNC_FLAGSUIDS | Horde_Imap_Client::SYNC_VANISHEDUIDS, array $knownUids = null, - bool $force = false): void { + bool $force = false, + bool $batchSync = false): void { if ($mailbox->getSelectable() === false) { return; } @@ -222,6 +228,14 @@ public function sync(Account $account, $this->mailboxMapper->unlockFromNewSync($mailbox); } } + + if (!$batchSync) { + $this->dispatcher->dispatchTyped( + new SynchronizationEvent( + $account + ) + ); + } } /** From 73cbe93135a921836de4ccc90dd898675c908d99 Mon Sep 17 00:00:00 2001 From: Christoph Wurst Date: Fri, 10 Jul 2020 17:21:51 +0200 Subject: [PATCH 2/2] Do not build an array with all messages but just the dirty ones Signed-off-by: Christoph Wurst --- .../AccountSynchronizedThreadUpdaterListener.php | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/Listener/AccountSynchronizedThreadUpdaterListener.php b/lib/Listener/AccountSynchronizedThreadUpdaterListener.php index aa167059b7..e248c6b470 100644 --- a/lib/Listener/AccountSynchronizedThreadUpdaterListener.php +++ b/lib/Listener/AccountSynchronizedThreadUpdaterListener.php @@ -35,7 +35,6 @@ use OCP\EventDispatcher\IEventListener; use OCP\ILogger; use function array_chunk; -use function array_filter; use function iterator_to_array; class AccountSynchronizedThreadUpdaterListener implements IEventListener { @@ -69,11 +68,8 @@ public function handle(Event $event): void { $threads = $this->builder->build($messages); $this->logger->debug("Account $accountId has " . count($threads) . " threads"); $flattened = iterator_to_array($this->flattenThreads($threads), false); - $needUpdate = array_filter($flattened, function (DatabaseMessage $msg) { - return $msg->isDirty(); - }); - $this->logger->debug("Account $accountId has " . count($needUpdate) . " messages with a new thread ID"); - foreach (array_chunk($needUpdate, 500) as $chunk) { + $this->logger->debug("Account $accountId has " . count($flattened) . " messages with a new thread ID"); + foreach (array_chunk($flattened, 500) as $chunk) { $this->mapper->writeThreadIds($chunk); } } @@ -94,7 +90,9 @@ private function flattenThreads(array $threads, } else { $message->setThreadRootId($threadId); } - yield $message; + if ($message->isDirty()) { + yield $message; + } } yield from $this->flattenThreads(