1 /++
2 Internal - Low-level communications.
3 
4 Consider this module the main entry point for the low-level MySQL/MariaDB
5 protocol code. The other modules in `mysql.protocol` are mainly tools
6 to support this module.
7 
8 Previously, the code handling low-level protocol details was scattered all
9 across the library. Such functionality has been factored out into this module,
10 to be kept in one place for better encapsulation and to facilitate further
11 cleanup and refactoring.
12 
EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it
settles into what will eventually become a low-level library
containing the bulk of the MySQL/MariaDB-specific code. Hang on tight...
16 
17 Next tasks for this sub-package's cleanup:
18 - Reduce this module's reliance on Connection.
19 - Abstract out a PacketStream to clean up getPacket and related functionality.
20 +/
21 module mysql.protocol.comms;
22 
23 import std.algorithm;
24 import std.conv;
25 import std.digest.sha;
26 import std.exception;
27 import std.range;
28 
29 import mysql.connection;
30 import mysql.exceptions;
31 import mysql.logger;
32 import mysql.safe.prepared;
33 import mysql.result;
34 import mysql.types;
35 
36 import mysql.protocol.constants;
37 import mysql.protocol.extra_types;
38 import mysql.protocol.packet_helpers;
39 import mysql.protocol.packets;
40 import mysql.protocol.sockets;
41 
42 @safe:
43 
/// Low-level comms code relating to prepared statements.
package struct ProtocolPrepared
{
	@safe:
	import std.conv;
	import std.datetime;
	import mysql.types;

	/++
	Build the NULL bitmap for a set of bound parameters.

	Bit `i % 8` of byte `i / 8` is set when `inParams[i]` is of kind `Null`;
	every other bit is left clear.

	Params: inParams = The bound parameter values.
	Returns: A newly allocated bitmap of `(inParams.length + 7) / 8` bytes.
	+/
	static ubyte[] makeBitmap(in MySQLVal[] inParams)
	{
		auto bitmap = new ubyte[(inParams.length + 7) / 8];
		foreach (i; 0 .. inParams.length)
		{
			if (inParams[i].kind == MySQLVal.Kind.Null)
				bitmap[i / 8] |= cast(ubyte) (1 << (i % 8));
		}
		return bitmap;
	}

	/++
	Build the fixed 14-byte start of a statement-execute packet.

	Bytes 0..4 (the packet header) are left zeroed; `sendCommand` fills them
	in once the full packet length is known.

	Params:
		hStmt = The statement handle, packed into bytes 5..9.
		flags = The flags byte stored at offset 9.
	Returns: The newly allocated prefix.
	+/
	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
	{
		ubyte[] prefix;
		prefix.length = 14;

		prefix[4] = CommandType.STMT_EXECUTE;
		hStmt.packInto(prefix[5..9]);
		prefix[9] = flags;   // flags, no cursor
		prefix[10] = 1; // iteration count - currently always 1
		prefix[11] = 0;
		prefix[12] = 0;
		prefix[13] = 0;

		return prefix;
	}

	/++
	Translate bound parameters into the type list and packed value bytes of
	a statement-execute packet.

	Params:
		inParams = The bound parameter values.
		psa = Per-parameter specializations; `psa[i]` is read for every
		      index of `inParams`.
		vals = Receives the packed values of all non-NULL parameters, in
		       parameter order.
		longData = Set to true when any `psa[i].chunkSize` is non-zero.
	Returns: Two bytes per parameter: the type code followed by the
	         signedness flag (0x80 for unsigned, 0 otherwise).
	+/
	static ubyte[] analyseParams(MySQLVal[] inParams, ParameterSpecialization[] psa,
		out ubyte[] vals, out bool longData)
	{
		import taggedalgebraic.taggedalgebraic : get;

		// Second byte of every type entry.
		enum UNSIGNED  = 0x80;
		enum SIGNED    = 0;

		size_t pc = inParams.length;
		ubyte[] types;
		types.length = pc*2;
		size_t alloc = pc*20;   // initial guess; grown on demand by reAlloc
		vals.length = alloc;
		uint vcl = 0;           // number of bytes of `vals` in use
		int ct = 0;             // next free index in `types`

		// Make sure `n` more bytes fit into `vals` after the used part.
		void reAlloc(size_t n)
		{
			if (vcl+n < alloc)
				return;
			size_t inc = (alloc*3)/2;
			if (inc <  n)
				inc = n;
			alloc += inc;
			vals.length = alloc;
		}

		// Copy `bytes` behind the used part of `vals` and advance `vcl`.
		void append(scope const(ubyte)[] bytes)
		{
			reAlloc(bytes.length);
			vals[vcl .. vcl + bytes.length] = bytes[];
			vcl += bytes.length;
		}

		// Append the low `width` bytes of `value`, least significant first.
		// Signed values are passed sign-extended; only the low bytes are
		// written, so their two's complement form is preserved.
		void appendLE(ulong value, size_t width)
		{
			reAlloc(width);
			foreach (n; 0 .. width)
				vals[vcl++] = cast(ubyte) (value >> (8 * n));
		}

		foreach (size_t i; 0..pc)
		{
			if (psa[i].chunkSize)
				longData= true;
			if (inParams[i].kind == MySQLVal.Kind.Null)
			{
				// NULLs carry no value bytes; they are flagged in the bitmap.
				types[ct++] = SQLType.NULL;
				types[ct++] = SIGNED;
				continue;
			}
			MySQLVal v = inParams[i];
			SQLType ext = psa[i].type;
			auto ts = v.kind;
			bool isRef = false;

			// TODO: use v.visit instead for more efficiency and shorter code.
			with(MySQLVal.Kind) final switch (ts)
			{
				case BitRef:
					isRef = true; goto case;
				case Bit:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.BIT;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					reAlloc(2);
					bool bv = isRef? *v.get!BitRef : v.get!Bit;
					// A length byte of 1 followed by ASCII '1' or '0'.
					vals[vcl++] = 1;
					vals[vcl++] = bv? 0x31: 0x30;
					break;
				case ByteRef:
					isRef = true; goto case;
				case Byte:
					types[ct++] = SQLType.TINY;
					types[ct++] = SIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *v.get!ByteRef : v.get!Byte;
					break;
				case UByteRef:
					isRef = true; goto case;
				case UByte:
					types[ct++] = SQLType.TINY;
					types[ct++] = UNSIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *v.get!UByteRef : v.get!UByte;
					break;
				case ShortRef:
					isRef = true; goto case;
				case Short:
					types[ct++] = SQLType.SHORT;
					types[ct++] = SIGNED;
					short si = isRef? *v.get!ShortRef : v.get!Short;
					appendLE(cast(ulong) si, 2);
					break;
				case UShortRef:
					isRef = true; goto case;
				case UShort:
					types[ct++] = SQLType.SHORT;
					types[ct++] = UNSIGNED;
					ushort us = isRef? *v.get!UShortRef : v.get!UShort;
					appendLE(us, 2);
					break;
				case IntRef:
					isRef = true; goto case;
				case Int:
					types[ct++] = SQLType.INT;
					types[ct++] = SIGNED;
					int ii = isRef? *v.get!IntRef : v.get!Int;
					appendLE(cast(ulong) ii, 4);
					break;
				case UIntRef:
					isRef = true; goto case;
				case UInt:
					types[ct++] = SQLType.INT;
					types[ct++] = UNSIGNED;
					uint ui = isRef? *v.get!UIntRef : v.get!UInt;
					appendLE(ui, 4);
					break;
				case LongRef:
					isRef = true; goto case;
				case Long:
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = SIGNED;
					long li = isRef? *v.get!LongRef : v.get!Long;
					appendLE(cast(ulong) li, 8);
					break;
				case ULongRef:
					isRef = true; goto case;
				case ULong:
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = UNSIGNED;
					ulong ul = isRef? *v.get!ULongRef : v.get!ULong;
					appendLE(ul, 8);
					break;
				case FloatRef:
					isRef = true; goto case;
				case Float:
					types[ct++] = SQLType.FLOAT;
					types[ct++] = SIGNED;
					// The value's in-memory bytes are sent unchanged.
					float[1] f = isRef? *v.get!FloatRef : v.get!Float;
					append(cast(ubyte[]) f[]);
					break;
				case DoubleRef:
					isRef = true; goto case;
				case Double:
					types[ct++] = SQLType.DOUBLE;
					types[ct++] = SIGNED;
					// The destination slice must start at the current write
					// position and span the value's length, exactly as for
					// Float. (It previously ended at `uba.length`, which is
					// only right when the double is the first value byte.)
					double[1] d = isRef? *v.get!DoubleRef : v.get!Double;
					append(cast(ubyte[]) d[]);
					break;
				case DateRef:
					isRef = true; goto case;
				case Date:
					types[ct++] = SQLType.DATE;
					types[ct++] = SIGNED;
					auto date = isRef? *v.get!DateRef : v.get!Date;
					append(pack(date));
					break;
				case TimeRef:
					isRef = true; goto case;
				case Time:
					types[ct++] = SQLType.TIME;
					types[ct++] = SIGNED;
					auto time = isRef? *v.get!TimeRef : v.get!Time;
					append(pack(time));
					break;
				case DateTimeRef:
					isRef = true; goto case;
				case DateTime:
					types[ct++] = SQLType.DATETIME;
					types[ct++] = SIGNED;
					auto dt = isRef? *v.get!DateTimeRef : v.get!DateTime;
					append(pack(dt));
					break;
				case TimestampRef:
					isRef = true; goto case;
				case Timestamp:
					types[ct++] = SQLType.TIMESTAMP;
					types[ct++] = SIGNED;
					auto tms = isRef? *v.get!TimestampRef : v.get!Timestamp;
					// Timestamps are converted to a DateTime and packed as such.
					auto dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
					append(pack(dt));
					break;
				case TextRef:
					isRef = true; goto case;
				case Text:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.VARCHAR;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const char[] ca = isRef? *v.get!TextRef : v.get!Text;
					append(packLCS(ca));
					break;
				// TODO: this is the same as the Text case except for the get
				// call. These should be combined somehow.
				case CTextRef:
					isRef = true; goto case;
				case CText:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.VARCHAR;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const char[] ca = isRef? *v.get!CTextRef : v.get!CText;
					append(packLCS(ca));
					break;
				case BlobRef:
					isRef = true; goto case;
				case Blob:
				case CBlob:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.TINYBLOB;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const ubyte[] uba = isRef? *v.get!BlobRef : (ts == Blob ? v.get!Blob : v.get!CBlob);
					append(packLCS(uba));
					break;
				case Null:
					// Null kinds are handled before the switch; this case
					// is needed because the switch is `final`.
					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
			}
		}
		// Drop the unused tail of the over-allocated buffer.
		vals.length = vcl;
		return types;
	}

	/++
	Send the data of all chunked ("long data") parameters.

	For every entry of `psa` with a non-zero `chunkSize`, the entry's
	`chunkDelegate` is called repeatedly to fill a buffer of `chunkSize`
	bytes. Each filled buffer is sent as its own STMT_SEND_LONG_DATA packet.
	A delegate result smaller than `chunkSize` ends that parameter: a partial
	chunk is sent with a corrected length, a result of zero sends nothing.

	Params:
		socket = The socket to send on.
		hStmt = The statement handle.
		psa = The parameter specializations; the index of an entry is sent
		      as its parameter number.
	+/
	static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
	{
		assert(psa.length <= ushort.max); // parameter number is sent as short
		foreach (size_t i, PSN psn; psa)
		{
			if (!psn.chunkSize) continue;
			uint cs = psn.chunkSize;
			uint delegate(ubyte[]) @safe dg = psn.chunkDelegate;

			// 4 header bytes + 1 command + 4 handle + 2 parameter number,
			// followed by the payload.
			ubyte[] chunk;
			chunk.length = cs+11;
			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
			hStmt.packInto(chunk[5..9]); // statement handle
			packInto(cast(ushort)i, chunk[9..11]); // parameter number

			// byte 11 on is payload
			for (;;)
			{
				uint sent = dg(chunk[11..cs+11]);
				if (sent < cs)
				{
					if (sent == 0)    // data was exact multiple of chunk size - all sent
						break;
					chunk.length = chunk.length - (cs-sent);     // trim the chunk
					sent += 7;        // adjust for non-payload bytes
					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
					socket.send(chunk);
					break;
				}
				socket.send(chunk);
			}
		}
	}

	/++
	Send the execute command for a prepared statement.

	Pending results are purged and the packet number is reset first. When the
	statement has parameters, their NULL bitmap, types and values are
	appended to the prefix. Long-data chunks, if any, go out before the
	execute packet itself.

	Params:
		conn = The connection to send on.
		hStmt = The statement handle.
		psh = The statement headers; only `paramCount` is read here.
		inParams = The bound parameter values.
		psa = The parameter specializations.
	+/
	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
		MySQLVal[] inParams, ParameterSpecialization[] psa)
	{
		conn.autoPurge();

		ubyte[] packet;
		conn.resetPacket();

		ubyte[] prefix = makePSPrefix(hStmt, 0);
		bool longData;

		if (psh.paramCount)
		{
			ubyte[] one = [ 1 ];
			ubyte[] vals;
			ubyte[] types = analyseParams(inParams, psa, vals, longData);
			ubyte[] nbm = makeBitmap(inParams);
			packet = prefix ~ nbm ~ one ~ types ~ vals;
		}
		else
			packet = prefix;

		if (longData)
			sendLongData(conn._socket, hStmt, psa);

		assert(packet.length <= uint.max);
		packet.setPacketHeader(conn.pktNumber);
		conn.bumpPacket();
		conn._socket.send(packet);
	}
}
426 
/// Argument bundle for `execQueryImpl`, describing either a plain SQL
/// command or the execution of a prepared statement.
package(mysql) struct ExecQueryImplInfo
{
	/// Selects which group of fields below `execQueryImpl` uses.
	bool isPrepared;

	// For non-prepared statements:
	/// The SQL text sent with the QUERY command.
	const(char[]) sql;

	// For prepared statements:
	// These four are handed unchanged to `ProtocolPrepared.sendCommand`.
	uint hStmt;
	PreparedStmtHeaders psh;
	MySQLVal[] inParams;
	ParameterSpecialization[] psa;
}
440 
/++
Internal implementation for the exec and query functions.

