1 /++ 2 Internal - Low-level communications. 3 4 Consider this module the main entry point for the low-level MySQL/MariaDB 5 protocol code. The other modules in `mysql.protocol` are mainly tools 6 to support this module. 7 8 Previously, the code handling low-level protocol details was scattered all 9 across the library. Such functionality has been factored out into this module, 10 to be kept in one place for better encapsulation and to facilitate further 11 cleanup and refactoring. 12 13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it 14 eventually settles into what will eventually become a low-level library 15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight... 16 17 Next tasks for this sub-package's cleanup: 18 - Reduce this module's reliance on Connection. 19 - Abstract out a PacketStream to clean up getPacket and related functionality. 20 +/ 21 module mysql.protocol.comms; 22 23 import std.algorithm; 24 import std.conv; 25 import std.digest.sha; 26 import std.exception; 27 import std.range; 28 29 import mysql.connection; 30 import mysql.exceptions; 31 import mysql.logger; 32 import mysql.safe.prepared; 33 import mysql.result; 34 import mysql.types; 35 36 import mysql.protocol.constants; 37 import mysql.protocol.extra_types; 38 import mysql.protocol.packet_helpers; 39 import mysql.protocol.packets; 40 import mysql.protocol.sockets; 41 42 @safe: 43 44 /// Low-level comms code relating to prepared statements. 
package struct ProtocolPrepared
{
	@safe:
	import std.conv;
	import std.datetime;
	import mysql.types;

	/++
	Build the NULL bitmap for the parameters of a prepared statement.

	Bit `i%8` of byte `i/8` is set when parameter `i` has kind `Null`.

	Params: inParams = The parameter values bound to the statement.
	Returns: A newly allocated bitmap of `(inParams.length+7)/8` bytes.
	+/
	static ubyte[] makeBitmap(in MySQLVal[] inParams)
	{
		ubyte[] bitmap;
		bitmap.length = (inParams.length+7)/8;
		foreach (i; 0..inParams.length)
		{
			if(inParams[i].kind != MySQLVal.Kind.Null)
				continue;
			bitmap[i/8] |= cast(ubyte)(1 << (i%8));
		}
		return bitmap;
	}

	/++
	Build the fixed 14-byte start of a STMT_EXECUTE packet.

	Bytes 0..4 are left zeroed for the packet header (filled in later by
	`setPacketHeader` in `sendCommand`), byte 4 is the command, bytes 5..9 the
	statement handle, byte 9 the flags and bytes 10..14 the iteration count.

	Params:
	hStmt = The statement handle.
	flags = Value stored in the flags byte (0 = no cursor).
	+/
	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
	{
		ubyte[] prefix;
		prefix.length = 14;

		prefix[4] = CommandType.STMT_EXECUTE;
		hStmt.packInto(prefix[5..9]);
		prefix[9] = flags;   // flags, no cursor
		prefix[10] = 1; // iteration count - currently always 1
		prefix[11] = 0;
		prefix[12] = 0;
		prefix[13] = 0;

		return prefix;
	}

	/++
	Serialize the bound parameters of a prepared statement.

	Params:
	inParams = The parameter values.
	psa = Per-parameter specializations (explicit SQL type, chunk size).
	vals = Receives the concatenated binary values of all non-null parameters.
	longData = Set to true if any parameter has a non-zero `chunkSize`.

	Returns: Two bytes per parameter: the type code followed by the
	signed/unsigned flag (0x80 = unsigned).

	Throws: `MYX` if the switch is reached with a `Null` value (null
	parameters are normally handled before the switch).
	+/
	static ubyte[] analyseParams(MySQLVal[] inParams, ParameterSpecialization[] psa,
		out ubyte[] vals, out bool longData)
	{
		import taggedalgebraic.taggedalgebraic : get;

		size_t pc = inParams.length;
		ubyte[] types;
		types.length = pc*2;
		size_t alloc = pc*20;
		vals.length = alloc;
		uint vcl = 0; // number of bytes of `vals` used so far
		int ct = 0;   // write position in `types`

		// Ensure there is room for `n` more bytes in `vals`.
		void reAlloc(size_t n)
		{
			if (vcl+n < alloc)
				return;
			size_t inc = (alloc*3)/2;
			if (inc < n)
				inc = n;
			alloc += inc;
			vals.length = alloc;
		}

		// Append a run of bytes to `vals`.
		void putBytes(const(ubyte)[] bytes)
		{
			reAlloc(bytes.length);
			vals[vcl .. vcl + bytes.length] = bytes[];
			vcl += bytes.length;
		}

		// Append the low `width` bytes of `bits`, least significant byte first.
		void putLE(ulong bits, size_t width)
		{
			reAlloc(width);
			foreach (shift; 0 .. width)
				vals[vcl++] = cast(ubyte) (bits >> (8 * shift));
		}

		foreach (size_t i; 0..pc)
		{
			enum UNSIGNED  = 0x80;
			enum SIGNED    = 0;
			if (psa[i].chunkSize)
				longData= true;
			if (inParams[i].kind == MySQLVal.Kind.Null)
			{
				// Nulls carry no value bytes; they are flagged in the null bitmap.
				types[ct++] = SQLType.NULL;
				types[ct++] = SIGNED;
				continue;
			}
			MySQLVal v = inParams[i];
			SQLType ext = psa[i].type;
			auto ts = v.kind;
			bool isRef = false;

			// TODO: use v.visit instead for more efficiency and shorter code.
			with(MySQLVal.Kind) final switch (ts)
			{
				case BitRef:
					isRef = true; goto case;
				case Bit:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.BIT;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					reAlloc(2);
					bool bv = isRef? *v.get!BitRef : v.get!Bit;
					// Sent as a one-byte length-prefixed string: '1' or '0'.
					vals[vcl++] = 1;
					vals[vcl++] = bv? 0x31: 0x30;
					break;
				case ByteRef:
					isRef = true; goto case;
				case Byte:
					types[ct++] = SQLType.TINY;
					types[ct++] = SIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *v.get!ByteRef : v.get!Byte;
					break;
				case UByteRef:
					isRef = true; goto case;
				case UByte:
					types[ct++] = SQLType.TINY;
					types[ct++] = UNSIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *v.get!UByteRef : v.get!UByte;
					break;
				case ShortRef:
					isRef = true; goto case;
				case Short:
					types[ct++] = SQLType.SHORT;
					types[ct++] = SIGNED;
					short si = isRef? *v.get!ShortRef : v.get!Short;
					putLE(si, si.sizeof);
					break;
				case UShortRef:
					isRef = true; goto case;
				case UShort:
					types[ct++] = SQLType.SHORT;
					types[ct++] = UNSIGNED;
					ushort us = isRef? *v.get!UShortRef : v.get!UShort;
					putLE(us, us.sizeof);
					break;
				case IntRef:
					isRef = true; goto case;
				case Int:
					types[ct++] = SQLType.INT;
					types[ct++] = SIGNED;
					int ii = isRef? *v.get!IntRef : v.get!Int;
					putLE(ii, ii.sizeof);
					break;
				case UIntRef:
					isRef = true; goto case;
				case UInt:
					types[ct++] = SQLType.INT;
					types[ct++] = UNSIGNED;
					uint ui = isRef? *v.get!UIntRef : v.get!UInt;
					putLE(ui, ui.sizeof);
					break;
				case LongRef:
					isRef = true; goto case;
				case Long:
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = SIGNED;
					long li = isRef? *v.get!LongRef : v.get!Long;
					putLE(li, li.sizeof);
					break;
				case ULongRef:
					isRef = true; goto case;
				case ULong:
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = UNSIGNED;
					ulong ul = isRef? *v.get!ULongRef : v.get!ULong;
					putLE(ul, ul.sizeof);
					break;
				case FloatRef:
					isRef = true; goto case;
				case Float:
					types[ct++] = SQLType.FLOAT;
					types[ct++] = SIGNED;
					float[1] f = isRef? *v.get!FloatRef : v.get!Float;
					putBytes(cast(ubyte[]) f[]);
					break;
				case DoubleRef:
					isRef = true; goto case;
				case Double:
					types[ct++] = SQLType.DOUBLE;
					types[ct++] = SIGNED;
					double[1] d = isRef? *v.get!DoubleRef : v.get!Double;
					// The destination slice must start AND end relative to vcl;
					// the previous `vals[vcl .. uba.length]` only worked when
					// the double was the very first value written.
					putBytes(cast(ubyte[]) d[]);
					break;
				case DateRef:
					isRef = true; goto case;
				case Date:
					types[ct++] = SQLType.DATE;
					types[ct++] = SIGNED;
					auto date = isRef? *v.get!DateRef : v.get!Date;
					putBytes(pack(date));
					break;
				case TimeRef:
					isRef = true; goto case;
				case Time:
					types[ct++] = SQLType.TIME;
					types[ct++] = SIGNED;
					auto time = isRef? *v.get!TimeRef : v.get!Time;
					putBytes(pack(time));
					break;
				case DateTimeRef:
					isRef = true; goto case;
				case DateTime:
					types[ct++] = SQLType.DATETIME;
					types[ct++] = SIGNED;
					auto dt = isRef? *v.get!DateTimeRef : v.get!DateTime;
					putBytes(pack(dt));
					break;
				case TimestampRef:
					isRef = true; goto case;
				case Timestamp:
					types[ct++] = SQLType.TIMESTAMP;
					types[ct++] = SIGNED;
					auto tms = isRef? *v.get!TimestampRef : v.get!Timestamp;
					auto dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
					putBytes(pack(dt));
					break;
				case TextRef:
					isRef = true; goto case;
				case Text:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.VARCHAR;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const char[] ca = isRef? *v.get!TextRef : v.get!Text;
					putBytes(packLCS(ca));
					break;
				// TODO: this is the same as the Text case except for the get
				// call. These should be combined somehow.
				case CTextRef:
					isRef = true; goto case;
				case CText:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.VARCHAR;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const char[] ca = isRef? *v.get!CTextRef : v.get!CText;
					putBytes(packLCS(ca));
					break;
				case BlobRef:
					isRef = true; goto case;
				case Blob:
				case CBlob:
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.TINYBLOB;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					const ubyte[] uba = isRef? *v.get!BlobRef : (ts == Blob ? v.get!Blob : v.get!CBlob);
					putBytes(packLCS(uba));
					break;
				case Null:
					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
			}
		}
		// Trim the over-allocated buffer down to the bytes actually written.
		vals.length = vcl;
		return types;
	}

	/++
	Send the data of every chunked parameter via STMT_SEND_LONG_DATA.

	For each entry of `psa` with a non-zero `chunkSize`, the entry's
	`chunkDelegate` is called repeatedly to fill a buffer of `chunkSize` bytes.
	Every filled buffer is sent as its own packet; a short (or empty) read
	ends the parameter.

	Params:
	socket = The socket to send on.
	hStmt = The statement handle.
	psa = Per-parameter specializations.
	+/
	static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
	{
		assert(psa.length <= ushort.max); // parameter number is sent as short
		foreach (size_t i, PSN psn; psa)
		{
			if (!psn.chunkSize) continue;
			uint cs = psn.chunkSize;
			uint delegate(ubyte[]) @safe dg = psn.chunkDelegate;

			// 4 bytes header + 1 command + 4 statement handle + 2 parameter number
			ubyte[] chunk;
			chunk.length = cs+11;
			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
			hStmt.packInto(chunk[5..9]); // statement handle
			packInto(cast(ushort)i, chunk[9..11]); // parameter number

			// byte 11 on is payload
			for (;;)
			{
				uint sent = dg(chunk[11..cs+11]);
				if (sent < cs)
				{
					if (sent == 0)    // data was exact multiple of chunk size - all sent
						break;
					chunk.length = chunk.length - (cs-sent);     // trim the chunk
					sent += 7;        // adjust for non-payload bytes
					// Rewrite the 3-byte length field for the shortened packet.
					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
					socket.send(chunk);
					break;
				}
				socket.send(chunk);
			}
		}
	}

	/++
	Assemble and send a STMT_EXECUTE command for a prepared statement.

	Any chunked ("long data") parameters are sent first, followed by the
	execute packet itself.

	Params:
	conn = The connection to send on.
	hStmt = The statement handle.
	psh = Headers of the prepared statement; only `paramCount` is read here.
	inParams = The parameter values.
	psa = Per-parameter specializations.
	+/
	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
		MySQLVal[] inParams, ParameterSpecialization[] psa)
	{
		conn.autoPurge();

		ubyte[] packet;
		conn.resetPacket();

		ubyte[] prefix = makePSPrefix(hStmt, 0);
		bool longData;

		if (psh.paramCount)
		{
			ubyte[] one = [ 1 ];
			ubyte[] vals;
			ubyte[] types = analyseParams(inParams, psa, vals, longData);
			ubyte[] nbm = makeBitmap(inParams);
			// prefix, null bitmap, a single byte of value 1, type pairs, values
			packet = prefix ~ nbm ~ one ~ types ~ vals;
		}
		else
			packet = prefix;

		if (longData)
			sendLongData(conn._socket, hStmt, psa);

		assert(packet.length <= uint.max);
		packet.setPacketHeader(conn.pktNumber);
		conn.bumpPacket();
		conn._socket.send(packet);
	}
}

