Academic Integrity: tutoring, explanations, and feedback — we don’t complete graded work or submit on a student’s behalf.

Multi-threaded Server - Mutexes - Semaphores: Modify the multi-threaded server (file server.c) so that it is created with a pool of N threads and a buffer of M ints.

ID: 3727920 • Letter: M

Question

Multi-threaded Server - Mutexes - Semaphores

Modify the multi-threaded server (file server.c) such that it is created with a pool of
N threads and a buffer of M ints.
The server validates its arguments:
$ ./server
usage: ./server N-threads M-items

file server.c:

#include <stdbool.h>
#include <semaphore.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

// Size of the worker pool: main() creates this many detached threads.
#define N 2 // number of threads
// Single-int hand-off slot: main() stores each parsed input here and a worker
// copies it out. Every access in this file is made with mtx held.
volatile int buffer;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // for buffer access
// Initialised to 0 in main(); main() posts it once per input and each worker
// blocks on it in sem_wait() until an input is available.
sem_t semaphore; // controls the pool of threads

/*
 * Worker thread body: loops forever, handling one input per semaphore post.
 *
 * Each iteration waits on `semaphore`, copies the current value of `buffer`
 * while holding `mtx`, then "processes" it (prints, sleeps 10 s, prints).
 *
 * arg: unused.
 * Never returns. Any failing synchronisation call prints a diagnostic to
 * stderr and terminates the whole process with EXIT_FAILURE.
 */
