Main Page | Alphabetical List | Data Structures | Directories | File List | Data Fields | Globals

gw-prioqueue.c

Go to the documentation of this file.
00001 /* ==================================================================== 
00002  * The Kannel Software License, Version 1.0 
00003  * 
00004  * Copyright (c) 2001-2008 Kannel Group  
00005  * Copyright (c) 1998-2001 WapIT Ltd.   
00006  * All rights reserved. 
00007  * 
00008  * Redistribution and use in source and binary forms, with or without 
00009  * modification, are permitted provided that the following conditions 
00010  * are met: 
00011  * 
00012  * 1. Redistributions of source code must retain the above copyright 
00013  *    notice, this list of conditions and the following disclaimer. 
00014  * 
00015  * 2. Redistributions in binary form must reproduce the above copyright 
00016  *    notice, this list of conditions and the following disclaimer in 
00017  *    the documentation and/or other materials provided with the 
00018  *    distribution. 
00019  * 
00020  * 3. The end-user documentation included with the redistribution, 
00021  *    if any, must include the following acknowledgment: 
00022  *       "This product includes software developed by the 
00023  *        Kannel Group (http://www.kannel.org/)." 
00024  *    Alternately, this acknowledgment may appear in the software itself, 
00025  *    if and wherever such third-party acknowledgments normally appear. 
00026  * 
00027  * 4. The names "Kannel" and "Kannel Group" must not be used to 
00028  *    endorse or promote products derived from this software without 
00029  *    prior written permission. For written permission, please  
00030  *    contact org@kannel.org. 
00031  * 
00032  * 5. Products derived from this software may not be called "Kannel", 
00033  *    nor may "Kannel" appear in their name, without prior written 
00034  *    permission of the Kannel Group. 
00035  * 
00036  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 
00037  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 
00038  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
00039  * DISCLAIMED.  IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS 
00040  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,  
00041  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT  
00042  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR  
00043  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
00044  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE  
00045  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,  
00046  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
00047  * ==================================================================== 
00048  * 
00049  * This software consists of voluntary contributions made by many 
00050  * individuals on behalf of the Kannel Group.  For more information on  
00051  * the Kannel Group, please see <http://www.kannel.org/>. 
00052  * 
00053  * Portions of this software are based upon software originally written at  
00054  * WapIT Ltd., Helsinki, Finland for the Kannel project.  
00055  */ 
00056 
00057 /*
00058  * gw-prioqueue.c - generic priority queue with guaranteed order.
00059  *
00060  * Algorithm ala Robert Sedgewick.
00061  *
00062  * Alexander Malysh <amalysh at kannel.org>, 2004, 2008
00063  */
00064 
00065 #include "gw-config.h"
00066 #include <pthread.h>
00067 #include "thread.h"
00068 #include "gwmem.h"
00069 #include "gwassert.h"
00070 #include "gwthread.h"
00071 #include "gw-prioqueue.h"
00072 
00073 
00074 struct element {
00075     void *item;
00076     long long seq;
00077 };
00078 
00079 struct gw_prioqueue {
00080     Mutex *mutex;
00081     struct element **tab;
00082     size_t size;
00083     long len;
00084     long producers;
00085     long long seq;
00086     pthread_cond_t nonempty;
00087     int (*cmp)(const void*, const void *);
00088 };
00089 
00090 
00091 static void inline queue_lock(gw_prioqueue_t *queue)
00092 {
00093     mutex_lock(queue->mutex);
00094 }
00095 
00096 
00097 static void inline queue_unlock(gw_prioqueue_t *queue)
00098 {
00099     mutex_unlock(queue->mutex);
00100 }
00101 
00102 
00103 static void make_bigger(gw_prioqueue_t *queue, long items)
00104 {
00105     size_t size = queue->size;
00106     size_t new_size = sizeof(*queue->tab) * (queue->len + items);
00107     
00108     if (size >= new_size)
00109         return;
00110     
00111     queue->tab = gw_realloc(queue->tab, new_size);
00112     queue->size = new_size;
00113 }
00114 
00115 
00116 static int compare(struct element *a, struct element *b, int(*cmp)(const void*, const void *))
00117 {
00118     int rc;
00119 
00120     rc = cmp(a->item, b->item);
00121     if (rc == 0) {
00122         /* check sequence to guarantee order */
00123         if (a->seq < b->seq)
00124             rc = 1;
00125         else if (a->seq > b->seq)
00126             rc = -1;
00127     }
00128 
00129     return rc;
00130 }
00131 
00132 
00138 static void upheap(gw_prioqueue_t *queue, register long index)
00139 {
00140     struct element *v = queue->tab[index];
00141     while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
00142         queue->tab[index] = queue->tab[index / 2];
00143         index /= 2;
00144     }
00145     queue->tab[index] = v;
00146 }
00147 
00148 
00154 static void downheap(gw_prioqueue_t *queue, register long index)
00155 {
00156     struct element *v = queue->tab[index];
00157     register long j;
00158     
00159     while (index <= queue->len / 2) {
00160         j = 2 * index;
00161         /* take the biggest child item */
00162         if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
00163             j++;
00164         /* break if our item bigger */
00165         if (compare(v, queue->tab[j], queue->cmp) >= 0)
00166             break;
00167         queue->tab[index] = queue->tab[j];
00168         index = j;
00169     }
00170     queue->tab[index] = v;
00171 }
00172 
00173 
00174 gw_prioqueue_t *gw_prioqueue_create(int(*cmp)(const void*, const void *))
00175 {
00176     gw_prioqueue_t *ret;
00177      
00178     gw_assert(cmp != NULL);
00179     
00180     ret = gw_malloc(sizeof(*ret));
00181     ret->producers = 0;
00182     pthread_cond_init(&ret->nonempty, NULL);
00183     ret->mutex = mutex_create();
00184     ret->tab = NULL;
00185     ret->size = 0;
00186     ret->len = 0;
00187     ret->seq = 0;
00188     ret->cmp = cmp;
00189     
00190     /* put NULL item at pos 0 that is our stop marker */
00191     make_bigger(ret, 1);
00192     ret->tab[0] = gw_malloc(sizeof(**ret->tab));
00193     ret->tab[0]->item = NULL;
00194     ret->tab[0]->seq = ret->seq++;
00195     ret->len++;
00196     
00197     return ret;
00198 }
00199 
00200 
00201 void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void*))
00202 {
00203     long i;
00204 
00205     if (queue == NULL)
00206         return;
00207     
00208     for (i = 0; i < queue->len; i++) {
00209         if (item_destroy != NULL && queue->tab[i]->item != NULL)
00210             item_destroy(queue->tab[i]->item);
00211         gw_free(queue->tab[i]);
00212     }
00213     mutex_destroy(queue->mutex);
00214     pthread_cond_destroy(&queue->nonempty);
00215     gw_free(queue->tab);
00216     gw_free(queue);
00217 }
00218 
00219 
00220 long gw_prioqueue_len(gw_prioqueue_t *queue)
00221 {
00222     long len;
00223 
00224     if (queue == NULL)
00225         return 0;
00226      
00227     queue_lock(queue);
00228     len = queue->len - 1;
00229     queue_unlock(queue);
00230     
00231     return len;
00232 }
00233 
00234 
00235 void gw_prioqueue_insert(gw_prioqueue_t *queue, void *item)
00236 {
00237     gw_assert(queue != NULL);
00238     gw_assert(item != NULL);
00239     
00240     queue_lock(queue);
00241     make_bigger(queue, 1);
00242     queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
00243     queue->tab[queue->len]->item = item;
00244     queue->tab[queue->len]->seq = queue->seq++;
00245     upheap(queue, queue->len);
00246     queue->len++;
00247     pthread_cond_signal(&queue->nonempty);
00248     queue_unlock(queue);
00249 }
00250 
00251 
00252 void gw_prioqueue_foreach(gw_prioqueue_t *queue, void(*fn)(const void *, long))
00253 {
00254     register long i;
00255 
00256     gw_assert(queue != NULL && fn != NULL);
00257     
00258     queue_lock(queue);
00259     for (i = 1; i < queue->len; i++)
00260         fn(queue->tab[i]->item, i - 1);
00261     queue_unlock(queue);
00262 }
00263 
00264 
00265 void *gw_prioqueue_remove(gw_prioqueue_t *queue)
00266 {
00267     void *ret;
00268     
00269     gw_assert(queue != NULL);
00270     
00271     queue_lock(queue);
00272     if (queue->len <= 1) {
00273         queue_unlock(queue);
00274         return NULL;
00275     }
00276     ret = queue->tab[1]->item;
00277     gw_free(queue->tab[1]);
00278     queue->tab[1] = queue->tab[--queue->len];
00279     downheap(queue, 1);
00280     queue_unlock(queue);
00281     
00282     return ret;
00283 }
00284 
00285 
00286 void *gw_prioqueue_get(gw_prioqueue_t *queue)
00287 {
00288     void *ret;
00289     
00290     gw_assert(queue != NULL);
00291     
00292     queue_lock(queue);
00293     if (queue->len > 1)
00294         ret = queue->tab[1]->item;
00295     else
00296         ret = NULL;
00297     queue_unlock(queue);
00298     
00299     return ret;
00300 }
00301 
00302 
00303 void *gw_prioqueue_consume(gw_prioqueue_t *queue)
00304 {
00305     void *ret;
00306     
00307     gw_assert(queue != NULL);
00308 
00309     queue_lock(queue);
00310     while (queue->len == 1 && queue->producers > 0) {
00311         queue->mutex->owner = -1;
00312         pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
00313         queue->mutex->owner = gwthread_self();
00314     }
00315     if (queue->len > 1) {
00316         ret = queue->tab[1]->item;
00317         gw_free(queue->tab[1]);
00318         queue->tab[1] = queue->tab[--queue->len];
00319         downheap(queue, 1);
00320     } else {
00321         ret = NULL;
00322     }
00323     queue_unlock(queue);
00324     
00325     return ret;
00326 }
00327 
00328 
00329 void gw_prioqueue_add_producer(gw_prioqueue_t *queue)
00330 {
00331     gw_assert(queue != NULL);
00332 
00333     queue_lock(queue);
00334     queue->producers++;
00335     queue_unlock(queue);
00336 }
00337 
00338 
00339 void gw_prioqueue_remove_producer(gw_prioqueue_t *queue)
00340 {
00341     gw_assert(queue != NULL);
00342 
00343     queue_lock(queue);
00344     gw_assert(queue->producers > 0);
00345     queue->producers--;
00346     pthread_cond_broadcast(&queue->nonempty);
00347     queue_unlock(queue);
00348 }
00349 
00350 
00351 long gw_prioqueue_producer_count(gw_prioqueue_t *queue)
00352 {
00353     long ret;
00354 
00355     gw_assert(queue != NULL);
00356     
00357     queue_lock(queue);
00358     ret = queue->producers;
00359     queue_unlock(queue);
00360     
00361     return ret;
00362 }
00363 
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.