-/* $Id: mmfd.c,v 1.22 2004/11/12 16:26:01 mmondor Exp $ */
+/* $Id: mmfd.c,v 1.23 2005/09/12 12:04:25 mmondor Exp $ */
/*
* Copyright (C) 2001-2004, Matthew Mondor
MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmfd.c,v 1.22 2004/11/12 16:26:01 mmondor Exp $");
+MMRCSID("$Id: mmfd.c,v 1.23 2005/09/12 12:04:25 mmondor Exp $");
char buf[1024];
va_list arg_ptr;
- *buf = 0;
+ *buf = '\0';
va_start(arg_ptr, fmt);
vsnprintf(buf, 1023, fmt, arg_ptr);
va_end(arg_ptr);
-/* $Id: mmserver2.c,v 1.47 2005/06/21 13:48:20 mmondor Exp $ */
+/* $Id: mmserver2.c,v 1.48 2005/09/12 12:04:25 mmondor Exp $ */
/*
* Copyright (C) 2004, Matthew Mondor
* - Add some support for request maximum timeout (at least for using_execve
* applications, since there is no other way to cause execve(2) requests
* to interrupt than to kill the process from another one (the parent could).
- * This however is tricky. Applications which don't use execve(2) don't need
- * such a timer as they can set one up themselves. However, if they do use
- * execve(2), the children processes cannot rely on SIGALRM anymore. They
+ * This however is tricky. Applications which don't use execve(2) don't need
+ * such a timer as they can set one up themselves. However, if they do use
+ * execve(2), the children processes cannot rely on SIGALRM anymore. They
* then need another process to interrupt them using SIGTERM if they need
- * to be interrupted after a timeout. The parent could do it with it's
+ * to be interrupted after a timeout. The parent could do it with its
* timer_ctx via mmalarm(3) easily, but because the children are answering
 * and handling the incoming requests, how can they tell the parent to
* start such a timer for them, unless I used a special IPC mechanism.
* If they had to launch their own control process using fork(2) it would
- * slow down the system too much... I could perhaps use some kind of
- * AF_LOCAL SOCK_DGRAM trick. A child process would simply send a packet
+ * slow down the system too much... I could perhaps use some kind of
+ * AF_LOCAL SOCK_DGRAM trick. A child process would simply send a packet
 * with its pid_t, causing a new timeout to be created in the parent to
- * kill that pid_t with SIGTERM if it expires. However, we also need to
- * get rid of any existing entry at the SIGCHLD handler... server_execve()
+ * kill that pid_t with SIGTERM if it expires. However, we also need to
+ * get rid of any existing entry at the SIGCHLD handler... server_execve()
* would be responsible for starting the timer before calling execve(2),
- * noticing the parent to do so. The SIGCHLD handler in the parent would
+ * notifying the parent to do so. The SIGCHLD handler in the parent would
 * be responsible for stopping the timer if it did not expire already.
 * (A sketch of this SOCK_DGRAM idea follows this list.)
* - Find a way to optimize server_address related macros to use a lookup
* table rather than conditionals.
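/*
 * A minimal sketch of the AF_LOCAL SOCK_DGRAM idea from the first item
 * above. All names here are hypothetical, not existing library code, and
 * the descriptor is assumed to come from, for instance, socketpair(2):
 * the child sends its pid_t just before execve(2), and the parent arms a
 * timeout which will deliver SIGTERM to that pid unless its SIGCHLD
 * handler cancels the entry first.
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <signal.h>
#include <unistd.h>

/* Child side, called just before execve(2). */
static void
exec_timeout_request_sketch(int dgram_fd)
{
	pid_t	pid;

	pid = getpid();
	(void) send(dgram_fd, &pid, sizeof(pid), 0);
}

/* Parent side, called when the datagram descriptor becomes readable. */
static void
exec_timeout_register_sketch(int dgram_fd)
{
	pid_t	pid;

	if (recv(dgram_fd, &pid, sizeof(pid), 0) == (ssize_t)sizeof(pid)) {
		/*
		 * Hypothetical: arm a timer which, on expiration, performs
		 * (void) kill(pid, SIGTERM); the SIGCHLD handler would
		 * remove the entry if the child exits in time.
		 */
	}
}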
MMCOPYRIGHT("@(#) Copyright (c) 2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmserver2.c,v 1.47 2005/06/21 13:48:20 mmondor Exp $");
+MMRCSID("$Id: mmserver2.c,v 1.48 2005/09/12 12:04:25 mmondor Exp $");
/* DEFINITIONS */
/*
- * We store a node for each child process with it's current status. Each of
+ * We store a node for each child process with its current status. Each of
 * them has the address of their node, so that they may efficiently change
- * their status. We do not need a synchronization lock since every child
+ * their status. We do not need a synchronization lock since every child
 * process only accesses its own node, to which it has a pointer, and because
* only the master parent process adds or removes these nodes.
*/
/*
* Internally stored for each socket created with server_socket_bind().
* Everytime server_socket_bind() is called successfully, we store such a new
- * node in a list_t (parent.sockets_list). When server_start() is called,
+ * node in a list_t (parent.sockets_list). When server_start() is called,
* an array of index with corresponding pollfd and int lock filedescriptors
* is created for efficient internal use in the main server loop.
*/
/*
* We store one such node in the socket-specific address cache, used to
* implement optional rate and/or concurrent number of connections on an
- * address basis. Each socket has an independent cache with it's own
+ * address basis. Each socket has an independent cache with its own
* corresponding synchronization lock.
*/
struct address_node {
/*
* The following is used to speed up hostname cache lookups when
* frequent connections occur from the same address, where the lookup
- * only has to be done when this entry is added. We store a reference
+ * only has to be done when this entry is added. We store a reference
* count in the hostname_node structure so that we know when it can
* safely be freed.
*/
/*
* Allows the caller to initialize shared memory blocks before starting up the
- * server. The caller is responsible for remembering the pointers to these
+ * server. The caller is responsible for remembering the pointers to these
* addresses, as well as to use any necessary synchronization when accessing
- * the shared memory concurrently. This must be done at system initialization,
+ * the shared memory concurrently. This must be done at system initialization,
* before launching the server processes, or we return NULL with EPERM.
*/
void *
/*
* Permits the caller to initialize a pool of shared resources synchronization
- * read-write locks. This must be done before launching the server processes,
- * or we return -1 with EPERM. It can only be called once, make sure to create
+ * read-write locks. This must be done before launching the server processes,
+ * or we return -1 with EPERM. It can only be called once, so make sure to create
* the necessary number of locks at once.
*/
int
/*
* Allows to synchronize wanted shared memory resources, locking or unlocking
* read-write locks in shared or exclusive mode, with optional nonblocking
- * flag. Locks the process until the lock can be obtained in blocking mode.
- * The id corresponds to a lock number index. Can only be used after starting
+ * flag. Blocks the process until the lock can be obtained in blocking mode.
+ * The id corresponds to a lock number index. Can only be used after starting
* the server, and of course only if locks were already setup using
* server_rwlocks_init(), or we return -1 with EPERM.
* We return -1 with EINVAL if the supplied lock id is invalid.
/*
* Allows to bind to an AF_INET(6) or AF_LOCAL SOCK_DGRAM or SOCK_STREAM
- * listening socket. Can only be called before starting the server or -1 is
+ * listening socket. Can only be called before starting the server or -1 is
* returned with errno set to EPERM.
*/
int
}
/*
- * Allocate socket_node and initialize it. We allocate it as shared
+ * Allocate socket_node and initialize it. We allocate it as shared
* memory since the hashtable_t and pool_t structures for the address
* cache need to be shared.
*/
}
/*
- * Finally launches the whole thing and start to serve connections. At least
+ * Finally launches the whole thing and starts to serve connections. At least
 * one socket must have been set up using server_socket_bind() before calling this
- * function, or -1 is returned with EPERM. Otherwise this function never
- * returns on success. If this function returns, the application is expected
- * to exit(3). If it doesn't, an attempt is still made to not leak resources.
+ * function, or -1 is returned with EPERM. Otherwise this function never
+ * returns on success. If this function returns, the application is expected
+ * to exit(3). If it doesn't, an attempt is still made to not leak resources.
*/
int
server_start(struct server_config *config)
/*
* Initialize necessary structures for our sockets; Our pollfd array
- * and corresponding socket_node index array. Also create our
+ * and corresponding socket_node index array. Also create our
* socket-specific locks and hashtable_t if we must observe any
* limits. Note that we still create these locks if a socket needs
* no limits, but that they will not be used for such sockets.
snode->lock = &slocks[i];
/*
* Here we allocate memory on the normal heap for the packet
- * buffer which needs to be process-specific. We however tie
+ * buffer which needs to be process-specific. We however tie
* this buffer pointer to the shared structure, which still
* shouldn't hurt since the vm space of children processes
* should cause the pointer to still point to process-specific
}
/*
- * Initialize hostname cache. We ensure that this cache be large
+ * Initialize hostname cache. We ensure that this cache be large
 * enough to be shared among sockets and to accommodate the total of
- * all their allowed addresses. Of course, we only take in
+ * all their allowed addresses. Of course, we only take in
* consideration sockets with hostname resolution enabled.
* Although this cache may be rather large, mmpool(3) ensures to
* reuse recently accessed pages, so that BSD should be able to page
}
/*
- * Initialize global process status stuff. Although the allocation
+ * Initialize global process status stuff. Although the allocation
* pool_t for child_process structures is stored in shared memory,
* the child_pid_status_node one, processes_list and
* processes_pid_table can all use the standard heap since only the
- * parent accesses them. We use a list_t for the list which needs to
+ * parent accesses them. We use a list_t for the list which needs to
* frequently be iterated through for speed, and a hashtable_t if
* needed for pid->status lookup table if using_execve is enabled.
*/
/*
* Can only be called by the parent process (typically in the SIGHUP handler
- * function). This causes all current children processes to recycle whenever
- * they complete their current handler. If interrupt is TRUE, causes them to
- * also immediately abort their current handler and then recycle. Returns -1
+ * function). This causes all current children processes to recycle whenever
+ * they complete their current handler. If interrupt is TRUE, causes them to
+ * also immediately abort their current handler and then recycle. Returns -1
* with EPERM if not called by the parent process.
*/
int
/*
* Block SIGALRM so that we may not be bothered by the process manager
- * freeing nodes, and SIGHUP to not recurse. We also block SIGUSR1 so
+ * freeing nodes, and SIGHUP to not recurse. We also block SIGUSR1 so
* that the children processes do not cause the process manager to be
* called during this time.
*/
/*
* Although when exiting the handler() function the connection is automatically
* closed if need be (I.E. a SOCK_STREAM socket), it is possible to call this
- * function to perform the same functionality. This also handles the address
+ * function to perform the same functionality. This also handles the address
* cache limits update.
*/
void
}
/*
- * A somewhat alarm(3) compatible function using setitimer(2). Suitable to be
+ * A somewhat alarm(3) compatible function using setitimer(2). Suitable to be
* used standalone instead of the deprecated alarm(3), or to be internally used
- * with mmalarm(3). Makes sure to not call setitimer(2) when called with 0
+ * with mmalarm(3). Makes sure to not call setitimer(2) when called with 0
* seconds to disable any timer if no timer is currently running, as an
* optimization.
*/
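/*
 * A minimal standalone sketch of such an alarm(3)-style helper built on
 * setitimer(2), following the description above; mmalarm_sketch() is a
 * hypothetical name, not this library's actual mmalarm() code.
 */
#include <sys/time.h>

static int	alarm_armed = 0;

static unsigned int
mmalarm_sketch(unsigned int seconds)
{
	struct itimerval nval, oval;

	/* Skip the syscall when asked to disarm an already idle timer. */
	if (seconds == 0 && !alarm_armed)
		return 0;

	timerclear(&nval.it_interval);
	nval.it_value.tv_sec = seconds;
	nval.it_value.tv_usec = 0;
	if (setitimer(ITIMER_REAL, &nval, &oval) == -1)
		return 0;
	alarm_armed = (seconds != 0);

	/* Like alarm(3), report what was left on any previous timer. */
	return (unsigned int)oval.it_value.tv_sec;
}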
/*
* execve(2) replacement performing some library-specific sanity checking
- * and never returning if calling execve(2). We also make sure to tie the
+ * and never returning if calling execve(2). We also make sure to tie the
* current client connection to our stdin/stdout filedescriptors, and to
- * leave stderr tied to null(4). We return -1 with EPERM if a SOCK_STREAM
+ * leave stderr tied to null(4). We return -1 with EPERM if a SOCK_STREAM
 * socket is not being served by a child process.
* XXX It might be good to append stderr to a file if the application requests
* it, like in mmspawnd(8).
(void) close(local.socket);
/*
- * Restore default signals actions. If we do not do this and the
+ * Restore default signal actions. If we do not do this, any signals
 * the current process ignored will still be ignored after
 * execve(2). Ones we handle will be automatically restored to the
* default action.
}
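/*
 * A sketch of the disposition reset described in the comment above: every
 * catchable signal is returned to SIG_DFL before execve(2). NSIG is
 * assumed to be available (it is on BSD systems), and sigaction(2) simply
 * fails for the signals that cannot be caught.
 */
#include <signal.h>

static void
signals_default_sketch(void)
{
	struct sigaction act;
	int sig;

	act.sa_handler = SIG_DFL;
	act.sa_flags = 0;
	(void) sigemptyset(&act.sa_mask);
	for (sig = 1; sig < NSIG; sig++)
		(void) sigaction(sig, &act, NULL);
}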
/*
- * Main parent process loop. We ensure to manage the children processes pool,
+ * Main parent process loop. We ensure to manage the children processes pool,
* as well as flush expired client addresses entries from each of the socket's
- * caches. We also then flush no longer referenced hostname cached entries from
- * the global hostname resolution cache.
+ * caches. We also then flush no longer referenced hostname cached entries
+ * from the global hostname resolution cache.
*/
static void
parent_main(void)
/*
* Iterate through all nodes of a hashtable_t to reset or expire address
- * and hostname cache nodes. It is called by the above function whenever
- * the soonest to expire node timer expires. This function also leaves
+ * and hostname cache nodes. It is called by the above function whenever
+ * the soonest to expire node timer expires. This function also leaves
* the caller with the next to expire time so that a timer may be restarted.
*/
static bool
time_t rem;
/*
- * If the node expired, reset it. For those which do not, record the
- * soonest to expire one. For nodes which expired and for which no
+ * If the node expired, reset it. For those which do not, record the
+ * soonest to expire one. For nodes which expired and for which no
* connections exist anymore, expunge them, and update the
* hostname_node reference count entry, freeing the hostname cache
* entry if it has no more references.
}
/*
- * This function permits to manage the pool of children processes. It is
+ * This function permits to manage the pool of children processes. It is
* called every second.
*/
/* ARGSUSED */
/*
* Block SIGALRM, SIGHUP and SIGUSR1 then send SIGTERM to all children
- * processes which we are allowed to kill. Then make sure that they
+ * processes which we are allowed to kill. Then make sure that they
* exit. We however won't kill processes which are currently serving
- * requests if exit_interrupt_children is FALSE. We instead will cause
+ * requests if exit_interrupt_children is FALSE. We instead will cause
* them to exit after they terminate serving their request.
*/
(void) sigemptyset(&set);
}
/*
- * The main children process loop. Listen for a client request and process it,
+ * The main children process loop. Listen for a client request and process it,
 * and then do the same again until we reach our maximum number of requests
* to process, in which case we exit to be replaced immediately.
*/
/*
* Before releasing the accept lock we must either accept(2)
- * on the TCP port or recvfrom(2) for UDP. This should
+ * on the TCP port or recvfrom(2) for UDP. This should
* normally not block the process as we just used poll(2).
* First set addrl to the proper length for the address family.
*/
/*
* Register or update limits for the specified client address into the current
* socket's address cache, while also resolving the hostname, using the global
- * hostname cache as necessary. address_close() must be called for this address
- * in order to balance this effect. This also will automatically resolve an
- * address to a hostname if necessary (using the hostname cache for speed).
+ * hostname cache as necessary. address_close() must be called for this
+ * address in order to balance this effect. This also will automatically
+ * resolve an address to a hostname if necessary (using the hostname cache for
+ * speed).
* If there is an address cache size issue and that a new address cannot be
- * added into it, NULL is returned. We always return the address_node pointer
- * otherwise. If returned reason is not REASON_OK, an error occured and the
+ * added into it, NULL is returned. We always return the address_node pointer
+ * otherwise. If the returned reason is not REASON_OK, an error occurred and the
* rejection handler should be called instead of the request handler.
*/
static struct address_node *
struct hostname_node *hnode2;
/*
- * Hostname resolution needed. We ensure to be able for
+ * Hostname resolution is needed. We make it possible for
 * multiple children processes to concurrently perform
* hostname resolution, including possibly on the same
* address since we cannot perform address node granularity
/*
 * Update the address concurrency counter so that the parent process may
- * garbage collect this address node whenever it expires. We do not care
+ * garbage collect this address node whenever it expires. We do not care
* about the hostname_node reference counter here and let the parent process
* deal with that.
*/
/* NOTREACHED */
break;
case SIGPIPE:
- /* Connection loss. Interrupt the current connection. */
+ /* Connection loss. Interrupt the current connection. */
child_close(TRUE);
/* NOTREACHED */
break;
}
/*
- * Closes connection with client as necessary. If resume is TRUE, uses a
- * non-local jump to resume to accepting other client requests. Otherwise,
+ * Closes connection with client as necessary. If resume is TRUE, uses a
+ * non-local jump to resume to accepting other client requests. Otherwise,
* this function returns.
*/
static void
-/* $Id: mmserver2.h,v 1.11 2005/06/21 13:48:20 mmondor Exp $ */
+/* $Id: mmserver2.h,v 1.12 2005/09/12 12:04:25 mmondor Exp $ */
/*
* Copyright (C) 2004, Matthew Mondor
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
+/*
+ * Public API provided to applications
+ */
+
#ifndef MMLIB_MMSERVER2_H
-/* $Id: mm_pthread_msg.c,v 1.1 2004/12/27 11:16:15 mmondor Exp $ */
+/* $Id: mm_pthread_msg.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
/*
* Copyright (C) 2005, Matthew Mondor
/*
* It is almost a shame that POSIX did not define a standard API for
- * inter-thread asynchroneous and synchroneous messaging. So, here is my
- * implementation. Note that for asynchroneous operation it is recommended to
- * use mmpool(3) to allocate and free messages in an efficient way, in cases
- * where messages will need to be sent to the other end without expecting a
- * response back before the current function ends (in which case a message
- * obviously can't be on the stack).
+ * inter-thread asynchronous and synchronous messaging. So, here is my
+ * implementation. Note that for asynchronous operation it is recommended to
+ * use a memory pool such as mmpool(3) to allocate and free messages in an
+ * efficient way, in cases where messages will need to be sent to the other
+ * end without expecting a response back before the current function ends
+ * (in which case a message obviously can't be on the stack).
*/
MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_msg.c,v 1.1 2004/12/27 11:16:15 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_msg.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
/*
- * Allows to initialize a polling notification handle. When attached to a
+ * Allows to initialize a polling notification handle. When attached to a
* port, a message arriving on an empty port causes the associated ring to
* wake the thread from pthread_ring_wait().
*/
if ((error = pthread_cond_init(&ring->cond, NULL)) == 0) {
if ((error = pthread_mutex_init(&ring->mutex, NULL)) == 0) {
ring->magic = PRING_MAGIC;
- ring->event = 0;
+ ring->event = ring->mevent = 0;
return 0;
}
(void) pthread_cond_destroy(&ring->cond);
}
/*
- * Destroys a ring. Note that all message ports attached to this ring should
+ * Destroys a ring. Note that all message ports attached to this ring should
* first be detached or destroyed.
*/
int
/*
* Causes the current thread to sleep until a message arrives on an empty port
- * associated with this ring. In normal operation, a thread only goes in wait
+ * associated with this ring. In normal operation, a thread only goes in wait
* mode after it processed all queued messages on all interesting ports.
+ * However, provision is made so that the function returns immediately if
+ * messages already were received on a port attached to this ring since the
+ * last call to pthread_ring_wait().
* Although using such an absolute time timespec might be disadvantageous for
* the API compared to a timeout in milliseconds for instance, this was chosen
- * to remain API-compatible with pthread_cond_timedwait().
+ * to remain API-compatible with pthread_cond_timedwait(), and upwards
+ * compatible with systems where nanosecond precision can be achieved.
*/
int
pthread_ring_wait(pthread_ring_t *ring, const struct timespec *abstime)
return -1;
/* As long as we don't have confirmation that we must stop waiting */
- for (ring->event = 0; !ring->event && error == 0; ) {
+ for (ring->event = 0; !ring->mevent && !ring->event && error == 0; ) {
/*
* Wait on conditional variable, which will automatically
* and atomically release the mutex and return with the mutex
} else
error = pthread_cond_wait(&ring->cond, &ring->mutex);
}
+ ring->mevent = 0;
/*
* And we know that conditional waiting functions returned with mutex
/*
* Allows to wake up waiter(s) on the specified ring, which are sleeping
- * threads within pthread_ring_wait(). This can be used to simulate the
- * arrival of a message on an empty port.
+ * threads within pthread_ring_wait(). This can be used to simulate the
+ * arrival of a message on an empty port. Rings are also useful as a pure
+ * notification mechanism when no message passing is needed.
*/
int
pthread_ring_notify(pthread_ring_t *ring)
DEBUG_ASSERT(ring != NULL && ring->magic == PRING_MAGIC);
if ((error = pthread_mutex_lock(&ring->mutex)) == 0) {
- ring->event = 1;
+ ring->event = ring->mevent = 1;
(void) pthread_cond_signal(&ring->cond);
(void) pthread_mutex_unlock(&ring->mutex);
}
}
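/*
 * A minimal usage sketch for the ring primitives above, on a ring already
 * set up with pthread_ring_init(); the relative to absolute timespec
 * conversion using gettimeofday(2) is an assumption of this example, not
 * part of the ring API itself.
 */
#include <sys/time.h>
#include <pthread.h>
#include <mm_pthread_msg.h>

/* Wait up to two seconds for another thread to call pthread_ring_notify(). */
static int
ring_wait_example(pthread_ring_t *ring)
{
	struct timeval now;
	struct timespec abstime;

	(void) gettimeofday(&now, NULL);
	abstime.tv_sec = now.tv_sec + 2;
	abstime.tv_nsec = now.tv_usec * 1000;

	/* Returns 0 if notified, or ETIMEDOUT after two seconds. */
	return pthread_ring_wait(ring, &abstime);
}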
/*
- * Attaches a port to a ring. Multiple ports may be attached to a ring. A
+ * Attaches a port to a ring. Multiple ports may be attached to a ring. A
* message arriving on an empty port will cause the attached ring to be
- * notified, if any.
+ * notified, if any, thus causing a thread waiting on the ring to
+ * be awakened.
*/
int
pthread_port_set_ring(pthread_port_t *port, pthread_ring_t *ring)
}
/*
- * Allows to initialize a message before it can be sent over a port. The
+ * Allows to initialize a message before it can be sent over a port. The
* message only needs to be initialized once in general, even if it will be
- * used for bidirectional transmission for synchronous operation.
+ * used for bidirectional transmission for synchronous operation. If the
+ * reply port needs to be changed, however, this function should be used again
+ * to set the new reply port.
*/
int
pthread_msg_init(pthread_msg_t *msg, pthread_port_t *rport)
/*
* If any message exists in the queue of the specified port, unqueues it and
- * returns it. Otherwise, NULL is returned. In normal operation, all messages
- * queued to a port are processed before putting the thread back into sleep.
+ * returns it. Otherwise, NULL is returned. In normal operation, all messages
+ * queued to a port are processed before putting the thread back into sleep,
+ * mainly for efficiency, but also because it eases synchronization.
*/
pthread_msg_t *
pthread_msg_get(pthread_port_t *port)
/*
* Queues the specified message to the specified port, returning 0 on success.
* Note that the message data is not copied or moved, but that a pointer
- * system is used to queue the message. One has to be careful to not allocate
- * this message space on the stack when asynchroneous operation is wanted. In
- * synchroneous operation mode, it is not a problem, since the sender does not
- * have to modify the data until the other end replies back with the same
- * message after modifying the message if necessary. In synchroneous mode, we
- * simply delegate that message memory region to the other end until it
- * notifies us with a reply that it is done working with it.
- * Returns 0 on success, or an error number.
+ * system is used to queue the message. Thus, the message's shared memory
+ * region is leased temporarily to the other end. One has to be careful to
+ * not allocate this message space on the stack when asynchronous operation
+ * is needed. In synchronous operation mode, it is not a problem, since the
+ * sender does not have to modify the data until the other end replies back
+ * with the same message after modifying the message if necessary. In
+ * synchronous mode, we simply delegate that message memory region to the
+ * other end until it notifies us with a reply that it is done working with
+ * it. Returns 0 on success, or an error number.
*/
int
pthread_msg_put(pthread_port_t *port, pthread_msg_t *msg)
return error;
DLIST_APPEND(&port->messages, (node_t *)msg);
- if (port->ring != NULL && DLIST_NODES(&port->messages) == 1) {
+ if (port->ring != NULL) {
+ if (DLIST_NODES(&port->messages) == 1) {
+ /*
+ * We know that there previously were no messages,
+ * and that the reading thread then waits for any
+ * message to be available. Signal it that there at
+ * least is one message ready. The other end should
+ * normally process all available messages before
+ * going back into waiting.
+ */
+ if ((error = pthread_mutex_lock(&port->ring->mutex))
+ == 0) {
+ port->ring->event = 1;
+ (void) pthread_cond_signal(&port->ring->cond);
+ (void) pthread_mutex_unlock(
+ &port->ring->mutex);
+ }
+ }
/*
- * We know that there previously were no messages, and that
- * the reading thread then waits for any message to be
- * available. Signal it that there at least is one message
- * ready. The other end should process all available messages
- * before going back into waiting.
+ * If the other end, however, is already blocked
+ * waiting for the ring to be notified while
+ * there already are messages, we still trigger mevent
+ * to cause it to unlock. This behavior is
+ * useful in the polling system code, for instance.
*/
- if ((error = pthread_mutex_lock(&port->ring->mutex)) == 0) {
- port->ring->event = 1;
- (void) pthread_cond_signal(&port->ring->cond);
- (void) pthread_mutex_unlock(&port->ring->mutex);
- }
+ /* XXX We don't use a mutex for now... should we? */
+ port->ring->mevent = 1;
}
+
(void) pthread_mutex_unlock(&port->lock);
return error;
}
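/*
 * A sketch of the synchronous request/reply pattern described above, using
 * the functions of this file. The struct layout mirrors the one used by
 * mm_pthread_poll.c (pthread_msg_t first, so the pointer may be cast back);
 * pthread_port_init() is assumed to take a single port argument, as used
 * elsewhere in this library, and a NULL abstime for pthread_ring_wait() is
 * assumed to mean waiting indefinitely.
 */
#include <mm_pthread_msg.h>

struct request_msg {
	pthread_msg_t	msgnode;	/* Must come first for the casts. */
	int		value;
	int		result;
};

/*
 * Client side; the server does pthread_msg_get() on server_port, fills in
 * result and calls pthread_msg_reply() on the same message.
 */
static int
sync_request_sketch(pthread_port_t *server_port, int value)
{
	pthread_port_t		rport;
	pthread_ring_t		ring;
	struct request_msg	msg;

	(void) pthread_port_init(&rport);
	(void) pthread_ring_init(&ring);
	(void) pthread_port_set_ring(&rport, &ring);
	(void) pthread_msg_init(&msg.msgnode, &rport);

	msg.value = value;
	(void) pthread_msg_put(server_port, &msg.msgnode);

	/* The message memory is leased to the server until it replies. */
	while (pthread_msg_get(&rport) == NULL)
		(void) pthread_ring_wait(&ring, NULL);

	/* Cleanup of rport and ring is omitted for brevity. */
	return msg.result;
}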
/*
- * Meant to be used in synchroneous message transfer mode. The initial sender
+ * Meant to be used in synchronous message transfer mode. The initial sender
* sends a message to the other end, which then uses this function to notify
* back the initial sender that it is done, often with a success/failure
- * result as part of the message. Returns 0 on success, or an error number.
+ * result as part of the message. Returns 0 on success, or an error number.
*/
int
pthread_msg_reply(pthread_msg_t *msg)
-/* $Id: mm_pthread_msg.h,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_msg.h,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
/*
* Copyright (C) 2005, Matthew Mondor
pthread_mutex_t mutex;
int mode;
int event;
+ int mevent;
} pthread_ring_t;
enum pthread_ring_modes {
-/* $Id: mm_pthread_poll.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_poll.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
/*
* Copyright (C) 2005, Matthew Mondor
*/
/*
- * I consider this code to be a major hack around the inherent problems UNIX
+ * I consider this code to be a major hack around the inherent problems unix
* systems face because of the lack of support for filedescriptor polling in
- * the POSIX threads API. Although pthread defines methods for thread
+ * the POSIX threads API. Although pthread defines methods for thread
 * synchronization and polling waiting for events (using conditional
- * variables), and that UNIX provides polling on filedescriptors using
+ * variables), and that unix provides polling on filedescriptors using
* select(2), poll(2), kqueue(2) and other mechanisms, both are totally
* distinct entities which can be considered to either conflict with
- * eachother or to not be related enough in a unified way. The current
+ * each other or to not be related enough in a unified way. The current
* situation makes it almost impossible for a thread to both be polling for
* interthread efficient messages implementations built upon pthread, and
* filedescriptor events, concurrently.
* The GNU PTH library implements non-standard functions which allow to
* multiplex interthread messages and filedescriptor events, using for
* instance pth_poll_ev(), pth_select_ev(), pth_accept_ev(), pth_connect_ev(),
- * etc. However, this is internally implemented using a single large select(2)
+ * etc. However, this is internally implemented using a single large select(2)
* based loop along with a slow large loop looking for non-fd events based on
- * the principles of libevent. This threading library has other disadventages,
+ * the principles of libevent. This threading library has other disadvantages,
* such as not providing a preemptive scheduler (being a fully userspace
* implementation) and not allowing to scale to multiple processors on SMP
- * systems. This interface however shows how good the POSIX threads API could
- * have been, if it was better designed with UNIX systems in mind. This
+ * systems. This interface however shows how good the POSIX threads API could
+ * have been, if it was better designed with unix systems in mind. This
* library also being the most portable threads library alternative for quite
* some time, because of the fact that Operating Systems implemented POSIX
* threads inconsistently, or not at all, caused us to use PTH during some
* time to develop software in cases where a pool of processes was not ideal
* because of the frequency of shared memory synchronization needs.
*
- * With the advent of POSIX threads implementations on more UNIX and UNIX-like
+ * With the advent of POSIX threads implementations on more unix and unix-like
* systems and of modern implementations behaving more consistently, which can
* scale on SMP systems and provide preemptive scheduling, it was considered
* worthwhile for us to adapt our software again to use the standard POSIX
- * API. Especially considering that NetBSD which had no OS provided threads
+ * API. Especially considering that NetBSD which had no OS provided threads
* implementation for applications now has an awesome pthreads implementation
* starting with version 2.0. However, we encountered difficulties with some
* software which used the complex multiplexing of thread events and
- * filedescriptor ones. This module provides a solution to port this software.
+ * filedescriptor ones. This module provides a solution to port this software.
* It however is somewhat a hack.
*
- * The downsides of this implementation are as follows. We originally intended
+ * The downsides of this implementation are as follows. We originally intended
* to develop a system which would scale among an increasing number of threads
* in a ready pool of threads, scaling with concurrency of the polling calls.
* This however proved difficult, or impossible to achieve, the main reasons
* being that 1) A signal delivered to a process is only received by a random
- * thread that is not blocking it. 2) In the case where multiple threads are
+ * thread that is not blocking it. 2) In the case where multiple threads are
 * polling on a common file descriptor, similarly only one random thread
- * is awaken. 3) pthread_cond_signal() and pthread_cond_broadcast() cannot
- * wake threads waiting in filedescriptor polling. 4) to achieve what we
+ * is awakened. 3) pthread_cond_signal() and pthread_cond_broadcast() cannot
+ * wake threads waiting in filedescriptor polling. 4) To achieve what we
* needed, two descriptors would have been necessary per notification ring.
- * this was considered an aweful solution and was promptly rejected. 5) The
+ * This was considered an awful solution and was promptly rejected. 5) The
* POSIX API does not define a way for a process to set or change the signal
* blocking masks of other threads on the fly.
*
* Our solution then had to rely on a main descriptor polling manager thread
* which would be used to poll file descriptors, and would as a device serve
- * client threads via efficient interthread messages. An issue still arises
+ * client threads via efficient interthread messages. An issue still arises
* when a client thread sends a message to the polling thread to add new
* descriptors for polling or to cancel polling and remove descriptors.
* The polling thread must be able to immediately process these events
- * awaking from filedescriptor polling. Two possible hacks could be used for
- * this. 1) Use an AF_LOCAL SOCK_DGRAM socketpair(), which one side would
+ * awaking from filedescriptor polling. Two possible hacks could be used for
+ * this. 1) Use an AF_LOCAL SOCK_DGRAM socketpair(), of which one side would
 * be used to trigger an event by writing some data, and the other side always
- * included by the polling thread within the set of descriptors. 2) Send a
+ * included by the polling thread within the set of descriptors. 2) Send a
* signal to the process which only the polling thread is not blocking,
* to ensure that it be the one catching it, as such to awake from polling
- * with an EINTR error code. This second solution was considered more elegant
- * and is used as the basis of this implementation. We currently are
+ * with an EINTR error code. This second solution was considered more elegant
+ * and is used as the basis of this implementation. We currently are
 * clobbering the SIGUSR2 signal to achieve this.
*/
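/*
 * A sketch of the signal-mask arrangement described above: SIGUSR2 is
 * blocked process-wide before any thread exists, a no-op handler is
 * installed so that delivery only interrupts poll(2) with EINTR, and the
 * dedicated polling thread alone unblocks the signal. The function names
 * are illustrative only.
 */
#include <pthread.h>
#include <signal.h>

static void
sigusr2_noop(int sig)
{
	(void)sig;
}

/* Main thread, before pthread_create(); new threads inherit this mask. */
static int
sigusr2_block_everywhere_sketch(void)
{
	struct sigaction act;
	sigset_t set;

	act.sa_handler = sigusr2_noop;
	act.sa_flags = 0;
	(void) sigemptyset(&act.sa_mask);
	(void) sigaction(SIGUSR2, &act, NULL);

	(void) sigemptyset(&set);
	(void) sigaddset(&set, SIGUSR2);
	return pthread_sigmask(SIG_BLOCK, &set, NULL);
}

/* Polling thread, once started; it becomes the only one not blocking it. */
static int
sigusr2_unblock_here_sketch(void)
{
	sigset_t set;

	(void) sigemptyset(&set);
	(void) sigaddset(&set, SIGUSR2);
	return pthread_sigmask(SIG_UNBLOCK, &set, NULL);
}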
-/*
- * XXX Use pthread_runonce stuff where needed, or use proper synchronization.
- */
-
#include <sys/types.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
#include <mmtypes.h>
#include <mmlog.h>
MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_poll.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_poll.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
/*
 * Synchronous communications message between arbitrary threads and the
- * polling thread. Since communication is synchroneous, we only need to
- * allocate one such message per thread. We are always expecting a reply
- * back afer sending a query.
+ * polling thread. Since communication is synchronous, we only need to
+ * allocate one such message per thread. We are always expecting a reply
+ * back after sending a query before reusing the buffer. In fact, the
+ * message passing system only serves as a means for synchronization around
+ * the message, which is a shared memory object.
*/
struct poll_msg {
pthread_msg_t msgnode;
/*
* An index is maintained of descriptor number -> poll_msg_index
- * structures. Each of wich has information on the message the descriptor
+ * structures. Each of which has information on the message the descriptor
* belongs to, and the index into the pollfd array so that it be easy to
* efficiently do per-fd work.
*/
* Static functions prototypes
*/
static int pthread_poll_proc_init(void);
+static void pthread_poll_proc_init2(void);
static int pthread_poll_thread_init(struct poll_data **);
static void pthread_poll_thread_exit(void *);
static void *poll_thread(void *);
* Static process specific storage
*/
static bool pthread_poll_initialized = FALSE;
-static bool pthread_poll_proc_initialized = FALSE;
+static pthread_once_t pthread_poll_proc_initialized = PTHREAD_ONCE_INIT;
static pthread_key_t pthread_poll_proc_key;
static pthread_ring_t pthread_poll_thread_started_ring;
static pthread_port_t pthread_poll_thread_port;
static pid_t process_id;
/*
- * Static global poll_thread storage. No synhronization is necessary when
+ * Static global poll_thread storage. No synchronization is necessary when
* using these, since only the polling thread does.
*/
static struct poll_idx *poll_idx;
return errno;
process_id = getpid();
- pthread_poll_proc_initialized = TRUE;
return error;
}
+static void
+pthread_poll_proc_init2(void)
+{
+ int error;
+
+ if ((error = pthread_poll_proc_init()) != 0) {
+ (void) fprintf(stderr, "pthread_poll_proc_init() - %s\n",
+ strerror(error));
+ exit(EXIT_FAILURE);
+ }
+}
+
static int
pthread_poll_thread_init(struct poll_data **res)
{
/*
* Actual polling thread, with which we communicate using messages polling on
- * pthread_port_t and pthread_ring_t. This is the only thread that should be
+ * pthread_port_t and pthread_ring_t. This is the only thread that should be
 * catching SIGUSR2 signals (used to wake us up and reiterate our main loop).
* Note: Although less efficient than using kqueue(2) or libevent(3), after
* discussion with 3s4i we settled to using poll(2) for now, which minimizes
- * OS dependencies as well as third party software dependencies. Because
+ * OS dependencies as well as third party software dependencies. Because
* pthread_poll_ring(2) is only sparsely used by our software (migrating from
* using PTH library which provided pth_poll_ev()), and that we only provide
* it small pollfd arrays, this implementation was considered to meet our
list_t msg_list;
register int i;
- /* This initialization shouldn't fail. If it did, it would be nice to
+ /*
+ * This initialization shouldn't fail. If it did, it would be nice to
* be able to simply panic eventually. XXX
*/
/*
* Allocate an initial buffer size for our pollfd array as well as for
- * our descriptor based index. We'll double these buffers as necessary
- * at runtime.
+ * our descriptor based index. We'll double these buffers as
+ * necessary at runtime.
*/
poll_fds_size = 64;
poll_fds = malloc(sizeof(struct pollfd) * poll_fds_size);
DLIST_INIT(&msg_list);
/*
- * Initialize message port and associated ring. The message port is
+ * Initialize message port and associated ring. The message port is
* module global, so that it be public to pthread_poll_ring().
*/
(void) pthread_port_init(&pthread_poll_thread_port);
struct poll_msg *msg, *nextmsg;
/*
- * Get time of day in a rather high resolution. We need to
- * do this to be able to evaluate timeouts later on. We
+ * Get time of day in a rather high resolution. We need to
+ * do this to be able to evaluate timeouts later on. We
* attempt to only require one time syscall per loop.
*/
(void) gettimeofday(&tv, NULL);
/*
- * Process any messages. We need to add the descriptors if
- * they aren't already added. Also store yet unsatisfied
+ * Process any messages. We need to add the descriptors if
+ * they aren't already added. Also store yet unsatisfied
* request messages into a list.
*/
while ((msg = (struct poll_msg *)pthread_msg_get(
}
/*
- * Process timeouts. For request messages which timed out,
+ * Process timeouts. For request messages which timed out,
* satisfy them immediately using ETIMEDOUT error.
* This also allows to evaluate which is the soonest to expire
* entry, which poll(2) will have to use as timeout.
}
/*
- * Perform polling. poll(2) for as much time as possible,
+ * Perform polling. poll(2) for as much time as possible,
* although making sure to allow the soonest to expire query
- * to stop polling. Next to expire entry time is in ttv and
- * current time in tv. Calculate difference and convert to
+ * to stop polling. Next to expire entry time is in ttv and
+ * current time in tv. Calculate difference and convert to
* milliseconds.
*/
timersub(&ttv, &tv, &ttv);
continue;
/*
- * Grow index buffer if necessary. Either grow by doubling
+ * Grow index buffer if necessary. Either grow by doubling
* size, or even more if necessary to hold index to fd.
* If we only grew to hold fd, we might need to realloc(3) too
- * often. Take care to also NULL msg field of new entries.
+ * often. Take care to also NULL msg field of new entries.
*/
if (poll_idx_size <= fd) {
struct poll_idx *idx;
goto err;
/*
- * Resize pollfd array if needed. Grow by doubling.
+ * Resize pollfd array if needed. Grow by doubling.
*/
if (poll_fds_size <= poll_nfds) {
struct pollfd *ptr;
}
/*
- * Permits to disunite supplied pollfd set from the main set. Also sets the
+ * Permits to disunite supplied pollfd set from the main set. Also sets the
* revents fields of the supplied set to the ones of the main set.
*/
static void
msg->fds[i].revents = poll_fds[idx].revents;
/*
- * Unlink fd from the global set. The removal method is
+ * Unlink fd from the global set. The removal method is
* simple; Take the last entry of the global set and move it
* over the current entry, updating index links, and lower
- * the gobal nfds by one. If we're the last entry, simply
- * remove it invalidating it's index entry and lowering the
+ * the global nfds by one. If we're the last entry, simply
+ * remove it invalidating its index entry and lowering the
* global nfds.
*/
idx2 = --poll_nfds;
if (idx != idx2) {
/*
* idx is new location of last entry, which is at
- * idx2. Copy idx2 one over to idx position.
+ * idx2. Copy idx2 one over to idx position.
*/
poll_fds[idx].fd = poll_fds[idx2].fd;
poll_fds[idx].events = poll_fds[idx2].events;
*/
/*
- * Must be called before launching any thread. Sets up the signal mask and
- * launches the dedicated poll slave thread. Important note: this system
+ * Must be called before launching any thread. Sets up the signal mask and
+ * launches the dedicated poll slave thread. Important note: this system
* clobbers the SIGUSR2 signal, which the application can no longer use for
- * other purposes. The only solution to wake the thread manager thread from
+ * other purposes. The only solution to wake the thread manager thread from
* poll(2) is either to trigger an event through a dedicated filedescriptor,
* or to send a signal to the process which only the polling thread allows.
*/
return 0;
/*
- * First block SIGUSR2 signal in the parent. The reason why this must
+ * First block SIGUSR2 signal in the parent. The reason why this must
* be called before the application launches any thread is that
* threads inherit the sigmask of their parent, and that all threads,
- * but the polling thread, must block the signal. This ensures that
+ * but the polling thread, must block the signal. This ensures that
* only the wanted thread wakes up when a SIGUSR2 signal is received.
* This way, we can interrupt the polling thread in poll(2), for
* instance, and cause it to reiterate its main loop.
/*
* We may now launch the poll thread and wait for notification from it
- * that it is ready to serve requests. We won't need to exit this
+ * that it is ready to serve requests. We won't need to exit this
* thread, so it can be launched in detached state.
*/
if ((error = pthread_attr_init(&attr)) != 0)
/*
* poll(2) replacement which can also be awakened by a notification happening
- * on the specified ring. This for instance allows to process thread messages
- * as well as descriptor events. ETIMEDOUT is returned if timeout occurred
+ * on the specified ring. This for instance allows to process thread messages
+ * as well as descriptor events. ETIMEDOUT is returned if timeout occurred
* before any event, or ECANCELED if a ring notification is the source of the
- * interruption. Returns the number of descriptors for which events were
- * received otherwise (which could be 0 in some cases). Other errors may be
+ * interruption. Returns the number of descriptors for which events were
+ * received otherwise (which could be 0 in some cases). Other errors may be
* returned such as EINVAL, as for poll(2).
*/
int
/*
* Implicit process and thread specific initializations
- * XXX Use pthread_once() here, or use a mutex around
- * pthread_poll_proc_initialized
*/
- if (!pthread_poll_proc_initialized)
- if ((error = pthread_poll_proc_init()) != 0)
- return error;
+ if ((error = pthread_once(&pthread_poll_proc_initialized,
+ pthread_poll_proc_init2)) != 0)
+ return error;
+ /*
+ * XXX Use a mutex or pthread_once() equivalent here too?
+ */
if ((data = pthread_getspecific(pthread_poll_proc_key)) == NULL) {
if ((error = pthread_poll_thread_init(&data)) != 0)
return error;
}
/*
- * Send query to polling thread. It is safe to simply reuse our
+ * Send query to polling thread. It is safe to simply reuse our
* message since we then expect a reply back and synchronize it.
*/
data->msg.cancel = FALSE;
/*
* Interrupt polling thread which may still be waiting in poll(2).
* We do this by sending SIGUSR2 to the process, which only the
- * polling thread is not blocking. This causes the thread to reiterate
- * it's main loop, thus processing this message and going back to
+ * polling thread is not blocking. This causes the thread to reiterate
+ * its main loop, thus processing this message and going back to
* sleep in poll(2).
*/
if (kill(process_id, SIGUSR2) != 0) {
}
/*
- * Wait until en event occurs and notifies our ring. An event could
+ * Wait until an event occurs and notifies our ring. An event could
* either be triggered by the poll request ending or by another
- * interrupting event on the supplied ring.
- * XXX hmm what would happen if the message was already available on
- * the port before we have time to start waiting on the ring?
- * Would the wait immediately return? I'm not so sure... If we looped
- * first looking for a message to be available and then waiting if
- * needed, we would need another method to detect that a foreing event
- * occured.
+ * interrupting event on the supplied ring. If a message is queued
+ * on the port between pthread_port_set_ring() and
+ * pthread_ring_wait(), the latter immediately returns.
*/
if ((error = pthread_port_set_ring(&data->port, ring)) != 0) {
errno = error;
if ((msg = (struct poll_msg *)pthread_msg_get(&data->port)) == NULL) {
/*
* No message replied back from poll thread yet, this means
- * that our ring was notified by another event. Cancel request
+ * that our ring was notified by another event. Cancel request
* by sending event back with the cancel flag, and wait for
* reply message to occur (which will be the original request
- * results we were waiting for). error field will be set to
+ * results we were waiting for). error field will be set to
* ECANCELED by the poll thread.
*/
msg->cancel = TRUE;
/*
* accept(2) replacement which can both observe a timeout and be interrupted
- * via pthread_ring_t events. Internally implemented using
- * pthread_poll_ring(). Will internally set the descriptor in non-blocking
+ * via pthread_ring_t events. Internally implemented using
+ * pthread_poll_ring(). Will internally set the descriptor in non-blocking
* mode if necessary, then reverting it to the mode it was supplied in.
* Returns a new descriptor on success, or -1 on failure, in which case
 * errno will be set to either EINVAL, ETIMEDOUT, ECANCELED, or others.
/*
* connect(2) replacement which can both observe a timeout and be interrupted
- * via pthread_ring_t events. Internally implemented using
- * pthread_poll_ring(). Will internally set the descriptor in non-blocking
+ * via pthread_ring_t events. Internally implemented using
+ * pthread_poll_ring(). Will internally set the descriptor in non-blocking
* mode if necessary, then reverting it back to the mode it was supplied in.
 * Returns 0 on success, or an error number (EINVAL, ETIMEDOUT, ECANCELED
- * or other). Timeout is in milliseconds, like for poll(2) and can be -1.
+ * or other). Timeout is in milliseconds, like for poll(2) and can be -1.
* For the application to know the actual connection status result, it should
* poll until completion and verify the status using getsockopt(2) with
- * SOL_SOCKET level and SO_ERROR option. It can alternatively continue to call
- * this function in a loop until completion. Calling the function on an
+ * SOL_SOCKET level and SO_ERROR option. It can alternatively continue to call
+ * this function in a loop until completion. Calling the function on an
* already connected socket will result in EISCONN.
*/
int
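/*
 * A sketch of the completion check recommended above: once polling reports
 * the descriptor writable, getsockopt(2) at the SOL_SOCKET level with
 * SO_ERROR yields the final status of the non-blocking connect(2).
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>

/*
 * Returns 0 if the connection completed, or an errno-style code such as
 * ECONNREFUSED or ETIMEDOUT.
 */
static int
connect_status_sketch(int fd)
{
	int		soerror;
	socklen_t	len;

	len = sizeof(soerror);
	if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &soerror, &len) == -1)
		return errno;

	return soerror;
}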
-/* $Id: mm_pthread_pool.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_pool.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
/*
* Copyright (C) 2004-2005, Matthew Mondor
/*
* Implementation of a pool of ready processes which adapts with concurrency
- * needs. These ready processes can serve requests passed through efficient
- * inter-thread messaging. mmpool(3) is used for the pool functionality.
+ * needs. These ready processes can serve requests passed through efficient
+ * inter-thread messaging. mmpool(3) is used for the pool functionality.
*/
MMCOPYRIGHT("@(#) Copyright (c) 2004-2005\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_pool.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_pool.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
/*
* Must be called to initialize the pthreads pool subsystem, before calling
- * any other function of this API. Returns 0 on success, or an error number.
+ * any other function of this API. Returns 0 on success, or an error number.
* <initial> threads are launched, and more will be launched in increments
- * of <initial> whenever necessary. These will also only be destroyed in
+ * of <initial> whenever necessary. These will also only be destroyed in
* decrements of <initial> whenever that many threads have not been in use for
* some time, and a minimum of <initial> threads will always be kept.
* Setting <initial> to high values may actually degrade performance with some
- * unefficient threading implementations. It is not recommended to use more
- * than 8 using the pth(3) library. Using NetBSD 2.0+ SA threads, a high
- * number does not reduce performance. We current do not observe any limit
- * whatsoever according to the number of threads launched over time. It is the
+ * inefficient threading implementations. It is not recommended to use more
+ * than 8 using the pth(3) library. Using NetBSD 2.0+ SA threads, a high
+ * number does not reduce performance. We currently do not observe any limit
+ * whatsoever with respect to the number of threads launched over time. It is the
* application's responsibility to ensure to observe decent concurrency limits
* before calling pthread_object_call().
*/
/*
* We use this ring to obtain notification of ready children when
- * launching them. This is required for proper synchronization to
+ * launching them. This is required for proper synchronization to
 * avoid awful race conditions.
*/
if ((error = pthread_ring_init(&thread_started_ring)) != 0)
}
/*
- * Now initialize the threads pool. This creates threads, uses
+ * Now initialize the threads pool. This creates threads, uses
* synchronization with thread_started_ring, and uses the message
* subsystem, which all must be initialized and ready.
*/
/*
* Allows to invoke a thread of the pool to perform execution of the wanted
- * function. This is very efficient since the threads are already created and
- * are waiting for requests. There is no maximum concurrency limit enforced by
+ * function. This is very efficient since the threads are already created and
+ * are waiting for requests. There is no maximum concurrency limit enforced by
* this system; It is the responsibility of the application to restrict
* concurrency as necessary by keeping internal information on the current
- * number of requests. 0 is returned on success, or an error number.
- * XXX Add support for synchroneous and asynchroneous operation. Current
+ * number of requests. 0 is returned on success, or an error number.
+ * XXX Add support for synchronous and asynchronous operation. Current
 * operation is only asynchronous, but we would like to add a boolean here to
- * decide. We also could add back the result value of the thread function
+ * decide. We also could add back the result value of the thread function
 * which would only be useful in synchronous operation, when we are waiting
- * until the task ends... Of course, it's still easy for applications to use
+ * until the task ends... Of course, it's still easy for applications to use
 * these in a synchronous manner, by using a message and/or ring,
 * conditional variable, etc.
+ * Also evaluate whether a callback function to be called to notify the end of
+ * asynchronous operation would be useful.
*/
int
pthread_object_call(pthread_port_t **port,
/*
* Allocate a thread from the pool to reserve it, and tell it to call
- * a function via a message. The message cannot be on the stack in
+ * a function via a message. The message cannot be on the stack in
* this case, since it holds arguments to be passed to a thread, and
 * also consists of an asynchronous message for which we do not expect
- * a response back, waiting for it. We just dispatch it and go on.
+ * a response back, waiting for it. We just dispatch it and go on.
*/
if ((obj = thread_object_alloc()) == NULL) {
error = ENOMEM;
*/
/*
- * Internally used to allocate a ready thread from the pool
+ * Internally used to allocate a ready thread from the pool.
*/
inline static pthread_object_t *
thread_object_alloc(void)
/*
* Internally used to free a no longer needed thread back to the pool of ready
- * threads
+ * threads.
*/
inline static void
thread_object_free(pthread_object_t *obj)
}
/*
- * Internally called by mmpool(3) to create a thread object
+ * Internally called by mmpool(3) to create a thread object.
*/
static bool
thread_object_constructor(pnode_t *pnode)
return FALSE;
/*
- * Wait until new thread ready notification. Without this, at least
- * with NetBSD 2.0 SA threads, hell would break loose. Thread creation
+ * Wait until new thread ready notification. Without this, at least
+ * with NetBSD 2.0 SA threads, hell would break loose. Thread creation
* isn't really a bottleneck in our case anyways, since we only need
* to do it when all threads of the pool are already busy.
*/
}
/*
- * Internally called by mmpool(3) to destroy a thread object
+ * Internally called by mmpool(3) to destroy a thread object.
*/
static void
thread_object_destructor(pnode_t *pnode)
pthread_object_msg_t *msg;
/*
- * To be freed, the thread has to be terminated. We thus send it a
+ * To be freed, the thread has to be terminated. We thus send it a
* quit message and then wait for it to exit using pthread_join().
- * Note that we let the thread destroy the port field. Although we
+ * Note that we let the thread destroy the port field. Although we
* theoretically could use a message on the stack here, let's be safe.
* Thread destruction is only performed rarely anyways, so this isn't
* a performance problem.
}
/*
- * Actual thread's main loop. We create a message port and listen for command
- * messages (quit and call). When we obtain a quit request, we destroy the
- * port and exit cleanly. The quit event can never occur during the execution
+ * Actual thread's main loop. We create a message port and listen for command
+ * messages (quit and call). When we obtain a quit request, we destroy the
+ * port and exit cleanly. The quit event can never occur during the execution
* of a call command, since it is only called on already freed thread nodes
- * (by mmpool(3) pool_free()). It is advized to applications which need to
+ * (by mmpool(3) pool_free()). Applications which need to obtain and use
 * the port of the thread after thread_object_call() are advised to only
 * send proper user messages, not system reserved ones.
*/
pthread_object_msg_t *msg;
/*
- * Create incomming our message port as well as its corresponding
- * notification ring we can sleep on. Then advertize our port address.
+ * Create our incoming message port as well as its corresponding
+ * notification ring we can sleep on. Then advertise our port address.
* Ideally, we should somehow panic if any of this initialization
* fails. XXX
*/
}
if (msg->command == PTHREAD_OBJ_CALL) {
/*
- * Request to execute a function. This means
+ * Request to execute a function. This means
* that we were allocated/reserved first.
*/
msg->u.call.function(obj,
/*
* Free/release us back, so that we be
* available again to process further
- * requests. It is possible that freeing
+ * requests. It is possible that freeing
* ourselves cause a PTHREAD_OBJ_QUIT message
* to be queued soon on our port by the
- * destructor function. This is safe, since
+ * destructor function. This is safe, since
* the destructor does not cause us to be
* destroyed until it waits for us to have
* ended cleanly using pthread_join().
-/* $Id: mm_pthread_sleep.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: mm_pthread_sleep.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
/*
* Copyright (C) 2005, Matthew Mondor
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
-/* XXX Use pthread_once() stuff or properly synchronize */
-
#include <sys/types.h>
#include <pthread.h>
#include <errno.h>
#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
#include <mm_pthread_msg.h>
#include <mm_pthread_sleep.h>
MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mm_pthread_sleep.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: mm_pthread_sleep.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
static int pthread_sleep_proc_init(void);
+static void pthread_sleep_proc_init2(void);
static int pthread_sleep_thread_init(pthread_ring_t **);
static void pthread_sleep_thread_exit(void *);
static pthread_key_t pthread_sleep_proc_key;
-static bool pthread_sleep_proc_initialized = FALSE;
+static pthread_once_t pthread_sleep_proc_initialized = PTHREAD_ONCE_INIT;
pthread_sleep_thread_exit)) != 0)
return error;
- pthread_sleep_proc_initialized = TRUE;
-
return error;
}
+static void
+pthread_sleep_proc_init2(void)
+{
+ int error;
+
+ if ((error = pthread_sleep_proc_init()) != 0) {
+ (void) fprintf(stderr, "pthread_sleep_proc_init() - %s\n",
+ strerror(error));
+ exit(EXIT_FAILURE);
+ }
+}
+
static int
pthread_sleep_thread_init(pthread_ring_t **res)
{
/*
* Although NetBSD threads don't need this, some pthread
- * implementations do. Some will crash for attempting to reference the
+ * implementations do. Some will call us again and crash attempting to
* reference the already freed memory, unless we NULL the
- * pointer for the data. Lame, but the POSIX standard was unclear
+ * pointer for the data. Lame, but the POSIX standard was unclear
* about this.
*/
(void) pthread_setspecific(pthread_sleep_proc_key, NULL);
/*
* Process specific initialization if needed
*/
- if (!pthread_sleep_proc_initialized) {
- if ((error = pthread_sleep_proc_init()) != 0)
- return error;
- }
+ if ((error = pthread_once(&pthread_sleep_proc_initialized,
+ pthread_sleep_proc_init2)) != 0)
+ return error;
/*
* Thread specific initialization if needed
- * XXX Use pthread_once() here, or mutex around ring?
+ * XXX Use pthread_once() here too, or mutex around ring?
*/
if ((ring = pthread_getspecific(pthread_sleep_proc_key)) == NULL) {
if ((error = pthread_sleep_thread_init(&ring)) != 0)
timespecadd(&its, ts, &its);
/*
- * We can finally sleep. We expect ETIMEDOUT to be the normal return
- * value in this case, which we convert to a no-error. Other errors
+ * We can finally sleep. We expect ETIMEDOUT to be the normal return
+ * value in this case, which we convert to a no-error. Other errors
* will be returned unchanged.
*/
if ((error = pthread_ring_wait(ring, &its)) == ETIMEDOUT)
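/*
 * The steps elided around the wait above presumably follow the same
 * pattern used in msg_test.c below; a condensed sketch, assuming ts is
 * the relative interval to sleep and ring the per-thread notification
 * ring:
 */
#if 0
	struct timeval	now;
	struct timespec	its;
	int		error;

	/* Convert the relative interval into an absolute deadline */
	(void) gettimeofday(&now, NULL);
	TIMEVAL_TO_TIMESPEC(&now, &its);
	timespecadd(&its, ts, &its);

	/* Sleep on the ring; a timeout is the expected, successful case */
	if ((error = pthread_ring_wait(ring, &its)) == ETIMEDOUT)
		error = 0;
	return error;
#endif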
/*
* Suspends the current thread for the duration specified in the supplied
- * timeval. Returns 0 on success or an error number.
+ * timeval. Returns 0 on success or an error number.
*/
int
pthread_microsleep(struct timeval *tv)
/*
* Suspends execution of the current thread for the specified number of
- * milliseconds. Returns 0 on success or an error number.
+ * milliseconds. Returns 0 on success or an error number.
*/
int
pthread_millisleep(unsigned int ms)
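/*
 * A short usage sketch for the two wrappers above, assuming only the
 * signatures shown here (int return, 0 on success or an error number)
 * and that <sys/time.h>, <stdio.h>, <string.h> and <mm_pthread_sleep.h>
 * are included; the function name is for illustration only.
 */
#if 0
static void
sleep_examples(void)
{
	struct timeval	tv;
	int		error;

	/* Sleep for 1.5 seconds using a timeval */
	tv.tv_sec = 1;
	tv.tv_usec = 500000;
	if ((error = pthread_microsleep(&tv)) != 0)
		(void) fprintf(stderr, "pthread_microsleep() - %s\n",
		    strerror(error));

	/* Sleep for 250 milliseconds */
	if ((error = pthread_millisleep(250)) != 0)
		(void) fprintf(stderr, "pthread_millisleep() - %s\n",
		    strerror(error));
}
#endif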
-/* $Id: msg_test.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $ */
+/* $Id: msg_test.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $ */
/*
* Copyright (C) 2005, Matthew Mondor
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
+#include <stdarg.h>
#include <mm_pthread_msg.h>
#include <mm_pthread_pool.h>
MMCOPYRIGHT("@(#) Copyright (c) 2005\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: msg_test.c,v 1.1 2004/12/27 11:16:16 mmondor Exp $");
+MMRCSID("$Id: msg_test.c,v 1.2 2005/09/12 12:00:05 mmondor Exp $");
-#define THREADS 8
-#define ROUNDS 3
+#define THREADS 32
+#define ROUNDS 8
#define TIMEOUT 1
+/*#define PRINTLOCK*/
-int main(void);
-void threadfunc(pthread_object_t *, void *);
+int main(void);
+static void threadfunc(pthread_object_t *, void *);
+static void printfunc(const char *, ...);
-pthread_port_t main_port;
-pthread_mutex_t print_lock;
+static pthread_port_t main_port;
+static pthread_mutex_t print_lock;
{
pthread_ring_t ring;
struct message *msg;
- int i;
+ int i, err;
int threads_args[THREADS];
struct timeval tv;
struct timespec ts, ts1;
- pthread_mutex_init(&print_lock, NULL);
-
- pthread_port_init(&main_port);
+ if ((err = pthread_mutex_init(&print_lock, NULL)) != 0) {
+ (void) printf("main() - stdout lock - %s\n", strerror(err));
+ exit(EXIT_FAILURE);
+ }
- pthread_ring_init(&ring);
- pthread_port_set_ring(&main_port, &ring);
+ if ((err = pthread_port_init(&main_port)) != 0 ||
+ (err = pthread_ring_init(&ring)) != 0 ||
+ (err = pthread_port_set_ring(&main_port, &ring)) != 0) {
+ printfunc("main() - initialization - %s\n", strerror(err));
+ exit(EXIT_FAILURE);
+ }
- pthread_mutex_lock(&print_lock);
- printf("Main: launching threads\n");
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
+ printfunc("Main: launching threads\n");
- if (pthread_poll_init() != 0)
+ if ((err = pthread_poll_init()) != 0) {
+ printfunc("main() - pthread_poll_init() - %s\n",
+ strerror(err));
exit(EXIT_FAILURE);
- if (pthread_object_init(16) != 0)
+ }
+
+ /*
+ * Initializes a pool of ready threads to which functions can be
+ * dispatched for execution.
+ */
+ if ((err = pthread_object_init(THREADS + 1)) != 0) {
+ printfunc("main() - pthread_object_init() - %s\n",
+ strerror(err));
exit(EXIT_FAILURE);
+ }
+ /*
+ * Now dispatch a main reentrant function to many threads, without
+ * waiting for them to complete, in an asynchronous manner.
+ * XXX Because of the way this works, the parent main thread should
+ * actually already be listening to messages... We did create a port
+ * however, which should queue messages until we reach the main loop.
+ */
for (i = 0; i < THREADS; i++) {
threads_args[i] = i;
- (void) pthread_object_call(NULL, threadfunc,
- &threads_args[i]);
+ if ((err = pthread_object_call(NULL, threadfunc,
+ &threads_args[i])) != 0)
+ printfunc("main() - pthread_object_call() - %s\n",
+ strerror(err));
}
ts1.tv_sec = TIMEOUT;
ts1.tv_nsec = 0;
for (;;) {
- int err;
-
- /* Wait for any message(s) to be available */
- pthread_mutex_lock(&print_lock);
- printf("Main: Waiting for messages\n");
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
-
- (void) gettimeofday(&tv, NULL);
- TIMEVAL_TO_TIMESPEC(&tv, &ts);
- timespecadd(&ts, &ts1, &ts);
- if ((err = pthread_ring_wait(&ring, &ts)) != 0) {
- pthread_mutex_lock(&print_lock);
- printf("Main: pthread_ring_wait() - %s\n",
- strerror(err));
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
- break;
- }
-
/*
* Read messages as long as there are any, and reply to each
* of them in a synchronous manner.
while ((msg = (struct message *)pthread_msg_get(&main_port))
!= NULL) {
- pthread_mutex_lock(&print_lock);
- printf("Main: Received message %3d %3d\n",
- msg->id, msg->i);
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
+ printfunc(
+ "Main: Received message %d from thread #%d\n",
+ msg->i, msg->id);
if ((err = pthread_msg_reply((pthread_msg_t *)msg))
- != 0) {
- pthread_mutex_lock(&print_lock);
- printf("Main: pthread_message_reply() - %s\n",
+ != 0)
+ printfunc(
+ "Main: pthread_message_reply() - %s\n",
strerror(err));
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
- }
+ }
+
+ /*
+ * No more messages to process; wait for more message(s) to
+ * become available.
+ * Note that the ring makes special provision for the case where
+ * this loop polls for new messages before waiting on the ring:
+ * if any messages were already queued, the wait returns
+ * immediately instead of actually blocking.
+ */
+ printfunc("Main: Waiting for messages\n");
+
+ (void) gettimeofday(&tv, NULL);
+ TIMEVAL_TO_TIMESPEC(&tv, &ts);
+ timespecadd(&ts, &ts1, &ts);
+ if ((err = pthread_ring_wait(&ring, &ts)) != 0) {
+ printfunc("Main: pthread_ring_wait() - %s\n",
+ strerror(err));
+ break;
}
}
- pthread_mutex_destroy(&print_lock);
- pthread_port_destroy(&main_port);
- pthread_ring_destroy(&ring);
+ (void) pthread_mutex_destroy(&print_lock);
+ (void) pthread_port_destroy(&main_port);
+ (void) pthread_ring_destroy(&ring);
return 0;
}
-void
+static void
threadfunc(pthread_object_t *obj, void *args)
{
int id = *(int *)args;
- int i;
+ int i, err;
struct message msg;
pthread_port_t rport;
pthread_ring_t rring;
- pthread_port_init(&rport);
- pthread_ring_init(&rring);
- pthread_port_set_ring(&rport, &rring);
+ if ((err = pthread_port_init(&rport)) != 0 ||
+ (err = pthread_ring_init(&rring)) != 0 ||
+ (err = pthread_port_set_ring(&rport, &rring)) != 0 ||
+ (err = pthread_msg_init((pthread_msg_t *)&msg, &rport)) != 0) {
+ printfunc("threadfunc() - initialization - %s\n",
+ strerror(err));
+ return;
+ }
- pthread_msg_init((pthread_msg_t *)&msg, &rport);
msg.id = id;
- pthread_mutex_lock(&print_lock);
- printf("Thread #%d started\n", id);
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
+ (void) printfunc("Thread #%d started\n", id);
for (i = 0; i < ROUNDS; i++) {
- int err;
-
/*
- * Prepare and send synchronous message. For asynchronous
+ * Prepare and send synchronous message. For asynchronous
* operation, we would need to allocate a message and to send
* it, and not expect a reply back immediately, even letting
- * the other end free the message as necessary. In synchronous
+ * the other end free the message as necessary. In synchronous
* mode we can use the same message over and over and share
* its memory area using proper send/reply methods for
* synchronization.
*/
msg.i = i;
if ((err = pthread_msg_put(&main_port, (pthread_msg_t *)&msg))
- != 0) {
- pthread_mutex_lock(&print_lock);
- printf("Main: pthread_message_put() - %s\n",
+ != 0)
+ printfunc("Thread: pthread_message_put() - %s\n",
strerror(err));
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
- }
/* Now wait for synchronous reply and discard it */
if ((err = pthread_ring_wait(&rring, NULL)) != 0) {
- pthread_mutex_lock(&print_lock);
- printf("Thread: pthread_ring_wait() - %s\n",
+ printfunc("Thread: pthread_ring_wait() - %s\n",
strerror(err));
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
break;
}
- (void) pthread_msg_get(&rport);
+ if (pthread_msg_get(&rport) == NULL)
+ printfunc("Thread: pthread_msg_get() == NULL!?\n");
+ printfunc("Thread #%d received reply message for %d\n",
+ id, i);
}
- pthread_mutex_lock(&print_lock);
- printf("Thread #%d ending\n", id);
- fflush(stdout);
- pthread_mutex_unlock(&print_lock);
+ printfunc("Thread #%d ending\n", id);
+
+ (void) pthread_port_destroy(&rport);
+ (void) pthread_ring_destroy(&rring);
+ (void) pthread_msg_destroy((pthread_msg_t *)&msg);
+}
- pthread_port_destroy(&rport);
- pthread_ring_destroy(&rring);
- pthread_msg_destroy((pthread_msg_t *)&msg);
+static void
+printfunc(const char *fmt, ...)
+{
+ char buf[1024];
+ va_list arg_ptr;
+ int len;
+
+ *buf = '\0';
+ va_start(arg_ptr, fmt);
+ len = vsnprintf(buf, sizeof(buf), fmt, arg_ptr);
+ va_end(arg_ptr);
+ if (len < 1)
+ return;
+ /* Clamp in case vsnprintf(3) reported a truncated (larger) length */
+ if (len > (int)sizeof(buf) - 1)
+ len = (int)sizeof(buf) - 1;
+
+#ifdef PRINTLOCK
+ (void) pthread_mutex_lock(&print_lock);
+#endif
+ (void) fwrite(buf, len, 1, stdout);
+#ifdef PRINTLOCK
+ (void) fflush(stdout);
+ (void) pthread_mutex_unlock(&print_lock);
+#endif
}
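/*
 * A sketch of the asynchronous variant described in threadfunc() above:
 * the sender heap allocates a message, sends it without expecting a
 * reply, and the receiver frees it once processed.  Whether
 * pthread_msg_init() accepts a NULL reply port and how such a message
 * is meant to be released are assumptions not confirmed by this source;
 * the pthread_msg_put()/pthread_msg_get() calls and struct message
 * fields follow the usage above.
 */
#if 0
static void
send_async(pthread_port_t *port, int id, int i)
{
	struct message	*amsg;

	if ((amsg = malloc(sizeof(*amsg))) == NULL)
		return;
	/* NULL reply port: the receiver only consumes, never replies */
	if (pthread_msg_init((pthread_msg_t *)amsg, NULL) != 0) {
		free(amsg);
		return;
	}
	amsg->id = id;
	amsg->i = i;
	if (pthread_msg_put(port, (pthread_msg_t *)amsg) != 0)
		free(amsg);
	/* On success, ownership of the message passes to the receiver */
}

static void
recv_async(pthread_port_t *port)
{
	struct message	*amsg;

	while ((amsg = (struct message *)pthread_msg_get(port)) != NULL) {
		printfunc("Received async message %d from #%d\n",
		    amsg->i, amsg->id);
		/* The receiver releases the message instead of replying */
		(void) pthread_msg_destroy((pthread_msg_t *)amsg);
		free(amsg);
	}
}
#endif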