module mysql.protocol.commands;

import std.algorithm;
import std.conv;
import std.datetime;
import std.digest.sha;
import std.exception;
import std.range;
import std.socket;
import std.stdio;
import std.string;
import std.traits;
import std.variant;

import mysql.common;
import mysql.connection;
import mysql.protocol.constants;
import mysql.protocol.extra_types;
import mysql.protocol.packets;

/**
 * Encapsulation of an SQL command or query.
 *
 * A Command may be either a one-off SQL query, or may use a prepared statement.
 * Commands that are expected to return a result set - queries - have distinctive methods
 * that are enforced. That is it will be an error to call such a method with an SQL command
 * that does not produce a result set.
 */
struct Command
{
package:
    Connection _con;                // connection all traffic goes through
    const(char)[] _sql;             // current SQL text
    uint _hStmt;                    // prepared statement handle; 0 when none is held
    ulong _insertID;                // insert ID taken from the last OK packet
    bool _rowsPending, _headersPending, _pendingBinary, _rebound;
    ushort _psParams, _psWarnings, _fieldCount;
    ResultSetHeaders _rsh;
    PreparedStmtHeaders _psh;
    Variant[] _inParams;            // one Variant per prepared statement parameter
    ParameterSpecialization[] _psa; // one specialization per parameter
    string _prevFunc;               // name last prepared by execFunction()

    /**
     * Send a command packet carrying the current SQL text.
     *
     * Throws if result set headers or rows are still pending. The connection
     * is killed if the send itself fails.
     *
     * Returns: always true.
     */
    bool sendCmd(CommandType cmd)
    {
        enforceEx!MYX(!(_headersPending || _rowsPending),
            "There are result set elements pending - purgeResult() required.");

        scope(failure) _con.kill();
        _con.sendCmd(cmd, _sql);
        return true;
    }

    /**
     * Build the NULL bitmap for a parameter list: one bit per parameter,
     * least significant bit first within each byte, set when the
     * parameter's specialization has isNull set.
     */
    static ubyte[] makeBitmap(in ParameterSpecialization[] psa) pure nothrow
    {
        auto bitmap = new ubyte[(psa.length + 7) / 8];
        foreach (i, ref psn; psa)
        {
            if (psn.isNull)
                bitmap[i / 8] |= cast(ubyte) (1 << (i % 8));
        }
        return bitmap;
    }

    /**
     * Build the fixed 14 byte start of a STMT_EXECUTE packet.
     *
     * Bytes 0..4 are left zeroed for the packet header, which the caller
     * fills in once the full packet length is known.
     */
    ubyte[] makePSPrefix(ubyte flags = 0) pure const nothrow
    {
        auto prefix = new ubyte[14];

        prefix[4] = CommandType.STMT_EXECUTE;
        _hStmt.packInto(prefix[5..9]);
        prefix[9] = flags;      // flags, no cursor
        prefix[10] = 1;         // iteration count - currently always 1
        prefix[11 .. 14] = 0;   // remaining bytes of the iteration count

        return prefix;
    }

    /// Fetch a T from v, dereferencing first when the Variant holds a T*.
    static T paramValue(T)(ref Variant v, bool isRef)
    {
        return isRef ? *(v.get!(T*)) : v.get!(T);
    }

