1 module draklib.server.raknetserver;
2 import draklib.core;
3 import draklib.logging;
4 import draklib.util;
5 import draklib.server.socket;
6 import draklib.server.session;
7 import draklib.server.serverinterface;
8 import draklib.protocol.unconnected;
9 
10 import core.thread;
11 
12 import std.exception;
13 import std.concurrency;
14 import std.datetime;
15 import std.conv;
16 import std.socket : Address;
17 
18 struct ServerOptions {
19 	string serverIdent;
20 	uint sendBufferSize = 4096;
21 	uint recvBufferSize = 4096;
22 
23 	/// The Amount of time with no packets recieved needed to disconnect
24 	/// a client due to timeout.
25 	uint timeoutThreshold = 7000;
26 	
27 	uint recvPacketsPerTick = 250;
28 	uint sendPacketsPerTick = 250;
29 
30 	bool warnOnCantKeepUp = true;
31 
32 	long serverGUID = -1;
33 }
34 
35 class RakNetServer {
36 	shared static uint INSTANCES = 0;
37 	package const Logger logger;
38 	package Tid controller;
39 	package ServerSocket socket;
40 	package ServerOptions options;
41 
42 	package Session[string] sessions;
43 	package ulong[string] blacklist;
44 
45 	private shared bool running = false;
46 	private shared bool crashed = false;
47 	private shared ulong currentTick;
48 
49 	this(Tid controller, in Logger logger, ushort bindPort, string bindIp = "0.0.0.0", ServerOptions options = ServerOptions()) {
50 		this.logger = logger;
51 		this.controller = controller;
52 		this.options = options;
53 		socket = new ServerSocket(logger, bindIp, bindPort);
54 
55 		if(options.serverGUID == -1) {
56 			import std.random;
57 			this.options.serverGUID = uniform(0L, long.max);
58 		}
59 	}
60 
61 	void start() {
62 		enforce(!running, new InvalidOperationException("Attempted to start server while already running!"));
63 		running = true;
64 		run();
65 	}
66 
67 	void stop() {
68 		enforce(running, new InvalidOperationException("Attempted to stop server that is not running!"));
69 		running = false;
70 	}
71 
72 	private void run() {
73 		Thread.getThis().name = "RakNetServer #" ~ to!string(INSTANCES++);
74 		logger.logDebug("Starting DRakLib server on " ~ socket.getBindAddress().toString());
75 
76 		socket.bind();
77 
78 		long elapsed;
79 		StopWatch sw = StopWatch();
80 		while(running) {
81 			currentTick++;
82 			sw.reset();
83 			sw.start();
84 			try{
85 				doTick();
86 			} catch(Exception e) {
87 				logger.logError("FATAL! Exception in tick!");
88 				logger.logTrace(e.toString());
89 
90 				running = false;
91 				crashed = true;
92 				break;
93 			}
94 			sw.stop();
95 			elapsed = sw.peek().msecs();
96 			if(elapsed > 50) {
97 				if(options.warnOnCantKeepUp) logger.logWarn("Can't keep up! (" ~ to!string(elapsed) ~ " > 50) Did the system time change or is the server overloaded?");
98 			} else {
99 				Thread.sleep(dur!("msecs")(50 - elapsed));
100 			}
101 		}
102 
103 		send(controller, ServerStoppedMessage(hasCrashed));
104 	}
105 
106 	private void doTick() {
107 		uint max = options.recvPacketsPerTick;
108 		Address a;
109 		byte[] data = new byte[1024 * 1024];
110 		while(max-- > 0 && socket.recv(a, data)) {
111 			handlePacket(a, data);
112 			data = new byte[1024 * 1024];
113 		}
114 
115 		foreach(session; sessions) {
116 			session.update();
117 		}
118 
119 		max = options.sendPacketsPerTick;
120 		while(max-- > 0 && receiveTimeout(dur!("msecs")(1), &this.onStopServerMessage, &this.onSendPacketMessage)) {
121 			
122 		}
123 	}
124 
125 	private void handlePacket(ref Address address, ref byte[] data) {
126 		if(address.toString() in blacklist) {
127 			if(blacklist[address.toString()] <= currentTick) {
128 				blacklist.remove(address.toString());
129 			} else return;
130 		}
131 		switch(data[0]) {
132 			case RakNetInfo.UNCONNECTED_PING_1:
133 				UnconnectedPingPacket1 ping1 = new UnconnectedPingPacket1();
134 				ping1.decode(data);
135 
136 				UnconnectedPongPacket pong1 = new UnconnectedPongPacket();
137 				pong1.serverGUID = options.serverGUID;
138 				pong1.serverInfo = options.serverIdent;
139 				pong1.time = ping1.time;
140 				byte[] buffer;
141 				pong1.encode(buffer);
142 				sendPacket(address, buffer);
143 				break;
144 			
145 			case RakNetInfo.UNCONNECTED_PING_2:
146 				UnconnectedPingPacket2 ping2 = new UnconnectedPingPacket2();
147 				ping2.decode(data);
148 
149 				//AdvertiseSystemPacket pong2 = new AdvertiseSystemPacket();
150 				UnconnectedPongPacket pong2 = new UnconnectedPongPacket();
151 				pong2.serverGUID = options.serverGUID;
152 				pong2.serverInfo = options.serverIdent;
153 				pong2.time = ping2.time;
154 				byte[] buffer;
155 				pong2.encode(buffer);
156 				sendPacket(address, buffer);
157 				break;
158 
159 			default:
160 				import std.array;
161 
162 				Session session = sessions.get(address.toString(), null);
163 				if(session is null) {
164 					string ip = split(address.toString(), ":")[0];
165 					ushort port = to!ushort(split(address.toString(), ":")[1]);
166 					session = new Session(this, ip, port);
167 
168 					logger.logDebug("Session " ~ session.getIdentifier() ~ " created");
169 					sessions[session.getIdentifier()] = session;
170 				}
171 				session.handlePacket(data);
172 				break;
173 		}
174 	}
175 
176 	package void sendPacket(Address sendTo, in byte[] data) {
177 		socket.send(sendTo, data);
178 	}
179 
180 	// Message handlers
181 
182 	package void onSessionOpen(Session session, long clientID) {
183 		send(controller, SessionOpenMessage(session.getIpAddress(), session.getPort(), session.getClientGUID()));
184 	}
185 
186 	package void onSessionClose(Session session, in string reason = null) {
187 		if(!(session.getIdentifier() in sessions)) return;
188 		sessions.remove(session.getIdentifier());
189 		if(reason !is null) {
190 			logger.logDebug("Session " ~ session.getIdentifier() ~ " closed: " ~ reason);
191 			send(controller, SessionCloseMessage(session.getIpAddress(), session.getPort(), reason));
192 			return;
193 		}
194 		send(controller, SessionCloseMessage(session.getIpAddress(), session.getPort(), "unknown"));
195 	}
196 
197 	package void onSessionReceivePacket(Session session, shared byte[] buffer) {
198 		send(controller, SessionReceivePacketMessage(session.getIpAddress(), session.getPort(), buffer));
199 	}
200 
201 	package void onStopServerMessage(StopServerMessage m) {
202 		stop();
203 	}
204 
205 	package void onSendPacketMessage(SendPacketMessage m) {
206 		import draklib.protocol.reliability;
207 
208 		string ident = m.ip ~ ":" ~ to!string(m.port);
209 		if(ident in sessions) {
210 			EncapsulatedPacket ep = new EncapsulatedPacket();
211 			ep.reliability = m.reliability;
212 			ep.payload = cast(byte[]) m.payload;
213 			sessions[ident].addToQueue(ep, m.immediate);
214 		}
215 	}
216 
217 	public void addToBlacklist(string identifier, ulong ticks) {
218 		if(identifier in blacklist) return;
219 		blacklist[identifier] = currentTick + ticks;
220 		debug logger.logDebug("Added " ~ identifier ~ " to blacklist until tick: " ~ to!string(blacklist[identifier]));
221 	}
222 
223 	public void removeFromBlacklist(string identifier) {
224 		if(identifier in blacklist) {
225 			blacklist.remove(identifier);
226 		}
227 	}
228 
229 	public bool isRunning() {
230 		return running;
231 	}
232 
233 	public bool hasCrashed() {
234 		return crashed;
235 	}
236 }