123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657 |
- /*
- * Copyright (C) 2016 Tobias Brunner
- * HSR Hochschule fuer Technik Rapperswil
- *
- * Copyright (C) 2013 Martin Willi
- * Copyright (C) 2013 revosec AG
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * for more details.
- */
- #include "watcher.h"
- #include <library.h>
- #include <threading/thread.h>
- #include <threading/mutex.h>
- #include <threading/condvar.h>
- #include <collections/linked_list.h>
- #include <processing/jobs/callback_job.h>
- #include <unistd.h>
- #include <errno.h>
- #include <fcntl.h>
- typedef struct private_watcher_t private_watcher_t;
- typedef struct entry_t entry_t;
- /**
- * Private data of an watcher_t object.
- */
- struct private_watcher_t {
- /**
- * Public watcher_t interface.
- */
- watcher_t public;
- /**
- * List of registered FDs
- */
- entry_t *fds;
- /**
- * Last registered FD
- */
- entry_t *last;
- /**
- * Number of registered FDs
- */
- u_int count;
- /**
- * Pending update of FD list?
- */
- bool pending;
- /**
- * Running state of watcher
- */
- watcher_state_t state;
- /**
- * Lock to access FD list
- */
- mutex_t *mutex;
- /**
- * Condvar to signal completion of callback
- */
- condvar_t *condvar;
- /**
- * Notification pipe to signal watcher thread
- */
- int notify[2];
- /**
- * List of callback jobs to process by watcher thread, as job_t
- */
- linked_list_t *jobs;
- };
- /**
- * Entry for a registered file descriptor
- */
- struct entry_t {
- /** file descriptor */
- int fd;
- /** events to watch */
- watcher_event_t events;
- /** registered callback function */
- watcher_cb_t cb;
- /** user data to pass to callback */
- void *data;
- /** callback(s) currently active? */
- int in_callback;
- /** next registered fd */
- entry_t *next;
- };
- /**
- * Adds the given entry at the end of the list
- */
- static void add_entry(private_watcher_t *this, entry_t *entry)
- {
- if (this->last)
- {
- this->last->next = entry;
- this->last = entry;
- }
- else
- {
- this->fds = this->last = entry;
- }
- this->count++;
- }
- /**
- * Removes and frees the given entry
- *
- * Updates the previous entry and returns the next entry in the list, if any.
- */
- static entry_t *remove_entry(private_watcher_t *this, entry_t *entry,
- entry_t *prev)
- {
- entry_t *next = entry->next;
- if (prev)
- {
- prev->next = next;
- }
- else
- {
- this->fds = next;
- }
- if (this->last == entry)
- {
- this->last = prev;
- }
- this->count--;
- free(entry);
- return next;
- }
- /**
- * Data we pass on for an async notification
- */
- typedef struct {
- /** file descriptor */
- int fd;
- /** event type */
- watcher_event_t event;
- /** registered callback function */
- watcher_cb_t cb;
- /** user data to pass to callback */
- void *data;
- /** keep registered? */
- bool keep;
- /** reference to watcher */
- private_watcher_t *this;
- } notify_data_t;
- /**
- * Notify watcher thread about changes
- */
- static void update(private_watcher_t *this)
- {
- char buf[1] = { 'u' };
- this->pending = TRUE;
- if (this->notify[1] != -1)
- {
- if (write(this->notify[1], buf, sizeof(buf)) == -1)
- {
- DBG1(DBG_JOB, "notifying watcher failed: %s", strerror(errno));
- }
- }
- }
- /**
- * Cleanup function if callback gets cancelled
- */
- static void unregister(notify_data_t *data)
- {
- /* if a thread processing a callback gets cancelled, we mark the entry
- * as cancelled, like the callback would return FALSE. This is required
- * to not queue this watcher again if all threads have been gone. */
- data->keep = FALSE;
- }
- /**
- * Execute callback of registered FD, asynchronous
- */
- static job_requeue_t notify_async(notify_data_t *data)
- {
- thread_cleanup_push((void*)unregister, data);
- data->keep = data->cb(data->data, data->fd, data->event);
- thread_cleanup_pop(FALSE);
- return JOB_REQUEUE_NONE;
- }
- /**
- * Clean up notification data, reactivate FD
- */
- static void notify_end(notify_data_t *data)
- {
- private_watcher_t *this = data->this;
- entry_t *entry, *prev = NULL;
- /* reactivate the disabled entry */
- this->mutex->lock(this->mutex);
- for (entry = this->fds; entry; prev = entry, entry = entry->next)
- {
- if (entry->fd == data->fd)
- {
- if (!data->keep)
- {
- entry->events &= ~data->event;
- if (!entry->events)
- {
- remove_entry(this, entry, prev);
- break;
- }
- }
- entry->in_callback--;
- break;
- }
- }
- update(this);
- this->condvar->broadcast(this->condvar);
- this->mutex->unlock(this->mutex);
- free(data);
- }
- /**
- * Execute the callback for a registered FD
- */
- static void notify(private_watcher_t *this, entry_t *entry,
- watcher_event_t event)
- {
- notify_data_t *data;
- /* get a copy of entry for async job, but with specific event */
- INIT(data,
- .fd = entry->fd,
- .event = event,
- .cb = entry->cb,
- .data = entry->data,
- .keep = TRUE,
- .this = this,
- );
- /* deactivate entry, so we can select() other FDs even if the async
- * processing did not handle the event yet */
- entry->in_callback++;
- this->jobs->insert_last(this->jobs,
- callback_job_create_with_prio((void*)notify_async, data,
- (void*)notify_end, (callback_job_cancel_t)return_false,
- JOB_PRIO_CRITICAL));
- }
- /**
- * Thread cancellation function for watcher thread
- */
- static void activate_all(private_watcher_t *this)
- {
- entry_t *entry;
- /* When the watcher thread gets cancelled, we have to reactivate any entry
- * and signal threads in remove() to go on. */
- this->mutex->lock(this->mutex);
- for (entry = this->fds; entry; entry = entry->next)
- {
- entry->in_callback = 0;
- }
- this->state = WATCHER_STOPPED;
- this->condvar->broadcast(this->condvar);
- this->mutex->unlock(this->mutex);
- }
/**
 * Look up the revents reported for a file descriptor in a pollfd set.
 *
 * Returns the revents of the first slot whose fd matches, or 0 if the set
 * does not contain the descriptor.
 */
