Coverage for src / mesh / models / journal_models.py: 88%
90 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-20 10:03 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-20 10:03 +0000
1from __future__ import annotations
3from itertools import chain
4from typing import TypeVar
6from django.db import models
7from django.db.models import QuerySet
8from django.utils.translation import gettext_lazy as _
10from .base_models import BaseChangeTrackingModel
11from .submission_models import Submission
# Generic type variable restricted to Django model classes.
_T = TypeVar("_T", bound=models.Model)
class JournalSectionManager(models.Manager["JournalSection"]):
    """
    Manager for `JournalSection` that memoizes the whole section tree.

    The list of sections and the parent / children mappings derived from it
    are computed lazily on first access and reused until `clean_cache` is
    called.
    """

    # Memoized values, `None` meaning "not computed yet". They must be reset
    # with `clean_cache` after every change on a JournalSection (creation,
    # deletion, update).
    _all_journal_sections: QuerySet[JournalSection] | None = None
    _all_journal_sections_children: dict[int | None, list[JournalSection]] | None = None
    _all_journal_sections_parents: dict[int, JournalSection | None] | None = None

    def get_queryset(self):
        """
        Return the base queryset with the `created_by`, `last_modified_by`
        and `parent` relations fetched through `select_related`.
        """
        related_fields = ("created_by", "last_modified_by", "parent")
        return super().get_queryset().select_related(*related_fields)

    def all_journal_sections(self) -> QuerySet[JournalSection]:
        """
        Return the (memoized) queryset of every registered `JournalSection`.
        """
        cached = self._all_journal_sections
        if cached is None:
            cached = self.get_queryset().all()
            self._all_journal_sections = cached

        return cached

    def all_journal_sections_parents(self):
        """
        Return the (memoized) mapping `{journal_section.pk: parent}` covering
        every registered `JournalSection`. The parent is `None` for sections
        without a parent.
        """
        if self._all_journal_sections_parents is None:
            parent_by_pk = {}
            for section in self.all_journal_sections():
                parent_by_pk[section.pk] = section.parent
            self._all_journal_sections_parents = parent_by_pk

        return self._all_journal_sections_parents

    def all_journal_sections_children(self) -> dict[int | None, list[JournalSection]]:
        """
        Return the (memoized) mapping `{journal_section.pk: list[children]}`
        covering every registered `JournalSection`.

        The mapping holds one extra key, `None`, whose value lists the
        sections without a parent (top-level journal_sections).
        """
        if self._all_journal_sections_children is None:
            sections = self.all_journal_sections()

            # Every section gets an entry, even when it has no child.
            children_by_pk: dict[int | None, list[JournalSection]] = {}
            for section in sections:
                children_by_pk[section.pk] = []
            children_by_pk[None] = []

            # Attach each section to the entry of its parent.
            for section in sections:
                parent: JournalSection | None = section.parent
                parent_pk: int | None = parent.pk if parent is not None else None
                children_by_pk[parent_pk].append(section)

            self._all_journal_sections_children = children_by_pk

        return self._all_journal_sections_children

    def get_children_recursive(
        self, journal_section: JournalSection | None
    ) -> list[JournalSection]:
        """
        Return the flat list of every descendant of the given journal_section.

        Each child is immediately followed by its own descendants. Passing
        `None` starts from the top-level sections.
        """
        children_by_pk = self.all_journal_sections_children()
        parent_pk = journal_section.pk if journal_section is not None else None

        descendants: list[JournalSection] = []
        for child in children_by_pk[parent_pk]:
            # The child first, then its whole subtree.
            descendants.extend(chain([child], self.get_children_recursive(child)))
        return descendants

    def get_parents_recursive(
        self, journal_section: JournalSection | None
    ) -> list[JournalSection]:
        """
        Return the flat list of every ancestor of the given journal_section,
        starting with its direct parent.
        """
        if journal_section is None:
            return []

        parent = self.all_journal_sections_parents().get(journal_section.pk)
        return [] if parent is None else [parent, *self.get_parents_recursive(parent)]

    def clean_cache(self):
        """
        Forget every memoized value so that the next access recomputes it.
        """
        self._all_journal_sections = None
        self._all_journal_sections_children = None
        self._all_journal_sections_parents = None
class JournalSection(BaseChangeTrackingModel):
    """
    Represents a journal section. Sections can be nested infinitely.

    Sections are mainly used to give editor rights over a whole section.

    `save()` and `delete()` both reset the caches held by the `objects`
    manager once the change has been applied.
    """

    name = models.CharField(verbose_name=_("Name"), max_length=128, unique=True)
    # Parent section; `None` for a top-level section.
    parent = models.ForeignKey["JournalSection"](
        "self", on_delete=models.SET_NULL, null=True, related_name="children"
    )
    # Reverse accessor of `parent` (`related_name="children"`), annotated only.
    children: "models.manager.RelatedManager[JournalSection]"

    objects: JournalSectionManager = JournalSectionManager()  # type: ignore

    def __str__(self) -> str:
        return self.name

    def save(self, *args, **kwargs) -> None:
        """
        Save the section after checking that the selected parent is valid.

        Raises:
            ValueError: if the parent is the section itself or one of its
                (recursive) children.
        """
        if self.parent and (self.parent == self or self.parent in self.all_children()):
            raise ValueError("The selected parent is invalid (self or child).")
        super().save(*args, **kwargs)
        self.__class__.objects.clean_cache()

    def delete(self, *args, **kwargs):
        """
        Delete the section and additionally move all submissions and sub-sections
        to the parent section.

        Returns whatever the parent class `delete()` returns.
        """
        parent = self.parent
        # Collect the primary keys before deleting: the `parent` foreign key
        # is declared with `on_delete=SET_NULL`, so the direct children can no
        # longer be looked up through `self.children` afterwards.
        children = list(self.children.all().values_list("pk", flat=True))
        submissions_to_update = list(
            Submission.objects.filter(journal_section=self).values_list("pk", flat=True)
        )

        res = super().delete(*args, **kwargs)

        if children:
            JournalSection.objects.filter(pk__in=children).update(parent=parent)
        if submissions_to_update:
            Submission.objects.filter(pk__in=submissions_to_update).update(journal_section=parent)

        self.__class__.objects.clean_cache()
        return res

    def top_level_journal_section(self) -> JournalSection:
        """
        Return the top level parent journal_section (journal).

        A section without a parent is its own top level section.

        Raises:
            ValueError: if an ancestor's pk is missing from the cached
                parents mapping.
        """
        # The journal_section is a journal
        if self.parent is None:
            return self

        # Browse up the journal_section arborescence, one level at a time,
        # until reaching a section whose cached parent is None.
        parent_journal_sections = self.__class__.objects.all_journal_sections_parents()

        top_level_journal_section = self.parent
        while True:
            current_pk = top_level_journal_section.pk
            if current_pk not in parent_journal_sections:
                raise ValueError(
                    f"Invalid cache: {current_pk} pk not found in parent_journal_sections"
                )

            parent_journal = parent_journal_sections[current_pk]
            if parent_journal is None:
                # A `None` parent is the regular marker of a top-level section.
                return top_level_journal_section
            top_level_journal_section = parent_journal

    def all_children(self) -> list[JournalSection]:
        """
        Get all the `JournalSection` children.

        Returns an empty list for an instance that has not been saved yet.
        """
        if self._state.adding:
            return []

        return self.__class__.objects.get_children_recursive(self)