    /**
     * Work out the wire type codes and the packed values for the bound
     * parameters.
     *
     * Params:
     *   vals     = receives the packed parameter values, back to back.
     *   longData = set to true if any parameter has a non-zero chunkSize.
     * Returns: two bytes per parameter - the SQL type code followed by the
     *          signedness flag (0x80 for unsigned, 0 otherwise).
     * Throws: MYX for an unbound parameter or an unsupported D type.
     */
    ubyte[] analyseParams(out ubyte[] vals, out bool longData)
    {
        enum ubyte UNSIGNED = 0x80;
        enum ubyte SIGNED = 0;

        immutable size_t pc = _inParams.length;
        auto types = new ubyte[pc * 2];
        size_t alloc = pc * 20;     // initial guess, grown on demand by reAlloc
        vals.length = alloc;
        size_t vcl = 0;             // bytes of vals in use
        size_t ct = 0;              // next free slot in types

        // Make sure n more bytes fit into vals after position vcl.
        void reAlloc(size_t n)
        {
            if (vcl + n < alloc)
                return;
            size_t inc = (alloc * 3) / 2;
            if (inc < n)
                inc = n;
            alloc += inc;
            vals.length = alloc;
        }

        // Append raw bytes to vals.
        void putBytes(const(ubyte)[] bytes)
        {
            reAlloc(bytes.length);
            vals[vcl .. vcl + bytes.length] = bytes[];
            vcl += bytes.length;
        }

        // Append the low nbytes bytes of value, least significant first.
        void putLE(ulong value, size_t nbytes)
        {
            reAlloc(nbytes);
            foreach (k; 0 .. nbytes)
                vals[vcl++] = cast(ubyte) ((value >> (8 * k)) & 0xff);
        }

        // Record the type code and signedness flag for the current parameter.
        void putType(int code, ubyte signFlag)
        {
            types[ct++] = cast(ubyte) code;
            types[ct++] = signFlag;
        }

        foreach (size_t i; 0 .. pc)
        {
            if (_psa[i].chunkSize)
                longData = true;
            immutable bool isnull = _psa[i].isNull;
            Variant v = _inParams[i];
            immutable SQLType ext = _psa[i].type;

            // A trailing '*' means the Variant holds a pointer to the value.
            string ts = v.type.toString();
            bool isRef = false;
            if (ts[$-1] == '*')
            {
                ts = ts[0 .. $-1];
                isRef = true;
            }

            // The explicitly requested SQL type, or dflt when none was given.
            int typeOr(SQLType dflt)
            {
                return ext == SQLType.INFER_FROM_D_TYPE ? dflt : ext;
            }

            // In every case the type bytes are always written; the value
            // bytes are skipped when the parameter is flagged NULL.
            switch (ts)
            {
                case "bool":
                    putType(typeOr(SQLType.BIT), SIGNED);
                    if (isnull) break;
                    reAlloc(2);
                    vals[vcl++] = 1;        // length prefix
                    vals[vcl++] = paramValue!bool(v, isRef) ? 0x31 : 0x30;
                    break;
                case "byte":
                    putType(SQLType.TINY, SIGNED);
                    if (isnull) break;
                    putLE(cast(ulong) paramValue!byte(v, isRef), 1);
                    break;
                case "ubyte":
                    putType(SQLType.TINY, UNSIGNED);
                    if (isnull) break;
                    putLE(paramValue!ubyte(v, isRef), 1);
                    break;
                case "short":
                    putType(SQLType.SHORT, SIGNED);
                    if (isnull) break;
                    putLE(cast(ulong) paramValue!short(v, isRef), 2);
                    break;
                case "ushort":
                    putType(SQLType.SHORT, UNSIGNED);
                    // This check was missing: a NULL ushort parameter used to
                    // emit two value bytes anyway.
                    if (isnull) break;
                    putLE(paramValue!ushort(v, isRef), 2);
                    break;
                case "int":
                    putType(SQLType.INT, SIGNED);
                    if (isnull) break;
                    putLE(cast(ulong) paramValue!int(v, isRef), 4);
                    break;
                case "uint":
                    putType(SQLType.INT, UNSIGNED);
                    if (isnull) break;
                    putLE(paramValue!uint(v, isRef), 4);
                    break;
                case "long":
                    putType(SQLType.LONGLONG, SIGNED);
                    if (isnull) break;
                    putLE(cast(ulong) paramValue!long(v, isRef), 8);
                    break;
                case "ulong":
                    putType(SQLType.LONGLONG, UNSIGNED);
                    if (isnull) break;
                    putLE(paramValue!ulong(v, isRef), 8);
                    break;
                case "float":
                    putType(SQLType.FLOAT, SIGNED);
                    if (isnull) break;
                    float f = paramValue!float(v, isRef);
                    // raw in-memory bytes of the float, in native order
                    putBytes((cast(ubyte*) &f)[0 .. float.sizeof]);
                    break;
                case "double":
                    putType(SQLType.DOUBLE, SIGNED);
                    if (isnull) break;
                    double d = paramValue!double(v, isRef);
                    // raw in-memory bytes of the double, in native order
                    putBytes((cast(ubyte*) &d)[0 .. double.sizeof]);
                    break;
                case "std.datetime.Date":
                    putType(SQLType.DATE, SIGNED);
                    if (isnull) break;
                    putBytes(pack(paramValue!Date(v, isRef)));
                    break;
                // "std.datetime.Time" is the historical label. The second
                // label follows the qualified-name pattern of the Date and
                // DateTime cases, since the value read here is a TimeOfDay.
                case "std.datetime.Time", "std.datetime.TimeOfDay":
                    putType(SQLType.TIME, SIGNED);
                    if (isnull) break;
                    putBytes(pack(paramValue!TimeOfDay(v, isRef)));
                    break;
                case "std.datetime.DateTime":
                    putType(SQLType.DATETIME, SIGNED);
                    if (isnull) break;
                    putBytes(pack(paramValue!DateTime(v, isRef)));
                    break;
                // NOTE(review): "connect.Timestamp" is the historical label.
                // Timestamp is presumably declared in
                // mysql.protocol.extra_types now - confirm the second label.
                case "connect.Timestamp", "mysql.protocol.extra_types.Timestamp":
                    putType(SQLType.TIMESTAMP, SIGNED);
                    if (isnull) break;
                    Timestamp tms = paramValue!Timestamp(v, isRef);
                    DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
                    putBytes(pack(dt));
                    break;
                case "immutable(char)[]":
                    putType(typeOr(SQLType.VARCHAR), SIGNED);
                    if (isnull) break;
                    string s = paramValue!string(v, isRef);
                    putBytes(packLCS(cast(void[]) s));
                    break;
                case "char[]":
                    putType(typeOr(SQLType.VARCHAR), SIGNED);
                    if (isnull) break;
                    char[] ca = paramValue!(char[])(v, isRef);
                    putBytes(packLCS(cast(void[]) ca));
                    break;
                case "byte[]":
                    putType(typeOr(SQLType.TINYBLOB), SIGNED);
                    if (isnull) break;
                    byte[] ba = paramValue!(byte[])(v, isRef);
                    putBytes(packLCS(cast(void[]) ba));
                    break;
                case "ubyte[]":
                    putType(typeOr(SQLType.TINYBLOB), SIGNED);
                    if (isnull) break;
                    ubyte[] uba = paramValue!(ubyte[])(v, isRef);
                    putBytes(packLCS(cast(void[]) uba));
                    break;
                case "void":
                    throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
                default:
                    throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
            }
        }
        vals.length = vcl;  // trim to the bytes actually written
        return types;
    }

