00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

#include "gwlib/gwlib.h"
#include "gw/smsc/smpp_pdu.h"
00061
00062
00063
00064
00065
00066
00067
00068
00069
/* Port opened by httpd_emu_create() for the HTTP server emulator. */
00070 static long http_port = 8080;
00071
00072
00073
00074
00075
/*
 * Port and password that kill_kannel() puts into the
 * "http://localhost:<port>/shutdown?password=<password>" request.
 */
00076 static long admin_port = 13000;
00077 static char *admin_password = "bar";
00078
00079
00080
00081
00082
/* Port on which smpp_emu() creates its server socket. */
00083 static long smpp_port = 2345;
00084
00085
00086
00087
00088
00089
/*
 * Number of messages a benchmark pushes through before it stops.
 * Overridden with the -r command line option in main().
 */
00090 static long num_messages = 1;
00091
00092
00093
00094
00095
00096
/* An event queue is a gwlib List accessed through the eq_* wrappers below. */
00097 typedef List EventQueue;
00098
00099
/*
 * One entry in an EventQueue. Events are created with eq_create_event()
 * (or one of its specialisations) and released with eq_destroy_event().
 */
00100 typedef struct Event {
00101 enum event_type {
00102 got_smsc,           /* queued when a bind_receiver PDU is handled */
00103 deliver,            /* queued by smsc_emu_deliver() on undelivered_messages */
00104 deliver_ack,        /* queued when a deliver_sm_resp PDU is handled */
00105 http_request,       /* queued by httpd_emu() for each accepted request */
00106 http_response,      /* not produced anywhere in this file */
00107 submit,             /* queued when a submit_sm PDU is handled */
00108 got_enquire_link    /* queued when an enquire_link PDU is handled */
00109 } type;
00110 long id;            /* value taken from event_id_counter at creation */
00111 long time;          /* date_universal_now() at creation; the SMPP writer
                             overwrites it just before sending a deliver */
00112
      /* Set by eq_create_submit(); otherwise NULL and -1. */
00113 Connection *conn;
00114 long sequence_number;
00115
00116
      /* client is set by eq_create_http_request(); body is a private copy
         owned by the event (submit and http_request events), else NULL. */
00117 HTTPClient *client;
00118 Octstr *body;
00119 } Event;
00120
00121
/* Source of Event.id values; created in eq_init(), destroyed in eq_shutdown(). */
00122 static Counter *event_id_counter = NULL;
00123
00124
00125 static const char *eq_type(Event *e)
00126 {
00127 #define TYPE(name) case name: return #name;
00128 switch (e->type) {
00129 TYPE(got_smsc)
00130 TYPE(deliver)
00131 TYPE(deliver_ack)
00132 TYPE(http_request)
00133 TYPE(http_response)
00134 TYPE(submit)
00135 TYPE(got_enquire_link)
00136 }
00137 #undef TYPE
00138 return "unknown";
00139 }
00140
00141
00142 static Event *eq_create_event(enum event_type type)
00143 {
00144 Event *e;
00145
00146 e = gw_malloc(sizeof(*e));
00147 e->type = type;
00148 e->time = date_universal_now();
00149 e->id = counter_increase(event_id_counter);
00150 e->conn = NULL;
00151 e->sequence_number = -1;
00152 e->client = NULL;
00153 e->body = NULL;
00154 return e;
00155 }
00156
00157
00158 static Event *eq_create_submit(Connection *conn, long sequence_number,
00159 Octstr *body)
00160 {
00161 Event *e;
00162
00163 gw_assert(conn != NULL);
00164 gw_assert(sequence_number >= 0);
00165
00166 e = eq_create_event(submit);
00167 e->conn = conn;
00168 e->sequence_number = sequence_number;
00169 e->body = octstr_duplicate(body);
00170 return e;
00171 }
00172
00173
00174 static Event *eq_create_http_request(HTTPClient *client, Octstr *body)
00175 {
00176 Event *e;
00177
00178 gw_assert(client != NULL);
00179 gw_assert(body != NULL);
00180
00181 e = eq_create_event(http_request);
00182 e->client = client;
00183 e->body = octstr_duplicate(body);
00184 return e;
00185 }
00186
00187
00188 static void eq_destroy_event(Event *e)
00189 {
00190 octstr_destroy(e->body);
00191 gw_free(e);
00192 }
00193
00194
00195 static EventQueue *eq_create(void)
00196 {
00197 return gwlist_create();
00198 }
00199
00200
00201 static void eq_add_producer(EventQueue *eq)
00202 {
00203 gwlist_add_producer(eq);
00204 }
00205
00206
00207 static void eq_remove_producer(EventQueue *eq)
00208 {
00209 gwlist_remove_producer(eq);
00210 }
00211
00212
00213 static void eq_destroy(EventQueue *eq)
00214 {
00215 gwlist_destroy(eq, NULL);
00216 }
00217
00218
00219 static void eq_append(EventQueue *eq, Event *e)
00220 {
00221 gwlist_produce(eq, e);
00222 }
00223
00224
00225 static Event *eq_extract(EventQueue *eq)
00226 {
00227 return gwlist_consume(eq);
00228 }
00229
00230
00231 static void eq_log(Event *e)
00232 {
00233 info(0, "Event %ld, type %s, time %ld", e->id, eq_type(e), e->time);
00234 }
00235
00236
00237 static void eq_init(void)
00238 {
00239 event_id_counter = counter_create();
00240 }
00241
00242
00243 static void eq_shutdown(void)
00244 {
00245 counter_destroy(event_id_counter);
00246 }
00247
00248
00249 static long eq_round_trip_time(Event *e)
00250 {
00251 long now, then;
00252
00253 now = date_universal_now();
00254 if (octstr_parse_long(&then, e->body, 0, 10) == -1)
00255 return 0;
00256 return now - then;
00257 }
00258
00259
00260
00261
00262
00263
00264
/*
 * Start-up argument handed from smsc_emu_create() to the smpp_emu() thread.
 */
