Kannel: Open Source WAP and SMS gateway  $Revision: 5037 $
bb_boxc.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2016 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * bb_boxc.c : bearerbox box connection module
59  *
60  * handles start/restart/stop/suspend/die operations of the sms and
61  * wapbox connections
62  *
63  * Kalle Marjola 2000 for project Kannel
64  * Alexander Malysh (various fixes)
65  */
66 
67 #include <errno.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <time.h>
71 #include <string.h>
72 #include <sys/time.h>
73 #include <sys/types.h>
74 #include <sys/socket.h>
75 #include <unistd.h>
76 #include <signal.h>
77 
78 #include "gwlib/gwlib.h"
79 #include "msg.h"
80 #include "bearerbox.h"
81 #include "bb_smscconn_cb.h"
82 
83 #define SMSBOX_MAX_PENDING 100
84 
85 /* passed from bearerbox core */
86 
87 extern volatile sig_atomic_t bb_status;
88 extern volatile sig_atomic_t restart;
89 extern List *incoming_sms;
90 extern List *outgoing_sms;
91 extern List *incoming_wdp;
92 extern List *outgoing_wdp;
93 
94 extern List *flow_threads;
95 extern List *suspended;
96 
97 /* incoming/outgoing sms queue control */
98 extern long max_incoming_sms_qlength;
99 
100 
101 /* our own thingies */
102 
103 static volatile sig_atomic_t smsbox_running;
104 static volatile sig_atomic_t wapbox_running;
108 
109 /* dictionaries for holding the smsbox routing information */
114 
115 static long smsbox_port;
116 static int smsbox_port_ssl;
118 static long wapbox_port;
119 static int wapbox_port_ssl;
120 
121 /* max pending messages on the line to smsbox */
122 static long smsbox_max_pending;
123 
126 
127 
128 static Counter *boxid;
129 
130 /* sms_to_smsboxes thread-id */
131 static long sms_dequeue_thread;
132 
133 
134 typedef struct _boxc {
136  int is_wap;
137  long id;
138  int load;
139  time_t connect_time;
142  List *retry; /* If sending fails */
146  volatile sig_atomic_t alive;
147  Octstr *boxc_id; /* identifies the connected smsbox instance */
148  /* used to mark connection usable or still waiting for ident. msg */
149  volatile int routable;
150 } Boxc;
151 
152 
153 /* forward declaration */
154 static void sms_to_smsboxes(void *arg);
155 static int send_msg(Boxc *boxconn, Msg *pmsg);
156 static void boxc_sent_push(Boxc*, Msg*);
157 static void boxc_sent_pop(Boxc*, Msg*, Msg**);
158 static void boxc_gwlist_destroy(List *list);
159 
160 
161 /*-------------------------------------------------
162  * receiver thingies
163  */
164 
165 static Msg *read_from_box(Boxc *boxconn)
166 {
167  int ret;
168  Octstr *pack;
169  Msg *msg;
170 
171  pack = NULL;
172  while (bb_status != BB_DEAD && boxconn->alive) {
173  /* XXX: if box doesn't send (just keep conn open) we block here while shutdown */
174  pack = conn_read_withlen(boxconn->conn);
175  gw_claim_area(pack);
176  if (pack != NULL)
177  break;
178  if (conn_error(boxconn->conn)) {
179  info(0, "Read error when reading from box <%s>, disconnecting",
180  octstr_get_cstr(boxconn->client_ip));
181  return NULL;
182  }
183  if (conn_eof(boxconn->conn)) {
184  info(0, "Connection closed by the box <%s>",
185  octstr_get_cstr(boxconn->client_ip));
186  return NULL;
187  }
188 
189  ret = conn_wait(boxconn->conn, -1.0);
190  if (ret < 0) {
191  error(0, "Connection to box <%s> broke.",
192  octstr_get_cstr(boxconn->client_ip));
193  return NULL;
194  }
195  }
196 
197  if (pack == NULL)
198  return NULL;
199 
200  msg = msg_unpack(pack);
201  octstr_destroy(pack);
202 
203  if (msg == NULL)
204  error(0, "Failed to unpack data!");
205  return msg;
206 }
207 
208 
209 /*
210  * Try to deliver message to internal or smscconn queue
211  * and generate ack/nack for smsbox connections.
212  */
213 static void deliver_sms_to_queue(Msg *msg, Boxc *conn)
214 {
215  Msg *mack;
216  int rc;
217 
218  /*
219  * save modifies ID and time, so if the smsbox uses it, save
220  * it FIRST for the reply message!!!
221  */
222  mack = msg_create(ack);
223  gw_assert(mack != NULL);
224  uuid_copy(mack->ack.id, msg->sms.id);
225  mack->ack.time = msg->sms.time;
226 
227  store_save(msg);
228 
229  rc = smsc2_rout(msg, 0);
230  switch (rc) {
231 
232  case SMSCCONN_SUCCESS:
233  mack->ack.nack = ack_success;
234  break;
235 
236  case SMSCCONN_QUEUED:
237  mack->ack.nack = ack_buffered;
238  break;
239 
240  case SMSCCONN_FAILED_DISCARDED: /* no router at all */
241  warning(0, "Message rejected by bearerbox, no router!");
242 
243  /*
244  * we don't store_save_ack() here, since the call to
245  * bb_smscconn_send_failed() within smsc2_route() did
246  * it already.
247  */
248  mack->ack.nack = ack_failed;
249 
250  /* destroy original message */
251  msg_destroy(msg);
252  break;
253 
254  case SMSCCONN_FAILED_QFULL: /* queue full */
255  warning(0, "Message rejected by bearerbox, %s!",
256  (rc == SMSCCONN_FAILED_DISCARDED) ? "no router" : "queue full");
257  /*
258  * first create nack for store-file, in order to delete
259  * message from store-file.
260  */
261  mack->ack.nack = ack_failed_tmp;
263 
264  /* destroy original message */
265  msg_destroy(msg);
266  break;
267 
268  case SMSCCONN_FAILED_EXPIRED: /* validity expired */
269  warning(0, "Message rejected by bearerbox, validity expired!");
270 
271  /*
272  * we don't store_save_ack() here, since the call to
273  * bb_smscconn_send_failed() within smsc2_route() did
274  * it already.
275  */
276  mack->ack.nack = ack_failed;
277 
278  /* destroy original message */
279  msg_destroy(msg);
280  break;
281 
282  case SMSCCONN_FAILED_REJECTED: /* white/black-list rejection */
283  warning(0, "Message rejected by bearerbox, white/black listed!");
284 
285  mack->ack.nack = ack_failed;
286 
287  /* destroy original message */
288  msg_destroy(msg);
289  break;
290 
291  default:
292  break;
293  }
294 
295  /* put ack into incoming queue of conn */
296  send_msg(conn, mack);
297  msg_destroy(mack);
298 }
299 
300 
301 static void boxc_receiver(void *arg)
302 {
303  Boxc *conn = arg;
304  Msg *msg, *mack;
305 
306  /* remove messages from socket until it is closed */
307  while (bb_status != BB_DEAD && conn->alive) {
308 
309  gwlist_consume(suspended); /* block here if suspended */
310 
311  msg = read_from_box(conn);
312 
313  if (msg == NULL) { /* garbage/connection lost */
314  conn->alive = 0;
315  break;
316  }
317 
318  /* we don't accept new messages in shutdown phase */
319  if ((bb_status == BB_SHUTDOWN || bb_status == BB_DEAD) && msg_type(msg) == sms) {
320  mack = msg_create(ack);
321  uuid_copy(mack->ack.id, msg->sms.id);
322  mack->ack.time = msg->sms.time;
323  mack->ack.nack = ack_failed_tmp;
324  msg_destroy(msg);
325  send_msg(conn, mack);
326  msg_destroy(mack);
327  continue;
328  }
329 
330  if (msg_type(msg) == sms && conn->is_wap == 0) {
331  debug("bb.boxc", 0, "boxc_receiver: sms received");
332 
333  /* deliver message to queue */
334  deliver_sms_to_queue(msg, conn);
335 
336  if (conn->routable == 0) {
337  conn->routable = 1;
338  /* wakeup the dequeue thread */
340  }
341  } else if (msg_type(msg) == wdp_datagram && conn->is_wap) {
342  debug("bb.boxc", 0, "boxc_receiver: got wdp from wapbox");
343 
344  /* XXX we should block these in SHUTDOWN phase too, but
345  we need ack/nack msgs implemented first. */
346  gwlist_produce(conn->outgoing, msg);
347 
348  } else if (msg_type(msg) == sms && conn->is_wap) {
349  debug("bb.boxc", 0, "boxc_receiver: got sms from wapbox");
350 
351  /* should be a WAP push message, so tried it the same way */
352  deliver_sms_to_queue(msg, conn);
353 
354  if (conn->routable == 0) {
355  conn->routable = 1;
356  /* wakeup the dequeue thread */
358  }
359  } else {
360  if (msg_type(msg) == heartbeat) {
361  if (msg->heartbeat.load != conn->load)
362  debug("bb.boxc", 0, "boxc_receiver: heartbeat with "
363  "load value %ld received", msg->heartbeat.load);
364  conn->load = msg->heartbeat.load;
365  }
366  else if (msg_type(msg) == ack) {
367  if (msg->ack.nack == ack_failed_tmp) {
368  Msg *orig;
369  boxc_sent_pop(conn, msg, &orig);
370  if (orig != NULL) /* retry this message */
371  gwlist_append(conn->retry, orig);
372  } else {
373  boxc_sent_pop(conn, msg, NULL);
374  store_save(msg);
375  }
376  debug("bb.boxc", 0, "boxc_receiver: got ack");
377  }
378  /* if this is an identification message from an smsbox instance */
379  else if (msg_type(msg) == admin && msg->admin.command == cmd_identify) {
380 
381  /*
382  * any smsbox sends this command even if boxc_id is NULL,
383  * but we will only consider real identified boxes
384  */
385  if (msg->admin.boxc_id != NULL) {
386 
387  /* Only interested if the connection is not named, or its a different name */
388  if (conn->boxc_id == NULL ||
389  octstr_compare(conn->boxc_id, msg->admin.boxc_id)) {
390  List *boxc_id_list = NULL;
391 
392  /*
393  * Different name, need to remove it from the old list.
394  *
395  * I Don't think this case should ever arise, but might as well
396  * be safe.
397  */
398  if (conn->boxc_id != NULL) {
399 
400  /* Get the list for this box id */
401  boxc_id_list = dict_get(smsbox_by_id, conn->boxc_id);
402 
403  /* Delete the connection from the list */
404  if (boxc_id_list != NULL) {
405  gwlist_delete_equal(boxc_id_list, conn);
406  }
407 
408  octstr_destroy(conn->boxc_id);
409  }
410 
411  /* Get the list for this box id */
412  boxc_id_list = dict_get(smsbox_by_id, msg->admin.boxc_id);
413 
414  /* No list yet, so create it */
415  if (boxc_id_list == NULL) {
416  boxc_id_list = gwlist_create();
417  if (!dict_put_once(smsbox_by_id, msg->admin.boxc_id, boxc_id_list))
418  /* list already added */
419  boxc_id_list = dict_get(smsbox_by_id, msg->admin.boxc_id);
420  }
421 
422  /* Add the connection into the list */
423  gwlist_append(boxc_id_list, conn);
424 
425  conn->boxc_id = msg->admin.boxc_id;
426  }
427  else {
428  octstr_destroy(msg->admin.boxc_id);
429  }
430 
431  msg->admin.boxc_id = NULL;
432 
433  debug("bb.boxc", 0, "boxc_receiver: got boxc_id <%s> from <%s>",
434  octstr_get_cstr(conn->boxc_id),
435  octstr_get_cstr(conn->client_ip));
436  }
437 
438  conn->routable = 1;
439  /* wakeup the dequeue thread */
441  }
442  else
443  warning(0, "boxc_receiver: unknown msg received from <%s>, "
444  "ignored", octstr_get_cstr(conn->client_ip));
445  msg_destroy(msg);
446  }
447  }
448 }
449 
450 
451 /*---------------------------------------------
452  * sender thingies
453  */
454 
455 static int send_msg(Boxc *boxconn, Msg *pmsg)
456 {
457  Octstr *pack;
458 
459  pack = msg_pack(pmsg);
460 
461  if (pack == NULL)
462  return -1;
463 
464  if (boxconn->boxc_id != NULL)
465  debug("bb.boxc", 0, "send_msg: sending msg to boxc: <%s>",
466  octstr_get_cstr(boxconn->boxc_id));
467  else
468  debug("bb.boxc", 0, "send_msg: sending msg to box: <%s>",
469  octstr_get_cstr(boxconn->client_ip));
470 
471  if (conn_write_withlen(boxconn->conn, pack) == -1) {
472  error(0, "Couldn't write Msg to box <%s>, disconnecting",
473  octstr_get_cstr(boxconn->client_ip));
474  octstr_destroy(pack);
475  return -1;
476  }
477 
478  octstr_destroy(pack);
479  return 0;
480 }
481 
482 
483 static void boxc_sent_push(Boxc *conn, Msg *m)
484 {
485  Octstr *os;
486  char id[UUID_STR_LEN + 1];
487 
488  if (conn->is_wap || !conn->sent || !m || msg_type(m) != sms)
489  return;
490 
491  uuid_unparse(m->sms.id, id);
492  os = octstr_create(id);
493  dict_put(conn->sent, os, msg_duplicate(m));
494  semaphore_down(conn->pending);
495  octstr_destroy(os);
496 }
497 
498 
499 /*
500  * Remove msg from sent queue.
501  * Return 0 if message should be deleted from store and 1 if not (e.g. tmp nack)
502  */
503 static void boxc_sent_pop(Boxc *conn, Msg *m, Msg **orig)
504 {
505  Octstr *os;
506  char id[UUID_STR_LEN + 1];
507  Msg *msg;
508 
509  if (conn->is_wap || !conn->sent || !m || (msg_type(m) != ack && msg_type(m) != sms))
510  return;
511 
512  if (orig != NULL)
513  *orig = NULL;
514 
515  uuid_unparse((msg_type(m) == sms ? m->sms.id : m->ack.id), id);
516  os = octstr_create(id);
517  msg = dict_remove(conn->sent, os);
518  octstr_destroy(os);
519  if (!msg) {
520  error(0, "BOXC: Got ack for nonexistend message!");
521  msg_dump(m, 0);
522  return;
523  }
524  semaphore_up(conn->pending);
525  if (orig == NULL)
526  msg_destroy(msg);
527  else
528  *orig = msg;
529 }
530 
531 
532 static void boxc_sender(void *arg)
533 {
534  Msg *msg;
535  Boxc *conn = arg;
536 
537  gwlist_add_producer(flow_threads);
538 
539  while (bb_status != BB_DEAD && conn->alive) {
540 
541  /*
542  * Make sure there's no data left in the outgoing connection before
543  * doing the potentially blocking gwlist_consume()s
544  */
545  conn_flush(conn->conn);
546 
547  gwlist_consume(suspended); /* block here if suspended */
548 
549  if ((msg = gwlist_consume(conn->incoming)) == NULL) {
550  /* tell sms/wapbox to die */
551  msg = msg_create(admin);
552  msg->admin.command = restart ? cmd_restart : cmd_shutdown;
553  send_msg(conn, msg);
554  msg_destroy(msg);
555  break;
556  }
557  if (msg_type(msg) == heartbeat) {
558  debug("bb.boxc", 0, "boxc_sender: catch an heartbeat - we are alive");
559  msg_destroy(msg);
560  continue;
561  }
562  boxc_sent_push(conn, msg);
563  if (!conn->alive || send_msg(conn, msg) == -1) {
564  /* we got message here */
565  boxc_sent_pop(conn, msg, NULL);
566  gwlist_produce(conn->retry, msg);
567  break;
568  }
569  msg_destroy(msg);
570  debug("bb.boxc", 0, "boxc_sender: sent message to <%s>",
571  octstr_get_cstr(conn->client_ip));
572  }
573  /* the client closes the connection, after that die in receiver */
574  /* conn->alive = 0; */
575 
576  /* set conn to unroutable */
577  conn->routable = 0;
578 
579  gwlist_remove_producer(flow_threads);
580 }
581 
582 /*---------------------------------------------------------------
583  * accept/create/kill thingies
584  */
585 
586 
587 static Boxc *boxc_create(int fd, Octstr *ip, int ssl)
588 {
589  Boxc *boxc;
590 
591  boxc = gw_malloc(sizeof(Boxc));
592  boxc->is_wap = 0;
593  boxc->load = 0;
594  boxc->conn = conn_wrap_fd(fd, ssl);
595  boxc->id = counter_increase(boxid);
596  boxc->client_ip = ip;
597  boxc->alive = 1;
598  boxc->connect_time = time(NULL);
599  boxc->boxc_id = NULL;
600  boxc->routable = 0;
601  return boxc;
602 }
603 
604 static void boxc_destroy(Boxc *boxc)
605 {
606  if (boxc == NULL)
607  return;
608 
609  /* do nothing to the lists, as they are only references */
610 
611  if (boxc->conn)
612  conn_destroy(boxc->conn);
613  octstr_destroy(boxc->client_ip);
614  octstr_destroy(boxc->boxc_id);
615  gw_free(boxc);
616 }
617 
618 
619 
620 static Boxc *accept_boxc(int fd, int ssl)
621 {
622  Boxc *newconn;
623  Octstr *ip;
624 
625  int newfd;
626  struct sockaddr_in client_addr;
627  socklen_t client_addr_len;
628 
629  client_addr_len = sizeof(client_addr);
630 
631  newfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
632  if (newfd < 0)
633  return NULL;
634 
635  ip = host_ip(client_addr);
636 
637  if (is_allowed_ip(box_allow_ip, box_deny_ip, ip) == 0) {
638  info(0, "Box connection tried from denied host <%s>, disconnected",
639  octstr_get_cstr(ip));
640  octstr_destroy(ip);
641  close(newfd);
642  return NULL;
643  }
644  newconn = boxc_create(newfd, ip, ssl);
645 
646  /*
647  * check if the SSL handshake was successfull, otherwise
648  * this is no valid box connection any more
649  */
650 #ifdef HAVE_LIBSSL
651  if (ssl && !conn_get_ssl(newconn->conn))
652  return NULL;
653 #endif
654 
655  info(0, "Client connected from <%s> %s", octstr_get_cstr(ip), ssl?"using SSL":"");
656 
657  /* XXX TODO: do the hand-shake, baby, yeah-yeah! */
658 
659  return newconn;
660 }
661 
662 
663 
664 static void run_smsbox(void *arg)
665 {
666  Boxc *newconn;
667  long sender;
668  Msg *msg;
669  List *keys;
670  Octstr *key;
671 
672  gwlist_add_producer(flow_threads);
673  newconn = arg;
674  newconn->incoming = gwlist_create();
675  gwlist_add_producer(newconn->incoming);
676  newconn->retry = incoming_sms;
677  newconn->outgoing = outgoing_sms;
678  newconn->sent = dict_create(smsbox_max_pending, NULL);
680 
681  sender = gwthread_create(boxc_sender, newconn);
682  if (sender == -1) {
683  error(0, "Failed to start a new thread, disconnecting client <%s>",
684  octstr_get_cstr(newconn->client_ip));
685  goto cleanup;
686  }
687  /*
688  * We register newconn in the smsbox_list here but mark newconn as routable
689  * after identification or first message received from smsbox. So we can avoid
690  * a race condition for routable smsboxes (otherwise between startup and
691  * registration we will forward some messages to smsbox).
692  */
693  gw_rwlock_wrlock(smsbox_list_rwlock);
694  gwlist_append(smsbox_list, newconn);
695  gw_rwlock_unlock(smsbox_list_rwlock);
696 
697  gwlist_add_producer(newconn->outgoing);
698  boxc_receiver(newconn);
700 
701  /* remove us from smsbox routing list */
702  gw_rwlock_wrlock(smsbox_list_rwlock);
703  gwlist_delete_equal(smsbox_list, newconn);
704  if (newconn->boxc_id) {
705 
706  /* Get the list, and remove the connection from it */
707  List *boxc_id_list = dict_get(smsbox_by_id, newconn->boxc_id);
708 
709  if(boxc_id_list != NULL) {
710  gwlist_delete_equal(boxc_id_list, newconn);
711  }
712  }
713 
714  gw_rwlock_unlock(smsbox_list_rwlock);
715 
716  /*
717  * check if we in the shutdown phase and sms dequeueing thread
718  * has removed the producer already
719  */
720  if (gwlist_producer_count(newconn->incoming) > 0)
722 
723  /* check if we are still waiting for ack's and semaphore locked */
724  if (dict_key_count(newconn->sent) >= smsbox_max_pending)
725  semaphore_up(newconn->pending); /* allow sender to go down */
726 
727  gwthread_join(sender);
728 
729  /* put not acked msgs into incoming queue */
730  keys = dict_keys(newconn->sent);
731  while((key = gwlist_extract_first(keys)) != NULL) {
732  msg = dict_remove(newconn->sent, key);
733  gwlist_produce(incoming_sms, msg);
734  octstr_destroy(key);
735  }
736  gw_assert(gwlist_len(keys) == 0);
738 
739  /* clear our send queue */
740  while((msg = gwlist_extract_first(newconn->incoming)) != NULL) {
741  gwlist_produce(incoming_sms, msg);
742  }
743 
744 cleanup:
745  gw_assert(gwlist_len(newconn->incoming) == 0);
746  gwlist_destroy(newconn->incoming, NULL);
747  gw_assert(dict_key_count(newconn->sent) == 0);
748  dict_destroy(newconn->sent);
749  semaphore_destroy(newconn->pending);
750  boxc_destroy(newconn);
751 
752  /* wakeup the dequeueing thread */
754 
755  gwlist_remove_producer(flow_threads);
756 }
757 
758 
759 
760 static void run_wapbox(void *arg)
761 {
762  Boxc *newconn;
763  List *newlist;
764  long sender;
765 
766  gwlist_add_producer(flow_threads);
767  newconn = arg;
768  newconn->is_wap = 1;
769 
770  /*
771  * create a new incoming list for just that box,
772  * and add it to list of list pointers, so we can start
773  * to route messages to it.
774  */
775 
776  debug("bb", 0, "setting up systems for new wapbox");
777 
778  newlist = gwlist_create();
779  /* this is released by the sender/receiver if it exits */
780  gwlist_add_producer(newlist);
781 
782  newconn->incoming = newlist;
783  newconn->retry = incoming_wdp;
784  newconn->outgoing = outgoing_wdp;
785 
786  sender = gwthread_create(boxc_sender, newconn);
787  if (sender == -1) {
788  error(0, "Failed to start a new thread, disconnecting client <%s>",
789  octstr_get_cstr(newconn->client_ip));
790  goto cleanup;
791  }
792  gwlist_append(wapbox_list, newconn);
793  gwlist_add_producer(newconn->outgoing);
794  boxc_receiver(newconn);
795 
796  /* cleanup after receiver has exited */
797 
799  gwlist_lock(wapbox_list);
800  gwlist_delete_equal(wapbox_list, newconn);
801  gwlist_unlock(wapbox_list);
802 
803  while (gwlist_producer_count(newlist) > 0)
804  gwlist_remove_producer(newlist);
805 
806  newconn->alive = 0;
807 
808  gwthread_join(sender);
809 
810 cleanup:
811  gw_assert(gwlist_len(newlist) == 0);
812  gwlist_destroy(newlist, NULL);
813  boxc_destroy(newconn);
814 
815  gwlist_remove_producer(flow_threads);
816 }
817 
818 
819 /*------------------------------------------------
820  * main single thread functions
821  */
822 
823 typedef struct _addrpar {
825  int port;
826  int wapboxid;
827 } AddrPar;
828 
829 static void ap_destroy(AddrPar *addr)
830 {
831  octstr_destroy(addr->address);
832  gw_free(addr);
833 }
834 
835 static int cmp_route(void *ap, void *ms)
836 {
837  AddrPar *addr = ap;
838  Msg *msg = ms;
839 
840  if (msg->wdp_datagram.source_port == addr->port &&
841  octstr_compare(msg->wdp_datagram.source_address, addr->address)==0)
842  return 1;
843 
844  return 0;
845 }
846 
847 static int cmp_boxc(void *bc, void *ap)
848 {
849  Boxc *boxc = bc;
850  AddrPar *addr = ap;
851 
852  if (boxc->id == addr->wapboxid) return 1;
853  return 0;
854 }
855 
856 static Boxc *route_msg(List *route_info, Msg *msg)
857 {
858  AddrPar *ap;
859  Boxc *conn, *best;
860  int i, b, len;
861 
862  ap = gwlist_search(route_info, msg, cmp_route);
863  if (ap == NULL) {
864  debug("bb.boxc", 0, "Did not find previous routing info for WDP, "
865  "generating new");
866 route:
867 
868  if (gwlist_len(wapbox_list) == 0)
869  return NULL;
870 
871  gwlist_lock(wapbox_list);
872 
873  /* take random wapbox from list, and then check all wapboxes
874  * and select the one with lowest load level - if tied, the first
875  * one
876  */
877  len = gwlist_len(wapbox_list);
878  b = gw_rand() % len;
879  best = gwlist_get(wapbox_list, b);
880 
881  for(i = 0; i < gwlist_len(wapbox_list); i++) {
882  conn = gwlist_get(wapbox_list, (i+b) % len);
883  if (conn != NULL && best != NULL)
884  if (conn->load < best->load)
885  best = conn;
886  }
887  if (best == NULL) {
888  warning(0, "wapbox_list empty!");
889  gwlist_unlock(wapbox_list);
890  return NULL;
891  }
892  conn = best;
893  conn->load++; /* simulate new client until we get new values */
894 
895  ap = gw_malloc(sizeof(AddrPar));
896  ap->address = octstr_duplicate(msg->wdp_datagram.source_address);
897  ap->port = msg->wdp_datagram.source_port;
898  ap->wapboxid = conn->id;
899  gwlist_produce(route_info, ap);
900 
901  gwlist_unlock(wapbox_list);
902  } else
903  conn = gwlist_search(wapbox_list, ap, cmp_boxc);
904 
905  if (conn == NULL) {
906  /* routing failed; wapbox has disappeared!
907  * ..remove routing info and re-route */
908 
909  debug("bb.boxc", 0, "Old wapbox has disappeared, re-routing");
910 
911  gwlist_delete_equal(route_info, ap);
912  ap_destroy(ap);
913  goto route;
914  }
915  return conn;
916 }
917 
918 
919 /*
920  * this thread listens to incoming_wdp list
921  * and then routs messages to proper wapbox
922  */
923 static void wdp_to_wapboxes(void *arg)
924 {
925  List *route_info;
926  AddrPar *ap;
927  Boxc *conn;
928  Msg *msg;
929  int i;
930 
931  gwlist_add_producer(flow_threads);
932  gwlist_add_producer(wapbox_list);
933 
934  route_info = gwlist_create();
935 
936 
937  while(bb_status != BB_DEAD) {
938 
939  gwlist_consume(suspended); /* block here if suspended */
940 
941  if ((msg = gwlist_consume(incoming_wdp)) == NULL)
942  break;
943 
944  gw_assert(msg_type(msg) == wdp_datagram);
945 
946  conn = route_msg(route_info, msg);
947  if (conn == NULL) {
948  warning(0, "Cannot route message, discard it");
949  msg_destroy(msg);
950  continue;
951  }
952  gwlist_produce(conn->incoming, msg);
953  }
954  debug("bb", 0, "wdp_to_wapboxes: destroying lists");
955  while((ap = gwlist_extract_first(route_info)) != NULL)
956  ap_destroy(ap);
957 
958  gw_assert(gwlist_len(route_info) == 0);
959  gwlist_destroy(route_info, NULL);
960 
961  gwlist_lock(wapbox_list);
962  for(i=0; i < gwlist_len(wapbox_list); i++) {
963  conn = gwlist_get(wapbox_list, i);
965  conn->alive = 0;
966  }
967  gwlist_unlock(wapbox_list);
968 
969  gwlist_remove_producer(wapbox_list);
970  gwlist_remove_producer(flow_threads);
971 }
972 
973 
974 static void wait_for_connections(int fd, void (*function) (void *arg),
975  List *waited, int ssl)
976 {
977  int ret;
978  int timeout = 10; /* 10 sec. */
979 
980  gw_assert(function != NULL);
981 
982  while(bb_status != BB_DEAD) {
983 
984  /* if we are being shutdowned, as long as there is
985  * messages in incoming list allow new connections, but when
986  * list is empty, exit.
987  * Note: We have timeout (defined above) for which we allow new connections.
988  * Otherwise we wait here for ever!
989  */
990  if (bb_status == BB_SHUTDOWN) {
991  ret = gwlist_wait_until_nonempty(waited);
992  if (ret == -1 || !timeout)
993  break;
994  else
995  timeout--;
996  }
997 
998  /* block here if suspended */
999  gwlist_consume(suspended);
1000 
1001  ret = gwthread_pollfd(fd, POLLIN, 1.0);
1002  if (ret > 0) {
1003  Boxc *newconn = accept_boxc(fd, ssl);
1004  if (newconn != NULL) {
1005  gwthread_create(function, newconn);
1006  gwthread_sleep(1.0);
1007  } else {
1008  error(0, "Failed to create new boxc connection.");
1009  }
1010  } else if (ret < 0 && errno != EINTR && errno != EAGAIN)
1011  error(errno, "bb_boxc::wait_for_connections failed");
1012  }
1013 }
1014 
1015 
1016 
1017 static void smsboxc_run(void *arg)
1018 {
1019  int fd;
1020 
1021  gwlist_add_producer(flow_threads);
1023 
1024  fd = make_server_socket(smsbox_port, smsbox_interface ? octstr_get_cstr(smsbox_interface) : NULL);
1025  /* XXX add interface_name if required */
1026 
1027  if (fd < 0) {
1028  panic(0, "Could not open smsbox port %ld", smsbox_port);
1029  }
1030 
1031  /*
1032  * infinitely wait for new connections;
1033  * to shut down the system, SIGTERM is send and then
1034  * select drops with error, so we can check the status
1035  */
1036  wait_for_connections(fd, run_smsbox, incoming_sms, smsbox_port_ssl);
1037 
1038  gwlist_remove_producer(smsbox_list);
1039 
1040  /* continue avalanche */
1041  gwlist_remove_producer(outgoing_sms);
1042 
1043  /* all connections do the same, so that all must remove() before it
1044  * is completely over
1045  */
1046  while(gwlist_wait_until_nonempty(smsbox_list) == 1)
1047  gwthread_sleep(1.0);
1048 
1049  /* close listen socket */
1050  close(fd);
1051 
1054 
1055  gwlist_destroy(smsbox_list, NULL);
1056  smsbox_list = NULL;
1057  gw_rwlock_destroy(smsbox_list_rwlock);
1058  smsbox_list_rwlock = NULL;
1059 
1060  /* destroy things related to smsbox routing */
1061  dict_destroy(smsbox_by_id);
1062  smsbox_by_id = NULL;
1063  dict_destroy(smsbox_by_smsc);
1064  smsbox_by_smsc = NULL;
1065  dict_destroy(smsbox_by_receiver);
1066  smsbox_by_receiver = NULL;
1067  dict_destroy(smsbox_by_smsc_receiver);
1068  smsbox_by_smsc_receiver = NULL;
1069 
1070  gwlist_remove_producer(flow_threads);
1071 }
1072 
1073 
1074 static void wapboxc_run(void *arg)
1075 {
1076  int fd, port;
1077 
1078  gwlist_add_producer(flow_threads);
1080  port = (int) *((long*)arg);
1081 
1082  fd = make_server_socket(port, NULL);
1083  /* XXX add interface_name if required */
1084 
1085  if (fd < 0) {
1086  panic(0, "Could not open wapbox port %d", port);
1087  }
1088 
1089  wait_for_connections(fd, run_wapbox, incoming_wdp, wapbox_port_ssl);
1090 
1091  /* continue avalanche */
1092 
1093  gwlist_remove_producer(outgoing_wdp);
1094 
1095 
1096  /* wait for all connections to die and then remove list
1097  */
1098 
1099  while(gwlist_wait_until_nonempty(wapbox_list) == 1)
1100  gwthread_sleep(1.0);
1101 
1102  /* wait for wdp_to_wapboxes to exit */
1103  while(gwlist_consume(wapbox_list)!=NULL)
1104  ;
1105 
1106  /* close listen socket */
1107  close(fd);
1108 
1109  gwlist_destroy(wapbox_list, NULL);
1110  wapbox_list = NULL;
1111 
1112  gwlist_remove_producer(flow_threads);
1113 }
1114 
1115 
1116 /*
1117  * Populates the corresponding smsbox_by_foobar dictionary hash tables
1118  */
1120 {
1121  CfgGroup *grp;
1122  List *list, *items;
1123  Octstr *boxc_id, *smsc_ids, *shortcuts;
1124  int i, j;
1125 
1126  boxc_id = smsc_ids = shortcuts = NULL;
1127 
1128  list = cfg_get_multi_group(cfg, octstr_imm("smsbox-route"));
1129 
1130  /* loop multi-group "smsbox-route" */
1131  while (list && (grp = gwlist_extract_first(list)) != NULL) {
1132 
1133  if ((boxc_id = cfg_get(grp, octstr_imm("smsbox-id"))) == NULL) {
1134  grp_dump(grp);
1135  panic(0,"'smsbox-route' group without valid 'smsbox-id' directive!");
1136  }
1137 
1138  /*
1139  * If smsc-id is given, then any message comming from the specified
1140  * smsc-id in the list will be routed to this smsbox instance.
1141  * If shortcode is given, then any message with receiver number
1142  * matching those will be routed to this smsbox instance.
1143  * If both are given, then only receiver within shortcode originating
1144  * from smsc-id list will be routed to this smsbox instance. So if both
1145  * are present then this is a logical AND operation.
1146  */
1147  smsc_ids = cfg_get(grp, octstr_imm("smsc-id"));
1148  shortcuts = cfg_get(grp, octstr_imm("shortcode"));
1149 
1150  /* consider now the 3 possibilities: */
1151  if (smsc_ids && !shortcuts) {
1152  /* smsc-id only, so all MO traffic */
1153  items = octstr_split(smsc_ids, octstr_imm(";"));
1154  for (i = 0; i < gwlist_len(items); i++) {
1155  Octstr *item = gwlist_get(items, i);
1156  octstr_strip_blanks(item);
1157 
1158  debug("bb.boxc",0,"Adding smsbox routing to id <%s> for smsc id <%s>",
1159  octstr_get_cstr(boxc_id), octstr_get_cstr(item));
1160 
1161  if (!dict_put_once(smsbox_by_smsc, item, octstr_duplicate(boxc_id)))
1162  panic(0, "Routing for smsc-id <%s> already exists!",
1163  octstr_get_cstr(item));
1164  }
1166  octstr_destroy(smsc_ids);
1167  }
1168  else if (!smsc_ids && shortcuts) {
1169  /* shortcode only, so these MOs from all smscs */
1170  items = octstr_split(shortcuts, octstr_imm(";"));
1171  for (i = 0; i < gwlist_len(items); i++) {
1172  Octstr *item = gwlist_get(items, i);
1173  octstr_strip_blanks(item);
1174 
1175  debug("bb.boxc",0,"Adding smsbox routing to id <%s> for receiver no <%s>",
1176  octstr_get_cstr(boxc_id), octstr_get_cstr(item));
1177 
1178  if (!dict_put_once(smsbox_by_receiver, item, octstr_duplicate(boxc_id)))
1179  panic(0, "Routing for receiver no <%s> already exists!",
1180  octstr_get_cstr(item));
1181  }
1183  octstr_destroy(shortcuts);
1184  }
1185  else if (smsc_ids && shortcuts) {
1186  /* both, so only specified MOs from specified smscs */
1187  items = octstr_split(shortcuts, octstr_imm(";"));
1188  for (i = 0; i < gwlist_len(items); i++) {
1189  List *subitems;
1190  Octstr *item = gwlist_get(items, i);
1191  octstr_strip_blanks(item);
1192  subitems = octstr_split(smsc_ids, octstr_imm(";"));
1193  for (j = 0; j < gwlist_len(subitems); j++) {
1194  Octstr *subitem = gwlist_get(subitems, j);
1195  octstr_strip_blanks(subitem);
1196 
1197  debug("bb.boxc",0,"Adding smsbox routing to id <%s> "
1198  "for receiver no <%s> and smsc id <%s>",
1199  octstr_get_cstr(boxc_id), octstr_get_cstr(item),
1200  octstr_get_cstr(subitem));
1201 
1202  /* construct the dict key '<shortcode>:<smsc-id>' */
1203  octstr_insert(subitem, item, 0);
1204  octstr_insert_char(subitem, octstr_len(item), ':');
1205  if (!dict_put_once(smsbox_by_smsc_receiver, subitem, octstr_duplicate(boxc_id)))
1206  panic(0, "Routing for receiver:smsc <%s> already exists!",
1207  octstr_get_cstr(subitem));
1208  }
1210  }
1212  octstr_destroy(shortcuts);
1213  }
1214  octstr_destroy(boxc_id);
1215  }
1216 
1217  gwlist_destroy(list, NULL);
1218 }
1219 
1220 
1221 /*-------------------------------------------------------------
1222  * public functions
1223  *
1224  * SMSBOX
1225  */
1226 
1228 {
1229  CfgGroup *grp;
1230 
1231  if (smsbox_running) return -1;
1232 
1233  debug("bb", 0, "starting smsbox connection module");
1234 
1235  grp = cfg_get_single_group(cfg, octstr_imm("core"));
1236  if (cfg_get_integer(&smsbox_port, grp, octstr_imm("smsbox-port")) == -1) {
1237  error(0, "Missing smsbox-port variable, cannot start smsboxes");
1238  return -1;
1239  }
1240 #ifdef HAVE_LIBSSL
1241  cfg_get_bool(&smsbox_port_ssl, grp, octstr_imm("smsbox-port-ssl"));
1242 #endif /* HAVE_LIBSSL */
1243 
1244  if (smsbox_port_ssl)
1245  debug("bb", 0, "smsbox connection module is SSL-enabled");
1246 
1247  smsbox_interface = cfg_get(grp, octstr_imm("smsbox-interface"));
1248 
1249  if (cfg_get_integer(&smsbox_max_pending, grp, octstr_imm("smsbox-max-pending")) == -1) {
1251  info(0, "BOXC: 'smsbox-max-pending' not set, using default (%ld).", smsbox_max_pending);
1252  }
1253 
1254  box_allow_ip = cfg_get(grp, octstr_imm("box-allow-ip"));
1255  if (box_allow_ip == NULL)
1256  box_allow_ip = octstr_create("");
1257  box_deny_ip = cfg_get(grp, octstr_imm("box-deny-ip"));
1258  if (box_deny_ip == NULL)
1259  box_deny_ip = octstr_create("");
1260  if (box_allow_ip != NULL && box_deny_ip == NULL)
1261  info(0, "Box connection allowed IPs defined without any denied...");
1262 
1263  smsbox_list = gwlist_create(); /* have a list of connections */
1264  smsbox_list_rwlock = gw_rwlock_create();
1265  if (!boxid)
1266  boxid = counter_create();
1267 
1268  /* the smsbox routing specific inits */
1269  smsbox_by_id = dict_create(10, (void(*)(void *)) boxc_gwlist_destroy);
1270  smsbox_by_smsc = dict_create(30, (void(*)(void *)) octstr_destroy);
1271  smsbox_by_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
1272  smsbox_by_smsc_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
1273 
1274  /* load the defined smsbox routing rules */
1275  init_smsbox_routes(cfg);
1276 
1277  gwlist_add_producer(outgoing_sms);
1278  gwlist_add_producer(smsbox_list);
1279 
1280  smsbox_running = 1;
1281 
1282  if ((sms_dequeue_thread = gwthread_create(sms_to_smsboxes, NULL)) == -1)
1283  panic(0, "Failed to start a new thread for smsbox routing");
1284 
1285  if (gwthread_create(smsboxc_run, NULL) == -1)
1286  panic(0, "Failed to start a new thread for smsbox connections");
1287 
1288  return 0;
1289 }
1290 
1291 
1293 {
1294  if (!smsbox_running) return -1;
1295 
1296  /* send new config to clients */
1297 
1298  return 0;
1299 }
1300 
1301 
1302 
1303 /* WAPBOX */
1304 
1306 {
1307  CfgGroup *grp;
1308 
1309  if (wapbox_running) return -1;
1310 
1311  debug("bb", 0, "starting wapbox connection module");
1312 
1313  grp = cfg_get_single_group(cfg, octstr_imm("core"));
1314 
1315  if (cfg_get_integer(&wapbox_port, grp, octstr_imm("wapbox-port")) == -1) {
1316  error(0, "Missing wapbox-port variable, cannot start WAP");
1317  return -1;
1318  }
1319 #ifdef HAVE_LIBSSL
1320  cfg_get_bool(&wapbox_port_ssl, grp, octstr_imm("wapbox-port-ssl"));
1321 #endif /* HAVE_LIBSSL */
1322 
1323  box_allow_ip = cfg_get(grp, octstr_imm("box-allow-ip"));
1324  if (box_allow_ip == NULL)
1325  box_allow_ip = octstr_create("");
1326  box_deny_ip = cfg_get(grp, octstr_imm("box-deny-ip"));
1327  if (box_deny_ip == NULL)
1328  box_deny_ip = octstr_create("");
1329  if (box_allow_ip != NULL && box_deny_ip == NULL)
1330  info(0, "Box connection allowed IPs defined without any denied...");
1331 
1332  wapbox_list = gwlist_create(); /* have a list of connections */
1333  gwlist_add_producer(outgoing_wdp);
1334  if (!boxid)
1335  boxid = counter_create();
1336 
1337  if (gwthread_create(wdp_to_wapboxes, NULL) == -1)
1338  panic(0, "Failed to start a new thread for wapbox routing");
1339 
1341  panic(0, "Failed to start a new thread for wapbox connections");
1342 
1343  wapbox_running = 1;
1344  return 0;
1345 }
1346 
1347 
1348 Octstr *boxc_status(int status_type)
1349 {
1350  Octstr *tmp;
1351  char *lb, *ws;
1352  int i, boxes, para = 0;
1353  time_t orig, t;
1354  Boxc *bi;
1355 
1356  orig = time(NULL);
1357 
1358  /*
1359  * XXX: this will cause segmentation fault if this is called
1360  * between 'destroy_list and setting list to NULL calls.
1361  * Ok, this has to be fixed, but now I am too tired.
1362  */
1363 
1364  if ((lb = bb_status_linebreak(status_type))==NULL)
1365  return octstr_create("Un-supported format");
1366 
1367  if (status_type == BBSTATUS_HTML)
1368  ws = "&nbsp;&nbsp;&nbsp;&nbsp;";
1369  else if (status_type == BBSTATUS_TEXT)
1370  ws = " ";
1371  else
1372  ws = "";
1373 
1374  if (status_type == BBSTATUS_HTML || status_type == BBSTATUS_WML)
1375  para = 1;
1376 
1377  if (status_type == BBSTATUS_XML) {
1378  tmp = octstr_create ("");
1379  octstr_append_cstr(tmp, "<boxes>\n\t");
1380  }
1381  else
1382  tmp = octstr_format("%sBox connections:%s", para ? "<p>" : "", lb);
1383  boxes = 0;
1384 
1385  if (wapbox_list) {
1386  gwlist_lock(wapbox_list);
1387  for(i=0; i < gwlist_len(wapbox_list); i++) {
1388  bi = gwlist_get(wapbox_list, i);
1389  if (bi->alive == 0)
1390  continue;
1391  t = orig - bi->connect_time;
1392  if (status_type == BBSTATUS_XML)
1394  "<box>\n\t\t<type>wapbox</type>\n\t\t<IP>%s</IP>\n"
1395  "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
1396  "\t\t<ssl>%s</ssl>\n\t</box>\n",
1398  t/3600/24, t/3600%24, t/60%60, t%60,
1399 #ifdef HAVE_LIBSSL
1400  conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
1401 #else
1402  "not installed"
1403 #endif
1404  );
1405  else
1407  "%swapbox, IP %s (on-line %ldd %ldh %ldm %lds) %s %s",
1408  ws, octstr_get_cstr(bi->client_ip),
1409  t/3600/24, t/3600%24, t/60%60, t%60,
1410 #ifdef HAVE_LIBSSL
1411  conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
1412 #else
1413  "",
1414 #endif
1415  lb);
1416  boxes++;
1417  }
1418  gwlist_unlock(wapbox_list);
1419  }
1420  if (smsbox_list) {
1421  gw_rwlock_rdlock(smsbox_list_rwlock);
1422  for(i=0; i < gwlist_len(smsbox_list); i++) {
1423  bi = gwlist_get(smsbox_list, i);
1424  if (bi->alive == 0)
1425  continue;
1426  t = orig - bi->connect_time;
1427  if (status_type == BBSTATUS_XML)
1428  octstr_format_append(tmp, "<box>\n\t\t<type>smsbox</type>\n"
1429  "\t\t<id>%s</id>\n\t\t<IP>%s</IP>\n"
1430  "\t\t<queue>%ld</queue>\n"
1431  "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
1432  "\t\t<ssl>%s</ssl>\n\t</box>",
1433  (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : ""),
1435  gwlist_len(bi->incoming) + dict_key_count(bi->sent),
1436  t/3600/24, t/3600%24, t/60%60, t%60,
1437 #ifdef HAVE_LIBSSL
1438  conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
1439 #else
1440  "not installed"
1441 #endif
1442  );
1443  else
1444  octstr_format_append(tmp, "%ssmsbox:%s, IP %s (%ld queued), (on-line %ldd %ldh %ldm %lds) %s %s",
1445  ws, (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : "(none)"),
1446  octstr_get_cstr(bi->client_ip), gwlist_len(bi->incoming) + dict_key_count(bi->sent),
1447  t/3600/24, t/3600%24, t/60%60, t%60,
1448 #ifdef HAVE_LIBSSL
1449  conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
1450 #else
1451  "",
1452 #endif
1453  lb);
1454  boxes++;
1455  }
1456  gw_rwlock_unlock(smsbox_list_rwlock);
1457  }
1458  if (boxes == 0 && status_type != BBSTATUS_XML) {
1459  octstr_destroy(tmp);
1460  tmp = octstr_format("%sNo boxes connected", para ? "<p>" : "");
1461  }
1462  if (para)
1463  octstr_append_cstr(tmp, "</p>");
1464  if (status_type == BBSTATUS_XML)
1465  octstr_append_cstr(tmp, "</boxes>\n");
1466  else
1467  octstr_append_cstr(tmp, "\n\n");
1468  return tmp;
1469 }
1470 
1471 
1473 {
1474  int i, q = 0;
1475  Boxc *boxc;
1476 
1477  if (wapbox_list) {
1478  gwlist_lock(wapbox_list);
1479  for(i=0; i < gwlist_len(wapbox_list); i++) {
1480  boxc = gwlist_get(wapbox_list, i);
1481  q += gwlist_len(boxc->incoming);
1482  }
1483  gwlist_unlock(wapbox_list);
1484  }
1485  return q;
1486 }
1487 
1488 
1489 void boxc_cleanup(void)
1490 {
1491  octstr_destroy(box_allow_ip);
1492  octstr_destroy(box_deny_ip);
1493  box_allow_ip = NULL;
1494  box_deny_ip = NULL;
1495  counter_destroy(boxid);
1496  boxid = NULL;
1497  octstr_destroy(smsbox_interface);
1498  smsbox_interface = NULL;
1499 }
1500 
1501 
1502 /*
1503  * Route the incoming message to one of the following input queues:
1504  * a specific smsbox conn
1505  * a random smsbox conn if no shortcut routing and msg->sms.boxc_id match
1506  *
1507  * BEWARE: All logic inside here should be fast, hence speed processing
1508  * optimized, because every single MO message passes this function and we
1509  * have to ensure that no unncessary overhead is done.
1510  */
1512 {
1513  Boxc *bc = NULL;
1514  Octstr *s, *r, *rs, *boxc_id = NULL;
1515  long len, b, i;
1516  int full_found = 0;
1517 
1518  gw_assert(msg_type(msg) == sms);
1519 
1520  /* msg_dump(msg, 0); */
1521 
1522  /* Check we have at least one smsbox connected! */
1523  gw_rwlock_rdlock(smsbox_list_rwlock);
1524  if (gwlist_len(smsbox_list) == 0) {
1525  gw_rwlock_unlock(smsbox_list_rwlock);
1526  warning(0, "smsbox_list empty!");
1527  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1528  gwlist_produce(incoming_sms, msg);
1529  return 0;
1530  } else {
1531  return -1;
1532  }
1533  }
1534 
1535  /*
1536  * Do we have a specific smsbox-id route to pass this msg to?
1537  */
1538  if (octstr_len(msg->sms.boxc_id) > 0) {
1539  boxc_id = msg->sms.boxc_id;
1540  } else {
1541  /*
1542  * Check if we have a "smsbox-route" for this msg.
1543  * Where the shortcode route has a higher priority then the smsc-id rule.
1544  * Highest priority has the combined <shortcode>:<smsc-id> route.
1545  */
1546  Octstr *os = octstr_format("%s:%s",
1547  octstr_get_cstr(msg->sms.receiver),
1548  octstr_get_cstr(msg->sms.smsc_id));
1549  s = (msg->sms.smsc_id ? dict_get(smsbox_by_smsc, msg->sms.smsc_id) : NULL);
1550  r = (msg->sms.receiver ? dict_get(smsbox_by_receiver, msg->sms.receiver) : NULL);
1551  rs = (os ? dict_get(smsbox_by_smsc_receiver, os) : NULL);
1552  octstr_destroy(os);
1553 
1554  if (rs)
1555  boxc_id = rs;
1556  else if (r)
1557  boxc_id = r;
1558  else if (s)
1559  boxc_id = s;
1560  }
1561 
1562  /* We have a specific smsbox-id to use */
1563  if (boxc_id != NULL) {
1564 
1565  List *boxc_id_list = dict_get(smsbox_by_id, boxc_id);
1566  if (gwlist_len(boxc_id_list) == 0) {
1567  /*
1568  * something is wrong, this was the smsbox connection we used
1569  * for sending, so it seems this smsbox is gone
1570  */
1571  warning(0, "Could not route message to smsbox id <%s>, smsbox is gone!",
1572  octstr_get_cstr(boxc_id));
1573  gw_rwlock_unlock(smsbox_list_rwlock);
1574  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1575  gwlist_produce(incoming_sms, msg);
1576  return 0;
1577  } else {
1578  return -1;
1579  }
1580  }
1581 
1582  /*
1583  * Take random smsbox from list, as long as it has space we will use it,
1584  * otherwise check the next one.
1585  */
1586  len = gwlist_len(boxc_id_list);
1587  b = gw_rand() % len;
1588 
1589  for (i = 0; i < len; i++) {
1590  bc = gwlist_get(boxc_id_list, (i+b) % len);
1591 
1592  if (bc != NULL && max_incoming_sms_qlength > 0 &&
1594  bc = NULL;
1595  }
1596 
1597  if (bc != NULL) {
1598  break;
1599  }
1600  }
1601 
1602  if (bc != NULL) {
1603  bc->load++;
1604  gwlist_produce(bc->incoming, msg);
1605  gw_rwlock_unlock(smsbox_list_rwlock);
1606  return 1; /* we are done */
1607  }
1608  else {
1609  /*
1610  * we have routing defined, but no smsbox connected at the moment.
1611  * put msg into global incoming queue and wait until smsbox with
1612  * such boxc_id connected.
1613  */
1614  gw_rwlock_unlock(smsbox_list_rwlock);
1615  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1616  gwlist_produce(incoming_sms, msg);
1617  return 0;
1618  } else {
1619  return -1;
1620  }
1621  }
1622  }
1623 
1624  /*
1625  * Ok, none of the specific routing things applied previously,
1626  * so route it to a random smsbox.
1627  * Take random smsbox from list, as long as it has space we will
1628  * use it, therwise check the next one.
1629  */
1630  len = gwlist_len(smsbox_list);
1631  b = gw_rand() % len;
1632 
1633  for (i = 0; i < len; i++) {
1634  bc = gwlist_get(smsbox_list, (i+b) % len);
1635 
1636  if (bc->boxc_id != NULL || bc->routable == 0)
1637  bc = NULL;
1638 
1639  if (bc != NULL && max_incoming_sms_qlength > 0 &&
1641  full_found = 1;
1642  bc = NULL;
1643  }
1644 
1645  if (bc != NULL) {
1646  break;
1647  }
1648  }
1649 
1650  if (bc != NULL) {
1651  bc->load++;
1652  gwlist_produce(bc->incoming, msg);
1653  }
1654 
1655  gw_rwlock_unlock(smsbox_list_rwlock);
1656 
1657  if (bc == NULL && full_found == 0) {
1658  warning(0, "smsbox_list empty!");
1659  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1660  gwlist_produce(incoming_sms, msg);
1661  return 0;
1662  } else {
1663  return -1;
1664  }
1665  } else if (bc == NULL && full_found == 1) {
1666  return -1;
1667  }
1668 
1669  return 1;
1670 }
1671 
1672 
1673 static void sms_to_smsboxes(void *arg)
1674 {
1675  Msg *newmsg, *startmsg, *msg;
1676  long i, len;
1677  int ret = -1;
1678  Boxc *boxc;
1679 
1680  gwlist_add_producer(flow_threads);
1681 
1682  newmsg = startmsg = msg = NULL;
1683 
1684  while (bb_status != BB_SHUTDOWN && bb_status != BB_DEAD) {
1685 
1686  if (newmsg == startmsg) {
1687  /* check if we are in shutdown phase */
1688  if (gwlist_producer_count(smsbox_list) == 0)
1689  break;
1690 
1691  if (ret == 0 || ret == -1) {
1692  /* debug("", 0, "time to sleep"); */
1693  gwthread_sleep(60.0);
1694  /* debug("", 0, "wake up list len %ld", gwlist_len(incoming_sms)); */
1695  /* shutdown ? */
1696  if (gwlist_producer_count(smsbox_list) == 0 && gwlist_len(smsbox_list) == 0)
1697  break;
1698  }
1699  startmsg = msg = gwlist_consume(incoming_sms);
1700  /* debug("", 0, "gwlist_consume done 1"); */
1701  newmsg = NULL;
1702  }
1703  else {
1704  newmsg = msg = gwlist_consume(incoming_sms);
1705 
1706  /* Back at the first message? */
1707  if (newmsg == startmsg) {
1708  gwlist_insert(incoming_sms, 0, msg);
1709  continue;
1710  }
1711  }
1712 
1713  if (msg == NULL)
1714  break;
1715 
1716  gw_assert(msg_type(msg) == sms);
1717 
1718  /* debug("bb.sms", 0, "sms_boxc_router: handling message (%p vs %p)",
1719  msg, startmsg); */
1720 
1721  ret = route_incoming_to_boxc(msg);
1722  if (ret == 1)
1723  startmsg = newmsg = NULL;
1724  else if (ret == -1) {
1725  gwlist_produce(incoming_sms, msg);
1726  }
1727  }
1728 
1729  gw_rwlock_rdlock(smsbox_list_rwlock);
1730  len = gwlist_len(smsbox_list);
1731  for (i=0; i < len; i++) {
1732  boxc = gwlist_get(smsbox_list, i);
1734  }
1735  gw_rwlock_unlock(smsbox_list_rwlock);
1736 
1737  gwlist_remove_producer(flow_threads);
1738 }
1739 
1740 
1741 /*
1742  * Simple wrapper to allow the named smsbox Lists to be
1743  * destroyed when the smsbox_by_id Dict is destroyed
1744  *
1745  */
1746 static void boxc_gwlist_destroy(List *list)
1747 {
1748  gwlist_destroy(list, NULL);
1749 }
1750 
Dict * dict_create(long size_hint, void(*destroy_value)(void *))
Definition: dict.c:192
void msg_dump(Msg *msg, int level)
Definition: msg.c:152
Connection * conn
Definition: bb_boxc.c:135
Octstr * address
Definition: bb_boxc.c:824
static volatile sig_atomic_t smsbox_running
Definition: bb_boxc.c:103
void error(int err, const char *fmt,...)
Definition: log.c:612
static void run_wapbox(void *arg)
Definition: bb_boxc.c:760
void info(int err, const char *fmt,...)
Definition: log.c:636
int boxc_incoming_wdp_queue(void)
Definition: bb_boxc.c:1472
static Boxc * route_msg(List *route_info, Msg *msg)
Definition: bb_boxc.c:856
Msg * msg_duplicate(Msg *msg)
Definition: msg.c:111
void * gwlist_search(List *list, void *pattern, int(*cmp)(void *, void *))
Definition: list.c:486
List * outgoing
Definition: bb_boxc.c:143
static void sms_to_smsboxes(void *arg)
Definition: bb_boxc.c:1673
#define msg_unpack(os)
Definition: msg.h:183
List * outgoing_wdp
Definition: bearerbox.c:88
void semaphore_up(Semaphore *semaphore)
Definition: semaphore.c:118
static Msg * read_from_box(Boxc *boxconn)
Definition: bb_boxc.c:165
int ssl
void dict_put(Dict *dict, Octstr *key, void *value)
Definition: dict.c:240
void counter_destroy(Counter *counter)
Definition: counter.c:110
void gwlist_append(List *list, void *item)
Definition: list.c:179
char * bb_status_linebreak(int status_type)
Definition: bearerbox.c:1097
static int wapbox_port_ssl
Definition: bb_boxc.c:119
static void boxc_destroy(Boxc *boxc)
Definition: bb_boxc.c:604
Semaphore * semaphore_create(long n)
Definition: semaphore.c:81
static Dict * smsbox_by_smsc
Definition: bb_boxc.c:111
void gwlist_produce(List *list, void *item)
Definition: list.c:411
void gwthread_join(long thread)
static RWLock * smsbox_list_rwlock
Definition: bb_boxc.c:107
long gwlist_len(List *list)
Definition: list.c:166
int(* store_save_ack)(Msg *msg, ack_status_t status)
Definition: bb_store.c:73
void gw_rwlock_destroy(RWLock *lock)
Definition: gw-rwlock.c:112
int gw_rwlock_wrlock(RWLock *lock)
Definition: gw-rwlock.c:177
volatile sig_atomic_t restart
Definition: bearerbox.c:149
void * gwlist_get(List *list, long pos)
Definition: list.c:292
static int cmp_boxc(void *bc, void *ap)
Definition: bb_boxc.c:847
msg_type
Definition: msg.h:73
static void boxc_sent_pop(Boxc *, Msg *, Msg **)
Definition: bb_boxc.c:503
#define cfg_get(grp, varname)
Definition: cfg.h:86
int load
Definition: bb_boxc.c:138
void semaphore_down(Semaphore *semaphore)
Definition: semaphore.c:132
void uuid_unparse(const uuid_t uu, char *out)
Definition: gw_uuid.c:561
List * incoming_sms
Definition: bearerbox.c:84
static int smsbox_port_ssl
Definition: bb_boxc.c:116
RWLock * gw_rwlock_create(void)
Definition: gw-rwlock.c:77
#define msg_create(type)
Definition: msg.h:136
int gw_rwlock_rdlock(RWLock *lock)
Definition: gw-rwlock.c:134
void octstr_append_cstr(Octstr *ostr, const char *cstr)
Definition: octstr.c:1509
static Octstr * box_deny_ip
Definition: bb_boxc.c:125
int conn_eof(Connection *conn)
Definition: conn.c:697
void octstr_strip_blanks(Octstr *text)
Definition: octstr.c:1344
int gwlist_wait_until_nonempty(List *list)
Definition: list.c:361
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
unsigned long counter_increase(Counter *counter)
Definition: counter.c:123
List * retry
Definition: bb_boxc.c:142
int is_allowed_ip(Octstr *allow_ip, Octstr *deny_ip, Octstr *ip)
Definition: utils.c:815
static void boxc_receiver(void *arg)
Definition: bb_boxc.c:301
static Dict * smsbox_by_receiver
Definition: bb_boxc.c:112
volatile sig_atomic_t alive
Definition: bb_boxc.c:146
void gwlist_unlock(List *list)
Definition: list.c:354
volatile sig_atomic_t bb_status
Definition: bearerbox.c:132
static int port
Definition: fakesmsc.c:120
static Boxc * boxc_create(int fd, Octstr *ip, int ssl)
Definition: bb_boxc.c:587
#define POLLIN
Definition: gwpoll.h:91
long max_incoming_sms_qlength
Definition: bearerbox.c:98
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:281
static int send_msg(Boxc *boxconn, Msg *pmsg)
Definition: bb_boxc.c:455
Definition: msg.h:79
Definition: cfg.c:164
void octstr_insert(Octstr *ostr1, const Octstr *ostr2, long pos)
Definition: octstr.c:1301
void * dict_remove(Dict *dict, Octstr *key)
Definition: dict.c:307
Counter * counter_create(void)
Definition: counter.c:94
struct _addrpar AddrPar
void * gwlist_extract_first(List *list)
Definition: list.c:305
List * outgoing_sms
Definition: bearerbox.c:85
void grp_dump(CfgGroup *grp)
Definition: cfg.c:808
static void wait_for_connections(int fd, void(*function)(void *arg), List *waited, int ssl)
Definition: bb_boxc.c:974
static void ap_destroy(AddrPar *addr)
Definition: bb_boxc.c:829
void * dict_get(Dict *dict, Octstr *key)
Definition: dict.c:286
List * suspended
Definition: bearerbox.c:122
void gwlist_remove_producer(List *list)
Definition: list.c:401
void conn_destroy(Connection *conn)
Definition: conn.c:619
static void deliver_sms_to_queue(Msg *msg, Boxc *conn)
Definition: bb_boxc.c:213
void octstr_insert_char(Octstr *ostr, long pos, const char c)
Definition: octstr.c:1479
int port
Definition: bb_boxc.c:825
int route_incoming_to_boxc(Msg *msg)
Definition: bb_boxc.c:1511
List * incoming_wdp
Definition: bearerbox.c:87
Definition: dict.c:116
#define octstr_duplicate(ostr)
Definition: octstr.h:187
List * cfg_get_multi_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:642
static Dict * smsbox_by_id
Definition: bb_boxc.c:110
int smsbox_start(Cfg *cfg)
Definition: bb_boxc.c:1227
long dict_key_count(Dict *dict)
Definition: dict.c:335
static void init_smsbox_routes(Cfg *cfg)
Definition: bb_boxc.c:1119
long gwlist_delete_equal(List *list, void *item)
Definition: list.c:266
void semaphore_destroy(Semaphore *semaphore)
Definition: semaphore.c:104
void uuid_copy(uuid_t dst, const uuid_t src)
Definition: gw_uuid.c:150
void msg_destroy(Msg *msg)
Definition: msg.c:132
int gw_rwlock_unlock(RWLock *lock)
Definition: gw-rwlock.c:155
static Octstr * smsbox_interface
Definition: bb_boxc.c:117
static void smsboxc_run(void *arg)
Definition: bb_boxc.c:1017
static void wapboxc_run(void *arg)
Definition: bb_boxc.c:1074
static void boxc_gwlist_destroy(List *list)
Definition: bb_boxc.c:1746
int make_server_socket(int port, const char *interface_name)
Definition: socket.c:93
time_t connect_time
Definition: bb_boxc.c:139
void warning(int err, const char *fmt,...)
Definition: log.c:624
Octstr * boxc_id
Definition: bb_boxc.c:147
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2462
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:322
#define gwthread_create(func, arg)
Definition: gwthread.h:90
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:334
static long smsbox_max_pending
Definition: bb_boxc.c:122
Definition: bb_boxc.c:134
gw_assert(wtls_machine->packet_to_send!=NULL)
void gwthread_sleep(double seconds)
static Dict * smsbox_by_smsc_receiver
Definition: bb_boxc.c:113
Octstr * conn_read_withlen(Connection *conn)
Definition: conn.c:1161
Dict * sent
Definition: bb_boxc.c:144
int conn_write_withlen(Connection *conn, Octstr *data)
Definition: conn.c:1067
int smsbox_restart(Cfg *cfg)
Definition: bb_boxc.c:1292
static Counter * boxid
Definition: bb_boxc.c:128
void gwlist_insert(List *list, long pos, void *item)
Definition: list.c:214
#define UUID_STR_LEN
Definition: gw_uuid.h:19
int gwthread_pollfd(int fd, int events, double timeout)
void gwlist_lock(List *list)
Definition: list.c:347
void boxc_cleanup(void)
Definition: bb_boxc.c:1489
Semaphore * pending
Definition: bb_boxc.c:145
static Cfg * cfg
Definition: smsbox.c:115
long octstr_len(const Octstr *ostr)
Definition: octstr.c:340
void dict_destroy(Dict *dict)
Definition: dict.c:215
struct _boxc Boxc
int cfg_get_bool(int *n, CfgGroup *grp, Octstr *varname)
Definition: cfg.c:756
Definition: octstr.c:118
int conn_wait(Connection *conn, double seconds)
Definition: conn.c:896
void * gwlist_consume(List *list)
Definition: list.c:427
Octstr * host_ip(struct sockaddr_in addr)
Definition: socket.c:615
static long smsbox_port
Definition: bb_boxc.c:115
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:690
int cfg_get_integer(long *n, CfgGroup *grp, Octstr *varname)
Definition: cfg.c:739
int wapbox_start(Cfg *cfg)
Definition: bb_boxc.c:1305
#define panic
Definition: log.h:87
void gwthread_wakeup(long thread)
Definition: cfg.c:73
int socklen_t
Definition: socket.h:73
static Octstr * box_allow_ip
Definition: bb_boxc.c:124
void octstr_format_append(Octstr *os, const char *fmt,...)
Definition: octstr.c:2505
static List * smsbox_list
Definition: bb_boxc.c:106
static void wdp_to_wapboxes(void *arg)
Definition: bb_boxc.c:923
static void boxc_sent_push(Boxc *, Msg *)
Definition: bb_boxc.c:483
Octstr * msg_pack(Msg *msg)
Definition: msg.c:181
List * dict_keys(Dict *dict)
Definition: dict.c:347
#define gwlist_create()
Definition: list.h:136
static Boxc * accept_boxc(int fd, int ssl)
Definition: bb_boxc.c:620
static List * wapbox_list
Definition: bb_boxc.c:105
Octstr * boxc_status(int status_type)
Definition: bb_boxc.c:1348
int(* store_save)(Msg *msg)
Definition: bb_store.c:72
static void boxc_sender(void *arg)
Definition: bb_boxc.c:532
static long sms_dequeue_thread
Definition: bb_boxc.c:131
long id
Definition: bb_boxc.c:137
int dict_put_once(Dict *dict, Octstr *key, void *value)
Definition: dict.c:271
int conn_error(Connection *conn)
Definition: conn.c:708
int is_wap
Definition: bb_boxc.c:136
#define MAIN_THREAD_ID
Definition: gwthread.h:77
List * flow_threads
Definition: bearerbox.c:116
static void run_smsbox(void *arg)
Definition: bb_boxc.c:664
int wapboxid
Definition: bb_boxc.c:826
CfgGroup * cfg_get_single_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:636
Octstr * client_ip
Definition: bb_boxc.c:140
void gwlist_add_producer(List *list)
Definition: list.c:383
volatile int routable
Definition: bb_boxc.c:149
static volatile sig_atomic_t wapbox_running
Definition: bb_boxc.c:104
int gw_rand(void)
Definition: protected.c:174
static int cmp_route(void *ap, void *ms)
Definition: bb_boxc.c:835
List * octstr_split(const Octstr *os, const Octstr *sep)
Definition: octstr.c:1638
Definition: list.c:102
static XMLRPCDocument * msg
Definition: test_xmlrpc.c:86
static long wapbox_port
Definition: bb_boxc.c:118
#define SMSBOX_MAX_PENDING
Definition: bb_boxc.c:83
List * incoming
Definition: bb_boxc.c:141
int gwlist_producer_count(List *list)
Definition: list.c:391
Connection * conn_wrap_fd(int fd, int ssl)
Definition: conn.c:558
int conn_flush(Connection *conn)
Definition: conn.c:987
int octstr_compare(const Octstr *ostr1, const Octstr *ostr2)
Definition: octstr.c:869
long smsc2_rout(Msg *msg, int resend)
Definition: bb_smscconn.c:1710
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.