c616cf325f951f7191531f8f07ea3eaa7a1d70e8
[mmondor.git] / mmsoftware / cl / server / ecl-mp-server.lisp
1 ;;; $Id: ecl-mp-server.lisp,v 1.5 2011/08/13 06:48:13 mmondor Exp $
2
3 #|
4
5 Copyright (c) 2011, Matthew Mondor
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions
10 are met:
11 1. Redistributions of source code must retain the above copyright
12 notice, this list of conditions and the following disclaimer.
13 2. Redistributions in binary form must reproduce the above copyright
14 notice, this list of conditions and the following disclaimer in the
15 documentation and/or other materials provided with the distribution.
16
17 THIS SOFTWARE IS PROVIDED BY MATTHEW MONDOR ``AS IS'' AND ANY EXPRESS OR
18 IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 IN NO EVENT SHALL MATTHEW MONDOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
22 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
28 |#
29
30 ;;; ecl-mp-server.lisp - Simple library for TCP server applications
31
;;; XXX TODO XXX
;;; - File/syslog logging (currently only supports in-memory logging)
;;; - Maybe use reuseaddr/reuseport
;;; - We support a basic input timeout, but we do not yet support dropping a
;;;   client which remains active yet has not sent a request within a
;;;   reasonable amount of time.  Perhaps we could do that simply by
;;;   recording and checking the time at which input occurs...
39
40
(declaim (optimize (speed 3) (safety 1) (debug 3)))

;;; The DEFPACKAGE form below uses the packages provided by these two
;;; dependencies, so they must be available in all three situations:
;;; while compiling the file, while loading the compiled file, and while
;;; LOADing the source file directly (:EXECUTE).  Without :EXECUTE the
;;; body is skipped when the source is loaded uncompiled.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-bsd-sockets)
  (load "dlist"))


