00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063 #include <unistd.h>
00064 #include <errno.h>
00065 #include <pthread.h>
00066 #include <signal.h>
00067 #include <string.h>
00068
00069 #include "gwlib/gwlib.h"
00070
00071 #ifdef HAVE_LIBSSL
00072 #include <openssl/err.h>
00073 #endif
00074
00075
00076
00077
/*
 * Number of slots in the thread table.  Thread numbers are mapped onto
 * slots modulo this value, so it also bounds how many threads can be
 * registered at the same time.
 */
#define THREADTABLE_SIZE 1024
00079
00080 struct threadinfo
00081 {
00082 pthread_t self;
00083 const char *name;
00084 gwthread_func_t *func;
00085 long number;
00086 int wakefd_recv;
00087 int wakefd_send;
00088
00089
00090
00091 List *joiners;
00092 pid_t pid;
00093 };
00094
00095 struct new_thread_args
00096 {
00097 gwthread_func_t *func;
00098 void *arg;
00099 struct threadinfo *ti;
00100
00101 int failed;
00102 };
00103
00104
00105
00106 static struct threadinfo *threadtable[THREADTABLE_SIZE];
00107 #define THREAD(t) (threadtable[(t) % THREADTABLE_SIZE])
00108
00109
00110 static long active_threads = 0;
00111
00112
00113
00114
00115
00116 static long next_threadnumber;
00117
00118
00119
00120
00121
00122 static struct threadinfo mainthread;
00123
00124
00125
00126
00127 static pthread_key_t tsd_key;
00128
00129 static pthread_mutex_t threadtable_lock;
00130
00131 static void inline lock(void)
00132 {
00133 int ret;
00134
00135 ret = pthread_mutex_lock(&threadtable_lock);
00136 if (ret != 0) {
00137 panic(ret, "gwthread-pthread: could not lock thread table");
00138 }
00139 }
00140
00141 static void inline unlock(void)
00142 {
00143 int ret;
00144
00145 ret = pthread_mutex_unlock(&threadtable_lock);
00146 if (ret != 0) {
00147 panic(ret, "gwthread-pthread: could not unlock thread table");
00148 }
00149 }
00150
00151
00152
/*
 * Read and throw away data from fd until read() stops returning a
 * positive byte count (end of file, an error, or -- on a non-blocking
 * descriptor -- nothing left to read).
 */
