diff --git a/addons/digest/tests/test_digest.py b/addons/digest/tests/test_digest.py index 5db2454eb977b..c074c4d13ce13 100644 --- a/addons/digest/tests/test_digest.py +++ b/addons/digest/tests/test_digest.py @@ -1,11 +1,15 @@ # -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. +from contextlib import contextmanager +from freezegun import freeze_time +from datetime import datetime from dateutil.relativedelta import relativedelta from lxml import html +from unittest.mock import patch from werkzeug.urls import url_encode, url_join -from odoo import fields, SUPERUSER_ID +from odoo import SUPERUSER_ID from odoo.addons.base.tests.common import HttpCaseWithUserDemo from odoo.addons.digest.tests.common import TestDigestCommon from odoo.tests import tagged @@ -14,9 +18,19 @@ class TestDigest(TestDigestCommon): + @contextmanager + def mock_datetime_and_now(self, mock_dt): + """ Used when synchronization date (using env.cr.now()) is important + in addition to standard datetime mocks. Used mainly to detect sync + issues. """ + with freeze_time(mock_dt), \ + patch.object(self.env.cr, 'now', lambda: mock_dt): + yield + @classmethod def setUpClass(cls): super(TestDigest, cls).setUpClass() + cls.reference_datetime = datetime(2024, 2, 13, 13, 30, 0) # clean messages cls.env['mail.message'].search([ @@ -27,12 +41,53 @@ def setUpClass(cls): # clean demo users so that we keep only the test users cls.env['res.users'].search([('login', 'in', ['demo', 'portal'])]).action_archive() - # clean logs so that town down is activated + # clean logs so that town down can be tested cls.env['res.users.log'].search([('create_uid', 'in', (cls.user_admin + cls.user_employee).ids)]).unlink() + # create logs for user_admin + cls._setup_logs_for_users(cls.user_admin, cls.reference_datetime - relativedelta(days=5)) + + with cls.mock_datetime_and_now(cls, cls.reference_datetime): + cls.test_digest, cls.test_digest_2 = cls.env['digest.digest'].create([ + { + "kpi_mail_message_total": True, + "kpi_res_users_connected": True, + "name": "My Digest", + "periodicity": "daily", + }, { + "kpi_mail_message_total": True, + "kpi_res_users_connected": True, + "name": "My Digest", + "periodicity": "weekly", + "user_ids": [(4, cls.user_admin.id), (4, cls.user_employee.id)], + } + ]) + + @classmethod + def _setup_logs_for_users(cls, res_users, log_dt): + with cls.mock_datetime_and_now(cls, log_dt): + for user in res_users: + cls.env['res.users.log'].with_user(SUPERUSER_ID).create({ + 'create_uid': user.id, + }) + + @users('admin') + def test_assert_initial_values(self): + """ Ensure base values for tests """ + test_digest = self.test_digest.with_user(self.env.user) + test_digest_2 = self.test_digest_2.with_user(self.env.user) + self.assertEqual(test_digest.create_date, self.reference_datetime) + self.assertEqual(test_digest.next_run_date, self.reference_datetime.date() + relativedelta(days=1)) + self.assertEqual(test_digest.periodicity, 'daily') + self.assertFalse(test_digest.user_ids) + + self.assertEqual(test_digest_2.create_date, self.reference_datetime) + self.assertEqual(test_digest_2.next_run_date, self.reference_datetime.date() + relativedelta(weeks=1)) + self.assertEqual(test_digest_2.periodicity, 'weekly') + self.assertEqual(test_digest_2.user_ids, self.user_admin + self.user_employee) @users('admin') def test_digest_kpi_res_users_connected_value(self): - self.env['res.users.log'].search([]).unlink() + self.env['res.users.log'].with_user(SUPERUSER_ID).search([]).unlink() # Sanity check initial_values = self.all_digests.mapped('kpi_res_users_connected_value') self.assertEqual(initial_values, [0, 0, 0]) @@ -84,35 +139,8 @@ def test_digest_subscribe(self): "check the user was subscribed as action_subscribe will silently " "ignore subs of non-employees" ) - - @users('admin') - def test_digest_tone_down(self): - digest = self.env['digest.digest'].browse(self.digest_1.ids) - digest._action_subscribe_users(self.user_employee) - - # initial data - self.assertEqual(digest.periodicity, 'daily') - - # no logs for employee -> should tone down periodicity - digest.flush_recordset() - with self.mock_mail_gateway(): - digest.action_send() - - self.assertEqual(digest.periodicity, 'weekly') - - # no logs for employee -> should tone down periodicity - digest.flush_recordset() - with self.mock_mail_gateway(): - digest.action_send() - - self.assertEqual(digest.periodicity, 'monthly') - - # no logs for employee -> should tone down periodicity - digest.flush_recordset() - with self.mock_mail_gateway(): - digest.action_send() - - self.assertEqual(digest.periodicity, 'quarterly') + digest_user.action_unsubscribe() + self.assertFalse(digest_user.is_subscribed) @users('admin') def test_digest_tip_description(self): @@ -144,42 +172,121 @@ def test_digest_tip_description(self): ) @users('admin') - def test_digest_tone_down_wlogs(self): - digest = self.env['digest.digest'].browse(self.digest_1.ids) - digest._action_subscribe_users(self.user_employee) + def test_digest_tone_down(self): + test_digest = self.env['digest.digest'].browse(self.test_digest.ids) + test_digest_2 = self.env['digest.digest'].browse(self.test_digest_2.ids) + test_digest._action_subscribe_users(self.user_employee) + digests = test_digest + test_digest_2 # batch recordset + + # no logs for employee but for admin -> should tone down periodicity of + # first digest, not the second one (admin being subscribed) + digests.flush_recordset() + current_dt = self.reference_datetime + relativedelta(days=1) + with self.mock_datetime_and_now(current_dt), \ + self.mock_mail_gateway(): + digests.action_send() + + self.assertEqual(test_digest.next_run_date, current_dt.date() + relativedelta(weeks=1)) + self.assertEqual(test_digest.periodicity, 'weekly') + self.assertEqual(test_digest_2.next_run_date, current_dt.date() + relativedelta(weeks=1)) + self.assertEqual(test_digest_2.periodicity, 'weekly', + 'Should not have tone down because admin has logs') - # initial data - self.assertEqual(digest.periodicity, 'daily') + # no logs for employee -> should tone down periodicity + with self.mock_datetime_and_now(current_dt), \ + self.mock_mail_gateway(): + digests.action_send() - # logs for employee -> should not tone down - logs = self.env['res.users.log'].with_user(SUPERUSER_ID).create({'create_uid': self.user_employee.id}) - digest.flush_recordset() - with self.mock_mail_gateway(): - digest.action_send() + self.assertEqual(test_digest.next_run_date, current_dt.date() + relativedelta(months=1)) + self.assertEqual(test_digest.periodicity, 'monthly') + self.assertEqual(test_digest_2.next_run_date, current_dt.date() + relativedelta(weeks=1)) + self.assertEqual(test_digest_2.periodicity, 'weekly') - logs.unlink() - logs = self.env['res.users.log'].with_user(SUPERUSER_ID).create({ - 'create_uid': self.user_employee.id, - 'create_date': fields.Datetime.now() - relativedelta(days=20), - }) + # no logs for employee -> should tone down periodicity + with self.mock_datetime_and_now(current_dt), \ + self.mock_mail_gateway(): + digests.action_send() - # logs for employee are more than 3 days old -> should tone down - digest.flush_recordset() - with self.mock_mail_gateway(): - digest.action_send() - self.assertEqual(digest.periodicity, 'weekly') + self.assertEqual(test_digest.next_run_date, current_dt.date() + relativedelta(months=3)) + self.assertEqual(test_digest.periodicity, 'quarterly') + self.assertEqual(test_digest_2.next_run_date, current_dt.date() + relativedelta(weeks=1)) + self.assertEqual(test_digest_2.periodicity, 'weekly') - # logs for employee are more than 2 weeks old -> should tone down - digest.flush_recordset() - with self.mock_mail_gateway(): - digest.action_send() - self.assertEqual(digest.periodicity, 'monthly') + @users('admin') + def test_digest_tone_down_wlogs(self): + digest = self.env['digest.digest'].browse(self.digest_1.ids) + digest._action_subscribe_users(self.user_employee) - # logs for employee are less than 1 month old -> should not tone down - digest.flush_recordset() - with self.mock_mail_gateway(): - digest.action_send() - self.assertEqual(digest.periodicity, 'monthly') + for logs, (periodicity, run_date), (exp_periodicity, exp_run_date) in zip( + [ + # daily + [(self.user_employee, self.reference_datetime)], + [(self.user_employee, self.reference_datetime - relativedelta(days=4))], # old logs -> tone down + [], # no logs -> tone down + # weekly + [(self.user_employee, self.reference_datetime - relativedelta(days=8))], + [(self.user_employee, self.reference_datetime - relativedelta(days=15))], # old logs -> tone down + [], # no logs -> tone down + # monthly + [(self.user_employee, self.reference_datetime - relativedelta(days=25))], + [(self.user_employee, self.reference_datetime - relativedelta(days=32))], # old logs -> tone down + [], # no logs -> tone down + # quarterly + [(self.user_employee, self.reference_datetime - relativedelta(months=2))], + [(self.user_employee, self.reference_datetime - relativedelta(months=4))], # old logs but end of tone down + [], # no logs but end of town down + ], + [ + # daily + ('daily', self.reference_datetime.date()), + ('daily', self.reference_datetime.date()), + ('daily', self.reference_datetime.date()), + # weekly + ('weekly', self.reference_datetime.date()), + ('weekly', self.reference_datetime.date()), + ('weekly', self.reference_datetime.date()), + # monthly + ('monthly', self.reference_datetime.date()), + ('monthly', self.reference_datetime.date()), + ('monthly', self.reference_datetime.date()), + # quarterly + ('quarterly', self.reference_datetime.date()), + ('quarterly', self.reference_datetime.date()), + # ('quarterly', self.reference_datetime.date()), + ], + [ + ('daily', self.reference_datetime.date() + relativedelta(days=1)), # just push date + ('weekly', self.reference_datetime.date() + relativedelta(weeks=1)), # tone down on daily + ('weekly', self.reference_datetime.date() + relativedelta(weeks=1)), # tone down on daily + # weekly + ('weekly', self.reference_datetime.date() + relativedelta(weeks=1)), # just push date + ('monthly', self.reference_datetime.date() + relativedelta(months=1)), # tone down on weekly + ('monthly', self.reference_datetime.date() + relativedelta(months=1)), # tone down on weekly + # monthly + ('monthly', self.reference_datetime.date() + relativedelta(months=1)), # just push date + ('quarterly', self.reference_datetime.date() + relativedelta(months=3)), # tone down on monthly + ('quarterly', self.reference_datetime.date() + relativedelta(months=3)), # tone down on monthly + # quarterly + ('quarterly', self.reference_datetime.date() + relativedelta(months=3)), # just push date + ('quarterly', self.reference_datetime.date() + relativedelta(months=3)), # just push date + ('quarterly', self.reference_datetime.date() + relativedelta(months=3)), # just push date + ], + ): + with self.subTest(logs=logs, periodicity=periodicity, run_date=run_date): + digest.write({ + 'next_run_date': run_date, + 'periodicity': periodicity, + }) + for log_user, log_dt in logs: + self._setup_logs_for_users(log_user, log_dt) + + with self.mock_datetime_and_now(run_date), \ + self.mock_mail_gateway(): + digest.action_send() + + self.assertEqual(digest.next_run_date, exp_run_date) + self.assertEqual(digest.periodicity, exp_periodicity) + self.env['res.users.log'].with_user(SUPERUSER_ID).search([]).unlink() @tagged('-at_install', 'post_install') @@ -199,60 +306,32 @@ def setUp(self): self.base_url = self.test_digest.get_base_url() self.user_demo_unsubscribe_token = self.test_digest._get_unsubscribe_token(self.user_demo.id) - @users('demo') - def test_unsubscribe_classic(self): - self.assertIn(self.user_demo, self.test_digest.user_ids) - self.authenticate(self.env.user.login, self.env.user.login) - - response = self._url_unsubscribe() - self.assertEqual(response.status_code, 200) - self.assertNotIn(self.user_demo, self.test_digest.user_ids) - - @users('demo') - def test_unsubscribe_issues(self): - """ Test when not being member """ - self.test_digest.write({'user_ids': [(3, self.user_demo.id)]}) - self.assertNotIn(self.user_demo, self.test_digest.user_ids) - - # unsubscribe - self.authenticate(self.env.user.login, self.env.user.login) - response = self._url_unsubscribe() - self.assertEqual(response.status_code, 200) - self.assertNotIn(self.user_demo, self.test_digest.user_ids) - - def test_unsubscribe_token(self): - self.assertIn(self.user_demo, self.test_digest.user_ids) - self.authenticate(None, None) - response = self._url_unsubscribe(token=self.user_demo_unsubscribe_token, user_id=self.user_demo.id) - self.assertEqual(response.status_code, 200) - self.test_digest.invalidate_recordset() - self.assertNotIn(self.user_demo, self.test_digest.user_ids) - - def test_unsubscribe_token_one_click(self): - self.assertIn(self.user_demo, self.test_digest.user_ids) - self.authenticate(None, None) - - # Ensure we cannot unregister using GET method (method not allowed) - response = self._url_unsubscribe(token=self.user_demo_unsubscribe_token, user_id=self.user_demo.id, - one_click='1', method='GET') - self.assertEqual(response.status_code, 403, 'GET method is forbidden') - self.test_digest.invalidate_recordset() - self.assertIn(self.user_demo, self.test_digest.user_ids) - - # Ensure we can unregister with POST method - response = self._url_unsubscribe(token=self.user_demo_unsubscribe_token, user_id=self.user_demo.id, - one_click='1', method='POST') - self.assertEqual(response.status_code, 200) - self.test_digest.invalidate_recordset() - self.assertNotIn(self.user_demo, self.test_digest.user_ids) - - def test_unsubscribe_public(self): - """ Check public users are redirected when trying to catch unsubscribe - route. """ - self.authenticate(None, None) - - response = self._url_unsubscribe() - self.assertEqual(response.status_code, 404) + def test_unsubscribe(self): + """ Test various combination of unsubscribe: logged, using token, ... """ + digest = self.test_digest + demo_token = digest._get_unsubscribe_token(self.user_demo.id) + for test_user, is_member, is_logged, token, exp_code in [ + (self.user_demo, True, True, False, 200), # unsubscribe logged, easy + (self.user_demo, False, True, False, 200), # unsubscribe not a member should not crash + (self.user_demo, False, False, demo_token, 200), # unsubscribe using a token + (self.user_demo, False, False, 'probably-not-a-token', 404), # wrong token -> crash + (self.user_demo, False, False, False, 404), # cannot be done unlogged / no token + ]: + with self.subTest(user_name=test_user.name, is_member=is_member, is_logged=is_logged, token=token): + if is_member: + digest._action_subscribe_users(test_user) + self.assertIn(test_user, digest.user_ids) + else: + digest._action_unsubscribe_users(test_user) + self.assertNotIn(test_user, digest.user_ids) + + self.authenticate(test_user.login if is_logged else None, test_user.login if is_logged else None) + if token: + response = self._url_unsubscribe(token=token, user_id=test_user.id) + else: + response = self._url_unsubscribe() + self.assertEqual(response.status_code, exp_code) + self.assertNotIn(test_user, digest.user_ids) def _url_unsubscribe(self, token=None, user_id=None, one_click=None, method='GET'): url_params = {}