This branch is an attempt to migrate from pth to pthread.
authorMatthew Mondor <mmondor@pulsar-zone.net>
Wed, 16 Nov 2005 01:00:58 +0000 (01:00 +0000)
committerMatthew Mondor <mmondor@pulsar-zone.net>
Wed, 16 Nov 2005 01:00:58 +0000 (01:00 +0000)
The tests/pthread_utils library is developed and used to replace the pth
features which mmsoftware requires.

mmlib/mmserver.*, mmlib/mmstr.* and mmftpd/* are being fixed first at
this point;  There currently is a known bug where mmftpd hangs, apparently
at mmserver async requests but this has to be further investigated.

mmsoftware/mmftpd/GNUmakefile
mmsoftware/mmftpd/src/mmftpd.c
mmsoftware/mmftpd/src/mmftpd.h
mmsoftware/mmlib/mmserver.c
mmsoftware/mmlib/mmserver.h
mmsoftware/mmlib/mmstr.c

index 22a96d9..742cfa3 100644 (file)
@@ -1,24 +1,22 @@
-# $Id: GNUmakefile,v 1.4 2004/06/11 00:23:38 mmondor Exp $
+# $Id: GNUmakefile,v 1.4.2.1 2005/11/16 01:00:58 mmondor Exp $
 
 MMLIBS := $(addprefix ../mmlib/,mmarch.o mmfd.o mmhash.o mmlimitrate.o \
-mmlog.o mmpath.o mmpool.o mmreadcfg.o mm_pth_pool.o mmserver.o mmstat.o \
-mmstr.o mmstring.o)
-
-PTHINCDIR := $(shell pth-config --cflags)
-PTHLIBDIR := $(shell pth-config --libdir)
+mmlog.o mmpath.o mmpool.o mmreadcfg.o mmserver.o mmstat.o \
+mmstr.o mmstring.o) $(addprefix ../../tests/pthread_utils/,mm_pthread_msg.o \
+mm_pthread_poll.o mm_pthread_pool.o mm_pthread_sleep.o)
 
 OBJS := src/mmftpd.o
 
-CFLAGS += -Wall
+CFLAGS += -Wall -DDEBUG -g
 
 
 all: src/mmftpd
 
 %.o: %.c
-       cc -c ${CFLAGS} -I. -I../mmlib $(PTHINCDIR) -o $@ $<
+       cc -c ${CFLAGS} -I. -I../mmlib -I../../tests/pthread_utils -o $@ $<
 
 src/mmftpd: $(MMLIBS) $(OBJS)
-       cc -o $@ -L$(PTHLIBDIR) -lc -lcrypt -lpth $(OBJS) $(MMLIBS)
+       cc -o $@ -lc -lcrypt -lpthread -lpthread_dbg $(OBJS) $(MMLIBS)
 
 install:
        install -cs -o 0 -g 0 -m 500 src/mmftpd /usr/local/sbin
index 6e71147..9970292 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $ */
+/* $Id: mmftpd.c,v 1.66.2.1 2005/11/16 01:00:58 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -60,7 +60,7 @@
 #include <ctype.h>
 #include <syslog.h>
 
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmreadcfg.h>
 #include <mmstring.h>
 #include <mmfifo.h>
 #include <mmstat.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+#include <mm_pthread_sleep.h>
 
 #include "mmftpd.h"
 
@@ -84,7 +88,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $");
+MMRCSID("$Id: mmftpd.c,v 1.66.2.1 2005/11/16 01:00:58 mmondor Exp $");
 
 
 
@@ -109,24 +113,24 @@ MMRCSID("$Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $");
 static CONFIG CONF;
 
 /* Used to start transfer threads */
-static pth_attr_t tthreadattr;
+static pthread_attr_t tthreadattr;
 
 /* List of logged in users and optionally current home directory size */
-static pth_mutex_t lusers_lock;
+static pthread_mutex_t lusers_lock;
 static pool_t lusers_pool;
 static hashtable_t lusers_table;
 
 /* This is used so that clientenv structures be allocated/freed fast */
-static pth_mutex_t clenvs_lock;
+static pthread_mutex_t clenvs_lock;
 static pool_t clenvs_pool;
 static pool_t fifos_pool;
 
 /* Pool used to optimize creating/destroying mmfd mutexes */
-static pth_mutex_t mutexes_lock;
+static pthread_mutex_t mutexes_lock;
 static pool_t mutexes_pool;
 
 /* Used for the longer-term directory size cache */
-static pth_mutex_t quota_lock;
+static pthread_mutex_t quota_lock;
 static pool_t quota_pool;
 static hashtable_t quota_table;
 
@@ -300,17 +304,17 @@ static const struct tr_messages tr_msg[] = {
 static fdfuncs gfdf = {
     malloc,
     free,
-    pth_poll,
-    pth_read,
-    pth_write,
-    pth_sleep,
-    pth_usleep,
-    _pth_mutex_create,
-    _pth_mutex_destroy,
-    _pth_mutex_lock,
-    _pth_mutex_unlock,
-    _pth_thread_yield,
-    _pth_eintr
+    poll,
+    read,
+    write,
+    pthread_sleep,
+    pthread_usleep,
+    thread_mutex_create,
+    thread_mutex_destroy,
+    thread_mutex_lock,
+    thread_mutex_unlock,
+    NULL,
+    thread_eintr
 };
 
 /* async_getuserline() uses this for each asynchroneous process */
@@ -551,7 +555,7 @@ auth_pass(clientenv *clenv)
 
        /* We slow down password guessing programs */
        if (clenv->attempts > 0)
-           pth_sleep(clenv->attempts * 2);
+           pthread_sleep(clenv->attempts * 2);
        clenv->attempts++;
 
        if (clenv->anonymous) {
@@ -648,7 +652,7 @@ auth_pass(clientenv *clenv)
        }
        directory_message(clenv, 230);
 
-       pth_mutex_acquire(&lusers_lock, FALSE, NULL);
+       pthread_mutex_lock(&lusers_lock);
 
        /* Verify if user in list */
        len = mm_strlen(clenv->user) + 1;
@@ -657,14 +661,14 @@ auth_pass(clientenv *clenv)
            bool lok = TRUE;
 
            /* Yes, make sure that we observe limits, if any */
-           pth_mutex_acquire(&lun->lock, FALSE, NULL);
+           pthread_mutex_lock(&lun->lock);
            if (clenv->maxlogins != 0) {
                if (lun->logins < clenv->maxlogins)
                    lun->logins++;
                else lok = FALSE;
            } else
                lun->logins++;
-           pth_mutex_release(&lun->lock);
+           pthread_mutex_unlock(&lun->lock);
            if (lok) {
                clenv->unode = lun;
                if (clenv->anonymous)
@@ -697,7 +701,7 @@ auth_pass(clientenv *clenv)
                    nextstate = STATE_ERROR;
                }
                if (lun != NULL) {
-                   pth_mutex_init(&lun->lock);
+                   pthread_mutex_init(&lun->lock, NULL);
                    lun->logins = 1;
                    mm_memcpy(lun->user, clenv->user, len);
                    clenv->unode = lun;
@@ -716,7 +720,7 @@ auth_pass(clientenv *clenv)
            }
        }
 
-       pth_mutex_release(&lusers_lock);
+       pthread_mutex_unlock(&lusers_lock);
     }
 
     return (nextstate);
@@ -2228,9 +2232,9 @@ main_stat(clientenv *clenv)
        {
            long logins;
 
-           pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+           pthread_mutex_lock(&clenv->unode->lock);
            logins = clenv->unode->logins;
-           pth_mutex_release(&clenv->unode->lock);
+           pthread_mutex_unlock(&clenv->unode->lock);
            fdbprintf(fdb, "    Logged in as %s (%ld concurrent logins)\r\n",
                    clenv->user, logins);
        }
@@ -2278,9 +2282,9 @@ main_stat(clientenv *clenv)
 
            fdbprintf(fdb, "  %lld bytes", (int64_t)clenv->maxhomesize);
            fdbwrite(fdb, "\r\n    Current home directory size: ", 36);
-           pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+           pthread_mutex_lock(&clenv->unode->lock);
            size = clenv->unode->homesize;
-           pth_mutex_release(&clenv->unode->lock);
+           pthread_mutex_unlock(&clenv->unode->lock);
            fdbprintf(fdb, "%lld bytes", (int64_t)size);
        } else
            fdbwrite(fdb, "disabled", 8);
@@ -2530,29 +2534,29 @@ main(int argc, char **argv)
     async_init(afuncs, CONF.ASYNC_PROCESSES, uid, gids, ngids);
 
     /* Threading system initialization */
-    pth_init();
+    /* pthread_init(); */
 
     /* Part of mmserver(3)'s async system which has to be initialized after
      * the threading system
      */
-    async_init_pth();
+    async_init_pthread();
 
-    pth_mutex_init(&lusers_lock);
-    pth_mutex_init(&clenvs_lock);
-    pth_mutex_init(&mutexes_lock);
-    pth_mutex_init(&quota_lock);
+    pthread_mutex_init(&lusers_lock, NULL);
+    pthread_mutex_init(&clenvs_lock, NULL);
+    pthread_mutex_init(&mutexes_lock, NULL);
+    pthread_mutex_init(&quota_lock, NULL);
 
     srandom(getpid() + time(NULL));
 
     /* We use those for our transfer ASYNC thread, joinable because when we
      * eventually request it to quit via a message we want to make sure it
-     * ended properly waiting for it with pth_join(). I have decided to use
+     * ended properly waiting for it with pthread_join(). I have decided to use
      * a second thread dedicated to file transfers, this way I can respect
      * the RFC on listening to ABOR and STAT easily on the control connection
      * while transfers are ongoing, with ease and modularity.
      */
-    tthreadattr = pth_attr_new();
-    pth_attr_set(tthreadattr, PTH_ATTR_JOINABLE, TRUE);
+    pthread_attr_init(&tthreadattr);
+    pthread_attr_setdetachstate(&tthreadattr, 0);
 
     /* Initialize our pools */
     /* Client environment nodes */
