-/* $Id: mmserver.c,v 1.34.2.1 2005/11/16 01:00:58 mmondor Exp $ */
+/* $Id: mmserver.c,v 1.34.2.2 2005/11/18 10:04:01 mmondor Exp $ */
/*
* Copyright (C) 2000-2004, Matthew Mondor
MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmserver.c,v 1.34.2.1 2005/11/16 01:00:58 mmondor Exp $");
+MMRCSID("$Id: mmserver.c,v 1.34.2.2 2005/11/18 10:04:01 mmondor Exp $");
static int (*handleclientfunc)(unsigned long, int,
clientlistnode *, struct iface *,
struct async_clenv *);
-static pthread_mutex_t ctable_lock, ppool_lock;
+static pthread_mutex_t ctable_lock = PTHREAD_MUTEX_INITIALIZER,
+ ppool_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t cpool, ppool;
static hashtable_t ctable;
static struct async_env *async = NULL;
pthread_attr_setdetachstate(&threadattr, 1);
pthread_attr_init(&clnode_threadattr);
pthread_attr_setdetachstate(&clnode_threadattr, 0);
- pthread_mutex_init(&ctable_lock, NULL);
- pthread_mutex_init(&ppool_lock, NULL);
/* Used by resolve_hostname() */
if (!mmstrinit(malloc, free, 16384)) {
exit(-1);
}
- if ((err = pthread_object_init(8)) != 0) {
+ if ((err = pthread_object_init(/*XXX 8*/4)) != 0) {
syslog(LOG_NOTICE, "tcp_server() - pthread_object_init() - %s",
strerror(err));
exit(-1);
hashtable_destroy(&ctable, FALSE);
pool_destroy(&ppool);
pool_destroy(&cpool);
- pthread_mutex_destroy(&ctable_lock);
- pthread_mutex_destroy(&ppool_lock);
exit(ret);
}
signal(SIGTTIN, SIG_IGN);
signal(SIGTSTP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
+ signal(SIGUSR2, SIG_IGN);
umask(0);
* any, but only wait for them if all processes are free (we don't
* expect any results).
*/
- if (pthread_port_pending(&async->port) > 0) {
- while ((omsg = (char *)pthread_msg_get(&async->port)) != NULL)
- DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
- } else if (DLIST_NODES(freeprocs) == async->nprocs) {
+ while ((omsg = (char *)pthread_msg_get(&async->port)) != NULL)
+ DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
+ if (DLIST_NODES(freeprocs) == async->nprocs &&
+ DLIST_NODES(&queue) == 0) {
/* XXX */ syslog(LOG_NOTICE, "* pthread_ring_wait() starting");
pthread_ring_wait(&async->ring, NULL);
/* XXX */ syslog(LOG_NOTICE, "* pthread_ring_wait() ended");
- while ((omsg = (char *)pthread_msg_get(&async->port)) != NULL)
- DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
+ continue;
}
/* Verify for any available processes to dispatch requests to, and
/* Wait for results from our async processes via their socket,
* but be sure to break when new inter-thread messages are available.
* If new results are obtained, reply back to the caller thread.
+ * If all processes are free, go back to pthread_ring_wait()
+ * instead of pthread_poll_ring().
*/
- for (;;) {
- int selr, i;
-
- /* XXX */ syslog(LOG_NOTICE, "* pthread_poll_ring() starting");
- selr = pthread_poll_ring(pfds, nfds, -1, &async->ring);
- /* XXX */ syslog(LOG_NOTICE, "* pthread_poll_ring() ended (selr = "
- "%d (%s)", selr, strerror(errno));
- if (selr == -1 && errno == ECANCELED)
- break;
- if (selr > 0) {
- for (i = 0; selr > 0 && i < nfds; i++) {
- if (pfds[i].revents & POLLERR) {
- /* If this happens something is really wrong, exit */
- syslog(LOG_NOTICE, "async_thread() - POLLERR(%d)",
- pfds[i].fd);
- kill(0, SIGTERM);
- for (;;)
- sleep(30);
- }
- if (pfds[i].revents & POLLIN) {
- int fd = pfds[i].fd;
- size_t len;
- struct async_proc *p = idx[fd].aproc;
- struct async_clenv *e = idx[fd].aclenv;
-
- /* Results, reply message back to client thread and
- * return this process node in the free pool.
- */
- if ((len = recv(fd, e->msg, async->msg_len,
- MSG_WAITALL)) <
- sizeof(struct async_msg))
- syslog(LOG_NOTICE, "async_thread() - recv(%d:%d)",
- fd, (int)async->msg_len);
- /* XXX */syslog(LOG_NOTICE, "recv()ed reply");
- pthread_msg_reply(&(e->msg->msg));
- DLIST_APPEND(freeprocs, (node_t *)p);
- selr--;
- }
- }
- }
+ while (DLIST_NODES(freeprocs) < async->nprocs) {
+ int selr, i;
+
+ /* XXX */ syslog(LOG_NOTICE, "* pthread_poll_ring() starting");
+ selr = pthread_poll_ring(pfds, nfds, -1, &async->ring);
+ /* XXX */ syslog(LOG_NOTICE, "* pthread_poll_ring() ended (selr = "
+ "%d (%s)", selr, strerror(errno));
+ if (selr == -1 && errno == ECANCELED)
+ break;
+ for (i = 0; selr > 0 && i < nfds; i++) {
+ if (pfds[i].revents & POLLERR) {
+ /* If this happens something is really wrong, exit */
+ syslog(LOG_NOTICE, "async_thread() - POLLERR(%d)",
+ pfds[i].fd);
+ kill(0, SIGTERM);
+ for (;;)
+ pthread_sleep(30);
+ }
+ if (pfds[i].revents & POLLIN) {
+ int fd = pfds[i].fd;
+ size_t len;
+ struct async_proc *p = idx[fd].aproc;
+ struct async_clenv *e = idx[fd].aclenv;
+
+ /* Results, reply message back to client thread and
+ * return this process node in the free pool.
+ */
+ if ((len = recv(fd, e->msg, async->msg_len,
+ MSG_WAITALL)) < sizeof(struct async_msg))
+ syslog(LOG_NOTICE, "async_thread() - recv(%d:%d)",
+ fd, (int)async->msg_len);
+ /* XXX */syslog(LOG_NOTICE, "recv()ed reply");
+ pthread_msg_reply(&(e->msg->msg));
+ /* XXX */syslog(LOG_NOTICE, "pthread_msg_reply() ended");
+ DLIST_APPEND(freeprocs, (node_t *)p);
+ selr--;
+ }
+ }
}
}
pthread_t thread;
res = FALSE;
- pthread_attr_init(&attrs);
/* Start async slave thread device */
if (pthread_port_init(&async->port) == 0) {
if (pthread_ring_init(&async->ring) == 0) {
pthread_port_set_ring(&async->port, &async->ring);
+ pthread_attr_init(&attrs);
pthread_attr_setdetachstate(&attrs, 1);
if ((pthread_create(&thread, &attrs, async_thread, NULL))
== 0)