Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# (c) 2018-2020
2# MPIB <https://www.mpib-berlin.mpg.de/>,
3# MPI-CBS <https://www.cbs.mpg.de/>,
4# MPIP <http://www.psych.mpg.de/>
5#
6# This file is part of Castellum.
7#
8# Castellum is free software; you can redistribute it and/or modify it
9# under the terms of the GNU Affero General Public License as published
10# by the Free Software Foundation; either version 3 of the License, or
11# (at your option) any later version.
12#
13# Castellum is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16# Affero General Public License for more details.
17#
18# You should have received a copy of the GNU Affero General Public
19# License along with Castellum. If not, see
20# <http://www.gnu.org/licenses/>.
22import datetime
23import logging
24import zipfile
26from django.conf import settings
27from django.contrib import messages
28from django.contrib.auth.mixins import LoginRequiredMixin
29from django.core.exceptions import ObjectDoesNotExist
30from django.core.exceptions import PermissionDenied
31from django.db import models
32from django.http import Http404
33from django.http import HttpResponse
34from django.shortcuts import get_object_or_404
35from django.shortcuts import redirect
36from django.template.response import TemplateResponse
37from django.urls import reverse
38from django.utils.functional import cached_property
39from django.utils.translation import gettext_lazy as _
40from django.views.generic import DeleteView
41from django.views.generic import DetailView
42from django.views.generic import FormView
43from django.views.generic import ListView
44from django.views.generic import View
45from django.views.generic.detail import SingleObjectMixin
47from dateutil.relativedelta import relativedelta
49from castellum.castellum_auth.mixins import PermissionRequiredMixin
50from castellum.contacts.forms import ContactForm
51from castellum.contacts.forms import SearchForm
52from castellum.contacts.mixins import BaseContactUpdateView
53from castellum.contacts.models import Address
54from castellum.contacts.models import Contact
55from castellum.recruitment import filter_queries
56from castellum.recruitment.mixins import BaseAttributeSetUpdateView
57from castellum.recruitment.models import AttributeSet
58from castellum.recruitment.models import ParticipationRequest
59from castellum.studies.models import Study
61from .forms import SubjectPrivacyLevelForm
62from .mixins import BaseAdditionalInfoUpdateView
63from .mixins import BaseDataProtectionUpdateView
64from .mixins import SubjectMixin
65from .models import Consent
66from .models import ExportAnswer
67from .models import Subject
# Logger named 'monitoring.subjects'. The views in this module use it to
# record searches, exports, data updates and deletions together with the pk
# of the acting user.
monitoring_logger = logging.getLogger('monitoring.subjects')
class SubjectSearchView(LoginRequiredMixin, FormView):
    """Search subjects by contact data and optionally create a new subject.

    A POST without ``privacy_level`` is an ordinary search handled by
    ``FormView``. A POST that carries ``privacy_level`` is treated as a
    request to create a new contact and requires ``contacts.add_contact``.
    """

    template_name = 'subjects/subject_search.html'
    form_class = SearchForm

    def get_matches(self, search):
        """Yield ``(subject, contact, prs, has_privacy_level)`` per search hit.

        ``prs`` lists ``(participation_request, can_recruit, can_conduct)``
        tuples for the subject's participation requests in studies that are
        in execution and on which the current user may act. A subject is
        yielded only if ``prs`` is non-empty or the user holds
        ``contacts.view_contact``.
        """
        user = self.request.user
        matching_contacts = Contact.objects.fuzzy_filter(search)
        may_view_contact = user.has_perm('contacts.view_contact')
        recruit_perms = (
            'contacts.view_contact',
            'recruitment.change_participationrequest',
        )

        subjects = Subject.objects.filter(filter_queries.pk_filter(matching_contacts))
        for subject in subjects:
            has_privacy_level = user.has_privacy_level(subject.privacy_level)
            actionable = []

            running = subject.participationrequest_set.filter(
                study__status=Study.EXECUTION
            )
            for participation in running:
                study = participation.study
                may_access = user.has_perm('studies.access_study', obj=study)
                is_invited = participation.status == ParticipationRequest.INVITED

                # Both roles need the privacy level and access to the study.
                cleared = has_privacy_level and may_access
                may_recruit = cleared and user.has_perms(recruit_perms, obj=study)
                may_conduct = (
                    cleared and
                    is_invited and
                    user.has_perm('recruitment.view_participation_pseudonyms', obj=study)
                )
                may_search = is_invited and user.has_perm('recruitment.search_execution')

                if may_recruit or may_conduct or may_search:
                    actionable.append((participation, may_recruit, may_conduct))

            if actionable or may_view_contact:
                yield subject, subject.contact, actionable, has_privacy_level

    def get_context_data(self, **kwargs):
        """Add database statistics, search matches and the privacy level form."""
        context = super().get_context_data(**kwargs)

        context['count_total'] = Contact.objects.count()

        cutoff = datetime.date.today() - relativedelta(months=6)
        recently_updated = Contact.objects.filter(updated_at__gte=cutoff)
        context['count_updated'] = recently_updated.count()

        invited_subjects = Subject.objects.filter(
            participationrequest__status=ParticipationRequest.INVITED
        )
        context['count_participated'] = invited_subjects.distinct().count()

        completeness, total = filter_queries.completeness_expr()
        complete_subjects = (
            Subject.objects
            .annotate(completeness=completeness)
            .filter(completeness=total)
        )
        context['count_complete'] = complete_subjects.count()

        consenting = Subject.objects.filter(filter_queries.has_consent())
        context['count_consent'] = consenting.count()

        # Matches are only computed for a bound, valid search form.
        search_form = kwargs.get('form')
        if search_form and search_form.is_valid():
            context['matches'] = self.get_matches(search_form.cleaned_data['search'])

        if 'privacy_level_form' not in context:
            context['privacy_level_form'] = SubjectPrivacyLevelForm(user=self.request.user)
        return context

    def form_valid(self, form):
        """Log the search and re-render the page with the bound form."""
        template = 'Subject search: search term: "{search}" by {user}'
        monitoring_logger.info(
            template.format(user=self.request.user.pk, **form.cleaned_data)
        )

        return self.render_to_response(self.get_context_data(form=form))

    def post(self, request, *args, **kwargs):
        """Run a search, or create a contact when ``privacy_level`` is posted.

        Creation requires both forms to validate and the cleaned
        ``last_name`` to be non-empty; otherwise the page is re-rendered
        with both bound forms.

        Raises:
            PermissionDenied: if creation is requested without the
                ``contacts.add_contact`` permission.
        """
        if 'privacy_level' not in request.POST:
            return super().post(request, *args, **kwargs)

        if not request.user.has_perm('contacts.add_contact'):
            raise PermissionDenied

        form = SearchForm(data=request.POST)
        privacy_level_form = SubjectPrivacyLevelForm(user=self.request.user, data=request.POST)

        forms_valid = form.is_valid() and privacy_level_form.is_valid()
        if forms_valid and not form.cleaned_data.get('last_name'):
            messages.error(request, _(
                'Both first and last name must be provided in order to create a subject!'
            ))
        elif forms_valid:
            data = form.cleaned_data
            contact = Contact.objects.create(
                first_name=data['first_name'],
                last_name=data['last_name'],
                email=data.get('email'),
            )
            subject = contact.subject
            subject.privacy_level = privacy_level_form.cleaned_data['privacy_level']
            subject.save()

            monitoring_logger.info('SubjectData update: {} by {}'.format(
                contact.subject_id, self.request.user.pk
            ))
            return redirect('subjects:detail', pk=contact.subject.pk)

        return self.render_to_response(self.get_context_data(
            form=form,
            privacy_level_form=privacy_level_form,
        ))
