Main Page | Alphabetical List | Data Structures | Directories | File List | Data Fields | Globals

gw-prioqueue.c File Reference

#include "gw-config.h"
#include <pthread.h>
#include "thread.h"
#include "gwmem.h"
#include "gwassert.h"
#include "gwthread.h"
#include "gw-prioqueue.h"

Include dependency graph for gw-prioqueue.c:

Include dependency graph

Go to the source code of this file.

Data Structures

struct  element
struct  gw_prioqueue

Functions

void queue_lock (gw_prioqueue_t *queue)
void queue_unlock (gw_prioqueue_t *queue)
void make_bigger (gw_prioqueue_t *queue, long items)
int compare (struct element *a, struct element *b, int(*cmp)(const void *, const void *))
void upheap (gw_prioqueue_t *queue, register long index)
void downheap (gw_prioqueue_t *queue, register long index)
gw_prioqueue_t * gw_prioqueue_create (int(*cmp)(const void *, const void *))
void gw_prioqueue_destroy (gw_prioqueue_t *queue, void(*item_destroy)(void *))
long gw_prioqueue_len (gw_prioqueue_t *queue)
void gw_prioqueue_insert (gw_prioqueue_t *queue, void *item)
void gw_prioqueue_foreach (gw_prioqueue_t *queue, void(*fn)(const void *, long))
void * gw_prioqueue_remove (gw_prioqueue_t *queue)
void * gw_prioqueue_get (gw_prioqueue_t *queue)
void * gw_prioqueue_consume (gw_prioqueue_t *queue)
void gw_prioqueue_add_producer (gw_prioqueue_t *queue)
void gw_prioqueue_remove_producer (gw_prioqueue_t *queue)
long gw_prioqueue_producer_count (gw_prioqueue_t *queue)


Function Documentation

int compare (struct element *a, struct element *b, int(*cmp)(const void *, const void *))  [static]
 

Definition at line 116 of file gw-prioqueue.c.

References element::item, and element::seq.

Referenced by downheap(), and upheap().

00117 {
00118     int rc;
00119 
00120     rc = cmp(a->item, b->item);
00121     if (rc == 0) {
00122         /* check sequence to guarantee order */
00123         if (a->seq < b->seq)
00124             rc = 1;
00125         else if (a->seq > b->seq)
00126             rc = -1;
00127     }
00128 
00129     return rc;
00130 }

void downheap (gw_prioqueue_t *queue, register long index)  [static]
 

Heapify down.

Parameters:
queue - our prioqueue
index - start index

Definition at line 154 of file gw-prioqueue.c.

References gw_prioqueue::cmp, compare(), gw_prioqueue_t, gw_prioqueue::len, and gw_prioqueue::tab.

Referenced by gw_prioqueue_consume(), and gw_prioqueue_remove().

00155 {
00156     struct element *v = queue->tab[index];
00157     register long j;
00158     
00159     while (index <= queue->len / 2) {
00160         j = 2 * index;
00161         /* take the biggest child item */
00162         if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
00163             j++;
00164         /* break if our item bigger */
00165         if (compare(v, queue->tab[j], queue->cmp) >= 0)
00166             break;
00167         queue->tab[index] = queue->tab[j];
00168         index = j;
00169     }
00170     queue->tab[index] = v;
00171 }

Here is the call graph for this function:

void gw_prioqueue_add_producer (gw_prioqueue_t *queue)
 

Add a producer to the priority queue.

Parameters:
queue - priority queue

Definition at line 329 of file gw-prioqueue.c.

References gw_assert, gw_prioqueue_t, gw_prioqueue::producers, queue_lock(), and queue_unlock().

00330 {
00331     gw_assert(queue != NULL);
00332 
00333     queue_lock(queue);
00334     queue->producers++;
00335     queue_unlock(queue);
00336 }

Here is the call graph for this function:

void* gw_prioqueue_consume (gw_prioqueue_t *queue)
 

Remove the biggest item from the priority queue, blocking while the queue is empty and producers are still registered.

Parameters:
queue - priority queue

Returns:
the biggest item, or NULL if the queue has no items and no producers

Definition at line 303 of file gw-prioqueue.c.

References downheap(), gw_assert, gw_prioqueue_t, gwthread_self(), element::item, gw_prioqueue::len, gw_prioqueue::mutex, Mutex::mutex, gw_prioqueue::nonempty, Mutex::owner, gw_prioqueue::producers, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

00304 {
00305     void *ret;
00306     
00307     gw_assert(queue != NULL);
00308 
00309     queue_lock(queue);
00310     while (queue->len == 1 && queue->producers > 0) {
00311         queue->mutex->owner = -1;
00312         pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
00313         queue->mutex->owner = gwthread_self();
00314     }
00315     if (queue->len > 1) {
00316         ret = queue->tab[1]->item;
00317         gw_free(queue->tab[1]);
00318         queue->tab[1] = queue->tab[--queue->len];
00319         downheap(queue, 1);
00320     } else {
00321         ret = NULL;
00322     }
00323     queue_unlock(queue);
00324     
00325     return ret;
00326 }

