1 /// Connect to a MySQL/MariaDB server.
2 module mysql.connection;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.digest.sha;
7 import std.exception;
8 import std.range;
9 import std.socket;
10 import std.string;
11 import std.typecons;
12 
13 import mysql.commands;
14 import mysql.exceptions;
15 import mysql.prepared;
16 import mysql.protocol.constants;
17 import mysql.protocol.packets;
18 import mysql.protocol.sockets;
19 import mysql.result;
20 debug(MYSQLN_TESTS)
21 {
22 	import mysql.test.common;
23 }
24 
25 version(Have_vibe_d_core)
26 {
27 	static if(__traits(compiles, (){ import vibe.core.net; } ))
28 		import vibe.core.net;
29 	else
30 		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
31 }
32 
33 /// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
34 immutable SvrCapFlags defaultClientFlags =
35 		SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
36 		SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
37 		SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
38 		//SvrCapFlags.MULTI_RESULTS;
39 
40 /++
41 Submit an SQL command to the server to be compiled into a prepared statement.
42 
43 This will automatically register the prepared statement on the provided connection.
44 The resulting `mysql.prepared.Prepared` can then be used freely on ANY `Connection`,
45 as it will automatically be registered upon its first use on other connections.
46 Or, pass it to `Connection.register` if you prefer eager registration.
47 
48 Internally, the result of a successful outcome will be a statement handle - an ID -
49 for the prepared statement, a count of the parameters required for
50 execution of the statement, and a count of the columns that will be present
51 in any result set that the command generates.
52 
53 The server will then proceed to send prepared statement headers,
54 including parameter descriptions, and result set field descriptions,
55 followed by an EOF packet.
56 
57 Throws: `mysql.exceptions.MYX` if the server has a problem.
58 +/
59 Prepared prepare(Connection conn, string sql)
60 {
61 	auto info = conn.registerIfNeeded(sql);
62 	return Prepared(sql, info.headers, info.numParams);
63 }
64 
65 /++
66 This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.
67 
68 See `BackwardCompatPrepared` for more info.
69 +/
70 deprecated("This is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. You should migrate from this to the Prepared-compatible exec/query overloads in 'mysql.commands'.")
71 BackwardCompatPrepared prepareBackwardCompat(Connection conn, string sql)
72 {
73 	return prepareBackwardCompatImpl(conn, sql);
74 }
75 
76 /// Allow mysql-native tests to get around the deprecation message
77 package BackwardCompatPrepared prepareBackwardCompatImpl(Connection conn, string sql)
78 {
79 	return BackwardCompatPrepared(conn, prepare(conn, sql));
80 }
81 
82 /++
83 Convenience function to create a prepared statement which calls a stored function.
84 
85 Be careful that your `numArgs` is correct. If it isn't, you may get a
86 `mysql.exceptions.MYX` with a very unclear error message.
87 
88 Throws: `mysql.exceptions.MYX` if the server has a problem.
89 
90 Params:
91 	name = The name of the stored function.
92 	numArgs = The number of arguments the stored procedure takes.
93 +/
94 Prepared prepareFunction(Connection conn, string name, int numArgs)
95 {
96 	auto sql = "select " ~ name ~ preparedPlaceholderArgs(numArgs);
97 	return prepare(conn, sql);
98 }
99 
100 ///
101 @("prepareFunction")
102 debug(MYSQLN_TESTS)
103 unittest
104 {
105 	import mysql.test.common;
106 	mixin(scopedCn);
107 
108 	exec(cn, `DROP FUNCTION IF EXISTS hello`);
109 	exec(cn, `
110 		CREATE FUNCTION hello (s CHAR(20))
111 		RETURNS CHAR(50) DETERMINISTIC
112 		RETURN CONCAT('Hello ',s,'!')
113 	`);
114 
115 	auto preparedHello = prepareFunction(cn, "hello", 1);
116 	preparedHello.setArgs("World");
117 	auto rs = cn.query(preparedHello).array;
118 	assert(rs.length == 1);
119 	assert(rs[0][0] == "Hello World!");
120 }
121 
122 /++
123 Convenience function to create a prepared statement which calls a stored procedure.
124 
125 OUT parameters are currently not supported. It should generally be
126 possible with MySQL to present them as a result set.
127 
128 Be careful that your `numArgs` is correct. If it isn't, you may get a
129 `mysql.exceptions.MYX` with a very unclear error message.
130 
131 Throws: `mysql.exceptions.MYX` if the server has a problem.
132 
133 Params:
134 	name = The name of the stored procedure.
135 	numArgs = The number of arguments the stored procedure takes.
136 
137 +/
138 Prepared prepareProcedure(Connection conn, string name, int numArgs)
139 {
140 	auto sql = "call " ~ name ~ preparedPlaceholderArgs(numArgs);
141 	return prepare(conn, sql);
142 }
143 
144 ///
145 @("prepareProcedure")
146 debug(MYSQLN_TESTS)
147 unittest
148 {
149 	import mysql.test.common;
150 	import mysql.test.integration;
151 	mixin(scopedCn);
152 	initBaseTestTables(cn);
153 
154 	exec(cn, `DROP PROCEDURE IF EXISTS insert2`);
155 	exec(cn, `
156 		CREATE PROCEDURE insert2 (IN p1 INT, IN p2 CHAR(50))
157 		BEGIN
158 			INSERT INTO basetest (intcol, stringcol) VALUES(p1, p2);
159 		END
160 	`);
161 
162 	auto preparedInsert2 = prepareProcedure(cn, "insert2", 2);
163 	preparedInsert2.setArgs(2001, "inserted string 1");
164 	cn.exec(preparedInsert2);
165 
166 	auto rs = query(cn, "SELECT stringcol FROM basetest WHERE intcol=2001").array;
167 	assert(rs.length == 1);
168 	assert(rs[0][0] == "inserted string 1");
169 }
170 
171 private string preparedPlaceholderArgs(int numArgs)
172 {
173 	auto sql = "(";
174 	bool comma = false;
175 	foreach(i; 0..numArgs)
176 	{
177 		if (comma)
178 			sql ~= ",?";
179 		else
180 		{
181 			sql ~= "?";
182 			comma = true;
183 		}
184 	}
185 	sql ~= ")";
186 
187 	return sql;
188 }
189 
190 @("preparedPlaceholderArgs")
191 debug(MYSQLN_TESTS)
192 unittest
193 {
194 	assert(preparedPlaceholderArgs(3) == "(?,?,?)");
195 	assert(preparedPlaceholderArgs(2) == "(?,?)");
196 	assert(preparedPlaceholderArgs(1) == "(?)");
197 	assert(preparedPlaceholderArgs(0) == "()");
198 }
199 
200 /// Per-connection info from the server about a registered prepared statement.
201 package struct PreparedServerInfo
202 {
203 	/// Server's identifier for this prepared statement.
204 	/// Apperently, this is never 0 if it's been registered,
205 	/// although mysql-native no longer relies on that.
206 	uint statementId;
207 
208 	ushort psWarnings;
209 
210 	/// Number of parameters this statement takes.
211 	/// 
212 	/// This will be the same on all connections, but it's returned
213 	/// by the server upon registration, so it's stored here.
214 	ushort numParams;
215 
216 	/// Prepared statement headers
217 	///
218 	/// This will be the same on all connections, but it's returned
219 	/// by the server upon registration, so it's stored here.
220 	PreparedStmtHeaders headers;
221 	
222 	/// Not actually from the server. Connection uses this to keep track
223 	/// of statements that should be treated as having been released.
224 	bool queuedForRelease = false;
225 }
226 
227 /++
228 This is a wrapper over `mysql.prepared.Prepared`, provided ONLY as a
229 temporary aid in upgrading to mysql-native v2.0.0 and its
230 new connection-independent model of prepared statements. See the
231 $(LINK2 https://github.com/mysql-d/mysql-native/blob/master/MIGRATING_TO_V2.md, migration guide)
232 for more info.
233 
234 In most cases, this layer shouldn't even be needed. But if you have many
235 lines of code making calls to exec/query the same prepared statement,
236 then this may be helpful.
237 
238 To use this temporary compatability layer, change instances of:
239 
240 ---
241 auto stmt = conn.prepare(...);
242 ---
243 
244 to this:
245 
246 ---
247 auto stmt = conn.prepareBackwardCompat(...);
248 ---
249 
250 And then your prepared statement should work as before.
251 
252 BUT DO NOT LEAVE IT LIKE THIS! Ultimately, you should update
253 your prepared statement code to the mysql-native v2.0.0 API, by changing
254 instances of:
255 
256 ---
257 stmt.exec()
258 stmt.query()
259 stmt.queryRow()
260 stmt.queryRowTuple(outputArgs...)
261 stmt.queryValue()
262 ---
263 
264 to this:
265 
266 ---
267 conn.exec(stmt)
268 conn.query(stmt)
269 conn.queryRow(stmt)
270 conn.queryRowTuple(stmt, outputArgs...)
271 conn.queryValue(stmt)
272 ---
273 
274 Both of the above syntaxes can be used with a `BackwardCompatPrepared`
275 (the `Connection` passed directly to `mysql.commands.exec`/`mysql.commands.query`
276 will override the one embedded associated with your `BackwardCompatPrepared`).
277 
278 Once all of your code is updated, you can change `prepareBackwardCompat`
279 back to `prepare` again, and your upgrade will be complete.
280 +/
281 struct BackwardCompatPrepared
282 {
283 	import std.variant;
284 	
285 	private Connection _conn;
286 	Prepared _prepared;
287 
288 	/// Access underlying `Prepared`
289 	@property Prepared prepared() { return _prepared; }
290 
291 	alias _prepared this;
292 
293 	/++
294 	This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.
295 	
296 	See `BackwardCompatPrepared` for more info.
297 	+/
298 	deprecated("Change 'preparedStmt.exec()' to 'conn.exec(preparedStmt)'")
299 	ulong exec()
300 	{
301 		return .exec(_conn, _prepared);
302 	}
303 
304 	///ditto
305 	deprecated("Change 'preparedStmt.query()' to 'conn.query(preparedStmt)'")
306 	ResultRange query()
307 	{
308 		return .query(_conn, _prepared);
309 	}
310 
311 	///ditto
312 	deprecated("Change 'preparedStmt.queryRow()' to 'conn.queryRow(preparedStmt)'")
313 	Nullable!Row queryRow()
314 	{
315 		return .queryRow(_conn, _prepared);
316 	}
317 
318 	///ditto
319 	deprecated("Change 'preparedStmt.queryRowTuple(outArgs...)' to 'conn.queryRowTuple(preparedStmt, outArgs...)'")
320 	void queryRowTuple(T...)(ref T args) if(T.length == 0 || !is(T[0] : Connection))
321 	{
322 		return .queryRowTuple(_conn, _prepared, args);
323 	}
324 
325 	///ditto
326 	deprecated("Change 'preparedStmt.queryValue()' to 'conn.queryValue(preparedStmt)'")
327 	Nullable!Variant queryValue()
328 	{
329 		return .queryValue(_conn, _prepared);
330 	}
331 }
332 
333 //TODO: All low-level commms should be moved into the mysql.protocol package.
334 /// Low-level comms code relating to prepared statements.
335 package struct ProtocolPrepared
336 {
337 	import std.conv;
338 	import std.datetime;
339 	import std.variant;
340 	import mysql.types;
341 	
342 	static ubyte[] makeBitmap(in Variant[] inParams)
343 	{
344 		size_t bml = (inParams.length+7)/8;
345 		ubyte[] bma;
346 		bma.length = bml;
347 		foreach (i; 0..inParams.length)
348 		{
349 			if(inParams[i].type != typeid(typeof(null)))
350 				continue;
351 			size_t bn = i/8;
352 			size_t bb = i%8;
353 			ubyte sr = 1;
354 			sr <<= bb;
355 			bma[bn] |= sr;
356 		}
357 		return bma;
358 	}
359 
360 	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
361 	{
362 		ubyte[] prefix;
363 		prefix.length = 14;
364 
365 		prefix[4] = CommandType.STMT_EXECUTE;
366 		hStmt.packInto(prefix[5..9]);
367 		prefix[9] = flags;   // flags, no cursor
368 		prefix[10] = 1; // iteration count - currently always 1
369 		prefix[11] = 0;
370 		prefix[12] = 0;
371 		prefix[13] = 0;
372 
373 		return prefix;
374 	}
375 
376 	//TODO: All low-level commms should be moved into the mysql.protocol package.
377 	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
378 		out ubyte[] vals, out bool longData)
379 	{
380 		size_t pc = inParams.length;
381 		ubyte[] types;
382 		types.length = pc*2;
383 		size_t alloc = pc*20;
384 		vals.length = alloc;
385 		uint vcl = 0, len;
386 		int ct = 0;
387 
388 		void reAlloc(size_t n)
389 		{
390 			if (vcl+n < alloc)
391 				return;
392 			size_t inc = (alloc*3)/2;
393 			if (inc <  n)
394 				inc = n;
395 			alloc += inc;
396 			vals.length = alloc;
397 		}
398 
399 		foreach (size_t i; 0..pc)
400 		{
401 			enum UNSIGNED  = 0x80;
402 			enum SIGNED    = 0;
403 			if (psa[i].chunkSize)
404 				longData= true;
405 			if (inParams[i].type == typeid(typeof(null)))
406 			{
407 				types[ct++] = SQLType.NULL;
408 				types[ct++] = SIGNED;
409 				continue;
410 			}
411 			Variant v = inParams[i];
412 			SQLType ext = psa[i].type;
413 			string ts = v.type.toString();
414 			bool isRef;
415 			if (ts[$-1] == '*')
416 			{
417 				ts.length = ts.length-1;
418 				isRef= true;
419 			}
420 
421 			switch (ts)
422 			{
423 				case "bool":
424 					if (ext == SQLType.INFER_FROM_D_TYPE)
425 						types[ct++] = SQLType.BIT;
426 					else
427 						types[ct++] = cast(ubyte) ext;
428 					types[ct++] = SIGNED;
429 					reAlloc(2);
430 					bool bv = isRef? *(v.get!(bool*)): v.get!(bool);
431 					vals[vcl++] = 1;
432 					vals[vcl++] = bv? 0x31: 0x30;
433 					break;
434 				case "byte":
435 					types[ct++] = SQLType.TINY;
436 					types[ct++] = SIGNED;
437 					reAlloc(1);
438 					vals[vcl++] = isRef? *(v.get!(byte*)): v.get!(byte);
439 					break;
440 				case "ubyte":
441 					types[ct++] = SQLType.TINY;
442 					types[ct++] = UNSIGNED;
443 					reAlloc(1);
444 					vals[vcl++] = isRef? *(v.get!(ubyte*)): v.get!(ubyte);
445 					break;
446 				case "short":
447 					types[ct++] = SQLType.SHORT;
448 					types[ct++] = SIGNED;
449 					reAlloc(2);
450 					short si = isRef? *(v.get!(short*)): v.get!(short);
451 					vals[vcl++] = cast(ubyte) (si & 0xff);
452 					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
453 					break;
454 				case "ushort":
455 					types[ct++] = SQLType.SHORT;
456 					types[ct++] = UNSIGNED;
457 					reAlloc(2);
458 					ushort us = isRef? *(v.get!(ushort*)): v.get!(ushort);
459 					vals[vcl++] = cast(ubyte) (us & 0xff);
460 					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
461 					break;
462 				case "int":
463 					types[ct++] = SQLType.INT;
464 					types[ct++] = SIGNED;
465 					reAlloc(4);
466 					int ii = isRef? *(v.get!(int*)): v.get!(int);
467 					vals[vcl++] = cast(ubyte) (ii & 0xff);
468 					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
469 					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
470 					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
471 					break;
472 				case "uint":
473 					types[ct++] = SQLType.INT;
474 					types[ct++] = UNSIGNED;
475 					reAlloc(4);
476 					uint ui = isRef? *(v.get!(uint*)): v.get!(uint);
477 					vals[vcl++] = cast(ubyte) (ui & 0xff);
478 					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
479 					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
480 					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
481 					break;
482 				case "long":
483 					types[ct++] = SQLType.LONGLONG;
484 					types[ct++] = SIGNED;
485 					reAlloc(8);
486 					long li = isRef? *(v.get!(long*)): v.get!(long);
487 					vals[vcl++] = cast(ubyte) (li & 0xff);
488 					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
489 					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
490 					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
491 					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
492 					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
493 					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
494 					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
495 					break;
496 				case "ulong":
497 					types[ct++] = SQLType.LONGLONG;
498 					types[ct++] = UNSIGNED;
499 					reAlloc(8);
500 					ulong ul = isRef? *(v.get!(ulong*)): v.get!(ulong);
501 					vals[vcl++] = cast(ubyte) (ul & 0xff);
502 					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
503 					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
504 					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
505 					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
506 					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
507 					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
508 					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
509 					break;
510 				case "float":
511 					types[ct++] = SQLType.FLOAT;
512 					types[ct++] = SIGNED;
513 					reAlloc(4);
514 					float f = isRef? *(v.get!(float*)): v.get!(float);
515 					ubyte* ubp = cast(ubyte*) &f;
516 					vals[vcl++] = *ubp++;
517 					vals[vcl++] = *ubp++;
518 					vals[vcl++] = *ubp++;
519 					vals[vcl++] = *ubp;
520 					break;
521 				case "double":
522 					types[ct++] = SQLType.DOUBLE;
523 					types[ct++] = SIGNED;
524 					reAlloc(8);
525 					double d = isRef? *(v.get!(double*)): v.get!(double);
526 					ubyte* ubp = cast(ubyte*) &d;
527 					vals[vcl++] = *ubp++;
528 					vals[vcl++] = *ubp++;
529 					vals[vcl++] = *ubp++;
530 					vals[vcl++] = *ubp++;
531 					vals[vcl++] = *ubp++;
532 					vals[vcl++] = *ubp++;
533 					vals[vcl++] = *ubp++;
534 					vals[vcl++] = *ubp;
535 					break;
536 				case "std.datetime.date.Date":
537 				case "std.datetime.Date":
538 					types[ct++] = SQLType.DATE;
539 					types[ct++] = SIGNED;
540 					Date date = isRef? *(v.get!(Date*)): v.get!(Date);
541 					ubyte[] da = pack(date);
542 					size_t l = da.length;
543 					reAlloc(l);
544 					vals[vcl..vcl+l] = da[];
545 					vcl += l;
546 					break;
547 				case "std.datetime.TimeOfDay":
548 				case "std.datetime.Time":
549 					types[ct++] = SQLType.TIME;
550 					types[ct++] = SIGNED;
551 					TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay);
552 					ubyte[] ta = pack(time);
553 					size_t l = ta.length;
554 					reAlloc(l);
555 					vals[vcl..vcl+l] = ta[];
556 					vcl += l;
557 					break;
558 				case "std.datetime.date.DateTime":
559 				case "std.datetime.DateTime":
560 					types[ct++] = SQLType.DATETIME;
561 					types[ct++] = SIGNED;
562 					DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime);
563 					ubyte[] da = pack(dt);
564 					size_t l = da.length;
565 					reAlloc(l);
566 					vals[vcl..vcl+l] = da[];
567 					vcl += l;
568 					break;
569 				case "mysql.types.Timestamp":
570 					types[ct++] = SQLType.TIMESTAMP;
571 					types[ct++] = SIGNED;
572 					Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp);
573 					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
574 					ubyte[] da = pack(dt);
575 					size_t l = da.length;
576 					reAlloc(l);
577 					vals[vcl..vcl+l] = da[];
578 					vcl += l;
579 					break;
580 				case "immutable(char)[]":
581 					if (ext == SQLType.INFER_FROM_D_TYPE)
582 						types[ct++] = SQLType.VARCHAR;
583 					else
584 						types[ct++] = cast(ubyte) ext;
585 					types[ct++] = SIGNED;
586 					string s = isRef? *(v.get!(string*)): v.get!(string);
587 					ubyte[] packed = packLCS(cast(void[]) s);
588 					reAlloc(packed.length);
589 					vals[vcl..vcl+packed.length] = packed[];
590 					vcl += packed.length;
591 					break;
592 				case "char[]":
593 					if (ext == SQLType.INFER_FROM_D_TYPE)
594 						types[ct++] = SQLType.VARCHAR;
595 					else
596 						types[ct++] = cast(ubyte) ext;
597 					types[ct++] = SIGNED;
598 					char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]);
599 					ubyte[] packed = packLCS(cast(void[]) ca);
600 					reAlloc(packed.length);
601 					vals[vcl..vcl+packed.length] = packed[];
602 					vcl += packed.length;
603 					break;
604 				case "byte[]":
605 					if (ext == SQLType.INFER_FROM_D_TYPE)
606 						types[ct++] = SQLType.TINYBLOB;
607 					else
608 						types[ct++] = cast(ubyte) ext;
609 					types[ct++] = SIGNED;
610 					byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]);
611 					ubyte[] packed = packLCS(cast(void[]) ba);
612 					reAlloc(packed.length);
613 					vals[vcl..vcl+packed.length] = packed[];
614 					vcl += packed.length;
615 					break;
616 				case "ubyte[]":
617 					if (ext == SQLType.INFER_FROM_D_TYPE)
618 						types[ct++] = SQLType.TINYBLOB;
619 					else
620 						types[ct++] = cast(ubyte) ext;
621 					types[ct++] = SIGNED;
622 					ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]);
623 					ubyte[] packed = packLCS(cast(void[]) uba);
624 					reAlloc(packed.length);
625 					vals[vcl..vcl+packed.length] = packed[];
626 					vcl += packed.length;
627 					break;
628 				case "void":
629 					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
630 				default:
631 					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
632 			}
633 		}
634 		vals.length = vcl;
635 		return types;
636 	}
637 
638 	static void sendLongData(Connection conn, uint hStmt, ParameterSpecialization[] psa)
639 	{
640 		assert(psa.length <= ushort.max); // parameter number is sent as short
641 		foreach (ushort i, PSN psn; psa)
642 		{
643 			if (!psn.chunkSize) continue;
644 			uint cs = psn.chunkSize;
645 			uint delegate(ubyte[]) dg = psn.chunkDelegate;
646 
647 			//TODO: All low-level commms should be moved into the mysql.protocol package.
648 			ubyte[] chunk;
649 			chunk.length = cs+11;
650 			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
651 			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
652 			hStmt.packInto(chunk[5..9]); // statement handle
653 			packInto(i, chunk[9..11]); // parameter number
654 
655 			// byte 11 on is payload
656 			for (;;)
657 			{
658 				uint sent = dg(chunk[11..cs+11]);
659 				if (sent < cs)
660 				{
661 					if (sent == 0)    // data was exact multiple of chunk size - all sent
662 						break;
663 					chunk.length = chunk.length - (cs-sent);     // trim the chunk
664 					sent += 7;        // adjust for non-payload bytes
665 					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
666 					conn.send(chunk);
667 					break;
668 				}
669 				conn.send(chunk);
670 			}
671 		}
672 	}
673 
674 	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
675 		Variant[] inParams, ParameterSpecialization[] psa)
676 	{
677 		conn.autoPurge();
678 		
679 		//TODO: All low-level commms should be moved into the mysql.protocol package.
680 		ubyte[] packet;
681 		conn.resetPacket();
682 
683 		ubyte[] prefix = makePSPrefix(hStmt, 0);
684 		size_t len = prefix.length;
685 		bool longData;
686 
687 		if (psh.paramCount)
688 		{
689 			ubyte[] one = [ 1 ];
690 			ubyte[] vals;
691 			ubyte[] types = analyseParams(inParams, psa, vals, longData);
692 			ubyte[] nbm = makeBitmap(inParams);
693 			packet = prefix ~ nbm ~ one ~ types ~ vals;
694 		}
695 		else
696 			packet = prefix;
697 
698 		if (longData)
699 			sendLongData(conn, hStmt, psa);
700 
701 		assert(packet.length <= uint.max);
702 		packet.setPacketHeader(conn.pktNumber);
703 		conn.bumpPacket();
704 		conn.send(packet);
705 	}
706 }
707 
708 /++
709 A class representing a database connection.
710 
711 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
712 creating a new Connection directly. That will provide certain benefits,
713 such as reusing old connections and automatic cleanup (no need to close
714 the connection when done).
715 
716 ------------------
717 // Suggested usage:
718 
719 {
720 	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
721 	scope(exit) con.close();
722 
723 	// Use the connection
724 	...
725 }
726 ------------------
727 +/
728 //TODO: All low-level commms should be moved into the mysql.protocol package.
729 class Connection
730 {
731 /+
732 The Connection is responsible for handshaking with the server to establish
733 authentication. It then passes client preferences to the server, and
734 subsequently is the channel for all command packets that are sent, and all
735 response packets received.
736 
737 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
738 byte as a packet number. Connection deals with the headers and ensures that
739 packet numbers are sequential.
740 
741 The initial packet is sent by the server - essentially a 'hello' packet
742 inviting login. That packet has a sequence number of zero. That sequence
743 number is the incremented by client and server packets through the handshake
744 sequence.
745 
746 After login all further sequences are initialized by the client sending a
747 command packet with a zero sequence number, to which the server replies with
748 zero or more packets with sequential sequence numbers.
749 +/
750 package:
751 	enum OpenState
752 	{
753 		/// We have not yet connected to the server, or have sent QUIT to the
754 		/// server and closed the connection
755 		notConnected,
756 		/// We have connected to the server and parsed the greeting, but not
757 		/// yet authenticated
758 		connected,
759 		/// We have successfully authenticated against the server, and need to
760 		/// send QUIT to the server when closing the connection
761 		authenticated
762 	}
763 	OpenState   _open;
764 	MySQLSocket _socket;
765 
766 	SvrCapFlags _sCaps, _cCaps;
767 	uint    _sThread;
768 	ushort  _serverStatus;
769 	ubyte   _sCharSet, _protocol;
770 	string  _serverVersion;
771 
772 	string _host, _user, _pwd, _db;
773 	ushort _port;
774 
775 	MySQLSocketType _socketType;
776 
777 	OpenSocketCallbackPhobos _openSocketPhobos;
778 	OpenSocketCallbackVibeD  _openSocketVibeD;
779 
780 	ulong _insertID;
781 
782 	// This gets incremented every time a command is issued or results are purged,
783 	// so a ResultRange can tell whether it's been invalidated.
784 	ulong _lastCommandID;
785 
786 	// Whether there are rows, headers or bimary data waiting to be retreived.
787 	// MySQL protocol doesn't permit performing any other action until all
788 	// such data is read.
789 	bool _rowsPending, _headersPending, _binaryPending;
790 
791 	// Field count of last performed command.
792 	//TODO: Does Connection need to store this?
793 	ushort _fieldCount;
794 
795 	// ResultSetHeaders of last performed command.
796 	//TODO: Does Connection need to store this? Is this even used?
797 	ResultSetHeaders _rsh;
798 
799 	// This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise
800 	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
801 	// the reason why most other objects require a connection object for their construction.
802 	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
803 				/// ordering. First packet should have 0
804 	@property ubyte pktNumber()   { return _cpn; }
805 	void bumpPacket()       { _cpn++; }
806 	void resetPacket()      { _cpn = 0; }
807 
808 	version(Have_vibe_d_core) {} else
809 	pure const nothrow invariant()
810 	{
811 		assert(_socketType != MySQLSocketType.vibed);
812 	}
813 
814 	ubyte[] getPacket()
815 	{
816 		scope(failure) kill();
817 
818 		ubyte[4] header;
819 		_socket.read(header);
820 		// number of bytes always set as 24-bit
821 		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
822 		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
823 		bumpPacket();
824 
825 		ubyte[] packet = new ubyte[numDataBytes];
826 		_socket.read(packet);
827 		assert(packet.length == numDataBytes, "Wrong number of bytes read");
828 		return packet;
829 	}
830 
831 	void send(const(ubyte)[] packet)
832 	in
833 	{
834 		assert(packet.length > 4); // at least 1 byte more than header
835 	}
836 	body
837 	{
838 		_socket.write(packet);
839 	}
840 
841 	void send(const(ubyte)[] header, const(ubyte)[] data)
842 	in
843 	{
844 		assert(header.length == 4 || header.length == 5/*command type included*/);
845 	}
846 	body
847 	{
848 		_socket.write(header);
849 		if(data.length)
850 			_socket.write(data);
851 	}
852 
853 	void sendCmd(T)(CommandType cmd, const(T)[] data)
854 	in
855 	{
856 		// Internal thread states. Clients shouldn't use this
857 		assert(cmd != CommandType.SLEEP);
858 		assert(cmd != CommandType.CONNECT);
859 		assert(cmd != CommandType.TIME);
860 		assert(cmd != CommandType.DELAYED_INSERT);
861 		assert(cmd != CommandType.CONNECT_OUT);
862 
863 		// Deprecated
864 		assert(cmd != CommandType.CREATE_DB);
865 		assert(cmd != CommandType.DROP_DB);
866 		assert(cmd != CommandType.TABLE_DUMP);
867 
868 		// cannot send more than uint.max bytes. TODO: better error message if we try?
869 		assert(data.length <= uint.max);
870 	}
871 	out
872 	{
873 		// at this point we should have sent a command
874 		assert(pktNumber == 1);
875 	}
876 	body
877 	{
878 		autoPurge();
879 
880 		scope(failure) kill();
881 
882 		_lastCommandID++;
883 
884 		if(!_socket.connected)
885 		{
886 			if(cmd == CommandType.QUIT)
887 				return; // Don't bother reopening connection just to quit
888 
889 			_open = OpenState.notConnected;
890 			connect(_clientCapabilities);
891 		}
892 
893 		resetPacket();
894 
895 		ubyte[] header;
896 		header.length = 4 /*header*/ + 1 /*cmd*/;
897 		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
898 		header[4] = cmd;
899 		bumpPacket();
900 
901 		send(header, cast(const(ubyte)[])data);
902 	}
903 
904 	OKErrorPacket getCmdResponse(bool asString = false)
905 	{
906 		auto okp = OKErrorPacket(getPacket());
907 		enforcePacketOK(okp);
908 		_serverStatus = okp.serverStatus;
909 		return okp;
910 	}
911 
912 	/// Get the next `mysql.result.Row` of a pending result set.
913 	Row getNextRow()
914 	{
915 		scope(failure) kill();
916 
917 		if (_headersPending)
918 		{
919 			_rsh = ResultSetHeaders(this, _fieldCount);
920 			_headersPending = false;
921 		}
922 		ubyte[] packet;
923 		Row rr;
924 		packet = getPacket();
925 		if(packet.front == ResultPacketMarker.error)
926 			throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);
927 
928 		if (packet.isEOFPacket())
929 		{
930 			_rowsPending = _binaryPending = false;
931 			return rr;
932 		}
933 		if (_binaryPending)
934 			rr = Row(this, packet, _rsh, true);
935 		else
936 			rr = Row(this, packet, _rsh, false);
937 		//rr._valid = true;
938 		return rr;
939 	}
940 
941 	ubyte[] buildAuthPacket(ubyte[] token)
942 	in
943 	{
944 		assert(token.length == 20);
945 	}
946 	body
947 	{
948 		ubyte[] packet;
949 		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
950 		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
951 
952 		// NOTE: we'll set the header last when we know the size
953 
954 		// Set the default capabilities required by the client
955 		_cCaps.packInto(packet[4..8]);
956 
957 		// Request a conventional maximum packet length.
958 		1.packInto(packet[8..12]);
959 
960 		packet ~= 33; // Set UTF-8 as default charSet
961 
962 		// There's a statutory block of zero bytes here - fill them in.
963 		foreach(i; 0 .. 23)
964 			packet ~= 0;
965 
966 		// Add the user name as a null terminated string
967 		foreach(i; 0 .. _user.length)
968 			packet ~= _user[i];
969 		packet ~= 0; // \0
970 
971 		// Add our calculated authentication token as a length prefixed string.
972 		assert(token.length <= ubyte.max);
973 		if(_pwd.length == 0)  // Omit the token if the account has no password
974 			packet ~= 0;
975 		else
976 		{
977 			packet ~= cast(ubyte)token.length;
978 			foreach(i; 0 .. token.length)
979 				packet ~= token[i];
980 		}
981 
982 		// Add the default database as a null terminated string
983 		foreach(i; 0 .. _db.length)
984 			packet ~= _db[i];
985 		packet ~= 0; // \0
986 
987 		// The server sent us a greeting with packet number 0, so we send the auth packet
988 		// back with the next number.
989 		packet.setPacketHeader(pktNumber);
990 		bumpPacket();
991 		return packet;
992 	}
993 
994 	void consumeServerInfo(ref ubyte[] packet)
995 	{
996 		scope(failure) kill();
997 
998 		_sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
999 		_sCharSet = packet.consume!ubyte(); // server_language
1000 		_serverStatus = packet.consume!ushort(); //server_status
1001 		_sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
1002 		_sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
1003 
1004 		enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
1005 		enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
1006 	}
1007 
1008 	ubyte[] parseGreeting()
1009 	{
1010 		scope(failure) kill();
1011 
1012 		ubyte[] packet = getPacket();
1013 
1014 		if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
1015 		{
1016 			auto okp = OKErrorPacket(packet);
1017 			enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
1018 		}
1019 
1020 		_protocol = packet.consume!ubyte();
1021 
1022 		_serverVersion = packet.consume!string(packet.countUntil(0));
1023 		packet.skip(1); // \0 terminated _serverVersion
1024 
1025 		_sThread = packet.consume!uint();
1026 
1027 		// read first part of scramble buf
1028 		ubyte[] authBuf;
1029 		authBuf.length = 255;
1030 		authBuf[0..8] = packet.consume(8)[]; // scramble_buff
1031 
1032 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
1033 
1034 		consumeServerInfo(packet);
1035 
1036 		packet.skip(1); // this byte supposed to be scramble length, but is actually zero
1037 		packet.skip(10); // filler of \0
1038 
1039 		// rest of the scramble
1040 		auto len = packet.countUntil(0);
1041 		enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
1042 		enforce(authBuf.length > 8+len);
1043 		authBuf[8..8+len] = packet.consume(len)[];
1044 		authBuf.length = 8+len; // cut to correct size
1045 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
1046 
1047 		return authBuf;
1048 	}
1049 
1050 	static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
1051 	{
1052 		auto s = new PlainPhobosSocket();
1053 		s.connect(new InternetAddress(host, port));
1054 		return s;
1055 	}
1056 
1057 	static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
1058 	{
1059 		version(Have_vibe_d_core)
1060 			return vibe.core.net.connectTCP(host, port);
1061 		else
1062 			assert(0);
1063 	}
1064 
1065 	void initConnection()
1066 	{
1067 		resetPacket();
1068 		final switch(_socketType)
1069 		{
1070 			case MySQLSocketType.phobos:
1071 				_socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
1072 				break;
1073 
1074 			case MySQLSocketType.vibed:
1075 				version(Have_vibe_d_core) {
1076 					_socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
1077 					break;
1078 				} else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
1079 		}
1080 	}
1081 
1082 	ubyte[] makeToken(ubyte[] authBuf)
1083 	{
1084 		auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
1085 		auto pass2 = sha1Of(pass1);
1086 
1087 		SHA1 sha1;
1088 		sha1.start();
1089 		sha1.put(authBuf);
1090 		sha1.put(pass2);
1091 		auto result = sha1.finish();
1092 		foreach (size_t i; 0..20)
1093 			result[i] = result[i] ^ pass1[i];
1094 		return result.dup;
1095 	}
1096 
1097 	SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
1098 	{
1099 		SvrCapFlags common;
1100 		uint filter = 1;
1101 		foreach (size_t i; 0..uint.sizeof)
1102 		{
1103 			bool serverSupport = (server & filter) != 0; // can the server do this capability?
1104 			bool clientSupport = (client & filter) != 0; // can we support it?
1105 			if(serverSupport && clientSupport)
1106 				common |= filter;
1107 			filter <<= 1; // check next flag
1108 		}
1109 		return common;
1110 	}
1111 
1112 	void setClientFlags(SvrCapFlags capFlags)
1113 	{
1114 		_cCaps = getCommonCapabilities(_sCaps, capFlags);
1115 
1116 		// We cannot operate in <4.1 protocol, so we'll force it even if the user
1117 		// didn't supply it
1118 		_cCaps |= SvrCapFlags.PROTOCOL41;
1119 		_cCaps |= SvrCapFlags.SECURE_CONNECTION;
1120 	}
1121 
1122 	void authenticate(ubyte[] greeting)
1123 	in
1124 	{
1125 		assert(_open == OpenState.connected);
1126 	}
1127 	out
1128 	{
1129 		assert(_open == OpenState.authenticated);
1130 	}
1131 	body
1132 	{
1133 		auto token = makeToken(greeting);
1134 		auto authPacket = buildAuthPacket(token);
1135 		send(authPacket);
1136 
1137 		auto packet = getPacket();
1138 		auto okp = OKErrorPacket(packet);
1139 		enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
1140 		_open = OpenState.authenticated;
1141 	}
1142 
1143 	SvrCapFlags _clientCapabilities;
1144 
1145 	void connect(SvrCapFlags clientCapabilities)
1146 	in
1147 	{
1148 		assert(closed);
1149 	}
1150 	out
1151 	{
1152 		assert(_open == OpenState.authenticated);
1153 	}
1154 	body
1155 	{
1156 		initConnection();
1157 		auto greeting = parseGreeting();
1158 		_open = OpenState.connected;
1159 
1160 		_clientCapabilities = clientCapabilities;
1161 		setClientFlags(clientCapabilities);
1162 		authenticate(greeting);
1163 	}
1164 	
1165 	/// Forcefully close the socket without sending the quit command.
1166 	/// Needed in case an error leaves communatations in an undefined or non-recoverable state.
1167 	void kill()
1168 	{
1169 		if(_socket.connected)
1170 			_socket.close();
1171 		_open = OpenState.notConnected;
1172 		// any pending data is gone. Any statements to release will be released
1173 		// on the server automatically.
1174 		_headersPending = _rowsPending = _binaryPending = false;
1175 		
1176 		preparedRegistrations.clear();
1177 	}
1178 	
1179 	/// Called whenever mysql-native needs to send a command to the server
1180 	/// and be sure there aren't any pending results (which would prevent
1181 	/// a new command from being sent).
1182 	void autoPurge()
1183 	{
1184 		// This is called every time a command is sent,
1185 		// so detect & prevent infinite recursion.
1186 		static bool isAutoPurging = false;
1187 
1188 		if(isAutoPurging)
1189 			return;
1190 			
1191 		isAutoPurging = true;
1192 		scope(exit) isAutoPurging = false;
1193 
1194 		try
1195 		{
1196 			purgeResult();
1197 			releaseQueued();
1198 		}
1199 		catch(Exception e)
1200 		{
1201 			// likely the connection was closed, so reset any state.
1202 			// Don't treat this as a real error, because everything will be reset when we
1203 			// reconnect.
1204 			kill();
1205 		}
1206 	}
1207 
1208 	/// Lookup per-connection prepared statement info by SQL
1209 	private PreparedRegistrations!PreparedServerInfo preparedRegistrations;
1210 
1211 	/// Releases all prepared statements that are queued for release.
1212 	void releaseQueued()
1213 	{
1214 		foreach(sql, info; preparedRegistrations.directLookup)
1215 		if(info.queuedForRelease)
1216 		{
1217 			immediateReleasePrepared(info.statementId);
1218 			preparedRegistrations.directLookup.remove(sql);
1219 		}
1220 	}
1221 
1222 	/// Returns null if not found
1223 	Nullable!PreparedServerInfo getPreparedServerInfo(const string sql) pure nothrow
1224 	{
1225 		return preparedRegistrations[sql];
1226 	}
1227 	
1228 	/// If already registered, simply returns the cached `PreparedServerInfo`.
1229 	PreparedServerInfo registerIfNeeded(string sql)
1230 	{
1231 		return preparedRegistrations.registerIfNeeded(sql, &performRegister);
1232 	}
1233 
1234 	PreparedServerInfo performRegister(string sql)
1235 	{
1236 		scope(failure) kill();
1237 
1238 		PreparedServerInfo info;
1239 		
1240 		sendCmd(CommandType.STMT_PREPARE, sql);
1241 		_fieldCount = 0;
1242 
1243 		//TODO: All packet handling should be moved into the mysql.protocol package.
1244 		ubyte[] packet = getPacket();
1245 		if(packet.front == ResultPacketMarker.ok)
1246 		{
1247 			packet.popFront();
1248 			info.statementId    = packet.consume!int();
1249 			_fieldCount         = packet.consume!short();
1250 			info.numParams      = packet.consume!short();
1251 
1252 			packet.popFront(); // one byte filler
1253 			info.psWarnings     = packet.consume!short();
1254 
1255 			// At this point the server also sends field specs for parameters
1256 			// and columns if there were any of each
1257 			info.headers = PreparedStmtHeaders(this, _fieldCount, info.numParams);
1258 		}
1259 		else if(packet.front == ResultPacketMarker.error)
1260 		{
1261 			auto error = OKErrorPacket(packet);
1262 			enforcePacketOK(error);
1263 			assert(0); // FIXME: what now?
1264 		}
1265 		else
1266 			assert(0); // FIXME: what now?
1267 
1268 		return info;
1269 	}
1270 
1271 	private void immediateReleasePrepared(uint statementId)
1272 	{
1273 		scope(failure) kill();
1274 
1275 		if(closed())
1276 			return;
1277 
1278 		//TODO: All low-level commms should be moved into the mysql.protocol package.
1279 		ubyte[9] packet_buf;
1280 		ubyte[] packet = packet_buf;
1281 		packet.setPacketHeader(0/*packet number*/);
1282 		bumpPacket();
1283 		packet[4] = CommandType.STMT_CLOSE;
1284 		statementId.packInto(packet[5..9]);
1285 		purgeResult();
1286 		send(packet);
1287 		// It seems that the server does not find it necessary to send a response
1288 		// for this command.
1289 	}
1290 
1291 public:
1292 
1293 	/++
1294 	Construct opened connection.
1295 
1296 	Throws `mysql.exceptions.MYX` upon failure to connect.
1297 	
1298 	If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
1299 	creating a new Connection directly. That will provide certain benefits,
1300 	such as reusing old connections and automatic cleanup (no need to close
1301 	the connection when done).
1302 
1303 	------------------
1304 	// Suggested usage:
1305 
1306 	{
1307 	    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
1308 	    scope(exit) con.close();
1309 
1310 	    // Use the connection
1311 	    ...
1312 	}
1313 	------------------
1314 
1315 	Params:
1316 		cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
1317 			(TODO: The connection string needs work to allow for semicolons in its parts!)
1318 		socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
1319 			unless compiled with `-version=Have_vibe_d_core` (set automatically
1320 			if using $(LINK2 http://code.dlang.org/getting_started, DUB)).
1321 		openSocket = Optional callback which should return a newly-opened Phobos
1322 			or Vibe.d TCP socket. This allows custom sockets to be used,
1323 			subclassed from Phobos's or Vibe.d's sockets.
1324 		host = An IP address in numeric dotted form, or as a host  name.
1325 		user = The user name to authenticate.
1326 		password = User's password.
1327 		db = Desired initial database.
1328 		capFlags = The set of flag bits from the server's capabilities that the client requires
1329 	+/
1330 	//After the connection is created, and the initial invitation is received from the server
1331 	//client preferences can be set, and authentication can then be attempted.
1332 	this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1333 	{
1334 		version(Have_vibe_d_core)
1335 			enum defaultSocketType = MySQLSocketType.vibed;
1336 		else
1337 			enum defaultSocketType = MySQLSocketType.phobos;
1338 
1339 		this(defaultSocketType, host, user, pwd, db, port, capFlags);
1340 	}
1341 
1342 	///ditto
1343 	this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1344 	{
1345 		version(Have_vibe_d_core) {} else
1346 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
1347 
1348 		this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
1349 			host, user, pwd, db, port, capFlags);
1350 	}
1351 
1352 	///ditto
1353 	this(OpenSocketCallbackPhobos openSocket,
1354 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1355 	{
1356 		this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
1357 	}
1358 
1359 	version(Have_vibe_d_core)
1360 	///ditto
1361 	this(OpenSocketCallbackVibeD openSocket,
1362 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1363 	{
1364 		this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
1365 	}
1366 
1367 	///ditto
1368 	private this(MySQLSocketType socketType,
1369 		OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
1370 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1371 	in
1372 	{
1373 		final switch(socketType)
1374 		{
1375 			case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
1376 			case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
1377 		}
1378 	}
1379 	body
1380 	{
1381 		enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
1382 		enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
1383 		version(Have_vibe_d_core) {} else
1384 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
1385 
1386 		_socketType = socketType;
1387 		_host = host;
1388 		_user = user;
1389 		_pwd = pwd;
1390 		_db = db;
1391 		_port = port;
1392 
1393 		_openSocketPhobos = openSocketPhobos;
1394 		_openSocketVibeD  = openSocketVibeD;
1395 
1396 		connect(capFlags);
1397 	}
1398 
1399 	///ditto
1400 	//After the connection is created, and the initial invitation is received from the server
1401 	//client preferences can be set, and authentication can then be attempted.
1402 	this(string cs, SvrCapFlags capFlags = defaultClientFlags)
1403 	{
1404 		string[] a = parseConnectionString(cs);
1405 		this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1406 	}
1407 
1408 	///ditto
1409 	this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
1410 	{
1411 		string[] a = parseConnectionString(cs);
1412 		this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1413 	}
1414 
1415 	///ditto
1416 	this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
1417 	{
1418 		string[] a = parseConnectionString(cs);
1419 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1420 	}
1421 
1422 	version(Have_vibe_d_core)
1423 	///ditto
1424 	this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
1425 	{
1426 		string[] a = parseConnectionString(cs);
1427 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1428 	}
1429 
1430 	/++
1431 	Check whether this `Connection` is still connected to the server, or if
1432 	the connection has been closed.
1433 	+/
1434 	@property bool closed()
1435 	{
1436 		return _open == OpenState.notConnected || !_socket.connected;
1437 	}
1438 
1439 	/++
1440 	Explicitly close the connection.
1441 	
1442 	Idiomatic use as follows is suggested:
1443 	------------------
1444 	{
1445 	    auto con = new Connection("localhost:user:password:mysqld");
1446 	    scope(exit) con.close();
1447 	    // Use the connection
1448 	    ...
1449 	}
1450 	------------------
1451 	+/
1452 	void close()
1453 	{
1454 		// This is a two-stage process. First tell the server we are quitting this
1455 		// connection, and then close the socket.
1456 
1457 		if (_open == OpenState.authenticated && _socket.connected)
1458 			quit();
1459 
1460 		if (_open == OpenState.connected)
1461 			kill();
1462 		resetPacket();
1463 	}
1464 
1465 	/++
1466 	Reconnects to the server using the same connection settings originally
1467 	used to create the `Connection`.
1468 
1469 	Optionally takes a `mysql.protocol.constants.SvrCapFlags`, allowing you to
1470 	reconnect using a different set of server capability flags.
1471 
1472 	Normally, if the connection is already open, this will do nothing. However,
1473 	if you request a different set of `mysql.protocol.constants.SvrCapFlags`
1474 	then was originally used to create the `Connection`, the connection will
1475 	be closed and then reconnected using the new `mysql.protocol.constants.SvrCapFlags`.
1476 	+/
1477 	//TODO: This needs unittested.
1478 	void reconnect()
1479 	{
1480 		reconnect(_clientCapabilities);
1481 	}
1482 
1483 	///ditto
1484 	void reconnect(SvrCapFlags clientCapabilities)
1485 	{
1486 		bool sameCaps = clientCapabilities == _clientCapabilities;
1487 		if(!closed)
1488 		{
1489 			// Same caps as before?
1490 			if(clientCapabilities == _clientCapabilities)
1491 				return; // Nothing to do, just keep current connection
1492 
1493 			close();
1494 		}
1495 
1496 		connect(clientCapabilities);
1497 	}
1498 
1499 	private void quit()
1500 	in
1501 	{
1502 		assert(_open == OpenState.authenticated);
1503 	}
1504 	body
1505 	{
1506 		sendCmd(CommandType.QUIT, []);
1507 		// No response is sent for a quit packet
1508 		_open = OpenState.connected;
1509 	}
1510 
1511 	/++
1512 	Parses a connection string of the form
1513 	`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`
1514 
1515 	Port is optional and defaults to 3306.
1516 
1517 	Whitespace surrounding any name or value is automatically stripped.
1518 
1519 	Returns a five-element array of strings in this order:
1520 	$(UL
1521 	$(LI [0]: host)
1522 	$(LI [1]: user)
1523 	$(LI [2]: pwd)
1524 	$(LI [3]: db)
1525 	$(LI [4]: port)
1526 	)
1527 	
1528 	(TODO: The connection string needs work to allow for semicolons in its parts!)
1529 	+/
1530 	//TODO: Replace the return value with a proper struct.
1531 	static string[] parseConnectionString(string cs)
1532 	{
1533 		string[] rv;
1534 		rv.length = 5;
1535 		rv[4] = "3306"; // Default port
1536 		string[] a = split(cs, ";");
1537 		foreach (s; a)
1538 		{
1539 			string[] a2 = split(s, "=");
1540 			enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs);
1541 			string name = strip(a2[0]);
1542 			string val = strip(a2[1]);
1543 			switch (name)
1544 			{
1545 				case "host":
1546 					rv[0] = val;
1547 					break;
1548 				case "user":
1549 					rv[1] = val;
1550 					break;
1551 				case "pwd":
1552 					rv[2] = val;
1553 					break;
1554 				case "db":
1555 					rv[3] = val;
1556 					break;
1557 				case "port":
1558 					rv[4] = val;
1559 					break;
1560 				default:
1561 					throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
1562 			}
1563 		}
1564 		return rv;
1565 	}
1566 
1567 	/++
1568 	Select a current database.
1569 	
1570 	Throws `mysql.exceptions.MYX` upon failure.
1571 
1572 	Params: dbName = Name of the requested database
1573 	+/
1574 	void selectDB(string dbName)
1575 	{
1576 		sendCmd(CommandType.INIT_DB, dbName);
1577 		getCmdResponse();
1578 		_db = dbName;
1579 	}
1580 
1581 	/++
1582 	Check the server status.
1583 	
1584 	Throws `mysql.exceptions.MYX` upon failure.
1585 
1586 	Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
1587 	+/
1588 	OKErrorPacket pingServer()
1589 	{
1590 		sendCmd(CommandType.PING, []);
1591 		return getCmdResponse();
1592 	}
1593 
1594 	/++
1595 	Refresh some feature(s) of the server.
1596 	
1597 	Throws `mysql.exceptions.MYX` upon failure.
1598 
1599 	Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
1600 	+/
1601 	OKErrorPacket refreshServer(RefreshFlags flags)
1602 	{
1603 		sendCmd(CommandType.REFRESH, [flags]);
1604 		return getCmdResponse();
1605 	}
1606 
1607 	/++
1608 	Flush any outstanding result set elements.
1609 	
1610 	When the server responds to a command that produces a result set, it
1611 	queues the whole set of corresponding packets over the current connection.
1612 	Before that `Connection` can embark on any new command, it must receive
1613 	all of those packets and junk them.
1614 	
1615 	As of v1.1.4, this is done automatically as needed. But you can still
1616 	call this manually to force a purge to occur when you want.
1617 
1618 	See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
1619 	+/
1620 	ulong purgeResult()
1621 	{
1622 		scope(failure) kill();
1623 
1624 		_lastCommandID++;
1625 
1626 		ulong rows = 0;
1627 		if (_headersPending)
1628 		{
1629 			for (size_t i = 0;; i++)
1630 			{
1631 				if (getPacket().isEOFPacket())
1632 				{
1633 					_headersPending = false;
1634 					break;
1635 				}
1636 				enforceEx!MYXProtocol(i < _fieldCount,
1637 					text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
1638 			}
1639 		}
1640 		if (_rowsPending)
1641 		{
1642 			for (;;  rows++)
1643 			{
1644 				if (getPacket().isEOFPacket())
1645 				{
1646 					_rowsPending = _binaryPending = false;
1647 					break;
1648 				}
1649 			}
1650 		}
1651 		resetPacket();
1652 		return rows;
1653 	}
1654 
1655 	/++
1656 	Get a textual report on the server status.
1657 	
1658 	(COM_STATISTICS)
1659 	+/
1660 	string serverStats()
1661 	{
1662 		sendCmd(CommandType.STATISTICS, []);
1663 		return cast(string) getPacket();
1664 	}
1665 
1666 	/++
1667 	Enable multiple statement commands.
1668 	
1669 	This can be used later if this feature was not requested in the client capability flags.
1670 	
1671 	Warning: This functionality is currently untested.
1672 	
1673 	Params: on = Boolean value to turn the capability on or off.
1674 	+/
1675 	//TODO: Need to test this
1676 	void enableMultiStatements(bool on)
1677 	{
1678 		scope(failure) kill();
1679 
1680 		ubyte[] t;
1681 		t.length = 2;
1682 		t[0] = on ? 0 : 1;
1683 		t[1] = 0;
1684 		sendCmd(CommandType.STMT_OPTION, t);
1685 
1686 		// For some reason this command gets an EOF packet as response
1687 		auto packet = getPacket();
1688 		enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1689 	}
1690 
1691 	/// Return the in-force protocol number.
1692 	@property ubyte protocol() pure const nothrow { return _protocol; }
1693 	/// Server version
1694 	@property string serverVersion() pure const nothrow { return _serverVersion; }
1695 	/// Server capability flags
1696 	@property uint serverCapabilities() pure const nothrow { return _sCaps; }
1697 	/// Server status
1698 	@property ushort serverStatus() pure const nothrow { return _serverStatus; }
1699 	/// Current character set
1700 	@property ubyte charSet() pure const nothrow { return _sCharSet; }
1701 	/// Current database
1702 	@property string currentDB() pure const nothrow { return _db; }
1703 	/// Socket type being used, Phobos or Vibe.d
1704 	@property MySQLSocketType socketType() pure const nothrow { return _socketType; }
1705 
1706 	/// After a command that inserted a row into a table with an auto-increment
1707 	/// ID column, this method allows you to retrieve the last insert ID.
1708 	@property ulong lastInsertID() pure const nothrow { return _insertID; }
1709 
1710 	/// This gets incremented every time a command is issued or results are purged,
1711 	/// so a `mysql.result.ResultRange` can tell whether it's been invalidated.
1712 	@property ulong lastCommandID() pure const nothrow { return _lastCommandID; }
1713 
1714 	/// Gets whether rows are pending.
1715 	///
1716 	/// Note, you may want `hasPending` instead.
1717 	@property bool rowsPending() pure const nothrow { return _rowsPending; }
1718 
1719 	/// Gets whether anything (rows, headers or binary) is pending.
1720 	/// New commands cannot be sent on a connection while anything is pending
1721 	/// (the pending data will automatically be purged.)
1722 	@property bool hasPending() pure const nothrow
1723 	{
1724 		return _rowsPending || _headersPending || _binaryPending;
1725 	}
1726 
1727 	/// Gets the result header's field descriptions.
1728 	@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
1729 
1730 	/++
1731 	Manually register a prepared statement on this connection.
1732 	
1733 	Does nothing if statement is already registered on this connection.
1734 	
1735 	Calling this is not strictly necessary, as the prepared statement will
1736 	automatically be registered upon its first use on any `Connection`.
1737 	This is provided for those who prefer eager registration over lazy
1738 	for performance reasons.
1739 	+/
1740 	void register(Prepared prepared)
1741 	{
1742 		register(prepared.sql);
1743 	}
1744 
1745 	///ditto
1746 	void register(string sql)
1747 	{
1748 		registerIfNeeded(sql);
1749 	}
1750 
1751 	/++
1752 	Manually release a prepared statement on this connection.
1753 	
1754 	This method tells the server that it can dispose of the information it
1755 	holds about the current prepared statement.
1756 	
1757 	Calling this is not strictly necessary. The server considers prepared
1758 	statements to be per-connection, so they'll go away when the connection
1759 	closes anyway. This is provided in case direct control is actually needed.
1760 
1761 	If you choose to use a reference counted struct to call this automatically,
1762 	be aware that embedding reference counted structs inside garbage collectible
1763 	heap objects is dangerous and should be avoided, as it can lead to various
1764 	hidden problems, from crashes to race conditions. (See the discussion at issue
1765 	$(LINK2 https://github.com/mysql-d/mysql-native/issues/159, #159)
1766 	for details.) Instead, it may be better to simply avoid trying to manage
1767 	their release at all, as it's not usually necessary. Or to periodically
1768 	release all prepared statements, and simply allow mysql-native to
1769 	automatically re-register them upon their next use.
1770 	
1771 	Notes:
1772 	
1773 	In actuality, the server might not immediately be told to release the
1774 	statement (although `isRegistered` will still report `false`).
1775 	
1776 	This is because there could be a `mysql.result.ResultRange` with results
1777 	still pending for retrieval, and the protocol doesn't allow sending commands
1778 	(such as "release a prepared statement") to the server while data is pending.
1779 	Therefore, this function may instead queue the statement to be released
1780 	when it is safe to do so: Either the next time a result set is purged or
1781 	the next time a command (such as `mysql.commands.query` or
1782 	`mysql.commands.exec`) is performed (because such commands automatically
1783 	purge any pending results).
1784 	
1785 	This function does NOT auto-purge because, if this is ever called from
1786 	automatic resource management cleanup (refcounting, RAII, etc), that
1787 	would create ugly situations where hidden, implicit behavior triggers
1788 	an unexpected auto-purge.
1789 	+/
1790 	void release(Prepared prepared)
1791 	{
1792 		release(prepared.sql);
1793 	}
1794 	
1795 	///ditto
1796 	void release(string sql)
1797 	{
1798 		//TODO: Don't queue it if nothing is pending. Just do it immediately.
1799 		//      But need to be certain both situations are unittested.
1800 		preparedRegistrations.queueForRelease(sql);
1801 	}
1802 	
1803 	/++
1804 	Manually release all prepared statements on this connection.
1805 	
1806 	While minimal, every prepared statement registered on a connection does
1807 	use up a small amount of resources in both mysql-native and on the server.
1808 	Additionally, servers can be configured
1809 	$(LINK2 https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_prepared_stmt_count,
1810 	to limit the number of prepared statements)
1811 	allowed on a connection at one time (the default, however
1812 	is quite high). Note also, that certain overloads of `mysql.commands.exec`,
1813 	`mysql.commands.query`, etc. register prepared statements behind-the-scenes
1814 	which are cached for quick re-use later.
1815 	
1816 	Therefore, it may occasionally be useful to clear out all prepared
1817 	statements on a connection, together with all resources used by them (or
1818 	at least leave the resources ready for garbage-collection). This function
1819 	does just that.
1820 	
1821 	Note that this is ALWAYS COMPLETELY SAFE to call, even if you still have
1822 	live prepared statements you intend to use again. This is safe because
1823 	mysql-native will automatically register or re-register prepared statements
1824 	as-needed.
1825 
1826 	Notes:
1827 	
1828 	In actuality, the prepared statements might not be immediately released
1829 	(although `isRegistered` will still report `false` for them).
1830 	
1831 	This is because there could be a `mysql.result.ResultRange` with results
1832 	still pending for retrieval, and the protocol doesn't allow sending commands
1833 	(such as "release a prepared statement") to the server while data is pending.
1834 	Therefore, this function may instead queue the statement to be released
1835 	when it is safe to do so: Either the next time a result set is purged or
1836 	the next time a command (such as `mysql.commands.query` or
1837 	`mysql.commands.exec`) is performed (because such commands automatically
1838 	purge any pending results).
1839 	
1840 	This function does NOT auto-purge because, if this is ever called from
1841 	automatic resource management cleanup (refcounting, RAII, etc), that
1842 	would create ugly situations where hidden, implicit behavior triggers
1843 	an unexpected auto-purge.
1844 	+/
1845 	void releaseAll()
1846 	{
1847 		preparedRegistrations.queueAllForRelease();
1848 	}
1849 
1850 	@("releaseAll")
1851 	debug(MYSQLN_TESTS)
1852 	unittest
1853 	{
1854 		mixin(scopedCn);
1855 		
1856 		cn.exec("DROP TABLE IF EXISTS `releaseAll`");
1857 		cn.exec("CREATE TABLE `releaseAll` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8");
1858 
1859 		auto preparedSelect = cn.prepare("SELECT * FROM `releaseAll`");
1860 		auto preparedInsert = cn.prepare("INSERT INTO `releaseAll` (a) VALUES (1)");
1861 		assert(cn.isRegistered(preparedSelect));
1862 		assert(cn.isRegistered(preparedInsert));
1863 
1864 		cn.releaseAll();
1865 		assert(!cn.isRegistered(preparedSelect));
1866 		assert(!cn.isRegistered(preparedInsert));
1867 		cn.exec("INSERT INTO `releaseAll` (a) VALUES (1)");
1868 		assert(!cn.isRegistered(preparedSelect));
1869 		assert(!cn.isRegistered(preparedInsert));
1870 
1871 		cn.exec(preparedInsert);
1872 		cn.query(preparedSelect).array;
1873 		assert(cn.isRegistered(preparedSelect));
1874 		assert(cn.isRegistered(preparedInsert));
1875 
1876 	}
1877 
1878 	/// Is the given statement registered on this connection as a prepared statement?
1879 	bool isRegistered(Prepared prepared)
1880 	{
1881 		return isRegistered( prepared.sql );
1882 	}
1883 
1884 	///ditto
1885 	bool isRegistered(string sql)
1886 	{
1887 		return isRegistered( preparedRegistrations[sql] );
1888 	}
1889 
1890 	///ditto
1891 	package bool isRegistered(Nullable!PreparedServerInfo info)
1892 	{
1893 		return !info.isNull && !info.queuedForRelease;
1894 	}
1895 }
1896 
1897 // Test register, release, isRegistered, and auto-register for prepared statements
1898 @("autoRegistration")
1899 debug(MYSQLN_TESTS)
1900 unittest
1901 {
1902 	import mysql.connection;
1903 	import mysql.test.common;
1904 	
1905 	Prepared preparedInsert;
1906 	Prepared preparedSelect;
1907 	immutable insertSQL = "INSERT INTO `autoRegistration` VALUES (1), (2)";
1908 	immutable selectSQL = "SELECT `val` FROM `autoRegistration`";
1909 	int queryTupleResult;
1910 	
1911 	{
1912 		mixin(scopedCn);
1913 		
1914 		// Setup
1915 		cn.exec("DROP TABLE IF EXISTS `autoRegistration`");
1916 		cn.exec("CREATE TABLE `autoRegistration` (
1917 			`val` INTEGER
1918 		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
1919 
1920 		// Initial register
1921 		preparedInsert = cn.prepare(insertSQL);
1922 		preparedSelect = cn.prepare(selectSQL);
1923 		
1924 		// Test basic register, release, isRegistered
1925 		assert(cn.isRegistered(preparedInsert));
1926 		assert(cn.isRegistered(preparedSelect));
1927 		cn.release(preparedInsert);
1928 		cn.release(preparedSelect);
1929 		assert(!cn.isRegistered(preparedInsert));
1930 		assert(!cn.isRegistered(preparedSelect));
1931 		
1932 		// Test manual re-register
1933 		cn.register(preparedInsert);
1934 		cn.register(preparedSelect);
1935 		assert(cn.isRegistered(preparedInsert));
1936 		assert(cn.isRegistered(preparedSelect));
1937 		
1938 		// Test double register
1939 		cn.register(preparedInsert);
1940 		cn.register(preparedSelect);
1941 		assert(cn.isRegistered(preparedInsert));
1942 		assert(cn.isRegistered(preparedSelect));
1943 
1944 		// Test double release
1945 		cn.release(preparedInsert);
1946 		cn.release(preparedSelect);
1947 		assert(!cn.isRegistered(preparedInsert));
1948 		assert(!cn.isRegistered(preparedSelect));
1949 		cn.release(preparedInsert);
1950 		cn.release(preparedSelect);
1951 		assert(!cn.isRegistered(preparedInsert));
1952 		assert(!cn.isRegistered(preparedSelect));
1953 	}
1954 
1955 	// Note that at this point, both prepared statements still exist,
1956 	// but are no longer registered on any connection. In fact, there
1957 	// are no open connections anymore.
1958 	
1959 	// Test auto-register: exec
1960 	{
1961 		mixin(scopedCn);
1962 	
1963 		assert(!cn.isRegistered(preparedInsert));
1964 		cn.exec(preparedInsert);
1965 		assert(cn.isRegistered(preparedInsert));
1966 	}
1967 	
1968 	// Test auto-register: query
1969 	{
1970 		mixin(scopedCn);
1971 	
1972 		assert(!cn.isRegistered(preparedSelect));
1973 		cn.query(preparedSelect).each();
1974 		assert(cn.isRegistered(preparedSelect));
1975 	}
1976 	
1977 	// Test auto-register: queryRow
1978 	{
1979 		mixin(scopedCn);
1980 	
1981 		assert(!cn.isRegistered(preparedSelect));
1982 		cn.queryRow(preparedSelect);
1983 		assert(cn.isRegistered(preparedSelect));
1984 	}
1985 	
1986 	// Test auto-register: queryRowTuple
1987 	{
1988 		mixin(scopedCn);
1989 	
1990 		assert(!cn.isRegistered(preparedSelect));
1991 		cn.queryRowTuple(preparedSelect, queryTupleResult);
1992 		assert(cn.isRegistered(preparedSelect));
1993 	}
1994 	
1995 	// Test auto-register: queryValue
1996 	{
1997 		mixin(scopedCn);
1998 	
1999 		assert(!cn.isRegistered(preparedSelect));
2000 		cn.queryValue(preparedSelect);
2001 		assert(cn.isRegistered(preparedSelect));
2002 	}
2003 }
2004 
2005 // An attempt to reproduce issue #81: Using mysql-native driver with no default database
2006 // I'm unable to actually reproduce the error, though.
2007 @("issue81")
2008 debug(MYSQLN_TESTS)
2009 unittest
2010 {
2011 	import mysql.escape;
2012 	mixin(scopedCn);
2013 	
2014 	cn.exec("DROP TABLE IF EXISTS `issue81`");
2015 	cn.exec("CREATE TABLE `issue81` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8");
2016 	cn.exec("INSERT INTO `issue81` (a) VALUES (1)");
2017 
2018 	auto cn2 = new Connection(text("host=", cn._host, ";port=", cn._port, ";user=", cn._user, ";pwd=", cn._pwd));
2019 	scope(exit) cn2.close();
2020 	
2021 	cn2.query("SELECT * FROM `"~mysqlEscape(cn._db).text~"`.`issue81`");
2022 }
2023 
2024 // Regression test for Issue #154:
2025 // autoPurge can throw an exception if the socket was closed without purging
2026 //
2027 // This simulates a disconnect by closing the socket underneath the Connection
2028 // object itself.
2029 @("dropConnection")
2030 debug(MYSQLN_TESTS)
2031 unittest
2032 {
2033 	mixin(scopedCn);
2034 
2035 	cn.exec("DROP TABLE IF EXISTS `dropConnection`");
2036 	cn.exec("CREATE TABLE `dropConnection` (
2037 		`val` INTEGER
2038 	) ENGINE=InnoDB DEFAULT CHARSET=utf8");
2039 	cn.exec("INSERT INTO `dropConnection` VALUES (1), (2), (3)");
2040 	import mysql.prepared;
2041 	{
2042 		auto prep = cn.prepare("SELECT * FROM `dropConnection`");
2043 		cn.query(prep);
2044 	}
2045 	// close the socket forcibly
2046 	cn._socket.close();
2047 	// this should still work (it should reconnect).
2048 	cn.exec("DROP TABLE `dropConnection`");
2049 }
2050 
2051 /+
2052 Test Prepared's ability to be safely refcount-released during a GC cycle
2053 (ie, `Connection.release` must not allocate GC memory).
2054 
2055 Currently disabled because it's not guaranteed to always work
2056 (and apparently, cannot be made to work?)
2057 For relevant discussion, see issue #159:
2058 https://github.com/mysql-d/mysql-native/issues/159
2059 +/
2060 version(none)
2061 debug(MYSQLN_TESTS)
2062 {
2063 	/// Proof-of-concept ref-counted Prepared wrapper, just for testing,
2064 	/// not really intended for actual use.
2065 	private struct RCPreparedPayload
2066 	{
2067 		Prepared prepared;
2068 		Connection conn; // Connection to be released from
2069 
2070 		alias prepared this;
2071 
2072 		@disable this(this); // not copyable
2073 		~this()
2074 		{
2075 			// There are a couple calls to this dtor where `conn` happens to be null.
2076 			if(conn is null)
2077 				return;
2078 
2079 			assert(conn.isRegistered(prepared));
2080 			conn.release(prepared);
2081 		}
2082 	}
2083 	///ditto
2084 	alias RCPrepared = RefCounted!(RCPreparedPayload, RefCountedAutoInitialize.no);
2085 	///ditto
2086 	private RCPrepared rcPrepare(Connection conn, string sql)
2087 	{
2088 		import std.algorithm.mutation : move;
2089 
2090 		auto prepared = conn.prepare(sql);
2091 		auto payload = RCPreparedPayload(prepared, conn);
2092 		return refCounted(move(payload));
2093 	}
2094 
2095 	@("rcPrepared")
2096 	unittest
2097 	{
2098 		import core.memory;
2099 		mixin(scopedCn);
2100 		
2101 		cn.exec("DROP TABLE IF EXISTS `rcPrepared`");
2102 		cn.exec("CREATE TABLE `rcPrepared` (
2103 			`val` INTEGER
2104 		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
2105 		cn.exec("INSERT INTO `rcPrepared` VALUES (1), (2), (3)");
2106 
2107 		// Define this in outer scope to guarantee data is left pending when
2108 		// RCPrepared's payload is collected. This will guarantee
2109 		// that Connection will need to queue the release.
2110 		ResultRange rows;
2111 
2112 		void bar()
2113 		{
2114 			class Foo { RCPrepared p; }
2115 			auto foo = new Foo();
2116 
2117 			auto rcStmt = cn.rcPrepare("SELECT * FROM `rcPrepared`");
2118 			foo.p = rcStmt;
2119 			rows = cn.query(rcStmt);
2120 
2121 			/+
2122 			At this point, there are two references to the prepared statement:
2123 			One in a `Foo` object (currently bound to `foo`), and one on the stack.
2124 
2125 			Returning from this function will destroy the one on the stack,
2126 			and deterministically reduce the refcount to 1.
2127 
2128 			So, right here we set `foo` to null to *keep* the Foo object's
2129 			reference to the prepared statement, but set adrift the Foo object
2130 			itself, ready to be destroyed (along with the only remaining
2131 			prepared statement reference it contains) by the next GC cycle.
2132 
2133 			Thus, `RCPreparedPayload.~this` and `Connection.release(Prepared)`
2134 			will be executed during a GC cycle...and had better not perform
2135 			any allocations, or else...boom!
2136 			+/
2137 			foo = null;
2138 		}
2139 
2140 		bar();
2141 		assert(cn.hasPending); // Ensure Connection is forced to queue the release.
2142 		GC.collect(); // `Connection.release(Prepared)` better not be allocating, or boom!
2143 	}
2144 }