3. Barrier – 35% a) Implement a barrier for pthreads in C++ using pthread condit
ID: 3847370 • Letter: 3
Question
3. Barrier – 35% a) Implement a barrier for pthreads in C++ using pthread condition variables and mutexes. Consider a process with N running threads. During their execution, all threads repeatedly call the barrier’s wait() method. For the 1st, 2nd, ..., (N-1)th call, the wait() function blocks (suspends) the calling thread. The Nth thread that calls this function will unblock all suspended threads and then will return. All threads that were previously blocked in wait() will now return from wait() and will resume their execution. They will continue until their next call to the barrier’s wait() method. In effect, the slowest thread will determine the overall progress of the application. Note that the identity and type of thread are irrelevant in deciding to suspend it or “raise” the barrier in wait(): only the Nth call to wait() will resume all threads, while all other calls to wait() will suspend the calling thread. Here is an illustration of a barrier from the Intro to Parallel Computing guide from LLNL: (Analogy: imagine a real-world barrier on a highway. Cars that arrive at the barrier have to stop and wait for the barrier to be raised. For each Nth car that arrives, the barrier is raised and all N cars waiting can now proceed. Right after the last car passes, the barrier is lowered again.) A barrier is useful to control a group of threads so they all proceed in sync at specific points in the program. 
Here is how a barrier object may be used:

// the main thread:
// barrier object declared as a global variable, to be accessible from all threads:
Barrier barrier(N); // assume N was defined as an int somewhere

// child threads run this thread function or similar:
void* thread_fun(void* param) {
    while (not_exit) { // thread runs in a loop
        // do some work
        barrier.wait(); // suspend all calling threads until the Nth thread makes the call,
                        // after which all threads will resume and return from wait
        // do some more work
    }
}

The Barrier class must have at least a public constructor, a destructor, and a method wait():

class Barrier {
public:
    Barrier(int n); // n is the number of threads (N)
    ~Barrier();     // destructor: cleanup, destroy mutex, condvar, etc.
    void wait();
    // … may need a condition variable, a mutex, and some other attributes
};

A Barrier object must encapsulate all necessary synchronization objects (mutex and condition variable, plus something else) to do its job. Use the principle of encapsulation and keep the instance variables private.
b) Write a multithreaded application in C++ that demonstrates how your barrier works.
Explanation / Answer
linux_thread.cpp
#include "linux_thread.h"
/**
 * Start a new POSIX thread that runs pThreadFun(pParam).
 *
 * @param pThreadFun  Thread entry point; it is cast to THREAD_FUN before being
 *                    handed to pthread_create().
 * @param pParam      Opaque argument forwarded to the entry point.
 * @return The ID of the new thread, or a zero handle when pthread_create()
 *         fails, so callers never receive an indeterminate value.
 */
ThreadHandle ThreadCreate(THREAD_FUN (* pThreadFun)( void * ), void * pParam)
{
    /* Start from a known value: the ID is not meaningful if creation fails. */
    pthread_t threadID = 0;

    if (pthread_create(&threadID, NULL, (THREAD_FUN)pThreadFun, pParam) != 0)
        return (ThreadHandle)0;

    return threadID;
}
/** Terminate the calling thread, reporting a null exit value. */
void ThreadExit(void)
{
    pthread_exit(NULL);
}
/**
 * Release a thread handle.
 *
 * The Linux build intentionally does nothing here; the function exists so that
 * callers can use the same API as the Win32 build, which closes the HANDLE.
 *
 * @param hThread  Thread handle (unused).
 */
void CloseThreadHandle(ThreadHandle hThread)
{
    (void)hThread; /* nothing to release */
}
/**
 * Block until the given thread has terminated.
 *
 * @param hThread  Thread to join; its exit value is discarded.
 */