static void flushpipe(int fd)
{
    unsigned char scratch[128];

    for (;;) {
        ssize_t got = read(fd, scratch, sizeof(scratch));

        if (got <= 0)
            break;
    }
}
00162
00163
00164
00165
00166
00167 static long fill_threadinfo(pthread_t id, const char *name,
00168 gwthread_func_t *func,
00169 struct threadinfo *ti)
00170 {
00171 int pipefds[2];
00172 long first_try;
00173
00174 gw_assert(active_threads < THREADTABLE_SIZE);
00175
00176
00177 ti->self = id;
00178 ti->name = name;
00179 ti->func = func;
00180 ti->pid = -1;
00181 ti->wakefd_recv = -1;
00182 ti->wakefd_send = -1;
00183 ti->joiners = NULL;
00184 ti->number = -1;
00185
00186 if (pipe(pipefds) < 0) {
00187 error(errno, "cannot allocate wakeup pipe for new thread");
00188 return -1;
00189 }
00190 ti->wakefd_recv = pipefds[0];
00191 ti->wakefd_send = pipefds[1];
00192 socket_set_blocking(ti->wakefd_recv, 0);
00193 socket_set_blocking(ti->wakefd_send, 0);
00194
00195
00196 first_try = next_threadnumber;
00197 do {
00198 ti->number = next_threadnumber++;
00199
00200 if (ti->number == first_try + THREADTABLE_SIZE) {
00201 error(0, "Cannot have more than %d active threads", THREADTABLE_SIZE);
00202 ti->number = -1;
00203 return -1;
00204 }
00205 } while (THREAD(ti->number) != NULL);
00206 THREAD(ti->number) = ti;
00207
00208 active_threads++;
00209
00210 return ti->number;
00211 }
00212
00213
00214 static struct threadinfo *getthreadinfo(void)
00215 {
00216 struct threadinfo *threadinfo;
00217
00218 threadinfo = pthread_getspecific(tsd_key);
00219 if (threadinfo == NULL) {
00220 panic(0, "gwthread-pthread: pthread_getspecific failed");
00221 } else {
00222 gw_assert(pthread_equal(threadinfo->self, pthread_self()));
00223 }
00224 return threadinfo;
00225 }
00226
00227
00228
00229
00230
00231
00232 static void alert_joiners(void)
00233 {
00234 struct threadinfo *threadinfo;
00235 pthread_cond_t *joiner_cond;
00236
00237 threadinfo = getthreadinfo();
00238 if (!threadinfo->joiners)
00239 return;
00240 while ((joiner_cond = gwlist_extract_first(threadinfo->joiners))) {
00241 pthread_cond_broadcast(joiner_cond);
00242 }
00243 }
00244
00245 static void delete_threadinfo(void)
00246 {
00247 struct threadinfo *threadinfo;
00248
00249 threadinfo = getthreadinfo();
00250 gwlist_destroy(threadinfo->joiners, NULL);
00251 if (threadinfo->wakefd_recv != -1)
00252 close(threadinfo->wakefd_recv);
00253 if (threadinfo->wakefd_send != -1)
00254 close(threadinfo->wakefd_send);
00255 if (threadinfo->number != -1) {
00256 THREAD(threadinfo->number) = NULL;
00257 active_threads--;
00258 }
00259 gw_assert(threadinfo != &mainthread);
00260 gw_free(threadinfo);
00261 }
00262
00263 void gwthread_init(void)
00264 {
00265 int ret;
00266 int i;
00267
00268 pthread_mutex_init(&threadtable_lock, NULL);
00269
00270 ret = pthread_key_create(&tsd_key, NULL);
00271 if (ret != 0) {
00272 panic(ret, "gwthread-pthread: pthread_key_create failed");
00273 }
00274
00275 for (i = 0; i < THREADTABLE_SIZE; i++) {
00276 threadtable[i] = NULL;
00277 }
00278 active_threads = 0;
00279
00280
00281 if (fill_threadinfo(pthread_self(), "main", NULL, &mainthread) == -1)
00282 panic(0, "gwthread-pthread: unable to fill main threadinfo.");
00283
00284 ret = pthread_setspecific(tsd_key, &mainthread);
00285 if (ret != 0)
00286 panic(ret, "gwthread-pthread: pthread_setspecific failed");
00287 }
00288
00289
00290
00291
00292 void gwthread_shutdown(void)
00293 {
00294 int ret;
00295 int running;
00296 int i;
00297
00298
00299 gw_assert(threadtable[0] != NULL);
00300 lock();
00301
00302 running = 0;
00303
00304
00305 for (i = 1; i < THREADTABLE_SIZE; i++) {
00306 if (threadtable[i] != NULL) {
00307 debug("gwlib", 0, "Thread %ld (%s) still running",
00308 threadtable[i]->number,
00309 threadtable[i]->name);
00310 running++;
00311 }
00312 }
00313 unlock();
00314
00315
00316 if (running)
00317 return;
00318
00319 ret = pthread_mutex_destroy(&threadtable_lock);
00320 if (ret != 0) {
00321 warning(ret, "cannot destroy threadtable lock");
00322 }
00323
00324
00325
00326 }
00327
00328 static void *new_thread(void *arg)
00329 {
00330 int ret;
00331 struct new_thread_args *p = arg;
00332
00333
00334
00335 lock();
00336
00337 if (p->failed) {
00338
00339
00340 gw_free(p);
00341 delete_threadinfo();
00342 unlock();
00343 return NULL;
00344 }
00345 unlock();
00346
00347
00348
00349
00350
00351
00352
00353 ret = pthread_setspecific(tsd_key, p->ti);
00354 if (ret != 0) {
00355 panic(ret, "gwthread-pthread: pthread_setspecific failed");
00356 }
00357
00358 p->ti->pid = getpid();
00359 debug("gwlib.gwthread", 0, "Thread %ld (%s) maps to pid %ld.",
00360 p->ti->number, p->ti->name, (long) p->ti->pid);
00361
00362 (p->func)(p->arg);
00363
00364 lock();
00365 debug("gwlib.gwthread", 0, "Thread %ld (%s) terminates.",
00366 p->ti->number, p->ti->name);
00367 alert_joiners();
00368 #ifdef HAVE_LIBSSL
00369
00370
00371 ERR_remove_state(gwthread_self());
00372 #endif
00373
00374
00375 gw_free(p);
00376 delete_threadinfo();
00377 unlock();
00378
00379 return NULL;
00380 }
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
/*
 * With DARWIN_OLD defined this file provides its own pthread_sigmask()
 * (defined at the end of the file); declare it ahead of its first use.
 */
