From: Matthew Mondor Date: Tue, 13 Mar 2007 19:37:26 +0000 (+0000) Subject: Importing in main tree this library which used to be in tests tree X-Git-Tag: pgsql-branch-merge~3 X-Git-Url: http://git.pulsar-zone.net/?a=commitdiff_plain;h=c6268fe8d08c8762fee6025ff7d40815151fe04b;p=mmondor.git Importing in main tree this library which used to be in tests tree --- diff --git a/mmsoftware/pthread_util/GNUmakefile b/mmsoftware/pthread_util/GNUmakefile new file mode 100644 index 0000000..7d89656 --- /dev/null +++ b/mmsoftware/pthread_util/GNUmakefile @@ -0,0 +1,35 @@ +# $Id: GNUmakefile,v 1.1 2007/03/13 19:37:22 mmondor Exp $ + +MMLIB_PATH := ../../mmlib +MMLIBS := $(addprefix ${MMLIB_PATH}/,mmlog.o mmpool.o mmstring.o) +OBJS := mm_pthread_msg.o mm_pthread_sleep.o mm_pthread_pool.o mm_pthread_poll.o +BINS := tests/msg_test tests/poll_test + +CFLAGS += -Wall +#CFLAGS += -DDEBUG -DPTHREAD_DEBUG -g3 + +LDFLAGS += -lc -lpthread +#LDFLAGS += -lpthread_dbg + + +all: $(BINS) + + +%.o: %.c + cc -c ${CFLAGS} -I. -I$(MMLIB_PATH) -o $@ $< + + +tests/msg_test: tests/msg_test.o $(MMLIBS) $(OBJS) + cc ${CFLAGS} -o $@ $@.c $(OBJS) -I. -I$(MMLIB_PATH) ${LDFLAGS} \ + $(MMLIBS) + +tests/poll_test: tests/poll_test.o $(MMLIBS) $(OBJS) + cc ${CFLAGS} -o $@ $@.c $(OBJS) -I. -I$(MMLIB_PATH) ${LDFLAGS} \ + $(MMLIBS) + + +install: all + + +clean: + rm -f tests/msg_test.o tests/poll_test.o $(BINS) $(OBJS) $(MMLIBS) diff --git a/mmsoftware/pthread_util/README b/mmsoftware/pthread_util/README new file mode 100644 index 0000000..8054ac7 --- /dev/null +++ b/mmsoftware/pthread_util/README @@ -0,0 +1,197 @@ +This library is an attempt to provide pth library like API to NetBSD SA +threads and kqueue. + +What we find are missing from the POSIX standard are added here: + +- Implementation of efficient messages to communicate among threads. These + messages are queued using an efficient pointer linking mechanism. It must be + possible for a thread to wait for messages while sleeping and to be awaken + when a message is available. It also must be possible to observe a maximum + timeout to wait for. +- Implementation of filedescriptors and above mentionned thread messages + notification multiplexing, with support for timer. An example of this is + pth library's pth_poll_ev(). A timer event can interrupt thread-safe + filedescriptor polling, as well as thread messages arriving on a port. + +We beleive that it is possible to implement this using the kqueue(2)/kevent(2) +system. The new call would be similar to: + +pthread_poll(struct pollfd *fds, int nfds, + struct pthread_port *ports, int nports, + struct pthread_sigs *sigs, int nsigs, + struct pthread_timers *timers, int ntimers) + +or similar system. This would allow multiplexing of various events into a +single application loop. + + +pthread_cond_timedwait() seems especially useful, either with a signal handler +or perhaps using kqueue concurrently... pthread_cond_timedwait() will allow +processes to wait for message arrival though a port, while +pthread_cond_signal() or pthread_cond_broadcast() will be able to awaken them +as messages are queued to the message port. However, we would ideally want to +only signal a wanted thread waiting for a port... But, normally only one +thread should be listening for messages on any given port. I have to see what +I'll do for a thread listening for messages on multiple ports at a time... +Perhaps that multiple ports could use the same conditional wait variable so +that the process would only wait on that one, and then verify the message +queue for each before going back in waiting mode. + + +TODO: +==== +- Replace mutex and conditonal variable initializers, as well as attributes, + with static initializers. +- Provide similar static intializer macros as part of our API where possible. +- I have a working message passing implementation, with possibility of a + waiter on as many ports as wanted. I however still have a challenge: + Multiplex system calls such as select(2), poll(2), connect(2) and accept(2) + with the messaging capability. One must be able to cause the other to + return. This could be tricky to properly implement. Maybe think about the + following ideas: + - Dedicate a thread to serve a syscall, with which communication is solely + done using messages. This however implies that only a single syscall at a + time can be processed by such a thread. This probably means that a pool of + such threads would become necessary. This also assumes that the syscall in + question do not block the whole process, but only the intended thread. + Alot of assumptions, but this would now work properly on all BSDs and + on Linux. Possibly also on Solaris. + - Use a mix of signals and syscalls, since signals can interrupt syscalls. + However, this implies adding capability in our message system to trigger + signals rather than only using a conditional variable to notify of message + arrival. This also probably means that the same signal handler must be + shared by the whole process, that is, all the threads. + - Use kqueue in a thread-safe manner with thread-specific signals (if + possible). kqueue can be used to track signals without the need for an + actual signal handler. It would also track filedescriptor changes at the + same time. This also probably means that we need to use kqueue user + events if possible, triggered from the message passing system. It also + means non-portable code outside of the realm of BSD systems. + + + +RECENT REVIEW AFTER SOME REFLECTION +=================================== + +Currently, pth_accept_ev() and pth_connect_ev() are the only two cases of +special PTh functions which my software uses, notably mmftpd(8). These could +easily be implemented using a random thread in the pool whenever necessary, +with which communication would entirely use messages only. This thread could +be told: Perform syscall in non-blocking mode using the supplied +filedescriptor and notify me weither it succeeded, failed because of a timeout, +if any, or was interrupted by a message event occuring on the specified ring, +if any. The application however has to know that if it was interrupted by +an event, the connection still occurs asynchroneously within the system. +We should verify what could be done to cancel a not yet completed connection, +if possible. This call could also report if the call was interrupted by a +signal arrival (EINTR), optionally. If the socket was supplied in blocking +mode, it would have to be switched to non-blocking mode by the system and +then back into blocking mode. The caller could ensure to set it into +non-blocking mode for enhanced performance if no blocking mode is required. +The challenge would be finding a both efficient and portable solution to +have select()/poll() awake upon reception of notification events on a ring. +Perhaps that a global filedescriptor could be used for this, SOCK_DGRAM and +one byte sent, or that a signal handler with a signal generation should be +used... Both methods would probably awake the whole process, however. +pthread_sigmask() could be used perhaps... I wouldn't want to have a special +fd required for each ready thread of the pool, ideally. + +Implement: +mm_pthread_io pthread_poll_ev(), pthread_accept_ev(), + pthread_connect_ev() +mm_pthread_alarm pthread_sleep(), etc. + +or maybe: + +mm_pthread_misc For all of them + +Perhaps reimplement the system I worked on in mmserver(3) as well. This might +be necessary for operations which really should be dedicated to a non-threaded +process at occasions, and the subsystem should be available. It should probably +use a pool using mmpool(3) as well, just like we are doing with threads. + +It would be interesting to implement better GC for mmpool(3)'s. Currently, +pool_free() will discard pages which are no longer in use since some time, +but the time cannot be linear, since it only accounts a certain number of +calls made to it. It should instead be possible to use time intervals, and +to let the application invoke the GC at wanted fixed intervals. This would +allow to use time based average statistics rather than function call times +based ones, without clobbering process or thread timers which the application +might need. It simply has to provide its own and to call the GC function +regularily. + +Hmm also, would be nice to be able to store the port_t pointer of the port +which triggered notification on a ring_t, so that callers don't need to +run through several ports attached on a ring... Maybe that it would be +problematic however, since we can't guarantee atomicity between messages and +messages processing, unless we kludged the whole thing with locks and lost +efficiency. And because we only trigger notification to wakeup a waiting +thread when a message is queued on an empty port, it's possible that the +applicaton sleeps forever on a port if it didn't totally empty it, unless +there was a way for the sleep function to immediately return if called on +non-empty ports (as it's only alled on rings, and that rings don't have +access to a list of ports in current implementation (only the ports can +know which ring they are tied to)... I could implement something to have +rings see their attached ports with a list, however. But this again means +looping among ports to see if they're non-empty, heh, so why not let the +application do it as they do now. + + + +IMPORTANT +========= + +I did a test where multiple threads were polling on a single filedescriptor +consisting of a socketpair, which other side was used to wake them up. +Only one random thread would wake up. + +Using a signal to cause all threads to wake would not work either, because +then again only a random thread will awake. + +It appears that the only way to ensure to wake wanted threads is using +conditional variables and for them to only sleep on these. + +SIGIO possibility... threads would be sleeping on a conditional wait variable +corresponding to the filedescriptor. For polling, the fd would be made in +non-blocking I/O, with SIGIO sent to process. The fd and associated cond var +would be added to a table. The SIGIO signal handler would need to check all +fds in the set for possible I/O and awake corresponding threads waiting on +cond var. A problem exists: How to check a filedescriptor for pending event? +How to know if event is read or write, or hup, etc? Maybe using more ore less +standard FION ioctls? poll/select with 0 timeout maybe, but that is still +troublesome in terms of performance I beleive. + +fd cond interesting_events occured_events? + +Hmm and what if a thread was allocated to start polling, and another wrapper +thread wait for it sleeping on a cond var? Would it be sane to do this? +When a timeout or message occurs however detected by the wrapper thread, +how would we stop the other thread polling? We still have a problem. +If we left pending polling threads, how would a future thread with successful +polling on the same descriptor ever wake up, a random one would. +Why does POSIX threads suck so much as to not provide any decent way to +work with filedescriptors!? If at least I had pthread_signal() it would +help. I could send a signal to interrupt the wanted thread when it was polling. +Or if only there was a way to set the wanted signal mask for wanted processes +as necessary, so that I would only have the wanted one process a particular +signal I could send to interrupt it and then restore the masks, and do this +somehow atomically. If POSIX had any of these requirements in mind while +developing the standard, pthread_poll_condwait() or pthread_signal() would +already exist anyways! + + +HMM +=== + +A thread reserved for polling would seem best. We need to be able to interrupt +that thread whenever needed using a signal, which all other processes must +be blocking. We could use SIGIO, or SIGUSR2 for instance. That thread would +process thread messages and go back to polling. It probably could handle +timeouts as well, but this is probably not necessary. If it did, would +probably free other threads from calling gettimeofday() too often. The thread +has to remove the fd from the polling list when an event returned on it +anyways, and so it could also send a reply message for timeout. It has to +be interrupted anyways when a new fd is to be added, and this means that +it could fix the poll timer before calling it each time to fit the soonest +to expire fd... I could probably use kqueue too, or libevent in that thread +to make it high performance as possible. diff --git a/mmsoftware/pthread_util/mm_pthread_debug.h b/mmsoftware/pthread_util/mm_pthread_debug.h new file mode 100644 index 0000000..a5ee1dc --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_debug.h @@ -0,0 +1,63 @@ +/* $Id: mm_pthread_debug.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2004-2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#ifndef MM_PTHREAD_DEBUG_H +#define MM_PTHREAD_DEBUG_H + + + +#include +#include + + + +#ifdef PTHREAD_DEBUG + +#define DEBUG_PTHREAD_ENTRY() \ + syslog(LOG_NOTICE, "> TID=%p FN=%s", pthread_self(), __func__) + +#define DEBUG_PTHREAD_EXIT() \ + syslog(LOG_NOTICE, "< TID=%p FN=%s", pthread_self(), __func__) + +#else +#define DEBUG_PTHREAD_ENTRY() +#define DEBUG_PTHREAD_EXIT() +#endif + + + +#endif diff --git a/mmsoftware/pthread_util/mm_pthread_msg.c b/mmsoftware/pthread_util/mm_pthread_msg.c new file mode 100644 index 0000000..6deb30e --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_msg.c @@ -0,0 +1,466 @@ +/* $Id: mm_pthread_msg.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * It is almost a shame that POSIX did not define a standard API for + * inter-thread asynchroneous and synchroneous messaging. So, here is my + * implementation. Note that for asynchroneous operation it is recommended to + * use a memory pool such as mmpool(3) to allocate and free messages in an + * efficient way, in cases where messages will need to be sent to the other + * end without expecting a response back before the current function ends + * (in which case a message obviously can't be on the stack). + */ + + + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +/*#include */ + + + +MMCOPYRIGHT("@(#) Copyright (c) 2005\n\ +\tMatthew Mondor. All rights reserved.\n"); +MMRCSID("$Id: mm_pthread_msg.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $"); + + + +/* + * Allows to initialize a polling notification handle. When attached to a + * port, a message arriving on an empty port causes the associated ring to + * wake the thread from pthread_ring_wait(). + */ +int +pthread_ring_init(pthread_ring_t *ring) +{ + int error; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(ring != NULL && ring->magic != PRING_MAGIC); + + if ((error = pthread_cond_init(&ring->cond, NULL)) == 0) { + if ((error = pthread_mutex_init(&ring->mutex, NULL)) == 0) { + ring->magic = PRING_MAGIC; + ring->event = ring->mevent = 0; + DEBUG_PTHREAD_EXIT(); + return 0; + } + (void) pthread_cond_destroy(&ring->cond); + } + + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Returns TRUE if the supplied ring is a valid/usable one, or FALSE + * otherwise. Useful to conditionally destroy it. + */ +int +pthread_ring_valid(pthread_ring_t *ring) +{ + + DEBUG_PTHREAD_ENTRY(); + + DEBUG_PTHREAD_EXIT(); + return (ring != NULL && ring->magic == PRING_MAGIC); +} + +/* + * Destroys a ring. Note that all message ports attached to this ring should + * first be detached or destroyed. + */ +int +pthread_ring_destroy(pthread_ring_t *ring) +{ + int error; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC); + + if ((error = pthread_mutex_destroy(&ring->mutex)) == 0) + error = pthread_cond_destroy(&ring->cond); + ring->magic = 0; + + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Causes the current thread to sleep until a message arrives on an empty port + * associated with this ring. In normal operation, a thread only goes in wait + * mode after it processed all queued messages on all interesting ports. + * However, provision is made so that a the function returns immediately if + * messages already were received on a port attached to this ring since the + * last call to pthread_ring_wait(). + * Although using such an absolute time timespec might be disadvantageous for + * the API compared to a timeout in milliseconds for instance, this was chosen + * to remain API-compatible with pthread_cond_timedwait(), and upwards + * compatible with systems where nanosecond precision can be achieved. + */ +int +pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime) +{ + int error = 0; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC); + + /* We must hold the condition variable's mutex */ + if (pthread_mutex_lock(&ring->mutex) != 0) { + error = -1; + goto err; + } + + /* As long as we don't have confirmation that we must stop waiting */ + for (ring->event = 0; ring->mevent == 0 && + !ring->event && error == 0; ) { + /* + * Wait on conditional variable, which will automatically + * and atomically release the mutex and return with the mutex + * locked again, as soon as the conditional variable gets + * signaled. + */ + if (abstime != NULL) { + error = pthread_cond_timedwait(&ring->cond, + &ring->mutex, abstime); + } else + error = pthread_cond_wait(&ring->cond, &ring->mutex); + } + ring->mevent = 0; + + /* + * And we know that conditional waiting functions returned with mutex + * locked, so now release it back. + */ + (void) pthread_mutex_unlock(&ring->mutex); + +err: + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Allows to wake up waiter(s) on the specified ring, which are sleeping + * threads within pthread_ring_wait(). This can be used to simulate the + * arrival of a message on an empty port. Also useful to use rings as a + * notification system only when no message passing is needed. + */ +int +pthread_ring_notify(pthread_ring_t *ring) +{ + int error; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC); + + if ((error = pthread_mutex_lock(&ring->mutex)) == 0) { + ring->mevent++; + ring->event = 1; + (void) pthread_cond_signal(&ring->cond); + (void) pthread_mutex_unlock(&ring->mutex); + } + + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Allows to initialize/create a message port. + */ +int +pthread_port_init(pthread_port_t *port) +{ + int error; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(port != NULL && port->magic != PPORT_MAGIC); + + if ((error = pthread_mutex_init(&port->lock, NULL)) != 0) + goto err; + + port->magic = PPORT_MAGIC; + port->ring = NULL; + DLIST_INIT(&port->messages); + +err: + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Returns TRUE if the supplied port is valid/usable, or FALSE otherwise. + * Useful to conditionally destroy a port, for instance. + */ +int +pthread_port_valid(pthread_port_t *port) +{ + + DEBUG_PTHREAD_ENTRY(); + + DEBUG_PTHREAD_EXIT(); + return (port != NULL && port->magic == PPORT_MAGIC); +} + +/* + * Destroys the specified port, previously created using pthread_port_init(). + */ +int +pthread_port_destroy(pthread_port_t *port) +{ + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC); + + port->magic = 0; + + DEBUG_PTHREAD_EXIT(); + return pthread_mutex_destroy(&port->lock); +} + +/* + * Attaches a port to a ring. Multiple ports may be attached to a ring. A + * message arriving on an empty port will cause the attached ring to be + * notified, if any, and as such to cause a thread waiting on the ring to + * be awakened. + */ +int +pthread_port_set_ring(pthread_port_t *port, pthread_ring_t *ring) +{ + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC && + (ring == NULL || ring->magic == PRING_MAGIC)); + + port->ring = ring; + + DEBUG_PTHREAD_EXIT(); + return 0; +} + +/* + * Allows to initialize a message before it can be sent over a port. The + * message only needs to be initialized once in general, even if it will be + * used for bidirectional transmission for synchronous operation. If the + * reply port needs to be changed, however, this function should be used again + * to set the new reply port. + */ +int +pthread_msg_init(pthread_msg_t *msg, pthread_port_t *rport) +{ + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(msg != NULL && msg->magic != PMESG_MAGIC && + (rport == NULL || rport->magic == PPORT_MAGIC)); + + msg->magic = PMESG_MAGIC; + msg->reply = rport; + msg->size = 0; + msg->message = NULL; + + DEBUG_PTHREAD_EXIT(); + return 0; +} + +/* + * Returns TRUE if supplied message is valid/usable or FALSE otherwise. + */ +int +pthread_msg_valid(pthread_msg_t *msg) +{ + + DEBUG_PTHREAD_ENTRY(); + + DEBUG_PTHREAD_EXIT(); + return (msg != NULL && msg->magic == PMESG_MAGIC); +} + +/* + * Invalidates a message, so that it can no longer be sent over ports. + */ +int +pthread_msg_destroy(pthread_msg_t *msg) +{ + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(msg != NULL && msg->magic == PMESG_MAGIC); + + msg->magic = 0; + + DEBUG_PTHREAD_EXIT(); + return 0; +} + +/* + * If any message exists in the queue of the specified port, unqueues it and + * returns it. Otherwise, NULL is returned. In normal operation, all messages + * queued to a port are processed before putting the thread back into sleep, + * mainly for efficiency, but also because it eases synchronization. + */ +pthread_msg_t * +pthread_msg_get(pthread_port_t *port) +{ + pthread_msg_t *msg = NULL; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC); + + if (pthread_mutex_lock(&port->lock) != 0) + goto err; + + if ((msg = DLIST_TOP(&port->messages)) != NULL) { + DEBUG_ASSERT(msg->magic == PMESG_MAGIC); + DLIST_UNLINK(&port->messages, (node_t *)msg); + } + + (void) pthread_mutex_unlock(&port->lock); + +err: + DEBUG_PTHREAD_EXIT(); + return (pthread_msg_t *)msg; +} + +/* + * Queues the specified message to the specified port, returning 0 on success. + * Note that the message data is not copied or moved, but that a pointer + * system is used to queue the message. Thus, the message's shared memory + * region is leased temporarily to the other end. One has to be careful to + * not allocate this message space on the stack when asynchroneous operation + * is needed. In synchroneous operation mode, it is not a problem, since the + * sender does not have to modify the data until the other end replies back + * with the same message after modifying the message if necessary. In + * synchroneous mode, we simply delegate that message memory region to the + * other end until it notifies us with a reply that it is done working with + * it. Returns 0 on success, or an error number. + */ +int +pthread_msg_put(pthread_port_t *port, pthread_msg_t *msg) +{ + int error = 0; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC && + msg != NULL && msg->magic == PMESG_MAGIC); + + if ((error = pthread_mutex_lock(&port->lock)) != 0) + goto err; + + DLIST_APPEND(&port->messages, (node_t *)msg); + if (port->ring != NULL) { + if (DLIST_NODES(&port->messages) == 1) { + /* + * We know that there previously were no messages, + * and that the reading thread then waits for any + * message to be available. Signal it that there at + * least is one message ready. The other end should + * normally process all available messages before + * going back into waiting. + */ + if ((error = pthread_mutex_lock(&port->ring->mutex)) + == 0) { + port->ring->event = 1; + (void) pthread_cond_signal(&port->ring->cond); + (void) pthread_mutex_unlock( + &port->ring->mutex); + } + } + /* + * If the other end, however, is already locked + * waiting for the ring to be notified while + * there already are messages, we still trigger mevent + * to cause it to unlock, however. This behavior is + * useful in the polling system code, for instance. + */ + /* XXX We don't use a mutex for now... */ + port->ring->mevent++; + } + + (void) pthread_mutex_unlock(&port->lock); + +err: + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Meant to be used in synchroneous message transfer mode. The initial sender + * sends a message to the other end, which then uses this function to notify + * back the initial sender that it is done, often with a success/failure + * result as part of the message. Returns 0 on success, or an error number. + */ +int +pthread_msg_reply(pthread_msg_t *msg) +{ + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(msg != NULL && msg->magic == PMESG_MAGIC && + msg->reply != NULL); + + DEBUG_PTHREAD_EXIT(); + return pthread_msg_put(msg->reply, msg); +} + +/* + * Returns the number of pending messages tied to the port, if any, or -1 + * on error. + */ +int +pthread_port_pending(pthread_port_t *port) +{ + int pending = -1; + + DEBUG_PTHREAD_ENTRY(); + DEBUG_ASSERT(port != NULL && port->magic == PPORT_MAGIC); + + if (pthread_mutex_lock(&port->lock) != 0) + goto err; + + pending = (int)DLIST_NODES(&port->messages); + + (void) pthread_mutex_unlock(&port->lock); + +err: + DEBUG_PTHREAD_EXIT(); + return pending; +} diff --git a/mmsoftware/pthread_util/mm_pthread_msg.h b/mmsoftware/pthread_util/mm_pthread_msg.h new file mode 100644 index 0000000..d0d4282 --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_msg.h @@ -0,0 +1,110 @@ +/* $Id: mm_pthread_msg.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#ifndef MM_PTHREAD_MSG_H +#define MM_PTHREAD_MSG_H + + + +#include + +#include +#include + + + +#define PRING_MAGIC 0x50524e47 +#define PPORT_MAGIC 0x50505254 +#define PMESG_MAGIC 0x504d5347 + +typedef struct { + u_int32_t magic; + pthread_cond_t cond; + pthread_mutex_t mutex; + int mode; + int event; + int mevent; +} pthread_ring_t; + +enum pthread_ring_modes { + PTHREAD_RMOD_NOWAIT, + PTHREAD_RMOD_CONDWAIT, + PTHREAD_RMOD_FDWAIT +}; + +typedef struct { + u_int32_t magic; + pthread_ring_t *ring; + pthread_mutex_t lock; + list_t messages; +} pthread_port_t; + +typedef struct { + node_t node; + u_int32_t magic; + pthread_port_t *reply; + size_t size; + void *message; +} pthread_msg_t; + + + +extern int pthread_ring_init(pthread_ring_t *); +extern int pthread_ring_valid(pthread_ring_t *); +extern int pthread_ring_destroy(pthread_ring_t *); +extern int pthread_ring_wait(pthread_ring_t *, + const struct timespec *); +extern int pthread_ring_notify(pthread_ring_t *); + +extern int pthread_port_init(pthread_port_t *); +extern int pthread_port_valid(pthread_port_t *); +extern int pthread_port_destroy(pthread_port_t *); +extern int pthread_port_set_ring(pthread_port_t *, + pthread_ring_t *); +extern int pthread_msg_init(pthread_msg_t *, + pthread_port_t *); +extern int pthread_msg_valid(pthread_msg_t *); +extern int pthread_msg_destroy(pthread_msg_t *); +extern pthread_msg_t *pthread_msg_get(pthread_port_t *); +extern int pthread_msg_put(pthread_port_t *, + pthread_msg_t *); +extern int pthread_msg_reply(pthread_msg_t *); +extern int pthread_port_pending(pthread_port_t *); + + + +#endif diff --git a/mmsoftware/pthread_util/mm_pthread_poll.c b/mmsoftware/pthread_util/mm_pthread_poll.c new file mode 100644 index 0000000..05778ee --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_poll.c @@ -0,0 +1,1116 @@ +/* $Id: mm_pthread_poll.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * I consider this code to be a major hack around the inherent problems unix + * systems face because of the lack of support for filedescriptor polling in + * the POSIX threads API. Although pthread defines methods for thread + * synchronization and polling waiting for events (using conditionnal + * variables), and that unix provides polling on filedescriptors using + * select(2), poll(2), kqueue(2) and other mechanisms, both are totally + * distinct entities which can be considered to either conflict with + * eachother or to not be related enough in a unified way. The current + * situation makes it almost impossible for a thread to both be polling for + * interthread efficient messages implementations built upon pthread, and + * filedescriptor events, concurrently. + * + * The GNU PTH library implements non-standard functions which allow to + * multiplex interthread messages and filedescriptor events, using for + * instance pth_poll_ev(), pth_select_ev(), pth_accept_ev(), pth_connect_ev(), + * etc. However, this is internally implemented using a single large select(2) + * based loop along with a slow large loop looking for non-fd events based on + * the principles of libevent. This threading library has other disadventages, + * such as not providing a preemptive scheduler (being a fully userspace + * implementation) and not allowing to scale to multiple processors on SMP + * systems. This interface however shows how good the POSIX threads API could + * have been, if it was better designed with unix systems in mind. This + * library also being the most portable threads library alternative for quite + * some time, because of the fact that Operating Systems implemented POSIX + * threads inconsistently, or not at all, caused us to use PTH during some + * time to develop software in cases where a pool of processes was not ideal + * because of the frequency of shared memory synchronization needs. + * + * With the advent of POSIX threads implementations on more unix and unix-like + * systems and of modern implementations behaving more consistently, which can + * scale on SMP systems and provide preemptive scheduling, it was considered + * worthwhile for us to adapt our software again to use the standard POSIX + * API. Especially considering that NetBSD which had no OS provided threads + * implementation for applications now has an awesome pthreads implementation + * starting with version 2.0. However, we encountered difficulties with some + * software which used the complex multiplexing of thread events and + * filedescriptor ones. This module provides a solution to port this software. + * It however is somewhat a hack. + * + * The downsides of this implementation are as follows. We originally intended + * to develop a system which would scale among an increasing number of threads + * in a ready pool of threads, scaling with concurrency of the polling calls. + * This however proved difficult, or impossible to achieve, the main reasons + * being that 1) A signal delivered to a process is only received by a random + * thread that is not blocking it. 2) In the case where multiple threads are + * polling on a common file descriptor, similarily only one random thread + * is awaken. 3) pthread_cond_signal() and pthread_cond_broadcast() cannot + * wake threads waiting in filedescriptor polling. 4) to achieve what we + * needed, two descriptors would have been necessary per notification ring. + * this was considered an aweful solution and was promptly rejected. 5) The + * POSIX API does not define a way for a process to set or change the signal + * blocking masks of other threads on the fly. + * + * Our solution then had to rely on a main descriptor polling manager thread + * which would be used to poll file descriptors, and would as a device serve + * client threads via efficient interthread messages. An issue still arises + * when a client thread sends a message to the polling thread to add new + * descriptors for polling or to cancel polling and remove descriptors. + * The polling thread must be able to immediately process these events + * awaking from filedescriptor polling. Two possible hacks could be used for + * this. 1) Use an AF_LOCAL SOCK_DGRAM socketpair(), which one side would + * be used to trigger an event writing some data, and the other side always + * included by the polling thread within the set of descriptors. 2) Send a + * signal to the process which only the polling thread is not blocking, + * to ensure that it be the one catching it, as such to awake from polling + * with an EINTR error code. This second solution was considered more elegant + * and is used as the basis of this implementation. We currently are + * clubbering the SIGUSR2 signal to achieve this. + */ + + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + + + +MMCOPYRIGHT("@(#) Copyright (c) 2005\n\ +\tMatthew Mondor. All rights reserved.\n"); +MMRCSID("$Id: mm_pthread_poll.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $"); + + + +/* + * Synchroneous communications message between arbitrary threads and the + * polling thread. Since communication is synchroneous, we only need to + * allocate one such message per thread. We are always expecting a reply + * back after sending a query before reusing the buffer. In fact, the + * message passing system only serves as a means for synchronization around + * the message, which is a shared memory object. + */ +struct poll_msg { + pthread_msg_t msgnode; + /* Passed as parameters */ + bool cancel; + struct pollfd *fds; + nfds_t nfds; + int timeout; + /* Returned as result */ + int ready, error; + /* Internally used */ + struct timeval expires; +}; + +/* + * An index is maintained of descriptor number -> poll_msg_index + * structures. Each of wich has information on the message the descriptor + * belongs to, and the index into the pollfd array so that it be easy to + * efficiently do per-fd work. + */ +struct poll_idx { + int idx; + struct poll_msg *msg; +}; + +/* + * Thread specific needed resources to use our special polling + */ +struct poll_data { + pthread_port_t port; + struct poll_msg msg; +}; + + + +#define POLLWAKE() do { \ + pollingevents++; \ + if (polling != 0) \ + (void) kill(process_id, SIGUSR2); \ +} while (/* CONSTCOND */0) + + + +/* + * Static functions prototypes + */ +static int pthread_poll_proc_init(void); +static void pthread_poll_proc_init2(void); +static int pthread_poll_thread_init(struct poll_data **); +static void pthread_poll_thread_exit(void *); +static void *poll_thread(void *); +static int poll_thread_attach_fds(struct poll_msg *); +static void poll_thread_detach_fds(struct poll_msg *); +static void poll_thread_sighandler(int); + +/* + * Static process specific storage + */ +static bool pthread_poll_initialized = FALSE; +static pthread_once_t pthread_poll_proc_initialized = PTHREAD_ONCE_INIT; +static pthread_key_t pthread_poll_proc_key; +static pthread_ring_t pthread_poll_thread_started_ring; +static pthread_port_t pthread_poll_thread_port; +static pid_t process_id; +static int polling = 0; +static int pollingevents = 0; + +/* + * Static global poll_thread storage. No synhronization is necessary when + * using these, since only the polling thread does. + */ +static struct poll_idx *poll_idx; +static nfds_t poll_idx_size; +static struct pollfd *poll_fds; +static nfds_t poll_fds_size; +static nfds_t poll_nfds; + + + +/* + * Static internal functions + */ + +static int +pthread_poll_proc_init(void) +{ + int error; + struct sigaction act; + + DEBUG_PTHREAD_ENTRY(); + + if ((error = pthread_key_create(&pthread_poll_proc_key, + pthread_poll_thread_exit)) != 0) + goto err; + + act.sa_handler = poll_thread_sighandler; + act.sa_flags = 0; + (void) sigemptyset(&act.sa_mask); + (void) sigaddset(&act.sa_mask, SIGUSR2); + if (sigaction(SIGUSR2, &act, NULL) != 0) { + error = errno; + goto err; + } + + process_id = getpid(); + + DEBUG_PTHREAD_EXIT(); + return 0; + +err: + DEBUG_PTHREAD_EXIT(); + return error; +} + +static void +pthread_poll_proc_init2(void) +{ + int error; + + DEBUG_PTHREAD_ENTRY(); + + if ((error = pthread_poll_proc_init()) != 0) { + (void) fprintf(stderr, "pthread_poll_proc_init() - %s\n", + strerror(error)); + DEBUG_PTHREAD_EXIT(); + exit(EXIT_FAILURE); + } + + DEBUG_PTHREAD_EXIT(); +} + +static int +pthread_poll_thread_init(struct poll_data **res) +{ + int error; + struct poll_data *data; + sigset_t set; + + DEBUG_PTHREAD_ENTRY(); + + (void) sigemptyset(&set); + (void) sigaddset(&set, SIGUSR2); + (void) pthread_sigmask(SIG_BLOCK, &set, NULL); + + if ((data = malloc(sizeof(struct poll_data))) == NULL) { + error = ENOMEM; + goto err; + } + + if ((error = pthread_port_init(&data->port)) != 0) + goto err; + if ((error = pthread_msg_init(&data->msg.msgnode, &data->port)) != 0) + goto err; + + if ((error = pthread_setspecific(pthread_poll_proc_key, data)) != 0) + goto err; + + *res = data; + + DEBUG_PTHREAD_EXIT(); + return 0; + +err: + if (data != NULL) { + (void) pthread_port_destroy(&data->port); + free(data); + } + + DEBUG_PTHREAD_EXIT(); + return error; +} + +static void +pthread_poll_thread_exit(void *specific) +{ + struct poll_data *data = (struct poll_data *)specific; + + DEBUG_PTHREAD_ENTRY(); + + (void) pthread_port_destroy(&data->port); + (void) pthread_msg_destroy(&data->msg.msgnode); + free(data); + + /* + * Some implementations need this + */ + (void) pthread_setspecific(pthread_poll_proc_key, NULL); + + DEBUG_PTHREAD_EXIT(); +} + + +/* + * Actual polling thread, with which we communicate using messages polling on + * pthread_port_t and pthread_ring_t. This is the only thread that should be + * catching SIGUSR2 signals (used to wake us up and reiterate our main loop. + * Note: Although less efficient than using kqueue(2) or libevent(3), after + * discussion with 3s4i we settled to using poll(2) for now, which minimizes + * OS dependencies as well as third party software dependencies. Because + * pthread_poll_ring(2) is only sparsely used by our software (migrating from + * using PTH library which provided pth_poll_ev()), and that we only provide + * it small pollfd arrays, this implementation was considered to meet our + * needs using poll(2). This also met the requirements for Tact group. + */ +/* ARGSUSED */ +static void * +poll_thread(void *args) +{ + sigset_t set; + pthread_ring_t ring; + list_t msg_list; + register int i; + + DEBUG_PTHREAD_ENTRY(); + + /* + * This initialization shouldn't fail. If it did, it would be nice to + * be able to simply panic eventually. XXX + */ + + /* + * Create set for SIGUSR2 which we'll unblock/block + */ + (void) sigemptyset(&set); + (void) sigaddset(&set, SIGUSR2); + + /* + * Allocate an initial buffer size for our pollfd array as well as for + * our descriptor based index. We'll double these buffers as + * necessary at runtime. + */ + poll_fds_size = 64; + poll_fds = malloc(sizeof(struct pollfd) * poll_fds_size); + poll_nfds = 0; + poll_idx_size = 64; + poll_idx = malloc(sizeof(struct poll_msg) * poll_idx_size); + for (i = 0; i < poll_idx_size; i++) + poll_idx[i].msg = NULL; + DLIST_INIT(&msg_list); + + /* + * Initialize message port and associated ring. The message port is + * module global, so that it be public to pthread_poll_ring(). + */ + (void) pthread_port_init(&pthread_poll_thread_port); + (void) pthread_ring_init(&ring); + (void) pthread_port_set_ring(&pthread_poll_thread_port, &ring); + + /* + * Notify parent that we're ready. + */ + (void) pthread_ring_notify(&pthread_poll_thread_started_ring); + + /* + * Main loop from which we never exit + */ + for (;;) { + register int n; + int timeout; + struct timeval tv, ttv; + struct poll_msg *msg, *nextmsg; + + /* + * Get time of day in a rather high resolution. We need to + * do this to be able to evaluate timeouts later on. We + * attempt to only require one time syscall per loop. + */ + (void) gettimeofday(&tv, NULL); + + pollingevents = 0; + + /* + * Process any messages. We need to add the descriptors if + * they aren't already added. Also store yet unsatisfied + * request messages into a list. + */ + while ((msg = (struct poll_msg *)pthread_msg_get( + &pthread_poll_thread_port)) != NULL) { + if (msg->cancel) { + /* + * Immediately satisfy request on demand + */ + msg->error = ECANCELED; + DLIST_UNLINK(&msg_list, (node_t *)msg); + poll_thread_detach_fds(msg); + (void) pthread_msg_reply(&msg->msgnode); + continue; + } + if (poll_thread_attach_fds(msg) == 0) { + msg->ready = msg->error = 0; + if (msg->timeout != -1) { + /* + * Convert millisecond timeout to an + * absolute time timeval + */ + msg->expires.tv_sec = tv.tv_sec; + msg->expires.tv_usec = tv.tv_usec; + ttv.tv_sec = msg->timeout / 1000; + ttv.tv_usec = (msg->timeout % 1000) + * 1000; + timeradd(&msg->expires, &ttv, + &msg->expires); + } + DLIST_APPEND(&msg_list, (node_t *)msg); + } else { + msg->ready = 0; + msg->error = EINVAL; + (void) pthread_msg_reply(&msg->msgnode); + } + } + + /* + * Process timeouts. For request messages which timed out, + * satisfy them immediately using ETIMEDOUT error. + * This also allows to evaluate which is the soonest to expire + * entry, which poll(2) will have to use as timeout. + */ + ttv.tv_sec = ttv.tv_usec = 99999; + for (msg = DLIST_TOP(&msg_list); msg != NULL; msg = nextmsg) { + nextmsg = DLIST_NEXT(msg); + + if (msg->timeout == -1) + continue; + if (timercmp(&msg->expires, &tv, <)) { + msg->error = ETIMEDOUT; + DLIST_UNLINK(&msg_list, (node_t *)msg); + poll_thread_detach_fds(msg); + (void) pthread_msg_reply(&msg->msgnode); + } else if (timercmp(&msg->expires, &ttv, <)) { + ttv.tv_sec = msg->expires.tv_sec; + ttv.tv_usec = msg->expires.tv_usec; + } + } + + /* + * If there are no registered descriptors to poll for, wait + * using the thread friendly ring until messages occur, and + * reiterate. + */ + if (poll_nfds == 0) { + (void) pthread_ring_wait(&ring, NULL); + continue; + } + + /* + * Perform polling. poll(2) for as much time as possible, + * although making sure to allow the soonest to expire query + * to stop polling. Next to expire entry time is in ttv and + * current time in tv. Calculate difference and convert to + * milliseconds. + */ + if (ttv.tv_sec == 99999 && ttv.tv_usec == 99999) + timeout = -1; + else { + timersub(&ttv, &tv, &ttv); + timeout = (ttv.tv_sec * 1000) + (ttv.tv_usec / 1000); + } + + /* + * Unblock the SIGUSR2 signal, which we should be the only + * thread to receive, all other threads blocking it. + * Only leave it unblocked for the duration of the poll(2) + * syscall. We cause our loop to reiterate in any case of + * error, EINTR or no file descriptor with pending event. + */ + (void) pthread_sigmask(SIG_UNBLOCK, &set, NULL); + polling++; + + n = 0; + if (pollingevents != 0) + goto unblock; + + n = poll(poll_fds, poll_nfds, timeout); + +unblock: + polling--; + (void) pthread_sigmask(SIG_BLOCK, &set, NULL); + if (pollingevents != 0 || n < 1) + continue; + + /* + * Verify which descriptors have interesting events set, + * increasing events counter of corresponding requests. + */ + for (i = 0; n != 0 && i < poll_nfds; i++) { + if (poll_fds[i].revents != 0) { + (poll_idx[poll_fds[i].fd].msg->ready)++; + n--; + } + } + /* + * Now verify pending request messages for events, and satisfy + * the requests of those who do. + */ + for (msg = DLIST_TOP(&msg_list); msg != NULL; msg = nextmsg) { + nextmsg = DLIST_NEXT(msg); + + if (msg->ready != 0) { + /* + * ready and error fields are already set + */ + DLIST_UNLINK(&msg_list, (node_t *)msg); + poll_thread_detach_fds(msg); + (void) pthread_msg_reply(&msg->msgnode); + } + } + } + + /* NOTREACHED */ + DEBUG_PTHREAD_EXIT(); + pthread_exit(NULL); + return NULL; +} + +/* + * Permits to merge supplied pollfd set with the main set + */ +static int +poll_thread_attach_fds(struct poll_msg *msg) +{ + register int i, fd, idx; + + DEBUG_PTHREAD_ENTRY(); + + for (i = 0; i < msg->nfds; i++) { + fd = msg->fds[i].fd; + + /* + * Ignore unset descriptors + */ + if (fd == -1) + continue; + + /* + * Grow index buffer if necessary. Either grow by doubling + * size, or even more if necessary to hold index to fd. + * If we only grew to hold fd, we might need to realloc(3) too + * often. Take care to also NULL msg field of new entries. + */ + if (poll_idx_size <= fd) { + struct poll_idx *idx; + int size, i2; + + size = poll_idx_size * 2; + if (fd > size) + size = fd; + if ((idx = realloc(poll_idx, + sizeof(struct poll_idx) * size)) == NULL) + goto err; + poll_idx = idx; + for (i2 = poll_idx_size; i2 < size; i2++) + poll_idx[i2].msg = NULL; + poll_idx_size = size; + } + + /* + * Error if descriptor not unique before adding to set. + * We do not allow multiple threads polling on the same + * descriptor at the same time in our system. We would + * otherwise need to gracefully handle duplicates, + * multiplexing them, which isn't required at all by our + * applications. So let's keep things simple. + */ + if (poll_idx[fd].msg != NULL) + goto err; + + /* + * Resize pollfd array if needed. Grow by doubling. + * This should happen very rarely. + * XXX We could check this condition only once at the + * top of this fonction and take in consideration the + * number of descriptors to add, if wanted for optimization. + */ + if (poll_fds_size <= poll_nfds) { + struct pollfd *ptr; + + if ((ptr = realloc(poll_fds, + sizeof(struct pollfd) * (poll_fds_size * 2))) + == NULL) + goto err; + poll_fds = ptr; + poll_fds_size *= 2; + } + + /* + * Finally add descriptor to set and register it for indexing. + * We simply need to append it to the existing entries in our + * global polling set array. + */ + idx = poll_nfds; + poll_fds[idx].fd = fd; + poll_fds[idx].events = msg->fds[i].events; + poll_fds[idx].revents = 0; + poll_idx[fd].msg = msg; + poll_idx[fd].idx = idx; + poll_nfds = ++idx; + } + + DEBUG_PTHREAD_EXIT(); + return 0; + +err: + (void) poll_thread_detach_fds(msg); + + DEBUG_PTHREAD_EXIT(); + return -1; +} + +/* + * Permits to disunite supplied pollfd set from the main set. Also sets the + * revents fields of the supplied set to the ones of the main set. + */ +static void +poll_thread_detach_fds(struct poll_msg *msg) +{ + register int i, fd, idx; + + DEBUG_PTHREAD_ENTRY(); + + for (i = 0; i < msg->nfds; i++) { + fd = msg->fds[i].fd; + + /* + * Make sure fd was properly registered + */ + if (poll_idx[fd].msg != msg) + continue; + + /* + * Find index in global pollfd set for this fd + */ + idx = poll_idx[fd].idx; + + /* + * Update pollfd entry according to global one + */ + msg->fds[i].revents = poll_fds[idx].revents; + + /* + * Unlink fd from the global set. The removal method is + * simple; Take the last entry of the global set and move it + * over the current entry, updating index links, and lower + * the gobal nfds by one. If we're the last entry, simply + * remove it invalidating its index entry lowering the global + * nfds. + */ + + if (--poll_nfds != idx) { + /* + * Not last entry, move last entry over entry to + * delete. + */ + register struct pollfd *deleted, *last; + int deleted_fd, deleted_idx; + + last = &poll_fds[poll_nfds]; + deleted = &poll_fds[idx]; + deleted_fd = deleted->fd; + deleted_idx = poll_idx[deleted_fd].idx; + + /* Copy last entry over deleted one */ + deleted->fd = last->fd; + deleted->events = last->events; + deleted->revents = last->revents; + + /* + * Reindex last entry which was moved, don't touch + * the msg pointer though. + */ + poll_idx[last->fd].idx = deleted_idx; + + /* And finally invalidate last entry */ + poll_idx[deleted_fd].msg = NULL; + } else { + /* Invalidate last entry */ + poll_idx[poll_fds[poll_nfds].fd].msg = NULL; + } + } + + DEBUG_PTHREAD_EXIT(); +} + +/* + * Called upon reception of SIGUSR2 + */ +/* ARGSUSED */ +static void +poll_thread_sighandler(int sig) +{ + + DEBUG_PTHREAD_ENTRY(); + + pollingevents++; + + DEBUG_PTHREAD_EXIT(); +} + + + +/* + * Public API exported functions + */ + +/* + * Must be called before launching any thread. Sets up the signal mask and + * launches the dedicated poll slave thread. Important note: this system + * clobbers the SIGUSR2 signal, which the application can no longer use for + * other purposes. The only solution to wake the thread manager thread from + * poll(2) is either to trigger an event through a dedicated filedescriptor, + * or to send a signal to the process which only the polling thread allows. + */ +int +pthread_poll_init(void) +{ + int error; + sigset_t set; + pthread_attr_t attr; + pthread_t thread; + + DEBUG_PTHREAD_ENTRY(); + + if (pthread_poll_initialized) { + error = 0; + goto err; + } + + /* + * First block SIGUSR2 signal in the parent. The reason why this must + * be called before the application launches any thread is that + * threads inherit the sigmask of their parent, and that all threads, + * but the polling thread, must block the signal. This ensures that + * only the wanted thread wakes up when a SIGUSR2 signal is received. + * This way, we can interrupt the polling thread in poll(2), for + * instance, and cause it to reiterate its main loop. + */ + (void) sigemptyset(&set); + (void) sigaddset(&set, SIGUSR2); + if ((error = pthread_sigmask(SIG_BLOCK, &set, NULL)) != 0) + goto err; + + /* + * We'll use this pthread_ring_t to get notification from child that + * it is ready to process requests before proceeding. + */ + if ((error = pthread_ring_init(&pthread_poll_thread_started_ring)) + != 0) + goto err; + + /* + * We may now launch the poll thread and wait for notification from it + * that it is ready to serve requests. We won't need to exit this + * thread, so it can be launched in detached state. + */ + if ((error = pthread_attr_init(&attr)) != 0) + goto err; + if ((error = pthread_attr_setdetachstate(&attr, TRUE)) != 0) + goto err; + if ((error = pthread_create(&thread, &attr, poll_thread, NULL)) != 0) + goto err; + + /* + * Wait until thread is ready to serve requests + */ + (void) pthread_ring_wait(&pthread_poll_thread_started_ring, NULL); + + pthread_poll_initialized = TRUE; + + return 0; + +err: + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * poll(2) replacement which can also be awakened by a notification happening + * on the specified ring. This for instance allows to process thread messages + * as well as descriptor events. Like poll(2), returns the number of + * descriptors with events on success (can be 0), or returns -1 with the + * specified error set in errno. Unlike poll, the error ETIMEDOUT will occur + * if the timeout expires before an event existed, or ECANCELLED if a ring + * notification event occurred instead of a filedescriptor one. Can also + * return errors such as EINVAL. + * XXX Check for ETIMEDOUT! We probably don't do this yet. Also, we could + * return 0 in this case like poll(2). + */ +int +pthread_poll_ring(struct pollfd *fds, nfds_t nfds, int timeout, + pthread_ring_t *ring) +{ + int error; + struct poll_data *data; + pthread_ring_t *oring; + + DEBUG_PTHREAD_ENTRY(); + + if (!pthread_poll_initialized) { + error = EINVAL; + goto err; + } + + /* + * Implicit process and thread specific initializations + */ + if ((error = pthread_once(&pthread_poll_proc_initialized, + pthread_poll_proc_init2)) != 0) + goto err; + /* + * XXX Use a mutex or pthread_once() equivalent here too? + */ + if ((data = pthread_getspecific(pthread_poll_proc_key)) == NULL) { + if ((error = pthread_poll_thread_init(&data)) != 0) + goto err; + } + + /* + * Perform some sanity checking on supplied arguments + */ + if (fds == NULL || nfds < 1 || ring == NULL || ring->magic != + PRING_MAGIC) { + error = EINVAL; + goto err; + } + + /* + * Ensure that our message port's ring uses the same ring which + * the user supplies us. If we didn't do this we would need to + * be able to wait for events on more than one ring simultaneously. + * Because we don't have a ring multiplexer object yet (which would + * be needed since a ring maps to a conditional variable among other + * things), we need to do process this way. + * XXX Could there be a race condition here? It needs to be stressed. + */ + { + int mevent; + + mevent = (data->port.ring != NULL ? + data->port.ring->mevent : 0); + oring = data->port.ring; + (void) pthread_port_set_ring(&data->port, ring); + data->port.ring->mevent = mevent; + } + + /* + * Send query to polling thread. It is safe to simply reuse our + * message since we then expect a reply back and synchronize it. + */ + data->msg.cancel = FALSE; + data->msg.fds = fds; + data->msg.nfds = nfds; + data->msg.timeout = timeout; + if ((error = pthread_msg_put(&pthread_poll_thread_port, + &data->msg.msgnode)) != 0) + goto err; + + /* + * Interrupt polling thread which may still be waiting in poll(2). + * We do this by sending SIGUSR2 to the process, which only the + * polling thread is not blocking. This causes the thread to reiterate + * its main loop, thus processing this message and going back to + * sleep in poll(2). + */ + POLLWAKE(); + + /* + * Wait until en event occurs and notifies our ring. An event could + * either be triggered by the poll request ending or by another + * interrupting event on the supplied ring. If a message is queued + * on the port between pthread_port_set_ring() and + * pthread_ring_wait(), the latter immediately returns. + */ + if ((error = pthread_ring_wait(ring, NULL)) != 0) + goto err; + if (pthread_msg_get(&data->port) == NULL) { + /* + * No message replied back from poll thread yet, this means + * that our ring was notified by another event. Cancel request + * by sending event back with the cancel flag, and wait for + * reply message to occur (which will be the original request + * results we were waiting for). error field will be set to + * ECANCELED by the poll thread. + */ + data->msg.cancel = TRUE; + (void) pthread_msg_put(&pthread_poll_thread_port, + &data->msg.msgnode); + POLLWAKE(); + while (pthread_msg_get(&data->port) == NULL) + (void) pthread_ring_wait(ring, NULL); + } + /* Unclobber user supplied ring from our port events */ + (void) pthread_port_set_ring(&data->port, oring); + + /* + * Error, return error number. + */ + if (data->msg.error != 0) { + error = data->msg.error; + goto err; + } + + /* + * Success, return number of descriptors with detected events. + */ + DEBUG_PTHREAD_EXIT(); + return data->msg.ready; + +err: + errno = error; + + DEBUG_PTHREAD_EXIT(); + return -1; +} + +/* + * accept(2) replacement which can both observe a timeout and be interrupted + * via pthread_ring_t events. Internally implemented using + * pthread_poll_ring(). Will internally set the descriptor in non-blocking + * mode if necessary, then reverting it to the mode it was supplied in. + * Returns a new descriptor on success, or -1 on error, in which case errno + * is set. errno can then be EINVAL, ETIMEDOUT, ECANCELED, or others. + * Timeout is in milliseconds, like for poll(2) and can be -1. + */ +int +pthread_accept_ring(int s, struct sockaddr *addr, socklen_t *addrlen, + int timeout, pthread_ring_t *ring) +{ + int oflags, nflags, d, error = 0; + struct pollfd fd; + + DEBUG_PTHREAD_ENTRY(); + + if (!pthread_poll_initialized) { + errno = EINVAL; + goto err; + } + + /* + * First get current fcntl status flags, and set descriptor to + * non-blocking mode if necessary. + */ + if ((oflags = nflags = fcntl(s, F_GETFL)) == -1) + goto err; + if ((oflags & O_NONBLOCK) == 0) { + nflags |= O_NONBLOCK; + if (fcntl(s, F_SETFL, nflags) == -1) + goto err; + } + + if ((d = accept(s, addr, addrlen)) == -1) { + if (errno != EAGAIN) /* XXX Add others? */ + goto end; + } else + goto end; + + /* + * EAGAIN, poll until completion, timeout or ring event. + */ + fd.fd = d; + fd.events = POLLIN; + if ((error = pthread_poll_ring(&fd, 1, timeout, ring)) == 1 && + (fd.revents & POLLIN) != 0) + error = 0; + else + error = errno; + +end: + /* + * Restore supplied descriptor fcntl status flags if necessary + */ + if (nflags != oflags) + (void) fcntl(s, F_SETFL, oflags); + + if (error != 0) { + if (d != -1) { + (void) close(d); + d = -1; + } + errno = error; + goto err; + } + + DEBUG_PTHREAD_EXIT(); + return d; + +err: + DEBUG_PTHREAD_EXIT(); + return -1; +} + +/* + * connect(2) replacement which can both observe a timeout and be interrupted + * via pthread_ring_t events. Internally implemented using + * pthread_poll_ring(). Will internally set the descriptor in non-blocking + * mode if necessary, then reverting it back to the mode it was supplied in. + * Returns 0 on success, or -1, in which case errno is set. errno can be + * EINVAL, ETIMEDOUT, ECANCELED or others. + * Timeout is in milliseconds, like for poll(2) and can be -1. + * For the application to know the actual connection status result, it should + * poll until completion and verify the status using getsockopt(2) with + * SOL_SOCKET level and SO_ERROR option. It can alternatively continue to call + * this function in a loop until completion. Calling the function on an + * already connected socket will result in EISCONN. + */ +int +pthread_connect_ring(int s, const struct sockaddr *name, socklen_t namelen, + int timeout, pthread_ring_t *ring) +{ + int oflags, nflags, error = 0; + struct pollfd fd; + + DEBUG_PTHREAD_ENTRY(); + + if (!pthread_poll_initialized) { + errno = EINVAL; + goto err; + } + + /* + * First get current fcntl status flags, and set descriptor to + * non-blocking mode if necessary. + */ + if ((oflags = nflags = fcntl(s, F_GETFL)) == -1) + goto err; + if ((oflags & O_NONBLOCK) == 0) { + nflags |= O_NONBLOCK; + if (fcntl(s, F_SETFL, nflags) == -1) + goto err; + } + + if ((error = connect(s, name, namelen)) == -1) { + if (errno != EINPROGRESS && errno != EALREADY) { + error = errno; + goto end; + } + } else + goto end; + + /* + * EINPROGRESS or EALREADY, poll until completion, timeout or ring + * event. + */ + fd.fd = s; + fd.events = POLLOUT; + if (pthread_poll_ring(&fd, 1, timeout, ring) == 1 && + (fd.revents & POLLOUT) != 0) { + socklen_t l; + + /* + * connect(2) completed, return result + */ + if (getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &l) == -1) + error = errno; + } + +end: + /* + * Restore supplied descriptor fcntl status flags if necessary + */ + if (nflags != oflags) + (void) fcntl(s, F_SETFL, oflags); + + if (error != 0) { + errno = error; + goto err; + } + + DEBUG_PTHREAD_EXIT(); + return 0; + +err: + DEBUG_PTHREAD_EXIT(); + return -1; +} diff --git a/mmsoftware/pthread_util/mm_pthread_poll.h b/mmsoftware/pthread_util/mm_pthread_poll.h new file mode 100644 index 0000000..729862f --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_poll.h @@ -0,0 +1,63 @@ +/* $Id: mm_pthread_poll.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#ifndef MM_PTHREAD_POLL_H +#define MM_PTHREAD_POLL_H + + + +#include +#include +#include +#include +#include + +#include + + + +extern int pthread_poll_init(void); +extern int pthread_poll_ring(struct pollfd *, nfds_t, int, + pthread_ring_t *); +extern int pthread_accept_ring(int, struct sockaddr *, socklen_t *, int, + pthread_ring_t *); +extern int pthread_connect_ring(int, const struct sockaddr *, socklen_t, + int, pthread_ring_t *); + + + +#endif diff --git a/mmsoftware/pthread_util/mm_pthread_pool.c b/mmsoftware/pthread_util/mm_pthread_pool.c new file mode 100644 index 0000000..6684c00 --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_pool.c @@ -0,0 +1,504 @@ +/* $Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2004-2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Implementation of a pool of ready threads which adapts with concurrency + * needs. These ready threads can serve requests passed through efficient + * inter-thread messaging. mmpool(3) is used for the pool functionality. + */ + + + +#include +#include +#include + +#include +#include + + + +MMCOPYRIGHT("@(#) Copyright (c) 2004-2005\n\ +\tMatthew Mondor. All rights reserved.\n"); +MMRCSID("$Id: mm_pthread_pool.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $"); + + + +/* + * STATIC FUNCTIONS PROTOTYPES + */ + +inline static pthread_object_t *thread_object_alloc(void); +inline static void thread_object_free(pthread_object_t *); +static bool thread_object_constructor(pnode_t *); +static void thread_object_destructor(pnode_t *); +static void *thread_object_main(void *); + + + +/* + * GLOBALS + */ + +static bool thread_object_initialized = FALSE; +static pthread_attr_t thread_object_attr; +static pool_t thread_object_pool; +static pool_t thread_object_msg_pool; +static pthread_mutex_t thread_object_pool_mutex = + PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t thread_object_msg_pool_mutex = + PTHREAD_MUTEX_INITIALIZER; +static pthread_ring_t thread_started_ring; + + + +/* + * EXPORTED PUBLIC FUNCTIONS + */ + +/* + * Must be called to initialize the pthreads pool subsystem, before calling + * any other function of this API. Returns 0 on success, or an error number. + * threads are launched, and more will be launched in increments + * of whenever necessary. These will also only be destroyed in + * decrements of whenever that many threads have not been in use for + * some time, and a minimum of threads will always be kept. + * Setting to high values may actually degrade performance with some + * unefficient threading implementations. It is not recommended to use more + * than 8 using the pth(3) library. Using NetBSD 2.0+ SA threads, a high + * number does not reduce performance. We current do not observe any limit + * whatsoever according to the number of threads launched over time. It is the + * application's responsibility to ensure to observe decent concurrency limits + * before calling pthread_object_call(). + */ +int +pthread_object_init(int initial) +{ + int error = 0; + + DEBUG_PTHREAD_ENTRY(); + + if (thread_object_initialized) { + error = EINVAL; + goto err; + } + + /* + * Create attributes which will be used for threads of the pool. + * We want them to be joinable. + */ + if ((error = pthread_attr_init(&thread_object_attr)) != 0) + goto err; + if ((error = pthread_attr_setdetachstate(&thread_object_attr, 0)) + != 0) + goto err; + + /* + * We use this ring to obtain notification of ready children when + * launching them. This is required for proper synchronization to + * avoid aweful race conditions. + */ + if ((error = pthread_ring_init(&thread_started_ring)) != 0) + goto err; + + /* + * First initialize the message subsystem pool + */ + if (!pool_init(&thread_object_msg_pool, "thread_object_msg_pool", + malloc, free, NULL, NULL, sizeof(pthread_object_msg_t), + 32768 / sizeof(pthread_object_msg_t), 1, 0)) { + error = ENOMEM; + goto err; + } + + /* + * Now initialize the threads pool. This creates threads, uses + * synchronization with thread_started_ring, and uses the message + * subsystem, which all must be initialized and ready. + */ + if (!pool_init(&thread_object_pool, "thread_object_pool", + malloc, free, thread_object_constructor, thread_object_destructor, + sizeof(pthread_object_t), initial, 1, 0)) { + error = ENOMEM; + goto err; + } + + thread_object_initialized = TRUE; + + DEBUG_PTHREAD_EXIT(); + return 0; + +err: + if (POOL_VALID(&thread_object_msg_pool)) + pool_destroy(&thread_object_msg_pool); + if (POOL_VALID(&thread_object_pool)) + pool_destroy(&thread_object_pool); + + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Allows allocation/creation of a message suitable for asynchronous requests + * with the threads via their main message port provided by this system. + * Returns new message, or NULL on error. + */ +inline pthread_object_msg_t * +pthread_object_msg_alloc(void) +{ + pthread_object_msg_t *msg = NULL; + + DEBUG_PTHREAD_ENTRY(); + + if (pthread_mutex_lock(&thread_object_msg_pool_mutex) != 0) + goto err; + msg = (pthread_object_msg_t *)pool_alloc(&thread_object_msg_pool, + FALSE); + (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex); + + (void) pthread_msg_init(&msg->message, NULL); + +err: + DEBUG_PTHREAD_EXIT(); + return msg; +} + +/* + * Permits to free/destroy a message which was allocated using + * pthread_object_msg_alloc() and sent asynchroneously. + */ +inline int +pthread_object_msg_free(pthread_object_msg_t *msg) +{ + int error = 0; + + DEBUG_PTHREAD_ENTRY(); + + (void) pthread_msg_destroy(&msg->message); + + if ((error = pthread_mutex_lock(&thread_object_msg_pool_mutex)) != 0) + goto err; + (void) pool_free((pnode_t *)msg); + (void) pthread_mutex_unlock(&thread_object_msg_pool_mutex); + +err: + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Allows to invoke a thread of the pool to perform execution of the wanted + * function. This is very efficient since the threads are already created and + * are waiting for requests. There is no maximum concurrency limit enforced by + * this system; It is the responsibility of the application to restrict + * concurrency as necessary by keeping internal information on the current + * number of requests. 0 is returned on success, or an error number. + * XXX Add support for synchroneous and asynchroneous operation. Current + * operation is only asynchroneous, but we would like to add a boolean here to + * decide. We also could add back the result value of the thread function + * which would only be useful in synchroneous operation, when we are waiting + * until the task ends... Of course, it's still easy for applications to use + * these in a synchroneous manner, by using a message and/or ring, + * conditionnal variable, etc. + * Also evaluate if a callback function to be called to notify end of + * asynchroneous operation would be useful. + */ +int +pthread_object_call(pthread_port_t **port, + void (*function)(pthread_object_t *, void *), void *args) +{ + pthread_object_t *obj = NULL; + pthread_object_msg_t *msg = NULL; + int error; + + DEBUG_PTHREAD_ENTRY(); + + if (function == NULL) { + error = EINVAL; + goto err; + } + + /* + * Allocate a thread from the pool to reserve it, and tell it to call + * a function via a message. The message cannot be on the stack in + * this case, since it holds arguments to be passed to a thread, and + * also consists of an asynchroneous message for wich we do not expect + * a response back, waiting for it. We just dispatch it and go on. + */ + if ((obj = thread_object_alloc()) == NULL) { + error = ENOMEM; + goto err; + } + if ((msg = pthread_object_msg_alloc()) == NULL) { + error = ENOMEM; + goto err; + } + + msg->command = PTHREAD_OBJ_CALL; + msg->u.call.function = function; + msg->u.call.arguments = args; + if ((error = pthread_msg_put(obj->port, &msg->message)) != 0) + goto err; + + /* + * Everything successful; + * If caller wants the message port of the thread, supply it + */ + if (port != NULL) + *port = obj->port; + + DEBUG_PTHREAD_EXIT(); + return 0; + +err: + if (msg != NULL) + pthread_object_msg_free(msg); + if (obj != NULL) + thread_object_free(obj); + + DEBUG_PTHREAD_EXIT(); + return error; +} + + + +/* + * INTERNAL STATIC FUNCTIONS + */ + +/* + * Internally used to allocate a ready thread from the pool. + */ +inline static pthread_object_t * +thread_object_alloc(void) +{ + pthread_object_t *obj = NULL; + + DEBUG_PTHREAD_ENTRY(); + + if (pthread_mutex_lock(&thread_object_pool_mutex) != 0) + goto err; + obj = (pthread_object_t *)pool_alloc(&thread_object_pool, FALSE); + (void) pthread_mutex_unlock(&thread_object_pool_mutex); + +err: + return obj; +} + +/* + * Internally used to free a no longer needed thread back to the pool of ready + * threads. + */ +inline static void +thread_object_free(pthread_object_t *obj) +{ + + DEBUG_PTHREAD_ENTRY(); + + if (pthread_mutex_lock(&thread_object_pool_mutex) == 0) { + (void) pool_free((pnode_t *)obj); + (void) pthread_mutex_unlock(&thread_object_pool_mutex); + } + + DEBUG_PTHREAD_EXIT(); +} + +/* + * Internally called by mmpool(3) to create a thread object. + */ +static bool +thread_object_constructor(pnode_t *pnode) +{ + pthread_object_t *obj = (pthread_object_t *)pnode; + int success = TRUE; + + DEBUG_PTHREAD_ENTRY(); + + /* + * Note that we leave thread_object_main() initialize the port field + * when it creates its port and ring. + */ + if (pthread_create(&obj->thread, &thread_object_attr, + thread_object_main, obj) != 0) { + success = FALSE; + goto err; + } + + /* + * Wait until new thread ready notification. Without this, at least + * with NetBSD 2.0 SA threads, hell would break loose. Thread creation + * isn't really a bottleneck in our case anyways, since we only need + * to do it when all threads of the pool are already busy. + */ + (void) pthread_ring_wait(&thread_started_ring, NULL); + +err: + DEBUG_PTHREAD_EXIT(); + return success; +} + +/* + * Internally called by mmpool(3) to destroy a thread object. + */ +static void +thread_object_destructor(pnode_t *pnode) +{ + pthread_object_t *obj = (pthread_object_t *)pnode; + pthread_object_msg_t *msg; + + DEBUG_PTHREAD_ENTRY(); + + /* + * To be freed, the thread has to be terminated. We thus send it a + * quit message and then wait for it to exit using pthread_join(). + * Note that we let the thread destroy the port field. Although we + * theoretically could use a message on the stack here, let's be safe. + * Thread destruction is only performed rarely anyways, so this isn't + * a performance problem. + */ + if ((msg = pthread_object_msg_alloc()) != NULL) { + msg->command = PTHREAD_OBJ_QUIT; + (void) pthread_msg_put(obj->port, &msg->message); + } + (void) pthread_join(obj->thread, NULL); + + DEBUG_PTHREAD_EXIT(); +} + +/* + * Actual thread's main loop. We create a message port and listen for command + * messages (quit and call). When we obtain a quit request, we destroy the + * port and exit cleanly. The quit event can never occur during the execution + * of a call command, since it is only called on already freed thread nodes + * (by mmpool(3) pool_free()). It is advized to applications which need to + * obtain and use the port of the thread after thread_object_call() to only + * send proper user messages, not system reserved ones. + */ +static void * +thread_object_main(void *args) +{ + pthread_object_t *obj = (pthread_object_t *)args; + pthread_port_t port; + pthread_ring_t ring; + pthread_msg_t *imsg; + pthread_object_msg_t *msg; + + DEBUG_PTHREAD_ENTRY(); + + /* + * Create our incomming message port as well as its corresponding + * notification ring we can sleep on. Then advertize our port address. + * Ideally, we should somehow panic if any of this initialization + * fails. XXX + */ + (void) pthread_port_init(&port); + (void) pthread_ring_init(&ring); + (void) pthread_port_set_ring(&port, &ring); + obj->port = &port; + + /* + * Notify parent that we are ready, so that it may proceed + */ + (void) pthread_ring_notify(&thread_started_ring); + + /* + * Main loop, which keeps executing until we obtain a PTHREAD_OBJ_QUIT + * message, at which event we cleanly exit. + */ + for (;;) { + /* + * Wait for any message(s) to be available, without taking any + * CPU time. + */ + (void) pthread_ring_wait(&ring, NULL); + + /* + * We were awaken because at least one message is available. + * Process all messages in the queue. + */ + while ((imsg = pthread_msg_get(&port)) != NULL) { + msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]); + if (msg->command == PTHREAD_OBJ_QUIT) { + /* + * We are ordered to exit by the object + * destructor. + */ + pthread_object_msg_free(msg); + goto end; + } + if (msg->command == PTHREAD_OBJ_CALL) { + /* + * Request to execute a function. This means + * that we were allocated/reserved first. + */ + msg->u.call.function(obj, + msg->u.call.arguments); + pthread_object_msg_free(msg); + /* + * Free/release us back, so that we be + * available again to process further + * requests. It is possible that freeing + * ourselves cause a PTHREAD_OBJ_QUIT message + * to be queued soon on our port by the + * destructor function. This is safe, since + * the destructor does not cause us to be + * destroyed until it waits for us to have + * ended cleanly using pthread_join(). + */ + thread_object_free(obj); + } + } + } + +end: + /* + * Discard messages that are still queued on our port (if any) + */ + while ((imsg = pthread_msg_get(&port)) != NULL) { + msg = (pthread_object_msg_t *)(&((pnode_t *)imsg)[-1]); + pthread_object_msg_free(msg); + } + /* + * Free our resources and exit. + */ + (void) pthread_port_destroy(&port); + (void) pthread_ring_destroy(&ring); + + DEBUG_PTHREAD_EXIT(); + pthread_exit(NULL); + + /* NOTREACHED */ + return NULL; +} diff --git a/mmsoftware/pthread_util/mm_pthread_pool.h b/mmsoftware/pthread_util/mm_pthread_pool.h new file mode 100644 index 0000000..96ed45d --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_pool.h @@ -0,0 +1,97 @@ +/* $Id: mm_pthread_pool.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2004-2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#ifndef MM_PTHREAD_POOL_H +#define MM_PTHREAD_POOL_H + + + +#include + +#include +#include + +#include + + + +typedef struct { + pnode_t node; + pthread_t thread; + pthread_port_t *port; +} pthread_object_t; + +typedef struct { + pnode_t node; + pthread_msg_t message; + int command; + union { + /* PTHREAD_OBJ_CALL, sent to thread_object_main() */ + struct { + void (*function)(pthread_object_t *, void *); + void *arguments; + } call; + /* PTHREAD_OBJ_QUIT, sent to thread_oject_reaper() */ + pthread_object_t *quit; + /* PTHREAD_OBJ_USER, custom user messages */ + struct { + int user_command; + void *user_data; + } user; + } u; +} pthread_object_msg_t; + +enum pthread_object_commands { + PTHREAD_OBJ_CALL, + PTHREAD_OBJ_QUIT, + PTHREAD_OBJ_USER, + PTHREAD_OBJ_MAX +}; + + + +extern int pthread_object_init(int); +extern inline pthread_object_msg_t *pthread_object_msg_alloc(void); +extern inline int pthread_object_msg_free( + pthread_object_msg_t *); +extern int pthread_object_call(pthread_port_t **, + void (*)(pthread_object_t *, + void *), void *); + + + +#endif diff --git a/mmsoftware/pthread_util/mm_pthread_sleep.c b/mmsoftware/pthread_util/mm_pthread_sleep.c new file mode 100644 index 0000000..24badc4 --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_sleep.c @@ -0,0 +1,285 @@ +/* $Id: mm_pthread_sleep.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + + +MMCOPYRIGHT("@(#) Copyright (c) 2005\n\ +\tMatthew Mondor. All rights reserved.\n"); +MMRCSID("$Id: mm_pthread_sleep.c,v 1.1 2007/03/13 19:37:22 mmondor Exp $"); + + + +static int pthread_sleep_proc_init(void); +static void pthread_sleep_proc_init2(void); +static int pthread_sleep_thread_init(pthread_ring_t **); +static void pthread_sleep_thread_exit(void *); + +static pthread_key_t pthread_sleep_proc_key; +static pthread_once_t pthread_sleep_proc_initialized = PTHREAD_ONCE_INIT; + + + +static int +pthread_sleep_proc_init(void) +{ + int error; + + DEBUG_PTHREAD_ENTRY(); + + error = pthread_key_create(&pthread_sleep_proc_key, + pthread_sleep_thread_exit); + + DEBUG_PTHREAD_EXIT(); + return error; +} + +static void +pthread_sleep_proc_init2(void) +{ + int error; + + DEBUG_PTHREAD_ENTRY(); + + if ((error = pthread_sleep_proc_init()) != 0) { + (void) fprintf(stderr, "pthread_sleep_proc_init() - %s\n", + strerror(error)); + DEBUG_PTHREAD_EXIT(); + exit(EXIT_FAILURE); + } + + DEBUG_PTHREAD_EXIT(); +} + +static int +pthread_sleep_thread_init(pthread_ring_t **res) +{ + int error; + pthread_ring_t *ring; + + DEBUG_PTHREAD_ENTRY(); + + if ((ring = malloc(sizeof(pthread_ring_t))) == NULL) { + error = ENOMEM; + goto err; + } + + if ((error = pthread_ring_init(ring)) != 0) + goto err; + + if ((error = pthread_setspecific(pthread_sleep_proc_key, ring)) != 0) + goto err; + + *res = ring; + + DEBUG_PTHREAD_EXIT(); + return 0; + +err: + if (ring != NULL) + free(ring); + + DEBUG_PTHREAD_EXIT(); + return error; +} + +static void +pthread_sleep_thread_exit(void *specific) +{ + pthread_ring_t *ring = (pthread_ring_t *)specific; + + DEBUG_PTHREAD_ENTRY(); + + (void) pthread_ring_destroy(ring); + free(ring); + + /* + * Although NetBSD threads don't need this, some pthread + * implementations do. Some will crash for attempting to reference the + * already freed memory twice calling us again until we NULL the + * pointer for the data. Lame, but the POSIX standard was unclear + * about this. + */ + (void) pthread_setspecific(pthread_sleep_proc_key, NULL); + + DEBUG_PTHREAD_EXIT(); +} + + + +/* + * Suspends the calling thread for duration specified in supplied timespec. + * Returns 0 on success, or an error number. + */ +int +pthread_nanosleep(struct timespec *ts) +{ + int error; + struct timeval tv; + struct timespec its; + pthread_ring_t *ring; + + DEBUG_PTHREAD_ENTRY(); + + /* + * Process specific initialization if needed + */ + if ((error = pthread_once(&pthread_sleep_proc_initialized, + pthread_sleep_proc_init2)) != 0) + goto err; + /* + * Thread specific initialization if needed + * XXX Use pthread_once() here too, or mutex around ring? + */ + if ((ring = pthread_getspecific(pthread_sleep_proc_key)) == NULL) { + if ((error = pthread_sleep_thread_init(&ring)) != 0) + goto err; + } + + /* + * Generate absolute time timespec using current time and supplied + * timespec delay. + */ + if (gettimeofday(&tv, NULL) == -1) { + error = errno; + goto err; + } + TIMEVAL_TO_TIMESPEC(&tv, &its); + timespecadd(&its, ts, &its); + + /* + * We can finally sleep. We expect ETIMEDOUT to be the normal return + * value in this case, which we convert to a no-error. Other errors + * will be returned un changed. + */ + if ((error = pthread_ring_wait(ring, &its)) == ETIMEDOUT) + error = 0; + +err: + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Suspends the current thread for the duration specified into supplied + * timeval. Returns 0 on success or an error number. + */ +int +pthread_microsleep(struct timeval *tv) +{ + struct timespec ts; + int error; + + DEBUG_PTHREAD_ENTRY(); + + TIMEVAL_TO_TIMESPEC(tv, &ts); + error = pthread_nanosleep(&ts); + + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Suspends execution of current thread for duration of specified + * milliseconds. Returns 0 on success or an error number. + */ +int +pthread_millisleep(unsigned int ms) +{ + struct timeval tv; + int error; + + DEBUG_PTHREAD_ENTRY(); + + tv.tv_sec = ms / 1000; + tv.tv_usec = (ms % 1000) * 1000; + error = pthread_microsleep(&tv); + + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Suspends execution of thread for duration of specified number of seconds. + * Returns 0 on success or an error number. + */ +unsigned int +pthread_sleep(unsigned int seconds) +{ + struct timespec ts; + int error; + + DEBUG_PTHREAD_ENTRY(); + + ts.tv_sec = seconds; + ts.tv_nsec = 0; + error = pthread_nanosleep(&ts); + + DEBUG_PTHREAD_EXIT(); + return error; +} + +/* + * Suspends execution of thread for durection of specified number of + * microseconds. Like usleep(3). + */ +int +pthread_usleep(useconds_t ms) +{ + struct timeval tv; + int error; + + DEBUG_PTHREAD_ENTRY(); + + tv.tv_sec = 0; + tv.tv_usec = ms; + error = pthread_microsleep(&tv); + + DEBUG_PTHREAD_EXIT(); + return error; +} diff --git a/mmsoftware/pthread_util/mm_pthread_sleep.h b/mmsoftware/pthread_util/mm_pthread_sleep.h new file mode 100644 index 0000000..e76b876 --- /dev/null +++ b/mmsoftware/pthread_util/mm_pthread_sleep.h @@ -0,0 +1,57 @@ +/* $Id: mm_pthread_sleep.h,v 1.1 2007/03/13 19:37:22 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#ifndef MM_PTHREAD_SLEEP_H +#define MM_PTHREAD_SLEEP_H + + + +#include +#include +#include + + + +extern int pthread_nanosleep(struct timespec *); +extern int pthread_microsleep(struct timeval *); +extern int pthread_millisleep(unsigned int); +extern unsigned int pthread_sleep(unsigned int); +extern int pthread_usleep(useconds_t); + + + +#endif diff --git a/mmsoftware/pthread_util/tests/msg_test.c b/mmsoftware/pthread_util/tests/msg_test.c new file mode 100644 index 0000000..c97ca88 --- /dev/null +++ b/mmsoftware/pthread_util/tests/msg_test.c @@ -0,0 +1,269 @@ +/* $Id: msg_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + + +MMCOPYRIGHT("@(#) Copyright (c) 2005\n\ +\tMatthew Mondor. All rights reserved.\n"); +MMRCSID("$Id: msg_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $"); + + + +#define THREADS 32 +#define ROUNDS 8 +#define TIMEOUT 1 +/*#define PRINTLOCK*/ +/*#define NOPRINT*/ + + + +struct message { + pthread_msg_t node; + int id, i; +}; + + + +int main(void); +static void threadfunc(pthread_object_t *, void *); +static void printfunc(const char *, ...); + + + +static pthread_port_t main_port; +static pthread_mutex_t print_lock; + + + +int +main(void) +{ + pthread_ring_t ring; + struct message *msg; + int i, err; + int threads_args[THREADS]; + struct timeval tv; + struct timespec ts, ts1; + + if ((err = pthread_mutex_init(&print_lock, NULL)) != 0) { + (void) printf("main() - stdout lock - %s\n", strerror(err)); + exit(EXIT_FAILURE); + } + + if ((err = pthread_port_init(&main_port)) != 0 || + (err = pthread_ring_init(&ring)) != 0 || + (err = pthread_port_set_ring(&main_port, &ring)) != 0) { + printfunc("main() - initialization - %s\n", strerror(err)); + exit(EXIT_FAILURE); + } + + printfunc("Main: launching threads\n"); + + if ((err = pthread_poll_init()) != 0) { + printfunc("main() - pthread_poll_init() - %s\n", + strerror(err)); + exit(EXIT_FAILURE); + } + + /* + * Initializes a poll of ready threads which can be dispatched + * functions to execute. + */ + if ((err = pthread_object_init(THREADS + 1)) != 0) { + printfunc("main() - pthread_object_init() - %s\n", + strerror(err)); + exit(EXIT_FAILURE); + } + + /* + * Now dispatch a main reentrant function to many threads, without + * waiting for them to complete, in an asynchroneous manner. + * XXX Because of the way this works, the parent main thread should + * actually already be listening to messages... We did create a port + * however, which should queue messages until we reach the main loop. + */ + for (i = 0; i < THREADS; i++) { + threads_args[i] = i; + if ((err = pthread_object_call(NULL, threadfunc, + &threads_args[i])) != 0) + printfunc("main() - pthread_object_call() - %s\n", + strerror(errno)); + } + + ts1.tv_sec = TIMEOUT; + ts1.tv_nsec = 0; + for (;;) { + /* + * Read messages as long as there are any, and reply to each + * of them in a synchroneous manner. + */ + while ((msg = (struct message *)pthread_msg_get(&main_port)) + != NULL) { + + printfunc( + "Main: Received message %d from thread #%d\n", + msg->i, msg->id); + + if ((err = pthread_msg_reply((pthread_msg_t *)msg)) + != 0) + printfunc( + "Main: pthread_message_reply() - %s\n", + strerror(err)); + } + + /* + * No more messages to process; Wait for any message(s) to be + * available. + * Note that there is special provision in the event where + * this loop first polling for new messages before processing + * them, which causes waiting for the ring to immediately + * return instead of actually waiting if any messages already + * have been sent. + */ + printfunc("Main: Waiting for messages\n"); + + (void) gettimeofday(&tv, NULL); + TIMEVAL_TO_TIMESPEC(&tv, &ts); + timespecadd(&ts, &ts1, &ts); + if ((err = pthread_ring_wait(&ring, &ts)) != 0) { + printfunc("Main: pthread_ring_wait() - %s\n", + strerror(err)); + break; + } + } + + (void) pthread_mutex_destroy(&print_lock); + (void) pthread_port_destroy(&main_port); + (void) pthread_ring_destroy(&ring); + + return 0; +} + +static void +threadfunc(pthread_object_t *obj, void *args) +{ + int id = *(int *)args; + int i, err; + struct message msg; + pthread_port_t rport; + pthread_ring_t rring; + + if ((err = pthread_port_init(&rport)) != 0 || + (err = pthread_ring_init(&rring)) != 0 || + (err = pthread_port_set_ring(&rport, &rring)) != 0 || + (err = pthread_msg_init((pthread_msg_t *)&msg, &rport)) != 0) { + printfunc("threadfunc() - initialization - %s\n", + strerror(err)); + return; + } + + msg.id = id; + + (void) printfunc("Thread #%d started\n", id); + + for (i = 0; i < ROUNDS; i++) { + /* + * Prepare and send synchronous message. For asynchronous + * operation, we would need to allocate a message and to send + * it, and not expect a reply back immediately, even letting + * the other end free the message as necessary. In synchronous + * mode we can use the same message over and over and share + * its memory area using proper send/reply methods for + * synchronization. + */ + msg.i = i; + if ((err = pthread_msg_put(&main_port, (pthread_msg_t *)&msg)) + != 0) + printfunc("Thread: pthread_message_put() - %s\n", + strerror(err)); + + /* Now wait for synchronous reply and discard it */ + if ((err = pthread_ring_wait(&rring, NULL)) != 0) { + printfunc("Thread: pthread_ring_wait() - %s\n", + strerror(err)); + break; + } + if (pthread_msg_get(&rport) == NULL) + printfunc("Thread: pthread_msg_get() == NULL!?\n"); + printfunc("Thread #%d received reply message for %d\n", + id, i); + } + + printfunc("Thread #%d ending\n", id); + + (void) pthread_port_destroy(&rport); + (void) pthread_ring_destroy(&rring); + (void) pthread_msg_destroy((pthread_msg_t *)&msg); +} + +static void +printfunc(const char *fmt, ...) +{ + char buf[1024]; + va_list arg_ptr; + int len; + +#ifdef NOPRINT + return; +#endif + + *buf = '\0'; + va_start(arg_ptr, fmt); + if ((len = vsnprintf(buf, 1023, fmt, arg_ptr)) < 1) + return; + va_end(arg_ptr); + +#ifdef PRINTLOCK + (void) pthread_mutex_lock(&print_lock); +#endif + (void) fwrite(buf, len, 1, stdout); +#ifdef PRINTLOCK + (void) fflush(stdout); + (void) pthread_mutex_unlock(&print_lock); +#endif +} diff --git a/mmsoftware/pthread_util/tests/poll_test.c b/mmsoftware/pthread_util/tests/poll_test.c new file mode 100644 index 0000000..843dd78 --- /dev/null +++ b/mmsoftware/pthread_util/tests/poll_test.c @@ -0,0 +1,73 @@ +/* $Id: poll_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $ */ + +/* + * Copyright (C) 2005, Matthew Mondor + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. All advertising materials mentioning features or use of this software + * must display the following acknowledgement: + * This product includes software developed by Matthew Mondor. + * 4. The name of Matthew Mondor may not be used to endorse or promote + * products derived from this software without specific prior written + * permission. + * 5. Redistribution of source code may not be released under the terms of + * any GNU Public License derivate. + * + * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + + +MMCOPYRIGHT("@(#) Copyright (c) 2005\n\ +\tMatthew Mondor. All rights reserved.\n"); +MMRCSID("$Id: poll_test.c,v 1.1 2007/03/13 19:37:26 mmondor Exp $"); + + + +int main(void); + + + +int +main(void) +{ + int err; + + if ((err = pthread_poll_init()) != 0) { + (void) fprintf(stderr, "main() - pthread_poll_init() - %s\n", + strerror(err)); + exit(EXIT_FAILURE); + } + + return 0; +} diff --git a/mmsoftware/pthread_util/tests/polltest.c b/mmsoftware/pthread_util/tests/polltest.c new file mode 100644 index 0000000..ceedfd5 --- /dev/null +++ b/mmsoftware/pthread_util/tests/polltest.c @@ -0,0 +1,153 @@ +/* + * The goal of this program is to verify if it is valid for multiple threads + * to poll(2) on the same filedescriptor, and if so, what happens whenever + * an event is triggered on that descriptor. + * + * XXX Problems: + * - Only one of the polling threads seems to be awaken when an event occurs + * on the descriptor. This probably means that using a signal would be + * better... I sure don't want to need a filedescriptor per ring... + * If I did however, would this really hurt? Are there that many rings? + * But oops, this actually means two filedescriptors for each! + * using a signal is probably better. However, we then need to clobber some + * signal... We could use SIGUSR2. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +#define THREADS 8 + + + +int main(void); +static void *thread_poll(void *); +static void *thread_notify(void *); +static void thread_print(int, const char *); + + + +static int sockets[2]; +static int threadargs[THREADS]; +static pthread_mutex_t print_mutex; +static pthread_mutex_t sockets_mutex; + + + +int +main(void) +{ + pthread_t threadid; + int i; + + /* + * Create socketpair which will be used to trigger events to awaken + * polling threads. + */ + if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, sockets) != 0) { + perror("socketpair()"); + exit(EXIT_FAILURE); + } + if (fcntl(sockets[0], F_SETFL, O_NONBLOCK) != 0 || + fcntl(sockets[1], F_SETFL, O_NONBLOCK) != 0) { + perror("fcntl()"); + exit(EXIT_FAILURE); + } + + pthread_mutex_init(&print_mutex, NULL); + pthread_mutex_init(&sockets_mutex, NULL); + + /* + * First launch THREADS polling threads + */ + for (i = 0; i < THREADS; i++) { + threadargs[i] = i; + pthread_create(&threadid, NULL, thread_poll, &threadargs[i]); + } + sleep(1); + + /* + * And finally launch notifyer thread + */ + pthread_create(&threadid, NULL, thread_notify, NULL); + + /* + * Now just wait + */ + for (;;) + (void) pause(); +} + +static void * +thread_poll(void *args) +{ + struct pollfd fds[1]; + int n; + int id = *(int *)args; + char c; + + fds[0].fd = sockets[1]; + fds[0].events = POLLIN; + for (;;) { + thread_print(id, "Polling"); + if ((n = poll(fds, 1, -1)) == -1) { + perror("poll()"); + return NULL; + } + thread_print(id, "Poll returned"); + if (n == 0) { + thread_print(id, "Woke up! (no data)"); + continue; + } + if ((fds[0].revents & POLLIN) != 0) { + /* Attempt to read event/byte */ + thread_print(id, "Woke up! (with data)"); + pthread_mutex_lock(&sockets_mutex); + while ((n = read(sockets[1], &c, 1)) == 1) + thread_print(id, "Read data!"); + if (n == -1) + thread_print(id, strerror(errno)); + pthread_mutex_unlock(&sockets_mutex); + } + } +} + +/* ARGSUSED */ +static void * +thread_notify(void *args) +{ + char c = '\0'; + struct pollfd fds[1]; + + fds[0].fd = sockets[0]; + fds[0].events = POLLOUT; + for (;;) { + sleep(1); + thread_print(-1, "Notifying"); + pthread_mutex_lock(&sockets_mutex); + if (write(sockets[0], &c, 1) != 1) { + /* Poll until we can send data */ + (void) poll(fds, 1, -1); + } + pthread_mutex_unlock(&sockets_mutex); + } +} + +static void +thread_print(int id, const char *str) +{ + + pthread_mutex_lock(&print_mutex); + printf("%d: %s\n", id, str); + pthread_mutex_unlock(&print_mutex); +} diff --git a/mmsoftware/pthread_util/tests/sigtest.c b/mmsoftware/pthread_util/tests/sigtest.c new file mode 100644 index 0000000..cc4b681 --- /dev/null +++ b/mmsoftware/pthread_util/tests/sigtest.c @@ -0,0 +1,146 @@ +/* + * The goal of this program is to verify if it is valid for multiple threads + * to be awaken from a poll(2) call by a single process-wide signal. This + * would allow the notifyer of a thread message event to generate this signal + * if needed to cause interested treads to wake up. Threads which do not want + * to receive the signal can simply ignore it using pthread_sigmask(). + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + + +#define THREADS 8 + + + +int main(void); +static void *thread_poll(void *); +static void *thread_notify(void *); +static void thread_print(int, const char *); + + + +static int sockets[2]; +static int threadargs[THREADS]; +static pthread_mutex_t print_mutex; +static pthread_mutex_t sockets_mutex; + + + +int +main(void) +{ + pthread_t threadid; + int i; + + /* + * Create socketpair which will be used to trigger events to awaken + * polling threads. + */ + if (socketpair(AF_LOCAL, SOCK_DGRAM, 0, sockets) != 0) { + perror("socketpair()"); + exit(EXIT_FAILURE); + } + if (fcntl(sockets[0], F_SETFL, O_NONBLOCK) != 0 || + fcntl(sockets[1], F_SETFL, O_NONBLOCK) != 0) { + perror("fcntl()"); + exit(EXIT_FAILURE); + } + + pthread_mutex_init(&print_mutex, NULL); + pthread_mutex_init(&sockets_mutex, NULL); + + /* + * First launch THREADS polling threads + */ + for (i = 0; i < THREADS; i++) { + threadargs[i] = i; + pthread_create(&threadid, NULL, thread_poll, &threadargs[i]); + } + sleep(1); + + /* + * And finally launch notifyer thread + */ + pthread_create(&threadid, NULL, thread_notify, NULL); + + /* + * Now just wait + */ + for (;;) + (void) pause(); +} + +static void * +thread_poll(void *args) +{ + struct pollfd fds[1]; + int n; + int id = *(int *)args; + char c; + + fds[0].fd = sockets[1]; + fds[0].events = POLLIN; + for (;;) { + thread_print(id, "Polling"); + if ((n = poll(fds, 1, -1)) == -1) { + perror("poll()"); + return NULL; + } + thread_print(id, "Poll returned"); + if (n == 0) { + thread_print(id, "Woke up! (no data)"); + continue; + } + if ((fds[0].revents & POLLIN) != 0) { + /* Attempt to read event/byte */ + thread_print(id, "Woke up! (with data)"); + pthread_mutex_lock(&sockets_mutex); + while ((n = read(sockets[1], &c, 1)) == 1) + thread_print(id, "Read data!"); + if (n == -1) + thread_print(id, strerror(errno)); + pthread_mutex_unlock(&sockets_mutex); + } + } +} + +/* ARGSUSED */ +static void * +thread_notify(void *args) +{ + char c = '\0'; + struct pollfd fds[1]; + + fds[0].fd = sockets[0]; + fds[0].events = POLLOUT; + for (;;) { + sleep(1); + thread_print(-1, "Notifying"); + pthread_mutex_lock(&sockets_mutex); + if (write(sockets[0], &c, 1) != 1) { + /* Poll until we can send data */ + (void) poll(fds, 1, -1); + } + pthread_mutex_unlock(&sockets_mutex); + } +} + +static void +thread_print(int id, const char *str) +{ + + pthread_mutex_lock(&print_mutex); + printf("%d: %s\n", id, str); + pthread_mutex_unlock(&print_mutex); +}