9a6e869696ef3da1135def68677b2897bbbb7550
[mmondor.git] / mmsoftware / cl / server / ecl-mp-server.lisp
1 ;;; $Id: ecl-mp-server.lisp,v 1.18 2011/09/01 03:30:18 mmondor Exp $
2
3 #|
4
5 Copyright (c) 2011, Matthew Mondor
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions
10 are met:
11 1. Redistributions of source code must retain the above copyright
12 notice, this list of conditions and the following disclaimer.
13 2. Redistributions in binary form must reproduce the above copyright
14 notice, this list of conditions and the following disclaimer in the
15 documentation and/or other materials provided with the distribution.
16
17 THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
18 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
22 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28 |#
29
30 ;;; ecl-mp-server.lisp - Simple library for TCP server applications
31
32 ;;; XXX TODO XXX
33 ;;; - Rework logging format, make connection/disconnection logging optional
34 ;;; and provide a connection-unique identifier like mmserver(3) does.
35 ;;; Perhaps locally bind a *log-session* or such variable with it...
36 ;;; - Perhaps provide an object to the serving functions rather than
37 ;;; a number of arguments.
38 ;;; - File/syslog logging (currently only supports in-memory logging)
39 ;;; - Maybe use reuseaddr/reuseport
40
41
42 (declaim (optimize (speed 3) (safety 1) (debug 3)))
43
44 (eval-when (:compile-toplevel :load-toplevel)
45 (require :sb-bsd-sockets)
46 (load "dlist"))
47
48
49 (defpackage :server
50 (:use :cl :sb-bsd-sockets :dlist)
51 (:export #:server-config
52 #:make-server-config
53 #:log-clear
54 #:log-tail
55 #:log-line
56 #:stacktrace
57 #:server-init
58 #:server-cleanup
59 #:server-stat
60 #:line-read
61 #:stream-fd
62 #:address-string
63 #:address-fixnum
64 #:server-time
65 #:server-time-posix
66 #:server-time-unix
67 #:server-time-rfc
68 #:server-time-rfc-parse
69 #:string-split
70 #:connection
71 #:connection-fd
72 #:connection-socket
73 #:connection-stream
74 #:connection-address-vector
75 #:connection-address-string
76 #:connection-port
77 #:connection-session
78 #:*connection*
79 #:*buffer*))
80
81 (in-package :server)
82
83 (defparameter *rcsid*
84 "$Id: ecl-mp-server.lisp,v 1.18 2011/09/01 03:30:18 mmondor Exp $")
85
86
;;; Default value for the SERVE-FUNCTION and OVERFLOW-FUNCTION slots of
;;; SERVER-CONFIG.
(defun noop (&rest args)
  "Accepts and discards any number of ARGS; always returns NIL."
  (declare (ignore args))
  (values nil))
90
91
;;; Server configuration object, supplied to SERVER-INIT.  Accessors are
;;; prefixed with CONFIG- rather than SERVER-CONFIG-.
(defstruct (server-config (:conc-name config-))
  ;; When true, WITH-SOCKET logs a Connect/Closed line per connection.
  (log-connections t :type boolean)
  ;; When true, LOG-ERROR appends the result of STACKTRACE to the entry.
  (stacktrace t :type boolean)
  ;; External format passed to SOCKET-MAKE-STREAM for client streams.
  (external-format :utf-8 :type symbol)
  ;; Address and port that BIND-SOCKET binds the listening socket to.
  (listen-address "127.0.0.1" :type string)
  (listen-port 7777 :type fixnum)
  ;; Capacity (in entries) of the in-memory log FIFO.
  (log-lines 1000 :type fixnum)
  ;; Value used for the receive timeout of accepted sockets.
  (input-timeout 60 :type fixnum)
  ;; Worker thread pool sizing, used by SERVER-INIT and CHILDREN-MANAGER.
  (children-initial 16 :type fixnum)
  (children-minspare 16 :type fixnum)
  (children-maxspare 16 :type fixnum)
  (children-maximum 64 :type fixnum)
  ;; Number of one-second samples over which ready threads are averaged.
  (children-avg-seconds 60 :type fixnum)
  ;; Maximum simultaneous connections per address (CLIMIT MAX-ADDRESS).
  (conn-per-addr 8 :type fixnum)
  ;; Size in octets of the per-thread *BUFFER*.
  (buffer 0 :type fixnum)
  ;; Symbols naming the functions called to serve a connection and to
  ;; handle a connection refused by the connection limits.
  (serve-function 'server::noop :type symbol)
  (overflow-function 'server::noop :type symbol))

;;; Current configuration; set by SERVER-INIT.
(defvar *config* nil)
111
112
113 ;;; Various utility functions
114
(defun string-split (string &key
                     (separators '(#\Space))
                     (trim-parts nil)
                     (start 0) (end nil) (max nil))
  "Returns an adjustable vector (with fill-pointer) containing the parts
of STRING, separated by one or more of the characters in SEPARATORS, and
optionally trimmed from the characters specified in TRIM-PARTS (defaults
to NIL for no trimming).
Scans STRING from position START (defaults to 0) and stops at END
(defaults to NIL for end of string); no part extends past END.
Returns a maximum of MAX elements (defaults to NIL for no limit)."
  (declare (type fixnum start)
           (type (or null fixnum) end max))
  (let ((end (or end (length string)))
        (parts (make-array (or max 16)
                           :fill-pointer 0
                           :adjustable t))
        (pos start))
    (declare (type fixnum end pos))
    (flet ((separator-p (c)
             (declare (type character c))
             (member c separators)))
      (loop
         ;; Stop when the region is exhausted or enough parts were found.
         (when (or (>= pos end)
                   (and max (>= (length parts) max)))
           (return))
         ;; Both searches are bounded by END so that a part can neither
         ;; start nor extend beyond the requested region.
         (let ((part-start (position-if-not #'separator-p string
                                            :start pos :end end)))
           (unless part-start
             (return))
           (let* ((part-end (or (position-if #'separator-p string
                                             :start part-start :end end)
                                end))
                  (part (subseq string part-start part-end)))
             (vector-push-extend (if trim-parts
                                     (string-trim trim-parts part)
                                     part)
                                 parts 16)
             (setf pos part-end)))))
    parts))
155
;;; Cached universal time and the lock protecting it.  The cache is
;;; refreshed by CHILDREN-MANAGER-THREAD.
(defvar *time-lock* (mp:make-lock :name 'time-lock))

(defvar *time* (mp:with-lock (*time-lock*)
                 (get-universal-time)))

(defun server-time ()
  "Returns the current universal time in seconds."
  ;; Read the cached value under the lock; MP:WITH-LOCK returns the
  ;; value of its body.
  (mp:with-lock (*time-lock*)
    *time*))
166
(defun server-time-posix (&optional (ut (server-time)))
  "Returns a UTC time stamp string of the form YYYYMMDDhhmmss-0000.
UT, a universal time, may optionally be supplied if the current time is
already known."
  ;; Decode relative to time zone 0 (UTC); only the first six values
  ;; (second through year) are needed.
  (destructuring-bind (second minute hour date month year &rest unused)
      (multiple-value-list (decode-universal-time ut 0))
    (declare (ignore unused))
    (format nil "~4,'0D~2,'0D~2,'0D~2,'0D~2,'0D~2,'0D-0000"
            year month date hour minute second)))
176
(defun server-time-unix (&optional (ut (server-time)))
  "Returns the Unix timestamp (seconds since the Epoch) matching UT.
UT, a universal time, may optionally be supplied if the current time is
already known."
  ;; 2208988800 is the offset subtracted to go from universal time to
  ;; Unix time; SERVER-TIME-RFC adds the same constant for the reverse.
  (let ((epoch-offset 2208988800))
    (- ut epoch-offset)))
181
(defun server-time-rfc (&optional (unix-time nil))
  "Returns an RFC UTC time stamp string such as
Sun, 06 Nov 1994 08:49:37 GMT.
UNIX-TIME may optionally be supplied if the current Unix timestamp is
already known; otherwise SERVER-TIME is used."
  (let ((day-names #("Mon" "Tue" "Wed" "Thu" "Fri" "Sat" "Sun"))
        (month-names #("Jan" "Feb" "Mar" "Apr" "May" "Jun"
                       "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))
        ;; Convert a supplied Unix timestamp back to universal time.
        (ut (if unix-time
                (+ 2208988800 unix-time)
                (server-time))))
    (declare (type (simple-array string (7)) day-names)
             (type (simple-array string (12)) month-names))
    (multiple-value-bind (second minute hour date month year day)
        (decode-universal-time ut 0)
      ;; DAY is 0-based starting at Monday, MONTH is 1-based.
      (let ((day-name (svref day-names day))
            (month-name (svref month-names (1- month))))
        (format nil "~A, ~2,'0D ~A ~4,'0D ~2,'0D:~2,'0D:~2,'0D GMT"
                day-name
                date month-name year
                hour minute second)))))
202
(defun server-time-rfc-parse (time)
  "Parses TIME string, which may be in one of the following formats:
Sun, 06 Nov 1994 08:49:37 GMT ; RFC 822, updated by RFC 1123
Sunday, 06-Nov-94 08:49:37 GMT ; RFC 850, obsoleted by RFC 1036
Sun Nov 6 08:49:37 1994 ; ANSI C's asctime() format
Allows numeric timezones and symbolic RFC 822 ones (corrected as per
RFC 1123).  Unrecognized symbolic zones are treated as UTC.
Returns a UTC-relative Common Lisp universal timestamp in seconds, or -1
if TIME cannot be parsed.
SERVER-TIME-UNIX may optionally be used to convert to a Unix timestamp."
  (let ((months #("Jan" "Feb" "Mar" "Apr" "May" "Jun"
                  "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))
        (parts (string-split time
                             :separators '(#\Space #\, #\: #\-)
                             :trim-parts '(#\Space #\, #\: #\-)
                             :max 8)))
    ;; PARTS is an adjustable vector with a fill-pointer, so it must be
    ;; accessed with AREF; SVREF is only defined for simple vectors.
    (flet ((field (index)
             (aref parts index))
           (int-field (index)
             (parse-integer (aref parts index)))
           (month-number (name)
             ;; POSITION is 0-based while ENCODE-UNIVERSAL-TIME expects
             ;; 1-12.  An unknown name yields NIL, on which 1+ signals an
             ;; error that the HANDLER-CASE below turns into -1.
             (1+ (position name months :test #'string-equal)))
           (zone-offset (tz)
             ;; Returns the zone's offset from UTC in hours, positive
             ;; when east of Greenwich (like the sign of +hhmm).
             ;; Compensate for #\- stripping when splitting.
             (when (digit-char-p (char tz 0))
               (setf tz (concatenate 'string "-" tz)))
             (cond ((member tz '("GMT" "UTC" "UT" "Z" "J")
                            :test #'string=) 0)
                   ((string= "EDT" tz) -4)
                   ((member tz '("EST" "CDT") :test #'string=) -5)
                   ((member tz '("CST" "MDT") :test #'string=) -6)
                   ((member tz '("MST" "PDT") :test #'string=) -7)
                   ((string= "PST" tz) -8)
                   ((string= "A" tz) 1)
                   ((string= "M" tz) 12)
                   ((string= "N" tz) -1)
                   ((string= "Y" tz) -12)
                   ((and (= (length tz) 5)
                         (member (char tz 0) '(#\+ #\-)
                                 :test #'char=))
                    (let ((sign (if (char= #\+ (char tz 0)) 1 -1))
                          (h (parse-integer (subseq tz 1 3)))
                          (m (parse-integer (subseq tz 3 5))))
                      (* sign (+ h (/ m 60)))))
                   ;; Unknown zone: interpret the time as UTC rather than
                   ;; handing NIL to ENCODE-UNIVERSAL-TIME.
                   (t 0))))
      (handler-case
          (let (date month year hour minute second tz)
            (case (length parts)
              ;; RFC 822/1123, RFC 850/1036
              (8
               (setf date (int-field 1)
                     month (month-number (field 2))
                     year (int-field 3)
                     hour (int-field 4)
                     minute (int-field 5)
                     second (int-field 6)
                     tz (field 7))
               ;; Fix year from 2-digit to 4-digit if needed
               (when (= 2 (length (field 3)))
                 (incf year 1900)))
              ;; ANSI C asctime()
              (7
               (setf date (int-field 2)
                     month (month-number (field 1))
                     year (int-field 6)
                     hour (int-field 3)
                     minute (int-field 4)
                     second (int-field 5)
                     tz "GMT"))
              (t
               (return-from server-time-rfc-parse -1)))
            ;; Finally calculate the timestamp in seconds.  Common Lisp
            ;; time zones count hours WEST of Greenwich, which is the
            ;; opposite sign of the offset computed by ZONE-OFFSET.
            (encode-universal-time second minute hour date month year
                                   (- (zone-offset tz))))
        (t ()
          -1)))))
273
(defun stream-fd (stream)
  "Returns the file descriptor number associated to STREAM."
  ;; STREAM is treated as a two-way stream; the descriptor is taken from
  ;; its output side.
  (let ((output (two-way-stream-output-stream stream)))
    (ext:file-stream-fd output)))
277
(defun address-string (addr)
  "Converts supplied ADDR 4-part integer vector to a string suitable for
representation of an IPv4 address."
  (declare (type (simple-array t (4)) addr)
           (optimize (speed 3) (safety 0) (debug 0)))
  ;; Print the four elements in order, separated by dots.
  (format nil "~{~A~^.~}"
          (list (svref addr 0) (svref addr 1) (svref addr 2) (svref addr 3))))
285
286 ;;; Macro to convert a multiple arguments operation into combined pairs,
287 ;;; which are more easily inlined to C.
(defmacro with-fixnum-reduce ((op) &rest args)
  "Expands (OP a b c ...) into left-nested two-argument applications of
OP in which every operand and every intermediate result is wrapped in
\(THE FIXNUM ...)."
  (flet ((combine (left right)
           `(the fixnum (,op (the fixnum ,left) (the fixnum ,right)))))
    (reduce #'combine args)))
292
(defun address-fixnum (addr)
  "Converts supplied ADDR 4-part integer vector to a FIXNUM integer
suitable for hashing."
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (type (simple-array t (4)) addr))
  ;; Element 0 ends up in the most significant position, element 3 in
  ;; the least significant one.
  (let ((a (svref addr 0))
        (b (svref addr 1))
        (c (svref addr 2))
        (d (svref addr 3)))
    (with-fixnum-reduce (logior)
      (ash (the fixnum a) 24)
      (ash (the fixnum b) 16)
      (ash (the fixnum c) 8)
      d)))
303
304
305 ;;; Implementation of a simple thread-safe FIFO buffer with limited entries.
306
;;; Bounded FIFO: a singly linked list with head and tail pointers.
(defstruct fifo
  ;; First cons of the list (oldest entry), or NIL when empty.
  (head '() :type list)
  ;; Last cons of the list (newest entry), to which new entries are linked.
  (tail '() :type list)
  ;; Entry counter maintained by FIFO-APPEND, and the limit it is
  ;; compared against.
  (count 0 :type fixnum)
  (size 0 :type fixnum)
  ;; Lock taken by FIFO-APPEND and FIFO-CLEAR.
  (lock (mp:make-lock :name 'fifo-lock) :type mp:lock))
313
(defun fifo-append (fifo object)
  "Appends OBJECT at the tail of FIFO, first dropping the oldest entry
when the FIFO already holds its maximum number of entries.  Returns NIL."
  (declare (optimize (speed 3) (safety 0) (debug 0)))
  (mp:with-lock ((fifo-lock fifo))
    ;; Either make room by dropping the head, or account for growth.
    (if (= (the fixnum (fifo-count fifo)) (the fixnum (fifo-size fifo)))
        (setf (fifo-head fifo) (rest (fifo-head fifo)))
        (incf (the fixnum (fifo-count fifo))))
    (let ((cell (cons object nil)))
      (cond ((null (fifo-head fifo))
             ;; Empty list: the new cell is both head and tail.
             (setf (fifo-head fifo) cell
                   (fifo-tail fifo) cell))
            (t
             ;; Link after the current tail, then advance the tail.
             (setf (rest (fifo-tail fifo)) cell
                   (fifo-tail fifo) cell)))))
  nil)
330
(defun fifo-clear (fifo)
  "Empties FIFO under its lock, resetting head, tail and count.
Returns NIL."
  (mp:with-lock ((fifo-lock fifo))
    (setf (fifo-head fifo) '()
          (fifo-tail fifo) '()
          (fifo-count fifo) 0))
  nil)
338
339
340 ;;; Simple memory log implementation using the above FIFO
341
;;; The FIFO holding log entries; created by SERVER-INIT.
(defvar *log-buffer* nil)

(defun log-line (fmt &rest args)
  "Appends FORMAT-like results to the server log, prefixed with the current
time."
  ;; Without ARGS, FMT is logged verbatim and is not interpreted as a
  ;; FORMAT control string.
  (let ((message (if args
                     (apply #'format nil fmt args)
                     fmt)))
    (fifo-append *log-buffer*
                 (format nil "~A ~A" (server-time-posix) message))
    nil))
352
(defun log-clear ()
  "Clears the in-memory server log."
  (let ((buffer *log-buffer*))
    (fifo-clear buffer)))
356
(defun log-tail ()
  "Writes to *STANDARD-OUTPUT* the contents of the server log."
  ;; Entries are printed oldest first, one per line.
  (dolist (line (fifo-head *log-buffer*))
    (format *standard-output* "~A~%" line)))
363
(defun log-connection (connect session address port)
  "Logs a connection event for SESSION (printed in hexadecimal) with the
peer ADDRESS vector and PORT.  CONNECT true logs \"Connect\", false logs
\"Closed\"."
  (let ((event (if connect
                   "Connect"
                   "Closed"))
        (peer (address-string address)))
    (log-line "~X ~A: [~A:~A]" session event peer port)))
372
(defun log-overflow (connection reason)
  "Logs that CONNECTION was refused by the connection limits, along with
the REASON reported by CLIMIT-ADD."
  (let ((session (connection-session connection))
        (address (connection-address-string connection))
        (port (connection-port connection)))
    (log-line "~X Overflow: [~A:~A] (~A)" session address port reason)))
379
380 ;;; Note that DEBUG level must be at 3 for this to work.
(defun stacktrace ()
  "Returns a string holding a header line followed by one \" in NAME\"
line per invocation history stack frame between SI::*IHS-CURRENT* and
the top of the stack.
Note that the DEBUG level must be at 3 for this to work."
  (let ((top (si:ihs-top))
        (current si::*ihs-current*))
    (with-output-to-string (s)
      ;; The control string takes no arguments.
      (format s " Stack trace:~%")
      ;; Subtract STACKTRACE, LOG-ERROR and avoid printing the last
      ;; node which is always NIL.
      (loop
         for i downfrom (- top 2) above current
         do
           (format s " in ~A~%"
                   (let ((f (si::ihs-fun i)))
                     (typecase f
                       (generic-function (clos:generic-function-name f))
                       (function (ext:compiled-function-name f))
                       (t f))))))))
397
(defun log-error (e)
  "Logs condition E with its type, followed by a stack trace when the
STACKTRACE option of the current configuration is enabled."
  (log-line "# Error of type ~S: ~A~A"
            (type-of e)
            e
            (if (config-stacktrace *config*)
                (stacktrace)
                "")))
404
405
406 ;;; Connection limits management
407
;;; Connection limits bookkeeping, used by CLIMIT-ADD and CLIMIT-REMOVE.
(defstruct climit
  ;; Total number of connections currently accounted for.
  (connections 0 :type fixnum)
  ;; Maps an address key to a cons whose CAR is the number of
  ;; connections currently accounted for that address.
  (table (make-hash-table :test 'eql) :type hash-table)
  ;; Limits: overall, and per address.
  (max-total 64 :type fixnum)
  (max-address 8 :type fixnum)
  ;; Lock taken around every update of the above.
  (lock (mp:make-lock :name 'climit-lock) :type mp:lock))
414
(defun climit-add (climit address)
  "Attempts to account for a new connection from ADDRESS (a fixnum key).
Returns two values: T and :SUCCESS when the connection is allowed, or NIL
and either :MAX-TOTAL or :MAX-ADDRESS when the matching limit is already
reached, in which case nothing is recorded."
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (type fixnum address))
  (mp:with-lock ((climit-lock climit))
    ;; The global limit is checked before the per-address one.
    (when (>= (the fixnum (climit-connections climit))
              (the fixnum (climit-max-total climit)))
      (return-from climit-add (values nil :max-total)))
    (let ((entry (gethash address (climit-table climit))))
      (cond ((null entry)
             ;; First connection from this address.
             (setf (gethash address (climit-table climit)) (cons 1 nil)))
            ((>= (the fixnum (car entry))
                 (the fixnum (climit-max-address climit)))
             (return-from climit-add (values nil :max-address)))
            (t
             (the fixnum (incf (the fixnum (car entry)))))))
    (incf (the fixnum (climit-connections climit))))
  (values t :success))
435
(defun climit-remove (climit address)
  "Undoes the accounting of one successful CLIMIT-ADD for ADDRESS.
The entry for ADDRESS is removed from the table once its count drops to
zero.  Returns NIL."
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (type fixnum address))
  (mp:with-lock ((climit-lock climit))
    (let* ((table (climit-table climit))
           (entry (gethash address table)))
      ;; ENTRY is expected to exist: callers only remove what was added.
      (decf (the fixnum (car entry)))
      (when (zerop (the fixnum (car entry)))
        (remhash address table)))
    (decf (the fixnum (climit-connections climit))))
  nil)
448
449
(defun bind-socket ()
  "Creates a TCP stream socket, binds it to the address and port of the
current configuration and starts listening on it, with the configured
maximum number of children as backlog.  Returns the socket.
If binding or listening fails the socket is closed before the error
propagates, so that it does not leak."
  (let ((server-socket (make-instance 'inet-socket
                                      :type :stream
                                      :protocol :tcp))
        (success nil))
    (unwind-protect
         (progn
           (socket-bind server-socket
                        (make-inet-address (config-listen-address *config*))
                        (config-listen-port *config*))
           (socket-listen server-socket (config-children-maximum *config*))
           (setf success t)
           server-socket)
      ;; Only reached with SUCCESS false on a non-local exit above.
      (unless success
        (ignore-errors (socket-close server-socket))))))
459
460
;;; System which allows the thread-manager thread and REPL commands to
;;; query the state of worker threads and to manage them.
;;; We insert one node per thread, and each thread keeps a reference to
;;; its own node so that it can update its state.
;;; This also allows the parent thread to notify a thread that it should
;;; quit, but only when done serving its client (by setting the status to
;;; :QUIT), in which case the child may set its status to :DEAD so that
;;; the thread-manager thread may free that object.
469
;;; Per worker thread state, shared between the worker and its managers.
(defstruct thread-node
  ;; Lock taken around accesses to STATUS.
  (lock (mp:make-lock :name 'thread-node-lock))
  ;; Thread object so parent may kill it
  thread
  ;; One of :init :ready :busy :quit :dead
  (status :init :type symbol)
  ;; Number of connections recorded for this thread.
  (connections 0 :type integer))
475
476 ;;; Although children threads don't access this list directly (only their
477 ;;; own specific node object), the thread-manager thread as well as the
478 ;;; user REPL commands in SWANK may concurrently access it, so use a lock.
;; Lock protecting *THREADS-LIST*, the list of THREAD-NODE objects.
(defvar *threads-lock* (mp:make-lock :name 'threads-lock))
(defvar *threads-list* (make-dlist))
;; Locally bound by threads
(defvar *thread-node* nil)

;; Thread running CHILDREN-MANAGER-THREAD; set by SERVER-INIT.
(defvar *manager-thread* nil)

;; Listening socket (-1 when unbound) and connection limits object,
;; both set by SERVER-INIT.
(defvar *server-socket* -1)
(defvar *climit* nil)
487
(defun server-init (&optional (config (make-server-config)))
  "Initialization function. CONFIG supplies an object of type SERVER-CONFIG
holding the wanted configuration. Binds the server socket and launches
server threads."
  (check-type config server-config)
  (setf *config* config)
  ;; Outside of SWANK, switch standard output to LATIN-1 when the
  ;; configured external format is not UTF-8.
  #-:SWANK
  (unless (eq (config-external-format *config*) :utf-8)
    (setf (stream-external-format *standard-output*) '(:LATIN-1 :LF)))
  (ext:catch-signal ext:+sigpipe+ :ignore)
  ;; Global state, in dependency order: socket, log, limits, manager.
  (setf *server-socket* (bind-socket)
        *log-buffer* (make-fifo :size (config-log-lines *config*))
        *climit* (make-climit :max-total (config-children-maximum *config*)
                              :max-address (config-conn-per-addr *config*))
        *manager-thread* (mp:process-run-function 'manager-thread
                                                  #'children-manager-thread))
  ;; Launch the initial worker threads, registering one node for each.
  (mp:with-lock (*threads-lock*)
    (dotimes (i (config-children-initial *config*))
      (declare (ignorable i))
      (let* ((node (make-thread-node))
             (n (dnode-alloc node)))
        ;; The thread is started while its node lock is held.
        (setf (thread-node-thread node)
              (mp:with-lock ((thread-node-lock node))
                (mp:process-run-function 'accept-thread
                                         #'accept-loop-thread
                                         node)))
        (dlist-append *threads-list* n))))
  t)
517
(defun server-cleanup ()
  "Kills every thread and unbinds the server socket."
  ;; Every step is best-effort: any condition signaled by a step is
  ;; discarded so that the remaining steps still run.
  (macrolet ((quietly (&body forms)
               `(handler-case
                    (progn ,@forms)
                  (t ()))))
    (quietly
      (mp:process-kill *manager-thread*)
      (setf *manager-thread* nil))
    (let ((threads-list *threads-list*))
      (mp:with-lock (*threads-lock*)
        (do-dlist (n threads-list)
          (let ((node (dnode-object n)))
            (quietly
              (mp:with-lock ((thread-node-lock node))
                (setf (thread-node-status node) :quit)
                (mp:process-kill (thread-node-thread node))
                (dlist-unlink threads-list n)))))))
    (log-clear)
    (quietly
      (socket-close *server-socket*)
      (setf *server-socket* -1)))
  t)
542
543
(defun server-stat ()
  "Returns a property list with status on the server threads.
:TOTAL - The total number of current worker threads
:READY - The number of threads ready to serve a client
:BUSY - The number of threads currently busy serving clients
:DEAD - The number of transient exiting threads
:INIT - The number of transient starting threads
:CONNECTIONS - The sum of the number of connections recorded for each
thread. Note that when threads exit their number of connections are
lost and no longer accounted."
  (declare (optimize (speed 3) (safety 0)))
  (let ((total 0) (connections 0)
        (init 0) (dead 0) (ready 0) (busy 0))
    (declare (type fixnum total connections init dead ready busy))
    (mp:with-lock (*threads-lock*)
      (loop
         for n = (dlist-first *threads-list*) then (dnode-next n)
         while n
         do
           (let ((node (dnode-object n))
                 (status nil))
             ;; Read the status and the connection counter together,
             ;; under the node lock that also guards their updates.
             (mp:with-lock ((thread-node-lock node))
               (setf status (thread-node-status node))
               (incf connections
                     (the fixnum (thread-node-connections node))))
             (incf total)
             (case status
               (:init (incf init))
               (:dead (incf dead))
               (:ready (incf ready))
               (:busy (incf busy))))))
    (list :total total
          :ready ready
          :busy busy
          :dead dead
          :init init
          :connections connections)))
576
577
(defmacro with-log-errors (&body body)
  "Evaluates BODY with *DEBUGGER-HOOK* bound to a function that logs the
condition through LOG-ERROR and then exits the form, which returns NIL in
that case.  Otherwise returns the values of BODY."
  ;; The block is named by a fresh keyword derived from a gensym name.
  (let ((block-name (intern (symbol-name (gensym "BLOCK")) :keyword)))
    `(block ,block-name
       (let ((*debugger-hook*
              (lambda (condition hook)
                (declare (ignore hook))
                (log-error condition)
                (return-from ,block-name nil))))
         ,@body))))
586
587
588 ;;; Automatic worker threads pool manager
589
;; Set once the "maximum number of children" message has been logged.
(defvar *maximum-children-reached* nil)

;; Running sum and number of samples of the ready threads count.
(defvar *ready-avg* 0)
(defvar *ready-avg-cnt* 0)

(defun children-manager ()
  "Performs one maintenance pass over the worker threads pool, under
*THREADS-LOCK*:
- unlinks the nodes of threads which declared themselves :DEAD;
- launches new threads when fewer than CHILDREN-MINSPARE are ready,
  without exceeding CHILDREN-MAXIMUM threads in total;
- every CHILDREN-AVG-SECONDS samples, asks ready threads to quit if the
  average number of ready threads exceeded CHILDREN-MAXSPARE.
Returns T."
  (let* ((threads-list *threads-list*)
         (dead 0)
         (ready 0)
         (busy 0)
         (config *config*)
         (children-minspare (config-children-minspare config))
         (children-maxspare (config-children-maxspare config))
         (children-maximum (config-children-maximum config))
         (children-avg-seconds (config-children-avg-seconds config)))
    (declare (type fixnum dead ready busy
                   children-minspare children-maxspare children-maximum
                   children-avg-seconds))
    (mp:with-lock (*threads-lock*)
      (let ((total (dlist-nodes threads-list)))
        (declare (type fixnum total))
        (do-dlist (n threads-list)
          (let* ((node (dnode-object n))
                 (status (mp:with-lock ((thread-node-lock node))
                           (thread-node-status node))))
            (cond ((eq :dead status)
                   (dlist-unlink threads-list n)
                   ;; TOTAL was sampled before this scan; keep it in
                   ;; sync so that unlinked threads do not count against
                   ;; CHILDREN-MAXIMUM below.
                   (decf total)
                   (incf dead))
                  ((eq :ready status)
                   (incf ready))
                  ((eq :busy status)
                   (incf busy)))))

        ;; More children needed?  Launch them now if allowed.
        (when (< ready children-minspare)
          (loop
             repeat (the fixnum (- children-minspare ready))
             while (< total children-maximum)
             do
               (let* ((node (make-thread-node))
                      (n (dnode-alloc node))
                      (thread (mp:process-run-function 'accept-thread
                                                       #'accept-loop-thread
                                                       node)))
                 (setf (thread-node-thread node) thread)
                 (dlist-append threads-list n)
                 (incf total)))
          (when (and (not *maximum-children-reached*)
                     (= total children-maximum))
            (setf *maximum-children-reached* t)
            (log-line "* Maximum number of children reached (~A)" total)))

        ;; Determine if we can safely kill children which are not in use
        ;; since some time.  To do this we maintain average statistics to
        ;; avoid constantly spawning and killing threads.
        ;; The average calculation is spread over children-avg-seconds
        ;; (the number of samples).
        (if (= *ready-avg-cnt* children-avg-seconds)
            (progn
              (setf *ready-avg* (floor (/ *ready-avg* *ready-avg-cnt*)))
              (let ((overflow (- *ready-avg* children-maxspare)))
                (when (plusp overflow)
                  (do-dlist (n threads-list)
                    (with-accessors ((lock thread-node-lock)
                                     (status thread-node-status)
                                     (thread thread-node-thread))
                        (dnode-object n)
                      (mp:with-lock (lock)
                        (when (eq :ready status)
                          (setf status :quit)
                          (handler-case
                              (mp:process-kill thread)
                            (t ()))
                          (dlist-unlink threads-list n))))
                    ;; NOTE(review): OVERFLOW is decremented for every
                    ;; node visited, not only for those actually told to
                    ;; quit, and LOOP-FINISH presumably relies on
                    ;; DO-DLIST expanding to LOOP -- confirm both.
                    (when (zerop (decf overflow))
                      (loop-finish)))))
              (setf *ready-avg* 0
                    *ready-avg-cnt* 0))
            (progn
              (incf *ready-avg* ready)
              (incf *ready-avg-cnt*))))))
  t)
672
673 ;;; Since we'd need something like setitimer(2), and that we want to
674 ;;; leave the main thread free for interactive REPL and optionally SWANK,
675 ;;; let's simply use a thread for the children threads pool manager.
676 ;;; We also use this thread to update the current time which user code
677 ;;; may want to use to observe timeouts, and which we use to optimize
678 ;;; logging.
(defun children-manager-thread ()
  "Body of the manager thread.  Forever, once per second: refreshes the
cached time read by SERVER-TIME, then runs CHILDREN-MANAGER.  Errors are
logged and do not terminate the loop."
  ;; Start this thread with fresh averaging statistics.  Both names must
  ;; match the special variables which CHILDREN-MANAGER updates.
  (let ((*ready-avg* 0)
        (*ready-avg-cnt* 0))
    (loop
       (with-log-errors
         (sleep 1)
         (let ((time (get-universal-time)))
           (mp:with-lock (*time-lock*)
             (setf *time* time)))
         (children-manager))))
  nil)
691
692
693 ;;; Makes sure that supplied SOCKET gets closed, that Connect/Disconnect
694 ;;; log entries always exist and match, and that status matches.
(defmacro with-socket ((session socket address port) &body body)
  "Evaluates BODY on behalf of the connection described by SESSION,
SOCKET, ADDRESS and PORT.  Before BODY, counts the connection in
*THREAD-NODE*, moves its status from :READY to :BUSY and, if enabled in
the configuration, logs the connection.  Whatever the way BODY exits,
logs the disconnection (if enabled), moves the status back from :BUSY to
:READY and closes SOCKET, ignoring conditions signaled while closing."
  (let ((s-session (gensym))
        (s-socket (gensym))
        (s-address (gensym))
        (s-port (gensym))
        (s-log-connections (gensym))
        (s-lock (gensym))
        (s-status (gensym))
        (s-connections (gensym)))
    `(let ((,s-session ,session)
           (,s-socket ,socket)
           (,s-address ,address)
           (,s-port ,port)
           (,s-log-connections (config-log-connections *config*)))
       (with-accessors ((,s-lock thread-node-lock)
                        (,s-status thread-node-status)
                        (,s-connections thread-node-connections))
           *thread-node*
         (unwind-protect
              (progn
                ;; Hold the node lock only while updating the node.
                ;; Keeping it for the duration of BODY would block every
                ;; other thread which reads this node's status until the
                ;; client is done.
                (mp:with-lock (,s-lock)
                  (incf ,s-connections)
                  (when (eq :ready ,s-status)
                    (setf ,s-status :busy)))
                (when ,s-log-connections
                  (log-connection t ,s-session ,s-address ,s-port))
                ,@body)
           (when ,s-log-connections
             (log-connection nil ,s-session ,s-address ,s-port))
           (mp:with-lock (,s-lock)
             (when (eq :busy ,s-status)
               (setf ,s-status :ready)))
           (handler-case
               (socket-close ,s-socket)
             (t ()
               nil)))))))
730
731 ;;; Makes sure to close supplied STREAM.
(defmacro with-stream ((stream) &body body)
  "Evaluates BODY, then closes STREAM whatever the way BODY exits.
Conditions signaled while closing are discarded."
  (let ((stream-var (gensym "STREAM")))
    `(let ((,stream-var ,stream))
       (unwind-protect
            (progn ,@body)
         (handler-case
             (close ,stream-var)
           (t ()
             nil))))))
742
743 ;;; Makes sure to match successful CLIMIT-ADD calls with CLIMIT-REMOVE ones.
(defmacro with-climit ((climit-var allowed-p-var reason-var address-int-var)
                       &body body)
  "Calls CLIMIT-ADD on CLIMIT-VAR and ADDRESS-INT-VAR, binds its two
results to ALLOWED-P-VAR and REASON-VAR, and evaluates BODY.  If the
addition was allowed, CLIMIT-REMOVE is called when BODY exits, whatever
the way it exits."
  (let ((protected `(progn ,@body))
        (cleanup `(when ,allowed-p-var
                    (climit-remove ,climit-var ,address-int-var))))
    `(multiple-value-bind (,allowed-p-var ,reason-var)
         (climit-add ,climit-var ,address-int-var)
       (unwind-protect
            ,protected
         ,cleanup))))
753
754
755 ;;; Structure holding current connection information.
;;; Structure holding current connection information.
(defstruct connection
  ;; File descriptor of SOCKET.
  (fd 0 :type fixnum)
  ;; The accepted socket, and the stream created from it.
  socket
  stream
  ;; Peer address, as returned when accepting and as a string.
  address-vector
  address-string
  ;; Peer port.
  (port 0 :type fixnum)
  ;; Session identifier, as returned by MAKE-SESSION.
  (session 0 :type integer))

(defvar *connection* nil
  "Dynamically scoped variable that points to the current client connection
information structure.")

;;; Bound by ACCEPT-LOOP-THREAD to an (UNSIGNED-BYTE 8) array whose size
;;; is the BUFFER option of the configuration.
(defvar *buffer* nil
  "Dynamically scoped variable that points to a per-thread general purpose
unsigned-byte buffer.")
773
;; Session counter, initially the universal time at load time, and the
;; lock protecting it.
(defvar *session-int* (get-universal-time))
(defvar *session-lock* (mp:make-lock :name 'session-lock))

(defun make-session ()
  "Returns a new session identifier: the session counter, atomically
incremented by one."
  (let ((id nil))
    (mp:with-lock (*session-lock*)
      (incf *session-int*)
      (setf id *session-int*))
    id))
781
782 ;;; The main loop of our worker threads. Accepts and serves connections
783 ;;; until told to exit or killed. On OSs where this is necessary, uses
784 ;;; *ACCEPT-LOCK* so that one thread at most is actually accept(3)-blocking
785 ;;; on the file descriptor, while other threads block waiting for the lock
786 ;;; to become available.
;; Serializes SOCKET-ACCEPT calls among worker threads.
(defvar *accept-lock* (mp:make-lock :name 'accept-lock))

(defun accept-loop-thread (node)
  "Body of a worker thread, NODE being its THREAD-NODE.  Marks the node
:READY, then accepts and serves one connection at a time until the node
status is found to be :QUIT, at which point it marks the node :DEAD and
returns NIL.  Errors signaled while serving are logged and do not end
the loop."
  (mp:with-lock ((thread-node-lock node))
    (setf (thread-node-status node) :ready))
  ;; Per-thread settings are sampled once; *THREAD-NODE* and *BUFFER*
  ;; are dynamically bound for the lifetime of the thread.
  (let* ((config *config*)
         (*thread-node* node)
         (timeout (config-input-timeout config))
         (external-format (config-external-format config))
         (serve-function (config-serve-function config))
         (overflow-function (config-overflow-function config))
         (climit *climit*)
         (*buffer* (make-array (config-buffer config)
                               :element-type '(unsigned-byte 8))))
    (declare (type fixnum timeout))
    (loop
       (when (mp:with-lock ((thread-node-lock node))
               (eq :quit (thread-node-status node)))
         (return))
       (with-log-errors
         (multiple-value-bind (socket address port)
             ;; XXX On NetBSD, a lock around accept(2) is not necessary,
             ;; but somehow odd bugs can happen without it, only when
             ;; initializing.  If using a lock around accept(2), why not
             ;; simply use one thread per accepting socket, and defer
             ;; requests to already launched threads in the pool like
             ;; mmserver(3) does, rather than the current method...
             (mp:with-lock (*accept-lock*)
               (socket-accept *server-socket*))
           (let ((session (make-session)))
             (with-socket (session socket address port)
               ;; XXX Move upwards options which BSD inherit for
               ;; performance, but Linux unfortunately requires them to
               ;; be here as it doesn't inherit options.
               (setf (sockopt-keep-alive socket) t
                     (sockopt-receive-timeout socket) timeout
                     (sockopt-linger socket) 0
                     (sockopt-tcp-nodelay socket) t)
               (let ((client-stream
                      (socket-make-stream socket
                                          :input t
                                          :output t
                                          :buffering :full
                                          :external-format external-format))
                     (address-key (address-fixnum address)))
                 (with-stream (client-stream)
                   (with-climit (climit allowed-p reason address-key)
                     (let ((*connection*
                            (make-connection
                             :fd (socket-file-descriptor socket)
                             :socket socket
                             :stream client-stream
                             :address-vector address
                             :address-string (address-string address)
                             :port port
                             :session session)))
                       ;; Serve the client, or report that it was
                       ;; refused by the connection limits.
                       (cond (allowed-p
                              (funcall (symbol-function serve-function)
                                       *connection*))
                             (t
                              (log-overflow *connection* reason)
                              (funcall (symbol-function overflow-function)
                                       *connection*
                                       reason)))))))))))))
  (mp:with-lock ((thread-node-lock node))
    (setf (thread-node-status node) :dead))
  nil)
855
856
(defun line-read (stream &key (max 1024))
  "Reads a text line from STREAM.  Lines are expected to be terminated
using a Newline character; neither the Newline nor any Return characters
immediately preceding it (as in a Return-Newline terminator) are
provided as part of the returned line string.  Will store at most MAX
characters of the line (defaults to 1024); reading stops at the first
character which does not fit.
If the EXTERNAL-FORMAT is UTF-8 and an invalid UTF-8 input sequence
is encountered, invalid octets will be imported as LATIN-1 characters,
in which case output will not preserve the original bytes.
To obtain literal bytes, use the LATIN-1 EXTERNAL-FORMAT."
  (declare (type fixnum max))
  (let ((line (make-array max
                          :element-type 'character
                          :adjustable t
                          :fill-pointer 0)))
    ;; ADD-CHAR stores a character while fewer than MAX are held, and
    ;; otherwise ends the reading loop.  The fill-pointer is the single
    ;; source of truth for the number of characters held.
    (macrolet ((add-char (c)
                 `(if (< (fill-pointer line) max)
                      (vector-push-extend ,c line 1024)
                      (loop-finish))))
      (loop
         for c of-type character =
           (handler-bind
               ((ext:stream-decoding-error
                 ;; Import the offending octets as individual
                 ;; characters, then resume reading.
                 #'(lambda (e)
                     (mapc #'(lambda (o)
                               (let ((c (code-char o)))
                                 (if (char= #\Newline c)
                                     (invoke-restart 'use-value c)
                                     (add-char c))))
                           (ext:character-decoding-error-octets e))
                     (invoke-restart 'continue)))
                (simple-error
                 ;; Report simple errors signaled while reading as an
                 ;; end of file condition on STREAM.
                 #'(lambda (e)
                     (declare (ignore e))
                     (error (make-condition 'end-of-file
                                            :stream stream)))))
             (read-char stream))
         until (char= #\Newline c)
         do (add-char c)))
    ;; Strip trailing line terminator characters in place.  Unlike
    ;; popping and pushing back, this never discards another character
    ;; and needs no error handling for the empty line.
    (loop
       while (and (plusp (fill-pointer line))
                  (member (char line (1- (fill-pointer line)))
                          '(#\Return #\Newline)))
       do (decf (fill-pointer line)))
    line))