Importing in main tree this library which used to be in tests tree
[mmondor.git] / mmsoftware / pthread_util / mm_pthread_pool.c
CommitLineData
c6268fe8
MM
1/* $Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
2
3/*
4 * Copyright (C) 2004-2005, Matthew Mondor
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. All advertising materials mentioning features or use of this software
16 * must display the following acknowledgement:
17 * This product includes software developed by Matthew Mondor.
18 * 4. The name of Matthew Mondor may not be used to endorse or promote
19 * products derived from this software without specific prior written
20 * permission.
21 * 5. Redistribution of source code may not be released under the terms of
22 * any GNU Public License derivate.
23 *
24 * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
25 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
26 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
27 * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
28 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
30 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
33 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
/*
 * Implementation of a pool of ready threads which adapts to concurrency
 * needs.  These ready threads serve requests passed to them through
 * efficient inter-thread messaging.  mmpool(3) is used for the pool
 * functionality.
 */
41
42
43
44#include <pthread.h>
45#include <stdlib.h>
46#include <errno.h>
47
48#include <mm_pthread_debug.h>
49#include <mm_pthread_pool.h>
50
51
52
53MMCOPYRIGHT("@(#) Copyright (c) 2004-2005\n\
54\tMatthew Mondor. All rights reserved.\n");
55MMRCSID("$Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
56
57
58
59/*
60 * STATIC FUNCTIONS PROTOTYPES
61 */
62
63inline static pthread_object_t *thread_object_alloc(void);
64inline static void thread_object_free(pthread_object_t *);
65static bool thread_object_constructor(pnode_t *);
66static void thread_object_destructor(pnode_t *);
67static void *thread_object_main(void *);
68
69
70
71/*
72 * GLOBALS
73 */
74
75static bool thread_object_initialized = FALSE;
76static pthread_attr_t thread_object_attr;
77static pool_t thread_object_pool;
78static pool_t thread_object_msg_pool;
79static pthread_mutex_t thread_object_pool_mutex =
80 PTHREAD_MUTEX_INITIALIZER;
81static pthread_mutex_t thread_object_msg_pool_mutex =
82 PTHREAD_MUTEX_INITIALIZER;
83static pthread_ring_t thread_started_ring;
84
85
86
87/*
88 * EXPORTED PUBLIC FUNCTIONS
89 */
90
91/*
92 * Must be called to initialize the pthreads pool subsystem, before calling
93 * any other function of this API. Returns 0 on success, or an error number.
94 * <initial> threads are launched, and more will be launched in increments
95 * of <initial> whenever necessary. These will also only be destroyed in
96 * decrements of <initial> whenever that many threads have not been in use for
97 * some time, and a minimum of <initial> threads will always be kept.
98 * Setting <initial> to high values may actually degrade performance with some
99 * unefficient threading implementations. It is not recommended to use more
100 * than 8 using the pth(3) library. Using NetBSD 2.0+ SA threads, a high
101 * number does not reduce performance. We current do not observe any limit
102 * whatsoever according to the number of threads launched over time. It is the
103 * application's responsibility to ensure to observe decent concurrency limits
104 * before calling pthread_object_call().
105 */
106int
107pthread_object_init(int initial)
108{
109 int error = 0;
110
111 DEBUG_PTHREAD_ENTRY();
112
113 if (thread_object_initialized) {
114 error = EINVAL;
115 goto err;
116 }
117
118 /*
119 * Create attributes which will be used for threads of the pool.
120 * We want them to be joinable.
121 */
122 if ((error = pthread_attr_init(&thread_object_attr)) != 0)
123 goto err;
124 if ((error = pthread_attr_setdetachstate(&thread_object_attr, 0))
125 != 0)
126 goto err;
127
128 /*
129 * We use this ring to obtain notification of ready children when
130 * launching them. This is required for proper synchronization to
131 * avoid aweful race conditions.
132 */
133 if ((error = pthread_ring_init(&thread_started_ring)) != 0)
134 goto err;
135
136 /*
137 * First initialize the message subsystem pool
138 */
139 if (!pool_init(&thread_object_msg_pool, "thread_object_msg_pool",
140 malloc, free, NULL, NULL, sizeof(pthread_object_msg_t),
141 32768 / sizeof(pthread_object_msg_t), 1, 0)) {
142 error = ENOMEM;
143 goto err;
144 }
145
146 /*
147 * Now initialize the threads pool. This creates threads, uses
148 * synchronization with thread_started_ring, and uses the message
149 * subsystem, which all must be initialized and ready.
150 */
151 if (!pool_init(&thread_object_pool, "thread_object_pool",
152 malloc, free, thread_object_constructor, thread_object_destructor,
153 sizeof(pthread_object_t), initial, 1, 0)) {
154 error = ENOMEM;
155 goto err;
156 }
157
158 thread_object_initialized = TRUE;
159
160 DEBUG_PTHREAD_EXIT();
161 return 0;
162
163err:
164 if (POOL_VALID(&thread_object_msg_pool))
165 pool_destroy(&thread_object_msg_pool);
166 if (POOL_VALID(&thread_object_pool))
167 pool_destroy(&thread_object_pool);
168
169 DEBUG_PTHREAD_EXIT();
170 return error;
171}
172
173/*
174 * Allows allocation/creation of a message suitable for asynchronous requests
175 * with the threads via their main message port provided by this system.
176 * Returns new message, or NULL on error.
177 */
178inline pthread_object_msg_t *
179pthread_object_msg_alloc(void)
180{
181 pthread_object_msg_t *msg = NULL;
182
183 DEBUG_PTHREAD_ENTRY();
184
185 if (pthread_mutex_lock(&thread_object_msg_pool_mutex) != 0)
186 goto err;
187 msg = (pthread_object_msg_t *)pool_alloc(&thread_object_msg_pool,
188 FALSE);
189 (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
190
191 (void) pthread_msg_init(&msg->message, NULL);
192
193err:
194 DEBUG_PTHREAD_EXIT();
195 return msg;
196}
197
198/*
199 * Permits to free/destroy a message which was allocated using
200 * pthread_object_msg_alloc() and sent asynchroneously.
201 */
202inline int
203pthread_object_msg_free(pthread_object_msg_t *msg)
204{
205 int error = 0;
206
207 DEBUG_PTHREAD_ENTRY();
208
209 (void) pthread_msg_destroy(&msg->message);
210
211 if ((error = pthread_mutex_lock(&thread_object_msg_pool_mutex)) != 0)
212 goto err;
213 (void) pool_free((pnode_t *)msg);
214 (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
215
216err:
217 DEBUG_PTHREAD_EXIT();
218 return error;
219}
220
221/*
222 * Allows to invoke a thread of the pool to perform execution of the wanted
223 * function. This is very efficient since the threads are already created and
224 * are waiting for requests. There is no maximum concurrency limit enforced by
225 * this system; It is the responsibility of the application to restrict
226 * concurrency as necessary by keeping internal information on the current
227 * number of requests. 0 is returned on success, or an error number.
228 * XXX Add support for synchroneous and asynchroneous operation. Current
229 * operation is only asynchroneous, but we would like to add a boolean here to
230 * decide. We also could add back the result value of the thread function
231 * which would only be useful in synchroneous operation, when we are waiting
232 * until the task ends... Of course, it's still easy for applications to use
233 * these in a synchroneous manner, by using a message and/or ring,
234 * conditionnal variable, etc.
235 * Also evaluate if a callback function to be called to notify end of
236 * asynchroneous operation would be useful.
237 */
238int
239pthread_object_call(pthread_port_t **port,
240 void (*function)(pthread_object_t *, void *), void *args)
241{
242 pthread_object_t *obj = NULL;
243 pthread_object_msg_t *msg = NULL;
244 int error;
245
246 DEBUG_PTHREAD_ENTRY();
247
248 if (function == NULL) {
249 error = EINVAL;
250 goto err;
251 }
252
253 /*
254 * Allocate a thread from the pool to reserve it, and tell it to call
255 * a function via a message. The message cannot be on the stack in
256 * this case, since it holds arguments to be passed to a thread, and
257 * also consists of an asynchroneous message for wich we do not expect
258 * a response back, waiting for it. We just dispatch it and go on.
259 */
260 if ((obj = thread_object_alloc()) == NULL) {
261 error = ENOMEM;
262 goto err;
263 }
264 if ((msg = pthread_object_msg_alloc()) == NULL) {
265 error = ENOMEM;
266 goto err;
267 }
268
269 msg->command = PTHREAD_OBJ_CALL;
270 msg->u.call.function = function;
271 msg->u.call.arguments = args;
272 if ((error = pthread_msg_put(obj->port, &msg->message)) != 0)
273 goto err;
274
275 /*
276 * Everything successful;
277 * If caller wants the message port of the thread, supply it
278 */
279 if (port != NULL)
280 *port = obj->port;
281
282 DEBUG_PTHREAD_EXIT();
283 return 0;
284
285err:
286 if (msg != NULL)
287 pthread_object_msg_free(msg);
288 if (obj != NULL)
289 thread_object_free(obj);
290
291 DEBUG_PTHREAD_EXIT();
292 return error;
293}
294
295
296
297/*
298 * INTERNAL STATIC FUNCTIONS
299 */
300
301/*
302 * Internally used to allocate a ready thread from the pool.
303 */
304inline static pthread_object_t *
305thread_object_alloc(void)
306{
307 pthread_object_t *obj = NULL;
308
309 DEBUG_PTHREAD_ENTRY();
310
311 if (pthread_mutex_lock(&thread_object_pool_mutex) != 0)
312 goto err;
313 obj = (pthread_object_t *)pool_alloc(&thread_object_pool, FALSE);
314 (void) pthread_mutex_unlock(&thread_object_pool_mutex);
315
316err:
317 return obj;
318}
319
320/*
321 * Internally used to free a no longer needed thread back to the pool of ready
322 * threads.
323 */
324inline static void
325thread_object_free(pthread_object_t *obj)
326{
327
328 DEBUG_PTHREAD_ENTRY();
329
330 if (pthread_mutex_lock(&thread_object_pool_mutex) == 0) {
331 (void) pool_free((pnode_t *)obj);
332 (void) pthread_mutex_unlock(&thread_object_pool_mutex);
333 }
334
335 DEBUG_PTHREAD_EXIT();
336}
337
338/*
339 * Internally called by mmpool(3) to create a thread object.
340 */
341static bool
342thread_object_constructor(pnode_t *pnode)
343{
344 pthread_object_t *obj = (pthread_object_t *)pnode;
345 int success = TRUE;
346
347 DEBUG_PTHREAD_ENTRY();
348
349 /*
350 * Note that we leave thread_object_main() initialize the port field
351 * when it creates its port and ring.
352 */
353 if (pthread_create(&obj->thread, &thread_object_attr,
354 thread_object_main, obj) != 0) {
355 success = FALSE;
356 goto err;
357 }
358
359 /*
360 * Wait until new thread ready notification. Without this, at least
361 * with NetBSD 2.0 SA threads, hell would break loose. Thread creation
362 * isn't really a bottleneck in our case anyways, since we only need
363 * to do it when all threads of the pool are already busy.
364 */
365 (void) pthread_ring_wait(&thread_started_ring, NULL);
366
367err:
368 DEBUG_PTHREAD_EXIT();
369 return success;
370}
371
372/*
373 * Internally called by mmpool(3) to destroy a thread object.
374 */
375static void
376thread_object_destructor(pnode_t *pnode)
377{
378 pthread_object_t *obj = (pthread_object_t *)pnode;
379 pthread_object_msg_t *msg;
380
381 DEBUG_PTHREAD_ENTRY();
382
383 /*
384 * To be freed, the thread has to be terminated. We thus send it a
385 * quit message and then wait for it to exit using pthread_join().
386 * Note that we let the thread destroy the port field. Although we
387 * theoretically could use a message on the stack here, let's be safe.
388 * Thread destruction is only performed rarely anyways, so this isn't
389 * a performance problem.
390 */
391 if ((msg = pthread_object_msg_alloc()) != NULL) {
392 msg->command = PTHREAD_OBJ_QUIT;
393 (void) pthread_msg_put(obj->port, &msg->message);
394 }
395 (void) pthread_join(obj->thread, NULL);
396
397 DEBUG_PTHREAD_EXIT();
398}
399
400/*
401 * Actual thread's main loop. We create a message port and listen for command
402 * messages (quit and call). When we obtain a quit request, we destroy the
403 * port and exit cleanly. The quit event can never occur during the execution
404 * of a call command, since it is only called on already freed thread nodes
405 * (by mmpool(3) pool_free()). It is advized to applications which need to
406 * obtain and use the port of the thread after thread_object_call() to only
407 * send proper user messages, not system reserved ones.
408 */
409static void *
410thread_object_main(void *args)
411{
412 pthread_object_t *obj = (pthread_object_t *)args;
413 pthread_port_t port;
414 pthread_ring_t ring;
415 pthread_msg_t *imsg;
416 pthread_object_msg_t *msg;
417
418 DEBUG_PTHREAD_ENTRY();
419
420 /*
421 * Create our incomming message port as well as its corresponding
422 * notification ring we can sleep on. Then advertize our port address.
423 * Ideally, we should somehow panic if any of this initialization
424 * fails. XXX
425 */
426 (void) pthread_port_init(&port);
427 (void) pthread_ring_init(&ring);
428 (void) pthread_port_set_ring(&port, &ring);
429 obj->port = &port;
430
431 /*
432 * Notify parent that we are ready, so that it may proceed
433 */
434 (void) pthread_ring_notify(&thread_started_ring);
435
436 /*
437 * Main loop, which keeps executing until we obtain a PTHREAD_OBJ_QUIT
438 * message, at which event we cleanly exit.
439 */
440 for (;;) {
441 /*
442 * Wait for any message(s) to be available, without taking any
443 * CPU time.
444 */
445 (void) pthread_ring_wait(&ring, NULL);
446
447 /*
448 * We were awaken because at least one message is available.
449 * Process all messages in the queue.
450 */
451 while ((imsg = pthread_msg_get(&port)) != NULL) {
452 msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]);
453 if (msg->command == PTHREAD_OBJ_QUIT) {
454 /*
455 * We are ordered to exit by the object
456 * destructor.
457 */
458 pthread_object_msg_free(msg);
459 goto end;
460 }
461 if (msg->command == PTHREAD_OBJ_CALL) {
462 /*
463 * Request to execute a function. This means
464 * that we were allocated/reserved first.
465 */
466 msg->u.call.function(obj,
467 msg->u.call.arguments);
468 pthread_object_msg_free(msg);
469 /*
470 * Free/release us back, so that we be
471 * available again to process further
472 * requests. It is possible that freeing
473 * ourselves cause a PTHREAD_OBJ_QUIT message
474 * to be queued soon on our port by the
475 * destructor function. This is safe, since
476 * the destructor does not cause us to be
477 * destroyed until it waits for us to have
478 * ended cleanly using pthread_join().
479 */
480 thread_object_free(obj);
481 }
482 }
483 }
484
485end:
486 /*
487 * Discard messages that are still queued on our port (if any)
488 */
489 while ((imsg = pthread_msg_get(&port)) != NULL) {
490 msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]);
491 pthread_object_msg_free(msg);
492 }
493 /*
494 * Free our resources and exit.
495 */
496 (void) pthread_port_destroy(&port);
497 (void) pthread_ring_destroy(&ring);
498
499 DEBUG_PTHREAD_EXIT();
500 pthread_exit(NULL);
501
502 /* NOTREACHED */
503 return NULL;
504}