1 /++
2 Internal - Low-level communications.
3 
4 Consider this module the main entry point for the low-level MySQL/MariaDB
5 protocol code. The other modules in `mysql.protocol` are mainly tools
6 to support this module.
7 
8 Previously, the code handling low-level protocol details was scattered all
9 across the library. Such functionality has been factored out into this module,
10 to be kept in one place for better encapsulation and to facilitate further
11 cleanup and refactoring.
12 
13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it
14 eventually settles into what will eventually become a low-level library
15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight...
16 
17 Next tasks for this sub-package's cleanup:
18 - Reduce this module's reliance on Connection.
19 - Abstract out a PacketStream to clean up getPacket and related functionality.
20 +/
21 module mysql.protocol.comms;
22 
23 import std.algorithm;
24 import std.array;
25 import std.conv;
26 import std.digest.sha;
27 import std.exception;
28 import std.range;
29 import std.variant;
30 
31 import mysql.connection;
32 import mysql.exceptions;
33 import mysql.prepared;
34 import mysql.result;
35 
36 import mysql.protocol.constants;
37 import mysql.protocol.extra_types;
38 import mysql.protocol.packet_helpers;
39 import mysql.protocol.packets;
40 import mysql.protocol.sockets;
41 
42 /// Low-level comms code relating to prepared statements.
43 package struct ProtocolPrepared
44 {
45 	import std.conv;
46 	import std.datetime;
47 	import std.variant;
48 	import mysql.types;
49 
50 	static ubyte[] makeBitmap(in Variant[] inParams)
51 	{
52 		size_t bml = (inParams.length+7)/8;
53 		ubyte[] bma;
54 		bma.length = bml;
55 		foreach (i; 0..inParams.length)
56 		{
57 			if(inParams[i].type != typeid(typeof(null)))
58 				continue;
59 			size_t bn = i/8;
60 			size_t bb = i%8;
61 			ubyte sr = 1;
62 			sr <<= bb;
63 			bma[bn] |= sr;
64 		}
65 		return bma;
66 	}
67 
68 	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
69 	{
70 		ubyte[] prefix;
71 		prefix.length = 14;
72 
73 		prefix[4] = CommandType.STMT_EXECUTE;
74 		hStmt.packInto(prefix[5..9]);
75 		prefix[9] = flags;   // flags, no cursor
76 		prefix[10] = 1; // iteration count - currently always 1
77 		prefix[11] = 0;
78 		prefix[12] = 0;
79 		prefix[13] = 0;
80 
81 		return prefix;
82 	}
83 
84 	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
85 		out ubyte[] vals, out bool longData)
86 	{
87 		size_t pc = inParams.length;
88 		ubyte[] types;
89 		types.length = pc*2;
90 		size_t alloc = pc*20;
91 		vals.length = alloc;
92 		uint vcl = 0, len;
93 		int ct = 0;
94 
95 		void reAlloc(size_t n)
96 		{
97 			if (vcl+n < alloc)
98 				return;
99 			size_t inc = (alloc*3)/2;
100 			if (inc <  n)
101 				inc = n;
102 			alloc += inc;
103 			vals.length = alloc;
104 		}
105 
106 		foreach (size_t i; 0..pc)
107 		{
108 			enum UNSIGNED  = 0x80;
109 			enum SIGNED    = 0;
110 			if (psa[i].chunkSize)
111 				longData= true;
112 			if (inParams[i].type == typeid(typeof(null)))
113 			{
114 				types[ct++] = SQLType.NULL;
115 				types[ct++] = SIGNED;
116 				continue;
117 			}
118 			Variant v = inParams[i];
119 			SQLType ext = psa[i].type;
120 			string ts = v.type.toString();
121 			bool isRef;
122 			if (ts[$-1] == '*')
123 			{
124 				ts.length = ts.length-1;
125 				isRef= true;
126 			}
127 
128 			switch (ts)
129 			{
130 				case "bool":
131 				case "const(bool)":
132 				case "immutable(bool)":
133 				case "shared(immutable(bool))":
134 					if (ext == SQLType.INFER_FROM_D_TYPE)
135 						types[ct++] = SQLType.BIT;
136 					else
137 						types[ct++] = cast(ubyte) ext;
138 					types[ct++] = SIGNED;
139 					reAlloc(2);
140 					bool bv = isRef? *(v.get!(const(bool*))): v.get!(const(bool));
141 					vals[vcl++] = 1;
142 					vals[vcl++] = bv? 0x31: 0x30;
143 					break;
144 				case "byte":
145 				case "const(byte)":
146 				case "immutable(byte)":
147 				case "shared(immutable(byte))":
148 					types[ct++] = SQLType.TINY;
149 					types[ct++] = SIGNED;
150 					reAlloc(1);
151 					vals[vcl++] = isRef? *(v.get!(const(byte*))): v.get!(const(byte));
152 					break;
153 				case "ubyte":
154 				case "const(ubyte)":
155 				case "immutable(ubyte)":
156 				case "shared(immutable(ubyte))":
157 					types[ct++] = SQLType.TINY;
158 					types[ct++] = UNSIGNED;
159 					reAlloc(1);
160 					vals[vcl++] = isRef? *(v.get!(const(ubyte*))): v.get!(const(ubyte));
161 					break;
162 				case "short":
163 				case "const(short)":
164 				case "immutable(short)":
165 				case "shared(immutable(short))":
166 					types[ct++] = SQLType.SHORT;
167 					types[ct++] = SIGNED;
168 					reAlloc(2);
169 					short si = isRef? *(v.get!(const(short*))): v.get!(const(short));
170 					vals[vcl++] = cast(ubyte) (si & 0xff);
171 					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
172 					break;
173 				case "ushort":
174 				case "const(ushort)":
175 				case "immutable(ushort)":
176 				case "shared(immutable(ushort))":
177 					types[ct++] = SQLType.SHORT;
178 					types[ct++] = UNSIGNED;
179 					reAlloc(2);
180 					ushort us = isRef? *(v.get!(const(ushort*))): v.get!(const(ushort));
181 					vals[vcl++] = cast(ubyte) (us & 0xff);
182 					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
183 					break;
184 				case "int":
185 				case "const(int)":
186 				case "immutable(int)":
187 				case "shared(immutable(int))":
188 					types[ct++] = SQLType.INT;
189 					types[ct++] = SIGNED;
190 					reAlloc(4);
191 					int ii = isRef? *(v.get!(const(int*))): v.get!(const(int));
192 					vals[vcl++] = cast(ubyte) (ii & 0xff);
193 					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
194 					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
195 					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
196 					break;
197 				case "uint":
198 				case "const(uint)":
199 				case "immutable(uint)":
200 				case "shared(immutable(uint))":
201 					types[ct++] = SQLType.INT;
202 					types[ct++] = UNSIGNED;
203 					reAlloc(4);
204 					uint ui = isRef? *(v.get!(const(uint*))): v.get!(const(uint));
205 					vals[vcl++] = cast(ubyte) (ui & 0xff);
206 					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
207 					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
208 					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
209 					break;
210 				case "long":
211 				case "const(long)":
212 				case "immutable(long)":
213 				case "shared(immutable(long))":
214 					types[ct++] = SQLType.LONGLONG;
215 					types[ct++] = SIGNED;
216 					reAlloc(8);
217 					long li = isRef? *(v.get!(const(long*))): v.get!(const(long));
218 					vals[vcl++] = cast(ubyte) (li & 0xff);
219 					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
220 					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
221 					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
222 					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
223 					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
224 					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
225 					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
226 					break;
227 				case "ulong":
228 				case "const(ulong)":
229 				case "immutable(ulong)":
230 				case "shared(immutable(ulong))":
231 					types[ct++] = SQLType.LONGLONG;
232 					types[ct++] = UNSIGNED;
233 					reAlloc(8);
234 					ulong ul = isRef? *(v.get!(const(ulong*))): v.get!(const(ulong));
235 					vals[vcl++] = cast(ubyte) (ul & 0xff);
236 					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
237 					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
238 					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
239 					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
240 					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
241 					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
242 					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
243 					break;
244 				case "float":
245 				case "const(float)":
246 				case "immutable(float)":
247 				case "shared(immutable(float))":
248 					types[ct++] = SQLType.FLOAT;
249 					types[ct++] = SIGNED;
250 					reAlloc(4);
251 					float f = isRef? *(v.get!(const(float*))): v.get!(const(float));
252 					ubyte* ubp = cast(ubyte*) &f;
253 					vals[vcl++] = *ubp++;
254 					vals[vcl++] = *ubp++;
255 					vals[vcl++] = *ubp++;
256 					vals[vcl++] = *ubp;
257 					break;
258 				case "double":
259 				case "const(double)":
260 				case "immutable(double)":
261 				case "shared(immutable(double))":
262 					types[ct++] = SQLType.DOUBLE;
263 					types[ct++] = SIGNED;
264 					reAlloc(8);
265 					double d = isRef? *(v.get!(const(double*))): v.get!(const(double));
266 					ubyte* ubp = cast(ubyte*) &d;
267 					vals[vcl++] = *ubp++;
268 					vals[vcl++] = *ubp++;
269 					vals[vcl++] = *ubp++;
270 					vals[vcl++] = *ubp++;
271 					vals[vcl++] = *ubp++;
272 					vals[vcl++] = *ubp++;
273 					vals[vcl++] = *ubp++;
274 					vals[vcl++] = *ubp;
275 					break;
276 				case "std.datetime.date.Date":
277 				case "const(std.datetime.date.Date)":
278 				case "immutable(std.datetime.date.Date)":
279 				case "shared(immutable(std.datetime.date.Date))":
280 
281 				case "std.datetime.Date":
282 				case "const(std.datetime.Date)":
283 				case "immutable(std.datetime.Date)":
284 				case "shared(immutable(std.datetime.Date))":
285 					types[ct++] = SQLType.DATE;
286 					types[ct++] = SIGNED;
287 					Date date = isRef? *(v.get!(const(Date*))): v.get!(const(Date));
288 					ubyte[] da = pack(date);
289 					size_t l = da.length;
290 					reAlloc(l);
291 					vals[vcl..vcl+l] = da[];
292 					vcl += l;
293 					break;
294 				case "std.datetime.TimeOfDay":
295 				case "const(std.datetime.TimeOfDay)":
296 				case "immutable(std.datetime.TimeOfDay)":
297 				case "shared(immutable(std.datetime.TimeOfDay))":
298 
299 				case "std.datetime.date.TimeOfDay":
300 				case "const(std.datetime.date.TimeOfDay)":
301 				case "immutable(std.datetime.date.TimeOfDay)":
302 				case "shared(immutable(std.datetime.date.TimeOfDay))":
303 
304 				case "std.datetime.Time":
305 				case "const(std.datetime.Time)":
306 				case "immutable(std.datetime.Time)":
307 				case "shared(immutable(std.datetime.Time))":
308 					types[ct++] = SQLType.TIME;
309 					types[ct++] = SIGNED;
310 					TimeOfDay time = isRef? *(v.get!(const(TimeOfDay*))): v.get!(const(TimeOfDay));
311 					ubyte[] ta = pack(time);
312 					size_t l = ta.length;
313 					reAlloc(l);
314 					vals[vcl..vcl+l] = ta[];
315 					vcl += l;
316 					break;
317 				case "std.datetime.date.DateTime":
318 				case "const(std.datetime.date.DateTime)":
319 				case "immutable(std.datetime.date.DateTime)":
320 				case "shared(immutable(std.datetime.date.DateTime))":
321 
322 				case "std.datetime.DateTime":
323 				case "const(std.datetime.DateTime)":
324 				case "immutable(std.datetime.DateTime)":
325 				case "shared(immutable(std.datetime.DateTime))":
326 					types[ct++] = SQLType.DATETIME;
327 					types[ct++] = SIGNED;
328 					DateTime dt = isRef? *(v.get!(const(DateTime*))): v.get!(const(DateTime));
329 					ubyte[] da = pack(dt);
330 					size_t l = da.length;
331 					reAlloc(l);
332 					vals[vcl..vcl+l] = da[];
333 					vcl += l;
334 					break;
335 				case "mysql.types.Timestamp":
336 				case "const(mysql.types.Timestamp)":
337 				case "immutable(mysql.types.Timestamp)":
338 				case "shared(immutable(mysql.types.Timestamp))":
339 					types[ct++] = SQLType.TIMESTAMP;
340 					types[ct++] = SIGNED;
341 					Timestamp tms = isRef? *(v.get!(const(Timestamp*))): v.get!(const(Timestamp));
342 					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
343 					ubyte[] da = pack(dt);
344 					size_t l = da.length;
345 					reAlloc(l);
346 					vals[vcl..vcl+l] = da[];
347 					vcl += l;
348 					break;
349 				case "char[]":
350 				case "const(char[])":
351 				case "immutable(char[])":
352 				case "const(char)[]":
353 				case "immutable(char)[]":
354 				case "shared(immutable(char)[])":
355 				case "shared(immutable(char))[]":
356 				case "shared(immutable(char[]))":
357 					if (ext == SQLType.INFER_FROM_D_TYPE)
358 						types[ct++] = SQLType.VARCHAR;
359 					else
360 						types[ct++] = cast(ubyte) ext;
361 					types[ct++] = SIGNED;
362 					const char[] ca = isRef? *(v.get!(const(char[]*))): v.get!(const(char[]));
363 					ubyte[] packed = packLCS(cast(void[]) ca);
364 					reAlloc(packed.length);
365 					vals[vcl..vcl+packed.length] = packed[];
366 					vcl += packed.length;
367 					break;
368 				case "byte[]":
369 				case "const(byte[])":
370 				case "immutable(byte[])":
371 				case "const(byte)[]":
372 				case "immutable(byte)[]":
373 				case "shared(immutable(byte)[])":
374 				case "shared(immutable(byte))[]":
375 				case "shared(immutable(byte[]))":
376 					if (ext == SQLType.INFER_FROM_D_TYPE)
377 						types[ct++] = SQLType.TINYBLOB;
378 					else
379 						types[ct++] = cast(ubyte) ext;
380 					types[ct++] = SIGNED;
381 					const byte[] ba = isRef? *(v.get!(const(byte[]*))): v.get!(const(byte[]));
382 					ubyte[] packed = packLCS(cast(void[]) ba);
383 					reAlloc(packed.length);
384 					vals[vcl..vcl+packed.length] = packed[];
385 					vcl += packed.length;
386 					break;
387 				case "ubyte[]":
388 				case "const(ubyte[])":
389 				case "immutable(ubyte[])":
390 				case "const(ubyte)[]":
391 				case "immutable(ubyte)[]":
392 				case "shared(immutable(ubyte)[])":
393 				case "shared(immutable(ubyte))[]":
394 				case "shared(immutable(ubyte[]))":
395 					if (ext == SQLType.INFER_FROM_D_TYPE)
396 						types[ct++] = SQLType.TINYBLOB;
397 					else
398 						types[ct++] = cast(ubyte) ext;
399 					types[ct++] = SIGNED;
400 					const ubyte[] uba = isRef? *(v.get!(const(ubyte[]*))): v.get!(const(ubyte[]));
401 					ubyte[] packed = packLCS(cast(void[]) uba);
402 					reAlloc(packed.length);
403 					vals[vcl..vcl+packed.length] = packed[];
404 					vcl += packed.length;
405 					break;
406 				case "void":
407 					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
408 				default:
409 					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
410 			}
411 		}
412 		vals.length = vcl;
413 		return types;
414 	}
415 
416 	static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
417 	{
418 		assert(psa.length <= ushort.max); // parameter number is sent as short
419 		foreach (size_t i, PSN psn; psa)
420 		{
421 			if (!psn.chunkSize) continue;
422 			uint cs = psn.chunkSize;
423 			uint delegate(ubyte[]) dg = psn.chunkDelegate;
424 
425 			ubyte[] chunk;
426 			chunk.length = cs+11;
427 			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
428 			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
429 			hStmt.packInto(chunk[5..9]); // statement handle
430 			packInto(cast(ushort)i, chunk[9..11]); // parameter number
431 
432 			// byte 11 on is payload
433 			for (;;)
434 			{
435 				uint sent = dg(chunk[11..cs+11]);
436 				if (sent < cs)
437 				{
438 					if (sent == 0)    // data was exact multiple of chunk size - all sent
439 						break;
440 					chunk.length = chunk.length - (cs-sent);     // trim the chunk
441 					sent += 7;        // adjust for non-payload bytes
442 					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
443 					socket.send(chunk);
444 					break;
445 				}
446 				socket.send(chunk);
447 			}
448 		}
449 	}
450 
451 	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
452 		Variant[] inParams, ParameterSpecialization[] psa)
453 	{
454 		conn.autoPurge();
455 
456 		ubyte[] packet;
457 		conn.resetPacket();
458 
459 		ubyte[] prefix = makePSPrefix(hStmt, 0);
460 		size_t len = prefix.length;
461 		bool longData;
462 
463 		if (psh.paramCount)
464 		{
465 			ubyte[] one = [ 1 ];
466 			ubyte[] vals;
467 			ubyte[] types = analyseParams(inParams, psa, vals, longData);
468 			ubyte[] nbm = makeBitmap(inParams);
469 			packet = prefix ~ nbm ~ one ~ types ~ vals;
470 		}
471 		else
472 			packet = prefix;
473 
474 		if (longData)
475 			sendLongData(conn._socket, hStmt, psa);
476 
477 		assert(packet.length <= uint.max);
478 		packet.setPacketHeader(conn.pktNumber);
479 		conn.bumpPacket();
480 		conn._socket.send(packet);
481 	}
482 }
483 
484 package(mysql) struct ExecQueryImplInfo
485 {
486 	bool isPrepared;
487 
488 	// For non-prepared statements:
489 	const(char[]) sql;
490 
491 	// For prepared statements:
492 	uint hStmt;
493 	PreparedStmtHeaders psh;
494 	Variant[] inParams;
495 	ParameterSpecialization[] psa;
496 }
497 
498 /++
499 Internal implementation for the exec and query functions.
500 
501 Execute a one-off SQL command.
502 
503 Any result set can be accessed via Connection.getNextRow(), but you should really be
504 using the query function for such queries.
505 
506 Params: ra = An out parameter to receive the number of rows affected.
507 Returns: true if there was a (possibly empty) result set.
508 +/
509 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
510 {
511 	scope(failure) conn.kill();
512 
513 	// Send data
514 	if(info.isPrepared)
515 		ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
516 	else
517 	{
518 		conn.sendCmd(CommandType.QUERY, info.sql);
519 		conn._fieldCount = 0;
520 	}
521 
522 	// Handle response
523 	ubyte[] packet = conn.getPacket();
524 	bool rv;
525 	if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
526 	{
527 		conn.resetPacket();
528 		auto okp = OKErrorPacket(packet);
529 		enforcePacketOK(okp);
530 		ra = okp.affected;
531 		conn._serverStatus = okp.serverStatus;
532 		conn._insertID = okp.insertID;
533 		rv = false;
534 	}
535 	else
536 	{
537 		// There was presumably a result set
538 		assert(packet.front >= 1 && packet.front <= 250); // Result set packet header should have this value
539 		conn._headersPending = conn._rowsPending = true;
540 		conn._binaryPending = info.isPrepared;
541 		auto lcb = packet.consumeIfComplete!LCB();
542 		assert(!lcb.isNull);
543 		assert(!lcb.isIncomplete);
544 		conn._fieldCount = cast(ushort)lcb.value;
545 		assert(conn._fieldCount == lcb.value);
546 		rv = true;
547 		ra = 0;
548 	}
549 	return rv;
550 }
551 
552 ///ditto
553 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
554 {
555 	ulong rowsAffected;
556 	return execQueryImpl(conn, info, rowsAffected);
557 }
558 
559 package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
560 {
561 	scope(failure) conn.kill();
562 
563 	if(conn.closed())
564 		return;
565 
566 	ubyte[9] packet_buf;
567 	ubyte[] packet = packet_buf;
568 	packet.setPacketHeader(0/*packet number*/);
569 	conn.bumpPacket();
570 	packet[4] = CommandType.STMT_CLOSE;
571 	statementId.packInto(packet[5..9]);
572 	conn.purgeResult();
573 	conn._socket.send(packet);
574 	// It seems that the server does not find it necessary to send a response
575 	// for this command.
576 }
577 
578 // Moved here from `struct Row`
579 package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
580 {
581 	uint bitmapLength = calcBitmapLength(fieldCount);
582 	enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields");
583 	auto bitmap = packet.consume(bitmapLength);
584 	return decodeNullBitmap(bitmap, fieldCount);
585 }
586 
587 // Moved here from `struct Row`
588 private static uint calcBitmapLength(uint fieldCount) pure nothrow
589 {
590 	return (fieldCount+7+2)/8;
591 }
592 
593 // Moved here from `struct Row`
594 // This is to decode the bitmap in a binary result row. First two bits are skipped
595 private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
596 in
597 {
598 	assert(bitmap.length >= calcBitmapLength(numFields),
599 		"bitmap not large enough to store all null fields");
600 }
601 out(result)
602 {
603 	assert(result.length == numFields);
604 }
605 body
606 {
607 	bool[] nulls;
608 	nulls.length = numFields;
609 
610 	// the current byte we are processing for nulls
611 	ubyte bits = bitmap.front();
612 	// strip away the first two bits as they are reserved
613 	bits >>= 2;
614 	// .. and then we only have 6 bits left to process for this byte
615 	ubyte bitsLeftInByte = 6;
616 	foreach(ref isNull; nulls)
617 	{
618 		assert(bitsLeftInByte <= 8);
619 		// processed all bits? fetch new byte
620 		if (bitsLeftInByte == 0)
621 		{
622 			assert(bits == 0, "not all bits are processed!");
623 			assert(!bitmap.empty, "bits array too short for number of columns");
624 			bitmap.popFront();
625 			bits = bitmap.front;
626 			bitsLeftInByte = 8;
627 		}
628 		assert(bitsLeftInByte > 0);
629 		isNull = (bits & 0b0000_0001) != 0;
630 
631 		// get ready to process next bit
632 		bits >>= 1;
633 		--bitsLeftInByte;
634 	}
635 	return nulls;
636 }
637 
638 // Moved here from `struct Row.this`
639 package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
640 	out Variant[] _values, out bool[] _nulls, out string[] _names)
641 in
642 {
643 	assert(rh.fieldCount <= uint.max);
644 }
645 body
646 {
647 	scope(failure) conn.kill();
648 
649 	uint fieldCount = cast(uint)rh.fieldCount;
650 	_values.length = _nulls.length = _names.length = fieldCount;
651 
652 	if(binary)
653 	{
654 		// There's a null byte header on a binary result sequence, followed by some bytes of bitmap
655 		// indicating which columns are null
656 		enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
657 		packet.popFront();
658 		_nulls = consumeNullBitmap(packet, fieldCount);
659 	}
660 
661 	foreach(size_t i; 0..fieldCount)
662 	{
663 		if(binary && _nulls[i])
664 		{
665 			_values[i] = null;
666 			continue;
667 		}
668 
669 		SQLValue sqlValue;
670 		do
671 		{
672 			FieldDescription fd = rh[i];
673 			_names[i] = fd.name;
674 			sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
675 			// TODO: Support chunk delegate
676 			if(sqlValue.isIncomplete)
677 				packet ~= conn.getPacket();
678 		} while(sqlValue.isIncomplete);
679 		assert(!sqlValue.isIncomplete);
680 
681 		if(sqlValue.isNull)
682 		{
683 			assert(!binary);
684 			assert(!_nulls[i]);
685 			_nulls[i] = true;
686 			_values[i] = null;
687 		}
688 		else
689 		{
690 			_values[i] = sqlValue.value;
691 		}
692 	}
693 }
694 
695 ////// Moved here from Connection /////////////////////////////////
696 
697 package(mysql) ubyte[] getPacket(Connection conn)
698 {
699 	scope(failure) conn.kill();
700 
701 	ubyte[4] header;
702 	conn._socket.read(header);
703 	// number of bytes always set as 24-bit
704 	uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
705 	enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order");
706 	conn.bumpPacket();
707 
708 	ubyte[] packet = new ubyte[numDataBytes];
709 	conn._socket.read(packet);
710 	assert(packet.length == numDataBytes, "Wrong number of bytes read");
711 	return packet;
712 }
713 
714 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
715 in
716 {
717 	assert(packet.length > 4); // at least 1 byte more than header
718 }
719 body
720 {
721 	_socket.write(packet);
722 }
723 
724 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
725 in
726 {
727 	assert(header.length == 4 || header.length == 5/*command type included*/);
728 }
729 body
730 {
731 	_socket.write(header);
732 	if(data.length)
733 		_socket.write(data);
734 }
735 
736 package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
737 in
738 {
739 	// Internal thread states. Clients shouldn't use this
740 	assert(cmd != CommandType.SLEEP);
741 	assert(cmd != CommandType.CONNECT);
742 	assert(cmd != CommandType.TIME);
743 	assert(cmd != CommandType.DELAYED_INSERT);
744 	assert(cmd != CommandType.CONNECT_OUT);
745 
746 	// Deprecated
747 	assert(cmd != CommandType.CREATE_DB);
748 	assert(cmd != CommandType.DROP_DB);
749 	assert(cmd != CommandType.TABLE_DUMP);
750 
751 	// cannot send more than uint.max bytes. TODO: better error message if we try?
752 	assert(data.length <= uint.max);
753 }
754 out
755 {
756 	// at this point we should have sent a command
757 	assert(conn.pktNumber == 1);
758 }
759 body
760 {
761 	scope(failure) conn.kill();
762 
763 	conn._lastCommandID++;
764 
765 	if(!conn._socket.connected)
766 	{
767 		if(cmd == CommandType.QUIT)
768 			return; // Don't bother reopening connection just to quit
769 
770 		conn._open = Connection.OpenState.notConnected;
771 		conn.connect(conn._clientCapabilities);
772 	}
773 
774 	conn.autoPurge();
775 
776 	conn.resetPacket();
777 
778 	ubyte[] header;
779 	header.length = 4 /*header*/ + 1 /*cmd*/;
780 	header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
781 	header[4] = cmd;
782 	conn.bumpPacket();
783 
784 	conn._socket.send(header, cast(const(ubyte)[])data);
785 }
786 
787 package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
788 {
789 	auto okp = OKErrorPacket(conn.getPacket());
790 	enforcePacketOK(okp);
791 	conn._serverStatus = okp.serverStatus;
792 	return okp;
793 }
794 
795 package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
796 in
797 {
798 	assert(token.length == 20);
799 }
800 body
801 {
802 	ubyte[] packet;
803 	packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1);
804 	packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
805 
806 	// NOTE: we'll set the header last when we know the size
807 
808 	// Set the default capabilities required by the client
809 	conn._cCaps.packInto(packet[4..8]);
810 
811 	// Request a conventional maximum packet length.
812 	1.packInto(packet[8..12]);
813 
814 	packet ~= getDefaultCollation(conn._serverVersion);
815 
816 	// There's a statutory block of zero bytes here - fill them in.
817 	foreach(i; 0 .. 23)
818 		packet ~= 0;
819 
820 	// Add the user name as a null terminated string
821 	foreach(i; 0 .. conn._user.length)
822 		packet ~= conn._user[i];
823 	packet ~= 0; // \0
824 
825 	// Add our calculated authentication token as a length prefixed string.
826 	assert(token.length <= ubyte.max);
827 	if(conn._pwd.length == 0)  // Omit the token if the account has no password
828 		packet ~= 0;
829 	else
830 	{
831 		packet ~= cast(ubyte)token.length;
832 		foreach(i; 0 .. token.length)
833 			packet ~= token[i];
834 	}
835 
836 	// Add the default database as a null terminated string
837 	foreach(i; 0 .. conn._db.length)
838 		packet ~= conn._db[i];
839 	packet ~= 0; // \0
840 
841 	// The server sent us a greeting with packet number 0, so we send the auth packet
842 	// back with the next number.
843 	packet.setPacketHeader(conn.pktNumber);
844 	conn.bumpPacket();
845 	return packet;
846 }
847 
848 package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
849 {
850 	auto pass1 = sha1Of(cast(const(ubyte)[])password);
851 	auto pass2 = sha1Of(pass1);
852 
853 	SHA1 sha1;
854 	sha1.start();
855 	sha1.put(authBuf);
856 	sha1.put(pass2);
857 	auto result = sha1.finish();
858 	foreach (size_t i; 0..20)
859 		result[i] = result[i] ^ pass1[i];
860 	return result.dup;
861 }
862 
863 /// Get the next `mysql.result.Row` of a pending result set.
864 package(mysql) Row getNextRow(Connection conn)
865 {
866 	scope(failure) conn.kill();
867 
868 	if (conn._headersPending)
869 	{
870 		conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
871 		conn._headersPending = false;
872 	}
873 	ubyte[] packet;
874 	Row rr;
875 	packet = conn.getPacket();
876 	if(packet.front == ResultPacketMarker.error)
877 		throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);
878 
879 	if (packet.isEOFPacket())
880 	{
881 		conn._rowsPending = conn._binaryPending = false;
882 		return rr;
883 	}
884 	if (conn._binaryPending)
885 		rr = Row(conn, packet, conn._rsh, true);
886 	else
887 		rr = Row(conn, packet, conn._rsh, false);
888 	//rr._valid = true;
889 	return rr;
890 }
891 
892 package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
893 {
894 	scope(failure) conn.kill();
895 
896 	conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
897 	conn._sCharSet = packet.consume!ubyte(); // server_language
898 	conn._serverStatus = packet.consume!ushort(); //server_status
899 	conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
900 	conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
901 
902 	enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
903 	enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
904 }
905 
906 package(mysql) ubyte[] parseGreeting(Connection conn)
907 {
908 	scope(failure) conn.kill();
909 
910 	ubyte[] packet = conn.getPacket();
911 
912 	if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
913 	{
914 		auto okp = OKErrorPacket(packet);
915 		enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
916 	}
917 
918 	conn._protocol = packet.consume!ubyte();
919 
920 	conn._serverVersion = packet.consume!string(packet.countUntil(0));
921 	packet.skip(1); // \0 terminated _serverVersion
922 
923 	conn._sThread = packet.consume!uint();
924 
925 	// read first part of scramble buf
926 	ubyte[] authBuf;
927 	authBuf.length = 255;
928 	authBuf[0..8] = packet.consume(8)[]; // scramble_buff
929 
930 	enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
931 
932 	conn.consumeServerInfo(packet);
933 
934 	packet.skip(1); // this byte supposed to be scramble length, but is actually zero
935 	packet.skip(10); // filler of \0
936 
937 	// rest of the scramble
938 	auto len = packet.countUntil(0);
939 	enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
940 	enforce(authBuf.length > 8+len);
941 	authBuf[8..8+len] = packet.consume(len)[];
942 	authBuf.length = 8+len; // cut to correct size
943 	enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
944 
945 	return authBuf;
946 }
947 
948 package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
949 {
950 	SvrCapFlags common;
951 	uint filter = 1;
952 	foreach (size_t i; 0..uint.sizeof)
953 	{
954 		bool serverSupport = (server & filter) != 0; // can the server do this capability?
955 		bool clientSupport = (client & filter) != 0; // can we support it?
956 		if(serverSupport && clientSupport)
957 			common |= filter;
958 		filter <<= 1; // check next flag
959 	}
960 	return common;
961 }
962 
963 package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
964 {
965 	auto cCaps = getCommonCapabilities(serverCaps, capFlags);
966 
967 	// We cannot operate in <4.1 protocol, so we'll force it even if the user
968 	// didn't supply it
969 	cCaps |= SvrCapFlags.PROTOCOL41;
970 	cCaps |= SvrCapFlags.SECURE_CONNECTION;
971 
972 	return cCaps;
973 }
974 
975 package(mysql) void authenticate(Connection conn, ubyte[] greeting)
976 in
977 {
978 	assert(conn._open == Connection.OpenState.connected);
979 }
980 out
981 {
982 	assert(conn._open == Connection.OpenState.authenticated);
983 }
984 body
985 {
986 	auto token = makeToken(conn._pwd, greeting);
987 	auto authPacket = conn.buildAuthPacket(token);
988 	conn._socket.send(authPacket);
989 
990 	auto packet = conn.getPacket();
991 	auto okp = OKErrorPacket(packet);
992 	enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
993 	conn._open = Connection.OpenState.authenticated;
994 }
995 
996 // Register prepared statement
997 package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
998 {
999 	scope(failure) conn.kill();
1000 
1001 	PreparedServerInfo info;
1002 
1003 	conn.sendCmd(CommandType.STMT_PREPARE, sql);
1004 	conn._fieldCount = 0;
1005 
1006 	ubyte[] packet = conn.getPacket();
1007 	if(packet.front == ResultPacketMarker.ok)
1008 	{
1009 		packet.popFront();
1010 		info.statementId    = packet.consume!int();
1011 		conn._fieldCount    = packet.consume!short();
1012 		info.numParams      = packet.consume!short();
1013 
1014 		packet.popFront(); // one byte filler
1015 		info.psWarnings     = packet.consume!short();
1016 
1017 		// At this point the server also sends field specs for parameters
1018 		// and columns if there were any of each
1019 		info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);
1020 	}
1021 	else if(packet.front == ResultPacketMarker.error)
1022 	{
1023 		auto error = OKErrorPacket(packet);
1024 		enforcePacketOK(error);
1025 		assert(0); // FIXME: what now?
1026 	}
1027 	else
1028 		assert(0); // FIXME: what now?
1029 
1030 	return info;
1031 }
1032 
1033 /++
1034 Flush any outstanding result set elements.
1035 
1036 When the server responds to a command that produces a result set, it
1037 queues the whole set of corresponding packets over the current connection.
1038 Before that `Connection` can embark on any new command, it must receive
1039 all of those packets and junk them.
1040 
1041 As of v1.1.4, this is done automatically as needed. But you can still
1042 call this manually to force a purge to occur when you want.
1043 
1044 See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
1045 +/
1046 package(mysql) ulong purgeResult(Connection conn)
1047 {
1048 	scope(failure) conn.kill();
1049 
1050 	conn._lastCommandID++;
1051 
1052 	ulong rows = 0;
1053 	if (conn._headersPending)
1054 	{
1055 		for (size_t i = 0;; i++)
1056 		{
1057 			if (conn.getPacket().isEOFPacket())
1058 			{
1059 				conn._headersPending = false;
1060 				break;
1061 			}
1062 			enforce!MYXProtocol(i < conn._fieldCount,
1063 				text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
1064 		}
1065 	}
1066 	if (conn._rowsPending)
1067 	{
1068 		for (;;  rows++)
1069 		{
1070 			if (conn.getPacket().isEOFPacket())
1071 			{
1072 				conn._rowsPending = conn._binaryPending = false;
1073 				break;
1074 			}
1075 		}
1076 	}
1077 	conn.resetPacket();
1078 	return rows;
1079 }
1080 
1081 /++
1082 Get a textual report on the server status.
1083 
1084 (COM_STATISTICS)
1085 +/
1086 package(mysql) string serverStats(Connection conn)
1087 {
1088 	conn.sendCmd(CommandType.STATISTICS, []);
1089 	return cast(string) conn.getPacket();
1090 }
1091 
1092 /++
1093 Enable multiple statement commands.
1094 
1095 This can be used later if this feature was not requested in the client capability flags.
1096 
1097 Warning: This functionality is currently untested.
1098 
1099 Params: on = Boolean value to turn the capability on or off.
1100 +/
1101 //TODO: Need to test this
1102 package(mysql) void enableMultiStatements(Connection conn, bool on)
1103 {
1104 	scope(failure) conn.kill();
1105 
1106 	ubyte[] t;
1107 	t.length = 2;
1108 	t[0] = on ? 0 : 1;
1109 	t[1] = 0;
1110 	conn.sendCmd(CommandType.STMT_OPTION, t);
1111 
1112 	// For some reason this command gets an EOF packet as response
1113 	auto packet = conn.getPacket();
1114 	enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1115 }
1116 
1117 private ubyte getDefaultCollation(string serverVersion)
1118 {
1119 	// MySQL >= 5.5.3 supports utf8mb4
1120 	const v = serverVersion
1121 		.splitter('.')
1122 		.map!(a => a.parse!ushort)
1123 		.array;
1124 
1125 	if (v[0] < 5)
1126 		return 33; // Set utf8_general_ci as default
1127 	if (v[1] < 5)
1128 		return 33; // Set utf8_general_ci as default
1129 	if (v[2] < 3)
1130 		return 33; // Set utf8_general_ci as default
1131 
1132 	return 45; // Set utf8mb4_general_ci as default
1133 }
1134 
1135 unittest
1136 {
1137 	assert(getDefaultCollation("5.5.3") == 45);
1138 	assert(getDefaultCollation("5.5.2") == 33);
1139 
1140 	// MariaDB: https://mariadb.com/kb/en/connection/#initial-handshake-packet
1141 	assert(getDefaultCollation("5.5.5-10.0.7-MariaDB") == 45);
1142 }