1 /++ 2 Internal - Low-level communications. 3 4 Consider this module the main entry point for the low-level MySQL/MariaDB 5 protocol code. The other modules in `mysql.protocol` are mainly tools 6 to support this module. 7 8 Previously, the code handling low-level protocol details was scattered all 9 across the library. Such functionality has been factored out into this module, 10 to be kept in one place for better encapsulation and to facilitate further 11 cleanup and refactoring. 12 13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it 14 eventually settles into what will eventually become a low-level library 15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight... 16 17 Next tasks for this sub-package's cleanup: 18 - Reduce this module's reliance on Connection. 19 - Abstract out a PacketStream to clean up getPacket and related functionality. 20 +/ 21 module mysql.protocol.comms; 22 23 import std.algorithm; 24 import std.conv; 25 import std.digest.sha; 26 import std.exception; 27 import std.range; 28 import std.variant; 29 30 import mysql.connection; 31 import mysql.exceptions; 32 import mysql.prepared; 33 import mysql.result; 34 35 import mysql.protocol.constants; 36 import mysql.protocol.extra_types; 37 import mysql.protocol.packet_helpers; 38 import mysql.protocol.packets; 39 import mysql.protocol.sockets; 40 41 /// Low-level comms code relating to prepared statements. 
package struct ProtocolPrepared
{
    import std.conv;
    import std.datetime;
    import std.variant;
    import mysql.types;

    /++
    Build the null bitmap for a set of bound parameters.

    Bit `i % 8` of byte `i / 8` is set when parameter `i` holds `null`.

    Params: inParams = The bound parameter values.
    Returns: A bitmap of `(inParams.length + 7) / 8` bytes.
    +/
    static ubyte[] makeBitmap(in Variant[] inParams)
    {
        ubyte[] bitmap;
        bitmap.length = (inParams.length + 7) / 8;
        foreach (i; 0 .. inParams.length)
        {
            if (inParams[i].type == typeid(typeof(null)))
                bitmap[i / 8] |= cast(ubyte) (1 << (i % 8));
        }
        return bitmap;
    }

    /++
    Build the fixed 14-byte prefix of a STMT_EXECUTE packet.

    Bytes 0..4 are left zeroed for the packet header, which the caller
    fills in once the full packet length is known.

    Params:
    hStmt = The statement handle, packed into bytes 5..9.
    flags = The flags byte stored at offset 9.
    Returns: The 14-byte prefix.
    +/
    static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
    {
        auto prefix = new ubyte[14];

        prefix[4] = CommandType.STMT_EXECUTE;
        hStmt.packInto(prefix[5..9]);
        prefix[9] = flags;  // flags, no cursor
        prefix[10] = 1;     // iteration count - currently always 1
        prefix[11] = 0;
        prefix[12] = 0;
        prefix[13] = 0;

        return prefix;
    }

    /++
    Encode the bound parameters of a prepared statement.

    For every parameter two bytes are written to the returned type array
    (the SQL type followed by a signed/unsigned flag) and, unless the
    parameter is `null`, its encoded value is appended to `vals`.

    The D type is identified by the string form of the `Variant`'s
    `TypeInfo`; a trailing `*` marks a pointer, which is dereferenced.

    Params:
    inParams = The bound parameter values.
    psa      = Per-parameter specializations; indexed in parallel with
               `inParams`, so it must be at least as long.
    vals     = Receives the concatenated encoded values.
    longData = Set to true if any parameter has a non-zero `chunkSize`.
    Returns: The type array, two bytes per parameter.
    Throws: `MYX` for an unbound (`void`) or unsupported parameter type.
    +/
    static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
        out ubyte[] vals, out bool longData)
    {
        enum ubyte UNSIGNED = 0x80;
        enum ubyte SIGNED = 0;

        size_t pc = inParams.length;
        ubyte[] types;
        types.length = pc*2;
        size_t alloc = pc*20;
        vals.length = alloc;
        uint vcl = 0;   // number of bytes of vals in use
        int ct = 0;     // write position within types

        // Grow vals so that at least n more bytes fit after position vcl.
        void reAlloc(size_t n)
        {
            if (vcl+n < alloc)
                return;
            size_t inc = (alloc*3)/2;
            if (inc < n)
                inc = n;
            alloc += inc;
            vals.length = alloc;
        }

        // Record the SQL type and signedness flag of the current parameter.
        void putType(SQLType type, ubyte signFlag)
        {
            types[ct++] = cast(ubyte) type;
            types[ct++] = signFlag;
        }

        // Append the low `count` bytes of `value`, least significant first.
        void putLE(ulong value, size_t count)
        {
            reAlloc(count);
            foreach (idx; 0 .. count)
                vals[vcl++] = cast(ubyte) ((value >> (8 * idx)) & 0xff);
        }

        // Append a run of already-encoded bytes.
        void putBytes(const(ubyte)[] bytes)
        {
            reAlloc(bytes.length);
            vals[vcl..vcl+bytes.length] = bytes[];
            vcl += bytes.length;
        }

        foreach (size_t i; 0..pc)
        {
            if (psa[i].chunkSize)
                longData = true;
            if (inParams[i].type == typeid(typeof(null)))
            {
                // NULL parameters carry no value bytes
                putType(SQLType.NULL, SIGNED);
                continue;
            }
            Variant v = inParams[i];
            SQLType ext = psa[i].type;
            string ts = v.type.toString();
            bool isRef;
            if (ts[$-1] == '*')
            {
                ts.length = ts.length-1;
                isRef = true;
            }

            switch (ts)
            {
                case "bool":
                case "const(bool)":
                case "immutable(bool)":
                case "shared(immutable(bool))":
                    putType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.BIT : ext, SIGNED);
                    reAlloc(2);
                    bool bv = isRef? *(v.get!(const(bool*))): v.get!(const(bool));
                    // A byte of 1 (presumably the length prefix), then 0x31 or 0x30
                    vals[vcl++] = 1;
                    vals[vcl++] = bv? 0x31: 0x30;
                    break;
                case "byte":
                case "const(byte)":
                case "immutable(byte)":
                case "shared(immutable(byte))":
                    putType(SQLType.TINY, SIGNED);
                    putLE(isRef? *(v.get!(const(byte*))): v.get!(const(byte)), 1);
                    break;
                case "ubyte":
                case "const(ubyte)":
                case "immutable(ubyte)":
                case "shared(immutable(ubyte))":
                    putType(SQLType.TINY, UNSIGNED);
                    putLE(isRef? *(v.get!(const(ubyte*))): v.get!(const(ubyte)), 1);
                    break;
                case "short":
                case "const(short)":
                case "immutable(short)":
                case "shared(immutable(short))":
                    putType(SQLType.SHORT, SIGNED);
                    putLE(isRef? *(v.get!(const(short*))): v.get!(const(short)), 2);
                    break;
                case "ushort":
                case "const(ushort)":
                case "immutable(ushort)":
                case "shared(immutable(ushort))":
                    putType(SQLType.SHORT, UNSIGNED);
                    putLE(isRef? *(v.get!(const(ushort*))): v.get!(const(ushort)), 2);
                    break;
                case "int":
                case "const(int)":
                case "immutable(int)":
                case "shared(immutable(int))":
                    putType(SQLType.INT, SIGNED);
                    putLE(isRef? *(v.get!(const(int*))): v.get!(const(int)), 4);
                    break;
                case "uint":
                case "const(uint)":
                case "immutable(uint)":
                case "shared(immutable(uint))":
                    putType(SQLType.INT, UNSIGNED);
                    putLE(isRef? *(v.get!(const(uint*))): v.get!(const(uint)), 4);
                    break;
                case "long":
                case "const(long)":
                case "immutable(long)":
                case "shared(immutable(long))":
                    putType(SQLType.LONGLONG, SIGNED);
                    putLE(isRef? *(v.get!(const(long*))): v.get!(const(long)), 8);
                    break;
                case "ulong":
                case "const(ulong)":
                case "immutable(ulong)":
                case "shared(immutable(ulong))":
                    putType(SQLType.LONGLONG, UNSIGNED);
                    putLE(isRef? *(v.get!(const(ulong*))): v.get!(const(ulong)), 8);
                    break;
                case "float":
                case "const(float)":
                case "immutable(float)":
                case "shared(immutable(float))":
                    putType(SQLType.FLOAT, SIGNED);
                    float f = isRef? *(v.get!(const(float*))): v.get!(const(float));
                    // The in-memory bytes are copied as they are (host byte order)
                    putBytes((cast(const(ubyte)*) &f)[0..float.sizeof]);
                    break;
                case "double":
                case "const(double)":
                case "immutable(double)":
                case "shared(immutable(double))":
                    putType(SQLType.DOUBLE, SIGNED);
                    double d = isRef? *(v.get!(const(double*))): v.get!(const(double));
                    // The in-memory bytes are copied as they are (host byte order)
                    putBytes((cast(const(ubyte)*) &d)[0..double.sizeof]);
                    break;
                case "std.datetime.date.Date":
                case "const(std.datetime.date.Date)":
                case "immutable(std.datetime.date.Date)":
                case "shared(immutable(std.datetime.date.Date))":

                case "std.datetime.Date":
                case "const(std.datetime.Date)":
                case "immutable(std.datetime.Date)":
                case "shared(immutable(std.datetime.Date))":
                    putType(SQLType.DATE, SIGNED);
                    Date date = isRef? *(v.get!(const(Date*))): v.get!(const(Date));
                    putBytes(pack(date));
                    break;
                case "std.datetime.TimeOfDay":
                case "const(std.datetime.TimeOfDay)":
                case "immutable(std.datetime.TimeOfDay)":
                case "shared(immutable(std.datetime.TimeOfDay))":

                case "std.datetime.date.TimeOfDay":
                case "const(std.datetime.date.TimeOfDay)":
                case "immutable(std.datetime.date.TimeOfDay)":
                case "shared(immutable(std.datetime.date.TimeOfDay))":

                case "std.datetime.Time":
                case "const(std.datetime.Time)":
                case "immutable(std.datetime.Time)":
                case "shared(immutable(std.datetime.Time))":
                    putType(SQLType.TIME, SIGNED);
                    TimeOfDay time = isRef? *(v.get!(const(TimeOfDay*))): v.get!(const(TimeOfDay));
                    putBytes(pack(time));
                    break;
                case "std.datetime.date.DateTime":
                case "const(std.datetime.date.DateTime)":
                case "immutable(std.datetime.date.DateTime)":
                case "shared(immutable(std.datetime.date.DateTime))":

                case "std.datetime.DateTime":
                case "const(std.datetime.DateTime)":
                case "immutable(std.datetime.DateTime)":
                case "shared(immutable(std.datetime.DateTime))":
                    putType(SQLType.DATETIME, SIGNED);
                    DateTime dt = isRef? *(v.get!(const(DateTime*))): v.get!(const(DateTime));
                    putBytes(pack(dt));
                    break;
                case "mysql.types.Timestamp":
                case "const(mysql.types.Timestamp)":
                case "immutable(mysql.types.Timestamp)":
                case "shared(immutable(mysql.types.Timestamp))":
                    putType(SQLType.TIMESTAMP, SIGNED);
                    Timestamp tms = isRef? *(v.get!(const(Timestamp*))): v.get!(const(Timestamp));
                    // A Timestamp is converted to a DateTime and packed the same way
                    DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
                    putBytes(pack(dt));
                    break;
                case "char[]":
                case "const(char[])":
                case "immutable(char[])":
                case "const(char)[]":
                case "immutable(char)[]":
                case "shared(immutable(char)[])":
                case "shared(immutable(char))[]":
                case "shared(immutable(char[]))":
                    putType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.VARCHAR : ext, SIGNED);
                    const char[] ca = isRef? *(v.get!(const(char[]*))): v.get!(const(char[]));
                    putBytes(packLCS(cast(void[]) ca));
                    break;
                case "byte[]":
                case "const(byte[])":
                case "immutable(byte[])":
                case "const(byte)[]":
                case "immutable(byte)[]":
                case "shared(immutable(byte)[])":
                case "shared(immutable(byte))[]":
                case "shared(immutable(byte[]))":
                    putType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.TINYBLOB : ext, SIGNED);
                    const byte[] ba = isRef? *(v.get!(const(byte[]*))): v.get!(const(byte[]));
                    putBytes(packLCS(cast(void[]) ba));
                    break;
                case "ubyte[]":
                case "const(ubyte[])":
                case "immutable(ubyte[])":
                case "const(ubyte)[]":
                case "immutable(ubyte)[]":
                case "shared(immutable(ubyte)[])":
                case "shared(immutable(ubyte))[]":
                case "shared(immutable(ubyte[]))":
                    putType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.TINYBLOB : ext, SIGNED);
                    const ubyte[] uba = isRef? *(v.get!(const(ubyte[]*))): v.get!(const(ubyte[]));
                    putBytes(packLCS(cast(void[]) uba));
                    break;
                case "void":
                    throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
                default:
                    throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
            }
        }
        // Trim the over-allocated buffer down to the bytes actually written
        vals.length = vcl;
        return types;
    }

    /++
    Send the data of every chunked parameter as STMT_SEND_LONG_DATA packets.

    For each specialization with a non-zero `chunkSize`, its `chunkDelegate`
    is called repeatedly to fill a payload buffer of `chunkSize` bytes. Full
    chunks are sent as they are; the first short chunk is trimmed and sent
    last, and a delegate result of 0 ends the sequence without sending.

    Params:
    socket = The socket to send on.
    hStmt  = The statement handle.
    psa    = Per-parameter specializations; the array index is sent as
             the parameter number.
    +/
    static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa)
    {
        assert(psa.length <= ushort.max); // parameter number is sent as short
        foreach (ushort i, PSN psn; psa)
        {
            if (!psn.chunkSize) continue;
            uint cs = psn.chunkSize;
            uint delegate(ubyte[]) dg = psn.chunkDelegate;

            // 4 header bytes + command + statement handle + parameter number = 11
            ubyte[] chunk;
            chunk.length = cs+11;
            chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
            chunk[4] = CommandType.STMT_SEND_LONG_DATA;
            hStmt.packInto(chunk[5..9]); // statement handle
            packInto(i, chunk[9..11]);   // parameter number

            // byte 11 on is payload
            for (;;)
            {
                uint sent = dg(chunk[11..cs+11]);
                if (sent < cs)
                {
                    if (sent == 0)  // data was exact multiple of chunk size - all sent
                        break;
                    chunk.length = chunk.length - (cs-sent); // trim the chunk
                    sent += 7;      // adjust for non-payload bytes
                    // rewrite the 3-byte length field for the shortened packet
                    packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
                    socket.send(chunk);
                    break;
                }
                socket.send(chunk);
            }
        }
    }

    /++
    Build and send a STMT_EXECUTE command for a prepared statement.

    The packet is the prefix from `makePSPrefix` and, when the statement
    has parameters, the null bitmap, a single byte of 1, the parameter
    types and the parameter values. Any long-data chunks are sent before
    the execute packet itself.

    Params:
    conn     = The connection to send on.
    hStmt    = The statement handle.
    psh      = The statement headers; only `paramCount` is consulted.
    inParams = The bound parameter values.
    psa      = Per-parameter specializations.
    +/
    static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
        Variant[] inParams, ParameterSpecialization[] psa)
    {
        conn.autoPurge();

        ubyte[] packet;
        conn.resetPacket();

        ubyte[] prefix = makePSPrefix(hStmt, 0);
        bool longData;

        if (psh.paramCount)
        {
            ubyte[] one = [ 1 ];
            ubyte[] vals;
            ubyte[] types = analyseParams(inParams, psa, vals, longData);
            ubyte[] nbm = makeBitmap(inParams);
            packet = prefix ~ nbm ~ one ~ types ~ vals;
        }
        else
            packet = prefix;

        if (longData)
            sendLongData(conn._socket, hStmt, psa);

        assert(packet.length <= uint.max);
        packet.setPacketHeader(conn.pktNumber);
        conn.bumpPacket();
        conn._socket.send(packet);
    }
}

