Source cleanup
[mmondor.git] / mmsoftware / mmlib / mmserver.c
1 /* $Id: mmserver.c,v 1.13 2003/07/17 09:13:01 mmondor Exp $ */
2
3 /*
4 * Copyright (C) 2000-2003, Matthew Mondor
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. All advertising materials mentioning features or use of this software
16 * must display the following acknowledgement:
17 * This product includes software written by Matthew Mondor.
18 * 4. The name of Matthew Mondor may not be used to endorse or promote
19 * products derived from this software without specific prior written
20 * permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
23 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
24 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
25 * IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
31 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34
35
36
37 /* HEADERS */
38
39 #include <sys/types.h>
40 #include <sys/wait.h>
41 #include <sys/stat.h>
42 #include <fcntl.h>
43 #include <stdlib.h>
44 #include <unistd.h>
45 #include <stdio.h>
46
47 #include <netdb.h>
48 #include <netinet/in.h>
49 #include <netinet/in_systm.h>
50 #include <netinet/ip.h>
51 #include <netinet/tcp.h>
52 #include <sys/socket.h>
53 #include <arpa/inet.h>
54 #include <arpa/nameser.h>
55 #include <resolv.h>
56
57 #include <syslog.h>
58
59 #include <pth.h>
60 #include <signal.h>
61 #include <sys/poll.h>
62 #include <time.h>
63
64 #include <mmstr.h>
65 #include <mmstring.h>
66 #include <mmfd.h>
67 #include <mmlist.h>
68 #include <mmpool.h>
69 #include <mmhash.h>
70 #include <mmserver.h>
71 #include <mmreadcfg.h>
72 #include <mmlog.h>
73
74
75
76
77 MMCOPYRIGHT("@(#) Copyright (c) 2002-2003\n\
78 \tMatthew Mondor. All rights reserved.\n");
79 MMRCSID("$Id: mmserver.c,v 1.13 2003/07/17 09:13:01 mmondor Exp $");
80
81
82
83
84 /* DEFINITIONS */
85
86
87
88
89 /* STATIC PROTOTYPES */
90
91 static void sighandler(int);
92 static struct iface *parse_ifaces(char *, char *);
93 static void *phandleclient(void *);
94 static void writepidfile(const char *);
95 static void *async_thread(void *);
96 static bool spawn_async_procs(uid_t, gid_t *, int, size_t);
97 static void async_proc_main(int);
98 static void async_resolvehostname(struct async_msg *);
99
100 static u_int32_t clnode_keyhash(const void *, size_t);
101 static int clnode_keycmp(const void *, const void *, size_t);
102 static void *clnode_expire_thread(void *);
103 static bool clnode_expire_thread_iterator(hashnode_t *, void *);
104
105
106
107
108 /* GLOBAL VARIABLES */
109
/* User-supplied per-connection handler, registered by tcp_server() */
static int (*handleclientfunc)(unsigned long, int, clientlistnode *,
    struct iface *, struct async_clenv *);
/* Protect the client hashtable (ctable) and the phandleinfo pool (ppool) */
static pth_mutex_t ctable_lock, ppool_lock;
/* cpool: clientlistnode allocations; ppool: phandleinfo allocations */
static pool_t cpool, ppool;
/* Per-address client nodes, keyed by the IPv4 address (u_int32_t) */
static hashtable_t ctable;
/* Global asynchroneous-call environment; NULL until async_init() is called */
static struct async_env *async = NULL;