Here is the call graph for this function:

gw_prioqueue_t* gw_prioqueue_create (int(*cmp)(const void *, const void *))
 

Create a priority queue.

Parameters:
cmp - compare function

Returns:
newly created priority queue

Definition at line 174 of file gw-prioqueue.c.

References gw_assert, gw_prioqueue_t, make_bigger(), gw_prioqueue::mutex, mutex_create, and gw_prioqueue::producers.

Referenced by main(), smsc_at2_create(), and smsc_emi2_create().

00175 {
00176     gw_prioqueue_t *ret;
00177      
00178     gw_assert(cmp != NULL);
00179     
00180     ret = gw_malloc(sizeof(*ret));
00181     ret->producers = 0;
00182     pthread_cond_init(&ret->nonempty, NULL);
00183     ret->mutex = mutex_create();
00184     ret->tab = NULL;
00185     ret->size = 0;
00186     ret->len = 0;
00187     ret->seq = 0;
00188     ret->cmp = cmp;
00189     
00190     /* put NULL item at pos 0 that is our stop marker */
00191     make_bigger(ret, 1);
00192     ret->tab[0] = gw_malloc(sizeof(**ret->tab));
00193     ret->tab[0]->item = NULL;
00194     ret->tab[0]->seq = ret->seq++;
00195     ret->len++;
00196     
00197     return ret;
00198 }

Here is the call graph for this function:

void gw_prioqueue_destroy (gw_prioqueue_t *queue, void(*item_destroy)(void *))
 

Destroy a priority queue.

Parameters:
queue - queue to destroy
item_destroy - item destructor

Definition at line 201 of file gw-prioqueue.c.

References gw_prioqueue_t, element::item, item_destroy(), gw_prioqueue::len, gw_prioqueue::mutex, mutex_destroy(), gw_prioqueue::nonempty, and gw_prioqueue::tab.

Referenced by at2_device_thread(), emi2_sender(), smsc_at2_create(), and smsc_emi2_create().

00202 {
00203     long i;
00204 
00205     if (queue == NULL)
00206         return;
00207     
00208     for (i = 0; i < queue->len; i++) {
00209         if (item_destroy != NULL && queue->tab[i]->item != NULL)
00210             item_destroy(queue->tab[i]->item);
00211         gw_free(queue->tab[i]);
00212     }
00213     mutex_destroy(queue->mutex);
00214     pthread_cond_destroy(&queue->nonempty);
00215     gw_free(queue->tab);
00216     gw_free(queue);
00217 }

Here is the call graph for this function:

void gw_prioqueue_foreach (gw_prioqueue_t *queue, void(*fn)(const void *, long))
 

Definition at line 252 of file gw-prioqueue.c.

References gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

Referenced by main().

00253 {
00254     register long i;
00255 
00256     gw_assert(queue != NULL && fn != NULL);
00257     
00258     queue_lock(queue);
00259     for (i = 1; i < queue->len; i++)
00260         fn(queue->tab[i]->item, i - 1);
00261     queue_unlock(queue);
00262 }

Here is the call graph for this function:

void* gw_prioqueue_get (gw_prioqueue_t *queue)
 

Definition at line 286 of file gw-prioqueue.c.

References gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

00287 {
00288     void *ret;
00289     
00290     gw_assert(queue != NULL);
00291     
00292     queue_lock(queue);
00293     if (queue->len > 1)
00294         ret = queue->tab[1]->item;
00295     else
00296         ret = NULL;
00297     queue_unlock(queue);
00298     
00299     return ret;
00300 }

Here is the call graph for this function:

void gw_prioqueue_insert (gw_prioqueue_t *queue, void *item)
 

Insert an item into the priority queue.

Parameters:
queue - priority queue
item - item to be inserted

Definition at line 235 of file gw-prioqueue.c.

References gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, make_bigger(), gw_prioqueue::nonempty, queue_lock(), queue_unlock(), element::seq, gw_prioqueue::seq, gw_prioqueue::tab, and upheap().

Referenced by main().

00236 {
00237     gw_assert(queue != NULL);
00238     gw_assert(item != NULL);
00239     
00240     queue_lock(queue);
00241     make_bigger(queue, 1);
00242     queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
00243     queue->tab[queue->len]->item = item;
00244     queue->tab[queue->len]->seq = queue->seq++;
00245     upheap(queue, queue->len);
00246     queue->len++;
00247     pthread_cond_signal(&queue->nonempty);
00248     queue_unlock(queue);
00249 }

Here is the call graph for this function:

long gw_prioqueue_len (gw_prioqueue_t *queue)
 

Return the priority queue length.

Parameters:
queue - priority queue

Returns:
length of this priority queue

Definition at line 220 of file gw-prioqueue.c.

References gw_prioqueue_t, gw_prioqueue::len, queue_lock(), and queue_unlock().

Referenced by at2_device_thread(), at2_queued_cb(), at2_send_messages(), emi2_wait(), main(), and queued_cb().