00265 struct smsc_emu_arg {
00266 Semaphore *sema;    /* raised by the thread once it has registered as a producer */
00267 EventQueue *eq;     /* queue that receives the emulator's events */
00268 };
00269
00270
/*
 * Queue of `deliver' events: filled by smsc_emu_deliver(), drained by the
 * smpp_emu_writer() threads.
 */
00271 static EventQueue *undelivered_messages = NULL;
00272
00273
00274
00275
00276
00277
00278
/* Maximum number of SMPP client connections smpp_emu() serves at once. */
00279 enum { MAX_THREADS = 2 };
/* Initial count of each connection's ok_to_send semaphore. */
00280 enum { SMPP_MAX_QUEUE = 10 };
00281
00282
/*
 * Per-connection state shared by smpp_emu(), the connection's reader
 * thread and (once started) its writer thread.
 */
00283 struct smpp_emu_arg {
00284 EventQueue *eq;         /* queue that receives events for this connection */
00285 Connection *conn;       /* the wrapped client socket */
00286 long id;                /* thread id of the smpp_emu_reader() thread */
00287 Semaphore *ok_to_send;  /* taken before each deliver_sm, raised on deliver_sm_resp */
00288 long writer_id;         /* thread id of smpp_emu_writer(), or -1 if not started */
00289 int quit;               /* set to 1 by smpp_emu() to make the reader stop */
00290 };
00291
00292
/*
 * Counter whose values smpp_emu_writer() passes to smpp_pdu_create() for
 * each deliver_sm (the argument position used for sequence numbers
 * elsewhere in this file). Created in smsc_emu_init().
 */
