Merged pthread-branch to trunk
authorMatthew Mondor <mmondor@pulsar-zone.net>
Tue, 13 Mar 2007 20:28:22 +0000 (20:28 +0000)
committerMatthew Mondor <mmondor@pulsar-zone.net>
Tue, 13 Mar 2007 20:28:22 +0000 (20:28 +0000)
16 files changed:
mmsoftware/mmftpd/ChangeLog
mmsoftware/mmftpd/GNUmakefile
mmsoftware/mmftpd/src/mmftpd.c
mmsoftware/mmftpd/src/mmftpd.h
mmsoftware/mmlib/mmreadcfg.c
mmsoftware/mmlib/mmserver.c
mmsoftware/mmlib/mmserver.h
mmsoftware/mmlib/mmsql.c
mmsoftware/mmlib/mmsql.h
mmsoftware/mmlib/mmstr.c
mmsoftware/mmmail/ChangeLog
mmsoftware/mmmail/GNUmakefile
mmsoftware/mmmail/src/mmpop3d/mmpop3d.c
mmsoftware/mmmail/src/mmpop3d/mmpop3d.h
mmsoftware/mmmail/src/mmsmtpd/mmsmtpd.c
mmsoftware/mmmail/src/mmsmtpd/mmsmtpd.h

index 564bcef..10d9ba1 100644 (file)
@@ -1,4 +1,20 @@
-$Id: ChangeLog,v 1.50 2005/11/17 07:38:07 mmondor Exp $
+$Id: ChangeLog,v 1.51 2007/03/13 20:28:22 mmondor Exp $
+
+
+
+Release: mmftpd 0.1.0 devl
+Date   : March 13, 2007
+By     : Matthew Mondor
+
+* Performance enhancements
+  - Now uses pthread instead of pth, using the new pthread_utils library
+    for functionality which pth had but pthread lacks.  This allows to
+    execute less functions through dedicated processes (which also require
+    some send(2)/recv(2) overhead), because of the expected preemptive
+    nature of most pthread implementations.  It also allows better scaling
+    with SMP.
+  - Uses PTHREAD_MUTEX_INITIALIZER where possible instead of
+    pthread_mutex_init().
 
 
 
index 1c17ca1..5e3ff6e 100644 (file)
@@ -1,24 +1,26 @@
-# $Id: GNUmakefile,v 1.6 2006/12/13 14:37:12 mmondor Exp $
+# $Id: GNUmakefile,v 1.7 2007/03/13 20:28:22 mmondor Exp $
 
 MMLIBS := $(addprefix ../mmlib/,mmarch.o mmfd.o mmhash.o mmlimitrate.o \
-mmlog.o mmpath.o mmpool.o mmreadcfg.o mm_pth_pool.o mmserver.o mmstat.o \
-mmstr.o mmstring.o)
-
-PTHINCDIR := $(shell pth-config --cflags)
-PTHLIBDIR := $(shell pth-config --libdir)
+mmlog.o mmpath.o mmpool.o mmreadcfg.o mmserver.o mmstat.o \
+mmstr.o mmstring.o) $(addprefix ../pthread_util/,mm_pthread_msg.o \
+mm_pthread_poll.o mm_pthread_pool.o mm_pthread_sleep.o)
 
 OBJS := src/mmftpd.o
 
 CFLAGS += -Wall
+#CFLAGS += -DNODETACH -DDEBUG -DPTHREAD_DEBUG -g3
+
+LDFLAGS += -lc -lcrypt -lpthread
+#LDFLAGS += -lpthread_dbg
 
 
 all: src/mmftpd
 
 %.o: %.c
-       $(CC) -c ${CFLAGS} -I. -I../mmlib $(PTHINCDIR) -o $@ $<
+       $(CC) -c ${CFLAGS} -I. -I../mmlib -I../pthread_util -o $@ $<
 
 src/mmftpd: $(MMLIBS) $(OBJS)
-       $(CC) -o $@ -L$(PTHLIBDIR) -lc -lcrypt -lpth $(OBJS) $(MMLIBS)
+       $(CC) -o $@ $(LDFLAGS) $(OBJS) $(MMLIBS)
 
 install:
        install -cs -o 0 -g 0 -m 500 src/mmftpd /usr/local/sbin
index 6e71147..a1caa6a 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $ */
+/* $Id: mmftpd.c,v 1.67 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -60,7 +60,7 @@
 #include <ctype.h>
 #include <syslog.h>
 
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmreadcfg.h>
 #include <mmstring.h>
 #include <mmfifo.h>
 #include <mmstat.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+#include <mm_pthread_sleep.h>
 
 #include "mmftpd.h"
 
@@ -84,7 +88,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $");
+MMRCSID("$Id: mmftpd.c,v 1.67 2007/03/13 20:28:22 mmondor Exp $");
 
 
 
@@ -109,24 +113,24 @@ MMRCSID("$Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $");
 static CONFIG CONF;
 
 /* Used to start transfer threads */
-static pth_attr_t tthreadattr;
+static pthread_attr_t tthreadattr;
 
 /* List of logged in users and optionally current home directory size */
-static pth_mutex_t lusers_lock;
+static pthread_mutex_t lusers_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t lusers_pool;
 static hashtable_t lusers_table;
 
 /* This is used so that clientenv structures be allocated/freed fast */
-static pth_mutex_t clenvs_lock;
+static pthread_mutex_t clenvs_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t clenvs_pool;
 static pool_t fifos_pool;
 
 /* Pool used to optimize creating/destroying mmfd mutexes */
-static pth_mutex_t mutexes_lock;
+static pthread_mutex_t mutexes_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t mutexes_pool;
 
 /* Used for the longer-term directory size cache */
-static pth_mutex_t quota_lock;
+static pthread_mutex_t quota_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t quota_pool;
 static hashtable_t quota_table;
 
@@ -300,17 +304,17 @@ static const struct tr_messages tr_msg[] = {
 static fdfuncs gfdf = {
     malloc,
     free,
-    pth_poll,
-    pth_read,
-    pth_write,
-    pth_sleep,
-    pth_usleep,
-    _pth_mutex_create,
-    _pth_mutex_destroy,
-    _pth_mutex_lock,
-    _pth_mutex_unlock,
-    _pth_thread_yield,
-    _pth_eintr
+    poll,
+    read,
+    write,
+    pthread_sleep,
+    pthread_usleep,
+    thread_mutex_create,
+    thread_mutex_destroy,
+    thread_mutex_lock,
+    thread_mutex_unlock,
+    NULL,
+    thread_eintr
 };
 
 /* async_getuserline() uses this for each asynchroneous process */
@@ -551,7 +555,7 @@ auth_pass(clientenv *clenv)
 
        /* We slow down password guessing programs */
        if (clenv->attempts > 0)
-           pth_sleep(clenv->attempts * 2);
+           pthread_sleep(clenv->attempts * 2);
        clenv->attempts++;
 
        if (clenv->anonymous) {
@@ -648,7 +652,7 @@ auth_pass(clientenv *clenv)
        }
        directory_message(clenv, 230);
 
-       pth_mutex_acquire(&lusers_lock, FALSE, NULL);
+       pthread_mutex_lock(&lusers_lock);
 
        /* Verify if user in list */
        len = mm_strlen(clenv->user) + 1;
@@ -657,14 +661,14 @@ auth_pass(clientenv *clenv)
            bool lok = TRUE;
 
            /* Yes, make sure that we observe limits, if any */
-           pth_mutex_acquire(&lun->lock, FALSE, NULL);
+           pthread_mutex_lock(&lun->lock);
            if (clenv->maxlogins != 0) {
                if (lun->logins < clenv->maxlogins)
                    lun->logins++;
                else lok = FALSE;
            } else
                lun->logins++;
-           pth_mutex_release(&lun->lock);
+           pthread_mutex_unlock(&lun->lock);
            if (lok) {
                clenv->unode = lun;
                if (clenv->anonymous)
@@ -697,7 +701,7 @@ auth_pass(clientenv *clenv)
                    nextstate = STATE_ERROR;
                }
                if (lun != NULL) {
-                   pth_mutex_init(&lun->lock);
+                   pthread_mutex_init(&lun->lock, NULL);
                    lun->logins = 1;
                    mm_memcpy(lun->user, clenv->user, len);
                    clenv->unode = lun;
@@ -716,7 +720,7 @@ auth_pass(clientenv *clenv)
            }
        }
 
-       pth_mutex_release(&lusers_lock);
+       pthread_mutex_unlock(&lusers_lock);
     }
 
     return (nextstate);
@@ -2228,9 +2232,9 @@ main_stat(clientenv *clenv)
        {
            long logins;
 
-           pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+           pthread_mutex_lock(&clenv->unode->lock);
            logins = clenv->unode->logins;
-           pth_mutex_release(&clenv->unode->lock);
+           pthread_mutex_unlock(&clenv->unode->lock);
            fdbprintf(fdb, "    Logged in as %s (%ld concurrent logins)\r\n",
                    clenv->user, logins);
        }
@@ -2278,9 +2282,9 @@ main_stat(clientenv *clenv)
 
            fdbprintf(fdb, "  %lld bytes", (int64_t)clenv->maxhomesize);
            fdbwrite(fdb, "\r\n    Current home directory size: ", 36);
-           pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+           pthread_mutex_lock(&clenv->unode->lock);
            size = clenv->unode->homesize;
-           pth_mutex_release(&clenv->unode->lock);
+           pthread_mutex_unlock(&clenv->unode->lock);
            fdbprintf(fdb, "%lld bytes", (int64_t)size);
        } else
            fdbwrite(fdb, "disabled", 8);
@@ -2334,7 +2338,7 @@ main_stat(clientenv *clenv)
 int
 main(int argc, char **argv)
 {
-    char *conf_file = "/etc/mmftpd.conf";
+    char *conf_file = "/usr/local/etc/mmftpd.conf";
     pid_t uid;
     gid_t *gids;
     int ngids, ret = EXIT_SUCCESS;
@@ -2415,7 +2419,7 @@ main(int argc, char **argv)
 
     /* Set defaults */
     *CONF.CHROOT_DIR = '\0';
-    mm_strcpy(CONF.PASSWD_FILE, "/etc/mmftpdpasswd");
+    mm_strcpy(CONF.PASSWD_FILE, "/usr/local/etc/mmftpdpasswd");
     mm_strcpy(CONF.PID_PATH, "/var/run/mmftpd.pid");
     mm_strcpy(CONF.USER, "mmftpd");
     mm_strcpy(CONF.GROUPS, "mmftpd,mmstat");
@@ -2530,29 +2534,24 @@ main(int argc, char **argv)
     async_init(afuncs, CONF.ASYNC_PROCESSES, uid, gids, ngids);
 
     /* Threading system initialization */
-    pth_init();
+    /* pthread_init(); */
 
     /* Part of mmserver(3)'s async system which has to be initialized after
      * the threading system
      */
-    async_init_pth();
-
-    pth_mutex_init(&lusers_lock);
-    pth_mutex_init(&clenvs_lock);
-    pth_mutex_init(&mutexes_lock);
-    pth_mutex_init(&quota_lock);
+    async_init_pthread();
 
     srandom(getpid() + time(NULL));
 
     /* We use those for our transfer ASYNC thread, joinable because when we
      * eventually request it to quit via a message we want to make sure it
-     * ended properly waiting for it with pth_join(). I have decided to use
+     * ended properly waiting for it with pthread_join(). I have decided to use
      * a second thread dedicated to file transfers, this way I can respect
      * the RFC on listening to ABOR and STAT easily on the control connection
      * while transfers are ongoing, with ease and modularity.
      */
-    tthreadattr = pth_attr_new();
-    pth_attr_set(tthreadattr, PTH_ATTR_JOINABLE, TRUE);
+    pthread_attr_init(&tthreadattr);
+    pthread_attr_setdetachstate(&tthreadattr, 0);
 
     /* Initialize our pools */
     /* Client environment nodes */
@@ -2586,6 +2585,7 @@ main(int argc, char **argv)
            POOL_VALID(&mutexes_pool) && POOL_VALID(&quota_pool) &&
            HASHTABLE_VALID(&quota_table) &&
            (!*CONF.DISPLAY_FILE || POOL_VALID(&fifos_pool))) {
+       thread_init();
        fdbcinit(&gfdf, &fdbc, CONF.GBANDWIDTH_IN * 1024,
                CONF.GBANDWIDTH_OUT * 1024);
        /* Finally start our TCP main daemon */
@@ -2759,13 +2759,13 @@ stripoptions(char *str)
     bool space;
 
     /* First locate end of command */
-    for (; *str != '\0' && !isspace(*str); str++) ;
+    for (; *str != '\0' && !isspace((int)*str); str++) ;
 
     if (*str != '\0') {
        space = FALSE;
        while (*str != '\0') {
            /* Find first printable char */
-           for (; *str != '\0' && isspace(*str); str++)
+           for (; *str != '\0' && isspace((int)*str); str++)
                space = TRUE;
            /* Is it an option? */
            if (space && *str == '-') {
@@ -2773,7 +2773,7 @@ stripoptions(char *str)
                 * char
                 */
                space = FALSE;
-               while (*str != '\0' && !isspace(*str))
+               while (*str != '\0' && !isspace((int)*str))
                    *str++ = ' ';
            } else
                break;
@@ -2864,7 +2864,7 @@ treesize_edit(clientenv *clenv, off_t modify)
     bool ret = TRUE;
 
     if (clenv->maxhomesize && modify != 0) {
-       pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+       pthread_mutex_lock(&clenv->unode->lock);
        cur = clenv->unode->homesize;
        cur += modify;
        if (cur < 0)
@@ -2873,7 +2873,7 @@ treesize_edit(clientenv *clenv, off_t modify)
            clenv->unode->homesize = cur;
        else
            ret = FALSE;
-       pth_mutex_release(&clenv->unode->lock);
+       pthread_mutex_unlock(&clenv->unode->lock);
     }
 
     return (ret);
@@ -2979,11 +2979,11 @@ alloc_clientenv(void)
     clientenv *clenv;
     struct fifonode *fnode = NULL;
 
-    pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
+    pthread_mutex_lock(&clenvs_lock);
     clenv = (clientenv *)pool_alloc(&clenvs_pool, TRUE);
     if (*CONF.DISPLAY_FILE)
        fnode = (struct fifonode *)pool_alloc(&fifos_pool, FALSE);
-    pth_mutex_release(&clenvs_lock);
+    pthread_mutex_unlock(&clenvs_lock);
 
     if (clenv) {
        if (!(*CONF.DISPLAY_FILE) || fnode) {
@@ -2998,9 +2998,9 @@ alloc_clientenv(void)
        free_clientenv(clenv);
     } else {
        if (fnode) {
-           pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
+           pthread_mutex_lock(&clenvs_lock);
            pool_free((pnode_t *)fnode);
-           pth_mutex_release(&clenvs_lock);
+           pthread_mutex_unlock(&clenvs_lock);
        }
     }
 
@@ -3025,15 +3025,14 @@ free_clientenv(clientenv *clenv)
     if (clenv->unode) {
        bool zero = FALSE;
 
-       pth_mutex_acquire(&lusers_lock, FALSE, NULL);
-       pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+       pthread_mutex_lock(&lusers_lock);
+       pthread_mutex_lock(&clenv->unode->lock);
        if ((--clenv->unode->logins) < 1)
            zero = TRUE;
-       pth_mutex_release(&clenv->unode->lock);
+       pthread_mutex_unlock(&clenv->unode->lock);
        if (zero) {
            hashtable_unlink(&lusers_table, (hashnode_t *)clenv->unode);
-           /* May be required by some POSIX thread implementations */
-           /* pth_mutex_destroy(&clenv->unode->lock); */
+           pthread_mutex_destroy(&clenv->unode->lock);
 
            /* We now need to either update the cache entry for this home
             * directory size or to create a new entry for it, but only if
@@ -3049,7 +3048,7 @@ free_clientenv(clientenv *clenv)
                 */
                len = mm_strlen(home) + 1;
                t = time(NULL);
-               pth_mutex_acquire(&quota_lock, FALSE, NULL);
+               pthread_mutex_lock(&quota_lock);
                if ((qnod = (quotanode *)hashtable_lookup(&quota_table, home,
                                len)) != NULL) {
                    qnod->homesize = clenv->unode->homesize;
@@ -3064,18 +3063,20 @@ free_clientenv(clientenv *clenv)
                                qnod->dir, len, FALSE);
                    }
                }
-               pth_mutex_release(&quota_lock);
+               pthread_mutex_unlock(&quota_lock);
            }
            pool_free((pnode_t *)clenv->unode);
        }
-       pth_mutex_release(&lusers_lock);
+       pthread_mutex_unlock(&lusers_lock);
     }
 
-    pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
-    if (fnode != NULL) pool_free((pnode_t *)fnode);
-    if (home != NULL) mmstrfree(home);
+    pthread_mutex_lock(&clenvs_lock);
+    if (fnode != NULL)
+       pool_free((pnode_t *)fnode);
+    if (home != NULL)
+       mmstrfree(home);
     pool_free((pnode_t *)clenv);
-    pth_mutex_release(&clenvs_lock);
+    pthread_mutex_unlock(&clenvs_lock);
 
     return (NULL);
 }
