1 /// Connect to a MySQL/MariaDB server.
2 module mysql.connection;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.digest.sha;
7 import std.exception;
8 import std.range;
9 import std.socket;
10 import std.string;
11 import std.typecons;
12 
13 import mysql.commands;
14 import mysql.exceptions;
15 import mysql.prepared;
16 import mysql.protocol.constants;
17 import mysql.protocol.packets;
18 import mysql.protocol.sockets;
19 import mysql.result;
20 debug(MYSQLN_TESTS)
21 {
22 	import mysql.test.common;
23 }
24 
25 version(Have_vibe_d_core)
26 {
27 	static if(__traits(compiles, (){ import vibe.core.net; } ))
28 		import vibe.core.net;
29 	else
30 		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
31 }
32 
/// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
///
/// This is the bitwise OR of `OLD_LONG_PASSWORD`, `ALL_COLUMN_FLAGS`, `WITH_DB`,
/// `PROTOCOL41` and `SECURE_CONNECTION`. `MULTI_STATEMENTS` and `MULTI_RESULTS`
/// are currently commented out below, so they are NOT part of the default.
immutable SvrCapFlags defaultClientFlags =
		SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
		SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
		SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
		//SvrCapFlags.MULTI_RESULTS;
39 
/++
Submit an SQL command to the server to be compiled into a prepared statement.

The statement is registered on `conn` right away (via `registerIfNeeded`).
The returned `mysql.prepared.Prepared` may then be used freely on ANY
`Connection`: it will automatically be registered upon its first use on
other connections. If you prefer eager registration, pass it to
`Connection.register`.

Internally, a successful registration yields a statement handle - an ID -
for the prepared statement, a count of the parameters required to execute
it, and a count of the columns that will be present in any result set the
command generates.

The server then sends the prepared statement headers, including parameter
descriptions and result set field descriptions, followed by an EOF packet.

Params:
	conn = The connection on which to register the statement.
	sql = The SQL text of the statement.

Returns: A `mysql.prepared.Prepared` built from `sql` plus the headers and
parameter count reported for this registration.

Throws: `mysql.exceptions.MYX` if the server has a problem.
+/
Prepared prepare(Connection conn, string sql)
{
	auto serverInfo = conn.registerIfNeeded(sql);
	return Prepared(sql, serverInfo.headers, serverInfo.numParams);
}
64 
/++
This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.

It prepares `sql` on `conn` using `prepare`, then wraps the result together
with `conn` in a `BackwardCompatPrepared`.

See `BackwardCompatPrepared` for more info.

Params:
	conn = The connection to prepare on, and to embed in the returned wrapper.
	sql = The SQL text of the statement.
+/
deprecated("This is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. You should migrate from this to the Prepared-compatible exec/query overloads in 'mysql.commands'.")
BackwardCompatPrepared prepareBackwardCompat(Connection conn, string sql)
{
	return BackwardCompatPrepared(conn, prepare(conn, sql));
}
75 
/++
Convenience function to create a prepared statement which calls a stored function.

The generated SQL has the form `select NAME(?,?,...)`, with one `?`
placeholder per argument. `name` is concatenated into the SQL text as-is.

Be careful that your `numArgs` is correct. If it isn't, you may get a
`mysql.exceptions.MYX` with a very unclear error message.

Throws: `mysql.exceptions.MYX` if the server has a problem.

Params:
	conn = The connection on which to prepare the statement.
	name = The name of the stored function.
	numArgs = The number of arguments the stored function takes.
+/
Prepared prepareFunction(Connection conn, string name, int numArgs)
{
	return prepare(conn, "select " ~ name ~ preparedPlaceholderArgs(numArgs));
}
93 
///
unittest
{
	debug(MYSQLN_TESTS)
	{
		import mysql.test.common;
		mixin(scopedCn);

		// (Re)create a one-argument stored function to call.
		exec(cn, `DROP FUNCTION IF EXISTS hello`);
		exec(cn, `
			CREATE FUNCTION hello (s CHAR(20))
			RETURNS CHAR(50) DETERMINISTIC
			RETURN CONCAT('Hello ',s,'!')
		`);

		// Prepare "select hello(?)", bind the single argument, and run it.
		auto preparedHello = prepareFunction(cn, "hello", 1);
		preparedHello.setArgs("World");
		auto rs = cn.query(preparedHello).array;
		assert(rs.length == 1);
		assert(rs[0][0] == "Hello World!");
	}
}
116 
/++
Convenience function to create a prepared statement which calls a stored procedure.

The generated SQL has the form `call NAME(?,?,...)`, with one `?`
placeholder per argument. `name` is concatenated into the SQL text as-is.

OUT parameters are currently not supported. It should generally be
possible with MySQL to present them as a result set.

Be careful that your `numArgs` is correct. If it isn't, you may get a
`mysql.exceptions.MYX` with a very unclear error message.

Throws: `mysql.exceptions.MYX` if the server has a problem.

Params:
	conn = The connection on which to prepare the statement.
	name = The name of the stored procedure.
	numArgs = The number of arguments the stored procedure takes.

