mmlib/mmat: replace some variables by literal constants
[mmondor.git] / mmsoftware / mmlib / mm_pth_pool.c
1 /* $Id: mm_pth_pool.c,v 1.11 2007/12/05 23:47:56 mmondor Exp $ */
2
3 /*
4 * Copyright (C) 2004, Matthew Mondor
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. All advertising materials mentioning features or use of this software
16 * must display the following acknowledgement:
17 * This product includes software developed by Matthew Mondor.
18 * 4. The name of Matthew Mondor may not be used to endorse or promote
19 * products derived from this software without specific prior written
20 * permission.
21 * 5. Redistribution of source code may not be released under the terms of
22 * any GNU Public License derivate.
23 *
24 * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
25 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
26 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
27 * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
28 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
30 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
33 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 */
35
36
37
38 /* This system allows very efficient thread allocation and freeing, using the
39 * mmpool(3) allocator, with its optional constructor/destructor support.
40 * These are only freed back when statistics determine that extra chunks of
41 * pre-allocated objects are definitely not needed any time soon. This means
42 * that most of the time, threads are already launched and waiting when we
43 * need them. We then simply need to allocate one of the pool and order it to
44 * perform the wanted tasks. Once the thread is done executing it, it can
45 * return in waiting mode again in the pool once freed.
46 *
47 * Since it is possible for the destructor function to be called when freeing
48 * a thread object at pool_free(), we are using a special reaper thread, which
49 * receives messages in a queue about which threads to pool_free().
50 * Actually, we do not currently use the reaper anymore, since it appears to
51 * be working all right without it. The object destroy function only queues a
52 * message for the thread, so it's safe without one. And it's faster this way.
53 * (there is some overhead in the Pth(3) library mutex and message passing
54 * implementation when multiple threads exist, probably because of the single
55 * select(2) based scheduler, or because the waiting queue processing needs a
56 * better algorithm perhaps).
57 *
58 * If a function needs to be cancellable, it should listen over the incoming
59 * message port of its object for messages. This port is provided at
60 * thread_object_call(). Of course, the application can have the thread
61 * allocate a new message port and it's possible to use a custom message
62 * format, other than that provided.
63 *
64 * It is the responsibility of the functions called by the threads to release
65 * the resources they allocate before returning to prevent memory leaks. They
66 * also are responsible for freeing the messages they receive on their port if
67 * they are allocated by the sender which cannot have them on the stack.
68 */
69
70
71
72 /* HEADERFILES */
73
74 #include <stdbool.h>
75 #include <stdlib.h>
76
77 #include <mm_pth_pool.h>
78
79
80
81 MMCOPYRIGHT("@(#) Copyright (c) 2004\n\
82 \tMatthew Mondor. All rights reserved.\n");
83 MMRCSID("$Id: mm_pth_pool.c,v 1.11 2007/12/05 23:47:56 mmondor Exp $");
84
85
86
87 /* STATIC FUNCTIONS PROTOTYPES */
88
89 inline static struct thread_object * thread_object_alloc(void);
90 inline static void thread_object_free(
91 struct thread_object *);
92 static bool thread_object_constructor(pnode_t *);
93 static void thread_object_destructor(pnode_t *);
94 static void * thread_object_main(void *);
95 /*static void * thread_object_reaper(void *);*/
96
97
98
99 /* GLOBALS */
100
101 static pth_attr_t thread_object_attr;
102 /*static pth_msgport_t thread_object_reaper_port;*/
103 static pool_t thread_object_pool;
104 static pool_t thread_object_message_pool;
105 static pth_mutex_t thread_object_pool_mutex;
106 static pth_mutex_t thread_object_message_pool_mutex;
107
108
109
110 /* EXPORTED PUBLIC FUNCTIONS */
111
112 /* Call to initialize sybsystem before any other calls to this library */
113 int thread_object_init(void)
114 {
115 /* Create attributes which will be used for threads of the pool */
116 thread_object_attr = pth_attr_new();
117 (void) pth_attr_set(thread_object_attr, PTH_ATTR_JOINABLE, TRUE);
118
119 /* Create threads and messages pool_t. We first launch 8 threads here,
120 * more will be created as needed, in bunches of 8. Using too many
121 * actually degrades performance using pth(3), unfortunately.
122 * Threads will only be killed when statistics show that they have not be
123 * needed for some time. A minimum if 8 threads will always remain.
124 * We currently do not set any maximum limit. This means that your
125 * application is responsible for concurrency sanity checking before
126 * invoking thread_object_call() (I.E. number of maximum concurrently
127 * served clients at a time, etc).
128 */
129 if (!pool_init(&thread_object_pool, "thread_object_pool", malloc, free,
130 thread_object_constructor, thread_object_destructor,
131 sizeof(struct thread_object), 8, 1, 0))
132 goto err;
133 if (!pool_init(&thread_object_message_pool, "thread_object_message_pool",
134 malloc, free, NULL, NULL,
135 sizeof(struct thread_object_message),
136 32768 / sizeof(struct thread_object_message), 1, 0))
137 goto err;
138
139 /* Initialize our mutexes */
140 (void) pth_mutex_init(&thread_object_pool_mutex);
141 (void) pth_mutex_init(&thread_object_message_pool_mutex);
142
143 /* Launch our reaper thread. We never kill this thread afterwards. */
144 /*
145 if (pth_spawn(thread_object_attr, thread_object_reaper, NULL) == NULL)
146 goto err;
147 */
148
149 return 0;
150
151 err:
152 if (POOL_VALID(&thread_object_message_pool))
153 pool_destroy(&thread_object_message_pool);
154 if (POOL_VALID(&thread_object_pool))
155 pool_destroy(&thread_object_pool);
156
157 return -1;
158 }
159
160 struct thread_object_message *thread_object_message_alloc(void)
161 {
162 struct thread_object_message *msg;
163
164 (void) pth_mutex_acquire(&thread_object_message_pool_mutex, FALSE, NULL);
165 msg = (struct thread_object_message *)
166 pool_alloc(&thread_object_message_pool, FALSE);
167 (void) pth_mutex_release(&thread_object_message_pool_mutex);
168
169 return msg;
170 }
171
172 void thread_object_message_free(struct thread_object_message *msg)
173 {
174 (void) pth_mutex_acquire(&thread_object_message_pool_mutex, FALSE, NULL);
175 (void) pool_free((pnode_t *)msg);
176 (void) pth_mutex_release(&thread_object_message_pool_mutex);
177 }
178
179 /* Assigns a thread of the pool to execute a function. If port is not NULL,
180 * it is set to the port of the assigned thread, so that it may be used to
181 * send messages to the thread. This may be wanted for various reasons,
182 * including to be able to abort the function and cause the thread to
183 * be freed back to the pool. Returns 0 on success, or -1 on error, in which
184 * case a resource shortage occured.
185 */
186 int thread_object_call(pth_msgport_t *port,
187 void (*function)(struct thread_object *, void *), void *args)
188 {
189 struct thread_object *obj;
190 struct thread_object_message *msg;
191 int ret = -1;
192
193 if (function != NULL) {
194 /* Allocate thread from pool, order it to call our function, using a
195 * message. This message cannot be on the stack.
196 */
197 if ((obj = thread_object_alloc()) != NULL) {
198 if ((msg = thread_object_message_alloc()) != NULL) {
199 msg->command = TOMC_CALL;
200 msg->u.call.function = function;
201 msg->u.call.arguments = args;
202 (void) pth_msgport_put(obj->port, &msg->message);
203 /* If caller wants port of the thread, supply it */
204 if (port != NULL)
205 *port = obj->port;
206 /* Success */
207 ret = 0;
208 }
209 }
210 }
211
212 return ret;
213 }
214
215
216
217 /* INTERNAL STATIC FUNCTIONS */
218
219 inline static struct thread_object *thread_object_alloc(void)
220 {
221 struct thread_object *obj;
222
223 (void) pth_mutex_acquire(&thread_object_pool_mutex, FALSE, NULL);
224 obj = (struct thread_object *)pool_alloc(&thread_object_pool, FALSE);
225 (void) pth_mutex_release(&thread_object_pool_mutex);
226
227 return obj;
228 }
229
230 inline static void thread_object_free(struct thread_object *obj)
231 {
232 (void) pth_mutex_acquire(&thread_object_pool_mutex, FALSE, NULL);
233 (void) pool_free((pnode_t *)obj);
234 (void) pth_mutex_release(&thread_object_pool_mutex);
235 }
236
237 /* Internally called to create a thread object */
238 static bool thread_object_constructor(pnode_t *pnode)
239 {
240 struct thread_object *obj = (struct thread_object *)pnode;
241
242 /* Note that we leave thread_object_main() initialize the port field */
243 if ((obj->thread = pth_spawn(thread_object_attr, thread_object_main, obj))
244 == NULL)
245 return FALSE;
246
247 return TRUE;
248 }
249
250 /* Internally called to destroy a thread object */
251 static void thread_object_destructor(pnode_t *pnode)
252 {
253 struct thread_object *obj = (struct thread_object *)pnode;
254 struct thread_object_message *msg;
255
256 /* To be freed, the thread has to be terminated. We thus send it a quit
257 * message and then wait for it to exit using pth_join(). Note that we
258 * let the thread destroy the port field. Although we theoretically could
259 * use a message on the stack here, let's be safe.
260 */
261 if ((msg = thread_object_message_alloc()) != NULL) {
262 msg->command = TOMC_QUIT;
263 (void) pth_msgport_put(obj->port, &msg->message);
264 }
265 (void) pth_join(obj->thread, NULL);
266 }
267
268 /* Actual thread's main loop. We create a message port and listen for command
269 * messages (quit and call). When we obtain a quit request, we destroy the
270 * port and exit cleanly. The quit event can never occur during the execution
271 * of a call command, since it is only called on already freed thread nodes.
272 */
273 static void *thread_object_main(void *args)
274 {
275 struct thread_object *obj = args;
276 pth_msgport_t port;
277 pth_event_t ring;
278 pth_message_t *imsg;
279 struct thread_object_message *msg;
280
281 /* Create message port and associated ring */
282 port = pth_msgport_create("XXX(NULL)");
283 ring = pth_event(PTH_EVENT_MSG, port);
284
285 /* Advertize our port address */
286 obj->port = port;
287
288 for (;;) {
289 /* Wait for messages to be available */
290 if (pth_wait(ring) > 0) {
291 while ((imsg = pth_msgport_get(port)) != NULL) {
292 /* Process message and free it */
293 msg = (struct thread_object_message *)
294 (&((pnode_t *)imsg)[-1]);
295
296 if (msg->command == TOMC_QUIT) {
297 /* We are ordered to exit by the object destructor */
298 thread_object_message_free(msg);
299 goto end;
300 }
301 if (msg->command == TOMC_CALL &&
302 msg->u.call.function != NULL) {
303 /* We are ordered to execute a function */
304 msg->u.call.function(obj, msg->u.call.arguments);
305 thread_object_message_free(msg);
306 /* We now should free us back to the pool. Note that this
307 * can cause the destructor function to be called, which
308 * would have the effect of queueing a TOMC_QUIT message
309 * to our port. This isn't a problem, since the destructor
310 * function does not cause us to be destroyed, it will
311 * wait until we exit (see goto end).
312 */
313 thread_object_free(obj);
314 }
315 }
316 /* Tell the reaper to free this thread node back to the
317 * pool. We can't use a message on the stack for this.
318 * We cannot pool_free() us ourselves, since it could cause
319 * us to be destroyed while we are running.
320 */
321 /*
322 if ((msg = thread_object_message_alloc()) != NULL) {
323 msg->command = TOMC_QUIT;
324 msg->u.quit = obj;
325 (void) pth_msgport_put(thread_object_reaper_port,
326 &msg->message);
327 }
328 */
329 }
330 }
331
332 end:
333 /* If we reach this, we need to cleanly exit. This only occurs when our
334 * object destructor destroys us, because we were freed for some time
335 * already. Free all messages pending, if any, destroy our port and exit.
336 * The destructor is waiting for us at pth_join().
337 */
338 while ((imsg = pth_msgport_get(port)) != NULL) {
339 msg = (struct thread_object_message *)(&((pnode_t *)imsg)[-1]);
340 thread_object_message_free(msg);
341 }
342 pth_event_free(ring, PTH_FREE_ALL);
343 pth_msgport_destroy(port);
344 pth_exit(NULL);
345
346 /* NOTREACHED */
347 return NULL;
348 }
349
350 /* This consists of the reaper thread. It receives messages of
351 * thread_object_main() based threads before they exit. This allows us to
352 * automatically restore threads to their pool after they exit. This provides
353 * the caller of thread_object_call() with an API for detached asynchronous
354 * threads which auto-cleanup on exit.
355 */
356 /* ARGSUSED */
357 /*
358 static void *thread_object_reaper(void *args)
359 {
360 pth_msgport_t port;
361 pth_event_t ring;
362 pth_message_t *imsg;
363 struct thread_object_message *msg;
364
365 port = pth_msgport_create("XXX(NULL)");
366 ring = pth_event(PTH_EVENT_MSG, port);
367
368 thread_object_reaper_port = port;
369
370 for (;;) {
371 if (pth_wait(ring) > 0) {
372 while ((imsg = pth_msgport_get(port)) != NULL) {
373 msg = (struct thread_object_message *)
374 (&((pnode_t *)imsg)[-1]);
375 if (msg->command == TOMC_QUIT && msg->u.quit != NULL)
376 thread_object_free(msg->u.quit);
377 thread_object_message_free(msg);
378 }
379 }
380 }
381
382 pth_exit(NULL);
383 }
384 */