    /**
     * Send the data of every parameter that has a non-zero chunkSize as a
     * series of STMT_SEND_LONG_DATA packets.
     *
     * The parameter's chunkDelegate is called repeatedly to fill the payload
     * area. A return value smaller than the chunk size marks the final chunk;
     * a return value of 0 means there is nothing more to send.
     */
    void sendLongData()
    {
        enum size_t payloadStart = 11;  // 4 header + 1 command + 4 handle + 2 param number
        enum uint nonPayload = 7;       // command + handle + param number

        assert(_psa.length <= ushort.max); // parameter number is sent as short
        foreach (ushort i, PSN psn; _psa)
        {
            if (!psn.chunkSize) continue;
            uint cs = psn.chunkSize;
            uint delegate(ubyte[]) dg = psn.chunkDelegate;

            ubyte[] chunk;
            chunk.length = cs + payloadStart;
            chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
            chunk[4] = CommandType.STMT_SEND_LONG_DATA;
            _hStmt.packInto(chunk[5..9]);   // statement handle
            packInto(i, chunk[9..11]);      // parameter number

            // byte 11 on is payload
            for (;;)
            {
                uint sent = dg(chunk[payloadStart .. cs + payloadStart]);
                if (sent < cs)
                {
                    if (sent == 0)  // data was exact multiple of chunk size - all sent
                        break;
                    // Final, short chunk. Keep exactly the header, the fixed
                    // fields and the bytes the delegate supplied. The old code
                    // trimmed after adding 7 to sent, which left the packet 7
                    // bytes too long and underflowed when sent + 7 > cs.
                    chunk.length = sent + payloadStart;
                    packInto!(uint, true)(sent + nonPayload, chunk[0..3]);
                    _con.send(chunk);
                    break;
                }
                _con.send(chunk);
            }
        }
    }

public:

    /**
     * Construct a naked Command object
     *
     * Params: con = A Connection object to communicate with the server
     */
    this(Connection con)
    {
        _con = con;
        _con.resetPacket();
    }

    /**
     * Construct a Command object complete with SQL
     *
     * Params: con = A Connection object to communicate with the server
     *         sql = SQL command string.
     */
    this(Connection con, const(char)[] sql)
    {
        _sql = sql;
        this(con);
    }

    @property
    {
        /// Get the current SQL for the Command
        const(char)[] sql() pure const nothrow { return _sql; }

        /**
         * Set a new SQL command.
         *
         * This can have quite profound side effects. It resets the Command to
         * an initial state. If a query has been issued on the Command that
         * produced a result set, then all of the result set packets - field
         * description sequence, EOF packet, result rows sequence, EOF packet
         * must be flushed from the server before any further operation can be
         * performed on the Connection. If you want to write speedy and efficient
         * MySQL programs, you should bear this in mind when designing your
         * queries so that you are not requesting many rows when one would do.
         *
         * Params: sql = SQL command string.
         */
        const(char)[] sql(const(char)[] sql)
        {
            // Flushing and releasing only happens while a prepared statement
            // handle is held.
            if (_hStmt)
            {
                purgeResult();
                releaseStatement();
                _con.resetPacket();
            }
            return this._sql = sql;
        }
    }

    /**
     * Submit an SQL command to the server to be compiled into a prepared statement.
     *
     * The result of a successful outcome will be a statement handle - an ID -
     * for the prepared statement, a count of the parameters required for
     * execution of the statement, and a count of the columns that will be present
     * in any result set that the command generates. These values will be stored
     * in the Command struct.
     *
     * The server will then proceed to send prepared statement headers,
     * including parameter descriptions, and result set field descriptions,
     * followed by an EOF packet.
*
     * If there is an existing statement handle in the Command struct, that
     * prepared statement is released.
     *
     * Parameter bindings and specializations left over from a previously
     * prepared statement are discarded.
     *
     * Throws: MySQLException if there are pending result set items, or if the
     * server has a problem; a protocol exception if the server's reply is
     * neither an OK nor an error packet.
     */
    void prepare()
    {
        enforceEx!MYX(!(_headersPending || _rowsPending),
            "There are result set elements pending - purgeResult() required.");

        scope(failure) _con.kill();

        if (_hStmt)
            releaseStatement();
        _con.sendCmd(CommandType.STMT_PREPARE, _sql);
        _fieldCount = 0;

        ubyte[] packet = _con.getPacket();
        if (packet.front == ResultPacketMarker.ok)
        {
            packet.popFront();
            _hStmt = packet.consume!int();
            _fieldCount = packet.consume!short();
            _psParams = packet.consume!short();

            // Start from fresh bindings. Merely resizing the arrays would let
            // values and specializations (e.g. isNull) bound for an earlier
            // statement leak into this one.
            _inParams = new Variant[_psParams];
            _psa = new ParameterSpecialization[_psParams];

            packet.popFront(); // one byte filler
            _psWarnings = packet.consume!short();

            // At this point the server also sends field specs for parameters
            // and columns if there were any of each
            _psh = PreparedStmtHeaders(_con, _fieldCount, _psParams);
        }
        else if (packet.front == ResultPacketMarker.error)
        {
            auto error = OKErrorPacket(packet);
            enforcePacketOK(error);
            assert(0); // enforcePacketOK is expected to throw for an error packet
        }
        else
        {
            // Neither OK nor error: report it as a protocol problem instead of
            // stopping on assert(0).
            enforceEx!MYXProtocol(false, "Unexpected response to a prepare command.");
        }
    }

