1 /++ 2 Internal - Low-level communications. 3 4 Consider this module the main entry point for the low-level MySQL/MariaDB 5 protocol code. The other modules in `mysql.protocol` are mainly tools 6 to support this module. 7 8 Previously, the code handling low-level protocol details was scattered all 9 across the library. Such functionality has been factored out into this module, 10 to be kept in one place for better encapsulation and to facilitate further 11 cleanup and refactoring. 12 13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it 14 eventually settles into what will eventually become a low-level library 15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight... 16 17 Next tasks for this sub-package's cleanup: 18 - Reduce this module's reliance on Connection. 19 - Abstract out a PacketStream to clean up getPacket and related functionality. 20 +/ 21 module mysql.protocol.comms; 22 23 import std.algorithm; 24 import std.array; 25 import std.conv; 26 import std.digest.sha; 27 import std.exception; 28 import std.range; 29 import std.variant; 30 31 import mysql.connection; 32 import mysql.exceptions; 33 import mysql.prepared; 34 import mysql.result; 35 36 import mysql.protocol.constants; 37 import mysql.protocol.extra_types; 38 import mysql.protocol.packet_helpers; 39 import mysql.protocol.packets; 40 import mysql.protocol.sockets; 41 42 /// Low-level comms code relating to prepared statements. 
package struct ProtocolPrepared
{
	import std.conv;
	import std.datetime;
	import std.variant;
	import mysql.types;

	/++
	Build the NULL bitmap for a statement-execute packet.

	The result holds one bit per entry of `inParams` (least-significant bit
	first within each byte). A bit is set when the corresponding Variant
	holds `null`.
	+/
	static ubyte[] makeBitmap(in Variant[] inParams)
	{
		auto bitmap = new ubyte[(inParams.length + 7) / 8];
		foreach (idx; 0 .. inParams.length)
		{
			if (inParams[idx].type == typeid(typeof(null)))
				bitmap[idx / 8] |= cast(ubyte)(1 << (idx % 8));
		}
		return bitmap;
	}

	/++
	Build the fixed 14-byte prefix of a STMT_EXECUTE packet.

	Bytes 0..3 are left zeroed (`sendCommand` fills them in through
	`setPacketHeader`), byte 4 is the command, bytes 5..8 the statement
	handle, byte 9 the flags and bytes 10..13 the iteration count.
	+/
	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
	{
		auto prefix = new ubyte[14]; // zero-initialised

		prefix[4] = CommandType.STMT_EXECUTE;
		hStmt.packInto(prefix[5 .. 9]);
		prefix[9] = flags;  // flags, no cursor
		prefix[10] = 1;     // iteration count - currently always 1
		// prefix[11 .. 14] stay zero: upper bytes of the iteration count

		return prefix;
	}

	/++
	Encode the bound parameters of a prepared statement.

	Params:
		inParams = The parameter values. Each may hold a value or a pointer to one.
		psa = Per-parameter specializations (explicit SQL type, chunk size).
		vals = Receives the packed parameter values, back to back.
		longData = Set to true when any entry of `psa` has a non-zero `chunkSize`.

	Returns: Two bytes per parameter: the SQL type followed by the
	signed/unsigned flag.

	Throws: MYX for an unbound (`void`) parameter or an unsupported D type.
	+/
	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
		out ubyte[] vals, out bool longData)
	{
		enum ubyte UNSIGNED = 0x80;
		enum ubyte SIGNED = 0;

		immutable size_t paramCount = inParams.length;
		auto types = new ubyte[paramCount * 2];
		size_t typesUsed = 0;
		size_t capacity = paramCount * 20;
		vals.length = capacity;
		uint valsUsed = 0;

		// Grow `vals` so that `extra` more bytes fit after `valsUsed`.
		void ensureRoom(size_t extra)
		{
			if (valsUsed + extra < capacity)
				return;
			size_t growth = (capacity * 3) / 2;
			if (growth < extra)
				growth = extra;
			capacity += growth;
			vals.length = capacity;
		}

		// Record the (type, sign flag) pair for the current parameter.
		void putType(SQLType sqlType, ubyte signFlag)
		{
			types[typesUsed++] = cast(ubyte) sqlType;
			types[typesUsed++] = signFlag;
		}

		// Append raw bytes to `vals`.
		void putBytes(const(ubyte)[] bytes)
		{
			ensureRoom(bytes.length);
			vals[valsUsed .. valsUsed + bytes.length] = bytes[];
			valsUsed += cast(uint) bytes.length;
		}

		// Append the low `width` bytes of `value`, least significant first.
		void putLE(ulong value, size_t width)
		{
			ensureRoom(width);
			foreach (b; 0 .. width)
				vals[valsUsed++] = cast(ubyte)((value >> (8 * b)) & 0xff);
		}

		foreach (size_t i; 0 .. paramCount)
		{
			if (psa[i].chunkSize)
				longData = true;

			if (inParams[i].type == typeid(typeof(null)))
			{
				putType(SQLType.NULL, SIGNED);
				continue;
			}

			Variant v = inParams[i];
			immutable SQLType ext = psa[i].type;

			// A trailing '*' in the type name means the Variant holds a pointer
			// to the value rather than the value itself.
			string typeName = v.type.toString();
			immutable bool isRef = typeName[$ - 1] == '*';
			if (isRef)
				typeName = typeName[0 .. $ - 1];

			// The explicitly requested SQL type, or `inferred` when none was given.
			SQLType chosen(SQLType inferred)
			{
				return ext == SQLType.INFER_FROM_D_TYPE ? inferred : ext;
			}

			switch (typeName)
			{
				case "bool", "const(bool)", "immutable(bool)", "shared(immutable(bool))":
					putType(chosen(SQLType.BIT), SIGNED);
					const bool flag = isRef ? *(v.get!(const(bool*))) : v.get!(const(bool));
					ensureRoom(2);
					vals[valsUsed++] = 1;                   // length byte
					vals[valsUsed++] = flag ? 0x31 : 0x30;  // ASCII '1' / '0'
					break;

				case "byte", "const(byte)", "immutable(byte)", "shared(immutable(byte))":
					putType(SQLType.TINY, SIGNED);
					putLE(isRef ? *(v.get!(const(byte*))) : v.get!(const(byte)), 1);
					break;

				case "ubyte", "const(ubyte)", "immutable(ubyte)", "shared(immutable(ubyte))":
					putType(SQLType.TINY, UNSIGNED);
					putLE(isRef ? *(v.get!(const(ubyte*))) : v.get!(const(ubyte)), 1);
					break;

				case "short", "const(short)", "immutable(short)", "shared(immutable(short))":
					putType(SQLType.SHORT, SIGNED);
					putLE(isRef ? *(v.get!(const(short*))) : v.get!(const(short)), 2);
					break;

				case "ushort", "const(ushort)", "immutable(ushort)", "shared(immutable(ushort))":
					putType(SQLType.SHORT, UNSIGNED);
					putLE(isRef ? *(v.get!(const(ushort*))) : v.get!(const(ushort)), 2);
					break;

				case "int", "const(int)", "immutable(int)", "shared(immutable(int))":
					putType(SQLType.INT, SIGNED);
					putLE(isRef ? *(v.get!(const(int*))) : v.get!(const(int)), 4);
					break;

				case "uint", "const(uint)", "immutable(uint)", "shared(immutable(uint))":
					putType(SQLType.INT, UNSIGNED);
					putLE(isRef ? *(v.get!(const(uint*))) : v.get!(const(uint)), 4);
					break;

				case "long", "const(long)", "immutable(long)", "shared(immutable(long))":
					putType(SQLType.LONGLONG, SIGNED);
					putLE(isRef ? *(v.get!(const(long*))) : v.get!(const(long)), 8);
					break;

				case "ulong", "const(ulong)", "immutable(ulong)", "shared(immutable(ulong))":
					putType(SQLType.LONGLONG, UNSIGNED);
					putLE(isRef ? *(v.get!(const(ulong*))) : v.get!(const(ulong)), 8);
					break;

				case "float", "const(float)", "immutable(float)", "shared(immutable(float))":
					putType(SQLType.FLOAT, SIGNED);
					float f = isRef ? *(v.get!(const(float*))) : v.get!(const(float));
					// Bytes are copied in host memory order.
					// NOTE(review): presumably relies on a little-endian host - confirm.
					putBytes((cast(const(ubyte)*) &f)[0 .. float.sizeof]);
					break;

				case "double", "const(double)", "immutable(double)", "shared(immutable(double))":
					putType(SQLType.DOUBLE, SIGNED);
					double d = isRef ? *(v.get!(const(double*))) : v.get!(const(double));
					// Host memory order, as for float above.
					putBytes((cast(const(ubyte)*) &d)[0 .. double.sizeof]);
					break;

				case "std.datetime.date.Date", "const(std.datetime.date.Date)",
					"immutable(std.datetime.date.Date)", "shared(immutable(std.datetime.date.Date))",
					"std.datetime.Date", "const(std.datetime.Date)",
					"immutable(std.datetime.Date)", "shared(immutable(std.datetime.Date))":
					putType(SQLType.DATE, SIGNED);
					Date date = isRef ? *(v.get!(const(Date*))) : v.get!(const(Date));
					putBytes(pack(date));
					break;

				case "std.datetime.TimeOfDay", "const(std.datetime.TimeOfDay)",
					"immutable(std.datetime.TimeOfDay)", "shared(immutable(std.datetime.TimeOfDay))",
					"std.datetime.date.TimeOfDay", "const(std.datetime.date.TimeOfDay)",
					"immutable(std.datetime.date.TimeOfDay)", "shared(immutable(std.datetime.date.TimeOfDay))",
					"std.datetime.Time", "const(std.datetime.Time)",
					"immutable(std.datetime.Time)", "shared(immutable(std.datetime.Time))":
					putType(SQLType.TIME, SIGNED);
					TimeOfDay time = isRef ? *(v.get!(const(TimeOfDay*))) : v.get!(const(TimeOfDay));
					putBytes(pack(time));
					break;

				case "std.datetime.date.DateTime", "const(std.datetime.date.DateTime)",
					"immutable(std.datetime.date.DateTime)", "shared(immutable(std.datetime.date.DateTime))",
					"std.datetime.DateTime", "const(std.datetime.DateTime)",
					"immutable(std.datetime.DateTime)", "shared(immutable(std.datetime.DateTime))":
					putType(SQLType.DATETIME, SIGNED);
					DateTime dt = isRef ? *(v.get!(const(DateTime*))) : v.get!(const(DateTime));
					putBytes(pack(dt));
					break;

				case "mysql.types.Timestamp", "const(mysql.types.Timestamp)",
					"immutable(mysql.types.Timestamp)", "shared(immutable(mysql.types.Timestamp))":
					putType(SQLType.TIMESTAMP, SIGNED);
					Timestamp tms = isRef ? *(v.get!(const(Timestamp*))) : v.get!(const(Timestamp));
					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
					putBytes(pack(dt));
					break;

				case "char[]", "const(char[])", "immutable(char[])",
					"const(char)[]", "immutable(char)[]",
					"shared(immutable(char)[])", "shared(immutable(char))[]",
					"shared(immutable(char[]))":
					putType(chosen(SQLType.VARCHAR), SIGNED);
					const char[] ca = isRef ? *(v.get!(const(char[]*))) : v.get!(const(char[]));
					putBytes(packLCS(cast(void[]) ca));
					break;

				case "byte[]", "const(byte[])", "immutable(byte[])",
					"const(byte)[]", "immutable(byte)[]",
					"shared(immutable(byte)[])", "shared(immutable(byte))[]",
					"shared(immutable(byte[]))":
					putType(chosen(SQLType.TINYBLOB), SIGNED);
					const byte[] ba = isRef ? *(v.get!(const(byte[]*))) : v.get!(const(byte[]));
					putBytes(packLCS(cast(void[]) ba));
					break;

				case "ubyte[]", "const(ubyte[])", "immutable(ubyte[])",
					"const(ubyte)[]", "immutable(ubyte)[]",
					"shared(immutable(ubyte)[])", "shared(immutable(ubyte))[]",
					"shared(immutable(ubyte[]))":
					putType(chosen(SQLType.TINYBLOB), SIGNED);
					const ubyte[] uba = isRef ? *(v.get!(const(ubyte[]*))) : v.get!(const(ubyte[]));
					putBytes(packLCS(cast(void[]) uba));
					break;

				case "void":
					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);

				default:
					throw new MYX("Unsupported parameter type " ~ typeName, __FILE__, __LINE__);
			}
		}

		// Trim the value buffer to what was actually written.
		vals.length = valsUsed;
		return types;
	}

	/++
	Send the data of every "long data" parameter, i.e. every entry of `psa`
	with a non-zero `chunkSize`, as a series of STMT_SEND_LONG_DATA packets.

	Each parameter's `chunkDelegate` is called repeatedly to fill a buffer
	of `chunkSize` bytes. A return value smaller than `chunkSize` marks the
	final (possibly empty) chunk.
	+/
	static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
	{
		assert(psa.length <= ushort.max); // parameter number is sent as short

		// 4 header bytes + command + 4-byte statement handle + 2-byte parameter number
		enum size_t preamble = 11;

		foreach (size_t paramNo, spec; psa)
		{
			if (!spec.chunkSize)
				continue;

			uint chunkSize = spec.chunkSize;
			uint delegate(ubyte[]) fill = spec.chunkDelegate;

			auto chunk = new ubyte[chunkSize + preamble];
			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
			hStmt.packInto(chunk[5 .. 9]);                   // statement handle
			packInto(cast(ushort) paramNo, chunk[9 .. 11]);  // parameter number

			// Everything from `preamble` on is payload.
			while (true)
			{
				uint got = fill(chunk[preamble .. chunkSize + preamble]);
				if (got >= chunkSize)
				{
					// Full chunk: send it and ask for more.
					socket.send(chunk);
					continue;
				}

				if (got != 0)
				{
					// Short final chunk: trim it and patch the length in the header.
					chunk.length = chunk.length - (chunkSize - got);
					got += 7; // adjust for non-payload bytes
					packInto!(uint, true)(cast(uint) got, chunk[0 .. 3]);
					socket.send(chunk);
				}
				// got == 0: data was an exact multiple of chunk size - all sent
				break;
			}
		}
	}

	/++
	Build and send the STMT_EXECUTE packet for a prepared statement,
	preceded by any long-data packets its parameters require.
	+/
	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
		Variant[] inParams, ParameterSpecialization[] psa)
	{
		conn.autoPurge();
		conn.resetPacket();

		ubyte[] packet = makePSPrefix(hStmt, 0);
		bool longData;

		if (psh.paramCount)
		{
			ubyte[] vals;
			ubyte[] types = analyseParams(inParams, psa, vals, longData);
			ubyte[] nullBitmap = makeBitmap(inParams);

			// prefix, NULL bitmap, a single byte of value 1, type pairs, values
			packet ~= nullBitmap;
			packet ~= 1;
			packet ~= types;
			packet ~= vals;
		}

		if (longData)
			sendLongData(conn._socket, hStmt, psa);

		assert(packet.length <= uint.max);
		packet.setPacketHeader(conn.pktNumber);
		conn.bumpPacket();
		conn._socket.send(packet);
	}
}

