Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) 2018-2020
2# MPIB <https://www.mpib-berlin.mpg.de/>,
3# MPI-CBS <https://www.cbs.mpg.de/>,
4# MPIP <http://www.psych.mpg.de/>
5#
6# This file is part of Castellum.
7#
8# Castellum is free software; you can redistribute it and/or modify it
9# under the terms of the GNU Affero General Public License as published
10# by the Free Software Foundation; either version 3 of the License, or
11# (at your option) any later version.
12#
13# Castellum is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16# Affero General Public License for more details.
17#
18# You should have received a copy of the GNU Affero General Public
19# License along with Castellum. If not, see
20# <http://www.gnu.org/licenses/>.
22import itertools
23import json
24import os
26from django import forms
27from django.core.exceptions import ObjectDoesNotExist
28from django.db import models
29from django.utils.translation import gettext_lazy as _
31from castellum.studies.models import Study
32from castellum.utils.forms import BaseImportForm
33from castellum.utils.forms import DateTimeField
34from castellum.utils.forms import DisabledChoiceField
35from castellum.utils.forms import DisabledModelChoiceField
37from .models import Appointment
38from .models import AttributeDescription
39from .models import AttributeSet
40from .models import ParticipationRequest
41from .models import SubjectFilter
42from .models.appointments import MINUTE
43from .models.attributesets import ANSWER_DECLINED
44from .models.attributesets import UNCATEGORIZED
46APP_DIR = os.path.dirname(__file__)
47IMPORT_SCHEMA = json.load(open(os.path.join(APP_DIR, 'schemas', 'attributes.json')))
50def get_description_choices(user):
51 choices = [(None, '---')]
52 disabled_choices = []
54 categories = AttributeDescription.objects.by_category()
56 for description in categories.pop(UNCATEGORIZED):
57 choices.append((description.pk, description.filter_label))
58 if not user.has_privacy_level(description.privacy_level_read): 58 ↛ 59line 58 didn't jump to line 59, because the condition on line 58 was never true
59 disabled_choices.append(description.pk)
61 for category, descriptions in categories.items(): 61 ↛ 62line 61 didn't jump to line 62, because the loop on line 61 never started
62 sub = []
63 for description in descriptions:
64 sub.append((description.pk, description.filter_label))
65 if not user.has_privacy_level(description.privacy_level_read):
66 disabled_choices.append(description.pk)
67 choices.append((category.label, sub))
69 return choices, disabled_choices
72class SubjectFilterAddForm(forms.Form):
73 description = DisabledModelChoiceField(AttributeDescription.objects.none(), required=False)
75 def __init__(self, user, *args, **kwargs):
76 super().__init__(*args, **kwargs)
78 choices, disabled_choices = get_description_choices(user)
79 self.fields["description"].choices = choices
80 self.fields["description"].widget.disabled_choices = disabled_choices
83class SubjectFilterForm(forms.ModelForm):
84 class Meta:
85 model = SubjectFilter
86 fields = "__all__"
88 def __init__(self, *args, **kwargs):
89 super().__init__(*args, **kwargs)
91 selected_description = self._get_selected_description(**kwargs)
93 self.fields["value"] = selected_description.field.filter_formfield(required=True)
94 self.fields["operator"] = forms.ChoiceField(
95 choices=selected_description.field.available_operators
96 )
98 try:
99 if self.instance.group.study.status != Study.EDIT: 99 ↛ 100line 99 didn't jump to line 100, because the condition on line 99 was never true
100 self.fields["value"].widget.attrs["disabled"] = True
101 self.fields["operator"].widget.attrs["disabled"] = True
102 except ObjectDoesNotExist:
103 pass
105 def _get_selected_description(self, instance=None, data=None, prefix=None, **kwargs):
106 if instance:
107 return instance.description
109 key = prefix + '-description' if prefix else 'description'
110 if data and data[key]:
111 try:
112 return AttributeDescription.objects.get(pk=data[key])
113 except AttributeDescription.DoesNotExist:
114 return None
116 try:
117 return kwargs['initial']['description']
118 except KeyError:
119 return None
122class SubjectFilterFormSet(forms.BaseModelFormSet):
123 def clean(self):
124 super().clean()
126 values = []
127 for form in self.forms:
128 if not form.cleaned_data.get('DELETE', False):
129 values.append((
130 form.cleaned_data.get('description'),
131 form.cleaned_data.get('operator'),
132 form.cleaned_data.get('value'),
133 ))
135 if len(set(values)) != len(values):
136 raise forms.ValidationError(_(
137 'There are duplicates in filters. Please change or delete filters!'
138 ), code='invalid')
141class AttributeSetForm(forms.ModelForm):
143 class Meta:
144 model = AttributeSet
145 exclude = ('data',)
146 widgets = {
147 'study_type_disinterest': forms.CheckboxSelectMultiple(),
148 }
150 def __init__(self, instance=None, **kwargs):
151 if not kwargs.get('initial'): 151 ↛ 155line 151 didn't jump to line 155, because the condition on line 151 was never false
152 kwargs['initial'] = {}
153 for key, value in instance.get_data().items():
154 kwargs['initial'][key] = None if value == ANSWER_DECLINED else value
155 super().__init__(instance=instance, **kwargs)
157 def clean(self):
158 cleaned_data = super().clean()
159 for key in cleaned_data:
160 if key + '_answer_declined' in self.data: 160 ↛ 161line 160 didn't jump to line 161, because the condition on line 160 was never true
161 cleaned_data[key] = ANSWER_DECLINED
162 study_type_disinterest = cleaned_data.pop('study_type_disinterest')
163 return {
164 'study_type_disinterest': study_type_disinterest,
165 'data': cleaned_data,
166 }
168 def save(self):
169 self.instance.data.update(self.cleaned_data['data'])
170 return super().save()
172 @classmethod
173 def factory(cls, user, obj=None):
174 allowed_descriptions = AttributeDescription.objects.allowed_write(user, obj=obj)
175 form_fields = {desc.json_key: desc.field.formfield() for desc in allowed_descriptions}
176 return type('AttributeSetForm', (cls,), form_fields)
179class ContactForm(forms.ModelForm):
180 def __init__(self, *args, **kwargs):
181 super().__init__(*args, **kwargs)
183 for session, appointment in self.instance.get_appointments():
184 key = 'appointment-%i' % session.pk
185 self.fields[key] = DateTimeField(
186 label=self.format_session_name(session),
187 initial=appointment.start if appointment else None,
188 required=False,
189 )
191 model_status = ParticipationRequest._meta.get_field('status')
192 self.fields['status'] = DisabledChoiceField(
193 label=model_status.verbose_name,
194 choices=model_status.choices,
195 coerce=int,
196 )
197 if not self.instance.match: 197 ↛ 198line 197 didn't jump to line 198
198 self.fields['status'].widget.disabled_choices = [
199 choice for choice, label in ParticipationRequest.STATUS_OPTIONS
200 if choice != ParticipationRequest.UNSUITABLE
201 ]
202 elif self.instance.match == 'incomplete': 202 ↛ 203line 202 didn't jump to line 203, because the condition on line 202 was never true
203 self.fields['status'].widget.disabled_choices = [ParticipationRequest.INVITED]
205 def format_session_name(self, session):
206 duration = _('%imin') % session.duration
207 types = ', '.join(str(t) for t in session.type.order_by('pk'))
208 if types: 208 ↛ 209line 208 didn't jump to line 209, because the condition on line 208 was never true
209 return '%s (%s) - %s' % (session.name, types, duration)
210 else:
211 return '%s - %s' % (session.name, duration)
213 def clean(self):
214 cleaned_data = super().clean()
216 appointments = []
217 for session in self.instance.study.studysession_set.all():
218 key = 'appointment-%i' % session.pk
219 start = cleaned_data.get(key)
220 if start:
221 appointments.append((session, key, start, start + session.duration * MINUTE))
223 for a, b in itertools.combinations(appointments, 2):
224 __, key1, start1, end1 = a
225 __, key2, start2, end2 = b
226 if start1 < end2 and start2 < end1:
227 self.add_error(key1, _('Appointments must not overlap.'))
228 self.add_error(key2, _('Appointments must not overlap.'))
229 break
231 qs = Appointment.objects\
232 .exclude(participant=self.instance)\
233 .filter(participant__status=ParticipationRequest.INVITED)\
234 .annotate(end=models.ExpressionWrapper(
235 models.F('start') + models.F('session__duration') * MINUTE,
236 output_field=models.DateTimeField(),
237 ))
239 for session, key, start, end in appointments:
240 if session.resource and qs.filter(
241 session__resource=session.resource, end__gt=start, start__lt=end,
242 ).exists():
243 self.add_error(key, _('The required resource is not available at this time'))
245 def save(self):
246 pariticipationrequest = super().save()
248 for session, appointment in pariticipationrequest.get_appointments():
249 start = self.cleaned_data.get('appointment-%i' % session.pk)
251 if appointment:
252 if not start: 252 ↛ 254line 252 didn't jump to line 254, because the condition on line 252 was never false
253 appointment.delete()
254 elif start != appointment.start:
255 appointment.start = start
256 appointment.save()
257 elif start: 257 ↛ 248line 257 didn't jump to line 248, because the condition on line 257 was never false
258 pariticipationrequest.appointment_set.create(
259 session=session,
260 start=start,
261 )
263 return pariticipationrequest
265 @property
266 def appointments(self):
267 for name in self.fields:
268 if name.startswith('appointment-'):
269 yield self[name]
271 class Meta:
272 model = ParticipationRequest
273 fields = ['status', 'followup_date', 'followup_time', 'exclusion_criteria_checked']
276class SendMailForm(forms.Form):
277 batch_size = forms.IntegerField(
278 min_value=1, label=_('How many subjects do you want to contact?')
279 )
282class CategoryImportForm(BaseImportForm):
283 schema = IMPORT_SCHEMA
284 schema_ref = '#/$defs/AttributeCategoryExport'
287class DescriptionImportForm(BaseImportForm):
288 schema = IMPORT_SCHEMA
289 schema_ref = '#/$defs/AttributeDescriptionExport'