1 module lighttp.server;
2 
3 import std.string : toLower;
4 import std.system : Endian;
5 
6 import libasync;
7 
8 import xbuffer;
9 import xbuffer.memory : xalloc, xfree;
10 
11 import lighttp.router;
12 import lighttp.util;
13 
/**
 * Options to define how the server behaves.
 * Every field has a default, so `ServerOptions.init` is a usable
 * configuration.
 */
struct ServerOptions {

	/**
	 * Name of the server set as value in the `Server` header field
	 * and displayed in lighttp's default error messages.
	 */
	string name = "lighttp/0.5";

	/**
	 * Indicates whether the handler should catch exceptions.
	 * If set to true the server will return a `500 Internal Server Error`
	 * upon catching an exception thrown while routing a request.
	 * If set to false the exception is not caught by the connection and
	 * propagates to its caller.
	 */
	bool handleExceptions = true;

	/**
	 * Indicates the maximum size for a payload. If the header
	 * `Content-Length` sent by the client exceeds the indicated
	 * length the server will return a `413 Payload too Large`.
	 * The default (`size_t.max`) effectively means "no limit".
	 * NOTE(review): the default connection in this module still carries a
	 * `TODO max request size`; confirm that the router enforces this value.
	 */
	size_t max = size_t.max;

}
40 
/**
 * Common base of every server.
 * It owns the options, the event loop and the router, binds listeners
 * to addresses and offers helpers to drive the event loop. Subclasses
 * supply the default port and the per-connection event handler.
 */
abstract class ServerBase {

	private ServerOptions _options;
	private EventLoop _eventLoop;
	private Router _router;

	/**
	 * Creates a server that runs on the given event loop.
	 * A new, empty router is created for the server.
	 */
	this(EventLoop eventLoop, ServerOptions options=ServerOptions.init) {
		_eventLoop = eventLoop;
		_options = options;
		_router = new Router();
	}

	/**
	 * Creates a server that runs on the event loop returned by
	 * `getThreadEventLoop`.
	 */
	this(ServerOptions options=ServerOptions.init) {
		this(getThreadEventLoop(), options);
	}

	/**
	 * Gets a copy of the options the server was created with.
	 */
	@property ServerOptions options() pure nothrow @safe @nogc {
		return _options;
	}

	/**
	 * Gets the event loop the server's listeners are attached to.
	 * The loop must be run for the server to do any work.
	 * Example:
	 * ---
	 * auto server = new Server();
	 * server.host("0.0.0.0");
	 * while(true) server.eventLoop.loop();
	 * ---
	 */
	@property EventLoop eventLoop() pure nothrow @safe @nogc {
		return _eventLoop;
	}

	/**
	 * Gets the router used to dispatch the requests.
	 */
	@property Router router() pure nothrow @safe @nogc {
		return _router;
	}

	/**
	 * Gets the port used by `host` when none is given.
	 */
	abstract @property ushort defaultPort() pure nothrow @safe @nogc;

	/**
	 * Starts listening on the given address and port. Every accepted
	 * connection gets its event handler from `handler`.
	 * Example:
	 * ---
	 * server.host("0.0.0.0");
	 * server.host("::1", 8080);
	 * ---
	 */
	void host(string ip, ushort port) {
		auto tcpListener = new AsyncTCPListener(this.eventLoop);
		tcpListener.host(ip, port);
		tcpListener.run(&this.handler);
	}

	/// ditto
	void host(string ip) {
		this.host(ip, this.defaultPort);
	}

	/**
	 * Keeps calling eventLoop.loop for as long as the given
	 * condition returns true. The condition is evaluated before
	 * every iteration.
	 */
	void run(bool delegate() condition) {
		for(;;) {
			if(!condition()) break;
			this.eventLoop.loop();
		}
	}

	/**
	 * Calls eventLoop.loop forever; this overload never returns normally.
	 */
	void run() {
		for(;;) this.eventLoop.loop();
	}