void WaitForThread(ThreadHandle hThread)
{
    /* A NULL result pointer tells pthread_join() we do not want the exit value. */
    pthread_join(hThread, NULL);
}
linux_semaphore.cpp
#include <errno.h>
#include "linux_semaphore.h"
/**
 * Initialise a POSIX semaphore in caller-provided storage.
 *
 * @param nInitVal  Initial semaphore count.
 * @param pHandle   Storage for the semaphore; initialised with sem_init()
 *                  using a pshared argument of 0.
 * @return 0 on success, -1 if sem_init() fails (errno is left as set by it).
 */
int SemCreate(int nInitVal, SemHandle *pHandle)
{
    int nRes = sem_init(pHandle, 0, nInitVal);

    /* Print the address with %p: passing a pointer for %d is a format mismatch. */
    DEBUGMSG("init sem:%p res:%d err:%d ", (void *)pHandle, nRes, errno);

    return (nRes == 0) ? 0 : -1;
}
/**
 * Destroy a semaphore previously set up with SemCreate().
 *
 * @param pSem  Semaphore to destroy; the result of sem_destroy() is ignored.
 */
void SemDestroy(SemHandle *pSem)
{
    (void)sem_destroy(pSem);
}
/**
 * Increment (post) the semaphore.
 *
 * @param pSem  Semaphore to post; the result of sem_post() is ignored.
 */
void SemPost(SemHandle *pSem)
{
    /* Print the address with %p: passing a pointer for %d is a format mismatch. */
    DEBUGMSG("post sem:%p ", (void *)pSem);
    sem_post(pSem);
}
/**
 * Decrement (wait on) the semaphore, blocking until it can be taken.
 *
 * The wait is restarted whenever sem_wait() fails with EINTR; any other
 * outcome, success or failure, ends the call.
 *
 * @param pSem  Semaphore to wait on.
 */
void SemWait(SemHandle *pSem)
{
    for (;;)
    {
        if (sem_wait(pSem) != -1)
            break;          /* acquired */
        if (errno != EINTR)
            break;          /* failure other than an interrupted wait: give up */
    }
}
linux_timedsem.cpp
#include "linux_timedsem.h"
/**
 * Initialise a timed semaphore: a counter guarded by a mutex plus a condition
 * variable.
 *
 * @param pHandle   Semaphore object to initialise.
 * @param nInitVal  Initial count.
 * @return 0 on success, otherwise the error code returned by
 *         pthread_mutex_init() or pthread_cond_init(). If the condition
 *         variable cannot be created the mutex is destroyed again.
 */
int TimedSemCreate(TimedSemHandle *pHandle, int nInitVal)
{
    int rc;

    pHandle->nCount = nInitVal;

    rc = pthread_mutex_init(&pHandle->mtx, NULL);
    if (rc == 0)
    {
        rc = pthread_cond_init(&pHandle->cond, NULL);
        if (rc != 0)
            pthread_mutex_destroy(&pHandle->mtx); /* roll back the half-built object */
    }

    return rc;
}
/**
 * Destroy the condition variable and mutex of a timed semaphore.
 *
 * NOTE(review): hSem is received by value, so on Linux (where TimedSemHandle
 * is a struct) the calls below operate on a copy of the caller's mutex and
 * condition variable. Confirm whether a pointer was intended.
 *
 * @param hSem  Semaphore to tear down.
 */
void TimedSemDestroy(TimedSemHandle hSem)
{
    pthread_cond_t  *pCond = &hSem.cond;
    pthread_mutex_t *pMtx  = &hSem.mtx;

    /* Condition variable first, then the mutex that guards it. */
    pthread_cond_destroy(pCond);
    pthread_mutex_destroy(pMtx);
}
/**
 * Post the timed semaphore: increment the count under the mutex and signal
 * one waiter.
 *
 * NOTE(review): hSem is received by value, so on Linux (where TimedSemHandle
 * is a struct) the increment and the signal act on a local copy rather than
 * on the caller's object. Confirm whether a pointer was intended.
 *
 * @param hSem  Semaphore to post.
 */
