blob: 5e557f3f1b58577b4dea80d4b8dc5736774974f3 [file] [log] [blame]
Dees_Troy51a0e822012-09-05 15:24:24 -04001/* yarn.c -- generic thread operations implemented using pthread functions
Dees_Troy3bde1232012-09-22 08:10:28 -04002 * Copyright (C) 2008, 2012 Mark Adler
3 * Version 1.3 13 Jan 2012 Mark Adler
Dees_Troy51a0e822012-09-05 15:24:24 -04004 * For conditions of distribution and use, see copyright notice in yarn.h
5 */
6
7/* Basic thread operations implemented using the POSIX pthread library. All
8 pthread references are isolated within this module to allow alternate
9 implementations with other thread libraries. See yarn.h for the description
10 of these operations. */
11
12/* Version history:
13 1.0 19 Oct 2008 First version
14 1.1 26 Oct 2008 No need to set the stack size -- remove
15 Add yarn_abort() function for clean-up on error exit
Dees_Troy3bde1232012-09-22 08:10:28 -040016 1.2 19 Dec 2011 (changes reversed in 1.3)
17 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c
18 Update thread portability #defines per IEEE 1003.1-2008
19 Fix documentation in yarn.h for yarn_prefix
Dees_Troy51a0e822012-09-05 15:24:24 -040020 */
21
22/* for thread portability */
Dees_Troy3bde1232012-09-22 08:10:28 -040023#define _XOPEN_SOURCE 700
24#define _POSIX_C_SOURCE 200809L
25#define _THREAD_SAFE
26
27/* use large file functions if available */
28#define _FILE_OFFSET_BITS 64
Dees_Troy51a0e822012-09-05 15:24:24 -040029
30/* external libraries and entities referenced */
31#include <stdio.h> /* fprintf(), stderr */
32#include <stdlib.h> /* exit(), malloc(), free(), NULL */
33#include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */
34 /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
35 PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
36 pthread_self(), pthread_equal(),
37 pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
38 pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
39 pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
40 pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
41#include <errno.h> /* ENOMEM, EAGAIN, EINVAL */
42
43/* interface definition */
44#include "yarn.h"
45
46/* constants */
47#define local static /* for non-exported functions and globals */
48
49/* error handling external globals, resettable by application */
50char *yarn_prefix = "yarn";
51void (*yarn_abort)(int) = NULL;
52
53
54/* immediately exit -- use for errors that shouldn't ever happen */
55local void fail(int err)
56{
57 fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
58 err == ENOMEM ? "out of memory" : "internal pthread error", err);
59 if (yarn_abort != NULL)
60 yarn_abort(err);
61 exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
62}
63
64/* memory handling routines provided by user -- if none are provided, malloc()
65 and free() are used, which are therefore assumed to be thread-safe */
66typedef void *(*malloc_t)(size_t);
67typedef void (*free_t)(void *);
68local malloc_t my_malloc_f = malloc;
69local free_t my_free = free;
70
71/* use user-supplied allocation routines instead of malloc() and free() */
72void yarn_mem(malloc_t lease, free_t vacate)
73{
74 my_malloc_f = lease;
75 my_free = vacate;
76}
77
78/* memory allocation that cannot fail (from the point of view of the caller) */
79local void *my_malloc(size_t size)
80{
81 void *block;
82
83 if ((block = my_malloc_f(size)) == NULL)
84 fail(ENOMEM);
85 return block;
86}
87
88/* -- lock functions -- */
89
90struct lock_s {
91 pthread_mutex_t mutex;
92 pthread_cond_t cond;
93 long value;
94};
95
96lock *new_lock(long initial)
97{
98 int ret;
99 lock *bolt;
100
101 bolt = my_malloc(sizeof(struct lock_s));
102 if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
103 (ret = pthread_cond_init(&(bolt->cond), NULL)))
104 fail(ret);
105 bolt->value = initial;
106 return bolt;
107}
108
109void possess(lock *bolt)
110{
111 int ret;
112
113 if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
114 fail(ret);
115}
116
117void release(lock *bolt)
118{
119 int ret;
120
121 if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
122 fail(ret);
123}
124
125void twist(lock *bolt, enum twist_op op, long val)
126{
127 int ret;
128
129 if (op == TO)
130 bolt->value = val;
131 else if (op == BY)
132 bolt->value += val;
133 if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
134 (ret = pthread_mutex_unlock(&(bolt->mutex))))
135 fail(ret);
136}
137
138#define until(a) while(!(a))
139
140void wait_for(lock *bolt, enum wait_op op, long val)
141{
142 int ret;
143
144 switch (op) {
145 case TO_BE:
146 until (bolt->value == val)
147 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
148 fail(ret);
149 break;
150 case NOT_TO_BE:
151 until (bolt->value != val)
152 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
153 fail(ret);
154 break;
155 case TO_BE_MORE_THAN:
156 until (bolt->value > val)
157 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
158 fail(ret);
159 break;
160 case TO_BE_LESS_THAN:
161 until (bolt->value < val)
162 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
163 fail(ret);
164 }
165}
166
167long peek_lock(lock *bolt)
168{
169 return bolt->value;
170}
171
172void free_lock(lock *bolt)
173{
174 int ret;
175 if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
176 (ret = pthread_mutex_destroy(&(bolt->mutex))))
177 fail(ret);
178 my_free(bolt);
179}
180
181/* -- thread functions (uses lock functions above) -- */
182
183struct thread_s {
184 pthread_t id;
185 int done; /* true if this thread has exited */
186 thread *next; /* for list of all launched threads */
187};
188
189/* list of threads launched but not joined, count of threads exited but not
190 joined (incremented by ignition() just before exiting) */
191local lock threads_lock = {
192 PTHREAD_MUTEX_INITIALIZER,
193 PTHREAD_COND_INITIALIZER,
194 0 /* number of threads exited but not joined */
195};
196local thread *threads = NULL; /* list of extant threads */
197
198/* structure in which to pass the probe and its payload to ignition() */
199struct capsule {
200 void (*probe)(void *);
201 void *payload;
202};
203
204/* mark the calling thread as done and alert join_all() */
205local void reenter(void *dummy)
206{
207 thread *match, **prior;
208 pthread_t me;
209
Dees_Troy3bde1232012-09-22 08:10:28 -0400210 (void)dummy;
211
Dees_Troy51a0e822012-09-05 15:24:24 -0400212 /* find this thread in the threads list by matching the thread id */
213 me = pthread_self();
214 possess(&(threads_lock));
215 prior = &(threads);
216 while ((match = *prior) != NULL) {
217 if (pthread_equal(match->id, me))
218 break;
219 prior = &(match->next);
220 }
221 if (match == NULL)
222 fail(EINVAL);
223
224 /* mark this thread as done and move it to the head of the list */
225 match->done = 1;
226 if (threads != match) {
227 *prior = match->next;
228 match->next = threads;
229 threads = match;
230 }
231
232 /* update the count of threads to be joined and alert join_all() */
233 twist(&(threads_lock), BY, +1);
234}
235
236/* all threads go through this routine so that just before the thread exits,
237 it marks itself as done in the threads list and alerts join_all() so that
238 the thread resources can be released -- use cleanup stack so that the
239 marking occurs even if the thread is cancelled */
240local void *ignition(void *arg)
241{
242 struct capsule *capsule = arg;
243
244 /* run reenter() before leaving */
245 pthread_cleanup_push(reenter, NULL);
246
247 /* execute the requested function with argument */
248 capsule->probe(capsule->payload);
249 my_free(capsule);
250
251 /* mark this thread as done and let join_all() know */
252 pthread_cleanup_pop(1);
253
254 /* exit thread */
255 return NULL;
256}
257
258/* not all POSIX implementations create threads as joinable by default, so that
259 is made explicit here */
260thread *launch(void (*probe)(void *), void *payload)
261{
262 int ret;
263 thread *th;
264 struct capsule *capsule;
265 pthread_attr_t attr;
266
267 /* construct the requested call and argument for the ignition() routine
268 (allocated instead of automatic so that we're sure this will still be
269 there when ignition() actually starts up -- ignition() will free this
270 allocation) */
271 capsule = my_malloc(sizeof(struct capsule));
272 capsule->probe = probe;
273 capsule->payload = payload;
274
275 /* assure this thread is in the list before join_all() or ignition() looks
276 for it */
277 possess(&(threads_lock));
278
279 /* create the thread and call ignition() from that thread */
280 th = my_malloc(sizeof(struct thread_s));
281 if ((ret = pthread_attr_init(&attr)) ||
282 (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
283 (ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
284 (ret = pthread_attr_destroy(&attr)))
285 fail(ret);
286
287 /* put the thread in the threads list for join_all() */
288 th->done = 0;
289 th->next = threads;
290 threads = th;
291 release(&(threads_lock));
292 return th;
293}
294
295void join(thread *ally)
296{
297 int ret;
298 thread *match, **prior;
299
300 /* wait for thread to exit and return its resources */
301 if ((ret = pthread_join(ally->id, NULL)) != 0)
302 fail(ret);
303
304 /* find the thread in the threads list */
305 possess(&(threads_lock));
306 prior = &(threads);
307 while ((match = *prior) != NULL) {
308 if (match == ally)
309 break;
310 prior = &(match->next);
311 }
312 if (match == NULL)
313 fail(EINVAL);
314
315 /* remove thread from list and update exited count, free thread */
316 if (match->done)
317 threads_lock.value--;
318 *prior = match->next;
319 release(&(threads_lock));
320 my_free(ally);
321}
322
323/* This implementation of join_all() only attempts to join threads that have
324 announced that they have exited (see ignition()). When there are many
325 threads, this is faster than waiting for some random thread to exit while a
326 bunch of other threads have already exited. */
327int join_all(void)
328{
329 int ret, count;
330 thread *match, **prior;
331
332 /* grab the threads list and initialize the joined count */
333 count = 0;
334 possess(&(threads_lock));
335
336 /* do until threads list is empty */
337 while (threads != NULL) {
338 /* wait until at least one thread has reentered */
339 wait_for(&(threads_lock), NOT_TO_BE, 0);
340
341 /* find the first thread marked done (should be at or near the top) */
342 prior = &(threads);
343 while ((match = *prior) != NULL) {
344 if (match->done)
345 break;
346 prior = &(match->next);
347 }
348 if (match == NULL)
349 fail(EINVAL);
350
351 /* join the thread (will be almost immediate), remove from the threads
352 list, update the reenter count, and free the thread */
353 if ((ret = pthread_join(match->id, NULL)) != 0)
354 fail(ret);
355 threads_lock.value--;
356 *prior = match->next;
357 my_free(match);
358 count++;
359 }
360
361 /* let go of the threads list and return the number of threads joined */
362 release(&(threads_lock));
363 return count;
364}
365
366/* cancel and join the thread -- the thread will cancel when it gets to a file
367 operation, a sleep or pause, or a condition wait */
368void destruct(thread *off_course)
369{
370 int ret;
371
372 if ((ret = pthread_cancel(off_course->id)) != 0)
373 fail(ret);
374 join(off_course);
375}