/// Bundles the arguments of `execQueryImpl` for both plain and prepared statements.
package(mysql) struct ExecQueryImplInfo
{
	/// Selects which of the two groups of fields below is used.
	bool isPrepared;

	// For non-prepared statements:
	const(char[]) sql;

	// For prepared statements:
	uint hStmt;
	PreparedStmtHeaders psh;
	MySQLVal[] inParams;
	ParameterSpecialization[] psa;
}

/++
Internal implementation for the exec and query functions.

Execute a one-off SQL command.

Any result set can be accessed via Connection.getNextRow(), but you should really be
using the query function for such queries.

The connection is killed if anything in here throws.

Params:
conn = The connection to use.
info = Describes the plain or prepared statement to execute.
ra = An out parameter to receive the number of rows affected.

Returns: true if there was a (possibly empty) result set.
+/
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
{
	scope(failure) conn.kill();

	// Send data
	if(info.isPrepared)
	{
		logTrace("prepared SQL: %s", info.hStmt);

		ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
	}
	else
	{
		logTrace("exec query: %s", info.sql);

		conn.sendCmd(CommandType.QUERY, info.sql);
		conn._fieldCount = 0;
	}

	// Handle response
	ubyte[] packet = conn.getPacket();
	bool rv;
	if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
	{
		conn.resetPacket();
		auto okp = OKErrorPacket(packet);

		if(okp.error) {
			logError("packet error: %s", cast(string) okp.message);
		}

		enforcePacketOK(okp);
		ra = okp.affected;
		conn._serverStatus = okp.serverStatus;
		conn._insertID = okp.insertID;
		rv = false;
	}
	else
	{
		// There was presumably a result set
		assert(packet.front >= 1 && packet.front <= 250); // Result set packet header should have this value
		conn._headersPending = conn._rowsPending = true;
		conn._binaryPending = info.isPrepared;
		// The packet starts with the length-coded field count.
		auto lcb = packet.consumeIfComplete!LCB();
		assert(!lcb.isNull);
		assert(!lcb.isIncomplete);
		conn._fieldCount = cast(ushort)lcb.value;
		assert(conn._fieldCount == lcb.value);
		rv = true;
		ra = 0;
	}
	return rv;
}

