/// Connect to a MySQL/MariaDB server.
module mysql.connection;

import std.algorithm;
import std.conv;
import std.digest.sha;
import std.exception;
import std.range;
import std.socket;
import std.string;
import std.typecons;

import mysql.commands;
import mysql.exceptions;
import mysql.prepared;
import mysql.protocol.constants;
import mysql.protocol.packets;
import mysql.protocol.sockets;
import mysql.result;
debug(MYSQLN_TESTS)
{
    import mysql.test.common;
}

version(Have_vibe_d_core)
{
    static if(__traits(compiles, (){ import vibe.core.net; } ))
        import vibe.core.net;
    else
        static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
}

/// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
immutable SvrCapFlags defaultClientFlags =
        SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
        SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
        SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
        //SvrCapFlags.MULTI_RESULTS;

/++
Have the server compile an SQL command into a prepared statement.

The statement is registered on `conn` immediately (through `registerIfNeeded`).
The returned `mysql.prepared.Prepared` is not tied to that connection: it may be
used on ANY `Connection`, and will be registered automatically the first time it
is used on another one. Pass it to `Connection.register` if you would rather
register it eagerly.

Internally, a successful registration yields a statement handle (an ID), the
number of parameters the statement needs, and the number of columns any result
set it produces will have. The server then sends the prepared statement
headers - parameter descriptions and result set field descriptions - followed
by an EOF packet.

Throws: `mysql.exceptions.MYX` if the server has a problem.
+/
Prepared prepare(Connection conn, string sql)
{
    auto serverInfo = conn.registerIfNeeded(sql);
    auto statement = Prepared(sql, serverInfo.headers, serverInfo.numParams);
    return statement;
}

/++
Temporary upgrade aid for mysql-native v2.0.0 ONLY.

Prepares `sql` on `conn` and wraps the result, together with the connection,
in a `BackwardCompatPrepared`. See `BackwardCompatPrepared` for more info.
+/
deprecated("This is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. You should migrate from this to the Prepared-compatible exec/query overloads in 'mysql.commands'.")
BackwardCompatPrepared prepareBackwardCompat(Connection conn, string sql)
{
    auto statement = prepare(conn, sql);
    return BackwardCompatPrepared(conn, statement);
}

/++
Convenience function: prepare a statement of the form `select name(?,?,...)`
that calls a stored function.

Make sure `numArgs` is correct. If it isn't, you may get a
`mysql.exceptions.MYX` with a very unclear error message.

Throws: `mysql.exceptions.MYX` if the server has a problem.

Params:
    name = The name of the stored function.
    numArgs = The number of arguments the stored function takes.
+/
Prepared prepareFunction(Connection conn, string name, int numArgs)
{
    return prepare(conn, "select " ~ name ~ preparedPlaceholderArgs(numArgs));
}

///
unittest
{
    debug(MYSQLN_TESTS)
    {
        import mysql.test.common;
        mixin(scopedCn);

        exec(cn, `DROP FUNCTION IF EXISTS hello`);
        exec(cn, `
            CREATE FUNCTION hello (s CHAR(20))
            RETURNS CHAR(50) DETERMINISTIC
            RETURN CONCAT('Hello ',s,'!')
        `);

        auto helloStmt = prepareFunction(cn, "hello", 1);
        helloStmt.setArgs("World");
        auto rows = cn.query(helloStmt).array;
        assert(rows.length == 1);
        assert(rows[0][0] == "Hello World!");
    }
}

/++
Convenience function: prepare a statement of the form `call name(?,?,...)`
that calls a stored procedure.

OUT parameters are currently not supported. It should generally be
possible with MySQL to present them as a result set.

Make sure `numArgs` is correct. If it isn't, you may get a
`mysql.exceptions.MYX` with a very unclear error message.

Throws: `mysql.exceptions.MYX` if the server has a problem.

Params:
    name = The name of the stored procedure.
    numArgs = The number of arguments the stored procedure takes.

+/
Prepared prepareProcedure(Connection conn, string name, int numArgs)
{
    return prepare(conn, "call " ~ name ~ preparedPlaceholderArgs(numArgs));
}

///
unittest
{
    debug(MYSQLN_TESTS)
    {
        import mysql.test.common;
        import mysql.test.integration;
        mixin(scopedCn);
        initBaseTestTables(cn);

        exec(cn, `DROP PROCEDURE IF EXISTS insert2`);
        exec(cn, `
            CREATE PROCEDURE insert2 (IN p1 INT, IN p2 CHAR(50))
            BEGIN
                INSERT INTO basetest (intcol, stringcol) VALUES(p1, p2);
            END
        `);

        auto insertStmt = prepareProcedure(cn, "insert2", 2);
        insertStmt.setArgs(2001, "inserted string 1");
        cn.exec(insertStmt);

        auto rows = query(cn, "SELECT stringcol FROM basetest WHERE intcol=2001").array;
        assert(rows.length == 1);
        assert(rows[0][0] == "inserted string 1");
    }
}

// Builds the parenthesised placeholder list for a call with `numArgs`
// arguments: "()" for zero (or fewer), "(?)" for one, "(?,?)" for two, ...
private string preparedPlaceholderArgs(int numArgs)
{
    string placeholders = "(";
    foreach(index; 0..numArgs)
    {
        // Every placeholder except the first is preceded by a comma.
        placeholders ~= (index == 0) ? "?" : ",?";
    }
    return placeholders ~ ")";
}

debug(MYSQLN_TESTS)
unittest
{
    assert(preparedPlaceholderArgs(3) == "(?,?,?)");
    assert(preparedPlaceholderArgs(2) == "(?,?)");
    assert(preparedPlaceholderArgs(1) == "(?)");
    assert(preparedPlaceholderArgs(0) == "()");
}

/// Per-connection info from the server about a registered prepared statement.
package struct PreparedServerInfo
{
    /// Server's identifier for this prepared statement.
    /// Apparently, this is never 0 if it's been registered,
    /// although mysql-native no longer relies on that.
    uint statementId;

    // NOTE(review): presumably the warning count the server reported when the
    // statement was prepared - confirm against the code that fills this in.
    ushort psWarnings;

    /// Number of parameters this statement takes.
    ///
    /// This will be the same on all connections, but it's returned
    /// by the server upon registration, so it's stored here.
    ushort numParams;

    /// Prepared statement headers
    ///
    /// This will be the same on all connections, but it's returned
    /// by the server upon registration, so it's stored here.
    PreparedStmtHeaders headers;

    /// Not actually from the server. Connection uses this to keep track
    /// of statements that should be treated as having been released.
    bool queuedForRelease = false;
}

