1 /++
2 Internal - Low-level communications.
3 
4 Consider this module the main entry point for the low-level MySQL/MariaDB
5 protocol code. The other modules in `mysql.protocol` are mainly tools
6 to support this module.
7 
8 Previously, the code handling low-level protocol details was scattered all
9 across the library. Such functionality has been factored out into this module,
10 to be kept in one place for better encapsulation and to facilitate further
11 cleanup and refactoring.
12 
EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it
settles into what will eventually become a low-level library containing
the bulk of the MySQL/MariaDB-specific code. Hang on tight...
16 
17 Next tasks for this sub-package's cleanup:
18 - Reduce this module's reliance on Connection.
19 - Abstract out a PacketStream to clean up getPacket and related functionality.
20 +/
21 module mysql.protocol.comms;
22 
23 import std.algorithm;
24 import std.conv;
25 import std.digest.sha;
26 import std.exception;
27 import std.range;
28 import std.variant;
29 
30 import mysql.connection;
31 import mysql.exceptions;
32 import mysql.prepared;
33 import mysql.result;
34 
35 import mysql.protocol.constants;
36 import mysql.protocol.extra_types;
37 import mysql.protocol.packet_helpers;
38 import mysql.protocol.packets;
39 import mysql.protocol.sockets;
40 
/// Low-level comms code relating to prepared statements.
///
/// All members are static; the struct is only used as a namespace for the
/// helpers that build and send a statement-execute command.
package struct ProtocolPrepared
{
	import std.conv;
	import std.datetime;
	import std.variant;
	import mysql.types;
	
	/++
	Build the NULL bitmap for a list of bound parameters.

	The result has one bit per parameter, rounded up to whole bytes.
	Bit `i % 8` of byte `i / 8` is set when `inParams[i]` holds `null`.

	Params: inParams = The bound parameter values.
	Returns: A freshly allocated bitmap of `(inParams.length+7)/8` bytes.
	+/
	static ubyte[] makeBitmap(in Variant[] inParams)
	{
		size_t bml = (inParams.length+7)/8;
		ubyte[] bma;
		bma.length = bml;
		foreach (i; 0..inParams.length)
		{
			// Only null parameters contribute a set bit
			if(inParams[i].type != typeid(typeof(null)))
				continue;
			size_t bn = i/8;
			size_t bb = i%8;
			ubyte sr = 1;
			sr <<= bb;
			bma[bn] |= sr;
		}
		return bma;
	}

	/++
	Build the fixed 14-byte prefix of a statement-execute packet.

	Bytes 0..3 are left zeroed here (sendCommand fills them in later via
	setPacketHeader), byte 4 is the command, bytes 5..8 the statement
	handle, byte 9 the flags and bytes 10..13 the iteration count.

	Params:
	hStmt = The statement handle.
	flags = Value stored in the flags byte.
	Returns: The newly allocated prefix.
	+/
	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
	{
		ubyte[] prefix;
		prefix.length = 14;

		prefix[4] = CommandType.STMT_EXECUTE;
		hStmt.packInto(prefix[5..9]);
		prefix[9] = flags;   // flags, no cursor
		prefix[10] = 1; // iteration count - currently always 1
		prefix[11] = 0;
		prefix[12] = 0;
		prefix[13] = 0;

		return prefix;
	}

	/++
	Serialise the bound parameters of a prepared statement.

	The D type held by each Variant is identified by the string form of its
	TypeInfo (a trailing `*` marks a pointer to the value, which is then
	dereferenced). For every parameter two bytes are written to the returned
	type array (SQL type code, then a signedness flag), and the value bytes
	are appended to `vals`.

	Params:
	inParams = The parameter values.
	psa = Per-parameter specializations; `psa[i]` is read for every
	      parameter, so it must be at least as long as `inParams`.
	vals = Receives the concatenated value bytes, trimmed to the used length.
	longData = Set to true if any `psa[i].chunkSize` is non-zero.

	Returns: The type array, `2 * inParams.length` bytes long.

	Throws: MYX if a parameter is unbound (its Variant type prints as
	"void") or holds a type not listed in the switch below.
	+/
	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
		out ubyte[] vals, out bool longData)
	{
		size_t pc = inParams.length;
		ubyte[] types;
		types.length = pc*2;
		// Initial guess for the value buffer; grown on demand by reAlloc
		size_t alloc = pc*20;
		vals.length = alloc;
		uint vcl = 0, len;	// vcl: number of bytes of `vals` used so far
		int ct = 0;		// write position in `types`

		// Ensure room for n more bytes in `vals`, growing it by at least
		// 1.5x its current size (or by n, if that is larger).
		void reAlloc(size_t n)
		{
			if (vcl+n < alloc)
				return;
			size_t inc = (alloc*3)/2;
			if (inc <  n)
				inc = n;
			alloc += inc;
			vals.length = alloc;
		}

		foreach (size_t i; 0..pc)
		{
			// Values for the second (flag) byte of each type entry
			enum UNSIGNED  = 0x80;
			enum SIGNED    = 0;
			if (psa[i].chunkSize)
				longData= true;
			// Null parameters get a type entry but contribute no value bytes
			if (inParams[i].type == typeid(typeof(null)))
			{
				types[ct++] = SQLType.NULL;
				types[ct++] = SIGNED;
				continue;
			}
			Variant v = inParams[i];
			SQLType ext = psa[i].type;
			string ts = v.type.toString();
			bool isRef;
			// A trailing '*' means the Variant holds a pointer to the value
			if (ts[$-1] == '*')
			{
				ts.length = ts.length-1;
				isRef= true;
			}

			switch (ts)
			{
				case "bool":
				case "const(bool)":
				case "immutable(bool)":
				case "shared(immutable(bool))":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.BIT;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					reAlloc(2);
					bool bv = isRef? *(v.get!(const(bool*))): v.get!(const(bool));
					// Written as a length byte of 1 followed by ASCII '1' or '0'
					vals[vcl++] = 1;
					vals[vcl++] = bv? 0x31: 0x30;
					break;
				case "byte":
				case "const(byte)":
				case "immutable(byte)":
				case "shared(immutable(byte))":
					types[ct++] = SQLType.TINY;
					types[ct++] = SIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *(v.get!(const(byte*))): v.get!(const(byte));
					break;
				case "ubyte":
				case "const(ubyte)":
				case "immutable(ubyte)":
				case "shared(immutable(ubyte))":
					types[ct++] = SQLType.TINY;
					types[ct++] = UNSIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *(v.get!(const(ubyte*))): v.get!(const(ubyte));
					break;
				// Multi-byte integers are written least significant byte first
				case "short":
				case "const(short)":
				case "immutable(short)":
				case "shared(immutable(short))":
					types[ct++] = SQLType.SHORT;
					types[ct++] = SIGNED;
					reAlloc(2);
					short si = isRef? *(v.get!(const(short*))): v.get!(const(short));
					vals[vcl++] = cast(ubyte) (si & 0xff);
					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
					break;
				case "ushort":
				case "const(ushort)":
				case "immutable(ushort)":
				case "shared(immutable(ushort))":
					types[ct++] = SQLType.SHORT;
					types[ct++] = UNSIGNED;
					reAlloc(2);
					ushort us = isRef? *(v.get!(const(ushort*))): v.get!(const(ushort));
					vals[vcl++] = cast(ubyte) (us & 0xff);
					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
					break;
				case "int":
				case "const(int)":
				case "immutable(int)":
				case "shared(immutable(int))":
					types[ct++] = SQLType.INT;
					types[ct++] = SIGNED;
					reAlloc(4);
					int ii = isRef? *(v.get!(const(int*))): v.get!(const(int));
					vals[vcl++] = cast(ubyte) (ii & 0xff);
					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
					break;
				case "uint":
				case "const(uint)":
				case "immutable(uint)":
				case "shared(immutable(uint))":
					types[ct++] = SQLType.INT;
					types[ct++] = UNSIGNED;
					reAlloc(4);
					uint ui = isRef? *(v.get!(const(uint*))): v.get!(const(uint));
					vals[vcl++] = cast(ubyte) (ui & 0xff);
					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
					break;
				case "long":
				case "const(long)":
				case "immutable(long)":
				case "shared(immutable(long))":
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = SIGNED;
					reAlloc(8);
					long li = isRef? *(v.get!(const(long*))): v.get!(const(long));
					vals[vcl++] = cast(ubyte) (li & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
					break;
				case "ulong":
				case "const(ulong)":
				case "immutable(ulong)":
				case "shared(immutable(ulong))":
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = UNSIGNED;
					reAlloc(8);
					ulong ul = isRef? *(v.get!(const(ulong*))): v.get!(const(ulong));
					vals[vcl++] = cast(ubyte) (ul & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
					break;
				// Floating point values are copied byte-for-byte from their
				// in-memory representation on the host
				case "float":
				case "const(float)":
				case "immutable(float)":
				case "shared(immutable(float))":
					types[ct++] = SQLType.FLOAT;
					types[ct++] = SIGNED;
					reAlloc(4);
					float f = isRef? *(v.get!(const(float*))): v.get!(const(float));
					ubyte* ubp = cast(ubyte*) &f;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp;
					break;
				case "double":
				case "const(double)":
				case "immutable(double)":
				case "shared(immutable(double))":
					types[ct++] = SQLType.DOUBLE;
					types[ct++] = SIGNED;
					reAlloc(8);
					double d = isRef? *(v.get!(const(double*))): v.get!(const(double));
					ubyte* ubp = cast(ubyte*) &d;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp;
					break;
				// The date/time types are matched under more than one
				// qualified name (std.datetime.date.X and std.datetime.X)
				case "std.datetime.date.Date":
				case "const(std.datetime.date.Date)":
				case "immutable(std.datetime.date.Date)":
				case "shared(immutable(std.datetime.date.Date))":

				case "std.datetime.Date":
				case "const(std.datetime.Date)":
				case "immutable(std.datetime.Date)":
				case "shared(immutable(std.datetime.Date))":
					types[ct++] = SQLType.DATE;
					types[ct++] = SIGNED;
					Date date = isRef? *(v.get!(const(Date*))): v.get!(const(Date));
					ubyte[] da = pack(date);
					size_t l = da.length;
					reAlloc(l);
					vals[vcl..vcl+l] = da[];
					vcl += l;
					break;
				case "std.datetime.TimeOfDay":
				case "const(std.datetime.TimeOfDay)":
				case "immutable(std.datetime.TimeOfDay)":
				case "shared(immutable(std.datetime.TimeOfDay))":

				case "std.datetime.date.TimeOfDay":
				case "const(std.datetime.date.TimeOfDay)":
				case "immutable(std.datetime.date.TimeOfDay)":
				case "shared(immutable(std.datetime.date.TimeOfDay))":

				case "std.datetime.Time":
				case "const(std.datetime.Time)":
				case "immutable(std.datetime.Time)":
				case "shared(immutable(std.datetime.Time))":
					types[ct++] = SQLType.TIME;
					types[ct++] = SIGNED;
					TimeOfDay time = isRef? *(v.get!(const(TimeOfDay*))): v.get!(const(TimeOfDay));
					ubyte[] ta = pack(time);
					size_t l = ta.length;
					reAlloc(l);
					vals[vcl..vcl+l] = ta[];
					vcl += l;
					break;
				case "std.datetime.date.DateTime":
				case "const(std.datetime.date.DateTime)":
				case "immutable(std.datetime.date.DateTime)":
				case "shared(immutable(std.datetime.date.DateTime))":

				case "std.datetime.DateTime":
				case "const(std.datetime.DateTime)":
				case "immutable(std.datetime.DateTime)":
				case "shared(immutable(std.datetime.DateTime))":
					types[ct++] = SQLType.DATETIME;
					types[ct++] = SIGNED;
					DateTime dt = isRef? *(v.get!(const(DateTime*))): v.get!(const(DateTime));
					ubyte[] da = pack(dt);
					size_t l = da.length;
					reAlloc(l);
					vals[vcl..vcl+l] = da[];
					vcl += l;
					break;
				case "mysql.types.Timestamp":
				case "const(mysql.types.Timestamp)":
				case "immutable(mysql.types.Timestamp)":
				case "shared(immutable(mysql.types.Timestamp))":
					types[ct++] = SQLType.TIMESTAMP;
					types[ct++] = SIGNED;
					Timestamp tms = isRef? *(v.get!(const(Timestamp*))): v.get!(const(Timestamp));
					// Converted to a DateTime first, then packed like a DATETIME value
					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
					ubyte[] da = pack(dt);
					size_t l = da.length;
					reAlloc(l);
					vals[vcl..vcl+l] = da[];
					vcl += l;
					break;
				// Strings and byte arrays are packed by packLCS; the SQL type
				// defaults to VARCHAR / TINYBLOB unless the specialization
				// requests a specific type.
				case "char[]":
				case "const(char[])":
				case "immutable(char[])":
				case "const(char)[]":
				case "immutable(char)[]":
				case "shared(immutable(char)[])":
				case "shared(immutable(char))[]":
				case "shared(immutable(char[]))":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.VARCHAR;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const char[] ca = isRef? *(v.get!(const(char[]*))): v.get!(const(char[]));
					ubyte[] packed = packLCS(cast(void[]) ca);
					reAlloc(packed.length);
					vals[vcl..vcl+packed.length] = packed[];
					vcl += packed.length;
					break;
				case "byte[]":
				case "const(byte[])":
				case "immutable(byte[])":
				case "const(byte)[]":
				case "immutable(byte)[]":
				case "shared(immutable(byte)[])":
				case "shared(immutable(byte))[]":
				case "shared(immutable(byte[]))":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.TINYBLOB;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const byte[] ba = isRef? *(v.get!(const(byte[]*))): v.get!(const(byte[]));
					ubyte[] packed = packLCS(cast(void[]) ba);
					reAlloc(packed.length);
					vals[vcl..vcl+packed.length] = packed[];
					vcl += packed.length;
					break;
				case "ubyte[]":
				case "const(ubyte[])":
				case "immutable(ubyte[])":
				case "const(ubyte)[]":
				case "immutable(ubyte)[]":
				case "shared(immutable(ubyte)[])":
				case "shared(immutable(ubyte))[]":
				case "shared(immutable(ubyte[]))":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.TINYBLOB;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const ubyte[] uba = isRef? *(v.get!(const(ubyte[]*))): v.get!(const(ubyte[]));
					ubyte[] packed = packLCS(cast(void[]) uba);
					reAlloc(packed.length);
					vals[vcl..vcl+packed.length] = packed[];
					vcl += packed.length;
					break;
				case "void":
					// A Variant that was never assigned reports its type as "void"
					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
				default:
					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
			}
		}
		// Trim the over-allocated buffer to the bytes actually written
		vals.length = vcl;
		return types;
	}

	/++
	Send the long-data chunks for every parameter that has a chunk size set.

	For each entry of `psa` with a non-zero `chunkSize`, the entry's
	`chunkDelegate` is called repeatedly to fill a buffer of `chunkSize`
	bytes, and each filled buffer is sent to the server as its own
	STMT_SEND_LONG_DATA packet. Sending for a parameter stops as soon as
	the delegate returns fewer bytes than the chunk size.

	Params:
	socket = The socket to send on.
	hStmt = The statement handle.
	psa = The parameter specializations; the index of an entry is sent
	      as the parameter number.
	+/
	static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
	{
		assert(psa.length <= ushort.max); // parameter number is sent as short
		foreach (ushort i, PSN psn; psa)
		{
			if (!psn.chunkSize) continue;
			uint cs = psn.chunkSize;
			uint delegate(ubyte[]) dg = psn.chunkDelegate;

			// 11 bytes of overhead: 4 header + 1 command + 4 handle + 2 parameter number
			ubyte[] chunk;
			chunk.length = cs+11;
			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
			hStmt.packInto(chunk[5..9]); // statement handle
			packInto(i, chunk[9..11]); // parameter number

			// byte 11 on is payload
			for (;;)
			{
				uint sent = dg(chunk[11..cs+11]);
				if (sent < cs)
				{
					if (sent == 0)    // data was exact multiple of chunk size - all sent
						break;
					chunk.length = chunk.length - (cs-sent);     // trim the chunk
					sent += 7;        // adjust for non-payload bytes
					// Rewrite the length bytes of the header for the shortened final chunk
					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
					socket.send(chunk);
					break;
				}
				socket.send(chunk);
			}
		}
	}

	/++
	Build and send the execute command for a prepared statement.

	Purges any pending result first and resets the packet sequence number.
	When the statement has parameters, the packet is the prefix followed by
	the NULL bitmap, a single byte of value 1, the parameter types and the
	parameter values. If any parameter uses long data, those chunks are
	sent before the execute packet itself.

	Params:
	conn = The connection to send on.
	hStmt = The statement handle.
	psh = The statement's headers; only `paramCount` is read here.
	inParams = The bound parameter values.
	psa = The parameter specializations.
	+/
	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
		Variant[] inParams, ParameterSpecialization[] psa)
	{
		conn.autoPurge();
		
		ubyte[] packet;
		conn.resetPacket();

		ubyte[] prefix = makePSPrefix(hStmt, 0);
		size_t len = prefix.length;
		bool longData;

		if (psh.paramCount)
		{
			ubyte[] one = [ 1 ];
			ubyte[] vals;
			ubyte[] types = analyseParams(inParams, psa, vals, longData);
			ubyte[] nbm = makeBitmap(inParams);
			packet = prefix ~ nbm ~ one ~ types ~ vals;
		}
		else
			packet = prefix;

		if (longData)
			sendLongData(conn._socket, hStmt, psa);

		assert(packet.length <= uint.max);
		// The header could not be filled in until the total length was known
		packet.setPacketHeader(conn.pktNumber);
		conn.bumpPacket();
		conn._socket.send(packet);
	}
}
482 
/++
Describes the command that `execQueryImpl` should send: either a plain SQL
string or a prepared statement together with its bound parameters.
+/
package(mysql) struct ExecQueryImplInfo
{
	/// Selects which of the two groups of fields below is used.
	bool isPrepared;

	// For non-prepared statements:
	/// The SQL text to send.
	const(char[]) sql;

	// For prepared statements:
	/// The statement handle.
	uint hStmt;
	/// The statement's headers.
	PreparedStmtHeaders psh;
	/// The bound parameter values.
	Variant[] inParams;
	/// The per-parameter specializations.
	ParameterSpecialization[] psa;
}
496 
/++
Internal implementation for the exec and query functions.