Sends one command (a plain SQL query or the execution of a prepared
statement) and interprets the first packet of the server's reply.

Any result set can be accessed via Connection.getNextRow(), but you should really be
using the query function for such queries.

Params:
	conn = The connection to use. It is killed if anything throws.
	info = What to send.
	ra = An out parameter to receive the number of rows affected.
Returns: true if there was a (possibly empty) result set.
+/
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
{
	scope(failure) conn.kill();

	// Send data
	if(!info.isPrepared)
	{
		logTrace("exec query: %s", info.sql);

		conn.sendCmd(CommandType.QUERY, info.sql);
		conn._fieldCount = 0;
	}
	else
	{
		logTrace("prepared SQL: %s", info.hStmt);

		ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
	}

	// Handle response
	ubyte[] packet = conn.getPacket();
	immutable marker = packet.front;

	if (marker != ResultPacketMarker.ok && marker != ResultPacketMarker.error)
	{
		// There was presumably a result set
		assert(marker >= 1 && marker <= 250); // Result set packet header should have this value
		conn._headersPending = conn._rowsPending = true;
		conn._binaryPending = info.isPrepared;

		// The packet starts with the length-coded number of columns.
		auto columnCount = packet.consumeIfComplete!LCB();
		assert(!columnCount.isNull);
		assert(!columnCount.isIncomplete);
		conn._fieldCount = cast(ushort)columnCount.value;
		assert(conn._fieldCount == columnCount.value);

		ra = 0;
		return true;
	}

	// OK or error packet: no result set follows.
	conn.resetPacket();
	auto okp = OKErrorPacket(packet);

	if(okp.error) {
		logError("packet error: %s", cast(string) okp.message);
	}

	enforcePacketOK(okp);
	ra = okp.affected;
	conn._serverStatus = okp.serverStatus;
	conn._insertID = okp.insertID;
	return false;
}
505 
///ditto
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
{
	// The caller has no use for the affected-row count; receive and drop it.
	ulong discardedRowCount;
	return execQueryImpl(conn, info, discardedRowCount);
}
512 
/++
Tell the server to release a prepared statement right away.