	/**
	 * Creates the event handler for a newly accepted connection.
	 */
	abstract void delegate(TCPEvent) handler(AsyncTCPConnection conn);

}
130 
/**
 * Server whose connections are instances of `T` and whose default
 * port is `_port`.
 * `T` must be constructible from `(ServerBase, AsyncTCPConnection)`.
 */
class ServerImpl(T:Connection, ushort _port) : ServerBase {

	/**
	 * Forwards every argument to the matching `ServerBase` constructor.
	 */
	this(E...)(E args) { //TODO remove when default constructors are implemented
		super(args);
	}

	/**
	 * Returns the port given as template argument.
	 */
	override @property ushort defaultPort() {
		return _port;
	}

	/**
	 * Builds a new `T` for the accepted connection and returns its
	 * `Connection.handle` as the event handler.
	 */
	override void delegate(TCPEvent) handler(AsyncTCPConnection conn) {
		// Deliberately typed as the base class: `T` may declare its own
		// `handle` overload (DefaultConnection does), which would hide
		// `Connection.handle(TCPEvent)` if looked up through `T`.
		Connection accepted = new T(this, conn);
		return &accepted.handle;
	}

}
147 
/**
 * Default HTTP server: uses `DefaultConnection` for accepted
 * connections and 80 as default port.
 * Example:
 * ---
 * auto server = new Server();
 * server.host("0.0.0.0");
 * server.run();
 * ---
 */
alias Server = ServerImpl!(DefaultConnection, 80);
158 
// Scratch buffer handed to `recv` by `Connection.handle`; every chunk
// received is copied from here into the connection's own `Buffer`.
// Declared without `shared`/`__gshared`.
private ubyte[] __buffer = new ubyte[2 ^^ 24]; // 2^24 bytes = 16 MiB
160 
/**
 * Base class for objects that react to the events of a TCP connection.
 * `handle` turns libasync events into calls to `onRead` and `onClose`,
 * which subclasses must implement.
 */
class Connection {

	/// Underlying TCP connection.
	AsyncTCPConnection conn;

	/// Holds the bytes received during the last READ event.
	protected Buffer buffer;

	/**
	 * Hook that does nothing by default. In this module it is called
	 * on a connection right after it replaces the default one.
	 */
	void onStart() {}

	protected bool log=false;

	/**
	 * Dispatches a TCP event.
	 * On READ the buffer is cleared, filled with everything that can
	 * currently be received and `onRead` is called. On CLOSE `onClose`
	 * is called. Every other event is ignored.
	 */
	final void handle(TCPEvent event) {
		if(event == TCPEvent.READ) {
			this.buffer.reset();
			for(;;) {
				const received = this.conn.recv(__buffer);
				if(received > 0) this.buffer.write(__buffer[0..received]);
				// a chunk that did not fill the scratch buffer is the last one
				if(received < __buffer.length) break;
			}
			this.onRead();
		} else if(event == TCPEvent.CLOSE) {
			this.onClose();
		}
	}

	/**
	 * Called after the received data has been stored in `buffer`.
	 */
	abstract void onRead();

	/**
	 * Called when a CLOSE event is dispatched.
	 */
	abstract void onClose();

}
195 
/**
 * Connection used by the default `Server`.
 * It parses the received data as an HTTP request, lets the server's
 * router handle it and sends the response. When the router hands back
 * a replacement connection (see `HandleResult.connection`) every later
 * READ and CLOSE event is forwarded to that connection.
 */
class DefaultConnection : Connection {
	
	private ServerBase server;

	// Invoked on every READ event: starts as `handle` and is swapped for the
	// replacement connection's `onRead` once the router provides one.
	private void delegate() _handle;
	// Routing strategy chosen once from `ServerOptions.handleExceptions`.
	private void delegate(ref HandleResult, AsyncTCPConnection, ServerRequest, ServerResponse) _handleRoute;