/// Everything `execQueryImpl` needs to know about the command to run.
package(mysql) struct ExecQueryImplInfo
{
	/// Selects which group of fields below is used.
	bool isPrepared;

	// For non-prepared statements:
	const(char[]) sql;

	// For prepared statements:
	uint hStmt;
	PreparedStmtHeaders psh;
	Variant[] inParams;
	ParameterSpecialization[] psa;
}

/++
Internal implementation for the exec and query functions.

Execute a one-off SQL command.

Any result set can be accessed via Connection.getNextRow(), but you should really be
using the query function for such queries.

Params: ra = An out parameter to receive the number of rows affected.
Returns: true if there was a (possibly empty) result set.
+/
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
{
	scope(failure) conn.kill();

	// Send data
	if (!info.isPrepared)
	{
		conn.sendCmd(CommandType.QUERY, info.sql);
		conn._fieldCount = 0;
	}
	else
		ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);

	// Handle response
	ubyte[] packet = conn.getPacket();
	immutable ubyte marker = packet.front;

	if (marker == ResultPacketMarker.ok || marker == ResultPacketMarker.error)
	{
		// No result set: an OK packet, or an error that enforcePacketOK reports.
		conn.resetPacket();
		auto okp = OKErrorPacket(packet);
		enforcePacketOK(okp);
		ra = okp.affected;
		conn._serverStatus = okp.serverStatus;
		conn._insertID = okp.insertID;
		return false;
	}

	// There was presumably a result set
	assert(marker >= 1 && marker <= 250); // Result set packet header should have this value
	conn._headersPending = conn._rowsPending = true;
	conn._binaryPending = info.isPrepared;

	// The packet carries the field count as a length-coded binary.
	auto lcb = packet.consumeIfComplete!LCB();
	assert(!lcb.isNull);
	assert(!lcb.isIncomplete);
	conn._fieldCount = cast(ushort) lcb.value;
	assert(conn._fieldCount == lcb.value);

	ra = 0;
	return true;
}

