Coverage for src / mesh / models / roles / role_handler.py: 81%

142 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-05-04 12:41 +0000

1from abc import ABC, abstractmethod 

2from dataclasses import dataclass 

3from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar 

4 

5from django.core.exceptions import ImproperlyConfigured 

6from django.http import HttpRequest 

7 

8from mesh.models.exceptions import RoleException 

9from mesh.models.roles.author import Author 

10from mesh.models.roles.base_role import Role 

11from mesh.models.roles.editor import Editor 

12from mesh.models.roles.journal_manager import JournalManager 

13from mesh.models.roles.reviewer import Reviewer 

14from mesh.models.user.user_interfaces import ImpersonateData 

15 

16if TYPE_CHECKING: 

17 from mesh.models.user_models import User 

# Name under which a RoleHandler is cached for the lifetime of a request; the
# handler may already have been derived at middleware level.
ROLE_HANDLER_CACHE = "role_handler_cache"
ROLE_CLASSES: list[type[Role]] = [Author, Reviewer, Editor, JournalManager]

# Import-time sanity check: a set drops duplicates, so it ends up smaller than
# the list as soon as two role classes share the same code.
if len(ROLE_CLASSES) > len({role_cls.code() for role_cls in ROLE_CLASSES}):
    data_str = " - ".join(
        [f"{role_cls}: {role_cls.code()}" for role_cls in ROLE_CLASSES]
    )
    raise ImproperlyConfigured(f"At least 2 distinct roles have the same code: {data_str}")

26 

27 

def get_role_class_from_code(code: str | None) -> type[Role] | None:
    """
    Look up the role class registered in `ROLE_CLASSES` for the given code.

    Returns the first class whose `code()` equals `code`, or `None` when `code`
    is `None` or when no registered class matches.
    """
    if code is None:
        return None

    # Lazy search: stops at the first registered class with a matching code.
    matching_classes = (candidate for candidate in ROLE_CLASSES if candidate.code() == code)
    return next(matching_classes, None)

36 

37 

38@dataclass 

39class RoleData: 

40 """ 

41 Contains all the roles and related data accessible to an user. 

42 The attribute name much match the `_CODE` of the role class (which should also 

43 match the python file name for consistency). 

44 """ 

45 

46 author: Author 

47 reviewer: Reviewer 

48 editor: Editor 

49 journal_manager: JournalManager 

50 

51 def get_roles(self) -> list[Role]: 

52 """ 

53 Get the list of all roles for an user. 

54 The order in the list is the priority order to get the user's default role. 

55 """ 

56 return [self.journal_manager, self.editor, self.reviewer, self.author] 

57 

58 def __getitem__(self, key) -> Role | None: 

59 return getattr(self, key, None) 

60 

61 def default_role(self) -> Role: 

62 """ 

63 Get the user's default role. 

64 A role is available to the user if `role.is_active() = True` 

65 """ 

66 for role in self.get_roles(): 

67 if role.is_active: 

68 return role 

69 raise RoleException("There is no default role for this user. Should not happen.") 

70 

71 def get_active_roles(self) -> list[Role]: 

72 """ 

73 Returns the list of active roles for the current user. 

74 """ 

75 return [role for role in self.get_roles() if role.is_active] 

76 

77 @classmethod 

78 def from_user(cls, user: "User") -> Self: 

79 return cls( 

80 author=Author(user), 

81 reviewer=Reviewer(user), 

82 editor=Editor(user), 

83 journal_manager=JournalManager(user), 

84 ) 

85 

86 

