Importing in main tree this library which used to be in tests tree
authorMatthew Mondor <mmondor@pulsar-zone.net>
Tue, 13 Mar 2007 19:37:26 +0000 (19:37 +0000)
committerMatthew Mondor <mmondor@pulsar-zone.net>
Tue, 13 Mar 2007 19:37:26 +0000 (19:37 +0000)
15 files changed:
mmsoftware/pthread_util/GNUmakefile [new file with mode: 0644]
mmsoftware/pthread_util/README [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_debug.h [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_msg.c [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_msg.h [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_poll.c [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_poll.h [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_pool.c [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_pool.h [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_sleep.c [new file with mode: 0644]
mmsoftware/pthread_util/mm_pthread_sleep.h [new file with mode: 0644]
mmsoftware/pthread_util/tests/msg_test.c [new file with mode: 0644]
mmsoftware/pthread_util/tests/poll_test.c [new file with mode: 0644]
mmsoftware/pthread_util/tests/polltest.c [new file with mode: 0644]
mmsoftware/pthread_util/tests/sigtest.c [new file with mode: 0644]

diff --git a/mmsoftware/pthread_util/GNUmakefile b/mmsoftware/pthread_util/GNUmakefile
new file mode 100644 (file)
index 0000000..7d89656
--- /dev/null
@@ -0,0 +1,35 @@
+# $Id: GNUmakefile,v 1.1 2007/03/13 19:37:22 mmondor Exp $
+
+MMLIB_PATH := ../../mmlib
+MMLIBS := $(addprefix ${MMLIB_PATH}/,mmlog.o mmpool.o mmstring.o)
+OBJS := mm_pthread_msg.o mm_pthread_sleep.o mm_pthread_pool.o mm_pthread_poll.o
+BINS := tests/msg_test tests/poll_test
+
+CFLAGS += -Wall
+#CFLAGS += -DDEBUG -DPTHREAD_DEBUG -g3
+
+LDFLAGS += -lc -lpthread
+#LDFLAGS += -lpthread_dbg
+
+
+all: $(BINS)
+
+
+%.o: %.c
+       cc -c ${CFLAGS} -I. -I$(MMLIB_PATH) -o $@ $<
+
+
+tests/msg_test: tests/msg_test.o $(MMLIBS) $(OBJS)
+       cc ${CFLAGS} -o $@ $@.c $(OBJS) -I. -I$(MMLIB_PATH) ${LDFLAGS} \
+               $(MMLIBS)
+
+tests/poll_test: tests/poll_test.o $(MMLIBS) $(OBJS)
+       cc ${CFLAGS} -o $@ $@.c $(OBJS) -I. -I$(MMLIB_PATH) ${LDFLAGS} \
+               $(MMLIBS)
+
+
+install: all
+
+
+clean:
+       rm -f tests/msg_test.o tests/poll_test.o $(BINS) $(OBJS) $(MMLIBS)
diff --git a/mmsoftware/pthread_util/README b/mmsoftware/pthread_util/README
new file mode 100644 (file)
index 0000000..8054ac7
--- /dev/null
@@ -0,0 +1,197 @@
+This library is an attempt to provide pth library like API to NetBSD SA
+threads and kqueue.
+
+What we find are missing from the POSIX standard are added here:
+
+- Implementation of efficient messages to communicate among threads. These
+  messages are queued using an efficient pointer linking mechanism. It must be
+  possible for a thread to wait for messages while sleeping and to be awaken
+  when a message is available. It also must be possible to observe a maximum
+  timeout to wait for.
+- Implementation of filedescriptors and above mentionned thread messages
+  notification multiplexing, with support for timer. An example of this is
+  pth library's pth_poll_ev(). A timer event can interrupt thread-safe
+  filedescriptor polling, as well as thread messages arriving on a port.
+
+We beleive that it is possible to implement this using the kqueue(2)/kevent(2)
+system. The new call would be similar to:
+
+pthread_poll(struct pollfd *fds, int nfds,
+       struct pthread_port *ports, int nports,
+       struct pthread_sigs *sigs, int nsigs,
+       struct pthread_timers *timers, int ntimers)
+
+or similar system. This would allow multiplexing of various events into a
+single application loop.
+
+
+pthread_cond_timedwait() seems especially useful, either with a signal handler
+or perhaps using kqueue concurrently... pthread_cond_timedwait() will allow
+processes to wait for message arrival though a port, while
+pthread_cond_signal() or pthread_cond_broadcast() will be able to awaken them
+as messages are queued to the message port. However, we would ideally want to
+only signal a wanted thread waiting for a port... But, normally only one
+thread should be listening for messages on any given port. I have to see what
+I'll do for a thread listening for messages on multiple ports at a time...
+Perhaps that multiple ports could use the same conditional wait variable so
+that the process would only wait on that one, and then verify the message
+queue for each before going back in waiting mode.
+
+
+TODO:
+====
+- Replace mutex and conditonal variable initializers, as well as attributes,
+  with static initializers.
+- Provide similar static intializer macros as part of our API where possible.
+- I have a working message passing implementation, with possibility of a
+  waiter on as many ports as wanted. I however still have a challenge:
+  Multiplex system calls such as select(2), poll(2), connect(2) and accept(2)
+  with the messaging capability. One must be able to cause the other to
+  return. This could be tricky to properly implement. Maybe think about the
+  following ideas:
+  - Dedicate a thread to serve a syscall, with which communication is solely
+    done using messages. This however implies that only a single syscall at a
+    time can be processed by such a thread. This probably means that a pool of
+    such threads would become necessary. This also assumes that the syscall in
+    question do not block the whole process, but only the intended thread.
+    Alot of assumptions, but this would now work properly on all BSDs and
+    on Linux. Possibly also on Solaris.
+  - Use a mix of signals and syscalls, since signals can interrupt syscalls.
+    However, this implies adding capability in our message system to trigger
+    signals rather than only using a conditional variable to notify of message
+    arrival. This also probably means that the same signal handler must be
+    shared by the whole process, that is, all the threads.
+  - Use kqueue in a thread-safe manner with thread-specific signals (if
+    possible). kqueue can be used to track signals without the need for an
+    actual signal handler. It would also track filedescriptor changes at the
+    same time. This also probably means that we need to use kqueue user
+    events if possible, triggered from the message passing system. It also
+    means non-portable code outside of the realm of BSD systems.
+
+
+
+RECENT REVIEW AFTER SOME REFLECTION
+===================================
+
+Currently, pth_accept_ev() and pth_connect_ev() are the only two cases of
+special PTh functions which my software uses, notably mmftpd(8). These could
+easily be implemented using a random thread in the pool whenever necessary,
+with which communication would entirely use messages only. This thread could
+be told: Perform syscall in non-blocking mode using the supplied
+filedescriptor and notify me weither it succeeded, failed because of a timeout,
+if any, or was interrupted by a message event occuring on the specified ring,
+if any. The application however has to know that if it was interrupted by
+an event, the connection still occurs asynchroneously within the system.
+We should verify what could be done to cancel a not yet completed connection,
+if possible. This call could also report if the call was interrupted by a
+signal arrival (EINTR), optionally. If the socket was supplied in blocking
+mode, it would have to be switched to non-blocking mode by the system and
+then back into blocking mode. The caller could ensure to set it into
+non-blocking mode for enhanced performance if no blocking mode is required.
+The challenge would be finding a both efficient and portable solution to
+have select()/poll() awake upon reception of notification events on a ring.
+Perhaps that a global filedescriptor could be used for this, SOCK_DGRAM and
+one byte sent, or that a signal handler with a signal generation should be
+used... Both methods would probably awake the whole process, however.
+pthread_sigmask() could be used perhaps... I wouldn't want to have a special
+fd required for each ready thread of the pool, ideally.
+
+Implement:
+mm_pthread_io          pthread_poll_ev(), pthread_accept_ev(),
+                               pthread_connect_ev()
+mm_pthread_alarm       pthread_sleep(), etc.
+
+or maybe:
+
+mm_pthread_misc                For all of them
+
+Perhaps reimplement the system I worked on in mmserver(3) as well. This might
+be necessary for operations which really should be dedicated to a non-threaded
+process at occasions, and the subsystem should be available. It should probably
+use a pool using mmpool(3) as well, just like we are doing with threads.
+
+It would be interesting to implement better GC for mmpool(3)'s. Currently,
+pool_free() will discard pages which are no longer in use since some time,
+but the time cannot be linear, since it only accounts a certain number of
+calls made to it. It should instead be possible to use time intervals, and
+to let the application invoke the GC at wanted fixed intervals. This would
+allow to use time based average statistics rather than function call times
+based ones, without clobbering process or thread timers which the application
+might need. It simply has to provide its own and to call the GC function
+regularily.
+
+Hmm also, would be nice to be able to store the port_t pointer of the port
+which triggered notification on a ring_t, so that callers don't need to
+run through several ports attached on a ring... Maybe that it would be
+problematic however, since we can't guarantee atomicity between messages and
+messages processing, unless we kludged the whole thing with locks and lost
+efficiency. And because we only trigger notification to wakeup a waiting
+thread when a message is queued on an empty port, it's possible that the
+applicaton sleeps forever on a port if it didn't totally empty it, unless
+there was a way for the sleep function to immediately return if called on
+non-empty ports (as it's only alled on rings, and that rings don't have
+access to a list of ports in current implementation (only the ports can
+know which ring they are tied to)... I could implement something to have
+rings see their attached ports with a list, however. But this again means
+looping among ports to see if they're non-empty, heh, so why not let the
+application do it as they do now.
+
+
+
+IMPORTANT
+=========
+
+I did a test where multiple threads were polling on a single filedescriptor
+consisting of a socketpair, which other side was used to wake them up.
+Only one random thread would wake up.
+
+Using a signal to cause all threads to wake would not work either, because
+then again only a random thread will awake.
+
+It appears that the only way to ensure to wake wanted threads is using
+conditional variables and for them to only sleep on these.
+
+SIGIO possibility... threads would be sleeping on a conditional wait variable
+corresponding to the filedescriptor. For polling, the fd would be made in
+non-blocking I/O, with SIGIO sent to process. The fd and associated cond var
+would be added to a table. The SIGIO signal handler would need to check all
+fds in the set for possible I/O and awake corresponding threads waiting on
+cond var. A problem exists: How to check a filedescriptor for pending event?
+How to know if event is read or write, or hup, etc? Maybe using more ore less
+standard FION ioctls? poll/select with 0 timeout maybe, but that is still
+troublesome in terms of performance I beleive.
+
+fd     cond    interesting_events      occured_events?
+
+Hmm and what if a thread was allocated to start polling, and another wrapper
+thread wait for it sleeping on a cond var? Would it be sane to do this?
+When a timeout or message occurs however detected by the wrapper thread,
+how would we stop the other thread polling? We still have a problem.
+If we left pending polling threads, how would a future thread with successful
+polling on the same descriptor ever wake up, a random one would.
+Why does POSIX threads suck so much as to not provide any decent way to
+work with filedescriptors!? If at least I had pthread_signal() it would
+help. I could send a signal to interrupt the wanted thread when it was polling.
+Or if only there was a way to set the wanted signal mask for wanted processes
+as necessary, so that I would only have the wanted one process a particular
+signal I could send to interrupt it and then restore the masks, and do this
+somehow atomically. If POSIX had any of these requirements in mind while
+developing the standard, pthread_poll_condwait() or pthread_signal() would
+already exist anyways!
+
+
+HMM
+===
+
+A thread reserved for polling would seem best. We need to be able to interrupt
+that thread whenever needed using a signal, which all other processes must
+be blocking. We could use SIGIO, or SIGUSR2 for instance. That thread would
+process thread messages and go back to polling. It probably could handle
+timeouts as well, but this is probably not necessary. If it did, would
+probably free other threads from calling gettimeofday() too often. The thread
+has to remove the fd from the polling list when an event returned on it
+anyways, and so it could also send a reply message for timeout. It has to
+be interrupted anyways when a new fd is to be added, and this means that
+it could fix the poll timer before calling it each time to fit the soonest
+to expire fd... I could probably use kqueue too, or libevent in that thread
+to make it high performance as possible.
diff --git a/mmsoftware/pthread_util/mm_pthread_debug.h b/mmsoftware/pthread_util/mm_pthread_debug.h
new file mode 100644 (file)
index 0000000..a5ee1dc
--- /dev/null
@@ -0,0 +1,63 @@
+/* $Id: mm_pthread_debug.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2004-2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_DEBUG_H
+#define MM_PTHREAD_DEBUG_H
+
+
+
+#include <pthread.h>
+#include <syslog.h>
+
+
+
+#ifdef PTHREAD_DEBUG
+
+#define DEBUG_PTHREAD_ENTRY()  \
+       syslog(LOG_NOTICE, "> TID=%p FN=%s", pthread_self(), __func__)
+
+#define DEBUG_PTHREAD_EXIT()   \
+       syslog(LOG_NOTICE, "< TID=%p FN=%s", pthread_self(), __func__)
+
+#else
+#define DEBUG_PTHREAD_ENTRY()
+#define DEBUG_PTHREAD_EXIT()
+#endif
+
+
+
+#endif
diff --git a/mmsoftware/pthread_util/mm_pthread_msg.c b/mmsoftware/pthread_util/mm_pthread_msg.c
new file mode 100644 (file)
index 0000000..6deb30e
--- /dev/null
@@ -0,0 +1,466 @@
+/* $Id: mm_pthread_msg.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * It is almost a shame that POSIX did not define a standard API for
+ * inter-thread asynchroneous and synchroneous messaging.  So, here is my
+ * implementation.  Note that for asynchroneous operation it is recommended to
+ * use a memory pool such as mmpool(3) to allocate and free messages in an
+ * efficient way, in cases where messages will need to be sent to the other
+ * end without expecting a response back before the current function ends
+ * (in which case a message obviously can't be on the stack).
+ */
+
+
+
+#include <pthread.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include <mmtypes.h>
+#include <mmlog.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_msg.h>
+/*#include <mm_pthread_poll.h>*/
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_msg.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+/*
+ * Allows to initialize a polling notification handle.  When attached to a
+ * port, a message arriving on an empty port causes the associated ring to
+ * wake the thread from pthread_ring_wait().
+ */
+int
+pthread_ring_init(pthread_ring_t *ring)
+{
+       int     error;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(ring != NULL && ring->magic != PRING_MAGIC);
+
+       if ((error = pthread_cond_init(&ring->cond, NULL)) == 0) {
+               if ((error = pthread_mutex_init(&ring->mutex, NULL)) == 0) {
+                       ring->magic = PRING_MAGIC;
+                       ring->event = ring->mevent = 0;
+                       DEBUG_PTHREAD_EXIT();
+                       return 0;
+               }
+               (void) pthread_cond_destroy(&ring->cond);
+       }
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Returns TRUE if the supplied ring is a valid/usable one, or FALSE
+ * otherwise.  Useful to conditionally destroy it.
+ */
+int
+pthread_ring_valid(pthread_ring_t *ring)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+
+       DEBUG_PTHREAD_EXIT();
+       return (ring != NULL && ring->magic == PRING_MAGIC);
+}
+
+/*
+ * Destroys a ring.  Note that all message ports attached to this ring should
+ * first be detached or destroyed.
+ */
+int
+pthread_ring_destroy(pthread_ring_t *ring)
+{
+       int     error;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
+
+       if ((error = pthread_mutex_destroy(&ring->mutex)) == 0)
+               error = pthread_cond_destroy(&ring->cond);
+       ring->magic = 0;
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Causes the current thread to sleep until a message arrives on an empty port
+ * associated with this ring.  In normal operation, a thread only goes in wait
+ * mode after it processed all queued messages on all interesting ports.
+ * However, provision is made so that a the function returns immediately if
+ * messages already were received on a port attached to this ring since the
+ * last call to pthread_ring_wait().
+ * Although using such an absolute time timespec might be disadvantageous for
+ * the API compared to a timeout in milliseconds for instance, this was chosen
+ * to remain API-compatible with pthread_cond_timedwait(), and upwards
+ * compatible with systems where nanosecond precision can be achieved.
+ */
+int
+pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime)
+{
+       int     error = 0;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
+
+       /* We must hold the condition variable's mutex */
+       if (pthread_mutex_lock(&ring->mutex) != 0) {
+               error = -1;
+               goto err;
+       }
+
+       /* As long as we don't have confirmation that we must stop waiting */
+       for (ring->event = 0; ring->mevent == 0 &&
+           !ring->event && error == 0; ) {
+               /*
+                * Wait on conditional variable, which will automatically
+                * and atomically release the mutex and return with the mutex
+                * locked again, as soon as the conditional variable gets
+                * signaled.
+                */
+               if (abstime != NULL) {
+                       error = pthread_cond_timedwait(&ring->cond,
+                           &ring->mutex, abstime);
+               } else
+                       error = pthread_cond_wait(&ring->cond, &ring->mutex);
+       }
+       ring->mevent = 0;
+
+       /*
+        * And we know that conditional waiting functions returned with mutex
+        * locked, so now release it back.
+        */
+       (void) pthread_mutex_unlock(&ring->mutex);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Allows to wake up waiter(s) on the specified ring, which are sleeping
+ * threads within pthread_ring_wait().  This can be used to simulate the
+ * arrival of a message on an empty port.  Also useful to use rings as a
+ * notification system only when no message passing is needed.
+ */
+int
+pthread_ring_notify(pthread_ring_t *ring)
+{
+       int     error;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
+
+       if ((error = pthread_mutex_lock(&ring->mutex)) == 0) {
+               ring->mevent++;
+               ring->event = 1;
+               (void) pthread_cond_signal(&ring->cond);
+               (void) pthread_mutex_unlock(&ring->mutex);
+       }
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Allows to initialize/create a message port.
+ */
+int
+pthread_port_init(pthread_port_t *port)
+{
+       int     error;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(port != NULL && port->magic != PPORT_MAGIC);
+
+       if ((error = pthread_mutex_init(&port->lock, NULL)) != 0)
+               goto err;
+
+       port->magic = PPORT_MAGIC;
+       port->ring = NULL;
+       DLIST_INIT(&port->messages);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Returns TRUE if the supplied port is valid/usable, or FALSE otherwise.
+ * Useful to conditionally destroy a port, for instance.
+ */
+int
+pthread_port_valid(pthread_port_t *port)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+
+       DEBUG_PTHREAD_EXIT();
+       return (port != NULL && port->magic == PPORT_MAGIC);
+}
+
+/*
+ * Destroys the specified port, previously created using pthread_port_init().
+ */
+int
+pthread_port_destroy(pthread_port_t *port)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC);
+
+       port->magic = 0;
+
+       DEBUG_PTHREAD_EXIT();
+       return pthread_mutex_destroy(&port->lock);
+}
+
+/*
+ * Attaches a port to a ring.  Multiple ports may be attached to a ring.  A
+ * message arriving on an empty port will cause the attached ring to be
+ * notified, if any, and as such to cause a thread waiting on the ring to
+ * be awakened.
+ */
+int
+pthread_port_set_ring(pthread_port_t *port, pthread_ring_t *ring)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC &&
+           (ring == NULL || ring->magic == PRING_MAGIC));
+
+       port->ring = ring;
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+}
+
+/*
+ * Allows to initialize a message before it can be sent over a port.  The
+ * message only needs to be initialized once in general, even if it will be
+ * used for bidirectional transmission for synchronous operation.  If the
+ * reply port needs to be changed, however, this function should be used again
+ * to set the new reply port.
+ */
+int
+pthread_msg_init(pthread_msg_t *msg, pthread_port_t *rport)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(msg != NULL && msg->magic != PMESG_MAGIC &&
+           (rport == NULL || rport->magic == PPORT_MAGIC));
+
+       msg->magic = PMESG_MAGIC;
+       msg->reply = rport;
+       msg->size = 0;
+       msg->message = NULL;
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+}
+
+/*
+ * Returns TRUE if supplied message is valid/usable or FALSE otherwise.
+ */
+int
+pthread_msg_valid(pthread_msg_t *msg)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+
+       DEBUG_PTHREAD_EXIT();
+       return (msg != NULL && msg->magic == PMESG_MAGIC);
+}
+
+/*
+ * Invalidates a message, so that it can no longer be sent over ports.
+ */
+int
+pthread_msg_destroy(pthread_msg_t *msg)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(msg != NULL && msg->magic == PMESG_MAGIC);
+
+       msg->magic = 0;
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+}
+
+/*
+ * If any message exists in the queue of the specified port, unqueues it and
+ * returns it.  Otherwise, NULL is returned.  In normal operation, all messages
+ * queued to a port are processed before putting the thread back into sleep,
+ * mainly for efficiency, but also because it eases synchronization.
+ */
+pthread_msg_t *
+pthread_msg_get(pthread_port_t *port)
+{
+       pthread_msg_t   *msg = NULL;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC);
+
+       if (pthread_mutex_lock(&port->lock) != 0)
+               goto err;
+
+       if ((msg = DLIST_TOP(&port->messages)) != NULL) {
+               DEBUG_ASSERT(msg->magic == PMESG_MAGIC);
+               DLIST_UNLINK(&port->messages, (node_t *)msg);
+       }
+
+       (void) pthread_mutex_unlock(&port->lock);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return (pthread_msg_t *)msg;
+}
+
+/*
+ * Queues the specified message to the specified port, returning 0 on success.
+ * Note that the message data is not copied or moved, but that a pointer
+ * system is used to queue the message.  Thus, the message's shared memory
+ * region is leased temporarily to the other end.  One has to be careful to
+ * not allocate this message space on the stack when asynchroneous operation
+ * is needed.  In synchroneous operation mode, it is not a problem, since the
+ * sender does not have to modify the data until the other end replies back
+ * with the same message after modifying the message if necessary.  In
+ * synchroneous mode, we simply delegate that message memory region to the
+ * other end until it notifies us with a reply that it is done working with
+ * it.  Returns 0 on success, or an error number.
+ */
+int
+pthread_msg_put(pthread_port_t *port, pthread_msg_t *msg)
+{
+       int     error = 0;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC &&
+           msg != NULL && msg->magic == PMESG_MAGIC);
+
+       if ((error = pthread_mutex_lock(&port->lock)) != 0)
+               goto err;
+
+       DLIST_APPEND(&port->messages, (node_t *)msg);
+       if (port->ring != NULL) {
+               if (DLIST_NODES(&port->messages) == 1) {
+                       /*
+                        * We know that there previously were no messages,
+                        * and that the reading thread then waits for any
+                        * message to be available.  Signal it that there at
+                        * least is one message ready.  The other end should
+                        * normally process all available messages before
+                        * going back into waiting.
+                        */
+                       if ((error = pthread_mutex_lock(&port->ring->mutex))
+                           == 0) {
+                               port->ring->event = 1;
+                               (void) pthread_cond_signal(&port->ring->cond);
+                               (void) pthread_mutex_unlock(
+                                   &port->ring->mutex);
+                       }
+               }
+               /*
+                * If the other end, however, is already locked
+                * waiting for the ring to be notified while
+                * there already are messages, we still trigger mevent
+                * to cause it to unlock, however.  This behavior is
+                * useful in the polling system code, for instance.
+                */
+               /* XXX We don't use a mutex for now... */
+               port->ring->mevent++;
+       }
+
+       (void) pthread_mutex_unlock(&port->lock);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Meant to be used in synchroneous message transfer mode.  The initial sender
+ * sends a message to the other end, which then uses this function to notify
+ * back the initial sender that it is done, often with a success/failure
+ * result as part of the message.  Returns 0 on success, or an error number.
+ */
+int
+pthread_msg_reply(pthread_msg_t *msg)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(msg != NULL && msg->magic == PMESG_MAGIC &&
+           msg->reply != NULL);
+
+       DEBUG_PTHREAD_EXIT();
+       return pthread_msg_put(msg->reply, msg);
+}
+
+/*
+ * Returns the number of pending messages tied to the port, if any, or -1
+ * on error.
+ */
+int
+pthread_port_pending(pthread_port_t *port)
+{
+       int     pending = -1;
+
+       DEBUG_PTHREAD_ENTRY();
+       DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC);
+
+       if (pthread_mutex_lock(&port->lock) != 0)
+               goto err;
+
+       pending = (int)DLIST_NODES(&port->messages);
+
+       (void) pthread_mutex_unlock(&port->lock);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return pending;
+}
diff --git a/mmsoftware/pthread_util/mm_pthread_msg.h b/mmsoftware/pthread_util/mm_pthread_msg.h
new file mode 100644 (file)
index 0000000..d0d4282
--- /dev/null
@@ -0,0 +1,110 @@
+/* $Id: mm_pthread_msg.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_MSG_H
+#define MM_PTHREAD_MSG_H
+
+
+
+#include <pthread.h>
+
+#include <mmtypes.h>
+#include <mmlist.h>
+
+
+
+#define PRING_MAGIC    0x50524e47
+#define        PPORT_MAGIC     0x50505254
+#define PMESG_MAGIC    0x504d5347
+
+typedef struct {
+       u_int32_t       magic;
+       pthread_cond_t  cond;
+       pthread_mutex_t mutex;
+       int             mode;
+       int             event;
+       int             mevent;
+} pthread_ring_t;
+
+enum pthread_ring_modes {
+       PTHREAD_RMOD_NOWAIT,
+       PTHREAD_RMOD_CONDWAIT,
+       PTHREAD_RMOD_FDWAIT
+};
+
+typedef struct {
+       u_int32_t       magic;
+       pthread_ring_t  *ring;
+       pthread_mutex_t lock;
+       list_t          messages;
+} pthread_port_t;
+
+typedef struct {
+       node_t          node;
+       u_int32_t       magic;
+       pthread_port_t  *reply;
+       size_t          size;
+       void            *message;
+} pthread_msg_t;
+
+
+
+extern int             pthread_ring_init(pthread_ring_t *);
+extern int             pthread_ring_valid(pthread_ring_t *);
+extern int             pthread_ring_destroy(pthread_ring_t *);
+extern int             pthread_ring_wait(pthread_ring_t *,
+                           const struct timespec *);
+extern int             pthread_ring_notify(pthread_ring_t *);
+
+extern int             pthread_port_init(pthread_port_t *);
+extern int             pthread_port_valid(pthread_port_t *);
+extern int             pthread_port_destroy(pthread_port_t *);
+extern int             pthread_port_set_ring(pthread_port_t *,
+                           pthread_ring_t *);
+extern int             pthread_msg_init(pthread_msg_t *,
+                           pthread_port_t *);
+extern int             pthread_msg_valid(pthread_msg_t *);
+extern int             pthread_msg_destroy(pthread_msg_t *);
+extern pthread_msg_t   *pthread_msg_get(pthread_port_t *);
+extern int             pthread_msg_put(pthread_port_t *,
+                           pthread_msg_t *);
+extern int             pthread_msg_reply(pthread_msg_t *);
+extern int             pthread_port_pending(pthread_port_t *);
+
+
+
+#endif
diff --git a/mmsoftware/pthread_util/mm_pthread_poll.c b/mmsoftware/pthread_util/mm_pthread_poll.c
new file mode 100644 (file)
index 0000000..05778ee
--- /dev/null
@@ -0,0 +1,1116 @@
+/* $Id: mm_pthread_poll.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * I consider this code to be a major hack around the inherent problems unix
+ * systems face because of the lack of support for filedescriptor polling in
+ * the POSIX threads API.  Although pthread defines methods for thread
+ * synchronization and polling waiting for events (using conditionnal
+ * variables), and that unix provides polling on filedescriptors using
+ * select(2), poll(2), kqueue(2) and other mechanisms, both are totally
+ * distinct entities which can be considered to either conflict with
+ * eachother or to not be related enough in a unified way.  The current
+ * situation makes it almost impossible for a thread to both be polling for
+ * interthread efficient messages implementations built upon pthread, and
+ * filedescriptor events, concurrently.
+ *
+ * The GNU PTH library implements non-standard functions which allow to
+ * multiplex interthread messages and filedescriptor events, using for
+ * instance pth_poll_ev(), pth_select_ev(), pth_accept_ev(), pth_connect_ev(),
+ * etc.  However, this is internally implemented using a single large select(2)
+ * based loop along with a slow large loop looking for non-fd events based on
+ * the principles of libevent.  This threading library has other disadventages,
+ * such as not providing a preemptive scheduler (being a fully userspace
+ * implementation) and not allowing to scale to multiple processors on SMP
+ * systems.  This interface however shows how good the POSIX threads API could
+ * have been, if it was better designed with unix systems in mind.  This
+ * library also being the most portable threads library alternative for quite
+ * some time, because of the fact that Operating Systems implemented POSIX
+ * threads inconsistently, or not at all, caused us to use PTH during some
+ * time to develop software in cases where a pool of processes was not ideal
+ * because of the frequency of shared memory synchronization needs.
+ *
+ * With the advent of POSIX threads implementations on more unix and unix-like
+ * systems and of modern implementations behaving more consistently, which can
+ * scale on SMP systems and provide preemptive scheduling, it was considered
+ * worthwhile for us to adapt our software again to use the standard POSIX
+ * API.  Especially considering that NetBSD which had no OS provided threads
+ * implementation for applications now has an awesome pthreads implementation
+ * starting with version 2.0. However, we encountered difficulties with some
+ * software which used the complex multiplexing of thread events and
+ * filedescriptor ones.  This module provides a solution to port this software.
+ * It however is somewhat a hack.
+ *
+ * The downsides of this implementation are as follows.  We originally intended
+ * to develop a system which would scale among an increasing number of threads
+ * in a ready pool of threads, scaling with concurrency of the polling calls.
+ * This however proved difficult, or impossible to achieve, the main reasons
+ * being that 1) A signal delivered to a process is only received by a random
+ * thread that is not blocking it.  2) In the case where multiple threads are
+ * polling on a common file descriptor, similarily only one random thread
+ * is awaken.  3) pthread_cond_signal() and pthread_cond_broadcast() cannot
+ * wake threads waiting in filedescriptor polling.  4) to achieve what we
+ * needed, two descriptors would have been necessary per notification ring.
+ * this was considered an aweful solution and was promptly rejected.  5) The
+ * POSIX API does not define a way for a process to set or change the signal
+ * blocking masks of other threads on the fly.
+ *
+ * Our solution then had to rely on a main descriptor polling manager thread
+ * which would be used to poll file descriptors, and would as a device serve
+ * client threads via efficient interthread messages.  An issue still arises
+ * when a client thread sends a message to the polling thread to add new
+ * descriptors for polling or to cancel polling and remove descriptors.
+ * The polling thread must be able to immediately process these events
+ * awaking from filedescriptor polling.  Two possible hacks could be used for
+ * this.  1) Use an AF_LOCAL SOCK_DGRAM socketpair(), which one side would
+ * be used to trigger an event writing some data, and the other side always
+ * included by the polling thread within the set of descriptors.  2) Send a
+ * signal to the process which only the polling thread is not blocking,
+ * to ensure that it be the one catching it, as such to awake from polling
+ * with an EINTR error code.  This second solution was considered more elegant
+ * and is used as the basis of this implementation.  We currently are
+ * clubbering the SIGUSR2 signal to achieve this.
+ */
+
+
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <poll.h>
+#include <signal.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <mmtypes.h>
+#include <mmlog.h>
+#include <mmlist.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_poll.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+/*
+ * Synchroneous communications message between arbitrary threads and the
+ * polling thread.  Since communication is synchroneous, we only need to
+ * allocate one such message per thread.  We are always expecting a reply
+ * back after sending a query before reusing the buffer.  In fact, the
+ * message passing system only serves as a means for synchronization around
+ * the message, which is a shared memory object.
+ */
+struct poll_msg {
+       pthread_msg_t   msgnode;
+       /* Passed as parameters */
+       bool            cancel;
+       struct pollfd   *fds;
+       nfds_t          nfds;
+       int             timeout;
+       /* Returned as result */
+       int             ready, error;
+       /* Internally used */
+       struct timeval  expires;
+};
+
+/*
+ * An index is maintained of descriptor number -> poll_msg_index
+ * structures.  Each of wich has information on the message the descriptor
+ * belongs to, and the index into the pollfd array so that it be easy to
+ * efficiently do per-fd work.
+ */
+struct poll_idx {
+       int             idx;
+       struct poll_msg *msg;
+};
+
+/*
+ * Thread specific needed resources to use our special polling
+ */
+struct poll_data {
+       pthread_port_t  port;
+       struct poll_msg msg;
+};
+
+
+
+#define                        POLLWAKE()      do {                            \
+       pollingevents++;                                                \
+       if (polling != 0)                                               \
+               (void) kill(process_id, SIGUSR2);                       \
+} while (/* CONSTCOND */0)
+
+
+
+/*
+ * Static functions prototypes
+ */
+static int             pthread_poll_proc_init(void);
+static void            pthread_poll_proc_init2(void);
+static int             pthread_poll_thread_init(struct poll_data **);
+static void            pthread_poll_thread_exit(void *);
+static void            *poll_thread(void *);
+static int             poll_thread_attach_fds(struct poll_msg *);
+static void            poll_thread_detach_fds(struct poll_msg *);
+static void            poll_thread_sighandler(int);
+
+/*
+ * Static process specific storage
+ */
+static bool            pthread_poll_initialized = FALSE;
+static pthread_once_t  pthread_poll_proc_initialized = PTHREAD_ONCE_INIT;
+static pthread_key_t   pthread_poll_proc_key;
+static pthread_ring_t  pthread_poll_thread_started_ring;
+static pthread_port_t  pthread_poll_thread_port;
+static pid_t           process_id;
+static int             polling = 0;
+static int             pollingevents = 0;
+
+/*
+ * Static global poll_thread storage.  No synhronization is necessary when
+ * using these, since only the polling thread does.
+ */
+static struct poll_idx *poll_idx;
+static nfds_t          poll_idx_size;
+static struct pollfd   *poll_fds;
+static nfds_t          poll_fds_size;
+static nfds_t          poll_nfds;
+
+
+
+/*
+ * Static internal functions
+ */
+
+static int
+pthread_poll_proc_init(void)
+{
+       int                     error;
+       struct sigaction        act;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if ((error = pthread_key_create(&pthread_poll_proc_key,
+           pthread_poll_thread_exit)) != 0)
+               goto err;
+
+       act.sa_handler = poll_thread_sighandler;
+       act.sa_flags = 0;
+       (void) sigemptyset(&act.sa_mask);
+       (void) sigaddset(&act.sa_mask, SIGUSR2);
+       if (sigaction(SIGUSR2, &act, NULL) != 0) {
+               error = errno;
+               goto err;
+       }
+
+       process_id = getpid();
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+static void
+pthread_poll_proc_init2(void)
+{
+       int     error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if ((error = pthread_poll_proc_init()) != 0) {
+               (void) fprintf(stderr, "pthread_poll_proc_init() - %s\n",
+                   strerror(error));
+               DEBUG_PTHREAD_EXIT();
+               exit(EXIT_FAILURE);
+       }
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+static int
+pthread_poll_thread_init(struct poll_data **res)
+{
+       int                     error;
+       struct poll_data        *data;
+       sigset_t                set;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       (void) sigemptyset(&set);
+       (void) sigaddset(&set, SIGUSR2);
+       (void) pthread_sigmask(SIG_BLOCK, &set, NULL);
+
+       if ((data = malloc(sizeof(struct poll_data))) == NULL) {
+               error = ENOMEM;
+               goto err;
+       }
+
+       if ((error = pthread_port_init(&data->port)) != 0)
+               goto err;
+       if ((error = pthread_msg_init(&data->msg.msgnode, &data->port)) != 0)
+               goto err;
+
+       if ((error = pthread_setspecific(pthread_poll_proc_key, data)) != 0)
+               goto err;
+
+       *res = data;
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+
+err:
+       if (data != NULL) {
+               (void) pthread_port_destroy(&data->port);
+               free(data);
+       }
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+static void
+pthread_poll_thread_exit(void *specific)
+{
+       struct poll_data        *data = (struct poll_data *)specific;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       (void) pthread_port_destroy(&data->port);
+       (void) pthread_msg_destroy(&data->msg.msgnode);
+       free(data);
+
+       /*
+        * Some implementations need this
+        */
+       (void) pthread_setspecific(pthread_poll_proc_key, NULL);
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+
+/*
+ * Actual polling thread, with which we communicate using messages polling on
+ * pthread_port_t and pthread_ring_t.  This is the only thread that should be
+ * catching SIGUSR2 signals (used to wake us up and reiterate our main loop.
+ * Note: Although less efficient than using kqueue(2) or libevent(3), after
+ * discussion with 3s4i we settled to using poll(2) for now, which minimizes
+ * OS dependencies as well as third party software dependencies.  Because
+ * pthread_poll_ring(2) is only sparsely used by our software (migrating from
+ * using PTH library which provided pth_poll_ev()), and that we only provide
+ * it small pollfd arrays, this implementation was considered to meet our
+ * needs using poll(2). This also met the requirements for Tact group.
+ */
+/* ARGSUSED */
+static void *
+poll_thread(void *args)
+{
+       sigset_t        set;
+       pthread_ring_t  ring;
+       list_t          msg_list;
+       register int    i;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       /*
+        * This initialization shouldn't fail.  If it did, it would be nice to
+        * be able to simply panic eventually. XXX
+        */
+
+       /*
+        * Create set for SIGUSR2 which we'll unblock/block
+        */
+       (void) sigemptyset(&set);
+       (void) sigaddset(&set, SIGUSR2);
+
+       /*
+        * Allocate an initial buffer size for our pollfd array as well as for
+        * our descriptor based index.  We'll double these buffers as
+        * necessary at runtime.
+        */
+       poll_fds_size = 64;
+       poll_fds = malloc(sizeof(struct pollfd) * poll_fds_size);
+       poll_nfds = 0;
+       poll_idx_size = 64;
+       poll_idx = malloc(sizeof(struct poll_msg) * poll_idx_size);
+       for (i = 0; i < poll_idx_size; i++)
+               poll_idx[i].msg = NULL;
+       DLIST_INIT(&msg_list);
+
+       /*
+        * Initialize message port and associated ring.  The message port is
+        * module global, so that it be public to pthread_poll_ring().
+        */
+       (void) pthread_port_init(&pthread_poll_thread_port);
+       (void) pthread_ring_init(&ring);
+       (void) pthread_port_set_ring(&pthread_poll_thread_port, &ring);
+
+       /*
+        * Notify parent that we're ready.
+        */
+       (void) pthread_ring_notify(&pthread_poll_thread_started_ring);
+
+       /*
+        * Main loop from which we never exit
+        */
+       for (;;) {
+               register int    n;
+               int             timeout;
+               struct timeval  tv, ttv;
+               struct poll_msg *msg, *nextmsg;
+
+               /*
+                * Get time of day in a rather high resolution.  We need to
+                * do this to be able to evaluate timeouts later on.  We
+                * attempt to only require one time syscall per loop.
+                */
+               (void) gettimeofday(&tv, NULL);
+
+               pollingevents = 0;
+
+               /*
+                * Process any messages.  We need to add the descriptors if
+                * they aren't already added.  Also store yet unsatisfied
+                * request messages into a list.
+                */
+               while ((msg = (struct poll_msg *)pthread_msg_get(
+                   &pthread_poll_thread_port)) != NULL) {
+                       if (msg->cancel) {
+                               /*
+                                * Immediately satisfy request on demand
+                                */
+                               msg->error = ECANCELED;
+                               DLIST_UNLINK(&msg_list, (node_t *)msg);
+                               poll_thread_detach_fds(msg);
+                               (void) pthread_msg_reply(&msg->msgnode);
+                               continue;
+                       }
+                       if (poll_thread_attach_fds(msg) == 0) {
+                               msg->ready = msg->error = 0;
+                               if (msg->timeout != -1) {
+                                       /*
+                                        * Convert millisecond timeout to an
+                                        * absolute time timeval
+                                        */
+                                       msg->expires.tv_sec = tv.tv_sec;
+                                       msg->expires.tv_usec = tv.tv_usec;
+                                       ttv.tv_sec = msg->timeout / 1000;
+                                       ttv.tv_usec = (msg->timeout % 1000)
+                                          * 1000;
+                                       timeradd(&msg->expires, &ttv,
+                                           &msg->expires);
+                               }
+                               DLIST_APPEND(&msg_list, (node_t *)msg);
+                       } else {
+                               msg->ready = 0;
+                               msg->error = EINVAL;
+                               (void) pthread_msg_reply(&msg->msgnode);
+                       }
+               }
+
+               /*
+                * Process timeouts.  For request messages which timed out,
+                * satisfy them immediately using ETIMEDOUT error.
+                * This also allows to evaluate which is the soonest to expire
+                * entry, which poll(2) will have to use as timeout.
+                */
+               ttv.tv_sec = ttv.tv_usec = 99999;
+               for (msg = DLIST_TOP(&msg_list); msg != NULL; msg = nextmsg) {
+                       nextmsg = DLIST_NEXT(msg);
+
+                       if (msg->timeout == -1)
+                               continue;
+                       if (timercmp(&msg->expires, &tv, <)) {
+                               msg->error = ETIMEDOUT;
+                               DLIST_UNLINK(&msg_list, (node_t *)msg);
+                               poll_thread_detach_fds(msg);
+                               (void) pthread_msg_reply(&msg->msgnode);
+                       } else if (timercmp(&msg->expires, &ttv, <)) {
+                               ttv.tv_sec = msg->expires.tv_sec;
+                               ttv.tv_usec = msg->expires.tv_usec;
+                       }
+               }
+
+               /*
+                * If there are no registered descriptors to poll for, wait
+                * using the thread friendly ring until messages occur, and
+                * reiterate.
+                */
+               if (poll_nfds == 0) {
+                       (void) pthread_ring_wait(&ring, NULL);
+                       continue;
+               }
+
+               /*
+                * Perform polling.  poll(2) for as much time as possible,
+                * although making sure to allow the soonest to expire query
+                * to stop polling.  Next to expire entry time is in ttv and
+                * current time in tv.  Calculate difference and convert to
+                * milliseconds.
+                */
+               if (ttv.tv_sec == 99999 && ttv.tv_usec == 99999)
+                       timeout = -1;
+               else {
+                       timersub(&ttv, &tv, &ttv);
+                       timeout = (ttv.tv_sec * 1000) + (ttv.tv_usec / 1000);
+               }
+
+               /*
+                * Unblock the SIGUSR2 signal, which we should be the only
+                * thread to receive, all other threads blocking it.
+                * Only leave it unblocked for the duration of the poll(2)
+                * syscall.  We cause our loop to reiterate in any case of
+                * error, EINTR or no file descriptor with pending event.
+                */
+               (void) pthread_sigmask(SIG_UNBLOCK, &set, NULL);
+               polling++;
+
+               n = 0;
+               if (pollingevents != 0)
+                       goto unblock;
+
+               n = poll(poll_fds, poll_nfds, timeout);
+
+unblock:
+               polling--;
+               (void) pthread_sigmask(SIG_BLOCK, &set, NULL);
+               if (pollingevents != 0 || n < 1)
+                       continue;
+
+               /*
+                * Verify which descriptors have interesting events set,
+                * increasing events counter of corresponding requests.
+                */
+               for (i = 0; n != 0 && i < poll_nfds; i++) {
+                       if (poll_fds[i].revents != 0) {
+                               (poll_idx[poll_fds[i].fd].msg->ready)++;
+                               n--;
+                       }
+               }
+               /*
+                * Now verify pending request messages for events, and satisfy
+                * the requests of those who do.
+                */
+               for (msg = DLIST_TOP(&msg_list); msg != NULL; msg = nextmsg) {
+                       nextmsg = DLIST_NEXT(msg);
+
+                       if (msg->ready != 0) {
+                               /*
+                                * ready and error fields are already set
+                                */
+                               DLIST_UNLINK(&msg_list, (node_t *)msg);
+                               poll_thread_detach_fds(msg);
+                               (void) pthread_msg_reply(&msg->msgnode);
+                       }
+               }
+       }
+
+       /* NOTREACHED */
+       DEBUG_PTHREAD_EXIT();
+       pthread_exit(NULL);
+       return NULL;
+}
+
+/*
+ * Permits to merge supplied pollfd set with the main set
+ */
+static int
+poll_thread_attach_fds(struct poll_msg *msg)
+{
+       register int    i, fd, idx;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       for (i = 0; i < msg->nfds; i++) {
+               fd = msg->fds[i].fd;
+
+               /*
+                * Ignore unset descriptors
+                */
+               if (fd == -1)
+                       continue;
+
+               /*
+                * Grow index buffer if necessary.  Either grow by doubling
+                * size, or even more if necessary to hold index to fd.
+                * If we only grew to hold fd, we might need to realloc(3) too
+                * often.  Take care to also NULL msg field of new entries.
+                */
+               if (poll_idx_size <= fd) {
+                       struct poll_idx *idx;
+                       int             size, i2;
+
+                       size = poll_idx_size * 2;
+                       if (fd > size)
+                               size = fd;
+                       if ((idx = realloc(poll_idx,
+                           sizeof(struct poll_idx) * size)) == NULL)
+                               goto err;
+                       poll_idx = idx;
+                       for (i2 = poll_idx_size; i2 < size; i2++)
+                               poll_idx[i2].msg = NULL;
+                       poll_idx_size = size;
+               }
+
+               /*
+                * Error if descriptor not unique before adding to set.
+                * We do not allow multiple threads polling on the same
+                * descriptor at the same time in our system.  We would
+                * otherwise need to gracefully handle duplicates,
+                * multiplexing them, which isn't required at all by our
+                * applications.  So let's keep things simple.
+                */
+               if (poll_idx[fd].msg != NULL)
+                       goto err;
+
+               /*
+                * Resize pollfd array if needed.  Grow by doubling.
+                * This should happen very rarely.
+                * XXX We could check this condition only once at the
+                * top of this fonction and take in consideration the
+                * number of descriptors to add, if wanted for optimization.
+                */
+               if (poll_fds_size <= poll_nfds) {
+                       struct pollfd   *ptr;
+
+                       if ((ptr = realloc(poll_fds,
+                           sizeof(struct pollfd) * (poll_fds_size * 2)))
+                           == NULL)
+                               goto err;
+                       poll_fds = ptr;
+                       poll_fds_size *= 2;
+               }
+
+               /*
+                * Finally add descriptor to set and register it for indexing.
+                * We simply need to append it to the existing entries in our
+                * global polling set array.
+                */
+               idx = poll_nfds;
+               poll_fds[idx].fd = fd;
+               poll_fds[idx].events = msg->fds[i].events;
+               poll_fds[idx].revents = 0;
+               poll_idx[fd].msg = msg;
+               poll_idx[fd].idx = idx;
+               poll_nfds = ++idx;
+       }
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+
+err:
+       (void) poll_thread_detach_fds(msg);
+
+       DEBUG_PTHREAD_EXIT();
+       return -1;
+}
+
+/*
+ * Permits to disunite supplied pollfd set from the main set.  Also sets the
+ * revents fields of the supplied set to the ones of the main set.
+ */
+static void
+poll_thread_detach_fds(struct poll_msg *msg)
+{
+       register int    i, fd, idx;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       for (i = 0; i < msg->nfds; i++) {
+               fd = msg->fds[i].fd;
+
+               /*
+                * Make sure fd was properly registered
+                */
+               if (poll_idx[fd].msg != msg)
+                       continue;
+
+               /*
+                * Find index in global pollfd set for this fd
+                */
+               idx = poll_idx[fd].idx;
+
+               /*
+                * Update pollfd entry according to global one
+                */
+               msg->fds[i].revents = poll_fds[idx].revents;
+
+               /*
+                * Unlink fd from the global set.  The removal method is
+                * simple; Take the last entry of the global set and move it
+                * over the current entry, updating index links, and lower
+                * the gobal nfds by one.  If we're the last entry, simply
+                * remove it invalidating its index entry lowering the global
+                * nfds.
+                */
+
+               if (--poll_nfds != idx) {
+                       /*
+                        * Not last entry, move last entry over entry to
+                        * delete.
+                        */
+                       register struct pollfd  *deleted, *last;
+                       int                     deleted_fd, deleted_idx;
+
+                       last = &poll_fds[poll_nfds];
+                       deleted = &poll_fds[idx];
+                       deleted_fd = deleted->fd;
+                       deleted_idx = poll_idx[deleted_fd].idx;
+
+                        /* Copy last entry over deleted one */
+                       deleted->fd = last->fd;
+                       deleted->events = last->events;
+                       deleted->revents = last->revents;
+
+                       /*
+                        * Reindex last entry which was moved, don't touch
+                        * the msg pointer though.
+                        */
+                       poll_idx[last->fd].idx = deleted_idx;
+
+                       /* And finally invalidate last entry */
+                       poll_idx[deleted_fd].msg = NULL;
+               } else {
+                       /* Invalidate last entry */
+                       poll_idx[poll_fds[poll_nfds].fd].msg = NULL;
+               }
+       }
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+/*
+ * Called upon reception of SIGUSR2
+ */
+/* ARGSUSED */
+static void
+poll_thread_sighandler(int sig)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+
+       pollingevents++;
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+
+
+/*
+ * Public API exported functions
+ */
+
+/*
+ * Must be called before launching any thread.  Sets up the signal mask and
+ * launches the dedicated poll slave thread.  Important note: this system
+ * clobbers the SIGUSR2 signal, which the application can no longer use for
+ * other purposes.  The only solution to wake the thread manager thread from
+ * poll(2) is either to trigger an event through a dedicated filedescriptor,
+ * or to send a signal to the process which only the polling thread allows.
+ */
+int
+pthread_poll_init(void)
+{
+       int             error;
+       sigset_t        set;
+       pthread_attr_t  attr;
+       pthread_t       thread;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (pthread_poll_initialized) {
+               error = 0;
+               goto err;
+       }
+
+       /*
+        * First block SIGUSR2 signal in the parent.  The reason why this must
+        * be called before the application launches any thread is that
+        * threads inherit the sigmask of their parent, and that all threads,
+        * but the polling thread, must block the signal.  This ensures that
+        * only the wanted thread wakes up when a SIGUSR2 signal is received.
+        * This way, we can interrupt the polling thread in poll(2), for
+        * instance, and cause it to reiterate its main loop.
+        */
+       (void) sigemptyset(&set);
+       (void) sigaddset(&set, SIGUSR2);
+       if ((error = pthread_sigmask(SIG_BLOCK, &set, NULL)) != 0)
+               goto err;
+
+       /*
+        * We'll use this pthread_ring_t to get notification from child that
+        * it is ready to process requests before proceeding.
+        */
+       if ((error = pthread_ring_init(&pthread_poll_thread_started_ring))
+           != 0)
+               goto err;
+
+       /*
+        * We may now launch the poll thread and wait for notification from it
+        * that it is ready to serve requests.  We won't need to exit this
+        * thread, so it can be launched in detached state.
+        */
+       if ((error = pthread_attr_init(&attr)) != 0)
+               goto err;
+       if ((error = pthread_attr_setdetachstate(&attr, TRUE)) != 0)
+               goto err;
+       if ((error = pthread_create(&thread, &attr, poll_thread, NULL)) != 0)
+               goto err;
+
+       /*
+        * Wait until thread is ready to serve requests
+        */
+       (void) pthread_ring_wait(&pthread_poll_thread_started_ring, NULL);
+
+       pthread_poll_initialized = TRUE;
+
+       return 0;
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * poll(2) replacement which can also be awakened by a notification happening
+ * on the specified ring.  This for instance allows to process thread messages
+ * as well as descriptor events.  Like poll(2), returns the number of
+ * descriptors with events on success (can be 0), or returns -1 with the
+ * specified error set in errno.  Unlike poll, the error ETIMEDOUT will occur
+ * if the timeout expires before an event existed, or ECANCELLED if a ring
+ * notification event occurred instead of a filedescriptor one.  Can also
+ * return errors such as EINVAL.
+ * XXX Check for ETIMEDOUT!  We probably don't do this yet.  Also, we could
+ * return 0 in this case like poll(2).
+ */
+int
+pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
+    pthread_ring_t *ring)
+{
+       int                     error;
+       struct poll_data        *data;
+       pthread_ring_t          *oring;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (!pthread_poll_initialized) {
+               error = EINVAL;
+               goto err;
+       }
+
+       /*
+        * Implicit process and thread specific initializations
+        */
+       if ((error =  pthread_once(&pthread_poll_proc_initialized,
+           pthread_poll_proc_init2)) != 0)
+               goto err;
+       /*
+        * XXX Use a mutex or pthread_once() equivalent here too?
+        */
+       if ((data = pthread_getspecific(pthread_poll_proc_key)) == NULL) {
+               if ((error = pthread_poll_thread_init(&data)) != 0)
+                       goto err;
+       }
+
+       /*
+        * Perform some sanity checking on supplied arguments
+        */
+       if (fds == NULL || nfds < 1 || ring == NULL || ring->magic !=
+           PRING_MAGIC) {
+               error = EINVAL;
+               goto err;
+       }
+
+       /*
+        * Ensure that our message port's ring uses the same ring which
+        * the user supplies us.  If we didn't do this we would need to
+        * be able to wait for events on more than one ring simultaneously.
+        * Because we don't have a ring multiplexer object yet (which would
+        * be needed since a ring maps to a conditional variable among other
+        * things), we need to do process this way.
+        * XXX Could there be a race condition here?  It needs to be stressed.
+        */
+       {
+               int     mevent;
+
+               mevent = (data->port.ring != NULL ?
+                   data->port.ring->mevent : 0);
+               oring = data->port.ring;
+               (void) pthread_port_set_ring(&data->port, ring);
+               data->port.ring->mevent = mevent;
+       }
+
+       /*
+        * Send query to polling thread.  It is safe to simply reuse our
+        * message since we then expect a reply back and synchronize it.
+        */
+       data->msg.cancel = FALSE;
+       data->msg.fds = fds;
+       data->msg.nfds = nfds;
+       data->msg.timeout = timeout;
+       if ((error = pthread_msg_put(&pthread_poll_thread_port,
+           &data->msg.msgnode)) != 0)
+               goto err;
+
+       /*
+        * Interrupt polling thread which may still be waiting in poll(2).
+        * We do this by sending SIGUSR2 to the process, which only the
+        * polling thread is not blocking.  This causes the thread to reiterate
+        * its main loop, thus processing this message and going back to
+        * sleep in poll(2).
+        */
+       POLLWAKE();
+
+       /*
+        * Wait until en event occurs and notifies our ring.  An event could
+        * either be triggered by the poll request ending or by another
+        * interrupting event on the supplied ring.  If a message is queued
+        * on the port between pthread_port_set_ring() and
+        * pthread_ring_wait(), the latter immediately returns.
+        */
+       if ((error = pthread_ring_wait(ring, NULL)) != 0)
+               goto err;
+       if (pthread_msg_get(&data->port) == NULL) {
+               /*
+                * No message replied back from poll thread yet, this means
+                * that our ring was notified by another event.  Cancel request
+                * by sending event back with the cancel flag, and wait for
+                * reply message to occur (which will be the original request
+                * results we were waiting for).  error field will be set to
+                * ECANCELED by the poll thread.
+                */
+               data->msg.cancel = TRUE;
+               (void) pthread_msg_put(&pthread_poll_thread_port,
+                   &data->msg.msgnode);
+               POLLWAKE();
+               while (pthread_msg_get(&data->port) == NULL)
+                       (void) pthread_ring_wait(ring, NULL);
+       }
+       /* Unclobber user supplied ring from our port events */
+       (void) pthread_port_set_ring(&data->port, oring);
+
+       /*
+        * Error, return error number.
+        */
+       if (data->msg.error != 0) {
+               error = data->msg.error;
+               goto err;
+       }
+
+       /*
+        * Success, return number of descriptors with detected events.
+        */
+       DEBUG_PTHREAD_EXIT();
+       return data->msg.ready;
+
+err:
+       errno = error;
+
+       DEBUG_PTHREAD_EXIT();
+       return -1;
+}
+
+/*
+ * accept(2) replacement which can both observe a timeout and be interrupted
+ * via pthread_ring_t events.  Internally implemented using
+ * pthread_poll_ring().  Will internally set the descriptor in non-blocking
+ * mode if necessary, then reverting it to the mode it was supplied in.
+ * Returns a new descriptor on success, or -1 on error, in which case errno
+ * is set.  errno can then be EINVAL, ETIMEDOUT, ECANCELED, or others.
+ * Timeout is in milliseconds, like for poll(2) and can be -1.
+ */
+int
+pthread_accept_ring(int s, struct sockaddr *addr, socklen_t *addrlen,
+    int timeout, pthread_ring_t *ring)
+{
+       int             oflags, nflags, d, error = 0;
+       struct pollfd   fd;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (!pthread_poll_initialized) {
+               errno = EINVAL;
+               goto err;
+       }
+
+       /*
+        * First get current fcntl status flags, and set descriptor to
+        * non-blocking mode if necessary.
+        */
+       if ((oflags = nflags = fcntl(s, F_GETFL)) == -1)
+               goto err;
+       if ((oflags & O_NONBLOCK) == 0) {
+               nflags |= O_NONBLOCK;
+               if (fcntl(s, F_SETFL, nflags) == -1)
+                       goto err;
+       }
+
+       if ((d = accept(s, addr, addrlen)) == -1) {
+               if (errno != EAGAIN) /* XXX Add others? */
+                       goto end;
+       } else
+               goto end;
+
+       /*
+        * EAGAIN, poll until completion, timeout or ring event.
+        */
+       fd.fd = d;
+       fd.events = POLLIN;
+       if ((error = pthread_poll_ring(&fd, 1, timeout, ring)) == 1 &&
+           (fd.revents & POLLIN) != 0)
+               error = 0;
+       else
+               error = errno;
+
+end:
+       /*
+        * Restore supplied descriptor fcntl status flags if necessary
+        */
+       if (nflags != oflags)
+               (void) fcntl(s, F_SETFL, oflags);
+
+       if (error != 0) {
+               if (d != -1) {
+                       (void) close(d);
+                       d = -1;
+               }
+               errno = error;
+               goto err;
+       }
+
+       DEBUG_PTHREAD_EXIT();
+       return d;
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return -1;
+}
+
+/*
+ * connect(2) replacement which can both observe a timeout and be interrupted
+ * via pthread_ring_t events.  Internally implemented using
+ * pthread_poll_ring().  Will internally set the descriptor in non-blocking
+ * mode if necessary, then reverting it back to the mode it was supplied in.
+ * Returns 0 on success, or -1, in which case errno is set.  errno can be
+ * EINVAL, ETIMEDOUT, ECANCELED or others.
+ * Timeout is in milliseconds, like for poll(2) and can be -1.
+ * For the application to know the actual connection status result, it should
+ * poll until completion and verify the status using getsockopt(2) with
+ * SOL_SOCKET level and SO_ERROR option.  It can alternatively continue to call
+ * this function in a loop until completion.  Calling the function on an
+ * already connected socket will result in EISCONN.
+ */
+int
+pthread_connect_ring(int s, const struct sockaddr *name, socklen_t namelen,
+    int timeout, pthread_ring_t *ring)
+{
+       int             oflags, nflags, error = 0;
+       struct pollfd   fd;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (!pthread_poll_initialized) {
+               errno = EINVAL;
+               goto err;
+       }
+
+       /*
+        * First get current fcntl status flags, and set descriptor to
+        * non-blocking mode if necessary.
+        */
+       if ((oflags = nflags = fcntl(s, F_GETFL)) == -1)
+               goto err;
+       if ((oflags & O_NONBLOCK) == 0) {
+               nflags |= O_NONBLOCK;
+               if (fcntl(s, F_SETFL, nflags) == -1)
+                       goto err;
+       }
+
+       if ((error = connect(s, name, namelen)) == -1) {
+               if (errno != EINPROGRESS && errno != EALREADY) {
+                       error = errno;
+                       goto end;
+               }
+       } else
+               goto end;
+
+       /*
+        * EINPROGRESS or EALREADY, poll until completion, timeout or ring
+        * event.
+        */
+       fd.fd = s;
+       fd.events = POLLOUT;
+       if (pthread_poll_ring(&fd, 1, timeout, ring) == 1 &&
+           (fd.revents & POLLOUT) != 0) {
+               socklen_t       l;
+
+               /*
+                * connect(2) completed, return result
+                */
+               if (getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &l) == -1)
+                       error = errno;
+       }
+
+end:
+       /*
+        * Restore supplied descriptor fcntl status flags if necessary
+        */
+       if (nflags != oflags)
+               (void) fcntl(s, F_SETFL, oflags);
+
+       if (error != 0) {
+               errno = error;
+               goto err;
+       }
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return -1;
+}
diff --git a/mmsoftware/pthread_util/mm_pthread_poll.h b/mmsoftware/pthread_util/mm_pthread_poll.h
new file mode 100644 (file)
index 0000000..729862f
--- /dev/null
@@ -0,0 +1,63 @@
+/* $Id: mm_pthread_poll.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_POLL_H
+#define MM_PTHREAD_POLL_H
+
+
+
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include <mm_pthread_msg.h>
+
+
+
+extern int     pthread_poll_init(void);
+extern int     pthread_poll_ring(struct pollfd *, nfds_t, int,
+                   pthread_ring_t *);
+extern int     pthread_accept_ring(int, struct sockaddr *, socklen_t *, int,
+                   pthread_ring_t *);
+extern int     pthread_connect_ring(int, const struct sockaddr *, socklen_t,
+                   int, pthread_ring_t *);
+
+
+
+#endif
diff --git a/mmsoftware/pthread_util/mm_pthread_pool.c b/mmsoftware/pthread_util/mm_pthread_pool.c
new file mode 100644 (file)
index 0000000..6684c00
--- /dev/null
@@ -0,0 +1,504 @@
+/* $Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2004-2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Implementation of a pool of ready threads which adapts with concurrency
+ * needs.  These ready threads can serve requests passed through efficient
+ * inter-thread messaging.  mmpool(3) is used for the pool functionality.
+ */
+
+
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <errno.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_pool.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2004-2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+/*
+ * STATIC FUNCTIONS PROTOTYPES
+ */
+
+inline static pthread_object_t *thread_object_alloc(void);
+inline static void             thread_object_free(pthread_object_t *);
+static bool                    thread_object_constructor(pnode_t *);
+static void                    thread_object_destructor(pnode_t *);
+static void                    *thread_object_main(void *);
+
+
+
+/*
+ * GLOBALS
+ */
+
+static bool                    thread_object_initialized = FALSE;
+static pthread_attr_t          thread_object_attr;
+static pool_t                  thread_object_pool;
+static pool_t                  thread_object_msg_pool;
+static pthread_mutex_t         thread_object_pool_mutex =
+                                   PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t         thread_object_msg_pool_mutex =
+                                   PTHREAD_MUTEX_INITIALIZER;
+static pthread_ring_t          thread_started_ring;
+
+
+
+/*
+ * EXPORTED PUBLIC FUNCTIONS
+ */
+
+/*
+ * Must be called to initialize the pthreads pool subsystem, before calling
+ * any other function of this API.  Returns 0 on success, or an error number.
+ * <initial> threads are launched, and more will be launched in increments
+ * of <initial> whenever necessary.  These will also only be destroyed in
+ * decrements of <initial> whenever that many threads have not been in use for
+ * some time, and a minimum of <initial> threads will always be kept.
+ * Setting <initial> to high values may actually degrade performance with some
+ * unefficient threading implementations.  It is not recommended to use more
+ * than 8 using the pth(3) library.  Using NetBSD 2.0+ SA threads, a high
+ * number does not reduce performance.  We current do not observe any limit
+ * whatsoever according to the number of threads launched over time.  It is the
+ * application's responsibility to ensure to observe decent concurrency limits
+ * before calling pthread_object_call().
+ */
+int
+pthread_object_init(int initial)
+{
+       int     error = 0;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (thread_object_initialized) {
+               error = EINVAL;
+               goto err;
+       }
+
+       /*
+        * Create attributes which will be used for threads of the pool.
+        * We want them to be joinable.
+        */
+       if ((error = pthread_attr_init(&thread_object_attr)) != 0)
+               goto err;
+       if ((error = pthread_attr_setdetachstate(&thread_object_attr, 0))
+           != 0)
+               goto err;
+
+       /*
+        * We use this ring to obtain notification of ready children when
+        * launching them.  This is required for proper synchronization to
+        * avoid aweful race conditions.
+        */
+       if ((error = pthread_ring_init(&thread_started_ring)) != 0)
+               goto err;
+
+       /*
+        * First initialize the message subsystem pool
+        */
+       if (!pool_init(&thread_object_msg_pool, "thread_object_msg_pool",
+           malloc, free, NULL, NULL, sizeof(pthread_object_msg_t),
+           32768 / sizeof(pthread_object_msg_t), 1, 0)) {
+               error = ENOMEM;
+               goto err;
+       }
+
+       /*
+        * Now initialize the threads pool.  This creates threads, uses
+        * synchronization with thread_started_ring, and uses the message
+        * subsystem, which all must be initialized and ready.
+        */
+       if (!pool_init(&thread_object_pool, "thread_object_pool",
+           malloc, free, thread_object_constructor, thread_object_destructor,
+           sizeof(pthread_object_t), initial, 1, 0)) {
+               error = ENOMEM;
+               goto err;
+       }
+
+       thread_object_initialized = TRUE;
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+
+err:
+       if (POOL_VALID(&thread_object_msg_pool))
+               pool_destroy(&thread_object_msg_pool);
+       if (POOL_VALID(&thread_object_pool))
+               pool_destroy(&thread_object_pool);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Allows allocation/creation of a message suitable for asynchronous requests
+ * with the threads via their main message port provided by this system.
+ * Returns new message, or NULL on error.
+ */
+inline pthread_object_msg_t *
+pthread_object_msg_alloc(void)
+{
+       pthread_object_msg_t    *msg = NULL;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (pthread_mutex_lock(&thread_object_msg_pool_mutex) != 0)
+               goto err;
+       msg = (pthread_object_msg_t *)pool_alloc(&thread_object_msg_pool,
+           FALSE);
+       (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
+
+       (void) pthread_msg_init(&msg->message, NULL);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return msg;
+}
+
+/*
+ * Permits to free/destroy a message which was allocated using
+ * pthread_object_msg_alloc() and sent asynchroneously.
+ */
+inline int
+pthread_object_msg_free(pthread_object_msg_t *msg)
+{
+       int     error = 0;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       (void) pthread_msg_destroy(&msg->message);
+
+       if ((error = pthread_mutex_lock(&thread_object_msg_pool_mutex)) != 0)
+               goto err;
+       (void) pool_free((pnode_t *)msg);
+       (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Allows to invoke a thread of the pool to perform execution of the wanted
+ * function.  This is very efficient since the threads are already created and
+ * are waiting for requests.  There is no maximum concurrency limit enforced by
+ * this system; It is the responsibility of the application to restrict
+ * concurrency as necessary by keeping internal information on the current
+ * number of requests.  0 is returned on success, or an error number.
+ * XXX Add support for synchroneous and asynchroneous operation.  Current
+ * operation is only asynchroneous, but we would like to add a boolean here to
+ * decide.  We also could add back the result value of the thread function
+ * which would only be useful in synchroneous operation, when we are waiting
+ * until the task ends...  Of course, it's still easy for applications to use
+ * these in a synchroneous manner, by using a message and/or ring,
+ * conditionnal variable, etc.
+ * Also evaluate if a callback function to be called to notify end of
+ * asynchroneous operation would be useful.
+ */
+int
+pthread_object_call(pthread_port_t **port,
+    void (*function)(pthread_object_t *, void *), void *args)
+{
+       pthread_object_t        *obj = NULL;
+       pthread_object_msg_t    *msg = NULL;
+       int                     error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (function == NULL) {
+               error = EINVAL;
+               goto err;
+       }
+
+       /*
+        * Allocate a thread from the pool to reserve it, and tell it to call
+        * a function via a message.  The message cannot be on the stack in
+        * this case, since it holds arguments to be passed to a thread, and
+        * also consists of an asynchroneous message for wich we do not expect
+        * a response back, waiting for it.  We just dispatch it and go on.
+        */
+       if ((obj = thread_object_alloc()) == NULL) {
+               error = ENOMEM;
+               goto err;
+       }
+       if ((msg = pthread_object_msg_alloc()) == NULL) {
+               error = ENOMEM;
+               goto err;
+       }
+
+       msg->command = PTHREAD_OBJ_CALL;
+       msg->u.call.function = function;
+       msg->u.call.arguments = args;
+       if ((error = pthread_msg_put(obj->port, &msg->message)) != 0)
+               goto err;
+
+       /*
+        * Everything successful;
+        * If caller wants the message port of the thread, supply it
+        */
+       if (port != NULL)
+               *port = obj->port;
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+
+err:
+       if (msg != NULL)
+               pthread_object_msg_free(msg);
+       if (obj != NULL)
+               thread_object_free(obj);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+
+
+/*
+ * INTERNAL STATIC FUNCTIONS
+ */
+
+/*
+ * Internally used to allocate a ready thread from the pool.
+ */
+inline static pthread_object_t *
+thread_object_alloc(void)
+{
+       pthread_object_t        *obj = NULL;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (pthread_mutex_lock(&thread_object_pool_mutex) != 0)
+               goto err;
+       obj = (pthread_object_t *)pool_alloc(&thread_object_pool, FALSE);
+       (void) pthread_mutex_unlock(&thread_object_pool_mutex);
+
+err:
+       return obj;
+}
+
+/*
+ * Internally used to free a no longer needed thread back to the pool of ready
+ * threads.
+ */
+inline static void
+thread_object_free(pthread_object_t *obj)
+{
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if (pthread_mutex_lock(&thread_object_pool_mutex) == 0) {
+               (void) pool_free((pnode_t *)obj);
+               (void) pthread_mutex_unlock(&thread_object_pool_mutex);
+       }
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+/*
+ * Internally called by mmpool(3) to create a thread object.
+ */
+static bool
+thread_object_constructor(pnode_t *pnode)
+{
+       pthread_object_t        *obj = (pthread_object_t *)pnode;
+       int                     success = TRUE;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       /*
+        * Note that we leave thread_object_main() initialize the port field
+        * when it creates its port and ring.
+        */
+       if (pthread_create(&obj->thread, &thread_object_attr,
+           thread_object_main, obj) != 0) {
+               success = FALSE;
+               goto err;
+       }
+
+       /*
+        * Wait until new thread ready notification.  Without this, at least
+        * with NetBSD 2.0 SA threads, hell would break loose.  Thread creation
+        * isn't really a bottleneck in our case anyways, since we only need
+        * to do it when all threads of the pool are already busy.
+        */
+       (void) pthread_ring_wait(&thread_started_ring, NULL);
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return success;
+}
+
+/*
+ * Internally called by mmpool(3) to destroy a thread object.
+ */
+static void
+thread_object_destructor(pnode_t *pnode)
+{
+       pthread_object_t        *obj = (pthread_object_t *)pnode;
+       pthread_object_msg_t    *msg;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       /*
+        * To be freed, the thread has to be terminated.  We thus send it a
+        * quit message and then wait for it to exit using pthread_join().
+        * Note that we let the thread destroy the port field.  Although we
+        * theoretically could use a message on the stack here, let's be safe.
+        * Thread destruction is only performed rarely anyways, so this isn't
+        * a performance problem.
+        */
+       if ((msg = pthread_object_msg_alloc()) != NULL) {
+               msg->command = PTHREAD_OBJ_QUIT;
+               (void) pthread_msg_put(obj->port, &msg->message);
+       }
+       (void) pthread_join(obj->thread, NULL);
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+/*
+ * Actual thread's main loop.  We create a message port and listen for command
+ * messages (quit and call).  When we obtain a quit request, we destroy the
+ * port and exit cleanly.  The quit event can never occur during the execution
+ * of a call command, since it is only called on already freed thread nodes
+ * (by mmpool(3) pool_free()).  It is advized to applications which need to
+ * obtain and use the port of the thread after thread_object_call() to only
+ * send proper user messages, not system reserved ones.
+ */
+static void *
+thread_object_main(void *args)
+{
+       pthread_object_t        *obj = (pthread_object_t *)args;
+       pthread_port_t          port;
+       pthread_ring_t          ring;
+       pthread_msg_t           *imsg;
+       pthread_object_msg_t    *msg;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       /*
+        * Create our incomming message port as well as its corresponding
+        * notification ring we can sleep on.  Then advertize our port address.
+        * Ideally, we should somehow panic if any of this initialization
+        * fails. XXX
+        */
+       (void) pthread_port_init(&port);
+       (void) pthread_ring_init(&ring);
+       (void) pthread_port_set_ring(&port, &ring);
+       obj->port = &port;
+
+       /*
+        * Notify parent that we are ready, so that it may proceed
+        */
+       (void) pthread_ring_notify(&thread_started_ring);
+
+       /*
+        * Main loop, which keeps executing until we obtain a PTHREAD_OBJ_QUIT
+        * message, at which event we cleanly exit.
+        */
+       for (;;) {
+               /*
+                * Wait for any message(s) to be available, without taking any
+                * CPU time.
+                */
+               (void) pthread_ring_wait(&ring, NULL);
+
+               /*
+                * We were awaken because at least one message is available.
+                * Process all messages in the queue.
+                */
+               while ((imsg = pthread_msg_get(&port)) != NULL) {
+                       msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]);
+                       if (msg->command == PTHREAD_OBJ_QUIT) {
+                               /*
+                                * We are ordered to exit by the object
+                                * destructor.
+                                */
+                               pthread_object_msg_free(msg);
+                               goto end;
+                       }
+                       if (msg->command == PTHREAD_OBJ_CALL) {
+                               /*
+                                * Request to execute a function.  This means
+                                * that we were allocated/reserved first.
+                                */
+                               msg->u.call.function(obj,
+                                   msg->u.call.arguments);
+                               pthread_object_msg_free(msg);
+                               /*
+                                * Free/release us back, so that we be
+                                * available again to process further
+                                * requests.  It is possible that freeing
+                                * ourselves cause a PTHREAD_OBJ_QUIT message
+                                * to be queued soon on our port by the
+                                * destructor function.  This is safe, since
+                                * the destructor does not cause us to be
+                                * destroyed until it waits for us to have
+                                * ended cleanly using pthread_join().
+                                */
+                               thread_object_free(obj);
+                       }
+               }
+       }
+
+end:
+       /*
+        * Discard messages that are still queued on our port (if any)
+        */
+       while ((imsg = pthread_msg_get(&port)) != NULL) {
+               msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]);
+               pthread_object_msg_free(msg);
+       }
+       /*
+        * Free our resources and exit.
+        */
+       (void) pthread_port_destroy(&port);
+       (void) pthread_ring_destroy(&ring);
+
+       DEBUG_PTHREAD_EXIT();
+       pthread_exit(NULL);
+
+       /* NOTREACHED */
+       return NULL;
+}
diff --git a/mmsoftware/pthread_util/mm_pthread_pool.h b/mmsoftware/pthread_util/mm_pthread_pool.h
new file mode 100644 (file)
index 0000000..96ed45d
--- /dev/null
@@ -0,0 +1,97 @@
+/* $Id: mm_pthread_pool.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2004-2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_POOL_H
+#define MM_PTHREAD_POOL_H
+
+
+
+#include <pthread.h>
+
+#include <mmtypes.h>
+#include <mmpool.h>
+
+#include <mm_pthread_msg.h>
+
+
+
+typedef struct {
+       pnode_t         node;
+       pthread_t       thread;
+       pthread_port_t  *port;
+} pthread_object_t;
+
+typedef struct {
+       pnode_t         node;
+       pthread_msg_t   message;
+       int             command;
+       union {
+               /* PTHREAD_OBJ_CALL, sent to thread_object_main() */
+               struct {
+                       void    (*function)(pthread_object_t *, void *);
+                       void    *arguments;
+               } call;
+               /* PTHREAD_OBJ_QUIT, sent to thread_oject_reaper() */
+               pthread_object_t *quit;
+               /* PTHREAD_OBJ_USER, custom user messages */
+               struct {
+                       int     user_command;
+                       void    *user_data;
+               } user;
+       } u;
+} pthread_object_msg_t;
+
+enum pthread_object_commands {
+       PTHREAD_OBJ_CALL,
+       PTHREAD_OBJ_QUIT,
+       PTHREAD_OBJ_USER,
+       PTHREAD_OBJ_MAX
+};
+
+
+
+extern int                             pthread_object_init(int);
+extern inline pthread_object_msg_t     *pthread_object_msg_alloc(void);
+extern inline int                      pthread_object_msg_free(
+                                           pthread_object_msg_t *);
+extern int                             pthread_object_call(pthread_port_t **,
+                                           void (*)(pthread_object_t *,
+                                           void *), void *);
+
+
+
+#endif
diff --git a/mmsoftware/pthread_util/mm_pthread_sleep.c b/mmsoftware/pthread_util/mm_pthread_sleep.c
new file mode 100644 (file)
index 0000000..24badc4
--- /dev/null
@@ -0,0 +1,285 @@
+/* $Id: mm_pthread_sleep.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_msg.h>
+#include <mm_pthread_sleep.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_sleep.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+static int             pthread_sleep_proc_init(void);
+static void            pthread_sleep_proc_init2(void);
+static int             pthread_sleep_thread_init(pthread_ring_t **);
+static void            pthread_sleep_thread_exit(void *);
+
+static pthread_key_t   pthread_sleep_proc_key;
+static pthread_once_t  pthread_sleep_proc_initialized = PTHREAD_ONCE_INIT;
+
+
+
+static int
+pthread_sleep_proc_init(void)
+{
+       int     error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       error = pthread_key_create(&pthread_sleep_proc_key,
+           pthread_sleep_thread_exit);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+static void
+pthread_sleep_proc_init2(void)
+{
+       int     error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if ((error = pthread_sleep_proc_init()) != 0) {
+               (void) fprintf(stderr, "pthread_sleep_proc_init() - %s\n",
+                   strerror(error));
+               DEBUG_PTHREAD_EXIT();
+               exit(EXIT_FAILURE);
+       }
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+static int
+pthread_sleep_thread_init(pthread_ring_t **res)
+{
+       int             error;
+       pthread_ring_t  *ring;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       if ((ring = malloc(sizeof(pthread_ring_t))) == NULL) {
+               error = ENOMEM;
+               goto err;
+       }
+
+       if ((error = pthread_ring_init(ring)) != 0)
+               goto err;
+
+       if ((error = pthread_setspecific(pthread_sleep_proc_key, ring)) != 0)
+               goto err;
+
+       *res = ring;
+
+       DEBUG_PTHREAD_EXIT();
+       return 0;
+
+err:
+       if (ring != NULL)
+               free(ring);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+static void
+pthread_sleep_thread_exit(void *specific)
+{
+       pthread_ring_t  *ring = (pthread_ring_t *)specific;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       (void) pthread_ring_destroy(ring);
+       free(ring);
+
+       /*
+        * Although NetBSD threads don't need this, some pthread
+        * implementations do.  Some will crash for attempting to reference the
+        * already freed memory twice calling us again until we NULL the
+        * pointer for the data.  Lame, but the POSIX standard was unclear
+        * about this.
+        */
+       (void) pthread_setspecific(pthread_sleep_proc_key, NULL);
+
+       DEBUG_PTHREAD_EXIT();
+}
+
+
+
+/*
+ * Suspends the calling thread for duration specified in supplied timespec.
+ * Returns 0 on success, or an error number.
+ */
+int
+pthread_nanosleep(struct timespec *ts)
+{
+       int             error;
+       struct timeval  tv;
+       struct timespec its;
+       pthread_ring_t  *ring;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       /*
+        * Process specific initialization if needed
+        */
+       if ((error = pthread_once(&pthread_sleep_proc_initialized,
+           pthread_sleep_proc_init2)) != 0)
+               goto err;
+       /*
+        * Thread specific initialization if needed
+        * XXX Use pthread_once() here too, or mutex around ring?
+        */
+       if ((ring = pthread_getspecific(pthread_sleep_proc_key)) == NULL) {
+               if ((error = pthread_sleep_thread_init(&ring)) != 0)
+                       goto err;
+       }
+
+       /*
+        * Generate absolute time timespec using current time and supplied
+        * timespec delay.
+        */
+       if (gettimeofday(&tv, NULL) == -1) {
+               error = errno;
+               goto err;
+       }
+       TIMEVAL_TO_TIMESPEC(&tv, &its);
+       timespecadd(&its, ts, &its);
+
+       /*
+        * We can finally sleep.  We expect ETIMEDOUT to be the normal return
+        * value in this case, which we convert to a no-error.  Other errors
+        * will be returned un changed.
+        */
+       if ((error = pthread_ring_wait(ring, &its)) == ETIMEDOUT)
+               error = 0;
+
+err:
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Suspends the current thread for the duration specified into supplied
+ * timeval.  Returns 0 on success or an error number.
+ */
+int
+pthread_microsleep(struct timeval *tv)
+{
+       struct timespec ts;
+       int             error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       TIMEVAL_TO_TIMESPEC(tv, &ts);
+       error = pthread_nanosleep(&ts);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Suspends execution of current thread for duration of specified
+ * milliseconds.  Returns 0 on success or an error number.
+ */
+int
+pthread_millisleep(unsigned int ms)
+{
+       struct timeval  tv;
+       int             error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       tv.tv_sec = ms / 1000;
+       tv.tv_usec = (ms % 1000) * 1000;
+       error = pthread_microsleep(&tv);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Suspends execution of thread for duration of specified number of seconds.
+ * Returns 0 on success or an error number.
+ */
+unsigned int
+pthread_sleep(unsigned int seconds)
+{
+       struct timespec ts;
+       int             error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       ts.tv_sec = seconds;
+       ts.tv_nsec = 0;
+       error = pthread_nanosleep(&ts);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
+
+/*
+ * Suspends execution of thread for durection of specified number of
+ * microseconds.  Like usleep(3).
+ */
+int
+pthread_usleep(useconds_t ms)
+{
+       struct timeval  tv;
+       int             error;
+
+       DEBUG_PTHREAD_ENTRY();
+
+       tv.tv_sec = 0;
+       tv.tv_usec = ms;
+       error = pthread_microsleep(&tv);
+
+       DEBUG_PTHREAD_EXIT();
+       return error;
+}
diff --git a/mmsoftware/pthread_util/mm_pthread_sleep.h b/mmsoftware/pthread_util/mm_pthread_sleep.h
new file mode 100644 (file)
index 0000000..e76b876
--- /dev/null
@@ -0,0 +1,57 @@
+/* $Id: mm_pthread_sleep.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_SLEEP_H
+#define MM_PTHREAD_SLEEP_H
+
+
+
+#include <sys/time.h>
+#include <pthread.h>
+#include <unistd.h>
+
+
+
+extern int             pthread_nanosleep(struct timespec *);
+extern int             pthread_microsleep(struct timeval *);
+extern int             pthread_millisleep(unsigned int);
+extern unsigned int    pthread_sleep(unsigned int);
+extern int             pthread_usleep(useconds_t);
+
+
+
+#endif
diff --git a/mmsoftware/pthread_util/tests/msg_test.c b/mmsoftware/pthread_util/tests/msg_test.c
new file mode 100644 (file)
index 0000000..c97ca88
--- /dev/null
@@ -0,0 +1,269 @@
+/* $Id: msg_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#include <stdio.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdarg.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: msg_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $");
+
+
+
+#define        THREADS                 32
+#define ROUNDS                 8
+#define TIMEOUT                        1
+/*#define PRINTLOCK*/
+/*#define NOPRINT*/
+
+
+
+struct message {
+       pthread_msg_t   node;
+       int             id, i;
+};
+
+
+
+int            main(void);
+static void    threadfunc(pthread_object_t *, void *);
+static void    printfunc(const char *, ...);
+
+
+
+static pthread_port_t  main_port;
+static pthread_mutex_t print_lock;
+
+
+
+int
+main(void)
+{
+       pthread_ring_t  ring;
+       struct message  *msg;
+       int             i, err;
+       int             threads_args[THREADS];
+       struct timeval  tv;
+       struct timespec ts, ts1;
+
+       if ((err = pthread_mutex_init(&print_lock, NULL)) != 0) {
+               (void) printf("main() - stdout lock - %s\n", strerror(err));
+               exit(EXIT_FAILURE);
+       }
+
+       if ((err = pthread_port_init(&main_port)) != 0 ||
+           (err = pthread_ring_init(&ring)) != 0 ||
+           (err = pthread_port_set_ring(&main_port, &ring)) != 0) {
+               printfunc("main() - initialization - %s\n", strerror(err));
+               exit(EXIT_FAILURE);
+       }
+
+       printfunc("Main: launching threads\n");
+
+       if ((err = pthread_poll_init()) != 0) {
+               printfunc("main() - pthread_poll_init() - %s\n",
+                   strerror(err));
+               exit(EXIT_FAILURE);
+       }
+
+       /*
+        * Initializes a poll of ready threads which can be dispatched
+        * functions to execute.
+        */
+       if ((err = pthread_object_init(THREADS + 1)) != 0) {
+               printfunc("main() - pthread_object_init() - %s\n",
+                   strerror(err));
+               exit(EXIT_FAILURE);
+       }
+
+       /*
+        * Now dispatch a main reentrant function to many threads, without
+        * waiting for them to complete, in an asynchroneous manner.
+        * XXX Because of the way this works, the parent main thread should
+        * actually already be listening to messages...  We did create a port
+        * however, which should queue messages until we reach the main loop.
+        */
+       for (i = 0; i < THREADS; i++) {
+               threads_args[i] = i;
+               if ((err = pthread_object_call(NULL, threadfunc,
+                   &threads_args[i])) != 0)
+                       printfunc("main() - pthread_object_call() - %s\n",
+                           strerror(errno));
+       }
+
+       ts1.tv_sec = TIMEOUT;
+       ts1.tv_nsec = 0;
+       for (;;) {
+               /*
+                * Read messages as long as there are any, and reply to each
+                * of them in a synchroneous manner.
+                */
+               while ((msg = (struct message *)pthread_msg_get(&main_port))
+                   != NULL) {
+
+                       printfunc(
+                           "Main: Received message %d from thread #%d\n",
+                           msg->i, msg->id);
+
+                       if ((err = pthread_msg_reply((pthread_msg_t *)msg))
+                           != 0)
+                               printfunc(
+                                   "Main: pthread_message_reply() - %s\n",
+                                   strerror(err));
+               }
+
+               /*
+                * No more messages to process; Wait for any message(s) to be
+                * available.
+                * Note that there is special provision in the event where
+                * this loop first polling for new messages before processing
+                * them, which causes waiting for the ring to immediately
+                * return instead of actually waiting if any messages already
+                * have been sent.
+                */
+               printfunc("Main: Waiting for messages\n");
+
+               (void) gettimeofday(&tv, NULL);
+               TIMEVAL_TO_TIMESPEC(&tv, &ts);
+               timespecadd(&ts, &ts1, &ts);
+               if ((err = pthread_ring_wait(&ring, &ts)) != 0) {
+                       printfunc("Main: pthread_ring_wait() - %s\n",
+                           strerror(err));
+                       break;
+               }
+       }
+
+       (void) pthread_mutex_destroy(&print_lock);
+       (void) pthread_port_destroy(&main_port);
+       (void) pthread_ring_destroy(&ring);
+
+       return 0;
+}
+
+static void
+threadfunc(pthread_object_t *obj, void *args)
+{
+       int             id = *(int *)args;
+       int             i, err;
+       struct message  msg;
+       pthread_port_t  rport;
+       pthread_ring_t  rring;
+
+       if ((err = pthread_port_init(&rport)) != 0 ||
+           (err = pthread_ring_init(&rring)) != 0 ||
+           (err = pthread_port_set_ring(&rport, &rring)) != 0 ||
+           (err = pthread_msg_init((pthread_msg_t *)&msg, &rport)) != 0) {
+               printfunc("threadfunc() - initialization - %s\n",
+                   strerror(err));
+               return;
+       }
+
+       msg.id = id;
+
+       (void) printfunc("Thread #%d started\n", id);
+
+       for (i = 0; i < ROUNDS; i++) {
+               /*
+                * Prepare and send synchronous message.  For asynchronous
+                * operation, we would need to allocate a message and to send
+                * it, and not expect a reply back immediately, even letting
+                * the other end free the message as necessary.  In synchronous
+                * mode we can use the same message over and over and share
+                * its memory area using proper send/reply methods for
+                * synchronization.
+                */
+               msg.i = i;
+               if ((err = pthread_msg_put(&main_port, (pthread_msg_t *)&msg))
+                   != 0)
+                       printfunc("Thread: pthread_message_put() - %s\n",
+                           strerror(err));
+
+               /* Now wait for synchronous reply and discard it */
+               if ((err = pthread_ring_wait(&rring, NULL)) != 0) {
+                       printfunc("Thread: pthread_ring_wait() - %s\n",
+                           strerror(err));
+                       break;
+               }
+               if (pthread_msg_get(&rport) == NULL)
+                       printfunc("Thread: pthread_msg_get() == NULL!?\n");
+               printfunc("Thread #%d received reply message for %d\n",
+                   id, i);
+       }
+
+       printfunc("Thread #%d ending\n", id);
+
+       (void) pthread_port_destroy(&rport);
+       (void) pthread_ring_destroy(&rring);
+       (void) pthread_msg_destroy((pthread_msg_t *)&msg);
+}
+
+static void
+printfunc(const char *fmt, ...)
+{
+       char    buf[1024];
+       va_list arg_ptr;
+       int     len;
+
+#ifdef NOPRINT
+       return;
+#endif
+
+       *buf = '\0';
+       va_start(arg_ptr, fmt);
+       if ((len = vsnprintf(buf, 1023, fmt, arg_ptr)) < 1)
+               return;
+       va_end(arg_ptr);
+
+#ifdef PRINTLOCK
+       (void) pthread_mutex_lock(&print_lock);
+#endif
+       (void) fwrite(buf, len, 1, stdout);
+#ifdef PRINTLOCK
+       (void) fflush(stdout);
+       (void) pthread_mutex_unlock(&print_lock);
+#endif
+}
diff --git a/mmsoftware/pthread_util/tests/poll_test.c b/mmsoftware/pthread_util/tests/poll_test.c
new file mode 100644 (file)
index 0000000..843dd78
--- /dev/null
@@ -0,0 +1,73 @@
+/* $Id: poll_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ *    must display the following acknowledgement:
+ *      This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ *    products derived from this software without specific prior written
+ *    permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ *    any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#include <stdio.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdarg.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: poll_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $");
+
+
+
+int    main(void);
+
+
+
+int
+main(void)
+{
+       int     err;
+
+       if ((err = pthread_poll_init()) != 0) {
+               (void) fprintf(stderr, "main() - pthread_poll_init() - %s\n",
+                   strerror(err));
+               exit(EXIT_FAILURE);
+       }
+
+       return 0;
+}
diff --git a/mmsoftware/pthread_util/tests/polltest.c b/mmsoftware/pthread_util/tests/polltest.c
new file mode 100644 (file)
index 0000000..ceedfd5
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * The goal of this program is to verify if it is valid for multiple threads
+ * to poll(2) on the same filedescriptor, and if so, what happens whenever
+ * an event is triggered on that descriptor.
+ *
+ * XXX Problems:
+ * - Only one of the polling threads seems to be awaken when an event occurs
+ *   on the descriptor. This probably means that using a signal would be
+ *   better... I sure don't want to need a filedescriptor per ring...
+ *   If I did however, would this really hurt? Are there that many rings?
+ *   But oops, this actually means two filedescriptors for each!
+ *   using a signal is probably better. However, we then need to clobber some
+ *   signal... We could use SIGUSR2.
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+
+
+#define THREADS        8
+
+
+
+int            main(void);
+static void    *thread_poll(void *);
+static void    *thread_notify(void *);
+static void    thread_print(int, const char *);
+
+
+
+static int             sockets[2];
+static int             threadargs[THREADS];
+static pthread_mutex_t print_mutex;
+static pthread_mutex_t sockets_mutex;
+
+
+
+int
+main(void)
+{
+       pthread_t       threadid;
+       int             i;
+
+       /*
+        * Create socketpair which will be used to trigger events to awaken
+        * polling threads.
+        */
+       if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, sockets) != 0) {
+               perror("socketpair()");
+               exit(EXIT_FAILURE);
+       }
+       if (fcntl(sockets[0], F_SETFL, O_NONBLOCK) != 0 ||
+           fcntl(sockets[1], F_SETFL, O_NONBLOCK) != 0) {
+               perror("fcntl()");
+               exit(EXIT_FAILURE);
+       }
+
+       pthread_mutex_init(&print_mutex, NULL);
+       pthread_mutex_init(&sockets_mutex, NULL);
+
+       /*
+        * First launch THREADS polling threads
+        */
+       for (i = 0; i < THREADS; i++) {
+               threadargs[i] = i;
+               pthread_create(&threadid, NULL, thread_poll, &threadargs[i]);
+       }
+       sleep(1);
+
+       /*
+        * And finally launch notifyer thread
+        */
+       pthread_create(&threadid, NULL, thread_notify, NULL);
+
+       /*
+        * Now just wait
+        */
+       for (;;)
+               (void) pause();
+}
+
+static void *
+thread_poll(void *args)
+{
+       struct pollfd   fds[1];
+       int             n;
+       int             id = *(int *)args;
+       char            c;
+
+       fds[0].fd = sockets[1];
+       fds[0].events = POLLIN;
+       for (;;) {
+               thread_print(id, "Polling");
+               if ((n = poll(fds, 1, -1)) == -1) {
+                       perror("poll()");
+                       return NULL;
+               }
+               thread_print(id, "Poll returned");
+               if (n == 0) {
+                       thread_print(id, "Woke up! (no data)");
+                       continue;
+               }
+               if ((fds[0].revents & POLLIN) != 0) {
+                       /* Attempt to read event/byte */
+                       thread_print(id, "Woke up! (with data)");
+                       pthread_mutex_lock(&sockets_mutex);
+                       while ((n = read(sockets[1], &c, 1)) == 1)
+                               thread_print(id, "Read data!");
+                       if (n == -1)
+                               thread_print(id, strerror(errno));
+                       pthread_mutex_unlock(&sockets_mutex);
+               }
+       }
+}
+
+/* ARGSUSED */
+static void *
+thread_notify(void *args)
+{
+       char            c = '\0';
+       struct pollfd   fds[1];
+
+       fds[0].fd = sockets[0];
+       fds[0].events = POLLOUT;
+       for (;;) {
+               sleep(1);
+               thread_print(-1, "Notifying");
+               pthread_mutex_lock(&sockets_mutex);
+               if (write(sockets[0], &c, 1) != 1) {
+                       /* Poll until we can send data */
+                       (void) poll(fds, 1, -1);
+               }
+               pthread_mutex_unlock(&sockets_mutex);
+       }
+}
+
+static void
+thread_print(int id, const char *str)
+{
+
+       pthread_mutex_lock(&print_mutex);
+       printf("%d: %s\n", id, str);
+       pthread_mutex_unlock(&print_mutex);
+}
diff --git a/mmsoftware/pthread_util/tests/sigtest.c b/mmsoftware/pthread_util/tests/sigtest.c
new file mode 100644 (file)
index 0000000..cc4b681
--- /dev/null
@@ -0,0 +1,146 @@
+/*
+ * The goal of this program is to verify if it is valid for multiple threads
+ * to be awaken from a poll(2) call by a single process-wide signal. This
+ * would allow the notifyer of a thread message event to generate this signal
+ * if needed to cause interested treads to wake up. Threads which do not want
+ * to receive the signal can simply ignore it using pthread_sigmask().
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+
+
+#define THREADS        8
+
+
+
+int            main(void);
+static void    *thread_poll(void *);
+static void    *thread_notify(void *);
+static void    thread_print(int, const char *);
+
+
+
+static int             sockets[2];
+static int             threadargs[THREADS];
+static pthread_mutex_t print_mutex;
+static pthread_mutex_t sockets_mutex;
+
+
+
+int
+main(void)
+{
+       pthread_t       threadid;
+       int             i;
+
+       /*
+        * Create socketpair which will be used to trigger events to awaken
+        * polling threads.
+        */
+       if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, sockets) != 0) {
+               perror("socketpair()");
+               exit(EXIT_FAILURE);
+       }
+       if (fcntl(sockets[0], F_SETFL, O_NONBLOCK) != 0 ||
+           fcntl(sockets[1], F_SETFL, O_NONBLOCK) != 0) {
+               perror("fcntl()");
+               exit(EXIT_FAILURE);
+       }
+
+       pthread_mutex_init(&print_mutex, NULL);
+       pthread_mutex_init(&sockets_mutex, NULL);
+
+       /*
+        * First launch THREADS polling threads
+        */
+       for (i = 0; i < THREADS; i++) {
+               threadargs[i] = i;
+               pthread_create(&threadid, NULL, thread_poll, &threadargs[i]);
+       }
+       sleep(1);
+
+       /*
+        * And finally launch notifyer thread
+        */
+       pthread_create(&threadid, NULL, thread_notify, NULL);
+
+       /*
+        * Now just wait
+        */
+       for (;;)
+               (void) pause();
+}
+
+static void *
+thread_poll(void *args)
+{
+       struct pollfd   fds[1];
+       int             n;
+       int             id = *(int *)args;
+       char            c;
+
+       fds[0].fd = sockets[1];
+       fds[0].events = POLLIN;
+       for (;;) {
+               thread_print(id, "Polling");
+               if ((n = poll(fds, 1, -1)) == -1) {
+                       perror("poll()");
+                       return NULL;
+               }
+               thread_print(id, "Poll returned");
+               if (n == 0) {
+                       thread_print(id, "Woke up! (no data)");
+                       continue;
+               }
+               if ((fds[0].revents & POLLIN) != 0) {
+                       /* Attempt to read event/byte */
+                       thread_print(id, "Woke up! (with data)");
+                       pthread_mutex_lock(&sockets_mutex);
+                       while ((n = read(sockets[1], &c, 1)) == 1)
+                               thread_print(id, "Read data!");
+                       if (n == -1)
+                               thread_print(id, strerror(errno));
+                       pthread_mutex_unlock(&sockets_mutex);
+               }
+       }
+}
+
+/* ARGSUSED */
+static void *
+thread_notify(void *args)
+{
+       char            c = '\0';
+       struct pollfd   fds[1];
+
+       fds[0].fd = sockets[0];
+       fds[0].events = POLLOUT;
+       for (;;) {
+               sleep(1);
+               thread_print(-1, "Notifying");
+               pthread_mutex_lock(&sockets_mutex);
+               if (write(sockets[0], &c, 1) != 1) {
+                       /* Poll until we can send data */
+                       (void) poll(fds, 1, -1);
+               }
+               pthread_mutex_unlock(&sockets_mutex);
+       }
+}
+
+static void
+thread_print(int id, const char *str)
+{
+
+       pthread_mutex_lock(&print_mutex);
+       printf("%d: %s\n", id, str);
+       pthread_mutex_unlock(&print_mutex);
+}