*** empty log message ***
authorMatthew Mondor <mmondor@pulsar-zone.net>
Mon, 12 Sep 2005 12:04:25 +0000 (12:04 +0000)
committerMatthew Mondor <mmondor@pulsar-zone.net>
Mon, 12 Sep 2005 12:04:25 +0000 (12:04 +0000)
mmsoftware/mmlib/mmfd.c
mmsoftware/mmlib/mmserver2.c
mmsoftware/mmlib/mmserver2.h
tests/pthread_utils/mm_pthread_msg.c
tests/pthread_utils/mm_pthread_msg.h
tests/pthread_utils/mm_pthread_poll.c
tests/pthread_utils/mm_pthread_pool.c
tests/pthread_utils/mm_pthread_sleep.c
tests/pthread_utils/msg_test.c

index 2105301..834db4b 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmfd.c,v 1.22 2004/11/12 16:26:01 mmondor Exp $ */
+/* $Id: mmfd.c,v 1.23 2005/09/12 12:04:25 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -59,7 +59,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmfd.c,v 1.22 2004/11/12 16:26:01 mmondor Exp $");
+MMRCSID("$Id: mmfd.c,v 1.23 2005/09/12 12:04:25 mmondor Exp $");
 
 
 
@@ -172,7 +172,7 @@ fdprintf(fdfuncs *fdf, int fd, const char *fmt, ...)
     char buf[1024];
     va_list arg_ptr;
 
-    *buf = 0;
+    *buf = '\0';
     va_start(arg_ptr, fmt);
     vsnprintf(buf, 1023, fmt, arg_ptr);
     va_end(arg_ptr);
index f6914aa..c5d3349 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmserver2.c,v 1.47 2005/06/21 13:48:20 mmondor Exp $ */
+/* $Id: mmserver2.c,v 1.48 2005/09/12 12:04:25 mmondor Exp $ */
 
 /*
  * Copyright (C) 2004, Matthew Mondor
  * - Add some support for request maximum timeout (at least for using_execve
  *   applications, since there is no other way to cause execve(2) requests
  *   to interrupt than to kill the process from another one (the parent could).
- *   This however is tricky. Applications which don't use execve(2) don't need
- *   such a timer as they can set one up themselves. However, if they do use
- *   execve(2), the children processes cannot rely on SIGALRM anymore. They
+ *   This however is tricky.  Applications which don't use execve(2) don't need
+ *   such a timer as they can set one up themselves.  However, if they do use
+ *   execve(2), the children processes cannot rely on SIGALRM anymore.  They
  *   then need another process to interrupt them using SIGTERM if they need
- *   to be interrupted after a timeout. The parent could do it with it's
+ *   to be interrupted after a timeout.  The parent could do it with it's
  *   timer_ctx via mmalarm(3) easily, but because the children are answering
  *   and handling the incomming requests, how can they tell the parent to
  *   start such a timer for them, unless I used a special IPC mechanism.
  *   If they had to launch their own control process using fork(2) it would
- *   slow down the system too much... I could perhaps use some kind of
- *   AF_LOCAL SOCK_DGRAM trick. A child process would simply send a packet
+ *   slow down the system too much...  I could perhaps use some kind of
+ *   AF_LOCAL SOCK_DGRAM trick.  A child process would simply send a packet
  *   with it's pid_t, causing a new timeout to be created in the parent to
- *   kill that pid_t with SIGTERM if it expires. However, we also need to
- *   get rid of any existing entry at the SIGCHLD handler... server_execve()
+ *   kill that pid_t with SIGTERM if it expires.  However, we also need to
+ *   get rid of any existing entry at the SIGCHLD handler...  server_execve()
  *   would be responsible for starting the timer before calling execve(2),
- *   noticing the parent to do so. The SIGCHLD handler in the parent would
+ *   noticing the parent to do so.  The SIGCHLD handler in the parent would
  *   be responsible for stopping the timer if it did not expire already.
  * - Find a way to optimize server_address related macros to use a lookup
  *   table rather than conditionals.
 
 MMCOPYRIGHT("@(#) Copyright (c) 2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmserver2.c,v 1.47 2005/06/21 13:48:20 mmondor Exp $");
+MMRCSID("$Id: mmserver2.c,v 1.48 2005/09/12 12:04:25 mmondor Exp $");
 
 
 
 /* DEFINITIONS */
 
 /*
- * We store a node for each child process with it's current status. Each of
+ * We store a node for each child process with it's current status.  Each of
  * them have the address of their node, so that they efficiently may change
- * their status. We do not need a synchronization lock since every child
+ * their status.  We do not need a synchronization lock since every child
  * process only accesses it's own node, to which it has a pointer, and because
  * only the master parent process adds or removes these nodes.
  */
