Design Bounded Blocking Queue

Solution Explanation

This problem asks for a bounded blocking queue: a queue that holds at most a fixed number of elements, blocks producers (enqueue operations) while it is full, and blocks consumers (dequeue operations) while it is empty. The solution uses semaphores for thread synchronization.

Approach

The solution utilizes two semaphores:

  • s1: Represents the number of available slots in the queue. Initialized to the queue's capacity. When a producer wants to add an element, it acquires a permit from s1. If no permits are available (queue is full), the producer blocks until a permit is released (an element is dequeued).
  • s2: Represents the number of elements in the queue. Initialized to 0. When a consumer wants to remove an element, it acquires a permit from s2. If no permits are available (queue is empty), the consumer blocks until a permit is released (an element is enqueued).
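
The counting behavior of the two semaphores can be seen in isolation. The following is a minimal sketch, not part of the original solution: the names capacity, slots and items are illustrative stand-ins for the queue capacity, s1 and s2, and blocking=False is used only so that the script reports a failed acquire instead of waiting.

```python
from threading import Semaphore

capacity = 2                   # hypothetical capacity, chosen for illustration
slots = Semaphore(capacity)    # plays the role of s1: free slots
items = Semaphore(0)           # plays the role of s2: stored elements

# Two producers can each claim a slot without waiting.
first = slots.acquire(blocking=False)
second = slots.acquire(blocking=False)

# A third attempt finds no permit; with the default blocking=True it would wait here.
third = slots.acquire(blocking=False)

# A consumer finds nothing to take until a producer signals an element.
before_signal = items.acquire(blocking=False)
items.release()                # a producer signals that one element is available
after_signal = items.acquire(blocking=False)

print(first, second, third, before_signal, after_signal)
# prints: True True False False True
```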

A queue (q - implemented as deque in Python, ArrayDeque in Java, and queue in C++) stores the elements.

Operations:

  • enqueue(element): Acquires a permit from s1 (blocking if necessary), adds the element to the queue, and releases a permit on s2 (signaling that an element is available for consumption).
  • dequeue(): Acquires a permit from s2 (blocking if necessary), removes and returns the element from the queue, and releases a permit on s1 (signaling that a slot is available for production).
  • size(): Returns the current number of elements in the queue.

Code Explanation (Python3 as example)

from threading import Semaphore
from collections import deque
 
class BoundedBlockingQueue:
    def __init__(self, capacity: int):
        self.s1 = Semaphore(capacity) # Semaphore for available slots
        self.s2 = Semaphore(0)       # Semaphore for available elements
        self.q = deque()             # Queue to store elements
 
    def enqueue(self, element: int) -> None:
        self.s1.acquire()          # Acquire a permit (blocks if queue is full)
        self.q.append(element)     # Add element to queue
        self.s2.release()          # Release a permit (signals element available)
 
    def dequeue(self) -> int:
        self.s2.acquire()          # Acquire a permit (blocks if queue is empty)
        ans = self.q.popleft()    # Remove and get element
        self.s1.release()          # Release a permit (signals slot available)
        return ans
 
    def size(self) -> int:
        return len(self.q)

The Java and C++ code follow the same logic, using their respective implementations of Semaphore and queues.
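
To see the blocking behavior end to end, the Python class can be driven by one producer thread and one consumer thread. This is a sketch under stated assumptions: run_demo, its capacity and count parameters, and the received list are hypothetical names introduced for the demonstration, and the class is repeated only so the example runs on its own.

```python
from collections import deque
from threading import Semaphore, Thread


class BoundedBlockingQueue:
    # Same logic as the class above, repeated so this example is self-contained.
    def __init__(self, capacity: int):
        self.s1 = Semaphore(capacity)  # free slots
        self.s2 = Semaphore(0)         # stored elements
        self.q = deque()

    def enqueue(self, element: int) -> None:
        self.s1.acquire()
        self.q.append(element)
        self.s2.release()

    def dequeue(self) -> int:
        self.s2.acquire()
        ans = self.q.popleft()
        self.s1.release()
        return ans

    def size(self) -> int:
        return len(self.q)


def run_demo(capacity: int = 2, count: int = 10) -> list:
    """Hypothetical driver: one producer sends 0..count-1, one consumer collects them."""
    bq = BoundedBlockingQueue(capacity)
    received = []

    def producer():
        for i in range(count):
            bq.enqueue(i)  # waits whenever `capacity` elements are already queued

    def consumer():
        for _ in range(count):
            received.append(bq.dequeue())  # waits whenever the queue is empty

    threads = [Thread(target=producer), Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


result = run_demo()
print(result)  # prints: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

With a single producer and a single consumer the elements come out in insertion order, even though the producer can never be more than capacity elements ahead of the consumer.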

Time and Space Complexity

  • Time Complexity:

    • enqueue: O(1) - Appending to a deque is O(1), and acquiring or releasing a semaphore is O(1) when the thread does not have to wait. Time spent blocked on a full queue depends on the consumers and is not counted here.
    • dequeue: O(1) - Removing from the front of a deque is O(1). As with enqueue, time spent blocked on an empty queue is not counted.
    • size: O(1) - Getting the size of a deque is O(1).
  • Space Complexity: O(N), where N is the capacity of the queue. The space used is proportional to the maximum number of elements the queue can hold.

Note on Thread Safety

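The semaphores provide the blocking behavior: acquire() and release() update the permit count atomically, so a producer never proceeds while the queue is full and a consumer never proceeds while it is empty. They do not, by themselves, make access to the shared queue (q) mutually exclusive. When the capacity is greater than 1, several producers can hold s1 permits at the same time, and a producer and a consumer can operate on q at the same moment. The Python version is still safe because deque is documented to support thread-safe appends and pops from either end. In languages whose queue type is not thread-safe (ArrayDeque in Java, queue in C++), the queue operations should additionally be guarded by a mutex. Note also that size() returns a snapshot that may already be out of date while other threads are running.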
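
When the underlying container is not thread-safe, one common way to make the guarantee explicit is to guard it with a mutex in addition to the two semaphores. The following is a minimal sketch rather than the original solution: LockedBoundedBlockingQueue, slots, items, lock and the stress helper are hypothetical names introduced here, and the thread counts are arbitrary.

```python
from collections import deque
from threading import Lock, Semaphore, Thread


class LockedBoundedBlockingQueue:
    """Hypothetical variant: semaphores count permits, a mutex guards the container."""

    def __init__(self, capacity: int):
        self.slots = Semaphore(capacity)  # free slots (s1 in the text)
        self.items = Semaphore(0)         # stored elements (s2 in the text)
        self.lock = Lock()                # serializes every access to self.q
        self.q = deque()

    def enqueue(self, element: int) -> None:
        self.slots.acquire()              # wait for a free slot
        with self.lock:
            self.q.append(element)
        self.items.release()              # signal one more element

    def dequeue(self) -> int:
        self.items.acquire()              # wait for an element
        with self.lock:
            ans = self.q.popleft()
        self.slots.release()              # signal one more free slot
        return ans

    def size(self) -> int:
        with self.lock:
            return len(self.q)


def stress(producers=4, consumers=4, per_thread=250, capacity=3):
    """Run several producers and consumers; return the sorted values and the final size."""
    bq = LockedBoundedBlockingQueue(capacity)
    results = []
    results_lock = Lock()

    def produce(base):
        for i in range(per_thread):
            bq.enqueue(base + i)

    def consume():
        for _ in range(per_thread):
            value = bq.dequeue()
            with results_lock:
                results.append(value)

    threads = [Thread(target=produce, args=(p * per_thread,)) for p in range(producers)]
    threads += [Thread(target=consume) for _ in range(consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(results), bq.size()


values, remaining = stress()
print(len(values), remaining)  # prints: 1000 0
```

The semaphores are acquired before the lock and released after it, so a thread never waits for a permit while holding the mutex, which avoids blocking the threads that would release that permit.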