Sends the command described by `info` (a prepared-statement execute, or a
plain query) and interprets the first packet of the server's response.

If the response is an OK/error packet, it is checked with
`enforcePacketOK` and the connection's server status and insert ID are
updated. Otherwise the response is taken to be a result set header: the
connection is flagged as having headers and rows pending, and the field
count is read from the packet.

Any result set can be accessed via Connection.getNextRow(), but you should really be
using the query function for such queries.

Params:
conn = The connection to use. It is killed if anything here throws.
info = Describes the command to send.
ra = An out parameter to receive the number of rows affected. It is set
     to 0 when a result set is returned.

Returns: true if there was a (possibly empty) result set.
+/
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
{
	scope(failure) conn.kill();

	// Send the command
	if(!info.isPrepared)
	{
		conn.sendCmd(CommandType.QUERY, info.sql);
		conn._fieldCount = 0;
	}
	else
		ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);

	// Interpret the first response packet
	ubyte[] response = conn.getPacket();
	const marker = response.front;

	if (marker == ResultPacketMarker.ok || marker == ResultPacketMarker.error)
	{
		conn.resetPacket();
		auto status = OKErrorPacket(response);
		enforcePacketOK(status);
		ra = status.affected;
		conn._serverStatus = status.serverStatus;
		conn._insertID = status.insertID;
		return false;
	}

	// Anything else is presumed to be the header of a result set
	assert(marker >= 1 && marker <= 250); // Result set packet header should have this value
	conn._headersPending = conn._rowsPending = true;
	conn._binaryPending = info.isPrepared;

	auto columnCount = response.consumeIfComplete!LCB();
	assert(!columnCount.isNull);
	assert(!columnCount.isIncomplete);
	conn._fieldCount = cast(ushort)columnCount.value;
	assert(conn._fieldCount == columnCount.value);

	ra = 0;
	return true;
}
550 
///ditto
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
{
	// Same as the three-argument overload; the affected-row count is discarded.
	ulong discardedRowsAffected;
	const hasResultSet = execQueryImpl(conn, info, discardedRowsAffected);
	return hasResultSet;
}
557 
/++
Tell the server to release (close) a prepared statement right away.

