Kannel: Open Source WAP and SMS gateway  svn-r5335
fdset.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * fdset.c - module for managing a large collection of file descriptors
59  */
60 
61 #include "gw-config.h"
62 
63 #include <stdlib.h>
64 #include <unistd.h>
65 #include <errno.h>
66 
67 #include "gwlib/gwlib.h"
68 
69 
70 struct FDSet
71 {
72  /* Thread ID of the set's internal thread, which will spend most
73  * of its time blocking on poll(). This is set when the thread
74  * is created, and not changed after that. It's not protected
75  * by any lock. */
77 
78  /* The following fields are for use by the polling thread only.
79  * No-one else may touch them. It's not protected by any lock. */
80 
81  /* Array for use with poll(). Elements 0 through size-1 are allocated.
82  * Elements 0 through entries-1 are in use. */
83  struct pollfd *pollinfo;
84  int size;
85  int entries;
86 
87  /* Array of times when appropriate fd got any event or events bitmask changed */
88  time_t *times;
89 
90  /* timeout for this fdset */
91  long timeout;
92 
93  /* Arrays of callback and data fields. They are kept in sync with
94  * the pollinfo array, and are basically extra fields that we couldn't
95  * put in struct pollfd because that structure is defined externally. */
97  void **datafields;
98 
99  /* The poller function loops over the table after poll() returns,
100  * and calls callback functions that may modify the table that is
101  * being scanned. We can't just copy the table to avoid interference,
102  * because fdset_unregister and fdset_listen guarantee that their
103  * operations are complete when they return -- that does not work
104  * if poller() is scanning an outdated copy of the table.
105  * To solve this, we have a field that marks when the table is
106  * being scanned. If this field is true, fdset_unregister merely
107  * sets the fd to -1 instead of deleting the whole entry.
108  * fdset_listen will takes care to modify revents as well as
109  * events. fdset_register always adds to the end of the table,
110  * so it does not have to do anything special.
111  */
112  int scanning;
113 
114  /* This field keeps track of how many fds were set to -1 by
115  * fdset_unregister while "scanning" is true. That way we can
116  * efficiently check if we need to scan the table to really
117  * delete those entries. */
119 
120 
121  /* The following fields are for general use, and are of types that
122  * have internal locks. */
123 
124  /* List of struct action. Used by other threads to make requests
125  * of the polling thread. */
127 };
128 
/* Datatype to describe changes to the fdset fields that only the polling
 * thread may touch.  Other threads use this type to submit requests to
 * change those fields. */
/* Action life cycle: Created, then pushed on the set->actions list by
 * submit_action.  The poller thread wakes up and takes it from the list,
 * then calls handle_action, which performs the action and pushes it
 * on the action's done list.  submit_action then takes it back and
 * destroys it. */
/* If no synchronization is needed, submit_action_nosync can be used.
 * In that case handle_action will destroy the action itself instead
 * of putting it on any list. */
140 struct action
141 {
143  int fd; /* Used by REGISTER, LISTEN, and UNREGISTER */
144  int mask; /* Used by LISTEN */
145  int events; /* Used by REGISTER and LISTEN */
146  fdset_callback_t *callback; /* Used by REGISTER */
147  void *data; /* Used by REGISTER */
148  long timeout; /* Used by SET_TIMEOUT */
149  /* When the request has been handled, an element is produced on this
150  * list, so that the submitter can synchronize. Can be left NULL. */
151  List *done; /* Used by LISTEN, UNREGISTER, and DESTROY */
152 };
153 
154 /* Return a new action structure of the given type, with all fields empty. */
155 static struct action *action_create(int type)
156 {
157  struct action *new;
158 
159  new = gw_malloc(sizeof(*new));
160  new->type = type;
161  new->fd = -1;
162  new->mask = 0;
163  new->events = 0;
164  new->callback = NULL;
165  new->data = NULL;
166  new->done = NULL;
167 
168  return new;
169 }
170 
171 static void action_destroy(struct action *action)
172 {
173  if (action == NULL)
174  return;
175 
176  gwlist_destroy(action->done, NULL);
177  gw_free(action);
178 }
179 
/* For use with gwlist_destroy: adapts action_destroy to the generic
 * void-pointer item destructor signature. */
