module lighttp.server.server;

import std.string : toLower;
import std.system : Endian;

import libasync;

import xbuffer : Buffer, BufferOverflowException;

import lighttp.server.router;
import lighttp.util;

/**
 * Options to define how the server behaves.
 */
struct ServerOptions {

	/**
	 * Name of the server set as value in the `Server` header field
	 * and displayed in lighttp's default error messages.
	 */
	string name = "lighttp/0.6";

	/**
	 * Indicates whether the handler should catch exceptions.
	 * If set to true the server will return a `500 Internal Server Error`
	 * upon catching an exception.
	 */
	bool handleExceptions = true;

	/**
	 * Indicates the maximum size for a payload. If the header
	 * `Content-Length` sent by the client exceeds the indicated
	 * length the server will return a `413 Payload too Large`.
	 *
	 * NOTE(review): no check against this value is visible in this
	 * module (`DefaultConnection.handleImpl` only carries a TODO);
	 * presumably the router enforces it - confirm.
	 */
	size_t max = size_t.max;

}

/**
 * Base class for servers.
 *
 * Holds the options, the event loop and the router, and binds TCP
 * listeners whose connections are served through `handler`.
 */
abstract class ServerBase {

	private ServerOptions _options;
	private EventLoop _eventLoop;
	private Router _router;

	/**
	 * Creates a server running on the given event loop.
	 * Params:
	 *   eventLoop = event loop used by the listeners created in `host`
	 *   options = behaviour options, copied into the server
	 */
	this(EventLoop eventLoop, ServerOptions options=ServerOptions.init) {
		_options = options;
		_eventLoop = eventLoop;
		_router = new Router();
	}

	/**
	 * Creates a server running on the event loop returned by
	 * `getThreadEventLoop()`.
	 */
	this(ServerOptions options=ServerOptions.init) {
		this(getThreadEventLoop(), options);
	}

	/**
	 * Gets the server's options.
	 */
	@property ServerOptions options() pure nothrow @safe @nogc {
		return _options;
	}

	/**
	 * Gets the server's event loop. It should be used to
	 * run the server.
	 * Example:
	 * ---
	 * auto server = new Server();
	 * server.host("0.0.0.0");
	 * while(true) server.eventLoop.loop();
	 * ---
	 */
	@property EventLoop eventLoop() pure nothrow @safe @nogc {
		return _eventLoop;
	}

	/**
	 * Gets the server's router.
	 */
	@property Router router() pure nothrow @safe @nogc {
		return _router;
	}

	/**
	 * Gets the server's default port.
	 */
	abstract @property ushort defaultPort() pure nothrow @safe @nogc;

	/**
	 * Binds the server to the given address.
	 * When the port is omitted, `defaultPort` is used.
	 * Example:
	 * ---
	 * server.host("0.0.0.0");
	 * server.host("::1", 8080);
	 * ---
	 */
	void host(string ip, ushort port) {
		auto listener = new AsyncTCPListener(this.eventLoop);
		// NOTE(review): the results of `host` and `run` are ignored, so a
		// failed bind is not reported to the caller - confirm this is intended.
		listener.host(ip, port);
		listener.run(&this.handler);
	}

	/// ditto
	void host(string ip) {
		return this.host(ip, this.defaultPort);
	}

	/**
	 * Calls eventLoop.loop for as long as the given condition
	 * returns true. The condition is evaluated before every
	 * iteration.
	 */
	void run(bool delegate() condition) {
		while(condition()) this.eventLoop.loop();
	}

	/**
	 * Calls eventLoop.loop in an infinite loop.
	 */
	void run() {
		while(true) this.eventLoop.loop();
	}

	/**
	 * Given a TCP connection, returns the delegate that will receive
	 * the events of that connection. It is the callback handed to the
	 * listener created in `host`.
	 */
	abstract void delegate(TCPEvent) handler(AsyncTCPConnection conn);

}

/**
 * Server implementation that serves every connection with a new
 * instance of `T` and uses `_port` as default port.
 */
class ServerImpl(T:Connection, ushort _port) : ServerBase {

	this(E...)(E args) { //TODO remove when default constructors are implemented
		super(args);
	}

	override @property ushort defaultPort() {
		return _port;
	}

	/**
	 * Creates a `T` bound to this server and the given connection and
	 * returns its `Connection.handle` as event handler.
	 */
	override void delegate(TCPEvent) handler(AsyncTCPConnection conn) {
		Connection ret = new T(this, conn);
		return &ret.handle;
	}

}

/**
 * Default HTTP server.
 * Example:
 * ---
 * auto server = new Server();
 * server.host("0.0.0.0");
 * server.run();
 * ---
 */
alias Server = ServerImpl!(DefaultConnection, 80);

// Scratch area that `Connection.handle` receives into before the bytes
// are appended to the connection's own buffer.
private ubyte[] __buffer = new ubyte[2 ^^ 24]; // 16 mb

/**
 * Base class for connection handlers. Receives the TCP events,
 * collects the incoming bytes in `buffer` and dispatches to
 * `onRead` and `onClose`.
 */
class Connection {

	AsyncTCPConnection conn;

	protected Buffer buffer;

	/**
	 * Hook called by `DefaultConnection` after it hands the
	 * connection over to this instance. Does nothing by default.
	 */
	void onStart() {}

	protected bool log=false;

	/**
	 * Handles a TCP event.
	 * On `READ` the buffer is reset and filled with everything that
	 * can be received, then `onRead` is called if at least one byte
	 * is available. On `CLOSE` `onClose` is called. Every other
	 * event is ignored.
	 */
	final void handle(TCPEvent event) {
		switch(event) with(TCPEvent) {
			case READ:
				this.buffer.reset();
				while(true) {
					auto len = this.conn.recv(__buffer);
					if(len > 0) this.buffer.write(__buffer[0..len]);
					// a short read means there is nothing more to receive for now
					if(len < __buffer.length) break;
				}
				if(this.buffer.data!void.length > 0) this.onRead();
				break;
			case CLOSE:
				this.onClose();
				break;
			default:
				break;
		}
	}

	/**
	 * Called when new data is available in `buffer`.
	 */
	abstract void onRead();

	/**
	 * Called when a `CLOSE` event is received.
	 */
	abstract void onClose();

}

/**
 * Connection that parses HTTP requests and dispatches them to the
 * server's router. A route may replace the handling of the following
 * reads with another `Connection` through `HandleResult.connection`.
 */
class DefaultConnection : Connection {

	private ServerBase server;

	// called on every read; switched to another connection's `onRead`
	// when a route hands the connection over
	private void delegate() _handle;
	// routing function, chosen once from `ServerOptions.handleExceptions`
	private void delegate(ref HandleResult, AsyncTCPConnection, ServerRequest, ServerResponse) _handleRoute;

	this(ServerBase server, AsyncTCPConnection conn) {
		this.buffer = new Buffer(4096);
		this.server = server;
		this.conn = conn;
		_handle = &this.handle;
		if(this.server.options.handleExceptions) _handleRoute = &this.handleRouteCatch;
		else _handleRoute = &this.handleRouteNoCatch;
	}

	override void onRead() {
		_handle();
	}

	/**
	 * Routes the request and turns any `Exception` thrown by the
	 * router into a `500 Internal Server Error` status.
	 */
	private void handleRouteCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) {
		try this.server.router.handle(this.server.options, result, client, req, res);
		catch(Exception) res.status = StatusCodes.internalServerError;
	}

	/**
	 * Routes the request and lets exceptions propagate to the caller.
	 */
	private void handleRouteNoCatch(ref HandleResult result, AsyncTCPConnection client, ServerRequest req, ServerResponse res) {
		this.server.router.handle(this.server.options, result, client, req, res);
	}

	/**
	 * Handles the data currently stored in the buffer as an HTTP request.
	 */
	void handle() {
		// Copy the bytes out of the receive buffer: it is reset and refilled
		// on every READ event, while the request built from this data may
		// still be in use when the response is sent later through
		// `response.send`.
		handleImpl(this.buffer.data!char.idup);
	}

	/**
	 * Parses `data` as a request, routes it and sends the response,
	 * either immediately (when `response.ready`) or later through the
	 * `response.send` delegate.
	 * A request that cannot be parsed is answered with `400 Bad Request`.
	 */
	protected void handleImpl(string data) {
		ServerRequest request = new ServerRequest();
		ServerResponse response = new ServerResponse();
		// NOTE(review): this is the local address of the connection, not the
		// peer's - confirm that is what `address` is meant to hold.
		request.address = this.conn.local;
		response.headers["Server"] = this.server.options.name;
		HandleResult result;
		void delegate() send = {
			this.conn.send(cast(ubyte[])response.toString());
			// NOTE(review): looked up as "connection" but set as "Connection"
			// below; assumes the header container is case-insensitive - confirm.
			auto connection = "connection" in response.headers;
			if(result.connection !is null) {
				// a route took over the connection: later reads go to it
				_handle = &result.connection.onRead;
				result.connection.buffer = this.buffer;
				result.connection.onStart();
			} else if(connection is null || toLower(*connection) != "keep-alive") {
				this.conn.kill();
			}
		};
		if(request.parse(data)) {
			//TODO max request size
			if(auto connection = "connection" in request.headers) response.headers["Connection"] = *connection;
			_handleRoute(result, this.conn, request, response);
		} else {
			response.status = StatusCodes.badRequest;
		}
		// let the router fill in the body of error responses that have none
		if(response.status.code >= 400 && response.body_.length == 0) this.server.router.handleError(request, response);
		if(response.ready) send();
		else response.send = send;
	}

	override void onClose() {}

}

