cluster.c 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536
  1. /*
  2. clsync - file tree sync utility based on fanotify and inotify
  3. Copyright (C) 2013 Dmitry Yu Okunev <dyokunev@ut.mephi.ru> 0x8E30679C
  4. This program is free software: you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published by
  6. the Free Software Foundation, either version 3 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU General Public License for more details.
  12. You should have received a copy of the GNU General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. /*
  16. Hello, dear developer.
  17. Cluster technologies are almost always very difficult. So I'll try to
  18. fill this code with comments. Enjoy ;)
  19. Also you can ask me directly by e-mail or IRC, if something seems too
  20. hard.
  21. -- 0x8E30679C
  22. */
  23. #ifdef CLUSTER_SUPPORT
  24. #include "common.h"
  25. #include "cluster.h"
  26. #include "sync.h"
  27. #include "output.h"
  28. #include "malloc.h"
  29. #ifdef HAVE_MHASH
  30. #include <mhash.h>
  31. #endif
  32. // Global variables. They will be initialized in cluster_init()
  33. #define NODES_ALLOC (MAX(MAXNODES, NODEID_NOID)+1)
  34. int sock_i = -1;
  35. struct sockaddr_in sa_i = {0};
  36. int sock_o = -1;
  37. struct sockaddr_in sa_o = {0};
  38. options_t *options_p = NULL;
  39. indexes_t *indexes_p = NULL;
  40. pthread_t pthread_cluster = 0;
  41. nodeinfo_t nodeinfo[NODES_ALLOC]= {{0}};
  42. nodeinfo_t *nodeinfo_my = NULL;
  43. uint8_t node_id_my = NODEID_NOID;
  44. uint8_t node_ids[NODES_ALLOC] = {0};
  45. unsigned int cluster_timeout = 0;
  46. uint8_t node_count = 0;
  47. uint8_t node_online = 0;
  48. cluster_recvproc_funct_t recvproc_funct[COUNT_CLUSTERCMDID] = {NULL};
  49. window_t window_i = {0};
  50. window_t window_o = {0};
  51. /**
  52. * @brief Adds command (message) to window_p->buffer
  53. *
  54. * @param[in] clustercmd_p Pointer to cluster cmd to put into window
  55. *
  56. * @retval zero Successful
  57. * @retval non-zero If got error while deleting the message. The error-code is placed into returned value.
  58. *
  59. */
  60. static inline int clustercmd_window_add(window_t *window_p, clustercmd_t *clustercmd_p, GHashTable *serial2queuedpacket_ht) {
  61. #ifdef PARANOID
  62. if(clustercmd_p->h.src_node_id >= MAXNODES) {
  63. printf_e("Error: clustercmd_window_add(): Invalid src_node_id: %i.\n", clustercmd_p->h.src_node_id);
  64. return EINVAL;
  65. }
  66. #endif
  67. // Checking if there enough window_p->cells allocated
  68. if(window_p->packets_len >= window_p->size) {
  69. window_p->size += ALLOC_PORTION;
  70. # define CXREALLOC(a, size) \
  71. (typeof(a))xrealloc((char *)(a), (size_t)(size) * sizeof(*(a)))
  72. window_p->packets_id = CXREALLOC(window_p->packets_id, window_p->size);
  73. window_p->occupied_sides = CXREALLOC(window_p->occupied_sides, window_p->size);
  74. # undef CXREALLOC
  75. }
  76. // Calculating required memory space in buffer for the message
  77. size_t clustercmd_size = CLUSTERCMD_SIZE(clustercmd_p);
  78. size_t required_space = sizeof(clustercmdqueuedpackethdr_t) + clustercmd_size;
  79. // Searching occupied boundaries in the window_p->buffer
  80. size_t occupied_left = SIZE_MAX, occupied_right=0;
  81. int i;
  82. i = 0;
  83. while(i < window_p->packets_len) {
  84. unsigned int window_id;
  85. window_id = window_p->packets_id[i];
  86. occupied_left = MIN(occupied_left, window_p->occupied_sides[window_id].left);
  87. occupied_right = MAX(occupied_right, window_p->occupied_sides[window_id].right);
  88. }
  89. printf_ddd("Debug3: clustercmd_window_add(): w.size == %u, b_left == %u; b_right == %u; w.buf_size == %u; r_space == %u\n",
  90. window_p->size, occupied_left, occupied_right, window_p->buf_size, required_space);
  91. // Trying to find a space in the buffer to place message
  92. size_t buf_coordinate = SIZE_MAX;
  93. if(window_p->packets_len) {
  94. // Free space from left (start of buffer)
  95. size_t free_left = occupied_left;
  96. // Free space from right (end of buffer)
  97. size_t free_right = window_p->buf_size - occupied_right;
  98. if(free_left > required_space)
  99. buf_coordinate = free_left - required_space;
  100. else
  101. if(free_right > required_space)
  102. buf_coordinate = occupied_right;
  103. else
  104. {
  105. // Not enough space in the window_p->buffer;
  106. window_p->buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
  107. window_p->buf = xrealloc(window_p->buf, window_p->buf_size);
  108. buf_coordinate = occupied_right;
  109. }
  110. printf_ddd("Debug3: clustercmd_window_add(): f_left == %u; f_right == %u; b_coord == %u; w.buf_size == %u",
  111. free_left, free_right, buf_coordinate, window_p->buf_size);
  112. } else {
  113. buf_coordinate = 0;
  114. if(window_p->buf_size <= required_space) {
  115. window_p->buf_size += MAX(CLUSTER_WINDOW_BUFSIZE_PORTION, required_space);
  116. window_p->buf = xrealloc(window_p->buf, window_p->buf_size);
  117. }
  118. }
  119. unsigned int window_id;
  120. // packet id in window
  121. window_id = window_p->packets_len;
  122. // reserving the space in buffer
  123. window_p->occupied_sides[window_id].left = buf_coordinate;
  124. window_p->occupied_sides[window_id].right = buf_coordinate + required_space;
  125. // placing information into buffer
  126. clustercmdqueuedpacket_t *queuedpacket_p;
  127. printf_ddd("Debug3: clustercmd_window_add(): b_coord == %u\n", buf_coordinate);
  128. queuedpacket_p = (clustercmdqueuedpacket_t *)&window_p->buf[buf_coordinate];
  129. memset(&queuedpacket_p->h, 0, sizeof(queuedpacket_p->h));
  130. memcpy(&queuedpacket_p->cmd, clustercmd_p, clustercmd_size);
  131. queuedpacket_p->h.window_id = window_id;
  132. // remembering new packet
  133. g_hash_table_insert(serial2queuedpacket_ht, GINT_TO_POINTER(clustercmd_p->h.serial), queuedpacket_p);
  134. window_p->packets_id[window_p->packets_len++] = window_id;
  135. return 0;
  136. }
  137. /**
  138. * @brief Removes command (message) from window_p->buffer
  139. *
  140. * @param[in] queuedpacket_p Pointer to queuedpacket structure of the command (message)
  141. *
  142. * @retval zero Successful
  143. * @retval non-zero If got error while deleting the message. The error-code is placed into returned value.
  144. *
  145. */
  146. static inline int clustercmd_window_del(window_t *window_p, clustercmdqueuedpacket_t *queuedpacket_p, GHashTable *serial2queuedpacket_ht) {
  147. #ifdef PARANOID
  148. if(!window_p->size) {
  149. printf_e("Error: clustercmd_window_del(): window not allocated.\n");
  150. return EINVAL;
  151. }
  152. if(!window_p->packets_len) {
  153. printf_e("Error: clustercmd_window_del(): there already no packets in the window.\n");
  154. return EINVAL;
  155. }
  156. #endif
  157. unsigned int window_id_del = queuedpacket_p->h.window_id;
  158. unsigned int window_id_last = --window_p->packets_len;
  159. // Forgeting the packet
  160. // Moving the last packet into place of deleting packet, to free the tail in "window_p->packets_id" and "window_p->occupied_sides"
  161. if(window_id_del != window_id_last) {
  162. printf_ddd("Debug3: clustercmd_window_del(): %i -> %i\n", window_id_last, window_id_del);
  163. window_p->packets_id[window_id_del] = window_p->packets_id[window_id_last];
  164. memcpy(&window_p->occupied_sides[window_id_del], &window_p->occupied_sides[window_id_last], sizeof(window_p->occupied_sides[window_id_del]));
  165. }
  166. // Removing from hash table
  167. g_hash_table_remove(serial2queuedpacket_ht, GINT_TO_POINTER(queuedpacket_p->cmd.h.serial));
  168. return 0;
  169. }
  170. #ifndef HAVE_MHASH
  171. /**
  172. * @brief Calculated Adler32 value for char array
  173. *
  174. * @param[in] date Pointer to data
  175. * @param[in] len Length of the data
  176. *
  177. * @retval uint32_t Adler32 value of data
  178. *
  179. */
  180. // Copied from http://en.wikipedia.org/wiki/Adler-32
  181. uint32_t adler32_calc(unsigned char *data, int32_t len) { // where data is the location of the data in physical
  182. // memory and len is the length of the data in bytes
  183. const int MOD_ADLER = 65521;
  184. uint32_t a = 1, b = 0;
  185. int32_t index;
  186. // Process each byte of the data in order
  187. for (index = 0; index < len; ++index)
  188. {
  189. a = (a + data[index]) % MOD_ADLER;
  190. b = (b + a) % MOD_ADLER;
  191. }
  192. return (b << 16) | a;
  193. }
  194. #endif
  195. /**
  196. * @brief Calculates Adler32 for clustercmd
  197. *
  198. * @param[in] clustercmd_p Pointer to clustercmd
  199. * @param[out] clustercmdadler32_p Pointer to structure to return value(s)
  200. *
  201. * @retval zero On successful calculation
  202. * @retval non-zero On error. Error-code is placed into returned value.
  203. *
  204. */
  205. int clustercmd_adler32_calc(clustercmd_t *clustercmd_p, clustercmdadler32_t *clustercmdadler32_p, adler32_calc_t flags) {
  206. if(flags & ADLER32_CALC_HEADER) {
  207. uint32_t adler32;
  208. clustercmdadler32_t adler32_save;
  209. // Preparing
  210. memcpy(&adler32_save, &clustercmd_p->h.adler32, sizeof(clustercmdadler32_t));
  211. memset(&clustercmd_p->h.adler32, 0, sizeof(clustercmdadler32_t));
  212. adler32 = 0xFFFFFFFF;
  213. uint32_t size = sizeof(clustercmdhdr_t);
  214. char *ptr = (char *)&clustercmd_p->h;
  215. // Calculating
  216. #ifdef HAVE_MHASH
  217. MHASH td = mhash_init(MHASH_ADLER32);
  218. mhash(td, ptr, size);
  219. mhash_deinit(td, &adler32);
  220. #else
  221. adler32 = adler32_calc((unsigned char *)ptr, size);
  222. #endif
  223. // Ending
  224. memcpy(&clustercmd_p->h.adler32, &adler32_save, sizeof(clustercmdadler32_t));
  225. clustercmdadler32_p->hdr = adler32 ^ 0xFFFFFFFF;
  226. }
  227. if(flags & ADLER32_CALC_DATA) {
  228. uint32_t adler32;
  229. uint32_t size = clustercmd_p->h.data_len;
  230. char *ptr = clustercmd_p->data.p;
  231. #ifdef PARANOID
  232. if(size & 0x3) {
  233. printf_e("Error: clustercmd_adler32_calc(): clustercmd_p->h.data_len&0x3 != 0: %u\n",
  234. clustercmd_p->h.data_len);
  235. return EINVAL;
  236. }
  237. #endif
  238. // Calculating
  239. #ifdef HAVE_MHASH
  240. MHASH td = mhash_init(MHASH_ADLER32);
  241. mhash(td, ptr, size);
  242. mhash_deinit(td, &adler32);
  243. #else
  244. adler32 = adler32_calc((unsigned char *)ptr, size);
  245. #endif
  246. // Ending
  247. clustercmdadler32_p->dat = adler32 ^ 0xFFFFFFFF;
  248. }
  249. return 0;
  250. }
  251. /**
  252. * @brief Changes information about node's status in nodeinfo[] and updates connected information.
  253. *
  254. * @param[in] node_id node_id of the node.
  255. * @param[in] node_status New node status.
  256. *
  257. * @retval zero Successful
  258. * @retval non-zero If got error while changing the status. The error-code is placed into returned value.
  259. *
  260. */
  261. int node_status_change(uint8_t node_id, uint8_t node_status) {
  262. uint8_t node_status_old = nodeinfo[node_id].status;
  263. nodeinfo_t *nodeinfo_p = &nodeinfo[node_id];
  264. if((node_status == NODESTATUS_DOESNTEXIST) && (node_status_old != NODESTATUS_DOESNTEXIST)) {
  265. node_count--;
  266. node_ids[nodeinfo_p->num] = node_ids[node_count];
  267. g_hash_table_destroy(nodeinfo_p->modtime_ht);
  268. g_hash_table_destroy(nodeinfo_p->serial2queuedpacket_ht);
  269. #ifdef VERYPARANOID
  270. memset(nodeinfo_p, 0, sizeof(*nodeinfo_p));
  271. #endif
  272. return 0;
  273. }
  274. if(node_status == node_status_old)
  275. return 0;
  276. switch(node_status_old) {
  277. case NODESTATUS_DOESNTEXIST:
  278. nodeinfo_p->id = node_id;
  279. nodeinfo_p->num = node_count;
  280. nodeinfo_p->modtime_ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, 0);
  281. nodeinfo_p->serial2queuedpacket_ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, 0, 0);
  282. node_ids[node_count] = node_id;
  283. node_count++;
  284. #ifdef PARANOID
  285. if(node_status == NODESTATUS_OFFLINE)
  286. break; // In case of NODESTATUS_DOESNTEXIST -> NODESTATUS_OFFLINE, node_online should be increased
  287. #endif
  288. case NODESTATUS_OFFLINE:
  289. node_online++;
  290. break;
  291. default:
  292. if(node_status == NODESTATUS_OFFLINE)
  293. node_online--;
  294. break;
  295. }
  296. nodeinfo[node_id].status = node_status;
  297. return 0;
  298. }
  299. /**
  300. * @brief Sends message to another nodes of the cluster.
  301. *
  302. * @param[in] clustercmd_p Command structure pointer.
  303. *
  304. * @retval zero Successfully send.
  305. * @retval non-zero Got error, while sending.
  306. *
  307. */
  308. int cluster_send(clustercmd_t *clustercmd_p) {
  309. clustercmd_p->h.src_node_id = node_id_my;
  310. clustercmd_adler32_calc(clustercmd_p, &clustercmd_p->h.adler32, ADLER32_CALC_ALL);
  311. printf_ddd("Debug3: cluster_send(): Sending: "
  312. "{h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u, adler32.hdr: %p, adler32.dat: %p, data_len: %u}\n",
  313. clustercmd_p->h.dst_node_id, clustercmd_p->h.src_node_id, clustercmd_p->h.cmd_id,
  314. (void *)(long)clustercmd_p->h.adler32.hdr, (void *)(long)clustercmd_p->h.adler32.dat,
  315. clustercmd_p->h.data_len);
  316. nodeinfo_t *nodeinfo_p;
  317. nodeinfo_p = &nodeinfo[clustercmd_p->h.dst_node_id];
  318. // Checking if the node online
  319. switch(nodeinfo_p->status) {
  320. case NODESTATUS_DOESNTEXIST:
  321. case NODESTATUS_OFFLINE:
  322. printf_d("Debug: cluster_send(): There's no online node with id %u. Skipping sending.\n", clustercmd_p->h.dst_node_id);
  323. return EADDRNOTAVAIL;
  324. default:
  325. break;
  326. }
  327. // Putting the message into output windowa
  328. if(nodeinfo_my != NULL)
  329. clustercmd_window_add(&window_o, clustercmd_p, nodeinfo_my->serial2queuedpacket_ht);
  330. // Sending the message
  331. sendto(sock_o, clustercmd_p, CLUSTERCMD_SIZE_PADDED(clustercmd_p), 0, &sa_o, sizeof(sa_o));
  332. // Finishing
  333. return 0;
  334. }
  335. /**
  336. * @brief Sets message processing functions for cluster_recv_proc() function for specified command type
  337. *
  338. * @param[in] cmd_id The command type
  339. * @param[in] procfunct The processing function for messages with specified cmd_id
  340. *
  341. * @retval zero Successful
  342. * @retval non-zero If got error while setting processing function. The error-code is placed into returned value.
  343. *
  344. */
  345. static inline int cluster_recv_proc_set(clustercmd_id_t cmd_id, cluster_recvproc_funct_t procfunct) {
  346. recvproc_funct[cmd_id] = procfunct;
  347. return 0;
  348. }
  349. /**
  350. * @brief Safe wrapper for recvfrom() function
  351. *
  352. * @param[in] sock The socket descriptor
  353. * @param[in] buf Pointer to buffer
  354. * @param[in] size Amount of bytes to read
  355. *
  356. * @retval zero Successful
  357. * @retval non-zero If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
  358. *
  359. */
  360. static inline int cluster_read(int sock, void *buf, size_t size, cluster_read_flags_t flags) {
  361. static struct in_addr last_addr = {0};
  362. struct sockaddr_in sa_in;
  363. size_t sa_in_len = sizeof(sa_in);
  364. int readret = recvfrom(sock, buf, size, MSG_WAITALL, (struct sockaddr *)&sa_in, (socklen_t * restrict)&sa_in_len);
  365. if(flags & CLREAD_CONTINUE) {
  366. if(memcmp(&last_addr, &sa_in.sin_addr, sizeof(last_addr))) {
  367. printf_d("Debug: Get message from wrong source (%s != %s). Skipping it :(.\n", inet_ntoa(sa_in.sin_addr), inet_ntoa(last_addr));
  368. size = 0;
  369. return 0;
  370. }
  371. }
  372. memcpy(&last_addr, &sa_in.sin_addr, sizeof(last_addr));
  373. #ifdef PARANOID
  374. if(!readret) {
  375. printf_e("Error: cluster_read(): recvfrom() returned 0. This shouldn't happend. Exit.");
  376. return EINVAL;
  377. }
  378. #endif
  379. if(readret < 0) {
  380. printf_e("Error: cluster_read(): recvfrom() returned %i. "
  381. "Seems, that something wrong with network socket: %s (errno %i).\n",
  382. readret, strerror(errno), errno);
  383. return errno != -1 ? errno : -2;
  384. }
  385. printf_dd("Debug2: cluster_read(): Got message from %s (len: %i, expected: %i).\n", inet_ntoa(sa_in.sin_addr), readret, size);
  386. if(readret < size) {
  387. // Too short message
  388. printf_e("Warning: cluster_read(): Got too short message from node. Ignoring it.\n");
  389. return -1;
  390. }
  391. return 0;
  392. }
  393. /**
  394. * @brief Sends packet-reject notification
  395. *
  396. * @param[in] clustercmd_p Pointer to clustercmd that will be rejected
  397. * @param[in] reason Reason why the clustercmd is denied
  398. *
  399. * @retval zero Successful
  400. * @retval non-zero If got error while read()-ing. The error-code is placed into returned value. "-1" means that message is too short.
  401. *
  402. */
  403. static inline int clustercmd_reject(clustercmd_t *clustercmd_p, uint8_t reason) {
  404. clustercmd_t *clustercmd_rej_p = CLUSTER_ALLOCA(clustercmd_rej_t, 0);
  405. clustercmd_rej_p->h.dst_node_id = clustercmd_p->h.src_node_id;
  406. clustercmd_rej_p->data.rej.serial = clustercmd_p->h.serial;
  407. clustercmd_rej_p->data.rej.reason = reason;
  408. return cluster_send(clustercmd_rej_p);
  409. }
  410. #define CLUSTER_RECV_RETURNMESSAGE(clustercmd_p) {\
  411. last_serial = (clustercmd_p)->h.serial;\
  412. last_src_node_id = (clustercmd_p)->h.src_node_id;\
  413. if(clustercmd_pp != NULL)\
  414. *clustercmd_pp = (clustercmd_p);\
  415. return 1;\
  416. }
  417. /**
  418. * @brief Receives message from another nodes of the cluster. (not thread-safe)
  419. *
  420. * @param[out] clustercmd_pp Pointer to command structure pointer. It will be re-allocated every time when size is not enough. Allocated space will be reused on next calling.
  421. * @param[i/o] timeout_p Pointer to timeout (in milliseconds). Timeout is assumed zero if the pointer is NULL. After waiting the event timeout value will be decreased on elapsed time.
  422. *
  423. * @retval 1 If there's new message.
  424. * @retval 0 If there's no new messages.
  425. * @retval -1 If got error while receiving. The error-code is placed into "errno".
  426. *
  427. */
  428. static int cluster_recv(clustercmd_t **clustercmd_pp, unsigned int *timeout_p) {
  429. static clustercmd_t *clustercmd_p=NULL;
  430. static size_t size=0;
  431. static uint8_t last_src_node_id = NODEID_NOID;
  432. static uint32_t last_serial = 0;
  433. int timeout;
  434. // Getting the timeout
  435. timeout = (timeout_p == NULL ? 0 : *timeout_p);
  436. if(!size) {
  437. size = BUFSIZ;
  438. clustercmd_p = (clustercmd_t *)xmalloc(size);
  439. }
  440. // Checking if there message is waiting in the window
  441. if(last_src_node_id != NODEID_NOID) {
  442. nodeinfo_t *nodeinfo_p = &nodeinfo[last_src_node_id];
  443. clustercmdqueuedpacket_t *clustercmdqueuedpacket_p = (clustercmdqueuedpacket_t *)
  444. g_hash_table_lookup(nodeinfo_p->serial2queuedpacket_ht, GINT_TO_POINTER(last_serial+1));
  445. if(clustercmdqueuedpacket_p != NULL)
  446. CLUSTER_RECV_RETURNMESSAGE(&clustercmdqueuedpacket_p->cmd);
  447. }
  448. // Checking if there any event on read socket
  449. // select()
  450. struct timeval tv;
  451. fd_set rfds;
  452. FD_ZERO(&rfds);
  453. FD_SET(sock_i, &rfds);
  454. tv.tv_sec = timeout / 1000;
  455. tv.tv_usec = timeout % 1000;
  456. int selret = select(sock_i+1, &rfds, NULL, NULL, &tv);
  457. // Remembering the rest part of timeout
  458. if(timeout_p != NULL)
  459. *timeout_p = tv.tv_sec * 1000 + tv.tv_usec / 1000;
  460. // processing select()'s retuned value
  461. if(selret < 0) {
  462. printf_e("Error: cluster_recv(): got error while select(): %s (errno: %i).\n",
  463. strerror(errno), errno);
  464. return 0;
  465. }
  466. if(selret == 0) {
  467. printf_ddd("Debug: cluster_recv(): no new messages.\n");
  468. return 0;
  469. }
  470. printf_ddd("Debug: cluster_recv(): got new message(s).\n");
  471. // Reading new message's header
  472. clustercmdadler32_t adler32;
  473. //clustercmd_t *clustercmd_p = (clustercmd_t *)mmap(NULL, sizeof(clustercmdhdr_t), PROT_NONE,
  474. // MAP_PRIVATE, sock, 0);
  475. int ret;
  476. if((ret=cluster_read(sock_i, (void *)clustercmd_p, sizeof(clustercmdhdr_t), CLREAD_NONE))) {
  477. if(ret == -1) return 0; // Invalid message? Skipping.
  478. printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n",
  479. strerror(errno), errno);
  480. errno = ret;
  481. return -1;
  482. }
  483. // Checking adler32 of packet headers.
  484. clustercmd_adler32_calc(clustercmd_p, &adler32, ADLER32_CALC_HEADER);
  485. if(adler32.hdr != clustercmd_p->h.adler32.hdr) {
  486. printf_d("Debug: cluster_recv(): hdr-adler32 mismatch: %p != %p.\n",
  487. (void*)(long)clustercmd_p->h.adler32.hdr, (void*)(long)adler32.hdr);
  488. if((ret=clustercmd_reject(clustercmd_p, REJ_adler32MISMATCH)) != EADDRNOTAVAIL) {
  489. printf_e("Error: cluster_recv(): Got error while clustercmd_reject(): %s (errno: %i).\n",
  490. strerror(ret), ret);
  491. errno = ret;
  492. return -1;
  493. }
  494. }
  495. // Checking src_node_id and dst_node_id
  496. uint8_t src_node_id = clustercmd_p->h.src_node_id;
  497. uint8_t dst_node_id = clustercmd_p->h.dst_node_id;
  498. // Packet from registering node?
  499. if(src_node_id == NODEID_NOID) {
  500. // Wrong command from registering node?
  501. if(clustercmd_p->h.cmd_id != CLUSTERCMDID_GETMYID) {
  502. printf_e("Warning: cluster_recv(): Got non getmyid packet from NOID node. Ignoring the packet.\n");
  503. return 0;
  504. }
  505. if(clustercmd_p->h.serial != 0) {
  506. printf_e("Warning: cluster_recv(): Got packet with non-zero serial from NOID node. Ignoring the packet.\n");
  507. return 0;
  508. }
  509. } else
  510. // Wrong src_node_id?
  511. if(src_node_id >= MAXNODES) {
  512. printf_e("Warning: cluster_recv(): Invalid h.src_node_id: %i >= "XTOSTR(MAXNODES)"\n",
  513. src_node_id);
  514. return 0;
  515. }
  516. // Is this broadcast message?
  517. if(dst_node_id == NODEID_NOID) {
  518. // CODE HERE
  519. } else
  520. // Wrong dst_node_id?
  521. if(dst_node_id >= MAXNODES) {
  522. printf_e("Warning: cluster_recv(): Invalid h.dst_node_id: %i >= "XTOSTR(MAXNODES)"\n",
  523. dst_node_id);
  524. return 0;
  525. }
  526. // Seems, that headers are correct. Continuing.
  527. printf_ddd("Debug3: cluster_recv(): Received: {h.dst_node_id: %u, h.src_node_id: %u, cmd_id: %u,"
  528. " adler32: %u, data_len: %u}, timeout: %u -> %u\n",
  529. dst_node_id, src_node_id, clustercmd_p->h.cmd_id,
  530. clustercmd_p->h.adler32, clustercmd_p->h.data_len, *timeout_p, timeout);
  531. // Paranoid routines
  532. // The message from us? Something wrong if it is.
  533. #ifdef PARANOID
  534. if((clustercmd_p->h.src_node_id == node_id_my) && (node_id_my != NODEID_NOID)) {
  535. #ifdef VERYPARANOID
  536. printf_e("Error: cluster_recv(): clustercmd_p->h.src_node_id == node_id_my (%i != %i)."
  537. " Exit.\n", clustercmd_p->h.src_node_id, node_id_my);
  538. return EINVAL;
  539. #else
  540. printf_e("Warning: cluster_recv(): clustercmd_p->h.src_node_id == node_id_my (%i != %i)."
  541. " Ignoring the command.\n", clustercmd_p->h.src_node_id, node_id_my);
  542. clustercmd_p = NULL;
  543. return 0;
  544. #endif
  545. }
  546. #endif
  547. nodeinfo_t *nodeinfo_p = &nodeinfo[src_node_id];
  548. // Not actual packet?
  549. if(clustercmd_p->h.serial <= nodeinfo_p->last_serial) {
  550. printf_d("Debug: cluster_recv(): Ignoring packet from %i due to serial: %i <= %i\n",
  551. src_node_id, clustercmd_p->h.serial, nodeinfo_p->last_serial);
  552. return 0;
  553. }
  554. // Is this misordered packet?
  555. if(clustercmd_p->h.serial != nodeinfo_p->last_serial + 1) {
  556. clustercmd_window_add(&window_i, clustercmd_p, nodeinfo_p->serial2queuedpacket_ht);
  557. return 0;
  558. }
  559. // Is this the end of packet (packet without data)
  560. if(clustercmd_p->h.data_len == 0)
  561. CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
  562. // Too big data?
  563. if(clustercmd_p->h.data_len > CLUSTER_PACKET_MAXSIZE) {
  564. printf_e("Warning: cluster_recv(): Got too big message from node %i. Ignoring it.\n",
  565. src_node_id);
  566. return 0;
  567. }
  568. // Incorrect size of data?
  569. if(clustercmd_p->h.data_len & 0x3) {
  570. printf_e("Warning: cluster_recv(): Received packet of size not a multiple of 4. Ignoring it.\n");
  571. return 0;
  572. }
  573. // Need more space for this packet?
  574. if(CLUSTERCMD_SIZE(clustercmd_p) > size) {
  575. size = CLUSTERCMD_SIZE(clustercmd_p);
  576. clustercmd_p = (clustercmd_t *)xrealloc((char *)clustercmd_p, size);
  577. }
  578. // Reading the data
  579. if((ret=cluster_read(sock_i, (void *)clustercmd_p->data.p, clustercmd_p->h.data_len, CLREAD_CONTINUE))) {
  580. if(ret == -1) return 0;
  581. printf_e("Error: cluster_recv(): Got error from cluster_read(): %s (errno %i).\n",
  582. strerror(errno), errno);
  583. errno = ret;
  584. return -1;
  585. }
  586. // Checking adler32 of packet data.
  587. clustercmd_adler32_calc(clustercmd_p, &adler32, ADLER32_CALC_DATA);
  588. if(adler32.dat != clustercmd_p->h.adler32.dat) {
  589. printf_d("Debug: cluster_recv(): dat-adler32 mismatch: %p != %p.\n",
  590. (void*)(long)clustercmd_p->h.adler32.dat, (void*)(long)adler32.dat);
  591. if((ret=clustercmd_reject(clustercmd_p, REJ_adler32MISMATCH)) != EADDRNOTAVAIL) {
  592. printf_e("Error: cluster_recv(): Got error while clustercmd_reject(): %s (errno: %i).\n",
  593. strerror(ret), ret);
  594. errno = ret;
  595. return -1;
  596. }
  597. }
  598. CLUSTER_RECV_RETURNMESSAGE(clustercmd_p);
  599. }
  600. /**
  601. * @brief (hsyncop) Reads messages for time "_timeout" and proceeding them to recvproc_funct[] functions
  602. *
  603. * @param[in] _timeout How long to wait messages (totally)
  604. *
  605. * @retval zero Successful
  606. * @retval non-zero If got error while reading or processing messages. The error-code is placed into returned value.
  607. *
  608. */
  609. int cluster_recv_proc(unsigned int _timeout) {
  610. printf_ddd("Debug3: cluster_recv_proc(%i)\n", _timeout);
  611. clustercmd_t *clustercmd_p;
  612. int ret;
  613. unsigned int timeout = _timeout;
  614. while((ret=cluster_recv(&clustercmd_p, &timeout))) {
  615. // Exit if error
  616. if(ret == -1) {
  617. printf_e("Error: cluster_recv_proc(): Got error while cluster_recv(): %s (%i).\n",
  618. strerror(errno), errno);
  619. return errno;
  620. }
  621. // If we have appropriate callback function, then call it! :)
  622. if(recvproc_funct[clustercmd_p->h.cmd_id])
  623. if((ret=recvproc_funct[clustercmd_p->h.cmd_id](clustercmd_p))) {
  624. printf_e("Error: cluster_recv_proc(): Got error from recvproc_funct[%i]: %s (%i)\n",
  625. clustercmd_p->h.cmd_id, strerror(ret), ret);
  626. return ret;
  627. }
  628. }
  629. return 0;
  630. }
  631. /**
  632. * @brief recvproc-function for ACK-messages
  633. *
  634. * @param[in] clustercmd_p Pointer to clustercmd
  635. * @param[in] arg_p Pointer to argument
  636. *
  637. * @retval zero Successfully initialized.
  638. * @retval non-zero Got error, while initializing.
  639. *
  640. */
  641. static int cluster_recvproc_ack(clustercmd_t *clustercmd_p) {
  642. uint32_t cmd_serial_ack = clustercmd_p->data.ack.serial;
  643. clustercmdqueuedpacket_t *queuedpacket_p =
  644. (clustercmdqueuedpacket_t *)g_hash_table_lookup(nodeinfo_my->serial2queuedpacket_ht, GINT_TO_POINTER(cmd_serial_ack));
  645. if(queuedpacket_p == NULL)
  646. return 0;
  647. uint8_t node_id_from = clustercmd_p->h.src_node_id;
  648. if(! queuedpacket_p->h.w.o.ack_from[node_id_from]) {
  649. queuedpacket_p->h.w.o.ack_count++;
  650. queuedpacket_p->h.w.o.ack_from[node_id_from]++;
  651. if(queuedpacket_p->h.w.o.ack_count == node_count-1)
  652. clustercmd_window_del(&window_o, queuedpacket_p, nodeinfo_my->serial2queuedpacket_ht);
  653. }
  654. return 0;
  655. }
  656. /**
  657. * @brief Sets message processing functions for cluster_recv_proc() function for specified command type
  658. *
  659. * @param[in] cmd_id The command type
  660. * @param[in] procfunct The processing function for messages with specified cmd_id
  661. *
  662. * @retval zero Successful
  663. * @retval non-zero If got error while setting processing function. The error-code is placed into returned value.
  664. *
  665. */
  666. int cluster_io_init() {
  667. cluster_recv_proc_set(CLUSTERCMDID_ACK, cluster_recvproc_ack);
  668. return 0;
  669. }
  670. /**
  671. * @brief Antagonist of cluster_recv_proc_init() function. Freeing everything what was allocated in cluster_recv_proc_init()
  672. *
  673. * @retval zero Successfully initialized
  674. * @retval non-zero Got error, while initializing
  675. *
  676. */
  677. int cluster_io_deinit() {
  678. if(window_i.buf_size) {
  679. #ifdef PARANOID
  680. if(window_i.buf == NULL) {
  681. printf_e("Error: cluster_recv_proc_deinit(): window_i.buf_size != 0, but window_i.buf == NULL.\n");
  682. } else
  683. #endif
  684. free(window_i.buf);
  685. }
  686. if(window_o.buf_size) {
  687. #ifdef PARANOID
  688. if(window_o.buf == NULL) {
  689. printf_e("Error: cluster_recv_proc_deinit(): window_o.buf_size != 0, but window_o.buf == NULL.\n");
  690. } else
  691. #endif
  692. free(window_o.buf);
  693. }
  694. return 0;
  695. }
  696. /**
  697. * @brief recvproc-function for setid-messages
  698. *
  699. * @param[in] clustercmd_p Pointer to clustercmd
  700. * @param[in] arg_p Pointer to argument
  701. *
  702. * @retval zero Successfully initialized.
  703. * @retval non-zero Got error, while initializing.
  704. *
  705. */
  706. static int cluster_recvproc_setid(clustercmd_t *clustercmd_p) {
  707. static time_t updatets = 0;
  708. // Is this the most recent information? Skipping if not.
  709. clustercmd_setiddata_t *data_setid_p = &clustercmd_p->data.setid;
  710. if(!(data_setid_p->updatets > updatets))
  711. return 0;
  712. // Is the node name length in message equals to our node name length? Skipping if not.
  713. uint32_t recv_nodename_len;
  714. recv_nodename_len = CLUSTER_RESTDATALEN(clustercmd_p, clustercmd_setiddata_t);
  715. if(recv_nodename_len != options_p->cluster_nodename_len)
  716. return 0;
  717. // Is the node name equals to ours? Skipping if not.
  718. if(memcmp(data_setid_p->node_name, options_p->cluster_nodename, recv_nodename_len))
  719. return 0;
  720. // Remembering the node that answered us
  721. node_status_change(clustercmd_p->h.src_node_id, NODESTATUS_SEEMSONLINE);
  722. // Seems, that somebody knows our node id, remembering it.
  723. node_id_my = clustercmd_p->h.dst_node_id;
  724. updatets = data_setid_p->updatets;
  725. return 0;
  726. }
  727. extern int cluster_loop();
  728. /**
  729. * @brief Initializes cluster subsystem.
  730. *
  731. * @param[in] _options_p Pointer to "options" variable, defined in main().
  732. * @param[in] _indexes_p Pointer to "indexes" variable, defined in sync_run().
  733. *
  734. * @retval zero Successfully initialized.
  735. * @retval non-zero Got error, while initializing.
  736. *
  737. */
  738. int cluster_init(options_t *_options_p, indexes_t *_indexes_p) {
  739. int ret;
  740. // Preventing double initializing
  741. if(options_p != NULL) {
  742. printf_e("Error: cluster_init(): cluster subsystem is already initialized.\n");
  743. return EALREADY;
  744. }
  745. // Initializing global variables, pt. 1
  746. options_p = _options_p;
  747. indexes_p = _indexes_p;
  748. cluster_timeout = options_p->cluster_timeout;
  749. node_status_change(NODEID_NOID, NODESTATUS_ONLINE);
  750. // Initializing network routines
  751. // Input socket
  752. // Creating socket
  753. sock_i = socket(AF_INET, SOCK_DGRAM, 0);
  754. if(sock_i < 0) {
  755. printf_e("cluster_init(): Cannot create socket for input traffic: %s (errno: %i)\n", strerror(errno), errno);
  756. return errno;
  757. }
  758. // Enable SO_REUSEADDR to allow multiple instances of this application to receive copies
  759. // of the multicast datagrams.
  760. int reuse = 1;
  761. if(setsockopt(sock_i, SOL_SOCKET, SO_REUSEADDR,(char *)&reuse, sizeof(reuse)) < 0) {
  762. printf_e("Error: cluster_init(): Got error while setsockopt(): %s (errno: %i)\n",
  763. strerror(errno), errno);
  764. return errno;
  765. }
  766. // Binding
  767. sa_i.sin_family = AF_INET;
  768. sa_i.sin_port = htons(options_p->cluster_mcastipport);
  769. sa_i.sin_addr.s_addr = INADDR_ANY;
  770. if(bind(sock_i, (struct sockaddr*)&sa_i, sizeof(sa_i))) {
  771. printf_e("Error: cluster_init(): Got error while bind(): %s (errno: %i)\n",
  772. strerror(errno), errno);
  773. return errno;
  774. }
  775. // Joining to multicast group
  776. struct ip_mreq group;
  777. group.imr_interface.s_addr = inet_addr(options_p->cluster_iface);
  778. group.imr_multiaddr.s_addr = inet_addr(options_p->cluster_mcastipaddr);
  779. if(setsockopt(sock_i, IPPROTO_IP, IP_ADD_MEMBERSHIP,
  780. (char *)&group, sizeof(group)) < 0) {
  781. printf_e("Error: cluster_init(): Cannot setsockopt() to enter to membership %s -> %s\n",
  782. options_p->cluster_iface, options_p->cluster_mcastipaddr);
  783. return errno;
  784. }
  785. // Output socket
  786. // Creating socket
  787. sock_o = socket(AF_INET, SOCK_DGRAM, 0);
  788. if(sock_o < 0) {
  789. printf_e("cluster_init(): Cannot create socket for output traffic: %s (errno: %i)\n", strerror(errno), errno);
  790. return errno;
  791. }
  792. // Initializing the group sockaddr structure
  793. sa_o.sin_family = AF_INET;
  794. sa_o.sin_port = htons(options_p->cluster_mcastipport);
  795. sa_o.sin_addr.s_addr = inet_addr(options_p->cluster_mcastipaddr);
  796. // Disable looping back output datagrams
  797. {
  798. char loopch = 0;
  799. if(setsockopt(sock_o, IPPROTO_IP, IP_MULTICAST_LOOP, (char *)&loopch, sizeof(loopch))<0) {
  800. printf_e("Error: Cannot disable loopback for output socket: %s (errno: %i).\n", strerror(errno), errno);
  801. return errno;
  802. }
  803. }
  804. // Setting local interface for output traffic
  805. {
  806. struct in_addr addr_o;
  807. addr_o.s_addr = inet_addr(options_p->cluster_iface);
  808. if(setsockopt(sock_o, IPPROTO_IP, IP_MULTICAST_IF, &addr_o, sizeof(addr_o)) < 0) {
  809. printf_e("Error: Cannot set local interface for outbound traffic: %s (errno: %i)\n", strerror(errno), errno);
  810. return errno;
  811. }
  812. }
  813. // Initializing another routines
  814. cluster_io_init();
  815. // Getting my ID in the cluster
  816. // Trying to preserve my node_id after restart. :)
  817. // Asking another nodes about my previous node_id
  818. {
  819. clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_getmyid_t, options_p->cluster_nodename_len);
  820. clustercmd_p->h.data_len = options_p->cluster_nodename_len;
  821. memcpy(clustercmd_p->data.getmyid.node_name, options_p->cluster_nodename, clustercmd_p->h.data_len+1);
  822. clustercmd_p->h.cmd_id = CLUSTERCMDID_GETMYID;
  823. clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
  824. if((ret=cluster_send(clustercmd_p)))
  825. return ret;
  826. }
  827. // Processing answers
  828. cluster_recv_proc_set(CLUSTERCMDID_SETID, cluster_recvproc_setid);
  829. if((ret=cluster_recv_proc(cluster_timeout)))
  830. return ret;
  831. printf_ddd("Debug3: cluster_init(): After communicating with others, my node_id is %i.\n", node_id_my);
  832. // Getting free node_id if nobody said us the certain value (see above).
  833. if(node_id_my == NODEID_NOID) {
  834. int i=0;
  835. while(i<MAXNODES) {
  836. if(nodeinfo[i].status == NODESTATUS_DOESNTEXIST) {
  837. node_id_my = i;
  838. break;
  839. }
  840. i++;
  841. }
  842. printf_ddd("Debug3: cluster_init(): I was have to set my node_id to %i.\n", node_id_my);
  843. }
  844. // If there's no free id-s, then exit :(
  845. if(node_id_my == NODEID_NOID) {
  846. printf_e("Error: Cannot find free node ID. Seems, that all %i ID-s are already occupied.\n");
  847. return ENOMEM;
  848. }
  849. // Registering in the cluster
  850. // Sending registration information
  851. node_status_change(node_id_my, NODESTATUS_SEEMSONLINE);
  852. {
  853. clustercmd_t *clustercmd_p = CLUSTER_ALLOCA(clustercmd_reg_t, options_p->cluster_nodename_len);
  854. clustercmd_reg_t *data_reg_p = &clustercmd_p->data.reg;
  855. memcpy(data_reg_p->node_name, options_p->cluster_nodename, options_p->cluster_nodename_len+1);
  856. clustercmd_p->h.data_len = options_p->cluster_nodename_len+1;
  857. clustercmd_p->h.cmd_id = CLUSTERCMDID_REG;
  858. clustercmd_p->h.dst_node_id = NODEID_NOID; // broadcast
  859. if((ret=cluster_send(clustercmd_p)))
  860. return ret;
  861. }
  862. // Getting answers
  863. if((ret=cluster_recv_proc(cluster_timeout)))
  864. return ret;
  865. node_status_change(node_id_my, NODESTATUS_ONLINE);
  866. // Initializing global variables, pt. 2
  867. nodeinfo_my = &nodeinfo[node_id_my];
  868. // Running thread, that will process background communicating routines with another nodes.
  869. // The process is based on function cluster_loop() [let's use shorthand "cluster_loop()-thread"]
  870. ret = pthread_create(&pthread_cluster, NULL, (void *(*)(void *))cluster_loop, NULL);
  871. return ret;
  872. }
  873. /**
  874. * @brief (syncop) Sends signal to cluster_loop()-thread
  875. *
  876. * @param[in] signal Signal number
  877. *
  878. * @retval zero Successfully send the signal
  879. * @retval non-zero Got error, while sending the signal
  880. *
  881. */
  882. static inline int cluster_signal(int signal) {
  883. if(pthread_cluster)
  884. return pthread_kill(pthread_cluster, signal);
  885. return 0;
  886. }
  887. extern int cluster_modtime_exchange_cleanup();
  888. /**
  889. * @brief Antagonist of cluster_init() function. Kills cluster_loop()-thread and cleaning up
  890. *
  891. * @retval zero Successfully initialized
  892. * @retval non-zero Got error, while initializing
  893. *
  894. */
  895. int cluster_deinit() {
  896. int ret = 0;
  897. cluster_signal(SIGTERM);
  898. ret = pthread_join(pthread_cluster, NULL);
  899. cluster_io_deinit();
  900. node_status_change(NODEID_NOID, NODESTATUS_DOESNTEXIST);
  901. #ifdef VERYPARANOID
  902. int i=0;
  903. #endif
  904. while(node_count) {
  905. #ifdef VERYPARANOID
  906. if(i++ > MAXNODES) {
  907. printf_e("Error: cluster_deinit() looped. Forcing break.");
  908. break;
  909. }
  910. #endif
  911. node_status_change(0, NODESTATUS_DOESNTEXIST);
  912. }
  913. close(sock_i);
  914. close(sock_o);
  915. #ifdef VERYPARANOID
  916. memset(nodeinfo, 0, sizeof(nodeinfo_t) * NODES_ALLOC);
  917. nodeinfo_my = NULL;
  918. node_count = 0;
  919. node_online = 0;
  920. node_id_my = NODEID_NOID;
  921. memset(&sa_i, 0, sizeof(sa_i));
  922. memset(&sa_o, 0, sizeof(sa_o));
  923. #endif
  924. cluster_modtime_exchange_cleanup();
  925. return ret;
  926. }
  /**
   * @brief (syncop) Forces other nodes to ignore events about the file or directory
   *
   * Currently a stub: the argument is not used and nothing is sent.
   *
   * @param[in] fpath Path to the file or directory
   *
   * @retval zero Always
   *
   */
  int cluster_lock(const char *fpath) {
      (void)fpath;    // not used yet

      return 0;
  }
  /**
   * @brief (syncop) Forces other nodes to ignore events about all files and directories listed in queues of "indexes_p"
   *
   * Currently a stub: nothing is sent.
   *
   * @retval zero Always
   *
   */
  int cluster_lock_byindexes() {
      const int rc = 0;

      return rc;
  }
  /**
   * @brief (syncop) Returns events-handling on other nodes for everything locked by cluster_lock() and cluster_lock_byindexes() from this node
   *
   * Currently a stub: nothing is sent.
   *
   * @retval zero Always
   *
   */
  int cluster_unlock_all() {
      const int rc = 0;

      return rc;
  }
  /*
   * Evaluates "a" once; on a non-zero result calls sync_term() with it and
   * returns it from the enclosing function.
   * Wrapped into do { } while(0) to behave as a single statement, and the
   * temporary has its own name so it cannot shadow a caller's "ret".
   */
  #define CLUSTER_LOOP_CHECK(a) do {\
      int cluster_loop_check_ret_ = (a);\
      if(cluster_loop_check_ret_) {\
          sync_term(cluster_loop_check_ret_);\
          return cluster_loop_check_ret_;\
      }\
  } while(0)
  966. /**
  967. * @brief Processes background communicating routines with another nodes. cluster_init() function create a thread for this function.
  968. *
  969. * @retval zero Successfully initialized
  970. * @retval non-zero Got error, while initializing
  971. *
  972. */
  973. int cluster_loop() {
  974. int ret = 0;
  975. sigset_t sigset_cluster;
  976. // Ignoring SIGINT signal
  977. sigemptyset(&sigset_cluster);
  978. sigaddset(&sigset_cluster, SIGINT);
  979. CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_BLOCK, &sigset_cluster, NULL));
  980. // Don't ignoring SIGTERM signal
  981. sigemptyset(&sigset_cluster);
  982. sigaddset(&sigset_cluster, SIGTERM);
  983. CLUSTER_LOOP_CHECK(pthread_sigmask(SIG_UNBLOCK, &sigset_cluster, NULL));
  984. // Starting the loop
  985. printf_ddd("Debug3: cluster_loop() started.\n");
  986. while(1) {
  987. int _ret;
  988. // Waiting for event
  989. fd_set rfds;
  990. FD_ZERO(&rfds);
  991. FD_SET(sock_i, &rfds);
  992. printf_ddd("Debug3: cluster_loop(): select()\n");
  993. _ret = select(sock_i+1, &rfds, NULL, NULL, NULL);
  994. // Exit if error
  995. if((_ret == -1) && (errno != EINTR)) {
  996. ret = errno;
  997. sync_term(ret);
  998. break;
  999. }
  1000. // Breaking the loop, if there's SIGTERM signal for this thread
  1001. printf_ddd("Debug3: cluster_loop(): sigpending()\n");
  1002. if(sigpending(&sigset_cluster))
  1003. if(sigismember(&sigset_cluster, SIGTERM))
  1004. break;
  1005. // Processing new messages
  1006. printf_ddd("Debug3: cluster_loop(): cluster_recv_proc()\n");
  1007. if((ret=cluster_recv_proc(0))) {
  1008. sync_term(ret);
  1009. break;
  1010. }
  1011. }
  1012. printf_ddd("Debug3: cluster_loop() finished with exitcode %i.\n", ret);
  1013. return ret;
  1014. #ifdef DOXYGEN
  1015. sync_term(0);
  1016. #endif
  1017. }
  1018. /**
  1019. * @brief Updating information about modification time of a directory.
  1020. *
  1021. * @param[in] path Canonized path to updated file/dir
  1022. * @param[in] dirlevel Directory level provided by fts (man 3 fts)
  1023. * @param[in] st_mode st_mode value to detect is it directory or not (S_IFDIR or not)
  1024. *
  1025. * @retval zero Successfully initialized
  1026. * @retval non-zero Got error, while initializing
  1027. *
  1028. */
  1029. int cluster_modtime_update(const char *path, short int dirlevel, mode_t st_mode) {
  1030. // "modtime" is incorrent name-part of function. Actually it updates "change time" (man 2 lstat64).
  1031. int ret;
  1032. // Getting relative directory level (depth)
  1033. short int dirlevel_rel = dirlevel - options_p->watchdir_dirlevel;
  1034. if((st_mode & S_IFMT) == S_IFDIR)
  1035. dirlevel_rel++;
  1036. // Don't remembering information about directories with level beyond the limits
  1037. if((dirlevel_rel > options_p->cluster_scan_dl_max) || (dirlevel_rel < options_p->cluster_hash_dl_min))
  1038. return 0;
  1039. // Getting directory/file-'s information (including "change time" aka "st_ctime")
  1040. struct stat64 stat64;
  1041. ret=lstat64(path, &stat64);
  1042. if(ret) {
  1043. printf_e("Error: cluster_modtime_update() cannot lstat64() on \"%s\": %s (errno: %i)\n", path, strerror(errno), errno);
  1044. return errno;
  1045. }
  1046. // Getting absolute directory path
  1047. const char *dirpath;
  1048. if((st_mode & S_IFMT) == S_IFDIR) {
  1049. dirpath = path;
  1050. } else {
  1051. char *path_dup = strdup(path);
  1052. dirpath = (const char *)dirname(path_dup);
  1053. free(path_dup);
  1054. }
  1055. // Getting relative directory path
  1056. // Initializing
  1057. size_t dirpath_len = strlen(dirpath);
  1058. char *dirpath_rel_p = xmalloc(dirpath_len+1);
  1059. char *dirpath_rel = dirpath_rel_p;
  1060. const char *dirpath_rel_full = &dirpath[options_p->watchdirlen];
  1061. size_t dirpath_rel_full_len = dirpath_len - options_p->watchdirlen;
  1062. // Getting coodinate of the end (directory path is already canonized, so we can simply count number of slashes to get directory level)
  1063. int slashcount=0;
  1064. size_t dirpath_rel_end=0;
  1065. while(dirpath_rel_full[dirpath_rel_end] && (dirpath_rel_end < dirpath_rel_full_len)) {
  1066. if(dirpath_rel_full[dirpath_rel_end] == '/') {
  1067. slashcount++;
  1068. if(slashcount >= options_p->cluster_hash_dl_max)
  1069. break;
  1070. }
  1071. dirpath_rel_end++;
  1072. }
  1073. // Copy the required part of path to dirpath_rel
  1074. memcpy(dirpath_rel, dirpath_rel_full, dirpath_rel_end);
  1075. // Updating "st_ctime" information. We should check current value for this directory and update it only if it less or not set.
  1076. // Checking current value
  1077. char toupdate = 0;
  1078. gpointer ctime_gp = g_hash_table_lookup(nodeinfo_my->modtime_ht, dirpath_rel);
  1079. if(ctime_gp == NULL)
  1080. toupdate++;
  1081. else if(GPOINTER_TO_INT(ctime_gp) < stat64.st_ctime)
  1082. toupdate++;
  1083. // g_hash_table_replace() will replace existent information about the directory or create it if it doesn't exist.
  1084. if(toupdate)
  1085. g_hash_table_replace(nodeinfo_my->modtime_ht, strdup(dirpath_rel), GINT_TO_POINTER(stat64.st_ctime));
  1086. // Why I'm using "st_ctime" instead of "st_mtime"? Because "st_ctime" also updates on updating inode information.
  1087. return 0;
  1088. }
  1089. /**
  1090. * @brief Puts entry to list to be send to other nodes. To be called from cluster_modtime_exchange()
  1091. *
  1092. * @param[in] pushrentry_arg_p Pointer to pushentry_arg structure
  1093. *
  1094. */
  1095. void cluster_modtime_exchange_pushentry(gpointer dir_gp, gpointer modtype_gp, void *pushentry_arg_gp) {
  1096. struct pushdoubleentry_arg *pushentry_arg_p = (struct pushdoubleentry_arg *)pushentry_arg_gp;
  1097. char *dir = (char *)dir_gp;
  1098. time_t ctime = (time_t)GPOINTER_TO_INT(modtype_gp);
  1099. size_t size = strlen(dir)+1; // TODO: strlen should be already prepared
  1100. // but not re-calculated here
  1101. if(pushentry_arg_p->allocated <= pushentry_arg_p->total) {
  1102. pushentry_arg_p->allocated += ALLOC_PORTION;
  1103. pushentry_arg_p->entry = (struct doubleentry *)
  1104. xrealloc(
  1105. (char *)pushentry_arg_p->entry,
  1106. pushentry_arg_p->allocated * sizeof(*pushentry_arg_p->entry)
  1107. );
  1108. }
  1109. pushentry_arg_p->entry[pushentry_arg_p->total].dat0 = dir;
  1110. pushentry_arg_p->entry[pushentry_arg_p->total].size0 = size;
  1111. pushentry_arg_p->entry[pushentry_arg_p->total].dat1 = (void *)ctime; // Will be problems if sizeof(time_t) > sizeof(void *)
  1112. pushentry_arg_p->entry[pushentry_arg_p->total].size1 = sizeof(ctime);
  1113. pushentry_arg_p->size += size;
  1114. pushentry_arg_p->total++;
  1115. return;
  1116. }
  1117. static struct pushdoubleentry_arg cluster_modtime_exchange_pushentry_arg = {0};
  1118. /**
  1119. * @brief Clean up after the last run of cluster_modtime_exchange.
  1120. *
  1121. * @retval zero Successfully initialized
  1122. * @retval non-zero Got error, while initializing
  1123. *
  1124. */
  1125. int cluster_modtime_exchange_cleanup() {
  1126. struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;
  1127. int i=0;
  1128. while(i < pushentry_arg_p->allocated) {
  1129. if(pushentry_arg_p->entry[i].alloc0)
  1130. free(pushentry_arg_p->entry[i].dat0);
  1131. if(pushentry_arg_p->entry[i].alloc1)
  1132. free(pushentry_arg_p->entry[i].dat1);
  1133. i++;
  1134. }
  1135. free(pushentry_arg_p->entry);
  1136. #ifdef VERYPARANOID
  1137. memset(pushentry_arg_p, 0, sizeof(*pushentry_arg_p));
  1138. #endif
  1139. return 0;
  1140. }
  1141. /**
  1142. * @brief Exchanging with "modtime_ht"-s to be able to compare them.
  1143. *
  1144. * @retval zero Successfully initialized
  1145. * @retval non-zero Got error, while initializing
  1146. *
  1147. */
  1148. int cluster_modtime_exchange() {
  1149. struct pushdoubleentry_arg *pushentry_arg_p = &cluster_modtime_exchange_pushentry_arg;
  1150. // Getting hash table entries
  1151. pushentry_arg_p->size=0;
  1152. pushentry_arg_p->total=0;
  1153. g_hash_table_foreach(nodeinfo_my->modtime_ht, cluster_modtime_exchange_pushentry, (void *)pushentry_arg_p);
  1154. if(!pushentry_arg_p->total) {
  1155. // !!!
  1156. }
  1157. // Calculating required RAM to compile clustercmd
  1158. size_t toalloc = 0;
  1159. int i = 0;
  1160. while(i < pushentry_arg_p->total) {
  1161. toalloc += 4; // for size header
  1162. toalloc += pushentry_arg_p->entry[i].size0; // for path
  1163. toalloc += pushentry_arg_p->entry[i].size1; // for ctime
  1164. }
  1165. // Allocating space for the clustercmd
  1166. clustercmd_t *clustercmd_p = (clustercmd_t *)xmalloc(sizeof(clustercmdhdr_t) + toalloc);
  1167. memset(clustercmd_p, 0, sizeof(clustercmdhdr_t));
  1168. // Setting up clustercmd
  1169. clustercmd_p->h.dst_node_id = NODEID_NOID;
  1170. clustercmd_p->h.cmd_id = CLUSTERCMDID_HT_EXCH;
  1171. clustercmd_p->h.data_len = toalloc;
  1172. // Filing clustercmd with hash-table entriyes
  1173. i = 0;
  1174. clustercmd_ht_exch_t *clustercmd_ht_exch_p = &clustercmd_p->data.ht_exch;
  1175. while(i < pushentry_arg_p->total) {
  1176. // Setting the data
  1177. clustercmd_ht_exch_p->ctime = (time_t)pushentry_arg_p->entry[i].dat1;
  1178. clustercmd_ht_exch_p->path_length = (time_t)pushentry_arg_p->entry[i].size0;
  1179. memcpy(
  1180. clustercmd_ht_exch_p->path,
  1181. pushentry_arg_p->entry[i].dat0,
  1182. clustercmd_ht_exch_p->path_length
  1183. );
  1184. // Pointing to space for next entry:
  1185. size_t offset = sizeof(clustercmd_ht_exch_t)-1+pushentry_arg_p->entry[i].size0;
  1186. clustercmd_ht_exch_p = (clustercmd_ht_exch_t *)
  1187. (&((char *) clustercmd_ht_exch_p)[offset] );
  1188. }
  1189. // Sending
  1190. cluster_send(clustercmd_p);
  1191. // Cleanup
  1192. free(clustercmd_p);
  1193. return 0;
  1194. }
  /**
   * @brief (syncop) Syncing file tree with another nodes with using of directories' modification time as a recent-detector.
   *
   * Currently this only runs cluster_modtime_exchange() and passes its result to the caller.
   *
   * @retval zero     Successful
   * @retval non-zero Error code returned by cluster_modtime_exchange()
   *
   */
  int cluster_initialsync() {
      return cluster_modtime_exchange();
  }
  /**
   * @brief (syncop) "Captures" right to update the file or directory to another nodes. It is meant to remove events about the file or directory from another nodes
   *
   * Currently a stub: the argument is not used and nothing is sent.
   *
   * @param[in] path Path to the file or directory
   *
   * @retval zero Always
   *
   */
  int cluster_capture(const char *path) {
      (void)path;     // not used yet

      return 0;
  }
  1220. #endif