Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 1 | /* yarn.c -- generic thread operations implemented using pthread functions |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame] | 2 | * Copyright (C) 2008, 2012 Mark Adler |
| 3 | * Version 1.3 13 Jan 2012 Mark Adler |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 4 | * For conditions of distribution and use, see copyright notice in yarn.h |
| 5 | */ |
| 6 | |
| 7 | /* Basic thread operations implemented using the POSIX pthread library. All |
| 8 | pthread references are isolated within this module to allow alternate |
| 9 | implementations with other thread libraries. See yarn.h for the description |
| 10 | of these operations. */ |
| 11 | |
| 12 | /* Version history: |
| 13 | 1.0 19 Oct 2008 First version |
| 14 | 1.1 26 Oct 2008 No need to set the stack size -- remove |
| 15 | Add yarn_abort() function for clean-up on error exit |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame] | 16 | 1.2 19 Dec 2011 (changes reversed in 1.3) |
| 17 | 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c |
| 18 | Update thread portability #defines per IEEE 1003.1-2008 |
| 19 | Fix documentation in yarn.h for yarn_prefix |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 20 | */ |
| 21 | |
| 22 | /* for thread portability */ |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame] | 23 | #define _XOPEN_SOURCE 700 |
| 24 | #define _POSIX_C_SOURCE 200809L |
| 25 | #define _THREAD_SAFE |
| 26 | |
| 27 | /* use large file functions if available */ |
| 28 | #define _FILE_OFFSET_BITS 64 |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 29 | |
| 30 | /* external libraries and entities referenced */ |
| 31 | #include <stdio.h> /* fprintf(), stderr */ |
| 32 | #include <stdlib.h> /* exit(), malloc(), free(), NULL */ |
| 33 | #include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */ |
Ethan Yonker | c798c9c | 2015-10-09 11:15:26 -0500 | [diff] [blame] | 34 | #include <signal.h> /* sigaction, SIGUSR1 */ |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 35 | /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(), |
| 36 | PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(), |
| 37 | pthread_self(), pthread_equal(), |
| 38 | pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(), |
| 39 | pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(), |
| 40 | pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(), |
| 41 | pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */ |
| 42 | #include <errno.h> /* ENOMEM, EAGAIN, EINVAL */ |
Ethan Yonker | c798c9c | 2015-10-09 11:15:26 -0500 | [diff] [blame] | 43 | #include <string.h> /* memset */ |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 44 | |
| 45 | /* interface definition */ |
| 46 | #include "yarn.h" |
| 47 | |
| 48 | /* constants */ |
| 49 | #define local static /* for non-exported functions and globals */ |
| 50 | |
/* globals used by fail() when reporting an error; the application may assign
   to either one -- yarn_prefix is printed at the start of the message, and
   yarn_abort, when not NULL, is called with the error code before exit() */
char *yarn_prefix = "yarn";
void (*yarn_abort)(int) = NULL;
| 54 | |
/* SIGUSR1 handler installed by launch(): ends the thread that receives the
   signal by calling pthread_exit().  destruct() delivers the signal with
   pthread_kill() to stop a thread.

   Nothing is printed here.  The handler can interrupt the thread at an
   arbitrary point, and the stdio functions are not async-signal-safe, so a
   printf() in this handler could deadlock on a stream lock held by the
   interrupted code and would also inject text into standard output. */
void thread_exit_handler(int sig)
{
    (void)sig;              /* the signal number is not needed */
    pthread_exit(0);
}
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 60 | |
| 61 | /* immediately exit -- use for errors that shouldn't ever happen */ |
| 62 | local void fail(int err) |
| 63 | { |
| 64 | fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix, |
| 65 | err == ENOMEM ? "out of memory" : "internal pthread error", err); |
| 66 | if (yarn_abort != NULL) |
| 67 | yarn_abort(err); |
| 68 | exit(err == ENOMEM || err == EAGAIN ? err : EINVAL); |
| 69 | } |
| 70 | |
| 71 | /* memory handling routines provided by user -- if none are provided, malloc() |
| 72 | and free() are used, which are therefore assumed to be thread-safe */ |
| 73 | typedef void *(*malloc_t)(size_t); |
| 74 | typedef void (*free_t)(void *); |
| 75 | local malloc_t my_malloc_f = malloc; |
| 76 | local free_t my_free = free; |
| 77 | |
| 78 | /* use user-supplied allocation routines instead of malloc() and free() */ |
| 79 | void yarn_mem(malloc_t lease, free_t vacate) |
| 80 | { |
| 81 | my_malloc_f = lease; |
| 82 | my_free = vacate; |
| 83 | } |
| 84 | |
| 85 | /* memory allocation that cannot fail (from the point of view of the caller) */ |
| 86 | local void *my_malloc(size_t size) |
| 87 | { |
| 88 | void *block; |
| 89 | |
| 90 | if ((block = my_malloc_f(size)) == NULL) |
| 91 | fail(ENOMEM); |
| 92 | return block; |
| 93 | } |
| 94 | |
| 95 | /* -- lock functions -- */ |
| 96 | |
/* a lock is a mutex, a condition variable, and a long value -- the member
   order must not change, since threads_lock is set up with a positional
   initializer */