@@ -263,7 +263,7 @@ struct local {
 /*
  * Internally stored for each socket created with server_socket_bind().
  * Everytime server_socket_bind() is called successfully, we store such a new
- * node in a list_t (parent.sockets_list). When server_start() is called,
+ * node in a list_t (parent.sockets_list).  When server_start() is called,
  * an array of index with corresponding pollfd and int lock filedescriptors
  * is created for efficient internal use in the main server loop.
  */
@@ -282,7 +282,7 @@ struct socket_node {
 /*
  * We store one such node in the socket-specific address cache, used to
  * implement optional rate and/or concurrent number of connections on an
- * address basis. Each socket has an independent cache with it's own
+ * address basis.  Each socket has an independent cache with it's own
  * corresponding synchronization lock.
  */
 struct address_node {
@@ -293,7 +293,7 @@ struct address_node {
        /*
         * The following is used to speed up hostname cache lookups when
         * frequent connections occur from the same address, where the lookup
-        * only has to be done when this entry is added. We store a reference
+        * only has to be done when this entry is added.  We store a reference
         * count in the hostname_node structure so that we know when it can
         * safely be freed.
         */
@@ -409,9 +409,9 @@ server_init(void)
 
 /*
  * Allows the caller to initialize shared memory blocks before starting up the
- * server. The caller is responsible for remembering the pointers to these
+ * server.  The caller is responsible for remembering the pointers to these
  * addresses, as well as to use any necessary synchronization when accessing
- * the shared memory concurrently. This must be done at system initialization,
+ * the shared memory concurrently.  This must be done at system initialization,
  * before launching the server processes, or we return NULL with EPERM.
  */
 void *
@@ -428,8 +428,8 @@ server_shmem_malloc(size_t size)
 
 /*
  * Permits the caller to initialize a pool of shared resources synchronization
- * read-write locks. This must be done before launching the server processes,
- * or we return -1 with EPERM. It can only be called once, make sure to create
+ * read-write locks.  This must be done before launching the server processes,
+ * or we return -1 with EPERM.  It can only be called once, make sure to create
  * the necessary number of locks at once.
  */
 int
@@ -479,8 +479,8 @@ server_rwlocks_init(const char *path, int num, const char *user,
 /*
  * Allows to synchronize wanted shared memory resources, locking or unlocking
  * read-write locks in shared or exclusive mode, with optional nonblocking
- * flag. Locks the process until the lock can be obtained in blocking mode.
- * The id corresponds to a lock number index. Can only be used after starting
+ * flag.  Locks the process until the lock can be obtained in blocking mode.
+ * The id corresponds to a lock number index.  Can only be used after starting
  * the server, and of course only if locks were already setup using
  * server_rwlocks_init(), or we return -1 with EPERM.
  * We return -1 with EINVAL if the supplied lock id is invalid.
@@ -530,7 +530,7 @@ server_rwlock_ctl(int id, int op)
 
 /*
  * Allows to bind to an AF_INET(6) or AF_LOCAL SOCK_DGRAM or SOCK_STREAM
- * listening socket. Can only be called before starting the server or -1 is
+ * listening socket.  Can only be called before starting the server or -1 is
  * returned with errno set to EPERM.
  */
 int
@@ -681,7 +681,7 @@ server_socket_bind(struct server_socket_config *config)
        }
 
        /*
-        * Allocate socket_node and initialize it. We allocate it as shared
+        * Allocate socket_node and initialize it.  We allocate it as shared
         * memory since the hashtable_t and pool_t structures for the address
         * cache need to be shared.
         */
@@ -810,11 +810,11 @@ err:
 }
 
 /*
- * Finally launches the whole thing and start to serve connections. At least
+ * Finally launches the whole thing and start to serve connections.  At least
  * sockets must have been setup using server_socket_bind() before calling this
- * function, or -1 is returned with EPERM. Otherwise this function never
- * returns on success. If this function returns, the application is expected
- * to exit(3). If it doesn't, an attempt is still made to not leak resources.
+ * function, or -1 is returned with EPERM.  Otherwise this function never
+ * returns on success.  If this function returns, the application is expected
+ * to exit(3).  If it doesn't, an attempt is still made to not leak resources.
  */
 int
 server_start(struct server_config *config)
@@ -907,7 +907,7 @@ server_start(struct server_config *config)
 
        /*
         * Initialize necessary structures for our sockets; Our pollfd array
-        * and corresponding socket_node index array. Also create our
+        * and corresponding socket_node index array.  Also create our
         * socket-specific locks and hashtable_t if we must observe any
         * limits.  Note that we still create these locks if a socket needs
         * no limits, but that they will not be used for such sockets.
@@ -931,7 +931,7 @@ server_start(struct server_config *config)
                snode->lock = &slocks[i];
                /*
                 * Here we allocate memory on the normal heap for the packet
-                * buffer which needs to be process-specific. We however tie
+                * buffer which needs to be process-specific.  We however tie
                 * this buffer pointer to the shared structure, which still
                 * shouldn't hurt since the vm space of children processes
                 * should cause the pointer to still point to process-specific
@@ -962,9 +962,9 @@ server_start(struct server_config *config)
        }
 
        /*
-        * Initialize hostname cache. We ensure that this cache be large
+        * Initialize hostname cache.  We ensure that this cache be large
         * enough to be shared among sockets and to accomodate the total of
-        * all their allowed addresses. Of course, we only take in
+        * all their allowed addresses.  Of course, we only take in
         * consideration sockets with hostname resolution enabled.
         * Although this cache may be rather large, mmpool(3) ensures to
         * reuse recently accessed pages, so that BSD should be able to page
@@ -1008,11 +1008,11 @@ server_start(struct server_config *config)
        }
 
        /*
-        * Initialize global process status stuff. Although the allocation
+        * Initialize global process status stuff.  Although the allocation
         * pool_t  for child_process structures is stored in shared memory,
         * the child_pid_status_node one, processes_list and
         * processes_pid_table can all use the standard heap since only the
-        * parent accesses them. We use a list_t for the list which needs to
+        * parent accesses them.  We use a list_t for the list which needs to
         * frequently be iterated through for speed, and a hashtable_t if
         * needed for pid->status lookup table if using_execve is enabled.
         */
@@ -1127,9 +1127,9 @@ server_exit(void)
 
 /*
  * Can only be called by the parent process (typically in the SIGHUP handler
- * function). This causes all current children processes to recycle whenever
- * they complete their current handler. If interrupt is TRUE, causes them to
- * also immediately abort their current handler and then recycle. Returns -1
+ * function).  This causes all current children processes to recycle whenever
+ * they complete their current handler.  If interrupt is TRUE, causes them to
+ * also immediately abort their current handler and then recycle.  Returns -1
  * with EPERM if not called by the parent process.
  */
 int
@@ -1145,7 +1145,7 @@ server_recycle(bool interrupt)
 
        /*
         * Block SIGALRM so that we may not be bothered by the process manager
-        * freeing nodes, and SIGHUP to not recurse. We also block SIGUSR1 so
+        * freeing nodes, and SIGHUP to not recurse.  We also block SIGUSR1 so
         * that the children processes do not cause the process manager to be
         * called during this time.
         */
@@ -1204,7 +1204,7 @@ server_recycle(bool interrupt)
 /*
  * Although when exiting the handler() function the connection is automatically
  * closed if need be (I.E. a SOCK_STREAM socket), it is possible to call this
- * function to perform the same functionality. This also handles the address
+ * function to perform the same functionality.  This also handles the address
  * cache limits update.
  */
 void
@@ -1216,9 +1216,9 @@ server_close(void)
 }
 
 /*
- * A somewhat alarm(3) compatible function using setitimer(2). Suitable to be
+ * A somewhat alarm(3) compatible function using setitimer(2).  Suitable to be
  * used standalone instead of the deprecated alarm(3), or to be internally used
- * with mmalarm(3). Makes sure to not call setitimer(2) when called with 0
+ * with mmalarm(3).  Makes sure to not call setitimer(2) when called with 0
  * seconds to disable any timer if no timer is currently running, as an
  * optimization.
  */
@@ -1246,9 +1246,9 @@ server_alarm(unsigned int seconds)
 
 /*
  * execve(2) replacement performing some library-specific sanity checking
- * and never returning if calling execve(2). We also make sure to tie the
+ * and never returning if calling execve(2).  We also make sure to tie the
  * current client connection to our stdin/stdout filedescriptors, and to
- * leave stderr tied to null(4). We return -1 with EPERM if a SOCK_STREAM
+ * leave stderr tied to null(4).  We return -1 with EPERM if a SOCK_STREAM
  * socket is not being served by a children process.
  * XXX It might be good to append stderr to a file if the application requests
  * it, like in mmspawnd(8).
@@ -1276,7 +1276,7 @@ server_execve(const char *path, char * const *argv, char * const *envp)
        (void) close(local.socket);
 
        /*
-        * Restore default signals actions. If we do not do this and the
+        * Restore default signals actions.  If we do not do this and the
         * current process ignored any signal, they will be ignored after
         * execve(2).  Ones we handle will be automaticaly restored to the
         * default action.
@@ -1620,10 +1620,10 @@ parent_launch(void)
 }
 
 /*
- * Main parent process loop. We ensure to manage the children processes pool,
+ * Main parent process loop.  We ensure to manage the children processes pool,
  * as well as flush expired client addresses entries from each of the socket's
- * caches. We also then flush no longer referenced hostname cached entries from
- * the global hostname resolution cache.
+ * caches.  We also then flush no longer referenced hostname cached entries
+ * from the global hostname resolution cache.
  */
 static void
 parent_main(void)
@@ -1772,8 +1772,8 @@ parent_main(void)
 
 /*
  * Iterate through all nodes of a hashtable_t to reset or expire address
- * and hostname cache nodes. It is called by the above function whenever
- * the soonest to expire node timer expires. This function also leaves
+ * and hostname cache nodes.  It is called by the above function whenever
+ * the soonest to expire node timer expires.  This function also leaves
  * the caller with the next to expire time so that a timer may be restarted.
  */
 static bool
@@ -1784,8 +1784,8 @@ parent_main_address_iterator(hashnode_t *hnode, void *udata)
        time_t                          rem;
 
        /*
-        * If the node expired, reset it. For those which do not, record the
-        * soonest to expire one. For nodes which expired and for which no
+        * If the node expired, reset it.  For those which do not, record the
+        * soonest to expire one.  For nodes which expired and for which no
         * connections exist anymore, expunge them, and update the
         * hostname_node reference count entry, freeing the hostname cache
         * entry if it has no more references.
@@ -1837,7 +1837,7 @@ parent_main_expire(timerid_t tid, void *udata)
 }
 
 /*
- * This function permits to manage the pool of children processes. It is
+ * This function permits to manage the pool of children processes.  It is
  * called every second.
  */
 /* ARGSUSED */
@@ -2135,9 +2135,9 @@ parent_exit(int code)
 
        /*
         * Block SIGALRM, SIGHUP and SIGUSR1 then send SIGTERM to all children
-        * processes which we are allowed to kill. Then make sure that they
+        * processes which we are allowed to kill.  Then make sure that they
         * exit.  We however won't kill processes which are currently serving
-        * requests if exit_interrupt_children is FALSE. We instead will cause
+        * requests if exit_interrupt_children is FALSE.  We instead will cause
         * them to exit after they terminate serving their request.
         */
        (void) sigemptyset(&set);
@@ -2369,7 +2369,7 @@ child_launch(void)
 }
 
 /*
- * The main children process loop. Listen for a client request and process it,
+ * The main children process loop.  Listen for a client request and process it,
  * and then do the same again until we reached our maximum number of requests
  * to process, in which case we exit to be replaced immediately.
  */
@@ -2452,7 +2452,7 @@ child_main(void)
 
                /*
                 * Before releasing the accept lock we must either accept(2)
-                * on the TCP port or recvfrom(2) for UDP. This should
+                * on the TCP port or recvfrom(2) for UDP.  This should
                 * normally not block the process as we just used poll(2).
                 * First set addrl to the proper length for the address family.
                 */
@@ -2593,12 +2593,13 @@ err:
 /*
  * Register or update limits for the specified client address into the current
  * socket's address cache, while also resolving the hostname, using the global
- * hostname cache as necessary. address_close() must be called for this address
- * in order to balance this effect. This also will automatically resolve an
- * address to a hostname if necessary (using the hostname cache for speed).
+ * hostname cache as necessary.  address_close() must be called for this
+ * address in order to balance this effect.  This also will automatically
+ * resolve an address to a hostname if necessary (using the hostname cache for
+ * speed).
  * If there is an address cache size issue and that a new address cannot be
- * added into it, NULL is returned. We always return the address_node pointer
- * otherwise. If returned reason is not REASON_OK, an error occured and the
+ * added into it, NULL is returned.  We always return the address_node pointer
+ * otherwise.  If returned reason is not REASON_OK, an error occured and the
  * rejection handler should be called instead of the request handler.
  */
 static struct address_node *
@@ -2699,7 +2700,7 @@ address_open(struct socket_node *snode, struct server_sockaddr *addr,
                struct hostname_node    *hnode2;
 
                /*
-                * Hostname resolution needed. We ensure to be able for
+                * Hostname resolution needed.  We ensure to be able for
                 * multiple children processes to concurrently perform
                 * hostname resolution, including possibly on the same
                 * address since we cannot perform address node granularity
@@ -2792,7 +2793,7 @@ hcache_err:
 
 /*
  * Update the address concurency counter so that the parent process may
- * garbage collect this address node whenever it expires. We do not care
+ * garbage collect this address node whenever it expires.  We do not care
  * about the hostname_node reference counter here and let the parent process
  * deal with that.
  */
@@ -2835,7 +2836,7 @@ child_sighandler(int sig)
                /* NOTREACHED */
                break;
        case SIGPIPE:
-               /* Connection loss. Interrupt the current connection. */
+               /* Connection loss.  Interrupt the current connection. */
                child_close(TRUE);
                /* NOTREACHED */
                break;
@@ -2861,8 +2862,8 @@ child_sighandler(int sig)
 }
 
 /*
- * Closes connection with client as necessary. If resume is TRUE, uses a
- * non-local jump to resume to accepting other client requests. Otherwise,
+ * Closes connection with client as necessary.  If resume is TRUE, uses a
+ * non-local jump to resume to accepting other client requests.  Otherwise,
  * this function returns.
  */
 static void
index a9bf513..d12d1a7 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmserver2.h,v 1.11 2005/06/21 13:48:20 mmondor Exp $ */
+/* $Id: mmserver2.h,v 1.12 2005/09/12 12:04:25 mmondor Exp $ */
 
 /*
  * Copyright (C) 2004, Matthew Mondor
  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
+/*
+ * Public API provided to applications
+ */
+
 
 
 #ifndef MMLIB_MMSERVER2_H
index 9c4eee8..6dad6fb 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mm_pthread_msg.c,v 1.1 2004/12/27 11:16:15 mmondor Exp $ */
+/* $Id: mm_pthread_msg.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
 
 /*
  * Copyright (C) 2005, Matthew Mondor
 
 /*
  * It is almost a shame that POSIX did not define a standard API for
- * inter-thread asynchroneous and synchroneous messaging. So, here is my
- * implementation. Note that for asynchroneous operation it is recommended to
- * use mmpool(3) to allocate and free messages in an efficient way, in cases
- * where messages will need to be sent to the other end without expecting a
- * response back before the current function ends (in which case a message
- * obviously can't be on the stack).
+ * inter-thread asynchroneous and synchroneous messaging.  So, here is my
+ * implementation.  Note that for asynchroneous operation it is recommended to
+ * use a memory pool such as mmpool(3) to allocate and free messages in an
+ * efficient way, in cases where messages will need to be sent to the other
+ * end without expecting a response back before the current function ends
+ * (in which case a message obviously can't be on the stack).
  */
 
 
 
 MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_msg.c,v 1.1 2004/12/27 11:16:15 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_msg.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
 
 
 
 /*
- * Allows to initialize a polling notification handle. When attached to a
+ * Allows to initialize a polling notification handle.  When attached to a
  * port, a message arriving on an empty port causes the associated ring to
  * wake the thread from pthread_ring_wait().
  */
@@ -77,7 +77,7 @@ pthread_ring_init(pthread_ring_t *ring)
        if ((error = pthread_cond_init(&ring->cond, NULL)) == 0) {
                if ((error = pthread_mutex_init(&ring->mutex, NULL)) == 0) {
                        ring->magic = PRING_MAGIC;
-                       ring->event = 0;
+                       ring->event = ring->mevent = 0;
                        return 0;
                }
                (void) pthread_cond_destroy(&ring->cond);
@@ -87,7 +87,7 @@ pthread_ring_init(pthread_ring_t *ring)
 }
 
 /*
- * Destroys a ring. Note that all message ports attached to this ring should
+ * Destroys a ring.  Note that all message ports attached to this ring should
  * first be detached or destroyed.
  */
 int
@@ -106,11 +106,15 @@ pthread_ring_destroy(pthread_ring_t *ring)
 
 /*
  * Causes the current thread to sleep until a message arrives on an empty port
- * associated with this ring. In normal operation, a thread only goes in wait
+ * associated with this ring.  In normal operation, a thread only goes in wait
  * mode after it processed all queued messages on all interesting ports.
+ * However, provision is made so that a the function returns immediately if
+ * messages already were received on a port attached to this ring since the
+ * last call to pthread_ring_wait().
  * Although using such an absolute time timespec might be disadvantageous for
  * the API compared to a timeout in milliseconds for instance, this was chosen
- * to remain API-compatible with pthread_cond_timedwait().
+ * to remain API-compatible with pthread_cond_timedwait(), and upwards
+ * compatible with systems where nanosecond precision can be achieved.
  */
 int
 pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime)
