blob: 74db3fb4e0ce4e350fe6a939ccfc2d75e492ce00 [file] [log] [blame]
Dees_Troy51a0e822012-09-05 15:24:24 -04001/* yarn.c -- generic thread operations implemented using pthread functions
Dees_Troy3bde1232012-09-22 08:10:28 -04002 * Copyright (C) 2008, 2012 Mark Adler
3 * Version 1.3 13 Jan 2012 Mark Adler
Dees_Troy51a0e822012-09-05 15:24:24 -04004 * For conditions of distribution and use, see copyright notice in yarn.h
5 */
6
7/* Basic thread operations implemented using the POSIX pthread library. All
8 pthread references are isolated within this module to allow alternate
9 implementations with other thread libraries. See yarn.h for the description
10 of these operations. */
11
12/* Version history:
13 1.0 19 Oct 2008 First version
14 1.1 26 Oct 2008 No need to set the stack size -- remove
15 Add yarn_abort() function for clean-up on error exit
Dees_Troy3bde1232012-09-22 08:10:28 -040016 1.2 19 Dec 2011 (changes reversed in 1.3)
17 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c
18 Update thread portability #defines per IEEE 1003.1-2008
19 Fix documentation in yarn.h for yarn_prefix
Dees_Troy51a0e822012-09-05 15:24:24 -040020 */
21
22/* for thread portability */
Dees_Troy3bde1232012-09-22 08:10:28 -040023#define _XOPEN_SOURCE 700
24#define _POSIX_C_SOURCE 200809L
25#define _THREAD_SAFE
26
27/* use large file functions if available */
28#define _FILE_OFFSET_BITS 64
Dees_Troy51a0e822012-09-05 15:24:24 -040029
30/* external libraries and entities referenced */
31#include <stdio.h> /* fprintf(), stderr */
32#include <stdlib.h> /* exit(), malloc(), free(), NULL */
33#include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */
Ethan Yonkerc798c9c2015-10-09 11:15:26 -050034#include <signal.h> /* sigaction, SIGUSR1 */
Dees_Troy51a0e822012-09-05 15:24:24 -040035 /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
36 PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
37 pthread_self(), pthread_equal(),
38 pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
39 pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
40 pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
41 pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
42#include <errno.h> /* ENOMEM, EAGAIN, EINVAL */
Ethan Yonkerc798c9c2015-10-09 11:15:26 -050043#include <string.h> /* memset */
Dees_Troy51a0e822012-09-05 15:24:24 -040044
45/* interface definition */
46#include "yarn.h"
47
48/* constants */
49#define local static /* for non-exported functions and globals */
50
51/* error handling external globals, resettable by application */
52char *yarn_prefix = "yarn";
53void (*yarn_abort)(int) = NULL;
54
Ethan Yonkerc798c9c2015-10-09 11:15:26 -050055void thread_exit_handler(int sig)
56{
57 printf("this signal is %d \n", sig);
58 pthread_exit(0);
59}
Dees_Troy51a0e822012-09-05 15:24:24 -040060
61/* immediately exit -- use for errors that shouldn't ever happen */
62local void fail(int err)
63{
64 fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
65 err == ENOMEM ? "out of memory" : "internal pthread error", err);
66 if (yarn_abort != NULL)
67 yarn_abort(err);
68 exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
69}
70
71/* memory handling routines provided by user -- if none are provided, malloc()
72 and free() are used, which are therefore assumed to be thread-safe */
73typedef void *(*malloc_t)(size_t);
74typedef void (*free_t)(void *);
75local malloc_t my_malloc_f = malloc;
76local free_t my_free = free;
77
78/* use user-supplied allocation routines instead of malloc() and free() */
79void yarn_mem(malloc_t lease, free_t vacate)
80{
81 my_malloc_f = lease;
82 my_free = vacate;
83}
84
85/* memory allocation that cannot fail (from the point of view of the caller) */
86local void *my_malloc(size_t size)
87{
88 void *block;
89
90 if ((block = my_malloc_f(size)) == NULL)
91 fail(ENOMEM);
92 return block;
93}
94
95/* -- lock functions -- */
96
97struct lock_s {
98 pthread_mutex_t mutex;
99 pthread_cond_t cond;
100 long value;
101};
102
103lock *new_lock(long initial)
104{
105 int ret;
106 lock *bolt;
107
108 bolt = my_malloc(sizeof(struct lock_s));
109 if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
110 (ret = pthread_cond_init(&(bolt->cond), NULL)))
111 fail(ret);
112 bolt->value = initial;
113 return bolt;
114}
115
116void possess(lock *bolt)
117{
118 int ret;
119
120 if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
121 fail(ret);
122}
123
124void release(lock *bolt)
125{
126 int ret;
127
128 if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
129 fail(ret);
130}
131
132void twist(lock *bolt, enum twist_op op, long val)
133{
134 int ret;
135
136 if (op == TO)
137 bolt->value = val;
138 else if (op == BY)
139 bolt->value += val;
140 if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
141 (ret = pthread_mutex_unlock(&(bolt->mutex))))
142 fail(ret);
143}
144
145#define until(a) while(!(a))
146
147void wait_for(lock *bolt, enum wait_op op, long val)
148{
149 int ret;
150
151 switch (op) {
152 case TO_BE:
153 until (bolt->value == val)
154 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
155 fail(ret);
156 break;
157 case NOT_TO_BE:
158 until (bolt->value != val)
159 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
160 fail(ret);
161 break;
162 case TO_BE_MORE_THAN:
163 until (bolt->value > val)
164 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
165 fail(ret);
166 break;
167 case TO_BE_LESS_THAN:
168 until (bolt->value < val)
169 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
170 fail(ret);
171 }
172}
173
174long peek_lock(lock *bolt)
175{
176 return bolt->value;
177}
178
179void free_lock(lock *bolt)
180{
181 int ret;
182 if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
183 (ret = pthread_mutex_destroy(&(bolt->mutex))))
184 fail(ret);
185 my_free(bolt);
186}
187
188/* -- thread functions (uses lock functions above) -- */
189
190struct thread_s {
191 pthread_t id;
192 int done; /* true if this thread has exited */
193 thread *next; /* for list of all launched threads */
194};
195
196/* list of threads launched but not joined, count of threads exited but not
197 joined (incremented by ignition() just before exiting) */
198local lock threads_lock = {
199 PTHREAD_MUTEX_INITIALIZER,
200 PTHREAD_COND_INITIALIZER,
201 0 /* number of threads exited but not joined */
202};
203local thread *threads = NULL; /* list of extant threads */
204
205/* structure in which to pass the probe and its payload to ignition() */
206struct capsule {
207 void (*probe)(void *);
208 void *payload;
209};
210
211/* mark the calling thread as done and alert join_all() */
212local void reenter(void *dummy)
213{
214 thread *match, **prior;
215 pthread_t me;
216
Dees_Troy3bde1232012-09-22 08:10:28 -0400217 (void)dummy;
218
Dees_Troy51a0e822012-09-05 15:24:24 -0400219 /* find this thread in the threads list by matching the thread id */
220 me = pthread_self();
221 possess(&(threads_lock));
222 prior = &(threads);
223 while ((match = *prior) != NULL) {
224 if (pthread_equal(match->id, me))
225 break;
226 prior = &(match->next);
227 }
228 if (match == NULL)
229 fail(EINVAL);
230
231 /* mark this thread as done and move it to the head of the list */
232 match->done = 1;
233 if (threads != match) {
234 *prior = match->next;
235 match->next = threads;
236 threads = match;
237 }
238
239 /* update the count of threads to be joined and alert join_all() */
240 twist(&(threads_lock), BY, +1);
241}
242
243/* all threads go through this routine so that just before the thread exits,
244 it marks itself as done in the threads list and alerts join_all() so that
245 the thread resources can be released -- use cleanup stack so that the
246 marking occurs even if the thread is cancelled */
247local void *ignition(void *arg)
248{
249 struct capsule *capsule = arg;
250
251 /* run reenter() before leaving */
252 pthread_cleanup_push(reenter, NULL);
253
254 /* execute the requested function with argument */
255 capsule->probe(capsule->payload);
256 my_free(capsule);
257
258 /* mark this thread as done and let join_all() know */
259 pthread_cleanup_pop(1);
260
261 /* exit thread */
262 return NULL;
263}
264
265/* not all POSIX implementations create threads as joinable by default, so that
266 is made explicit here */
267thread *launch(void (*probe)(void *), void *payload)
268{
269 int ret;
270 thread *th;
271 struct capsule *capsule;
272 pthread_attr_t attr;
Ethan Yonkerc798c9c2015-10-09 11:15:26 -0500273 struct sigaction actions;
274
275 memset(&actions, 0, sizeof(actions));
276 sigemptyset(&actions.sa_mask);
277 actions.sa_flags = 0;
278 actions.sa_handler = thread_exit_handler;
279 ret = sigaction(SIGUSR1,&actions,NULL);
Dees_Troy51a0e822012-09-05 15:24:24 -0400280
281 /* construct the requested call and argument for the ignition() routine
282 (allocated instead of automatic so that we're sure this will still be
283 there when ignition() actually starts up -- ignition() will free this
284 allocation) */
285 capsule = my_malloc(sizeof(struct capsule));
286 capsule->probe = probe;
287 capsule->payload = payload;
288
289 /* assure this thread is in the list before join_all() or ignition() looks
290 for it */
291 possess(&(threads_lock));
292
293 /* create the thread and call ignition() from that thread */
294 th = my_malloc(sizeof(struct thread_s));
295 if ((ret = pthread_attr_init(&attr)) ||
296 (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
297 (ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
298 (ret = pthread_attr_destroy(&attr)))
299 fail(ret);
300
301 /* put the thread in the threads list for join_all() */
302 th->done = 0;
303 th->next = threads;
304 threads = th;
305 release(&(threads_lock));
306 return th;
307}
308
309void join(thread *ally)
310{
311 int ret;
312 thread *match, **prior;
313
314 /* wait for thread to exit and return its resources */
315 if ((ret = pthread_join(ally->id, NULL)) != 0)
316 fail(ret);
317
318 /* find the thread in the threads list */
319 possess(&(threads_lock));
320 prior = &(threads);
321 while ((match = *prior) != NULL) {
322 if (match == ally)
323 break;
324 prior = &(match->next);
325 }
326 if (match == NULL)
327 fail(EINVAL);
328
329 /* remove thread from list and update exited count, free thread */
330 if (match->done)
331 threads_lock.value--;
332 *prior = match->next;
333 release(&(threads_lock));
334 my_free(ally);
335}
336
337/* This implementation of join_all() only attempts to join threads that have
338 announced that they have exited (see ignition()). When there are many
339 threads, this is faster than waiting for some random thread to exit while a
340 bunch of other threads have already exited. */
341int join_all(void)
342{
343 int ret, count;
344 thread *match, **prior;
345
346 /* grab the threads list and initialize the joined count */
347 count = 0;
348 possess(&(threads_lock));
349
350 /* do until threads list is empty */
351 while (threads != NULL) {
352 /* wait until at least one thread has reentered */
353 wait_for(&(threads_lock), NOT_TO_BE, 0);
354
355 /* find the first thread marked done (should be at or near the top) */
356 prior = &(threads);
357 while ((match = *prior) != NULL) {
358 if (match->done)
359 break;
360 prior = &(match->next);
361 }
362 if (match == NULL)
363 fail(EINVAL);
364
365 /* join the thread (will be almost immediate), remove from the threads
366 list, update the reenter count, and free the thread */
367 if ((ret = pthread_join(match->id, NULL)) != 0)
368 fail(ret);
369 threads_lock.value--;
370 *prior = match->next;
371 my_free(match);
372 count++;
373 }
374
375 /* let go of the threads list and return the number of threads joined */
376 release(&(threads_lock));
377 return count;
378}
379
380/* cancel and join the thread -- the thread will cancel when it gets to a file
381 operation, a sleep or pause, or a condition wait */
382void destruct(thread *off_course)
383{
384 int ret;
385
Ethan Yonkerc798c9c2015-10-09 11:15:26 -0500386 if ((ret = pthread_kill(off_course->id, SIGUSR1)) != 0)
Dees_Troy51a0e822012-09-05 15:24:24 -0400387 fail(ret);
388 join(off_course);
389}