1 module mysql.connection;
2 
// Publicly import the rest of the package for backwards compatibility.
4 // These public imports will eventually be phased out.
5 public import mysql.common;
6 public import mysql.result;
7 public import mysql.protocol.commands;
8 public import mysql.protocol.constants;
9 public import mysql.protocol.extra_types;
10 public import mysql.protocol.packet_helpers;
11 public import mysql.protocol.packets;
12 debug(MYSQL_INTEGRATION_TESTS)
13 {
14 	public import mysql.test.common;
15 	public import mysql.test.integration;
16 	public import mysql.test.regression;
17 }
18 
19 version(Have_vibe_d_core)
20 {
21 	static if(__traits(compiles, (){ import vibe.core.net; } ))
22 		import vibe.core.net;
23 	else
24 		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
25 }
26 
27 import std.algorithm;
28 import std.conv;
29 import std.datetime;
30 import std.digest.sha;
31 import std.exception;
32 import std.range;
33 import std.socket;
34 import std.stdio;
35 import std.string;
36 import std.traits;
37 import std.variant;
38 
/// Capability flags used by the Connection constructors when the caller does
/// not pass an explicit `capFlags` argument. MULTI_STATEMENTS and
/// MULTI_RESULTS are currently commented out of the set.
immutable SvrCapFlags defaultClientFlags =
		SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
		SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
		SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
		//SvrCapFlags.MULTI_RESULTS;