static void * worker(void *arg) {
    (void)arg; // unused
    while (true) {
        // block until work is available
        // (sem_wait reports failure through errno, unlike the pthread_* calls)
        if (sem_wait(&semaphore)) {
            fprintf(stderr, "sem_wait: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
        // get exclusive access to buffer
        // pthread_* functions return the error number and leave errno alone,
        // so the return value is what must be passed to strerror()
        int rc = pthread_mutex_lock(&mtx);
        if (rc != 0) {
            fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
        // store input
        int j = buffer;
        // release exclusive access to buffer
        rc = pthread_mutex_unlock(&mtx);
        if (rc != 0) {
            fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
        // process data
        // messages are newline-terminated so that line-buffered stdout shows
        // each one as soon as it is printed, one per line
        fprintf(stdout, "Processing: %d\n", j);
        sleep(10);
        fprintf(stdout, "Done with: %d\n", j);
    }
}

/*
 * Server entry point.
 *
 * Initialises `semaphore` to 0, starts N detached worker threads, then reads
 * whitespace-separated ints from stdin. Each int is stored in `buffer` under
 * `mtx` and announced to the pool with one sem_post().
 *
 * Returns 0 once input ends (EOF or a token that is not an int). Any failing
 * library call prints a diagnostic to stderr and exits with EXIT_FAILURE.
 *
 * NOTE(review): `buffer` holds a single int, so an input that arrives before
 * a worker has copied the previous one overwrites it. Also, returning from
 * main ends the process, which stops detached workers that are still busy.
 */
int main() {
    // input
    int j;
    // initialize semaphore (sem_init reports failure through errno)
    if (sem_init(&semaphore, 0, 0)) {
        fprintf(stderr, "sem_init: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }
    // create pool of N detached threads
    pthread_t t;
    for (int i = 0; i < N; ++i) {
        // pthread_* functions return the error number and leave errno alone
        int rc = pthread_create(&t, NULL, worker, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
        rc = pthread_detach(t);
        if (rc != 0) {
            fprintf(stderr, "pthread_detach: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
    }
    // main loop
    // Require exactly one converted item: on a non-numeric token fscanf
    // returns 0 without consuming it, so comparing against EOF alone would
    // spin forever re-posting a stale (or uninitialised) value.
    while (fscanf(stdin, "%d", &j) == 1) {
        // get exclusive access to buffer
        int rc = pthread_mutex_lock(&mtx);
        if (rc != 0) {
            fprintf(stderr, "pthread_mutex_lock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
        // load input into buffer
        buffer = j;
        // release exclusive access to buffer
        rc = pthread_mutex_unlock(&mtx);
        if (rc != 0) {
            fprintf(stderr, "pthread_mutex_unlock: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
        // unlock one thread
        if (sem_post(&semaphore)) {
            fprintf(stderr, "sem_post: %s\n", strerror(errno));
            exit(EXIT_FAILURE);
        }
    }
    return(0);
}

Example output:

When the buffer has reached its full capacity, inputs are ignored until room is available.
In the following scenario, inputs 5 and 6 are lost because the buffer is full:
$ ./server 2 2
*** 2 threads, 2 buffered items
1 2
Processing: 1
Processing: 2
3 4
5 6
buffer is full!
buffer is full!
Done with: 1
Processing: 3
Done with: 2
Processing: 4
7
Done with: 3
Processing: 7
Done with: 4
Done with: 7


Here is another normal execution:
./server 5 10
*** 5 threads, 10 buffered items
1 2 3 4 5
1
Processing: 1
Processing: 2
Processing: 3
Processing: 4
Processing: 5
6 7 8 9 10 11 12 13 14 15
Done with: 1
Done with: 2
Processing: 7
Done with: 3
Done with: 5
Processing: 8
Processing: 6
Processing: 9
Done with: 4
Processing: 10
Done with: 7
Done with: 9
Processing: 12
Processing: 11
Done with: 6
Processing: 13
Done with: 8
Processing: 14
Done with: 10
Processing: 15
Done with: 12
Done with: 11
Done with: 13
Done with: 14
Done with: 15

Explanation / Answer

// one producer and one consumer sharing a bounded buffer
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
#include <stdlib.h>
#include <iostream>
#include <unistd.h>

// Capacity of the ring buffer below; main() also rejects loop counts above it.
#define MAX 15

// Fixed-size ring of ints written by put() and read by get().
int buffer[MAX];
// Index of the next slot put() writes; advanced modulo MAX.
int fill = 0;
// Index of the next slot get() reads; advanced modulo MAX.
int use = 0;
// Stores `value` in the slot at the current fill index, then moves that index
// forward by one, wrapping back to 0 after slot MAX - 1.
// Performs no locking of its own.
void put(int value) {
    const int slot = fill;
    buffer[slot] = value;
    fill = (slot + 1) % MAX;
}

// Returns the int in the slot at the current use index and moves that index
// forward by one, wrapping back to 0 after slot MAX - 1.
// Performs no locking of its own.
int get() {
    const int slot = use;
    use = (slot + 1) % MAX;
    return buffer[slot];
}

// Iteration count for both producer() and consumer(); set from argv[1] in main().
int loops = 0;
// Counts free buffer slots; main() initialises it to MAX.
sem_t empty;
// Counts filled buffer slots; main() initialises it to 0.
sem_t full;
// Semaphore used as a lock around put()/get(); main() initialises it to 1.
sem_t mutex;

void *producer(void *arg) {
int i;

for (i = 0; i < loops; i++) {
sem_wait(∅);
sem_wait(&mutex;);
put(i);
sem_post(&full;);

printf("put %d ", i); fflush(NULL);

sem_post(&mutex;);

sleep(1);
}

pthread_exit(NULL);
}

void *consumer(void *arg) {
int i;

for (i = 0; i < loops; i++) {
sem_wait(&full;);
sem_wait(&mutex;);

int b = get();
sem_post(∅);

printf("get %d ", b); fflush(NULL);

sem_post(&mutex;);
}

pthread_exit(NULL);

}

int main(int argc, char *argv[])
{
if(argc < 2 ){
printf("Needs 2nd arg for loop count variable. ");
return 1;
}

loops = atoi(argv[1]);
if(loops > MAX){
printf("Max allowed arg is %d ", MAX);
return 1;
}

sem_init(∅, 0, MAX); // MAX buffers are empty to begin with...
sem_init(&full;, 0, 0); // ... and 0 are full
sem_init(&mutex;, 0, 1); // mutex = 1 since it a lock
pthread_t pThread, cThread;
pthread_create(&pThread;, 0, producer, 0);
pthread_create(&cThread;, 0, consumer, 0);
pthread_join(pThread, NULL);
pthread_join(cThread, NULL);
int emptyValue, fullValue, mutexValue;
sem_getvalue(∅, &emptyValue;);
sem_getvalue(&full;, &fullValue;);
sem_getvalue(&mutex;, &mutexValue;);
if(emptyValue == MAX) sem_destroy(∅);
else std::cout << "Error destroying empty. Value is " << emptyValue << std::endl;
if(fullValue == 0) sem_destroy(&full;);
else std::cout << "Error destroying full. Value is " << fullValue << std::endl;
if(mutexValue == 1) sem_destroy(&mutex;);
else std::cout << "Error destroying mutex. Value is " << mutexValue << std::endl;
return 0;
}