///ditto
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
{
	ulong rowsAffected;
	return execQueryImpl(conn, info, rowsAffected);
}

/++
Send STMT_CLOSE for a prepared statement, unless the connection is closed.

Any pending result set is purged first. No response is read.

Params:
conn = The connection the statement was registered on.
statementId = The statement handle to release.
+/
package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
{
	scope(failure) conn.kill();

	if(conn.closed())
		return;

	// 4 bytes header + 1 command + 4 statement handle
	ubyte[9] packet_buf;
	ubyte[] packet = packet_buf;
	packet.setPacketHeader(0/*packet number*/);
	conn.bumpPacket();
	packet[4] = CommandType.STMT_CLOSE;
	statementId.packInto(packet[5..9]);
	conn.purgeResult();
	conn._socket.send(packet);
	// It seems that the server does not find it necessary to send a response
	// for this command.
}

// Moved here from `struct Row`
/++
Consume the null bitmap at the start of `packet` and decode it.

Params:
packet = Packet data positioned at the bitmap; advanced past it.
fieldCount = Number of fields in the row.

Returns: One flag per field, true where the field is null.
Throws: `MYXProtocol` if the packet is shorter than the bitmap.
+/
package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
{
	uint bitmapLength = calcBitmapLength(fieldCount);
	enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields");
	auto bitmap = packet.consume(bitmapLength);
	return decodeNullBitmap(bitmap, fieldCount);
}