Does nothing if the connection is already closed. Any pending result set is
purged first, then a STMT_CLOSE packet carrying `statementId` is sent.

Params:
conn = The connection the statement belongs to. It is killed if anything
       here throws.
statementId = The server-side handle of the statement to release.
+/
package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
{
	scope(failure) conn.kill();

	if(conn.closed())
		return;

	// Drain any pending result set BEFORE touching the packet sequence number.
	// getPacket() enforces that every incoming packet's sequence number equals
	// conn.pktNumber, so bumping the counter first (as was done previously)
	// made the purge fail with "Server packet out of order" whenever rows or
	// headers were still pending. purgeResult() finishes by resetting the
	// sequence number.
	conn.purgeResult();

	ubyte[9] packet_buf;
	ubyte[] packet = packet_buf;
	packet.setPacketHeader(0/*packet number*/);
	packet[4] = CommandType.STMT_CLOSE;
	statementId.packInto(packet[5..9]);
	conn._socket.send(packet);
	// It seems that the server does not find it necessary to send a response
	// for this command, so there is no reply to sequence: the packet number is
	// left as purgeResult() reset it, which is also where the previous code
	// ended up whenever it succeeded.
}
576 
// Moved here from `struct Row`
/++
Remove the NULL bitmap of a binary result row from the front of `packet`
and decode it.

