00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061 #include "gw-config.h"
00062
00063 #include <stdlib.h>
00064 #include <unistd.h>
00065 #include <errno.h>
00066
00067 #include "gwlib/gwlib.h"
00068
00069
00070 struct FDSet
00071 {
00072
00073
00074
00075
00076 long poll_thread;
00077
00078
00079
00080
00081
00082
00083 struct pollfd *pollinfo;
00084 int size;
00085 int entries;
00086
00087
00088 time_t *times;
00089
00090
00091 long timeout;
00092
00093
00094
00095
00096 fdset_callback_t **callbacks;
00097 void **datafields;
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112 int scanning;
00113
00114
00115
00116
00117
00118 int deleted_entries;
00119
00120
00121
00122
00123
00124
00125
00126 List *actions;
00127 };
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140 struct action
00141 {
00142 enum { REGISTER, LISTEN, UNREGISTER, DESTROY, SET_TIMEOUT } type;
00143 int fd;
00144 int mask;
00145 int events;
00146 fdset_callback_t *callback;
00147 void *data;
00148 long timeout;
00149
00150
00151 List *done;
00152 };
00153
00154
00155 static struct action *action_create(int type)
00156 {
00157 struct action *new;
00158
00159 new = gw_malloc(sizeof(*new));
00160 new->type = type;
00161 new->fd = -1;
00162 new->mask = 0;
00163 new->events = 0;
00164 new->callback = NULL;
00165 new->data = NULL;
00166 new->done = NULL;
00167
00168 return new;
00169 }
00170
00171 static void action_destroy(struct action *action)
00172 {
00173 if (action == NULL)
00174 return;
00175
00176 gwlist_destroy(action->done, NULL);
00177 gw_free(action);
00178 }
00179
00180
/*
 * Adapter with a void-pointer signature so that action_destroy() can be
 * used as the item destructor in gwlist_destroy().
 */
