Source code for grandchallenge.workstations.models

import json
import logging
import re
from datetime import datetime, timedelta
from functools import cached_property
from urllib.parse import unquote, urljoin

from django.conf import settings
from django.contrib.auth.models import Group
from django.core.exceptions import ObjectDoesNotExist
from django.core.validators import MaxValueValidator, RegexValidator
from django.db import models
from django.db.models import Q
from django.db.models.signals import post_delete
from django.db.transaction import on_commit
from django.dispatch import receiver
from django.urls.resolvers import RoutePattern
from django.utils.text import get_valid_filename
from django_extensions.db.models import TitleSlugDescriptionModel
from guardian.models import GroupObjectPermissionBase, UserObjectPermissionBase
from guardian.shortcuts import assign_perm, remove_perm
from knox.models import AuthToken
from simple_history.models import HistoricalRecords
from stdimage import JPEGField

from grandchallenge.components.backends.docker import Service
from grandchallenge.components.backends.exceptions import ComponentException
from grandchallenge.components.models import ComponentImage
from grandchallenge.components.tasks import (
    preload_interactive_algorithms,
    start_service,
    stop_service,
)
from grandchallenge.core.models import UUIDModel
from grandchallenge.core.storage import (
    get_logo_path,
    protected_s3_storage,
    public_s3_storage,
)
from grandchallenge.core.validators import JSONValidator
from grandchallenge.reader_studies.models import Question, ReaderStudy
from grandchallenge.subdomains.utils import reverse
from grandchallenge.workstations.emails import send_new_feedback_email_to_staff

__doc__ = """
Workstations are used to view, annotate and upload images to grand challenge.
A `workstation admin` is able to upload a ``WorkstationImage``, which is a docker container image.
A ``WorkstationImage`` expose a http and, optionally, a websocket port.
A `workstation user` can then launch a workstation ``Session`` for a particular ``WorkstationImage``.

When a new session is started, a new container instance of the selected ``WorkstationImage`` is lauched on the docker host.
The connection to the container will be proxied, and only accessible to the user that created the session.
The proxy will map the http and websocket connections from the user to the running instance, which is mapped by the container hostname.
The container instance will have the users API token set in the environment, so that it is able to interact with the grand challenge API as this user.
The user is able to stop the container, otherwise it will be terminated after ``maxmium_duration`` is reached.
"""

logger = logging.getLogger(__name__)