/++
This is a wrapper over `mysql.prepared.Prepared`, provided ONLY as a
temporary aid in upgrading to mysql-native v2.0.0 and its
new connection-independent model of prepared statements. See the
$(LINK2 https://github.com/mysql-d/mysql-native/blob/master/MIGRATING_TO_V2.md, migration guide)
for more info.

In most cases, this layer shouldn't even be needed. But if you have many
lines of code making calls to exec/query the same prepared statement,
then this may be helpful.

To use this temporary compatibility layer, change instances of:

---
auto stmt = conn.prepare(...);
---

to this:

---
auto stmt = conn.prepareBackwardCompat(...);
---

And then your prepared statement should work as before.

BUT DO NOT LEAVE IT LIKE THIS! Ultimately, you should update
your prepared statement code to the mysql-native v2.0.0 API, by changing
instances of:

---
stmt.exec()
stmt.query()
stmt.queryRow()
stmt.queryRowTuple(outputArgs...)
stmt.queryValue()
---

to this:

---
conn.exec(stmt)
conn.query(stmt)
conn.queryRow(stmt)
conn.queryRowTuple(stmt, outputArgs...)
conn.queryValue(stmt)
---

Both of the above syntaxes can be used with a `BackwardCompatPrepared`
(the `Connection` passed directly to `mysql.commands.exec`/`mysql.commands.query`
will override the one associated with your `BackwardCompatPrepared`).

Once all of your code is updated, you can change `prepareBackwardCompat`
back to `prepare` again, and your upgrade will be complete.
+/
struct BackwardCompatPrepared
{
    import std.variant;

    // Connection used by the deprecated member functions below.
    private Connection _conn;
    // The wrapped statement; also exposed through `alias this`.
    Prepared _prepared;

    /// Access underlying `Prepared`
    @property Prepared prepared() { return _prepared; }

    // Lets a BackwardCompatPrepared be used wherever a `Prepared` is expected.
    alias _prepared this;

    /++
    This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0.

    See `BackwardCompatPrepared` for more info.
    +/
    deprecated("Change 'preparedStmt.exec()' to 'conn.exec(preparedStmt)'")
    ulong exec()
    {
        // Leading dot: call the module-scope `exec`, not this member.
        return .exec(_conn, _prepared);
    }

    ///ditto
    deprecated("Change 'preparedStmt.query()' to 'conn.query(preparedStmt)'")
    ResultRange query(ColumnSpecialization[] csa = null)
    {
        // Leading dot: call the module-scope `query`, not this member.
        return .query(_conn, _prepared, csa);
    }

    ///ditto
    deprecated("Change 'preparedStmt.queryRow()' to 'conn.queryRow(preparedStmt)'")
    Nullable!Row queryRow(ColumnSpecialization[] csa = null)
    {
        // Leading dot: call the module-scope `queryRow`, not this member.
        return .queryRow(_conn, _prepared, csa);
    }

    ///ditto
    deprecated("Change 'preparedStmt.queryRowTuple(outArgs...)' to 'conn.queryRowTuple(preparedStmt, outArgs...)'")
    void queryRowTuple(T...)(ref T args) if(T.length == 0 || !is(T[0] : Connection))
    {
        // The constraint rejects calls whose first argument is a Connection,
        // so only the old connection-less call syntax matches this member.
        return .queryRowTuple(_conn, _prepared, args);
    }

    ///ditto
    deprecated("Change 'preparedStmt.queryValue()' to 'conn.queryValue(preparedStmt)'")
    Nullable!Variant queryValue(ColumnSpecialization[] csa = null)
    {
        // Leading dot: call the module-scope `queryValue`, not this member.
        return .queryValue(_conn, _prepared, csa);
    }
}

//TODO: All low-level comms should be moved into the mysql.protocol package.
/// Low-level comms code relating to prepared statements.
package struct ProtocolPrepared
{
    import std.conv;
    import std.datetime;
    import std.variant;
    import mysql.types;

    /++
    Build the NULL bitmap for a list of parameters.

    Returns: An array of `(inParams.length+7)/8` bytes in which bit `i%8` of
    byte `i/8` is set when parameter `i` holds `null`.
    +/
    static ubyte[] makeBitmap(in Variant[] inParams)
    {
        auto bitmap = new ubyte[(inParams.length + 7) / 8];
        foreach (i; 0..inParams.length)
        {
            if(inParams[i].type != typeid(typeof(null)))
                continue;
            bitmap[i / 8] |= cast(ubyte) (1 << (i % 8));
        }
        return bitmap;
    }

    /++
    Build the fixed 14-byte start of a STMT_EXECUTE packet.

    Bytes 0..4 are left zeroed for the packet header, byte 4 is the command,
    bytes 5..9 the statement handle, byte 9 the flags and bytes 10..14 the
    iteration count (always 1).
    +/
    static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow
    {
        ubyte[] prefix;
        prefix.length = 14;

        prefix[4] = CommandType.STMT_EXECUTE;
        hStmt.packInto(prefix[5..9]);
        prefix[9] = flags;   // flags, no cursor
        prefix[10] = 1;      // iteration count - currently always 1
        prefix[11] = 0;
        prefix[12] = 0;
        prefix[13] = 0;

        return prefix;
    }

    //TODO: All low-level comms should be moved into the mysql.protocol package.
    /++
    Serialise the bound parameters of a prepared statement.

    For every parameter two bytes are written to the returned type array
    (SQL type, then a signedness flag: 0x80 for unsigned, 0 otherwise) and
    the encoded value is appended to `vals`. `null` parameters get
    `SQLType.NULL` and contribute no value bytes.

    Params:
        inParams = The parameter values. A `Variant` may hold a value or a
                   pointer to a value of one of the supported types.
        psa      = Per-parameter specializations; indexed in parallel with
                   `inParams`, so it must be at least as long.
        vals     = Receives the concatenated encoded values.
        longData = Set to true if any parameter has a non-zero chunk size.

    Returns: The type array, `2 * inParams.length` bytes long.

    Throws: `MYX` if a parameter is unbound (`void`) or of an unsupported type.
    +/
    static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa,
        out ubyte[] vals, out bool longData)
    {
        enum ubyte UNSIGNED = 0x80;
        enum ubyte SIGNED   = 0;

        immutable size_t pc = inParams.length;
        ubyte[] types;
        types.length = pc*2;
        size_t alloc = pc*20;   // initial guess: 20 value bytes per parameter
        vals.length = alloc;
        size_t vcl = 0;         // number of bytes of `vals` in use
        size_t ct = 0;          // next free index in `types`

        // Make sure `n` more bytes fit in `vals`, growing it by at least 150%.
        void reAlloc(size_t n)
        {
            if (vcl+n < alloc)
                return;
            size_t inc = (alloc*3)/2;
            if (inc < n)
                inc = n;
            alloc += inc;
            vals.length = alloc;
        }

        // Record the SQL type and signedness flag of the current parameter.
        void setType(int sqlType, ubyte sign)
        {
            types[ct++] = cast(ubyte) sqlType;
            types[ct++] = sign;
        }

        // Append raw bytes to `vals`.
        void put(const(ubyte)[] bytes)
        {
            reAlloc(bytes.length);
            vals[vcl .. vcl+bytes.length] = bytes[];
            vcl += bytes.length;
        }

        // Append the low `width` bytes of `value`, least significant first.
        void putLE(ulong value, size_t width)
        {
            reAlloc(width);
            foreach (n; 0 .. width)
                vals[vcl++] = cast(ubyte) (value >> (8*n));
        }

        foreach (size_t i; 0..pc)
        {
            if (psa[i].chunkSize)
                longData = true;
            if (inParams[i].type == typeid(typeof(null)))
            {
                setType(SQLType.NULL, SIGNED);
                continue;
            }
            Variant v = inParams[i];
            SQLType ext = psa[i].type;
            string ts = v.type.toString();
            bool isRef;
            // A trailing '*' means the Variant holds a pointer to the value.
            if (ts[$-1] == '*')
            {
                ts.length = ts.length-1;
                isRef = true;
            }

            switch (ts)
            {
                case "bool":
                    setType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.BIT : ext, SIGNED);
                    bool bv = isRef? *(v.get!(bool*)): v.get!(bool);
                    // One length byte, then the ASCII character '1' or '0'.
                    put([cast(ubyte) 1, cast(ubyte) (bv ? 0x31 : 0x30)]);
                    break;
                case "byte":
                    setType(SQLType.TINY, SIGNED);
                    putLE(isRef? *(v.get!(byte*)): v.get!(byte), 1);
                    break;
                case "ubyte":
                    setType(SQLType.TINY, UNSIGNED);
                    putLE(isRef? *(v.get!(ubyte*)): v.get!(ubyte), 1);
                    break;
                case "short":
                    setType(SQLType.SHORT, SIGNED);
                    putLE(isRef? *(v.get!(short*)): v.get!(short), 2);
                    break;
                case "ushort":
                    setType(SQLType.SHORT, UNSIGNED);
                    putLE(isRef? *(v.get!(ushort*)): v.get!(ushort), 2);
                    break;
                case "int":
                    setType(SQLType.INT, SIGNED);
                    putLE(isRef? *(v.get!(int*)): v.get!(int), 4);
                    break;
                case "uint":
                    setType(SQLType.INT, UNSIGNED);
                    putLE(isRef? *(v.get!(uint*)): v.get!(uint), 4);
                    break;
                case "long":
                    setType(SQLType.LONGLONG, SIGNED);
                    putLE(isRef? *(v.get!(long*)): v.get!(long), 8);
                    break;
                case "ulong":
                    setType(SQLType.LONGLONG, UNSIGNED);
                    putLE(isRef? *(v.get!(ulong*)): v.get!(ulong), 8);
                    break;
                case "float":
                    setType(SQLType.FLOAT, SIGNED);
                    float f = isRef? *(v.get!(float*)): v.get!(float);
                    // In-memory representation, copied byte for byte.
                    put((cast(ubyte*) &f)[0 .. float.sizeof]);
                    break;
                case "double":
                    setType(SQLType.DOUBLE, SIGNED);
                    double d = isRef? *(v.get!(double*)): v.get!(double);
                    // In-memory representation, copied byte for byte.
                    put((cast(ubyte*) &d)[0 .. double.sizeof]);
                    break;
                case "std.datetime.date.Date":
                case "std.datetime.Date":
                    setType(SQLType.DATE, SIGNED);
                    Date date = isRef? *(v.get!(Date*)): v.get!(Date);
                    put(pack(date));
                    break;
                // Newer Phobos reports TimeOfDay as living in std.datetime.date,
                // just like Date and DateTime, so accept that spelling as well.
                case "std.datetime.date.TimeOfDay":
                case "std.datetime.TimeOfDay":
                case "std.datetime.Time":
                    setType(SQLType.TIME, SIGNED);
                    TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay);
                    put(pack(time));
                    break;
                case "std.datetime.date.DateTime":
                case "std.datetime.DateTime":
                    setType(SQLType.DATETIME, SIGNED);
                    DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime);
                    put(pack(dt));
                    break;
                case "mysql.types.Timestamp":
                    setType(SQLType.TIMESTAMP, SIGNED);
                    Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp);
                    DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
                    put(pack(dt));
                    break;
                case "immutable(char)[]":
                    setType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.VARCHAR : ext, SIGNED);
                    string s = isRef? *(v.get!(string*)): v.get!(string);
                    put(packLCS(cast(void[]) s));
                    break;
                case "char[]":
                    setType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.VARCHAR : ext, SIGNED);
                    char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]);
                    put(packLCS(cast(void[]) ca));
                    break;
                case "byte[]":
                    setType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.TINYBLOB : ext, SIGNED);
                    byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]);
                    put(packLCS(cast(void[]) ba));
                    break;
                case "ubyte[]":
                    setType(ext == SQLType.INFER_FROM_D_TYPE ? SQLType.TINYBLOB : ext, SIGNED);
                    ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]);
                    put(packLCS(cast(void[]) uba));
                    break;
                case "void":
                    throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
                default:
                    throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
            }
        }
        vals.length = vcl;
        return types;
    }

    /++
    Send the data of every "long data" parameter (one with a non-zero chunk
    size) to the server as a series of STMT_SEND_LONG_DATA packets.

    The parameter's chunk delegate is called repeatedly to fill a buffer of
    `chunkSize` bytes and returns how many bytes it supplied. A short (or
    empty) result marks the end of that parameter's data.
    +/
    static void sendLongData(Connection conn, uint hStmt, ParameterSpecialization[] psa)
    {
        assert(psa.length <= ushort.max); // parameter number is sent as short
        foreach (ushort i, PSN psn; psa)
        {
            if (!psn.chunkSize) continue;
            uint cs = psn.chunkSize;
            uint delegate(ubyte[]) dg = psn.chunkDelegate;

            //TODO: All low-level comms should be moved into the mysql.protocol package.
            // Layout: 4 bytes packet header, 1 byte command, 4 bytes statement
            // handle, 2 bytes parameter number, then up to `cs` payload bytes.
            ubyte[] chunk;
            chunk.length = cs+11;
            chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
            chunk[4] = CommandType.STMT_SEND_LONG_DATA;
            hStmt.packInto(chunk[5..9]); // statement handle
            packInto(i, chunk[9..11]); // parameter number

            // byte 11 on is payload
            for (;;)
            {
                uint sent = dg(chunk[11..cs+11]);
                if (sent < cs)
                {
                    if (sent == 0)    // data was exact multiple of chunk size - all sent
                        break;
                    // Trim the buffer to exactly header + fixed fields + payload.
                    // (Previously the trim was computed after `sent` had been
                    // adjusted, leaving 7 stale bytes on the end of the packet
                    // and underflowing when sent+7 exceeded the chunk size.)
                    chunk.length = 11 + sent;
                    // Declared packet length excludes the 4 header bytes but
                    // includes the 7 non-payload bytes.
                    packInto!(uint, true)(cast(uint)(sent + 7), chunk[0..3]);
                    conn.send(chunk);
                    break;
                }
                conn.send(chunk);
            }
        }
    }

    /++
    Send a STMT_EXECUTE command for the statement `hStmt`.

    Any pending results on `conn` are purged first. When the statement takes
    parameters, the packet carries the NULL bitmap, a "types follow" byte of
    1, the parameter types and the parameter values; any long-data parameters
    are sent (via `sendLongData`) before the execute packet itself.
    +/
    static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh,
        Variant[] inParams, ParameterSpecialization[] psa)
    {
        conn.autoPurge();

        //TODO: All low-level comms should be moved into the mysql.protocol package.
        ubyte[] packet;
        conn.resetPacket();

        ubyte[] prefix = makePSPrefix(hStmt, 0);
        bool longData;

        if (psh.paramCount)
        {
            ubyte[] one = [ 1 ];
            ubyte[] vals;
            ubyte[] types = analyseParams(inParams, psa, vals, longData);
            ubyte[] nbm = makeBitmap(inParams);
            packet = prefix ~ nbm ~ one ~ types ~ vals;
        }
        else
            packet = prefix;

        if (longData)
            sendLongData(conn, hStmt, psa);

        assert(packet.length <= uint.max);
        packet.setPacketHeader(conn.pktNumber);
        conn.bumpPacket();
        conn.send(packet);
    }
}