00293 static Counter *smpp_emu_counter = NULL;
00294
00295
00296 static void smpp_emu_writer(void *arg)
00297 {
00298 Event *e;
00299 SMPP_PDU *pdu;
00300 Octstr *os;
00301 struct smpp_emu_arg *p;
00302
00303 p = arg;
00304 for (;;) {
00305 semaphore_down(p->ok_to_send);
00306 e = eq_extract(undelivered_messages);
00307 if (e == NULL)
00308 break;
00309 e->time = date_universal_now();
00310 eq_log(e);
00311 pdu = smpp_pdu_create(deliver_sm,
00312 counter_increase(smpp_emu_counter));
00313 pdu->u.deliver_sm.source_addr = octstr_create("123");
00314 pdu->u.deliver_sm.destination_addr = octstr_create("456");
00315 pdu->u.deliver_sm.short_message = octstr_format("%ld", e->time);
00316 os = smpp_pdu_pack(pdu);
00317 conn_write(p->conn, os);
00318 octstr_destroy(os);
00319 smpp_pdu_destroy(pdu);
00320 eq_destroy_event(e);
00321 }
00322 }
00323
00324
00325 static void smpp_emu_handle_pdu(struct smpp_emu_arg *p, SMPP_PDU *pdu)
00326 {
00327 SMPP_PDU *resp;
00328 Octstr *os;
00329
00330 resp = NULL;
00331 switch (pdu->type) {
00332 case bind_transmitter:
00333 resp = smpp_pdu_create(bind_transmitter_resp,
00334 pdu->u.bind_transmitter.sequence_number);
00335 break;
00336
00337 case bind_receiver:
00338 resp = smpp_pdu_create(bind_receiver_resp,
00339 pdu->u.bind_receiver.sequence_number);
00340 eq_append(p->eq, eq_create_event(got_smsc));
00341 gw_assert(p->writer_id == -1);
00342 p->writer_id = gwthread_create(smpp_emu_writer, p);
00343 if (p->writer_id == -1)
00344 panic(0, "Couldn't create SMPP helper thread.");
00345 break;
00346
00347 case submit_sm:
00348 eq_append(p->eq,
00349 eq_create_submit(p->conn, pdu->u.submit_sm.sequence_number,
00350 pdu->u.submit_sm.short_message));
00351 break;
00352
00353 case deliver_sm_resp:
00354 eq_append(p->eq, eq_create_event(deliver_ack));
00355 semaphore_up(p->ok_to_send);
00356 break;
00357
00358 case enquire_link:
00359 eq_append(p->eq, eq_create_event(got_enquire_link));
00360 resp = smpp_pdu_create(enquire_link_resp,
00361 pdu->u.enquire_link.sequence_number);
00362 break;
00363
00364 case unbind:
00365 resp = smpp_pdu_create(unbind_resp,
00366 pdu->u.unbind.sequence_number);
00367 break;
00368
00369 default:
00370 error(0, "SMPP: Unhandled PDU type %s", pdu->type_name);
00371 break;
00372 }
00373
00374 if (resp != NULL) {
00375 os = smpp_pdu_pack(resp);
00376 conn_write(p->conn, os);
00377 octstr_destroy(os);
00378 smpp_pdu_destroy(resp);
00379 }
00380 }
00381
00382
00383 static void smpp_emu_reader(void *arg)
00384 {
00385 Octstr *os;
00386 long len;
00387 SMPP_PDU *pdu;
00388 struct smpp_emu_arg *p;
00389
00390 p = arg;
00391
00392 len = 0;
00393 while (!p->quit && conn_wait(p->conn, -1.0) != -1) {
00394 for (;;) {
00395 if (len == 0) {
00396 len = smpp_pdu_read_len(p->conn);
00397 if (len == -1) {
00398 error(0, "Client sent garbage, closing connection.");
00399 goto error;
00400 } else if (len == 0) {
00401 if (conn_eof(p->conn) || conn_error(p->conn))
00402 goto error;
00403 break;
00404 }
00405 }
00406
00407 gw_assert(len > 0);
00408 os = smpp_pdu_read_data(p->conn, len);
00409 if (os != NULL) {
00410 len = 0;
00411 pdu = smpp_pdu_unpack(os);
00412 if (pdu == NULL) {
00413 error(0, "PDU unpacking failed!");
00414 octstr_dump(os, 0);
00415 } else {
00416 smpp_emu_handle_pdu(p, pdu);
00417 smpp_pdu_destroy(pdu);
00418 }
00419 octstr_destroy(os);
00420 } else if (conn_eof(p->conn) || conn_error(p->conn))
00421 goto error;
00422 else
00423 break;
00424 }
00425 }
00426
00427 error:
00428 if (p->writer_id != -1)
00429 gwthread_join(p->writer_id);
00430 }
00431
00432
00433 static void smpp_emu(void *arg)
00434 {
00435 EventQueue *eq;
00436 struct smsc_emu_arg *p;
00437 int fd;
00438 int new_fd;
00439 Octstr *client_addr;
00440 long i;
00441 long num_threads;
00442 struct smpp_emu_arg *thread[MAX_THREADS];
00443
00444 p = arg;
00445 eq = p->eq;
00446 eq_add_producer(eq);
00447 semaphore_up(p->sema);
00448
00449
00450
00451
00452 fd = make_server_socket(smpp_port, NULL);
00453 if (fd == -1)
00454 panic(0, "Couldn't create SMPP listen port.");
00455
00456 num_threads = 0;
00457 for (;;) {
00458 new_fd = gw_accept(fd, &client_addr);
00459 if (new_fd == -1)
00460 break;
00461 octstr_destroy(client_addr);
00462 if (num_threads == MAX_THREADS) {
00463 warning(0, "Too many SMPP client connections.");
00464 (void) close(new_fd);
00465 } else {
00466 thread[num_threads] = gw_malloc(sizeof(*thread[0]));
00467 thread[num_threads]->conn = conn_wrap_fd(new_fd, 0);
00468 thread[num_threads]->eq = eq;
00469 thread[num_threads]->quit = 0;
00470 thread[num_threads]->writer_id = -1;
00471 thread[num_threads]->ok_to_send =
00472 semaphore_create(SMPP_MAX_QUEUE);
00473 thread[num_threads]->id =
00474 gwthread_create(smpp_emu_reader, thread[num_threads]);
00475 if (thread[num_threads]->id == -1)
00476 panic(0, "Couldn't start SMPP subthread.");
00477 ++num_threads;
00478 }
00479 }
00480
00481 for (i = 0; i < num_threads; ++i) {
00482 thread[i]->quit = 1;
00483 gwthread_wakeup(thread[i]->id);
00484 gwthread_join(thread[i]->id);
00485 conn_destroy(thread[i]->conn);
00486 semaphore_destroy(thread[i]->ok_to_send);
00487 gw_free(thread[i]);
00488 }
00489
00490 eq_remove_producer(eq);
00491 }
00492
00493
00494
00495
00496
00497
00498
/* Thread id of the smpp_emu() thread, or -1 before it has been started. */
00499 static long smpp_emu_id = -1;
00500
00501
00502
00503
00504
00505 static void smsc_emu_create(EventQueue *eq)
00506 {
00507 struct smsc_emu_arg *arg;
00508
00509 gw_assert(smpp_emu_id == -1);
00510
00511 arg = gw_malloc(sizeof(*arg));
00512 arg->sema = semaphore_create(0);
00513 arg->eq = eq;
00514 smpp_emu_id = gwthread_create(smpp_emu, arg);
00515 if (smpp_emu_id == -1)
00516 panic(0, "Couldn't start SMPP emulator thread.");
00517 semaphore_down(arg->sema);
00518 semaphore_destroy(arg->sema);
00519 gw_free(arg);
00520 }
00521
00522
00523 static void smsc_emu_destroy(void)
00524 {
00525 eq_remove_producer(undelivered_messages);
00526 gw_assert(smpp_emu_id != -1);
00527 gwthread_wakeup(smpp_emu_id);
00528 gwthread_join(smpp_emu_id);
00529 }
00530
00531
00532 static void smsc_emu_deliver(void)
00533 {
00534 eq_append(undelivered_messages, eq_create_event(deliver));
00535 }
00536
00537
00538 static void smsc_emu_submit_ack(Event *e)
00539 {
00540 SMPP_PDU *resp;
00541 Octstr *os;
00542
00543 resp = smpp_pdu_create(submit_sm_resp, e->sequence_number);
00544 os = smpp_pdu_pack(resp);
00545 conn_write(e->conn, os);
00546 octstr_destroy(os);
00547 smpp_pdu_destroy(resp);
00548 }
00549
00550
00551 static void smsc_emu_init(void)
00552 {
00553 smpp_emu_counter = counter_create();
00554 undelivered_messages = eq_create();
00555 eq_add_producer(undelivered_messages);
00556 }
00557
00558
00559 static void smsc_emu_shutdown(void)
00560 {
00561 counter_destroy(smpp_emu_counter);
00562 eq_destroy(undelivered_messages);
00563 }
00564
00565
00566
00567
00568
00569
00570
/*
 * Headers sent with every reply from httpd_emu_reply(); built in
 * httpd_emu_init() with a single "Content-Type: text/plain" entry.
 */