/* Useful so that daemons can log standard disconnection status easily */
const char *const mms_reasons[] = {
    "NORMAL",
    "INPUT_TIMEOUT",
    "OUTPUT_TIMEOUT",
    "INPUT_ERROR",
    "OUTPUT_ERROR",
    "RESOURCE_ERROR",
    "NOT_AVAILABLE",
    "MANY_ERRORS",
    "CONNECTION_RATE_EXCEEDED", /* These three used internally */
    "NUMBER_OF_ADDRESSES_EXCEEDED",
    "CONNECTIONS_PER_ADDRESS_EXCEEDED",
    "UNKNOWN",
    NULL
};
133
134
135
136
137 /* MAIN DAEMON PROGRAM */
138
139 /* Before calling this function, the async_init() and async_init_pth()
140 * functions should have been called. Before any async_*() or tcp_server()
141 * calls the process is expected to have called make_daemon().
142 */
143 void
144 tcp_server(char *message, char *server_names, char *listen_ips, uid_t uid,
145 gid_t *gids, int ngids, long maxips, long maxperip, long ratemax,
146 long rateper, long timeout, int port, bool resolve,
147 int (*handleclient1)(unsigned long, int, clientlistnode *,
148 struct iface *, struct async_clenv *))
149 {
150 struct sockaddr addr;
151 struct sockaddr_in server, *sinaddr;
152 socklen_t addrl;
153 clientlistnode *clnode;
154 int msgsock, ret, msglen, nifaces, nifaces2, i, reason;
155 bool ok;
156 struct pollfd *fds;
157 struct ifacendx *fdsi;
158 struct iface *ifaces, *tif;
159 unsigned long id;
160 pth_t thread, clnode_thread;
161 pth_attr_t threadattr, clnode_threadattr;
162
163 sinaddr = (struct sockaddr_in *)&addr;
164 handleclientfunc = handleclient1;
165 id = (unsigned long)time(NULL);
166 msgsock = -1;
167 msglen = mm_strlen(message);
168 fds = NULL;
169 fdsi = NULL;
170 ifaces = NULL;
171
172 /* Pth related */
173 threadattr = pth_attr_new();
174 pth_attr_set(threadattr, PTH_ATTR_JOINABLE, FALSE);
175 clnode_threadattr = pth_attr_new();
176 pth_attr_set(clnode_threadattr, PTH_ATTR_JOINABLE, TRUE);
177 pth_mutex_init(&ctable_lock);
178 pth_mutex_init(&ppool_lock);
179
180 /* Used by resolve_hostname() */
181 if (!mmstrinit(malloc, free, 16384)) {
182 DPRINTF("tcp_server()", "mmstrinit()");
183 closelog();
184 exit(-1);
185 }
186
187 if ((ifaces = parse_ifaces(server_names, listen_ips)) == NULL) {
188 closelog();
189 mmstrexit();
190 exit(-1);
191 }
192
193 if (!pool_init(&cpool, malloc, free, sizeof(clientlistnode),
194 65536 / sizeof(clientlistnode), 0, 0) ||
195 !pool_init(&ppool, malloc, free, sizeof(phandleinfo),
196 16384 / sizeof(phandleinfo), 0, 0) ||
197 !hashtable_init(&ctable, maxips, 1, malloc, free, clnode_keycmp,
198 clnode_keyhash, FALSE) ||
199 ((clnode_thread = pth_spawn(clnode_threadattr,
200 clnode_expire_thread, &rateper)) == NULL)) {
201 DPRINTF("tcp_server()", "Out of memory");
202 closelog();
203 mmstrexit();
204 free(ifaces);
205 exit(-1);
206 }
207
208 /* Bind the port we should listen to, with all specified addresses */
209 for (ret = nifaces = 0, tif = ifaces; *(tif->hostname); tif++) {
210 if ((tif->sock = socket(AF_INET, SOCK_STREAM, 0)) > -1) {
211 int i1 = 1;
212
213 if ((setsockopt(tif->sock, SOL_SOCKET, SO_REUSEADDR, &i1,
214 sizeof(int))) == -1)
215 DPRINTF("tcp_server()", "Error on setsockopt(%s:%d) - (%s)",
216 tif->address_str, port, tif->hostname);
217 mm_memclr(&server, sizeof(struct sockaddr_in));
218 server.sin_family = AF_INET;
219 server.sin_addr = tif->address.sin_addr;
220 server.sin_port = htons(port);
221 if ((bind(tif->sock, (struct sockaddr *)(void *)&server,
222 sizeof(struct sockaddr_in))) == 0)
223 nifaces++;
224 else {
225 syslog(LOG_NOTICE,
226 "tcp_server() - Error on bind(%s:%d) - (%s)",
227 tif->address_str, port, tif->hostname);
228 close(tif->sock);
229 tif->sock = -1;
230 }
231 }
232 }
233
234 /* Drop privileges and start listening to all bound ports */
235 if (nifaces > 0) {
236 if (mmdropprivs(uid, gids, ngids)) {
237 for (tif = ifaces; *(tif->hostname); tif++) {
238 if (tif->sock != -1) {
239 if ((listen(tif->sock, maxips * maxperip)) != 0) {
240 DPRINTF("tcp_server()", "listen(%s)", tif->hostname);
241 close(tif->sock);
242 tif->sock = -1;
243 nifaces--;
244 } else
245 syslog(LOG_NOTICE, "Listening on [%s:%d] - (%s)",
246 tif->address_str, port, tif->hostname);
247 }
248 }
249 } else {
250 syslog(LOG_NOTICE,
251 "tcp_server() - Cannot change uid and gid to safe privs");
252 ret = -1;
253 }
254 } else {
255 syslog(LOG_NOTICE,
256 "tcp_server() - no interfaces (left) to listen to");
257 ret = -1;
258 }
259
260 if (!ret) {
261 /* Setup our poll() array, with corresponding index array */
262 if ((fds = malloc((sizeof(struct pollfd) * nifaces) +
263 (sizeof(struct ifacendx) * nifaces))) != NULL) {
264 for (fdsi = (struct ifacendx *)&(fds[nifaces]), i = 0,
265 tif = ifaces;
266 *(tif->hostname); tif++) {
267 if (tif->sock != -1) {
268 fds[i].fd = tif->sock;
269 fds[i].events = POLLIN;
270 fds[i].revents = 0;
271 fdsi[i].iface = tif;
272 i++;
273 }
274 }
275 } else {
276 DPRINTF("tcp_server()", "Out of memory");
277 ret = -1;
278 }
279 } else
280 ret = -1;
281
282 if (ret) {
283 free(ifaces);
284 mmstrexit();
285 if (clnode_thread != NULL) {
286 pth_abort(clnode_thread);
287 pth_join(clnode_thread, NULL);
288 }
289 if (HASHTABLE_VALID(&ctable)) hashtable_destroy(&ctable, FALSE);
290 if (POOL_VALID(&ppool)) pool_destroy(&ppool);
291 if (POOL_VALID(&cpool)) pool_destroy(&cpool);
292 exit(ret);
293 }
294
295 /* Start answering and dispatching connections */
296 syslog(LOG_NOTICE, "Started for uid: %d", uid);
297 for (;;) {
298 if ((nifaces2 = pth_poll(fds, nifaces, -1)) > 0) {
299 /* Loop but once only, and only for long as nifaces2 times.
300 * Use our fast index to reference to the interface structure.
301 */
302 for (i = 0; i < nifaces && nifaces2; i++) {
303 if (fds[i].revents & POLLIN) {
304 nifaces2--;
305 addrl = sizeof(struct sockaddr);
306 if ((msgsock = pth_accept(fds[i].fd, &addr, &addrl))
307 != -1) {
308 /* Make sure that we respect connection and rate
309 * limits.
310 */
311 ok = FALSE;
312 reason = MMS_NORMAL;
313 pth_mutex_acquire(&ctable_lock, FALSE, NULL);
314 if ((clnode = (clientlistnode *)hashtable_find(&ctable,
315 &sinaddr->sin_addr.s_addr,
316 sizeof(u_int32_t))) == NULL) {
317 /* Create new node */
318 if (HASHTABLE_NODES(&ctable) < maxips) {
319 if ((clnode = (clientlistnode *)pool_alloc(
320 &cpool, FALSE)) != NULL) {
321 clnode->hostname = NULL;
322 clnode->connections = clnode->rate = 0;
323 clnode->period = time(NULL) + rateper;
324 clnode->timeout = (timeout * 1000);
325 clnode->client = addr;
326 clnode->resolve = resolve;
327 hashtable_link(&ctable,
328 (hashnode_t *)clnode,
329 &((struct sockaddr_in *)&clnode->
330 client)->sin_addr.s_addr,
331 sizeof(u_int32_t));
332 } else
333 DPRINTF("tcp_server()", "Out of memory");
334 } else
335 reason = MMS_ADDRESSES_EXCEEDED;
336 }
337 if (clnode != NULL) {
338 /* Either we found an existing node or we created
339 * it successfully
340 */
341 if (clnode->connections < maxperip) {
342 if (ratemax != 0) {
343 if (clnode->rate < ratemax) {
344 clnode->rate++;
345 ok = TRUE;
346 } else
347 reason = MMS_RATE_EXCEEDED;
348 } else
349 ok = TRUE;
350 } else
351 reason = MMS_CONPERADDRESS_EXCEEDED;
352 }
353 pth_mutex_release(&ctable_lock);
354
355 if (ok) {
356 phandleinfo *phi;
357
358 /* Start thread to serve the client */
359 pth_mutex_acquire(&ppool_lock, FALSE, NULL);
360 phi = (phandleinfo *)pool_alloc(&ppool, FALSE);
361 pth_mutex_release(&ppool_lock);
362 if (phi != NULL) {
363 id++;
364 phi->id = id;
365 phi->socket = msgsock;
366 phi->clnode = clnode;
367 /* Indexed reference */
368 phi->iface = fdsi[i].iface;
369 if ((thread = pth_spawn(threadattr,
370 phandleclient,
371 phi)) != NULL)
372 clnode->connections++;
373 else {
374 if (phi != NULL) {
375 pth_mutex_acquire(&ppool_lock, FALSE,
376 NULL);
377 pool_free((pnode_t *)phi);
378 pth_mutex_release(&ppool_lock);
379 }
380 DPRINTF("tcp_server()",
381 "Error launching thread");
382 }
383 } else
384 DPRINTF("tcp_server()", "Out of memory");
385 } else {
386 char ipaddr[20];
387
388 /* Close down connection with client
389 * immediately, sending message.
390 */
391 write(msgsock, message, msglen);
392 shutdown(msgsock, 2);
393 close(msgsock);
394 mm_strcpy(ipaddr, "0.0.0.0");
395 inet_ntop(AF_INET, &(sinaddr->sin_addr), ipaddr,
396 19);
397 if (reason != MMS_NORMAL)
398 syslog(LOG_NOTICE, "tcp_server() - [%s] - %s",
399 ipaddr, MMS_RSTRING(reason));
400 }
401
402 pth_mutex_release(&ctable_lock);
403
404 } else
405 DPRINTF("tcp_server()", "pth_accept()");
406 }
407 }
408 }
409 }
410
411 free(ifaces);
412 mmstrexit();
413 pth_abort(clnode_thread);
414 pth_join(clnode_thread, NULL);
415 hashtable_destroy(&ctable, FALSE);
416 pool_destroy(&ppool);
417 pool_destroy(&cpool);
418 /* Unrequired for Pth (non-existing even)
419 * pth_mutex_destroy(&apool_lock);
420 * pth_mutex_destroy(&ctable_lock);
421 * pth_mutex_destroy(&ppool_lock);
422 */
423
424 exit(ret);
425 }
426
427
/* Writes our process ID number to specified file. To be called before
 * chroot(2) or dropping privileges (the pid file lives outside the jail
 * and is created with mode 0600).
 */
