Main Page | Alphabetical List | Data Structures | Directories | File List | Data Fields | Globals

fdset.c

Go to the documentation of this file.
00001 /* ==================================================================== 
00002  * The Kannel Software License, Version 1.0 
00003  * 
00004  * Copyright (c) 2001-2008 Kannel Group  
00005  * Copyright (c) 1998-2001 WapIT Ltd.   
00006  * All rights reserved. 
00007  * 
00008  * Redistribution and use in source and binary forms, with or without 
00009  * modification, are permitted provided that the following conditions 
00010  * are met: 
00011  * 
00012  * 1. Redistributions of source code must retain the above copyright 
00013  *    notice, this list of conditions and the following disclaimer. 
00014  * 
00015  * 2. Redistributions in binary form must reproduce the above copyright 
00016  *    notice, this list of conditions and the following disclaimer in 
00017  *    the documentation and/or other materials provided with the 
00018  *    distribution. 
00019  * 
00020  * 3. The end-user documentation included with the redistribution, 
00021  *    if any, must include the following acknowledgment: 
00022  *       "This product includes software developed by the 
00023  *        Kannel Group (http://www.kannel.org/)." 
00024  *    Alternately, this acknowledgment may appear in the software itself, 
00025  *    if and wherever such third-party acknowledgments normally appear. 
00026  * 
00027  * 4. The names "Kannel" and "Kannel Group" must not be used to 
00028  *    endorse or promote products derived from this software without 
00029  *    prior written permission. For written permission, please  
00030  *    contact org@kannel.org. 
00031  * 
00032  * 5. Products derived from this software may not be called "Kannel", 
00033  *    nor may "Kannel" appear in their name, without prior written 
00034  *    permission of the Kannel Group. 
00035  * 
00036  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 
00037  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 
00038  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
00039  * DISCLAIMED.  IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS 
00040  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,  
00041  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT  
00042  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR  
00043  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
00044  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE  
00045  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  
00046  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
00047  * ==================================================================== 
00048  * 
00049  * This software consists of voluntary contributions made by many 
00050  * individuals on behalf of the Kannel Group.  For more information on  
00051  * the Kannel Group, please see <http://www.kannel.org/>. 
00052  * 
00053  * Portions of this software are based upon software originally written at  
00054  * WapIT Ltd., Helsinki, Finland for the Kannel project.  
00055  */ 
00056 
00057 /*
00058  * fdset.c - module for managing a large collection of file descriptors
00059  */
00060 
00061 #include "gw-config.h"
00062  
00063 #include <stdlib.h>
00064 #include <unistd.h>
00065 #include <errno.h>
00066 
00067 #include "gwlib/gwlib.h"
00068 
00069 
00070 struct FDSet
00071 {
00072     /* Thread ID of the set's internal thread, which will spend most
00073      * of its time blocking on poll().  This is set when the thread
00074      * is created, and not changed after that.  It's not protected
00075      * by any lock. */
00076     long poll_thread;
00077 
00078     /* The following fields are for use by the polling thread only.
00079      * No-one else may touch them.  It's not protected by any lock. */
00080 
00081     /* Array for use with poll().  Elements 0 through size-1 are allocated.
00082      * Elements 0 through entries-1 are in use. */
00083     struct pollfd *pollinfo;
00084     int size;
00085     int entries;
00086     
00087     /* Array of times when appropriate fd got any event or events bitmask changed */
00088     time_t *times;
00089 
00090     /* timeout for this fdset */
00091     long timeout;
00092 
00093     /* Arrays of callback and data fields.  They are kept in sync with
00094      * the pollinfo array, and are basically extra fields that we couldn't
00095      * put in struct pollfd because that structure is defined externally. */
00096     fdset_callback_t **callbacks;
00097     void **datafields;
00098 
00099     /* The poller function loops over the table after poll() returns,
00100      * and calls callback functions that may modify the table that is
00101      * being scanned.  We can't just copy the table to avoid interference,
00102      * because fdset_unregister and fdset_listen guarantee that their
00103      * operations are complete when they return -- that does not work
00104      * if poller() is scanning an outdated copy of the table.
00105      * To solve this, we have a field that marks when the table is
00106      * being scanned.  If this field is true, fdset_unregister merely
00107      * sets the fd to -1 instead of deleting the whole entry.
00108      * fdset_listen will takes care to modify revents as well as
00109      * events. fdset_register always adds to the end of the table,
00110      * so it does not have to do anything special.
00111      */
00112     int scanning;
00113 
00114     /* This field keeps track of how many fds were set to -1 by
00115      * fdset_unregister while "scanning" is true.  That way we can
00116      * efficiently check if we need to scan the table to really 
00117      * delete those entries. */
00118     int deleted_entries;
00119 
00120     
00121     /* The following fields are for general use, and are of types that
00122      * have internal locks. */
00123 
00124     /* List of struct action.  Used by other threads to make requests
00125      * of the polling thread. */
00126     List *actions;
00127 };
00128 
00129 /* Datatype to describe changes to the fdset fields that only the polling
00130  * thread may touch.  Other threads use this type to submit requests to
00131  * change those fields. */
00132 /* Action life cycle: Created, then pushed on set->actions list by
00133  * action_submit.  Poller thread wakes up and takes it from the list,
00134  * then calls handle_action, which performs the action and pushes it
00135  * on the action's done list.  action_submit then takes it back and
00136  * destroys it. */
00137 /* If no synchronization is needed, action_submit_nosync can be used.
00138  * In that case handle_action will destroy the action itself instead
00139  * of putting it on any list. */
00140 struct action
00141 {
00142     enum { REGISTER, LISTEN, UNREGISTER, DESTROY, SET_TIMEOUT } type;
00143     int fd;                     /* Used by REGISTER, LISTEN, and UNREGISTER */
00144     int mask;                   /* Used by LISTEN */
00145     int events;                 /* Used by REGISTER and LISTEN */
00146     fdset_callback_t *callback; /* Used by REGISTER */
00147     void *data;                 /* Used by REGISTER */
00148     long timeout;               /* Used by SET_TIMEOUT */
00149     /* When the request has been handled, an element is produced on this
00150      * list, so that the submitter can synchronize.  Can be left NULL. */
00151     List *done;                 /* Used by LISTEN, UNREGISTER, and DESTROY */
00152 };
00153 
00154 /* Return a new action structure of the given type, with all fields empty. */
00155 static struct action *action_create(int type)
00156 {
00157     struct action *new;
00158 
00159     new = gw_malloc(sizeof(*new));
00160     new->type = type;
00161     new->fd = -1;
00162     new->mask = 0;
00163     new->events = 0;
00164     new->callback = NULL;
00165     new->data = NULL;
00166     new->done = NULL;
00167 
00168     return new;
00169 }
00170 
00171 static void action_destroy(struct action *action)
00172 {
00173     if (action == NULL)
00174         return;
00175 
00176     gwlist_destroy(action->done, NULL);
00177     gw_free(action);
00178 }
00179 
/*
 * Item destructor with the void* signature gwlist_destroy expects;
 * simply forwards to action_destroy.
 */