// Moved here from `struct Row`
/// Number of bytes needed for one bit per field plus the two leading reserved bits.
private static uint calcBitmapLength(uint fieldCount) pure nothrow
{
	return (fieldCount+7+2)/8;
}

// Moved here from `struct Row`
// This is to decode the bitmap in a binary result row. First two bits are skipped
private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
in
{
	assert(bitmap.length >= calcBitmapLength(numFields),
		"bitmap not large enough to store all null fields");
}
out(result)
{
	assert(result.length == numFields);
}
do
{
	bool[] nulls;
	nulls.length = numFields;

	// the current byte we are processing for nulls
	ubyte bits = bitmap.front();
	// strip away the first two bits as they are reserved
	bits >>= 2;
	// .. and then we only have 6 bits left to process for this byte
	ubyte bitsLeftInByte = 6;
	foreach(ref isNull; nulls)
	{
		assert(bitsLeftInByte <= 8);
		// processed all bits? fetch new byte
		if (bitsLeftInByte == 0)
		{
			assert(bits == 0, "not all bits are processed!");
			bitmap.popFront();
			// Check after advancing, so the assert guards the `front` below.
			assert(!bitmap.empty, "bits array too short for number of columns");
			bits = bitmap.front;
			bitsLeftInByte = 8;
		}
		assert(bitsLeftInByte > 0);
		isNull = (bits & 0b0000_0001) != 0;

		// get ready to process next bit
		bits >>= 1;
		--bitsLeftInByte;
	}
	return nulls;
}