/++
A class representing a database connection.

If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
creating a new Connection directly. That will provide certain benefits,
such as reusing old connections and automatic cleanup (no need to close
the connection when done).

------------------
// Suggested usage:

{
    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
    scope(exit) con.close();

    // Use the connection
    ...
}
------------------
+/
//TODO: All low-level comms should be moved into the mysql.protocol package.
class Connection
{
/+
The Connection is responsible for handshaking with the server to establish
authentication. It then passes client preferences to the server, and
subsequently is the channel for all command packets that are sent, and all
response packets received.

Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
byte as a packet number. Connection deals with the headers and ensures that
packet numbers are sequential.

The initial packet is sent by the server - essentially a 'hello' packet
inviting login. That packet has a sequence number of zero. That sequence
number is then incremented by client and server packets through the handshake
sequence.

After login all further sequences are initialized by the client sending a
command packet with a zero sequence number, to which the server replies with
zero or more packets with sequential sequence numbers.
+/
package:
    enum OpenState
    {
        /// We have not yet connected to the server, or have sent QUIT to the
        /// server and closed the connection
        notConnected,
        /// We have connected to the server and parsed the greeting, but not
        /// yet authenticated
        connected,
        /// We have successfully authenticated against the server, and need to
        /// send QUIT to the server when closing the connection
        authenticated
    }
    // Current position in the connect/authenticate life cycle.
    OpenState   _open;
    // Transport used for all reads and writes.
    MySQLSocket _socket;

    // Server capabilities from the greeting, and the capabilities the client
    // settled on.
    SvrCapFlags _sCaps, _cCaps;
    // Values read from the server greeting.
    uint    _sThread;
    ushort  _serverStatus;
    ubyte   _sCharSet, _protocol;
    string  _serverVersion;

    // Connection parameters.
    string _host, _user, _pwd, _db;
    ushort _port;

    // Which socket implementation (Phobos or Vibe.d) this connection uses.
    MySQLSocketType _socketType;

    // Callbacks used to open the underlying socket for each implementation.
    OpenSocketCallbackPhobos _openSocketPhobos;
    OpenSocketCallbackVibeD  _openSocketVibeD;

    // NOTE(review): presumably the last insert ID reported by the server -
    // confirm against the code that assigns it.
    ulong _insertID;

    // This gets incremented every time a command is issued or results are purged,
    // so a ResultRange can tell whether it's been invalidated.
    ulong _lastCommandID;

