Dmitry Yu Okunev лет назад: 8
Родитель
Сommit
be1e1d7f67
2 измененных файлов с 119 добавлено и 44 удалено
  1. 117 43
      pftw.c
  2. 2 1
      pftw.h

+ 117 - 43
pftw.c

@@ -27,17 +27,23 @@
 #include <string.h>	/* strerror() */
 #include <stdlib.h>
 #include <search.h>
+#include <dirent.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
 
 #include "pftw.h"
 
 char pftw_running;
 
 struct pftw_task {
-	char		*path;
+	char		*dirpath;
+	size_t		 dirpath_len;
 	unsigned long	 difficulty;
-	struct stat	*stat;
+	struct stat	 stat;
 
-	pftw_queue_t	*queue;
+	struct pftw_queue *queue;
 	struct pftw_task *max;
 };
 typedef struct pftw_task pftw_task_t;
@@ -46,6 +52,7 @@ struct pftw_queue {
 	pftw_task_t	 tasks[PFTW_MAX_QUEUE_LENGTH];
 	void		*tasks_btree;
 	int		 tasks_count;
+	int		 current_task_id;
 	pftw_callback_t	 callback;
 	int		 nopenfd;
 	int		 flags;
@@ -167,12 +174,12 @@ int pftw_deinit() {
 	free(threads);
 	threads		= NULL;
 	threads_count	= 0;
-
+/*
 	free(queues);
 	queues		= NULL;
 	queues_count	= 0;
 	queues_alloced	= 0;
-
+*/
 	return sem_destroy(&threads_sem);
 }
 
@@ -193,12 +200,12 @@ static inline void lock_queue(pftw_queue_t *queue) {
 }
 
 static inline void unlock_queue(pftw_queue_t *queue) {
-	pthread_mutex_unlock(&queue->unlock);
+	pthread_mutex_unlock(&queue->lock);
 	unlock_queues();
 	return;
 }
 
-pftw_queue_t *pftw_newqueue(pftw_callback_t callback, int nopenfd, int flags) {
+pftw_queue_t *pftw_newqueue(pftw_callback_t callback, int nopenfd, int flags, void *arg) {
 	lock_queues();
 /*
 	int queue_id = queues_count;
@@ -244,7 +251,7 @@ int pftw_deletequeue(pftw_queue_t *queue) {
 
 	pthread_mutex_destroy(&queue->lock);
 
-	tdestroy(queue->tasks_btree);
+	tdestroy(queue->tasks_btree, free);
 
 	//memcpy(queue, &queues[--queues_count], sizeof(*queue));
 	free(queue);
@@ -253,29 +260,33 @@ int pftw_deletequeue(pftw_queue_t *queue) {
 	return 0;
 }
 
-static int tasks_difficultycmp_findmax(const void *task_a, const void *task_b) {
-	if (tasks_a->difficulty > tasks_b->difficulty) {
-		tasks_a->max = tasks_b;
+static int tasks_difficultycmp_findmax(const void *_task_a, const void *_task_b) {
+	const pftw_task_t *task_a = _task_a, *task_b = _task_b;
+
+	if (task_a->difficulty > task_b->difficulty) {
+		((pftw_task_t *)task_a)->max = (pftw_task_t *)task_b;
 		return 1;
 	}
-	if (tasks_a->difficulty < tasks_b->difficulty) {
-		tasks_b->max = tasks_a;
+	if (task_a->difficulty < task_b->difficulty) {
+		((pftw_task_t *)task_b)->max = (pftw_task_t *)task_a;
 		return -1;
 	}
 
 	return 0;
 }
 
-static int tasks_difficultycmp(const void *task_a, const void *task_b) {
-	if (tasks_a->difficulty > tasks_b->difficulty)
+static int tasks_difficultycmp(const void *_task_a, const void *_task_b) {
+	const pftw_task_t *task_a = _task_a, *task_b = _task_b;
+
+	if (task_a->difficulty > task_b->difficulty)
 		return 1;
-	if (tasks_a->difficulty < tasks_b->difficulty)
+	if (task_a->difficulty < task_b->difficulty)
 		return -1;
 
 	return 0;
 }
 
-int pftw_pushtask(pftw_queue_t *queue, const char *dirpath, unsigned long difficulty) {
+int pftw_pushtask(pftw_queue_t *queue, const char *dirpath, size_t dirpath_len, unsigned long difficulty) {
 	lock_queue(queue);
 
 	if (queue->tasks_count >= PFTW_MAX_QUEUE_LENGTH) {
@@ -285,20 +296,20 @@ int pftw_pushtask(pftw_queue_t *queue, const char *dirpath, unsigned long diffic
 
 	pftw_task_t *task = &queue->tasks[queue->tasks_count++];
 	task->dirpath     = strdup(dirpath);
+	task->dirpath_len = dirpath_len;
 	task->queue	  = queue;
 
-	{	// TODO: Remove this magic with the difficulty. It's just hacks to prevent key collision in the tree. First bits is a real difficulty. The last bits is just a task_id for the prevention.
-		unsigned long difficulty_overload_mask = ~((1 << (sizeof(difficulty)*8) - (PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS)) - 1);
-		unsigned long difficulty_task_id_mask  =   (1 << (PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS)) - 1;
+	// TODO: Remove this magic with the difficulty. It's just hacks to prevent key collision in the tree. First bits is a real difficulty. The last bits is just a task_id for the prevention.
+	unsigned long difficulty_overload_mask = ~((1 << (sizeof(difficulty)*8 - (PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS))) - 1);
+	unsigned long difficulty_task_id_mask  =   (1 << (PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS)) - 1;
 
-		if (difficulty & difficulty_overload_mask)
-			task->difficulty = ~difficulty_task_id_mask;
-		else
-			task->difficulty = difficulty << (PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS);
+	if (difficulty & difficulty_overload_mask)
+		task->difficulty = ~difficulty_task_id_mask;
+	else
+		task->difficulty = difficulty << (PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS);
 
-		task->difficulty |= queue->tasks_id++;
-		queue->task_id %= PFTW_MAX_QUEUE_LENGTH;
-	}
+	task->difficulty |= queue->current_task_id++;
+	queue->current_task_id %= PFTW_MAX_QUEUE_LENGTH;
 
 
 	while (1) {
@@ -310,8 +321,8 @@ int pftw_pushtask(pftw_queue_t *queue, const char *dirpath, unsigned long diffic
 
 		if (found != task) {	// TODO: Remove this. This is a hack to retry on key collision in the tree
 			task->difficulty &= ~difficulty_task_id_mask;
-			task->difficulty |= queue->tasks_id++;
-			queue->task_id %= PFTW_MAX_QUEUE_LENGTH;
+			task->difficulty |= queue->current_task_id++;
+			queue->current_task_id %= PFTW_MAX_QUEUE_LENGTH;
 
 			continue;
 		}
@@ -333,50 +344,109 @@ pftw_task_t *pftw_poptask(pftw_queue_t *queue) {
 
 	pftw_task_t max_key, *max;
 
-	max_key->difficulty = ~0;
+	max_key.difficulty = ~0;
 
-	void *found = tfind(&max_key, &queue->btree, tasks_difficultycmp_findmax);	// Searching for the most difficult task
+	void *found = tfind(&max_key, &queue->tasks_btree, tasks_difficultycmp_findmax);	// Searching for the most difficult task
 
-	max = max_key->max;
+	if (found == NULL) {
+		fprintf(stderr, "Unknown internal error #2 of ftw()\n");
+		return NULL;
+	}
+
+	max = max_key.max;
 
 	unlock_queue(queue);
 
 	return max;
 }
 
+int pftw_dotask(pftw_task_t *task);
+
 int pftw_dotask_processentry(pftw_task_t *task, struct dirent *entry_p) {
-	struct stat stat;
+	struct stat st;
 	pftw_queue_t *queue = task->queue;
 	int flags = queue->flags;
+	int rc;
 
 	if (flags & FTW_PHYS)
-		lstat(entry_p->d_name, &stat);
+		rc = lstat(entry_p->d_name, &st);
 	else 
-		stat (entry_p->d_name, &stat);
+		rc = stat (entry_p->d_name, &st);
+
+	if (rc)	return rc;
 
 	// TODO: check for recursion
 
-	char follow = (stat.d_type == DT_DIR);
+	char follow = (entry_p->d_type == DT_DIR);
 
 	if (flags & FTW_MOUNT)
-		if (task->stat.st_dev != stat.st_dev)
+		if (task->stat.st_dev != st.st_dev)
 			follow = 0;
 
-	char path = ;
+	size_t entry_d_name_len = strlen(entry_p->d_name);
+
+	size_t path_len = task->dirpath_len + 1 + entry_d_name_len;
+	char *path = malloc(path_len + 1);
+
+	memcpy(path, task->dirpath, task->dirpath_len);
+	path[task->dirpath_len] = '/';
+	memcpy(&path[task->dirpath_len + 1], entry_p->d_name, entry_d_name_len);
+	path[path_len] = 0;
+
+	int ftw_ftype = FTW_NS;
+	switch (entry_p->d_type) {
+		case DT_BLK:
+		case DT_CHR:
+		case DT_FIFO:
+		case DT_LNK:
+		case DT_REG:
+		case DT_SOCK:
+			ftw_ftype = FTW_F;
+			break;
+		case DT_DIR:
+			ftw_ftype = FTW_D;
+			break;
+		case DT_UNKNOWN:
+			ftw_ftype = FTW_NS;
+			break;
+	}
 
-	int rc = queue->callback(path, &stat, ftw_ftype, NULL, queue->arg);
+	rc = queue->callback(path, &st, ftw_ftype, NULL, queue->arg);
 	if (flags & FTW_ACTIONRETVAL) {
 		switch (rc) {
 			case FTW_CONTINUE:
 				break;
 			case FTW_SKIP_SUBTREE:
+				follow = 0;
 				break;
 			case FTW_SKIP_SIBLINGS:
 				fprintf(stderr, "At the moment FTW_SKIP_SIBLINGS is not supported by pftw().\n");
+				break;
 			case FTW_STOP:
+				fprintf(stderr, "At the moment FTW_STOP is not supported by pftw().\n");
 				break;
 		}
 	}
+
+	if (follow) {
+		int rc;
+		unsigned long difficulty = st.st_nlink;
+		if (difficulty >= PFTW_DIFFICULTY_THRESHOLD) {	// If the task is heavy then public it for workers
+			rc = pftw_pushtask(queue, path, path_len, difficulty);
+		} else {					// Otherwise do the task by myself
+			pftw_task_t childtask;
+
+			childtask.dirpath     = path;
+			childtask.dirpath_len = path_len;
+			childtask.queue   = queue;
+			memcpy(&childtask.stat, &st, sizeof(st));
+
+			rc = pftw_dotask(&childtask);
+		}
+		if (rc) return rc;
+	}
+
+	return 0;
 }
 
 
@@ -389,15 +459,15 @@ int pftw_dotask(pftw_task_t *task) {
 		int flags = queue->flags;
 
 		if (flags & FTW_PHYS)
-			lstat(entry_p->d_name, &task->stat);
+			lstat(task->dirpath, &task->stat);
 		else 
-			stat (entry_p->d_name, &task->stat);
+			stat (task->dirpath, &task->stat);
 	}
 
 	while (1) {
 		int rc = readdir_r(dir, &entry, &readdir_result);
 		if (rc) return rc;
-		if (result == NULL)
+		if (readdir_result == NULL)
 			break;
 
 		rc = pftw_dotask_processentry(task, &entry);
@@ -405,6 +475,10 @@ int pftw_dotask(pftw_task_t *task) {
 	}
 
 	closedir(dir);
+
+	free(task->dirpath);
+
+	return 0;
 }
 
 int pftw(const char *dirpath, pftw_callback_t fn, int nopenfd, int flags, void *arg) {
@@ -423,7 +497,7 @@ int pftw(const char *dirpath, pftw_callback_t fn, int nopenfd, int flags, void *
 	if (queue == NULL)
 		return errno;
 
-	int rc = pftw_pushtask(queue, dirpath, ~0);
+	int rc = pftw_pushtask(queue, dirpath, strlen(dirpath), ~0);
 	if (rc) return rc;
 
 	pftw_task_t *task;

+ 2 - 1
pftw.h

@@ -20,6 +20,7 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <ftw.h>
 
 #define PFTW_MAX_THREADS_BITS		8
 #define PFTW_MAX_QUEUE_LENGTH_BITS	2
@@ -30,7 +31,7 @@
 #define	PFTW_ALLOCPORTION	(1<<4)
 
 
-struct FTW;
+//struct FTW;
 
 typedef int (*pftw_callback_t) (
 	const char *fpath,