Kannel: Open Source WAP and SMS gateway  svn-r5335
dbpool_redis.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * dbpool_redis.c - implement REDIS operations for generic database connection pool
59  *
60  * In the redis_[update|select]() functions no NULL values are allowed in the
61  * binds list, otherwise we may get value corruption when getting back values
62  * from redis.
63  *
64  * Toby Phipps <toby.phipps at nexmedia.com.sg>, 2011 Initial version.
65  * Stipe Tolj <stolj at kannel.org>, 2013, 2015
66  */
67 
68 #ifdef HAVE_REDIS
#include <hiredis.h>
#include <limits.h>
70 
/*
 * Define REDIS_DEBUG to get DEBUG level output of the
 * Redis commands sent to the server.
 */
75 /* #define REDIS_DEBUG 1 */
76 
77 #define REDIS_DEFAULT_PORT 6379
78 
79 
80 static void *redis_open_conn(const DBConf *db_conf)
81 {
82  redisContext *redis = NULL;
83  RedisConf *conf = db_conf->redis; /* make compiler happy */
84  redisReply *reply = NULL;
85  Octstr *os, *line;
86  List *lines;
87  long delimiter;
88 
89  /* sanity check */
90  if (conf == NULL)
91  return NULL;
92 
93  struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
94  redis = redisConnectWithTimeout(octstr_get_cstr(conf->host), conf->port, timeout);
95  if (redis->err) {
96  error(0, "REDIS: can not connect to server!");
97  error(0, "REDIS: %s", redis->errstr);
98  goto failed;
99  }
100 
101  info(0, "REDIS: Connected to server at %s:%ld.",
102  octstr_get_cstr(conf->host), conf->port);
103 
104  if (conf->password != NULL) {
105  reply = redisCommand(redis, "AUTH %s", octstr_get_cstr(conf->password));
106  if (strncmp("OK", reply->str, 2) != 0) {
107  error(0, "REDIS: Password authentication failed!");
108  goto failed;
109  }
110  freeReplyObject(reply);
111  }
112 
113  if (conf->idle_timeout != -1) {
114  reply = redisCommand(redis, "CONFIG SET TIMEOUT %ld", conf->idle_timeout);
115  if (strncmp("OK", reply->str, 2) != 0)
116  warning(0, "REDIS: CONFIG SET TIMEOUT %ld failed - could not set timeout",
117  conf->idle_timeout);
118  else
119  info(0, "REDIS: Set idle timeout to %ld seconds", conf->idle_timeout);
120  freeReplyObject(reply);
121  }
122 
123  if (conf->database != -1) {
124  reply = redisCommand(redis,"SELECT %ld", conf->database);
125  if (strncmp("OK", reply->str, 2) != 0)
126  error(0,"REDIS: SELECT %ld failed - could not select database", conf->database);
127  else
128  info(0,"REDIS: Selected database %ld", conf->database);
129  freeReplyObject(reply);
130  }
131 
132  reply = redisCommand(redis, "INFO");
133  if (reply->type != REDIS_REPLY_STRING) {
134  error(0, "REDIS: INFO command to get version failed!");
135  goto failed;
136  }
137 
138  os = octstr_create(reply->str);
139 
140 #if defined(REDIS_DEBUG)
141  debug("dbpool.redis",0,"Received REDIS_REPLY_STRING for INFO cmd");
142  /* octstr_dump(os, 0); */
143 #endif
144 
145  lines = octstr_split(os, octstr_imm("\n"));
146  octstr_destroy(os);
147  os = NULL;
148 
149  while ((line = gwlist_extract_first(lines)) != NULL) {
150  Octstr *key, *value;
151 
152  /* comment line */
153  if (octstr_get_char(line, 0) == '#') {
154  octstr_destroy(line);
155  continue;
156  }
157  delimiter = octstr_search_char(line, ':', 0);
158  key = octstr_copy(line, 0, delimiter);
159  octstr_strip_blanks(key);
160  value = octstr_copy(line, delimiter + 1, octstr_len(line));
161  octstr_strip_blanks(value);
162  if (octstr_str_compare(key, "redis_version") == 0) {
163  os = octstr_duplicate(value);
164  octstr_destroy(key);
165  octstr_destroy(value);
166  octstr_destroy(line);
167  break;
168  }
169  octstr_destroy(key);
170  octstr_destroy(value);
171  octstr_destroy(line);
172  }
174 
175  if (os == NULL) {
176  error(0, "REDIS: Could not parse version from INFO output!");
177  goto failed;
178  }
179 
180  info(0, "REDIS: server version %s.", octstr_get_cstr(os));
181  octstr_destroy(os);
182 
183  freeReplyObject(reply);
184  return redis;
185 
186 failed:
187  if (reply != NULL)
188  freeReplyObject(reply);
189  if (redis != NULL)
190  redisFree(redis);
191 
192  return NULL;
193 }
194 
195 
196 static void redis_close_conn(void *conn)
197 {
198  if (conn == NULL)
199  return;
200 
201  redisFree((redisContext*) conn);
202 }
203 
204 
205 static int redis_check_conn(void *conn)
206 {
207  redisReply *reply;
208 
209  if (conn == NULL)
210  return -1;
211 
212  reply = redisCommand(conn, "PING");
213  if (reply != NULL) {
214  if (strcmp(reply->str,"PONG") == 0) {
215  freeReplyObject(reply);
216  return 0;
217  }
218  }
219 
220  error(0, "REDIS: server connection check failed!");
221  if (((redisContext*)conn)->err != 0) {
222  /* error occured */
223  error(0, "REDIS: %s", ((redisContext*)conn)->errstr);
224  } else if (reply != NULL) {
225  /* some strange reply ? */
226  error(0, "REDIS REPLY: %s", reply->str);
227  }
228  if (reply != NULL)
229  freeReplyObject(reply);
230  return -1;
231 }
232 
233 static List *redis_populate_array(redisReply *reply)
234 {
235  long i;
236  List *row;
237  Octstr *temp = NULL;
238 
239  gw_assert(reply->type == REDIS_REPLY_ARRAY);
240 
241  row = gwlist_create();
242  for (i = 0; i < reply->elements; i++) {
243  switch(reply->element[i]->type) {
244  case REDIS_REPLY_NIL:
245  gwlist_produce(row, octstr_imm(""));
246  break;
247  case REDIS_REPLY_ARRAY: {
248  List *arr;
249  arr = redis_populate_array(reply->element[i]);
250  gwlist_append(row, arr);
251  break;
252  }
253  default:
254  if (reply->element[i]->str == NULL || reply->element[i]->len == 0) {
255  gwlist_produce(row, octstr_imm(""));
256  } else {
257  temp = octstr_create_from_data(reply->element[i]->str, reply->element[i]->len);
258 #if defined(REDIS_DEBUG)
259  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY[%ld]: %s", i, octstr_get_cstr(temp));
260 #endif
261  gwlist_append(row, temp);
262  }
263  break;
264  }
265  }
266 
267  return row;
268 }
269 
270 static int redis_select(void *conn, const Octstr *sql, List *binds, List **res)
271 {
272  redisReply *reply;
273  long i, binds_len;
274  List *row;
275  Octstr *temp = NULL;
276  const char **argv;
277 
278  /* bind parameters if any */
279  binds_len = gwlist_len(binds);
280 
281  if (binds_len > 0) {
282 #if defined(REDIS_DEBUG)
283  Octstr *os = octstr_create("");;
284 #endif
285 
286  argv = gw_malloc(sizeof(*argv) * binds_len);
287  for (i = 0; i < binds_len; i++) {
288  argv[i] = (char*)octstr_get_cstr(gwlist_get(binds, i));
289 #if defined(REDIS_DEBUG)
290  octstr_format_append(os, "\"%s\" ", argv[i]);
291 #endif
292  }
293 
294 #if defined(REDIS_DEBUG)
295  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(os));
296  octstr_destroy(os);
297 #endif
298 
299  /* execute statement */
300  reply = redisCommandArgv(conn, binds_len, argv, NULL);
301 
302  gw_free(argv);
303 
304  } else {
305 
306 #if defined(REDIS_DEBUG)
307  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(sql));
308 #endif
309 
310  /* execute statement */
311  reply = redisCommand(conn, octstr_get_cstr(sql));
312  }
313 
314  if (reply == NULL)
315  return -1;
316 
317  /* evaluate reply */
318  switch (reply->type) {
319  case REDIS_REPLY_ERROR:
320 #if defined(REDIS_DEBUG)
321  debug("dbpool.redis",0,"Received REDIS_REPLY_ERROR");
322 #endif
323  error(0, "REDIS: redisCommand() failed: `%s'", reply->str);
324  break;
325  case REDIS_REPLY_NIL:
326 #if defined(REDIS_DEBUG)
327  debug("dbpool.redis",0,"Received REDIS_REPLY_NIL");
328 #endif
329  break;
330  case REDIS_REPLY_STATUS:
331 #if defined(REDIS_DEBUG)
332  debug("dbpool.redis",0,"Received REDIS_REPLY_STATUS");
333 #endif
334  break;
335 
336  case REDIS_REPLY_STRING:
337 #if defined(REDIS_DEBUG)
338  debug("dbpool.redis",0,"Received REDIS_REPLY_STRING");
339 #endif
340  *res = gwlist_create();
341  row = gwlist_create();
342  temp = octstr_create_from_data(reply->str, reply->len);
343  gwlist_append(row, temp);
344  gwlist_produce(*res, row);
345  freeReplyObject(reply);
346  return 0;
347  break;
348 
349  case REDIS_REPLY_INTEGER:
350 #if defined(REDIS_DEBUG)
351  debug("dbpool.redis",0,"Received REDIS_REPLY_INTEGER");
352 #endif
353  *res = gwlist_create();
354  row = gwlist_create();
355  temp = octstr_format("%ld", reply->integer);
356  gwlist_append(row, temp);
357  gwlist_produce(*res, row);
358  freeReplyObject(reply);
359  return 0;
360 
361  case REDIS_REPLY_ARRAY:
362 #if defined(REDIS_DEBUG)
363  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY");
364 #endif
365  *res = gwlist_create();
366  row = redis_populate_array(reply);
367  gwlist_produce(*res, row);
368  freeReplyObject(reply);
369  return 0;
370 
371  default:
372 #if defined(REDIS_DEBUG)
373  error(0,"REDIS: Received unknown Redis reply type %d", reply->type);
374 #endif
375  break;
376  }
377 
378  freeReplyObject(reply);
379 
380  return -1;
381 }
382 
383 
384 static int redis_update(void *conn, const Octstr *sql, List *binds)
385 {
386  long i, binds_len;
387  int ret;
388  redisReply *reply;
389  const char **argv;
390 
391  /* bind parameters if any */
392  binds_len = gwlist_len(binds);
393 
394  if (binds_len > 0) {
395 #if defined(REDIS_DEBUG)
396  Octstr *os = octstr_create("");;
397 #endif
398 
399  argv = gw_malloc(sizeof(*argv) * binds_len);
400  for (i = 0; i < binds_len; i++) {
401  argv[i] = (char*)octstr_get_cstr(gwlist_get(binds, i));
402 #if defined(REDIS_DEBUG)
403  octstr_format_append(os, "\"%s\" ", argv[i]);
404 #endif
405  }
406 
407 #if defined(REDIS_DEBUG)
408  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(os));
409  octstr_destroy(os);
410 #endif
411 
412  /* execute statement */
413  reply = redisCommandArgv(conn, binds_len, argv, NULL);
414 
415  gw_free(argv);
416 
417  } else {
418 
419 #if defined(REDIS_DEBUG)
420  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(sql));
421 #endif
422 
423  /* execute statement */
424  reply = redisCommand(conn, octstr_get_cstr(sql));
425  }
426 
427  if (reply == NULL)
428  return -1;
429 
430  /* evaluate reply */
431  switch (reply->type) {
432  case REDIS_REPLY_ERROR:
433 #if defined(REDIS_DEBUG)
434  debug("dbpool.redis",0,"Received REDIS_REPLY_ERROR");
435 #endif
436  error(0, "REDIS: redisCommand() failed: `%s'", reply->str);
437  break;
438  case REDIS_REPLY_STATUS:
439  /* Some Redis commands (e.g. WATCH) return a boolean status */
440 #if defined(REDIS_DEBUG)
441  debug("dbpool.redis",0,"Received REDIS_REPLY_STATUS: %s", reply->str);
442 #endif
443  if (strcmp(reply->str, "OK") == 0) {
444  freeReplyObject(reply);
445  return 0;
446  }
447  break;
448  case REDIS_REPLY_INTEGER:
449  /* Other commands (e.g. DEL) return an integer indicating
450  * the number of keys affected */
451 #if defined(REDIS_DEBUG)
452  debug("dbpool.redis",0,"Received REDIS_REPLY_INTEGER: %qi", reply->integer);
453 #endif
454  /*
455  * Note: Redis returns a long long. Casting it to an int here could
456  * cause precision loss, however as we're returning an update status,
457  * this should only ever be used to return a count of keys
458  * deleted/updated, and this will almost invariably be 1.
459  */
460  ret = (int)reply->integer;
461  freeReplyObject(reply);
462  return ret;
463  case REDIS_REPLY_ARRAY:
464  /* The EXEC command returns an array of replies
465  * when executed successfully */
466 #if defined(REDIS_DEBUG)
467  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY");
468 #endif
469  freeReplyObject(reply);
470  /* For now, we only support EXEC commands with an array
471  * return and in that case, all is well */
472  return 0;
473  case REDIS_REPLY_NIL:
474  /* Finally, the EXEC command can return a NULL
475  * if it fails (e.g. due to a WATCH triggering */
476 #if defined(REDIS_DEBUG)
477  debug("dbpool.redis",0,"Received REDIS_REPLY_NIL");
478 #endif
479  break;
480  default:
481 #if defined(REDIS_DEBUG)
482  debug("dbpool.redis",0,"Received unknown Redis reply %d", reply->type);
483 #endif
484  break;
485  }
486 
487  freeReplyObject(reply);
488 
489  return -1;
490 }
491 
492 
493 static void redis_conf_destroy(DBConf *db_conf)
494 {
495  RedisConf *conf = db_conf->redis;
496 
497  octstr_destroy(conf->host);
498  octstr_destroy(conf->password);
499 
500  gw_free(conf);
501  gw_free(db_conf);
502 }
503 
504 
505 static struct db_ops redis_ops = {
506  .open = redis_open_conn,
507  .close = redis_close_conn,
508  .check = redis_check_conn,
509  .select = redis_select,
510  .update = redis_update,
511  .conf_destroy = redis_conf_destroy
512 };
513 
514 #endif /* HAVE_REDIS */
void error(int err, const char *fmt,...)
Definition: log.c:648
static Octstr * delimiter
Definition: test_ppg.c:104
void info(int err, const char *fmt,...)
Definition: log.c:672
gw_assert(wtls_machine->packet_to_send !=NULL)
RedisConf * redis
Definition: dbpool.h:172
void gwlist_append(List *list, void *item)
Definition: list.c:179
void gwlist_produce(List *list, void *item)
Definition: list.c:411
long gwlist_len(List *list)
Definition: list.c:166
void * gwlist_get(List *list, long pos)
Definition: list.c:292
void octstr_strip_blanks(Octstr *text)
Definition: octstr.c:1346
Octstr * password
Definition: dbpool.h:150
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
#define octstr_copy(ostr, from, len)
Definition: octstr.h:178
long octstr_search_char(const Octstr *ostr, int ch, long pos)
Definition: octstr.c:1012
static List * lines
Definition: mtbatch.c:88
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:283
void * gwlist_extract_first(List *list)
Definition: list.c:305
#define octstr_duplicate(ostr)
Definition: octstr.h:187
void warning(int err, const char *fmt,...)
Definition: log.c:660
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2464
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:324
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:336
long database
Definition: dbpool.h:151
Definition: dbpool.h:164
long octstr_len(const Octstr *ostr)
Definition: octstr.c:342
Definition: octstr.c:118
long port
Definition: dbpool.h:149
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:726
int octstr_str_compare(const Octstr *ostr, const char *str)
Definition: octstr.c:973
void octstr_format_append(Octstr *os, const char *fmt,...)
Definition: octstr.c:2507
#define gwlist_create()
Definition: list.h:136
Octstr * host
Definition: dbpool.h:148
int octstr_get_char(const Octstr *ostr, long pos)
Definition: octstr.c:406
#define octstr_create_from_data(data, len)
Definition: octstr.h:134
List * octstr_split(const Octstr *os, const Octstr *sep)
Definition: octstr.c:1640
Definition: list.c:102
void *(* open)(const DBConf *conf)
Definition: dbpool_p.h:73
static void reply(HTTPClient *c, List *push_headers)
long idle_timeout
Definition: dbpool.h:152
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.