Coverage for src/mesh/model/roles/role_handler.py: 92%

149 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-04-28 07:45 +0000

1from dataclasses import dataclass 

2from typing import Any, Self 

3 

4from django.core.exceptions import ImproperlyConfigured 

5from django.http import HttpRequest 

6 

7from mesh.model.exceptions import RoleException 

8from mesh.models.user_models import User 

9 

10from ..user.user_interfaces import ImpersonateData 

11from .author import Author 

12from .base_role import Role, RoleRights 

13from .editor import Editor 

14from .journal_manager import JournalManager 

15from .reviewer import Reviewer 

16 

17# Cache of the RoleHandler at request level. The RoleHandler might be derived 

18# at middleware level. 

19ROLE_HANDLER_CACHE = "role_handler_cache" 

20ROLE_CLASSES: list[type[Role]] = [Author, Reviewer, Editor, JournalManager] 

21 

22if len({role_cls.code() for role_cls in ROLE_CLASSES}) != len(ROLE_CLASSES): 22 ↛ 23line 22 didn't jump to line 23 because the condition on line 22 was never true

23 data_str = " - ".join(f"{role_cls}: {role_cls.code()}" for role_cls in ROLE_CLASSES) 

24 raise ImproperlyConfigured(f"At least 2 distinct roles have the same code: {data_str}") 

25 

26 

27def get_role_class_from_code(code: str | None) -> type[Role] | None: 

28 if code is None: 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true

29 return None 

30 

31 for role_cls in ROLE_CLASSES: 

32 if role_cls.code() == code: 

33 return role_cls 

34 return None 

35 

36 

37@dataclass 

38class RoleData: 

39 """ 

40 Contains all the roles and related data accessible to an user. 

41 The attribute name much match the `_CODE` of the role class (which should also 

42 match the python file name for consistency). 

43 """ 

44 

45 author: Author 

46 reviewer: Reviewer 

47 editor: Editor 

48 journal_manager: JournalManager 

49 

50 def get_roles(self) -> list[Role]: 

51 """ 

52 Get the list of all roles for an user. 

53 The order in the list is the priority order to get the user's default role. 

54 """ 

55 return [self.journal_manager, self.editor, self.reviewer, self.author] 

56 

57 def __getitem__(self, key) -> Role | None: 

58 return getattr(self, key, None) 

59 

60 def default_role(self) -> Role: 

61 """ 

62 Get the user's default role. 

63 A role is available to the user if `role.active = True` 

64 """ 

65 for role in self.get_roles(): 65 ↛ 68line 65 didn't jump to line 68 because the loop on line 65 didn't complete

66 if role.active: 

67 return role 

68 raise RoleException("There is no default role for this user. Should not happen.") 

69 

70 def get_active_roles(self) -> list[Role]: 

71 """ 

72 Returns the list of active roles for the current user. 

73 """ 

74 return [role for role in self.get_roles() if role.active] 

75 

76 @classmethod 

77 def from_user(cls, user: User) -> Self: 

78 return cls( 

79 author=Author(user), 

80 reviewer=Reviewer(user), 

81 editor=Editor(user), 

82 journal_manager=JournalManager(user), 

83 ) 

84 

85 

86class RoleHandler: 

87 """ 

88 Handler for the role logic & data of an user. 

89 """ 

90 

91 user: User 

92 current_role: Role 

93 # Just a shortcut for `current_role.rights` 

94 rights: RoleRights 

95 role_data: RoleData 

96 # Whether the `complete_init` method has been called. 

97 init_complete = False 

98 # The RoleHandler is usually instantiated on each request for the current user 

99 # and is used to derive the user's current role. 

100 # We keep a reference of the enclosing request here. 

101 # The value can remain `None`, for ex. if the role handler is instantiated just to 

102 # check some global rights for a given user. 

103 request: HttpRequest | None = None 

104 

105 @property 

106 def impersonate_data(self) -> ImpersonateData | None: 