	// Connection that took over this socket, or null while plain HTTP is served.
	// Kept so that CLOSE events can be forwarded to it.
	private Connection _upgraded;
	
	/**
	 * Creates the connection with its own 4096-byte initial buffer.
	 */
	this(ServerBase server, AsyncTCPConnection conn) {
		this.buffer = new Buffer(4096);
		this.server = server;
		this.conn = conn;
		_handle = &this.handle;
		if(this.server.options.handleExceptions) _handleRoute = &this.handleRouteCatch;
		else _handleRoute = &this.handleRouteNoCatch;
	}
	
	/**
	 * Forwards the READ event to the current handler.
	 */
	override void onRead() {
		_handle();
	}

	// Routes the request and converts any `Exception` into a 500 status.
	private void handleRouteCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) {
		try this.server.router.handle(this.server.options, result, client, req, res);
		catch(Exception) res.status = StatusCodes.internalServerError;
	}

	// Routes the request and lets exceptions propagate to the caller.
	private void handleRouteNoCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) {
		this.server.router.handle(this.server.options, result, client, req, res);
	}

	/**
	 * Handles the content of the buffer as an HTTP request.
	 */
	void handle() {
		handleImpl(this.buffer.data!char);
	}
	
	/**
	 * Parses `data` as a request, routes it and sends the response.
	 * A request that cannot be parsed is answered with `400 Bad Request`.
	 * Unless the response carries `Connection: keep-alive` or the router
	 * provided a replacement connection, the TCP connection is killed
	 * afterwards.
	 */
	protected void handleImpl(string data) {
		ServerRequest request = new ServerRequest();
		ServerResponse response = new ServerResponse();
		// NOTE(review): this is the local endpoint of the socket; confirm
		// whether the peer address was intended here.
		request.address = this.conn.local;
		response.headers["Server"] = this.server.options.name;
		HandleResult result;
		if(request.parse(data)) {
			//TODO max request size
			// mirror the client's `Connection` header in the response
			if(auto connection = "connection" in request.headers) response.headers["Connection"] = *connection;
			_handleRoute(result, this.conn, request, response);
		} else {
			response.status = StatusCodes.badRequest;
		}
		// let the router fill in a body for error responses that have none
		if(response.status.code >= 400 && response.body_.length == 0) this.server.router.handleError(request, response);
		if(response.ready) this.conn.send(cast(ubyte[])response.toString());
		auto connection = "connection" in response.headers;
		if(result.connection !is null) {
			// hand the socket over: later READ and CLOSE events go to the new connection
			_upgraded = result.connection;
			_handle = &result.connection.onRead;
			result.connection.buffer = this.buffer;
			result.connection.onStart();
		} else if(connection is null || toLower(*connection) != "keep-alive") {
			this.conn.kill();
		}
	}
	
	/**
	 * Forwards the CLOSE event to the replacement connection, if any.
	 * Without this a connection that took over the socket (for example a
	 * web socket) would never have its `onClose` called, because the
	 * event handler registered for the socket is the one of this object.
	 */
	override void onClose() {
		if(_upgraded !is null) _upgraded.onClose();
	}
	
}
257 
/**
 * Connection that keeps appending the received data to the body of a
 * request until the expected length has been reached, then runs the
 * callback, sends the response and closes the connection.
 */
class MultipartConnection : Connection {
	
	// Number of body bytes to wait for before completing the request.
	private size_t length;
	private Http req, res;
	/// Called once the whole body has been received, before the response is sent.
	void delegate() callback;
	
	/**
	 * Params:
	 *   conn = connection the response is sent on
	 *   length = total body length to wait for
	 *   req = request whose body is being collected
	 *   res = response sent once the body is complete
	 *   callback = called when the body is complete, before `res` is sent
	 */
	this(AsyncTCPConnection conn, size_t length, Http req, Http res, void delegate() callback) {
		this.conn = conn;
		this.length = length;
		this.req = req;
		this.res = res;
		this.callback = callback;
	}
	
