-$Id: ChangeLog,v 1.50 2005/11/17 07:38:07 mmondor Exp $
+$Id: ChangeLog,v 1.51 2007/03/13 20:28:22 mmondor Exp $
+
+
+
+Release: mmftpd 0.1.0 devl
+Date : March 13, 2007
+By : Matthew Mondor
+
+* Performance enhancements
+ - Now uses pthread instead of pth, using the new pthread_utils library
+ for functionality which pth had but pthread lacks. This allows to
+ execute less functions through dedicated processes (which also require
+ some send(2)/recv(2) overhead), because of the expected preemptive
+ nature of most pthread implementations. It also allows better scaling
+ with SMP.
+ - Uses PTHREAD_MUTEX_INITIALIZER where possible instead of
+ pthread_mutex_init().
-# $Id: GNUmakefile,v 1.6 2006/12/13 14:37:12 mmondor Exp $
+# $Id: GNUmakefile,v 1.7 2007/03/13 20:28:22 mmondor Exp $
MMLIBS := $(addprefix ../mmlib/,mmarch.o mmfd.o mmhash.o mmlimitrate.o \
-mmlog.o mmpath.o mmpool.o mmreadcfg.o mm_pth_pool.o mmserver.o mmstat.o \
-mmstr.o mmstring.o)
-
-PTHINCDIR := $(shell pth-config --cflags)
-PTHLIBDIR := $(shell pth-config --libdir)
+mmlog.o mmpath.o mmpool.o mmreadcfg.o mmserver.o mmstat.o \
+mmstr.o mmstring.o) $(addprefix ../pthread_util/,mm_pthread_msg.o \
+mm_pthread_poll.o mm_pthread_pool.o mm_pthread_sleep.o)
OBJS := src/mmftpd.o
CFLAGS += -Wall
+#CFLAGS += -DNODETACH -DDEBUG -DPTHREAD_DEBUG -g3
+
+LDFLAGS += -lc -lcrypt -lpthread
+#LDFLAGS += -lpthread_dbg
all: src/mmftpd
%.o: %.c
- $(CC) -c ${CFLAGS} -I. -I../mmlib $(PTHINCDIR) -o $@ $<
+ $(CC) -c ${CFLAGS} -I. -I../mmlib -I../pthread_util -o $@ $<
src/mmftpd: $(MMLIBS) $(OBJS)
- $(CC) -o $@ -L$(PTHLIBDIR) -lc -lcrypt -lpth $(OBJS) $(MMLIBS)
+ $(CC) -o $@ $(LDFLAGS) $(OBJS) $(MMLIBS)
install:
install -cs -o 0 -g 0 -m 500 src/mmftpd /usr/local/sbin
-/* $Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $ */
+/* $Id: mmftpd.c,v 1.67 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2001-2004, Matthew Mondor
#include <ctype.h>
#include <syslog.h>
-#include <pth.h>
+#include <pthread.h>
#include <mmtypes.h>
#include <mmreadcfg.h>
#include <mmstring.h>
#include <mmfifo.h>
#include <mmstat.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_poll.h>
+#include <mm_pthread_sleep.h>
#include "mmftpd.h"
MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmftpd.c,v 1.66 2004/09/19 11:02:45 mmondor Exp $");
+MMRCSID("$Id: mmftpd.c,v 1.67 2007/03/13 20:28:22 mmondor Exp $");
static CONFIG CONF;
/* Used to start transfer threads */
-static pth_attr_t tthreadattr;
+static pthread_attr_t tthreadattr;
/* List of logged in users and optionally current home directory size */
-static pth_mutex_t lusers_lock;
+static pthread_mutex_t lusers_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t lusers_pool;
static hashtable_t lusers_table;
/* This is used so that clientenv structures be allocated/freed fast */
-static pth_mutex_t clenvs_lock;
+static pthread_mutex_t clenvs_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t clenvs_pool;
static pool_t fifos_pool;
/* Pool used to optimize creating/destroying mmfd mutexes */
-static pth_mutex_t mutexes_lock;
+static pthread_mutex_t mutexes_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t mutexes_pool;
/* Used for the longer-term directory size cache */
-static pth_mutex_t quota_lock;
+static pthread_mutex_t quota_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t quota_pool;
static hashtable_t quota_table;
static fdfuncs gfdf = {
malloc,
free,
- pth_poll,
- pth_read,
- pth_write,
- pth_sleep,
- pth_usleep,
- _pth_mutex_create,
- _pth_mutex_destroy,
- _pth_mutex_lock,
- _pth_mutex_unlock,
- _pth_thread_yield,
- _pth_eintr
+ poll,
+ read,
+ write,
+ pthread_sleep,
+ pthread_usleep,
+ thread_mutex_create,
+ thread_mutex_destroy,
+ thread_mutex_lock,
+ thread_mutex_unlock,
+ NULL,
+ thread_eintr
};
/* async_getuserline() uses this for each asynchroneous process */
/* We slow down password guessing programs */
if (clenv->attempts > 0)
- pth_sleep(clenv->attempts * 2);
+ pthread_sleep(clenv->attempts * 2);
clenv->attempts++;
if (clenv->anonymous) {
}
directory_message(clenv, 230);
- pth_mutex_acquire(&lusers_lock, FALSE, NULL);
+ pthread_mutex_lock(&lusers_lock);
/* Verify if user in list */
len = mm_strlen(clenv->user) + 1;
bool lok = TRUE;
/* Yes, make sure that we observe limits, if any */
- pth_mutex_acquire(&lun->lock, FALSE, NULL);
+ pthread_mutex_lock(&lun->lock);
if (clenv->maxlogins != 0) {
if (lun->logins < clenv->maxlogins)
lun->logins++;
else lok = FALSE;
} else
lun->logins++;
- pth_mutex_release(&lun->lock);
+ pthread_mutex_unlock(&lun->lock);
if (lok) {
clenv->unode = lun;
if (clenv->anonymous)
nextstate = STATE_ERROR;
}
if (lun != NULL) {
- pth_mutex_init(&lun->lock);
+ pthread_mutex_init(&lun->lock, NULL);
lun->logins = 1;
mm_memcpy(lun->user, clenv->user, len);
clenv->unode = lun;
}
}
- pth_mutex_release(&lusers_lock);
+ pthread_mutex_unlock(&lusers_lock);
}
return (nextstate);
{
long logins;
- pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+ pthread_mutex_lock(&clenv->unode->lock);
logins = clenv->unode->logins;
- pth_mutex_release(&clenv->unode->lock);
+ pthread_mutex_unlock(&clenv->unode->lock);
fdbprintf(fdb, " Logged in as %s (%ld concurrent logins)\r\n",
clenv->user, logins);
}
fdbprintf(fdb, " %lld bytes", (int64_t)clenv->maxhomesize);
fdbwrite(fdb, "\r\n Current home directory size: ", 36);
- pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+ pthread_mutex_lock(&clenv->unode->lock);
size = clenv->unode->homesize;
- pth_mutex_release(&clenv->unode->lock);
+ pthread_mutex_unlock(&clenv->unode->lock);
fdbprintf(fdb, "%lld bytes", (int64_t)size);
} else
fdbwrite(fdb, "disabled", 8);
int
main(int argc, char **argv)
{
- char *conf_file = "/etc/mmftpd.conf";
+ char *conf_file = "/usr/local/etc/mmftpd.conf";
pid_t uid;
gid_t *gids;
int ngids, ret = EXIT_SUCCESS;
/* Set defaults */
*CONF.CHROOT_DIR = '\0';
- mm_strcpy(CONF.PASSWD_FILE, "/etc/mmftpdpasswd");
+ mm_strcpy(CONF.PASSWD_FILE, "/usr/local/etc/mmftpdpasswd");
mm_strcpy(CONF.PID_PATH, "/var/run/mmftpd.pid");
mm_strcpy(CONF.USER, "mmftpd");
mm_strcpy(CONF.GROUPS, "mmftpd,mmstat");
async_init(afuncs, CONF.ASYNC_PROCESSES, uid, gids, ngids);
/* Threading system initialization */
- pth_init();
+ /* pthread_init(); */
/* Part of mmserver(3)'s async system which has to be initialized after
* the threading system
*/
- async_init_pth();
-
- pth_mutex_init(&lusers_lock);
- pth_mutex_init(&clenvs_lock);
- pth_mutex_init(&mutexes_lock);
- pth_mutex_init("a_lock);
+ async_init_pthread();
srandom(getpid() + time(NULL));
/* We use those for our transfer ASYNC thread, joinable because when we
* eventually request it to quit via a message we want to make sure it
- * ended properly waiting for it with pth_join(). I have decided to use
+ * ended properly waiting for it with pthread_join(). I have decided to use
* a second thread dedicated to file transfers, this way I can respect
* the RFC on listening to ABOR and STAT easily on the control connection
* while transfers are ongoing, with ease and modularity.
*/
- tthreadattr = pth_attr_new();
- pth_attr_set(tthreadattr, PTH_ATTR_JOINABLE, TRUE);
+ pthread_attr_init(&tthreadattr);
+ pthread_attr_setdetachstate(&tthreadattr, 0);
/* Initialize our pools */
/* Client environment nodes */
POOL_VALID(&mutexes_pool) && POOL_VALID("a_pool) &&
HASHTABLE_VALID("a_table) &&
(!*CONF.DISPLAY_FILE || POOL_VALID(&fifos_pool))) {
+ thread_init();
fdbcinit(&gfdf, &fdbc, CONF.GBANDWIDTH_IN * 1024,
CONF.GBANDWIDTH_OUT * 1024);
/* Finally start our TCP main daemon */
bool space;
/* First locate end of command */
- for (; *str != '\0' && !isspace(*str); str++) ;
+ for (; *str != '\0' && !isspace((int)*str); str++) ;
if (*str != '\0') {
space = FALSE;
while (*str != '\0') {
/* Find first printable char */
- for (; *str != '\0' && isspace(*str); str++)
+ for (; *str != '\0' && isspace((int)*str); str++)
space = TRUE;
/* Is it an option? */
if (space && *str == '-') {
* char
*/
space = FALSE;
- while (*str != '\0' && !isspace(*str))
+ while (*str != '\0' && !isspace((int)*str))
*str++ = ' ';
} else
break;
bool ret = TRUE;
if (clenv->maxhomesize && modify != 0) {
- pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+ pthread_mutex_lock(&clenv->unode->lock);
cur = clenv->unode->homesize;
cur += modify;
if (cur < 0)
clenv->unode->homesize = cur;
else
ret = FALSE;
- pth_mutex_release(&clenv->unode->lock);
+ pthread_mutex_unlock(&clenv->unode->lock);
}
return (ret);
clientenv *clenv;
struct fifonode *fnode = NULL;
- pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
+ pthread_mutex_lock(&clenvs_lock);
clenv = (clientenv *)pool_alloc(&clenvs_pool, TRUE);
if (*CONF.DISPLAY_FILE)
fnode = (struct fifonode *)pool_alloc(&fifos_pool, FALSE);
- pth_mutex_release(&clenvs_lock);
+ pthread_mutex_unlock(&clenvs_lock);
if (clenv) {
if (!(*CONF.DISPLAY_FILE) || fnode) {
free_clientenv(clenv);
} else {
if (fnode) {
- pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
+ pthread_mutex_lock(&clenvs_lock);
pool_free((pnode_t *)fnode);
- pth_mutex_release(&clenvs_lock);
+ pthread_mutex_unlock(&clenvs_lock);
}
}
if (clenv->unode) {
bool zero = FALSE;
- pth_mutex_acquire(&lusers_lock, FALSE, NULL);
- pth_mutex_acquire(&clenv->unode->lock, FALSE, NULL);
+ pthread_mutex_lock(&lusers_lock);
+ pthread_mutex_lock(&clenv->unode->lock);
if ((--clenv->unode->logins) < 1)
zero = TRUE;
- pth_mutex_release(&clenv->unode->lock);
+ pthread_mutex_unlock(&clenv->unode->lock);
if (zero) {
hashtable_unlink(&lusers_table, (hashnode_t *)clenv->unode);
- /* May be required by some POSIX thread implementations */
- /* pth_mutex_destroy(&clenv->unode->lock); */
+ pthread_mutex_destroy(&clenv->unode->lock);
/* We now need to either update the cache entry for this home
* directory size or to create a new entry for it, but only if
*/
len = mm_strlen(home) + 1;
t = time(NULL);
- pth_mutex_acquire("a_lock, FALSE, NULL);
+ pthread_mutex_lock("a_lock);
if ((qnod = (quotanode *)hashtable_lookup("a_table, home,
len)) != NULL) {
qnod->homesize = clenv->unode->homesize;
qnod->dir, len, FALSE);
}
}
- pth_mutex_release("a_lock);
+ pthread_mutex_unlock("a_lock);
}
pool_free((pnode_t *)clenv->unode);
}
- pth_mutex_release(&lusers_lock);
+ pthread_mutex_unlock(&lusers_lock);
}
- pth_mutex_acquire(&clenvs_lock, FALSE, NULL);
- if (fnode != NULL) pool_free((pnode_t *)fnode);
- if (home != NULL) mmstrfree(home);
+ pthread_mutex_lock(&clenvs_lock);
+ if (fnode != NULL)
+ pool_free((pnode_t *)fnode);
+ if (home != NULL)
+ mmstrfree(home);
pool_free((pnode_t *)clenv);
- pth_mutex_release(&clenvs_lock);
+ pthread_mutex_unlock(&clenvs_lock);
return (NULL);
}
static bool
start_transfer_thread(clientenv *clenv)
{
- /* I do not understand why libpth doesn't provide a function to set the
- * reply port, or to init the message structure. Moreover, it theoretically
- * shouldn't need the message size if it simply swaps messages around port
- * linked list FIFOs via the message node. This currently seems the best
- * way to go about it
- */
- clenv->tmsg.msg.m_size = sizeof(transfermsg);
- clenv->tmsg.msg.m_replyport = clenv->rport;
+ pthread_msg_init(&clenv->tmsg.msg, &clenv->rport);
/*
- if ((clenv->tthread = pth_spawn(tthreadattr, (void *)transferthread,
- clenv))) {
- if (transfer_request(REQ_NONE, TRUE, clenv)) return (TRUE);
- pth_join(clenv->tthread, NULL);
+ if (pthread_create(&clenv->tthread, &tthreadattr, (void *)transferthread,
+ clenv) == 0) {
+ if (transfer_request(REQ_NONE, TRUE, clenv))
+ return (TRUE);
+ pthread_join(&clenv->tthread, NULL);
}
*/
- thread_object_call(NULL, transferthread, clenv);
+ if (pthread_object_call(NULL, transferthread, clenv) != 0)
+ DEBUG_PRINTF("start_transfer_thread", "pthread_object_call()");
+
if (transfer_request(REQ_NONE, TRUE, clenv))
return TRUE;
+ else
+ DEBUG_PRINTF("start_transfer_thread", "transfer_request(REQ_NONE)");
return (FALSE);
}
static void
stop_transfer_thread(clientenv *clenv)
{
- if (clenv->sport) {
+ if (pthread_port_valid(&clenv->sport)) {
/* Send a quit request and wait for reply */
if (!transfer_request(REQ_QUIT, TRUE, clenv))
DEBUG_PRINTF("stop_transfer_thread",
* Join thread as we want to make sure it exits before we do.
*/
/*
- pth_join(clenv->tthread, NULL);
+ pthread_join(&clenv->tthread, NULL);
*/
}
+ if (pthread_msg_valid(&clenv->tmsg.msg))
+ pthread_msg_destroy(&clenv->tmsg.msg);
}
{
int res = FALSE;
transfermsg *msg = &(clenv->tmsg);
- pth_event_t ring;
if (req != REQ_NONE) {
msg->request = req;
msg->result = FALSE;
- pth_msgport_put(clenv->sport, (pth_message_t *)msg);
+ pthread_msg_put(&clenv->sport, (pthread_msg_t *)msg);
}
if (waitreply) {
- ring = pth_event(PTH_EVENT_TIME, pth_timeout(120, 0));
- pth_event_concat(clenv->rring, ring, NULL);
for (;;) {
- if ((pth_wait(clenv->rring))) {
- if ((pth_event_occurred(ring))) {
- /* Timeout occured waiting for reply */
- res = FALSE;
- break;
- }
- if ((pth_event_occurred(clenv->rring))) {
- if ((msg = (transfermsg *)pth_msgport_get(clenv->rport))) {
- res = msg->result;
+ if ((msg = (transfermsg *)pthread_msg_get(&clenv->rport))
+ == NULL) {
+ struct timeval tv;
+ struct timespec ts, ts1;
+
+ ts1.tv_sec = 120;
+ ts1.tv_nsec = 0;
+ (void) gettimeofday(&tv, NULL);
+ TIMEVAL_TO_TIMESPEC(&tv, &ts);
+ timespecadd(&ts, &ts1, &ts);
+
+ if (pthread_ring_wait(&clenv->rring, &ts) == ETIMEDOUT) {
+ res = FALSE;
break;
- }
}
+ } else {
+ res = msg->result;
+ break;
}
}
- pth_event_isolate(clenv->rring);
- pth_event_free(ring, PTH_FREE_ALL);
} else
res = TRUE;
return FALSE;
if (*addr == '\0')
break;
- } else if (isdigit(*addr))
+ } else if (isdigit((int)*addr))
*uptr++ = *addr;
else
return FALSE;
}
/* Open our reply port and start our file transfer thread */
- if ((clenv->rport = pth_msgport_create("XXX(NULL)"))) {
- clenv->rring = pth_event(PTH_EVENT_MSG, clenv->rport);
+ if (pthread_port_init(&clenv->rport) == 0 &&
+ pthread_ring_init(&clenv->rring) == 0 &&
+ pthread_port_set_ring(&clenv->rport, &clenv->rring) == 0) {
/* Init our clientenv as required */
clenv->fdb = fdb;
} else
DEBUG_PRINTF("handleclient", "start_transfer_thread()");
- pth_event_free(clenv->rring, PTH_FREE_ALL);
- while ((pth_msgport_get(clenv->rport)) != NULL) ;
- pth_msgport_destroy(clenv->rport);
+ while (pthread_msg_get(&clenv->rport) != NULL) ;
} else
- DEBUG_PRINTF("handleclient", "pth_msgport_create()");
+ DEBUG_PRINTF("handleclient", "pthread_port_init()");
+
+ if (pthread_port_valid(&clenv->rport))
+ pthread_port_destroy(&clenv->rport);
+ if (pthread_ring_valid(&clenv->rring))
+ pthread_ring_destroy(&clenv->rring);
control_in = FDBBYTESR(fdb);
control_out = FDBBYTESW(fdb);
p = CONF.PASV_RANGE_MIN;
}
if (err == -1)
- pth_sleep(1);
+ pthread_sleep(1);
}
if (err != -1) {
/* Successful bind(2) */
* other transfer requests when one is ongoing already.
*/
static void
-transferthread(struct thread_object *obj, void *args)
+transferthread(pthread_object_t *obj, void *args)
{
clientenv *clenv = args;
fdbuf *fdb;
if ((fdb = fdbopen(&gfdf, &fdbc, -1, FDB_BUFSIZE, FDB_BUFSIZE, 1024, 1024,
CONF.DATA_TIMEOUT * 1000, CONF.DATA_TIMEOUT * 1000,
FALSE)) != NULL) {
- if ((clenv->sport = pth_msgport_create("XXX(NULL)"))) {
+ if (pthread_port_init(&clenv->sport) == 0) {
/* Reply that we started thread properly */
ok = TRUE;
clenv->tmsg.result = TRUE;
- pth_msgport_put(clenv->rport, (pth_message_t *)&(clenv->tmsg));
+ pthread_msg_put(&clenv->rport, (pthread_msg_t *)&(clenv->tmsg));
/* Process our tasks */
transferthread_main(clenv, fdb);
- while ((msg = (transfermsg *)pth_msgport_get(clenv->sport))
+ while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
!= NULL) {
msg->result = FALSE;
- pth_msgport_reply((pth_message_t *)msg);
+ pthread_msg_reply((pthread_msg_t *)msg);
}
- pth_msgport_destroy(clenv->sport);
+ pthread_port_destroy(&clenv->sport);
} else
- DEBUG_PRINTF("transferthread", "pth_msgport_create()");
+ DEBUG_PRINTF("transferthread", "pthread_port_init()");
fdbclose(fdb);
} else
DEBUG_PRINTF("transferthread", "fdbopen()");
if (!ok) {
/* Reply that thread couldn't be setup properly */
clenv->tmsg.result = FALSE;
- pth_msgport_put(clenv->rport, (pth_message_t *)&(clenv->tmsg));
+ pthread_msg_put(&clenv->rport, (pthread_msg_t *)&(clenv->tmsg));
}
/*
- pth_exit(NULL);
+ pthread_exit(NULL);
*/
}
static void
transferthread_main(clientenv *clenv, fdbuf *fdb)
{
- int state = TTSTATE_INITIAL, l, reason;
+ int state = TTSTATE_INITIAL, l, reason, err;
register char *from, *to;
char ipaddr[20], buffer[T_BUFSIZE], *from2;
- pth_event_t ring, ring1;
+ pthread_ring_t ring;
transfermsg *msg;
struct sockaddr_in server, client, addr;
socklen_t addrl;
fdbparam_set(fdb, -1, FDBP_FD);
/* Transfer device entry setup */
- ring = pth_event(PTH_EVENT_MSG, clenv->sport);
+ pthread_ring_init(&ring);
+ pthread_port_set_ring(&clenv->sport, &ring);
/* Main device loop */
while (state != TTSTATE_DONE) {
/* State main loop */
while (state == TTSTATE_INITIAL) {
- if ((pth_wait(ring))) {
- if ((pth_event_occurred(ring))) {
- while ((msg = (transfermsg *)pth_msgport_get(
- clenv->sport)) != NULL) {
- /* Process request and reply */
- switch (msg->request) {
- case REQ_QUIT:
- msg->result = TRUE;
- state = TTSTATE_DONE;
- break;
- case REQ_ABORT:
- /* As we handle PASV port, do this */
- if (fdpasv != -1) {
- close(fdpasv);
- fdpasv = -1;
+ if (pthread_port_pending(&clenv->sport) < 1)
+ pthread_ring_wait(&ring, NULL);
+ while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
+ != NULL) {
+ /* Process request and reply */
+ switch (msg->request) {
+ case REQ_QUIT:
+ msg->result = TRUE;
+ state = TTSTATE_DONE;
+ break;
+ case REQ_ABORT:
+ /* As we handle PASV port, do this */
+ if (fdpasv != -1) {
+ close(fdpasv);
+ fdpasv = -1;
+ }
+ port = 0;
+ passive = ongoing = FALSE;
+ download = TRUE;
+ list = 0;
+ *path = '\0';
+ msg->result = TRUE;
+ break;
+ case REQ_STATUS:
+ msg->port = (u_int16_t)port;
+ msg->passive = passive;
+ msg->ongoing = ongoing;
+ msg->download = download;
+ msg->list = list;
+ msg->rbytes = FDBBYTESR(fdb);
+ msg->wbytes = FDBBYTESW(fdb);
+ msg->dlfiles = dlfiles;
+ msg->ulfiles = ulfiles;
+ msg->result = TRUE;
+ break;
+ case REQ_NEWPORT:
+ port = 0;
+ if (fd != -1) {
+ fdbflushw(fdb);
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ fd = -1;
+ fdbparam_set(fdb, -1, FDBP_FD);
+ }
+ /* As we handle PASV port, do this */
+ if (fdpasv != -1) {
+ close(fdpasv);
+ fdpasv = -1;
+ }
+ if (msg->passive) {
+ /* PASV request, we need to open a high
+ * port and start listening for a
+ * connection on it
+ */
+ msg->result = FALSE; /* Default */
+ if (pasv_bind(clenv, &server, &fdpasv,
+ &port)) {
+ msg->port = (u_int16_t)port;
+ passive = msg->result = TRUE;
+ break;
}
- port = 0;
- passive = ongoing = FALSE;
- download = TRUE;
- list = 0;
- *path = '\0';
- msg->result = TRUE;
- break;
- case REQ_STATUS:
- msg->port = (u_int16_t)port;
- msg->passive = passive;
- msg->ongoing = ongoing;
- msg->download = download;
- msg->list = list;
- msg->rbytes = FDBBYTESR(fdb);
- msg->wbytes = FDBBYTESW(fdb);
- msg->dlfiles = dlfiles;
- msg->ulfiles = ulfiles;
+ } else {
+ /* Normal PORT, just remember necessary
+ * things. The caller should have
+ * performed address and port sanity
+ * checking already.
+ */
+ port = (int)msg->port;
+ passive = FALSE;
msg->result = TRUE;
- break;
- case REQ_NEWPORT:
- port = 0;
- if (fd != -1) {
- fdbflushw(fdb);
- shutdown(fd, SHUT_RDWR);
- close(fd);
- fd = -1;
- fdbparam_set(fdb, -1, FDBP_FD);
- }
- /* As we handle PASV port, do this */
- if (fdpasv != -1) {
- close(fdpasv);
- fdpasv = -1;
- }
- if (msg->passive) {
- /* PASV request, we need to open a high
- * port and start listening for a
- * connection on it
- */
- msg->result = FALSE; /* Default */
- if (pasv_bind(clenv, &server, &fdpasv,
- &port)) {
- msg->port = (u_int16_t)port;
- passive = msg->result = TRUE;
- break;
- }
- } else {
- /* Normal PORT, just remember necessary
- * things. The caller should have
- * performed address and port sanity
- * checking already.
- */
- port = (int)msg->port;
- passive = FALSE;
+ }
+ break;
+ case REQ_TRANSFER:
+ msg->result = FALSE; /* Default */
+ if (port != 0) {
+ fdbparam_set(fdb, msg->rrate * 1024,
+ FDBP_RRATE);
+ fdbparam_set(fdb, msg->wrate * 1024,
+ FDBP_WRATE);
+ if (msg->path) mm_strncpy(path, msg->path,
+ MMPATH_MAX - 1);
+ else *path = '\0';
+ if (msg->list) {
+ list = msg->list;
msg->result = TRUE;
- }
- break;
- case REQ_TRANSFER:
- msg->result = FALSE; /* Default */
- if (port != 0) {
- fdbparam_set(fdb, msg->rrate * 1024,
- FDBP_RRATE);
- fdbparam_set(fdb, msg->wrate * 1024,
- FDBP_WRATE);
- if (msg->path) mm_strncpy(path, msg->path,
- MMPATH_MAX - 1);
- else *path = '\0';
- if (msg->list) {
- list = msg->list;
+ } else {
+ if (msg->file != -1) {
+ file = msg->file;
+ download = msg->download;
msg->result = TRUE;
- } else {
- if (msg->file != -1) {
- file = msg->file;
- download = msg->download;
- msg->result = TRUE;
- }
}
- if (msg->result)
- state = TTSTATE_CONNECT;
- } else
- msg->result = FALSE;
- break;
- default:
+ }
+ if (msg->result)
+ state = TTSTATE_CONNECT;
+ } else
msg->result = FALSE;
- }
- pth_msgport_reply((pth_message_t *)msg);
- }
+ break;
+ default:
+ msg->result = FALSE;
}
+ pthread_msg_reply((pthread_msg_t *)msg);
}
}
/* State entry setup */
ongoing = TRUE;
- /* Setup our connect timeout event */
- ring1 = pth_event(PTH_EVENT_TIME, pth_timeout(CONF.DATA_TIMEOUT,
- 0));
- pth_event_concat(ring, ring1, NULL);
/* State main loop */
while (state == TTSTATE_CONNECT) {
if (passive) {
- /* Accept connection from client */
+ /*
+ * Accept connection from client.
+ * On timeout, ETIMEDOUT is returned, on interrupting
+ * message, ECANCELED is. Other errror codes could be
+ * returned, or the new file descriptor.
+ */
addrl = sizeof(struct sockaddr);
- fd = pth_accept_ev(fdpasv, (struct sockaddr *)&addr,
- &addrl, ring);
+ if ((err = pthread_accept_ring(fdpasv,
+ (struct sockaddr *)&addr,
+ &addrl, CONF.DATA_TIMEOUT * 1000,
+ &ring)) > -1) {
+ fd = err;
+ err = 0;
+ } else {
+ fd = -1;
+ err = errno;
+ }
} else {
/* Start connection to the client */
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) != -1) {
client.sin_family = AF_INET;
client.sin_addr.s_addr = clenv->cipaddr;
client.sin_port = htons(port);
- /* Finally attempt connection */
- if ((pth_connect_ev(fd, (struct sockaddr *)&client,
+ /*
+ * Finally attempt connection.
+ * On timeout, ETIMEDOUT is returned, on interrupting
+ * message, ECANCELED is. Other error codes could be
+ * returned, or 0 on success. In the case of any of
+ * the two special error codes we just described, the
+ * connection request is still pending in the kernel.
+ * We can continue calling the following function
+ * until we get connected, which can return EISCONN if
+ * it already was connected, or we could monitor the
+ * status of the connection using getsockopt(2) with
+ * SOL_SOCKET level and SO_ERROR option. If we want
+ * to abort the in-progress connection, we need to use
+ * close(2).
+ */
+ /*
+ * XXX Hmm we should loop here! Not create new sockets
+ * again in the loop! Unless we wanted to make
+ * pthread_connect_ring() automatically cancel any
+ * connection closing the descriptor upon ETIMEDOUT or
+ * ECANCELED return codes, in which case we could
+ * continue the current behavior.
+ */
+ if ((pthread_connect_ring(fd,
+ (struct sockaddr *)&client,
sizeof(struct sockaddr_in),
- ring)) == -1) {
+ CONF.DATA_TIMEOUT * 1000,
+ &ring)) != 0 && errno != ETIMEDOUT &&
+ errno != ECANCELED) {
mmsyslog(1, LOGLEVEL,
"%08X PORT data connection failiure",
clenv->id);
REGISTER_ERROR();
state = TTSTATE_INITIAL;
break;
- }
+ } else if (errno == ETIMEDOUT || errno == ECANCELED) {
+ /* XXX For now */
+ (void) close(fd);
+ fd = -1;
+ err = ETIMEDOUT;
+ } else
+ err = 0;
} else {
DEBUG_PRINTF("transferthread_main-2", "socket()");
reply(clenv->fdb, 425, FALSE,
}
}
/* Verify if connection timeout occured */
- if ((pth_event_occurred(ring1))) {
+ if (err == ETIMEDOUT) {
mmsyslog(1, LOGLEVEL, "%08X Data connection timeout",
clenv->id);
reply(clenv->fdb, 425, FALSE,
state = TTSTATE_INITIAL;
break;
}
- /* Check for any request events and process them if any */
- if ((pth_event_occurred(ring))) {
- while ((msg = (transfermsg *)pth_msgport_get(
- clenv->sport)) != NULL) {
- /* Process request and reply */
- switch (msg->request) {
- case REQ_QUIT:
- msg->result = TRUE;
- state = TTSTATE_DONE;
- break;
- case REQ_ABORT:
- msg->result = TRUE;
- state = TTSTATE_INITIAL;
- break;
- case REQ_STATUS:
- msg->port = (u_int16_t)port;
- msg->passive = passive;
- msg->ongoing = ongoing;
- msg->download = download;
- msg->list = list;
- msg->rbytes = FDBBYTESR(fdb);
- msg->wbytes = FDBBYTESW(fdb);
- msg->dlfiles = dlfiles;
- msg->ulfiles = ulfiles;
- msg->result = TRUE;
- break;
- default:
- msg->result = FALSE;
- }
- pth_msgport_reply((pth_message_t *)msg);
+ /*
+ * Check for any request events and process them if any.
+ * ECANCELED above can also cause this code execution to be
+ * triggered fast.
+ */
+ while ((msg = (transfermsg *)pthread_msg_get(&clenv->sport))
+ != NULL) {
+ /* Process request and reply */
+ switch (msg->request) {
+ case REQ_QUIT:
+ msg->result = TRUE;
+ state = TTSTATE_DONE;
+ break;
+ case REQ_ABORT:
+ msg->result = TRUE;
+ state = TTSTATE_INITIAL;
+ break;
+ case REQ_STATUS:
+ msg->port = (u_int16_t)port;
+ msg->passive = passive;
+ msg->ongoing = ongoing;
+ msg->download = download;
+ msg->list = list;
+ msg->rbytes = FDBBYTESR(fdb);
+ msg->wbytes = FDBBYTESW(fdb);
+ msg->dlfiles = dlfiles;
+ msg->ulfiles = ulfiles;
+ msg->result = TRUE;
+ break;
+ default:
+ msg->result = FALSE;
}
+ pthread_msg_reply((pthread_msg_t *)msg);
}
if (state == TTSTATE_CONNECT) {
/* Verify if connection was established */
}
/* State exit cleanup */
- pth_event_isolate(ring);
- pth_event_free(ring1, PTH_FREE_ALL);
} else if (state == TTSTATE_TRANSFER) { /* TRANSFER STATE */
TYPE_IMAGE ? fdbrcleartosend(fdb) :
fdbcleartosend(fdb))) {
/* Client ready to be written some bytes to */
- if ((l = pth_read(file, buffer, T_BUFSIZE)) > 0) {
+ if ((l = read(file, buffer, T_BUFSIZE)) > 0) {
if (clenv->type == TYPE_ASCII) {
/* ASCII xfer, perform \n to \r\n
* conversion
break;
}
/* Verify if we received any important events */
- if ((pth_event_occurred(ring))) {
- while ((msg = (transfermsg *)pth_msgport_get(
- clenv->sport)) != NULL) {
- /* Process request and reply */
- switch (msg->request) {
- case REQ_QUIT:
- msg->result = TRUE;
- state = TTSTATE_DONE;
- cont = FALSE;
- break;
- case REQ_ABORT:
- msg->result = TRUE;
- state = TTSTATE_INITIAL;
- cont = FALSE;
- reason = TR_ABORTED;
- break;
- case REQ_STATUS:
- msg->port = (u_int16_t)port;
- msg->passive = passive;
- msg->ongoing = ongoing;
- msg->download = download;
- msg->list = list;
- msg->rbytes = FDBBYTESR(fdb);
- msg->wbytes = FDBBYTESW(fdb);
- msg->dlfiles = dlfiles;
- msg->ulfiles = ulfiles;
- msg->result = TRUE;
- break;
- default:
- msg->result = FALSE;
- }
- pth_msgport_reply((pth_message_t *)msg);
+ while ((msg = (transfermsg *)pthread_msg_get(
+ &clenv->sport)) != NULL) {
+ /* Process request and reply */
+ switch (msg->request) {
+ case REQ_QUIT:
+ msg->result = TRUE;
+ state = TTSTATE_DONE;
+ cont = FALSE;
+ break;
+ case REQ_ABORT:
+ msg->result = TRUE;
+ state = TTSTATE_INITIAL;
+ cont = FALSE;
+ reason = TR_ABORTED;
+ break;
+ case REQ_STATUS:
+ msg->port = (u_int16_t)port;
+ msg->passive = passive;
+ msg->ongoing = ongoing;
+ msg->download = download;
+ msg->list = list;
+ msg->rbytes = FDBBYTESR(fdb);
+ msg->wbytes = FDBBYTESW(fdb);
+ msg->dlfiles = dlfiles;
+ msg->ulfiles = ulfiles;
+ msg->result = TRUE;
+ break;
+ default:
+ msg->result = FALSE;
}
+ pthread_msg_reply((pthread_msg_t *)msg);
}
}
if (*from == '\r') {
if (treesize_edit(clenv,
from - from2)) {
- if ((l = pth_write(file,
- from2, from - from2))
- != from - from2) {
+ if ((l = write(file, from2,
+ from - from2)) !=
+ from - from2) {
mmsyslog(0, LOGLEVEL,
"%08X Error "
"writing to "
}
if (l && from2 < to) {
if (treesize_edit(clenv, to - from2)) {
- if ((l = pth_write(file, from2,
- to - from2))
- != to - from2) {
+ if ((l = write(file, from2,
+ to - from2)) != to - from2) {
mmsyslog(0, LOGLEVEL,
"%08X Error writing "
"to disk!",
/* IMAGE/binary transfer, write data as-is
*/
if (treesize_edit(clenv, l)) {
- if ((pth_write(file, buffer, l)) !=
- l) {
+ if ((write(file, buffer, l)) != l) {
mmsyslog(0, LOGLEVEL,
"%08X Error writing to "
"disk!", clenv->id);
break;
}
/* Verify if we received any important events */
- if ((pth_event_occurred(ring))) {
- while ((msg = (transfermsg *)pth_msgport_get(
- clenv->sport)) != NULL) {
- /* Process request and reply */
- switch (msg->request) {
- case REQ_QUIT:
- msg->result = TRUE;
- state = TTSTATE_DONE;
- cont = FALSE;
- break;
- case REQ_ABORT:
- msg->result = TRUE;
- state = TTSTATE_INITIAL;
- cont = FALSE;
- reason = TR_ABORTED;
- break;
- case REQ_STATUS:
- msg->port = (u_int16_t)port;
- msg->passive = passive;
- msg->ongoing = ongoing;
- msg->download = download;
- msg->list = list;
- msg->rbytes = FDBBYTESR(fdb);
- msg->wbytes = FDBBYTESW(fdb);
- msg->dlfiles = dlfiles;
- msg->ulfiles = ulfiles;
- msg->result = TRUE;
- break;
- default:
- msg->result = FALSE;
- }
- pth_msgport_reply((pth_message_t *)msg);
+ while ((msg = (transfermsg *)pthread_msg_get(
+ &clenv->sport)) != NULL) {
+ /* Process request and reply */
+ switch (msg->request) {
+ case REQ_QUIT:
+ msg->result = TRUE;
+ state = TTSTATE_DONE;
+ cont = FALSE;
+ break;
+ case REQ_ABORT:
+ msg->result = TRUE;
+ state = TTSTATE_INITIAL;
+ cont = FALSE;
+ reason = TR_ABORTED;
+ break;
+ case REQ_STATUS:
+ msg->port = (u_int16_t)port;
+ msg->passive = passive;
+ msg->ongoing = ongoing;
+ msg->download = download;
+ msg->list = list;
+ msg->rbytes = FDBBYTESR(fdb);
+ msg->wbytes = FDBBYTESW(fdb);
+ msg->dlfiles = dlfiles;
+ msg->ulfiles = ulfiles;
+ msg->result = TRUE;
+ break;
+ default:
+ msg->result = FALSE;
}
+ pthread_msg_reply((pthread_msg_t *)msg);
}
}
}
}
if (fdpasv != -1) close(fdpasv);
- pth_event_free(ring, PTH_FREE_ALL);
+ pthread_ring_destroy(&ring);
}
/* mmfd library thread support functions */
+static pthread_mutexattr_t thread_ma;
+
+static void
+thread_init(void)
+{
+ (void) pthread_mutexattr_init(&thread_ma);
+ (void) pthread_mutexattr_settype(&thread_ma, PTHREAD_MUTEX_RECURSIVE);
+}
static void *
-_pth_mutex_create(void)
+thread_mutex_create(void)
{
struct mutexnode *mnod;
- pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+ pthread_mutex_lock(&mutexes_lock);
mnod = (struct mutexnode *)pool_alloc(&mutexes_pool, FALSE);
- pth_mutex_release(&mutexes_lock);
+ pthread_mutex_unlock(&mutexes_lock);
- if (mnod)
- pth_mutex_init(&mnod->mutex);
+ if (mnod != NULL)
+ pthread_mutex_init(&mnod->mutex, &thread_ma);
return ((void *)mnod);
}
static void *
-_pth_mutex_destroy(void *mtx)
+thread_mutex_destroy(void *mtx)
{
- /* struct mutexnode *mnod = mtx; */
+ struct mutexnode *mnod = mtx;
- /* pth_mutex_destroy(&mnod->mutex); */
- pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+ pthread_mutex_destroy(&mnod->mutex);
+
+ pthread_mutex_lock(&mutexes_lock);
pool_free(mtx);
- pth_mutex_release(&mutexes_lock);
+ pthread_mutex_unlock(&mutexes_lock);
return (NULL);
}
static void
-_pth_mutex_lock(void *mtx)
+thread_mutex_lock(void *mtx)
{
struct mutexnode *mnod = mtx;
- pth_mutex_acquire(&mnod->mutex, FALSE, NULL);
+ pthread_mutex_lock(&mnod->mutex);
}
static void
-_pth_mutex_unlock(void *mtx)
+thread_mutex_unlock(void *mtx)
{
struct mutexnode *mnod = mtx;
- pth_mutex_release(&mnod->mutex);
-}
-
-
-static void
-_pth_thread_yield(void)
-{
- pth_yield(NULL);
-}
-
-
-static bool
-_pth_eintr(void)
-{
- if (errno == EINTR)
- return TRUE;
-
- return FALSE;
+ pthread_mutex_unlock(&mnod->mutex);
}
static bool
-eintr(void)
+thread_eintr(void)
{
if (errno == EINTR)
return TRUE;
NULL,
NULL,
NULL,
- eintr
+ thread_eintr /* not thread-specific */
};
getuserline_fdb = fdbopen(&fdf, NULL, -1, FDB_BUFSIZE, 0, 0, 0,
found = FALSE;
size = 0;
len = mm_strlen(path) + 1;
- pth_mutex_acquire("a_lock, FALSE, NULL);
+ pthread_mutex_lock("a_lock);
if ((qnod = (quotanode *)hashtable_lookup("a_table, path, len))
!= NULL) {
struct stat st;
}
}
}
- pth_mutex_release("a_lock);
+ pthread_mutex_unlock("a_lock);
if (found)
return (size);
-/* $Id: mmftpd.h,v 1.25 2004/09/19 10:59:08 mmondor Exp $ */
+/* $Id: mmftpd.h,v 1.26 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2001-2004, Matthew Mondor
/* HEADERS */
#include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
#include <mmfd.h>
#include <mmpath.h>
#include <mmfifo.h>
#include <mmstat.h>
#include <mmserver.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_msg.h>
+#include <mm_pthread_pool.h>
+#include <mm_pthread_sleep.h>
/* DEFINITIONS */
#define DAEMON_NAME "mmftpd"
-#define DAEMON_VERSION "0.0.18/mmondor"
+#define DAEMON_VERSION "0.1.0/mmondor"
/* Transfer buffer size */
#define T_BUFSIZE 16384
#define REGISTER_ERROR() do { \
clenv->errors++; \
if (CONF.DELAY_ON_ERROR) \
- pth_sleep(clenv->errors); \
+ pthread_sleep(clenv->errors); \
} while (FALSE)
#define REGISTER_PERMISSION() do { \
/* We communicate with our transfer thread using this message structure */
typedef struct transfermsg {
- pth_message_t msg; /* Internal message node */
+ pthread_msg_t msg; /* Internal message node */
int request; /* Request to send to device */
bool result; /* Result status after device request */
u_int16_t port; /* PORT or PASV port for next transfer */
struct lusernode *unode; /* User's lusernode, for quick lookup */
char cwd[MMPATH_MAX]; /* User's current directory (chrooted) */
struct transfermsg tmsg; /* Only need one shared msg with tthread */
- pth_msgport_t rport, sport; /* Message ports (tthread and ours) */
- pth_event_t rring; /* Ring of events we wait for */
- pth_t tthread; /* Thread ID of our transfer thread */
+ pthread_port_t rport, sport; /* Message ports (tthread and ours) */
+ pthread_ring_t rring; /* Ring of events we wait for */
+ pthread_t tthread; /* Thread ID of our transfer thread */
long errors; /* Total number of errors that occured */
long attempts; /* Auth state temporary counter */
long maxlogins; /* Allowed simultanious logins for user */
typedef struct lusernode {
hashnode_t node;
char user[32]; /* User name (key) */
- pth_mutex_t lock; /* For treesize_edit() */
+ pthread_mutex_t lock; /* For treesize_edit() */
long logins; /* Simultanious current logins for user */
off_t homesize; /* Current home directory size for user */
} lusernode;
/* Used for mmfd thread support delegation/abstraction */
struct mutexnode {
pnode_t node;
- pth_mutex_t mutex;
+ pthread_mutex_t mutex;
};
/* Used to efficiently allocate FIFO buffers for recently visited directories
struct async_clenv *);
static bool pasv_bind(clientenv *, struct sockaddr_in *, int *, int *);
-static void transferthread(struct thread_object *, void *);
+static void transferthread(pthread_object_t *, void *);
static void transferthread_main(clientenv *, fdbuf *);
-static void *_pth_mutex_create(void);
-static void *_pth_mutex_destroy(void *);
-static void _pth_mutex_lock(void *);
-static void _pth_mutex_unlock(void *);
-static void _pth_thread_yield(void);
-static bool _pth_eintr(void);
-static bool eintr(void);
+static void thread_init(void);
+static void *thread_mutex_create(void);
+static void *thread_mutex_destroy(void *);
+static void thread_mutex_lock(void *);
+static void thread_mutex_unlock(void *);
+static bool thread_eintr(void);
static void async_checkpw(struct async_msg *);
static void async_treesize(struct async_msg *);
-/* $Id: mmreadcfg.c,v 1.20 2006/06/13 17:17:02 mmondor Exp $ */
+/* $Id: mmreadcfg.c,v 1.21 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 1991-2004, Matthew Mondor
MMCOPYRIGHT("@(#) Copyright (c) 1991-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmreadcfg.c,v 1.20 2006/06/13 17:17:02 mmondor Exp $");
+MMRCSID("$Id: mmreadcfg.c,v 1.21 2007/03/13 20:28:22 mmondor Exp $");
-/* $Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $ */
+/* $Id: mmserver.c,v 1.35 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2000-2004, Matthew Mondor
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
+#include <string.h>
+#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <syslog.h>
-#include <pth.h>
+#include <pthread.h>
#include <signal.h>
#include <sys/poll.h>
#include <time.h>
#include <mmreadcfg.h>
#include <mmlimitrate.h>
#include <mmlog.h>
-#include <mm_pth_pool.h>
+
+#include <mm_pthread_pool.h>
+#include <mm_pthread_msg.h>
+#include <mm_pthread_poll.h>
+#include <mm_pthread_sleep.h>
MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmserver.c,v 1.34 2005/02/20 01:18:01 mmondor Exp $");
+MMRCSID("$Id: mmserver.c,v 1.35 2007/03/13 20:28:22 mmondor Exp $");
static void sighandler(int);
static struct iface * parse_ifaces(char *, char *);
-static void phandleclient(struct thread_object *, void *);
+static void phandleclient(pthread_object_t *, void *);
static void writepidfile(const char *);
static void * async_thread(void *);
static struct async_clenv * async_open_clenv(void);
static int (*handleclientfunc)(unsigned long, int,
clientlistnode *, struct iface *,
struct async_clenv *);
-static pth_mutex_t ctable_lock, ppool_lock;
+static pthread_mutex_t ctable_lock = PTHREAD_MUTEX_INITIALIZER,
+ ppool_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t cpool, ppool;
static hashtable_t ctable;
static struct async_env *async = NULL;
/* MAIN DAEMON PROGRAM */
-/* Before calling this function, the async_init() and async_init_pth()
+/* Before calling this function, the async_init() and async_init_pthread()
* functions should have been called. Before any async_*() or tcp_server()
* calls the process is expected to have called make_daemon().
*/
struct ifacendx *fdsi;
struct iface *ifaces, *tif;
unsigned long id;
- pth_t clnode_thread;
- pth_attr_t threadattr, clnode_threadattr;
- int tcp_proto;
+ pthread_t clnode_thread;
+ pthread_attr_t threadattr, clnode_threadattr;
+ int tcp_proto, clnode_thread_started, err;
sinaddr = (struct sockaddr_in *)&addr;
handleclientfunc = handleclient1;
fds = NULL;
fdsi = NULL;
ifaces = NULL;
+ clnode_thread_started = -1;
- /* Pth related */
- threadattr = pth_attr_new();
- pth_attr_set(threadattr, PTH_ATTR_JOINABLE, FALSE);
- clnode_threadattr = pth_attr_new();
- pth_attr_set(clnode_threadattr, PTH_ATTR_JOINABLE, TRUE);
- pth_mutex_init(&ctable_lock);
- pth_mutex_init(&ppool_lock);
+ /* Pthread related */
+ pthread_attr_init(&threadattr);
+ pthread_attr_setdetachstate(&threadattr, 1);
+ pthread_attr_init(&clnode_threadattr);
+ pthread_attr_setdetachstate(&clnode_threadattr, 0);
/* Used by resolve_hostname() */
if (!mmstrinit(malloc, free, 16384)) {
sizeof(phandleinfo), 16384 / sizeof(phandleinfo), 0, 0) ||
!hashtable_init(&ctable, "ctable", maxips, 1, malloc, free,
clnode_keycmp, clnode_keyhash, FALSE) ||
- ((clnode_thread = pth_spawn(clnode_threadattr,
- clnode_expire_thread, &rateper)) == NULL)) {
+ ((clnode_thread_started = pthread_create(&clnode_thread,
+ &clnode_threadattr, clnode_expire_thread, &rateper))) != 0) {
syslog(LOG_NOTICE, "tcp_server() - Out of memory");
closelog();
mmstrexit();
exit(-1);
}
- if (thread_object_init() == -1) {
- syslog(LOG_NOTICE, "tcp_server() - thread_object_init()");
+ if ((err = pthread_object_init(8)) != 0) {
+ syslog(LOG_NOTICE, "tcp_server() - pthread_object_init() - %s",
+ strerror(err));
exit(-1);
}
if (ret) {
free(ifaces);
mmstrexit();
- if (clnode_thread != NULL) {
- pth_abort(clnode_thread);
- pth_join(clnode_thread, NULL);
+ if (clnode_thread_started == 0) {
+ pthread_cancel(clnode_thread);
+ pthread_join(clnode_thread, NULL);
}
if (HASHTABLE_VALID(&ctable))
hashtable_destroy(&ctable, FALSE);
/* Start answering and dispatching connections */
syslog(LOG_NOTICE, "Started for uid: %d", uid);
for (;;) {
- if ((nifaces2 = pth_poll(fds, nifaces, -1)) > 0) {
+ if ((nifaces2 = poll(fds, nifaces, -1)) > 0) {
/* Loop but once only, and only for long as nifaces2 times.
* Use our fast index to reference to the interface structure.
*/
if (fds[i].revents & POLLIN) {
nifaces2--;
addrl = sizeof(struct sockaddr);
- if ((msgsock = pth_accept(fds[i].fd, &addr, &addrl))
- != -1) {
+ if ((msgsock = accept(fds[i].fd, &addr, &addrl)) != -1) {
/* Make sure that we respect connection and rate
* limits.
*/
ok = FALSE;
reason = MMS_NORMAL;
- pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+ pthread_mutex_lock(&ctable_lock);
if ((clnode = (clientlistnode *)hashtable_lookup(
&ctable, &sinaddr->sin_addr.s_addr,
sizeof(u_int32_t))) == NULL) {
} else
reason = MMS_CONPERADDRESS_EXCEEDED;
}
- pth_mutex_release(&ctable_lock);
+ pthread_mutex_unlock(&ctable_lock);
if (ok) {
phandleinfo *phi;
/* Dispatch client to a thread */
- pth_mutex_acquire(&ppool_lock, FALSE, NULL);
+ pthread_mutex_lock(&ppool_lock);
phi = (phandleinfo *)pool_alloc(&ppool, FALSE);
- pth_mutex_release(&ppool_lock);
+ pthread_mutex_unlock(&ppool_lock);
if (phi != NULL) {
id++;
phi->id = id;
phi->clnode = clnode;
/* Indexed reference */
phi->iface = fdsi[i].iface;
- if (thread_object_call(NULL, phandleclient,
- phi) != -1)
+ if (pthread_object_call(NULL, phandleclient,
+ phi) == 0)
clnode->connections++;
else {
if (phi != NULL) {
- pth_mutex_acquire(&ppool_lock, FALSE,
- NULL);
+ pthread_mutex_lock(&ppool_lock);
pool_free((pnode_t *)phi);
- pth_mutex_release(&ppool_lock);
+ pthread_mutex_unlock(&ppool_lock);
}
syslog(LOG_NOTICE, "tcp_server() - "
"Error launching thread");
/* Close down connection with client
* immediately, sending message.
*/
- (void) pth_write(msgsock, message, msglen);
+ (void) write(msgsock, message, msglen);
shutdown(msgsock, 2);
close(msgsock);
mm_strcpy(ipaddr, "0.0.0.0");
syslog(LOG_NOTICE, "tcp_server() - [%s] - %s",
ipaddr, MMS_RSTRING(reason));
}
-
- pth_mutex_release(&ctable_lock);
-
} else
- syslog(LOG_NOTICE, "tcp_server() - pth_accept()");
+ syslog(LOG_NOTICE, "tcp_server() - accept()");
}
}
}
free(ifaces);
mmstrexit();
- pth_abort(clnode_thread);
- pth_join(clnode_thread, NULL);
+ pthread_cancel(clnode_thread);
+ pthread_join(clnode_thread, NULL);
hashtable_destroy(&ctable, FALSE);
pool_destroy(&ppool);
pool_destroy(&cpool);
- /* Unrequired for Pth (non-existing even)
- * pth_mutex_destroy(&apool_lock);
- * pth_mutex_destroy(&ctable_lock);
- * pth_mutex_destroy(&ppool_lock);
- */
exit(ret);
}
signal(SIGTTIN, SIG_IGN);
signal(SIGTSTP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
+ signal(SIGUSR2, SIG_IGN);
umask(0);
/* SERVER (THREAD) */
-/* This is started by pth_spawn(), it calls the user handler function, which
- * only has to care about serving the client.
+/* This is started by pthread_create(), it calls the user handler function,
+ * which only has to care about serving the client.
*/
static void
-phandleclient(struct thread_object *obj, void *args)
+phandleclient(pthread_object_t *obj, void *args)
{
phandleinfo *phi;
int socket, ret;
socket = phi->socket;
clnode = phi->clnode;
iface = phi->iface;
- pth_mutex_acquire(&ppool_lock, FALSE, NULL);
+ pthread_mutex_lock(&ppool_lock);
pool_free((pnode_t *)phi);
- pth_mutex_release(&ppool_lock);
+ pthread_mutex_unlock(&ppool_lock);
ret = 0;
}
if (tmp) {
/* Lock the mutex for a very short moment */
- pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+ pthread_mutex_lock(&ctable_lock);
/* Verify again for NULL to avoid a potential memory leak */
if(clnode->hostname == NULL)
clnode->hostname = tmp;
else
tmp = mmstrfree(tmp);
- pth_mutex_release(&ctable_lock);
+ pthread_mutex_unlock(&ctable_lock);
}
}
/* Decrease our connection/refcount counter, and let our ctable thread
* decide if/when to uncache our node.
*/
- pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+ pthread_mutex_lock(&ctable_lock);
if (clnode->connections > 0)
clnode->connections--;
- pth_mutex_release(&ctable_lock);
+ pthread_mutex_unlock(&ctable_lock);
/* kthxbye :) */
}
/* Fatal error */
syslog(LOG_NOTICE, "async_thread() - calloc(%d)",
idx_size + pfd_size);
- pth_exit(NULL);
+ pthread_exit(NULL);
}
i = 0;
DLIST_FOREACH(freeprocs, proc) {
/* First check for new messages and queue them in our queue list if
* any, but only wait for them if all processes are free (we don't
- * expect any results). Special note: Pth will only notify event for
- * a port if it received a message while it was empty.
+ * expect any results).
*/
- if ((pth_msgport_pending(async->port))) {
- while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
- DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
- } else if (freeprocs->nodes == async->nprocs) {
- pth_wait(async->ring);
- while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
- DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
+ while ((omsg = (char *)pthread_msg_get(&async->port)) != NULL)
+ DLIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
+ if (DLIST_NODES(freeprocs) == async->nprocs &&
+ DLIST_NODES(&queue) == 0) {
+ pthread_ring_wait(&async->ring, NULL);
+ continue;
}
/* Verify for any available processes to dispatch requests to, and
if (DEBUG_TRUE(m->func_id < async->nfuncs)) {
len = async->funcs[m->func_id].msg_len;
idx[p->sock].aclenv = m->aclenv;
- if ((pth_send(p->sock, m, len, 0)) == len)
+ if ((send(p->sock, m, len, 0)) == len)
DLIST_UNLINK(freeprocs, (node_t *)p);
else
syslog(LOG_NOTICE, "async_thread() - send(%d:%d)",
}
/* Wait for results from our async processes via their socket,
- * but be sure to break when new Pth messages are available.
+ * but be sure to break when new inter-thread messages are available.
* If new results are obtained, reply back to the caller thread.
+ * If all processes are free, go back to pthread_ring_wait()
+ * instead of pthread_poll_ring().
*/
- for (;;) {
- int selr, i;
-
- if ((selr = pth_poll_ev(pfds, nfds, -1, async->ring)) > 0) {
- for (i = 0; selr > 0 && i < nfds; i++) {
- if (pfds[i].revents & POLLERR) {
- /* If this happens something is really wrong, exit */
- syslog(LOG_NOTICE, "async_thread() - POLLERR(%d)",
- pfds[i].fd);
- kill(0, SIGTERM);
- for (;;)
- sleep(30);
- }
- if (pfds[i].revents & POLLIN) {
- int fd = pfds[i].fd;
- size_t len;
- struct async_proc *p = idx[fd].aproc;
- struct async_clenv *e = idx[fd].aclenv;
-
- /* Results, reply message back to client thread and
- * return this process node in the free pool.
- */
- if ((len = pth_recv(fd, e->msg, async->msg_len,
- MSG_WAITALL)) <
- sizeof(struct async_msg))
- syslog(LOG_NOTICE, "async_thread() - recv(%d:%d)",
- fd, (int)async->msg_len);
- pth_msgport_reply(&(e->msg->msg));
- DLIST_APPEND(freeprocs, (node_t *)p);
- selr--;
- }
- }
- }
- if ((pth_event_occurred(async->ring)))
- /* Other requests we must queue immediately */
- break;
+ while (DLIST_NODES(freeprocs) < async->nprocs) {
+ int selr, i;
+
+ selr = pthread_poll_ring(pfds, nfds, -1, &async->ring);
+ if (selr == -1 && errno == ECANCELED)
+ break;
+ for (i = 0; selr > 0 && i < nfds; i++) {
+ if (pfds[i].revents & POLLERR) {
+ /* If this happens something is really wrong, exit */
+ syslog(LOG_NOTICE, "async_thread() - POLLERR(%d)",
+ pfds[i].fd);
+ kill(0, SIGTERM);
+ for (;;)
+ pthread_sleep(30);
+ }
+ if (pfds[i].revents & POLLIN) {
+ int fd = pfds[i].fd;
+ size_t len;
+ struct async_proc *p = idx[fd].aproc;
+ struct async_clenv *e = idx[fd].aclenv;
+
+ /* Results, reply message back to client thread and
+ * return this process node in the free pool.
+ */
+ if ((len = recv(fd, e->msg, async->msg_len,
+ MSG_WAITALL)) < sizeof(struct async_msg))
+ syslog(LOG_NOTICE, "async_thread() - recv(%d:%d)",
+ fd, (int)async->msg_len);
+ pthread_msg_reply(&(e->msg->msg));
+ DLIST_APPEND(freeprocs, (node_t *)p);
+ selr--;
+ }
+ }
}
}
/* NOTREACHED */
- pth_exit(NULL);
+ pthread_exit(NULL);
return (NULL);
}
/* This function should be called at early application startup to setup
- * the asynchroneous pool of processes. async_init_pth() call may then
- * be called later on after pth_init() to initialize the client-side
+ * the asynchroneous pool of processes. async_init_pthread() call may then
+ * be called later on after pthread_init() to initialize the client-side
* asynchroneous context (required before using async_open_clenv()).
* This function is assumed to be called once only, and it's effects to
* only be discarded when the main process exits.
}
-/* This function is used by an application after pth_init() to initialize
+/* This function is used by an application after pthread_init() to initialize
* the asynchroneous thread device/server, which will dispatch requests to
* the asynchroneous pool of processes in a thread-safe manner (without
* blocking the whole process. async_init() should have been called first.
*/
bool
-async_init_pth(void)
+async_init_pthread(void)
{
size_t env_len;
bool res = FALSE;
if (DEBUG_TRUE(async)) {
+ pthread_poll_init();
+
res = TRUE;
- pth_mutex_init(&async->apool_lock);
+ pthread_mutex_init(&async->apool_lock, NULL);
/* Setup our fast-allocation pool for client-side async environments */
env_len = (size_t)OALIGN_CEIL(sizeof(struct async_clenv), long);
if (!pool_init(&async->apool, "asyncenv_pool", malloc, free,
aclenv_constructor, aclenv_destructor,
env_len + async->msg_len,
16384 / (env_len + async->msg_len), 0, 0)) {
- syslog(LOG_NOTICE, "async_init_pth() - pool_init(apool)");
+ syslog(LOG_NOTICE, "async_init_pthread() - pool_init(apool)");
res = FALSE;
}
if (res) {
- pth_attr_t attrs;
+ pthread_attr_t attrs;
+ pthread_t thread;
res = FALSE;
- if ((attrs = pth_attr_new()) != NULL) {
- /* Start async slave thread device */
- if ((async->port = pth_msgport_create("mms_async_server"))) {
- if ((async->ring = pth_event(PTH_EVENT_MSG,
- async->port))) {
- pth_attr_set(attrs, PTH_ATTR_JOINABLE, FALSE);
- if ((pth_spawn(attrs, async_thread, NULL)) != NULL)
- res = TRUE;
- pth_attr_destroy(attrs);
- } else
- syslog(LOG_NOTICE, "async_init_pth() - pth_event()");
+ /* Start async slave thread device */
+ if (pthread_port_init(&async->port) == 0) {
+ if (pthread_ring_init(&async->ring) == 0) {
+ pthread_port_set_ring(&async->port, &async->ring);
+ pthread_attr_init(&attrs);
+ pthread_attr_setdetachstate(&attrs, 1);
+ if ((pthread_create(&thread, &attrs, async_thread, NULL))
+ == 0)
+ res = TRUE;
} else
- syslog(LOG_NOTICE,
- "async_init_pth() - pth_msgport_create()");
- pth_attr_destroy(attrs);
- }
+ syslog(LOG_NOTICE, "async_init_pth() - pth_event()");
+ } else
+ syslog(LOG_NOTICE,
+ "async_init_pth() - pth_msgport_create()");
+ pthread_attr_destroy(&attrs);
}
if (!res) {
- if (async->ring) {
- pth_event_free(async->ring, PTH_FREE_ALL);
- async->ring = NULL;
- }
- if (async->port) {
- pth_msgport_destroy(async->port);
- async->port = NULL;
- }
+ if (pthread_port_valid(&async->port))
+ pthread_port_destroy(&async->port);
+ if (pthread_ring_valid(&async->ring))
+ pthread_ring_destroy(&async->ring);
pool_destroy(&async->apool);
}
} else
- DEBUG_PRINTF("async_init_pth", "Call async_init() first");
+ DEBUG_PRINTF("async_init_pthread", "Call async_init() first");
return (res);
}
/* Used by a client thread to initialize a context suitable to call the
* async_call() function with. Each thread needs one to use the device.
- * async_init() and async_init_pth() should have been called first.
+ * async_init() and async_init_pthread() should have been called first.
*/
static struct async_clenv *
async_open_clenv(void)
if (DEBUG_TRUE(async != NULL && POOL_VALID(&async->apool))) {
/* Optimized for speed using a pool of pre-allocated nodes */
- pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
+ pthread_mutex_lock(&async->apool_lock);
aclenv = (struct async_clenv *)pool_alloc(&async->apool, FALSE);
- pth_mutex_release(&async->apool_lock);
+ pthread_mutex_unlock(&async->apool_lock);
} else
DEBUG_PRINTF("async_open_clenv",
"Call async_init() and async_init_pth() first");
static struct async_clenv *
async_close_clenv(struct async_clenv *aclenv)
{
- pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
+ pthread_mutex_lock(&async->apool_lock);
pool_free((pnode_t *)aclenv);
- pth_mutex_release(&async->apool_lock);
+ pthread_mutex_unlock(&async->apool_lock);
return (NULL);
}
{
struct async_clenv *aclenv = (struct async_clenv *)pnode;
- if ((aclenv->port = pth_msgport_create("XXX(NULL)")) != NULL) {
- if ((aclenv->ring = pth_event(PTH_EVENT_MSG, aclenv->port))) {
- register char *ptr = (char *)aclenv;
+ if (pthread_port_init(&aclenv->port) == 0) {
+ if (pthread_ring_init(&aclenv->ring) == 0) {
+ register char *ptr = (char *)aclenv;
ptr += (size_t)OALIGN_CEIL(sizeof(struct async_clenv), long);
aclenv->msg = (struct async_msg *)ptr;
- aclenv->msg->msg.m_replyport = aclenv->port;
- aclenv->msg->msg.m_size = async->msg_len;
+ pthread_msg_init(&aclenv->msg->msg, &aclenv->port);
+ pthread_port_set_ring(&aclenv->port, &aclenv->ring);
return TRUE;
}
- pth_msgport_destroy(aclenv->port);
+ pthread_port_destroy(&aclenv->port);
}
return FALSE;
{
struct async_clenv *aclenv = (struct async_clenv *)pnode;
- pth_event_free(aclenv->ring, PTH_FREE_ALL);
- pth_msgport_destroy(aclenv->port);
+ pthread_msg_destroy(&aclenv->msg->msg);
+ pthread_port_destroy(&aclenv->port);
+ pthread_ring_destroy(&aclenv->ring);
}
/* Send request */
aclenv->msg->func_id = function;
aclenv->msg->aclenv = aclenv;
- pth_msgport_put(async->port, &(aclenv->msg->msg));
+ pthread_msg_put(&async->port, &(aclenv->msg->msg));
/* Sleep until we get reply back, in a thread-safe manner.
* Note: This will permanently block the current thread if the
* we get a reply back seems a good choice here; The async functions
* should ensure to eventually return.
*/
- for (;;) {
- if ((pth_wait(aclenv->ring))) {
- if ((pth_event_occurred(aclenv->ring))) {
- /* We know that message pointer will be the same */
- if ((pth_msgport_get(aclenv->port)))
- break;
- }
- }
- }
+ while (pthread_msg_get(&aclenv->port) == NULL)
+ pthread_ring_wait(&aclenv->ring, NULL);
}
data.cnt = 0;
for (;;) {
/* Sleep until it is known that at least one node expired */
- pth_sleep((unsigned int)data.soonest);
+ pthread_sleep((unsigned int)data.soonest);
/* Tell our iterator function the current time and the maximum
* allowed time to wait to
*/
/* Lock ctable, reset expired nodes, garbage collect, and set
* data.soonest to the time of the soonest next expireing node.
*/
- pth_mutex_acquire(&ctable_lock, FALSE, NULL);
+ pthread_mutex_lock(&ctable_lock);
if (HASHTABLE_NODES(&ctable) > 0)
hashtable_iterate(&ctable, clnode_expire_thread_iterator, &data);
- pth_mutex_release(&ctable_lock);
+ pthread_mutex_unlock(&ctable_lock);
}
/* NOTREACHED */
- pth_exit(NULL);
+ pthread_exit(NULL);
return NULL;
}
if (rem != 0 && data->soonest > rem)
data->soonest = rem;
- /* If the cache is big, prevent from interfering with other threads */
- if ((data->cnt++) == 64) {
- data->cnt = 0;
- pth_yield(NULL);
- }
-
return TRUE;
}
-/* $Id: mmserver.h,v 1.10 2004/06/09 20:58:54 mmondor Exp $ */
+/* $Id: mmserver.h,v 1.11 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2000-2004, Matthew Mondor
#include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
#include <mmtypes.h>
#include <mmpool.h>
#include <mmhash.h>
#include <mmlimitrate.h>
+#include <mm_pthread_msg.h>
+
*/
struct async_msg {
pnode_t node;
- pth_message_t msg;
+ pthread_msg_t msg;
struct async_clenv *aclenv;
unsigned int func_id;
/* User data structure here, user async functions should not touch
/* Global async server context environment */
struct async_env {
- pth_t slave;
- pth_msgport_t port;
- pth_event_t ring;
- pth_mutex_t apool_lock;
+ pthread_t slave;
+ pthread_port_t port;
+ pthread_ring_t ring;
+ pthread_mutex_t apool_lock;
struct async_func *funcs;
unsigned int nfuncs, nprocs;
size_t msg_len;
/* Client/thread specific async context */
struct async_clenv {
pnode_t node;
- pth_msgport_t port;
- pth_event_t ring;
+ pthread_port_t port;
+ pthread_ring_t ring;
struct async_msg *msg;
};
extern bool make_daemon(const char *, const char *);
extern bool async_init(struct async_func *, int, uid_t, gid_t *, int);
-extern bool async_init_pth(void);
+extern bool async_init_pthread(void);
extern void async_call(struct async_clenv *, unsigned int);
extern char * resolve_hostname(struct async_clenv *, struct sockaddr *);
-/* $Id: mmsql.c,v 1.13 2004/12/03 17:29:50 mmondor Exp $ */
+/* $Id: mmsql.c,v 1.14 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2000-2004, Matthew Mondor
MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmsql.c,v 1.13 2004/12/03 17:29:50 mmondor Exp $");
+MMRCSID("$Id: mmsql.c,v 1.14 2007/03/13 20:28:22 mmondor Exp $");
thrfuncs.mutex_destroy = funcs->mutex_destroy;
thrfuncs.mutex_lock = funcs->mutex_lock;
thrfuncs.mutex_unlock = funcs->mutex_unlock;
- thrfuncs.thread_yield = funcs->thread_yield;
thrfuncs.thread_sleep = funcs->thread_sleep;
if ((mmsql_lock = thrfuncs.mutex_init()) != NULL)
-/* $Id: mmsql.h,v 1.8 2004/08/21 10:00:12 mmondor Exp $ */
+/* $Id: mmsql.h,v 1.9 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2000-2004, Matthew Mondor
void * (*mutex_destroy)(void *);
void (*mutex_lock)(void *);
void (*mutex_unlock)(void *);
- void (*thread_yield)(void);
- void (*thread_sleep)(int);
+ unsigned int (*thread_sleep)(unsigned int);
};
-/* $Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $ */
+/* $Id: mmstr.c,v 1.9 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2000-2004, Matthew Mondor
/* INCLUDES */
#include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
#include <mmtypes.h>
#include <mmstring.h>
MMCOPYRIGHT("@(#) Copyright (c) 2000-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmstr.c,v 1.8 2004/05/22 17:44:00 mmondor Exp $");
+MMRCSID("$Id: mmstr.c,v 1.9 2007/03/13 20:28:22 mmondor Exp $");
/* GLOBALS */
static pool_t mmstrp;
-static pth_mutex_t mmstrp_lock = PTH_MUTEX_INIT;
+static pthread_mutex_t mmstrp_lock = PTHREAD_MUTEX_INITIALIZER;
{
bool ok = FALSE;
- pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+ pthread_mutex_lock(&mmstrp_lock);
if (!POOL_VALID(&mmstrp))
ok = pool_init(&mmstrp, "mmstr_pool", allocfunc, freefunc, NULL, NULL,
sizeof(mmstrnode), buffer / sizeof(mmstrnode), 0, 0);
else
ok = TRUE;
- pth_mutex_release(&mmstrp_lock);
+ pthread_mutex_unlock(&mmstrp_lock);
return ok;
}
void
mmstrexit(void)
{
- pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+ pthread_mutex_lock(&mmstrp_lock);
pool_destroy(&mmstrp);
- pth_mutex_release(&mmstrp_lock);
+ pthread_mutex_unlock(&mmstrp_lock);
}
*/
for (ptr = str; *ptr != '\0'; ptr++) ;
if ((len = (size_t)(ptr - str) + 1) < 256) {
- pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+ pthread_mutex_lock(&mmstrp_lock);
nod = (mmstrnode *)pool_alloc(&mmstrp, FALSE);
- pth_mutex_release(&mmstrp_lock);
+ pthread_mutex_unlock(&mmstrp_lock);
if (nod != NULL) {
mm_memcpy(nod->string, str, len);
return nod->string;
register mmstrnode *nod = NULL;
if (size < 256) {
- pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+ pthread_mutex_lock(&mmstrp_lock);
nod = (mmstrnode *)pool_alloc(&mmstrp, FALSE);
- pth_mutex_release(&mmstrp_lock);
+ pthread_mutex_unlock(&mmstrp_lock);
if (nod != NULL) {
*nod->string = '\0';
return nod->string;
/* Evaluate actual node pointer from supplied string pointer and free it */
str -= sizeof(pnode_t);
- pth_mutex_acquire(&mmstrp_lock, FALSE, NULL);
+ pthread_mutex_lock(&mmstrp_lock);
pool_free((pnode_t *)str);
- pth_mutex_release(&mmstrp_lock);
+ pthread_mutex_unlock(&mmstrp_lock);
return NULL;
}
-$Id: ChangeLog,v 1.52 2005/11/17 07:38:07 mmondor Exp $
+$Id: ChangeLog,v 1.53 2007/03/13 20:28:22 mmondor Exp $
+
+
+
+Release: mmmail 0.1.0 devl
+Date : March 13, 2007
+By : Matthew Mondor
+
+* Performance enhancements
+ - Now uses pthread instead of pth, using the new pthread_utils library
+ for functionality which pth had but pthread lacks. This allows to
+ execute less functions through dedicated processes (which also require
+ some send(2)/recv(2) overhead), because of the expected preemptive
+ nature of most pthread implementations. It also allows better scaling
+ with SMP.
+ - Uses PTHREAD_MUTEX_INITIALIZER where possible instead of
+ pthread_mutex_init().
-# $Id: GNUmakefile,v 1.7 2005/01/23 14:47:34 mmondor Exp $
+# $Id: GNUmakefile,v 1.8 2007/03/13 20:28:22 mmondor Exp $
MMLIBS := $(addprefix ../mmlib/,mmfd.o mmhash.o mmlimitrate.o mmsql.o \
-mmlog.o mmpool.o mmreadcfg.o mmserver.o mm_pth_pool.o mmstat.o mmstr.o \
-mmstring.o)
+mmlog.o mmpool.o mmreadcfg.o mmserver.o mmstat.o mmstr.o \
+mmstring.o) $(addprefix ../pthread_util/,mm_pthread_msg.o \
+mm_pthread_poll.o mm_pthread_pool.o mm_pthread_sleep.o)
-PTH_CFLAGS := $(shell pth-config --cflags)
-PTH_LDFLAGS := $(shell pth-config --ldflags --libs)
MYSQL_CFLAGS := $(shell mysql_config --cflags)
MYSQL_LDFLAGS := $(shell mysql_config --libs)
-CFLAGS += $(PTH_CFLAGS) $(MYSQL_CFLAGS) -I. -I../mmlib
-LDFLAGS += $(PTH_LDFLAGS) $(MYSQL_LDFLAGS) -lc -lcrypt
+CFLAGS += $(MYSQL_CFLAGS) -I. -I../mmlib -I../pthread_util
+LDFLAGS += $(MYSQL_LDFLAGS) -lc -lcrypt -lpthread
OBJS := src/mmsmtpd/mmsmtpd.o src/mmpop3d/mmpop3d.o src/mmrelayd/mmrelayd.o
BINS := src/mmsmtpd/mmsmtpd src/mmpop3d/mmpop3d src/mmrelayd/mmrelayd
CFLAGS += -Wall
#CFLAGS += -DMMMAIL_MYSQL
CFLAGS += -DMMMAIL_FILE
+#CFLAGS += -DNODETACH -DDEBUG -DPTHREAD_DEBUG
+#LDFLAGS += -lpthread_dbg
all: $(BINS)
# To compile all C files to objects
%.o: %.c
- cc -c ${CFLAGS} -o $@ $<
+ $(CC) -c ${CFLAGS} -o $@ $<
# To link all executables (binaries)
$(BINS): $(MMLIBS) $(OBJS)
- cc -o $@ ${LDFLAGS} ${MMLIBS} $@.o
+ $(CC) -o $@ ${LDFLAGS} ${MMLIBS} $@.o
install:
-/* $Id: mmpop3d.c,v 1.40 2005/03/05 15:33:33 mmondor Exp $ */
+/* $Id: mmpop3d.c,v 1.41 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2001-2004, Matthew Mondor
#include <time.h>
#include <ctype.h>
-#include <pth.h>
+
+#include <pthread.h>
#include <mmtypes.h>
#include <mmreadcfg.h>
#include <mmstring.h>
#include <mmstat.h>
+#include <mm_pthread_sleep.h>
+
#include "mmpop3d.h"
MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmpop3d.c,v 1.40 2005/03/05 15:33:33 mmondor Exp $");
+MMRCSID("$Id: mmpop3d.c,v 1.41 2007/03/13 20:28:22 mmondor Exp $");
static int LOGLEVEL;
/* Pool used to allocate clientenv nodes */
-static pth_mutex_t clenv_lock;
+static pthread_mutex_t clenv_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t clenv_pool;
/* Pool used to optimize creating/destroying mmfd mutexes */
-static pth_mutex_t mutexes_lock;
+static pthread_mutex_t mutexes_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t mutexes_pool;
/* Used for fast command lookup */
static fdfuncs gfdf = {
malloc,
free,
- pth_poll,
- pth_read,
- pth_write,
- pth_sleep,
- pth_usleep,
- _pth_mutex_create,
- _pth_mutex_destroy,
- _pth_mutex_lock,
- _pth_mutex_unlock,
- _pth_thread_yield,
- _pth_eintr
+ poll,
+ read,
+ write,
+ pthread_sleep,
+ pthread_usleep,
+ thread_mutex_create,
+ thread_mutex_destroy,
+ thread_mutex_lock,
+ thread_mutex_unlock,
+ NULL,
+ thread_eintr
};
{
uid_t uid;
gid_t *gids;
- char *conf_file = "/etc/mmpop3d.conf";
+ char *conf_file = "/usr/local/etc/mmpop3d.conf";
int ret = -1, ngids;
long facility;
char *db_host;
{NULL, 0}
};
struct mmsql_threadsupport mmsqlfuncs = {
- _pth_mutex_create,
- _pth_mutex_destroy,
- _pth_mutex_lock,
- _pth_mutex_unlock,
- _pth_thread_yield,
- _pth_thread_sleep
+ thread_mutex_create,
+ thread_mutex_destroy,
+ thread_mutex_lock,
+ thread_mutex_unlock,
+ pthread_sleep
};
mmstat_t vstat;
async_init(afuncs, CONF.ASYNC_PROCESSES, uid, gids, ngids);
/* Things which shouldn't be part of the async pool processes */
- pth_init();
- async_init_pth();
- pth_mutex_init(&clenv_lock);
- pth_mutex_init(&mutexes_lock);
+ async_init_pthread();
/* Allocate necessary pools */
/* Client nodes */
if (hash_commands(commands, 3) && POOL_VALID(&clenv_pool) &&
POOL_VALID(&mutexes_pool) && strlist) {
+ thread_init();
fdbcinit(&gfdf, &fdbc, CONF.GBANDWIDTH_IN * 1024,
CONF.GBANDWIDTH_OUT * 1024);
mmsql_init(&mmsqlfuncs);
{
clientenv *clenv;
- pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+ pthread_mutex_lock(&clenv_lock);
clenv = (clientenv *)pool_alloc(&clenv_pool, TRUE);
- pth_mutex_release(&clenv_lock);
+ pthread_mutex_unlock(&clenv_lock);
if (clenv != NULL) {
mmstat_init(&clenv->pstat, TRUE, FALSE);
if (clenv->index != NULL)
free(clenv->index);
- pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+ pthread_mutex_lock(&clenv_lock);
pool_free((pnode_t *)clenv);
- pth_mutex_release(&clenv_lock);
+ pthread_mutex_unlock(&clenv_lock);
return (NULL);
}
*/
ptr = host;
while (*ptr != '\0') {
- if (!isalnum(*ptr) || *ptr == ' ')
+ if (!isalnum((int)*ptr) || *ptr == ' ')
return (FALSE);
else {
/* Find next host part */
inline static bool
valid_char(char c)
{
- return (isalnum(c) || c == '.' || c == '-' || c == '_');
+ return (isalnum((int)c) || c == '.' || c == '-' || c == '_');
}
/* mmfd library thread support functions */
+static pthread_mutexattr_t thread_ma;
+
+static void
+thread_init(void)
+{
+ (void) pthread_mutexattr_init(&thread_ma);
+ (void) pthread_mutexattr_settype(&thread_ma, PTHREAD_MUTEX_RECURSIVE);
+}
+
+
static void *
-_pth_mutex_create(void)
+thread_mutex_create(void)
{
struct mutexnode *mnod;
- pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+ pthread_mutex_lock(&mutexes_lock);
mnod = (struct mutexnode *)pool_alloc(&mutexes_pool, FALSE);
- pth_mutex_release(&mutexes_lock);
+ pthread_mutex_unlock(&mutexes_lock);
if (mnod != NULL)
- pth_mutex_init(&mnod->mutex);
+ pthread_mutex_init(&mnod->mutex, &thread_ma);
return ((void *)mnod);
}
static void *
-_pth_mutex_destroy(void *mtx)
+thread_mutex_destroy(void *mtx)
{
- /* struct mutexnode *mnod = mtx; */
+ struct mutexnode *mnod = mtx;
- /* pth_mutex_destroy(&mnod->mutex); */
- pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+ pthread_mutex_destroy(&mnod->mutex);
+ pthread_mutex_lock(&mutexes_lock);
pool_free(mtx);
- pth_mutex_release(&mutexes_lock);
+ pthread_mutex_unlock(&mutexes_lock);
return (NULL);
}
static void
-_pth_mutex_lock(void *mtx)
+thread_mutex_lock(void *mtx)
{
struct mutexnode *mnod = mtx;
- pth_mutex_acquire(&mnod->mutex, FALSE, NULL);
+ pthread_mutex_lock(&mnod->mutex);
}
static void
-_pth_mutex_unlock(void *mtx)
+thread_mutex_unlock(void *mtx)
{
struct mutexnode *mnod = mtx;
- pth_mutex_release(&mnod->mutex);
-}
-
-
-static void
-_pth_thread_yield(void)
-{
- pth_yield(NULL);
-}
-
-
-static void
-_pth_thread_sleep(int secs)
-{
- pth_sleep(secs);
+ pthread_mutex_unlock(&mnod->mutex);
}
static bool
-_pth_eintr(void)
+thread_eintr(void)
{
if (errno == EINTR)
return TRUE;
-/* $Id: mmpop3d.h,v 1.16 2005/03/05 15:33:34 mmondor Exp $ */
+/* $Id: mmpop3d.h,v 1.17 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2001-2004, Matthew Mondor
#include <sys/types.h>
-#include <pth.h>
+#include <pthread.h>
#include <mmtypes.h>
#include <mmhash.h>
#include <mmfd.h>
#include <mmstat.h>
+#include <mm_pthread_sleep.h>
+
/* DEFINITIONS */
#define DAEMON_NAME "mmpop3d"
-#define DAEMON_VERSION "mmmail-0.0.24/mmondor"
+#define DAEMON_VERSION "mmmail-0.1.0/mmondor"
/* Negative states are used by the state swapper, others are real states */
#define STATE_ERROR -3
#define REGISTER_ERROR(x) do { \
(x)->errors++; \
if (CONF.DELAY_ON_ERROR) \
- pth_sleep((x)->errors); \
+ pthread_sleep((x)->errors); \
} while(0)
/* Used for mmfd thread support delegation/abstraction */
struct mutexnode {
pnode_t node;
- pth_mutex_t mutex;
+ pthread_mutex_t mutex;
};
/* This defines a state */
static int handleclient(unsigned long, int, clientlistnode *, struct iface *,
struct async_clenv *);
-static void *_pth_mutex_create(void);
-static void *_pth_mutex_destroy(void *);
-static void _pth_mutex_lock(void *);
-static void _pth_mutex_unlock(void *);
-static void _pth_thread_yield(void);
-static void _pth_thread_sleep(int);
-static bool _pth_eintr(void);
+static void thread_init(void);
+static void *thread_mutex_create(void);
+static void *thread_mutex_destroy(void *);
+static void thread_mutex_lock(void *);
+static void thread_mutex_unlock(void *);
+static bool thread_eintr(void);
static void async_checkpw(struct async_msg *);
static bool checkpw(clientenv *, const char *, const char *);
-/* $Id: mmsmtpd.c,v 1.74 2005/06/27 17:54:52 mmondor Exp $ */
+/* $Id: mmsmtpd.c,v 1.75 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2001-2004, Matthew Mondor
#include <syslog.h>
-#include <pth.h>
#include <signal.h>
#include <time.h>
#include <dirent.h>
#include <ctype.h>
+#include <pthread.h>
+
#include <mmtypes.h>
#include <mmreadcfg.h>
#include <mmfd.h>
MMCOPYRIGHT("@(#) Copyright (c) 2001-2004\n\
\tMatthew Mondor. All rights reserved.\n");
-MMRCSID("$Id: mmsmtpd.c,v 1.74 2005/06/27 17:54:52 mmondor Exp $");
+MMRCSID("$Id: mmsmtpd.c,v 1.75 2007/03/13 20:28:22 mmondor Exp $");
/* Used for clenv allocation buffering */
static pool_t clenv_pool;
-static pth_mutex_t clenv_lock;
+static pthread_mutex_t clenv_lock = PTHREAD_MUTEX_INITIALIZER;
/* Used for the flood protection cache */
static pool_t hosts_pool;
static hashtable_t hosts_table;
-static pth_mutex_t hosts_lock;
+static pthread_mutex_t hosts_lock = PTHREAD_MUTEX_INITIALIZER;
/* Used for rcpt allocation buffering */
static pool_t rcpt_pool;
-static pth_mutex_t rcpt_lock;
+static pthread_mutex_t rcpt_lock = PTHREAD_MUTEX_INITIALIZER;
/* Pool used to optimize creating/destroying mmfd mutexes */
-static pth_mutex_t mutexes_lock;
+static pthread_mutex_t mutexes_lock = PTHREAD_MUTEX_INITIALIZER;
static pool_t mutexes_pool;
/* For fast command lookup */
static fdfuncs gfdf = {
malloc,
free,
- pth_poll,
- pth_read,
- pth_write,
- pth_sleep,
- pth_usleep,
- _pth_mutex_create,
- _pth_mutex_destroy,
- _pth_mutex_lock,
- _pth_mutex_unlock,
- _pth_thread_yield,
- _pth_eintr
+ poll,
+ read,
+ write,
+ pthread_sleep,
+ pthread_usleep,
+ thread_mutex_create,
+ thread_mutex_destroy,
+ thread_mutex_lock,
+ thread_mutex_unlock,
+ NULL,
+ thread_eintr
};
/*
* Connection to mmrelayd(8) establishment
*/
int relayd_sock = -1;
-pth_mutex_t relayd_lock = PTH_MUTEX_INIT;
+pthread_mutex_t relayd_lock = PTHREAD_MUTEX_INITIALIZER;
{
uid_t uid;
gid_t *gids;
- char *conf_file = "/etc/mmsmtpd.conf";
+ char *conf_file = "/usr/local/etc/mmsmtpd.conf";
int ngids, ret = -1;
long facility;
char *db_host;
{NULL, 0}
};
struct mmsql_threadsupport mmsqlfuncs = {
- _pth_mutex_create,
- _pth_mutex_destroy,
- _pth_mutex_lock,
- _pth_mutex_unlock,
- _pth_thread_yield,
- _pth_thread_sleep
+ thread_mutex_create,
+ thread_mutex_destroy,
+ thread_mutex_lock,
+ thread_mutex_unlock,
+ pthread_sleep
};
mmstat_t vstat;
- pth_t hosts_table_thread = NULL;
- pth_t mmmail_db_gc_thread = NULL;
- pth_attr_t threadattr;
+ pthread_t hosts_table_thread = NULL;
+ pthread_t mmmail_db_gc_thread = NULL;
+ pthread_attr_t threadattr;
/* Set defaults */
*CONF.CHROOT_DIR = 0;
async_init(afuncs, CONF.ASYNC_PROCESSES, uid, gids, ngids);
/* Things which shouldn't be part of the async pool processes */
- pth_init();
- async_init_pth();
- pth_mutex_init(&clenv_lock);
- pth_mutex_init(&hosts_lock);
- pth_mutex_init(&rcpt_lock);
- pth_mutex_init(&mutexes_lock);
+ async_init_pthread();
/* Allocate necessary pools */
/* Client nodes */
pool_init(&mutexes_pool, "mutexes_pool", malloc, free, NULL, NULL,
sizeof(struct mutexnode),
(16384 * CONF.ALLOC_BUFFERS) / sizeof(struct mutexnode), 0, 0);
+
+ pthread_attr_init(&threadattr);
+ pthread_attr_setdetachstate(&threadattr, 0);
+
/* Rate nodes */
if (CONF.FLOOD_PROTECTION) {
pool_init(&hosts_pool, "hosts_pool", malloc, free, NULL, NULL,
sizeof(hostnode), CONF.FLOOD_CACHE, 1, 1);
hashtable_init(&hosts_table, "hosts_table", CONF.FLOOD_CACHE, 1,
malloc, free, mm_memcmp, mm_memhash32, FALSE);
- threadattr = pth_attr_new();
- pth_attr_set(threadattr, PTH_ATTR_JOINABLE, TRUE);
- hosts_table_thread = pth_spawn(threadattr, hosts_expire_thread, NULL);
+ pthread_create(&hosts_table_thread, &threadattr, hosts_expire_thread,
+ NULL);
}
/* Launch box directories cleaning thread */
- threadattr = pth_attr_new();
- pth_attr_set(threadattr, PTH_ATTR_JOINABLE, TRUE);
- mmmail_db_gc_thread = pth_spawn(threadattr, db_gc_thread, NULL);
+ pthread_create(&mmmail_db_gc_thread, &threadattr, db_gc_thread, NULL);
/* mmstr nodes */
strlist = mmstrinit(malloc, free, 65536 * CONF.ALLOC_BUFFERS);
(!CONF.FLOOD_PROTECTION || (POOL_VALID(&hosts_pool) &&
HASHTABLE_VALID(&hosts_table) &&
hosts_table_thread != NULL))) {
+ thread_init();
fdbcinit(&gfdf, &fdbc, CONF.GBANDWIDTH_IN * 1024,
CONF.GBANDWIDTH_OUT * 1024);
mmsql_init(&mmsqlfuncs);
if (strlist)
mmstrexit();
+ /* XXX
if (hosts_table_thread != NULL) {
pth_abort(hosts_table_thread);
pth_join(hosts_table_thread, NULL);
pth_abort(mmmail_db_gc_thread);
pth_join(mmmail_db_gc_thread, NULL);
}
+ */
if (HASHTABLE_VALID(&command_table))
hashtable_destroy(&command_table, FALSE);
if (POOL_VALID(&command_pool))
if ((mm_strncasecmp(" FROM:<>", &clenv->buffer[4], 8)) == 0) {
/* Some systems use empty MAIL FROM like this, make sure
* that IP address or hostname is allowed to do this.
+ * If so, we also want to make sure not to perform any type
+ * of envelope based filtering for this post.
*/
valid = check_nofrom(clenv->c_ipaddr, clenv->c_hostname);
if (valid)
*addr = '\0';
- } else
+ clenv->nofrom = TRUE;
+ } else {
valid = valid_address(clenv, addr, 128, clenv->buffer,
(CONF.RESOLVE_MX_MAIL) ? HOST_RES_MX : HOST_NORES);
+ clenv->nofrom = FALSE;
+ }
if (valid) {
if ((clenv->from = (char *)mmstrdup(addr)) != NULL)
/* These only apply to local addresses */
if (!relay) {
- /* Ensure to observe allow filters if any set for box */
- if (boxinfo.filter) {
+ /*
+ * Ensure to observe allow filters if any set for box, except for mail
+ * with an empty FROM address from allowed servers
+ */
+ if (boxinfo.filter && !clenv->nofrom) {
if (!box_filter_allow(addr, clenv->from, boxinfo.filter_type)) {
reason = RCPT_FILTER;
if (CONF.STATFAIL_FILTER)
*/
{
register rcptnode *rnode;
- register int cnt;
ahash = mm_strhash64(addr);
- cnt = 0;
DLIST_FOREACH(&clenv->rcpt, rnode) {
if (rnode->hash == ahash) {
reason = RCPT_EXISTS;
goto end;
}
- cnt++;
- if (cnt > 64) {
- cnt = 0;
- pth_yield(NULL);
- }
}
}
len = mm_strlen(entry);
valid = TRUE;
- pth_mutex_acquire(&hosts_lock, FALSE, NULL);
+ pthread_mutex_lock(&hosts_lock);
/* First acquire our hostnode, or create it if required */
if ((hnod = (hostnode *)hashtable_lookup(&hosts_table, entry, len + 1))
== NULL) {
clenv->id, LR_POSTS(&hnod->lr), CONF.FLOOD_EXPIRES);
}
}
- pth_mutex_release(&hosts_lock);
+ pthread_mutex_unlock(&hosts_lock);
if (!valid) {
if (CONF.STATFAIL_FLOOD)
register rcptnode *rnode;
reason = RCPT_ERROR;
- pth_mutex_acquire(&rcpt_lock, FALSE, NULL);
+ pthread_mutex_lock(&rcpt_lock);
rnode = (rcptnode *)pool_alloc(&rcpt_pool, FALSE);
- pth_mutex_release(&rcpt_lock);
+ pthread_mutex_unlock(&rcpt_lock);
if (rnode != NULL) {
mm_strcpy(rnode->address, (relay ? foraddr : addr));
mm_strcpy(rnode->foraddress, foraddr);
{
clientenv *clenv;
- pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+ pthread_mutex_lock(&clenv_lock);
clenv = (clientenv *)pool_alloc(&clenv_pool, TRUE);
- pth_mutex_release(&clenv_lock);
+ pthread_mutex_unlock(&clenv_lock);
if (clenv != NULL) {
mmstat_init(&clenv->vstat, TRUE, TRUE);
mmstrfree(clenv->from);
empty_rcpts(&clenv->rcpt);
- pth_mutex_acquire(&clenv_lock, FALSE, NULL);
+ pthread_mutex_lock(&clenv_lock);
pool_free((pnode_t *)clenv);
- pth_mutex_release(&clenv_lock);
+ pthread_mutex_unlock(&clenv_lock);
return (NULL);
}
{
node_t *nod, *tmp;
- pth_mutex_acquire(&rcpt_lock, FALSE, NULL);
+ pthread_mutex_lock(&rcpt_lock);
for (nod = DLIST_TOP(lst); nod != NULL; nod = tmp) {
tmp = DLIST_NEXT(nod);
}
DLIST_INIT(lst);
- pth_mutex_release(&rcpt_lock);
+ pthread_mutex_unlock(&rcpt_lock);
}
if ((mysqlres = mmsql_query(query, mm_strlen(query))) != NULL) {
if ((mysql_num_rows(mysqlres)) > 0) {
char pat[64];
- int cur = 0, max = -1, cnt = 0;
+ int cur = 0, max = -1;
MYSQL_ROW *row;
unsigned long *lengths;
}
}
}
- cnt++;
- if (cnt > 64) {
- cnt = 0;
- pth_yield(NULL);
- }
}
if (max > -1)
res = TRUE;
if ((mysqlres = mmsql_query("SELECT nofrom_pattern FROM nofrom", -1))
!= NULL) {
if ((mysql_num_rows(mysqlres)) > 0) {
- int cnt = 0;
MYSQL_ROW *row;
unsigned long *lengths;
}
}
}
- cnt++;
- if (cnt > 64) {
- cnt = 0;
- pth_yield(NULL);
- }
}
}
mysqlres = mmsql_free_result(mysqlres);
*/
ptr = host;
while (*ptr != '\0') {
- if (!isalnum(*ptr))
+ if (!isalnum((int)*ptr))
return FALSE;
/* Find next host part */
while (*ptr != '\0' && *ptr != '.') ptr++;
} else return (FALSE);
if (*addr == '\0')
break;
- } else if (isdigit(*addr))
+ } else if (isdigit((int)*addr))
*uptr++ = *addr;
else
return (FALSE);
* spaces/tabs.
*/
if (*line != '\t' && *line != ' ') {
- for (ptr = line; *ptr != '\0' && (isalnum(*ptr) || *ptr == '-');
+ for (ptr = line; *ptr != '\0' && (isalnum((int)*ptr) ||
+ *ptr == '-');
ptr++) ;
if (*ptr != ':')
goto endheader;
* If we cannot obtain lock, we know that it's already being notified, and
* we don't need to do anything.
*/
- if (pth_mutex_acquire(&relayd_lock, TRUE, NULL) == FALSE)
+ if (pthread_mutex_lock(&relayd_lock) != 0)
return;
/*
* and send it again, but once only.
*/
for (;;) {
- if (pth_write(relayd_sock, "N", 1) != 1) {
+ if (write(relayd_sock, "N", 1) != 1) {
(void) close(relayd_sock);
if ((relayd_sock = do_data_queue_notify_connect()) != -1)
continue;
mmsyslog(0, LOGLEVEL,
"mmrelayd(8) could not be notified (not running?)");
- (void) pth_mutex_release(&relayd_lock);
+ (void) pthread_mutex_unlock(&relayd_lock);
}
/*
mm_memclr(&addr, sizeof(struct sockaddr_un));
(void) mm_strncpy(addr.sun_path, CONF.MMRELAYD_SOCKET_PATH, 100);
addr.sun_family = AF_UNIX;
- if ((pth_connect(fd, (struct sockaddr *)&addr,
+ if ((connect(fd, (struct sockaddr *)&addr,
sizeof(struct sockaddr_un))) == -1) {
(void) close(fd);
fd = -1;
/* mmfd library thread support functions */
+static pthread_mutexattr_t thread_ma;
+
+static void
+thread_init(void)
+{
+ (void) pthread_mutexattr_init(&thread_ma);
+ (void) pthread_mutexattr_settype(&thread_ma, PTHREAD_MUTEX_RECURSIVE);
+}
+
+
static void *
-_pth_mutex_create(void)
+thread_mutex_create(void)
{
struct mutexnode *mnod;
- pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+ pthread_mutex_lock(&mutexes_lock);
mnod = (struct mutexnode *)pool_alloc(&mutexes_pool, FALSE);
- pth_mutex_release(&mutexes_lock);
+ pthread_mutex_unlock(&mutexes_lock);
if (mnod != NULL)
- pth_mutex_init(&mnod->mutex);
+ pthread_mutex_init(&mnod->mutex, &thread_ma);
return ((void *)mnod);
}
static void *
-_pth_mutex_destroy(void *mtx)
+thread_mutex_destroy(void *mtx)
{
- /* struct mutexnode *mnod = mtx; */
+ struct mutexnode *mnod = mtx;
- /* pth_mutex_destroy(&mnod->mutex); */
- pth_mutex_acquire(&mutexes_lock, FALSE, NULL);
+ pthread_mutex_destroy(&mnod->mutex);
+ pthread_mutex_lock(&mutexes_lock);
pool_free(mtx);
- pth_mutex_release(&mutexes_lock);
+ pthread_mutex_unlock(&mutexes_lock);
return (NULL);
}
static void
-_pth_mutex_lock(void *mtx)
+thread_mutex_lock(void *mtx)
{
struct mutexnode *mnod = mtx;
- pth_mutex_acquire(&mnod->mutex, FALSE, NULL);
+ pthread_mutex_lock(&mnod->mutex);
}
static void
-_pth_mutex_unlock(void *mtx)
+thread_mutex_unlock(void *mtx)
{
struct mutexnode *mnod = mtx;
- pth_mutex_release(&mnod->mutex);
-}
-
-
-static void
-_pth_thread_yield(void)
-{
- pth_yield(NULL);
-}
-
-
-static void
-_pth_thread_sleep(int secs)
-{
- pth_sleep(secs);
+ pthread_mutex_unlock(&mnod->mutex);
}
static bool
-_pth_eintr(void)
+thread_eintr(void)
{
if (errno == EINTR)
return TRUE;
/* Set the initial timeout to the maximum allowed */
data.soonest = CONF.FLOOD_EXPIRES * 60;
- data.cnt = 0;
for (;;) {
/* Sleep until it is known that at least one node expired */
- pth_sleep((unsigned int)data.soonest);
+ pthread_sleep(data.soonest);
/* Tell our iterator function the current time and the maximum
* allowed time to wait to
*/
/* Lock hosts_table, expunge expired nodes and set data.soonest to the
* time of the soonest next expireing node
*/
- pth_mutex_acquire(&hosts_lock, FALSE, NULL);
+ pthread_mutex_lock(&hosts_lock);
if (HASHTABLE_NODES(&hosts_table) > 0)
hashtable_iterate(&hosts_table, hosts_expire_thread_iterator,
&data);
- pth_mutex_release(&hosts_lock);
+ pthread_mutex_unlock(&hosts_lock);
}
/* NOTREACHED */
- pth_exit(NULL);
+ pthread_exit(NULL);
return NULL;
}
data->soonest = rem;
}
- /* If the cache is big, prevent from interfering with other threads */
- if ((data->cnt++) == 64) {
- data->cnt = 0;
- pth_yield(NULL);
- }
-
return TRUE;
}
for (rounds = 1; ; rounds++) {
MYSQL_RES *mysqlres;
- (void) pth_sleep(60);
+ (void) pthread_sleep(60);
/*
* Perform dangling mailbox directories cleanup
}
/* NOTREACHED */
- pth_exit(NULL);
+ pthread_exit(NULL);
return NULL;
}
char dirpath[256], filepath[256];
DIR *dir;
struct dirent ent, *res;
- int count;
/*
* Now <path> holds the actual directory to delete mail from. We know
return;
}
- count = 1;
while (readdir_r(dir, &ent, &res) == 0 && res == &ent) {
(void) snprintf(filepath, 255, "%s/%s", dirpath, ent.d_name);
if (unlink(filepath) != 0)
syslog(LOG_NOTICE, "db_gc_thread_delete(%s) - unlink(%s) == %s",
addr, filepath, strerror(errno));
- if (++count == 64) {
- count = 0;
- (void) pth_yield(NULL);
- }
}
if (rmdir(dirpath) != 0)
syslog(LOG_NOTICE, "db_gc_thread_delete(%s) - rmdir(%s) == %s",
-/* $Id: mmsmtpd.h,v 1.33 2005/03/26 11:45:48 mmondor Exp $ */
+/* $Id: mmsmtpd.h,v 1.34 2007/03/13 20:28:22 mmondor Exp $ */
/*
* Copyright (C) 2001-2004, Matthew Mondor
#include <sys/types.h>
#include <time.h>
-#include <pth.h>
+#include <pthread.h>
+#include <mm_pthread_sleep.h>
#include <mmtypes.h>
#include <mmlist.h>
/* DEFINITIONS */
#define DAEMON_NAME "mmsmtpd"
-#define DAEMON_VERSION "mmmail-0.0.24/mmondor"
+#define DAEMON_VERSION "mmmail-0.1.0/mmondor"
/* Negative states are used by the state swapper, others are real states */
#define STATE_ERROR -3
#define REGISTER_ERROR(x) do { \
(x)->errors++; \
if (CONF.DELAY_ON_ERROR) \
- pth_sleep((x)->errors); \
+ pthread_sleep((x)->errors); \
} while(FALSE)
/* Evaluates if a character is valid for addresses and hostnames */
long mesg_size; /* Current cached message size in bytes */
long errors; /* Total number of errors that occured */
int timeout; /* Timeout in ms */
+ bool nofrom; /* If empty MAIL FROM from allowed server */
unsigned long id; /* Our connection ID */
unsigned long messages; /* Messages user sent us */
unsigned long rcpts; /* Number of RCPT accepted */
struct hosts_expire_thread_iterator_udata {
time_t current, soonest;
- int cnt;
};
/* Used for mmfd thread support delegation/abstraction */
struct mutexnode {
pnode_t node;
- pth_mutex_t mutex;
+ pthread_mutex_t mutex;
};
/* This defines a state */
static int handleclient(unsigned long, int, clientlistnode *, struct iface *,
struct async_clenv *);
-static void *_pth_mutex_create(void);
-static void *_pth_mutex_destroy(void *);
-static void _pth_mutex_lock(void *);
-static void _pth_mutex_unlock(void *);
-static void _pth_thread_yield(void);
-static void _pth_thread_sleep(int);
-static bool _pth_eintr(void);
+static void thread_init(void);
+static void *thread_mutex_create(void);
+static void *thread_mutex_destroy(void *);
+static void thread_mutex_lock(void *);
+static void thread_mutex_unlock(void *);
+static bool thread_eintr(void);
static void async_resquery(struct async_msg *);
static int a_res_query(clientenv *, const char *, int, int, u_char *, int);