///ditto
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
{
	ulong ignoredRowsAffected;
	return execQueryImpl(conn, info, ignoredRowsAffected);
}

/++
Send STMT_CLOSE for `statementId` right away, purging any pending result
set first. Does nothing when the connection is already closed.
+/
package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
{
	scope(failure) conn.kill();

	if (conn.closed())
		return;

	ubyte[9] buffer;
	ubyte[] packet = buffer;
	packet.setPacketHeader(0/*packet number*/);
	conn.bumpPacket();
	packet[4] = CommandType.STMT_CLOSE;
	statementId.packInto(packet[5 .. 9]);
	conn.purgeResult();
	conn._socket.send(packet);
	// It seems that the server does not find it necessary to send a response
	// for this command.
}

// Moved here from `struct Row`
/++
Remove the NULL bitmap of a binary result row from the front of `packet`
and decode it into one flag per field.

Throws: MYXProtocol when `packet` is shorter than the bitmap.
+/
package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
{
	immutable uint bitmapLength = calcBitmapLength(fieldCount);
	enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields");
	auto bitmap = packet.consume(bitmapLength);
	return decodeNullBitmap(bitmap, fieldCount);
}

// Moved here from `struct Row`
/// Bytes needed for a bitmap of `fieldCount` bits plus the 2 reserved leading bits.
private static uint calcBitmapLength(uint fieldCount) pure nothrow
{
	return (fieldCount+7+2)/8;
}

