Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 1 | /* yarn.c -- generic thread operations implemented using pthread functions |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame^] | 2 | * Copyright (C) 2008, 2012 Mark Adler |
| 3 | * Version 1.3 13 Jan 2012 Mark Adler |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 4 | * For conditions of distribution and use, see copyright notice in yarn.h |
| 5 | */ |
| 6 | |
| 7 | /* Basic thread operations implemented using the POSIX pthread library. All |
| 8 | pthread references are isolated within this module to allow alternate |
| 9 | implementations with other thread libraries. See yarn.h for the description |
| 10 | of these operations. */ |
| 11 | |
| 12 | /* Version history: |
| 13 | 1.0 19 Oct 2008 First version |
| 14 | 1.1 26 Oct 2008 No need to set the stack size -- remove |
| 15 | Add yarn_abort() function for clean-up on error exit |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame^] | 16 | 1.2 19 Dec 2011 (changes reversed in 1.3) |
| 17 | 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c |
| 18 | Update thread portability #defines per IEEE 1003.1-2008 |
| 19 | Fix documentation in yarn.h for yarn_prefix |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 20 | */ |
| 21 | |
| 22 | /* for thread portability */ |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame^] | 23 | #define _XOPEN_SOURCE 700 |
| 24 | #define _POSIX_C_SOURCE 200809L |
| 25 | #define _THREAD_SAFE |
| 26 | |
| 27 | /* use large file functions if available */ |
| 28 | #define _FILE_OFFSET_BITS 64 |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 29 | |
| 30 | /* external libraries and entities referenced */ |
| 31 | #include <stdio.h> /* fprintf(), stderr */ |
| 32 | #include <stdlib.h> /* exit(), malloc(), free(), NULL */ |
| 33 | #include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */ |
| 34 | /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(), |
| 35 | PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(), |
| 36 | pthread_self(), pthread_equal(), |
| 37 | pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(), |
| 38 | pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(), |
| 39 | pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(), |
| 40 | pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */ |
| 41 | #include <errno.h> /* ENOMEM, EAGAIN, EINVAL */ |
| 42 | |
| 43 | /* interface definition */ |
| 44 | #include "yarn.h" |
| 45 | |
| 46 | /* constants */ |
| 47 | #define local static /* for non-exported functions and globals */ |
| 48 | |
| 49 | /* error handling external globals, resettable by application */ |
| 50 | char *yarn_prefix = "yarn"; |
| 51 | void (*yarn_abort)(int) = NULL; |
| 52 | |
| 53 | |
| 54 | /* immediately exit -- use for errors that shouldn't ever happen */ |
| 55 | local void fail(int err) |
| 56 | { |
| 57 | fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix, |
| 58 | err == ENOMEM ? "out of memory" : "internal pthread error", err); |
| 59 | if (yarn_abort != NULL) |
| 60 | yarn_abort(err); |
| 61 | exit(err == ENOMEM || err == EAGAIN ? err : EINVAL); |
| 62 | } |
| 63 | |
| 64 | /* memory handling routines provided by user -- if none are provided, malloc() |
| 65 | and free() are used, which are therefore assumed to be thread-safe */ |
| 66 | typedef void *(*malloc_t)(size_t); |
| 67 | typedef void (*free_t)(void *); |
| 68 | local malloc_t my_malloc_f = malloc; |
| 69 | local free_t my_free = free; |
| 70 | |
| 71 | /* use user-supplied allocation routines instead of malloc() and free() */ |
| 72 | void yarn_mem(malloc_t lease, free_t vacate) |
| 73 | { |
| 74 | my_malloc_f = lease; |
| 75 | my_free = vacate; |
| 76 | } |
| 77 | |
| 78 | /* memory allocation that cannot fail (from the point of view of the caller) */ |
| 79 | local void *my_malloc(size_t size) |
| 80 | { |
| 81 | void *block; |
| 82 | |
| 83 | if ((block = my_malloc_f(size)) == NULL) |
| 84 | fail(ENOMEM); |
| 85 | return block; |
| 86 | } |
| 87 | |
| 88 | /* -- lock functions -- */ |
| 89 | |
| 90 | struct lock_s { |
| 91 | pthread_mutex_t mutex; |
| 92 | pthread_cond_t cond; |
| 93 | long value; |
| 94 | }; |
| 95 | |
| 96 | lock *new_lock(long initial) |
| 97 | { |
| 98 | int ret; |
| 99 | lock *bolt; |
| 100 | |
| 101 | bolt = my_malloc(sizeof(struct lock_s)); |
| 102 | if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) || |
| 103 | (ret = pthread_cond_init(&(bolt->cond), NULL))) |
| 104 | fail(ret); |
| 105 | bolt->value = initial; |
| 106 | return bolt; |
| 107 | } |
| 108 | |
| 109 | void possess(lock *bolt) |
| 110 | { |
| 111 | int ret; |
| 112 | |
| 113 | if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0) |
| 114 | fail(ret); |
| 115 | } |
| 116 | |
| 117 | void release(lock *bolt) |
| 118 | { |
| 119 | int ret; |
| 120 | |
| 121 | if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0) |
| 122 | fail(ret); |
| 123 | } |
| 124 | |
| 125 | void twist(lock *bolt, enum twist_op op, long val) |
| 126 | { |
| 127 | int ret; |
| 128 | |
| 129 | if (op == TO) |
| 130 | bolt->value = val; |
| 131 | else if (op == BY) |
| 132 | bolt->value += val; |
| 133 | if ((ret = pthread_cond_broadcast(&(bolt->cond))) || |
| 134 | (ret = pthread_mutex_unlock(&(bolt->mutex)))) |
| 135 | fail(ret); |
| 136 | } |
| 137 | |
| 138 | #define until(a) while(!(a)) |
| 139 | |
| 140 | void wait_for(lock *bolt, enum wait_op op, long val) |
| 141 | { |
| 142 | int ret; |
| 143 | |
| 144 | switch (op) { |
| 145 | case TO_BE: |
| 146 | until (bolt->value == val) |
| 147 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 148 | fail(ret); |
| 149 | break; |
| 150 | case NOT_TO_BE: |
| 151 | until (bolt->value != val) |
| 152 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 153 | fail(ret); |
| 154 | break; |
| 155 | case TO_BE_MORE_THAN: |
| 156 | until (bolt->value > val) |
| 157 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 158 | fail(ret); |
| 159 | break; |
| 160 | case TO_BE_LESS_THAN: |
| 161 | until (bolt->value < val) |
| 162 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 163 | fail(ret); |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | long peek_lock(lock *bolt) |
| 168 | { |
| 169 | return bolt->value; |
| 170 | } |
| 171 | |
| 172 | void free_lock(lock *bolt) |
| 173 | { |
| 174 | int ret; |
| 175 | if ((ret = pthread_cond_destroy(&(bolt->cond))) || |
| 176 | (ret = pthread_mutex_destroy(&(bolt->mutex)))) |
| 177 | fail(ret); |
| 178 | my_free(bolt); |
| 179 | } |
| 180 | |
| 181 | /* -- thread functions (uses lock functions above) -- */ |
| 182 | |
| 183 | struct thread_s { |
| 184 | pthread_t id; |
| 185 | int done; /* true if this thread has exited */ |
| 186 | thread *next; /* for list of all launched threads */ |
| 187 | }; |
| 188 | |
| 189 | /* list of threads launched but not joined, count of threads exited but not |
| 190 | joined (incremented by ignition() just before exiting) */ |
| 191 | local lock threads_lock = { |
| 192 | PTHREAD_MUTEX_INITIALIZER, |
| 193 | PTHREAD_COND_INITIALIZER, |
| 194 | 0 /* number of threads exited but not joined */ |
| 195 | }; |
| 196 | local thread *threads = NULL; /* list of extant threads */ |
| 197 | |
| 198 | /* structure in which to pass the probe and its payload to ignition() */ |
| 199 | struct capsule { |
| 200 | void (*probe)(void *); |
| 201 | void *payload; |
| 202 | }; |
| 203 | |
| 204 | /* mark the calling thread as done and alert join_all() */ |
| 205 | local void reenter(void *dummy) |
| 206 | { |
| 207 | thread *match, **prior; |
| 208 | pthread_t me; |
| 209 | |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame^] | 210 | (void)dummy; |
| 211 | |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 212 | /* find this thread in the threads list by matching the thread id */ |
| 213 | me = pthread_self(); |
| 214 | possess(&(threads_lock)); |
| 215 | prior = &(threads); |
| 216 | while ((match = *prior) != NULL) { |
| 217 | if (pthread_equal(match->id, me)) |
| 218 | break; |
| 219 | prior = &(match->next); |
| 220 | } |
| 221 | if (match == NULL) |
| 222 | fail(EINVAL); |
| 223 | |
| 224 | /* mark this thread as done and move it to the head of the list */ |
| 225 | match->done = 1; |
| 226 | if (threads != match) { |
| 227 | *prior = match->next; |
| 228 | match->next = threads; |
| 229 | threads = match; |
| 230 | } |
| 231 | |
| 232 | /* update the count of threads to be joined and alert join_all() */ |
| 233 | twist(&(threads_lock), BY, +1); |
| 234 | } |
| 235 | |
| 236 | /* all threads go through this routine so that just before the thread exits, |
| 237 | it marks itself as done in the threads list and alerts join_all() so that |
| 238 | the thread resources can be released -- use cleanup stack so that the |
| 239 | marking occurs even if the thread is cancelled */ |
| 240 | local void *ignition(void *arg) |
| 241 | { |
| 242 | struct capsule *capsule = arg; |
| 243 | |
| 244 | /* run reenter() before leaving */ |
| 245 | pthread_cleanup_push(reenter, NULL); |
| 246 | |
| 247 | /* execute the requested function with argument */ |
| 248 | capsule->probe(capsule->payload); |
| 249 | my_free(capsule); |
| 250 | |
| 251 | /* mark this thread as done and let join_all() know */ |
| 252 | pthread_cleanup_pop(1); |
| 253 | |
| 254 | /* exit thread */ |
| 255 | return NULL; |
| 256 | } |
| 257 | |
| 258 | /* not all POSIX implementations create threads as joinable by default, so that |
| 259 | is made explicit here */ |
| 260 | thread *launch(void (*probe)(void *), void *payload) |
| 261 | { |
| 262 | int ret; |
| 263 | thread *th; |
| 264 | struct capsule *capsule; |
| 265 | pthread_attr_t attr; |
| 266 | |
| 267 | /* construct the requested call and argument for the ignition() routine |
| 268 | (allocated instead of automatic so that we're sure this will still be |
| 269 | there when ignition() actually starts up -- ignition() will free this |
| 270 | allocation) */ |
| 271 | capsule = my_malloc(sizeof(struct capsule)); |
| 272 | capsule->probe = probe; |
| 273 | capsule->payload = payload; |
| 274 | |
| 275 | /* assure this thread is in the list before join_all() or ignition() looks |
| 276 | for it */ |
| 277 | possess(&(threads_lock)); |
| 278 | |
| 279 | /* create the thread and call ignition() from that thread */ |
| 280 | th = my_malloc(sizeof(struct thread_s)); |
| 281 | if ((ret = pthread_attr_init(&attr)) || |
| 282 | (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) || |
| 283 | (ret = pthread_create(&(th->id), &attr, ignition, capsule)) || |
| 284 | (ret = pthread_attr_destroy(&attr))) |
| 285 | fail(ret); |
| 286 | |
| 287 | /* put the thread in the threads list for join_all() */ |
| 288 | th->done = 0; |
| 289 | th->next = threads; |
| 290 | threads = th; |
| 291 | release(&(threads_lock)); |
| 292 | return th; |
| 293 | } |
| 294 | |
| 295 | void join(thread *ally) |
| 296 | { |
| 297 | int ret; |
| 298 | thread *match, **prior; |
| 299 | |
| 300 | /* wait for thread to exit and return its resources */ |
| 301 | if ((ret = pthread_join(ally->id, NULL)) != 0) |
| 302 | fail(ret); |
| 303 | |
| 304 | /* find the thread in the threads list */ |
| 305 | possess(&(threads_lock)); |
| 306 | prior = &(threads); |
| 307 | while ((match = *prior) != NULL) { |
| 308 | if (match == ally) |
| 309 | break; |
| 310 | prior = &(match->next); |
| 311 | } |
| 312 | if (match == NULL) |
| 313 | fail(EINVAL); |
| 314 | |
| 315 | /* remove thread from list and update exited count, free thread */ |
| 316 | if (match->done) |
| 317 | threads_lock.value--; |
| 318 | *prior = match->next; |
| 319 | release(&(threads_lock)); |
| 320 | my_free(ally); |
| 321 | } |
| 322 | |
| 323 | /* This implementation of join_all() only attempts to join threads that have |
| 324 | announced that they have exited (see ignition()). When there are many |
| 325 | threads, this is faster than waiting for some random thread to exit while a |
| 326 | bunch of other threads have already exited. */ |
| 327 | int join_all(void) |
| 328 | { |
| 329 | int ret, count; |
| 330 | thread *match, **prior; |
| 331 | |
| 332 | /* grab the threads list and initialize the joined count */ |
| 333 | count = 0; |
| 334 | possess(&(threads_lock)); |
| 335 | |
| 336 | /* do until threads list is empty */ |
| 337 | while (threads != NULL) { |
| 338 | /* wait until at least one thread has reentered */ |
| 339 | wait_for(&(threads_lock), NOT_TO_BE, 0); |
| 340 | |
| 341 | /* find the first thread marked done (should be at or near the top) */ |
| 342 | prior = &(threads); |
| 343 | while ((match = *prior) != NULL) { |
| 344 | if (match->done) |
| 345 | break; |
| 346 | prior = &(match->next); |
| 347 | } |
| 348 | if (match == NULL) |
| 349 | fail(EINVAL); |
| 350 | |
| 351 | /* join the thread (will be almost immediate), remove from the threads |
| 352 | list, update the reenter count, and free the thread */ |
| 353 | if ((ret = pthread_join(match->id, NULL)) != 0) |
| 354 | fail(ret); |
| 355 | threads_lock.value--; |
| 356 | *prior = match->next; |
| 357 | my_free(match); |
| 358 | count++; |
| 359 | } |
| 360 | |
| 361 | /* let go of the threads list and return the number of threads joined */ |
| 362 | release(&(threads_lock)); |
| 363 | return count; |
| 364 | } |
| 365 | |
| 366 | /* cancel and join the thread -- the thread will cancel when it gets to a file |
| 367 | operation, a sleep or pause, or a condition wait */ |
| 368 | void destruct(thread *off_course) |
| 369 | { |
| 370 | int ret; |
| 371 | |
| 372 | if ((ret = pthread_cancel(off_course->id)) != 0) |
| 373 | fail(ret); |
| 374 | join(off_course); |
| 375 | } |