Academic Integrity: tutoring, explanations, and feedback — we don’t complete graded work or submit on a student’s behalf.

3. Barrier – 35% a) Implement a barrier for pthreads in C++ using pthread condit

ID: 3847370 • Letter: 3

Question

3. Barrier – 35% a) Implement a barrier for pthreads in C++ using pthread condition variables and mutexes. Consider a process with N running threads. During their execution, all threads call repeatedly the barrier’s wait() method. For the 1st, 2nd, ..., (N-1)th call, the wait() function blocks (suspends) the calling thread. The Nth thread that calls this function will unblock all suspended threads and then will return. All threads that were previously blocked in wait() will now return from wait() and will resume their execution. They will continue until their next call to the barrier’s wait() method. In effect, the slowest thread will determine the overall progress of the application. Note that the identity and type of thread are irrelevant in deciding to suspend it or “raise” the barrier in wait(): only the Nth call to wait() will resume all processes, while all other call to wait() will suspend the caller thread. Here is an illustration of a barrier from the Intro to Parallel Computing guide from LLNL: (Analogy: imagine a real-world barrier on a highway. Cars that arrive at the barrier have to stop and wait for the barrier to be raised. For each Nth car that arrives, the barrier is raised and all N cars waiting can now proceed. Right after the last car passes, the barrier is lowered again.) A barrier is useful to control a group of threads so they all proceed in sync at specific points in the program. Here is how a barrier object may be used: // the main thread: // barrier object declared as a global variable, to be accessible from all threads: Barrier barrier(N); // assume N was defined as an int somewhere // child threads run this thread function or similar: void* thread_fun(void* param) { while (not_exit) { // thread runs in a loop // do some work barrier.wait(); // suspend all calling threads until the Nth thread makes the call, // after which all threads will resume and return from wait // do some more work } } The Barrier class must have at least a public constructor, a destructor, and a method wait(): class Barrier { public: Barrier(int n); // n is the number of threads (N) ~Barrier(); // destructor: cleanup, destroy mutex, condvar, etc. void wait(); // … may need a condition variable, a mutex, and some other attributes }; A Barrier object must encapsulate all necessary synchronization objects (mutex and condition variable, plus something else) to do its job. Use the principle of encapsulation and keep private the instance variables. b. Write a multithreaded application in C++ that demonstrates how your barrier works.

Explanation / Answer

linux_thread.cpp

#include "linux_thread.h"

ThreadHandle ThreadCreate(THREAD_FUN (* pThreadFun)( void * ), void * pParam)
{
    pthread_t threadID;
    pthread_create(&threadID, NULL, (THREAD_FUN)pThreadFun, pParam);
    return threadID;
}

void ThreadExit(void)
{
pthread_exit(0);
}

void CloseThreadHandle(ThreadHandle hThread)
{
   //
    //CloseHandle((HANDLE) hThread);
}

void WaitForThread(ThreadHandle hThread)
{
    void * retval;
    pthread_join(hThread, &retval);
}

linux_semaphore.cpp

#include <errno.h>
#include "linux_semaphore.h"

int SemCreate(int nInitVal, SemHandle *pHandle)
{
   //static CGSemHandle hSem;
   int nRes = sem_init(pHandle, 0, nInitVal);
   DEBUGMSG("init sem:%d res:%d   err:%d ", pHandle, nRes, errno);
   if (nRes == 0)
       return 0;
   else
       return -1;
   //return (nRes == 0)?hSem:(CGSemHandle)0;
}

void SemDestroy(SemHandle *pSem)
{
   sem_destroy(pSem);
}

void SemPost(SemHandle *pSem)
{
   DEBUGMSG("post sem:%d ", pSem);
   sem_post(pSem);
}

void SemWait(SemHandle *pSem)
{
   int nErr;
   do
   {
       //DEBUGMSG("wait sem:%d ", pSem);
           nErr = sem_wait(pSem);
   } while (nErr == -1 && errno == EINTR);
}

