1 /++
2 Internal - Low-level communications.
3 
4 Consider this module the main entry point for the low-level MySQL/MariaDB
5 protocol code. The other modules in `mysql.protocol` are mainly tools
6 to support this module.
7 
8 Previously, the code handling low-level protocol details was scattered all
9 across the library. Such functionality has been factored out into this module,
10 to be kept in one place for better encapsulation and to facilitate further
11 cleanup and refactoring.
12 
13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it
14 eventually settles into what will eventually become a low-level library
15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight...
16 
17 Next tasks for this sub-package's cleanup:
18 - Reduce this module's reliance on Connection.
19 - Abstract out a PacketStream to clean up getPacket and related functionality.
20 +/
21 module mysql.protocol.comms;
22 
23 import std.algorithm;
24 import std.conv;
25 import std.digest.sha;
26 import std.exception;
27 import std.range;
28 import std.variant;
29 
30 import mysql.connection;
31 import mysql.exceptions;
32 import mysql.prepared;
33 import mysql.result;
34 import mysql.types;
35 
36 import mysql.protocol.constants;
37 import mysql.protocol.extra_types;
38 import mysql.protocol.packet_helpers;
39 import mysql.protocol.packets;
40 import mysql.protocol.sockets;
41 
42 /// Low-level comms code relating to prepared statements.
43 package struct ProtocolPrepared
44 {
45 	import std.conv;
46 	import std.datetime;
47 	import std.variant;
48 	import mysql.types;
49 	
50 	static ubyte[] makeBitmap(in Variant[] inParams)
51 	{
52 		size_t bml = (inParams.length+7)/8;
53 		ubyte[] bma;
54 		bma.length = bml;
55 		foreach (i; 0..inParams.length)
56 		{
57 			if(inParams[i].type != typeid(typeof(null)))
58 				continue;
59 			size_t bn = i/8;
60 			size_t bb = i%8;
61 			ubyte sr = 1;
62 			sr <<= bb;
63 			bma[bn] |= sr;
64 		}
65 		return bma;
66 	}
67 
68 	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
69 	{
70 		ubyte[] prefix;
71 		prefix.length = 14;
72 
73 		prefix[4] = CommandType.STMT_EXECUTE;
74 		hStmt.packInto(prefix[5..9]);
75 		prefix[9] = flags;   // flags, no cursor
76 		prefix[10] = 1; // iteration count - currently always 1
77 		prefix[11] = 0;
78 		prefix[12] = 0;
79 		prefix[13] = 0;
80 
81 		return prefix;
82 	}
83 
84 	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
85 		out ubyte[] vals, out bool longData)
86 	{
87 		size_t pc = inParams.length;
88 		ubyte[] types;
89 		types.length = pc*2;
90 		size_t alloc = pc*20;
91 		vals.length = alloc;
92 		uint vcl = 0, len;
93 		int ct = 0;
94 
95 		void reAlloc(size_t n)
96 		{
97 			if (vcl+n < alloc)
98 				return;
99 			size_t inc = (alloc*3)/2;
100 			if (inc <  n)
101 				inc = n;
102 			alloc += inc;
103 			vals.length = alloc;
104 		}
105 
106 		foreach (size_t i; 0..pc)
107 		{
108 			enum UNSIGNED  = 0x80;
109 			enum SIGNED    = 0;
110 			if (psa[i].chunkSize)
111 				longData= true;
112 			if (inParams[i].type == typeid(typeof(null)))
113 			{
114 				types[ct++] = SQLType.NULL;
115 				types[ct++] = SIGNED;
116 				continue;
117 			}
118 			Variant v = inParams[i];
119 			SQLType ext = psa[i].type;
120 			string ts = v.type.toString();
121 			bool isRef;
122 			if (ts[$-1] == '*')
123 			{
124 				ts.length = ts.length-1;
125 				isRef= true;
126 			}
127 
128 			switch (ts)
129 			{
130 				case "bool":
131 				case "const(bool)":
132 				case "immutable(bool)":
133 				case "shared(immutable(bool))":
134 					if (ext == SQLType.INFER_FROM_D_TYPE)
135 						types[ct++] = SQLType.BIT;
136 					else
137 						types[ct++] = cast(ubyte) ext;
138 					types[ct++] = SIGNED;
139 					reAlloc(2);
140 					bool bv = isRef? *(v.get!(const(bool*))): v.get!(const(bool));
141 					vals[vcl++] = 1;
142 					vals[vcl++] = bv? 0x31: 0x30;
143 					break;
144 				case "byte":
145 				case "const(byte)":
146 				case "immutable(byte)":
147 				case "shared(immutable(byte))":
148 					types[ct++] = SQLType.TINY;
149 					types[ct++] = SIGNED;
150 					reAlloc(1);
151 					vals[vcl++] = isRef? *(v.get!(const(byte*))): v.get!(const(byte));
152 					break;
153 				case "ubyte":
154 				case "const(ubyte)":
155 				case "immutable(ubyte)":
156 				case "shared(immutable(ubyte))":
157 					types[ct++] = SQLType.TINY;
158 					types[ct++] = UNSIGNED;
159 					reAlloc(1);
160 					vals[vcl++] = isRef? *(v.get!(const(ubyte*))): v.get!(const(ubyte));
161 					break;
162 				case "short":
163 				case "const(short)":
164 				case "immutable(short)":
165 				case "shared(immutable(short))":
166 					types[ct++] = SQLType.SHORT;
167 					types[ct++] = SIGNED;
168 					reAlloc(2);
169 					short si = isRef? *(v.get!(const(short*))): v.get!(const(short));
170 					vals[vcl++] = cast(ubyte) (si & 0xff);
171 					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
172 					break;
173 				case "ushort":
174 				case "const(ushort)":
175 				case "immutable(ushort)":
176 				case "shared(immutable(ushort))":
177 					types[ct++] = SQLType.SHORT;
178 					types[ct++] = UNSIGNED;
179 					reAlloc(2);
180 					ushort us = isRef? *(v.get!(const(ushort*))): v.get!(const(ushort));
181 					vals[vcl++] = cast(ubyte) (us & 0xff);
182 					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
183 					break;
184 				case "int":
185 				case "const(int)":
186 				case "immutable(int)":
187 				case "shared(immutable(int))":
188 					types[ct++] = SQLType.INT;
189 					types[ct++] = SIGNED;
190 					reAlloc(4);
191 					int ii = isRef? *(v.get!(const(int*))): v.get!(const(int));
192 					vals[vcl++] = cast(ubyte) (ii & 0xff);
193 					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
194 					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
195 					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
196 					break;
197 				case "uint":
198 				case "const(uint)":
199 				case "immutable(uint)":
200 				case "shared(immutable(uint))":
201 					types[ct++] = SQLType.INT;
202 					types[ct++] = UNSIGNED;
203 					reAlloc(4);
204 					uint ui = isRef? *(v.get!(const(uint*))): v.get!(const(uint));
205 					vals[vcl++] = cast(ubyte) (ui & 0xff);
206 					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
207 					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
208 					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
209 					break;
210 				case "long":
211 				case "const(long)":
212 				case "immutable(long)":
213 				case "shared(immutable(long))":
214 					types[ct++] = SQLType.LONGLONG;
215 					types[ct++] = SIGNED;
216 					reAlloc(8);
217 					long li = isRef? *(v.get!(const(long*))): v.get!(const(long));
218 					vals[vcl++] = cast(ubyte) (li & 0xff);
219 					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
220 					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
221 					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
222 					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
223 					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
224 					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
225 					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
226 					break;
227 				case "ulong":
228 				case "const(ulong)":
229 				case "immutable(ulong)":
230 				case "shared(immutable(ulong))":
231 					types[ct++] = SQLType.LONGLONG;
232 					types[ct++] = UNSIGNED;
233 					reAlloc(8);
234 					ulong ul = isRef? *(v.get!(const(ulong*))): v.get!(const(ulong));
235 					vals[vcl++] = cast(ubyte) (ul & 0xff);
236 					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
237 					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
238 					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
239 					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
240 					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
241 					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
242 					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
243 					break;
244 				case "float":
245 				case "const(float)":
246 				case "immutable(float)":
247 				case "shared(immutable(float))":
248 					types[ct++] = SQLType.FLOAT;
249 					types[ct++] = SIGNED;
250 					reAlloc(4);
251 					float f = isRef? *(v.get!(const(float*))): v.get!(const(float));
252 					ubyte* ubp = cast(ubyte*) &f;
253 					vals[vcl++] = *ubp++;
254 					vals[vcl++] = *ubp++;
255 					vals[vcl++] = *ubp++;
256 					vals[vcl++] = *ubp;
257 					break;
258 				case "double":
259 				case "const(double)":
260 				case "immutable(double)":
261 				case "shared(immutable(double))":
262 					types[ct++] = SQLType.DOUBLE;
263 					types[ct++] = SIGNED;
264 					reAlloc(8);
265 					double d = isRef? *(v.get!(const(double*))): v.get!(const(double));
266 					ubyte* ubp = cast(ubyte*) &d;
267 					vals[vcl++] = *ubp++;
268 					vals[vcl++] = *ubp++;
269 					vals[vcl++] = *ubp++;
270 					vals[vcl++] = *ubp++;
271 					vals[vcl++] = *ubp++;
272 					vals[vcl++] = *ubp++;
273 					vals[vcl++] = *ubp++;
274 					vals[vcl++] = *ubp;
275 					break;
276 				case "std.datetime.date.Date":
277 				case "const(std.datetime.date.Date)":
278 				case "immutable(std.datetime.date.Date)":
279 				case "shared(immutable(std.datetime.date.Date))":
280 
281 				case "std.datetime.Date":
282 				case "const(std.datetime.Date)":
283 				case "immutable(std.datetime.Date)":
284 				case "shared(immutable(std.datetime.Date))":
285 					types[ct++] = SQLType.DATE;
286 					types[ct++] = SIGNED;
287 					Date date = isRef? *(v.get!(const(Date*))): v.get!(const(Date));
288 					ubyte[] da = pack(date);
289 					size_t l = da.length;
290 					reAlloc(l);
291 					vals[vcl..vcl+l] = da[];
292 					vcl += l;
293 					break;
294 				case "std.datetime.TimeOfDay":
295 				case "const(std.datetime.TimeOfDay)":
296 				case "immutable(std.datetime.TimeOfDay)":
297 				case "shared(immutable(std.datetime.TimeOfDay))":
298 
299 				case "std.datetime.date.TimeOfDay":
300 				case "const(std.datetime.date.TimeOfDay)":
301 				case "immutable(std.datetime.date.TimeOfDay)":
302 				case "shared(immutable(std.datetime.date.TimeOfDay))":
303 
304 				case "std.datetime.Time":
305 				case "const(std.datetime.Time)":
306 				case "immutable(std.datetime.Time)":
307 				case "shared(immutable(std.datetime.Time))":
308 					types[ct++] = SQLType.TIME;
309 					types[ct++] = SIGNED;
310 					TimeOfDay time = isRef? *(v.get!(const(TimeOfDay*))): v.get!(const(TimeOfDay));
311 					ubyte[] ta = pack(time);
312 					size_t l = ta.length;
313 					reAlloc(l);
314 					vals[vcl..vcl+l] = ta[];
315 					vcl += l;
316 					break;
317 				case "std.datetime.date.DateTime":
318 				case "const(std.datetime.date.DateTime)":
319 				case "immutable(std.datetime.date.DateTime)":
320 				case "shared(immutable(std.datetime.date.DateTime))":
321 
322 				case "std.datetime.DateTime":
323 				case "const(std.datetime.DateTime)":
324 				case "immutable(std.datetime.DateTime)":
325 				case "shared(immutable(std.datetime.DateTime))":
326 					types[ct++] = SQLType.DATETIME;
327 					types[ct++] = SIGNED;
328 					DateTime dt = isRef? *(v.get!(const(DateTime*))): v.get!(const(DateTime));
329 					ubyte[] da = pack(dt);
330 					size_t l = da.length;
331 					reAlloc(l);
332 					vals[vcl..vcl+l] = da[];
333 					vcl += l;
334 					break;
335 				case "mysql.types.Timestamp":
336 				case "const(mysql.types.Timestamp)":
337 				case "immutable(mysql.types.Timestamp)":
338 				case "shared(immutable(mysql.types.Timestamp))":
339 					types[ct++] = SQLType.TIMESTAMP;
340 					types[ct++] = SIGNED;
341 					Timestamp tms = isRef? *(v.get!(const(Timestamp*))): v.get!(const(Timestamp));
342 					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep).getDateTime;
343 					ubyte[] da = pack(dt);
344 					size_t l = da.length;
345 					reAlloc(l);
346 					vals[vcl..vcl+l] = da[];
347 					vcl += l;
348 					break;
349 				case "char[]":
350 				case "const(char[])":
351 				case "immutable(char[])":
352 				case "const(char)[]":
353 				case "immutable(char)[]":
354 				case "shared(immutable(char)[])":
355 				case "shared(immutable(char))[]":
356 				case "shared(immutable(char[]))":
357 					if (ext == SQLType.INFER_FROM_D_TYPE)
358 						types[ct++] = SQLType.VARCHAR;
359 					else
360 						types[ct++] = cast(ubyte) ext;
361 					types[ct++] = SIGNED;
362 					const char[] ca = isRef? *(v.get!(const(char[]*))): v.get!(const(char[]));
363 					ubyte[] packed = packLCS(cast(void[]) ca);
364 					reAlloc(packed.length);
365 					vals[vcl..vcl+packed.length] = packed[];
366 					vcl += packed.length;
367 					break;
368 				case "byte[]":
369 				case "const(byte[])":
370 				case "immutable(byte[])":
371 				case "const(byte)[]":
372 				case "immutable(byte)[]":
373 				case "shared(immutable(byte)[])":
374 				case "shared(immutable(byte))[]":
375 				case "shared(immutable(byte[]))":
376 					if (ext == SQLType.INFER_FROM_D_TYPE)
377 						types[ct++] = SQLType.TINYBLOB;
378 					else
379 						types[ct++] = cast(ubyte) ext;
380 					types[ct++] = SIGNED;
381 					const byte[] ba = isRef? *(v.get!(const(byte[]*))): v.get!(const(byte[]));
382 					ubyte[] packed = packLCS(cast(void[]) ba);
383 					reAlloc(packed.length);
384 					vals[vcl..vcl+packed.length] = packed[];
385 					vcl += packed.length;
386 					break;
387 				case "ubyte[]":
388 				case "const(ubyte[])":
389 				case "immutable(ubyte[])":
390 				case "const(ubyte)[]":
391 				case "immutable(ubyte)[]":
392 				case "shared(immutable(ubyte)[])":
393 				case "shared(immutable(ubyte))[]":
394 				case "shared(immutable(ubyte[]))":
395 					if (ext == SQLType.INFER_FROM_D_TYPE)
396 						types[ct++] = SQLType.TINYBLOB;
397 					else
398 						types[ct++] = cast(ubyte) ext;
399 					types[ct++] = SIGNED;
400 					const ubyte[] uba = isRef? *(v.get!(const(ubyte[]*))): v.get!(const(ubyte[]));
401 					ubyte[] packed = packLCS(cast(void[]) uba);
402 					reAlloc(packed.length);
403 					vals[vcl..vcl+packed.length] = packed[];
404 					vcl += packed.length;
405 					break;
406 				case "void":
407 					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
408 				default:
409 					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
410 			}
411 		}
412 		vals.length = vcl;
413 		return types;
414 	}
415 
416 	static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
417 	{
418 		assert(psa.length <= ushort.max); // parameter number is sent as short
419 		foreach (ushort i, PSN psn; psa)
420 		{
421 			if (!psn.chunkSize) continue;
422 			uint cs = psn.chunkSize;
423 			uint delegate(ubyte[]) dg = psn.chunkDelegate;
424 
425 			ubyte[] chunk;
426 			chunk.length = cs+11;
427 			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
428 			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
429 			hStmt.packInto(chunk[5..9]); // statement handle
430 			packInto(i, chunk[9..11]); // parameter number
431 
432 			// byte 11 on is payload
433 			for (;;)
434 			{
435 				uint sent = dg(chunk[11..cs+11]);
436 				if (sent < cs)
437 				{
438 					if (sent == 0)    // data was exact multiple of chunk size - all sent
439 						break;
440 					chunk.length = chunk.length - (cs-sent);     // trim the chunk
441 					sent += 7;        // adjust for non-payload bytes
442 					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
443 					socket.send(chunk);
444 					break;
445 				}
446 				socket.send(chunk);
447 			}
448 		}
449 	}
450 
451 	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
452 		Variant[] inParams, ParameterSpecialization[] psa)
453 	{
454 		conn.autoPurge();
455 		
456 		ubyte[] packet;
457 		conn.resetPacket();
458 
459 		ubyte[] prefix = makePSPrefix(hStmt, 0);
460 		size_t len = prefix.length;
461 		bool longData;
462 
463 		if (psh.paramCount)
464 		{
465 			ubyte[] one = [ 1 ];
466 			ubyte[] vals;
467 			ubyte[] types = analyseParams(inParams, psa, vals, longData);
468 			ubyte[] nbm = makeBitmap(inParams);
469 			packet = prefix ~ nbm ~ one ~ types ~ vals;
470 		}
471 		else
472 			packet = prefix;
473 
474 		if (longData)
475 			sendLongData(conn._socket, hStmt, psa);
476 
477 		assert(packet.length <= uint.max);
478 		packet.setPacketHeader(conn.pktNumber);
479 		conn.bumpPacket();
480 		conn._socket.send(packet);
481 	}
482 }
483 
484 package(mysql) struct ExecQueryImplInfo
485 {
486 	bool isPrepared;
487 
488 	// For non-prepared statements:
489 	const(char[]) sql;
490 
491 	// For prepared statements:
492 	uint hStmt;
493 	PreparedStmtHeaders psh;
494 	Variant[] inParams;
495 	ParameterSpecialization[] psa;
496 }
497 
498 /++
499 Internal implementation for the exec and query functions.
500 
501 Execute a one-off SQL command.
502 
503 Any result set can be accessed via Connection.getNextRow(), but you should really be
504 using the query function for such queries.
505 
506 Params: ra = An out parameter to receive the number of rows affected.
507 Returns: true if there was a (possibly empty) result set.
508 +/
509 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
510 {
511 	scope(failure) conn.kill();
512 
513 	// Send data
514 	if(info.isPrepared)
515 		ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
516 	else
517 	{
518 		conn.sendCmd(CommandType.QUERY, info.sql);
519 		conn._fieldCount = 0;
520 	}
521 
522 	// Handle response
523 	ubyte[] packet = conn.getPacket();
524 	bool rv;
525 	if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
526 	{
527 		conn.resetPacket();
528 		auto okp = OKErrorPacket(packet);
529 		enforcePacketOK(okp);
530 		ra = okp.affected;
531 		conn._serverStatus = okp.serverStatus;
532 		conn._insertID = okp.insertID;
533 		rv = false;
534 	}
535 	else
536 	{
537 		// There was presumably a result set
538 		assert(packet.front >= 1 && packet.front <= 250); // Result set packet header should have this value
539 		conn._headersPending = conn._rowsPending = true;
540 		conn._binaryPending = info.isPrepared;
541 		auto lcb = packet.consumeIfComplete!LCB();
542 		assert(!lcb.isNull);
543 		assert(!lcb.isIncomplete);
544 		conn._fieldCount = cast(ushort)lcb.value;
545 		assert(conn._fieldCount == lcb.value);
546 		rv = true;
547 		ra = 0;
548 	}
549 	return rv;
550 }
551 
552 ///ditto
553 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
554 {
555 	ulong rowsAffected;
556 	return execQueryImpl(conn, info, rowsAffected);
557 }
558 
559 package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
560 {
561 	scope(failure) conn.kill();
562 
563 	if(conn.closed())
564 		return;
565 
566 	ubyte[9] packet_buf;
567 	ubyte[] packet = packet_buf;
568 	packet.setPacketHeader(0/*packet number*/);
569 	conn.bumpPacket();
570 	packet[4] = CommandType.STMT_CLOSE;
571 	statementId.packInto(packet[5..9]);
572 	conn.purgeResult();
573 	conn._socket.send(packet);
574 	// It seems that the server does not find it necessary to send a response
575 	// for this command.
576 }
577 
578 // Moved here from `struct Row`
579 package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
580 {
581 	uint bitmapLength = calcBitmapLength(fieldCount);
582 	enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields");
583 	auto bitmap = packet.consume(bitmapLength);
584 	return decodeNullBitmap(bitmap, fieldCount);
585 }
586 
587 // Moved here from `struct Row`
588 private static uint calcBitmapLength(uint fieldCount) pure nothrow
589 {
590 	return (fieldCount+7+2)/8;
591 }
592 
593 // Moved here from `struct Row`
594 // This is to decode the bitmap in a binary result row. First two bits are skipped
595 private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
596 in
597 {
598 	assert(bitmap.length >= calcBitmapLength(numFields),
599 		"bitmap not large enough to store all null fields");
600 }
601 out(result)
602 {
603 	assert(result.length == numFields);
604 }
605 body
606 {
607 	bool[] nulls;
608 	nulls.length = numFields;
609 
610 	// the current byte we are processing for nulls
611 	ubyte bits = bitmap.front();
612 	// strip away the first two bits as they are reserved
613 	bits >>= 2;
614 	// .. and then we only have 6 bits left to process for this byte
615 	ubyte bitsLeftInByte = 6;
616 	foreach(ref isNull; nulls)
617 	{
618 		assert(bitsLeftInByte <= 8);
619 		// processed all bits? fetch new byte
620 		if (bitsLeftInByte == 0)
621 		{
622 			assert(bits == 0, "not all bits are processed!");
623 			assert(!bitmap.empty, "bits array too short for number of columns");
624 			bitmap.popFront();
625 			bits = bitmap.front;
626 			bitsLeftInByte = 8;
627 		}
628 		assert(bitsLeftInByte > 0);
629 		isNull = (bits & 0b0000_0001) != 0;
630 
631 		// get ready to process next bit
632 		bits >>= 1;
633 		--bitsLeftInByte;
634 	}
635 	return nulls;
636 }
637 
638 // Moved here from `struct Row.this`
639 package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
640 	out Variant[] _values, out bool[] _nulls)
641 in
642 {
643 	assert(rh.fieldCount <= uint.max);
644 }
645 body
646 {
647 	scope(failure) conn.kill();
648 
649 	uint fieldCount = cast(uint)rh.fieldCount;
650 	_values.length = _nulls.length = fieldCount;
651 
652 	if(binary)
653 	{
654 		// There's a null byte header on a binary result sequence, followed by some bytes of bitmap
655 		// indicating which columns are null
656 		enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
657 		packet.popFront();
658 		_nulls = consumeNullBitmap(packet, fieldCount);
659 	}
660 
661 	foreach(size_t i; 0..fieldCount)
662 	{
663 		if(binary && _nulls[i])
664 		{
665 			_values[i] = null;
666 			continue;
667 		}
668 
669 		SQLValue sqlValue;
670 		do
671 		{
672 			FieldDescription fd = rh[i];
673 			sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
674 			// TODO: Support chunk delegate
675 			if(sqlValue.isIncomplete)
676 				packet ~= conn.getPacket();
677 		} while(sqlValue.isIncomplete);
678 		assert(!sqlValue.isIncomplete);
679 
680 		if(sqlValue.isNull)
681 		{
682 			assert(!binary);
683 			assert(!_nulls[i]);
684 			_nulls[i] = true;
685 			_values[i] = null;
686 		}
687 		else
688 		{
689 			// Convert MySQLDate to Phobos Date
690 			if(MySQLDate* myDate = sqlValue.value.peek!MySQLDate)
691 				sqlValue.value = myDate.getDate();
692 
693 			// Convert MySQLDateTime to Phobos DateTime
694 			if(MySQLDateTime* myDateTime = sqlValue.value.peek!MySQLDateTime)
695 				sqlValue.value = myDateTime.getDateTime();
696 
697 			_values[i] = sqlValue.value;
698 		}
699 	}
700 }
701 
702 ////// Moved here from Connection /////////////////////////////////
703 
704 package(mysql) ubyte[] getPacket(Connection conn)
705 {
706 	scope(failure) conn.kill();
707 
708 	ubyte[4] header;
709 	conn._socket.read(header);
710 	// number of bytes always set as 24-bit
711 	uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
712 	enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order");
713 	conn.bumpPacket();
714 
715 	ubyte[] packet = new ubyte[numDataBytes];
716 	conn._socket.read(packet);
717 	assert(packet.length == numDataBytes, "Wrong number of bytes read");
718 	return packet;
719 }
720 
721 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
722 in
723 {
724 	assert(packet.length > 4); // at least 1 byte more than header
725 }
726 body
727 {
728 	_socket.write(packet);
729 }
730 
731 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
732 in
733 {
734 	assert(header.length == 4 || header.length == 5/*command type included*/);
735 }
736 body
737 {
738 	_socket.write(header);
739 	if(data.length)
740 		_socket.write(data);
741 }
742 
743 package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
744 in
745 {
746 	// Internal thread states. Clients shouldn't use this
747 	assert(cmd != CommandType.SLEEP);
748 	assert(cmd != CommandType.CONNECT);
749 	assert(cmd != CommandType.TIME);
750 	assert(cmd != CommandType.DELAYED_INSERT);
751 	assert(cmd != CommandType.CONNECT_OUT);
752 
753 	// Deprecated
754 	assert(cmd != CommandType.CREATE_DB);
755 	assert(cmd != CommandType.DROP_DB);
756 	assert(cmd != CommandType.TABLE_DUMP);
757 
758 	// cannot send more than uint.max bytes. TODO: better error message if we try?
759 	assert(data.length <= uint.max);
760 }
761 out
762 {
763 	// at this point we should have sent a command
764 	assert(conn.pktNumber == 1);
765 }
766 body
767 {
768 	scope(failure) conn.kill();
769 
770 	conn._lastCommandID++;
771 
772 	if(!conn._socket.connected)
773 	{
774 		if(cmd == CommandType.QUIT)
775 			return; // Don't bother reopening connection just to quit
776 
777 		conn._open = Connection.OpenState.notConnected;
778 		conn.connect(conn._clientCapabilities);
779 	}
780 
781 	conn.autoPurge();
782  
783 	conn.resetPacket();
784 
785 	ubyte[] header;
786 	header.length = 4 /*header*/ + 1 /*cmd*/;
787 	header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
788 	header[4] = cmd;
789 	conn.bumpPacket();
790 
791 	conn._socket.send(header, cast(const(ubyte)[])data);
792 }
793 
794 package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
795 {
796 	auto okp = OKErrorPacket(conn.getPacket());
797 	enforcePacketOK(okp);
798 	conn._serverStatus = okp.serverStatus;
799 	return okp;
800 }
801 
802 package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
803 in
804 {
805 	assert(token.length == 20);
806 }
807 body
808 {
809 	ubyte[] packet;
810 	packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1);
811 	packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
812 
813 	// NOTE: we'll set the header last when we know the size
814 
815 	// Set the default capabilities required by the client
816 	conn._cCaps.packInto(packet[4..8]);
817 
818 	// Request a conventional maximum packet length.
819 	1.packInto(packet[8..12]);
820 
821 	packet ~= 33; // Set UTF-8 as default charSet
822 
823 	// There's a statutory block of zero bytes here - fill them in.
824 	foreach(i; 0 .. 23)
825 		packet ~= 0;
826 
827 	// Add the user name as a null terminated string
828 	foreach(i; 0 .. conn._user.length)
829 		packet ~= conn._user[i];
830 	packet ~= 0; // \0
831 
832 	// Add our calculated authentication token as a length prefixed string.
833 	assert(token.length <= ubyte.max);
834 	if(conn._pwd.length == 0)  // Omit the token if the account has no password
835 		packet ~= 0;
836 	else
837 	{
838 		packet ~= cast(ubyte)token.length;
839 		foreach(i; 0 .. token.length)
840 			packet ~= token[i];
841 	}
842 
843 	// Add the default database as a null terminated string
844 	foreach(i; 0 .. conn._db.length)
845 		packet ~= conn._db[i];
846 	packet ~= 0; // \0
847 
848 	// The server sent us a greeting with packet number 0, so we send the auth packet
849 	// back with the next number.
850 	packet.setPacketHeader(conn.pktNumber);
851 	conn.bumpPacket();
852 	return packet;
853 }
854 
855 package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
856 {
857 	auto pass1 = sha1Of(cast(const(ubyte)[])password);
858 	auto pass2 = sha1Of(pass1);
859 
860 	SHA1 sha1;
861 	sha1.start();
862 	sha1.put(authBuf);
863 	sha1.put(pass2);
864 	auto result = sha1.finish();
865 	foreach (size_t i; 0..20)
866 		result[i] = result[i] ^ pass1[i];
867 	return result.dup;
868 }
869 
870 /// Get the next `mysql.result.Row` of a pending result set.
871 package(mysql) Row getNextRow(Connection conn)
872 {
873 	scope(failure) conn.kill();
874 
875 	if (conn._headersPending)
876 	{
877 		conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
878 		conn._headersPending = false;
879 	}
880 	ubyte[] packet;
881 	Row rr;
882 	packet = conn.getPacket();
883 	if(packet.front == ResultPacketMarker.error)
884 		throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);
885 
886 	if (packet.isEOFPacket())
887 	{
888 		conn._rowsPending = conn._binaryPending = false;
889 		return rr;
890 	}
891 	if (conn._binaryPending)
892 		rr = Row(conn, packet, conn._rsh, true);
893 	else
894 		rr = Row(conn, packet, conn._rsh, false);
895 	//rr._valid = true;
896 	return rr;
897 }
898 
899 package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
900 {
901 	scope(failure) conn.kill();
902 
903 	conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
904 	conn._sCharSet = packet.consume!ubyte(); // server_language
905 	conn._serverStatus = packet.consume!ushort(); //server_status
906 	conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
907 	conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
908 
909 	enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
910 	enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
911 }
912 
913 package(mysql) ubyte[] parseGreeting(Connection conn)
914 {
915 	scope(failure) conn.kill();
916 
917 	ubyte[] packet = conn.getPacket();
918 
919 	if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
920 	{
921 		auto okp = OKErrorPacket(packet);
922 		enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
923 	}
924 
925 	conn._protocol = packet.consume!ubyte();
926 
927 	conn._serverVersion = packet.consume!string(packet.countUntil(0));
928 	packet.skip(1); // \0 terminated _serverVersion
929 
930 	conn._sThread = packet.consume!uint();
931 
932 	// read first part of scramble buf
933 	ubyte[] authBuf;
934 	authBuf.length = 255;
935 	authBuf[0..8] = packet.consume(8)[]; // scramble_buff
936 
937 	enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
938 
939 	conn.consumeServerInfo(packet);
940 
941 	packet.skip(1); // this byte supposed to be scramble length, but is actually zero
942 	packet.skip(10); // filler of \0
943 
944 	// rest of the scramble
945 	auto len = packet.countUntil(0);
946 	enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
947 	enforce(authBuf.length > 8+len);
948 	authBuf[8..8+len] = packet.consume(len)[];
949 	authBuf.length = 8+len; // cut to correct size
950 	enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
951 
952 	return authBuf;
953 }
954 
955 package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
956 {
957 	SvrCapFlags common;
958 	uint filter = 1;
959 	foreach (size_t i; 0..uint.sizeof)
960 	{
961 		bool serverSupport = (server & filter) != 0; // can the server do this capability?
962 		bool clientSupport = (client & filter) != 0; // can we support it?
963 		if(serverSupport && clientSupport)
964 			common |= filter;
965 		filter <<= 1; // check next flag
966 	}
967 	return common;
968 }
969 
970 package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
971 {
972 	auto cCaps = getCommonCapabilities(serverCaps, capFlags);
973 
974 	// We cannot operate in <4.1 protocol, so we'll force it even if the user
975 	// didn't supply it
976 	cCaps |= SvrCapFlags.PROTOCOL41;
977 	cCaps |= SvrCapFlags.SECURE_CONNECTION;
978 	
979 	return cCaps;
980 }
981 
982 package(mysql) void authenticate(Connection conn, ubyte[] greeting)
983 in
984 {
985 	assert(conn._open == Connection.OpenState.connected);
986 }
987 out
988 {
989 	assert(conn._open == Connection.OpenState.authenticated);
990 }
991 body
992 {
993 	auto token = makeToken(conn._pwd, greeting);
994 	auto authPacket = conn.buildAuthPacket(token);
995 	conn._socket.send(authPacket);
996 
997 	auto packet = conn.getPacket();
998 	auto okp = OKErrorPacket(packet);
999 	enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
1000 	conn._open = Connection.OpenState.authenticated;
1001 }
1002 
1003 // Register prepared statement
1004 package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
1005 {
1006 	scope(failure) conn.kill();
1007 
1008 	PreparedServerInfo info;
1009 	
1010 	conn.sendCmd(CommandType.STMT_PREPARE, sql);
1011 	conn._fieldCount = 0;
1012 
1013 	ubyte[] packet = conn.getPacket();
1014 	if(packet.front == ResultPacketMarker.ok)
1015 	{
1016 		packet.popFront();
1017 		info.statementId    = packet.consume!int();
1018 		conn._fieldCount    = packet.consume!short();
1019 		info.numParams      = packet.consume!short();
1020 
1021 		packet.popFront(); // one byte filler
1022 		info.psWarnings     = packet.consume!short();
1023 
1024 		// At this point the server also sends field specs for parameters
1025 		// and columns if there were any of each
1026 		info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);
1027 	}
1028 	else if(packet.front == ResultPacketMarker.error)
1029 	{
1030 		auto error = OKErrorPacket(packet);
1031 		enforcePacketOK(error);
1032 		assert(0); // FIXME: what now?
1033 	}
1034 	else
1035 		assert(0); // FIXME: what now?
1036 
1037 	return info;
1038 }
1039 
1040 /++
1041 Flush any outstanding result set elements.
1042 
1043 When the server responds to a command that produces a result set, it
1044 queues the whole set of corresponding packets over the current connection.
1045 Before that `Connection` can embark on any new command, it must receive
1046 all of those packets and junk them.
1047 
1048 As of v1.1.4, this is done automatically as needed. But you can still
1049 call this manually to force a purge to occur when you want.
1050 
1051 See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
1052 +/
1053 package(mysql) ulong purgeResult(Connection conn)
1054 {
1055 	scope(failure) conn.kill();
1056 
1057 	conn._lastCommandID++;
1058 
1059 	ulong rows = 0;
1060 	if (conn._headersPending)
1061 	{
1062 		for (size_t i = 0;; i++)
1063 		{
1064 			if (conn.getPacket().isEOFPacket())
1065 			{
1066 				conn._headersPending = false;
1067 				break;
1068 			}
1069 			enforce!MYXProtocol(i < conn._fieldCount,
1070 				text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
1071 		}
1072 	}
1073 	if (conn._rowsPending)
1074 	{
1075 		for (;;  rows++)
1076 		{
1077 			if (conn.getPacket().isEOFPacket())
1078 			{
1079 				conn._rowsPending = conn._binaryPending = false;
1080 				break;
1081 			}
1082 		}
1083 	}
1084 	conn.resetPacket();
1085 	return rows;
1086 }
1087 
1088 /++
1089 Get a textual report on the server status.
1090 
1091 (COM_STATISTICS)
1092 +/
1093 package(mysql) string serverStats(Connection conn)
1094 {
1095 	conn.sendCmd(CommandType.STATISTICS, []);
1096 	return cast(string) conn.getPacket();
1097 }
1098 
1099 /++
1100 Enable multiple statement commands.
1101 
1102 This can be used later if this feature was not requested in the client capability flags.
1103 
1104 Warning: This functionality is currently untested.
1105 
1106 Params: on = Boolean value to turn the capability on or off.
1107 +/
1108 //TODO: Need to test this
1109 package(mysql) void enableMultiStatements(Connection conn, bool on)
1110 {
1111 	scope(failure) conn.kill();
1112 
1113 	ubyte[] t;
1114 	t.length = 2;
1115 	t[0] = on ? 0 : 1;
1116 	t[1] = 0;
1117 	conn.sendCmd(CommandType.STMT_OPTION, t);
1118 
1119 	// For some reason this command gets an EOF packet as response
1120 	auto packet = conn.getPacket();
1121 	enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1122 }