Skip to content

Commit

Permalink
Merge pull request #2125 from nextcloud/feature/threading
Browse files Browse the repository at this point in the history
Build message threads after each sync and store thread root ID for later reference
  • Loading branch information
ChristophWurst authored Jul 13, 2020
2 parents 437d06e + 73cbe93 commit 700f28c
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 6 deletions.
3 changes: 3 additions & 0 deletions lib/AppInfo/Application.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
use OCA\Mail\Contracts\IUserPreferences;
use OCA\Mail\Events\BeforeMessageDeletedEvent;
use OCA\Mail\Events\DraftSavedEvent;
use OCA\Mail\Events\SynchronizationEvent;
use OCA\Mail\Events\MessageDeletedEvent;
use OCA\Mail\Events\MessageFlaggedEvent;
use OCA\Mail\Events\MessageSentEvent;
Expand All @@ -44,6 +45,7 @@
use OCA\Mail\Listener\DraftMailboxCreatorListener;
use OCA\Mail\Listener\FlagRepliedMessageListener;
use OCA\Mail\Listener\InteractionListener;
use OCA\Mail\Listener\AccountSynchronizedThreadUpdaterListener;
use OCA\Mail\Listener\MessageCacheUpdaterListener;
use OCA\Mail\Listener\NewMessageClassificationListener;
use OCA\Mail\Listener\SaveSentMessageListener;
Expand Down Expand Up @@ -111,6 +113,7 @@ private function registerEvents(IAppContainer $container): void {
$dispatcher->addServiceListener(MessageSentEvent::class, SaveSentMessageListener::class);
$dispatcher->addServiceListener(NewMessagesSynchronized::class, NewMessageClassificationListener::class);
$dispatcher->addServiceListener(SaveDraftEvent::class, DraftMailboxCreatorListener::class);
$dispatcher->addServiceListener(SynchronizationEvent::class, AccountSynchronizedThreadUpdaterListener::class);
$dispatcher->addServiceListener(UserDeletedEvent::class, UserDeletedListener::class);
}
}
66 changes: 64 additions & 2 deletions lib/Db/MessageMapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
namespace OCA\Mail\Db;

use Horde_Imap_Client;
use OCA\Mail\Account;
use OCA\Mail\Address;
use OCA\Mail\AddressList;
use OCA\Mail\IMAP\Threading\DatabaseMessage;
use OCA\Mail\Service\Search\SearchQuery;
use OCP\AppFramework\Db\QBMapper;
use OCP\AppFramework\Utility\ITimeFactory;
Expand Down Expand Up @@ -86,6 +88,66 @@ public function findAllUids(Mailbox $mailbox): array {
return $this->findUids($query);
}

/**
* @param Account $account
*
* @return DatabaseMessage[]
*/
public function findThreadingData(Account $account): array {
$mailboxesQuery = $this->db->getQueryBuilder();
$messagesQuery = $this->db->getQueryBuilder();

$mailboxesQuery->select('id')
->from('mail_mailboxes')
->where($mailboxesQuery->expr()->eq('account_id', $messagesQuery->createNamedParameter($account->getId(), IQueryBuilder::PARAM_INT), IQueryBuilder::PARAM_INT));
$messagesQuery->select('id', 'subject', 'message_id', 'in_reply_to', 'references', 'thread_root_id')
->from($this->getTableName())
->where($messagesQuery->expr()->in('mailbox_id', $messagesQuery->createFunction($mailboxesQuery->getSQL()), IQueryBuilder::PARAM_INT_ARRAY))
->andWhere($messagesQuery->expr()->isNotNull('message_id'));

$result = $messagesQuery->execute();
$messages = array_map(function (array $row) {
return DatabaseMessage::fromRowData(
(int)$row['id'],
$row['subject'],
$row['message_id'],
$row['references'],
$row['in_reply_to'],
$row['thread_root_id']
);
}, $result->fetchAll());
$result->closeCursor();

return $messages;
}

/**
* @param DatabaseMessage[] $messages
*
* @todo combine threads and send just one query per thread, like UPDATE ... SET thread_root_id = xxx where UID IN (...)
*/
public function writeThreadIds(array $messages): void {
$this->db->beginTransaction();

$query = $this->db->getQueryBuilder();
$query->update($this->getTableName())
->set('thread_root_id', $query->createParameter('thread_root_id'))
->where($query->expr()->eq('id', $query->createParameter('id')));

foreach ($messages as $message) {
$query->setParameter(
'thread_root_id',
$message->getThreadRootId(),
$message->getThreadRootId() === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR
);
$query->setParameter('id', $message->getDatabaseId(), IQueryBuilder::PARAM_INT);

$query->execute();
}

$this->db->commit();
}