// Moved here from `struct Row`
// This is to decode the bitmap in a binary result row. First two bits are skipped
private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
in
{
	assert(bitmap.length >= calcBitmapLength(numFields),
		"bitmap not large enough to store all null fields");
}
out(result)
{
	assert(result.length == numFields);
}
body
{
	// The two lowest bits of the first byte are reserved, so field i lives at
	// bit position i + 2 (least-significant bit first within each byte).
	enum size_t reservedBits = 2;

	auto nulls = new bool[numFields];
	foreach (i, ref isNull; nulls)
	{
		immutable size_t bit = i + reservedBits;
		isNull = ((bitmap[bit / 8] >> (bit % 8)) & 1) != 0;
	}
	return nulls;
}

// Moved here from `struct Row.this`
/++
Decode one result row from `packet` into values, NULL flags and column names.

Params:
	conn = Connection to read continuation packets from when a value is incomplete.
	packet = The row data. Consumed as values are decoded.
	rh = Headers describing each column.
	binary = true when the row has a zero header byte followed by a NULL bitmap.
	_values = Receives one Variant per column (`null` for NULL columns).
	_nulls = Receives one NULL flag per column.
	_names = Receives every column's name, including columns whose value is NULL.
+/
package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
	out Variant[] _values, out bool[] _nulls, out string[] _names)
