Kannel: Open Source WAP and SMS gateway  $Revision: 5037 $
dbpool_redis.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2016 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * dbpool_redis.c - implement REDIS operations for generic database connection pool
59  *
60  * In the redis_[update|select]() functions no NULL values are allowed in the
61  * binds list, otherwise we may get value corruption when getting back values
62  * from redis.
63  *
64  * Toby Phipps <toby.phipps at nexmedia.com.sg>, 2011 Initial version.
65  * Stipe Tolj <stolj at kannel.org>, 2013, 2015
66  */
67 
68 #ifdef HAVE_REDIS
69 #include <hiredis.h>
70 
71 /*
72  * Define REDIS_DEBUG to get DEBUG level output of the
73  * Redis commands send to the server.
74  */
75 /* #define REDIS_DEBUG 1 */
76 
77 #define REDIS_DEFAULT_PORT 6379
78 
79 
80 static void *redis_open_conn(const DBConf *db_conf)
81 {
82  redisContext *redis = NULL;
83  RedisConf *conf = db_conf->redis; /* make compiler happy */
84  redisReply *reply = NULL;
85  Octstr *os, *line;
86  List *lines;
87  long delimiter;
88 
89  /* sanity check */
90  if (conf == NULL)
91  return NULL;
92 
93  struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
94  redis = redisConnectWithTimeout(octstr_get_cstr(conf->host), conf->port, timeout);
95  if (redis->err) {
96  error(0, "REDIS: can not connect to server!");
97  error(0, "REDIS: %s", redis->errstr);
98  goto failed;
99  }
100 
101  info(0, "REDIS: Connected to server at %s:%ld.",
102  octstr_get_cstr(conf->host), conf->port);
103 
104  if (conf->password != NULL) {
105  reply = redisCommand(redis, "AUTH %s", octstr_get_cstr(conf->password));
106  if (strncmp("OK", reply->str, 2) != 0) {
107  error(0, "REDIS: Password authentication failed!");
108  goto failed;
109  }
110  freeReplyObject(reply);
111  }
112 
113  if (conf->idle_timeout != -1) {
114  reply = redisCommand(redis, "CONFIG SET TIMEOUT %ld", conf->idle_timeout);
115  if (strncmp("OK", reply->str, 2) != 0)
116  warning(0, "REDIS: CONFIG SET TIMEOUT %ld failed - could not set timeout",
117  conf->idle_timeout);
118  else
119  info(0, "REDIS: Set idle timeout to %ld seconds", conf->idle_timeout);
120  freeReplyObject(reply);
121  }
122 
123  if (conf->database != -1) {
124  reply = redisCommand(redis,"SELECT %ld", conf->database);
125  if (strncmp("OK", reply->str, 2) != 0)
126  error(0,"REDIS: SELECT %ld failed - could not select database", conf->database);
127  else
128  info(0,"REDIS: Selected database %ld", conf->database);
129  freeReplyObject(reply);
130  }
131 
132  reply = redisCommand(redis, "INFO");
133  if (reply->type != REDIS_REPLY_STRING) {
134  error(0, "REDIS: INFO command to get version failed!");
135  goto failed;
136  }
137 
138  os = octstr_create(reply->str);
139 
140 #if defined(REDIS_DEBUG)
141  debug("dbpool.redis",0,"Received REDIS_REPLY_STRING for INFO cmd");
142  /* octstr_dump(os, 0); */
143 #endif
144 
145  lines = octstr_split(os, octstr_imm("\n"));
146  octstr_destroy(os);
147  os = NULL;
148 
149  while ((line = gwlist_extract_first(lines)) != NULL) {
150  Octstr *key, *value;
151 
152  /* comment line */
153  if (octstr_get_char(line, 0) == '#') {
154  octstr_destroy(line);
155  continue;
156  }
157  delimiter = octstr_search_char(line, ':', 0);
158  key = octstr_copy(line, 0, delimiter);
159  octstr_strip_blanks(key);
160  value = octstr_copy(line, delimiter + 1, octstr_len(line));
161  octstr_strip_blanks(value);
162  if (octstr_str_compare(key, "redis_version") == 0) {
163  os = octstr_duplicate(value);
164  octstr_destroy(key);
165  octstr_destroy(value);
166  octstr_destroy(line);
167  break;
168  }
169  octstr_destroy(key);
170  octstr_destroy(value);
171  octstr_destroy(line);
172  }
174 
175  if (os == NULL) {
176  error(0, "REDIS: Could not parse version from INFO output!");
177  goto failed;
178  }
179 
180  info(0, "REDIS: server version %s.", octstr_get_cstr(os));
181  octstr_destroy(os);
182 
183  freeReplyObject(reply);
184  return redis;
185 
186 failed:
187  if (reply != NULL)
188  freeReplyObject(reply);
189  if (redis != NULL)
190  redisFree(redis);
191 
192  return NULL;
193 }
194 
195 
196 static void redis_close_conn(void *conn)
197 {
198  if (conn == NULL)
199  return;
200 
201  redisFree((redisContext*) conn);
202 }
203 
204 
205 static int redis_check_conn(void *conn)
206 {
207  redisReply *reply;
208 
209  if (conn == NULL)
210  return -1;
211 
212  reply = redisCommand(conn, "PING");
213  if (reply != NULL) {
214  if (strcmp(reply->str,"PONG") == 0) {
215  freeReplyObject(reply);
216  return 0;
217  }
218  }
219 
220  error(0, "REDIS: server connection check failed!");
221  error(0, "REDIS: %s", ((redisContext*)conn)->errstr);
222  if (reply != NULL)
223  freeReplyObject(reply);
224  return -1;
225 }
226 
227 
228 static int redis_select(void *conn, const Octstr *sql, List *binds, List **res)
229 {
230  redisReply *reply;
231  long i, binds_len;
232  List *row;
233  Octstr *temp = NULL;
234  const char **argv;
235 
236  /* bind parameters if any */
237  binds_len = gwlist_len(binds);
238 
239  if (binds_len > 0) {
240 #if defined(REDIS_DEBUG)
241  Octstr *os = octstr_create("");;
242 #endif
243 
244  argv = gw_malloc(sizeof(*argv) * binds_len);
245  for (i = 0; i < binds_len; i++) {
246  argv[i] = (char*)octstr_get_cstr(gwlist_get(binds, i));
247 #if defined(REDIS_DEBUG)
248  octstr_format_append(os, "\"%s\" ", argv[i]);
249 #endif
250  }
251 
252 #if defined(REDIS_DEBUG)
253  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(os));
254  octstr_destroy(os);
255 #endif
256 
257  /* execute statement */
258  reply = redisCommandArgv(conn, binds_len, argv, NULL);
259 
260  gw_free(argv);
261 
262  } else {
263 
264 #if defined(REDIS_DEBUG)
265  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(sql));
266 #endif
267 
268  /* execute statement */
269  reply = redisCommand(conn, octstr_get_cstr(sql));
270  }
271 
272  /* evaluate reply */
273  switch (reply->type) {
274  case REDIS_REPLY_ERROR:
275 #if defined(REDIS_DEBUG)
276  debug("dbpool.redis",0,"Received REDIS_REPLY_ERROR");
277 #endif
278  error(0, "REDIS: redisCommand() failed: `%s'", reply->str);
279  break;
280  case REDIS_REPLY_NIL:
281 #if defined(REDIS_DEBUG)
282  debug("dbpool.redis",0,"Received REDIS_REPLY_NIL");
283 #endif
284  break;
285  case REDIS_REPLY_STATUS:
286 #if defined(REDIS_DEBUG)
287  debug("dbpool.redis",0,"Received REDIS_REPLY_STATUS");
288 #endif
289  break;
290 
291  case REDIS_REPLY_STRING:
292 #if defined(REDIS_DEBUG)
293  debug("dbpool.redis",0,"Received REDIS_REPLY_STRING");
294 #endif
295  *res = gwlist_create();
296  row = gwlist_create();
297  temp = octstr_create_from_data(reply->str, reply->len);
298  gwlist_append(row, temp);
299  gwlist_produce(*res, row);
300  freeReplyObject(reply);
301  return 0;
302  break;
303 
304  case REDIS_REPLY_INTEGER:
305 #if defined(REDIS_DEBUG)
306  debug("dbpool.redis",0,"Received REDIS_REPLY_INTEGER");
307 #endif
308  *res = gwlist_create();
309  row = gwlist_create();
310  temp = octstr_format("%ld", reply->integer);
311  gwlist_append(row, temp);
312  gwlist_produce(*res, row);
313  freeReplyObject(reply);
314  return 0;
315  break;
316 
317  case REDIS_REPLY_ARRAY:
318 #if defined(REDIS_DEBUG)
319  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY");
320 #endif
321  *res = gwlist_create();
322  row = gwlist_create();
323  for (i = 0; i < reply->elements; i++) {
324  if (reply->element[i]->type == REDIS_REPLY_NIL ||
325  reply->element[i]->str == NULL || reply->element[i]->len == 0) {
326  gwlist_produce(row, octstr_imm(""));
327  continue;
328  }
329  temp = octstr_create_from_data(reply->element[i]->str, reply->element[i]->len);
330 #if defined(REDIS_DEBUG)
331  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY[%ld]: %s", i, octstr_get_cstr(temp));
332 #endif
333  gwlist_append(row, temp);
334  }
335  gwlist_produce(*res, row);
336  freeReplyObject(reply);
337  return 0;
338  break;
339 
340  default:
341 #if defined(REDIS_DEBUG)
342  error(0,"REDIS: Received unknown Redis reply type %d", reply->type);
343 #endif
344  break;
345  }
346 
347  freeReplyObject(reply);
348 
349  return -1;
350 }
351 
352 
353 static int redis_update(void *conn, const Octstr *sql, List *binds)
354 {
355  long i, binds_len;
356  int ret;
357  redisReply *reply;
358  const char **argv;
359 
360  /* bind parameters if any */
361  binds_len = gwlist_len(binds);
362 
363  if (binds_len > 0) {
364 #if defined(REDIS_DEBUG)
365  Octstr *os = octstr_create("");;
366 #endif
367 
368  argv = gw_malloc(sizeof(*argv) * binds_len);
369  for (i = 0; i < binds_len; i++) {
370  argv[i] = (char*)octstr_get_cstr(gwlist_get(binds, i));
371 #if defined(REDIS_DEBUG)
372  octstr_format_append(os, "\"%s\" ", argv[i]);
373 #endif
374  }
375 
376 #if defined(REDIS_DEBUG)
377  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(os));
378  octstr_destroy(os);
379 #endif
380 
381  /* execute statement */
382  reply = redisCommandArgv(conn, binds_len, argv, NULL);
383 
384  gw_free(argv);
385 
386  } else {
387 
388 #if defined(REDIS_DEBUG)
389  debug("dbpool.redis",0,"redis cmd: %s", octstr_get_cstr(sql));
390 #endif
391 
392  /* execute statement */
393  reply = redisCommand(conn, octstr_get_cstr(sql));
394  }
395 
396  /* evaluate reply */
397  switch (reply->type) {
398  case REDIS_REPLY_ERROR:
399 #if defined(REDIS_DEBUG)
400  debug("dbpool.redis",0,"Received REDIS_REPLY_ERROR");
401 #endif
402  error(0, "REDIS: redisCommand() failed: `%s'", reply->str);
403  break;
404  case REDIS_REPLY_STATUS:
405  /* Some Redis commands (e.g. WATCH) return a boolean status */
406 #if defined(REDIS_DEBUG)
407  debug("dbpool.redis",0,"Received REDIS_REPLY_STATUS: %s", reply->str);
408 #endif
409  if (strcmp(reply->str, "OK") == 0) {
410  freeReplyObject(reply);
411  return 0;
412  }
413  break;
414  case REDIS_REPLY_INTEGER:
415  /* Other commands (e.g. DEL) return an integer indicating
416  * the number of keys affected */
417 #if defined(REDIS_DEBUG)
418  debug("dbpool.redis",0,"Received REDIS_REPLY_INTEGER: %qi", reply->integer);
419 #endif
420  /*
421  * Note: Redis returns a long long. Casting it to an int here could
422  * cause precision loss, however as we're returning an update status,
423  * this should only ever be used to return a count of keys
424  * deleted/updated, and this will almost invariably be 1.
425  */
426  ret = (int)reply->integer;
427  freeReplyObject(reply);
428  return ret;
429  break;
430  case REDIS_REPLY_ARRAY:
431  /* The EXEC command returns an array of replies
432  * when executed successfully */
433 #if defined(REDIS_DEBUG)
434  debug("dbpool.redis",0,"Received REDIS_REPLY_ARRAY");
435 #endif
436  freeReplyObject(reply);
437  /* For now, we only support EXEC commands with an array
438  * return and in that case, all is well */
439  return 0;
440  break;
441  case REDIS_REPLY_NIL:
442  /* Finally, the EXEC command can return a NULL
443  * if it fails (e.g. due to a WATCH triggering */
444 #if defined(REDIS_DEBUG)
445  debug("dbpool.redis",0,"Received REDIS_REPLY_NIL");
446 #endif
447  break;
448  default:
449 #if defined(REDIS_DEBUG)
450  debug("dbpool.redis",0,"Received unknown Redis reply %d", reply->type);
451 #endif
452  break;
453  }
454 
455  freeReplyObject(reply);
456 
457  return -1;
458 }
459 
460 
461 static void redis_conf_destroy(DBConf *db_conf)
462 {
463  RedisConf *conf = db_conf->redis;
464 
465  octstr_destroy(conf->host);
466  octstr_destroy(conf->password);
467 
468  gw_free(conf);
469  gw_free(db_conf);
470 }
471 
472 
473 static struct db_ops redis_ops = {
474  .open = redis_open_conn,
475  .close = redis_close_conn,
476  .check = redis_check_conn,
477  .select = redis_select,
478  .update = redis_update,
479  .conf_destroy = redis_conf_destroy
480 };
481 
482 #endif /* HAVE_REDIS */
void error(int err, const char *fmt,...)
Definition: log.c:612
static Octstr * delimiter
Definition: test_ppg.c:104
void info(int err, const char *fmt,...)
Definition: log.c:636
RedisConf * redis
Definition: dbpool.h:172
void gwlist_append(List *list, void *item)
Definition: list.c:179
void gwlist_produce(List *list, void *item)
Definition: list.c:411
long gwlist_len(List *list)
Definition: list.c:166
void * gwlist_get(List *list, long pos)
Definition: list.c:292
void octstr_strip_blanks(Octstr *text)
Definition: octstr.c:1344
Octstr * password
Definition: dbpool.h:150
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
#define octstr_copy(ostr, from, len)
Definition: octstr.h:178
long octstr_search_char(const Octstr *ostr, int ch, long pos)
Definition: octstr.c:1010
static List * lines
Definition: mtbatch.c:88
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:281
void * gwlist_extract_first(List *list)
Definition: list.c:305
#define octstr_duplicate(ostr)
Definition: octstr.h:187
void warning(int err, const char *fmt,...)
Definition: log.c:624
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2462
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:322
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:334
long database
Definition: dbpool.h:151
Definition: dbpool.h:164
long octstr_len(const Octstr *ostr)
Definition: octstr.c:340
Definition: octstr.c:118
long port
Definition: dbpool.h:149
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:690
int octstr_str_compare(const Octstr *ostr, const char *str)
Definition: octstr.c:971
void octstr_format_append(Octstr *os, const char *fmt,...)
Definition: octstr.c:2505
#define gwlist_create()
Definition: list.h:136
Octstr * host
Definition: dbpool.h:148
int octstr_get_char(const Octstr *ostr, long pos)
Definition: octstr.c:404
#define octstr_create_from_data(data, len)
Definition: octstr.h:134
List * octstr_split(const Octstr *os, const Octstr *sep)
Definition: octstr.c:1638
Definition: list.c:102
void *(* open)(const DBConf *conf)
Definition: dbpool_p.h:73
static void reply(HTTPClient *c, List *push_headers)
long idle_timeout
Definition: dbpool.h:152
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.