#if defined(DARWIN_OLD)
static int pthread_sigmask();
#endif
00396
00397 static int block_user_signals(sigset_t *old_set_storage)
00398 {
00399 int ret;
00400 sigset_t block_signals;
00401
00402 ret = sigemptyset(&block_signals);
00403 if (ret != 0) {
00404 error(errno, "gwthread-pthread: Couldn't initialize signal set");
00405 return -1;
00406 }
00407 ret = sigaddset(&block_signals, SIGHUP);
00408 ret |= sigaddset(&block_signals, SIGTERM);
00409 ret |= sigaddset(&block_signals, SIGQUIT);
00410 ret |= sigaddset(&block_signals, SIGINT);
00411 if (ret != 0) {
00412 error(0, "gwthread-pthread: Couldn't add signal to signal set");
00413 return -1;
00414 }
00415 ret = pthread_sigmask(SIG_BLOCK, &block_signals, old_set_storage);
00416 if (ret != 0) {
00417 error(ret,
00418 "gwthread-pthread: Couldn't disable signals for thread creation");
00419 return -1;
00420 }
00421 return 0;
00422 }
00423
00424 static void restore_user_signals(sigset_t *old_set)
00425 {
00426 int ret;
00427
00428 ret = pthread_sigmask(SIG_SETMASK, old_set, NULL);
00429 if (ret != 0) {
00430 panic(ret, "gwthread-pthread: Couldn't restore signal set.");
00431 }
00432 }
00433
00434
00435 static long spawn_thread(gwthread_func_t *func, const char *name, void *arg)
00436 {
00437 int ret;
00438 pthread_t id;
00439 struct new_thread_args *p = NULL;
00440 long new_thread_id;
00441
00442
00443
00444
00445 p = gw_malloc(sizeof(*p));
00446 p->func = func;
00447 p->arg = arg;
00448 p->ti = gw_malloc(sizeof(*(p->ti)));
00449 p->failed = 0;
00450
00451
00452
00453
00454 lock();
00455
00456 if (active_threads >= THREADTABLE_SIZE) {
00457 unlock();
00458 warning(0, "Too many threads, could not create new thread.");
00459 gw_free(p);
00460 return -1;
00461 }
00462
00463 ret = pthread_create(&id, NULL, &new_thread, p);
00464 if (ret != 0) {
00465 unlock();
00466 error(ret, "Could not create new thread.");
00467 gw_free(p);
00468 return -1;
00469 }
00470 ret = pthread_detach(id);
00471 if (ret != 0) {
00472 error(ret, "Could not detach new thread.");
00473 }
00474
00475 new_thread_id = fill_threadinfo(id, name, func, p->ti);
00476 if (new_thread_id == -1)
00477 p->failed = 1;
00478 unlock();
00479
00480 if (new_thread_id != -1)
00481 debug("gwlib.gwthread", 0, "Started thread %ld (%s)", new_thread_id, name);
00482 else
00483 debug("gwlib.gwthread", 0, "Failed to start thread (%s)", name);
00484
00485 return new_thread_id;
00486 }
00487
00488 long gwthread_create_real(gwthread_func_t *func, const char *name, void *arg)
00489 {
00490 int sigtrick = 0;
00491 sigset_t old_signal_set;
00492 long thread_id;
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505 if (gwthread_self() == MAIN_THREAD_ID)
00506 sigtrick = block_user_signals(&old_signal_set) == 0;
00507
00508 thread_id = spawn_thread(func, name, arg);
00509
00510
00511
00512
00513
00514
00515 if (sigtrick)
00516 restore_user_signals(&old_signal_set);
00517
00518 return thread_id;
00519 }
00520
00521 void gwthread_join(long thread)
00522 {
00523 struct threadinfo *threadinfo;
00524 pthread_cond_t exit_cond;
00525 int ret;
00526
00527 gw_assert(thread >= 0);
00528
00529 lock();
00530 threadinfo = THREAD(thread);
00531 if (threadinfo == NULL || threadinfo->number != thread) {
00532
00533 unlock();
00534 return;
00535 }
00536
00537
00538
00539
00540 ret = pthread_cond_init(&exit_cond, NULL);
00541 if (ret != 0) {
00542 warning(ret, "gwthread_join: cannot create condition variable.");
00543 unlock();
00544 return;
00545 }
00546
00547 if (!threadinfo->joiners)
00548 threadinfo->joiners = gwlist_create();
00549 gwlist_append(threadinfo->joiners, &exit_cond);
00550
00551
00552
00553
00554 ret = pthread_cond_wait(&exit_cond, &threadtable_lock);
00555 unlock();
00556
00557 if (ret != 0)
00558 warning(ret, "gwthread_join: error in pthread_cond_wait");
00559
00560 pthread_cond_destroy(&exit_cond);
00561 }
00562
00563 void gwthread_join_all(void)
00564 {
00565 long i;
00566 long our_thread = gwthread_self();
00567
00568 for (i = 0; i < THREADTABLE_SIZE; ++i) {
00569 if (THREAD(our_thread) != THREAD(i))
00570 gwthread_join(i);
00571 }
00572 }
00573
00574 void gwthread_wakeup_all(void)
00575 {
00576 long i;
00577 long our_thread = gwthread_self();
00578
00579 for (i = 0; i < THREADTABLE_SIZE; ++i) {
00580 if (THREAD(our_thread) != THREAD(i))
00581 gwthread_wakeup(i);
00582 }
00583 }
00584
00585 void gwthread_join_every(gwthread_func_t *func)
00586 {
00587 struct threadinfo *ti;
00588 pthread_cond_t exit_cond;
00589 int ret;
00590 long i;
00591
00592 ret = pthread_cond_init(&exit_cond, NULL);
00593 if (ret != 0) {
00594 warning(ret, "gwthread_join_every: cannot create condition variable.");
00595 unlock();
00596 return;
00597 }
00598
00599
00600
00601
00602
00603
00604
00605 lock();
00606 for (i = 0; i < THREADTABLE_SIZE; ++i) {
00607 ti = THREAD(i);
00608 if (ti == NULL || ti->func != func)
00609 continue;
00610 debug("gwlib.gwthread", 0,
00611 "Waiting for %ld (%s) to terminate",
00612 ti->number, ti->name);
00613 if (!ti->joiners)
00614 ti->joiners = gwlist_create();
00615 gwlist_append(ti->joiners, &exit_cond);
00616 ret = pthread_cond_wait(&exit_cond, &threadtable_lock);
00617 if (ret != 0)
00618 warning(ret, "gwthread_join_all: error in pthread_cond_wait");
00619 }
00620 unlock();
00621
00622 pthread_cond_destroy(&exit_cond);
00623 }
00624
00625
00626 long gwthread_self(void)
00627 {
00628 struct threadinfo *threadinfo;
00629 threadinfo = pthread_getspecific(tsd_key);
00630 if (threadinfo)
00631 return threadinfo->number;
00632 else
00633 return -1;
00634 }
00635
00636
00637 long gwthread_self_pid(void)
00638 {
00639 struct threadinfo *threadinfo;
00640 threadinfo = pthread_getspecific(tsd_key);
00641 if (threadinfo && threadinfo->pid != -1)
00642 return (long) threadinfo->pid;
00643 else
00644 return (long) getpid();
00645 }
00646
00647 void gwthread_self_ids(long *tid, long *pid)
00648 {
00649 struct threadinfo *threadinfo;
00650 threadinfo = pthread_getspecific(tsd_key);
00651 if (threadinfo) {
00652 *tid = threadinfo->number;
00653 *pid = (threadinfo->pid != -1) ? threadinfo->pid : getpid();
00654 } else {
00655 *tid = -1;
00656 *pid = getpid();
00657 }
00658 }
00659
00660 void gwthread_wakeup(long thread)
00661 {
00662 unsigned char c = 0;
00663 struct threadinfo *threadinfo;
00664 int fd;
00665
00666 gw_assert(thread >= 0);
00667
00668 lock();
00669
00670 threadinfo = THREAD(thread);
00671 if (threadinfo == NULL || threadinfo->number != thread) {
00672 unlock();
00673 return;
00674 }
00675
00676 fd = threadinfo->wakefd_send;
00677 unlock();
00678
00679 write(fd, &c, 1);
00680 }
00681
00682 int gwthread_pollfd(int fd, int events, double timeout)
00683 {
00684 struct pollfd pollfd[2];
00685 struct threadinfo *threadinfo;
00686 int milliseconds;
00687 int ret;
00688
00689 threadinfo = getthreadinfo();
00690
00691 pollfd[0].fd = threadinfo->wakefd_recv;
00692 pollfd[0].events = POLLIN;
00693 pollfd[0].revents = 0;
00694
00695 pollfd[1].fd = fd;
00696 pollfd[1].events = events;
00697 pollfd[1].revents = 0;
00698
00699 milliseconds = timeout * 1000;
00700 if (milliseconds < 0)
00701 milliseconds = POLL_NOTIMEOUT;
00702
00703 ret = poll(pollfd, 2, milliseconds);
00704 if (ret < 0) {
00705 if (errno != EINTR)
00706 error(errno, "gwthread_pollfd: error in poll");
00707 return -1;
00708 }
00709
00710 if (pollfd[0].revents)
00711 flushpipe(pollfd[0].fd);
00712
00713 return pollfd[1].revents;
00714 }
00715
00716 int gwthread_poll(struct pollfd *fds, long numfds, double timeout)
00717 {
00718 struct pollfd *pollfds;
00719 struct threadinfo *threadinfo;
00720 int milliseconds;
00721 int ret;
00722
00723 threadinfo = getthreadinfo();
00724
00725
00726
00727
00728 pollfds = gw_malloc((numfds + 1) * sizeof(*pollfds));
00729 pollfds[0].fd = threadinfo->wakefd_recv;
00730 pollfds[0].events = POLLIN;
00731 pollfds[0].revents = 0;
00732 memcpy(pollfds + 1, fds, numfds * sizeof(*pollfds));
00733
00734 milliseconds = timeout * 1000;
00735 if (milliseconds < 0)
00736 milliseconds = POLL_NOTIMEOUT;
00737
00738 ret = poll(pollfds, numfds + 1, milliseconds);
00739 if (ret < 0) {
00740 if (errno != EINTR)
00741 error(errno, "gwthread_poll: error in poll");
00742 gw_free(pollfds);
00743 return -1;
00744 }
00745 if (pollfds[0].revents)
00746 flushpipe(pollfds[0].fd);
00747
00748
00749 memcpy(fds, pollfds + 1, numfds * sizeof(*pollfds));
00750 gw_free(pollfds);
00751
00752 return ret;
00753 }
00754
00755
00756 void gwthread_sleep(double seconds)
00757 {
00758 struct pollfd pollfd;
00759 struct threadinfo *threadinfo;
00760 int milliseconds;
00761 int ret;
00762
00763 threadinfo = getthreadinfo();
00764
00765 pollfd.fd = threadinfo->wakefd_recv;
00766 pollfd.events = POLLIN;
00767
00768 milliseconds = seconds * 1000;
00769 if (milliseconds < 0)
00770 milliseconds = POLL_NOTIMEOUT;
00771
00772 ret = poll(&pollfd, 1, milliseconds);
00773 if (ret < 0) {
00774 if (errno != EINTR && errno != EAGAIN) {
00775 warning(errno, "gwthread_sleep: error in poll");
00776 }
00777 }
00778 if (ret == 1) {
00779 flushpipe(pollfd.fd);
00780 }
00781 }
00782
00783
00784 void gwthread_sleep_micro(double dseconds)
00785 {
00786 fd_set fd_set_recv;
00787 struct threadinfo *threadinfo;
00788 int fd;
00789 int ret;
00790
00791 threadinfo = getthreadinfo();
00792 fd = threadinfo->wakefd_recv;
00793
00794 FD_ZERO(&fd_set_recv);
00795 FD_SET(fd, &fd_set_recv);
00796
00797 if (dseconds < 0) {
00798 ret = select(fd + 1, &fd_set_recv, NULL, NULL, NULL);
00799 } else {
00800 struct timeval timeout;
00801 timeout.tv_sec = dseconds;
00802 timeout.tv_usec = (dseconds - timeout.tv_sec) * 1000000;
00803
00804 ret = select(fd + 1, &fd_set_recv, NULL, NULL, &timeout);
00805 }
00806
00807 if (ret < 0) {
00808 if (errno != EINTR && errno != EAGAIN) {
00809 warning(errno, "gwthread_sleep_micro: error in select()");
00810 }
00811 }
00812
00813 if (FD_ISSET(fd, &fd_set_recv)) {
00814 flushpipe(fd);
00815 }
00816 }
00817
00818
00819 int gwthread_cancel(long thread)
00820 {
00821 struct threadinfo *threadinfo;
00822
00823 gw_assert(thread >= 0);
00824
00825 threadinfo = THREAD(thread);
00826 if (threadinfo == NULL || threadinfo->number != thread) {
00827 return -1;
00828 } else {
00829 return pthread_cancel(threadinfo->self);
00830 }
00831 }
00832
00833
/*
 * Tell the caller whether the current thread should act on a signal it
 * has received.  The signal number itself is not looked at.
 *
 * Without BROKEN_PTHREADS every thread gets a yes (1).  With it, only the
 * thread whose number is MAIN_THREAD_ID does.
 */