Does nothing when the connection is already closed. Any pending result set
is purged before the STMT_CLOSE packet is sent. No reply is read.

Params:
	conn = The connection the statement belongs to. It is killed if
	       anything throws.
	statementId = The statement handle to release.
+/
package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
{
	scope(failure) conn.kill();

	if(conn.closed())
		return;

	// 4 header bytes + 1 command byte + 4 bytes of statement handle.
	ubyte[9] packet_buf;
	ubyte[] packet = packet_buf;
	packet.setPacketHeader(0/*packet number*/);
	// NOTE(review): the packet number is bumped before purgeResult runs.
	// purgeResult reads packets through getPacket, which checks each one
	// against conn.pktNumber, and it ends with resetPacket. Confirm that
	// callers never reach this point with a result set still pending.
	conn.bumpPacket();
	packet[4] = CommandType.STMT_CLOSE;
	statementId.packInto(packet[5..9]);
	conn.purgeResult();
	conn._socket.send(packet);
	// It seems that the server does not find it necessary to send a response
	// for this command.
}
531 
// Moved here from `struct Row`
/++
Remove the null bitmap of a binary result row from the front of `packet`
and decode it.

Params:
	packet = The row data, positioned at the bitmap. The bitmap bytes are
	         consumed from it.
	fieldCount = Number of columns in the row.