44 
45 /++
A class representing a database connection.
47 
48 The Connection is responsible for handshaking with the server to establish
49 authentication. It then passes client preferences to the server, and
50 subsequently is the channel for all command packets that are sent, and all
51 response packets received.
52 
53 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
54 byte as a packet number. Connection deals with the headers and ensures that
55 packet numbers are sequential.
56 
The initial packet is sent by the server - essentially a 'hello' packet
inviting login. That packet has a sequence number of zero. That sequence
number is then incremented by client and server packets through the handshake
sequence.
61 
62 After login all further sequences are initialized by the client sending a
63 command packet with a zero sequence number, to which the server replies with
64 zero or more packets with sequential sequence numbers.
65 +/
66 class Connection
67 {
68 package:
	/// How far this connection has progressed through the
	/// connect / authenticate / quit life cycle.
	enum OpenState
	{
		/// We have not yet connected to the server, or have sent QUIT to the
		/// server and closed the connection
		notConnected,
		/// We have connected to the server and parsed the greeting, but not
		/// yet authenticated
		connected,
		/// We have successfully authenticated against the server, and need to
		/// send QUIT to the server when closing the connection
		authenticated
	}
	// Current life-cycle state and the socket wrapper all traffic goes through.
	OpenState   _open;
	MySQLSocket _socket;

	// Values read from the server greeting (see parseGreeting and
	// consumeServerInfo); _cCaps is the flag set computed by setClientFlags.
	SvrCapFlags _sCaps, _cCaps;
	uint    _sThread;
	ushort  _serverStatus;
	ubyte   _sCharSet, _protocol;
	string  _serverVersion;

	// Connection parameters as supplied to the constructor.
	string _host, _user, _pwd, _db;
	ushort _port;

	MySQLSocketType _socketType;

	// Callbacks used by initConnection to open the underlying socket.
	OpenSocketCallbackPhobos _openSocketPhobos;
	OpenSocketCallbackVibeD  _openSocketVibeD;

	// Value reported by the lastInsertID property.
	ulong _insertID;

	// This gets incremented every time a command is issued or results are purged,
	// so a ResultRange can tell whether it's been invalidated.
	ulong _lastCommandId;

	// Whether there are rows, headers or binary data waiting to be retrieved.
	// MySQL protocol doesn't permit performing any other action until all
	// such data is read.
	bool _rowsPending, _headersPending, _binaryPending;

	// Field count of last performed command.
	ushort _fieldCount;

	// ResultSetHeaders of last performed command.
	ResultSetHeaders _rsh;
114 
	// This tiny thing here is pretty critical. Pay great attention to its maintenance, otherwise
	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
	// the reason why most other objects require a connection object for their construction.
	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
				/// ordering. First packet should have 0
	/// The packet number expected for (or to be used by) the next packet.
	@property ubyte pktNumber()   { return _cpn; }
	/// Advance the packet number by one. _cpn is a ubyte, so it wraps after 255.
	void bumpPacket()       { _cpn++; }
	/// Restart packet numbering at zero.
	void resetPacket()      { _cpn = 0; }
123 
	// Only compiled when Vibe.d support is absent: in that configuration the
	// connection must never be configured to use a Vibe.d socket.
	version(Have_vibe_d_core) {} else
	pure const nothrow invariant()
	{
		assert(_socketType != MySQLSocketType.vibed);
	}
129 
130 	void enforceNothingPending()
131 	{
132 		enforceEx!MYXDataPending(!hasPending);
133 	}
134 
	// Integration test: commands must be rejected with MYXDataPending while a
	// lazily-read result (from cn.query) is still unconsumed, and accepted
	// again once that range has been fully consumed.
	debug(MYSQL_INTEGRATION_TESTS)
	unittest
	{
		import mysql.protocol.prepared;
		import mysql.test.common : scopedCn;
		mixin(scopedCn);

		cn.exec("DROP TABLE IF EXISTS `enforceNothingPending`");
		cn.exec("CREATE TABLE `enforceNothingPending` (
			`val` INTEGER
		) ENGINE=InnoDB DEFAULT CHARSET=utf8");

		immutable insertSQL = "INSERT INTO `enforceNothingPending` VALUES (1), (2)";
		immutable selectSQL = "SELECT * FROM `enforceNothingPending`";
		Prepared preparedInsert;
		Prepared preparedSelect;
		// Nothing is pending yet: every kind of command must go through.
		assertNotThrown!MYXDataPending(cn.exec(insertSQL));
		assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
		assertNotThrown!MYXDataPending(preparedInsert = cn.prepare(insertSQL));
		assertNotThrown!MYXDataPending(preparedSelect = cn.prepare(selectSQL));
		assertNotThrown!MYXDataPending(preparedInsert.exec());
		assertNotThrown!MYXDataPending(preparedSelect.querySet());
		
		// Start a query whose rows are deliberately left unread.
		auto resultSeq = cn.query(selectSQL);
		
		// With that result outstanding, every command must be refused.
		assertThrown!MYXDataPending(cn.exec(insertSQL));
		assertThrown!MYXDataPending(cn.querySet(selectSQL));
		assertThrown!MYXDataPending(cn.query(selectSQL));
		assertThrown!MYXDataPending(cn.prepare(selectSQL));
		assertThrown!MYXDataPending(preparedInsert.exec());
		assertThrown!MYXDataPending(preparedSelect.querySet());

		resultSeq.each(); // Consume range

		// The connection is usable again once the range has been drained.
		assertNotThrown!MYXDataPending(cn.exec(insertSQL));
		assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
		assertNotThrown!MYXDataPending(cn.prepare(selectSQL));
		assertNotThrown!MYXDataPending(preparedInsert.exec());
		assertNotThrown!MYXDataPending(preparedSelect.querySet());
	}
175 
176 	ubyte[] getPacket()
177 	{
178 		scope(failure) kill();
179 
180 		ubyte[4] header;
181 		_socket.read(header);
182 		// number of bytes always set as 24-bit
183 		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
184 		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
185 		bumpPacket();
186 
187 		ubyte[] packet = new ubyte[numDataBytes];
188 		_socket.read(packet);
189 		assert(packet.length == numDataBytes, "Wrong number of bytes read");
190 		return packet;
191 	}
192 
193 	void send(const(ubyte)[] packet)
194 	in
195 	{
196 		assert(packet.length > 4); // at least 1 byte more than header
197 	}
198 	body
199 	{
200 		_socket.write(packet);
201 	}
202 
203 	void send(const(ubyte)[] header, const(ubyte)[] data)
204 	in
205 	{
206 		assert(header.length == 4 || header.length == 5/*command type included*/);
207 	}
208 	body
209 	{
210 		_socket.write(header);
211 		if(data.length)
212 			_socket.write(data);
213 	}
214 
	/++
	Send a command packet to the server.

	Refuses to send while result headers or rows are pending, bumps
	_lastCommandId, transparently reconnects if the socket has dropped (except
	for QUIT), restarts packet numbering at zero and writes a 5 byte header
	(length, packet number, command byte) followed by `data`.

	Params:
	   cmd = The command to send.
	   data = The command payload; it is sent as raw bytes.
	Throws: MYX if result set elements are still pending. Any failure after
	        that check kills the connection.
	+/
	void sendCmd(T)(CommandType cmd, const(T)[] data)
	in
	{
		// Internal thread states. Clients shouldn't use this
		assert(cmd != CommandType.SLEEP);
		assert(cmd != CommandType.CONNECT);
		assert(cmd != CommandType.TIME);
		assert(cmd != CommandType.DELAYED_INSERT);
		assert(cmd != CommandType.CONNECT_OUT);

		// Deprecated
		assert(cmd != CommandType.CREATE_DB);
		assert(cmd != CommandType.DROP_DB);
		assert(cmd != CommandType.TABLE_DUMP);

		// cannot send more than uint.max bytes. TODO: better error message if we try?
		assert(data.length <= uint.max);
	}
	out
	{
		// at this point we should have sent a command
		// NOTE(review): the early return for QUIT on a dropped socket skips
		// resetPacket()/bumpPacket(), so this may not hold on that path - confirm.
		assert(pktNumber == 1);
	}
	body
	{
		enforceEx!MYX(!(_headersPending || _rowsPending),
			"There are result set elements pending - purgeResult() required.");

		scope(failure) kill();

		// Invalidate any result range tied to the previous command.
		_lastCommandId++;

		if(!_socket.connected)
		{
			if(cmd == CommandType.QUIT)
				return; // Don't bother reopening connection just to quit

			// connect() asserts that the connection is closed, so mark it so first.
			_open = OpenState.notConnected;
			connect(_clientCapabilities);
		}

		// Every command starts a new packet sequence.
		resetPacket();

		ubyte[] header;
		header.length = 4 /*header*/ + 1 /*cmd*/;
		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
		header[4] = cmd;
		bumpPacket();

		send(header, cast(const(ubyte)[])data);
	}
266 
267 	OKErrorPacket getCmdResponse(bool asString = false)
268 	{
269 		auto okp = OKErrorPacket(getPacket());
270 		enforcePacketOK(okp);
271 		_serverStatus = okp.serverStatus;
272 		return okp;
273 	}
274 
275 	ubyte[] buildAuthPacket(ubyte[] token)
276 	in
277 	{
278 		assert(token.length == 20);
279 	}
280 	body
281 	{
282 		ubyte[] packet;
283 		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
284 		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
285 
286 		// NOTE: we'll set the header last when we know the size
287 
288 		// Set the default capabilities required by the client
289 		_cCaps.packInto(packet[4..8]);
290 
291 		// Request a conventional maximum packet length.
292 		1.packInto(packet[8..12]);
293 
294 		packet ~= 33; // Set UTF-8 as default charSet
295 
296 		// There's a statutory block of zero bytes here - fill them in.
297 		foreach(i; 0 .. 23)
298 			packet ~= 0;
299 
300 		// Add the user name as a null terminated string
301 		foreach(i; 0 .. _user.length)
302 			packet ~= _user[i];
303 		packet ~= 0; // \0
304 
305 		// Add our calculated authentication token as a length prefixed string.
306 		assert(token.length <= ubyte.max);
307 		if(_pwd.length == 0)  // Omit the token if the account has no password
308 			packet ~= 0;
309 		else
310 		{
311 			packet ~= cast(ubyte)token.length;
312 			foreach(i; 0 .. token.length)
313 				packet ~= token[i];
314 		}
315 
316 		// Add the default database as a null terminated string
317 		foreach(i; 0 .. _db.length)
318 			packet ~= _db[i];
319 		packet ~= 0; // \0
320 
321 		// The server sent us a greeting with packet number 0, so we send the auth packet
322 		// back with the next number.
323 		packet.setPacketHeader(pktNumber);
324 		bumpPacket();
325 		return packet;
326 	}
327 
328 	void consumeServerInfo(ref ubyte[] packet)
329 	{
330 		scope(failure) kill();
331 
332 		_sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
333 		_sCharSet = packet.consume!ubyte(); // server_language
334 		_serverStatus = packet.consume!ushort(); //server_status
335 		_sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
336 		_sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
337 
338 		enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
339 		enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
340 	}
341 
342 	ubyte[] parseGreeting()
343 	{
344 		scope(failure) kill();
345 
346 		ubyte[] packet = getPacket();
347 
348 		if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
349 		{
350 			auto okp = OKErrorPacket(packet);
351 			enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
352 		}
353 
354 		_protocol = packet.consume!ubyte();
355 
356 		_serverVersion = packet.consume!string(packet.countUntil(0));
357 		packet.skip(1); // \0 terminated _serverVersion
358 
359 		_sThread = packet.consume!uint();
360 
361 		// read first part of scramble buf
362 		ubyte[] authBuf;
363 		authBuf.length = 255;
364 		authBuf[0..8] = packet.consume(8)[]; // scramble_buff
365 
366 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
367 
368 		consumeServerInfo(packet);
369 
370 		packet.skip(1); // this byte supposed to be scramble length, but is actually zero
371 		packet.skip(10); // filler of \0
372 
373 		// rest of the scramble
374 		auto len = packet.countUntil(0);
375 		enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
376 		enforce(authBuf.length > 8+len);
377 		authBuf[8..8+len] = packet.consume(len)[];
378 		authBuf.length = 8+len; // cut to correct size
379 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
380 
381 		return authBuf;
382 	}
383 
384 	static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
385 	{
386 		auto s = new PlainPhobosSocket();
387 		s.connect(new InternetAddress(host, port));
388 		return s;
389 	}
390 
	/// Default socket factory for Vibe.d sockets: opens a TCP connection to
	/// `host`:`port` via vibe.core.net. Must not be called when the library is
	/// built without Have_vibe_d_core (it asserts unconditionally in that case).
	static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
	{
		version(Have_vibe_d_core)
			return vibe.core.net.connectTCP(host, port);
		else
			assert(0);
	}
398 
	/// Restart packet numbering and open a fresh socket of the configured
	/// type, using the matching open-socket callback with _host and _port.
	void initConnection()
	{
		resetPacket();
		final switch(_socketType)
		{
			case MySQLSocketType.phobos:
				_socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
				break;

			case MySQLSocketType.vibed:
				// Only available when compiled with Vibe.d support.
				version(Have_vibe_d_core) {
					_socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
					break;
				} else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
		}
	}
415 
416 	ubyte[] makeToken(ubyte[] authBuf)
417 	{
418 		auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
419 		auto pass2 = sha1Of(pass1);
420 
421 		SHA1 sha1;
422 		sha1.start();
423 		sha1.put(authBuf);
424 		sha1.put(pass2);
425 		auto result = sha1.finish();
426 		foreach (size_t i; 0..20)
427 			result[i] = result[i] ^ pass1[i];
428 		return result.dup;
429 	}
430 
431 	SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
432 	{
433 		SvrCapFlags common;
434 		uint filter = 1;
435 		foreach (size_t i; 0..uint.sizeof)
436 		{
437 			bool serverSupport = (server & filter) != 0; // can the server do this capability?
438 			bool clientSupport = (client & filter) != 0; // can we support it?
439 			if(serverSupport && clientSupport)
440 				common |= filter;
441 			filter <<= 1; // check next flag
442 		}
443 		return common;
444 	}
445 
446 	void setClientFlags(SvrCapFlags capFlags)
447 	{
448 		_cCaps = getCommonCapabilities(_sCaps, capFlags);
449 
450 		// We cannot operate in <4.1 protocol, so we'll force it even if the user
451 		// didn't supply it
452 		_cCaps |= SvrCapFlags.PROTOCOL41;
453 		_cCaps |= SvrCapFlags.SECURE_CONNECTION;
454 	}
455 
456 	void authenticate(ubyte[] greeting)
457 	in
458 	{
459 		assert(_open == OpenState.connected);
460 	}
461 	out
462 	{
463 		assert(_open == OpenState.authenticated);
464 	}
465 	body
466 	{
467 		auto token = makeToken(greeting);
468 		auto authPacket = buildAuthPacket(token);
469 		send(authPacket);
470 
471 		auto packet = getPacket();
472 		auto okp = OKErrorPacket(packet);
473 		enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
474 		_open = OpenState.authenticated;
475 	}
476 
477 	SvrCapFlags _clientCapabilities;
478 
479 	void connect(SvrCapFlags clientCapabilities)
480 	in
481 	{
482 		assert(closed);
483 	}
484 	out
485 	{
486 		assert(_open == OpenState.authenticated);
487 	}
488 	body
489 	{
490 		initConnection();
491 		auto greeting = parseGreeting();
492 		_open = OpenState.connected;
493 
494 		_clientCapabilities = clientCapabilities;
495 		setClientFlags(clientCapabilities);
496 		authenticate(greeting);
497 	}
498 	
499 	// Forcefully close the socket without sending the quit command.
500 	// Needed in case an error leaves communatations in an undefined or non-recoverable state.
501 	void kill()
502 	{
503 		if(_socket.connected)
504 			_socket.close();
505 		_open = OpenState.notConnected;
506 	}
507 	
508 public:
509 
	/++
	Construct opened connection.
	
	After the connection is created, and the initial invitation is received from the server
	client preferences can be set, and authentication can then be attempted.
	
	Params:
	   socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
	                unless -version=Have_vibe_d_core is used.
	   openSocket = Optional callback which should return a newly-opened Phobos
	                or Vibe.d TCP socket. This allows custom sockets to be used,
	                subclassed from Phobos's or Vibe.d's sockets.
	   host = An IP address in numeric dotted form, or as a host name.
	   user = The user name to authenticate.
	   pwd = User's password.
	   db = Desired initial database.
	   port = TCP port of the server. Defaults to 3306.
	   capFlags = The set of flag bits from the server's capabilities that the client requires
	+/
	this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
	{
		// Pick the socket implementation from the build configuration.
		version(Have_vibe_d_core)
			enum defaultSocketType = MySQLSocketType.vibed;
		else
			enum defaultSocketType = MySQLSocketType.phobos;

		this(defaultSocketType, host, user, pwd, db, port, capFlags);
	}
537 
	///ditto
	this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
	{
		// Vibe.d sockets can only be requested when Vibe.d support is compiled in.
		version(Have_vibe_d_core) {} else
			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

		// Use the built-in socket factories for both socket types.
		this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
			host, user, pwd, db, port, capFlags);
	}

	///ditto
	this(OpenSocketCallbackPhobos openSocket,
		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
	{
		// A Phobos callback implies a Phobos socket; no Vibe.d callback is needed.
		this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
	}

	version(Have_vibe_d_core)
	///ditto
	this(OpenSocketCallbackVibeD openSocket,
		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
	{
		// A Vibe.d callback implies a Vibe.d socket; no Phobos callback is needed.
		this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
	}
562 
563 	private this(MySQLSocketType socketType,
564 		OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
565 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
566 	in
567 	{
568 		final switch(socketType)
569 		{
570 			case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
571 			case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
572 		}
573 	}
574 	body
575 	{
576 		enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
577 		enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
578 		version(Have_vibe_d_core) {} else
579 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
580 
581 		_socketType = socketType;
582 		_host = host;
583 		_user = user;
584 		_pwd = pwd;
585 		_db = db;
586 		_port = port;
587 
588 		_openSocketPhobos = openSocketPhobos;
589 		_openSocketVibeD  = openSocketVibeD;
590 
591 		connect(capFlags);
592 	}
593 
594 	/++
595 	Construct opened connection.
596 	
597 	After the connection is created, and the initial invitation is received from
598 	the server client preferences are set, and authentication can then be attempted.
599 	
600 	TBD The connection string needs work to allow for semicolons in its parts!
601 	
602 	Parameters:
603 	   socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos
604 	                unless -version=Have_vibe_d_core is used.
605 	   openSocket = Optional callback which should return a newly-opened Phobos
606 	                or Vibe.d TCP socket. This allows custom sockets to be used,
607 	                subclassed from Phobos's or Vibe.d's sockets.
608 	   cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
609 	   capFlags = The set of flag bits from the server's capabilities that the client requires
610 	+/
611 	this(string cs, SvrCapFlags capFlags = defaultClientFlags)
612 	{
613 		string[] a = parseConnectionString(cs);
614 		this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
615 	}
616 
617 	///ditto
618 	this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
619 	{
620 		string[] a = parseConnectionString(cs);
621 		this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
622 	}
623 
624 	///ditto
625 	this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
626 	{
627 		string[] a = parseConnectionString(cs);
628 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
629 	}
630 
631 	version(Have_vibe_d_core)
632 	///ditto
633 	this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
634 	{
635 		string[] a = parseConnectionString(cs);
636 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
637 	}
638 
639 	@property bool closed()
640 	{
641 		return _open == OpenState.notConnected || !_socket.connected;
642 	}
643 
	// Socket ownership helpers. With Vibe.d they forward to the socket (when
	// there is one); without Vibe.d acquire/release do nothing and the owner
	// queries simply report whether a socket exists. isOwner and amOwner are
	// implemented identically in both configurations.
	version(Have_vibe_d_core)
	{
		void acquire() { if( _socket ) _socket.acquire(); }
		void release() { if( _socket ) _socket.release(); }
		bool isOwner() { return _socket ? _socket.isOwner() : false; }
		bool amOwner() { return _socket ? _socket.isOwner() : false; }
	}
	else
	{
		void acquire() { /+ Do nothing +/ }
		void release() { /+ Do nothing +/ }
		bool isOwner() { return !!_socket; }
		bool amOwner() { return !!_socket; }
	}
658 
	/++
	Explicitly close the connection.
	
	This is a two-stage process. First tell the server we are quitting this
	connection, and then close the socket.
	
	Idiomatic use as follows is suggested:
	------------------
	{
	    auto con = new Connection("host=localhost;user=user;pwd=password;db=mysqld");
	    scope(exit) con.close();
	    // Use the connection
	    ...
	}
	------------------
	+/
	void close()
	{
		// Stage one: say goodbye, but only if we got as far as authenticating
		// and the socket is still up. quit() leaves the state as `connected`.
		if (_open == OpenState.authenticated && _socket.connected)
			quit();

		// Stage two: drop the socket.
		if (_open == OpenState.connected)
			kill();
		resetPacket();
	}
684 
685 	void reconnect()
686 	{
687 		reconnect(_clientCapabilities);
688 	}
689 
690 	void reconnect(SvrCapFlags clientCapabilities)
691 	{
692 		bool sameCaps = clientCapabilities == _clientCapabilities;
693 		if(!closed)
694 		{
695 			// Same caps as before?
696 			if(clientCapabilities == _clientCapabilities)
697 				return; // Nothing to do, just keep current connection
698 
699 			close();
700 		}
701 
702 		connect(clientCapabilities);
703 	}
704 
	// Send the QUIT command to the server. Requires an authenticated
	// connection; afterwards the state is `connected`, i.e. the socket still
	// has to be closed by the caller (see close()).
	private void quit()
	in
	{
		assert(_open == OpenState.authenticated);
	}
	body
	{
		sendCmd(CommandType.QUIT, []);
		// No response is sent for a quit packet
		_open = OpenState.connected;
	}
716 
717 	static string[] parseConnectionString(string cs)
718 	{
719 		string[] rv;
720 		rv.length = 5;
721 		rv[4] = "3306"; // Default port
722 		string[] a = split(cs, ";");
723 		foreach (s; a)
724 		{
725 			string[] a2 = split(s, "=");
726 			enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs);
727 			string name = strip(a2[0]);
728 			string val = strip(a2[1]);
729 			switch (name)
730 			{
731 				case "host":
732 					rv[0] = val;
733 					break;
734 				case "user":
735 					rv[1] = val;
736 					break;
737 				case "pwd":
738 					rv[2] = val;
739 					break;
740 				case "db":
741 					rv[3] = val;
742 					break;
743 				case "port":
744 					rv[4] = val;
745 					break;
746 				default:
747 					throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
748 			}
749 		}
750 		return rv;
751 	}
752 
	/++
	Select a current database.
	
	Sends INIT_DB and, once the server has acknowledged it, records the new
	name so that currentDB reflects it.
	
	Params: dbName = Name of the requested database
	Throws: MySQLException
	+/
	void selectDB(string dbName)
	{
		sendCmd(CommandType.INIT_DB, dbName);
		getCmdResponse();
		// Only reached if the server accepted the change.
		_db = dbName;
	}
765 
	/++
	Check the server status
	
	Sends a PING command and returns the server's reply.
	
	Returns: An OKErrorPacket from which server status can be determined
	Throws: MySQLException
	+/
	OKErrorPacket pingServer()
	{
		sendCmd(CommandType.PING, []);
		return getCmdResponse();
	}
777 
	/++
	Refresh some feature(s) of the server.
	
	Params: flags = The refresh flags, sent as the payload of a REFRESH command.
	Returns: An OKErrorPacket from which server status can be determined
	Throws: MySQLException
	+/
	OKErrorPacket refreshServer(RefreshFlags flags)
	{
		sendCmd(CommandType.REFRESH, [flags]);
		return getCmdResponse();
	}
789 
790 	/++
791 	Get the next Row of a pending result set.
792 	
793 	This method can be used after either execSQL() or execPrepared() have returned true
794 	to retrieve result set rows sequentially.
795 	
796 	Similar functionality is available via execSQLSequence() and execPreparedSequence() in
797 	which case the interface is presented as a forward range of Rows.
798 	
799 	This method allows you to deal with very large result sets either a row at a time,
800 	or by feeding the rows into some suitable container such as a linked list.
801 	
802 	Returns: A Row object.
803 	+/
804 	Row getNextRow()
805 	{
806 		scope(failure) kill();
807 
808 		if (_headersPending)
809 		{
810 			_rsh = ResultSetHeaders(this, _fieldCount);
811 			_headersPending = false;
812 		}
813 		ubyte[] packet;
814 		Row rr;
815 		packet = getPacket();
816 		if (packet.isEOFPacket())
817 		{
818 			_rowsPending = _binaryPending = false;
819 			return rr;
820 		}
821 		if (_binaryPending)
822 			rr = Row(this, packet, _rsh, true);
823 		else
824 			rr = Row(this, packet, _rsh, false);
825 		//rr._valid = true;
826 		return rr;
827 	}
828 
829 	/++
830 	Flush any outstanding result set elements.
831 	
832 	When the server responds to a command that produces a result set, it
833 	queues the whole set of corresponding packets over the current connection.
834 	Before that Connection can embark on any new command, it must receive
835 	all of those packets and junk them.
836 	http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/
837 	+/
838 	ulong purgeResult()
839 	{
840 		scope(failure) kill();
841 
842 		_lastCommandId++;
843 
844 		ulong rows = 0;
845 		if (_headersPending)
846 		{
847 			for (size_t i = 0;; i++)
848 			{
849 				if (getPacket().isEOFPacket())
850 				{
851 					_headersPending = false;
852 					break;
853 				}
854 				enforceEx!MYXProtocol(i < _fieldCount,
855 					text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
856 			}
857 		}
858 		if (_rowsPending)
859 		{
860 			for (;;  rows++)
861 			{
862 				if (getPacket().isEOFPacket())
863 				{
864 					_rowsPending = _binaryPending = false;
865 					break;
866 				}
867 			}
868 		}
869 		resetPacket();
870 		return rows;
871 	}
872 
873 	/++
874 	Get a textual report on the server status.
875 	
876 	(COM_STATISTICS)
877 	+/
878 	string serverStats()
879 	{
880 		sendCmd(CommandType.STATISTICS, []);
881 		return cast(string) getPacket();
882 	}
883 
884 	/++
885 	Enable multiple statement commands
886 	
887 	This can be used later if this feature was not requested in the client capability flags.
888 	
889 	Params: on = Boolean value to turn the capability on or off.
890 	+/
891 	void enableMultiStatements(bool on)
892 	{
893 		scope(failure) kill();
894 
895 		ubyte[] t;
896 		t.length = 2;
897 		t[0] = on ? 0 : 1;
898 		t[1] = 0;
899 		sendCmd(CommandType.STMT_OPTION, t);
900 
901 		// For some reason this command gets an EOF packet as response
902 		auto packet = getPacket();
903 		enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
904 	}
905 
	/// Return the in-force protocol number
	@property ubyte protocol() pure const nothrow { return _protocol; }
	/// Server version
	@property string serverVersion() pure const nothrow { return _serverVersion; }
	/// Server capability flags
	@property uint serverCapabilities() pure const nothrow { return _sCaps; }
	/// Server status
	@property ushort serverStatus() pure const nothrow { return _serverStatus; }
	/// Current character set
	@property ubyte charSet() pure const nothrow { return _sCharSet; }
	/// Current database
	@property string currentDB() pure const nothrow { return _db; }
	/// Socket type being used
	@property MySQLSocketType socketType() pure const nothrow { return _socketType; }

	/// After a command that inserted a row into a table with an auto-increment
	/// ID column, this method allows you to retrieve the last insert ID.
	@property ulong lastInsertID() pure const nothrow { return _insertID; }

	/// This gets incremented every time a command is issued or results are purged,
	/// so a ResultRange can tell whether it's been invalidated.
	@property ulong lastCommandId() pure const nothrow { return _lastCommandId; }

	/// Gets whether rows are pending
	@property bool rowsPending() pure const nothrow { return _rowsPending; }

	/// Gets whether anything (rows, headers or binary) is pending.
	/// New commands cannot be sent on a connection while anything is pending.
	@property bool hasPending() pure const nothrow
	{
		return _rowsPending || _headersPending || _binaryPending;
	}

	/// Gets the result header's field descriptions.
	@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
941 }