void TimedSemPost(TimedSemHandle hSem)
{
    pthread_mutex_t *pMtx = &hSem.mtx;

    pthread_mutex_lock(pMtx);
    hSem.nCount += 1;
    pthread_cond_signal(&hSem.cond);
    pthread_mutex_unlock(pMtx);
}
/**
 * Wait on the timed semaphore, optionally giving up after a timeout.
 *
 * @param hSem      Semaphore to wait on.
 *                  NOTE(review): received by value, so on Linux this operates
 *                  on a copy of the caller's struct; confirm the intent.
 * @param nTimeOut  Timeout in milliseconds; a negative value waits forever.
 * @return true when the count was taken, false when the timeout expired or
 *         the underlying wait reported an error.
 */
bool TimedSemWait(TimedSemHandle hSem, int nTimeOut)
{
    struct timespec abs_ts;
    int rc;

    if (nTimeOut >= 0)
    {
        /*
         * Compute the absolute deadline once, before waiting, so that wake-ups
         * which find the count still at zero do not push the deadline back.
         */
        struct timeval cur_tv;

        gettimeofday(&cur_tv, NULL);
        abs_ts.tv_sec = cur_tv.tv_sec + nTimeOut / 1000;
        abs_ts.tv_nsec = cur_tv.tv_usec * 1000L
                       + (nTimeOut % 1000) * 1000000L;

        /* tv_nsec must stay below one second; carry the overflow into tv_sec. */
        if (abs_ts.tv_nsec >= 1000000000L)
        {
            abs_ts.tv_sec += 1;
            abs_ts.tv_nsec -= 1000000000L;
        }
    }

    pthread_mutex_lock(&hSem.mtx);
    while (hSem.nCount <= 0)
    {
        if (nTimeOut >= 0)
            rc = pthread_cond_timedwait(&hSem.cond, &hSem.mtx, &abs_ts);
        else
            rc = pthread_cond_wait(&hSem.cond, &hSem.mtx);

        /* ETIMEDOUT or any other failure: stop instead of spinning on the error. */
        if (rc != 0)
        {
            pthread_mutex_unlock(&hSem.mtx);
            return false;
        }
    }
    hSem.nCount--;
    pthread_mutex_unlock(&hSem.mtx);
    return true;
}
win32_semaphore.cpp
#include "../include/common_semaphore.h"
/**
 * Create a Win32 semaphore and store its handle in *pHandle.
 *
 * @param nInitVal  Initial count passed to CreateSemaphore().
 * @param pHandle   Receives the handle returned by CreateSemaphore()
 *                  (NULL when creation fails).
 * @return 0 when a non-NULL handle was obtained, -1 otherwise.
 */
int SemCreate(INT32 nInitVal, SemHandle *pHandle)
{
    *pHandle = CreateSemaphore(NULL, nInitVal, 65535, NULL);

    return (*pHandle != NULL) ? 0 : -1;
}
/**
 * Close the semaphore handle stored in *pSem.
 *
 * @param pSem  Points at the handle obtained from SemCreate().
 */
void SemDestroy(SemHandle *pSem)
{
    SemHandle hSem = *pSem;

    CloseHandle(hSem);
}
/**
 * Release the semaphore once (count + 1).
 *
 * @param pSem  Points at the semaphore handle; the previous count is not
 *              requested and the result of ReleaseSemaphore() is ignored.
 */
void SemPost(SemHandle *pSem)
{
    SemHandle hSem = *pSem;

    ReleaseSemaphore(hSem, 1, NULL);
}
/**
 * Wait on the semaphore with no time limit.
 *
 * @param pSem  Points at the semaphore handle; the result of
 *              WaitForSingleObject() is ignored.
 */
void SemWait(SemHandle *pSem)
{
    SemHandle hSem = *pSem;

    WaitForSingleObject(hSem, INFINITE);
}
win32_thread.cpp
#include "../include/common_thread.h"
/**
 * Start a new thread through the CRT's _beginthreadex().
 *
 * @param pThreadFun  Thread entry point; cast to THREADFUNC_TYPE before use.
 * @param pParam      Opaque argument forwarded to the entry point.
 * @return The value returned by _beginthreadex(), converted to ThreadHandle.
 *
 * NOTE(review): the fifth argument is passed as NULL, exactly as before;
 * presumably 0 (no creation flags) is meant. Confirm against the CRT
 * documentation.
 */