// Moved here from `struct Row.this`
/++
Decode one result row from `packet`.

If a value is incomplete, further packets are read from `conn` and appended
to `packet` until it can be decoded. The connection is killed if anything
in here throws.

Params:
conn = The connection the row is read from.
packet = The row data; consumed as values are decoded.
rh = The result set headers describing each field.
binary = true for a binary row, which starts with a zero byte and a null bitmap.
_values = Receives the field values (null for null fields).
_nulls = Receives one null flag per field.
_names = Receives the field names.

Throws: `MYXProtocol` if a binary row does not start with a zero byte.
+/
package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
	out MySQLVal[] _values, out bool[] _nulls, out string[] _names)
in
{
	assert(rh.fieldCount <= uint.max);
}
do
{
	scope(failure) conn.kill();

	uint fieldCount = cast(uint)rh.fieldCount;
	_values.length = _nulls.length = _names.length = fieldCount;

	if(binary)
	{
		// There's a null byte header on a binary result sequence, followed by some bytes of bitmap
		// indicating which columns are null
		enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
		packet.popFront();
		_nulls = consumeNullBitmap(packet, fieldCount);
	}

	foreach(size_t i; 0..fieldCount)
	{
		if(binary && _nulls[i])
		{
			// Null fields of a binary row have no value bytes in the packet.
			_values[i] = null;
			_names[i] = rh[i].name;
			continue;
		}

		SQLValue sqlValue;
		do
		{
			FieldDescription fd = rh[i];
			_names[i] = fd.name;
			sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
			// TODO: Support chunk delegate
			if(sqlValue.isIncomplete)
				packet ~= conn.getPacket();
		} while(sqlValue.isIncomplete);
		assert(!sqlValue.isIncomplete);

		if(sqlValue.isNull)
		{
			assert(!binary);
			assert(!_nulls[i]);
			_nulls[i] = true;
			_values[i] = null;
		}
		else
		{
			_values[i] = sqlValue.value;
		}
	}
}

////// Moved here from Connection /////////////////////////////////

/++
Read one packet from the server and return its payload.

The 4-byte header is read first: three bytes of payload length (least
significant first) and one byte of packet number, which must equal
`conn.pktNumber`. The connection is killed if anything in here throws.

Throws: `MYXProtocol` if the packet number is not the expected one.
+/
package(mysql) ubyte[] getPacket(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[4] hdr;
	conn._socket.read(hdr);

	// The length field is always 24 bits wide.
	uint payloadLength = hdr[0] | (hdr[1] << 8) | (hdr[2] << 16);
	enforce!MYXProtocol(hdr[3] == conn.pktNumber, "Server packet out of order");
	conn.bumpPacket();

	auto payload = new ubyte[payloadLength];
	conn._socket.read(payload);
	assert(payload.length == payloadLength, "Wrong number of bytes read");
	return payload;
}

/// Write a complete packet (header included) to the socket.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
in
{
	// A packet carries at least one byte beyond its 4-byte header.
	assert(packet.length > 4);
}
do
{
	_socket.write(packet);
}

/// Write a packet header, then its data if there is any.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
in
{
	// Either the bare header, or the header plus the command byte.
	assert(header.length == 4 || header.length == 5/*command type included*/);
}
do
{
	_socket.write(header);
	if(data.length == 0)
		return;
	_socket.write(data);
}

/++
Send a command packet made of the command byte followed by `data`.

If the socket is no longer connected, the connection is re-established
first, except for `QUIT`, which is then silently dropped. Any pending result
set is purged before sending. The connection is killed if anything in here
throws.
+/
package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
in
{
	// Internal thread states. Clients shouldn't use this
	assert(!cmd.among(CommandType.SLEEP, CommandType.CONNECT, CommandType.TIME,
		CommandType.DELAYED_INSERT, CommandType.CONNECT_OUT));

	// Deprecated
	assert(!cmd.among(CommandType.CREATE_DB, CommandType.DROP_DB, CommandType.TABLE_DUMP));

	// cannot send more than uint.max bytes. TODO: better error message if we try?
	assert(data.length <= uint.max);
}
out
{
	// at this point we should have sent a command
	assert(conn.pktNumber == 1);
}
do
{
	scope(failure) conn.kill();

	conn._lastCommandID++;

	if(!conn._socket.connected)
	{
		// Don't bother reopening connection just to quit
		if(cmd == CommandType.QUIT)
			return;

		conn._open = Connection.OpenState.notConnected;
		conn.connect(conn._clientCapabilities);
	}

	conn.autoPurge();
	conn.resetPacket();

	// 4 header bytes plus the command byte; the data travels separately.
	auto cmdHeader = new ubyte[4 + 1];
	cmdHeader.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
	cmdHeader[4] = cmd;
	conn.bumpPacket();

	conn._socket.send(cmdHeader, cast(const(ubyte)[])data);
}

