module draklib.server.raknetserver;
import draklib.core;
import draklib.logging;
import draklib.util;
import draklib.server.socket;
import draklib.server.session;
import draklib.server.serverinterface;
import draklib.protocol.unconnected;

import core.atomic : atomicOp;
import core.thread;

import std.exception;
import std.concurrency;
import std.datetime;
import std.conv;
import std.socket : Address;

/// User-tunable settings handed to a RakNetServer at construction time.
struct ServerOptions {
	/// Identification string copied into every unconnected pong reply.
	string serverIdent;
	uint sendBufferSize = 4096;
	uint recvBufferSize = 4096;

	/// The amount of time with no packets received needed to disconnect
	/// a client due to timeout.
	uint timeoutThreshold = 7000;

	/// Upper bound on the datagrams read from the socket in one tick.
	uint recvPacketsPerTick = 250;
	/// Upper bound on the controller messages processed in one tick.
	uint sendPacketsPerTick = 250;

	/// Log a warning whenever a tick takes longer than the tick length.
	bool warnOnCantKeepUp = true;

	/// GUID reported in pong replies; -1 asks the server to pick a random one.
	long serverGUID = -1;
}

/**
 * Tick-driven server loop: reads datagrams from a ServerSocket, answers
 * unconnected pings, routes everything else to a per-peer Session, and
 * exchanges messages with a controller thread through std.concurrency.
 */
class RakNetServer {
	/// Number of servers whose run loop has been entered; used to name threads.
	shared static uint INSTANCES = 0;

	/// Size in bytes of the buffer handed to the socket for each datagram.
	private enum size_t RECV_BUFFER_SIZE = 1024 * 1024;
	/// Target length of one tick in milliseconds.
	private enum long TICK_MSECS = 50;

	package const Logger logger;
	package Tid controller;
	package ServerSocket socket;
	package ServerOptions options;

	/// Live sessions, keyed by Session.getIdentifier().
	package Session[string] sessions;
	/// Blacklisted identifiers mapped to the tick at which the entry expires.
	package ulong[string] blacklist;

	private shared bool running = false;
	private shared bool crashed = false;
	private shared ulong currentTick;

	/// Receive buffer that has not been handed to a packet handler yet.
	/// Kept across ticks so an idle tick does not allocate a new buffer.
	private byte[] recvBuffer;

	/**
	 * Creates the server and its socket object. Nothing is bound here;
	 * binding happens when the run loop starts.
	 *
	 * Params:
	 *   controller = thread that receives the server's notification messages
	 *   logger     = logger used for all diagnostics
	 *   bindPort   = port passed to the ServerSocket
	 *   bindIp     = address passed to the ServerSocket
	 *   options    = server settings; a serverGUID of -1 is replaced by a random value
	 */
	this(Tid controller, in Logger logger, ushort bindPort, string bindIp = "0.0.0.0", ServerOptions options = ServerOptions()) {
		this.logger = logger;
		this.controller = controller;
		this.options = options;
		socket = new ServerSocket(logger, bindIp, bindPort);

		if(options.serverGUID == -1) {
			import std.random;
			this.options.serverGUID = uniform(0L, long.max);
		}
	}

	/**
	 * Marks the server as running and enters the tick loop on the calling
	 * thread. Does not return until the loop ends.
	 *
	 * Throws: InvalidOperationException if the server is already running.
	 */
	void start() {
		enforce(!running, new InvalidOperationException("Attempted to start server while already running!"));
		running = true;
		run();
	}

	/**
	 * Clears the running flag so the tick loop exits after the current tick.
	 *
	 * Throws: InvalidOperationException if the server is not running.
	 */
	void stop() {
		enforce(running, new InvalidOperationException("Attempted to stop server that is not running!"));
		running = false;
	}

	/// Binds the socket and ticks until stopped or until a tick throws, then
	/// reports the outcome to the controller with a ServerStoppedMessage.
	private void run() {
		// atomicOp yields the value after the increment; subtract one so the
		// first server is still numbered 0.
		immutable uint instanceId = atomicOp!"+="(INSTANCES, 1) - 1;
		Thread.getThis().name = "RakNetServer #" ~ to!string(instanceId);
		logger.logDebug("Starting DRakLib server on " ~ socket.getBindAddress().toString());

		socket.bind();

		long elapsed;
		StopWatch sw = StopWatch();
		while(running) {
			atomicOp!"+="(currentTick, 1);
			sw.reset();
			sw.start();
			try {
				doTick();
			} catch(Exception e) {
				logger.logError("FATAL! Exception in tick!");
				logger.logTrace(e.toString());

				running = false;
				crashed = true;
				break;
			}
			sw.stop();
			elapsed = sw.peek().msecs();
			if(elapsed > TICK_MSECS) {
				if(options.warnOnCantKeepUp) {
					logger.logWarn("Can't keep up! (" ~ to!string(elapsed) ~ " > " ~ to!string(TICK_MSECS)
						~ ") Did the system time change or is the server overloaded?");
				}
			} else {
				// Sleep away whatever is left of this tick.
				Thread.sleep(dur!("msecs")(TICK_MSECS - elapsed));
			}
		}

		send(controller, ServerStoppedMessage(hasCrashed()));
	}