Params:
packet = The packet data, positioned at the start of the bitmap. The
         bitmap bytes are consumed from it.
fieldCount = The number of fields in the row.

Returns: One flag per field, true where the field is NULL.

Throws: MYXProtocol if `packet` holds fewer bytes than the bitmap needs.
+/
package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
{
	const uint bytesNeeded = calcBitmapLength(fieldCount);
	enforce!MYXProtocol(bytesNeeded <= packet.length, "Packet too small to hold null bitmap for all fields");
	return decodeNullBitmap(packet.consume(bytesNeeded), fieldCount);
}
585 
// Moved here from `struct Row`
/// Number of bytes occupied by the NULL bitmap of a binary result row with
/// `fieldCount` fields: one bit per field plus two leading reserved bits,
/// rounded up to whole bytes.
private static uint calcBitmapLength(uint fieldCount) pure nothrow
{
	enum reservedBits = 2;
	enum bitsPerByte = 8;
	return (fieldCount + (bitsPerByte - 1) + reservedBits) / bitsPerByte;
}
591 
// Moved here from `struct Row`
// Decodes the NULL bitmap of a binary result row. The two lowest bits of the
// first byte are reserved and skipped, so field `i` is described by bit
// `i + 2` of the bitmap, counting from the least significant bit of byte 0.
private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
in
{
	assert(bitmap.length >= calcBitmapLength(numFields),
		"bitmap not large enough to store all null fields");
}
out(result)
{
	assert(result.length == numFields);
}
body
{
	enum reservedBits = 2;

	auto nulls = new bool[numFields];
	foreach(fieldIndex, ref isNull; nulls)
	{
		// Position of this field's bit within the whole bitmap
		immutable bitIndex = fieldIndex + reservedBits;
		immutable ubyte holder = bitmap[bitIndex / 8];
		isNull = ((holder >> (bitIndex % 8)) & 0b0000_0001) != 0;
	}
	return nulls;
}
636 
// Moved here from `struct Row.this`
/++
Decode one result row from `packet`.

