/// Connect to a MySQL/MariaDB server.
module mysql.connection;

import std.algorithm;
import std.conv;
import std.digest.sha;
import std.exception;
import std.range;
import std.socket;
import std.string;
import std.typecons;

import mysql.commands;
import mysql.exceptions;
import mysql.prepared;
import mysql.protocol.constants;
import mysql.protocol.packets;
import mysql.protocol.sockets;
import mysql.result;
debug(MYSQLN_TESTS)
{
	import mysql.test.common;
}

version(Have_vibe_d_core)
{
	static if(__traits(compiles, (){ import vibe.core.net; } ))
		import vibe.core.net;
	else
		static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
}

/// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
immutable SvrCapFlags defaultClientFlags =
		SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
		SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
		SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
		//SvrCapFlags.MULTI_RESULTS;

/++
Submit an SQL command to the server to be compiled into a prepared statement.

The statement is registered on the supplied connection right away.
The returned `Prepared` is not tied to that connection, though: it may be
used on ANY `Connection`, and will be registered there automatically the
first time it is used. If you prefer eager registration, pass it to
`Connection.register`.

Internally, the result of a successful outcome will be a statement handle - an ID -
for the prepared statement, a count of the parameters required for
execution of the statement, and a count of the columns that will be present
in any result set that the command generates.

The server will then proceed to send prepared statement headers,
including parameter descriptions, and result set field descriptions,
followed by an EOF packet.

Params:
	conn = The connection on which the statement is registered first.
	sql  = The SQL text of the statement.

Throws: `mysql.exceptions.MYX` if the server has a problem.
+/
Prepared prepare(Connection conn, string sql)
{
	// Returns the cached info when this connection already knows the statement.
	auto serverInfo = conn.registerIfNeeded(sql);
	return Prepared(sql, serverInfo.headers, serverInfo.numParams);
}

/++
This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.

See `BackwardCompatPrepared` for more info.
+/
deprecated("This is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. You should migrate from this to the Prepared-compatible exec/query overloads in 'mysql.commands'.")
BackwardCompatPrepared prepareBackwardCompat(Connection conn, string sql)
{
	auto statement = prepare(conn, sql);
	return BackwardCompatPrepared(conn, statement);
}

/++
Convenience function to create a prepared statement which calls a stored function.

The generated SQL is `select name(?,?,...)` with `numArgs` placeholders.

Be careful that your numArgs is correct. If it isn't, you may get a
`mysql.exceptions.MYX` with a very unclear error message.

Throws: `mysql.exceptions.MYX` if the server has a problem.

Params:
	name = The name of the stored function.
	numArgs = The number of arguments the stored function takes.
+/
Prepared prepareFunction(Connection conn, string name, int numArgs)
{
	return prepare(conn, "select " ~ name ~ preparedPlaceholderArgs(numArgs));
}

///
unittest
{
	debug(MYSQLN_TESTS)
	{
		import mysql.test.common;
		mixin(scopedCn);

		exec(cn, `DROP FUNCTION IF EXISTS hello`);
		exec(cn, `
			CREATE FUNCTION hello (s CHAR(20))
			RETURNS CHAR(50) DETERMINISTIC
			RETURN CONCAT('Hello ',s,'!')
		`);

		auto preparedHello = prepareFunction(cn, "hello", 1);
		preparedHello.setArgs("World");
		auto rs = cn.query(preparedHello).array;
		assert(rs.length == 1);
		assert(rs[0][0] == "Hello World!");
	}
}

/++
Convenience function to create a prepared statement which calls a stored procedure.

The generated SQL is `call name(?,?,...)` with `numArgs` placeholders.

OUT parameters are currently not supported. It should generally be
possible with MySQL to present them as a result set.

Be careful that your numArgs is correct. If it isn't, you may get a
`mysql.exceptions.MYX` with a very unclear error message.

Throws: `mysql.exceptions.MYX` if the server has a problem.

Params:
	name = The name of the stored procedure.
	numArgs = The number of arguments the stored procedure takes.

+/
Prepared prepareProcedure(Connection conn, string name, int numArgs)
{
	return prepare(conn, "call " ~ name ~ preparedPlaceholderArgs(numArgs));
}

///
unittest
{
	debug(MYSQLN_TESTS)
	{
		import mysql.test.common;
		import mysql.test.integration;
		mixin(scopedCn);
		initBaseTestTables(cn);

		exec(cn, `DROP PROCEDURE IF EXISTS insert2`);
		exec(cn, `
			CREATE PROCEDURE insert2 (IN p1 INT, IN p2 CHAR(50))
			BEGIN
				INSERT INTO basetest (intcol, stringcol) VALUES(p1, p2);
			END
		`);

		auto preparedInsert2 = prepareProcedure(cn, "insert2", 2);
		preparedInsert2.setArgs(2001, "inserted string 1");
		cn.exec(preparedInsert2);

		auto rs = query(cn, "SELECT stringcol FROM basetest WHERE intcol=2001").array;
		assert(rs.length == 1);
		assert(rs[0][0] == "inserted string 1");
	}
}

/// Build the parenthesised, comma-separated placeholder list for `numArgs`
/// arguments, e.g. `"(?,?,?)"` for 3. Zero (or fewer) arguments yields `"()"`.
private string preparedPlaceholderArgs(int numArgs)
{
	string placeholders = "(";
	foreach(index; 0..numArgs)
	{
		// Every placeholder except the first is preceded by a comma.
		placeholders ~= (index == 0) ? "?" : ",?";
	}
	placeholders ~= ")";

	return placeholders;
}

debug(MYSQLN_TESTS)
unittest
{
	assert(preparedPlaceholderArgs(3) == "(?,?,?)");
	assert(preparedPlaceholderArgs(2) == "(?,?)");
	assert(preparedPlaceholderArgs(1) == "(?)");
	assert(preparedPlaceholderArgs(0) == "()");
}

/// Per-connection info from the server about a registered prepared statement.
package struct PreparedServerInfo
{
	/// Server's identifier for this prepared statement.
	/// Apparently, this is never 0 if it's been registered,
	/// although mysql-native no longer relies on that.
	uint statementId;

	/// NOTE(review): presumably the warning count the server reported when
	/// the statement was prepared - confirm against the code that fills it in.
	ushort psWarnings;

	/// Number of parameters this statement takes.
	///
	/// This will be the same on all connections, but it's returned
	/// by the server upon registration, so it's stored here.
	ushort numParams;

	/// Prepared statement headers
	///
	/// This will be the same on all connections, but it's returned
	/// by the server upon registration, so it's stored here.
	PreparedStmtHeaders headers;

	/// Not actually from the server. Connection uses this to keep track
	/// of statements that should be treated as having been released.
	bool queuedForRelease = false;
}

/++
This is a wrapper over `Prepared` which is provided ONLY as a
temporary aid in upgrading to mysql-native v2.0.0 and its
new connection-independent model of prepared statements.

In most cases, this layer shouldn't even be needed. But if you have many
lines of code making calls to exec/query the same prepared statement,
then this may be helpful.

To use this temporary compatibility layer, change instances of:

---
auto stmt = conn.prepare(...);
---

to:

---
auto stmt = conn.prepareBackwardCompat(...);
---

And then your prepared statement should work as before.

BUT DO NOT LEAVE IT LIKE THIS! Ultimately, you should update
your prepared statement code to the mysql-native v2.0.0 API, by changing
instances of:

---
stmt.exec()
stmt.query()
stmt.queryRow()
stmt.queryRowTuple(outputArgs...)
stmt.queryValue()
---

to:

---
conn.exec(stmt)
conn.query(stmt)
conn.queryRow(stmt)
conn.queryRowTuple(stmt, outputArgs...)
conn.queryValue(stmt)
---

Both of the above syntaxes can be used with a `BackwardCompatPrepared`
(the `Connection` passed directly to `exec`/`query` will override the
one associated with your `BackwardCompatPrepared`).

Once all of your code is updated, you can change `prepareBackwardCompat`
back to `prepare` again, and your upgrade will be complete.
+/
struct BackwardCompatPrepared
{
	import std.variant;

	// Connection the deprecated member functions below forward to.
	private Connection _conn;
	// The wrapped statement; also exposed through `alias this` below.
	Prepared _prepared;

