watcher.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. /*
  2. * Copyright (C) 2016 Tobias Brunner
  3. * HSR Hochschule fuer Technik Rapperswil
  4. *
  5. * Copyright (C) 2013 Martin Willi
  6. * Copyright (C) 2013 revosec AG
  7. *
  8. * This program is free software; you can redistribute it and/or modify it
  9. * under the terms of the GNU General Public License as published by the
  10. * Free Software Foundation; either version 2 of the License, or (at your
  11. * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
  12. *
  13. * This program is distributed in the hope that it will be useful, but
  14. * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
  15. * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  16. * for more details.
  17. */
  18. #include "watcher.h"
  19. #include <library.h>
  20. #include <threading/thread.h>
  21. #include <threading/mutex.h>
  22. #include <threading/condvar.h>
  23. #include <collections/linked_list.h>
  24. #include <processing/jobs/callback_job.h>
  25. #include <unistd.h>
  26. #include <errno.h>
  27. #include <fcntl.h>
  28. typedef struct private_watcher_t private_watcher_t;
  29. typedef struct entry_t entry_t;
  30. /**
  31. * Private data of an watcher_t object.
  32. */
  33. struct private_watcher_t {
  34. /**
  35. * Public watcher_t interface.
  36. */
  37. watcher_t public;
  38. /**
  39. * List of registered FDs
  40. */
  41. entry_t *fds;
  42. /**
  43. * Last registered FD
  44. */
  45. entry_t *last;
  46. /**
  47. * Number of registered FDs
  48. */
  49. u_int count;
  50. /**
  51. * Pending update of FD list?
  52. */
  53. bool pending;
  54. /**
  55. * Running state of watcher
  56. */
  57. watcher_state_t state;
  58. /**
  59. * Lock to access FD list
  60. */
  61. mutex_t *mutex;
  62. /**
  63. * Condvar to signal completion of callback
  64. */
  65. condvar_t *condvar;
  66. /**
  67. * Notification pipe to signal watcher thread
  68. */
  69. int notify[2];
  70. /**
  71. * List of callback jobs to process by watcher thread, as job_t
  72. */
  73. linked_list_t *jobs;
  74. };
  75. /**
  76. * Entry for a registered file descriptor
  77. */
  78. struct entry_t {
  79. /** file descriptor */
  80. int fd;
  81. /** events to watch */
  82. watcher_event_t events;
  83. /** registered callback function */
  84. watcher_cb_t cb;
  85. /** user data to pass to callback */
  86. void *data;
  87. /** callback(s) currently active? */
  88. int in_callback;
  89. /** next registered fd */
  90. entry_t *next;
  91. };
  92. /**
  93. * Adds the given entry at the end of the list
  94. */
  95. static void add_entry(private_watcher_t *this, entry_t *entry)
  96. {
  97. if (this->last)
  98. {
  99. this->last->next = entry;
  100. this->last = entry;
  101. }
  102. else
  103. {
  104. this->fds = this->last = entry;
  105. }
  106. this->count++;
  107. }
  108. /**
  109. * Removes and frees the given entry
  110. *
  111. * Updates the previous entry and returns the next entry in the list, if any.
  112. */
  113. static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
  114. entry_t *prev)
  115. {
  116. entry_t *next = entry->next;
  117. if (prev)
  118. {
  119. prev->next = next;
  120. }
  121. else
  122. {
  123. this->fds = next;
  124. }
  125. if (this->last == entry)
  126. {
  127. this->last = prev;
  128. }
  129. this->count--;
  130. free(entry);
  131. return next;
  132. }
  133. /**
  134. * Data we pass on for an async notification
  135. */
  136. typedef struct {
  137. /** file descriptor */
  138. int fd;
  139. /** event type */
  140. watcher_event_t event;
  141. /** registered callback function */
  142. watcher_cb_t cb;
  143. /** user data to pass to callback */
  144. void *data;
  145. /** keep registered? */
  146. bool keep;
  147. /** reference to watcher */
  148. private_watcher_t *this;
  149. } notify_data_t;
  150. /**
  151. * Notify watcher thread about changes
  152. */
  153. static void update(private_watcher_t *this)
  154. {
  155. char buf[1] = { 'u' };
  156. this->pending = TRUE;
  157. if (this->notify[1] != -1)
  158. {
  159. if (write(this->notify[1], buf, sizeof(buf)) == -1)
  160. {
  161. DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
  162. }
  163. }
  164. }
  165. /**
  166. * Cleanup function if callback gets cancelled
  167. */
  168. static void unregister(notify_data_t *data)
  169. {
  170. /* if a thread processing a callback gets cancelled, we mark the entry
  171. * as cancelled, like the callback would return FALSE. This is required
  172. * to not queue this watcher again if all threads have been gone. */
  173. data->keep = FALSE;
  174. }
  175. /**
  176. * Execute callback of registered FD, asynchronous
  177. */
  178. static job_requeue_t notify_async(notify_data_t *data)
  179. {
  180. thread_cleanup_push((void*)unregister, data);
  181. data->keep = data->cb(data->data, data->fd, data->event);
  182. thread_cleanup_pop(FALSE);
  183. return JOB_REQUEUE_NONE;
  184. }
  185. /**
  186. * Clean up notification data, reactivate FD
  187. */
  188. static void notify_end(notify_data_t *data)
  189. {
  190. private_watcher_t *this = data->this;
  191. entry_t *entry, *prev = NULL;
  192. /* reactivate the disabled entry */
  193. this->mutex->lock(this->mutex);
  194. for (entry = this->fds; entry; prev = entry, entry = entry->next)
  195. {
  196. if (entry->fd == data->fd)
  197. {
  198. if (!data->keep)
  199. {
  200. entry->events &= ~data->event;
  201. if (!entry->events)
  202. {
  203. remove_entry(this, entry, prev);
  204. break;
  205. }
  206. }
  207. entry->in_callback--;
  208. break;
  209. }
  210. }
  211. update(this);
  212. this->condvar->broadcast(this->condvar);
  213. this->mutex->unlock(this->mutex);
  214. free(data);
  215. }
  216. /**
  217. * Execute the callback for a registered FD
  218. */
  219. static void notify(private_watcher_t *this, entry_t *entry,
  220. watcher_event_t event)
  221. {
  222. notify_data_t *data;
  223. /* get a copy of entry for async job, but with specific event */
  224. INIT(data,
  225. .fd = entry->fd,
  226. .event = event,
  227. .cb = entry->cb,
  228. .data = entry->data,
  229. .keep = TRUE,
  230. .this = this,
  231. );
  232. /* deactivate entry, so we can select() other FDs even if the async
  233. * processing did not handle the event yet */
  234. entry->in_callback++;
  235. this->jobs->insert_last(this->jobs,
  236. callback_job_create_with_prio((void*)notify_async, data,
  237. (void*)notify_end, (callback_job_cancel_t)return_false,
  238. JOB_PRIO_CRITICAL));
  239. }
  240. /**
  241. * Thread cancellation function for watcher thread
  242. */
  243. static void activate_all(private_watcher_t *this)
  244. {
  245. entry_t *entry;
  246. /* When the watcher thread gets cancelled, we have to reactivate any entry
  247. * and signal threads in remove() to go on. */
  248. this->mutex->lock(this->mutex);
  249. for (entry = this->fds; entry; entry = entry->next)
  250. {
  251. entry->in_callback = 0;
  252. }
  253. this->state = WATCHER_STOPPED;
  254. this->condvar->broadcast(this->condvar);
  255. this->mutex->unlock(this->mutex);
  256. }
  257. /**
  258. * Find flagged revents in a pollfd set by fd
  259. */
  260. static inline int find_revents(struct pollfd *pfd, int count, int fd)
  261. {
  262. int i;
  263. for (i = 0; i < count; i++)
  264. {
  265. if (pfd[i].fd == fd)
  266. {
  267. return pfd[i].revents;
  268. }
  269. }
  270. return 0;
  271. }
  272. /**
  273. * Check if entry is waiting for a specific event, and if it got signaled
  274. */
  275. static inline bool entry_ready(entry_t *entry, watcher_event_t event,
  276. int revents)
  277. {
  278. if (entry->events & event)
  279. {
  280. switch (event)
  281. {
  282. case WATCHER_READ:
  283. return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
  284. case WATCHER_WRITE:
  285. return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
  286. case WATCHER_EXCEPT:
  287. return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
  288. }
  289. }
  290. return FALSE;
  291. }
  292. /**
  293. * Dispatching function
  294. */
  295. static job_requeue_t watch(private_watcher_t *this)
  296. {
  297. entry_t *entry;
  298. struct pollfd *pfd;
  299. int count = 0, res;
  300. bool rebuild = FALSE;
  301. this->mutex->lock(this->mutex);
  302. count = this->count;
  303. if (!count)
  304. {
  305. this->state = WATCHER_STOPPED;
  306. this->mutex->unlock(this->mutex);
  307. return JOB_REQUEUE_NONE;
  308. }
  309. if (this->state == WATCHER_QUEUED)
  310. {
  311. this->state = WATCHER_RUNNING;
  312. }
  313. pfd = alloca(sizeof(*pfd) * (count + 1));
  314. pfd[0].fd = this->notify[0];
  315. pfd[0].events = POLLIN;
  316. count = 1;
  317. for (entry = this->fds; entry; entry = entry->next)
  318. {
  319. if (!entry->in_callback)
  320. {
  321. pfd[count].fd = entry->fd;
  322. pfd[count].events = 0;
  323. if (entry->events & WATCHER_READ)
  324. {
  325. DBG3(DBG_JOB, " watching %d for reading", entry->fd);
  326. pfd[count].events |= POLLIN;
  327. }
  328. if (entry->events & WATCHER_WRITE)
  329. {
  330. DBG3(DBG_JOB, " watching %d for writing", entry->fd);
  331. pfd[count].events |= POLLOUT;
  332. }
  333. if (entry->events & WATCHER_EXCEPT)
  334. {
  335. DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
  336. pfd[count].events |= POLLERR;
  337. }
  338. count++;
  339. }
  340. }
  341. this->mutex->unlock(this->mutex);
  342. while (!rebuild)
  343. {
  344. int revents;
  345. char buf[1];
  346. bool old;
  347. ssize_t len;
  348. job_t *job;
  349. DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
  350. thread_cleanup_push((void*)activate_all, this);
  351. old = thread_cancelability(TRUE);
  352. res = poll(pfd, count, -1);
  353. if (res == -1 && errno == EINTR)
  354. {
  355. /* LinuxThreads interrupts poll(), but does not make it a
  356. * cancellation point. Manually test if we got cancelled. */
  357. thread_cancellation_point();
  358. }
  359. thread_cancelability(old);
  360. thread_cleanup_pop(FALSE);
  361. if (res > 0)
  362. {
  363. if (pfd[0].revents & POLLIN)
  364. {
  365. while (TRUE)
  366. {
  367. len = read(this->notify[0], buf, sizeof(buf));
  368. if (len == -1)
  369. {
  370. if (errno != EAGAIN && errno != EWOULDBLOCK)
  371. {
  372. DBG1(DBG_JOB, "reading watcher notify failed: %s",
  373. strerror(errno));
  374. }
  375. break;
  376. }
  377. }
  378. this->pending = FALSE;
  379. DBG2(DBG_JOB, "watcher got notification, rebuilding");
  380. return JOB_REQUEUE_DIRECT;
  381. }
  382. this->mutex->lock(this->mutex);
  383. for (entry = this->fds; entry; entry = entry->next)
  384. {
  385. if (entry->in_callback)
  386. {
  387. rebuild = TRUE;
  388. break;
  389. }
  390. revents = find_revents(pfd, count, entry->fd);
  391. if (entry_ready(entry, WATCHER_EXCEPT, revents))
  392. {
  393. DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
  394. notify(this, entry, WATCHER_EXCEPT);
  395. }
  396. else
  397. {
  398. if (entry_ready(entry, WATCHER_READ, revents))
  399. {
  400. DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
  401. notify(this, entry, WATCHER_READ);
  402. }
  403. if (entry_ready(entry, WATCHER_WRITE, revents))
  404. {
  405. DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
  406. notify(this, entry, WATCHER_WRITE);
  407. }
  408. }
  409. }
  410. this->mutex->unlock(this->mutex);
  411. if (this->jobs->get_count(this->jobs))
  412. {
  413. while (this->jobs->remove_first(this->jobs,
  414. (void**)&job) == SUCCESS)
  415. {
  416. lib->processor->execute_job(lib->processor, job);
  417. }
  418. /* we temporarily disable a notified FD, rebuild FDSET */
  419. return JOB_REQUEUE_DIRECT;
  420. }
  421. }
  422. else
  423. {
  424. if (!this->pending && errno != EINTR)
  425. { /* complain only if no pending updates */
  426. DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
  427. }
  428. return JOB_REQUEUE_DIRECT;
  429. }
  430. }
  431. return JOB_REQUEUE_DIRECT;
  432. }
  433. METHOD(watcher_t, add, void,
  434. private_watcher_t *this, int fd, watcher_event_t events,
  435. watcher_cb_t cb, void *data)
  436. {
  437. entry_t *entry;
  438. INIT(entry,
  439. .fd = fd,
  440. .events = events,
  441. .cb = cb,
  442. .data = data,
  443. );
  444. this->mutex->lock(this->mutex);
  445. add_entry(this, entry);
  446. if (this->state == WATCHER_STOPPED)
  447. {
  448. this->state = WATCHER_QUEUED;
  449. lib->processor->queue_job(lib->processor,
  450. (job_t*)callback_job_create_with_prio((void*)watch, this,
  451. NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
  452. }
  453. else
  454. {
  455. update(this);
  456. }
  457. this->mutex->unlock(this->mutex);
  458. }
  459. METHOD(watcher_t, remove_, void,
  460. private_watcher_t *this, int fd)
  461. {
  462. entry_t *entry, *prev = NULL;
  463. bool found = FALSE;
  464. this->mutex->lock(this->mutex);
  465. while (TRUE)
  466. {
  467. bool is_in_callback = FALSE;
  468. entry = this->fds;
  469. while (entry)
  470. {
  471. if (entry->fd == fd)
  472. {
  473. if (this->state != WATCHER_STOPPED && entry->in_callback)
  474. {
  475. is_in_callback = TRUE;
  476. break;
  477. }
  478. entry = remove_entry(this, entry, prev);
  479. found = TRUE;
  480. continue;
  481. }
  482. prev = entry;
  483. entry = entry->next;
  484. }
  485. if (!is_in_callback)
  486. {
  487. break;
  488. }
  489. this->condvar->wait(this->condvar, this->mutex);
  490. }
  491. if (found)
  492. {
  493. update(this);
  494. }
  495. this->mutex->unlock(this->mutex);
  496. }
  497. METHOD(watcher_t, get_state, watcher_state_t,
  498. private_watcher_t *this)
  499. {
  500. watcher_state_t state;
  501. this->mutex->lock(this->mutex);
  502. state = this->state;
  503. this->mutex->unlock(this->mutex);
  504. return state;
  505. }
  506. METHOD(watcher_t, destroy, void,
  507. private_watcher_t *this)
  508. {
  509. this->mutex->destroy(this->mutex);
  510. this->condvar->destroy(this->condvar);
  511. if (this->notify[0] != -1)
  512. {
  513. close(this->notify[0]);
  514. }
  515. if (this->notify[1] != -1)
  516. {
  517. close(this->notify[1]);
  518. }
  519. this->jobs->destroy(this->jobs);
  520. free(this);
  521. }
  522. #ifdef WIN32
  523. /**
  524. * Create notify pipe with a TCP socketpair
  525. */
  526. static bool create_notify(private_watcher_t *this)
  527. {
  528. u_long on = 1;
  529. if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
  530. {
  531. /* use non-blocking I/O on read-end of notify pipe */
  532. if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
  533. {
  534. return TRUE;
  535. }
  536. DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
  537. "failed: %s", strerror(errno));
  538. }
  539. return FALSE;
  540. }
  541. #else /* !WIN32 */
  542. /**
  543. * Create a notify pipe with a one-directional pipe
  544. */
  545. static bool create_notify(private_watcher_t *this)
  546. {
  547. int flags;
  548. if (pipe(this->notify) == 0)
  549. {
  550. /* use non-blocking I/O on read-end of notify pipe */
  551. flags = fcntl(this->notify[0], F_GETFL);
  552. if (flags != -1 &&
  553. fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
  554. {
  555. return TRUE;
  556. }
  557. DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
  558. "failed: %s", strerror(errno));
  559. }
  560. return FALSE;
  561. }
  562. #endif /* !WIN32 */
  563. /**
  564. * See header
  565. */
  566. watcher_t *watcher_create()
  567. {
  568. private_watcher_t *this;
  569. INIT(this,
  570. .public = {
  571. .add = _add,
  572. .remove = _remove_,
  573. .get_state = _get_state,
  574. .destroy = _destroy,
  575. },
  576. .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
  577. .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
  578. .jobs = linked_list_create(),
  579. .notify = {-1, -1},
  580. .state = WATCHER_STOPPED,
  581. );
  582. if (!create_notify(this))
  583. {
  584. DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
  585. strerror(errno));
  586. }
  587. return &this->public;
  588. }