Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# (c) 2018-2020 

2# MPIB <https://www.mpib-berlin.mpg.de/>, 

3# MPI-CBS <https://www.cbs.mpg.de/>, 

4# MPIP <http://www.psych.mpg.de/> 

5# 

6# This file is part of Castellum. 

7# 

8# Castellum is free software; you can redistribute it and/or modify it 

9# under the terms of the GNU Affero General Public License as published 

10# by the Free Software Foundation; either version 3 of the License, or 

11# (at your option) any later version. 

12# 

13# Castellum is distributed in the hope that it will be useful, but 

14# WITHOUT ANY WARRANTY; without even the implied warranty of 

15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

16# Affero General Public License for more details. 

17# 

18# You should have received a copy of the GNU Affero General Public 

19# License along with Castellum. If not, see 

20# <http://www.gnu.org/licenses/>. 

21 

22import itertools 

23import json 

24import os 

25 

26from django import forms 

27from django.core.exceptions import ObjectDoesNotExist 

28from django.db import models 

29from django.utils.translation import gettext_lazy as _ 

30 

31from castellum.studies.models import Study 

32from castellum.utils.forms import BaseImportForm 

33from castellum.utils.forms import DateTimeField 

34from castellum.utils.forms import DisabledChoiceField 

35from castellum.utils.forms import DisabledModelChoiceField 

36 

37from .models import Appointment 

38from .models import AttributeDescription 

39from .models import AttributeSet 

40from .models import ParticipationRequest 

41from .models import SubjectFilter 

42from .models.appointments import MINUTE 

43from .models.attributesets import ANSWER_DECLINED 

44from .models.attributesets import UNCATEGORIZED 

45 

# Directory that contains this module; the schema file is resolved below it.
APP_DIR = os.path.dirname(__file__)

# JSON schema referenced by the import forms in this module. It is parsed
# once at import time. The context manager closes the file handle as soon as
# parsing is done, and the encoding is pinned so the result does not depend
# on the locale of the running process.
_IMPORT_SCHEMA_PATH = os.path.join(APP_DIR, 'schemas', 'attributes.json')
with open(_IMPORT_SCHEMA_PATH, encoding='utf-8') as _schema_file:
    IMPORT_SCHEMA = json.load(_schema_file)

48 

49 

def get_description_choices(user):
    """Build the (grouped) choice list of attribute descriptions for ``user``.

    Returns a tuple ``(choices, disabled_choices)``:

    - ``choices`` starts with the empty entry ``(None, '---')``, followed by
      the uncategorized descriptions as flat ``(pk, filter_label)`` pairs and
      then one ``(category.label, [(pk, filter_label), ...])`` group for each
      remaining category.
    - ``disabled_choices`` holds the pks of all descriptions for which
      ``user.has_privacy_level(description.privacy_level_read)`` is false.
    """
    disabled_choices = []

    def to_pairs(descriptions):
        # Turn descriptions into (pk, label) pairs and remember the ones
        # the user is not allowed to read.
        pairs = []
        for desc in descriptions:
            pairs.append((desc.pk, desc.filter_label))
            if not user.has_privacy_level(desc.privacy_level_read):
                disabled_choices.append(desc.pk)
        return pairs

    choices = [(None, '---')]
    by_category = AttributeDescription.objects.by_category()

    # Uncategorized descriptions are listed first, without a group.
    choices.extend(to_pairs(by_category.pop(UNCATEGORIZED)))

    # Everything that is left is rendered as one option group per category.
    for category, descriptions in by_category.items():
        choices.append((category.label, to_pairs(descriptions)))

    return choices, disabled_choices

70 

71 

class SubjectFilterAddForm(forms.Form):
    """Form for selecting the attribute description of a new filter.

    The field is declared with an empty queryset; the actual choices and the
    disabled choices are computed per user by ``get_description_choices``
    when the form is instantiated.
    """

    description = DisabledModelChoiceField(
        AttributeDescription.objects.none(),
        required=False,
    )

    def __init__(self, user, *args, **kwargs):
        super().__init__(*args, **kwargs)

        available, disabled = get_description_choices(user)
        description_field = self.fields["description"]
        description_field.choices = available
        description_field.widget.disabled_choices = disabled

81 

82 

