Academic Integrity: tutoring, explanations, and feedback — we don’t complete graded work or submit on a student’s behalf.

I have an assignment, we are given the header file - dispatchQueue.h and we have

ID: 3751447 • Letter: I

Question

I have an assignment, we are given the header file - dispatchQueue.h and we have to edit the header file and create a c file with the functions.

The function I am stuck on is:

The following code is the header file, which we can edit if need be:

#ifndef DISPATCHQUEUE_H

#define DISPATCHQUEUE_H

#include <pthread.h>

#include <semaphore.h>

#define error_exit(MESSAGE) perror(MESSAGE), exit(EXIT_FAILURE)

typedef enum { // whether dispatching a task synchronously or

asynchronously

ASYNC, SYNC

} task_dispatch_type_t;

typedef enum { // The type of dispatch queue.

CONCURRENT, SERIAL

} queue_type_t;

typedef struct task {

char name[64]; // to identify it when debugging

void (*work)(void *); // the function to perform

void *params; // parameters to pass to the function

task_dispatch_type_t type; // asynchronous or synchronous

} task_t;

typedef struct dispatch_queue_t dispatch_queue_t; // the dispatch

queue type

typedef struct dispatch_queue_thread_t dispatch_queue_thread_t; // the

dispatch queue thread type

struct dispatch_queue_thread_t {

dispatch_queue_t *queue;// the queue this thread is associated

with

pthread_t thread; // the thread which runs the task

sem_t thread_semaphore; // the semaphore the thread waits on until

a task is allocated

task_t *task; // the current task for this tread

};

struct dispatch_queue_t {

queue_type_t queue_type; // the type of queue - serial

or concurrent

};

task_t *task_create(void (*)(void *), void *, char*);

void task_destroy(task_t *);

dispatch_queue_t *dispatch_queue_create(queue_type_t);

void dispatch_queue_destroy(dispatch_queue_t *);

int dispatch_async(dispatch_queue_t *, task_t *);

int dispatch_sync(dispatch_queue_t *, task_t *);

int dispatch_queue_wait(dispatch_queue_t *);

#endif /* DISPATCHQUEUE_H */

dispatch queue t dispatch queue create (queue type t queueType) Creates a dispatch queue, probably setting up any associated threads and a linked list to be used by the added tasks. The queueType is either CONCURRENT or SERIAL Returns: A pointer to the created dispatch queue Example: dispatch queue t *queue queue dispatch queue_create (CONCURRENT)

Explanation / Answer

dispatchQueue.c

#include "dispatchQueue.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/sysinfo.h>
#include <unistd.h>

// implementation of a linked list enqueue function
void push_dispatch_queue(dispatch_queue_t *queue, task_t task) {
    sll_node *current_node = queue->nodeHead;
    sll_node *newNode = (sll_node*) malloc(sizeof(sll_node));
    newNode->task = task;
    newNode->next = NULL;

    while(current_node->next != NULL) {
        current_node = current_node->next;
    }
    current_node->next = newNode;
}

// implementation of a linked list dequeue function
sll_node *pop_dispatch_queue(dispatch_queue_t *queue) {
    sll_node *previousHead = queue->nodeHead;
    queue->nodeHead = previousHead->next;
    return previousHead;
}

// used to push a head onto the queue if it at any point is null
void push_head_on_queue(dispatch_queue_t *queue, task_t *task){
    sll_node *head = (sll_node*) malloc(sizeof(sll_node));
    head->next = NULL;
    head->task = *task;
    queue->nodeHead = head;
}

// method that releases the queue, semaphore and queue_mutex lock memory when called
void dispatch_queue_destroy(dispatch_queue_t *queue) {
    // destroy mutexs
    pthread_mutex_destroy(&queue->queue_mutex);
    pthread_mutex_destroy(&queue->busy_thread_mutex);

    // destory semaphores
    sem_destroy(&queue->all_done_semaphore);
    sem_destroy(&queue->queue_semaphore);
    free(queue);
}

// method that releases the memeory of a task when executed
void destroy_task(task_t *task) {
    free(task);
}

