1 /++ 2 Internal - Low-level communications. 3 4 Consider this module the main entry point for the low-level MySQL/MariaDB 5 protocol code. The other modules in `mysql.protocol` are mainly tools 6 to support this module. 7 8 Previously, the code handling low-level protocol details was scattered all 9 across the library. Such functionality has been factored out into this module, 10 to be kept in one place for better encapsulation and to facilitate further 11 cleanup and refactoring. 12 13 EXPECT MAJOR CHANGES to this entire `mysql.protocol` sub-package until it 14 eventually settles into what will eventually become a low-level library 15 containing the bulk of the MySQL/MariaDB-specific code. Hang on tight... 16 17 Next tasks for this sub-package's cleanup: 18 - Reduce this module's reliance on Connection. 19 - Abstract out a PacketStream to clean up getPacket and related functionality. 20 +/ 21 module mysql.protocol.comms; 22 23 import std.algorithm; 24 import std.conv; 25 import std.digest.sha; 26 import std.exception; 27 import std.range; 28 import std.variant; 29 30 import mysql.connection; 31 import mysql.exceptions; 32 import mysql.prepared; 33 import mysql.result; 34 import mysql.types; 35 36 import mysql.protocol.constants; 37 import mysql.protocol.extra_types; 38 import mysql.protocol.packet_helpers; 39 import mysql.protocol.packets; 40 import mysql.protocol.sockets; 41 42 /// Low-level comms code relating to prepared statements. 43 package struct ProtocolPrepared 44 { 45 import std.conv; 46 import std.datetime; 47 import std.variant; 48 import mysql.types; 49 50 static ubyte[] makeBitmap(in Variant[] inParams) 51 { 52 size_t bml = (inParams.length+7)/8; 53 ubyte[] bma; 54 bma.length = bml; 55 foreach (i; 0..inParams.length) 56 { 57 if(inParams[i].type != typeid(typeof(null))) 58 continue; 59 size_t bn = i/8; 60 size_t bb = i%8; 61 ubyte sr = 1; 62 sr <<= bb; 63 bma[bn] |= sr; 64 } 65 return bma; 66 } 67 68 static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow 69 { 70 ubyte[] prefix; 71 prefix.length = 14; 72 73 prefix[4] = CommandType.STMT_EXECUTE; 74 hStmt.packInto(prefix[5..9]); 75 prefix[9] = flags; // flags, no cursor 76 prefix[10] = 1; // iteration count - currently always 1 77 prefix[11] = 0; 78 prefix[12] = 0; 79 prefix[13] = 0; 80 81 return prefix; 82 } 83 84 static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa, 85 out ubyte[] vals, out bool longData) 86 { 87 size_t pc = inParams.length; 88 ubyte[] types; 89 types.length = pc*2; 90 size_t alloc = pc*20; 91 vals.length = alloc; 92 uint vcl = 0, len; 93 int ct = 0; 94 95 void reAlloc(size_t n) 96 { 97 if (vcl+n < alloc) 98 return; 99 size_t inc = (alloc*3)/2; 100 if (inc < n) 101 inc = n; 102 alloc += inc; 103 vals.length = alloc; 104 } 105 106 foreach (size_t i; 0..pc) 107 { 108 enum UNSIGNED = 0x80; 109 enum SIGNED = 0; 110 if (psa[i].chunkSize) 111 longData= true; 112 if (inParams[i].type == typeid(typeof(null))) 113 { 114 types[ct++] = SQLType.NULL; 115 types[ct++] = SIGNED; 116 continue; 117 } 118 Variant v = inParams[i]; 119 SQLType ext = psa[i].type; 120 string ts = v.type.toString(); 121 bool isRef; 122 if (ts[$-1] == '*') 123 { 124 ts.length = ts.length-1; 125 isRef= true; 126 } 127 128 switch (ts) 129 { 130 case "bool": 131 case "const(bool)": 132 case "immutable(bool)": 133 case "shared(immutable(bool))": 134 if (ext == SQLType.INFER_FROM_D_TYPE) 135 types[ct++] = SQLType.BIT; 136 else 137 types[ct++] = cast(ubyte) ext; 138 types[ct++] = SIGNED; 139 reAlloc(2); 140 bool bv = isRef? *(v.get!(const(bool*))): v.get!(const(bool)); 141 vals[vcl++] = 1; 142 vals[vcl++] = bv? 0x31: 0x30; 143 break; 144 case "byte": 145 case "const(byte)": 146 case "immutable(byte)": 147 case "shared(immutable(byte))": 148 types[ct++] = SQLType.TINY; 149 types[ct++] = SIGNED; 150 reAlloc(1); 151 vals[vcl++] = isRef? *(v.get!(const(byte*))): v.get!(const(byte)); 152 break; 153 case "ubyte": 154 case "const(ubyte)": 155 case "immutable(ubyte)": 156 case "shared(immutable(ubyte))": 157 types[ct++] = SQLType.TINY; 158 types[ct++] = UNSIGNED; 159 reAlloc(1); 160 vals[vcl++] = isRef? *(v.get!(const(ubyte*))): v.get!(const(ubyte)); 161 break; 162 case "short": 163 case "const(short)": 164 case "immutable(short)": 165 case "shared(immutable(short))": 166 types[ct++] = SQLType.SHORT; 167 types[ct++] = SIGNED; 168 reAlloc(2); 169 short si = isRef? *(v.get!(const(short*))): v.get!(const(short)); 170 vals[vcl++] = cast(ubyte) (si & 0xff); 171 vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff); 172 break; 173 case "ushort": 174 case "const(ushort)": 175 case "immutable(ushort)": 176 case "shared(immutable(ushort))": 177 types[ct++] = SQLType.SHORT; 178 types[ct++] = UNSIGNED; 179 reAlloc(2); 180 ushort us = isRef? *(v.get!(const(ushort*))): v.get!(const(ushort)); 181 vals[vcl++] = cast(ubyte) (us & 0xff); 182 vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff); 183 break; 184 case "int": 185 case "const(int)": 186 case "immutable(int)": 187 case "shared(immutable(int))": 188 types[ct++] = SQLType.INT; 189 types[ct++] = SIGNED; 190 reAlloc(4); 191 int ii = isRef? *(v.get!(const(int*))): v.get!(const(int)); 192 vals[vcl++] = cast(ubyte) (ii & 0xff); 193 vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff); 194 vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff); 195 vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff); 196 break; 197 case "uint": 198 case "const(uint)": 199 case "immutable(uint)": 200 case "shared(immutable(uint))": 201 types[ct++] = SQLType.INT; 202 types[ct++] = UNSIGNED; 203 reAlloc(4); 204 uint ui = isRef? *(v.get!(const(uint*))): v.get!(const(uint)); 205 vals[vcl++] = cast(ubyte) (ui & 0xff); 206 vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff); 207 vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff); 208 vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff); 209 break; 210 case "long": 211 case "const(long)": 212 case "immutable(long)": 213 case "shared(immutable(long))": 214 types[ct++] = SQLType.LONGLONG; 215 types[ct++] = SIGNED; 216 reAlloc(8); 217 long li = isRef? *(v.get!(const(long*))): v.get!(const(long)); 218 vals[vcl++] = cast(ubyte) (li & 0xff); 219 vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff); 220 vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff); 221 vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff); 222 vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff); 223 vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff); 224 vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff); 225 vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff); 226 break; 227 case "ulong": 228 case "const(ulong)": 229 case "immutable(ulong)": 230 case "shared(immutable(ulong))": 231 types[ct++] = SQLType.LONGLONG; 232 types[ct++] = UNSIGNED; 233 reAlloc(8); 234 ulong ul = isRef? *(v.get!(const(ulong*))): v.get!(const(ulong)); 235 vals[vcl++] = cast(ubyte) (ul & 0xff); 236 vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff); 237 vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff); 238 vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff); 239 vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff); 240 vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff); 241 vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff); 242 vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff); 243 break; 244 case "float": 245 case "const(float)": 246 case "immutable(float)": 247 case "shared(immutable(float))": 248 types[ct++] = SQLType.FLOAT; 249 types[ct++] = SIGNED; 250 reAlloc(4); 251 float f = isRef? *(v.get!(const(float*))): v.get!(const(float)); 252 ubyte* ubp = cast(ubyte*) &f; 253 vals[vcl++] = *ubp++; 254 vals[vcl++] = *ubp++; 255 vals[vcl++] = *ubp++; 256 vals[vcl++] = *ubp; 257 break; 258 case "double": 259 case "const(double)": 260 case "immutable(double)": 261 case "shared(immutable(double))": 262 types[ct++] = SQLType.DOUBLE; 263 types[ct++] = SIGNED; 264 reAlloc(8); 265 double d = isRef? *(v.get!(const(double*))): v.get!(const(double)); 266 ubyte* ubp = cast(ubyte*) &d; 267 vals[vcl++] = *ubp++; 268 vals[vcl++] = *ubp++; 269 vals[vcl++] = *ubp++; 270 vals[vcl++] = *ubp++; 271 vals[vcl++] = *ubp++; 272 vals[vcl++] = *ubp++; 273 vals[vcl++] = *ubp++; 274 vals[vcl++] = *ubp; 275 break; 276 case "std.datetime.date.Date": 277 case "const(std.datetime.date.Date)": 278 case "immutable(std.datetime.date.Date)": 279 case "shared(immutable(std.datetime.date.Date))": 280 281 case "std.datetime.Date": 282 case "const(std.datetime.Date)": 283 case "immutable(std.datetime.Date)": 284 case "shared(immutable(std.datetime.Date))": 285 types[ct++] = SQLType.DATE; 286 types[ct++] = SIGNED; 287 Date date = isRef? *(v.get!(const(Date*))): v.get!(const(Date)); 288 ubyte[] da = pack(date); 289 size_t l = da.length; 290 reAlloc(l); 291 vals[vcl..vcl+l] = da[]; 292 vcl += l; 293 break; 294 case "std.datetime.TimeOfDay": 295 case "const(std.datetime.TimeOfDay)": 296 case "immutable(std.datetime.TimeOfDay)": 297 case "shared(immutable(std.datetime.TimeOfDay))": 298 299 case "std.datetime.date.TimeOfDay": 300 case "const(std.datetime.date.TimeOfDay)": 301 case "immutable(std.datetime.date.TimeOfDay)": 302 case "shared(immutable(std.datetime.date.TimeOfDay))": 303 304 case "std.datetime.Time": 305 case "const(std.datetime.Time)": 306 case "immutable(std.datetime.Time)": 307 case "shared(immutable(std.datetime.Time))": 308 types[ct++] = SQLType.TIME; 309 types[ct++] = SIGNED; 310 TimeOfDay time = isRef? *(v.get!(const(TimeOfDay*))): v.get!(const(TimeOfDay)); 311 ubyte[] ta = pack(time); 312 size_t l = ta.length; 313 reAlloc(l); 314 vals[vcl..vcl+l] = ta[]; 315 vcl += l; 316 break; 317 case "std.datetime.date.DateTime": 318 case "const(std.datetime.date.DateTime)": 319 case "immutable(std.datetime.date.DateTime)": 320 case "shared(immutable(std.datetime.date.DateTime))": 321 322 case "std.datetime.DateTime": 323 case "const(std.datetime.DateTime)": 324 case "immutable(std.datetime.DateTime)": 325 case "shared(immutable(std.datetime.DateTime))": 326 types[ct++] = SQLType.DATETIME; 327 types[ct++] = SIGNED; 328 DateTime dt = isRef? *(v.get!(const(DateTime*))): v.get!(const(DateTime)); 329 ubyte[] da = pack(dt); 330 size_t l = da.length; 331 reAlloc(l); 332 vals[vcl..vcl+l] = da[]; 333 vcl += l; 334 break; 335 case "mysql.types.Timestamp": 336 case "const(mysql.types.Timestamp)": 337 case "immutable(mysql.types.Timestamp)": 338 case "shared(immutable(mysql.types.Timestamp))": 339 types[ct++] = SQLType.TIMESTAMP; 340 types[ct++] = SIGNED; 341 Timestamp tms = isRef? *(v.get!(const(Timestamp*))): v.get!(const(Timestamp)); 342 DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep).getDateTime; 343 ubyte[] da = pack(dt); 344 size_t l = da.length; 345 reAlloc(l); 346 vals[vcl..vcl+l] = da[]; 347 vcl += l; 348 break; 349 case "char[]": 350 case "const(char[])": 351 case "immutable(char[])": 352 case "const(char)[]": 353 case "immutable(char)[]": 354 case "shared(immutable(char)[])": 355 case "shared(immutable(char))[]": 356 case "shared(immutable(char[]))": 357 if (ext == SQLType.INFER_FROM_D_TYPE) 358 types[ct++] = SQLType.VARCHAR; 359 else 360 types[ct++] = cast(ubyte) ext; 361 types[ct++] = SIGNED; 362 const char[] ca = isRef? *(v.get!(const(char[]*))): v.get!(const(char[])); 363 ubyte[] packed = packLCS(cast(void[]) ca); 364 reAlloc(packed.length); 365 vals[vcl..vcl+packed.length] = packed[]; 366 vcl += packed.length; 367 break; 368 case "byte[]": 369 case "const(byte[])": 370 case "immutable(byte[])": 371 case "const(byte)[]": 372 case "immutable(byte)[]": 373 case "shared(immutable(byte)[])": 374 case "shared(immutable(byte))[]": 375 case "shared(immutable(byte[]))": 376 if (ext == SQLType.INFER_FROM_D_TYPE) 377 types[ct++] = SQLType.TINYBLOB; 378 else 379 types[ct++] = cast(ubyte) ext; 380 types[ct++] = SIGNED; 381 const byte[] ba = isRef? *(v.get!(const(byte[]*))): v.get!(const(byte[])); 382 ubyte[] packed = packLCS(cast(void[]) ba); 383 reAlloc(packed.length); 384 vals[vcl..vcl+packed.length] = packed[]; 385 vcl += packed.length; 386 break; 387 case "ubyte[]": 388 case "const(ubyte[])": 389 case "immutable(ubyte[])": 390 case "const(ubyte)[]": 391 case "immutable(ubyte)[]": 392 case "shared(immutable(ubyte)[])": 393 case "shared(immutable(ubyte))[]": 394 case "shared(immutable(ubyte[]))": 395 if (ext == SQLType.INFER_FROM_D_TYPE) 396 types[ct++] = SQLType.TINYBLOB; 397 else 398 types[ct++] = cast(ubyte) ext; 399 types[ct++] = SIGNED; 400 const ubyte[] uba = isRef? *(v.get!(const(ubyte[]*))): v.get!(const(ubyte[])); 401 ubyte[] packed = packLCS(cast(void[]) uba); 402 reAlloc(packed.length); 403 vals[vcl..vcl+packed.length] = packed[]; 404 vcl += packed.length; 405 break; 406 case "void": 407 throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__); 408 default: 409 throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__); 410 } 411 } 412 vals.length = vcl; 413 return types; 414 } 415 416 static void sendLongData(MySQLSocket socket, uint hStmt, ParameterSpecialization[] psa) 417 { 418 assert(psa.length <= ushort.max); // parameter number is sent as short 419 foreach (ushort i, PSN psn; psa) 420 { 421 if (!psn.chunkSize) continue; 422 uint cs = psn.chunkSize; 423 uint delegate(ubyte[]) dg = psn.chunkDelegate; 424 425 ubyte[] chunk; 426 chunk.length = cs+11; 427 chunk.setPacketHeader(0 /*each chunk is separate cmd*/); 428 chunk[4] = CommandType.STMT_SEND_LONG_DATA; 429 hStmt.packInto(chunk[5..9]); // statement handle 430 packInto(i, chunk[9..11]); // parameter number 431 432 // byte 11 on is payload 433 for (;;) 434 { 435 uint sent = dg(chunk[11..cs+11]); 436 if (sent < cs) 437 { 438 if (sent == 0) // data was exact multiple of chunk size - all sent 439 break; 440 chunk.length = chunk.length - (cs-sent); // trim the chunk 441 sent += 7; // adjust for non-payload bytes 442 packInto!(uint, true)(cast(uint)sent, chunk[0..3]); 443 socket.send(chunk); 444 break; 445 } 446 socket.send(chunk); 447 } 448 } 449 } 450 451 static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh, 452 Variant[] inParams, ParameterSpecialization[] psa) 453 { 454 conn.autoPurge(); 455 456 ubyte[] packet; 457 conn.resetPacket(); 458 459 ubyte[] prefix = makePSPrefix(hStmt, 0); 460 size_t len = prefix.length; 461 bool longData; 462 463 if (psh.paramCount) 464 { 465 ubyte[] one = [ 1 ]; 466 ubyte[] vals; 467 ubyte[] types = analyseParams(inParams, psa, vals, longData); 468 ubyte[] nbm = makeBitmap(inParams); 469 packet = prefix ~ nbm ~ one ~ types ~ vals; 470 } 471 else 472 packet = prefix; 473 474 if (longData) 475 sendLongData(conn._socket, hStmt, psa); 476 477 assert(packet.length <= uint.max); 478 packet.setPacketHeader(conn.pktNumber); 479 conn.bumpPacket(); 480 conn._socket.send(packet); 481 } 482 } 483 484 package(mysql) struct ExecQueryImplInfo 485 { 486 bool isPrepared; 487 488 // For non-prepared statements: 489 const(char[]) sql; 490 491 // For prepared statements: 492 uint hStmt; 493 PreparedStmtHeaders psh; 494 Variant[] inParams; 495 ParameterSpecialization[] psa; 496 } 497 498 /++ 499 Internal implementation for the exec and query functions. 500 501 Execute a one-off SQL command. 502 503 Any result set can be accessed via Connection.getNextRow(), but you should really be 504 using the query function for such queries. 505 506 Params: ra = An out parameter to receive the number of rows affected. 507 Returns: true if there was a (possibly empty) result set. 508 +/ 509 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info, out ulong ra) 510 { 511 scope(failure) conn.kill(); 512 513 // Send data 514 if(info.isPrepared) 515 ProtocolPrepared.sendCommand(conn, info.hStmt, info.psh, info.inParams, info.psa); 516 else 517 { 518 conn.sendCmd(CommandType.QUERY, info.sql); 519 conn._fieldCount = 0; 520 } 521 522 // Handle response 523 ubyte[] packet = conn.getPacket(); 524 bool rv; 525 if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error) 526 { 527 conn.resetPacket(); 528 auto okp = OKErrorPacket(packet); 529 enforcePacketOK(okp); 530 ra = okp.affected; 531 conn._serverStatus = okp.serverStatus; 532 conn._insertID = okp.insertID; 533 rv = false; 534 } 535 else 536 { 537 // There was presumably a result set 538 assert(packet.front >= 1 && packet.front <= 250); // Result set packet header should have this value 539 conn._headersPending = conn._rowsPending = true; 540 conn._binaryPending = info.isPrepared; 541 auto lcb = packet.consumeIfComplete!LCB(); 542 assert(!lcb.isNull); 543 assert(!lcb.isIncomplete); 544 conn._fieldCount = cast(ushort)lcb.value; 545 assert(conn._fieldCount == lcb.value); 546 rv = true; 547 ra = 0; 548 } 549 return rv; 550 } 551 552 ///ditto 553 package(mysql) bool execQueryImpl(Connection conn, ExecQueryImplInfo info) 554 { 555 ulong rowsAffected; 556 return execQueryImpl(conn, info, rowsAffected); 557 } 558 559 package(mysql) void immediateReleasePrepared(Connection conn, uint statementId) 560 { 561 scope(failure) conn.kill(); 562 563 if(conn.closed()) 564 return; 565 566 ubyte[9] packet_buf; 567 ubyte[] packet = packet_buf; 568 packet.setPacketHeader(0/*packet number*/); 569 conn.bumpPacket(); 570 packet[4] = CommandType.STMT_CLOSE; 571 statementId.packInto(packet[5..9]); 572 conn.purgeResult(); 573 conn._socket.send(packet); 574 // It seems that the server does not find it necessary to send a response 575 // for this command. 576 } 577 578 // Moved here from `struct Row` 579 package(mysql) bool[] consumeNullBitmap(ref ubyte[] packet, uint fieldCount) pure 580 { 581 uint bitmapLength = calcBitmapLength(fieldCount); 582 enforce!MYXProtocol(packet.length >= bitmapLength, "Packet too small to hold null bitmap for all fields"); 583 auto bitmap = packet.consume(bitmapLength); 584 return decodeNullBitmap(bitmap, fieldCount); 585 } 586 587 // Moved here from `struct Row` 588 private static uint calcBitmapLength(uint fieldCount) pure nothrow 589 { 590 return (fieldCount+7+2)/8; 591 } 592 593 // Moved here from `struct Row` 594 // This is to decode the bitmap in a binary result row. First two bits are skipped 595 private bool[] decodeNullBitmap(ubyte[] bitmap, uint numFields) pure nothrow 596 in 597 { 598 assert(bitmap.length >= calcBitmapLength(numFields), 599 "bitmap not large enough to store all null fields"); 600 } 601 out(result) 602 { 603 assert(result.length == numFields); 604 } 605 body 606 { 607 bool[] nulls; 608 nulls.length = numFields; 609 610 // the current byte we are processing for nulls 611 ubyte bits = bitmap.front(); 612 // strip away the first two bits as they are reserved 613 bits >>= 2; 614 // .. and then we only have 6 bits left to process for this byte 615 ubyte bitsLeftInByte = 6; 616 foreach(ref isNull; nulls) 617 { 618 assert(bitsLeftInByte <= 8); 619 // processed all bits? fetch new byte 620 if (bitsLeftInByte == 0) 621 { 622 assert(bits == 0, "not all bits are processed!"); 623 assert(!bitmap.empty, "bits array too short for number of columns"); 624 bitmap.popFront(); 625 bits = bitmap.front; 626 bitsLeftInByte = 8; 627 } 628 assert(bitsLeftInByte > 0); 629 isNull = (bits & 0b0000_0001) != 0; 630 631 // get ready to process next bit 632 bits >>= 1; 633 --bitsLeftInByte; 634 } 635 return nulls; 636 } 637 638 // Moved here from `struct Row.this` 639 package(mysql) void ctorRow(Connection conn, ref ubyte[] packet, ResultSetHeaders rh, bool binary, 640 out Variant[] _values, out bool[] _nulls) 641 in 642 { 643 assert(rh.fieldCount <= uint.max); 644 } 645 body 646 { 647 scope(failure) conn.kill(); 648 649 uint fieldCount = cast(uint)rh.fieldCount; 650 _values.length = _nulls.length = fieldCount; 651 652 if(binary) 653 { 654 // There's a null byte header on a binary result sequence, followed by some bytes of bitmap 655 // indicating which columns are null 656 enforce!MYXProtocol(packet.front == 0, "Expected null header byte for binary result row"); 657 packet.popFront(); 658 _nulls = consumeNullBitmap(packet, fieldCount); 659 } 660 661 foreach(size_t i; 0..fieldCount) 662 { 663 if(binary && _nulls[i]) 664 { 665 _values[i] = null; 666 continue; 667 } 668 669 SQLValue sqlValue; 670 do 671 { 672 FieldDescription fd = rh[i]; 673 sqlValue = packet.consumeIfComplete(fd.type, binary, fd.unsigned, fd.charSet); 674 // TODO: Support chunk delegate 675 if(sqlValue.isIncomplete) 676 packet ~= conn.getPacket(); 677 } while(sqlValue.isIncomplete); 678 assert(!sqlValue.isIncomplete); 679 680 if(sqlValue.isNull) 681 { 682 assert(!binary); 683 assert(!_nulls[i]); 684 _nulls[i] = true; 685 _values[i] = null; 686 } 687 else 688 { 689 // Convert MySQLDate to Phobos Date 690 if(MySQLDate* myDate = sqlValue.value.peek!MySQLDate) 691 sqlValue.value = myDate.getDate(); 692 693 // Convert MySQLDateTime to Phobos DateTime 694 if(MySQLDateTime* myDateTime = sqlValue.value.peek!MySQLDateTime) 695 sqlValue.value = myDateTime.getDateTime(); 696 697 _values[i] = sqlValue.value; 698 } 699 } 700 } 701 702 ////// Moved here from Connection ///////////////////////////////// 703 704 package(mysql) ubyte[] getPacket(Connection conn) 705 { 706 scope(failure) conn.kill(); 707 708 ubyte[4] header; 709 conn._socket.read(header); 710 // number of bytes always set as 24-bit 711 uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0]; 712 enforce!MYXProtocol(header[3] == conn.pktNumber, "Server packet out of order"); 713 conn.bumpPacket(); 714 715 ubyte[] packet = new ubyte[numDataBytes]; 716 conn._socket.read(packet); 717 assert(packet.length == numDataBytes, "Wrong number of bytes read"); 718 return packet; 719 } 720 721 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] packet) 722 in 723 { 724 assert(packet.length > 4); // at least 1 byte more than header 725 } 726 body 727 { 728 _socket.write(packet); 729 } 730 731 package(mysql) void send(MySQLSocket _socket, const(ubyte)[] header, const(ubyte)[] data) 732 in 733 { 734 assert(header.length == 4 || header.length == 5/*command type included*/); 735 } 736 body 737 { 738 _socket.write(header); 739 if(data.length) 740 _socket.write(data); 741 } 742 743 package(mysql) void sendCmd(T)(Connection conn, CommandType cmd, const(T)[] data) 744 in 745 { 746 // Internal thread states. Clients shouldn't use this 747 assert(cmd != CommandType.SLEEP); 748 assert(cmd != CommandType.CONNECT); 749 assert(cmd != CommandType.TIME); 750 assert(cmd != CommandType.DELAYED_INSERT); 751 assert(cmd != CommandType.CONNECT_OUT); 752 753 // Deprecated 754 assert(cmd != CommandType.CREATE_DB); 755 assert(cmd != CommandType.DROP_DB); 756 assert(cmd != CommandType.TABLE_DUMP); 757 758 // cannot send more than uint.max bytes. TODO: better error message if we try? 759 assert(data.length <= uint.max); 760 } 761 out 762 { 763 // at this point we should have sent a command 764 assert(conn.pktNumber == 1); 765 } 766 body 767 { 768 scope(failure) conn.kill(); 769 770 conn._lastCommandID++; 771 772 if(!conn._socket.connected) 773 { 774 if(cmd == CommandType.QUIT) 775 return; // Don't bother reopening connection just to quit 776 777 conn._open = Connection.OpenState.notConnected; 778 conn.connect(conn._clientCapabilities); 779 } 780 781 conn.autoPurge(); 782 783 conn.resetPacket(); 784 785 ubyte[] header; 786 header.length = 4 /*header*/ + 1 /*cmd*/; 787 header.setPacketHeader(conn.pktNumber, cast(uint)data.length +1/*cmd byte*/); 788 header[4] = cmd; 789 conn.bumpPacket(); 790 791 conn._socket.send(header, cast(const(ubyte)[])data); 792 } 793 794 package(mysql) OKErrorPacket getCmdResponse(Connection conn, bool asString = false) 795 { 796 auto okp = OKErrorPacket(conn.getPacket()); 797 enforcePacketOK(okp); 798 conn._serverStatus = okp.serverStatus; 799 return okp; 800 } 801 802 package(mysql) ubyte[] buildAuthPacket(Connection conn, ubyte[] token) 803 in 804 { 805 assert(token.length == 20); 806 } 807 body 808 { 809 ubyte[] packet; 810 packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + conn._user.length+1 + token.length+1 + conn._db.length+1); 811 packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append 812 813 // NOTE: we'll set the header last when we know the size 814 815 // Set the default capabilities required by the client 816 conn._cCaps.packInto(packet[4..8]); 817 818 // Request a conventional maximum packet length. 819 1.packInto(packet[8..12]); 820 821 packet ~= 33; // Set UTF-8 as default charSet 822 823 // There's a statutory block of zero bytes here - fill them in. 824 foreach(i; 0 .. 23) 825 packet ~= 0; 826 827 // Add the user name as a null terminated string 828 foreach(i; 0 .. conn._user.length) 829 packet ~= conn._user[i]; 830 packet ~= 0; // \0 831 832 // Add our calculated authentication token as a length prefixed string. 833 assert(token.length <= ubyte.max); 834 if(conn._pwd.length == 0) // Omit the token if the account has no password 835 packet ~= 0; 836 else 837 { 838 packet ~= cast(ubyte)token.length; 839 foreach(i; 0 .. token.length) 840 packet ~= token[i]; 841 } 842 843 // Add the default database as a null terminated string 844 foreach(i; 0 .. conn._db.length) 845 packet ~= conn._db[i]; 846 packet ~= 0; // \0 847 848 // The server sent us a greeting with packet number 0, so we send the auth packet 849 // back with the next number. 850 packet.setPacketHeader(conn.pktNumber); 851 conn.bumpPacket(); 852 return packet; 853 } 854 855 package(mysql) ubyte[] makeToken(string password, ubyte[] authBuf) 856 { 857 auto pass1 = sha1Of(cast(const(ubyte)[])password); 858 auto pass2 = sha1Of(pass1); 859 860 SHA1 sha1; 861 sha1.start(); 862 sha1.put(authBuf); 863 sha1.put(pass2); 864 auto result = sha1.finish(); 865 foreach (size_t i; 0..20) 866 result[i] = result[i] ^ pass1[i]; 867 return result.dup; 868 } 869 870 /// Get the next `mysql.result.Row` of a pending result set. 871 package(mysql) Row getNextRow(Connection conn) 872 { 873 scope(failure) conn.kill(); 874 875 if (conn._headersPending) 876 { 877 conn._rsh = ResultSetHeaders(conn, conn._fieldCount); 878 conn._headersPending = false; 879 } 880 ubyte[] packet; 881 Row rr; 882 packet = conn.getPacket(); 883 if(packet.front == ResultPacketMarker.error) 884 throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__); 885 886 if (packet.isEOFPacket()) 887 { 888 conn._rowsPending = conn._binaryPending = false; 889 return rr; 890 } 891 if (conn._binaryPending) 892 rr = Row(conn, packet, conn._rsh, true); 893 else 894 rr = Row(conn, packet, conn._rsh, false); 895 //rr._valid = true; 896 return rr; 897 } 898 899 package(mysql) void consumeServerInfo(Connection conn, ref ubyte[] packet) 900 { 901 scope(failure) conn.kill(); 902 903 conn._sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes) 904 conn._sCharSet = packet.consume!ubyte(); // server_language 905 conn._serverStatus = packet.consume!ushort(); //server_status 906 conn._sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes) 907 conn._sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec 908 909 enforce!MYX(conn._sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1"); 910 enforce!MYX(conn._sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection"); 911 } 912 913 package(mysql) ubyte[] parseGreeting(Connection conn) 914 { 915 scope(failure) conn.kill(); 916 917 ubyte[] packet = conn.getPacket(); 918 919 if (packet.length > 0 && packet[0] == ResultPacketMarker.error) 920 { 921 auto okp = OKErrorPacket(packet); 922 enforce!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message); 923 } 924 925 conn._protocol = packet.consume!ubyte(); 926 927 conn._serverVersion = packet.consume!string(packet.countUntil(0)); 928 packet.skip(1); // \0 terminated _serverVersion 929 930 conn._sThread = packet.consume!uint(); 931 932 // read first part of scramble buf 933 ubyte[] authBuf; 934 authBuf.length = 255; 935 authBuf[0..8] = packet.consume(8)[]; // scramble_buff 936 937 enforce!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0"); 938 939 conn.consumeServerInfo(packet); 940 941 packet.skip(1); // this byte supposed to be scramble length, but is actually zero 942 packet.skip(10); // filler of \0 943 944 // rest of the scramble 945 auto len = packet.countUntil(0); 946 enforce!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes"); 947 enforce(authBuf.length > 8+len); 948 authBuf[8..8+len] = packet.consume(len)[]; 949 authBuf.length = 8+len; // cut to correct size 950 enforce!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf"); 951 952 return authBuf; 953 } 954 955 package(mysql) SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure 956 { 957 SvrCapFlags common; 958 uint filter = 1; 959 foreach (size_t i; 0..uint.sizeof) 960 { 961 bool serverSupport = (server & filter) != 0; // can the server do this capability? 962 bool clientSupport = (client & filter) != 0; // can we support it? 963 if(serverSupport && clientSupport) 964 common |= filter; 965 filter <<= 1; // check next flag 966 } 967 return common; 968 } 969 970 package(mysql) SvrCapFlags setClientFlags(SvrCapFlags serverCaps, SvrCapFlags capFlags) 971 { 972 auto cCaps = getCommonCapabilities(serverCaps, capFlags); 973 974 // We cannot operate in <4.1 protocol, so we'll force it even if the user 975 // didn't supply it 976 cCaps |= SvrCapFlags.PROTOCOL41; 977 cCaps |= SvrCapFlags.SECURE_CONNECTION; 978 979 return cCaps; 980 } 981 982 package(mysql) void authenticate(Connection conn, ubyte[] greeting) 983 in 984 { 985 assert(conn._open == Connection.OpenState.connected); 986 } 987 out 988 { 989 assert(conn._open == Connection.OpenState.authenticated); 990 } 991 body 992 { 993 auto token = makeToken(conn._pwd, greeting); 994 auto authPacket = conn.buildAuthPacket(token); 995 conn._socket.send(authPacket); 996 997 auto packet = conn.getPacket(); 998 auto okp = OKErrorPacket(packet); 999 enforce!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message); 1000 conn._open = Connection.OpenState.authenticated; 1001 } 1002 1003 // Register prepared statement 1004 package(mysql) PreparedServerInfo performRegister(Connection conn, const(char[]) sql) 1005 { 1006 scope(failure) conn.kill(); 1007 1008 PreparedServerInfo info; 1009 1010 conn.sendCmd(CommandType.STMT_PREPARE, sql); 1011 conn._fieldCount = 0; 1012 1013 ubyte[] packet = conn.getPacket(); 1014 if(packet.front == ResultPacketMarker.ok) 1015 { 1016 packet.popFront(); 1017 info.statementId = packet.consume!int(); 1018 conn._fieldCount = packet.consume!short(); 1019 info.numParams = packet.consume!short(); 1020 1021 packet.popFront(); // one byte filler 1022 info.psWarnings = packet.consume!short(); 1023 1024 // At this point the server also sends field specs for parameters 1025 // and columns if there were any of each 1026 info.headers = PreparedStmtHeaders(conn, conn._fieldCount, info.numParams); 1027 } 1028 else if(packet.front == ResultPacketMarker.error) 1029 { 1030 auto error = OKErrorPacket(packet); 1031 enforcePacketOK(error); 1032 assert(0); // FIXME: what now? 1033 } 1034 else 1035 assert(0); // FIXME: what now? 1036 1037 return info; 1038 } 1039 1040 /++ 1041 Flush any outstanding result set elements. 1042 1043 When the server responds to a command that produces a result set, it 1044 queues the whole set of corresponding packets over the current connection. 1045 Before that `Connection` can embark on any new command, it must receive 1046 all of those packets and junk them. 1047 1048 As of v1.1.4, this is done automatically as needed. But you can still 1049 call this manually to force a purge to occur when you want. 1050 1051 See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/) 1052 +/ 1053 package(mysql) ulong purgeResult(Connection conn) 1054 { 1055 scope(failure) conn.kill(); 1056 1057 conn._lastCommandID++; 1058 1059 ulong rows = 0; 1060 if (conn._headersPending) 1061 { 1062 for (size_t i = 0;; i++) 1063 { 1064 if (conn.getPacket().isEOFPacket()) 1065 { 1066 conn._headersPending = false; 1067 break; 1068 } 1069 enforce!MYXProtocol(i < conn._fieldCount, 1070 text("Field header count (", conn._fieldCount, ") exceeded but no EOF packet found.")); 1071 } 1072 } 1073 if (conn._rowsPending) 1074 { 1075 for (;; rows++) 1076 { 1077 if (conn.getPacket().isEOFPacket()) 1078 { 1079 conn._rowsPending = conn._binaryPending = false; 1080 break; 1081 } 1082 } 1083 } 1084 conn.resetPacket(); 1085 return rows; 1086 } 1087 1088 /++ 1089 Get a textual report on the server status. 1090 1091 (COM_STATISTICS) 1092 +/ 1093 package(mysql) string serverStats(Connection conn) 1094 { 1095 conn.sendCmd(CommandType.STATISTICS, []); 1096 return cast(string) conn.getPacket(); 1097 } 1098 1099 /++ 1100 Enable multiple statement commands. 1101 1102 This can be used later if this feature was not requested in the client capability flags. 1103 1104 Warning: This functionality is currently untested. 1105 1106 Params: on = Boolean value to turn the capability on or off. 1107 +/ 1108 //TODO: Need to test this 1109 package(mysql) void enableMultiStatements(Connection conn, bool on) 1110 { 1111 scope(failure) conn.kill(); 1112 1113 ubyte[] t; 1114 t.length = 2; 1115 t[0] = on ? 0 : 1; 1116 t[1] = 0; 1117 conn.sendCmd(CommandType.STMT_OPTION, t); 1118 1119 // For some reason this command gets an EOF packet as response 1120 auto packet = conn.getPacket(); 1121 enforce!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command"); 1122 }