struct lock_s {
    pthread_mutex_t mutex;      /* locked by possess(), unlocked by release()
                                   and twist() */
    pthread_cond_t cond;        /* broadcast by twist(), waited on by
                                   wait_for() */
    long value;                 /* set or adjusted by twist(), tested by
                                   wait_for(), read by peek_lock() */
};
| 102 | |
| 103 | lock *new_lock(long initial) |
| 104 | { |
| 105 | int ret; |
| 106 | lock *bolt; |
| 107 | |
| 108 | bolt = my_malloc(sizeof(struct lock_s)); |
| 109 | if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) || |
| 110 | (ret = pthread_cond_init(&(bolt->cond), NULL))) |
| 111 | fail(ret); |
| 112 | bolt->value = initial; |
| 113 | return bolt; |
| 114 | } |
| 115 | |
| 116 | void possess(lock *bolt) |
| 117 | { |
| 118 | int ret; |
| 119 | |
| 120 | if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0) |
| 121 | fail(ret); |
| 122 | } |
| 123 | |
| 124 | void release(lock *bolt) |
| 125 | { |
| 126 | int ret; |
| 127 | |
| 128 | if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0) |
| 129 | fail(ret); |
| 130 | } |
| 131 | |
| 132 | void twist(lock *bolt, enum twist_op op, long val) |
| 133 | { |
| 134 | int ret; |
| 135 | |
| 136 | if (op == TO) |
| 137 | bolt->value = val; |
| 138 | else if (op == BY) |
| 139 | bolt->value += val; |
| 140 | if ((ret = pthread_cond_broadcast(&(bolt->cond))) || |
| 141 | (ret = pthread_mutex_unlock(&(bolt->mutex)))) |
| 142 | fail(ret); |
| 143 | } |
| 144 | |
| 145 | #define until(a) while(!(a)) |
| 146 | |
| 147 | void wait_for(lock *bolt, enum wait_op op, long val) |
| 148 | { |
| 149 | int ret; |
| 150 | |
| 151 | switch (op) { |
| 152 | case TO_BE: |
| 153 | until (bolt->value == val) |
| 154 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 155 | fail(ret); |
| 156 | break; |
| 157 | case NOT_TO_BE: |
| 158 | until (bolt->value != val) |
| 159 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 160 | fail(ret); |
| 161 | break; |
| 162 | case TO_BE_MORE_THAN: |
| 163 | until (bolt->value > val) |
| 164 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 165 | fail(ret); |
| 166 | break; |
| 167 | case TO_BE_LESS_THAN: |
| 168 | until (bolt->value < val) |
| 169 | if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) |
| 170 | fail(ret); |
| 171 | } |
| 172 | } |
| 173 | |
| 174 | long peek_lock(lock *bolt) |
| 175 | { |
| 176 | return bolt->value; |
| 177 | } |
| 178 | |
| 179 | void free_lock(lock *bolt) |
| 180 | { |
| 181 | int ret; |
| 182 | if ((ret = pthread_cond_destroy(&(bolt->cond))) || |
| 183 | (ret = pthread_mutex_destroy(&(bolt->mutex)))) |
| 184 | fail(ret); |
| 185 | my_free(bolt); |
| 186 | } |
| 187 | |
| 188 | /* -- thread functions (uses lock functions above) -- */ |
| 189 | |
| 190 | struct thread_s { |
| 191 | pthread_t id; |
| 192 | int done; /* true if this thread has exited */ |
| 193 | thread *next; /* for list of all launched threads */ |
| 194 | }; |
| 195 | |
| 196 | /* list of threads launched but not joined, count of threads exited but not |
| 197 | joined (incremented by ignition() just before exiting) */ |
| 198 | local lock threads_lock = { |
| 199 | PTHREAD_MUTEX_INITIALIZER, |
| 200 | PTHREAD_COND_INITIALIZER, |
| 201 | 0 /* number of threads exited but not joined */ |
| 202 | }; |
| 203 | local thread *threads = NULL; /* list of extant threads */ |
| 204 | |
/* what launch() hands to ignition(): the function to run in the new thread
   and the single argument to call it with */