class SubjectDetailView(SubjectMixin, PermissionRequiredMixin, DetailView):
    """Detail page for a single subject."""

    model = Subject
    permission_required = 'contacts.view_contact'
    template_name = 'subjects/subject_detail.html'
    tab = 'detail'

    def get_context_data(self, **kwargs):
        """Add the latest change date, attribute completeness and coverletter flag."""
        context = super().get_context_data(**kwargs)
        subject = self.object

        context['updated_at'] = max(subject.updated_at, subject.contact.updated_at)

        # A subject without an attribute set contributes neither a change
        # date nor a completeness percentage.
        try:
            attributeset = subject.attributeset
            context['updated_at'] = max(context['updated_at'], attributeset.updated_at)

            completed, total = attributeset.get_completeness()
            percentage = 100 * completed / (total or 1)  # guard against total == 0
            context['attributeset_completeness'] = '%i%%' % percentage
        except AttributeSet.DoesNotExist:
            pass

        context['coverletter_exists'] = bool(settings.CASTELLUM_COVERLETTER_TEMPLATE)

        return context
class SubjectDeleteView(SubjectMixin, PermissionRequiredMixin, DeleteView):
    """Confirm and perform the deletion of a subject.

    Deletion is refused (the confirmation page is shown again) while the
    subject still has participation requests.
    """

    model = Subject
    permission_required = 'subjects.delete_subject'
    tab = 'delete'

    def is_last_guardian(self):
        """Return whether some ward has at most one guardian left.

        Always returns a ``bool``: ``False`` if the subject's contact is not
        a guardian of anyone or if every ward has more than one guardian,
        ``True`` as soon as one ward with one guardian or fewer is found.
        """
        contact = self.object.contact
        if not contact.guardian_of.exists():
            return False

        wards = Contact.objects.filter(guardians=contact)
        return any(ward.guardians.count() <= 1 for ward in wards)

    def get_context_data(self, **kwargs):
        """Expose the deletion blockers/warnings to the template."""
        context = super().get_context_data(**kwargs)
        context['has_participationrequests'] = self.object.participationrequest_set.exists()
        context['is_last_guardian'] = self.is_last_guardian()
        return context

    def get_success_url(self):
        """Return to the subject index after a successful deletion."""
        return reverse('subjects:index')

    def delete(self, request, *args, **kwargs):
        """Delete the subject unless participation requests still exist."""
        self.object = self.get_object()
        if self.object.participationrequest_set.exists():
            # Not deletable yet: show the confirmation page again.
            return self.get(request, *args, **kwargs)
        monitoring_logger.info('Subject deleted by {}'.format(self.request.user.pk))
        messages.success(self.request, _('Subject has been deleted.'))
        return super().delete(request, *args, **kwargs)
