- Add SERVER-TIME-UNIVERSAL to convert a Unix timestamp to universal-time
[mmondor.git] / mmsoftware / cl / server / ecl-mp-server.lisp
1 ;;; $Id: ecl-mp-server.lisp,v 1.19 2011/09/01 07:05:04 mmondor Exp $
2
3 #|
4
5 Copyright (c) 2011, Matthew Mondor
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions
10 are met:
11 1. Redistributions of source code must retain the above copyright
12 notice, this list of conditions and the following disclaimer.
13 2. Redistributions in binary form must reproduce the above copyright
14 notice, this list of conditions and the following disclaimer in the
15 documentation and/or other materials provided with the distribution.
16
17 THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
18 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
22 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28 |#
29
30 ;;; ecl-mp-server.lisp - Simple library for TCP server applications
31
32 ;;; XXX TODO XXX
33 ;;; - Rework logging format, make connection/disconnection logging optional
34 ;;; and provide a connection-unique identifier like mmserver(3) does.
35 ;;; Perhaps locally bind a *log-session* or such variable with it...
36 ;;; - Perhaps provide an object to the serving functions rather than
37 ;;; a number of arguments.
38 ;;; - File/syslog logging (currently only supports in-memory logging)
39 ;;; - Maybe use reuseaddr/reuseport
40
41
42 (declaim (optimize (speed 3) (safety 1) (debug 3)))
43
44 (eval-when (:compile-toplevel :load-toplevel)
45 (require :sb-bsd-sockets)
46 (load "dlist"))
47
48
49 (defpackage :server
50 (:use :cl :sb-bsd-sockets :dlist)
51 (:export #:server-config
52 #:make-server-config
53 #:log-clear
54 #:log-tail
55 #:log-line
56 #:stacktrace
57 #:server-init
58 #:server-cleanup
59 #:server-stat
60 #:line-read
61 #:stream-fd
62 #:address-string
63 #:address-fixnum
64 #:server-time
65 #:server-time-posix
66 #:server-time-unix
67 #:server-time-universal
68 #:server-time-rfc
69 #:server-time-rfc-parse
70 #:string-split
71 #:connection
72 #:connection-fd
73 #:connection-socket
74 #:connection-stream
75 #:connection-address-vector
76 #:connection-address-string
77 #:connection-port
78 #:connection-session
79 #:*connection*
80 #:*buffer*))
81
82 (in-package :server)
83
84 (defparameter *rcsid*
85 "$Id: ecl-mp-server.lisp,v 1.19 2011/09/01 07:05:04 mmondor Exp $")
86
87
88 (defun noop (&rest args)
89 (declare (ignore args))
90 nil)
91
92
93 (defstruct (server-config (:conc-name config-))
94 (log-connections t :type boolean)
95 (stacktrace t :type boolean)
96 (external-format :utf-8 :type symbol)
97 (listen-address "127.0.0.1" :type string)
98 (listen-port 7777 :type fixnum)
99 (log-lines 1000 :type fixnum)
100 (input-timeout 60 :type fixnum)
101 (children-initial 16 :type fixnum)
102 (children-minspare 16 :type fixnum)
103 (children-maxspare 16 :type fixnum)
104 (children-maximum 64 :type fixnum)
105 (children-avg-seconds 60 :type fixnum)
106 (conn-per-addr 8 :type fixnum)
107 (buffer 0 :type fixnum)
108 (serve-function 'server::noop :type symbol)
109 (overflow-function 'server::noop :type symbol))
110
111 (defvar *config* nil)
112
113
114 ;;; Various utility functions
115
116 (defun string-split (string &key
117 (separators '(#\Space))
118 (trim-parts nil)
119 (start 0) (end nil) (max nil))
120 "Returns an array containing the parts of STRING, separated by one of
121 the specified characters SEPARATORS, and optionally trimmed from the
122 characters specified in TRIM-PARTS (defaults to NIL for no trimming).
123 Begins scanning STRING from position START (defaults to 0), and stops
124 at END (defaults for NIL for end of string). Returns a maximum of MAX
125 elements (defaults to NIL for no limit)."
126 (declare (type fixnum start)
127 (type (or null fixnum) end max))
128 (flet ((separator-p (c)
129 (declare (type character c))
130 (member c separators)))
131 (let ((array (make-array (or max 16)
132 :fill-pointer 0
133 :adjustable t))
134 (array-len 0)
135 (s start))
136 (declare (type fixnum array-len)
137 (type (or null fixnum) s))
138 (unless end
139 (setf end (length string)))
140 (when max
141 (decf max))
142 (loop
143 for i of-type fixnum from s below end
144 until (and max (< max array-len))
145 while (setf s (position-if-not #'separator-p string :start i))
146 do
147 (setf i (or (position-if #'separator-p string :start s)
148 end))
149 (let ((w (subseq string s i)))
150 (vector-push-extend (if trim-parts
151 (string-trim trim-parts w)
152 w)
153 array 16))
154 (incf array-len))
155 array)))
156
157 (defvar *time-lock* (mp:make-lock :name 'time-lock))
158
159 (defvar *time* (mp:with-lock (*time-lock*)
160 (get-universal-time)))
161
162 (defun server-time ()
163 "Returns the current universal time in seconds."
164 (let ((time (mp:with-lock (*time-lock*)
165 *time*)))
166 time))
167
168 (defun server-time-posix (&optional (ut (server-time)))
169 "Returns UTC time stamp.
170 UT may optionally be supplied if the current time is already known."
171 ; (declare (type integer ut))
172 (multiple-value-bind
173 (second minute hour date month year)
174 (decode-universal-time ut 0)
175 (format nil "~4,'0D~2,'0D~2,'0D~2,'0D~2,'0D~2,'0D-0000"
176 year month date hour minute second)))
177
178 (defun server-time-unix (&optional (ut (server-time)))
179 "Returns Unix timestamp from Epoch.
180 UT may optionally be supplied if the current time is already known."
181 (- ut 2208988800))
182
183 (defun server-time-universal (unixt)
184 "Returns universal/server time for the Unix timestamp UNIXT."
185 (+ unixt 2208988800))
186
187 (defun server-time-rfc (&optional (ut (server-time)))
188 "Returns RFC UTC time stamp.
189 UT may optionally be supplied if the current time is already known."
190 ; (declare (type (or null integer) unix-time))
191 (let ((days #("Mon" "Tue" "Wed" "Thu" "Fri" "Sat" "Sun"))
192 (months #("Jan" "Feb" "Mar" "Apr" "May" "Jun"
193 "Jul" "Aug" "Sep" "Oct" "Nov" "Dec")))
194 (declare (type (simple-array string (7)) days)
195 (type (simple-array string (12)) months))
196 (multiple-value-bind
197 (second minute hour date month year day)
198 (decode-universal-time ut 0)
199 (format nil "~A, ~2,'0D ~A ~4,'0D ~2,'0D:~2,'0D:~2,'0D GMT"
200 (svref days day)
201 date (svref months (1- month)) year
202 hour minute second))))
203
204 (defun server-time-rfc-parse (time)
205 "Parses TIME string, which may be in one of the following formats:
206 Sun, 06 Nov 1994 08:49:37 GMT ; RFC 822, updated by RFC 1123
207 Sunday, 06-Nov-94 08:49:37 GMT ; RFC 850, obsoleted by RFC 1036
208 Sun Nov 6 08:49:37 1994 ; ANSI C's asctime() format
209 Allows numeric timezones and symbolic RFC 822 ones (corrected as per
210 RFC 1123).
211 Returns a UTC-relative Common Lisp universal timestamp in seconds.
212 SERVER-TIME-UNIX may optionally be used to convert to a Unix timestamp."
213 (let ((months #("Jan" "Feb" "Mar" "Apr" "May" "Jun"
214 "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))
215 (parts (string-split time
216 :separators '(#\Space #\, #\: #\-)
217 :trim-parts '(#\Space #\, #\: #\-)
218 :max 8)))
219 ;; Fix year from 2-digit to 4-digit if needed
220 (when (and (= 8 (length parts))
221 (= 2 (length (svref parts 3))))
222 (setf (svref parts 3) (concatenate 'string "19" (svref parts 3))))
223 (let (date month year hour minute second tz)
224 (handler-case
225 (progn
226 (cond ((= 8 (length parts))
227 ;; RFC 822/1123, RFC 850/1036
228 (setf date (parse-integer (svref parts 1))
229 month (1+ (position (svref parts 2) months
230 :test #'string-equal))
231 year (parse-integer (svref parts 3))
232 hour (parse-integer (svref parts 4))
233 minute (parse-integer (svref parts 5))
234 second (parse-integer (svref parts 6))
235 tz (svref parts 7)))
236 ;; ANSI C asctime()
237 ((= 7 (length parts))
238 (setf date (parse-integer (svref parts 2))
239 month (1+ (position (svref parts 1) months
240 :test #'string-equal))
241 year (parse-integer (svref parts 6))
242 hour (parse-integer (svref parts 3))
243 minute (parse-integer (svref parts 4))
244 second (parse-integer (svref parts 5))
245 tz "GMT")))
246 ;; Compensate for #\- stripping when splitting
247 (when (digit-char-p (schar tz 0))
248 (setf tz (concatenate 'string "-" tz)))
249 ;; Convert symbolic or numeric timezone offset to rational
250 (setf tz
251 (cond ((member tz '("GMT" "UTC" "UT" "Z" "J")
252 :test #'string=) 0)
253 ((string= "EDT" tz) -4)
254 ((member tz '("EST" "CDT") :test #'string=) -5)
255 ((member tz '("CST" "MDT") :test #'string=) -6)
256 ((member tz '("CDT" "PDT") :test #'string=) -7)
257 ((string= "PST" tz) -8)
258 ((string= "A" tz) 1)
259 ((string= "M" tz) 12)
260 ((string= "N" tz) -1)
261 ((string= "Y" tz) -12)
262 ((and (= (length tz) 5)
263 (member (schar tz 0) '(#\+ #\-)
264 :test #'char=))
265 (let ((s (if (char= #\+ (schar tz 0)) #'+ #'-))
266 (h (parse-integer (subseq tz 1 3)))
267 (m (parse-integer (subseq tz 3 5))))
268 (funcall s (+ h (* 1/100 m 5/3)))))
269 (t nil)))
270 ;; Finaly calculate timestamp in seconds
271 (encode-universal-time second minute hour date month year tz))
272 (t ()
273 -1)))))
274
275 (defun stream-fd (stream)
276 "Returns the file descriptor number associated to STREAM."
277 (ext:file-stream-fd (two-way-stream-output-stream stream)))
278
279 (defun address-string (addr)
280 "Converts supplied ADDR 4-part integer vector to a string suitable for
281 representation of an IPv4 address."
282 (declare (type (simple-array t (4)) addr)
283 (optimize (speed 3) (safety 0) (debug 0)))
284 (format nil "~A.~A.~A.~A"
285 (svref addr 0) (svref addr 1) (svref addr 2) (svref addr 3)))
286
287 ;;; Macro to convert a multiple arguments operation into combined pairs,
288 ;;; which are more easily inlined to C.
289 (defmacro with-fixnum-reduce ((op) &rest args)
290 (reduce #'(lambda (a b)
291 `(the fixnum (,op (the fixnum ,a) (the fixnum ,b))))
292 args))
293
294 (defun address-fixnum (addr)
295 "Converts supplied ADDR 4-part integer vector to a FIXNUM integer
296 suitable for hashing."
297 (declare (optimize (speed 3) (safety 0) (debug 0))
298 (type (simple-array t (4)) addr))
299 (with-fixnum-reduce (logior)
300 (ash (the fixnum (svref addr 0)) 24)
301 (ash (the fixnum (svref addr 1)) 16)
302 (ash (the fixnum (svref addr 2)) 8)
303 (svref addr 3)))
304
305
306 ;;; Implementation of a simple thread-safe FIFO buffer with limited entries.
307
308 (defstruct fifo
309 (head '() :type list)
310 (tail '() :type list)
311 (count 0 :type fixnum)
312 (size 0 :type fixnum)
313 (lock (mp:make-lock :name 'fifo-lock) :type mp:lock))
314
315 (defun fifo-append (fifo object)
316 (declare (optimize (speed 3) (safety 0) (debug 0)))
317 (with-accessors ((head fifo-head) (tail fifo-tail)
318 (count fifo-count) (size fifo-size)
319 (lock fifo-lock)) fifo
320 (mp:with-lock (lock)
321 (if (= (the fixnum count) (the fixnum size))
322 (setf head (rest head))
323 (incf (the fixnum count)))
324 (let ((new (cons object nil)))
325 (if (null head)
326 (setf head new
327 tail new)
328 (setf (rest tail) new
329 tail new)))))
330 nil)
331
332 (defun fifo-clear (fifo)
333 (mp:with-lock ((fifo-lock fifo))
334 (let ((list '()))
335 (setf (fifo-head fifo) list
336 (fifo-tail fifo) list
337 (fifo-count fifo) 0)))
338 nil)
339
340
341 ;;; Simple memory log implementation using the above FIFO
342
343 (defvar *log-buffer* nil)
344
345 (defun log-line (fmt &rest args)
346 "Appends FORMAT-like results to the server log, prefixed with the current
347 time."
348 (let ((l (if (null args)
349 fmt
350 (apply #'format nil fmt args))))
351 (fifo-append *log-buffer* (format nil "~A ~A" (server-time-posix) l)))
352 nil)
353
354 (defun log-clear ()
355 "Clears the in-memory server log."
356 (fifo-clear *log-buffer*))
357
358 (defun log-tail ()
359 "Writes to *STANDARD-OUTPUT* the contents of the server log."
360 (loop
361 for line in (fifo-head *log-buffer*)
362 do
363 (write-string (format nil "~A~%" line))))
364
365 (defun log-connection (connect session address port)
366 (log-line "~X ~A: [~A:~A]"
367 session
368 (if connect
369 "Connect"
370 "Closed")
371 (address-string address)
372 port))
373
374 (defun log-overflow (connection reason)
375 (log-line "~X Overflow: [~A:~A] (~A)"
376 (connection-session connection)
377 (connection-address-string connection)
378 (connection-port connection)
379 reason))
380
381 ;;; Note that DEBUG level must be at 3 for this to work.
382 (defun stacktrace ()
383 (let ((top (si:ihs-top))
384 (current si::*ihs-current*))
385 (with-output-to-string (s)
386 (format s " Stack trace:~%" current top)
387 ;; Substract STACKTRACE, LOG-ERROR and avoid printing the last
388 ;; node which is always NIL.
389 (loop
390 for i downfrom (- top 2) above current
391 do
392 (format s " in ~A~%"
393 (let ((f (si::ihs-fun i)))
394 (typecase f
395 (generic-function (clos:generic-function-name f))
396 (function (ext:compiled-function-name f))
397 (t f))))))))
398
399 (defun log-error (e)
400 (let ((trace (if (config-stacktrace *config*)
401 (stacktrace)
402 "")))
403 (log-line "# Error of type ~S: ~A~A"
404 (type-of e) e trace)))
405
406
407 ;;; Connection limits management
408
409 (defstruct climit
410 (connections 0 :type fixnum)
411 (table (make-hash-table :test 'eql) :type hash-table)
412 (max-total 64 :type fixnum)
413 (max-address 8 :type fixnum)
414 (lock (mp:make-lock :name 'climit-lock) :type mp:lock))
415
416 (defun climit-add (climit address)
417 (declare (optimize (speed 3) (safety 0) (debug 0))
418 (type fixnum address))
419 (with-accessors ((connections climit-connections)
420 (table climit-table)
421 (max-total climit-max-total)
422 (max-address climit-max-address)
423 (lock climit-lock)) climit
424 (mp:with-lock (lock)
425 (when (>= (the fixnum connections) (the fixnum max-total))
426 (return-from climit-add (values nil :max-total)))
427 (let ((node (gethash address table)))
428 (when (and node (>= (the fixnum (car node))
429 (the fixnum max-address)))
430 (return-from climit-add (values nil :max-address)))
431 (if node
432 (the fixnum (incf (the fixnum (car node))))
433 (setf (gethash address table) (cons 1 nil))))
434 (incf (the fixnum connections)))
435 (values t :success)))
436
437 (defun climit-remove (climit address)
438 (declare (optimize (speed 3) (safety 0) (debug 0))
439 (type fixnum address))
440 (with-accessors ((connections climit-connections)
441 (table climit-table)
442 (lock climit-lock)) climit
443 (mp:with-lock (lock)
444 (let ((node (gethash address table)))
445 (when (zerop (the fixnum (decf (the fixnum (car node)))))
446 (remhash address table)))
447 (decf (the fixnum connections)))
448 nil))
449
450
451 (defun bind-socket ()
452 (let ((server-socket (make-instance 'inet-socket
453 :type :stream
454 :protocol :tcp)))
455 (socket-bind server-socket
456 (make-inet-address (config-listen-address *config*))
457 (config-listen-port *config*))
458 (socket-listen server-socket (config-children-maximum *config*))
459 server-socket))
460
461
462 ;;; System which allows the thread-manager thread and REPL commands to
463 ;;; to query the state of worker threads and manage them.
464 ;;; We insert one node per thread, which the thread is left a reference
465 ;;; to so it can update its state.
466 ;;; This also allows the parent thread to notify a thread that it should
467 ;;; quit but only when done serving its client (by setting the status to
468 ;;; :QUIT, in which case the child may set its status to :DEAD so the
469 ;;; thread-manager thread may free that object.
470
471 (defstruct thread-node
472 (lock (mp:make-lock :name 'thread-node-lock))
473 thread ; Thread object so parent may kill it
474 (status :init :type symbol) ; :init :ready :busy :quit :dead
475 (connections 0 :type integer))
476
477 ;;; Although children threads don't access this list directly (only their
478 ;;; own specific node object), the thread-manager thread as well as the
479 ;;; user REPL commands in SWANK may concurrently access it, so use a lock.
480 (defvar *threads-lock* (mp:make-lock :name 'threads-lock))
481 (defvar *threads-list* (make-dlist))
482 (defvar *thread-node* nil) ; Locally bound by threads
483
484 (defvar *manager-thread* nil)
485
486 (defvar *server-socket* -1)
487 (defvar *climit* nil)
488
489 (defun server-init (&optional (config (make-server-config)))
490 "Initialization function. CONFIG supplies an object of type SERVER-CONFIG
491 holding the wanted configuration. Binds the server socket and launches
492 server threads."
493 (check-type config server-config)
494 (setf *config* config)
495 #-:SWANK
496 (unless (eq (config-external-format *config*) :utf-8)
497 (setf (stream-external-format *standard-output*) '(:LATIN-1 :LF)))
498 (ext:catch-signal ext:+sigpipe+ :ignore)
499 (setf *server-socket* (bind-socket))
500 (setf *log-buffer* (make-fifo :size (config-log-lines *config*)))
501 (setf *climit* (make-climit :max-total (config-children-maximum *config*)
502 :max-address (config-conn-per-addr *config*)))
503 (setf *manager-thread*
504 (mp:process-run-function 'manager-thread #'children-manager-thread))
505 (mp:with-lock (*threads-lock*)
506 (loop
507 repeat (config-children-initial *config*)
508 do
509 (let* ((node (make-thread-node))
510 (n (dnode-alloc node))
511 (thread (mp:with-lock ((thread-node-lock node))
512 (mp:process-run-function 'accept-thread
513 #'accept-loop-thread
514 node))))
515 (setf (thread-node-thread node) thread)
516 (dlist-append *threads-list* n))))
517 t)
518
519 (defun server-cleanup ()
520 "Kills every thread and unbinds the server socket."
521 (handler-case
522 (progn
523 (mp:process-kill *manager-thread*)
524 (setf *manager-thread* nil))
525 (t ()))
526 (let ((threads-list *threads-list*))
527 (mp:with-lock (*threads-lock*)
528 (do-dlist (n threads-list)
529 (let ((node (dnode-object n)))
530 (handler-case
531 (mp:with-lock ((thread-node-lock node))
532 (setf (thread-node-status node) :quit)
533 (mp:process-kill (thread-node-thread node))
534 (dlist-unlink threads-list n))
535 (t ()))))))
536 (log-clear)
537 (handler-case
538 (progn
539 (socket-close *server-socket*)
540 (setf *server-socket* -1))
541 (t ()))
542 t)
543
544
545 (defun server-stat ()
546 "Returns an alist with status on the server threads.
547 :TOTAL - The total number of current worker threads
548 :READY - The number of threads ready to serve a client
549 :BUSY - The number of threads currently busy serving clients
550 :DEAD - The number of transient exiting threads
551 :INIT - The number of transient starting threads
552 :CONNECTIONS - The total of the recorded number of connections recorded
553 for each thread. Note that when threads exit their number of
554 connections are lost and no longer accounted."
555 (declare (optimize (speed 3) (safety 0)))
556 (mp:with-lock (*threads-lock*)
557 (loop
558 with threads-list = *threads-list*
559 for n = (dlist-first threads-list) then (dnode-next n)
560 for node = (if n (dnode-object n) nil)
561 for status = (if node (mp:with-lock ((thread-node-lock node))
562 (thread-node-status node))
563 nil)
564 while n
565 count n into total of-type fixnum
566 sum (thread-node-connections node) into connections of-type fixnum
567 count (eq :init status) into init of-type fixnum
568 count (eq :dead status) into dead of-type fixnum
569 count (eq :ready status) into ready of-type fixnum
570 count (eq :busy status) into busy of-type fixnum
571 finally (return `(:total ,total
572 :ready ,ready
573 :busy ,busy
574 :dead ,dead
575 :init ,init
576 :connections ,connections)))))
577
578
579 (defmacro with-log-errors (&body body)
580 (let ((s-block (intern (symbol-name (gensym "BLOCK")) :keyword)))
581 `(block ,s-block
582 (let ((*debugger-hook* #'(lambda (condition hook)
583 (declare (ignore hook))
584 (log-error condition)
585 (return-from ,s-block nil))))
586 ,@body))))
587
588
589 ;;; Automatic worker threads pool manager
590
591 (defvar *maximum-children-reached* nil)
592
593 (defvar *ready-avg* 0)
594 (defvar *ready-avg-cnt* 0)
595
596 (defun children-manager ()
597 (let* ((threads-list *threads-list*)
598 (dead 0)
599 (ready 0)
600 (busy 0)
601 (config *config*)
602 (children-minspare (config-children-minspare config))
603 (children-maxspare (config-children-maxspare config))
604 (children-maximum (config-children-maximum config))
605 (children-avg-seconds (config-children-avg-seconds config)))
606 (declare (type fixnum dead ready busy
607 children-minspare children-maxspare children-maximum
608 children-avg-seconds))
609 (mp:with-lock (*threads-lock*)
610 (let ((total (dlist-nodes threads-list)))
611 (declare (type fixnum total))
612 (do-dlist (n threads-list)
613 (let* ((node (dnode-object n))
614 (status (mp:with-lock ((thread-node-lock node))
615 (thread-node-status node))))
616 (cond ((eq :dead status)
617 (dlist-unlink threads-list n)
618 (incf dead))
619 ((eq :ready status)
620 (incf ready))
621 ((eq :busy status)
622 (incf busy)))))
623
624 ;; More children needed? Launch them now if allowed.
625 (when (< ready children-minspare)
626 (loop
627 repeat (the fixnum (- children-minspare ready))
628 while (< total children-maximum)
629 do
630 (let* ((node (make-thread-node))
631 (n (dnode-alloc node))
632 (thread (mp:process-run-function 'accept-thread
633 #'accept-loop-thread
634 node)))
635 (setf (thread-node-thread node) thread)
636 (dlist-append threads-list n)
637 (incf total)))
638 (when (and (not *maximum-children-reached*)
639 (= total children-maximum))
640 (setf *maximum-children-reached* t)
641 (log-line "* Maximum number of children reached (~A)" total)))
642
643 ;; Determine if we can safely kill children which are not in use
644 ;; since some time. To do this we maintain average statistics to
645 ;; avoid constantly spawning and killing threads.
646 ;; The average calculation is spread over children-avg-seconds
647 ;; (the number of samples).
648 (if (= *ready-avg-cnt* children-avg-seconds)
649 (progn
650 (setf *ready-avg* (floor (/ *ready-avg* *ready-avg-cnt*)))
651 (let ((overflow (- *ready-avg* children-maxspare)))
652 (when (plusp overflow)
653 (do-dlist (n threads-list)
654 (with-accessors ((lock thread-node-lock)
655 (status thread-node-status)
656 (thread thread-node-thread))
657 (dnode-object n)
658 (mp:with-lock (lock)
659 (when (eq :ready status)
660 (setf status :quit)
661 (handler-case
662 (mp:process-kill thread)
663 (t ()))
664 (dlist-unlink threads-list n))))
665 (when (zerop (decf overflow))
666 (loop-finish)))))
667 (setf *ready-avg* 0
668 *ready-avg-cnt* 0))
669 (progn
670 (incf *ready-avg* ready)
671 (incf *ready-avg-cnt*))))))
672 t)
673
674 ;;; Since we'd need something like setitimer(2), and that we want to
675 ;;; leave the main thread free for interactive REPL and optionally SWANK,
676 ;;; let's simply use a thread for the children threads pool manager.
677 ;;; We also use this thread to update the current time which user code
678 ;;; may want to use to observe timeouts, and which we use to optimize
679 ;;; logging.
680 (defun children-manager-thread ()
681 (let ((*ready-avg* 0)
682 (*ready-avg-count* 0))
683 (loop
684 do
685 (with-log-errors
686 (sleep 1)
687 (let ((time (get-universal-time)))
688 (mp:with-lock (*time-lock*)
689 (setf *time* time)))
690 (children-manager))))
691 nil)
692
693
694 ;;; Makes sure that supplied SOCKET gets closed, that Connect/Disconnect
695 ;;; log entries always exist and match, and that status matches.
696 (defmacro with-socket ((session socket address port) &body body)
697 (let ((s-session (gensym))
698 (s-socket (gensym))
699 (s-address (gensym))
700 (s-port (gensym))
701 (s-log-connections (gensym))
702 (s-lock (gensym))
703 (s-status (gensym))
704 (s-connections (gensym)))
705 `(let ((,s-session ,session)
706 (,s-socket ,socket)
707 (,s-address ,address)
708 (,s-port ,port)
709 (,s-log-connections (config-log-connections *config*)))
710 (with-accessors ((,s-lock thread-node-lock)
711 (,s-status thread-node-status)
712 (,s-connections thread-node-connections))
713 *thread-node*
714 (unwind-protect
715 (mp:with-lock (,s-lock)
716 (incf ,s-connections)
717 (when (eq :ready ,s-status)
718 (setf ,s-status :busy))
719 (when ,s-log-connections
720 (log-connection t ,s-session ,s-address ,s-port))
721 ,@body)
722 (when ,s-log-connections
723 (log-connection nil ,s-session ,s-address ,s-port))
724 (mp:with-lock (,s-lock)
725 (when (eq :busy ,s-status)
726 (setf ,s-status :ready)))
727 (handler-case
728 (socket-close ,s-socket)
729 (t (e)
730 nil)))))))
731
732 ;;; Makes sure to close supplied STREAM.
733 (defmacro with-stream ((stream) &body body)
734 (let ((s-stream (gensym)))
735 `(let ((,s-stream ,stream))
736 (unwind-protect
737 (progn
738 ,@body)
739 (handler-case
740 (close ,s-stream)
741 (t (e)
742 nil))))))
743
744 ;;; Makes sure to match successful CLIMIT-ADD calls with CLIMIT-REMOVE ones.
745 (defmacro with-climit ((climit-var allowed-p-var reason-var address-int-var)
746 &body body)
747 `(multiple-value-bind (,allowed-p-var ,reason-var)
748 (climit-add ,climit-var ,address-int-var)
749 (unwind-protect
750 (progn
751 ,@body)
752 (when ,allowed-p-var
753 (climit-remove ,climit-var ,address-int-var)))))
754
755
756 ;;; Structure holding current connection information.
757 (defstruct connection
758 (fd 0 :type fixnum)
759 socket
760 stream
761 address-vector
762 address-string
763 (port 0 :type fixnum)
764 (session 0 :type integer))
765
766 (defvar *connection* nil
767 "Dynamically scoped variable that points to the current client connection
768 information structure.")
769
770 ;;;
771 (defvar *buffer* nil
772 "Dynamically scoped variable that points to a per-thread general purpose
773 unsigned-byte buffer.")
774
775 (defvar *session-int* (get-universal-time))
776 (defvar *session-lock* (mp:make-lock :name 'session-lock))
777 (defun make-session ()
778 (let ((id 0))
779 (mp:with-lock (*session-lock*)
780 (setf id (incf *session-int*)))
781 id))
782
783 ;;; The main loop of our worker threads. Accepts and serves connections
784 ;;; until told to exit or killed. On OSs where this is necessary, uses
785 ;;; *ACCEPT-LOCK* so that one thread at most is actually accept(3)-blocking
786 ;;; on the file descriptor, while other threads block waiting for the lock
787 ;;; to become available.
788 (defvar *accept-lock* (mp:make-lock :name 'accept-lock))
789
790 (defun accept-loop-thread (node)
791 (mp:with-lock ((thread-node-lock node))
792 (setf (thread-node-status node) :ready))
793 (loop
794 with config = *config*
795 with *thread-node* = node
796 with timeout of-type fixnum = (config-input-timeout config)
797 with external-format = (config-external-format config)
798 with serve-function = (config-serve-function config)
799 with overflow-function = (config-overflow-function config)
800 with climit = *climit*
801 with *buffer* = (make-array (config-buffer config)
802 :element-type '(unsigned-byte 8))
803 until (mp:with-lock ((thread-node-lock node))
804 (eq :quit (thread-node-status node)))
805 do
806 (with-log-errors
807 (multiple-value-bind (socket address port)
808 ;; XXX On NetBSD, a lock around accept(2) is not necessary,
809 ;; but somehow odd bugs can happen without it, only when
810 ;; initializing. If using a lock around accept(2), why not
811 ;; simply use one thread per accepting socket, and defer
812 ;; requests to already launched threads in the pool like
813 ;; mmserver(3) does, rather than the current method...
814 (mp:with-lock (*accept-lock*)
815 (socket-accept *server-socket*))
816 (let ((session (make-session)))
817 (with-socket (session socket address port)
818 ;; XXX Move upwards options which BSD inherit for
819 ;; performance, but Linux unfortunately requires them to
820 ;; be here as it doesn't inherit options.
821 (setf (sockopt-keep-alive socket) t
822 (sockopt-receive-timeout socket) timeout
823 (sockopt-linger socket) 0
824 (sockopt-tcp-nodelay socket) t)
825 (let ((client-stream
826 (socket-make-stream socket
827 :input t
828 :output t
829 :buffering :full
830 :external-format
831 external-format))
832 (address-int (address-fixnum address)))
833 (with-stream (client-stream)
834 (with-climit (climit allowed-p reason address-int)
835 (let ((*connection*
836 (make-connection :fd (socket-file-descriptor
837 socket)
838 :socket socket
839 :stream client-stream
840 :address-vector address
841 :address-string
842 (address-string address)
843 :port port
844 :session session)))
845 (if allowed-p
846 (funcall (symbol-function serve-function)
847 *connection*)
848 (progn
849 (log-overflow *connection* reason)
850 (funcall (symbol-function overflow-function)
851 *connection*
852 reason))))))))))))
853 (mp:with-lock ((thread-node-lock node))
854 (setf (thread-node-status node) :dead))
855 nil)
856
857
858 (defun line-read (stream &key (max 1024))
859 "Reads a text line from STREAM. Lines are expected to be terminated
860 using NewLine (\n), and any trailing NewLine-Return (\r\n) are not
861 provided as part of the returned line string. Will read up to MAX
862 characters from the line (defaults to 1024).
863 If the EXTERNAL-FORMAT is UTF-8 and an invalid UTF-8 input sequence
864 is encountered, invalid octets will be imported as LATIN-1 characters,
865 in which case output will not preserve the original bytes.
866 To obtain literal bytes, use the LATIN-1 EXTERNAL-FORMAT."
867 (declare (type fixnum max))
868 (macrolet ((add-char (c orelse)
869 `(if (<= nchars max)
870 (progn
871 (vector-push-extend ,c line 1024)
872 (the fixnum (incf (the fixnum nchars))))
873 ,(if (eq :finish orelse)
874 '(loop-finish)
875 'nil))))
876 (loop
877 with line = (make-array max
878 :element-type 'character
879 :adjustable t
880 :fill-pointer 0)
881 with nchars of-type fixnum = 0
882 for c of-type character =
883 (handler-bind
884 ((ext:stream-decoding-error
885 #'(lambda (e)
886 (mapc #'(lambda (o)
887 (let ((c (code-char o)))
888 (if (char= #\Newline c)
889 (invoke-restart 'use-value c)
890 (add-char c :finish))))
891 (ext:character-decoding-error-octets e))
892 (invoke-restart 'continue)))
893 (simple-error
894 #'(lambda (e)
895 (declare (ignore e))
896 (error (make-condition 'end-of-file
897 :stream stream)))))
898 (read-char stream))
899 until (char= #\Newline c)
900 do (add-char c :finish)
901 finally (progn
902 (handler-case
903 (loop
904 for c of-type character = (vector-pop line)
905 while (member c '(#\Return #\Newline))
906 finally (add-char c :noop))
907 (simple-error () ; VECTOR-POP may error when string empty
908 nil))
909 (return line)))))