Returns: One flag per column; true means the column is NULL.
Throws: MYXProtocol if `packet` is shorter than the bitmap.
+/
package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
{
	immutable uint needed = calcBitmapLength(fieldCount);
	enforce!MYXProtocol(needed <= packet.length, "Packet too small to hold null bitmap for all fields");
	return decodeNullBitmap(packet.consume(needed), fieldCount);
}
540 
// Moved here from `struct Row`
/// Number of bytes in the null bitmap of a binary result row with
/// `fieldCount` columns: one bit per column plus two leading reserved bits,
/// rounded up to whole bytes.
private static uint calcBitmapLength(uint fieldCount) pure nothrow
{
	enum uint reservedBits = 2;
	enum uint roundUp = 7;
	return (fieldCount + roundUp + reservedBits) / 8;
}
546 
// Moved here from `struct Row`
// This is to decode the bitmap in a binary result row. First two bits are skipped
/// Expand the null bitmap into one flag per field. Field `i` is described
/// by bit `i + 2` of the bitmap, counting from the least significant bit of
/// the first byte.
private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
in
{
	assert(bitmap.length >= calcBitmapLength(numFields),
		"bitmap not large enough to store all null fields");
}
out(result)
{
	assert(result.length == numFields);
}
do
{
	// the first two bits of the bitmap are reserved
	enum size_t reservedBits = 2;

	auto nulls = new bool[numFields];
	foreach(fieldIndex, ref isNull; nulls)
	{
		immutable size_t bitIndex = fieldIndex + reservedBits;
		immutable ubyte currentByte = bitmap[bitIndex / 8];
		isNull = ((currentByte >> (bitIndex % 8)) & 0b0000_0001) != 0;
	}
	return nulls;
}
591 
// Moved here from `struct Row.this`
/++
Decode one result row from `packet`.

