1 /// Connect to a MySQL/MariaDB server.
2 module mysql.connection;
3 
4 import std.algorithm;
5 import std.conv;
6 import std.digest.sha;
7 import std.exception;
8 import std.range;
9 import std.socket;
10 import std.string;
11 import std.typecons;
12 
13 import mysql.commands;
14 import mysql.exceptions;
15 import mysql.prepared;
16 import mysql.protocol.constants;
17 import mysql.protocol.packets;
18 import mysql.protocol.sockets;
19 import mysql.result;
20 debug(MYSQLN_TESTS)
21 {
22 	import mysql.test.common;
23 }
24 
25 version(Have_vibe_d_core)
26 {
27 	static if(__traits(compiles, (){ import vibe.core.net; } ))
28 		import vibe.core.net;
29 	else
30 		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
31 }
32 
33 /// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
34 immutable SvrCapFlags defaultClientFlags =
35 		SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
36 		SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
37 		SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
38 		//SvrCapFlags.MULTI_RESULTS;
39 
40 /++
41 Submit an SQL command to the server to be compiled into a prepared statement.
42 
43 This will automatically register the prepared statement on the provided connection.
44 The resulting `Prepared` can then be used freely on ANY `Connection`,
45 as it will automatically be registered upon its first use on other connections.
46 Or, pass it to `Connection.register` if you prefer eager registration.
47 
48 Internally, the result of a successful outcome will be a statement handle - an ID -
49 for the prepared statement, a count of the parameters required for
50 execution of the statement, and a count of the columns that will be present
51 in any result set that the command generates.
52 
53 The server will then proceed to send prepared statement headers,
54 including parameter descriptions, and result set field descriptions,
55 followed by an EOF packet.
56 
57 Throws: `mysql.exceptions.MYX` if the server has a problem.
58 +/
59 Prepared prepare(Connection conn, string sql)
60 {
61 	auto info = conn.registerIfNeeded(sql);
62 	return Prepared(sql, info.headers, info.numParams);
63 }
64 
65 /++
66 This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.
67 
68 See `BackwardCompatPrepared` for more info.
69 +/
70 deprecated("This is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. You should migrate from this to the Prepared-compatible exec/query overloads in 'mysql.commands'.")
71 BackwardCompatPrepared prepareBackwardCompat(Connection conn, string sql)
72 {
73 	return BackwardCompatPrepared(conn, prepare(conn, sql));
74 }
75 
76 /++
77 Convenience function to create a prepared statement which calls a stored function.
78 
79 Be careful that your numArgs is correct. If it isn't, you may get a
80 `mysql.exceptions.MYX` with a very unclear error message.
81 
82 Throws: `mysql.exceptions.MYX` if the server has a problem.
83 
84 Params:
85 	name = The name of the stored function.
86 	numArgs = The number of arguments the stored procedure takes.
87 +/
88 Prepared prepareFunction(Connection conn, string name, int numArgs)
89 {
90 	auto sql = "select " ~ name ~ preparedPlaceholderArgs(numArgs);
91 	return prepare(conn, sql);
92 }
93 
94 ///
95 unittest
96 {
97 	debug(MYSQLN_TESTS)
98 	{
99 		import mysql.test.common;
100 		mixin(scopedCn);
101 
102 		exec(cn, `DROP FUNCTION IF EXISTS hello`);
103 		exec(cn, `
104 			CREATE FUNCTION hello (s CHAR(20))
105 			RETURNS CHAR(50) DETERMINISTIC
106 			RETURN CONCAT('Hello ',s,'!')
107 		`);
108 
109 		auto preparedHello = prepareFunction(cn, "hello", 1);
110 		preparedHello.setArgs("World");
111 		auto rs = cn.query(preparedHello).array;
112 		assert(rs.length == 1);
113 		assert(rs[0][0] == "Hello World!");
114 	}
115 }
116 
117 /++
118 Convenience function to create a prepared statement which calls a stored procedure.
119 
120 OUT parameters are currently not supported. It should generally be
121 possible with MySQL to present them as a result set.
122 
123 Be careful that your numArgs is correct. If it isn't, you may get a
124 `mysql.exceptions.MYX` with a very unclear error message.
125 
126 Throws: `mysql.exceptions.MYX` if the server has a problem.
127 
128 Params:
129 	name = The name of the stored procedure.
130 	numArgs = The number of arguments the stored procedure takes.
131 
132 +/
133 Prepared prepareProcedure(Connection conn, string name, int numArgs)
134 {
135 	auto sql = "call " ~ name ~ preparedPlaceholderArgs(numArgs);
136 	return prepare(conn, sql);
137 }
138 
139 ///
140 unittest
141 {
142 	debug(MYSQLN_TESTS)
143 	{
144 		import mysql.test.common;
145 		import mysql.test.integration;
146 		mixin(scopedCn);
147 		initBaseTestTables(cn);
148 
149 		exec(cn, `DROP PROCEDURE IF EXISTS insert2`);
150 		exec(cn, `
151 			CREATE PROCEDURE insert2 (IN p1 INT, IN p2 CHAR(50))
152 			BEGIN
153 				INSERT INTO basetest (intcol, stringcol) VALUES(p1, p2);
154 			END
155 		`);
156 
157 		auto preparedInsert2 = prepareProcedure(cn, "insert2", 2);
158 		preparedInsert2.setArgs(2001, "inserted string 1");
159 		cn.exec(preparedInsert2);
160 
161 		auto rs = query(cn, "SELECT stringcol FROM basetest WHERE intcol=2001").array;
162 		assert(rs.length == 1);
163 		assert(rs[0][0] == "inserted string 1");
164 	}
165 }
166 
167 private string preparedPlaceholderArgs(int numArgs)
168 {
169 	auto sql = "(";
170 	bool comma = false;
171 	foreach(i; 0..numArgs)
172 	{
173 		if (comma)
174 			sql ~= ",?";
175 		else
176 		{
177 			sql ~= "?";
178 			comma = true;
179 		}
180 	}
181 	sql ~= ")";
182 
183 	return sql;
184 }
185 
186 debug(MYSQLN_TESTS)
187 unittest
188 {
189 	assert(preparedPlaceholderArgs(3) == "(?,?,?)");
190 	assert(preparedPlaceholderArgs(2) == "(?,?)");
191 	assert(preparedPlaceholderArgs(1) == "(?)");
192 	assert(preparedPlaceholderArgs(0) == "()");
193 }
194 
195 /// Per-connection info from the server about a registered prepared statement.
196 package struct PreparedServerInfo
197 {
198 	/// Server's identifier for this prepared statement.
199 	/// Apperently, this is never 0 if it's been registered,
200 	/// although mysql-native no longer relies on that.
201 	uint statementId;
202 
203 	ushort psWarnings;
204 
205 	/// Number of parameters this statement takes.
206 	/// 
207 	/// This will be the same on all connections, but it's returned
208 	/// by the server upon registration, so it's stored here.
209 	ushort numParams;
210 
211 	/// Prepared statement headers
212 	///
213 	/// This will be the same on all connections, but it's returned
214 	/// by the server upon registration, so it's stored here.
215 	PreparedStmtHeaders headers;
216 	
217 	/// Not actually from the server. Connection uses this to keep track
218 	/// of statements that should be treated as having been released.
219 	bool queuedForRelease = false;
220 }
221 
222 /++
223 This is a wrapper over `Prepared` which is provided ONLY as a
224 temporary aid in upgrading to mysql-native v2.0.0 and its
225 new connection-independent model of prepared statements.
226 
227 In most cases, this layer shouldn't even be needed. But if you have many
228 lines of code making calls to exec/query the same prepared statement,
229 then this may be helpful.
230 
231 To use this temporary compatability layer, change instances of:
232 
233 ---
234 auto stmt = conn.prepare(...);
235 ---
236 
237 to:
238 
239 ---
240 auto stmt = conn.prepareBackwardCompat(...);
241 ---
242 
243 And then your prepared statement should work as before.
244 
245 BUT DO NOT LEAVE IT LIKE THIS! Ultimately, you should update
246 your prepared statement code to the mysql-native v2.0.0 API, by changing
247 instances of:
248 
249 ---
250 stmt.exec()
251 stmt.query()
252 stmt.queryRow()
253 stmt.queryRowTuple(outputArgs...)
254 stmt.queryValue()
255 ---
256 
257 to:
258 
259 ---
260 conn.exec(stmt)
261 conn.query(stmt)
262 conn.queryRow(stmt)
263 conn.queryRowTuple(stmt, outputArgs...)
264 conn.queryValue(stmt)
265 ---
266 
267 Both of the above syntaxes can be used with a `BackwardCompatPrepared`
268 (the `Connection` passed directly to `exec`/`query` will override the
269 one embedded associated with your `BackwardCompatPrepared`).
270 
271 Once all of your code is updated, you can change `prepareBackwardCompat`
272 back to `prepare` again, and your upgrade will be complete.
273 +/
274 struct BackwardCompatPrepared
275 {
276 	import std.variant;
277 	
278 	private Connection _conn;
279 	Prepared _prepared;
280 
281 	/// Access underlying `Prepared`
282 	@property Prepared prepared() { return _prepared; }
283 
284 	alias _prepared this;
285 
286 	/++
287 	This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.
288 	
289 	See `BackwardCompatPrepared` for more info.
290 	+/
291 	deprecated("Change 'preparedStmt.exec()' to 'conn.exec(preparedStmt)'")
292 	ulong exec()
293 	{
294 		return .exec(_conn, _prepared);
295 	}
296 
297 	///ditto
298 	deprecated("Change 'preparedStmt.query()' to 'conn.query(preparedStmt)'")
299 	ResultRange query(ColumnSpecialization[] csa = null)
300 	{
301 		return .query(_conn, _prepared, csa);
302 	}
303 
304 	///ditto
305 	deprecated("Change 'preparedStmt.queryRow()' to 'conn.queryRow(preparedStmt)'")
306 	Nullable!Row queryRow(ColumnSpecialization[] csa = null)
307 	{
308 		return .queryRow(_conn, _prepared, csa);
309 	}
310 
311 	///ditto
312 	deprecated("Change 'preparedStmt.queryRowTuple(outArgs...)' to 'conn.queryRowTuple(preparedStmt, outArgs...)'")
313 	void queryRowTuple(T...)(ref T args) if(T.length == 0 || !is(T[0] : Connection))
314 	{
315 		return .queryRowTuple(_conn, _prepared, args);
316 	}
317 
318 	///ditto
319 	deprecated("Change 'preparedStmt.queryValue()' to 'conn.queryValue(preparedStmt)'")
320 	Nullable!Variant queryValue(ColumnSpecialization[] csa = null)
321 	{
322 		return .queryValue(_conn, _prepared, csa);
323 	}
324 }
325 
326 //TODO: All low-level commms should be moved into the mysql.protocol package.
327 /// Low-level comms code relating to prepared statements.
328 package struct ProtocolPrepared
329 {
330 	import std.conv;
331 	import std.datetime;
332 	import std.variant;
333 	import mysql.types;
334 	
335 	static ubyte[] makeBitmap(in Variant[] inParams)
336 	{
337 		size_t bml = (inParams.length+7)/8;
338 		ubyte[] bma;
339 		bma.length = bml;
340 		foreach (i; 0..inParams.length)
341 		{
342 			if(inParams[i].type != typeid(typeof(null)))
343 				continue;
344 			size_t bn = i/8;
345 			size_t bb = i%8;
346 			ubyte sr = 1;
347 			sr <<= bb;
348 			bma[bn] |= sr;
349 		}
350 		return bma;
351 	}
352 
353 	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
354 	{
355 		ubyte[] prefix;
356 		prefix.length = 14;
357 
358 		prefix[4] = CommandType.STMT_EXECUTE;
359 		hStmt.packInto(prefix[5..9]);
360 		prefix[9] = flags;   // flags, no cursor
361 		prefix[10] = 1; // iteration count - currently always 1
362 		prefix[11] = 0;
363 		prefix[12] = 0;
364 		prefix[13] = 0;
365 
366 		return prefix;
367 	}
368 
369 	//TODO: All low-level commms should be moved into the mysql.protocol package.
370 	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
371 		out ubyte[] vals, out bool longData)
372 	{
373 		size_t pc = inParams.length;
374 		ubyte[] types;
375 		types.length = pc*2;
376 		size_t alloc = pc*20;
377 		vals.length = alloc;
378 		uint vcl = 0, len;
379 		int ct = 0;
380 
381 		void reAlloc(size_t n)
382 		{
383 			if (vcl+n < alloc)
384 				return;
385 			size_t inc = (alloc*3)/2;
386 			if (inc <  n)
387 				inc = n;
388 			alloc += inc;
389 			vals.length = alloc;
390 		}
391 
392 		foreach (size_t i; 0..pc)
393 		{
394 			enum UNSIGNED  = 0x80;
395 			enum SIGNED    = 0;
396 			if (psa[i].chunkSize)
397 				longData= true;
398 			if (inParams[i].type == typeid(typeof(null)))
399 			{
400 				types[ct++] = SQLType.NULL;
401 				types[ct++] = SIGNED;
402 				continue;
403 			}
404 			Variant v = inParams[i];
405 			SQLType ext = psa[i].type;
406 			string ts = v.type.toString();
407 			bool isRef;
408 			if (ts[$-1] == '*')
409 			{
410 				ts.length = ts.length-1;
411 				isRef= true;
412 			}
413 
414 			switch (ts)
415 			{
416 				case "bool":
417 					if (ext == SQLType.INFER_FROM_D_TYPE)
418 						types[ct++] = SQLType.BIT;
419 					else
420 						types[ct++] = cast(ubyte) ext;
421 					types[ct++] = SIGNED;
422 					reAlloc(2);
423 					bool bv = isRef? *(v.get!(bool*)): v.get!(bool);
424 					vals[vcl++] = 1;
425 					vals[vcl++] = bv? 0x31: 0x30;
426 					break;
427 				case "byte":
428 					types[ct++] = SQLType.TINY;
429 					types[ct++] = SIGNED;
430 					reAlloc(1);
431 					vals[vcl++] = isRef? *(v.get!(byte*)): v.get!(byte);
432 					break;
433 				case "ubyte":
434 					types[ct++] = SQLType.TINY;
435 					types[ct++] = UNSIGNED;
436 					reAlloc(1);
437 					vals[vcl++] = isRef? *(v.get!(ubyte*)): v.get!(ubyte);
438 					break;
439 				case "short":
440 					types[ct++] = SQLType.SHORT;
441 					types[ct++] = SIGNED;
442 					reAlloc(2);
443 					short si = isRef? *(v.get!(short*)): v.get!(short);
444 					vals[vcl++] = cast(ubyte) (si & 0xff);
445 					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
446 					break;
447 				case "ushort":
448 					types[ct++] = SQLType.SHORT;
449 					types[ct++] = UNSIGNED;
450 					reAlloc(2);
451 					ushort us = isRef? *(v.get!(ushort*)): v.get!(ushort);
452 					vals[vcl++] = cast(ubyte) (us & 0xff);
453 					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
454 					break;
455 				case "int":
456 					types[ct++] = SQLType.INT;
457 					types[ct++] = SIGNED;
458 					reAlloc(4);
459 					int ii = isRef? *(v.get!(int*)): v.get!(int);
460 					vals[vcl++] = cast(ubyte) (ii & 0xff);
461 					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
462 					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
463 					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
464 					break;
465 				case "uint":
466 					types[ct++] = SQLType.INT;
467 					types[ct++] = UNSIGNED;
468 					reAlloc(4);
469 					uint ui = isRef? *(v.get!(uint*)): v.get!(uint);
470 					vals[vcl++] = cast(ubyte) (ui & 0xff);
471 					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
472 					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
473 					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
474 					break;
475 				case "long":
476 					types[ct++] = SQLType.LONGLONG;
477 					types[ct++] = SIGNED;
478 					reAlloc(8);
479 					long li = isRef? *(v.get!(long*)): v.get!(long);
480 					vals[vcl++] = cast(ubyte) (li & 0xff);
481 					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
482 					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
483 					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
484 					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
485 					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
486 					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
487 					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
488 					break;
489 				case "ulong":
490 					types[ct++] = SQLType.LONGLONG;
491 					types[ct++] = UNSIGNED;
492 					reAlloc(8);
493 					ulong ul = isRef? *(v.get!(ulong*)): v.get!(ulong);
494 					vals[vcl++] = cast(ubyte) (ul & 0xff);
495 					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
496 					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
497 					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
498 					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
499 					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
500 					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
501 					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
502 					break;
503 				case "float":
504 					types[ct++] = SQLType.FLOAT;
505 					types[ct++] = SIGNED;
506 					reAlloc(4);
507 					float f = isRef? *(v.get!(float*)): v.get!(float);
508 					ubyte* ubp = cast(ubyte*) &f;
509 					vals[vcl++] = *ubp++;
510 					vals[vcl++] = *ubp++;
511 					vals[vcl++] = *ubp++;
512 					vals[vcl++] = *ubp;
513 					break;
514 				case "double":
515 					types[ct++] = SQLType.DOUBLE;
516 					types[ct++] = SIGNED;
517 					reAlloc(8);
518 					double d = isRef? *(v.get!(double*)): v.get!(double);
519 					ubyte* ubp = cast(ubyte*) &d;
520 					vals[vcl++] = *ubp++;
521 					vals[vcl++] = *ubp++;
522 					vals[vcl++] = *ubp++;
523 					vals[vcl++] = *ubp++;
524 					vals[vcl++] = *ubp++;
525 					vals[vcl++] = *ubp++;
526 					vals[vcl++] = *ubp++;
527 					vals[vcl++] = *ubp;
528 					break;
529 				case "std.datetime.date.Date":
530 				case "std.datetime.Date":
531 					types[ct++] = SQLType.DATE;
532 					types[ct++] = SIGNED;
533 					Date date = isRef? *(v.get!(Date*)): v.get!(Date);
534 					ubyte[] da = pack(date);
535 					size_t l = da.length;
536 					reAlloc(l);
537 					vals[vcl..vcl+l] = da[];
538 					vcl += l;
539 					break;
540 				case "std.datetime.TimeOfDay":
541 				case "std.datetime.Time":
542 					types[ct++] = SQLType.TIME;
543 					types[ct++] = SIGNED;
544 					TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay);
545 					ubyte[] ta = pack(time);
546 					size_t l = ta.length;
547 					reAlloc(l);
548 					vals[vcl..vcl+l] = ta[];
549 					vcl += l;
550 					break;
551 				case "std.datetime.date.DateTime":
552 				case "std.datetime.DateTime":
553 					types[ct++] = SQLType.DATETIME;
554 					types[ct++] = SIGNED;
555 					DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime);
556 					ubyte[] da = pack(dt);
557 					size_t l = da.length;
558 					reAlloc(l);
559 					vals[vcl..vcl+l] = da[];
560 					vcl += l;
561 					break;
562 				case "mysql.types.Timestamp":
563 					types[ct++] = SQLType.TIMESTAMP;
564 					types[ct++] = SIGNED;
565 					Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp);
566 					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
567 					ubyte[] da = pack(dt);
568 					size_t l = da.length;
569 					reAlloc(l);
570 					vals[vcl..vcl+l] = da[];
571 					vcl += l;
572 					break;
573 				case "immutable(char)[]":
574 					if (ext == SQLType.INFER_FROM_D_TYPE)
575 						types[ct++] = SQLType.VARCHAR;
576 					else
577 						types[ct++] = cast(ubyte) ext;
578 					types[ct++] = SIGNED;
579 					string s = isRef? *(v.get!(string*)): v.get!(string);
580 					ubyte[] packed = packLCS(cast(void[]) s);
581 					reAlloc(packed.length);
582 					vals[vcl..vcl+packed.length] = packed[];
583 					vcl += packed.length;
584 					break;
585 				case "char[]":
586 					if (ext == SQLType.INFER_FROM_D_TYPE)
587 						types[ct++] = SQLType.VARCHAR;
588 					else
589 						types[ct++] = cast(ubyte) ext;
590 					types[ct++] = SIGNED;
591 					char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]);
592 					ubyte[] packed = packLCS(cast(void[]) ca);
593 					reAlloc(packed.length);
594 					vals[vcl..vcl+packed.length] = packed[];
595 					vcl += packed.length;
596 					break;
597 				case "byte[]":
598 					if (ext == SQLType.INFER_FROM_D_TYPE)
599 						types[ct++] = SQLType.TINYBLOB;
600 					else
601 						types[ct++] = cast(ubyte) ext;
602 					types[ct++] = SIGNED;
603 					byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]);
604 					ubyte[] packed = packLCS(cast(void[]) ba);
605 					reAlloc(packed.length);
606 					vals[vcl..vcl+packed.length] = packed[];
607 					vcl += packed.length;
608 					break;
609 				case "ubyte[]":
610 					if (ext == SQLType.INFER_FROM_D_TYPE)
611 						types[ct++] = SQLType.TINYBLOB;
612 					else
613 						types[ct++] = cast(ubyte) ext;
614 					types[ct++] = SIGNED;
615 					ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]);
616 					ubyte[] packed = packLCS(cast(void[]) uba);
617 					reAlloc(packed.length);
618 					vals[vcl..vcl+packed.length] = packed[];
619 					vcl += packed.length;
620 					break;
621 				case "void":
622 					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
623 				default:
624 					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
625 			}
626 		}
627 		vals.length = vcl;
628 		return types;
629 	}
630 
631 	static void sendLongData(Connection conn, uint hStmt, ParameterSpecialization[] psa)
632 	{
633 		assert(psa.length <= ushort.max); // parameter number is sent as short
634 		foreach (ushort i, PSN psn; psa)
635 		{
636 			if (!psn.chunkSize) continue;
637 			uint cs = psn.chunkSize;
638 			uint delegate(ubyte[]) dg = psn.chunkDelegate;
639 
640 			//TODO: All low-level commms should be moved into the mysql.protocol package.
641 			ubyte[] chunk;
642 			chunk.length = cs+11;
643 			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
644 			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
645 			hStmt.packInto(chunk[5..9]); // statement handle
646 			packInto(i, chunk[9..11]); // parameter number
647 
648 			// byte 11 on is payload
649 			for (;;)
650 			{
651 				uint sent = dg(chunk[11..cs+11]);
652 				if (sent < cs)
653 				{
654 					if (sent == 0)    // data was exact multiple of chunk size - all sent
655 						break;
656 					sent += 7;        // adjust for non-payload bytes
657 					chunk.length = chunk.length - (cs-sent);     // trim the chunk
658 					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
659 					conn.send(chunk);
660 					break;
661 				}
662 				conn.send(chunk);
663 			}
664 		}
665 	}
666 
667 	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
668 		Variant[] inParams, ParameterSpecialization[] psa)
669 	{
670 		conn.autoPurge();
671 		
672 		//TODO: All low-level commms should be moved into the mysql.protocol package.
673 		ubyte[] packet;
674 		conn.resetPacket();
675 
676 		ubyte[] prefix = makePSPrefix(hStmt, 0);
677 		size_t len = prefix.length;
678 		bool longData;
679 
680 		if (psh.paramCount)
681 		{
682 			ubyte[] one = [ 1 ];
683 			ubyte[] vals;
684 			ubyte[] types = analyseParams(inParams, psa, vals, longData);
685 			ubyte[] nbm = makeBitmap(inParams);
686 			packet = prefix ~ nbm ~ one ~ types ~ vals;
687 		}
688 		else
689 			packet = prefix;
690 
691 		if (longData)
692 			sendLongData(conn, hStmt, psa);
693 
694 		assert(packet.length <= uint.max);
695 		packet.setPacketHeader(conn.pktNumber);
696 		conn.bumpPacket();
697 		conn.send(packet);
698 	}
699 }
700 
701 /++
702 A class representing a database connection.
703 
704 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
705 creating a new Connection directly. That will provide certain benefits,
706 such as reusing old connections and automatic cleanup (no need to close
707 the connection when done).
708 
709 ------------------
710 // Suggested usage:
711 
712 {
713 	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
714 	scope(exit) con.close();
715 
716 	// Use the connection
717 	...
718 }
719 ------------------
720 +/
721 //TODO: All low-level commms should be moved into the mysql.protocol package.
722 class Connection
723 {
724 /+
725 The Connection is responsible for handshaking with the server to establish
726 authentication. It then passes client preferences to the server, and
727 subsequently is the channel for all command packets that are sent, and all
728 response packets received.
729 
730 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
731 byte as a packet number. Connection deals with the headers and ensures that
732 packet numbers are sequential.
733 
734 The initial packet is sent by the server - essentially a 'hello' packet
735 inviting login. That packet has a sequence number of zero. That sequence
736 number is the incremented by client and server packets through the handshake
737 sequence.
738 
739 After login all further sequences are initialized by the client sending a
740 command packet with a zero sequence number, to which the server replies with
741 zero or more packets with sequential sequence numbers.
742 +/
743 package:
744 	enum OpenState
745 	{
746 		/// We have not yet connected to the server, or have sent QUIT to the
747 		/// server and closed the connection
748 		notConnected,
749 		/// We have connected to the server and parsed the greeting, but not
750 		/// yet authenticated
751 		connected,
752 		/// We have successfully authenticated against the server, and need to
753 		/// send QUIT to the server when closing the connection
754 		authenticated
755 	}
756 	OpenState   _open;
757 	MySQLSocket _socket;
758 
759 	SvrCapFlags _sCaps, _cCaps;
760 	uint    _sThread;
761 	ushort  _serverStatus;
762 	ubyte   _sCharSet, _protocol;
763 	string  _serverVersion;
764 
765 	string _host, _user, _pwd, _db;
766 	ushort _port;
767 
768 	MySQLSocketType _socketType;
769 
770 	OpenSocketCallbackPhobos _openSocketPhobos;
771 	OpenSocketCallbackVibeD  _openSocketVibeD;
772 
773 	ulong _insertID;
774 
775 	// This gets incremented every time a command is issued or results are purged,
776 	// so a ResultRange can tell whether it's been invalidated.
777 	ulong _lastCommandID;
778 
779 	// Whether there are rows, headers or bimary data waiting to be retreived.
780 	// MySQL protocol doesn't permit performing any other action until all
781 	// such data is read.
782 	bool _rowsPending, _headersPending, _binaryPending;
783 
784 	// Field count of last performed command.
785 	//TODO: Does Connection need to store this?
786 	ushort _fieldCount;
787 
788 	// ResultSetHeaders of last performed command.
789 	//TODO: Does Connection need to store this? Is this even used?
790 	ResultSetHeaders _rsh;
791 
792 	// This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise
793 	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
794 	// the reason why most other objects require a connection object for their construction.
795 	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
796 				/// ordering. First packet should have 0
797 	@property ubyte pktNumber()   { return _cpn; }
798 	void bumpPacket()       { _cpn++; }
799 	void resetPacket()      { _cpn = 0; }
800 
801 	version(Have_vibe_d_core) {} else
802 	pure const nothrow invariant()
803 	{
804 		assert(_socketType != MySQLSocketType.vibed);
805 	}
806 
807 	ubyte[] getPacket()
808 	{
809 		scope(failure) kill();
810 
811 		ubyte[4] header;
812 		_socket.read(header);
813 		// number of bytes always set as 24-bit
814 		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
815 		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
816 		bumpPacket();
817 
818 		ubyte[] packet = new ubyte[numDataBytes];
819 		_socket.read(packet);
820 		assert(packet.length == numDataBytes, "Wrong number of bytes read");
821 		return packet;
822 	}
823 
824 	void send(const(ubyte)[] packet)
825 	in
826 	{
827 		assert(packet.length > 4); // at least 1 byte more than header
828 	}
829 	body
830 	{
831 		_socket.write(packet);
832 	}
833 
834 	void send(const(ubyte)[] header, const(ubyte)[] data)
835 	in
836 	{
837 		assert(header.length == 4 || header.length == 5/*command type included*/);
838 	}
839 	body
840 	{
841 		_socket.write(header);
842 		if(data.length)
843 			_socket.write(data);
844 	}
845 
846 	void sendCmd(T)(CommandType cmd, const(T)[] data)
847 	in
848 	{
849 		// Internal thread states. Clients shouldn't use this
850 		assert(cmd != CommandType.SLEEP);
851 		assert(cmd != CommandType.CONNECT);
852 		assert(cmd != CommandType.TIME);
853 		assert(cmd != CommandType.DELAYED_INSERT);
854 		assert(cmd != CommandType.CONNECT_OUT);
855 
856 		// Deprecated
857 		assert(cmd != CommandType.CREATE_DB);
858 		assert(cmd != CommandType.DROP_DB);
859 		assert(cmd != CommandType.TABLE_DUMP);
860 
861 		// cannot send more than uint.max bytes. TODO: better error message if we try?
862 		assert(data.length <= uint.max);
863 	}
864 	out
865 	{
866 		// at this point we should have sent a command
867 		assert(pktNumber == 1);
868 	}
869 	body
870 	{
871 		autoPurge();
872 
873 		scope(failure) kill();
874 
875 		_lastCommandID++;
876 
877 		if(!_socket.connected)
878 		{
879 			if(cmd == CommandType.QUIT)
880 				return; // Don't bother reopening connection just to quit
881 
882 			_open = OpenState.notConnected;
883 			connect(_clientCapabilities);
884 		}
885 
886 		resetPacket();
887 
888 		ubyte[] header;
889 		header.length = 4 /*header*/ + 1 /*cmd*/;
890 		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
891 		header[4] = cmd;
892 		bumpPacket();
893 
894 		send(header, cast(const(ubyte)[])data);
895 	}
896 
897 	OKErrorPacket getCmdResponse(bool asString = false)
898 	{
899 		auto okp = OKErrorPacket(getPacket());
900 		enforcePacketOK(okp);
901 		_serverStatus = okp.serverStatus;
902 		return okp;
903 	}
904 
905 	ubyte[] buildAuthPacket(ubyte[] token)
906 	in
907 	{
908 		assert(token.length == 20);
909 	}
910 	body
911 	{
912 		ubyte[] packet;
913 		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
914 		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
915 
916 		// NOTE: we'll set the header last when we know the size
917 
918 		// Set the default capabilities required by the client
919 		_cCaps.packInto(packet[4..8]);
920 
921 		// Request a conventional maximum packet length.
922 		1.packInto(packet[8..12]);
923 
924 		packet ~= 33; // Set UTF-8 as default charSet
925 
926 		// There's a statutory block of zero bytes here - fill them in.
927 		foreach(i; 0 .. 23)
928 			packet ~= 0;
929 
930 		// Add the user name as a null terminated string
931 		foreach(i; 0 .. _user.length)
932 			packet ~= _user[i];
933 		packet ~= 0; // \0
934 
935 		// Add our calculated authentication token as a length prefixed string.
936 		assert(token.length <= ubyte.max);
937 		if(_pwd.length == 0)  // Omit the token if the account has no password
938 			packet ~= 0;
939 		else
940 		{
941 			packet ~= cast(ubyte)token.length;
942 			foreach(i; 0 .. token.length)
943 				packet ~= token[i];
944 		}
945 
946 		// Add the default database as a null terminated string
947 		foreach(i; 0 .. _db.length)
948 			packet ~= _db[i];
949 		packet ~= 0; // \0
950 
951 		// The server sent us a greeting with packet number 0, so we send the auth packet
952 		// back with the next number.
953 		packet.setPacketHeader(pktNumber);
954 		bumpPacket();
955 		return packet;
956 	}
957 
958 	void consumeServerInfo(ref ubyte[] packet)
959 	{
960 		scope(failure) kill();
961 
962 		_sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
963 		_sCharSet = packet.consume!ubyte(); // server_language
964 		_serverStatus = packet.consume!ushort(); //server_status
965 		_sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
966 		_sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
967 
968 		enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
969 		enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
970 	}
971 
972 	ubyte[] parseGreeting()
973 	{
974 		scope(failure) kill();
975 
976 		ubyte[] packet = getPacket();
977 
978 		if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
979 		{
980 			auto okp = OKErrorPacket(packet);
981 			enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
982 		}
983 
984 		_protocol = packet.consume!ubyte();
985 
986 		_serverVersion = packet.consume!string(packet.countUntil(0));
987 		packet.skip(1); // \0 terminated _serverVersion
988 
989 		_sThread = packet.consume!uint();
990 
991 		// read first part of scramble buf
992 		ubyte[] authBuf;
993 		authBuf.length = 255;
994 		authBuf[0..8] = packet.consume(8)[]; // scramble_buff
995 
996 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
997 
998 		consumeServerInfo(packet);
999 
1000 		packet.skip(1); // this byte supposed to be scramble length, but is actually zero
1001 		packet.skip(10); // filler of \0
1002 
1003 		// rest of the scramble
1004 		auto len = packet.countUntil(0);
1005 		enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
1006 		enforce(authBuf.length > 8+len);
1007 		authBuf[8..8+len] = packet.consume(len)[];
1008 		authBuf.length = 8+len; // cut to correct size
1009 		enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
1010 
1011 		return authBuf;
1012 	}
1013 
1014 	static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
1015 	{
1016 		auto s = new PlainPhobosSocket();
1017 		s.connect(new InternetAddress(host, port));
1018 		return s;
1019 	}
1020 
1021 	static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
1022 	{
1023 		version(Have_vibe_d_core)
1024 			return vibe.core.net.connectTCP(host, port);
1025 		else
1026 			assert(0);
1027 	}
1028 
1029 	void initConnection()
1030 	{
1031 		resetPacket();
1032 		final switch(_socketType)
1033 		{
1034 			case MySQLSocketType.phobos:
1035 				_socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
1036 				break;
1037 
1038 			case MySQLSocketType.vibed:
1039 				version(Have_vibe_d_core) {
1040 					_socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
1041 					break;
1042 				} else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
1043 		}
1044 	}
1045 
1046 	ubyte[] makeToken(ubyte[] authBuf)
1047 	{
1048 		auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
1049 		auto pass2 = sha1Of(pass1);
1050 
1051 		SHA1 sha1;
1052 		sha1.start();
1053 		sha1.put(authBuf);
1054 		sha1.put(pass2);
1055 		auto result = sha1.finish();
1056 		foreach (size_t i; 0..20)
1057 			result[i] = result[i] ^ pass1[i];
1058 		return result.dup;
1059 	}
1060 
1061 	SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
1062 	{
1063 		SvrCapFlags common;
1064 		uint filter = 1;
1065 		foreach (size_t i; 0..uint.sizeof)
1066 		{
1067 			bool serverSupport = (server & filter) != 0; // can the server do this capability?
1068 			bool clientSupport = (client & filter) != 0; // can we support it?
1069 			if(serverSupport && clientSupport)
1070 				common |= filter;
1071 			filter <<= 1; // check next flag
1072 		}
1073 		return common;
1074 	}
1075 
1076 	void setClientFlags(SvrCapFlags capFlags)
1077 	{
1078 		_cCaps = getCommonCapabilities(_sCaps, capFlags);
1079 
1080 		// We cannot operate in <4.1 protocol, so we'll force it even if the user
1081 		// didn't supply it
1082 		_cCaps |= SvrCapFlags.PROTOCOL41;
1083 		_cCaps |= SvrCapFlags.SECURE_CONNECTION;
1084 	}
1085 
1086 	void authenticate(ubyte[] greeting)
1087 	in
1088 	{
1089 		assert(_open == OpenState.connected);
1090 	}
1091 	out
1092 	{
1093 		assert(_open == OpenState.authenticated);
1094 	}
1095 	body
1096 	{
1097 		auto token = makeToken(greeting);
1098 		auto authPacket = buildAuthPacket(token);
1099 		send(authPacket);
1100 
1101 		auto packet = getPacket();
1102 		auto okp = OKErrorPacket(packet);
1103 		enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
1104 		_open = OpenState.authenticated;
1105 	}
1106 
1107 	SvrCapFlags _clientCapabilities;
1108 
1109 	void connect(SvrCapFlags clientCapabilities)
1110 	in
1111 	{
1112 		assert(closed);
1113 	}
1114 	out
1115 	{
1116 		assert(_open == OpenState.authenticated);
1117 	}
1118 	body
1119 	{
1120 		initConnection();
1121 		auto greeting = parseGreeting();
1122 		_open = OpenState.connected;
1123 
1124 		_clientCapabilities = clientCapabilities;
1125 		setClientFlags(clientCapabilities);
1126 		authenticate(greeting);
1127 	}
1128 	
1129 	/// Forcefully close the socket without sending the quit command.
1130 	/// Needed in case an error leaves communatations in an undefined or non-recoverable state.
1131 	void kill()
1132 	{
1133 		if(_socket.connected)
1134 			_socket.close();
1135 		_open = OpenState.notConnected;
1136 		// any pending data is gone. Any statements to release will be released
1137 		// on the server automatically.
1138 		_headersPending = _rowsPending = _binaryPending = false;
1139 		
1140 		static if(__traits(compiles, (){ int[int] aa; aa.clear(); }))
1141 			preparedLookup.clear();
1142 		else
1143 			preparedLookup = null;
1144 	}
1145 	
1146 	/// Called whenever mysql-native needs to send a command to the server
1147 	/// and be sure there aren't any pending results (which would prevent
1148 	/// a new command from being sent).
1149 	void autoPurge()
1150 	{
1151 		// This is called every time a command is sent,
1152 		// so detect & prevent infinite recursion.
1153 		static bool isAutoPurging = false;
1154 
1155 		if(isAutoPurging)
1156 			return;
1157 			
1158 		isAutoPurging = true;
1159 		scope(exit) isAutoPurging = false;
1160 
1161 		try
1162 		{
1163 			purgeResult();
1164 			releaseQueued();
1165 		}
1166 		catch(Exception e)
1167 		{
1168 			// likely the connection was closed, so reset any state.
1169 			// Don't treat this as a real error, because everything will be reset when we
1170 			// reconnect.
1171 			kill();
1172 		}
1173 	}
1174 
1175 	/// Lookup per-connection prepared statement info by SQL
1176 	PreparedServerInfo[string] preparedLookup;
1177 	
1178 	/// Set `queuedForRelease` flag for a statement in `preparedLookup`.
1179 	/// Does nothing if statement not in `preparedLookup`.
1180 	private void setQueuedForRelease(string sql, bool value)
1181 	{
1182 		if(sql in preparedLookup)
1183 		{
1184 			auto info = preparedLookup[sql];
1185 			info.queuedForRelease = value;
1186 			preparedLookup[sql] = info;
1187 		}
1188 	}
1189 
1190 	/// Queue a prepared statement for release.
1191 	void queueForRelease(string sql)
1192 	{
1193 		// If connection's closed, then it IS released.
1194 		if(closed)
1195 			return;
1196 
1197 		setQueuedForRelease(sql, true);
1198 	}
1199 
1200 	/// Remove a statement from the queue to be released.
1201 	void unqueueForRelease(string sql)
1202 	{
1203 		setQueuedForRelease(sql, false);
1204 	}
1205 
1206 	/// Releases all prepared statements that are queued for release.
1207 	void releaseQueued()
1208 	{
1209 		foreach(sql, info; preparedLookup)
1210 		if(info.queuedForRelease)
1211 		{
1212 			immediateReleasePrepared(info.statementId);
1213 			preparedLookup.remove(sql);
1214 		}
1215 	}
1216 
1217 	/// Returns null if not found
1218 	Nullable!PreparedServerInfo getPreparedServerInfo(const string sql) pure nothrow
1219 	{
1220 		Nullable!PreparedServerInfo result;
1221 		
1222 		auto pInfo = sql in preparedLookup;
1223 		if(pInfo)
1224 			result = *pInfo;
1225 		
1226 		return result;
1227 	}
1228 	
1229 	/// If already registered, simply returns the cached `PreparedServerInfo`.
1230 	PreparedServerInfo registerIfNeeded(string sql)
1231 	out(info)
1232 	{
1233 		// I'm confident this can't currently happen, but
1234 		// let's make sure that doesn't change.
1235 		assert(!info.queuedForRelease);
1236 	}
1237 	body
1238 	{
1239 		if(auto pInfo = sql in preparedLookup)
1240 		{
1241 			// The statement is registered. It may, or may not, be queued
1242 			// for release. Either way, all we need to do is make sure it's
1243 			// un-queued and then return.
1244 			pInfo.queuedForRelease = false;
1245 			return *pInfo;
1246 		}
1247 
1248 		auto info = registerIfNeededImpl(sql);
1249 		preparedLookup[sql] = info;
1250 
1251 		return info;
1252 	}
1253 
1254 	PreparedServerInfo registerIfNeededImpl(string sql)
1255 	{
1256 		scope(failure) kill();
1257 
1258 		PreparedServerInfo info;
1259 		
1260 		sendCmd(CommandType.STMT_PREPARE, sql);
1261 		_fieldCount = 0;
1262 
1263 		//TODO: All packet handling should be moved into the mysql.protocol package.
1264 		ubyte[] packet = getPacket();
1265 		if(packet.front == ResultPacketMarker.ok)
1266 		{
1267 			packet.popFront();
1268 			info.statementId    = packet.consume!int();
1269 			_fieldCount         = packet.consume!short();
1270 			info.numParams      = packet.consume!short();
1271 
1272 			packet.popFront(); // one byte filler
1273 			info.psWarnings     = packet.consume!short();
1274 
1275 			// At this point the server also sends field specs for parameters
1276 			// and columns if there were any of each
1277 			info.headers = PreparedStmtHeaders(this, _fieldCount, info.numParams);
1278 		}
1279 		else if(packet.front == ResultPacketMarker.error)
1280 		{
1281 			auto error = OKErrorPacket(packet);
1282 			enforcePacketOK(error);
1283 			assert(0); // FIXME: what now?
1284 		}
1285 		else
1286 			assert(0); // FIXME: what now?
1287 
1288 		return info;
1289 	}
1290 
1291 	private void immediateReleasePrepared(uint statementId)
1292 	{
1293 		scope(failure) kill();
1294 
1295 		if(closed())
1296 			return;
1297 
1298 		//TODO: All low-level commms should be moved into the mysql.protocol package.
1299 		ubyte[9] packet_buf;
1300 		ubyte[] packet = packet_buf;
1301 		packet.setPacketHeader(0/*packet number*/);
1302 		bumpPacket();
1303 		packet[4] = CommandType.STMT_CLOSE;
1304 		statementId.packInto(packet[5..9]);
1305 		purgeResult();
1306 		send(packet);
1307 		// It seems that the server does not find it necessary to send a response
1308 		// for this command.
1309 	}
1310 
1311 public:
1312 
1313 	/++
1314 	Construct opened connection.
1315 
1316 	Throws `mysql.exceptions.MYX` upon failure to connect.
1317 	
1318 	If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
1319 	creating a new Connection directly. That will provide certain benefits,
1320 	such as reusing old connections and automatic cleanup (no need to close
1321 	the connection when done).
1322 
1323 	------------------
1324 	// Suggested usage:
1325 
1326 	{
1327 	    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
1328 	    scope(exit) con.close();
1329 
1330 	    // Use the connection
1331 	    ...
1332 	}
1333 	------------------
1334 
1335 	Params:
1336 		cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
1337 			(TODO: The connection string needs work to allow for semicolons in its parts!)
1338 		socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
1339 			unless compiled with `-version=Have_vibe_d_core` (set automatically
1340 			if using $(LINK2 http://code.dlang.org/getting_started, DUB)).
1341 		openSocket = Optional callback which should return a newly-opened Phobos
1342 			or Vibe.d TCP socket. This allows custom sockets to be used,
1343 			subclassed from Phobos's or Vibe.d's sockets.
1344 		host = An IP address in numeric dotted form, or as a host  name.
1345 		user = The user name to authenticate.
1346 		password = User's password.
1347 		db = Desired initial database.
1348 		capFlags = The set of flag bits from the server's capabilities that the client requires
1349 	+/
1350 	//After the connection is created, and the initial invitation is received from the server
1351 	//client preferences can be set, and authentication can then be attempted.
1352 	this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1353 	{
1354 		version(Have_vibe_d_core)
1355 			enum defaultSocketType = MySQLSocketType.vibed;
1356 		else
1357 			enum defaultSocketType = MySQLSocketType.phobos;
1358 
1359 		this(defaultSocketType, host, user, pwd, db, port, capFlags);
1360 	}
1361 
1362 	///ditto
1363 	this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1364 	{
1365 		version(Have_vibe_d_core) {} else
1366 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
1367 
1368 		this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
1369 			host, user, pwd, db, port, capFlags);
1370 	}
1371 
1372 	///ditto
1373 	this(OpenSocketCallbackPhobos openSocket,
1374 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1375 	{
1376 		this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
1377 	}
1378 
1379 	version(Have_vibe_d_core)
1380 	///ditto
1381 	this(OpenSocketCallbackVibeD openSocket,
1382 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1383 	{
1384 		this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
1385 	}
1386 
1387 	///ditto
1388 	private this(MySQLSocketType socketType,
1389 		OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
1390 		string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
1391 	in
1392 	{
1393 		final switch(socketType)
1394 		{
1395 			case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
1396 			case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
1397 		}
1398 	}
1399 	body
1400 	{
1401 		enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
1402 		enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
1403 		version(Have_vibe_d_core) {} else
1404 			enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");
1405 
1406 		_socketType = socketType;
1407 		_host = host;
1408 		_user = user;
1409 		_pwd = pwd;
1410 		_db = db;
1411 		_port = port;
1412 
1413 		_openSocketPhobos = openSocketPhobos;
1414 		_openSocketVibeD  = openSocketVibeD;
1415 
1416 		connect(capFlags);
1417 	}
1418 
1419 	///ditto
1420 	//After the connection is created, and the initial invitation is received from the server
1421 	//client preferences can be set, and authentication can then be attempted.
1422 	this(string cs, SvrCapFlags capFlags = defaultClientFlags)
1423 	{
1424 		string[] a = parseConnectionString(cs);
1425 		this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1426 	}
1427 
1428 	///ditto
1429 	this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
1430 	{
1431 		string[] a = parseConnectionString(cs);
1432 		this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1433 	}
1434 
1435 	///ditto
1436 	this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
1437 	{
1438 		string[] a = parseConnectionString(cs);
1439 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1440 	}
1441 
1442 	version(Have_vibe_d_core)
1443 	///ditto
1444 	this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
1445 	{
1446 		string[] a = parseConnectionString(cs);
1447 		this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
1448 	}
1449 
1450 	/++
1451 	Check whether this `Connection` is still connected to the server, or if
1452 	the connection has been closed.
1453 	+/
1454 	@property bool closed()
1455 	{
1456 		return _open == OpenState.notConnected || !_socket.connected;
1457 	}
1458 
1459 	version(Have_vibe_d_core)
1460 	{
1461 		/// Used by Vibe.d's ConnectionPool, ignore this.
1462 		void acquire() { if( _socket ) _socket.acquire(); }
1463 		///ditto
1464 		void release() { if( _socket ) _socket.release(); }
1465 		///ditto
1466 		bool isOwner() { return _socket ? _socket.isOwner() : false; }
1467 		///ditto
1468 		bool amOwner() { return _socket ? _socket.isOwner() : false; }
1469 	}
1470 	else
1471 	{
1472 		/// Used by Vibe.d's ConnectionPool, ignore this.
1473 		void acquire() { /+ Do nothing +/ }
1474 		///ditto
1475 		void release() { /+ Do nothing +/ }
1476 		///ditto
1477 		bool isOwner() { return !!_socket; }
1478 		///ditto
1479 		bool amOwner() { return !!_socket; }
1480 	}
1481 
1482 	/++
1483 	Explicitly close the connection.
1484 	
1485 	This is a two-stage process. First tell the server we are quitting this
1486 	connection, and then close the socket.
1487 	
1488 	Idiomatic use as follows is suggested:
1489 	------------------
1490 	{
1491 	    auto con = new Connection("localhost:user:password:mysqld");
1492 	    scope(exit) con.close();
1493 	    // Use the connection
1494 	    ...
1495 	}
1496 	------------------
1497 	+/
1498 	void close()
1499 	{
1500 		if (_open == OpenState.authenticated && _socket.connected)
1501 			quit();
1502 
1503 		if (_open == OpenState.connected)
1504 			kill();
1505 		resetPacket();
1506 	}
1507 
1508 	/++
1509 	Reconnects to the server using the same connection settings originally
1510 	used to create the `Connection`.
1511 
1512 	Optionally takes a `mysql.protocol.constants.SvrCapFlags`, allowing you to
1513 	reconnect using a different set of server capability flags.
1514 
1515 	Normally, if the connection is already open, this will do nothing. However,
1516 	if you request a different set of `mysql.protocol.constants.SvrCapFlags`
1517 	then was originally used to create the `Connection`, the connection will
1518 	be closed and then reconnected using the new `mysql.protocol.constants.SvrCapFlags`.
1519 	+/
1520 	void reconnect()
1521 	{
1522 		reconnect(_clientCapabilities);
1523 	}
1524 
1525 	///ditto
1526 	void reconnect(SvrCapFlags clientCapabilities)
1527 	{
1528 		bool sameCaps = clientCapabilities == _clientCapabilities;
1529 		if(!closed)
1530 		{
1531 			// Same caps as before?
1532 			if(clientCapabilities == _clientCapabilities)
1533 				return; // Nothing to do, just keep current connection
1534 
1535 			close();
1536 		}
1537 
1538 		connect(clientCapabilities);
1539 	}
1540 
1541 	private void quit()
1542 	in
1543 	{
1544 		assert(_open == OpenState.authenticated);
1545 	}
1546 	body
1547 	{
1548 		sendCmd(CommandType.QUIT, []);
1549 		// No response is sent for a quit packet
1550 		_open = OpenState.connected;
1551 	}
1552 
1553 	/++
1554 	Parses a connection string of the form
1555 	`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`
1556 
1557 	Port is optional and defaults to 3306.
1558 
1559 	Whitespace surrounding any name or value is automatically stripped.
1560 
1561 	Returns a five-element array of strings in this order:
1562 	$(UL
1563 	$(LI [0]: host)
1564 	$(LI [1]: user)
1565 	$(LI [2]: pwd)
1566 	$(LI [3]: db)
1567 	$(LI [4]: port)
1568 	)
1569 	
1570 	(TODO: The connection string needs work to allow for semicolons in its parts!)
1571 	+/
1572 	//TODO: Replace the return value with a proper struct.
1573 	static string[] parseConnectionString(string cs)
1574 	{
1575 		string[] rv;
1576 		rv.length = 5;
1577 		rv[4] = "3306"; // Default port
1578 		string[] a = split(cs, ";");
1579 		foreach (s; a)
1580 		{
1581 			string[] a2 = split(s, "=");
1582 			enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs);
1583 			string name = strip(a2[0]);
1584 			string val = strip(a2[1]);
1585 			switch (name)
1586 			{
1587 				case "host":
1588 					rv[0] = val;
1589 					break;
1590 				case "user":
1591 					rv[1] = val;
1592 					break;
1593 				case "pwd":
1594 					rv[2] = val;
1595 					break;
1596 				case "db":
1597 					rv[3] = val;
1598 					break;
1599 				case "port":
1600 					rv[4] = val;
1601 					break;
1602 				default:
1603 					throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
1604 			}
1605 		}
1606 		return rv;
1607 	}
1608 
1609 	/++
1610 	Select a current database.
1611 	
1612 	Throws `mysql.exceptions.MYX` upon failure.
1613 
1614 	Params: dbName = Name of the requested database
1615 	+/
1616 	void selectDB(string dbName)
1617 	{
1618 		sendCmd(CommandType.INIT_DB, dbName);
1619 		getCmdResponse();
1620 		_db = dbName;
1621 	}
1622 
1623 	/++
1624 	Check the server status.
1625 	
1626 	Throws `mysql.exceptions.MYX` upon failure.
1627 
1628 	Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
1629 	+/
1630 	OKErrorPacket pingServer()
1631 	{
1632 		sendCmd(CommandType.PING, []);
1633 		return getCmdResponse();
1634 	}
1635 
1636 	/++
1637 	Refresh some feature(s) of the server.
1638 	
1639 	Throws `mysql.exceptions.MYX` upon failure.
1640 
1641 	Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
1642 	+/
1643 	OKErrorPacket refreshServer(RefreshFlags flags)
1644 	{
1645 		sendCmd(CommandType.REFRESH, [flags]);
1646 		return getCmdResponse();
1647 	}
1648 
1649 	/++
1650 	Internal - Get the next `mysql.result.Row` of a pending result set.
1651 	
1652 	This is intended to be internal, you should not use it directly.
1653 	It will not likely remain public in the future.
1654 	
1655 	Returns: A `mysql.result.Row` object.
1656 	+/
1657 	Row getNextRow()
1658 	{
1659 		scope(failure) kill();
1660 
1661 		if (_headersPending)
1662 		{
1663 			_rsh = ResultSetHeaders(this, _fieldCount);
1664 			_headersPending = false;
1665 		}
1666 		ubyte[] packet;
1667 		Row rr;
1668 		packet = getPacket();
1669 		if (packet.isEOFPacket())
1670 		{
1671 			_rowsPending = _binaryPending = false;
1672 			return rr;
1673 		}
1674 		if (_binaryPending)
1675 			rr = Row(this, packet, _rsh, true);
1676 		else
1677 			rr = Row(this, packet, _rsh, false);
1678 		//rr._valid = true;
1679 		return rr;
1680 	}
1681 
1682 	/++
1683 	Flush any outstanding result set elements.
1684 	
1685 	When the server responds to a command that produces a result set, it
1686 	queues the whole set of corresponding packets over the current connection.
1687 	Before that `Connection` can embark on any new command, it must receive
1688 	all of those packets and junk them.
1689 	
1690 	As of v1.1.4, this is done automatically as needed. But you can still
1691 	call this manually to force a purge to occur when you want.
1692 
1693 	See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
1694 	+/
1695 	ulong purgeResult()
1696 	{
1697 		scope(failure) kill();
1698 
1699 		_lastCommandID++;
1700 
1701 		ulong rows = 0;
1702 		if (_headersPending)
1703 		{
1704 			for (size_t i = 0;; i++)
1705 			{
1706 				if (getPacket().isEOFPacket())
1707 				{
1708 					_headersPending = false;
1709 					break;
1710 				}
1711 				enforceEx!MYXProtocol(i < _fieldCount,
1712 					text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
1713 			}
1714 		}
1715 		if (_rowsPending)
1716 		{
1717 			for (;;  rows++)
1718 			{
1719 				if (getPacket().isEOFPacket())
1720 				{
1721 					_rowsPending = _binaryPending = false;
1722 					break;
1723 				}
1724 			}
1725 		}
1726 		resetPacket();
1727 		return rows;
1728 	}
1729 
1730 	/++
1731 	Get a textual report on the server status.
1732 	
1733 	(COM_STATISTICS)
1734 	+/
1735 	string serverStats()
1736 	{
1737 		sendCmd(CommandType.STATISTICS, []);
1738 		return cast(string) getPacket();
1739 	}
1740 
1741 	/++
1742 	Enable multiple statement commands.
1743 	
1744 	This can be used later if this feature was not requested in the client capability flags.
1745 	
1746 	Warning: This functionality is currently untested.
1747 	
1748 	Params: on = Boolean value to turn the capability on or off.
1749 	+/
1750 	void enableMultiStatements(bool on)
1751 	{
1752 		scope(failure) kill();
1753 
1754 		ubyte[] t;
1755 		t.length = 2;
1756 		t[0] = on ? 0 : 1;
1757 		t[1] = 0;
1758 		sendCmd(CommandType.STMT_OPTION, t);
1759 
1760 		// For some reason this command gets an EOF packet as response
1761 		auto packet = getPacket();
1762 		enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1763 	}
1764 
1765 	/// Return the in-force protocol number.
1766 	@property ubyte protocol() pure const nothrow { return _protocol; }
1767 	/// Server version
1768 	@property string serverVersion() pure const nothrow { return _serverVersion; }
1769 	/// Server capability flags
1770 	@property uint serverCapabilities() pure const nothrow { return _sCaps; }
1771 	/// Server status
1772 	@property ushort serverStatus() pure const nothrow { return _serverStatus; }
1773 	/// Current character set
1774 	@property ubyte charSet() pure const nothrow { return _sCharSet; }
1775 	/// Current database
1776 	@property string currentDB() pure const nothrow { return _db; }
1777 	/// Socket type being used, Phobos or Vibe.d
1778 	@property MySQLSocketType socketType() pure const nothrow { return _socketType; }
1779 
1780 	/// After a command that inserted a row into a table with an auto-increment
1781 	/// ID column, this method allows you to retrieve the last insert ID.
1782 	@property ulong lastInsertID() pure const nothrow { return _insertID; }
1783 
1784 	/// This gets incremented every time a command is issued or results are purged,
1785 	/// so a `mysql.result.ResultRange` can tell whether it's been invalidated.
1786 	@property ulong lastCommandID() pure const nothrow { return _lastCommandID; }
1787 
1788 	/// Gets whether rows are pending.
1789 	///
1790 	/// Note, you may want `hasPending` instead.
1791 	@property bool rowsPending() pure const nothrow { return _rowsPending; }
1792 
1793 	/// Gets whether anything (rows, headers or binary) is pending.
1794 	/// New commands cannot be sent on a conncection while anything is pending
1795 	/// (the pending data will automatically be purged.)
1796 	@property bool hasPending() pure const nothrow
1797 	{
1798 		return _rowsPending || _headersPending || _binaryPending;
1799 	}
1800 
1801 	/// Gets the result header's field descriptions.
1802 	@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
1803 
1804 	/++
1805 	Manually register a prepared statement on this connection.
1806 	
1807 	Does nothing if statement is already registered on this connection.
1808 	
1809 	Calling this is not strictly necessary, as the prepared statement will
1810 	automatically be registered upon its first use on any `Connection`.
1811 	This is provided for those who prefer eager registration over lazy
1812 	for performance reasons.
1813 	+/
1814 	void register(Prepared prepared)
1815 	{
1816 		registerIfNeeded(prepared.sql);
1817 	}
1818 
1819 	/++
1820 	Manually release a prepared statement on this connection.
1821 	
1822 	This method tells the server that it can dispose of the information it
1823 	holds about the current prepared statement.
1824 	
1825 	Calling this is not strictly necessary. The server considers prepared
1826 	statements to be per-connection, so they'll go away when the connection
1827 	closes anyway. This is provided in case direct control is actually needed.
1828 
1829 	Due to the internal "queued for release" system, this MAY CAUSE ALLOCATIONS,
1830 	and therefore CANNOT BE CALLED SAFELY FROM A DESTRUCTOR in case the
1831 	destructor gets triggered during a GC cycle. See issue
1832 	$(LINK2 https://github.com/mysql-d/mysql-native/issues/159, #159)
1833 	for details of this problem.
1834 	
1835 	Notes:
1836 	
1837 	In actuality, the server might not immediately be told to release the
1838 	statement (although `isRegistered` will still report `false`).
1839 	
1840 	This is because there could be a `mysql.result.ResultRange` with results
1841 	still pending for retrieval, and the protocol doesn't allow sending commands
1842 	(such as "release a prepared statement") to the server while data is pending.
1843 	Therefore, this function may instead queue the statement to be released
1844 	when it is safe to do so: Either the next time a result set is purged or
1845 	the next time a command (such as `mysql.commands.query` or
1846 	`mysql.commands.exec`) is performed (because such commands automatically
1847 	purge any pending results).
1848 	
1849 	This function does NOT auto-purge because, if this is ever called from
1850 	automatic resource management cleanup (refcounting, RAII, etc), that
1851 	would create ugly situations where hidden, implicit behavior triggers
1852 	an unexpected auto-purge.
1853 	+/
1854 	void release(Prepared prepared)
1855 	{
1856 		release(prepared.sql);
1857 	}
1858 	
1859 	///ditto
1860 	void release(string sql)
1861 	{
1862 		//TODO: Don't queue it if nothing is pending. Just do it immediately.
1863 		//      But need to be certain both situations are unittested.
1864 		queueForRelease(sql);
1865 	}
1866 	
1867 	/// Is the given SQL registered on this connection as a prepared statement?
1868 	bool isRegistered(Prepared prepared)
1869 	{
1870 		return isRegistered( prepared.sql );
1871 	}
1872 
1873 	///ditto
1874 	bool isRegistered(string sql)
1875 	{
1876 		return isRegistered( getPreparedServerInfo(sql) );
1877 	}
1878 
1879 	///ditto
1880 	package bool isRegistered(Nullable!PreparedServerInfo info)
1881 	{
1882 		return !info.isNull && !info.queuedForRelease;
1883 	}
1884 }
1885 
1886 // Test register, release, isRegistered, and auto-register for prepared statements
1887 debug(MYSQLN_TESTS)
1888 unittest
1889 {
1890 	import mysql.connection;
1891 	import mysql.test.common;
1892 	
1893 	Prepared preparedInsert;
1894 	Prepared preparedSelect;
1895 	immutable insertSQL = "INSERT INTO `autoRegistration` VALUES (1), (2)";
1896 	immutable selectSQL = "SELECT `val` FROM `autoRegistration`";
1897 	int queryTupleResult;
1898 	
1899 	{
1900 		mixin(scopedCn);
1901 		
1902 		// Setup
1903 		cn.exec("DROP TABLE IF EXISTS `autoRegistration`");
1904 		cn.exec("CREATE TABLE `autoRegistration` (
1905 			`val` INTEGER
1906 		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
1907 
1908 		// Initial register
1909 		preparedInsert = cn.prepare(insertSQL);
1910 		preparedSelect = cn.prepare(selectSQL);
1911 		
1912 		// Test basic register, release, isRegistered
1913 		assert(cn.isRegistered(preparedInsert));
1914 		assert(cn.isRegistered(preparedSelect));
1915 		cn.release(preparedInsert);
1916 		cn.release(preparedSelect);
1917 		assert(!cn.isRegistered(preparedInsert));
1918 		assert(!cn.isRegistered(preparedSelect));
1919 		
1920 		// Test manual re-register
1921 		cn.register(preparedInsert);
1922 		cn.register(preparedSelect);
1923 		assert(cn.isRegistered(preparedInsert));
1924 		assert(cn.isRegistered(preparedSelect));
1925 		
1926 		// Test double register
1927 		cn.register(preparedInsert);
1928 		cn.register(preparedSelect);
1929 		assert(cn.isRegistered(preparedInsert));
1930 		assert(cn.isRegistered(preparedSelect));
1931 
1932 		// Test double release
1933 		cn.release(preparedInsert);
1934 		cn.release(preparedSelect);
1935 		assert(!cn.isRegistered(preparedInsert));
1936 		assert(!cn.isRegistered(preparedSelect));
1937 		cn.release(preparedInsert);
1938 		cn.release(preparedSelect);
1939 		assert(!cn.isRegistered(preparedInsert));
1940 		assert(!cn.isRegistered(preparedSelect));
1941 	}
1942 
1943 	// Note that at this point, both prepared statements still exist,
1944 	// but are no longer registered on any connection. In fact, there
1945 	// are no open connections anymore.
1946 	
1947 	// Test auto-register: exec
1948 	{
1949 		mixin(scopedCn);
1950 	
1951 		assert(!cn.isRegistered(preparedInsert));
1952 		cn.exec(preparedInsert);
1953 		assert(cn.isRegistered(preparedInsert));
1954 	}
1955 	
1956 	// Test auto-register: query
1957 	{
1958 		mixin(scopedCn);
1959 	
1960 		assert(!cn.isRegistered(preparedSelect));
1961 		cn.query(preparedSelect).each();
1962 		assert(cn.isRegistered(preparedSelect));
1963 	}
1964 	
1965 	// Test auto-register: queryRow
1966 	{
1967 		mixin(scopedCn);
1968 	
1969 		assert(!cn.isRegistered(preparedSelect));
1970 		cn.queryRow(preparedSelect);
1971 		assert(cn.isRegistered(preparedSelect));
1972 	}
1973 	
1974 	// Test auto-register: queryRowTuple
1975 	{
1976 		mixin(scopedCn);
1977 	
1978 		assert(!cn.isRegistered(preparedSelect));
1979 		cn.queryRowTuple(preparedSelect, queryTupleResult);
1980 		assert(cn.isRegistered(preparedSelect));
1981 	}
1982 	
1983 	// Test auto-register: queryValue
1984 	{
1985 		mixin(scopedCn);
1986 	
1987 		assert(!cn.isRegistered(preparedSelect));
1988 		cn.queryValue(preparedSelect);
1989 		assert(cn.isRegistered(preparedSelect));
1990 	}
1991 }
1992 
1993 // An attempt to reproduce issue #81: Using mysql-native driver with no default database
1994 // I'm unable to actually reproduce the error, though.
1995 debug(MYSQLN_TESTS)
1996 unittest
1997 {
1998 	import mysql.escape;
1999 	mixin(scopedCn);
2000 	
2001 	cn.exec("DROP TABLE IF EXISTS `issue81`");
2002 	cn.exec("CREATE TABLE `issue81` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8");
2003 	cn.exec("INSERT INTO `issue81` (a) VALUES (1)");
2004 
2005 	auto cn2 = new Connection(text("host=", cn._host, ";port=", cn._port, ";user=", cn._user, ";pwd=", cn._pwd));
2006 	scope(exit) cn2.close();
2007 	
2008 	cn2.query("SELECT * FROM `"~mysqlEscape(cn._db).text~"`.`issue81`");
2009 }
2010 
2011 // Regression test for Issue #154:
2012 // autoPurge can throw an exception if the socket was closed without purging
2013 //
2014 // This simulates a disconnect by closing the socket underneath the Connection
2015 // object itself.
2016 debug(MYSQLN_TESTS)
2017 unittest
2018 {
2019 	mixin(scopedCn);
2020 
2021 	cn.exec("DROP TABLE IF EXISTS `dropConnection`");
2022 	cn.exec("CREATE TABLE `dropConnection` (
2023 		`val` INTEGER
2024 	) ENGINE=InnoDB DEFAULT CHARSET=utf8");
2025 	cn.exec("INSERT INTO `dropConnection` VALUES (1), (2), (3)");
2026 	import mysql.prepared;
2027 	{
2028 		auto prep = cn.prepare("SELECT * FROM `dropConnection`");
2029 		cn.query(prep);
2030 	}
2031 	// close the socket forcibly
2032 	cn._socket.close();
2033 	// this should still work (it should reconnect).
2034 	cn.exec("DROP TABLE `dropConnection`");
2035 }
2036 
2037 /+
2038 Test Prepared's ability to be safely refcount-released during a GC cycle
2039 (ie, `Connection.release` must not allocate GC memory).
2040 
2041 While this test does succeed for me, it is currently disabled because it's
2042 not guaranteed to always work:
2043 
2044 Queuing a prepared statement for release currently involves indexing an
2045 associative array (to access `Connection.preparedLookup[xx].queuedForRelease`).
2046 Attempts at @nogc-ing `Connection.release` revealed that, according to DMD:
2047 "indexing an associative array...may cause GC allocation".
2048 
2049 Ultimately, to fix this, `Connection.release` must become @nogc, and the
2050 only ways I see to do that involve algorithmic time complexity that's
2051 just not worth the questionable benefit of releasing prepared statements
2052 within a connection's lifetime.
2053 
2054 For more info, see issue #159: https://github.com/mysql-d/mysql-native/issues/159
2055 +/
2056 version(none)
2057 debug(MYSQLN_TESTS)
2058 {
2059 	/// Proof-of-concept ref-counted Prepared wrapper, just for testing,
2060 	/// not really intended for actual use.
2061 	private struct RCPreparedPayload
2062 	{
2063 		Prepared prepared;
2064 		Connection conn; // Connection to be released from
2065 
2066 		alias prepared this;
2067 
2068 		@disable this(this); // not copyable
2069 		~this()
2070 		{
2071 			// There are a couple calls to this dtor where `conn` happens to be null.
2072 			if(conn is null)
2073 				return;
2074 
2075 			assert(conn.isRegistered(prepared));
2076 			conn.release(prepared);
2077 		}
2078 	}
2079 	///ditto
2080 	alias RCPrepared = RefCounted!(RCPreparedPayload, RefCountedAutoInitialize.no);
2081 	///ditto
2082 	private RCPrepared rcPrepare(Connection conn, string sql)
2083 	{
2084 		import std.algorithm.mutation : move;
2085 
2086 		auto prepared = conn.prepare(sql);
2087 		auto payload = RCPreparedPayload(prepared, conn);
2088 		return refCounted(move(payload));
2089 	}
2090 
2091 	unittest
2092 	{
2093 		import core.memory;
2094 		mixin(scopedCn);
2095 		
2096 		cn.exec("DROP TABLE IF EXISTS `rcPrepared`");
2097 		cn.exec("CREATE TABLE `rcPrepared` (
2098 			`val` INTEGER
2099 		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
2100 		cn.exec("INSERT INTO `rcPrepared` VALUES (1), (2), (3)");
2101 
2102 		// Define this in outer scope to guarantee data is left pending when
2103 		// RCPrepared's payload is collected. This will guarantee
2104 		// that Connection will need to queue the release.
2105 		ResultRange rows;
2106 
2107 		void bar()
2108 		{
2109 			class Foo { RCPrepared p; }
2110 			auto foo = new Foo();
2111 
2112 			auto rcStmt = cn.rcPrepare("SELECT * FROM `rcPrepared`");
2113 			foo.p = rcStmt;
2114 			rows = cn.query(rcStmt);
2115 
2116 			/+
2117 			At this point, there are two references to the prepared statement:
2118 			One in a `Foo` object (currently bound to `foo`), and one on the stack.
2119 
2120 			Returning from this function will destroy the one on the stack,
2121 			and deterministically reduce the refcount to 1.
2122 
2123 			So, right here we set `foo` to null to *keep* the Foo object's
2124 			reference to the prepared statement, but set adrift the Foo object
2125 			itself, ready to be destroyed (along with the only remaining
2126 			prepared statement reference it contains) by the next GC cycle.
2127 
2128 			Thus, `RCPreparedPayload.~this` and `Connection.release(Prepared)`
2129 			will be executed during a GC cycle...and had better not perform
2130 			any allocations, or else...boom!
2131 			+/
2132 			foo = null;
2133 		}
2134 
2135 		bar();
2136 		assert(cn.hasPending); // Ensure Connection is forced to queue the release.
2137 		GC.collect(); // `Connection.release(Prepared)` better not be allocating, or boom!
2138 	}
2139 }