@@ -124,7 +128,7 @@ pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime)
                return -1;
 
        /* As long as we don't have confirmation that we must stop waiting */
-       for (ring->event = 0; !ring->event && error == 0; ) {
+       for (ring->event = 0; !ring->mevent && !ring->event && error == 0; ) {
                /*
                 * Wait on conditional variable, which will automatically
                 * and atomically release the mutex and return with the mutex
@@ -137,6 +141,7 @@ pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime)
                } else
                        error = pthread_cond_wait(&ring->cond, &ring->mutex);
        }
+       ring->mevent = 0;
 
        /*
         * And we know that conditional waiting functions returned with mutex
@@ -149,8 +154,9 @@ pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime)
 
 /*
  * Allows to wake up waiter(s) on the specified ring, which are sleeping
- * threads within pthread_ring_wait(). This can be used to simulate the
- * arrival of a message on an empty port.
+ * threads within pthread_ring_wait().  This can be used to simulate the
+ * arrival of a message on an empty port.  Also useful to use rings as a
+ * notification system only when no message passing is needed.
  */
 int
 pthread_ring_notify(pthread_ring_t *ring)
@@ -160,7 +166,7 @@ pthread_ring_notify(pthread_ring_t *ring)
        DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
 
        if ((error = pthread_mutex_lock(&ring->mutex)) == 0) {
-               ring->event = 1;
+               ring->event = ring->mevent = 1;
                (void) pthread_cond_signal(&ring->cond);
                (void) pthread_mutex_unlock(&ring->mutex);
        }
@@ -203,9 +209,10 @@ pthread_port_destroy(pthread_port_t *port)
 }
 
 /*
- * Attaches a port to a ring. Multiple ports may be attached to a ring. A
+ * Attaches a port to a ring.  Multiple ports may be attached to a ring.  A
  * message arriving on an empty port will cause the attached ring to be
- * notified, if any.
+ * notified, if any, and as such to cause a thread waiting on the ring to
+ * be awakened.
  */
 int
 pthread_port_set_ring(pthread_port_t *port, pthread_ring_t *ring)
@@ -220,9 +227,11 @@ pthread_port_set_ring(pthread_port_t *port, pthread_ring_t *ring)
 }
 
 /*
- * Allows to initialize a message before it can be sent over a port. The
+ * Allows to initialize a message before it can be sent over a port.  The
  * message only needs to be initialized once in general, even if it will be
- * used for bidirectional transmission for synchronous operation.
+ * used for bidirectional transmission for synchronous operation.  If the
+ * reply port needs to be changed, however, this function should be used again
+ * to set the new reply port.
  */
 int
 pthread_msg_init(pthread_msg_t *msg, pthread_port_t *rport)
@@ -255,8 +264,9 @@ pthread_msg_destroy(pthread_msg_t *msg)
 
 /*
  * If any message exists in the queue of the specified port, unqueues it and
- * returns it. Otherwise, NULL is returned. In normal operation, all messages
- * queued to a port are processed before putting the thread back into sleep.
+ * returns it.  Otherwise, NULL is returned.  In normal operation, all messages
+ * queued to a port are processed before putting the thread back into sleep,
+ * mainly for efficiency, but also because it eases synchronization.
  */
 pthread_msg_t *
 pthread_msg_get(pthread_port_t *port)
@@ -280,14 +290,15 @@ pthread_msg_get(pthread_port_t *port)
 /*
  * Queues the specified message to the specified port, returning 0 on success.
  * Note that the message data is not copied or moved, but that a pointer
- * system is used to queue the message. One has to be careful to not allocate
- * this message space on the stack when asynchroneous operation is wanted. In
- * synchroneous operation mode, it is not a problem, since the sender does not
- * have to modify the data until the other end replies back with the same
- * message after modifying the message if necessary. In synchroneous mode, we
- * simply delegate that message memory region to the other end until it
- * notifies us with a reply that it is done working with it.
- * Returns 0 on success, or an error number.
+ * system is used to queue the message.  Thus, the message's shared memory
+ * region is leased temporarily to the other end.  One has to be careful to
+ * not allocate this message space on the stack when asynchroneous operation
+ * is needed.  In synchroneous operation mode, it is not a problem, since the
+ * sender does not have to modify the data until the other end replies back
+ * with the same message after modifying the message if necessary.  In
+ * synchroneous mode, we simply delegate that message memory region to the
+ * other end until it notifies us with a reply that it is done working with
+ * it.  Returns 0 on success, or an error number.
  */
 int
 pthread_msg_put(pthread_port_t *port, pthread_msg_t *msg)
@@ -301,30 +312,45 @@ pthread_msg_put(pthread_port_t *port, pthread_msg_t *msg)
                return error;
 
        DLIST_APPEND(&port->messages, (node_t *)msg);