Params:
	conn = The connection; further packets are fetched from it when a
	       value is incomplete. It is killed if anything throws.
	packet = The row data. Decoded bytes are consumed from it.
	rh = The column headers of the result set.
	binary = true for a binary row, false for a text row.
	_values = Receives one value per column; null for NULL columns.
	_nulls = Receives one flag per column; true for NULL columns.
	_names = Receives the column names.
+/
package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
	out MySQLVal[] _values, out bool[] _nulls, out string[] _names)
in
{
	assert(rh.fieldCount <= uint.max);
}
do
{
	scope(failure) conn.kill();

	immutable uint fieldCount = cast(uint)rh.fieldCount;
	_names.length = fieldCount;
	_nulls.length = fieldCount;
	_values.length = fieldCount;

	if(binary)
	{
		// There's a null byte header on a binary result sequence, followed by some bytes of bitmap
		// indicating which columns are null
		enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
		packet.popFront();
		_nulls = consumeNullBitmap(packet, fieldCount);
	}

	foreach(size_t col; 0..fieldCount)
	{
		// Columns flagged in the binary bitmap have no bytes in the packet.
		if(binary && _nulls[col])
		{
			_names[col] = rh[col].name;
			_values[col] = null;
			continue;
		}

		// Keep appending follow-up packets until the value decodes fully.
		SQLValue sqlValue;
		for(;;)
		{
			FieldDescription fd = rh[col];
			_names[col] = fd.name;
			sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
			if(!sqlValue.isIncomplete)
				break;
			// TODO: Support chunk delegate
			packet ~= conn.getPacket();
		}
		assert(!sqlValue.isIncomplete);

		if(!sqlValue.isNull)
		{
			_values[col] = sqlValue.value;
			continue;
		}

		// Only text rows report NULL through the value itself.
		assert(!binary);
		assert(!_nulls[col]);
		_nulls[col] = true;
		_values[col] = null;
	}
}
649 
650 ////// Moved here from Connection /////////////////////////////////
651 
/++
Read the next packet from the connection's socket.

Params: conn = The connection to read from. It is killed if anything throws.
Returns: The packet's payload, without its four header bytes.
Throws: MYXProtocol if the packet's sequence number is not the expected one.
+/
package(mysql) ubyte[] getPacket(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[4] header;
	conn._socket.read(header);

	// number of bytes always set as 24-bit
	immutable uint payloadLength = header[0] | (header[1] << 8) | (header[2] << 16);

	// The fourth header byte is the packet's sequence number.
	enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order");
	conn.bumpPacket();

	auto payload = new ubyte[payloadLength];
	conn._socket.read(payload);
	assert(payload.length == payloadLength, "Wrong number of bytes read");
	return payload;
}
668 
/// Write a complete packet (header and payload in a single buffer) to the
/// socket.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
in
{
	assert(packet.length > 4); // at least 1 byte more than header
}
do
{
	_socket.write(packet);
}
678 
/// Write a packet whose header and payload live in separate buffers.
/// `header` is always written; `data` is written only when it is not empty.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
in
{
	assert(header.length == 4 || header.length == 5/*command type included*/);
}
do
{
	_socket.write(header);
	if(data.length == 0)
		return;
	_socket.write(data);
}
690 
/++
Send a command packet consisting of the command byte followed by `data`.

If the socket is no longer connected, the connection is re-established
first, except for QUIT, which is then silently dropped. Pending results are
purged and the packet number is reset before sending.

Params:
	conn = The connection to send on. It is killed if anything throws.
	cmd = The command to send.
	data = The command's payload; may be empty.
+/
package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
in
{
	with(CommandType)
	{
		// Internal thread states. Clients shouldn't use this
		assert(cmd != SLEEP);
		assert(cmd != CONNECT);
		assert(cmd != TIME);
		assert(cmd != DELAYED_INSERT);
		assert(cmd != CONNECT_OUT);

		// Deprecated
		assert(cmd != CREATE_DB);
		assert(cmd != DROP_DB);
		assert(cmd != TABLE_DUMP);
	}

	// cannot send more than uint.max bytes. TODO: better error message if we try?
	assert(data.length <= uint.max);
}
out
{
	// at this point we should have sent a command
	assert(conn.pktNumber == 1);
}
do
{
	scope(failure) conn.kill();

	++conn._lastCommandID;

	immutable bool socketAlive = conn._socket.connected;
	if(!socketAlive && cmd == CommandType.QUIT)
		return; // Don't bother reopening connection just to quit

	if(!socketAlive)
	{
		conn._open = Connection.OpenState.notConnected;
		conn.connect(conn._clientCapabilities);
	}

	conn.autoPurge();

	conn.resetPacket();

	// The header buffer also carries the command byte; the payload length
	// written into it therefore counts that byte as well.
	auto header = new ubyte[4 /*header*/ + 1 /*cmd*/];
	header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
	header[4] = cmd;
	conn.bumpPacket();

	conn._socket.send(header, cast(const(ubyte)[])data);
}
741 
/++
Read the reply to a command and require it to be an OK packet.