/**
 * Connection that keeps appending the received data to the body of a
 * request until the expected length is reached, then calls `callback`,
 * sends the response and closes the connection.
 */
class MultipartConnection : Connection {

	private size_t length;
	private Http req, res;
	void delegate() callback;

	/**
	 * Params:
	 *   conn = connection the response is sent on
	 *   length = body length, in bytes, to reach before completing
	 *   req = request whose body is extended on every read
	 *   res = response sent once the body is complete
	 *   callback = called right before the response is sent
	 */
	this(AsyncTCPConnection conn, size_t length, Http req, Http res, void delegate() callback) {
		this.conn = conn;
		this.length = length;
		this.req = req;
		this.res = res;
		this.callback = callback;
	}

	override void onRead() {
		// the buffer is reused for the next read, so the chunk is copied
		this.req.body_ = this.req.body_ ~ this.buffer.data!char.idup;
		if(this.req.body_.length >= this.length) {
			this.callback();
			this.conn.send(cast(ubyte[])res.toString());
			this.conn.kill();
		}
	}

	override void onClose() {}

}
/**
 * Base class for web socket clients.
 *
 * Decodes the frames received in `buffer` and passes their payload to
 * `onReceive`; `send` wraps outgoing data in a single unmasked frame.
 */
class WebSocketConnection : Connection {

	/**
	 * Delegate invoked by `onStart`. Initialised to an empty delegate.
	 */
	void delegate() onStartImpl;

	this() {
		this.onStartImpl = {};
	}

	override void onStart() {
		this.onStartImpl();
	}

	/**
	 * Decodes one frame from the buffer and forwards its payload to
	 * `onReceive`. Only frames whose opcode (low 4 bits of the first
	 * byte) is 1 are processed; every other frame is ignored.
	 * If the buffer holds fewer bytes than the frame announces, the
	 * resulting `BufferOverflowException` is swallowed and nothing
	 * is delivered.
	 */
	override void onRead() {
		try if((this.buffer.read!ubyte() & 0b1111) == 1) {
			immutable info = this.buffer.read!ubyte();
			// high bit: payload is masked; low 7 bits: length or length marker
			immutable masked = (info & 0b10000000) != 0;
			size_t length = info & 0b01111111;
			if(length == 0b01111110) {
				// 126: the real length follows as a big-endian 16-bit integer
				length = this.buffer.read!(Endian.bigEndian, ushort)();
			} else if(length == 0b01111111) {
				// 127: the real length follows as a big-endian 64-bit integer
				length = this.buffer.read!(Endian.bigEndian, ulong)() & size_t.max;
			}
			if(masked) {
				ubyte[] mask = this.buffer.read!(ubyte[])(4);
				ubyte[] data = this.buffer.read!(ubyte[])(length);
				// unmask in place with the 4-byte key
				foreach(i, ref ubyte p; data) {
					p ^= mask[i % 4];
				}
				this.onReceive(data);
			} else {
				this.onReceive(this.buffer.read!(ubyte[])(length));
			}
		} catch(BufferOverflowException) {}
	}

	/**
	 * Sends data to the connected web socket.
	 *
	 * The data is sent as one unmasked frame with first byte
	 * `0b10000001` and the shortest length encoding that fits:
	 * 7 bits below 126 bytes, 16 bits up to `ushort.max` bytes
	 * (inclusive), 64 bits otherwise.
	 *
	 * NOTE(review): the frame is built in `this.buffer`, the same
	 * buffer `onRead` decodes from. If the array passed to `onReceive`
	 * is a slice of that buffer, calling `send` with it from inside
	 * `onReceive` would overwrite the data while it is copied - confirm
	 * against xbuffer's `read` semantics.
	 */
	void send(in void[] data) {
		this.buffer.reset();
		this.buffer.write!ubyte(0b10000001);
		if(data.length < 0b01111110) {
			this.buffer.write!ubyte(data.length & ubyte.max);
		} else if(data.length <= ushort.max) {
			// 65535 still fits in 16 bits and must use this form
			this.buffer.write!ubyte(0b01111110);
			this.buffer.write!(Endian.bigEndian, ushort)(data.length & ushort.max);
		} else {
			this.buffer.write!ubyte(0b01111111);
			this.buffer.write!(Endian.bigEndian, ulong)(data.length);
		}
		this.buffer.write(data);
		this.conn.send(this.buffer.data!ubyte);
	}

	/**
	 * Notifies that the client has sent some data.
	 */
	abstract void onReceive(ubyte[] data);

	/**
	 * Notifies that the connection has been interrupted.
	 */
	override abstract void onClose();

}