For a binary row the leading zero byte and the NULL bitmap are consumed
first. Each field value is then read with `consumeIfComplete`; while a
value is incomplete, further packets are fetched from the connection and
appended to `packet`.

Params:
conn = The connection the row is read from. It is killed if anything
       here throws.
packet = The row data; consumed as the fields are decoded.
rh = The headers describing the fields of the result set.
binary = True if the row is in binary format, false for text format.
_values = Receives one value per field (`null` for NULL fields).
_nulls = Receives one flag per field, true where the field is NULL.
_names = Receives the name of every field, NULL or not.

Throws: MYXProtocol if a binary row does not start with a zero byte.
+/
package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
	out Variant[] _values, out bool[] _nulls, out string[] _names)
in
{
	assert(rh.fieldCount <= uint.max);
}
body
{
	scope(failure) conn.kill();

	uint fieldCount = cast(uint)rh.fieldCount;
	_values.length = _nulls.length = _names.length = fieldCount;

	if(binary)
	{
		// There's a null byte header on a binary result sequence, followed by some bytes of bitmap
		// indicating which columns are null
		enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
		packet.popFront();
		_nulls = consumeNullBitmap(packet, fieldCount);
	}

	foreach(size_t i; 0..fieldCount)
	{
		// Record the name before the NULL check. Previously the name was
		// only assigned while decoding a value, so NULL columns of binary
		// rows were left with an empty name.
		FieldDescription fd = rh[i];
		_names[i] = fd.name;

		if(binary && _nulls[i])
		{
			_values[i] = null;
			continue;
		}

		SQLValue sqlValue;
		do
		{
			sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
			// TODO: Support chunk delegate
			if(sqlValue.isIncomplete)
				packet ~= conn.getPacket();
		} while(sqlValue.isIncomplete);
		assert(!sqlValue.isIncomplete);

		if(sqlValue.isNull)
		{
			// Only text rows signal NULL in the value itself; binary rows
			// use the bitmap handled above.
			assert(!binary);
			assert(!_nulls[i]);
			_nulls[i] = true;
			_values[i] = null;
		}
		else
		{
			_values[i] = sqlValue.value;
		}
	}
}
693 
694 ////// Moved here from Connection /////////////////////////////////
695 
/++
Read one packet from the connection's socket and return its payload.

The 4-byte header is read first: three bytes of payload length (least
significant byte first) and one byte of sequence number. The sequence
number must equal the connection's current packet number, which is then
incremented.

