diff --git a/airflow/auth/managers/fab/security_manager/override.py b/airflow/auth/managers/fab/security_manager/override.py
index f452fe912cb76..089b449422eba 100644
--- a/airflow/auth/managers/fab/security_manager/override.py
+++ b/airflow/auth/managers/fab/security_manager/override.py
@@ -19,14 +19,26 @@
from functools import cached_property
-from flask import g
+from flask import flash, g
from flask_appbuilder.const import AUTH_DB, AUTH_LDAP, AUTH_OAUTH, AUTH_OID, AUTH_REMOTE_USER
from flask_babel import lazy_gettext
from flask_jwt_extended import JWTManager
from flask_login import LoginManager
+from itsdangerous import want_bytes
+from markupsafe import Markup
from werkzeug.security import generate_password_hash
+from airflow.auth.managers.fab.models import User
from airflow.auth.managers.fab.models.anonymous_user import AnonymousUser
+from airflow.www.session import AirflowDatabaseSessionInterface
+
+# This is the limit of DB user sessions that we consider as "healthy". If you have more sessions that this
+# number then we will refuse to delete sessions that have expired and old user sessions when resetting
+# user's password, and raise a warning in the UI instead. Usually when you have that many sessions, it means
+# that there is something wrong with your deployment - for example you have an automated API call that
+# continuously creates new sessions. Such setup should be fixed by reusing sessions or by periodically
+# purging the old sessions by using `airflow db clean` command.
+MAX_NUM_DATABASE_USER_SESSIONS = 50000
class FabAirflowSecurityManagerOverride:
@@ -230,8 +242,47 @@ def reset_password(self, userid, password):
"""
user = self.get_user_by_id(userid)
user.password = generate_password_hash(password)
+ self.reset_user_sessions(user)
self.update_user(user)
+ def reset_user_sessions(self, user: User) -> None:
+ if isinstance(self.appbuilder.get_app.session_interface, AirflowDatabaseSessionInterface):
+ interface = self.appbuilder.get_app.session_interface
+ session = interface.db.session
+ user_session_model = interface.sql_session_model
+ num_sessions = session.query(user_session_model).count()
+ if num_sessions > MAX_NUM_DATABASE_USER_SESSIONS:
+ flash(
+ Markup(
+ f"The old sessions for user {user.username} have NOT been deleted!
"
+ f"You have a lot ({num_sessions}) of user sessions in the 'SESSIONS' table in "
+ f"your database.
"
+ "This indicates that this deployment might have an automated API calls that create "
+ "and not reuse sessions.
You should consider reusing sessions or cleaning them "
+ "periodically using db clean.
"
+ "Make sure to reset password for the user again after cleaning the session table "
+ "to remove old sessions of the user."
+ ),
+ "warning",
+ )
+ else:
+ for s in session.query(user_session_model):
+ session_details = interface.serializer.loads(want_bytes(s.data))
+ if session_details.get("_user_id") == user.id:
+ session.delete(s)
+ else:
+ flash(
+ Markup(
+ "Since you are using `securecookie` session backend mechanism, we cannot prevent "
+ f"some old sessions for user {user.username} to be reused.
If you want to make sure "
+ "that the user is logged out from all sessions, you should consider using "
+ "`database` session backend mechanism.
You can also change the 'secret_key` "
+ "webserver configuration for all your webserver instances and restart the webserver. "
+ "This however will logout all users from all sessions."
+ ),
+ "warning",
+ )
+
def load_user(self, user_id):
"""Load user by ID."""
return self.get_user_by_id(int(user_id))
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index dc89dcc8143f4..b9ea0c0e3f6f7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1387,7 +1387,19 @@ webserver:
default: ""
session_backend:
description: |
- The type of backend used to store web session data, can be 'database' or 'securecookie'
+ The type of backend used to store web session data, can be `database` or `securecookie`. For the
+ `database` backend, sessions are store in the database (in `session` table) and they can be
+ managed there (for example when you reset password of the user, all sessions for that user are
+ deleted). For the `securecookie` backend, sessions are stored in encrypted cookies on the client
+ side. The `securecookie` mechanism is 'lighter' than database backend, but sessions are not deleted
+ when you reset password of the user, which means that other than waiting for expiry time, the only
+ way to invalidate all sessions for a user is to change secret_key and restart webserver (which
+ also invalidates and logs out all other user's sessions).
+
+ When you are using `database` backend, make sure to keep your database session table small
+ by periodically running `airflow db clean --table session` command, especially if you have
+ automated API calls that will create a new session for each call rather than reuse the sessions
+ stored in browser cookies.
version_added: 2.2.4
type: string
example: securecookie
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index f9dae372429da..79a804a9e5011 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -38,6 +38,7 @@
from airflow import AirflowException
from airflow.cli.simple_table import AirflowConsole
+from airflow.configuration import conf
from airflow.models import Base
from airflow.utils import timezone
from airflow.utils.db import reflect_tables
@@ -115,6 +116,9 @@ def readable_config(self):
_TableConfig(table_name="celery_tasksetmeta", recency_column_name="date_done"),
]
+if conf.get("webserver", "session_backend") == "database":
+ config_list.append(_TableConfig(table_name="session", recency_column_name="expiry"))
+
config_dict: dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}
diff --git a/tests/www/views/test_views_custom_user_views.py b/tests/www/views/test_views_custom_user_views.py
index 71ab551ab7612..771a103d69553 100644
--- a/tests/www/views/test_views_custom_user_views.py
+++ b/tests/www/views/test_views_custom_user_views.py
@@ -17,7 +17,11 @@
# under the License.
from __future__ import annotations
+from datetime import datetime, timedelta
+from unittest import mock
+
import pytest
+from flask.sessions import SecureCookieSessionInterface
from flask_appbuilder import SQLA
from airflow import settings
@@ -166,3 +170,111 @@ def test_user_model_view_with_delete_access(self):
check_content_in_response("Deleted Row", response)
check_content_not_in_response(user_to_delete.username, response)
assert bool(self.security_manager.get_user_by_id(user_to_delete.id)) is False
+
+
+# type: ignore[attr-defined]
+
+
+class TestResetUserSessions:
+ @classmethod
+ def setup_class(cls):
+ settings.configure_orm()
+
+ def setup_method(self):
+ # We cannot reuse the app in tests (on class level) as in Flask 2.2 this causes
+ # an exception because app context teardown is removed and if even single request is run via app
+ # it cannot be re-intialized again by passing it as constructor to SQLA
+ # This makes the tests slightly slower (but they work with Flask 2.1 and 2.2
+ self.app = application.create_app(testing=True)
+ self.appbuilder = self.app.appbuilder
+ self.app.config["WTF_CSRF_ENABLED"] = False
+ self.security_manager = self.appbuilder.sm
+ self.interface = self.app.session_interface
+ self.model = self.interface.sql_session_model
+ self.serializer = self.interface.serializer
+ self.db = self.interface.db
+ self.db.session.query(self.model).delete()
+ self.db.session.commit()
+ self.db.session.flush()
+ self.user_1 = create_user(
+ self.app,
+ username="user_to_delete_1",
+ role_name="user_to_delete",
+ )
+ self.user_2 = create_user(
+ self.app,
+ username="user_to_delete_2",
+ role_name="user_to_delete",
+ )
+ self.db.session.commit()
+ self.db.session.flush()
+
+ def create_user_db_session(self, session_id: str, time_delta: timedelta, user_id: int):
+ self.db.session.add(
+ self.model(
+ session_id=session_id,
+ data=self.serializer.dumps({"_user_id": user_id}),
+ expiry=datetime.now() + time_delta,
+ )
+ )
+
+ @pytest.mark.parametrize(
+ "time_delta, user_sessions_deleted",
+ [
+ pytest.param(timedelta(days=-1), True, id="Both expired"),
+ pytest.param(timedelta(hours=1), True, id="Both fresh"),
+ pytest.param(timedelta(days=1), True, id="Both future"),
+ ],
+ )
+ def test_reset_user_sessions_delete(self, time_delta: timedelta, user_sessions_deleted: bool):
+
+ self.create_user_db_session("session_id_1", time_delta, self.user_1.id)
+ self.create_user_db_session("session_id_2", time_delta, self.user_2.id)
+ self.db.session.commit()
+ self.db.session.flush()
+ assert self.db.session.query(self.model).count() == 2
+ assert self.get_session_by_id("session_id_1") is not None
+ assert self.get_session_by_id("session_id_2") is not None
+
+ self.security_manager.reset_password(self.user_1.id, "new_password")
+ self.db.session.commit()
+ self.db.session.flush()
+ if user_sessions_deleted:
+ assert self.db.session.query(self.model).count() == 1
+ assert self.get_session_by_id("session_id_1") is None
+ else:
+ assert self.db.session.query(self.model).count() == 2
+ assert self.get_session_by_id("session_id_1") is not None
+
+ def get_session_by_id(self, session_id: str):
+ return self.db.session.query(self.model).filter(self.model.session_id == session_id).scalar()
+
+ @mock.patch("airflow.auth.managers.fab.security_manager.override.flash")
+ @mock.patch("airflow.auth.managers.fab.security_manager.override.MAX_NUM_DATABASE_USER_SESSIONS", 1)
+ def test_refuse_delete(self, flash_mock):
+ self.create_user_db_session("session_id_1", timedelta(days=1), self.user_1.id)
+ self.create_user_db_session("session_id_2", timedelta(days=1), self.user_2.id)
+ self.db.session.commit()
+ self.db.session.flush()
+ assert self.db.session.query(self.model).count() == 2
+ assert self.get_session_by_id("session_id_1") is not None
+ assert self.get_session_by_id("session_id_2") is not None
+ self.security_manager.reset_password(self.user_1.id, "new_password")
+ assert flash_mock.called
+ assert (
+ "The old sessions for user user_to_delete_1 have NOT been deleted!"
+ in flash_mock.call_args[0][0]
+ )
+ assert self.db.session.query(self.model).count() == 2
+ assert self.get_session_by_id("session_id_1") is not None
+ assert self.get_session_by_id("session_id_2") is not None
+
+ @mock.patch("airflow.auth.managers.fab.security_manager.override.flash")
+ def test_warn_securecookie(self, flash_mock):
+ self.app.session_interface = SecureCookieSessionInterface()
+ self.security_manager.reset_password(self.user_1.id, "new_password")
+ assert flash_mock.called
+ assert (
+ "Since you are using `securecookie` session backend mechanism, we cannot"
+ in flash_mock.call_args[0][0]
+ )