in
{
	assert(rh.fieldCount <= uint.max);
}
body
{
	scope(failure) conn.kill();

	immutable uint fieldCount = cast(uint) rh.fieldCount;
	_values.length = fieldCount;
	_nulls.length = fieldCount;
	_names.length = fieldCount;

	if (binary)
	{
		// There's a null byte header on a binary result sequence, followed by some bytes of bitmap
		// indicating which columns are null
		enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
		packet.popFront();
		_nulls = consumeNullBitmap(packet, fieldCount);
	}

	foreach (size_t i; 0 .. fieldCount)
	{
		// Record the name first so that NULL columns of a binary row get one too.
		FieldDescription fd = rh[i];
		_names[i] = fd.name;

		if (binary && _nulls[i])
		{
			_values[i] = null;
			continue;
		}

		// A value may span several packets: keep appending until it is complete.
		// TODO: Support chunk delegate
		SQLValue sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
		while (sqlValue.isIncomplete)
		{
			packet ~= conn.getPacket();
			sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
		}

		if (sqlValue.isNull)
		{
			assert(!binary);
			assert(!_nulls[i]);
			_nulls[i] = true;
			_values[i] = null;
		}
		else
		{
			_values[i] = sqlValue.value;
		}
	}
}

////// Moved here from Connection /////////////////////////////////

/++
Read one packet from the connection's socket and return its payload.

Throws: MYXProtocol when the packet's sequence number is not the expected one.
+/
package(mysql) ubyte[] getPacket(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[4] header;
	conn._socket.read(header);

	// Bytes 0..2: payload length, always 24-bit, least significant byte first.
	// Byte 3: packet sequence number.
	immutable uint numDataBytes = header[0] | (header[1] << 8) | (header[2] << 16);
	enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order");
	conn.bumpPacket();

	auto packet = new ubyte[numDataBytes];
	conn._socket.read(packet);
	assert(packet.length == numDataBytes, "Wrong number of bytes read");
	return packet;
}

/// Write a complete packet (header included) to the socket.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
in
{
	assert(packet.length > 4); // at least 1 byte more than header
}
body
{
	_socket.write(packet);
}

/// Write a packet header followed by its data, skipping the second write when `data` is empty.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
in
{
	assert(header.length == 4 || header.length == 5/*command type included*/);
}
body
{
	_socket.write(header);
	if (data.length != 0)
		_socket.write(data);
}