/++
Read the response to a command and record the server status it reports.

Throws whatever `enforcePacketOK` throws for a non-OK packet.
+/
package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
{
	auto response = OKErrorPacket(conn.getPacket());
	enforcePacketOK(response);
	conn._serverStatus = response.serverStatus;
	return response;
}

/++
Build the authentication packet that answers the server greeting.

Layout after the 4-byte header: client capabilities (4 bytes), maximum
packet length (4 bytes), collation (1 byte), 23 zero bytes, the
null-terminated user name, the length-prefixed token (a single zero byte
when the password is empty) and the null-terminated default database.

Params:
conn = The connection being authenticated.
token = The 20-byte authentication token.
+/
package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
in
{
	assert(token.length == 20);
}
do
{
	ubyte[] authPacket;
	authPacket.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1);
	// Room for the leading fields that are set in place rather than appended.
	authPacket.length = 4 + 4 + 4;

	// NOTE: the header itself is written last, once the size is known.

	// Capabilities requested by the client
	conn._cCaps.packInto(authPacket[4..8]);

	// Request a conventional maximum packet length.
	1.packInto(authPacket[8..12]);

	authPacket ~= getDefaultCollation(conn._serverVersion);

	// Statutory block of 23 zero bytes
	foreach(_; 0 .. 23)
		authPacket ~= 0;

	// User name, null terminated
	foreach(ch; conn._user)
		authPacket ~= ch;
	authPacket ~= 0; // \0

	// Authentication token as a length prefixed string
	assert(token.length <= ubyte.max);
	if(conn._pwd.length != 0)
	{
		authPacket ~= cast(ubyte)token.length;
		foreach(b; token)
			authPacket ~= b;
	}
	else
		authPacket ~= 0; // Omit the token if the account has no password

	// Default database, null terminated
	foreach(ch; conn._db)
		authPacket ~= ch;
	authPacket ~= 0; // \0

	// The server sent us a greeting with packet number 0, so we send the auth packet
	// back with the next number.
	authPacket.setPacketHeader(conn.pktNumber);
	conn.bumpPacket();
	return authPacket;
}

/++
Compute the authentication token for a password:
`SHA1(authBuf ~ SHA1(SHA1(password))) XOR SHA1(password)`.

Params:
password = The account password.
authBuf = The scramble buffer taken from the server greeting.

Returns: A newly allocated 20-byte token.
+/
package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
{
	auto stage1 = sha1Of(cast(const(ubyte)[])password);
	auto stage2 = sha1Of(stage1);

	SHA1 hasher;
	hasher.start();
	hasher.put(authBuf);
	hasher.put(stage2);
	auto token = hasher.finish();

	foreach (i, ref b; token)
		b = b ^ stage1[i];
	return token.dup;
}

/// Get the next `mysql.result.Row` of a pending result set.
package(mysql) SafeRow getNextRow(Connection conn)
{
	scope(failure) conn.kill();

	// The field headers precede the first row and are read only once.
	if (conn._headersPending)
	{
		conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
		conn._headersPending = false;
	}
	ubyte[] packet;
	SafeRow rr;
	packet = conn.getPacket();
	if(packet.front == ResultPacketMarker.error)
		throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);

	if (packet.isEOFPacket())
	{
		// End of the result set: hand back the default-initialized row.
		conn._rowsPending = conn._binaryPending = false;
		return rr;
	}
	rr = SafeRow(conn, packet, conn._rsh, conn._binaryPending);
	//rr._valid = true;
	return rr;
}

/++
Read the server capabilities, character set and status from the greeting.

The connection is killed if anything in here throws.

Params:
conn = The connection whose `_sCaps`, `_sCharSet` and `_serverStatus` are set.
packet = Greeting data positioned at the capability flags; consumed.