107 if self.request is None: 

108 return None 

109 return ImpersonateData.from_session(self.request.session) 

110 

111 def __init__( 

112 self, user: User, request: HttpRequest | None = None, partial_init: bool = False 

113 ) -> None: 

114 """ 

115 The initialization is split in 2 steps: 

116 - `init_user_roles` Populates the given user role data. 

117 - `complete_init` Process the given request to derive request 

118 dependent attributes (ex. impersonate data) 

119 and derive the user current role and related 

120 variables 

121 

122 This is useful when we only want an user's role data without any impact on the 

123 database (for ex: see `TokenBackend`). 

124 Params: 

125 - `user` The user for which the roles & corresponding rights 

126 should be derived. 

127 - `request` A reference to the enclosing HTTP request (Optional). 

128 - `partial_init` Whether to perform the second step `complete_init`. 

129 Default to `False`. 

130 """ 

131 self.user = user 

132 self.init_user_roles() 

133 self.request = request 

134 

135 if not partial_init: 

136 self.complete_init() 

137 

138 def complete_init(self): 

139 """ 

140 Finalize the initialization of the role_handler. It consists mainly in deriving 

141 the user's current role and setting some related variables. 

142 """ 

143 if self.init_complete: 

144 return 

145 

146 role = self.get_current_active_role() 

147 

148 self.set_current_role(role) 

149 

150 # Set the impersonation boolean. 

151 # Current impersonator cannot further impersonate another user (we prevent 

152 # nested impersonated users, would be too messy). 

153 if self.request: 

154 self.request.session["can_impersonate"] = ( 

155 self.impersonate_data is None and self.check_global_rights("can_impersonate") 

156 ) 

157 self.init_complete = True 

158 

159 def get_current_active_role(self) -> Role: 

160 """ 

161 Derive the user's current active role. 

162 It depends on: 

163 - The current role stored in DB 

164 - The impersonate data if the user is currently being impersonated. 

165 - The user's active roles. 

166 """ 

167 current_role_code = self.user.current_role 

168 

169 # In impersonate context, the current_role can be set from the impersonate data. 

170 impersonate_data = self.impersonate_data 

171 if impersonate_data and impersonate_data.target_role: 

172 current_role_code = impersonate_data.target_role 

173 elif current_role_code is None: 

174 current_role_code = self.role_data.default_role().code() 

175 

176 current_role_class = get_role_class_from_code(current_role_code) 

177 if current_role_class is None: 

178 current_role_class = self.role_data.default_role().__class__ 

179 

180 # Check that the selected role is active 

181 role = self.role_data[current_role_class.code()] 

182 if role is None or not role.active: 

183 role = self.role_data.default_role() 

184 

185 return role 

186 

187 def get_active_roles(self): 

188 """ 

189 Return all the roles that are active (ie: that a user can select) 

190 """ 

191 return self.role_data.get_active_roles() 

192 

193 def set_current_role(self, role: Role): 

194 """ 

195 Sets the user current role. 

196 

197 IMPORTANT: In case of a current user impersonator, don't update the current 

198 role in the database as it would affect the actual impersonated user. 

199 Stores the new role in session, to be used when negotiating 

200 the user current role in `get_current_active_role`. 

201 """ 

202 self.user.current_role = role.code() 

203 

204 impersonate_data = self.impersonate_data 

205 if impersonate_data is None: 

206 self.user.save() 

207 else: 

208 impersonate_data.target_role = role.code() 

209 impersonate_data.serialize(self.request.session) # type:ignore 

210 

211 self.current_role = role 

212 self.rights = role.rights 

213 if self.request: 

214 self.request.session["user_role"] = role.summary().serialize() 

215 self.request.session["available_roles"] = [ 

216 role.summary().serialize() for role in self.role_data.get_active_roles() 

217 ] 

218 

219 def init_user_roles(self) -> None: 

220 """ 

221 Initialize the role data of the given user. 

222 """ 

