1 /// Connect to a MySQL/MariaDB server. 2 module mysql.connection; 3 4 import std.algorithm; 5 import std.conv; 6 import std.digest.sha; 7 import std.exception; 8 import std.socket; 9 import std.string; 10 11 import mysql.commands; 12 import mysql.exceptions; 13 import mysql.prepared; 14 import mysql.protocol.constants; 15 import mysql.protocol.packets; 16 import mysql.protocol.sockets; 17 import mysql.result; 18 debug(MYSQL_INTEGRATION_TESTS) 19 { 20 import mysql.test.common; 21 } 22 23 version(Have_vibe_d_core) 24 { 25 static if(__traits(compiles, (){ import vibe.core.net; } )) 26 import vibe.core.net; 27 else 28 static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'."); 29 } 30 31 /// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection. 32 immutable SvrCapFlags defaultClientFlags = 33 SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS | 34 SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 | 35 SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS | 36 //SvrCapFlags.MULTI_RESULTS; 37 38 /++ 39 A class representing a database connection. 40 41 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of 42 creating a new Connection directly. That will provide certain benefits, 43 such as reusing old connections and automatic cleanup (no need to close 44 the connection when done). 45 46 ------------------ 47 // Suggested usage: 48 49 { 50 auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"); 51 scope(exit) con.close(); 52 53 // Use the connection 54 ... 55 } 56 ------------------ 57 +/ 58 //TODO: All low-level commms should be moved into the mysql.protocol package. 59 class Connection 60 { 61 /+ 62 The Connection is responsible for handshaking with the server to establish 63 authentication. It then passes client preferences to the server, and 64 subsequently is the channel for all command packets that are sent, and all 65 response packets received. 66 67 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one 68 byte as a packet number. Connection deals with the headers and ensures that 69 packet numbers are sequential. 70 71 The initial packet is sent by the server - essentially a 'hello' packet 72 inviting login. That packet has a sequence number of zero. That sequence 73 number is the incremented by client and server packets through the handshake 74 sequence. 75 76 After login all further sequences are initialized by the client sending a 77 command packet with a zero sequence number, to which the server replies with 78 zero or more packets with sequential sequence numbers. 79 +/ 80 package: 81 enum OpenState 82 { 83 /// We have not yet connected to the server, or have sent QUIT to the 84 /// server and closed the connection 85 notConnected, 86 /// We have connected to the server and parsed the greeting, but not 87 /// yet authenticated 88 connected, 89 /// We have successfully authenticated against the server, and need to 90 /// send QUIT to the server when closing the connection 91 authenticated 92 } 93 OpenState _open; 94 MySQLSocket _socket; 95 96 SvrCapFlags _sCaps, _cCaps; 97 uint _sThread; 98 ushort _serverStatus; 99 ubyte _sCharSet, _protocol; 100 string _serverVersion; 101 102 string _host, _user, _pwd, _db; 103 ushort _port; 104 105 MySQLSocketType _socketType; 106 107 OpenSocketCallbackPhobos _openSocketPhobos; 108 OpenSocketCallbackVibeD _openSocketVibeD; 109 110 ulong _insertID; 111 112 // This gets incremented every time a command is issued or results are purged, 113 // so a ResultRange can tell whether it's been invalidated. 114 ulong _lastCommandID; 115 116 // Whether there are rows, headers or bimary data waiting to be retreived. 117 // MySQL protocol doesn't permit performing any other action until all 118 // such data is read. 119 bool _rowsPending, _headersPending, _binaryPending; 120 121 // Field count of last performed command. 122 ushort _fieldCount; 123 124 // ResultSetHeaders of last performed command. 125 ResultSetHeaders _rsh; 126 127 // This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise 128 // you'll get the dreaded "packet out of order" message. It, and the socket connection are 129 // the reason why most other objects require a connection object for their construction. 130 ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct 131 /// ordering. First packet should have 0 132 @property ubyte pktNumber() { return _cpn; } 133 void bumpPacket() { _cpn++; } 134 void resetPacket() { _cpn = 0; } 135 136 version(Have_vibe_d_core) {} else 137 pure const nothrow invariant() 138 { 139 assert(_socketType != MySQLSocketType.vibed); 140 } 141 142 ubyte[] getPacket() 143 { 144 scope(failure) kill(); 145 146 ubyte[4] header; 147 _socket.read(header); 148 // number of bytes always set as 24-bit 149 uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0]; 150 enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order"); 151 bumpPacket(); 152 153 ubyte[] packet = new ubyte[numDataBytes]; 154 _socket.read(packet); 155 assert(packet.length == numDataBytes, "Wrong number of bytes read"); 156 return packet; 157 } 158 159 void send(const(ubyte)[] packet) 160 in 161 { 162 assert(packet.length > 4); // at least 1 byte more than header 163 } 164 body 165 { 166 _socket.write(packet); 167 } 168 169 void send(const(ubyte)[] header, const(ubyte)[] data) 170 in 171 { 172 assert(header.length == 4 || header.length == 5/*command type included*/); 173 } 174 body 175 { 176 _socket.write(header); 177 if(data.length) 178 _socket.write(data); 179 } 180 181 void sendCmd(T)(CommandType cmd, const(T)[] data) 182 in 183 { 184 // Internal thread states. Clients shouldn't use this 185 assert(cmd != CommandType.SLEEP); 186 assert(cmd != CommandType.CONNECT); 187 assert(cmd != CommandType.TIME); 188 assert(cmd != CommandType.DELAYED_INSERT); 189 assert(cmd != CommandType.CONNECT_OUT); 190 191 // Deprecated 192 assert(cmd != CommandType.CREATE_DB); 193 assert(cmd != CommandType.DROP_DB); 194 assert(cmd != CommandType.TABLE_DUMP); 195 196 // cannot send more than uint.max bytes. TODO: better error message if we try? 197 assert(data.length <= uint.max); 198 } 199 out 200 { 201 // at this point we should have sent a command 202 assert(pktNumber == 1); 203 } 204 body 205 { 206 autoPurge(); 207 208 scope(failure) kill(); 209 210 _lastCommandID++; 211 212 if(!_socket.connected) 213 { 214 if(cmd == CommandType.QUIT) 215 return; // Don't bother reopening connection just to quit 216 217 _open = OpenState.notConnected; 218 connect(_clientCapabilities); 219 } 220 221 resetPacket(); 222 223 ubyte[] header; 224 header.length = 4 /*header*/ + 1 /*cmd*/; 225 header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/); 226 header[4] = cmd; 227 bumpPacket(); 228 229 send(header, cast(const(ubyte)[])data); 230 } 231 232 OKErrorPacket getCmdResponse(bool asString = false) 233 { 234 auto okp = OKErrorPacket(getPacket()); 235 enforcePacketOK(okp); 236 _serverStatus = okp.serverStatus; 237 return okp; 238 } 239 240 ubyte[] buildAuthPacket(ubyte[] token) 241 in 242 { 243 assert(token.length == 20); 244 } 245 body 246 { 247 ubyte[] packet; 248 packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1); 249 packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append 250 251 // NOTE: we'll set the header last when we know the size 252 253 // Set the default capabilities required by the client 254 _cCaps.packInto(packet[4..8]); 255 256 // Request a conventional maximum packet length. 257 1.packInto(packet[8..12]); 258 259 packet ~= 33; // Set UTF-8 as default charSet 260 261 // There's a statutory block of zero bytes here - fill them in. 262 foreach(i; 0 .. 23) 263 packet ~= 0; 264 265 // Add the user name as a null terminated string 266 foreach(i; 0 .. _user.length) 267 packet ~= _user[i]; 268 packet ~= 0; // \0 269 270 // Add our calculated authentication token as a length prefixed string. 271 assert(token.length <= ubyte.max); 272 if(_pwd.length == 0) // Omit the token if the account has no password 273 packet ~= 0; 274 else 275 { 276 packet ~= cast(ubyte)token.length; 277 foreach(i; 0 .. token.length) 278 packet ~= token[i]; 279 } 280 281 // Add the default database as a null terminated string 282 foreach(i; 0 .. _db.length) 283 packet ~= _db[i]; 284 packet ~= 0; // \0 285 286 // The server sent us a greeting with packet number 0, so we send the auth packet 287 // back with the next number. 288 packet.setPacketHeader(pktNumber); 289 bumpPacket(); 290 return packet; 291 } 292 293 void consumeServerInfo(ref ubyte[] packet) 294 { 295 scope(failure) kill(); 296 297 _sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes) 298 _sCharSet = packet.consume!ubyte(); // server_language 299 _serverStatus = packet.consume!ushort(); //server_status 300 _sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes) 301 _sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec 302 303 enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1"); 304 enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection"); 305 } 306 307 ubyte[] parseGreeting() 308 { 309 scope(failure) kill(); 310 311 ubyte[] packet = getPacket(); 312 313 if (packet.length > 0 && packet[0] == ResultPacketMarker.error) 314 { 315 auto okp = OKErrorPacket(packet); 316 enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message); 317 } 318 319 _protocol = packet.consume!ubyte(); 320 321 _serverVersion = packet.consume!string(packet.countUntil(0)); 322 packet.skip(1); // \0 terminated _serverVersion 323 324 _sThread = packet.consume!uint(); 325 326 // read first part of scramble buf 327 ubyte[] authBuf; 328 authBuf.length = 255; 329 authBuf[0..8] = packet.consume(8)[]; // scramble_buff 330 331 enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0"); 332 333 consumeServerInfo(packet); 334 335 packet.skip(1); // this byte supposed to be scramble length, but is actually zero 336 packet.skip(10); // filler of \0 337 338 // rest of the scramble 339 auto len = packet.countUntil(0); 340 enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes"); 341 enforce(authBuf.length > 8+len); 342 authBuf[8..8+len] = packet.consume(len)[]; 343 authBuf.length = 8+len; // cut to correct size 344 enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf"); 345 346 return authBuf; 347 } 348 349 static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port) 350 { 351 auto s = new PlainPhobosSocket(); 352 s.connect(new InternetAddress(host, port)); 353 return s; 354 } 355 356 static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port) 357 { 358 version(Have_vibe_d_core) 359 return vibe.core.net.connectTCP(host, port); 360 else 361 assert(0); 362 } 363 364 void initConnection() 365 { 366 resetPacket(); 367 final switch(_socketType) 368 { 369 case MySQLSocketType.phobos: 370 _socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port)); 371 break; 372 373 case MySQLSocketType.vibed: 374 version(Have_vibe_d_core) { 375 _socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port)); 376 break; 377 } else assert(0, "Unsupported socket type. Need version Have_vibe_d_core."); 378 } 379 } 380 381 ubyte[] makeToken(ubyte[] authBuf) 382 { 383 auto pass1 = sha1Of(cast(const(ubyte)[])_pwd); 384 auto pass2 = sha1Of(pass1); 385 386 SHA1 sha1; 387 sha1.start(); 388 sha1.put(authBuf); 389 sha1.put(pass2); 390 auto result = sha1.finish(); 391 foreach (size_t i; 0..20) 392 result[i] = result[i] ^ pass1[i]; 393 return result.dup; 394 } 395 396 SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure 397 { 398 SvrCapFlags common; 399 uint filter = 1; 400 foreach (size_t i; 0..uint.sizeof) 401 { 402 bool serverSupport = (server & filter) != 0; // can the server do this capability? 403 bool clientSupport = (client & filter) != 0; // can we support it? 404 if(serverSupport && clientSupport) 405 common |= filter; 406 filter <<= 1; // check next flag 407 } 408 return common; 409 } 410 411 void setClientFlags(SvrCapFlags capFlags) 412 { 413 _cCaps = getCommonCapabilities(_sCaps, capFlags); 414 415 // We cannot operate in <4.1 protocol, so we'll force it even if the user 416 // didn't supply it 417 _cCaps |= SvrCapFlags.PROTOCOL41; 418 _cCaps |= SvrCapFlags.SECURE_CONNECTION; 419 } 420 421 void authenticate(ubyte[] greeting) 422 in 423 { 424 assert(_open == OpenState.connected); 425 } 426 out 427 { 428 assert(_open == OpenState.authenticated); 429 } 430 body 431 { 432 auto token = makeToken(greeting); 433 auto authPacket = buildAuthPacket(token); 434 send(authPacket); 435 436 auto packet = getPacket(); 437 auto okp = OKErrorPacket(packet); 438 enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message); 439 _open = OpenState.authenticated; 440 } 441 442 SvrCapFlags _clientCapabilities; 443 444 void connect(SvrCapFlags clientCapabilities) 445 in 446 { 447 assert(closed); 448 } 449 out 450 { 451 assert(_open == OpenState.authenticated); 452 } 453 body 454 { 455 initConnection(); 456 auto greeting = parseGreeting(); 457 _open = OpenState.connected; 458 459 _clientCapabilities = clientCapabilities; 460 setClientFlags(clientCapabilities); 461 authenticate(greeting); 462 } 463 464 /// Forcefully close the socket without sending the quit command. 465 /// Needed in case an error leaves communatations in an undefined or non-recoverable state. 466 void kill() 467 { 468 if(_socket.connected) 469 _socket.close(); 470 _open = OpenState.notConnected; 471 // any pending data is gone. Any statements to release will be released 472 // on the server automatically. 473 _headersPending = _rowsPending = _binaryPending = false; 474 statementsToRelease.clear(); 475 } 476 477 /// Called whenever mysql-native needs to send a command to the server 478 /// and be sure there aren't any pending results (which would prevent 479 /// a new command from being sent). 480 void autoPurge() 481 { 482 // This is called every time a command is sent, 483 // so detect & prevent infinite recursion. 484 static bool isAutoPurging = false; 485 486 if(isAutoPurging) 487 return; 488 489 isAutoPurging = true; 490 scope(exit) isAutoPurging = false; 491 492 try 493 { 494 purgeResult(); 495 statementsToRelease.releaseAll(); 496 } 497 catch(Exception e) 498 { 499 // likely the connection was closed, so reset any state. 500 // Don't treat this as a real error, because everything will be reset when we 501 // reconnect. 502 kill(); 503 } 504 } 505 506 /++ 507 Keeps track of prepared statements queued to be released from the server. 508 509 Prepared statements aren't released immediately, because that 510 involves sending a command to the server even though there might be 511 results pending. (Can't send a command while results are pending.) 512 +/ 513 static struct StatementsToRelease(alias doRelease) 514 { 515 private Connection conn; 516 517 /// Ids of prepared statements to be released. 518 /// This uses assumeSafeAppend. Do not save copies of it. 519 uint[] ids; 520 521 void add(uint statementId) 522 { 523 ids ~= statementId; 524 } 525 526 /// Removes a prepared statement from the list of statements 527 /// to be released from the server. 528 /// Does nothing if the statement isn't on the list. 529 void remove(uint statementId) 530 { 531 foreach(ref id; ids) 532 if(id == statementId) 533 id = 0; 534 } 535 536 /// Releases the prepared statements queued for release. 537 void releaseAll() 538 { 539 foreach(id; ids) 540 if(id != 0) 541 doRelease(conn, id); 542 543 clear(); 544 } 545 546 // clear all statements, and reset for using again. 547 private void clear() 548 { 549 if(ids.length) 550 { 551 ids.length = 0; 552 assumeSafeAppend(ids); 553 } 554 } 555 } 556 StatementsToRelease!(PreparedImpl.immediateRelease) statementsToRelease; 557 558 debug(MYSQL_INTEGRATION_TESTS) uint[] fakeRelease_released; 559 unittest 560 { 561 debug(MYSQL_INTEGRATION_TESTS) 562 { 563 static void fakeRelease(Connection conn, uint id) 564 { 565 conn.fakeRelease_released ~= id; 566 } 567 568 mixin(scopedCn); 569 570 StatementsToRelease!fakeRelease list; 571 list.conn = cn; 572 assert(list.ids == []); 573 assert(cn.fakeRelease_released == []); 574 575 list.add(1); 576 assert(list.ids == [1]); 577 assert(cn.fakeRelease_released == []); 578 579 list.add(7); 580 assert(list.ids == [1, 7]); 581 assert(cn.fakeRelease_released == []); 582 583 list.add(9); 584 assert(list.ids == [1, 7, 9]); 585 assert(cn.fakeRelease_released == []); 586 587 list.remove(5); 588 assert(list.ids == [1, 7, 9]); 589 assert(cn.fakeRelease_released == []); 590 591 list.remove(7); 592 assert(list.ids == [1, 0, 9]); 593 assert(cn.fakeRelease_released == []); 594 595 list.releaseAll(); 596 assert(list.ids == []); 597 assert(cn.fakeRelease_released == [1, 9]); 598 } 599 } 600 601 public: 602 603 /++ 604 Construct opened connection. 605 606 Throws `mysql.exceptions.MYX` upon failure to connect. 607 608 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of 609 creating a new Connection directly. That will provide certain benefits, 610 such as reusing old connections and automatic cleanup (no need to close 611 the connection when done). 612 613 ------------------ 614 // Suggested usage: 615 616 { 617 auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"); 618 scope(exit) con.close(); 619 620 // Use the connection 621 ... 622 } 623 ------------------ 624 625 Params: 626 cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld" 627 (TODO: The connection string needs work to allow for semicolons in its parts!) 628 socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos, 629 unless -version=Have_vibe_d_core is used. 630 openSocket = Optional callback which should return a newly-opened Phobos 631 or Vibe.d TCP socket. This allows custom sockets to be used, 632 subclassed from Phobos's or Vibe.d's sockets. 633 host = An IP address in numeric dotted form, or as a host name. 634 user = The user name to authenticate. 635 password = Users password. 636 db = Desired initial database. 637 capFlags = The set of flag bits from the server's capabilities that the client requires 638 +/ 639 //After the connection is created, and the initial invitation is received from the server 640 //client preferences can be set, and authentication can then be attempted. 641 this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 642 { 643 version(Have_vibe_d_core) 644 enum defaultSocketType = MySQLSocketType.vibed; 645 else 646 enum defaultSocketType = MySQLSocketType.phobos; 647 648 this(defaultSocketType, host, user, pwd, db, port, capFlags); 649 } 650 651 ///ditto 652 this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 653 { 654 version(Have_vibe_d_core) {} else 655 enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core"); 656 657 this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD, 658 host, user, pwd, db, port, capFlags); 659 } 660 661 ///ditto 662 this(OpenSocketCallbackPhobos openSocket, 663 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 664 { 665 this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags); 666 } 667 668 version(Have_vibe_d_core) 669 ///ditto 670 this(OpenSocketCallbackVibeD openSocket, 671 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 672 { 673 this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags); 674 } 675 676 ///ditto 677 private this(MySQLSocketType socketType, 678 OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD, 679 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 680 in 681 { 682 final switch(socketType) 683 { 684 case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break; 685 case MySQLSocketType.vibed: assert(openSocketVibeD !is null); break; 686 } 687 } 688 body 689 { 690 enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1"); 691 enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection"); 692 version(Have_vibe_d_core) {} else 693 enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core"); 694 695 statementsToRelease.conn = this; 696 697 _socketType = socketType; 698 _host = host; 699 _user = user; 700 _pwd = pwd; 701 _db = db; 702 _port = port; 703 704 _openSocketPhobos = openSocketPhobos; 705 _openSocketVibeD = openSocketVibeD; 706 707 connect(capFlags); 708 } 709 710 ///ditto 711 //After the connection is created, and the initial invitation is received from the server 712 //client preferences can be set, and authentication can then be attempted. 713 this(string cs, SvrCapFlags capFlags = defaultClientFlags) 714 { 715 string[] a = parseConnectionString(cs); 716 this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 717 } 718 719 ///ditto 720 this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags) 721 { 722 string[] a = parseConnectionString(cs); 723 this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 724 } 725 726 ///ditto 727 this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags) 728 { 729 string[] a = parseConnectionString(cs); 730 this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 731 } 732 733 version(Have_vibe_d_core) 734 ///ditto 735 this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags) 736 { 737 string[] a = parseConnectionString(cs); 738 this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 739 } 740 741 /++ 742 Check whether this Connection is still connected to the server, or if 743 the connection has been closed. 744 +/ 745 @property bool closed() 746 { 747 return _open == OpenState.notConnected || !_socket.connected; 748 } 749 750 version(Have_vibe_d_core) 751 { 752 /// Used by Vibe.d's ConnectionPool, ignore this. 753 void acquire() { if( _socket ) _socket.acquire(); } 754 ///ditto 755 void release() { if( _socket ) _socket.release(); } 756 ///ditto 757 bool isOwner() { return _socket ? _socket.isOwner() : false; } 758 ///ditto 759 bool amOwner() { return _socket ? _socket.isOwner() : false; } 760 } 761 else 762 { 763 /// Used by Vibe.d's ConnectionPool, ignore this. 764 void acquire() { /+ Do nothing +/ } 765 ///ditto 766 void release() { /+ Do nothing +/ } 767 ///ditto 768 bool isOwner() { return !!_socket; } 769 ///ditto 770 bool amOwner() { return !!_socket; } 771 } 772 773 /++ 774 Explicitly close the connection. 775 776 This is a two-stage process. First tell the server we are quitting this 777 connection, and then close the socket. 778 779 Idiomatic use as follows is suggested: 780 ------------------ 781 { 782 auto con = new Connection("localhost:user:password:mysqld"); 783 scope(exit) con.close(); 784 // Use the connection 785 ... 786 } 787 ------------------ 788 +/ 789 void close() 790 { 791 if (_open == OpenState.authenticated && _socket.connected) 792 quit(); 793 794 if (_open == OpenState.connected) 795 kill(); 796 resetPacket(); 797 } 798 799 /++ 800 Reconnects to the server using the same connection settings originally 801 used to create the Connection. 802 803 Optionally takes a SvrCapFlags, allowing you to reconnect using a different 804 set of server capability flags (most users will not need to do this). 805 806 If the connection is already open, this will do nothing. However, if you 807 request a different set of SvrCapFlags then was originally used to create 808 the Connection, the connection will be closed and then reconnected. 809 +/ 810 void reconnect() 811 { 812 reconnect(_clientCapabilities); 813 } 814 815 ///ditto 816 void reconnect(SvrCapFlags clientCapabilities) 817 { 818 bool sameCaps = clientCapabilities == _clientCapabilities; 819 if(!closed) 820 { 821 // Same caps as before? 822 if(clientCapabilities == _clientCapabilities) 823 return; // Nothing to do, just keep current connection 824 825 close(); 826 } 827 828 connect(clientCapabilities); 829 } 830 831 private void quit() 832 in 833 { 834 assert(_open == OpenState.authenticated); 835 } 836 body 837 { 838 sendCmd(CommandType.QUIT, []); 839 // No response is sent for a quit packet 840 _open = OpenState.connected; 841 } 842 843 /++ 844 Parses a connection string of the form 845 `"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"` 846 847 Port is optional and defaults to 3306. 848 849 Whitespace surrounding any name or value is automatically stripped. 850 851 Returns a five-element array of strings in this order: 852 $(UL 853 $(LI [0]: host) 854 $(LI [1]: user) 855 $(LI [2]: pwd) 856 $(LI [3]: db) 857 $(LI [4]: port) 858 ) 859 860 (TODO: The connection string needs work to allow for semicolons in its parts!) 861 +/ 862 //TODO: Replace the return value with a proper struct. 863 static string[] parseConnectionString(string cs) 864 { 865 string[] rv; 866 rv.length = 5; 867 rv[4] = "3306"; // Default port 868 string[] a = split(cs, ";"); 869 foreach (s; a) 870 { 871 string[] a2 = split(s, "="); 872 enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs); 873 string name = strip(a2[0]); 874 string val = strip(a2[1]); 875 switch (name) 876 { 877 case "host": 878 rv[0] = val; 879 break; 880 case "user": 881 rv[1] = val; 882 break; 883 case "pwd": 884 rv[2] = val; 885 break; 886 case "db": 887 rv[3] = val; 888 break; 889 case "port": 890 rv[4] = val; 891 break; 892 default: 893 throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__); 894 } 895 } 896 return rv; 897 } 898 899 /++ 900 Select a current database. 901 902 Params: dbName = Name of the requested database 903 Throws: MYX 904 +/ 905 void selectDB(string dbName) 906 { 907 sendCmd(CommandType.INIT_DB, dbName); 908 getCmdResponse(); 909 _db = dbName; 910 } 911 912 /++ 913 Check the server status 914 915 Returns: An OKErrorPacket from which server status can be determined 916 Throws: MYX 917 +/ 918 OKErrorPacket pingServer() 919 { 920 sendCmd(CommandType.PING, []); 921 return getCmdResponse(); 922 } 923 924 /++ 925 Refresh some feature(s) of the server. 926 927 Returns: An OKErrorPacket from which server status can be determined 928 Throws: MYX 929 +/ 930 OKErrorPacket refreshServer(RefreshFlags flags) 931 { 932 sendCmd(CommandType.REFRESH, [flags]); 933 return getCmdResponse(); 934 } 935 936 /++ 937 Get the next Row of a pending result set. 938 939 This method can be used after either execSQL() or execPrepared() have returned true 940 to retrieve result set rows sequentially. 941 942 Similar functionality is available via execSQLSequence() and execPreparedSequence() in 943 which case the interface is presented as a forward range of Rows. 944 945 This method allows you to deal with very large result sets either a row at a time, 946 or by feeding the rows into some suitable container such as a linked list. 947 948 Returns: A Row object. 949 +/ 950 Row getNextRow() 951 { 952 scope(failure) kill(); 953 954 if (_headersPending) 955 { 956 _rsh = ResultSetHeaders(this, _fieldCount); 957 _headersPending = false; 958 } 959 ubyte[] packet; 960 Row rr; 961 packet = getPacket(); 962 if (packet.isEOFPacket()) 963 { 964 _rowsPending = _binaryPending = false; 965 return rr; 966 } 967 if (_binaryPending) 968 rr = Row(this, packet, _rsh, true); 969 else 970 rr = Row(this, packet, _rsh, false); 971 //rr._valid = true; 972 return rr; 973 } 974 975 /++ 976 Flush any outstanding result set elements. 977 978 When the server responds to a command that produces a result set, it 979 queues the whole set of corresponding packets over the current connection. 980 Before that Connection can embark on any new command, it must receive 981 all of those packets and junk them. 982 http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/ 983 +/ 984 ulong purgeResult() 985 { 986 scope(failure) kill(); 987 988 _lastCommandID++; 989 990 ulong rows = 0; 991 if (_headersPending) 992 { 993 for (size_t i = 0;; i++) 994 { 995 if (getPacket().isEOFPacket()) 996 { 997 _headersPending = false; 998 break; 999 } 1000 enforceEx!MYXProtocol(i < _fieldCount, 1001 text("Field header count (", _fieldCount, ") exceeded but no EOF packet found.")); 1002 } 1003 } 1004 if (_rowsPending) 1005 { 1006 for (;; rows++) 1007 { 1008 if (getPacket().isEOFPacket()) 1009 { 1010 _rowsPending = _binaryPending = false; 1011 break; 1012 } 1013 } 1014 } 1015 resetPacket(); 1016 return rows; 1017 } 1018 1019 /++ 1020 Get a textual report on the server status. 1021 1022 (COM_STATISTICS) 1023 +/ 1024 string serverStats() 1025 { 1026 sendCmd(CommandType.STATISTICS, []); 1027 return cast(string) getPacket(); 1028 } 1029 1030 /++ 1031 Enable multiple statement commands 1032 1033 This can be used later if this feature was not requested in the client capability flags. 1034 1035 Params: on = Boolean value to turn the capability on or off. 1036 +/ 1037 void enableMultiStatements(bool on) 1038 { 1039 scope(failure) kill(); 1040 1041 ubyte[] t; 1042 t.length = 2; 1043 t[0] = on ? 0 : 1; 1044 t[1] = 0; 1045 sendCmd(CommandType.STMT_OPTION, t); 1046 1047 // For some reason this command gets an EOF packet as response 1048 auto packet = getPacket(); 1049 enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command"); 1050 } 1051 1052 /// Return the in-force protocol number 1053 @property ubyte protocol() pure const nothrow { return _protocol; } 1054 /// Server version 1055 @property string serverVersion() pure const nothrow { return _serverVersion; } 1056 /// Server capability flags 1057 @property uint serverCapabilities() pure const nothrow { return _sCaps; } 1058 /// Server status 1059 @property ushort serverStatus() pure const nothrow { return _serverStatus; } 1060 /// Current character set 1061 @property ubyte charSet() pure const nothrow { return _sCharSet; } 1062 /// Current database 1063 @property string currentDB() pure const nothrow { return _db; } 1064 /// Socket type being used 1065 @property MySQLSocketType socketType() pure const nothrow { return _socketType; } 1066 1067 /// After a command that inserted a row into a table with an auto-increment 1068 /// ID column, this method allows you to retrieve the last insert ID. 1069 @property ulong lastInsertID() pure const nothrow { return _insertID; } 1070 1071 /// This gets incremented every time a command is issued or results are purged, 1072 /// so a ResultRange can tell whether it's been invalidated. 1073 @property ulong lastCommandID() pure const nothrow { return _lastCommandID; } 1074 1075 /// Gets whether rows are pending 1076 @property bool rowsPending() pure const nothrow { return _rowsPending; } 1077 1078 /// Gets whether anything (rows, headers or binary) is pending. 1079 /// New commands cannot be sent on a conncection while anything is pending. 1080 @property bool hasPending() pure const nothrow 1081 { 1082 return _rowsPending || _headersPending || _binaryPending; 1083 } 1084 1085 /// Gets the result header's field descriptions. 1086 @property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; } 1087 } 1088 1089 // unittest for issue 154, when the socket is disconnected from the mysql server. 1090 // This simulates a disconnect by closing the socket underneath the Connection 1091 // object itself. 1092 debug(MYSQL_INTEGRATION_TESTS) 1093 unittest 1094 { 1095 mixin(scopedCn); 1096 1097 cn.exec("DROP TABLE IF EXISTS `dropConnection`"); 1098 cn.exec("CREATE TABLE `dropConnection` ( 1099 `val` INTEGER 1100 ) ENGINE=InnoDB DEFAULT CHARSET=utf8"); 1101 cn.exec("INSERT INTO `dropConnection` VALUES (1), (2), (3)"); 1102 import mysql.prepared; 1103 { 1104 auto prep = cn.prepare("SELECT * FROM `dropConnection`"); 1105 prep.query(); 1106 } 1107 // close the socket forcibly 1108 cn._socket.close(); 1109 // this should still work (it should reconnect). 1110 cn.exec("DROP TABLE `dropConnection`"); 1111 }