Throws: `MYX` if the server lacks `PROTOCOL41` or `SECURE_CONNECTION`.
+/
package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
{
	scope(failure) conn.kill();

	conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
	conn._sCharSet = packet.consume!ubyte(); // server_language
	conn._serverStatus = packet.consume!ushort(); //server_status
	conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
	conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

	enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
	enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
}

/++
Read and parse the server greeting.

Sets `_protocol`, `_serverVersion`, `_sThread` and (through
`consumeServerInfo`) the capability, character set and status fields of
`conn`. The connection is killed if anything in here throws.

Returns: The scramble buffer to be passed to `authenticate`.
Throws: `MYX` if the server answered with an error packet, `MYXProtocol`
if the greeting is malformed.
+/
package(mysql) ubyte[] parseGreeting(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[] packet = conn.getPacket();

	if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
	{
		auto okp = OKErrorPacket(packet);
		enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
	}

	conn._protocol = packet.consume!ubyte();

	conn._serverVersion = packet.consume!string(packet.countUntil(0));
	packet.skip(1); // \0 terminated _serverVersion

	conn._sThread = packet.consume!uint();

	// read first part of scramble buf
	ubyte[] authBuf;
	authBuf.length = 255;
	authBuf[0..8] = packet.consume(8)[]; // scramble_buff

	enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

	conn.consumeServerInfo(packet);

	packet.skip(1); // this byte supposed to be scramble length, but is actually zero
	packet.skip(10); // filler of \0

	// rest of the scramble
	auto len = packet.countUntil(0);
	enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
	enforce(authBuf.length > 8+len);
	authBuf[8..8+len] = packet.consume(len)[];
	authBuf.length = 8+len; // cut to correct size
	enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");

	return authBuf;
}

/++
Compute the capabilities supported by both the server and the client.

All 32 flag bits are intersected. (The loop used previously ran
`uint.sizeof` == 4 times and therefore only ever looked at the four
lowest bits, silently dropping every other requested capability.)
+/
package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
{
	return cast(SvrCapFlags)(server & client);
}

/++
Compute the client capability flags to send to the server.

Returns: The capabilities common to `serverCaps` and `capFlags`, with
`PROTOCOL41` and `SECURE_CONNECTION` always set.
+/
package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
{
	auto cCaps = getCommonCapabilities(serverCaps, capFlags);

	// We cannot operate in <4.1 protocol, so we'll force it even if the user
	// didn't supply it
	cCaps |= SvrCapFlags.PROTOCOL41;
	cCaps |= SvrCapFlags.SECURE_CONNECTION;

	return cCaps;
}

/++
Authenticate against the server using the scramble from its greeting.

Params:
conn = A connection in state `connected`; set to `authenticated` on success.
greeting = The scramble buffer returned by `parseGreeting`.

Throws: `MYX` if the server answers with an error packet.
+/
package(mysql) void authenticate(Connection conn, ubyte[] greeting)
in
{
	assert(conn._open == Connection.OpenState.connected);
}
out
{
	assert(conn._open == Connection.OpenState.authenticated);
}
do
{
	auto token = makeToken(conn._pwd, greeting);
	auto authPacket = conn.buildAuthPacket(token);
	conn._socket.send(authPacket);

	auto packet = conn.getPacket();
	auto okp = OKErrorPacket(packet);

	if(okp.error) {
		logError("Authentication failure: %s", cast(string) okp.message);
	}

	enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
	conn._open = Connection.OpenState.authenticated;
}

// Register prepared statement
/++
Send STMT_PREPARE for `sql` and read the server's description of the statement.

The connection is killed if anything in here throws.

Returns: The statement id, parameter count, warning count and headers
reported by the server.
+/
package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
{
	scope(failure) conn.kill();

	PreparedServerInfo info;

	conn.sendCmd(CommandType.STMT_PREPARE, sql);
	conn._fieldCount = 0;

	ubyte[] packet = conn.getPacket();
	if(packet.front == ResultPacketMarker.ok)
	{
		packet.popFront();
		info.statementId    = packet.consume!int();
		conn._fieldCount    = packet.consume!short();
		info.numParams      = packet.consume!short();

		packet.popFront(); // one byte filler
		info.psWarnings     = packet.consume!short();

		// At this point the server also sends field specs for parameters
		// and columns if there were any of each
		info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);
	}
	else if(packet.front == ResultPacketMarker.error)
	{
		auto error = OKErrorPacket(packet);
		enforcePacketOK(error);
		// Only reached if enforcePacketOK did not throw for this packet.
		logCritical("Unexpected failure: %s", cast(string) error.message);
		assert(0); // FIXME: what now?
	}
	else
		assert(0); // FIXME: what now?

	return info;
}

/++
Flush any outstanding result set elements.