@@ -2586,6 +2590,7 @@ main(int argc, char **argv)
            POOL_VALID(&mutexes_pool) && POOL_VALID(&quota_pool) &&
            HASHTABLE_VALID(&quota_table) &&
            (!*CONF.DISPLAY_FILE || POOL_VALID(&fifos_pool))) {
+       thread_init();
        fdbcinit(&gfdf, &fdbc, CONF.GBANDWIDTH_IN * 1024,
                CONF.GBANDWIDTH_OUT * 1024);
        /* Finally start our TCP main daemon */
@@ -2864,7 +2869,7 @@ treesize_edit(clientenv *clenv, off_t modify)
     bool ret = TRUE;
 
     if (clenv->maxhomesize && modify != 0) {
-       pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+       pthread_mutex_lock(&clenv->unode->lock);
        cur = clenv->unode->homesize;
        cur += modify;
        if (cur < 0)
@@ -2873,7 +2878,7 @@ treesize_edit(clientenv *clenv, off_t modify)
            clenv->unode->homesize = cur;
        else
            ret = FALSE;
-       pth_mutex_release(&clenv->unode->lock);
+       pthread_mutex_unlock(&clenv->unode->lock);
     }
 
     return (ret);
@@ -2979,11 +2984,11 @@ alloc_clientenv(void)
     clientenv *clenv;
     struct fifonode *fnode = NULL;
 
-    pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
+    pthread_mutex_lock(&clenvs_lock);
     clenv = (clientenv *)pool_alloc(&clenvs_pool, TRUE);
     if (*CONF.DISPLAY_FILE)
        fnode = (struct fifonode *)pool_alloc(&fifos_pool, FALSE);
-    pth_mutex_release(&clenvs_lock);
+    pthread_mutex_unlock(&clenvs_lock);
 
     if (clenv) {
        if (!(*CONF.DISPLAY_FILE) || fnode) {
@@ -2998,9 +3003,9 @@ alloc_clientenv(void)
        free_clientenv(clenv);
     } else {
        if (fnode) {
-           pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
+           pthread_mutex_lock(&clenvs_lock);
            pool_free((pnode_t *)fnode);
-           pth_mutex_release(&clenvs_lock);
+           pthread_mutex_unlock(&clenvs_lock);
        }
     }
 
@@ -3025,15 +3030,14 @@ free_clientenv(clientenv *clenv)
     if (clenv->unode) {
        bool zero = FALSE;
 
-       pth_mutex_acquire(&lusers_lock, FALSE, NULL);
-       pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+       pthread_mutex_lock(&lusers_lock);
+       pthread_mutex_lock(&clenv->unode->lock);
        if ((--clenv->unode->logins) < 1)
            zero = TRUE;
-       pth_mutex_release(&clenv->unode->lock);
+       pthread_mutex_unlock(&clenv->unode->lock);
        if (zero) {
            hashtable_unlink(&lusers_table, (hashnode_t *)clenv->unode);
-           /* May be required by some POSIX thread implementations */
-           /* pth_mutex_destroy(&clenv->unode->lock); */
+           pthread_mutex_destroy(&clenv->unode->lock);
 
            /* We now need to either update the cache entry for this home
             * directory size or to create a new entry for it, but only if
@@ -3049,7 +3053,7 @@ free_clientenv(clientenv *clenv)
                 */
                len = mm_strlen(home) + 1;
                t = time(NULL);
-               pth_mutex_acquire(&quota_lock, FALSE, NULL);
+               pthread_mutex_lock(&quota_lock);
                if ((qnod = (quotanode *)hashtable_lookup(&quota_table, home,
                                len)) != NULL) {
                    qnod->homesize = clenv->unode->homesize;
@@ -3064,18 +3068,20 @@ free_clientenv(clientenv *clenv)
                                qnod->dir, len, FALSE);
                    }
                }
-               pth_mutex_release(&quota_lock);
+               pthread_mutex_unlock(&quota_lock);
            }
            pool_free((pnode_t *)clenv->unode);
        }
-       pth_mutex_release(&lusers_lock);
+       pthread_mutex_unlock(&lusers_lock);
     }
 
-    pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
-    if (fnode != NULL) pool_free((pnode_t *)fnode);
-    if (home != NULL) mmstrfree(home);
+    pthread_mutex_lock(&clenvs_lock);
+    if (fnode != NULL)
+       pool_free((pnode_t *)fnode);
+    if (home != NULL)
+       mmstrfree(home);
     pool_free((pnode_t *)clenv);
-    pth_mutex_release(&clenvs_lock);
+    pthread_mutex_unlock(&clenvs_lock);
 
     return (NULL);
 }
@@ -3085,25 +3091,23 @@ free_clientenv(clientenv *clenv)
 static bool
 start_transfer_thread(clientenv *clenv)
 {
-    /* I do not understand why libpth doesn't provide a function to set the
-     * reply port, or to init the message structure. Moreover, it theoretically
-     * shouldn't need the message size if it simply swaps messages around port
-     * linked list FIFOs via the message node. This currently seems the best
-     * way to go about it
-     */
-    clenv->tmsg.msg.m_size = sizeof(transfermsg);
-    clenv->tmsg.msg.m_replyport = clenv->rport;
+    pthread_msg_init(&clenv->tmsg.msg, &clenv->rport);
 
     /*
-    if ((clenv->tthread = pth_spawn(tthreadattr, (void *)transferthread,
-                   clenv))) {
-       if (transfer_request(REQ_NONE, TRUE, clenv)) return (TRUE);
-       pth_join(clenv->tthread, NULL);
+    if (pthread_create(&clenv->tthread, &tthreadattr, (void *)transferthread,
+                       clenv) == 0) {
+       if (transfer_request(REQ_NONE, TRUE, clenv))
+           return (TRUE);
+       pthread_join(&clenv->tthread, NULL);
     }
     */
-    thread_object_call(NULL, transferthread, clenv);
+    if (pthread_object_call(NULL, transferthread, clenv) != 0)
+       DEBUG_PRINTF("start_transfer_thread", "pthread_object_call()");
+
     if (transfer_request(REQ_NONE, TRUE, clenv))
        return TRUE;
+    else
+       DEBUG_PRINTF("start_transfer_thread", "transfer_request(REQ_NONE)");
 
     return (FALSE);
 }
