--- /dev/null
+# $Id: GNUmakefile,v 1.1 2007/03/13 19:37:22 mmondor Exp $
+
+# Directory holding the mmlib sources, relative to this makefile.
+MMLIB_PATH := ../../mmlib
+# mmlib objects the test programs are linked against; they are prerequisites
+# of the test targets and are compiled by the generic %.o rule.
+MMLIBS := $(addprefix ${MMLIB_PATH}/,mmlog.o mmpool.o mmstring.o)
+# Objects of this library.
+OBJS := mm_pthread_msg.o mm_pthread_sleep.o mm_pthread_pool.o mm_pthread_poll.o
+# Test programs built by the default target.
+BINS := tests/msg_test tests/poll_test
+
+CFLAGS += -Wall
+# Uncomment to enable the debugging macros (see mm_pthread_debug.h) and
+# debugging symbols.
+#CFLAGS += -DDEBUG -DPTHREAD_DEBUG -g3
+
+# Libraries to link with; these are passed on the link command line.
+LDFLAGS += -lc -lpthread
+#LDFLAGS += -lpthread_dbg
+
+
+# None of these targets name a file to be created.
+.PHONY: all install clean
+
+all: $(BINS)
+
+
+# Compile any C source, searching this directory and mmlib for headers.
+# $(CC) defaults to cc in GNU make but may be overridden by the user.
+%.o: %.c
+	$(CC) -c ${CFLAGS} -I. -I$(MMLIB_PATH) -o $@ $<
+
+
+# Link each test program from its already compiled object ($<) instead of
+# compiling the source a second time.  LDFLAGS carries the -l options in this
+# makefile, so it goes last: linkers resolving symbols left to right (static
+# libraries, --as-needed) need the libraries after the objects using them.
+$(BINS): %: %.o $(MMLIBS) $(OBJS)
+	$(CC) ${CFLAGS} -o $@ $< $(OBJS) $(MMLIBS) ${LDFLAGS}
+
+
+install: all
+
+
+clean:
+	rm -f $(addsuffix .o,$(BINS)) $(BINS) $(OBJS) $(MMLIBS)
--- /dev/null
+This library is an attempt to provide pth library like API to NetBSD SA
+threads and kqueue.
+
+What we find are missing from the POSIX standard are added here:
+
+- Implementation of efficient messages to communicate among threads. These
+ messages are queued using an efficient pointer linking mechanism. It must be
+ possible for a thread to wait for messages while sleeping and to be awaken
+ when a message is available. It also must be possible to observe a maximum
+ timeout to wait for.
+- Implementation of filedescriptors and above mentioned thread messages
+ notification multiplexing, with support for timer. An example of this is
+ pth library's pth_poll_ev(). A timer event can interrupt thread-safe
+ filedescriptor polling, as well as thread messages arriving on a port.
+
+We believe that it is possible to implement this using the kqueue(2)/kevent(2)
+system. The new call would be similar to:
+
+pthread_poll(struct pollfd *fds, int nfds,
+ struct pthread_port *ports, int nports,
+ struct pthread_sigs *sigs, int nsigs,
+ struct pthread_timers *timers, int ntimers)
+
+or similar system. This would allow multiplexing of various events into a
+single application loop.
+
+
+pthread_cond_timedwait() seems especially useful, either with a signal handler
+or perhaps using kqueue concurrently... pthread_cond_timedwait() will allow
+processes to wait for message arrival through a port, while
+pthread_cond_signal() or pthread_cond_broadcast() will be able to awaken them
+as messages are queued to the message port. However, we would ideally want to
+only signal a wanted thread waiting for a port... But, normally only one
+thread should be listening for messages on any given port. I have to see what
+I'll do for a thread listening for messages on multiple ports at a time...
+Perhaps that multiple ports could use the same conditional wait variable so
+that the process would only wait on that one, and then verify the message
+queue for each before going back in waiting mode.
+
+
+TODO:
+====
+- Replace mutex and conditional variable initializers, as well as attributes,
+ with static initializers.
+- Provide similar static initializer macros as part of our API where possible.
+- I have a working message passing implementation, with possibility of a
+ waiter on as many ports as wanted. I however still have a challenge:
+ Multiplex system calls such as select(2), poll(2), connect(2) and accept(2)
+ with the messaging capability. One must be able to cause the other to
+ return. This could be tricky to properly implement. Maybe think about the
+ following ideas:
+ - Dedicate a thread to serve a syscall, with which communication is solely
+ done using messages. This however implies that only a single syscall at a
+ time can be processed by such a thread. This probably means that a pool of
+ such threads would become necessary. This also assumes that the syscall in
+ question do not block the whole process, but only the intended thread.
+ Alot of assumptions, but this would now work properly on all BSDs and
+ on Linux. Possibly also on Solaris.
+ - Use a mix of signals and syscalls, since signals can interrupt syscalls.
+ However, this implies adding capability in our message system to trigger
+ signals rather than only using a conditional variable to notify of message
+ arrival. This also probably means that the same signal handler must be
+ shared by the whole process, that is, all the threads.
+ - Use kqueue in a thread-safe manner with thread-specific signals (if
+ possible). kqueue can be used to track signals without the need for an
+ actual signal handler. It would also track filedescriptor changes at the
+ same time. This also probably means that we need to use kqueue user
+ events if possible, triggered from the message passing system. It also
+ means non-portable code outside of the realm of BSD systems.
+
+
+
+RECENT REVIEW AFTER SOME REFLECTION
+===================================
+
+Currently, pth_accept_ev() and pth_connect_ev() are the only two cases of
+special PTh functions which my software uses, notably mmftpd(8). These could
+easily be implemented using a random thread in the pool whenever necessary,
+with which communication would entirely use messages only. This thread could
+be told: Perform syscall in non-blocking mode using the supplied
+filedescriptor and notify me whether it succeeded, failed because of a timeout,
+if any, or was interrupted by a message event occurring on the specified ring,
+if any. The application however has to know that if it was interrupted by
+an event, the connection still occurs asynchronously within the system.
+We should verify what could be done to cancel a not yet completed connection,
+if possible. This call could also report if the call was interrupted by a
+signal arrival (EINTR), optionally. If the socket was supplied in blocking
+mode, it would have to be switched to non-blocking mode by the system and
+then back into blocking mode. The caller could ensure to set it into
+non-blocking mode for enhanced performance if no blocking mode is required.
+The challenge would be finding a both efficient and portable solution to
+have select()/poll() awake upon reception of notification events on a ring.
+Perhaps that a global filedescriptor could be used for this, SOCK_DGRAM and
+one byte sent, or that a signal handler with a signal generation should be
+used... Both methods would probably awake the whole process, however.
+pthread_sigmask() could be used perhaps... I wouldn't want to have a special
+fd required for each ready thread of the pool, ideally.
+
+Implement:
+mm_pthread_io pthread_poll_ev(), pthread_accept_ev(),
+ pthread_connect_ev()
+mm_pthread_alarm pthread_sleep(), etc.
+
+or maybe:
+
+mm_pthread_misc For all of them
+
+Perhaps reimplement the system I worked on in mmserver(3) as well. This might
+be necessary for operations which really should be dedicated to a non-threaded
+process at occasions, and the subsystem should be available. It should probably
+use a pool using mmpool(3) as well, just like we are doing with threads.
+
+It would be interesting to implement better GC for mmpool(3)'s. Currently,
+pool_free() will discard pages which are no longer in use since some time,
+but the time cannot be linear, since it only accounts a certain number of
+calls made to it. It should instead be possible to use time intervals, and
+to let the application invoke the GC at wanted fixed intervals. This would
+allow to use time based average statistics rather than function call times
+based ones, without clobbering process or thread timers which the application
+might need. It simply has to provide its own and to call the GC function
+regularly.
+
+Hmm also, would be nice to be able to store the port_t pointer of the port
+which triggered notification on a ring_t, so that callers don't need to
+run through several ports attached on a ring... Maybe that it would be
+problematic however, since we can't guarantee atomicity between messages and
+messages processing, unless we kludged the whole thing with locks and lost
+efficiency. And because we only trigger notification to wakeup a waiting
+thread when a message is queued on an empty port, it's possible that the
+application sleeps forever on a port if it didn't totally empty it, unless
+there was a way for the sleep function to immediately return if called on
+non-empty ports (as it's only called on rings, and rings don't have
+access to a list of ports in current implementation (only the ports can
+know which ring they are tied to))... I could implement something to have
+rings see their attached ports with a list, however. But this again means
+looping among ports to see if they're non-empty, heh, so why not let the
+application do it as they do now.
+
+
+
+IMPORTANT
+=========
+
+I did a test where multiple threads were polling on a single filedescriptor
+consisting of a socketpair, which other side was used to wake them up.
+Only one random thread would wake up.
+
+Using a signal to cause all threads to wake would not work either, because
+then again only a random thread will awake.
+
+It appears that the only way to ensure to wake wanted threads is using
+conditional variables and for them to only sleep on these.
+
+SIGIO possibility... threads would be sleeping on a conditional wait variable
+corresponding to the filedescriptor. For polling, the fd would be made in
+non-blocking I/O, with SIGIO sent to process. The fd and associated cond var
+would be added to a table. The SIGIO signal handler would need to check all
+fds in the set for possible I/O and awake corresponding threads waiting on
+cond var. A problem exists: How to check a filedescriptor for pending event?
+How to know if event is read or write, or hup, etc? Maybe using more or less
+standard FION ioctls? poll/select with 0 timeout maybe, but that is still
+troublesome in terms of performance I believe.
+
+fd cond interesting_events occured_events?
+
+Hmm and what if a thread was allocated to start polling, and another wrapper
+thread wait for it sleeping on a cond var? Would it be sane to do this?
+When a timeout or message occurs however detected by the wrapper thread,
+how would we stop the other thread polling? We still have a problem.
+If we left pending polling threads, how would a future thread with successful
+polling on the same descriptor ever wake up, a random one would.
+Why does POSIX threads suck so much as to not provide any decent way to
+work with filedescriptors!? If at least I had pthread_signal() it would
+help. I could send a signal to interrupt the wanted thread when it was polling.
+Or if only there was a way to set the wanted signal mask for wanted processes
+as necessary, so that I would only have the wanted one process a particular
+signal I could send to interrupt it and then restore the masks, and do this
+somehow atomically. If POSIX had any of these requirements in mind while
+developing the standard, pthread_poll_condwait() or pthread_signal() would
+already exist anyways!
+
+
+HMM
+===
+
+A thread reserved for polling would seem best. We need to be able to interrupt
+that thread whenever needed using a signal, which all other processes must
+be blocking. We could use SIGIO, or SIGUSR2 for instance. That thread would
+process thread messages and go back to polling. It probably could handle
+timeouts as well, but this is probably not necessary. If it did, would
+probably free other threads from calling gettimeofday() too often. The thread
+has to remove the fd from the polling list when an event returned on it
+anyways, and so it could also send a reply message for timeout. It has to
+be interrupted anyways when a new fd is to be added, and this means that
+it could fix the poll timer before calling it each time to fit the soonest
+to expire fd... I could probably use kqueue too, or libevent in that thread
+to make it high performance as possible.
--- /dev/null
+/* $Id: mm_pthread_debug.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2004-2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_DEBUG_H
+#define MM_PTHREAD_DEBUG_H
+
+
+
+#include <pthread.h>
+#include <syslog.h>
+
+
+
+#ifdef PTHREAD_DEBUG
+
+/*
+ * Function entry/exit tracing, only active when PTHREAD_DEBUG is defined.
+ * Each macro logs the calling thread's id and the enclosing function name
+ * through syslog(3) at LOG_NOTICE priority; ">" marks entry and "<" exit.
+ * The thread id is cast to void * because that is the only argument type
+ * "%p" accepts; this assumes pthread_t is a pointer or integer type.
+ */
+#define DEBUG_PTHREAD_ENTRY() \
+    syslog(LOG_NOTICE, "> TID=%p FN=%s", (void *)pthread_self(), __func__)
+
+#define DEBUG_PTHREAD_EXIT() \
+    syslog(LOG_NOTICE, "< TID=%p FN=%s", (void *)pthread_self(), __func__)
+
+#else
+/* Tracing disabled: both macros expand to nothing. */
+#define DEBUG_PTHREAD_ENTRY()
+#define DEBUG_PTHREAD_EXIT()
+#endif
+
+
+
+#endif
--- /dev/null
+/* $Id: mm_pthread_msg.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * It is almost a shame that POSIX did not define a standard API for
+ * inter-thread asynchronous and synchronous messaging. So, here is my
+ * implementation. Note that for asynchronous operation it is recommended to
+ * use a memory pool such as mmpool(3) to allocate and free messages in an
+ * efficient way, in cases where messages will need to be sent to the other
+ * end without expecting a response back before the current function ends
+ * (in which case a message obviously can't be on the stack).
+ */
+
+
+
+#include <pthread.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include <mmtypes.h>
+#include <mmlog.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_msg.h>
+/*#include <mm_pthread_poll.h>*/
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_msg.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+/*
+ * Prepares a ring (a wake-up notification handle) for use: sets up its
+ * condition variable and mutex, clears its event counters and stamps it with
+ * PRING_MAGIC.  Once a port is attached to the ring, a message queued on that
+ * port while it is empty wakes the thread sleeping in pthread_ring_wait().
+ * Returns 0 on success, or the error number reported by the pthread
+ * initializer which failed.
+ */
+int
+pthread_ring_init(pthread_ring_t *ring)
+{
+	int	rc;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(ring != NULL && ring->magic != PRING_MAGIC);
+
+	rc = pthread_cond_init(&ring->cond, NULL);
+	if (rc != 0)
+		goto done;
+
+	rc = pthread_mutex_init(&ring->mutex, NULL);
+	if (rc != 0) {
+		/* Do not leave the condition variable created above behind */
+		(void) pthread_cond_destroy(&ring->cond);
+		goto done;
+	}
+
+	ring->magic = PRING_MAGIC;
+	ring->mevent = 0;
+	ring->event = 0;
+
+done:
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Tells whether the supplied ring may be used: non-zero if the pointer is
+ * not NULL and the ring carries PRING_MAGIC, zero otherwise.  Handy to
+ * destroy a ring only if it was actually initialized.
+ */
+int
+pthread_ring_valid(pthread_ring_t *ring)
+{
+	int	usable;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	usable = (ring != NULL && ring->magic == PRING_MAGIC);
+
+	DEBUG_PTHREAD_EXIT();
+	return usable;
+}
+
+/*
+ * Destroys a ring. Note that all message ports attached to this ring should
+ * first be detached or destroyed.
+ * Returns 0 on success, or an error number. If the mutex cannot be destroyed
+ * (for instance because it is still in use), nothing is torn down and the
+ * ring remains valid, so that the caller may retry later. Once the mutex was
+ * destroyed the ring is unusable and is invalidated, whatever the outcome of
+ * destroying the condition variable, which is then reported.
+ */
+int
+pthread_ring_destroy(pthread_ring_t *ring)
+{
+	int error;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
+
+	if ((error = pthread_mutex_destroy(&ring->mutex)) == 0) {
+		error = pthread_cond_destroy(&ring->cond);
+		/* Only mark the ring invalid once it really was torn down */
+		ring->magic = 0;
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * Puts the calling thread to sleep until the ring gets notified, which
+ * happens when a message is queued on an empty port attached to it or when
+ * pthread_ring_notify() is called. A thread normally only waits after it
+ * drained every port it is interested in; nevertheless, if notifications
+ * were recorded since the previous pthread_ring_wait() returned, the function
+ * returns without sleeping.
+ * The timeout, if abstime is not NULL, is an absolute time exactly as for
+ * pthread_cond_timedwait(), which keeps the API compatible with it and allows
+ * nanosecond precision where the system supports it. A NULL abstime waits
+ * indefinitely.
+ * Returns -1 if the ring mutex could not be locked, otherwise the result of
+ * the last conditional wait performed (0 if none was needed).
+ */
+int
+pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime)
+{
+	int	rc = 0;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
+
+	/* The condition variable may only be waited on with its mutex held */
+	if (pthread_mutex_lock(&ring->mutex) != 0) {
+		DEBUG_PTHREAD_EXIT();
+		return -1;
+	}
+
+	/*
+	 * Sleep until a notification is seen or a wait fails/times out. The
+	 * wait functions atomically release the mutex while sleeping and
+	 * re-acquire it before returning.
+	 */
+	ring->event = 0;
+	while (ring->mevent == 0 && !ring->event && rc == 0) {
+		if (abstime == NULL)
+			rc = pthread_cond_wait(&ring->cond, &ring->mutex);
+		else
+			rc = pthread_cond_timedwait(&ring->cond, &ring->mutex,
+			    abstime);
+	}
+	/* Notifications counted so far are now consumed */
+	ring->mevent = 0;
+
+	/* The mutex is held at this point in all cases, give it back */
+	(void) pthread_mutex_unlock(&ring->mutex);
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Wakes up a thread sleeping in pthread_ring_wait() on the given ring, as if
+ * a message had arrived on an empty port attached to it. This also makes
+ * rings usable as a plain notification mechanism when no message passing is
+ * required.
+ * Returns 0 on success, or the error number from locking the ring mutex.
+ */
+int
+pthread_ring_notify(pthread_ring_t *ring)
+{
+	int	rc;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
+
+	rc = pthread_mutex_lock(&ring->mutex);
+	if (rc == 0) {
+		/* Record the notification, then wake the waiter */
+		ring->event = 1;
+		ring->mevent++;
+		(void) pthread_cond_signal(&ring->cond);
+		(void) pthread_mutex_unlock(&ring->mutex);
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Initializes a message port: creates its lock, starts with an empty message
+ * queue and no attached ring, and stamps it with PPORT_MAGIC.
+ * Returns 0 on success, or the error number from pthread_mutex_init(), in
+ * which case the port is left untouched.
+ */
+int
+pthread_port_init(pthread_port_t *port)
+{
+	int	rc;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(port != NULL && port->magic != PPORT_MAGIC);
+
+	rc = pthread_mutex_init(&port->lock, NULL);
+	if (rc == 0) {
+		port->magic = PPORT_MAGIC;
+		port->ring = NULL;
+		DLIST_INIT(&port->messages);
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Tells whether the supplied port may be used: non-zero if the pointer is
+ * not NULL and the port carries PPORT_MAGIC, zero otherwise.  Handy to
+ * destroy a port only if it was actually initialized.
+ */
+int
+pthread_port_valid(pthread_port_t *port)
+{
+	int	usable;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	usable = (port != NULL && port->magic == PPORT_MAGIC);
+
+	DEBUG_PTHREAD_EXIT();
+	return usable;
+}
+
+/*
+ * Destroys the specified port, previously created using pthread_port_init().
+ * Returns 0 on success, or the error number from pthread_mutex_destroy(). On
+ * failure the port lock still exists, so the port is left valid rather than
+ * being invalidated while still intact; the caller may then retry.
+ */
+int
+pthread_port_destroy(pthread_port_t *port)
+{
+	int error;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC);
+
+	/* Only mark the port invalid once its lock really was destroyed */
+	if ((error = pthread_mutex_destroy(&port->lock)) == 0)
+		port->magic = 0;
+
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * Attaches a port to a ring. Multiple ports may be attached to a ring. A
+ * message arriving on an empty port will cause the attached ring to be
+ * notified, if any, and as such to cause a thread waiting on the ring to
+ * be awakened. A NULL ring detaches the port from its current ring.
+ * Returns 0 on success, or the error number from locking the port, in which
+ * case the attachment is left unchanged.
+ */
+int
+pthread_port_set_ring(pthread_port_t *port, pthread_ring_t *ring)
+{
+	int error;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC &&
+	    (ring == NULL || ring->magic == PRING_MAGIC));
+
+	/*
+	 * pthread_msg_put() tests and then dereferences port->ring while
+	 * holding the port lock; change it under the same lock so that it
+	 * cannot be switched (or set to NULL) in between.
+	 */
+	if ((error = pthread_mutex_lock(&port->lock)) == 0) {
+		port->ring = ring;
+		(void) pthread_mutex_unlock(&port->lock);
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * Prepares a message so that it may be sent over a port: stamps it with
+ * PMESG_MAGIC, records the reply port (which may be NULL) and resets its
+ * size and message pointer. A message generally needs this only once, even
+ * when it travels back and forth in synchronous operation; call it again
+ * only to select a different reply port. Always returns 0.
+ */
+int
+pthread_msg_init(pthread_msg_t *msg, pthread_port_t *rport)
+{
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(msg != NULL && msg->magic != PMESG_MAGIC &&
+	    (rport == NULL || rport->magic == PPORT_MAGIC));
+
+	msg->message = NULL;
+	msg->size = 0;
+	msg->reply = rport;
+	msg->magic = PMESG_MAGIC;
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+}
+
+/*
+ * Tells whether the supplied message may be used: non-zero if the pointer
+ * is not NULL and the message carries PMESG_MAGIC, zero otherwise.
+ */
+int
+pthread_msg_valid(pthread_msg_t *msg)
+{
+	int	usable;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	usable = (msg != NULL && msg->magic == PMESG_MAGIC);
+
+	DEBUG_PTHREAD_EXIT();
+	return usable;
+}
+
+/*
+ * Marks a message as no longer usable by clearing its magic number, so that
+ * it fails the validity checks made before sending. Always returns 0.
+ */
+int
+pthread_msg_destroy(pthread_msg_t *msg)
+{
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(msg != NULL && msg->magic == PMESG_MAGIC);
+
+	/* Nothing to release, only the validity stamp is removed */
+	msg->magic = 0;
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+}
+
+/*
+ * Removes and returns the message at the top of the specified port's queue.
+ * Returns NULL if the queue is empty or if the port could not be locked.
+ * A thread normally drains every message queued on a port before going back
+ * to sleep, both for efficiency and because it simplifies synchronization.
+ */
+pthread_msg_t *
+pthread_msg_get(pthread_port_t *port)
+{
+	pthread_msg_t	*head = NULL;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC);
+
+	if (pthread_mutex_lock(&port->lock) == 0) {
+		head = DLIST_TOP(&port->messages);
+		if (head != NULL) {
+			DEBUG_ASSERT(head->magic == PMESG_MAGIC);
+			DLIST_UNLINK(&port->messages, (node_t *)head);
+		}
+		(void) pthread_mutex_unlock(&port->lock);
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return (pthread_msg_t *)head;
+}
+
+/*
+ * Queues the specified message to the specified port, returning 0 on success.
+ * Note that the message data is not copied or moved, but that a pointer
+ * system is used to queue the message. Thus, the message's shared memory
+ * region is leased temporarily to the other end. One has to be careful to
+ * not allocate this message space on the stack when asynchronous operation
+ * is needed. In synchronous operation mode, it is not a problem, since the
+ * sender does not have to modify the data until the other end replies back
+ * with the same message after modifying the message if necessary. In
+ * synchronous mode, we simply delegate that message memory region to the
+ * other end until it notifies us with a reply that it is done working with
+ * it.
+ * Returns 0 on success, or an error number. If the port lock cannot be
+ * taken the message is not queued. If the lock of the attached ring cannot
+ * be taken, the message remains queued but the ring is not notified, and
+ * that error is returned.
+ */
+int
+pthread_msg_put(pthread_port_t *port, pthread_msg_t *msg)
+{
+	pthread_ring_t *ring;
+	int error = 0;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC &&
+	    msg != NULL && msg->magic == PMESG_MAGIC);
+
+	if ((error = pthread_mutex_lock(&port->lock)) != 0)
+		goto err;
+
+	DLIST_APPEND(&port->messages, (node_t *)msg);
+	if ((ring = port->ring) != NULL) {
+		/*
+		 * All of the ring state is updated with the ring mutex held.
+		 * pthread_ring_wait() tests and resets event/mevent under
+		 * that mutex, so modifying mevent outside of it could lose
+		 * an update or let the waiter go to sleep right after having
+		 * missed it.
+		 */
+		if ((error = pthread_mutex_lock(&ring->mutex)) == 0) {
+			/*
+			 * The port was empty: the reading thread, which
+			 * normally processes all available messages before
+			 * going back into waiting, has to be told that at
+			 * least one message is ready.
+			 */
+			if (DLIST_NODES(&port->messages) == 1)
+				ring->event = 1;
+			/*
+			 * Even if there already were messages, the other end
+			 * may be waiting on the ring; count the arrival and
+			 * signal in every case so that it gets woken. This
+			 * behavior is useful in the polling system code, for
+			 * instance.
+			 */
+			ring->mevent++;
+			(void) pthread_cond_signal(&ring->cond);
+			(void) pthread_mutex_unlock(&ring->mutex);
+		}
+	}
+
+	(void) pthread_mutex_unlock(&port->lock);
+
+err:
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * For synchronous message transfers: the receiver of a message calls this
+ * to hand the very same message back to its sender, by queueing it on the
+ * message's reply port. The message often carries a success/failure result
+ * at that point. The message must have a reply port set.
+ * Returns the result of pthread_msg_put(): 0 on success, or an error number.
+ */
+int
+pthread_msg_reply(pthread_msg_t *msg)
+{
+	pthread_port_t	*dest;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(msg != NULL && msg->magic == PMESG_MAGIC &&
+	    msg->reply != NULL);
+
+	dest = msg->reply;
+
+	DEBUG_PTHREAD_EXIT();
+	return pthread_msg_put(dest, msg);
+}
+
+/*
+ * Reports how many messages are currently queued on the port. Returns that
+ * count (possibly 0), or -1 if the port could not be locked.
+ */
+int
+pthread_port_pending(pthread_port_t *port)
+{
+	int	count;
+
+	DEBUG_PTHREAD_ENTRY();
+	DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC);
+
+	if (pthread_mutex_lock(&port->lock) == 0) {
+		count = (int)DLIST_NODES(&port->messages);
+		(void) pthread_mutex_unlock(&port->lock);
+	} else
+		count = -1;
+
+	DEBUG_PTHREAD_EXIT();
+	return count;
+}
--- /dev/null
+/* $Id: mm_pthread_msg.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_MSG_H
+#define MM_PTHREAD_MSG_H
+
+
+
+#include <pthread.h>
+
+#include <mmtypes.h>
+#include <mmlist.h>
+
+
+
+/*
+ * Magic numbers stored in the "magic" field of initialized objects and
+ * checked by the *_valid() functions and debugging assertions. The values
+ * are the ASCII codes of "PRNG", "PPRT" and "PMSG" respectively.
+ */
+#define PRING_MAGIC	0x50524e47
+#define PPORT_MAGIC	0x50505254
+#define PMESG_MAGIC	0x504d5347
+
+/*
+ * Ring: the notification handle a thread sleeps on in pthread_ring_wait()
+ * until pthread_ring_notify() is called or a message is queued on an
+ * attached port.
+ */
+typedef struct {
+	u_int32_t	magic;	/* PRING_MAGIC while initialized, 0 once destroyed */
+	pthread_cond_t	cond;	/* Condition variable the waiter sleeps on */
+	pthread_mutex_t	mutex;	/* Mutex associated with cond */
+	/*
+	 * NOTE(review): presumably one of enum pthread_ring_modes; it is
+	 * neither set nor read by mm_pthread_msg.c -- confirm its users.
+	 */
+	int		mode;
+	int		event;	/* Set to 1 on notification, cleared when a wait begins */
+	int		mevent;	/* Notifications counted since the last wait ended */
+} pthread_ring_t;
+
+/*
+ * NOTE(review): presumably the possible values of pthread_ring_t's "mode"
+ * field, describing how (or whether) a thread is waiting on the ring. None
+ * of these constants is referenced by mm_pthread_msg.c; confirm the exact
+ * semantics against the code using them.
+ */
+enum pthread_ring_modes {
+	PTHREAD_RMOD_NOWAIT,
+	PTHREAD_RMOD_CONDWAIT,
+	PTHREAD_RMOD_FDWAIT
+};
+
+/*
+ * Message port: a queue of messages, optionally attached to a ring which
+ * gets notified when messages are queued.
+ */
+typedef struct {
+	u_int32_t	magic;		/* PPORT_MAGIC while initialized */
+	pthread_ring_t	*ring;		/* Attached ring, or NULL if none */
+	pthread_mutex_t	lock;		/* Protects the messages list */
+	list_t		messages;	/* Queued pthread_msg_t, oldest at the top */
+} pthread_port_t;
+
+/*
+ * Message which may be queued on a port. The list node must remain the
+ * first member: the queueing code casts pthread_msg_t pointers to node_t
+ * pointers.
+ */
+typedef struct {
+	node_t		node;		/* Linkage into a port's messages list */
+	u_int32_t	magic;		/* PMESG_MAGIC while initialized */
+	pthread_port_t	*reply;		/* Port used by pthread_msg_reply(), or NULL */
+	/*
+	 * The two fields below are reset to 0/NULL by pthread_msg_init() and
+	 * are otherwise not interpreted by the messaging code; presumably
+	 * they describe the user payload -- confirm with callers.
+	 */
+	size_t		size;
+	void		*message;
+} pthread_msg_t;
+
+
+
+/* Rings: notification handles which threads can sleep on */
+extern int		pthread_ring_init(pthread_ring_t *ring);
+extern int		pthread_ring_valid(pthread_ring_t *ring);
+extern int		pthread_ring_destroy(pthread_ring_t *ring);
+extern int		pthread_ring_wait(pthread_ring_t *ring,
+			    const struct timespec *abstime);
+extern int		pthread_ring_notify(pthread_ring_t *ring);
+
+/* Ports: message queues, optionally attached to a ring */
+extern int		pthread_port_init(pthread_port_t *port);
+extern int		pthread_port_valid(pthread_port_t *port);
+extern int		pthread_port_destroy(pthread_port_t *port);
+extern int		pthread_port_set_ring(pthread_port_t *port,
+			    pthread_ring_t *ring);
+extern int		pthread_port_pending(pthread_port_t *port);
+
+/* Messages: initialization, queueing, retrieval and replies */
+extern int		pthread_msg_init(pthread_msg_t *msg,
+			    pthread_port_t *rport);
+extern int		pthread_msg_valid(pthread_msg_t *msg);
+extern int		pthread_msg_destroy(pthread_msg_t *msg);
+extern pthread_msg_t	*pthread_msg_get(pthread_port_t *port);
+extern int		pthread_msg_put(pthread_port_t *port,
+			    pthread_msg_t *msg);
+extern int		pthread_msg_reply(pthread_msg_t *msg);
+
+
+
+#endif
--- /dev/null
+/* $Id: mm_pthread_poll.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * I consider this code to be a major hack around the inherent problems unix
+ * systems face because of the lack of support for filedescriptor polling in
+ * the POSIX threads API. Although pthread defines methods for thread
+ * synchronization and for waiting on events (using condition
+ * variables), and unix provides polling on file descriptors using
+ * select(2), poll(2), kqueue(2) and other mechanisms, the two are totally
+ * distinct entities which can be considered either to conflict with
+ * each other or to not be related enough in a unified way. The current
+ * situation makes it almost impossible for a thread to both be polling for
+ * interthread efficient messages implementations built upon pthread, and
+ * filedescriptor events, concurrently.
+ *
+ * The GNU PTH library implements non-standard functions which allow to
+ * multiplex interthread messages and filedescriptor events, using for
+ * instance pth_poll_ev(), pth_select_ev(), pth_accept_ev(), pth_connect_ev(),
+ * etc. However, this is internally implemented using a single large select(2)
+ * based loop along with a slow large loop looking for non-fd events based on
+ * the principles of libevent. This threading library has other disadvantages,
+ * such as not providing a preemptive scheduler (being a fully userspace
+ * implementation) and not allowing to scale to multiple processors on SMP
+ * systems. This interface however shows how good the POSIX threads API could
+ * have been, if it was better designed with unix systems in mind. This
+ * library also being the most portable threads library alternative for quite
+ * some time, because of the fact that Operating Systems implemented POSIX
+ * threads inconsistently, or not at all, caused us to use PTH during some
+ * time to develop software in cases where a pool of processes was not ideal
+ * because of the frequency of shared memory synchronization needs.
+ *
+ * With the advent of POSIX threads implementations on more unix and unix-like
+ * systems and of modern implementations behaving more consistently, which can
+ * scale on SMP systems and provide preemptive scheduling, it was considered
+ * worthwhile for us to adapt our software again to use the standard POSIX
+ * API. Especially considering that NetBSD which had no OS provided threads
+ * implementation for applications now has an awesome pthreads implementation
+ * starting with version 2.0. However, we encountered difficulties with some
+ * software which used the complex multiplexing of thread events and
+ * filedescriptor ones. This module provides a solution to port this software.
+ * It however is somewhat a hack.
+ *
+ * The downsides of this implementation are as follows. We originally intended
+ * to develop a system which would scale among an increasing number of threads
+ * in a ready pool of threads, scaling with concurrency of the polling calls.
+ * This however proved difficult, or impossible to achieve, the main reasons
+ * being that 1) A signal delivered to a process is only received by a random
+ * thread that is not blocking it. 2) In the case where multiple threads are
+ * polling on a common file descriptor, similarly only one random thread
+ * is awakened. 3) pthread_cond_signal() and pthread_cond_broadcast() cannot
+ * wake threads waiting in filedescriptor polling. 4) to achieve what we
+ * needed, two descriptors would have been necessary per notification ring.
+ * This was considered an awful solution and was promptly rejected. 5) The
+ * POSIX API does not define a way for a process to set or change the signal
+ * blocking masks of other threads on the fly.
+ *
+ * Our solution then had to rely on a main descriptor polling manager thread
+ * which would be used to poll file descriptors, and would as a device serve
+ * client threads via efficient interthread messages. An issue still arises
+ * when a client thread sends a message to the polling thread to add new
+ * descriptors for polling or to cancel polling and remove descriptors.
+ * The polling thread must be able to immediately process these events
+ * awaking from filedescriptor polling. Two possible hacks could be used for
+ * this. 1) Use an AF_LOCAL SOCK_DGRAM socketpair(), which one side would
+ * be used to trigger an event writing some data, and the other side always
+ * included by the polling thread within the set of descriptors. 2) Send a
+ * signal to the process which only the polling thread is not blocking,
+ * to ensure that it be the one catching it, as such to awake from polling
+ * with an EINTR error code. This second solution was considered more elegant
+ * and is used as the basis of this implementation. We currently
+ * clobber the SIGUSR2 signal to achieve this.
+ */
+
+
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <poll.h>
+#include <signal.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <mmtypes.h>
+#include <mmlog.h>
+#include <mmlist.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_poll.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+/*
+ * Synchronous communications message between arbitrary threads and the
+ * polling thread. Since communication is synchronous, we only need to
+ * allocate one such message per thread. We are always expecting a reply
+ * back after sending a query before reusing the buffer. In fact, the
+ * message passing system only serves as a means for synchronization around
+ * the message, which is a shared memory object.
+ *
+ * msgnode is the first member: the polling thread casts the pthread_msg_t
+ * pointer it dequeues to a struct poll_msg pointer, and casts the latter to
+ * node_t * when linking requests into its pending list.
+ */
+struct poll_msg {
+ pthread_msg_t msgnode;
+ /* Passed as parameters */
+ bool cancel; /* TRUE asks the polling thread to abort this request */
+ struct pollfd *fds; /* caller's pollfd array; revents is written back on detach */
+ nfds_t nfds; /* number of entries in fds */
+ int timeout; /* milliseconds, -1 meaning no timeout */
+ /* Returned as result */
+ int ready, error; /* count of descriptors with events; 0 or an errno value */
+ /* Internally used */
+ struct timeval expires; /* absolute expiry time, set only when timeout != -1 */
+};
+
+/*
+ * An index is maintained of descriptor number -> struct poll_idx
+ * entries. Each of which has information on the message the descriptor
+ * belongs to, and the index into the global pollfd array so that it is easy
+ * to efficiently do per-fd work. An entry whose msg field is NULL is unused.
+ */
+struct poll_idx {
+ int idx; /* position of this descriptor in the global poll_fds array */
+ struct poll_msg *msg; /* request owning the descriptor, or NULL */
+};
+
+/*
+ * Thread specific resources needed to use our special polling. One instance
+ * is allocated per calling thread and stored as thread specific data.
+ */
+struct poll_data {
+ pthread_port_t port; /* port on which replies from the polling thread arrive */
+ struct poll_msg msg; /* the single request message reused for every call */
+};
+
+
+
+/*
+ * Tell the polling thread that something needs its attention. The pending
+ * events counter is bumped first; then, only if the polling thread has
+ * flagged itself as being around its poll(2) call, SIGUSR2 is sent to the
+ * process to interrupt it.
+ */
+#define POLLWAKE() do {						\
+	pollingevents++;					\
+	if (polling != 0) {					\
+		(void) kill(process_id, SIGUSR2);		\
+	}							\
+} while (/* CONSTCOND */0)
+
+
+
+/*
+ * Prototypes of the functions private to this module
+ */
+
+/* One-time process setup and per-thread setup/teardown */
+static int	pthread_poll_proc_init(void);
+static void	pthread_poll_proc_init2(void);
+static int	pthread_poll_thread_init(struct poll_data **res);
+static void	pthread_poll_thread_exit(void *specific);
+
+/* Polling thread and its helpers */
+static void	*poll_thread(void *args);
+static int	poll_thread_attach_fds(struct poll_msg *msg);
+static void	poll_thread_detach_fds(struct poll_msg *msg);
+static void	poll_thread_sighandler(int sig);
+
+/*
+ * Static process specific storage
+ */
+static bool pthread_poll_initialized = FALSE;
+static pthread_once_t pthread_poll_proc_initialized = PTHREAD_ONCE_INIT;
+static pthread_key_t pthread_poll_proc_key;
+static pthread_ring_t pthread_poll_thread_started_ring;
+static pthread_port_t pthread_poll_thread_port;
+static pid_t process_id;
+/*
+ * These two counters are shared between the polling thread, client threads
+ * (via POLLWAKE()) and the SIGUSR2 handler. An object modified from a signal
+ * handler must be of type volatile sig_atomic_t, so that the compiler
+ * neither caches it across the handler nor tears the access.
+ * polling: non-zero while the polling thread is around its poll(2) call.
+ * pollingevents: number of wake-up requests since the last loop iteration.
+ */
+static volatile sig_atomic_t polling = 0;
+static volatile sig_atomic_t pollingevents = 0;
+
+/*
+ * Static global poll_thread storage. No synchronization is necessary when
+ * using these, since only the polling thread does.
+ */
+static struct poll_idx *poll_idx; /* index array, subscripted by descriptor number */
+static nfds_t poll_idx_size; /* number of entries allocated for poll_idx */
+static struct pollfd *poll_fds; /* merged pollfd set handed to poll(2) */
+static nfds_t poll_fds_size; /* number of entries allocated for poll_fds */
+static nfds_t poll_nfds; /* number of poll_fds entries currently in use */
+
+
+
+/*
+ * Static internal functions
+ */
+
+/*
+ * One-time process wide setup: creates the thread specific data key (with
+ * pthread_poll_thread_exit() as destructor), installs the SIGUSR2 handler
+ * and records our process ID for later use by POLLWAKE().
+ * Returns 0 on success or an errno style error number.
+ */
+static int
+pthread_poll_proc_init(void)
+{
+	struct sigaction sa;
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	rc = pthread_key_create(&pthread_poll_proc_key,
+	    pthread_poll_thread_exit);
+	if (rc == 0) {
+		sa.sa_handler = poll_thread_sighandler;
+		sa.sa_flags = 0;
+		(void) sigemptyset(&sa.sa_mask);
+		(void) sigaddset(&sa.sa_mask, SIGUSR2);
+		if (sigaction(SIGUSR2, &sa, NULL) != 0)
+			rc = errno;
+		else
+			process_id = getpid();
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * pthread_once(3) compatible wrapper around pthread_poll_proc_init(). Since
+ * a once-routine cannot report failure, an initialization error is printed
+ * to stderr and the process exits.
+ */
+static void
+pthread_poll_proc_init2(void)
+{
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	rc = pthread_poll_proc_init();
+	if (rc == 0) {
+		DEBUG_PTHREAD_EXIT();
+		return;
+	}
+
+	(void) fprintf(stderr, "pthread_poll_proc_init() - %s\n",
+	    strerror(rc));
+	DEBUG_PTHREAD_EXIT();
+	exit(EXIT_FAILURE);
+}
+
+/*
+ * Per-thread setup, performed the first time a thread uses the polling API.
+ * Blocks SIGUSR2 in the calling thread, allocates the thread's poll_data
+ * (reply port and request message) and registers it as thread specific data
+ * so that pthread_poll_thread_exit() releases it when the thread exits.
+ *
+ * On success stores the new object in *res and returns 0. On failure
+ * returns an error number, *res is left untouched and everything that was
+ * set up so far is released again. Cleanup is staged so that only objects
+ * which were actually initialized get destroyed.
+ */
+static int
+pthread_poll_thread_init(struct poll_data **res)
+{
+	int error;
+	struct poll_data *data;
+	sigset_t set;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	(void) sigemptyset(&set);
+	(void) sigaddset(&set, SIGUSR2);
+	(void) pthread_sigmask(SIG_BLOCK, &set, NULL);
+
+	if ((data = malloc(sizeof(*data))) == NULL) {
+		error = ENOMEM;
+		goto err;
+	}
+
+	if ((error = pthread_port_init(&data->port)) != 0)
+		goto err_free;
+	if ((error = pthread_msg_init(&data->msg.msgnode, &data->port)) != 0)
+		goto err_port;
+
+	if ((error = pthread_setspecific(pthread_poll_proc_key, data)) != 0)
+		goto err_msg;
+
+	*res = data;
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+
+err_msg:
+	(void) pthread_msg_destroy(&data->msg.msgnode);
+err_port:
+	(void) pthread_port_destroy(&data->port);
+err_free:
+	free(data);
+err:
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * Destructor registered with the thread specific data key. Releases the
+ * reply port, the request message and the poll_data object itself.
+ */
+static void
+pthread_poll_thread_exit(void *specific)
+{
+	struct poll_data *pd = specific;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	(void) pthread_port_destroy(&pd->port);
+	(void) pthread_msg_destroy(&pd->msg.msgnode);
+	free(pd);
+
+	/* Some implementations need the slot to be cleared explicitly */
+	(void) pthread_setspecific(pthread_poll_proc_key, NULL);
+
+	DEBUG_PTHREAD_EXIT();
+}
+
+
+/*
+ * Actual polling thread, with which we communicate using messages polling on
+ * pthread_port_t and pthread_ring_t. This is the only thread that should be
+ * catching SIGUSR2 signals (used to wake us up and reiterate our main loop).
+ * Note: Although less efficient than using kqueue(2) or libevent(3), after
+ * discussion with 3s4i we settled on using poll(2) for now, which minimizes
+ * OS dependencies as well as third party software dependencies. Because
+ * pthread_poll_ring() is only sparsely used by our software (migrating from
+ * using PTH library which provided pth_poll_ev()), and we only provide
+ * it small pollfd arrays, this implementation was considered to meet our
+ * needs using poll(2). This also met the requirements for Tact group.
+ *
+ * Each loop iteration: 1) dequeues new requests and cancellations,
+ * 2) expires requests whose deadline passed and finds the soonest remaining
+ * deadline, 3) calls poll(2) on the merged descriptor set with SIGUSR2
+ * unblocked, 4) replies to every request which got at least one event.
+ * Never returns. Exits the process if its initial buffers cannot be
+ * allocated, since nothing could work without them.
+ */
+/* ARGSUSED */
+static void *
+poll_thread(void *args)
+{
+	sigset_t set;
+	pthread_ring_t ring;
+	list_t msg_list;
+	register int i;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/*
+	 * Create set for SIGUSR2 which we'll unblock/block
+	 */
+	(void) sigemptyset(&set);
+	(void) sigaddset(&set, SIGUSR2);
+
+	/*
+	 * Allocate an initial buffer size for our pollfd array as well as for
+	 * our descriptor based index. These buffers are grown as necessary
+	 * at runtime by poll_thread_attach_fds(). Size each array from its
+	 * own element type.
+	 */
+	poll_fds_size = 64;
+	poll_fds = malloc(sizeof(*poll_fds) * poll_fds_size);
+	poll_nfds = 0;
+	poll_idx_size = 64;
+	poll_idx = malloc(sizeof(*poll_idx) * poll_idx_size);
+	if (poll_fds == NULL || poll_idx == NULL) {
+		(void) fprintf(stderr, "poll_thread() - %s\n",
+		    strerror(ENOMEM));
+		exit(EXIT_FAILURE);
+	}
+	for (i = 0; i < poll_idx_size; i++)
+		poll_idx[i].msg = NULL;
+	DLIST_INIT(&msg_list);
+
+	/*
+	 * Initialize message port and associated ring. The message port is
+	 * module global, so that it be public to pthread_poll_ring().
+	 * XXX These initializations are not checked for failure.
+	 */
+	(void) pthread_port_init(&pthread_poll_thread_port);
+	(void) pthread_ring_init(&ring);
+	(void) pthread_port_set_ring(&pthread_poll_thread_port, &ring);
+
+	/*
+	 * Notify parent that we're ready.
+	 */
+	(void) pthread_ring_notify(&pthread_poll_thread_started_ring);
+
+	/*
+	 * Main loop from which we never exit
+	 */
+	for (;;) {
+		register int n;
+		int timeout, have_deadline;
+		struct timeval tv, ttv;
+		struct poll_msg *msg, *nextmsg;
+
+		/*
+		 * Get time of day in a rather high resolution. We need to
+		 * do this to be able to evaluate timeouts later on. We
+		 * attempt to only require one time syscall per loop.
+		 */
+		(void) gettimeofday(&tv, NULL);
+
+		pollingevents = 0;
+
+		/*
+		 * Process any messages. We need to add the descriptors if
+		 * they aren't already added. Also store yet unsatisfied
+		 * request messages into a list.
+		 */
+		while ((msg = (struct poll_msg *)pthread_msg_get(
+		    &pthread_poll_thread_port)) != NULL) {
+			if (msg->cancel) {
+				/*
+				 * Immediately satisfy request on demand
+				 */
+				msg->error = ECANCELED;
+				DLIST_UNLINK(&msg_list, (node_t *)msg);
+				poll_thread_detach_fds(msg);
+				(void) pthread_msg_reply(&msg->msgnode);
+				continue;
+			}
+			if (poll_thread_attach_fds(msg) == 0) {
+				msg->ready = msg->error = 0;
+				if (msg->timeout != -1) {
+					/*
+					 * Convert millisecond timeout to an
+					 * absolute time timeval
+					 */
+					msg->expires = tv;
+					ttv.tv_sec = msg->timeout / 1000;
+					ttv.tv_usec = (msg->timeout % 1000)
+					    * 1000;
+					timeradd(&msg->expires, &ttv,
+					    &msg->expires);
+				}
+				DLIST_APPEND(&msg_list, (node_t *)msg);
+			} else {
+				msg->ready = 0;
+				msg->error = EINVAL;
+				(void) pthread_msg_reply(&msg->msgnode);
+			}
+		}
+
+		/*
+		 * Process timeouts. For request messages which timed out,
+		 * satisfy them immediately using ETIMEDOUT error.
+		 * This also allows to evaluate which is the soonest to expire
+		 * entry, which poll(2) will have to use as timeout. An
+		 * explicit flag records whether any deadline exists: expiry
+		 * times are absolute, so no in-band timeval value can stand
+		 * for "none" and still compare as later than all of them.
+		 */
+		have_deadline = 0;
+		ttv.tv_sec = 0;
+		ttv.tv_usec = 0;
+		for (msg = DLIST_TOP(&msg_list); msg != NULL; msg = nextmsg) {
+			nextmsg = DLIST_NEXT(msg);
+
+			if (msg->timeout == -1)
+				continue;
+			if (timercmp(&msg->expires, &tv, <)) {
+				msg->error = ETIMEDOUT;
+				DLIST_UNLINK(&msg_list, (node_t *)msg);
+				poll_thread_detach_fds(msg);
+				(void) pthread_msg_reply(&msg->msgnode);
+			} else if (!have_deadline ||
+			    timercmp(&msg->expires, &ttv, <)) {
+				ttv = msg->expires;
+				have_deadline = 1;
+			}
+		}
+
+		/*
+		 * If there are no registered descriptors to poll for, wait
+		 * using the thread friendly ring until messages occur, and
+		 * reiterate.
+		 */
+		if (poll_nfds == 0) {
+			(void) pthread_ring_wait(&ring, NULL);
+			continue;
+		}
+
+		/*
+		 * Perform polling. poll(2) for as much time as possible,
+		 * although making sure to allow the soonest to expire query
+		 * to stop polling. Next to expire entry time is in ttv and
+		 * current time in tv. Calculate difference and convert to
+		 * milliseconds, rounding up so that a remainder below one
+		 * millisecond does not turn into a busy loop of zero
+		 * timeouts.
+		 */
+		if (!have_deadline)
+			timeout = -1;
+		else {
+			timersub(&ttv, &tv, &ttv);
+			timeout = (int)((ttv.tv_sec * 1000) +
+			    ((ttv.tv_usec + 999) / 1000));
+		}
+
+		/*
+		 * Unblock the SIGUSR2 signal, which we should be the only
+		 * thread to receive, all other threads blocking it.
+		 * Only leave it unblocked for the duration of the poll(2)
+		 * syscall. We cause our loop to reiterate in any case of
+		 * error, EINTR or no file descriptor with pending event.
+		 */
+		(void) pthread_sigmask(SIG_UNBLOCK, &set, NULL);
+		polling++;
+
+		/*
+		 * Skip poll(2) altogether if a wake-up request was recorded
+		 * since the top of this iteration.
+		 */
+		n = 0;
+		if (pollingevents == 0)
+			n = poll(poll_fds, poll_nfds, timeout);
+
+		polling--;
+		(void) pthread_sigmask(SIG_BLOCK, &set, NULL);
+		if (pollingevents != 0 || n < 1)
+			continue;
+
+		/*
+		 * Verify which descriptors have interesting events set,
+		 * increasing events counter of corresponding requests.
+		 */
+		for (i = 0; n != 0 && i < poll_nfds; i++) {
+			if (poll_fds[i].revents != 0) {
+				(poll_idx[poll_fds[i].fd].msg->ready)++;
+				n--;
+			}
+		}
+		/*
+		 * Now verify pending request messages for events, and satisfy
+		 * the requests of those who do.
+		 */
+		for (msg = DLIST_TOP(&msg_list); msg != NULL; msg = nextmsg) {
+			nextmsg = DLIST_NEXT(msg);
+
+			if (msg->ready != 0) {
+				/*
+				 * ready and error fields are already set
+				 */
+				DLIST_UNLINK(&msg_list, (node_t *)msg);
+				poll_thread_detach_fds(msg);
+				(void) pthread_msg_reply(&msg->msgnode);
+			}
+		}
+	}
+
+	/* NOTREACHED */
+	DEBUG_PTHREAD_EXIT();
+	pthread_exit(NULL);
+	return NULL;
+}
+
+/*
+ * Merges the pollfd set supplied in msg with the global set polled by the
+ * polling thread, registering every descriptor in the descriptor index.
+ *
+ * Negative descriptors are skipped, like poll(2) does. A descriptor which is
+ * already registered (by this or another request) is an error, since we do
+ * not support several requests polling on the same descriptor.
+ *
+ * Returns 0 on success. On failure (duplicate descriptor or out of memory)
+ * returns -1 after detaching whatever had been attached for msg.
+ */
+static int
+poll_thread_attach_fds(struct poll_msg *msg)
+{
+	nfds_t i;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	for (i = 0; i < msg->nfds; i++) {
+		int fd, slot;
+
+		fd = msg->fds[i].fd;
+
+		/*
+		 * Ignore unset descriptors
+		 */
+		if (fd < 0)
+			continue;
+
+		/*
+		 * Grow index buffer if necessary. Either grow by doubling
+		 * size, or even more if necessary to hold index to fd.
+		 * If we only grew to hold fd, we might need to realloc(3) too
+		 * often. The new size must be at least fd + 1 for fd to be a
+		 * valid subscript. Take care to also NULL msg field of new
+		 * entries.
+		 */
+		if (poll_idx_size <= (nfds_t)fd) {
+			struct poll_idx *grown;
+			nfds_t size, j;
+
+			size = poll_idx_size * 2;
+			if (size <= (nfds_t)fd)
+				size = (nfds_t)fd + 1;
+			if ((grown = realloc(poll_idx,
+			    sizeof(struct poll_idx) * size)) == NULL)
+				goto err;
+			poll_idx = grown;
+			for (j = poll_idx_size; j < size; j++)
+				poll_idx[j].msg = NULL;
+			poll_idx_size = size;
+		}
+
+		/*
+		 * Error if descriptor not unique before adding to set.
+		 * We do not allow multiple threads polling on the same
+		 * descriptor at the same time in our system. We would
+		 * otherwise need to gracefully handle duplicates,
+		 * multiplexing them, which isn't required at all by our
+		 * applications. So let's keep things simple.
+		 */
+		if (poll_idx[fd].msg != NULL)
+			goto err;
+
+		/*
+		 * Resize pollfd array if needed. Grow by doubling.
+		 * This should happen very rarely.
+		 * XXX We could check this condition only once at the
+		 * top of this function and take in consideration the
+		 * number of descriptors to add, if wanted for optimization.
+		 */
+		if (poll_fds_size <= poll_nfds) {
+			struct pollfd *ptr;
+
+			if ((ptr = realloc(poll_fds,
+			    sizeof(struct pollfd) * (poll_fds_size * 2)))
+			    == NULL)
+				goto err;
+			poll_fds = ptr;
+			poll_fds_size *= 2;
+		}
+
+		/*
+		 * Finally add descriptor to set and register it for indexing.
+		 * We simply need to append it to the existing entries in our
+		 * global polling set array.
+		 */
+		slot = (int)poll_nfds;
+		poll_fds[slot].fd = fd;
+		poll_fds[slot].events = msg->fds[i].events;
+		poll_fds[slot].revents = 0;
+		poll_idx[fd].msg = msg;
+		poll_idx[fd].idx = slot;
+		poll_nfds++;
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+
+err:
+	(void) poll_thread_detach_fds(msg);
+
+	DEBUG_PTHREAD_EXIT();
+	return -1;
+}
+
+/*
+ * Removes from the global set every descriptor which msg registered, and
+ * copies the revents field of each from the global set to the corresponding
+ * entry of the supplied set.
+ *
+ * Entries of msg which are not registered to msg are left alone. This
+ * includes negative descriptors and descriptors beyond the current index
+ * size, which can legitimately be seen here when called from the error path
+ * of poll_thread_attach_fds() before all descriptors were attached.
+ */
+static void
+poll_thread_detach_fds(struct poll_msg *msg)
+{
+	nfds_t i;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	for (i = 0; i < msg->nfds; i++) {
+		int fd, idx;
+
+		fd = msg->fds[i].fd;
+
+		/*
+		 * Never index outside of the descriptor index
+		 */
+		if (fd < 0 || (nfds_t)fd >= poll_idx_size)
+			continue;
+
+		/*
+		 * Make sure fd was properly registered
+		 */
+		if (poll_idx[fd].msg != msg)
+			continue;
+
+		/*
+		 * Find index in global pollfd set for this fd
+		 */
+		idx = poll_idx[fd].idx;
+
+		/*
+		 * Update pollfd entry according to global one
+		 */
+		msg->fds[i].revents = poll_fds[idx].revents;
+
+		/*
+		 * Unlink fd from the global set. The removal method is
+		 * simple: invalidate the index entry of fd and lower the
+		 * global nfds by one. If fd was not the last entry of the
+		 * global set, move the last entry over the freed slot and
+		 * update the index of the moved descriptor (its msg pointer
+		 * is unaffected).
+		 */
+		poll_idx[fd].msg = NULL;
+		if (--poll_nfds != (nfds_t)idx) {
+			poll_fds[idx] = poll_fds[poll_nfds];
+			poll_idx[poll_fds[idx].fd].idx = idx;
+		}
+	}
+
+	DEBUG_PTHREAD_EXIT();
+}
+
+/*
+ * SIGUSR2 handler. Only counts the event; the interesting effect of the
+ * signal is that poll(2) gets interrupted in the polling thread.
+ */
+/* ARGSUSED */
+static void
+poll_thread_sighandler(int sig)
+{
+
+	DEBUG_PTHREAD_ENTRY();
+
+	++pollingevents;
+
+	DEBUG_PTHREAD_EXIT();
+}
+
+
+
+/*
+ * Public API exported functions
+ */
+
+/*
+ * Must be called before launching any thread. Sets up the signal mask and
+ * launches the dedicated poll slave thread. Important note: this system
+ * clobbers the SIGUSR2 signal, which the application can no longer use for
+ * other purposes. The only solution to wake the thread manager thread from
+ * poll(2) is either to trigger an event through a dedicated filedescriptor,
+ * or to send a signal to the process which only the polling thread allows.
+ *
+ * Returns 0 on success (also if already initialized), or an error number.
+ * On failure the resources acquired so far are released. SIGUSR2 however
+ * remains blocked in the calling thread.
+ */
+int
+pthread_poll_init(void)
+{
+	int error;
+	sigset_t set;
+	pthread_attr_t attr;
+	pthread_t thread;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (pthread_poll_initialized) {
+		error = 0;
+		goto out;
+	}
+
+	/*
+	 * First block SIGUSR2 signal in the parent. The reason why this must
+	 * be called before the application launches any thread is that
+	 * threads inherit the sigmask of their parent, and that all threads,
+	 * but the polling thread, must block the signal. This ensures that
+	 * only the wanted thread wakes up when a SIGUSR2 signal is received.
+	 * This way, we can interrupt the polling thread in poll(2), for
+	 * instance, and cause it to reiterate its main loop.
+	 */
+	(void) sigemptyset(&set);
+	(void) sigaddset(&set, SIGUSR2);
+	if ((error = pthread_sigmask(SIG_BLOCK, &set, NULL)) != 0)
+		goto out;
+
+	/*
+	 * We'll use this pthread_ring_t to get notification from child that
+	 * it is ready to process requests before proceeding.
+	 */
+	if ((error = pthread_ring_init(&pthread_poll_thread_started_ring))
+	    != 0)
+		goto out;
+
+	/*
+	 * We may now launch the poll thread and wait for notification from it
+	 * that it is ready to serve requests. We won't need to join this
+	 * thread, so it can be launched in detached state. The attribute
+	 * object is no longer needed once the thread has been created.
+	 */
+	if ((error = pthread_attr_init(&attr)) != 0)
+		goto err_ring;
+	if ((error = pthread_attr_setdetachstate(&attr,
+	    PTHREAD_CREATE_DETACHED)) != 0)
+		goto err_attr;
+	if ((error = pthread_create(&thread, &attr, poll_thread, NULL)) != 0)
+		goto err_attr;
+	(void) pthread_attr_destroy(&attr);
+
+	/*
+	 * Wait until thread is ready to serve requests
+	 */
+	(void) pthread_ring_wait(&pthread_poll_thread_started_ring, NULL);
+
+	pthread_poll_initialized = TRUE;
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+
+err_attr:
+	(void) pthread_attr_destroy(&attr);
+err_ring:
+	(void) pthread_ring_destroy(&pthread_poll_thread_started_ring);
+out:
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * poll(2) replacement which can also be awakened by a notification happening
+ * on the specified ring. This for instance allows to process thread messages
+ * as well as descriptor events.
+ *
+ * fds/nfds: descriptors to poll, as for poll(2); nfds must be at least 1.
+ * timeout: milliseconds, or -1 for no timeout.
+ * ring: initialized ring whose notifications interrupt the call.
+ *
+ * Returns the number of descriptors with events on success, or -1 with the
+ * error stored in errno: ETIMEDOUT if the timeout expired before an event
+ * occurred, ECANCELED if a ring notification occurred instead of a
+ * filedescriptor event, EINVAL for invalid arguments, a descriptor already
+ * being polled by another request or an uninitialized module, or another
+ * error reported by the underlying primitives.
+ * XXX We could return 0 on timeout, like poll(2).
+ */
+int
+pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout,
+    pthread_ring_t *ring)
+{
+	int error;
+	struct poll_data *data;
+	pthread_ring_t *oring;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (!pthread_poll_initialized) {
+		error = EINVAL;
+		goto err;
+	}
+
+	/*
+	 * Implicit process and thread specific initializations
+	 */
+	if ((error = pthread_once(&pthread_poll_proc_initialized,
+	    pthread_poll_proc_init2)) != 0)
+		goto err;
+	/*
+	 * XXX Use a mutex or pthread_once() equivalent here too?
+	 */
+	if ((data = pthread_getspecific(pthread_poll_proc_key)) == NULL) {
+		if ((error = pthread_poll_thread_init(&data)) != 0)
+			goto err;
+	}
+
+	/*
+	 * Perform some sanity checking on supplied arguments
+	 */
+	if (fds == NULL || nfds < 1 || ring == NULL || ring->magic !=
+	    PRING_MAGIC) {
+		error = EINVAL;
+		goto err;
+	}
+
+	/*
+	 * Ensure that our message port's ring uses the same ring which
+	 * the user supplies us. If we didn't do this we would need to
+	 * be able to wait for events on more than one ring simultaneously.
+	 * Because we don't have a ring multiplexer object yet (which would
+	 * be needed since a ring maps to a condition variable among other
+	 * things), we need to proceed this way.
+	 * XXX Could there be a race condition here? It needs to be stressed.
+	 */
+	{
+		int mevent;
+
+		mevent = (data->port.ring != NULL ?
+		    data->port.ring->mevent : 0);
+		oring = data->port.ring;
+		(void) pthread_port_set_ring(&data->port, ring);
+		data->port.ring->mevent = mevent;
+	}
+
+	/*
+	 * Send query to polling thread. It is safe to simply reuse our
+	 * message since we then expect a reply back and synchronize it.
+	 * If the query cannot be queued nothing is outstanding, so give
+	 * the port its previous ring back before reporting the error.
+	 */
+	data->msg.cancel = FALSE;
+	data->msg.fds = fds;
+	data->msg.nfds = nfds;
+	data->msg.timeout = timeout;
+	if ((error = pthread_msg_put(&pthread_poll_thread_port,
+	    &data->msg.msgnode)) != 0) {
+		(void) pthread_port_set_ring(&data->port, oring);
+		goto err;
+	}
+
+	/*
+	 * Interrupt polling thread which may still be waiting in poll(2).
+	 * We do this by sending SIGUSR2 to the process, which only the
+	 * polling thread is not blocking. This causes the thread to reiterate
+	 * its main loop, thus processing this message and going back to
+	 * sleep in poll(2).
+	 */
+	POLLWAKE();
+
+	/*
+	 * Wait until an event occurs and notifies our ring. An event could
+	 * either be triggered by the poll request ending or by another
+	 * interrupting event on the supplied ring. If a message is queued
+	 * on the port between pthread_port_set_ring() and
+	 * pthread_ring_wait(), the latter immediately returns.
+	 * NOTE(review): if the wait itself fails we return while the request
+	 * is still owned by the polling thread; behaviour kept as is.
+	 */
+	if ((error = pthread_ring_wait(ring, NULL)) != 0)
+		goto err;
+	if (pthread_msg_get(&data->port) == NULL) {
+		/*
+		 * No message replied back from poll thread yet, this means
+		 * that our ring was notified by another event. Cancel request
+		 * by sending event back with the cancel flag, and wait for
+		 * reply message to occur (which will be the original request
+		 * results we were waiting for). error field will be set to
+		 * ECANCELED by the poll thread.
+		 */
+		data->msg.cancel = TRUE;
+		(void) pthread_msg_put(&pthread_poll_thread_port,
+		    &data->msg.msgnode);
+		POLLWAKE();
+		while (pthread_msg_get(&data->port) == NULL)
+			(void) pthread_ring_wait(ring, NULL);
+	}
+	/* Unclobber user supplied ring from our port events */
+	(void) pthread_port_set_ring(&data->port, oring);
+
+	/*
+	 * Error, return error number.
+	 */
+	if (data->msg.error != 0) {
+		error = data->msg.error;
+		goto err;
+	}
+
+	/*
+	 * Success, return number of descriptors with detected events.
+	 */
+	DEBUG_PTHREAD_EXIT();
+	return data->msg.ready;
+
+err:
+	errno = error;
+
+	DEBUG_PTHREAD_EXIT();
+	return -1;
+}
+
+/*
+ * accept(2) replacement which can both observe a timeout and be interrupted
+ * via pthread_ring_t events. Internally implemented using
+ * pthread_poll_ring(). Will internally set the descriptor in non-blocking
+ * mode if necessary, then reverting it to the mode it was supplied in.
+ *
+ * s, addr, addrlen: as for accept(2).
+ * timeout: milliseconds, like for poll(2), and can be -1.
+ * ring: ring whose notifications interrupt the wait.
+ *
+ * Returns a new descriptor on success, or -1 on error, in which case errno
+ * is set. errno can then be EINVAL, ETIMEDOUT, ECANCELED, or others
+ * (including those of fcntl(2) and accept(2)). If the listening socket
+ * became readable but the connection was gone by the time accept(2) was
+ * retried, the error of that second accept(2) (typically EAGAIN) is
+ * returned; the wait is not restarted.
+ */
+int
+pthread_accept_ring(int s, struct sockaddr *addr, socklen_t *addrlen,
+    int timeout, pthread_ring_t *ring)
+{
+	int oflags, nflags, d, error = 0;
+	struct pollfd fd;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (!pthread_poll_initialized) {
+		errno = EINVAL;
+		goto err;
+	}
+
+	/*
+	 * First get current fcntl status flags, and set descriptor to
+	 * non-blocking mode if necessary.
+	 */
+	if ((oflags = nflags = fcntl(s, F_GETFL)) == -1)
+		goto err;
+	if ((oflags & O_NONBLOCK) == 0) {
+		nflags |= O_NONBLOCK;
+		if (fcntl(s, F_SETFL, nflags) == -1)
+			goto err;
+	}
+
+	/*
+	 * Fast path: a connection may already be pending.
+	 */
+	if ((d = accept(s, addr, addrlen)) != -1)
+		goto end;
+	if (errno != EAGAIN && errno != EWOULDBLOCK) {
+		error = errno;
+		goto end;
+	}
+
+	/*
+	 * Nothing pending yet: poll the listening socket itself until it is
+	 * readable, or until timeout or ring event.
+	 */
+	fd.fd = s;
+	fd.events = POLLIN;
+	fd.revents = 0;
+	if (pthread_poll_ring(&fd, 1, timeout, ring) == -1) {
+		error = errno;
+		goto end;
+	}
+
+	/*
+	 * The socket reported an event, retry. Any condition other than a
+	 * pending connection (POLLERR, POLLHUP, ...) gets reported through
+	 * the error of accept(2).
+	 */
+	if ((d = accept(s, addr, addrlen)) == -1)
+		error = errno;
+
+end:
+	/*
+	 * Restore supplied descriptor fcntl status flags if necessary.
+	 * errno is set afterwards since fcntl(2) may modify it.
+	 */
+	if (nflags != oflags)
+		(void) fcntl(s, F_SETFL, oflags);
+
+	if (error != 0) {
+		errno = error;
+		goto err;
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return d;
+
+err:
+	DEBUG_PTHREAD_EXIT();
+	return -1;
+}
+
+/*
+ * connect(2) replacement which can both observe a timeout and be interrupted
+ * via pthread_ring_t events. Internally implemented using
+ * pthread_poll_ring(). Will internally set the descriptor in non-blocking
+ * mode if necessary, then reverting it back to the mode it was supplied in.
+ *
+ * s, name, namelen: as for connect(2).
+ * timeout: milliseconds, like for poll(2), and can be -1.
+ * ring: ring whose notifications interrupt the wait.
+ *
+ * Returns 0 on success, or -1, in which case errno is set. errno can be
+ * EINVAL, ETIMEDOUT, ECANCELED or others, including the pending socket
+ * error (SO_ERROR) when the connection attempt completed unsuccessfully.
+ * After ETIMEDOUT or ECANCELED the connection attempt may still be in
+ * progress; the application can call this function again until completion.
+ * Calling the function on an already connected socket will result in
+ * EISCONN.
+ */
+int
+pthread_connect_ring(int s, const struct sockaddr *name, socklen_t namelen,
+    int timeout, pthread_ring_t *ring)
+{
+	int oflags, nflags, error = 0;
+	struct pollfd fd;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (!pthread_poll_initialized) {
+		errno = EINVAL;
+		goto err;
+	}
+
+	/*
+	 * First get current fcntl status flags, and set descriptor to
+	 * non-blocking mode if necessary.
+	 */
+	if ((oflags = nflags = fcntl(s, F_GETFL)) == -1)
+		goto err;
+	if ((oflags & O_NONBLOCK) == 0) {
+		nflags |= O_NONBLOCK;
+		if (fcntl(s, F_SETFL, nflags) == -1)
+			goto err;
+	}
+
+	/*
+	 * Immediate completion, or immediate failure other than "still
+	 * connecting".
+	 */
+	if (connect(s, name, namelen) == 0)
+		goto end;
+	if (errno != EINPROGRESS && errno != EALREADY) {
+		error = errno;
+		goto end;
+	}
+
+	/*
+	 * EINPROGRESS or EALREADY, poll until completion, timeout or ring
+	 * event.
+	 */
+	fd.fd = s;
+	fd.events = POLLOUT;
+	fd.revents = 0;
+	if (pthread_poll_ring(&fd, 1, timeout, ring) == -1) {
+		/* Timeout, cancellation or other polling error */
+		error = errno;
+	} else {
+		socklen_t l = sizeof(error);
+
+		/*
+		 * connect(2) completed one way or the other, fetch its
+		 * result. The length argument is value-result and must
+		 * hold the size of the buffer on entry.
+		 */
+		if (getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &l) == -1)
+			error = errno;
+	}
+
+end:
+	/*
+	 * Restore supplied descriptor fcntl status flags if necessary.
+	 * errno is set afterwards since fcntl(2) may modify it.
+	 */
+	if (nflags != oflags)
+		(void) fcntl(s, F_SETFL, oflags);
+
+	if (error != 0) {
+		errno = error;
+		goto err;
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+
+err:
+	DEBUG_PTHREAD_EXIT();
+	return -1;
+}
--- /dev/null
+/* $Id: mm_pthread_poll.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_POLL_H
+#define MM_PTHREAD_POLL_H
+
+
+
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <poll.h>
+
+#include <mm_pthread_msg.h>
+
+
+
+/*
+ * Public interface of the polling module.
+ */
+
+/* Module initialization; the test program treats the result as an errno code */
+extern int pthread_poll_init(void);
+/*
+ * Polls the supplied descriptors with a timeout and a ring.
+ * pthread_connect_ring() treats a return value of 1 as "one descriptor ready".
+ */
+extern int pthread_poll_ring(struct pollfd *, nfds_t, int,
+ pthread_ring_t *);
+/* NOTE(review): presumably the accept(2) counterpart of pthread_connect_ring() */
+extern int pthread_accept_ring(int, struct sockaddr *, socklen_t *, int,
+ pthread_ring_t *);
+/*
+ * connect(2) with a timeout and ring; returns 0 on success, or -1 with errno
+ * set.
+ */
+extern int pthread_connect_ring(int, const struct sockaddr *, socklen_t,
+ int, pthread_ring_t *);
+
+
+
+#endif
--- /dev/null
+/* $Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2004-2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Implementation of a pool of ready threads which adapts with concurrency
+ * needs. These ready threads can serve requests passed through efficient
+ * inter-thread messaging. mmpool(3) is used for the pool functionality.
+ */
+
+
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <errno.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_pool.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2004-2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+/*
+ * STATIC FUNCTIONS PROTOTYPES
+ */
+
+/* Pool accessors; both serialize on thread_object_pool_mutex */
+inline static pthread_object_t *thread_object_alloc(void);
+inline static void thread_object_free(pthread_object_t *);
+/* mmpool(3) constructor/destructor hooks handed to pool_init() */
+static bool thread_object_constructor(pnode_t *);
+static void thread_object_destructor(pnode_t *);
+/* Start routine passed to pthread_create() for every thread of the pool */
+static void *thread_object_main(void *);
+
+
+
+/*
+ * GLOBALS
+ */
+
+/* Set to TRUE once pthread_object_init() completed successfully */
+static bool thread_object_initialized = FALSE;
+/* Attributes passed to pthread_create() for every thread of the pool */
+static pthread_attr_t thread_object_attr;
+/* Pool of pthread_object_t nodes, accessed under thread_object_pool_mutex */
+static pool_t thread_object_pool;
+/*
+ * Pool of pthread_object_msg_t nodes, accessed under
+ * thread_object_msg_pool_mutex
+ */
+static pool_t thread_object_msg_pool;
+static pthread_mutex_t thread_object_pool_mutex =
+ PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t thread_object_msg_pool_mutex =
+ PTHREAD_MUTEX_INITIALIZER;
+/*
+ * Ring notified by a new thread once its port is set up; the constructor
+ * waits on it after pthread_create().
+ */
+static pthread_ring_t thread_started_ring;
+
+
+
+/*
+ * EXPORTED PUBLIC FUNCTIONS
+ */
+
+/*
+ * Must be called to initialize the pthreads pool subsystem, before calling
+ * any other function of this API. Returns 0 on success, or an error number
+ * (EINVAL if the subsystem was already initialized, ENOMEM if one of the
+ * pools could not be created, or the error reported by the pthread/ring
+ * initialization which failed).
+ * <initial> threads are launched, and more will be launched in increments
+ * of <initial> whenever necessary. These will also only be destroyed in
+ * decrements of <initial> whenever that many threads have not been in use for
+ * some time, and a minimum of <initial> threads will always be kept.
+ * Setting <initial> to high values may actually degrade performance with some
+ * inefficient threading implementations. It is not recommended to use more
+ * than 8 using the pth(3) library. Using NetBSD 2.0+ SA threads, a high
+ * number does not reduce performance. We currently do not observe any limit
+ * whatsoever according to the number of threads launched over time. It is the
+ * application's responsibility to ensure to observe decent concurrency limits
+ * before calling pthread_object_call().
+ */
+int
+pthread_object_init(int initial)
+{
+	int error;
+	bool attr_ready = FALSE, ring_ready = FALSE;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/*
+	 * Refuse a second initialization, but do so without going through
+	 * the cleanup code below, which would tear down the live pools.
+	 */
+	if (thread_object_initialized) {
+		DEBUG_PTHREAD_EXIT();
+		return EINVAL;
+	}
+
+	/*
+	 * Create attributes which will be used for threads of the pool.
+	 * We want them to be joinable.
+	 */
+	if ((error = pthread_attr_init(&thread_object_attr)) != 0)
+		goto err;
+	attr_ready = TRUE;
+	if ((error = pthread_attr_setdetachstate(&thread_object_attr,
+	    PTHREAD_CREATE_JOINABLE)) != 0)
+		goto err;
+
+	/*
+	 * We use this ring to obtain notification of ready children when
+	 * launching them. This is required for proper synchronization to
+	 * avoid awful race conditions.
+	 */
+	if ((error = pthread_ring_init(&thread_started_ring)) != 0)
+		goto err;
+	ring_ready = TRUE;
+
+	/*
+	 * First initialize the message subsystem pool
+	 */
+	if (!pool_init(&thread_object_msg_pool, "thread_object_msg_pool",
+	    malloc, free, NULL, NULL, sizeof(pthread_object_msg_t),
+	    32768 / sizeof(pthread_object_msg_t), 1, 0)) {
+		error = ENOMEM;
+		goto err;
+	}
+
+	/*
+	 * Now initialize the threads pool. This creates threads, uses
+	 * synchronization with thread_started_ring, and uses the message
+	 * subsystem, which all must be initialized and ready.
+	 */
+	if (!pool_init(&thread_object_pool, "thread_object_pool",
+	    malloc, free, thread_object_constructor, thread_object_destructor,
+	    sizeof(pthread_object_t), initial, 1, 0)) {
+		error = ENOMEM;
+		goto err;
+	}
+
+	thread_object_initialized = TRUE;
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+
+err:
+	/*
+	 * Release whatever was set up, in reverse order of creation. The
+	 * threads pool goes first since its destructor sends messages
+	 * allocated from the message pool.
+	 */
+	if (POOL_VALID(&thread_object_pool))
+		pool_destroy(&thread_object_pool);
+	if (POOL_VALID(&thread_object_msg_pool))
+		pool_destroy(&thread_object_msg_pool);
+	if (ring_ready)
+		(void) pthread_ring_destroy(&thread_started_ring);
+	if (attr_ready)
+		(void) pthread_attr_destroy(&thread_object_attr);
+
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * Allows allocation/creation of a message suitable for asynchronous requests
+ * with the threads via their main message port provided by this system.
+ * Returns new message, or NULL on error (the pool mutex could not be locked,
+ * or the message pool could not supply a node).
+ */
+inline pthread_object_msg_t *
+pthread_object_msg_alloc(void)
+{
+	pthread_object_msg_t *msg = NULL;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (pthread_mutex_lock(&thread_object_msg_pool_mutex) != 0)
+		goto err;
+	msg = (pthread_object_msg_t *)pool_alloc(&thread_object_msg_pool,
+	    FALSE);
+	(void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
+
+	/*
+	 * Only initialize the embedded message if we actually obtained a
+	 * node; the pool may be exhausted.
+	 */
+	if (msg != NULL)
+		(void) pthread_msg_init(&msg->message, NULL);
+
+err:
+	DEBUG_PTHREAD_EXIT();
+	return msg;
+}
+
+/*
+ * Destroys a message obtained through pthread_object_msg_alloc() and hands
+ * its node back to the message pool. Returns 0 on success, or the error
+ * number reported when locking the message pool mutex, in which case the
+ * node is not returned to the pool.
+ */
+inline int
+pthread_object_msg_free(pthread_object_msg_t *msg)
+{
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	(void) pthread_msg_destroy(&msg->message);
+
+	rc = pthread_mutex_lock(&thread_object_msg_pool_mutex);
+	if (rc == 0) {
+		(void) pool_free((pnode_t *)msg);
+		(void) pthread_mutex_unlock(&thread_object_msg_pool_mutex);
+	}
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Reserves a ready thread of the pool and asks it, through a message queued
+ * on its port, to run <function> with <args>. The threads already exist and
+ * are waiting for requests, which makes this cheap. No concurrency limit is
+ * enforced here; applications are expected to restrict the number of
+ * outstanding requests themselves. If <port> is not NULL, the message port of
+ * the selected thread is stored there on success.
+ * Returns 0 on success, or an error number: EINVAL for a NULL <function>,
+ * ENOMEM if no thread or message could be allocated, or the error returned
+ * by pthread_msg_put().
+ * XXX Operation is asynchronous only. A flag selecting synchronous operation
+ * could be added, along with the result value of the thread function, which
+ * only makes sense when waiting for the task to end. Applications can still
+ * get synchronous behavior themselves using a message and/or ring, condition
+ * variable, etc. Also evaluate whether a completion callback for asynchronous
+ * operation would be useful.
+ */
+int
+pthread_object_call(pthread_port_t **port,
+    void (*function)(pthread_object_t *, void *), void *args)
+{
+	pthread_object_t *worker;
+	pthread_object_msg_t *request;
+	int error;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (function == NULL) {
+		DEBUG_PTHREAD_EXIT();
+		return EINVAL;
+	}
+
+	/*
+	 * Reserve a thread first, then a message to carry the request. The
+	 * message holds the arguments and is sent without waiting for a
+	 * reply, so it has to outlive this function and cannot live on our
+	 * stack.
+	 */
+	worker = thread_object_alloc();
+	if (worker == NULL) {
+		DEBUG_PTHREAD_EXIT();
+		return ENOMEM;
+	}
+	request = pthread_object_msg_alloc();
+	if (request == NULL) {
+		thread_object_free(worker);
+		DEBUG_PTHREAD_EXIT();
+		return ENOMEM;
+	}
+
+	request->command = PTHREAD_OBJ_CALL;
+	request->u.call.function = function;
+	request->u.call.arguments = args;
+	error = pthread_msg_put(worker->port, &request->message);
+	if (error != 0) {
+		/* Could not dispatch; release both reservations */
+		pthread_object_msg_free(request);
+		thread_object_free(worker);
+		DEBUG_PTHREAD_EXIT();
+		return error;
+	}
+
+	/*
+	 * Dispatched; hand out the thread's port if the caller asked for it
+	 */
+	if (port != NULL)
+		*port = worker->port;
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+}
+
+
+
+/*
+ * INTERNAL STATIC FUNCTIONS
+ */
+
+/*
+ * Internally used to allocate a ready thread from the pool. Returns the
+ * thread object, or NULL if the pool mutex could not be locked or the pool
+ * could not supply one.
+ */
+inline static pthread_object_t *
+thread_object_alloc(void)
+{
+	pthread_object_t *obj = NULL;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (pthread_mutex_lock(&thread_object_pool_mutex) != 0)
+		goto err;
+	obj = (pthread_object_t *)pool_alloc(&thread_object_pool, FALSE);
+	(void) pthread_mutex_unlock(&thread_object_pool_mutex);
+
+err:
+	/* Pair the entry trace like every other function of this file */
+	DEBUG_PTHREAD_EXIT();
+	return obj;
+}
+
+/*
+ * Releases a thread object back to the pool of ready threads. Nothing is
+ * released if the pool mutex cannot be locked.
+ */
+inline static void
+thread_object_free(pthread_object_t *obj)
+{
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if (pthread_mutex_lock(&thread_object_pool_mutex) != 0) {
+		DEBUG_PTHREAD_EXIT();
+		return;
+	}
+
+	(void) pool_free((pnode_t *)obj);
+	(void) pthread_mutex_unlock(&thread_object_pool_mutex);
+
+	DEBUG_PTHREAD_EXIT();
+}
+
+/*
+ * mmpool(3) constructor hook: starts the thread backing a new pool node.
+ * Returns TRUE if the thread was created, FALSE otherwise.
+ */
+static bool
+thread_object_constructor(pnode_t *pnode)
+{
+	pthread_object_t *self = (pthread_object_t *)pnode;
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/*
+	 * The port field is filled in by thread_object_main() itself, once it
+	 * has created its port and ring.
+	 */
+	rc = pthread_create(&self->thread, &thread_object_attr,
+	    thread_object_main, self);
+
+	/*
+	 * Block until the new thread notifies us that it is ready. At least
+	 * with NetBSD 2.0 SA threads, things break badly without this. The
+	 * cost is acceptable: threads are only created when every thread of
+	 * the pool is already busy.
+	 */
+	if (rc == 0)
+		(void) pthread_ring_wait(&thread_started_ring, NULL);
+
+	DEBUG_PTHREAD_EXIT();
+	return (rc == 0) ? TRUE : FALSE;
+}
+
+/*
+ * Internally called by mmpool(3) to destroy a thread object.
+ */
+static void
+thread_object_destructor(pnode_t *pnode)
+{
+	pthread_object_t *obj = (pthread_object_t *)pnode;
+	pthread_object_msg_t *msg;
+	bool quit_sent = FALSE;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/*
+	 * To be freed, the thread has to be terminated. We thus send it a
+	 * quit message and then wait for it to exit using pthread_join().
+	 * Note that we let the thread destroy the port field. Although we
+	 * theoretically could use a message on the stack here, let's be safe.
+	 * Thread destruction is only performed rarely anyways, so this isn't
+	 * a performance problem.
+	 */
+	if ((msg = pthread_object_msg_alloc()) != NULL) {
+		msg->command = PTHREAD_OBJ_QUIT;
+		if (pthread_msg_put(obj->port, &msg->message) == 0)
+			quit_sent = TRUE;
+		else
+			/* Never queued, so nobody else will free it */
+			(void) pthread_object_msg_free(msg);
+	}
+
+	/*
+	 * Only wait for the thread if it was actually told to quit; joining a
+	 * thread which never receives the request would block us forever.
+	 * In that case we detach it instead, trading a deadlock for a leaked
+	 * idle thread.
+	 */
+	if (quit_sent)
+		(void) pthread_join(obj->thread, NULL);
+	else
+		(void) pthread_detach(obj->thread);
+
+	DEBUG_PTHREAD_EXIT();
+}
+
+/*
+ * Body of every thread of the pool. The thread owns a message port and the
+ * ring attached to it, publishes the port through its thread object, and then
+ * serves command messages: PTHREAD_OBJ_CALL runs the requested function and
+ * releases the thread back to the pool, PTHREAD_OBJ_QUIT makes the thread
+ * clean up and exit. A quit request can never arrive while a call is being
+ * executed, since it is only sent for thread nodes which were already freed
+ * (by mmpool(3) pool_free()). Applications which obtain the port of a thread
+ * after thread_object_call() are advised to only send proper user messages
+ * to it, not system reserved ones.
+ */
+static void *
+thread_object_main(void *args)
+{
+	pthread_object_t *self = (pthread_object_t *)args;
+	pthread_port_t inbox;
+	pthread_ring_t bell;
+	pthread_msg_t *raw;
+	pthread_object_msg_t *req;
+	bool running = TRUE;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/*
+	 * Set up our incoming port and the ring we sleep on, then advertise
+	 * the port address. Ideally we should somehow panic if any of this
+	 * initialization fails. XXX
+	 */
+	(void) pthread_port_init(&inbox);
+	(void) pthread_ring_init(&bell);
+	(void) pthread_port_set_ring(&inbox, &bell);
+	self->port = &inbox;
+
+	/*
+	 * Tell the parent we are ready, so that it may proceed
+	 */
+	(void) pthread_ring_notify(&thread_started_ring);
+
+	/*
+	 * Serve requests until a PTHREAD_OBJ_QUIT message is received
+	 */
+	while (running) {
+		/*
+		 * Sleep, without using any CPU time, until message(s) arrive
+		 */
+		(void) pthread_ring_wait(&bell, NULL);
+
+		/*
+		 * At least one message is available; process the queue, but
+		 * stop fetching as soon as we were told to quit.
+		 */
+		while (running && (raw = pthread_msg_get(&inbox)) != NULL) {
+			/*
+			 * The message is embedded right after the pool node
+			 * of its pthread_object_msg_t; step back one node to
+			 * recover the enclosing object.
+			 */
+			req = (pthread_object_msg_t *)((pnode_t *)raw - 1);
+
+			switch (req->command) {
+			case PTHREAD_OBJ_QUIT:
+				/*
+				 * The object destructor orders us to exit
+				 */
+				pthread_object_msg_free(req);
+				running = FALSE;
+				break;
+			case PTHREAD_OBJ_CALL:
+				/*
+				 * Request to execute a function, which means
+				 * that we were allocated/reserved first.
+				 */
+				req->u.call.function(self,
+				    req->u.call.arguments);
+				pthread_object_msg_free(req);
+				/*
+				 * Release ourselves so that we may serve
+				 * further requests. Doing so may cause the
+				 * destructor to queue a PTHREAD_OBJ_QUIT
+				 * message on our port soon. This is safe: the
+				 * destructor does not let us be destroyed
+				 * before pthread_join() saw us end cleanly.
+				 */
+				thread_object_free(self);
+				break;
+			default:
+				/* Other commands are left untouched */
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Discard messages that are still queued on our port (if any)
+	 */
+	while ((raw = pthread_msg_get(&inbox)) != NULL) {
+		req = (pthread_object_msg_t *)((pnode_t *)raw - 1);
+		pthread_object_msg_free(req);
+	}
+
+	/*
+	 * Free our resources and exit.
+	 */
+	(void) pthread_port_destroy(&inbox);
+	(void) pthread_ring_destroy(&bell);
+
+	DEBUG_PTHREAD_EXIT();
+	pthread_exit(NULL);
+
+	/* NOTREACHED */
+	return NULL;
+}
--- /dev/null
+/* $Id: mm_pthread_pool.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2004-2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_POOL_H
+#define MM_PTHREAD_POOL_H
+
+
+
+#include <pthread.h>
+
+#include <mmtypes.h>
+#include <mmpool.h>
+
+#include <mm_pthread_msg.h>
+
+
+
+/*
+ * A thread of the pool. Allocated from an mmpool(3) pool, which is why the
+ * pool node comes first: the object is cast to and from pnode_t *.
+ */
+typedef struct {
+ pnode_t node;
+ /* Thread handle filled in by pthread_create(), used for pthread_join() */
+ pthread_t thread;
+ /* Message port of the thread; set by the thread itself when it starts */
+ pthread_port_t *port;
+} pthread_object_t;
+
+/*
+ * Message exchanged with threads of the pool. The layout matters: the pool
+ * node must come first and the message right after it, since the receiving
+ * thread recovers this structure by stepping back one pnode_t from the
+ * pthread_msg_t pointer it obtains from its port.
+ */
+typedef struct {
+ pnode_t node;
+ pthread_msg_t message;
+ /* One of enum pthread_object_commands; selects the member of <u> */
+ int command;
+ union {
+ /* PTHREAD_OBJ_CALL, sent to thread_object_main() */
+ struct {
+ void (*function)(pthread_object_t *, void *);
+ void *arguments;
+ } call;
+ /*
+ * PTHREAD_OBJ_QUIT, sent to thread_object_main() by the object
+ * destructor. NOTE(review): mm_pthread_pool.c never sets this
+ * member; presumably a leftover of an earlier design - confirm.
+ */
+ pthread_object_t *quit;
+ /* PTHREAD_OBJ_USER, custom user messages */
+ struct {
+ int user_command;
+ void *user_data;
+ } user;
+ } u;
+} pthread_object_msg_t;
+
+/*
+ * Values for the <command> field of pthread_object_msg_t.
+ */
+enum pthread_object_commands {
+ /* Execute u.call.function with u.call.arguments */
+ PTHREAD_OBJ_CALL,
+ /* Make the receiving pool thread clean up and exit */
+ PTHREAD_OBJ_QUIT,
+ /* Application defined message, payload in u.user */
+ PTHREAD_OBJ_USER,
+ /* Number of commands, not a command itself */
+ PTHREAD_OBJ_MAX
+};
+
+
+
+/* Initializes the pool with <initial> threads; 0 or an error number */
+extern int pthread_object_init(int);
+/* Allocates a message from the message pool; NULL on error */
+extern inline pthread_object_msg_t *pthread_object_msg_alloc(void);
+/* Returns a message to the message pool; 0 or an error number */
+extern inline int pthread_object_msg_free(
+ pthread_object_msg_t *);
+/*
+ * Dispatches a function to a thread of the pool, optionally returning the
+ * port of that thread; 0 or an error number.
+ */
+extern int pthread_object_call(pthread_port_t **,
+ void (*)(pthread_object_t *,
+ void *), void *);
+
+
+
+#endif
--- /dev/null
+/* $Id: mm_pthread_sleep.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#include <sys/types.h>
+#include <sys/time.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+
+#include <mm_pthread_debug.h>
+#include <mm_pthread_msg.h>
+#include <mm_pthread_sleep.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: mm_pthread_sleep.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $");
+
+
+
+/* Process-wide setup: creates the thread-specific data key */
+static int pthread_sleep_proc_init(void);
+/* pthread_once() wrapper around the above; exits the process on failure */
+static void pthread_sleep_proc_init2(void);
+/* Per-thread setup: allocates the ring the calling thread sleeps on */
+static int pthread_sleep_thread_init(pthread_ring_t **);
+/* Key destructor: releases the ring of a thread */
+static void pthread_sleep_thread_exit(void *);
+
+/* Key under which each thread stores its private sleep ring */
+static pthread_key_t pthread_sleep_proc_key;
+/* Guards the one-time creation of pthread_sleep_proc_key */
+static pthread_once_t pthread_sleep_proc_initialized = PTHREAD_ONCE_INIT;
+
+
+
+/*
+ * Creates the key under which every thread keeps its private sleep ring,
+ * registering pthread_sleep_thread_exit() as its destructor. Returns the
+ * result of pthread_key_create(): 0 on success, or an error number.
+ */
+static int
+pthread_sleep_proc_init(void)
+{
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	rc = pthread_key_create(&pthread_sleep_proc_key,
+	    pthread_sleep_thread_exit);
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * pthread_once() entry point for the process-wide initialization. Since it
+ * cannot return a status, a failure is reported on stderr and terminates the
+ * process.
+ */
+static void
+pthread_sleep_proc_init2(void)
+{
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	rc = pthread_sleep_proc_init();
+	if (rc == 0) {
+		DEBUG_PTHREAD_EXIT();
+		return;
+	}
+
+	(void) fprintf(stderr, "pthread_sleep_proc_init() - %s\n",
+	    strerror(rc));
+	DEBUG_PTHREAD_EXIT();
+	exit(EXIT_FAILURE);
+}
+
+/*
+ * Allocates and initializes the ring the calling thread will sleep on, and
+ * records it as the thread-specific value of pthread_sleep_proc_key. On
+ * success 0 is returned and the ring is stored in <res>; otherwise an error
+ * number is returned, nothing is left allocated, and <res> is untouched.
+ */
+static int
+pthread_sleep_thread_init(pthread_ring_t **res)
+{
+	int error;
+	pthread_ring_t *ring;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	if ((ring = malloc(sizeof(*ring))) == NULL) {
+		error = ENOMEM;
+		goto err;
+	}
+
+	if ((error = pthread_ring_init(ring)) != 0)
+		goto err_free;
+
+	if ((error = pthread_setspecific(pthread_sleep_proc_key, ring)) != 0)
+		goto err_destroy;
+
+	*res = ring;
+
+	DEBUG_PTHREAD_EXIT();
+	return 0;
+
+err_destroy:
+	/* The ring was initialized; release what it holds before its memory */
+	(void) pthread_ring_destroy(ring);
+err_free:
+	free(ring);
+err:
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * Destructor registered with pthread_sleep_proc_key: destroys and frees the
+ * sleep ring of a thread.
+ */
+static void
+pthread_sleep_thread_exit(void *specific)
+{
+	pthread_ring_t *owned = specific;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	(void) pthread_ring_destroy(owned);
+	free(owned);
+
+	/*
+	 * NetBSD threads do not require it, but some pthread implementations
+	 * keep calling us for as long as the key holds a value, and would
+	 * then make us reference the memory freed above a second time. The
+	 * POSIX standard was unclear about this, so clear the value.
+	 */
+	(void) pthread_setspecific(pthread_sleep_proc_key, NULL);
+
+	DEBUG_PTHREAD_EXIT();
+}
+
+
+
+/*
+ * Puts the calling thread to sleep for the delay given in <ts>, by waiting on
+ * a ring private to the thread until an absolute deadline. Returns 0 on
+ * success, or an error number.
+ */
+int
+pthread_nanosleep(struct timespec *ts)
+{
+	struct timeval now;
+	struct timespec deadline;
+	pthread_ring_t *ring;
+	int error;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/*
+	 * One-time, process-wide setup
+	 */
+	error = pthread_once(&pthread_sleep_proc_initialized,
+	    pthread_sleep_proc_init2);
+	if (error != 0)
+		goto out;
+
+	/*
+	 * Per-thread setup, on the first call made by this thread
+	 * XXX Use pthread_once() here too, or mutex around ring?
+	 */
+	ring = pthread_getspecific(pthread_sleep_proc_key);
+	if (ring == NULL &&
+	    (error = pthread_sleep_thread_init(&ring)) != 0)
+		goto out;
+
+	/*
+	 * The ring expects an absolute time: current time plus the delay
+	 */
+	if (gettimeofday(&now, NULL) == -1) {
+		error = errno;
+		goto out;
+	}
+	TIMEVAL_TO_TIMESPEC(&now, &deadline);
+	timespecadd(&deadline, ts, &deadline);
+
+	/*
+	 * Sleep. Timing out is the normal outcome here and is reported as
+	 * success; any other result is passed on unchanged.
+	 */
+	error = pthread_ring_wait(ring, &deadline);
+	if (error == ETIMEDOUT)
+		error = 0;
+
+out:
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
+
+/*
+ * Same as pthread_nanosleep(), with the delay expressed as a timeval.
+ * Returns 0 on success or an error number.
+ */
+int
+pthread_microsleep(struct timeval *tv)
+{
+	struct timespec delay;
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	TIMEVAL_TO_TIMESPEC(tv, &delay);
+	rc = pthread_nanosleep(&delay);
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Puts the calling thread to sleep for <ms> milliseconds. Returns 0 on
+ * success or an error number.
+ */
+int
+pthread_millisleep(unsigned int ms)
+{
+	struct timeval delay;
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/* Whole seconds, then the remaining milliseconds as microseconds */
+	delay.tv_sec = ms / 1000;
+	delay.tv_usec = (ms % 1000) * 1000;
+	rc = pthread_microsleep(&delay);
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Puts the calling thread to sleep for <seconds> seconds. Returns 0 on
+ * success or an error number.
+ */
+unsigned int
+pthread_sleep(unsigned int seconds)
+{
+	struct timespec delay;
+	int rc;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	delay.tv_sec = seconds;
+	delay.tv_nsec = 0;
+	rc = pthread_nanosleep(&delay);
+
+	DEBUG_PTHREAD_EXIT();
+	return rc;
+}
+
+/*
+ * Suspends execution of thread for duration of specified number of
+ * microseconds. Like usleep(3). Returns 0 on success or an error number.
+ */
+int
+pthread_usleep(useconds_t ms)
+{
+	struct timeval tv;
+	int error;
+
+	DEBUG_PTHREAD_ENTRY();
+
+	/*
+	 * Keep tv_usec below one second: delays of 1000000 microseconds or
+	 * more would otherwise yield an out of range timeval, and then an
+	 * out of range tv_nsec once converted to a timespec.
+	 */
+	tv.tv_sec = ms / 1000000;
+	tv.tv_usec = ms % 1000000;
+	error = pthread_microsleep(&tv);
+
+	DEBUG_PTHREAD_EXIT();
+	return error;
+}
--- /dev/null
+/* $Id: mm_pthread_sleep.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#ifndef MM_PTHREAD_SLEEP_H
+#define MM_PTHREAD_SLEEP_H
+
+
+
+#include <sys/time.h>
+#include <pthread.h>
+#include <unistd.h>
+
+
+
+/*
+ * Thread sleeping functions. Each suspends only the calling thread and
+ * returns 0 on success or an error number; they differ in the unit of the
+ * delay: timespec, timeval, milliseconds, seconds and microseconds.
+ */
+extern int pthread_nanosleep(struct timespec *);
+extern int pthread_microsleep(struct timeval *);
+extern int pthread_millisleep(unsigned int);
+extern unsigned int pthread_sleep(unsigned int);
+extern int pthread_usleep(useconds_t);
+
+
+
+#endif
--- /dev/null
+/* $Id: msg_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#include <stdio.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdarg.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: msg_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $");
+
+
+
+/* Number of pool threads main() dispatches threadfunc() to */
+#define THREADS 32
+/* NOTE(review): presumably messages sent per thread; used outside this view */
+#define ROUNDS 8
+/* Seconds stored in the timeout timespec set up by main() */
+#define TIMEOUT 1
+/* NOTE(review): optional build switches, disabled; presumably for printfunc() */
+/*#define PRINTLOCK*/
+/*#define NOPRINT*/
+
+
+
+/*
+ * Test message. The pthread_msg_t comes first so that the structure can be
+ * cast to and from pthread_msg_t *, as main() does.
+ */
+struct message {
+ pthread_msg_t node;
+ /* Reported by main() as thread number (id) and message number (i) */
+ int id, i;
+};
+
+
+
+int main(void);
+/* Function main() dispatches to the pool threads via pthread_object_call() */
+static void threadfunc(pthread_object_t *, void *);
+/* printf(3)-style output helper used for all test messages */
+static void printfunc(const char *, ...);
+
+
+
+/* Port on which main() receives the messages it replies to */
+static pthread_port_t main_port;
+/* Initialized by main() as the "stdout lock" */
+static pthread_mutex_t print_lock;
+
+
+
+/*
+ * Test driver: dispatches threadfunc() THREADS times, then services the
+ * messages arriving on main_port.
+ *
+ * Each message taken from main_port is logged and replied to.  Once the port
+ * is empty the main thread waits on its ring with an absolute deadline of
+ * "now + TIMEOUT seconds".  Any nonzero result from pthread_ring_wait()
+ * (presumably including expiry of that deadline -- confirm against the
+ * library) is logged and ends the loop.
+ *
+ * Returns 0 once the service loop has ended; exits with EXIT_FAILURE when
+ * one of the initialization steps fails.
+ */
+int
+main(void)
+{
+	pthread_ring_t ring;
+	struct message *msg;
+	int i, err;
+	int threads_args[THREADS];
+	struct timeval tv;
+	struct timespec ts, ts1;
+
+	/*
+	 * printfunc() may lock print_lock, so a failure to create that lock
+	 * is reported with plain printf().
+	 */
+	if ((err = pthread_mutex_init(&print_lock, NULL)) != 0) {
+		(void) printf("main() - stdout lock - %s\n", strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	if ((err = pthread_port_init(&main_port)) != 0 ||
+	    (err = pthread_ring_init(&ring)) != 0 ||
+	    (err = pthread_port_set_ring(&main_port, &ring)) != 0) {
+		printfunc("main() - initialization - %s\n", strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	printfunc("Main: launching threads\n");
+
+	if ((err = pthread_poll_init()) != 0) {
+		printfunc("main() - pthread_poll_init() - %s\n",
+		    strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	/*
+	 * Initializes a pool of ready threads which can be dispatched
+	 * functions to execute.
+	 */
+	if ((err = pthread_object_init(THREADS + 1)) != 0) {
+		printfunc("main() - pthread_object_init() - %s\n",
+		    strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	/*
+	 * Now dispatch a main reentrant function to many threads, without
+	 * waiting for them to complete, in an asynchronous manner.
+	 * XXX Because of the way this works, the parent main thread should
+	 * actually already be listening to messages...  We did create a port
+	 * however, which should queue messages until we reach the main loop.
+	 */
+	for (i = 0; i < THREADS; i++) {
+		/* Each worker gets a pointer to its own slot as its id */
+		threads_args[i] = i;
+		/*
+		 * Report the status stored in err, as every other call in
+		 * this file does, instead of a possibly unrelated errno.
+		 * NOTE(review): assumes pthread_object_call() returns an
+		 * errno-style code like its sibling calls -- confirm.
+		 */
+		if ((err = pthread_object_call(NULL, threadfunc,
+		    &threads_args[i])) != 0)
+			printfunc("main() - pthread_object_call() - %s\n",
+			    strerror(err));
+	}
+
+	/* Relative timeout, added to the current time before every wait */
+	ts1.tv_sec = TIMEOUT;
+	ts1.tv_nsec = 0;
+	for (;;) {
+		/*
+		 * Read messages as long as there are any, and reply to each
+		 * of them in a synchronous manner.
+		 */
+		while ((msg = (struct message *)pthread_msg_get(&main_port))
+		    != NULL) {
+
+			printfunc(
+			    "Main: Received message %d from thread #%d\n",
+			    msg->i, msg->id);
+
+			if ((err = pthread_msg_reply((pthread_msg_t *)msg))
+			    != 0)
+				printfunc(
+				    "Main: pthread_message_reply() - %s\n",
+				    strerror(err));
+		}
+
+		/*
+		 * No more messages to process; wait for any message(s) to be
+		 * available.
+		 * Note that there is a special provision for the case where
+		 * this loop polls for new messages before processing them:
+		 * waiting on the ring then returns immediately instead of
+		 * actually waiting if any messages have already been sent.
+		 */
+		printfunc("Main: Waiting for messages\n");
+
+		/* Build the absolute deadline: now + TIMEOUT seconds */
+		(void) gettimeofday(&tv, NULL);
+		TIMEVAL_TO_TIMESPEC(&tv, &ts);
+		timespecadd(&ts, &ts1, &ts);
+		if ((err = pthread_ring_wait(&ring, &ts)) != 0) {
+			printfunc("Main: pthread_ring_wait() - %s\n",
+			    strerror(err));
+			break;
+		}
+	}
+
+	(void) pthread_mutex_destroy(&print_lock);
+	(void) pthread_port_destroy(&main_port);
+	(void) pthread_ring_destroy(&ring);
+
+	return 0;
+}
+
+/*
+ * Worker body dispatched by main() through pthread_object_call().
+ *
+ * Creates a private reply port and ring, then sends ROUNDS messages to
+ * main_port, waiting for the reply to each one before sending the next.
+ * The same stack-allocated message is reused for every round.
+ *
+ * obj:  not used by this function.
+ * args: points to the int holding this worker's number.
+ */
+static void
+threadfunc(pthread_object_t *obj, void *args)
+{
+	int id = *(int *)args;
+	int i, err;
+	struct message msg;
+	pthread_port_t rport;
+	pthread_ring_t rring;
+
+	if ((err = pthread_port_init(&rport)) != 0 ||
+	    (err = pthread_ring_init(&rring)) != 0 ||
+	    (err = pthread_port_set_ring(&rport, &rring)) != 0 ||
+	    (err = pthread_msg_init((pthread_msg_t *)&msg, &rport)) != 0) {
+		printfunc("threadfunc() - initialization - %s\n",
+		    strerror(err));
+		return;
+	}
+
+	msg.id = id;
+
+	printfunc("Thread #%d started\n", id);
+
+	for (i = 0; i < ROUNDS; i++) {
+		/*
+		 * Prepare and send synchronous message.  For asynchronous
+		 * operation, we would need to allocate a message and to send
+		 * it, and not expect a reply back immediately, even letting
+		 * the other end free the message as necessary.  In synchronous
+		 * mode we can use the same message over and over and share
+		 * its memory area using proper send/reply methods for
+		 * synchronization.
+		 */
+		msg.i = i;
+		if ((err = pthread_msg_put(&main_port, (pthread_msg_t *)&msg))
+		    != 0) {
+			printfunc("Thread: pthread_message_put() - %s\n",
+			    strerror(err));
+			/*
+			 * The wait below has no timeout, so waiting for a
+			 * reply to a message that was not sent would block
+			 * this thread forever.
+			 * NOTE(review): assumes a failed put queues nothing
+			 * -- confirm against pthread_msg_put().
+			 */
+			break;
+		}
+
+		/* Now wait for synchronous reply and discard it */
+		if ((err = pthread_ring_wait(&rring, NULL)) != 0) {
+			printfunc("Thread: pthread_ring_wait() - %s\n",
+			    strerror(err));
+			break;
+		}
+		if (pthread_msg_get(&rport) == NULL)
+			printfunc("Thread: pthread_msg_get() == NULL!?\n");
+		printfunc("Thread #%d received reply message for %d\n",
+		    id, i);
+	}
+
+	printfunc("Thread #%d ending\n", id);
+
+	(void) pthread_port_destroy(&rport);
+	(void) pthread_ring_destroy(&rring);
+	(void) pthread_msg_destroy((pthread_msg_t *)&msg);
+}
+
+/*
+ * printf(3)-style output helper shared by all threads of the test.
+ *
+ * The text is formatted into a local buffer and written to stdout with a
+ * single fwrite() call; output longer than the buffer is truncated.
+ * Nothing is written when formatting fails or yields an empty string.
+ *
+ * With PRINTLOCK defined, the write is done under print_lock and followed
+ * by a flush.  With NOPRINT defined, the function returns immediately.
+ */
+static void
+printfunc(const char *fmt, ...)
+{
+	char buf[1024];
+	va_list arg_ptr;
+	int len;
+
+#ifdef NOPRINT
+	return;
+#endif
+
+	va_start(arg_ptr, fmt);
+	len = vsnprintf(buf, sizeof(buf), fmt, arg_ptr);
+	/* Every va_start() needs its va_end(), so end before any return */
+	va_end(arg_ptr);
+	if (len < 1)
+		return;
+
+	/*
+	 * vsnprintf() returns the length the full text would have had; clamp
+	 * it to what was actually stored so fwrite() stays inside buf.
+	 */
+	if ((size_t)len >= sizeof(buf))
+		len = (int)(sizeof(buf) - 1);
+
+#ifdef PRINTLOCK
+	(void) pthread_mutex_lock(&print_lock);
+#endif
+	(void) fwrite(buf, (size_t)len, 1, stdout);
+#ifdef PRINTLOCK
+	(void) fflush(stdout);
+	(void) pthread_mutex_unlock(&print_lock);
+#endif
+}
--- /dev/null
+/* $Id: poll_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $ */
+
+/*
+ * Copyright (C) 2005, Matthew Mondor
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. All advertising materials mentioning features or use of this software
+ * must display the following acknowledgement:
+ * This product includes software developed by Matthew Mondor.
+ * 4. The name of Matthew Mondor may not be used to endorse or promote
+ * products derived from this software without specific prior written
+ * permission.
+ * 5. Redistribution of source code may not be released under the terms of
+ * any GNU Public License derivate.
+ *
+ * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+
+#include <stdio.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdarg.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+
+
+
+MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
+\tMatthew Mondor. All rights reserved.\n");
+MMRCSID("$Id: poll_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $");
+
+
+
+int main(void);
+
+
+
+/*
+ * Smoke test for pthread_poll_init(): returns 0 when the call succeeds.
+ * Otherwise prints the strerror() text for the returned code to stderr and
+ * exits with EXIT_FAILURE.
+ */
+int
+main(void)
+{
+	const int status = pthread_poll_init();
+
+	if (status == 0)
+		return 0;
+
+	(void) fprintf(stderr, "main() - pthread_poll_init() - %s\n",
+	    strerror(status));
+	exit(EXIT_FAILURE);
+}
--- /dev/null
+/*
+ * The goal of this program is to verify whether it is valid for multiple
+ * threads to poll(2) on the same file descriptor, and if so, what happens
+ * whenever an event is triggered on that descriptor.
+ *
+ * XXX Problems:
+ * - Only one of the polling threads seems to be awakened when an event occurs
+ *   on the descriptor.  This probably means that using a signal would be
+ *   better...  I sure don't want to need a file descriptor per ring...
+ *   If I did however, would this really hurt?  Are there that many rings?
+ *   But oops, this actually means two file descriptors for each!
+ *   Using a signal is probably better.  However, we then need to clobber
+ *   some signal...  We could use SIGUSR2.
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+
+
+/* Number of thread_poll() threads launched by main() */
+#define THREADS 8
+
+
+
+int main(void);
+static void *thread_poll(void *);
+static void *thread_notify(void *);
+static void thread_print(int, const char *);
+
+
+
+/* socketpair() ends: [0] is written by thread_notify(), [1] is polled and read by thread_poll() */
+static int sockets[2];
+/* Storage for the thread numbers; &threadargs[i] is handed to thread_poll() */
+static int threadargs[THREADS];
+/* Serializes the printf() call in thread_print() */
+static pthread_mutex_t print_mutex;
+/* Held around the read() and write() calls on the socket pair */
+static pthread_mutex_t sockets_mutex;
+
+
+
+/*
+ * Sets up a non-blocking datagram socket pair and the two mutexes, starts
+ * THREADS thread_poll() threads followed one second later by a single
+ * thread_notify() thread, then sleeps in pause() forever.
+ *
+ * Never returns normally; exits with EXIT_FAILURE if any setup step fails.
+ */
+int
+main(void)
+{
+	pthread_t threadid;
+	int i, err;
+
+	/*
+	 * Create socketpair which will be used to trigger events to awaken
+	 * polling threads.
+	 */
+	if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, sockets) != 0) {
+		perror("socketpair()");
+		exit(EXIT_FAILURE);
+	}
+	/* fcntl(F_SETFL) reports failure as -1; other values are success */
+	if (fcntl(sockets[0], F_SETFL, O_NONBLOCK) == -1 ||
+	    fcntl(sockets[1], F_SETFL, O_NONBLOCK) == -1) {
+		perror("fcntl()");
+		exit(EXIT_FAILURE);
+	}
+
+	/* pthread functions return the error number instead of setting errno */
+	if ((err = pthread_mutex_init(&print_mutex, NULL)) != 0 ||
+	    (err = pthread_mutex_init(&sockets_mutex, NULL)) != 0) {
+		(void) fprintf(stderr, "pthread_mutex_init(): %s\n",
+		    strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	/*
+	 * First launch THREADS polling threads
+	 */
+	for (i = 0; i < THREADS; i++) {
+		threadargs[i] = i;
+		if ((err = pthread_create(&threadid, NULL, thread_poll,
+		    &threadargs[i])) != 0) {
+			(void) fprintf(stderr, "pthread_create(): %s\n",
+			    strerror(err));
+			exit(EXIT_FAILURE);
+		}
+	}
+	/* Presumably gives the pollers time to reach poll() -- confirm */
+	sleep(1);
+
+	/*
+	 * And finally launch notifier thread
+	 */
+	if ((err = pthread_create(&threadid, NULL, thread_notify, NULL))
+	    != 0) {
+		(void) fprintf(stderr, "pthread_create(): %s\n",
+		    strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	/*
+	 * Now just wait
+	 */
+	for (;;)
+		(void) pause();
+}
+
+/*
+ * Polling thread: waits without timeout for sockets[1] to become readable,
+ * logging each step through thread_print().  When POLLIN is reported it
+ * takes sockets_mutex and reads single bytes until read() stops returning 1,
+ * logging the strerror() text if the last read() returned -1.
+ *
+ * args points to the int holding this thread's number.
+ * Returns NULL, and only after poll() itself has failed.
+ */
+static void *
+thread_poll(void *args)
+{
+	const int id = *(int *)args;
+	struct pollfd pfd;
+	char byte;
+	int rc;
+
+	pfd.fd = sockets[1];
+	pfd.events = POLLIN;
+
+	for (;;) {
+		thread_print(id, "Polling");
+		rc = poll(&pfd, 1, -1);
+		if (rc == -1) {
+			perror("poll()");
+			break;
+		}
+		thread_print(id, "Poll returned");
+
+		if (rc == 0) {
+			thread_print(id, "Woke up! (no data)");
+			continue;
+		}
+		if ((pfd.revents & POLLIN) == 0)
+			continue;
+
+		/* Attempt to read event/byte */
+		thread_print(id, "Woke up! (with data)");
+		pthread_mutex_lock(&sockets_mutex);
+		for (;;) {
+			rc = read(sockets[1], &byte, 1);
+			if (rc != 1)
+				break;
+			thread_print(id, "Read data!");
+		}
+		if (rc == -1)
+			thread_print(id, strerror(errno));
+		pthread_mutex_unlock(&sockets_mutex);
+	}
+
+	return NULL;
+}
+
+/*
+ * Notifier thread: once per second, writes a single byte to sockets[0]
+ * while holding sockets_mutex.  If the write does not succeed, it waits
+ * (with the mutex released) until poll() reports an event for POLLOUT on
+ * that socket; the byte is not resent, the next iteration simply tries
+ * again.
+ *
+ * args is not used.  The function never returns.
+ */
+/* ARGSUSED */
+static void *
+thread_notify(void *args)
+{
+	char c = '\0';
+	struct pollfd fds[1];
+	int failed;
+
+	fds[0].fd = sockets[0];
+	fds[0].events = POLLOUT;
+	for (;;) {
+		sleep(1);
+		thread_print(-1, "Notifying");
+		pthread_mutex_lock(&sockets_mutex);
+		failed = (write(sockets[0], &c, 1) != 1);
+		pthread_mutex_unlock(&sockets_mutex);
+		if (failed) {
+			/*
+			 * Poll until we can send data.  This is done without
+			 * sockets_mutex: thread_poll() needs that mutex to
+			 * read from the socket pair, so blocking here with
+			 * it held could stall readers and writer alike.
+			 */
+			(void) poll(fds, 1, -1);
+		}
+	}
+}
+
+/*
+ * Writes "<id>: <str>" followed by a newline to stdout, holding print_mutex
+ * for the duration of the call.
+ */
+static void
+thread_print(int id, const char *str)
+{
+	pthread_mutex_lock(&print_mutex);
+	(void) fprintf(stdout, "%d: %s\n", id, str);
+	pthread_mutex_unlock(&print_mutex);
+}
--- /dev/null
+/*
+ * The goal of this program is to verify whether it is valid for multiple
+ * threads to be awakened from a poll(2) call by a single process-wide signal.
+ * This would allow the notifier of a thread message event to generate this
+ * signal if needed to cause interested threads to wake up.  Threads which do
+ * not want to receive the signal can simply block it using pthread_sigmask().
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <pthread.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+
+
+/* Number of thread_poll() threads launched by main() */
+#define THREADS 8
+
+
+
+int main(void);
+static void *thread_poll(void *);
+static void *thread_notify(void *);
+static void thread_print(int, const char *);
+
+
+
+/* socketpair() ends: [0] is written by thread_notify(), [1] is polled and read by thread_poll() */
+static int sockets[2];
+/* Storage for the thread numbers; &threadargs[i] is handed to thread_poll() */
+static int threadargs[THREADS];
+/* Serializes the printf() call in thread_print() */
+static pthread_mutex_t print_mutex;
+/* Held around the read() and write() calls on the socket pair */
+static pthread_mutex_t sockets_mutex;
+
+
+
+/*
+ * Sets up a non-blocking datagram socket pair and the two mutexes, starts
+ * THREADS thread_poll() threads followed one second later by a single
+ * thread_notify() thread, then sleeps in pause() forever.
+ *
+ * Never returns normally; exits with EXIT_FAILURE if any setup step fails.
+ */
+int
+main(void)
+{
+	pthread_t threadid;
+	int i, err;
+
+	/*
+	 * Create socketpair which will be used to trigger events to awaken
+	 * polling threads.
+	 */
+	if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, sockets) != 0) {
+		perror("socketpair()");
+		exit(EXIT_FAILURE);
+	}
+	/* fcntl(F_SETFL) reports failure as -1; other values are success */
+	if (fcntl(sockets[0], F_SETFL, O_NONBLOCK) == -1 ||
+	    fcntl(sockets[1], F_SETFL, O_NONBLOCK) == -1) {
+		perror("fcntl()");
+		exit(EXIT_FAILURE);
+	}
+
+	/* pthread functions return the error number instead of setting errno */
+	if ((err = pthread_mutex_init(&print_mutex, NULL)) != 0 ||
+	    (err = pthread_mutex_init(&sockets_mutex, NULL)) != 0) {
+		(void) fprintf(stderr, "pthread_mutex_init(): %s\n",
+		    strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	/*
+	 * First launch THREADS polling threads
+	 */
+	for (i = 0; i < THREADS; i++) {
+		threadargs[i] = i;
+		if ((err = pthread_create(&threadid, NULL, thread_poll,
+		    &threadargs[i])) != 0) {
+			(void) fprintf(stderr, "pthread_create(): %s\n",
+			    strerror(err));
+			exit(EXIT_FAILURE);
+		}
+	}
+	/* Presumably gives the pollers time to reach poll() -- confirm */
+	sleep(1);
+
+	/*
+	 * And finally launch notifier thread
+	 */
+	if ((err = pthread_create(&threadid, NULL, thread_notify, NULL))
+	    != 0) {
+		(void) fprintf(stderr, "pthread_create(): %s\n",
+		    strerror(err));
+		exit(EXIT_FAILURE);
+	}
+
+	/*
+	 * Now just wait
+	 */
+	for (;;)
+		(void) pause();
+}
+
+/*
+ * Polling thread: waits without timeout for sockets[1] to become readable,
+ * logging each step through thread_print().  When POLLIN is reported it
+ * takes sockets_mutex and reads single bytes until read() stops returning 1,
+ * logging the strerror() text if the last read() returned -1.
+ *
+ * args points to the int holding this thread's number.
+ * Returns NULL, and only after poll() itself has failed.
+ */
+static void *
+thread_poll(void *args)
+{
+	const int id = *(int *)args;
+	struct pollfd pfd;
+	char byte;
+	int rc;
+
+	pfd.fd = sockets[1];
+	pfd.events = POLLIN;
+
+	for (;;) {
+		thread_print(id, "Polling");
+		rc = poll(&pfd, 1, -1);
+		if (rc == -1) {
+			perror("poll()");
+			break;
+		}
+		thread_print(id, "Poll returned");
+
+		if (rc == 0) {
+			thread_print(id, "Woke up! (no data)");
+			continue;
+		}
+		if ((pfd.revents & POLLIN) == 0)
+			continue;
+
+		/* Attempt to read event/byte */
+		thread_print(id, "Woke up! (with data)");
+		pthread_mutex_lock(&sockets_mutex);
+		for (;;) {
+			rc = read(sockets[1], &byte, 1);
+			if (rc != 1)
+				break;
+			thread_print(id, "Read data!");
+		}
+		if (rc == -1)
+			thread_print(id, strerror(errno));
+		pthread_mutex_unlock(&sockets_mutex);
+	}
+
+	return NULL;
+}
+
+/*
+ * Notifier thread: once per second, writes a single byte to sockets[0]
+ * while holding sockets_mutex.  If the write does not succeed, it waits
+ * (with the mutex released) until poll() reports an event for POLLOUT on
+ * that socket; the byte is not resent, the next iteration simply tries
+ * again.
+ *
+ * args is not used.  The function never returns.
+ */
+/* ARGSUSED */
+static void *
+thread_notify(void *args)
+{
+	char c = '\0';
+	struct pollfd fds[1];
+	int failed;
+
+	fds[0].fd = sockets[0];
+	fds[0].events = POLLOUT;
+	for (;;) {
+		sleep(1);
+		thread_print(-1, "Notifying");
+		pthread_mutex_lock(&sockets_mutex);
+		failed = (write(sockets[0], &c, 1) != 1);
+		pthread_mutex_unlock(&sockets_mutex);
+		if (failed) {
+			/*
+			 * Poll until we can send data.  This is done without
+			 * sockets_mutex: thread_poll() needs that mutex to
+			 * read from the socket pair, so blocking here with
+			 * it held could stall readers and writer alike.
+			 */
+			(void) poll(fds, 1, -1);
+		}
+	}
+}
+
+/*
+ * Writes "<id>: <str>" followed by a newline to stdout, holding print_mutex
+ * for the duration of the call.
+ */
+static void
+thread_print(int id, const char *str)
+{
+	pthread_mutex_lock(&print_mutex);
+	(void) fprintf(stdout, "%d: %s\n", id, str);
+	pthread_mutex_unlock(&print_mutex);
+}