    /**
     * Release a prepared statement.
     *
     * This method tells the server that it can dispose of the information it
     * holds about the current prepared statement, and resets the Command
     * object to an initial state in that respect.
*/
    void releaseStatement()
    {
        scope(failure) _con.kill();

        // 4 byte header + command byte + 4 byte statement handle
        auto closeCmd = new ubyte[9];
        closeCmd.setPacketHeader(0/*packet number*/);
        _con.bumpPacket();
        closeCmd[4] = CommandType.STMT_CLOSE;
        _hStmt.packInto(closeCmd[5..9]);

        // Anything still queued from an earlier result set is drained before
        // the close command goes out.
        purgeResult();
        _con.send(closeCmd);
        // It seems that the server does not find it necessary to send a response
        // for this command.
        _hStmt = 0;
    }

    /**
     * Flush any outstanding result set elements.
     *
     * When the server responds to a command that produces a result set, it
     * queues the whole set of corresponding packets over the current connection.
     * Before that Connection can embark on any new command, it must receive
     * all of those packets and junk them.
     * http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/
     *
     * Returns: the number of row packets that were discarded.
     */
    ulong purgeResult()
    {
        scope(failure) _con.kill();

        ulong discarded = 0;
        if (_fieldCount)
        {
            if (_headersPending)
            {
                // Skip field header packets up to the EOF packet that ends them.
                size_t seen = 0;
                while (!_con.getPacket().isEOFPacket())
                {
                    enforceEx!MYXProtocol(seen < _fieldCount, "Field header count exceeded but no EOF packet found.");
                    ++seen;
                }
                _headersPending = false;
            }
            if (_rowsPending)
            {
                // Skip row packets up to the EOF packet that ends them.
                while (!_con.getPacket().isEOFPacket())
                    ++discarded;
                _rowsPending = _pendingBinary = false;
            }
        }
        _fieldCount = 0;
        _con.resetPacket();
        return discarded;
    }

    /**
     * Bind a D variable to a prepared statement parameter.
     *
     * In this implementation, binding comprises setting a value into the
     * appropriate element of an array of Variants which represent the
     * parameters, and setting any required specializations.
     *
     * To bind to some D variable, we set the corresponding variant with its
     * address, so there is no need to rebind between calls to execPreparedXXX.
*/
    void bindParameter(T)(ref T val, size_t pIndex, ParameterSpecialization psn = PSN(0, false, SQLType.INFER_FROM_D_TYPE, 0, null))
    {
        // Now in theory we should be able to check the parameter type here, since the
        // protocol is supposed to send us type information for the parameters, but this
        // capability seems to be broken. This assertion is supported by the fact that
        // the same information is not available via the MySQL C API either. It is up
        // to the programmer to ensure that appropriate type information is embodied
        // in the variant array, or provided explicitly. This sucks, but short of
        // having a client side SQL parser I don't see what can be done.
        //
        // We require that the statement be prepared at this point so we can at least
        // check that the parameter number is within the required range
        enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
        enforceEx!MYX(pIndex < _psParams, "Parameter number is out of range for the prepared statement.");

        // The specialization always records the slot it was bound to.
        psn.pIndex = pIndex;
        _psa[pIndex] = psn;
        // Store the address, not a copy, of the caller's variable.
        _inParams[pIndex] = &val;
    }

    /**
     * Bind a tuple of D variables to the parameters of a prepared statement.
     *
     * You can use this method to bind a set of variables if you don't need any specialization,
     * that is there will be no null values, and chunked transfer is not necessary.
     *
     * The tuple must match the required number of parameters, and it is the programmer's
     * responsibility to ensure that they are of appropriate types.
     */
    void bindParameterTuple(T...)(ref T args)
    {
        enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
        enforceEx!MYX(args.length == _psParams, "Argument list supplied does not match the number of parameters.");
        // Each parameter slot receives the address of the matching argument.
        foreach (size_t slot, unused; args)
        {
            _inParams[slot] = &args[slot];
        }
    }

    /**
     * Bind a Variant[] as the parameters of a prepared statement.
*
     * You can use this method to bind a set of variables in Variant form to
     * the parameters of a prepared statement.
     *
     * The Variants are copied into the Command when this method is called, so
     * later changes to the caller's array are only picked up by binding again.
     *
     * Parameter specializations can be added if required. This method could be
     * used to add records from a data entry form along the lines of
     * ------------
     * auto c = Command(con, "insert into table42 values(?, ?, ?)");
     * c.prepare();
     * Variant[] va;
     * va.length = 3;
     * DataRecord dr;    // Some data input facility
     * ulong ra;
     * do
     * {
     *     dr.get();
     *     va[0] = dr("Name");
     *     va[1] = dr("City");
     *     va[2] = dr("Whatever");
     *     c.bindParameters(va);   // values are copied, so bind after each change
     *     c.execPrepared(ra);
     * } while(tod < "17:30");
     * ------------
     * Params: va = External list of Variants to be used as parameters
     *         psnList = any required specializations
     * Throws: MySQLException if the statement is not prepared, if the number
     *         of Variants does not match the statement, or if a specialization
     *         names a parameter number outside the statement's range.
     */
    void bindParameters(Variant[] va, ParameterSpecialization[] psnList= null)
    {
        enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
        enforceEx!MYX(va.length == _psParams, "Param count supplied does not match prepared statement");
        _inParams[] = va[];     // element-wise copy
        // Iterating a null list does nothing, so no separate null test is needed.
        foreach (PSN psn; psnList)
        {
            // Reject a bad parameter number instead of indexing out of bounds.
            enforceEx!MYX(psn.pIndex < _psParams, "Parameter number is out of range for the prepared statement.");
            _psa[psn.pIndex] = psn;
        }
    }

    /**
     * Access a prepared statement parameter for update.
     *
     * Another style of usage would simply update the parameter Variant directly
     *
     * ------------
     * c.param(0) = 42;
     * c.param(1) = "The answer";
     * ------------
     * Params: index = The zero based index
     * Returns: a reference to the Variant held for that parameter.
     */
    ref Variant param(size_t index) pure
    {
        enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
        enforceEx!MYX(index < _psParams, "Parameter index out of range.");
        return _inParams[index];
    }

    /**
     * Sets a prepared statement parameter to NULL.
*
     * Params: index = The zero based index
     */
    void setNullParam(size_t index)
    {
        enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
        enforceEx!MYX(index < _psParams, "Parameter index out of range.");
        // An empty string is stored as a placeholder value; the isNull flag
        // is what marks the parameter as NULL.
        _inParams[index] = "";
        _psa[index].isNull = true;
    }

    /**
     * Execute a one-off SQL command.
     *
     * Use this method when you are not going to be using the same command repeatedly.
     * It can be used with commands that don't produce a result set, or those that
     * do. If there is a result set its existence will be indicated by the return value.
     *
     * Any result set can be accessed via getNextRow(), but you should really be
     * using execSQLResult() or execSQLSequence() for such queries.
     *
     * Params: ra = An out parameter to receive the number of rows affected.
     * Returns: true if there was a (possibly empty) result set.
     * Throws: MySQLException if result set elements of an earlier command are
     *         still pending, or if the server reports an error.
     */
    bool execSQL(out ulong ra)
    {
        // Same guard as prepare(): a new command must not be sent while an
        // earlier result set is still pending.
        enforceEx!MYX(!(_headersPending || _rowsPending),
            "There are result set elements pending - purgeResult() required.");

        scope(failure) _con.kill();

        _con.sendCmd(CommandType.QUERY, _sql);
        _fieldCount = 0;
        ubyte[] packet = _con.getPacket();

        if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
        {
            // No result set: the reply is a single OK or error packet.
            _con.resetPacket();
            auto okp = OKErrorPacket(packet);
            enforcePacketOK(okp);
            ra = okp.affected;
            _con._serverStatus = okp.serverStatus;
            _insertID = okp.insertID;
            return false;
        }

        // There was presumably a result set
        assert(packet.front >= 1 && packet.front <= 250); // ResultSet packet header should have this value
        _headersPending = _rowsPending = true;
        _pendingBinary = false;
        auto lcb = packet.consumeIfComplete!LCB();
        assert(!lcb.isNull);
        assert(!lcb.isIncomplete);
        _fieldCount = cast(ushort)lcb.value;
        assert(_fieldCount == lcb.value);
        ra = 0;
        return true;
    }

    ///ditto
    bool execSQL()
    {
        ulong ra;
        return execSQL(ra);
    }