+/
Prepared prepareProcedure(Connection conn, string name, int numArgs)
{
	return prepare(conn, "call " ~ name ~ preparedPlaceholderArgs(numArgs));
}
138 
///
unittest
{
	debug(MYSQLN_TESTS)
	{
		import mysql.test.common;
		import mysql.test.integration;
		mixin(scopedCn);
		initBaseTestTables(cn);

		// (Re)create a two-argument stored procedure that inserts one row.
		exec(cn, `DROP PROCEDURE IF EXISTS insert2`);
		exec(cn, `
			CREATE PROCEDURE insert2 (IN p1 INT, IN p2 CHAR(50))
			BEGIN
				INSERT INTO basetest (intcol, stringcol) VALUES(p1, p2);
			END
		`);

		// Prepare "call insert2(?,?)", bind both arguments, and execute it.
		auto preparedInsert2 = prepareProcedure(cn, "insert2", 2);
		preparedInsert2.setArgs(2001, "inserted string 1");
		cn.exec(preparedInsert2);

		// The row inserted by the procedure must be visible afterwards.
		auto rs = query(cn, "SELECT stringcol FROM basetest WHERE intcol=2001").array;
		assert(rs.length == 1);
		assert(rs[0][0] == "inserted string 1");
	}
}
166 
/// Build a parenthesized, comma-separated list of `numArgs` `?` placeholders,
/// e.g. `(?,?,?)` for 3. A count of zero (or less) yields `()`.
private string preparedPlaceholderArgs(int numArgs)
{
	string placeholders;
	foreach(n; 0..numArgs)
	{
		// Every placeholder after the first is preceded by a comma.
		placeholders ~= placeholders.length ? ",?" : "?";
	}

	return "(" ~ placeholders ~ ")";
}
185 
debug(MYSQLN_TESTS)
unittest
{
	// Expected placeholder list, indexed by the argument count.
	immutable string[] expected = ["()", "(?)", "(?,?)", "(?,?,?)"];
	foreach(count, text; expected)
		assert(preparedPlaceholderArgs(cast(int) count) == text);
}
194 
/// Per-connection info from the server about a registered prepared statement.
package struct PreparedServerInfo
{
	/// Server's identifier for this prepared statement.
	/// Apparently, this is never 0 if it's been registered,
	/// although mysql-native no longer relies on that.
	uint statementId;

	/// NOTE(review): presumably the warning count the server reported when
	/// the statement was prepared - confirm against the code that fills it in.
	ushort psWarnings;

	/// Number of parameters this statement takes.
	/// 
	/// This will be the same on all connections, but it's returned
	/// by the server upon registration, so it's stored here.
	ushort numParams;

	/// Prepared statement headers
	///
	/// This will be the same on all connections, but it's returned
	/// by the server upon registration, so it's stored here.
	PreparedStmtHeaders headers;
	
	/// Not actually from the server. Connection uses this to keep track
	/// of statements that should be treated as having been released.
	bool queuedForRelease = false;
}
221 
/++
This is a wrapper over `mysql.prepared.Prepared`, provided ONLY as a
temporary aid in upgrading to mysql-native v2.0.0 and its
new connection-independent model of prepared statements. See the
$(LINK2 https://github.com/mysql-d/mysql-native/blob/master/MIGRATING_TO_V2.md, migration guide)
for more info.

In most cases, this layer shouldn't even be needed. But if you have many
lines of code making calls to exec/query the same prepared statement,
then this may be helpful.

To use this temporary compatibility layer, change instances of:

---
auto stmt = conn.prepare(...);
---

to this:

---
auto stmt = conn.prepareBackwardCompat(...);
---

And then your prepared statement should work as before.

BUT DO NOT LEAVE IT LIKE THIS! Ultimately, you should update
your prepared statement code to the mysql-native v2.0.0 API, by changing
instances of:

---
stmt.exec()
stmt.query()
stmt.queryRow()
stmt.queryRowTuple(outputArgs...)
stmt.queryValue()
---

to this:

---
conn.exec(stmt)
conn.query(stmt)
conn.queryRow(stmt)
conn.queryRowTuple(stmt, outputArgs...)
conn.queryValue(stmt)
---

Both of the above syntaxes can be used with a `BackwardCompatPrepared`
(the `Connection` passed directly to `mysql.commands.exec`/`mysql.commands.query`
will override the one associated with your `BackwardCompatPrepared`).

Once all of your code is updated, you can change `prepareBackwardCompat`
back to `prepare` again, and your upgrade will be complete.
+/
struct BackwardCompatPrepared
{
	import std.variant;
	
	// The connection that the deprecated member functions below run on.
	private Connection _conn;
	// The wrapped prepared statement; also exposed through `alias this`.
	Prepared _prepared;

	/// Access underlying `Prepared`
	@property Prepared prepared() { return _prepared; }

	// Lets a BackwardCompatPrepared be used wherever a `Prepared` is expected.
	alias _prepared this;

	/++
	This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.
	
	It forwards to the module-level function of the same name, passing the
	embedded connection and the wrapped `Prepared`.
	
	See `BackwardCompatPrepared` for more info.
	+/
	deprecated("Change 'preparedStmt.exec()' to 'conn.exec(preparedStmt)'")
	ulong exec()
	{
		return .exec(_conn, _prepared);
	}

	///ditto
	deprecated("Change 'preparedStmt.query()' to 'conn.query(preparedStmt)'")
	ResultRange query(ColumnSpecialization[] csa = null)
	{
		return .query(_conn, _prepared, csa);
	}

	///ditto
	deprecated("Change 'preparedStmt.queryRow()' to 'conn.queryRow(preparedStmt)'")
	Nullable!Row queryRow(ColumnSpecialization[] csa = null)
	{
		return .queryRow(_conn, _prepared, csa);
	}

	///ditto
	deprecated("Change 'preparedStmt.queryRowTuple(outArgs...)' to 'conn.queryRowTuple(preparedStmt, outArgs...)'")
	void queryRowTuple(T...)(ref T args) if(T.length == 0 || !is(T[0] : Connection))
	{
		// The constraint rejects calls whose first argument is a Connection.
		return .queryRowTuple(_conn, _prepared, args);
	}

	///ditto
	deprecated("Change 'preparedStmt.queryValue()' to 'conn.queryValue(preparedStmt)'")
	Nullable!Variant queryValue(ColumnSpecialization[] csa = null)
	{
		return .queryValue(_conn, _prepared, csa);
	}
}
327 
//TODO: All low-level comms should be moved into the mysql.protocol package.
329 /// Low-level comms code relating to prepared statements.
330 package struct ProtocolPrepared
331 {
332 	import std.conv;
333 	import std.datetime;
334 	import std.variant;
335 	import mysql.types;
336 	
337 	static ubyte[] makeBitmap(in Variant[] inParams)
338 	{
339 		size_t bml = (inParams.length+7)/8;
340 		ubyte[] bma;
341 		bma.length = bml;
342 		foreach (i; 0..inParams.length)
343 		{
344 			if(inParams[i].type != typeid(typeof(null)))
345 				continue;
346 			size_t bn = i/8;
347 			size_t bb = i%8;
348 			ubyte sr = 1;
349 			sr <<= bb;
350 			bma[bn] |= sr;
351 		}
352 		return bma;
353 	}
354 
355 	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
356 	{
357 		ubyte[] prefix;
358 		prefix.length = 14;
359 
360 		prefix[4] = CommandType.STMT_EXECUTE;
361 		hStmt.packInto(prefix[5..9]);
362 		prefix[9] = flags;   // flags, no cursor
363 		prefix[10] = 1; // iteration count - currently always 1
364 		prefix[11] = 0;
365 		prefix[12] = 0;
366 		prefix[13] = 0;
367 
368 		return prefix;
369 	}
370 
371 	//TODO: All low-level commms should be moved into the mysql.protocol package.
372 	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
373 		out ubyte[] vals, out bool longData)
374 	{
375 		size_t pc = inParams.length;
376 		ubyte[] types;
377 		types.length = pc*2;
378 		size_t alloc = pc*20;
379 		vals.length = alloc;
380 		uint vcl = 0, len;
381 		int ct = 0;
382 
383 		void reAlloc(size_t n)
384 		{
385 			if (vcl+n < alloc)
386 				return;
387 			size_t inc = (alloc*3)/2;
388 			if (inc <  n)
389 				inc = n;
390 			alloc += inc;
391 			vals.length = alloc;
392 		}
393 
394 		foreach (size_t i; 0..pc)
395 		{
396 			enum UNSIGNED  = 0x80;
397 			enum SIGNED    = 0;
398 			if (psa[i].chunkSize)
399 				longData= true;
400 			if (inParams[i].type == typeid(typeof(null)))
401 			{
402 				types[ct++] = SQLType.NULL;
403 				types[ct++] = SIGNED;
404 				continue;
405 			}
406 			Variant v = inParams[i];
407 			SQLType ext = psa[i].type;
408 			string ts = v.type.toString();
409 			bool isRef;
410 			if (ts[$-1] == '*')
411 			{
412 				ts.length = ts.length-1;
413 				isRef= true;
414 			}
415 
416 			switch (ts)
417 			{
418 				case "bool":
419 					if (ext == SQLType.INFER_FROM_D_TYPE)
420 						types[ct++] = SQLType.BIT;
421 					else
422 						types[ct++] = cast(ubyte) ext;
423 					types[ct++] = SIGNED;
424 					reAlloc(2);
425 					bool bv = isRef? *(v.get!(bool*)): v.get!(bool);
426 					vals[vcl++] = 1;
427 					vals[vcl++] = bv? 0x31: 0x30;
428 					break;
429 				case "byte":
430 					types[ct++] = SQLType.TINY;
431 					types[ct++] = SIGNED;
432 					reAlloc(1);
433 					vals[vcl++] = isRef? *(v.get!(byte*)): v.get!(byte);
434 					break;
435 				case "ubyte":
436 					types[ct++] = SQLType.TINY;
437 					types[ct++] = UNSIGNED;
438 					reAlloc(1);
439 					vals[vcl++] = isRef? *(v.get!(ubyte*)): v.get!(ubyte);
440 					break;
441 				case "short":
442 					types[ct++] = SQLType.SHORT;
443 					types[ct++] = SIGNED;
444 					reAlloc(2);
445 					short si = isRef? *(v.get!(short*)): v.get!(short);
446 					vals[vcl++] = cast(ubyte) (si & 0xff);
447 					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
448 					break;
449 				case "ushort":
450 					types[ct++] = SQLType.SHORT;
451 					types[ct++] = UNSIGNED;
452 					reAlloc(2);
453 					ushort us = isRef? *(v.get!(ushort*)): v.get!(ushort);
454 					vals[vcl++] = cast(ubyte) (us & 0xff);
455 					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
456 					break;
457 				case "int":
458 					types[ct++] = SQLType.INT;
459 					types[ct++] = SIGNED;
460 					reAlloc(4);
461 					int ii = isRef? *(v.get!(int*)): v.get!(int);
462 					vals[vcl++] = cast(ubyte) (ii & 0xff);
463 					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
464 					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
465 					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
466 					break;
467 				case "uint":
468 					types[ct++] = SQLType.INT;
469 					types[ct++] = UNSIGNED;
470 					reAlloc(4);
471 					uint ui = isRef? *(v.get!(uint*)): v.get!(uint);
472 					vals[vcl++] = cast(ubyte) (ui & 0xff);
473 					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
474 					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
475 					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
476 					break;
477 				case "long":
478 					types[ct++] = SQLType.LONGLONG;
479 					types[ct++] = SIGNED;
480 					reAlloc(8);
481 					long li = isRef? *(v.get!(long*)): v.get!(long);
482 					vals[vcl++] = cast(ubyte) (li & 0xff);
483 					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
484 					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
485 					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
486 					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
487 					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
488 					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
489 					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
490 					break;
491 				case "ulong":
492 					types[ct++] = SQLType.LONGLONG;
493 					types[ct++] = UNSIGNED;
494 					reAlloc(8);
495 					ulong ul = isRef? *(v.get!(ulong*)): v.get!(ulong);
496 					vals[vcl++] = cast(ubyte) (ul & 0xff);
497 					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
498 					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
499 					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
500 					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
501 					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
502 					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
503 					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
504 					break;
505 				case "float":
506 					types[ct++] = SQLType.FLOAT;
507 					types[ct++] = SIGNED;
508 					reAlloc(4);
509 					float f = isRef? *(v.get!(float*)): v.get!(float);
510 					ubyte* ubp = cast(ubyte*) &f;
511 					vals[vcl++] = *ubp++;
512 					vals[vcl++] = *ubp++;
513 					vals[vcl++] = *ubp++;
514 					vals[vcl++] = *ubp;
515 					break;
516 				case "double":
517 					types[ct++] = SQLType.DOUBLE;
518 					types[ct++] = SIGNED;
519 					reAlloc(8);
520 					double d = isRef? *(v.get!(double*)): v.get!(double);
521 					ubyte* ubp = cast(ubyte*) &d;
522 					vals[vcl++] = *ubp++;
523 					vals[vcl++] = *ubp++;
524 					vals[vcl++] = *ubp++;
525 					vals[vcl++] = *ubp++;
526 					vals[vcl++] = *ubp++;
527 					vals[vcl++] = *ubp++;
528 					vals[vcl++] = *ubp++;
529 					vals[vcl++] = *ubp;
530 					break;
531 				case "std.datetime.date.Date":
532 				case "std.datetime.Date":
533 					types[ct++] = SQLType.DATE;
534 					types[ct++] = SIGNED;
535 					Date date = isRef? *(v.get!(Date*)): v.get!(Date);
536 					ubyte[] da = pack(date);
537 					size_t l = da.length;
538 					reAlloc(l);
539 					vals[vcl..vcl+l] = da[];
540 					vcl += l;
541 					break;
542 				case "std.datetime.TimeOfDay":
543 				case "std.datetime.Time":
544 					types[ct++] = SQLType.TIME;
545 					types[ct++] = SIGNED;
546 					TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay);
547 					ubyte[] ta = pack(time);
548 					size_t l = ta.length;
549 					reAlloc(l);
550 					vals[vcl..vcl+l] = ta[];
551 					vcl += l;
552 					break;
553 				case "std.datetime.date.DateTime":
554 				case "std.datetime.DateTime":
555 					types[ct++] = SQLType.DATETIME;
556 					types[ct++] = SIGNED;
557 					DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime);
558 					ubyte[] da = pack(dt);
559 					size_t l = da.length;
560 					reAlloc(l);
561 					vals[vcl..vcl+l] = da[];
562 					vcl += l;
563 					break;
564 				case "mysql.types.Timestamp":
565 					types[ct++] = SQLType.TIMESTAMP;
566 					types[ct++] = SIGNED;
567 					Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp);
568 					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
569 					ubyte[] da = pack(dt);
570 					size_t l = da.length;
571 					reAlloc(l);
572 					vals[vcl..vcl+l] = da[];
573 					vcl += l;
574 					break;
575 				case "immutable(char)[]":
576 					if (ext == SQLType.INFER_FROM_D_TYPE)
577 						types[ct++] = SQLType.VARCHAR;
578 					else
579 						types[ct++] = cast(ubyte) ext;
580 					types[ct++] = SIGNED;
581 					string s = isRef? *(v.get!(string*)): v.get!(string);
582 					ubyte[] packed = packLCS(cast(void[]) s);
583 					reAlloc(packed.length);
584 					vals[vcl..vcl+packed.length] = packed[];
585 					vcl += packed.length;
586 					break;
587 				case "char[]":
588 					if (ext == SQLType.INFER_FROM_D_TYPE)
589 						types[ct++] = SQLType.VARCHAR;
590 					else
591 						types[ct++] = cast(ubyte) ext;
592 					types[ct++] = SIGNED;
593 					char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]);
594 					ubyte[] packed = packLCS(cast(void[]) ca);
595 					reAlloc(packed.length);
596 					vals[vcl..vcl+packed.length] = packed[];
597 					vcl += packed.length;
598 					break;
599 				case "byte[]":
600 					if (ext == SQLType.INFER_FROM_D_TYPE)
601 						types[ct++] = SQLType.TINYBLOB;
602 					else
603 						types[ct++] = cast(ubyte) ext;
604 					types[ct++] = SIGNED;
605 					byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]);
606 					ubyte[] packed = packLCS(cast(void[]) ba);
607 					reAlloc(packed.length);
608 					vals[vcl..vcl+packed.length] = packed[];
609 					vcl += packed.length;
610 					break;
611 				case "ubyte[]":
612 					if (ext == SQLType.INFER_FROM_D_TYPE)
613 						types[ct++] = SQLType.TINYBLOB;
614 					else
615 						types[ct++] = cast(ubyte) ext;
616 					types[ct++] = SIGNED;
617 					ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]);
618 					ubyte[] packed = packLCS(cast(void[]) uba);
619 					reAlloc(packed.length);
620 					vals[vcl..vcl+packed.length] = packed[];
621 					vcl += packed.length;
622 					break;
623 				case "void":
624 					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
625 				default:
626 					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
627 			}
628 		}
629 		vals.length = vcl;
630 		return types;
631 	}
632 
633 	static void sendLongData(Connection conn, uint hStmt, ParameterSpecialization[] psa)
634 	{
635 		assert(psa.length <= ushort.max); // parameter number is sent as short
636 		foreach (ushort i, PSN psn; psa)
637 		{
638 			if (!psn.chunkSize) continue;
639 			uint cs = psn.chunkSize;
640 			uint delegate(ubyte[]) dg = psn.chunkDelegate;
641 
642 			//TODO: All low-level commms should be moved into the mysql.protocol package.
643 			ubyte[] chunk;
644 			chunk.length = cs+11;
645 			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
646 			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
647 			hStmt.packInto(chunk[5..9]); // statement handle
648 			packInto(i, chunk[9..11]); // parameter number
649 
650 			// byte 11 on is payload
651 			for (;;)
652 			{
653 				uint sent = dg(chunk[11..cs+11]);
654 				if (sent < cs)
655 				{
656 					if (sent == 0)    // data was exact multiple of chunk size - all sent
657 						break;
658 					sent += 7;        // adjust for non-payload bytes
659 					chunk.length = chunk.length - (cs-sent);     // trim the chunk
660 					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
661 					conn.send(chunk);
662 					break;
663 				}
664 				conn.send(chunk);
665 			}
666 		}
667 	}
668 
669 	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
670 		Variant[] inParams, ParameterSpecialization[] psa)
671 	{
672 		conn.autoPurge();
673 		
674 		//TODO: All low-level commms should be moved into the mysql.protocol package.
675 		ubyte[] packet;
676 		conn.resetPacket();
677 
678 		ubyte[] prefix = makePSPrefix(hStmt, 0);
679 		size_t len = prefix.length;
680 		bool longData;
681 
682 		if (psh.paramCount)
683 		{
684 			ubyte[] one = [ 1 ];
685 			ubyte[] vals;
686 			ubyte[] types = analyseParams(inParams, psa, vals, longData);
687 			ubyte[] nbm = makeBitmap(inParams);
688 			packet = prefix ~ nbm ~ one ~ types ~ vals;
689 		}
690 		else
691 			packet = prefix;
692 
693 		if (longData)
694 			sendLongData(conn, hStmt, psa);
695 
696 		assert(packet.length <= uint.max);
697 		packet.setPacketHeader(conn.pktNumber);
698 		conn.bumpPacket();
699 		conn.send(packet);
700 	}
701 }
702 
703 /++
704 A class representing a database connection.
705 
706 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
707 creating a new Connection directly. That will provide certain benefits,
708 such as reusing old connections and automatic cleanup (no need to close
709 the connection when done).
710 
711 ------------------
712 // Suggested usage:
713 
714 {
715 	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
716 	scope(exit) con.close();
717 
718 	// Use the connection
719 	...
720 }
721 ------------------
722 +/
//TODO: All low-level comms should be moved into the mysql.protocol package.
724 class Connection
725 {
726 /+
727 The Connection is responsible for handshaking with the server to establish
728 authentication. It then passes client preferences to the server, and
729 subsequently is the channel for all command packets that are sent, and all
730 response packets received.
731 
732 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
733 byte as a packet number. Connection deals with the headers and ensures that
734 packet numbers are sequential.
735 
The initial packet is sent by the server - essentially a 'hello' packet
inviting login. That packet has a sequence number of zero. That sequence
number is then incremented by client and server packets through the handshake
sequence.
740 
741 After login all further sequences are initialized by the client sending a
742 command packet with a zero sequence number, to which the server replies with
743 zero or more packets with sequential sequence numbers.
744 +/
745 package:
	/// How far this connection has progressed in opening a session.
	enum OpenState
	{
		/// We have not yet connected to the server, or have sent QUIT to the
		/// server and closed the connection
		notConnected,
		/// We have connected to the server and parsed the greeting, but not
		/// yet authenticated
		connected,
		/// We have successfully authenticated against the server, and need to
		/// send QUIT to the server when closing the connection
		authenticated
	}
	// Current open state, and the socket used to talk to the server.
	OpenState   _open;
	MySQLSocket _socket;

	// Server and client capability flags, plus the values read from the
	// server greeting (see parseGreeting and consumeServerInfo).
	SvrCapFlags _sCaps, _cCaps;
	uint    _sThread;
	ushort  _serverStatus;
	ubyte   _sCharSet, _protocol;
	string  _serverVersion;

	// Connection settings.
	string _host, _user, _pwd, _db;
	ushort _port;

	MySQLSocketType _socketType;

	// Callbacks for opening the underlying socket, one per socket type.
	OpenSocketCallbackPhobos _openSocketPhobos;
	OpenSocketCallbackVibeD  _openSocketVibeD;

	ulong _insertID;

	// This gets incremented every time a command is issued or results are purged,
	// so a ResultRange can tell whether it's been invalidated.
	ulong _lastCommandID;

	// Whether there are rows, headers or binary data waiting to be retrieved.
	// MySQL protocol doesn't permit performing any other action until all
	// such data is read.
	bool _rowsPending, _headersPending, _binaryPending;

	// Field count of last performed command.
	//TODO: Does Connection need to store this?
	ushort _fieldCount;

	// ResultSetHeaders of last performed command.
	//TODO: Does Connection need to store this? Is this even used?
	ResultSetHeaders _rsh;

	// This tiny thing here is pretty critical. Pay great attention to its maintenance, otherwise
	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
	// the reason why most other objects require a connection object for their construction.
	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
				/// ordering. First packet should have 0
	// Read, advance and reset the packet number. Being a ubyte, it wraps
	// around after 255.
	@property ubyte pktNumber()   { return _cpn; }
	void bumpPacket()       { _cpn++; }
	void resetPacket()      { _cpn = 0; }
802 
	// Only compiled when Vibe.d is NOT available: in such builds the
	// connection must never be configured to use a Vibe.d socket.
	version(Have_vibe_d_core) {} else
	pure const nothrow invariant()
	{
		assert(_socketType != MySQLSocketType.vibed);
	}
808 
809 	ubyte[] getPacket()
810 	{
811 		scope(failure) kill();
812 
813 		ubyte[4] header;
814 		_socket.read(header);
815 		// number of bytes always set as 24-bit
816 		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
817 		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
818 		bumpPacket();
819 
820 		ubyte[] packet = new ubyte[numDataBytes];
821 		_socket.read(packet);
822 		assert(packet.length == numDataBytes, "Wrong number of bytes read");
823 		return packet;
824 	}
825 
	/// Write a complete packet (header included) to the socket.
	///
	/// The packet must be longer than the 4-byte header, i.e. carry at
	/// least one byte of data.
	void send(const(ubyte)[] packet)
	in
	{
		assert(packet.length > 4); // at least 1 byte more than header
	}
	body
	{
		_socket.write(packet);
	}
835 
	/// Write a packet given as separate header and data parts.
	///
	/// `header` must be either the bare 4-byte packet header, or 5 bytes
	/// when the command type byte is included. `data` is written after the
	/// header, and skipped entirely when empty.
	void send(const(ubyte)[] header, const(ubyte)[] data)
	in
	{
		assert(header.length == 4 || header.length == 5/*command type included*/);
	}
	body
	{
		_socket.write(header);
		if(data.length)
			_socket.write(data);
	}
847 
	/// Send a command packet consisting of the command byte `cmd` followed
	/// by `data`.
	///
	/// Pending results are purged first and `_lastCommandID` is incremented.
	/// If the socket is no longer connected, the connection is re-opened
	/// before sending - except for QUIT, which is then silently dropped.
	/// The packet is always sent with packet number 0. The connection is
	/// killed if sending throws.
	///
	/// Params:
	///   cmd = The command type; internal and deprecated command types are
	///         rejected by the in contract.
	///   data = The command's payload, reinterpreted as raw bytes.
	void sendCmd(T)(CommandType cmd, const(T)[] data)
	in
	{
		// Internal thread states. Clients shouldn't use this
		assert(cmd != CommandType.SLEEP);
		assert(cmd != CommandType.CONNECT);
		assert(cmd != CommandType.TIME);
		assert(cmd != CommandType.DELAYED_INSERT);
		assert(cmd != CommandType.CONNECT_OUT);

		// Deprecated
		assert(cmd != CommandType.CREATE_DB);
		assert(cmd != CommandType.DROP_DB);
		assert(cmd != CommandType.TABLE_DUMP);

		// cannot send more than uint.max bytes. TODO: better error message if we try?
		assert(data.length <= uint.max);
	}
	out
	{
		// at this point we should have sent a command
		assert(pktNumber == 1);
	}
	body
	{
		autoPurge();

		scope(failure) kill();

		_lastCommandID++;

		if(!_socket.connected)
		{
			// NOTE(review): this early return happens before resetPacket()
			// and bumpPacket(), yet the out contract asserts pktNumber == 1
			// - confirm the contract cannot trip on this path.
			if(cmd == CommandType.QUIT)
				return; // Don't bother reopening connection just to quit

			_open = OpenState.notConnected;
			connect(_clientCapabilities);
		}

		// Every command starts a new packet sequence at number 0.
		resetPacket();

		ubyte[] header;
		header.length = 4 /*header*/ + 1 /*cmd*/;
		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
		header[4] = cmd;
		bumpPacket();

		send(header, cast(const(ubyte)[])data);
	}
898 
899 	OKErrorPacket getCmdResponse(bool asString = false)
900 	{
901 		auto okp = OKErrorPacket(getPacket());
902 		enforcePacketOK(okp);
903 		_serverStatus = okp.serverStatus;
904 		return okp;
905 	}
906 
907 	/// Get the next `mysql.result.Row` of a pending result set.
908 	Row getNextRow()
909 	{
910 		scope(failure) kill();
911 
912 		if (_headersPending)
913 		{
914 			_rsh = ResultSetHeaders(this, _fieldCount);
915 			_headersPending = false;
916 		}
917 		ubyte[] packet;
918 		Row rr;
919 		packet = getPacket();
920 		if (packet.isEOFPacket())
921 		{
922 			_rowsPending = _binaryPending = false;
923 			return rr;
924 		}
925 		if (_binaryPending)
926 			rr = Row(this, packet, _rsh, true);
927 		else
928 			rr = Row(this, packet, _rsh, false);
929 		//rr._valid = true;
930 		return rr;
931 	}
932 
933 	ubyte[] buildAuthPacket(ubyte[] token)
934 	in
935 	{
936 		assert(token.length == 20);
937 	}
938 	body
939 	{
940 		ubyte[] packet;
941 		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
942 		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
943 
944 		// NOTE: we'll set the header last when we know the size
945 
946 		// Set the default capabilities required by the client
947 		_cCaps.packInto(packet[4..8]);
948 
949 		// Request a conventional maximum packet length.
950 		1.packInto(packet[8..12]);
951 
952 		packet ~= 33; // Set UTF-8 as default charSet
953 
954 		// There's a statutory block of zero bytes here - fill them in.
955 		foreach(i; 0 .. 23)
956 			packet ~= 0;
957 
958 		// Add the user name as a null terminated string
959 		foreach(i; 0 .. _user.length)
960 			packet ~= _user[i];
961 		packet ~= 0; // \0
962 
963 		// Add our calculated authentication token as a length prefixed string.
964 		assert(token.length <= ubyte.max);
965 		if(_pwd.length == 0)  // Omit the token if the account has no password
966 			packet ~= 0;
967 		else
968 		{
969 			packet ~= cast(ubyte)token.length;
970 			foreach(i; 0 .. token.length)
971 				packet ~= token[i];
972 		}
973 
974 		// Add the default database as a null terminated string
975 		foreach(i; 0 .. _db.length)
976 			packet ~= _db[i];
977 		packet ~= 0; // \0
978 
979 		// The server sent us a greeting with packet number 0, so we send the auth packet
980 		// back with the next number.
981 		packet.setPacketHeader(pktNumber);
982 		bumpPacket();
983 		return packet;
984 	}
985 
	/// Consume the server capability, charset and status fields from the
	/// greeting packet, storing them in `_sCaps`, `_sCharSet` and
	/// `_serverStatus`.
	///
	/// `packet` is advanced past the consumed bytes. The connection is
	/// killed if anything here throws.
	///
	/// Throws: `MYX` if the server lacks the `PROTOCOL41` or
	/// `SECURE_CONNECTION` capability.
	void consumeServerInfo(ref ubyte[] packet)
	{
		scope(failure) kill();

		_sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
		_sCharSet = packet.consume!ubyte(); // server_language
		_serverStatus = packet.consume!ushort(); //server_status
		_sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
		_sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

		enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
		enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
	}
999 
1000 	ubyte[] parseGreeting()
1001 	{
1002 		scope(failure) kill();
1003 
1004 		ubyte[] packet = getPacket();
1005 
1006 		if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
1007 		{
1008 			auto okp = OKErrorPacket(packet);
1009 			enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
1010 		}
1011 
1012 		_protocol = packet.consume!ubyte();
1013 
1014 		_serverVersion = packet.consume!string(packet.countUntil(0));
1015 		packet.skip(1); // \0 terminated _serverVersion
1016 
1017 		_sThread = packet.consume!uint();
1018 
1019 		// read first part of scramble buf
1020 		ubyte[] authBuf;
1021 		authBuf.length = 255;
1022 		authBuf[0..8] = packet.consume(8)[]; // scramble_buff
1023 
1024 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
1025 
1026 		consumeServerInfo(packet);
1027 
1028 		packet.skip(1); // this byte supposed to be scramble length, but is actually zero
1029 		packet.skip(10); // filler of \0
1030 
1031 		// rest of the scramble
1032 		auto len = packet.countUntil(0);
1033 		enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
1034 		enforce(authBuf.length > 8+len);
1035 		authBuf[8..8+len] = packet.consume(len)[];
1036 		authBuf.length = 8+len; // cut to correct size
1037 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
1038 
1039 		return authBuf;
1040 	}
1041 
1042 	static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
1043 	{
1044 		auto s = new PlainPhobosSocket();
1045 		s.connect(new InternetAddress(host, port));
1046 		return s;
1047 	}
1048 
	/// Default opener for Vibe.d sockets: connects to `host`:`port` via
	/// `vibe.core.net.connectTCP`.
	///
	/// Only usable when compiled with `-version=Have_vibe_d_core`; without
	/// it, calling this function hits `assert(0)`.
	static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
	{
		version(Have_vibe_d_core)
			return vibe.core.net.connectTCP(host, port);
		else
			assert(0);
	}
1056 
	/// Resets the packet counter and opens the underlying socket, using the
	/// opener callback that matches `_socketType`, storing the wrapped
	/// result in `_socket`.
	void initConnection()
	{
		resetPacket();
		final switch(_socketType)
		{
			case MySQLSocketType.phobos:
				_socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
				break;

			case MySQLSocketType.vibed:
				// Vibe.d sockets only exist when built with Have_vibe_d_core;
				// otherwise reaching this case is a programming error.
				version(Have_vibe_d_core) {
					_socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
					break;
				} else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
		}
	}
1073 
1074 	ubyte[] makeToken(ubyte[] authBuf)
1075 	{
1076 		auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
1077 		auto pass2 = sha1Of(pass1);
1078 
1079 		SHA1 sha1;
1080 		sha1.start();
1081 		sha1.put(authBuf);
1082 		sha1.put(pass2);
1083 		auto result = sha1.finish();
1084 		foreach (size_t i; 0..20)
1085 			result[i] = result[i] ^ pass1[i];
1086 		return result.dup;
1087 	}
1088 
1089 	SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
1090 	{
1091 		SvrCapFlags common;
1092 		uint filter = 1;
1093 		foreach (size_t i; 0..uint.sizeof)
1094 		{
1095 			bool serverSupport = (server & filter) != 0; // can the server do this capability?
1096 			bool clientSupport = (client & filter) != 0; // can we support it?
1097 			if(serverSupport && clientSupport)
1098 				common |= filter;
1099 			filter <<= 1; // check next flag
1100 		}
1101 		return common;
1102 	}
1103 
1104 	void setClientFlags(SvrCapFlags capFlags)
1105 	{
1106 		_cCaps = getCommonCapabilities(_sCaps, capFlags);
1107 
1108 		// We cannot operate in <4.1 protocol, so we'll force it even if the user
1109 		// didn't supply it
1110 		_cCaps |= SvrCapFlags.PROTOCOL41;
1111 		_cCaps |= SvrCapFlags.SECURE_CONNECTION;
1112 	}
1113 
1114 	void authenticate(ubyte[] greeting)
1115 	in
1116 	{
1117 		assert(_open == OpenState.connected);
1118 	}
1119 	out
1120 	{
1121 		assert(_open == OpenState.authenticated);
1122 	}
1123 	body
1124 	{
1125 		auto token = makeToken(greeting);
1126 		auto authPacket = buildAuthPacket(token);
1127 		send(authPacket);
1128 
1129 		auto packet = getPacket();
1130 		auto okp = OKErrorPacket(packet);
1131 		enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
1132 		_open = OpenState.authenticated;
1133 	}
1134 
1135 	SvrCapFlags _clientCapabilities;
1136 
1137 	void connect(SvrCapFlags clientCapabilities)
1138 	in
1139 	{
1140 		assert(closed);
1141 	}
1142 	out
1143 	{
1144 		assert(_open == OpenState.authenticated);
1145 	}
1146 	body
1147 	{
1148 		initConnection();
1149 		auto greeting = parseGreeting();
1150 		_open = OpenState.connected;
1151 
1152 		_clientCapabilities = clientCapabilities;
1153 		setClientFlags(clientCapabilities);
1154 		authenticate(greeting);
1155 	}
1156 	
1157 	/// Forcefully close the socket without sending the quit command.
1158 	/// Needed in case an error leaves communatations in an undefined or non-recoverable state.
1159 	void kill()
1160 	{
1161 		if(_socket.connected)
1162 			_socket.close();
1163 		_open = OpenState.notConnected;
1164 		// any pending data is gone. Any statements to release will be released
1165 		// on the server automatically.
1166 		_headersPending = _rowsPending = _binaryPending = false;
1167 		
1168 		static if(__traits(compiles, (){ int[int] aa; aa.clear(); }))
1169 			preparedLookup.clear();
1170 		else
1171 			preparedLookup = null;
1172 	}
1173 	
1174 	/// Called whenever mysql-native needs to send a command to the server
1175 	/// and be sure there aren't any pending results (which would prevent
1176 	/// a new command from being sent).
1177 	void autoPurge()
1178 	{
1179 		// This is called every time a command is sent,
1180 		// so detect & prevent infinite recursion.
1181 		static bool isAutoPurging = false;
1182 
1183 		if(isAutoPurging)
1184 			return;
1185 			
1186 		isAutoPurging = true;
1187 		scope(exit) isAutoPurging = false;
1188 
1189 		try
1190 		{
1191 			purgeResult();
1192 			releaseQueued();
1193 		}
1194 		catch(Exception e)
1195 		{
1196 			// likely the connection was closed, so reset any state.
1197 			// Don't treat this as a real error, because everything will be reset when we
1198 			// reconnect.
1199 			kill();
1200 		}
1201 	}
1202 
1203 	/// Lookup per-connection prepared statement info by SQL
1204 	PreparedServerInfo[string] preparedLookup;
1205 	
1206 	/// Set `queuedForRelease` flag for a statement in `preparedLookup`.
1207 	/// Does nothing if statement not in `preparedLookup`.
1208 	private void setQueuedForRelease(string sql, bool value)
1209 	{
1210 		if(sql in preparedLookup)
1211 		{
1212 			auto info = preparedLookup[sql];
1213 			info.queuedForRelease = value;
1214 			preparedLookup[sql] = info;
1215 		}
1216 	}
1217 
1218 	/// Queue a prepared statement for release.
1219 	void queueForRelease(string sql)
1220 	{
1221 		// If connection's closed, then it IS released.
1222 		if(closed)
1223 			return;
1224 
1225 		setQueuedForRelease(sql, true);
1226 	}
1227 
	/// Remove a statement from the queue to be released.
	///
	/// Clears the `queuedForRelease` flag of `sql` in `preparedLookup`;
	/// does nothing if `sql` is not in `preparedLookup`.
	void unqueueForRelease(string sql)
	{
		setQueuedForRelease(sql, false);
	}
1233 
1234 	/// Releases all prepared statements that are queued for release.
1235 	void releaseQueued()
1236 	{
1237 		foreach(sql, info; preparedLookup)
1238 		if(info.queuedForRelease)
1239 		{
1240 			immediateReleasePrepared(info.statementId);
1241 			preparedLookup.remove(sql);
1242 		}
1243 	}
1244 
1245 	/// Returns null if not found
1246 	Nullable!PreparedServerInfo getPreparedServerInfo(const string sql) pure nothrow
1247 	{
1248 		Nullable!PreparedServerInfo result;
1249 		
1250 		auto pInfo = sql in preparedLookup;
1251 		if(pInfo)
1252 			result = *pInfo;
1253 		
1254 		return result;
1255 	}
1256 	
1257 	/// If already registered, simply returns the cached `PreparedServerInfo`.
1258 	PreparedServerInfo registerIfNeeded(string sql)
1259 	out(info)
1260 	{
1261 		// I'm confident this can't currently happen, but
1262 		// let's make sure that doesn't change.
1263 		assert(!info.queuedForRelease);
1264 	}
1265 	body
1266 	{
1267 		if(auto pInfo = sql in preparedLookup)
1268 		{
1269 			// The statement is registered. It may, or may not, be queued
1270 			// for release. Either way, all we need to do is make sure it's
1271 			// un-queued and then return.
1272 			pInfo.queuedForRelease = false;
1273 			return *pInfo;
1274 		}
1275 
1276 		auto info = registerIfNeededImpl(sql);
1277 		preparedLookup[sql] = info;
1278 
1279 		return info;
1280 	}
1281 
1282 	PreparedServerInfo registerIfNeededImpl(string sql)
1283 	{
1284 		scope(failure) kill();
1285 
1286 		PreparedServerInfo info;
1287 		
1288 		sendCmd(CommandType.STMT_PREPARE, sql);
1289 		_fieldCount = 0;
1290 
1291 		//TODO: All packet handling should be moved into the mysql.protocol package.
1292 		ubyte[] packet = getPacket();
1293 		if(packet.front == ResultPacketMarker.ok)
1294 		{
1295 			packet.popFront();
1296 			info.statementId    = packet.consume!int();
1297 			_fieldCount         = packet.consume!short();
1298 			info.numParams      = packet.consume!short();
1299 
1300 			packet.popFront(); // one byte filler
1301 			info.psWarnings     = packet.consume!short();
1302 
1303 			// At this point the server also sends field specs for parameters
1304 			// and columns if there were any of each
1305 			info.headers = PreparedStmtHeaders(this, _fieldCount, info.numParams);
1306 		}
1307 		else if(packet.front == ResultPacketMarker.error)
1308 		{
1309 			auto error = OKErrorPacket(packet);
1310 			enforcePacketOK(error);
1311 			assert(0); // FIXME: what now?
1312 		}
1313 		else
1314 			assert(0); // FIXME: what now?
1315 
1316 		return info;
1317 	}
1318 
	/// Sends a STMT_CLOSE command for `statementId` right away, purging any
	/// pending results first. Does nothing if the connection is closed.
	/// Any failure kills the connection.
	private void immediateReleasePrepared(uint statementId)
	{
		scope(failure) kill();

		if(closed())
			return;

		//TODO: All low-level commms should be moved into the mysql.protocol package.
		// Packet layout: 4-byte header, 1 command byte, 4-byte statement id.
		ubyte[9] packet_buf;
		ubyte[] packet = packet_buf;
		packet.setPacketHeader(0/*packet number*/);
		bumpPacket();
		packet[4] = CommandType.STMT_CLOSE;
		statementId.packInto(packet[5..9]);
		// Pending results must be drained before a new command may be sent.
		purgeResult();
		send(packet);
		// It seems that the server does not find it necessary to send a response
		// for this command.
	}
1338 
1339 public:
1340 
1341 	/++
1342 	Construct opened connection.
1343 
1344 	Throws `mysql.exceptions.MYX` upon failure to connect.
1345 	
1346 	If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
1347 	creating a new Connection directly. That will provide certain benefits,
1348 	such as reusing old connections and automatic cleanup (no need to close
1349 	the connection when done).
1350 
1351 	------------------
1352 	// Suggested usage:
1353 
1354 	{
1355 	    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
1356 	    scope(exit) con.close();
1357 
1358 	    // Use the connection
1359 	    ...
1360 	}
1361 	------------------
1362 
1363 	Params:
1364 		cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
1365 			(TODO: The connection string needs work to allow for semicolons in its parts!)
1366 		socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
1367 			unless compiled with `-version=Have_vibe_d_core` (set automatically
1368 			if using $(LINK2 http://code.dlang.org/getting_started, DUB)).
1369 		openSocket = Optional callback which should return a newly-opened Phobos
1370 			or Vibe.d TCP socket. This allows custom sockets to be used,
1371 			subclassed from Phobos's or Vibe.d's sockets.
1372 		host = An IP address in numeric dotted form, or as a host  name.
1373 		user = The user name to authenticate.
1374 		password = User's password.
1375 		db = Desired initial database.
1376 		capFlags = The set of flag bits from the server's capabilities that the client requires
1377 	+/
1378 	//After the connection is created, and the initial invitation is received from the server
1379 	//client preferences can be set, and authentication can then be attempted.
1380 	this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1381 	{
1382 		version(Have_vibe_d_core)
1383 			enum defaultSocketType = MySQLSocketType.vibed;
1384 		else
1385 			enum defaultSocketType = MySQLSocketType.phobos;
1386 
1387 		this(defaultSocketType, host, user, pwd, db, port, capFlags);
1388 	}
1389 
	///ditto
	this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
	{
		// Requesting Vibe.d sockets in a build without Vibe.d support is
		// rejected up front.
		version(Have_vibe_d_core) {} else
			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

		// Delegate with the default socket openers for both socket types.
		this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
			host, user, pwd, db, port, capFlags);
	}
1399 
	///ditto
	this(OpenSocketCallbackPhobos openSocket,
		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
	{
		// A Phobos opener callback implies a Phobos socket; no Vibe.d opener is needed.
		this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
	}
1406 
	version(Have_vibe_d_core)
	///ditto
	this(OpenSocketCallbackVibeD openSocket,
		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
	{
		// A Vibe.d opener callback implies a Vibe.d socket; no Phobos opener is needed.
		this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
	}
1414 
1415 	///ditto
1416 	private this(MySQLSocketType socketType,
1417 		OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
1418 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1419 	in
1420 	{
1421 		final switch(socketType)
1422 		{
1423 			case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
1424 			case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
1425 		}
1426 	}
1427 	body
1428 	{
1429 		enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
1430 		enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
1431 		version(Have_vibe_d_core) {} else
1432 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
1433 
1434 		_socketType = socketType;
1435 		_host = host;
1436 		_user = user;
1437 		_pwd = pwd;
1438 		_db = db;
1439 		_port = port;
1440 
1441 		_openSocketPhobos = openSocketPhobos;
1442 		_openSocketVibeD  = openSocketVibeD;
1443 
1444 		connect(capFlags);
1445 	}
1446 
1447 	///ditto
1448 	//After the connection is created, and the initial invitation is received from the server
1449 	//client preferences can be set, and authentication can then be attempted.
1450 	this(string cs, SvrCapFlags capFlags = defaultClientFlags)
1451 	{
1452 		string[] a = parseConnectionString(cs);
1453 		this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1454 	}
1455 
1456 	///ditto
1457 	this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
1458 	{
1459 		string[] a = parseConnectionString(cs);
1460 		this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1461 	}
1462 
1463 	///ditto
1464 	this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
1465 	{
1466 		string[] a = parseConnectionString(cs);
1467 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1468 	}
1469 
1470 	version(Have_vibe_d_core)
1471 	///ditto
1472 	this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
1473 	{
1474 		string[] a = parseConnectionString(cs);
1475 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1476 	}
1477 
1478 	/++
1479 	Check whether this `Connection` is still connected to the server, or if
1480 	the connection has been closed.
1481 	+/
1482 	@property bool closed()
1483 	{
1484 		return _open == OpenState.notConnected || !_socket.connected;
1485 	}
1486 
1487 	/++
1488 	Explicitly close the connection.
1489 	
1490 	Idiomatic use as follows is suggested:
1491 	------------------
1492 	{
1493 	    auto con = new Connection("localhost:user:password:mysqld");
1494 	    scope(exit) con.close();
1495 	    // Use the connection
1496 	    ...
1497 	}
1498 	------------------
1499 	+/
1500 	void close()
1501 	{
1502 		// This is a two-stage process. First tell the server we are quitting this
1503 		// connection, and then close the socket.
1504 
1505 		if (_open == OpenState.authenticated && _socket.connected)
1506 			quit();
1507 
1508 		if (_open == OpenState.connected)
1509 			kill();
1510 		resetPacket();
1511 	}
1512 
1513 	/++
1514 	Reconnects to the server using the same connection settings originally
1515 	used to create the `Connection`.
1516 
1517 	Optionally takes a `mysql.protocol.constants.SvrCapFlags`, allowing you to
1518 	reconnect using a different set of server capability flags.
1519 
1520 	Normally, if the connection is already open, this will do nothing. However,
1521 	if you request a different set of `mysql.protocol.constants.SvrCapFlags`
1522 	then was originally used to create the `Connection`, the connection will
1523 	be closed and then reconnected using the new `mysql.protocol.constants.SvrCapFlags`.
1524 	+/
1525 	//TODO: This needs unittested.
1526 	void reconnect()
1527 	{
1528 		reconnect(_clientCapabilities);
1529 	}
1530 
1531 	///ditto
1532 	void reconnect(SvrCapFlags clientCapabilities)
1533 	{
1534 		bool sameCaps = clientCapabilities == _clientCapabilities;
1535 		if(!closed)
1536 		{
1537 			// Same caps as before?
1538 			if(clientCapabilities == _clientCapabilities)
1539 				return; // Nothing to do, just keep current connection
1540 
1541 			close();
1542 		}
1543 
1544 		connect(clientCapabilities);
1545 	}
1546 
	/// Sends the QUIT command to the server and downgrades the connection
	/// state from `authenticated` to `connected`. The socket itself is left
	/// open.
	private void quit()
	in
	{
		assert(_open == OpenState.authenticated);
	}
	body
	{
		sendCmd(CommandType.QUIT, []);
		// No response is sent for a quit packet
		_open = OpenState.connected;
	}
1558 
1559 	/++
1560 	Parses a connection string of the form
1561 	`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`
1562 
1563 	Port is optional and defaults to 3306.
1564 
1565 	Whitespace surrounding any name or value is automatically stripped.
1566 
1567 	Returns a five-element array of strings in this order:
1568 	$(UL
1569 	$(LI [0]: host)
1570 	$(LI [1]: user)
1571 	$(LI [2]: pwd)
1572 	$(LI [3]: db)
1573 	$(LI [4]: port)
1574 	)
1575 	
1576 	(TODO: The connection string needs work to allow for semicolons in its parts!)
1577 	+/
1578 	//TODO: Replace the return value with a proper struct.
1579 	static string[] parseConnectionString(string cs)
1580 	{
1581 		string[] rv;
1582 		rv.length = 5;
1583 		rv[4] = "3306"; // Default port
1584 		string[] a = split(cs, ";");
1585 		foreach (s; a)
1586 		{
1587 			string[] a2 = split(s, "=");
1588 			enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs);
1589 			string name = strip(a2[0]);
1590 			string val = strip(a2[1]);
1591 			switch (name)
1592 			{
1593 				case "host":
1594 					rv[0] = val;
1595 					break;
1596 				case "user":
1597 					rv[1] = val;
1598 					break;
1599 				case "pwd":
1600 					rv[2] = val;
1601 					break;
1602 				case "db":
1603 					rv[3] = val;
1604 					break;
1605 				case "port":
1606 					rv[4] = val;
1607 					break;
1608 				default:
1609 					throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
1610 			}
1611 		}
1612 		return rv;
1613 	}
1614 
1615 	/++
1616 	Select a current database.
1617 	
1618 	Throws `mysql.exceptions.MYX` upon failure.
1619 
1620 	Params: dbName = Name of the requested database
1621 	+/
1622 	void selectDB(string dbName)
1623 	{
1624 		sendCmd(CommandType.INIT_DB, dbName);
1625 		getCmdResponse();
1626 		_db = dbName;
1627 	}
1628 
1629 	/++
1630 	Check the server status.
1631 	
1632 	Throws `mysql.exceptions.MYX` upon failure.
1633 
1634 	Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
1635 	+/
1636 	OKErrorPacket pingServer()
1637 	{
1638 		sendCmd(CommandType.PING, []);
1639 		return getCmdResponse();
1640 	}
1641 
1642 	/++
1643 	Refresh some feature(s) of the server.
1644 	
1645 	Throws `mysql.exceptions.MYX` upon failure.
1646 
1647 	Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
1648 	+/
1649 	OKErrorPacket refreshServer(RefreshFlags flags)
1650 	{
1651 		sendCmd(CommandType.REFRESH, [flags]);
1652 		return getCmdResponse();
1653 	}
1654 
1655 	/++
1656 	Flush any outstanding result set elements.
1657 	
1658 	When the server responds to a command that produces a result set, it
1659 	queues the whole set of corresponding packets over the current connection.
1660 	Before that `Connection` can embark on any new command, it must receive
1661 	all of those packets and junk them.
1662 	
1663 	As of v1.1.4, this is done automatically as needed. But you can still
1664 	call this manually to force a purge to occur when you want.
1665 
1666 	See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
1667 	+/
1668 	ulong purgeResult()
1669 	{
1670 		scope(failure) kill();
1671 
1672 		_lastCommandID++;
1673 
1674 		ulong rows = 0;
1675 		if (_headersPending)
1676 		{
1677 			for (size_t i = 0;; i++)
1678 			{
1679 				if (getPacket().isEOFPacket())
1680 				{
1681 					_headersPending = false;
1682 					break;
1683 				}
1684 				enforceEx!MYXProtocol(i < _fieldCount,
1685 					text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
1686 			}
1687 		}
1688 		if (_rowsPending)
1689 		{
1690 			for (;;  rows++)
1691 			{
1692 				if (getPacket().isEOFPacket())
1693 				{
1694 					_rowsPending = _binaryPending = false;
1695 					break;
1696 				}
1697 			}
1698 		}
1699 		resetPacket();
1700 		return rows;
1701 	}
1702 
1703 	/++
1704 	Get a textual report on the server status.
1705 	
1706 	(COM_STATISTICS)
1707 	+/
1708 	string serverStats()
1709 	{
1710 		sendCmd(CommandType.STATISTICS, []);
1711 		return cast(string) getPacket();
1712 	}
1713 
1714 	/++
1715 	Enable multiple statement commands.
1716 	
1717 	This can be used later if this feature was not requested in the client capability flags.
1718 	
1719 	Warning: This functionality is currently untested.
1720 	
1721 	Params: on = Boolean value to turn the capability on or off.
1722 	+/
1723 	//TODO: Need to test this
1724 	void enableMultiStatements(bool on)
1725 	{
1726 		scope(failure) kill();
1727 
1728 		ubyte[] t;
1729 		t.length = 2;
1730 		t[0] = on ? 0 : 1;
1731 		t[1] = 0;
1732 		sendCmd(CommandType.STMT_OPTION, t);
1733 
1734 		// For some reason this command gets an EOF packet as response
1735 		auto packet = getPacket();
1736 		enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1737 	}
1738 
	/// Return the in-force protocol number, as read from the server greeting.
	@property ubyte protocol() pure const nothrow { return _protocol; }
	/// Server version string, as read from the server greeting.
	@property string serverVersion() pure const nothrow { return _serverVersion; }
	/// Server capability flags
	@property uint serverCapabilities() pure const nothrow { return _sCaps; }
	/// Server status
	@property ushort serverStatus() pure const nothrow { return _serverStatus; }
	/// Current character set (the `server_language` byte of the server greeting).
	@property ubyte charSet() pure const nothrow { return _sCharSet; }
	/// Current database: the one given at construction, or the one last
	/// chosen through `selectDB`.
	@property string currentDB() pure const nothrow { return _db; }
	/// Socket type being used, Phobos or Vibe.d
	@property MySQLSocketType socketType() pure const nothrow { return _socketType; }

	/// After a command that inserted a row into a table with an auto-increment
	/// ID column, this method allows you to retrieve the last insert ID.
	@property ulong lastInsertID() pure const nothrow { return _insertID; }

	/// This gets incremented every time a command is issued or results are purged,
	/// so a `mysql.result.ResultRange` can tell whether it's been invalidated.
	@property ulong lastCommandID() pure const nothrow { return _lastCommandID; }

	/// Gets whether rows are pending.
	///
	/// Note, you may want `hasPending` instead.
	@property bool rowsPending() pure const nothrow { return _rowsPending; }

	/// Gets whether anything (rows, headers or binary) is pending.
	/// New commands cannot be sent on a connection while anything is pending
	/// (the pending data will automatically be purged.)
	@property bool hasPending() pure const nothrow
	{
		return _rowsPending || _headersPending || _binaryPending;
	}

	/// Gets the result header's field descriptions.
	@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
1777 
1778 	/++
1779 	Manually register a prepared statement on this connection.
1780 	
1781 	Does nothing if statement is already registered on this connection.
1782 	
1783 	Calling this is not strictly necessary, as the prepared statement will
1784 	automatically be registered upon its first use on any `Connection`.
1785 	This is provided for those who prefer eager registration over lazy
1786 	for performance reasons.
1787 	+/
1788 	void register(Prepared prepared)
1789 	{
1790 		registerIfNeeded(prepared.sql);
1791 	}
1792 
1793 	/++
1794 	Manually release a prepared statement on this connection.
1795 	
1796 	This method tells the server that it can dispose of the information it
1797 	holds about the current prepared statement.
1798 	
1799 	Calling this is not strictly necessary. The server considers prepared
1800 	statements to be per-connection, so they'll go away when the connection
1801 	closes anyway. This is provided in case direct control is actually needed.
1802 
1803 	If you choose to use a reference counted struct to call this automatically,
1804 	be aware that embedding reference counted structs inside garbage collectible
1805 	heap objects is dangerous and should be avoided, as it can lead to various
1806 	hidden problems, from crashes to race conditions. (See the discussion at issue
1807 	$(LINK2 https://github.com/mysql-d/mysql-native/issues/159, #159)
1808 	for details.) Instead, it may be better to simply avoid trying to manage
1809 	their release at all, as it's not usually necessary. Or to periodically
1810 	release all prepared statements, and simply allow mysql-native to
1811 	automatically re-register them upon their next use.
1812 	
1813 	Notes:
1814 	
1815 	In actuality, the server might not immediately be told to release the
1816 	statement (although `isRegistered` will still report `false`).
1817 	
1818 	This is because there could be a `mysql.result.ResultRange` with results
1819 	still pending for retrieval, and the protocol doesn't allow sending commands
1820 	(such as "release a prepared statement") to the server while data is pending.
1821 	Therefore, this function may instead queue the statement to be released
1822 	when it is safe to do so: Either the next time a result set is purged or
1823 	the next time a command (such as `mysql.commands.query` or
1824 	`mysql.commands.exec`) is performed (because such commands automatically
1825 	purge any pending results).
1826 	
1827 	This function does NOT auto-purge because, if this is ever called from
1828 	automatic resource management cleanup (refcounting, RAII, etc), that
1829 	would create ugly situations where hidden, implicit behavior triggers
1830 	an unexpected auto-purge.
1831 	+/
1832 	void release(Prepared prepared)
1833 	{
1834 		release(prepared.sql);
1835 	}
1836 	
	///ditto
	void release(string sql)
	{
		// Currently the statement is always queued; the actual release
		// happens later, in `releaseQueued`.
		//TODO: Don't queue it if nothing is pending. Just do it immediately.
		//      But need to be certain both situations are unittested.
		queueForRelease(sql);
	}
1844 	
1845 	/// Is the given SQL registered on this connection as a prepared statement?
1846 	bool isRegistered(Prepared prepared)
1847 	{
1848 		return isRegistered( prepared.sql );
1849 	}
1850 
1851 	///ditto
1852 	bool isRegistered(string sql)
1853 	{
1854 		return isRegistered( getPreparedServerInfo(sql) );
1855 	}
1856 
1857 	///ditto
1858 	package bool isRegistered(Nullable!PreparedServerInfo info)
1859 	{
1860 		return !info.isNull && !info.queuedForRelease;
1861 	}
1862 }
1863 
// Test register, release, isRegistered, and auto-register for prepared statements
//
// The first scope exercises the explicit API on a single connection. Each
// later scope obtains its own connection through `scopedCn` and checks that
// using a not-yet-registered `Prepared` registers it on that connection.
debug(MYSQLN_TESTS)
unittest
{
	import mysql.connection;
	import mysql.test.common;
	
	Prepared preparedInsert;
	Prepared preparedSelect;
	immutable insertSQL = "INSERT INTO `autoRegistration` VALUES (1), (2)";
	immutable selectSQL = "SELECT `val` FROM `autoRegistration`";
	int queryTupleResult;
	
	{
		mixin(scopedCn);
		
		// Setup
		cn.exec("DROP TABLE IF EXISTS `autoRegistration`");
		cn.exec("CREATE TABLE `autoRegistration` (
			`val` INTEGER
		) ENGINE=InnoDB DEFAULT CHARSET=utf8");

		// Initial register
		preparedInsert = cn.prepare(insertSQL);
		preparedSelect = cn.prepare(selectSQL);
		
		// Test basic register, release, isRegistered
		assert(cn.isRegistered(preparedInsert));
		assert(cn.isRegistered(preparedSelect));
		cn.release(preparedInsert);
		cn.release(preparedSelect);
		assert(!cn.isRegistered(preparedInsert));
		assert(!cn.isRegistered(preparedSelect));
		
		// Test manual re-register
		cn.register(preparedInsert);
		cn.register(preparedSelect);
		assert(cn.isRegistered(preparedInsert));
		assert(cn.isRegistered(preparedSelect));
		
		// Test double register
		cn.register(preparedInsert);
		cn.register(preparedSelect);
		assert(cn.isRegistered(preparedInsert));
		assert(cn.isRegistered(preparedSelect));

		// Test double release
		cn.release(preparedInsert);
		cn.release(preparedSelect);
		assert(!cn.isRegistered(preparedInsert));
		assert(!cn.isRegistered(preparedSelect));
		cn.release(preparedInsert);
		cn.release(preparedSelect);
		assert(!cn.isRegistered(preparedInsert));
		assert(!cn.isRegistered(preparedSelect));
	}

	// Note that at this point, both prepared statements still exist,
	// but are no longer registered on any connection. In fact, there
	// are no open connections anymore.
	
	// Test auto-register: exec
	{
		mixin(scopedCn);
	
		assert(!cn.isRegistered(preparedInsert));
		cn.exec(preparedInsert);
		assert(cn.isRegistered(preparedInsert));
	}
	
	// Test auto-register: query
	{
		mixin(scopedCn);
	
		assert(!cn.isRegistered(preparedSelect));
		// `.each()` consumes the whole result range.
		cn.query(preparedSelect).each();
		assert(cn.isRegistered(preparedSelect));
	}
	
	// Test auto-register: queryRow
	{
		mixin(scopedCn);
	
		assert(!cn.isRegistered(preparedSelect));
		cn.queryRow(preparedSelect);
		assert(cn.isRegistered(preparedSelect));
	}
	
	// Test auto-register: queryRowTuple
	{
		mixin(scopedCn);
	
		assert(!cn.isRegistered(preparedSelect));
		cn.queryRowTuple(preparedSelect, queryTupleResult);
		assert(cn.isRegistered(preparedSelect));
	}
	
	// Test auto-register: queryValue
	{
		mixin(scopedCn);
	
		assert(!cn.isRegistered(preparedSelect));
		cn.queryValue(preparedSelect);
		assert(cn.isRegistered(preparedSelect));
	}
}
1970 
// An attempt to reproduce issue #81: Using mysql-native driver with no default database
// I'm unable to actually reproduce the error, though.
debug(MYSQLN_TESTS)
unittest
{
	import mysql.escape;
	mixin(scopedCn);
	
	cn.exec("DROP TABLE IF EXISTS `issue81`");
	cn.exec("CREATE TABLE `issue81` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8");
	cn.exec("INSERT INTO `issue81` (a) VALUES (1)");

	// Second connection to the same server, deliberately built from a
	// connection string that has no `db=` part.
	auto cn2 = new Connection(text("host=", cn._host, ";port=", cn._port, ";user=", cn._user, ";pwd=", cn._pwd));
	scope(exit) cn2.close();
	
	// With no default database, the table must be fully qualified.
	cn2.query("SELECT * FROM `"~mysqlEscape(cn._db).text~"`.`issue81`");
}
1988 
// Regression test for Issue #154:
// autoPurge can throw an exception if the socket was closed without purging.
//
// A disconnect is simulated by closing the socket out from under the
// Connection object while a query result is still unread.
debug(MYSQLN_TESTS)
unittest
{
	mixin(scopedCn);

	enum dropIfExistsSql = "DROP TABLE IF EXISTS `dropConnection`";
	enum createTableSql = "CREATE TABLE `dropConnection` (
		`val` INTEGER
	) ENGINE=InnoDB DEFAULT CHARSET=utf8";
	enum insertRowsSql = "INSERT INTO `dropConnection` VALUES (1), (2), (3)";
	enum selectAllSql = "SELECT * FROM `dropConnection`";

	cn.exec(dropIfExistsSql);
	cn.exec(createTableSql);
	cn.exec(insertRowsSql);
	import mysql.prepared;
	{
		// The returned range is deliberately never read, so nothing here
		// purges the result before the socket goes away.
		auto selectAll = cn.prepare(selectAllSql);
		cn.query(selectAll);
	}
	// Forcibly close the underlying socket, bypassing Connection.
	cn._socket.close();
	// The connection is expected to recover (reconnect) rather than throw.
	cn.exec("DROP TABLE `dropConnection`");
}
2014 
2015 /+
Test Prepared's ability to be safely refcount-released during a GC cycle
(i.e., `Connection.release` must not allocate GC memory).
2018 
2019 Currently disabled because it's not guaranteed to always work
2020 (and apparently, cannot be made to work?)
2021 For relevant discussion, see issue #159:
2022 https://github.com/mysql-d/mysql-native/issues/159
2023 +/
2024 version(none)
2025 debug(MYSQLN_TESTS)
2026 {
2027 	/// Proof-of-concept ref-counted Prepared wrapper, just for testing,
2028 	/// not really intended for actual use.
2029 	private struct RCPreparedPayload
2030 	{
2031 		Prepared prepared;
2032 		Connection conn; // Connection to be released from
2033 
2034 		alias prepared this;
2035 
2036 		@disable this(this); // not copyable
2037 		~this()
2038 		{
2039 			// There are a couple calls to this dtor where `conn` happens to be null.
2040 			if(conn is null)
2041 				return;
2042 
2043 			assert(conn.isRegistered(prepared));
2044 			conn.release(prepared);
2045 		}
2046 	}
2047 	///ditto
2048 	alias RCPrepared = RefCounted!(RCPreparedPayload, RefCountedAutoInitialize.no);
2049 	///ditto
2050 	private RCPrepared rcPrepare(Connection conn, string sql)
2051 	{
2052 		import std.algorithm.mutation : move;
2053 
2054 		auto prepared = conn.prepare(sql);
2055 		auto payload = RCPreparedPayload(prepared, conn);
2056 		return refCounted(move(payload));
2057 	}
2058 
	// Checks that the last reference to an `RCPrepared` can be dropped from
	// inside a GC cycle, i.e. that `RCPreparedPayload.~this` and
	// `Connection.release(Prepared)` can run during collection.
	unittest
	{
		import core.memory;
		mixin(scopedCn);
		
		// Create a small three-row table to select from.
		cn.exec("DROP TABLE IF EXISTS `rcPrepared`");
		cn.exec("CREATE TABLE `rcPrepared` (
			`val` INTEGER
		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
		cn.exec("INSERT INTO `rcPrepared` VALUES (1), (2), (3)");

		// Define this in outer scope to guarantee data is left pending when
		// RCPrepared's payload is collected. This will guarantee
		// that Connection will need to queue the release.
		ResultRange rows;

		// Runs in its own stack frame so the stack-held `RCPrepared`
		// reference is destroyed when the function returns.
		void bar()
		{
			// A GC-allocated holder for the second reference to the statement.
			class Foo { RCPrepared p; }
			auto foo = new Foo();

			auto rcStmt = cn.rcPrepare("SELECT * FROM `rcPrepared`");
			foo.p = rcStmt;
			// Start a query but never read it; `rows` lives in the outer scope.
			rows = cn.query(rcStmt);

			/+
			At this point, there are two references to the prepared statement:
			One in a `Foo` object (currently bound to `foo`), and one on the stack.

			Returning from this function will destroy the one on the stack,
			and deterministically reduce the refcount to 1.

			So, right here we set `foo` to null to *keep* the Foo object's
			reference to the prepared statement, but set adrift the Foo object
			itself, ready to be destroyed (along with the only remaining
			prepared statement reference it contains) by the next GC cycle.

			Thus, `RCPreparedPayload.~this` and `Connection.release(Prepared)`
			will be executed during a GC cycle...and had better not perform
			any allocations, or else...boom!
			+/
			foo = null;
		}

		bar();
		assert(cn.hasPending); // Ensure Connection is forced to queue the release.
		GC.collect(); // `Connection.release(Prepared)` better not be allocating, or boom!
	}
2107 }