#include "gw-config.h"
#include <pthread.h>
#include "thread.h"
#include "gwmem.h"
#include "gwassert.h"
#include "gwthread.h"
#include "gw-prioqueue.h"

Include dependency graph for gw-prioqueue.c:

Go to the source code of this file.
|
||||||||||||||||
|
Definition at line 116 of file gw-prioqueue.c. References element::item, and element::seq. Referenced by downheap(), and upheap(). 00117 {
00118 int rc;
00119
00120 rc = cmp(a->item, b->item);
00121 if (rc == 0) {
00122 /* check sequence to guarantee order */
00123 if (a->seq < b->seq)
00124 rc = 1;
00125 else if (a->seq > b->seq)
00126 rc = -1;
00127 }
00128
00129 return rc;
00130 }
|
|
||||||||||||
|
Heapify down. Parameters: queue - our prioqueue; index - start index. Definition at line 154 of file gw-prioqueue.c. References gw_prioqueue::cmp, compare(), gw_prioqueue_t, gw_prioqueue::len, and gw_prioqueue::tab. Referenced by gw_prioqueue_consume(), and gw_prioqueue_remove(). 00155 {
00156 struct element *v = queue->tab[index];
00157 register long j;
00158
00159 while (index <= queue->len / 2) {
00160 j = 2 * index;
00161 /* take the biggest child item */
00162 if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
00163 j++;
00164 /* break if our item bigger */
00165 if (compare(v, queue->tab[j], queue->cmp) >= 0)
00166 break;
00167 queue->tab[index] = queue->tab[j];
00168 index = j;
00169 }
00170 queue->tab[index] = v;
00171 }
|
Here is the call graph for this function:

|
|
Add a producer to the priority queue. Parameters: queue - priority queue. Definition at line 329 of file gw-prioqueue.c. References gw_assert, gw_prioqueue_t, gw_prioqueue::producers, queue_lock(), and queue_unlock(). 00330 {
00331 gw_assert(queue != NULL);
00332
00333 queue_lock(queue);
00334 queue->producers++;
00335 queue_unlock(queue);
00336 }
|
Here is the call graph for this function:

|
|
Remove the biggest item from the priority queue, blocking while the queue is empty and producers are still registered. Parameters: queue - priority queue.
Definition at line 303 of file gw-prioqueue.c. References downheap(), gw_assert, gw_prioqueue_t, gwthread_self(), element::item, gw_prioqueue::len, gw_prioqueue::mutex, Mutex::mutex, gw_prioqueue::nonempty, Mutex::owner, gw_prioqueue::producers, queue_lock(), queue_unlock(), and gw_prioqueue::tab. 00304 {
00305 void *ret;
00306
00307 gw_assert(queue != NULL);
00308
00309 queue_lock(queue);
00310 while (queue->len == 1 && queue->producers > 0) {
00311 queue->mutex->owner = -1;
00312 pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
00313 queue->mutex->owner = gwthread_self();
00314 }
00315 if (queue->len > 1) {
00316 ret = queue->tab[1]->item;
00317 gw_free(queue->tab[1]);
00318 queue->tab[1] = queue->tab[--queue->len];
00319 downheap(queue, 1);
00320 } else {
00321 ret = NULL;
00322 }
00323 queue_unlock(queue);
00324
00325 return ret;
00326 }
|
Here is the call graph for this function:

|
|
Create a priority queue. Parameters: cmp - compare function.
Definition at line 174 of file gw-prioqueue.c. References gw_assert, gw_prioqueue_t, make_bigger(), gw_prioqueue::mutex, mutex_create, and gw_prioqueue::producers. Referenced by main(), smsc_at2_create(), and smsc_emi2_create(). 00175 {
00176 gw_prioqueue_t *ret;
00177
00178 gw_assert(cmp != NULL);
00179
00180 ret = gw_malloc(sizeof(*ret));
00181 ret->producers = 0;
00182 pthread_cond_init(&ret->nonempty, NULL);
00183 ret->mutex = mutex_create();
00184 ret->tab = NULL;
00185 ret->size = 0;
00186 ret->len = 0;
00187 ret->seq = 0;
00188 ret->cmp = cmp;
00189
00190 /* put NULL item at pos 0 that is our stop marker */
00191 make_bigger(ret, 1);
00192 ret->tab[0] = gw_malloc(sizeof(**ret->tab));
00193 ret->tab[0]->item = NULL;
00194 ret->tab[0]->seq = ret->seq++;
00195 ret->len++;
00196
00197 return ret;
00198 }
|
Here is the call graph for this function:

|
||||||||||||
|
Destroy a priority queue. Parameters: queue - queue to destroy; item_destroy - item destructor. Definition at line 201 of file gw-prioqueue.c. References gw_prioqueue_t, element::item, item_destroy(), gw_prioqueue::len, gw_prioqueue::mutex, mutex_destroy(), gw_prioqueue::nonempty, and gw_prioqueue::tab. Referenced by at2_device_thread(), emi2_sender(), smsc_at2_create(), and smsc_emi2_create(). 00202 {
00203 long i;
00204
00205 if (queue == NULL)
00206 return;
00207
00208 for (i = 0; i < queue->len; i++) {
00209 if (item_destroy != NULL && queue->tab[i]->item != NULL)
00210 item_destroy(queue->tab[i]->item);
00211 gw_free(queue->tab[i]);
00212 }
00213 mutex_destroy(queue->mutex);
00214 pthread_cond_destroy(&queue->nonempty);
00215 gw_free(queue->tab);
00216 gw_free(queue);
00217 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 252 of file gw-prioqueue.c. References gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab. Referenced by main(). 00253 {
00254 register long i;
00255
00256 gw_assert(queue != NULL && fn != NULL);
00257
00258 queue_lock(queue);
00259 for (i = 1; i < queue->len; i++)
00260 fn(queue->tab[i]->item, i - 1);
00261 queue_unlock(queue);
00262 }
|
Here is the call graph for this function:

|
|
Definition at line 286 of file gw-prioqueue.c. References gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab. 00287 {
00288 void *ret;
00289
00290 gw_assert(queue != NULL);
00291
00292 queue_lock(queue);
00293 if (queue->len > 1)
00294 ret = queue->tab[1]->item;
00295 else
00296 ret = NULL;
00297 queue_unlock(queue);
00298
00299 return ret;
00300 }
|
Here is the call graph for this function:

|
||||||||||||
|
Insert an item into the priority queue. Parameters: queue - priority queue; item - the item to be inserted. Definition at line 235 of file gw-prioqueue.c. References gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, make_bigger(), gw_prioqueue::nonempty, queue_lock(), queue_unlock(), element::seq, gw_prioqueue::seq, gw_prioqueue::tab, and upheap(). Referenced by main(). 00236 {
00237 gw_assert(queue != NULL);
00238 gw_assert(item != NULL);
00239
00240 queue_lock(queue);
00241 make_bigger(queue, 1);
00242 queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
00243 queue->tab[queue->len]->item = item;
00244 queue->tab[queue->len]->seq = queue->seq++;
00245 upheap(queue, queue->len);
00246 queue->len++;
00247 pthread_cond_signal(&queue->nonempty);
00248 queue_unlock(queue);
00249 }
|
Here is the call graph for this function:

|
|
Return the priority queue length. Parameters: queue - priority queue.
Definition at line 220 of file gw-prioqueue.c. References gw_prioqueue_t, gw_prioqueue::len, queue_lock(), and queue_unlock(). Referenced by at2_device_thread(), at2_queued_cb(), at2_send_messages(), emi2_wait(), main(), and queued_cb(). 00221 {
00222 long len;
00223
00224 if (queue == NULL)
00225 return 0;
00226
00227 queue_lock(queue);
00228 len = queue->len - 1;
00229 queue_unlock(queue);
00230
00231 return len;
00232 }
|
Here is the call graph for this function:

|
|
Return the producer count for the priority queue. Parameters: queue - priority queue.
Definition at line 351 of file gw-prioqueue.c. References gw_assert, gw_prioqueue_t, gw_prioqueue::producers, queue_lock(), and queue_unlock(). 00352 {
00353 long ret;
00354
00355 gw_assert(queue != NULL);
00356
00357 queue_lock(queue);
00358 ret = queue->producers;
00359 queue_unlock(queue);
00360
00361 return ret;
00362 }
|
Here is the call graph for this function:

|
|
Remove the biggest item from the priority queue without blocking, even when the queue is empty and producers are still registered. Parameters: queue - priority queue.
Definition at line 265 of file gw-prioqueue.c. References downheap(), gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab. Referenced by at2_send_messages(), at2_shutdown_cb(), emi2_do_send(), emi2_sender(), main(), open_send_connection(), and shutdown_cb(). 00266 {
00267 void *ret;
00268
00269 gw_assert(queue != NULL);
00270
00271 queue_lock(queue);
00272 if (queue->len <= 1) {
00273 queue_unlock(queue);
00274 return NULL;
00275 }
00276 ret = queue->tab[1]->item;
00277 gw_free(queue->tab[1]);
00278 queue->tab[1] = queue->tab[--queue->len];
00279 downheap(queue, 1);
00280 queue_unlock(queue);
00281
00282 return ret;
00283 }
|
Here is the call graph for this function:

|
|
Remove a producer from the priority queue. Parameters: queue - priority queue. Definition at line 339 of file gw-prioqueue.c. References gw_assert, gw_prioqueue_t, gw_prioqueue::nonempty, gw_prioqueue::producers, queue_lock(), and queue_unlock(). 00340 {
00341 gw_assert(queue != NULL);
00342
00343 queue_lock(queue);
00344 gw_assert(queue->producers > 0);
00345 queue->producers--;
00346 pthread_cond_broadcast(&queue->nonempty);
00347 queue_unlock(queue);
00348 }
|
Here is the call graph for this function:

|
||||||||||||
|
Definition at line 103 of file gw-prioqueue.c. References gw_prioqueue_t, gw_prioqueue::len, gw_prioqueue::size, size, and gw_prioqueue::tab. 00104 {
00105 size_t size = queue->size;
00106 size_t new_size = sizeof(*queue->tab) * (queue->len + items);
00107
00108 if (size >= new_size)
00109 return;
00110
00111 queue->tab = gw_realloc(queue->tab, new_size);
00112 queue->size = new_size;
00113 }
|
|
|
Definition at line 91 of file gw-prioqueue.c. References gw_prioqueue_t, gw_prioqueue::mutex, and mutex_lock. Referenced by gw_prioqueue_add_producer(), gw_prioqueue_consume(), gw_prioqueue_foreach(), gw_prioqueue_get(), gw_prioqueue_insert(), gw_prioqueue_len(), gw_prioqueue_producer_count(), gw_prioqueue_remove(), and gw_prioqueue_remove_producer(). 00092 {
00093 mutex_lock(queue->mutex);
00094 }
|
|
|
Definition at line 97 of file gw-prioqueue.c. References gw_prioqueue_t, gw_prioqueue::mutex, and mutex_unlock. Referenced by gw_prioqueue_add_producer(), gw_prioqueue_consume(), gw_prioqueue_foreach(), gw_prioqueue_get(), gw_prioqueue_insert(), gw_prioqueue_len(), gw_prioqueue_producer_count(), gw_prioqueue_remove(), and gw_prioqueue_remove_producer(). 00098 {
00099 mutex_unlock(queue->mutex);
00100 }
|
|
||||||||||||
|
Heapify up. Parameters: queue - our prioqueue; index - start index. Definition at line 138 of file gw-prioqueue.c. References gw_prioqueue::cmp, compare(), gw_prioqueue_t, element::item, and gw_prioqueue::tab. Referenced by gw_prioqueue_insert(). 00139 {
00140 struct element *v = queue->tab[index];
00141 while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
00142 queue->tab[index] = queue->tab[index / 2];
00143 index /= 2;
00144 }
00145 queue->tab[index] = v;
00146 }
|
Here is the call graph for this function:
