module lighttp.server;

import std.string : toLower;
import std.system : Endian;

import libasync;

import xbuffer;
import xbuffer.memory : xalloc, xfree;

import lighttp.router;
import lighttp.util;

/**
 * Settings that control how a server behaves.
 */
struct ServerOptions {

	/**
	 * Name of the server. It is used as the value of the `Server`
	 * header field of every response and is meant to be displayed
	 * in lighttp's default error messages.
	 */
	string name = "lighttp/0.5";

	/**
	 * Whether exceptions thrown while handling a route are caught.
	 * When true, a caught exception turns the response into a
	 * `500 Internal Server Error`.
	 */
	bool handleExceptions = true;

	/**
	 * Maximum size accepted for a payload. A request whose
	 * `Content-Length` header exceeds this value is meant to be
	 * answered with `413 Payload too Large`.
	 * NOTE(review): no check against this field is visible in this
	 * module; the options are forwarded to the router -- confirm the
	 * limit is enforced there.
	 */
	size_t max = size_t.max;

}

/**
 * Base class for servers. It owns the options, the event loop and
 * the router, and leaves the default port and the creation of
 * per-connection handlers to the subclasses.
 */
abstract class ServerBase {

	private ServerOptions _options;
	private EventLoop _eventLoop;
	private Router _router;

	/**
	 * Creates a server that runs on the given event loop.
	 * Params:
	 *   eventLoop = event loop used by the listeners of this server
	 *   options = behaviour of the server
	 */
	this(EventLoop eventLoop, ServerOptions options=ServerOptions.init) {
		this._options = options;
		this._eventLoop = eventLoop;
		this._router = new Router();
	}

	/**
	 * Creates a server that runs on the event loop returned by
	 * `getThreadEventLoop`.
	 */
	this(ServerOptions options=ServerOptions.init) {
		this(getThreadEventLoop(), options);
	}

	/**
	 * Gets the server's options.
	 */
	@property ServerOptions options() pure nothrow @safe @nogc {
		return _options;
	}

	/**
	 * Gets the server's event loop. It should be used to
	 * run the server.
	 * Example:
	 * ---
	 * auto server = new Server();
	 * server.host("0.0.0.0");
	 * while(true) server.eventLoop.loop();
	 * ---
	 */
	@property EventLoop eventLoop() pure nothrow @safe @nogc {
		return _eventLoop;
	}

	/**
	 * Gets the server's router.
	 */
	@property Router router() pure nothrow @safe @nogc {
		return _router;
	}

	/**
	 * Gets the server's default port, used by `host` when no
	 * port is given.
	 */
	abstract @property ushort defaultPort() pure nothrow @safe @nogc;

	/**
	 * Binds the server to the given address by creating a new
	 * TCP listener on the server's event loop.
	 * Example:
	 * ---
	 * server.host("0.0.0.0");
	 * server.host("::1", 8080);
	 * ---
	 */
	void host(string ip, ushort port) {
		auto tcpListener = new AsyncTCPListener(this.eventLoop);
		tcpListener.host(ip, port);
		// `handler` is invoked for every accepted connection and
		// returns the delegate that receives its TCP events.
		tcpListener.run(&this.handler);
	}

	/// ditto
	void host(string ip) {
		this.host(ip, this.defaultPort);
	}

	/**
	 * Calls eventLoop.loop for as long as the given condition
	 * returns true. The condition is evaluated before every
	 * iteration.
	 */
	void run(bool delegate() condition) {
		for(;;) {
			if(!condition()) break;
			this.eventLoop.loop();
		}
	}

	/**
	 * Calls eventLoop.loop in an infinite loop. This method
	 * never returns normally.
	 */
	void run() {
		for(;;) this.eventLoop.loop();
	}

	/**
	 * Creates the event handler for a newly accepted connection.
	 * Returns: the delegate that will receive the connection's TCP events
	 */
	abstract void delegate(TCPEvent) handler(AsyncTCPConnection conn);

}

/**
 * Server whose connections are instances of `T` and whose default
 * port is `_port`.
 */
class ServerImpl(T:Connection, ushort _port) : ServerBase {

	this(E...)(E args) { //TODO remove when default constructors are implemented
		super(args);
	}

	override @property ushort defaultPort() {
		return _port;
	}

	/**
	 * Instantiates a `T` for the connection and returns its
	 * `Connection.handle` method as the event handler.
	 */
	override void delegate(TCPEvent) handler(AsyncTCPConnection conn) {
		Connection connection = new T(this, conn);
		return &connection.handle;
	}

}

/**
 * Default HTTP server.
150 * Example: 151 * --- 152 * auto server = new Server(); 153 * server.host("0.0.0.0"); 154 * server.loop(); 155 * --- 156 */ 157 alias Server = ServerImpl!(DefaultConnection, 80); 158 159 private ubyte[] __buffer = new ubyte[2 ^^ 24]; // 16 mb 160 161 class Connection { 162 163 AsyncTCPConnection conn; 164 165 protected Buffer buffer; 166 167 void onStart() {} 168 169 protected bool log=false; 170 171 final void handle(TCPEvent event) { 172 switch(event) with(TCPEvent) { 173 case READ: 174 this.buffer.reset(); 175 while(true) { 176 auto len = this.conn.recv(__buffer); 177 if(len > 0) this.buffer.write(__buffer[0..len]); 178 if(len < __buffer.length) break; 179 } 180 this.onRead(); 181 break; 182 case CLOSE: 183 this.onClose(); 184 break; 185 default: 186 break; 187 } 188 } 189 190 abstract void onRead(); 191 192 abstract void onClose(); 193 194 } 195 196 class DefaultConnection : Connection { 197 198 private ServerBase server; 199 200 private void delegate() _handle; 201 private void delegate(ref HandleResult, AsyncTCPConnection, ServerRequest, ServerResponse) _handleRoute; 202 203 this(ServerBase server, AsyncTCPConnection conn) { 204 this.buffer = new Buffer(4096); 205 this.server = server; 206 this.conn = conn; 207 _handle = &this.handle; 208 if(this.server.options.handleExceptions) _handleRoute = &this.handleRouteCatch; 209 else _handleRoute = &this.handleRouteNoCatch; 210 } 211 212 override void onRead() { 213 _handle(); 214 } 215 216 private void handleRouteCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) { 217 try this.server.router.handle(this.server.options, result, client, req, res); 218 catch(Exception) res.status = StatusCodes.internalServerError; 219 } 220 221 private void handleRouteNoCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) { 222 this.server.router.handle(this.server.options, result, client, req, res); 223 } 224 225 void handle() { 226 
handleImpl(this.buffer.data!char); 227 } 228 229 protected void handleImpl(string data) { 230 ServerRequest request = new ServerRequest(); 231 ServerResponse response = new ServerResponse(); 232 request.address = this.conn.local; 233 response.headers["Server"] = this.server.options.name; 234 HandleResult result; 235 if(request.parse(data)) { 236 //TODO max request size 237 if(auto connection = "connection" in request.headers) response.headers["Connection"] = *connection; 238 _handleRoute(result, this.conn, request, response); 239 } else { 240 response.status = StatusCodes.badRequest; 241 } 242 if(response.status.code >= 400 && response.body_.length == 0) this.server.router.handleError(request, response); 243 if(response.ready) this.conn.send(cast(ubyte[])response.toString()); 244 auto connection = "connection" in response.headers; 245 if(result.connection !is null) { 246 _handle = &result.connection.onRead; 247 result.connection.buffer = this.buffer; 248 result.connection.onStart(); 249 } else if(connection is null || toLower(*connection) != "keep-alive") { 250 this.conn.kill(); 251 } 252 } 253 254 override void onClose() {} 255 256 } 257 258 class MultipartConnection : Connection { 259 260 private size_t length; 261 private Http req, res; 262 void delegate() callback; 263 264 this(AsyncTCPConnection conn, size_t length, Http req, Http res, void delegate() callback) { 265 this.conn = conn; 266 this.length = length; 267 this.req = req; 268 this.res = res; 269 this.callback = callback; 270 } 271 272 override void onRead() { 273 this.req.body_ = this.req.body_ ~ this.buffer.data!char.idup; 274 import std.stdio; 275 writeln("Body is not ", this.req.body_.length); 276 if(this.req.body_.length >= this.length) { 277 this.callback(); 278 this.conn.send(cast(ubyte[])res.toString()); 279 this.conn.kill(); 280 } 281 } 282 283 override void onClose() {} 284 285 } 286 287 /** 288 * Base class for web socket clients. 
289 */ 290 class WebSocketConnection : Connection { 291 292 void delegate() onStartImpl; 293 294 this() { 295 this.onStartImpl = {}; 296 } 297 298 override void onStart() { 299 this.onStartImpl(); 300 } 301 302 override void onRead() { 303 try if((this.buffer.read!ubyte() & 0b1111) == 1) { 304 immutable info = this.buffer.read!ubyte(); 305 immutable masked = (info & 0b10000000) != 0; 306 size_t length = info & 0b01111111; 307 if(length == 0b01111110) { 308 length = this.buffer.read!(Endian.bigEndian, ushort)(); 309 } else if(length == 0b01111111) { 310 length = this.buffer.read!(Endian.bigEndian, ulong)() & size_t.max; 311 } 312 if(masked) { 313 ubyte[] mask = this.buffer.read!(ubyte[])(4); 314 ubyte[] data = this.buffer.read!(ubyte[])(length); 315 foreach(i, ref ubyte p; data) { 316 p ^= mask[i % 4]; 317 } 318 this.onReceive(data); 319 } else { 320 this.onReceive(this.buffer.read!(ubyte[])(length)); 321 } 322 } catch(BufferOverflowException) {} 323 } 324 325 /** 326 * Sends data to the connected web socket. 327 */ 328 void send(in void[] data) { 329 this.buffer.reset(); 330 this.buffer.write!ubyte(0b10000001); 331 if(data.length < 0b01111110) { 332 this.buffer.write!ubyte(data.length & ubyte.max); 333 } else if(data.length < ushort.max) { 334 this.buffer.write!ubyte(0b01111110); 335 this.buffer.write!(Endian.bigEndian, ushort)(data.length & ushort.max); 336 } else { 337 this.buffer.write!ubyte(0b01111111); 338 this.buffer.write!(Endian.bigEndian, ulong)(data.length); 339 } 340 this.buffer.write(data); 341 this.conn.send(this.buffer.data!ubyte); 342 } 343 344 /** 345 * Notifies that the client has sent some data. 346 */ 347 abstract void onReceive(ubyte[] data); 348 349 /** 350 * Notifies that the connection has been interrupted. 351 */ 352 override abstract void onClose(); 353 354 }