// method that each thread calls when they run concurrently
void *perform_tasks(void *param){
    // case to dispatch queue to prevent compiler warnings
    dispatch_queue_t *queue = (dispatch_queue_t*) param;

    // poll indefinatly
    while(1) {
        // wait until more tasks are push to queue
        sem_wait(&queue->queue_semaphore);

        //check if termination condition has been met; break if met
        if(queue->terminate_condition == 1){
            break;
        }

        // update the queue's active thread counter and aquire queue_mutex lock
        pthread_mutex_lock(&queue->queue_mutex);
        pthread_mutex_lock(&queue->busy_thread_mutex);      
        queue->busy_threads = queue->busy_threads + 1;
        pthread_mutex_unlock(&queue->busy_thread_mutex);
      
        // dequeuing a task from the queue
        sll_node *targetNode = pop_dispatch_queue(queue);

        switch(queue->queue_type) {
        case CONCURRENT:
            // In the case of a concurrent queue, release the lock before performing the task
            pthread_mutex_unlock(&queue->queue_mutex);
            targetNode->task.work(targetNode->task.params);
            // release memory
            destroy_task(&targetNode->task);
            // decrement active thread on the queue as this thread is finished
            pthread_mutex_lock(&queue->busy_thread_mutex);      
            queue->busy_threads = queue->busy_threads - 1;
            pthread_mutex_unlock(&queue->busy_thread_mutex);
            break;
  
        case SERIAL:
            // In the case of a serial queue, release the lock after performing the task
            targetNode->task.work(targetNode->task.params);
            // release memory
            destroy_task(&targetNode->task);
            // decrement active thread on the queue as this thread is finished & release lock
            pthread_mutex_lock(&queue->busy_thread_mutex);      
            queue->busy_threads = queue->busy_threads - 1;
            pthread_mutex_unlock(&queue->busy_thread_mutex);
            pthread_mutex_unlock(&queue->queue_mutex);
            break;
        }
      
        if(queue->nodeHead == NULL && queue->busy_threads == 0){
            sem_post(&queue->all_done_semaphore);
        }
    }
    //should any threads still exist after the parent thread is killed then this statement will kill it
    pthread_exit(EXIT_SUCCESS);
}

void generate_threads(dispatch_queue_t *queue, queue_type_t queue_type) {
    // generate threads and execute the tasks on the semaphore queue
    int i;
    for(i = 0; i < get_nprocs_conf(); i++){
        pthread_t *thread = (pthread_t *) malloc(sizeof(pthread_t));
        pthread_create(thread , NULL, perform_tasks, queue);

    }
}

dispatch_queue_t *dispatch_queue_create(queue_type_t queue_type) {
    // delearing variables to put in the structure
    sem_t queue_semaphore;
    sem_t all_done_semaphore;
    pthread_mutex_t queue_mutex, busy_thread_mutex;
    dispatch_queue_t *queue = (dispatch_queue_t *) malloc(sizeof(dispatch_queue_t));

    // set up dispatch queue
    queue->queue_type = queue_type;
    queue->queue_mutex = queue_mutex;
    queue->busy_thread_mutex = busy_thread_mutex;
    queue->terminate_condition = 0;

    // set up dispatch queue thread
    queue->queue_semaphore = queue_semaphore;
    queue->all_done_semaphore = all_done_semaphore;

    // create a semaphore & queue_mutex lock
    sem_init(&queue->queue_semaphore, 0, 0);
    sem_init(&queue->all_done_semaphore, 0, 0);
    pthread_mutex_init(&queue->queue_mutex, NULL);
    pthread_mutex_init(&queue->busy_thread_mutex, NULL);

    //generate threads
    generate_threads(queue, queue_type);
    return queue;
}

task_t *task_create(void (* work)(void *), void *param, char *name) {
    // allocate memeory for the tast and populate the structure
    task_t *task = (task_t *) malloc(sizeof(task_t));
    *task->name = *name;
    task->work = (void (*)(void *))work;
    task->params = param;

    // This is never used but for best practice a default of ASYNC is assigned
    task->type = ASYNC;
    return task;
}

//peform task on the thread that calls it
int dispatch_sync(dispatch_queue_t *queue, task_t *task){
    // perform the task then destroy it
    task->work(task->params);
    destroy_task(task);
    return 0;
}

// the header specifies a int as the return value so that is what is done here, even thought the integer return value is never used
int dispatch_async(dispatch_queue_t *queue, task_t *task) {
    // lock queue
    pthread_mutex_lock(&queue->queue_mutex);

    // check if the head of the linked list is assigned, if not assign the incoming task as the head of the queue
    if(queue->nodeHead == NULL) {
        push_head_on_queue(queue, task);
    } else {
        push_dispatch_queue(queue, *task);
    }
  
    // unlock queue
    pthread_mutex_unlock(&queue->queue_mutex);
  
    // notify semaphore that new tasks avalible
    sem_post(&queue->queue_semaphore);
    return 0;
}

