Kannel: Open Source WAP and SMS gateway  $Revision: 5037 $
gw-prioqueue.c File Reference
#include "gw-config.h"
#include <pthread.h>
#include "thread.h"
#include "gwmem.h"
#include "gwassert.h"
#include "gwthread.h"
#include "gw-prioqueue.h"

Go to the source code of this file.

Data Structures

struct  element
 
struct  gw_prioqueue
 

Functions

static void queue_lock (gw_prioqueue_t *queue)
 
static void queue_unlock (gw_prioqueue_t *queue)
 
static void make_bigger (gw_prioqueue_t *queue, long items)
 
static int compare (struct element *a, struct element *b, int(*cmp)(const void *, const void *))
 
static void upheap (gw_prioqueue_t *queue, register long index)
 
static void downheap (gw_prioqueue_t *queue, register long index)
 
gw_prioqueue_t * gw_prioqueue_create (int(*cmp)(const void *, const void *))
 
void gw_prioqueue_destroy (gw_prioqueue_t *queue, void(*item_destroy)(void *))
 
long gw_prioqueue_len (gw_prioqueue_t *queue)
 
void gw_prioqueue_insert (gw_prioqueue_t *queue, void *item)
 
void gw_prioqueue_foreach (gw_prioqueue_t *queue, void(*fn)(const void *, long))
 
void * gw_prioqueue_remove (gw_prioqueue_t *queue)
 
void * gw_prioqueue_get (gw_prioqueue_t *queue)
 
void * gw_prioqueue_consume (gw_prioqueue_t *queue)
 
void gw_prioqueue_add_producer (gw_prioqueue_t *queue)
 
void gw_prioqueue_remove_producer (gw_prioqueue_t *queue)
 
long gw_prioqueue_producer_count (gw_prioqueue_t *queue)
 

Function Documentation

static int compare ( struct element * a,
struct element * b,
int(*)(const void *, const void *)  cmp 
)
static

Definition at line 116 of file gw-prioqueue.c.

References element::item, and element::seq.

Referenced by downheap(), and upheap().

117 {
118  int rc;
119 
120  rc = cmp(a->item, b->item);
121  if (rc == 0) {
122  /* check sequence to guarantee order */
123  if (a->seq < b->seq)
124  rc = 1;
125  else if (a->seq > b->seq)
126  rc = -1;
127  }
128 
129  return rc;
130 }
void * item
Definition: gw-prioqueue.c:75
long long seq
Definition: gw-prioqueue.c:76
static void downheap ( gw_prioqueue_t * queue,
register long  index 
)
static

Heapify down.

Parameters
queue - our prioqueue
index - start index

Definition at line 154 of file gw-prioqueue.c.

References gw_prioqueue::cmp, compare(), and gw_prioqueue::tab.

Referenced by gw_prioqueue_consume(), and gw_prioqueue_remove().

155 {
156  struct element *v = queue->tab[index];
157  register long j;
158 
159  while (index <= queue->len / 2) {
160  j = 2 * index;
161  /* take the biggest child item */
162  if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
163  j++;
164  /* break if our item bigger */
165  if (compare(v, queue->tab[j], queue->cmp) >= 0)
166  break;
167  queue->tab[index] = queue->tab[j];
168  index = j;
169  }
170  queue->tab[index] = v;
171 }
struct element ** tab
Definition: gw-prioqueue.c:81
int(* cmp)(const void *, const void *)
Definition: gw-prioqueue.c:87
static int compare(struct element *a, struct element *b, int(*cmp)(const void *, const void *))
Definition: gw-prioqueue.c:116
void gw_prioqueue_add_producer ( gw_prioqueue_t * queue)

Add a producer to the priority queue.

Parameters
queue - priority queue

Definition at line 331 of file gw-prioqueue.c.

References gw_assert(), gw_prioqueue::producers, queue_lock(), and queue_unlock().

Referenced by smpp_create().

332 {
333  gw_assert(queue != NULL);
334 
335  queue_lock(queue);
336  queue->producers++;
337  queue_unlock(queue);
338 }
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
gw_assert(wtls_machine->packet_to_send!=NULL)
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
void* gw_prioqueue_consume ( gw_prioqueue_t * queue)

Remove the biggest item from the priority queue, blocking while the queue holds no items and producers are still available.

Parameters
queue - priority queue

Returns
the biggest item, or NULL if there are no items and no producers in the queue