	/// Access underlying `Prepared`
	@property Prepared prepared() { return _prepared; }

	// Lets a BackwardCompatPrepared be used wherever a `Prepared` is expected.
	alias _prepared this;

	/++
	This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.

	See `BackwardCompatPrepared` for more info.
	+/
	deprecated("Change 'preparedStmt.exec()' to 'conn.exec(preparedStmt)'")
	ulong exec()
	{
		// The leading dot selects the module-scope `exec`, not this member.
		return .exec(_conn, _prepared);
	}

	///ditto
	deprecated("Change 'preparedStmt.query()' to 'conn.query(preparedStmt)'")
	ResultRange query(ColumnSpecialization[] csa = null)
	{
		return .query(_conn, _prepared, csa);
	}

	///ditto
	deprecated("Change 'preparedStmt.queryRow()' to 'conn.queryRow(preparedStmt)'")
	Nullable!Row queryRow(ColumnSpecialization[] csa = null)
	{
		return .queryRow(_conn, _prepared, csa);
	}

	///ditto
	// The constraint rejects calls whose first argument is a Connection, so
	// `conn.queryRowTuple(stmt, ...)`-style calls never resolve to this member.
	deprecated("Change 'preparedStmt.queryRowTuple(outArgs...)' to 'conn.queryRowTuple(preparedStmt, outArgs...)'")
	void queryRowTuple(T...)(ref T args) if(T.length == 0 || !is(T[0] : Connection))
	{
		return .queryRowTuple(_conn, _prepared, args);
	}

	///ditto
	deprecated("Change 'preparedStmt.queryValue()' to 'conn.queryValue(preparedStmt)'")
	Nullable!Variant queryValue(ColumnSpecialization[] csa = null)
	{
		return .queryValue(_conn, _prepared, csa);
	}
}