    // Whether there are rows, headers or binary data waiting to be retrieved.
    // MySQL protocol doesn't permit performing any other action until all
    // such data is read.
    bool _rowsPending, _headersPending, _binaryPending;

    // Field count of last performed command.
    //TODO: Does Connection need to store this?
    ushort _fieldCount;

    // ResultSetHeaders of last performed command.
    //TODO: Does Connection need to store this? Is this even used?
    ResultSetHeaders _rsh;

    // This tiny thing here is pretty critical. Pay great attention to its maintenance, otherwise
    // you'll get the dreaded "packet out of order" message. It, and the socket connection are
    // the reason why most other objects require a connection object for their construction.
    ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
                /// ordering.
// First packet should have 0 (end of the comment on the `_cpn` field).
    /// The packet number to use for, or expect on, the next packet.
    @property ubyte pktNumber()   { return _cpn; }
    /// Advance the packet number by one.
    void bumpPacket()       { _cpn++; }
    /// Restart the packet numbering at 0.
    void resetPacket()      { _cpn = 0; }

    version(Have_vibe_d_core) {} else
    pure const nothrow invariant()
    {
        // Without Vibe.d compiled in, a Vibe.d socket type can never be valid.
        assert(_socketType != MySQLSocketType.vibed);
    }

    /++
    Read one packet from the server and return its payload.

    The 4-byte header holds a 24-bit little-endian payload length followed by
    the packet number, which must match `pktNumber`. On success the packet
    number is bumped. On any failure the connection is killed.

    Throws: `MYXProtocol` if the packet number is not the expected one.
    +/
    ubyte[] getPacket()
    {
        scope(failure) kill();

        ubyte[4] header;
        _socket.read(header);
        // number of bytes always set as 24-bit
        uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
        enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
        bumpPacket();

        ubyte[] packet = new ubyte[numDataBytes];
        _socket.read(packet);
        assert(packet.length == numDataBytes, "Wrong number of bytes read");
        return packet;
    }

    /// Write a complete packet (header included) to the socket.
    void send(const(ubyte)[] packet)
    in
    {
        assert(packet.length > 4); // at least 1 byte more than header
    }
    body
    {
        _socket.write(packet);
    }

    /// Write a packet given as a separate header and (possibly empty) data part.
    void send(const(ubyte)[] header, const(ubyte)[] data)
    in
    {
        assert(header.length == 4 || header.length == 5/*command type included*/);
    }
    body
    {
        _socket.write(header);
        if(data.length)
            _socket.write(data);
    }

    /++
    Send a command packet consisting of the command byte `cmd` followed by `data`.

    Pending results are purged first, the command ID is incremented, and the
    connection is re-established if the socket is no longer connected (except
    for QUIT, which is then silently dropped). On any failure after the purge
    the connection is killed.
    +/
    void sendCmd(T)(CommandType cmd, const(T)[] data)
    in
    {
        // Internal thread states. Clients shouldn't use this
        assert(cmd != CommandType.SLEEP);
        assert(cmd != CommandType.CONNECT);
        assert(cmd != CommandType.TIME);
        assert(cmd != CommandType.DELAYED_INSERT);
        assert(cmd != CommandType.CONNECT_OUT);

        // Deprecated
        assert(cmd != CommandType.CREATE_DB);
        assert(cmd != CommandType.DROP_DB);
        assert(cmd != CommandType.TABLE_DUMP);

        // cannot send more than uint.max bytes. TODO: better error message if we try?
        assert(data.length <= uint.max);
    }
    out
    {
        // at this point we should have sent a command
        assert(pktNumber == 1);
    }
    body
    {
        autoPurge();

        scope(failure) kill();

        _lastCommandID++;

        if(!_socket.connected)
        {
            // NOTE(review): this early return skips resetPacket/bumpPacket, so
            // the `out` contract above only holds here if the packet number
            // already happens to be 1 - confirm this path is safe in
            // contract-checked builds.
            if(cmd == CommandType.QUIT)
                return; // Don't bother reopening connection just to quit

            _open = OpenState.notConnected;
            connect(_clientCapabilities);
        }

        resetPacket();

        ubyte[] header;
        header.length = 4 /*header*/ + 1 /*cmd*/;
        header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
        header[4] = cmd;
        bumpPacket();

        send(header, cast(const(ubyte)[])data);
    }

    /++
    Read the server's response to a command as an `OKErrorPacket`, check it
    with `enforcePacketOK`, and record the reported server status.

    `asString` is not used by this function.
    +/
    OKErrorPacket getCmdResponse(bool asString = false)
    {
        auto okp = OKErrorPacket(getPacket());
        enforcePacketOK(okp);
        _serverStatus = okp.serverStatus;
        return okp;
    }

    /// Get the next `mysql.result.Row` of a pending result set.
    ///
    /// If headers are still pending they are read first. When the next packet
    /// is an EOF packet, the pending-rows flags are cleared and a
    /// default-initialised `Row` is returned.
    Row getNextRow()
    {
        scope(failure) kill();

        if (_headersPending)
        {
            _rsh = ResultSetHeaders(this, _fieldCount);
            _headersPending = false;
        }
        ubyte[] packet;
        Row rr;
        packet = getPacket();
        if (packet.isEOFPacket())
        {
            _rowsPending = _binaryPending = false;
            return rr;
        }
        if (_binaryPending)
            rr = Row(this, packet, _rsh, true);
        else
            rr = Row(this, packet, _rsh, false);
        //rr._valid = true;
        return rr;
    }

    /++
    Build the authentication packet sent in reply to the server greeting.

    Layout after the 4-byte header: client capabilities (4 bytes), maximum
    packet length (4 bytes), character set (1 byte), 23 zero bytes, the
    null-terminated user name, the length-prefixed token (a single 0 byte if
    the password is empty) and the null-terminated default database.

    The packet header is set from the current packet number, which is then bumped.
    +/
    ubyte[] buildAuthPacket(ubyte[] token)
    in
    {
        assert(token.length == 20);
    }
    body
    {
        ubyte[] packet;
        packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
        packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append

        // NOTE: we'll set the header last when we know the size

        // Set the default capabilities required by the client
        _cCaps.packInto(packet[4..8]);

        // Request a conventional maximum packet length.
        1.packInto(packet[8..12]);

        packet ~= 33; // Set UTF-8 as default charSet

        // There's a statutory block of zero bytes here - fill them in.
        foreach(i; 0 .. 23)
            packet ~= 0;

        // Add the user name as a null terminated string
        foreach(i; 0 .. _user.length)
            packet ~= _user[i];
        packet ~= 0; // \0

        // Add our calculated authentication token as a length prefixed string.
        assert(token.length <= ubyte.max);
        if(_pwd.length == 0)  // Omit the token if the account has no password
            packet ~= 0;
        else
        {
            packet ~= cast(ubyte)token.length;
            foreach(i; 0 .. token.length)
                packet ~= token[i];
        }

        // Add the default database as a null terminated string
        foreach(i; 0 .. _db.length)
            packet ~= _db[i];
        packet ~= 0; // \0

        // The server sent us a greeting with packet number 0, so we send the auth packet
        // back with the next number.
        packet.setPacketHeader(pktNumber);
        bumpPacket();
        return packet;
    }

    /++
    Consume the capability/charset/status section of the greeting packet and
    store it in `_sCaps`, `_sCharSet` and `_serverStatus`.

    Throws: `MYX` if the server lacks PROTOCOL41 or SECURE_CONNECTION support.
    +/
    void consumeServerInfo(ref ubyte[] packet)
    {
        scope(failure) kill();

        _sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
        _sCharSet = packet.consume!ubyte(); // server_language
        _serverStatus = packet.consume!ushort(); //server_status
        _sCaps |= cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
        _sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

        enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
        enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
    }

    /++
    Read and parse the server's greeting packet.

    Stores the protocol version, server version, thread ID and (through
    `consumeServerInfo`) the server capabilities.

    Returns: The scramble buffer: its first 8 bytes followed by the second,
    null-terminated part.