Definition at line 303 of file gw-prioqueue.c.

References downheap(), gw_assert(), gwthread_self(), element::item, gw_prioqueue::len, Mutex::mutex, gw_prioqueue::mutex, gw_prioqueue::nonempty, Mutex::owner, gw_prioqueue::producers, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

304 {
305  void *ret;
306 
307  gw_assert(queue != NULL);
308 
309  queue_lock(queue);
310  while (queue->len == 1 && queue->producers > 0) {
311  queue->mutex->owner = -1;
312  pthread_cleanup_push((void(*)(void*))pthread_mutex_unlock, &queue->mutex->mutex);
313  pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
314  pthread_cleanup_pop(0);
315  queue->mutex->owner = gwthread_self();
316  }
317  if (queue->len > 1) {
318  ret = queue->tab[1]->item;
319  gw_free(queue->tab[1]);
320  queue->tab[1] = queue->tab[--queue->len];
321  downheap(queue, 1);
322  } else {
323  ret = NULL;
324  }
325  queue_unlock(queue);
326 
327  return ret;
328 }
long gwthread_self(void)
struct element ** tab
Definition: gw-prioqueue.c:81
static void downheap(gw_prioqueue_t *queue, register long index)
Definition: gw-prioqueue.c:154
void * item
Definition: gw-prioqueue.c:75
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
pthread_cond_t nonempty
Definition: gw-prioqueue.c:86
pthread_mutex_t mutex
Definition: thread.h:77
Mutex * mutex
Definition: gw-prioqueue.c:80
gw_assert(wtls_machine->packet_to_send!=NULL)
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
long owner
Definition: thread.h:78
gw_prioqueue_t* gw_prioqueue_create ( int(*)(const void *, const void *)  cmp)

Create a priority queue.

Parameters
cmp - compare function

Returns
newly created priority queue

Definition at line 174 of file gw-prioqueue.c.

References gw_prioqueue::cmp, gw_assert(), element::item, gw_prioqueue::len, make_bigger(), gw_prioqueue::mutex, mutex_create, gw_prioqueue::nonempty, gw_prioqueue::producers, element::seq, gw_prioqueue::seq, gw_prioqueue::size, and gw_prioqueue::tab.

Referenced by main(), smpp_create(), smsc_at2_create(), and smsc_emi2_create().

175 {
176  gw_prioqueue_t *ret;
177 
178  gw_assert(cmp != NULL);
179 
180  ret = gw_malloc(sizeof(*ret));
181  ret->producers = 0;
182  pthread_cond_init(&ret->nonempty, NULL);
183  ret->mutex = mutex_create();
184  ret->tab = NULL;
185  ret->size = 0;
186  ret->len = 0;
187  ret->seq = 0;
188  ret->cmp = cmp;
189 
190  /* put NULL item at pos 0 that is our stop marker */
191  make_bigger(ret, 1);
192  ret->tab[0] = gw_malloc(sizeof(**ret->tab));
193  ret->tab[0]->item = NULL;
194  ret->tab[0]->seq = ret->seq++;
195  ret->len++;
196 
197  return ret;
198 }
struct element ** tab
Definition: gw-prioqueue.c:81
int(* cmp)(const void *, const void *)
Definition: gw-prioqueue.c:87
static void make_bigger(gw_prioqueue_t *queue, long items)
Definition: gw-prioqueue.c:103
#define mutex_create()
Definition: thread.h:96
void * item
Definition: gw-prioqueue.c:75
pthread_cond_t nonempty
Definition: gw-prioqueue.c:86
Mutex * mutex
Definition: gw-prioqueue.c:80
gw_assert(wtls_machine->packet_to_send!=NULL)
long long seq
Definition: gw-prioqueue.c:85
long long seq
Definition: gw-prioqueue.c:76
void gw_prioqueue_destroy ( gw_prioqueue_t * queue,
void(*)(void *)  item_destroy 
)

Destroy the priority queue.

Parameters
queue - queue to destroy
item_destroy - item destructor

Definition at line 201 of file gw-prioqueue.c.

References element::item, item_destroy(), gw_prioqueue::len, gw_prioqueue::mutex, mutex_destroy(), gw_prioqueue::nonempty, and gw_prioqueue::tab.

Referenced by at2_device_thread(), emi2_sender(), smpp_destroy(), smsc_at2_create(), and smsc_emi2_create().