/// Arguments for `execQueryImpl`, covering both plain SQL and prepared statements.
package(mysql) struct ExecQueryImplInfo
{
    bool isPrepared;

    // For non-prepared statements:
    const(char[]) sql;

    // For prepared statements:
    uint hStmt;
    PreparedStmtHeaders psh;
    Variant[] inParams;
    ParameterSpecialization[] psa;
}

/++
Internal implementation for the exec and query functions.

Execute a one-off SQL command.

Any result set can be accessed via Connection.getNextRow(), but you should really be
using the query function for such queries.

Params: ra = An out parameter to receive the number of rows affected.
Returns: true if there was a (possibly empty) result set.
+/
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra)
{
    scope(failure) conn.kill();

    // Send data
    if(info.isPrepared)
        ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa);
    else
    {
        conn.sendCmd(CommandType.QUERY, info.sql);
        conn._fieldCount = 0;
    }

    // Handle response
    ubyte[] packet = conn.getPacket();
    if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
    {
        // No result set: record the outcome, or throw if it was an error packet
        conn.resetPacket();
        auto okp = OKErrorPacket(packet);
        enforcePacketOK(okp);
        ra = okp.affected;
        conn._serverStatus = okp.serverStatus;
        conn._insertID = okp.insertID;
        return false;
    }

    // There was presumably a result set
    assert(packet.front >= 1 && packet.front <= 250); // Result set packet header should have this value
    conn._headersPending = conn._rowsPending = true;
    conn._binaryPending = info.isPrepared;
    auto lcb = packet.consumeIfComplete!LCB();
    assert(!lcb.isNull);
    assert(!lcb.isIncomplete);
    conn._fieldCount = cast(ushort)lcb.value;
    assert(conn._fieldCount == lcb.value);
    ra = 0;
    return true;
}