static void
writepidfile(const char *file)
{
    int fd;
    char str[16];

    fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
    if (fd == -1) {
        DPRINTF("writepidfile()", "open(%s)", file);
        return;
    }

    snprintf(str, 15, "%d\n", getpid());
    write(fd, str, mm_strlen(str));
    close(fd);
}
444
445
/* Uses fork() to become a daemon, and detaches from the current tty.
 * Returns either successful or not.
 *
 * In the child: creates a new session, redirects fds 0-2 to /dev/null,
 * writes the pid file, optionally chroot(2)s into <jail>, installs the
 * signal handlers and clears the umask. Returns TRUE only in the child;
 * the parent exits, and FALSE is returned if fork() failed.
 * With -DNODETACH the fork/detach step is skipped (debugging aid).
 */
bool
make_daemon(const char *pidfile, const char *jail)
{
    int pid;

#ifdef NODETACH
    pid = getpid();
#else
    if ((pid = fork()) != -1)
#endif
    {

#ifndef NODETACH
        if (pid > 0) {
            /* Parent */
            exit(0);
        } else
#endif
        {
            int fd;
            struct sigaction act;

            /* Child */
            setsid();
            chdir("/");
            /* Detach stdio from the tty; keep fd if it landed on 0-2 */
            if ((fd = open("/dev/null", O_RDWR)) != -1) {
                dup2(fd, 0);
                dup2(fd, 1);
                dup2(fd, 2);
                if (fd > 2)
                    close(fd);
            }
            /* Must happen before chroot(2): pid file path is outside jail */
            writepidfile(pidfile);

            if (jail && *jail) {
                if ((chroot(jail)) == -1) {
                    syslog(LOG_NOTICE,
                        "make_daemon() - Could not chroot(2) to '%s'",
                        jail);
                    exit(-1);
                }
                chdir("/");
            }

            /* Setup our break handler and ignore wanted signals */
            act.sa_handler = sighandler;
            /* SA_NOCLDWAIT: do not keep zombies around for SIGCHLD */
            act.sa_flags = SA_NOCLDWAIT;
            sigemptyset(&act.sa_mask);
            sigaction(SIGTERM, &act, NULL);
            sigaction(SIGCHLD, &act, NULL);
            sigaction(SIGSEGV, &act, NULL);
            signal(SIGTTOU, SIG_IGN);
            signal(SIGTTIN, SIG_IGN);
            signal(SIGTSTP, SIG_IGN);
            signal(SIGPIPE, SIG_IGN);

            umask(0);

#ifndef __GLIBC__
            setproctitle("Main threaded process");
#endif

            return (TRUE);
        }
    }

    /* fork() failed */
    return (FALSE);
}
517
518
/* Process-wide signal handler installed by make_daemon(). SIGTERM and
 * SIGSEGV log, propagate SIGTERM to the process group and exit; SIGCHLD
 * reaps any terminated children without blocking.
 */