//TODO: All low-level comms should be moved into the mysql.protocol package.
/// Low-level comms code relating to prepared statements.
package struct ProtocolPrepared
{
	import std.conv;
	import std.datetime;
	import std.variant;
	import mysql.types;

	/++
	Build the NULL bitmap for a set of statement parameters.

	The result holds one bit per parameter (bit `i%8` of byte `i/8`),
	set when parameter `i` holds `null`.

	Params:
		inParams = The parameter values bound to the statement.

	Returns: A bitmap of `(inParams.length+7)/8` bytes.
	+/
	static ubyte[] makeBitmap(in Variant[] inParams)
	{
		size_t bml = (inParams.length+7)/8;
		ubyte[] bma;
		bma.length = bml;
		foreach (i; 0..inParams.length)
		{
			// Only null parameters get their bit set.
			if(inParams[i].type != typeid(typeof(null)))
				continue;
			size_t bn = i/8;
			size_t bb = i%8;
			ubyte sr = 1;
			sr <<= bb;
			bma[bn] |= sr;
		}
		return bma;
	}

	/++
	Build the fixed 14-byte start of a STMT_EXECUTE packet.

	Bytes 0..4 are left zeroed for the packet header, which the caller
	fills in once the full packet length is known.

	Params:
		hStmt = The server's handle for the prepared statement.
		flags = Flags byte (0 means no cursor).

	Returns: The 14-byte prefix.
	+/
	static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
	{
		ubyte[] prefix;
		prefix.length = 14;

		prefix[4] = CommandType.STMT_EXECUTE;
		hStmt.packInto(prefix[5..9]);
		prefix[9] = flags;   // flags, no cursor
		prefix[10] = 1; // iteration count - currently always 1
		prefix[11] = 0;
		prefix[12] = 0;
		prefix[13] = 0;

		return prefix;
	}

	//TODO: All low-level comms should be moved into the mysql.protocol package.
	/++
	Serialise the bound parameters of a prepared statement.

	Params:
		inParams = The parameter values. A value may also be a pointer to
		           one of the supported types, in which case it is dereferenced.
		psa      = Per-parameter specializations; indexed in step with `inParams`.
		vals     = Receives the packed parameter values, in order.
		longData = Set to `true` if any parameter has a non-zero `chunkSize`.

	Returns: Two bytes per parameter: the SQL type followed by the
	signedness flag (0x80 for unsigned, 0 otherwise).

	Throws: `MYX` if a parameter is unbound or has an unsupported type.
	+/
	static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
		out ubyte[] vals, out bool longData)
	{
		size_t pc = inParams.length;
		ubyte[] types;
		types.length = pc*2;
		size_t alloc = pc*20;
		vals.length = alloc;
		uint vcl = 0;
		int ct = 0;

		// Make sure `vals` has room for `n` more bytes, growing it if needed.
		void reAlloc(size_t n)
		{
			if (vcl+n < alloc)
				return;
			size_t inc = (alloc*3)/2;
			if (inc <  n)
				inc = n;
			alloc += inc;
			vals.length = alloc;
		}

		foreach (size_t i; 0..pc)
		{
			enum UNSIGNED  = 0x80;
			enum SIGNED    = 0;
			if (psa[i].chunkSize)
				longData= true;
			if (inParams[i].type == typeid(typeof(null)))
			{
				// Nulls carry a type entry but no value bytes.
				types[ct++] = SQLType.NULL;
				types[ct++] = SIGNED;
				continue;
			}
			Variant v = inParams[i];
			SQLType ext = psa[i].type;
			string ts = v.type.toString();
			bool isRef;
			// A trailing '*' means the Variant holds a pointer to the value.
			if (ts[$-1] == '*')
			{
				ts.length = ts.length-1;
				isRef= true;
			}

			switch (ts)
			{
				case "bool":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.BIT;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					reAlloc(2);
					bool bv = isRef? *(v.get!(bool*)): v.get!(bool);
					vals[vcl++] = 1;
					vals[vcl++] = bv? 0x31: 0x30;
					break;
				case "byte":
					types[ct++] = SQLType.TINY;
					types[ct++] = SIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *(v.get!(byte*)): v.get!(byte);
					break;
				case "ubyte":
					types[ct++] = SQLType.TINY;
					types[ct++] = UNSIGNED;
					reAlloc(1);
					vals[vcl++] = isRef? *(v.get!(ubyte*)): v.get!(ubyte);
					break;
				case "short":
					types[ct++] = SQLType.SHORT;
					types[ct++] = SIGNED;
					reAlloc(2);
					short si = isRef? *(v.get!(short*)): v.get!(short);
					vals[vcl++] = cast(ubyte) (si & 0xff);
					vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
					break;
				case "ushort":
					types[ct++] = SQLType.SHORT;
					types[ct++] = UNSIGNED;
					reAlloc(2);
					ushort us = isRef? *(v.get!(ushort*)): v.get!(ushort);
					vals[vcl++] = cast(ubyte) (us & 0xff);
					vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
					break;
				case "int":
					types[ct++] = SQLType.INT;
					types[ct++] = SIGNED;
					reAlloc(4);
					int ii = isRef? *(v.get!(int*)): v.get!(int);
					vals[vcl++] = cast(ubyte) (ii & 0xff);
					vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
					break;
				case "uint":
					types[ct++] = SQLType.INT;
					types[ct++] = UNSIGNED;
					reAlloc(4);
					uint ui = isRef? *(v.get!(uint*)): v.get!(uint);
					vals[vcl++] = cast(ubyte) (ui & 0xff);
					vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
					break;
				case "long":
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = SIGNED;
					reAlloc(8);
					long li = isRef? *(v.get!(long*)): v.get!(long);
					vals[vcl++] = cast(ubyte) (li & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
					vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
					break;
				case "ulong":
					types[ct++] = SQLType.LONGLONG;
					types[ct++] = UNSIGNED;
					reAlloc(8);
					ulong ul = isRef? *(v.get!(ulong*)): v.get!(ulong);
					vals[vcl++] = cast(ubyte) (ul & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
					vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
					break;
				case "float":
					types[ct++] = SQLType.FLOAT;
					types[ct++] = SIGNED;
					reAlloc(4);
					float f = isRef? *(v.get!(float*)): v.get!(float);
					// Copies the in-memory representation byte by byte.
					ubyte* ubp = cast(ubyte*) &f;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp;
					break;
				case "double":
					types[ct++] = SQLType.DOUBLE;
					types[ct++] = SIGNED;
					reAlloc(8);
					double d = isRef? *(v.get!(double*)): v.get!(double);
					// Copies the in-memory representation byte by byte.
					ubyte* ubp = cast(ubyte*) &d;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp++;
					vals[vcl++] = *ubp;
					break;
				case "std.datetime.date.Date":
				case "std.datetime.Date":
					types[ct++] = SQLType.DATE;
					types[ct++] = SIGNED;
					Date date = isRef? *(v.get!(Date*)): v.get!(Date);
					ubyte[] da = pack(date);
					size_t l = da.length;
					reAlloc(l);
					vals[vcl..vcl+l] = da[];
					vcl += l;
					break;
				// Accept the `std.datetime.date` spelling too, as is already
				// done for Date and DateTime; without it TimeOfDay parameters
				// are rejected as "Unsupported parameter type".
				case "std.datetime.date.TimeOfDay":
				case "std.datetime.TimeOfDay":
				case "std.datetime.Time":
					types[ct++] = SQLType.TIME;
					types[ct++] = SIGNED;
					TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay);
					ubyte[] ta = pack(time);
					size_t l = ta.length;
					reAlloc(l);
					vals[vcl..vcl+l] = ta[];
					vcl += l;
					break;
				case "std.datetime.date.DateTime":
				case "std.datetime.DateTime":
					types[ct++] = SQLType.DATETIME;
					types[ct++] = SIGNED;
					DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime);
					ubyte[] da = pack(dt);
					size_t l = da.length;
					reAlloc(l);
					vals[vcl..vcl+l] = da[];
					vcl += l;
					break;
				case "mysql.types.Timestamp":
					types[ct++] = SQLType.TIMESTAMP;
					types[ct++] = SIGNED;
					Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp);
					DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
					ubyte[] da = pack(dt);
					size_t l = da.length;
					reAlloc(l);
					vals[vcl..vcl+l] = da[];
					vcl += l;
					break;
				case "immutable(char)[]":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.VARCHAR;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					string s = isRef? *(v.get!(string*)): v.get!(string);
					ubyte[] packed = packLCS(cast(void[]) s);
					reAlloc(packed.length);
					vals[vcl..vcl+packed.length] = packed[];
					vcl += packed.length;
					break;
				case "char[]":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.VARCHAR;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]);
					ubyte[] packed = packLCS(cast(void[]) ca);
					reAlloc(packed.length);
					vals[vcl..vcl+packed.length] = packed[];
					vcl += packed.length;
					break;
				case "byte[]":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.TINYBLOB;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]);
					ubyte[] packed = packLCS(cast(void[]) ba);
					reAlloc(packed.length);
					vals[vcl..vcl+packed.length] = packed[];
					vcl += packed.length;
					break;
				case "ubyte[]":
					if (ext == SQLType.INFER_FROM_D_TYPE)
						types[ct++] = SQLType.TINYBLOB;
					else
						types[ct++] = cast(ubyte) ext;
					types[ct++] = SIGNED;
					ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]);
					ubyte[] packed = packLCS(cast(void[]) uba);
					reAlloc(packed.length);
					vals[vcl..vcl+packed.length] = packed[];
					vcl += packed.length;
					break;
				case "void":
					throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
				default:
					throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
			}
		}
		// Drop the unused tail of the over-allocated buffer.
		vals.length = vcl;
		return types;
	}

	/++
	Send the chunked ("long data") parameters of a prepared statement.

	For every parameter whose specialization has a non-zero `chunkSize`,
	the parameter's `chunkDelegate` is called repeatedly to fill a buffer of
	`chunkSize` bytes, and each filled buffer is sent as its own
	STMT_SEND_LONG_DATA packet. A delegate result smaller than `chunkSize`
	marks the final chunk; a result of 0 means there is nothing left to send.

	Params:
		conn  = The connection to send on.
		hStmt = The server's handle for the prepared statement.
		psa   = Per-parameter specializations.
	+/
	static void sendLongData(Connection conn, uint hStmt, ParameterSpecialization[] psa)
	{
		assert(psa.length <= ushort.max); // parameter number is sent as short
		foreach (ushort i, PSN psn; psa)
		{
			if (!psn.chunkSize) continue;
			uint cs = psn.chunkSize;
			uint delegate(ubyte[]) dg = psn.chunkDelegate;

			//TODO: All low-level comms should be moved into the mysql.protocol package.
			ubyte[] chunk;
			chunk.length = cs+11;
			chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
			chunk[4] = CommandType.STMT_SEND_LONG_DATA;
			hStmt.packInto(chunk[5..9]); // statement handle
			packInto(i, chunk[9..11]); // parameter number

			// byte 11 on is payload
			for (;;)
			{
				uint sent = dg(chunk[11..cs+11]);
				if (sent < cs)
				{
					if (sent == 0)    // data was exact multiple of chunk size - all sent
						break;
					// Trim the buffer to the 11 leading bytes plus the payload
					// actually produced. This has to happen while `sent` is
					// still the raw payload size: trimming after the +7
					// adjustment below leaves 7 stale bytes on the packet and
					// underflows `cs-sent` when the payload is larger than cs-7.
					chunk.length = chunk.length - (cs-sent);
					sent += 7;        // packet length also counts the 7 non-payload command bytes
					packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
					conn.send(chunk);
					break;
				}
				conn.send(chunk);
			}
		}
	}

	/++
	Send a STMT_EXECUTE command for a prepared statement, preceded by
	any long-data chunks its parameters require.

	Params:
		conn     = The connection to send on.
		hStmt    = The server's handle for the prepared statement.
		psh      = The statement's headers; `paramCount` decides whether
		           parameter data is attached.
		inParams = The parameter values.
		psa      = Per-parameter specializations.
	+/
	static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
		Variant[] inParams, ParameterSpecialization[] psa)
	{
		conn.autoPurge();

		//TODO: All low-level comms should be moved into the mysql.protocol package.
		ubyte[] packet;
		// The execute command starts a new packet sequence.
		conn.resetPacket();

		ubyte[] prefix = makePSPrefix(hStmt, 0);
		size_t len = prefix.length;
		bool longData;

		if (psh.paramCount)
		{
			// NOTE(review): `one` is presumably the protocol's
			// "new parameters bound" flag - confirm against the protocol docs.
			ubyte[] one = [ 1 ];
			ubyte[] vals;
			ubyte[] types = analyseParams(inParams, psa, vals, longData);
			ubyte[] nbm = makeBitmap(inParams);
			// Layout: prefix, NULL bitmap, flag byte, type pairs, packed values.
			packet = prefix ~ nbm ~ one ~ types ~ vals;
		}
		else
			packet = prefix;

		// Long-data chunks go out before the execute packet itself.
		if (longData)
			sendLongData(conn, hStmt, psa);

		assert(packet.length <= uint.max);
		packet.setPacketHeader(conn.pktNumber);
		conn.bumpPacket();
		conn.send(packet);
	}
}

/++
A class representing a database connection.

If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
creating a new Connection directly. That will provide certain benefits,
such as reusing old connections and automatic cleanup (no need to close
the connection when done).

------------------
// Suggested usage:

{
	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
	scope(exit) con.close();

	// Use the connection
	...
}
------------------
+/
//TODO: All low-level comms should be moved into the mysql.protocol package.
class Connection
{
/+
The Connection is responsible for handshaking with the server to establish
authentication. It then passes client preferences to the server, and
subsequently is the channel for all command packets that are sent, and all
response packets received.

Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
byte as a packet number. Connection deals with the headers and ensures that
packet numbers are sequential.

The initial packet is sent by the server - essentially a 'hello' packet
inviting login. That packet has a sequence number of zero. That sequence
number is then incremented by client and server packets through the handshake
sequence.

After login all further sequences are initialized by the client sending a
command packet with a zero sequence number, to which the server replies with
zero or more packets with sequential sequence numbers.
+/
package:
	enum OpenState
	{
		/// We have not yet connected to the server, or have sent QUIT to the
		/// server and closed the connection
		notConnected,
		/// We have connected to the server and parsed the greeting, but not
		/// yet authenticated
		connected,
		/// We have successfully authenticated against the server, and need to
		/// send QUIT to the server when closing the connection
		authenticated
	}
	OpenState   _open;
	MySQLSocket _socket;