00221 {
00222     long len;
00223 
00224     if (queue == NULL)
00225         return 0;
00226      
00227     queue_lock(queue);
00228     len = queue->len - 1;
00229     queue_unlock(queue);
00230     
00231     return len;
00232 }

Here is the call graph for this function:

long gw_prioqueue_producer_count (gw_prioqueue_t *queue)
 

Return the producer count for the priority queue.

Parameters:
queue - priority queue

Returns:
producer count

Definition at line 351 of file gw-prioqueue.c.

References gw_assert, gw_prioqueue_t, gw_prioqueue::producers, queue_lock(), and queue_unlock().

00352 {
00353     long ret;
00354 
00355     gw_assert(queue != NULL);
00356     
00357     queue_lock(queue);
00358     ret = queue->producers;
00359     queue_unlock(queue);
00360     
00361     return ret;
00362 }

Here is the call graph for this function:

void* gw_prioqueue_remove (gw_prioqueue_t *queue)
 

Remove the biggest item from the priority queue without blocking, even if producers are registered and the queue is empty.

Parameters:
queue - priority queue

Returns:
the biggest item, or NULL if there are no items in the queue

Definition at line 265 of file gw-prioqueue.c.

References downheap(), gw_assert, gw_prioqueue_t, element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

Referenced by at2_send_messages(), at2_shutdown_cb(), emi2_do_send(), emi2_sender(), main(), open_send_connection(), and shutdown_cb().

00266 {
00267     void *ret;
00268     
00269     gw_assert(queue != NULL);
00270     
00271     queue_lock(queue);
00272     if (queue->len <= 1) {
00273         queue_unlock(queue);
00274         return NULL;
00275     }
00276     ret = queue->tab[1]->item;
00277     gw_free(queue->tab[1]);
00278     queue->tab[1] = queue->tab[--queue->len];
00279     downheap(queue, 1);
00280     queue_unlock(queue);
00281     
00282     return ret;
00283 }

Here is the call graph for this function:

void gw_prioqueue_remove_producer (gw_prioqueue_t *queue)
 

Remove a producer from the priority queue.

Parameters:
queue - priority queue

Definition at line 339 of file gw-prioqueue.c.

References gw_assert, gw_prioqueue_t, gw_prioqueue::nonempty, gw_prioqueue::producers, queue_lock(), and queue_unlock().

00340 {
00341     gw_assert(queue != NULL);
00342 
00343     queue_lock(queue);
00344     gw_assert(queue->producers > 0);
00345     queue->producers--;
00346     pthread_cond_broadcast(&queue->nonempty);
00347     queue_unlock(queue);
00348 }

Here is the call graph for this function:

void make_bigger (gw_prioqueue_t *queue, long items)  [static]
 

Definition at line 103 of file gw-prioqueue.c.

References gw_prioqueue_t, gw_prioqueue::len, gw_prioqueue::size, size, and gw_prioqueue::tab.

00104 {
00105     size_t size = queue->size;
00106     size_t new_size = sizeof(*queue->tab) * (queue->len + items);
00107     
00108     if (size >= new_size)
00109         return;
00110     
00111     queue->tab = gw_realloc(queue->tab, new_size);
00112     queue->size = new_size;
00113 }

void queue_lock (gw_prioqueue_t *queue)  [inline, static]
 

Definition at line 91 of file gw-prioqueue.c.

References gw_prioqueue_t, gw_prioqueue::mutex, and mutex_lock.

Referenced by gw_prioqueue_add_producer(), gw_prioqueue_consume(), gw_prioqueue_foreach(), gw_prioqueue_get(), gw_prioqueue_insert(), gw_prioqueue_len(), gw_prioqueue_producer_count(), gw_prioqueue_remove(), and gw_prioqueue_remove_producer().

00092 {
00093     mutex_lock(queue->mutex);
00094 }

void queue_unlock (gw_prioqueue_t *queue)  [inline, static]
 

Definition at line 97 of file gw-prioqueue.c.

References gw_prioqueue_t, gw_prioqueue::mutex, and mutex_unlock.

Referenced by gw_prioqueue_add_producer(), gw_prioqueue_consume(), gw_prioqueue_foreach(), gw_prioqueue_get(), gw_prioqueue_insert(), gw_prioqueue_len(), gw_prioqueue_producer_count(), gw_prioqueue_remove(), and gw_prioqueue_remove_producer().

00098 {
00099     mutex_unlock(queue->mutex);
00100 }

void upheap (gw_prioqueue_t *queue, register long index)  [static]
 

Heapify up.

Parameters:
queue - our prioqueue
index - start index

Definition at line 138 of file gw-prioqueue.c.

References gw_prioqueue::cmp, compare(), gw_prioqueue_t, element::item, and gw_prioqueue::tab.

Referenced by gw_prioqueue_insert().

00139 {
00140     struct element *v = queue->tab[index];
00141     while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
00142         queue->tab[index] = queue->tab[index / 2];
00143         index /= 2;
00144     }
00145     queue->tab[index] = v;
00146 }

Here is the call graph for this function:

See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.