00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065 #include "gwlib/gwlib.h"
00066 #include "smscconn.h"
00067 #include "smscconn_p.h"
00068 #include "bb_smscconn_cb.h"
00069
00070 #include "smsc.h"
00071 #include "smsc_p.h"
00072
00073
00074 typedef struct smsc_wrapper {
00075 SMSCenter *smsc;
00076 List *outgoing_queue;
00077 List *stopped;
00078 long receiver_thread;
00079 long sender_thread;
00080 Mutex *reconnect_mutex;
00081 } SmscWrapper;
00082
00083
00084 static void smscwrapper_destroy(SmscWrapper *wrap)
00085 {
00086 if (wrap == NULL)
00087 return;
00088 gwlist_destroy(wrap->outgoing_queue, NULL);
00089 gwlist_destroy(wrap->stopped, NULL);
00090 mutex_destroy(wrap->reconnect_mutex);
00091 if (wrap->smsc != NULL)
00092 smsc_close(wrap->smsc);
00093 gw_free(wrap);
00094 }
00095
00096
00097 static int reconnect(SMSCConn *conn)
00098 {
00099 SmscWrapper *wrap = conn->data;
00100 Msg *msg;
00101 int ret;
00102 int wait = 1;
00103
00104
00105
00106
00107
00108
00109
00110
00111 if (conn->status == SMSCCONN_RECONNECTING) {
00112 mutex_lock(wrap->reconnect_mutex);
00113 mutex_unlock(wrap->reconnect_mutex);
00114 return 0;
00115 }
00116 mutex_lock(wrap->reconnect_mutex);
00117
00118 debug("bb.sms", 0, "smsc_wrapper <%s>: reconnect started",
00119 octstr_get_cstr(conn->name));
00120
00121 while((msg = gwlist_extract_first(wrap->outgoing_queue))!=NULL) {
00122 bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_TEMPORARILY, NULL);
00123 }
00124 conn->status = SMSCCONN_RECONNECTING;
00125
00126
00127 while(conn->why_killed == SMSCCONN_ALIVE) {
00128 ret = smsc_reopen(wrap->smsc);
00129 if (ret == 0) {
00130 info(0, "Re-open of %s succeeded.", octstr_get_cstr(conn->name));
00131 mutex_lock(conn->flow_mutex);
00132 conn->status = SMSCCONN_ACTIVE;
00133 conn->connect_time = time(NULL);
00134 mutex_unlock(conn->flow_mutex);
00135 bb_smscconn_connected(conn);
00136 break;
00137 }
00138 else if (ret == -2) {
00139 error(0, "Re-open of %s failed permanently",
00140 octstr_get_cstr(conn->name));
00141 mutex_lock(conn->flow_mutex);
00142 conn->status = SMSCCONN_DISCONNECTED;
00143 mutex_unlock(wrap->reconnect_mutex);
00144 mutex_unlock(conn->flow_mutex);
00145 return -1;
00146 }
00147 else {
00148 error(0, "Re-open to <%s> failed, retrying after %d minutes...",
00149 octstr_get_cstr(conn->name), wait);
00150 gwthread_sleep(wait*60.0);
00151
00152 wait = wait > 10 ? 10 : wait * 2 + 1;
00153 }
00154 }
00155 mutex_unlock(wrap->reconnect_mutex);
00156 return 0;
00157 }
00158
00159
00160 static Msg *sms_receive(SMSCConn *conn)
00161 {
00162 SmscWrapper *wrap = conn->data;
00163 int ret;
00164 Msg *newmsg = NULL;
00165
00166 if (smscenter_pending_smsmessage(wrap->smsc) == 1) {
00167
00168 ret = smscenter_receive_msg(wrap->smsc, &newmsg);
00169 if (ret == 1) {
00170
00171
00172 newmsg->sms.smsc_id = octstr_duplicate(conn->id);
00173
00174 return newmsg;
00175 } else if (ret == 0) {
00176 warning(0, "SMSC %s: Pending message returned '1', "
00177 "but nothing to receive!", octstr_get_cstr(conn->name));
00178 msg_destroy(newmsg);
00179 return NULL;
00180 } else {
00181 msg_destroy(newmsg);
00182 if (reconnect(conn) == -1)
00183 smscconn_shutdown(conn, 0);
00184 return NULL;
00185 }
00186 }
00187 return NULL;
00188 }
00189
00190
00191 static void wrapper_receiver(void *arg)
00192 {
00193 Msg *msg;
00194 SMSCConn *conn = arg;
00195 SmscWrapper *wrap = conn->data;
00196
00197 double sleep = 0.0001;
00198
00199
00200 log_thread_to(conn->log_idx);
00201
00202
00203 while(conn->why_killed == SMSCCONN_ALIVE) {
00204
00205 gwlist_consume(wrap->stopped);
00206
00207 msg = sms_receive(conn);
00208 if (msg) {
00209 debug("bb.sms", 0, "smscconn (%s): new message received",
00210 octstr_get_cstr(conn->name));
00211 sleep = 0.0001;
00212 bb_smscconn_receive(conn, msg);
00213 }
00214 else {
00215
00216
00217
00218 gwthread_sleep(sleep);
00219
00220
00221
00222
00223 sleep *= 2;
00224 if (sleep >= 2.0)
00225 sleep = 1.999999;
00226 }
00227 }
00228 conn->why_killed = SMSCCONN_KILLED_SHUTDOWN;
00229
00230
00231 }
00232
00233
00234
00235 static int sms_send(SMSCConn *conn, Msg *msg)
00236 {
00237 SmscWrapper *wrap = conn->data;
00238 int ret;
00239
00240 debug("bb.sms", 0, "smscconn_sender (%s): sending message",
00241 octstr_get_cstr(conn->name));
00242
00243 ret = smscenter_submit_msg(wrap->smsc, msg);
00244 if (ret == -1) {
00245 bb_smscconn_send_failed(conn, msg,
00246 SMSCCONN_FAILED_REJECTED, octstr_create("REJECTED"));
00247
00248 if (reconnect(conn) == -1)
00249 smscconn_shutdown(conn, 0);
00250 return -1;
00251 } else {
00252 bb_smscconn_sent(conn, msg, NULL);
00253 return 0;
00254 }
00255 }
00256
00257
00258 static void wrapper_sender(void *arg)
00259 {
00260 Msg *msg;
00261 SMSCConn *conn = arg;
00262 SmscWrapper *wrap = conn->data;
00263
00264
00265 log_thread_to(conn->log_idx);
00266
00267
00268
00269 while(conn->status != SMSCCONN_DEAD) {
00270
00271 if ((msg = gwlist_consume(wrap->outgoing_queue)) == NULL)
00272 break;
00273
00274 if (octstr_search_char(msg->sms.receiver, ' ', 0) != -1) {
00275
00276
00277
00278
00279 int i;
00280 Msg *newmsg;
00281
00282
00283
00284 List *nlist = octstr_split_words(msg->sms.receiver);
00285
00286 debug("bb.sms", 0, "Handling multi-receiver message");
00287
00288 for(i=0; i < gwlist_len(nlist); i++) {
00289
00290 newmsg = msg_duplicate(msg);
00291 octstr_destroy(newmsg->sms.receiver);
00292
00293 newmsg->sms.receiver = gwlist_get(nlist, i);
00294 sms_send(conn, newmsg);
00295 }
00296 gwlist_destroy(nlist, NULL);
00297 msg_destroy(msg);
00298 }
00299 else
00300 sms_send(conn,msg);
00301
00302 }
00303
00304
00305 debug("bb.sms", 0, "SMSCConn %s sender died, waiting for receiver",
00306 octstr_get_cstr(conn->name));
00307
00308 conn->why_killed = SMSCCONN_KILLED_SHUTDOWN;
00309
00310 if (conn->is_stopped) {
00311 gwlist_remove_producer(wrap->stopped);
00312 conn->is_stopped = 0;
00313 }
00314
00315 gwthread_wakeup(wrap->receiver_thread);
00316 gwthread_join(wrap->receiver_thread);
00317
00318
00319
00320 mutex_lock(conn->flow_mutex);
00321
00322 conn->status = SMSCCONN_DEAD;
00323
00324 while((msg = gwlist_extract_first(wrap->outgoing_queue))!=NULL) {
00325 bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_SHUTDOWN, NULL);
00326 }
00327 smscwrapper_destroy(wrap);
00328 conn->data = NULL;
00329
00330 mutex_unlock(conn->flow_mutex);
00331
00332 bb_smscconn_killed();
00333 }
00334
00335
00336
00337 static int wrapper_add_msg(SMSCConn *conn, Msg *sms)
00338 {
00339 SmscWrapper *wrap = conn->data;
00340 Msg *copy;
00341
00342 copy = msg_duplicate(sms);
00343 gwlist_produce(wrap->outgoing_queue, copy);
00344
00345 return 0;
00346 }
00347
00348
00349 static int wrapper_shutdown(SMSCConn *conn, int finish_sending)
00350 {
00351 SmscWrapper *wrap = conn->data;
00352
00353 debug("bb.sms", 0, "Shutting down SMSCConn %s, %s",
00354 octstr_get_cstr(conn->name), finish_sending ? "slow" : "instant");
00355
00356 if (finish_sending == 0) {
00357 Msg *msg;
00358 while((msg = gwlist_extract_first(wrap->outgoing_queue))!=NULL) {
00359 bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_SHUTDOWN, NULL);
00360 }
00361 }
00362 gwlist_remove_producer(wrap->outgoing_queue);
00363 gwthread_wakeup(wrap->sender_thread);
00364 gwthread_wakeup(wrap->receiver_thread);
00365 return 0;
00366 }
00367
00368 static void wrapper_stop(SMSCConn *conn)
00369 {
00370 SmscWrapper *wrap = conn->data;
00371
00372 debug("smscconn", 0, "Stopping wrapper");
00373 gwlist_add_producer(wrap->stopped);
00374
00375 }
00376
00377 static void wrapper_start(SMSCConn *conn)
00378 {
00379 SmscWrapper *wrap = conn->data;
00380
00381 debug("smscconn", 0, "Starting wrapper");
00382 gwlist_remove_producer(wrap->stopped);
00383 }
00384
00385
00386 static long wrapper_queued(SMSCConn *conn)
00387 {
00388 SmscWrapper *wrap = conn->data;
00389 long ret = gwlist_len(wrap->outgoing_queue);
00390
00391
00392
00393 conn->load = ret;
00394 return ret;
00395 }
00396
00397 int smsc_wrapper_create(SMSCConn *conn, CfgGroup *cfg)
00398 {
00399
00400
00401
00402
00403
00404
00405
00406 SmscWrapper *wrap;
00407
00408 wrap = gw_malloc(sizeof(SmscWrapper));
00409 wrap->smsc = NULL;
00410 conn->data = wrap;
00411 conn->send_msg = wrapper_add_msg;
00412
00413
00414 wrap->outgoing_queue = gwlist_create();
00415 wrap->stopped = gwlist_create();
00416 wrap->reconnect_mutex = mutex_create();
00417 gwlist_add_producer(wrap->outgoing_queue);
00418
00419 if ((wrap->smsc = smsc_open(cfg)) == NULL)
00420 goto error;
00421
00422 conn->name = octstr_create(smsc_name(wrap->smsc));
00423 conn->status = SMSCCONN_ACTIVE;
00424 conn->connect_time = time(NULL);
00425
00426 if (conn->is_stopped)
00427 gwlist_add_producer(wrap->stopped);
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438 if ((wrap->receiver_thread = gwthread_create(wrapper_receiver, conn))==-1)
00439 goto error;
00440
00441 if ((wrap->sender_thread = gwthread_create(wrapper_sender, conn))==-1)
00442 goto error;
00443
00444 conn->shutdown = wrapper_shutdown;
00445 conn->queued = wrapper_queued;
00446 conn->stop_conn = wrapper_stop;
00447 conn->start_conn = wrapper_start;
00448
00449 return 0;
00450
00451 error:
00452 error(0, "Failed to create Smsc wrapper");
00453 conn->data = NULL;
00454 smscwrapper_destroy(wrap);
00455 conn->why_killed = SMSCCONN_KILLED_CANNOT_CONNECT;
00456 conn->status = SMSCCONN_DEAD;
00457 return -1;
00458 }
00459
00460
00461
See file LICENSE for details about the license agreement for using,
modifying, copying or deriving work from this software.