1 /++ 2 Internal - Low-level communications. 3 4 Consider this module the main entry point for the low-level MySQL/MariaDB 5 protocol code. The other modules in `mysql.protocol` are mainly tools 6 to support this module. 7 8 Previously, the code handling low-level protocol details was scattered all 9 across the library. Such functionality has been factored out into this module, 10 to be kept in one place for better encapsulation and to facilitate further 11 cleanup and refactoring. 12 13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it 14 eventually settles into what will eventually become a low-level library 15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight... 16 17 Next tasks for this sub-package's cleanup: 18 - Reduce this module's reliance on Connection. 19 - Abstract out a PacketStream to clean up getPacket and related functionality. 20 +/ 21 module mysql.protocol.comms; 22 23 import std.algorithm; 24 import std.conv; 25 import std.digest.sha; 26 import std.exception; 27 import std.range; 28 import std.variant; 29 30 import mysql.connection; 31 import mysql.exceptions; 32 import mysql.prepared; 33 import mysql.result; 34 35 import mysql.protocol.constants; 36 import mysql.protocol.extra_types; 37 import mysql.protocol.packet_helpers; 38 import mysql.protocol.packets; 39 import mysql.protocol.sockets; 40 41 /// Low-level comms code relating to prepared statements. 42 package struct ProtocolPrepared 43 { 44 import std.conv; 45 import std.datetime; 46 import std.variant; 47 import mysql.types; 48 49 static ubyte[] makeBitmap(in Variant[] inParams) 50 { 51 size_t bml = (inParams.length+7)/8; 52 ubyte[] bma; 53 bma.length = bml; 54 foreach (i; 0..inParams.length) 55 { 56 if(inParams[i].type != typeid(typeof(null))) 57 continue; 58 size_t bn = i/8; 59 size_t bb = i%8; 60 ubyte sr = 1; 61 sr <<= bb; 62 bma[bn] |= sr; 63 } 64 return bma; 65 } 66 67 static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow 68 { 69 ubyte[] prefix; 70 prefix.length = 14; 71 72 prefix[4] = CommandType.STMT_EXECUTE; 73 hStmt.packInto(prefix[5..9]); 74 prefix[9] = flags; // flags, no cursor 75 prefix[10] = 1; // iteration count - currently always 1 76 prefix[11] = 0; 77 prefix[12] = 0; 78 prefix[13] = 0; 79 80 return prefix; 81 } 82 83 static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa, 84 out ubyte[] vals, out bool longData) 85 { 86 size_t pc = inParams.length; 87 ubyte[] types; 88 types.length = pc*2; 89 size_t alloc = pc*20; 90 vals.length = alloc; 91 uint vcl = 0, len; 92 int ct = 0; 93 94 void reAlloc(size_t n) 95 { 96 if (vcl+n < alloc) 97 return; 98 size_t inc = (alloc*3)/2; 99 if (inc < n) 100 inc = n; 101 alloc += inc; 102 vals.length = alloc; 103 } 104 105 foreach (size_t i; 0..pc) 106 { 107 enum UNSIGNED = 0x80; 108 enum SIGNED = 0; 109 if (psa[i].chunkSize) 110 longData= true; 111 if (inParams[i].type == typeid(typeof(null))) 112 { 113 types[ct++] = SQLType.NULL; 114 types[ct++] = SIGNED; 115 continue; 116 } 117 Variant v = inParams[i]; 118 SQLType ext = psa[i].type; 119 string ts = v.type.toString(); 120 bool isRef; 121 if (ts[$-1] == '*') 122 { 123 ts.length = ts.length-1; 124 isRef= true; 125 } 126 127 switch (ts) 128 { 129 case "bool": 130 case "const(bool)": 131 case "immutable(bool)": 132 case "shared(immutable(bool))": 133 if (ext == SQLType.INFER_FROM_D_TYPE) 134 types[ct++] = SQLType.BIT; 135 else 136 types[ct++] = cast(ubyte) ext; 137 types[ct++] = SIGNED; 138 reAlloc(2); 139 bool bv = isRef? *(v.get!(const(bool*))): v.get!(const(bool)); 140 vals[vcl++] = 1; 141 vals[vcl++] = bv? 0x31: 0x30; 142 break; 143 case "byte": 144 case "const(byte)": 145 case "immutable(byte)": 146 case "shared(immutable(byte))": 147 types[ct++] = SQLType.TINY; 148 types[ct++] = SIGNED; 149 reAlloc(1); 150 vals[vcl++] = isRef? *(v.get!(const(byte*))): v.get!(const(byte)); 151 break; 152 case "ubyte": 153 case "const(ubyte)": 154 case "immutable(ubyte)": 155 case "shared(immutable(ubyte))": 156 types[ct++] = SQLType.TINY; 157 types[ct++] = UNSIGNED; 158 reAlloc(1); 159 vals[vcl++] = isRef? *(v.get!(const(ubyte*))): v.get!(const(ubyte)); 160 break; 161 case "short": 162 case "const(short)": 163 case "immutable(short)": 164 case "shared(immutable(short))": 165 types[ct++] = SQLType.SHORT; 166 types[ct++] = SIGNED; 167 reAlloc(2); 168 short si = isRef? *(v.get!(const(short*))): v.get!(const(short)); 169 vals[vcl++] = cast(ubyte) (si & 0xff); 170 vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff); 171 break; 172 case "ushort": 173 case "const(ushort)": 174 case "immutable(ushort)": 175 case "shared(immutable(ushort))": 176 types[ct++] = SQLType.SHORT; 177 types[ct++] = UNSIGNED; 178 reAlloc(2); 179 ushort us = isRef? *(v.get!(const(ushort*))): v.get!(const(ushort)); 180 vals[vcl++] = cast(ubyte) (us & 0xff); 181 vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff); 182 break; 183 case "int": 184 case "const(int)": 185 case "immutable(int)": 186 case "shared(immutable(int))": 187 types[ct++] = SQLType.INT; 188 types[ct++] = SIGNED; 189 reAlloc(4); 190 int ii = isRef? *(v.get!(const(int*))): v.get!(const(int)); 191 vals[vcl++] = cast(ubyte) (ii & 0xff); 192 vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff); 193 vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff); 194 vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff); 195 break; 196 case "uint": 197 case "const(uint)": 198 case "immutable(uint)": 199 case "shared(immutable(uint))": 200 types[ct++] = SQLType.INT; 201 types[ct++] = UNSIGNED; 202 reAlloc(4); 203 uint ui = isRef? *(v.get!(const(uint*))): v.get!(const(uint)); 204 vals[vcl++] = cast(ubyte) (ui & 0xff); 205 vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff); 206 vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff); 207 vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff); 208 break; 209 case "long": 210 case "const(long)": 211 case "immutable(long)": 212 case "shared(immutable(long))": 213 types[ct++] = SQLType.LONGLONG; 214 types[ct++] = SIGNED; 215 reAlloc(8); 216 long li = isRef? *(v.get!(const(long*))): v.get!(const(long)); 217 vals[vcl++] = cast(ubyte) (li & 0xff); 218 vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff); 219 vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff); 220 vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff); 221 vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff); 222 vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff); 223 vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff); 224 vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff); 225 break; 226 case "ulong": 227 case "const(ulong)": 228 case "immutable(ulong)": 229 case "shared(immutable(ulong))": 230 types[ct++] = SQLType.LONGLONG; 231 types[ct++] = UNSIGNED; 232 reAlloc(8); 233 ulong ul = isRef? *(v.get!(const(ulong*))): v.get!(const(ulong)); 234 vals[vcl++] = cast(ubyte) (ul & 0xff); 235 vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff); 236 vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff); 237 vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff); 238 vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff); 239 vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff); 240 vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff); 241 vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff); 242 break; 243 case "float": 244 case "const(float)": 245 case "immutable(float)": 246 case "shared(immutable(float))": 247 types[ct++] = SQLType.FLOAT; 248 types[ct++] = SIGNED; 249 reAlloc(4); 250 float f = isRef? *(v.get!(const(float*))): v.get!(const(float)); 251 ubyte* ubp = cast(ubyte*) &f; 252 vals[vcl++] = *ubp++; 253 vals[vcl++] = *ubp++; 254 vals[vcl++] = *ubp++; 255 vals[vcl++] = *ubp; 256 break; 257 case "double": 258 case "const(double)": 259 case "immutable(double)": 260 case "shared(immutable(double))": 261 types[ct++] = SQLType.DOUBLE; 262 types[ct++] = SIGNED; 263 reAlloc(8); 264 double d = isRef? *(v.get!(const(double*))): v.get!(const(double)); 265 ubyte* ubp = cast(ubyte*) &d; 266 vals[vcl++] = *ubp++; 267 vals[vcl++] = *ubp++; 268 vals[vcl++] = *ubp++; 269 vals[vcl++] = *ubp++; 270 vals[vcl++] = *ubp++; 271 vals[vcl++] = *ubp++; 272 vals[vcl++] = *ubp++; 273 vals[vcl++] = *ubp; 274 break; 275 case "std.datetime.date.Date": 276 case "const(std.datetime.date.Date)": 277 case "immutable(std.datetime.date.Date)": 278 case "shared(immutable(std.datetime.date.Date))": 279 280 case "std.datetime.Date": 281 case "const(std.datetime.Date)": 282 case "immutable(std.datetime.Date)": 283 case "shared(immutable(std.datetime.Date))": 284 types[ct++] = SQLType.DATE; 285 types[ct++] = SIGNED; 286 Date date = isRef? *(v.get!(const(Date*))): v.get!(const(Date)); 287 ubyte[] da = pack(date); 288 size_t l = da.length; 289 reAlloc(l); 290 vals[vcl..vcl+l] = da[]; 291 vcl += l; 292 break; 293 case "std.datetime.TimeOfDay": 294 case "const(std.datetime.TimeOfDay)": 295 case "immutable(std.datetime.TimeOfDay)": 296 case "shared(immutable(std.datetime.TimeOfDay))": 297 298 case "std.datetime.date.TimeOfDay": 299 case "const(std.datetime.date.TimeOfDay)": 300 case "immutable(std.datetime.date.TimeOfDay)": 301 case "shared(immutable(std.datetime.date.TimeOfDay))": 302 303 case "std.datetime.Time": 304 case "const(std.datetime.Time)": 305 case "immutable(std.datetime.Time)": 306 case "shared(immutable(std.datetime.Time))": 307 types[ct++] = SQLType.TIME; 308 types[ct++] = SIGNED; 309 TimeOfDay time = isRef? *(v.get!(const(TimeOfDay*))): v.get!(const(TimeOfDay)); 310 ubyte[] ta = pack(time); 311 size_t l = ta.length; 312 reAlloc(l); 313 vals[vcl..vcl+l] = ta[]; 314 vcl += l; 315 break; 316 case "std.datetime.date.DateTime": 317 case "const(std.datetime.date.DateTime)": 318 case "immutable(std.datetime.date.DateTime)": 319 case "shared(immutable(std.datetime.date.DateTime))": 320 321 case "std.datetime.DateTime": 322 case "const(std.datetime.DateTime)": 323 case "immutable(std.datetime.DateTime)": 324 case "shared(immutable(std.datetime.DateTime))": 325 types[ct++] = SQLType.DATETIME; 326 types[ct++] = SIGNED; 327 DateTime dt = isRef? *(v.get!(const(DateTime*))): v.get!(const(DateTime)); 328 ubyte[] da = pack(dt); 329 size_t l = da.length; 330 reAlloc(l); 331 vals[vcl..vcl+l] = da[]; 332 vcl += l; 333 break; 334 case "mysql.types.Timestamp": 335 case "const(mysql.types.Timestamp)": 336 case "immutable(mysql.types.Timestamp)": 337 case "shared(immutable(mysql.types.Timestamp))": 338 types[ct++] = SQLType.TIMESTAMP; 339 types[ct++] = SIGNED; 340 Timestamp tms = isRef? *(v.get!(const(Timestamp*))): v.get!(const(Timestamp)); 341 DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep); 342 ubyte[] da = pack(dt); 343 size_t l = da.length; 344 reAlloc(l); 345 vals[vcl..vcl+l] = da[]; 346 vcl += l; 347 break; 348 case "char[]": 349 case "const(char[])": 350 case "immutable(char[])": 351 case "const(char)[]": 352 case "immutable(char)[]": 353 case "shared(immutable(char)[])": 354 case "shared(immutable(char))[]": 355 case "shared(immutable(char[]))": 356 if (ext == SQLType.INFER_FROM_D_TYPE) 357 types[ct++] = SQLType.VARCHAR; 358 else 359 types[ct++] = cast(ubyte) ext; 360 types[ct++] = SIGNED; 361 const char[] ca = isRef? *(v.get!(const(char[]*))): v.get!(const(char[])); 362 ubyte[] packed = packLCS(cast(void[]) ca); 363 reAlloc(packed.length); 364 vals[vcl..vcl+packed.length] = packed[]; 365 vcl += packed.length; 366 break; 367 case "byte[]": 368 case "const(byte[])": 369 case "immutable(byte[])": 370 case "const(byte)[]": 371 case "immutable(byte)[]": 372 case "shared(immutable(byte)[])": 373 case "shared(immutable(byte))[]": 374 case "shared(immutable(byte[]))": 375 if (ext == SQLType.INFER_FROM_D_TYPE) 376 types[ct++] = SQLType.TINYBLOB; 377 else 378 types[ct++] = cast(ubyte) ext; 379 types[ct++] = SIGNED; 380 const byte[] ba = isRef? *(v.get!(const(byte[]*))): v.get!(const(byte[])); 381 ubyte[] packed = packLCS(cast(void[]) ba); 382 reAlloc(packed.length); 383 vals[vcl..vcl+packed.length] = packed[]; 384 vcl += packed.length; 385 break; 386 case "ubyte[]": 387 case "const(ubyte[])": 388 case "immutable(ubyte[])": 389 case "const(ubyte)[]": 390 case "immutable(ubyte)[]": 391 case "shared(immutable(ubyte)[])": 392 case "shared(immutable(ubyte))[]": 393 case "shared(immutable(ubyte[]))": 394 if (ext == SQLType.INFER_FROM_D_TYPE) 395 types[ct++] = SQLType.TINYBLOB; 396 else 397 types[ct++] = cast(ubyte) ext; 398 types[ct++] = SIGNED; 399 const ubyte[] uba = isRef? *(v.get!(const(ubyte[]*))): v.get!(const(ubyte[])); 400 ubyte[] packed = packLCS(cast(void[]) uba); 401 reAlloc(packed.length); 402 vals[vcl..vcl+packed.length] = packed[]; 403 vcl += packed.length; 404 break; 405 case "void": 406 throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__); 407 default: 408 throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__); 409 } 410 } 411 vals.length = vcl; 412 return types; 413 } 414 415 static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa) 416 { 417 assert(psa.length <= ushort.max); // parameter number is sent as short 418 foreach (ushort i, PSN psn; psa) 419 { 420 if (!psn.chunkSize) continue; 421 uint cs = psn.chunkSize; 422 uint delegate(ubyte[]) dg = psn.chunkDelegate; 423 424 ubyte[] chunk; 425 chunk.length = cs+11; 426 chunk.setPacketHeader(0 /*each chunk is separate cmd*/); 427 chunk[4] = CommandType.STMT_SEND_LONG_DATA; 428 hStmt.packInto(chunk[5..9]); // statement handle 429 packInto(i, chunk[9..11]); // parameter number 430 431 // byte 11 on is payload 432 for (;;) 433 { 434 uint sent = dg(chunk[11..cs+11]); 435 if (sent < cs) 436 { 437 if (sent == 0) // data was exact multiple of chunk size - all sent 438 break; 439 chunk.length = chunk.length - (cs-sent); // trim the chunk 440 sent += 7; // adjust for non-payload bytes 441 packInto!(uint, true)(cast(uint)sent, chunk[0..3]); 442 socket.send(chunk); 443 break; 444 } 445 socket.send(chunk); 446 } 447 } 448 } 449 450 static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh, 451 Variant[] inParams, ParameterSpecialization[] psa) 452 { 453 conn.autoPurge(); 454 455 ubyte[] packet; 456 conn.resetPacket(); 457 458 ubyte[] prefix = makePSPrefix(hStmt, 0); 459 size_t len = prefix.length; 460 bool longData; 461 462 if (psh.paramCount) 463 { 464 ubyte[] one = [ 1 ]; 465 ubyte[] vals; 466 ubyte[] types = analyseParams(inParams, psa, vals, longData); 467 ubyte[] nbm = makeBitmap(inParams); 468 packet = prefix ~ nbm ~ one ~ types ~ vals; 469 } 470 else 471 packet = prefix; 472 473 if (longData) 474 sendLongData(conn._socket, hStmt, psa); 475 476 assert(packet.length <= uint.max); 477 packet.setPacketHeader(conn.pktNumber); 478 conn.bumpPacket(); 479 conn._socket.send(packet); 480 } 481 } 482 483 package(mysql) struct ExecQueryImplInfo 484 { 485 bool isPrepared; 486 487 // For non-prepared statements: 488 const(char[]) sql; 489 490 // For prepared statements: 491 uint hStmt; 492 PreparedStmtHeaders psh; 493 Variant[] inParams; 494 ParameterSpecialization[] psa; 495 } 496 497 /++ 498 Internal implementation for the exec and query functions. 499 500 Execute a one-off SQL command. 501 502 Any result set can be accessed via Connection.getNextRow(), but you should really be 503 using the query function for such queries. 504 505 Params: ra = An out parameter to receive the number of rows affected. 506 Returns: true if there was a (possibly empty) result set. 507 +/ 508 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra) 509 { 510 scope(failure) conn.kill(); 511 512 // Send data 513 if(info.isPrepared) 514 ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa); 515 else 516 { 517 conn.sendCmd(CommandType.QUERY, info.sql); 518 conn._fieldCount = 0; 519 } 520 521 // Handle response 522 ubyte[] packet = conn.getPacket(); 523 bool rv; 524 if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error) 525 { 526 conn.resetPacket(); 527 auto okp = OKErrorPacket(packet); 528 enforcePacketOK(okp); 529 ra = okp.affected; 530 conn._serverStatus = okp.serverStatus; 531 conn._insertID = okp.insertID; 532 rv = false; 533 } 534 else 535 { 536 // There was presumably a result set 537 assert(packet.front >= 1 && packet.front <= 250); // Result set packet header should have this value 538 conn._headersPending = conn._rowsPending = true; 539 conn._binaryPending = info.isPrepared; 540 auto lcb = packet.consumeIfComplete!LCB(); 541 assert(!lcb.isNull); 542 assert(!lcb.isIncomplete); 543 conn._fieldCount = cast(ushort)lcb.value; 544 assert(conn._fieldCount == lcb.value); 545 rv = true; 546 ra = 0; 547 } 548 return rv; 549 } 550 551 ///ditto 552 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info) 553 { 554 ulong rowsAffected; 555 return execQueryImpl(conn, info, rowsAffected); 556 } 557 558 package(mysql) void immediateReleasePrepared(Connection conn, uint statementId) 559 { 560 scope(failure) conn.kill(); 561 562 if(conn.closed()) 563 return; 564 565 ubyte[9] packet_buf; 566 ubyte[] packet = packet_buf; 567 packet.setPacketHeader(0/*packet number*/); 568 conn.bumpPacket(); 569 packet[4] = CommandType.STMT_CLOSE; 570 statementId.packInto(packet[5..9]); 571 conn.purgeResult(); 572 conn._socket.send(packet); 573 // It seems that the server does not find it necessary to send a response 574 // for this command. 575 } 576 577 // Moved here from `struct Row` 578 package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure 579 { 580 uint bitmapLength = calcBitmapLength(fieldCount); 581 enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields"); 582 auto bitmap = packet.consume(bitmapLength); 583 return decodeNullBitmap(bitmap, fieldCount); 584 } 585 586 // Moved here from `struct Row` 587 private static uint calcBitmapLength(uint fieldCount) pure nothrow 588 { 589 return (fieldCount+7+2)/8; 590 } 591 592 // Moved here from `struct Row` 593 // This is to decode the bitmap in a binary result row. First two bits are skipped 594 private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow 595 in 596 { 597 assert(bitmap.length >= calcBitmapLength(numFields), 598 "bitmap not large enough to store all null fields"); 599 } 600 out(result) 601 { 602 assert(result.length == numFields); 603 } 604 body 605 { 606 bool[] nulls; 607 nulls.length = numFields; 608 609 // the current byte we are processing for nulls 610 ubyte bits = bitmap.front(); 611 // strip away the first two bits as they are reserved 612 bits >>= 2; 613 // .. and then we only have 6 bits left to process for this byte 614 ubyte bitsLeftInByte = 6; 615 foreach(ref isNull; nulls) 616 { 617 assert(bitsLeftInByte <= 8); 618 // processed all bits? fetch new byte 619 if (bitsLeftInByte == 0) 620 { 621 assert(bits == 0, "not all bits are processed!"); 622 assert(!bitmap.empty, "bits array too short for number of columns"); 623 bitmap.popFront(); 624 bits = bitmap.front; 625 bitsLeftInByte = 8; 626 } 627 assert(bitsLeftInByte > 0); 628 isNull = (bits & 0b0000_0001) != 0; 629 630 // get ready to process next bit 631 bits >>= 1; 632 --bitsLeftInByte; 633 } 634 return nulls; 635 } 636 637 // Moved here from `struct Row.this` 638 package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary, 639 out Variant[] _values, out bool[] _nulls) 640 in 641 { 642 assert(rh.fieldCount <= uint.max); 643 } 644 body 645 { 646 scope(failure) conn.kill(); 647 648 uint fieldCount = cast(uint)rh.fieldCount; 649 _values.length = _nulls.length = fieldCount; 650 651 if(binary) 652 { 653 // There's a null byte header on a binary result sequence, followed by some bytes of bitmap 654 // indicating which columns are null 655 enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row"); 656 packet.popFront(); 657 _nulls = consumeNullBitmap(packet, fieldCount); 658 } 659 660 foreach(size_t i; 0..fieldCount) 661 { 662 if(binary && _nulls[i]) 663 { 664 _values[i] = null; 665 continue; 666 } 667 668 SQLValue sqlValue; 669 do 670 { 671 FieldDescription fd = rh[i]; 672 sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet); 673 // TODO: Support chunk delegate 674 if(sqlValue.isIncomplete) 675 packet ~= conn.getPacket(); 676 } while(sqlValue.isIncomplete); 677 assert(!sqlValue.isIncomplete); 678 679 if(sqlValue.isNull) 680 { 681 assert(!binary); 682 assert(!_nulls[i]); 683 _nulls[i] = true; 684 _values[i] = null; 685 } 686 else 687 { 688 _values[i] = sqlValue.value; 689 } 690 } 691 } 692 693 ////// Moved here from Connection ///////////////////////////////// 694 695 package(mysql) ubyte[] getPacket(Connection conn) 696 { 697 scope(failure) conn.kill(); 698 699 ubyte[4] header; 700 conn._socket.read(header); 701 // number of bytes always set as 24-bit 702 uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0]; 703 enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order"); 704 conn.bumpPacket(); 705 706 ubyte[] packet = new ubyte[numDataBytes]; 707 conn._socket.read(packet); 708 assert(packet.length == numDataBytes, "Wrong number of bytes read"); 709 return packet; 710 } 711 712 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet) 713 in 714 { 715 assert(packet.length > 4); // at least 1 byte more than header 716 } 717 body 718 { 719 _socket.write(packet); 720 } 721 722 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data) 723 in 724 { 725 assert(header.length == 4 || header.length == 5/*command type included*/); 726 } 727 body 728 { 729 _socket.write(header); 730 if(data.length) 731 _socket.write(data); 732 } 733 734 package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data) 735 in 736 { 737 // Internal thread states. Clients shouldn't use this 738 assert(cmd != CommandType.SLEEP); 739 assert(cmd != CommandType.CONNECT); 740 assert(cmd != CommandType.TIME); 741 assert(cmd != CommandType.DELAYED_INSERT); 742 assert(cmd != CommandType.CONNECT_OUT); 743 744 // Deprecated 745 assert(cmd != CommandType.CREATE_DB); 746 assert(cmd != CommandType.DROP_DB); 747 assert(cmd != CommandType.TABLE_DUMP); 748 749 // cannot send more than uint.max bytes. TODO: better error message if we try? 750 assert(data.length <= uint.max); 751 } 752 out 753 { 754 // at this point we should have sent a command 755 assert(conn.pktNumber == 1); 756 } 757 body 758 { 759 scope(failure) conn.kill(); 760 761 conn._lastCommandID++; 762 763 if(!conn._socket.connected) 764 { 765 if(cmd == CommandType.QUIT) 766 return; // Don't bother reopening connection just to quit 767 768 conn._open = Connection.OpenState.notConnected; 769 conn.connect(conn._clientCapabilities); 770 } 771 772 conn.autoPurge(); 773 774 conn.resetPacket(); 775 776 ubyte[] header; 777 header.length = 4 /*header*/ + 1 /*cmd*/; 778 header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/); 779 header[4] = cmd; 780 conn.bumpPacket(); 781 782 conn._socket.send(header, cast(const(ubyte)[])data); 783 } 784 785 package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false) 786 { 787 auto okp = OKErrorPacket(conn.getPacket()); 788 enforcePacketOK(okp); 789 conn._serverStatus = okp.serverStatus; 790 return okp; 791 } 792 793 package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token) 794 in 795 { 796 assert(token.length == 20); 797 } 798 body 799 { 800 ubyte[] packet; 801 packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1); 802 packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append 803 804 // NOTE: we'll set the header last when we know the size 805 806 // Set the default capabilities required by the client 807 conn._cCaps.packInto(packet[4..8]); 808 809 // Request a conventional maximum packet length. 810 1.packInto(packet[8..12]); 811 812 packet ~= 33; // Set UTF-8 as default charSet 813 814 // There's a statutory block of zero bytes here - fill them in. 815 foreach(i; 0 .. 23) 816 packet ~= 0; 817 818 // Add the user name as a null terminated string 819 foreach(i; 0 .. conn._user.length) 820 packet ~= conn._user[i]; 821 packet ~= 0; // \0 822 823 // Add our calculated authentication token as a length prefixed string. 824 assert(token.length <= ubyte.max); 825 if(conn._pwd.length == 0) // Omit the token if the account has no password 826 packet ~= 0; 827 else 828 { 829 packet ~= cast(ubyte)token.length; 830 foreach(i; 0 .. token.length) 831 packet ~= token[i]; 832 } 833 834 // Add the default database as a null terminated string 835 foreach(i; 0 .. conn._db.length) 836 packet ~= conn._db[i]; 837 packet ~= 0; // \0 838 839 // The server sent us a greeting with packet number 0, so we send the auth packet 840 // back with the next number. 841 packet.setPacketHeader(conn.pktNumber); 842 conn.bumpPacket(); 843 return packet; 844 } 845 846 package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf) 847 { 848 auto pass1 = sha1Of(cast(const(ubyte)[])password); 849 auto pass2 = sha1Of(pass1); 850 851 SHA1 sha1; 852 sha1.start(); 853 sha1.put(authBuf); 854 sha1.put(pass2); 855 auto result = sha1.finish(); 856 foreach (size_t i; 0..20) 857 result[i] = result[i] ^ pass1[i]; 858 return result.dup; 859 } 860 861 /// Get the next `mysql.result.Row` of a pending result set. 862 package(mysql) Row getNextRow(Connection conn) 863 { 864 scope(failure) conn.kill(); 865 866 if (conn._headersPending) 867 { 868 conn._rsh = ResultSetHeaders(conn, conn._fieldCount); 869 conn._headersPending = false; 870 } 871 ubyte[] packet; 872 Row rr; 873 packet = conn.getPacket(); 874 if(packet.front == ResultPacketMarker.error) 875 throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__); 876 877 if (packet.isEOFPacket()) 878 { 879 conn._rowsPending = conn._binaryPending = false; 880 return rr; 881 } 882 if (conn._binaryPending) 883 rr = Row(conn, packet, conn._rsh, true); 884 else 885 rr = Row(conn, packet, conn._rsh, false); 886 //rr._valid = true; 887 return rr; 888 } 889 890 package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet) 891 { 892 scope(failure) conn.kill(); 893 894 conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes) 895 conn._sCharSet = packet.consume!ubyte(); // server_language 896 conn._serverStatus = packet.consume!ushort(); //server_status 897 conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes) 898 conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec 899 900 enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1"); 901 enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection"); 902 } 903 904 package(mysql) ubyte[] parseGreeting(Connection conn) 905 { 906 scope(failure) conn.kill(); 907 908 ubyte[] packet = conn.getPacket(); 909 910 if (packet.length > 0 && packet[0] == ResultPacketMarker.error) 911 { 912 auto okp = OKErrorPacket(packet); 913 enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message); 914 } 915 916 conn._protocol = packet.consume!ubyte(); 917 918 conn._serverVersion = packet.consume!string(packet.countUntil(0)); 919 packet.skip(1); // \0 terminated _serverVersion 920 921 conn._sThread = packet.consume!uint(); 922 923 // read first part of scramble buf 924 ubyte[] authBuf; 925 authBuf.length = 255; 926 authBuf[0..8] = packet.consume(8)[]; // scramble_buff 927 928 enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0"); 929 930 conn.consumeServerInfo(packet); 931 932 packet.skip(1); // this byte supposed to be scramble length, but is actually zero 933 packet.skip(10); // filler of \0 934 935 // rest of the scramble 936 auto len = packet.countUntil(0); 937 enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes"); 938 enforce(authBuf.length > 8+len); 939 authBuf[8..8+len] = packet.consume(len)[]; 940 authBuf.length = 8+len; // cut to correct size 941 enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf"); 942 943 return authBuf; 944 } 945 946 package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure 947 { 948 SvrCapFlags common; 949 uint filter = 1; 950 foreach (size_t i; 0..uint.sizeof) 951 { 952 bool serverSupport = (server & filter) != 0; // can the server do this capability? 953 bool clientSupport = (client & filter) != 0; // can we support it? 954 if(serverSupport && clientSupport) 955 common |= filter; 956 filter <<= 1; // check next flag 957 } 958 return common; 959 } 960 961 package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags) 962 { 963 auto cCaps = getCommonCapabilities(serverCaps, capFlags); 964 965 // We cannot operate in <4.1 protocol, so we'll force it even if the user 966 // didn't supply it 967 cCaps |= SvrCapFlags.PROTOCOL41; 968 cCaps |= SvrCapFlags.SECURE_CONNECTION; 969 970 return cCaps; 971 } 972 973 package(mysql) void authenticate(Connection conn, ubyte[] greeting) 974 in 975 { 976 assert(conn._open == Connection.OpenState.connected); 977 } 978 out 979 { 980 assert(conn._open == Connection.OpenState.authenticated); 981 } 982 body 983 { 984 auto token = makeToken(conn._pwd, greeting); 985 auto authPacket = conn.buildAuthPacket(token); 986 conn._socket.send(authPacket); 987 988 auto packet = conn.getPacket(); 989 auto okp = OKErrorPacket(packet); 990 enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message); 991 conn._open = Connection.OpenState.authenticated; 992 } 993 994 // Register prepared statement 995 package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql) 996 { 997 scope(failure) conn.kill(); 998 999 PreparedServerInfo info; 1000 1001 conn.sendCmd(CommandType.STMT_PREPARE, sql); 1002 conn._fieldCount = 0; 1003 1004 ubyte[] packet = conn.getPacket(); 1005 if(packet.front == ResultPacketMarker.ok) 1006 { 1007 packet.popFront(); 1008 info.statementId = packet.consume!int(); 1009 conn._fieldCount = packet.consume!short(); 1010 info.numParams = packet.consume!short(); 1011 1012 packet.popFront(); // one byte filler 1013 info.psWarnings = packet.consume!short(); 1014 1015 // At this point the server also sends field specs for parameters 1016 // and columns if there were any of each 1017 info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams); 1018 } 1019 else if(packet.front == ResultPacketMarker.error) 1020 { 1021 auto error = OKErrorPacket(packet); 1022 enforcePacketOK(error); 1023 assert(0); // FIXME: what now? 1024 } 1025 else 1026 assert(0); // FIXME: what now? 1027 1028 return info; 1029 } 1030 1031 /++ 1032 Flush any outstanding result set elements. 1033 1034 When the server responds to a command that produces a result set, it 1035 queues the whole set of corresponding packets over the current connection. 1036 Before that `Connection` can embark on any new command, it must receive 1037 all of those packets and junk them. 1038 1039 As of v1.1.4, this is done automatically as needed. But you can still 1040 call this manually to force a purge to occur when you want. 1041 1042 See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/) 1043 +/ 1044 package(mysql) ulong purgeResult(Connection conn) 1045 { 1046 scope(failure) conn.kill(); 1047 1048 conn._lastCommandID++; 1049 1050 ulong rows = 0; 1051 if (conn._headersPending) 1052 { 1053 for (size_t i = 0;; i++) 1054 { 1055 if (conn.getPacket().isEOFPacket()) 1056 { 1057 conn._headersPending = false; 1058 break; 1059 } 1060 enforce!MYXProtocol(i < conn._fieldCount, 1061 text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found.")); 1062 } 1063 } 1064 if (conn._rowsPending) 1065 { 1066 for (;; rows++) 1067 { 1068 if (conn.getPacket().isEOFPacket()) 1069 { 1070 conn._rowsPending = conn._binaryPending = false; 1071 break; 1072 } 1073 } 1074 } 1075 conn.resetPacket(); 1076 return rows; 1077 } 1078 1079 /++ 1080 Get a textual report on the server status. 1081 1082 (COM_STATISTICS) 1083 +/ 1084 package(mysql) string serverStats(Connection conn) 1085 { 1086 conn.sendCmd(CommandType.STATISTICS, []); 1087 return cast(string) conn.getPacket(); 1088 } 1089 1090 /++ 1091 Enable multiple statement commands. 1092 1093 This can be used later if this feature was not requested in the client capability flags. 1094 1095 Warning: This functionality is currently untested. 1096 1097 Params: on = Boolean value to turn the capability on or off. 1098 +/ 1099 //TODO: Need to test this 1100 package(mysql) void enableMultiStatements(Connection conn, bool on) 1101 { 1102 scope(failure) conn.kill(); 1103 1104 ubyte[] t; 1105 t.length = 2; 1106 t[0] = on ? 0 : 1; 1107 t[1] = 0; 1108 conn.sendCmd(CommandType.STMT_OPTION, t); 1109 1110 // For some reason this command gets an EOF packet as response 1111 auto packet = conn.getPacket(); 1112 enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command"); 1113 }