static void
sighandler(int sig)
{
    if (sig == SIGSEGV) {
        syslog(LOG_NOTICE, "Exiting (SIGSEGV)");
        kill(0, SIGTERM);
        exit(0);
    } else if (sig == SIGTERM) {
        syslog(LOG_NOTICE, "Exiting (SIGTERM)");
        /* Avoid re-entering ourselves when the group-wide TERM arrives */
        signal(SIGTERM, SIG_IGN);
        kill(0, SIGTERM);
        exit(0);
    } else if (sig == SIGCHLD) {
        int status = 0;

        /* Reap every exited child without blocking */
        while (wait3(&status, WNOHANG, NULL) > 0)
            ;
    }
}
542
543
544 static struct iface *
545 parse_ifaces(char *hostnames, char *addresses)
546 {
547 struct iface *ifaces;
548 char *hosts[65], *ips[65];
549 struct sockaddr_in saddr;
550 int n, i;
551
552 ifaces = NULL;
553
554 if (hostnames != NULL && addresses != NULL && *hostnames != '\0' &&
555 *addresses != '\0') {
556 if ((n = mm_straspl(hosts, hostnames, 64)) ==
557 mm_straspl(ips, addresses, 64)) {
558 if ((ifaces = malloc(sizeof(struct iface) * (n + 1))) != NULL) {
559 /* Setup our array */
560 for (i = 0; i < n; i++) {
561 mm_memclr(&saddr, sizeof(struct sockaddr_in));
562 if ((inet_aton(ips[i], &(saddr.sin_addr))) == 1) {
563 mm_strncpy(ifaces[i].hostname, hosts[i], 64);
564 mm_strncpy(ifaces[i].address_str, ips[i], 15);
565 ifaces[i].sock = -1;
566 ifaces[i].address = saddr;
567 *(ifaces[i + 1].hostname) = '\0';
568 } else {
569 syslog(LOG_NOTICE,
570 "parse_ifaces() - Invalid address [%s]",
571 ips[i]);
572 free(ifaces);
573 ifaces = NULL;
574 break;
575 }
576 }
577 }
578 } else
579 syslog(LOG_NOTICE,
580 "parse_ifaces() - Number of hostnames and addresses mismatch");
581 }
582
583 return (ifaces);
584 }
585
586
587 /* SERVER (THREAD) */
588
/* This is started by pth_spawn(), it calls the user handler function, which
 * only has to care about serving the client.
 * args is a phandleinfo node allocated from ppool by tcp_server(); this
 * thread takes ownership of it and of the client socket.
 */
static void *
phandleclient(void *args)
{
    phandleinfo *phi;
    int socket, ret;
    clientlistnode *clnode;
    char *tmp;
    struct iface *iface;
    struct async_clenv *aclenv;
    unsigned long id;

    /* Get and discard our passed parameters; phi must not be used after
     * being returned to the pool below.
     */
    phi = (phandleinfo *)args;
    id = phi->id;
    socket = phi->socket;
    clnode = phi->clnode;
    iface = phi->iface;
    pth_mutex_acquire(&ppool_lock, FALSE, NULL);
    pool_free((pnode_t *)phi);
    pth_mutex_release(&ppool_lock);

    ret = 0;

    if ((aclenv = async_open_clenv())) {
        if (clnode->resolve) {
            /* We want to resolve the client's IP address' hostname, but not
             * if it already was done for another client currently logged on
             * from that same address.
             */
            tmp = NULL;
            /* We need to determine if hostname should be resolved. */
            if (clnode->hostname == NULL) {
                /* It seems so, so let's do it without clobbering the mutex */
                tmp = resolve_hostname(aclenv, &(clnode->client));
            }
            if (tmp) {
                /* Lock the mutex for a very short moment */
                pth_mutex_acquire(&ctable_lock, FALSE, NULL);
                /* Verify again for NULL to avoid a potential memory leak:
                 * another thread may have resolved and published the
                 * hostname while we were blocked in resolve_hostname().
                 */
                if(clnode->hostname == NULL) clnode->hostname = tmp;
                else tmp = mmstrfree(tmp);
                pth_mutex_release(&ctable_lock);
            }
        }

        /* Handle the client */
        ret = handleclientfunc(id, socket, clnode, iface, aclenv);

        async_close_clenv(aclenv);
    } else
        DPRINTF("phandleclient()", "async_open_clenv()");

    /* Cleanup */

    /* Close connection with client */
    if (socket != -1) {
        shutdown(socket, 2);
        close(socket);
    }

    /* Decrease our connection/refcount counter, and let our ctable thread
     * decide if/when to uncache our node.
     */
    pth_mutex_acquire(&ctable_lock, FALSE, NULL);
    if (clnode->connections > 0) clnode->connections--;
    pth_mutex_release(&ctable_lock);

    /* kthxbye :)
     * NOTE(review): &ret points into this (about to die) thread's stack;
     * this looks harmless because these threads are spawned non-joinable
     * so the exit value is never read — confirm.
     */
    pth_exit(&ret);

    /* NOTREACHED */
    return (NULL);
}
665
666
667
668 /* This consists of a general purpose thread which can serve real
669 * asynchroneous functions via another thread, suitable to use for functions
670 * which can block the whole process when using non-preemptive threads like
671 * the Pth library provides. Pth message ports are used to communicate with
672 * this device in a way that processes waiting for results only block
673 * the requesting thread. The function internally uses unix datagrams to
674 * similarly communicate arguments and obtain back results from a free process
675 * in the asynchroneous processes pool. Another advantage of this technique is
676 * that on SMP systems the various processors can now be taken advantage of.
677 * The caller should of course expect data rather than pointers to be used for
678 * both arguments and return values since pointers are only valid for the
679 * current process.
680 *
681 * Drawbacks exist though; socket send() and recv() calls are internally
682 * required, as well as memory copy operations. Moreover, two new
683 * filedescriptor are required in the main process for each asynchroneous
684 * process in our pool.
685 * It should be used where necessary, like calculating MD5 hashes, resolving
686 * hostnames and evaluating directory tree sizes recursively, etc.
687 *
688 * It would have been possible to use different datagram sizes to transfer
689 * arguments and results to/from the other processes, but because of the way
690 * the Pth AmigaOS-style messages work, a decision was made so that unions are
691 * used by async functions and the whole structure is transfered back and
692 * forth.
693 */
694
/* ARGSUSED */
/* Dispatcher thread: receives requests on async->port, forwards them over
 * unix datagram sockets to the pool of slave processes, and replies results
 * back to the requesting thread's message port.
 */
