1 module mysql.protocol.commands; 2 3 import std.algorithm; 4 import std.conv; 5 import std.datetime; 6 import std.digest.sha; 7 import std.exception; 8 import std.range; 9 import std.socket; 10 import std.stdio; 11 import std.string; 12 import std.traits; 13 import std.variant; 14 15 import mysql.common; 16 import mysql.connection; 17 import mysql.protocol.constants; 18 import mysql.protocol.extra_types; 19 import mysql.protocol.packets; 20 21 /** 22 * Encapsulation of an SQL command or query. 23 * 24 * A Command be be either a one-off SQL query, or may use a prepared statement. 25 * Commands that are expected to return a result set - queries - have distinctive methods 26 * that are enforced. That is it will be an error to call such a method with an SQL command 27 * that does not produce a result set. 28 */ 29 struct Command 30 { 31 package: 32 Connection _con; 33 const(char)[] _sql; 34 uint _hStmt; 35 ulong _insertID; 36 bool _rowsPending, _headersPending, _pendingBinary, _rebound; 37 ushort _psParams, _psWarnings, _fieldCount; 38 ResultSetHeaders _rsh; 39 PreparedStmtHeaders _psh; 40 Variant[] _inParams; 41 ParameterSpecialization[] _psa; 42 string _prevFunc; 43 44 bool sendCmd(CommandType cmd) 45 { 46 enforceEx!MYX(!(_headersPending || _rowsPending), 47 "There are result set elements pending - purgeResult() required."); 48 49 scope(failure) _con.kill(); 50 _con.sendCmd(cmd, _sql); 51 return true; 52 } 53 54 static ubyte[] makeBitmap(in ParameterSpecialization[] psa) pure nothrow 55 { 56 size_t bml = (psa.length+7)/8; 57 ubyte[] bma; 58 bma.length = bml; 59 foreach (size_t i, PSN psn; psa) 60 { 61 if (!psn.isNull) 62 continue; 63 size_t bn = i/8; 64 size_t bb = i%8; 65 ubyte sr = 1; 66 sr <<= bb; 67 bma[bn] |= sr; 68 } 69 return bma; 70 } 71 72 ubyte[] makePSPrefix(ubyte flags = 0) pure const nothrow 73 { 74 ubyte[] prefix; 75 prefix.length = 14; 76 77 prefix[4] = CommandType.STMT_EXECUTE; 78 _hStmt.packInto(prefix[5..9]); 79 prefix[9] = flags; // flags, no cursor 80 prefix[10] = 1; // iteration count - currently always 1 81 prefix[11] = 0; 82 prefix[12] = 0; 83 prefix[13] = 0; 84 85 return prefix; 86 } 87 88 ubyte[] analyseParams(out ubyte[] vals, out bool longData) 89 { 90 size_t pc = _inParams.length; 91 ubyte[] types; 92 types.length = pc*2; 93 size_t alloc = pc*20; 94 vals.length = alloc; 95 uint vcl = 0, len; 96 int ct = 0; 97 98 void reAlloc(size_t n) 99 { 100 if (vcl+n < alloc) 101 return; 102 size_t inc = (alloc*3)/2; 103 if (inc < n) 104 inc = n; 105 alloc += inc; 106 vals.length = alloc; 107 } 108 109 foreach (size_t i; 0..pc) 110 { 111 if (_psa[i].chunkSize) 112 longData= true; 113 bool isnull = _psa[i].isNull; 114 Variant v = _inParams[i]; 115 SQLType ext = _psa[i].type; 116 string ts = v.type.toString(); 117 bool isRef; 118 if (ts[$-1] == '*') 119 { 120 ts.length = ts.length-1; 121 isRef= true; 122 } 123 124 enum UNSIGNED = 0x80; 125 enum SIGNED = 0; 126 switch (ts) 127 { 128 case "bool": 129 if (ext == SQLType.INFER_FROM_D_TYPE) 130 types[ct++] = SQLType.BIT; 131 else 132 types[ct++] = cast(ubyte) ext; 133 types[ct++] = SIGNED; 134 if (isnull) break; 135 reAlloc(2); 136 bool bv = isRef? *(v.get!(bool*)): v.get!(bool); 137 vals[vcl++] = 1; 138 vals[vcl++] = bv? 0x31: 0x30; 139 break; 140 case "byte": 141 types[ct++] = SQLType.TINY; 142 types[ct++] = SIGNED; 143 if (isnull) break; 144 reAlloc(1); 145 vals[vcl++] = isRef? *(v.get!(byte*)): v.get!(byte); 146 break; 147 case "ubyte": 148 types[ct++] = SQLType.TINY; 149 types[ct++] = UNSIGNED; 150 if (isnull) break; 151 reAlloc(1); 152 vals[vcl++] = isRef? *(v.get!(ubyte*)): v.get!(ubyte); 153 break; 154 case "short": 155 types[ct++] = SQLType.SHORT; 156 types[ct++] = SIGNED; 157 if (isnull) break; 158 reAlloc(2); 159 short si = isRef? *(v.get!(short*)): v.get!(short); 160 vals[vcl++] = cast(ubyte) (si & 0xff); 161 vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff); 162 break; 163 case "ushort": 164 types[ct++] = SQLType.SHORT; 165 types[ct++] = UNSIGNED; 166 reAlloc(2); 167 ushort us = isRef? *(v.get!(ushort*)): v.get!(ushort); 168 vals[vcl++] = cast(ubyte) (us & 0xff); 169 vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff); 170 break; 171 case "int": 172 types[ct++] = SQLType.INT; 173 types[ct++] = SIGNED; 174 if (isnull) break; 175 reAlloc(4); 176 int ii = isRef? *(v.get!(int*)): v.get!(int); 177 vals[vcl++] = cast(ubyte) (ii & 0xff); 178 vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff); 179 vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff); 180 vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff); 181 break; 182 case "uint": 183 types[ct++] = SQLType.INT; 184 types[ct++] = UNSIGNED; 185 if (isnull) break; 186 reAlloc(4); 187 uint ui = isRef? *(v.get!(uint*)): v.get!(uint); 188 vals[vcl++] = cast(ubyte) (ui & 0xff); 189 vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff); 190 vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff); 191 vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff); 192 break; 193 case "long": 194 types[ct++] = SQLType.LONGLONG; 195 types[ct++] = SIGNED; 196 if (isnull) break; 197 reAlloc(8); 198 long li = isRef? *(v.get!(long*)): v.get!(long); 199 vals[vcl++] = cast(ubyte) (li & 0xff); 200 vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff); 201 vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff); 202 vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff); 203 vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff); 204 vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff); 205 vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff); 206 vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff); 207 break; 208 case "ulong": 209 types[ct++] = SQLType.LONGLONG; 210 types[ct++] = UNSIGNED; 211 if (isnull) break; 212 reAlloc(8); 213 ulong ul = isRef? *(v.get!(ulong*)): v.get!(ulong); 214 vals[vcl++] = cast(ubyte) (ul & 0xff); 215 vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff); 216 vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff); 217 vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff); 218 vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff); 219 vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff); 220 vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff); 221 vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff); 222 break; 223 case "float": 224 types[ct++] = SQLType.FLOAT; 225 types[ct++] = SIGNED; 226 if (isnull) break; 227 reAlloc(4); 228 float f = isRef? *(v.get!(float*)): v.get!(float); 229 ubyte* ubp = cast(ubyte*) &f; 230 vals[vcl++] = *ubp++; 231 vals[vcl++] = *ubp++; 232 vals[vcl++] = *ubp++; 233 vals[vcl++] = *ubp; 234 break; 235 case "double": 236 types[ct++] = SQLType.DOUBLE; 237 types[ct++] = SIGNED; 238 if (isnull) break; 239 reAlloc(8); 240 double d = isRef? *(v.get!(double*)): v.get!(double); 241 ubyte* ubp = cast(ubyte*) &d; 242 vals[vcl++] = *ubp++; 243 vals[vcl++] = *ubp++; 244 vals[vcl++] = *ubp++; 245 vals[vcl++] = *ubp++; 246 vals[vcl++] = *ubp++; 247 vals[vcl++] = *ubp++; 248 vals[vcl++] = *ubp++; 249 vals[vcl++] = *ubp; 250 break; 251 case "std.datetime.Date": 252 types[ct++] = SQLType.DATE; 253 types[ct++] = SIGNED; 254 Date date = isRef? *(v.get!(Date*)): v.get!(Date); 255 ubyte[] da = pack(date); 256 size_t l = da.length; 257 if (isnull) break; 258 reAlloc(l); 259 vals[vcl..vcl+l] = da[]; 260 vcl += l; 261 break; 262 case "std.datetime.Time": 263 types[ct++] = SQLType.TIME; 264 types[ct++] = SIGNED; 265 TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay); 266 ubyte[] ta = pack(time); 267 size_t l = ta.length; 268 if (isnull) break; 269 reAlloc(l); 270 vals[vcl..vcl+l] = ta[]; 271 vcl += l; 272 break; 273 case "std.datetime.DateTime": 274 types[ct++] = SQLType.DATETIME; 275 types[ct++] = SIGNED; 276 DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime); 277 ubyte[] da = pack(dt); 278 size_t l = da.length; 279 if (isnull) break; 280 reAlloc(l); 281 vals[vcl..vcl+l] = da[]; 282 vcl += l; 283 break; 284 case "connect.Timestamp": 285 types[ct++] = SQLType.TIMESTAMP; 286 types[ct++] = SIGNED; 287 Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp); 288 DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep); 289 ubyte[] da = pack(dt); 290 size_t l = da.length; 291 if (isnull) break; 292 reAlloc(l); 293 vals[vcl..vcl+l] = da[]; 294 vcl += l; 295 break; 296 case "immutable(char)[]": 297 if (ext == SQLType.INFER_FROM_D_TYPE) 298 types[ct++] = SQLType.VARCHAR; 299 else 300 types[ct++] = cast(ubyte) ext; 301 types[ct++] = SIGNED; 302 if (isnull) break; 303 string s = isRef? *(v.get!(string*)): v.get!(string); 304 ubyte[] packed = packLCS(cast(void[]) s); 305 reAlloc(packed.length); 306 vals[vcl..vcl+packed.length] = packed[]; 307 vcl += packed.length; 308 break; 309 case "char[]": 310 if (ext == SQLType.INFER_FROM_D_TYPE) 311 types[ct++] = SQLType.VARCHAR; 312 else 313 types[ct++] = cast(ubyte) ext; 314 types[ct++] = SIGNED; 315 if (isnull) break; 316 char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]); 317 ubyte[] packed = packLCS(cast(void[]) ca); 318 reAlloc(packed.length); 319 vals[vcl..vcl+packed.length] = packed[]; 320 vcl += packed.length; 321 break; 322 case "byte[]": 323 if (ext == SQLType.INFER_FROM_D_TYPE) 324 types[ct++] = SQLType.TINYBLOB; 325 else 326 types[ct++] = cast(ubyte) ext; 327 types[ct++] = SIGNED; 328 if (isnull) break; 329 byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]); 330 ubyte[] packed = packLCS(cast(void[]) ba); 331 reAlloc(packed.length); 332 vals[vcl..vcl+packed.length] = packed[]; 333 vcl += packed.length; 334 break; 335 case "ubyte[]": 336 if (ext == SQLType.INFER_FROM_D_TYPE) 337 types[ct++] = SQLType.TINYBLOB; 338 else 339 types[ct++] = cast(ubyte) ext; 340 types[ct++] = SIGNED; 341 if (isnull) break; 342 ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]); 343 ubyte[] packed = packLCS(cast(void[]) uba); 344 reAlloc(packed.length); 345 vals[vcl..vcl+packed.length] = packed[]; 346 vcl += packed.length; 347 break; 348 case "void": 349 throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__); 350 default: 351 throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__); 352 } 353 } 354 vals.length = vcl; 355 return types; 356 } 357 358 void sendLongData() 359 { 360 assert(_psa.length <= ushort.max); // parameter number is sent as short 361 foreach (ushort i, PSN psn; _psa) 362 { 363 if (!psn.chunkSize) continue; 364 uint cs = psn.chunkSize; 365 uint delegate(ubyte[]) dg = psn.chunkDelegate; 366 367 ubyte[] chunk; 368 chunk.length = cs+11; 369 chunk.setPacketHeader(0 /*each chunk is separate cmd*/); 370 chunk[4] = CommandType.STMT_SEND_LONG_DATA; 371 _hStmt.packInto(chunk[5..9]); // statement handle 372 packInto(i, chunk[9..11]); // parameter number 373 374 // byte 11 on is payload 375 for (;;) 376 { 377 uint sent = dg(chunk[11..cs+11]); 378 if (sent < cs) 379 { 380 if (sent == 0) // data was exact multiple of chunk size - all sent 381 break; 382 sent += 7; // adjust for non-payload bytes 383 chunk.length = chunk.length - (cs-sent); // trim the chunk 384 packInto!(uint, true)(cast(uint)sent, chunk[0..3]); 385 _con.send(chunk); 386 break; 387 } 388 _con.send(chunk); 389 } 390 } 391 } 392 393 public: 394 395 /** 396 * Construct a naked Command object 397 * 398 * Params: con = A Connection object to communicate with the server 399 */ 400 this(Connection con) 401 { 402 _con = con; 403 _con.resetPacket(); 404 } 405 406 /** 407 * Construct a Command object complete with SQL 408 * 409 * Params: con = A Connection object to communicate with the server 410 * sql = SQL command string. 411 */ 412 this(Connection con, const(char)[] sql) 413 { 414 _sql = sql; 415 this(con); 416 } 417 418 @property 419 { 420 /// Get the current SQL for the Command 421 const(char)[] sql() pure const nothrow { return _sql; } 422 423 /** 424 * Set a new SQL command. 425 * 426 * This can have quite profound side effects. It resets the Command to 427 * an initial state. If a query has been issued on the Command that 428 * produced a result set, then all of the result set packets - field 429 * description sequence, EOF packet, result rows sequence, EOF packet 430 * must be flushed from the server before any further operation can be 431 * performed on the Connection. If you want to write speedy and efficient 432 * MySQL programs, you should bear this in mind when designing your 433 * queries so that you are not requesting many rows when one would do. 434 * 435 * Params: sql = SQL command string. 436 */ 437 const(char)[] sql(const(char)[] sql) 438 { 439 if (_hStmt) 440 { 441 purgeResult(); 442 releaseStatement(); 443 _con.resetPacket(); 444 } 445 return this._sql = sql; 446 } 447 } 448 449 /** 450 * Submit an SQL command to the server to be compiled into a prepared statement. 451 * 452 * The result of a successful outcome will be a statement handle - an ID - 453 * for the prepared statement, a count of the parameters required for 454 * excution of the statement, and a count of the columns that will be present 455 * in any result set that the command generates. Thes values will be stored 456 * in in the Command struct. 457 * 458 * The server will then proceed to send prepared statement headers, 459 * including parameter descriptions, and result set field descriptions, 460 * followed by an EOF packet. 461 * 462 * If there is an existing statement handle in the Command struct, that 463 * prepared statement is released. 464 * 465 * Throws: MySQLException if there are pending result set items, or if the 466 * server has a problem. 467 */ 468 void prepare() 469 { 470 enforceEx!MYX(!(_headersPending || _rowsPending), 471 "There are result set elements pending - purgeResult() required."); 472 473 scope(failure) _con.kill(); 474 475 if (_hStmt) 476 releaseStatement(); 477 _con.sendCmd(CommandType.STMT_PREPARE, _sql); 478 _fieldCount = 0; 479 480 ubyte[] packet = _con.getPacket(); 481 if (packet.front == ResultPacketMarker.ok) 482 { 483 packet.popFront(); 484 _hStmt = packet.consume!int(); 485 _fieldCount = packet.consume!short(); 486 _psParams = packet.consume!short(); 487 488 _inParams.length = _psParams; 489 _psa.length = _psParams; 490 491 packet.popFront(); // one byte filler 492 _psWarnings = packet.consume!short(); 493 494 // At this point the server also sends field specs for parameters 495 // and columns if there were any of each 496 _psh = PreparedStmtHeaders(_con, _fieldCount, _psParams); 497 } 498 else if(packet.front == ResultPacketMarker.error) 499 { 500 auto error = OKErrorPacket(packet); 501 enforcePacketOK(error); 502 assert(0); // FIXME: what now? 503 } 504 else 505 assert(0); // FIXME: what now? 506 } 507 508 /** 509 * Release a prepared statement. 510 * 511 * This method tells the server that it can dispose of the information it 512 * holds about the current prepared statement, and resets the Command 513 * object to an initial state in that respect. 514 */ 515 void releaseStatement() 516 { 517 scope(failure) _con.kill(); 518 519 ubyte[] packet; 520 packet.length = 9; 521 packet.setPacketHeader(0/*packet number*/); 522 _con.bumpPacket(); 523 packet[4] = CommandType.STMT_CLOSE; 524 _hStmt.packInto(packet[5..9]); 525 purgeResult(); 526 _con.send(packet); 527 // It seems that the server does not find it necessary to send a response 528 // for this command. 529 _hStmt = 0; 530 } 531 532 /** 533 * Flush any outstanding result set elements. 534 * 535 * When the server responds to a command that produces a result set, it 536 * queues the whole set of corresponding packets over the current connection. 537 * Before that Connection can embark on any new command, it must receive 538 * all of those packets and junk them. 539 * http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/ 540 */ 541 ulong purgeResult() 542 { 543 scope(failure) _con.kill(); 544 545 ulong rows = 0; 546 if (_fieldCount) 547 { 548 if (_headersPending) 549 { 550 for (size_t i = 0;; i++) 551 { 552 if (_con.getPacket().isEOFPacket()) 553 { 554 _headersPending = false; 555 break; 556 } 557 enforceEx!MYXProtocol(i < _fieldCount, "Field header count exceeded but no EOF packet found."); 558 } 559 } 560 if (_rowsPending) 561 { 562 for (;; rows++) 563 { 564 if (_con.getPacket().isEOFPacket()) 565 { 566 _rowsPending = _pendingBinary = false; 567 break; 568 } 569 } 570 } 571 } 572 _fieldCount = 0; 573 _con.resetPacket(); 574 return rows; 575 } 576 577 /** 578 * Bind a D variable to a prepared statement parameter. 579 * 580 * In this implementation, binding comprises setting a value into the 581 * appropriate element of an array of Variants which represent the 582 * parameters, and setting any required specializations. 583 * 584 * To bind to some D variable, we set the corrsponding variant with its 585 * address, so there is no need to rebind between calls to execPreparedXXX. 586 */ 587 void bindParameter(T)(ref T val, size_t pIndex, ParameterSpecialization psn = PSN(0, false, SQLType.INFER_FROM_D_TYPE, 0, null)) 588 { 589 // Now in theory we should be able to check the parameter type here, since the 590 // protocol is supposed to send us type information for the parameters, but this 591 // capability seems to be broken. This assertion is supported by the fact that 592 // the same information is not available via the MySQL C API either. It is up 593 // to the programmer to ensure that appropriate type information is embodied 594 // in the variant array, or provided explicitly. This sucks, but short of 595 // having a client side SQL parser I don't see what can be done. 596 // 597 // We require that the statement be prepared at this point so we can at least 598 // check that the parameter number is within the required range 599 enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound."); 600 enforceEx!MYX(pIndex < _psParams, "Parameter number is out of range for the prepared statement."); 601 _inParams[pIndex] = &val; 602 psn.pIndex = pIndex; 603 _psa[pIndex] = psn; 604 } 605 606 /** 607 * Bind a tuple of D variables to the parameters of a prepared statement. 608 * 609 * You can use this method to bind a set of variables if you don't need any specialization, 610 * that is there will be no null values, and chunked transfer is not neccessary. 611 * 612 * The tuple must match the required number of parameters, and it is the programmer's 613 * responsibility to ensure that they are of appropriate types. 614 */ 615 void bindParameterTuple(T...)(ref T args) 616 { 617 enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound."); 618 enforceEx!MYX(args.length == _psParams, "Argument list supplied does not match the number of parameters."); 619 foreach (size_t i, dummy; args) 620 _inParams[i] = &args[i]; 621 } 622 623 /** 624 * Bind a Variant[] as the parameters of a prepared statement. 625 * 626 * You can use this method to bind a set of variables in Variant form to 627 * the parameters of a prepared statement. 628 * 629 * Parameter specializations can be added if required. This method could be 630 * used to add records from a data entry form along the lines of 631 * ------------ 632 * auto c = Command(con, "insert into table42 values(?, ?, ?)"); 633 * c.prepare(); 634 * Variant[] va; 635 * va.length = 3; 636 * c.bindParameters(va); 637 * DataRecord dr; // Some data input facility 638 * ulong ra; 639 * do 640 * { 641 * dr.get(); 642 * va[0] = dr("Name"); 643 * va[1] = dr("City"); 644 * va[2] = dr("Whatever"); 645 * c.execPrepared(ra); 646 * } while(tod < "17:30"); 647 * ------------ 648 * Params: va = External list of Variants to be used as parameters 649 * psnList = any required specializations 650 */ 651 void bindParameters(Variant[] va, ParameterSpecialization[] psnList= null) 652 { 653 enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound."); 654 enforceEx!MYX(va.length == _psParams, "Param count supplied does not match prepared statement"); 655 _inParams[] = va[]; 656 if (psnList !is null) 657 { 658 foreach (PSN psn; psnList) 659 _psa[psn.pIndex] = psn; 660 } 661 } 662 663 /** 664 * Access a prepared statement parameter for update. 665 * 666 * Another style of usage would simply update the parameter Variant directly 667 * 668 * ------------ 669 * c.param(0) = 42; 670 * c.param(1) = "The answer"; 671 * ------------ 672 * Params: index = The zero based index 673 */ 674 ref Variant param(size_t index) pure 675 { 676 enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound."); 677 enforceEx!MYX(index < _psParams, "Parameter index out of range."); 678 return _inParams[index]; 679 } 680 681 /** 682 * Sets a prepared statement parameter to NULL. 683 * 684 * Params: index = The zero based index 685 */ 686 void setNullParam(size_t index) 687 { 688 enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound."); 689 enforceEx!MYX(index < _psParams, "Parameter index out of range."); 690 _psa[index].isNull = true; 691 _inParams[index] = ""; 692 } 693 694 /** 695 * Execute a one-off SQL command. 696 * 697 * Use this method when you are not going to be using the same command repeatedly. 698 * It can be used with commands that don't produce a result set, or those that 699 * do. If there is a result set its existence will be indicated by the return value. 700 * 701 * Any result set can be accessed vis getNextRow(), but you should really be 702 * using execSQLResult() or execSQLSequence() for such queries. 703 * 704 * Params: ra = An out parameter to receive the number of rows affected. 705 * Returns: true if there was a (possibly empty) result set. 706 */ 707 bool execSQL(out ulong ra) 708 { 709 scope(failure) _con.kill(); 710 711 _con.sendCmd(CommandType.QUERY, _sql); 712 _fieldCount = 0; 713 ubyte[] packet = _con.getPacket(); 714 bool rv; 715 if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error) 716 { 717 _con.resetPacket(); 718 auto okp = OKErrorPacket(packet); 719 enforcePacketOK(okp); 720 ra = okp.affected; 721 _con._serverStatus = okp.serverStatus; 722 _insertID = okp.insertID; 723 rv = false; 724 } 725 else 726 { 727 // There was presumably a result set 728 assert(packet.front >= 1 && packet.front <= 250); // ResultSet packet header should have this value 729 _headersPending = _rowsPending = true; 730 _pendingBinary = false; 731 auto lcb = packet.consumeIfComplete!LCB(); 732 assert(!lcb.isNull); 733 assert(!lcb.isIncomplete); 734 _fieldCount = cast(ushort)lcb.value; 735 assert(_fieldCount == lcb.value); 736 rv = true; 737 ra = 0; 738 } 739 return rv; 740 } 741 742 /** 743 * Execute a one-off SQL command for the case where you expect a result set, 744 * and want it all at once. 745 * 746 * Use this method when you are not going to be using the same command repeatedly. 747 * This method will throw if the SQL command does not produce a result set. 748 * 749 * If there are long data items among the expected result columns you can specify 750 * that they are to be subject to chunked transfer via a delegate. 751 * 752 * Params: csa = An optional array of ColumnSpecialization structs. 753 * Returns: A (possibly empty) ResultSet. 754 */ 755 ResultSet execSQLResult(ColumnSpecialization[] csa = null) 756 { 757 ulong ra; 758 enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set."); 759 760 _rsh = ResultSetHeaders(_con, _fieldCount); 761 if (csa !is null) 762 _rsh.addSpecializations(csa); 763 _headersPending = false; 764 765 Row[] rows; 766 while(true) 767 { 768 auto packet = _con.getPacket(); 769 if(packet.isEOFPacket()) 770 break; 771 rows ~= Row(_con, packet, _rsh, false); 772 // As the row fetches more data while incomplete, it might already have 773 // fetched the EOF marker, so we have to check it again 774 if(!packet.empty && packet.isEOFPacket()) 775 break; 776 } 777 _rowsPending = _pendingBinary = false; 778 779 return ResultSet(rows, _rsh.fieldNames); 780 } 781 782 /** 783 * Execute a one-off SQL command for the case where you expect a result set, 784 * and want to deal with it a row at a time. 785 * 786 * Use this method when you are not going to be using the same command repeatedly. 787 * This method will throw if the SQL command does not produce a result set. 788 * 789 * If there are long data items among the expected result columns you can specify 790 * that they are to be subject to chunked transfer via a delegate. 791 * 792 * Params: csa = An optional array of ColumnSpecialization structs. 793 * Returns: A (possibly empty) ResultSequence. 794 */ 795 ResultSequence execSQLSequence(ColumnSpecialization[] csa = null) 796 { 797 uint alloc = 20; 798 Row[] rra; 799 rra.length = alloc; 800 uint cr = 0; 801 ulong ra; 802 enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set."); 803 _rsh = ResultSetHeaders(_con, _fieldCount); 804 if (csa !is null) 805 _rsh.addSpecializations(csa); 806 807 _headersPending = false; 808 return ResultSequence(&this, _rsh.fieldNames); 809 } 810 811 /** 812 * Execute a one-off SQL command to place result values into a set of D variables. 813 * 814 * Use this method when you are not going to be using the same command repeatedly. 815 * It will throw if the specified command does not produce a result set, or if 816 * any column type is incompatible with the corresponding D variable. 817 * 818 * Params: args = A tuple of D variables to receive the results. 819 * Returns: true if there was a (possibly empty) result set. 820 */ 821 void execSQLTuple(T...)(ref T args) 822 { 823 ulong ra; 824 enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set."); 825 Row rr = getNextRow(); 826 /+if (!rr._valid) // The result set was empty - not a crime. 827 return;+/ 828 enforceEx!MYX(rr._values.length == args.length, "Result column count does not match the target tuple."); 829 foreach (size_t i, dummy; args) 830 { 831 enforceEx!MYX(typeid(args[i]).toString() == rr._values[i].type.toString(), 832 "Tuple "~to!string(i)~" type and column type are not compatible."); 833 args[i] = rr._values[i].get!(typeof(args[i])); 834 } 835 // If there were more rows, flush them away 836 // Question: Should I check in purgeResult and throw if there were - it's very inefficient to 837 // allow sloppy SQL that does not ensure just one row! 838 purgeResult(); 839 } 840 841 /** 842 * Execute a prepared command. 843 * 844 * Use this method when you will use the same SQL command repeatedly. 845 * It can be used with commands that don't produce a result set, or those that 846 * do. If there is a result set its existence will be indicated by the return value. 847 * 848 * Any result set can be accessed vis getNextRow(), but you should really be 849 * using execPreparedResult() or execPreparedSequence() for such queries. 850 * 851 * Params: ra = An out parameter to receive the number of rows affected. 852 * Returns: true if there was a (possibly empty) result set. 853 */ 854 bool execPrepared(out ulong ra) 855 { 856 enforceEx!MYX(_hStmt, "The statement has not been prepared."); 857 scope(failure) _con.kill(); 858 859 ubyte[] packet; 860 _con.resetPacket(); 861 862 ubyte[] prefix = makePSPrefix(0); 863 size_t len = prefix.length; 864 bool longData; 865 866 if (_psh._paramCount) 867 { 868 ubyte[] one = [ 1 ]; 869 ubyte[] vals; 870 ubyte[] types = analyseParams(vals, longData); 871 ubyte[] nbm = makeBitmap(_psa); 872 packet = prefix ~ nbm ~ one ~ types ~ vals; 873 } 874 else 875 packet = prefix; 876 877 if (longData) 878 sendLongData(); 879 880 assert(packet.length <= uint.max); 881 packet.setPacketHeader(_con.pktNumber); 882 _con.bumpPacket(); 883 _con.send(packet); 884 packet = _con.getPacket(); 885 bool rv; 886 if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error) 887 { 888 _con.resetPacket(); 889 auto okp = OKErrorPacket(packet); 890 enforcePacketOK(okp); 891 ra = okp.affected; 892 _con._serverStatus = okp.serverStatus; 893 _insertID = okp.insertID; 894 rv = false; 895 } 896 else 897 { 898 // There was presumably a result set 899 _headersPending = _rowsPending = _pendingBinary = true; 900 auto lcb = packet.consumeIfComplete!LCB(); 901 assert(!lcb.isIncomplete); 902 _fieldCount = cast(ushort)lcb.value; 903 rv = true; 904 } 905 return rv; 906 } 907 908 /** 909 * Execute a prepared SQL command for the case where you expect a result set, 910 * and want it all at once. 911 * 912 * Use this method when you will use the same command repeatedly. 913 * This method will throw if the SQL command does not produce a result set. 914 * 915 * If there are long data items among the expected result columns you can specify 916 * that they are to be subject to chunked transfer via a delegate. 917 * 918 * Params: csa = An optional array of ColumnSpecialization structs. 919 * Returns: A (possibly empty) ResultSet. 920 */ 921 ResultSet execPreparedResult(ColumnSpecialization[] csa = null) 922 { 923 ulong ra; 924 enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set."); 925 uint alloc = 20; 926 Row[] rra; 927 rra.length = alloc; 928 uint cr = 0; 929 _rsh = ResultSetHeaders(_con, _fieldCount); 930 if (csa !is null) 931 _rsh.addSpecializations(csa); 932 _headersPending = false; 933 ubyte[] packet; 934 for (size_t i = 0;; i++) 935 { 936 packet = _con.getPacket(); 937 if (packet.isEOFPacket()) 938 break; 939 Row row = Row(_con, packet, _rsh, true); 940 if (cr >= alloc) 941 { 942 alloc = (alloc*3)/2; 943 rra.length = alloc; 944 } 945 rra[cr++] = row; 946 if (!packet.empty && packet.isEOFPacket()) 947 break; 948 } 949 _rowsPending = _pendingBinary = false; 950 rra.length = cr; 951 ResultSet rs = ResultSet(rra, _rsh.fieldNames); 952 return rs; 953 } 954 955 /** 956 * Execute a prepared SQL command for the case where you expect a result set, 957 * and want to deal with it one row at a time. 958 * 959 * Use this method when you will use the same command repeatedly. 960 * This method will throw if the SQL command does not produce a result set. 961 * 962 * If there are long data items among the expected result columns you can 963 * specify that they are to be subject to chunked transfer via a delegate. 964 * 965 * Params: csa = An optional array of ColumnSpecialization structs. 966 * Returns: A (possibly empty) ResultSequence. 967 */ 968 ResultSequence execPreparedSequence(ColumnSpecialization[] csa = null) 969 { 970 ulong ra; 971 enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set."); 972 uint alloc = 20; 973 Row[] rra; 974 rra.length = alloc; 975 uint cr = 0; 976 _rsh = ResultSetHeaders(_con, _fieldCount); 977 if (csa !is null) 978 _rsh.addSpecializations(csa); 979 _headersPending = false; 980 return ResultSequence(&this, _rsh.fieldNames); 981 } 982 983 /** 984 * Execute a prepared SQL command to place result values into a set of D variables. 985 * 986 * Use this method when you will use the same command repeatedly. 987 * It will throw if the specified command does not produce a result set, or 988 * if any column type is incompatible with the corresponding D variable 989 * 990 * Params: args = A tuple of D variables to receive the results. 991 * Returns: true if there was a (possibly empty) result set. 992 */ 993 void execPreparedTuple(T...)(ref T args) 994 { 995 ulong ra; 996 enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set."); 997 Row rr = getNextRow(); 998 // enforceEx!MYX(rr._valid, "The result set was empty."); 999 enforceEx!MYX(rr._values.length == args.length, "Result column count does not match the target tuple."); 1000 foreach (size_t i, dummy; args) 1001 { 1002 enforceEx!MYX(typeid(args[i]).toString() == rr._values[i].type.toString(), 1003 "Tuple "~to!string(i)~" type and column type are not compatible."); 1004 args[i] = rr._values[i].get!(typeof(args[i])); 1005 } 1006 // If there were more rows, flush them away 1007 // Question: Should I check in purgeResult and throw if there were - it's very inefficient to 1008 // allow sloppy SQL that does not ensure just one row! 1009 purgeResult(); 1010 } 1011 1012 /** 1013 * Get the next Row of a pending result set. 1014 * 1015 * This method can be used after either execSQL() or execPrepared() have returned true 1016 * to retrieve result set rows sequentially. 1017 * 1018 * Similar functionality is available via execSQLSequence() and execPreparedSequence() in 1019 * which case the interface is presented as a forward range of Rows. 1020 * 1021 * This method allows you to deal with very large result sets either a row at a time, 1022 * or by feeding the rows into some suitable container such as a linked list. 1023 * 1024 * Returns: A Row object. 1025 */ 1026 Row getNextRow() 1027 { 1028 scope(failure) _con.kill(); 1029 1030 if (_headersPending) 1031 { 1032 _rsh = ResultSetHeaders(_con, _fieldCount); 1033 _headersPending = false; 1034 } 1035 ubyte[] packet; 1036 Row rr; 1037 packet = _con.getPacket(); 1038 if (packet.isEOFPacket()) 1039 { 1040 _rowsPending = _pendingBinary = false; 1041 return rr; 1042 } 1043 if (_pendingBinary) 1044 rr = Row(_con, packet, _rsh, true); 1045 else 1046 rr = Row(_con, packet, _rsh, false); 1047 //rr._valid = true; 1048 return rr; 1049 } 1050 1051 /** 1052 * Execute a stored function, with any required input variables, and store the 1053 * return value into a D variable. 1054 * 1055 * For this method, no query string is to be provided. The required one is of 1056 * the form "select foo(?, ? ...)". The method generates it and the appropriate 1057 * bindings - in, and out. Chunked transfers are not supported in either 1058 * direction. If you need them, create the parameters separately, then use 1059 * execPreparedResult() to get a one-row, one-column result set. 1060 * 1061 * If it is not possible to convert the column value to the type of target, 1062 * then execFunction will throw. If the result is NULL, that is indicated 1063 * by a false return value, and target is unchanged. 1064 * 1065 * In the interest of performance, this method assumes that the user has the 1066 * equired information about the number and types of IN parameters and the 1067 * type of the output variable. In the same interest, if the method is called 1068 * repeatedly for the same stored function, prepare() is omitted after the first call. 1069 * 1070 * Params: 1071 * T = The type of the variable to receive the return result. 1072 * U = type tuple of arguments 1073 * name = The name of the stored function. 1074 * target = the D variable to receive the stored function return result. 1075 * args = The list of D variables to act as IN arguments to the stored function. 1076 * 1077 */ 1078 bool execFunction(T, U...)(string name, ref T target, U args) 1079 { 1080 bool repeatCall = (name == _prevFunc); 1081 enforceEx!MYX(repeatCall || _hStmt == 0, "You must not prepare the statement before calling execFunction"); 1082 if (!repeatCall) 1083 { 1084 _sql = "select " ~ name ~ "("; 1085 bool comma = false; 1086 foreach (arg; args) 1087 { 1088 if (comma) 1089 _sql ~= ",?"; 1090 else 1091 { 1092 _sql ~= "?"; 1093 comma = true; 1094 } 1095 } 1096 _sql ~= ")"; 1097 prepare(); 1098 _prevFunc = name; 1099 } 1100 bindParameterTuple(args); 1101 ulong ra; 1102 enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set."); 1103 Row rr = getNextRow(); 1104 /+enforceEx!MYX(rr._valid, "The result set was empty.");+/ 1105 enforceEx!MYX(rr._values.length == 1, "Result was not a single column."); 1106 enforceEx!MYX(typeid(target).toString() == rr._values[0].type.toString(), 1107 "Target type and column type are not compatible."); 1108 if (!rr.isNull(0)) 1109 target = rr._values[0].get!(T); 1110 // If there were more rows, flush them away 1111 // Question: Should I check in purgeResult and throw if there were - it's very inefficient to 1112 // allow sloppy SQL that does not ensure just one row! 1113 purgeResult(); 1114 return !rr.isNull(0); 1115 } 1116 1117 /** 1118 * Execute a stored procedure, with any required input variables. 1119 * 1120 * For this method, no query string is to be provided. The required one is 1121 * of the form "call proc(?, ? ...)". The method generates it and the 1122 * appropriate in bindings. Chunked transfers are not supported. If you 1123 * need them, create the parameters separately, then use execPrepared() or 1124 * execPreparedResult(). 1125 * 1126 * In the interest of performance, this method assumes that the user has 1127 * the required information about the number and types of IN parameters. 1128 * In the same interest, if the method is called repeatedly for the same 1129 * stored function, prepare() and other redundant operations are omitted 1130 * after the first call. 1131 * 1132 * OUT parameters are not currently supported. It should generally be 1133 * possible with MySQL to present them as a result set. 1134 * 1135 * Params: 1136 * T = Type tuple 1137 * name = The name of the stored procedure. 1138 * args = Tuple of args 1139 * Returns: True if the SP created a result set. 1140 */ 1141 bool execProcedure(T...)(string name, ref T args) 1142 { 1143 bool repeatCall = (name == _prevFunc); 1144 enforceEx!MYX(repeatCall || _hStmt == 0, "You must not prepare a statement before calling execProcedure"); 1145 if (!repeatCall) 1146 { 1147 _sql = "call " ~ name ~ "("; 1148 bool comma = false; 1149 foreach (arg; args) 1150 { 1151 if (comma) 1152 _sql ~= ",?"; 1153 else 1154 { 1155 _sql ~= "?"; 1156 comma = true; 1157 } 1158 } 1159 _sql ~= ")"; 1160 prepare(); 1161 _prevFunc = name; 1162 } 1163 bindParameterTuple(args); 1164 ulong ra; 1165 return execPrepared(ra); 1166 } 1167 1168 /// After a command that inserted a row into a table with an auto-increment 1169 /// ID column, this method allows you to retrieve the last insert ID. 1170 @property ulong lastInsertID() pure const nothrow { return _insertID; } 1171 1172 /// Gets the number of parameters in this Command 1173 @property ushort numParams() pure const nothrow 1174 { 1175 return _psParams; 1176 } 1177 1178 /// Gets the number of rows pending 1179 @property bool rowsPending() pure const nothrow { return _rowsPending; } 1180 1181 /// Gets the result header's field descriptions. 1182 @property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; } 1183 /// Gets the prepared header's field descriptions. 1184 @property FieldDescription[] preparedFieldDescriptions() pure { return _psh.fieldDescriptions; } 1185 /// Gets the prepared header's param descriptions. 1186 @property ParamDescription[] preparedParamDescriptions() pure { return _psh.paramDescriptions; } 1187 }