00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067 #include <errno.h>
00068 #include <stdlib.h>
00069 #include <stdio.h>
00070 #include <time.h>
00071 #include <string.h>
00072 #include <sys/time.h>
00073 #include <sys/types.h>
00074 #include <sys/socket.h>
00075 #include <unistd.h>
00076 #include <signal.h>
00077
00078 #include "gwlib/gwlib.h"
00079 #include "msg.h"
00080 #include "bearerbox.h"
00081 #include "bb_smscconn_cb.h"
00082
00083 #define SMSBOX_MAX_PENDING 100
00084
00085
00086
00087 extern volatile sig_atomic_t bb_status;
00088 extern volatile sig_atomic_t restart;
00089 extern List *incoming_sms;
00090 extern List *outgoing_sms;
00091 extern List *incoming_wdp;
00092 extern List *outgoing_wdp;
00093
00094 extern List *flow_threads;
00095 extern List *suspended;
00096
00097
00098 extern long max_incoming_sms_qlength;
00099
00100
00101
00102
00103 static volatile sig_atomic_t smsbox_running;
00104 static volatile sig_atomic_t wapbox_running;
00105 static List *wapbox_list;
00106 static List *smsbox_list;
00107 static RWLock *smsbox_list_rwlock;
00108
00109
00110 static Dict *smsbox_by_id;
00111 static Dict *smsbox_by_smsc;
00112 static Dict *smsbox_by_receiver;
00113 static Dict *smsbox_by_smsc_receiver;
00114
00115 static long smsbox_port;
00116 static int smsbox_port_ssl;
00117 static long wapbox_port;
00118 static int wapbox_port_ssl;
00119
00120
00121 static long smsbox_max_pending;
00122
00123 static Octstr *box_allow_ip;
00124 static Octstr *box_deny_ip;
00125
00126
00127 static Counter *boxid;
00128
00129
00130 static long sms_dequeue_thread;
00131
00132
00133 typedef struct _boxc {
00134 Connection *conn;
00135 int is_wap;
00136 long id;
00137 int load;
00138 time_t connect_time;
00139 Octstr *client_ip;
00140 List *incoming;
00141 List *retry;
00142 List *outgoing;
00143 Dict *sent;
00144 Semaphore *pending;
00145 volatile sig_atomic_t alive;
00146 Octstr *boxc_id;
00147
00148 volatile int routable;
00149 } Boxc;
00150
00151
00152
00153 static void sms_to_smsboxes(void *arg);
00154 static int send_msg(Boxc *boxconn, Msg *pmsg);
00155 static void boxc_sent_push(Boxc*, Msg*);
00156 static void boxc_sent_pop(Boxc*, Msg*, Msg**);
00157
00158
00159
00160
00161
00162
00163 static Msg *read_from_box(Boxc *boxconn)
00164 {
00165 int ret;
00166 Octstr *pack;
00167 Msg *msg;
00168
00169 pack = NULL;
00170 while (bb_status != BB_DEAD && boxconn->alive) {
00171
00172 pack = conn_read_withlen(boxconn->conn);
00173 gw_claim_area(pack);
00174 if (pack != NULL)
00175 break;
00176 if (conn_error(boxconn->conn)) {
00177 info(0, "Read error when reading from box <%s>, disconnecting",
00178 octstr_get_cstr(boxconn->client_ip));
00179 return NULL;
00180 }
00181 if (conn_eof(boxconn->conn)) {
00182 info(0, "Connection closed by the box <%s>",
00183 octstr_get_cstr(boxconn->client_ip));
00184 return NULL;
00185 }
00186
00187 ret = conn_wait(boxconn->conn, -1.0);
00188 if (ret < 0) {
00189 error(0, "Connection to box <%s> broke.",
00190 octstr_get_cstr(boxconn->client_ip));
00191 return NULL;
00192 }
00193 }
00194
00195 if (pack == NULL)
00196 return NULL;
00197
00198 msg = msg_unpack(pack);
00199 octstr_destroy(pack);
00200
00201 if (msg == NULL)
00202 error(0, "Failed to unpack data!");
00203 return msg;
00204 }
00205
00206
00207
00208
00209
00210
00211 static void deliver_sms_to_queue(Msg *msg, Boxc *conn)
00212 {
00213 Msg *mack;
00214 int rc;
00215
00216
00217
00218
00219
00220 mack = msg_create(ack);
00221 gw_assert(mack != NULL);
00222 uuid_copy(mack->ack.id, msg->sms.id);
00223 mack->ack.time = msg->sms.time;
00224
00225 store_save(msg);
00226
00227 rc = smsc2_rout(msg, 0);
00228 switch(rc) {
00229 case SMSCCONN_SUCCESS:
00230 mack->ack.nack = ack_success;
00231 break;
00232 case SMSCCONN_QUEUED:
00233 mack->ack.nack = ack_buffered;
00234 break;
00235 case SMSCCONN_FAILED_DISCARDED:
00236 case SMSCCONN_FAILED_QFULL:
00237 warning(0, "Message rejected by bearerbox, %s!",
00238 (rc == SMSCCONN_FAILED_DISCARDED) ? "no router" : "queue full");
00239
00240
00241
00242
00243 store_save_ack(msg, (rc == SMSCCONN_FAILED_QFULL ? ack_failed_tmp : ack_failed));
00244 mack->ack.nack = (rc == SMSCCONN_FAILED_QFULL ? ack_failed_tmp : ack_failed);
00245
00246
00247 msg_destroy(msg);
00248 break;
00249 }
00250
00251
00252 send_msg(conn, mack);
00253 msg_destroy(mack);
00254 }
00255
00256
00257 static void boxc_receiver(void *arg)
00258 {
00259 Boxc *conn = arg;
00260 Msg *msg, *mack;
00261
00262
00263 while (bb_status != BB_DEAD && conn->alive) {
00264
00265 gwlist_consume(suspended);
00266
00267 msg = read_from_box(conn);
00268
00269 if (msg == NULL) {
00270 conn->alive = 0;
00271 break;
00272 }
00273
00274
00275 if ((bb_status == BB_SHUTDOWN || bb_status == BB_DEAD) && msg_type(msg) == sms) {
00276 mack = msg_create(ack);
00277 uuid_copy(mack->ack.id, msg->sms.id);
00278 mack->ack.time = msg->sms.time;
00279 mack->ack.nack = ack_failed_tmp;
00280 msg_destroy(msg);
00281 send_msg(conn, mack);
00282 msg_destroy(mack);
00283 continue;
00284 }
00285
00286 if (msg_type(msg) == sms && conn->is_wap == 0) {
00287 debug("bb.boxc", 0, "boxc_receiver: sms received");
00288
00289
00290 deliver_sms_to_queue(msg, conn);
00291
00292 if (conn->routable == 0) {
00293 conn->routable = 1;
00294
00295 gwthread_wakeup(sms_dequeue_thread);
00296 }
00297 } else if (msg_type(msg) == wdp_datagram && conn->is_wap) {
00298 debug("bb.boxc", 0, "boxc_receiver: got wdp from wapbox");
00299
00300
00301
00302 gwlist_produce(conn->outgoing, msg);
00303
00304 } else if (msg_type(msg) == sms && conn->is_wap) {
00305 debug("bb.boxc", 0, "boxc_receiver: got sms from wapbox");
00306
00307
00308 deliver_sms_to_queue(msg, conn);
00309
00310 if (conn->routable == 0) {
00311 conn->routable = 1;
00312
00313 gwthread_wakeup(sms_dequeue_thread);
00314 }
00315 } else {
00316 if (msg_type(msg) == heartbeat) {
00317 if (msg->heartbeat.load != conn->load)
00318 debug("bb.boxc", 0, "boxc_receiver: heartbeat with "
00319 "load value %ld received", msg->heartbeat.load);
00320 conn->load = msg->heartbeat.load;
00321 }
00322 else if (msg_type(msg) == ack) {
00323 if (msg->ack.nack == ack_failed_tmp) {
00324 Msg *orig;
00325 boxc_sent_pop(conn, msg, &orig);
00326 if (orig != NULL)
00327 gwlist_append(conn->retry, orig);
00328 } else {
00329 boxc_sent_pop(conn, msg, NULL);
00330 store_save(msg);
00331 }
00332 debug("bb.boxc", 0, "boxc_receiver: got ack");
00333 }
00334
00335 else if (msg_type(msg) == admin && msg->admin.command == cmd_identify) {
00336
00337
00338
00339
00340
00341 if (msg->admin.boxc_id != NULL) {
00342
00343
00344 if (conn->boxc_id != NULL) {
00345 dict_remove(smsbox_by_id, msg->admin.boxc_id);
00346 octstr_destroy(conn->boxc_id);
00347 }
00348
00349 conn->boxc_id = msg->admin.boxc_id;
00350 msg->admin.boxc_id = NULL;
00351
00352
00353
00354 dict_put(smsbox_by_id, conn->boxc_id, conn);
00355 debug("bb.boxc", 0, "boxc_receiver: got boxc_id <%s> from <%s>",
00356 octstr_get_cstr(conn->boxc_id),
00357 octstr_get_cstr(conn->client_ip));
00358 }
00359
00360 conn->routable = 1;
00361
00362 gwthread_wakeup(sms_dequeue_thread);
00363 }
00364 else
00365 warning(0, "boxc_receiver: unknown msg received from <%s>, "
00366 "ignored", octstr_get_cstr(conn->client_ip));
00367 msg_destroy(msg);
00368 }
00369 }
00370 }
00371
00372
00373
00374
00375
00376
00377 static int send_msg(Boxc *boxconn, Msg *pmsg)
00378 {
00379 Octstr *pack;
00380
00381 pack = msg_pack(pmsg);
00382
00383 if (pack == NULL)
00384 return -1;
00385
00386 if (boxconn->boxc_id != NULL)
00387 debug("bb.boxc", 0, "send_msg: sending msg to boxc: <%s>",
00388 octstr_get_cstr(boxconn->boxc_id));
00389 else
00390 debug("bb.boxc", 0, "send_msg: sending msg to box: <%s>",
00391 octstr_get_cstr(boxconn->client_ip));
00392
00393 if (conn_write_withlen(boxconn->conn, pack) == -1) {
00394 error(0, "Couldn't write Msg to box <%s>, disconnecting",
00395 octstr_get_cstr(boxconn->client_ip));
00396 octstr_destroy(pack);
00397 return -1;
00398 }
00399
00400 octstr_destroy(pack);
00401 return 0;
00402 }
00403
00404
00405 static void boxc_sent_push(Boxc *conn, Msg *m)
00406 {
00407 Octstr *os;
00408 char id[UUID_STR_LEN + 1];
00409
00410 if (conn->is_wap || !conn->sent || !m || msg_type(m) != sms)
00411 return;
00412
00413 uuid_unparse(m->sms.id, id);
00414 os = octstr_create(id);
00415 dict_put(conn->sent, os, msg_duplicate(m));
00416 semaphore_down(conn->pending);
00417 octstr_destroy(os);
00418 }
00419
00420
00421
00422
00423
00424
00425 static void boxc_sent_pop(Boxc *conn, Msg *m, Msg **orig)
00426 {
00427 Octstr *os;
00428 char id[UUID_STR_LEN + 1];
00429 Msg *msg;
00430
00431 if (conn->is_wap || !conn->sent || !m || (msg_type(m) != ack && msg_type(m) != sms))
00432 return;
00433
00434 if (orig != NULL)
00435 *orig = NULL;
00436
00437 uuid_unparse((msg_type(m) == sms ? m->sms.id : m->ack.id), id);
00438 os = octstr_create(id);
00439 msg = dict_remove(conn->sent, os);
00440 octstr_destroy(os);
00441 if (!msg) {
00442 error(0, "BOXC: Got ack for nonexistend message!");
00443 msg_dump(m, 0);
00444 return;
00445 }
00446 semaphore_up(conn->pending);
00447 if (orig == NULL)
00448 msg_destroy(msg);
00449 else
00450 *orig = msg;
00451 }
00452
00453
00454 static void boxc_sender(void *arg)
00455 {
00456 Msg *msg;
00457 Boxc *conn = arg;
00458
00459 gwlist_add_producer(flow_threads);
00460
00461 while (bb_status != BB_DEAD && conn->alive) {
00462
00463
00464
00465
00466
00467 conn_flush(conn->conn);
00468
00469 gwlist_consume(suspended);
00470
00471 if ((msg = gwlist_consume(conn->incoming)) == NULL) {
00472
00473 msg = msg_create(admin);
00474 msg->admin.command = restart ? cmd_restart : cmd_shutdown;
00475 send_msg(conn, msg);
00476 msg_destroy(msg);
00477 break;
00478 }
00479 if (msg_type(msg) == heartbeat) {
00480 debug("bb.boxc", 0, "boxc_sender: catch an heartbeat - we are alive");
00481 msg_destroy(msg);
00482 continue;
00483 }
00484 boxc_sent_push(conn, msg);
00485 if (!conn->alive || send_msg(conn, msg) == -1) {
00486
00487 boxc_sent_pop(conn, msg, NULL);
00488 gwlist_produce(conn->retry, msg);
00489 break;
00490 }
00491 msg_destroy(msg);
00492 debug("bb.boxc", 0, "boxc_sender: sent message to <%s>",
00493 octstr_get_cstr(conn->client_ip));
00494 }
00495
00496
00497
00498
00499 conn->routable = 0;
00500
00501 gwlist_remove_producer(flow_threads);
00502 }
00503
00504
00505
00506
00507
00508
00509 static Boxc *boxc_create(int fd, Octstr *ip, int ssl)
00510 {
00511 Boxc *boxc;
00512
00513 boxc = gw_malloc(sizeof(Boxc));
00514 boxc->is_wap = 0;
00515 boxc->load = 0;
00516 boxc->conn = conn_wrap_fd(fd, ssl);
00517 boxc->id = counter_increase(boxid);
00518 boxc->client_ip = ip;
00519 boxc->alive = 1;
00520 boxc->connect_time = time(NULL);
00521 boxc->boxc_id = NULL;
00522 boxc->routable = 0;
00523 return boxc;
00524 }
00525
00526 static void boxc_destroy(Boxc *boxc)
00527 {
00528 if (boxc == NULL)
00529 return;
00530
00531
00532
00533 if (boxc->conn)
00534 conn_destroy(boxc->conn);
00535 octstr_destroy(boxc->client_ip);
00536 octstr_destroy(boxc->boxc_id);
00537 gw_free(boxc);
00538 }
00539
00540
00541
00542 static Boxc *accept_boxc(int fd, int ssl)
00543 {
00544 Boxc *newconn;
00545 Octstr *ip;
00546
00547 int newfd;
00548 struct sockaddr_in client_addr;
00549 socklen_t client_addr_len;
00550
00551 client_addr_len = sizeof(client_addr);
00552
00553 newfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
00554 if (newfd < 0)
00555 return NULL;
00556
00557 ip = host_ip(client_addr);
00558
00559 if (is_allowed_ip(box_allow_ip, box_deny_ip, ip) == 0) {
00560 info(0, "Box connection tried from denied host <%s>, disconnected",
00561 octstr_get_cstr(ip));
00562 octstr_destroy(ip);
00563 close(newfd);
00564 return NULL;
00565 }
00566 newconn = boxc_create(newfd, ip, ssl);
00567
00568
00569
00570
00571
00572 #ifdef HAVE_LIBSSL
00573 if (ssl && !conn_get_ssl(newconn->conn))
00574 return NULL;
00575 #endif
00576
00577 info(0, "Client connected from <%s> %s", octstr_get_cstr(ip), ssl?"using SSL":"");
00578
00579
00580
00581 return newconn;
00582 }
00583
00584
00585
00586 static void run_smsbox(void *arg)
00587 {
00588 Boxc *newconn;
00589 long sender;
00590 Msg *msg;
00591 List *keys;
00592 Octstr *key;
00593
00594 gwlist_add_producer(flow_threads);
00595 newconn = arg;
00596 newconn->incoming = gwlist_create();
00597 gwlist_add_producer(newconn->incoming);
00598 newconn->retry = incoming_sms;
00599 newconn->outgoing = outgoing_sms;
00600 newconn->sent = dict_create(smsbox_max_pending, NULL);
00601 newconn->pending = semaphore_create(smsbox_max_pending);
00602
00603 sender = gwthread_create(boxc_sender, newconn);
00604 if (sender == -1) {
00605 error(0, "Failed to start a new thread, disconnecting client <%s>",
00606 octstr_get_cstr(newconn->client_ip));
00607 goto cleanup;
00608 }
00609
00610
00611
00612
00613
00614
00615 gw_rwlock_wrlock(smsbox_list_rwlock);
00616 gwlist_append(smsbox_list, newconn);
00617 gw_rwlock_unlock(smsbox_list_rwlock);
00618
00619 gwlist_add_producer(newconn->outgoing);
00620 boxc_receiver(newconn);
00621 gwlist_remove_producer(newconn->outgoing);
00622
00623
00624 gw_rwlock_wrlock(smsbox_list_rwlock);
00625 gwlist_delete_equal(smsbox_list, newconn);
00626 if (newconn->boxc_id) {
00627 dict_remove(smsbox_by_id, newconn->boxc_id);
00628 }
00629 gw_rwlock_unlock(smsbox_list_rwlock);
00630
00631
00632
00633
00634
00635 if (gwlist_producer_count(newconn->incoming) > 0)
00636 gwlist_remove_producer(newconn->incoming);
00637
00638
00639 if (dict_key_count(newconn->sent) >= smsbox_max_pending)
00640 semaphore_up(newconn->pending);
00641
00642 gwthread_join(sender);
00643
00644
00645 keys = dict_keys(newconn->sent);
00646 while((key = gwlist_extract_first(keys)) != NULL) {
00647 msg = dict_remove(newconn->sent, key);
00648 gwlist_produce(incoming_sms, msg);
00649 octstr_destroy(key);
00650 }
00651 gw_assert(gwlist_len(keys) == 0);
00652 gwlist_destroy(keys, octstr_destroy_item);
00653
00654
00655 while((msg = gwlist_extract_first(newconn->incoming)) != NULL) {
00656 gwlist_produce(incoming_sms, msg);
00657 }
00658
00659 cleanup:
00660 gw_assert(gwlist_len(newconn->incoming) == 0);
00661 gwlist_destroy(newconn->incoming, NULL);
00662 gw_assert(dict_key_count(newconn->sent) == 0);
00663 dict_destroy(newconn->sent);
00664 semaphore_destroy(newconn->pending);
00665 boxc_destroy(newconn);
00666
00667
00668 gwthread_wakeup(sms_dequeue_thread);
00669
00670 gwlist_remove_producer(flow_threads);
00671 }
00672
00673
00674
00675 static void run_wapbox(void *arg)
00676 {
00677 Boxc *newconn;
00678 List *newlist;
00679 long sender;
00680
00681 gwlist_add_producer(flow_threads);
00682 newconn = arg;
00683 newconn->is_wap = 1;
00684
00685
00686
00687
00688
00689
00690
00691 debug("bb", 0, "setting up systems for new wapbox");
00692
00693 newlist = gwlist_create();
00694
00695 gwlist_add_producer(newlist);
00696
00697 newconn->incoming = newlist;
00698 newconn->retry = incoming_wdp;
00699 newconn->outgoing = outgoing_wdp;
00700
00701 sender = gwthread_create(boxc_sender, newconn);
00702 if (sender == -1) {
00703 error(0, "Failed to start a new thread, disconnecting client <%s>",
00704 octstr_get_cstr(newconn->client_ip));
00705 goto cleanup;
00706 }
00707 gwlist_append(wapbox_list, newconn);
00708 gwlist_add_producer(newconn->outgoing);
00709 boxc_receiver(newconn);
00710
00711
00712
00713 gwlist_remove_producer(newconn->outgoing);
00714 gwlist_lock(wapbox_list);
00715 gwlist_delete_equal(wapbox_list, newconn);
00716 gwlist_unlock(wapbox_list);
00717
00718 while (gwlist_producer_count(newlist) > 0)
00719 gwlist_remove_producer(newlist);
00720
00721 newconn->alive = 0;
00722
00723 gwthread_join(sender);
00724
00725 cleanup:
00726 gw_assert(gwlist_len(newlist) == 0);
00727 gwlist_destroy(newlist, NULL);
00728 boxc_destroy(newconn);
00729
00730 gwlist_remove_producer(flow_threads);
00731 }
00732
00733
00734
00735
00736
00737
00738 typedef struct _addrpar {
00739 Octstr *address;
00740 int port;
00741 int wapboxid;
00742 } AddrPar;
00743
00744 static void ap_destroy(AddrPar *addr)
00745 {
00746 octstr_destroy(addr->address);
00747 gw_free(addr);
00748 }
00749
00750 static int cmp_route(void *ap, void *ms)
00751 {
00752 AddrPar *addr = ap;
00753 Msg *msg = ms;
00754
00755 if (msg->wdp_datagram.source_port == addr->port &&
00756 octstr_compare(msg->wdp_datagram.source_address, addr->address)==0)
00757 return 1;
00758
00759 return 0;
00760 }
00761
00762 static int cmp_boxc(void *bc, void *ap)
00763 {
00764 Boxc *boxc = bc;
00765 AddrPar *addr = ap;
00766
00767 if (boxc->id == addr->wapboxid) return 1;
00768 return 0;
00769 }
00770
00771 static Boxc *route_msg(List *route_info, Msg *msg)
00772 {
00773 AddrPar *ap;
00774 Boxc *conn, *best;
00775 int i, b, len;
00776
00777 ap = gwlist_search(route_info, msg, cmp_route);
00778 if (ap == NULL) {
00779 debug("bb.boxc", 0, "Did not find previous routing info for WDP, "
00780 "generating new");
00781 route:
00782
00783 if (gwlist_len(wapbox_list) == 0)
00784 return NULL;
00785
00786 gwlist_lock(wapbox_list);
00787
00788
00789
00790
00791
00792 len = gwlist_len(wapbox_list);
00793 b = gw_rand() % len;
00794 best = gwlist_get(wapbox_list, b);
00795
00796 for(i = 0; i < gwlist_len(wapbox_list); i++) {
00797 conn = gwlist_get(wapbox_list, (i+b) % len);
00798 if (conn != NULL && best != NULL)
00799 if (conn->load < best->load)
00800 best = conn;
00801 }
00802 if (best == NULL) {
00803 warning(0, "wapbox_list empty!");
00804 gwlist_unlock(wapbox_list);
00805 return NULL;
00806 }
00807 conn = best;
00808 conn->load++;
00809
00810 ap = gw_malloc(sizeof(AddrPar));
00811 ap->address = octstr_duplicate(msg->wdp_datagram.source_address);
00812 ap->port = msg->wdp_datagram.source_port;
00813 ap->wapboxid = conn->id;
00814 gwlist_produce(route_info, ap);
00815
00816 gwlist_unlock(wapbox_list);
00817 } else
00818 conn = gwlist_search(wapbox_list, ap, cmp_boxc);
00819
00820 if (conn == NULL) {
00821
00822
00823
00824 debug("bb.boxc", 0, "Old wapbox has disappeared, re-routing");
00825
00826 gwlist_delete_equal(route_info, ap);
00827 ap_destroy(ap);
00828 goto route;
00829 }
00830 return conn;
00831 }
00832
00833
00834
00835
00836
00837
00838 static void wdp_to_wapboxes(void *arg)
00839 {
00840 List *route_info;
00841 AddrPar *ap;
00842 Boxc *conn;
00843 Msg *msg;
00844 int i;
00845
00846 gwlist_add_producer(flow_threads);
00847 gwlist_add_producer(wapbox_list);
00848
00849 route_info = gwlist_create();
00850
00851
00852 while(bb_status != BB_DEAD) {
00853
00854 gwlist_consume(suspended);
00855
00856 if ((msg = gwlist_consume(incoming_wdp)) == NULL)
00857 break;
00858
00859 gw_assert(msg_type(msg) == wdp_datagram);
00860
00861 conn = route_msg(route_info, msg);
00862 if (conn == NULL) {
00863 warning(0, "Cannot route message, discard it");
00864 msg_destroy(msg);
00865 continue;
00866 }
00867 gwlist_produce(conn->incoming, msg);
00868 }
00869 debug("bb", 0, "wdp_to_wapboxes: destroying lists");
00870 while((ap = gwlist_extract_first(route_info)) != NULL)
00871 ap_destroy(ap);
00872
00873 gw_assert(gwlist_len(route_info) == 0);
00874 gwlist_destroy(route_info, NULL);
00875
00876 gwlist_lock(wapbox_list);
00877 for(i=0; i < gwlist_len(wapbox_list); i++) {
00878 conn = gwlist_get(wapbox_list, i);
00879 gwlist_remove_producer(conn->incoming);
00880 conn->alive = 0;
00881 }
00882 gwlist_unlock(wapbox_list);
00883
00884 gwlist_remove_producer(wapbox_list);
00885 gwlist_remove_producer(flow_threads);
00886 }
00887
00888
00889 static void wait_for_connections(int fd, void (*function) (void *arg),
00890 List *waited, int ssl)
00891 {
00892 int ret;
00893 int timeout = 10;
00894
00895 gw_assert(function != NULL);
00896
00897 while(bb_status != BB_DEAD) {
00898
00899
00900
00901
00902
00903
00904
00905 if (bb_status == BB_SHUTDOWN) {
00906 ret = gwlist_wait_until_nonempty(waited);
00907 if (ret == -1 || !timeout)
00908 break;
00909 else
00910 timeout--;
00911 }
00912
00913
00914 gwlist_consume(suspended);
00915
00916 ret = gwthread_pollfd(fd, POLLIN, 1.0);
00917 if (ret > 0) {
00918 Boxc *newconn = accept_boxc(fd, ssl);
00919 if (newconn != NULL) {
00920 gwthread_create(function, newconn);
00921 gwthread_sleep(1.0);
00922 } else {
00923 error(0, "Failed to create new boxc connection.");
00924 }
00925 } else if (ret < 0 && errno != EINTR && errno != EAGAIN)
00926 error(errno, "bb_boxc::wait_for_connections failed");
00927 }
00928 }
00929
00930
00931
00932 static void smsboxc_run(void *arg)
00933 {
00934 int fd;
00935 int port;
00936
00937 gwlist_add_producer(flow_threads);
00938 gwthread_wakeup(MAIN_THREAD_ID);
00939 port = (int) *((long *)arg);
00940
00941 fd = make_server_socket(port, NULL);
00942
00943
00944 if (fd < 0) {
00945 panic(0, "Could not open smsbox port %d", port);
00946 }
00947
00948
00949
00950
00951
00952
00953 wait_for_connections(fd, run_smsbox, incoming_sms, smsbox_port_ssl);
00954
00955 gwlist_remove_producer(smsbox_list);
00956
00957
00958 gwlist_remove_producer(outgoing_sms);
00959
00960
00961
00962
00963 while(gwlist_wait_until_nonempty(smsbox_list) == 1)
00964 gwthread_sleep(1.0);
00965
00966
00967 close(fd);
00968
00969 gwthread_wakeup(sms_dequeue_thread);
00970 gwthread_join(sms_dequeue_thread);
00971
00972 gwlist_destroy(smsbox_list, NULL);
00973 smsbox_list = NULL;
00974 gw_rwlock_destroy(smsbox_list_rwlock);
00975 smsbox_list_rwlock = NULL;
00976
00977
00978 dict_destroy(smsbox_by_id);
00979 smsbox_by_id = NULL;
00980 dict_destroy(smsbox_by_smsc);
00981 smsbox_by_smsc = NULL;
00982 dict_destroy(smsbox_by_receiver);
00983 smsbox_by_receiver = NULL;
00984 dict_destroy(smsbox_by_smsc_receiver);
00985 smsbox_by_smsc_receiver = NULL;
00986
00987 gwlist_remove_producer(flow_threads);
00988 }
00989
00990
00991 static void wapboxc_run(void *arg)
00992 {
00993 int fd, port;
00994
00995 gwlist_add_producer(flow_threads);
00996 gwthread_wakeup(MAIN_THREAD_ID);
00997 port = (int) *((long*)arg);
00998
00999 fd = make_server_socket(port, NULL);
01000
01001
01002 if (fd < 0) {
01003 panic(0, "Could not open wapbox port %d", port);
01004 }
01005
01006 wait_for_connections(fd, run_wapbox, incoming_wdp, wapbox_port_ssl);
01007
01008
01009
01010 gwlist_remove_producer(outgoing_wdp);
01011
01012
01013
01014
01015
01016 while(gwlist_wait_until_nonempty(wapbox_list) == 1)
01017 gwthread_sleep(1.0);
01018
01019
01020 while(gwlist_consume(wapbox_list)!=NULL)
01021 ;
01022
01023
01024 close(fd);
01025
01026 gwlist_destroy(wapbox_list, NULL);
01027 wapbox_list = NULL;
01028
01029 gwlist_remove_producer(flow_threads);
01030 }
01031
01032
01033
01034
01035
01036 static void init_smsbox_routes(Cfg *cfg)
01037 {
01038 CfgGroup *grp;
01039 List *list, *items;
01040 Octstr *boxc_id, *smsc_ids, *shortcuts;
01041 int i, j;
01042
01043 boxc_id = smsc_ids = shortcuts = NULL;
01044
01045 list = cfg_get_multi_group(cfg, octstr_imm("smsbox-route"));
01046
01047
01048 while (list && (grp = gwlist_extract_first(list)) != NULL) {
01049
01050 if ((boxc_id = cfg_get(grp, octstr_imm("smsbox-id"))) == NULL) {
01051 grp_dump(grp);
01052 panic(0,"'smsbox-route' group without valid 'smsbox-id' directive!");
01053 }
01054
01055
01056
01057
01058
01059
01060
01061
01062
01063
01064 smsc_ids = cfg_get(grp, octstr_imm("smsc-id"));
01065 shortcuts = cfg_get(grp, octstr_imm("shortcode"));
01066
01067
01068 if (smsc_ids && !shortcuts) {
01069
01070 items = octstr_split(smsc_ids, octstr_imm(";"));
01071 for (i = 0; i < gwlist_len(items); i++) {
01072 Octstr *item = gwlist_get(items, i);
01073 octstr_strip_blanks(item);
01074
01075 debug("bb.boxc",0,"Adding smsbox routing to id <%s> for smsc id <%s>",
01076 octstr_get_cstr(boxc_id), octstr_get_cstr(item));
01077
01078 if (!dict_put_once(smsbox_by_smsc, item, octstr_duplicate(boxc_id)))
01079 panic(0, "Routing for smsc-id <%s> already exists!",
01080 octstr_get_cstr(item));
01081 }
01082 gwlist_destroy(items, octstr_destroy_item);
01083 octstr_destroy(smsc_ids);
01084 }
01085 else if (!smsc_ids && shortcuts) {
01086
01087 items = octstr_split(shortcuts, octstr_imm(";"));
01088 for (i = 0; i < gwlist_len(items); i++) {
01089 Octstr *item = gwlist_get(items, i);
01090 octstr_strip_blanks(item);
01091
01092 debug("bb.boxc",0,"Adding smsbox routing to id <%s> for receiver no <%s>",
01093 octstr_get_cstr(boxc_id), octstr_get_cstr(item));
01094
01095 if (!dict_put_once(smsbox_by_receiver, item, octstr_duplicate(boxc_id)))
01096 panic(0, "Routing for receiver no <%s> already exists!",
01097 octstr_get_cstr(item));
01098 }
01099 gwlist_destroy(items, octstr_destroy_item);
01100 octstr_destroy(shortcuts);
01101 }
01102 else if (smsc_ids && shortcuts) {
01103
01104 items = octstr_split(shortcuts, octstr_imm(";"));
01105 for (i = 0; i < gwlist_len(items); i++) {
01106 List *subitems;
01107 Octstr *item = gwlist_get(items, i);
01108 octstr_strip_blanks(item);
01109 subitems = octstr_split(smsc_ids, octstr_imm(";"));
01110 for (j = 0; j < gwlist_len(subitems); j++) {
01111 Octstr *subitem = gwlist_get(subitems, j);
01112 octstr_strip_blanks(subitem);
01113
01114 debug("bb.boxc",0,"Adding smsbox routing to id <%s> "
01115 "for receiver no <%s> and smsc id <%s>",
01116 octstr_get_cstr(boxc_id), octstr_get_cstr(item),
01117 octstr_get_cstr(subitem));
01118
01119
01120 octstr_insert(subitem, item, 0);
01121 octstr_insert_char(subitem, octstr_len(item), ':');
01122 if (!dict_put_once(smsbox_by_smsc_receiver, subitem, octstr_duplicate(boxc_id)))
01123 panic(0, "Routing for receiver:smsc <%s> already exists!",
01124 octstr_get_cstr(subitem));
01125 }
01126 gwlist_destroy(subitems, octstr_destroy_item);
01127 }
01128 gwlist_destroy(items, octstr_destroy_item);
01129 octstr_destroy(shortcuts);
01130 }
01131 octstr_destroy(boxc_id);
01132 }
01133
01134 gwlist_destroy(list, NULL);
01135 }
01136
01137
01138
01139
01140
01141
01142
01143
01144 int smsbox_start(Cfg *cfg)
01145 {
01146 CfgGroup *grp;
01147
01148 if (smsbox_running) return -1;
01149
01150 debug("bb", 0, "starting smsbox connection module");
01151
01152 grp = cfg_get_single_group(cfg, octstr_imm("core"));
01153 if (cfg_get_integer(&smsbox_port, grp, octstr_imm("smsbox-port")) == -1) {
01154 error(0, "Missing smsbox-port variable, cannot start smsboxes");
01155 return -1;
01156 }
01157 #ifdef HAVE_LIBSSL
01158 cfg_get_bool(&smsbox_port_ssl, grp, octstr_imm("smsbox-port-ssl"));
01159 #endif
01160
01161 if (smsbox_port_ssl)
01162 debug("bb", 0, "smsbox connection module is SSL-enabled");
01163
01164 if (cfg_get_integer(&smsbox_max_pending, grp, octstr_imm("smsbox-max-pending")) == -1) {
01165 smsbox_max_pending = SMSBOX_MAX_PENDING;
01166 info(0, "BOXC: 'smsbox-max-pending' not set, using default (%ld).", smsbox_max_pending);
01167 }
01168
01169 smsbox_list = gwlist_create();
01170 smsbox_list_rwlock = gw_rwlock_create();
01171 if (!boxid)
01172 boxid = counter_create();
01173
01174
01175 smsbox_by_id = dict_create(10, NULL);
01176 smsbox_by_smsc = dict_create(30, (void(*)(void *)) octstr_destroy);
01177 smsbox_by_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
01178 smsbox_by_smsc_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
01179
01180
01181 init_smsbox_routes(cfg);
01182
01183 gwlist_add_producer(outgoing_sms);
01184 gwlist_add_producer(smsbox_list);
01185
01186 smsbox_running = 1;
01187
01188 if ((sms_dequeue_thread = gwthread_create(sms_to_smsboxes, NULL)) == -1)
01189 panic(0, "Failed to start a new thread for smsbox routing");
01190
01191 if (gwthread_create(smsboxc_run, &smsbox_port) == -1)
01192 panic(0, "Failed to start a new thread for smsbox connections");
01193
01194 return 0;
01195 }
01196
01197
01198 int smsbox_restart(Cfg *cfg)
01199 {
01200 if (!smsbox_running) return -1;
01201
01202
01203
01204 return 0;
01205 }
01206
01207
01208
01209
01210
01211 int wapbox_start(Cfg *cfg)
01212 {
01213 CfgGroup *grp;
01214
01215 if (wapbox_running) return -1;
01216
01217 debug("bb", 0, "starting wapbox connection module");
01218
01219 grp = cfg_get_single_group(cfg, octstr_imm("core"));
01220
01221 if (cfg_get_integer(&wapbox_port, grp, octstr_imm("wapbox-port")) == -1) {
01222 error(0, "Missing wapbox-port variable, cannot start WAP");
01223 return -1;
01224 }
01225 #ifdef HAVE_LIBSSL
01226 cfg_get_bool(&wapbox_port_ssl, grp, octstr_imm("wapbox-port-ssl"));
01227 #endif
01228
01229 box_allow_ip = cfg_get(grp, octstr_imm("box-allow-ip"));
01230 if (box_allow_ip == NULL)
01231 box_allow_ip = octstr_create("");
01232 box_deny_ip = cfg_get(grp, octstr_imm("box-deny-ip"));
01233 if (box_deny_ip == NULL)
01234 box_deny_ip = octstr_create("");
01235 if (box_allow_ip != NULL && box_deny_ip == NULL)
01236 info(0, "Box connection allowed IPs defined without any denied...");
01237
01238 wapbox_list = gwlist_create();
01239 gwlist_add_producer(outgoing_wdp);
01240 if (!boxid)
01241 boxid = counter_create();
01242
01243 if (gwthread_create(wdp_to_wapboxes, NULL) == -1)
01244 panic(0, "Failed to start a new thread for wapbox routing");
01245
01246 if (gwthread_create(wapboxc_run, &wapbox_port) == -1)
01247 panic(0, "Failed to start a new thread for wapbox connections");
01248
01249 wapbox_running = 1;
01250 return 0;
01251 }
01252
01253
01254 Octstr *boxc_status(int status_type)
01255 {
01256 Octstr *tmp;
01257 char *lb, *ws;
01258 int i, boxes, para = 0;
01259 time_t orig, t;
01260 Boxc *bi;
01261
01262 orig = time(NULL);
01263
01264
01265
01266
01267
01268
01269
01270 if ((lb = bb_status_linebreak(status_type))==NULL)
01271 return octstr_create("Un-supported format");
01272
01273 if (status_type == BBSTATUS_HTML)
01274 ws = " ";
01275 else if (status_type == BBSTATUS_TEXT)
01276 ws = " ";
01277 else
01278 ws = "";
01279
01280 if (status_type == BBSTATUS_HTML || status_type == BBSTATUS_WML)
01281 para = 1;
01282
01283 if (status_type == BBSTATUS_XML) {
01284 tmp = octstr_create ("");
01285 octstr_append_cstr(tmp, "<boxes>\n\t");
01286 }
01287 else
01288 tmp = octstr_format("%sBox connections:%s", para ? "<p>" : "", lb);
01289 boxes = 0;
01290
01291 if (wapbox_list) {
01292 gwlist_lock(wapbox_list);
01293 for(i=0; i < gwlist_len(wapbox_list); i++) {
01294 bi = gwlist_get(wapbox_list, i);
01295 if (bi->alive == 0)
01296 continue;
01297 t = orig - bi->connect_time;
01298 if (status_type == BBSTATUS_XML)
01299 octstr_format_append(tmp,
01300 "<box>\n\t\t<type>wapbox</type>\n\t\t<IP>%s</IP>\n"
01301 "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
01302 "\t\t<ssl>%s</ssl>\n\t</box>\n",
01303 octstr_get_cstr(bi->client_ip),
01304 t/3600/24, t/3600%24, t/60%60, t%60,
01305 #ifdef HAVE_LIBSSL
01306 conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
01307 #else
01308 "not installed"
01309 #endif
01310 );
01311 else
01312 octstr_format_append(tmp,
01313 "%swapbox, IP %s (on-line %ldd %ldh %ldm %lds) %s %s",
01314 ws, octstr_get_cstr(bi->client_ip),
01315 t/3600/24, t/3600%24, t/60%60, t%60,
01316 #ifdef HAVE_LIBSSL
01317 conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
01318 #else
01319 "",
01320 #endif
01321 lb);
01322 boxes++;
01323 }
01324 gwlist_unlock(wapbox_list);
01325 }
01326 if (smsbox_list) {
01327 gw_rwlock_rdlock(smsbox_list_rwlock);
01328 for(i=0; i < gwlist_len(smsbox_list); i++) {
01329 bi = gwlist_get(smsbox_list, i);
01330 if (bi->alive == 0)
01331 continue;
01332 t = orig - bi->connect_time;
01333 if (status_type == BBSTATUS_XML)
01334 octstr_format_append(tmp, "<box>\n\t\t<type>smsbox</type>\n"
01335 "\t\t<id>%s</id>\n\t\t<IP>%s</IP>\n"
01336 "\t\t<queue>%ld</queue>\n"
01337 "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
01338 "\t\t<ssl>%s</ssl>\n\t</box>",
01339 (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : ""),
01340 octstr_get_cstr(bi->client_ip),
01341 gwlist_len(bi->incoming) + dict_key_count(bi->sent),
01342 t/3600/24, t/3600%24, t/60%60, t%60,
01343 #ifdef HAVE_LIBSSL
01344 conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
01345 #else
01346 "not installed"
01347 #endif
01348 );
01349 else
01350 octstr_format_append(tmp, "%ssmsbox:%s, IP %s (%ld queued), (on-line %ldd %ldh %ldm %lds) %s %s",
01351 ws, (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : "(none)"),
01352 octstr_get_cstr(bi->client_ip), gwlist_len(bi->incoming) + dict_key_count(bi->sent),
01353 t/3600/24, t/3600%24, t/60%60, t%60,
01354 #ifdef HAVE_LIBSSL
01355 conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
01356 #else
01357 "",
01358 #endif
01359 lb);
01360 boxes++;
01361 }
01362 gw_rwlock_unlock(smsbox_list_rwlock);
01363 }
01364 if (boxes == 0 && status_type != BBSTATUS_XML) {
01365 octstr_destroy(tmp);
01366 tmp = octstr_format("%sNo boxes connected", para ? "<p>" : "");
01367 }
01368 if (para)
01369 octstr_append_cstr(tmp, "</p>");
01370 if (status_type == BBSTATUS_XML)
01371 octstr_append_cstr(tmp, "</boxes>\n");
01372 else
01373 octstr_append_cstr(tmp, "\n\n");
01374 return tmp;
01375 }
01376
01377
01378 int boxc_incoming_wdp_queue(void)
01379 {
01380 int i, q = 0;
01381 Boxc *boxc;
01382
01383 if (wapbox_list) {
01384 gwlist_lock(wapbox_list);
01385 for(i=0; i < gwlist_len(wapbox_list); i++) {
01386 boxc = gwlist_get(wapbox_list, i);
01387 q += gwlist_len(boxc->incoming);
01388 }
01389 gwlist_unlock(wapbox_list);
01390 }
01391 return q;
01392 }
01393
01394
01395 void boxc_cleanup(void)
01396 {
01397 octstr_destroy(box_allow_ip);
01398 octstr_destroy(box_deny_ip);
01399 box_allow_ip = NULL;
01400 box_deny_ip = NULL;
01401 counter_destroy(boxid);
01402 boxid = NULL;
01403 }
01404
01405
01406
01407
01408
01409
01410
01411
01412
01413
01414
01415 int route_incoming_to_boxc(Msg *msg)
01416 {
01417 Boxc *bc = NULL, *best = NULL;
01418 Octstr *s, *r, *rs;
01419 long len, b, i;
01420 int full_found = 0;
01421
01422 s = r = NULL;
01423 gw_assert(msg_type(msg) == sms);
01424
01425
01426
01427
01428
01429
01430
01431 gw_rwlock_rdlock(smsbox_list_rwlock);
01432 if (gwlist_len(smsbox_list) == 0) {
01433 gw_rwlock_unlock(smsbox_list_rwlock);
01434 warning(0, "smsbox_list empty!");
01435 if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
01436 gwlist_produce(incoming_sms, msg);
01437 return 0;
01438 }
01439 else
01440 return -1;
01441 }
01442
01443 if (octstr_len(msg->sms.boxc_id) > 0) {
01444
01445 bc = dict_get(smsbox_by_id, msg->sms.boxc_id);
01446 if (bc == NULL) {
01447
01448
01449
01450
01451 warning(0,"Could not route message to smsbox id <%s>, smsbox is gone!",
01452 octstr_get_cstr(msg->sms.boxc_id));
01453 }
01454 }
01455 else {
01456
01457
01458
01459
01460
01461 Octstr *os = octstr_format("%s:%s",
01462 octstr_get_cstr(msg->sms.receiver),
01463 octstr_get_cstr(msg->sms.smsc_id));
01464 s = (msg->sms.smsc_id ? dict_get(smsbox_by_smsc, msg->sms.smsc_id) : NULL);
01465 r = (msg->sms.receiver ? dict_get(smsbox_by_receiver, msg->sms.receiver) : NULL);
01466 rs = (os ? dict_get(smsbox_by_smsc_receiver, os) : NULL);
01467 octstr_destroy(os);
01468 bc = rs ? dict_get(smsbox_by_id, rs) :
01469 (r ? dict_get(smsbox_by_id, r) : (s ? dict_get(smsbox_by_id, s) : NULL));
01470 }
01471
01472
01473 if (bc != NULL) {
01474 if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(bc->incoming)) {
01475 gwlist_produce(bc->incoming, msg);
01476 gw_rwlock_unlock(smsbox_list_rwlock);
01477 return 1;
01478 }
01479 else {
01480 gw_rwlock_unlock(smsbox_list_rwlock);
01481 return -1;
01482 }
01483 }
01484 else if (s != NULL || r != NULL || octstr_len(msg->sms.boxc_id) > 0) {
01485 gw_rwlock_unlock(smsbox_list_rwlock);
01486
01487
01488
01489
01490
01491 if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
01492 gwlist_produce(incoming_sms, msg);
01493 return 0;
01494 }
01495 else
01496 return -1;
01497 }
01498
01499
01500
01501
01502
01503
01504
01505
01506
01507
01508 len = gwlist_len(smsbox_list);
01509 b = gw_rand() % len;
01510
01511 for(i = 0; i < gwlist_len(smsbox_list); i++) {
01512 bc = gwlist_get(smsbox_list, (i+b) % len);
01513
01514 if (bc->boxc_id != NULL || bc->routable == 0)
01515 bc = NULL;
01516
01517 if (bc != NULL && max_incoming_sms_qlength > 0 &&
01518 gwlist_len(bc->incoming) > max_incoming_sms_qlength) {
01519 full_found = 1;
01520 bc = NULL;
01521 }
01522
01523 if ((bc != NULL && best != NULL && bc->load < best->load) ||
01524 (bc != NULL && best == NULL)) {
01525 best = bc;
01526 }
01527 }
01528
01529 if (best != NULL) {
01530 best->load++;
01531 gwlist_produce(best->incoming, msg);
01532 }
01533
01534 gw_rwlock_unlock(smsbox_list_rwlock);
01535
01536 if (best == NULL && full_found == 0) {
01537 warning(0, "smsbox_list empty!");
01538 if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
01539 gwlist_produce(incoming_sms, msg);
01540 return 0;
01541 }
01542 else
01543 return -1;
01544 }
01545 else if (best == NULL && full_found == 1)
01546 return -1;
01547
01548 return 1;
01549 }
01550
01551 static void sms_to_smsboxes(void *arg)
01552 {
01553 Msg *newmsg, *startmsg, *msg;
01554 long i, len;
01555 int ret = -1;
01556 Boxc *boxc;
01557
01558 gwlist_add_producer(flow_threads);
01559
01560 newmsg = startmsg = msg = NULL;
01561
01562 while(bb_status != BB_DEAD) {
01563
01564 if (newmsg == startmsg) {
01565
01566 if (gwlist_producer_count(smsbox_list) == 0)
01567 break;
01568
01569 if (ret == 0 || ret == -1) {
01570
01571 gwthread_sleep(60.0);
01572
01573
01574 if (gwlist_producer_count(smsbox_list) == 0 && gwlist_len(smsbox_list) == 0)
01575 break;
01576 }
01577 startmsg = msg = gwlist_consume(incoming_sms);
01578
01579 newmsg = NULL;
01580 }
01581 else {
01582 newmsg = msg = gwlist_consume(incoming_sms);
01583
01584
01585 if (newmsg == startmsg) {
01586 gwlist_insert(incoming_sms, 0, msg);
01587 continue;
01588 }
01589 }
01590
01591 if (msg == NULL)
01592 break;
01593
01594 gw_assert(msg_type(msg) == sms);
01595
01596
01597
01598
01599 ret = route_incoming_to_boxc(msg);
01600 if (ret == 1)
01601 startmsg = newmsg = NULL;
01602 else if (ret == -1) {
01603 gwlist_produce(incoming_sms, msg);
01604 }
01605 }
01606
01607 gw_rwlock_rdlock(smsbox_list_rwlock);
01608 len = gwlist_len(smsbox_list);
01609 for (i=0; i < len; i++) {
01610 boxc = gwlist_get(smsbox_list, i);
01611 gwlist_remove_producer(boxc->incoming);
01612 }
01613 gw_rwlock_unlock(smsbox_list_rwlock);
01614
01615 gwlist_remove_producer(flow_threads);
01616 }
See file LICENSE for details about the license agreement for using,
modifying, copying or deriving work from this software.