1 /// Connect to a MySQL/MariaDB server.
2 module mysql.connection;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.digest.sha;
7 import std.exception;
8 import std.socket;
9 import std.string;
10 
11 import mysql.commands;
12 import mysql.exceptions;
13 import mysql.prepared;
14 import mysql.protocol.constants;
15 import mysql.protocol.packets;
16 import mysql.protocol.sockets;
17 import mysql.result;
18 debug(MYSQL_INTEGRATION_TESTS)
19 {
20 	import mysql.test.common;
21 }
22 
23 version(Have_vibe_d_core)
24 {
25 	static if(__traits(compiles, (){ import vibe.core.net; } ))
26 		import vibe.core.net;
27 	else
28 		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
29 }
30 
/++
The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.

`SvrCapFlags.MULTI_STATEMENTS` and `SvrCapFlags.MULTI_RESULTS` are currently
not part of the default set.
+/
immutable SvrCapFlags defaultClientFlags =
	SvrCapFlags.OLD_LONG_PASSWORD
	| SvrCapFlags.ALL_COLUMN_FLAGS
	| SvrCapFlags.WITH_DB
	| SvrCapFlags.PROTOCOL41
	| SvrCapFlags.SECURE_CONNECTION;
	// | SvrCapFlags.MULTI_STATEMENTS | SvrCapFlags.MULTI_RESULTS;