223 self.role_data = RoleData.from_user(self.user) 

224 

225 def switch_role(self, code: str) -> Role: 

226 """ 

227 Switch the user's current role to the provided code. 

228 Return the selected user role. 

229 """ 

230 role = self.role_data[code] 

231 if role is None: 

232 raise RoleException(f"The role code '{code}' does not exist.") 

233 elif not role.active: 

234 raise RoleException( 

235 f"The role '{role.name()}' is not available to user '{self.user}'." 

236 ) 

237 

238 self.set_current_role(role) 

239 

240 return role 

241 

242 def execute_rights_function( 

243 self, rights: RoleRights, function_name: str, *args, **kwargs 

244 ) -> Any: 

245 """ 

246 Execute the given function with the provided arguments for the current role. 

247 """ 

248 if not hasattr(rights, function_name): 

249 raise RoleException( 

250 f"The right function `{function_name}` is not implemented " 

251 f"for the role rights '{rights.__class__}'" 

252 ) 

253 function = getattr(rights, function_name) 

254 if not callable(function): 

255 raise RoleException( 

256 f"The right function `{function_name}` is not callable " 

257 f"for the role rights '{rights.__class__}'" 

258 ) 

259 

260 return function(*args, **kwargs) 

261 

262 def check_rights(self, function_name: str, *args, **kwargs) -> bool: 

263 """ 

264 Execute the given function with the provided arguments for the current role. 

265 This should only be used for right functions returning boolean values. 

266 """ 

267 return self.execute_rights_function(self.rights, function_name, *args, **kwargs) 

268 

269 def check_global_rights(self, function_name: str, *args, **kwargs) -> bool: 

270 """ 

271 Execute the given function with the provided arguments for every active role 

272 of the current user. Returns True if at least one role has the given right, 

273 else False. 

274 """ 

275 for role in self.role_data.get_active_roles(): 

276 result = self.execute_rights_function(role.rights, function_name, *args, **kwargs) 

277 if result: 

278 return True 

279 return False 

280 

281 def get_from_rights(self, function_name: str, *args, **kwargs) -> Any: 

282 """ 

283 Execute the given function with the provided arguments for the current role. 

284 This should only be used for rights functions with a return type other than 

285 `bool`. 

286 """ 

287 return self.execute_rights_function(self.rights, function_name, *args, **kwargs) 

288 

289 def get_attribute(self, attribute_name: str) -> Any: 

290 if not hasattr(self.rights, attribute_name): 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true

291 raise RoleException( 

292 f"The attribute `{attribute_name}` is not implemented " 

293 f"for the role '{self.current_role.code()}'" 

294 ) 

295 

296 return getattr(self.rights, attribute_name) 

297 

298 def token_authentication_allowed(self) -> bool: 

299 """ 

300 Token authentication is forbidden for user with active Editor+ roles. 

301 """ 

302 roles_token_forbidden = [Editor.code(), JournalManager.code()] 

303 return not any( 

304 role.code() in roles_token_forbidden for role in self.role_data.get_active_roles() 

305 ) 

306 

307 

308class RoleVisitor: 

309 def __init__(self, role_handler, *args, **kwargs): 

310 self.role_handler = role_handler 

311 super().__init__(*args, **kwargs) 

312 

313 def visit(self, role, submission, *args, **kwargs): 

314 function_name = f"visit_{role.code()}" 

315 ftor = getattr(self, function_name, None) 

316 if callable(ftor): 316 ↛ exitline 316 didn't return from function 'visit' because the condition on line 316 was always true

317 return ftor(submission, *args, **kwargs) 

318 

319 def visit_author(self, submission, *args, **kwargs): 

320 pass 

321 

322 def visit_editor(self, submission, *args, **kwargs): 

323 pass 

324 

325 def visit_journal_manager(self, submission, *args, **kwargs): 

326 pass 

327 

328 def visit_reviewer(self, submission, *args, **kwargs): 

329 pass