    Throws: `MYX` if the server answered with an error packet, `MYXProtocol`
    if the greeting is malformed.
    +/
    ubyte[] parseGreeting()
    {
        scope(failure) kill();

        ubyte[] packet = getPacket();

        if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
        {
            auto okp = OKErrorPacket(packet);
            enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
        }

        _protocol = packet.consume!ubyte();

        _serverVersion = packet.consume!string(packet.countUntil(0));
        packet.skip(1); // \0 terminated _serverVersion

        _sThread = packet.consume!uint();

        // read first part of scramble buf
        ubyte[] authBuf;
        authBuf.length = 255;
        authBuf[0..8] = packet.consume(8)[]; // scramble_buff

        enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

        consumeServerInfo(packet);

        packet.skip(1); // this byte supposed to be scramble length, but is actually zero
        packet.skip(10); // filler of \0

        // rest of the scramble
        auto len = packet.countUntil(0);
        enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
        enforce(authBuf.length > 8+len);
        authBuf[8..8+len] = packet.consume(len)[];
        authBuf.length = 8+len; // cut to correct size
        enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Expected \\0 terminating scramble buf");

        return authBuf;
    }

    /// Default socket opener for Phobos sockets: connect to `host`:`port`.
    static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
    {
        auto s = new PlainPhobosSocket();
        s.connect(new InternetAddress(host, port));
        return s;
    }

    /// Default socket opener for Vibe.d sockets. Must not be called unless
    /// Vibe.d support is compiled in.
    static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
    {
        version(Have_vibe_d_core)
            return vibe.core.net.connectTCP(host, port);
        else
            assert(0);
    }

    /// Reset the packet number and open `_socket` using the callback that
    /// matches `_socketType`.
    void initConnection()
    {
        resetPacket();
        final switch(_socketType)
        {
            case MySQLSocketType.phobos:
                _socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
                break;

            case MySQLSocketType.vibed:
                version(Have_vibe_d_core) {
                    _socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
                    break;
                } else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
        }
    }

    /++
    Compute the 20-byte authentication token for the stored password:
    `SHA1(pwd) XOR SHA1(authBuf ~ SHA1(SHA1(pwd)))`.
    +/
    ubyte[] makeToken(ubyte[] authBuf)
    {
        auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
        auto pass2 = sha1Of(pass1);

        SHA1 sha1;
        sha1.start();
        sha1.put(authBuf);
        sha1.put(pass2);
        auto result = sha1.finish();
        foreach (size_t i; 0..20)
            result[i] = result[i] ^ pass1[i];
        return result.dup;
    }

    /++
    Returns: The capability flags set in both `server` and `client`.

    The previous bit-by-bit loop ran `uint.sizeof` (4) times rather than once
    per bit, so only the four lowest flags could ever be reported as common.
    A bitwise AND covers all 32 bits.
    +/
    SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
    {
        return cast(SvrCapFlags)(server & client);
    }

    /// Set `_cCaps` to the requested flags the server also supports, always
    /// including PROTOCOL41 and SECURE_CONNECTION.
    void setClientFlags(SvrCapFlags capFlags)
    {
        _cCaps = getCommonCapabilities(_sCaps, capFlags);

        // We cannot operate in <4.1 protocol, so we'll force it even if the user
        // didn't supply it
        _cCaps |= SvrCapFlags.PROTOCOL41;
        _cCaps |= SvrCapFlags.SECURE_CONNECTION;
    }

    /++
    Authenticate using the scramble buffer from the greeting.

    Throws: `MYX` if the server's reply reports an error.
    +/
    void authenticate(ubyte[] greeting)
    in
    {
        assert(_open == OpenState.connected);
    }
    out
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        auto token = makeToken(greeting);
        auto authPacket = buildAuthPacket(token);
        send(authPacket);

        auto packet = getPacket();
        auto okp = OKErrorPacket(packet);
        enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
        _open = OpenState.authenticated;
    }

    // Capabilities passed to the most recent `connect`; reused by `sendCmd`
    // when it has to reconnect.
    SvrCapFlags _clientCapabilities;

    /// Open the socket, parse the greeting, negotiate capabilities and authenticate.
    void connect(SvrCapFlags clientCapabilities)
    in
    {
        assert(closed);
    }
    out
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        initConnection();
        auto greeting = parseGreeting();
        _open = OpenState.connected;

        _clientCapabilities = clientCapabilities;
        setClientFlags(clientCapabilities);
        authenticate(greeting);
    }

    /// Forcefully close the socket without sending the quit command.
    /// Needed in case an error leaves communications in an undefined or non-recoverable state.
    void kill()
    {
        // The socket may not have been created yet, so guard against null.
        if(_socket !is null && _socket.connected)
            _socket.close();
        _open = OpenState.notConnected;
        // any pending data is gone. Any statements to release will be released
        // on the server automatically.
        _headersPending = _rowsPending = _binaryPending = false;

        // Use the built-in AA `clear` where the compiler provides it.
        static if(__traits(compiles, (){ int[int] aa; aa.clear(); }))
            preparedLookup.clear();
        else
            preparedLookup = null;
    }

    /// Called whenever mysql-native needs to send a command to the server
    /// and be sure there aren't any pending results (which would prevent
    /// a new command from being sent).
    void autoPurge()
    {
        // This is called every time a command is sent,
        // so detect & prevent infinite recursion.
        // NOTE(review): a function-level `static` is one variable per thread,
        // shared by every Connection on that thread - confirm that is intended
        // when several connections are in use at once.
        static bool isAutoPurging = false;

        if(isAutoPurging)
            return;

        isAutoPurging = true;
        scope(exit) isAutoPurging = false;

        try
        {
            purgeResult();
            releaseQueued();
        }
        catch(Exception e)
        {
            // likely the connection was closed, so reset any state.
            // Don't treat this as a real error, because everything will be reset when we
            // reconnect.
            kill();
        }
    }

    /// Lookup per-connection prepared statement info by SQL
    PreparedServerInfo[string] preparedLookup;

    /// Set `queuedForRelease` flag for a statement in `preparedLookup`.
    /// Does nothing if statement not in `preparedLookup`.
private void setQueuedForRelease(string sql, bool value)
{
	// `in` yields a pointer to the stored entry (or null), so the flag can be
	// updated in place with a single lookup.
	if(auto pInfo = sql in preparedLookup)
		pInfo.queuedForRelease = value;
}

/// Queue a prepared statement for release.
void queueForRelease(string sql)
{
	// If connection's closed, then it IS released.
	if(closed)
		return;

	setQueuedForRelease(sql, true);
}

/// Remove a statement from the queue to be released.
void unqueueForRelease(string sql)
{
	setQueuedForRelease(sql, false);
}

/// Releases all prepared statements that are queued for release.
void releaseQueued()
{
	// Walk a snapshot of the keys: entries are removed inside the loop, and
	// removing from an associative array while `foreach` iterates over that
	// same array is not permitted.
	foreach(sql; preparedLookup.keys)
	{
		auto pInfo = sql in preparedLookup;
		if(pInfo is null || !pInfo.queuedForRelease)
			continue;

		immediateReleasePrepared(pInfo.statementId);
		preparedLookup.remove(sql);
	}
}

/// Returns null if not found
Nullable!PreparedServerInfo getPreparedServerInfo(const string sql) pure nothrow
{
	Nullable!PreparedServerInfo result;

	auto pInfo = sql in preparedLookup;
	if(pInfo)
		result = *pInfo;

	return result;
}

/// If already registered, simply returns the cached `PreparedServerInfo`.
/// Otherwise prepares the statement on the server and caches the result
/// in `preparedLookup`.
PreparedServerInfo registerIfNeeded(string sql)
out(info)
{
	// I'm confident this can't currently happen, but
	// let's make sure that doesn't change.
	assert(!info.queuedForRelease);
}
body
{
	if(auto pInfo = sql in preparedLookup)
	{
		// The statement is registered. It may, or may not, be queued
		// for release. Either way, all we need to do is make sure it's
		// un-queued and then return.
		pInfo.queuedForRelease = false;
		return *pInfo;
	}

	auto info = registerIfNeededImpl(sql);
	preparedLookup[sql] = info;

	return info;
}