public function insertBulk(Message ...$messages): void {
$this->db->beginTransaction();

Expand Down Expand Up @@ -124,7 +186,7 @@ public function insertBulk(Message ...$messages): void {
$references = $message->getReferences();
$qb1->setParameter('references', $references, $references === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR);
$threadRootId = $message->getThreadRootId();
$qb1->setParameter('thread_root_id', $threadRootId,$threadRootId === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR);
$qb1->setParameter('thread_root_id', $threadRootId, $threadRootId === null ? IQueryBuilder::PARAM_NULL : IQueryBuilder::PARAM_STR);
$qb1->setParameter('mailbox_id', $message->getMailboxId(), IQueryBuilder::PARAM_INT);
$qb1->setParameter('subject', $message->getSubject(), IQueryBuilder::PARAM_STR);
$qb1->setParameter('sent_at', $message->getSentAt(), IQueryBuilder::PARAM_INT);
Expand Down Expand Up @@ -335,7 +397,7 @@ public function findUidsByQuery(Mailbox $mailbox, SearchQuery $query, ?int $limi
$select->andWhere(
$qb->expr()->iLike(
'subject',
$qb->createNamedParameter('%' . $this->db->escapeLikeParameter($query-> $query->getSubject()) . '%', IQueryBuilder::PARAM_STR),
$qb->createNamedParameter('%' . $this->db->escapeLikeParameter($query->$query->getSubject()) . '%', IQueryBuilder::PARAM_STR),
IQueryBuilder::PARAM_STR
)
);
Expand Down
45 changes: 45 additions & 0 deletions lib/Events/SynchronizationEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

/**
* @copyright 2020 Christoph Wurst <christoph@winzerhof-wurst.at>
*
* @author 2020 Christoph Wurst <christoph@winzerhof-wurst.at>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

namespace OCA\Mail\Events;

use OCA\Mail\Account;
use OCP\EventDispatcher\Event;

class SynchronizationEvent extends Event {

/** @var Account */
private $account;

public function __construct(Account $account) {
parent::__construct();

$this->account = $account;
}

public function getAccount(): Account {
return $this->account;
}
}
91 changes: 91 additions & 0 deletions lib/IMAP/Threading/DatabaseMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?php

declare(strict_types=1);

/**
* @copyright 2020 Christoph Wurst <christoph@winzerhof-wurst.at>
*
* @author 2020 Christoph Wurst <christoph@winzerhof-wurst.at>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

namespace OCA\Mail\IMAP\Threading;

use function json_decode;

class DatabaseMessage extends Message {

/** @var int */
private $databaseId;

/** @var string|null */
private $threadRootId;

/** @var bool */
private $dirty = false;

public function __construct(int $databaseId,
string $subject,
string $id,
array $references,
?string $threadRootId) {
parent::__construct($subject, $id, $references);

$this->databaseId = $databaseId;
$this->threadRootId = $threadRootId;
}

public static function fromRowData(int $id,
string $subject,
?string $messageId,
?string $references,
?string $inReplyTo,
?string $threadRootId): self {
$referencesForThreading = $references !== null ? json_decode($references, true) : [];
if (!empty($inReplyTo)) {
$referencesForThreading[] = $inReplyTo;
}

return new self(
$id,
$subject,
$messageId,
$referencesForThreading,
$threadRootId
);
}

public function getDatabaseId(): int {
return $this->databaseId;
}

public function getThreadRootId(): ?string {
return $this->threadRootId;
}

public function setThreadRootId(?string $threadRootId): void {
// Only update the thread ID if it has a value, is different and we haven't set one before
if ($threadRootId !== null && $this->threadRootId !== $threadRootId && !$this->dirty) {
$this->dirty = true;
$this->threadRootId = $threadRootId;
}
}

public function isDirty(): bool {
return $this->dirty;
}
}
104 changes: 104 additions & 0 deletions lib/Listener/AccountSynchronizedThreadUpdaterListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?php

declare(strict_types=1);

/**
* @copyright 2020 Christoph Wurst <christoph@winzerhof-wurst.at>
*
* @author 2020 Christoph Wurst <christoph@winzerhof-wurst.at>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

namespace OCA\Mail\Listener;

use Generator;
use OCA\Mail\Db\MessageMapper;
use OCA\Mail\Events\SynchronizationEvent;
use OCA\Mail\IMAP\Threading\Container;
use OCA\Mail\IMAP\Threading\DatabaseMessage;
use OCA\Mail\IMAP\Threading\ThreadBuilder;
use OCP\EventDispatcher\Event;
use OCP\EventDispatcher\IEventListener;
use OCP\ILogger;
use function array_chunk;
use function iterator_to_array;

class AccountSynchronizedThreadUpdaterListener implements IEventListener {

/** @var MessageMapper */
private $mapper;

/** @var ThreadBuilder */
private $builder;

/** @var ILogger */
private $logger;

public function __construct(MessageMapper $mapper,
ThreadBuilder $builder,
ILogger $logger) {
$this->mapper = $mapper;
$this->builder = $builder;
$this->logger = $logger;
}

public function handle(Event $event): void {
if (!($event instanceof SynchronizationEvent)) {
// Unrelated
return;
}

$accountId = $event->getAccount()->getId();
$messages = $this->mapper->findThreadingData($event->getAccount());
$this->logger->debug("Account $accountId has " . count($messages) . " messages for threading");
$threads = $this->builder->build($messages);
$this->logger->debug("Account $accountId has " . count($threads) . " threads");
$flattened = iterator_to_array($this->flattenThreads($threads), false);
$this->logger->debug("Account $accountId has " . count($flattened) . " messages with a new thread ID");
foreach (array_chunk($flattened, 500) as $chunk) {
$this->mapper->writeThreadIds($chunk);
}
}

/**
* @param Container[] $threads
*
* @return DatabaseMessage[]|Generator
*/
private function flattenThreads(array $threads,
?string $threadId = null): Generator {
foreach ($threads as $thread) {
if (($message = $thread->getMessage()) !== null) {
/** @var DatabaseMessage $message */
if ($threadId === null) {
// No parent -> let's use own ID
$message->setThreadRootId($message->getId());
} else {
$message->setThreadRootId($threadId);
}
if ($message->isDirty()) {
yield $message;
}
}

yield from $this->flattenThreads(
$thread->getChildren(),
$message === null ? null : $message->getId()
);
}
}
}
Loading

0 comments on commit 700f28c

Please sign in to comment.