(defpackage :server
  (:use :cl :sb-bsd-sockets :dlist)
  (:export #:server-config
           #:make-server-config
           #:log-clear
           #:log-tail
           #:log-line
           #:server-init
           #:server-cleanup
           #:server-stat
           #:line-read
           #:address-string
           #:address-fixnum))

(in-package :server)
63
64
(defun noop (&rest args)
  "Accepts any number of arguments, ignores all of them and returns NIL.
Serves as the default for the function slots of SERVER-CONFIG."
  (declare (ignore args))
  (values nil))
68
69
(defstruct (server-config (:conc-name config-))
  "Settings consumed by SERVER-INIT and by the threads it starts.
Slot readers are named CONFIG-<slot>."
  ;; When true, LOG-ERROR appends a stack trace to the logged error.
  (stacktrace t :type boolean)
  ;; External format given to the stream created for each client socket.
  (external-format :utf-8 :type symbol)
  ;; Address and port which BIND-SOCKET binds the listening socket to.
  (listen-address "127.0.0.1" :type string)
  (listen-port 7777 :type fixnum)
  ;; Size of the in-memory log FIFO created by SERVER-INIT.
  (log-lines 1000 :type fixnum)
  ;; Value set as the receive timeout of every accepted socket.
  (input-timeout 60 :type fixnum)
  ;; Number of accept threads launched by SERVER-INIT.
  (children-initial 16 :type fixnum)
  ;; Thresholds read by CHILDREN-MANAGER: minimum and maximum number of
  ;; ready threads, upper bound on the total number of threads, and the
  ;; number of samples over which the ready count is averaged.
  (children-minspare 16 :type fixnum)
  (children-maxspare 16 :type fixnum)
  (children-maximum 64 :type fixnum)
  (children-avg-seconds 60 :type fixnum)
  ;; Per-address connection limit handed to the CLIMIT structure.
  (conn-per-addr 8 :type fixnum)
  ;; Called with (stream address port) for an accepted client.
  (serve-function #'noop :type function)
  ;; Called with (stream address port reason) for a refused client.
  (overflow-function #'noop :type function))
85
(defvar *config* nil
  "The SERVER-CONFIG currently in effect; assigned by SERVER-INIT.")
87
88
89 ;;; Various utility functions
90
(defun time-posix (&optional (ut (get-universal-time)))
  "Formats the universal time UT as a UTC time stamp string of the form
YYYYMMDDhhmmss-0000.  UT defaults to the current time."
  (destructuring-bind (sec min hr day mon yr &rest unused)
      (multiple-value-list (decode-universal-time ut 0))
    (declare (ignore unused))
    (format nil "~4,'0D~2,'0D~2,'0D~2,'0D~2,'0D~2,'0D-0000"
            yr mon day hr min sec)))
100
(defun time-rfc (&optional (unix-time nil))
  "Returns a time stamp string such as \"Thu, 01 Jan 1970 00:00:00 GMT\".
When UNIX-TIME is supplied it is converted to universal time by adding
2208988800; otherwise the current time is used."
  (let ((day-names #("Mon" "Tue" "Wed" "Thu" "Fri" "Sat" "Sun"))
        (month-names #("Jan" "Feb" "Mar" "Apr" "May" "Jun"
                       "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))
        (universal (if unix-time
                       (+ 2208988800 unix-time)
                       (get-universal-time))))
    (declare (type (simple-array string (7)) day-names)
             (type (simple-array string (12)) month-names))
    (destructuring-bind (sec min hr date mon yr weekday &rest unused)
        (multiple-value-list (decode-universal-time universal 0))
      (declare (ignore unused))
      ;; DECODE-UNIVERSAL-TIME numbers weekdays from 0 (Monday) and
      ;; months from 1, hence the 1- on the month index only.
      (format nil "~A, ~2,'0D ~A ~4,'0D ~2,'0D:~2,'0D:~2,'0D GMT"
              (svref day-names weekday)
              date (svref month-names (1- mon)) yr
              hr min sec))))
119
(defun address-string (addr)
  "Returns the four elements of the simple vector ADDR printed with ~A
and joined by dots, e.g. \"127.0.0.1\"."
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (type (simple-array t (4)) addr))
  (flet ((octet (index)
           (svref addr index)))
    (format nil "~A.~A.~A.~A"
            (octet 0) (octet 1) (octet 2) (octet 3))))
125
126 ;;; Macro to convert a multiple arguments operation into combined pairs,
127 ;;; which are more easily inlined to C.
(defmacro with-fixnum-reduce ((op) &rest args)
  "Expands (OP a b c ...) into left-nested two-argument calls of OP where
every operand and every intermediate result is wrapped in (THE FIXNUM ...)."
  (flet ((combine (left right)
           (list 'the 'fixnum
                 (list op
                       (list 'the 'fixnum left)
                       (list 'the 'fixnum right)))))
    (reduce #'combine args)))
132
(defun address-fixnum (addr)
  "Packs the four elements of the simple vector ADDR into one integer:
element 0 is shifted left by 24 bits, element 1 by 16, element 2 by 8 and
element 3 is used as is; the parts are combined with LOGIOR."
  (declare (type (simple-array t (4)) addr)
           (optimize (speed 3) (safety 0) (debug 0)))
  (macrolet ((shifted (index bits)
               `(ash (the fixnum (svref addr ,index)) ,bits)))
    (with-fixnum-reduce (logior)
      (shifted 0 24)
      (shifted 1 16)
      (shifted 2 8)
      (svref addr 3))))
141
142
143 ;;; Implementation of a simple thread-safe FIFO buffer with limited entries.
144
(defstruct fifo
  "Bounded first-in first-out buffer kept as a singly linked list."
  ;; First and last cons of the list of stored objects.
  (head '() :type list)
  (tail '() :type list)
  ;; COUNT is compared against SIZE by FIFO-APPEND to decide when the
  ;; oldest entry must be dropped.
  (count 0 :type fixnum)
  (size 0 :type fixnum)
  ;; Held by FIFO-APPEND and FIFO-CLEAR while they modify the buffer.
  (lock (mp:make-lock :name 'fifo-lock) :type mp:lock))
151
(defun fifo-append (fifo object)
  "Adds OBJECT at the tail of FIFO while holding its lock.  When the
entry count already equals the configured size, the oldest entry is
dropped first instead of incrementing the count.  Returns NIL."
  (declare (optimize (speed 3) (safety 0) (debug 0)))
  (mp:with-lock ((fifo-lock fifo))
    (cond ((= (the fixnum (fifo-count fifo)) (the fixnum (fifo-size fifo)))
           ;; Full: forget the oldest entry.
           (setf (fifo-head fifo) (rest (fifo-head fifo))))
          (t
           (incf (the fixnum (fifo-count fifo)))))
    (let ((cell (cons object nil)))
      ;; Either start a new list or link after the current tail.
      (if (null (fifo-head fifo))
          (setf (fifo-head fifo) cell)
          (setf (rest (fifo-tail fifo)) cell))
      (setf (fifo-tail fifo) cell)))
  nil)
168
(defun fifo-clear (fifo)
  "Empties FIFO while holding its lock: head and tail become the empty
list and the entry count is reset to zero.  Returns NIL."
  (mp:with-lock ((fifo-lock fifo))
    (setf (fifo-head fifo) '()
          (fifo-tail fifo) '()
          (fifo-count fifo) 0))
  nil)
176
177
178 ;;; Simple memory log implementation using the above FIFO
179
(defvar *log-buffer* nil
  "FIFO holding the in-memory log lines; created by SERVER-INIT.")
181
(defun log-line (fmt &rest args)
  "Appends one entry to *LOG-BUFFER*, prefixed with the TIME-POSIX stamp.
With ARGS, FMT is a FORMAT control string applied to them; without ARGS,
FMT is logged verbatim.  Returns NIL."
  (let ((message (if args
                     (apply #'format nil fmt args)
                     fmt)))
    (fifo-append *log-buffer* (format nil "~A ~A" (time-posix) message))
    nil))
188
(defun log-clear ()
  "Discards every entry currently held in *LOG-BUFFER*."
  (let ((buffer *log-buffer*))
    (fifo-clear buffer)))
191
(defun log-tail ()
  "Writes every entry of *LOG-BUFFER*, oldest first and one per line, to
*STANDARD-OUTPUT*.  Returns NIL."
  (dolist (entry (fifo-head *log-buffer*))
    (write-string (format nil "~A~%" entry))))
197
(defun log-connection (connect socket address port)
  "Logs a connect event when CONNECT is true, a disconnect event
otherwise, together with the peer ADDRESS, PORT and the file descriptor
of SOCKET."
  (let ((event (if connect
                   "Connect "
                   "Disconnect")))
    (log-line "! ~A: [~A:~A] #~A"
              event
              (address-string address)
              port
              (socket-file-descriptor socket))))
206
(defun log-overflow (socket address port reason)
  "Logs that the client at ADDRESS and PORT, connected on SOCKET, was
refused, along with REASON."
  (let* ((peer (address-string address))
         (descriptor (socket-file-descriptor socket)))
    (log-line "* Overflow : [~A:~A] #~A (~A)"
              peer port descriptor reason)))
213
214 ;;; Note that DEBUG level must be at 3 for this to work.
(defun log-stacktrace ()
  "Returns a string listing the functions found on ECL's invocation
history stack, one \" in NAME\" line per frame, walking from two frames
below the top of the stack down to (but excluding) the frame recorded in
SI::*IHS-CURRENT*."
  (let ((top (si:ihs-top))
        (current si::*ihs-current*))
    (with-output-to-string (s)
      ;; The header takes no arguments; the control string has no
      ;; directives.
      (format s " Stack trace:~%")
      ;; Subtract LOG-STACKTRACE, LOG-ERROR and avoid printing the last
      ;; node which is always NIL.
      (loop
        for i downfrom (- top 2) above current
        do
           (format s " in ~A~%"
                   (let ((f (si::ihs-fun i)))
                     ;; Print a name rather than the function object
                     ;; whenever one can be obtained.
                     (typecase f
                       (generic-function (clos:generic-function-name f))
                       (function (ext:compiled-function-name f))
                       (t f))))))))
231
(defun log-error (e)
  "Logs the type and printed form of condition E, followed by a stack
trace when the STACKTRACE option of *CONFIG* is true."
  (let ((suffix ""))
    (when (config-stacktrace *config*)
      (setf suffix (log-stacktrace)))
    (log-line "# Error of type ~S: ~A~A"
              (type-of e) e suffix)))
238
239
240 ;;; Connection limits management
241
(defstruct climit
  "Bookkeeping for the global and per-address connection limits."
  ;; Number of connections currently accounted for.
  (connections 0 :type fixnum)
  ;; Maps an address (as returned by ADDRESS-FIXNUM) to a cons whose CAR
  ;; is the number of connections from that address.
  (table (make-hash-table :test 'eql) :type hash-table)
  ;; Limits enforced by CLIMIT-ADD.
  (max-total 64 :type fixnum)
  (max-address 8 :type fixnum)
  ;; Held by CLIMIT-ADD and CLIMIT-REMOVE while they update the counters.
  (lock (mp:make-lock :name 'climit-lock) :type mp:lock))
248
(defun climit-add (climit address)
  "Tries to account for one more connection from ADDRESS in CLIMIT.
Returns two values: T and :SUCCESS when the connection was counted,
NIL and :MAX-TOTAL when the global limit is already reached, or NIL and
:MAX-ADDRESS when the limit for ADDRESS is already reached."
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (type fixnum address))
  (let ((verdict
          (mp:with-lock ((climit-lock climit))
            (if (>= (the fixnum (climit-connections climit))
                    (the fixnum (climit-max-total climit)))
                :max-total
                (let* ((table (climit-table climit))
                       (entry (gethash address table)))
                  (cond ((and entry
                              (>= (the fixnum (car entry))
                                  (the fixnum (climit-max-address climit))))
                         :max-address)
                        (t
                         ;; The per-address counter lives in the CAR of a
                         ;; cons so that it can be updated in place.
                         (if entry
                             (incf (the fixnum (car entry)))
                             (setf (gethash address table) (cons 1 nil)))
                         (incf (the fixnum (climit-connections climit)))
                         :success)))))))
    (values (eq verdict :success) verdict)))
269
(defun climit-remove (climit address)
  "Undoes one successful CLIMIT-ADD for ADDRESS: decrements the counter
of that address, removing its table entry once it reaches zero, and
decrements the global connection count.  Returns NIL."
  (declare (optimize (speed 3) (safety 0) (debug 0))
           (type fixnum address))
  (mp:with-lock ((climit-lock climit))
    (let* ((table (climit-table climit))
           (entry (gethash address table))
           (remaining (decf (the fixnum (car entry)))))
      (declare (type fixnum remaining))
      (when (zerop remaining)
        (remhash address table)))
    (decf (the fixnum (climit-connections climit))))
  nil)
282
283
(defun bind-socket ()
  "Creates a TCP stream socket, binds it to the listen address and port
of *CONFIG* and puts it in listening state with a backlog equal to the
configured maximum number of children.  Returns the listening socket.
If any step signals a condition, the socket is closed before the
condition propagates so that its file descriptor is not leaked."
  (let ((server-socket (make-instance 'inet-socket
                                      :type :stream
                                      :protocol :tcp))
        (ready nil))
    (unwind-protect
         (progn
           ;; XXX Do necessary setsockopt(2) calls which BSD inherits
           (setf (sockopt-tcp-nodelay server-socket) t)
           (socket-bind server-socket
                        (make-inet-address (config-listen-address *config*))
                        (config-listen-port *config*))
           (socket-listen server-socket (config-children-maximum *config*))
           (setf ready t)
           server-socket)
      ;; Only reached with READY still NIL on a non-local exit above.
      (unless ready
        (ignore-errors (socket-close server-socket))))))
295
296
;;; System which allows the thread-manager thread and REPL commands to
;;; query the state of worker threads and manage them.
;;; We insert one node per thread; each thread keeps a reference to its
;;; own node so that it can update its state.
;;; This also allows the parent thread to notify a thread that it should
;;; quit, but only when done serving its client, by setting the status to
;;; :QUIT.  In that case the child may set its status to :DEAD so that the
;;; thread-manager thread may free that object.
305
(defstruct thread-node
  "Per-thread record shared between a worker thread and its manager."
  ;; Thread object so parent may kill it
  thread
  ;; One of :init :ready :busy :quit :dead
  (status :init :type symbol)
  ;; Incremented by WITH-SOCKET for every connection handled.
  (connections 0 :type integer))
310
;;; Although children threads don't access this list directly (only their
;;; own specific node object), the thread-manager thread as well as the
;;; user REPL commands in SWANK may concurrently access it, so use a lock.
(defvar *threads-lock* (mp:make-lock :name 'threads-lock)
  "Lock held while *THREADS-LIST* is traversed or modified.")
(defvar *threads-list* (make-dlist)
  "Doubly linked list with one THREAD-NODE per worker thread.")
(defvar *thread-node* nil
  "Locally bound by threads to their own THREAD-NODE.")

(defvar *manager-thread* nil
  "Thread running CHILDREN-MANAGER-THREAD; set by SERVER-INIT.")

(defvar *server-socket* -1
  "Listening socket set by SERVER-INIT; reset to -1 by SERVER-CLEANUP.")
(defvar *climit* nil
  "CLIMIT structure created by SERVER-INIT.")
322
(defun server-init (&optional (config (make-server-config)))
  "Starts the server described by CONFIG, a SERVER-CONFIG.
Installs CONFIG as *CONFIG*, ignores SIGPIPE, binds the listening
socket, creates the log buffer and the connection limits, starts the
manager thread and launches the configured initial number of accept
threads.  Returns T."
  (check-type config server-config)
  (setf *config* config)
  #-:SWANK
  (unless (eq (config-external-format *config*) :utf-8)
    (setf (stream-external-format *standard-output*) '(:LATIN-1 :LF)))
  (ext:catch-signal ext:+sigpipe+ :ignore)
  (setf *server-socket* (bind-socket)
        *log-buffer* (make-fifo :size (config-log-lines *config*))
        *climit* (make-climit
                  :max-total (config-children-maximum *config*)
                  :max-address (config-conn-per-addr *config*))
        *manager-thread* (mp:process-run-function
                          'manager-thread #'children-manager-thread))
  (mp:with-lock (*threads-lock*)
    (dotimes (i (config-children-initial *config*))
      (declare (ignorable i))
      ;; Each worker receives its own node; the node is registered in
      ;; the list once the thread object has been stored in it.
      (let* ((node (make-thread-node))
             (link (dnode-alloc node)))
        (setf (thread-node-thread node)
              (mp:process-run-function 'accept-thread
                                       #'accept-loop-thread
                                       node))
        (dlist-append *threads-list* link))))
  t)
348
(defun server-cleanup ()
  "Stops the server: kills the manager thread, kills and unlinks every
worker thread, clears the log and closes the listening socket.  Any
condition signalled by one of these steps is discarded.  Returns T."
  (macrolet ((quietly (&body forms)
               ;; Run FORMS, discarding any condition they signal.
               `(handler-case
                    (progn ,@forms)
                  (t ()))))
    (quietly
     (mp:process-kill *manager-thread*)
     (setf *manager-thread* nil))
    (let ((workers *threads-list*))
      (mp:with-lock (*threads-lock*)
        (do-dlist (link workers)
          (let ((node (dnode-object link)))
            (quietly
             (mp:process-kill (thread-node-thread node))
             (dlist-unlink workers link))))))
    (log-clear)
    (quietly
     (socket-close *server-socket*)
     (setf *server-socket* -1)))
  t)
371
372
(defun server-stat ()
  "Returns a property list (:TOTAL :READY :BUSY :DEAD :CONNECTIONS)
summarizing the worker threads.  :CONNECTIONS is the sum of the
per-thread connection counters.  Nodes found in the :DEAD state are
counted and also unlinked from *THREADS-LIST* as a side effect."
  (let ((workers *threads-list*)
        (total 0)
        (ready 0)
        (busy 0)
        (dead 0)
        (connections 0))
    (declare (type fixnum dead ready busy total))
    (mp:with-lock (*threads-lock*)
      (do-dlist (link workers)
        (let ((node (dnode-object link)))
          (incf total)
          (incf connections (thread-node-connections node))
          (case (thread-node-status node)
            (:dead
             (dlist-unlink workers link)
             (incf dead))
            (:ready
             (incf ready))
            (:busy
             (incf busy))))))
    (list :total total
          :ready ready
          :busy busy
          :dead dead
          :connections connections)))
399
400
(defmacro with-log-errors (&body body)
  "Evaluates BODY with *DEBUGGER-HOOK* bound to a function which passes
the condition to LOG-ERROR and then makes the whole form return NIL
instead of entering the debugger.  Otherwise returns what BODY returns."
  ;; An uninterned symbol names the block: it cannot clash with block
  ;; names used in BODY and, unlike an interned keyword, does not leave a
  ;; new symbol behind in a package at every macroexpansion.
  (let ((s-block (gensym "BLOCK")))
    `(block ,s-block
       (let ((*debugger-hook* #'(lambda (condition hook)
                                  (declare (ignore hook))
                                  (log-error condition)
                                  (return-from ,s-block nil))))
         ,@body))))
409
410
411 ;;; Automatic worker threads pool manager
412
(defvar *maximum-children-reached* nil
  "Set to T by CHILDREN-MANAGER once it has logged that the maximum
number of children was reached.")

(defvar *ready-avg* 0
  "Sum of the ready-thread counts sampled by CHILDREN-MANAGER.")
(defvar *ready-avg-cnt* 0
  "Number of samples accumulated in *READY-AVG*.")
417
(defun children-manager ()
  "Performs one maintenance pass over the worker thread pool while
holding *THREADS-LOCK*:
 1. unlinks nodes whose status is :DEAD and counts :READY and :BUSY ones;
 2. when fewer than CHILDREN-MINSPARE threads are ready, launches new
    accept threads without exceeding CHILDREN-MAXIMUM in total;
 3. adds the ready count to a running sum and, once CHILDREN-AVG-SECONDS
    samples were gathered, kills as many ready threads as the average
    exceeds CHILDREN-MAXSPARE, then restarts the averaging.
Returns T."
  (let* ((threads-list *threads-list*)
         (dead 0)
         (ready 0)
         (busy 0)
         (config *config*)
         (children-minspare (config-children-minspare config))
         (children-maxspare (config-children-maxspare config))
         (children-maximum (config-children-maximum config))
         (children-avg-seconds (config-children-avg-seconds config)))
    (declare (type fixnum dead ready busy
                   children-minspare children-maxspare children-maximum
                   children-avg-seconds))
    (mp:with-lock (*threads-lock*)
      (let ((total (dlist-nodes threads-list)))
        (declare (type fixnum total))
        (do-dlist (n threads-list)
          (let ((status (thread-node-status (dnode-object n))))
            (cond ((eq :dead status)
                   (dlist-unlink threads-list n)
                   (incf dead))
                  ((eq :ready status)
                   (incf ready))
                  ((eq :busy status)
                   (incf busy)))))

        ;; More children needed?  Launch them now if allowed.
        (when (< ready children-minspare)
          (loop
            repeat (the fixnum (- children-minspare ready))
            while (< total children-maximum)
            do
               (let* ((node (make-thread-node))
                      (n (dnode-alloc node))
                      (thread (mp:process-run-function 'accept-thread
                                                       #'accept-loop-thread
                                                       node)))
                 (setf (thread-node-thread node) thread)
                 (dlist-append threads-list n)
                 (incf total)))
          ;; Only log the first time the limit is hit.
          (when (and (not *maximum-children-reached*)
                     (= total children-maximum))
            (setf *maximum-children-reached* t)
            (log-line "* Maximum number of children reached (~A)" total)))

        ;; Determine if we can safely kill children which have not been
        ;; in use for some time.  To do this we maintain average
        ;; statistics to avoid constantly spawning and killing threads.
        ;; The average calculation is spread over children-avg-seconds
        ;; (the number of samples).
        ;; The PLUSP test avoids a division by zero when
        ;; children-avg-seconds is 0, and >= still triggers if the sample
        ;; count ever exceeds the configured value.
        (if (and (plusp *ready-avg-cnt*)
                 (>= *ready-avg-cnt* children-avg-seconds))
            (progn
              (setf *ready-avg* (floor *ready-avg* *ready-avg-cnt*))
              (let ((overflow (- *ready-avg* children-maxspare)))
                (when (plusp overflow)
                  ;; An explicit block is used to leave the traversal so
                  ;; that this does not depend on how DO-DLIST expands.
                  (block reap
                    (do-dlist (n threads-list)
                      (with-accessors ((status thread-node-status)
                                       (thread thread-node-thread))
                          (dnode-object n)
                        (when (eq :ready status)
                          (setf status :quit)
                          (handler-case
                              (mp:process-kill thread)
                            (t ()))
                          (dlist-unlink threads-list n)
                          ;; Count only threads actually reaped, so that
                          ;; busy nodes do not use up the quota.
                          (when (zerop (decf overflow))
                            (return-from reap))))))))
              (setf *ready-avg* 0
                    *ready-avg-cnt* 0))
            (progn
              (incf *ready-avg* ready)
              (incf *ready-avg-cnt*))))))
  t)
491
;;; Since we would otherwise need something like setitimer(2), and since we
;;; want to leave the main thread free for the interactive REPL and
;;; optionally SWANK, simply use a thread for the children threads pool
;;; manager.
(defun children-manager-thread ()
  "Body of the manager thread: runs CHILDREN-MANAGER once per second,
forever, logging instead of propagating any error it signals.
The averaging state *READY-AVG* and *READY-AVG-CNT* is rebound for the
duration of the thread so that it starts from zero."
  ;; Both names must match the DEFVARs exactly for the bindings to be
  ;; the dynamic ones which CHILDREN-MANAGER reads and updates.
  (let ((*ready-avg* 0)
        (*ready-avg-cnt* 0))
    (loop
      do
         (with-log-errors
           (sleep 1)
           (children-manager))))
  nil)
504
505
506 ;;; Makes sure that supplied SOCKET gets closed, that Connect/Disconnect
507 ;;; log entries always exist and match, and that status matches.
(defmacro with-socket ((socket address port) &body body)
  "Evaluates BODY for a client connected on SOCKET from ADDRESS and PORT
\(each evaluated once).  Before BODY, the connection counter of
*THREAD-NODE* is incremented, a :READY status becomes :BUSY and a connect
entry is logged.  Whether BODY returns normally or not, a disconnect
entry is logged, a :BUSY status becomes :READY again and SOCKET is
closed, ignoring any condition signalled while closing.
Returns the values of BODY."
  ;; Every name introduced by the expansion is uninterned so that BODY
  ;; cannot accidentally refer to, or be shadowed by, one of them.
  (let ((s-socket (gensym "SOCKET"))
        (s-address (gensym "ADDRESS"))
        (s-port (gensym "PORT"))
        (s-status (gensym "STATUS"))
        (s-connections (gensym "CONNECTIONS")))
    `(let ((,s-socket ,socket)
           (,s-address ,address)
           (,s-port ,port))
       (with-accessors ((,s-status thread-node-status)
                        (,s-connections thread-node-connections))
           *thread-node*
         (unwind-protect
              (progn
                (incf ,s-connections)
                (when (eq :ready ,s-status)
                  (setf ,s-status :busy))
                (log-connection t ,s-socket ,s-address ,s-port)
                ,@body)
           (log-connection nil ,s-socket ,s-address ,s-port)
           ;; Leave any other status (such as :QUIT) untouched.
           (when (eq :busy ,s-status)
             (setf ,s-status :ready))
           (handler-case
               (socket-close ,s-socket)
             (t ()
               nil)))))))
531
532 ;;; Makes sure to close supplied STREAM.
(defmacro with-stream ((stream) &body body)
  "Evaluates BODY and returns its values; STREAM (evaluated once, before
BODY) is closed afterwards however BODY exits.  Any condition signalled
while closing is discarded."
  (let ((target (gensym "STREAM")))
    `(let ((,target ,stream))
       (unwind-protect
            (progn ,@body)
         (handler-case (close ,target)
           (t () nil))))))
543
544 ;;; Makes sure to match successful CLIMIT-ADD calls with CLIMIT-REMOVE ones.
(defmacro with-climit ((climit-var allowed-p-var reason-var address-int-var)
                       &body body)
  "Calls CLIMIT-ADD on CLIMIT-VAR and ADDRESS-INT-VAR, binds its two
values to ALLOWED-P-VAR and REASON-VAR and evaluates BODY.  When the add
succeeded, the matching CLIMIT-REMOVE is performed however BODY exits."
  (let ((acquire `(climit-add ,climit-var ,address-int-var))
        (release `(climit-remove ,climit-var ,address-int-var)))
    `(multiple-value-bind (,allowed-p-var ,reason-var)
         ,acquire
       (unwind-protect
            (progn
              ,@body)
         (when ,allowed-p-var
           ,release)))))
554
555
556 ;;; The main loop of our worker threads. Accepts and serves connections
557 ;;; until told to exit or killed. On OSs where this is necessary, uses
558 ;;; *ACCEPT-LOCK* so that one thread at most is actually accept(3)-blocking
559 ;;; on the file descriptor, while other threads block waiting for the lock
560 ;;; to become available.
561
#-netbsd
(defvar *accept-lock* (mp:make-lock :name 'accept-lock)
  "Lock held by a worker thread around its call to SOCKET-ACCEPT.")
563
(defun accept-loop-thread (node)
  "Body of a worker thread owning the THREAD-NODE NODE.
Marks NODE :READY, then repeatedly accepts a connection on
*SERVER-SOCKET* and serves it until the status of NODE is found to be
:QUIT, at which point NODE is marked :DEAD.  An accepted client is given
to the configured serve function when CLIMIT-ADD allows it; otherwise
the refusal is logged and the configured overflow function is called.
Errors signalled while handling a client are logged.  Returns NIL."
  (setf (thread-node-status node) :ready)
  (let* ((config *config*)
         ;; Dynamic binding: WITH-SOCKET updates the node through it.
         (*thread-node* node)
         (timeout (config-input-timeout config))
         (external-format (config-external-format config))
         (serve-function (config-serve-function config))
         (overflow-function (config-overflow-function config))
         (climit *climit*))
    (declare (type fixnum timeout))
    (loop
      (when (eq :quit (thread-node-status node))
        (return))
      (with-log-errors
        (multiple-value-bind (socket address port)
            #-netbsd (mp:with-lock (*accept-lock*)
                       (socket-accept *server-socket*))
            #+netbsd (socket-accept *server-socket*)
          (with-socket (socket address port)
            (setf (sockopt-keep-alive socket) t
                  (sockopt-receive-timeout socket) timeout)
            (let* ((client-stream
                     (socket-make-stream socket
                                         :input t
                                         :output t
                                         :buffering :full
                                         :external-format external-format))
                   (address-int (address-fixnum address)))
              (with-stream (client-stream)
                (with-climit (climit allowed-p reason address-int)
                  (cond (allowed-p
                         (funcall serve-function client-stream address port))
                        (t
                         (log-overflow socket address port reason)
                         (funcall overflow-function client-stream
                                  address port reason)))))))))))
  (setf (thread-node-status node) :dead)
  nil)
601
602
(defun line-read (stream)
  "Reads a text line from STREAM and returns it as a string with a fill
pointer.  Lines are expected to be terminated by a Newline (LF); the
terminator and any Return or Newline characters directly preceding it
\(such as the CR of a CR LF pair) are not part of the returned string.
At most 512 characters are kept; further characters of a longer line
are read and silently discarded.
If the EXTERNAL-FORMAT is UTF-8 and an invalid UTF-8 input sequence
is encountered, invalid octets will be imported as LATIN-1 characters,
in which case output will not preserve the original bytes.
To obtain literal bytes, use the LATIN-1 EXTERNAL-FORMAT.
A SIMPLE-ERROR signalled while reading is converted to END-OF-FILE."
  (let ((line (make-array 512
                          :element-type 'character
                          :adjustable nil
                          :fill-pointer 0)))
    (flet ((add-char (c)
             ;; VECTOR-PUSH returns NIL without storing once LINE is full.
             (vector-push c line))
           (strip-line-terminators ()
             ;; Explicitly test for an empty string rather than relying
             ;; on the error VECTOR-POP would signal.
             (loop
               while (and (plusp (fill-pointer line))
                          (member (aref line (1- (fill-pointer line)))
                                  '(#\Return #\Newline)))
               do (decf (fill-pointer line)))))
      (loop
        (let ((c (handler-bind
                     ((ext:stream-decoding-error
                        #'(lambda (e)
                            ;; Import the offending octets one by one;
                            ;; a Newline among them is handed back to
                            ;; READ-CHAR so that it terminates the line.
                            (mapc #'(lambda (o)
                                      (let ((c (code-char o)))
                                        (if (char= #\Newline c)
                                            (invoke-restart 'use-value c)
                                            (add-char c))))
                                  (ext:character-decoding-error-octets e))
                            (invoke-restart 'continue)))
                      (simple-error
                        #'(lambda (e)
                            (declare (ignore e))
                            (error (make-condition 'end-of-file
                                                   :stream stream)))))
                   (read-char stream))))
          (declare (type character c))
          ;; Terminate loop and return LINE upon Newline
          (when (char= #\Newline c)
            (strip-line-terminators)
            (return line))
          (add-char c))))))
645 (return line))
646 (add-char c))))))