static void action_destroy_item(void *action)
{
    struct action *item = action;

    action_destroy(item);
}
00185 
00186 
00187 /*
00188  * Submit an action for this set, and wait for the polling thread to
00189  * confirm that it's been done, by pushing the action on its done list.
00190  */
00191 static void submit_action(FDSet *set, struct action *action)
00192 {
00193     List *done;
00194     void *sync;
00195 
00196     gw_assert(set != NULL);
00197     gw_assert(action != NULL);
00198 
00199     done = gwlist_create();
00200     gwlist_add_producer(done);
00201 
00202     action->done = done;
00203 
00204     gwlist_append(set->actions, action);
00205     gwthread_wakeup(set->poll_thread);
00206 
00207     sync = gwlist_consume(done);
00208     gw_assert(sync == action);
00209 
00210     action_destroy(action);
00211 }
00212 
00213 /* 
00214  * As above, but don't wait for confirmation.
00215  */
00216 static void submit_action_nosync(FDSet *set, struct action *action)
00217 {
00218     gwlist_append(set->actions, action);
00219     gwthread_wakeup(set->poll_thread);
00220 }
00221 
00222 /* Do one action for this thread and confirm that it's been done by
00223  * appending the action to its done list.  May only be called by
00224  * the polling thread.  Returns 0 normally, and returns -1 if the
00225  * action destroyed the set. */
00226 static int handle_action(FDSet *set, struct action *action)
00227 {
00228     int result;
00229 
00230     gw_assert(set != NULL);
00231     gw_assert(set->poll_thread == gwthread_self());
00232     gw_assert(action != NULL);
00233 
00234     result = 0;
00235 
00236     switch (action->type) {
00237     case REGISTER:
00238         fdset_register(set, action->fd, action->events,
00239                        action->callback, action->data);
00240         break;
00241     case LISTEN:
00242         fdset_listen(set, action->fd, action->mask, action->events);
00243         break;
00244     case UNREGISTER:
00245         fdset_unregister(set, action->fd);
00246         break;
00247     case DESTROY:
00248         fdset_destroy(set);
00249         result = -1;
00250         break;
00251     case SET_TIMEOUT:
00252         set->timeout = action->timeout;
00253         break;
00254     default:
00255         panic(0, "fdset: handle_action got unknown action type %d.",
00256               action->type);
00257     }
00258 
00259     if (action->done == NULL)
00260     action_destroy(action);
00261     else
00262         gwlist_produce(action->done, action);
00263 
00264     return result;
00265 }
00266 
00267 /* Look up the entry number in the pollinfo array for this fd.
00268  * Right now it's a linear search, this may have to be improved. */
00269 static int find_entry(FDSet *set, int fd)
00270 {
00271     int i;
00272 
00273     gw_assert(set != NULL);
00274     gw_assert(gwthread_self() == set->poll_thread);
00275 
00276     for (i = 0; i < set->entries; i++) {
00277         if (set->pollinfo[i].fd == fd)
00278             return i;
00279     }
00280 
00281     return -1;
00282 }
00283 
00284 static void remove_entry(FDSet *set, int entry)
00285 {
00286     if (entry != set->entries - 1) {
00287         /* We need to keep the array contiguous, so move the last element
00288          * to fill in the hole. */
00289         set->pollinfo[entry] = set->pollinfo[set->entries - 1];
00290         set->callbacks[entry] = set->callbacks[set->entries - 1];
00291         set->datafields[entry] = set->datafields[set->entries - 1];
00292         set->times[entry] = set->times[set->entries - 1];
00293     }
00294     set->entries--;
00295 }
00296 
00297 static void remove_deleted_entries(FDSet *set)
00298 {
00299     int i;
00300 
00301     i = 0;
00302     while (i < set->entries && set->deleted_entries > 0) {
00303         if (set->pollinfo[i].fd < 0) {
00304             remove_entry(set, i);
00305         set->deleted_entries--;
00306     } else {
00307         i++;
00308         }
00309     }
00310 }
00311 
00312 /* Main function for polling thread.  Most its time is spent blocking
00313  * in poll().  No-one else is allowed to change the fields it uses,
00314  * so other threads just put something on the actions list and wake
00315  * up this thread.  That's why it checks the actions list every time
00316  * it goes through the loop.
00317  */
00318 static void poller(void *arg)
00319 {
00320     FDSet *set = arg;
00321     struct action *action;
00322     int ret;
00323     int i;
00324     time_t now;
00325 
00326     gw_assert(set != NULL);
00327 
00328     for (;;) {
00329         while ((action = gwlist_extract_first(set->actions)) != NULL) {
00330             /* handle_action returns -1 if the set was destroyed. */
00331             if (handle_action(set, action) < 0)
00332                 return;
00333         }
00334 
00335         /* Block for defined timeout, waiting for activity */
00336         ret = gwthread_poll(set->pollinfo, set->entries, set->timeout);
00337 
00338         if (ret < 0) {
00339             if (errno != EINTR) {
00340                 error(errno, "Poller: can't handle error; sleeping 1 second.");
00341                 gwthread_sleep(1.0);
00342             }
00343             continue;
00344         }
00345         time(&now);
00346         /* Callbacks may modify the table while we scan it, so be careful. */
00347         set->scanning = 1;
00348         for (i = 0; i < set->entries; i++) {
00349             if (set->pollinfo[i].revents != 0) {
00350                 set->callbacks[i](set->pollinfo[i].fd,
00351                                 set->pollinfo[i].revents,
00352                                 set->datafields[i]);
00353                 /* update event time */
00354                 time(&set->times[i]);
00355             } else if (set->timeout > 0 && difftime(set->times[i] + set->timeout, now) <= 0) {
00356                 debug("gwlib.fdset", 0, "Timeout for fd:%d appeares.", set->pollinfo[i].fd);
00357                 set->callbacks[i](set->pollinfo[i].fd, POLLERR, set->datafields[i]);
00358             }
00359         }
00360         set->scanning = 0;
00361 
00362     if (set->deleted_entries > 0)
00363         remove_deleted_entries(set);
00364     }
00365 }
00366 
00367 
00368 FDSet *fdset_create_real(long timeout)
00369 {
00370     FDSet *new;
00371 
00372     new = gw_malloc(sizeof(*new));
00373 
00374     /* Start off with space for one element because we can't malloc 0 bytes
00375      * and we don't want to worry about these pointers being NULL. */
00376     new->size = 1;
00377     new->entries = 0;
00378     new->pollinfo = gw_malloc(sizeof(new->pollinfo[0]) * new->size);
00379     new->callbacks = gw_malloc(sizeof(new->callbacks[0]) * new->size);
00380     new->datafields = gw_malloc(sizeof(new->datafields[0]) * new->size);
00381     new->times = gw_malloc(sizeof(new->times[0]) * new->size);
00382     new->timeout = timeout > 0 ? timeout : -1;
00383     new->scanning = 0;
00384     new->deleted_entries = 0;
00385 
00386     new->actions = gwlist_create();
00387 
00388     new->poll_thread = gwthread_create(poller, new);
00389     if (new->poll_thread < 0) {
00390         error(0, "Could not start internal thread for fdset.");
00391         fdset_destroy(new);
00392         return NULL;
00393     }
00394 
00395     return new;
00396 }
00397 
00398 void fdset_destroy(FDSet *set)
00399 {
00400     if (set == NULL)
00401         return;
00402 
00403     if (set->poll_thread < 0 || gwthread_self() == set->poll_thread) {
00404         if (set->entries > 0) {
00405             warning(0, "Destroying fdset with %d active entries.",
00406                     set->entries);
00407         }
00408         gw_free(set->pollinfo);
00409         gw_free(set->callbacks);
00410         gw_free(set->datafields);
00411         gw_free(set->times);
00412         if (gwlist_len(set->actions) > 0) {
00413             error(0, "Destroying fdset with %ld pending actions.",
00414                   gwlist_len(set->actions));
00415         }
00416         gwlist_destroy(set->actions, action_destroy_item);
00417         gw_free(set);
00418     } else {
00419         long thread = set->poll_thread;
00420         submit_action(set, action_create(DESTROY));
00421     gwthread_join(thread);
00422     }
00423 }
00424 
00425 void fdset_register(FDSet *set, int fd, int events,
00426                     fdset_callback_t callback, void *data)
00427 {
00428     int new;
00429 
00430     gw_assert(set != NULL);
00431 
00432     if (gwthread_self() != set->poll_thread) {
00433         struct action *action;
00434 
00435         action = action_create(REGISTER);
00436         action->fd = fd;
00437         action->events = events;
00438         action->callback = callback;
00439         action->data = data;
00440     submit_action_nosync(set, action);
00441         return;
00442     }
00443 
00444     gw_assert(set->entries <= set->size);
00445 
00446     if (set->entries >= set->size) {
00447         int newsize = set->entries + 1;
00448         set->pollinfo = gw_realloc(set->pollinfo,
00449                                    sizeof(set->pollinfo[0]) * newsize);
00450         set->callbacks = gw_realloc(set->callbacks,
00451                                    sizeof(set->callbacks[0]) * newsize);
00452         set->datafields = gw_realloc(set->datafields,
00453                                    sizeof(set->datafields[0]) * newsize);
00454         set->times = gw_realloc(set->times, sizeof(set->times[0]) * newsize);
00455         set->size = newsize;
00456     }
00457 
00458     /* We don't check set->scanning.  Adding new entries is not harmful
00459      * because their revents fields are 0. */
00460 
00461     new = set->entries++;
00462     set->pollinfo[new].fd = fd;
00463     set->pollinfo[new].events = events;
00464     set->pollinfo[new].revents = 0;
00465     set->callbacks[new] = callback;
00466     set->datafields[new] = data;
00467     time(&set->times[new]);
00468 }
00469 
00470 void fdset_listen(FDSet *set, int fd, int mask, int events)
00471 {
00472     int entry;
00473 
00474     gw_assert(set != NULL);
00475 
00476     if (gwthread_self() != set->poll_thread) {
00477         struct action *action;
00478 
00479         action = action_create(LISTEN);
00480         action->fd = fd;
00481     action->mask = mask;
00482         action->events = events;
00483         submit_action(set, action);
00484         return;
00485     }
00486 
00487     entry = find_entry(set, fd);   
00488     if (entry < 0) {
00489         warning(0, "fdset_listen called on unregistered fd %d.", fd);
00490         return;
00491     }
00492 
00493     /* Copy the bits from events specified by the mask, and preserve the
00494      * bits not specified by the mask. */
00495     set->pollinfo[entry].events =
00496     (set->pollinfo[entry].events & ~mask) | (events & mask);
00497 
00498     /* If poller is currently scanning the array, then change the
00499      * revents field so that the callback function will not be called
00500      * for events we should no longer listen for.  The idea is the
00501      * same as for the events field, except that we only turn bits off. */
00502     if (set->scanning) {
00503         set->pollinfo[entry].revents =
00504             set->pollinfo[entry].revents & (events | ~mask);
00505     }
00506     
00507     time(&set->times[entry]);
00508 }
00509 
00510 void fdset_unregister(FDSet *set, int fd)
00511 {
00512     int entry;
00513 
00514     gw_assert(set != NULL);
00515 
00516     if (gwthread_self() != set->poll_thread) {
00517         struct action *action;
00518 
00519         action = action_create(UNREGISTER);
00520         action->fd = fd;
00521         submit_action(set, action);
00522         return;
00523     }
00524 
00525     /* Remove the entry from the pollinfo array */
00526 
00527     entry = find_entry(set, fd);
00528     if (entry < 0) {
00529         warning(0, "fdset_listen called on unregistered fd %d.", fd);
00530         return;
00531     }
00532 
00533     if (entry == set->entries - 1) {
00534         /* It's the last entry.  We can safely remove it even while
00535          * the array is being scanned, because the scan checks set->entries. */
00536         set->entries--;
00537     } else if (set->scanning) {
00538         /* We can't remove entries because the array is being
00539          * scanned.  Mark it as deleted.  */
00540         set->pollinfo[entry].fd = -1;
00541         set->deleted_entries++;
00542     } else {
00543         remove_entry(set, entry);
00544     }
00545 }
00546 
00547 void fdset_set_timeout(FDSet *set, long timeout)
00548 {
00549     gw_assert(set != NULL);
00550 
00551     if (gwthread_self() != set->poll_thread) {
00552         struct action *action;
00553 
00554         action = action_create(SET_TIMEOUT);
00555         action->timeout = timeout;
00556         submit_action(set, action);
00557         return;
00558     }
00559     set->timeout = timeout;
00560 }
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.