
Design a distributed delayed job queueing system

Requirement

Functional requirement

Non-functional requirement

Data model

Task

type Task struct {
  ID        string        // globally unique task ID
  Namespace string        // namespace of the task, used to isolate queues from each other
  Delay     time.Duration // how long to wait before the task becomes due for execution
  Score     int           // secondary priority, used in addition to the delay
  Topics    string        // topics that consumers subscribe to
  Payload   Payload
}

type Payload struct {
  CallbackFuncName string   // name of the callback function that consumers invoke
  Args             []string // arguments passed to the callback function
}

Delayed queue APIs

type DelayedQueue interface {
    Push(task Task) error
    Poll() error
    Delete(taskID string, namespace string) error
    Update(task Task) error
}

# Create a new delayed task
curl -X PUT https://endpoint/v1/delayedqueue \
-d '{"id": "1", "namespace": "default", "delay": "5s", "score": 10, "payload": {...}}'

# Update a task
curl -X POST https://endpoint/v1/delayedqueue \
-d '{"id": "1", "namespace": "default", "delay": "15s", "score": 10, "payload": {...}}'

# Delete a task
curl -X DELETE https://endpoint/v1/delayedqueue \
-d '{"id": "1", "namespace": "default"}'

# Get a task status
curl -X GET https://endpoint/v1/delayedqueue/status \
-d '{"id": "1", "namespace": "default"}'

Architecture

Meitu delayed queue architecture

[Figure: Meitu delayed queue architecture]

Airbnb architecture

[Figure: Airbnb architecture]

How to handle data persistence

[Figure: how to handle data persistence]

How to enforce at-least-once semantics

[Figure: how to enforce at-least-once semantics]

How to implement the unschedule feature

How to prevent noisy tasks from occupying all the queue resources (isolation)

How to handle scaling

[Figure: scaling]

The control plane acts like a load balancer or ingress controller: it routes requests to the queue nodes using round-robin or more advanced L7 routing rules.

How the delay queue is implemented in Java (java.util.concurrent.DelayQueue)

How a producer adds objects

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
private final Condition available = lock.newCondition();

/**
 * Inserts the specified element into this delay queue.
 *
 * @param e the element to add
 * @return {@code true}
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            /*
            If the newly added element becomes the head of the queue:
            - the old leader, which is now timing its wait on what has become the second element, must be reset
              so that a new leader can be chosen for the current head.
            - available.signal() wakes up one waiting thread so that it can re-check the head, which may expire sooner.
             */
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

How a consumer polls objects

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                // the queue is empty: await() releases the lock and parks the current thread; once signalled,
                // the thread reacquires the lock and continues with the next loop iteration
                available.await();
            else {
                // the queue has a head element
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)
                    // the head has expired, so remove it and return it to the caller
                    return q.poll();
                // otherwise the head has not expired yet and the current thread has to wait
                first = null; // don't retain ref while waiting
                if (leader != null)
                    // another thread is already the leader and is timing its wait on the current head, so the
                    // current thread waits indefinitely as a follower until it is signalled
                    available.await();
                else {
                    // otherwise the current thread becomes the leader, the only thread that does a timed wait on the head
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // the leader waits for at most "delay" nanoseconds. awaitNanos() releases the lock while
                        // waiting and reacquires it before returning, so no other thread can interfere; the loop
                        // then restarts and re-checks the head
                        available.awaitNanos(delay);
                    } finally {
                        // after waking up, the current thread gives up leadership (if it still holds it) so that
                        // another thread can become the leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            // on the way out, if there is no leader and the queue is not empty, signal one waiting thread so that
            // it can become the leader for the new head
            available.signal();
        lock.unlock();
    }
}

What is the leader-follower pattern for consumer threads, and why is it needed

How do multiple consumers check whether the top object has expired?

Questions

Caveat

Conclusions

References