mmlib/mmat: replace some variables by literal constants
[mmondor.git] / mmsoftware / pthread_util / mm_pthread_pool.c
CommitLineData
2969b477 1/* $Id: mm_pthread_pool.c,v 1.4 2009/01/17 03:40:54 mmondor Exp $ */
c6268fe8
MM
2
3/*
4 * Copyright (C) 2004-2005, Matthew Mondor
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. All advertising materials mentioning features or use of this software
16 * must display the following acknowledgement:
17 * This product includes software developed by Matthew Mondor.
18 * 4. The name of Matthew Mondor may not be used to endorse or promote
19 * products derived from this software without specific prior written
20 * permission.
21 * 5. Redistribution of source code may not be released under the terms of
22 * any GNU Public License derivate.
23 *
24 * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
25 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
26 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
27 * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
28 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
30 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
33 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36/*
37 * Implementation of a pool of ready threads which adapts with concurrency
38 * needs. These ready threads can serve requests passed through efficient
39 * inter-thread messaging. mmpool(3) is used for the pool functionality.
40 */
41
42
43
44#include <pthread.h>
45#include <stdlib.h>
46#include <errno.h>
47
48#include <mm_pthread_debug.h>
49#include <mm_pthread_pool.h>
50
51
52
53MMCOPYRIGHT("@(#) Copyright (c) 2004-2005\n\
54\tMatthew Mondor. All rights reserved.\n");
2969b477 55MMRCSID("$Id: mm_pthread_pool.c,v 1.4 2009/01/17 03:40:54 mmondor Exp $");
c6268fe8
MM
56
57
58
59/*
60 * STATIC FUNCTIONS PROTOTYPES
61 */
62
63inline static pthread_object_t *thread_object_alloc(void);
64inline static void thread_object_free(pthread_object_t *);
65static bool thread_object_constructor(pnode_t *);
66static void thread_object_destructor(pnode_t *);
67static void *thread_object_main(void *);
68
69
70
71/*
72 * GLOBALS
73 */
74
75static bool thread_object_initialized = FALSE;
76static pthread_attr_t thread_object_attr;
77static pool_t thread_object_pool;
78static pool_t thread_object_msg_pool;
79static pthread_mutex_t thread_object_pool_mutex =
80 PTHREAD_MUTEX_INITIALIZER;
81static pthread_mutex_t thread_object_msg_pool_mutex =
82 PTHREAD_MUTEX_INITIALIZER;
83static pthread_ring_t thread_started_ring;
61277fea
MM
84static void *(*thread_cons_uhook)(void);
85static void (*thread_dest_uhook)(void *);
c6268fe8
MM
86
87
88
89/*
90 * EXPORTED PUBLIC FUNCTIONS
91 */
92
93/*
94 * Must be called to initialize the pthreads pool subsystem, before calling
95 * any other function of this API. Returns 0 on success, or an error number.
96 * <initial> threads are launched, and more will be launched in increments
97 * of <initial> whenever necessary. These will also only be destroyed in
98 * decrements of <initial> whenever that many threads have not been in use for
99 * some time, and a minimum of <initial> threads will always be kept.
100 * Setting <initial> to high values may actually degrade performance with some
101 * unefficient threading implementations. It is not recommended to use more
102 * than 8 using the pth(3) library. Using NetBSD 2.0+ SA threads, a high
103 * number does not reduce performance. We current do not observe any limit
104 * whatsoever according to the number of threads launched over time. It is the
105 * application's responsibility to ensure to observe decent concurrency limits
106 * before calling pthread_object_call().
107 */
108int
61277fea
MM
109pthread_object_init(int initial,
110 void *(*cons_uhook)(void), void (*dest_uhook)(void *))
c6268fe8
MM
111{
112 int error = 0;
113
114 DEBUG_PTHREAD_ENTRY();
115
116 if (thread_object_initialized) {
117 error = EINVAL;
118 goto err;
119 }
120
61277fea
MM
121 /* Optional user thread constructor/destructor hooks */
122 thread_cons_uhook = cons_uhook;
123 thread_dest_uhook = dest_uhook;
124
c6268fe8
MM
125 /*
126 * Create attributes which will be used for threads of the pool.
127 * We want them to be joinable.
128 */
129 if ((error = pthread_attr_init(&thread_object_attr)) != 0)
130 goto err;
2b9051d1
MM
131 if ((error = pthread_attr_setdetachstate(&thread_object_attr,
132 PTHREAD_CREATE_JOINABLE)) != 0)
c6268fe8
MM
133 goto err;
134
135 /*
136 * We use this ring to obtain notification of ready children when
137 * launching them. This is required for proper synchronization to
138 * avoid aweful race conditions.
139 */
140 if ((error = pthread_ring_init(&thread_started_ring)) != 0)
141 goto err;
142
143 /*
144 * First initialize the message subsystem pool
145 */
146 if (!pool_init(&thread_object_msg_pool, "thread_object_msg_pool",
147 malloc, free, NULL, NULL, sizeof(pthread_object_msg_t),
148 32768 / sizeof(pthread_object_msg_t), 1, 0)) {
149 error = ENOMEM;
150 goto err;
151 }
152
153 /*
154 * Now initialize the threads pool. This creates threads, uses
155 * synchronization with thread_started_ring, and uses the message
156 * subsystem, which all must be initialized and ready.
157 */
158 if (!pool_init(&thread_object_pool, "thread_object_pool",
159 malloc, free, thread_object_constructor, thread_object_destructor,
160 sizeof(pthread_object_t), initial, 1, 0)) {
161 error = ENOMEM;
162 goto err;
163 }
164
165 thread_object_initialized = TRUE;
166
167 DEBUG_PTHREAD_EXIT();
168 return 0;
169
170err:
171 if (POOL_VALID(&thread_object_msg_pool))
172 pool_destroy(&thread_object_msg_pool);
173 if (POOL_VALID(&thread_object_pool))
174 pool_destroy(&thread_object_pool);
175
176 DEBUG_PTHREAD_EXIT();
177 return error;
178}
179
180/*
181 * Allows allocation/creation of a message suitable for asynchronous requests
182 * with the threads via their main message port provided by this system.
183 * Returns new message, or NULL on error.
184 */
185inline pthread_object_msg_t *
186pthread_object_msg_alloc(void)
187{
188 pthread_object_msg_t *msg = NULL;
189
190 DEBUG_PTHREAD_ENTRY();
191
192 if (pthread_mutex_lock(&thread_object_msg_pool_mutex) != 0)
193 goto err;
194 msg = (pthread_object_msg_t *)pool_alloc(&thread_object_msg_pool,
195 FALSE);
196 (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
197
198 (void) pthread_msg_init(&msg->message, NULL);
199
200err:
201 DEBUG_PTHREAD_EXIT();
202 return msg;
203}
204
205/*
206 * Permits to free/destroy a message which was allocated using
207 * pthread_object_msg_alloc() and sent asynchroneously.
208 */
209inline int
210pthread_object_msg_free(pthread_object_msg_t *msg)
211{
212 int error = 0;
213
214 DEBUG_PTHREAD_ENTRY();
215
216 (void) pthread_msg_destroy(&msg->message);
217
218 if ((error = pthread_mutex_lock(&thread_object_msg_pool_mutex)) != 0)
219 goto err;
220 (void) pool_free((pnode_t *)msg);
221 (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
222
223err:
224 DEBUG_PTHREAD_EXIT();
225 return error;
226}
227
228/*
229 * Allows to invoke a thread of the pool to perform execution of the wanted
230 * function. This is very efficient since the threads are already created and
231 * are waiting for requests. There is no maximum concurrency limit enforced by
232 * this system; It is the responsibility of the application to restrict
233 * concurrency as necessary by keeping internal information on the current
234 * number of requests. 0 is returned on success, or an error number.
235 * XXX Add support for synchroneous and asynchroneous operation. Current
236 * operation is only asynchroneous, but we would like to add a boolean here to
237 * decide. We also could add back the result value of the thread function
238 * which would only be useful in synchroneous operation, when we are waiting
239 * until the task ends... Of course, it's still easy for applications to use
240 * these in a synchroneous manner, by using a message and/or ring,
241 * conditionnal variable, etc.
242 * Also evaluate if a callback function to be called to notify end of
243 * asynchroneous operation would be useful.
244 */
245int
246pthread_object_call(pthread_port_t **port,
61277fea 247 void (*function)(pthread_object_t *, void *, void *), void *args)
c6268fe8
MM
248{
249 pthread_object_t *obj = NULL;
250 pthread_object_msg_t *msg = NULL;
251 int error;
252
253 DEBUG_PTHREAD_ENTRY();
254
255 if (function == NULL) {
256 error = EINVAL;
257 goto err;
258 }
259
260 /*
261 * Allocate a thread from the pool to reserve it, and tell it to call
262 * a function via a message. The message cannot be on the stack in
263 * this case, since it holds arguments to be passed to a thread, and
264 * also consists of an asynchroneous message for wich we do not expect
265 * a response back, waiting for it. We just dispatch it and go on.
266 */
267 if ((obj = thread_object_alloc()) == NULL) {
268 error = ENOMEM;
269 goto err;
270 }
271 if ((msg = pthread_object_msg_alloc()) == NULL) {
272 error = ENOMEM;
273 goto err;
274 }
275
276 msg->command = PTHREAD_OBJ_CALL;
277 msg->u.call.function = function;
278 msg->u.call.arguments = args;
279 if ((error = pthread_msg_put(obj->port, &msg->message)) != 0)
280 goto err;
281
282 /*
283 * Everything successful;
284 * If caller wants the message port of the thread, supply it
285 */
286 if (port != NULL)
287 *port = obj->port;
288
289 DEBUG_PTHREAD_EXIT();
290 return 0;
291
292err:
293 if (msg != NULL)
294 pthread_object_msg_free(msg);
295 if (obj != NULL)
296 thread_object_free(obj);
297
298 DEBUG_PTHREAD_EXIT();
299 return error;
300}
301
302
303
304/*
305 * INTERNAL STATIC FUNCTIONS
306 */
307
308/*
309 * Internally used to allocate a ready thread from the pool.
310 */
311inline static pthread_object_t *
312thread_object_alloc(void)
313{
314 pthread_object_t *obj = NULL;
315
316 DEBUG_PTHREAD_ENTRY();
317
318 if (pthread_mutex_lock(&thread_object_pool_mutex) != 0)
319 goto err;
320 obj = (pthread_object_t *)pool_alloc(&thread_object_pool, FALSE);
321 (void) pthread_mutex_unlock(&thread_object_pool_mutex);
322
323err:
2969b477 324 DEBUG_PTHREAD_EXIT();
c6268fe8
MM
325 return obj;
326}
327
328/*
329 * Internally used to free a no longer needed thread back to the pool of ready
330 * threads.
331 */
332inline static void
333thread_object_free(pthread_object_t *obj)
334{
335
336 DEBUG_PTHREAD_ENTRY();
337
338 if (pthread_mutex_lock(&thread_object_pool_mutex) == 0) {
339 (void) pool_free((pnode_t *)obj);
340 (void) pthread_mutex_unlock(&thread_object_pool_mutex);
341 }
342
343 DEBUG_PTHREAD_EXIT();
344}
345
346/*
347 * Internally called by mmpool(3) to create a thread object.
348 */
349static bool
350thread_object_constructor(pnode_t *pnode)
351{
352 pthread_object_t *obj = (pthread_object_t *)pnode;
353 int success = TRUE;
354
355 DEBUG_PTHREAD_ENTRY();
356
357 /*
358 * Note that we leave thread_object_main() initialize the port field
359 * when it creates its port and ring.
360 */
361 if (pthread_create(&obj->thread, &thread_object_attr,
362 thread_object_main, obj) != 0) {
363 success = FALSE;
364 goto err;
365 }
366
367 /*
368 * Wait until new thread ready notification. Without this, at least
369 * with NetBSD 2.0 SA threads, hell would break loose. Thread creation
370 * isn't really a bottleneck in our case anyways, since we only need
371 * to do it when all threads of the pool are already busy.
372 */
373 (void) pthread_ring_wait(&thread_started_ring, NULL);
374
375err:
376 DEBUG_PTHREAD_EXIT();
377 return success;
378}
379
380/*
381 * Internally called by mmpool(3) to destroy a thread object.
382 */
383static void
384thread_object_destructor(pnode_t *pnode)
385{
386 pthread_object_t *obj = (pthread_object_t *)pnode;
387 pthread_object_msg_t *msg;
388
389 DEBUG_PTHREAD_ENTRY();
390
391 /*
392 * To be freed, the thread has to be terminated. We thus send it a
393 * quit message and then wait for it to exit using pthread_join().
394 * Note that we let the thread destroy the port field. Although we
395 * theoretically could use a message on the stack here, let's be safe.
396 * Thread destruction is only performed rarely anyways, so this isn't
397 * a performance problem.
398 */
399 if ((msg = pthread_object_msg_alloc()) != NULL) {
400 msg->command = PTHREAD_OBJ_QUIT;
401 (void) pthread_msg_put(obj->port, &msg->message);
402 }
403 (void) pthread_join(obj->thread, NULL);
404
405 DEBUG_PTHREAD_EXIT();
406}
407
408/*
409 * Actual thread's main loop. We create a message port and listen for command
410 * messages (quit and call). When we obtain a quit request, we destroy the
411 * port and exit cleanly. The quit event can never occur during the execution
412 * of a call command, since it is only called on already freed thread nodes
413 * (by mmpool(3) pool_free()). It is advized to applications which need to
414 * obtain and use the port of the thread after thread_object_call() to only
415 * send proper user messages, not system reserved ones.
416 */
417static void *
418thread_object_main(void *args)
419{
420 pthread_object_t *obj = (pthread_object_t *)args;
421 pthread_port_t port;
422 pthread_ring_t ring;
423 pthread_msg_t *imsg;
424 pthread_object_msg_t *msg;
425
426 DEBUG_PTHREAD_ENTRY();
427
428 /*
429 * Create our incomming message port as well as its corresponding
430 * notification ring we can sleep on. Then advertize our port address.
431 * Ideally, we should somehow panic if any of this initialization
432 * fails. XXX
433 */
434 (void) pthread_port_init(&port);
435 (void) pthread_ring_init(&ring);
436 (void) pthread_port_set_ring(&port, &ring);
437 obj->port = &port;
438
61277fea
MM
439 /* Perform thread-specific udata initialization if needed */
440 if (thread_cons_uhook != NULL)
441 obj->udata = thread_cons_uhook();
442 else
443 obj->udata = NULL;
444
c6268fe8
MM
445 /*
446 * Notify parent that we are ready, so that it may proceed
447 */
448 (void) pthread_ring_notify(&thread_started_ring);
449
450 /*
451 * Main loop, which keeps executing until we obtain a PTHREAD_OBJ_QUIT
452 * message, at which event we cleanly exit.
453 */
454 for (;;) {
455 /*
456 * Wait for any message(s) to be available, without taking any
457 * CPU time.
458 */
459 (void) pthread_ring_wait(&ring, NULL);
460
461 /*
462 * We were awaken because at least one message is available.
463 * Process all messages in the queue.
464 */
465 while ((imsg = pthread_msg_get(&port)) != NULL) {
466 msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]);
467 if (msg->command == PTHREAD_OBJ_QUIT) {
468 /*
469 * We are ordered to exit by the object
470 * destructor.
471 */
472 pthread_object_msg_free(msg);
473 goto end;
474 }
475 if (msg->command == PTHREAD_OBJ_CALL) {
476 /*
477 * Request to execute a function. This means
478 * that we were allocated/reserved first.
479 */
480 msg->u.call.function(obj,
61277fea 481 msg->u.call.arguments, obj->udata);
c6268fe8
MM
482 pthread_object_msg_free(msg);
483 /*
484 * Free/release us back, so that we be
485 * available again to process further
486 * requests. It is possible that freeing
487 * ourselves cause a PTHREAD_OBJ_QUIT message
488 * to be queued soon on our port by the
489 * destructor function. This is safe, since
490 * the destructor does not cause us to be
491 * destroyed until it waits for us to have
492 * ended cleanly using pthread_join().
493 */
494 thread_object_free(obj);
495 }
496 }
497 }
498
499end:
500 /*
501 * Discard messages that are still queued on our port (if any)
502 */
503 while ((imsg = pthread_msg_get(&port)) != NULL) {
504 msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]);
505 pthread_object_msg_free(msg);
506 }
507 /*
508 * Free our resources and exit.
509 */
61277fea
MM
510 if (thread_dest_uhook != NULL && obj->udata != NULL)
511 thread_dest_uhook(obj->udata);
c6268fe8
MM
512 (void) pthread_port_destroy(&port);
513 (void) pthread_ring_destroy(&ring);
514
515 DEBUG_PTHREAD_EXIT();
516 pthread_exit(NULL);
517
518 /* NOTREACHED */
519 return NULL;
520}