	// Server and client capability flags, and details read from the greeting.
	SvrCapFlags _sCaps, _cCaps;
	uint    _sThread;
	ushort  _serverStatus;
	ubyte   _sCharSet, _protocol;
	string  _serverVersion;

	string _host, _user, _pwd, _db;
	ushort _port;

	MySQLSocketType _socketType;

	// Callbacks used by initConnection to open the underlying socket.
	OpenSocketCallbackPhobos _openSocketPhobos;
	OpenSocketCallbackVibeD  _openSocketVibeD;

	ulong _insertID;

	// This gets incremented every time a command is issued or results are purged,
	// so a ResultRange can tell whether it's been invalidated.
	ulong _lastCommandID;

	// Whether there are rows, headers or binary data waiting to be retrieved.
	// MySQL protocol doesn't permit performing any other action until all
	// such data is read.
	bool _rowsPending, _headersPending, _binaryPending;

	// Field count of last performed command.
	//TODO: Does Connection need to store this?
	ushort _fieldCount;

	// ResultSetHeaders of last performed command.
	//TODO: Does Connection need to store this? Is this even used?
	ResultSetHeaders _rsh;

	// This tiny thing here is pretty critical. Pay great attention to its maintenance, otherwise
	// you'll get the dreaded "packet out of order" message. It, and the socket connection are
	// the reason why most other objects require a connection object for their construction.
	ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
				/// ordering. First packet should have 0
	@property ubyte pktNumber()   { return _cpn; }
	void bumpPacket()       { _cpn++; }
	void resetPacket()      { _cpn = 0; }

	// Without Vibe.d support compiled in, the Vibe.d socket type must never be selected.
	version(Have_vibe_d_core) {} else
	pure const nothrow invariant()
	{
		assert(_socketType != MySQLSocketType.vibed);
	}

	/// Read one packet from the socket and return its payload (header stripped).
	///
	/// Throws: `MYXProtocol` if the packet's sequence number is not the one expected.
	/// Any failure kills the connection.
	ubyte[] getPacket()
	{
		scope(failure) kill();

		ubyte[4] header;
		_socket.read(header);
		// number of bytes always set as 24-bit
		uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
		enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
		bumpPacket();

		ubyte[] packet = new ubyte[numDataBytes];
		_socket.read(packet);
		assert(packet.length == numDataBytes, "Wrong number of bytes read");
		return packet;
	}

	/// Write a complete packet (header already filled in) to the socket.
	void send(const(ubyte)[] packet)
	in
	{
		assert(packet.length > 4); // at least 1 byte more than header
	}
	body
	{
		_socket.write(packet);
	}

	/// Write a packet given as a separate header and payload.
	/// The payload is skipped when empty.
	void send(const(ubyte)[] header, const(ubyte)[] data)
	in
	{
		assert(header.length == 4 || header.length == 5/*command type included*/);
	}
	body
	{
		_socket.write(header);
		if(data.length)
			_socket.write(data);
	}

	/// Send a command packet consisting of the command byte followed by `data`.
	///
	/// Pending results are purged first, and the connection is re-established
	/// if the socket is no longer connected (except for QUIT, which is then
	/// silently dropped). Any failure kills the connection.
	void sendCmd(T)(CommandType cmd, const(T)[] data)
	in
	{
		// Internal thread states. Clients shouldn't use this
		assert(cmd != CommandType.SLEEP);
		assert(cmd != CommandType.CONNECT);
		assert(cmd != CommandType.TIME);
		assert(cmd != CommandType.DELAYED_INSERT);
		assert(cmd != CommandType.CONNECT_OUT);

		// Deprecated
		assert(cmd != CommandType.CREATE_DB);
		assert(cmd != CommandType.DROP_DB);
		assert(cmd != CommandType.TABLE_DUMP);

		// cannot send more than uint.max bytes. TODO: better error message if we try?
		assert(data.length <= uint.max);
	}
	out
	{
		// at this point we should have sent a command
		assert(pktNumber == 1);
	}
	body
	{
		autoPurge();

		scope(failure) kill();

		_lastCommandID++;

		if(!_socket.connected)
		{
			if(cmd == CommandType.QUIT)
				return; // Don't bother reopening connection just to quit

			_open = OpenState.notConnected;
			connect(_clientCapabilities);
		}

		// Every command starts a new packet sequence at 0.
		resetPacket();

		ubyte[] header;
		header.length = 4 /*header*/ + 1 /*cmd*/;
		header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
		header[4] = cmd;
		bumpPacket();

		send(header, cast(const(ubyte)[])data);
	}

	/// Read the server's response to a command and record the server status.
	///
	/// The packet is checked with `enforcePacketOK` before the status is stored.
	/// Note that `asString` is not used by this function.
	OKErrorPacket getCmdResponse(bool asString = false)
	{
		auto okp = OKErrorPacket(getPacket());
		enforcePacketOK(okp);
		_serverStatus = okp.serverStatus;
		return okp;
	}

	/// Build the authentication packet sent in reply to the server's greeting.
	ubyte[] buildAuthPacket(ubyte[] token)
	in
	{
		assert(token.length == 20);
	}
	body
	{
		ubyte[] packet;
		packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
		packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append

		// NOTE: we'll set the header last when we know the size

		// Set the default capabilities required by the client
		_cCaps.packInto(packet[4..8]);

		// Request a conventional maximum packet length.
922 1.packInto(packet[8..12]); 923 924 packet ~= 33; // Set UTF-8 as default charSet 925 926 // There's a statutory block of zero bytes here - fill them in. 927 foreach(i; 0 .. 23) 928 packet ~= 0; 929 930 // Add the user name as a null terminated string 931 foreach(i; 0 .. _user.length) 932 packet ~= _user[i]; 933 packet ~= 0; // \0 934 935 // Add our calculated authentication token as a length prefixed string. 936 assert(token.length <= ubyte.max); 937 if(_pwd.length == 0) // Omit the token if the account has no password 938 packet ~= 0; 939 else 940 { 941 packet ~= cast(ubyte)token.length; 942 foreach(i; 0 .. token.length) 943 packet ~= token[i]; 944 } 945 946 // Add the default database as a null terminated string 947 foreach(i; 0 .. _db.length) 948 packet ~= _db[i]; 949 packet ~= 0; // \0 950 951 // The server sent us a greeting with packet number 0, so we send the auth packet 952 // back with the next number. 953 packet.setPacketHeader(pktNumber); 954 bumpPacket(); 955 return packet; 956 } 957 958 void consumeServerInfo(ref ubyte[] packet) 959 { 960 scope(failure) kill(); 961 962 _sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes) 963 _sCharSet = packet.consume!ubyte(); // server_language 964 _serverStatus = packet.consume!ushort(); //server_status 965 _sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes) 966 _sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec 967 968 enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1"); 969 enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection"); 970 } 971 972 ubyte[] parseGreeting() 973 { 974 scope(failure) kill(); 975 976 ubyte[] packet = getPacket(); 977 978 if (packet.length > 0 && packet[0] == ResultPacketMarker.error) 979 { 980 auto okp = OKErrorPacket(packet); 981 enforceEx!MYX(!okp.error, "Connection failure: " ~ 
cast(string) okp.message);
    }

    _protocol = packet.consume!ubyte();

    _serverVersion = packet.consume!string(packet.countUntil(0));
    packet.skip(1); // \0 terminated _serverVersion

    _sThread = packet.consume!uint();

    // read first part of scramble buf
    ubyte[] authBuf;
    authBuf.length = 255;
    authBuf[0..8] = packet.consume(8)[]; // scramble_buff

    enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

    consumeServerInfo(packet);

    packet.skip(1); // this byte supposed to be scramble length, but is actually zero
    packet.skip(10); // filler of \0

    // rest of the scramble
    auto len = packet.countUntil(0);
    enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
    enforce(authBuf.length > 8+len);
    authBuf[8..8+len] = packet.consume(len)[];
    authBuf.length = 8+len; // cut to correct size
    enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");

    return authBuf;
}

