|
@@ -65,7 +65,7 @@ struct pftw_queue {
|
|
|
int flags;
|
|
|
void *arg;
|
|
|
pthread_mutex_t lock;
|
|
|
- pthread_mutex_t ending_lock;
|
|
|
+ sem_t ending_sem;
|
|
|
int workers[PFTW_MAX_THREADS];
|
|
|
int workers_count;
|
|
|
};
|
|
@@ -155,8 +155,8 @@ pftw_queue_t *pftw_newqueue(pftw_callback_t callback, int nopenfd, int flags, vo
|
|
|
queue->current_task_id = 0;
|
|
|
queue->workers_count = 0;
|
|
|
|
|
|
- pthread_mutex_init(&queue->lock, NULL);
|
|
|
- pthread_mutex_init(&queue->ending_lock, NULL);
|
|
|
+ pthread_mutex_init(&queue->lock, NULL);
|
|
|
+ sem_init(&queue->ending_sem, 0, 1);
|
|
|
|
|
|
unlock_queues();
|
|
|
return queue;
|
|
@@ -540,7 +540,7 @@ int pftw(const char *dirpath, pftw_callback_t fn, int nopenfd, int flags, void *
|
|
|
|
|
|
while (queue->workers_count > 0) {
|
|
|
dprintf("pftw(): queue->workers_count == %i\n", queue->workers_count);
|
|
|
- pthread_mutex_lock(&queue->ending_lock);
|
|
|
+ sem_wait(&queue->ending_sem);
|
|
|
};
|
|
|
|
|
|
rc = pftw_deletequeue(queue);
|
|
@@ -570,8 +570,11 @@ void pftw_worker_dash(int worker_id) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ int queue_id = worker_id % queues_count;
|
|
|
|
|
|
- thread_queue[worker_id] = queues[worker_id % queues_count];
|
|
|
+ dprintf("pftw_worker_dash(%i): queue_id == %i\n", worker_id, queue_id);
|
|
|
+
|
|
|
+ thread_queue[worker_id] = queues[queue_id];
|
|
|
queue = thread_queue[worker_id];
|
|
|
|
|
|
queue->workers[queue->workers_count++] = worker_id;
|
|
@@ -625,7 +628,7 @@ void *pftw_worker(void *_arg) {
|
|
|
i++;
|
|
|
}
|
|
|
}
|
|
|
- pthread_mutex_unlock(&queue->ending_lock);
|
|
|
+ sem_post(&queue->ending_sem);
|
|
|
unlock_queue(queue);
|
|
|
}
|
|
|
thread_queue[worker_id] = NULL;
|