static inline int find_revents(struct pollfd *pfd, int count, int fd)
{
	struct pollfd *slot = pfd;
	int remaining = count;

	while (remaining-- > 0)
	{
		if (slot->fd == fd)
		{
			return slot->revents;
		}
		slot++;
	}
	return 0;
}
- /**
- * Check if entry is waiting for a specific event, and if it got signaled
- */
- static inline bool entry_ready(entry_t *entry, watcher_event_t event,
- int revents)
- {
- if (entry->events & event)
- {
- switch (event)
- {
- case WATCHER_READ:
- return (revents & (POLLIN | POLLHUP | POLLNVAL)) != 0;
- case WATCHER_WRITE:
- return (revents & (POLLOUT | POLLHUP | POLLNVAL)) != 0;
- case WATCHER_EXCEPT:
- return (revents & (POLLERR | POLLHUP | POLLNVAL)) != 0;
- }
- }
- return FALSE;
- }
- /**
- * Dispatching function
- */
- static job_requeue_t watch(private_watcher_t *this)
- {
- entry_t *entry;
- struct pollfd *pfd;
- int count = 0, res;
- bool rebuild = FALSE;
- this->mutex->lock(this->mutex);
- count = this->count;
- if (!count)
- {
- this->state = WATCHER_STOPPED;
- this->mutex->unlock(this->mutex);
- return JOB_REQUEUE_NONE;
- }
- if (this->state == WATCHER_QUEUED)
- {
- this->state = WATCHER_RUNNING;
- }
- pfd = alloca(sizeof(*pfd) * (count + 1));
- pfd[0].fd = this->notify[0];
- pfd[0].events = POLLIN;
- count = 1;
- for (entry = this->fds; entry; entry = entry->next)
- {
- if (!entry->in_callback)
- {
- pfd[count].fd = entry->fd;
- pfd[count].events = 0;
- if (entry->events & WATCHER_READ)
- {
- DBG3(DBG_JOB, " watching %d for reading", entry->fd);
- pfd[count].events |= POLLIN;
- }
- if (entry->events & WATCHER_WRITE)
- {
- DBG3(DBG_JOB, " watching %d for writing", entry->fd);
- pfd[count].events |= POLLOUT;
- }
- if (entry->events & WATCHER_EXCEPT)
- {
- DBG3(DBG_JOB, " watching %d for exceptions", entry->fd);
- pfd[count].events |= POLLERR;
- }
- count++;
- }
- }
- this->mutex->unlock(this->mutex);
- while (!rebuild)
- {
- int revents;
- char buf[1];
- bool old;
- ssize_t len;
- job_t *job;
- DBG2(DBG_JOB, "watcher going to poll() %d fds", count);
- thread_cleanup_push((void*)activate_all, this);
- old = thread_cancelability(TRUE);
- res = poll(pfd, count, -1);
- if (res == -1 && errno == EINTR)
- {
- /* LinuxThreads interrupts poll(), but does not make it a
- * cancellation point. Manually test if we got cancelled. */
- thread_cancellation_point();
- }
- thread_cancelability(old);
- thread_cleanup_pop(FALSE);
- if (res > 0)
- {
- if (pfd[0].revents & POLLIN)
- {
- while (TRUE)
- {
- len = read(this->notify[0], buf, sizeof(buf));
- if (len == -1)
- {
- if (errno != EAGAIN && errno != EWOULDBLOCK)
- {
- DBG1(DBG_JOB, "reading watcher notify failed: %s",
- strerror(errno));
- }
- break;
- }
- }
- this->pending = FALSE;
- DBG2(DBG_JOB, "watcher got notification, rebuilding");
- return JOB_REQUEUE_DIRECT;
- }
- this->mutex->lock(this->mutex);
- for (entry = this->fds; entry; entry = entry->next)
- {
- if (entry->in_callback)
- {
- rebuild = TRUE;
- break;
- }
- revents = find_revents(pfd, count, entry->fd);
- if (entry_ready(entry, WATCHER_EXCEPT, revents))
- {
- DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
- notify(this, entry, WATCHER_EXCEPT);
- }
- else
- {
- if (entry_ready(entry, WATCHER_READ, revents))
- {
- DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
- notify(this, entry, WATCHER_READ);
- }
- if (entry_ready(entry, WATCHER_WRITE, revents))
- {
- DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
- notify(this, entry, WATCHER_WRITE);
- }
- }
- }
- this->mutex->unlock(this->mutex);
- if (this->jobs->get_count(this->jobs))
- {
- while (this->jobs->remove_first(this->jobs,
- (void**)&job) == SUCCESS)
- {
- lib->processor->execute_job(lib->processor, job);
- }
- /* we temporarily disable a notified FD, rebuild FDSET */
- return JOB_REQUEUE_DIRECT;
- }
- }
- else
- {
- if (!this->pending && errno != EINTR)
- { /* complain only if no pending updates */
- DBG1(DBG_JOB, "watcher poll() error: %s", strerror(errno));
- }
- return JOB_REQUEUE_DIRECT;
- }
- }
- return JOB_REQUEUE_DIRECT;
- }
- METHOD(watcher_t, add, void,
- private_watcher_t *this, int fd, watcher_event_t events,
- watcher_cb_t cb, void *data)
- {
- entry_t *entry;
- INIT(entry,
- .fd = fd,
- .events = events,
- .cb = cb,
- .data = data,
- );
- this->mutex->lock(this->mutex);
- add_entry(this, entry);
- if (this->state == WATCHER_STOPPED)
- {
- this->state = WATCHER_QUEUED;
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((void*)watch, this,
- NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
- }
- else
- {
- update(this);
- }
- this->mutex->unlock(this->mutex);
- }
- METHOD(watcher_t, remove_, void,
- private_watcher_t *this, int fd)
- {
- entry_t *entry, *prev = NULL;
- bool found = FALSE;
- this->mutex->lock(this->mutex);
- while (TRUE)
- {
- bool is_in_callback = FALSE;
- entry = this->fds;
- while (entry)
- {
- if (entry->fd == fd)
- {
- if (this->state != WATCHER_STOPPED && entry->in_callback)
- {
- is_in_callback = TRUE;
- break;
- }
- entry = remove_entry(this, entry, prev);
- found = TRUE;
- continue;
- }
- prev = entry;
- entry = entry->next;
- }
- if (!is_in_callback)
- {
- break;
- }
- this->condvar->wait(this->condvar, this->mutex);
- }
- if (found)
- {
- update(this);
- }
- this->mutex->unlock(this->mutex);
- }
- METHOD(watcher_t, get_state, watcher_state_t,
- private_watcher_t *this)
- {
- watcher_state_t state;
- this->mutex->lock(this->mutex);
- state = this->state;
- this->mutex->unlock(this->mutex);
- return state;
- }
- METHOD(watcher_t, destroy, void,
- private_watcher_t *this)
- {
- this->mutex->destroy(this->mutex);
- this->condvar->destroy(this->condvar);
- if (this->notify[0] != -1)
- {
- close(this->notify[0]);
- }
- if (this->notify[1] != -1)
- {
- close(this->notify[1]);
- }
- this->jobs->destroy(this->jobs);
- free(this);
- }
- #ifdef WIN32
- /**
- * Create notify pipe with a TCP socketpair
- */
- static bool create_notify(private_watcher_t *this)
- {
- u_long on = 1;
- if (socketpair(AF_INET, SOCK_STREAM, 0, this->notify) == 0)
- {
- /* use non-blocking I/O on read-end of notify pipe */
- if (ioctlsocket(this->notify[0], FIONBIO, &on) == 0)
- {
- return TRUE;
- }
- DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
- "failed: %s", strerror(errno));
- }
- return FALSE;
- }
- #else /* !WIN32 */
- /**
- * Create a notify pipe with a one-directional pipe
- */
- static bool create_notify(private_watcher_t *this)
- {
- int flags;
- if (pipe(this->notify) == 0)
- {
- /* use non-blocking I/O on read-end of notify pipe */
- flags = fcntl(this->notify[0], F_GETFL);
- if (flags != -1 &&
- fcntl(this->notify[0], F_SETFL, flags | O_NONBLOCK) != -1)
- {
- return TRUE;
- }
- DBG1(DBG_LIB, "setting watcher notify pipe read-end non-blocking "
- "failed: %s", strerror(errno));
- }
- return FALSE;
- }
- #endif /* !WIN32 */
- /**
- * See header
- */
- watcher_t *watcher_create()
- {
- private_watcher_t *this;
- INIT(this,
- .public = {
- .add = _add,
- .remove = _remove_,
- .get_state = _get_state,
- .destroy = _destroy,
- },
- .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
- .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
- .jobs = linked_list_create(),
- .notify = {-1, -1},
- .state = WATCHER_STOPPED,
- );
- if (!create_notify(this))
- {
- DBG1(DBG_LIB, "creating watcher notify pipe failed: %s",
- strerror(errno));
- }
- return &this->public;
- }
|