/++
Send `STMT_PREPARE` for `sql` and parse the server's reply.

Does not consult or update `preparedLookup`; see `registerIfNeeded`.
Kills the connection if anything in here throws.

Throws: `mysql.exceptions.MYXProtocol` if the reply is empty or starts with
an unexpected marker; whatever `enforcePacketOK` throws when the server
replies with an error packet.
+/
PreparedServerInfo registerIfNeededImpl(string sql)
{
	scope(failure) kill();

	PreparedServerInfo info;

	sendCmd(CommandType.STMT_PREPARE, sql);
	_fieldCount = 0;

	//TODO: All packet handling should be moved into the mysql.protocol package.
	ubyte[] packet = getPacket();
	// `front` on an empty array is invalid, so reject an empty reply up front.
	enforceEx!MYXProtocol(packet.length > 0, "Empty response to STMT_PREPARE command");
	if(packet.front == ResultPacketMarker.ok)
	{
		packet.popFront();
		info.statementId    = packet.consume!int();
		_fieldCount         = packet.consume!short();
		info.numParams      = packet.consume!short();

		packet.popFront(); // one byte filler
		info.psWarnings     = packet.consume!short();

		// At this point the server also sends field specs for parameters
		// and columns if there were any of each
		info.headers = PreparedStmtHeaders(this, _fieldCount, info.numParams);
	}
	else if(packet.front == ResultPacketMarker.error)
	{
		auto error = OKErrorPacket(packet);
		enforcePacketOK(error);
		assert(0); // FIXME: what now?
	}
	else
	{
		// Neither OK nor error: report a protocol violation instead of
		// halting the program with assert(0).
		enforceEx!MYXProtocol(false, "Unexpected response to STMT_PREPARE command");
	}

	return info;
}

/// Send `STMT_CLOSE` for `statementId` right away, purging any pending
/// result first. Does nothing if the connection is closed.
private void immediateReleasePrepared(uint statementId)
{
	scope(failure) kill();

	if(closed())
		return;

	//TODO: All low-level comms should be moved into the mysql.protocol package.
	ubyte[9] packet_buf;
	ubyte[] packet = packet_buf;
	packet.setPacketHeader(0/*packet number*/);
	bumpPacket();
	packet[4] = CommandType.STMT_CLOSE;
	statementId.packInto(packet[5..9]);
	purgeResult();
	send(packet);
	// It seems that the server does not find it necessary to send a response
	// for this command.
}

public:

/++
Construct opened connection.

Throws `mysql.exceptions.MYX` upon failure to connect.

If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
creating a new Connection directly. That will provide certain benefits,
such as reusing old connections and automatic cleanup (no need to close
the connection when done).

------------------
// Suggested usage:

{
	auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
	scope(exit) con.close();

	// Use the connection
	...
}
------------------

Params:
	cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
			(TODO: The connection string needs work to allow for semicolons in its parts!)
	socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
			unless compiled with `-version=Have_vibe_d_core` (set automatically
			if using $(LINK2 http://code.dlang.org/getting_started, DUB)).
	openSocket = Optional callback which should return a newly-opened Phobos
			or Vibe.d TCP socket. This allows custom sockets to be used,
			subclassed from Phobos's or Vibe.d's sockets.
	host = An IP address in numeric dotted form, or as a host name.
	user = The user name to authenticate.
	pwd = User's password.
	db = Desired initial database.
	port = TCP port of the server. Defaults to 3306.
	capFlags = The set of flag bits from the server's capabilities that the client requires
+/
//After the connection is created, and the initial invitation is received from the server
//client preferences can be set, and authentication can then be attempted.
this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
	// Prefer Vibe.d sockets whenever Vibe.d is available.
	version(Have_vibe_d_core)
		enum defaultSocketType = MySQLSocketType.vibed;
	else
		enum defaultSocketType = MySQLSocketType.phobos;

	this(defaultSocketType, host, user, pwd, db, port, capFlags);
}

///ditto
this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
	version(Have_vibe_d_core) {} else
		enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

	this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
		host, user, pwd, db, port, capFlags);
}

///ditto
this(OpenSocketCallbackPhobos openSocket,
	string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
	this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
}

