Category: multi-threading

Implement a cyclic buffer

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    private final String[] buffer;
    private final int capacity;

    private int front;
    private int rear;
    private int count;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity){
        this.capacity = capacity;
        buffer = new String[capacity];
    }

    public void deposit(String data) throws InterruptedException {
        lock.lock();
        try {
            while(count == capacity){
                notFull.await();
            }
            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            count++;
            notEmpty.signal();
        } final {
            lock.unlock();
        }
    }
    public void fetch() throws InterruptedException {
        lock.lock();
        try {
            while(count == 0){
                notEmpty.await();
            }
            String result = buffer[front];
            front = (front + 1) % capacity;
            count--;
            notFull.signal();
            return result;
        } final {
            lock.unlock();
        }
    }
}

Reference: https://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html

Advertisements

Design a blocking queue

By blocking queue it means if the queue is empty, the dequeue thread should be blocked until some other thread enqueue anything. If the queue is full then the enqueue thread gets blocked until the dequeue thread dequeue anything from the queue.

public interface FixedSizeBlockingQueue<E> {

   // only initialize this queue once and throws Exception if the user is trying to initialize it multiple t times.
   public void init(int capacity) throws Exception;

   // throws Exception if the queue is not initialized
   public void push(E obj) throws Exception;

   // throws Exception if the queue is not initialized
   public E pop() throws Exception;

   // implement an atomic putList function which can put a list of object atomically. By atomically it mean the objs in the list should be next to each other in the queue. The size of the list could be larger than the queue capacity.
   // throws Exception if the queue is not initialized
   public void pushList(List<E> objs) throws Exception;
}
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBlockingQueue<E> {
    private int capacity;
    private Queue<E> queue;
    private Lock lock = new ReentrantLock();
    private Lock pushLock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public void init(int capacity) throws Exception {
        lock.lock();
        try {
            if(queue == null){
                queue = new LinkedList<>();
                capacity = capacity;
            } else {
                throw new Exception();
            }
        } finally {
            lock.unlock();
        }
    }

    public void push(E obj) throws Exception {
        pushLock.lock();
        lock.lock();
        try {
            while(capacity == queue.size()){
                notFull.await();
            }
            queue.add(obj);
            notEmpty.signal();
        } finally {
            lock.unlock();
            pushLock.unlock();
        }
    }

    public E pop() throws Exception {
        lock.lock();
        try {
            while(queue.size() == 0){
                notEmpty.await();
            }
            notFull.signal();
            return queue.poll():
        } finally {
            lock.unlock();
        }
    }
    public void pushList(List<E> objs) throws Exception {
        pushLock.lock();
        lock.lock();
        try {
            for(E obj : objs){
                while(queue.size == capacity){
                    notFull.await();
                }
                notEmpty.signal();
                queue.push(obj);
            }
        } finally {
            lock.unlock();
            pushLock.unlock();
        }
    }
}

reference: http://baozitraining.org/blog/design-and-implement-a-blocking-queue/