class SubjectExportView(SubjectMixin, PermissionRequiredMixin, DetailView):
    """Show all stored data sets of a subject and toggle the export request."""

    model = Subject
    template_name = 'subjects/subject_export.html'
    permission_required = 'subjects.view_subject'
    tab = 'export'

    def get_context_data(self, **kwargs):
        """Collect every data set related to the subject into ``dataset_list``.

        Viewing the page while an export is requested is written to the
        monitoring log.
        """
        context = super().get_context_data(**kwargs)
        subject = self.object

        if subject.export_requested:
            monitoring_logger.info(
                'GDPR-Request export: {} by {}'.format(subject.pk, self.request.user.pk)
            )

        datasets = [subject, subject.contact]

        # The attribute set and the consent are optional related objects.
        try:
            datasets.append(subject.attributeset)
        except ObjectDoesNotExist:
            pass

        datasets.extend(subject.participationrequest_set.all())

        try:
            datasets.append(subject.consent)
        except ObjectDoesNotExist:
            pass

        datasets.extend(subject.exportanswer_set.all())

        context['dataset_list'] = datasets
        return context

    def post(self, request, *args, **kwargs):
        """Toggle the export request.

        An open request is cleared and answered with a new ``ExportAnswer``;
        otherwise a request dated today is recorded.
        """
        subject = self.object = self.get_object()
        if subject.export_requested:
            subject.export_requested = None
            ExportAnswer.objects.create(subject=subject, created_by=request.user.username)
        else:
            subject.export_requested = datetime.date.today()
        subject.save()
        return redirect('subjects:export', subject.pk)
class SubjectUpdateMixin(SubjectMixin):
    """Shared behaviour of the subject update views in this module."""

    def get_context_data(self, **kwargs):
        """Tell the template to extend the subject base template."""
        context = super().get_context_data(**kwargs)
        context.update(base_template='subjects/subject_base.html')
        return context

    def form_valid(self, form, *args):
        """Queue a success message, then let the parent handle the form."""
        success_text = _('Data has been saved.')
        messages.success(self.request, success_text)
        return super().form_valid(form, *args)
class DataProtectionUpdateView(SubjectUpdateMixin, BaseDataProtectionUpdateView):
    """Data protection tab of a subject."""

    tab = 'data-protection'

    def get_success_url(self):
        """Stay on the data protection tab of the same object after saving."""
        object_pk = self.object.pk
        return reverse('subjects:data-protection', args=[object_pk])
class AdditionalInfoUpdateView(SubjectUpdateMixin, BaseAdditionalInfoUpdateView):
    """Additional info tab of a subject."""

    tab = 'additional-info'

    def get_success_url(self):
        """Stay on the additional info tab of the same object after saving."""
        object_pk = self.object.pk
        return reverse('subjects:additional-info', args=[object_pk])