00571 static List *httpd_emu_headers = NULL;
00572
00573
/*
 * Start-up argument handed from httpd_emu_create() to the httpd_emu()
 * thread. The thread frees it when it exits.
 */
00574 struct httpd_emu_arg {
00575 int port;           /* port to accept HTTP requests on */
00576 Semaphore *sema;    /* raised by the thread once it has registered as a producer */
00577 EventQueue *eq;     /* queue that receives http_request events */
00578 };
00579
00580
00581
00582
00583
00584 static void httpd_emu(void *arg)
00585 {
00586 HTTPClient *client;
00587 Octstr *ip;
00588 Octstr *url;
00589 List *headers;
00590 Octstr *body;
00591 List *cgivars;
00592 struct httpd_emu_arg *p;
00593 EventQueue *eq;
00594
00595 p = arg;
00596 eq = p->eq;
00597 eq_add_producer(eq);
00598 semaphore_up(p->sema);
00599
00600 for (;;) {
00601 client = http_accept_request(p->port, &ip, &url, &headers, &body,
00602 &cgivars);
00603 if (client == NULL)
00604 break;
00605
00606 eq_append(eq, eq_create_http_request(client,
00607 http_cgi_variable(cgivars, "arg")));
00608 octstr_destroy(ip);
00609 octstr_destroy(url);
00610 http_destroy_headers(headers);
00611 octstr_destroy(body);
00612 http_destroy_cgiargs(cgivars);
00613 }
00614 eq_remove_producer(eq);
00615 gw_free(p);
00616 }
00617
00618
00619
00620
00621
00622
/* Thread id of the httpd_emu() thread, or -1 while it is not running. */
00623 static long httpd_emu_tid = -1;
00624
00625
00626
00627
00628
00629
00630 static void httpd_emu_create(EventQueue *eq)
00631 {
00632 struct httpd_emu_arg *arg;
00633 int ssl = 0;
00634
00635 if (http_open_port(http_port, ssl) == -1)
00636 panic(0, "Can't open HTTP server emulator port %ld.", http_port);
00637
00638 gw_assert(httpd_emu_tid == -1);
00639 arg = gw_malloc(sizeof(*arg));
00640 arg->port = http_port;
00641 arg->sema = semaphore_create(0);
00642 arg->eq = eq;
00643 httpd_emu_tid = gwthread_create(httpd_emu, arg);
00644 if (httpd_emu_tid == -1)
00645 panic(0, "Can't start the HTTP server emulator thread.");
00646 semaphore_down(arg->sema);
00647 semaphore_destroy(arg->sema);
00648 }
00649
00650
00651
00652
00653
00654
00655 static void httpd_emu_destroy(void)
00656 {
00657 gw_assert(httpd_emu_tid != -1);
00658 http_close_all_ports();
00659 gwthread_join(httpd_emu_tid);
00660 httpd_emu_tid = -1;
00661 }
00662
00663
00664
00665
00666
00667 static void httpd_emu_reply(Event *e)
00668 {
00669 http_send_reply(e->client, HTTP_OK, httpd_emu_headers, e->body);
00670 }
00671
00672
00673 static void httpd_emu_init(void)
00674 {
00675 httpd_emu_headers = http_create_empty_headers();
00676 http_header_add(httpd_emu_headers, "Content-Type", "text/plain");
00677 }
00678
00679
00680 static void httpd_emu_shutdown(void)
00681 {
00682 http_destroy_headers(httpd_emu_headers);
00683 }
00684
00685
00686
00687
00688
00689
00690
00691 static void kill_kannel(void)
00692 {
00693 Octstr *url;
00694 Octstr *final_url;
00695 List *req_headers;
00696 List *reply_headers;
00697 Octstr *reply_body;
00698 int ret;
00699
00700 url = octstr_format("http://localhost:%ld/shutdown?password=%s",
00701 admin_port, admin_password);
00702 req_headers = http_create_empty_headers();
00703 http_header_add(req_headers, "Content-Type", "text/plain");
00704 ret = http_get_real(HTTP_METHOD_GET, url, req_headers, &final_url,
00705 &reply_headers, &reply_body);
00706 if (ret != -1) {
00707 octstr_destroy(final_url);
00708 http_destroy_headers(reply_headers);
00709 octstr_destroy(reply_body);
00710 }
00711 octstr_destroy(url);
00712 http_destroy_headers(req_headers);
00713 }
00714
00715
00716
00717
00718
00719
/* Limits used by sustained_level_benchmark(). */
/* Number of most recent round trip times kept for the running average. */
00720 enum { MAX_IN_AVERAGE = 100 };
/* Average round trip time at which sending is throttled (same unit as
 * date_universal_now(), presumably seconds -- confirm). */
