Coverage for src/mesh/models/journal_models.py: 89%

85 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-04-28 07:45 +0000

1from __future__ import annotations 

2 

3from itertools import chain 

4from typing import TypeVar 

5 

6from django.db import models 

7from django.db.models import QuerySet 

8from django.utils.translation import gettext_lazy as _ 

9 

10from .base_models import BaseChangeTrackingModel 

11from .submission_models import Submission 

12 

13_T = TypeVar("_T", bound=models.Model) 

14 

15 

class JournalSectionManager(models.Manager):
    """
    Manager for `JournalSection` that memoizes the section tree.

    The list of all sections and the two mappings derived from it (parents and
    children) are computed lazily on first access and kept on the manager until
    `clean_cache` is called.
    """

    # Memoized data; `None` means "not computed yet". The cache must be reset
    # (see `clean_cache`) whenever a JournalSection is created, updated or deleted.
    _all_journal_sections: QuerySet[JournalSection] | None = None
    _all_journal_sections_children: dict[int | None, list[JournalSection]] | None = None
    _all_journal_sections_parents: dict[int, JournalSection | None] | None = None

    def get_queryset(self) -> QuerySet:
        """
        Base queryset, always joined with the author fields and the parent section.
        """
        base_queryset = super().get_queryset()
        return base_queryset.select_related("created_by", "last_modified_by", "parent")

    def all_journal_sections(self) -> QuerySet[JournalSection]:
        """
        Return the (memoized) queryset of every registered `JournalSection`.
        """
        cached = self._all_journal_sections
        if cached is None:
            cached = self.get_queryset().all()
            self._all_journal_sections = cached
        return cached

    def all_journal_sections_parents(self) -> dict[int, JournalSection | None]:
        """
        Return the (memoized) mapping `{journal_section.pk: parent}` covering every
        registered `JournalSection`. Top-level sections map to `None`.
        """
        cached = self._all_journal_sections_parents
        if cached is None:
            cached = {}
            for section in self.all_journal_sections():
                cached[section.pk] = section.parent
            self._all_journal_sections_parents = cached
        return cached

    def all_journal_sections_children(self) -> dict[int | None, list[JournalSection]]:
        """
        Return the (memoized) mapping `{journal_section.pk: list[children]}` covering
        every registered `JournalSection`.

        The extra `None` key lists the sections that have no parent
        (top-level journal_sections).
        """
        cached = self._all_journal_sections_children
        if cached is not None:
            return cached

        sections = self.all_journal_sections()

        # First pass: one (possibly empty) bucket per section, then the
        # top-level bucket, so that every key exists before filling.
        children_by_pk: dict[int | None, list[JournalSection]] = {}
        for section in sections:
            children_by_pk[section.pk] = []
        children_by_pk[None] = []

        # Second pass: file each section under its parent's key.
        for section in sections:
            parent = section.parent
            bucket_key = parent.pk if parent is not None else None
            children_by_pk[bucket_key].append(section)

        self._all_journal_sections_children = children_by_pk
        return children_by_pk

    def get_children_recursive(
        self, journal_section: JournalSection | None
    ) -> list[JournalSection]:
        """
        Return every descendant of the given journal_section as a flat list.

        The list is built depth-first: each direct child is immediately followed
        by its own descendants. Passing `None` starts from the top-level sections.
        """
        lookup_key = journal_section.pk if journal_section is not None else None
        direct_children = self.all_journal_sections_children()[lookup_key]

        descendants: list[JournalSection] = []
        for child in direct_children:
            descendants.append(child)
            descendants.extend(self.get_children_recursive(child))
        return descendants

    def get_parents_recursive(
        self, journal_section: JournalSection | None
    ) -> list[JournalSection]:
        """
        Return every ancestor of the given journal_section as a flat list,
        nearest parent first. Returns an empty list for `None`.
        """
        ancestors: list[JournalSection] = []
        if journal_section is None:
            return ancestors

        parent_of = self.all_journal_sections_parents()
        ancestor = parent_of.get(journal_section.pk)
        while ancestor is not None:
            ancestors.append(ancestor)
            ancestor = parent_of.get(ancestor.pk)
        return ancestors

    def clean_cache(self):
        """
        Drop all memoized data so it gets recomputed on next access.
        """
        self._all_journal_sections_parents = None
        self._all_journal_sections_children = None
        self._all_journal_sections = None

115 

116 

class JournalSection(BaseChangeTrackingModel):
    """
    Represents a journal section. Sections can be nested infinitely.

    Sections are mainly used to give editor rights over a whole section.
    """

    name = models.CharField(verbose_name=_("Name"), max_length=128, unique=True)
    parent = models.ForeignKey(
        "self", on_delete=models.SET_NULL, null=True, blank=True, related_name="children"
    )

    objects: JournalSectionManager = JournalSectionManager()

    def __str__(self) -> str:
        return self.name

    def save(self, *args, **kwargs) -> None:
        """
        Save the section after checking that the selected parent is valid.

        Raises:
            ValueError: if the parent is the section itself or one of its
                descendants (this would create a cycle in the section tree).
        """
        if self.parent and (self.parent == self or self.parent in self.all_children()):
            raise ValueError("The selected parent is invalid (self or child).")
        super().save(*args, **kwargs)
        # The tree changed: the manager-level cache is now stale.
        self.__class__.objects.clean_cache()

    def delete(self, *args, **kwargs):
        """
        Delete the section and additionally move all submissions and sub-sections
        to the parent section.

        Returns whatever the parent class `delete` returns.
        """
        # Local import: keeps this block self-contained with respect to the
        # module-level imports.
        from django.db import transaction

        parent = self.parent

        # Re-parent BEFORE deleting. Doing it afterwards relies on the foreign keys
        # pointing to this section merely being nulled by the deletion; a cascading
        # or protecting `Submission.journal_section` (its `on_delete` is not visible
        # from this module - TODO confirm) would lose the submissions or block the
        # deletion. The transaction makes the move and the deletion all-or-nothing.
        with transaction.atomic():
            JournalSection.objects.filter(parent=self).update(parent=parent)
            Submission.objects.filter(journal_section=self).update(journal_section=parent)
            res = super().delete(*args, **kwargs)

        # The tree changed: the manager-level cache is now stale.
        self.__class__.objects.clean_cache()
        return res

    def top_level_journal_section(self) -> JournalSection:
        """
        Return the top level parent journal_section (journal).

        Returns the section itself when it has no parent.
        """
        # The journal_section is a journal
        if self.parent is None:
            return self

        # Browse up the journal_section arborescence, one level at a time, until
        # reaching a section without a parent.
        parent_journal_sections = self.__class__.objects.all_journal_sections_parents()

        current: JournalSection = self.parent
        while True:
            # Look up the parent of `current` itself (the mapping is keyed by the
            # child's pk). Fall back to the instance relation when the section is
            # missing from the cached mapping.
            if current.pk in parent_journal_sections:
                ancestor = parent_journal_sections[current.pk]
            else:
                ancestor = current.parent
            if ancestor is None:
                return current
            current = ancestor

    def all_children(self) -> list[JournalSection]:
        """
        Get all the `JournalSection` children (direct and indirect).

        A section that has not been saved yet has no children.
        """
        if self._state.adding:
            return []

        return self.__class__.objects.get_children_recursive(self)

191 

192 return self.__class__.objects.get_children_recursive(self)