37 
38 /++
39 A class representing a database connection.
40 
41 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
42 creating a new Connection directly. That will provide certain benefits,
43 such as reusing old connections and automatic cleanup (no need to close
44 the connection when done).
45 
46 ------------------
47 // Suggested usage:
48 
49 {
50 	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
51 	scope(exit) con.close();
52 
53 	// Use the connection
54 	...
55 }
56 ------------------
57 +/
//TODO: All low-level comms should be moved into the mysql.protocol package.
59 class Connection
60 {
61 /+
62 The Connection is responsible for handshaking with the server to establish
63 authentication. It then passes client preferences to the server, and
64 subsequently is the channel for all command packets that are sent, and all
65 response packets received.
66 
67 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
68 byte as a packet number. Connection deals with the headers and ensures that
69 packet numbers are sequential.
70 
71 The initial packet is sent by the server - essentially a 'hello' packet
inviting login. That packet has a sequence number of zero. That sequence
number is then incremented by client and server packets through the handshake
sequence.
75 
76 After login all further sequences are initialized by the client sending a
77 command packet with a zero sequence number, to which the server replies with
78 zero or more packets with sequential sequence numbers.
79 +/
80 package:
81 	enum OpenState
82 	{
83 		/// We have not yet connected to the server, or have sent QUIT to the
84 		/// server and closed the connection
85 		notConnected,
86 		/// We have connected to the server and parsed the greeting, but not
87 		/// yet authenticated
88 		connected,
89 		/// We have successfully authenticated against the server, and need to
90 		/// send QUIT to the server when closing the connection
91 		authenticated
92 	}
	// Current life-cycle state of the connection (see OpenState).
	OpenState   _open;
	// Socket wrapper used for all reads and writes; created by initConnection().
	MySQLSocket _socket;

	// _sCaps: capabilities reported by the server in its greeting.
	// _cCaps: capabilities this client sends back (see setClientFlags()).
	SvrCapFlags _sCaps, _cCaps;
	// Value read from the greeting right after the server version string
	// (presumably the server-side thread/connection id - TODO confirm).
	uint    _sThread;
	// Most recent server status word, taken from the greeting or an OK packet.
	ushort  _serverStatus;
	// Server character set and protocol version, both read from the greeting.
	ubyte   _sCharSet, _protocol;
	// Server version string read from the greeting.
	string  _serverVersion;

	// Connection settings supplied to the constructor; kept so that the
	// connection can be re-established later.
	string _host, _user, _pwd, _db;
	ushort _port;

	// Which socket implementation (Phobos or Vibe.d) this connection uses.
	MySQLSocketType _socketType;

	// Callbacks used by initConnection() to open the underlying TCP socket.
	OpenSocketCallbackPhobos _openSocketPhobos;
	OpenSocketCallbackVibeD  _openSocketVibeD;

	// Value exposed through the lastInsertID property.
	ulong _insertID;

	// This gets incremented every time a command is issued or results are purged,
	// so a ResultRange can tell whether it's been invalidated.
	ulong _lastCommandID;

	// Whether there are rows, headers or binary data waiting to be retrieved.
	// MySQL protocol doesn't permit performing any other action until all
	// such data is read.
	bool _rowsPending, _headersPending, _binaryPending;

	// Field count of last performed command.
	ushort _fieldCount;

	// ResultSetHeaders of last performed command.
	ResultSetHeaders _rsh;
126 
127 	// This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise
128 	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
129 	// the reason why most other objects require a connection object for their construction.
130 	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
131 				/// ordering. First packet should have 0
132 	@property ubyte pktNumber()   { return _cpn; }
133 	void bumpPacket()       { _cpn++; }
134 	void resetPacket()      { _cpn = 0; }
135 
	// Only when Vibe.d support is NOT compiled in: the class invariant checks
	// that the connection was never configured to use a Vibe.d socket.
	version(Have_vibe_d_core) {} else
	pure const nothrow invariant()
	{
		assert(_socketType != MySQLSocketType.vibed);
	}
141 
142 	ubyte[] getPacket()
143 	{
144 		scope(failure) kill();
145 
146 		ubyte[4] header;
147 		_socket.read(header);
148 		// number of bytes always set as 24-bit
149 		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
150 		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
151 		bumpPacket();
152 
153 		ubyte[] packet = new ubyte[numDataBytes];
154 		_socket.read(packet);
155 		assert(packet.length == numDataBytes, "Wrong number of bytes read");
156 		return packet;
157 	}
158 
159 	void send(const(ubyte)[] packet)
160 	in
161 	{
162 		assert(packet.length > 4); // at least 1 byte more than header
163 	}
164 	body
165 	{
166 		_socket.write(packet);
167 	}
168 
169 	void send(const(ubyte)[] header, const(ubyte)[] data)
170 	in
171 	{
172 		assert(header.length == 4 || header.length == 5/*command type included*/);
173 	}
174 	body
175 	{
176 		_socket.write(header);
177 		if(data.length)
178 			_socket.write(data);
179 	}
180 
181 	void sendCmd(T)(CommandType cmd, const(T)[] data)
182 	in
183 	{
184 		// Internal thread states. Clients shouldn't use this
185 		assert(cmd != CommandType.SLEEP);
186 		assert(cmd != CommandType.CONNECT);
187 		assert(cmd != CommandType.TIME);
188 		assert(cmd != CommandType.DELAYED_INSERT);
189 		assert(cmd != CommandType.CONNECT_OUT);
190 
191 		// Deprecated
192 		assert(cmd != CommandType.CREATE_DB);
193 		assert(cmd != CommandType.DROP_DB);
194 		assert(cmd != CommandType.TABLE_DUMP);
195 
196 		// cannot send more than uint.max bytes. TODO: better error message if we try?
197 		assert(data.length <= uint.max);
198 	}
199 	out
200 	{
201 		// at this point we should have sent a command
202 		assert(pktNumber == 1);
203 	}
204 	body
205 	{
206 		autoPurge();
207 
208 		scope(failure) kill();
209 
210 		_lastCommandID++;
211 
212 		if(!_socket.connected)
213 		{
214 			if(cmd == CommandType.QUIT)
215 				return; // Don't bother reopening connection just to quit
216 
217 			_open = OpenState.notConnected;
218 			connect(_clientCapabilities);
219 		}
220 
221 		resetPacket();
222 
223 		ubyte[] header;
224 		header.length = 4 /*header*/ + 1 /*cmd*/;
225 		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
226 		header[4] = cmd;
227 		bumpPacket();
228 
229 		send(header, cast(const(ubyte)[])data);
230 	}
231 
232 	OKErrorPacket getCmdResponse(bool asString = false)
233 	{
234 		auto okp = OKErrorPacket(getPacket());
235 		enforcePacketOK(okp);
236 		_serverStatus = okp.serverStatus;
237 		return okp;
238 	}
239 
240 	ubyte[] buildAuthPacket(ubyte[] token)
241 	in
242 	{
243 		assert(token.length == 20);
244 	}
245 	body
246 	{
247 		ubyte[] packet;
248 		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
249 		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
250 
251 		// NOTE: we'll set the header last when we know the size
252 
253 		// Set the default capabilities required by the client
254 		_cCaps.packInto(packet[4..8]);
255 
256 		// Request a conventional maximum packet length.
257 		1.packInto(packet[8..12]);
258 
259 		packet ~= 33; // Set UTF-8 as default charSet
260 
261 		// There's a statutory block of zero bytes here - fill them in.
262 		foreach(i; 0 .. 23)
263 			packet ~= 0;
264 
265 		// Add the user name as a null terminated string
266 		foreach(i; 0 .. _user.length)
267 			packet ~= _user[i];
268 		packet ~= 0; // \0
269 
270 		// Add our calculated authentication token as a length prefixed string.
271 		assert(token.length <= ubyte.max);
272 		if(_pwd.length == 0)  // Omit the token if the account has no password
273 			packet ~= 0;
274 		else
275 		{
276 			packet ~= cast(ubyte)token.length;
277 			foreach(i; 0 .. token.length)
278 				packet ~= token[i];
279 		}
280 
281 		// Add the default database as a null terminated string
282 		foreach(i; 0 .. _db.length)
283 			packet ~= _db[i];
284 		packet ~= 0; // \0
285 
286 		// The server sent us a greeting with packet number 0, so we send the auth packet
287 		// back with the next number.
288 		packet.setPacketHeader(pktNumber);
289 		bumpPacket();
290 		return packet;
291 	}
292 
293 	void consumeServerInfo(ref ubyte[] packet)
294 	{
295 		scope(failure) kill();
296 
297 		_sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
298 		_sCharSet = packet.consume!ubyte(); // server_language
299 		_serverStatus = packet.consume!ushort(); //server_status
300 		_sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
301 		_sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
302 
303 		enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
304 		enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
305 	}
306 
307 	ubyte[] parseGreeting()
308 	{
309 		scope(failure) kill();
310 
311 		ubyte[] packet = getPacket();
312 
313 		if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
314 		{
315 			auto okp = OKErrorPacket(packet);
316 			enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
317 		}
318 
319 		_protocol = packet.consume!ubyte();
320 
321 		_serverVersion = packet.consume!string(packet.countUntil(0));
322 		packet.skip(1); // \0 terminated _serverVersion
323 
324 		_sThread = packet.consume!uint();
325 
326 		// read first part of scramble buf
327 		ubyte[] authBuf;
328 		authBuf.length = 255;
329 		authBuf[0..8] = packet.consume(8)[]; // scramble_buff
330 
331 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
332 
333 		consumeServerInfo(packet);
334 
335 		packet.skip(1); // this byte supposed to be scramble length, but is actually zero
336 		packet.skip(10); // filler of \0
337 
338 		// rest of the scramble
339 		auto len = packet.countUntil(0);
340 		enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
341 		enforce(authBuf.length > 8+len);
342 		authBuf[8..8+len] = packet.consume(len)[];
343 		authBuf.length = 8+len; // cut to correct size
344 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
345 
346 		return authBuf;
347 	}
348 
349 	static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
350 	{
351 		auto s = new PlainPhobosSocket();
352 		s.connect(new InternetAddress(host, port));
353 		return s;
354 	}
355 
356 	static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
357 	{
358 		version(Have_vibe_d_core)
359 			return vibe.core.net.connectTCP(host, port);
360 		else
361 			assert(0);
362 	}
363 
364 	void initConnection()
365 	{
366 		resetPacket();
367 		final switch(_socketType)
368 		{
369 			case MySQLSocketType.phobos:
370 				_socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
371 				break;
372 
373 			case MySQLSocketType.vibed:
374 				version(Have_vibe_d_core) {
375 					_socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
376 					break;
377 				} else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
378 		}
379 	}
380 
381 	ubyte[] makeToken(ubyte[] authBuf)
382 	{
383 		auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
384 		auto pass2 = sha1Of(pass1);
385 
386 		SHA1 sha1;
387 		sha1.start();
388 		sha1.put(authBuf);
389 		sha1.put(pass2);
390 		auto result = sha1.finish();
391 		foreach (size_t i; 0..20)
392 			result[i] = result[i] ^ pass1[i];
393 		return result.dup;
394 	}
395 
396 	SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
397 	{
398 		SvrCapFlags common;
399 		uint filter = 1;
400 		foreach (size_t i; 0..uint.sizeof)
401 		{
402 			bool serverSupport = (server & filter) != 0; // can the server do this capability?
403 			bool clientSupport = (client & filter) != 0; // can we support it?
404 			if(serverSupport && clientSupport)
405 				common |= filter;
406 			filter <<= 1; // check next flag
407 		}
408 		return common;
409 	}
410 
411 	void setClientFlags(SvrCapFlags capFlags)
412 	{
413 		_cCaps = getCommonCapabilities(_sCaps, capFlags);
414 
415 		// We cannot operate in <4.1 protocol, so we'll force it even if the user
416 		// didn't supply it
417 		_cCaps |= SvrCapFlags.PROTOCOL41;
418 		_cCaps |= SvrCapFlags.SECURE_CONNECTION;
419 	}
420 
421 	void authenticate(ubyte[] greeting)
422 	in
423 	{
424 		assert(_open == OpenState.connected);
425 	}
426 	out
427 	{
428 		assert(_open == OpenState.authenticated);
429 	}
430 	body
431 	{
432 		auto token = makeToken(greeting);
433 		auto authPacket = buildAuthPacket(token);
434 		send(authPacket);
435 
436 		auto packet = getPacket();
437 		auto okp = OKErrorPacket(packet);
438 		enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
439 		_open = OpenState.authenticated;
440 	}
441 
442 	SvrCapFlags _clientCapabilities;
443 
444 	void connect(SvrCapFlags clientCapabilities)
445 	in
446 	{
447 		assert(closed);
448 	}
449 	out
450 	{
451 		assert(_open == OpenState.authenticated);
452 	}
453 	body
454 	{
455 		initConnection();
456 		auto greeting = parseGreeting();
457 		_open = OpenState.connected;
458 
459 		_clientCapabilities = clientCapabilities;
460 		setClientFlags(clientCapabilities);
461 		authenticate(greeting);
462 	}
463 	
464 	/// Forcefully close the socket without sending the quit command.
465 	/// Needed in case an error leaves communatations in an undefined or non-recoverable state.
466 	void kill()
467 	{
468 		if(_socket.connected)
469 			_socket.close();
470 		_open = OpenState.notConnected;
471 	}
472 	
473 	/// Called whenever mysql-native needs to send a command to the server
474 	/// and be sure there aren't any pending results (which would prevent
475 	/// a new command from being sent).
476 	void autoPurge()
477 	{
478 		// This is called every time a command is sent,
479 		// so detect & prevent infinite recursion.
480 		static bool isAutoPurging = false;
481 
482 		if(isAutoPurging)
483 			return;
484 			
485 		isAutoPurging = true;
486 		scope(exit) isAutoPurging = false;
487 		scope(failure) kill();
488 
489 		purgeResult();
490 		statementsToRelease.releaseAll();
491 	}
492 
493 	/++
494 	Keeps track of prepared statements queued to be released from the server.
495 
496 	Prepared statements aren't released immediately, because that
497 	involves sending a command to the server even though there might be
498 	results pending. (Can't send a command while results are pending.)
499 	+/
500 	static struct StatementsToRelease(alias doRelease)
501 	{
502 		private Connection conn;
503 		
504 		/// Ids of prepared statements to be released.
505 		/// This uses assumeSafeAppend. Do not save copies of it.
506 		uint[] ids;
507 		
508 		void add(uint statementId)
509 		{
510 			ids ~= statementId;
511 		}
512 
513 		/// Removes a prepared statement from the list of statements
514 		/// to be released from the server.
515 		/// Does nothing if the statement isn't on the list.
516 		void remove(uint statementId)
517 		{
518 			foreach(ref id; ids)
519 			if(id == statementId)
520 				id = 0;
521 		}
522 
523 		/// Releases the prepared statements queued for release.
524 		void releaseAll()
525 		{
526 			foreach(id; ids)
527 			if(id != 0)
528 				doRelease(conn, id);
529 
530 			ids.length = 0;
531 			assumeSafeAppend(ids);
532 		}
533 	}
534 	StatementsToRelease!(PreparedImpl.immediateRelease) statementsToRelease;
535 	
	// Log of the ids passed to the fake release function used by the unittest below.
	debug(MYSQL_INTEGRATION_TESTS) uint[] fakeRelease_released;
	// Exercises StatementsToRelease: add, remove (present and absent ids)
	// and releaseAll, using a fake release function that only records ids.
	unittest
	{
		debug(MYSQL_INTEGRATION_TESTS)
		{
			// Stand-in for the real release: records the id on the connection.
			static void fakeRelease(Connection conn, uint id)
			{
				conn.fakeRelease_released ~= id;
			}
			
			mixin(scopedCn);
			
			StatementsToRelease!fakeRelease list;
			list.conn = cn;
			assert(list.ids == []);
			assert(cn.fakeRelease_released == []);

			// Adding only queues; nothing is released yet.
			list.add(1);
			assert(list.ids == [1]);
			assert(cn.fakeRelease_released == []);

			list.add(7);
			assert(list.ids == [1, 7]);
			assert(cn.fakeRelease_released == []);

			list.add(9);
			assert(list.ids == [1, 7, 9]);
			assert(cn.fakeRelease_released == []);

			// Removing an id that is not queued changes nothing.
			list.remove(5);
			assert(list.ids == [1, 7, 9]);
			assert(cn.fakeRelease_released == []);
			
			// Removing a queued id blanks its slot to 0.
			list.remove(7);
			assert(list.ids == [1, 0, 9]);
			assert(cn.fakeRelease_released == []);
			
			// Releasing skips blanked slots and empties the queue.
			list.releaseAll();
			assert(list.ids == []);
			assert(cn.fakeRelease_released == [1, 9]);
		}
	}
578 
579 public:
580 
581 	/++
582 	Construct opened connection.
583 
584 	Throws `mysql.exceptions.MySQLException` upon failure to connect.
585 	
586 	If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
587 	creating a new Connection directly. That will provide certain benefits,
588 	such as reusing old connections and automatic cleanup (no need to close
589 	the connection when done).
590 
591 	------------------
592 	// Suggested usage:
593 
594 	{
595 	    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
596 	    scope(exit) con.close();
597 
598 	    // Use the connection
599 	    ...
600 	}
601 	------------------
602 
603 	Params:
604 		cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
605 			(TODO: The connection string needs work to allow for semicolons in its parts!)
606 		socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
607 			unless -version=Have_vibe_d_core is used.
608 		openSocket = Optional callback which should return a newly-opened Phobos
609 			or Vibe.d TCP socket. This allows custom sockets to be used,
610 			subclassed from Phobos's or Vibe.d's sockets.
		host = An IP address in numeric dotted form, or a host name.
		user = The user name to authenticate.
		pwd = The user's password.
		db = Desired initial database.
		port = The TCP port of the server. Defaults to 3306.
		capFlags = The set of flag bits from the server's capabilities that the client requires
616 	+/
617 	//After the connection is created, and the initial invitation is received from the server
618 	//client preferences can be set, and authentication can then be attempted.
619 	this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
620 	{
621 		version(Have_vibe_d_core)
622 			enum defaultSocketType = MySQLSocketType.vibed;
623 		else
624 			enum defaultSocketType = MySQLSocketType.phobos;
625 
626 		this(defaultSocketType, host, user, pwd, db, port, capFlags);
627 	}
628 
629 	///ditto
630 	this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
631 	{
632 		version(Have_vibe_d_core) {} else
633 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
634 
635 		this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
636 			host, user, pwd, db, port, capFlags);
637 	}
638 
639 	///ditto
640 	this(OpenSocketCallbackPhobos openSocket,
641 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
642 	{
643 		this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
644 	}
645 
646 	version(Have_vibe_d_core)
647 	///ditto
648 	this(OpenSocketCallbackVibeD openSocket,
649 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
650 	{
651 		this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
652 	}
653 
654 	///ditto
655 	private this(MySQLSocketType socketType,
656 		OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
657 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
658 	in
659 	{
660 		final switch(socketType)
661 		{
662 			case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
663 			case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
664 		}
665 	}
666 	body
667 	{
668 		enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
669 		enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
670 		version(Have_vibe_d_core) {} else
671 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
672 
673 		statementsToRelease.conn = this;
674 
675 		_socketType = socketType;
676 		_host = host;
677 		_user = user;
678 		_pwd = pwd;
679 		_db = db;
680 		_port = port;
681 
682 		_openSocketPhobos = openSocketPhobos;
683 		_openSocketVibeD  = openSocketVibeD;
684 
685 		connect(capFlags);
686 	}
687 
688 	///ditto
689 	//After the connection is created, and the initial invitation is received from the server
690 	//client preferences can be set, and authentication can then be attempted.
691 	this(string cs, SvrCapFlags capFlags = defaultClientFlags)
692 	{
693 		string[] a = parseConnectionString(cs);
694 		this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
695 	}
696 
697 	///ditto
698 	this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
699 	{
700 		string[] a = parseConnectionString(cs);
701 		this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
702 	}
703 
704 	///ditto
705 	this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
706 	{
707 		string[] a = parseConnectionString(cs);
708 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
709 	}
710 
711 	version(Have_vibe_d_core)
712 	///ditto
713 	this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
714 	{
715 		string[] a = parseConnectionString(cs);
716 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
717 	}
718 
719 	/++
720 	Check whether this Connection is still connected to the server, or if
721 	the connection has been closed.
722 	+/
723 	@property bool closed()
724 	{
725 		return _open == OpenState.notConnected || !_socket.connected;
726 	}
727 
728 	version(Have_vibe_d_core)
729 	{
730 		/// Used by Vibe.d's ConnectionPool, ignore this.
731 		void acquire() { if( _socket ) _socket.acquire(); }
732 		///ditto
733 		void release() { if( _socket ) _socket.release(); }
734 		///ditto
735 		bool isOwner() { return _socket ? _socket.isOwner() : false; }
736 		///ditto
737 		bool amOwner() { return _socket ? _socket.isOwner() : false; }
738 	}
739 	else
740 	{
741 		/// Used by Vibe.d's ConnectionPool, ignore this.
742 		void acquire() { /+ Do nothing +/ }
743 		///ditto
744 		void release() { /+ Do nothing +/ }
745 		///ditto
746 		bool isOwner() { return !!_socket; }
747 		///ditto
748 		bool amOwner() { return !!_socket; }
749 	}
750 
751 	/++
752 	Explicitly close the connection.
753 	
754 	This is a two-stage process. First tell the server we are quitting this
755 	connection, and then close the socket.
756 	
757 	Idiomatic use as follows is suggested:
758 	------------------
759 	{
760 	    auto con = new Connection("localhost:user:password:mysqld");
761 	    scope(exit) con.close();
762 	    // Use the connection
763 	    ...
764 	}
765 	------------------
766 	+/
767 	void close()
768 	{
769 		if (_open == OpenState.authenticated && _socket.connected)
770 			quit();
771 
772 		if (_open == OpenState.connected)
773 			kill();
774 		resetPacket();
775 	}
776 
777 	/++
778 	Reconnects to the server using the same connection settings originally
779 	used to create the Connection.
780 
781 	Optionally takes a SvrCapFlags, allowing you to reconnect using a different
782 	set of server capability flags (most users will not need to do this).
783 
784 	If the connection is already open, this will do nothing. However, if you
785 	request a different set of SvrCapFlags then was originally used to create
786 	the Connection, the connection will be closed and then reconnected.
787 	+/
788 	void reconnect()
789 	{
790 		reconnect(_clientCapabilities);
791 	}
792 
793 	///ditto
794 	void reconnect(SvrCapFlags clientCapabilities)
795 	{
796 		bool sameCaps = clientCapabilities == _clientCapabilities;
797 		if(!closed)
798 		{
799 			// Same caps as before?
800 			if(clientCapabilities == _clientCapabilities)
801 				return; // Nothing to do, just keep current connection
802 
803 			close();
804 		}
805 
806 		connect(clientCapabilities);
807 	}
808 
809 	private void quit()
810 	in
811 	{
812 		assert(_open == OpenState.authenticated);
813 	}
814 	body
815 	{
816 		sendCmd(CommandType.QUIT, []);
817 		// No response is sent for a quit packet
818 		_open = OpenState.connected;
819 	}
820 
821 	/++
822 	Parses a connection string of the form
823 	`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`
824 
825 	Port is optional and defaults to 3306.
826 
827 	Whitespace surrounding any name or value is automatically stripped.
828 
829 	Returns a five-element array of strings in this order:
830 	$(UL
831 	$(LI [0]: host)
832 	$(LI [1]: user)
833 	$(LI [2]: pwd)
834 	$(LI [3]: db)
835 	$(LI [4]: port)
836 	)
837 	
838 	(TODO: The connection string needs work to allow for semicolons in its parts!)
839 	+/
840 	//TODO: Replace the return value with a proper struct.
841 	static string[] parseConnectionString(string cs)
842 	{
843 		string[] rv;
844 		rv.length = 5;
845 		rv[4] = "3306"; // Default port
846 		string[] a = split(cs, ";");
847 		foreach (s; a)
848 		{
849 			string[] a2 = split(s, "=");
850 			enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs);
851 			string name = strip(a2[0]);
852 			string val = strip(a2[1]);
853 			switch (name)
854 			{
855 				case "host":
856 					rv[0] = val;
857 					break;
858 				case "user":
859 					rv[1] = val;
860 					break;
861 				case "pwd":
862 					rv[2] = val;
863 					break;
864 				case "db":
865 					rv[3] = val;
866 					break;
867 				case "port":
868 					rv[4] = val;
869 					break;
870 				default:
871 					throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
872 			}
873 		}
874 		return rv;
875 	}
876 
877 	/++
878 	Select a current database.
879 	
880 	Params: dbName = Name of the requested database
881 	Throws: MySQLException
882 	+/
883 	void selectDB(string dbName)
884 	{
885 		sendCmd(CommandType.INIT_DB, dbName);
886 		getCmdResponse();
887 		_db = dbName;
888 	}
889 
890 	/++
891 	Check the server status
892 	
893 	Returns: An OKErrorPacket from which server status can be determined
894 	Throws: MySQLException
895 	+/
896 	OKErrorPacket pingServer()
897 	{
898 		sendCmd(CommandType.PING, []);
899 		return getCmdResponse();
900 	}
901 
902 	/++
903 	Refresh some feature(s) of the server.
904 	
905 	Returns: An OKErrorPacket from which server status can be determined
906 	Throws: MySQLException
907 	+/
908 	OKErrorPacket refreshServer(RefreshFlags flags)
909 	{
910 		sendCmd(CommandType.REFRESH, [flags]);
911 		return getCmdResponse();
912 	}
913 
914 	/++
915 	Get the next Row of a pending result set.
916 	
917 	This method can be used after either execSQL() or execPrepared() have returned true
918 	to retrieve result set rows sequentially.
919 	
920 	Similar functionality is available via execSQLSequence() and execPreparedSequence() in
921 	which case the interface is presented as a forward range of Rows.
922 	
923 	This method allows you to deal with very large result sets either a row at a time,
924 	or by feeding the rows into some suitable container such as a linked list.
925 	
926 	Returns: A Row object.
927 	+/
928 	Row getNextRow()
929 	{
930 		scope(failure) kill();
931 
932 		if (_headersPending)
933 		{
934 			_rsh = ResultSetHeaders(this, _fieldCount);
935 			_headersPending = false;
936 		}
937 		ubyte[] packet;
938 		Row rr;
939 		packet = getPacket();
940 		if (packet.isEOFPacket())
941 		{
942 			_rowsPending = _binaryPending = false;
943 			return rr;
944 		}
945 		if (_binaryPending)
946 			rr = Row(this, packet, _rsh, true);
947 		else
948 			rr = Row(this, packet, _rsh, false);
949 		//rr._valid = true;
950 		return rr;
951 	}
952 
953 	/++
954 	Flush any outstanding result set elements.
955 	
956 	When the server responds to a command that produces a result set, it
957 	queues the whole set of corresponding packets over the current connection.
958 	Before that Connection can embark on any new command, it must receive
959 	all of those packets and junk them.
960 	http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/
961 	+/
962 	ulong purgeResult()
963 	{
964 		scope(failure) kill();
965 
966 		_lastCommandID++;
967 
968 		ulong rows = 0;
969 		if (_headersPending)
970 		{
971 			for (size_t i = 0;; i++)
972 			{
973 				if (getPacket().isEOFPacket())
974 				{
975 					_headersPending = false;
976 					break;
977 				}
978 				enforceEx!MYXProtocol(i < _fieldCount,
979 					text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
980 			}
981 		}
982 		if (_rowsPending)
983 		{
984 			for (;;  rows++)
985 			{
986 				if (getPacket().isEOFPacket())
987 				{
988 					_rowsPending = _binaryPending = false;
989 					break;
990 				}
991 			}
992 		}
993 		resetPacket();
994 		return rows;
995 	}
996 
997 	/++
998 	Get a textual report on the server status.
999 	
1000 	(COM_STATISTICS)
1001 	+/
1002 	string serverStats()
1003 	{
1004 		sendCmd(CommandType.STATISTICS, []);
1005 		return cast(string) getPacket();
1006 	}
1007 
1008 	/++
1009 	Enable multiple statement commands
1010 	
1011 	This can be used later if this feature was not requested in the client capability flags.
1012 	
1013 	Params: on = Boolean value to turn the capability on or off.
1014 	+/
1015 	void enableMultiStatements(bool on)
1016 	{
1017 		scope(failure) kill();
1018 
1019 		ubyte[] t;
1020 		t.length = 2;
1021 		t[0] = on ? 0 : 1;
1022 		t[1] = 0;
1023 		sendCmd(CommandType.STMT_OPTION, t);
1024 
1025 		// For some reason this command gets an EOF packet as response
1026 		auto packet = getPacket();
1027 		enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1028 	}
1029 
1030 	/// Return the in-force protocol number
1031 	@property ubyte protocol() pure const nothrow { return _protocol; }
1032 	/// Server version
1033 	@property string serverVersion() pure const nothrow { return _serverVersion; }
1034 	/// Server capability flags
1035 	@property uint serverCapabilities() pure const nothrow { return _sCaps; }
1036 	/// Server status
1037 	@property ushort serverStatus() pure const nothrow { return _serverStatus; }
1038 	/// Current character set
1039 	@property ubyte charSet() pure const nothrow { return _sCharSet; }
1040 	/// Current database
1041 	@property string currentDB() pure const nothrow { return _db; }
1042 	/// Socket type being used
1043 	@property MySQLSocketType socketType() pure const nothrow { return _socketType; }
1044 
1045 	/// After a command that inserted a row into a table with an auto-increment
1046 	/// ID column, this method allows you to retrieve the last insert ID.
1047 	@property ulong lastInsertID() pure const nothrow { return _insertID; }
1048 
1049 	/// This gets incremented every time a command is issued or results are purged,
1050 	/// so a ResultRange can tell whether it's been invalidated.
1051 	@property ulong lastCommandID() pure const nothrow { return _lastCommandID; }
1052 
1053 	/// Gets whether rows are pending
1054 	@property bool rowsPending() pure const nothrow { return _rowsPending; }
1055 
1056 	/// Gets whether anything (rows, headers or binary) is pending.
1057 	/// New commands cannot be sent on a conncection while anything is pending.
1058 	@property bool hasPending() pure const nothrow
1059 	{
1060 		return _rowsPending || _headersPending || _binaryPending;
1061 	}
1062 
1063 	/// Gets the result header's field descriptions.
1064 	@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
1065 }