Coverage for src/mesh/models/journal_models.py: 89%
85 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-28 07:45 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-04-28 07:45 +0000
1from __future__ import annotations
3from itertools import chain
4from typing import TypeVar
6from django.db import models
7from django.db.models import QuerySet
8from django.utils.translation import gettext_lazy as _
10from .base_models import BaseChangeTrackingModel
11from .submission_models import Submission
13_T = TypeVar("_T", bound=models.Model)
16class JournalSectionManager(models.Manager):
17 """
18 Custom manager for JournalSection used to cache data at the manager level.
19 """
21 # Variables used to cache data at the class level. The cache must be cleared
22 # on every change on a JournalSection (creation, deletion, update).
23 _all_journal_sections: QuerySet[JournalSection] | None = None
24 _all_journal_sections_children: dict[int | None, list[JournalSection]] | None = None
25 _all_journal_sections_parents: dict[int, JournalSection | None] | None = None
27 def get_queryset(self) -> QuerySet:
28 return (
29 super()
30 .get_queryset()
31 .select_related(
32 "created_by",
33 "last_modified_by",
34 "parent",
35 )
36 )
38 def all_journal_sections(self) -> QuerySet[JournalSection]:
39 """
40 Returns all registered `JournalSection`.
41 """
42 if self._all_journal_sections is None:
43 self._all_journal_sections = self.get_queryset().all()
45 return self._all_journal_sections
47 def all_journal_sections_parents(self) -> dict[int, JournalSection | None]:
48 """
49 Returns the mapping: `{journal_section.pk: parent}` for all registered
50 `JournalSection`.
51 """
52 if self._all_journal_sections_parents is None:
53 self._all_journal_sections_parents = {
54 c.pk: c.parent for c in self.all_journal_sections()
55 }
57 return self._all_journal_sections_parents
59 def all_journal_sections_children(self) -> dict[int | None, list[JournalSection]]:
60 """
61 Return the mapping: `{journal_section.pk : list[children]}` for all registered
62 JournalSection.
64 There's an additional entry to the mapping, `None`, listing all the journal_sections
65 without a parent (top-level journal_sections).
66 """
67 if self._all_journal_sections_children is None:
68 journal_sections = self.all_journal_sections()
69 processed_journal_sections = {c.pk: [] for c in journal_sections}
70 processed_journal_sections[None] = []
72 for journal_section in journal_sections:
73 parent: JournalSection | None = journal_section.parent
74 key: int | None = None if parent is None else parent.pk
75 processed_journal_sections[key].append(journal_section)
77 self._all_journal_sections_children = processed_journal_sections
79 return self._all_journal_sections_children
81 def get_children_recursive(
82 self, journal_section: JournalSection | None
83 ) -> list[JournalSection]:
84 """
85 Return the flattened list of all children nodes of the given journal_section.
86 """
87 children_dict = self.all_journal_sections_children()
88 key = None if journal_section is None else journal_section.pk
89 children = children_dict[key]
90 return list(
91 # Flatten the input sequences into a single sequence.
92 chain.from_iterable(
93 ([c, *self.get_children_recursive(c)] for c in children),
94 )
95 )
97 def get_parents_recursive(
98 self, journal_section: JournalSection | None
99 ) -> list[JournalSection]:
100 """
101 Return the flattened list of all parent nodes of the given journal_section.
102 """
103 if journal_section is None:
104 return []
105 parents_dict = self.all_journal_sections_parents()
106 parent = parents_dict.get(journal_section.pk)
107 if parent is None:
108 return []
109 return [parent, *self.get_parents_recursive(parent)]
111 def clean_cache(self):
112 self._all_journal_sections = None
113 self._all_journal_sections_children = None
114 self._all_journal_sections_parents = None
117class JournalSection(BaseChangeTrackingModel):
118 """
119 Represents a journal section. Sections can be nested infinitely.
121 Sections are mainly used to give editor rights over a whole section.
122 """
124 name = models.CharField(verbose_name=_("Name"), max_length=128, unique=True)
125 parent = models.ForeignKey(
126 "self", on_delete=models.SET_NULL, null=True, blank=True, related_name="children"
127 )
129 objects: JournalSectionManager = JournalSectionManager()
131 def __str__(self) -> str:
132 return self.name
134 def save(self, *args, **kwargs) -> None:
135 """
136 Need to check that the selected parent journal_section is valid.
137 """
138 if self.parent and (self.parent == self or self.parent in self.all_children()):
139 raise ValueError("The selected parent is invalid (self or child).")
140 super().save(*args, **kwargs)
141 self.__class__.objects.clean_cache()
143 def delete(self, *args, **kwargs):
144 """
145 Delete the section and additionally move all submissions and sub-sections
146 to the parent section.
147 """
148 parent = self.parent
149 children = list(
150 self.children.all().values_list("pk", flat=True) # type:ignore
151 )
152 submissions_to_update = list(
153 Submission.objects.filter(journal_section=self).values_list("pk", flat=True)
154 )
156 res = super().delete(*args, **kwargs)
158 if children: 158 ↛ 160line 158 didn't jump to line 160 because the condition on line 158 was always true
159 JournalSection.objects.filter(pk__in=children).update(parent=parent)
160 if submissions_to_update:
161 Submission.objects.filter(pk__in=submissions_to_update).update(journal_section=parent)
163 self.__class__.objects.clean_cache()
164 return res
166 def top_level_journal_section(self) -> JournalSection:
167 """
168 Return the top level parent journal_section (journal).
169 """
170 # The journal_section is a journal
171 if self.parent is None:
172 return self
174 # Browse up the journal_section arborescence until finding the top level journal_section
175 parent_journal_sections = self.__class__.objects.all_journal_sections_parents()
177 top_level_journal_section: JournalSection = self.parent
178 while top_level_journal_section.parent is not None:
179 top_level_journal_section = parent_journal_sections[
180 top_level_journal_section.parent.pk
181 ] # type:ignore
183 return top_level_journal_section
185 def all_children(self) -> list[JournalSection]:
186 """
187 Get all the `JournalSection` children.
188 """
189 if self._state.adding:
190 return []
192 return self.__class__.objects.get_children_recursive(self)