Building a Realtime Social Network using Django Channels


This post follows on from our previous posts introducing DjangoChannelsRestFramework. Here we will look at how to build a WebSocket server for a realtime social network with the ability for users to subscribe to posts by hashtag.


While this example will focus on a social network, it should be easy to adapt it to any multicast subscription/observation situation. We assume you are familiar with some of the basics of Django Channels, see this tutorial to get up-to-speed on building a simple chat application using Django Channels.



The Post Model


For our social network we are going to need a DB model to represent each Post.


from django.db import models

class Post(models.Model):
    created_at = models.DateTimeField(auto_now_add=True)

    author = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        blank=False,
    )

    body = models.CharField(
        max_length=256,
        blank=False
    )



The WebSocket Consumer


For each open WebSocket connection Django Channels will create an instance of this consumer class.


class LivePostConsumer(
    ListModelMixin,
    RetrieveModelMixin,
    CreateModelMixin,
    PatchModelMixin,
    DeleteModelMixin,
    GenericAsyncAPIConsumer
):

    queryset = models.Post.objects.all()
    serializer_class = serializers.PostSerializer
    permission_classes = (IsAuthenticatedForWrite,)

    def filter_queryset(self, queryset: QuerySet, **kwargs):
        queryset = super().filter_queryset(queryset=queryset, **kwargs)

        # we need to ensure that only the author can edit their posts.
        if kwargs.get('action') == 'list':
            filter = kwargs.get("body_contains", None)
            if filter:
                queryset = queryset.filter(body__icontains=filter)
            # users can list the latest 500 posts
            return queryset.order_by('-created_at')[:500]

        if kwargs.get('action') == 'retrieve':
            return queryset

        # for other actions we can only expose the posts created by this user.
        return queryset.filter(author=self.scope.get("user"))


We are using a custom permission_class that limits users to list and retrieve actions if they are not logged in.


class IsAuthenticatedForWrite(IsAuthenticated):
    async def has_permission(
            self, scope: Dict[str, Any],
            consumer: AsyncConsumer,
            action: str,
            **kwargs
    ) -> bool:
        if action in ('list', 'retrieve'):
            return True
        return await super().has_permission(
            scope,
            consumer,
            action,
            **kwargs
        )


To ensure we extract the users information from their Django session we can use AuthMiddlewareStack provided by Django Channels.


application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            url(r"^ws/$", consumers.LivePostConsumer),
        ])
    ),
})


We also need to write a PostSerializer, you can use a ModelSerializer from DRF.


class PostSerializer(ModelSerializer):
    class Meta:
        model = Post
        fields = ['created_at', 'author', 'body', 'pk']
        read_only_fields = ['author', 'created_at', 'pk']
    
    def create(self, validated_data):
        validated_data['author'] = self.context.get('scope').get('user')
        return super().create(validated_data)



Using this consumer from your client


The LivePostConsumer will expose a few actions to your frontend client that we can use by sending JSON messages over the WebSocket connection.


To request a list of the latest posts on our network we send a JSON websocket message:


{
   "action": "list", "request_id": 42
}


The consumer will send a message back over the WebSocket:


{
    "action": "list",
    "errors": [],
    "response_status": 200,
    "request_id": 42,
    "data": [
        {
            "author": 4,
            "body": "Check out this new framework... #Django #Python",
            "created_at": "2020-05-31T00:46:33+00:00",
            "pk": 2
        },
        {
            "author": 23,
            "body": "It has been a long day, #BeerTime",
            "created_at": "2020-05-30T00:46:33+00:00",
            "pk": 1
        }
    ]
}


If you look at the above filter_queryset method you will see we have added an option to filter posts:


{
   "action": "list",
   "request_id": 92,
   "body_contains": "python"
}


See our post that explains how to use this consumer to retrieve, create, patch and delete posts.



Subscribing to a hashtag


As part of our social network we would like it to be possible for a user to see live updates to a trending story by subscribing to a hashtag.


To do this we will add some additional methods to our LivePostConsumer.


class LivePostConsumer(...):
    # .. the above filter_queryset goes here
    
    @model_observer(models.Post)
    async def post_change_handler(self, message, observer=None, **kwargs):
        # called when a subscribed item changes
        await self.send_json(message)

    @post_change_handler.groups_for_signal
    def post_change_handler(self, instance: models.Post, **kwargs):
        # DO NOT DO DATABASE QURIES HERE
        # This is called very often through the lifecycle of every instance of a Post model
        for hashtag in re.findall(r"#[a-z0-9]+", instance.body.lower()):
            yield f'-hashtag-{hashtag}'

    @post_change_handler.groups_for_consumer
    def post_change_handler(self, hashtag=None, list=False, **kwargs):
        # This is called when you subscribe/unsubscribe
        if hashtag is not None:
            yield f'-hashtag-#{hashtag}'

    @action()
    async def subscribe_to_hashtag(self, hashtag, **kwargs):
        await self.post_change_handler.subscribe(
            hashtag=hashtag
        )
        return {}, 201

    @action()
    async def unsubscribe_from_hashtag(self, hashtag, **kwargs):
        await self.post_change_handler.unsubscribe(
            hashtag=hashtag
        )
        return {}, 204


To subscribe to a hashtag send:


{
   "action": "subscribe_to_hashtag",
   "request_id": 102,
   "hashtag": "python"
}


To unsubscribe from a hashtag send:


{
   "action": "unsubscribe_from_hashtag",
   "request_id": 103,
   "hashtag": "python"
}


When we are subscribed to a hashtag and a new Post is created that includes this hashtag we will receive a message like this:


{"action": "create", "pk": 23}


DCRF will send these events when a Post is created, updated or deleted. All operations that use the Django ORM will result in these events being sent to the subscribed users. This includes updating models in the Django Admin interface or using Django commands.


Note if an item is updated so that it no longer contains a subscribed hashtag the subscribed user will get a delete message sent to them. As the Post is no longer in the group for the hashtag that they subscribed to.



Improving the message we send to our ws clients


When we are informed of a created/updated Post we need to display this info to the user, so that we don't need to send a retrieve request, we include the Post body directly in the message.


class LivePostConsumer(...):
    # .. the above methods go here

    @post_change_handler.serializer
    def post_change_handler(self, instance: models.Post, action, **kwargs):
        if action == 'delete':
            return {"pk": instance.pk}
        return {
            "pk": instance.pk,
            "data": {"body": instance.body}
        }


With this change in place when a Post is created/updated the message will include the body value directly.


{
  "action": "create",
  "pk": 23,
  "data": {"body": "Hello there...#NewDay"}
}



The full source code for this project can be found on Github.