-       if (port->ring != NULL && DLIST_NODES(&port->messages) == 1) {
+       if (port->ring != NULL) {
+               if (DLIST_NODES(&port->messages) == 1) {
+                       /*
+                        * We know that there previously were no messages,
+                        * and that the reading thread then waits for any
+                        * message to be available.  Signal it that there at
+                        * least is one message ready.  The other end should
+                        * normally process all available messages before
+                        * going back into waiting.
+                        */
+                       if ((error = pthread_mutex_lock(&port->ring->mutex))
+                           == 0) {
+                               port->ring->event = 1;
+                               (void) pthread_cond_signal(&port->ring->cond);
+                               (void) pthread_mutex_unlock(
+                                   &port->ring->mutex);
+                       }
+               }
                /*
-                * We know that there previously were no messages, and that
-                * the reading thread then waits for any message to be
-                * available. Signal it that there at least is one message
-                * ready. The other end should process all available messages
-                * before going back into waiting.
+                * If the other end, however, is already locked
+                * waiting for the ring to be notified while
+                * there already are messages, we still trigger mevent
+                * to cause it to unlock, however.  This behavior is
+                * useful in the polling system code, for instance.
                 */
-               if ((error = pthread_mutex_lock(&port->ring->mutex)) == 0) {
-                       port->ring->event = 1;
-                       (void) pthread_cond_signal(&port->ring->cond);
-                       (void) pthread_mutex_unlock(&port->ring->mutex);
-               }
+               /* XXX We don't use a mutex for now... should we? */
+               port->ring->mevent = 1;
        }
+
        (void) pthread_mutex_unlock(&port->lock);
 
        return error;
 }
 
 /*
- * Meant to be used in synchroneous message transfer mode. The initial sender
+ * Meant to be used in synchroneous message transfer mode.  The initial sender
  * sends a message to the other end, which then uses this function to notify
  * back the initial sender that it is done, often with a success/failure
- * result as part of the message. Returns 0 on success, or an error number.
+ * result as part of the message.  Returns 0 on success, or an error number.
  */
 int
 pthread_msg_reply(pthread_msg_t *msg)
index 32a015c..cd66e26 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mm_pthread_msg.h,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_msg.h,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
 
 /*
  * Copyright (C) 2005, Matthew Mondor
@@ -57,6 +57,7 @@ typedef struct {
        pthread_mutex_t mutex;
        int             mode;
        int             event;
+       int             mevent;
 } pthread_ring_t;
 
 enum pthread_ring_modes {
index 2d06ee3..80abf60 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mm_pthread_poll.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_poll.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
 
 /*
  * Copyright (C) 2005, Matthew Mondor
  */
 
 /*
- * I consider this code to be a major hack around the inherent problems UNIX
+ * I consider this code to be a major hack around the inherent problems unix
  * systems face because of the lack of support for filedescriptor polling in
- * the POSIX threads API. Although pthread defines methods for thread
+ * the POSIX threads API.  Although pthread defines methods for thread
  * synchronization and polling waiting for events (using conditionnal
- * variables), and that UNIX provides polling on filedescriptors using
+ * variables), and that unix provides polling on filedescriptors using
  * select(2), poll(2), kqueue(2) and other mechanisms, both are totally
  * distinct entities which can be considered to either conflict with
- * eachother or to not be related enough in a unified way. The current
+ * eachother or to not be related enough in a unified way.  The current
  * situation makes it almost impossible for a thread to both be polling for
  * interthread efficient messages implementations built upon pthread, and
  * filedescriptor events, concurrently.
  * The GNU PTH library implements non-standard functions which allow to
  * multiplex interthread messages and filedescriptor events, using for
  * instance pth_poll_ev(), pth_select_ev(), pth_accept_ev(), pth_connect_ev(),
- * etc. However, this is internally implemented using a single large select(2)
+ * etc.  However, this is internally implemented using a single large select(2)
  * based loop along with a slow large loop looking for non-fd events based on
- * the principles of libevent. This threading library has other disadventages,
+ * the principles of libevent.  This threading library has other disadventages,
  * such as not providing a preemptive scheduler (being a fully userspace
  * implementation) and not allowing to scale to multiple processors on SMP
- * systems. This interface however shows how good the POSIX threads API could
- * have been, if it was better designed with UNIX systems in mind. This
+ * systems.  This interface however shows how good the POSIX threads API could
+ * have been, if it was better designed with unix systems in mind.  This
  * library also being the most portable threads library alternative for quite
  * some time, because of the fact that Operating Systems implemented POSIX
  * threads inconsistently, or not at all, caused us to use PTH during some
  * time to develop software in cases where a pool of processes was not ideal
  * because of the frequency of shared memory synchronization needs.
  *
- * With the advent of POSIX threads implementations on more UNIX and UNIX-like
+ * With the advent of POSIX threads implementations on more unix and unix-like
  * systems and of modern implementations behaving more consistently, which can
  * scale on SMP systems and provide preemptive scheduling, it was considered
  * worthwhile for us to adapt our software again to use the standard POSIX
- * API. Especially considering that NetBSD which had no OS provided threads
+ * API.  Especially considering that NetBSD which had no OS provided threads
  * implementation for applications now has an awesome pthreads implementation
  * starting with version 2.0. However, we encountered difficulties with some
  * software which used the complex multiplexing of thread events and
- * filedescriptor ones. This module provides a solution to port this software.
+ * filedescriptor ones.  This module provides a solution to port this software.
  * It however is somewhat a hack.
  *
- * The downsides of this implementation are as follows. We originally intended
+ * The downsides of this implementation are as follows.  We originally intended
  * to develop a system which would scale among an increasing number of threads
  * in a ready pool of threads, scaling with concurrency of the polling calls.
  * This however proved difficult, or impossible to achieve, the main reasons
  * being that 1) A signal delivered to a process is only received by a random
- * thread that is not blocking it. 2) In the case where multiple threads are
+ * thread that is not blocking it.  2) In the case where multiple threads are
  * polling on a common file descriptor, similarily only one random thread
- * is awaken. 3) pthread_cond_signal() and pthread_cond_broadcast() cannot
- * wake threads waiting in filedescriptor polling. 4) to achieve what we
+ * is awaken.  3) pthread_cond_signal() and pthread_cond_broadcast() cannot
+ * wake threads waiting in filedescriptor polling.  4) to achieve what we
  * needed, two descriptors would have been necessary per notification ring.
- * this was considered an aweful solution and was promptly rejected. 5) The
+ * this was considered an aweful solution and was promptly rejected.  5) The
  * POSIX API does not define a way for a process to set or change the signal
  * blocking masks of other threads on the fly.
  *
  * Our solution then had to rely on a main descriptor polling manager thread
  * which would be used to poll file descriptors, and would as a device serve
- * client threads via efficient interthread messages. An issue still arises
+ * client threads via efficient interthread messages.  An issue still arises
  * when a client thread sends a message to the polling thread to add new
  * descriptors for polling or to cancel polling and remove descriptors.
  * The polling thread must be able to immediately process these events
- * awaking from filedescriptor polling. Two possible hacks could be used for
- * this. 1) Use an AF_LOCAL SOCK_DGRAM socketpair(), which one side would
+ * awaking from filedescriptor polling.  Two possible hacks could be used for
+ * this.  1) Use an AF_LOCAL SOCK_DGRAM socketpair(), which one side would
  * be used to trigger an event writing some data, and the other side always
- * included by the polling thread within the set of descriptors. 2) Send a
+ * included by the polling thread within the set of descriptors.  2) Send a
  * signal to the process which only the polling thread is not blocking,
  * to ensure that it be the one catching it, as such to awake from polling
- * with an EINTR error code. This second solution was considered more elegant
- * and is used as the basis of this implementation. We currently are
+ * with an EINTR error code.  This second solution was considered more elegant
+ * and is used as the basis of this implementation.  We currently are
  * clubbering the SIGUSR2 signal to achieve this.
  */
 
-/*
- * XXX Use pthread_runonce stuff where needed, or use proper synchronization.
- */
-
 
 
 #include <sys/types.h>
 #include <poll.h>
 #include <signal.h>
 #include <unistd.h>
