1 /// Connect to a MySQL/MariaDB server.
2 module mysql.connection;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.digest.sha;
7 import std.exception;
8 import std.socket;
9 import std.string;
10 
11 import mysql.commands;
12 import mysql.exceptions;
13 import mysql.prepared;
14 import mysql.protocol.constants;
15 import mysql.protocol.packets;
16 import mysql.protocol.sockets;
17 import mysql.result;
18 debug(MYSQL_INTEGRATION_TESTS)
19 {
20 	import mysql.test.common;
21 }
22 
23 version(Have_vibe_d_core)
24 {
25 	static if(__traits(compiles, (){ import vibe.core.net; } ))
26 		import vibe.core.net;
27 	else
28 		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
29 }
30 
31 /// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
32 immutable SvrCapFlags defaultClientFlags =
33 		SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
34 		SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
35 		SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
36 		//SvrCapFlags.MULTI_RESULTS;
37 
38 /++
39 A class representing a database connection.
40 
41 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
42 creating a new Connection directly. That will provide certain benefits,
43 such as reusing old connections and automatic cleanup (no need to close
44 the connection when done).
45 
46 ------------------
47 // Suggested usage:
48 
49 {
50 	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
51 	scope(exit) con.close();
52 
53 	// Use the connection
54 	...
55 }
56 ------------------
57 +/
58 //TODO: All low-level commms should be moved into the mysql.protocol package.
59 class Connection
60 {
61 /+
62 The Connection is responsible for handshaking with the server to establish
63 authentication. It then passes client preferences to the server, and
64 subsequently is the channel for all command packets that are sent, and all
65 response packets received.
66 
67 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
68 byte as a packet number. Connection deals with the headers and ensures that
69 packet numbers are sequential.
70 
71 The initial packet is sent by the server - essentially a 'hello' packet
72 inviting login. That packet has a sequence number of zero. That sequence
73 number is the incremented by client and server packets through the handshake
74 sequence.
75 
76 After login all further sequences are initialized by the client sending a
77 command packet with a zero sequence number, to which the server replies with
78 zero or more packets with sequential sequence numbers.
79 +/
80 package:
81 	enum OpenState
82 	{
83 		/// We have not yet connected to the server, or have sent QUIT to the
84 		/// server and closed the connection
85 		notConnected,
86 		/// We have connected to the server and parsed the greeting, but not
87 		/// yet authenticated
88 		connected,
89 		/// We have successfully authenticated against the server, and need to
90 		/// send QUIT to the server when closing the connection
91 		authenticated
92 	}
93 	OpenState   _open;
94 	MySQLSocket _socket;
95 
96 	SvrCapFlags _sCaps, _cCaps;
97 	uint    _sThread;
98 	ushort  _serverStatus;
99 	ubyte   _sCharSet, _protocol;
100 	string  _serverVersion;
101 
102 	string _host, _user, _pwd, _db;
103 	ushort _port;
104 
105 	MySQLSocketType _socketType;
106 
107 	OpenSocketCallbackPhobos _openSocketPhobos;
108 	OpenSocketCallbackVibeD  _openSocketVibeD;
109 
110 	ulong _insertID;
111 
112 	// This gets incremented every time a command is issued or results are purged,
113 	// so a ResultRange can tell whether it's been invalidated.
114 	ulong _lastCommandID;
115 
116 	// Whether there are rows, headers or bimary data waiting to be retreived.
117 	// MySQL protocol doesn't permit performing any other action until all
118 	// such data is read.
119 	bool _rowsPending, _headersPending, _binaryPending;
120 
121 	// Field count of last performed command.
122 	ushort _fieldCount;
123 
124 	// ResultSetHeaders of last performed command.
125 	ResultSetHeaders _rsh;
126 
127 	// This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise
128 	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
129 	// the reason why most other objects require a connection object for their construction.
130 	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
131 				/// ordering. First packet should have 0
132 	@property ubyte pktNumber()   { return _cpn; }
133 	void bumpPacket()       { _cpn++; }
134 	void resetPacket()      { _cpn = 0; }
135 
136 	version(Have_vibe_d_core) {} else
137 	pure const nothrow invariant()
138 	{
139 		assert(_socketType != MySQLSocketType.vibed);
140 	}
141 
142 	ubyte[] getPacket()
143 	{
144 		scope(failure) kill();
145 
146 		ubyte[4] header;
147 		_socket.read(header);
148 		// number of bytes always set as 24-bit
149 		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
150 		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
151 		bumpPacket();
152 
153 		ubyte[] packet = new ubyte[numDataBytes];
154 		_socket.read(packet);
155 		assert(packet.length == numDataBytes, "Wrong number of bytes read");
156 		return packet;
157 	}
158 
159 	void send(const(ubyte)[] packet)
160 	in
161 	{
162 		assert(packet.length > 4); // at least 1 byte more than header
163 	}
164 	body
165 	{
166 		_socket.write(packet);
167 	}
168 
169 	void send(const(ubyte)[] header, const(ubyte)[] data)
170 	in
171 	{
172 		assert(header.length == 4 || header.length == 5/*command type included*/);
173 	}
174 	body
175 	{
176 		_socket.write(header);
177 		if(data.length)
178 			_socket.write(data);
179 	}
180 
181 	void sendCmd(T)(CommandType cmd, const(T)[] data)
182 	in
183 	{
184 		// Internal thread states. Clients shouldn't use this
185 		assert(cmd != CommandType.SLEEP);
186 		assert(cmd != CommandType.CONNECT);
187 		assert(cmd != CommandType.TIME);
188 		assert(cmd != CommandType.DELAYED_INSERT);
189 		assert(cmd != CommandType.CONNECT_OUT);
190 
191 		// Deprecated
192 		assert(cmd != CommandType.CREATE_DB);
193 		assert(cmd != CommandType.DROP_DB);
194 		assert(cmd != CommandType.TABLE_DUMP);
195 
196 		// cannot send more than uint.max bytes. TODO: better error message if we try?
197 		assert(data.length <= uint.max);
198 	}
199 	out
200 	{
201 		// at this point we should have sent a command
202 		assert(pktNumber == 1);
203 	}
204 	body
205 	{
206 		autoPurge();
207 
208 		scope(failure) kill();
209 
210 		_lastCommandID++;
211 
212 		if(!_socket.connected)
213 		{
214 			if(cmd == CommandType.QUIT)
215 				return; // Don't bother reopening connection just to quit
216 
217 			_open = OpenState.notConnected;
218 			connect(_clientCapabilities);
219 		}
220 
221 		resetPacket();
222 
223 		ubyte[] header;
224 		header.length = 4 /*header*/ + 1 /*cmd*/;
225 		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
226 		header[4] = cmd;
227 		bumpPacket();
228 
229 		send(header, cast(const(ubyte)[])data);
230 	}
231 
232 	OKErrorPacket getCmdResponse(bool asString = false)
233 	{
234 		auto okp = OKErrorPacket(getPacket());
235 		enforcePacketOK(okp);
236 		_serverStatus = okp.serverStatus;
237 		return okp;
238 	}
239 
240 	ubyte[] buildAuthPacket(ubyte[] token)
241 	in
242 	{
243 		assert(token.length == 20);
244 	}
245 	body
246 	{
247 		ubyte[] packet;
248 		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
249 		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
250 
251 		// NOTE: we'll set the header last when we know the size
252 
253 		// Set the default capabilities required by the client
254 		_cCaps.packInto(packet[4..8]);
255 
256 		// Request a conventional maximum packet length.
257 		1.packInto(packet[8..12]);
258 
259 		packet ~= 33; // Set UTF-8 as default charSet
260 
261 		// There's a statutory block of zero bytes here - fill them in.
262 		foreach(i; 0 .. 23)
263 			packet ~= 0;
264 
265 		// Add the user name as a null terminated string
266 		foreach(i; 0 .. _user.length)
267 			packet ~= _user[i];
268 		packet ~= 0; // \0
269 
270 		// Add our calculated authentication token as a length prefixed string.
271 		assert(token.length <= ubyte.max);
272 		if(_pwd.length == 0)  // Omit the token if the account has no password
273 			packet ~= 0;
274 		else
275 		{
276 			packet ~= cast(ubyte)token.length;
277 			foreach(i; 0 .. token.length)
278 				packet ~= token[i];
279 		}
280 
281 		// Add the default database as a null terminated string
282 		foreach(i; 0 .. _db.length)
283 			packet ~= _db[i];
284 		packet ~= 0; // \0
285 
286 		// The server sent us a greeting with packet number 0, so we send the auth packet
287 		// back with the next number.
288 		packet.setPacketHeader(pktNumber);
289 		bumpPacket();
290 		return packet;
291 	}
292 
293 	void consumeServerInfo(ref ubyte[] packet)
294 	{
295 		scope(failure) kill();
296 
297 		_sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
298 		_sCharSet = packet.consume!ubyte(); // server_language
299 		_serverStatus = packet.consume!ushort(); //server_status
300 		_sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
301 		_sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
302 
303 		enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
304 		enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
305 	}
306 
307 	ubyte[] parseGreeting()
308 	{
309 		scope(failure) kill();
310 
311 		ubyte[] packet = getPacket();
312 
313 		if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
314 		{
315 			auto okp = OKErrorPacket(packet);
316 			enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
317 		}
318 
319 		_protocol = packet.consume!ubyte();
320 
321 		_serverVersion = packet.consume!string(packet.countUntil(0));
322 		packet.skip(1); // \0 terminated _serverVersion
323 
324 		_sThread = packet.consume!uint();
325 
326 		// read first part of scramble buf
327 		ubyte[] authBuf;
328 		authBuf.length = 255;
329 		authBuf[0..8] = packet.consume(8)[]; // scramble_buff
330 
331 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
332 
333 		consumeServerInfo(packet);
334 
335 		packet.skip(1); // this byte supposed to be scramble length, but is actually zero
336 		packet.skip(10); // filler of \0
337 
338 		// rest of the scramble
339 		auto len = packet.countUntil(0);
340 		enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
341 		enforce(authBuf.length > 8+len);
342 		authBuf[8..8+len] = packet.consume(len)[];
343 		authBuf.length = 8+len; // cut to correct size
344 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
345 
346 		return authBuf;
347 	}
348 
349 	static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
350 	{
351 		auto s = new PlainPhobosSocket();
352 		s.connect(new InternetAddress(host, port));
353 		return s;
354 	}
355 
356 	static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
357 	{
358 		version(Have_vibe_d_core)
359 			return vibe.core.net.connectTCP(host, port);
360 		else
361 			assert(0);
362 	}
363 
364 	void initConnection()
365 	{
366 		resetPacket();
367 		final switch(_socketType)
368 		{
369 			case MySQLSocketType.phobos:
370 				_socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
371 				break;
372 
373 			case MySQLSocketType.vibed:
374 				version(Have_vibe_d_core) {
375 					_socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
376 					break;
377 				} else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
378 		}
379 	}
380 
381 	ubyte[] makeToken(ubyte[] authBuf)
382 	{
383 		auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
384 		auto pass2 = sha1Of(pass1);
385 
386 		SHA1 sha1;
387 		sha1.start();
388 		sha1.put(authBuf);
389 		sha1.put(pass2);
390 		auto result = sha1.finish();
391 		foreach (size_t i; 0..20)
392 			result[i] = result[i] ^ pass1[i];
393 		return result.dup;
394 	}
395 
396 	SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
397 	{
398 		SvrCapFlags common;
399 		uint filter = 1;
400 		foreach (size_t i; 0..uint.sizeof)
401 		{
402 			bool serverSupport = (server & filter) != 0; // can the server do this capability?
403 			bool clientSupport = (client & filter) != 0; // can we support it?
404 			if(serverSupport && clientSupport)
405 				common |= filter;
406 			filter <<= 1; // check next flag
407 		}
408 		return common;
409 	}
410 
411 	void setClientFlags(SvrCapFlags capFlags)
412 	{
413 		_cCaps = getCommonCapabilities(_sCaps, capFlags);
414 
415 		// We cannot operate in <4.1 protocol, so we'll force it even if the user
416 		// didn't supply it
417 		_cCaps |= SvrCapFlags.PROTOCOL41;
418 		_cCaps |= SvrCapFlags.SECURE_CONNECTION;
419 	}
420 
421 	void authenticate(ubyte[] greeting)
422 	in
423 	{
424 		assert(_open == OpenState.connected);
425 	}
426 	out
427 	{
428 		assert(_open == OpenState.authenticated);
429 	}
430 	body
431 	{
432 		auto token = makeToken(greeting);
433 		auto authPacket = buildAuthPacket(token);
434 		send(authPacket);
435 
436 		auto packet = getPacket();
437 		auto okp = OKErrorPacket(packet);
438 		enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
439 		_open = OpenState.authenticated;
440 	}
441 
442 	SvrCapFlags _clientCapabilities;
443 
444 	void connect(SvrCapFlags clientCapabilities)
445 	in
446 	{
447 		assert(closed);
448 	}
449 	out
450 	{
451 		assert(_open == OpenState.authenticated);
452 	}
453 	body
454 	{
455 		initConnection();
456 		auto greeting = parseGreeting();
457 		_open = OpenState.connected;
458 
459 		_clientCapabilities = clientCapabilities;
460 		setClientFlags(clientCapabilities);
461 		authenticate(greeting);
462 	}
463 	
464 	/// Forcefully close the socket without sending the quit command.
465 	/// Needed in case an error leaves communatations in an undefined or non-recoverable state.
466 	void kill()
467 	{
468 		if(_socket.connected)
469 			_socket.close();
470 		_open = OpenState.notConnected;
471 		// any pending data is gone. Any statements to release will be released
472 		// on the server automatically.
473 		_headersPending = _rowsPending = _binaryPending = false;
474 		statementsToRelease.clear();
475 	}
476 	
477 	/// Called whenever mysql-native needs to send a command to the server
478 	/// and be sure there aren't any pending results (which would prevent
479 	/// a new command from being sent).
480 	void autoPurge()
481 	{
482 		// This is called every time a command is sent,
483 		// so detect & prevent infinite recursion.
484 		static bool isAutoPurging = false;
485 
486 		if(isAutoPurging)
487 			return;
488 			
489 		isAutoPurging = true;
490 		scope(exit) isAutoPurging = false;
491 
492 		try
493 		{
494 			purgeResult();
495 			statementsToRelease.releaseAll();
496 		}
497 		catch(Exception e)
498 		{
499 			// likely the connection was closed, so reset any state.
500 			// Don't treat this as a real error, because everything will be reset when we
501 			// reconnect.
502 			kill();
503 		}
504 	}
505 
506 	/++
507 	Keeps track of prepared statements queued to be released from the server.
508 
509 	Prepared statements aren't released immediately, because that
510 	involves sending a command to the server even though there might be
511 	results pending. (Can't send a command while results are pending.)
512 	+/
513 	static struct StatementsToRelease(alias doRelease)
514 	{
515 		private Connection conn;
516 		
517 		/// Ids of prepared statements to be released.
518 		/// This uses assumeSafeAppend. Do not save copies of it.
519 		uint[] ids;
520 		
521 		void add(uint statementId)
522 		{
523 			ids ~= statementId;
524 		}
525 
526 		/// Removes a prepared statement from the list of statements
527 		/// to be released from the server.
528 		/// Does nothing if the statement isn't on the list.
529 		void remove(uint statementId)
530 		{
531 			foreach(ref id; ids)
532 			if(id == statementId)
533 				id = 0;
534 		}
535 
536 		/// Releases the prepared statements queued for release.
537 		void releaseAll()
538 		{
539 			foreach(id; ids)
540 			if(id != 0)
541 				doRelease(conn, id);
542 
543 			clear();
544 		}
545 
546 		// clear all statements, and reset for using again.
547 		private void clear()
548 		{
549 			if(ids.length)
550 			{
551 				ids.length = 0;
552 				assumeSafeAppend(ids);
553 			}
554 		}
555 	}
556 	StatementsToRelease!(PreparedImpl.immediateRelease) statementsToRelease;
557 	
558 	debug(MYSQL_INTEGRATION_TESTS) uint[] fakeRelease_released;
559 	unittest
560 	{
561 		debug(MYSQL_INTEGRATION_TESTS)
562 		{
563 			static void fakeRelease(Connection conn, uint id)
564 			{
565 				conn.fakeRelease_released ~= id;
566 			}
567 			
568 			mixin(scopedCn);
569 			
570 			StatementsToRelease!fakeRelease list;
571 			list.conn = cn;
572 			assert(list.ids == []);
573 			assert(cn.fakeRelease_released == []);
574 
575 			list.add(1);
576 			assert(list.ids == [1]);
577 			assert(cn.fakeRelease_released == []);
578 
579 			list.add(7);
580 			assert(list.ids == [1, 7]);
581 			assert(cn.fakeRelease_released == []);
582 
583 			list.add(9);
584 			assert(list.ids == [1, 7, 9]);
585 			assert(cn.fakeRelease_released == []);
586 
587 			list.remove(5);
588 			assert(list.ids == [1, 7, 9]);
589 			assert(cn.fakeRelease_released == []);
590 			
591 			list.remove(7);
592 			assert(list.ids == [1, 0, 9]);
593 			assert(cn.fakeRelease_released == []);
594 			
595 			list.releaseAll();
596 			assert(list.ids == []);
597 			assert(cn.fakeRelease_released == [1, 9]);
598 		}
599 	}
600 
601 public:
602 
603 	/++
604 	Construct opened connection.
605 
606 	Throws `mysql.exceptions.MYX` upon failure to connect.
607 	
608 	If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
609 	creating a new Connection directly. That will provide certain benefits,
610 	such as reusing old connections and automatic cleanup (no need to close
611 	the connection when done).
612 
613 	------------------
614 	// Suggested usage:
615 
616 	{
617 	    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
618 	    scope(exit) con.close();
619 
620 	    // Use the connection
621 	    ...
622 	}
623 	------------------
624 
625 	Params:
626 		cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
627 			(TODO: The connection string needs work to allow for semicolons in its parts!)
628 		socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
629 			unless -version=Have_vibe_d_core is used.
630 		openSocket = Optional callback which should return a newly-opened Phobos
631 			or Vibe.d TCP socket. This allows custom sockets to be used,
632 			subclassed from Phobos's or Vibe.d's sockets.
633 		host = An IP address in numeric dotted form, or as a host  name.
634 		user = The user name to authenticate.
635 		password = Users password.
636 		db = Desired initial database.
637 		capFlags = The set of flag bits from the server's capabilities that the client requires
638 	+/
639 	//After the connection is created, and the initial invitation is received from the server
640 	//client preferences can be set, and authentication can then be attempted.
641 	this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
642 	{
643 		version(Have_vibe_d_core)
644 			enum defaultSocketType = MySQLSocketType.vibed;
645 		else
646 			enum defaultSocketType = MySQLSocketType.phobos;
647 
648 		this(defaultSocketType, host, user, pwd, db, port, capFlags);
649 	}
650 
651 	///ditto
652 	this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
653 	{
654 		version(Have_vibe_d_core) {} else
655 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
656 
657 		this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
658 			host, user, pwd, db, port, capFlags);
659 	}
660 
661 	///ditto
662 	this(OpenSocketCallbackPhobos openSocket,
663 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
664 	{
665 		this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
666 	}
667 
668 	version(Have_vibe_d_core)
669 	///ditto
670 	this(OpenSocketCallbackVibeD openSocket,
671 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
672 	{
673 		this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
674 	}
675 
676 	///ditto
677 	private this(MySQLSocketType socketType,
678 		OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
679 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
680 	in
681 	{
682 		final switch(socketType)
683 		{
684 			case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
685 			case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
686 		}
687 	}
688 	body
689 	{
690 		enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
691 		enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
692 		version(Have_vibe_d_core) {} else
693 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
694 
695 		statementsToRelease.conn = this;
696 
697 		_socketType = socketType;
698 		_host = host;
699 		_user = user;
700 		_pwd = pwd;
701 		_db = db;
702 		_port = port;
703 
704 		_openSocketPhobos = openSocketPhobos;
705 		_openSocketVibeD  = openSocketVibeD;
706 
707 		connect(capFlags);
708 	}
709 
710 	///ditto
711 	//After the connection is created, and the initial invitation is received from the server
712 	//client preferences can be set, and authentication can then be attempted.
713 	this(string cs, SvrCapFlags capFlags = defaultClientFlags)
714 	{
715 		string[] a = parseConnectionString(cs);
716 		this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
717 	}
718 
719 	///ditto
720 	this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
721 	{
722 		string[] a = parseConnectionString(cs);
723 		this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
724 	}
725 
726 	///ditto
727 	this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
728 	{
729 		string[] a = parseConnectionString(cs);
730 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
731 	}
732 
733 	version(Have_vibe_d_core)
734 	///ditto
735 	this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
736 	{
737 		string[] a = parseConnectionString(cs);
738 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
739 	}
740 
741 	/++
742 	Check whether this Connection is still connected to the server, or if
743 	the connection has been closed.
744 	+/
745 	@property bool closed()
746 	{
747 		return _open == OpenState.notConnected || !_socket.connected;
748 	}
749 
750 	version(Have_vibe_d_core)
751 	{
752 		/// Used by Vibe.d's ConnectionPool, ignore this.
753 		void acquire() { if( _socket ) _socket.acquire(); }
754 		///ditto
755 		void release() { if( _socket ) _socket.release(); }
756 		///ditto
757 		bool isOwner() { return _socket ? _socket.isOwner() : false; }
758 		///ditto
759 		bool amOwner() { return _socket ? _socket.isOwner() : false; }
760 	}
761 	else
762 	{
763 		/// Used by Vibe.d's ConnectionPool, ignore this.
764 		void acquire() { /+ Do nothing +/ }
765 		///ditto
766 		void release() { /+ Do nothing +/ }
767 		///ditto
768 		bool isOwner() { return !!_socket; }
769 		///ditto
770 		bool amOwner() { return !!_socket; }
771 	}
772 
773 	/++
774 	Explicitly close the connection.
775 	
776 	This is a two-stage process. First tell the server we are quitting this
777 	connection, and then close the socket.
778 	
779 	Idiomatic use as follows is suggested:
780 	------------------
781 	{
782 	    auto con = new Connection("localhost:user:password:mysqld");
783 	    scope(exit) con.close();
784 	    // Use the connection
785 	    ...
786 	}
787 	------------------
788 	+/
789 	void close()
790 	{
791 		if (_open == OpenState.authenticated && _socket.connected)
792 			quit();
793 
794 		if (_open == OpenState.connected)
795 			kill();
796 		resetPacket();
797 	}
798 
799 	/++
800 	Reconnects to the server using the same connection settings originally
801 	used to create the Connection.
802 
803 	Optionally takes a SvrCapFlags, allowing you to reconnect using a different
804 	set of server capability flags (most users will not need to do this).
805 
806 	If the connection is already open, this will do nothing. However, if you
807 	request a different set of SvrCapFlags then was originally used to create
808 	the Connection, the connection will be closed and then reconnected.
809 	+/
810 	void reconnect()
811 	{
812 		reconnect(_clientCapabilities);
813 	}
814 
815 	///ditto
816 	void reconnect(SvrCapFlags clientCapabilities)
817 	{
818 		bool sameCaps = clientCapabilities == _clientCapabilities;
819 		if(!closed)
820 		{
821 			// Same caps as before?
822 			if(clientCapabilities == _clientCapabilities)
823 				return; // Nothing to do, just keep current connection
824 
825 			close();
826 		}
827 
828 		connect(clientCapabilities);
829 	}
830 
831 	private void quit()
832 	in
833 	{
834 		assert(_open == OpenState.authenticated);
835 	}
836 	body
837 	{
838 		sendCmd(CommandType.QUIT, []);
839 		// No response is sent for a quit packet
840 		_open = OpenState.connected;
841 	}
842 
843 	/++
844 	Parses a connection string of the form
845 	`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`
846 
847 	Port is optional and defaults to 3306.
848 
849 	Whitespace surrounding any name or value is automatically stripped.
850 
851 	Returns a five-element array of strings in this order:
852 	$(UL
853 	$(LI [0]: host)
854 	$(LI [1]: user)
855 	$(LI [2]: pwd)
856 	$(LI [3]: db)
857 	$(LI [4]: port)
858 	)
859 	
860 	(TODO: The connection string needs work to allow for semicolons in its parts!)
861 	+/
862 	//TODO: Replace the return value with a proper struct.
863 	static string[] parseConnectionString(string cs)
864 	{
865 		string[] rv;
866 		rv.length = 5;
867 		rv[4] = "3306"; // Default port
868 		string[] a = split(cs, ";");
869 		foreach (s; a)
870 		{
871 			string[] a2 = split(s, "=");
872 			enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs);
873 			string name = strip(a2[0]);
874 			string val = strip(a2[1]);
875 			switch (name)
876 			{
877 				case "host":
878 					rv[0] = val;
879 					break;
880 				case "user":
881 					rv[1] = val;
882 					break;
883 				case "pwd":
884 					rv[2] = val;
885 					break;
886 				case "db":
887 					rv[3] = val;
888 					break;
889 				case "port":
890 					rv[4] = val;
891 					break;
892 				default:
893 					throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
894 			}
895 		}
896 		return rv;
897 	}
898 
899 	/++
900 	Select a current database.
901 	
902 	Params: dbName = Name of the requested database
903 	Throws: MYX
904 	+/
905 	void selectDB(string dbName)
906 	{
907 		sendCmd(CommandType.INIT_DB, dbName);
908 		getCmdResponse();
909 		_db = dbName;
910 	}
911 
912 	/++
913 	Check the server status
914 	
915 	Returns: An OKErrorPacket from which server status can be determined
916 	Throws: MYX
917 	+/
918 	OKErrorPacket pingServer()
919 	{
920 		sendCmd(CommandType.PING, []);
921 		return getCmdResponse();
922 	}
923 
924 	/++
925 	Refresh some feature(s) of the server.
926 	
927 	Returns: An OKErrorPacket from which server status can be determined
928 	Throws: MYX
929 	+/
930 	OKErrorPacket refreshServer(RefreshFlags flags)
931 	{
932 		sendCmd(CommandType.REFRESH, [flags]);
933 		return getCmdResponse();
934 	}
935 
936 	/++
937 	Get the next Row of a pending result set.
938 	
939 	This method can be used after either execSQL() or execPrepared() have returned true
940 	to retrieve result set rows sequentially.
941 	
942 	Similar functionality is available via execSQLSequence() and execPreparedSequence() in
943 	which case the interface is presented as a forward range of Rows.
944 	
945 	This method allows you to deal with very large result sets either a row at a time,
946 	or by feeding the rows into some suitable container such as a linked list.
947 	
948 	Returns: A Row object.
949 	+/
950 	Row getNextRow()
951 	{
952 		scope(failure) kill();
953 
954 		if (_headersPending)
955 		{
956 			_rsh = ResultSetHeaders(this, _fieldCount);
957 			_headersPending = false;
958 		}
959 		ubyte[] packet;
960 		Row rr;
961 		packet = getPacket();
962 		if (packet.isEOFPacket())
963 		{
964 			_rowsPending = _binaryPending = false;
965 			return rr;
966 		}
967 		if (_binaryPending)
968 			rr = Row(this, packet, _rsh, true);
969 		else
970 			rr = Row(this, packet, _rsh, false);
971 		//rr._valid = true;
972 		return rr;
973 	}
974 
975 	/++
976 	Flush any outstanding result set elements.
977 	
978 	When the server responds to a command that produces a result set, it
979 	queues the whole set of corresponding packets over the current connection.
980 	Before that Connection can embark on any new command, it must receive
981 	all of those packets and junk them.
982 	http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/
983 	+/
984 	ulong purgeResult()
985 	{
986 		scope(failure) kill();
987 
988 		_lastCommandID++;
989 
990 		ulong rows = 0;
991 		if (_headersPending)
992 		{
993 			for (size_t i = 0;; i++)
994 			{
995 				if (getPacket().isEOFPacket())
996 				{
997 					_headersPending = false;
998 					break;
999 				}
1000 				enforceEx!MYXProtocol(i < _fieldCount,
1001 					text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
1002 			}
1003 		}
1004 		if (_rowsPending)
1005 		{
1006 			for (;;  rows++)
1007 			{
1008 				if (getPacket().isEOFPacket())
1009 				{
1010 					_rowsPending = _binaryPending = false;
1011 					break;
1012 				}
1013 			}
1014 		}
1015 		resetPacket();
1016 		return rows;
1017 	}
1018 
1019 	/++
1020 	Get a textual report on the server status.
1021 	
1022 	(COM_STATISTICS)
1023 	+/
1024 	string serverStats()
1025 	{
1026 		sendCmd(CommandType.STATISTICS, []);
1027 		return cast(string) getPacket();
1028 	}
1029 
1030 	/++
1031 	Enable multiple statement commands
1032 	
1033 	This can be used later if this feature was not requested in the client capability flags.
1034 	
1035 	Params: on = Boolean value to turn the capability on or off.
1036 	+/
1037 	void enableMultiStatements(bool on)
1038 	{
1039 		scope(failure) kill();
1040 
1041 		ubyte[] t;
1042 		t.length = 2;
1043 		t[0] = on ? 0 : 1;
1044 		t[1] = 0;
1045 		sendCmd(CommandType.STMT_OPTION, t);
1046 
1047 		// For some reason this command gets an EOF packet as response
1048 		auto packet = getPacket();
1049 		enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1050 	}
1051 
1052 	/// Return the in-force protocol number
1053 	@property ubyte protocol() pure const nothrow { return _protocol; }
1054 	/// Server version
1055 	@property string serverVersion() pure const nothrow { return _serverVersion; }
1056 	/// Server capability flags
1057 	@property uint serverCapabilities() pure const nothrow { return _sCaps; }
1058 	/// Server status
1059 	@property ushort serverStatus() pure const nothrow { return _serverStatus; }
1060 	/// Current character set
1061 	@property ubyte charSet() pure const nothrow { return _sCharSet; }
1062 	/// Current database
1063 	@property string currentDB() pure const nothrow { return _db; }
1064 	/// Socket type being used
1065 	@property MySQLSocketType socketType() pure const nothrow { return _socketType; }
1066 
1067 	/// After a command that inserted a row into a table with an auto-increment
1068 	/// ID column, this method allows you to retrieve the last insert ID.
1069 	@property ulong lastInsertID() pure const nothrow { return _insertID; }
1070 
1071 	/// This gets incremented every time a command is issued or results are purged,
1072 	/// so a ResultRange can tell whether it's been invalidated.
1073 	@property ulong lastCommandID() pure const nothrow { return _lastCommandID; }
1074 
1075 	/// Gets whether rows are pending
1076 	@property bool rowsPending() pure const nothrow { return _rowsPending; }
1077 
1078 	/// Gets whether anything (rows, headers or binary) is pending.
1079 	/// New commands cannot be sent on a conncection while anything is pending.
1080 	@property bool hasPending() pure const nothrow
1081 	{
1082 		return _rowsPending || _headersPending || _binaryPending;
1083 	}
1084 
1085 	/// Gets the result header's field descriptions.
1086 	@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
1087 }
1088 
1089 // unittest for issue 154, when the socket is disconnected from the mysql server.
1090 // This simulates a disconnect by closing the socket underneath the Connection
1091 // object itself.
1092 debug(MYSQL_INTEGRATION_TESTS)
1093 unittest
1094 {
1095 	mixin(scopedCn);
1096 
1097 	cn.exec("DROP TABLE IF EXISTS `dropConnection`");
1098 	cn.exec("CREATE TABLE `dropConnection` (
1099 		`val` INTEGER
1100 	) ENGINE=InnoDB DEFAULT CHARSET=utf8");
1101 	cn.exec("INSERT INTO `dropConnection` VALUES (1), (2), (3)");
1102 	import mysql.prepared;
1103 	{
1104 		auto prep = cn.prepare("SELECT * FROM `dropConnection`");
1105 		prep.query();
1106 	}
1107 	// close the socket forcibly
1108 	cn._socket.close();
1109 	// this should still work (it should reconnect).
1110 	cn.exec("DROP TABLE `dropConnection`");
1111 }