ThreadHandle ThreadCreate(THREADFUNC_TYPE (* pThreadFun)( void * ), void * pParam)
{
    THREADFUNC_TYPE pfnEntry = (THREADFUNC_TYPE) pThreadFun;

    return (ThreadHandle)_beginthreadex(NULL, 0, pfnEntry, pParam, NULL, NULL);
}
/** Terminate the calling thread with exit code 0 via _endthreadex(). */
void ThreadExit(void)
{
    unsigned nExitCode = 0;

    _endthreadex(nExitCode);
}
/**
 * Close a thread handle returned by ThreadCreate().
 *
 * @param hThread  Handle to close; the result of CloseHandle() is ignored.
 */
void CloseThreadHandle(ThreadHandle hThread)
{
    HANDLE hRaw = (HANDLE) hThread;

    CloseHandle(hRaw);
}
/**
 * Block, with no time limit, until the thread handle becomes signalled.
 *
 * @param hThread  Thread to wait for; the wait result is ignored.
 */
void WaitForThread(ThreadHandle hThread)
{
    HANDLE hRaw = (HANDLE)hThread;

    WaitForSingleObject(hRaw, INFINITE);
}
win32_timedsem.cpp
#include "../include/common_timedsem.h"
/**
 * Create the Win32 semaphore that backs a timed semaphore.
 *
 * @param pHandle   Receives the handle returned by CreateSemaphore(); no
 *                  failure check is made here.
 * @param nInitVal  Initial count.
 */
void TimedSemCreate(TimedSemHandle *pHandle, int nInitVal)
{
    TimedSemHandle hNew = CreateSemaphore(NULL, nInitVal, 65535, NULL);

    *pHandle = hNew;
}
/**
 * Close the handle of a timed semaphore.
 *
 * @param hSem  Handle obtained from TimedSemCreate(); the result of
 *              CloseHandle() is ignored.
 */
void TimedSemDestroy(TimedSemHandle hSem)
{
    (void)CloseHandle(hSem);
}
/**
 * Release the timed semaphore once (count + 1).
 *
 * @param hSem  Semaphore handle; the previous count is not requested and the
 *              result of ReleaseSemaphore() is ignored.
 */
void TimedSemPost(TimedSemHandle hSem)
{
    (void)ReleaseSemaphore(hSem, 1, NULL);
}
/**
 * Wait on the timed semaphore, optionally giving up after a timeout.
 *
 * @param hSem      Semaphore handle.
 * @param nTimeOut  Timeout in milliseconds; a negative value waits forever,
 *                  matching the Linux implementation of this function.
 * @return true only when the semaphore was actually acquired; false when the
 *         wait timed out or failed.
 */
bool TimedSemWait(TimedSemHandle hSem, int nTimeOut)
{
    /* A negative int would otherwise turn into a huge finite DWORD timeout. */
    DWORD dwTimeout = (nTimeOut < 0) ? INFINITE : (DWORD)nTimeOut;

    /* Report the real outcome instead of claiming success on every return. */
    return WaitForSingleObject(hSem, dwTimeout) == WAIT_OBJECT_0;
}
linux_thread.h
#ifndef __LINUX_THREAD_H__
#define __LINUX_THREAD_H__
#include <pthread.h>
/*
 * Mutex abstraction for the Linux build: each macro forwards its argument,
 * unchanged, to the pthread mutex call of the same purpose. InitMutex passes
 * NULL as the attribute argument.
 */