static void *
async_thread(void *args)
{
    list_t queue, *freeprocs = &async->freeprocs;
    struct async_idx *idx = NULL;
    struct async_proc *proc;
    struct pollfd *pfds = NULL;
    char *ptr;
    int i, nfds;
    size_t idx_size, pfd_size;

    /* freeprocs is our pool of available async processes, queue is our
     * queue of requests waiting to be processed and replied to.
     * We use the idx array to fast-index process node via an fd which
     * status changed during poll().
     */

    /* Initialize our queue, fd->process index and poll array */
    i = 0;
    LIST_INIT(&queue);
    /* Find the highest slave socket fd so idx[] can be indexed by fd */
    LIST_FOREACH(freeprocs, proc)
        if (i < proc->sock)
            i = proc->sock;
    nfds = freeprocs->nodes;
    idx_size = (size_t)OALIGN_CEIL(sizeof(struct async_idx) * (i + 1), long);
    pfd_size = (size_t)OALIGN_CEIL(sizeof(struct pollfd) * (nfds + 1), long);
    /* Single allocation carved into the fd index then the pollfd array */
    if ((ptr = calloc(idx_size + pfd_size, 1)) != NULL) {
        idx = (struct async_idx *)ptr;
        ptr += idx_size;
        pfds = (struct pollfd *)ptr;
    } else {
        /* Fatal error */
        DPRINTF("async_thread()", "calloc(%d)", idx_size + pfd_size);
        pth_exit(NULL);
    }
    i = 0;
    LIST_FOREACH(freeprocs, proc) {
        idx[proc->sock].aproc = proc;
        pfds[i].fd = proc->sock;
        pfds[i].events = POLLIN | POLLERR;
        i++;
    }

    for (;;) {
        char *omsg;

        /* First check for new messages and queue them in our queue list if
         * any, but only wait for them if all processes are free (we don't
         * expect any results). Special note: Pth will only notify event for
         * a port if it received a message while it was empty.
         * The pth message apparently sits sizeof(pnode_t) bytes inside the
         * pool node, hence the back-step to recover the list node — TODO
         * confirm against the pool/message layout.
         */
        if ((pth_msgport_pending(async->port))) {
            while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
                LIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
        } else if (freeprocs->nodes == async->nprocs) {
            pth_wait(async->ring);
            while ((omsg = (char *)pth_msgport_get(async->port)) != NULL)
                LIST_APPEND(&queue, (node_t *)(omsg - sizeof(pnode_t)));
        }

        /* Verify for any available processes to dispatch requests to, and
         * if so, dispatch requests from the queue until no more processes
         * are available or the queue is empty.
         */
        if (LIST_NODES(&queue) > 0 && LIST_NODES(freeprocs) > 0) {
            struct async_proc *p;
            struct async_msg *m;
            size_t len;

            while ((p = LIST_TOP(freeprocs)) != NULL &&
                (m = LIST_TOP(&queue)) != NULL) {
                if (m->func_id < async->nfuncs) {
                    len = async->funcs[m->func_id].msg_len;
                    /* Remember which client env the reply belongs to */
                    idx[p->sock].aclenv = m->aclenv;
                    if ((pth_send(p->sock, m, len, 0)) == len)
                        LIST_UNLINK(freeprocs, (node_t *)p);
                    else
                        DPRINTF("async_thread()", "send(%d:%d)",
                            p->sock, (int)len);
                } else
                    DPRINTF("async_thread()", "Unknown function %u",
                        m->func_id);
                /* Request is dequeued whether dispatched or not */
                LIST_UNLINK(&queue, (node_t *)m);
            }
        }

        /* Wait for results from our async processes via their socket,
         * but be sure to break when new Pth messages are available.
         * If new results are obtained, reply back to the caller thread.
         */
        for (;;) {
            int selr, i;

            if ((selr = pth_poll_ev(pfds, nfds, -1, async->ring)) > 0) {
                for (i = 0; selr > 0 && i < nfds; i++) {
                    if (pfds[i].revents & POLLERR) {
                        /* If this happens something is really wrong, exit */
                        syslog(LOG_NOTICE, "async_thread() - POLLERR(%d)",
                            pfds[i].fd);
                        kill(0, SIGTERM);
                        for (;;)
                            sleep(30);
                    }
                    if (pfds[i].revents & POLLIN) {
                        int fd = pfds[i].fd;
                        size_t len;
                        struct async_proc *p = idx[fd].aproc;
                        struct async_clenv *e = idx[fd].aclenv;

                        /* Results, reply message back to client thread and
                         * return this process node in the free pool.
                         * NOTE(review): len is size_t but pth_recv() returns
                         * ssize_t; a -1 error return converts to a huge
                         * unsigned value and would NOT trigger the short-read
                         * check below — confirm and consider ssize_t.
                         */
                        if ((len = pth_recv(fd, e->msg, async->msg_len,
                            MSG_WAITALL)) <
                            sizeof(struct async_msg))
                            DPRINTF("async_thread()", "recv(%d:%d)",
                                fd, (int)async->msg_len);
                        pth_msgport_reply(&(e->msg->msg));
                        LIST_APPEND(freeprocs, (node_t *)p);
                        selr--;
                    }
                }
            }
            if ((pth_event_occurred(async->ring)))
                /* Other requests we must queue immediately */
                break;
        }
    }

    /* NOTREACHED */
    pth_exit(NULL);
    return (NULL);
}
829
830
/* This function should be called at early application startup to setup
 * the asynchroneous pool of processes. async_init_pth() call may then
 * be called later on after pth_init() to initialize the client-side
 * asynchroneous context (required before using async_open_clenv()).
 * This function is assumed to be called once only, and it's effects to
 * only be discarded when the main process exits.
 *
 * funcs is a NULL-func-terminated array of user async functions; procs is
 * the number of slave processes to fork, which run as uid/gids.
 * Returns TRUE on success.
 */