    /**
* Execute a one-off SQL command for the case where you expect a result set,
     * and want it all at once.
     *
     * Use this method when you are not going to be using the same command repeatedly.
     * This method will throw if the SQL command does not produce a result set.
     *
     * If there are long data items among the expected result columns you can specify
     * that they are to be subject to chunked transfer via a delegate.
     *
     * Params: csa = An optional array of ColumnSpecialization structs.
     * Returns: A (possibly empty) ResultSet.
     */
    ResultSet execSQLResult(ColumnSpecialization[] csa = null)
    {
        enforceEx!MYX(execSQL(), "The executed query did not produce a result set.");

        _rsh = ResultSetHeaders(_con, _fieldCount);
        if (csa !is null)
            _rsh.addSpecializations(csa);
        _headersPending = false;

        Row[] collected;
        for (;;)
        {
            auto pkt = _con.getPacket();
            if (pkt.isEOFPacket())
                break;
            collected ~= Row(_con, pkt, _rsh, false);
            // As the row fetches more data while incomplete, it might already have
            // fetched the EOF marker, so we have to check it again
            if (!pkt.empty && pkt.isEOFPacket())
                break;
        }
        _rowsPending = _pendingBinary = false;

        return ResultSet(collected, _rsh.fieldNames);
    }

    /**
     * Execute a one-off SQL command for the case where you expect a result set,
     * and want to deal with it a row at a time.
     *
     * Use this method when you are not going to be using the same command repeatedly.
     * This method will throw if the SQL command does not produce a result set.
     *
     * If there are long data items among the expected result columns you can specify
     * that they are to be subject to chunked transfer via a delegate.
     *
     * Params: csa = An optional array of ColumnSpecialization structs.
     * Returns: A (possibly empty) ResultSequence.
*/
    ResultSequence execSQLSequence(ColumnSpecialization[] csa = null)
    {
        // The unused 20-element Row buffer and its counters were removed; the
        // rows are produced by the returned ResultSequence.
        ulong ra;
        enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set.");
        _rsh = ResultSetHeaders(_con, _fieldCount);
        if (csa !is null)
            _rsh.addSpecializations(csa);

        _headersPending = false;
        return ResultSequence(&this, _rsh.fieldNames);
    }

    /**
     * Execute a one-off SQL command to place result values into a set of D variables.
     *
     * Use this method when you are not going to be using the same command repeatedly.
     * It will throw if the specified command does not produce a result set, or if
     * any column type is incompatible with the corresponding D variable.
     *
     * Only the first row is used; any further rows are read and discarded.
     *
     * Params: args = A tuple of D variables to receive the results.
     */
    void execSQLTuple(T...)(ref T args)
    {
        ulong ra;
        enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set.");
        Row rr = getNextRow();
        /+if (!rr._valid)   // The result set was empty - not a crime.
            return;+/
        enforceEx!MYX(rr._values.length == args.length, "Result column count does not match the target tuple.");
        foreach (size_t i, unused; args)
        {
            // The D type of each target must match the column's Variant type exactly.
            enforceEx!MYX(typeid(args[i]).toString() == rr._values[i].type.toString(),
                "Tuple "~to!string(i)~" type and column type are not compatible.");
            args[i] = rr._values[i].get!(typeof(args[i]));
        }
        // If there were more rows, flush them away
        // Question: Should I check in purgeResult and throw if there were - it's very inefficient to
        // allow sloppy SQL that does not ensure just one row!
        purgeResult();
    }

    /**
     * Execute a prepared command.
     *
     * Use this method when you will use the same SQL command repeatedly.
     * It can be used with commands that don't produce a result set, or those that
     * do.
If there is a result set its existence will be indicated by the return value.
     *
     * Any result set can be accessed via getNextRow(), but you should really be
     * using execPreparedResult() or execPreparedSequence() for such queries.
     *
     * Params: ra = An out parameter to receive the number of rows affected.
     * Returns: true if there was a (possibly empty) result set.
     * Throws: MySQLException if the statement has not been prepared, if result
     *         set elements of an earlier command are still pending, or if the
     *         server reports an error.
     */
    bool execPrepared(out ulong ra)
    {
        enforceEx!MYX(_hStmt, "The statement has not been prepared.");
        // Same guard as prepare(): a new command must not be sent while an
        // earlier result set is still pending.
        enforceEx!MYX(!(_headersPending || _rowsPending),
            "There are result set elements pending - purgeResult() required.");
        scope(failure) _con.kill();

        _con.resetPacket();

        ubyte[] packet = makePSPrefix(0);
        bool longData;

        if (_psh._paramCount)
        {
            // Layout after the prefix: NULL bitmap, a single byte of value 1,
            // the parameter types, then the parameter values.
            ubyte[] one = [ 1 ];
            ubyte[] vals;
            ubyte[] types = analyseParams(vals, longData);
            ubyte[] nbm = makeBitmap(_psa);
            packet = packet ~ nbm ~ one ~ types ~ vals;
        }

        // Chunked parameter data is sent before the execute packet.
        if (longData)
            sendLongData();

        assert(packet.length <= uint.max);
        packet.setPacketHeader(_con.pktNumber);
        _con.bumpPacket();
        _con.send(packet);
        packet = _con.getPacket();

        if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
        {
            // No result set: the reply is a single OK or error packet.
            _con.resetPacket();
            auto okp = OKErrorPacket(packet);
            enforcePacketOK(okp);
            ra = okp.affected;
            _con._serverStatus = okp.serverStatus;
            _insertID = okp.insertID;
            return false;
        }

        // There was presumably a result set
        _headersPending = _rowsPending = _pendingBinary = true;
        auto lcb = packet.consumeIfComplete!LCB();
        assert(!lcb.isIncomplete);
        _fieldCount = cast(ushort)lcb.value;
        return true;
    }