struct capsule {
    void (*probe)(void *);      /* function to execute */
    void *payload;              /* argument passed to probe */
};
| 210 | |
| 211 | /* mark the calling thread as done and alert join_all() */ |
| 212 | local void reenter(void *dummy) |
| 213 | { |
| 214 | thread *match, **prior; |
| 215 | pthread_t me; |
| 216 | |
Dees_Troy | 3bde123 | 2012-09-22 08:10:28 -0400 | [diff] [blame] | 217 | (void)dummy; |
| 218 | |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 219 | /* find this thread in the threads list by matching the thread id */ |
| 220 | me = pthread_self(); |
| 221 | possess(&(threads_lock)); |
| 222 | prior = &(threads); |
| 223 | while ((match = *prior) != NULL) { |
| 224 | if (pthread_equal(match->id, me)) |
| 225 | break; |
| 226 | prior = &(match->next); |
| 227 | } |
| 228 | if (match == NULL) |
| 229 | fail(EINVAL); |
| 230 | |
| 231 | /* mark this thread as done and move it to the head of the list */ |
| 232 | match->done = 1; |
| 233 | if (threads != match) { |
| 234 | *prior = match->next; |
| 235 | match->next = threads; |
| 236 | threads = match; |
| 237 | } |
| 238 | |
| 239 | /* update the count of threads to be joined and alert join_all() */ |
| 240 | twist(&(threads_lock), BY, +1); |
| 241 | } |
| 242 | |
| 243 | /* all threads go through this routine so that just before the thread exits, |
| 244 | it marks itself as done in the threads list and alerts join_all() so that |
| 245 | the thread resources can be released -- use cleanup stack so that the |
| 246 | marking occurs even if the thread is cancelled */ |
| 247 | local void *ignition(void *arg) |
| 248 | { |
| 249 | struct capsule *capsule = arg; |
| 250 | |
| 251 | /* run reenter() before leaving */ |
| 252 | pthread_cleanup_push(reenter, NULL); |
| 253 | |
| 254 | /* execute the requested function with argument */ |
| 255 | capsule->probe(capsule->payload); |
| 256 | my_free(capsule); |
| 257 | |
| 258 | /* mark this thread as done and let join_all() know */ |
| 259 | pthread_cleanup_pop(1); |
| 260 | |
| 261 | /* exit thread */ |
| 262 | return NULL; |
| 263 | } |
| 264 | |
| 265 | /* not all POSIX implementations create threads as joinable by default, so that |
| 266 | is made explicit here */ |
| 267 | thread *launch(void (*probe)(void *), void *payload) |
| 268 | { |
| 269 | int ret; |
| 270 | thread *th; |
| 271 | struct capsule *capsule; |
| 272 | pthread_attr_t attr; |
Ethan Yonker | c798c9c | 2015-10-09 11:15:26 -0500 | [diff] [blame] | 273 | struct sigaction actions; |
| 274 | |
| 275 | memset(&actions, 0, sizeof(actions)); |
| 276 | sigemptyset(&actions.sa_mask); |
| 277 | actions.sa_flags = 0; |
| 278 | actions.sa_handler = thread_exit_handler; |
| 279 | ret = sigaction(SIGUSR1,&actions,NULL); |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 280 | |
| 281 | /* construct the requested call and argument for the ignition() routine |
| 282 | (allocated instead of automatic so that we're sure this will still be |
| 283 | there when ignition() actually starts up -- ignition() will free this |
| 284 | allocation) */ |
| 285 | capsule = my_malloc(sizeof(struct capsule)); |
| 286 | capsule->probe = probe; |
| 287 | capsule->payload = payload; |
| 288 | |
| 289 | /* assure this thread is in the list before join_all() or ignition() looks |
| 290 | for it */ |
| 291 | possess(&(threads_lock)); |
| 292 | |
| 293 | /* create the thread and call ignition() from that thread */ |
| 294 | th = my_malloc(sizeof(struct thread_s)); |
| 295 | if ((ret = pthread_attr_init(&attr)) || |
| 296 | (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) || |
| 297 | (ret = pthread_create(&(th->id), &attr, ignition, capsule)) || |
| 298 | (ret = pthread_attr_destroy(&attr))) |
| 299 | fail(ret); |
| 300 | |
| 301 | /* put the thread in the threads list for join_all() */ |
| 302 | th->done = 0; |
| 303 | th->next = threads; |
| 304 | threads = th; |
| 305 | release(&(threads_lock)); |
| 306 | return th; |
| 307 | } |
| 308 | |
| 309 | void join(thread *ally) |
| 310 | { |
| 311 | int ret; |
| 312 | thread *match, **prior; |
| 313 | |
| 314 | /* wait for thread to exit and return its resources */ |
| 315 | if ((ret = pthread_join(ally->id, NULL)) != 0) |
| 316 | fail(ret); |
| 317 | |
| 318 | /* find the thread in the threads list */ |
| 319 | possess(&(threads_lock)); |
| 320 | prior = &(threads); |
| 321 | while ((match = *prior) != NULL) { |
| 322 | if (match == ally) |
| 323 | break; |
| 324 | prior = &(match->next); |
| 325 | } |
| 326 | if (match == NULL) |
| 327 | fail(EINVAL); |
| 328 | |
| 329 | /* remove thread from list and update exited count, free thread */ |
| 330 | if (match->done) |
| 331 | threads_lock.value--; |
| 332 | *prior = match->next; |
| 333 | release(&(threads_lock)); |
| 334 | my_free(ally); |
| 335 | } |
| 336 | |
| 337 | /* This implementation of join_all() only attempts to join threads that have |
| 338 | announced that they have exited (see ignition()). When there are many |
| 339 | threads, this is faster than waiting for some random thread to exit while a |
| 340 | bunch of other threads have already exited. */ |
| 341 | int join_all(void) |
| 342 | { |
| 343 | int ret, count; |
| 344 | thread *match, **prior; |
| 345 | |
| 346 | /* grab the threads list and initialize the joined count */ |
| 347 | count = 0; |
| 348 | possess(&(threads_lock)); |
| 349 | |
| 350 | /* do until threads list is empty */ |
| 351 | while (threads != NULL) { |
| 352 | /* wait until at least one thread has reentered */ |
| 353 | wait_for(&(threads_lock), NOT_TO_BE, 0); |
| 354 | |
| 355 | /* find the first thread marked done (should be at or near the top) */ |
| 356 | prior = &(threads); |
| 357 | while ((match = *prior) != NULL) { |
| 358 | if (match->done) |
| 359 | break; |
| 360 | prior = &(match->next); |
| 361 | } |
| 362 | if (match == NULL) |
| 363 | fail(EINVAL); |
| 364 | |
| 365 | /* join the thread (will be almost immediate), remove from the threads |
| 366 | list, update the reenter count, and free the thread */ |
| 367 | if ((ret = pthread_join(match->id, NULL)) != 0) |
| 368 | fail(ret); |
| 369 | threads_lock.value--; |
| 370 | *prior = match->next; |
| 371 | my_free(match); |
| 372 | count++; |
| 373 | } |
| 374 | |
| 375 | /* let go of the threads list and return the number of threads joined */ |
| 376 | release(&(threads_lock)); |
| 377 | return count; |
| 378 | } |
| 379 | |
| 380 | /* cancel and join the thread -- the thread will cancel when it gets to a file |
| 381 | operation, a sleep or pause, or a condition wait */ |
| 382 | void destruct(thread *off_course) |
| 383 | { |
| 384 | int ret; |
| 385 | |
Ethan Yonker | c798c9c | 2015-10-09 11:15:26 -0500 | [diff] [blame] | 386 | if ((ret = pthread_kill(off_course->id, SIGUSR1)) != 0) |
Dees_Troy | 51a0e82 | 2012-09-05 15:24:24 -0400 | [diff] [blame] | 387 | fail(ret); |
| 388 | join(off_course); |
| 389 | } |