#define ThreadMutex pthread_mutex_t
#define InitMutex(mutex) pthread_mutex_init(mutex, NULL)
#define DestroyMutex(mutex) pthread_mutex_destroy(mutex)
#define EnterMutex(mutex) pthread_mutex_lock(mutex)
#define LeaveMutex(mutex) pthread_mutex_unlock(mutex)
/* Thread handle type on Linux: the pthread thread ID. */
#define ThreadHandle pthread_t
/* Expands to the return type of a pthread entry point. */
#define THREADFUNC_TYPE void *
/* Pointer to a thread entry point: takes a void * and returns a void *. */
typedef void * (* THREAD_FUN)(void *);
#endif
linux_timedsem.h
#ifndef __CG_LINUX_TIMEDSEM_H__
#define __CG_LINUX_TIMEDSEM_H__
#include <errno.h>
#include <sys/time.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <pthread.h>
#include "common.h"
/*
 * Timed counting semaphore for the Linux build, made of a counter, the mutex
 * locked around accesses to it, and the condition variable waiters block on.
 */
typedef struct __TIMEDSEM
{
int nCount; /* current count; TimedSemWait() waits while it is <= 0 */
pthread_mutex_t mtx; /* locked around every access to nCount */
pthread_cond_t cond; /* signalled by TimedSemPost() after the increment */
}
TimedSemHandle;
#endif
linux_semaphore.h
#ifndef __CG_LINUX_SEMAPHORE_H__
#define __CG_LINUX_SEMAPHORE_H__
#include <pthread.h>
#include <semaphore.h>
#include "common.h"
/* Semaphore handle on Linux: a POSIX sem_t, passed to the Sem* functions by pointer. */
typedef sem_t SemHandle;
#endif
win32_semaphore.h
#ifndef __WIN32_SEMAPHORE_H__
#define __WIN32_SEMAPHORE_H__
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
/* Semaphore handle on Win32: the HANDLE returned by CreateSemaphore(). */
typedef HANDLE SemHandle;
#endif
win32_thread.h
#ifndef __WIN32_THREAD_H__
#define __WIN32_THREAD_H__
#define WIN32_LEAN_AND_MEAN /**<Windows*/
#include <windows.h>
#include <process.h>
/************* mutex (use CRITICAL_SECTION in windows) ***************/
/* Each macro forwards its argument, unchanged, to the matching
 * critical-section call. */
#define ThreadMutex CRITICAL_SECTION
#define InitMutex(mutex) InitializeCriticalSection(mutex)
#define DestroyMutex(mutex) DeleteCriticalSection(mutex)
#define EnterMutex(mutex) EnterCriticalSection(mutex)
#define LeaveMutex(mutex) LeaveCriticalSection(mutex)
/* Thread handle type on Win32. */
#define ThreadHandle HANDLE
/* On Win32 this expands to the entry-point pointer type itself (THREAD_FUN),
 * whereas the Linux header defines it as the entry point's return type. */
#define THREADFUNC_TYPE THREAD_FUN
/* Pointer to a __stdcall thread entry point: takes a void *, returns unsigned int. */
typedef unsigned int (__stdcall * THREAD_FUN)(void *);
#endif
win32_timedsem.h
#ifndef __WIN32_TIMEDSEM_H__
#define __WIN32_TIMEDSEM_H__
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include "common.h"
/* Timed-semaphore handle on Win32: a plain semaphore HANDLE (see CreateSemaphore). */
typedef HANDLE TimedSemHandle;
#endif
common.h
#ifndef __CG_COMMON_H__
#define __CG_COMMON_H__
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
/* Plain C builds only: supply true/false as integer constants and erase the
 * `inline` keyword by defining it to nothing. */
#ifndef __cplusplus
#define true 1
#define false 0
#define inline
#endif
#ifdef WIN32
#pragma warning(disable:4996)
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <Winsock2.h>
#include <shlwapi.h>
//#include <ws2tcpip.h>
#include <objbase.h>
/* Plain C builds have no built-in bool; use an unsigned char. */
#ifndef __cplusplus
typedef unsigned char bool;
#endif
/* Sized integer and floating-point aliases for the Win32 build, each with a
 * matching pointer alias (P-prefixed). The 64-bit types use __int64. */