Params:
	conn = The connection to read from; its server status is updated.
	asString = Not used by this function.
Returns: The decoded packet.
+/
package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
{
	ubyte[] reply = conn.getPacket();
	auto okp = OKErrorPacket(reply);
	enforcePacketOK(okp);

	conn._serverStatus = okp.serverStatus;
	return okp;
}
749 
/++
Assemble the authentication packet sent in reply to the server greeting.

Params:
	conn = The connection; its capabilities, server version, user, password
	       and database are read, and its packet number is bumped.
	token = The 20-byte authentication token.
Returns: The complete packet, header included.
+/
package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
in
{
	assert(token.length == 20);
}
do
{
	enum size_t fillerLength = 23;

	ubyte[] packet;
	packet.reserve(4/*header*/ + 4 + 4 + 1 + fillerLength + conn._user.length+1 + token.length+1 + conn._db.length+1);
	packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append

	// NOTE: we'll set the header last when we know the size

	// Set the default capabilities required by the client
	conn._cCaps.packInto(packet[4..8]);

	// Request a conventional maximum packet length.
	1.packInto(packet[8..12]);

	packet ~= getDefaultCollation(conn._serverVersion);

	// There's a statutory block of zero bytes here - growing the array
	// zero-fills the new elements.
	packet.length += fillerLength;

	// Add the user name as a null terminated string
	packet ~= cast(const(ubyte)[]) conn._user;
	packet ~= 0; // \0

	// Add our calculated authentication token as a length prefixed string.
	assert(token.length <= ubyte.max);
	if(conn._pwd.length != 0)
	{
		packet ~= cast(ubyte)token.length;
		packet ~= token;
	}
	else
		packet ~= 0;  // Omit the token if the account has no password

	// Add the default database as a null terminated string
	packet ~= cast(const(ubyte)[]) conn._db;
	packet ~= 0; // \0

	// The server sent us a greeting with packet number 0, so we send the auth packet
	// back with the next number.
	packet.setPacketHeader(conn.pktNumber);
	conn.bumpPacket();
	return packet;
}
802 
/++
Compute the authentication token for a password and the server's scramble.

The result is `SHA1(authBuf ~ SHA1(SHA1(password)))` XORed byte by byte
with `SHA1(password)`.

Params:
	password = The account password.
	authBuf = The scramble bytes.
Returns: A newly allocated 20-byte token.
+/
package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
{
	const ubyte[20] stage1 = sha1Of(cast(const(ubyte)[])password);
	const ubyte[20] stage2 = sha1Of(stage1);

	SHA1 hasher;
	hasher.start();
	hasher.put(authBuf);
	hasher.put(stage2);
	ubyte[20] token = hasher.finish();

	token[] ^= stage1[];
	return token.dup;
}
817 
/// Get the next `mysql.result.Row` of a pending result set.
///
/// The column headers are read first if they are still pending. When the
/// end of the result set is reached, the pending flags are cleared and a
/// default-initialized row is returned.
package(mysql) SafeRow getNextRow(Connection conn)
{
	scope(failure) conn.kill();

	if (conn._headersPending)
	{
		conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
		conn._headersPending = false;
	}

	ubyte[] packet = conn.getPacket();
	if(packet.front == ResultPacketMarker.error)
		throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);

	if (packet.isEOFPacket())
	{
		conn._rowsPending = conn._binaryPending = false;
		SafeRow noMoreRows;
		return noMoreRows;
	}

	// Rows of a prepared statement arrive in binary form, others as text.
	immutable bool binaryRow = conn._binaryPending ? true : false;
	SafeRow row;
	row = SafeRow(conn, packet, conn._rsh, binaryRow);
	return row;
}
846 
/++
Consume the capability, character set and status fields of the server
greeting and store them in the connection.

Params:
	conn = The connection to update. It is killed if anything throws.
	packet = The greeting, positioned at the lower capability bytes.
Throws: MYX if the server lacks PROTOCOL41 or SECURE_CONNECTION.
+/
package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
{
	scope(failure) conn.kill();

	// The capability flags arrive as two 16-bit halves with the character
	// set and the status in between. The halves do not overlap, so they
	// can simply be OR-ed together.
	conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
	conn._sCharSet = packet.consume!ubyte(); // server_language
	conn._serverStatus = packet.consume!ushort(); //server_status
	conn._sCaps |= cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
	conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

	immutable bool hasProtocol41 = (conn._sCaps & SvrCapFlags.PROTOCOL41) != 0;
	enforce!MYX(hasProtocol41, "Server doesn't support protocol v4.1");

	immutable bool hasSecureConnection = (conn._sCaps & SvrCapFlags.SECURE_CONNECTION) != 0;
	enforce!MYX(hasSecureConnection, "Server doesn't support protocol v4.1 connection");
}
860 
/++
Read and decode the server's greeting packet.