version(Have_vibe_d_core)
///ditto
this(OpenSocketCallbackVibeD openSocket,
	string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
{
	this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
}

///ditto
private this(MySQLSocketType socketType,
	OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
	string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
in
{
	// The callback matching the requested socket type must be supplied.
	final switch(socketType)
	{
		case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
		case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
	}
}
body
{
	enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
	enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
	version(Have_vibe_d_core) {} else
		enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

	_socketType = socketType;
	_host = host;
	_user = user;
	_pwd = pwd;
	_db = db;
	_port = port;

	_openSocketPhobos = openSocketPhobos;
	_openSocketVibeD  = openSocketVibeD;

	connect(capFlags);
}

///ditto
//After the connection is created, and the initial invitation is received from the server
//client preferences can be set, and authentication can then be attempted.
this(string cs, SvrCapFlags capFlags = defaultClientFlags)
{
	string[] a = parseConnectionString(cs);
	this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

///ditto
this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
{
	string[] a = parseConnectionString(cs);
	this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

///ditto
this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
{
	string[] a = parseConnectionString(cs);
	this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

version(Have_vibe_d_core)
///ditto
this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
{
	string[] a = parseConnectionString(cs);
	this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
}

/++
Check whether this `Connection` is still connected to the server, or if
the connection has been closed.
+/
@property bool closed()
{
	return _open == OpenState.notConnected || !_socket.connected;
}

/++
Explicitly close the connection.

Idiomatic use as follows is suggested:
------------------
{
	auto con = new Connection("host=localhost;user=user;pwd=password;db=mysqld");
	scope(exit) con.close();
	// Use the connection
	...
}
------------------
+/
void close()
{
	// This is a two-stage process. First tell the server we are quitting this
	// connection, and then close the socket.

	if (_open == OpenState.authenticated && _socket.connected)
		quit();

	// Reset local state whenever the connection was open in any sense. This
	// also covers a socket that dropped while we were still authenticated,
	// which would otherwise leave `_open`, the pending flags and
	// `preparedLookup` describing a session that no longer exists.
	if (_open != OpenState.notConnected)
		kill();
	resetPacket();
}

/++
Reconnects to the server using the same connection settings originally
used to create the `Connection`.

Optionally takes a `mysql.protocol.constants.SvrCapFlags`, allowing you to
reconnect using a different set of server capability flags.

Normally, if the connection is already open, this will do nothing. However,
if you request a different set of `mysql.protocol.constants.SvrCapFlags`
than was originally used to create the `Connection`, the connection will
be closed and then reconnected using the new `mysql.protocol.constants.SvrCapFlags`.
+/
//TODO: This needs to be unittested.
void reconnect()
{
	reconnect(_clientCapabilities);
}

///ditto
void reconnect(SvrCapFlags clientCapabilities)
{
	if(!closed)
	{
		// Same caps as before?
		if(clientCapabilities == _clientCapabilities)
			return; // Nothing to do, just keep current connection

		close();
	}
	else if(_open != OpenState.notConnected)
	{
		// The socket dropped without `kill()`/`close()` having run. Discard
		// the dead session's bookkeeping (pending flags, prepared statement
		// IDs) so it does not leak into the new session.
		kill();
	}

	connect(clientCapabilities);
}

/// Send the `QUIT` command. The socket itself is left open; `close` then
/// finishes the job through `kill`.
private void quit()
in
{
	assert(_open == OpenState.authenticated);
}
body
{
	sendCmd(CommandType.QUIT, []);
	// No response is sent for a quit packet
	_open = OpenState.connected;
}

/++
Parses a connection string of the form
`"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`

Port is optional and defaults to 3306.

Whitespace surrounding any name or value is automatically stripped.

Each `name=value` pair is split at its FIRST `=`, so a value (for example a
password) may itself contain `=`. Empty segments, such as the one produced
by a trailing `;`, are ignored.

Returns a five-element array of strings in this order:
$(UL
$(LI [0]: host)
$(LI [1]: user)
$(LI [2]: pwd)
$(LI [3]: db)
$(LI [4]: port)
)

Throws: `mysql.exceptions.MYX` if a segment has no `=` or an unknown name.

(TODO: The connection string needs work to allow for semicolons in its parts!)
+/
//TODO: Replace the return value with a proper struct.
static string[] parseConnectionString(string cs)
{
	string[] rv;
	rv.length = 5;
	rv[4] = "3306"; // Default port
	foreach (s; split(cs, ";"))
	{
		// Tolerate empty segments (e.g. from a trailing ';').
		if (strip(s).length == 0)
			continue;

		// parts[0] = name, parts[1] = "=" (empty when absent), parts[2] = value
		auto parts = findSplit(s, "=");
		enforceEx!MYX(parts[1].length != 0, "Bad connection string: " ~ cs);
		string name = strip(parts[0]);
		string val = strip(parts[2]);
		switch (name)
		{
			case "host":
				rv[0] = val;
				break;
			case "user":
				rv[1] = val;
				break;
			case "pwd":
				rv[2] = val;
				break;
			case "db":
				rv[3] = val;
				break;
			case "port":
				rv[4] = val;
				break;
			default:
				throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
		}
	}
	return rv;
}

/++
Select a current database.

Throws `mysql.exceptions.MYX` upon failure.

Params: dbName = Name of the requested database
+/
void selectDB(string dbName)
{
	sendCmd(CommandType.INIT_DB, dbName);
	getCmdResponse();
	// Only remember the new database once the server has accepted it.
	_db = dbName;
}

/++
Check the server status.

Throws `mysql.exceptions.MYX` upon failure.

Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
+/
OKErrorPacket pingServer()
{
	sendCmd(CommandType.PING, []);
	return getCmdResponse();
}

/++
Refresh some feature(s) of the server.

Throws `mysql.exceptions.MYX` upon failure.

Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined
+/
OKErrorPacket refreshServer(RefreshFlags flags)
{
	sendCmd(CommandType.REFRESH, [flags]);
	return getCmdResponse();
}

/++
Flush any outstanding result set elements.

When the server responds to a command that produces a result set, it
queues the whole set of corresponding packets over the current connection.
Before that `Connection` can embark on any new command, it must receive
all of those packets and junk them.

As of v1.1.4, this is done automatically as needed. But you can still
call this manually to force a purge to occur when you want.

Returns: The number of row packets that were discarded.

See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/)
+/
ulong purgeResult()
{
	scope(failure) kill();

	// Invalidate any ResultRange still referring to the purged results.
	_lastCommandID++;

	ulong rows = 0;
	if (_headersPending)
	{
		for (size_t i = 0;; i++)
		{
			if (getPacket().isEOFPacket())
			{
				_headersPending = false;
				break;
			}
			enforceEx!MYXProtocol(i < _fieldCount,
				text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
		}
	}
	if (_rowsPending)
	{
		for (;; rows++)
		{
			if (getPacket().isEOFPacket())
			{
				_rowsPending = _binaryPending = false;
				break;
			}
		}
	}
	resetPacket();
	return rows;
}

/++
Get a textual report on the server status.

(COM_STATISTICS)
+/
string serverStats()
{
	sendCmd(CommandType.STATISTICS, []);
	return cast(string) getPacket();
}

/++
Enable multiple statement commands.

This can be used later if this feature was not requested in the client capability flags.

Warning: This functionality is currently untested.

Params: on = Boolean value to turn the capability on or off.

Throws: `mysql.exceptions.MYXProtocol` if the server's reply is not the
expected 5-byte EOF packet.
+/
//TODO: Need to test this
void enableMultiStatements(bool on)
{
	scope(failure) kill();

	ubyte[] t;
	t.length = 2;
	t[0] = on ? 0 : 1;
	t[1] = 0;
	sendCmd(CommandType.STMT_OPTION, t);

	// For some reason this command gets an EOF packet as response
	auto packet = getPacket();
	// Check the length first so an empty packet is reported as a protocol
	// error rather than indexed out of bounds.
	enforceEx!MYXProtocol(packet.length == 5 && packet[0] == 254, "Unexpected response to SET_OPTION command");
}

/// Return the in-force protocol number.
@property ubyte protocol() pure const nothrow { return _protocol; }
/// Server version
@property string serverVersion() pure const nothrow { return _serverVersion; }
/// Server capability flags
@property uint serverCapabilities() pure const nothrow { return _sCaps; }
/// Server status
@property ushort serverStatus() pure const nothrow { return _serverStatus; }
/// Current character set
@property ubyte charSet() pure const nothrow { return _sCharSet; }
/// Current database
@property string currentDB() pure const nothrow { return _db; }
/// Socket type being used, Phobos or Vibe.d
@property MySQLSocketType socketType() pure const nothrow { return _socketType; }

/// After a command that inserted a row into a table with an auto-increment
/// ID column, this method allows you to retrieve the last insert ID.
@property ulong lastInsertID() pure const nothrow { return _insertID; }

/// This gets incremented every time a command is issued or results are purged,
/// so a `mysql.result.ResultRange` can tell whether it's been invalidated.
@property ulong lastCommandID() pure const nothrow { return _lastCommandID; }

/// Gets whether rows are pending.
///
/// Note, you may want `hasPending` instead.
@property bool rowsPending() pure const nothrow { return _rowsPending; }

/// Gets whether anything (rows, headers or binary) is pending.
/// New commands cannot be sent on a connection while anything is pending
/// (the pending data will automatically be purged.)
@property bool hasPending() pure const nothrow
{
	return _rowsPending || _headersPending || _binaryPending;
}

/// Gets the result header's field descriptions.
@property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }

/++
Manually register a prepared statement on this connection.

Does nothing if statement is already registered on this connection.

Calling this is not strictly necessary, as the prepared statement will
automatically be registered upon its first use on any `Connection`.
This is provided for those who prefer eager registration over lazy
for performance reasons.
+/
void register(Prepared prepared)
{
	// Registration is keyed by the statement's SQL text.
	registerIfNeeded(prepared.sql);
}

/++
Manually release a prepared statement on this connection.

This method tells the server that it can dispose of the information it
holds about the current prepared statement.

Calling this is not strictly necessary. The server considers prepared
statements to be per-connection, so they'll go away when the connection
closes anyway. This is provided in case direct control is actually needed.

If you choose to use a reference counted struct to call this automatically,
be aware that embedding reference counted structs inside garbage collectible
heap objects is dangerous and should be avoided, as it can lead to various
hidden problems, from crashes to race conditions. (See the discussion at issue
$(LINK2 https://github.com/mysql-d/mysql-native/issues/159, #159)
for details.) Instead, it may be better to simply avoid trying to manage
their release at all, as it's not usually necessary. Or to periodically
release all prepared statements, and simply allow mysql-native to
automatically re-register them upon their next use.

Notes:

In actuality, the server might not immediately be told to release the
statement (although `isRegistered` will still report `false`).

This is because there could be a `mysql.result.ResultRange` with results
still pending for retrieval, and the protocol doesn't allow sending commands
(such as "release a prepared statement") to the server while data is pending.
Therefore, this function may instead queue the statement to be released
when it is safe to do so: Either the next time a result set is purged or
the next time a command (such as `mysql.commands.query` or
`mysql.commands.exec`) is performed (because such commands automatically
purge any pending results).

This function does NOT auto-purge because, if this is ever called from
automatic resource management cleanup (refcounting, RAII, etc), that
would create ugly situations where hidden, implicit behavior triggers
an unexpected auto-purge.
+/
void release(Prepared prepared)
{
	release(prepared.sql);
}

///ditto
void release(string sql)
{
	//TODO: Don't queue it if nothing is pending. Just do it immediately.
	//      But need to be certain both situations are unittested.
	queueForRelease(sql);
}

/// Is the given SQL registered on this connection as a prepared statement?
bool isRegistered(Prepared prepared)
{
	return isRegistered( prepared.sql );
}

///ditto
bool isRegistered(string sql)
{
	return isRegistered( getPreparedServerInfo(sql) );
}

///ditto
package bool isRegistered(Nullable!PreparedServerInfo info)
{
	// A statement that is merely queued for release already counts as
	// "not registered", even though it is still present in the lookup.
	return !info.isNull && !info.queuedForRelease;
}
}

// Test register, release, isRegistered, and auto-register for prepared statements
debug(MYSQLN_TESTS)
unittest
{
	import mysql.connection;
	import mysql.test.common;

	// Declared outside the connection scopes below so the statements
	// outlive the connection they were first prepared on.
	Prepared preparedInsert;
	Prepared preparedSelect;
	immutable insertSQL = "INSERT INTO `autoRegistration` VALUES (1), (2)";
	immutable selectSQL = "SELECT `val` FROM `autoRegistration`";
	int queryTupleResult;

	{
		mixin(scopedCn);

		// Setup
		cn.exec("DROP TABLE IF EXISTS `autoRegistration`");
		cn.exec("CREATE TABLE `autoRegistration` (
			`val` INTEGER
		) ENGINE=InnoDB DEFAULT CHARSET=utf8");

		// Initial register
		preparedInsert = cn.prepare(insertSQL);
		preparedSelect = cn.prepare(selectSQL);

		// Test basic register, release, isRegistered
		assert(cn.isRegistered(preparedInsert));
		assert(cn.isRegistered(preparedSelect));
		cn.release(preparedInsert);
		cn.release(preparedSelect);
		assert(!cn.isRegistered(preparedInsert));
		assert(!cn.isRegistered(preparedSelect));

		// Test manual re-register
		cn.register(preparedInsert);
		cn.register(preparedSelect);
		assert(cn.isRegistered(preparedInsert));
		assert(cn.isRegistered(preparedSelect));

		// Test double register
		cn.register(preparedInsert);
		cn.register(preparedSelect);
		assert(cn.isRegistered(preparedInsert));
		assert(cn.isRegistered(preparedSelect));

		// Test double release
		cn.release(preparedInsert);
		cn.release(preparedSelect);
		assert(!cn.isRegistered(preparedInsert));
		assert(!cn.isRegistered(preparedSelect));
		cn.release(preparedInsert);
		cn.release(preparedSelect);
		assert(!cn.isRegistered(preparedInsert));
		assert(!cn.isRegistered(preparedSelect));
	}

	// Note that at this point, both prepared statements still exist,
	// but are no longer registered on any connection. In fact, there
	// are no open connections anymore.

	// Test auto-register: exec
	{
		mixin(scopedCn);

		assert(!cn.isRegistered(preparedInsert));
		cn.exec(preparedInsert);
		assert(cn.isRegistered(preparedInsert));
	}

	// Test auto-register: query
	{
		mixin(scopedCn);

		assert(!cn.isRegistered(preparedSelect));
		cn.query(preparedSelect).each();
		assert(cn.isRegistered(preparedSelect));
	}

	// Test auto-register: queryRow
	{
		mixin(scopedCn);

		assert(!cn.isRegistered(preparedSelect));
		cn.queryRow(preparedSelect);
		assert(cn.isRegistered(preparedSelect));
	}

	// Test auto-register: queryRowTuple
	{
		mixin(scopedCn);

		assert(!cn.isRegistered(preparedSelect));
		cn.queryRowTuple(preparedSelect, queryTupleResult);
		assert(cn.isRegistered(preparedSelect));
	}

	// Test auto-register: queryValue
	{
		mixin(scopedCn);

		assert(!cn.isRegistered(preparedSelect));
		cn.queryValue(preparedSelect);
		assert(cn.isRegistered(preparedSelect));
	}
}

// An attempt to reproduce issue #81: Using mysql-native driver with no default database
// I'm unable to actually reproduce the error, though.
debug(MYSQLN_TESTS)
unittest
{
	import mysql.escape;
	mixin(scopedCn);

	cn.exec("DROP TABLE IF EXISTS `issue81`");
	cn.exec("CREATE TABLE `issue81` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8");
	cn.exec("INSERT INTO `issue81` (a) VALUES (1)");

	// Second connection to the same server, built from a connection string
	// that deliberately omits `db`, i.e. with no default database selected.
	auto cn2 = new Connection(text("host=", cn._host, ";port=", cn._port, ";user=", cn._user, ";pwd=", cn._pwd));
	scope(exit) cn2.close();

	// The table therefore has to be addressed with a database-qualified name.
	cn2.query("SELECT * FROM `"~mysqlEscape(cn._db).text~"`.`issue81`");
}

// Regression test for Issue #154:
// autoPurge can throw an exception if the socket was closed without purging
//
// This simulates a disconnect by closing the socket underneath the Connection
// object itself.
debug(MYSQLN_TESTS)
unittest
{
	mixin(scopedCn);

	cn.exec("DROP TABLE IF EXISTS `dropConnection`");
	cn.exec("CREATE TABLE `dropConnection` (
		`val` INTEGER
	) ENGINE=InnoDB DEFAULT CHARSET=utf8");
	cn.exec("INSERT INTO `dropConnection` VALUES (1), (2), (3)");
	import mysql.prepared;
	{
		// Run a query and discard the returned range without reading it.
		auto prep = cn.prepare("SELECT * FROM `dropConnection`");
		cn.query(prep);
	}
	// close the socket forcibly
	cn._socket.close();
	// this should still work (it should reconnect).
	cn.exec("DROP TABLE `dropConnection`");
}

/+
Test Prepared's ability to be safely refcount-released during a GC cycle
(ie, `Connection.release` must not allocate GC memory).

Currently disabled because it's not guaranteed to always work
(and apparently, cannot be made to work?)
For relevant discussion, see issue #159:
https://github.com/mysql-d/mysql-native/issues/159
+/
version(none)
debug(MYSQLN_TESTS)
{
	/// Proof-of-concept ref-counted Prepared wrapper, just for testing,
	/// not really intended for actual use.
	private struct RCPreparedPayload
	{
		Prepared prepared;
		Connection conn; // Connection to be released from

		alias prepared this;

		@disable this(this); // not copyable
		~this()
		{
			// There are a couple calls to this dtor where `conn` happens to be null.
			if(conn is null)
				return;

			assert(conn.isRegistered(prepared));
			conn.release(prepared);
		}
	}
	///ditto
	alias RCPrepared = RefCounted!(RCPreparedPayload, RefCountedAutoInitialize.no);
	///ditto
	private RCPrepared rcPrepare(Connection conn, string sql)
	{
		import std.algorithm.mutation : move;

		auto prepared = conn.prepare(sql);
		auto payload = RCPreparedPayload(prepared, conn);
		// The payload is not copyable, so it has to be moved into the wrapper.
		return refCounted(move(payload));
	}

	unittest
	{
		import core.memory;
		mixin(scopedCn);

		cn.exec("DROP TABLE IF EXISTS `rcPrepared`");
		cn.exec("CREATE TABLE `rcPrepared` (
			`val` INTEGER
		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
		cn.exec("INSERT INTO `rcPrepared` VALUES (1), (2), (3)");

		// Define this in outer scope to guarantee data is left pending when
		// RCPrepared's payload is collected. This will guarantee
		// that Connection will need to queue the release.
		ResultRange rows;

		void bar()
		{
			class Foo { RCPrepared p; }
			auto foo = new Foo();

			auto rcStmt = cn.rcPrepare("SELECT * FROM `rcPrepared`");
			foo.p = rcStmt;
			rows = cn.query(rcStmt);

			/+
			At this point, there are two references to the prepared statement:
			One in a `Foo` object (currently bound to `foo`), and one on the stack.

			Returning from this function will destroy the one on the stack,
			and deterministically reduce the refcount to 1.

			So, right here we set `foo` to null to *keep* the Foo object's
			reference to the prepared statement, but set adrift the Foo object
			itself, ready to be destroyed (along with the only remaining
			prepared statement reference it contains) by the next GC cycle.

			Thus, `RCPreparedPayload.~this` and `Connection.release(Prepared)`
			will be executed during a GC cycle...and had better not perform
			any allocations, or else...boom!
			+/
			foo = null;
		}

		bar();
		assert(cn.hasPending); // Ensure Connection is forced to queue the release.
		GC.collect(); // `Connection.release(Prepared)` better not be allocating, or boom!
	}
}