///ditto
package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info)
{
    ulong rowsAffected;
    return execQueryImpl(conn, info, rowsAffected);
}

/++
Send a STMT_CLOSE command for the given statement.

Does nothing if the connection is closed. Any pending result is purged
before the packet is sent, and no response is read afterwards.
+/
package(mysql) void immediateReleasePrepared(Connection conn, uint statementId)
{
    scope(failure) conn.kill();

    if(conn.closed())
        return;

    ubyte[9] packet_buf;
    ubyte[] packet = packet_buf;
    packet.setPacketHeader(0/*packet number*/);
    conn.bumpPacket();
    packet[4] = CommandType.STMT_CLOSE;
    statementId.packInto(packet[5..9]);
    conn.purgeResult();
    conn._socket.send(packet);
    // It seems that the server does not find it necessary to send a response
    // for this command.
}

// Moved here from `struct Row`
/++
Remove the null bitmap from the front of a binary row packet and decode it.

Throws: `MYXProtocol` if the packet is shorter than the bitmap.
Returns: One flag per field, true where the field is null.
+/
package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure
{
    uint bitmapLength = calcBitmapLength(fieldCount);
    enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields");
    auto bitmap = packet.consume(bitmapLength);
    return decodeNullBitmap(bitmap, fieldCount);
}