/++
Send command `cmd` with payload `data`, reconnecting first when the socket
is no longer connected (except for QUIT, which is then silently dropped).
+/
package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
in
{
	// Internal thread states. Clients shouldn't use this
	assert(cmd != CommandType.SLEEP);
	assert(cmd != CommandType.CONNECT);
	assert(cmd != CommandType.TIME);
	assert(cmd != CommandType.DELAYED_INSERT);
	assert(cmd != CommandType.CONNECT_OUT);

	// Deprecated
	assert(cmd != CommandType.CREATE_DB);
	assert(cmd != CommandType.DROP_DB);
	assert(cmd != CommandType.TABLE_DUMP);

	// cannot send more than uint.max bytes. TODO: better error message if we try?
	assert(data.length <= uint.max);
}
out
{
	// at this point we should have sent a command
	// NOTE(review): the early return for QUIT on a dead socket skips
	// resetPacket/bumpPacket, so this may not hold on that path - confirm.
	assert(conn.pktNumber == 1);
}
body
{
	scope(failure) conn.kill();

	conn._lastCommandID++;

	if (!conn._socket.connected)
	{
		if (cmd == CommandType.QUIT)
			return; // Don't bother reopening connection just to quit

		conn._open = Connection.OpenState.notConnected;
		conn.connect(conn._clientCapabilities);
	}

	conn.autoPurge();
	conn.resetPacket();

	// 4 header bytes plus the command byte. The payload is sent separately.
	auto header = new ubyte[4 /*header*/ + 1 /*cmd*/];
	header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
	header[4] = cmd;
	conn.bumpPacket();

	conn._socket.send(header, cast(const(ubyte)[])data);
}

/++
Read the response to a command as an OK/error packet, record the server
status on the connection and return the packet. `asString` is not used.
+/
package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
{
	auto okp = OKErrorPacket(conn.getPacket());
	enforcePacketOK(okp);
	conn._serverStatus = okp.serverStatus;
	return okp;
}

/++
Assemble the authentication packet: capabilities, max packet length,
collation, 23 zero bytes, user name, authentication token and default database.
+/
package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
in
{
	assert(token.length == 20);
}
body
{
	ubyte[] packet;
	packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1);
	packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append

	// NOTE: we'll set the header last when we know the size

	// Set the default capabilities required by the client
	conn._cCaps.packInto(packet[4 .. 8]);

	// Request a conventional maximum packet length.
	1.packInto(packet[8 .. 12]);

	packet ~= getDefaultCollation(conn._serverVersion);

	// There's a statutory block of zero bytes here - new elements are zero-filled.
	packet.length += 23;

	// Add the user name as a null terminated string
	packet ~= cast(const(ubyte)[]) conn._user;
	packet ~= 0; // \0

	// Add our calculated authentication token as a length prefixed string.
	assert(token.length <= ubyte.max);
	if (conn._pwd.length == 0) // Omit the token if the account has no password
		packet ~= 0;
	else
	{
		packet ~= cast(ubyte) token.length;
		packet ~= token;
	}

	// Add the default database as a null terminated string
	packet ~= cast(const(ubyte)[]) conn._db;
	packet ~= 0; // \0

	// The server sent us a greeting with packet number 0, so we send the auth packet
	// back with the next number.
	packet.setPacketHeader(conn.pktNumber);
	conn.bumpPacket();
	return packet;
}

/++
Compute the authentication token:
`SHA1(authBuf ~ SHA1(SHA1(password))) XOR SHA1(password)`.
+/
package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
{
	auto stage1 = sha1Of(cast(const(ubyte)[]) password);
	auto stage2 = sha1Of(stage1);

	SHA1 hasher;
	hasher.start();
	hasher.put(authBuf);
	hasher.put(stage2);
	auto digest = hasher.finish();

	digest[] ^= stage1[];
	return digest.dup;
}

/// Get the next `mysql.result.Row` of a pending result set.
package(mysql) Row getNextRow(Connection conn)
{
	scope(failure) conn.kill();

	// The column headers are read once, just before the first row.
	if (conn._headersPending)
	{
		conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
		conn._headersPending = false;
	}

	ubyte[] packet = conn.getPacket();
	if (packet.front == ResultPacketMarker.error)
		throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);

	if (packet.isEOFPacket())
	{
		// End of the result set: hand back a default-initialised Row.
		conn._rowsPending = conn._binaryPending = false;
		return Row.init;
	}

	return Row(conn, packet, conn._rsh, conn._binaryPending);
}

/++
Read the server capability flags, character set and status out of the
greeting packet and store them on the connection.