class RoleHandler:
    """
    Entry point for the role logic & role data of a single user.
    """

    user: "User"
    current_role: Role
    role_data: RoleData
    # Flipped to `True` at the end of `complete_init`.
    init_complete = False
    # Reference to the enclosing HTTP request, when there is one.
    # The handler is usually built on each request for the current user, to derive
    # the user's current role. The value can remain `None`, for ex. if the handler
    # is instantiated just to check some global rights for a given user.
    request: HttpRequest | None = None

    @property
    def impersonate_data(self) -> ImpersonateData | None:
        """
        Impersonate data read from the session of the enclosing request.
        Always `None` when the handler has no request.
        """
        request = self.request
        return None if request is None else ImpersonateData.from_session(request.session)

    def __init__(
        self, user: "User", request: HttpRequest | None = None, partial_init: bool = False
    ) -> None:
        """
        The initialization is split in 2 steps:
            - `init_user_roles`   Populates the role data of the given user.
            - `complete_init`     Processes the given request to derive the request
                                  dependent attributes (ex. impersonate data), the
                                  user's current role and the related variables.

        Stopping after the first step is useful when we only want a user's role data
        without any impact on the database (for ex: see `TokenBackend`).

        Params:
            - `user`            The user for which the roles & corresponding rights
                                should be derived.
            - `request`         A reference to the enclosing HTTP request (Optional).
            - `partial_init`    When `True`, the second step `complete_init` is
                                skipped. Defaults to `False` (both steps are run).
        """
        self.user = user
        self.init_user_roles()
        self.request = request

        if partial_init:
            return
        self.complete_init()

    def complete_init(self):
        """
        Second initialization step: derive and set the user's current role, then
        store in session whether the user is allowed to impersonate.
        Does nothing if it has already been run.
        """
        if self.init_complete:
            return

        self.set_current_role(self.get_current_active_role())

        request = self.request
        if request:
            # A current impersonator cannot further impersonate another user (no
            # nested impersonation, it would be too messy).
            not_impersonating = self.impersonate_data is None
            request.session["can_impersonate"] = (
                not_impersonating and self.check_global_rights("can_impersonate")
            )

        self.init_complete = True

    def get_current_active_role(self) -> Role:
        """
        Derive the user's current active role from:
            - The current role stored on the user.
            - The impersonate data, if the user is currently being impersonated.
            - The user's active roles.

        Falls back to the default role whenever the resulting code is unknown or
        points to a role that is not active.
        """
        role_code = self.user.current_role

        # In impersonate context, the target role of the impersonate data wins.
        impersonation = self.impersonate_data
        if impersonation and impersonation.target_role:
            role_code = impersonation.target_role
        elif role_code is None:
            role_code = self.role_data.default_role().code()

        role_class = get_role_class_from_code(role_code)
        if role_class is None:
            role_class = self.role_data.default_role().__class__

        # Only keep the selected role if it is active for this user.
        selected = self.role_data[role_class.code()]
        if selected is not None and selected.is_active:
            return selected
        return self.role_data.default_role()

    def get_active_roles(self):
        """
        Return all the roles that are active (ie: that a user can select).
        """
        return self.role_data.get_active_roles()

    def set_current_role(self, role: Role):
        """
        Make `role` the user's current role and refresh the session role data.

        IMPORTANT: When the user is being impersonated, the current role is not
        saved, as it would affect the actual impersonated user. The new role is
        stored in the impersonate data (serialized in session) instead, to be used
        when negotiating the current role in `get_current_active_role`.
        """
        self.user.current_role = role.code()

        impersonation = self.impersonate_data
        if impersonation is not None:
            impersonation.target_role = role.code()
            impersonation.serialize(self.request.session)
        else:
            # NOTE(review): the user is saved on every call, hence on every
            # non-partial init - confirm that a full `save()` is intended here.
            self.user.save()

        self.current_role = role

        request = self.request
        if request:
            request.session["user_role"] = role.summary().serialize()
            request.session["available_roles"] = [
                active_role.summary().serialize()
                for active_role in self.role_data.get_active_roles()
            ]

    def init_user_roles(self) -> None:
        """
        First initialization step: build the role data of the handler's user.
        """
        self.role_data = RoleData.from_user(self.user)

    def switch_role(self, code: str) -> Role:
        """
        Switch the user's current role to the role matching the provided code.
        Return the selected role.

        Raises:
            - `RoleException` when no role matches the code, or when the matching
              role is not active for the user.
        """
        target = self.role_data[code]
        if target is None:
            raise RoleException(f"The role code '{code}' does not exist.")
        if not target.is_active:
            raise RoleException(
                f"The role '{target.name()}' is not available to user '{self.user}'."
            )

        self.set_current_role(target)
        return target

    def execute_rights_function(self, role: Role, function_name: str, *args, **kwargs) -> Any:
        """
        Call the function `function_name` of the given role with the provided
        arguments and return its result.

        Raises:
            - `RoleException` when the role has no such attribute, or when the
              attribute is not callable.
        """
        if not hasattr(role, function_name):
            raise RoleException(
                f"The right function `{function_name}` is not implemented "
                f"for the role '{role.__class__}'"
            )

        rights_function = getattr(role, function_name)
        if callable(rights_function):
            return rights_function(*args, **kwargs)

        raise RoleException(
            f"The right function `{function_name}` is not callable "
            f"for the role '{role.__class__}'"
        )

    def check_rights(self, function_name: str, *args, **kwargs) -> bool:
        """
        Execute the given function with the provided arguments for the current role.
        This should only be used for rights functions returning boolean values.
        """
        return self.execute_rights_function(self.current_role, function_name, *args, **kwargs)

    def check_global_rights(self, function_name: str, *args, **kwargs) -> bool:
        """
        Execute the given function with the provided arguments for the active roles
        of the user, stopping at the first role that grants the right.
        Returns True if at least one active role has the given right, else False.
        """
        return any(
            self.execute_rights_function(active_role, function_name, *args, **kwargs)
            for active_role in self.role_data.get_active_roles()
        )

    def get_from_rights(self, function_name: str, *args, **kwargs) -> Any:
        """
        Execute the given function with the provided arguments for the current role.
        This should only be used for rights functions with a return type other than
        `bool`.
        """
        return self.execute_rights_function(self.current_role, function_name, *args, **kwargs)

    def get_attribute(self, attribute_name: str) -> Any:
        """
        Return the attribute `attribute_name` of the current role.

        Raises:
            - `RoleException` when the current role has no such attribute.
        """
        current = self.current_role
        if hasattr(current, attribute_name):
            return getattr(current, attribute_name)

        raise RoleException(
            f"The attribute `{attribute_name}` is not implemented "
            f"for the role '{self.current_role.code()}'"
        )

    def token_authentication_allowed(self) -> bool:
        """
        Token authentication is forbidden for users with an active Editor or
        JournalManager role.
        """
        roles_token_forbidden = [Editor.code(), JournalManager.code()]
        return all(
            active_role.code() not in roles_token_forbidden
            for active_role in self.role_data.get_active_roles()
        )