// Moved here from `struct Row`
// Number of bytes needed for one bit per field plus the two leading bits
// that `decodeNullBitmap` skips.
private static uint calcBitmapLength(uint fieldCount) pure nothrow
{
    return (fieldCount+7+2)/8;
}

// Moved here from `struct Row`
// This is to decode the bitmap in a binary result row. First two bits are skipped
private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow
in
{
    assert(bitmap.length >= calcBitmapLength(numFields),
            "bitmap not large enough to store all null fields");
}
out(result)
{
    assert(result.length == numFields);
}
body
{
    bool[] nulls;
    nulls.length = numFields;

    // the current byte we are processing for nulls
    ubyte bits = bitmap.front();
    // strip away the first two bits as they are reserved
    bits >>= 2;
    // .. and then we only have 6 bits left to process for this byte
    ubyte bitsLeftInByte = 6;
    foreach(ref isNull; nulls)
    {
        assert(bitsLeftInByte <= 8);
        // processed all bits? fetch new byte
        if (bitsLeftInByte == 0)
        {
            assert(bits == 0, "not all bits are processed!");
            assert(!bitmap.empty, "bits array too short for number of columns");
            bitmap.popFront();
            bits = bitmap.front;
            bitsLeftInByte = 8;
        }
        assert(bitsLeftInByte > 0);
        isNull = (bits & 0b0000_0001) != 0;

        // get ready to process next bit
        bits >>= 1;
        --bitsLeftInByte;
    }
    return nulls;
}