    /**
     * Execute a prepared SQL command for the case where you expect a result set,
     * and want it all at once.
     *
     * Use this method when you will use the same command repeatedly.
* This method will throw if the SQL command does not produce a result set.
     *
     * If there are long data items among the expected result columns you can specify
     * that they are to be subject to chunked transfer via a delegate.
     *
     * Params: csa = An optional array of ColumnSpecialization structs.
     * Returns: A (possibly empty) ResultSet.
     */
    ResultSet execPreparedResult(ColumnSpecialization[] csa = null)
    {
        ulong ra;
        enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");

        _rsh = ResultSetHeaders(_con, _fieldCount);
        if (csa !is null)
            _rsh.addSpecializations(csa);
        _headersPending = false;

        Row[] collected;
        while (true)
        {
            ubyte[] pkt = _con.getPacket();
            if (pkt.isEOFPacket())
                break;
            // The final argument selects the binary row format.
            collected ~= Row(_con, pkt, _rsh, true);
            // Constructing the row may itself have consumed up to the EOF
            // marker, so the packet is checked again.
            if (!pkt.empty && pkt.isEOFPacket())
                break;
        }
        _rowsPending = _pendingBinary = false;

        return ResultSet(collected, _rsh.fieldNames);
    }

    /**
     * Execute a prepared SQL command for the case where you expect a result set,
     * and want to deal with it one row at a time.
     *
     * Use this method when you will use the same command repeatedly.
     * This method will throw if the SQL command does not produce a result set.
     *
     * If there are long data items among the expected result columns you can
     * specify that they are to be subject to chunked transfer via a delegate.
     *
     * Params: csa = An optional array of ColumnSpecialization structs.
     * Returns: A (possibly empty) ResultSequence.
*/
    ResultSequence execPreparedSequence(ColumnSpecialization[] csa = null)
    {
        // The unused 20-element Row buffer and its counters were removed; the
        // rows are produced by the returned ResultSequence.
        ulong ra;
        enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");
        _rsh = ResultSetHeaders(_con, _fieldCount);
        if (csa !is null)
            _rsh.addSpecializations(csa);
        _headersPending = false;
        return ResultSequence(&this, _rsh.fieldNames);
    }

    /**
     * Execute a prepared SQL command to place result values into a set of D variables.
     *
     * Use this method when you will use the same command repeatedly.
     * It will throw if the specified command does not produce a result set, or
     * if any column type is incompatible with the corresponding D variable
     *
     * Only the first row is used; any further rows are read and discarded.
     *
     * Params: args = A tuple of D variables to receive the results.
     */
    void execPreparedTuple(T...)(ref T args)
    {
        ulong ra;
        enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");
        Row rr = getNextRow();
        // enforceEx!MYX(rr._valid, "The result set was empty.");
        enforceEx!MYX(rr._values.length == args.length, "Result column count does not match the target tuple.");
        foreach (size_t i, unused; args)
        {
            // The D type of each target must match the column's Variant type exactly.
            enforceEx!MYX(typeid(args[i]).toString() == rr._values[i].type.toString(),
                "Tuple "~to!string(i)~" type and column type are not compatible.");
            args[i] = rr._values[i].get!(typeof(args[i]));
        }
        // If there were more rows, flush them away
        // Question: Should I check in purgeResult and throw if there were - it's very inefficient to
        // allow sloppy SQL that does not ensure just one row!
        purgeResult();
    }

    /**
     * Get the next Row of a pending result set.
     *
     * This method can be used after either execSQL() or execPrepared() have returned true
     * to retrieve result set rows sequentially.
*
     * Similar functionality is available via execSQLSequence() and execPreparedSequence() in
     * which case the interface is presented as a forward range of Rows.
     *
     * This method allows you to deal with very large result sets either a row at a time,
     * or by feeding the rows into some suitable container such as a linked list.
     *
     * Returns: The next Row, or a default-initialized Row once the end-of-data
     *          packet has been read.
     */
    Row getNextRow()
    {
        // Any exception thrown from here on kills the connection.
        scope(failure) _con.kill();

        // The column headers are read the first time a row is requested.
        if (_headersPending)
        {
            _rsh = ResultSetHeaders(_con, _fieldCount);
            _headersPending = false;
        }

        Row result;
        ubyte[] data = _con.getPacket();
        if (data.isEOFPacket())
        {
            // End of the result set: clear the pending flags and return the empty Row.
            _rowsPending = _pendingBinary = false;
            return result;
        }

        // _pendingBinary selects binary or text decoding of the row.
        result = Row(_con, data, _rsh, _pendingBinary);
        return result;
    }