class SubjectFilterForm(forms.ModelForm):
    """Model form for one ``SubjectFilter``.

    The ``value`` and ``operator`` fields are replaced at construction time
    with fields derived from the selected attribute description.
    """

    class Meta:
        model = SubjectFilter
        fields = "__all__"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Only keyword arguments are inspected to find the description.
        selected = self._get_selected_description(**kwargs)

        self.fields["value"] = selected.field.filter_formfield(required=True)
        self.fields["operator"] = forms.ChoiceField(
            choices=selected.field.available_operators,
        )

        try:
            locked = self.instance.group.study.status != Study.EDIT
        except ObjectDoesNotExist:
            # Related objects are missing; leave the fields enabled.
            return

        if locked:
            for name in ("value", "operator"):
                self.fields[name].widget.attrs["disabled"] = True

    def _get_selected_description(self, instance=None, data=None, prefix=None, **kwargs):
        """Return the description this form refers to, or ``None``.

        Lookup order: the description of ``instance``, then the pk submitted
        in ``data`` (honouring ``prefix``), then ``initial['description']``.
        """
        if instance:
            return instance.description

        if prefix:
            key = prefix + '-description'
        else:
            key = 'description'

        submitted_pk = data[key] if data else None
        if submitted_pk:
            try:
                return AttributeDescription.objects.get(pk=submitted_pk)
            except AttributeDescription.DoesNotExist:
                return None

        try:
            initial = kwargs['initial']
            return initial['description']
        except KeyError:
            return None

120 

121 

class SubjectFilterFormSet(forms.BaseModelFormSet):
    """Formset of subject filters that rejects duplicate filters."""

    def clean(self):
        """Raise a ``ValidationError`` if two kept forms describe the same filter.

        Two forms count as duplicates when their ``description``, ``operator``
        and ``value`` are all equal. Forms marked with ``DELETE`` are ignored.
        """
        super().clean()

        compared_keys = ('description', 'operator', 'value')
        signatures = [
            tuple(form.cleaned_data.get(name) for name in compared_keys)
            for form in self.forms
            if not form.cleaned_data.get('DELETE', False)
        ]

        if len(signatures) != len(set(signatures)):
            raise forms.ValidationError(_(
                'There are duplicates in filters. Please change or delete filters!'
            ), code='invalid')

139 

140 

class AttributeSetForm(forms.ModelForm):
    """Model form for an ``AttributeSet``.

    The model's ``data`` field is excluded from the regular model fields.
    Instead, ``factory`` builds a subclass with one form field per attribute
    description the user may write, and ``clean``/``save`` move the values of
    those fields into ``instance.data``.
    """

    class Meta:
        model = AttributeSet
        exclude = ('data',)
        widgets = {
            'study_type_disinterest': forms.CheckboxSelectMultiple(),
        }

    def __init__(self, instance=None, **kwargs):
        """Prefill ``initial`` from the instance data unless it was provided.

        Values equal to ``ANSWER_DECLINED`` are presented as ``None``.
        """
        if not kwargs.get('initial'):
            kwargs['initial'] = {}
            # ``instance`` is optional, so only read stored data if present.
            if instance is not None:
                for key, value in instance.get_data().items():
                    kwargs['initial'][key] = None if value == ANSWER_DECLINED else value
        super().__init__(instance=instance, **kwargs)

    def clean(self):
        """Split the cleaned values into model fields and attribute data.

        Returns a dict with the attribute values under ``'data'`` and, if it
        passed field validation, ``'study_type_disinterest'``. Any key for
        which ``<key>_answer_declined`` was submitted is replaced with
        ``ANSWER_DECLINED``.
        """
        cleaned_data = super().clean()
        for key in cleaned_data:
            if key + '_answer_declined' in self.data:
                cleaned_data[key] = ANSWER_DECLINED

        result = {}
        # Fields that failed validation are missing from cleaned_data, so an
        # unconditional pop would turn a validation error into a KeyError.
        if 'study_type_disinterest' in cleaned_data:
            result['study_type_disinterest'] = cleaned_data.pop('study_type_disinterest')
        result['data'] = cleaned_data
        return result

    def save(self, commit=True):
        """Merge the cleaned attribute values into ``instance.data`` and save.

        ``commit`` is forwarded to ``ModelForm.save``.
        """
        self.instance.data.update(self.cleaned_data['data'])
        return super().save(commit=commit)

    @classmethod
    def factory(cls, user, obj=None):
        """Return a subclass with one field per description ``user`` may write."""
        allowed_descriptions = AttributeDescription.objects.allowed_write(user, obj=obj)
        form_fields = {desc.json_key: desc.field.formfield() for desc in allowed_descriptions}
        return type('AttributeSetForm', (cls,), form_fields)

177 

178 