// Moved here from `struct Row.this`
/++
Decode one result row from `packet`.

In binary mode the row starts with a zero byte and a null bitmap; in text
mode a null is detected from the decoded value. If a value is incomplete,
further packets are read from `conn` and appended until it can be decoded.

Params:
conn    = The connection to read continuation packets from.
packet  = The row packet; consumed as values are decoded.
rh      = The result set headers describing each field.
binary  = True for the binary (prepared statement) row format.
_values = Receives one value per field (`null` for null fields).
_nulls  = Receives one null flag per field.
_names  = Receives one field name per field.
Throws: `MYXProtocol` if a binary row does not start with a zero byte.
+/
package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary,
    out Variant[] _values, out bool[] _nulls, out string[] _names)
in
{
    assert(rh.fieldCount <= uint.max);
}
body
{
    scope(failure) conn.kill();

    uint fieldCount = cast(uint)rh.fieldCount;
    _values.length = _nulls.length = _names.length = fieldCount;

    if(binary)
    {
        // There's a null byte header on a binary result sequence, followed by some bytes of bitmap
        // indicating which columns are null
        enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row");
        packet.popFront();
        _nulls = consumeNullBitmap(packet, fieldCount);
    }

    foreach(size_t i; 0..fieldCount)
    {
        if(binary && _nulls[i])
        {
            // A null column has no value bytes, but it still has a name;
            // record it so binary rows match text rows.
            _names[i] = rh[i].name;
            _values[i] = null;
            continue;
        }

        SQLValue sqlValue;
        do
        {
            FieldDescription fd = rh[i];
            _names[i] = fd.name;
            sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet);
            // TODO: Support chunk delegate
            if(sqlValue.isIncomplete)
                packet ~= conn.getPacket();
        } while(sqlValue.isIncomplete);
        assert(!sqlValue.isIncomplete);

        if(sqlValue.isNull)
        {
            assert(!binary);
            assert(!_nulls[i]);
            _nulls[i] = true;
            _values[i] = null;
        }
        else
        {
            _values[i] = sqlValue.value;
        }
    }
}