Params: conn = The connection to read from. It is killed if anything here throws.
Returns: The packet payload, without the header.
Throws: MYXProtocol if the sequence number is not the expected one.
+/
package(mysql) ubyte[] getPacket(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[4] header;
	conn._socket.read(header);

	// The payload length is always a 24-bit value
	immutable uint payloadLength = header[0] | (header[1] << 8) | (header[2] << 16);
	immutable sequenceNumber = header[3];

	enforce!MYXProtocol(sequenceNumber == conn.pktNumber, "Server packet out of order");
	conn.bumpPacket();

	auto payload = new ubyte[payloadLength];
	conn._socket.read(payload);
	assert(payload.length == payloadLength, "Wrong number of bytes read");
	return payload;
}
712 
/// Write a complete packet (header plus at least one payload byte) to the socket.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
in
{
	// 4 header bytes and at least 1 byte of payload
	assert(packet.length >= 5);
}
body
{
	_socket.write(packet);
}
722 
/// Write a packet given as a separate header and payload. The header is
/// always written; the payload is written only when it is non-empty.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
in
{
	// Either the bare 4-byte header, or the header with the command type included
	assert(header.length >= 4 && header.length <= 5);
}
body
{
	_socket.write(header);
	if(data.length == 0)
		return;
	_socket.write(data);
}
734 
/++
Send a command packet consisting of a command byte followed by `data`.

If the socket is no longer connected, the connection is re-established
first (except for QUIT, which is then skipped). Any pending result is
purged and the packet sequence number is reset before sending.

Params:
conn = The connection to send on. It is killed if anything here throws.
cmd = The command to send.
data = The payload following the command byte; may be empty.
+/
package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
in
{
	// Internal thread states. Clients shouldn't use this
	assert(cmd != CommandType.SLEEP
		&& cmd != CommandType.CONNECT
		&& cmd != CommandType.TIME
		&& cmd != CommandType.DELAYED_INSERT
		&& cmd != CommandType.CONNECT_OUT);

	// Deprecated
	assert(cmd != CommandType.CREATE_DB
		&& cmd != CommandType.DROP_DB
		&& cmd != CommandType.TABLE_DUMP);

	// cannot send more than uint.max bytes. TODO: better error message if we try?
	assert(data.length <= uint.max);
}
out
{
	// at this point we should have sent a command
	assert(conn.pktNumber == 1);
}
body
{
	scope(failure) conn.kill();

	conn._lastCommandID++;

	immutable stillConnected = conn._socket.connected;
	if(!stillConnected)
	{
		// Don't bother reopening connection just to quit
		if(cmd == CommandType.QUIT)
			return;

		conn._open = Connection.OpenState.notConnected;
		conn.connect(conn._clientCapabilities);
	}

	conn.autoPurge();
	conn.resetPacket();

	// 4 bytes of packet header followed by the command byte
	enum headerSize = 4;
	auto header = new ubyte[headerSize + 1];
	header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
	header[headerSize] = cmd;
	conn.bumpPacket();

	conn._socket.send(header, cast(const(ubyte)[])data);
}
785 
/++
Read the next packet as an OK/error packet, check it with
`enforcePacketOK`, and record the reported server status on the connection.

Params:
conn = The connection to read from.
asString = Not used by this function.

Returns: The decoded packet.
+/
package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
{
	auto response = OKErrorPacket(conn.getPacket());
	enforcePacketOK(response);

	conn._serverStatus = response.serverStatus;
	return response;
}
793 
/++
Assemble the client authentication packet.

Layout, after the 4-byte packet header: client capability flags (4 bytes),
maximum packet length (4 bytes), character set (1 byte), 23 zero bytes,
the NUL-terminated user name, the length-prefixed authentication token
(a single zero byte when the password is empty), and the NUL-terminated
default database name.

The connection's packet number is used for the header and then incremented.

Params:
conn = The connection whose user, password, database and capabilities are used.
token = The 20-byte authentication token.

Returns: The complete packet, header included.
+/
package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
in
{
	assert(token.length == 20);
}
body
{
	enum fillerLength = 23;

	ubyte[] packet;
	packet.reserve(4/*header*/ + 4 + 4 + 1 + fillerLength + conn._user.length+1 + token.length+1 + conn._db.length+1);

	// Room for the packet header, capability flags and maximum packet length,
	// all of which are set in place rather than appended.
	// NOTE: the header itself is set last, once the size is known
	packet.length = 4 + 4 + 4;

	// Set the default capabilities required by the client
	conn._cCaps.packInto(packet[4..8]);

	// Request a conventional maximum packet length.
	1.packInto(packet[8..12]);

	packet ~= 33; // Set UTF-8 as default charSet

	// There's a statutory block of zero bytes here - fill them in.
	packet.length += fillerLength;

	// Add the user name as a null terminated string
	packet ~= cast(const(ubyte)[])conn._user;
	packet ~= 0; // \0

	// Add our calculated authentication token as a length prefixed string.
	assert(token.length <= ubyte.max);
	if(conn._pwd.length != 0)
	{
		packet ~= cast(ubyte)token.length;
		packet ~= token;
	}
	else
		packet ~= 0; // Omit the token if the account has no password

	// Add the default database as a null terminated string
	packet ~= cast(const(ubyte)[])conn._db;
	packet ~= 0; // \0

	// The server sent us a greeting with packet number 0, so we send the auth packet
	// back with the next number.
	packet.setPacketHeader(conn.pktNumber);
	conn.bumpPacket();
	return packet;
}
846 
/++
Compute the authentication token for a password and a server scramble.

