Realtime comments on live video

Requirements and user stories

Functional requirements

Non-functional requirements

Calculation

Data model

type User struct {
	UserID int64
	Name   string
	Email  string
	...
}
type Video struct {
	VideoID int32
	UserID  int64 // the streamer
	Title   string
}
type Comment struct {
	UserID    int64 // the viewer who publishes the comment
	VideoID   int32 // a signed int32 can handle ~2 billion unique IDs
	CommentID int32
	Text      string // content of the comment
	Timestamp time.Time
}

APIs

Subscribe to a video

An authenticated and authorized user needs to subscribe to a video in order to view and publish realtime comments. The Accept: text/event-stream request header tells the server that the client wants to use SSE over HTTP. After a user subscribes to a video, the backend service maintains a map so the server knows where to broadcast the realtime comments.

curl -X GET -H "Accept: text/event-stream" -H "Authorization: Bearer <JWT>" \
https://xxx/videos/<video_id>/subscribe

status code: 200 OK
content-type: text/event-stream

# The content-type: text/event-stream response header indicates that the server keeps an open connection
# and dispatches events to the client over it. The response event stream carries the live comments;
# a blank line separates consecutive events.
---
id: 1
event: comment
data: {"comment": "awesome", "user": "sleepybear"}

id: 2
event: comment
data: {"comment": "hey there", "user": "winterwolfe"}

Unsubscribe from a video

curl -X DELETE -H "Authorization: Bearer <JWT>" https://xxx/videos/<video_id>/unsubscribe/<connection_id>

status code: 200 OK

When a user unsubscribes from a video, the backend service removes the server-side map record, so no more realtime comments are sent to that client.

Publish a comment

curl -X PUT -H "authorization: Bearer <JWT>" -d {"user_id": "xxx", "comment": "Hello from San Jose!"} \
https://xxx/videos/<video_id>/comments

status code: 200 OK

Comparison of pull-based (client-initiated) vs. push-based (server-initiated)

We choose the pull-based model for the following reasons:

Long Polling vs. SSE vs. WebSocket

See here for more details.

Comparison of the protocols

Architecture

Data Store

Why do we need a data store

Take YouTube Live as an example: when the streamer ends the live stream, the video is persisted along with all the comments/chat history. If that is the case, we need a data store that at least persists the streamed video and the comments. The scope of this design focuses on storing comments.

Requirements on data store

Capacity planning

UserID    int64     // 8 bytes
VideoID   int32     // 4 bytes
CommentID int32     // 4 bytes
Text      string    // up to 20480 bytes
Timestamp time.Time // 8 bytes (stored as a 64-bit timestamp)
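
Summing the fields gives roughly 20 KB per comment. The daily volume below assumes about 200 million comments per day; that rate is an assumption, chosen to be consistent with the 4 TB/day figure used in the data store section:

Per comment: 8 + 4 + 4 + 20480 + 8 = 20504 bytes ≈ 20 KB

Assume ~200,000,000 comments/day:
  200,000,000 × 20 KB ≈ 4 TB/day
  4 TB/day × 365 days ≈ 1.5 PB/year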

Relational database

Let’s use MySQL and PostgreSQL as examples.

Conclusion: A relational database might not be an optimal choice.

NoSQL database

Let’s take Cassandra as an example.

Conclusion: A NoSQL database is more flexible in terms of scalability. Consider using NoSQL to store comments.

Comments can be stored as individual rows within a table. Each of the following properties maps to a column:

type Comment struct {
	UserID    int64 // the viewer who publishes the comment
	VideoID   int32 // a signed int32 can handle ~2 billion unique IDs
	CommentID int32
	Text      string // content of the comment
	Timestamp time.Time
}
CREATE TABLE comments (
    video_id int,            // partition key; matches the int32 VideoID in the struct
    comment_time timestamp,  // clustering column: orders comments by creation time
    comment_id int,          // unique identifier for a comment
    user_id bigint,          // matches the int64 UserID in the struct
    comment_text text,
    PRIMARY KEY (video_id, comment_time, comment_id) // comment_id breaks ties when two comments share a timestamp
);

See this blog to learn more on Cassandra clustering keys.

When we query Cassandra, we can do:

SELECT * FROM comments
WHERE video_id = 123
AND comment_time > '2021-08-13T00:00:00';
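
To make the read and write paths concrete, here is a minimal Go sketch against the table above, assuming the gocql driver and a hypothetical video keyspace (both assumptions, not part of the original design):

package main

import (
	"fmt"
	"time"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.Keyspace = "video" // hypothetical keyspace name
	session, err := cluster.CreateSession()
	if err != nil {
		panic(err)
	}
	defer session.Close()

	// Write one comment row.
	if err := session.Query(
		`INSERT INTO comments (video_id, comment_time, comment_id, user_id, comment_text)
		 VALUES (?, ?, ?, ?, ?)`,
		123, time.Now(), 456, int64(789), "Hello from San Jose!",
	).Exec(); err != nil {
		panic(err)
	}

	// Read comments for a video newer than a given time. The partition key
	// (video_id) must appear in the WHERE clause; the range filter on
	// comment_time is efficient because it is the clustering column.
	iter := session.Query(
		`SELECT comment_id, user_id, comment_text, comment_time
		 FROM comments WHERE video_id = ? AND comment_time > ?`,
		123, time.Date(2021, 8, 13, 0, 0, 0, 0, time.UTC),
	).Iter()

	var commentID int
	var userID int64
	var text string
	var ts time.Time
	for iter.Scan(&commentID, &userID, &text, &ts) {
		fmt.Println(ts, userID, text)
	}
	if err := iter.Close(); err != nil {
		panic(err)
	}
}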

Handle old comments

Based on our estimate, we use 4 TB per day to store comments. That is roughly 1.5 PB of storage per year. We definitely need to consider compressing or removing old comments.

TODO: How to compress? How does Netflix compress the view history?
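
For the removal option, Cassandra supports a per-row TTL, so old comments can expire automatically without a separate cleanup job. A minimal sketch, reusing the session and imports from the gocql sketch above (the 30-day retention period is an assumption):

// writeWithTTL inserts a comment that Cassandra deletes automatically once
// ttlSeconds elapse; e.g. 30 days = 2592000 seconds (an assumed retention).
func writeWithTTL(session *gocql.Session, ttlSeconds int) error {
	return session.Query(
		`INSERT INTO comments (video_id, comment_time, comment_id, user_id, comment_text)
		 VALUES (?, ?, ?, ?, ?) USING TTL ?`,
		123, time.Now(), 456, int64(789), "this comment expires", ttlSeconds,
	).Exec()
}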

Cache layer

High Level Design & Workflows

Subscribe & Unsubscribe

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
public class SSEController {

    // video_id -> sink that multicasts comments to every subscriber of that video
    private final Map<String, Sinks.Many<String>> videoSubscriptions = new ConcurrentHashMap<>();

    @GetMapping(value = "/videos/{videoId}/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> subscribe(@PathVariable String videoId,
                                  @RequestParam String userId) { // in production, derive userId from the JWT
        // Create one multicast sink per video on the first subscription
        Sinks.Many<String> sink = videoSubscriptions.computeIfAbsent(
                videoId, id -> Sinks.many().multicast().onBackpressureBuffer());

        // Each subscriber gets a Flux view of the shared sink
        return sink.asFlux()
                .doOnCancel(() -> {
                    // Cleanup resources when the client disconnects
                    System.out.println("Client disconnected: " + userId);
                    // Drop the sink once the last subscriber of the video is gone
                    if (sink.currentSubscriberCount() == 0) {
                        videoSubscriptions.remove(videoId, sink);
                    }
                });
    }

    // Broadcast a comment to all active connections subscribed to the video
    public void broadcastMessage(String videoId, String message) {
        Sinks.Many<String> sink = videoSubscriptions.get(videoId);
        if (sink != null) {
            sink.tryEmitNext(message);
        }
    }
}

Realtime comments distribution

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/videos/{videoId}/comments")
public class CommentsController {

    private final SSEController sseController;

    @Autowired
    public CommentsController(SSEController sseController) {
        this.sseController = sseController;
    }

    @PostMapping
    public void handleEvent(@PathVariable String videoId, @RequestBody String eventData) {
        // Process the received event, perform any necessary logic (e.g. persist the comment)

        // Broadcast the comment to all clients subscribed to this video
        sseController.broadcastMessage(videoId, "New Event: " + eventData);
    }
}

Failure handling

Frontend server failure

An SSE connection is initiated by the client; no matter what causes the disconnection, the server cannot reestablish the connection. The client has to detect the failure and reconnect.

When a frontend server node goes down, we lose all the connections between clients and that frontend server. The dispatcher will continue sending messages/events/comments to that frontend server node even though they cannot reach it.
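
Since only the client can recover the stream, a client-side reconnect loop is the usual remedy (the browser EventSource API does this automatically). A Go sketch building on the client sketch earlier (same imports, plus time); resuming from Last-Event-ID assumes the server tracks event IDs, which is an assumption:

// reconnectLoop keeps an SSE subscription alive across frontend failures.
// The last seen event ID is resent via the Last-Event-ID header so a server
// that tracks event IDs could resume the stream.
func reconnectLoop(url, jwt string) {
	lastEventID := ""
	for {
		req, _ := http.NewRequest("GET", url, nil)
		req.Header.Set("Accept", "text/event-stream")
		req.Header.Set("Authorization", "Bearer "+jwt)
		if lastEventID != "" {
			req.Header.Set("Last-Event-ID", lastEventID)
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			time.Sleep(3 * time.Second) // simple fixed backoff before retrying
			continue
		}
		scanner := bufio.NewScanner(resp.Body)
		for scanner.Scan() {
			line := scanner.Text()
			if strings.HasPrefix(line, "id: ") {
				lastEventID = strings.TrimPrefix(line, "id: ")
			}
			// handle "data: " lines as in the client sketch earlier
		}
		resp.Body.Close() // stream ended (e.g. the frontend node died); retry
	}
}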

Dispatcher failure

The dispatcher can be stateless: we pull the <video_id> -> set<frontend_node> mapping out to Redis, and that Redis instance is shared among all dispatchers. The backend server can retry against another dispatcher. The only drawback is the added latency.
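
A minimal Go sketch of that shared mapping, assuming the go-redis client; the key naming scheme and node names are assumptions:

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// When a client subscribes to video-123 through frontend node fe-7,
	// the frontend registers itself in the shared set.
	if err := rdb.SAdd(ctx, "subs:video-123", "fe-7").Err(); err != nil {
		panic(err)
	}

	// Any (stateless) dispatcher can look up which frontend nodes hold
	// subscribers for the video and forward the comment to each of them.
	nodes, err := rdb.SMembers(ctx, "subs:video-123").Result()
	if err != nil {
		panic(err)
	}
	for _, node := range nodes {
		fmt.Println("forward comment to frontend node:", node)
	}

	// When the last client on fe-7 unsubscribes, remove the record.
	rdb.SRem(ctx, "subs:video-123", "fe-7")
}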

TODO: What will happen if the API Gateway fails?

TODO: What will happen if the GLB fails?

Scaling

Multi AZ support

Challenges

Solution - 1

Solution - 2

Whenever a local dispatcher receives an event, it also sends the event to the dispatchers in the other AZs. (This is what LinkedIn uses.) The downside is that even if az-apac-1 has no subscriptions on video-123, it will still receive the events, which increases the load on its dispatcher.
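
A Go sketch of that fan-out; the peer dispatcher URLs, endpoint path, and payload shape are all hypothetical:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// Peer dispatcher endpoints in the other AZs (hypothetical URLs).
var peerDispatchers = []string{
	"https://dispatcher.az-apac-1.example.com/events",
	"https://dispatcher.az-eu-1.example.com/events",
}

// fanOut delivers an event locally and also forwards it to every peer AZ,
// whether or not that AZ has subscribers for the video; that unconditional
// forwarding is the extra dispatcher load described above.
func fanOut(event []byte) {
	deliverLocally(event)
	for _, peer := range peerDispatchers {
		resp, err := http.Post(peer, "application/json", bytes.NewReader(event))
		if err != nil {
			fmt.Println("peer unreachable:", peer, err)
			continue
		}
		resp.Body.Close()
	}
}

func deliverLocally(event []byte) {
	// Look up the local frontend nodes subscribed to the video and forward.
}

func main() {
	fanOut([]byte(`{"video_id": 123, "comment": "hello"}`))
}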

References