////// Moved here from Connection /////////////////////////////////

/++
Read one packet from the connection's socket.

Reads the 4-byte header, checks the sequence number against
`conn.pktNumber`, advances it, then reads the payload.

Throws: `MYXProtocol` if the packet number is not the expected one.
Returns: The packet payload, without the header.
+/
package(mysql) ubyte[] getPacket(Connection conn)
{
    scope(failure) conn.kill();

    ubyte[4] header;
    conn._socket.read(header);
    // number of bytes always set as 24-bit
    uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
    enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order");
    conn.bumpPacket();

    ubyte[] packet = new ubyte[numDataBytes];
    conn._socket.read(packet);
    assert(packet.length == numDataBytes, "Wrong number of bytes read");
    return packet;
}

/// Write a complete packet (header included) to the socket.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet)
in
{
    assert(packet.length > 4); // at least 1 byte more than header
}
body
{
    _socket.write(packet);
}

/// Write a packet header, then its data if there is any, to the socket.
package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data)
in
{
    assert(header.length == 4 || header.length == 5/*command type included*/);
}
body
{
    _socket.write(header);
    if(data.length)
        _socket.write(data);
}

/++
Send a command packet made of a command byte followed by `data`.

If the socket is not connected the connection is re-opened first, except
for `CommandType.QUIT`, which then returns without sending anything.
Pending results are purged and the packet number is reset before sending.

NOTE(review): on the early QUIT return nothing is sent, so the `out`
contract below only holds if `conn.pktNumber` already equals 1 - confirm.
+/
package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data)
in
{
    // Internal thread states. Clients shouldn't use this
    assert(cmd != CommandType.SLEEP);
    assert(cmd != CommandType.CONNECT);
    assert(cmd != CommandType.TIME);
    assert(cmd != CommandType.DELAYED_INSERT);
    assert(cmd != CommandType.CONNECT_OUT);

    // Deprecated
    assert(cmd != CommandType.CREATE_DB);
    assert(cmd != CommandType.DROP_DB);
    assert(cmd != CommandType.TABLE_DUMP);

    // cannot send more than uint.max bytes. TODO: better error message if we try?
    assert(data.length <= uint.max);
}
out
{
    // at this point we should have sent a command
    assert(conn.pktNumber == 1);
}
body
{
    scope(failure) conn.kill();

    conn._lastCommandID++;

    if(!conn._socket.connected)
    {
        if(cmd == CommandType.QUIT)
            return; // Don't bother reopening connection just to quit

        conn._open = Connection.OpenState.notConnected;
        conn.connect(conn._clientCapabilities);
    }

    conn.autoPurge();

    conn.resetPacket();

    ubyte[] header;
    header.length = 4 /*header*/ + 1 /*cmd*/;
    header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/);
    header[4] = cmd;
    conn.bumpPacket();

    conn._socket.send(header, cast(const(ubyte)[])data);
}

/++
Read the response to a command as an OK/error packet.

Stores the reported server status on the connection. The `asString`
parameter is not used by this function.

Returns: The decoded packet; `enforcePacketOK` is applied to it first.
+/
package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false)
{
    auto okp = OKErrorPacket(conn.getPacket());
    enforcePacketOK(okp);
    conn._serverStatus = okp.serverStatus;
    return okp;
}

/++
Build the authentication packet sent in reply to the server greeting.

Layout after the 4-byte header: client capabilities (4 bytes), maximum
packet length (4 bytes), character set (1 byte), 23 zero bytes, the
null-terminated user name, the length-prefixed token (a single zero byte
when the password is empty) and the null-terminated default database.

Params: token = The 20-byte authentication token from `makeToken`.
Returns: The complete packet, header included.
+/
package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token)
in
{
    assert(token.length == 20);
}
body
{
    ubyte[] packet;
    packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1);
    packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append

    // NOTE: we'll set the header last when we know the size

    // Set the default capabilities required by the client
    conn._cCaps.packInto(packet[4..8]);

    // Request a conventional maximum packet length.
    // NOTE(review): the value actually packed here is 1 - confirm it is intended.
    1.packInto(packet[8..12]);

    packet ~= 33; // Set UTF-8 as default charSet

    // There's a statutory block of zero bytes here - fill them in.
    packet.length += 23;

    // Add the user name as a null terminated string
    foreach(ch; conn._user)
        packet ~= ch;
    packet ~= 0; // \0

    // Add our calculated authentication token as a length prefixed string.
    assert(token.length <= ubyte.max);
    if(conn._pwd.length == 0) // Omit the token if the account has no password
        packet ~= 0;
    else
    {
        packet ~= cast(ubyte)token.length;
        packet ~= token;
    }

    // Add the default database as a null terminated string
    foreach(ch; conn._db)
        packet ~= ch;
    packet ~= 0; // \0

    // The server sent us a greeting with packet number 0, so we send the auth packet
    // back with the next number.
    packet.setPacketHeader(conn.pktNumber);
    conn.bumpPacket();
    return packet;
}