	/// One tick: drain the socket, update every session, then process
	/// queued controller messages. Each phase is capped by the options.
	private void doTick() {
		uint budget = options.recvPacketsPerTick;
		Address source;
		if(recvBuffer is null) recvBuffer = new byte[RECV_BUFFER_SIZE];
		// Work on a local slice so that whatever recv does to its argument,
		// the full-size buffer in recvBuffer stays available for the next tick.
		byte[] data = recvBuffer;
		while(budget-- > 0 && socket.recv(source, data)) {
			handlePacket(source, data);
			// The handled buffer may have been kept by its consumer, so it is
			// never reused; start the next read with a fresh one.
			recvBuffer = new byte[RECV_BUFFER_SIZE];
			data = recvBuffer;
		}

		// Iterate over a snapshot: onSessionClose removes entries from
		// `sessions`, and a session presumably may close itself while
		// updating. Mutating an associative array during foreach is undefined.
		foreach(session; sessions.values) {
			session.update();
		}

		budget = options.sendPacketsPerTick;
		while(budget-- > 0 && receiveTimeout(dur!("msecs")(1), &this.onStopServerMessage, &this.onSendPacketMessage)) {
			// The handlers do all the work.
		}
	}

	/**
	 * Dispatches one received datagram: drops it if the sender is
	 * blacklisted, answers unconnected pings directly, and forwards anything
	 * else to the sender's Session (creating the session on first contact).
	 */
	private void handlePacket(ref Address address, ref byte[] data) {
		// Indexing an empty array raises RangeError, which the tick loop's
		// catch(Exception) would not intercept.
		if(data.length == 0) return;

		string sender = address.toString();
		if(auto expiry = sender in blacklist) {
			if(*expiry > currentTick) return; // still blacklisted
			blacklist.remove(sender);
		}

		switch(data[0]) {
			case RakNetInfo.UNCONNECTED_PING_1:
				UnconnectedPingPacket1 ping1 = new UnconnectedPingPacket1();
				ping1.decode(data);

				UnconnectedPongPacket pong1 = new UnconnectedPongPacket();
				pong1.serverGUID = options.serverGUID;
				pong1.serverInfo = options.serverIdent;
				pong1.time = ping1.time;
				byte[] buffer;
				pong1.encode(buffer);
				sendPacket(address, buffer);
				break;

			case RakNetInfo.UNCONNECTED_PING_2:
				UnconnectedPingPacket2 ping2 = new UnconnectedPingPacket2();
				ping2.decode(data);

				//AdvertiseSystemPacket pong2 = new AdvertiseSystemPacket();
				UnconnectedPongPacket pong2 = new UnconnectedPongPacket();
				pong2.serverGUID = options.serverGUID;
				pong2.serverInfo = options.serverIdent;
				pong2.time = ping2.time;
				byte[] buffer;
				pong2.encode(buffer);
				sendPacket(address, buffer);
				break;

			default:
				import std.array;

				Session session = sessions.get(sender, null);
				if(session is null) {
					// NOTE(review): assumes the "host:port" form; an IPv6
					// address string contains extra colons and would not
					// parse here -- confirm IPv4-only use.
					string[] parts = split(sender, ":");
					string ip = parts[0];
					ushort port = to!ushort(parts[1]);
					session = new Session(this, ip, port);

					logger.logDebug("Session " ~ session.getIdentifier() ~ " created");
					sessions[session.getIdentifier()] = session;
				}
				session.handlePacket(data);
				break;
		}
	}

	/// Sends raw bytes to the given address through the server socket.
	package void sendPacket(Address sendTo, in byte[] data) {
		socket.send(sendTo, data);
	}

	// Message handlers

	/// Tells the controller that a session has been opened. The reported GUID
	/// is read from the session; the clientID argument is not used.
	package void onSessionOpen(Session session, long clientID) {
		send(controller, SessionOpenMessage(session.getIpAddress(), session.getPort(), session.getClientGUID()));
	}

	/// Forgets the session and tells the controller it closed. A null reason
	/// is reported as "unknown". Does nothing for sessions that are not tracked.
	package void onSessionClose(Session session, in string reason = null) {
		if(!(session.getIdentifier() in sessions)) return;
		sessions.remove(session.getIdentifier());

		if(reason is null) {
			send(controller, SessionCloseMessage(session.getIpAddress(), session.getPort(), "unknown"));
			return;
		}
		logger.logDebug("Session " ~ session.getIdentifier() ~ " closed: " ~ reason);
		send(controller, SessionCloseMessage(session.getIpAddress(), session.getPort(), reason));
	}

	/// Forwards a payload received by a session to the controller.
	package void onSessionReceivePacket(Session session, shared byte[] buffer) {
		send(controller, SessionReceivePacketMessage(session.getIpAddress(), session.getPort(), buffer));
	}

	/// Controller request to shut the server down.
	package void onStopServerMessage(StopServerMessage m) {
		stop();
	}

	/// Controller request to send a payload; it is queued on the session
	/// identified by "ip:port". Requests for unknown sessions are ignored.
	package void onSendPacketMessage(SendPacketMessage m) {
		import draklib.protocol.reliability;

		string ident = m.ip ~ ":" ~ to!string(m.port);
		if(auto target = ident in sessions) {
			EncapsulatedPacket ep = new EncapsulatedPacket();
			ep.reliability = m.reliability;
			ep.payload = cast(byte[]) m.payload;
			(*target).addToQueue(ep, m.immediate);
		}
	}

	/// Blacklists an identifier for the given number of ticks from now.
	/// An identifier that is already blacklisted keeps its existing expiry.
	public void addToBlacklist(string identifier, ulong ticks) {
		if(identifier in blacklist) return;
		blacklist[identifier] = currentTick + ticks;
		debug logger.logDebug("Added " ~ identifier ~ " to blacklist until tick: " ~ to!string(blacklist[identifier]));
	}

	/// Removes an identifier from the blacklist if it is present.
	public void removeFromBlacklist(string identifier) {
		// remove() on an associative array is a no-op for a missing key.
		blacklist.remove(identifier);
	}

	/// Returns: true while the running flag is set.
	public bool isRunning() {
		return running;
	}

	/// Returns: true once a tick has ended with an uncaught Exception.
	public bool hasCrashed() {
		return crashed;
	}
}