@@ -3113,7 +3117,7 @@ start_transfer_thread(clientenv *clenv)
 static void
 stop_transfer_thread(clientenv *clenv)
 {
-    if (clenv->sport) {
+    if (pthread_port_valid(&clenv->sport)) {
        /* Send a quit request and wait for reply */
        if (!transfer_request(REQ_QUIT, TRUE, clenv))
            DEBUG_PRINTF("stop_transfer_thread",
@@ -3123,9 +3127,11 @@ stop_transfer_thread(clientenv *clenv)
         * Join thread as we want to make sure it exits before we do.
         */
        /*
-       pth_join(clenv->tthread, NULL);
+       pthread_join(&clenv->tthread, NULL);
        */
     }
+    if (pthread_msg_valid(&clenv->tmsg.msg))
+       pthread_msg_destroy(&clenv->tmsg.msg);
 }
 
 
@@ -3139,33 +3145,34 @@ transfer_request(int req, bool waitreply, clientenv *clenv)
 {
     int res = FALSE;
     transfermsg *msg = &(clenv->tmsg);
-    pth_event_t ring;
 
     if (req != REQ_NONE) {
        msg->request = req;
        msg->result = FALSE;
-       pth_msgport_put(clenv->sport, (pth_message_t *)msg);
+       pthread_msg_put(&clenv->sport, (pthread_msg_t *)msg);
     }
     if (waitreply) {
-       ring = pth_event(PTH_EVENT_TIME, pth_timeout(120, 0));
-       pth_event_concat(clenv->rring, ring, NULL);
        for (;;) {
-           if ((pth_wait(clenv->rring))) {
-               if ((pth_event_occurred(ring))) {
-                   /* Timeout occured waiting for reply */
-                   res = FALSE;
-                   break;
-               }
-               if ((pth_event_occurred(clenv->rring))) {
-                   if ((msg = (transfermsg *)pth_msgport_get(clenv->rport))) {
-                       res = msg->result;
+           if ((msg = (transfermsg *)pthread_msg_get(&clenv->rport))
+                   == NULL) {
+               struct timeval  tv;
+               struct timespec ts, ts1;
+
+               ts1.tv_sec = 120;
+               ts1.tv_nsec = 0;
+               (void) gettimeofday(&tv, NULL);
+               TIMEVAL_TO_TIMESPEC(&tv, &ts);
+               timespecadd(&ts, &ts1, &ts);
+
+               if (pthread_ring_wait(&clenv->rring, &ts) == ETIMEDOUT) {
+                       res = FALSE;
                        break;
-                   }
                }
+           } else {
+               res = msg->result;
+               break;
            }
        }
-       pth_event_isolate(clenv->rring);
-       pth_event_free(ring, PTH_FREE_ALL);
     } else
        res = TRUE;
 
@@ -3510,8 +3517,9 @@ handleclient(unsigned long cid, int fd, clientlistnode *clientlnode,
            }
 
            /* Open our reply port and start our file transfer thread */
-           if ((clenv->rport = pth_msgport_create("XXX(NULL)"))) {
-               clenv->rring = pth_event(PTH_EVENT_MSG, clenv->rport);
+           if (pthread_port_init(&clenv->rport) == 0 &&
+                   pthread_ring_init(&clenv->rring) == 0 &&
+                   pthread_port_set_ring(&clenv->rport, &clenv->rring) == 0) {
 
                /* Init our clientenv as required */
                clenv->fdb = fdb;
@@ -3648,11 +3656,14 @@ handleclient(unsigned long cid, int fd, clientlistnode *clientlnode,
                } else
                    DEBUG_PRINTF("handleclient", "start_transfer_thread()");
 
-               pth_event_free(clenv->rring, PTH_FREE_ALL);
-               while ((pth_msgport_get(clenv->rport)) != NULL) ;
-               pth_msgport_destroy(clenv->rport);
+               while (pthread_msg_get(&clenv->rport) != NULL) ;
            } else
-               DEBUG_PRINTF("handleclient", "pth_msgport_create()");
+               DEBUG_PRINTF("handleclient", "pthread_port_init()");
+
+           if (pthread_port_valid(&clenv->rport))
+               pthread_port_destroy(&clenv->rport);
+           if (pthread_ring_valid(&clenv->rring))
+               pthread_ring_destroy(&clenv->rring);
 
            control_in = FDBBYTESR(fdb);
            control_out = FDBBYTESW(fdb);
@@ -3787,7 +3798,7 @@ pasv_bind(clientenv *clenv, struct sockaddr_in *server, int *sock, int *port)
                        p = CONF.PASV_RANGE_MIN;
                }
                if (err == -1)
-                   pth_sleep(1);
+                   pthread_sleep(1);
            }
            if (err != -1) {
                /* Successful bind(2) */
@@ -3819,7 +3830,7 @@ pasv_bind(clientenv *clenv, struct sockaddr_in *server, int *sock, int *port)
  * other transfer requests when one is ongoing already.
  */
 static void
-transferthread(struct thread_object *obj, void *args)
+transferthread(pthread_object_t *obj, void *args)
 {
     clientenv *clenv = args;
     fdbuf *fdb;
@@ -3838,21 +3849,21 @@ transferthread(struct thread_object *obj, void *args)
     if ((fdb = fdbopen(&gfdf, &fdbc, -1, FDB_BUFSIZE, FDB_BUFSIZE, 1024, 1024,
                    CONF.DATA_TIMEOUT * 1000, CONF.DATA_TIMEOUT * 1000,
                    FALSE)) != NULL) {
-       if ((clenv->sport = pth_msgport_create("XXX(NULL)"))) {
+       if (pthread_port_init(&clenv->sport) == 0) {
            /* Reply that we started thread properly */
            ok = TRUE;
            clenv->tmsg.result = TRUE;
-           pth_msgport_put(clenv->rport, (pth_message_t *)&(clenv->tmsg));
+           pthread_msg_put(&clenv->rport, (pthread_msg_t *)&(clenv->tmsg));
 
            /* Process our tasks */
            transferthread_main(clenv, fdb);
 
-           while ((msg = (transfermsg *)pth_msgport_get(clenv->sport))
+           while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
                    != NULL) {
                msg->result = FALSE;
-               pth_msgport_reply((pth_message_t *)msg);
+               pthread_msg_reply((pthread_msg_t *)msg);
            }
-           pth_msgport_destroy(clenv->sport);
+           pthread_port_destroy(&clenv->sport);
        } else
            DEBUG_PRINTF("transferthread", "pth_msgport_create()");
        fdbclose(fdb);
@@ -3862,11 +3873,11 @@ transferthread(struct thread_object *obj, void *args)
     if (!ok) {
        /* Reply that thread couldn't be setup properly */
        clenv->tmsg.result = FALSE;
-       pth_msgport_put(clenv->rport, (pth_message_t *)&(clenv->tmsg));
+       pthread_msg_put(&clenv->rport, (pthread_msg_t *)&(clenv->tmsg));
     }
 
     /*
-    pth_exit(NULL);
+    pthread_exit(NULL);
     */
 }
 
@@ -3915,10 +3926,10 @@ transferthread(struct thread_object *obj, void *args)
 static void
 transferthread_main(clientenv *clenv, fdbuf *fdb)
 {
-    int state = TTSTATE_INITIAL, l, reason;
+    int state = TTSTATE_INITIAL, l, reason, err;
     register char *from, *to;
     char ipaddr[20], buffer[T_BUFSIZE], *from2;
-    pth_event_t ring, ring1;
+    pthread_ring_t ring;
     transfermsg *msg;
     struct sockaddr_in server, client, addr;
     socklen_t addrl;
@@ -3933,7 +3944,8 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
     fdbparam_set(fdb, -1, FDBP_FD);
 
     /* Transfer device entry setup */
-    ring = pth_event(PTH_EVENT_MSG, clenv->sport);
+    pthread_ring_init(&ring);
+    pthread_port_set_ring(&clenv->sport, &ring);
 
     /* Main device loop */
     while (state != TTSTATE_DONE) {
@@ -3965,109 +3977,107 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
 
            /* State main loop */
            while (state == TTSTATE_INITIAL) {
-               if ((pth_wait(ring))) {
-                   if ((pth_event_occurred(ring))) {
-                       while ((msg = (transfermsg *)pth_msgport_get(
-                                       clenv->sport)) != NULL) {
-                           /* Process request and reply */
-                           switch (msg->request) {
-                           case REQ_QUIT:
-                               msg->result = TRUE;
-                               state = TTSTATE_DONE;
-                               break;
-                           case REQ_ABORT:
-                               /* As we handle PASV port, do this */
-                               if (fdpasv != -1) {
-                                   close(fdpasv);
-                                   fdpasv = -1;
+               if (pthread_port_pending(&clenv->sport) < 1)
+                   pthread_ring_wait(&ring, NULL);
+               while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
+                       != NULL) {
+                   /* Process request and reply */
+                   switch (msg->request) {
+                       case REQ_QUIT:
+                           msg->result = TRUE;
+                           state = TTSTATE_DONE;
+                           break;
+                       case REQ_ABORT:
+                           /* As we handle PASV port, do this */
+                           if (fdpasv != -1) {
+                               close(fdpasv);
+                               fdpasv = -1;
+                           }
+                           port = 0;
+                           passive = ongoing = FALSE;
+                           download = TRUE;
+                           list = 0;
+                           *path = '\0';
+                           msg->result = TRUE;
+                           break;
+                       case REQ_STATUS:
+                           msg->port = (u_int16_t)port;
+                           msg->passive = passive;
+                           msg->ongoing = ongoing;
+                           msg->download = download;
+                           msg->list = list;
+                           msg->rbytes = FDBBYTESR(fdb);
+                           msg->wbytes = FDBBYTESW(fdb);
+                           msg->dlfiles = dlfiles;
+                           msg->ulfiles = ulfiles;
+                           msg->result = TRUE;
+                           break;
+                       case REQ_NEWPORT:
+                           port = 0;
+                           if (fd != -1) {
+                               fdbflushw(fdb);
+                               shutdown(fd, SHUT_RDWR);
+                               close(fd);
+                               fd = -1;
+                               fdbparam_set(fdb, -1, FDBP_FD);
+                           }
+                           /* As we handle PASV port, do this */
+                           if (fdpasv != -1) {
+                               close(fdpasv);
+                               fdpasv = -1;
+                           }
+                           if (msg->passive) {
+                               /* PASV request, we need to open a high
+                                * port and start listening for a
+                                * connection on it
+                                */
+                               msg->result = FALSE;    /* Default */
+                               if (pasv_bind(clenv, &server, &fdpasv,
+                                           &port)) {
+                                   msg->port = (u_int16_t)port;
+                                   passive = msg->result = TRUE;
+                                   break;
                                }
-                               port = 0;
-                               passive = ongoing = FALSE;
-                               download = TRUE;
-                               list = 0;
-                               *path = '\0';
-                               msg->result = TRUE;
-                               break;
-                           case REQ_STATUS:
-                               msg->port = (u_int16_t)port;
-                               msg->passive = passive;
-                               msg->ongoing = ongoing;
-                               msg->download = download;
-                               msg->list = list;
-                               msg->rbytes = FDBBYTESR(fdb);
-                               msg->wbytes = FDBBYTESW(fdb);
-                               msg->dlfiles = dlfiles;
-                               msg->ulfiles = ulfiles;
+                           } else {
+                               /* Normal PORT, just remember necessary
+                                * things. The caller should have
+                                * performed address and port sanity
+                                * checking already.
+                                */
+                               port = (int)msg->port;
+                               passive = FALSE;
                                msg->result = TRUE;
-                               break;
-                           case REQ_NEWPORT:
-                               port = 0;
-                               if (fd != -1) {
-                                   fdbflushw(fdb);
-                                   shutdown(fd, SHUT_RDWR);
-                                   close(fd);
-                                   fd = -1;
-                                   fdbparam_set(fdb, -1, FDBP_FD);
-                               }
-                               /* As we handle PASV port, do this */
-                               if (fdpasv != -1) {
-                                   close(fdpasv);
-                                   fdpasv = -1;
-                               }
-                               if (msg->passive) {
-                                   /* PASV request, we need to open a high
-                                    * port and start listening for a
-                                    * connection on it
-                                    */
-                                   msg->result = FALSE;        /* Default */
-                                   if (pasv_bind(clenv, &server, &fdpasv,
-                                               &port)) {
-                                       msg->port = (u_int16_t)port;
-                                       passive = msg->result = TRUE;
-                                       break;
-                                   }
-                               } else {
-                                   /* Normal PORT, just remember necessary
-                                    * things. The caller should have
-                                    * performed address and port sanity
-                                    * checking already.
-                                    */
-                                   port = (int)msg->port;
-                                   passive = FALSE;
+                           }
+                           break;
+                       case REQ_TRANSFER:
+                           msg->result = FALSE;        /* Default */
+                           if (port != 0) {
+                               fdbparam_set(fdb, msg->rrate * 1024,
+                                       FDBP_RRATE);
+                               fdbparam_set(fdb, msg->wrate * 1024,
+                                       FDBP_WRATE);
+                               if (msg->path) mm_strncpy(path, msg->path,
+                                       MMPATH_MAX - 1);
+                               else *path = '\0';
+                               if (msg->list) {
+                                   list = msg->list;
                                    msg->result = TRUE;
-                               }
-                               break;
-                           case REQ_TRANSFER:
-                               msg->result = FALSE;    /* Default */
-                               if (port != 0) {
-                                   fdbparam_set(fdb, msg->rrate * 1024,
-                                           FDBP_RRATE);
-                                   fdbparam_set(fdb, msg->wrate * 1024,
-                                           FDBP_WRATE);
-                                   if (msg->path) mm_strncpy(path, msg->path,
-                                           MMPATH_MAX - 1);
-                                   else *path = '\0';
-                                   if (msg->list) {
-                                       list = msg->list;
+                               } else {
+                                   if (msg->file != -1) {
+                                       file = msg->file;
+                                       download = msg->download;
                                        msg->result = TRUE;
-                                   } else {
-                                       if (msg->file != -1) {
-                                           file = msg->file;
-                                           download = msg->download;
-                                           msg->result = TRUE;
-                                       }
                                    }
-                                   if (msg->result)
-                                       state = TTSTATE_CONNECT;
-                               } else
-                                   msg->result = FALSE;
-                               break;
-                           default:
+                               }
+                               if (msg->result)
+                                   state = TTSTATE_CONNECT;
+                           } else
                                msg->result = FALSE;
-                           }
-                           pth_msgport_reply((pth_message_t *)msg);
-                       }
+                           break;
+                       default:
+                           msg->result = FALSE;
                    }
+                   pthread_msg_reply((pthread_msg_t *)msg);
                }
            }
 
@@ -4077,18 +4087,27 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
 
            /* State entry setup */
            ongoing = TRUE;
-           /* Setup our connect timeout event */
-           ring1 = pth_event(PTH_EVENT_TIME, pth_timeout(CONF.DATA_TIMEOUT,
-                       0));
-           pth_event_concat(ring, ring1, NULL);
 
            /* State main loop */
            while (state == TTSTATE_CONNECT) {
                if (passive) {
-                   /* Accept connection from client */
+                   /*
+                    * Accept connection from client.
+                    * On timeout, ETIMEDOUT is returned, on interrupting
+                    * message, ECANCELED is.  Other errror codes could be
+                    * returned, or the new file descriptor.
+                    */
                    addrl = sizeof(struct sockaddr);
-                   fd = pth_accept_ev(fdpasv, (struct sockaddr *)&addr,
-                           &addrl, ring);
+                   if ((err = pthread_accept_ring(fdpasv,
+                                   (struct sockaddr *)&addr,
+                                   &addrl, CONF.DATA_TIMEOUT * 1000,
+                                   &ring)) > -1) {
+                       fd = err;
+                       err = 0;
+                   } else {
+                       fd = -1;
+                       err = errno;
+                   }
                } else {
                    /* Start connection to the client */
                    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) != -1) {
@@ -4096,10 +4115,35 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                        client.sin_family = AF_INET;
                        client.sin_addr.s_addr = clenv->cipaddr;
                        client.sin_port = htons(port);
-                       /* Finally attempt connection */
-                       if ((pth_connect_ev(fd, (struct sockaddr *)&client,
+                       /*
+                        * Finally attempt connection.
+                        * On timeout, ETIMEDOUT is returned, on interrupting
+                        * message, ECANCELED is.  Other error codes could be
+                        * returned, or 0 on success.  In the case of any of
+                        * the two special error codes we just described, the
+                        * connection request is still pending in the kernel.
+                        * We can continue calling the following function
+                        * until we get connected, which can return EISCONN if
+                        * it already was connected, or we could monitor the
+                        * status of the connection using getsockopt(2) with
+                        * SOL_SOCKET level and SO_ERROR option.  If we want
+                        * to abort the in-progress connection, we need to use
+                        * close(2).
+                        */
+                       /*
+                        * XXX Hmm we should loop here! Not create new sockets
+                        * again in the loop!  Unless we wanted to make
+                        * pthread_connect_ring() automatically cancel any
+                        * connection closing the descriptor upon ETIMEDOUT or
+                        * ECANCELED return codes, in which case we could
+                        * continue the current behavior.
+                        */
+                       if ((pthread_connect_ring(fd,
+                                       (struct sockaddr *)&client,
                                        sizeof(struct sockaddr_in),
-                                       ring)) == -1) {
+                                       CONF.DATA_TIMEOUT * 1000,
+                                       &ring)) != 0 && errno != ETIMEDOUT &&
+                                       errno != ECANCELED) {
                            mmsyslog(1, LOGLEVEL,
                                    "%08X PORT data connection failiure",
                                    clenv->id);
@@ -4109,7 +4153,13 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                            REGISTER_ERROR();
                            state = TTSTATE_INITIAL;
                            break;
-                       }
+                       } else if (errno == ETIMEDOUT || errno == ECANCELED) {
+                       /* XXX For now */
+                           (void) close(fd);
+                           fd = -1;
+                           err = ETIMEDOUT;
+                       } else
+                           err = 0;
                    } else {
                        DEBUG_PRINTF("transferthread_main-2", "socket()");
                        reply(clenv->fdb, 425, FALSE,
@@ -4121,7 +4171,7 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                    }
                }
                /* Verify if connection timeout occured */
-               if ((pth_event_occurred(ring1))) {
+               if (err == ETIMEDOUT) {
                    mmsyslog(1, LOGLEVEL, "%08X Data connection timeout",
                            clenv->id);
                    reply(clenv->fdb, 425, FALSE,
@@ -4131,37 +4181,39 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                    state = TTSTATE_INITIAL;
                    break;
                }
-               /* Check for any request events and process them if any */
-               if ((pth_event_occurred(ring))) {
-                   while ((msg = (transfermsg *)pth_msgport_get(
-                                   clenv->sport)) != NULL) {
-                       /* Process request and reply */
-                       switch (msg->request) {
-                       case REQ_QUIT:
-                           msg->result = TRUE;
-                           state = TTSTATE_DONE;
-                           break;
-                       case REQ_ABORT:
-                           msg->result = TRUE;
-                           state = TTSTATE_INITIAL;
-                           break;
-                       case REQ_STATUS:
-                           msg->port = (u_int16_t)port;
-                           msg->passive = passive;
-                           msg->ongoing = ongoing;
-                           msg->download = download;
-                           msg->list = list;
-                           msg->rbytes = FDBBYTESR(fdb);
-                           msg->wbytes = FDBBYTESW(fdb);
-                           msg->dlfiles = dlfiles;
-                           msg->ulfiles = ulfiles;
-                           msg->result = TRUE;
-                           break;
-                       default:
-                           msg->result = FALSE;
-                       }
-                       pth_msgport_reply((pth_message_t *)msg);
+               /*
+                * Check for any request events and process them if any.
+                * ECANCELED above can also cause this code execution to be
+                * triggered fast.
+                */
+               while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
+                       != NULL) {
+                   /* Process request and reply */
+                   switch (msg->request) {
+                   case REQ_QUIT:
+                       msg->result = TRUE;
+                       state = TTSTATE_DONE;
+                       break;
+                   case REQ_ABORT:
+                       msg->result = TRUE;
+                       state = TTSTATE_INITIAL;
+                       break;
+                   case REQ_STATUS:
+                       msg->port = (u_int16_t)port;
+                       msg->passive = passive;
+                       msg->ongoing = ongoing;
+                       msg->download = download;
+                       msg->list = list;
+                       msg->rbytes = FDBBYTESR(fdb);
+                       msg->wbytes = FDBBYTESW(fdb);
+                       msg->dlfiles = dlfiles;
+                       msg->ulfiles = ulfiles;
+                       msg->result = TRUE;
+                       break;
+                   default:
+                       msg->result = FALSE;
                    }
+                   pthread_msg_reply((pthread_msg_t *)msg);
                }
                if (state == TTSTATE_CONNECT) {
                    /* Verify if connection was established */
@@ -4196,8 +4248,6 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
            }
 
            /* State exit cleanup */
-           pth_event_isolate(ring);
-           pth_event_free(ring1, PTH_FREE_ALL);
 
        } else if (state == TTSTATE_TRANSFER) { /* TRANSFER STATE */
 
@@ -4266,7 +4316,7 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                    TYPE_IMAGE ? fdbrcleartosend(fdb) :
                                    fdbcleartosend(fdb))) {
                            /* Client ready to be written some bytes to */
-                           if ((l = pth_read(file, buffer, T_BUFSIZE)) > 0) {
+                           if ((l = read(file, buffer, T_BUFSIZE)) > 0) {
                                if (clenv->type == TYPE_ASCII) {
                                    /* ASCII xfer, perform \n to \r\n
                                     * conversion
@@ -4326,39 +4376,37 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                            break;
                        }
                        /* Verify if we received any important events */
-                       if ((pth_event_occurred(ring))) {
-                           while ((msg = (transfermsg *)pth_msgport_get(
-                                           clenv->sport)) != NULL) {
-                               /* Process request and reply */
-                               switch (msg->request) {
-                               case REQ_QUIT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_DONE;
-                                   cont = FALSE;
-                                   break;
-                               case REQ_ABORT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_INITIAL;
-                                   cont = FALSE;
-                                   reason = TR_ABORTED;
-                                   break;
-                               case REQ_STATUS:
-                                   msg->port = (u_int16_t)port;
-                                   msg->passive = passive;
-                                   msg->ongoing = ongoing;
-                                   msg->download = download;
-                                   msg->list = list;
-                                   msg->rbytes = FDBBYTESR(fdb);
-                                   msg->wbytes = FDBBYTESW(fdb);
-                                   msg->dlfiles = dlfiles;
-                                   msg->ulfiles = ulfiles;
-                                   msg->result = TRUE;
-                                   break;
-                               default:
-                                   msg->result = FALSE;
-                               }
-                               pth_msgport_reply((pth_message_t *)msg);
+                       while ((msg = (transfermsg *)pthread_msg_get(
+                                       &clenv->sport)) != NULL) {
+                           /* Process request and reply */
+                           switch (msg->request) {
+                           case REQ_QUIT:
+                               msg->result = TRUE;
+                               state = TTSTATE_DONE;
+                               cont = FALSE;
+                               break;
+                           case REQ_ABORT:
+                               msg->result = TRUE;
+                               state = TTSTATE_INITIAL;
+                               cont = FALSE;
+                               reason = TR_ABORTED;
+                               break;
+                           case REQ_STATUS:
+                               msg->port = (u_int16_t)port;
+                               msg->passive = passive;
+                               msg->ongoing = ongoing;
+                               msg->download = download;
+                               msg->list = list;
+                               msg->rbytes = FDBBYTESR(fdb);
+                               msg->wbytes = FDBBYTESW(fdb);
+                               msg->dlfiles = dlfiles;
+                               msg->ulfiles = ulfiles;
+                               msg->result = TRUE;
+                               break;
+                           default:
+                               msg->result = FALSE;
                            }
+                           pthread_msg_reply((pthread_msg_t *)msg);
                        }
                    }
 
@@ -4384,9 +4432,9 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                        if (*from == '\r') {
                                            if (treesize_edit(clenv,
                                                        from - from2)) {
-                                               if ((l = pth_write(file,
-                                                       from2, from - from2))
-                                                       != from - from2) {
+                                               if ((l = write(file, from2,
+                                                   from - from2)) !=
+                                                       from - from2) {
                                                    mmsyslog(0, LOGLEVEL,
                                                            "%08X Error "
                                                            "writing to "
@@ -4413,9 +4461,8 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                    }
                                    if (l && from2 < to) {
                                        if (treesize_edit(clenv, to - from2)) {
-                                           if ((l = pth_write(file, from2,
-                                                           to - from2))
-                                                   != to - from2) {
+                                           if ((l = write(file, from2,
+                                               to - from2)) != to - from2) {
                                                mmsyslog(0, LOGLEVEL,
                                                        "%08X Error writing "
                                                        "to disk!",
@@ -4442,8 +4489,7 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                                    /* IMAGE/binary transfer, write data as-is
                                     */
                                    if (treesize_edit(clenv, l)) {
-                                       if ((pth_write(file, buffer, l)) !=
-                                               l) {
+                                       if ((write(file, buffer, l)) != l) {
                                            mmsyslog(0, LOGLEVEL,
                                                    "%08X Error writing to "
                                                    "disk!", clenv->id);
@@ -4481,39 +4527,37 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
                            break;
                        }
                        /* Verify if we received any important events */
-                       if ((pth_event_occurred(ring))) {
-                           while ((msg = (transfermsg *)pth_msgport_get(
-                                           clenv->sport)) != NULL) {
-                               /* Process request and reply */
-                               switch (msg->request) {
-                               case REQ_QUIT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_DONE;
-                                   cont = FALSE;
-                                   break;
-                               case REQ_ABORT:
-                                   msg->result = TRUE;
-                                   state = TTSTATE_INITIAL;
-                                   cont = FALSE;
-                                   reason = TR_ABORTED;
-                                   break;
-                               case REQ_STATUS:
-                                   msg->port = (u_int16_t)port;
-                                   msg->passive = passive;
-                                   msg->ongoing = ongoing;
-                                   msg->download = download;
-                                   msg->list = list;
-                                   msg->rbytes = FDBBYTESR(fdb);
-                                   msg->wbytes = FDBBYTESW(fdb);
-                                   msg->dlfiles = dlfiles;
-                                   msg->ulfiles = ulfiles;
-                                   msg->result = TRUE;
-                                   break;
-                               default:
-                                   msg->result = FALSE;
-                               }
-                               pth_msgport_reply((pth_message_t *)msg);
+                       while ((msg = (transfermsg *)pthread_msg_get(
+                                           &clenv->sport)) != NULL) {
+                           /* Process request and reply */
+                           switch (msg->request) {
+                           case REQ_QUIT:
+                               msg->result = TRUE;
+                               state = TTSTATE_DONE;
+                               cont = FALSE;
+                               break;
+                           case REQ_ABORT:
+                               msg->result = TRUE;
+                               state = TTSTATE_INITIAL;
+                               cont = FALSE;
+                               reason = TR_ABORTED;
+                               break;
+                           case REQ_STATUS:
+                               msg->port = (u_int16_t)port;
+                               msg->passive = passive;
+                               msg->ongoing = ongoing;
+                               msg->download = download;
+                               msg->list = list;
+                               msg->rbytes = FDBBYTESR(fdb);
+                               msg->wbytes = FDBBYTESW(fdb);
+                               msg->dlfiles = dlfiles;
+                               msg->ulfiles = ulfiles;
+                               msg->result = TRUE;
+                               break;
+                           default:
+                               msg->result = FALSE;
                            }
+                           pthread_msg_reply((pthread_msg_t *)msg);
                        }
                    }
                }
@@ -4544,70 +4588,72 @@ transferthread_main(clientenv *clenv, fdbuf *fdb)
     }
     if (fdpasv != -1) close(fdpasv);
 
-    pth_event_free(ring, PTH_FREE_ALL);
+    pthread_ring_destroy(&ring);
 }
 
 
 /* mmfd library thread support functions */
 
+static pthread_mutexattr_t     thread_ma;
+
+static void
+thread_init(void)
+{
+    (void) pthread_mutexattr_init(&thread_ma);
+    (void) pthread_mutexattr_settype(&thread_ma, PTHREAD_MUTEX_RECURSIVE);
+}
 
 static void *
-_pth_mutex_create(void)
+thread_mutex_create(void)
 {
     struct mutexnode *mnod;
 
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_lock(&mutexes_lock);
     mnod = (struct mutexnode *)pool_alloc(&mutexes_pool, FALSE);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
-    if (mnod)
-       pth_mutex_init(&mnod->mutex);
+    if (mnod != NULL)
+       pthread_mutex_init(&mnod->mutex, &thread_ma);
 
     return ((void *)mnod);
 }
 
 
 static void *
-_pth_mutex_destroy(void *mtx)
+thread_mutex_destroy(void *mtx)
 {
-    /* struct mutexnode *mnod = mtx; */
+    struct mutexnode *mnod = mtx;
 
-    /* pth_mutex_destroy(&mnod->mutex); */
-    pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+    pthread_mutex_destroy(&mnod->mutex);
+
+    pthread_mutex_lock(&mutexes_lock);
     pool_free(mtx);
-    pth_mutex_release(&mutexes_lock);
+    pthread_mutex_unlock(&mutexes_lock);
 
     return (NULL);
 }
 
 
 static void
-_pth_mutex_lock(void *mtx)
+thread_mutex_lock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_acquire(&mnod->mutex, FALSE, NULL);
+    pthread_mutex_lock(&mnod->mutex);
 }
 
 
 static void
-_pth_mutex_unlock(void *mtx)
+thread_mutex_unlock(void *mtx)
 {
     struct mutexnode *mnod = mtx;
 
-    pth_mutex_release(&mnod->mutex);
-}
-
-
-static void
-_pth_thread_yield(void)
-{
-    pth_yield(NULL);
+    pthread_mutex_unlock(&mnod->mutex);
 }
 
 
 static bool
-_pth_eintr(void)
+thread_eintr(void)
 {
     if (errno == EINTR)
        return TRUE;
@@ -4774,7 +4820,7 @@ treesize(clientenv *clenv, const char *path, off_t min)
     found = FALSE;
     size = 0;
     len = mm_strlen(path) + 1;
-    pth_mutex_acquire(&quota_lock, FALSE, NULL);
+    pthread_mutex_lock(&quota_lock);
     if ((qnod = (quotanode *)hashtable_lookup(&quota_table, path, len))
            != NULL) {
        struct stat st;
@@ -4792,7 +4838,7 @@ treesize(clientenv *clenv, const char *path, off_t min)
            }
        }
     }
-    pth_mutex_release(&quota_lock);
+    pthread_mutex_unlock(&quota_lock);
 
     if (found)
        return (size);
index a3dacd4..5208cf2 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmftpd.h,v 1.25 2004/09/19 10:59:08 mmondor Exp $ */
+/* $Id: mmftpd.h,v 1.25.2.1 2005/11/16 01:00:58 mmondor Exp $ */
 
 /*
  * Copyright (C) 2001-2004, Matthew Mondor
@@ -43,7 +43,7 @@
 /* HEADERS */
 
 #include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmfd.h>
 #include <mmpath.h>
 #include <mmfifo.h>
 #include <mmstat.h>
 #include <mmserver.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_sleep.h>
 
 
 
@@ -132,7 +135,7 @@ enum transferthread_states {
 #define REGISTER_ERROR() do { \
     clenv->errors++; \
     if (CONF.DELAY_ON_ERROR) \
-       pth_sleep(clenv->errors); \
+       pthread_sleep(clenv->errors); \
 } while (FALSE)
 
 #define REGISTER_PERMISSION() do { \
@@ -169,7 +172,7 @@ typedef struct config {
 
 /* We communicate with our transfer thread using this message structure */
 typedef struct transfermsg {
-    pth_message_t msg;           /* Internal message node */
+    pthread_msg_t msg;           /* Internal message node */
     int request;                 /* Request to send to device */
     bool result;                 /* Result status after device request */
     u_int16_t port;              /* PORT or PASV port for next transfer */
@@ -208,9 +211,9 @@ typedef struct clientenv {
     struct lusernode *unode;     /* User's lusernode, for quick lookup */
     char cwd[MMPATH_MAX];        /* User's current directory (chrooted) */
     struct transfermsg tmsg;     /* Only need one shared msg with tthread */
-    pth_msgport_t rport, sport;   /* Message ports (tthread and ours) */
-    pth_event_t rring;           /* Ring of events we wait for */
-    pth_t tthread;               /* Thread ID of our transfer thread */
+    pthread_port_t rport, sport;  /* Message ports (tthread and ours) */
+    pthread_ring_t rring;        /* Ring of events we wait for */
+    pthread_t tthread;           /* Thread ID of our transfer thread */
     long errors;                 /* Total number of errors that occured */
     long attempts;               /* Auth state temporary counter */
     long maxlogins;              /* Allowed simultanious logins for user */
@@ -242,7 +245,7 @@ typedef struct clientenv {
 typedef struct lusernode {
     hashnode_t node;
     char user[32];             /* User name (key) */
-    pth_mutex_t lock;          /* For treesize_edit() */
+    pthread_mutex_t lock;      /* For treesize_edit() */
     long logins;               /* Simultanious current logins for user */
     off_t homesize;            /* Current home directory size for user */
 } lusernode;
@@ -269,7 +272,7 @@ struct commandnode {
 /* Used for mmfd thread support delegation/abstraction */
 struct mutexnode {
     pnode_t node;
-    pth_mutex_t mutex;
+    pthread_mutex_t mutex;
 };
 
 /* Used to efficiently allocate FIFO buffers for recently visited directories
@@ -420,15 +423,15 @@ static int handleclient(unsigned long, int, clientlistnode *, struct iface *,
        struct async_clenv *);
 
 static bool pasv_bind(clientenv *, struct sockaddr_in *, int *, int *);
-static void transferthread(struct thread_object *, void *);
+static void transferthread(pthread_object_t *, void *);
 static void transferthread_main(clientenv *, fdbuf *);
 
-static void *_pth_mutex_create(void);
-static void *_pth_mutex_destroy(void *);
-static void _pth_mutex_lock(void *);
-static void _pth_mutex_unlock(void *);
-static void _pth_thread_yield(void);
-static bool _pth_eintr(void);
+static void thread_init(void);
+static void *thread_mutex_create(void);
+static void *thread_mutex_destroy(void *);
+static void thread_mutex_lock(void *);
+static void thread_mutex_unlock(void *);
+static bool thread_eintr(void);
 static bool eintr(void);
 
 static void async_checkpw(struct async_msg *);
index cc5e251..ba6e83f 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $ */
+/* $Id: mmserver.c,v 1.34.2.1 2005/11/16 01:00:58 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -45,6 +45,8 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <stdio.h>
+#include <string.h>
+#include <errno.h>
 
 #include <netdb.h>
 #include <netinet/in.h>
@@ -58,7 +60,7 @@
 
 #include <syslog.h>
 
-#include <pth.h>
+#include <pthread.h>
 #include <signal.h>
 #include <sys/poll.h>
 #include <time.h>
 #include <mmreadcfg.h>
 #include <mmlimitrate.h>
 #include <mmlog.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_pool.h>
+#include <mm_pthread_msg.h>
+#include <mm_pthread_poll.h>
+#include <mm_pthread_sleep.h>
 
 
 
 
 MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $");
+MMRCSID("$Id: mmserver.c,v 1.34.2.1 2005/11/16 01:00:58 mmondor Exp $");
 
 
 
@@ -94,7 +100,7 @@ MMRCSID("$Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $");
 
 static void                    sighandler(int);
 static struct iface *          parse_ifaces(char *, char *);
-static void                    phandleclient(struct thread_object *, void *);
+static void                    phandleclient(pthread_object_t *, void *);
 static void                    writepidfile(const char *);
 static void *                  async_thread(void *);
 static struct async_clenv *    async_open_clenv(void);
@@ -121,7 +127,7 @@ static bool                         clnode_expire_thread_iterator(hashnode_t *,
 static int             (*handleclientfunc)(unsigned long, int,
                                clientlistnode *, struct iface *,
                                struct async_clenv *);
-static pth_mutex_t     ctable_lock, ppool_lock;
+static pthread_mutex_t ctable_lock, ppool_lock;
 static pool_t          cpool, ppool;
 static hashtable_t     ctable;
 static struct async_env *async = NULL;
@@ -147,7 +153,7 @@ const char *const mms_reasons[MMS_MAX] = {
 
 /* MAIN DAEMON PROGRAM */
 
-/* Before calling this function, the async_init() and async_init_pth()
+/* Before calling this function, the async_init() and async_init_pthread()
  * functions should have been called. Before any async_*() or tcp_server()
  * calls the process is expected to have called make_daemon().
  */
@@ -168,9 +174,9 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     struct ifacendx *fdsi;
     struct iface *ifaces, *tif;
     unsigned long id;
-    pth_t clnode_thread;
-    pth_attr_t threadattr, clnode_threadattr;
-    int tcp_proto;
+    pthread_t clnode_thread;
+    pthread_attr_t threadattr, clnode_threadattr;
+    int tcp_proto, clnode_thread_started, err;
 
     sinaddr = (struct sockaddr_in *)&addr;
     handleclientfunc = handleclient1;
@@ -180,14 +186,15 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     fds = NULL;
     fdsi = NULL;
     ifaces = NULL;
+    clnode_thread_started = -1;
 
-    /* Pth related */
-    threadattr = pth_attr_new();
-    pth_attr_set(threadattr, PTH_ATTR_JOINABLE, FALSE);
-    clnode_threadattr = pth_attr_new();
-    pth_attr_set(clnode_threadattr, PTH_ATTR_JOINABLE, TRUE);
-    pth_mutex_init(&ctable_lock);
-    pth_mutex_init(&ppool_lock);
+    /* Pthread related */
+    pthread_attr_init(&threadattr);
+    pthread_attr_setdetachstate(&threadattr, 1);
+    pthread_attr_init(&clnode_threadattr);
+    pthread_attr_setdetachstate(&clnode_threadattr, 0);
+    pthread_mutex_init(&ctable_lock, NULL);
+    pthread_mutex_init(&ppool_lock, NULL);
 
     /* Used by resolve_hostname() */
     if (!mmstrinit(malloc, free, 16384)) {
@@ -209,8 +216,8 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                sizeof(phandleinfo), 16384 / sizeof(phandleinfo), 0, 0) ||
            !hashtable_init(&ctable, "ctable", maxips, 1, malloc, free,
                clnode_keycmp, clnode_keyhash, FALSE) ||
-           ((clnode_thread = pth_spawn(clnode_threadattr,
-                               clnode_expire_thread, &rateper)) == NULL)) {
+           ((clnode_thread_started = pthread_create(&clnode_thread,
+               &clnode_threadattr, clnode_expire_thread, &rateper))) != 0) {
        syslog(LOG_NOTICE, "tcp_server() - Out of memory");
        closelog();
        mmstrexit();
@@ -218,8 +225,9 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
        exit(-1);
     }
 
-    if (thread_object_init() == -1) {
-       syslog(LOG_NOTICE, "tcp_server() - thread_object_init()");
+    if ((err = pthread_object_init(8)) != 0) {
+       syslog(LOG_NOTICE, "tcp_server() - pthread_object_init() - %s",
+               strerror(err));
        exit(-1);
     }
 
@@ -343,9 +351,9 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     if (ret) {
        free(ifaces);
        mmstrexit();
-       if (clnode_thread != NULL) {
-           pth_abort(clnode_thread);
-           pth_join(clnode_thread, NULL);
+       if (clnode_thread_started == 0) {
+           pthread_cancel(clnode_thread);
+           pthread_join(clnode_thread, NULL);
        }
        if (HASHTABLE_VALID(&ctable))
            hashtable_destroy(&ctable, FALSE);
@@ -359,7 +367,7 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
     /* Start answering and dispatching connections */
     syslog(LOG_NOTICE, "Started for uid: %d", uid);
     for (;;) {
-       if ((nifaces2 = pth_poll(fds, nifaces, -1)) > 0) {
+       if ((nifaces2 = poll(fds, nifaces, -1)) > 0) {
            /* Loop but once only, and only for long as nifaces2 times.
             * Use our fast index to reference to the interface structure.
             */
@@ -367,14 +375,13 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                if (fds[i].revents & POLLIN) {
                    nifaces2--;
                    addrl = sizeof(struct sockaddr);
-                   if ((msgsock = pth_accept(fds[i].fd, &addr, &addrl))
-                           != -1) {
+                   if ((msgsock = accept(fds[i].fd, &addr, &addrl)) != -1) {
                        /* Make sure that we respect connection and rate
                         * limits.
                         */
                        ok = FALSE;
                        reason = MMS_NORMAL;
-                       pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+                       pthread_mutex_lock(&ctable_lock);
                        if ((clnode = (clientlistnode *)hashtable_lookup(
                                        &ctable, &sinaddr->sin_addr.s_addr,
                                        sizeof(u_int32_t))) == NULL) {
@@ -415,15 +422,15 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                            } else
                                reason = MMS_CONPERADDRESS_EXCEEDED;
                        }
-                       pth_mutex_release(&ctable_lock);
+                       pthread_mutex_unlock(&ctable_lock);
 
                        if (ok) {
                            phandleinfo *phi;
 
                            /* Dispatch client to a thread */
-                           pth_mutex_acquire(&ppool_lock, FALSE, NULL);
+                           pthread_mutex_lock(&ppool_lock);
                            phi = (phandleinfo *)pool_alloc(&ppool, FALSE);
-                           pth_mutex_release(&ppool_lock);
+                           pthread_mutex_unlock(&ppool_lock);
                            if (phi != NULL) {
                                id++;
                                phi->id = id;
@@ -431,15 +438,14 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                                phi->clnode = clnode;
                                /* Indexed reference */
                                phi->iface = fdsi[i].iface;
-                               if (thread_object_call(NULL, phandleclient,
-                                           phi) != -1)
+                               if (pthread_object_call(NULL, phandleclient,
+                                           phi) == 0)
                                    clnode->connections++;
                                else {
                                    if (phi != NULL) {
-                                       pth_mutex_acquire(&ppool_lock, FALSE,
-                                               NULL);
+                                       pthread_mutex_lock(&ppool_lock);
                                        pool_free((pnode_t *)phi);
-                                       pth_mutex_release(&ppool_lock);
+                                       pthread_mutex_unlock(&ppool_lock);
                                    }
                                    syslog(LOG_NOTICE, "tcp_server() - "
                                            "Error launching thread");
@@ -453,7 +459,7 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                            /* Close down connection with client
                             * immediately, sending message.
                             */
-                           (void) pth_write(msgsock, message, msglen);
+                           (void) write(msgsock, message, msglen);
                            shutdown(msgsock, 2);
                            close(msgsock);
                            mm_strcpy(ipaddr, "0.0.0.0");
@@ -463,11 +469,8 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
                                syslog(LOG_NOTICE, "tcp_server() - [%s] - %s",
                                        ipaddr, MMS_RSTRING(reason));
                        }
-
-                       pth_mutex_release(&ctable_lock);
-
                    } else
-                       syslog(LOG_NOTICE, "tcp_server() - pth_accept()");
+                       syslog(LOG_NOTICE, "tcp_server() - accept()");
                }
            }
        }
@@ -475,16 +478,13 @@ tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
 
     free(ifaces);
     mmstrexit();
-    pth_abort(clnode_thread);
-    pth_join(clnode_thread, NULL);
+    pthread_cancel(clnode_thread);
+    pthread_join(clnode_thread, NULL);
     hashtable_destroy(&ctable, FALSE);
     pool_destroy(&ppool);
     pool_destroy(&cpool);
-    /* Unrequired for Pth (non-existing even)
-     * pth_mutex_destroy(&apool_lock);
-     * pth_mutex_destroy(&ctable_lock);
-     * pth_mutex_destroy(&ppool_lock);
-     */
+    pthread_mutex_destroy(&ctable_lock);
+    pthread_mutex_destroy(&ppool_lock);
 
     exit(ret);
 }
@@ -651,11 +651,11 @@ parse_ifaces(char *hostnames, char *addresses)
 
 /* SERVER (THREAD) */
 
-/* This is started by pth_spawn(), it calls the user handler function, which
- * only has to care about serving the client.
+/* This is started by pthread_create(), it calls the user handler function,
+ * which only has to care about serving the client.
  */
 static void
-phandleclient(struct thread_object *obj, void *args)
+phandleclient(pthread_object_t *obj, void *args)
 {
     phandleinfo *phi;
     int socket, ret;
@@ -671,9 +671,9 @@ phandleclient(struct thread_object *obj, void *args)
     socket = phi->socket;
     clnode = phi->clnode;
     iface = phi->iface;
-    pth_mutex_acquire(&ppool_lock, FALSE, NULL);
+    pthread_mutex_lock(&ppool_lock);
     pool_free((pnode_t *)phi);
-    pth_mutex_release(&ppool_lock);
+    pthread_mutex_unlock(&ppool_lock);
 
     ret = 0;
 
@@ -691,13 +691,13 @@ phandleclient(struct thread_object *obj, void *args)
            }
            if (tmp) {
                /* Lock the mutex for a very short moment */
-               pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+               pthread_mutex_lock(&ctable_lock);
                /* Verify again for NULL to avoid a potential memory leak */
                if(clnode->hostname == NULL)
                    clnode->hostname = tmp;
                else
                    tmp = mmstrfree(tmp);
-               pth_mutex_release(&ctable_lock);
+               pthread_mutex_unlock(&ctable_lock);
            }
        }
 
@@ -719,10 +719,10 @@ phandleclient(struct thread_object *obj, void *args)
     /* Decrease our connection/refcount counter, and let our ctable thread
      * decide if/when to uncache our node.
      */
-    pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+    pthread_mutex_lock(&ctable_lock);
     if (clnode->connections > 0)
        clnode->connections--;
-    pth_mutex_release(&ctable_lock);
+    pthread_mutex_unlock(&ctable_lock);
 
     /* kthxbye :) */
 }
@@ -791,7 +791,7 @@ async_thread(void *args)
        /* Fatal error */
        syslog(LOG_NOTICE, "async_thread() -  calloc(%d)",
                idx_size + pfd_size);
-       pth_exit(NULL);
+       pthread_exit(NULL);
     }
     i = 0;
     DLIST_FOREACH(freeprocs, proc) {
@@ -806,15 +806,16 @@ async_thread(void *args)
 
        /* First check for new messages and queue them in our queue list if
         * any, but only wait for them if all processes are free (we don't
-        * expect any results). Special note: Pth will only notify event for
-        * a port if it received a message while it was empty.
+        * expect any results).
         */
-       if ((pth_msgport_pending(async->port))) {
-           while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
+       if (pthread_port_pending(&async->port) > 0) {
+           while ((omsg = (char *)pthread_msg_get(&async->port)) != NULL)
                DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
-       } else if (freeprocs->nodes == async->nprocs) {
-           pth_wait(async->ring);
-           while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
+       } else if (DLIST_NODES(freeprocs) == async->nprocs) {
+           /* XXX */ syslog(LOG_NOTICE, "* pthread_ring_wait() starting");
+           pthread_ring_wait(&async->ring, NULL);
+           /* XXX */ syslog(LOG_NOTICE, "* pthread_ring_wait() ended");
+           while ((omsg = (char *)pthread_msg_get(&async->port)) != NULL)
                DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
        }
 
@@ -832,7 +833,8 @@ async_thread(void *args)
                if (DEBUG_TRUE(m->func_id < async->nfuncs)) {
                    len = async->funcs[m->func_id].msg_len;
                    idx[p->sock].aclenv = m->aclenv;
-                   if ((pth_send(p->sock, m, len, 0)) == len)
+                   /* XXX */ syslog(LOG_NOTICE, "send()ing");
+                   if ((send(p->sock, m, len, 0)) == len)
                        DLIST_UNLINK(freeprocs, (node_t *)p);
                    else
                        syslog(LOG_NOTICE, "async_thread() - send(%d:%d)",
@@ -845,13 +847,19 @@ async_thread(void *args)
        }
 
        /* Wait for results from our async processes via their socket,
-        * but be sure to break when new Pth messages are available.
+        * but be sure to break when new inter-thread messages are available.
         * If new results are obtained, reply back to the caller thread.
         */
        for (;;) {
           int selr, i;
 
-          if ((selr = pth_poll_ev(pfds, nfds, -1, async->ring)) > 0) {
+          /* XXX */ syslog(LOG_NOTICE, "* pthread_poll_ring() starting");
+          selr = pthread_poll_ring(pfds, nfds, -1, &async->ring);
+          /* XXX */ syslog(LOG_NOTICE, "* pthread_poll_ring() ended (selr = "
+                           "%d (%s)", selr, strerror(errno));
+          if (selr == -1 && errno == ECANCELED)
+                  break;
+          if (selr > 0) {
               for (i = 0; selr > 0 && i < nfds; i++) {
                   if (pfds[i].revents & POLLERR) {
                       /* If this happens something is really wrong, exit */
@@ -870,32 +878,30 @@ async_thread(void *args)
                       /* Results, reply message back to client thread and
                        * return this process node in the free pool.
                        */
-                      if ((len = pth_recv(fd, e->msg, async->msg_len,
+                      if ((len = recv(fd, e->msg, async->msg_len,
                                       MSG_WAITALL)) <
                               sizeof(struct async_msg))
                           syslog(LOG_NOTICE, "async_thread() - recv(%d:%d)",
                                   fd, (int)async->msg_len);
-                      pth_msgport_reply(&(e->msg->msg));
+                      /* XXX */syslog(LOG_NOTICE, "recv()ed reply");
+                      pthread_msg_reply(&(e->msg->msg));
                       DLIST_APPEND(freeprocs, (node_t *)p);
                       selr--;
                   }
               }
           }
-          if ((pth_event_occurred(async->ring)))
-              /* Other requests we must queue immediately */
-              break;
        }
     }
 
     /* NOTREACHED */
-    pth_exit(NULL);
+    pthread_exit(NULL);
     return (NULL);
 }
 
 
 /* This function should be called at early application startup to setup
- * the asynchroneous pool of processes. async_init_pth() call may then
- * be called later on after pth_init() to initialize the client-side
+ * the asynchroneous pool of processes. async_init_pthread() call may then
+ * be called later on after pthread_init() to initialize the client-side
  * asynchroneous context (required before using async_open_clenv()).
  * This function is assumed to be called once only, and it's effects to
  * only be discarded when the main process exits.
@@ -972,67 +978,65 @@ async_init(struct async_func *funcs, int procs, uid_t uid, gid_t *gids,
 }
 
 
-/* This function is used by an application after pth_init() to initialize
+/* This function is used by an application after pthread_init() to initialize
  * the asynchroneous thread device/server, which will dispatch requests to
  * the asynchroneous pool of processes in a thread-safe manner (without
  * blocking the whole process. async_init() should have been called first.
  */
 bool
-async_init_pth(void)
+async_init_pthread(void)
 {
     size_t env_len;
     bool res = FALSE;
 
     if (DEBUG_TRUE(async)) {
 
+       pthread_poll_init();
+
        res = TRUE;
-       pth_mutex_init(&async->apool_lock);
+       pthread_mutex_init(&async->apool_lock, NULL);
        /* Setup our fast-allocation pool for client-side async environments */
        env_len = (size_t)OALIGN_CEIL(sizeof(struct async_clenv), long);
        if (!pool_init(&async->apool, "asyncenv_pool", malloc, free,
                    aclenv_constructor, aclenv_destructor,
                    env_len + async->msg_len,
                    16384 / (env_len + async->msg_len), 0, 0)) {
-           syslog(LOG_NOTICE, "async_init_pth() - pool_init(apool)");
+           syslog(LOG_NOTICE, "async_init_pthread() - pool_init(apool)");
            res = FALSE;
        }
 
        if (res) {
-           pth_attr_t attrs;
+           pthread_attr_t      attrs;
+           pthread_t           thread;
 
            res = FALSE;
-           if ((attrs = pth_attr_new()) != NULL) {
-               /* Start async slave thread device */
-               if ((async->port = pth_msgport_create("mms_async_server"))) {
-                   if ((async->ring = pth_event(PTH_EVENT_MSG,
-                                   async->port))) {
-                       pth_attr_set(attrs, PTH_ATTR_JOINABLE, FALSE);
-                       if ((pth_spawn(attrs, async_thread, NULL)) != NULL)
-                           res = TRUE;
-                       pth_attr_destroy(attrs);
-                   } else
-                       syslog(LOG_NOTICE, "async_init_pth() - pth_event()");
+           pthread_attr_init(&attrs);
+           /* Start async slave thread device */
+           if (pthread_port_init(&async->port) == 0) {
+               if (pthread_ring_init(&async->ring) == 0) {
+                   pthread_port_set_ring(&async->port, &async->ring);
+                   pthread_attr_setdetachstate(&attrs, 1);
+                   if ((pthread_create(&thread, &attrs, async_thread, NULL))
+                           == 0)
+                       res = TRUE;
                } else
-                   syslog(LOG_NOTICE,
-                           "async_init_pth() - pth_msgport_create()");
-               pth_attr_destroy(attrs);
-           }
+                   syslog(LOG_NOTICE, "async_init_pth() - pth_event()");
+           } else
+               syslog(LOG_NOTICE,
+                       "async_init_pth() - pth_msgport_create()");
+           pthread_attr_destroy(&attrs);
        }
 
        if (!res) {
-           if (async->ring) {
-               pth_event_free(async->ring, PTH_FREE_ALL);
-               async->ring = NULL;
-           }
-           if (async->port) {
-               pth_msgport_destroy(async->port);
-               async->port = NULL;
-           }
+           if (pthread_port_valid(&async->port))
+               pthread_port_destroy(&async->port);
+           if (pthread_ring_valid(&async->ring))
+               pthread_ring_destroy(&async->ring);
            pool_destroy(&async->apool);
        }
 
     } else
-       DEBUG_PRINTF("async_init_pth", "Call async_init() first");
+       DEBUG_PRINTF("async_init_pthread", "Call async_init() first");
 
     return (res);
 }
@@ -1040,7 +1044,7 @@ async_init_pth(void)
 
 /* Used by a client thread to initialize a context suitable to call the
  * async_call() function with. Each thread needs one to use the device.
- * async_init() and async_init_pth() should have been called first.
+ * async_init() and async_init_pthread() should have been called first.
  */
 static struct async_clenv *
 async_open_clenv(void)
@@ -1049,9 +1053,9 @@ async_open_clenv(void)
 
     if (DEBUG_TRUE(async != NULL && POOL_VALID(&async->apool))) {
        /* Optimized for speed using a pool of pre-allocated nodes */
-       pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
+       pthread_mutex_lock(&async->apool_lock);
        aclenv = (struct async_clenv *)pool_alloc(&async->apool, FALSE);
-       pth_mutex_release(&async->apool_lock);
+       pthread_mutex_unlock(&async->apool_lock);
     } else
        DEBUG_PRINTF("async_open_clenv",
                "Call async_init() and async_init_pth() first");
@@ -1065,9 +1069,9 @@ async_open_clenv(void)
 static struct async_clenv *
 async_close_clenv(struct async_clenv *aclenv)
 {
-    pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
+    pthread_mutex_lock(&async->apool_lock);
     pool_free((pnode_t *)aclenv);
-    pth_mutex_release(&async->apool_lock);
+    pthread_mutex_unlock(&async->apool_lock);
 
     return (NULL);
 }
@@ -1076,18 +1080,17 @@ static bool aclenv_constructor(pnode_t *pnode)
 {
     struct async_clenv *aclenv = (struct async_clenv *)pnode;
 
-    if ((aclenv->port = pth_msgport_create("XXX(NULL)")) != NULL) {
-       if ((aclenv->ring = pth_event(PTH_EVENT_MSG, aclenv->port))) {
-           register char *ptr = (char *)aclenv;
+    if (pthread_port_init(&aclenv->port) == 0) {
+       if (pthread_ring_init(&aclenv->ring) == 0) {
+           register char       *ptr = (char *)aclenv;
 
            ptr += (size_t)OALIGN_CEIL(sizeof(struct async_clenv), long);
            aclenv->msg = (struct async_msg *)ptr;
-           aclenv->msg->msg.m_replyport = aclenv->port;
-           aclenv->msg->msg.m_size = async->msg_len;
+           pthread_msg_init(&aclenv->msg->msg, &aclenv->port);
 
            return TRUE;
        }
-       pth_msgport_destroy(aclenv->port);
+       pthread_port_destroy(&aclenv->port);
     }
 
     return FALSE;
@@ -1097,8 +1100,9 @@ static void aclenv_destructor(pnode_t *pnode)
 {
     struct async_clenv *aclenv = (struct async_clenv *)pnode;
 
-    pth_event_free(aclenv->ring, PTH_FREE_ALL);
-    pth_msgport_destroy(aclenv->port);
+    pthread_msg_destroy(&aclenv->msg->msg);
+    pthread_port_destroy(&aclenv->port);
+    pthread_ring_destroy(&aclenv->ring);
 }
 
 
@@ -1114,7 +1118,7 @@ async_call(struct async_clenv *aclenv, unsigned int function)
     /* Send request */
     aclenv->msg->func_id = function;
     aclenv->msg->aclenv = aclenv;
-    pth_msgport_put(async->port, &(aclenv->msg->msg));
+    pthread_msg_put(&async->port, &(aclenv->msg->msg));
 
     /* Sleep until we get reply back, in a thread-safe manner.
      * Note: This will permanently block the current thread if the
@@ -1131,15 +1135,8 @@ async_call(struct async_clenv *aclenv, unsigned int function)
      * we get a reply back seems a good choice here; The async functions
      * should ensure to eventually return.
      */
-    for (;;) {
-       if ((pth_wait(aclenv->ring))) {
-           if ((pth_event_occurred(aclenv->ring))) {
-               /* We know that message pointer will be the same */
-               if ((pth_msgport_get(aclenv->port)))
-                   break;
-           }
-       }
-    }
+    while (pthread_msg_get(&aclenv->port) == NULL)
+       pthread_ring_wait(&aclenv->ring, NULL);
 }
 
 
@@ -1356,7 +1353,7 @@ clnode_expire_thread(void *args)
     data.cnt = 0;
     for (;;) {
        /* Sleep until it is known that at least one node expired */
-       pth_sleep((unsigned int)data.soonest);
+       pthread_sleep((unsigned int)data.soonest);
        /* Tell our iterator function the current time and the maximum
         * allowed time to wait to
         */
@@ -1365,14 +1362,14 @@ clnode_expire_thread(void *args)
        /* Lock ctable, reset expired nodes, garbage collect, and set
         * data.soonest to the time of the soonest next expireing node.
         */
-       pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+       pthread_mutex_lock(&ctable_lock);
        if (HASHTABLE_NODES(&ctable) > 0)
            hashtable_iterate(&ctable, clnode_expire_thread_iterator, &data);
-       pth_mutex_release(&ctable_lock);
+       pthread_mutex_unlock(&ctable_lock);
     }
 
     /* NOTREACHED */
-    pth_exit(NULL);
+    pthread_exit(NULL);
     return NULL;
 }
 
@@ -1403,11 +1400,5 @@ clnode_expire_thread_iterator(hashnode_t *hnod, void *udata)
     if (rem != 0 && data->soonest > rem)
        data->soonest = rem;
 
-    /* If the cache is big, prevent from interfering with other threads */
-    if ((data->cnt++) == 64) {
-       data->cnt = 0;
-       pth_yield(NULL);
-    }
-
     return TRUE;
 }
index 9bf088a..2a099d8 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmserver.h,v 1.10 2004/06/09 20:58:54 mmondor Exp $ */
+/* $Id: mmserver.h,v 1.10.2.1 2005/11/16 01:00:58 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -43,7 +43,7 @@
 
 
 #include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmpool.h>
@@ -51,6 +51,8 @@
 #include <mmhash.h>
 #include <mmlimitrate.h>
 
+#include <mm_pthread_msg.h>
+
 
 
 
@@ -146,7 +148,7 @@ struct ifacendx {
  */
 struct async_msg {
     pnode_t node;
-    pth_message_t msg;
+    pthread_msg_t msg;
     struct async_clenv *aclenv;
     unsigned int func_id;
     /* User data structure here, user async functions should not touch
@@ -169,10 +171,10 @@ struct async_proc {
 
 /* Global async server context environment */
 struct async_env {
-    pth_t slave;
-    pth_msgport_t port;
-    pth_event_t ring;
-    pth_mutex_t apool_lock;
+    pthread_t slave;
+    pthread_port_t port;
+    pthread_ring_t ring;
+    pthread_mutex_t apool_lock;
     struct async_func *funcs;
     unsigned int nfuncs, nprocs;
     size_t msg_len;
@@ -190,8 +192,8 @@ struct async_idx {
 /* Client/thread specific async context */
 struct async_clenv {
     pnode_t node;
-    pth_msgport_t port;
-    pth_event_t ring;
+    pthread_port_t port;
+    pthread_ring_t ring;
     struct async_msg *msg;
 };
 
@@ -220,7 +222,7 @@ extern void tcp_server(char *, char *, char *, uid_t, gid_t *, int,
 extern bool    make_daemon(const char *, const char *);
 
 extern bool    async_init(struct async_func *, int, uid_t, gid_t *, int);
-extern bool    async_init_pth(void);
+extern bool    async_init_pthread(void);
 extern void    async_call(struct async_clenv *, unsigned int);
 
 extern char *  resolve_hostname(struct async_clenv *, struct sockaddr *);
index e5f916c..c0006f0 100644 (file)
@@ -1,4 +1,4 @@
-/* $Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $ */
+/* $Id: mmstr.c,v 1.8.2.1 2005/11/16 01:00:58 mmondor Exp $ */
 
 /*
  * Copyright (C) 2000-2004, Matthew Mondor
@@ -39,7 +39,7 @@
 /* INCLUDES */
 
 #include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
 
 #include <mmtypes.h>
 #include <mmstring.h>
@@ -51,7 +51,7 @@
 
 MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
 \tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $");
+MMRCSID("$Id: mmstr.c,v 1.8.2.1 2005/11/16 01:00:58 mmondor Exp $");
 
 
 
@@ -59,7 +59,7 @@ MMRCSID("$Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $");
 /* GLOBALS */
 
 static pool_t mmstrp;
-static pth_mutex_t mmstrp_lock = PTH_MUTEX_INIT;
+static pthread_mutex_t mmstrp_lock = PTHREAD_MUTEX_INITIALIZER;
 
 
 
@@ -76,13 +76,13 @@ mmstrinit(void *(*allocfunc)(size_t), void (*freefunc)(void *),
 {
     bool ok = FALSE;
 
-    pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+    pthread_mutex_lock(&mmstrp_lock);
     if (!POOL_VALID(&mmstrp))
        ok = pool_init(&mmstrp, "mmstr_pool", allocfunc, freefunc, NULL, NULL,
                sizeof(mmstrnode), buffer / sizeof(mmstrnode), 0, 0);
     else
        ok = TRUE;
-    pth_mutex_release(&mmstrp_lock);
+    pthread_mutex_unlock(&mmstrp_lock);
 
     return ok;
 }
@@ -92,9 +92,9 @@ mmstrinit(void *(*allocfunc)(size_t), void (*freefunc)(void *),
 void
 mmstrexit(void)
 {
-    pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+    pthread_mutex_lock(&mmstrp_lock);
     pool_destroy(&mmstrp);
-    pth_mutex_release(&mmstrp_lock);
+    pthread_mutex_unlock(&mmstrp_lock);
 }
 
 
@@ -113,9 +113,9 @@ mmstrdup(const char *str)
      */
     for (ptr = str; *ptr != '\0'; ptr++) ;
     if ((len = (size_t)(ptr - str) + 1) < 256) {
-       pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+       pthread_mutex_lock(&mmstrp_lock);
        nod = (mmstrnode *)pool_alloc(&mmstrp, FALSE);
-       pth_mutex_release(&mmstrp_lock);
+       pthread_mutex_unlock(&mmstrp_lock);
        if (nod != NULL) {
            mm_memcpy(nod->string, str, len);
            return nod->string;
@@ -135,9 +135,9 @@ mmstralloc(size_t size)
     register mmstrnode *nod = NULL;
 
     if (size < 256) {
-       pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+       pthread_mutex_lock(&mmstrp_lock);
        nod = (mmstrnode *)pool_alloc(&mmstrp, FALSE);
-       pth_mutex_release(&mmstrp_lock);
+       pthread_mutex_unlock(&mmstrp_lock);
        if (nod != NULL) {
            *nod->string = '\0';
            return nod->string;
@@ -155,9 +155,9 @@ mmstrfree(char *str)
     /* Evaluate actual node pointer from supplied string pointer and free it */
     str -= sizeof(pnode_t);
 
-    pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+    pthread_mutex_lock(&mmstrp_lock);
     pool_free((pnode_t *)str);
-    pth_mutex_release(&mmstrp_lock);
+    pthread_mutex_unlock(&mmstrp_lock);
 
     return NULL;
 }