Coverage for src / mesh / views / middlewares / role.py: 81%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-05-04 12:41 +0000

1from typing import TYPE_CHECKING 

2 

3from django.contrib.auth.models import AnonymousUser 

4from opentelemetry import trace 

5 

6from mesh.models.orm.user_models import User 

7from mesh.models.roles.author import Author 

8from mesh.models.roles.base_role import Role 

9from mesh.models.roles.editor import Editor 

10from mesh.models.roles.journal_manager import JournalManager 

11from mesh.models.roles.reviewer import Reviewer 

12from mesh.models.roles.role_handler import get_role_class_from_code 

13from mesh.models.user.user_interfaces import ImpersonateData 

14 

15if TYPE_CHECKING: 

16 from collections.abc import Callable 

17 

18 from django.http import HttpRequest, HttpResponse 

19 

20 class MeshHttpRequest(HttpRequest): 

21 current_role: Role 

22 

23 

24tracer = trace.get_tracer(__name__) 

25# The order in the list is the priority order to get the user's default role. 

26ROLES_ORDER: list[type[Role]] = [JournalManager, Editor, Reviewer, Author] 

27 

28 

29def _get_default_role(roleDict: dict[type[Role], Role]): 

30 for RoleClass in ROLES_ORDER: 

31 role = roleDict[RoleClass] 

32 if role.is_active: 

33 return role 

34 # unreachable 

35 assert False 

36 

37 

38def get_user_role(user: User, role_dict: dict[type[Role], Role]) -> Role | None: 

39 # RoleDict must be a dict containing already instanciated role objects 

40 if isinstance(user, AnonymousUser): 

41 return None 

42 current_role_code = getattr(user, "current_role", None) 

43 if current_role_code: 

44 # User.current_role is set 

45 # Try to instantiate said role 

46 RoleClass = get_role_class_from_code(current_role_code) 

47 if not RoleClass: 

48 user.current_role = None 

49 user.save(update_fields=["current_role"]) 

50 raise ValueError(f"role {current_role_code} does not exists") 

51 role = role_dict[RoleClass] 

52 else: 

53 # User.current_role is unset 

54 # Assign a role to the user 

55 role = _get_default_role(role_dict) 

56 user.current_role = role.code() 

57 current_role_code = user.current_role 

58 

59 if not role.is_active: 

60 user.current_role = None 

61 return get_user_role(user, role_dict) 

62 user.save(update_fields=["current_role"]) 

63 return role 

64 

65 

66class CurrentRoleMiddleware: 

67 """ 

68 Creates the request.current_role attribute 

69 """ 

70 

71 def __init__(self, get_response: "Callable[[HttpRequest], HttpResponse]"): 

72 self.get_response = get_response 

73 # One-time configuration and initialization. 

74 

75 def __call__(self, request: "HttpRequest"): 

76 # Start a new span here : computing available roles can become expensive, so we have to monitor this 

77 if not isinstance(request.user, AnonymousUser): 

78 with tracer.start_as_current_span("CurrentRoleMiddleware.call"): 

79 user: User = request.user 

80 role_dict = {RoleClass: RoleClass(user) for RoleClass in ROLES_ORDER} 

81 

82 impersonate_data = ImpersonateData.from_session(request.session) 

83 if impersonate_data and impersonate_data.target_role: 

84 # Force impersonated user role 

85 RoleClass = get_role_class_from_code(impersonate_data.target_role) 

86 if RoleClass and role_dict[RoleClass].is_active: 

87 request.current_role = role_dict[RoleClass] 

88 else: 

89 request.current_role = get_user_role(user=user, role_dict=role_dict) 

90 else: 

91 request.current_role = get_user_role(user=user, role_dict=role_dict) 

92 

93 request.available_roles = [ 

94 role.summary().serialize() for role in role_dict.values() if role.is_active 

95 ] 

96 

97 # can_impersonate global for button display 

98 # TODO : move to proxy 

99 request.session["can_impersonate"] = impersonate_data is None and any( 

100 r.can_impersonate() for r in role_dict.values() if r.is_active 

101 ) 

102 

103 response = self.get_response(request) 

104 return response