pftw.c 17 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749
  1. /*
  2. libpftw — parallel file tree walk library
  3. Copyright (C) 2015 Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
  4. This program is free software: you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published by
  6. the Free Software Foundation, either version 3 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #define _GNU_SOURCE
  16. #include <errno.h>
  17. #include <semaphore.h>
  18. #include <signal.h>
  19. #include <pthread.h>
  20. #include <stdio.h> /* fprintf() */
  21. #include <string.h> /* strerror() */
  22. #include <stdlib.h>
  23. #include <search.h>
  24. #include <dirent.h>
  25. #include <sys/types.h>
  26. #include <sys/stat.h>
  27. #include <unistd.h>
  28. #include <assert.h>
  29. #include "pftw.h"
  /*
   * Debug tracing: in DEBUG builds dprintf() forwards to printf(); otherwise it
   * expands to an empty statement.
   *
   * <stdio.h> also declares a POSIX function named dprintf(int fd, ...); this
   * macro intentionally shadows it for the rest of the file, and any macro form
   * the C library might provide is dropped first.  The no-op form is written as
   * do/while(0) so that "dprintf(...);" is exactly one statement and therefore
   * safe as the unbraced body of an if/else.
   */
  #undef dprintf
  #ifdef DEBUG
  #define dprintf(...) printf(__VA_ARGS__)
  #else
  #define dprintf(...) do {} while (0)
  #endif
  /*
   * Non-zero while the worker pool exists: pftw_init() sets it, pftw_deinit()
   * clears it, and a worker clears it when its semaphore wait fails.
   */
  char pftw_running = 0;
  /*
   * One directory waiting to be (or being) walked.
   *
   * NOTE: `dirpath` has to stay the first member; pftw_poptask() builds its
   * search key with the nested initializer `{{0}}`.
   */
  typedef struct pftw_task {
      char               dirpath[PATH_MAX]; /* NUL-terminated directory path */
      size_t             dirpath_len;       /* length of dirpath as given by the caller */
      unsigned long      difficulty;        /* sort key in the queue's search tree (see pftw_pushtask()) */
      struct stat        stat;              /* stat of the directory, or all zeroes when not known yet */
      struct pftw_queue *queue;             /* queue this task belongs to */
      struct pftw_task  *max;               /* scratch pointer written by tasks_difficultycmp_findmax() */
  } pftw_task_t;
  45. struct pftw_queue {
  46. int id;
  47. pftw_task_t *tasks[PFTW_MAX_QUEUE_LENGTH];
  48. void *tasks_btree;
  49. int tasks_count;
  50. int current_task_id;
  51. pftw_callback_t callback;
  52. int nopenfd;
  53. int flags;
  54. void *arg;
  55. pthread_mutex_t lock;
  56. sem_t ending_sem;
  57. int workers[PFTW_MAX_THREADS];
  58. int workers_count;
  59. };
  60. typedef struct pftw_queue pftw_queue_t;
  61. pthread_t *threads = NULL;
  62. int threads_count = 0;
  63. sem_t threads_sem;
  64. pftw_queue_t **thread_queue = NULL;
  65. pftw_queue_t **queues = NULL;
  66. int queues_count = 0;
  67. int queues_alloced = 0;
  68. pthread_mutex_t queues_lock = PTHREAD_MUTEX_INITIALIZER;
  69. /*
  70. pftw_queue_t *queues = NULL;
  71. int queues_count = 0;
  72. */
  73. static inline void lock_queues() {
  74. dprintf("lock queues\n");
  75. pthread_mutex_lock(&queues_lock);
  76. dprintf("locked queues\n");
  77. return;
  78. }
  79. static inline void unlock_queues() {
  80. dprintf("unlock queues\n");
  81. pthread_mutex_unlock(&queues_lock);
  82. return;
  83. }
  84. static inline int alsolock_queue(pftw_queue_t *queue) {
  85. int i = 0;
  86. while (i < queues_count)
  87. if (queues[i] == queue) {
  88. dprintf("lock %p\n", queue);
  89. pthread_mutex_lock(&queue->lock);
  90. dprintf("locked %p, tasks: %i\n", queue, queue->tasks_count);
  91. return 0;
  92. }
  93. return ENOENT;
  94. }
  95. static inline int lock_queue(pftw_queue_t *queue) {
  96. lock_queues();
  97. return alsolock_queue(queue);
  98. }
  99. static inline void unlock_queue(pftw_queue_t *queue) {
  100. dprintf("unlock %p\n", queue);
  101. pthread_mutex_unlock(&queue->lock);
  102. unlock_queues();
  103. return;
  104. }
  105. pftw_queue_t *pftw_newqueue(pftw_callback_t callback, int nopenfd, int flags, void *arg) {
  106. lock_queues();
  107. int queue_id = queues_count;
  108. if (queue_id >= queues_alloced) {
  109. queues_alloced += PFTW_ALLOCPORTION;
  110. queues = realloc(queues, queues_alloced*sizeof(*queues));
  111. if (queues == NULL) {
  112. unlock_queues();
  113. return NULL;
  114. }
  115. }
  116. queues_count++;
  117. pftw_queue_t *queue = malloc(sizeof(*queue));
  118. if (queue == NULL)
  119. return NULL;
  120. queues[queue_id] = queue;
  121. queue->id = queue_id;
  122. queue->tasks_count = 0;
  123. queue->callback = callback;
  124. queue->nopenfd = nopenfd;
  125. queue->flags = flags;
  126. queue->tasks_btree = NULL;
  127. queue->arg = arg;
  128. queue->current_task_id = 0;
  129. queue->workers_count = 0;
  130. pthread_mutex_init(&queue->lock, NULL);
  131. sem_init(&queue->ending_sem, 0, 1);
  132. unlock_queues();
  133. return queue;
  134. }
  135. /*void free_noop(void *arg) {
  136. return;
  137. }*/
  138. int pftw_deletequeue(pftw_queue_t *queue) {
  139. lock_queues();
  140. if (queue->tasks_count > 0) {
  141. unlock_queues();
  142. return EBUSY;
  143. }
  144. pthread_mutex_destroy(&queue->lock);
  145. tdestroy(queue->tasks_btree, free);
  146. int i;
  147. i = 0;
  148. while (i < queue->workers_count)
  149. thread_queue[queue->workers[i++]] = NULL;
  150. queues[queue->id] = queues[--queues_count];
  151. queues[queue->id]->id = queue->id;
  152. free(queue);
  153. unlock_queues();
  154. return 0;
  155. }
  156. static int tasks_difficultycmp_findmax(const void *_task_a, const void *_task_b) {
  157. pftw_task_t *task_a = (pftw_task_t *)_task_a, *task_b = (pftw_task_t *)_task_b;
  158. dprintf("max: %s|%s|%lu (%p, %p)\n", task_a->dirpath, task_b->dirpath, task_b->difficulty, task_a->max, task_b->max);
  159. if (task_a->difficulty > task_b->difficulty) {
  160. if (task_a->max != NULL)
  161. if (task_a->max->difficulty > task_b->difficulty)
  162. return 1;
  163. task_a->max = task_b;
  164. return 1;
  165. }
  166. if (task_a->difficulty < task_b->difficulty) {
  167. if (task_b->max != NULL)
  168. if (task_b->max->difficulty > task_a->difficulty)
  169. return -1;
  170. task_b->max = task_a;
  171. return -1;
  172. }
  173. if (task_a->max == NULL)
  174. task_a->max = task_b;
  175. if (task_b->max == NULL)
  176. task_b->max = task_a;
  177. return 0;
  178. }
  179. static int tasks_difficultycmp(const void *_task_a, const void *_task_b) {
  180. const pftw_task_t *task_a = _task_a, *task_b = _task_b;
  181. dprintf("%s|%s|%lu\n", task_a->dirpath, task_b->dirpath, task_b->difficulty);
  182. if (task_a->difficulty > task_b->difficulty)
  183. return 1;
  184. if (task_a->difficulty < task_b->difficulty)
  185. return -1;
  186. return 0;
  187. }
  188. int pftw_pushtask(pftw_queue_t *queue, const char *dirpath, size_t dirpath_len, struct stat *st, unsigned long difficulty) {
  189. dprintf("pftw_pushtask(): \"%s\"\n", dirpath);
  190. int rc = lock_queue(queue);
  191. if (rc)
  192. return (rc == ENOENT ? 0 : rc);
  193. if (queue->tasks_count >= PFTW_MAX_QUEUE_LENGTH) {
  194. unlock_queue(queue);
  195. return EBUSY;
  196. }
  197. pftw_task_t **task_p = &queue->tasks[queue->tasks_count++];
  198. *task_p = calloc(1, sizeof(**task_p));
  199. if (*task_p == NULL)
  200. return ENOMEM;
  201. pftw_task_t *task = *task_p;
  202. strcpy(task->dirpath, dirpath);
  203. task->dirpath_len = dirpath_len;
  204. task->queue = queue;
  205. if (st == NULL)
  206. memset(&task->stat, 0, sizeof(task->stat));
  207. else
  208. memcpy(&task->stat, st, sizeof(task->stat));
  209. // TODO: Remove this magic with the difficulty. It's just hacks to prevent key collision in the tree. First bits is a real difficulty. The last bits is just a task_id for the prevention.
  210. int difficulty_bitpos_edge = PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS;
  211. unsigned long difficulty_overload_mask;
  212. difficulty_overload_mask = ~0;
  213. difficulty_overload_mask <<= sizeof(difficulty)*8 - difficulty_bitpos_edge;
  214. unsigned long difficulty_task_id_mask;
  215. difficulty_task_id_mask = ~0;
  216. difficulty_task_id_mask <<= difficulty_bitpos_edge;
  217. difficulty_task_id_mask = ~difficulty_task_id_mask;
  218. if (difficulty & difficulty_overload_mask)
  219. task->difficulty = ~difficulty_task_id_mask;
  220. else
  221. task->difficulty = difficulty << (PFTW_MAX_THREADS_BITS + PFTW_MAX_QUEUE_LENGTH_BITS);
  222. task->difficulty |= queue->current_task_id++;
  223. task->max = NULL;
  224. queue->current_task_id %= PFTW_MAX_QUEUE_LENGTH;
  225. dprintf("pftw_pushtask(): \"%s\" (%i): %lu\n", dirpath, queue->tasks_count, task->difficulty);
  226. while (1) {
  227. void *found = tsearch(task, &queue->tasks_btree, tasks_difficultycmp);
  228. if (found == NULL) {
  229. unlock_queue(queue);
  230. return ENOMEM;
  231. }
  232. if (found == task) { // TODO: Remove this. This is a hack to retry on key collision in the tree
  233. task->difficulty &= ~difficulty_task_id_mask;
  234. task->difficulty |= queue->current_task_id++;
  235. queue->current_task_id %= PFTW_MAX_QUEUE_LENGTH;
  236. continue;
  237. }
  238. break;
  239. }
  240. unlock_queue(queue);
  241. return sem_post(&threads_sem);
  242. }
  243. int pftw_poptask(pftw_queue_t *queue, pftw_task_t *task) {
  244. int rc = lock_queue(queue);
  245. if (rc) {
  246. unlock_queue(queue);
  247. if (rc == ENOENT)
  248. return ENOENT;
  249. fprintf(stderr, "Unknown internal error #3 of ftw()\n");
  250. return rc;
  251. }
  252. if (queue->tasks_count <= 0) {
  253. unlock_queue(queue);
  254. return ENOENT;
  255. }
  256. pftw_task_t max_key = {{0}};
  257. //strcpy(max_key.dirpath, "JUST A TEST");
  258. max_key.difficulty = ~0;
  259. //max_key.difficulty = 1000000;
  260. void *found = tfind(&max_key, &queue->tasks_btree, tasks_difficultycmp_findmax); // Searching for the most difficult task
  261. assert (found == NULL);
  262. assert (max_key.max != NULL);
  263. memcpy(task, max_key.max, sizeof(*task));
  264. dprintf("pftw_poptask(): \"%s\": %lu\n", task->dirpath, task->difficulty);
  265. found = tdelete(max_key.max, &queue->tasks_btree, tasks_difficultycmp);
  266. if (found == NULL) {
  267. fprintf(stderr, "Unknown internal error #4 of ftw()\n");
  268. unlock_queue(queue);
  269. return ENOENT;
  270. }
  271. //size_t task_inqueueid = (max_key.max - queue->tasks) / sizeof(*task);
  272. --queue->tasks_count;
  273. free(max_key.max);
  274. //if (task_inqueueid != queue->tasks_count)
  275. // memcpy(&queue->tasks[task_inqueueid], &queue->tasks[queue->tasks_count], sizeof(*queue->tasks));
  276. dprintf("pftw_poptask(): \"%s\" (%i)\n", task->dirpath, queue->tasks_count);
  277. unlock_queue(queue);
  278. return 0;
  279. }
  280. int pftw_dotask(pftw_task_t *task);
  281. int pftw_dotasknow(pftw_queue_t *queue, const char *path, size_t path_len, struct stat *st_p) {
  282. pftw_task_t childtask;
  283. strcpy(childtask.dirpath, path);
  284. childtask.dirpath_len = path_len;
  285. childtask.queue = queue;
  286. memcpy(&childtask.stat, st_p, sizeof(*st_p));
  287. return pftw_dotask(&childtask);
  288. }
  289. int pftw_dotask_processentry(pftw_task_t *task, struct dirent *entry_p) {
  290. struct stat st;
  291. pftw_queue_t *queue = task->queue;
  292. int flags = queue->flags;
  293. int rc;
  294. // Getting path
  295. size_t entry_d_name_len = strlen(entry_p->d_name);
  296. size_t path_len = task->dirpath_len + 1 + entry_d_name_len;
  297. char path[PATH_MAX+1];
  298. if (path_len > PATH_MAX) {
  299. fprintf(stderr, "pftw internal error #6\n");
  300. return ENAMETOOLONG;
  301. }
  302. memcpy(path, task->dirpath, task->dirpath_len);
  303. path[task->dirpath_len] = '/';
  304. memcpy(&path[task->dirpath_len + 1], entry_p->d_name, entry_d_name_len);
  305. path[path_len] = 0;
  306. // Getting stat
  307. dprintf("pftw_dotask_processentry(): \"%s\" (%lu) of \"%s\" (%lu) is (%lu) \"%s\"\n", entry_p->d_name, entry_d_name_len, task->dirpath, task->dirpath_len, path_len, path);
  308. if (flags & FTW_PHYS)
  309. rc = lstat(path, &st);
  310. else
  311. rc = stat (path, &st);
  312. if (rc) return errno;
  313. // TODO: check for recursion
  314. char follow = (entry_p->d_type == DT_DIR);
  315. if (flags & FTW_MOUNT)
  316. if (task->stat.st_dev != st.st_dev)
  317. follow = 0;
  318. int ftw_ftype = FTW_NS;
  319. switch (entry_p->d_type) {
  320. case DT_BLK:
  321. case DT_CHR:
  322. case DT_FIFO:
  323. case DT_LNK:
  324. case DT_REG:
  325. case DT_SOCK:
  326. ftw_ftype = FTW_F;
  327. break;
  328. case DT_DIR:
  329. ftw_ftype = FTW_D;
  330. break;
  331. case DT_UNKNOWN:
  332. ftw_ftype = FTW_NS;
  333. break;
  334. }
  335. rc = queue->callback(path, &st, ftw_ftype, NULL, queue->arg);
  336. if (flags & FTW_ACTIONRETVAL) {
  337. switch (rc) {
  338. case FTW_CONTINUE:
  339. break;
  340. case FTW_SKIP_SUBTREE:
  341. follow = 0;
  342. break;
  343. case FTW_SKIP_SIBLINGS:
  344. fprintf(stderr, "At the moment FTW_SKIP_SIBLINGS is not supported by pftw().\n");
  345. break;
  346. case FTW_STOP:
  347. fprintf(stderr, "At the moment FTW_STOP is not supported by pftw().\n");
  348. break;
  349. }
  350. }
  351. if (follow) {
  352. int rc;
  353. unsigned long difficulty = st.st_nlink;
  354. if (difficulty >= PFTW_DIFFICULTY_THRESHOLD) { // If the task is heavy then public it for workers
  355. rc = pftw_pushtask(queue, path, path_len, &st, difficulty);
  356. if (rc == EBUSY)
  357. rc = pftw_dotasknow(queue, path, path_len, &st);
  358. } else { // Otherwise do the task by myself
  359. rc = pftw_dotasknow(queue, path, path_len, &st);
  360. }
  361. if (rc) return rc;
  362. }
  363. return 0;
  364. }
  365. int pftw_dotask(pftw_task_t *task) {
  366. dprintf("opendir(%s)\n", task->dirpath);
  367. DIR *dir = opendir(task->dirpath);
  368. pftw_queue_t *queue = task->queue;
  369. if (dir == NULL) {
  370. switch (errno) {
  371. case EACCES:
  372. queue->callback(task->dirpath, &task->stat, FTW_DNR, NULL, queue->arg);
  373. return 0;
  374. default:
  375. return errno;
  376. }
  377. }
  378. struct dirent entry, *readdir_result;
  379. if (task->stat.st_nlink == 0) { // If stat() is not done, yet
  380. int rc;
  381. int flags = queue->flags;
  382. if (flags & FTW_PHYS)
  383. rc = lstat(task->dirpath, &task->stat);
  384. else
  385. rc = stat (task->dirpath, &task->stat);
  386. if (rc)
  387. return rc;
  388. }
  389. while (1) {
  390. int rc = readdir_r(dir, &entry, &readdir_result);
  391. if (rc) return rc;
  392. if (readdir_result == NULL)
  393. break;
  394. if (entry.d_name[0] == '.' && (entry.d_name[1] == 0 || (entry.d_name[1] == '.' && entry.d_name[2] == 0)))
  395. continue; // Skip "." and ".."
  396. rc = pftw_dotask_processentry(task, &entry);
  397. if (rc) return rc;
  398. }
  399. closedir(dir);
  400. return 0;
  401. }
  402. int pftw(const char *dirpath, pftw_callback_t fn, int nopenfd, int flags, void *arg) {
  403. if (!pftw_running)
  404. return EBUSY;
  405. if (flags & ~(FTW_ACTIONRETVAL|FTW_MOUNT|FTW_PHYS)) // Check if all flags are supported flags
  406. return EINVAL;
  407. if (!(flags & FTW_PHYS)) { // Recursion anti-loop is not implemented. Symlinks are too often dangerous.
  408. fprintf(stderr, "At the moment pftw() cannot be used without flag FTW_PHYS.\n");
  409. return EINVAL;
  410. }
  411. pftw_queue_t *queue = pftw_newqueue(fn, nopenfd, flags, arg);
  412. if (queue == NULL)
  413. return errno;
  414. int rc = pftw_pushtask(queue, dirpath, strlen(dirpath), NULL, ~0);
  415. if (rc == EBUSY) {
  416. fprintf(stderr, "This case is not implemented, yet\n");
  417. }
  418. if (rc) return rc;
  419. pftw_task_t task;
  420. while ((rc = pftw_poptask(queue, &task)) == 0) {
  421. rc = pftw_dotask(&task);
  422. if (rc) return rc;
  423. }
  424. if (rc && rc != ENOENT) return rc;
  425. while (queue->workers_count > 0) {
  426. dprintf("pftw(): queue->workers_count == %i\n", queue->workers_count);
  427. sem_wait(&queue->ending_sem);
  428. };
  429. rc = pftw_deletequeue(queue);
  430. if (rc) return rc;
  431. return 0;
  432. }
  433. void pftw_worker_dash(int worker_id) {
  434. int rc;
  435. dprintf("pftw_worker_dash(%i)\n", worker_id);
  436. do {
  437. pftw_queue_t *queue;
  438. if (queues_count == 0) {
  439. return;
  440. }
  441. queue = thread_queue[worker_id];
  442. if (queue == NULL) {
  443. lock_queues();
  444. if (queues_count == 0) {
  445. unlock_queues();
  446. return;
  447. }
  448. int queue_id = worker_id % queues_count;
  449. dprintf("pftw_worker_dash(%i): queue_id == %i\n", worker_id, queue_id);
  450. thread_queue[worker_id] = queues[queue_id];
  451. queue = thread_queue[worker_id];
  452. queue->workers[queue->workers_count++] = worker_id;
  453. unlock_queues();
  454. }
  455. pftw_task_t task;
  456. rc = pftw_poptask(queue, &task);
  457. switch (rc) {
  458. case 0:
  459. rc = pftw_dotask(&task);
  460. if (rc) {
  461. fprintf(stderr, "pftw internal error #1: %s\n", strerror(errno));
  462. return;
  463. }
  464. break;
  465. case ENOENT:
  466. break;
  467. default:
  468. fprintf(stderr, "pftw internal error #5: %s\n", strerror(errno));
  469. return;
  470. }
  471. dprintf("pftw_worker_dash(%i): %i\n", worker_id, rc);
  472. } while (rc == 0);
  473. return;
  474. }
  475. void *pftw_worker(void *_arg) {
  476. int worker_id = (long)_arg;
  477. int ret;
  478. ret = sem_wait(&threads_sem);
  479. while (pftw_running) {
  480. pftw_worker_dash(worker_id);
  481. dprintf("worker %i: sem_wait(): pftw_running == %i; %p\n", worker_id, pftw_running, thread_queue[worker_id]);
  482. if (thread_queue[worker_id] != NULL) {
  483. pftw_queue_t *queue = thread_queue[worker_id];
  484. int rc = lock_queue(queue);
  485. if (!rc) {
  486. {
  487. int i;
  488. i = 0;
  489. while (i < queue->workers_count) {
  490. if (queue->workers[i] == worker_id) {
  491. queue->workers[i] = queue->workers[--queue->workers_count];
  492. break;
  493. }
  494. i++;
  495. }
  496. }
  497. sem_post(&queue->ending_sem);
  498. unlock_queue(queue);
  499. }
  500. thread_queue[worker_id] = NULL;
  501. }
  502. ret = sem_wait(&threads_sem);
  503. if (ret) {
  504. pftw_running = 0;
  505. fprintf(stderr, "pftw internal error #0: %s\n", strerror(errno));
  506. return (void *)(long)errno;
  507. }
  508. }
  509. dprintf("worker %i finished (%i)\n", worker_id, pftw_running);
  510. return NULL;
  511. }
  512. int pftw_init(int num_threads) {
  513. if (num_threads < 2)
  514. return EINVAL;
  515. if (num_threads > PFTW_MAX_THREADS)
  516. return EINVAL;
  517. num_threads--; // One thread is the master thread [pftw()]
  518. if (pftw_running) {
  519. return EBUSY;
  520. }
  521. threads = calloc(num_threads, sizeof(pthread_t));
  522. thread_queue = calloc(num_threads, sizeof(void *));
  523. if (threads == NULL)
  524. return ENOMEM;
  525. int ret = sem_init(&threads_sem, 0, num_threads);
  526. if (ret) {
  527. free(threads);
  528. return errno;
  529. }
  530. pftw_running = 1;
  531. {
  532. int i = 0;
  533. while (i < num_threads) {
  534. pthread_create(&threads[i], NULL, pftw_worker, (void *)(long)i);
  535. i++;
  536. }
  537. threads_count = num_threads;
  538. }
  539. return 0;
  540. }
  541. int pftw_deinit() {
  542. dprintf("pftw_deinit()\n");
  543. if (!pftw_running)
  544. return ENOENT;
  545. // No more iterations for pftw workers
  546. pftw_running = 0;
  547. {
  548. int i;
  549. // Interrupting sem_wait()
  550. i = 0;
  551. while (i < threads_count) {
  552. // int ret = pthread_kill(threads[i], SIGCONT);
  553. int ret = sem_post(&threads_sem);
  554. if (ret)
  555. return ret;
  556. i++;
  557. }
  558. // Waiting for finish
  559. i = 0;
  560. while (i < threads_count) {
  561. int ret;
  562. void *retval;
  563. ret = pthread_join(threads[i], &retval);
  564. if (ret)
  565. return ret;
  566. i++;
  567. }
  568. }
  569. // Clean up
  570. free(threads);
  571. threads = NULL;
  572. threads_count = 0;
  573. free(thread_queue);
  574. thread_queue = NULL;
  575. {
  576. int i;
  577. i = 0;
  578. while (i < queues_count)
  579. free(queues[i++]);
  580. free(queues);
  581. queues = NULL;
  582. queues_count = 0;
  583. queues_alloced = 0;
  584. }
  585. return sem_destroy(&threads_sem);
  586. }