Stores the protocol version, server version, thread id, capabilities,
character set and status in `conn`.

Params: conn = The connection to read from. It is killed if anything throws.
Returns: The scramble buffer: the 8-byte first part followed by the second
         part.
Throws: MYX if the server answered with an error packet; MYXProtocol if the
        greeting does not have the expected layout.
+/
package(mysql) ubyte[] parseGreeting(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[] packet = conn.getPacket();

	// The server may answer with an error packet instead of a greeting.
	if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
	{
		auto okp = OKErrorPacket(packet);
		enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
	}

	conn._protocol = packet.consume!ubyte();

	// NOTE(review): countUntil yields -1 when no terminator is present;
	// that case is not checked here.
	conn._serverVersion = packet.consume!string(packet.countUntil(0));
	packet.skip(1); // \0 terminated _serverVersion

	conn._sThread = packet.consume!uint();

	// read first part of scramble buf
	// The buffer starts oversized and is cut down once the length of the
	// second part is known.
	ubyte[] authBuf;
	authBuf.length = 255;
	authBuf[0..8] = packet.consume(8)[]; // scramble_buff

	enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

	conn.consumeServerInfo(packet);

	packet.skip(1); // this byte supposed to be scramble length, but is actually zero
	packet.skip(10); // filler of \0

	// rest of the scramble
	auto len = packet.countUntil(0);
	enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
	enforce(authBuf.length > 8+len); // must fit behind the first 8 bytes
	authBuf[8..8+len] = packet.consume(len)[];
	authBuf.length = 8+len; // cut to correct size
	enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");

	return authBuf;
}
902 
/++
Determine which capabilities both sides support.

Every capability bit is taken into account. The former bit-by-bit loop ran
`uint.sizeof` (4) times instead of once per bit, so it only ever looked at
the four lowest flags and dropped all others.

Params:
	server = The capabilities announced by the server.
	client = The capabilities requested by the client.
Returns: The flags that are set in both `server` and `client`.
+/
package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
{
	// A capability is common exactly when its bit is set on both sides.
	return cast(SvrCapFlags) (server & client);
}
917 
/++
Compute the capability flags the client will use.

Params:
	serverCaps = The capabilities announced by the server.
	capFlags = The capabilities requested by the user.
Returns: The capabilities common to both, with PROTOCOL41 and
         SECURE_CONNECTION always set.
+/
package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
{
	SvrCapFlags negotiated = getCommonCapabilities(serverCaps, capFlags);

	// We cannot operate in <4.1 protocol, so we'll force it even if the user
	// didn't supply it
	negotiated |= SvrCapFlags.PROTOCOL41 | SvrCapFlags.SECURE_CONNECTION;

	return negotiated;
}
929 
/++
Authenticate against the server.

Sends the authentication packet built from the password and the scramble,
then reads the server's verdict.

Params:
	conn = A connected, not yet authenticated connection.
	greeting = The scramble buffer used to compute the token.
Throws: MYX if the server rejects the authentication.
+/
package(mysql) void authenticate(Connection conn, ubyte[] greeting)
in
{
	assert(conn._open == Connection.OpenState.connected);
}
out
{
	assert(conn._open == Connection.OpenState.authenticated);
}
do
{
	conn._socket.send(conn.buildAuthPacket(makeToken(conn._pwd, greeting)));

	auto okp = OKErrorPacket(conn.getPacket());

	// Log the failure before it is raised as an exception.
	if(okp.error)
		logError("Authentication failure: %s", cast(string) okp.message);

	enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
	conn._open = Connection.OpenState.authenticated;
}
955 
// Register prepared statement
/++
Send `sql` to the server for preparation and decode the reply.

Params:
	conn = The connection to use. It is killed if anything throws.
	sql = The statement text.
Returns: The statement id, parameter count, warning count and headers
         reported by the server.
+/
package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
{
	scope(failure) conn.kill();

	PreparedServerInfo info;

	conn.sendCmd(CommandType.STMT_PREPARE, sql);
	conn._fieldCount = 0;

	ubyte[] packet = conn.getPacket();
	if(packet.front == ResultPacketMarker.ok)
	{
		// The fields are consumed in the order they appear in the packet.
		packet.popFront();
		info.statementId    = packet.consume!int();
		conn._fieldCount    = packet.consume!short();
		info.numParams      = packet.consume!short();

		packet.popFront(); // one byte filler
		info.psWarnings     = packet.consume!short();

		// At this point the server also sends field specs for parameters
		// and columns if there were any of each
		info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);
	}
	else if(packet.front == ResultPacketMarker.error)
	{
		auto error = OKErrorPacket(packet);
		// NOTE(review): presumably enforcePacketOK throws for an error
		// packet, which would make the lines below unreachable - confirm.
		enforcePacketOK(error);
		logCritical("Unexpected failure: %s", cast(string) error.message);
		assert(0); // FIXME: what now?
	}
	else
		assert(0); // FIXME: what now?

	return info;
}
993 
/++
Flush any outstanding result set elements.