The token is `SHA1(password) XOR SHA1(authBuf ~ SHA1(SHA1(password)))`.

Params:
password = The account password.
authBuf = The scramble buffer received from the server.

Returns: A newly allocated 20-byte token.
+/
package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
{
	auto stage1 = sha1Of(cast(const(ubyte)[])password);
	auto stage2 = sha1Of(stage1);

	SHA1 hasher;
	hasher.start();
	hasher.put(authBuf);
	hasher.put(stage2);
	auto scrambled = hasher.finish();

	auto token = new ubyte[scrambled.length];
	foreach (idx, ref tokenByte; token)
		tokenByte = cast(ubyte)(scrambled[idx] ^ stage1[idx]);
	return token;
}
861 
/++
Get the next `mysql.result.Row` of a pending result set.

If the result set headers have not been read yet, they are read first.
When the next packet is an EOF packet, the connection's pending-rows and
pending-binary flags are cleared and a default-initialised Row is returned.

Throws: MYXReceived if the server sends an error packet instead of a row.
+/
package(mysql) Row getNextRow(Connection conn)
{
	scope(failure) conn.kill();

	if (conn._headersPending)
	{
		conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
		conn._headersPending = false;
	}

	ubyte[] packet = conn.getPacket();
	if(packet.front == ResultPacketMarker.error)
		throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);

	// End of the result set: nothing further is pending
	if (packet.isEOFPacket())
	{
		conn._rowsPending = conn._binaryPending = false;
		return Row.init;
	}

	// Rows of prepared statements arrive in binary format
	const bool isBinary = conn._binaryPending;
	return Row(conn, packet, conn._rsh, isBinary);
}
890 
/++
Consume the server capability flags, character set and status from a
greeting packet and store them on the connection.

The capability flags arrive as two 16-bit halves with the character set
and server status in between.

Throws: MYX if the server does not advertise the PROTOCOL41 and
SECURE_CONNECTION capabilities.
+/
package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
{
	scope(failure) conn.kill();

	immutable uint capsLow = packet.consume!ushort(); // server_capabilities (lower bytes)
	conn._sCaps = cast(SvrCapFlags)capsLow;

	conn._sCharSet = packet.consume!ubyte(); // server_language
	conn._serverStatus = packet.consume!ushort(); //server_status

	immutable uint capsHigh = packet.consume!ushort(); // server_capabilities (upper bytes)
	// OLD_LONG_PASSWORD is assumed to be set since v4.1.1, according to spec
	conn._sCaps = cast(SvrCapFlags)(capsLow | (capsHigh << 16) | SvrCapFlags.OLD_LONG_PASSWORD);

	enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
	enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
}
904 
/++
Read and parse the server's greeting packet.

Stores the protocol version, server version string, server thread ID,
capabilities, character set and status on the connection.

Params: conn = The connection to read from. It is killed if anything here throws.

Returns: The scramble buffer (both parts concatenated) to be used for
authentication.

Throws:
MYX if the server answers with an error packet instead of a greeting.
MYXProtocol if the greeting is malformed.
+/
package(mysql) ubyte[] parseGreeting(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[] packet = conn.getPacket();

	if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
	{
		auto okp = OKErrorPacket(packet);
		enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
	}

	conn._protocol = packet.consume!ubyte();

	// countUntil returns -1 when there is no terminator; reject that here
	// rather than passing a negative length on to consume.
	auto versionLen = packet.countUntil(0);
	enforce!MYXProtocol(versionLen >= 0, "Server version in greeting is not \\0 terminated");
	conn._serverVersion = packet.consume!string(versionLen);
	packet.skip(1); // \0 terminated _serverVersion

	conn._sThread = packet.consume!uint();

	// read first part of scramble buf
	ubyte[] authBuf;
	authBuf.length = 255;
	authBuf[0..8] = packet.consume(8)[]; // scramble_buff

	enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

	conn.consumeServerInfo(packet);

	packet.skip(1); // this byte supposed to be scramble length, but is actually zero
	packet.skip(10); // filler of \0

	// rest of the scramble
	// (a missing terminator gives len == -1, which the check below rejects)
	auto len = packet.countUntil(0);
	enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
	enforce(authBuf.length > 8+len);
	authBuf[8..8+len] = packet.consume(len)[];
	authBuf.length = 8+len; // cut to correct size
	enforce!MYXProtocol(packet.consume!ubyte() == 0, "Expected \\0 terminating scramble buf");

	return authBuf;
}
946 
/++
Compute the capabilities supported by both the server and the client.

Params:
server = The capability flags advertised by the server.
client = The capability flags requested by the client.

Returns: The flags that are set in both arguments.
+/
package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
{
	// A capability is common when its bit is set on both sides, which is a
	// plain bitwise AND over the whole flag word. The previous bit-by-bit
	// loop ran `uint.sizeof` (4) times rather than once per bit, so only
	// the four lowest flags were ever considered.
	return cast(SvrCapFlags)(server & client);
}
961 
/++
Work out the capability flags the client will use: those common to the
server and the requested set, with PROTOCOL41 and SECURE_CONNECTION always
switched on.

Params:
serverCaps = The capability flags advertised by the server.
capFlags = The capability flags requested by the caller.
+/
package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
{
	SvrCapFlags negotiated = getCommonCapabilities(serverCaps, capFlags);

	// We cannot operate in <4.1 protocol, so these are forced even if the
	// user didn't supply them
	negotiated |= SvrCapFlags.PROTOCOL41 | SvrCapFlags.SECURE_CONNECTION;

	return negotiated;
}
973 
/++
Authenticate with the server using the scramble from its greeting.

