Main Page | Alphabetical List | Data Structures | Directories | File List | Data Fields | Globals

bb_boxc.c

Go to the documentation of this file.
00001 /* ==================================================================== 
00002  * The Kannel Software License, Version 1.0 
00003  * 
00004  * Copyright (c) 2001-2008 Kannel Group  
00005  * Copyright (c) 1998-2001 WapIT Ltd.   
00006  * All rights reserved. 
00007  * 
00008  * Redistribution and use in source and binary forms, with or without 
00009  * modification, are permitted provided that the following conditions 
00010  * are met: 
00011  * 
00012  * 1. Redistributions of source code must retain the above copyright 
00013  *    notice, this list of conditions and the following disclaimer. 
00014  * 
00015  * 2. Redistributions in binary form must reproduce the above copyright 
00016  *    notice, this list of conditions and the following disclaimer in 
00017  *    the documentation and/or other materials provided with the 
00018  *    distribution. 
00019  * 
00020  * 3. The end-user documentation included with the redistribution, 
00021  *    if any, must include the following acknowledgment: 
00022  *       "This product includes software developed by the 
00023  *        Kannel Group (http://www.kannel.org/)." 
00024  *    Alternately, this acknowledgment may appear in the software itself, 
00025  *    if and wherever such third-party acknowledgments normally appear. 
00026  * 
00027  * 4. The names "Kannel" and "Kannel Group" must not be used to 
00028  *    endorse or promote products derived from this software without 
00029  *    prior written permission. For written permission, please  
00030  *    contact org@kannel.org. 
00031  * 
00032  * 5. Products derived from this software may not be called "Kannel", 
00033  *    nor may "Kannel" appear in their name, without prior written 
00034  *    permission of the Kannel Group. 
00035  * 
00036  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 
00037  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 
00038  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
00039  * DISCLAIMED.  IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS 
00040  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,  
00041  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT  
00042  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR  
00043  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
00044  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE  
00045  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  
00046  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
00047  * ==================================================================== 
00048  * 
00049  * This software consists of voluntary contributions made by many 
00050  * individuals on behalf of the Kannel Group.  For more information on  
00051  * the Kannel Group, please see <http://www.kannel.org/>. 
00052  * 
00053  * Portions of this software are based upon software originally written at  
00054  * WapIT Ltd., Helsinki, Finland for the Kannel project.  
00055  */ 
00056 
00057 /*
00058  * bb_boxc.c : bearerbox box connection module
00059  *
00060  * handles start/restart/stop/suspend/die operations of the sms and
00061  * wapbox connections
00062  *
00063  * Kalle Marjola 2000 for project Kannel
00064  * Alexander Malysh (various fixes)
00065  */
00066 
00067 #include <errno.h>
00068 #include <stdlib.h>
00069 #include <stdio.h>
00070 #include <time.h>
00071 #include <string.h>
00072 #include <sys/time.h>
00073 #include <sys/types.h>
00074 #include <sys/socket.h>
00075 #include <unistd.h>
00076 #include <signal.h>
00077 
00078 #include "gwlib/gwlib.h"
00079 #include "msg.h"
00080 #include "bearerbox.h"
00081 #include "bb_smscconn_cb.h"
00082 
00083 #define SMSBOX_MAX_PENDING 100
00084 
00085 /* passed from bearerbox core */
00086 
00087 extern volatile sig_atomic_t bb_status;
00088 extern volatile sig_atomic_t restart;
00089 extern List *incoming_sms;
00090 extern List *outgoing_sms;
00091 extern List *incoming_wdp;
00092 extern List *outgoing_wdp;
00093 
00094 extern List *flow_threads;
00095 extern List *suspended;
00096 
00097 /* incoming/outgoing sms queue control */
00098 extern long max_incoming_sms_qlength;
00099 
00100 
00101 /* our own thingies */
00102 
00103 static volatile sig_atomic_t smsbox_running;
00104 static volatile sig_atomic_t wapbox_running;
00105 static List *wapbox_list;
00106 static List *smsbox_list;
00107 static RWLock   *smsbox_list_rwlock;
00108 
00109 /* dictionaries for holding the smsbox routing information */
00110 static Dict *smsbox_by_id;
00111 static Dict *smsbox_by_smsc;
00112 static Dict *smsbox_by_receiver;
00113 static Dict *smsbox_by_smsc_receiver;
00114 
00115 static long smsbox_port;
00116 static int smsbox_port_ssl;
00117 static long wapbox_port;
00118 static int wapbox_port_ssl;
00119 
00120 /* max pending messages on the line to smsbox */
00121 static long smsbox_max_pending;
00122 
00123 static Octstr *box_allow_ip;
00124 static Octstr *box_deny_ip;
00125 
00126 
00127 static Counter *boxid;
00128 
00129 /* sms_to_smsboxes thread-id */
00130 static long sms_dequeue_thread;
00131 
00132 
00133 typedef struct _boxc {
00134     Connection  *conn;
00135     int               is_wap;
00136     long            id;
00137     int               load;
00138     time_t        connect_time;
00139     Octstr        *client_ip;
00140     List            *incoming;
00141     List            *retry;     /* If sending fails */
00142     List            *outgoing;
00143     Dict           *sent;
00144     Semaphore *pending;
00145     volatile sig_atomic_t alive;
00146     Octstr        *boxc_id; /* identifies the connected smsbox instance */
00147     /* used to mark connection usable or still waiting for ident. msg */
00148     volatile int routable;
00149 } Boxc;
00150 
00151 
00152 /* forward declaration */
00153 static void sms_to_smsboxes(void *arg);
00154 static int send_msg(Boxc *boxconn, Msg *pmsg);
00155 static void boxc_sent_push(Boxc*, Msg*);
00156 static void boxc_sent_pop(Boxc*, Msg*, Msg**);
00157 
00158 
00159 /*-------------------------------------------------
00160  *  receiver thingies
00161  */
00162 
00163 static Msg *read_from_box(Boxc *boxconn)
00164 {
00165     int ret;
00166     Octstr *pack;
00167     Msg *msg;
00168 
00169     pack = NULL;
00170     while (bb_status != BB_DEAD && boxconn->alive) {
00171             /* XXX: if box doesn't send (just keep conn open) we block here while shutdown */
00172         pack = conn_read_withlen(boxconn->conn);
00173         gw_claim_area(pack);
00174         if (pack != NULL)
00175             break;
00176         if (conn_error(boxconn->conn)) {
00177             info(0, "Read error when reading from box <%s>, disconnecting",
00178                  octstr_get_cstr(boxconn->client_ip));
00179             return NULL;
00180         }
00181         if (conn_eof(boxconn->conn)) {
00182             info(0, "Connection closed by the box <%s>",
00183                  octstr_get_cstr(boxconn->client_ip));
00184             return NULL;
00185         }
00186 
00187         ret = conn_wait(boxconn->conn, -1.0);
00188         if (ret < 0) {
00189             error(0, "Connection to box <%s> broke.",
00190                   octstr_get_cstr(boxconn->client_ip));
00191             return NULL;
00192         }
00193     }
00194 
00195     if (pack == NULL)
00196         return NULL;
00197 
00198     msg = msg_unpack(pack);
00199     octstr_destroy(pack);
00200 
00201     if (msg == NULL)
00202         error(0, "Failed to unpack data!");
00203     return msg;
00204 }
00205 
00206 
00207 /*
00208  * Try to deliver message to internal or smscconn queue
00209  * and generate ack/nack for smsbox connections.
00210  */
00211 static void deliver_sms_to_queue(Msg *msg, Boxc *conn)
00212 {
00213     Msg *mack;
00214     int rc;
00215 
00216     /*
00217      * save modifies ID and time, so if the smsbox uses it, save
00218      * it FIRST for the reply message!!!
00219      */
00220     mack = msg_create(ack);
00221     gw_assert(mack != NULL);
00222     uuid_copy(mack->ack.id, msg->sms.id);
00223     mack->ack.time = msg->sms.time;
00224 
00225     store_save(msg);
00226 
00227     rc = smsc2_rout(msg, 0);
00228     switch(rc) {
00229         case SMSCCONN_SUCCESS:
00230            mack->ack.nack = ack_success;
00231            break;
00232         case SMSCCONN_QUEUED:
00233            mack->ack.nack = ack_buffered;
00234            break;
00235         case SMSCCONN_FAILED_DISCARDED: /* no router at all */
00236         case SMSCCONN_FAILED_QFULL: /* queue full */
00237            warning(0, "Message rejected by bearerbox, %s!",
00238                              (rc == SMSCCONN_FAILED_DISCARDED) ? "no router" : "queue full");
00239            /*
00240             * first create nack for store-file, in order to delete
00241             * message from store-file.
00242             */
00243            store_save_ack(msg, (rc == SMSCCONN_FAILED_QFULL ? ack_failed_tmp : ack_failed));
00244            mack->ack.nack = (rc == SMSCCONN_FAILED_QFULL ? ack_failed_tmp : ack_failed);
00245 
00246            /* destroy original message */
00247            msg_destroy(msg);
00248            break;
00249     }
00250 
00251     /* put ack into incoming queue of conn */
00252     send_msg(conn, mack);
00253     msg_destroy(mack);
00254 }
00255 
00256 
00257 static void boxc_receiver(void *arg)
00258 {
00259     Boxc *conn = arg;
00260     Msg *msg, *mack;
00261 
00262     /* remove messages from socket until it is closed */
00263     while (bb_status != BB_DEAD && conn->alive) {
00264 
00265         gwlist_consume(suspended);  /* block here if suspended */
00266 
00267         msg = read_from_box(conn);
00268 
00269         if (msg == NULL) {  /* garbage/connection lost */
00270             conn->alive = 0;
00271             break;
00272         }
00273 
00274         /* we don't accept new messages in shutdown phase */
00275         if ((bb_status == BB_SHUTDOWN || bb_status == BB_DEAD) && msg_type(msg) == sms) {
00276             mack = msg_create(ack);
00277             uuid_copy(mack->ack.id, msg->sms.id);
00278             mack->ack.time = msg->sms.time;
00279             mack->ack.nack = ack_failed_tmp;
00280             msg_destroy(msg);
00281             send_msg(conn, mack);
00282             msg_destroy(mack);
00283             continue;
00284         }
00285 
00286         if (msg_type(msg) == sms && conn->is_wap == 0) {
00287             debug("bb.boxc", 0, "boxc_receiver: sms received");
00288 
00289             /* deliver message to queue */
00290             deliver_sms_to_queue(msg, conn);
00291 
00292             if (conn->routable == 0) {
00293                 conn->routable = 1;
00294                 /* wakeup the dequeue thread */
00295                 gwthread_wakeup(sms_dequeue_thread);
00296             }
00297         } else if (msg_type(msg) == wdp_datagram && conn->is_wap) {
00298             debug("bb.boxc", 0, "boxc_receiver: got wdp from wapbox");
00299 
00300             /* XXX we should block these in SHUTDOWN phase too, but
00301                we need ack/nack msgs implemented first. */
00302             gwlist_produce(conn->outgoing, msg);
00303 
00304         } else if (msg_type(msg) == sms && conn->is_wap) {
00305             debug("bb.boxc", 0, "boxc_receiver: got sms from wapbox");
00306 
00307             /* should be a WAP push message, so tried it the same way */
00308             deliver_sms_to_queue(msg, conn);
00309 
00310             if (conn->routable == 0) {
00311                 conn->routable = 1;
00312                 /* wakeup the dequeue thread */
00313                 gwthread_wakeup(sms_dequeue_thread);
00314             }
00315         } else {
00316             if (msg_type(msg) == heartbeat) {
00317                 if (msg->heartbeat.load != conn->load)
00318                     debug("bb.boxc", 0, "boxc_receiver: heartbeat with "
00319                           "load value %ld received", msg->heartbeat.load);
00320                 conn->load = msg->heartbeat.load;
00321             }
00322             else if (msg_type(msg) == ack) {
00323                 if (msg->ack.nack == ack_failed_tmp) {
00324                     Msg *orig;
00325                     boxc_sent_pop(conn, msg, &orig);
00326                     if (orig != NULL) /* retry this message */
00327                         gwlist_append(conn->retry, orig);
00328                 } else {
00329                     boxc_sent_pop(conn, msg, NULL);
00330                     store_save(msg);
00331                 }
00332                 debug("bb.boxc", 0, "boxc_receiver: got ack");
00333             }
00334             /* if this is an identification message from an smsbox instance */
00335             else if (msg_type(msg) == admin && msg->admin.command == cmd_identify) {
00336 
00337                 /*
00338                  * any smsbox sends this command even if boxc_id is NULL,
00339                  * but we will only consider real identified boxes
00340                  */
00341                 if (msg->admin.boxc_id != NULL) {
00342 
00343                     /* and add the boxc_id into conn for boxc_status() output */
00344                     if (conn->boxc_id != NULL) {
00345                         dict_remove(smsbox_by_id, msg->admin.boxc_id);
00346                         octstr_destroy(conn->boxc_id);
00347                     }
00348 
00349                     conn->boxc_id = msg->admin.boxc_id;
00350                     msg->admin.boxc_id = NULL;
00351 
00352                     /* add this identified smsbox to the dictionary */
00353                     /* XXX check for equal boxc_id in Dict, otherwise we overwrite it */
00354                     dict_put(smsbox_by_id, conn->boxc_id, conn);
00355                     debug("bb.boxc", 0, "boxc_receiver: got boxc_id <%s> from <%s>",
00356                           octstr_get_cstr(conn->boxc_id),
00357                           octstr_get_cstr(conn->client_ip));
00358                 }
00359 
00360                 conn->routable = 1;
00361                 /* wakeup the dequeue thread */
00362                 gwthread_wakeup(sms_dequeue_thread);
00363             }
00364             else
00365                 warning(0, "boxc_receiver: unknown msg received from <%s>, "
00366                            "ignored", octstr_get_cstr(conn->client_ip));
00367             msg_destroy(msg);
00368         }
00369     }
00370 }
00371 
00372 
00373 /*---------------------------------------------
00374  * sender thingies
00375  */
00376 
00377 static int send_msg(Boxc *boxconn, Msg *pmsg)
00378 {
00379     Octstr *pack;
00380 
00381     pack = msg_pack(pmsg);
00382 
00383     if (pack == NULL)
00384         return -1;
00385 
00386     if (boxconn->boxc_id != NULL)
00387         debug("bb.boxc", 0, "send_msg: sending msg to boxc: <%s>",
00388           octstr_get_cstr(boxconn->boxc_id));
00389     else
00390         debug("bb.boxc", 0, "send_msg: sending msg to box: <%s>",
00391           octstr_get_cstr(boxconn->client_ip));
00392 
00393     if (conn_write_withlen(boxconn->conn, pack) == -1) {
00394         error(0, "Couldn't write Msg to box <%s>, disconnecting",
00395           octstr_get_cstr(boxconn->client_ip));
00396         octstr_destroy(pack);
00397         return -1;
00398     }
00399 
00400     octstr_destroy(pack);
00401     return 0;
00402 }
00403 
00404 
00405 static void boxc_sent_push(Boxc *conn, Msg *m)
00406 {
00407     Octstr *os;
00408     char id[UUID_STR_LEN + 1];
00409     
00410     if (conn->is_wap || !conn->sent || !m || msg_type(m) != sms)
00411         return;
00412     
00413     uuid_unparse(m->sms.id, id);
00414     os = octstr_create(id);
00415     dict_put(conn->sent, os, msg_duplicate(m));
00416     semaphore_down(conn->pending);
00417     octstr_destroy(os);
00418 }
00419 
00420 
00421 /*
00422  * Remove msg from sent queue.
00423  * Return 0 if message should be deleted from store and 1 if not (e.g. tmp nack)
00424  */
00425 static void boxc_sent_pop(Boxc *conn, Msg *m, Msg **orig)
00426 {
00427     Octstr *os;
00428     char id[UUID_STR_LEN + 1];
00429     Msg *msg;
00430 
00431     if (conn->is_wap || !conn->sent || !m || (msg_type(m) != ack && msg_type(m) != sms))
00432         return;
00433 
00434     if (orig != NULL)
00435         *orig = NULL;
00436     
00437     uuid_unparse((msg_type(m) == sms ? m->sms.id : m->ack.id), id);
00438     os = octstr_create(id);
00439     msg = dict_remove(conn->sent, os);
00440     octstr_destroy(os);
00441     if (!msg) {
00442         error(0, "BOXC: Got ack for nonexistend message!");
00443         msg_dump(m, 0);
00444         return;
00445     }
00446     semaphore_up(conn->pending);
00447     if (orig == NULL)
00448         msg_destroy(msg);
00449     else
00450         *orig = msg;
00451 }
00452 
00453 
00454 static void boxc_sender(void *arg)
00455 {
00456     Msg *msg;
00457     Boxc *conn = arg;
00458 
00459     gwlist_add_producer(flow_threads);
00460 
00461     while (bb_status != BB_DEAD && conn->alive) {
00462 
00463         /*
00464          * Make sure there's no data left in the outgoing connection before
00465          * doing the potentially blocking gwlist_consume()s
00466          */
00467         conn_flush(conn->conn);
00468 
00469         gwlist_consume(suspended);  /* block here if suspended */
00470 
00471         if ((msg = gwlist_consume(conn->incoming)) == NULL) {
00472             /* tell sms/wapbox to die */
00473             msg = msg_create(admin);
00474             msg->admin.command = restart ? cmd_restart : cmd_shutdown;
00475             send_msg(conn, msg);
00476             msg_destroy(msg);
00477             break;
00478         }
00479         if (msg_type(msg) == heartbeat) {
00480             debug("bb.boxc", 0, "boxc_sender: catch an heartbeat - we are alive");
00481             msg_destroy(msg);
00482             continue;
00483         }
00484         boxc_sent_push(conn, msg);
00485         if (!conn->alive || send_msg(conn, msg) == -1) {
00486             /* we got message here */
00487             boxc_sent_pop(conn, msg, NULL);
00488             gwlist_produce(conn->retry, msg);
00489             break;
00490         }
00491         msg_destroy(msg);
00492         debug("bb.boxc", 0, "boxc_sender: sent message to <%s>",
00493                octstr_get_cstr(conn->client_ip));
00494     }
00495     /* the client closes the connection, after that die in receiver */
00496     /* conn->alive = 0; */
00497 
00498     /* set conn to unroutable */
00499     conn->routable = 0;
00500 
00501     gwlist_remove_producer(flow_threads);
00502 }
00503 
00504 /*---------------------------------------------------------------
00505  * accept/create/kill thingies
00506  */
00507 
00508 
00509 static Boxc *boxc_create(int fd, Octstr *ip, int ssl)
00510 {
00511     Boxc *boxc;
00512 
00513     boxc = gw_malloc(sizeof(Boxc));
00514     boxc->is_wap = 0;
00515     boxc->load = 0;
00516     boxc->conn = conn_wrap_fd(fd, ssl);
00517     boxc->id = counter_increase(boxid);
00518     boxc->client_ip = ip;
00519     boxc->alive = 1;
00520     boxc->connect_time = time(NULL);
00521     boxc->boxc_id = NULL;
00522     boxc->routable = 0;
00523     return boxc;
00524 }
00525 
00526 static void boxc_destroy(Boxc *boxc)
00527 {
00528     if (boxc == NULL)
00529         return;
00530 
00531     /* do nothing to the lists, as they are only references */
00532 
00533     if (boxc->conn)
00534         conn_destroy(boxc->conn);
00535     octstr_destroy(boxc->client_ip);
00536     octstr_destroy(boxc->boxc_id);
00537     gw_free(boxc);
00538 }
00539 
00540 
00541 
00542 static Boxc *accept_boxc(int fd, int ssl)
00543 {
00544     Boxc *newconn;
00545     Octstr *ip;
00546 
00547     int newfd;
00548     struct sockaddr_in client_addr;
00549     socklen_t client_addr_len;
00550 
00551     client_addr_len = sizeof(client_addr);
00552 
00553     newfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
00554     if (newfd < 0)
00555         return NULL;
00556 
00557     ip = host_ip(client_addr);
00558 
00559     if (is_allowed_ip(box_allow_ip, box_deny_ip, ip) == 0) {
00560         info(0, "Box connection tried from denied host <%s>, disconnected",
00561                 octstr_get_cstr(ip));
00562         octstr_destroy(ip);
00563         close(newfd);
00564         return NULL;
00565     }
00566     newconn = boxc_create(newfd, ip, ssl);
00567 
00568     /*
00569      * check if the SSL handshake was successfull, otherwise
00570      * this is no valid box connection any more
00571      */
00572 #ifdef HAVE_LIBSSL
00573      if (ssl && !conn_get_ssl(newconn->conn))
00574         return NULL;
00575 #endif
00576 
00577     info(0, "Client connected from <%s> %s", octstr_get_cstr(ip), ssl?"using SSL":"");
00578 
00579     /* XXX TODO: do the hand-shake, baby, yeah-yeah! */
00580 
00581     return newconn;
00582 }
00583 
00584 
00585 
00586 static void run_smsbox(void *arg)
00587 {
00588     Boxc *newconn;
00589     long sender;
00590     Msg *msg;
00591     List *keys;
00592     Octstr *key;
00593 
00594     gwlist_add_producer(flow_threads);
00595     newconn = arg;
00596     newconn->incoming = gwlist_create();
00597     gwlist_add_producer(newconn->incoming);
00598     newconn->retry = incoming_sms;
00599     newconn->outgoing = outgoing_sms;
00600     newconn->sent = dict_create(smsbox_max_pending, NULL);
00601     newconn->pending = semaphore_create(smsbox_max_pending);
00602 
00603     sender = gwthread_create(boxc_sender, newconn);
00604     if (sender == -1) {
00605         error(0, "Failed to start a new thread, disconnecting client <%s>",
00606               octstr_get_cstr(newconn->client_ip));
00607         goto cleanup;
00608     }
00609     /*
00610      * We register newconn in the smsbox_list here but mark newconn as routable
00611      * after identification or first message received from smsbox. So we can avoid
00612      * a race condition for routable smsboxes (otherwise between startup and
00613      * registration we will forward some messages to smsbox).
00614      */
00615     gw_rwlock_wrlock(smsbox_list_rwlock);
00616     gwlist_append(smsbox_list, newconn);
00617     gw_rwlock_unlock(smsbox_list_rwlock);
00618 
00619     gwlist_add_producer(newconn->outgoing);
00620     boxc_receiver(newconn);
00621     gwlist_remove_producer(newconn->outgoing);
00622 
00623     /* remove us from smsbox routing list */
00624     gw_rwlock_wrlock(smsbox_list_rwlock);
00625     gwlist_delete_equal(smsbox_list, newconn);
00626     if (newconn->boxc_id) {
00627         dict_remove(smsbox_by_id, newconn->boxc_id);
00628     }
00629     gw_rwlock_unlock(smsbox_list_rwlock);
00630 
00631     /*
00632      * check if we in the shutdown phase and sms dequeueing thread
00633      *   has removed the producer already
00634      */
00635     if (gwlist_producer_count(newconn->incoming) > 0)
00636         gwlist_remove_producer(newconn->incoming);
00637 
00638     /* check if we are still waiting for ack's and semaphore locked */
00639     if (dict_key_count(newconn->sent) >= smsbox_max_pending)
00640         semaphore_up(newconn->pending); /* allow sender to go down */
00641         
00642     gwthread_join(sender);
00643 
00644     /* put not acked msgs into incoming queue */    
00645     keys = dict_keys(newconn->sent);
00646     while((key = gwlist_extract_first(keys)) != NULL) {
00647         msg = dict_remove(newconn->sent, key);
00648         gwlist_produce(incoming_sms, msg);
00649         octstr_destroy(key);
00650     }
00651     gw_assert(gwlist_len(keys) == 0);
00652     gwlist_destroy(keys, octstr_destroy_item);
00653 
00654     /* clear our send queue */
00655     while((msg = gwlist_extract_first(newconn->incoming)) != NULL) {
00656         gwlist_produce(incoming_sms, msg);
00657     }
00658 
00659 cleanup:
00660     gw_assert(gwlist_len(newconn->incoming) == 0);
00661     gwlist_destroy(newconn->incoming, NULL);
00662     gw_assert(dict_key_count(newconn->sent) == 0);
00663     dict_destroy(newconn->sent);
00664     semaphore_destroy(newconn->pending);
00665     boxc_destroy(newconn);
00666 
00667     /* wakeup the dequeueing thread */
00668     gwthread_wakeup(sms_dequeue_thread);
00669 
00670     gwlist_remove_producer(flow_threads);
00671 }
00672 
00673 
00674 
00675 static void run_wapbox(void *arg)
00676 {
00677     Boxc *newconn;
00678     List *newlist;
00679     long sender;
00680 
00681     gwlist_add_producer(flow_threads);
00682     newconn = arg;
00683     newconn->is_wap = 1;
00684     
00685     /*
00686      * create a new incoming list for just that box,
00687      * and add it to list of list pointers, so we can start
00688      * to route messages to it.
00689      */
00690 
00691     debug("bb", 0, "setting up systems for new wapbox");
00692     
00693     newlist = gwlist_create();
00694     /* this is released by the sender/receiver if it exits */
00695     gwlist_add_producer(newlist);
00696     
00697     newconn->incoming = newlist;
00698     newconn->retry = incoming_wdp;
00699     newconn->outgoing = outgoing_wdp;
00700 
00701     sender = gwthread_create(boxc_sender, newconn);
00702     if (sender == -1) {
00703         error(0, "Failed to start a new thread, disconnecting client <%s>",
00704               octstr_get_cstr(newconn->client_ip));
00705         goto cleanup;
00706     }
00707     gwlist_append(wapbox_list, newconn);
00708     gwlist_add_producer(newconn->outgoing);
00709     boxc_receiver(newconn);
00710 
00711     /* cleanup after receiver has exited */
00712     
00713     gwlist_remove_producer(newconn->outgoing);
00714     gwlist_lock(wapbox_list);
00715     gwlist_delete_equal(wapbox_list, newconn);
00716     gwlist_unlock(wapbox_list);
00717 
00718     while (gwlist_producer_count(newlist) > 0)
00719         gwlist_remove_producer(newlist);
00720 
00721     newconn->alive = 0;
00722     
00723     gwthread_join(sender);
00724 
00725 cleanup:
00726     gw_assert(gwlist_len(newlist) == 0);
00727     gwlist_destroy(newlist, NULL);
00728     boxc_destroy(newconn);
00729 
00730     gwlist_remove_producer(flow_threads);
00731 }
00732 
00733 
00734 /*------------------------------------------------
00735  * main single thread functions
00736  */
00737 
00738 typedef struct _addrpar {
00739     Octstr *address;
00740     int port;
00741     int wapboxid;
00742 } AddrPar;
00743 
00744 static void ap_destroy(AddrPar *addr)
00745 {
00746     octstr_destroy(addr->address);
00747     gw_free(addr);
00748 }
00749 
00750 static int cmp_route(void *ap, void *ms)
00751 {
00752     AddrPar *addr = ap;
00753     Msg *msg = ms;
00754     
00755     if (msg->wdp_datagram.source_port == addr->port  &&
00756         octstr_compare(msg->wdp_datagram.source_address, addr->address)==0)
00757     return 1;
00758 
00759     return 0;
00760 }
00761 
00762 static int cmp_boxc(void *bc, void *ap)
00763 {
00764     Boxc *boxc = bc;
00765     AddrPar *addr = ap;
00766 
00767     if (boxc->id == addr->wapboxid) return 1;
00768         return 0;
00769 }
00770 
00771 static Boxc *route_msg(List *route_info, Msg *msg)
00772 {
00773     AddrPar *ap;
00774     Boxc *conn, *best;
00775     int i, b, len;
00776     
00777     ap = gwlist_search(route_info, msg, cmp_route);
00778     if (ap == NULL) {
00779         debug("bb.boxc", 0, "Did not find previous routing info for WDP, "
00780               "generating new");
00781 route:
00782 
00783         if (gwlist_len(wapbox_list) == 0)
00784             return NULL;
00785 
00786         gwlist_lock(wapbox_list);
00787 
00788     /* take random wapbox from list, and then check all wapboxes
00789      * and select the one with lowest load level - if tied, the first
00790      * one
00791      */
00792         len = gwlist_len(wapbox_list);
00793         b = gw_rand() % len;
00794         best = gwlist_get(wapbox_list, b);
00795 
00796         for(i = 0; i < gwlist_len(wapbox_list); i++) {
00797             conn = gwlist_get(wapbox_list, (i+b) % len);
00798             if (conn != NULL && best != NULL)
00799                 if (conn->load < best->load)
00800                     best = conn;
00801         }
00802         if (best == NULL) {
00803             warning(0, "wapbox_list empty!");
00804             gwlist_unlock(wapbox_list);
00805             return NULL;
00806         }
00807         conn = best;
00808         conn->load++;   /* simulate new client until we get new values */
00809     
00810         ap = gw_malloc(sizeof(AddrPar));
00811         ap->address = octstr_duplicate(msg->wdp_datagram.source_address);
00812         ap->port = msg->wdp_datagram.source_port;
00813         ap->wapboxid = conn->id;
00814         gwlist_produce(route_info, ap);
00815 
00816         gwlist_unlock(wapbox_list);
00817     } else
00818         conn = gwlist_search(wapbox_list, ap, cmp_boxc);
00819 
00820     if (conn == NULL) {
00821     /* routing failed; wapbox has disappeared!
00822      * ..remove routing info and re-route   */
00823 
00824         debug("bb.boxc", 0, "Old wapbox has disappeared, re-routing");
00825 
00826         gwlist_delete_equal(route_info, ap);
00827         ap_destroy(ap);
00828         goto route;
00829     }
00830     return conn;
00831 }
00832 
00833 
00834 /*
00835  * this thread listens to incoming_wdp list
00836  * and then routs messages to proper wapbox
00837  */
00838 static void wdp_to_wapboxes(void *arg)
00839 {
00840     List *route_info;
00841     AddrPar *ap;
00842     Boxc *conn;
00843     Msg *msg;
00844     int i;
00845 
00846     gwlist_add_producer(flow_threads);
00847     gwlist_add_producer(wapbox_list);
00848 
00849     route_info = gwlist_create();
00850 
00851     
00852     while(bb_status != BB_DEAD) {
00853 
00854         gwlist_consume(suspended);  /* block here if suspended */
00855 
00856         if ((msg = gwlist_consume(incoming_wdp)) == NULL)
00857              break;
00858 
00859         gw_assert(msg_type(msg) == wdp_datagram);
00860 
00861         conn = route_msg(route_info, msg);
00862         if (conn == NULL) {
00863             warning(0, "Cannot route message, discard it");
00864             msg_destroy(msg);
00865             continue;
00866         }
00867         gwlist_produce(conn->incoming, msg);
00868     }
00869     debug("bb", 0, "wdp_to_wapboxes: destroying lists");
00870     while((ap = gwlist_extract_first(route_info)) != NULL)
00871     ap_destroy(ap);
00872 
00873     gw_assert(gwlist_len(route_info) == 0);
00874     gwlist_destroy(route_info, NULL);
00875 
00876     gwlist_lock(wapbox_list);
00877     for(i=0; i < gwlist_len(wapbox_list); i++) {
00878         conn = gwlist_get(wapbox_list, i);
00879         gwlist_remove_producer(conn->incoming);
00880         conn->alive = 0;
00881     }
00882     gwlist_unlock(wapbox_list);
00883 
00884     gwlist_remove_producer(wapbox_list);
00885     gwlist_remove_producer(flow_threads);
00886 }
00887 
00888 
00889 static void wait_for_connections(int fd, void (*function) (void *arg), 
00890                                  List *waited, int ssl)
00891 {
00892     int ret;
00893     int timeout = 10; /* 10 sec. */
00894 
00895     gw_assert(function != NULL);
00896     
00897     while(bb_status != BB_DEAD) {
00898 
00899         /* if we are being shutdowned, as long as there is
00900          * messages in incoming list allow new connections, but when
00901          * list is empty, exit.
00902          * Note: We have timeout (defined above) for which we allow new connections.
00903          *           Otherwise we wait here for ever!
00904          */
00905         if (bb_status == BB_SHUTDOWN) {
00906             ret = gwlist_wait_until_nonempty(waited);
00907             if (ret == -1 || !timeout)
00908                 break;
00909             else
00910                 timeout--;
00911         }
00912 
00913         /* block here if suspended */
00914         gwlist_consume(suspended);
00915 
00916         ret = gwthread_pollfd(fd, POLLIN, 1.0);
00917         if (ret > 0) {
00918             Boxc *newconn = accept_boxc(fd, ssl);
00919             if (newconn != NULL) {
00920                 gwthread_create(function, newconn);
00921                 gwthread_sleep(1.0);
00922             } else {
00923                 error(0, "Failed to create new boxc connection.");
00924             }
00925         } else if (ret < 0 && errno != EINTR && errno != EAGAIN)
00926             error(errno, "bb_boxc::wait_for_connections failed");
00927     }
00928 }
00929 
00930 
00931 
00932 static void smsboxc_run(void *arg)
00933 {
00934     int fd;
00935     int port;
00936 
00937     gwlist_add_producer(flow_threads);
00938     gwthread_wakeup(MAIN_THREAD_ID);
00939     port = (int) *((long *)arg);
00940     
00941     fd = make_server_socket(port, NULL); 
00942     /* XXX add interface_name if required */
00943 
00944     if (fd < 0) {
00945         panic(0, "Could not open smsbox port %d", port);
00946     }
00947 
00948     /*
00949      * infinitely wait for new connections;
00950      * to shut down the system, SIGTERM is send and then
00951      * select drops with error, so we can check the status
00952      */
00953     wait_for_connections(fd, run_smsbox, incoming_sms, smsbox_port_ssl);
00954 
00955     gwlist_remove_producer(smsbox_list);
00956 
00957     /* continue avalanche */
00958     gwlist_remove_producer(outgoing_sms);
00959 
00960     /* all connections do the same, so that all must remove() before it
00961      * is completely over
00962      */
00963     while(gwlist_wait_until_nonempty(smsbox_list) == 1)
00964         gwthread_sleep(1.0);
00965 
00966     /* close listen socket */
00967     close(fd);
00968 
00969     gwthread_wakeup(sms_dequeue_thread);
00970     gwthread_join(sms_dequeue_thread);
00971 
00972     gwlist_destroy(smsbox_list, NULL);
00973     smsbox_list = NULL;
00974     gw_rwlock_destroy(smsbox_list_rwlock);
00975     smsbox_list_rwlock = NULL;
00976 
00977     /* destroy things related to smsbox routing */
00978     dict_destroy(smsbox_by_id);
00979     smsbox_by_id = NULL;
00980     dict_destroy(smsbox_by_smsc);
00981     smsbox_by_smsc = NULL;
00982     dict_destroy(smsbox_by_receiver);
00983     smsbox_by_receiver = NULL;
00984     dict_destroy(smsbox_by_smsc_receiver);
00985     smsbox_by_smsc_receiver = NULL;
00986     
00987     gwlist_remove_producer(flow_threads);
00988 }
00989 
00990 
00991 static void wapboxc_run(void *arg)
00992 {
00993     int fd, port;
00994 
00995     gwlist_add_producer(flow_threads);
00996     gwthread_wakeup(MAIN_THREAD_ID);
00997     port = (int) *((long*)arg);
00998     
00999     fd = make_server_socket(port, NULL);
01000         /* XXX add interface_name if required */
01001 
01002     if (fd < 0) {
01003         panic(0, "Could not open wapbox port %d", port);
01004     }
01005 
01006     wait_for_connections(fd, run_wapbox, incoming_wdp, wapbox_port_ssl);
01007 
01008     /* continue avalanche */
01009 
01010     gwlist_remove_producer(outgoing_wdp);
01011 
01012 
01013     /* wait for all connections to die and then remove list
01014      */
01015     
01016     while(gwlist_wait_until_nonempty(wapbox_list) == 1)
01017         gwthread_sleep(1.0);
01018 
01019     /* wait for wdp_to_wapboxes to exit */
01020     while(gwlist_consume(wapbox_list)!=NULL)
01021     ;
01022     
01023     /* close listen socket */
01024     close(fd);   
01025  
01026     gwlist_destroy(wapbox_list, NULL);
01027     wapbox_list = NULL;
01028     
01029     gwlist_remove_producer(flow_threads);
01030 }
01031 
01032 
01033 /*
01034  * Populates the corresponding smsbox_by_foobar dictionary hash tables
01035  */
01036 static void init_smsbox_routes(Cfg *cfg)
01037 {
01038     CfgGroup *grp;
01039     List *list, *items;
01040     Octstr *boxc_id, *smsc_ids, *shortcuts;
01041     int i, j;
01042 
01043     boxc_id = smsc_ids = shortcuts = NULL;
01044 
01045     list = cfg_get_multi_group(cfg, octstr_imm("smsbox-route")); 
01046  
01047     /* loop multi-group "smsbox-route" */
01048     while (list && (grp = gwlist_extract_first(list)) != NULL) { 
01049          
01050         if ((boxc_id = cfg_get(grp, octstr_imm("smsbox-id"))) == NULL) { 
01051             grp_dump(grp); 
01052             panic(0,"'smsbox-route' group without valid 'smsbox-id' directive!"); 
01053         }
01054 
01055         /*
01056          * If smsc-id is given, then any message comming from the specified
01057          * smsc-id in the list will be routed to this smsbox instance.
01058          * If shortcode is given, then any message with receiver number 
01059          * matching those will be routed to this smsbox instance.
01060          * If both are given, then only receiver within shortcode originating
01061          * from smsc-id list will be routed to this smsbox instance. So if both
01062          * are present then this is a logical AND operation.
01063          */
01064         smsc_ids = cfg_get(grp, octstr_imm("smsc-id"));
01065         shortcuts = cfg_get(grp, octstr_imm("shortcode"));
01066 
01067         /* consider now the 3 possibilities: */
01068         if (smsc_ids && !shortcuts) {
01069             /* smsc-id only, so all MO traffic */
01070             items = octstr_split(smsc_ids, octstr_imm(";"));
01071             for (i = 0; i < gwlist_len(items); i++) {
01072                 Octstr *item = gwlist_get(items, i);
01073                 octstr_strip_blanks(item);
01074 
01075                 debug("bb.boxc",0,"Adding smsbox routing to id <%s> for smsc id <%s>",
01076                       octstr_get_cstr(boxc_id), octstr_get_cstr(item));
01077 
01078                 if (!dict_put_once(smsbox_by_smsc, item, octstr_duplicate(boxc_id)))
01079                     panic(0, "Routing for smsc-id <%s> already exists!",
01080                           octstr_get_cstr(item));
01081             }
01082             gwlist_destroy(items, octstr_destroy_item);
01083             octstr_destroy(smsc_ids);
01084         }
01085         else if (!smsc_ids && shortcuts) {
01086             /* shortcode only, so these MOs from all smscs */
01087             items = octstr_split(shortcuts, octstr_imm(";"));
01088             for (i = 0; i < gwlist_len(items); i++) {
01089                 Octstr *item = gwlist_get(items, i);
01090                 octstr_strip_blanks(item);
01091 
01092                 debug("bb.boxc",0,"Adding smsbox routing to id <%s> for receiver no <%s>",
01093                       octstr_get_cstr(boxc_id), octstr_get_cstr(item));
01094             
01095                 if (!dict_put_once(smsbox_by_receiver, item, octstr_duplicate(boxc_id)))
01096                     panic(0, "Routing for receiver no <%s> already exists!",
01097                           octstr_get_cstr(item));
01098             }
01099             gwlist_destroy(items, octstr_destroy_item);
01100             octstr_destroy(shortcuts);
01101         }
01102         else if (smsc_ids && shortcuts) {
01103             /* both, so only specified MOs from specified smscs */
01104             items = octstr_split(shortcuts, octstr_imm(";"));
01105             for (i = 0; i < gwlist_len(items); i++) {
01106                 List *subitems;
01107                 Octstr *item = gwlist_get(items, i);
01108                 octstr_strip_blanks(item);
01109                 subitems = octstr_split(smsc_ids, octstr_imm(";")); 
01110                 for (j = 0; j < gwlist_len(subitems); j++) {
01111                     Octstr *subitem = gwlist_get(subitems, j);
01112                     octstr_strip_blanks(subitem);
01113                     
01114                     debug("bb.boxc",0,"Adding smsbox routing to id <%s> "
01115                           "for receiver no <%s> and smsc id <%s>",
01116                           octstr_get_cstr(boxc_id), octstr_get_cstr(item),
01117                           octstr_get_cstr(subitem));
01118             
01119                     /* construct the dict key '<shortcode>:<smsc-id>' */
01120                     octstr_insert(subitem, item, 0);
01121                     octstr_insert_char(subitem, octstr_len(item), ':');
01122                     if (!dict_put_once(smsbox_by_smsc_receiver, subitem, octstr_duplicate(boxc_id)))
01123                         panic(0, "Routing for receiver:smsc <%s> already exists!",
01124                               octstr_get_cstr(subitem));
01125                 }
01126                 gwlist_destroy(subitems, octstr_destroy_item);
01127             }
01128             gwlist_destroy(items, octstr_destroy_item);
01129             octstr_destroy(shortcuts);
01130         }
01131         octstr_destroy(boxc_id);
01132     }
01133 
01134     gwlist_destroy(list, NULL);
01135 }
01136 
01137 
01138 /*-------------------------------------------------------------
01139  * public functions
01140  *
01141  * SMSBOX
01142  */
01143 
01144 int smsbox_start(Cfg *cfg)
01145 {
01146     CfgGroup *grp;
01147     
01148     if (smsbox_running) return -1;
01149 
01150     debug("bb", 0, "starting smsbox connection module");
01151 
01152     grp = cfg_get_single_group(cfg, octstr_imm("core"));
01153     if (cfg_get_integer(&smsbox_port, grp, octstr_imm("smsbox-port")) == -1) {
01154         error(0, "Missing smsbox-port variable, cannot start smsboxes");
01155         return -1;
01156     }
01157 #ifdef HAVE_LIBSSL
01158     cfg_get_bool(&smsbox_port_ssl, grp, octstr_imm("smsbox-port-ssl"));
01159 #endif /* HAVE_LIBSSL */
01160 
01161     if (smsbox_port_ssl)
01162         debug("bb", 0, "smsbox connection module is SSL-enabled");
01163         
01164     if (cfg_get_integer(&smsbox_max_pending, grp, octstr_imm("smsbox-max-pending")) == -1) {
01165         smsbox_max_pending = SMSBOX_MAX_PENDING;
01166         info(0, "BOXC: 'smsbox-max-pending' not set, using default (%ld).", smsbox_max_pending);
01167     }
01168     
01169     smsbox_list = gwlist_create();  /* have a list of connections */
01170     smsbox_list_rwlock = gw_rwlock_create();
01171     if (!boxid)
01172         boxid = counter_create();
01173 
01174     /* the smsbox routing specific inits */
01175     smsbox_by_id = dict_create(10, NULL);  /* and a hash directory of identified */
01176     smsbox_by_smsc = dict_create(30, (void(*)(void *)) octstr_destroy);
01177     smsbox_by_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
01178     smsbox_by_smsc_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
01179 
01180     /* load the defined smsbox routing rules */
01181     init_smsbox_routes(cfg);
01182 
01183     gwlist_add_producer(outgoing_sms);
01184     gwlist_add_producer(smsbox_list);
01185 
01186     smsbox_running = 1;
01187     
01188     if ((sms_dequeue_thread = gwthread_create(sms_to_smsboxes, NULL)) == -1)
01189         panic(0, "Failed to start a new thread for smsbox routing");
01190 
01191     if (gwthread_create(smsboxc_run, &smsbox_port) == -1)
01192         panic(0, "Failed to start a new thread for smsbox connections");
01193 
01194     return 0;
01195 }
01196 
01197 
01198 int smsbox_restart(Cfg *cfg)
01199 {
01200     if (!smsbox_running) return -1;
01201     
01202     /* send new config to clients */
01203 
01204     return 0;
01205 }
01206 
01207 
01208 
01209 /* WAPBOX */
01210 
01211 int wapbox_start(Cfg *cfg)
01212 {
01213     CfgGroup *grp;
01214 
01215     if (wapbox_running) return -1;
01216 
01217     debug("bb", 0, "starting wapbox connection module");
01218     
01219     grp = cfg_get_single_group(cfg, octstr_imm("core"));
01220     
01221     if (cfg_get_integer(&wapbox_port, grp, octstr_imm("wapbox-port")) == -1) {
01222         error(0, "Missing wapbox-port variable, cannot start WAP");
01223         return -1;
01224     }
01225 #ifdef HAVE_LIBSSL
01226     cfg_get_bool(&wapbox_port_ssl, grp, octstr_imm("wapbox-port-ssl"));
01227 #endif /* HAVE_LIBSSL */
01228   
01229     box_allow_ip = cfg_get(grp, octstr_imm("box-allow-ip"));
01230     if (box_allow_ip == NULL)
01231         box_allow_ip = octstr_create("");
01232     box_deny_ip = cfg_get(grp, octstr_imm("box-deny-ip"));
01233     if (box_deny_ip == NULL)
01234         box_deny_ip = octstr_create("");
01235     if (box_allow_ip != NULL && box_deny_ip == NULL)
01236         info(0, "Box connection allowed IPs defined without any denied...");
01237     
01238     wapbox_list = gwlist_create();  /* have a list of connections */
01239     gwlist_add_producer(outgoing_wdp);
01240     if (!boxid)
01241         boxid = counter_create();
01242 
01243     if (gwthread_create(wdp_to_wapboxes, NULL) == -1)
01244         panic(0, "Failed to start a new thread for wapbox routing");
01245  
01246     if (gwthread_create(wapboxc_run, &wapbox_port) == -1)
01247         panic(0, "Failed to start a new thread for wapbox connections");
01248 
01249     wapbox_running = 1;
01250     return 0;
01251 }
01252 
01253 
01254 Octstr *boxc_status(int status_type)
01255 {
01256     Octstr *tmp;
01257     char *lb, *ws;
01258     int i, boxes, para = 0;
01259     time_t orig, t;
01260     Boxc *bi;
01261 
01262     orig = time(NULL);
01263 
01264     /*
01265      * XXX: this will cause segmentation fault if this is called
01266      *    between 'destroy_list and setting list to NULL calls.
01267      *    Ok, this has to be fixed, but now I am too tired.
01268      */
01269     
01270     if ((lb = bb_status_linebreak(status_type))==NULL)
01271         return octstr_create("Un-supported format");
01272 
01273     if (status_type == BBSTATUS_HTML)
01274         ws = "&nbsp;&nbsp;&nbsp;&nbsp;";
01275     else if (status_type == BBSTATUS_TEXT)
01276         ws = "    ";
01277     else
01278         ws = "";
01279 
01280     if (status_type == BBSTATUS_HTML || status_type == BBSTATUS_WML)
01281         para = 1;
01282     
01283     if (status_type == BBSTATUS_XML) {
01284         tmp = octstr_create ("");
01285         octstr_append_cstr(tmp, "<boxes>\n\t");
01286     }
01287     else
01288         tmp = octstr_format("%sBox connections:%s", para ? "<p>" : "", lb);
01289     boxes = 0;
01290     
01291     if (wapbox_list) {
01292         gwlist_lock(wapbox_list);
01293         for(i=0; i < gwlist_len(wapbox_list); i++) {
01294             bi = gwlist_get(wapbox_list, i);
01295             if (bi->alive == 0)
01296                 continue;
01297             t = orig - bi->connect_time;
01298             if (status_type == BBSTATUS_XML)
01299                 octstr_format_append(tmp,
01300                 "<box>\n\t\t<type>wapbox</type>\n\t\t<IP>%s</IP>\n"
01301                 "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
01302                 "\t\t<ssl>%s</ssl>\n\t</box>\n",
01303                 octstr_get_cstr(bi->client_ip),
01304                 t/3600/24, t/3600%24, t/60%60, t%60,
01305 #ifdef HAVE_LIBSSL
01306                 conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
01307 #else 
01308                 "not installed"
01309 #endif
01310                 );
01311             else
01312                 octstr_format_append(tmp,
01313                 "%swapbox, IP %s (on-line %ldd %ldh %ldm %lds) %s %s",
01314                 ws, octstr_get_cstr(bi->client_ip),
01315                 t/3600/24, t/3600%24, t/60%60, t%60, 
01316 #ifdef HAVE_LIBSSL
01317                 conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
01318 #else
01319                 "",
01320 #endif 
01321                 lb);
01322                 boxes++;
01323            }
01324            gwlist_unlock(wapbox_list);
01325         }
01326         if (smsbox_list) {
01327             gw_rwlock_rdlock(smsbox_list_rwlock);
01328         for(i=0; i < gwlist_len(smsbox_list); i++) {
01329             bi = gwlist_get(smsbox_list, i);
01330             if (bi->alive == 0)
01331                 continue;
01332             t = orig - bi->connect_time;
01333             if (status_type == BBSTATUS_XML)
01334                 octstr_format_append(tmp, "<box>\n\t\t<type>smsbox</type>\n"
01335                     "\t\t<id>%s</id>\n\t\t<IP>%s</IP>\n"
01336                     "\t\t<queue>%ld</queue>\n"
01337                     "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
01338                     "\t\t<ssl>%s</ssl>\n\t</box>",
01339                     (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : ""),
01340                     octstr_get_cstr(bi->client_ip),
01341                     gwlist_len(bi->incoming) + dict_key_count(bi->sent),
01342                     t/3600/24, t/3600%24, t/60%60, t%60,
01343 #ifdef HAVE_LIBSSL
01344                     conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
01345 #else 
01346                     "not installed"
01347 #endif
01348                     );
01349             else
01350                 octstr_format_append(tmp, "%ssmsbox:%s, IP %s (%ld queued), (on-line %ldd %ldh %ldm %lds) %s %s",
01351                     ws, (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : "(none)"), 
01352                     octstr_get_cstr(bi->client_ip), gwlist_len(bi->incoming) + dict_key_count(bi->sent),
01353                     t/3600/24, t/3600%24, t/60%60, t%60, 
01354 #ifdef HAVE_LIBSSL
01355                     conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
01356 #else
01357                     "",
01358 #endif 
01359                     lb);
01360            boxes++;
01361         }
01362         gw_rwlock_unlock(smsbox_list_rwlock);
01363     }
01364     if (boxes == 0 && status_type != BBSTATUS_XML) {
01365         octstr_destroy(tmp);
01366         tmp = octstr_format("%sNo boxes connected", para ? "<p>" : "");
01367     }
01368     if (para)
01369         octstr_append_cstr(tmp, "</p>");
01370     if (status_type == BBSTATUS_XML)
01371         octstr_append_cstr(tmp, "</boxes>\n");
01372     else
01373         octstr_append_cstr(tmp, "\n\n");
01374     return tmp;
01375 }
01376 
01377 
01378 int boxc_incoming_wdp_queue(void)
01379 {
01380     int i, q = 0;
01381     Boxc *boxc;
01382     
01383     if (wapbox_list) {
01384         gwlist_lock(wapbox_list);
01385         for(i=0; i < gwlist_len(wapbox_list); i++) {
01386             boxc = gwlist_get(wapbox_list, i);
01387             q += gwlist_len(boxc->incoming);
01388         }
01389         gwlist_unlock(wapbox_list);
01390     }
01391     return q;
01392 }
01393 
01394 
01395 void boxc_cleanup(void)
01396 {
01397     octstr_destroy(box_allow_ip);
01398     octstr_destroy(box_deny_ip);
01399     box_allow_ip = NULL;
01400     box_deny_ip = NULL;
01401     counter_destroy(boxid);
01402     boxid = NULL;
01403 }
01404 
01405 
01406 /*
01407  * Route the incoming message to one of the following input queues:
01408  *   a specific smsbox conn
01409  *   a random smsbox conn if no shortcut routing and msg->sms.boxc_id match
01410  *
01411  * BEWARE: All logic inside here should be fast, hence speed processing
01412  * optimized, because every single MO message passes this function and we 
01413  * have to ensure that no unncessary overhead is done.
01414  */
01415 int route_incoming_to_boxc(Msg *msg)
01416 {
01417     Boxc *bc = NULL, *best = NULL;
01418     Octstr *s, *r, *rs;
01419     long len, b, i;
01420     int full_found = 0;
01421 
01422     s = r = NULL;
01423     gw_assert(msg_type(msg) == sms);
01424 
01425     /* msg_dump(msg, 0); */
01426 
01427     /* 
01428      * We have a specific route to pass this msg to smsbox-id 
01429      * Lookup the connection in the dictionary.
01430      */
01431     gw_rwlock_rdlock(smsbox_list_rwlock);
01432     if (gwlist_len(smsbox_list) == 0) {
01433         gw_rwlock_unlock(smsbox_list_rwlock);
01434         warning(0, "smsbox_list empty!");
01435         if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
01436             gwlist_produce(incoming_sms, msg);
01437         return 0;
01438          }
01439          else
01440              return -1;
01441     }
01442 
01443     if (octstr_len(msg->sms.boxc_id) > 0) {
01444         
01445         bc = dict_get(smsbox_by_id, msg->sms.boxc_id);
01446         if (bc == NULL) {
01447             /*
01448              * something is wrong, this was the smsbox connection we used
01449              * for sending, so it seems this smsbox is gone
01450              */
01451             warning(0,"Could not route message to smsbox id <%s>, smsbox is gone!",
01452                     octstr_get_cstr(msg->sms.boxc_id));
01453         }
01454     }
01455     else {
01456         /*
01457          * Check if we have a "smsbox-route" for this msg.
01458          * Where the shortcode route has a higher priority then the smsc-id rule.
01459          * Highest priority has the combined <shortcode>:<smsc-id> route.
01460          */
01461         Octstr *os = octstr_format("%s:%s", 
01462                                    octstr_get_cstr(msg->sms.receiver),
01463                                    octstr_get_cstr(msg->sms.smsc_id));
01464         s = (msg->sms.smsc_id ? dict_get(smsbox_by_smsc, msg->sms.smsc_id) : NULL);
01465         r = (msg->sms.receiver ? dict_get(smsbox_by_receiver, msg->sms.receiver) : NULL);
01466         rs = (os ? dict_get(smsbox_by_smsc_receiver, os) : NULL);
01467         octstr_destroy(os); 
01468         bc = rs ? dict_get(smsbox_by_id, rs) : 
01469             (r ? dict_get(smsbox_by_id, r) : (s ? dict_get(smsbox_by_id, s) : NULL));
01470     }
01471 
01472     /* check if we found our routing */
01473     if (bc != NULL) {
01474         if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(bc->incoming)) {
01475             gwlist_produce(bc->incoming, msg);
01476             gw_rwlock_unlock(smsbox_list_rwlock);
01477             return 1; /* we are done */
01478         }
01479         else {
01480             gw_rwlock_unlock(smsbox_list_rwlock);
01481             return -1;
01482         }
01483     }
01484     else if (s != NULL || r != NULL || octstr_len(msg->sms.boxc_id) > 0) {
01485         gw_rwlock_unlock(smsbox_list_rwlock);
01486         /*
01487          * we have routing defined, but no smsbox connected at the moment.
01488          * put msg into global incoming queue and wait until smsbox with
01489          * such boxc_id connected.
01490          */
01491         if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
01492             gwlist_produce(incoming_sms, msg);
01493             return 0;
01494         }
01495         else
01496             return -1;
01497     }
01498 
01499     /*
01500      * ok, none of the routing things applied previously, so route it to
01501      * a random smsbox.
01502      */
01503 
01504     /* take random smsbox from list, and then check all smsboxes
01505      * and select the one with lowest load level - if tied, the first
01506      * one
01507      */
01508     len = gwlist_len(smsbox_list);
01509     b = gw_rand() % len;
01510 
01511     for(i = 0; i < gwlist_len(smsbox_list); i++) {
01512     bc = gwlist_get(smsbox_list, (i+b) % len);
01513 
01514         if (bc->boxc_id != NULL || bc->routable == 0)
01515             bc = NULL;
01516 
01517         if (bc != NULL && max_incoming_sms_qlength > 0 &&
01518             gwlist_len(bc->incoming) > max_incoming_sms_qlength) {
01519             full_found = 1;
01520             bc = NULL;
01521         }
01522 
01523         if ((bc != NULL && best != NULL && bc->load < best->load) ||
01524              (bc != NULL && best == NULL)) {
01525         best = bc;
01526         }
01527     }
01528 
01529     if (best != NULL) {
01530         best->load++;
01531         gwlist_produce(best->incoming, msg);
01532     }
01533 
01534     gw_rwlock_unlock(smsbox_list_rwlock);
01535 
01536     if (best == NULL && full_found == 0) {
01537     warning(0, "smsbox_list empty!");
01538         if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
01539             gwlist_produce(incoming_sms, msg);
01540         return 0;
01541          }
01542          else
01543              return -1;
01544     }
01545     else if (best == NULL && full_found == 1)
01546         return -1;
01547 
01548     return 1;
01549 }
01550 
01551 static void sms_to_smsboxes(void *arg)
01552 {
01553     Msg *newmsg, *startmsg, *msg;
01554     long i, len;
01555     int ret = -1;
01556     Boxc *boxc;
01557 
01558     gwlist_add_producer(flow_threads);
01559 
01560     newmsg = startmsg = msg = NULL;
01561 
01562     while(bb_status != BB_DEAD) {
01563 
01564         if (newmsg == startmsg) {
01565             /* check if we are in shutdown phase */
01566             if (gwlist_producer_count(smsbox_list) == 0)
01567                 break;
01568 
01569             if (ret == 0 || ret == -1) {
01570                 /* debug("", 0, "time to sleep"); */
01571                 gwthread_sleep(60.0);
01572                 /* debug("", 0, "wake up list len %ld", gwlist_len(incoming_sms)); */
01573                 /* shutdown ? */
01574                 if (gwlist_producer_count(smsbox_list) == 0 && gwlist_len(smsbox_list) == 0)
01575                     break;
01576             }
01577             startmsg = msg = gwlist_consume(incoming_sms);
01578             /* debug("", 0, "gwlist_consume done 1"); */
01579             newmsg = NULL;
01580         }
01581         else {
01582             newmsg = msg = gwlist_consume(incoming_sms);
01583             
01584             /* Back at the first message? */
01585             if (newmsg == startmsg) {
01586                 gwlist_insert(incoming_sms, 0, msg);
01587                 continue;
01588             }
01589         }
01590 
01591         if (msg == NULL)
01592             break;
01593 
01594         gw_assert(msg_type(msg) == sms);
01595 
01596         /* debug("bb.sms", 0, "sms_boxc_router: handling message (%p vs %p)",
01597               msg, startmsg); */
01598 
01599         ret = route_incoming_to_boxc(msg);
01600         if (ret == 1)
01601             startmsg = newmsg = NULL;
01602         else if (ret == -1) {
01603             gwlist_produce(incoming_sms, msg);
01604         }
01605     }
01606 
01607     gw_rwlock_rdlock(smsbox_list_rwlock);
01608     len = gwlist_len(smsbox_list);
01609     for (i=0; i < len; i++) {
01610         boxc = gwlist_get(smsbox_list, i);
01611         gwlist_remove_producer(boxc->incoming);
01612     }
01613     gw_rwlock_unlock(smsbox_list_rwlock);
01614 
01615     gwlist_remove_producer(flow_threads);
01616 }
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.