1 module lighttp.server.server;
2 
3 import std.string : toLower;
4 import std.system : Endian;
5 
6 import libasync;
7 
8 import xbuffer : Buffer, BufferOverflowException;
9 
10 import lighttp.server.router;
11 import lighttp.util;
12 
13 /**
14  * Options to define how the server behaves.
15  */
16 struct ServerOptions {
17 
18 	/**
19 	 * Name of the server set as value in the `Server` header field
20 	 * and displayed in lighttp's default error messages.
21 	 */
22 	string name = "lighttp/0.6";
23 
24 	/**
25 	 * Indicates whether the handler should catch exceptions.
26 	 * If set to true the server will return a `500 Internal Server Error`
27 	 * upon catching an exception.
28 	 */
29 	bool handleExceptions = true;
30 
31 	/**
32 	 * Indicates the maximum size for a payload. If the header
33 	 * `Content-Length` sent by the client exceeds the indicated
34 	 * length the server will return a `413 Payload too Large`.
35 	 */
36 	size_t max = size_t.max;
37 
38 }
39 
40 /**
41  * Base class for servers.
42  */
43 abstract class ServerBase {
44 	
45 	private ServerOptions _options;
46 	private EventLoop _eventLoop;
47 	private Router _router;
48 	
49 	this(EventLoop eventLoop, ServerOptions options=ServerOptions.init) {
50 		_options = options;
51 		_eventLoop = eventLoop;
52 		_router = new Router();
53 	}
54 	
55 	this(ServerOptions options=ServerOptions.init) {
56 		this(getThreadEventLoop(), options);
57 	}
58 
59 	/**
60 	 * Gets the server's options.
61 	 */
62 	@property ServerOptions options() pure nothrow @safe @nogc {
63 		return _options;
64 	}
65 	
66 	/**
67 	 * Gets the server's event loop. It should be used to
68 	 * run the server.
69 	 * Example:
70 	 * ---
71 	 * auto server = new Server();
72 	 * server.host("0.0.0.0");
73 	 * while(true) server.eventLoop.loop();
74 	 * ---
75 	 */
76 	@property EventLoop eventLoop() pure nothrow @safe @nogc {
77 		return _eventLoop;
78 	}
79 	
80 	/**
81 	 * Gets the server's router.
82 	 */
83 	@property Router router() pure nothrow @safe @nogc {
84 		return _router;
85 	}
86 	
87 	/**
88 	 * Gets the server's default port.
89 	 */
90 	abstract @property ushort defaultPort() pure nothrow @safe @nogc;
91 	
92 	/**
93 	 * Binds the server to the given address.
94 	 * Example:
95 	 * ---
96 	 * server.host("0.0.0.0");
97 	 * server.host("::1", 8080);
98 	 * ---
99 	 */
100 	void host(string ip, ushort port) {
101 		auto listener = new AsyncTCPListener(this.eventLoop);
102 		listener.host(ip, port);
103 		listener.run(&this.handler);
104 	}
105 	
106 	/// ditto
107 	void host(string ip) {
108 		return this.host(ip, this.defaultPort);
109 	}
110 	
111 	/**
112 	 * Calls eventLoop.loop until the given condition
113 	 * is true.
114 	 */
115 	void run(bool delegate() condition) {
116 		while(condition()) this.eventLoop.loop();
117 	}
118 	
119 	/**
120 	 * Calls eventLoop.loop in an infinite loop.
121 	 */
122 	void run() {
123 		while(true) this.eventLoop.loop();
124 	}
125 	
126 	abstract void delegate(TCPEvent) handler(AsyncTCPConnection conn);
127 	
128 }
129 
130 class ServerImpl(T:Connection, ushort _port) : ServerBase {
131 	
132 	this(E...)(E args) { //TODO remove when default constructors are implemented
133 		super(args);
134 	}
135 	
136 	override @property ushort defaultPort() {
137 		return _port;
138 	}
139 	
140 	override void delegate(TCPEvent) handler(AsyncTCPConnection conn) {
141 		Connection ret = new T(this, conn);
142 		return &ret.handle;
143 	}
144 	
145 }
146 
147 /**
148  * Default HTTP server.
149  * Example:
150  * ---
151  * auto server = new Server();
152  * server.host("0.0.0.0");
153  * server.loop();
154  * ---
155  */
156 alias Server = ServerImpl!(DefaultConnection, 80);
157 
158 private ubyte[] __buffer = new ubyte[2 ^^ 24]; // 16 mb
159 
160 class Connection {
161 	
162 	AsyncTCPConnection conn;
163 	
164 	protected Buffer buffer;
165 	
166 	void onStart() {}
167 
168 	protected bool log=false;
169 	
170 	final void handle(TCPEvent event) {
171 		switch(event) with(TCPEvent) {
172 			case READ:
173 				this.buffer.reset();
174 				while(true) {
175 					auto len = this.conn.recv(__buffer);
176 					if(len > 0) this.buffer.write(__buffer[0..len]);
177 					if(len < __buffer.length) break;
178 				}
179 				if (this.buffer.data!void.length > 0)
180 					this.onRead();
181 				break;
182 			case CLOSE:
183 				this.onClose();
184 				break;
185 			default:
186 				break;
187 		}
188 	}
189 	
190 	abstract void onRead();
191 	
192 	abstract void onClose();
193 	
194 }
195 
196 class DefaultConnection : Connection {
197 	
198 	private ServerBase server;
199 
200 	private void delegate() _handle;
201 	private void delegate(ref HandleResult, AsyncTCPConnection, ServerRequest, ServerResponse) _handleRoute;
202 	
203 	this(ServerBase server, AsyncTCPConnection conn) {
204 		this.buffer = new Buffer(4096);
205 		this.server = server;
206 		this.conn = conn;
207 		_handle = &this.handle;
208 		if(this.server.options.handleExceptions) _handleRoute = &this.handleRouteCatch;
209 		else _handleRoute = &this.handleRouteNoCatch;
210 	}
211 	
212 	override void onRead() {
213 		_handle();
214 	}
215 
216 	private void handleRouteCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) {
217 		try this.server.router.handle(this.server.options, result, client, req, res);
218 		catch(Exception) res.status = StatusCodes.internalServerError;
219 	}
220 
221 	private void handleRouteNoCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) {
222 		this.server.router.handle(this.server.options, result, client, req, res);
223 	}
224 
225 	void handle() {
226 		handleImpl(this.buffer.data!char);
227 	}
228 	
229 	protected void handleImpl(string data) {
230 		ServerRequest request = new ServerRequest();
231 		ServerResponse response = new ServerResponse();
232 		request.address = this.conn.local;
233 		response.headers["Server"] = this.server.options.name;
234 		HandleResult result;
235 		void delegate() send = {
236 			this.conn.send(cast(ubyte[])response.toString());
237 			auto connection = "connection" in response.headers;
238 			if(result.connection !is null) {
239 				_handle = &result.connection.onRead;
240 				result.connection.buffer = this.buffer;
241 				result.connection.onStart();
242 			} else if(connection is null || toLower(*connection) != "keep-alive") {
243 				this.conn.kill(true);
244 			}
245 		};
246 		if(request.parse(data)) {
247 			//TODO max request size
248 			if(auto connection = "connection" in request.headers) response.headers["Connection"] = *connection;
249 			_handleRoute(result, this.conn, request, response);
250 		} else {
251 			response.status = StatusCodes.badRequest;
252 		}
253 		if(response.status.code >= 400 && response.body_.length == 0) this.server.router.handleError(request, response);
254 		if(response.ready) send();
255 		else response.send = send;
256 	}
257 	
258 	override void onClose() {}
259 	
260 }
261 
262 class MultipartConnection : Connection {
263 	
264 	private size_t length;
265 	private Http req, res;
266 	void delegate() callback;
267 	
268 	this(AsyncTCPConnection conn, size_t length, Http req, Http res, void delegate() callback) {
269 		this.conn = conn;
270 		this.length = length;
271 		this.req = req;
272 		this.res = res;
273 		this.callback = callback;
274 	}
275 	
276 	override void onRead() {
277 		this.req.body_ = this.req.body_ ~ this.buffer.data!char.idup;
278 		import std.stdio;
279 		writeln("Body is not ", this.req.body_.length);
280 		if(this.req.body_.length >= this.length) {
281 			this.callback();
282 			this.conn.send(cast(ubyte[])res.toString());
283 			this.conn.kill(true);
284 		}
285 	}
286 	
287 	override void onClose() {}
288 	
289 }
290 
291 /**
292  * Base class for web socket clients.
293  */
294 class WebSocketConnection : Connection {
295 	
296 	void delegate() onStartImpl;
297 	
298 	this() {
299 		this.onStartImpl = {};
300 	}
301 	
302 	override void onStart() {
303 		this.onStartImpl();
304 	}
305 	
306 	override void onRead() {
307 		try if((this.buffer.read!ubyte() & 0b1111) == 1) {
308 			immutable info = this.buffer.read!ubyte();
309 			immutable masked = (info & 0b10000000) != 0;
310 			size_t length = info & 0b01111111;
311 			if(length == 0b01111110) {
312 				length = this.buffer.read!(Endian.bigEndian, ushort)();
313 			} else if(length == 0b01111111) {
314 				length = this.buffer.read!(Endian.bigEndian, ulong)() & size_t.max;
315 			}
316 			if(masked) {
317 				ubyte[] mask = this.buffer.read!(ubyte[])(4);
318 				ubyte[] data = this.buffer.read!(ubyte[])(length);
319 				foreach(i, ref ubyte p; data) {
320 					p ^= mask[i % 4];
321 				}
322 				this.onReceive(data);
323 			} else {
324 				this.onReceive(this.buffer.read!(ubyte[])(length));
325 			}
326 		} catch(BufferOverflowException) {}
327 	}
328 	
329 	/**
330 	 * Sends data to the connected web socket.
331 	 */
332 	void send(in void[] data) {
333 		this.buffer.reset();
334 		this.buffer.write!ubyte(0b10000001);
335 		if(data.length < 0b01111110) {
336 			this.buffer.write!ubyte(data.length & ubyte.max);
337 		} else if(data.length < ushort.max) {
338 			this.buffer.write!ubyte(0b01111110);
339 			this.buffer.write!(Endian.bigEndian, ushort)(data.length & ushort.max);
340 		} else {
341 			this.buffer.write!ubyte(0b01111111);
342 			this.buffer.write!(Endian.bigEndian, ulong)(data.length);
343 		}
344 		this.buffer.write(data);
345 		this.conn.send(this.buffer.data!ubyte);
346 	}
347 	
348 	/**
349 	 * Notifies that the client has sent some data.
350 	 */
351 	abstract void onReceive(ubyte[] data);
352 	
353 	/**
354 	 * Notifies that the connection has been interrupted.
355 	 */
356 	override abstract void onClose();
357 	
358 }