1 /++
2 Internal - Low-level communications.
3 
4 Consider this module the main entry point for the low-level MySQL/MariaDB
5 protocol code. The other modules in `mysql.protocol` are mainly tools
6 to support this module.
7 
8 Previously, the code handling low-level protocol details was scattered all
9 across the library. Such functionality has been factored out into this module,
10 to be kept in one place for better encapsulation and to facilitate further
11 cleanup and refactoring.
12 
13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it
14 eventually settles into what will eventually become a low-level library
15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight...
16 
17 Next tasks for this sub-package's cleanup:
18 - Reduce this module's reliance on Connection.
19 - Abstract out a PacketStream to clean up getPacket and related functionality.
20 +/
21 module mysql.protocol.comms;
22 
23 import std.algorithm;
24 import std.conv;
25 import std.digest.sha;
26 import std.exception;
27 import std.range;
28 import std.variant;
29 
30 import mysql.connection;
31 import mysql.exceptions;
32 import mysql.prepared;
33 import mysql.result;
34 
35 import mysql.protocol.constants;
36 import mysql.protocol.extra_types;
37 import mysql.protocol.packet_helpers;
38 import mysql.protocol.packets;
39 import mysql.protocol.sockets;
40 
41 /// Low-level comms code relating to prepared statements.
42 package struct ProtocolPrepared
43 {
44 	import std.conv;
45 	import std.datetime;
46 	import std.variant;
47 	import mysql.types;
48 	
49 	static ubyte[] makeBitmap(in Variant[] inParams)
50 	{
51 		size_t bml = (inParams.length+7)/8;
52 		ubyte[] bma;
53 		bma.length = bml;
54 		foreach (i; 0..inParams.length)
55 		{
56 			if(inParams[i].type != typeid(typeof(null)))
57 				continue;
58 			size_t bn = i/8;
59 			size_t bb = i%8;
60 			ubyte sr = 1;
61 			sr <<= bb;
62 			bma[bn] |= sr;
63 		}
64 		return bma;
65 	}
66 
67 	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
68 	{
69 		ubyte[] prefix;
70 		prefix.length = 14;
71 
72 		prefix[4] = CommandType.STMT_EXECUTE;
73 		hStmt.packInto(prefix[5..9]);
74 		prefix[9] = flags;   // flags, no cursor
75 		prefix[10] = 1; // iteration count - currently always 1
76 		prefix[11] = 0;
77 		prefix[12] = 0;
78 		prefix[13] = 0;
79 
80 		return prefix;
81 	}
82 
83 	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
84 		out ubyte[] vals, out bool longData)
85 	{
86 		size_t pc = inParams.length;
87 		ubyte[] types;
88 		types.length = pc*2;
89 		size_t alloc = pc*20;
90 		vals.length = alloc;
91 		uint vcl = 0, len;
92 		int ct = 0;
93 
94 		void reAlloc(size_t n)
95 		{
96 			if (vcl+n < alloc)
97 				return;
98 			size_t inc = (alloc*3)/2;
99 			if (inc <  n)
100 				inc = n;
101 			alloc += inc;
102 			vals.length = alloc;
103 		}
104 
105 		foreach (size_t i; 0..pc)
106 		{
107 			enum UNSIGNED  = 0x80;
108 			enum SIGNED    = 0;
109 			if (psa[i].chunkSize)
110 				longData= true;
111 			if (inParams[i].type == typeid(typeof(null)))
112 			{
113 				types[ct++] = SQLType.NULL;
114 				types[ct++] = SIGNED;
115 				continue;
116 			}
117 			Variant v = inParams[i];
118 			SQLType ext = psa[i].type;
119 			string ts = v.type.toString();
120 			bool isRef;
121 			if (ts[$-1] == '*')
122 			{
123 				ts.length = ts.length-1;
124 				isRef= true;
125 			}
126 
127 			switch (ts)
128 			{
129 				case "bool":
130 				case "const(bool)":
131 				case "immutable(bool)":
132 				case "shared(immutable(bool))":
133 					if (ext == SQLType.INFER_FROM_D_TYPE)
134 						types[ct++] = SQLType.BIT;
135 					else
136 						types[ct++] = cast(ubyte) ext;
137 					types[ct++] = SIGNED;
138 					reAlloc(2);
139 					bool bv = isRef? *(v.get!(const(bool*))): v.get!(const(bool));
140 					vals[vcl++] = 1;
141 					vals[vcl++] = bv? 0x31: 0x30;
142 					break;
143 				case "byte":
144 				case "const(byte)":
145 				case "immutable(byte)":
146 				case "shared(immutable(byte))":
147 					types[ct++] = SQLType.TINY;
148 					types[ct++] = SIGNED;
149 					reAlloc(1);
150 					vals[vcl++] = isRef? *(v.get!(const(byte*))): v.get!(const(byte));
151 					break;
152 				case "ubyte":
153 				case "const(ubyte)":
154 				case "immutable(ubyte)":
155 				case "shared(immutable(ubyte))":
156 					types[ct++] = SQLType.TINY;
157 					types[ct++] = UNSIGNED;
158 					reAlloc(1);
159 					vals[vcl++] = isRef? *(v.get!(const(ubyte*))): v.get!(const(ubyte));
160 					break;
161 				case "short":
162 				case "const(short)":
163 				case "immutable(short)":
164 				case "shared(immutable(short))":
165 					types[ct++] = SQLType.SHORT;
166 					types[ct++] = SIGNED;
167 					reAlloc(2);
168 					short si = isRef? *(v.get!(const(short*))): v.get!(const(short));
169 					vals[vcl++] = cast(ubyte) (si & 0xff);
170 					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
171 					break;
172 				case "ushort":
173 				case "const(ushort)":
174 				case "immutable(ushort)":
175 				case "shared(immutable(ushort))":
176 					types[ct++] = SQLType.SHORT;
177 					types[ct++] = UNSIGNED;
178 					reAlloc(2);
179 					ushort us = isRef? *(v.get!(const(ushort*))): v.get!(const(ushort));
180 					vals[vcl++] = cast(ubyte) (us & 0xff);
181 					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
182 					break;
183 				case "int":
184 				case "const(int)":
185 				case "immutable(int)":
186 				case "shared(immutable(int))":
187 					types[ct++] = SQLType.INT;
188 					types[ct++] = SIGNED;
189 					reAlloc(4);
190 					int ii = isRef? *(v.get!(const(int*))): v.get!(const(int));
191 					vals[vcl++] = cast(ubyte) (ii & 0xff);
192 					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
193 					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
194 					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
195 					break;
196 				case "uint":
197 				case "const(uint)":
198 				case "immutable(uint)":
199 				case "shared(immutable(uint))":
200 					types[ct++] = SQLType.INT;
201 					types[ct++] = UNSIGNED;
202 					reAlloc(4);
203 					uint ui = isRef? *(v.get!(const(uint*))): v.get!(const(uint));
204 					vals[vcl++] = cast(ubyte) (ui & 0xff);
205 					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
206 					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
207 					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
208 					break;
209 				case "long":
210 				case "const(long)":
211 				case "immutable(long)":
212 				case "shared(immutable(long))":
213 					types[ct++] = SQLType.LONGLONG;
214 					types[ct++] = SIGNED;
215 					reAlloc(8);
216 					long li = isRef? *(v.get!(const(long*))): v.get!(const(long));
217 					vals[vcl++] = cast(ubyte) (li & 0xff);
218 					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
219 					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
220 					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
221 					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
222 					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
223 					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
224 					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
225 					break;
226 				case "ulong":
227 				case "const(ulong)":
228 				case "immutable(ulong)":
229 				case "shared(immutable(ulong))":
230 					types[ct++] = SQLType.LONGLONG;
231 					types[ct++] = UNSIGNED;
232 					reAlloc(8);
233 					ulong ul = isRef? *(v.get!(const(ulong*))): v.get!(const(ulong));
234 					vals[vcl++] = cast(ubyte) (ul & 0xff);
235 					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
236 					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
237 					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
238 					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
239 					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
240 					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
241 					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
242 					break;
243 				case "float":
244 				case "const(float)":
245 				case "immutable(float)":
246 				case "shared(immutable(float))":
247 					types[ct++] = SQLType.FLOAT;
248 					types[ct++] = SIGNED;
249 					reAlloc(4);
250 					float f = isRef? *(v.get!(const(float*))): v.get!(const(float));
251 					ubyte* ubp = cast(ubyte*) &f;
252 					vals[vcl++] = *ubp++;
253 					vals[vcl++] = *ubp++;
254 					vals[vcl++] = *ubp++;
255 					vals[vcl++] = *ubp;
256 					break;
257 				case "double":
258 				case "const(double)":
259 				case "immutable(double)":
260 				case "shared(immutable(double))":
261 					types[ct++] = SQLType.DOUBLE;
262 					types[ct++] = SIGNED;
263 					reAlloc(8);
264 					double d = isRef? *(v.get!(const(double*))): v.get!(const(double));
265 					ubyte* ubp = cast(ubyte*) &d;
266 					vals[vcl++] = *ubp++;
267 					vals[vcl++] = *ubp++;
268 					vals[vcl++] = *ubp++;
269 					vals[vcl++] = *ubp++;
270 					vals[vcl++] = *ubp++;
271 					vals[vcl++] = *ubp++;
272 					vals[vcl++] = *ubp++;
273 					vals[vcl++] = *ubp;
274 					break;
275 				case "std.datetime.date.Date":
276 				case "const(std.datetime.date.Date)":
277 				case "immutable(std.datetime.date.Date)":
278 				case "shared(immutable(std.datetime.date.Date))":
279 
280 				case "std.datetime.Date":
281 				case "const(std.datetime.Date)":
282 				case "immutable(std.datetime.Date)":
283 				case "shared(immutable(std.datetime.Date))":
284 					types[ct++] = SQLType.DATE;
285 					types[ct++] = SIGNED;
286 					Date date = isRef? *(v.get!(const(Date*))): v.get!(const(Date));
287 					ubyte[] da = pack(date);
288 					size_t l = da.length;
289 					reAlloc(l);
290 					vals[vcl..vcl+l] = da[];
291 					vcl += l;
292 					break;
293 				case "std.datetime.TimeOfDay":
294 				case "const(std.datetime.TimeOfDay)":
295 				case "immutable(std.datetime.TimeOfDay)":
296 				case "shared(immutable(std.datetime.TimeOfDay))":
297 
298 				case "std.datetime.date.TimeOfDay":
299 				case "const(std.datetime.date.TimeOfDay)":
300 				case "immutable(std.datetime.date.TimeOfDay)":
301 				case "shared(immutable(std.datetime.date.TimeOfDay))":
302 
303 				case "std.datetime.Time":
304 				case "const(std.datetime.Time)":
305 				case "immutable(std.datetime.Time)":
306 				case "shared(immutable(std.datetime.Time))":
307 					types[ct++] = SQLType.TIME;
308 					types[ct++] = SIGNED;
309 					TimeOfDay time = isRef? *(v.get!(const(TimeOfDay*))): v.get!(const(TimeOfDay));
310 					ubyte[] ta = pack(time);
311 					size_t l = ta.length;
312 					reAlloc(l);
313 					vals[vcl..vcl+l] = ta[];
314 					vcl += l;
315 					break;
316 				case "std.datetime.date.DateTime":
317 				case "const(std.datetime.date.DateTime)":
318 				case "immutable(std.datetime.date.DateTime)":
319 				case "shared(immutable(std.datetime.date.DateTime))":
320 
321 				case "std.datetime.DateTime":
322 				case "const(std.datetime.DateTime)":
323 				case "immutable(std.datetime.DateTime)":
324 				case "shared(immutable(std.datetime.DateTime))":
325 					types[ct++] = SQLType.DATETIME;
326 					types[ct++] = SIGNED;
327 					DateTime dt = isRef? *(v.get!(const(DateTime*))): v.get!(const(DateTime));
328 					ubyte[] da = pack(dt);
329 					size_t l = da.length;
330 					reAlloc(l);
331 					vals[vcl..vcl+l] = da[];
332 					vcl += l;
333 					break;
334 				case "mysql.types.Timestamp":
335 				case "const(mysql.types.Timestamp)":
336 				case "immutable(mysql.types.Timestamp)":
337 				case "shared(immutable(mysql.types.Timestamp))":
338 					types[ct++] = SQLType.TIMESTAMP;
339 					types[ct++] = SIGNED;
340 					Timestamp tms = isRef? *(v.get!(const(Timestamp*))): v.get!(const(Timestamp));
341 					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
342 					ubyte[] da = pack(dt);
343 					size_t l = da.length;
344 					reAlloc(l);
345 					vals[vcl..vcl+l] = da[];
346 					vcl += l;
347 					break;
348 				case "char[]":
349 				case "const(char[])":
350 				case "immutable(char[])":
351 				case "const(char)[]":
352 				case "immutable(char)[]":
353 				case "shared(immutable(char)[])":
354 				case "shared(immutable(char))[]":
355 				case "shared(immutable(char[]))":
356 					if (ext == SQLType.INFER_FROM_D_TYPE)
357 						types[ct++] = SQLType.VARCHAR;
358 					else
359 						types[ct++] = cast(ubyte) ext;
360 					types[ct++] = SIGNED;
361 					const char[] ca = isRef? *(v.get!(const(char[]*))): v.get!(const(char[]));
362 					ubyte[] packed = packLCS(cast(void[]) ca);
363 					reAlloc(packed.length);
364 					vals[vcl..vcl+packed.length] = packed[];
365 					vcl += packed.length;
366 					break;
367 				case "byte[]":
368 				case "const(byte[])":
369 				case "immutable(byte[])":
370 				case "const(byte)[]":
371 				case "immutable(byte)[]":
372 				case "shared(immutable(byte)[])":
373 				case "shared(immutable(byte))[]":
374 				case "shared(immutable(byte[]))":
375 					if (ext == SQLType.INFER_FROM_D_TYPE)
376 						types[ct++] = SQLType.TINYBLOB;
377 					else
378 						types[ct++] = cast(ubyte) ext;
379 					types[ct++] = SIGNED;
380 					const byte[] ba = isRef? *(v.get!(const(byte[]*))): v.get!(const(byte[]));
381 					ubyte[] packed = packLCS(cast(void[]) ba);
382 					reAlloc(packed.length);
383 					vals[vcl..vcl+packed.length] = packed[];
384 					vcl += packed.length;
385 					break;
386 				case "ubyte[]":
387 				case "const(ubyte[])":
388 				case "immutable(ubyte[])":
389 				case "const(ubyte)[]":
390 				case "immutable(ubyte)[]":
391 				case "shared(immutable(ubyte)[])":
392 				case "shared(immutable(ubyte))[]":
393 				case "shared(immutable(ubyte[]))":
394 					if (ext == SQLType.INFER_FROM_D_TYPE)
395 						types[ct++] = SQLType.TINYBLOB;
396 					else
397 						types[ct++] = cast(ubyte) ext;
398 					types[ct++] = SIGNED;
399 					const ubyte[] uba = isRef? *(v.get!(const(ubyte[]*))): v.get!(const(ubyte[]));
400 					ubyte[] packed = packLCS(cast(void[]) uba);
401 					reAlloc(packed.length);
402 					vals[vcl..vcl+packed.length] = packed[];
403 					vcl += packed.length;
404 					break;
405 				case "void":
406 					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
407 				default:
408 					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
409 			}
410 		}
411 		vals.length = vcl;
412 		return types;
413 	}
414 
415 	static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
416 	{
417 		assert(psa.length <= ushort.max); // parameter number is sent as short
418 		foreach (ushort i, PSN psn; psa)
419 		{
420 			if (!psn.chunkSize) continue;
421 			uint cs = psn.chunkSize;
422 			uint delegate(ubyte[]) dg = psn.chunkDelegate;
423 
424 			ubyte[] chunk;
425 			chunk.length = cs+11;
426 			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
427 			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
428 			hStmt.packInto(chunk[5..9]); // statement handle
429 			packInto(i, chunk[9..11]); // parameter number
430 
431 			// byte 11 on is payload
432 			for (;;)
433 			{
434 				uint sent = dg(chunk[11..cs+11]);
435 				if (sent < cs)
436 				{
437 					if (sent == 0)    // data was exact multiple of chunk size - all sent
438 						break;
439 					chunk.length = chunk.length - (cs-sent);     // trim the chunk
440 					sent += 7;        // adjust for non-payload bytes
441 					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
442 					socket.send(chunk);
443 					break;
444 				}
445 				socket.send(chunk);
446 			}
447 		}
448 	}
449 
450 	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
451 		Variant[] inParams, ParameterSpecialization[] psa)
452 	{
453 		conn.autoPurge();
454 		
455 		ubyte[] packet;
456 		conn.resetPacket();
457 
458 		ubyte[] prefix = makePSPrefix(hStmt, 0);
459 		size_t len = prefix.length;
460 		bool longData;
461 
462 		if (psh.paramCount)
463 		{
464 			ubyte[] one = [ 1 ];
465 			ubyte[] vals;
466 			ubyte[] types = analyseParams(inParams, psa, vals, longData);
467 			ubyte[] nbm = makeBitmap(inParams);
468 			packet = prefix ~ nbm ~ one ~ types ~ vals;
469 		}
470 		else
471 			packet = prefix;
472 
473 		if (longData)
474 			sendLongData(conn._socket, hStmt, psa);
475 
476 		assert(packet.length <= uint.max);
477 		packet.setPacketHeader(conn.pktNumber);
478 		conn.bumpPacket();
479 		conn._socket.send(packet);
480 	}
481 }
482 
483 package(mysql) struct ExecQueryImplInfo
484 {
485 	bool isPrepared;
486 
487 	// For non-prepared statements:
488 	const(char[]) sql;
489 
490 	// For prepared statements:
491 	uint hStmt;
492 	PreparedStmtHeaders psh;
493 	Variant[] inParams;
494 	ParameterSpecialization[] psa;
495 }
496 
497 /++
498 Internal implementation for the exec and query functions.
499 
500 Execute a one-off SQL command.
501 
502 Any result set can be accessed via Connection.getNextRow(), but you should really be
503 using the query function for such queries.
504 
505 Params: ra = An out parameter to receive the number of rows affected.
506 Returns: true if there was a (possibly empty) result set.
507 +/
508 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
509 {
510 	scope(failure) conn.kill();
511 
512 	// Send data
513 	if(info.isPrepared)
514 		ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
515 	else
516 	{
517 		conn.sendCmd(CommandType.QUERY, info.sql);
518 		conn._fieldCount = 0;
519 	}
520 
521 	// Handle response
522 	ubyte[] packet = conn.getPacket();
523 	bool rv;
524 	if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
525 	{
526 		conn.resetPacket();
527 		auto okp = OKErrorPacket(packet);
528 		enforcePacketOK(okp);
529 		ra = okp.affected;
530 		conn._serverStatus = okp.serverStatus;
531 		conn._insertID = okp.insertID;
532 		rv = false;
533 	}
534 	else
535 	{
536 		// There was presumably a result set
537 		assert(packet.front >= 1 && packet.front <= 250); // Result set packet header should have this value
538 		conn._headersPending = conn._rowsPending = true;
539 		conn._binaryPending = info.isPrepared;
540 		auto lcb = packet.consumeIfComplete!LCB();
541 		assert(!lcb.isNull);
542 		assert(!lcb.isIncomplete);
543 		conn._fieldCount = cast(ushort)lcb.value;
544 		assert(conn._fieldCount == lcb.value);
545 		rv = true;
546 		ra = 0;
547 	}
548 	return rv;
549 }
550 
551 ///ditto
552 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
553 {
554 	ulong rowsAffected;
555 	return execQueryImpl(conn, info, rowsAffected);
556 }
557 
558 package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
559 {
560 	scope(failure) conn.kill();
561 
562 	if(conn.closed())
563 		return;
564 
565 	ubyte[9] packet_buf;
566 	ubyte[] packet = packet_buf;
567 	packet.setPacketHeader(0/*packet number*/);
568 	conn.bumpPacket();
569 	packet[4] = CommandType.STMT_CLOSE;
570 	statementId.packInto(packet[5..9]);
571 	conn.purgeResult();
572 	conn._socket.send(packet);
573 	// It seems that the server does not find it necessary to send a response
574 	// for this command.
575 }
576 
577 // Moved here from `struct Row`
578 package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
579 {
580 	uint bitmapLength = calcBitmapLength(fieldCount);
581 	enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields");
582 	auto bitmap = packet.consume(bitmapLength);
583 	return decodeNullBitmap(bitmap, fieldCount);
584 }
585 
586 // Moved here from `struct Row`
587 private static uint calcBitmapLength(uint fieldCount) pure nothrow
588 {
589 	return (fieldCount+7+2)/8;
590 }
591 
592 // Moved here from `struct Row`
593 // This is to decode the bitmap in a binary result row. First two bits are skipped
594 private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
595 in
596 {
597 	assert(bitmap.length >= calcBitmapLength(numFields),
598 		"bitmap not large enough to store all null fields");
599 }
600 out(result)
601 {
602 	assert(result.length == numFields);
603 }
604 body
605 {
606 	bool[] nulls;
607 	nulls.length = numFields;
608 
609 	// the current byte we are processing for nulls
610 	ubyte bits = bitmap.front();
611 	// strip away the first two bits as they are reserved
612 	bits >>= 2;
613 	// .. and then we only have 6 bits left to process for this byte
614 	ubyte bitsLeftInByte = 6;
615 	foreach(ref isNull; nulls)
616 	{
617 		assert(bitsLeftInByte <= 8);
618 		// processed all bits? fetch new byte
619 		if (bitsLeftInByte == 0)
620 		{
621 			assert(bits == 0, "not all bits are processed!");
622 			assert(!bitmap.empty, "bits array too short for number of columns");
623 			bitmap.popFront();
624 			bits = bitmap.front;
625 			bitsLeftInByte = 8;
626 		}
627 		assert(bitsLeftInByte > 0);
628 		isNull = (bits & 0b0000_0001) != 0;
629 
630 		// get ready to process next bit
631 		bits >>= 1;
632 		--bitsLeftInByte;
633 	}
634 	return nulls;
635 }
636 
637 // Moved here from `struct Row.this`
638 package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
639 	out Variant[] _values, out bool[] _nulls)
640 in
641 {
642 	assert(rh.fieldCount <= uint.max);
643 }
644 body
645 {
646 	scope(failure) conn.kill();
647 
648 	uint fieldCount = cast(uint)rh.fieldCount;
649 	_values.length = _nulls.length = fieldCount;
650 
651 	if(binary)
652 	{
653 		// There's a null byte header on a binary result sequence, followed by some bytes of bitmap
654 		// indicating which columns are null
655 		enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
656 		packet.popFront();
657 		_nulls = consumeNullBitmap(packet, fieldCount);
658 	}
659 
660 	foreach(size_t i; 0..fieldCount)
661 	{
662 		if(binary && _nulls[i])
663 		{
664 			_values[i] = null;
665 			continue;
666 		}
667 
668 		SQLValue sqlValue;
669 		do
670 		{
671 			FieldDescription fd = rh[i];
672 			sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
673 			// TODO: Support chunk delegate
674 			if(sqlValue.isIncomplete)
675 				packet ~= conn.getPacket();
676 		} while(sqlValue.isIncomplete);
677 		assert(!sqlValue.isIncomplete);
678 
679 		if(sqlValue.isNull)
680 		{
681 			assert(!binary);
682 			assert(!_nulls[i]);
683 			_nulls[i] = true;
684 			_values[i] = null;
685 		}
686 		else
687 		{
688 			_values[i] = sqlValue.value;
689 		}
690 	}
691 }
692 
693 ////// Moved here from Connection /////////////////////////////////
694 
695 package(mysql) ubyte[] getPacket(Connection conn)
696 {
697 	scope(failure) conn.kill();
698 
699 	ubyte[4] header;
700 	conn._socket.read(header);
701 	// number of bytes always set as 24-bit
702 	uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
703 	enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order");
704 	conn.bumpPacket();
705 
706 	ubyte[] packet = new ubyte[numDataBytes];
707 	conn._socket.read(packet);
708 	assert(packet.length == numDataBytes, "Wrong number of bytes read");
709 	return packet;
710 }
711 
712 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
713 in
714 {
715 	assert(packet.length > 4); // at least 1 byte more than header
716 }
717 body
718 {
719 	_socket.write(packet);
720 }
721 
722 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
723 in
724 {
725 	assert(header.length == 4 || header.length == 5/*command type included*/);
726 }
727 body
728 {
729 	_socket.write(header);
730 	if(data.length)
731 		_socket.write(data);
732 }
733 
734 package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
735 in
736 {
737 	// Internal thread states. Clients shouldn't use this
738 	assert(cmd != CommandType.SLEEP);
739 	assert(cmd != CommandType.CONNECT);
740 	assert(cmd != CommandType.TIME);
741 	assert(cmd != CommandType.DELAYED_INSERT);
742 	assert(cmd != CommandType.CONNECT_OUT);
743 
744 	// Deprecated
745 	assert(cmd != CommandType.CREATE_DB);
746 	assert(cmd != CommandType.DROP_DB);
747 	assert(cmd != CommandType.TABLE_DUMP);
748 
749 	// cannot send more than uint.max bytes. TODO: better error message if we try?
750 	assert(data.length <= uint.max);
751 }
752 out
753 {
754 	// at this point we should have sent a command
755 	assert(conn.pktNumber == 1);
756 }
757 body
758 {
759 	scope(failure) conn.kill();
760 
761 	conn._lastCommandID++;
762 
763 	if(!conn._socket.connected)
764 	{
765 		if(cmd == CommandType.QUIT)
766 			return; // Don't bother reopening connection just to quit
767 
768 		conn._open = Connection.OpenState.notConnected;
769 		conn.connect(conn._clientCapabilities);
770 	}
771 
772 	conn.autoPurge();
773  
774 	conn.resetPacket();
775 
776 	ubyte[] header;
777 	header.length = 4 /*header*/ + 1 /*cmd*/;
778 	header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
779 	header[4] = cmd;
780 	conn.bumpPacket();
781 
782 	conn._socket.send(header, cast(const(ubyte)[])data);
783 }
784 
785 package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
786 {
787 	auto okp = OKErrorPacket(conn.getPacket());
788 	enforcePacketOK(okp);
789 	conn._serverStatus = okp.serverStatus;
790 	return okp;
791 }
792 
793 package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
794 in
795 {
796 	assert(token.length == 20);
797 }
798 body
799 {
800 	ubyte[] packet;
801 	packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1);
802 	packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append
803 
804 	// NOTE: we'll set the header last when we know the size
805 
806 	// Set the default capabilities required by the client
807 	conn._cCaps.packInto(packet[4..8]);
808 
809 	// Request a conventional maximum packet length.
810 	1.packInto(packet[8..12]);
811 
812 	packet ~= 33; // Set UTF-8 as default charSet
813 
814 	// There's a statutory block of zero bytes here - fill them in.
815 	foreach(i; 0 .. 23)
816 		packet ~= 0;
817 
818 	// Add the user name as a null terminated string
819 	foreach(i; 0 .. conn._user.length)
820 		packet ~= conn._user[i];
821 	packet ~= 0; // \0
822 
823 	// Add our calculated authentication token as a length prefixed string.
824 	assert(token.length <= ubyte.max);
825 	if(conn._pwd.length == 0)  // Omit the token if the account has no password
826 		packet ~= 0;
827 	else
828 	{
829 		packet ~= cast(ubyte)token.length;
830 		foreach(i; 0 .. token.length)
831 			packet ~= token[i];
832 	}
833 
834 	// Add the default database as a null terminated string
835 	foreach(i; 0 .. conn._db.length)
836 		packet ~= conn._db[i];
837 	packet ~= 0; // \0
838 
839 	// The server sent us a greeting with packet number 0, so we send the auth packet
840 	// back with the next number.
841 	packet.setPacketHeader(conn.pktNumber);
842 	conn.bumpPacket();
843 	return packet;
844 }
845 
846 package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
847 {
848 	auto pass1 = sha1Of(cast(const(ubyte)[])password);
849 	auto pass2 = sha1Of(pass1);
850 
851 	SHA1 sha1;
852 	sha1.start();
853 	sha1.put(authBuf);
854 	sha1.put(pass2);
855 	auto result = sha1.finish();
856 	foreach (size_t i; 0..20)
857 		result[i] = result[i] ^ pass1[i];
858 	return result.dup;
859 }
860 
861 /// Get the next `mysql.result.Row` of a pending result set.
862 package(mysql) Row getNextRow(Connection conn)
863 {
864 	scope(failure) conn.kill();
865 
866 	if (conn._headersPending)
867 	{
868 		conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
869 		conn._headersPending = false;
870 	}
871 	ubyte[] packet;
872 	Row rr;
873 	packet = conn.getPacket();
874 	if(packet.front == ResultPacketMarker.error)
875 		throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);
876 
877 	if (packet.isEOFPacket())
878 	{
879 		conn._rowsPending = conn._binaryPending = false;
880 		return rr;
881 	}
882 	if (conn._binaryPending)
883 		rr = Row(conn, packet, conn._rsh, true);
884 	else
885 		rr = Row(conn, packet, conn._rsh, false);
886 	//rr._valid = true;
887 	return rr;
888 }
889 
890 package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
891 {
892 	scope(failure) conn.kill();
893 
894 	conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
895 	conn._sCharSet = packet.consume!ubyte(); // server_language
896 	conn._serverStatus = packet.consume!ushort(); //server_status
897 	conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
898 	conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec
899 
900 	enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
901 	enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
902 }
903 
904 package(mysql) ubyte[] parseGreeting(Connection conn)
905 {
906 	scope(failure) conn.kill();
907 
908 	ubyte[] packet = conn.getPacket();
909 
910 	if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
911 	{
912 		auto okp = OKErrorPacket(packet);
913 		enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
914 	}
915 
916 	conn._protocol = packet.consume!ubyte();
917 
918 	conn._serverVersion = packet.consume!string(packet.countUntil(0));
919 	packet.skip(1); // \0 terminated _serverVersion
920 
921 	conn._sThread = packet.consume!uint();
922 
923 	// read first part of scramble buf
924 	ubyte[] authBuf;
925 	authBuf.length = 255;
926 	authBuf[0..8] = packet.consume(8)[]; // scramble_buff
927 
928 	enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");
929 
930 	conn.consumeServerInfo(packet);
931 
932 	packet.skip(1); // this byte supposed to be scramble length, but is actually zero
933 	packet.skip(10); // filler of \0
934 
935 	// rest of the scramble
936 	auto len = packet.countUntil(0);
937 	enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
938 	enforce(authBuf.length > 8+len);
939 	authBuf[8..8+len] = packet.consume(len)[];
940 	authBuf.length = 8+len; // cut to correct size
941 	enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");
942 
943 	return authBuf;
944 }
945 
946 package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
947 {
948 	SvrCapFlags common;
949 	uint filter = 1;
950 	foreach (size_t i; 0..uint.sizeof)
951 	{
952 		bool serverSupport = (server & filter) != 0; // can the server do this capability?
953 		bool clientSupport = (client & filter) != 0; // can we support it?
954 		if(serverSupport && clientSupport)
955 			common |= filter;
956 		filter <<= 1; // check next flag
957 	}
958 	return common;
959 }
960 
961 package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
962 {
963 	auto cCaps = getCommonCapabilities(serverCaps, capFlags);
964 
965 	// We cannot operate in <4.1 protocol, so we'll force it even if the user
966 	// didn't supply it
967 	cCaps |= SvrCapFlags.PROTOCOL41;
968 	cCaps |= SvrCapFlags.SECURE_CONNECTION;
969 	
970 	return cCaps;
971 }
972 
973 package(mysql) void authenticate(Connection conn, ubyte[] greeting)
974 in
975 {
976 	assert(conn._open == Connection.OpenState.connected);
977 }
978 out
979 {
980 	assert(conn._open == Connection.OpenState.authenticated);
981 }
982 body
983 {
984 	auto token = makeToken(conn._pwd, greeting);
985 	auto authPacket = conn.buildAuthPacket(token);
986 	conn._socket.send(authPacket);
987 
988 	auto packet = conn.getPacket();
989 	auto okp = OKErrorPacket(packet);
990 	enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
991 	conn._open = Connection.OpenState.authenticated;
992 }
993 
994 // Register prepared statement
995 package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
996 {
997 	scope(failure) conn.kill();
998 
999 	PreparedServerInfo info;
1000 	
1001 	conn.sendCmd(CommandType.STMT_PREPARE, sql);
1002 	conn._fieldCount = 0;
1003 
1004 	ubyte[] packet = conn.getPacket();
1005 	if(packet.front == ResultPacketMarker.ok)
1006 	{
1007 		packet.popFront();
1008 		info.statementId    = packet.consume!int();
1009 		conn._fieldCount    = packet.consume!short();
1010 		info.numParams      = packet.consume!short();
1011 
1012 		packet.popFront(); // one byte filler
1013 		info.psWarnings     = packet.consume!short();
1014 
1015 		// At this point the server also sends field specs for parameters
1016 		// and columns if there were any of each
1017 		info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);
1018 	}
1019 	else if(packet.front == ResultPacketMarker.error)
1020 	{
1021 		auto error = OKErrorPacket(packet);
1022 		enforcePacketOK(error);
1023 		assert(0); // FIXME: what now?
1024 	}
1025 	else
1026 		assert(0); // FIXME: what now?
1027 
1028 	return info;
1029 }
1030 
1031 /++
1032 Flush any outstanding result set elements.
1033 
1034 When the server responds to a command that produces a result set, it
1035 queues the whole set of corresponding packets over the current connection.
1036 Before that `Connection` can embark on any new command, it must receive
1037 all of those packets and junk them.
1038 
1039 As of v1.1.4, this is done automatically as needed. But you can still
1040 call this manually to force a purge to occur when you want.
1041 
1042 See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
1043 +/
1044 package(mysql) ulong purgeResult(Connection conn)
1045 {
1046 	scope(failure) conn.kill();
1047 
1048 	conn._lastCommandID++;
1049 
1050 	ulong rows = 0;
1051 	if (conn._headersPending)
1052 	{
1053 		for (size_t i = 0;; i++)
1054 		{
1055 			if (conn.getPacket().isEOFPacket())
1056 			{
1057 				conn._headersPending = false;
1058 				break;
1059 			}
1060 			enforce!MYXProtocol(i < conn._fieldCount,
1061 				text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
1062 		}
1063 	}
1064 	if (conn._rowsPending)
1065 	{
1066 		for (;;  rows++)
1067 		{
1068 			if (conn.getPacket().isEOFPacket())
1069 			{
1070 				conn._rowsPending = conn._binaryPending = false;
1071 				break;
1072 			}
1073 		}
1074 	}
1075 	conn.resetPacket();
1076 	return rows;
1077 }
1078 
1079 /++
1080 Get a textual report on the server status.
1081 
1082 (COM_STATISTICS)
1083 +/
1084 package(mysql) string serverStats(Connection conn)
1085 {
1086 	conn.sendCmd(CommandType.STATISTICS, []);
1087 	return cast(string) conn.getPacket();
1088 }
1089 
1090 /++
1091 Enable multiple statement commands.
1092 
1093 This can be used later if this feature was not requested in the client capability flags.
1094 
1095 Warning: This functionality is currently untested.
1096 
1097 Params: on = Boolean value to turn the capability on or off.
1098 +/
1099 //TODO: Need to test this
1100 package(mysql) void enableMultiStatements(Connection conn, bool on)
1101 {
1102 	scope(failure) conn.kill();
1103 
1104 	ubyte[] t;
1105 	t.length = 2;
1106 	t[0] = on ? 0 : 1;
1107 	t[1] = 0;
1108 	conn.sendCmd(CommandType.STMT_OPTION, t);
1109 
1110 	// For some reason this command gets an EOF packet as response
1111 	auto packet = conn.getPacket();
1112 	enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
1113 }