Kannel: Open Source WAP and SMS gateway  svn-r5335
bb_store_file.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * bb_store.c : bearerbox box SMS storage/retrieval module
59  *
60  * Kalle Marjola 2001 for project Kannel
61  *
62  * Updated Oct 2004
63  *
64  * New features:
65  * - uses dict to save messages, for faster retrieval
66  * - acks are no longer saved (to memory), they simply delete
67  * messages from dict
68  * - better choice when dump done; configurable frequency
69  */
70 
71 #include <errno.h>
72 #include <stdlib.h>
73 #include <stdio.h>
74 #include <time.h>
75 #include <string.h>
76 #include <sys/time.h>
77 #include <sys/types.h>
78 #include <sys/socket.h>
79 #include <unistd.h>
80 #include <signal.h>
81 
82 #include "gwlib/gwlib.h"
83 #include "msg.h"
84 #include "bearerbox.h"
85 #include "sms.h"
86 
87 static FILE *file = NULL;
88 static Octstr *filename = NULL;
89 static Octstr *newfile = NULL;
90 static Octstr *bakfile = NULL;
91 static Mutex *file_mutex = NULL;
92 static long cleanup_thread = -1;
93 static long dump_frequency = 0;
94 
95 static Dict *sms_dict = NULL;
96 
97 static int active = 1;
98 static time_t last_dict_mod = 0;
99 static List *loaded;
100 
101 
102 static void write_msg(Msg *msg)
103 {
104  Octstr *pack;
105  unsigned char buf[4];
106 
107  pack = store_msg_pack(msg);
108  encode_network_long(buf, octstr_len(pack));
109  octstr_insert_data(pack, 0, (char*)buf, 4);
110 
111  octstr_print(file, pack);
112  fflush(file);
113 
114  octstr_destroy(pack);
115 }
116 
117 
118 static int read_msg(Msg **msg, Octstr *os, long *off)
119 {
120  unsigned char buf[4];
121  long i;
122  Octstr *pack;
123 
124  gw_assert(*off >= 0);
125  if (*off + 4 > octstr_len(os)) {
126  error(0, "Packet too short while unpacking Msg.");
127  return -1;
128  }
129 
130  octstr_get_many_chars((char*)buf, os, *off, 4);
131  i = decode_network_long(buf);
132  *off += 4;
133 
134  pack = octstr_copy(os, *off, i);
135  *off += octstr_len(pack);
136  *msg = store_msg_unpack(pack);
137  octstr_destroy(pack);
138 
139  if (!*msg)
140  return -1;
141 
142  return 0;
143 }
144 
145 
146 static int open_file(Octstr *name)
147 {
148  file = fopen(octstr_get_cstr(name), "w");
149  if (file == NULL) {
150  error(errno, "Failed to open '%s' for writing, cannot create store-file",
152  return -1;
153  }
154  return 0;
155 }
156 
157 
158 static int rename_store(void)
159 {
160  if (rename(octstr_get_cstr(filename), octstr_get_cstr(bakfile)) == -1) {
161  if (errno != ENOENT) {
162  error(errno, "Failed to rename old store '%s' as '%s'",
164  return -1;
165  }
166  }
167  if (rename(octstr_get_cstr(newfile), octstr_get_cstr(filename)) == -1) {
168  error(errno, "Failed to rename new store '%s' as '%s'",
170  return -1;
171  }
172  return 0;
173 }
174 
175 
176 static int do_dump(void)
177 {
178  Octstr *key;
179  Msg *msg;
180  List *sms_list;
181  long l;
182 
183  if (filename == NULL)
184  return 0;
185 
186  /* create a new store-file and save all non-acknowledged
187  * messages into it
188  */
189  if (open_file(newfile)==-1)
190  return -1;
191 
192  sms_list = dict_keys(sms_dict);
193  for (l=0; l < gwlist_len(sms_list); l++) {
194  key = gwlist_get(sms_list, l);
195  msg = dict_get(sms_dict, key);
196  if (msg != NULL)
197  write_msg(msg);
198  }
199  fflush(file);
201 
202  /* rename old storefile as .bak, and then new as regular file
203  * without .new ending */
204 
205  return rename_store();
206 }
207 
208 
209 /*
210  * thread to write current store to file now and then, to prevent
211  * it from becoming far too big (slows startup)
212  */
213 static void store_dumper(void *arg)
214 {
215  time_t now;
216  int busy = 0;
217 
218  while (active) {
219  now = time(NULL);
220  /*
221  * write store to file up to each N. second, providing
222  * that something happened or if we are constantly busy.
223  */
224  if (now - last_dict_mod > dump_frequency || busy) {
225  store_dump();
226  /*
227  * make sure that no new dump is done for a while unless
228  * something happens. This moves the trigger in the future
229  * and allows the if statement to pass if nothing happened
230  * in the mean time while sleeping. The busy flag is needed
231  * to garantee we do dump in case we are constantly busy
232  * and hence the difference between now and last dict
233  * operation is less then dump frequency, otherwise we
234  * would never dump. This is for constant high load.
235  */
236  last_dict_mod = time(NULL) + 3600*24;
237  busy = 0;
238  } else {
239  busy = (now - last_dict_mod) > 0;
240  }
242  }
243  store_dump();
244  if (file != NULL)
245  fclose(file);
250 
252  /* set all vars to NULL */
253  filename = newfile = bakfile = NULL;
254  file_mutex = NULL;
255  sms_dict = NULL;
256 }
257 
258 
259 /*------------------------------------------------------*/
260 
261 static void store_file_for_each_message(void(*callback_fn)(Msg* msg, void *data), void *data)
262 {
263  List *keys;
264  long l;
265  Msg *msg;
266  Octstr *key;
267 
268  /* if there is no store-file, then don't loop in sms_store */
269  if (filename == NULL)
270  return;
271 
273  keys = dict_keys(sms_dict);
274  for (l = 0; l < gwlist_len(keys); l++) {
275  key = gwlist_get(keys, l);
276  msg = dict_get(sms_dict, key);
277  if (msg == NULL)
278  continue;
279  callback_fn(msg, data);
280  }
282 
284 }
285 
286 
287 static long store_file_messages(void)
288 {
289  return (sms_dict ? dict_key_count(sms_dict) : -1);
290 }
291 
292 
293 static int store_to_dict(Msg *msg)
294 {
295  Msg *copy;
296  Octstr *uuid_os;
297  char id[UUID_STR_LEN + 1];
298 
299  /* always set msg id and timestamp */
300  if (msg_type(msg) == sms && uuid_is_null(msg->sms.id))
301  uuid_generate(msg->sms.id);
302 
303  if (msg_type(msg) == sms && msg->sms.time == MSG_PARAM_UNDEFINED)
304  time(&msg->sms.time);
305 
306  if (msg_type(msg) == sms) {
307  copy = msg_duplicate(msg);
308 
309  uuid_unparse(copy->sms.id, id);
310  uuid_os = octstr_create(id);
311 
312  dict_put(sms_dict, uuid_os, copy);
313  octstr_destroy(uuid_os);
314  last_dict_mod = time(NULL);
315  } else if (msg_type(msg) == ack) {
316  uuid_unparse(msg->ack.id, id);
317  uuid_os = octstr_create(id);
318  copy = dict_remove(sms_dict, uuid_os);
319  octstr_destroy(uuid_os);
320  if (copy == NULL) {
321  warning(0, "bb_store: get ACK of message not found "
322  "from store, strange?");
323  } else {
324  msg_destroy(copy);
325  last_dict_mod = time(NULL);
326  }
327  } else
328  return -1;
329  return 0;
330 }
331 
332 static int store_file_save(Msg *msg)
333 {
334  if (filename == NULL)
335  return 0;
336 
337  /* block here until store not loaded */
339 
340  /* lock file_mutex in order to have dict and file in sync */
342  if (store_to_dict(msg) == -1) {
344  return -1;
345  }
346 
347  /* write to file, too */
348  write_msg(msg);
349  fflush(file);
351 
352  return 0;
353 }
354 
355 
357 {
358  Msg *mack;
359  int ret;
360 
361  /* only sms are handled */
362  if (!msg || msg_type(msg) != sms)
363  return -1;
364 
365  if (filename == NULL)
366  return 0;
367 
368  mack = msg_create(ack);
369  if (!mack)
370  return -1;
371 
372  mack->ack.time = msg->sms.time;
373  uuid_copy(mack->ack.id, msg->sms.id);
374  mack->ack.nack = status;
375 
376  ret = store_save(mack);
377  msg_destroy(mack);
378 
379  return ret;
380 }
381 
382 
383 static int store_file_load(void(*receive_msg)(Msg*))
384 {
385  List *keys;
386  Octstr *store_file, *key;
387  Msg *msg;
388  int retval, msgs;
389  long end, pos;
390 
391  if (filename == NULL)
392  return 0;
393 
395  if (file != NULL) {
396  fclose(file);
397  file = NULL;
398  }
399 
401  if (store_file != NULL)
402  info(0, "Loading store file `%s'", octstr_get_cstr(filename));
403  else {
405  if (store_file != NULL)
406  info(0, "Loading store file `%s'", octstr_get_cstr(newfile));
407  else {
409  if (store_file != NULL)
410  info(0, "Loading store file `%s'", octstr_get_cstr(bakfile));
411  else {
412  info(0, "Cannot open any store file, starting a new one");
413  retval = open_file(filename);
414  goto end;
415  }
416  }
417  }
418 
419  info(0, "Store-file size %ld, starting to unpack%s", octstr_len(store_file),
420  octstr_len(store_file) > 10000 ? " (may take awhile)" : "");
421 
422 
423  pos = 0;
424  msgs = 0;
425  end = octstr_len(store_file);
426 
427  while (pos < end) {
428  if (read_msg(&msg, store_file, &pos) == -1) {
429  error(0, "Garbage at store-file, skipped.");
430  continue;
431  }
432  if (msg_type(msg) == sms) {
434  msgs++;
435  } else if (msg_type(msg) == ack) {
437  } else {
438  warning(0, "Strange message in store-file, discarded, "
439  "dump follows:");
440  msg_dump(msg, 0);
441  }
442  msg_destroy(msg);
443  }
444  octstr_destroy(store_file);
445 
446  info(0, "Retrieved %d messages, non-acknowledged messages: %ld",
447  msgs, dict_key_count(sms_dict));
448 
449  /* now create a new sms_store out of messages left */
450 
451  keys = dict_keys(sms_dict);
452  while ((key = gwlist_extract_first(keys)) != NULL) {
453  msg = dict_remove(sms_dict, key);
454  if (store_to_dict(msg) != -1) {
455  receive_msg(msg);
456  } else {
457  error(0, "Found unknown message type in store file.");
458  msg_dump(msg, 0);
459  msg_destroy(msg);
460  }
461  octstr_destroy(key);
462  }
464 
465  /* Finally, generate new store file out of left messages */
466  retval = do_dump();
467 
468 end:
470 
471  /* allow using of store */
473 
474  /* start dumper thread */
475  if ((cleanup_thread = gwthread_create(store_dumper, NULL))==-1)
476  panic(0, "Failed to create a cleanup thread!");
477 
478  return retval;
479 }
480 
481 
482 static int store_file_dump(void)
483 {
484  int retval;
485 
486  debug("bb.store", 0, "Dumping %ld messages to store",
489  if (file != NULL) {
490  fclose(file);
491  file = NULL;
492  }
493  retval = do_dump();
495 
496  return retval;
497 }
498 
499 
500 static void store_file_shutdown(void)
501 {
502  if (filename == NULL)
503  return;
504 
505  active = 0;
507  /* wait for cleanup thread */
508  if (cleanup_thread != -1)
510 
511  gwlist_destroy(loaded, NULL);
512 }
513 
514 
515 int store_file_init(const Octstr *fname, long dump_freq)
516 {
517  /* Initialize function pointers */
525 
526  if (fname == NULL)
527  return 0; /* we are done */
528 
529  if (octstr_len(fname) > (FILENAME_MAX-5))
530  panic(0, "Store file filename too long: `%s', failed to init.",
531  octstr_get_cstr(fname));
532 
533  filename = octstr_duplicate(fname);
536 
538 
539  if (dump_freq > 0)
540  dump_frequency = dump_freq;
541  else
543 
545  active = 1;
546 
547  loaded = gwlist_create();
549 
550  return 0;
551 }
Dict * dict_create(long size_hint, void(*destroy_value)(void *))
Definition: dict.c:192
void msg_dump(Msg *msg, int level)
Definition: msg.c:152
void error(int err, const char *fmt,...)
Definition: log.c:648
void info(int err, const char *fmt,...)
Definition: log.c:672
Msg * msg_duplicate(Msg *msg)
Definition: msg.c:111
static long store_file_messages(void)
static int read_msg(Msg **msg, Octstr *os, long *off)
static void store_file_shutdown(void)
gw_assert(wtls_machine->packet_to_send !=NULL)
void dict_put(Dict *dict, Octstr *key, void *value)
Definition: dict.c:240
#define mutex_unlock(m)
Definition: thread.h:136
void encode_network_long(unsigned char *data, unsigned long value)
Definition: utils.c:940
int store_file_init(const Octstr *fname, long dump_freq)
void gwthread_join(long thread)
long gwlist_len(List *list)
Definition: list.c:166
int(* store_save_ack)(Msg *msg, ack_status_t status)
Definition: bb_store.c:73
#define mutex_create()
Definition: thread.h:96
static long dump_frequency
Definition: bb_store_file.c:93
void * gwlist_get(List *list, long pos)
Definition: list.c:292
static List * loaded
Definition: bb_store_file.c:99
long(* store_messages)(void)
Definition: bb_store.c:71
static void write_msg(Msg *msg)
msg_type
Definition: msg.h:73
void uuid_unparse(const uuid_t uu, char *out)
Definition: gw_uuid.c:562
int octstr_print(FILE *f, Octstr *ostr)
Definition: octstr.c:1191
void uuid_generate(uuid_t out)
Definition: gw_uuid.c:393
static int store_file_dump(void)
#define msg_create(type)
Definition: msg.h:136
static int active
Definition: bb_store_file.c:97
Msg *(* store_msg_unpack)(Octstr *os)
Definition: bb_store.c:78
static int store_to_dict(Msg *msg)
void octstr_insert_data(Octstr *ostr, long pos, const char *data, long len)
Definition: octstr.c:1461
int(* store_dump)(void)
Definition: bb_store.c:75
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
#define octstr_copy(ostr, from, len)
Definition: octstr.h:178
void(* store_for_each_message)(void(*callback_fn)(Msg *msg, void *data), void *data)
Definition: bb_store.c:79
static int rename_store(void)
static Octstr * filename
Definition: bb_store_file.c:88
static Octstr * newfile
Definition: bb_store_file.c:89
void msg_destroy_item(void *msg)
Definition: msg.c:147
#define BB_STORE_DEFAULT_DUMP_FREQ
Definition: bb_store.h:66
static Dict * sms_dict
Definition: bb_store_file.c:95
Definition: msg.h:79
void * dict_remove(Dict *dict, Octstr *key)
Definition: dict.c:307
void * gwlist_extract_first(List *list)
Definition: list.c:305
static int store_file_save_ack(Msg *msg, ack_status_t status)
void * dict_get(Dict *dict, Octstr *key)
Definition: dict.c:286
void gwlist_remove_producer(List *list)
Definition: list.c:401
int uuid_is_null(const uuid_t uu)
Definition: gw_uuid.c:413
Definition: dict.c:116
#define octstr_duplicate(ostr)
Definition: octstr.h:187
static time_t last_dict_mod
Definition: bb_store_file.c:98
long dict_key_count(Dict *dict)
Definition: dict.c:335
static int store_file_save(Msg *msg)
void uuid_copy(uuid_t dst, const uuid_t src)
Definition: gw_uuid.c:150
static int do_dump(void)
void msg_destroy(Msg *msg)
Definition: msg.c:132
char * name
Definition: smsc_cimd2.c:212
void warning(int err, const char *fmt,...)
Definition: log.c:660
static long cleanup_thread
Definition: bb_store_file.c:92
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2464
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:324
#define gwthread_create(func, arg)
Definition: gwthread.h:90
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:336
void gwthread_sleep(double seconds)
void * data
void mutex_destroy(Mutex *mutex)
Definition: thread.c:97
Octstr *(* store_msg_pack)(Msg *msg)
Definition: bb_store.c:77
static Mutex * file_mutex
Definition: bb_store_file.c:91
#define UUID_STR_LEN
Definition: gw_uuid.h:19
Octstr * octstr_read_file(const char *filename)
Definition: octstr.c:1548
void(* store_shutdown)(void)
Definition: bb_store.c:76
long octstr_len(const Octstr *ostr)
Definition: octstr.c:342
void dict_destroy(Dict *dict)
Definition: dict.c:215
long decode_network_long(unsigned char *data)
Definition: utils.c:935
static void store_file_for_each_message(void(*callback_fn)(Msg *msg, void *data), void *data)
Definition: octstr.c:118
void * gwlist_consume(List *list)
Definition: list.c:427
static int open_file(Octstr *name)
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:726
#define panic
Definition: log.h:87
void gwthread_wakeup(long thread)
int(* store_load)(void(*receive_msg)(Msg *))
Definition: bb_store.c:74
#define MSG_PARAM_UNDEFINED
Definition: msg.h:71
List * dict_keys(Dict *dict)
Definition: dict.c:347
#define gwlist_create()
Definition: list.h:136
void(* callback_fn)(Msg *msg, void *data)
static Octstr * bakfile
Definition: bb_store_file.c:90
int(* store_save)(Msg *msg)
Definition: bb_store.c:72
Definition: thread.h:76
ack_status_t
Definition: msg.h:124
static FILE * file
Definition: bb_store_file.c:87
static int store_file_load(void(*receive_msg)(Msg *))
void gwlist_add_producer(List *list)
Definition: list.c:383
void octstr_get_many_chars(char *buf, Octstr *ostr, long pos, long len)
Definition: octstr.c:425
#define mutex_lock(m)
Definition: thread.h:130
Definition: list.c:102
static XMLRPCDocument * msg
Definition: test_xmlrpc.c:86
Octstr * status
Definition: bb_store.c:108
static void store_dumper(void *arg)
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.