typedef signed char INT8, *PINT8;
typedef signed short INT16, *PINT16;
typedef signed int INT32, *PINT32;
typedef signed __int64 INT64, *PINT64;
typedef unsigned char UINT8, *PUINT8;
typedef unsigned short UINT16, *PUINT16;
typedef unsigned int UINT32, *PUINT32;
typedef unsigned __int64 UINT64, *PUINT64;
typedef float FP32, *PFP32;
typedef double FP64, *PFP64;
/* 128-bit value stored as 16 raw octets. */
typedef struct { unsigned char octet[16]; } UINT128;
#pragma comment ( lib, "Shlwapi.lib" )
#ifdef __DEBUG__
/*
 * Format a printf-style message and send it both to the debugger
 * (OutputDebugStringA) and to stdout.
 *
 * Formatting is bounded by the buffer size, so an over-long message is
 * truncated instead of overrunning the stack buffer.
 */
static void DEBUGMSGOUT(const char* pszFormat, ...)
{
    char buf[40960] = {0};
    va_list arglist;

    va_start(arglist, pszFormat);
    vsnprintf(buf, sizeof(buf), pszFormat, arglist);
    va_end(arglist);

    OutputDebugStringA(buf);
    printf("%s", buf);
}
/*
 * Debug trace: prints a "[file line function] " prefix followed by the
 * caller's printf-style message. Wrapped in do/while(0) so the two calls
 * behave as a single statement, e.g. under an unbraced if/else.
 */
#define DEBUGMSG(...) \
    do { \
        DEBUGMSGOUT("[%s %d %s] ", __FILE__, __LINE__, __FUNCTION__); \
        DEBUGMSGOUT(__VA_ARGS__); \
    } while (0)
#else
/* Non-debug builds: the trace call and its arguments compile away. */
#define DEBUGMSG(...) ((void)0)
#endif
#endif
#ifdef _LINUX
#include <unistd.h>
#include <sys/time.h>
#include <uuid/uuid.h>
/* Plain C builds have no built-in bool; use an unsigned char. */
#ifndef __cplusplus
typedef unsigned char bool;
#endif
/* Win32-style TRUE/FALSE constants for the Linux build. */
#define TRUE 1
#define FALSE 0
typedef unsigned char boolean;
/* Sized integer and floating-point aliases for the Linux build, each with a
 * matching pointer alias (P-prefixed), plus the Win32-style names CHAR, UCHAR
 * and DWORD. The 64-bit types use long long. */
typedef signed char INT8, *PINT8, CHAR;
typedef signed short INT16, *PINT16;
typedef signed int INT32, *PINT32;
typedef signed long long INT64, *PINT64;
typedef unsigned char UINT8, *PUINT8, UCHAR;
typedef unsigned short UINT16, *PUINT16;
typedef unsigned int UINT32, *PUINT32, DWORD;
typedef unsigned long long UINT64, *PUINT64;
typedef float FP32, *PFP32;
typedef double FP64, *PFP64;
/* 128-bit value stored as 16 raw octets. */
typedef struct { unsigned char octet[16]; } UINT128;
#ifdef __DEBUG__
/*
 * Debug trace: prints a "[file line function] " prefix followed by the
 * caller's printf-style message. Uses the standard __VA_ARGS__ form of a
 * variadic macro rather than the GNU-only named form.
 */
#define DEBUGMSG(...) \
    do { \
        printf("[%s %d %s] ", __FILE__, __LINE__, __FUNCTION__); \
        printf(__VA_ARGS__); \
    } while (0)
#else
/* Non-debug builds: the trace call and its arguments compile away. */
#define DEBUGMSG(...) do { } while (0)
#endif
// Win32-style handle types for the Linux build, so shared code can name them.
typedef UINT32 HWND;
typedef void* HANDLE;
// Calling-convention keyword used by the Win32 code; defined to nothing on Linux.
#define __stdcall
#endif
/*
 * Shared utility routines. Only the prototypes are visible in this header;
 * the one-line notes below are inferred from the names and should be
 * confirmed against the implementations.
 */
