00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065 #include "gw-config.h"
00066 #include <pthread.h>
00067 #include "thread.h"
00068 #include "gwmem.h"
00069 #include "gwassert.h"
00070 #include "gwthread.h"
00071 #include "gw-prioqueue.h"
00072
00073
00074 struct element {
00075 void *item;
00076 long long seq;
00077 };
00078
00079 struct gw_prioqueue {
00080 Mutex *mutex;
00081 struct element **tab;
00082 size_t size;
00083 long len;
00084 long producers;
00085 long long seq;
00086 pthread_cond_t nonempty;
00087 int (*cmp)(const void*, const void *);
00088 };
00089
00090
00091 static void inline queue_lock(gw_prioqueue_t *queue)
00092 {
00093 mutex_lock(queue->mutex);
00094 }
00095
00096
00097 static void inline queue_unlock(gw_prioqueue_t *queue)
00098 {
00099 mutex_unlock(queue->mutex);
00100 }
00101
00102
00103 static void make_bigger(gw_prioqueue_t *queue, long items)
00104 {
00105 size_t size = queue->size;
00106 size_t new_size = sizeof(*queue->tab) * (queue->len + items);
00107
00108 if (size >= new_size)
00109 return;
00110
00111 queue->tab = gw_realloc(queue->tab, new_size);
00112 queue->size = new_size;
00113 }
00114
00115
00116 static int compare(struct element *a, struct element *b, int(*cmp)(const void*, const void *))
00117 {
00118 int rc;
00119
00120 rc = cmp(a->item, b->item);
00121 if (rc == 0) {
00122
00123 if (a->seq < b->seq)
00124 rc = 1;
00125 else if (a->seq > b->seq)
00126 rc = -1;
00127 }
00128
00129 return rc;
00130 }
00131
00132
00138 static void upheap(gw_prioqueue_t *queue, register long index)
00139 {
00140 struct element *v = queue->tab[index];
00141 while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
00142 queue->tab[index] = queue->tab[index / 2];
00143 index /= 2;
00144 }
00145 queue->tab[index] = v;
00146 }
00147
00148
00154 static void downheap(gw_prioqueue_t *queue, register long index)
00155 {
00156 struct element *v = queue->tab[index];
00157 register long j;
00158
00159 while (index <= queue->len / 2) {
00160 j = 2 * index;
00161
00162 if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
00163 j++;
00164
00165 if (compare(v, queue->tab[j], queue->cmp) >= 0)
00166 break;
00167 queue->tab[index] = queue->tab[j];
00168 index = j;
00169 }
00170 queue->tab[index] = v;
00171 }
00172
00173
00174 gw_prioqueue_t *gw_prioqueue_create(int(*cmp)(const void*, const void *))
00175 {
00176 gw_prioqueue_t *ret;
00177
00178 gw_assert(cmp != NULL);
00179
00180 ret = gw_malloc(sizeof(*ret));
00181 ret->producers = 0;
00182 pthread_cond_init(&ret->nonempty, NULL);
00183 ret->mutex = mutex_create();
00184 ret->tab = NULL;
00185 ret->size = 0;
00186 ret->len = 0;
00187 ret->seq = 0;
00188 ret->cmp = cmp;
00189
00190
00191 make_bigger(ret, 1);
00192 ret->tab[0] = gw_malloc(sizeof(**ret->tab));
00193 ret->tab[0]->item = NULL;
00194 ret->tab[0]->seq = ret->seq++;
00195 ret->len++;
00196
00197 return ret;
00198 }
00199
00200
00201 void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void*))
00202 {
00203 long i;
00204
00205 if (queue == NULL)
00206 return;
00207
00208 for (i = 0; i < queue->len; i++) {
00209 if (item_destroy != NULL && queue->tab[i]->item != NULL)
00210 item_destroy(queue->tab[i]->item);
00211 gw_free(queue->tab[i]);
00212 }
00213 mutex_destroy(queue->mutex);
00214 pthread_cond_destroy(&queue->nonempty);
00215 gw_free(queue->tab);
00216 gw_free(queue);
00217 }
00218
00219
00220 long gw_prioqueue_len(gw_prioqueue_t *queue)
00221 {
00222 long len;
00223
00224 if (queue == NULL)
00225 return 0;
00226
00227 queue_lock(queue);
00228 len = queue->len - 1;
00229 queue_unlock(queue);
00230
00231 return len;
00232 }
00233
00234
00235 void gw_prioqueue_insert(gw_prioqueue_t *queue, void *item)
00236 {
00237 gw_assert(queue != NULL);
00238 gw_assert(item != NULL);
00239
00240 queue_lock(queue);
00241 make_bigger(queue, 1);
00242 queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
00243 queue->tab[queue->len]->item = item;
00244 queue->tab[queue->len]->seq = queue->seq++;
00245 upheap(queue, queue->len);
00246 queue->len++;
00247 pthread_cond_signal(&queue->nonempty);
00248 queue_unlock(queue);
00249 }
00250
00251
00252 void gw_prioqueue_foreach(gw_prioqueue_t *queue, void(*fn)(const void *, long))
00253 {
00254 register long i;
00255
00256 gw_assert(queue != NULL && fn != NULL);
00257
00258 queue_lock(queue);
00259 for (i = 1; i < queue->len; i++)
00260 fn(queue->tab[i]->item, i - 1);
00261 queue_unlock(queue);
00262 }
00263
00264
00265 void *gw_prioqueue_remove(gw_prioqueue_t *queue)
00266 {
00267 void *ret;
00268
00269 gw_assert(queue != NULL);
00270
00271 queue_lock(queue);
00272 if (queue->len <= 1) {
00273 queue_unlock(queue);
00274 return NULL;
00275 }
00276 ret = queue->tab[1]->item;
00277 gw_free(queue->tab[1]);
00278 queue->tab[1] = queue->tab[--queue->len];
00279 downheap(queue, 1);
00280 queue_unlock(queue);
00281
00282 return ret;
00283 }
00284
00285
00286 void *gw_prioqueue_get(gw_prioqueue_t *queue)
00287 {
00288 void *ret;
00289
00290 gw_assert(queue != NULL);
00291
00292 queue_lock(queue);
00293 if (queue->len > 1)
00294 ret = queue->tab[1]->item;
00295 else
00296 ret = NULL;
00297 queue_unlock(queue);
00298
00299 return ret;
00300 }
00301
00302
00303 void *gw_prioqueue_consume(gw_prioqueue_t *queue)
00304 {
00305 void *ret;
00306
00307 gw_assert(queue != NULL);
00308
00309 queue_lock(queue);
00310 while (queue->len == 1 && queue->producers > 0) {
00311 queue->mutex->owner = -1;
00312 pthread_cond_wait(&queue->nonempty, &queue->mutex->mutex);
00313 queue->mutex->owner = gwthread_self();
00314 }
00315 if (queue->len > 1) {
00316 ret = queue->tab[1]->item;
00317 gw_free(queue->tab[1]);
00318 queue->tab[1] = queue->tab[--queue->len];
00319 downheap(queue, 1);
00320 } else {
00321 ret = NULL;
00322 }
00323 queue_unlock(queue);
00324
00325 return ret;
00326 }
00327
00328
00329 void gw_prioqueue_add_producer(gw_prioqueue_t *queue)
00330 {
00331 gw_assert(queue != NULL);
00332
00333 queue_lock(queue);
00334 queue->producers++;
00335 queue_unlock(queue);
00336 }
00337
00338
00339 void gw_prioqueue_remove_producer(gw_prioqueue_t *queue)
00340 {
00341 gw_assert(queue != NULL);
00342
00343 queue_lock(queue);
00344 gw_assert(queue->producers > 0);
00345 queue->producers--;
00346 pthread_cond_broadcast(&queue->nonempty);
00347 queue_unlock(queue);
00348 }
00349
00350
00351 long gw_prioqueue_producer_count(gw_prioqueue_t *queue)
00352 {
00353 long ret;
00354
00355 gw_assert(queue != NULL);
00356
00357 queue_lock(queue);
00358 ret = queue->producers;
00359 queue_unlock(queue);
00360
00361 return ret;
00362 }
00363
See file LICENSE for details about the license agreement for using,
modifying, copying or deriving work from this software.