class ContactUpdateView(SubjectUpdateMixin, BaseContactUpdateView):
    """Contact tab of a subject; the edited object is the subject's contact."""

    tab = 'contact'

    def get_object(self):
        """Return the contact of the subject named by the ``pk`` URL kwarg (404 if missing)."""
        return get_object_or_404(Subject, pk=self.kwargs['pk']).contact

    def get_success_url(self):
        """Stay on the contact tab of the owning subject after saving."""
        subject_pk = self.object.subject.pk
        return reverse('subjects:contact', args=[subject_pk])
class AttributeSetUpdateView(SubjectUpdateMixin, BaseAttributeSetUpdateView):
    """Attribute set tab of a subject."""

    tab = 'attributeset'

    def get_object(self):
        """Return the subject's attribute set, creating an empty one if needed."""
        owner = get_object_or_404(Subject, pk=self.kwargs['pk'])
        # get_or_create returns (object, created); only the object is needed.
        return AttributeSet.objects.get_or_create(subject=owner, defaults={'data': {}})[0]

    def get_success_url(self):
        """Stay on the attribute set tab of the owning subject after saving."""
        subject_pk = self.object.subject.pk
        return reverse('subjects:attributeset', args=[subject_pk])
class ParticipationListView(SubjectMixin, PermissionRequiredMixin, ListView):
    """List a subject's participation requests in running and finished studies."""

    tab = 'participations'
    model = ParticipationRequest
    template_name = 'subjects/subject_participationlist.html'
    permission_required = 'contacts.view_contact'

    @cached_property
    def subject(self):
        """The subject named by the ``pk`` URL kwarg (404 if it does not exist)."""
        return get_object_or_404(Subject, pk=self.kwargs['pk'])

    def is_permitted(self, study):
        """Return whether the user may see participations of ``study``.

        Requires access to the study plus either the pseudonym-view or the
        participation-change permission on it.
        """
        user = self.request.user
        if not user.has_perm('studies.access_study', obj=study):
            return False
        return (
            user.has_perm('recruitment.view_participation_pseudonyms', obj=study) or
            user.has_perm('recruitment.change_participationrequest', obj=study)
        )

    def get_queryset(self):
        """Return the permitted participations in running studies, newest first.

        The result is a plain list because the permission check happens in
        Python, per study.
        """
        running = (
            ParticipationRequest.objects
            .filter(subject=self.subject, study__status=Study.EXECUTION)
            .order_by('-updated_at')
        )
        permitted = []
        for participation in running:
            if self.is_permitted(participation.study):
                permitted.append(participation)
        return permitted

    def get_context_data(self, **kwargs):
        """Add the participations in finished studies as ``finished_list``."""
        context = super().get_context_data(**kwargs)
        finished = ParticipationRequest.objects.filter(
            subject=self.subject, study__status=Study.FINISHED
        )
        context['finished_list'] = finished.order_by('-updated_at')
        return context
class ParticipationDeleteView(SubjectMixin, PermissionRequiredMixin, DeleteView):
    """Delete a single participation request of a subject."""

    tab = 'delete'
    model = ParticipationRequest
    permission_required = ['studies.access_study', 'recruitment.delete_participationrequest']

    def get_object(self):
        """Return the participation request, which must belong to the subject in the URL."""
        return get_object_or_404(
            ParticipationRequest, pk=self.kwargs['pk'], subject_id=self.kwargs['subject_pk']
        )

    def get_success_url(self):
        """Return to the subject's delete page.

        ``self.subject`` is not defined in this class; presumably it is
        provided by ``SubjectMixin`` -- TODO confirm.
        """
        return reverse('subjects:delete', args=[self.subject.pk])

    def delete(self, request, *args, **kwargs):
        """Log the deletion and delegate the actual delete to ``DeleteView``."""
        self.object = self.get_object()
        monitoring_logger.info('ParticipationRequest in Study {} deleted by {}'.format(
            self.object.study.name, self.request.user.pk
        ))
        # super() already binds self; passing it again would shift every
        # argument by one and hand the view to the parent as ``request``.
        return super().delete(request, *args, **kwargs)