	/**
	 * Appends the received chunk to the request's body and completes
	 * the request when at least `length` bytes have been collected.
	 */
	override void onRead() {
		// `idup` because the buffer is cleared and reused on the next READ event
		this.req.body_ = this.req.body_ ~ this.buffer.data!char.idup;
		if(this.req.body_.length >= this.length) {
			this.callback();
			this.conn.send(cast(ubyte[])res.toString());
			this.conn.kill();
		}
	}
	
	/// Nothing to do when the connection is closed.
	override void onClose() {}
	
}
286 
/**
 * Base class for web socket clients.
 * Decodes incoming text frames and passes their payload to `onReceive`;
 * `send` encodes outgoing data as a single unmasked text frame.
 */
class WebSocketConnection : Connection {
	
	/// Called by `onStart`; does nothing unless replaced.
	void delegate() onStartImpl;
	
	this() {
		this.onStartImpl = {};
	}
	
	/**
	 * Runs `onStartImpl`.
	 */
	override void onStart() {
		this.onStartImpl();
	}
	
	/**
	 * Decodes one frame from the buffer and passes its payload to
	 * `onReceive`. Only frames whose opcode is 1 (text) are handled,
	 * every other frame is ignored. A frame that is not completely
	 * contained in the buffer is silently dropped.
	 */
	override void onRead() {
		// low 4 bits of the first byte are the opcode
		try if((this.buffer.read!ubyte() & 0b1111) == 1) {
			immutable info = this.buffer.read!ubyte();
			// high bit: payload is masked; low 7 bits: length or length marker
			immutable masked = (info & 0b10000000) != 0;
			size_t length = info & 0b01111111;
			if(length == 0b01111110) {
				// 126: the real length follows as 16-bit big-endian
				length = this.buffer.read!(Endian.bigEndian, ushort)();
			} else if(length == 0b01111111) {
				// 127: the real length follows as 64-bit big-endian
				length = this.buffer.read!(Endian.bigEndian, ulong)() & size_t.max;
			}
			if(masked) {
				ubyte[] mask = this.buffer.read!(ubyte[])(4);
				ubyte[] data = this.buffer.read!(ubyte[])(length);
				// unmask: each payload byte is XORed with the 4-byte key, cycled
				foreach(i, ref ubyte p; data) {
					p ^= mask[i % 4];
				}
				this.onReceive(data);
			} else {
				this.onReceive(this.buffer.read!(ubyte[])(length));
			}
		} catch(BufferOverflowException) {} // truncated frame: deliberately ignored
	}
	
	/**
	 * Sends data to the connected web socket as one unmasked,
	 * final text frame.
	 * The connection's buffer is reset and reused to build the frame, so
	 * data received earlier and not yet read from it is lost.
	 */
	void send(in void[] data) {
		this.buffer.reset();
		// FIN bit set, opcode 1 (text)
		this.buffer.write!ubyte(0b10000001);
		if(data.length < 0b01111110) {
			// lengths up to 125 fit in the 7-bit length field
			this.buffer.write!ubyte(data.length & ubyte.max);
		} else if(data.length <= ushort.max) {
			// up to and including 65535 the 16-bit extended length must be
			// used: the protocol requires the shortest possible encoding
			this.buffer.write!ubyte(0b01111110);
			this.buffer.write!(Endian.bigEndian, ushort)(data.length & ushort.max);
		} else {
			this.buffer.write!ubyte(0b01111111);
			this.buffer.write!(Endian.bigEndian, ulong)(data.length);
		}
		this.buffer.write(data);
		this.conn.send(this.buffer.data!ubyte);
	}
	
	/**
	 * Notifies that the client has sent some data.
	 * `data` is a slice of the connection's buffer and is only valid
	 * until the buffer is reused; copy it to keep it.
	 */
	abstract void onReceive(ubyte[] data);
	
	/**
	 * Notifies that the connection has been interrupted.
	 */
	override abstract void onClose();
	
}