/// Default opener for Phobos sockets: creates a `PlainPhobosSocket` and
/// connects it to `host`:`port`.
static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
{
    auto s = new PlainPhobosSocket();
    s.connect(new InternetAddress(host, port));
    return s;
}

/// Default opener for Vibe.d sockets. Only usable when compiled with
/// `-version=Have_vibe_d_core`; otherwise reaching this is a program error.
static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
{
    version(Have_vibe_d_core)
        return vibe.core.net.connectTCP(host, port);
    else
        assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
}

/// Resets the packet counter and opens `_socket` through the opener callback
/// that matches `_socketType`.
void initConnection()
{
    resetPacket();
    final switch(_socketType)
    {
        case MySQLSocketType.phobos:
            _socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
            break;

        case MySQLSocketType.vibed:
            version(Have_vibe_d_core) {
                _socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
                break;
            } else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
    }
}

/++
Build the authentication token from the scramble buffer sent by the server:

    SHA1(authBuf ~ SHA1(SHA1(pwd))) XOR SHA1(pwd)

Params:
    authBuf = The scramble buffer returned by `parseGreeting`.

Returns: A freshly allocated 20-byte token.
+/
ubyte[] makeToken(ubyte[] authBuf)
{
    auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
    auto pass2 = sha1Of(pass1);

    SHA1 sha1;
    sha1.start();
    sha1.put(authBuf);
    sha1.put(pass2);
    auto result = sha1.finish();
    foreach (size_t i; 0..20)
        result[i] = result[i] ^ pass1[i];
    return result.dup; // `result` is a stack-allocated static array, so copy it
}

/++
Compute the capability flags supported by BOTH the server and the client.

All bits of the flag word take part in the intersection. (An earlier
implementation looped `uint.sizeof` times - a size in bytes, i.e. 4 - and
therefore only ever examined the four lowest flags.)
+/
SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
{
    return cast(SvrCapFlags)(server & client);
}

/// Stores in `_cCaps` the capabilities common to the server (`_sCaps`) and
/// the requested `capFlags`, then forces on the flags this client cannot
/// operate without.
void setClientFlags(SvrCapFlags capFlags)
{
    _cCaps = getCommonCapabilities(_sCaps, capFlags);

    // We cannot operate in <4.1 protocol, so we'll force it even if the user
    // didn't supply it
    _cCaps |= SvrCapFlags.PROTOCOL41;
    _cCaps |= SvrCapFlags.SECURE_CONNECTION;
}

/++
Send the authentication packet derived from the server greeting and check
the server's reply.

Params:
    greeting = The scramble buffer returned by `parseGreeting`.

Throws: `MYX` if the server answers with an error packet.
+/
void authenticate(ubyte[] greeting)
in
{
    assert(_open == OpenState.connected);
}
out
{
    assert(_open == OpenState.authenticated);
}
body
{
    auto token = makeToken(greeting);
    auto authPacket = buildAuthPacket(token);
    send(authPacket);

    auto packet = getPacket();
    auto okp = OKErrorPacket(packet);
    enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
    _open = OpenState.authenticated;
}

/// The capability flags most recently passed to `connect`; reused by the
/// parameterless `reconnect`.
SvrCapFlags _clientCapabilities;

/// Open the socket, read the server greeting, negotiate capability flags
/// and authenticate. Must only be called while the connection is closed.
void connect(SvrCapFlags clientCapabilities)
in
{
    assert(closed);
}
out
{
    assert(_open == OpenState.authenticated);
}
body
{
    initConnection();
    auto greeting = parseGreeting();
    _open = OpenState.connected;

    _clientCapabilities = clientCapabilities;
    setClientFlags(clientCapabilities);
    authenticate(greeting);
}

/// Forcefully close the socket without sending the quit command.
/// Needed in case an error leaves communications in an undefined or non-recoverable state.
void kill()
{
    if(_socket.connected)
        _socket.close();
    _open = OpenState.notConnected;
    // any pending data is gone. Any statements to release will be released
    // on the server automatically.
    _headersPending = _rowsPending = _binaryPending = false;

    // Older runtimes have no `clear()` for associative arrays.
    static if(__traits(compiles, (){ int[int] aa; aa.clear(); }))
        preparedLookup.clear();
    else
        preparedLookup = null;
}

/// Recursion guard for `autoPurge`. This is deliberately an instance field:
/// a function-local `static` would be shared by every connection running in
/// the same thread, so one connection that is in the middle of a purge
/// (e.g. suspended in a Vibe.d fiber while reading from its socket) would
/// make every other connection silently skip its own purge.
private bool _isAutoPurging = false;

/// Called whenever mysql-native needs to send a command to the server
/// and be sure there aren't any pending results (which would prevent
/// a new command from being sent).
void autoPurge()
{
    // This is called every time a command is sent,
    // so detect & prevent infinite recursion.
    if(_isAutoPurging)
        return;

    _isAutoPurging = true;
    scope(exit) _isAutoPurging = false;

    try
    {
        purgeResult();
        releaseQueued();
    }
    catch(Exception e)
    {
        // likely the connection was closed, so reset any state.
        // Don't treat this as a real error, because everything will be reset when we
        // reconnect.
        kill();
    }
}

/// Lookup per-connection prepared statement info by SQL
PreparedServerInfo[string] preparedLookup;

/// Set `queuedForRelease` flag for a statement in `preparedLookup`.
/// Does nothing if statement not in `preparedLookup`.
private void setQueuedForRelease(string sql, bool value)
{
    // A single `in` lookup yields a pointer to the stored entry, so the flag
    // can be updated in place without copying the struct back and forth.
    if(auto pInfo = sql in preparedLookup)
        pInfo.queuedForRelease = value;
}

/// Queue a prepared statement for release.
void queueForRelease(string sql)
{
    // If connection's closed, then it IS released.
    if(closed)
        return;

    setQueuedForRelease(sql, true);
}

/// Remove a statement from the queue to be released.
void unqueueForRelease(string sql)
{
    setQueuedForRelease(sql, false);
}

/// Releases all prepared statements that are queued for release.
void releaseQueued()
{
    // Collect the keys first: entries must not be removed from an
    // associative array while a `foreach` over that array is in progress.
    string[] queued;
    foreach(sql, info; preparedLookup)
        if(info.queuedForRelease)
            queued ~= sql;

    foreach(sql; queued)
    {
        // Re-check membership in case the table changed since collection.
        if(auto pInfo = sql in preparedLookup)
        {
            immediateReleasePrepared(pInfo.statementId);
            preparedLookup.remove(sql);
        }
    }
}

/// Returns null if not found
Nullable!PreparedServerInfo getPreparedServerInfo(const string sql) pure nothrow
{
    Nullable!PreparedServerInfo result;

    auto pInfo = sql in preparedLookup;
    if(pInfo)
        result = *pInfo;

    return result;
}

/// If already registered, simply returns the cached `PreparedServerInfo`.
/// Otherwise registers `sql` on the server and caches the result in
/// `preparedLookup`.
PreparedServerInfo registerIfNeeded(string sql)
out(info)
{
    // I'm confident this can't currently happen, but
    // let's make sure that doesn't change.
    assert(!info.queuedForRelease);
}
body
{
    if(auto pInfo = sql in preparedLookup)
    {
        // The statement is registered. It may, or may not, be queued
        // for release. Either way, all we need to do is make sure it's
        // un-queued and then return.
        pInfo.queuedForRelease = false;
        return *pInfo;
    }

    auto info = registerIfNeededImpl(sql);
    preparedLookup[sql] = info;

    return info;
}

/++
Send a STMT_PREPARE command for `sql` and parse the server's response into
a `PreparedServerInfo`.