Throws: MYX when the server lacks PROTOCOL41 or SECURE_CONNECTION.
+/
package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
{
	scope(failure) conn.kill();

	// The fields must be consumed in exactly this order.
	conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
	conn._sCharSet = packet.consume!ubyte(); // server_language
	conn._serverStatus = packet.consume!ushort(); //server_status
	conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
	conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

	enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
	enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
}

/++
Read and parse the server's greeting packet.

Stores protocol version, server version, thread id and server info on the
connection.

Returns: The scramble buffer used for authentication.

Throws: MYX when the server sent an error packet instead of a greeting.
MYXProtocol when the greeting is malformed.
+/
package(mysql) ubyte[] parseGreeting(Connection conn)
{
	scope(failure) conn.kill();

	ubyte[] packet = conn.getPacket();

	if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
	{
		auto okp = OKErrorPacket(packet);
		enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
	}

	conn._protocol = packet.consume!ubyte();

	// The server version is a \0 terminated string. Reject a greeting without
	// the terminator instead of passing a negative length on.
	immutable versionLen = packet.countUntil(0);
	enforce!MYXProtocol(versionLen >= 0, "Server version string is not null-terminated");
	conn._serverVersion = packet.consume!string(versionLen);
	packet.skip(1); // \0 terminated _serverVersion

	conn._sThread = packet.consume!uint();

	// read first part of scramble buf
	auto authBuf = new ubyte[255];
	authBuf[0 .. 8] = packet.consume(8)[]; // scramble_buff

	enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

	conn.consumeServerInfo(packet);

	packet.skip(1); // this byte supposed to be scramble length, but is actually zero
	packet.skip(10); // filler of \0

	// rest of the scramble
	auto len = packet.countUntil(0);
	enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
	enforce(authBuf.length > 8+len);
	authBuf[8 .. 8+len] = packet.consume(len)[];
	authBuf.length = 8+len; // cut to correct size
	enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");

	return authBuf;
}

/++
Return the capabilities supported by both the server and the client.

All 32 flag bits are intersected. (The earlier loop ran `uint.sizeof`, i.e.
4, times and therefore only ever looked at the lowest four bits.)
+/
package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
{
	return cast(SvrCapFlags)(server & client);
}

/// Capabilities to request from the server: the common ones plus the two we cannot work without.
package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
{
	auto cCaps = getCommonCapabilities(serverCaps, capFlags);

	// We cannot operate in <4.1 protocol, so we'll force it even if the user
	// didn't supply it
	cCaps |= SvrCapFlags.PROTOCOL41;
	cCaps |= SvrCapFlags.SECURE_CONNECTION;

	return cCaps;
}

/++
Authenticate against the server using the scramble buffer from the greeting.

Throws: MYX when the server answers with an error packet.
+/
package(mysql) void authenticate(Connection conn, ubyte[] greeting)
in
{
	assert(conn._open == Connection.OpenState.connected);
}
out
{
	assert(conn._open == Connection.OpenState.authenticated);
}
body
{
	auto token = makeToken(conn._pwd, greeting);
	conn._socket.send(conn.buildAuthPacket(token));

	auto okp = OKErrorPacket(conn.getPacket());
	enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
	conn._open = Connection.OpenState.authenticated;
}

// Register prepared statement
/++
Send STMT_PREPARE for `sql` and read back the statement id, parameter
count, warning count and headers.

Throws: Whatever `enforcePacketOK` throws for an error packet. MYXProtocol
when the response is neither an OK nor an error packet.
+/
package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
{
	scope(failure) conn.kill();

	PreparedServerInfo info;

	conn.sendCmd(CommandType.STMT_PREPARE, sql);
	conn._fieldCount = 0;

	ubyte[] packet = conn.getPacket();
	immutable ubyte marker = packet.front;

	if (marker == ResultPacketMarker.error)
	{
		auto error = OKErrorPacket(packet);
		enforcePacketOK(error);
		assert(0); // FIXME: what now?
	}

	// A misbehaving server should surface as a protocol error, not abort the process.
	if (marker != ResultPacketMarker.ok)
		throw new MYXProtocol("Unexpected response to STMT_PREPARE command", __FILE__, __LINE__);

	packet.popFront();
	info.statementId = packet.consume!int();
	conn._fieldCount = packet.consume!short();
	info.numParams = packet.consume!short();

	packet.popFront(); // one byte filler
	info.psWarnings = packet.consume!short();

	// At this point the server also sends field specs for parameters
	// and columns if there were any of each
	info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);

	return info;
}

/++
Flush any outstanding result set elements.