+#include <stdio.h>
+#include <string.h>
 
 #include <mmtypes.h>
 #include <mmlog.h>
 
 MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_poll.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_poll.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
 
 
 
 /*
  * Synchroneous communications message between arbitrary threads and the
- * polling thread. Since communication is synchroneous, we only need to
- * allocate one such message per thread. We are always expecting a reply
- * back afer sending a query.
+ * polling thread.  Since communication is synchroneous, we only need to
+ * allocate one such message per thread.  We are always expecting a reply
+ * back after sending a query before reusing the buffer.  In fact, the
+ * message passing system only serves as a means for synchronization around
+ * the message, which is a shared memory object.
  */
 struct poll_msg {
        pthread_msg_t   msgnode;
@@ -156,7 +156,7 @@ struct poll_msg {
 
 /*
  * An index is maintained of descriptor number -> poll_msg_index
- * structures. Each of wich has information on the message the descriptor
+ * structures.  Each of wich has information on the message the descriptor
  * belongs to, and the index into the pollfd array so that it be easy to
  * efficiently do per-fd work.
  */
@@ -179,6 +179,7 @@ struct poll_data {
  * Static functions prototypes
  */
 static int             pthread_poll_proc_init(void);
+static void            pthread_poll_proc_init2(void);
 static int             pthread_poll_thread_init(struct poll_data **);
 static void            pthread_poll_thread_exit(void *);
 static void            *poll_thread(void *);
@@ -190,14 +191,14 @@ static void               poll_thread_sighandler(int);
  * Static process specific storage
  */
 static bool            pthread_poll_initialized = FALSE;
-static bool            pthread_poll_proc_initialized = FALSE;
+static pthread_once_t  pthread_poll_proc_initialized = PTHREAD_ONCE_INIT;
 static pthread_key_t   pthread_poll_proc_key;
 static pthread_ring_t  pthread_poll_thread_started_ring;
 static pthread_port_t  pthread_poll_thread_port;
 static pid_t           process_id;
 
 /*
- * Static global poll_thread storage. No synhronization is necessary when
+ * Static global poll_thread storage.  No synhronization is necessary when
  * using these, since only the polling thread does.
  */
 static struct poll_idx *poll_idx;
@@ -230,11 +231,22 @@ pthread_poll_proc_init(void)
                return errno;
 
        process_id = getpid();
-       pthread_poll_proc_initialized = TRUE;
 
        return error;
 }
 
+static void
+pthread_poll_proc_init2(void)
+{
+       int     error;
+
+       if ((error = pthread_poll_proc_init()) != 0) {
+               (void) fprintf(stderr, "pthread_poll_proc_init() - %s\n",
+                   strerror(error));
+               exit(EXIT_FAILURE);
+       }
+}
+
 static int
 pthread_poll_thread_init(struct poll_data **res)
 {
@@ -282,11 +294,11 @@ pthread_poll_thread_exit(void *specific)
 
 /*
  * Actual polling thread, with which we communicate using messages polling on
- * pthread_port_t and pthread_ring_t. This is the only thread that should be
+ * pthread_port_t and pthread_ring_t.  This is the only thread that should be
  * catching SIGUSR2 signals (used to wake us up and reiterate our main loop.
  * Note: Although less efficient than using kqueue(2) or libevent(3), after
  * discussion with 3s4i we settled to using poll(2) for now, which minimizes
- * OS dependencies as well as third party software dependencies. Because
+ * OS dependencies as well as third party software dependencies.  Because
  * pthread_poll_ring(2) is only sparsely used by our software (migrating from
  * using PTH library which provided pth_poll_ev()), and that we only provide
  * it small pollfd arrays, this implementation was considered to meet our
@@ -301,7 +313,8 @@ poll_thread(void *args)
        list_t          msg_list;
        register int    i;
 
-       /* This initialization shouldn't fail. If it did, it would be nice to
+       /*
+        * This initialization shouldn't fail.  If it did, it would be nice to
         * be able to simply panic eventually. XXX
         */
 
@@ -315,8 +328,8 @@ poll_thread(void *args)
 
        /*
         * Allocate an initial buffer size for our pollfd array as well as for
-        * our descriptor based index. We'll double these buffers as necessary
-        * at runtime.
+        * our descriptor based index.  We'll double these buffers as
+        * necessary at runtime.
         */
        poll_fds_size = 64;
        poll_fds = malloc(sizeof(struct pollfd) * poll_fds_size);
@@ -328,7 +341,7 @@ poll_thread(void *args)
        DLIST_INIT(&msg_list);
 
        /*
-        * Initialize message port and associated ring. The message port is
+        * Initialize message port and associated ring.  The message port is
         * module global, so that it be public to pthread_poll_ring().
         */
        (void) pthread_port_init(&pthread_poll_thread_port);
@@ -350,15 +363,15 @@ poll_thread(void *args)
                struct poll_msg *msg, *nextmsg;
 
                /*
-                * Get time of day in a rather high resolution. We need to
-                * do this to be able to evaluate timeouts later on. We
+                * Get time of day in a rather high resolution.  We need to
+                * do this to be able to evaluate timeouts later on.  We
                 * attempt to only require one time syscall per loop.
                 */
                (void) gettimeofday(&tv, NULL);
 
                /*
-                * Process any messages. We need to add the descriptors if
-                * they aren't already added. Also store yet unsatisfied
+                * Process any messages.  We need to add the descriptors if
+                * they aren't already added.  Also store yet unsatisfied
                 * request messages into a list.
                 */
                while ((msg = (struct poll_msg *)pthread_msg_get(
@@ -397,7 +410,7 @@ poll_thread(void *args)
                }
 
                /*
-                * Process timeouts. For request messages which timed out,
+                * Process timeouts.  For request messages which timed out,
                 * satisfy them immediately using ETIMEDOUT error.
                 * This also allows to evaluate which is the soonest to expire
                 * entry, which poll(2) will have to use as timeout.
@@ -430,10 +443,10 @@ poll_thread(void *args)
                }
 
                /*
-                * Perform polling. poll(2) for as much time as possible,
+                * Perform polling.  poll(2) for as much time as possible,
                 * although making sure to allow the soonest to expire query
-                * to stop polling. Next to expire entry time is in ttv and
-                * current time in tv. Calculate difference and convert to
+                * to stop polling.  Next to expire entry time is in ttv and
+                * current time in tv.  Calculate difference and convert to
                 * milliseconds.
                 */
                timersub(&ttv, &tv, &ttv);
@@ -490,10 +503,10 @@ poll_thread_attach_fds(struct poll_msg *msg)
                        continue;
 
                /*
-                * Grow index buffer if necessary. Either grow by doubling
+                * Grow index buffer if necessary.  Either grow by doubling
                 * size, or even more if necessary to hold index to fd.
                 * If we only grew to hold fd, we might need to realloc(3) too
-                * often. Take care to also NULL msg field of new entries.
+                * often.  Take care to also NULL msg field of new entries.
                 */
                if (poll_idx_size <= fd) {
                        struct poll_idx *idx;
@@ -518,7 +531,7 @@ poll_thread_attach_fds(struct poll_msg *msg)
                        goto err;
 
                /*
-                * Resize pollfd array if needed. Grow by doubling.
+                * Resize pollfd array if needed.  Grow by doubling.
                 */
                if (poll_fds_size <= poll_nfds) {
                        struct pollfd   *ptr;
@@ -552,7 +565,7 @@ err:
 }
 
 /*
- * Permits to disunite supplied pollfd set from the main set. Also sets the
+ * Permits to disunite supplied pollfd set from the main set.  Also sets the
  * revents fields of the supplied set to the ones of the main set.
  */
 static void
@@ -580,18 +593,18 @@ poll_thread_detach_fds(struct poll_msg *msg)
                msg->fds[i].revents = poll_fds[idx].revents;
 
                /*
-                * Unlink fd from the global set. The removal method is
+                * Unlink fd from the global set.  The removal method is
                 * simple; Take the last entry of the global set and move it
                 * over the current entry, updating index links, and lower
-                * the gobal nfds by one. If we're the last entry, simply
-                * remove it invalidating it's index entry and lowering the
+                * the gobal nfds by one.  If we're the last entry, simply
+                * remove it invalidating its index entry and lowering the
                 * global nfds.
                 */
                idx2 = --poll_nfds;
                if (idx != idx2) {
                        /*
                         * idx is new location of last entry, which is at
-                        * idx2. Copy idx2 one over to idx position.
+                        * idx2.  Copy idx2 one over to idx position.
                         */
                        poll_fds[idx].fd = poll_fds[idx2].fd;
                        poll_fds[idx].events = poll_fds[idx2].events;
@@ -627,10 +640,10 @@ poll_thread_sighandler(int sig)
  */
 
 /*
- * Must be called before launching any thread. Sets up the signal mask and
- * launches the dedicated poll slave thread. Important note: this system
+ * Must be called before launching any thread.  Sets up the signal mask and
+ * launches the dedicated poll slave thread.  Important note: this system
  * clobbers the SIGUSR2 signal, which the application can no longer use for
- * other purposes. The only solution to wake the thread manager thread from
+ * other purposes.  The only solution to wake the thread manager thread from
  * poll(2) is either to trigger an event through a dedicated filedescriptor,
  * or to send a signal to the process which only the polling thread allows.
  */
@@ -646,10 +659,10 @@ pthread_poll_init(void)
                return 0;
 
        /*
-        * First block SIGUSR2 signal in the parent. The reason why this must
+        * First block SIGUSR2 signal in the parent.  The reason why this must
         * be called before the application launches any thread is that
         * threads inherit the sigmask of their parent, and that all threads,
-        * but the polling thread, must block the signal. This ensures that
+        * but the polling thread, must block the signal.  This ensures that
         * only the wanted thread wakes up when a SIGUSR2 signal is received.
         * This way, we can interrupt the polling thread in poll(2), for
         * instance, and cause it to reiterate its main loop.
@@ -669,7 +682,7 @@ pthread_poll_init(void)
 
        /*
         * We may now launch the poll thread and wait for notification from it
-        * that it is ready to serve requests. We won't need to exit this
+        * that it is ready to serve requests.  We won't need to exit this
         * thread, so it can be launched in detached state.
         */
        if ((error = pthread_attr_init(&attr)) != 0)
@@ -691,11 +704,11 @@ pthread_poll_init(void)
 
 /*
  * poll(2) replacement which can also be awakened by a notification happening
- * on the specified ring. This for instance allows to process thread messages
- * as well as descriptor events. ETIMEDOUT is returned if timeout occurred
+ * on the specified ring.  This for instance allows to process thread messages
+ * as well as descriptor events.  ETIMEDOUT is returned if timeout occurred
  * before any event, or ECANCELED if a ring notification is the source of the
- * interruption. Returns the number of descriptors for which events were
- * received otherwise (which could be 0 in some cases). Other errors may be
+ * interruption.  Returns the number of descriptors for which events were
+ * received otherwise (which could be 0 in some cases).  Other errors may be
  * returned such as EINVAL, as for poll(2).
  */
 int
@@ -711,12 +724,13 @@ pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
 
        /*
         * Implicit process and thread specific initializations
-        * XXX Use pthread_once() here, or use a mutex around
-        * pthread_poll_proc_initialized
         */
-       if (!pthread_poll_proc_initialized)
-               if ((error = pthread_poll_proc_init()) != 0)
-                       return error;
+       if ((error =  pthread_once(&pthread_poll_proc_initialized,
+           pthread_poll_proc_init2)) != 0)
+               return error;
+       /*
+        * XXX Use a mutex or pthread_once() equivalent here too?
+        */
        if ((data = pthread_getspecific(pthread_poll_proc_key)) == NULL) {
                if ((error = pthread_poll_thread_init(&data)) != 0)
                        return error;
@@ -732,7 +746,7 @@ pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
        }
 
        /*
-        * Send query to polling thread. It is safe to simply reuse our
+        * Send query to polling thread.  It is safe to simply reuse our
         * message since we then expect a reply back and synchronize it.
         */
        data->msg.cancel = FALSE;
@@ -747,8 +761,8 @@ pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
        /*
         * Interrupt polling thread which may still be waiting in poll(2).
         * We do this by sending SIGUSR2 to the process, which only the
-        * polling thread is not blocking. This causes the thread to reiterate
-        * it's main loop, thus processing this message and going back to
+        * polling thread is not blocking.  This causes the thread to reiterate
+        * its main loop, thus processing this message and going back to
         * sleep in poll(2).
         */
        if (kill(process_id, SIGUSR2) != 0) {
@@ -757,15 +771,11 @@ pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
        }
 
        /*
-        * Wait until en event occurs and notifies our ring. An event could
+        * Wait until en event occurs and notifies our ring.  An event could
         * either be triggered by the poll request ending or by another
-        * interrupting event on the supplied ring.
-        * XXX hmm what would happen if the message was already available on
-        * the port before we have time to start waiting on the ring?
-        * Would the wait immediately return? I'm not so sure... If we looped
-        * first looking for a message to be available and then waiting if
-        * needed, we would need another method to detect that a foreing event
-        * occured.
+        * interrupting event on the supplied ring.  If a message is queued
+        * on the port between pthread_port_set_ring() and
+        * pthread_ring_wait(), the latter immediately returns.
         */
        if ((error = pthread_port_set_ring(&data->port, ring)) != 0) {
                errno = error;
@@ -778,10 +788,10 @@ pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
        if ((msg = (struct poll_msg *)pthread_msg_get(&data->port)) == NULL) {
                /*
                 * No message replied back from poll thread yet, this means
-                * that our ring was notified by another event. Cancel request
+                * that our ring was notified by another event.  Cancel request
                 * by sending event back with the cancel flag, and wait for
                 * reply message to occur (which will be the original request
-                * results we were waiting for). error field will be set to
+                * results we were waiting for).  error field will be set to
                 * ECANCELED by the poll thread.
                 */
                msg->cancel = TRUE;
@@ -810,8 +820,8 @@ pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
 
 /*
  * accept(2) replacement which can both observe a timeout and be interrupted
- * via pthread_ring_t events. Internally implemented using
- * pthread_poll_ring(). Will internally set the descriptor in non-blocking
+ * via pthread_ring_t events.  Internally implemented using
+ * pthread_poll_ring().  Will internally set the descriptor in non-blocking
  * mode if necessary, then reverting it to the mode it was supplied in.
  * Returns a new descriptor on success, or -1 on failure, in which case
  * errno will be set to either EINVAL, ETIMEDOUT, ECANCELLED, or others.
@@ -874,15 +884,15 @@ end:
 
 /*
  * connect(2) replacement which can both observe a timeout and be interrupted
- * via pthread_ring_t events. Internally implemented using
- * pthread_poll_ring(). Will internally set the descriptor in non-blocking
+ * via pthread_ring_t events.  Internally implemented using
+ * pthread_poll_ring().  Will internally set the descriptor in non-blocking
  * mode if necessary, then reverting it back to the mode it was supplied in.
  * Returns 0 on sucess, or an error number (EINVAL, ETIMEDOUT, ECANCELLED
- * or other). Timeout is in milliseconds, like for poll(2) and can be -1.
+ * or other).  Timeout is in milliseconds, like for poll(2) and can be -1.
  * For the application to know the actual connection status result, it should
  * poll until completion and verify the status using getsockopt(2) with
- * SOL_SOCKET level and SO_ERROR option. It can alternatively continue to call
- * this function in a loop until completion. Calling the function on an
+ * SOL_SOCKET level and SO_ERROR option.  It can alternatively continue to call
+ * this function in a loop until completion.  Calling the function on an
  * already connected socket will result in EISCONN.
  */
 int
index 481144c..6afd9df 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mm_pthread_pool.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_pool.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
 
 /*
  * Copyright (C) 2004-2005, Matthew Mondor
@@ -35,8 +35,8 @@
 
 /*
  * Implementation of a pool of ready processes which adapts with concurrency
- * needs. These ready processes can serve requests passed through efficient
- * inter-thread messaging. mmpool(3) is used for the pool functionality.
+ * needs.  These ready processes can serve requests passed through efficient
+ * inter-thread messaging.  mmpool(3) is used for the pool functionality.
  */
 
 
@@ -51,7 +51,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2004-2005\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_pool.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_pool.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
 
 
 
@@ -87,16 +87,16 @@ static pthread_ring_t               thread_started_ring;
 
 /*
  * Must be called to initialize the pthreads pool subsystem, before calling
- * any other function of this API. Returns 0 on success, or an error number.
+ * any other function of this API.  Returns 0 on success, or an error number.
  * <initial> threads are launched, and more will be launched in increments
- * of <initial> whenever necessary. These will also only be destroyed in
+ * of <initial> whenever necessary.  These will also only be destroyed in
  * decrements of <initial> whenever that many threads have not been in use for
  * some time, and a minimum of <initial> threads will always be kept.
  * Setting <initial> to high values may actually degrade performance with some
- * unefficient threading implementations. It is not recommended to use more
- * than 8 using the pth(3) library. Using NetBSD 2.0+ SA threads, a high
- * number does not reduce performance. We current do not observe any limit
- * whatsoever according to the number of threads launched over time. It is the
+ * unefficient threading implementations.  It is not recommended to use more
+ * than 8 using the pth(3) library.  Using NetBSD 2.0+ SA threads, a high
+ * number does not reduce performance.  We current do not observe any limit
+ * whatsoever according to the number of threads launched over time.  It is the
  * application's responsibility to ensure to observe decent concurrency limits
  * before calling pthread_object_call().
  */
@@ -120,7 +120,7 @@ pthread_object_init(int initial)
 
        /*
         * We use this ring to obtain notification of ready children when
-        * launching them. This is required for proper synchronization to
+        * launching them.  This is required for proper synchronization to
         * avoid aweful race conditions.
         */
        if ((error = pthread_ring_init(&thread_started_ring)) != 0)
@@ -137,7 +137,7 @@ pthread_object_init(int initial)
        }
 
        /*
-        * Now initialize the threads pool. This creates threads, uses
+        * Now initialize the threads pool.  This creates threads, uses
         * synchronization with thread_started_ring, and uses the message
         * subsystem, which all must be initialized and ready.
         */
@@ -213,18 +213,20 @@ pthread_object_msg_free(pthread_object_msg_t *msg)
 
 /*
  * Allows to invoke a thread of the pool to perform execution of the wanted
- * function. This is very efficient since the threads are already created and
- * are waiting for requests. There is no maximum concurrency limit enforced by
+ * function.  This is very efficient since the threads are already created and
+ * are waiting for requests.  There is no maximum concurrency limit enforced by
  * this system; It is the responsibility of the application to restrict
  * concurrency as necessary by keeping internal information on the current
- * number of requests. 0 is returned on success, or an error number.
- * XXX Add support for synchroneous and asynchroneous operation. Current
+ * number of requests.  0 is returned on success, or an error number.
+ * XXX Add support for synchroneous and asynchroneous operation.  Current
  * operation is only asynchroneous, but we would like to add a boolean here to
- * decide. We also could add back the result value of the thread function
+ * decide.  We also could add back the result value of the thread function
  * which would only be useful in synchroneous operation, when we are waiting
- * until the task ends... Of course, it's still easy for applications to use
+ * until the task ends...  Of course, it's still easy for applications to use
  * these in a synchroneous manner, by using a message and/or ring,
  * conditionnal variable, etc.
+ * Also evaluate if a callback function to be called to notify end of
+ * asynchroneous operation would be useful.
  */
 int
 pthread_object_call(pthread_port_t **port,
@@ -241,10 +243,10 @@ pthread_object_call(pthread_port_t **port,
 
        /*
         * Allocate a thread from the pool to reserve it, and tell it to call
-        * a function via a message. The message cannot be on the stack in
+        * a function via a message.  The message cannot be on the stack in
         * this case, since it holds arguments to be passed to a thread, and
         * also consists of an asynchroneous message for wich we do not expect
-        * a response back, waiting for it. We just dispatch it and go on.
+        * a response back, waiting for it.  We just dispatch it and go on.
         */
        if ((obj = thread_object_alloc()) == NULL) {
                error = ENOMEM;
@@ -286,7 +288,7 @@ err:
  */
 
 /*
- * Internally used to allocate a ready thread from the pool
+ * Internally used to allocate a ready thread from the pool.
  */
 inline static pthread_object_t *
 thread_object_alloc(void)
@@ -303,7 +305,7 @@ thread_object_alloc(void)
 
 /*
  * Internally used to free a no longer needed thread back to the pool of ready
- * threads
+ * threads.
  */
 inline static void
 thread_object_free(pthread_object_t *obj)
@@ -316,7 +318,7 @@ thread_object_free(pthread_object_t *obj)
 }
 
 /*
- * Internally called by mmpool(3) to create a thread object
+ * Internally called by mmpool(3) to create a thread object.
  */
 static bool
 thread_object_constructor(pnode_t *pnode)
@@ -332,8 +334,8 @@ thread_object_constructor(pnode_t *pnode)
                return FALSE;
 
        /*
-        * Wait until new thread ready notification. Without this, at least
-        * with NetBSD 2.0 SA threads, hell would break loose. Thread creation
+        * Wait until new thread ready notification.  Without this, at least
+        * with NetBSD 2.0 SA threads, hell would break loose.  Thread creation
         * isn't really a bottleneck in our case anyways, since we only need
         * to do it when all threads of the pool are already busy.
         */
@@ -343,7 +345,7 @@ thread_object_constructor(pnode_t *pnode)
 }
 
 /*
- * Internally called by mmpool(3) to destroy a thread object
+ * Internally called by mmpool(3) to destroy a thread object.
  */
 static void
 thread_object_destructor(pnode_t *pnode)
@@ -352,9 +354,9 @@ thread_object_destructor(pnode_t *pnode)
        pthread_object_msg_t    *msg;
 
        /*
-        * To be freed, the thread has to be terminated. We thus send it a
+        * To be freed, the thread has to be terminated.  We thus send it a
         * quit message and then wait for it to exit using pthread_join().
-        * Note that we let the thread destroy the port field. Although we
+        * Note that we let the thread destroy the port field.  Although we
         * theoretically could use a message on the stack here, let's be safe.
         * Thread destruction is only performed rarely anyways, so this isn't
         * a performance problem.
@@ -367,11 +369,11 @@ thread_object_destructor(pnode_t *pnode)
 }
 
 /*
- * Actual thread's main loop. We create a message port and listen for command
- * messages (quit and call). When we obtain a quit request, we destroy the
- * port and exit cleanly. The quit event can never occur during the execution
+ * Actual thread's main loop.  We create a message port and listen for command
+ * messages (quit and call).  When we obtain a quit request, we destroy the
+ * port and exit cleanly.  The quit event can never occur during the execution
  * of a call command, since it is only called on already freed thread nodes
- * (by mmpool(3) pool_free()). It is advized to applications which need to
+ * (by mmpool(3) pool_free()).  It is advized to applications which need to
  * obtain and use the port of the thread after thread_object_call() to only
  * send proper user messages, not system reserved ones.
  */
@@ -385,8 +387,8 @@ thread_object_main(void *args)
        pthread_object_msg_t    *msg;
 
        /*
-        * Create incomming our message port as well as its corresponding
-        * notification ring we can sleep on. Then advertize our port address.
+        * Create our incomming message port as well as its corresponding
+        * notification ring we can sleep on.  Then advertize our port address.
         * Ideally, we should somehow panic if any of this initialization
         * fails. XXX
         */
@@ -427,7 +429,7 @@ thread_object_main(void *args)
                        }
                        if (msg->command == PTHREAD_OBJ_CALL) {
                                /*
-                                * Request to execute a function. This means
+                                * Request to execute a function.  This means
                                 * that we were allocated/reserved first.
                                 */
                                msg->u.call.function(obj,
@@ -436,10 +438,10 @@ thread_object_main(void *args)
                                /*
                                 * Free/release us back, so that we be
                                 * available again to process further
-                                * requests. It is possible that freeing
+                                * requests.  It is possible that freeing
                                 * ourselves cause a PTHREAD_OBJ_QUIT message
                                 * to be queued soon on our port by the
-                                * destructor function. This is safe, since
+                                * destructor function.  This is safe, since
                                 * the destructor does not cause us to be
                                 * destroyed until it waits for us to have
                                 * ended cleanly using pthread_join().
index 9f8e96e..2fe2a00 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mm_pthread_sleep.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_sleep.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
 
 /*
  * Copyright (C) 2005, Matthew Mondor
@@ -33,8 +33,6 @@
  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-/* XXX Use pthread_once() stuff or properly synchronize */
-
 
 
 #include <sys/types.h>
@@ -42,6 +40,8 @@
 #include <pthread.h>
 #include <errno.h>
 #include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
 
 #include <mm_pthread_msg.h>
 #include <mm_pthread_sleep.h>
 
 MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_sleep.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_sleep.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
 
 
 
 static int             pthread_sleep_proc_init(void);
+static void            pthread_sleep_proc_init2(void);
 static int             pthread_sleep_thread_init(pthread_ring_t **);
 static void            pthread_sleep_thread_exit(void *);
 
 static pthread_key_t   pthread_sleep_proc_key;
-static bool            pthread_sleep_proc_initialized = FALSE;
+static pthread_once_t  pthread_sleep_proc_initialized = PTHREAD_ONCE_INIT;
 
 
 
@@ -72,11 +73,21 @@ pthread_sleep_proc_init(void)
            pthread_sleep_thread_exit)) != 0)
                return error;
 
-       pthread_sleep_proc_initialized = TRUE;
-
        return error;
 }
 
+static void
+pthread_sleep_proc_init2(void)
+{
+       int     error;
+
+       if ((error = pthread_sleep_proc_init()) != 0) {
+               (void) fprintf(stderr, "pthread_sleep_proc_init() - %s\n",
+                   strerror(error));
+               exit(EXIT_FAILURE);
+       }
+}
+
 static int
 pthread_sleep_thread_init(pthread_ring_t **res)
 {
@@ -112,9 +123,9 @@ pthread_sleep_thread_exit(void *specific)
 
        /*
         * Although NetBSD threads don't need this, some pthread
-        * implementations do. Some will crash for attempting to reference the
+        * implementations do.  Some will crash for attempting to reference the
         * already freed memory twice calling us again until we NULL the
-        * pointer for the data. Lame, but the POSIX standard was unclear
+        * pointer for the data.  Lame, but the POSIX standard was unclear
         * about this.
         */
        (void) pthread_setspecific(pthread_sleep_proc_key, NULL);
@@ -137,13 +148,12 @@ pthread_nanosleep(struct timespec *ts)
        /*
         * Process specific initialization if needed
         */
-       if (!pthread_sleep_proc_initialized) {
-               if ((error = pthread_sleep_proc_init()) != 0)
-                       return error;
-       }
+       if ((error = pthread_once(&pthread_sleep_proc_initialized,
+           pthread_sleep_proc_init2)) != 0)
+               return error;
        /*
         * Thread specific initialization if needed
-        * XXX Use pthread_once() here, or mutex around ring?
+        * XXX Use pthread_once() here too, or mutex around ring?
         */
        if ((ring = pthread_getspecific(pthread_sleep_proc_key)) == NULL) {
                if ((error = pthread_sleep_thread_init(&ring)) != 0)
@@ -160,8 +170,8 @@ pthread_nanosleep(struct timespec *ts)
        timespecadd(&its, ts, &its);
 
        /*
-        * We can finally sleep. We expect ETIMEDOUT to be the normal return
-        * value in this case, which we convert to a no-error. Other errors
+        * We can finally sleep.  We expect ETIMEDOUT to be the normal return
+        * value in this case, which we convert to a no-error.  Other errors
         * will be returned un changed.
         */
        if ((error = pthread_ring_wait(ring, &its)) == ETIMEDOUT)
@@ -172,7 +182,7 @@ pthread_nanosleep(struct timespec *ts)
 
 /*
  * Suspends the current thread for the duration specified into supplied
- * timeval. Returns 0 on success or an error number.
+ * timeval.  Returns 0 on success or an error number.
  */
 int
 pthread_microsleep(struct timeval *tv)
@@ -186,7 +196,7 @@ pthread_microsleep(struct timeval *tv)
 
 /*
  * Suspends execution of current thread for duration of specified
- * milliseconds. Returns 0 on success or an error number.
+ * milliseconds.  Returns 0 on success or an error number.
  */
 int
 pthread_millisleep(unsigned int ms)
index f7a5485..4c2f6b8 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: msg_test.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: msg_test.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
 
 /*
  * Copyright (C) 2005, Matthew Mondor
@@ -40,6 +40,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
+#include <stdarg.h>
 
 #include <mm_pthread_msg.h>
 #include <mm_pthread_pool.h>
 
 MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: msg_test.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: msg_test.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
 
 
 
-#define        THREADS                 8
-#define ROUNDS                 3
+#define        THREADS                 32
+#define ROUNDS                 8
 #define TIMEOUT                        1
+/*#define PRINTLOCK*/
 
 
 
@@ -66,13 +68,14 @@ struct message {
 
 
 
-int    main(void);
-void   threadfunc(pthread_object_t *, void *);
+int            main(void);
+static void    threadfunc(pthread_object_t *, void *);
+static void    printfunc(const char *, ...);
 
 
 
-pthread_port_t main_port;
-pthread_mutex_t        print_lock;
+static pthread_port_t  main_port;
+static pthread_mutex_t print_lock;
 
 
 
@@ -81,57 +84,59 @@ main(void)
 {
        pthread_ring_t  ring;
        struct message  *msg;
-       int             i;
+       int             i, err;
        int             threads_args[THREADS];
        struct timeval  tv;
        struct timespec ts, ts1;
 
-       pthread_mutex_init(&print_lock, NULL);
-
-       pthread_port_init(&main_port);
+       if ((err = pthread_mutex_init(&print_lock, NULL)) != 0) {
+               (void) printf("main() - stdout lock - %s\n", strerror(err));
+               exit(EXIT_FAILURE);
+       }
 
-       pthread_ring_init(&ring);
-       pthread_port_set_ring(&main_port, &ring);
+       if ((err = pthread_port_init(&main_port)) != 0 ||
+           (err = pthread_ring_init(&ring)) != 0 ||
+           (err = pthread_port_set_ring(&main_port, &ring)) != 0) {
+               printfunc("main() - initialization - %s\n", strerror(err));
+               exit(EXIT_FAILURE);
+       }
 
-       pthread_mutex_lock(&print_lock);
-       printf("Main: launching threads\n");
-       fflush(stdout);
-       pthread_mutex_unlock(&print_lock);
+       printfunc("Main: launching threads\n");
 
-       if (pthread_poll_init() != 0)
+       if ((err = pthread_poll_init()) != 0) {
+               printfunc("main() - pthread_poll_init() - %s\n",
+                   strerror(err));
                exit(EXIT_FAILURE);
-       if (pthread_object_init(16) != 0)
+       }
+
+       /*
+        * Initializes a poll of ready threads which can be dispatched
+        * functions to execute.
+        */
+       if ((err = pthread_object_init(THREADS + 1)) != 0) {
+               printfunc("main() - pthread_object_init() - %s\n",
+                   strerror(err));
                exit(EXIT_FAILURE);
+       }
 
+       /*
+        * Now dispatch a main reentrant function to many threads, without
+        * waiting for them to complete, in an asynchroneous manner.
+        * XXX Because of the way this works, the parent main thread should
+        * actually already be listening to messages...  We did create a port
+        * however, which should queue messages until we reach the main loop.
+        */
        for (i = 0; i < THREADS; i++) {
                threads_args[i] = i;
-               (void) pthread_object_call(NULL, threadfunc,
-                   &threads_args[i]);
+               if ((err = pthread_object_call(NULL, threadfunc,
+                   &threads_args[i])) != 0)
+                       printfunc("main() - pthread_object_call() - %s\n",
+                           strerror(errno));
        }
 
        ts1.tv_sec = TIMEOUT;
        ts1.tv_nsec = 0;
        for (;;) {
-               int     err;
-
-               /* Wait for any message(s) to be available */
-               pthread_mutex_lock(&print_lock);
-               printf("Main: Waiting for messages\n");
-               fflush(stdout);
-               pthread_mutex_unlock(&print_lock);
-
-               (void) gettimeofday(&tv, NULL);
-               TIMEVAL_TO_TIMESPEC(&tv, &ts);
-               timespecadd(&ts, &ts1, &ts);
-               if ((err = pthread_ring_wait(&ring, &ts)) != 0) {
-                       pthread_mutex_lock(&print_lock);
-                       printf("Main: pthread_ring_wait() - %s\n",
-                           strerror(err));
-                       fflush(stdout);
-                       pthread_mutex_unlock(&print_lock);
-                       break;
-               }
-
                /*
                 * Read messages as long as there are any, and reply to each
                 * of them in a synchroneous manner.
@@ -139,91 +144,121 @@ main(void)
                while ((msg = (struct message *)pthread_msg_get(&main_port))
                    != NULL) {
 
-                       pthread_mutex_lock(&print_lock);
-                       printf("Main: Received message %3d %3d\n",
-                           msg->id, msg->i);
-                       fflush(stdout);
-                       pthread_mutex_unlock(&print_lock);
+                       printfunc(
+                           "Main: Received message %d from thread #%d\n",
+                           msg->i, msg->id);
 
                        if ((err = pthread_msg_reply((pthread_msg_t *)msg))
-                           != 0) {
-                               pthread_mutex_lock(&print_lock);
-                               printf("Main: pthread_message_reply() - %s\n",
+                           != 0)
+                               printfunc(
+                                   "Main: pthread_message_reply() - %s\n",
                                    strerror(err));
-                               fflush(stdout);
-                               pthread_mutex_unlock(&print_lock);
-                       }
+               }
+
+               /*
+                * No more messages to process; Wait for any message(s) to be
+                * available.
+                * Note that there is special provision in the event where
+                * this loop first polling for new messages before processing
+                * them, which causes waiting for the ring to immediately
+                * return instead of actually waiting if any messages already
+                * have been sent.
+                */
+               printfunc("Main: Waiting for messages\n");
+
+               (void) gettimeofday(&tv, NULL);
+               TIMEVAL_TO_TIMESPEC(&tv, &ts);
+               timespecadd(&ts, &ts1, &ts);
+               if ((err = pthread_ring_wait(&ring, &ts)) != 0) {
+                       printfunc("Main: pthread_ring_wait() - %s\n",
+                           strerror(err));
+                       break;
                }
        }
 
-       pthread_mutex_destroy(&print_lock);
-       pthread_port_destroy(&main_port);
-       pthread_ring_destroy(&ring);
+       (void) pthread_mutex_destroy(&print_lock);
+       (void) pthread_port_destroy(&main_port);
+       (void) pthread_ring_destroy(&ring);
 
        return 0;
 }
 
-void
+static void
 threadfunc(pthread_object_t *obj, void *args)
 {
        int             id = *(int *)args;
-       int             i;
+       int             i, err;
        struct message  msg;
        pthread_port_t  rport;
        pthread_ring_t  rring;
 
-       pthread_port_init(&rport);
-       pthread_ring_init(&rring);
-       pthread_port_set_ring(&rport, &rring);
+       if ((err = pthread_port_init(&rport)) != 0 ||
+           (err = pthread_ring_init(&rring)) != 0 ||
+           (err = pthread_port_set_ring(&rport, &rring)) != 0 ||
+           (err = pthread_msg_init((pthread_msg_t *)&msg, &rport)) != 0) {
+               printfunc("threadfunc() - initialization - %s\n",
+                   strerror(err));
+               return;
+       }
 
-       pthread_msg_init((pthread_msg_t *)&msg, &rport);
        msg.id = id;
 
-       pthread_mutex_lock(&print_lock);
-       printf("Thread #%d started\n", id);
-       fflush(stdout);
-       pthread_mutex_unlock(&print_lock);
+       (void) printfunc("Thread #%d started\n", id);
 
        for (i = 0; i < ROUNDS; i++) {
-               int     err;
-
                /*
-                * Prepare and send synchronous message. For asynchronous
+                * Prepare and send synchronous message.  For asynchronous
                 * operation, we would need to allocate a message and to send
                 * it, and not expect a reply back immediately, even letting
-                * the other end free the message as necessary. In synchronous
+                * the other end free the message as necessary.  In synchronous
                 * mode we can use the same message over and over and share
                 * its memory area using proper send/reply methods for
                 * synchronization.
                 */
                msg.i = i;
                if ((err = pthread_msg_put(&main_port, (pthread_msg_t *)&msg))
-                   != 0) {
-                       pthread_mutex_lock(&print_lock);
-                       printf("Main: pthread_message_put() - %s\n",
+                   != 0)
+                       printfunc("Thread: pthread_message_put() - %s\n",
                            strerror(err));
-                       fflush(stdout);
-                       pthread_mutex_unlock(&print_lock);
-               }
 
                /* Now wait for synchronous reply and discard it */
                if ((err = pthread_ring_wait(&rring, NULL)) != 0) {
-                       pthread_mutex_lock(&print_lock);
-                       printf("Thread: pthread_ring_wait() - %s\n",
+                       printfunc("Thread: pthread_ring_wait() - %s\n",
                            strerror(err));
-                       fflush(stdout);
-                       pthread_mutex_unlock(&print_lock);
                        break;
                }
-               (void) pthread_msg_get(&rport);
+               if (pthread_msg_get(&rport) == NULL)
+                       printfunc("Thread: pthread_msg_get() == NULL!?\n");
+               printfunc("Thread #%d received reply message for %d\n",
+                   id, i);
        }
 
-       pthread_mutex_lock(&print_lock);
-       printf("Thread #%d ending\n", id);
-       fflush(stdout);
-       pthread_mutex_unlock(&print_lock);
+       printfunc("Thread #%d ending\n", id);
+
+       (void) pthread_port_destroy(&rport);
+       (void) pthread_ring_destroy(&rring);
+       (void) pthread_msg_destroy((pthread_msg_t *)&msg);
+}
 
-       pthread_port_destroy(&rport);
-       pthread_ring_destroy(&rring);
-       pthread_msg_destroy((pthread_msg_t *)&msg);
+static void
+printfunc(const char *fmt, ...)
+{
+       char    buf[1024];
+       va_list arg_ptr;
+       int     len;
+
+       *buf = '\0';
+       va_start(arg_ptr, fmt);
+       if ((len = vsnprintf(buf, 1023, fmt, arg_ptr)) < 1)
+               return;
+       va_end(arg_ptr);
+
+#ifdef PRINTLOCK
+       (void) pthread_mutex_lock(&print_lock);
+#endif
+       (void) fwrite(buf, len, 1, stdout);
+#ifdef PRINTLOCK
+       (void) fflush(stdout);
+       (void) pthread_mutex_unlock(&print_lock);
+#endif
 }