linux_timedsem.cpp


#include "linux_timedsem.h"

int TimedSemCreate(TimedSemHandle *pHandle, int nInitVal)
{
   int res;
   pHandle->nCount = nInitVal;
   res = pthread_mutex_init(&pHandle->mtx, 0);
   if (res != 0) return res;
res = pthread_cond_init(&pHandle->cond, 0);
if (res != 0)
   {
       pthread_mutex_destroy(&pHandle->mtx);
       return res;
   }
   return res;
}

void TimedSemDestroy(TimedSemHandle hSem)
{
   pthread_cond_destroy(&hSem.cond);
    pthread_mutex_destroy(&hSem.mtx);
}

void TimedSemPost(TimedSemHandle hSem)
{
   pthread_mutex_lock(&hSem.mtx);
    hSem.nCount++;
    pthread_cond_signal(&hSem.cond);
    pthread_mutex_unlock(&hSem.mtx);
}

bool TimedSemWait(TimedSemHandle hSem, int nTimeOut)
{
   struct timespec abs_ts;
    struct timeval cur_tv;
   int rc;
   pthread_mutex_lock(&hSem.mtx);
    while (hSem.nCount <= 0)
    {
        if (nTimeOut >= 0)
        {
            gettimeofday(&cur_tv, NULL);
            abs_ts.tv_sec = cur_tv.tv_sec + nTimeOut / 1000;
            abs_ts.tv_nsec = cur_tv.tv_usec * 1000
                + (nTimeOut % 1000) * 1000000;
            rc = pthread_cond_timedwait(&hSem.cond, &hSem.mtx, &abs_ts);
            if (rc == ETIMEDOUT) {
                pthread_mutex_unlock(&hSem.mtx);
                return false;
            }
        }
        else
            pthread_cond_wait(&hSem.cond, &hSem.mtx);
    }
    hSem.nCount--;
    pthread_mutex_unlock(&hSem.mtx);
    return true;
}

win32_semaphore.cpp

#include "../include/common_semaphore.h"

int SemCreate(INT32 nInitVal, SemHandle *pHandle)
{
   if ((*pHandle = CreateSemaphore(NULL, nInitVal, 65535, NULL))!= NULL)
       return 0;
   else
       return -1;
}

void SemDestroy(SemHandle *pSem)
{
   CloseHandle(*pSem);
}

void SemPost(SemHandle *pSem)
{
   ReleaseSemaphore(*pSem, 1, NULL);
}

void SemWait(SemHandle *pSem)
{
   WaitForSingleObject(*pSem, INFINITE);
}

win32_thread.cpp

#include "../include/common_thread.h"

ThreadHandle ThreadCreate(THREADFUNC_TYPE (* pThreadFun)( void * ), void * pParam)
{
    return (ThreadHandle)_beginthreadex(NULL, 0, (THREADFUNC_TYPE) pThreadFun, pParam, NULL, NULL);
}

void ThreadExit(void)
{
    _endthreadex(0);
}

void CloseThreadHandle(ThreadHandle hThread)
{
    CloseHandle((HANDLE) hThread);
}

void WaitForThread(ThreadHandle hThread)
{
    WaitForSingleObject((HANDLE)hThread, INFINITE);
}

win32_timedsem.cpp

#include "../include/common_timedsem.h"

void TimedSemCreate(TimedSemHandle *pHandle, int nInitVal)
{
   *pHandle = CreateSemaphore(NULL, nInitVal, 65535, NULL);
}

void TimedSemDestroy(TimedSemHandle hSem)
{
   CloseHandle(hSem);
}

void TimedSemPost(TimedSemHandle hSem)
{
   ReleaseSemaphore(hSem, 1, NULL);
}

bool TimedSemWait(TimedSemHandle hSem, int nTimeOut)
{
   WaitForSingleObject(hSem, nTimeOut);
   return true;
}
linux_thread.h