If anything throws, the connection is killed before the exception propagates.
+/
PreparedServerInfo registerIfNeededImpl(string sql)
{
    scope(failure) kill();

    PreparedServerInfo info;

    sendCmd(CommandType.STMT_PREPARE, sql);
    _fieldCount = 0;

    //TODO: All packet handling should be moved into the mysql.protocol package.
    ubyte[] packet = getPacket();
    if(packet.front == ResultPacketMarker.ok)
    {
        packet.popFront();
        info.statementId    = packet.consume!int();
        _fieldCount         = packet.consume!short();
        info.numParams      = packet.consume!short();

        packet.popFront(); // one byte filler
        info.psWarnings     = packet.consume!short();

        // At this point the server also sends field specs for parameters
        // and columns if there were any of each
        info.headers = PreparedStmtHeaders(this, _fieldCount, info.numParams);
    }
    else if(packet.front == ResultPacketMarker.error)
    {
        auto error = OKErrorPacket(packet);
        enforcePacketOK(error);
        assert(0); // FIXME: what now?
    }
    else
        assert(0); // FIXME: what now?

    return info;
}

/// Tell the server to close the prepared statement `statementId` right now.
/// Any pending result is purged first. Does nothing if the connection is
/// closed. If anything throws, the connection is killed.
private void immediateReleasePrepared(uint statementId)
{
    scope(failure) kill();

    if(closed())
        return;

    //TODO: All low-level comms should be moved into the mysql.protocol package.
    ubyte[9] packet_buf;
    ubyte[] packet = packet_buf;
    packet.setPacketHeader(0/*packet number*/);
    bumpPacket();
    packet[4] = CommandType.STMT_CLOSE;
    statementId.packInto(packet[5..9]);
    purgeResult();
    send(packet);
    // It seems that the server does not find it necessary to send a response
    // for this command.
}

public:

/++
Construct opened connection.

Throws `mysql.exceptions.MYX` upon failure to connect.

If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
creating a new Connection directly. That will provide certain benefits,
such as reusing old connections and automatic cleanup (no need to close
the connection when done).

------------------
// Suggested usage:

{
    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
    scope(exit) con.close();

    // Use the connection
    ...
}
------------------

Params:
    cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
            (TODO: The connection string needs work to allow for semicolons in its parts!)
    socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
            unless compiled with `-version=Have_vibe_d_core` (set automatically
            if using $(LINK2 http://code.dlang.org/getting_started, DUB)).
    openSocket = Optional callback which should return a newly-opened Phobos
            or Vibe.d TCP socket. This allows custom sockets to be used,
            subclassed from Phobos's or Vibe.d's sockets.
    host = An IP address in numeric dotted form, or as a host name.
    user = The user name to authenticate.
    pwd = User's password.
    db = Desired initial database.
    port = TCP port of the server. Defaults to 3306.
    capFlags = The set of flag bits from the server's capabilities that the client requires
+/
//After the connection is created, and the initial invitation is received from the server
//client preferences can be set, and authentication can then be attempted.
this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
    // Vibe.d sockets are the default whenever Vibe.d is available.
    version(Have_vibe_d_core)
        enum defaultSocketType = MySQLSocketType.vibed;
    else
        enum defaultSocketType = MySQLSocketType.phobos;

    this(defaultSocketType, host, user, pwd, db, port, capFlags);
}

///ditto
this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
    version(Have_vibe_d_core) {} else
        enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

    this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
        host, user, pwd, db, port, capFlags);
}

///ditto
this(OpenSocketCallbackPhobos openSocket,
    string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
    this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
}

version(Have_vibe_d_core)
///ditto
this(OpenSocketCallbackVibeD openSocket,
    string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
    this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
}