When the server responds to a command that produces a result set, it
queues the whole set of corresponding packets over the current connection.
Before that `Connection` can embark on any new command, it must receive
all of those packets and junk them.

As of v1.1.4, this is done automatically as needed. But you can still
call this manually to force a purge to occur when you want.

Returns: The number of row packets that were discarded.

See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
+/
package(mysql) ulong purgeResult(Connection conn)
{
	scope(failure) conn.kill();

	conn._lastCommandID++;

	if (conn._headersPending)
	{
		// Discard field header packets up to the EOF packet. There must not
		// be more of them than the announced field count.
		size_t discarded = 0;
		while (!conn.getPacket().isEOFPacket())
		{
			enforce!MYXProtocol(discarded < conn._fieldCount,
				text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
			discarded++;
		}
		conn._headersPending = false;
	}

	ulong rows = 0;
	if (conn._rowsPending)
	{
		// Discard row packets up to the EOF packet, counting them.
		while (!conn.getPacket().isEOFPacket())
			rows++;
		conn._rowsPending = conn._binaryPending = false;
	}

	conn.resetPacket();
	return rows;
}

/++
Get a textual report on the server status.

(COM_STATISTICS)
+/
package(mysql) string serverStats(Connection conn)
{
	conn.sendCmd(CommandType.STATISTICS, []);
	return cast(string) conn.getPacket();
}

/++
Enable multiple statement commands.

This can be used later if this feature was not requested in the client capability flags.

Warning: This functionality is currently untested.

Params: on = Boolean value to turn the capability on or off.
+/
//TODO: Need to test this
package(mysql) void enableMultiStatements(Connection conn, bool on)
{
	scope(failure) conn.kill();

	// Two-byte option value: 0 switches multi-statements on, 1 switches them off.
	ubyte[] option = [on ? 0 : 1, 0];
	conn.sendCmd(CommandType.STMT_OPTION, option);

	// For some reason this command gets an EOF packet as response.
	// The length is checked first so that an empty packet cannot be indexed.
	auto packet = conn.getPacket();
	enforce!MYXProtocol(packet.length == 5 && packet[0] == 254, "Unexpected response to SET_OPTION command");
}

/++
Pick the collation id to request, based on the server version string.

Returns: 45 (utf8mb4_general_ci) when the version is at least 5.5.3,
otherwise 33 (utf8_general_ci).

The version is compared as a whole (major, then minor, then patch), so
"5.6.1" or "8.0.11" count as newer than 5.5.3. Only the leading digits of
each dot-separated component are used, and missing or non-numeric
components count as 0.
+/
private ubyte getDefaultCollation(string serverVersion)
{
	enum ubyte utf8GeneralCi = 33;
	enum ubyte utf8mb4GeneralCi = 45;

	// Value of the leading decimal digits of `s` (0 when there are none).
	// Stops early on absurdly long numbers so that it cannot overflow.
	static uint leadingNumber(const(char)[] s)
	{
		uint value = 0;
		foreach (char c; s)
		{
			if (c < '0' || c > '9' || value > 100_000)
				break;
			value = value * 10 + (c - '0');
		}
		return value;
	}

	// Collect up to three components. Missing ones stay 0.
	uint[3] ver;
	size_t count = 0;
	foreach (part; serverVersion.splitter('.'))
	{
		if (count == ver.length)
			break;
		ver[count++] = leadingNumber(part);
	}

	// MySQL >= 5.5.3 supports utf8mb4
	immutable uint[3] minimum = [5, 5, 3];
	foreach (i; 0 .. ver.length)
	{
		if (ver[i] != minimum[i])
			return ver[i] < minimum[i] ? utf8GeneralCi : utf8mb4GeneralCi;
	}
	return utf8mb4GeneralCi; // exactly 5.5.3
}

unittest
{
	assert(getDefaultCollation("5.5.3") == 45);
	assert(getDefaultCollation("5.5.2") == 33);

	// MariaDB: https://mariadb.com/kb/en/connection/#initial-handshake-packet
	assert(getDefaultCollation("5.5.5-10.0.7-MariaDB") == 45);

	// A newer major or minor version wins even when a later component is small
	assert(getDefaultCollation("5.6.1") == 45);
	assert(getDefaultCollation("8.0.11") == 45);
	assert(getDefaultCollation("10.4.12-MariaDB") == 45);

	// Older versions, and versions with missing components
	assert(getDefaultCollation("4.9.9") == 33);
	assert(getDefaultCollation("5.1.73") == 33);
	assert(getDefaultCollation("5.5") == 33);
}