[docs] class Workstation(UUIDModel, TitleSlugDescriptionModel): """Store the title and description of a workstation.""" logo = JPEGField( upload_to=get_logo_path, storage=public_s3_storage, variations=settings.STDIMAGE_LOGO_VARIATIONS, ) editors_group = models.OneToOneField( Group, on_delete=models.PROTECT, editable=False, related_name="editors_of_workstation", ) users_group = models.OneToOneField( Group, on_delete=models.PROTECT, editable=False, related_name="users_of_workstation", ) config = models.ForeignKey( "workstation_configs.WorkstationConfig", null=True, blank=True, on_delete=models.SET_NULL, ) public = models.BooleanField( default=False, help_text=( "If True, all logged in users can use this viewer, " "otherwise, only the users group can use this viewer." ), ) class Meta(UUIDModel.Meta, TitleSlugDescriptionModel.Meta): ordering = ("created", "title") @cached_property def active_image(self): """ Returns ------- The desired image version for this workstation or None """ try: return ( self.workstationimage_set.executable_images() .filter(is_desired_version=True) .get() ) except ObjectDoesNotExist: return None def __str__(self): public = " (Public)" if self.public else "" return f"Viewer {self.title}{public}" def get_absolute_url(self): return reverse("workstations:detail", kwargs={"slug": self.slug}) def create_groups(self): self.editors_group = Group.objects.create( name=f"{self._meta.app_label}_{self._meta.model_name}_{self.pk}_editors" ) self.users_group = Group.objects.create( name=f"{self._meta.app_label}_{self._meta.model_name}_{self.pk}_users" )
[docs] def save(self, *args, **kwargs): adding = self._state.adding if adding: self.create_groups() super().save(*args, **kwargs) self.assign_permissions()
def assign_permissions(self): # Allow the editors and users groups to view this workstation assign_perm(f"view_{self._meta.model_name}", self.editors_group, self) assign_perm(f"view_{self._meta.model_name}", self.users_group, self) # Allow the editors to change this workstation assign_perm( f"change_{self._meta.model_name}", self.editors_group, self ) g_reg = Group.objects.get(name=settings.REGISTERED_USERS_GROUP_NAME) if self.public: assign_perm(f"view_{self._meta.model_name}", g_reg, self) else: remove_perm(f"view_{self._meta.model_name}", g_reg, self) def is_editor(self, user): return user.groups.filter(pk=self.editors_group.pk).exists() def add_editor(self, user): return user.groups.add(self.editors_group) def remove_editor(self, user): return user.groups.remove(self.editors_group) def is_user(self, user): return user.groups.filter(pk=self.users_group.pk).exists() def add_user(self, user): return user.groups.add(self.users_group) def remove_user(self, user): return user.groups.remove(self.users_group)
[docs] class WorkstationUserObjectPermission(UserObjectPermissionBase): content_object = models.ForeignKey(Workstation, on_delete=models.CASCADE)
[docs] class WorkstationGroupObjectPermission(GroupObjectPermissionBase): content_object = models.ForeignKey(Workstation, on_delete=models.CASCADE)
[docs] @receiver(post_delete, sender=Workstation) def delete_workstation_groups_hook(*_, instance: Workstation, using, **__): """ Deletes the related groups. We use a signal rather than overriding delete() to catch usages of bulk_delete. """ try: instance.editors_group.delete(using=using) except ObjectDoesNotExist: pass try: instance.users_group.delete(using=using) except ObjectDoesNotExist: pass
[docs] class WorkstationImage(UUIDModel, ComponentImage): """ A ``WorkstationImage`` is a docker container image of a workstation. Parameters ---------- workstation A ``Workstation`` can have multiple ``WorkstationImage``, that represent different versions of a workstation http_port This container will expose a http server on this port websocket_port This container will expose a websocket on this port. Any relative url that starts with ``/mlab4d4c4142`` will be proxied to this port. initial_path The initial path that users will navigate to in order to load the workstation """ SHIM_IMAGE = False workstation = models.ForeignKey(Workstation, on_delete=models.PROTECT) http_port = models.PositiveIntegerField( default=8080, validators=[MaxValueValidator(2**16 - 1)] ) websocket_port = models.PositiveIntegerField( default=4114, validators=[MaxValueValidator(2**16 - 1)] ) initial_path = models.CharField( max_length=256, default="cirrus", validators=[ RegexValidator( regex=r"^(?:[^/][^\s]*)\Z", message="This path is invalid, it must not start with a /", ) ], ) class Meta(UUIDModel.Meta, ComponentImage.Meta): ordering = ("created", "creator") def get_absolute_url(self): return reverse( "workstations:image-detail", kwargs={"slug": self.workstation.slug, "pk": self.pk}, ) @property def import_status_url(self) -> str: return reverse( "workstations:image-import-status-detail", kwargs={"slug": self.workstation.slug, "pk": self.pk}, ) def assign_permissions(self): # Allow the editors group to view this workstation image assign_perm( f"view_{self._meta.model_name}", self.workstation.editors_group, self, ) # Allow the editors to change this workstation image assign_perm( f"change_{self._meta.model_name}", self.workstation.editors_group, self, )
[docs] def save(self, *args, **kwargs): adding = self._state.adding super().save(*args, **kwargs) if adding: self.assign_permissions()
def get_peer_images(self): return WorkstationImage.objects.filter(workstation=self.workstation)
[docs] class WorkstationImageUserObjectPermission(UserObjectPermissionBase): content_object = models.ForeignKey( WorkstationImage, on_delete=models.CASCADE )
[docs] class WorkstationImageGroupObjectPermission(GroupObjectPermissionBase): content_object = models.ForeignKey( WorkstationImage, on_delete=models.CASCADE )
ENV_VARS_SCHEMA = { "$schema": "http://json-schema.org/draft-06/schema", "type": "array", "title": "The Environment Variables Schema", "description": "Defines environment variable names and values", "items": { "$id": "#/items", "type": "object", "title": "The Environment Variable Schema", "description": "Defines an environment variable", "required": ["name", "value"], "additionalProperties": False, "properties": { "name": { "$id": "#/items/properties/name", "type": "string", "title": "The Name Schema", "description": "The name of this environment variable", "default": "ENV_VAR", "pattern": r"^[A-Z0-9\_]+$", "examples": ["ENV_VAR"], }, "value": { "$id": "#/items/properties/value", "type": "string", "title": "The Value Schema", "description": "The value of this environment variable", "default": "env_var_value", "examples": ["env_var_value"], }, }, }, }
[docs] class Session(UUIDModel): """ Tracks who has launched workstation images. The ``WorkstationImage`` will be launched as a ``Service``. The ``Session`` is responsible for starting and stopping the ``Service``. Parameters ---------- status Stores what has happened with the service, is it running, errored, etc? region Stores which region this session runs in creator Who created the session? This is also the only user that should be able to access the launched service. workstation_image The container image that will be launched by this ``Session``. maximum_duration The maximum time that the service can be active before it is terminated user_finished Indicates if the user has chosen to end the session early history The history of this Session """ QUEUED = 0 STARTED = 1 RUNNING = 2 FAILED = 3 STOPPED = 4 # These should match the values in workstations/js/session.js STATUS_CHOICES = ( (QUEUED, "Queued"), (STARTED, "Started"), (RUNNING, "Running"), (FAILED, "Failed"), (STOPPED, "Stopped"), )
[docs] class Region(models.TextChoices): # AWS regions # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html AF_SOUTH_1 = "af-south-1", "Africa (Cape Town)" AP_EAST_1 = "ap-east-1", "Asia Pacific (Hong Kong)" AP_NORTHEAST_1 = "ap-northeast-1", "Asia Pacific (Tokyo)" AP_NORTHEAST_2 = "ap-northeast-2", "Asia Pacific (Seoul)" AP_NORTHEAST_3 = "ap-northeast-3", "Asia Pacific (Osaka-Local)" AP_SOUTH_1 = "ap-south-1", "Asia Pacific (Mumbai)" AP_SOUTHEAST_1 = "ap-southeast-1", "Asia Pacific (Singapore)" AP_SOUTHEAST_2 = "ap-southeast-2", "Asia Pacific (Sydney)" CA_CENTRAL_1 = "ca-central-1", "Canada (Central)" EU_CENTRAL_1 = "eu-central-1", "Europe (Frankfurt)" EU_NORTH_1 = "eu-north-1", "Europe (Stockholm)" EU_SOUTH_1 = "eu-south-1", "Europe (Milan)" EU_WEST_1 = "eu-west-1", "Europe (Ireland)" EU_WEST_2 = "eu-west-2", "Europe (London)" EU_WEST_3 = "eu-west-3", "Europe (Paris)" ME_SOUTH_1 = "me-south-1", "Middle East (Bahrain)" SA_EAST_1 = "sa-east-1", "South America (São Paulo)" US_EAST_1 = "us-east-1", "US East (N. Virginia)" US_EAST_2 = "us-east-2", "US East (Ohio)" US_WEST_1 = "us-west-1", "US West (N. California)" US_WEST_2 = "us-west-2", "US West (Oregon)" # User defined regions EU_NL_1 = "eu-nl-1", "Netherlands (Nijmegen)" EU_NL_2 = "eu-nl-2", "Netherlands (Amsterdam)"
status = models.PositiveSmallIntegerField( choices=STATUS_CHOICES, default=QUEUED, db_index=True ) region = models.CharField( max_length=14, choices=Region.choices, default=Region.EU_NL_1, help_text="Which region is this session available in?", ) creator = models.ForeignKey( settings.AUTH_USER_MODEL, null=True, on_delete=models.SET_NULL ) auth_token = models.ForeignKey( AuthToken, null=True, on_delete=models.SET_NULL ) workstation_image = models.ForeignKey( WorkstationImage, on_delete=models.PROTECT ) maximum_duration = models.DurationField(default=timedelta(minutes=10)) user_finished = models.BooleanField(default=False) logs = models.TextField(editable=False, blank=True) ping_times = models.JSONField(null=True, default=None) history = HistoricalRecords( excluded_fields=["logs", "ping_times", "auth_token"] ) extra_env_vars = models.JSONField( default=list, blank=True, help_text="Extra environment variables to include in this session", validators=[JSONValidator(schema=ENV_VARS_SCHEMA)], ) class Meta(UUIDModel.Meta): ordering = ("created", "creator") def __str__(self): return f"Session {self.pk}" @property def task_kwargs(self) -> dict: """ Returns ------- The kwargs that need to be passed to celery to get this object """ return { "app_label": self._meta.app_label, "model_name": self._meta.model_name, "pk": self.pk, } @property def hostname(self) -> str: """ Returns ------- The unique hostname for this session """ return ( f"{self.pk}-{self._meta.model_name}-{self._meta.app_label}".lower() ) @property def expires_at(self) -> datetime: """ Returns ------- The time when this session expires. """ return self.created + self.maximum_duration @property def environment(self) -> dict: """ Returns ------- The environment variables that should be set on the container. """ env = {var["name"]: var["value"] for var in self.extra_env_vars} env.update( { "GRAND_CHALLENGE_API_ROOT": unquote(reverse("api:api-root")), "WORKSTATION_SENTRY_DSN": settings.WORKSTATION_SENTRY_DSN, "WORKSTATION_SESSION_ID": str(self.pk), "CIRRUS_KEEP_ALIVE_METHOD": "old", "AWS_DEFAULT_REGION": str(self.region), "INTERACTIVE_ALGORITHMS_LAMBDA_FUNCTIONS": json.dumps( settings.INTERACTIVE_ALGORITHMS_LAMBDA_FUNCTIONS ), } ) if self.creator: if self.auth_token: self.auth_token.delete() duration_limit = timedelta( seconds=settings.WORKSTATIONS_SESSION_DURATION_LIMIT ) + timedelta(minutes=settings.WORKSTATIONS_GRACE_MINUTES) auth_token, token = AuthToken.objects.create( user=self.creator, expiry=duration_limit ) self.auth_token = auth_token self.save() env.update({"GRAND_CHALLENGE_AUTHORIZATION": f"Bearer {token}"}) if settings.DEBUG: # Allow the container to communicate with the dev environment env.update({"GRAND_CHALLENGE_UNSAFE": "true"}) return env @property def service(self) -> Service: """ Returns ------- The service for this session, could be active or inactive. """ return Service( job_id=f"{self._meta.app_label}-{self._meta.model_name}-{self.pk}", exec_image_repo_tag=self.workstation_image.original_repo_tag, memory_limit=settings.COMPONENTS_MEMORY_LIMIT, ) @property def workstation_url(self) -> str: """ Returns ------- The url that users will use to access the workstation instance. """ return urljoin( self.get_absolute_url(), self.workstation_image.initial_path )
[docs] def start(self) -> None: """ Starts the service for this session, ensuring that the ``workstation_image`` is ready to be used and that ``WORKSTATIONS_MAXIMUM_SESSIONS`` has not been reached in this region. Raises ------ ComponentException If the service cannot be started. """ try: if not self.workstation_image.can_execute: raise ComponentException("Workstation image was not ready") if ( Session.objects.all() .filter( status__in=[Session.RUNNING, Session.STARTED], region=self.region, ) .count() >= settings.WORKSTATIONS_MAXIMUM_SESSIONS ): raise ComponentException("Too many sessions are running") self.service.start( http_port=self.workstation_image.http_port, websocket_port=self.workstation_image.websocket_port, hostname=self.hostname, environment=self.environment, ) self.update_status(status=self.STARTED) except Exception: self.update_status(status=self.FAILED) raise
[docs] def stop(self) -> None: """Stop the service for this session, cleaning up all of the containers.""" self.logs = self.service.logs() self.service.stop_and_cleanup() self.update_status(status=self.STOPPED) if self.auth_token: self.auth_token.delete()
[docs] def update_status(self, *, status: STATUS_CHOICES) -> None: """ Updates the status of this session. Parameters ---------- status The new status for this session. """ self.status = status self.save()
def get_absolute_url(self): return reverse( "session-detail", kwargs={ "slug": self.workstation_image.workstation.slug, "pk": self.pk, "rendering_subdomain": self.region, }, ) @property def api_url(self) -> str: return reverse("api:session-detail", kwargs={"pk": self.pk}) def assign_permissions(self): # Allow the editors group to view and change this session assign_perm( f"view_{self._meta.model_name}", self.workstation_image.workstation.editors_group, self, ) assign_perm( f"change_{self._meta.model_name}", self.workstation_image.workstation.editors_group, self, ) # Allow the session creator to view or change this assign_perm(f"view_{self._meta.model_name}", self.creator, self) assign_perm(f"change_{self._meta.model_name}", self.creator, self)
[docs] def save(self, *args, **kwargs) -> None: """Save the session instance, starting or stopping the service if needed.""" created = self._state.adding if created and not self.region: # Launch in the first active region if no preference set self.region = settings.WORKSTATIONS_ACTIVE_REGIONS[0] super().save(*args, **kwargs) if created: self.assign_permissions() on_commit( start_service.signature( kwargs=self.task_kwargs, queue=f"workstations-{self.region}", ).apply_async ) elif self.user_finished and self.status != self.STOPPED: on_commit( stop_service.signature( kwargs=self.task_kwargs, queue=f"workstations-{self.region}", ).apply_async )
def handle_reader_study_switching(self, *, workstation_path): reader_study_pattern = RoutePattern( f"{settings.WORKSTATIONS_READY_STUDY_PATH_PARAM}/<uuid:pk>" ) display_set_pattern = RoutePattern( f"{settings.WORKSTATIONS_DISPLAY_SET_PATH_PARAM}/<uuid:pk>" ) if match := re.match(reader_study_pattern.regex, workstation_path): lookup = Q(pk=match.groupdict()["pk"]) elif match := re.match(display_set_pattern.regex, workstation_path): lookup = Q(display_sets__pk=match.groupdict()["pk"]) else: # Not a reader study path return reader_study = ReaderStudy.objects.get(lookup) reader_study.workstation_sessions.add(self) if ( Question.objects.filter(reader_study=reader_study) .exclude(interactive_algorithm="") .exists() ): on_commit( preload_interactive_algorithms.signature( queue=f"workstations-{self.region}" ).apply_async )
[docs] class SessionUserObjectPermission(UserObjectPermissionBase): content_object = models.ForeignKey(Session, on_delete=models.CASCADE)
[docs] class SessionGroupObjectPermission(GroupObjectPermissionBase): content_object = models.ForeignKey(Session, on_delete=models.CASCADE)
def feedback_screenshot_filepath(instance, filename): return ( f"session-feedback/" f"{instance.pk}/" f"{get_valid_filename(filename)}" )
[docs] class Feedback(UUIDModel): session = models.ForeignKey(Session, on_delete=models.CASCADE) screenshot = models.ImageField( upload_to=feedback_screenshot_filepath, storage=protected_s3_storage, blank=True, ) user_comment = models.TextField() context = models.JSONField(null=True, blank=True)
[docs] def save(self, *args, **kwargs) -> None: adding = self._state.adding super().save(*args, **kwargs) if adding: self.assign_permissions() send_new_feedback_email_to_staff(feedback=self)
def assign_permissions(self): assign_perm( f"view_{self._meta.model_name}", self.session.creator, self )
[docs] class FeedbackUserObjectPermission(UserObjectPermissionBase): content_object = models.ForeignKey(Feedback, on_delete=models.CASCADE)
[docs] class FeedbackGroupObjectPermission(GroupObjectPermissionBase): content_object = models.ForeignKey(Feedback, on_delete=models.CASCADE)