/++
Compute the authentication token for a password.

The result is `SHA1(password) XOR SHA1(authBuf ~ SHA1(SHA1(password)))`.

Params:
password = The account password.
authBuf  = The scramble buffer from the server greeting.
Returns: The 20-byte token.
+/
package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf)
{
    auto pass1 = sha1Of(cast(const(ubyte)[])password);
    auto pass2 = sha1Of(pass1);

    SHA1 sha1;
    sha1.start();
    sha1.put(authBuf);
    sha1.put(pass2);
    auto result = sha1.finish();
    foreach (i, ref b; result)
        b ^= pass1[i];
    return result.dup;
}

/// Get the next `mysql.result.Row` of a pending result set.
package(mysql) Row getNextRow(Connection conn)
{
    scope(failure) conn.kill();

    // The field headers must be read before the first row can be decoded
    if (conn._headersPending)
    {
        conn._rsh = ResultSetHeaders(conn, conn._fieldCount);
        conn._headersPending = false;
    }
    ubyte[] packet;
    Row rr;
    packet = conn.getPacket();
    if(packet.front == ResultPacketMarker.error)
        throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__);

    // An EOF packet ends the result set; a default-constructed Row is returned
    if (packet.isEOFPacket())
    {
        conn._rowsPending = conn._binaryPending = false;
        return rr;
    }
    rr = Row(conn, packet, conn._rsh, conn._binaryPending);
    //rr._valid = true;
    return rr;
}

/++
Read the server capability, character set and status fields of a greeting.

The capability flags arrive as two 16-bit halves with the character set
and status in between; `OLD_LONG_PASSWORD` is then forced on.

Throws: `MYX` if the server lacks `PROTOCOL41` or `SECURE_CONNECTION`.
+/
package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet)
{
    scope(failure) conn.kill();

    conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
    conn._sCharSet = packet.consume!ubyte(); // server_language
    conn._serverStatus = packet.consume!ushort(); //server_status
    conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
    conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

    enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
    enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
}

/++
Read and decode the server greeting packet.

Stores the protocol version, server version, thread id and server
capabilities on the connection.

Throws:
`MYX` if the server sent an error packet instead of a greeting;
`MYXProtocol` if a filler or terminator byte is not zero or the second
part of the scramble buffer is shorter than 12 bytes.
Returns: The scramble buffer (8 bytes plus the second part).
+/
package(mysql) ubyte[] parseGreeting(Connection conn)
{
    scope(failure) conn.kill();

    ubyte[] packet = conn.getPacket();

    if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
    {
        auto okp = OKErrorPacket(packet);
        enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
    }

    conn._protocol = packet.consume!ubyte();

    conn._serverVersion = packet.consume!string(packet.countUntil(0));
    packet.skip(1); // \0 terminated _serverVersion

    conn._sThread = packet.consume!uint();

    // read first part of scramble buf
    ubyte[] authBuf;
    authBuf.length = 255;
    authBuf[0..8] = packet.consume(8)[]; // scramble_buff

    enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

    conn.consumeServerInfo(packet);

    packet.skip(1); // this byte supposed to be scramble length, but is actually zero
    packet.skip(10); // filler of \0

    // rest of the scramble
    auto len = packet.countUntil(0);
    enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
    enforce(authBuf.length > 8+len);
    authBuf[8..8+len] = packet.consume(len)[];
    authBuf.length = 8+len; // cut to correct size
    enforce!MYXProtocol(packet.consume!ubyte() == 0, "Expected \\0 terminating scramble buf");

    return authBuf;
}

/++
Compute the capabilities supported by both the server and the client.

Params:
server = The capability flags advertised by the server.
client = The capability flags requested by the client.
Returns: The flags set in both arguments.
+/
package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
{
    // A plain bitwise AND covers every flag bit. The previous loop ran
    // `uint.sizeof` (4) times and so only ever examined the lowest 4 bits.
    return cast(SvrCapFlags) (server & client);
}

/++
Work out the capability flags the client will actually use.