Builds the token from the connection's password and `greeting`, sends the
authentication packet, and marks the connection as authenticated if the
server's reply is not an error.

Params:
conn = The connection, which must be in the `connected` state.
greeting = The scramble buffer obtained from the server's greeting.

Throws: MYX if the server reports an error.
+/
package(mysql) void authenticate(Connection conn, ubyte[] greeting)
in
{
	assert(conn._open == Connection.OpenState.connected);
}
out
{
	assert(conn._open == Connection.OpenState.authenticated);
}
body
{
	ubyte[] authToken = makeToken(conn._pwd, greeting);
	conn._socket.send(conn.buildAuthPacket(authToken));

	auto reply = OKErrorPacket(conn.getPacket());
	enforce!MYX(!reply.error, "Authentication failure: " ~ cast(string) reply.message);

	conn._open = Connection.OpenState.authenticated;
}
994 
/++
Register (prepare) a statement on the server.

Sends a STMT_PREPARE command with the given SQL and decodes the response:
the statement ID, the number of result columns (stored on the connection
as the field count), the number of parameters and the warning count. The
parameter and column descriptions that follow are read into the headers.

Params:
conn = The connection to use. It is killed if anything here throws.
sql = The SQL text of the statement.

Returns: The server's information about the prepared statement.

Throws:
MYXProtocol if the response is neither an OK nor an error packet.
Whatever `enforcePacketOK` throws if the server answers with an error packet.
+/
package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
{
	scope(failure) conn.kill();

	PreparedServerInfo info;
	
	conn.sendCmd(CommandType.STMT_PREPARE, sql);
	conn._fieldCount = 0;

	ubyte[] packet = conn.getPacket();
	if(packet.front == ResultPacketMarker.error)
	{
		auto error = OKErrorPacket(packet);
		enforcePacketOK(error);
		assert(0); // enforcePacketOK is expected to have thrown for an error packet
	}

	// Anything other than OK at this point is a protocol violation. Report it
	// as an exception, like the rest of this module, instead of aborting the
	// whole process with assert(0).
	enforce!MYXProtocol(packet.front == ResultPacketMarker.ok,
		"Unexpected response to STMT_PREPARE command");

	packet.popFront();
	info.statementId    = packet.consume!int();
	conn._fieldCount    = packet.consume!short();
	info.numParams      = packet.consume!short();

	packet.popFront(); // one byte filler
	info.psWarnings     = packet.consume!short();

	// At this point the server also sends field specs for parameters
	// and columns if there were any of each
	info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);

	return info;
}
1031 
/++
Flush any outstanding result set elements.

When the server responds to a command that produces a result set, it
queues the whole set of corresponding packets over the current connection.
Before that `Connection` can embark on any new command, it must receive
all of those packets and junk them.

As of v1.1.4, this is done automatically as needed. But you can still
call this manually to force a purge to occur when you want.

Pending field headers are discarded up to their EOF packet, then pending
rows up to theirs. The packet sequence number is reset afterwards.

Returns: The number of row packets that were discarded.

Throws: MYXProtocol if more field header packets arrive than the
connection's field count allows without an EOF packet being seen.

See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
+/
package(mysql) ulong purgeResult(Connection conn)
{
	scope(failure) conn.kill();

	conn._lastCommandID++;

	if (conn._headersPending)
	{
		// Discard field headers until the EOF packet that ends them
		size_t headersSeen = 0;
		while (!conn.getPacket().isEOFPacket())
		{
			enforce!MYXProtocol(headersSeen < conn._fieldCount,
				text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
			headersSeen++;
		}
		conn._headersPending = false;
	}

	ulong rows = 0;
	if (conn._rowsPending)
	{
		// Discard rows, counting them, until the EOF packet that ends them
		while (!conn.getPacket().isEOFPacket())
			rows++;
		conn._rowsPending = conn._binaryPending = false;
	}

	conn.resetPacket();
	return rows;
}
1079 
/++
Get a textual report on the server status.

Sends a STATISTICS command (COM_STATISTICS) with no payload and returns
the payload of the response packet as a string.
+/
package(mysql) string serverStats(Connection conn)
{
	conn.sendCmd(CommandType.STATISTICS, []);

	ubyte[] report = conn.getPacket();
	return cast(string) report;
}
1090 
/++
Enable multiple statement commands.

This can be used later if this feature was not requested in the client capability flags.

Warning: This functionality is currently untested.

Params:
conn = The connection to use. It is killed if anything here throws.
on = Boolean value to turn the capability on or off.

Throws: MYXProtocol if the response is not a 5-byte packet starting with 254.
+/
//TODO: Need to test this
package(mysql) void enableMultiStatements(Connection conn, bool on)
{
	scope(failure) conn.kill();

	// Two-byte option value: 0 switches the capability on, 1 switches it off.
	// The second byte stays zero.
	auto option = new ubyte[2];
	option[0] = on ? 0 : 1;
	conn.sendCmd(CommandType.STMT_OPTION, option);

	// For some reason this command gets an EOF packet as response
	auto response = conn.getPacket();
	enforce!MYXProtocol(response[0] == 254 && response.length == 5, "Unexpected response to SET_OPTION command");
}