int dispatch_queue_wait(dispatch_queue_t *queue) {
    // block until there are no more active theads on the queue and there are no more tasks on the queue
    sem_wait(&queue->all_done_semaphore);
    // flush all waiting works waiting on the queue semaphore
    queue->terminate_condition = 1;
    int index;
    for(index = 0; index < get_nprocs_conf(); index++) {
        sem_post(&queue->queue_semaphore);
    }
    return 0;
}

void dispatch_for(dispatch_queue_t *queue, long number, void (*work) (long)) {
    int id;
    char names[number][2]; // because these are going to be parameters to tasks they have to hang around
    for(id = 'A'; id < 'A' + number; id++){
        char *name = names[id - 'A'];
        name[0] = id;
        name[1] = '';
        long param_value = id - 'A';
        task_t *task = task_create((void *)work, (void *)param_value, name);
        dispatch_async(queue, task);
    }
    dispatch_queue_wait(queue);
    dispatch_queue_destroy(queue);
}


dispatchQueue.h

#ifndef DISPATCHQUEUE_H
#define   DISPATCHQUEUE_H

#include <pthread.h>
#include <semaphore.h>
  
#define error_exit(MESSAGE)     perror(MESSAGE), exit(EXIT_FAILURE)

    typedef enum { // whether dispatching a task synchronously or asynchronously
        ASYNC, SYNC
    } task_dispatch_type_t;
  
    typedef enum { // The type of dispatch queue.
        CONCURRENT, SERIAL
    } queue_type_t;

    typedef struct task {
        char name[64];              // to identify it when debugging
        void (*work)(void *);       // the function to perform
        void *params;               // parameters to pass to the function
        task_dispatch_type_t type; // asynchronous or synchronous
    } task_t;
  
    typedef struct dispatch_queue_t dispatch_queue_t; // the dispatch queue type
    typedef struct dispatch_queue_thread_t dispatch_queue_thread_t; // the dispatch queue thread type
    typedef struct sll_node sll_node; // the singly linked list data structure type

    struct sll_node {
        task_t task;                // the task to be executed
        struct sll_node *next;      // link to the next item in the linked list
    };                              // singly linked list

    struct dispatch_queue_thread_t {
        dispatch_queue_t *queue;    // the queue this thread is associated with
        pthread_t thread;           // the thread which runs the task
        sem_t thread_semaphore;     // the semaphore the thread waits on until a task is allocated
        task_t *task;               // the current task for this tread
    };

    struct dispatch_queue_t {
        queue_type_t queue_type;            // the type of queue - serial or concurrent
        struct sll_node *nodeHead;          // first item in the linked list
        pthread_mutex_t queue_mutex;        // mutex associated with this queue
        pthread_mutex_t busy_thread_mutex; // mutex associated with this queue
        sem_t queue_semaphore;              // the semaphore on the queue to notify threads when a task is ready
        sem_t all_done_semaphore;           // semaphore used to notify the main thread that all tasks are completed
        int terminate_condition;            // condition used to determine whether or not the queue workers should terminate and shutdown
        int busy_threads;                   // number of activly threads working on a task from this queue
    };
  
    task_t *task_create(void (*)(void *), void *, char*);
  
    void task_destroy(task_t *);

    dispatch_queue_t *dispatch_queue_create(queue_type_t);
  
    void dispatch_queue_destroy(dispatch_queue_t *);
  
    int dispatch_async(dispatch_queue_t *, task_t *);
  
    int dispatch_sync(dispatch_queue_t *, task_t *);
  
    void dispatch_for(dispatch_queue_t *, long, void (*)(long));
  
    int dispatch_queue_wait(dispatch_queue_t *);

    void push_dispatch_queue(dispatch_queue_t *queue, task_t task);

    sll_node *pop_dispatch_queue(dispatch_queue_t *queue);

#endif   /* DISPATCHQUEUE_H */

testFor.c

#include "dispatchQueue.h"
#include <stdio.h>
#include <stdlib.h>

void for_test(long i) {
    long counter = 0;
    for (; i < 1000000000; i++)
        counter++;
    printf("The number is %ld ", counter);
}

// Only required for SE 370 students.
int main(int argc, char** argv) {
    dispatch_queue_t *queue;
    queue = dispatch_queue_create(CONCURRENT);
    dispatch_for(queue, 10, for_test);
    return (EXIT_SUCCESS);
}

Hire Me For All Your Tutoring Needs
Integrity-first tutoring: clear explanations, guidance, and feedback.
Drop an Email at
drjack9650@gmail.com
Chat Now And Get Quote