class AddToStudyView(SubjectMixin, PermissionRequiredMixin, DetailView):
    """Let a user add a subject to a study that is currently in execution."""

    model = Subject
    template_name = 'subjects/subject_add_to_study.html'
    permission_required = 'contacts.view_contact'
    tab = 'participations'

    def get_studies(self, subject):
        """Yield running studies the subject can be added to by the current user.

        A study qualifies if the subject has no participation request in it
        yet, the user holds the add and access permissions on it, and the
        subject passes the study's filter query.
        """
        required_perms = ('recruitment.add_participationrequest', 'studies.access_study')
        user = self.request.user
        candidates = (
            Study.objects
            .filter(status=Study.EXECUTION)
            .exclude(participationrequest__subject=subject)
        )
        for candidate in candidates:
            if not user.has_perms(required_perms, obj=candidate):
                continue
            fits_study = Subject.objects.filter(
                filter_queries.study(candidate), pk=subject.pk
            ).exists()
            if fits_study:
                yield candidate

    def get_context_data(self, **kwargs):
        """Add the attribute set and the eligible studies, if an attribute set exists."""
        context = super().get_context_data(**kwargs)

        # Without an attribute set neither key is added to the context.
        try:
            context['attributeset'] = self.object.attributeset
            context['studies'] = list(self.get_studies(self.object))
        except AttributeSet.DoesNotExist:
            pass

        return context

    def post(self, request, *args, **kwargs):
        """Create a participation request and continue in the recruitment view.

        Raises:
            PermissionDenied: if the user lacks the add or access permission
                on the selected study.
        """
        self.object = self.get_object()
        selected = get_object_or_404(Study, pk=request.POST['study'], status=Study.EXECUTION)

        required_perms = ('recruitment.add_participationrequest', 'studies.access_study')
        if not request.user.has_perms(required_perms, obj=selected):
            raise PermissionDenied

        created = ParticipationRequest.objects.create(subject=self.object, study=selected)

        target = reverse('recruitment:contact', args=[selected.pk, created.pk])
        return redirect(target + '?context=subjects:participation-list')
class AddToFinishedStudyView(SubjectMixin, PermissionRequiredMixin, DetailView):
    """Record a subject's participation in an already finished study."""

    model = Subject
    template_name = 'subjects/subject_add_to_study.html'
    permission_required = 'subjects.change_subject'
    tab = 'participations'

    def get_studies(self, subject):
        """Return finished studies in which the subject has no participation request."""
        finished = Study.objects.filter(status=Study.FINISHED)
        return finished.exclude(participationrequest__subject=subject)

    def get_context_data(self, **kwargs):
        """Add the selectable studies; ``attributeset`` is always set to ``True``."""
        context = super().get_context_data(**kwargs)
        context.update(
            attributeset=True,
            studies=list(self.get_studies(self.object)),
        )
        return context

    def post(self, request, *args, **kwargs):
        """Create an ``INVITED`` participation request for the selected finished study."""
        self.object = self.get_object()
        selected = get_object_or_404(Study, pk=request.POST['study'], status=Study.FINISHED)

        ParticipationRequest.objects.create(
            subject=self.object,
            study=selected,
            status=ParticipationRequest.INVITED,
        )

        return redirect('subjects:participation-list', self.object.pk)
class GuardianSearchView(PermissionRequiredMixin, SubjectSearchView):
    """Search for an existing subject to use as guardian, or create a new one."""

    template_name = 'subjects/guardian_search.html'
    permission_required = 'contacts.view_contact'
    # NOTE(review): presumably tells the templates to render without the
    # surrounding page chrome -- confirm against the base template.
    nochrome = True

    def get_matches(self, search):
        """Return all subjects whose contact fuzzily matches ``search``."""
        matching_contacts = Contact.objects.fuzzy_filter(search)
        return Subject.objects.filter(filter_queries.pk_filter(matching_contacts))

    def get_context_data(self, **kwargs):
        """Add a guardian contact form pre-filled from a valid search form."""
        context = super().get_context_data(**kwargs)
        search_form = kwargs.get('form')
        needs_contact_form = (
            search_form and
            search_form.is_valid() and
            'contact_form' not in context
        )
        if needs_contact_form:
            context['contact_form'] = ContactForm(
                user=self.request.user,
                initial=search_form.cleaned_data,
                is_guardian=True,
            )
        return context

    def post(self, request, *args, **kwargs):
        """Run a search, or create a guardian when ``privacy_level`` is posted.

        Raises:
            PermissionDenied: if creation is requested without the
                ``contacts.add_contact`` permission.
        """
        if 'privacy_level' not in request.POST:
            return super().post(request, *args, **kwargs)

        if not request.user.has_perm('contacts.add_contact'):
            raise PermissionDenied

        form = SearchForm(data=request.POST)
        privacy_level_form = SubjectPrivacyLevelForm(user=self.request.user, data=request.POST)
        contact_form = ContactForm(user=self.request.user, data=request.POST, is_guardian=True)

        everything_valid = (
            form.is_valid() and
            privacy_level_form.is_valid() and
            contact_form.is_valid()
        )
        if not everything_valid:
            # Re-render with all bound forms so their errors are shown.
            return self.render_to_response(self.get_context_data(
                form=form,
                privacy_level_form=privacy_level_form,
                contact_form=contact_form,
            ))

        contact = contact_form.save()
        guardian = contact.subject
        guardian.privacy_level = privacy_level_form.cleaned_data['privacy_level']
        guardian.save()

        monitoring_logger.info('SubjectData update: {} by {}'.format(
            contact.subject_id, self.request.user.pk
        ))
        success_context = {
            'name': contact.full_name,
            'pk': contact.subject.pk,
        }
        return TemplateResponse(
            self.request, 'subjects/guardian_success.html', success_context
        )