202 {
203  long i;
204 
205  if (queue == NULL)
206  return;
207 
208  for (i = 0; i < queue->len; i++) {
209  if (item_destroy != NULL && queue->tab[i]->item != NULL)
210  item_destroy(queue->tab[i]->item);
211  gw_free(queue->tab[i]);
212  }
213  mutex_destroy(queue->mutex);
214  pthread_cond_destroy(&queue->nonempty);
215  gw_free(queue->tab);
216  gw_free(queue);
217 }
struct element ** tab
Definition: gw-prioqueue.c:81
void * item
Definition: gw-prioqueue.c:75
pthread_cond_t nonempty
Definition: gw-prioqueue.c:86
Mutex * mutex
Definition: gw-prioqueue.c:80
void mutex_destroy(Mutex *mutex)
Definition: thread.c:97
static void item_destroy(void *item)
Definition: dict.c:91
void gw_prioqueue_foreach ( gw_prioqueue_t * queue,
void(*)(const void *, long)  fn 
)

Definition at line 252 of file gw-prioqueue.c.

References gw_assert(), element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

Referenced by main().

253 {
254  register long i;
255 
256  gw_assert(queue != NULL && fn != NULL);
257 
258  queue_lock(queue);
259  for (i = 1; i < queue->len; i++)
260  fn(queue->tab[i]->item, i - 1);
261  queue_unlock(queue);
262 }
struct element ** tab
Definition: gw-prioqueue.c:81
void * item
Definition: gw-prioqueue.c:75
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
gw_assert(wtls_machine->packet_to_send!=NULL)
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
void* gw_prioqueue_get ( gw_prioqueue_t * queue)

Definition at line 286 of file gw-prioqueue.c.

References gw_assert(), element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

287 {
288  void *ret;
289 
290  gw_assert(queue != NULL);
291 
292  queue_lock(queue);
293  if (queue->len > 1)
294  ret = queue->tab[1]->item;
295  else
296  ret = NULL;
297  queue_unlock(queue);
298 
299  return ret;
300 }
struct element ** tab
Definition: gw-prioqueue.c:81
void * item
Definition: gw-prioqueue.c:75
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
gw_assert(wtls_machine->packet_to_send!=NULL)
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
void gw_prioqueue_insert ( gw_prioqueue_t * queue,
void *  item 
)

Insert an item into the priority queue.

Parameters
queue - priority queue
item - item to be inserted

Definition at line 235 of file gw-prioqueue.c.

References gw_assert(), element::item, gw_prioqueue::len, make_bigger(), gw_prioqueue::nonempty, queue_lock(), queue_unlock(), element::seq, gw_prioqueue::seq, gw_prioqueue::tab, and upheap().

Referenced by main().

236 {
237  gw_assert(queue != NULL);
238  gw_assert(item != NULL);
239 
240  queue_lock(queue);
241  make_bigger(queue, 1);
242  queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
243  queue->tab[queue->len]->item = item;
244  queue->tab[queue->len]->seq = queue->seq++;
245  upheap(queue, queue->len);
246  queue->len++;
247  pthread_cond_signal(&queue->nonempty);
248  queue_unlock(queue);
249 }
struct element ** tab
Definition: gw-prioqueue.c:81
static void make_bigger(gw_prioqueue_t *queue, long items)
Definition: gw-prioqueue.c:103
void * item
Definition: gw-prioqueue.c:75
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
pthread_cond_t nonempty
Definition: gw-prioqueue.c:86
gw_assert(wtls_machine->packet_to_send!=NULL)
long long seq
Definition: gw-prioqueue.c:85
long long seq
Definition: gw-prioqueue.c:76
static void upheap(gw_prioqueue_t *queue, register long index)
Definition: gw-prioqueue.c:138
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
long gw_prioqueue_len ( gw_prioqueue_t * queue)

Return the priority queue length.

Parameters
queue - priority queue

Returns
length of this priority queue

Definition at line 220 of file gw-prioqueue.c.

References gw_prioqueue::len, queue_lock(), and queue_unlock().

Referenced by at2_device_thread(), at2_queued_cb(), at2_send_messages(), emi2_wait(), io_thread(), main(), and queued_cb().

221 {
222  long len;
223 
224  if (queue == NULL)
225  return 0;
226 
227  queue_lock(queue);
228  len = queue->len - 1;
229  queue_unlock(queue);
230 
231  return len;
232 }
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
long gw_prioqueue_producer_count ( gw_prioqueue_t * queue)

