Kannel: Open Source WAP and SMS gateway  $Revision: 5037 $
bb_store_spool.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2016 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
63 #include "gw-config.h"
64 
65 #include <unistd.h>
66 #include <sys/types.h>
67 #include <sys/stat.h>
68 #include <fcntl.h>
69 #include <dirent.h>
70 #include <errno.h>
71 
72 #include "gwlib/gwlib.h"
73 #include "msg.h"
74 #include "sms.h"
75 #include "bearerbox.h"
76 #include "bb_store.h"
77 
78 
79 /* Number of subdirectories messages are spread over: store_spool_save() puts each file into subdirectory (hash of message id) % MAX_DIRS of the spool directory. */
80 #define MAX_DIRS 100
81 
82 static Octstr *spool; /* spool directory path, a copy made by store_spool_init(); NULL while the spool store is inactive */
83 static Counter *counter; /* count of spooled messages: increased on load and save, decreased when an ack removes a file */
84 static List *loaded; /* gate list: init adds a producer, load removes it, save calls gwlist_consume() on it -- presumably blocking until the load has finished (confirm against gwlib list semantics) */
85 
86 
/*
 * Dump hook of the spool store.
 *
 * There is nothing to flush here, so this is a no-op.
 *
 * Returns 0 always.
 */
static int store_spool_dump(void)
{
    /* nothing to do */
    return 0;
}
92 
93 
94 static long store_spool_messages()
95 {
96  return counter ? counter_value(counter) : -1;
97 }
98 
99 
100 static int for_each_file(const Octstr *dir_s, int ignore_err, void(*cb)(const Octstr*, void*), void *data)
101 {
102  DIR *dir;
103  struct dirent *ent;
104  struct stat stat;
105  int ret = 0;
106 
107  if ((dir = opendir(octstr_get_cstr(dir_s))) == NULL) {
108  error(errno, "Could not open directory `%s'", octstr_get_cstr(dir_s));
109  return -1;
110  }
111  while((ent = readdir(dir)) != NULL) {
112  Octstr *filename;
113  if (*(ent->d_name) == '.') /* skip hidden files */
114  continue;
115  filename = octstr_format("%S/%s", dir_s, ent->d_name);
116  if (lstat(octstr_get_cstr(filename), &stat) == -1) {
117  if (!ignore_err)
118  error(errno, "Could not get stat for `%s'", octstr_get_cstr(filename));
119  ret = -1;
120  } else if (S_ISDIR(stat.st_mode) && for_each_file(filename, ignore_err, cb, data) == -1) {
121  ret = -1;
122  } else if (S_ISREG(stat.st_mode) && cb != NULL)
123  cb(filename, data);
124  octstr_destroy(filename);
125  if (ret == -1 && ignore_err)
126  ret = 0;
127  else if (ret == -1)
128  break;
129  }
130  closedir(dir);
131 
132  return ret;
133 }
134 
135 
136 struct status {
137  void(*callback_fn)(Msg* msg, void *data);
138  void *data;
139 };
140 
141 
142 static void status_cb(const Octstr *filename, void *d)
143 {
144  struct status *data = d;
145  Octstr *msg_s;
146  Msg *msg;
147 
148  msg_s = octstr_read_file(octstr_get_cstr(filename));
149  msg = store_msg_unpack(msg_s);
150  octstr_destroy(msg_s);
151  if (msg == NULL)
152  return;
153 
154  data->callback_fn(msg, data->data);
155 
156  msg_destroy(msg);
157 }
158 
159 
160 static void store_spool_for_each_message(void(*callback_fn)(Msg* msg, void *data), void *data)
161 {
162  struct status d;
163 
164  if (spool == NULL)
165  return;
166 
168  d.data = data;
169 
170  /* ignore error because files may disappear */
171  for_each_file(spool, 1, status_cb, &d);
172 }
173 
174 
175 static void dispatch(const Octstr *filename, void *data)
176 {
177  Octstr *msg_s;
178  Msg *msg;
179  void(*receive_msg)(Msg*) = data;
180 
181  /* debug("", 0, "dispatch(%s,...) called", octstr_get_cstr(filename)); */
182 
183  msg_s = octstr_read_file(octstr_get_cstr(filename));
184  if (msg_s == NULL)
185  return;
186  msg = store_msg_unpack(msg_s);
187  octstr_destroy(msg_s);
188  if (msg != NULL) {
189  receive_msg(msg);
190  counter_increase(counter);
191  } else {
192  error(0, "Could not unpack message `%s'", octstr_get_cstr(filename));
193  }
194 }
195 
196 
197 static int store_spool_load(void(*receive_msg)(Msg*))
198 {
199  int rc;
200 
201  /* check if we are active */
202  if (spool == NULL)
203  return 0;
204 
205  /* sanity check */
206  if (receive_msg == NULL)
207  return -1;
208 
209  rc = for_each_file(spool, 0, dispatch, receive_msg);
210 
211  info(0, "Loaded %ld messages from store.", counter_value(counter));
212 
213  /* allow using of storage */
214  gwlist_remove_producer(loaded);
215 
216  return rc;
217 }
218 
219 
221 {
222  char id[UUID_STR_LEN + 1];
223  Octstr *id_s;
224 
225  /* always set msg id and timestamp */
226  if (msg_type(msg) == sms && uuid_is_null(msg->sms.id))
227  uuid_generate(msg->sms.id);
228 
229  if (msg_type(msg) == sms && msg->sms.time == MSG_PARAM_UNDEFINED)
230  time(&msg->sms.time);
231 
232  if (spool == NULL)
233  return 0;
234 
235  /* blocke here if store still not loaded */
236  gwlist_consume(loaded);
237 
238  switch(msg_type(msg)) {
239  case sms:
240  {
241  Octstr *os = store_msg_pack(msg);
242  Octstr *filename, *dir;
243  int fd;
244  size_t wrc;
245 
246  if (os == NULL) {
247  error(0, "Could not pack message.");
248  return -1;
249  }
250  uuid_unparse(msg->sms.id, id);
251  id_s = octstr_create(id);
252  dir = octstr_format("%S/%ld", spool, octstr_hash_key(id_s) % MAX_DIRS);
253  octstr_destroy(id_s);
254  if (mkdir(octstr_get_cstr(dir), S_IRUSR|S_IWUSR|S_IXUSR) == -1 && errno != EEXIST) {
255  error(errno, "Could not create directory `%s'.", octstr_get_cstr(dir));
256  octstr_destroy(dir);
257  octstr_destroy(os);
258  return -1;
259  }
260  filename = octstr_format("%S/%s", dir, id);
261  octstr_destroy(dir);
262  if ((fd = open(octstr_get_cstr(filename), O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IWUSR)) == -1) {
263  error(errno, "Could not open file `%s'.", octstr_get_cstr(filename));
264  octstr_destroy(filename);
265  octstr_destroy(os);
266  return -1;
267  }
268  for (wrc = 0; wrc < octstr_len(os); ) {
269  size_t rc = write(fd, octstr_get_cstr(os) + wrc, octstr_len(os) - wrc);
270  if (rc == -1) {
271  /* remove file */
272  error(errno, "Could not write message to `%s'.", octstr_get_cstr(filename));
273  close(fd);
274  if (unlink(octstr_get_cstr(filename)) == -1)
275  error(errno, "Oops, Could not remove failed file `%s'.", octstr_get_cstr(filename));
276  octstr_destroy(os);
277  octstr_destroy(filename);
278  return -1;
279  }
280  wrc += rc;
281  }
282  close(fd);
283  counter_increase(counter);
284  octstr_destroy(filename);
285  octstr_destroy(os);
286  break;
287  }
288  case ack:
289  {
290  Octstr *filename;
291  uuid_unparse(msg->ack.id, id);
292  id_s = octstr_create(id);
293  filename = octstr_format("%S/%ld/%s", spool, octstr_hash_key(id_s) % MAX_DIRS, id);
294  octstr_destroy(id_s);
295  if (unlink(octstr_get_cstr(filename)) == -1) {
296  error(errno, "Could not unlink file `%s'.", octstr_get_cstr(filename));
297  octstr_destroy(filename);
298  return -1;
299  }
300  counter_decrease(counter);
301  octstr_destroy(filename);
302  break;
303  }
304  default:
305  return -1;
306  }
307 
308  return 0;
309 }
310 
311 
313 {
314  int ret;
315  Msg *nack = msg_create(ack);
316 
317  nack->ack.nack = status;
318  uuid_copy(nack->ack.id, msg->sms.id);
319  nack->ack.time = msg->sms.time;
320  ret = store_spool_save(nack);
321  msg_destroy(nack);
322 
323  return ret;
324 }
325 
326 
327 static void store_spool_shutdown()
328 {
329  if (spool == NULL)
330  return;
331 
332  counter_destroy(counter);
333  octstr_destroy(spool);
334  gwlist_destroy(loaded, NULL);
335 }
336 
337 
338 int store_spool_init(const Octstr *store_dir)
339 {
340  DIR *dir;
341 
349 
350  if (store_dir == NULL)
351  return 0;
352 
353  /* check if we can open directory */
354  if ((dir = opendir(octstr_get_cstr(store_dir))) == NULL) {
355  error(errno, "Could not open directory `%s'", octstr_get_cstr(store_dir));
356  return -1;
357  }
358  closedir(dir);
359 
360  loaded = gwlist_create();
361  gwlist_add_producer(loaded);
362  spool = octstr_duplicate(store_dir);
363  counter = counter_create();
364 
365  return 0;
366 }
367 
int store_spool_init(const Octstr *store_dir)
void error(int err, const char *fmt,...)
Definition: log.c:612
void info(int err, const char *fmt,...)
Definition: log.c:636
static Octstr * spool
void counter_destroy(Counter *counter)
Definition: counter.c:110
static void store_spool_shutdown()
int(* store_save_ack)(Msg *msg, ack_status_t status)
Definition: bb_store.c:73
long(* store_messages)(void)
Definition: bb_store.c:71
static void dispatch(const Octstr *filename, void *data)
msg_type
Definition: msg.h:73
void uuid_unparse(const uuid_t uu, char *out)
Definition: gw_uuid.c:561
void uuid_generate(uuid_t out)
Definition: gw_uuid.c:392
#define msg_create(type)
Definition: msg.h:136
unsigned long counter_decrease(Counter *counter)
Definition: counter.c:155
Msg *(* store_msg_unpack)(Octstr *os)
Definition: bb_store.c:78
int(* store_dump)(void)
Definition: bb_store.c:75
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
unsigned long counter_increase(Counter *counter)
Definition: counter.c:123
#define MAX_DIRS
void(* store_for_each_message)(void(*callback_fn)(Msg *msg, void *data), void *data)
Definition: bb_store.c:79
static long store_spool_messages()
static int for_each_file(const Octstr *dir_s, int ignore_err, void(*cb)(const Octstr *, void *), void *data)
static void store_spool_for_each_message(void(*callback_fn)(Msg *msg, void *data), void *data)
Definition: msg.h:79
Counter * counter_create(void)
Definition: counter.c:94
void gwlist_remove_producer(List *list)
Definition: list.c:401
int uuid_is_null(const uuid_t uu)
Definition: gw_uuid.c:412
static int store_spool_save(Msg *msg)
static int store_spool_dump()
#define octstr_duplicate(ostr)
Definition: octstr.h:187
void uuid_copy(uuid_t dst, const uuid_t src)
Definition: gw_uuid.c:150
void msg_destroy(Msg *msg)
Definition: msg.c:132
static List * loaded
static int store_spool_save_ack(Msg *msg, ack_status_t status)
unsigned long octstr_hash_key(Octstr *ostr)
Definition: octstr.c:2521
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2462
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:322
char filename[FILENAME_MAX+1]
Definition: log.c:135
#define octstr_create(cstr)
Definition: octstr.h:125
unsigned long counter_value(Counter *counter)
Definition: counter.c:145
void * data
Octstr *(* store_msg_pack)(Msg *msg)
Definition: bb_store.c:77
#define UUID_STR_LEN
Definition: gw_uuid.h:19
Octstr * octstr_read_file(const char *filename)
Definition: octstr.c:1546
void(* store_shutdown)(void)
Definition: bb_store.c:76
long octstr_len(const Octstr *ostr)
Definition: octstr.c:340
Definition: octstr.c:118
void * gwlist_consume(List *list)
Definition: list.c:427
static int store_spool_load(void(*receive_msg)(Msg *))
int(* store_load)(void(*receive_msg)(Msg *))
Definition: bb_store.c:74
#define MSG_PARAM_UNDEFINED
Definition: msg.h:71
#define gwlist_create()
Definition: list.h:136
void(* callback_fn)(Msg *msg, void *data)
int(* store_save)(Msg *msg)
Definition: bb_store.c:72
ack_status_t
Definition: msg.h:124
void gwlist_add_producer(List *list)
Definition: list.c:383
static void status_cb(const Octstr *filename, void *d)
Definition: list.c:102
static XMLRPCDocument * msg
Definition: test_xmlrpc.c:86
static Counter * counter
Octstr * status
Definition: bb_store.c:108
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.