
Producer-Consumer Problem in Java

Mar 10, 2024

    What is the Producer-Consumer Problem?

    The Producer-Consumer Problem, also known as the Bounded Buffer Problem, entails multiple threads operating as either producers or consumers, sharing a common buffer (or storage).

    1. The Producer generates an item and adds it to the shared buffer.
    2. The Consumer retrieves an item from the same buffer for consumption.

    Accessing a buffer without synchronization can lead to data corruption, race conditions, and unpredictable behavior. A race condition occurs when multiple threads access a shared resource concurrently without proper synchronization, so the result depends on how their operations happen to interleave. For a buffer, this means one thread might be reading or writing while another is doing the same, which can leave interleaved or partial data behind and make the contents corrupted or inconsistent.

    In extreme cases, such as when multiple threads attempt to modify the same data simultaneously, it can lead to a situation where the data becomes completely corrupted or nonsensical. This is commonly referred to as “data corruption” or “data integrity issues.”
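
    To make the problem concrete, here is a minimal sketch of a race. It uses a shared counter rather than a buffer to keep things short, and the class and method names (`RaceDemo`, `runIncrements`) are made up for this illustration. Several threads increment a plain `int` and an `AtomicInteger` side by side; the plain field can lose updates, while the atomic one cannot.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RaceDemo {
    static int unsafeCounter = 0;                                  // plain field, no synchronization
    static final AtomicInteger safeCounter = new AtomicInteger();  // atomic read-modify-write

    // Runs `threads` threads that each increment both counters `perThread` times.
    static void runIncrements(int threads, int perThread) {
        unsafeCounter = 0;
        safeCounter.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    unsafeCounter++;                // three steps (read, add, write): updates can be lost
                    safeCounter.incrementAndGet();  // one atomic step: no update is lost
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();                      // wait until every worker has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
    }

    public static void main(String[] args) {
        runIncrements(4, 100_000);
        System.out.println("expected: " + 4 * 100_000);
        System.out.println("unsafe:   " + unsafeCounter);      // may be lower than expected
        System.out.println("safe:     " + safeCounter.get());  // matches expected
    }
}
```

    The unsafe total varies from run to run, which is exactly what makes this kind of bug hard to reproduce.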

    To avoid such problems, synchronization mechanisms such as locks, semaphores, or mutexes coordinate access to shared resources like buffers. A mutex (or binary semaphore) ensures that only one thread is inside the critical section at a time, while counting semaphores track how many slots are free or filled, so producers wait when the buffer is full and consumers wait when it is empty.

    In our case, we will use semaphores to fix this issue.

    My Implementation of Producer-Consumer in Java

    First of all, I added a couple of twists to the problem:

    1. The Producer's generation rate is controlled by a function that shortens the thread's sleep time as the number of consumer threads increases.

    2. A Manager thread controls production by pausing the Producer thread when the buffer reaches its maximum size and resuming it when the buffer is about to be empty.

    Now let’s start with coding in Java

    Semaphore class

    You can skip this and use Java's built-in `java.util.concurrent.Semaphore` class instead.

    java
    	public class Semaphore {
    	    private int count;
    	
    	    public Semaphore(int count) {
    	        this.count = count;
    	    }
    	
    	    public synchronized void acquire() throws InterruptedException {
    	        while (count <= 0) {
    	            wait();
    	        }
    	        count--;
    	    }
    	
    	    public synchronized void release() {
    	        count++;
    	        notify();
    	    }
    	}

    This class will help us synchronize the threads' access to the critical section, which is the buffer.
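
    If you would rather use the built-in class, the sketch below shows the same three-semaphore pattern with `java.util.concurrent.Semaphore`. The `BoundedBufferSketch` class and its `put`, `take`, and `transfer` methods are hypothetical names for this illustration and are not part of the project; the semaphore names `mutex`, `empty`, and `full` mirror the ones used later in this post.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class BoundedBufferSketch {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final Semaphore mutex = new Semaphore(1);   // guards `items`
    private final Semaphore empty;                      // counts free slots
    private final Semaphore full = new Semaphore(0);    // counts stored items

    BoundedBufferSketch(int capacity) {
        empty = new Semaphore(capacity);
    }

    void put(int item) throws InterruptedException {
        empty.acquire();                  // wait for a free slot
        try {
            mutex.acquire();
        } catch (InterruptedException e) {
            empty.release();              // hand the reserved slot back
            throw e;
        }
        try {
            items.add(item);
        } finally {
            mutex.release();
        }
        full.release();                   // one more item is available
    }

    int take() throws InterruptedException {
        full.acquire();                   // wait for an item
        try {
            mutex.acquire();
        } catch (InterruptedException e) {
            full.release();               // hand the reserved item back
            throw e;
        }
        int item;
        try {
            item = items.remove();
        } finally {
            mutex.release();
        }
        empty.release();                  // one more slot is free
        return item;
    }

    // Moves `count` numbers from a producer thread to the calling thread.
    static int[] transfer(int capacity, int count) {
        BoundedBufferSketch buffer = new BoundedBufferSketch(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int[] received = new int[count];
        try {
            for (int i = 0; i < count; i++) {
                received[i] = buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return received;
    }

    public static void main(String[] args) {
        int[] received = transfer(3, 10);
        System.out.println("received " + received.length + " items through a buffer of size 3");
    }
}
```

    With one producer and one consumer, items come out in the order they went in, even though the buffer never holds more than `capacity` of them at once.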

    Stock class

    This is the buffer class. I named it Stock, and it stores the products (just random numbers) generated by the Producer thread.

    java
    	import java.util.ArrayList;
    	import java.util.List;
    	
    	public class Stock {
    	    public static int MAXIMUM_STOCK = 40;
    	    public static int MINIMUM_STOCK = 10;
    	    private final List<Integer> storage = new ArrayList<>();
    	
    	    public void Store(int item) {
    	        storage.add(item);
    	    }
    	
    	    public int Retrieve() {
    	        // remove(0) takes the oldest item; List.removeFirst() needs Java 21+
    	        return storage.remove(0);
    	    }
    	
    	    public int getSize() {
    	        return storage.size();
    	    }
    	
    	    public boolean isMaximumStock() {
    	        return storage.size() >= MAXIMUM_STOCK;
    	    }
    	
    	    public boolean isMinimumStock() {
    	        return storage.size() <= MINIMUM_STOCK;
    	    }
    	}

    Factory Thread

    The factory thread will generate products and store them in the stock.

    java
    	public class Factory implements Runnable {
    	    // volatile: written by the Manager thread, read by the factory thread
    	    private volatile boolean productionStatus = true;
    	
    	    public boolean getProductionStatus() {
    	        return productionStatus;
    	    }
    	
    	    public void setProductionStatus(boolean x) {
    	        this.productionStatus = x;
    	    }
    	
    	    private int Produce() {
    	        return Utils.random.nextInt(10000);
    	    }
    	
    	    @Override
    	    public void run() {
    	        // The Manager pauses production by interrupting this thread and resumes it
    	        // by starting a new one, so this loop exits when paused or interrupted
    	        // instead of spinning in the background.
    	        while (productionStatus && !Thread.currentThread().isInterrupted()) {
    	            try {
    	                Manager.empty.acquire();
    	                try {
    	                    Manager.mutex.acquire();
    	                } catch (InterruptedException e) {
    	                    Manager.empty.release(); // hand the reserved slot back
    	                    return;
    	                }
    	
    	                //Critical Section
    	                int product = Produce();
    	                Manager.getStock().Store(product);
    	
    	                Debug.out("Factory Produced a Product (" + product + ").", Debug.Color.GREEN);
    	                Debug.out("stock: " + Manager.getStock().getSize(), Debug.Color.YELLOW);
    	
    	                Manager.mutex.release();
    	                Manager.full.release();
    	
    	                // Wait before producing the next item; more clients mean a shorter wait
    	                Thread.sleep(Utils.TimingCalculator(Client.getCount()));
    	            } catch (InterruptedException e) {
    	                return; // paused by the Manager
    	            }
    	        }
    	    }
    	}

    Don't forget the Utils class, which holds the function that controls the producer's generation rate.

    java
    	import java.util.Random;
    	
    	public class Utils {
    	    public static Random random = new Random();
    	
    	    public static int TimingCalculator(int input) {
    	        int initialMaxTime = 500;
    	        int minimalTime = 25;
    	        double factor = 1.5f;
    	        return Math.max((int) (initialMaxTime / Math.pow(factor, input - 1)), minimalTime);
    	    }
    	}
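
    To see what the formula does in practice, the sketch below prints the factory's wait time for 1 to 10 clients. The formula is copied from `Utils.TimingCalculator` so the example runs on its own; the `TimingTable` class name is made up for this illustration. The delay starts at 500 ms for one client, drops to 333 ms for two and 98 ms for five, and is clamped to the 25 ms floor from nine clients onward.

```java
class TimingTable {
    // Same formula as Utils.TimingCalculator, copied here so the sketch is self-contained.
    static int timingCalculator(int clients) {
        int initialMaxTime = 500;   // wait time with a single client, in ms
        int minimalTime = 25;       // lower bound on the wait time, in ms
        double factor = 1.5;        // each extra client divides the wait by this factor
        return Math.max((int) (initialMaxTime / Math.pow(factor, clients - 1)), minimalTime);
    }

    public static void main(String[] args) {
        for (int clients = 1; clients <= 10; clients++) {
            System.out.println(clients + " client(s) -> " + timingCalculator(clients) + " ms");
        }
    }
}
```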

    Client Thread

    The Client threads will consume the product from the stock.

    java
    	public class Client implements Runnable {
    	    private static int count = 0;
    	    private final int id;
    	
    	    public Client() {
    	        this.id = ++count;
    	    }
    	
    	    public static int getCount() {
    	        return Client.count;
    	    }
    	
    	    @Override
    	    public void run() {
    	        while (true) {
    	            try {
    	                Manager.full.acquire();
    	                Manager.mutex.acquire();
    	
    	                //Critical Section
    	                Debug.out("Client-" + id + " consumed a Product (" + Manager.getStock().Retrieve() + ").", Debug.Color.RED);
    	                Debug.out("Stock: " + Manager.getStock().getSize(), Debug.Color.YELLOW);
    	
    	                Manager.mutex.release();
    	                Manager.empty.release();
    	
    	                Thread.sleep(1000);
    	            } catch (InterruptedException e) {
    	                throw new RuntimeException(e);
    	            }
    	        }
    	    }
    	}

    Now for the most important thread in my implementation

    Manager Thread

    This thread initializes the semaphores used for synchronization, along with the stock and the factory. It then monitors the producer thread's status and the stock level: when the buffer is full it pauses production, and when the buffer is about to be empty it resumes production.

    java
    	public class Manager implements Runnable {
    	    public static Semaphore mutex = new Semaphore(1);
    	    public static Semaphore empty = new Semaphore(Stock.MAXIMUM_STOCK);
    	    public static Semaphore full = new Semaphore(0);
    	
    	    private static final Stock stock = new Stock();
    	    private static final Factory factory = new Factory();
    	    private static Thread factoryThread;
    	
    	    public Manager() {
    	        Manager.factoryThread = new Thread(factory);
    	    }
    	
    	    public static Stock getStock() {
    	        return stock;
    	    }
    	
    	    public static Thread getFactoryThread() {
    	        return factoryThread;
    	    }
    	
    	    @Override
    	    public void run() {
    	        while (true) {
    	            try {
    	                // Read the stock level under the mutex; ArrayList is not thread-safe
    	                mutex.acquire();
    	                boolean isFull = stock.isMaximumStock();
    	                boolean isLow = stock.isMinimumStock();
    	                mutex.release();
    	
    	                if (factory.getProductionStatus()) {
    	                    if (isFull) {
    	                        factoryThread.interrupt();
    	                        factory.setProductionStatus(false);
    	                        Debug.out("Factory Paused⏸️ Production", Debug.Color.CYAN);
    	                    }
    	                } else if (isLow) {
    	                    factory.setProductionStatus(true);
    	                    factoryThread = (new Thread(factory));
    	                    factoryThread.start();
    	                    Debug.out("Factory Resumed▶️ Production", Debug.Color.CYAN);
    	                }
    	
    	                // Poll every few milliseconds instead of spinning on sleep(0)
    	                Thread.sleep(10);
    	            } catch (InterruptedException e) {
    	                throw new RuntimeException(e);
    	            }
    	        }
    	    }
    	}
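
    The pause and resume rule is a form of hysteresis: production stops at the upper threshold and only restarts at the lower one, so the factory does not flip on and off around a single value. The sketch below isolates that rule from the threading code. The `ProductionGate` class is a hypothetical helper written for this illustration, using the same thresholds as `Stock`.

```java
class ProductionGate {
    static final int MAXIMUM_STOCK = 40;   // pause threshold, same value as Stock.MAXIMUM_STOCK
    static final int MINIMUM_STOCK = 10;   // resume threshold, same value as Stock.MINIMUM_STOCK

    private boolean producing = true;

    // Feeds one stock reading into the gate and returns whether production is on afterwards.
    boolean update(int stockSize) {
        if (producing && stockSize >= MAXIMUM_STOCK) {
            producing = false;             // buffer is full: pause
        } else if (!producing && stockSize <= MINIMUM_STOCK) {
            producing = true;              // buffer is nearly empty: resume
        }
        return producing;                  // between the thresholds the state is unchanged
    }

    public static void main(String[] args) {
        ProductionGate gate = new ProductionGate();
        int[] readings = {5, 39, 40, 39, 25, 11, 10, 11, 40};
        for (int size : readings) {
            System.out.println("stock=" + size + " producing=" + gate.update(size));
        }
    }
}
```

    Note that a reading of 25 keeps production paused on the way down but would keep it running on the way up; the state depends on which threshold was crossed last.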

    Finally, everything is set.

    Main Class

    Once every class and thread is implemented, the main function creates the threads and starts them.

    java
    	import java.util.Scanner;
    	
    	public class Main {
    	    public static Scanner read = new Scanner(System.in);
    	
    	    public static void main(String[] args) throws InterruptedException {
    	        //Create a Manager object for maintaining
    	        Manager manager = new Manager();
    	
    	        //Get the number of clients
    	        Debug.out("Enter number of clients: ");
    	        int numClients = read.nextInt();
    	
    	        //Display Speed/Wait Time of the Factory
    	        Debug.out("Factory Timing: " + Utils.TimingCalculator(numClients) + "ms", Debug.Color.CYAN);
    	
    	        //Create client objects and threads
    	        Client[] clients = new Client[numClients];
    	        Thread[] clientThreads = new Thread[numClients];
    	        for (int i = 0; i < numClients; i++) {
    	            clients[i] = new Client();
    	            clientThreads[i] = new Thread(clients[i]);
    	        }
    	
    	        //Get Factory thread from Manager and create Manager thread
    	        Thread factoryThread = Manager.getFactoryThread();
    	        Thread managerThread = new Thread(manager);
    	
    	        //Start Threads
    	        managerThread.start();
    	        factoryThread.start();
    	        for (Thread clientThread : clientThreads) {
    	            clientThread.start();
    	        }
    	
    	        //wait for Threads to complete
    	        managerThread.join();
    	        factoryThread.join();
    	        for (Thread clientThread : clientThreads) {
    	            clientThread.join();
    	        }
    	    }
    	}

    The output should look like this:

    bash
    	Enter number of clients:
    	5
    	Factory Timing: 98ms
    	Factory Produced a Product (5810).
    	stock: 1
    	Client-1 consumed a Product (5810).
    	Stock: 0
    	Factory Produced a Product (1132).

    When the stock fills up:

    bash
    	Factory Produced a Product (7226).
    	stock: 39
    	Factory Produced a Product (5621).
    	stock: 40
    	Factory Paused⏸️ Production
    	Client-1 consumed a Product (9236).
    	Stock: 39
    	Client-3 consumed a Product (8003).
    	Stock: 38
    	Client-2 consumed a Product (4937).
    	Stock: 37

    When the stock is about to run out (10 products or fewer), the Manager thread prints "Factory Resumed▶️ Production" and starts the factory again.

    The full Java implementation is available in my repo, Producer-Consumer-Java.