When the server responds to a command that produces a result set, it
queues the whole set of corresponding packets over the current connection.
Before that `Connection` can embark on any new command, it must receive
all of those packets and junk them.

As of v1.1.4, this is done automatically as needed. But you can still
call this manually to force a purge to occur when you want.

Returns: The number of row packets that were discarded.

See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
+/
package(mysql) ulong purgeResult(Connection conn)
{
	scope(failure) conn.kill();

	conn._lastCommandID++;

	ulong rows = 0;
	if (conn._headersPending)
	{
		// Discard field header packets up to and including the EOF packet.
		for (size_t i = 0;; i++)
		{
			if (conn.getPacket().isEOFPacket())
			{
				conn._headersPending = false;
				break;
			}
			enforce!MYXProtocol(i < conn._fieldCount,
				text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
		}
	}
	if (conn._rowsPending)
	{
		// Discard row packets up to and including the EOF packet.
		for (;;  rows++)
		{
			if (conn.getPacket().isEOFPacket())
			{
				conn._rowsPending = conn._binaryPending = false;
				break;
			}
		}
	}
	conn.resetPacket();
	return rows;
}

/++
Get a textual report on the server status.

(COM_STATISTICS)
+/
package(mysql) string serverStats(Connection conn)
{
	conn.sendCmd(CommandType.STATISTICS, []);
	auto result = conn.getPacket();
	// The response packet is handed back as-is, reinterpreted as a string.
	return (() @trusted => cast(string)result)();
}

/++
Enable multiple statement commands.

This can be used later if this feature was not requested in the client capability flags.

Warning: This functionality is currently untested.

Params:
conn = The connection.
on = Boolean value to turn the capability on or off.

Throws: `MYXProtocol` if the response is not a 5-byte packet starting with 254.
+/
//TODO: Need to test this
package(mysql) void enableMultiStatements(Connection conn, bool on)
{
	scope(failure) conn.kill();

	// Two-byte option value: 0 requests "on", 1 requests "off".
	ubyte[] t;
	t.length = 2;
	t[0] = on ? 0 : 1;
	t[1] = 0;
	conn.sendCmd(CommandType.STMT_OPTION, t);

	// For some reason this command gets an EOF packet as response
	auto packet = conn.getPacket();
	enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
}

/++
Pick the default collation id for a server version string.

Returns 45 (utf8mb4_general_ci) for version 5.5.3 and later, otherwise
33 (utf8_general_ci).

Only the first three dot-separated components are looked at, and only the
leading digits of each, so suffixes such as "-10.0.7-MariaDB" are ignored.
Components that are missing or do not start with a digit count as 0.

Throws: Whatever `parse!ushort` throws if a component does not fit a `ushort`.
+/
private ubyte getDefaultCollation(string serverVersion)
{
	enum ubyte utf8GeneralCi = 33;
	enum ubyte utf8mb4GeneralCi = 45;

	ushort[3] ver; // major, minor, patch
	size_t idx;
	foreach (part; serverVersion.splitter('.').take(3))
	{
		if (part.length && part[0] >= '0' && part[0] <= '9')
			ver[idx] = part.parse!ushort;
		++idx;
	}

	// MySQL >= 5.5.3 supports utf8mb4. Compare as a (major, minor, patch)
	// tuple: a lower-order component only matters when all higher-order
	// ones are equal to the threshold.
	if (ver[0] != 5)
		return ver[0] < 5 ? utf8GeneralCi : utf8mb4GeneralCi;
	if (ver[1] != 5)
		return ver[1] < 5 ? utf8GeneralCi : utf8mb4GeneralCi;
	return ver[2] < 3 ? utf8GeneralCi : utf8mb4GeneralCi;
}

unittest
{
	assert(getDefaultCollation("5.5.3") == 45);
	assert(getDefaultCollation("5.5.2") == 33);

	// MariaDB: https://mariadb.com/kb/en/connection/#initial-handshake-packet
	assert(getDefaultCollation("5.5.5-10.0.7-MariaDB") == 45);

	// Older releases
	assert(getDefaultCollation("5.1.73") == 33);
	assert(getDefaultCollation("4.1.22") == 33);

	// Newer releases whose minor or patch number is below the 5.5.3 threshold
	assert(getDefaultCollation("5.6.2") == 45);
	assert(getDefaultCollation("8.0.23") == 45);
	assert(getDefaultCollation("10.4.12-MariaDB") == 45);

	// Fewer than three components
	assert(getDefaultCollation("8.0") == 45);
	assert(getDefaultCollation("5.5") == 33);
}