extern void SRand(void);                      /* presumably seeds the generator used by Rand() */
extern int Rand(void);                        /* presumably returns a pseudo-random number */
extern void SleepMs(int mSec);                /* presumably sleeps for mSec milliseconds */
extern int GetTimeofDay(struct timeval* tp);  /* presumably fills *tp with the current time */
extern char *GetCurrPath(void);               /* buffer ownership not visible here; confirm */
extern char *GetTmpPath(void);                /* buffer ownership not visible here; confirm */
/* (void) gives this a real prototype in C, matching the other declarations. */
extern const char* newGUID(void);
extern bool DirExists(char *szPathName);
extern bool CreateDir(char *szPathName);
#endif
common_semaphore.h
#ifndef __SEMAPHORE_H__
#define __SEMAPHORE_H__
#include "common.h"
#ifdef WIN32
#include "win32_semaphore.h"
#endif
#ifdef _LINUX
#include "linux_semaphore.h"
#endif
/*
 * Portable counting-semaphore API; SemHandle comes from the platform header
 * included above.
 */
/* Create a semaphore with initial count nInitVal in *pHandle; 0 on success, -1 on failure. */
extern int SemCreate(INT32 nInitVal, SemHandle *pHandle);
/* Destroy a semaphore created by SemCreate(). */
extern void SemDestroy(SemHandle *pSem);
/* Increment the semaphore count by one. */
extern void SemPost(SemHandle *pSem);
/* Wait on the semaphore with no timeout. */
extern void SemWait(SemHandle *pSem);
#endif
common_thread.h
#ifndef __CCOMMON_THREAD_H__
#define __CCOMMON_THREAD_H__
#include "common.h"
#ifdef WIN32 //define the macro for Win32 thread
#include "win32_thread.h"
#endif
#ifdef _LINUX
#include "linux_thread.h"
#endif
/*
 * Portable thread API; ThreadHandle and THREAD_FUN come from the platform
 * header included above.
 */
/* Start a thread running pThreadFun(pParam) and return its handle.
 * NOTE(review): pThreadFun is declared as a pointer to a function that returns
 * THREAD_FUN; both implementations cast it before handing it to the OS, so
 * presumably a plain thread entry point was intended. Confirm with callers. */
extern ThreadHandle ThreadCreate(THREAD_FUN (* pThreadFun)( void * ), void * pParam);
/* Terminate the calling thread. */
extern void ThreadExit();
/* Release a thread handle (a no-op in the Linux implementation). */
extern void CloseThreadHandle(ThreadHandle hThread);
/* Block until the given thread has finished. */
extern void WaitForThread(ThreadHandle hThread);
#endif
common_timedsem.h
#ifndef __TIMEDSEM_H__
#define __TIMEDSEM_H__
#include "common.h"
#ifdef WIN32
#include "win32_timedsem.h"
#endif
#ifdef _LINUX
#include "linux_timedsem.h"
#endif
/*
 * Portable timed-semaphore API; TimedSemHandle comes from the platform header
 * included above.
 *
 * NOTE(review): TimedSemHandle is a struct on Linux and these functions take
 * it by value, so the Linux implementations operate on a copy of the caller's
 * object. Confirm whether a pointer was intended.
 */
/* Initialise *pHandle with initial count nInitVal.
 * NOTE(review): declared void here, but linux_timedsem.cpp defines this
 * function as returning int; the Win32 definition returns void. */
extern void TimedSemCreate(TimedSemHandle *pHandle, int nInitVal);
/* Release the resources held by the semaphore. */
extern void TimedSemDestroy(TimedSemHandle hSem);
/* Increment the semaphore count by one. */
extern void TimedSemPost(TimedSemHandle hSem);
/* Wait on the semaphore; nTimeOut is in milliseconds. */
extern bool TimedSemWait(TimedSemHandle hSem, int nTimeOut);
#endif
Related Questions
drjack9650@gmail.com
Navigate
Integrity-first tutoring: explanations and feedback only — we do not complete graded work. Learn more.