1 /// Connect to a MySQL/MariaDB server.
2 module mysql.connection;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.digest.sha;
7 import std.exception;
8 import std.socket;
9 import std.string;
10 
11 import mysql.commands;
12 import mysql.exceptions;
13 import mysql.protocol.constants;
14 import mysql.protocol.packets;
15 import mysql.protocol.sockets;
16 import mysql.result;
17 debug(MYSQL_INTEGRATION_TESTS)
18 {
19 	import mysql.test.common;
20 }
21 
22 version(Have_vibe_d_core)
23 {
24 	static if(__traits(compiles, (){ import vibe.core.net; } ))
25 		import vibe.core.net;
26 	else
27 		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
28 }
29 
30 /// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
31 immutable SvrCapFlags defaultClientFlags =
32 		SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
33 		SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
34 		SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
35 		//SvrCapFlags.MULTI_RESULTS;
36 
37 /++
38 A class representing a database connection.
39 
40 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
41 creating a new Connection directly. That will provide certain benefits,
42 such as reusing old connections and automatic cleanup (no need to close
43 the connection when done).
44 
45 ------------------
46 // Suggested usage:
47 
48 {
49 	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
50 	scope(exit) con.close();
51 
52 	// Use the connection
53 	...
54 }
55 ------------------
56 +/
57 class Connection
58 {
59 /+
60 The Connection is responsible for handshaking with the server to establish
61 authentication. It then passes client preferences to the server, and
62 subsequently is the channel for all command packets that are sent, and all
63 response packets received.
64 
65 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
66 byte as a packet number. Connection deals with the headers and ensures that
67 packet numbers are sequential.
68 
69 The initial packet is sent by the server - essentially a 'hello' packet
70 inviting login. That packet has a sequence number of zero. That sequence
71 number is the incremented by client and server packets through the handshake
72 sequence.
73 
74 After login all further sequences are initialized by the client sending a
75 command packet with a zero sequence number, to which the server replies with
76 zero or more packets with sequential sequence numbers.
77 +/
78 package:
79 	enum OpenState
80 	{
81 		/// We have not yet connected to the server, or have sent QUIT to the
82 		/// server and closed the connection
83 		notConnected,
84 		/// We have connected to the server and parsed the greeting, but not
85 		/// yet authenticated
86 		connected,
87 		/// We have successfully authenticated against the server, and need to
88 		/// send QUIT to the server when closing the connection
89 		authenticated
90 	}
91 	OpenState   _open;
92 	MySQLSocket _socket;
93 
94 	SvrCapFlags _sCaps, _cCaps;
95 	uint    _sThread;
96 	ushort  _serverStatus;
97 	ubyte   _sCharSet, _protocol;
98 	string  _serverVersion;
99 
100 	string _host, _user, _pwd, _db;
101 	ushort _port;
102 
103 	MySQLSocketType _socketType;
104 
105 	OpenSocketCallbackPhobos _openSocketPhobos;
106 	OpenSocketCallbackVibeD  _openSocketVibeD;
107 
108 	ulong _insertID;
109 
110 	// This gets incremented every time a command is issued or results are purged,
111 	// so a ResultRange can tell whether it's been invalidated.
112 	ulong _lastCommandID;
113 
114 	// Whether there are rows, headers or bimary data waiting to be retreived.
115 	// MySQL protocol doesn't permit performing any other action until all
116 	// such data is read.
117 	bool _rowsPending, _headersPending, _binaryPending;
118 
119 	// Field count of last performed command.
120 	ushort _fieldCount;
121 
122 	// ResultSetHeaders of last performed command.
123 	ResultSetHeaders _rsh;
124 
125 	// This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise
126 	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
127 	// the reason why most other objects require a connection object for their construction.
128 	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
129 				/// ordering. First packet should have 0
130 	@property ubyte pktNumber()   { return _cpn; }
131 	void bumpPacket()       { _cpn++; }
132 	void resetPacket()      { _cpn = 0; }
133 
134 	version(Have_vibe_d_core) {} else
135 	pure const nothrow invariant()
136 	{
137 		assert(_socketType != MySQLSocketType.vibed);
138 	}
139 
140 	void enforceNothingPending()
141 	{
142 		enforceEx!MYXDataPending(!hasPending);
143 	}
144 
145 	debug(MYSQL_INTEGRATION_TESTS)
146 	unittest
147 	{
148 		import mysql.prepared;
149 		import mysql.test.common : scopedCn;
150 		mixin(scopedCn);
151 
152 		cn.exec("DROP TABLE IF EXISTS `enforceNothingPending`");
153 		cn.exec("CREATE TABLE `enforceNothingPending` (
154 			`val` INTEGER
155 		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
156 
157 		immutable insertSQL = "INSERT INTO `enforceNothingPending` VALUES (1), (2)";
158 		immutable selectSQL = "SELECT * FROM `enforceNothingPending`";
159 		Prepared preparedInsert;
160 		Prepared preparedSelect;
161 		assertNotThrown!MYXDataPending(cn.exec(insertSQL));
162 		assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
163 		assertNotThrown!MYXDataPending(preparedInsert = cn.prepare(insertSQL));
164 		assertNotThrown!MYXDataPending(preparedSelect = cn.prepare(selectSQL));
165 		assertNotThrown!MYXDataPending(preparedInsert.exec());
166 		assertNotThrown!MYXDataPending(preparedSelect.querySet());
167 		
168 		auto resultSeq = cn.query(selectSQL);
169 		
170 		assertThrown!MYXDataPending(cn.exec(insertSQL));
171 		assertThrown!MYXDataPending(cn.querySet(selectSQL));
172 		assertThrown!MYXDataPending(cn.query(selectSQL));
173 		assertThrown!MYXDataPending(cn.prepare(selectSQL));
174 		assertThrown!MYXDataPending(preparedInsert.exec());
175 		assertThrown!MYXDataPending(preparedSelect.querySet());
176 
177 		resultSeq.each(); // Consume range
178 
179 		assertNotThrown!MYXDataPending(cn.exec(insertSQL));
180 		assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
181 		assertNotThrown!MYXDataPending(cn.prepare(selectSQL));
182 		assertNotThrown!MYXDataPending(preparedInsert.exec());
183 		assertNotThrown!MYXDataPending(preparedSelect.querySet());
184 	}
185 
186 	ubyte[] getPacket()
187 	{
188 		scope(failure) kill();
189 
190 		ubyte[4] header;
191 		_socket.read(header);
192 		// number of bytes always set as 24-bit
193 		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
194 		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
195 		bumpPacket();
196 
197 		ubyte[] packet = new ubyte[numDataBytes];
198 		_socket.read(packet);
199 		assert(packet.length == numDataBytes, "Wrong number of bytes read");
200 		return packet;
201 	}
202 
203 	void send(const(ubyte)[] packet)
204 	in
205 	{
206 		assert(packet.length > 4); // at least 1 byte more than header
207 	}
208 	body
209 	{
210 		_socket.write(packet);
211 	}
212 
213 	void send(const(ubyte)[] header, const(ubyte)[] data)
214 	in
215 	{
216 		assert(header.length == 4 || header.length == 5/*command type included*/);
217 	}
218 	body
219 	{
220 		_socket.write(header);
221 		if(data.length)
222 			_socket.write(data);
223 	}
224 
225 	void sendCmd(T)(CommandType cmd, const(T)[] data)
226 	in
227 	{
228 		// Internal thread states. Clients shouldn't use this
229 		assert(cmd != CommandType.SLEEP);
230 		assert(cmd != CommandType.CONNECT);
231 		assert(cmd != CommandType.TIME);
232 		assert(cmd != CommandType.DELAYED_INSERT);
233 		assert(cmd != CommandType.CONNECT_OUT);
234 
235 		// Deprecated
236 		assert(cmd != CommandType.CREATE_DB);
237 		assert(cmd != CommandType.DROP_DB);
238 		assert(cmd != CommandType.TABLE_DUMP);
239 
240 		// cannot send more than uint.max bytes. TODO: better error message if we try?
241 		assert(data.length <= uint.max);
242 	}
243 	out
244 	{
245 		// at this point we should have sent a command
246 		assert(pktNumber == 1);
247 	}
248 	body
249 	{
250 		enforceEx!MYX(!(_headersPending || _rowsPending),
251 			"There are result set elements pending - purgeResult() required.");
252 
253 		scope(failure) kill();
254 
255 		_lastCommandID++;
256 
257 		if(!_socket.connected)
258 		{
259 			if(cmd == CommandType.QUIT)
260 				return; // Don't bother reopening connection just to quit
261 
262 			_open = OpenState.notConnected;
263 			connect(_clientCapabilities);
264 		}
265 
266 		resetPacket();
267 
268 		ubyte[] header;
269 		header.length = 4 /*header*/ + 1 /*cmd*/;
270 		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
271 		header[4] = cmd;
272 		bumpPacket();
273 
274 		send(header, cast(const(ubyte)[])data);
275 	}
276 
277 	OKErrorPacket getCmdResponse(bool asString = false)
278 	{
279 		auto okp = OKErrorPacket(getPacket());
280 		enforcePacketOK(okp);
281 		_serverStatus = okp.serverStatus;
282 		return okp;
283 	}
284 
285 	ubyte[] buildAuthPacket(ubyte[] token)
286 	in
287 	{
288 		assert(token.length == 20);
289 	}
290 	body
291 	{
292 		ubyte[] packet;
293 		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
294 		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
295 
296 		// NOTE: we'll set the header last when we know the size
297 
298 		// Set the default capabilities required by the client
299 		_cCaps.packInto(packet[4..8]);
300 
301 		// Request a conventional maximum packet length.
302 		1.packInto(packet[8..12]);
303 
304 		packet ~= 33; // Set UTF-8 as default charSet
305 
306 		// There's a statutory block of zero bytes here - fill them in.
307 		foreach(i; 0 .. 23)
308 			packet ~= 0;
309 
310 		// Add the user name as a null terminated string
311 		foreach(i; 0 .. _user.length)
312 			packet ~= _user[i];
313 		packet ~= 0; // \0
314 
315 		// Add our calculated authentication token as a length prefixed string.
316 		assert(token.length <= ubyte.max);
317 		if(_pwd.length == 0)  // Omit the token if the account has no password
318 			packet ~= 0;
319 		else
320 		{
321 			packet ~= cast(ubyte)token.length;
322 			foreach(i; 0 .. token.length)
323 				packet ~= token[i];
324 		}
325 
326 		// Add the default database as a null terminated string
327 		foreach(i; 0 .. _db.length)
328 			packet ~= _db[i];
329 		packet ~= 0; // \0
330 
331 		// The server sent us a greeting with packet number 0, so we send the auth packet
332 		// back with the next number.
333 		packet.setPacketHeader(pktNumber);
334 		bumpPacket();
335 		return packet;
336 	}
337 
338 	void consumeServerInfo(ref ubyte[] packet)
339 	{
340 		scope(failure) kill();
341 
342 		_sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
343 		_sCharSet = packet.consume!ubyte(); // server_language
344 		_serverStatus = packet.consume!ushort(); //server_status
345 		_sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
346 		_sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
347 
348 		enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
349 		enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
350 	}
351 
352 	ubyte[] parseGreeting()
353 	{
354 		scope(failure) kill();
355 
356 		ubyte[] packet = getPacket();
357 
358 		if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
359 		{
360 			auto okp = OKErrorPacket(packet);
361 			enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
362 		}
363 
364 		_protocol = packet.consume!ubyte();
365 
366 		_serverVersion = packet.consume!string(packet.countUntil(0));
367 		packet.skip(1); // \0 terminated _serverVersion
368 
369 		_sThread = packet.consume!uint();
370 
371 		// read first part of scramble buf
372 		ubyte[] authBuf;
373 		authBuf.length = 255;
374 		authBuf[0..8] = packet.consume(8)[]; // scramble_buff
375 
376 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
377 
378 		consumeServerInfo(packet);
379 
380 		packet.skip(1); // this byte supposed to be scramble length, but is actually zero
381 		packet.skip(10); // filler of \0
382 
383 		// rest of the scramble
384 		auto len = packet.countUntil(0);
385 		enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
386 		enforce(authBuf.length > 8+len);
387 		authBuf[8..8+len] = packet.consume(len)[];
388 		authBuf.length = 8+len; // cut to correct size
389 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
390 
391 		return authBuf;
392 	}
393 
394 	static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
395 	{
396 		auto s = new PlainPhobosSocket();
397 		s.connect(new InternetAddress(host, port));
398 		return s;
399 	}
400 
401 	static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
402 	{
403 		version(Have_vibe_d_core)
404 			return vibe.core.net.connectTCP(host, port);
405 		else
406 			assert(0);
407 	}
408 
409 	void initConnection()
410 	{
411 		resetPacket();
412 		final switch(_socketType)
413 		{
414 			case MySQLSocketType.phobos:
415 				_socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
416 				break;
417 
418 			case MySQLSocketType.vibed:
419 				version(Have_vibe_d_core) {
420 					_socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
421 					break;
422 				} else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
423 		}
424 	}
425 
426 	ubyte[] makeToken(ubyte[] authBuf)
427 	{
428 		auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
429 		auto pass2 = sha1Of(pass1);
430 
431 		SHA1 sha1;
432 		sha1.start();
433 		sha1.put(authBuf);
434 		sha1.put(pass2);
435 		auto result = sha1.finish();
436 		foreach (size_t i; 0..20)
437 			result[i] = result[i] ^ pass1[i];
438 		return result.dup;
439 	}
440 
441 	SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
442 	{
443 		SvrCapFlags common;
444 		uint filter = 1;
445 		foreach (size_t i; 0..uint.sizeof)
446 		{
447 			bool serverSupport = (server & filter) != 0; // can the server do this capability?
448 			bool clientSupport = (client & filter) != 0; // can we support it?
449 			if(serverSupport && clientSupport)
450 				common |= filter;
451 			filter <<= 1; // check next flag
452 		}
453 		return common;
454 	}
455 
456 	void setClientFlags(SvrCapFlags capFlags)
457 	{
458 		_cCaps = getCommonCapabilities(_sCaps, capFlags);
459 
460 		// We cannot operate in <4.1 protocol, so we'll force it even if the user
461 		// didn't supply it
462 		_cCaps |= SvrCapFlags.PROTOCOL41;
463 		_cCaps |= SvrCapFlags.SECURE_CONNECTION;
464 	}
465 
466 	void authenticate(ubyte[] greeting)
467 	in
468 	{
469 		assert(_open == OpenState.connected);
470 	}
471 	out
472 	{
473 		assert(_open == OpenState.authenticated);
474 	}
475 	body
476 	{
477 		auto token = makeToken(greeting);
478 		auto authPacket = buildAuthPacket(token);
479 		send(authPacket);
480 
481 		auto packet = getPacket();
482 		auto okp = OKErrorPacket(packet);
483 		enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
484 		_open = OpenState.authenticated;
485 	}
486 
487 	SvrCapFlags _clientCapabilities;
488 
489 	void connect(SvrCapFlags clientCapabilities)
490 	in
491 	{
492 		assert(closed);
493 	}
494 	out
495 	{
496 		assert(_open == OpenState.authenticated);
497 	}
498 	body
499 	{
500 		initConnection();
501 		auto greeting = parseGreeting();
502 		_open = OpenState.connected;
503 
504 		_clientCapabilities = clientCapabilities;
505 		setClientFlags(clientCapabilities);
506 		authenticate(greeting);
507 	}
508 	
509 	// Forcefully close the socket without sending the quit command.
510 	// Needed in case an error leaves communatations in an undefined or non-recoverable state.
511 	void kill()
512 	{
513 		if(_socket.connected)
514 			_socket.close();
515 		_open = OpenState.notConnected;
516 	}
517 	
518 public:
519 
520 	/++
521 	Construct opened connection.
522 
523 	Throws `mysql.exceptions.MySQLException` upon failure to connect.
524 	
525 	If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
526 	creating a new Connection directly. That will provide certain benefits,
527 	such as reusing old connections and automatic cleanup (no need to close
528 	the connection when done).
529 
530 	------------------
531 	// Suggested usage:
532 
533 	{
534 	    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
535 	    scope(exit) con.close();
536 
537 	    // Use the connection
538 	    ...
539 	}
540 	------------------
541 
542 	Params:
543 		cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
544 			(TODO: The connection string needs work to allow for semicolons in its parts!)
545 		socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
546 			unless -version=Have_vibe_d_core is used.
547 		openSocket = Optional callback which should return a newly-opened Phobos
548 			or Vibe.d TCP socket. This allows custom sockets to be used,
549 			subclassed from Phobos's or Vibe.d's sockets.
550 		host = An IP address in numeric dotted form, or as a host  name.
551 		user = The user name to authenticate.
552 		password = Users password.
553 		db = Desired initial database.
554 		capFlags = The set of flag bits from the server's capabilities that the client requires
555 	+/
556 	//After the connection is created, and the initial invitation is received from the server
557 	//client preferences can be set, and authentication can then be attempted.
558 	this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
559 	{
560 		version(Have_vibe_d_core)
561 			enum defaultSocketType = MySQLSocketType.vibed;
562 		else
563 			enum defaultSocketType = MySQLSocketType.phobos;
564 
565 		this(defaultSocketType, host, user, pwd, db, port, capFlags);
566 	}
567 
568 	///ditto
569 	this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
570 	{
571 		version(Have_vibe_d_core) {} else
572 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
573 
574 		this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
575 			host, user, pwd, db, port, capFlags);
576 	}
577 
578 	///ditto
579 	this(OpenSocketCallbackPhobos openSocket,
580 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
581 	{
582 		this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
583 	}
584 
585 	version(Have_vibe_d_core)
586 	///ditto
587 	this(OpenSocketCallbackVibeD openSocket,
588 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
589 	{
590 		this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
591 	}
592 
593 	///ditto
594 	private this(MySQLSocketType socketType,
595 		OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
596 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
597 	in
598 	{
599 		final switch(socketType)
600 		{
601 			case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
602 			case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
603 		}
604 	}
605 	body
606 	{
607 		enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
608 		enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
609 		version(Have_vibe_d_core) {} else
610 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
611 
612 		_socketType = socketType;
613 		_host = host;
614 		_user = user;
615 		_pwd = pwd;
616 		_db = db;
617 		_port = port;
618 
619 		_openSocketPhobos = openSocketPhobos;
620 		_openSocketVibeD  = openSocketVibeD;
621 
622 		connect(capFlags);
623 	}
624 
625 	///ditto
626 	//After the connection is created, and the initial invitation is received from the server
627 	//client preferences can be set, and authentication can then be attempted.
628 	this(string cs, SvrCapFlags capFlags = defaultClientFlags)
629 	{
630 		string[] a = parseConnectionString(cs);
631 		this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
632 	}
633 
634 	///ditto
635 	this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
636 	{
637 		string[] a = parseConnectionString(cs);
638 		this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
639 	}
640 
641 	///ditto
642 	this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
643 	{
644 		string[] a = parseConnectionString(cs);
645 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
646 	}
647 
648 	version(Have_vibe_d_core)
649 	///ditto
650 	this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
651 	{
652 		string[] a = parseConnectionString(cs);
653 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
654 	}
655 
656 	/++
657 	Check whether this Connection is still connected to the server, or if
658 	the connection has been closed.
659 	+/
660 	@property bool closed()
661 	{
662 		return _open == OpenState.notConnected || !_socket.connected;
663 	}
664 
665 	version(Have_vibe_d_core)
666 	{
667 		/// Used by Vibe.d's ConnectionPool, ignore this.
668 		void acquire() { if( _socket ) _socket.acquire(); }
669 		///ditto
670 		void release() { if( _socket ) _socket.release(); }
671 		///ditto
672 		bool isOwner() { return _socket ? _socket.isOwner() : false; }
673 		///ditto
674 		bool amOwner() { return _socket ? _socket.isOwner() : false; }
675 	}
676 	else
677 	{
678 		/// Used by Vibe.d's ConnectionPool, ignore this.
679 		void acquire() { /+ Do nothing +/ }
680 		///ditto
681 		void release() { /+ Do nothing +/ }
682 		///ditto
683 		bool isOwner() { return !!_socket; }
684 		///ditto
685 		bool amOwner() { return !!_socket; }
686 	}
687 
688 	/++
689 	Explicitly close the connection.
690 	
691 	This is a two-stage process. First tell the server we are quitting this
692 	connection, and then close the socket.
693 	
694 	Idiomatic use as follows is suggested:
695 	------------------
696 	{
697 	    auto con = new Connection("localhost:user:password:mysqld");
698 	    scope(exit) con.close();
699 	    // Use the connection
700 	    ...
701 	}
702 	------------------
703 	+/
704 	void close()
705 	{
706 		if (_open == OpenState.authenticated && _socket.connected)
707 			quit();
708 
709 		if (_open == OpenState.connected)
710 			kill();
711 		resetPacket();
712 	}
713 
714 	/++
715 	Reconnects to the server using the same connection settings originally
716 	used to create the Connection.
717 
718 	Optionally takes a SvrCapFlags, allowing you to reconnect using a different
719 	set of server capability flags (most users will not need to do this).
720 
721 	If the connection is already open, this will do nothing. However, if you
722 	request a different set of SvrCapFlags then was originally used to create
723 	the Connection, the connection will be closed and then reconnected.
724 	+/
725 	void reconnect()
726 	{
727 		reconnect(_clientCapabilities);
728 	}
729 
730 	///ditto
731 	void reconnect(SvrCapFlags clientCapabilities)
732 	{
733 		bool sameCaps = clientCapabilities == _clientCapabilities;
734 		if(!closed)
735 		{
736 			// Same caps as before?
737 			if(clientCapabilities == _clientCapabilities)
738 				return; // Nothing to do, just keep current connection
739 
740 			close();
741 		}
742 
743 		connect(clientCapabilities);
744 	}
745 
746 	private void quit()
747 	in
748 	{
749 		assert(_open == OpenState.authenticated);
750 	}
751 	body
752 	{
753 		sendCmd(CommandType.QUIT, []);
754 		// No response is sent for a quit packet
755 		_open = OpenState.connected;
756 	}
757 
758 	/++
759 	Parses a connection string of the form
760 	`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`
761 
762 	Port is optional and defaults to 3306.
763 
764 	Whitespace surrounding any name or value is automatically stripped.
765 
766 	Returns a five-element array of strings in this order:
767 	$(UL
768 	$(LI [0]: host)
769 	$(LI [1]: user)
770 	$(LI [2]: pwd)
771 	$(LI [3]: db)
772 	$(LI [4]: port)
773 	)
774 	
775 	(TODO: The connection string needs work to allow for semicolons in its parts!)
776 	+/
777 	//TODO: Replace the return value with a proper struct.
778 	static string[] parseConnectionString(string cs)
779 	{
780 		string[] rv;
781 		rv.length = 5;
782 		rv[4] = "3306"; // Default port
783 		string[] a = split(cs, ";");
784 		foreach (s; a)
785 		{
786 			string[] a2 = split(s, "=");
787 			enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs);
788 			string name = strip(a2[0]);
789 			string val = strip(a2[1]);
790 			switch (name)
791 			{
792 				case "host":
793 					rv[0] = val;
794 					break;
795 				case "user":
796 					rv[1] = val;
797 					break;
798 				case "pwd":
799 					rv[2] = val;
800 					break;
801 				case "db":
802 					rv[3] = val;
803 					break;
804 				case "port":
805 					rv[4] = val;
806 					break;
807 				default:
808 					throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
809 			}
810 		}
811 		return rv;
812 	}
813 
814 	/++
815 	Select a current database.
816 	
817 	Params: dbName = Name of the requested database
818 	Throws: MySQLException
819 	+/
820 	void selectDB(string dbName)
821 	{
822 		sendCmd(CommandType.INIT_DB, dbName);
823 		getCmdResponse();
824 		_db = dbName;
825 	}
826 
827 	/++
828 	Check the server status
829 	
830 	Returns: An OKErrorPacket from which server status can be determined
831 	Throws: MySQLException
832 	+/
833 	OKErrorPacket pingServer()
834 	{
835 		sendCmd(CommandType.PING, []);
836 		return getCmdResponse();
837 	}
838 
839 	/++
840 	Refresh some feature(s) of the server.
841 	
842 	Returns: An OKErrorPacket from which server status can be determined
843 	Throws: MySQLException
844 	+/
845 	OKErrorPacket refreshServer(RefreshFlags flags)
846 	{
847 		sendCmd(CommandType.REFRESH, [flags]);
848 		return getCmdResponse();
849 	}
850 
851 	/++
852 	Get the next Row of a pending result set.
853 	
854 	This method can be used after either execSQL() or execPrepared() have returned true
855 	to retrieve result set rows sequentially.
856 	
857 	Similar functionality is available via execSQLSequence() and execPreparedSequence() in
858 	which case the interface is presented as a forward range of Rows.
859 	
860 	This method allows you to deal with very large result sets either a row at a time,
861 	or by feeding the rows into some suitable container such as a linked list.
862 	
863 	Returns: A Row object.
864 	+/
865 	Row getNextRow()
866 	{
867 		scope(failure) kill();
868 
869 		if (_headersPending)
870 		{
871 			_rsh = ResultSetHeaders(this, _fieldCount);
872 			_headersPending = false;
873 		}
874 		ubyte[] packet;
875 		Row rr;
876 		packet = getPacket();
877 		if (packet.isEOFPacket())
878 		{
879 			_rowsPending = _binaryPending = false;
880 			return rr;
881 		}
882 		if (_binaryPending)
883 			rr = Row(this, packet, _rsh, true);
884 		else
885 			rr = Row(this, packet, _rsh, false);
886 		//rr._valid = true;
887 		return rr;
888 	}
889 
890 	/++
891 	Flush any outstanding result set elements.
892 	
893 	When the server responds to a command that produces a result set, it
894 	queues the whole set of corresponding packets over the current connection.
895 	Before that Connection can embark on any new command, it must receive
896 	all of those packets and junk them.
897 	http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/
898 	+/
899 	ulong purgeResult()
900 	{
901 		scope(failure) kill();
902 
903 		_lastCommandID++;
904 
905 		ulong rows = 0;
906 		if (_headersPending)
907 		{
908 			for (size_t i = 0;; i++)
909 			{
910 				if (getPacket().isEOFPacket())
911 				{
912 					_headersPending = false;
913 					break;
914 				}
915 				enforceEx!MYXProtocol(i < _fieldCount,
916 					text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
917 			}
918 		}
919 		if (_rowsPending)
920 		{
921 			for (;;  rows++)
922 			{
923 				if (getPacket().isEOFPacket())
924 				{
925 					_rowsPending = _binaryPending = false;
926 					break;
927 				}
928 			}
929 		}
930 		resetPacket();
931 		return rows;
932 	}
933 
934 	/++
935 	Get a textual report on the server status.
936 	
937 	(COM_STATISTICS)
938 	+/
939 	string serverStats()
940 	{
941 		sendCmd(CommandType.STATISTICS, []);
942 		return cast(string) getPacket();
943 	}
944 
945 	/++
946 	Enable multiple statement commands
947 	
948 	This can be used later if this feature was not requested in the client capability flags.
949 	
950 	Params: on = Boolean value to turn the capability on or off.
951 	+/
952 	void enableMultiStatements(bool on)
953 	{
954 		scope(failure) kill();
955 
956 		ubyte[] t;
957 		t.length = 2;
958 		t[0] = on ? 0 : 1;
959 		t[1] = 0;
960 		sendCmd(CommandType.STMT_OPTION, t);
961 
962 		// For some reason this command gets an EOF packet as response
963 		auto packet = getPacket();
964 		enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
965 	}
966 
967 	/// Return the in-force protocol number
968 	@property ubyte protocol() pure const nothrow { return _protocol; }
969 	/// Server version
970 	@property string serverVersion() pure const nothrow { return _serverVersion; }
971 	/// Server capability flags
972 	@property uint serverCapabilities() pure const nothrow { return _sCaps; }
973 	/// Server status
974 	@property ushort serverStatus() pure const nothrow { return _serverStatus; }
975 	/// Current character set
976 	@property ubyte charSet() pure const nothrow { return _sCharSet; }
977 	/// Current database
978 	@property string currentDB() pure const nothrow { return _db; }
979 	/// Socket type being used
980 	@property MySQLSocketType socketType() pure const nothrow { return _socketType; }
981 
982 	/// After a command that inserted a row into a table with an auto-increment
983 	/// ID column, this method allows you to retrieve the last insert ID.
984 	@property ulong lastInsertID() pure const nothrow { return _insertID; }
985 
986 	/// This gets incremented every time a command is issued or results are purged,
987 	/// so a ResultRange can tell whether it's been invalidated.
988 	@property ulong lastCommandID() pure const nothrow { return _lastCommandID; }
989 
990 	/// Gets whether rows are pending
991 	@property bool rowsPending() pure const nothrow { return _rowsPending; }
992 
993 	/// Gets whether anything (rows, headers or binary) is pending.
994 	/// New commands cannot be sent on a conncection while anything is pending.
995 	@property bool hasPending() pure const nothrow
996 	{
997 		return _rowsPending || _headersPending || _binaryPending;
998 	}
999 
1000 	/// Gets the result header's field descriptions.
1001 	@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
1002 }