00721 enum { MAX_RTT = 1 };
/* Maximum number of delivered messages that may be awaiting their submit. */
00722 enum { MAX_WAITING = 100 };
00723
00724 static void sustained_level_benchmark(void)
00725 {
00726 EventQueue *eq;
00727 Event *e;
00728 long i;
00729 long num_deliver;
00730 long num_submit;
00731 long rtt;
00732 long times[MAX_IN_AVERAGE];
00733 long next_time;
00734 double time_sum;
00735 long num_unanswered;
00736
00737 eq = eq_create();
00738
00739 httpd_emu_create(eq);
00740 smsc_emu_create(eq);
00741
00742
00743 while ((e = eq_extract(eq)) != NULL && e->type != got_smsc)
00744 debug("test_smsc", 0, "Discarding event of type %s", eq_type(e));
00745 debug("test_smsc", 0, "Got event got_smsc.");
00746 eq_destroy_event(e);
00747
00748
00749
00750
00751
00752 num_submit = 0;
00753 for (i = 0; i < MAX_IN_AVERAGE; ++i)
00754 times[i] = 0;
00755 next_time = 0;
00756 time_sum = 0.0;
00757 num_unanswered = 0;
00758 num_deliver = 0;
00759
00760 while (num_submit < num_messages) {
00761 for (;;) {
00762 if (num_deliver >= num_messages || num_unanswered >= MAX_WAITING)
00763 break;
00764 if (time_sum / MAX_IN_AVERAGE >= MAX_RTT && num_unanswered > 0)
00765 break;
00766 smsc_emu_deliver();
00767 ++num_unanswered;
00768 ++num_deliver;
00769 }
00770
00771 e = eq_extract(eq);
00772 if (e == NULL)
00773 break;
00774 eq_log(e);
00775
00776 switch (e->type) {
00777 case deliver_ack:
00778 break;
00779
00780 case http_request:
00781 httpd_emu_reply(e);
00782 break;
00783
00784 case submit:
00785 rtt = eq_round_trip_time(e);
00786 time_sum -= times[next_time];
00787 times[next_time] = rtt;
00788 time_sum += times[next_time];
00789 debug("", 0, "RTT = %ld", rtt);
00790 next_time = (next_time + 1) % MAX_IN_AVERAGE;
00791 ++num_submit;
00792 --num_unanswered;
00793 smsc_emu_submit_ack(e);
00794 break;
00795
00796 case got_enquire_link:
00797 break;
00798
00799 default:
00800 debug("test_smsc", 0, "Ignoring event of type %s", eq_type(e));
00801 break;
00802 }
00803
00804 eq_destroy_event(e);
00805 }
00806
00807 kill_kannel();
00808
00809 debug("test_smsc", 0, "Terminating benchmark.");
00810 smsc_emu_destroy();
00811 httpd_emu_destroy();
00812 eq_destroy(eq);
00813 }
00814
00815
00816
00817
00818
00819
/*
 * Used by n_messages_benchmark(): maximum number of delivered messages
 * whose submit has not been seen yet.
 */