@@ -3085,25 +3086,23 @@ free_clientenv(clientenv *clenv)
 static bool
 start_transfer_thread(clientenv *clenv)
 {
-    /* I do not understand why libpth doesn't provide a function to set the
-     * reply port, or to init the message structure. Moreover, it theoretically
-     * shouldn't need the message size if it simply swaps messages around port
-     * linked list FIFOs via the message node. This currently seems the best
-     * way to go about it
-     */
-    clenv->tmsg.msg.m_size = sizeof(transfermsg);
-    clenv->tmsg.msg.m_replyport = clenv->rport;
+    pthread_msg_init(&clenv->tmsg.msg, &clenv->rport);
 
     /*
-    if ((clenv->tthread = pth_spawn(tthreadattr, (void *)transferthread,
-                   clenv))) {
-       if (transfer_request(REQ_NONE, TRUE, clenv)) return (TRUE);
-       pth_join(clenv->tthread, NULL);
+    if (pthread_create(&clenv->tthread, &tthreadattr, (void *)transferthread,
+                       clenv) == 0) {
+       if (transfer_request(REQ_NONE, TRUE, clenv))
+           return (TRUE);
+       pthread_join(&clenv->tthread, NULL);
     }
     */
-    thread_object_call(NULL, transferthread, clenv);
+    if (pthread_object_call(NULL, transferthread, clenv) != 0)
+       DEBUG_PRINTF("start_transfer_thread", "pthread_object_call()");
+
     if (transfer_request(REQ_NONE, TRUE, clenv))
        return TRUE;
+    else
+       DEBUG_PRINTF("start_transfer_thread", "transfer_request(REQ_NONE)");
 
     return (FALSE);
 }
