1 /// Connect to a MySQL/MariaDB server. 2 module mysql.connection; 3 4 import std.algorithm; 5 import std.conv; 6 import std.digest.sha; 7 import std.exception; 8 import std.range; 9 import std.socket; 10 import std.string; 11 import std.typecons; 12 13 import mysql.commands; 14 import mysql.exceptions; 15 import mysql.prepared; 16 import mysql.protocol.constants; 17 import mysql.protocol.packets; 18 import mysql.protocol.sockets; 19 import mysql.result; 20 debug(MYSQLN_TESTS) 21 { 22 import mysql.test.common; 23 } 24 25 version(Have_vibe_d_core) 26 { 27 static if(__traits(compiles, (){ import vibe.core.net; } )) 28 import vibe.core.net; 29 else 30 static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'."); 31 } 32 33 /// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection. 34 immutable SvrCapFlags defaultClientFlags = 35 SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS | 36 SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 | 37 SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS | 38 //SvrCapFlags.MULTI_RESULTS; 39 40 /++ 41 Submit an SQL command to the server to be compiled into a prepared statement. 42 43 This will automatically register the prepared statement on the provided connection. 44 The resulting `mysql.prepared.Prepared` can then be used freely on ANY `Connection`, 45 as it will automatically be registered upon its first use on other connections. 46 Or, pass it to `Connection.register` if you prefer eager registration. 47 48 Internally, the result of a successful outcome will be a statement handle - an ID - 49 for the prepared statement, a count of the parameters required for 50 execution of the statement, and a count of the columns that will be present 51 in any result set that the command generates. 52 53 The server will then proceed to send prepared statement headers, 54 including parameter descriptions, and result set field descriptions, 55 followed by an EOF packet. 56 57 Throws: `mysql.exceptions.MYX` if the server has a problem. 58 +/ 59 Prepared prepare(Connection conn, string sql) 60 { 61 auto info = conn.registerIfNeeded(sql); 62 return Prepared(sql, info.headers, info.numParams); 63 } 64 65 /++ 66 This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. 67 68 See `BackwardCompatPrepared` for more info. 69 +/ 70 deprecated("This is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. You should migrate from this to the Prepared-compatible exec/query overloads in 'mysql.commands'.") 71 BackwardCompatPrepared prepareBackwardCompat(Connection conn, string sql) 72 { 73 return prepareBackwardCompatImpl(conn, sql); 74 } 75 76 /// Allow mysql-native tests to get around the deprecation message 77 package BackwardCompatPrepared prepareBackwardCompatImpl(Connection conn, string sql) 78 { 79 return BackwardCompatPrepared(conn, prepare(conn, sql)); 80 } 81 82 /++ 83 Convenience function to create a prepared statement which calls a stored function. 84 85 Be careful that your `numArgs` is correct. If it isn't, you may get a 86 `mysql.exceptions.MYX` with a very unclear error message. 87 88 Throws: `mysql.exceptions.MYX` if the server has a problem. 89 90 Params: 91 name = The name of the stored function. 92 numArgs = The number of arguments the stored procedure takes. 93 +/ 94 Prepared prepareFunction(Connection conn, string name, int numArgs) 95 { 96 auto sql = "select " ~ name ~ preparedPlaceholderArgs(numArgs); 97 return prepare(conn, sql); 98 } 99 100 /// 101 @("prepareFunction") 102 debug(MYSQLN_TESTS) 103 unittest 104 { 105 import mysql.test.common; 106 mixin(scopedCn); 107 108 exec(cn, `DROP FUNCTION IF EXISTS hello`); 109 exec(cn, ` 110 CREATE FUNCTION hello (s CHAR(20)) 111 RETURNS CHAR(50) DETERMINISTIC 112 RETURN CONCAT('Hello ',s,'!') 113 `); 114 115 auto preparedHello = prepareFunction(cn, "hello", 1); 116 preparedHello.setArgs("World"); 117 auto rs = cn.query(preparedHello).array; 118 assert(rs.length == 1); 119 assert(rs[0][0] == "Hello World!"); 120 } 121 122 /++ 123 Convenience function to create a prepared statement which calls a stored procedure. 124 125 OUT parameters are currently not supported. It should generally be 126 possible with MySQL to present them as a result set. 127 128 Be careful that your `numArgs` is correct. If it isn't, you may get a 129 `mysql.exceptions.MYX` with a very unclear error message. 130 131 Throws: `mysql.exceptions.MYX` if the server has a problem. 132 133 Params: 134 name = The name of the stored procedure. 135 numArgs = The number of arguments the stored procedure takes. 136 137 +/ 138 Prepared prepareProcedure(Connection conn, string name, int numArgs) 139 { 140 auto sql = "call " ~ name ~ preparedPlaceholderArgs(numArgs); 141 return prepare(conn, sql); 142 } 143 144 /// 145 @("prepareProcedure") 146 debug(MYSQLN_TESTS) 147 unittest 148 { 149 import mysql.test.common; 150 import mysql.test.integration; 151 mixin(scopedCn); 152 initBaseTestTables(cn); 153 154 exec(cn, `DROP PROCEDURE IF EXISTS insert2`); 155 exec(cn, ` 156 CREATE PROCEDURE insert2 (IN p1 INT, IN p2 CHAR(50)) 157 BEGIN 158 INSERT INTO basetest (intcol, stringcol) VALUES(p1, p2); 159 END 160 `); 161 162 auto preparedInsert2 = prepareProcedure(cn, "insert2", 2); 163 preparedInsert2.setArgs(2001, "inserted string 1"); 164 cn.exec(preparedInsert2); 165 166 auto rs = query(cn, "SELECT stringcol FROM basetest WHERE intcol=2001").array; 167 assert(rs.length == 1); 168 assert(rs[0][0] == "inserted string 1"); 169 } 170 171 private string preparedPlaceholderArgs(int numArgs) 172 { 173 auto sql = "("; 174 bool comma = false; 175 foreach(i; 0..numArgs) 176 { 177 if (comma) 178 sql ~= ",?"; 179 else 180 { 181 sql ~= "?"; 182 comma = true; 183 } 184 } 185 sql ~= ")"; 186 187 return sql; 188 } 189 190 @("preparedPlaceholderArgs") 191 debug(MYSQLN_TESTS) 192 unittest 193 { 194 assert(preparedPlaceholderArgs(3) == "(?,?,?)"); 195 assert(preparedPlaceholderArgs(2) == "(?,?)"); 196 assert(preparedPlaceholderArgs(1) == "(?)"); 197 assert(preparedPlaceholderArgs(0) == "()"); 198 } 199 200 /// Per-connection info from the server about a registered prepared statement. 201 package struct PreparedServerInfo 202 { 203 /// Server's identifier for this prepared statement. 204 /// Apperently, this is never 0 if it's been registered, 205 /// although mysql-native no longer relies on that. 206 uint statementId; 207 208 ushort psWarnings; 209 210 /// Number of parameters this statement takes. 211 /// 212 /// This will be the same on all connections, but it's returned 213 /// by the server upon registration, so it's stored here. 214 ushort numParams; 215 216 /// Prepared statement headers 217 /// 218 /// This will be the same on all connections, but it's returned 219 /// by the server upon registration, so it's stored here. 220 PreparedStmtHeaders headers; 221 222 /// Not actually from the server. Connection uses this to keep track 223 /// of statements that should be treated as having been released. 224 bool queuedForRelease = false; 225 } 226 227 /++ 228 This is a wrapper over `mysql.prepared.Prepared`, provided ONLY as a 229 temporary aid in upgrading to mysql-native v2.0.0 and its 230 new connection-independent model of prepared statements. See the 231 $(LINK2 https://github.com/mysql-d/mysql-native/blob/master/MIGRATING_TO_V2.md, migration guide) 232 for more info. 233 234 In most cases, this layer shouldn't even be needed. But if you have many 235 lines of code making calls to exec/query the same prepared statement, 236 then this may be helpful. 237 238 To use this temporary compatability layer, change instances of: 239 240 --- 241 auto stmt = conn.prepare(...); 242 --- 243 244 to this: 245 246 --- 247 auto stmt = conn.prepareBackwardCompat(...); 248 --- 249 250 And then your prepared statement should work as before. 251 252 BUT DO NOT LEAVE IT LIKE THIS! Ultimately, you should update 253 your prepared statement code to the mysql-native v2.0.0 API, by changing 254 instances of: 255 256 --- 257 stmt.exec() 258 stmt.query() 259 stmt.queryRow() 260 stmt.queryRowTuple(outputArgs...) 261 stmt.queryValue() 262 --- 263 264 to this: 265 266 --- 267 conn.exec(stmt) 268 conn.query(stmt) 269 conn.queryRow(stmt) 270 conn.queryRowTuple(stmt, outputArgs...) 271 conn.queryValue(stmt) 272 --- 273 274 Both of the above syntaxes can be used with a `BackwardCompatPrepared` 275 (the `Connection` passed directly to `mysql.commands.exec`/`mysql.commands.query` 276 will override the one embedded associated with your `BackwardCompatPrepared`). 277 278 Once all of your code is updated, you can change `prepareBackwardCompat` 279 back to `prepare` again, and your upgrade will be complete. 280 +/ 281 struct BackwardCompatPrepared 282 { 283 import std.variant; 284 285 private Connection _conn; 286 Prepared _prepared; 287 288 /// Access underlying `Prepared` 289 @property Prepared prepared() { return _prepared; } 290 291 alias _prepared this; 292 293 /++ 294 This function is provided ONLY as a temporary aid in upgrading to mysql-native v2.0.0. 295 296 See `BackwardCompatPrepared` for more info. 297 +/ 298 deprecated("Change 'preparedStmt.exec()' to 'conn.exec(preparedStmt)'") 299 ulong exec() 300 { 301 return .exec(_conn, _prepared); 302 } 303 304 ///ditto 305 deprecated("Change 'preparedStmt.query()' to 'conn.query(preparedStmt)'") 306 ResultRange query() 307 { 308 return .query(_conn, _prepared); 309 } 310 311 ///ditto 312 deprecated("Change 'preparedStmt.queryRow()' to 'conn.queryRow(preparedStmt)'") 313 Nullable!Row queryRow() 314 { 315 return .queryRow(_conn, _prepared); 316 } 317 318 ///ditto 319 deprecated("Change 'preparedStmt.queryRowTuple(outArgs...)' to 'conn.queryRowTuple(preparedStmt, outArgs...)'") 320 void queryRowTuple(T...)(ref T args) if(T.length == 0 || !is(T[0] : Connection)) 321 { 322 return .queryRowTuple(_conn, _prepared, args); 323 } 324 325 ///ditto 326 deprecated("Change 'preparedStmt.queryValue()' to 'conn.queryValue(preparedStmt)'") 327 Nullable!Variant queryValue() 328 { 329 return .queryValue(_conn, _prepared); 330 } 331 } 332 333 //TODO: All low-level commms should be moved into the mysql.protocol package. 334 /// Low-level comms code relating to prepared statements. 335 package struct ProtocolPrepared 336 { 337 import std.conv; 338 import std.datetime; 339 import std.variant; 340 import mysql.types; 341 342 static ubyte[] makeBitmap(in Variant[] inParams) 343 { 344 size_t bml = (inParams.length+7)/8; 345 ubyte[] bma; 346 bma.length = bml; 347 foreach (i; 0..inParams.length) 348 { 349 if(inParams[i].type != typeid(typeof(null))) 350 continue; 351 size_t bn = i/8; 352 size_t bb = i%8; 353 ubyte sr = 1; 354 sr <<= bb; 355 bma[bn] |= sr; 356 } 357 return bma; 358 } 359 360 static ubyte[] makePSPrefix(uint hStmt, ubyte flags = 0) pure nothrow 361 { 362 ubyte[] prefix; 363 prefix.length = 14; 364 365 prefix[4] = CommandType.STMT_EXECUTE; 366 hStmt.packInto(prefix[5..9]); 367 prefix[9] = flags; // flags, no cursor 368 prefix[10] = 1; // iteration count - currently always 1 369 prefix[11] = 0; 370 prefix[12] = 0; 371 prefix[13] = 0; 372 373 return prefix; 374 } 375 376 //TODO: All low-level commms should be moved into the mysql.protocol package. 377 static ubyte[] analyseParams(Variant[] inParams, ParameterSpecialization[] psa, 378 out ubyte[] vals, out bool longData) 379 { 380 size_t pc = inParams.length; 381 ubyte[] types; 382 types.length = pc*2; 383 size_t alloc = pc*20; 384 vals.length = alloc; 385 uint vcl = 0, len; 386 int ct = 0; 387 388 void reAlloc(size_t n) 389 { 390 if (vcl+n < alloc) 391 return; 392 size_t inc = (alloc*3)/2; 393 if (inc < n) 394 inc = n; 395 alloc += inc; 396 vals.length = alloc; 397 } 398 399 foreach (size_t i; 0..pc) 400 { 401 enum UNSIGNED = 0x80; 402 enum SIGNED = 0; 403 if (psa[i].chunkSize) 404 longData= true; 405 if (inParams[i].type == typeid(typeof(null))) 406 { 407 types[ct++] = SQLType.NULL; 408 types[ct++] = SIGNED; 409 continue; 410 } 411 Variant v = inParams[i]; 412 SQLType ext = psa[i].type; 413 string ts = v.type.toString(); 414 bool isRef; 415 if (ts[$-1] == '*') 416 { 417 ts.length = ts.length-1; 418 isRef= true; 419 } 420 421 switch (ts) 422 { 423 case "bool": 424 if (ext == SQLType.INFER_FROM_D_TYPE) 425 types[ct++] = SQLType.BIT; 426 else 427 types[ct++] = cast(ubyte) ext; 428 types[ct++] = SIGNED; 429 reAlloc(2); 430 bool bv = isRef? *(v.get!(bool*)): v.get!(bool); 431 vals[vcl++] = 1; 432 vals[vcl++] = bv? 0x31: 0x30; 433 break; 434 case "byte": 435 types[ct++] = SQLType.TINY; 436 types[ct++] = SIGNED; 437 reAlloc(1); 438 vals[vcl++] = isRef? *(v.get!(byte*)): v.get!(byte); 439 break; 440 case "ubyte": 441 types[ct++] = SQLType.TINY; 442 types[ct++] = UNSIGNED; 443 reAlloc(1); 444 vals[vcl++] = isRef? *(v.get!(ubyte*)): v.get!(ubyte); 445 break; 446 case "short": 447 types[ct++] = SQLType.SHORT; 448 types[ct++] = SIGNED; 449 reAlloc(2); 450 short si = isRef? *(v.get!(short*)): v.get!(short); 451 vals[vcl++] = cast(ubyte) (si & 0xff); 452 vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff); 453 break; 454 case "ushort": 455 types[ct++] = SQLType.SHORT; 456 types[ct++] = UNSIGNED; 457 reAlloc(2); 458 ushort us = isRef? *(v.get!(ushort*)): v.get!(ushort); 459 vals[vcl++] = cast(ubyte) (us & 0xff); 460 vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff); 461 break; 462 case "int": 463 types[ct++] = SQLType.INT; 464 types[ct++] = SIGNED; 465 reAlloc(4); 466 int ii = isRef? *(v.get!(int*)): v.get!(int); 467 vals[vcl++] = cast(ubyte) (ii & 0xff); 468 vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff); 469 vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff); 470 vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff); 471 break; 472 case "uint": 473 types[ct++] = SQLType.INT; 474 types[ct++] = UNSIGNED; 475 reAlloc(4); 476 uint ui = isRef? *(v.get!(uint*)): v.get!(uint); 477 vals[vcl++] = cast(ubyte) (ui & 0xff); 478 vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff); 479 vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff); 480 vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff); 481 break; 482 case "long": 483 types[ct++] = SQLType.LONGLONG; 484 types[ct++] = SIGNED; 485 reAlloc(8); 486 long li = isRef? *(v.get!(long*)): v.get!(long); 487 vals[vcl++] = cast(ubyte) (li & 0xff); 488 vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff); 489 vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff); 490 vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff); 491 vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff); 492 vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff); 493 vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff); 494 vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff); 495 break; 496 case "ulong": 497 types[ct++] = SQLType.LONGLONG; 498 types[ct++] = UNSIGNED; 499 reAlloc(8); 500 ulong ul = isRef? *(v.get!(ulong*)): v.get!(ulong); 501 vals[vcl++] = cast(ubyte) (ul & 0xff); 502 vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff); 503 vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff); 504 vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff); 505 vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff); 506 vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff); 507 vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff); 508 vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff); 509 break; 510 case "float": 511 types[ct++] = SQLType.FLOAT; 512 types[ct++] = SIGNED; 513 reAlloc(4); 514 float f = isRef? *(v.get!(float*)): v.get!(float); 515 ubyte* ubp = cast(ubyte*) &f; 516 vals[vcl++] = *ubp++; 517 vals[vcl++] = *ubp++; 518 vals[vcl++] = *ubp++; 519 vals[vcl++] = *ubp; 520 break; 521 case "double": 522 types[ct++] = SQLType.DOUBLE; 523 types[ct++] = SIGNED; 524 reAlloc(8); 525 double d = isRef? *(v.get!(double*)): v.get!(double); 526 ubyte* ubp = cast(ubyte*) &d; 527 vals[vcl++] = *ubp++; 528 vals[vcl++] = *ubp++; 529 vals[vcl++] = *ubp++; 530 vals[vcl++] = *ubp++; 531 vals[vcl++] = *ubp++; 532 vals[vcl++] = *ubp++; 533 vals[vcl++] = *ubp++; 534 vals[vcl++] = *ubp; 535 break; 536 case "std.datetime.date.Date": 537 case "std.datetime.Date": 538 types[ct++] = SQLType.DATE; 539 types[ct++] = SIGNED; 540 Date date = isRef? *(v.get!(Date*)): v.get!(Date); 541 ubyte[] da = pack(date); 542 size_t l = da.length; 543 reAlloc(l); 544 vals[vcl..vcl+l] = da[]; 545 vcl += l; 546 break; 547 case "std.datetime.TimeOfDay": 548 case "std.datetime.Time": 549 types[ct++] = SQLType.TIME; 550 types[ct++] = SIGNED; 551 TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay); 552 ubyte[] ta = pack(time); 553 size_t l = ta.length; 554 reAlloc(l); 555 vals[vcl..vcl+l] = ta[]; 556 vcl += l; 557 break; 558 case "std.datetime.date.DateTime": 559 case "std.datetime.DateTime": 560 types[ct++] = SQLType.DATETIME; 561 types[ct++] = SIGNED; 562 DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime); 563 ubyte[] da = pack(dt); 564 size_t l = da.length; 565 reAlloc(l); 566 vals[vcl..vcl+l] = da[]; 567 vcl += l; 568 break; 569 case "mysql.types.Timestamp": 570 types[ct++] = SQLType.TIMESTAMP; 571 types[ct++] = SIGNED; 572 Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp); 573 DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep); 574 ubyte[] da = pack(dt); 575 size_t l = da.length; 576 reAlloc(l); 577 vals[vcl..vcl+l] = da[]; 578 vcl += l; 579 break; 580 case "immutable(char)[]": 581 if (ext == SQLType.INFER_FROM_D_TYPE) 582 types[ct++] = SQLType.VARCHAR; 583 else 584 types[ct++] = cast(ubyte) ext; 585 types[ct++] = SIGNED; 586 string s = isRef? *(v.get!(string*)): v.get!(string); 587 ubyte[] packed = packLCS(cast(void[]) s); 588 reAlloc(packed.length); 589 vals[vcl..vcl+packed.length] = packed[]; 590 vcl += packed.length; 591 break; 592 case "char[]": 593 if (ext == SQLType.INFER_FROM_D_TYPE) 594 types[ct++] = SQLType.VARCHAR; 595 else 596 types[ct++] = cast(ubyte) ext; 597 types[ct++] = SIGNED; 598 char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]); 599 ubyte[] packed = packLCS(cast(void[]) ca); 600 reAlloc(packed.length); 601 vals[vcl..vcl+packed.length] = packed[]; 602 vcl += packed.length; 603 break; 604 case "byte[]": 605 if (ext == SQLType.INFER_FROM_D_TYPE) 606 types[ct++] = SQLType.TINYBLOB; 607 else 608 types[ct++] = cast(ubyte) ext; 609 types[ct++] = SIGNED; 610 byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]); 611 ubyte[] packed = packLCS(cast(void[]) ba); 612 reAlloc(packed.length); 613 vals[vcl..vcl+packed.length] = packed[]; 614 vcl += packed.length; 615 break; 616 case "ubyte[]": 617 if (ext == SQLType.INFER_FROM_D_TYPE) 618 types[ct++] = SQLType.TINYBLOB; 619 else 620 types[ct++] = cast(ubyte) ext; 621 types[ct++] = SIGNED; 622 ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]); 623 ubyte[] packed = packLCS(cast(void[]) uba); 624 reAlloc(packed.length); 625 vals[vcl..vcl+packed.length] = packed[]; 626 vcl += packed.length; 627 break; 628 case "void": 629 throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__); 630 default: 631 throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__); 632 } 633 } 634 vals.length = vcl; 635 return types; 636 } 637 638 static void sendLongData(Connection conn, uint hStmt, ParameterSpecialization[] psa) 639 { 640 assert(psa.length <= ushort.max); // parameter number is sent as short 641 foreach (ushort i, PSN psn; psa) 642 { 643 if (!psn.chunkSize) continue; 644 uint cs = psn.chunkSize; 645 uint delegate(ubyte[]) dg = psn.chunkDelegate; 646 647 //TODO: All low-level commms should be moved into the mysql.protocol package. 648 ubyte[] chunk; 649 chunk.length = cs+11; 650 chunk.setPacketHeader(0 /*each chunk is separate cmd*/); 651 chunk[4] = CommandType.STMT_SEND_LONG_DATA; 652 hStmt.packInto(chunk[5..9]); // statement handle 653 packInto(i, chunk[9..11]); // parameter number 654 655 // byte 11 on is payload 656 for (;;) 657 { 658 uint sent = dg(chunk[11..cs+11]); 659 if (sent < cs) 660 { 661 if (sent == 0) // data was exact multiple of chunk size - all sent 662 break; 663 chunk.length = chunk.length - (cs-sent); // trim the chunk 664 sent += 7; // adjust for non-payload bytes 665 packInto!(uint, true)(cast(uint)sent, chunk[0..3]); 666 conn.send(chunk); 667 break; 668 } 669 conn.send(chunk); 670 } 671 } 672 } 673 674 static void sendCommand(Connection conn, uint hStmt, PreparedStmtHeaders psh, 675 Variant[] inParams, ParameterSpecialization[] psa) 676 { 677 conn.autoPurge(); 678 679 //TODO: All low-level commms should be moved into the mysql.protocol package. 680 ubyte[] packet; 681 conn.resetPacket(); 682 683 ubyte[] prefix = makePSPrefix(hStmt, 0); 684 size_t len = prefix.length; 685 bool longData; 686 687 if (psh.paramCount) 688 { 689 ubyte[] one = [ 1 ]; 690 ubyte[] vals; 691 ubyte[] types = analyseParams(inParams, psa, vals, longData); 692 ubyte[] nbm = makeBitmap(inParams); 693 packet = prefix ~ nbm ~ one ~ types ~ vals; 694 } 695 else 696 packet = prefix; 697 698 if (longData) 699 sendLongData(conn, hStmt, psa); 700 701 assert(packet.length <= uint.max); 702 packet.setPacketHeader(conn.pktNumber); 703 conn.bumpPacket(); 704 conn.send(packet); 705 } 706 } 707 708 /++ 709 A class representing a database connection. 710 711 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of 712 creating a new Connection directly. That will provide certain benefits, 713 such as reusing old connections and automatic cleanup (no need to close 714 the connection when done). 715 716 ------------------ 717 // Suggested usage: 718 719 { 720 auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"); 721 scope(exit) con.close(); 722 723 // Use the connection 724 ... 725 } 726 ------------------ 727 +/ 728 //TODO: All low-level commms should be moved into the mysql.protocol package. 729 class Connection 730 { 731 /+ 732 The Connection is responsible for handshaking with the server to establish 733 authentication. It then passes client preferences to the server, and 734 subsequently is the channel for all command packets that are sent, and all 735 response packets received. 736 737 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one 738 byte as a packet number. Connection deals with the headers and ensures that 739 packet numbers are sequential. 740 741 The initial packet is sent by the server - essentially a 'hello' packet 742 inviting login. That packet has a sequence number of zero. That sequence 743 number is the incremented by client and server packets through the handshake 744 sequence. 745 746 After login all further sequences are initialized by the client sending a 747 command packet with a zero sequence number, to which the server replies with 748 zero or more packets with sequential sequence numbers. 749 +/ 750 package: 751 enum OpenState 752 { 753 /// We have not yet connected to the server, or have sent QUIT to the 754 /// server and closed the connection 755 notConnected, 756 /// We have connected to the server and parsed the greeting, but not 757 /// yet authenticated 758 connected, 759 /// We have successfully authenticated against the server, and need to 760 /// send QUIT to the server when closing the connection 761 authenticated 762 } 763 OpenState _open; 764 MySQLSocket _socket; 765 766 SvrCapFlags _sCaps, _cCaps; 767 uint _sThread; 768 ushort _serverStatus; 769 ubyte _sCharSet, _protocol; 770 string _serverVersion; 771 772 string _host, _user, _pwd, _db; 773 ushort _port; 774 775 MySQLSocketType _socketType; 776 777 OpenSocketCallbackPhobos _openSocketPhobos; 778 OpenSocketCallbackVibeD _openSocketVibeD; 779 780 ulong _insertID; 781 782 // This gets incremented every time a command is issued or results are purged, 783 // so a ResultRange can tell whether it's been invalidated. 784 ulong _lastCommandID; 785 786 // Whether there are rows, headers or bimary data waiting to be retreived. 787 // MySQL protocol doesn't permit performing any other action until all 788 // such data is read. 789 bool _rowsPending, _headersPending, _binaryPending; 790 791 // Field count of last performed command. 792 //TODO: Does Connection need to store this? 793 ushort _fieldCount; 794 795 // ResultSetHeaders of last performed command. 796 //TODO: Does Connection need to store this? Is this even used? 797 ResultSetHeaders _rsh; 798 799 // This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise 800 // you'll get the dreaded "packet out of order" message. It, and the socket connection are 801 // the reason why most other objects require a connection object for their construction. 802 ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct 803 /// ordering. First packet should have 0 804 @property ubyte pktNumber() { return _cpn; } 805 void bumpPacket() { _cpn++; } 806 void resetPacket() { _cpn = 0; } 807 808 version(Have_vibe_d_core) {} else 809 pure const nothrow invariant() 810 { 811 assert(_socketType != MySQLSocketType.vibed); 812 } 813 814 ubyte[] getPacket() 815 { 816 scope(failure) kill(); 817 818 ubyte[4] header; 819 _socket.read(header); 820 // number of bytes always set as 24-bit 821 uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0]; 822 enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order"); 823 bumpPacket(); 824 825 ubyte[] packet = new ubyte[numDataBytes]; 826 _socket.read(packet); 827 assert(packet.length == numDataBytes, "Wrong number of bytes read"); 828 return packet; 829 } 830 831 void send(const(ubyte)[] packet) 832 in 833 { 834 assert(packet.length > 4); // at least 1 byte more than header 835 } 836 body 837 { 838 _socket.write(packet); 839 } 840 841 void send(const(ubyte)[] header, const(ubyte)[] data) 842 in 843 { 844 assert(header.length == 4 || header.length == 5/*command type included*/); 845 } 846 body 847 { 848 _socket.write(header); 849 if(data.length) 850 _socket.write(data); 851 } 852 853 void sendCmd(T)(CommandType cmd, const(T)[] data) 854 in 855 { 856 // Internal thread states. Clients shouldn't use this 857 assert(cmd != CommandType.SLEEP); 858 assert(cmd != CommandType.CONNECT); 859 assert(cmd != CommandType.TIME); 860 assert(cmd != CommandType.DELAYED_INSERT); 861 assert(cmd != CommandType.CONNECT_OUT); 862 863 // Deprecated 864 assert(cmd != CommandType.CREATE_DB); 865 assert(cmd != CommandType.DROP_DB); 866 assert(cmd != CommandType.TABLE_DUMP); 867 868 // cannot send more than uint.max bytes. TODO: better error message if we try? 869 assert(data.length <= uint.max); 870 } 871 out 872 { 873 // at this point we should have sent a command 874 assert(pktNumber == 1); 875 } 876 body 877 { 878 autoPurge(); 879 880 scope(failure) kill(); 881 882 _lastCommandID++; 883 884 if(!_socket.connected) 885 { 886 if(cmd == CommandType.QUIT) 887 return; // Don't bother reopening connection just to quit 888 889 _open = OpenState.notConnected; 890 connect(_clientCapabilities); 891 } 892 893 resetPacket(); 894 895 ubyte[] header; 896 header.length = 4 /*header*/ + 1 /*cmd*/; 897 header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/); 898 header[4] = cmd; 899 bumpPacket(); 900 901 send(header, cast(const(ubyte)[])data); 902 } 903 904 OKErrorPacket getCmdResponse(bool asString = false) 905 { 906 auto okp = OKErrorPacket(getPacket()); 907 enforcePacketOK(okp); 908 _serverStatus = okp.serverStatus; 909 return okp; 910 } 911 912 /// Get the next `mysql.result.Row` of a pending result set. 913 Row getNextRow() 914 { 915 scope(failure) kill(); 916 917 if (_headersPending) 918 { 919 _rsh = ResultSetHeaders(this, _fieldCount); 920 _headersPending = false; 921 } 922 ubyte[] packet; 923 Row rr; 924 packet = getPacket(); 925 if(packet.front == ResultPacketMarker.error) 926 throw new MYXReceived(OKErrorPacket(packet), __FILE__, __LINE__); 927 928 if (packet.isEOFPacket()) 929 { 930 _rowsPending = _binaryPending = false; 931 return rr; 932 } 933 if (_binaryPending) 934 rr = Row(this, packet, _rsh, true); 935 else 936 rr = Row(this, packet, _rsh, false); 937 //rr._valid = true; 938 return rr; 939 } 940 941 ubyte[] buildAuthPacket(ubyte[] token) 942 in 943 { 944 assert(token.length == 20); 945 } 946 body 947 { 948 ubyte[] packet; 949 packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1); 950 packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append 951 952 // NOTE: we'll set the header last when we know the size 953 954 // Set the default capabilities required by the client 955 _cCaps.packInto(packet[4..8]); 956 957 // Request a conventional maximum packet length. 958 1.packInto(packet[8..12]); 959 960 packet ~= 33; // Set UTF-8 as default charSet 961 962 // There's a statutory block of zero bytes here - fill them in. 963 foreach(i; 0 .. 23) 964 packet ~= 0; 965 966 // Add the user name as a null terminated string 967 foreach(i; 0 .. _user.length) 968 packet ~= _user[i]; 969 packet ~= 0; // \0 970 971 // Add our calculated authentication token as a length prefixed string. 972 assert(token.length <= ubyte.max); 973 if(_pwd.length == 0) // Omit the token if the account has no password 974 packet ~= 0; 975 else 976 { 977 packet ~= cast(ubyte)token.length; 978 foreach(i; 0 .. token.length) 979 packet ~= token[i]; 980 } 981 982 // Add the default database as a null terminated string 983 foreach(i; 0 .. _db.length) 984 packet ~= _db[i]; 985 packet ~= 0; // \0 986 987 // The server sent us a greeting with packet number 0, so we send the auth packet 988 // back with the next number. 989 packet.setPacketHeader(pktNumber); 990 bumpPacket(); 991 return packet; 992 } 993 994 void consumeServerInfo(ref ubyte[] packet) 995 { 996 scope(failure) kill(); 997 998 _sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes) 999 _sCharSet = packet.consume!ubyte(); // server_language 1000 _serverStatus = packet.consume!ushort(); //server_status 1001 _sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes) 1002 _sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec 1003 1004 enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1"); 1005 enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection"); 1006 } 1007 1008 ubyte[] parseGreeting() 1009 { 1010 scope(failure) kill(); 1011 1012 ubyte[] packet = getPacket(); 1013 1014 if (packet.length > 0 && packet[0] == ResultPacketMarker.error) 1015 { 1016 auto okp = OKErrorPacket(packet); 1017 enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message); 1018 } 1019 1020 _protocol = packet.consume!ubyte(); 1021 1022 _serverVersion = packet.consume!string(packet.countUntil(0)); 1023 packet.skip(1); // \0 terminated _serverVersion 1024 1025 _sThread = packet.consume!uint(); 1026 1027 // read first part of scramble buf 1028 ubyte[] authBuf; 1029 authBuf.length = 255; 1030 authBuf[0..8] = packet.consume(8)[]; // scramble_buff 1031 1032 enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0"); 1033 1034 consumeServerInfo(packet); 1035 1036 packet.skip(1); // this byte supposed to be scramble length, but is actually zero 1037 packet.skip(10); // filler of \0 1038 1039 // rest of the scramble 1040 auto len = packet.countUntil(0); 1041 enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes"); 1042 enforce(authBuf.length > 8+len); 1043 authBuf[8..8+len] = packet.consume(len)[]; 1044 authBuf.length = 8+len; // cut to correct size 1045 enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf"); 1046 1047 return authBuf; 1048 } 1049 1050 static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port) 1051 { 1052 auto s = new PlainPhobosSocket(); 1053 s.connect(new InternetAddress(host, port)); 1054 return s; 1055 } 1056 1057 static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port) 1058 { 1059 version(Have_vibe_d_core) 1060 return vibe.core.net.connectTCP(host, port); 1061 else 1062 assert(0); 1063 } 1064 1065 void initConnection() 1066 { 1067 resetPacket(); 1068 final switch(_socketType) 1069 { 1070 case MySQLSocketType.phobos: 1071 _socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port)); 1072 break; 1073 1074 case MySQLSocketType.vibed: 1075 version(Have_vibe_d_core) { 1076 _socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port)); 1077 break; 1078 } else assert(0, "Unsupported socket type. Need version Have_vibe_d_core."); 1079 } 1080 } 1081 1082 ubyte[] makeToken(ubyte[] authBuf) 1083 { 1084 auto pass1 = sha1Of(cast(const(ubyte)[])_pwd); 1085 auto pass2 = sha1Of(pass1); 1086 1087 SHA1 sha1; 1088 sha1.start(); 1089 sha1.put(authBuf); 1090 sha1.put(pass2); 1091 auto result = sha1.finish(); 1092 foreach (size_t i; 0..20) 1093 result[i] = result[i] ^ pass1[i]; 1094 return result.dup; 1095 } 1096 1097 SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure 1098 { 1099 SvrCapFlags common; 1100 uint filter = 1; 1101 foreach (size_t i; 0..uint.sizeof) 1102 { 1103 bool serverSupport = (server & filter) != 0; // can the server do this capability? 1104 bool clientSupport = (client & filter) != 0; // can we support it? 1105 if(serverSupport && clientSupport) 1106 common |= filter; 1107 filter <<= 1; // check next flag 1108 } 1109 return common; 1110 } 1111 1112 void setClientFlags(SvrCapFlags capFlags) 1113 { 1114 _cCaps = getCommonCapabilities(_sCaps, capFlags); 1115 1116 // We cannot operate in <4.1 protocol, so we'll force it even if the user 1117 // didn't supply it 1118 _cCaps |= SvrCapFlags.PROTOCOL41; 1119 _cCaps |= SvrCapFlags.SECURE_CONNECTION; 1120 } 1121 1122 void authenticate(ubyte[] greeting) 1123 in 1124 { 1125 assert(_open == OpenState.connected); 1126 } 1127 out 1128 { 1129 assert(_open == OpenState.authenticated); 1130 } 1131 body 1132 { 1133 auto token = makeToken(greeting); 1134 auto authPacket = buildAuthPacket(token); 1135 send(authPacket); 1136 1137 auto packet = getPacket(); 1138 auto okp = OKErrorPacket(packet); 1139 enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message); 1140 _open = OpenState.authenticated; 1141 } 1142 1143 SvrCapFlags _clientCapabilities; 1144 1145 void connect(SvrCapFlags clientCapabilities) 1146 in 1147 { 1148 assert(closed); 1149 } 1150 out 1151 { 1152 assert(_open == OpenState.authenticated); 1153 } 1154 body 1155 { 1156 initConnection(); 1157 auto greeting = parseGreeting(); 1158 _open = OpenState.connected; 1159 1160 _clientCapabilities = clientCapabilities; 1161 setClientFlags(clientCapabilities); 1162 authenticate(greeting); 1163 } 1164 1165 /// Forcefully close the socket without sending the quit command. 1166 /// Needed in case an error leaves communatations in an undefined or non-recoverable state. 1167 void kill() 1168 { 1169 if(_socket.connected) 1170 _socket.close(); 1171 _open = OpenState.notConnected; 1172 // any pending data is gone. Any statements to release will be released 1173 // on the server automatically. 1174 _headersPending = _rowsPending = _binaryPending = false; 1175 1176 preparedRegistrations.clear(); 1177 } 1178 1179 /// Called whenever mysql-native needs to send a command to the server 1180 /// and be sure there aren't any pending results (which would prevent 1181 /// a new command from being sent). 1182 void autoPurge() 1183 { 1184 // This is called every time a command is sent, 1185 // so detect & prevent infinite recursion. 1186 static bool isAutoPurging = false; 1187 1188 if(isAutoPurging) 1189 return; 1190 1191 isAutoPurging = true; 1192 scope(exit) isAutoPurging = false; 1193 1194 try 1195 { 1196 purgeResult(); 1197 releaseQueued(); 1198 } 1199 catch(Exception e) 1200 { 1201 // likely the connection was closed, so reset any state. 1202 // Don't treat this as a real error, because everything will be reset when we 1203 // reconnect. 1204 kill(); 1205 } 1206 } 1207 1208 /// Lookup per-connection prepared statement info by SQL 1209 private PreparedRegistrations!PreparedServerInfo preparedRegistrations; 1210 1211 /// Releases all prepared statements that are queued for release. 1212 void releaseQueued() 1213 { 1214 foreach(sql, info; preparedRegistrations.directLookup) 1215 if(info.queuedForRelease) 1216 { 1217 immediateReleasePrepared(info.statementId); 1218 preparedRegistrations.directLookup.remove(sql); 1219 } 1220 } 1221 1222 /// Returns null if not found 1223 Nullable!PreparedServerInfo getPreparedServerInfo(const string sql) pure nothrow 1224 { 1225 return preparedRegistrations[sql]; 1226 } 1227 1228 /// If already registered, simply returns the cached `PreparedServerInfo`. 1229 PreparedServerInfo registerIfNeeded(string sql) 1230 { 1231 return preparedRegistrations.registerIfNeeded(sql, &performRegister); 1232 } 1233 1234 PreparedServerInfo performRegister(string sql) 1235 { 1236 scope(failure) kill(); 1237 1238 PreparedServerInfo info; 1239 1240 sendCmd(CommandType.STMT_PREPARE, sql); 1241 _fieldCount = 0; 1242 1243 //TODO: All packet handling should be moved into the mysql.protocol package. 1244 ubyte[] packet = getPacket(); 1245 if(packet.front == ResultPacketMarker.ok) 1246 { 1247 packet.popFront(); 1248 info.statementId = packet.consume!int(); 1249 _fieldCount = packet.consume!short(); 1250 info.numParams = packet.consume!short(); 1251 1252 packet.popFront(); // one byte filler 1253 info.psWarnings = packet.consume!short(); 1254 1255 // At this point the server also sends field specs for parameters 1256 // and columns if there were any of each 1257 info.headers = PreparedStmtHeaders(this, _fieldCount, info.numParams); 1258 } 1259 else if(packet.front == ResultPacketMarker.error) 1260 { 1261 auto error = OKErrorPacket(packet); 1262 enforcePacketOK(error); 1263 assert(0); // FIXME: what now? 1264 } 1265 else 1266 assert(0); // FIXME: what now? 1267 1268 return info; 1269 } 1270 1271 private void immediateReleasePrepared(uint statementId) 1272 { 1273 scope(failure) kill(); 1274 1275 if(closed()) 1276 return; 1277 1278 //TODO: All low-level commms should be moved into the mysql.protocol package. 1279 ubyte[9] packet_buf; 1280 ubyte[] packet = packet_buf; 1281 packet.setPacketHeader(0/*packet number*/); 1282 bumpPacket(); 1283 packet[4] = CommandType.STMT_CLOSE; 1284 statementId.packInto(packet[5..9]); 1285 purgeResult(); 1286 send(packet); 1287 // It seems that the server does not find it necessary to send a response 1288 // for this command. 1289 } 1290 1291 public: 1292 1293 /++ 1294 Construct opened connection. 1295 1296 Throws `mysql.exceptions.MYX` upon failure to connect. 1297 1298 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of 1299 creating a new Connection directly. That will provide certain benefits, 1300 such as reusing old connections and automatic cleanup (no need to close 1301 the connection when done). 1302 1303 ------------------ 1304 // Suggested usage: 1305 1306 { 1307 auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"); 1308 scope(exit) con.close(); 1309 1310 // Use the connection 1311 ... 1312 } 1313 ------------------ 1314 1315 Params: 1316 cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld" 1317 (TODO: The connection string needs work to allow for semicolons in its parts!) 1318 socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos, 1319 unless compiled with `-version=Have_vibe_d_core` (set automatically 1320 if using $(LINK2 http://code.dlang.org/getting_started, DUB)). 1321 openSocket = Optional callback which should return a newly-opened Phobos 1322 or Vibe.d TCP socket. This allows custom sockets to be used, 1323 subclassed from Phobos's or Vibe.d's sockets. 1324 host = An IP address in numeric dotted form, or as a host name. 1325 user = The user name to authenticate. 1326 password = User's password. 1327 db = Desired initial database. 1328 capFlags = The set of flag bits from the server's capabilities that the client requires 1329 +/ 1330 //After the connection is created, and the initial invitation is received from the server 1331 //client preferences can be set, and authentication can then be attempted. 1332 this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 1333 { 1334 version(Have_vibe_d_core) 1335 enum defaultSocketType = MySQLSocketType.vibed; 1336 else 1337 enum defaultSocketType = MySQLSocketType.phobos; 1338 1339 this(defaultSocketType, host, user, pwd, db, port, capFlags); 1340 } 1341 1342 ///ditto 1343 this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 1344 { 1345 version(Have_vibe_d_core) {} else 1346 enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core"); 1347 1348 this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD, 1349 host, user, pwd, db, port, capFlags); 1350 } 1351 1352 ///ditto 1353 this(OpenSocketCallbackPhobos openSocket, 1354 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 1355 { 1356 this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags); 1357 } 1358 1359 version(Have_vibe_d_core) 1360 ///ditto 1361 this(OpenSocketCallbackVibeD openSocket, 1362 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 1363 { 1364 this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags); 1365 } 1366 1367 ///ditto 1368 private this(MySQLSocketType socketType, 1369 OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD, 1370 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 1371 in 1372 { 1373 final switch(socketType) 1374 { 1375 case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break; 1376 case MySQLSocketType.vibed: assert(openSocketVibeD !is null); break; 1377 } 1378 } 1379 body 1380 { 1381 enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1"); 1382 enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection"); 1383 version(Have_vibe_d_core) {} else 1384 enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core"); 1385 1386 _socketType = socketType; 1387 _host = host; 1388 _user = user; 1389 _pwd = pwd; 1390 _db = db; 1391 _port = port; 1392 1393 _openSocketPhobos = openSocketPhobos; 1394 _openSocketVibeD = openSocketVibeD; 1395 1396 connect(capFlags); 1397 } 1398 1399 ///ditto 1400 //After the connection is created, and the initial invitation is received from the server 1401 //client preferences can be set, and authentication can then be attempted. 1402 this(string cs, SvrCapFlags capFlags = defaultClientFlags) 1403 { 1404 string[] a = parseConnectionString(cs); 1405 this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 1406 } 1407 1408 ///ditto 1409 this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags) 1410 { 1411 string[] a = parseConnectionString(cs); 1412 this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 1413 } 1414 1415 ///ditto 1416 this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags) 1417 { 1418 string[] a = parseConnectionString(cs); 1419 this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 1420 } 1421 1422 version(Have_vibe_d_core) 1423 ///ditto 1424 this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags) 1425 { 1426 string[] a = parseConnectionString(cs); 1427 this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 1428 } 1429 1430 /++ 1431 Check whether this `Connection` is still connected to the server, or if 1432 the connection has been closed. 1433 +/ 1434 @property bool closed() 1435 { 1436 return _open == OpenState.notConnected || !_socket.connected; 1437 } 1438 1439 /++ 1440 Explicitly close the connection. 1441 1442 Idiomatic use as follows is suggested: 1443 ------------------ 1444 { 1445 auto con = new Connection("localhost:user:password:mysqld"); 1446 scope(exit) con.close(); 1447 // Use the connection 1448 ... 1449 } 1450 ------------------ 1451 +/ 1452 void close() 1453 { 1454 // This is a two-stage process. First tell the server we are quitting this 1455 // connection, and then close the socket. 1456 1457 if (_open == OpenState.authenticated && _socket.connected) 1458 quit(); 1459 1460 if (_open == OpenState.connected) 1461 kill(); 1462 resetPacket(); 1463 } 1464 1465 /++ 1466 Reconnects to the server using the same connection settings originally 1467 used to create the `Connection`. 1468 1469 Optionally takes a `mysql.protocol.constants.SvrCapFlags`, allowing you to 1470 reconnect using a different set of server capability flags. 1471 1472 Normally, if the connection is already open, this will do nothing. However, 1473 if you request a different set of `mysql.protocol.constants.SvrCapFlags` 1474 then was originally used to create the `Connection`, the connection will 1475 be closed and then reconnected using the new `mysql.protocol.constants.SvrCapFlags`. 1476 +/ 1477 //TODO: This needs unittested. 1478 void reconnect() 1479 { 1480 reconnect(_clientCapabilities); 1481 } 1482 1483 ///ditto 1484 void reconnect(SvrCapFlags clientCapabilities) 1485 { 1486 bool sameCaps = clientCapabilities == _clientCapabilities; 1487 if(!closed) 1488 { 1489 // Same caps as before? 1490 if(clientCapabilities == _clientCapabilities) 1491 return; // Nothing to do, just keep current connection 1492 1493 close(); 1494 } 1495 1496 connect(clientCapabilities); 1497 } 1498 1499 private void quit() 1500 in 1501 { 1502 assert(_open == OpenState.authenticated); 1503 } 1504 body 1505 { 1506 sendCmd(CommandType.QUIT, []); 1507 // No response is sent for a quit packet 1508 _open = OpenState.connected; 1509 } 1510 1511 /++ 1512 Parses a connection string of the form 1513 `"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"` 1514 1515 Port is optional and defaults to 3306. 1516 1517 Whitespace surrounding any name or value is automatically stripped. 1518 1519 Returns a five-element array of strings in this order: 1520 $(UL 1521 $(LI [0]: host) 1522 $(LI [1]: user) 1523 $(LI [2]: pwd) 1524 $(LI [3]: db) 1525 $(LI [4]: port) 1526 ) 1527 1528 (TODO: The connection string needs work to allow for semicolons in its parts!) 1529 +/ 1530 //TODO: Replace the return value with a proper struct. 1531 static string[] parseConnectionString(string cs) 1532 { 1533 string[] rv; 1534 rv.length = 5; 1535 rv[4] = "3306"; // Default port 1536 string[] a = split(cs, ";"); 1537 foreach (s; a) 1538 { 1539 string[] a2 = split(s, "="); 1540 enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs); 1541 string name = strip(a2[0]); 1542 string val = strip(a2[1]); 1543 switch (name) 1544 { 1545 case "host": 1546 rv[0] = val; 1547 break; 1548 case "user": 1549 rv[1] = val; 1550 break; 1551 case "pwd": 1552 rv[2] = val; 1553 break; 1554 case "db": 1555 rv[3] = val; 1556 break; 1557 case "port": 1558 rv[4] = val; 1559 break; 1560 default: 1561 throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__); 1562 } 1563 } 1564 return rv; 1565 } 1566 1567 /++ 1568 Select a current database. 1569 1570 Throws `mysql.exceptions.MYX` upon failure. 1571 1572 Params: dbName = Name of the requested database 1573 +/ 1574 void selectDB(string dbName) 1575 { 1576 sendCmd(CommandType.INIT_DB, dbName); 1577 getCmdResponse(); 1578 _db = dbName; 1579 } 1580 1581 /++ 1582 Check the server status. 1583 1584 Throws `mysql.exceptions.MYX` upon failure. 1585 1586 Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined 1587 +/ 1588 OKErrorPacket pingServer() 1589 { 1590 sendCmd(CommandType.PING, []); 1591 return getCmdResponse(); 1592 } 1593 1594 /++ 1595 Refresh some feature(s) of the server. 1596 1597 Throws `mysql.exceptions.MYX` upon failure. 1598 1599 Returns: An `mysql.protocol.packets.OKErrorPacket` from which server status can be determined 1600 +/ 1601 OKErrorPacket refreshServer(RefreshFlags flags) 1602 { 1603 sendCmd(CommandType.REFRESH, [flags]); 1604 return getCmdResponse(); 1605 } 1606 1607 /++ 1608 Flush any outstanding result set elements. 1609 1610 When the server responds to a command that produces a result set, it 1611 queues the whole set of corresponding packets over the current connection. 1612 Before that `Connection` can embark on any new command, it must receive 1613 all of those packets and junk them. 1614 1615 As of v1.1.4, this is done automatically as needed. But you can still 1616 call this manually to force a purge to occur when you want. 1617 1618 See_Also: $(LINK http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/) 1619 +/ 1620 ulong purgeResult() 1621 { 1622 scope(failure) kill(); 1623 1624 _lastCommandID++; 1625 1626 ulong rows = 0; 1627 if (_headersPending) 1628 { 1629 for (size_t i = 0;; i++) 1630 { 1631 if (getPacket().isEOFPacket()) 1632 { 1633 _headersPending = false; 1634 break; 1635 } 1636 enforceEx!MYXProtocol(i < _fieldCount, 1637 text("Field header count (", _fieldCount, ") exceeded but no EOF packet found.")); 1638 } 1639 } 1640 if (_rowsPending) 1641 { 1642 for (;; rows++) 1643 { 1644 if (getPacket().isEOFPacket()) 1645 { 1646 _rowsPending = _binaryPending = false; 1647 break; 1648 } 1649 } 1650 } 1651 resetPacket(); 1652 return rows; 1653 } 1654 1655 /++ 1656 Get a textual report on the server status. 1657 1658 (COM_STATISTICS) 1659 +/ 1660 string serverStats() 1661 { 1662 sendCmd(CommandType.STATISTICS, []); 1663 return cast(string) getPacket(); 1664 } 1665 1666 /++ 1667 Enable multiple statement commands. 1668 1669 This can be used later if this feature was not requested in the client capability flags. 1670 1671 Warning: This functionality is currently untested. 1672 1673 Params: on = Boolean value to turn the capability on or off. 1674 +/ 1675 //TODO: Need to test this 1676 void enableMultiStatements(bool on) 1677 { 1678 scope(failure) kill(); 1679 1680 ubyte[] t; 1681 t.length = 2; 1682 t[0] = on ? 0 : 1; 1683 t[1] = 0; 1684 sendCmd(CommandType.STMT_OPTION, t); 1685 1686 // For some reason this command gets an EOF packet as response 1687 auto packet = getPacket(); 1688 enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command"); 1689 } 1690 1691 /// Return the in-force protocol number. 1692 @property ubyte protocol() pure const nothrow { return _protocol; } 1693 /// Server version 1694 @property string serverVersion() pure const nothrow { return _serverVersion; } 1695 /// Server capability flags 1696 @property uint serverCapabilities() pure const nothrow { return _sCaps; } 1697 /// Server status 1698 @property ushort serverStatus() pure const nothrow { return _serverStatus; } 1699 /// Current character set 1700 @property ubyte charSet() pure const nothrow { return _sCharSet; } 1701 /// Current database 1702 @property string currentDB() pure const nothrow { return _db; } 1703 /// Socket type being used, Phobos or Vibe.d 1704 @property MySQLSocketType socketType() pure const nothrow { return _socketType; } 1705 1706 /// After a command that inserted a row into a table with an auto-increment 1707 /// ID column, this method allows you to retrieve the last insert ID. 1708 @property ulong lastInsertID() pure const nothrow { return _insertID; } 1709 1710 /// This gets incremented every time a command is issued or results are purged, 1711 /// so a `mysql.result.ResultRange` can tell whether it's been invalidated. 1712 @property ulong lastCommandID() pure const nothrow { return _lastCommandID; } 1713 1714 /// Gets whether rows are pending. 1715 /// 1716 /// Note, you may want `hasPending` instead. 1717 @property bool rowsPending() pure const nothrow { return _rowsPending; } 1718 1719 /// Gets whether anything (rows, headers or binary) is pending. 1720 /// New commands cannot be sent on a connection while anything is pending 1721 /// (the pending data will automatically be purged.) 1722 @property bool hasPending() pure const nothrow 1723 { 1724 return _rowsPending || _headersPending || _binaryPending; 1725 } 1726 1727 /// Gets the result header's field descriptions. 1728 @property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; } 1729 1730 /++ 1731 Manually register a prepared statement on this connection. 1732 1733 Does nothing if statement is already registered on this connection. 1734 1735 Calling this is not strictly necessary, as the prepared statement will 1736 automatically be registered upon its first use on any `Connection`. 1737 This is provided for those who prefer eager registration over lazy 1738 for performance reasons. 1739 +/ 1740 void register(Prepared prepared) 1741 { 1742 register(prepared.sql); 1743 } 1744 1745 ///ditto 1746 void register(string sql) 1747 { 1748 registerIfNeeded(sql); 1749 } 1750 1751 /++ 1752 Manually release a prepared statement on this connection. 1753 1754 This method tells the server that it can dispose of the information it 1755 holds about the current prepared statement. 1756 1757 Calling this is not strictly necessary. The server considers prepared 1758 statements to be per-connection, so they'll go away when the connection 1759 closes anyway. This is provided in case direct control is actually needed. 1760 1761 If you choose to use a reference counted struct to call this automatically, 1762 be aware that embedding reference counted structs inside garbage collectible 1763 heap objects is dangerous and should be avoided, as it can lead to various 1764 hidden problems, from crashes to race conditions. (See the discussion at issue 1765 $(LINK2 https://github.com/mysql-d/mysql-native/issues/159, #159) 1766 for details.) Instead, it may be better to simply avoid trying to manage 1767 their release at all, as it's not usually necessary. Or to periodically 1768 release all prepared statements, and simply allow mysql-native to 1769 automatically re-register them upon their next use. 1770 1771 Notes: 1772 1773 In actuality, the server might not immediately be told to release the 1774 statement (although `isRegistered` will still report `false`). 1775 1776 This is because there could be a `mysql.result.ResultRange` with results 1777 still pending for retrieval, and the protocol doesn't allow sending commands 1778 (such as "release a prepared statement") to the server while data is pending. 1779 Therefore, this function may instead queue the statement to be released 1780 when it is safe to do so: Either the next time a result set is purged or 1781 the next time a command (such as `mysql.commands.query` or 1782 `mysql.commands.exec`) is performed (because such commands automatically 1783 purge any pending results). 1784 1785 This function does NOT auto-purge because, if this is ever called from 1786 automatic resource management cleanup (refcounting, RAII, etc), that 1787 would create ugly situations where hidden, implicit behavior triggers 1788 an unexpected auto-purge. 1789 +/ 1790 void release(Prepared prepared) 1791 { 1792 release(prepared.sql); 1793 } 1794 1795 ///ditto 1796 void release(string sql) 1797 { 1798 //TODO: Don't queue it if nothing is pending. Just do it immediately. 1799 // But need to be certain both situations are unittested. 1800 preparedRegistrations.queueForRelease(sql); 1801 } 1802 1803 /++ 1804 Manually release all prepared statements on this connection. 1805 1806 While minimal, every prepared statement registered on a connection does 1807 use up a small amount of resources in both mysql-native and on the server. 1808 Additionally, servers can be configured 1809 $(LINK2 https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_prepared_stmt_count, 1810 to limit the number of prepared statements) 1811 allowed on a connection at one time (the default, however 1812 is quite high). Note also, that certain overloads of `mysql.commands.exec`, 1813 `mysql.commands.query`, etc. register prepared statements behind-the-scenes 1814 which are cached for quick re-use later. 1815 1816 Therefore, it may occasionally be useful to clear out all prepared 1817 statements on a connection, together with all resources used by them (or 1818 at least leave the resources ready for garbage-collection). This function 1819 does just that. 1820 1821 Note that this is ALWAYS COMPLETELY SAFE to call, even if you still have 1822 live prepared statements you intend to use again. This is safe because 1823 mysql-native will automatically register or re-register prepared statements 1824 as-needed. 1825 1826 Notes: 1827 1828 In actuality, the prepared statements might not be immediately released 1829 (although `isRegistered` will still report `false` for them). 1830 1831 This is because there could be a `mysql.result.ResultRange` with results 1832 still pending for retrieval, and the protocol doesn't allow sending commands 1833 (such as "release a prepared statement") to the server while data is pending. 1834 Therefore, this function may instead queue the statement to be released 1835 when it is safe to do so: Either the next time a result set is purged or 1836 the next time a command (such as `mysql.commands.query` or 1837 `mysql.commands.exec`) is performed (because such commands automatically 1838 purge any pending results). 1839 1840 This function does NOT auto-purge because, if this is ever called from 1841 automatic resource management cleanup (refcounting, RAII, etc), that 1842 would create ugly situations where hidden, implicit behavior triggers 1843 an unexpected auto-purge. 1844 +/ 1845 void releaseAll() 1846 { 1847 preparedRegistrations.queueAllForRelease(); 1848 } 1849 1850 @("releaseAll") 1851 debug(MYSQLN_TESTS) 1852 unittest 1853 { 1854 mixin(scopedCn); 1855 1856 cn.exec("DROP TABLE IF EXISTS `releaseAll`"); 1857 cn.exec("CREATE TABLE `releaseAll` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8"); 1858 1859 auto preparedSelect = cn.prepare("SELECT * FROM `releaseAll`"); 1860 auto preparedInsert = cn.prepare("INSERT INTO `releaseAll` (a) VALUES (1)"); 1861 assert(cn.isRegistered(preparedSelect)); 1862 assert(cn.isRegistered(preparedInsert)); 1863 1864 cn.releaseAll(); 1865 assert(!cn.isRegistered(preparedSelect)); 1866 assert(!cn.isRegistered(preparedInsert)); 1867 cn.exec("INSERT INTO `releaseAll` (a) VALUES (1)"); 1868 assert(!cn.isRegistered(preparedSelect)); 1869 assert(!cn.isRegistered(preparedInsert)); 1870 1871 cn.exec(preparedInsert); 1872 cn.query(preparedSelect).array; 1873 assert(cn.isRegistered(preparedSelect)); 1874 assert(cn.isRegistered(preparedInsert)); 1875 1876 } 1877 1878 /// Is the given statement registered on this connection as a prepared statement? 1879 bool isRegistered(Prepared prepared) 1880 { 1881 return isRegistered( prepared.sql ); 1882 } 1883 1884 ///ditto 1885 bool isRegistered(string sql) 1886 { 1887 return isRegistered( preparedRegistrations[sql] ); 1888 } 1889 1890 ///ditto 1891 package bool isRegistered(Nullable!PreparedServerInfo info) 1892 { 1893 return !info.isNull && !info.queuedForRelease; 1894 } 1895 } 1896 1897 // Test register, release, isRegistered, and auto-register for prepared statements 1898 @("autoRegistration") 1899 debug(MYSQLN_TESTS) 1900 unittest 1901 { 1902 import mysql.connection; 1903 import mysql.test.common; 1904 1905 Prepared preparedInsert; 1906 Prepared preparedSelect; 1907 immutable insertSQL = "INSERT INTO `autoRegistration` VALUES (1), (2)"; 1908 immutable selectSQL = "SELECT `val` FROM `autoRegistration`"; 1909 int queryTupleResult; 1910 1911 { 1912 mixin(scopedCn); 1913 1914 // Setup 1915 cn.exec("DROP TABLE IF EXISTS `autoRegistration`"); 1916 cn.exec("CREATE TABLE `autoRegistration` ( 1917 `val` INTEGER 1918 ) ENGINE=InnoDB DEFAULT CHARSET=utf8"); 1919 1920 // Initial register 1921 preparedInsert = cn.prepare(insertSQL); 1922 preparedSelect = cn.prepare(selectSQL); 1923 1924 // Test basic register, release, isRegistered 1925 assert(cn.isRegistered(preparedInsert)); 1926 assert(cn.isRegistered(preparedSelect)); 1927 cn.release(preparedInsert); 1928 cn.release(preparedSelect); 1929 assert(!cn.isRegistered(preparedInsert)); 1930 assert(!cn.isRegistered(preparedSelect)); 1931 1932 // Test manual re-register 1933 cn.register(preparedInsert); 1934 cn.register(preparedSelect); 1935 assert(cn.isRegistered(preparedInsert)); 1936 assert(cn.isRegistered(preparedSelect)); 1937 1938 // Test double register 1939 cn.register(preparedInsert); 1940 cn.register(preparedSelect); 1941 assert(cn.isRegistered(preparedInsert)); 1942 assert(cn.isRegistered(preparedSelect)); 1943 1944 // Test double release 1945 cn.release(preparedInsert); 1946 cn.release(preparedSelect); 1947 assert(!cn.isRegistered(preparedInsert)); 1948 assert(!cn.isRegistered(preparedSelect)); 1949 cn.release(preparedInsert); 1950 cn.release(preparedSelect); 1951 assert(!cn.isRegistered(preparedInsert)); 1952 assert(!cn.isRegistered(preparedSelect)); 1953 } 1954 1955 // Note that at this point, both prepared statements still exist, 1956 // but are no longer registered on any connection. In fact, there 1957 // are no open connections anymore. 1958 1959 // Test auto-register: exec 1960 { 1961 mixin(scopedCn); 1962 1963 assert(!cn.isRegistered(preparedInsert)); 1964 cn.exec(preparedInsert); 1965 assert(cn.isRegistered(preparedInsert)); 1966 } 1967 1968 // Test auto-register: query 1969 { 1970 mixin(scopedCn); 1971 1972 assert(!cn.isRegistered(preparedSelect)); 1973 cn.query(preparedSelect).each(); 1974 assert(cn.isRegistered(preparedSelect)); 1975 } 1976 1977 // Test auto-register: queryRow 1978 { 1979 mixin(scopedCn); 1980 1981 assert(!cn.isRegistered(preparedSelect)); 1982 cn.queryRow(preparedSelect); 1983 assert(cn.isRegistered(preparedSelect)); 1984 } 1985 1986 // Test auto-register: queryRowTuple 1987 { 1988 mixin(scopedCn); 1989 1990 assert(!cn.isRegistered(preparedSelect)); 1991 cn.queryRowTuple(preparedSelect, queryTupleResult); 1992 assert(cn.isRegistered(preparedSelect)); 1993 } 1994 1995 // Test auto-register: queryValue 1996 { 1997 mixin(scopedCn); 1998 1999 assert(!cn.isRegistered(preparedSelect)); 2000 cn.queryValue(preparedSelect); 2001 assert(cn.isRegistered(preparedSelect)); 2002 } 2003 } 2004 2005 // An attempt to reproduce issue #81: Using mysql-native driver with no default database 2006 // I'm unable to actually reproduce the error, though. 2007 @("issue81") 2008 debug(MYSQLN_TESTS) 2009 unittest 2010 { 2011 import mysql.escape; 2012 mixin(scopedCn); 2013 2014 cn.exec("DROP TABLE IF EXISTS `issue81`"); 2015 cn.exec("CREATE TABLE `issue81` (a INTEGER) ENGINE=InnoDB DEFAULT CHARSET=utf8"); 2016 cn.exec("INSERT INTO `issue81` (a) VALUES (1)"); 2017 2018 auto cn2 = new Connection(text("host=", cn._host, ";port=", cn._port, ";user=", cn._user, ";pwd=", cn._pwd)); 2019 scope(exit) cn2.close(); 2020 2021 cn2.query("SELECT * FROM `"~mysqlEscape(cn._db).text~"`.`issue81`"); 2022 } 2023 2024 // Regression test for Issue #154: 2025 // autoPurge can throw an exception if the socket was closed without purging 2026 // 2027 // This simulates a disconnect by closing the socket underneath the Connection 2028 // object itself. 2029 @("dropConnection") 2030 debug(MYSQLN_TESTS) 2031 unittest 2032 { 2033 mixin(scopedCn); 2034 2035 cn.exec("DROP TABLE IF EXISTS `dropConnection`"); 2036 cn.exec("CREATE TABLE `dropConnection` ( 2037 `val` INTEGER 2038 ) ENGINE=InnoDB DEFAULT CHARSET=utf8"); 2039 cn.exec("INSERT INTO `dropConnection` VALUES (1), (2), (3)"); 2040 import mysql.prepared; 2041 { 2042 auto prep = cn.prepare("SELECT * FROM `dropConnection`"); 2043 cn.query(prep); 2044 } 2045 // close the socket forcibly 2046 cn._socket.close(); 2047 // this should still work (it should reconnect). 2048 cn.exec("DROP TABLE `dropConnection`"); 2049 } 2050 2051 /+ 2052 Test Prepared's ability to be safely refcount-released during a GC cycle 2053 (ie, `Connection.release` must not allocate GC memory). 2054 2055 Currently disabled because it's not guaranteed to always work 2056 (and apparently, cannot be made to work?) 2057 For relevant discussion, see issue #159: 2058 https://github.com/mysql-d/mysql-native/issues/159 2059 +/ 2060 version(none) 2061 debug(MYSQLN_TESTS) 2062 { 2063 /// Proof-of-concept ref-counted Prepared wrapper, just for testing, 2064 /// not really intended for actual use. 2065 private struct RCPreparedPayload 2066 { 2067 Prepared prepared; 2068 Connection conn; // Connection to be released from 2069 2070 alias prepared this; 2071 2072 @disable this(this); // not copyable 2073 ~this() 2074 { 2075 // There are a couple calls to this dtor where `conn` happens to be null. 2076 if(conn is null) 2077 return; 2078 2079 assert(conn.isRegistered(prepared)); 2080 conn.release(prepared); 2081 } 2082 } 2083 ///ditto 2084 alias RCPrepared = RefCounted!(RCPreparedPayload, RefCountedAutoInitialize.no); 2085 ///ditto 2086 private RCPrepared rcPrepare(Connection conn, string sql) 2087 { 2088 import std.algorithm.mutation : move; 2089 2090 auto prepared = conn.prepare(sql); 2091 auto payload = RCPreparedPayload(prepared, conn); 2092 return refCounted(move(payload)); 2093 } 2094 2095 @("rcPrepared") 2096 unittest 2097 { 2098 import core.memory; 2099 mixin(scopedCn); 2100 2101 cn.exec("DROP TABLE IF EXISTS `rcPrepared`"); 2102 cn.exec("CREATE TABLE `rcPrepared` ( 2103 `val` INTEGER 2104 ) ENGINE=InnoDB DEFAULT CHARSET=utf8"); 2105 cn.exec("INSERT INTO `rcPrepared` VALUES (1), (2), (3)"); 2106 2107 // Define this in outer scope to guarantee data is left pending when 2108 // RCPrepared's payload is collected. This will guarantee 2109 // that Connection will need to queue the release. 2110 ResultRange rows; 2111 2112 void bar() 2113 { 2114 class Foo { RCPrepared p; } 2115 auto foo = new Foo(); 2116 2117 auto rcStmt = cn.rcPrepare("SELECT * FROM `rcPrepared`"); 2118 foo.p = rcStmt; 2119 rows = cn.query(rcStmt); 2120 2121 /+ 2122 At this point, there are two references to the prepared statement: 2123 One in a `Foo` object (currently bound to `foo`), and one on the stack. 2124 2125 Returning from this function will destroy the one on the stack, 2126 and deterministically reduce the refcount to 1. 2127 2128 So, right here we set `foo` to null to *keep* the Foo object's 2129 reference to the prepared statement, but set adrift the Foo object 2130 itself, ready to be destroyed (along with the only remaining 2131 prepared statement reference it contains) by the next GC cycle. 2132 2133 Thus, `RCPreparedPayload.~this` and `Connection.release(Prepared)` 2134 will be executed during a GC cycle...and had better not perform 2135 any allocations, or else...boom! 2136 +/ 2137 foo = null; 2138 } 2139 2140 bar(); 2141 assert(cn.hasPending); // Ensure Connection is forced to queue the release. 2142 GC.collect(); // `Connection.release(Prepared)` better not be allocating, or boom! 2143 } 2144 }