123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- from django.contrib.auth import get_user_model, update_session_auth_hash
- from django.contrib.auth.tokens import default_token_generator
- from django.utils.timezone import now
- from rest_framework import generics, status, views, viewsets
- from rest_framework.decorators import action
- from rest_framework.exceptions import NotFound
- from rest_framework.response import Response
- from djoser import signals, utils
- from djoser.compat import get_user_email
- from djoser.conf import settings
- User = get_user_model()
- class TokenCreateView(utils.ActionViewMixin, generics.GenericAPIView):
- """
- Use this endpoint to obtain user authentication token.
- """
- serializer_class = settings.SERIALIZERS.token_create
- permission_classes = settings.PERMISSIONS.token_create
- def _action(self, serializer):
- token = utils.login_user(self.request, serializer.user)
- token_serializer_class = settings.SERIALIZERS.token
- return Response(
- data=token_serializer_class(token).data, status=status.HTTP_200_OK
- )
- class TokenDestroyView(views.APIView):
- """
- Use this endpoint to logout user (remove user authentication token).
- """
- permission_classes = settings.PERMISSIONS.token_destroy
- def post(self, request):
- utils.logout_user(request)
- return Response(status=status.HTTP_204_NO_CONTENT)
- class UserViewSet(viewsets.ModelViewSet):
- serializer_class = settings.SERIALIZERS.user
- queryset = User.objects.all()
- permission_classes = settings.PERMISSIONS.user
- token_generator = default_token_generator
- lookup_field = settings.USER_ID_FIELD
- def permission_denied(self, request, **kwargs):
- if (
- settings.HIDE_USERS
- and request.user.is_authenticated
- and self.action in ["update", "partial_update", "list", "retrieve"]
- ):
- raise NotFound()
- super().permission_denied(request, **kwargs)
- def get_queryset(self):
- user = self.request.user
- queryset = super().get_queryset()
- if settings.HIDE_USERS and self.action == "list" and not user.is_staff:
- queryset = queryset.filter(pk=user.pk)
- return queryset
- def get_permissions(self):
- if self.action == "create":
- self.permission_classes = settings.PERMISSIONS.user_create
- elif self.action == "activation":
- self.permission_classes = settings.PERMISSIONS.activation
- elif self.action == "resend_activation":
- self.permission_classes = settings.PERMISSIONS.password_reset
- elif self.action == "list":
- self.permission_classes = settings.PERMISSIONS.user_list
- elif self.action == "reset_password":
- self.permission_classes = settings.PERMISSIONS.password_reset
- elif self.action == "reset_password_confirm":
- self.permission_classes = settings.PERMISSIONS.password_reset_confirm
- elif self.action == "set_password":
- self.permission_classes = settings.PERMISSIONS.set_password
- elif self.action == "set_username":
- self.permission_classes = settings.PERMISSIONS.set_username
- elif self.action == "reset_username":
- self.permission_classes = settings.PERMISSIONS.username_reset
- elif self.action == "reset_username_confirm":
- self.permission_classes = settings.PERMISSIONS.username_reset_confirm
- elif self.action == "destroy" or (
- self.action == "me" and self.request and self.request.method == "DELETE"
- ):
- self.permission_classes = settings.PERMISSIONS.user_delete
- return super().get_permissions()
- def get_serializer_class(self):
- if self.action == "create":
- if settings.USER_CREATE_PASSWORD_RETYPE:
- return settings.SERIALIZERS.user_create_password_retype
- return settings.SERIALIZERS.user_create
- elif self.action == "destroy" or (
- self.action == "me" and self.request and self.request.method == "DELETE"
- ):
- return settings.SERIALIZERS.user_delete
- elif self.action == "activation":
- return settings.SERIALIZERS.activation
- elif self.action == "resend_activation":
- return settings.SERIALIZERS.password_reset
- elif self.action == "reset_password":
- return settings.SERIALIZERS.password_reset
- elif self.action == "reset_password_confirm":
- if settings.PASSWORD_RESET_CONFIRM_RETYPE:
- return settings.SERIALIZERS.password_reset_confirm_retype
- return settings.SERIALIZERS.password_reset_confirm
- elif self.action == "set_password":
- if settings.SET_PASSWORD_RETYPE:
- return settings.SERIALIZERS.set_password_retype
- return settings.SERIALIZERS.set_password
- elif self.action == "set_username":
- if settings.SET_USERNAME_RETYPE:
- return settings.SERIALIZERS.set_username_retype
- return settings.SERIALIZERS.set_username
- elif self.action == "reset_username":
- return settings.SERIALIZERS.username_reset
- elif self.action == "reset_username_confirm":
- if settings.USERNAME_RESET_CONFIRM_RETYPE:
- return settings.SERIALIZERS.username_reset_confirm_retype
- return settings.SERIALIZERS.username_reset_confirm
- elif self.action == "me":
- return settings.SERIALIZERS.current_user
- return self.serializer_class
- def get_instance(self):
- return self.request.user
- def perform_create(self, serializer):
- user = serializer.save()
- signals.user_registered.send(
- sender=self.__class__, user=user, request=self.request
- )
- context = {"user": user}
- to = [get_user_email(user)]
- if settings.SEND_ACTIVATION_EMAIL:
- settings.EMAIL.activation(self.request, context).send(to)
- elif settings.SEND_CONFIRMATION_EMAIL:
- settings.EMAIL.confirmation(self.request, context).send(to)
- def perform_update(self, serializer):
- super().perform_update(serializer)
- user = serializer.instance
- # should we send activation email after update?
- if settings.SEND_ACTIVATION_EMAIL:
- context = {"user": user}
- to = [get_user_email(user)]
- settings.EMAIL.activation(self.request, context).send(to)
- def destroy(self, request, *args, **kwargs):
- instance = self.get_object()
- serializer = self.get_serializer(instance, data=request.data)
- serializer.is_valid(raise_exception=True)
- if instance == request.user:
- utils.logout_user(self.request)
- self.perform_destroy(instance)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(["get", "put", "patch", "delete"], detail=False)
- def me(self, request, *args, **kwargs):
- self.get_object = self.get_instance
- if request.method == "GET":
- return self.retrieve(request, *args, **kwargs)
- elif request.method == "PUT":
- return self.update(request, *args, **kwargs)
- elif request.method == "PATCH":
- return self.partial_update(request, *args, **kwargs)
- elif request.method == "DELETE":
- return self.destroy(request, *args, **kwargs)
- @action(["post"], detail=False)
- def activation(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- user = serializer.user
- user.is_active = True
- user.save()
- signals.user_activated.send(
- sender=self.__class__, user=user, request=self.request
- )
- if settings.SEND_CONFIRMATION_EMAIL:
- context = {"user": user}
- to = [get_user_email(user)]
- settings.EMAIL.confirmation(self.request, context).send(to)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(["post"], detail=False)
- def resend_activation(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- user = serializer.get_user(is_active=False)
- if not settings.SEND_ACTIVATION_EMAIL or not user:
- return Response(status=status.HTTP_400_BAD_REQUEST)
- context = {"user": user}
- to = [get_user_email(user)]
- settings.EMAIL.activation(self.request, context).send(to)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(["post"], detail=False)
- def set_password(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- self.request.user.set_password(serializer.data["new_password"])
- self.request.user.save()
- if settings.PASSWORD_CHANGED_EMAIL_CONFIRMATION:
- context = {"user": self.request.user}
- to = [get_user_email(self.request.user)]
- settings.EMAIL.password_changed_confirmation(self.request, context).send(to)
- if settings.LOGOUT_ON_PASSWORD_CHANGE:
- utils.logout_user(self.request)
- elif settings.CREATE_SESSION_ON_LOGIN:
- update_session_auth_hash(self.request, self.request.user)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(["post"], detail=False)
- def reset_password(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- user = serializer.get_user()
- if user:
- context = {"user": user}
- to = [get_user_email(user)]
- settings.EMAIL.password_reset(self.request, context).send(to)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(["post"], detail=False)
- def reset_password_confirm(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- serializer.user.set_password(serializer.data["new_password"])
- if hasattr(serializer.user, "last_login"):
- serializer.user.last_login = now()
- serializer.user.save()
- if settings.PASSWORD_CHANGED_EMAIL_CONFIRMATION:
- context = {"user": serializer.user}
- to = [get_user_email(serializer.user)]
- settings.EMAIL.password_changed_confirmation(self.request, context).send(to)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(["post"], detail=False, url_path="set_{}".format(User.USERNAME_FIELD))
- def set_username(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- user = self.request.user
- new_username = serializer.data["new_" + User.USERNAME_FIELD]
- setattr(user, User.USERNAME_FIELD, new_username)
- user.save()
- if settings.USERNAME_CHANGED_EMAIL_CONFIRMATION:
- context = {"user": user}
- to = [get_user_email(user)]
- settings.EMAIL.username_changed_confirmation(self.request, context).send(to)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(["post"], detail=False, url_path="reset_{}".format(User.USERNAME_FIELD))
- def reset_username(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- user = serializer.get_user()
- if user:
- context = {"user": user}
- to = [get_user_email(user)]
- settings.EMAIL.username_reset(self.request, context).send(to)
- return Response(status=status.HTTP_204_NO_CONTENT)
- @action(
- ["post"], detail=False, url_path="reset_{}_confirm".format(User.USERNAME_FIELD)
- )
- def reset_username_confirm(self, request, *args, **kwargs):
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- new_username = serializer.data["new_" + User.USERNAME_FIELD]
- setattr(serializer.user, User.USERNAME_FIELD, new_username)
- if hasattr(serializer.user, "last_login"):
- serializer.user.last_login = now()
- serializer.user.save()
- if settings.USERNAME_CHANGED_EMAIL_CONFIRMATION:
- context = {"user": serializer.user}
- to = [get_user_email(serializer.user)]
- settings.EMAIL.username_changed_confirmation(self.request, context).send(to)
- return Response(status=status.HTTP_204_NO_CONTENT)
|