@@ -3113,7 +3112,7 @@ start_transfer_thread(clientenv *clenv)
 static void
 stop_transfer_thread(clientenv *clenv)
 {
-    if (clenv->sport) {
+    if (pthread_port_valid(&clenv->sport)) {
        /* Send a quit request and wait for reply */
        if (!transfer_request(REQ_QUIT, TRUE, clenv))
            DEBUG_PRINTF("stop_transfer_thread",
@@ -3123,9 +3122,11 @@ stop_transfer_thread(clientenv *clenv)
         * Join thread as we want to make sure it exits before we do.
         */
        /*
-       pth_join(clenv->tthread, NULL);
+       pthread_join(&clenv->tthread, NULL);
        */
     }
+    if (pthread_msg_valid(&clenv->tmsg.msg))
+       pthread_msg_destroy(&clenv->tmsg.msg);
 }
 
 
@@ -3139,33 +3140,34 @@ transfer_request(int req, bool waitreply, clientenv *clenv)
 {
     int res = FALSE;
     transfermsg *msg = &(clenv->tmsg);
-    pth_event_t ring;
 
     if (req != REQ_NONE) {
        msg->request = req;
        msg->result = FALSE;
-       pth_msgport_put(clenv->sport, (pth_message_t *)msg);
+       pthread_msg_put(&clenv->sport, (pthread_msg_t *)msg);
     }
     if (waitreply) {
-       ring = pth_event(PTH_EVENT_TIME, pth_timeout(120, 0));
-       pth_event_concat(clenv->rring, ring, NULL);
        for (;;) {
-           if ((pth_wait(clenv->rring))) {
-               if ((pth_event_occurred(ring))) {
-                   /* Timeout occured waiting for reply */
-                   res = FALSE;
-                   break;
-               }
-               if ((pth_event_occurred(clenv->rring))) {
-                   if ((msg = (transfermsg *)pth_msgport_get(clenv->rport))) {
-                       res = msg->result;
+           if ((msg = (transfermsg *)pthread_msg_get(&clenv->rport))
+                   == NULL) {
+               struct timeval  tv;
+               struct timespec ts, ts1;
+
+               ts1.tv_sec = 120;
+               ts1.tv_nsec = 0;
+               (void) gettimeofday(&tv, NULL);
+               TIMEVAL_TO_TIMESPEC(&tv, &ts);
+               timespecadd(&ts, &ts1, &ts);
+
+               if (pthread_ring_wait(&clenv->rring, &ts) == ETIMEDOUT) {
+                       res = FALSE;
                        break;
-                   }
                }
+           } else {
+               res = msg->result;
+               break;
            }
        }
-       pth_event_isolate(clenv->rring);
-       pth_event_free(ring, PTH_FREE_ALL);
     } else
        res = TRUE;
 
@@ -3298,7 +3300,7 @@ valid_ipaddress(const char *addr)
                return FALSE;
            if (*addr == '\0')
                break;
-       } else if (isdigit(*addr))
+       } else if (isdigit((int)*addr))
            *uptr++ = *addr;
        else
            return FALSE;
@@ -3510,8 +3512,9 @@ handleclient(unsigned long cid, int fd, clientlistnode *clientlnode,
            }
 
            /* Open our reply port and start our file transfer thread */
-           if ((clenv->rport = pth_msgport_create("XXX(NULL)"))) {
-               clenv->rring = pth_event(PTH_EVENT_MSG, clenv->rport);
+           if (pthread_port_init(&clenv->rport) == 0 &&
+                   pthread_ring_init(&clenv->rring) == 0 &&
+                   pthread_port_set_ring(&clenv->rport, &clenv->rring) == 0) {
 
                /* Init our clientenv as required */
                clenv->fdb = fdb;
@@ -3648,11 +3651,14 @@ handleclient(unsigned long cid, int fd, clientlistnode *clientlnode,
                } else
                    DEBUG_PRINTF("handleclient", "start_transfer_thread()");
 
-               pth_event_free(clenv->rring, PTH_FREE_ALL);
-               while ((pth_msgport_get(clenv->rport)) != NULL) ;
-               pth_msgport_destroy(clenv->rport);
+               while (pthread_msg_get(&clenv->rport) != NULL) ;
            } else
-               DEBUG_PRINTF("handleclient", "pth_msgport_create()");
+               DEBUG_PRINTF("handleclient", "pthread_port_init()");
+
+           if (pthread_port_valid(&clenv->rport))
+               pthread_port_destroy(&clenv->rport);
+           if (pthread_ring_valid(&clenv->rring))
+               pthread_ring_destroy(&clenv->rring);
 
            control_in = FDBBYTESR(fdb);
            control_out = FDBBYTESW(fdb);
@@ -3787,7 +3793,7 @@ pasv_bind(clientenv *clenv, struct sockaddr_in *server, int *sock, int *port)
                        p = CONF.PASV_RANGE_MIN;
                }
                if (err == -1)
-                   pth_sleep(1);
+                   pthread_sleep(1);
            }
            if (err != -1) {
                /* Successful bind(2) */
@@ -3819,7 +3825,7 @@ pasv_bind(clientenv *clenv, struct sockaddr_in *server, int *sock, int *port)
  * other transfer requests when one is ongoing already.
  */
 static void
-transferthread(struct thread_object *obj, void *args)
+transferthread(pthread_object_t *obj, void *args)
 {
     clientenv *clenv = args;
     fdbuf *fdb;
@@ -3838,23 +3844,23 @@ transferthread(struct thread_object *obj, void *args)
     if ((fdb = fdbopen(&gfdf, &fdbc, -1, FDB_BUFSIZE, FDB_BUFSIZE, 1024, 1024,
                    CONF.DATA_TIMEOUT * 1000, CONF.DATA_TIMEOUT * 1000,
                    FALSE)) != NULL) {
-       if ((clenv->sport = pth_msgport_create("XXX(NULL)"))) {
+       if (pthread_port_init(&clenv->sport) == 0) {
            /* Reply that we started thread properly */
            ok = TRUE;
            clenv->tmsg.result = TRUE;
-           pth_msgport_put(clenv->rport, (pth_message_t *)&(clenv->tmsg));
+           pthread_msg_put(&clenv->rport, (pthread_msg_t *)&(clenv->tmsg));
 
            /* Process our tasks */
            transferthread_main(clenv, fdb);
 
-           while ((msg = (transfermsg *)pth_msgport_get(clenv->sport))
+           while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
                    != NULL) {
                msg->result = FALSE;
-               pth_msgport_reply((pth_message_t *)msg);
+               pthread_msg_reply((pthread_msg_t *)msg);
            }
-           pth_msgport_destroy(clenv->sport);
+           pthread_port_destroy(&clenv->sport);
        } else
-           DEBUG_PRINTF("transferthread", "pth_msgport_create()");
+           DEBUG_PRINTF("transferthread", "pthread_port_init()");
        fdbclose(fdb);
     } else
        DEBUG_PRINTF("transferthread", "fdbopen()");
@@ -3862,11 +3868,11 @@ transferthread(struct thread_object *obj, void *args)
     if (!ok) {
        /* Reply that thread couldn't be setup properly */
        clenv->tmsg.result = FALSE;
-       pth_msgport_put(clenv->rport, (pth_message_t *)&(clenv->tmsg));
+       pthread_msg_put(&clenv->rport, (pthread_msg_t *)&(clenv->tmsg));
     }
 
     /*
-    pth_exit(NULL);
+    pthread_exit(NULL);
     */
 }
 
@@ -3915,10 +3921,10 @@ transferthread(struct thread_object *obj, void *args)
 static void
 transferthread_main(clientenv *clenv, fdbuf *fdb)
 {
-    int state = TTSTATE_INITIAL, l, reason;
+    int state = TTSTATE_INITIAL, l, reason, err;
     register char *from, *to;
     char ipaddr[20], buffer[T_BUFSIZE], *from2;
-    pth_event_t ring, ring1;
+    pthread_ring_t ring;
     transfermsg *msg;
     struct sockaddr_in server, client, addr;
     socklen_t addrl;
@@ -3933,7 +3939,8 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
     fdbparam_set(fdb, -1, FDBP_FD);
 
     /* Transfer device entry setup */
-    ring = pth_event(PTH_EVENT_MSG, clenv->sport);
+    pthread_ring_init(&ring);
+    pthread_port_set_ring(&clenv->sport, &ring);
 
     /* Main device loop */
     while (state != TTSTATE_DONE) {
@@ -3965,109 +3972,107 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
 
            /* State main loop */
            while (state == TTSTATE_INITIAL) {
-               if ((pth_wait(ring))) {
-                   if ((pth_event_occurred(ring))) {
-                       while ((msg = (transfermsg *)pth_msgport_get(
-                                       clenv->sport)) != NULL) {
-                           /* Process request and reply */
-                           switch (msg->request) {
-                           case REQ_QUIT:
-                               msg->result = TRUE;
-                               state = TTSTATE_DONE;
-                               break;
-                           case REQ_ABORT:
-                               /* As we handle PASV port, do this */
-                               if (fdpasv != -1) {
-                                   close(fdpasv);
-                                   fdpasv = -1;
+               if (pthread_port_pending(&clenv->sport) < 1)
+                   pthread_ring_wait(&ring, NULL);
+               while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
+                       != NULL) {
+                   /* Process request and reply */
+                   switch (msg->request) {
+                       case REQ_QUIT:
+                           msg->result = TRUE;
+                           state = TTSTATE_DONE;
+                           break;
+                       case REQ_ABORT:
+                           /* As we handle PASV port, do this */
+                           if (fdpasv != -1) {
+                               close(fdpasv);
+                               fdpasv = -1;
+                           }
+                           port = 0;
+                           passive = ongoing = FALSE;
+                           download = TRUE;
+                           list = 0;
+                           *path = '\0';
+                           msg->result = TRUE;
+                           break;
+                       case REQ_STATUS:
+                           msg->port = (u_int16_t)port;
+                           msg->passive = passive;
+                           msg->ongoing = ongoing;
+                           msg->download = download;
+                           msg->list = list;
+                           msg->rbytes = FDBBYTESR(fdb);
+                           msg->wbytes = FDBBYTESW(fdb);
+                           msg->dlfiles = dlfiles;
+                           msg->ulfiles = ulfiles;
+                           msg->result = TRUE;
+                           break;
+                       case REQ_NEWPORT:
+                           port = 0;
+                           if (fd != -1) {
+                               fdbflushw(fdb);
+                               shutdown(fd, SHUT_RDWR);
+                               close(fd);
+                               fd = -1;
+                               fdbparam_set(fdb, -1, FDBP_FD);
+                           }
+                           /* As we handle PASV port, do this */
+                           if (fdpasv != -1) {
+                               close(fdpasv);
+                               fdpasv = -1;
+                           }
+                           if (msg->passive) {
+                               /* PASV request, we need to open a high
+                                * port and start listening for a
+                                * connection on it
+                                */
+                               msg->result = FALSE;    /* Default */
+                               if (pasv_bind(clenv, &server, &fdpasv,
+                                           &port)) {
+                                   msg->port = (u_int16_t)port;
+                                   passive = msg->result = TRUE;
+                                   break;
                                }
-                               port = 0;
-                               passive = ongoing = FALSE;
-                               download = TRUE;
-                               list = 0;
-                               *path = '\0';
-                               msg->result = TRUE;
-                               break;
-                           case REQ_STATUS:
-                               msg->port = (u_int16_t)port;
-                               msg->passive = passive;
-                               msg->ongoing = ongoing;
-                               msg->download = download;
-                               msg->list = list;
-                               msg->rbytes = FDBBYTESR(fdb);
-                               msg->wbytes = FDBBYTESW(fdb);
-                               msg->dlfiles = dlfiles;
-                               msg->ulfiles = ulfiles;
+                           } else {
+                               /* Normal PORT, just remember necessary
+                                * things. The caller should have
+                                * performed address and port sanity
+                                * checking already.
+                                */
+                               port = (int)msg->port;
+                               passive = FALSE;
                                msg->result = TRUE;
-                               break;
-                           case REQ_NEWPORT:
-                               port = 0;
-                               if (fd != -1) {
-                                   fdbflushw(fdb);
-                                   shutdown(fd, SHUT_RDWR);
-                                   close(fd);
-                                   fd = -1;
-                                   fdbparam_set(fdb, -1, FDBP_FD);
-                               }
-                               /* As we handle PASV port, do this */
-                               if (fdpasv != -1) {
-                                   close(fdpasv);
-                                   fdpasv = -1;
-                               }
-                               if (msg->passive) {
-                                   /* PASV request, we need to open a high
-                                    * port and start listening for a
-                                    * connection on it
-                                    */
-                                   msg->result = FALSE;        /* Default */
-                                   if (pasv_bind(clenv, &server, &fdpasv,
-                                               &port)) {
-                                       msg->port = (u_int16_t)port;
-                                       passive = msg->result = TRUE;
-                                       break;
-                                   }
-                               } else {
-                                   /* Normal PORT, just remember necessary
-                                    * things. The caller should have
-                                    * performed address and port sanity
-                                    * checking already.
-                                    */
-                                   port = (int)msg->port;
-                                   passive = FALSE;
+                           }
+                           break;
+                       case REQ_TRANSFER:
+                           msg->result = FALSE;        /* Default */
+                           if (port != 0) {
+                               fdbparam_set(fdb, msg->rrate * 1024,
+                                       FDBP_RRATE);
+                               fdbparam_set(fdb, msg->wrate * 1024,
+                                       FDBP_WRATE);
+                               if (msg->path) mm_strncpy(path, msg->path,
+                                       MMPATH_MAX - 1);
+                               else *path = '\0';
+                               if (msg->list) {
+                                   list = msg->list;
                                    msg->result = TRUE;
-                               }
-                               break;
-                           case REQ_TRANSFER:
-                               msg->result = FALSE;    /* Default */
-                               if (port != 0) {
-                                   fdbparam_set(fdb, msg->rrate * 1024,
-                                           FDBP_RRATE);
-                                   fdbparam_set(fdb, msg->wrate * 1024,
-                                           FDBP_WRATE);
-                                   if (msg->path) mm_strncpy(path, msg->path,
-                                           MMPATH_MAX - 1);
-                                   else *path = '\0';
-                                   if (msg->list) {
-                                       list = msg->list;
+                               } else {
+                                   if (msg->file != -1) {
+                                       file = msg->file;
+                                       download = msg->download;
                                        msg->result = TRUE;
-                                   } else {
-                                       if (msg->file != -1) {
-                                           file = msg->file;
-                                           download = msg->download;
-                                           msg->result = TRUE;
-                                       }
                                    }
-                                   if (msg->result)
-                                       state = TTSTATE_CONNECT;
-                               } else
-                                   msg->result = FALSE;
-                               break;
-                           default:
+                               }
+                               if (msg->result)
+                                   state = TTSTATE_CONNECT;
+                           } else
                                msg->result = FALSE;
-                           }
-                           pth_msgport_reply((pth_message_t *)msg);
-                       }
+                           break;
+                       default:
+                           msg->result = FALSE;
                    }
+                   pthread_msg_reply((pthread_msg_t *)msg);
                }
            }
 
@@ -4077,18 +4082,27 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
 
            /* State entry setup */
            ongoing = TRUE;
-           /* Setup our connect timeout event */
-           ring1 = pth_event(PTH_EVENT_TIME, pth_timeout(CONF.DATA_TIMEOUT,
-                       0));
-           pth_event_concat(ring, ring1, NULL);
 
            /* State main loop */
            while (state == TTSTATE_CONNECT) {
                if (passive) {
-                   /* Accept connection from client */
+                   /*
+                    * Accept connection from client.
+                    * On timeout, ETIMEDOUT is returned, on interrupting
+                    * message, ECANCELED is.  Other errror codes could be
+                    * returned, or the new file descriptor.
+                    */
                    addrl = sizeof(struct sockaddr);
-                   fd = pth_accept_ev(fdpasv, (struct sockaddr *)&addr,
-                           &addrl, ring);
+                   if ((err = pthread_accept_ring(fdpasv,
+                                   (struct sockaddr *)&addr,
+                                   &addrl, CONF.DATA_TIMEOUT * 1000,
+                                   &ring)) > -1) {
+                       fd = err;
+                       err = 0;
+                   } else {
+                       fd = -1;
+                       err = errno;
+                   }
                } else {
                    /* Start connection to the client */
                    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) != -1) {
@@ -4096,10 +4110,35 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                        client.sin_family = AF_INET;
                        client.sin_addr.s_addr = clenv->cipaddr;
                        client.sin_port = htons(port);
-                       /* Finally attempt connection */
-                       if ((pth_connect_ev(fd, (struct sockaddr *)&client,
+                       /*
+                        * Finally attempt connection.
+                        * On timeout, ETIMEDOUT is returned, on interrupting
+                        * message, ECANCELED is.  Other error codes could be
+                        * returned, or 0 on success.  In the case of any of
+                        * the two special error codes we just described, the
+                        * connection request is still pending in the kernel.
+                        * We can continue calling the following function
+                        * until we get connected, which can return EISCONN if
+                        * it already was connected, or we could monitor the
+                        * status of the connection using getsockopt(2) with
+                        * SOL_SOCKET level and SO_ERROR option.  If we want
+                        * to abort the in-progress connection, we need to use
+                        * close(2).
+                        */
+                       /*
+                        * XXX Hmm we should loop here! Not create new sockets
+                        * again in the loop!  Unless we wanted to make
+                        * pthread_connect_ring() automatically cancel any
+                        * connection closing the descriptor upon ETIMEDOUT or
+                        * ECANCELED return codes, in which case we could
+                        * continue the current behavior.
+                        */
+                       if ((pthread_connect_ring(fd,
+                                       (struct sockaddr *)&client,
                                        sizeof(struct sockaddr_in),
-                                       ring)) == -1) {
+                                       CONF.DATA_TIMEOUT * 1000,
+                                       &ring)) != 0 && errno != ETIMEDOUT &&
+                                       errno != ECANCELED) {
                            mmsyslog(1, LOGLEVEL,
                                    "%08X PORT data connection failiure",
                                    clenv->id);
@@ -4109,7 +4148,13 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                            REGISTER_ERROR();
                            state = TTSTATE_INITIAL;
                            break;
-                       }
+                       } else if (errno == ETIMEDOUT || errno == ECANCELED) {
+                       /* XXX For now */
+                           (void) close(fd);
+                           fd = -1;
+                           err = ETIMEDOUT;
+                       } else
+                           err = 0;
                    } else {
                        DEBUG_PRINTF("transferthread_main-2", "socket()");
                        reply(clenv->fdb, 425, FALSE,
@@ -4121,7 +4166,7 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                    }
                }
                /* Verify if connection timeout occured */
-               if ((pth_event_occurred(ring1))) {
+               if (err == ETIMEDOUT) {
                    mmsyslog(1, LOGLEVEL, "%08X Data connection timeout",
                            clenv->id);
                    reply(clenv->fdb, 425, FALSE,
@@ -4131,37 +4176,39 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                    state = TTSTATE_INITIAL;
                    break;
                }
-               /* Check for any request events and process them if any */
-               if ((pth_event_occurred(ring))) {
-                   while ((msg = (transfermsg *)pth_msgport_get(
-                                   clenv->sport)) != NULL) {
-                       /* Process request and reply */
-                       switch (msg->request) {
-                       case REQ_QUIT:
-                           msg->result = TRUE;
-                           state = TTSTATE_DONE;
-                           break;
-                       case REQ_ABORT:
-                           msg->result = TRUE;
-                           state = TTSTATE_INITIAL;
-                           break;
-                       case REQ_STATUS:
-                           msg->port = (u_int16_t)port;
-                           msg->passive = passive;
-                           msg->ongoing = ongoing;
-                           msg->download = download;
-                           msg->list = list;
-                           msg->rbytes = FDBBYTESR(fdb);
-                           msg->wbytes = FDBBYTESW(fdb);
-                           msg->dlfiles = dlfiles;
-                           msg->ulfiles = ulfiles;
-                           msg->result = TRUE;
-                           break;
-                       default:
-                           msg->result = FALSE;
-                       }
-                       pth_msgport_reply((pth_message_t *)msg);
+               /*
+                * Check for any request events and process them if any.
+                * ECANCELED above can also cause this code execution to be
+                * triggered fast.
+                */
+               while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
+                       != NULL) {
+                   /* Process request and reply */
+                   switch (msg->request) {
+                   case REQ_QUIT:
+                       msg->result = TRUE;
+                       state = TTSTATE_DONE;
+                       break;
+                   case REQ_ABORT:
+                       msg->result = TRUE;
+                       state = TTSTATE_INITIAL;
+                       break;
+                   case REQ_STATUS:
+                       msg->port = (u_int16_t)port;
+                       msg->passive = passive;
+                       msg->ongoing = ongoing;
+                       msg->download = download;
+                       msg->list = list;
+                       msg->rbytes = FDBBYTESR(fdb);
+                       msg->wbytes = FDBBYTESW(fdb);
+                       msg->dlfiles = dlfiles;
+                       msg->ulfiles = ulfiles;
+                       msg->result = TRUE;
+                       break;
+                   default:
+                       msg->result = FALSE;
                    }
+                   pthread_msg_reply((pthread_msg_t *)msg);
                }
                if (state == TTSTATE_CONNECT) {
                    /* Verify if connection was established */
@@ -4196,8 +4243,6 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
            }
 
            /* State exit cleanup */
-           pth_event_isolate(ring);
-           pth_event_free(ring1, PTH_FREE_ALL);
 
        } else if (state == TTSTATE_TRANSFER) { /* TRANSFER STATE */
 
@@ -4266,7 +4311,7 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                    TYPE_IMAGE ? fdbrcleartosend(fdb) :
                                    fdbcleartosend(fdb))) {
                            /* Client ready to be written some bytes to */
-                           if ((l = pth_read(file, buffer, T_BUFSIZE)) > 0) {
+                           if ((l = read(file, buffer, T_BUFSIZE)) > 0) {
                                if (clenv->type == TYPE_ASCII) {
                                    /* ASCII xfer, perform \n to \r\n
                                     * conversion
@@ -4326,39 +4371,37 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                            break;
                        }
                        /* Verify if we received any important events */
-                       if ((pth_event_occurred(ring))) {
-                           while ((msg = (transfermsg *)pth_msgport_get(
-                                           clenv->sport)) != NULL) {
-                               /* Process request and reply */
-                               switch (msg->request) {
-                               case REQ_QUIT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_DONE;
-                                   cont = FALSE;
-                                   break;
-                               case REQ_ABORT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_INITIAL;
-                                   cont = FALSE;
-                                   reason = TR_ABORTED;
-                                   break;
-                               case REQ_STATUS:
-                                   msg->port = (u_int16_t)port;
-                                   msg->passive = passive;
-                                   msg->ongoing = ongoing;
-                                   msg->download = download;
-                                   msg->list = list;
-                                   msg->rbytes = FDBBYTESR(fdb);
-                                   msg->wbytes = FDBBYTESW(fdb);
-                                   msg->dlfiles = dlfiles;
-                                   msg->ulfiles = ulfiles;
-                                   msg->result = TRUE;
-                                   break;
-                               default:
-                                   msg->result = FALSE;
-                               }
-                               pth_msgport_reply((pth_message_t *)msg);
+                       while ((msg = (transfermsg *)pthread_msg_get(
+                                       &clenv->sport)) != NULL) {
+                           /* Process request and reply */
+                           switch (msg->request) {
+                           case REQ_QUIT:
+                               msg->result = TRUE;
+                               state = TTSTATE_DONE;
+                               cont = FALSE;
+                               break;
+                           case REQ_ABORT:
+                               msg->result = TRUE;
+                               state = TTSTATE_INITIAL;
+                               cont = FALSE;
+                               reason = TR_ABORTED;
+                               break;
+                           case REQ_STATUS:
+                               msg->port = (u_int16_t)port;
+                               msg->passive = passive;
+                               msg->ongoing = ongoing;
+                               msg->download = download;
+                               msg->list = list;
+                               msg->rbytes = FDBBYTESR(fdb);
+                               msg->wbytes = FDBBYTESW(fdb);
+                               msg->dlfiles = dlfiles;
+                               msg->ulfiles = ulfiles;
+                               msg->result = TRUE;
+                               break;
+                           default:
+                               msg->result = FALSE;
                            }
+                           pthread_msg_reply((pthread_msg_t *)msg);
                        }
                    }
 
@@ -4384,9 +4427,9 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                        if (*from == '\r') {
                                            if (treesize_edit(clenv,
                                                        from - from2)) {
-                                               if ((l = pth_write(file,
-                                                       from2, from - from2))
-                                                       != from - from2) {
+                                               if ((l = write(file, from2,
+                                                   from - from2)) !=
+                                                       from - from2) {
                                                    mmsyslog(0, LOGLEVEL,
                                                            "%08X Error "
                                                            "writing to "
@@ -4413,9 +4456,8 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                    }
                                    if (l && from2 < to) {
                                        if (treesize_edit(clenv, to - from2)) {
-                                           if ((l = pth_write(file, from2,
-                                                           to - from2))
-                                                   != to - from2) {
+                                           if ((l = write(file, from2,
+                                               to - from2)) != to - from2) {
                                                mmsyslog(0, LOGLEVEL,
                                                        "%08X Error writing "
                                                        "to disk!",
@@ -4442,8 +4484,7 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                    /* IMAGE/binary transfer, write data as-is
                                     */
                                    if (treesize_edit(clenv, l)) {
-                                       if ((pth_write(file, buffer, l)) !=
-                                               l) {
+                                       if ((write(file, buffer, l)) != l) {
                                            mmsyslog(0, LOGLEVEL,
                                                    "%08X Error writing to "
                                                    "disk!", clenv->id);
@@ -4481,39 +4522,37 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                            break;
                        }
                        /* Verify if we received any important events */
-                       if ((pth_event_occurred(ring))) {
-                           while ((msg = (transfermsg *)pth_msgport_get(
-                                           clenv->sport)) != NULL) {
-                               /* Process request and reply */
-                               switch (msg->request) {
-                               case REQ_QUIT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_DONE;
-                                   cont = FALSE;
-                                   break;
-                               case REQ_ABORT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_INITIAL;
-                                   cont = FALSE;
-                                   reason = TR_ABORTED;
-                                   break;
-                               case REQ_STATUS:
-                                   msg->port = (u_int16_t)port;
-                                   msg->passive = passive;
-                                   msg->ongoing = ongoing;
-                                   msg->download = download;
-                                   msg->list = list;
-                                   msg->rbytes = FDBBYTESR(fdb);
-                                   msg->wbytes = FDBBYTESW(fdb);
-                                   msg->dlfiles = dlfiles;
-                                   msg->ulfiles = ulfiles;
-                                   msg->result = TRUE;
-                                   break;
-                               default:
-                                   msg->result = FALSE;
-                               }
-                               pth_msgport_reply((pth_message_t *)msg);
+                       while ((msg = (transfermsg *)pthread_msg_get(
+                                           &clenv->sport)) != NULL) {
+                           /* Process request and reply */
+                           switch (msg->request) {
+                           case REQ_QUIT:
+                               msg->result = TRUE;
+                               state = TTSTATE_DONE;
+                               cont = FALSE;
+                               break;
+                           case REQ_ABORT:
+                               msg->result = TRUE;
+                               state = TTSTATE_INITIAL;
+                               cont = FALSE;
+                               reason = TR_ABORTED;
+                               break;
+                           case REQ_STATUS:
+                               msg->port = (u_int16_t)port;
+                               msg->passive = passive;
+                               msg->ongoing = ongoing;
+                               msg->download = download;
+                               msg->list = list;
+                               msg->rbytes = FDBBYTESR(fdb);
+                               msg->wbytes = FDBBYTESW(fdb);
+                               msg->dlfiles = dlfiles;
+                               msg->ulfiles = ulfiles;
+                               msg->result = TRUE;
+                               break;
+                           default:
+                               msg->result = FALSE;
                            }
+                           pthread_msg_reply((pthread_msg_t *)msg);
                        }
                    }
                }
@@ -4544,80 +4583,72 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
     }
     if (fdpasv != -1) close(fdpasv);
 
-    pth_event_free(ring, PTH_FREE_ALL);
+    pthread_ring_destroy(&ring);
 }
 
 
 /* mmfd library thread support functions */
 
+static pthread_mutexattr_t     thread_ma;
+
+static void
+thread_init(void)
+{
+    (void) pthread_mutexattr_init(&thread_ma);
+    (void) pthread_mutexattr_settype(&thread_ma, PTHREAD_MUTEX_RECURSIVE);
+}
 
 static void *
-_pth_mutex_create(void)
+thread_mutex_create(void)
 {
     struct mutexnode *mnod;
 
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_lock(&mutexes_lock);
     mnod = (struct mutexnode *)pool_alloc(&mutexes_pool, FALSE);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
-    if (mnod)
-       pth_mutex_init(&mnod->mutex);
+    if (mnod != NULL)
+       pthread_mutex_init(&mnod->mutex, &thread_ma);
 
     return ((void *)mnod);
 }
 
 
 static void *
-_pth_mutex_destroy(void *mtx)
+thread_mutex_destroy(void *mtx)
 {
-    /* struct mutexnode *mnod = mtx; */
+    struct mutexnode *mnod = mtx;
 
-    /* pth_mutex_destroy(&mnod->mutex); */
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_destroy(&mnod->mutex);
+
+    pthread_mutex_lock(&mutexes_lock);
     pool_free(mtx);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
     return (NULL);
 }
 
 
 static void
-_pth_mutex_lock(void *mtx)
+thread_mutex_lock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_acquire(&mnod->mutex, FALSE, NULL);
+    pthread_mutex_lock(&mnod->mutex);
 }
 
 
 static void
-_pth_mutex_unlock(void *mtx)
+thread_mutex_unlock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_release(&mnod->mutex);
-}
-
-
-static void
-_pth_thread_yield(void)
-{
-    pth_yield(NULL);
-}
-
-
-static bool
-_pth_eintr(void)
-{
-    if (errno == EINTR)
-       return TRUE;
-
-    return FALSE;
+    pthread_mutex_unlock(&mnod->mutex);
 }
 
 
 static bool
-eintr(void)
+thread_eintr(void)
 {
     if (errno == EINTR)
        return TRUE;
@@ -4686,7 +4717,7 @@ async_getuserline(struct async_msg *msg)
            NULL,
            NULL,
            NULL,
-           eintr
+           thread_eintr        /* not thread-specific */
        };
 
        getuserline_fdb = fdbopen(&fdf, NULL, -1, FDB_BUFSIZE, 0, 0, 0,
@@ -4774,7 +4805,7 @@ treesize(clientenv *clenv, const char *path, off_t min)
     found = FALSE;
     size = 0;
     len = mm_strlen(path) + 1;
-    pth_mutex_acquire(&quota_lock, FALSE, NULL);
+    pthread_mutex_lock(&quota_lock);
     if ((qnod = (quotanode *)hashtable_lookup(&quota_table, path, len))
            != NULL) {
        struct stat st;
@@ -4792,7 +4823,7 @@ treesize(clientenv *clenv, const char *path, off_t min)
            }
        }
     }
-    pth_mutex_release(&quota_lock);
+    pthread_mutex_unlock(&quota_lock);
 
     if (found)
        return (size);
index a3dacd4..66ee9cf 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmftpd.h,v 1.25 2004/09/19 10:59:08 mmondor Exp $ */
+/* $Id: mmftpd.h,v 1.26 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -43,7 +43,7 @@
 /* HEADERS */
 
 #include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmfd.h>
 #include <mmpath.h>
 #include <mmfifo.h>
 #include <mmstat.h>
 #include <mmserver.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_sleep.h>
 
 
 
@@ -62,7 +65,7 @@
 /* DEFINITIONS */
 
 #define DAEMON_NAME            "mmftpd"
-#define DAEMON_VERSION         "0.0.18/mmondor"
+#define DAEMON_VERSION         "0.1.0/mmondor"
 
 /* Transfer buffer size */
 #define T_BUFSIZE              16384
@@ -132,7 +135,7 @@ enum transferthread_states {
 #define REGISTER_ERROR() do { \
     clenv->errors++; \
     if (CONF.DELAY_ON_ERROR) \
-       pth_sleep(clenv->errors); \
+       pthread_sleep(clenv->errors); \
 } while (FALSE)
 
 #define REGISTER_PERMISSION() do { \
@@ -169,7 +172,7 @@ typedef struct config {
 
 /* We communicate with our transfer thread using this message structure */
 typedef struct transfermsg {
-    pth_message_t msg;           /* Internal message node */
+    pthread_msg_t msg;           /* Internal message node */
     int request;                 /* Request to send to device */
     bool result;                 /* Result status after device request */
     u_int16_t port;              /* PORT or PASV port for next transfer */
@@ -208,9 +211,9 @@ typedef struct clientenv {
     struct lusernode *unode;     /* User's lusernode, for quick lookup */
     char cwd[MMPATH_MAX];        /* User's current directory (chrooted) */
     struct transfermsg tmsg;     /* Only need one shared msg with tthread */
-    pth_msgport_t rport, sport;   /* Message ports (tthread and ours) */
-    pth_event_t rring;           /* Ring of events we wait for */
-    pth_t tthread;               /* Thread ID of our transfer thread */
+    pthread_port_t rport, sport;  /* Message ports (tthread and ours) */
+    pthread_ring_t rring;        /* Ring of events we wait for */
+    pthread_t tthread;           /* Thread ID of our transfer thread */
     long errors;                 /* Total number of errors that occured */
     long attempts;               /* Auth state temporary counter */
     long maxlogins;              /* Allowed simultanious logins for user */
@@ -242,7 +245,7 @@ typedef struct clientenv {
 typedef struct lusernode {
     hashnode_t node;
     char user[32];             /* User name (key) */
-    pth_mutex_t lock;          /* For treesize_edit() */
+    pthread_mutex_t lock;      /* For treesize_edit() */
     long logins;               /* Simultanious current logins for user */
     off_t homesize;            /* Current home directory size for user */
 } lusernode;
@@ -269,7 +272,7 @@ struct commandnode {
 /* Used for mmfd thread support delegation/abstraction */
 struct mutexnode {
     pnode_t node;
-    pth_mutex_t mutex;
+    pthread_mutex_t mutex;
 };
 
 /* Used to efficiently allocate FIFO buffers for recently visited directories
@@ -420,16 +423,15 @@ static int handleclient(unsigned long, int, clientlistnode *, struct iface *,
        struct async_clenv *);
 
 static bool pasv_bind(clientenv *, struct sockaddr_in *, int *, int *);
-static void transferthread(struct thread_object *, void *);
+static void transferthread(pthread_object_t *, void *);
 static void transferthread_main(clientenv *, fdbuf *);
 
-static void *_pth_mutex_create(void);
-static void *_pth_mutex_destroy(void *);
-static void _pth_mutex_lock(void *);
-static void _pth_mutex_unlock(void *);
-static void _pth_thread_yield(void);
-static bool _pth_eintr(void);
-static bool eintr(void);
+static void thread_init(void);
+static void *thread_mutex_create(void);
+static void *thread_mutex_destroy(void *);
+static void thread_mutex_lock(void *);
+static void thread_mutex_unlock(void *);
+static bool thread_eintr(void);
 
 static void async_checkpw(struct async_msg *);
 static void async_treesize(struct async_msg *);
index 7dd9305..d6b420f 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmreadcfg.c,v 1.20 2006/06/13 17:17:02 mmondor Exp $ */
+/* $Id: mmreadcfg.c,v 1.21 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 1991-2004, Matthew Mondor
@@ -61,7 +61,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 1991-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmreadcfg.c,v 1.20 2006/06/13 17:17:02 mmondor Exp $");
+MMRCSID("$Id: mmreadcfg.c,v 1.21 2007/03/13 20:28:22 mmondor Exp $");
 
 
 
index cc5e251..659d020 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $ */
+/* $Id: mmserver.c,v 1.35 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -45,6 +45,8 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <stdio.h>
+#include <string.h>
+#include <errno.h>
 
 #include <netdb.h>
 #include <netinet/in.h>
@@ -58,7 +60,7 @@
 
 #include <syslog.h>
 
-#include <pth.h>
+#include <pthread.h>
 #include <signal.h>
 #include <sys/poll.h>
 #include <time.h>
 #include <mmreadcfg.h>
 #include <mmlimitrate.h>
 #include <mmlog.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_pool.h>
+#include <mm_pthread_msg.h>
+#include <mm_pthread_poll.h>
+#include <mm_pthread_sleep.h>
 
 
 
 
 MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $");
+MMRCSID("$Id: mmserver.c,v 1.35 2007/03/13 20:28:22 mmondor Exp $");
 
 
 
@@ -94,7 +100,7 @@ MMRCSID("$Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $");
 
 static void                    sighandler(int);
 static struct iface *          parse_ifaces(char *, char *);
-static void                    phandleclient(struct thread_object *, void *);
+static void                    phandleclient(pthread_object_t *, void *);
 static void                    writepidfile(const char *);
 static void *                  async_thread(void *);
 static struct async_clenv *    async_open_clenv(void);
@@ -121,7 +127,8 @@ static bool                         clnode_expire_thread_iterator(hashnode_t *,
 static int             (*handleclientfunc)(unsigned long, int,
                                clientlistnode *, struct iface *,
                                struct async_clenv *);
-static pth_mutex_t     ctable_lock, ppool_lock;
+static pthread_mutex_t ctable_lock = PTHREAD_MUTEX_INITIALIZER,
+                       ppool_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t          cpool, ppool;
 static hashtable_t     ctable;
 static struct async_env *async = NULL;
@@ -147,7 +154,7 @@ const char *const mms_reasons[MMS_MAX] = {
 
 /* MAIN DAEMON PROGRAM */
 
-/* Before calling this function, the async_init() and async_init_pth()
+/* Before calling this function, the async_init() and async_init_pthread()
  * functions should have been called. Before any async_*() or tcp_server()
  * calls the process is expected to have called make_daemon().
  */
@@ -168,9 +175,9 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     struct ifacendx *fdsi;
     struct iface *ifaces, *tif;
     unsigned long id;
-    pth_t clnode_thread;
-    pth_attr_t threadattr, clnode_threadattr;
-    int tcp_proto;
+    pthread_t clnode_thread;
+    pthread_attr_t threadattr, clnode_threadattr;
+    int tcp_proto, clnode_thread_started, err;
 
     sinaddr = (struct sockaddr_in *)&addr;
     handleclientfunc = handleclient1;
@@ -180,14 +187,13 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     fds = NULL;
     fdsi = NULL;
     ifaces = NULL;
+    clnode_thread_started = -1;
 
-    /* Pth related */
-    threadattr = pth_attr_new();
-    pth_attr_set(threadattr, PTH_ATTR_JOINABLE, FALSE);
-    clnode_threadattr = pth_attr_new();
-    pth_attr_set(clnode_threadattr, PTH_ATTR_JOINABLE, TRUE);
-    pth_mutex_init(&ctable_lock);
-    pth_mutex_init(&ppool_lock);
+    /* Pthread related */
+    pthread_attr_init(&threadattr);
+    pthread_attr_setdetachstate(&threadattr, 1);
+    pthread_attr_init(&clnode_threadattr);
+    pthread_attr_setdetachstate(&clnode_threadattr, 0);
 
     /* Used by resolve_hostname() */
     if (!mmstrinit(malloc, free, 16384)) {
@@ -209,8 +215,8 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                sizeof(phandleinfo), 16384 / sizeof(phandleinfo), 0, 0) ||
            !hashtable_init(&ctable, "ctable", maxips, 1, malloc, free,
                clnode_keycmp, clnode_keyhash, FALSE) ||
-           ((clnode_thread = pth_spawn(clnode_threadattr,
-                               clnode_expire_thread, &rateper)) == NULL)) {
+           ((clnode_thread_started = pthread_create(&clnode_thread,
+               &clnode_threadattr, clnode_expire_thread, &rateper))) != 0) {
        syslog(LOG_NOTICE, "tcp_server() - Out of memory");
        closelog();
        mmstrexit();
@@ -218,8 +224,9 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
        exit(-1);
     }
 
-    if (thread_object_init() == -1) {
-       syslog(LOG_NOTICE, "tcp_server() - thread_object_init()");
+    if ((err = pthread_object_init(8)) != 0) {
+       syslog(LOG_NOTICE, "tcp_server() - pthread_object_init() - %s",
+               strerror(err));
        exit(-1);
     }
 
@@ -343,9 +350,9 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     if (ret) {
        free(ifaces);
        mmstrexit();
-       if (clnode_thread != NULL) {
-           pth_abort(clnode_thread);
-           pth_join(clnode_thread, NULL);
+       if (clnode_thread_started == 0) {
+           pthread_cancel(clnode_thread);
+           pthread_join(clnode_thread, NULL);
        }
        if (HASHTABLE_VALID(&ctable))
            hashtable_destroy(&ctable, FALSE);
@@ -359,7 +366,7 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     /* Start answering and dispatching connections */
     syslog(LOG_NOTICE, "Started for uid: %d", uid);
     for (;;) {
-       if ((nifaces2 = pth_poll(fds, nifaces, -1)) > 0) {
+       if ((nifaces2 = poll(fds, nifaces, -1)) > 0) {
            /* Loop but once only, and only for long as nifaces2 times.
             * Use our fast index to reference to the interface structure.
             */
@@ -367,14 +374,13 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                if (fds[i].revents & POLLIN) {
                    nifaces2--;
                    addrl = sizeof(struct sockaddr);
-                   if ((msgsock = pth_accept(fds[i].fd, &addr, &addrl))
-                           != -1) {
+                   if ((msgsock = accept(fds[i].fd, &addr, &addrl)) != -1) {
                        /* Make sure that we respect connection and rate
                         * limits.
                         */
                        ok = FALSE;
                        reason = MMS_NORMAL;
-                       pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+                       pthread_mutex_lock(&ctable_lock);
                        if ((clnode = (clientlistnode *)hashtable_lookup(
                                        &ctable, &sinaddr->sin_addr.s_addr,
                                        sizeof(u_int32_t))) == NULL) {
@@ -415,15 +421,15 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                            } else
                                reason = MMS_CONPERADDRESS_EXCEEDED;
                        }
-                       pth_mutex_release(&ctable_lock);
+                       pthread_mutex_unlock(&ctable_lock);
 
                        if (ok) {
                            phandleinfo *phi;
 
                            /* Dispatch client to a thread */
-                           pth_mutex_acquire(&ppool_lock, FALSE, NULL);
+                           pthread_mutex_lock(&ppool_lock);
                            phi = (phandleinfo *)pool_alloc(&ppool, FALSE);
-                           pth_mutex_release(&ppool_lock);
+                           pthread_mutex_unlock(&ppool_lock);
                            if (phi != NULL) {
                                id++;
                                phi->id = id;
@@ -431,15 +437,14 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                                phi->clnode = clnode;
                                /* Indexed reference */
                                phi->iface = fdsi[i].iface;
-                               if (thread_object_call(NULL, phandleclient,
-                                           phi) != -1)
+                               if (pthread_object_call(NULL, phandleclient,
+                                           phi) == 0)
                                    clnode->connections++;
                                else {
                                    if (phi != NULL) {
-                                       pth_mutex_acquire(&ppool_lock, FALSE,
-                                               NULL);
+                                       pthread_mutex_lock(&ppool_lock);
                                        pool_free((pnode_t *)phi);
-                                       pth_mutex_release(&ppool_lock);
+                                       pthread_mutex_unlock(&ppool_lock);
                                    }
                                    syslog(LOG_NOTICE, "tcp_server() - "
                                            "Error launching thread");
@@ -453,7 +458,7 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                            /* Close down connection with client
                             * immediately, sending message.
                             */
-                           (void) pth_write(msgsock, message, msglen);
+                           (void) write(msgsock, message, msglen);
                            shutdown(msgsock, 2);
                            close(msgsock);
                            mm_strcpy(ipaddr, "0.0.0.0");
@@ -463,11 +468,8 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                                syslog(LOG_NOTICE, "tcp_server() - [%s] - %s",
                                        ipaddr, MMS_RSTRING(reason));
                        }
-
-                       pth_mutex_release(&ctable_lock);
-
                    } else
-                       syslog(LOG_NOTICE, "tcp_server() - pth_accept()");
+                       syslog(LOG_NOTICE, "tcp_server() - accept()");
                }
            }
        }
@@ -475,16 +477,11 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
 
     free(ifaces);
     mmstrexit();
-    pth_abort(clnode_thread);
-    pth_join(clnode_thread, NULL);
+    pthread_cancel(clnode_thread);
+    pthread_join(clnode_thread, NULL);
     hashtable_destroy(&ctable, FALSE);
     pool_destroy(&ppool);
     pool_destroy(&cpool);
-    /* Unrequired for Pth (non-existing even)
-     * pth_mutex_destroy(&apool_lock);
-     * pth_mutex_destroy(&ctable_lock);
-     * pth_mutex_destroy(&ppool_lock);
-     */
 
     exit(ret);
 }
@@ -566,6 +563,7 @@ make_daemon(const char *pidfile, const char *jail)
            signal(SIGTTIN, SIG_IGN);
            signal(SIGTSTP, SIG_IGN);
            signal(SIGPIPE, SIG_IGN);
+           signal(SIGUSR2, SIG_IGN);
 
            umask(0);
 
@@ -651,11 +649,11 @@ parse_ifaces(char *hostnames, char *addresses)
 
 /* SERVER (THREAD) */
 
-/* This is started by pth_spawn(), it calls the user handler function, which
- * only has to care about serving the client.
+/* This is started by pthread_create(), it calls the user handler function,
+ * which only has to care about serving the client.
  */
 static void
-phandleclient(struct thread_object *obj, void *args)
+phandleclient(pthread_object_t *obj, void *args)
 {
     phandleinfo *phi;
     int socket, ret;
@@ -671,9 +669,9 @@ phandleclient(struct thread_object *obj, void *args)
     socket = phi->socket;
     clnode = phi->clnode;
     iface = phi->iface;
-    pth_mutex_acquire(&ppool_lock, FALSE, NULL);
+    pthread_mutex_lock(&ppool_lock);
     pool_free((pnode_t *)phi);
-    pth_mutex_release(&ppool_lock);
+    pthread_mutex_unlock(&ppool_lock);
 
     ret = 0;
 
@@ -691,13 +689,13 @@ phandleclient(struct thread_object *obj, void *args)
            }
            if (tmp) {
                /* Lock the mutex for a very short moment */
-               pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+               pthread_mutex_lock(&ctable_lock);
                /* Verify again for NULL to avoid a potential memory leak */
                if(clnode->hostname == NULL)
                    clnode->hostname = tmp;
                else
                    tmp = mmstrfree(tmp);
-               pth_mutex_release(&ctable_lock);
+               pthread_mutex_unlock(&ctable_lock);
            }
        }
 
@@ -719,10 +717,10 @@ phandleclient(struct thread_object *obj, void *args)
     /* Decrease our connection/refcount counter, and let our ctable thread
      * decide if/when to uncache our node.
      */
-    pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+    pthread_mutex_lock(&ctable_lock);
     if (clnode->connections > 0)
        clnode->connections--;
-    pth_mutex_release(&ctable_lock);
+    pthread_mutex_unlock(&ctable_lock);
 
     /* kthxbye :) */
 }
@@ -791,7 +789,7 @@ async_thread(void *args)
        /* Fatal error */
        syslog(LOG_NOTICE, "async_thread() -  calloc(%d)",
                idx_size + pfd_size);
-       pth_exit(NULL);
+       pthread_exit(NULL);
     }
     i = 0;
     DLIST_FOREACH(freeprocs, proc) {
@@ -806,16 +804,14 @@ async_thread(void *args)
 
        /* First check for new messages and queue them in our queue list if
         * any, but only wait for them if all processes are free (we don't
-        * expect any results). Special note: Pth will only notify event for
-        * a port if it received a message while it was empty.
+        * expect any results).
         */
-       if ((pth_msgport_pending(async->port))) {
-           while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
-               DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
-       } else if (freeprocs->nodes == async->nprocs) {
-           pth_wait(async->ring);
-           while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
-               DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
+       while ((omsg = (char *)pthread_msg_get(&async->port)) != NULL)
+           DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
+       if (DLIST_NODES(freeprocs) == async->nprocs &&
+           DLIST_NODES(&queue) == 0) {
+           pthread_ring_wait(&async->ring, NULL);
+           continue;
        }
 
        /* Verify for any available processes to dispatch requests to, and
@@ -832,7 +828,7 @@ async_thread(void *args)
                if (DEBUG_TRUE(m->func_id < async->nfuncs)) {
                    len = async->funcs[m->func_id].msg_len;
                    idx[p->sock].aclenv = m->aclenv;
-                   if ((pth_send(p->sock, m, len, 0)) == len)
+                   if ((send(p->sock, m, len, 0)) == len)
                        DLIST_UNLINK(freeprocs, (node_t *)p);
                    else
                        syslog(LOG_NOTICE, "async_thread() - send(%d:%d)",
@@ -845,57 +841,56 @@ async_thread(void *args)
        }
 
        /* Wait for results from our async processes via their socket,
-        * but be sure to break when new Pth messages are available.
+        * but be sure to break when new inter-thread messages are available.
         * If new results are obtained, reply back to the caller thread.
+        * If all processes are free, go back to pthread_ring_wait()
+        * instead of pthread_poll_ring().
         */
-       for (;;) {
-          int selr, i;
-
-          if ((selr = pth_poll_ev(pfds, nfds, -1, async->ring)) > 0) {
-              for (i = 0; selr > 0 && i < nfds; i++) {
-                  if (pfds[i].revents & POLLERR) {
-                      /* If this happens something is really wrong, exit */
-                      syslog(LOG_NOTICE, "async_thread() - POLLERR(%d)",
-                              pfds[i].fd);
-                      kill(0, SIGTERM);
-                      for (;;)
-                          sleep(30);
-                  }
-                  if (pfds[i].revents & POLLIN) {
-                      int fd = pfds[i].fd;
-                      size_t len;
-                      struct async_proc *p = idx[fd].aproc;
-                      struct async_clenv *e = idx[fd].aclenv;
-
-                      /* Results, reply message back to client thread and
-                       * return this process node in the free pool.
-                       */
-                      if ((len = pth_recv(fd, e->msg, async->msg_len,
-                                      MSG_WAITALL)) <
-                              sizeof(struct async_msg))
-                          syslog(LOG_NOTICE, "async_thread() - recv(%d:%d)",
-                                  fd, (int)async->msg_len);
-                      pth_msgport_reply(&(e->msg->msg));
-                      DLIST_APPEND(freeprocs, (node_t *)p);
-                      selr--;
-                  }
-              }
-          }
-          if ((pth_event_occurred(async->ring)))
-              /* Other requests we must queue immediately */
-              break;
+       while (DLIST_NODES(freeprocs) < async->nprocs) {
+           int selr, i;
+
+           selr = pthread_poll_ring(pfds, nfds, -1, &async->ring);
+           if (selr == -1 && errno == ECANCELED)
+               break;
+           for (i = 0; selr > 0 && i < nfds; i++) {
+               if (pfds[i].revents & POLLERR) {
+                   /* If this happens something is really wrong, exit */
+                   syslog(LOG_NOTICE, "async_thread() - POLLERR(%d)",
+                           pfds[i].fd);
+                   kill(0, SIGTERM);
+                   for (;;)
+                       pthread_sleep(30);
+               }
+               if (pfds[i].revents & POLLIN) {
+                   int                 fd = pfds[i].fd;
+                   size_t              len;
+                   struct async_proc   *p = idx[fd].aproc;
+                   struct async_clenv  *e = idx[fd].aclenv;
+
+                   /* Results, reply message back to client thread and
+                    * return this process node in the free pool.
+                    */
+                   if ((len = recv(fd, e->msg, async->msg_len,
+                       MSG_WAITALL)) < sizeof(struct async_msg))
+                       syslog(LOG_NOTICE, "async_thread() - recv(%d:%d)",
+                               fd, (int)async->msg_len);
+                   pthread_msg_reply(&(e->msg->msg));
+                   DLIST_APPEND(freeprocs, (node_t *)p);
+                   selr--;
+               }
+           }
        }
     }
 
     /* NOTREACHED */
-    pth_exit(NULL);
+    pthread_exit(NULL);
     return (NULL);
 }
 
 
 /* This function should be called at early application startup to setup
- * the asynchroneous pool of processes. async_init_pth() call may then
- * be called later on after pth_init() to initialize the client-side
+ * the asynchroneous pool of processes. async_init_pthread() call may then
+ * be called later on after pthread_init() to initialize the client-side
  * asynchroneous context (required before using async_open_clenv()).
  * This function is assumed to be called once only, and it's effects to
  * only be discarded when the main process exits.
@@ -972,67 +967,65 @@ async_init(struct async_func *funcs, int procs, uid_t uid, gid_t *gids,
 }
 
 
-/* This function is used by an application after pth_init() to initialize
+/* This function is used by an application after pthread_init() to initialize
  * the asynchroneous thread device/server, which will dispatch requests to
  * the asynchroneous pool of processes in a thread-safe manner (without
  * blocking the whole process. async_init() should have been called first.
  */
 bool
-async_init_pth(void)
+async_init_pthread(void)
 {
     size_t env_len;
     bool res = FALSE;
 
     if (DEBUG_TRUE(async)) {
 
+       pthread_poll_init();
+
        res = TRUE;
-       pth_mutex_init(&async->apool_lock);
+       pthread_mutex_init(&async->apool_lock, NULL);
        /* Setup our fast-allocation pool for client-side async environments */
        env_len = (size_t)OALIGN_CEIL(sizeof(struct async_clenv), long);
        if (!pool_init(&async->apool, "asyncenv_pool", malloc, free,
                    aclenv_constructor, aclenv_destructor,
                    env_len + async->msg_len,
                    16384 / (env_len + async->msg_len), 0, 0)) {
-           syslog(LOG_NOTICE, "async_init_pth() - pool_init(apool)");
+           syslog(LOG_NOTICE, "async_init_pthread() - pool_init(apool)");
            res = FALSE;
        }
 
        if (res) {
-           pth_attr_t attrs;
+           pthread_attr_t      attrs;
+           pthread_t           thread;
 
            res = FALSE;
-           if ((attrs = pth_attr_new()) != NULL) {
-               /* Start async slave thread device */
-               if ((async->port = pth_msgport_create("mms_async_server"))) {
-                   if ((async->ring = pth_event(PTH_EVENT_MSG,
-                                   async->port))) {
-                       pth_attr_set(attrs, PTH_ATTR_JOINABLE, FALSE);
-                       if ((pth_spawn(attrs, async_thread, NULL)) != NULL)
-                           res = TRUE;
-                       pth_attr_destroy(attrs);
-                   } else
-                       syslog(LOG_NOTICE, "async_init_pth() - pth_event()");
+           /* Start async slave thread device */
+           if (pthread_port_init(&async->port) == 0) {
+               if (pthread_ring_init(&async->ring) == 0) {
+                   pthread_port_set_ring(&async->port, &async->ring);
+                   pthread_attr_init(&attrs);
+                   pthread_attr_setdetachstate(&attrs, 1);
+                   if ((pthread_create(&thread, &attrs, async_thread, NULL))
+                           == 0)
+                       res = TRUE;
                } else
-                   syslog(LOG_NOTICE,
-                           "async_init_pth() - pth_msgport_create()");
-               pth_attr_destroy(attrs);
-           }
+                   syslog(LOG_NOTICE, "async_init_pth() - pth_event()");
+           } else
+               syslog(LOG_NOTICE,
+                       "async_init_pth() - pth_msgport_create()");
+           pthread_attr_destroy(&attrs);
        }
 
        if (!res) {
-           if (async->ring) {
-               pth_event_free(async->ring, PTH_FREE_ALL);
-               async->ring = NULL;
-           }
-           if (async->port) {
-               pth_msgport_destroy(async->port);
-               async->port = NULL;
-           }
+           if (pthread_port_valid(&async->port))
+               pthread_port_destroy(&async->port);
+           if (pthread_ring_valid(&async->ring))
+               pthread_ring_destroy(&async->ring);
            pool_destroy(&async->apool);
        }
 
     } else
-       DEBUG_PRINTF("async_init_pth", "Call async_init() first");
+       DEBUG_PRINTF("async_init_pthread", "Call async_init() first");
 
     return (res);
 }
@@ -1040,7 +1033,7 @@ async_init_pth(void)
 
 /* Used by a client thread to initialize a context suitable to call the
  * async_call() function with. Each thread needs one to use the device.
- * async_init() and async_init_pth() should have been called first.
+ * async_init() and async_init_pthread() should have been called first.
  */
 static struct async_clenv *
 async_open_clenv(void)
@@ -1049,9 +1042,9 @@ async_open_clenv(void)
 
     if (DEBUG_TRUE(async != NULL && POOL_VALID(&async->apool))) {
        /* Optimized for speed using a pool of pre-allocated nodes */
-       pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
+       pthread_mutex_lock(&async->apool_lock);
        aclenv = (struct async_clenv *)pool_alloc(&async->apool, FALSE);
-       pth_mutex_release(&async->apool_lock);
+       pthread_mutex_unlock(&async->apool_lock);
     } else
        DEBUG_PRINTF("async_open_clenv",
                "Call async_init() and async_init_pth() first");
@@ -1065,9 +1058,9 @@ async_open_clenv(void)
 static struct async_clenv *
 async_close_clenv(struct async_clenv *aclenv)
 {
-    pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
+    pthread_mutex_lock(&async->apool_lock);
     pool_free((pnode_t *)aclenv);
-    pth_mutex_release(&async->apool_lock);
+    pthread_mutex_unlock(&async->apool_lock);
 
     return (NULL);
 }
@@ -1076,18 +1069,18 @@ static bool aclenv_constructor(pnode_t *pnode)
 {
     struct async_clenv *aclenv = (struct async_clenv *)pnode;
 
-    if ((aclenv->port = pth_msgport_create("XXX(NULL)")) != NULL) {
-       if ((aclenv->ring = pth_event(PTH_EVENT_MSG, aclenv->port))) {
-           register char *ptr = (char *)aclenv;
+    if (pthread_port_init(&aclenv->port) == 0) {
+       if (pthread_ring_init(&aclenv->ring) == 0) {
+           register char       *ptr = (char *)aclenv;
 
            ptr += (size_t)OALIGN_CEIL(sizeof(struct async_clenv), long);
            aclenv->msg = (struct async_msg *)ptr;
-           aclenv->msg->msg.m_replyport = aclenv->port;
-           aclenv->msg->msg.m_size = async->msg_len;
+           pthread_msg_init(&aclenv->msg->msg, &aclenv->port);
+           pthread_port_set_ring(&aclenv->port, &aclenv->ring);
 
            return TRUE;
        }
-       pth_msgport_destroy(aclenv->port);
+       pthread_port_destroy(&aclenv->port);
     }
 
     return FALSE;
@@ -1097,8 +1090,9 @@ static void aclenv_destructor(pnode_t *pnode)
 {
     struct async_clenv *aclenv = (struct async_clenv *)pnode;
 
-    pth_event_free(aclenv->ring, PTH_FREE_ALL);
-    pth_msgport_destroy(aclenv->port);
+    pthread_msg_destroy(&aclenv->msg->msg);
+    pthread_port_destroy(&aclenv->port);
+    pthread_ring_destroy(&aclenv->ring);
 }
 
 
@@ -1114,7 +1108,7 @@ async_call(struct async_clenv *aclenv, unsigned int function)
     /* Send request */
     aclenv->msg->func_id = function;
     aclenv->msg->aclenv = aclenv;
-    pth_msgport_put(async->port, &(aclenv->msg->msg));
+    pthread_msg_put(&async->port, &(aclenv->msg->msg));
 
     /* Sleep until we get reply back, in a thread-safe manner.
      * Note: This will permanently block the current thread if the
@@ -1131,15 +1125,8 @@ async_call(struct async_clenv *aclenv, unsigned int function)
      * we get a reply back seems a good choice here; The async functions
      * should ensure to eventually return.
      */
-    for (;;) {
-       if ((pth_wait(aclenv->ring))) {
-           if ((pth_event_occurred(aclenv->ring))) {
-               /* We know that message pointer will be the same */
-               if ((pth_msgport_get(aclenv->port)))
-                   break;
-           }
-       }
-    }
+    while (pthread_msg_get(&aclenv->port) == NULL)
+       pthread_ring_wait(&aclenv->ring, NULL);
 }
 
 
@@ -1356,7 +1343,7 @@ clnode_expire_thread(void *args)
     data.cnt = 0;
     for (;;) {
        /* Sleep until it is known that at least one node expired */
-       pth_sleep((unsigned int)data.soonest);
+       pthread_sleep((unsigned int)data.soonest);
        /* Tell our iterator function the current time and the maximum
         * allowed time to wait to
         */
@@ -1365,14 +1352,14 @@ clnode_expire_thread(void *args)
        /* Lock ctable, reset expired nodes, garbage collect, and set
         * data.soonest to the time of the soonest next expireing node.
         */
-       pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+       pthread_mutex_lock(&ctable_lock);
        if (HASHTABLE_NODES(&ctable) > 0)
            hashtable_iterate(&ctable, clnode_expire_thread_iterator, &data);
-       pth_mutex_release(&ctable_lock);
+       pthread_mutex_unlock(&ctable_lock);
     }
 
     /* NOTREACHED */
-    pth_exit(NULL);
+    pthread_exit(NULL);
     return NULL;
 }
 
@@ -1403,11 +1390,5 @@ clnode_expire_thread_iterator(hashnode_t *hnod, void *udata)
     if (rem != 0 && data->soonest > rem)
        data->soonest = rem;
 
-    /* If the cache is big, prevent from interfering with other threads */
-    if ((data->cnt++) == 64) {
-       data->cnt = 0;
-       pth_yield(NULL);
-    }
-
     return TRUE;
 }
index 9bf088a..d730e59 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmserver.h,v 1.10 2004/06/09 20:58:54 mmondor Exp $ */
+/* $Id: mmserver.h,v 1.11 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -43,7 +43,7 @@
 
 
 #include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmpool.h>
@@ -51,6 +51,8 @@
 #include <mmhash.h>
 #include <mmlimitrate.h>
 
+#include <mm_pthread_msg.h>
+
 
 
 
@@ -146,7 +148,7 @@ struct ifacendx {
  */
 struct async_msg {
     pnode_t node;
-    pth_message_t msg;
+    pthread_msg_t msg;
     struct async_clenv *aclenv;
     unsigned int func_id;
     /* User data structure here, user async functions should not touch
@@ -169,10 +171,10 @@ struct async_proc {
 
 /* Global async server context environment */
 struct async_env {
-    pth_t slave;
-    pth_msgport_t port;
-    pth_event_t ring;
-    pth_mutex_t apool_lock;
+    pthread_t slave;
+    pthread_port_t port;
+    pthread_ring_t ring;
+    pthread_mutex_t apool_lock;
     struct async_func *funcs;
     unsigned int nfuncs, nprocs;
     size_t msg_len;
@@ -190,8 +192,8 @@ struct async_idx {
 /* Client/thread specific async context */
 struct async_clenv {
     pnode_t node;
-    pth_msgport_t port;
-    pth_event_t ring;
+    pthread_port_t port;
+    pthread_ring_t ring;
     struct async_msg *msg;
 };
 
@@ -220,7 +222,7 @@ extern void tcp_server(char *, char *, char *, uid_t, gid_t *, int,
 extern bool    make_daemon(const char *, const char *);
 
 extern bool    async_init(struct async_func *, int, uid_t, gid_t *, int);
-extern bool    async_init_pth(void);
+extern bool    async_init_pthread(void);
 extern void    async_call(struct async_clenv *, unsigned int);
 
 extern char *  resolve_hostname(struct async_clenv *, struct sockaddr *);
index f702e72..37e47f3 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmsql.c,v 1.13 2004/12/03 17:29:50 mmondor Exp $ */
+/* $Id: mmsql.c,v 1.14 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -69,7 +69,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmsql.c,v 1.13 2004/12/03 17:29:50 mmondor Exp $");
+MMRCSID("$Id: mmsql.c,v 1.14 2007/03/13 20:28:22 mmondor Exp $");
 
 
 
@@ -93,7 +93,6 @@ mmsql_init(struct mmsql_threadsupport *funcs)
     thrfuncs.mutex_destroy = funcs->mutex_destroy;
     thrfuncs.mutex_lock = funcs->mutex_lock;
     thrfuncs.mutex_unlock = funcs->mutex_unlock;
-    thrfuncs.thread_yield = funcs->thread_yield;
     thrfuncs.thread_sleep = funcs->thread_sleep;
 
     if ((mmsql_lock = thrfuncs.mutex_init()) != NULL)
index 0202e94..16de574 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmsql.h,v 1.8 2004/08/21 10:00:12 mmondor Exp $ */
+/* $Id: mmsql.h,v 1.9 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -60,8 +60,7 @@ struct mmsql_threadsupport {
     void * (*mutex_destroy)(void *);
     void (*mutex_lock)(void *);
     void (*mutex_unlock)(void *);
-    void (*thread_yield)(void);
-    void (*thread_sleep)(int);
+    unsigned int (*thread_sleep)(unsigned int);
 };
 
 
index e5f916c..3eb5598 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $ */
+/* $Id: mmstr.c,v 1.9 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -39,7 +39,7 @@
 /* INCLUDES */
 
 #include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmstring.h>
@@ -51,7 +51,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $");
+MMRCSID("$Id: mmstr.c,v 1.9 2007/03/13 20:28:22 mmondor Exp $");
 
 
 
@@ -59,7 +59,7 @@ MMRCSID("$Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $");
 /* GLOBALS */
 
 static pool_t mmstrp;
-static pth_mutex_t mmstrp_lock = PTH_MUTEX_INIT;
+static pthread_mutex_t mmstrp_lock = PTHREAD_MUTEX_INITIALIZER;
 
 
 
@@ -76,13 +76,13 @@ mmstrinit(void *(*allocfunc)(size_t), void (*freefunc)(void *),
 {
     bool ok = FALSE;
 
-    pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+    pthread_mutex_lock(&mmstrp_lock);
     if (!POOL_VALID(&mmstrp))
        ok = pool_init(&mmstrp, "mmstr_pool", allocfunc, freefunc, NULL, NULL,
                sizeof(mmstrnode), buffer / sizeof(mmstrnode), 0, 0);
     else
        ok = TRUE;
-    pth_mutex_release(&mmstrp_lock);
+    pthread_mutex_unlock(&mmstrp_lock);
 
     return ok;
 }
@@ -92,9 +92,9 @@ mmstrinit(void *(*allocfunc)(size_t), void (*freefunc)(void *),
 void
 mmstrexit(void)
 {
-    pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+    pthread_mutex_lock(&mmstrp_lock);
     pool_destroy(&mmstrp);
-    pth_mutex_release(&mmstrp_lock);
+    pthread_mutex_unlock(&mmstrp_lock);
 }
 
 
@@ -113,9 +113,9 @@ mmstrdup(const char *str)
      */
     for (ptr = str; *ptr != '\0'; ptr++) ;
     if ((len = (size_t)(ptr - str) + 1) < 256) {
-       pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+       pthread_mutex_lock(&mmstrp_lock);
        nod = (mmstrnode *)pool_alloc(&mmstrp, FALSE);
-       pth_mutex_release(&mmstrp_lock);
+       pthread_mutex_unlock(&mmstrp_lock);
        if (nod != NULL) {
            mm_memcpy(nod->string, str, len);
            return nod->string;
@@ -135,9 +135,9 @@ mmstralloc(size_t size)
     register mmstrnode *nod = NULL;
 
     if (size < 256) {
-       pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+       pthread_mutex_lock(&mmstrp_lock);
        nod = (mmstrnode *)pool_alloc(&mmstrp, FALSE);
-       pth_mutex_release(&mmstrp_lock);
+       pthread_mutex_unlock(&mmstrp_lock);
        if (nod != NULL) {
            *nod->string = '\0';
            return nod->string;
@@ -155,9 +155,9 @@ mmstrfree(char *str)
     /* Evaluate actual node pointer from supplied string pointer and free it */
     str -= sizeof(pnode_t);
 
-    pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+    pthread_mutex_lock(&mmstrp_lock);
     pool_free((pnode_t *)str);
-    pth_mutex_release(&mmstrp_lock);
+    pthread_mutex_unlock(&mmstrp_lock);
 
     return NULL;
 }
index 6beaffd..15e7897 100644 (file)
@@ -1,4 +1,20 @@
-$Id: ChangeLog,v 1.52 2005/11/17 07:38:07 mmondor Exp $
+$Id: ChangeLog,v 1.53 2007/03/13 20:28:22 mmondor Exp $
+
+
+
+Release: mmmail 0.1.0 devl
+Date   : March 13, 2007
+By     : Matthew Mondor
+
+* Performance enhancements
+  - Now uses pthread instead of pth, using the new pthread_utils library
+    for functionality which pth had but pthread lacks.  This allows to
+    execute less functions through dedicated processes (which also require
+    some send(2)/recv(2) overhead), because of the expected preemptive
+    nature of most pthread implementations.  It also allows better scaling
+    with SMP.
+  - Uses PTHREAD_MUTEX_INITIALIZER where possible instead of
+    pthread_mutex_init().
 
 
 
index 9ad6850..f1e92e9 100644 (file)
@@ -1,16 +1,15 @@
-# $Id: GNUmakefile,v 1.7 2005/01/23 14:47:34 mmondor Exp $
+# $Id: GNUmakefile,v 1.8 2007/03/13 20:28:22 mmondor Exp $
 
 MMLIBS := $(addprefix ../mmlib/,mmfd.o mmhash.o mmlimitrate.o mmsql.o \
-mmlog.o mmpool.o mmreadcfg.o mmserver.o mm_pth_pool.o mmstat.o mmstr.o \
-mmstring.o)
+mmlog.o mmpool.o mmreadcfg.o mmserver.o mmstat.o mmstr.o \
+mmstring.o) $(addprefix ../pthread_util/,mm_pthread_msg.o \
+mm_pthread_poll.o mm_pthread_pool.o mm_pthread_sleep.o)
 
-PTH_CFLAGS := $(shell pth-config --cflags)
-PTH_LDFLAGS := $(shell pth-config --ldflags --libs)
 MYSQL_CFLAGS := $(shell mysql_config --cflags)
 MYSQL_LDFLAGS := $(shell mysql_config --libs)
 
-CFLAGS += $(PTH_CFLAGS) $(MYSQL_CFLAGS) -I. -I../mmlib
-LDFLAGS += $(PTH_LDFLAGS) $(MYSQL_LDFLAGS) -lc -lcrypt
+CFLAGS += $(MYSQL_CFLAGS) -I. -I../mmlib -I../pthread_util
+LDFLAGS += $(MYSQL_LDFLAGS) -lc -lcrypt -lpthread
 
 OBJS := src/mmsmtpd/mmsmtpd.o src/mmpop3d/mmpop3d.o src/mmrelayd/mmrelayd.o
 BINS := src/mmsmtpd/mmsmtpd src/mmpop3d/mmpop3d src/mmrelayd/mmrelayd
@@ -18,17 +17,19 @@ BINS := src/mmsmtpd/mmsmtpd src/mmpop3d/mmpop3d src/mmrelayd/mmrelayd
 CFLAGS += -Wall
 #CFLAGS += -DMMMAIL_MYSQL
 CFLAGS += -DMMMAIL_FILE
+#CFLAGS += -DNODETACH -DDEBUG -DPTHREAD_DEBUG
+#LDFLAGS += -lpthread_dbg
 
 
 all: $(BINS)
 
 # To compile all C files to objects
 %.o: %.c
-       cc -c ${CFLAGS} -o $@ $<
+       $(CC) -c ${CFLAGS} -o $@ $<
 
 # To link all executables (binaries)
 $(BINS): $(MMLIBS) $(OBJS)
-       cc -o $@ ${LDFLAGS} ${MMLIBS} $@.o
+       $(CC) -o $@ ${LDFLAGS} ${MMLIBS} $@.o
 
 
 install:
index ab1de52..fe0e72b 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmpop3d.c,v 1.40 2005/03/05 15:33:33 mmondor Exp $ */
+/* $Id: mmpop3d.c,v 1.41 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -59,7 +59,8 @@
 #include <time.h>
 
 #include <ctype.h>
-#include <pth.h>
+
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmreadcfg.h>
@@ -74,6 +75,8 @@
 #include <mmstring.h>
 #include <mmstat.h>
 
+#include <mm_pthread_sleep.h>
+
 #include "mmpop3d.h"
 
 
@@ -81,7 +84,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmpop3d.c,v 1.40 2005/03/05 15:33:33 mmondor Exp $");
+MMRCSID("$Id: mmpop3d.c,v 1.41 2007/03/13 20:28:22 mmondor Exp $");
 
 
 
@@ -152,11 +155,11 @@ static const struct state states[] = {
 static int LOGLEVEL;
 
 /* Pool used to allocate clientenv nodes */
-static pth_mutex_t clenv_lock;
+static pthread_mutex_t clenv_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t clenv_pool;
 
 /* Pool used to optimize creating/destroying mmfd mutexes */
-static pth_mutex_t mutexes_lock;
+static pthread_mutex_t mutexes_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t mutexes_pool;
 
 /* Used for fast command lookup */
@@ -170,17 +173,17 @@ static fdbcontext fdbc;
 static fdfuncs gfdf = {
     malloc,
     free,
-    pth_poll,
-    pth_read,
-    pth_write,
-    pth_sleep,
-    pth_usleep,
-    _pth_mutex_create,
-    _pth_mutex_destroy,
-    _pth_mutex_lock,
-    _pth_mutex_unlock,
-    _pth_thread_yield,
-    _pth_eintr
+    poll,
+    read,
+    write,
+    pthread_sleep,
+    pthread_usleep,
+    thread_mutex_create,
+    thread_mutex_destroy,
+    thread_mutex_lock,
+    thread_mutex_unlock,
+    NULL,
+    thread_eintr
 };
 
 
@@ -193,7 +196,7 @@ main(int argc, char **argv)
 {
     uid_t uid;
     gid_t *gids;
-    char *conf_file = "/etc/mmpop3d.conf";
+    char *conf_file = "/usr/local/etc/mmpop3d.conf";
     int ret = -1, ngids;
     long facility;
     char *db_host;
@@ -255,12 +258,11 @@ main(int argc, char **argv)
        {NULL, 0}
     };
     struct mmsql_threadsupport mmsqlfuncs = {
-       _pth_mutex_create,
-       _pth_mutex_destroy, 
-       _pth_mutex_lock,
-       _pth_mutex_unlock,
-       _pth_thread_yield,
-       _pth_thread_sleep
+       thread_mutex_create,
+       thread_mutex_destroy, 
+       thread_mutex_lock,
+       thread_mutex_unlock,
+       pthread_sleep
     };
     mmstat_t vstat;
 
@@ -380,10 +382,7 @@ main(int argc, char **argv)
     async_init(afuncs, CONF.ASYNC_PROCESSES, uid, gids, ngids);
 
     /* Things which shouldn't be part of the async pool processes */
-    pth_init();
-    async_init_pth();
-    pth_mutex_init(&clenv_lock);
-    pth_mutex_init(&mutexes_lock);
+    async_init_pthread();
 
     /* Allocate necessary pools */
     /* Client nodes */
@@ -399,6 +398,7 @@ main(int argc, char **argv)
 
     if (hash_commands(commands, 3) && POOL_VALID(&clenv_pool) &&
            POOL_VALID(&mutexes_pool) && strlist) {
+       thread_init();
        fdbcinit(&gfdf, &fdbc, CONF.GBANDWIDTH_IN * 1024,
                CONF.GBANDWIDTH_OUT * 1024);
        mmsql_init(&mmsqlfuncs);
@@ -886,9 +886,9 @@ alloc_clientenv(void)
 {
     clientenv *clenv;
 
-    pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+    pthread_mutex_lock(&clenv_lock);
     clenv = (clientenv *)pool_alloc(&clenv_pool, TRUE);
-    pth_mutex_release(&clenv_lock);
+    pthread_mutex_unlock(&clenv_lock);
 
     if (clenv != NULL) {
        mmstat_init(&clenv->pstat, TRUE, FALSE);
@@ -939,9 +939,9 @@ free_clientenv(clientenv *clenv)
     if (clenv->index != NULL)
        free(clenv->index);
 
-    pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+    pthread_mutex_lock(&clenv_lock);
     pool_free((pnode_t *)clenv);
-    pth_mutex_release(&clenv_lock);
+    pthread_mutex_unlock(&clenv_lock);
 
     return (NULL);
 }
@@ -1076,7 +1076,7 @@ valid_host(char *host)
      */
     ptr = host;
     while (*ptr != '\0') {
-       if (!isalnum(*ptr) || *ptr == ' ')
+       if (!isalnum((int)*ptr) || *ptr == ' ')
            return (FALSE);
        else {
            /* Find next host part */
@@ -1102,7 +1102,7 @@ valid_host(char *host)
 inline static bool
 valid_char(char c)
 {
-    return (isalnum(c) || c == '.' || c == '-' || c == '_');
+    return (isalnum((int)c) || c == '.' || c == '-' || c == '_');
 }
 
 
@@ -1747,70 +1747,66 @@ handleclient(unsigned long id, int fd, clientlistnode *clientlnode,
 /* mmfd library thread support functions */
 
 
+static pthread_mutexattr_t     thread_ma;
+
+static void
+thread_init(void)
+{
+    (void) pthread_mutexattr_init(&thread_ma);
+    (void) pthread_mutexattr_settype(&thread_ma, PTHREAD_MUTEX_RECURSIVE);
+}
+
+
 static void *
-_pth_mutex_create(void)
+thread_mutex_create(void)
 {
     struct mutexnode *mnod;
 
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_lock(&mutexes_lock);
     mnod = (struct mutexnode *)pool_alloc(&mutexes_pool, FALSE);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
     if (mnod != NULL)
-       pth_mutex_init(&mnod->mutex);
+       pthread_mutex_init(&mnod->mutex, &thread_ma);
 
     return ((void *)mnod);
 }
 
 
 static void *
-_pth_mutex_destroy(void *mtx)
+thread_mutex_destroy(void *mtx)
 {
-    /* struct mutexnode *mnod = mtx; */
+    struct mutexnode *mnod = mtx;
 
-    /* pth_mutex_destroy(&mnod->mutex); */
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_destroy(&mnod->mutex);
+    pthread_mutex_lock(&mutexes_lock);
     pool_free(mtx);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
     return (NULL);
 }
 
 
 static void
-_pth_mutex_lock(void *mtx)
+thread_mutex_lock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_acquire(&mnod->mutex, FALSE, NULL);
+    pthread_mutex_lock(&mnod->mutex);
 }
 
 
 static void
-_pth_mutex_unlock(void *mtx)
+thread_mutex_unlock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_release(&mnod->mutex);
-}
-
-
-static void
-_pth_thread_yield(void)
-{
-    pth_yield(NULL);
-}
-
-
-static void
-_pth_thread_sleep(int secs)
-{
-    pth_sleep(secs);
+    pthread_mutex_unlock(&mnod->mutex);
 }
 
 
 static bool
-_pth_eintr(void)
+thread_eintr(void)
 {
     if (errno == EINTR)
        return TRUE;
index 7b5d296..7f5caff 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmpop3d.h,v 1.16 2005/03/05 15:33:34 mmondor Exp $ */
+/* $Id: mmpop3d.h,v 1.17 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -44,7 +44,7 @@
 
 #include <sys/types.h>
 
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmhash.h>
 #include <mmfd.h>
 #include <mmstat.h>
 
+#include <mm_pthread_sleep.h>
+
 
 
 
 /* DEFINITIONS */
 
 #define DAEMON_NAME    "mmpop3d"
-#define DAEMON_VERSION "mmmail-0.0.24/mmondor"
+#define DAEMON_VERSION "mmmail-0.1.0/mmondor"
 
 /* Negative states are used by the state swapper, others are real states */
 #define STATE_ERROR    -3
@@ -76,7 +78,7 @@
 #define REGISTER_ERROR(x) do { \
     (x)->errors++; \
        if (CONF.DELAY_ON_ERROR) \
-           pth_sleep((x)->errors); \
+           pthread_sleep((x)->errors); \
 } while(0)
 
 
@@ -143,7 +145,7 @@ typedef struct clientenv {
 /* Used for mmfd thread support delegation/abstraction */
 struct mutexnode {
     pnode_t node;
-    pth_mutex_t mutex;
+    pthread_mutex_t mutex;
 };
 
 /* This defines a state */
@@ -225,13 +227,12 @@ inline static bool valid_char(char);
 static int handleclient(unsigned long, int, clientlistnode *, struct iface *,
        struct async_clenv *);
 
-static void *_pth_mutex_create(void);
-static void *_pth_mutex_destroy(void *);
-static void _pth_mutex_lock(void *);
-static void _pth_mutex_unlock(void *);
-static void _pth_thread_yield(void);
-static void _pth_thread_sleep(int);
-static bool _pth_eintr(void);
+static void thread_init(void);
+static void *thread_mutex_create(void);
+static void *thread_mutex_destroy(void *);
+static void thread_mutex_lock(void *);
+static void thread_mutex_unlock(void *);
+static bool thread_eintr(void);
 
 static void async_checkpw(struct async_msg *);
 static bool checkpw(clientenv *, const char *, const char *);
index 1001b2e..3c9759e 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmsmtpd.c,v 1.74 2005/06/27 17:54:52 mmondor Exp $ */
+/* $Id: mmsmtpd.c,v 1.75 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
 
 #include <syslog.h>
 
-#include <pth.h>
 #include <signal.h>
 #include <time.h>
 #include <dirent.h>
 
 #include <ctype.h>
 
+#include <pthread.h>
+
 #include <mmtypes.h>
 #include <mmreadcfg.h>
 #include <mmfd.h>
@@ -82,7 +83,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmsmtpd.c,v 1.74 2005/06/27 17:54:52 mmondor Exp $");
+MMRCSID("$Id: mmsmtpd.c,v 1.75 2007/03/13 20:28:22 mmondor Exp $");
 
 
 
@@ -131,19 +132,19 @@ static int LOGLEVEL;
 
 /* Used for clenv allocation buffering */
 static pool_t clenv_pool;
-static pth_mutex_t clenv_lock;
+static pthread_mutex_t clenv_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* Used for the flood protection cache */
 static pool_t hosts_pool;
 static hashtable_t hosts_table;
-static pth_mutex_t hosts_lock;
+static pthread_mutex_t hosts_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* Used for rcpt allocation buffering */
 static pool_t rcpt_pool;
-static pth_mutex_t rcpt_lock;
+static pthread_mutex_t rcpt_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* Pool used to optimize creating/destroying mmfd mutexes */
-static pth_mutex_t mutexes_lock;
+static pthread_mutex_t mutexes_lock = PTHREAD_MUTEX_INITIALIZER;
 static pool_t mutexes_pool;
 
 /* For fast command lookup */
@@ -181,17 +182,17 @@ static const struct reply_messages data_msg[DATA_MAX] = {
 static fdfuncs gfdf = {
     malloc,
     free,
-    pth_poll,
-    pth_read,
-    pth_write,
-    pth_sleep,
-    pth_usleep,
-    _pth_mutex_create,
-    _pth_mutex_destroy,
-    _pth_mutex_lock,
-    _pth_mutex_unlock,
-    _pth_thread_yield,
-    _pth_eintr
+    poll,
+    read,
+    write,
+    pthread_sleep,
+    pthread_usleep,
+    thread_mutex_create,
+    thread_mutex_destroy,
+    thread_mutex_lock,
+    thread_mutex_unlock,
+    NULL,
+    thread_eintr
 };
 
 /*
@@ -241,7 +242,7 @@ static const unsigned char valid_addr_host_table[256] = {
  * Connection to mmrelayd(8) establishment
  */
 int            relayd_sock = -1;
-pth_mutex_t    relayd_lock = PTH_MUTEX_INIT;
+pthread_mutex_t        relayd_lock = PTHREAD_MUTEX_INITIALIZER;
 
 
 
@@ -253,7 +254,7 @@ main(int argc, char **argv)
 {
     uid_t uid;
     gid_t *gids;
-    char *conf_file = "/etc/mmsmtpd.conf";
+    char *conf_file = "/usr/local/etc/mmsmtpd.conf";
     int ngids, ret = -1;
     long facility;
     char *db_host;
@@ -339,17 +340,16 @@ main(int argc, char **argv)
        {NULL, 0}
     };
     struct mmsql_threadsupport mmsqlfuncs = {
-       _pth_mutex_create,
-       _pth_mutex_destroy,
-       _pth_mutex_lock,
-       _pth_mutex_unlock,
-       _pth_thread_yield,
-       _pth_thread_sleep
+       thread_mutex_create,
+       thread_mutex_destroy,
+       thread_mutex_lock,
+       thread_mutex_unlock,
+       pthread_sleep
     };
     mmstat_t vstat;
-    pth_t hosts_table_thread = NULL;
-    pth_t mmmail_db_gc_thread = NULL;
-    pth_attr_t threadattr;
+    pthread_t hosts_table_thread = NULL;
+    pthread_t mmmail_db_gc_thread = NULL;
+    pthread_attr_t threadattr;
 
     /* Set defaults */
     *CONF.CHROOT_DIR = 0;
@@ -506,12 +506,7 @@ main(int argc, char **argv)
     async_init(afuncs, CONF.ASYNC_PROCESSES, uid, gids, ngids);
 
     /* Things which shouldn't be part of the async pool processes */
-    pth_init();
-    async_init_pth();
-    pth_mutex_init(&clenv_lock);
-    pth_mutex_init(&hosts_lock);
-    pth_mutex_init(&rcpt_lock);
-    pth_mutex_init(&mutexes_lock);
+    async_init_pthread();
 
     /* Allocate necessary pools */
     /* Client nodes */
@@ -526,20 +521,21 @@ main(int argc, char **argv)
     pool_init(&mutexes_pool, "mutexes_pool", malloc, free, NULL, NULL,
            sizeof(struct mutexnode),
            (16384 * CONF.ALLOC_BUFFERS) / sizeof(struct mutexnode), 0, 0);
+
+    pthread_attr_init(&threadattr);
+    pthread_attr_setdetachstate(&threadattr, 0);
+
     /* Rate nodes */
     if (CONF.FLOOD_PROTECTION) {
        pool_init(&hosts_pool, "hosts_pool", malloc, free, NULL, NULL,
                sizeof(hostnode), CONF.FLOOD_CACHE, 1, 1);
        hashtable_init(&hosts_table, "hosts_table", CONF.FLOOD_CACHE, 1,
                malloc, free, mm_memcmp, mm_memhash32, FALSE);
-       threadattr = pth_attr_new();
-       pth_attr_set(threadattr, PTH_ATTR_JOINABLE, TRUE);
-       hosts_table_thread = pth_spawn(threadattr, hosts_expire_thread, NULL);
+       pthread_create(&hosts_table_thread, &threadattr, hosts_expire_thread,
+                      NULL);
     }
     /* Launch box directories cleaning thread */
-    threadattr = pth_attr_new();
-    pth_attr_set(threadattr, PTH_ATTR_JOINABLE, TRUE);
-    mmmail_db_gc_thread = pth_spawn(threadattr, db_gc_thread, NULL);
+    pthread_create(&mmmail_db_gc_thread, &threadattr, db_gc_thread, NULL);
     /* mmstr nodes */
     strlist = mmstrinit(malloc, free, 65536 * CONF.ALLOC_BUFFERS);
 
@@ -548,6 +544,7 @@ main(int argc, char **argv)
            (!CONF.FLOOD_PROTECTION || (POOL_VALID(&hosts_pool) &&
                    HASHTABLE_VALID(&hosts_table) &&
                    hosts_table_thread != NULL))) {
+       thread_init();
        fdbcinit(&gfdf, &fdbc, CONF.GBANDWIDTH_IN * 1024,
                CONF.GBANDWIDTH_OUT * 1024);
        mmsql_init(&mmsqlfuncs);
@@ -569,6 +566,7 @@ main(int argc, char **argv)
 
     if (strlist)
        mmstrexit();
+    /* XXX
     if (hosts_table_thread != NULL) {
        pth_abort(hosts_table_thread);
        pth_join(hosts_table_thread, NULL);
@@ -577,6 +575,7 @@ main(int argc, char **argv)
        pth_abort(mmmail_db_gc_thread);
        pth_join(mmmail_db_gc_thread, NULL);
     }
+    */
     if (HASHTABLE_VALID(&command_table))
        hashtable_destroy(&command_table, FALSE);
     if (POOL_VALID(&command_pool))
@@ -745,13 +744,18 @@ all_mail(clientenv *clenv)
            if ((mm_strncasecmp(" FROM:<>", &clenv->buffer[4], 8)) == 0) {
                /* Some systems use empty MAIL FROM like this, make sure
                 * that IP address or hostname is allowed to do this.
+                * If so, we also want to make sure not to perform any type
+                * of envelope based filtering for this post.
                 */
                valid = check_nofrom(clenv->c_ipaddr, clenv->c_hostname);
                if (valid)
                    *addr = '\0';
-           } else
+               clenv->nofrom = TRUE;
+           } else {
                valid = valid_address(clenv, addr, 128, clenv->buffer,
                        (CONF.RESOLVE_MX_MAIL) ? HOST_RES_MX : HOST_NORES);
+               clenv->nofrom = FALSE;
+           }
 
            if (valid) {
                if ((clenv->from = (char *)mmstrdup(addr)) != NULL)
@@ -882,8 +886,11 @@ all_rcpt(clientenv *clenv)
 
     /* These only apply to local addresses */
     if (!relay) {
-       /* Ensure to observe allow filters if any set for box */
-       if (boxinfo.filter) {
+       /*
+        * Ensure to observe allow filters if any set for box, except for mail
+        * with an empty FROM address from allowed servers
+        */
+       if (boxinfo.filter && !clenv->nofrom) {
            if (!box_filter_allow(addr, clenv->from, boxinfo.filter_type)) {
                reason = RCPT_FILTER;
                if (CONF.STATFAIL_FILTER)
@@ -912,20 +919,13 @@ all_rcpt(clientenv *clenv)
      */
     {
        register rcptnode *rnode;
-       register int cnt;
 
        ahash = mm_strhash64(addr);
-       cnt = 0;
        DLIST_FOREACH(&clenv->rcpt, rnode) {
            if (rnode->hash == ahash) {
                reason = RCPT_EXISTS;
                goto end;
            }
-           cnt++;
-           if (cnt > 64) {
-               cnt = 0;
-               pth_yield(NULL);
-           }
        }
     }
 
@@ -944,7 +944,7 @@ all_rcpt(clientenv *clenv)
        len = mm_strlen(entry);
 
        valid = TRUE;
-       pth_mutex_acquire(&hosts_lock, FALSE, NULL);
+       pthread_mutex_lock(&hosts_lock);
        /* First acquire our hostnode, or create it if required */
        if ((hnod = (hostnode *)hashtable_lookup(&hosts_table, entry, len + 1))
                == NULL) {
@@ -972,7 +972,7 @@ all_rcpt(clientenv *clenv)
                        clenv->id, LR_POSTS(&hnod->lr), CONF.FLOOD_EXPIRES);
            }
        }
-       pth_mutex_release(&hosts_lock);
+       pthread_mutex_unlock(&hosts_lock);
 
        if (!valid) {
            if (CONF.STATFAIL_FLOOD)
@@ -988,9 +988,9 @@ all_rcpt(clientenv *clenv)
        register rcptnode *rnode;
 
        reason = RCPT_ERROR;
-       pth_mutex_acquire(&rcpt_lock, FALSE, NULL);
+       pthread_mutex_lock(&rcpt_lock);
        rnode = (rcptnode *)pool_alloc(&rcpt_pool, FALSE);
-       pth_mutex_release(&rcpt_lock);
+       pthread_mutex_unlock(&rcpt_lock);
        if (rnode != NULL) {
            mm_strcpy(rnode->address, (relay ? foraddr : addr));
            mm_strcpy(rnode->foraddress, foraddr);
@@ -1151,9 +1151,9 @@ alloc_clientenv(void)
 {
     clientenv *clenv;
 
-    pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+    pthread_mutex_lock(&clenv_lock);
     clenv = (clientenv *)pool_alloc(&clenv_pool, TRUE);
-    pth_mutex_release(&clenv_lock);
+    pthread_mutex_unlock(&clenv_lock);
 
     if (clenv != NULL) {
        mmstat_init(&clenv->vstat, TRUE, TRUE);
@@ -1190,9 +1190,9 @@ free_clientenv(clientenv *clenv)
        mmstrfree(clenv->from);
     empty_rcpts(&clenv->rcpt);
 
-    pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+    pthread_mutex_lock(&clenv_lock);
     pool_free((pnode_t *)clenv);
-    pth_mutex_release(&clenv_lock);
+    pthread_mutex_unlock(&clenv_lock);
 
     return (NULL);
 }
@@ -1206,7 +1206,7 @@ empty_rcpts(list_t *lst)
 {
     node_t *nod, *tmp;
 
-    pth_mutex_acquire(&rcpt_lock, FALSE, NULL);
+    pthread_mutex_lock(&rcpt_lock);
 
     for (nod = DLIST_TOP(lst); nod != NULL; nod = tmp) {
        tmp = DLIST_NEXT(nod);
@@ -1214,7 +1214,7 @@ empty_rcpts(list_t *lst)
     }
     DLIST_INIT(lst);
 
-    pth_mutex_release(&rcpt_lock);
+    pthread_mutex_unlock(&rcpt_lock);
 }
 
 
@@ -1246,7 +1246,7 @@ check_alias(char *addr)
     if ((mysqlres = mmsql_query(query, mm_strlen(query))) != NULL) {
        if ((mysql_num_rows(mysqlres)) > 0) {
            char pat[64];
-           int cur = 0, max = -1, cnt = 0;
+           int cur = 0, max = -1;
            MYSQL_ROW *row;
            unsigned long *lengths;
 
@@ -1266,11 +1266,6 @@ check_alias(char *addr)
                        }
                    }
                }
-               cnt++;
-               if (cnt > 64) {
-                   cnt = 0;
-                   pth_yield(NULL);
-               }
            }
            if (max > -1)
                res = TRUE;
@@ -1297,7 +1292,6 @@ check_nofrom(const char *addr, const char *host)
     if ((mysqlres = mmsql_query("SELECT nofrom_pattern FROM nofrom", -1))
            != NULL) {
        if ((mysql_num_rows(mysqlres)) > 0) {
-           int cnt = 0;
            MYSQL_ROW *row;
            unsigned long *lengths;
 
@@ -1321,11 +1315,6 @@ check_nofrom(const char *addr, const char *host)
                        }
                    }
                }
-               cnt++;
-               if (cnt > 64) {
-                   cnt = 0;
-                   pth_yield(NULL);
-               }
            }
        }
        mysqlres = mmsql_free_result(mysqlres);
@@ -1582,7 +1571,7 @@ valid_host(clientenv *clenv, char *host, int res, bool addr, bool sanity)
         */
        ptr = host;
        while (*ptr != '\0') {
-           if (!isalnum(*ptr))
+           if (!isalnum((int)*ptr))
                return FALSE;
            /* Find next host part */
            while (*ptr != '\0' && *ptr != '.') ptr++;
@@ -1639,7 +1628,7 @@ valid_ipaddress(const char *addr)
            } else return (FALSE);
            if (*addr == '\0')
                break;
-       } else if (isdigit(*addr))
+       } else if (isdigit((int)*addr))
            *uptr++ = *addr;
        else
            return (FALSE);
@@ -1689,7 +1678,8 @@ validate_msg_line(char *line, ssize_t *len, int *res, void *udata)
         * spaces/tabs.
         */
        if (*line != '\t' && *line != ' ') {
-           for (ptr = line; *ptr != '\0' && (isalnum(*ptr) || *ptr == '-');
+           for (ptr = line; *ptr != '\0' && (isalnum((int)*ptr) ||
+                       *ptr == '-');
                    ptr++) ;
            if (*ptr != ':')
                goto endheader;
@@ -2418,7 +2408,7 @@ do_data_queue_notify(clientenv *clenv)
      * If we cannot obtain lock, we know that it's already being notified, and
      * we don't need to do anything.
      */
-    if (pth_mutex_acquire(&relayd_lock, TRUE, NULL) == FALSE)
+    if (pthread_mutex_lock(&relayd_lock) != 0)
        return;
 
     /*
@@ -2435,7 +2425,7 @@ do_data_queue_notify(clientenv *clenv)
      * and send it again, but once only.
      */
     for (;;) {
-       if (pth_write(relayd_sock, "N", 1) != 1) {
+       if (write(relayd_sock, "N", 1) != 1) {
            (void) close(relayd_sock);
            if ((relayd_sock = do_data_queue_notify_connect()) != -1)
                continue;
@@ -2450,7 +2440,7 @@ end:
        mmsyslog(0, LOGLEVEL,
                "mmrelayd(8) could not be notified (not running?)");
 
-    (void) pth_mutex_release(&relayd_lock);
+    (void) pthread_mutex_unlock(&relayd_lock);
 }
 
 /*
@@ -2472,7 +2462,7 @@ do_data_queue_notify_connect(void)
        mm_memclr(&addr, sizeof(struct sockaddr_un));
        (void) mm_strncpy(addr.sun_path, CONF.MMRELAYD_SOCKET_PATH, 100);
        addr.sun_family = AF_UNIX;
-       if ((pth_connect(fd, (struct sockaddr *)&addr,
+       if ((connect(fd, (struct sockaddr *)&addr,
                        sizeof(struct sockaddr_un))) == -1) {
            (void) close(fd);
            fd = -1;
@@ -2677,70 +2667,66 @@ handleclient(unsigned long id, int fd, clientlistnode *clientlnode,
 /* mmfd library thread support functions */
 
 
+static pthread_mutexattr_t     thread_ma;
+
+static void
+thread_init(void)
+{
+    (void) pthread_mutexattr_init(&thread_ma);
+    (void) pthread_mutexattr_settype(&thread_ma, PTHREAD_MUTEX_RECURSIVE);
+}
+
+
 static void *
-_pth_mutex_create(void)
+thread_mutex_create(void)
 {
     struct mutexnode *mnod;
 
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_lock(&mutexes_lock);
     mnod = (struct mutexnode *)pool_alloc(&mutexes_pool, FALSE);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
     if (mnod != NULL)
-       pth_mutex_init(&mnod->mutex);
+       pthread_mutex_init(&mnod->mutex, &thread_ma);
 
     return ((void *)mnod);
 }
 
 
 static void *
-_pth_mutex_destroy(void *mtx)
+thread_mutex_destroy(void *mtx)
 {
-    /* struct mutexnode *mnod = mtx; */
+    struct mutexnode *mnod = mtx;
 
-    /* pth_mutex_destroy(&mnod->mutex); */
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_destroy(&mnod->mutex);
+    pthread_mutex_lock(&mutexes_lock);
     pool_free(mtx);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
     return (NULL);
 }
 
 
 static void
-_pth_mutex_lock(void *mtx)
+thread_mutex_lock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_acquire(&mnod->mutex, FALSE, NULL);
+    pthread_mutex_lock(&mnod->mutex);
 }
 
 
 static void
-_pth_mutex_unlock(void *mtx)
+thread_mutex_unlock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_release(&mnod->mutex);
-}
-
-
-static void
-_pth_thread_yield(void)
-{
-    pth_yield(NULL);
-}
-
-
-static void
-_pth_thread_sleep(int secs)
-{
-    pth_sleep(secs);
+    pthread_mutex_unlock(&mnod->mutex);
 }
 
 
 static bool
-_pth_eintr(void)
+thread_eintr(void)
 {
     if (errno == EINTR)
        return TRUE;
@@ -2799,10 +2785,9 @@ hosts_expire_thread(void *args)
 
     /* Set the initial timeout to the maximum allowed */
     data.soonest = CONF.FLOOD_EXPIRES * 60;
-    data.cnt = 0;
     for (;;) {
        /* Sleep until it is known that at least one node expired */
-       pth_sleep((unsigned int)data.soonest);
+       pthread_sleep(data.soonest);
        /* Tell our iterator function the current time and the maximum
         * allowed time to wait to
         */
@@ -2811,15 +2796,15 @@ hosts_expire_thread(void *args)
        /* Lock hosts_table, expunge expired nodes and set data.soonest to the
         * time of the soonest next expireing node
         */
-       pth_mutex_acquire(&hosts_lock, FALSE, NULL);
+       pthread_mutex_lock(&hosts_lock);
        if (HASHTABLE_NODES(&hosts_table) > 0)
            hashtable_iterate(&hosts_table, hosts_expire_thread_iterator,
                    &data);
-       pth_mutex_release(&hosts_lock);
+       pthread_mutex_unlock(&hosts_lock);
     }
 
     /* NOTREACHED */
-    pth_exit(NULL);
+    pthread_exit(NULL);
     return NULL;
 }
 
@@ -2842,12 +2827,6 @@ hosts_expire_thread_iterator(hashnode_t *hnod, void *udata)
            data->soonest = rem;
     }
 
-    /* If the cache is big, prevent from interfering with other threads */
-    if ((data->cnt++) == 64) {
-       data->cnt = 0;
-       pth_yield(NULL);
-    }
-
     return TRUE;
 }
 
@@ -2869,7 +2848,7 @@ db_gc_thread(void *args)
     for (rounds = 1; ; rounds++) {
        MYSQL_RES       *mysqlres;
 
-       (void) pth_sleep(60);
+       (void) pthread_sleep(60);
 
        /*
         * Perform dangling mailbox directories cleanup
@@ -2959,7 +2938,7 @@ db_gc_thread(void *args)
     }
 
     /* NOTREACHED */
-    pth_exit(NULL);
+    pthread_exit(NULL);
     return NULL;
 }
 
@@ -2969,7 +2948,6 @@ db_gc_thread_delete(const char *addr)
     char               dirpath[256], filepath[256];
     DIR                        *dir;
     struct dirent      ent, *res;
-    int                        count;
 
     /*
      * Now <path> holds the actual directory to delete mail from. We know
@@ -2982,16 +2960,11 @@ db_gc_thread_delete(const char *addr)
        return;
     }
 
-    count = 1;
     while (readdir_r(dir, &ent, &res) == 0 && res == &ent) {
        (void) snprintf(filepath, 255, "%s/%s", dirpath, ent.d_name);
        if (unlink(filepath) != 0)
            syslog(LOG_NOTICE, "db_gc_thread_delete(%s) - unlink(%s) == %s",
                    addr, filepath, strerror(errno));
-       if (++count == 64) {
-           count = 0;
-           (void) pth_yield(NULL);
-       }
     }
     if (rmdir(dirpath) != 0)
        syslog(LOG_NOTICE, "db_gc_thread_delete(%s) - rmdir(%s) == %s",
index 9cb9a01..e46674b 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmsmtpd.h,v 1.33 2005/03/26 11:45:48 mmondor Exp $ */
+/* $Id: mmsmtpd.h,v 1.34 2007/03/13 20:28:22 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -45,7 +45,8 @@
 #include <sys/types.h>
 #include <time.h>
 
-#include <pth.h>
+#include <pthread.h>
+#include <mm_pthread_sleep.h>
 
 #include <mmtypes.h>
 #include <mmlist.h>
@@ -61,7 +62,7 @@
 
 /* DEFINITIONS */
 #define DAEMON_NAME    "mmsmtpd"
-#define DAEMON_VERSION "mmmail-0.0.24/mmondor"
+#define DAEMON_VERSION "mmmail-0.1.0/mmondor"
 
 /* Negative states are used by the state swapper, others are real states */
 #define STATE_ERROR    -3
@@ -110,7 +111,7 @@ enum data_reason {
 #define REGISTER_ERROR(x)      do { \
     (x)->errors++; \
        if (CONF.DELAY_ON_ERROR) \
-           pth_sleep((x)->errors); \
+           pthread_sleep((x)->errors); \
 } while(FALSE)
 
 /* Evaluates if a character is valid for addresses and hostnames */
@@ -153,6 +154,7 @@ typedef struct clientenv {
     long mesg_size;            /* Current cached message size in bytes */
     long errors;               /* Total number of errors that occured */
     int timeout;               /* Timeout in ms */
+    bool nofrom;               /* If empty MAIL FROM from allowed server */
     unsigned long id;          /* Our connection ID */
     unsigned long messages;    /* Messages user sent us */
     unsigned long rcpts;       /* Number of RCPT accepted */
@@ -182,13 +184,12 @@ typedef struct hostnode {
 
 struct hosts_expire_thread_iterator_udata {
     time_t current, soonest;
-    int cnt;
 };
 
 /* Used for mmfd thread support delegation/abstraction */
 struct mutexnode {
     pnode_t node;
-    pth_mutex_t mutex;
+    pthread_mutex_t mutex;
 };
 
 /* This defines a state */
@@ -317,13 +318,12 @@ static int do_data_queue_notify_connect(void);
 static int handleclient(unsigned long, int, clientlistnode *, struct iface *,
        struct async_clenv *);
 
-static void *_pth_mutex_create(void);
-static void *_pth_mutex_destroy(void *);
-static void _pth_mutex_lock(void *);
-static void _pth_mutex_unlock(void *);
-static void _pth_thread_yield(void);
-static void _pth_thread_sleep(int);
-static bool _pth_eintr(void);
+static void thread_init(void);
+static void *thread_mutex_create(void);
+static void *thread_mutex_destroy(void *);
+static void thread_mutex_lock(void *);
+static void thread_mutex_unlock(void *);
+static bool thread_eintr(void);
 
 static void async_resquery(struct async_msg *);
 static int a_res_query(clientenv *, const char *, int, int, u_char *, int);