From 2caa186935151683076b74357daad83d2538a3f6 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 13 Aug 2023 21:21:43 +0200 Subject: [PATCH] Remove user sessions when resetting password (#33347) * Remove user sessions when resetting password When user's password is reset, we also remove all DB sessions for that user - for database session backend. In case we are using securecookie mechanism, resetting password does not invalidate old sessions, so instead we are displaying warning to the user performing the reset that in order to clear existing sessions of the user, the secure_key needs to be changed and it will invalidate all sessions for all users. Protection has been added in case the number of sessions in the DB is too big to effectively scan and remove sessions for the user. In such case we print warning for the user that sessions have not been reset, and we suggest to improve the way their deployment mechanisms create too many sessions - by either changing the way how automation of the API calls is done and/or by purging the sessions regularly by "airflow db clean". * Update airflow/auth/managers/fab/security_manager/override.py Co-authored-by: Hussein Awala --------- Co-authored-by: Hussein Awala --- .../managers/fab/security_manager/override.py | 53 ++++++++- airflow/config_templates/config.yml | 14 ++- airflow/utils/db_cleanup.py | 4 + .../www/views/test_views_custom_user_views.py | 112 ++++++++++++++++++ 4 files changed, 181 insertions(+), 2 deletions(-) diff --git a/airflow/auth/managers/fab/security_manager/override.py b/airflow/auth/managers/fab/security_manager/override.py index f452fe912cb76..089b449422eba 100644 --- a/airflow/auth/managers/fab/security_manager/override.py +++ b/airflow/auth/managers/fab/security_manager/override.py @@ -19,14 +19,26 @@ from functools import cached_property -from flask import g +from flask import flash, g from flask_appbuilder.const import AUTH_DB, AUTH_LDAP, AUTH_OAUTH, AUTH_OID, AUTH_REMOTE_USER from flask_babel import lazy_gettext from flask_jwt_extended import JWTManager from flask_login import LoginManager +from itsdangerous import want_bytes +from markupsafe import Markup from werkzeug.security import generate_password_hash +from airflow.auth.managers.fab.models import User from airflow.auth.managers.fab.models.anonymous_user import AnonymousUser +from airflow.www.session import AirflowDatabaseSessionInterface + +# This is the limit of DB user sessions that we consider as "healthy". If you have more sessions that this +# number then we will refuse to delete sessions that have expired and old user sessions when resetting +# user's password, and raise a warning in the UI instead. Usually when you have that many sessions, it means +# that there is something wrong with your deployment - for example you have an automated API call that +# continuously creates new sessions. Such setup should be fixed by reusing sessions or by periodically +# purging the old sessions by using `airflow db clean` command. +MAX_NUM_DATABASE_USER_SESSIONS = 50000 class FabAirflowSecurityManagerOverride: @@ -230,8 +242,47 @@ def reset_password(self, userid, password): """ user = self.get_user_by_id(userid) user.password = generate_password_hash(password) + self.reset_user_sessions(user) self.update_user(user) + def reset_user_sessions(self, user: User) -> None: + if isinstance(self.appbuilder.get_app.session_interface, AirflowDatabaseSessionInterface): + interface = self.appbuilder.get_app.session_interface + session = interface.db.session + user_session_model = interface.sql_session_model + num_sessions = session.query(user_session_model).count() + if num_sessions > MAX_NUM_DATABASE_USER_SESSIONS: + flash( + Markup( + f"The old sessions for user {user.username} have NOT been deleted!
" + f"You have a lot ({num_sessions}) of user sessions in the 'SESSIONS' table in " + f"your database.
" + "This indicates that this deployment might have an automated API calls that create " + "and not reuse sessions.
You should consider reusing sessions or cleaning them " + "periodically using db clean.
" + "Make sure to reset password for the user again after cleaning the session table " + "to remove old sessions of the user." + ), + "warning", + ) + else: + for s in session.query(user_session_model): + session_details = interface.serializer.loads(want_bytes(s.data)) + if session_details.get("_user_id") == user.id: + session.delete(s) + else: + flash( + Markup( + "Since you are using `securecookie` session backend mechanism, we cannot prevent " + f"some old sessions for user {user.username} to be reused.
If you want to make sure " + "that the user is logged out from all sessions, you should consider using " + "`database` session backend mechanism.
You can also change the 'secret_key` " + "webserver configuration for all your webserver instances and restart the webserver. " + "This however will logout all users from all sessions." + ), + "warning", + ) + def load_user(self, user_id): """Load user by ID.""" return self.get_user_by_id(int(user_id)) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index d5a5cb298183d..719b1d2f944a6 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1387,7 +1387,19 @@ webserver: default: "" session_backend: description: | - The type of backend used to store web session data, can be 'database' or 'securecookie' + The type of backend used to store web session data, can be `database` or `securecookie`. For the + `database` backend, sessions are store in the database (in `session` table) and they can be + managed there (for example when you reset password of the user, all sessions for that user are + deleted). For the `securecookie` backend, sessions are stored in encrypted cookies on the client + side. The `securecookie` mechanism is 'lighter' than database backend, but sessions are not deleted + when you reset password of the user, which means that other than waiting for expiry time, the only + way to invalidate all sessions for a user is to change secret_key and restart webserver (which + also invalidates and logs out all other user's sessions). + + When you are using `database` backend, make sure to keep your database session table small + by periodically running `airflow db clean --table session` command, especially if you have + automated API calls that will create a new session for each call rather than reuse the sessions + stored in browser cookies. version_added: 2.2.4 type: string example: securecookie diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index f9dae372429da..79a804a9e5011 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -38,6 +38,7 @@ from airflow import AirflowException from airflow.cli.simple_table import AirflowConsole +from airflow.configuration import conf from airflow.models import Base from airflow.utils import timezone from airflow.utils.db import reflect_tables @@ -115,6 +116,9 @@ def readable_config(self): _TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"), ] +if conf.get("webserver", "session_backend") == "database": + config_list.append(_TableConfig(table_name="session", recency_column_name="expiry")) + config_dict: dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)} diff --git a/tests/www/views/test_views_custom_user_views.py b/tests/www/views/test_views_custom_user_views.py index 71ab551ab7612..771a103d69553 100644 --- a/tests/www/views/test_views_custom_user_views.py +++ b/tests/www/views/test_views_custom_user_views.py @@ -17,7 +17,11 @@ # under the License. from __future__ import annotations +from datetime import datetime, timedelta +from unittest import mock + import pytest +from flask.sessions import SecureCookieSessionInterface from flask_appbuilder import SQLA from airflow import settings @@ -166,3 +170,111 @@ def test_user_model_view_with_delete_access(self): check_content_in_response("Deleted Row", response) check_content_not_in_response(user_to_delete.username, response) assert bool(self.security_manager.get_user_by_id(user_to_delete.id)) is False + + +# type: ignore[attr-defined] + + +class TestResetUserSessions: + @classmethod + def setup_class(cls): + settings.configure_orm() + + def setup_method(self): + # We cannot reuse the app in tests (on class level) as in Flask 2.2 this causes + # an exception because app context teardown is removed and if even single request is run via app + # it cannot be re-intialized again by passing it as constructor to SQLA + # This makes the tests slightly slower (but they work with Flask 2.1 and 2.2 + self.app = application.create_app(testing=True) + self.appbuilder = self.app.appbuilder + self.app.config["WTF_CSRF_ENABLED"] = False + self.security_manager = self.appbuilder.sm + self.interface = self.app.session_interface + self.model = self.interface.sql_session_model + self.serializer = self.interface.serializer + self.db = self.interface.db + self.db.session.query(self.model).delete() + self.db.session.commit() + self.db.session.flush() + self.user_1 = create_user( + self.app, + username="user_to_delete_1", + role_name="user_to_delete", + ) + self.user_2 = create_user( + self.app, + username="user_to_delete_2", + role_name="user_to_delete", + ) + self.db.session.commit() + self.db.session.flush() + + def create_user_db_session(self, session_id: str, time_delta: timedelta, user_id: int): + self.db.session.add( + self.model( + session_id=session_id, + data=self.serializer.dumps({"_user_id": user_id}), + expiry=datetime.now() + time_delta, + ) + ) + + @pytest.mark.parametrize( + "time_delta, user_sessions_deleted", + [ + pytest.param(timedelta(days=-1), True, id="Both expired"), + pytest.param(timedelta(hours=1), True, id="Both fresh"), + pytest.param(timedelta(days=1), True, id="Both future"), + ], + ) + def test_reset_user_sessions_delete(self, time_delta: timedelta, user_sessions_deleted: bool): + + self.create_user_db_session("session_id_1", time_delta, self.user_1.id) + self.create_user_db_session("session_id_2", time_delta, self.user_2.id) + self.db.session.commit() + self.db.session.flush() + assert self.db.session.query(self.model).count() == 2 + assert self.get_session_by_id("session_id_1") is not None + assert self.get_session_by_id("session_id_2") is not None + + self.security_manager.reset_password(self.user_1.id, "new_password") + self.db.session.commit() + self.db.session.flush() + if user_sessions_deleted: + assert self.db.session.query(self.model).count() == 1 + assert self.get_session_by_id("session_id_1") is None + else: + assert self.db.session.query(self.model).count() == 2 + assert self.get_session_by_id("session_id_1") is not None + + def get_session_by_id(self, session_id: str): + return self.db.session.query(self.model).filter(self.model.session_id == session_id).scalar() + + @mock.patch("airflow.auth.managers.fab.security_manager.override.flash") + @mock.patch("airflow.auth.managers.fab.security_manager.override.MAX_NUM_DATABASE_USER_SESSIONS", 1) + def test_refuse_delete(self, flash_mock): + self.create_user_db_session("session_id_1", timedelta(days=1), self.user_1.id) + self.create_user_db_session("session_id_2", timedelta(days=1), self.user_2.id) + self.db.session.commit() + self.db.session.flush() + assert self.db.session.query(self.model).count() == 2 + assert self.get_session_by_id("session_id_1") is not None + assert self.get_session_by_id("session_id_2") is not None + self.security_manager.reset_password(self.user_1.id, "new_password") + assert flash_mock.called + assert ( + "The old sessions for user user_to_delete_1 have NOT been deleted!" + in flash_mock.call_args[0][0] + ) + assert self.db.session.query(self.model).count() == 2 + assert self.get_session_by_id("session_id_1") is not None + assert self.get_session_by_id("session_id_2") is not None + + @mock.patch("airflow.auth.managers.fab.security_manager.override.flash") + def test_warn_securecookie(self, flash_mock): + self.app.session_interface = SecureCookieSessionInterface() + self.security_manager.reset_password(self.user_1.id, "new_password") + assert flash_mock.called + assert ( + "Since you are using `securecookie` session backend mechanism, we cannot" + in flash_mock.call_args[0][0] + )