00820 enum { MAX_IN_QUEUE = 1000 };
00821
00822 static void n_messages_benchmark(void)
00823 {
00824 EventQueue *eq;
00825 Event *e;
00826 long i;
00827 long num_submit;
00828 long num_in_queue;
00829 long num_deliver;
00830
00831 eq = eq_create();
00832
00833 httpd_emu_create(eq);
00834 smsc_emu_create(eq);
00835
00836
00837 while ((e = eq_extract(eq)) != NULL && e->type != got_smsc)
00838 debug("test_smsc", 0, "Discarding event of type %s", eq_type(e));
00839 debug("test_smsc", 0, "Got event got_smsc.");
00840 eq_destroy_event(e);
00841
00842
00843 for (i = 0; i < num_messages && i < MAX_IN_QUEUE; ++i)
00844 smsc_emu_deliver();
00845 num_in_queue = i;
00846 num_deliver = i;
00847
00848
00849
00850
00851
00852 num_submit = 0;
00853 while (num_submit < num_messages && (e = eq_extract(eq)) != NULL) {
00854 while (num_deliver < num_messages && num_in_queue < MAX_IN_QUEUE) {
00855 smsc_emu_deliver();
00856 ++num_in_queue;
00857 ++num_deliver;
00858 }
00859
00860 eq_log(e);
00861
00862 switch (e->type) {
00863 case deliver_ack:
00864 break;
00865
00866 case http_request:
00867 httpd_emu_reply(e);
00868 break;
00869
00870 case submit:
00871 debug("", 0, "RTT = %ld", eq_round_trip_time(e));
00872 smsc_emu_submit_ack(e);
00873 ++num_submit;
00874 --num_in_queue;
00875 break;
00876
00877 case got_enquire_link:
00878 break;
00879
00880 default:
00881 debug("test_smsc", 0, "Ignoring event of type %s", eq_type(e));
00882 break;
00883 }
00884
00885 eq_destroy_event(e);
00886 }
00887
00888 kill_kannel();
00889
00890 debug("test_smsc", 0, "Terminating benchmark.");
00891 smsc_emu_destroy();
00892 httpd_emu_destroy();
00893 eq_destroy(eq);
00894 }
00895
00896
00897
00898
00899
00900
00901
00902 int main(int argc, char **argv)
00903 {
00904 int opt;
00905 char *main_name;
00906 int i;
00907 static struct {
00908 char *name;
00909 void (*func)(void);
00910 } tab[] = {
00911 { "n_messages", n_messages_benchmark },
00912 { "sustained_level", sustained_level_benchmark },
00913 };
00914
00915 gwlib_init();
00916 eq_init();
00917 httpd_emu_init();
00918 smsc_emu_init();
00919
00920 main_name = "n_messages_benchmark";
00921
00922 while ((opt = getopt(argc, argv, "m:r:")) != EOF) {
00923 switch (opt) {
00924 case 'm':
00925 main_name = optarg;
00926 break;
00927 case 'r':
00928 num_messages = atoi(optarg);
00929 break;
00930 }
00931 }
00932
00933 for (i = 0; (size_t) i < sizeof(tab) / sizeof(tab[0]); ++i) {
00934 if (strcmp(main_name, tab[i].name) == 0) {
00935 tab[i].func();
00936 break;
00937 }
00938 }
00939
00940 smsc_emu_shutdown();
00941 httpd_emu_shutdown();
00942 eq_shutdown();
00943 gwlib_shutdown();
00944 return 0;
00945 }
See file LICENSE for details about the license agreement for using,
modifying, copying or deriving work from this software.