class MaintenanceAttributesView(PermissionRequiredMixin, ListView):
    """List consenting subjects whose attribute set is incomplete."""

    model = Subject
    paginate_by = 20
    permission_required = 'subjects.change_subject'
    template_name = 'subjects/maintenance_attributes.html'
    tab = 'attributes'

    def get_queryset(self):
        """Return subjects with an attribute set below full completeness, least complete first."""
        completeness, total = filter_queries.completeness_expr()
        subjects = super().get_queryset().exclude(attributeset=None)
        subjects = subjects.annotate(completeness=completeness)
        subjects = subjects.filter(filter_queries.has_consent(), completeness__lt=total)
        return subjects.order_by('completeness')
class MaintenanceContactView(PermissionRequiredMixin, ListView):
    """List subjects whose contact data is missing essential fields."""

    model = Subject
    paginate_by = 20
    permission_required = 'subjects.change_subject'
    template_name = 'subjects/maintenance_contact.html'
    tab = 'contact'

    def get_queryset(self):
        """Return subjects with incomplete contact data, ordered by pk.

        A contact is selected if it has no guardians and neither address,
        email nor phone number, or if it is nobody's guardian and has no
        date of birth.
        """
        no_way_to_reach = models.Q(
            guardian_count=0,
            address=None,
            email='',
            phone_number='',
        )
        no_date_of_birth = models.Q(
            guardian_of_count=0,
            date_of_birth=None,
        )
        rows = (
            Contact.objects
            .annotate(
                guardian_count=models.Count('guardians'),
                guardian_of_count=models.Count('guardian_of'),
            )
            .filter(no_way_to_reach | no_date_of_birth)
            .values('subject_id')
            .order_by()
        )
        subject_ids = [row['subject_id'] for row in rows]
        return super().get_queryset().filter(pk__in=subject_ids).order_by('pk')
class MaintenanceDuplicatesView(PermissionRequiredMixin, ListView):
    """List names that are shared by several contacts and may be duplicates."""

    model = Contact
    paginate_by = 20
    permission_required = 'subjects.change_subject'
    template_name = 'subjects/maintenance_duplicates.html'
    tab = 'duplicates'

    def get_queryset(self):
        """Return ``{'first_name', 'last_name', 'count'}`` dicts of suspicious names.

        A name shared by more than one contact is reported when those
        contacts have fewer distinct non-null dates of birth than there are
        contacts, i.e. the dates of birth cannot tell all of them apart.
        """
        shared_names = (
            Contact.objects
            .values('first_name', 'last_name')
            .annotate(count=models.Count('id'))
            .filter(count__gt=1)
        )

        def distinct_birthdays(entry):
            # Number of different known dates of birth among the namesakes.
            namesakes = Contact.objects.filter(
                first_name=entry['first_name'],
                last_name=entry['last_name'],
                date_of_birth__isnull=False,
            )
            return namesakes.values('date_of_birth').distinct().count()

        return [
            entry for entry in shared_names
            if distinct_birthdays(entry) < entry['count']
        ]
