scheduler.c 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. /*
  2. * Copyright (C) 2008-2015 Tobias Brunner
  3. * Copyright (C) 2005-2006 Martin Willi
  4. * Copyright (C) 2005 Jan Hutter
  5. * HSR Hochschule fuer Technik Rapperswil
  6. *
  7. * This program is free software; you can redistribute it and/or modify it
  8. * under the terms of the GNU General Public License as published by the
  9. * Free Software Foundation; either version 2 of the License, or (at your
  10. * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
  11. *
  12. * This program is distributed in the hope that it will be useful, but
  13. * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
  14. * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  15. * for more details.
  16. */
  17. #include <stdlib.h>
  18. #include "scheduler.h"
  19. #include <utils/debug.h>
  20. #include <processing/processor.h>
  21. #include <processing/jobs/callback_job.h>
  22. #include <threading/thread.h>
  23. #include <threading/condvar.h>
  24. #include <threading/mutex.h>
  25. /* the initial size of the heap */
  26. #define HEAP_SIZE_DEFAULT 64
  27. typedef struct event_t event_t;
  28. /**
  29. * Event containing a job and a schedule time
  30. */
  31. struct event_t {
  32. /**
  33. * Time to fire the event.
  34. */
  35. timeval_t time;
  36. /**
  37. * Every event has its assigned job.
  38. */
  39. job_t *job;
  40. };
  41. /**
  42. * destroy an event and its job
  43. */
  44. static void event_destroy(event_t *event)
  45. {
  46. event->job->destroy(event->job);
  47. free(event);
  48. }
  49. typedef struct private_scheduler_t private_scheduler_t;
  50. /**
  51. * Private data of a scheduler_t object.
  52. */
  53. struct private_scheduler_t {
  54. /**
  55. * Public part of a scheduler_t object.
  56. */
  57. scheduler_t public;
  58. /**
  59. * The heap in which the events are stored.
  60. */
  61. event_t **heap;
  62. /**
  63. * The size of the heap.
  64. */
  65. u_int heap_size;
  66. /**
  67. * The number of scheduled events.
  68. */
  69. u_int event_count;
  70. /**
  71. * Exclusive access to list
  72. */
  73. mutex_t *mutex;
  74. /**
  75. * Condvar to wait for next job.
  76. */
  77. condvar_t *condvar;
  78. };
  79. /**
  80. * Comparse two timevals, return >0 if a > b, <0 if a < b and =0 if equal
  81. */
  82. static int timeval_cmp(timeval_t *a, timeval_t *b)
  83. {
  84. if (a->tv_sec > b->tv_sec)
  85. {
  86. return 1;
  87. }
  88. if (a->tv_sec < b->tv_sec)
  89. {
  90. return -1;
  91. }
  92. if (a->tv_usec > b->tv_usec)
  93. {
  94. return 1;
  95. }
  96. if (a->tv_usec < b->tv_usec)
  97. {
  98. return -1;
  99. }
  100. return 0;
  101. }
  102. /**
  103. * Returns the top event without removing it. Returns NULL if the heap is empty.
  104. */
  105. static event_t *peek_event(private_scheduler_t *this)
  106. {
  107. return this->event_count > 0 ? this->heap[1] : NULL;
  108. }
  109. /**
  110. * Removes the top event from the heap and returns it. Returns NULL if the heap
  111. * is empty.
  112. */
  113. static event_t *remove_event(private_scheduler_t *this)
  114. {
  115. event_t *event, *top;
  116. if (!this->event_count)
  117. {
  118. return NULL;
  119. }
  120. /* store the value to return */
  121. event = this->heap[1];
  122. /* move the bottom event to the top */
  123. top = this->heap[1] = this->heap[this->event_count];
  124. if (--this->event_count > 1)
  125. {
  126. /* seep down the top event */
  127. u_int position = 1;
  128. while ((position << 1) <= this->event_count)
  129. {
  130. u_int child = position << 1;
  131. if ((child + 1) <= this->event_count &&
  132. timeval_cmp(&this->heap[child + 1]->time,
  133. &this->heap[child]->time) < 0)
  134. {
  135. /* the "right" child is smaller */
  136. child++;
  137. }
  138. if (timeval_cmp(&top->time, &this->heap[child]->time) <= 0)
  139. {
  140. /* the top event fires before the smaller of the two children,
  141. * stop */
  142. break;
  143. }
  144. /* swap with the smaller child */
  145. this->heap[position] = this->heap[child];
  146. position = child;
  147. }
  148. this->heap[position] = top;
  149. }
  150. return event;
  151. }
  152. /**
  153. * Get events from the queue and pass it to the processor
  154. */
  155. static job_requeue_t schedule(private_scheduler_t * this)
  156. {
  157. timeval_t now;
  158. event_t *event;
  159. bool timed = FALSE, oldstate;
  160. this->mutex->lock(this->mutex);
  161. time_monotonic(&now);
  162. if ((event = peek_event(this)) != NULL)
  163. {
  164. if (timeval_cmp(&now, &event->time) >= 0)
  165. {
  166. remove_event(this);
  167. this->mutex->unlock(this->mutex);
  168. DBG2(DBG_JOB, "got event, queuing job for execution");
  169. lib->processor->queue_job(lib->processor, event->job);
  170. free(event);
  171. return JOB_REQUEUE_DIRECT;
  172. }
  173. timersub(&event->time, &now, &now);
  174. if (now.tv_sec)
  175. {
  176. DBG2(DBG_JOB, "next event in %ds %dms, waiting",
  177. now.tv_sec, now.tv_usec/1000);
  178. }
  179. else
  180. {
  181. DBG2(DBG_JOB, "next event in %dms, waiting", now.tv_usec/1000);
  182. }
  183. timed = TRUE;
  184. }
  185. thread_cleanup_push((thread_cleanup_t)this->mutex->unlock, this->mutex);
  186. oldstate = thread_cancelability(TRUE);
  187. if (timed)
  188. {
  189. this->condvar->timed_wait_abs(this->condvar, this->mutex, event->time);
  190. }
  191. else
  192. {
  193. DBG2(DBG_JOB, "no events, waiting");
  194. this->condvar->wait(this->condvar, this->mutex);
  195. }
  196. thread_cancelability(oldstate);
  197. thread_cleanup_pop(TRUE);
  198. return JOB_REQUEUE_DIRECT;
  199. }
  200. METHOD(scheduler_t, get_job_load, u_int,
  201. private_scheduler_t *this)
  202. {
  203. int count;
  204. this->mutex->lock(this->mutex);
  205. count = this->event_count;
  206. this->mutex->unlock(this->mutex);
  207. return count;
  208. }
  209. METHOD(scheduler_t, schedule_job_tv, void,
  210. private_scheduler_t *this, job_t *job, timeval_t tv)
  211. {
  212. event_t *event;
  213. u_int position;
  214. event = malloc_thing(event_t);
  215. event->job = job;
  216. event->job->status = JOB_STATUS_QUEUED;
  217. event->time = tv;
  218. this->mutex->lock(this->mutex);
  219. this->event_count++;
  220. if (this->event_count > this->heap_size)
  221. {
  222. /* double the size of the heap */
  223. this->heap_size <<= 1;
  224. this->heap = (event_t**)realloc(this->heap,
  225. (this->heap_size + 1) * sizeof(event_t*));
  226. }
  227. /* "put" the event to the bottom */
  228. position = this->event_count;
  229. /* then bubble it up */
  230. while (position > 1 && timeval_cmp(&this->heap[position >> 1]->time,
  231. &event->time) > 0)
  232. {
  233. /* parent has to be fired after the new event, move up */
  234. this->heap[position] = this->heap[position >> 1];
  235. position >>= 1;
  236. }
  237. this->heap[position] = event;
  238. this->condvar->signal(this->condvar);
  239. this->mutex->unlock(this->mutex);
  240. }
  241. METHOD(scheduler_t, schedule_job, void,
  242. private_scheduler_t *this, job_t *job, uint32_t s)
  243. {
  244. timeval_t tv;
  245. time_monotonic(&tv);
  246. tv.tv_sec += s;
  247. schedule_job_tv(this, job, tv);
  248. }
  249. METHOD(scheduler_t, schedule_job_ms, void,
  250. private_scheduler_t *this, job_t *job, uint32_t ms)
  251. {
  252. timeval_t tv, add;
  253. time_monotonic(&tv);
  254. add.tv_sec = ms / 1000;
  255. add.tv_usec = (ms % 1000) * 1000;
  256. timeradd(&tv, &add, &tv);
  257. schedule_job_tv(this, job, tv);
  258. }
  259. METHOD(scheduler_t, flush, void,
  260. private_scheduler_t *this)
  261. {
  262. event_t *event;
  263. this->mutex->lock(this->mutex);
  264. while ((event = remove_event(this)) != NULL)
  265. {
  266. event_destroy(event);
  267. }
  268. this->condvar->signal(this->condvar);
  269. this->mutex->unlock(this->mutex);
  270. }
  271. METHOD(scheduler_t, destroy, void,
  272. private_scheduler_t *this)
  273. {
  274. flush(this);
  275. this->condvar->destroy(this->condvar);
  276. this->mutex->destroy(this->mutex);
  277. free(this->heap);
  278. free(this);
  279. }
  280. /*
  281. * Described in header.
  282. */
  283. scheduler_t * scheduler_create()
  284. {
  285. private_scheduler_t *this;
  286. callback_job_t *job;
  287. INIT(this,
  288. .public = {
  289. .get_job_load = _get_job_load,
  290. .schedule_job = _schedule_job,
  291. .schedule_job_ms = _schedule_job_ms,
  292. .schedule_job_tv = _schedule_job_tv,
  293. .flush = _flush,
  294. .destroy = _destroy,
  295. },
  296. .heap_size = HEAP_SIZE_DEFAULT,
  297. .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
  298. .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
  299. );
  300. this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
  301. job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
  302. NULL, return_false, JOB_PRIO_CRITICAL);
  303. lib->processor->queue_job(lib->processor, (job_t*)job);
  304. return &this->public;
  305. }