#ifndef __LINUX_THREAD_H__
#define __LINUX_THREAD_H__

#include <pthread.h>

#define   ThreadMutex                           pthread_mutex_t                          
#define   InitMutex(mutex)               pthread_mutex_init(mutex, NULL)          
#define   DestroyMutex(mutex)           pthread_mutex_destroy(mutex)  
#define   EnterMutex(mutex)               pthread_mutex_lock(mutex)              
#define   LeaveMutex(mutex)               pthread_mutex_unlock(mutex)                  
#define   ThreadHandle                       pthread_t                          
#define   THREADFUNC_TYPE                   void *                                                          
typedef void * (* THREAD_FUN)(void *);                                                          


#endif

linux_timedsem.h

#ifndef __CG_LINUX_TIMEDSEM_H__
#define __CG_LINUX_TIMEDSEM_H__

#include <errno.h>
#include <sys/time.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <pthread.h>

#include "common.h"
typedef struct __TIMEDSEM
{
    int nCount;
    pthread_mutex_t mtx;
    pthread_cond_t cond;
}
TimedSemHandle;

#endif

linux_semaphore.h

#ifndef __CG_LINUX_SEMAPHORE_H__
#define __CG_LINUX_SEMAPHORE_H__

#include <pthread.h>
#include <semaphore.h>

#include "common.h"
typedef sem_t SemHandle;
#endif

win32_semaphore.h

#ifndef __WIN32_SEMAPHORE_H__
#define __WIN32_SEMAPHORE_H__
#define   WIN32_LEAN_AND_MEAN
#include <windows.h>

typedef HANDLE SemHandle;

#endif

win32_thread.h

#ifndef __WIN32_THREAD_H__
#define __WIN32_THREAD_H__

#define   WIN32_LEAN_AND_MEAN   /**<Windows*/
#include <windows.h>
#include <process.h>

/************* mutex (use CRITICAL_SECTION in windows) ***************/
#define   ThreadMutex           CRITICAL_SECTION                  
#define   InitMutex(mutex)     InitializeCriticalSection(mutex)      
#define   DestroyMutex(mutex)   DeleteCriticalSection(mutex)          
#define   EnterMutex(mutex)   EnterCriticalSection(mutex)      
#define   LeaveMutex(mutex)   LeaveCriticalSection(mutex)              
#define   ThreadHandle        HANDLE                              
#define   THREADFUNC_TYPE       THREAD_FUN                          

typedef unsigned int (__stdcall * THREAD_FUN)(void *);      


#endif

win32_timedsem.h

#ifndef __WIN32_TIMEDSEM_H__
#define __WIN32_TIMEDSEM_H__
#define   WIN32_LEAN_AND_MEAN
#include <windows.h>
#include "common.h"
typedef HANDLE TimedSemHandle;

#endif

common.h


#ifndef __CG_COMMON_H__
#define __CG_COMMON_H__

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>

#ifndef __cplusplus
#define true 1      
#define false 0      
#define inline      
#endif

#ifdef WIN32
#pragma   warning(disable:4996)
#define   WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <Winsock2.h>
#include <shlwapi.h>
//#include <ws2tcpip.h>
#include <objbase.h>
#ifndef __cplusplus
typedef unsigned char       bool;
#endif
typedef signed char         INT8, *PINT8;
typedef signed short        INT16, *PINT16;
typedef signed int          INT32, *PINT32;
typedef signed __int64      INT64, *PINT64;
typedef unsigned char       UINT8, *PUINT8;
typedef unsigned short      UINT16, *PUINT16;
typedef unsigned int        UINT32, *PUINT32;
typedef unsigned __int64    UINT64, *PUINT64;
typedef float               FP32, *PFP32;
typedef double               FP64, *PFP64;
typedef struct { unsigned char octet[16]; } UINT128;