Returns: The flags common to `serverCaps` and `capFlags`, with
`PROTOCOL41` and `SECURE_CONNECTION` always set.
+/
package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags)
{
    auto cCaps = getCommonCapabilities(serverCaps, capFlags);

    // We cannot operate in <4.1 protocol, so we'll force it even if the user
    // didn't supply it
    cCaps |= SvrCapFlags.PROTOCOL41;
    cCaps |= SvrCapFlags.SECURE_CONNECTION;

    return cCaps;
}

/++
Authenticate against the server using the scramble from its greeting.

Params: greeting = The scramble buffer returned by `parseGreeting`.
Throws: `MYX` if the server answers with an error packet.
+/
package(mysql) void authenticate(Connection conn, ubyte[] greeting)
in
{
    assert(conn._open == Connection.OpenState.connected);
}
out
{
    assert(conn._open == Connection.OpenState.authenticated);
}
body
{
    auto token = makeToken(conn._pwd, greeting);
    auto authPacket = conn.buildAuthPacket(token);
    conn._socket.send(authPacket);

    auto packet = conn.getPacket();
    auto okp = OKErrorPacket(packet);
    enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
    conn._open = Connection.OpenState.authenticated;
}

// Register prepared statement
/++
Send STMT_PREPARE for `sql` and decode the server's reply.

Returns: The statement id, parameter count, warning count and headers.
+/
package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql)
{
    scope(failure) conn.kill();

    PreparedServerInfo info;

    conn.sendCmd(CommandType.STMT_PREPARE, sql);
    conn._fieldCount = 0;

    ubyte[] packet = conn.getPacket();
    if(packet.front == ResultPacketMarker.ok)
    {
        packet.popFront();
        info.statementId = packet.consume!int();
        conn._fieldCount = packet.consume!short();
        info.numParams = packet.consume!short();

        packet.popFront(); // one byte filler
        info.psWarnings = packet.consume!short();

        // At this point the server also sends field specs for parameters
        // and columns if there were any of each
        info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams);
    }
    else if(packet.front == ResultPacketMarker.error)
    {
        // enforcePacketOK is expected to throw here, so the assert is not reached
        auto error = OKErrorPacket(packet);
        enforcePacketOK(error);
        assert(0); // FIXME: what now?
    }
    else
        assert(0); // FIXME: what now?

    return info;
}

/++
Flush any outstanding result set elements.

When the server responds to a command that produces a result set, it
queues the whole set of corresponding packets over the current connection.
Before that `Connection` can embark on any new command, it must receive
all of those packets and junk them.

As of v1.1.4, this is done automatically as needed. But you can still
call this manually to force a purge to occur when you want.

See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)

Throws: `MYXProtocol` if more field header packets arrive than
`conn._fieldCount` without an EOF packet.
Returns: The number of row packets discarded.
+/
package(mysql) ulong purgeResult(Connection conn)
{
    scope(failure) conn.kill();

    conn._lastCommandID++;

    ulong rows = 0;
    if (conn._headersPending)
    {
        // Discard field header packets up to and including the EOF packet
        for (size_t i = 0;; i++)
        {
            if (conn.getPacket().isEOFPacket())
            {
                conn._headersPending = false;
                break;
            }
            enforce!MYXProtocol(i < conn._fieldCount,
                text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found."));
        }
    }
    if (conn._rowsPending)
    {
        // Discard row packets, counting them, until the EOF packet
        for (;; rows++)
        {
            if (conn.getPacket().isEOFPacket())
            {
                conn._rowsPending = conn._binaryPending = false;
                break;
            }
        }
    }
    conn.resetPacket();
    return rows;
}

/++
Get a textual report on the server status.

(COM_STATISTICS)
+/
package(mysql) string serverStats(Connection conn)
{
    conn.sendCmd(CommandType.STATISTICS, []);
    return cast(string) conn.getPacket();
}

/++
Enable multiple statement commands.

This can be used later if this feature was not requested in the client capability flags.

Warning: This functionality is currently untested.

Params: on = Boolean value to turn the capability on or off.
Throws: `MYXProtocol` if the response is not a 5-byte packet starting with 254.
+/
//TODO: Need to test this
package(mysql) void enableMultiStatements(Connection conn, bool on)
{
    scope(failure) conn.kill();

    // Two-byte option value: 0 requests "on", 1 requests "off"
    ubyte[] t;
    t.length = 2;
    t[0] = on ? 0 : 1;
    t[1] = 0;
    conn.sendCmd(CommandType.STMT_OPTION, t);

    // For some reason this command gets an EOF packet as response
    auto packet = conn.getPacket();
    enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command");
}