static void action_destroy_item(void *action)
{
    action_destroy(action);
}
185 
186 
187 /*
188  * Submit an action for this set, and wait for the polling thread to
189  * confirm that it's been done, by pushing the action on its done list.
190  */
191 static void submit_action(FDSet *set, struct action *action)
192 {
193  List *done;
194  void *sync;
195 
196  gw_assert(set != NULL);
197  gw_assert(action != NULL);
198 
199  done = gwlist_create();
201 
202  action->done = done;
203 
204  gwlist_append(set->actions, action);
205  gwthread_wakeup(set->poll_thread);
206 
207  sync = gwlist_consume(done);
208  gw_assert(sync == action);
209 
211 }
212 
213 /*
214  * As above, but don't wait for confirmation.
215  */
216 static void submit_action_nosync(FDSet *set, struct action *action)
217 {
218  gwlist_append(set->actions, action);
219  gwthread_wakeup(set->poll_thread);
220 }
221 
222 /* Do one action for this thread and confirm that it's been done by
223  * appending the action to its done list. May only be called by
224  * the polling thread. Returns 0 normally, and returns -1 if the
225  * action destroyed the set. */
226 static int handle_action(FDSet *set, struct action *action)
227 {
228  int result;
229 
230  gw_assert(set != NULL);
231  gw_assert(set->poll_thread == gwthread_self());
232  gw_assert(action != NULL);
233 
234  result = 0;
235 
236  switch (action->type) {
237  case REGISTER:
240  break;
241  case LISTEN:
243  break;
244  case UNREGISTER:
245  fdset_unregister(set, action->fd);
246  break;
247  case DESTROY:
248  fdset_destroy(set);
249  result = -1;
250  break;
251  case SET_TIMEOUT:
252  set->timeout = action->timeout;
253  break;
254  default:
255  panic(0, "fdset: handle_action got unknown action type %d.",
256  action->type);
257  }
258 
259  if (action->done == NULL)
261  else
263 
264  return result;
265 }
266 
267 /* Look up the entry number in the pollinfo array for this fd.
268  * Right now it's a linear search, this may have to be improved. */
269 static int find_entry(FDSet *set, int fd)
270 {
271  int i;
272 
273  gw_assert(set != NULL);
274  gw_assert(gwthread_self() == set->poll_thread);
275 
276  for (i = 0; i < set->entries; i++) {
277  if (set->pollinfo[i].fd == fd)
278  return i;
279  }
280 
281  return -1;
282 }
283 
284 static void remove_entry(FDSet *set, int entry)
285 {
286  if (entry != set->entries - 1) {
287  /* We need to keep the array contiguous, so move the last element
288  * to fill in the hole. */
289  set->pollinfo[entry] = set->pollinfo[set->entries - 1];
290  set->callbacks[entry] = set->callbacks[set->entries - 1];
291  set->datafields[entry] = set->datafields[set->entries - 1];
292  set->times[entry] = set->times[set->entries - 1];
293  }
294  set->entries--;
295 }
296 
297 static void remove_deleted_entries(FDSet *set)
298 {
299  int i;
300 
301  i = 0;
302  while (i < set->entries && set->deleted_entries > 0) {
303  if (set->pollinfo[i].fd < 0) {
304  remove_entry(set, i);
305  set->deleted_entries--;
306  } else {
307  i++;
308  }
309  }
310 }
311 
312 /* Main function for polling thread. Most its time is spent blocking
313  * in poll(). No-one else is allowed to change the fields it uses,
314  * so other threads just put something on the actions list and wake
315  * up this thread. That's why it checks the actions list every time
316  * it goes through the loop.
317  */
318 static void poller(void *arg)
319 {
320  FDSet *set = arg;
321  struct action *action;
322  int ret;
323  int i;
324  time_t now;
325 
326  gw_assert(set != NULL);
327 
328  for (;;) {
329  while ((action = gwlist_extract_first(set->actions)) != NULL) {
330  /* handle_action returns -1 if the set was destroyed. */
331  if (handle_action(set, action) < 0)
332  return;
333  }
334 
335  /* Block for defined timeout, waiting for activity */
336  ret = gwthread_poll(set->pollinfo, set->entries, set->timeout);
337 
338  if (ret < 0) {
339  if (errno != EINTR) {
340  error(errno, "Poller: can't handle error; sleeping 1 second.");
341  gwthread_sleep(1.0);
342  }
343  continue;
344  }
345  time(&now);
346  /* Callbacks may modify the table while we scan it, so be careful. */
347  set->scanning = 1;
348  for (i = 0; i < set->entries; i++) {
349  if (set->pollinfo[i].revents != 0) {
350  set->callbacks[i](set->pollinfo[i].fd,
351  set->pollinfo[i].revents,
352  set->datafields[i]);
353  /* update event time */
354  time(&set->times[i]);
355  } else if (set->timeout > 0 && difftime(set->times[i] + set->timeout, now) <= 0) {
356  debug("gwlib.fdset", 0, "Timeout for fd:%d appears.", set->pollinfo[i].fd);
357  set->callbacks[i](set->pollinfo[i].fd, POLLERR, set->datafields[i]);
358  }
359  }
360  set->scanning = 0;
361 
362  if (set->deleted_entries > 0)
364  }
365 }
366 
367 
369 {
370  FDSet *new;
371 
372  new = gw_malloc(sizeof(*new));
373 
374  /* Start off with space for one element because we can't malloc 0 bytes
375  * and we don't want to worry about these pointers being NULL. */
376  new->size = 1;
377  new->entries = 0;
378  new->pollinfo = gw_malloc(sizeof(new->pollinfo[0]) * new->size);
379  new->callbacks = gw_malloc(sizeof(new->callbacks[0]) * new->size);
380  new->datafields = gw_malloc(sizeof(new->datafields[0]) * new->size);
381  new->times = gw_malloc(sizeof(new->times[0]) * new->size);
382  new->timeout = timeout > 0 ? timeout : -1;
383  new->scanning = 0;
384  new->deleted_entries = 0;
385 
386  new->actions = gwlist_create();
387 
388  new->poll_thread = gwthread_create(poller, new);
389  if (new->poll_thread < 0) {
390  error(0, "Could not start internal thread for fdset.");
391  fdset_destroy(new);
392  return NULL;
393  }
394 
395  return new;
396 }
397 
399 {
400  if (set == NULL)
401  return;
402 
403  if (set->poll_thread < 0 || gwthread_self() == set->poll_thread) {
404  if (set->entries > 0) {
405  warning(0, "Destroying fdset with %d active entries.",
406  set->entries);
407  }
408  gw_free(set->pollinfo);
409  gw_free(set->callbacks);
410  gw_free(set->datafields);
411  gw_free(set->times);
412  if (gwlist_len(set->actions) > 0) {
413  error(0, "Destroying fdset with %ld pending actions.",
414  gwlist_len(set->actions));
415  }
416  gwlist_destroy(set->actions, action_destroy_item);
417  gw_free(set);
418  } else {
419  long thread = set->poll_thread;
421  gwthread_join(thread);
422  }
423 }
424 
425 void fdset_register(FDSet *set, int fd, int events,
427 {
428  int new;
429 
430  gw_assert(set != NULL);
431 
432  if (gwthread_self() != set->poll_thread) {
433  struct action *action;
434 
436  action->fd = fd;
437  action->events = events;
439  action->data = data;
441  return;
442  }
443 
444  gw_assert(set->entries <= set->size);
445 
446  if (set->entries >= set->size) {
447  int newsize = set->entries + 1;
448  set->pollinfo = gw_realloc(set->pollinfo,
449  sizeof(set->pollinfo[0]) * newsize);
450  set->callbacks = gw_realloc(set->callbacks,
451  sizeof(set->callbacks[0]) * newsize);
452  set->datafields = gw_realloc(set->datafields,
453  sizeof(set->datafields[0]) * newsize);
454  set->times = gw_realloc(set->times, sizeof(set->times[0]) * newsize);
455  set->size = newsize;
456  }
457 
458  /* We don't check set->scanning. Adding new entries is not harmful
459  * because their revents fields are 0. */
460 
461  new = set->entries++;
462  set->pollinfo[new].fd = fd;
463  set->pollinfo[new].events = events;
464  set->pollinfo[new].revents = 0;
465  set->callbacks[new] = callback;
466  set->datafields[new] = data;
467  time(&set->times[new]);
468 }
469 
470 void fdset_listen(FDSet *set, int fd, int mask, int events)
471 {
472  int entry;
473 
474  gw_assert(set != NULL);
475 
476  if (gwthread_self() != set->poll_thread) {
477  struct action *action;
478 
480  action->fd = fd;
481  action->mask = mask;
482  action->events = events;
483  submit_action(set, action);
484  return;
485  }
486 
487  entry = find_entry(set, fd);
488  if (entry < 0) {
489  warning(0, "fdset_listen called on unregistered fd %d.", fd);
490  return;
491  }
492 
493  /* Copy the bits from events specified by the mask, and preserve the
494  * bits not specified by the mask. */
495  set->pollinfo[entry].events =
496  (set->pollinfo[entry].events & ~mask) | (events & mask);
497 
498  /* If poller is currently scanning the array, then change the
499  * revents field so that the callback function will not be called
500  * for events we should no longer listen for. The idea is the
501  * same as for the events field, except that we only turn bits off. */
502  if (set->scanning) {
503  set->pollinfo[entry].revents =
504  set->pollinfo[entry].revents & (events | ~mask);
505  }
506 
507  time(&set->times[entry]);
508 }
509 
510 void fdset_unregister(FDSet *set, int fd)
511 {
512  int entry;
513 
514  gw_assert(set != NULL);
515 
516  if (gwthread_self() != set->poll_thread) {
517  struct action *action;
518 
520  action->fd = fd;
521  submit_action(set, action);
522  return;
523  }
524 
525  /* Remove the entry from the pollinfo array */
526 
527  entry = find_entry(set, fd);
528  if (entry < 0) {
529  warning(0, "fdset_listen called on unregistered fd %d.", fd);
530  return;
531  }
532 
533  if (entry == set->entries - 1) {
534  /* It's the last entry. We can safely remove it even while
535  * the array is being scanned, because the scan checks set->entries. */
536  set->entries--;
537  } else if (set->scanning) {
538  /* We can't remove entries because the array is being
539  * scanned. Mark it as deleted. */
540  set->pollinfo[entry].fd = -1;
541  set->deleted_entries++;
542  } else {
543  remove_entry(set, entry);
544  }
545 }
546 
548 {
549  gw_assert(set != NULL);
550 
551  if (gwthread_self() != set->poll_thread) {
552  struct action *action;
553 
556  submit_action(set, action);
557  return;
558  }
559  set->timeout = timeout;
560 }
void error(int err, const char *fmt,...)
Definition: log.c:648
static struct action * action_create(int type)
Definition: fdset.c:155
long gwthread_self(void)
void fdset_listen(FDSet *set, int fd, int mask, int events)
Definition: fdset.c:470
gw_assert(wtls_machine->packet_to_send !=NULL)
enum action::@57 type
void gwlist_append(List *list, void *item)
Definition: list.c:179
int events
Definition: fdset.c:145
static int find_entry(FDSet *set, int fd)
Definition: fdset.c:269
Definition: fdset.c:140
void gwlist_produce(List *list, void *item)
Definition: list.c:411
void gwthread_join(long thread)
int size
Definition: fdset.c:84
long gwlist_len(List *list)
Definition: list.c:166
int type
Definition: smsc_cimd2.c:215
int scanning
Definition: fdset.c:112
time_t * times
Definition: fdset.c:88
static void remove_deleted_entries(FDSet *set)
Definition: fdset.c:297
void fdset_unregister(FDSet *set, int fd)
Definition: fdset.c:510
int gwthread_poll(struct pollfd *fds, long numfds, double timeout)
int fd
Definition: fdset.c:143
void fdset_register(FDSet *set, int fd, int events, fdset_callback_t callback, void *data)
Definition: fdset.c:425
fdset_callback_t ** callbacks
Definition: fdset.c:96
List * actions
Definition: fdset.c:126
struct pollfd * pollinfo
Definition: fdset.c:83
long timeout
Definition: fdset.c:148
void * gwlist_extract_first(List *list)
Definition: list.c:305
int deleted_entries
Definition: fdset.c:118
void ** datafields
Definition: fdset.c:97
static void submit_action(FDSet *set, struct action *action)
Definition: fdset.c:191
void fdset_callback_t(int fd, int revents, void *data)
Definition: fdset.h:73
static int action(int hex)
void warning(int err, const char *fmt,...)
Definition: log.c:660
static void action_destroy_item(void *action)
Definition: fdset.c:181
Definition: gwpoll.h:84
long timeout
Definition: fdset.c:91
#define POLLERR
Definition: gwpoll.h:96
#define gwthread_create(func, arg)
Definition: gwthread.h:90
void gwthread_sleep(double seconds)
void fdset_destroy(FDSet *set)
Definition: fdset.c:398
static void remove_entry(FDSet *set, int entry)
Definition: fdset.c:284
fdset_callback_t * callback
Definition: fdset.c:146
void * gwlist_consume(List *list)
Definition: list.c:427
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:726
#define panic
Definition: log.h:87
void gwthread_wakeup(long thread)
List * done
Definition: fdset.c:151
int entries
Definition: fdset.c:85
#define gwlist_create()
Definition: list.h:136
Definition: fdset.c:70
static void poller(void *arg)
Definition: fdset.c:318
static void submit_action_nosync(FDSet *set, struct action *action)
Definition: fdset.c:216
void fdset_set_timeout(FDSet *set, long timeout)
Definition: fdset.c:547
void gwlist_add_producer(List *list)
Definition: list.c:383
FDSet * fdset_create_real(long timeout)
Definition: fdset.c:368
void * data
Definition: fdset.c:147
int mask
Definition: fdset.c:144
Definition: list.c:102
long poll_thread
Definition: fdset.c:76
static int handle_action(FDSet *set, struct action *action)
Definition: fdset.c:226
static void action_destroy(struct action *action)
Definition: fdset.c:171
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.