class MaintenanceConsentView(PermissionRequiredMixin, ListView):
    """List subjects whose consent needs to be renewed."""

    model = Subject
    paginate_by = 20
    permission_required = 'subjects.change_subject'
    template_name = 'subjects/maintenance_consent.html'
    tab = 'consent'

    def get_by_consent_from_before_full_age(self):
        """Return subjects flagged as having consent from before full age."""
        today = datetime.date.today()
        full_age = settings.CASTELLUM_FULL_AGE

        # date_of_birth is stored in Contact, so we cannot query it
        # directly. As an optimization, we only look at subjects that
        # came of age in the past year.
        born_before = today - relativedelta(years=full_age)
        born_after = today - relativedelta(years=full_age + 1)
        recently_of_age = Contact.objects.filter(
            date_of_birth__lt=born_before,
            date_of_birth__gt=born_after,
        )

        subject_pks = []
        for contact in recently_of_age:
            if contact.subject.has_consent_from_before_full_age:
                subject_pks.append(contact.subject.pk)
        return Subject.objects.filter(pk__in=subject_pks)

    def get_by_outdated_document(self):
        """Return confirmed-consent subjects that fail ``has_consent(exclude_deprecated=True)``."""
        current_consent = filter_queries.has_consent(exclude_deprecated=True)
        return Subject.objects.exclude(current_consent).filter(consent__status=Consent.CONFIRMED)

    def get_queryset(self):
        """Return the union of both groups of subjects."""
        groups = (
            self.get_by_consent_from_before_full_age(),
            self.get_by_outdated_document(),
        )
        return Subject.objects.none().union(*groups)
class MaintenanceWaitingView(PermissionRequiredMixin, ListView):
    """List subjects whose consent has the status ``Consent.WAITING``."""

    model = Subject
    paginate_by = 20
    permission_required = 'subjects.change_subject'
    template_name = 'subjects/maintenance_waiting.html'
    tab = 'waiting'

    def get_queryset(self):
        """Restrict the default queryset to subjects with a waiting consent."""
        all_subjects = super().get_queryset()
        return all_subjects.filter(consent__status=Consent.WAITING)
class CoverletterView(SubjectMixin, PermissionRequiredMixin, SingleObjectMixin, View):
    """Serve a cover letter (.docx) personalised with a subject's name and address.

    The document is produced by copying the zip archive configured in
    ``settings.CASTELLUM_COVERLETTER_TEMPLATE`` and replacing the
    ``{name}``, ``{address1}`` and ``{address2}`` placeholders in
    ``word/document.xml``.
    """

    model = Subject
    permission_required = 'contacts.view_contact'

    def get_data(self):
        """Return ``(full_name, *address_lines)``; blank lines if there is no address."""
        contact = self.object.contact
        try:
            return (contact.full_name, *contact.address.lines())
        except Address.DoesNotExist:
            return (contact.full_name, '', '')

    def patch_document(self, content):
        """Return ``content`` (bytes) with the placeholders filled in.

        The values are XML-escaped first: ``content`` is an XML document, so
        a raw ``&`` or ``<`` in a name or address would make it malformed.
        """
        from xml.sax.saxutils import escape

        name, address1, address2 = self.get_data()
        replacements = (
            (b'{name}', name),
            (b'{address1}', address1),
            (b'{address2}', address2),
        )
        for placeholder, value in replacements:
            content = content.replace(placeholder, escape(value).encode('utf8'))
        return content

    def patch_zipfile(self, zin, zout):
        """Copy every member of ``zin`` to ``zout``, patching ``word/document.xml``."""
        for name in zin.namelist():
            with zin.open(name, 'r') as fh:
                content = fh.read()
            if name == 'word/document.xml':
                content = self.patch_document(content)
            with zout.open(name, 'w') as fh:
                fh.write(content)

    def get(self, request, *args, **kwargs):
        """Return the personalised document as an HTTP response.

        Raises:
            Http404: if no cover letter template is configured.
        """
        if not settings.CASTELLUM_COVERLETTER_TEMPLATE:
            raise Http404

        self.object = self.get_object()

        response = HttpResponse(
            content_type='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        )

        # Closing the output archive writes the zip central directory, so it
        # must be closed before the response is returned. The response
        # object is passed in as a file object, so ZipFile does not close it.
        with zipfile.ZipFile(settings.CASTELLUM_COVERLETTER_TEMPLATE, 'r') as zin:
            with zipfile.ZipFile(response, 'w') as zout:
                self.patch_zipfile(zin, zout)

        return response