bool
async_init(struct async_func *funcs, int procs, uid_t uid, gid_t *gids,
    int ngids)
{
    unsigned int i, nfuncs;
    size_t funcs_len, msg_len, env_len;
    bool res = FALSE;

    /* Evaluate required space to hold user functions plus our resolve one */
    for (i = 0; funcs[i].func; i++) ;
    nfuncs = i + 1;
    funcs_len = sizeof(struct async_func) * (nfuncs + 1);

    /* Evaluate maximum message size we should expect for arguments.
     * Note: the loop below also reads the user array's terminator entry
     * (i goes up to nfuncs - 1 = user count) — harmless if its msg_len
     * is zero, presumably intentional.
     */
    msg_len = sizeof(struct async_resolvehostname_msg);
    for (i = 0; i < nfuncs; i++) {
        if (msg_len < funcs[i].msg_len)
            msg_len = funcs[i].msg_len;
    }

    /* Necessary allocations: one calloc carved into env, funcs table and
     * scratch message buffer, each long-aligned.
     */
    env_len = (size_t)OALIGN_CEIL(sizeof(struct async_env), long);
    funcs_len = (size_t)OALIGN_CEIL(funcs_len, long);
    msg_len = (size_t)OALIGN_CEIL(msg_len, long);
    if ((async = calloc(env_len + funcs_len + msg_len, 1)) != NULL) {
        if (pool_init(&async->freeprocs_pool, malloc, free,
            sizeof(struct async_proc), procs, 1, 1)) {
            register char *ptr = (char *)async;

            ptr += env_len;
            async->funcs = (struct async_func *)ptr;
            ptr += funcs_len;
            async->msg = (struct async_msg *)ptr;
            /* NOTE(review): nfuncs here is (user count + 1) for the resolve
             * slot, so nfuncs + 1 appears to also count the copied NULL
             * terminator entry as dispatchable; async_thread()'s func_id
             * bound check would accept it — confirm this is intended.
             */
            async->nfuncs = nfuncs + 1;
            async->nprocs = procs;
            async->msg_len = msg_len;
            /* Fill in functions: slot 0 is always the builtin resolver */
            async->funcs[0].func = async_resolvehostname;
            async->funcs[0].msg_len = sizeof(struct async_resolvehostname_msg);
            for (i = 0; i < nfuncs; i++) {
                async->funcs[i + 1].func = funcs[i].func;
                async->funcs[i + 1].msg_len = funcs[i].msg_len;
            }
            LIST_INIT(&async->freeprocs);
            res = TRUE;
        } else
            DPRINTF("async_init()", "pool_init(freeprocs_pool)");
    } else
        DPRINTF("async_init()", "calloc(%d)", env_len + funcs_len + msg_len);

    if (res) {
        /* Start async slave processes */
        if (!spawn_async_procs(uid, gids, ngids, msg_len)) {
            DPRINTF("async_init()", "spawn_async_procs()");
            res = FALSE;
        }
    }

    if (!res) {
        /* Failure: release everything so async stays NULL for callers */
        if (async) {
            pool_destroy(&async->freeprocs_pool);
            free(async);
            async = NULL;
        }
    }

    return (res);
}
906
907
908 /* This function is used by an application after pth_init() to initialize
909 * the asynchroneous thread device/server, which will dispatch requests to
910 * the asynchroneous pool of processes in a thread-safe manner (without
911 * blocking the whole process. async_init() should have been called first.
912 */
913 bool
914 async_init_pth(void)
915 {
916 size_t env_len;
917 bool res = FALSE;
918
919 if (async) {
920
921 res = TRUE;
922 pth_mutex_init(&async->apool_lock);
923 /* Setup our fast-allocation pool for client-side async environments */
924 env_len = (size_t)OALIGN_CEIL(sizeof(struct async_clenv), long);
925 if (!pool_init(&async->apool, malloc, free, env_len + async->msg_len,
926 16384 / (env_len + async->msg_len), 0, 0)) {
927 DPRINTF("async_init_pth()", "pool_init(apool)");
928 res = FALSE;
929 }
930
931 if (res) {
932 pth_attr_t attrs;
933
934 res = FALSE;
935 if ((attrs = pth_attr_new()) != NULL) {
936 /* Start async slave thread device */
937 if ((async->port = pth_msgport_create("mms_async_server"))) {
938 if ((async->ring = pth_event(PTH_EVENT_MSG,
939 async->port))) {
940 pth_attr_set(attrs, PTH_ATTR_JOINABLE, FALSE);
941 if ((pth_spawn(attrs, async_thread, NULL)) != NULL)
942 res = TRUE;
943 pth_attr_destroy(attrs);
944 } else
945 DPRINTF("async_init_pth()", "pth_event()");
946 } else
947 DPRINTF("async_init_pth()", "pth_msgport_create()");
948 pth_attr_destroy(attrs);
949 }
950 }
951
952 if (!res) {
953 if (async->ring) {
954 pth_event_free(async->ring, PTH_FREE_ALL);
955 async->ring = NULL;
956 }
957 if (async->port) {
958 pth_msgport_destroy(async->port);
959 async->port = NULL;
960 }
961 pool_destroy(&async->apool);
962 }
963
964 } else
965 DPRINTF("async_init_pth()", "Call async_init() first");
966
967 return (res);
968 }
969
970
971 /* Used by a client thread to initialize a context suitable to call the
972 * async_call() function with. Each thread needs one to use the device.
973 * async_init() and async_init_pth() should have been called first.
974 */
975 struct async_clenv *
976 async_open_clenv(void)
977 {
978 struct async_clenv *aclenv;
979
980 if (async != NULL && POOL_VALID(&async->apool)) {
981 /* Optimized for speed using a pool of pre-allocated nodes */
982 pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
983 aclenv = (struct async_clenv *)pool_alloc(&async->apool, FALSE);
984 pth_mutex_release(&async->apool_lock);
985 if (aclenv) {
986 if ((aclenv->port = pth_msgport_create("XXX(NULL)"))) {
987 if ((aclenv->ring = pth_event(PTH_EVENT_MSG, aclenv->port))) {
988 register char *ptr = (char *)aclenv;
989
990 ptr += (size_t)OALIGN_CEIL(sizeof(struct async_clenv),
991 long);
992 aclenv->msg = (struct async_msg *)ptr;
993 /* Pth has no cleaner API for this at current time */
994 aclenv->msg->msg.m_replyport = aclenv->port;
995 aclenv->msg->msg.m_size = async->msg_len;
996
997 return (aclenv);
998 }
999 pth_msgport_destroy(aclenv->port);
1000 }
1001 pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
1002 pool_free((pnode_t *)aclenv);
1003 pth_mutex_release(&async->apool_lock);
1004 }
1005 } else
1006 DPRINTF("async_open_clenv()",
1007 "Call async_init() and async_init_pth() first");
1008
1009 return (NULL);
1010 }
1011
1012
1013 /* Destroys an async_clenv which was previously initialized using the
1014 * async_open_clenv() function.
1015 */
1016 struct async_clenv *
1017 async_close_clenv(struct async_clenv *aclenv)
1018 {
1019 pth_event_free(aclenv->ring, PTH_FREE_ALL);
1020 pth_msgport_destroy(aclenv->port);
1021
1022 pth_mutex_acquire(&async->apool_lock, FALSE, NULL);
1023 pool_free((pnode_t *)aclenv);
1024 pth_mutex_release(&async->apool_lock);
1025
1026 return (NULL);
1027 }
1028
1029
1030 /* This is the function which is used to call async functions handled by
1031 * async_thread() and our pool of slave processes. It does not check for
1032 * timeout, the async functions set should be sure to always return within a
1033 * reasonable delay if they could block indefinitely.
1034 */
1035 void
1036 async_call(struct async_clenv *aclenv, unsigned int function)
1037 {
1038 /* Send request */
1039 aclenv->msg->func_id = function;
1040 aclenv->msg->aclenv = aclenv;
1041 pth_msgport_put(async->port, &(aclenv->msg->msg));
1042
1043 /* Sleep until we get reply back, in a thread-safe manner.
1044 * Note: This will permanently block the current thread if the
1045 * task does not complete. It would be possible to respect a timeout
1046 * here, however this could cause problems. For instance, we may have
1047 * sent a resolve request which takes some time because of a network
1048 * problem, and the client could disconnect (and this thread end).
1049 * The async function reply would then attempt to use a no longer
1050 * existing port. (It could look for the port name in the list, but
1051 * this would have considerable performance drawbacks. Ideally,
1052 * any async operation would be abortable by the thread that initiated
1053 * it. This could be tricky to implement in order to work for everything
1054 * properly, avoiding resource leaks). So blocking the thread until
1055 * we get a reply back seems a good choice here; The async functions
1056 * should ensure to eventually return.
1057 */
1058 for (;;) {
1059 if ((pth_wait(aclenv->ring))) {
1060 if ((pth_event_occurred(aclenv->ring))) {
1061 /* We know that message pointer will be the same */
1062 if ((pth_msgport_get(aclenv->port)))
1063 break;
1064 }
1065 }
1066 }
1067 }
1068
1069
1070 /* This function is called by async_init() and starts up all slave
1071 * asynchroneous processes. It knows when to stop when the blocklist has no
1072 * more entries. It is expected that make_daemon() was used first, so that
1073 * filedescriptors 0-2 be already redirected to /dev/null.
1074 */
1075 static bool
1076 spawn_async_procs(uid_t uid, gid_t *gids, int ngids, size_t msg_len)
1077 {
1078 struct async_proc *aproc;
1079 int s[2], opt;
1080 bool ret = FALSE;
1081
1082 if (async == NULL)
1083 return ret;
1084
1085 do {
1086 if ((aproc = (struct async_proc *)pool_alloc(&async->freeprocs_pool,
1087 TRUE))) {
1088 if ((socketpair(PF_UNIX, SOCK_DGRAM, 0, s)) == 0) {
1089
1090 /* Set buffer sizes to make sure that atomic datagram
1091 * operations work as expected for the maximum packet size
1092 */
1093 opt = (int)BALIGN_CEIL(msg_len, 1024);
1094 if (setsockopt(s[0], SOL_SOCKET, SO_SNDBUF, &opt,
1095 sizeof(int)) == -1 ||
1096 setsockopt(s[0], SOL_SOCKET, SO_RCVBUF, &opt,
1097 sizeof(int)) == -1 ||
1098 setsockopt(s[1], SOL_SOCKET, SO_SNDBUF, &opt,
1099 sizeof(int)) == -1 ||
1100 setsockopt(s[1], SOL_SOCKET, SO_RCVBUF, &opt,
1101 sizeof(int)) == -1)
1102 DPRINTF("spawn_async_proc()", "setsockopt(SO_*BUF)");
1103
1104 aproc->sock = s[0];
1105 /* Create new process */
1106 if (!(aproc->pid = fork())) {
1107 close(s[0]);
1108
1109 /* Child */
1110 chdir("/");
1111 umask(0);
1112
1113 /* Finally drop privileges and perform our tasks */
1114 if (mmdropprivs(uid, gids, ngids)) {
1115 close(s[0]);
1116 while ((aproc = LIST_TOP(&async->freeprocs)) != NULL) {
1117 close(aproc->sock);
1118 LIST_UNLINK(&async->freeprocs, (node_t *)aproc);
1119 }
1120 async_proc_main(s[1]);
1121 } else
1122 DPRINTF("spawn_async_procs()", "mmdropprivs()");
1123
1124 } else if (aproc->pid == -1) {
1125 pool_free((pnode_t *)aproc);
1126 aproc = NULL;
1127 DPRINTF("spawn_async_procs()", "fork()");
1128 close(s[0]);
1129 close(s[1]);
1130 }
1131 } else
1132 DPRINTF("spawn_async_procs()", "socketpair()");
1133 }
1134 if (aproc) {
1135 close(s[1]);
1136 LIST_APPEND(&async->freeprocs, (node_t *)aproc);
1137 }
1138 else ret = TRUE;
1139 } while (aproc);
1140
1141 return (ret);
1142 }
1143
1144
1145 /* The main loop of each asynchroneous process in the pool */
1146 static void
1147 async_proc_main(int sock)
1148 {
1149 struct pollfd pfd[] = {
1150 {-1, POLLIN | POLLERR, 0}
1151 };
1152
1153 #ifndef __GLIBC__
1154 setproctitle("Asynchroneous server process");
1155 #endif
1156
1157 pfd[0].fd = sock;
1158
1159 for (;;) {
1160 int selr;
1161 unsigned int func_id;
1162 size_t len;
1163
1164 /* Wait for packets, process them and send result packets */
1165 if ((selr = poll(pfd, 1, -1))) {
1166 if (selr == 1) {
1167 if (pfd[0].revents & POLLERR) {
1168 /* This should not happen. If it does something really
1169 * went wrong; Exit.
1170 */
1171 DPRINTF("async_proc_main()", "Socket error (%d)", sock);
1172 kill(0, SIGTERM);
1173 for (;;)
1174 sleep(30);
1175 }
1176 if (pfd[0].revents & POLLIN) {
1177 if ((len = recv(sock, async->msg, async->msg_len,
1178 MSG_WAITALL))
1179 >= sizeof(struct async_msg)) {
1180 func_id = async->msg->func_id;
1181 if (func_id < async->nfuncs &&
1182 len == async->funcs[func_id].msg_len) {
1183 async->funcs[func_id].func(async->msg);
1184 if ((send(sock, async->msg, len, 0)) != len)
1185 DPRINTF("async_proc_main()", "send(%d:%d)",
1186 sock, (int)len);
1187 } else
1188 DPRINTF("async_proc_main()",
1189 "Invalid function %u on socket %d",
1190 func_id, sock);
1191 } else
1192 DPRINTF("async_proc_main()", "recv(%d:%d)",
1193 sock, (int)async->msg_len);
1194 }
1195 }
1196 }
1197 }
1198 }
1199
1200
1201 /* This function will always be available in the asynchoneous functions set,
1202 * referenced as ASYNC_RESOLVEHOSTNAME when using async_call().
1203 */
1204 static void
1205 async_resolvehostname(struct async_msg *msg)
1206 {
1207 struct async_resolvehostname_msg *amsg = (void *)msg;
1208 struct sockaddr *saddr = &(amsg->un.args.saddr);
1209
1210 if ((getnameinfo(saddr, sizeof(struct sockaddr_in), amsg->un.res.hostname,
1211 255, NULL, 0, 0)) != 0) {
1212 DPRINTF("async_resolvehostname()", "getnameinfo()");
1213 mm_strcpy(amsg->un.res.hostname, "unknown");
1214 }
1215 }
1216
1217
1218 /* Easy function wrapper for async_resolvehostname(). mmstrfree() should
1219 * be called to release the supplied result string when no longer needed.
1220 * This function is thread-safe, as well as asynchroneous, the current
1221 * process (all threads) will remain free to execute other tasks when
1222 * the current thread waits for the hostname resolution to complete, as the
1223 * request is dispatched to one of the asynchroneous processes in our pool.
1224 */
1225 char *
1226 resolve_hostname(struct async_clenv *aclenv, struct sockaddr *saddr)
1227 {
1228 char *tmp;
1229 struct async_resolvehostname_msg *amsg = (void *)aclenv->msg;
1230
1231 if ((tmp = mmstralloc(255)) != NULL) {
1232 mm_memcpy(&(amsg->un.args.saddr), saddr, sizeof(struct sockaddr));
1233 async_call(aclenv, ASYNC_RESOLVEHOSTNAME);
1234 mm_strncpy(tmp, amsg->un.res.hostname, 255);
1235 } else
1236 DPRINTF("resolve_hostname()", "mmstralloc()");
1237
1238 return (tmp);
1239 }
1240
1241
1242 /* These are related to the clnode ipaddress hash table. */
1243
1244
/* A quick hashtable_hash() replacement; the keys are already unique
 * 32-bit values, so the key itself serves directly as the hash.
 */