Return the producer count for the priority queue.

Parameters
queue - priority queue

Returns
producer count

Definition at line 353 of file gw-prioqueue.c.

References gw_assert(), gw_prioqueue::producers, queue_lock(), and queue_unlock().

354 {
355  long ret;
356 
357  gw_assert(queue != NULL);
358 
359  queue_lock(queue);
360  ret = queue->producers;
361  queue_unlock(queue);
362 
363  return ret;
364 }
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
gw_assert(wtls_machine->packet_to_send!=NULL)
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
void* gw_prioqueue_remove ( gw_prioqueue_t * queue)

Remove the biggest item from the priority queue without blocking, even if producers are available and there are no items in the queue.

Parameters
queue - priority queue

Returns
the biggest item, or NULL if there are no items in the queue

Definition at line 265 of file gw-prioqueue.c.

References downheap(), gw_assert(), element::item, gw_prioqueue::len, queue_lock(), queue_unlock(), and gw_prioqueue::tab.

Referenced by at2_send_messages(), at2_shutdown_cb(), emi2_do_send(), emi2_sender(), io_thread(), main(), open_send_connection(), send_messages(), and shutdown_cb().

266 {
267  void *ret;
268 
269  gw_assert(queue != NULL);
270 
271  queue_lock(queue);
272  if (queue->len <= 1) {
273  queue_unlock(queue);
274  return NULL;
275  }
276  ret = queue->tab[1]->item;
277  gw_free(queue->tab[1]);
278  queue->tab[1] = queue->tab[--queue->len];
279  downheap(queue, 1);
280  queue_unlock(queue);
281 
282  return ret;
283 }
struct element ** tab
Definition: gw-prioqueue.c:81
static void downheap(gw_prioqueue_t *queue, register long index)
Definition: gw-prioqueue.c:154
void * item
Definition: gw-prioqueue.c:75
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
gw_assert(wtls_machine->packet_to_send!=NULL)
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
void gw_prioqueue_remove_producer ( gw_prioqueue_t * queue)

Remove a producer from the priority queue.

Parameters
queue - priority queue

Definition at line 341 of file gw-prioqueue.c.

References gw_assert(), gw_prioqueue::nonempty, gw_prioqueue::producers, queue_lock(), and queue_unlock().

342 {
343  gw_assert(queue != NULL);
344 
345  queue_lock(queue);
346  gw_assert(queue->producers > 0);
347  queue->producers--;
348  pthread_cond_broadcast(&queue->nonempty);
349  queue_unlock(queue);
350 }
static void queue_lock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:91
pthread_cond_t nonempty
Definition: gw-prioqueue.c:86
gw_assert(wtls_machine->packet_to_send!=NULL)
static void queue_unlock(gw_prioqueue_t *queue)
Definition: gw-prioqueue.c:97
static void make_bigger ( gw_prioqueue_t * queue,
long  items 
)
static

Definition at line 103 of file gw-prioqueue.c.

References gw_prioqueue::len, gw_prioqueue::size, size, and gw_prioqueue::tab.

Referenced by gw_prioqueue_create(), and gw_prioqueue_insert().

104 {
105  size_t size = queue->size;
106  size_t new_size = sizeof(*queue->tab) * (queue->len + items);
107 
108  if (size >= new_size)
109  return;
110 
111  queue->tab = gw_realloc(queue->tab, new_size);
112  queue->size = new_size;
113 }
int size
Definition: wsasm.c:84
struct element ** tab
Definition: gw-prioqueue.c:81
static void upheap ( gw_prioqueue_t * queue,
register long  index 
)
static

Heapify up.

Parameters
queue - our prioqueue
index - start index

Definition at line 138 of file gw-prioqueue.c.

References gw_prioqueue::cmp, compare(), element::item, and gw_prioqueue::tab.

Referenced by gw_prioqueue_insert().

139 {
140  struct element *v = queue->tab[index];
141  while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
142  queue->tab[index] = queue->tab[index / 2];
143  index /= 2;
144  }
145  queue->tab[index] = v;
146 }
struct element ** tab
Definition: gw-prioqueue.c:81
int(* cmp)(const void *, const void *)
Definition: gw-prioqueue.c:87
void * item
Definition: gw-prioqueue.c:75
static int compare(struct element *a, struct element *b, int(*cmp)(const void *, const void *))
Definition: gw-prioqueue.c:116
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.