302 

303 

# Return type shared by all the `visit_*` methods of a given visitor.
VisitorRT = TypeVar("VisitorRT")


class RoleVisitor(ABC, Generic[VisitorRT]):
    """
    Base class for role-dependent processing.

    `visit` dispatches to the `visit_<code>` method matching the code of the role
    it is given. Concrete visitors must implement one `visit_*` method for each
    of the abstract methods declared below.
    """

    role: Role

    def __init__(self, role: Role, *args, **kwargs):
        """
        Store the given role, then forward the remaining arguments to the next
        `__init__` in the MRO.
        """
        self.role = role
        super().__init__(*args, **kwargs)

    def visit(self, role, *args, **kwargs) -> VisitorRT:
        """
        Call the `visit_<role.code()>` method with the provided arguments and
        return its result. The dispatch uses the `role` argument, not `self.role`.

        Raises:
            - `LookupError` when the visitor has no callable with that name.
        """
        method_name = f"visit_{role.code()}"
        handler = getattr(self, method_name, None)
        if callable(handler):
            return handler(*args, **kwargs)  # type:ignore
        raise LookupError(f"RoleVisitor does not implements {method_name}")

    @abstractmethod
    def visit_author(self, *args, **kwargs) -> VisitorRT: ...

    @abstractmethod
    def visit_editor(self, *args, **kwargs) -> VisitorRT: ...

    @abstractmethod
    def visit_journal_manager(self, *args, **kwargs) -> VisitorRT: ...

    @abstractmethod
    def visit_reviewer(self, *args, **kwargs) -> VisitorRT: ...