static void action_destroy_item(void *item)
{
    struct action *action = item;

    action_destroy(action);
}
00185
00186
00187
00188
00189
00190
00191 static void submit_action(FDSet *set, struct action *action)
00192 {
00193 List *done;
00194 void *sync;
00195
00196 gw_assert(set != NULL);
00197 gw_assert(action != NULL);
00198
00199 done = gwlist_create();
00200 gwlist_add_producer(done);
00201
00202 action->done = done;
00203
00204 gwlist_append(set->actions, action);
00205 gwthread_wakeup(set->poll_thread);
00206
00207 sync = gwlist_consume(done);
00208 gw_assert(sync == action);
00209
00210 action_destroy(action);
00211 }
00212
00213
00214
00215
00216 static void submit_action_nosync(FDSet *set, struct action *action)
00217 {
00218 gwlist_append(set->actions, action);
00219 gwthread_wakeup(set->poll_thread);
00220 }
00221
00222
00223
00224
00225
00226 static int handle_action(FDSet *set, struct action *action)
00227 {
00228 int result;
00229
00230 gw_assert(set != NULL);
00231 gw_assert(set->poll_thread == gwthread_self());
00232 gw_assert(action != NULL);
00233
00234 result = 0;
00235
00236 switch (action->type) {
00237 case REGISTER:
00238 fdset_register(set, action->fd, action->events,
00239 action->callback, action->data);
00240 break;
00241 case LISTEN:
00242 fdset_listen(set, action->fd, action->mask, action->events);
00243 break;
00244 case UNREGISTER:
00245 fdset_unregister(set, action->fd);
00246 break;
00247 case DESTROY:
00248 fdset_destroy(set);
00249 result = -1;
00250 break;
00251 case SET_TIMEOUT:
00252 set->timeout = action->timeout;
00253 break;
00254 default:
00255 panic(0, "fdset: handle_action got unknown action type %d.",
00256 action->type);
00257 }
00258
00259 if (action->done == NULL)
00260 action_destroy(action);
00261 else
00262 gwlist_produce(action->done, action);
00263
00264 return result;
00265 }
00266
00267
00268
00269 static int find_entry(FDSet *set, int fd)
00270 {
00271 int i;
00272
00273 gw_assert(set != NULL);
00274 gw_assert(gwthread_self() == set->poll_thread);
00275
00276 for (i = 0; i < set->entries; i++) {
00277 if (set->pollinfo[i].fd == fd)
00278 return i;
00279 }
00280
00281 return -1;
00282 }
00283
00284 static void remove_entry(FDSet *set, int entry)
00285 {
00286 if (entry != set->entries - 1) {
00287
00288
00289 set->pollinfo[entry] = set->pollinfo[set->entries - 1];
00290 set->callbacks[entry] = set->callbacks[set->entries - 1];
00291 set->datafields[entry] = set->datafields[set->entries - 1];
00292 set->times[entry] = set->times[set->entries - 1];
00293 }
00294 set->entries--;
00295 }
00296
00297 static void remove_deleted_entries(FDSet *set)
00298 {
00299 int i;
00300
00301 i = 0;
00302 while (i < set->entries && set->deleted_entries > 0) {
00303 if (set->pollinfo[i].fd < 0) {
00304 remove_entry(set, i);
00305 set->deleted_entries--;
00306 } else {
00307 i++;
00308 }
00309 }
00310 }
00311
00312
00313
00314
00315
00316
00317
00318 static void poller(void *arg)
00319 {
00320 FDSet *set = arg;
00321 struct action *action;
00322 int ret;
00323 int i;
00324 time_t now;
00325
00326 gw_assert(set != NULL);
00327
00328 for (;;) {
00329 while ((action = gwlist_extract_first(set->actions)) != NULL) {
00330
00331 if (handle_action(set, action) < 0)
00332 return;
00333 }
00334
00335
00336 ret = gwthread_poll(set->pollinfo, set->entries, set->timeout);
00337
00338 if (ret < 0) {
00339 if (errno != EINTR) {
00340 error(errno, "Poller: can't handle error; sleeping 1 second.");
00341 gwthread_sleep(1.0);
00342 }
00343 continue;
00344 }
00345 time(&now);
00346
00347 set->scanning = 1;
00348 for (i = 0; i < set->entries; i++) {
00349 if (set->pollinfo[i].revents != 0) {
00350 set->callbacks[i](set->pollinfo[i].fd,
00351 set->pollinfo[i].revents,
00352 set->datafields[i]);
00353
00354 time(&set->times[i]);
00355 } else if (set->timeout > 0 && difftime(set->times[i] + set->timeout, now) <= 0) {
00356 debug("gwlib.fdset", 0, "Timeout for fd:%d appeares.", set->pollinfo[i].fd);
00357 set->callbacks[i](set->pollinfo[i].fd, POLLERR, set->datafields[i]);
00358 }
00359 }
00360 set->scanning = 0;
00361
00362 if (set->deleted_entries > 0)
00363 remove_deleted_entries(set);
00364 }
00365 }
00366
00367
00368 FDSet *fdset_create_real(long timeout)
00369 {
00370 FDSet *new;
00371
00372 new = gw_malloc(sizeof(*new));
00373
00374
00375
00376 new->size = 1;
00377 new->entries = 0;
00378 new->pollinfo = gw_malloc(sizeof(new->pollinfo[0]) * new->size);
00379 new->callbacks = gw_malloc(sizeof(new->callbacks[0]) * new->size);
00380 new->datafields = gw_malloc(sizeof(new->datafields[0]) * new->size);
00381 new->times = gw_malloc(sizeof(new->times[0]) * new->size);
00382 new->timeout = timeout > 0 ? timeout : -1;
00383 new->scanning = 0;
00384 new->deleted_entries = 0;
00385
00386 new->actions = gwlist_create();
00387
00388 new->poll_thread = gwthread_create(poller, new);
00389 if (new->poll_thread < 0) {
00390 error(0, "Could not start internal thread for fdset.");
00391 fdset_destroy(new);
00392 return NULL;
00393 }
00394
00395 return new;
00396 }
00397
00398 void fdset_destroy(FDSet *set)
00399 {
00400 if (set == NULL)
00401 return;
00402
00403 if (set->poll_thread < 0 || gwthread_self() == set->poll_thread) {
00404 if (set->entries > 0) {
00405 warning(0, "Destroying fdset with %d active entries.",
00406 set->entries);
00407 }
00408 gw_free(set->pollinfo);
00409 gw_free(set->callbacks);
00410 gw_free(set->datafields);
00411 gw_free(set->times);
00412 if (gwlist_len(set->actions) > 0) {
00413 error(0, "Destroying fdset with %ld pending actions.",
00414 gwlist_len(set->actions));
00415 }
00416 gwlist_destroy(set->actions, action_destroy_item);
00417 gw_free(set);
00418 } else {
00419 long thread = set->poll_thread;
00420 submit_action(set, action_create(DESTROY));
00421 gwthread_join(thread);
00422 }
00423 }
00424
00425 void fdset_register(FDSet *set, int fd, int events,
00426 fdset_callback_t callback, void *data)
00427 {
00428 int new;
00429
00430 gw_assert(set != NULL);
00431
00432 if (gwthread_self() != set->poll_thread) {
00433 struct action *action;
00434
00435 action = action_create(REGISTER);
00436 action->fd = fd;
00437 action->events = events;
00438 action->callback = callback;
00439 action->data = data;
00440 submit_action_nosync(set, action);
00441 return;
00442 }
00443
00444 gw_assert(set->entries <= set->size);
00445
00446 if (set->entries >= set->size) {
00447 int newsize = set->entries + 1;
00448 set->pollinfo = gw_realloc(set->pollinfo,
00449 sizeof(set->pollinfo[0]) * newsize);
00450 set->callbacks = gw_realloc(set->callbacks,
00451 sizeof(set->callbacks[0]) * newsize);
00452 set->datafields = gw_realloc(set->datafields,
00453 sizeof(set->datafields[0]) * newsize);
00454 set->times = gw_realloc(set->times, sizeof(set->times[0]) * newsize);
00455 set->size = newsize;
00456 }
00457
00458
00459
00460
00461 new = set->entries++;
00462 set->pollinfo[new].fd = fd;
00463 set->pollinfo[new].events = events;
00464 set->pollinfo[new].revents = 0;
00465 set->callbacks[new] = callback;
00466 set->datafields[new] = data;
00467 time(&set->times[new]);
00468 }
00469
00470 void fdset_listen(FDSet *set, int fd, int mask, int events)
00471 {
00472 int entry;
00473
00474 gw_assert(set != NULL);
00475
00476 if (gwthread_self() != set->poll_thread) {
00477 struct action *action;
00478
00479 action = action_create(LISTEN);
00480 action->fd = fd;
00481 action->mask = mask;
00482 action->events = events;
00483 submit_action(set, action);
00484 return;
00485 }
00486
00487 entry = find_entry(set, fd);
00488 if (entry < 0) {
00489 warning(0, "fdset_listen called on unregistered fd %d.", fd);
00490 return;
00491 }
00492
00493
00494
00495 set->pollinfo[entry].events =
00496 (set->pollinfo[entry].events & ~mask) | (events & mask);
00497
00498
00499
00500
00501
00502 if (set->scanning) {
00503 set->pollinfo[entry].revents =
00504 set->pollinfo[entry].revents & (events | ~mask);
00505 }
00506
00507 time(&set->times[entry]);
00508 }
00509
00510 void fdset_unregister(FDSet *set, int fd)
00511 {
00512 int entry;
00513
00514 gw_assert(set != NULL);
00515
00516 if (gwthread_self() != set->poll_thread) {
00517 struct action *action;
00518
00519 action = action_create(UNREGISTER);
00520 action->fd = fd;
00521 submit_action(set, action);
00522 return;
00523 }
00524
00525
00526
00527 entry = find_entry(set, fd);
00528 if (entry < 0) {
00529 warning(0, "fdset_listen called on unregistered fd %d.", fd);
00530 return;
00531 }
00532
00533 if (entry == set->entries - 1) {
00534
00535
00536 set->entries--;
00537 } else if (set->scanning) {
00538
00539
00540 set->pollinfo[entry].fd = -1;
00541 set->deleted_entries++;
00542 } else {
00543 remove_entry(set, entry);
00544 }
00545 }
00546
00547 void fdset_set_timeout(FDSet *set, long timeout)
00548 {
00549 gw_assert(set != NULL);
00550
00551 if (gwthread_self() != set->poll_thread) {
00552 struct action *action;
00553
00554 action = action_create(SET_TIMEOUT);
00555 action->timeout = timeout;
00556 submit_action(set, action);
00557 return;
00558 }
00559 set->timeout = timeout;
00560 }
See file LICENSE for details about the license agreement for using,
modifying, copying or deriving work from this software.