/* ARGSUSED */
static u_int32_t
clnode_keyhash(const void *data, size_t len)
{
	const u_int32_t *key = data;

	return (*key);
}
1254
1255
/* A quick memcmp() replacement which only needs to compare two 32-bit
 * keys; like memcmp() it returns <0, 0 or >0.  An explicit three-way
 * comparison is used instead of returning a - b, because the unsigned
 * difference converted to int yields the wrong sign whenever it exceeds
 * INT_MAX (e.g. 0x80000001 vs 0 would compare as negative).
 */
/* ARGSUSED */
static int
clnode_keycmp(const void *src, const void *dst, size_t len)
{
	u_int32_t a = *(const u_int32_t *)src;
	u_int32_t b = *(const u_int32_t *)dst;

	if (a < b)
		return (-1);
	if (a > b)
		return (1);
	return (0);
}
1264
1265
1266 /* This is the actual ctable hash control and expiration thread */
1267 /* ARGSUSED */
1268 static void *
1269 clnode_expire_thread(void *args)
1270 {
1271 long period = *((long *)args);
1272 struct clnode_expire_thread_iterator_udata data;
1273
1274 /* Set initial timeout to maximum allowed */
1275 data.soonest = time(NULL) + period;
1276 data.cnt = 0;
1277 data.period = period;
1278 for (;;) {
1279 /* Sleep until it is known that at least one node expired */
1280 pth_sleep(data.soonest - time(NULL));
1281 /* Tell our iterator function the current time and the maximum
1282 * allowed time to wait to
1283 */
1284 data.current = time(NULL);
1285 data.soonest = data.current + period;
1286 /* Lock ctable, reset expired nodes, garbage collect, and set
1287 * data.soonest to the time of the soonest next expireing node.
1288 */
1289 pth_mutex_acquire(&ctable_lock, FALSE, NULL);
1290 if (HASHTABLE_NODES(&ctable) > 0)
1291 hashtable_iterate(&ctable, clnode_expire_thread_iterator, &data);
1292 pth_mutex_release(&ctable_lock);
1293 }
1294
1295 /* NOTREACHED */
1296 pth_exit(NULL);
1297 return NULL;
1298 }
1299
1300
1301 static bool
1302 clnode_expire_thread_iterator(hashnode_t *hnod, void *udata)
1303 {
1304 clientlistnode *clnode = (clientlistnode *)hnod;
1305 struct clnode_expire_thread_iterator_udata *data = udata;
1306
1307 if (clnode->period <= data->current) {
1308 /* This entry expired, reset it */
1309 clnode->period = data->current + data->period;
1310 clnode->rate = 0;
1311 } else {
1312 /* We must find and record the soonest expireing node */
1313 if (data->soonest > clnode->period)
1314 data->soonest = clnode->period;
1315 }
1316 if (clnode->connections == 0 && clnode->rate == 0) {
1317 /* Safe to expunge this node from the cache */
1318 hashtable_unlink(&ctable, (hashnode_t *)clnode);
1319 pool_free((pnode_t *)clnode);
1320 }
1321
1322 /* If the cache is big, prevent from interfering with other threads */
1323 if ((data->cnt++) == 64) {
1324 data->cnt = 0;
1325 pth_yield(NULL);
1326 }
1327
1328 return TRUE;
1329 }