#pragma   comment   (   lib,   "Shlwapi.lib"   )
#ifdef __DEBUG__
   static void DEBUGMSGOUT(const char* pszFormat, ...)
   {
       char buf[40960]={0};
       va_list arglist;
       va_start(arglist, pszFormat);
       vsprintf(&buf[strlen(buf)], pszFormat, arglist);
       va_end(arglist);
       OutputDebugStringA(buf);
       printf("%s", buf);
   }
   #define DEBUGMSG DEBUGMSGOUT("[%s %d %s] ", __FILE__, __LINE__, __FUNCTION__); DEBUGMSGOUT
#else
   #define DEBUGMSG
#endif
#endif

#ifdef _LINUX
#include <unistd.h>
#include <sys/time.h>
#include <uuid/uuid.h>
#ifndef __cplusplus
typedef unsigned char       bool;
#endif
#define TRUE 1
#define FALSE 0
typedef unsigned char boolean;
typedef signed char         INT8, *PINT8, CHAR;
typedef signed short        INT16, *PINT16;
typedef signed int          INT32, *PINT32;
typedef signed long long    INT64, *PINT64;
typedef unsigned char       UINT8, *PUINT8, UCHAR;
typedef unsigned short      UINT16, *PUINT16;
typedef unsigned int        UINT32, *PUINT32, DWORD;
typedef unsigned long long UINT64, *PUINT64;
typedef float               FP32, *PFP32;
typedef double               FP64, *PFP64;
typedef struct { unsigned char octet[16]; } UINT128;

#ifdef __DEBUG__
   #define DEBUGMSG(x...) do{printf("[%s %d %s] ", __FILE__, __LINE__, __FUNCTION__);printf(x); }while(0)
#else
   #define DEBUGMSG(x...)
#endif

//handle type
typedef UINT32 HWND;
typedef void* HANDLE;
#define __stdcall
#endif

extern void SRand(void);


extern int Rand(void);


extern void SleepMs(int mSec);


extern int GetTimeofDay(struct timeval* tp);


extern char *GetCurrPath(void);


extern char *GetTmpPath(void);


extern const char* newGUID();


extern bool DirExists(char *szPathName);


extern bool CreateDir(char *szPathName);

#endif

common_semaphore.h

#ifndef __SEMAPHORE_H__
#define __SEMAPHORE_H__

#include "common.h"

#ifdef WIN32
#include "win32_semaphore.h"
#endif

#ifdef _LINUX
#include "linux_semaphore.h"
#endif


extern int SemCreate(INT32 nInitVal, SemHandle *pHandle);


extern void SemDestroy(SemHandle *pSem);


extern void SemPost(SemHandle *pSem);


extern void SemWait(SemHandle *pSem);

#endif

common_thread.h

#ifndef __CCOMMON_THREAD_H__
#define __CCOMMON_THREAD_H__

#include "common.h"

#ifdef WIN32 //define the macro for Win32 thread
#include "win32_thread.h"
#endif

#ifdef _LINUX
#include "linux_thread.h"
#endif

extern ThreadHandle ThreadCreate(THREAD_FUN (* pThreadFun)( void * ), void * pParam);

extern void ThreadExit();

extern void CloseThreadHandle(ThreadHandle hThread);

extern void WaitForThread(ThreadHandle hThread);

#endif

common_timedsem.h


#ifndef __TIMEDSEM_H__
#define __TIMEDSEM_H__

#include "common.h"

#ifdef WIN32
#include "win32_timedsem.h"
#endif

#ifdef _LINUX
#include "linux_timedsem.h"
#endif

extern void TimedSemCreate(TimedSemHandle *pHandle, int nInitVal);


extern void TimedSemDestroy(TimedSemHandle hSem);


extern void TimedSemPost(TimedSemHandle hSem);


extern bool TimedSemWait(TimedSemHandle hSem, int nTimeOut);


#endif

Hire Me For All Your Tutoring Needs
Integrity-first tutoring: clear explanations, guidance, and feedback.
Drop an Email at
drjack9650@gmail.com
Chat Now And Get Quote