class ContactForm(forms.ModelForm):
    """Form for a ``ParticipationRequest`` including its appointments.

    In addition to the model fields, one optional ``appointment-<session pk>``
    date/time field is added for every session returned by
    ``instance.get_appointments()``.
    """

    class Meta:
        model = ParticipationRequest
        fields = ['status', 'followup_date', 'followup_time', 'exclusion_criteria_checked']

    @staticmethod
    def _appointment_key(session):
        # Name of the form field that holds the appointment for ``session``.
        return 'appointment-%i' % session.pk

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        for session, appointment in self.instance.get_appointments():
            self.fields[self._appointment_key(session)] = DateTimeField(
                label=self.format_session_name(session),
                initial=appointment.start if appointment else None,
                required=False,
            )

        # Replace the status field with one that can disable single choices.
        status_model_field = ParticipationRequest._meta.get_field('status')
        self.fields['status'] = DisabledChoiceField(
            label=status_model_field.verbose_name,
            choices=status_model_field.choices,
            coerce=int,
        )
        status_widget = self.fields['status'].widget
        if not self.instance.match:
            # Without a match only UNSUITABLE stays selectable.
            status_widget.disabled_choices = [
                value for value, __ in ParticipationRequest.STATUS_OPTIONS
                if value != ParticipationRequest.UNSUITABLE
            ]
        elif self.instance.match == 'incomplete':
            status_widget.disabled_choices = [ParticipationRequest.INVITED]

    def format_session_name(self, session):
        """Return the label for a session: name, optional types, duration."""
        duration = _('%imin') % session.duration
        types = ', '.join(str(t) for t in session.type.order_by('pk'))
        if not types:
            return '%s - %s' % (session.name, duration)
        return '%s (%s) - %s' % (session.name, types, duration)

    def clean(self):
        """Validate the entered appointments.

        Adds field errors when two entered appointments overlap each other
        or when the resource of a session is already taken by an appointment
        of another invited participant in the same time span.
        """
        cleaned_data = super().clean()

        # (session, field name, start, end) for every appointment entered.
        appointments = []
        for session in self.instance.study.studysession_set.all():
            key = self._appointment_key(session)
            start = cleaned_data.get(key)
            if start:
                end = start + session.duration * MINUTE
                appointments.append((session, key, start, end))

        pairs = itertools.combinations(appointments, 2)
        for (__, key1, start1, end1), (__, key2, start2, end2) in pairs:
            if start1 < end2 and start2 < end1:
                # Only the first overlapping pair is reported.
                self.add_error(key1, _('Appointments must not overlap.'))
                self.add_error(key2, _('Appointments must not overlap.'))
                break

        end_expression = models.ExpressionWrapper(
            models.F('start') + models.F('session__duration') * MINUTE,
            output_field=models.DateTimeField(),
        )
        other_appointments = (
            Appointment.objects
            .exclude(participant=self.instance)
            .filter(participant__status=ParticipationRequest.INVITED)
            .annotate(end=end_expression)
        )

        for session, key, start, end in appointments:
            if not session.resource:
                continue
            conflicting = other_appointments.filter(
                session__resource=session.resource, end__gt=start, start__lt=end,
            )
            if conflicting.exists():
                self.add_error(key, _('The required resource is not available at this time'))

    def save(self):
        """Save the participation request and sync its appointments.

        Existing appointments are deleted when their field was cleared and
        updated when the start changed; missing ones are created when a
        start was entered. Returns the saved participation request.
        """
        participation_request = super().save()

        for session, appointment in participation_request.get_appointments():
            start = self.cleaned_data.get(self._appointment_key(session))

            if not appointment:
                if start:
                    participation_request.appointment_set.create(
                        session=session,
                        start=start,
                    )
                continue

            if not start:
                appointment.delete()
            elif start != appointment.start:
                appointment.start = start
                appointment.save()

        return participation_request

    @property
    def appointments(self):
        """Yield the bound fields of all appointment fields of this form."""
        for name in self.fields:
            if not name.startswith('appointment-'):
                continue
            yield self[name]

274 

275 

class SendMailForm(forms.Form):
    """Form asking for the number of subjects to contact (at least 1)."""

    batch_size = forms.IntegerField(
        label=_('How many subjects do you want to contact?'),
        min_value=1,
    )

280 

281 

class CategoryImportForm(BaseImportForm):
    """Import form bound to the ``AttributeCategoryExport`` schema definition."""

    schema_ref = '#/$defs/AttributeCategoryExport'
    schema = IMPORT_SCHEMA

285 

286 

class DescriptionImportForm(BaseImportForm):
    """Import form bound to the ``AttributeDescriptionExport`` schema definition."""

    schema_ref = '#/$defs/AttributeDescriptionExport'
    schema = IMPORT_SCHEMA