Coverage for src / mesh / models / roles / role_handler.py: 81%
142 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-04 12:41 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-04 12:41 +0000
1from abc import ABC, abstractmethod
2from dataclasses import dataclass
3from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar
5from django.core.exceptions import ImproperlyConfigured
6from django.http import HttpRequest
8from mesh.models.exceptions import RoleException
9from mesh.models.roles.author import Author
10from mesh.models.roles.base_role import Role
11from mesh.models.roles.editor import Editor
12from mesh.models.roles.journal_manager import JournalManager
13from mesh.models.roles.reviewer import Reviewer
14from mesh.models.user.user_interfaces import ImpersonateData
16if TYPE_CHECKING:
17 from mesh.models.user_models import User
18# Cache of the RoleHandler at request level. The RoleHandler might be derived
19# at middleware level.
20ROLE_HANDLER_CACHE = "role_handler_cache"
21ROLE_CLASSES: list[type[Role]] = [Author, Reviewer, Editor, JournalManager]
23if len({role_cls.code() for role_cls in ROLE_CLASSES}) != len(ROLE_CLASSES):
24 data_str = " - ".join(f"{role_cls}: {role_cls.code()}" for role_cls in ROLE_CLASSES)
25 raise ImproperlyConfigured(f"At least 2 distinct roles have the same code: {data_str}")
28def get_role_class_from_code(code: str | None) -> type[Role] | None:
29 if code is None:
30 return None
32 for role_cls in ROLE_CLASSES:
33 if role_cls.code() == code:
34 return role_cls
35 return None
38@dataclass
39class RoleData:
40 """
41 Contains all the roles and related data accessible to an user.
42 The attribute name much match the `_CODE` of the role class (which should also
43 match the python file name for consistency).
44 """
46 author: Author
47 reviewer: Reviewer
48 editor: Editor
49 journal_manager: JournalManager
51 def get_roles(self) -> list[Role]:
52 """
53 Get the list of all roles for an user.
54 The order in the list is the priority order to get the user's default role.
55 """
56 return [self.journal_manager, self.editor, self.reviewer, self.author]
58 def __getitem__(self, key) -> Role | None:
59 return getattr(self, key, None)
61 def default_role(self) -> Role:
62 """
63 Get the user's default role.
64 A role is available to the user if `role.is_active() = True`
65 """
66 for role in self.get_roles():
67 if role.is_active:
68 return role
69 raise RoleException("There is no default role for this user. Should not happen.")
71 def get_active_roles(self) -> list[Role]:
72 """
73 Returns the list of active roles for the current user.
74 """
75 return [role for role in self.get_roles() if role.is_active]
77 @classmethod
78 def from_user(cls, user: "User") -> Self:
79 return cls(
80 author=Author(user),
81 reviewer=Reviewer(user),
82 editor=Editor(user),
83 journal_manager=JournalManager(user),
84 )
87class RoleHandler:
88 """
89 Handler for the role logic & data of an user.
90 """
92 user: "User"
93 current_role: Role
94 role_data: RoleData
95 # Whether the `complete_init` method has been called.
96 init_complete = False
97 # The RoleHandler is usually instantiated on each request for the current user
98 # and is used to derive the user's current role.
99 # We keep a reference of the enclosing request here.
100 # The value can remain `None`, for ex. if the role handler is instantiated just to
101 # check some global rights for a given user.
102 request: HttpRequest | None = None
104 @property
105 def impersonate_data(self) -> ImpersonateData | None:
106 if self.request is None:
107 return None
108 return ImpersonateData.from_session(self.request.session)
110 def __init__(
111 self, user: "User", request: HttpRequest | None = None, partial_init: bool = False
112 ) -> None:
113 """
114 The initialization is split in 2 steps:
115 - `init_user_roles` Populates the given user role data.
116 - `complete_init` Process the given request to derive request
117 dependent attributes (ex. impersonate data)
118 and derive the user current role and related
119 variables
121 This is useful when we only want an user's role data without any impact on the
122 database (for ex: see `TokenBackend`).
123 Params:
124 - `user` The user for which the roles & corresponding rights
125 should be derived.
126 - `request` A reference to the enclosing HTTP request (Optional).
127 - `partial_init` Whether to perform the second step `complete_init`.
128 Default to `False`.
129 """
130 self.user = user
131 self.init_user_roles()
132 self.request = request
134 if not partial_init:
135 self.complete_init()
137 def complete_init(self):
138 """
139 Finalize the initialization of the role_handler. It consists mainly in deriving
140 the user's current role and setting some related variables.
141 """
142 if self.init_complete:
143 return
145 role = self.get_current_active_role()
147 self.set_current_role(role)
149 # Set the impersonation boolean.
150 # Current impersonator cannot further impersonate another user (we prevent
151 # nested impersonated users, would be too messy).
152 if self.request:
153 self.request.session["can_impersonate"] = (
154 self.impersonate_data is None and self.check_global_rights("can_impersonate")
155 )
156 self.init_complete = True
158 def get_current_active_role(self) -> Role:
159 """
160 Derive the user's current active role.
161 It depends on:
162 - The current role stored in DB
163 - The impersonate data if the user is currently being impersonated.
164 - The user's active roles.
165 """
166 current_role_code = self.user.current_role
168 # In impersonate context, the current_role can be set from the impersonate data.
169 impersonate_data = self.impersonate_data
170 if impersonate_data and impersonate_data.target_role:
171 current_role_code = impersonate_data.target_role
172 elif current_role_code is None:
173 current_role_code = self.role_data.default_role().code()
175 current_role_class = get_role_class_from_code(current_role_code)
176 if current_role_class is None:
177 current_role_class = self.role_data.default_role().__class__
179 # Check that the selected role is active
180 role = self.role_data[current_role_class.code()]
181 if role is None or not role.is_active:
182 role = self.role_data.default_role()
184 return role
186 def get_active_roles(self):
187 """
188 Return all the roles that are active (ie: that a user can select)
189 """
190 return self.role_data.get_active_roles()
192 def set_current_role(self, role: Role):
193 """
194 Sets the user current role.
196 IMPORTANT: In case of a current user impersonator, don't update the current
197 role in the database as it would affect the actual impersonated user.
198 Stores the new role in session, to be used when negotiating
199 the user current role in `get_current_active_role`.
200 """
201 self.user.current_role = role.code()
203 impersonate_data = self.impersonate_data
204 if impersonate_data is None:
205 self.user.save()
206 else:
207 impersonate_data.target_role = role.code()
208 impersonate_data.serialize(self.request.session)
210 self.current_role = role
211 if self.request:
212 self.request.session["user_role"] = role.summary().serialize()
213 self.request.session["available_roles"] = [
214 role.summary().serialize() for role in self.role_data.get_active_roles()
215 ]
217 def init_user_roles(self) -> None:
218 """
219 Initialize the role data of the given user.
220 """
221 self.role_data = RoleData.from_user(self.user)
223 def switch_role(self, code: str) -> Role:
224 """
225 Switch the user's current role to the provided code.
226 Return the selected user role.
227 """
228 role = self.role_data[code]
229 if role is None:
230 raise RoleException(f"The role code '{code}' does not exist.")
231 elif not role.is_active:
232 raise RoleException(
233 f"The role '{role.name()}' is not available to user '{self.user}'."
234 )
236 self.set_current_role(role)
238 return role
240 def execute_rights_function(self, role: Role, function_name: str, *args, **kwargs) -> Any:
241 """
242 Execute the given function with the provided arguments for the current role.
243 """
244 if not hasattr(role, function_name):
245 raise RoleException(
246 f"The right function `{function_name}` is not implemented "
247 f"for the role '{role.__class__}'"
248 )
249 function = getattr(role, function_name)
250 if not callable(function):
251 raise RoleException(
252 f"The right function `{function_name}` is not callable "
253 f"for the role '{role.__class__}'"
254 )
256 return function(*args, **kwargs)
258 def check_rights(self, function_name: str, *args, **kwargs) -> bool:
259 """
260 Execute the given function with the provided arguments for the current role.
261 This should only be used for right functions returning boolean values.
262 """
263 return self.execute_rights_function(self.current_role, function_name, *args, **kwargs)
265 def check_global_rights(self, function_name: str, *args, **kwargs) -> bool:
266 """
267 Execute the given function with the provided arguments for every active role
268 of the current user. Returns True if at least one role has the given right,
269 else False.
270 """
271 for role in self.role_data.get_active_roles():
272 result = self.execute_rights_function(role, function_name, *args, **kwargs)
273 if result:
274 return True
275 return False
277 def get_from_rights(self, function_name: str, *args, **kwargs) -> Any:
278 """
279 Execute the given function with the provided arguments for the current role.
280 This should only be used for rights functions with a return type other than
281 `bool`.
282 """
283 return self.execute_rights_function(self.current_role, function_name, *args, **kwargs)
285 def get_attribute(self, attribute_name: str) -> Any:
286 if not hasattr(self.current_role, attribute_name):
287 raise RoleException(
288 f"The attribute `{attribute_name}` is not implemented "
289 f"for the role '{self.current_role.code()}'"
290 )
292 return getattr(self.current_role, attribute_name)
294 def token_authentication_allowed(self) -> bool:
295 """
296 Token authentication is forbidden for user with active Editor+ roles.
297 """
298 roles_token_forbidden = [Editor.code(), JournalManager.code()]
299 return not any(
300 role.code() in roles_token_forbidden for role in self.role_data.get_active_roles()
301 )
304VisitorRT = TypeVar("VisitorRT")
307class RoleVisitor(ABC, Generic[VisitorRT]):
308 role: Role
310 def __init__(self, role: Role, *args, **kwargs):
311 self.role = role
312 super().__init__(*args, **kwargs)
314 def visit(self, role, *args, **kwargs) -> VisitorRT:
315 function_name = f"visit_{role.code()}"
316 ftor = getattr(self, function_name, None)
317 if not callable(ftor):
318 raise LookupError(f"RoleVisitor does not implements {function_name}")
319 return ftor(*args, **kwargs) # type:ignore
321 @abstractmethod
322 def visit_author(self, *args, **kwargs) -> VisitorRT: ...
324 @abstractmethod
325 def visit_editor(self, *args, **kwargs) -> VisitorRT: ...
327 @abstractmethod
328 def visit_journal_manager(self, *args, **kwargs) -> VisitorRT: ...
330 @abstractmethod
331 def visit_reviewer(self, *args, **kwargs) -> VisitorRT: ...