Academic Integrity: tutoring, explanations, and feedback — we don’t complete graded work or submit on a student’s behalf.

Pthreads programming: producer/consumer problem without synchronization. Write…

ID: 3849991 • Letter: P

Question

Pthreads programming: producer/consumer problem without synchronization. Write a C++ program that simulates the producer/consumer problem without synchronization. The producer thread will fill 7 buffers with the numbers 1 to 7. Whenever a number is placed in a buffer, the thread displays a message saying, "buffer # has value of #". When the producer thread has completed processing, the consumer thread will then clear each buffer in sequence and display a message indicating the number of the cleared buffer. No synchronization is required for this problem.

Explanation / Answer

BoundedThreadSafeQueue

Code

#include <iostream>

#include <queue>

#include <boost/thread.hpp>

#include <boost/thread/locks.hpp>

// Queue class that can be used in multithreading context

template <typename T>

class BoundedThreadSafeQueue

{

private:

    std::queue<T> m_queue;            // Use STL queue to store data

    boost::mutex m_mutex;             // The mutex to synchronise on

    boost::condition_variable m_QueueHasData; // The condition to wait for if queue is empty

    boost::condition_variable m_QueueHasRoom; // The condition to wait for if queue is full

    unsigned int m_Size;              // max queue size

public:

    BoundedThreadSafeQueue(unsigned int Size)

        : m_Size(Size)

    {}

    bool Empty()

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        return m_queue.empty();

    }

    bool Full()

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        return m_queue.size() == m_Size;

    }

    // Push new data on the queue and notify other threads

    // waiting for data in this queue

    bool TryPush(const T &data)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);       

        // Fail if queue full

        if (m_queue.size() == m_Size) return false;

        // Add the data to the queue

        m_queue.push(data);

        // Notify others that data is ready

        m_QueueHasData.notify_one();

        return true;

    } // Lock is automatically released here

    // Try to push data in queue

    // Wait until room in queue

    void Push(const T &data)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        // While queue is full, wait

        // warning: do not replace m_queue.size() == m_Size with Full():

        // it will deadlock due to trying to acquire the same m_mutex

        // Push has already acquired

        while (m_queue.size() == m_Size) m_QueueHasRoom.wait(lock);

        // Now push the data

        m_queue.push(data);

        // And warn threads that are waiting for data

        m_QueueHasData.notify_one();

    }

    // Get data from the queue.

    // Return false if no data available

    bool TryPop(T &result)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        // When there is no data, return false

        if (m_queue.size() == 0) return false;

        // Otherwise return the data

        // Retrieve the data from the queue

        result=m_queue.front(); m_queue.pop();

        // Warn threads who are waiting to push data

        m_QueueHasRoom.notify_one();

        return true;

        // Lock is automatically released here

    }

    // Get data from the queue.

    // Wait for data if not available

    void WaitAndPop(T &result)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        // When there is no data, wait till someone fills it.

        // Lock is automatically released in the wait and obtained

        // again after the wait

        while (m_queue.size() == 0) m_QueueHasData.wait(lock);

        // Retrieve the data from the queue

        result=m_queue.front(); m_queue.pop();

        // Warn threads who are waiting to push data

        m_QueueHasRoom.notify_one();

     } // Lock is automatically released here

};

Test program

int main()

{

    std::cout << "About to construct queue" << std::endl;

    BoundedThreadSafeQueue<int> Q(3);

    std::cout << "About to construct producer" << std::endl;

    Producer<int, BoundedThreadSafeQueue> P(Q, 10, 0);

    std::cout << "About to construct consumer" << std::endl;

    Consumer<int, BoundedThreadSafeQueue> C(Q, 10, 1);

    std::cout << "About to start producer" << std::endl;

    P.Start();

    std::cout << "About to start consumer" << std::endl;

    C.Start();

    std::cout << "Waiting for producer to finish" << std::endl;

    P.WaitUntilFinished();

    std::cout << "Waiting for consumer to finish" << std::endl;

    C.WaitUntilFinished();

    std::cout << "Queue should be empty after all threads finished: " << Q.Empty() << std::endl;

    return 0;

}

Output

Discussion

#include <iostream>

#include <queue>

#include <boost/thread.hpp>

#include <boost/thread/locks.hpp>

// Queue class that can be used in multithreading context

template <typename T>

class BoundedThreadSafeQueue

{

private:

    std::queue<T> m_queue;            // Use STL queue to store data

    boost::mutex m_mutex;             // The mutex to synchronise on

    boost::condition_variable m_QueueHasData; // The condition to wait for if queue is empty

    boost::condition_variable m_QueueHasRoom; // The condition to wait for if queue is full

    unsigned int m_Size;              // max queue size

public:

    BoundedThreadSafeQueue(unsigned int Size)

        : m_Size(Size)

    {}

    bool Empty()

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        return m_queue.empty();

    }

    bool Full()

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        return m_queue.size() == m_Size;

    }

    // Push new data on the queue and notify other threads

    // waiting for data in this queue

    bool TryPush(const T &data)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);       

        // Fail if queue full

        if (m_queue.size() == m_Size) return false;

        // Add the data to the queue

        m_queue.push(data);

        // Notify others that data is ready

        m_QueueHasData.notify_one();

        return true;

    } // Lock is automatically released here

    // Try to push data in queue

    // Wait until room in queue

    void Push(const T &data)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        // While queue is full, wait

        // warning: do not replace m_queue.size() == m_Size with Full():

        // it will deadlock due to trying to acquire the same m_mutex

        // Push has already acquired

        while (m_queue.size() == m_Size) m_QueueHasRoom.wait(lock);

        // Now push the data

        m_queue.push(data);

        // And warn threads that are waiting for data

        m_QueueHasData.notify_one();

    }

    // Get data from the queue.

    // Return false if no data available

    bool TryPop(T &result)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        // When there is no data, return false

        if (m_queue.size() == 0) return false;

        // Otherwise return the data

        // Retrieve the data from the queue

        result=m_queue.front(); m_queue.pop();

        // Warn threads who are waiting to push data

        m_QueueHasRoom.notify_one();

        return true;

        // Lock is automatically released here

    }

    // Get data from the queue.

    // Wait for data if not available

    void WaitAndPop(T &result)

    {

        // Acquire lock on the queue

        boost::unique_lock<boost::mutex> lock(m_mutex);

        // When there is no data, wait till someone fills it.

        // Lock is automatically released in the wait and obtained

        // again after the wait

        while (m_queue.size() == 0) m_QueueHasData.wait(lock);

        // Retrieve the data from the queue

        result=m_queue.front(); m_queue.pop();

        // Warn threads who are waiting to push data

        m_QueueHasRoom.notify_one();

     } // Lock is automatically released here

};