When the server responds to a command that produces a result set, it
queues the whole set of corresponding packets over the current connection.
Before that `Connection` can embark on any new command, it must receive
all of those packets and junk them.

As of v1.1.4, this is done automatically as needed. But you can still
call this manually to force a purge to occur when you want.

Returns: The number of row packets that were discarded.

See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
+/
package(mysql) ulong purgeResult(Connection conn)
{
	scope(failure) conn.kill();

	++conn._lastCommandID;

	if (conn._headersPending)
	{
		// Discard header packets up to the EOF packet. More header packets
		// than announced fields is a protocol error.
		size_t headersSeen = 0;
		while (!conn.getPacket().isEOFPacket())
		{
			enforce!MYXProtocol(headersSeen < conn._fieldCount,
				text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
			++headersSeen;
		}
		conn._headersPending = false;
	}

	ulong discardedRows = 0;
	if (conn._rowsPending)
	{
		// Discard row packets up to the EOF packet, counting them.
		while (!conn.getPacket().isEOFPacket())
			++discardedRows;
		conn._rowsPending = conn._binaryPending = false;
	}

	conn.resetPacket();
	return discardedRows;
}
1041 
/++
Get a textual report on the server status.

(COM_STATISTICS)

Returns: The payload of the server's reply, reinterpreted as a string.
+/
package(mysql) string serverStats(Connection conn)
{
	conn.sendCmd(CommandType.STATISTICS, []);
	auto reply = conn.getPacket();

	// The reply buffer is not referenced anywhere else, so viewing it as
	// immutable text is fine.
	return (() @trusted => cast(string)reply)();
}
1053 
/++
Enable multiple statement commands.

This can be used later if this feature was not requested in the client capability flags.

Warning: This functionality is currently untested.

Params:
	conn = The connection.
	on = Boolean value to turn the capability on or off.
+/
//TODO: Need to test this
package(mysql) void enableMultiStatements(Connection conn, bool on)
{
	scope(failure) conn.kill();

	// Two-byte option value; 0 is sent to switch the feature on, 1 to
	// switch it off.
	// NOTE(review): this matches MySQL's MYSQL_OPTION_MULTI_STATEMENTS_ON (0)
	// and _OFF (1) as far as I can tell - confirm against the protocol docs.
	auto option = new ubyte[2];
	option[0] = on ? 0 : 1;
	option[1] = 0;
	conn.sendCmd(CommandType.STMT_OPTION, option);

	// For some reason this command gets an EOF packet as response
	auto reply = conn.getPacket();
	immutable bool isEofReply = reply[0] == 254 && reply.length == 5;
	enforce!MYXProtocol(isEofReply, "Unexpected response to SET_OPTION command");
}
1080 
/++
Choose the collation id to request from a server of the given version.

Servers from version 5.5.3 on get utf8mb4_general_ci (45), older ones
utf8_general_ci (33). The version is compared as a whole, so for example
"8.0.30" and "5.6.0" count as newer than "5.5.3" although some of their
components are smaller.

Only the first three dot-separated components are looked at, and of each
only its leading number, so suffixes such as "-10.0.7-MariaDB" are ignored.
Missing components count as 0.

Params: serverVersion = The version string reported by the server.
Returns: The collation id.
Throws: ConvException if one of the first three components does not start
        with a number.
+/
private ubyte getDefaultCollation(string serverVersion)
{
	enum ubyte utf8GeneralCi    = 33;
	enum ubyte utf8mb4GeneralCi = 45;

	// MySQL >= 5.5.3 supports utf8mb4
	static immutable ushort[3] utf8mb4Since = [5, 5, 3];

	ushort[3] ver;
	size_t parsed = 0;
	foreach (part; serverVersion.splitter('.'))
	{
		if (parsed == ver.length)
			break;
		// parse stops at the first non-digit, e.g. after the 5 of "5-10".
		ver[parsed++] = part.parse!ushort;
	}

	// Lexicographic comparison: the first differing component decides.
	foreach (idx; 0 .. ver.length)
	{
		if (ver[idx] != utf8mb4Since[idx])
			return ver[idx] < utf8mb4Since[idx] ? utf8GeneralCi : utf8mb4GeneralCi;
	}
	return utf8mb4GeneralCi; // exactly 5.5.3
}

unittest
{
	assert(getDefaultCollation("5.5.3") == 45);
	assert(getDefaultCollation("5.5.2") == 33);
	assert(getDefaultCollation("4.1.22") == 33);

	// Newer versions whose minor or patch number is smaller than in 5.5.3
	assert(getDefaultCollation("5.6.0") == 45);
	assert(getDefaultCollation("8.0.30") == 45);
	assert(getDefaultCollation("10.11.2-MariaDB") == 45);

	// MariaDB: https://mariadb.com/kb/en/connection/#initial-handshake-packet
	assert(getDefaultCollation("5.5.5-10.0.7-MariaDB") == 45);
}