    /**
     * Execute a stored function, with any required input variables, and store the
     * return value into a D variable.
     *
     * For this method, no query string is to be provided. The required one is of
     * the form "select foo(?, ? ...)". The method generates it and the appropriate
     * bindings - in, and out. Chunked transfers are not supported in either
     * direction. If you need them, create the parameters separately, then use
     * execPreparedResult() to get a one-row, one-column result set.
     *
     * If it is not possible to convert the column value to the type of target,
     * then execFunction will throw. If the result is NULL, that is indicated
     * by a false return value, and target is unchanged.
     *
     * In the interest of performance, this method assumes that the user has the
     * required information about the number and types of IN parameters and the
     * type of the output variable.
* In the same interest, if the method is called
     * repeatedly for the same stored function, prepare() is omitted after the first call.
     *
     * Params:
     *    T = The type of the variable to receive the return result.
     *    U = type tuple of arguments
     *    name = The name of the stored function.
     *    target = the D variable to receive the stored function return result.
     *    args = The list of D variables to act as IN arguments to the stored function.
     *
     * Returns: true if a non-NULL result was stored in target, false if the
     *          result was NULL (target is then left unchanged).
     * Throws: MYX if a statement has already been prepared for a different name,
     *         if no result set was produced, if the result is not a single column,
     *         or if a non-NULL result's type name differs from that of target.
     */
    bool execFunction(T, U...)(string name, ref T target, U args)
    {
        bool repeatCall = (name == _prevFunc);
        enforceEx!MYX(repeatCall || _hStmt == 0, "You must not prepare the statement before calling execFunction");
        if (!repeatCall)
        {
            // Build "select name(?,?,...)" with one placeholder per IN argument.
            _sql = "select " ~ name ~ "(";
            foreach (i; 0 .. U.length)
                _sql ~= (i == 0) ? "?" : ",?";
            _sql ~= ")";
            prepare();
            _prevFunc = name;
        }
        bindParameterTuple(args);
        ulong ra;
        enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");
        Row rr = getNextRow();
        /+enforceEx!MYX(rr._valid, "The result set was empty.");+/
        enforceEx!MYX(rr._values.length == 1, "Result was not a single column.");

        // The NULL test comes first: a NULL result is reported through the return
        // value, so the type comparison is only applied when there is a value to
        // copy. NOTE(review): presumably a NULL column does not hold a value of
        // type T - confirm against Row.
        immutable bool hasValue = !rr.isNull(0);
        if (hasValue)
        {
            enforceEx!MYX(typeid(target).toString() == rr._values[0].type.toString(),
                            "Target type and column type are not compatible.");
            target = rr._values[0].get!(T);
        }
        // If there were more rows, flush them away
        // Question: Should I check in purgeResult and throw if there were - it's very inefficient to
        // allow sloppy SQL that does not ensure just one row!
        purgeResult();
        return hasValue;
    }

    /**
     * Execute a stored procedure, with any required input variables.
     *
     * For this method, no query string is to be provided. The required one is
     * of the form "call proc(?, ? ...)".
* The method generates it and the
     * appropriate in bindings. Chunked transfers are not supported. If you
     * need them, create the parameters separately, then use execPrepared() or
     * execPreparedResult().
     *
     * In the interest of performance, this method assumes that the user has
     * the required information about the number and types of IN parameters.
     * In the same interest, if the method is called repeatedly for the same
     * stored procedure, prepare() and other redundant operations are omitted
     * after the first call.
     *
     * OUT parameters are not currently supported. It should generally be
     * possible with MySQL to present them as a result set.
     *
     * Params:
     *    T = Type tuple
     *    name = The name of the stored procedure.
     *    args = Tuple of args
     * Returns: True if the SP created a result set.
     */
    bool execProcedure(T...)(string name, ref T args)
    {
        // A call with the same name as the previous one reuses the prepared statement.
        immutable bool alreadyPrepared = (_prevFunc == name);
        enforceEx!MYX(alreadyPrepared || _hStmt == 0, "You must not prepare a statement before calling execProcedure");

        if (!alreadyPrepared)
        {
            // Build "call name(?,?,...)" with one placeholder per argument.
            _sql = "call " ~ name ~ "(";
            foreach (idx; 0 .. T.length)
                _sql ~= (idx == 0) ? "?" : ",?";
            _sql ~= ")";
            prepare();
            _prevFunc = name;
        }

        bindParameterTuple(args);
        ulong affected;
        return execPrepared(affected);
    }

    /// After a command that inserted a row into a table with an auto-increment
    /// ID column, this method allows you to retrieve the last insert ID.
@property ulong lastInsertID() pure const nothrow
    {
        return _insertID;
    }

    /// Gets the number of parameters in this Command
    @property ushort numParams() pure const nothrow
    {
        return _psParams;
    }

    /// Reports whether result set rows are still pending (a flag, not a count).
    @property bool rowsPending() pure const nothrow
    {
        return _rowsPending;
    }

    /// Gets the result header's field descriptions.
    @property FieldDescription[] resultFieldDescriptions() pure
    {
        return _rsh.fieldDescriptions;
    }

    /// Gets the prepared header's field descriptions.
    @property FieldDescription[] preparedFieldDescriptions() pure
    {
        return _psh.fieldDescriptions;
    }

    /// Gets the prepared header's param descriptions.
    @property ParamDescription[] preparedParamDescriptions() pure
    {
        return _psh.paramDescriptions;
    }
}