///ditto
private this(MySQLSocketType socketType,
    OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
    string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
in
{
    // The opener callback matching the chosen socket type is mandatory.
    final switch(socketType)
    {
        case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
        case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
    }
}
body
{
    enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
    enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
    version(Have_vibe_d_core) {} else
        enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

    _socketType = socketType;
    _host = host;
    _user = user;
    _pwd = pwd;
    _db = db;
    _port = port;

    _openSocketPhobos = openSocketPhobos;
    _openSocketVibeD  = openSocketVibeD;

    connect(capFlags);
}

///ditto
//After the connection is created, and the initial invitation is received from the server
//client preferences can be set, and authentication can then be attempted.
this(string cs, SvrCapFlags capFlags = defaultClientFlags)
{
    string[] a = parseConnectionString(cs);
    this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

///ditto
this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
{
    string[] a = parseConnectionString(cs);
    this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

///ditto
this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
{
    string[] a = parseConnectionString(cs);
    this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

version(Have_vibe_d_core)
///ditto
this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
{
    string[] a = parseConnectionString(cs);
    this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

/++
Check whether this `Connection` is still connected to the server, or if
the connection has been closed.
+/
@property bool closed()
{
    // The state check comes first so `_socket` is never touched before a
    // connection attempt has been made.
    return _open == OpenState.notConnected || !_socket.connected;
}

version(Have_vibe_d_core)
{
    /// Used by Vibe.d's ConnectionPool, ignore this.
    void acquire() { if( _socket ) _socket.acquire(); }
    ///ditto
    void release() { if( _socket ) _socket.release(); }
    ///ditto
    bool isOwner() { return _socket ? _socket.isOwner() : false; }
    ///ditto
    bool amOwner() { return _socket ? _socket.isOwner() : false; }
}
else
{
    /// Used by Vibe.d's ConnectionPool, ignore this.
    void acquire() { /+ Do nothing +/ }
    ///ditto
    void release() { /+ Do nothing +/ }
    ///ditto
    bool isOwner() { return !!_socket; }
    ///ditto
    bool amOwner() { return !!_socket; }
}

/++
Explicitly close the connection.

This is a two-stage process. First tell the server we are quitting this
connection, and then close the socket.

Idiomatic use as follows is suggested:
------------------
{
    auto con = new Connection("host=localhost;user=user;pwd=password;db=mysqld");
    scope(exit) con.close();
    // Use the connection
    ...
}
------------------
+/
void close()
{
    if (_open == OpenState.authenticated && _socket.connected)
        quit();

    // quit() leaves the state at `connected`, so this closes the socket
    // after a successful quit as well.
    if (_open == OpenState.connected)
        kill();
    resetPacket();
}

/++
Reconnects to the server using the same connection settings originally
used to create the `Connection`.

Optionally takes a `mysql.protocol.constants.SvrCapFlags`, allowing you to
reconnect using a different set of server capability flags.

Normally, if the connection is already open, this will do nothing. However,
if you request a different set of `mysql.protocol.constants.SvrCapFlags`
than was originally used to create the `Connection`, the connection will
be closed and then reconnected using the new `mysql.protocol.constants.SvrCapFlags`.
+/
void reconnect()
{
    reconnect(_clientCapabilities);
}

///ditto
void reconnect(SvrCapFlags clientCapabilities)
{
    if(!closed)
    {
        // Same caps as before?
        if(clientCapabilities == _clientCapabilities)
            return; // Nothing to do, just keep current connection

        close();
    }

    connect(clientCapabilities);
}

/// Send the QUIT command to the server. The socket itself is left open;
/// `close` takes care of it afterwards.
private void quit()
in
{
    assert(_open == OpenState.authenticated);
}
body
{
    sendCmd(CommandType.QUIT, []);
    // No response is sent for a quit packet
    _open = OpenState.connected;
}

/++
Parses a connection string of the form
`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`

Port is optional and defaults to 3306.

Whitespace surrounding any name or value is automatically stripped.

Returns a five-element array of strings in this order:
$(UL
$(LI [0]: host)
$(LI [1]: user)
$(LI [2]: pwd)
$(LI [3]: db)
$(LI [4]: port)
)

(TODO: The connection string needs work to allow for semicolons in its parts!)
+/
//TODO: Replace the return value with a proper struct.
static string[] parseConnectionString(string cs)
{
    string[] rv;
    rv.length = 5;
    rv[4] = "3306"; // Default port

    foreach (s; split(cs, ";"))
    {
        // Tolerate empty segments, such as the one produced by a trailing ';'.
        if (strip(s).length == 0)
            continue;

        // Split on the FIRST '=' only, so that a value (a password, say)
        // may itself contain '=' characters.
        auto eq = indexOf(s, '=');
        enforceEx!MYX(eq >= 0, "Bad connection string: " ~ cs);
        string name = strip(s[0 .. eq]);
        string val = strip(s[eq + 1 .. $]);
        switch (name)
        {
            case "host":
                rv[0] = val;
                break;
            case "user":
                rv[1] = val;
                break;
            case "pwd":
                rv[2] = val;
                break;
            case "db":
                rv[3] = val;
                break;
            case "port":
                rv[4] = val;
                break;
            default:
                throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
        }
    }
    return rv;
}

/++
Select a current database.

Throws `mysql.exceptions.MYX` upon failure.

Params: dbName = Name of the requested database
+/
void selectDB(string dbName)
{
    sendCmd(CommandType.INIT_DB, dbName);
    getCmdResponse();
    _db = dbName; // only reached if the server accepted the switch
}

/++
Check the server status.

Throws `mysql.exceptions.MYX` upon failure.

Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
+/
OKErrorPacket pingServer()
{
    sendCmd(CommandType.PING, []);
    return getCmdResponse();
}

/++
Refresh some feature(s) of the server.

Throws `mysql.exceptions.MYX` upon failure.

Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
+/
OKErrorPacket refreshServer(RefreshFlags flags)
{
    sendCmd(CommandType.REFRESH, [flags]);
    return getCmdResponse();
}

/++
Internal - Get the next `mysql.result.Row` of a pending result set.

This is intended to be internal, you should not use it directly.
It will not likely remain public in the future.

Returns: A `mysql.result.Row` object. When the end of the result set is
reached, a default-initialized `Row` is returned and the pending flags
are cleared.
+/
Row getNextRow()
{
    scope(failure) kill();

    // The column headers have to be consumed before the first row.
    if (_headersPending)
    {
        _rsh = ResultSetHeaders(this, _fieldCount);
        _headersPending = false;
    }

    ubyte[] packet = getPacket();
    if (packet.isEOFPacket())
    {
        _rowsPending = _binaryPending = false;
        return Row.init;
    }

    // `_binaryPending` selects binary vs. text row decoding.
    return Row(this, packet, _rsh, _binaryPending);
}

/++
Flush any outstanding result set elements.

When the server responds to a command that produces a result set, it
queues the whole set of corresponding packets over the current connection.
Before that `Connection` can embark on any new command, it must receive
all of those packets and junk them.

As of v1.1.4, this is done automatically as needed. But you can still
call this manually to force a purge to occur when you want.

Returns: The number of row packets that were discarded.

See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
+/
ulong purgeResult()
{
    scope(failure) kill();

    _lastCommandID++;

    ulong rows = 0;
    if (_headersPending)
    {
        // Discard field header packets up to (and including) the EOF packet.
        for (size_t i = 0;; i++)
        {
            if (getPacket().isEOFPacket())
            {
                _headersPending = false;
                break;
            }
            enforceEx!MYXProtocol(i < _fieldCount,
                text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
        }
    }
    if (_rowsPending)
    {
        // Discard row packets up to (and including) the EOF packet.
        for (;; rows++)
        {
            if (getPacket().isEOFPacket())
            {
                _rowsPending = _binaryPending = false;
                break;
            }
        }
    }
    resetPacket();
    return rows;
}

/++
Get a textual report on the server status.

(COM_STATISTICS)
+/
string serverStats()
{
    sendCmd(CommandType.STATISTICS, []);
    return cast(string) getPacket();
}

/++
Enable multiple statement commands.

This can be used later if this feature was not requested in the client capability flags.

Warning: This functionality is currently untested.

Params: on = Boolean value to turn the capability on or off.
+/
void enableMultiStatements(bool on)
{
    scope(failure) kill();

    // Two-byte option value; in the MySQL protocol's COM_SET_OPTION,
    // 0 = MYSQL_OPTION_MULTI_STATEMENTS_ON and 1 = ..._OFF.
    ubyte[] t;
    t.length = 2;
    t[0] = on ? 0 : 1;
    t[1] = 0;
    sendCmd(CommandType.STMT_OPTION, t);

    // For some reason this command gets an EOF packet as response
    auto packet = getPacket();
    // Check the length first, so an empty packet is reported as a protocol
    // error rather than as an out-of-bounds index.
    enforceEx!MYXProtocol(packet.length == 5 && packet[0] == 254, "Unexpected response to SET_OPTION command");
}

/// Return the in-force protocol number.
@property ubyte protocol() pure const nothrow { return _protocol; }
/// Server version
@property string serverVersion() pure const nothrow { return _serverVersion; }
/// Server capability flags
@property uint serverCapabilities() pure const nothrow { return _sCaps; }
/// Server status
@property ushort serverStatus() pure const nothrow { return _serverStatus; }
/// Current character set
@property ubyte charSet() pure const nothrow { return _sCharSet; }
/// Current database
@property string currentDB() pure const nothrow { return _db; }
/// Socket type being used, Phobos or Vibe.d
@property MySQLSocketType socketType() pure const nothrow { return _socketType; }

/// After a command that inserted a row into a table with an auto-increment
/// ID column, this method allows you to retrieve the last insert ID.
@property ulong lastInsertID() pure const nothrow
{
    return _insertID;
}

/// Counter bumped whenever a command is issued or results are purged.
/// A `mysql.result.ResultRange` compares against it to find out whether
/// it has been invalidated.
@property ulong lastCommandID() pure const nothrow
{
    return _lastCommandID;
}

/// Whether rows are still pending.
///
/// Note, you may want `hasPending` instead.
@property bool rowsPending() pure const nothrow
{
    return _rowsPending;
}

/// Whether anything at all (headers, rows or binary) is still pending.
/// New commands cannot be sent on a connection while anything is pending
/// (the pending data will automatically be purged.)
@property bool hasPending() pure const nothrow
{
    return _headersPending || _rowsPending || _binaryPending;
}

/// The field descriptions of the current result header.
@property FieldDescription[] resultFieldDescriptions() pure
{
    return _rsh.fieldDescriptions;
}

/++
Manually register a prepared statement on this connection.

If the statement is already registered on this connection, nothing happens.

You never strictly need to call this: a prepared statement is registered
automatically the first time it is used on any `Connection`. It exists for
those who, for performance reasons, prefer eager registration over lazy.
+/
void register(Prepared prepared)
{
    registerIfNeeded(prepared.sql);
}

/++
Manually release a prepared statement on this connection.

This tells the server that it may dispose of the information it holds
about the prepared statement.

You never strictly need to call this. The server treats prepared
statements as per-connection, so they disappear when the connection
closes anyway. This is offered for when direct control is really needed.

Due to the internal "queued for release" system, this MAY CAUSE ALLOCATIONS,
and therefore CANNOT BE CALLED SAFELY FROM A DESTRUCTOR in case the
destructor gets triggered during a GC cycle. See issue
$(LINK2 https://github.com/mysql-d/mysql-native/issues/159, #159)
for details of this problem.

Notes:

The server might not be told to release the statement right away
(although `isRegistered` will still report `false`).

The reason is that a `mysql.result.ResultRange` may still have results
pending for retrieval, and the protocol forbids sending commands (such as
"release a prepared statement") while data is pending. So this function
may instead queue the statement, to be released once that is safe: either
the next time a result set is purged, or the next time a command (such as
`mysql.commands.query` or `mysql.commands.exec`) is performed (because
such commands automatically purge any pending results).

This function does NOT auto-purge because, if it were ever called from
automatic resource management cleanup (refcounting, RAII, etc), that
would create ugly situations where hidden, implicit behavior triggers
an unexpected auto-purge.
+/
void release(Prepared prepared)
{
    release(prepared.sql);
}

///ditto
void release(string sql)
{
    //TODO: Don't queue it if nothing is pending. Just do it immediately.
    //      But need to be certain both situations are unittested.
    queueForRelease(sql);
}

/// Is the given SQL registered on this connection as a prepared statement?
bool isRegistered(Prepared prepared)
{
    // Registration is tracked by SQL text, so delegate to the string overload.
    return isRegistered( prepared.sql );
}

///ditto
bool isRegistered(string sql)
{
    return isRegistered( getPreparedServerInfo(sql) );
}

///ditto
package bool isRegistered(Nullable!PreparedServerInfo info)
{
    // A statement that is merely queued for release is reported as
    // NOT registered.
    return !info.isNull && !info.queuedForRelease;
}
} // closes the enclosing aggregate, whose opening is outside this chunk

// Test register, release, isRegistered, and auto-register for prepared statements
debug(MYSQLN_TESTS)
unittest
{
    import mysql.connection;
    import mysql.test.common;

    // Declared outside the connection scopes below so that the statements
    // outlive the connection they were first registered on.
    Prepared preparedInsert;
    Prepared preparedSelect;
    immutable insertSQL = "INSERT INTO `autoRegistration` VALUES (1), (2)";
    immutable selectSQL = "SELECT `val` FROM `autoRegistration`";
    int queryTupleResult;

    {
        mixin(scopedCn);

        // Setup
        cn.exec("DROP TABLE IF EXISTS `autoRegistration`");
        cn.exec("CREATE TABLE `autoRegistration` (
            `val` INTEGER
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8");

        // Initial register
        preparedInsert = cn.prepare(insertSQL);
        preparedSelect = cn.prepare(selectSQL);

        // Test basic register, release, isRegistered
        assert(cn.isRegistered(preparedInsert));
        assert(cn.isRegistered(preparedSelect));
        cn.release(preparedInsert);
        cn.release(preparedSelect);
        assert(!cn.isRegistered(preparedInsert));
        assert(!cn.isRegistered(preparedSelect));

        // Test manual re-register
        cn.register(preparedInsert);
        cn.register(preparedSelect);
        assert(cn.isRegistered(preparedInsert));
        assert(cn.isRegistered(preparedSelect));

        // Test double register
        cn.register(preparedInsert);
        cn.register(preparedSelect);
        assert(cn.isRegistered(preparedInsert));
        assert(cn.isRegistered(preparedSelect));

        // Test double release
        cn.release(preparedInsert);
        cn.release(preparedSelect);
        assert(!cn.isRegistered(preparedInsert));
        assert(!cn.isRegistered(preparedSelect));
        cn.release(preparedInsert);
        cn.release(preparedSelect);
        assert(!cn.isRegistered(preparedInsert));
        assert(!cn.isRegistered(preparedSelect));
    }

    // Note that at this point, both prepared statements still exist,
    // but are no longer registered on any connection. In fact, there
    // are no open connections anymore.

    // Test auto-register: exec
    {
        mixin(scopedCn);

        assert(!cn.isRegistered(preparedInsert));
        cn.exec(preparedInsert);
        assert(cn.isRegistered(preparedInsert));
    }

    // Test auto-register: query
    {
        mixin(scopedCn);

        assert(!cn.isRegistered(preparedSelect));
        cn.query(preparedSelect).each();
        assert(cn.isRegistered(preparedSelect));
    }

    // Test auto-register: queryRow
    {
        mixin(scopedCn);

        assert(!cn.isRegistered(preparedSelect));
        cn.queryRow(preparedSelect);
        assert(cn.isRegistered(preparedSelect));
    }

    // Test auto-register: queryRowTuple
    {
        mixin(scopedCn);

        assert(!cn.isRegistered(preparedSelect));
        cn.queryRowTuple(preparedSelect, queryTupleResult);
        assert(cn.isRegistered(preparedSelect));
    }

    // Test auto-register: queryValue
    {
        mixin(scopedCn);

        assert(!cn.isRegistered(preparedSelect));
        cn.queryValue(preparedSelect);
        assert(cn.isRegistered(preparedSelect));
    }
}

// An attempt to reproduce issue #81: Using mysql-native driver with no default database
// I'm unable to actually reproduce the error, though.
debug(MYSQLN_TESTS)
unittest
{
    import mysql.escape;
    mixin(scopedCn);

    cn.exec("DROP TABLE IF EXISTS `issue81`");
    cn.exec("CREATE TABLE `issue81` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8");
    cn.exec("INSERT INTO `issue81` (a) VALUES (1)");

    // Second connection deliberately opened WITHOUT a `db=` entry, so it has
    // no default database.
    auto cn2 = new Connection(text("host=", cn._host, ";port=", cn._port, ";user=", cn._user, ";pwd=", cn._pwd));
    scope(exit) cn2.close();

    // The table therefore has to be addressed with its database name.
    cn2.query("SELECT * FROM `"~mysqlEscape(cn._db).text~"`.`issue81`");
}

// Regression test for Issue #154:
// autoPurge can throw an exception if the socket was closed without purging
//
// This simulates a disconnect by closing the socket underneath the Connection
// object itself.
debug(MYSQLN_TESTS)
unittest
{
    mixin(scopedCn);

    cn.exec("DROP TABLE IF EXISTS `dropConnection`");
    cn.exec("CREATE TABLE `dropConnection` (
        `val` INTEGER
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8");
    cn.exec("INSERT INTO `dropConnection` VALUES (1), (2), (3)");
    import mysql.prepared;
    {
        // Run a query and ignore its result, leaving data pending on the
        // connection.
        auto prep = cn.prepare("SELECT * FROM `dropConnection`");
        cn.query(prep);
    }
    // close the socket forcibly
    cn._socket.close();
    // this should still work (it should reconnect).
    cn.exec("DROP TABLE `dropConnection`");
}

/+
Test Prepared's ability to be safely refcount-released during a GC cycle
(ie, `Connection.release` must not allocate GC memory).

While this test does succeed for me, it is currently disabled because it's
not guaranteed to always work:

Queuing a prepared statement for release currently involves indexing an
associative array (to access `Connection.preparedLookup[xx].queuedForRelease`).
Attempts at @nogc-ing `Connection.release` revealed that, according to DMD:
"indexing an associative array...may cause GC allocation".

Ultimately, to fix this, `Connection.release` must become @nogc, and the
only ways I see to do that involve algorithmic time complexity that's
just not worth the questionable benefit of releasing prepared statements
within a connection's lifetime.

For more info, see issue #159: https://github.com/mysql-d/mysql-native/issues/159
+/
version(none)
debug(MYSQLN_TESTS)
{
    /// Proof-of-concept ref-counted Prepared wrapper, just for testing,
    /// not really intended for actual use.
    private struct RCPreparedPayload
    {
        Prepared prepared;
        Connection conn; // Connection to be released from

        alias prepared this;

        @disable this(this); // not copyable
        ~this()
        {
            // There are a couple calls to this dtor where `conn` happens to be null.
            if(conn is null)
                return;

            assert(conn.isRegistered(prepared));
            conn.release(prepared);
        }
    }
    ///ditto
    alias RCPrepared = RefCounted!(RCPreparedPayload, RefCountedAutoInitialize.no);
    ///ditto
    private RCPrepared rcPrepare(Connection conn, string sql)
    {
        import std.algorithm.mutation : move;

        auto prepared = conn.prepare(sql);
        auto payload = RCPreparedPayload(prepared, conn);
        // The payload is not copyable, so it has to be moved in.
        return refCounted(move(payload));
    }

    unittest
    {
        import core.memory;
        mixin(scopedCn);

        cn.exec("DROP TABLE IF EXISTS `rcPrepared`");
        cn.exec("CREATE TABLE `rcPrepared` (
            `val` INTEGER
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8");
        cn.exec("INSERT INTO `rcPrepared` VALUES (1), (2), (3)");

        // Define this in outer scope to guarantee data is left pending when
        // RCPrepared's payload is collected. This will guarantee
        // that Connection will need to queue the release.
        ResultRange rows;

        void bar()
        {
            class Foo { RCPrepared p; }
            auto foo = new Foo();

            auto rcStmt = cn.rcPrepare("SELECT * FROM `rcPrepared`");
            foo.p = rcStmt;
            rows = cn.query(rcStmt);

            /+
            At this point, there are two references to the prepared statement:
            One in a `Foo` object (currently bound to `foo`), and one on the stack.

            Returning from this function will destroy the one on the stack,
            and deterministically reduce the refcount to 1.

            So, right here we set `foo` to null to *keep* the Foo object's
            reference to the prepared statement, but set adrift the Foo object
            itself, ready to be destroyed (along with the only remaining
            prepared statement reference it contains) by the next GC cycle.

            Thus, `RCPreparedPayload.~this` and `Connection.release(Prepared)`
            will be executed during a GC cycle...and had better not perform
            any allocations, or else...boom!
            +/
            foo = null;
        }

        bar();
        assert(cn.hasPending); // Ensure Connection is forced to queue the release.
        GC.collect(); // `Connection.release(Prepared)` better not be allocating, or boom!
    }
}