int gwthread_shouldhandlesignal(int signal)
{
#ifndef BROKEN_PTHREADS
    return 1;
#else
    return (gwthread_self() == MAIN_THREAD_ID);
#endif
}
00847
00848 int gwthread_dumpsigmask(void) {
00849 sigset_t signal_set;
00850 int signum;
00851
00852
00853 if (pthread_sigmask(SIG_BLOCK, NULL, &signal_set) != 0) {
00854 warning(0, "gwthread_dumpsigmask: Couldn't get signal mask.");
00855 return -1;
00856 }
00857
00858
00859
00860 for (signum = 1; signum <= 32; signum++) {
00861 if (!sigismember(&signal_set, signum)) {
00862 debug("gwlib", 0,
00863 "gwthread_dumpsigmask: Signal Number %d will be caught.",
00864 signum);
00865 }
00866 }
00867 return 0;
00868 }
00869
00870
00871
#if defined(DARWIN_OLD)
/*
 * Stand-in for pthread_sigmask() used when DARWIN_OLD is defined.  It
 * ignores its arguments, changes nothing and always reports success; in
 * particular it does not fill in the caller's "old set" output.
 */
static int pthread_sigmask()
{
    return 0;
}
#endif
See file LICENSE for details about the license agreement for using,
modifying, copying or deriving work from this software.