1 /// Connect to a MySQL/MariaDB server. 2 module mysql.connection; 3 4 import std.algorithm; 5 import std.conv; 6 import std.digest.sha; 7 import std.exception; 8 import std.socket; 9 import std.string; 10 11 import mysql.commands; 12 import mysql.exceptions; 13 import mysql.prepared; 14 import mysql.protocol.constants; 15 import mysql.protocol.packets; 16 import mysql.protocol.sockets; 17 import mysql.result; 18 debug(MYSQL_INTEGRATION_TESTS) 19 { 20 import mysql.test.common; 21 } 22 23 version(Have_vibe_d_core) 24 { 25 static if(__traits(compiles, (){ import vibe.core.net; } )) 26 import vibe.core.net; 27 else 28 static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'."); 29 } 30 31 /// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection. 32 immutable SvrCapFlags defaultClientFlags = 33 SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS | 34 SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 | 35 SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS | 36 //SvrCapFlags.MULTI_RESULTS; 37 38 /++ 39 A class representing a database connection. 40 41 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of 42 creating a new Connection directly. That will provide certain benefits, 43 such as reusing old connections and automatic cleanup (no need to close 44 the connection when done). 45 46 ------------------ 47 // Suggested usage: 48 49 { 50 auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"); 51 scope(exit) con.close(); 52 53 // Use the connection 54 ... 55 } 56 ------------------ 57 +/ 58 //TODO: All low-level commms should be moved into the mysql.protocol package. 59 class Connection 60 { 61 /+ 62 The Connection is responsible for handshaking with the server to establish 63 authentication. It then passes client preferences to the server, and 64 subsequently is the channel for all command packets that are sent, and all 65 response packets received. 66 67 Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one 68 byte as a packet number. Connection deals with the headers and ensures that 69 packet numbers are sequential. 70 71 The initial packet is sent by the server - essentially a 'hello' packet 72 inviting login. That packet has a sequence number of zero. That sequence 73 number is the incremented by client and server packets through the handshake 74 sequence. 75 76 After login all further sequences are initialized by the client sending a 77 command packet with a zero sequence number, to which the server replies with 78 zero or more packets with sequential sequence numbers. 79 +/ 80 package: 81 enum OpenState 82 { 83 /// We have not yet connected to the server, or have sent QUIT to the 84 /// server and closed the connection 85 notConnected, 86 /// We have connected to the server and parsed the greeting, but not 87 /// yet authenticated 88 connected, 89 /// We have successfully authenticated against the server, and need to 90 /// send QUIT to the server when closing the connection 91 authenticated 92 } 93 OpenState _open; 94 MySQLSocket _socket; 95 96 SvrCapFlags _sCaps, _cCaps; 97 uint _sThread; 98 ushort _serverStatus; 99 ubyte _sCharSet, _protocol; 100 string _serverVersion; 101 102 string _host, _user, _pwd, _db; 103 ushort _port; 104 105 MySQLSocketType _socketType; 106 107 OpenSocketCallbackPhobos _openSocketPhobos; 108 OpenSocketCallbackVibeD _openSocketVibeD; 109 110 ulong _insertID; 111 112 // This gets incremented every time a command is issued or results are purged, 113 // so a ResultRange can tell whether it's been invalidated. 114 ulong _lastCommandID; 115 116 // Whether there are rows, headers or bimary data waiting to be retreived. 117 // MySQL protocol doesn't permit performing any other action until all 118 // such data is read. 119 bool _rowsPending, _headersPending, _binaryPending; 120 121 // Field count of last performed command. 122 ushort _fieldCount; 123 124 // ResultSetHeaders of last performed command. 125 ResultSetHeaders _rsh; 126 127 // This tiny thing here is pretty critical. Pay great attention to it's maintenance, otherwise 128 // you'll get the dreaded "packet out of order" message. It, and the socket connection are 129 // the reason why most other objects require a connection object for their construction. 130 ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct 131 /// ordering. First packet should have 0 132 @property ubyte pktNumber() { return _cpn; } 133 void bumpPacket() { _cpn++; } 134 void resetPacket() { _cpn = 0; } 135 136 version(Have_vibe_d_core) {} else 137 pure const nothrow invariant() 138 { 139 assert(_socketType != MySQLSocketType.vibed); 140 } 141 142 ubyte[] getPacket() 143 { 144 scope(failure) kill(); 145 146 ubyte[4] header; 147 _socket.read(header); 148 // number of bytes always set as 24-bit 149 uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0]; 150 enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order"); 151 bumpPacket(); 152 153 ubyte[] packet = new ubyte[numDataBytes]; 154 _socket.read(packet); 155 assert(packet.length == numDataBytes, "Wrong number of bytes read"); 156 return packet; 157 } 158 159 void send(const(ubyte)[] packet) 160 in 161 { 162 assert(packet.length > 4); // at least 1 byte more than header 163 } 164 body 165 { 166 _socket.write(packet); 167 } 168 169 void send(const(ubyte)[] header, const(ubyte)[] data) 170 in 171 { 172 assert(header.length == 4 || header.length == 5/*command type included*/); 173 } 174 body 175 { 176 _socket.write(header); 177 if(data.length) 178 _socket.write(data); 179 } 180 181 void sendCmd(T)(CommandType cmd, const(T)[] data) 182 in 183 { 184 // Internal thread states. Clients shouldn't use this 185 assert(cmd != CommandType.SLEEP); 186 assert(cmd != CommandType.CONNECT); 187 assert(cmd != CommandType.TIME); 188 assert(cmd != CommandType.DELAYED_INSERT); 189 assert(cmd != CommandType.CONNECT_OUT); 190 191 // Deprecated 192 assert(cmd != CommandType.CREATE_DB); 193 assert(cmd != CommandType.DROP_DB); 194 assert(cmd != CommandType.TABLE_DUMP); 195 196 // cannot send more than uint.max bytes. TODO: better error message if we try? 197 assert(data.length <= uint.max); 198 } 199 out 200 { 201 // at this point we should have sent a command 202 assert(pktNumber == 1); 203 } 204 body 205 { 206 autoPurge(); 207 208 scope(failure) kill(); 209 210 _lastCommandID++; 211 212 if(!_socket.connected) 213 { 214 if(cmd == CommandType.QUIT) 215 return; // Don't bother reopening connection just to quit 216 217 _open = OpenState.notConnected; 218 connect(_clientCapabilities); 219 } 220 221 resetPacket(); 222 223 ubyte[] header; 224 header.length = 4 /*header*/ + 1 /*cmd*/; 225 header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/); 226 header[4] = cmd; 227 bumpPacket(); 228 229 send(header, cast(const(ubyte)[])data); 230 } 231 232 OKErrorPacket getCmdResponse(bool asString = false) 233 { 234 auto okp = OKErrorPacket(getPacket()); 235 enforcePacketOK(okp); 236 _serverStatus = okp.serverStatus; 237 return okp; 238 } 239 240 ubyte[] buildAuthPacket(ubyte[] token) 241 in 242 { 243 assert(token.length == 20); 244 } 245 body 246 { 247 ubyte[] packet; 248 packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1); 249 packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append 250 251 // NOTE: we'll set the header last when we know the size 252 253 // Set the default capabilities required by the client 254 _cCaps.packInto(packet[4..8]); 255 256 // Request a conventional maximum packet length. 257 1.packInto(packet[8..12]); 258 259 packet ~= 33; // Set UTF-8 as default charSet 260 261 // There's a statutory block of zero bytes here - fill them in. 262 foreach(i; 0 .. 23) 263 packet ~= 0; 264 265 // Add the user name as a null terminated string 266 foreach(i; 0 .. _user.length) 267 packet ~= _user[i]; 268 packet ~= 0; // \0 269 270 // Add our calculated authentication token as a length prefixed string. 271 assert(token.length <= ubyte.max); 272 if(_pwd.length == 0) // Omit the token if the account has no password 273 packet ~= 0; 274 else 275 { 276 packet ~= cast(ubyte)token.length; 277 foreach(i; 0 .. token.length) 278 packet ~= token[i]; 279 } 280 281 // Add the default database as a null terminated string 282 foreach(i; 0 .. _db.length) 283 packet ~= _db[i]; 284 packet ~= 0; // \0 285 286 // The server sent us a greeting with packet number 0, so we send the auth packet 287 // back with the next number. 288 packet.setPacketHeader(pktNumber); 289 bumpPacket(); 290 return packet; 291 } 292 293 void consumeServerInfo(ref ubyte[] packet) 294 { 295 scope(failure) kill(); 296 297 _sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes) 298 _sCharSet = packet.consume!ubyte(); // server_language 299 _serverStatus = packet.consume!ushort(); //server_status 300 _sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes) 301 _sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec 302 303 enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1"); 304 enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection"); 305 } 306 307 ubyte[] parseGreeting() 308 { 309 scope(failure) kill(); 310 311 ubyte[] packet = getPacket(); 312 313 if (packet.length > 0 && packet[0] == ResultPacketMarker.error) 314 { 315 auto okp = OKErrorPacket(packet); 316 enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message); 317 } 318 319 _protocol = packet.consume!ubyte(); 320 321 _serverVersion = packet.consume!string(packet.countUntil(0)); 322 packet.skip(1); // \0 terminated _serverVersion 323 324 _sThread = packet.consume!uint(); 325 326 // read first part of scramble buf 327 ubyte[] authBuf; 328 authBuf.length = 255; 329 authBuf[0..8] = packet.consume(8)[]; // scramble_buff 330 331 enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0"); 332 333 consumeServerInfo(packet); 334 335 packet.skip(1); // this byte supposed to be scramble length, but is actually zero 336 packet.skip(10); // filler of \0 337 338 // rest of the scramble 339 auto len = packet.countUntil(0); 340 enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes"); 341 enforce(authBuf.length > 8+len); 342 authBuf[8..8+len] = packet.consume(len)[]; 343 authBuf.length = 8+len; // cut to correct size 344 enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf"); 345 346 return authBuf; 347 } 348 349 static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port) 350 { 351 auto s = new PlainPhobosSocket(); 352 s.connect(new InternetAddress(host, port)); 353 return s; 354 } 355 356 static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port) 357 { 358 version(Have_vibe_d_core) 359 return vibe.core.net.connectTCP(host, port); 360 else 361 assert(0); 362 } 363 364 void initConnection() 365 { 366 resetPacket(); 367 final switch(_socketType) 368 { 369 case MySQLSocketType.phobos: 370 _socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port)); 371 break; 372 373 case MySQLSocketType.vibed: 374 version(Have_vibe_d_core) { 375 _socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port)); 376 break; 377 } else assert(0, "Unsupported socket type. Need version Have_vibe_d_core."); 378 } 379 } 380 381 ubyte[] makeToken(ubyte[] authBuf) 382 { 383 auto pass1 = sha1Of(cast(const(ubyte)[])_pwd); 384 auto pass2 = sha1Of(pass1); 385 386 SHA1 sha1; 387 sha1.start(); 388 sha1.put(authBuf); 389 sha1.put(pass2); 390 auto result = sha1.finish(); 391 foreach (size_t i; 0..20) 392 result[i] = result[i] ^ pass1[i]; 393 return result.dup; 394 } 395 396 SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure 397 { 398 SvrCapFlags common; 399 uint filter = 1; 400 foreach (size_t i; 0..uint.sizeof) 401 { 402 bool serverSupport = (server & filter) != 0; // can the server do this capability? 403 bool clientSupport = (client & filter) != 0; // can we support it? 404 if(serverSupport && clientSupport) 405 common |= filter; 406 filter <<= 1; // check next flag 407 } 408 return common; 409 } 410 411 void setClientFlags(SvrCapFlags capFlags) 412 { 413 _cCaps = getCommonCapabilities(_sCaps, capFlags); 414 415 // We cannot operate in <4.1 protocol, so we'll force it even if the user 416 // didn't supply it 417 _cCaps |= SvrCapFlags.PROTOCOL41; 418 _cCaps |= SvrCapFlags.SECURE_CONNECTION; 419 } 420 421 void authenticate(ubyte[] greeting) 422 in 423 { 424 assert(_open == OpenState.connected); 425 } 426 out 427 { 428 assert(_open == OpenState.authenticated); 429 } 430 body 431 { 432 auto token = makeToken(greeting); 433 auto authPacket = buildAuthPacket(token); 434 send(authPacket); 435 436 auto packet = getPacket(); 437 auto okp = OKErrorPacket(packet); 438 enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message); 439 _open = OpenState.authenticated; 440 } 441 442 SvrCapFlags _clientCapabilities; 443 444 void connect(SvrCapFlags clientCapabilities) 445 in 446 { 447 assert(closed); 448 } 449 out 450 { 451 assert(_open == OpenState.authenticated); 452 } 453 body 454 { 455 initConnection(); 456 auto greeting = parseGreeting(); 457 _open = OpenState.connected; 458 459 _clientCapabilities = clientCapabilities; 460 setClientFlags(clientCapabilities); 461 authenticate(greeting); 462 } 463 464 /// Forcefully close the socket without sending the quit command. 465 /// Needed in case an error leaves communatations in an undefined or non-recoverable state. 466 void kill() 467 { 468 if(_socket.connected) 469 _socket.close(); 470 _open = OpenState.notConnected; 471 } 472 473 /// Called whenever mysql-native needs to send a command to the server 474 /// and be sure there aren't any pending results (which would prevent 475 /// a new command from being sent). 476 void autoPurge() 477 { 478 // This is called every time a command is sent, 479 // so detect & prevent infinite recursion. 480 static bool isAutoPurging = false; 481 482 if(isAutoPurging) 483 return; 484 485 isAutoPurging = true; 486 scope(exit) isAutoPurging = false; 487 scope(failure) kill(); 488 489 purgeResult(); 490 statementsToRelease.releaseAll(); 491 } 492 493 /++ 494 Keeps track of prepared statements queued to be released from the server. 495 496 Prepared statements aren't released immediately, because that 497 involves sending a command to the server even though there might be 498 results pending. (Can't send a command while results are pending.) 499 +/ 500 static struct StatementsToRelease(alias doRelease) 501 { 502 private Connection conn; 503 504 /// Ids of prepared statements to be released. 505 /// This uses assumeSafeAppend. Do not save copies of it. 506 uint[] ids; 507 508 void add(uint statementId) 509 { 510 ids ~= statementId; 511 } 512 513 /// Removes a prepared statement from the list of statements 514 /// to be released from the server. 515 /// Does nothing if the statement isn't on the list. 516 void remove(uint statementId) 517 { 518 foreach(ref id; ids) 519 if(id == statementId) 520 id = 0; 521 } 522 523 /// Releases the prepared statements queued for release. 524 void releaseAll() 525 { 526 foreach(id; ids) 527 if(id != 0) 528 doRelease(conn, id); 529 530 ids.length = 0; 531 assumeSafeAppend(ids); 532 } 533 } 534 StatementsToRelease!(PreparedImpl.immediateRelease) statementsToRelease; 535 536 debug(MYSQL_INTEGRATION_TESTS) uint[] fakeRelease_released; 537 unittest 538 { 539 debug(MYSQL_INTEGRATION_TESTS) 540 { 541 static void fakeRelease(Connection conn, uint id) 542 { 543 conn.fakeRelease_released ~= id; 544 } 545 546 mixin(scopedCn); 547 548 StatementsToRelease!fakeRelease list; 549 list.conn = cn; 550 assert(list.ids == []); 551 assert(cn.fakeRelease_released == []); 552 553 list.add(1); 554 assert(list.ids == [1]); 555 assert(cn.fakeRelease_released == []); 556 557 list.add(7); 558 assert(list.ids == [1, 7]); 559 assert(cn.fakeRelease_released == []); 560 561 list.add(9); 562 assert(list.ids == [1, 7, 9]); 563 assert(cn.fakeRelease_released == []); 564 565 list.remove(5); 566 assert(list.ids == [1, 7, 9]); 567 assert(cn.fakeRelease_released == []); 568 569 list.remove(7); 570 assert(list.ids == [1, 0, 9]); 571 assert(cn.fakeRelease_released == []); 572 573 list.releaseAll(); 574 assert(list.ids == []); 575 assert(cn.fakeRelease_released == [1, 9]); 576 } 577 } 578 579 public: 580 581 /++ 582 Construct opened connection. 583 584 Throws `mysql.exceptions.MySQLException` upon failure to connect. 585 586 If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of 587 creating a new Connection directly. That will provide certain benefits, 588 such as reusing old connections and automatic cleanup (no need to close 589 the connection when done). 590 591 ------------------ 592 // Suggested usage: 593 594 { 595 auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"); 596 scope(exit) con.close(); 597 598 // Use the connection 599 ... 600 } 601 ------------------ 602 603 Params: 604 cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld" 605 (TODO: The connection string needs work to allow for semicolons in its parts!) 606 socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos, 607 unless -version=Have_vibe_d_core is used. 608 openSocket = Optional callback which should return a newly-opened Phobos 609 or Vibe.d TCP socket. This allows custom sockets to be used, 610 subclassed from Phobos's or Vibe.d's sockets. 611 host = An IP address in numeric dotted form, or as a host name. 612 user = The user name to authenticate. 613 password = Users password. 614 db = Desired initial database. 615 capFlags = The set of flag bits from the server's capabilities that the client requires 616 +/ 617 //After the connection is created, and the initial invitation is received from the server 618 //client preferences can be set, and authentication can then be attempted. 619 this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 620 { 621 version(Have_vibe_d_core) 622 enum defaultSocketType = MySQLSocketType.vibed; 623 else 624 enum defaultSocketType = MySQLSocketType.phobos; 625 626 this(defaultSocketType, host, user, pwd, db, port, capFlags); 627 } 628 629 ///ditto 630 this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 631 { 632 version(Have_vibe_d_core) {} else 633 enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core"); 634 635 this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD, 636 host, user, pwd, db, port, capFlags); 637 } 638 639 ///ditto 640 this(OpenSocketCallbackPhobos openSocket, 641 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 642 { 643 this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags); 644 } 645 646 version(Have_vibe_d_core) 647 ///ditto 648 this(OpenSocketCallbackVibeD openSocket, 649 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 650 { 651 this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags); 652 } 653 654 ///ditto 655 private this(MySQLSocketType socketType, 656 OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD, 657 string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags) 658 in 659 { 660 final switch(socketType) 661 { 662 case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break; 663 case MySQLSocketType.vibed: assert(openSocketVibeD !is null); break; 664 } 665 } 666 body 667 { 668 enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1"); 669 enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection"); 670 version(Have_vibe_d_core) {} else 671 enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core"); 672 673 statementsToRelease.conn = this; 674 675 _socketType = socketType; 676 _host = host; 677 _user = user; 678 _pwd = pwd; 679 _db = db; 680 _port = port; 681 682 _openSocketPhobos = openSocketPhobos; 683 _openSocketVibeD = openSocketVibeD; 684 685 connect(capFlags); 686 } 687 688 ///ditto 689 //After the connection is created, and the initial invitation is received from the server 690 //client preferences can be set, and authentication can then be attempted. 691 this(string cs, SvrCapFlags capFlags = defaultClientFlags) 692 { 693 string[] a = parseConnectionString(cs); 694 this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 695 } 696 697 ///ditto 698 this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags) 699 { 700 string[] a = parseConnectionString(cs); 701 this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 702 } 703 704 ///ditto 705 this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags) 706 { 707 string[] a = parseConnectionString(cs); 708 this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 709 } 710 711 version(Have_vibe_d_core) 712 ///ditto 713 this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags) 714 { 715 string[] a = parseConnectionString(cs); 716 this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags); 717 } 718 719 /++ 720 Check whether this Connection is still connected to the server, or if 721 the connection has been closed. 722 +/ 723 @property bool closed() 724 { 725 return _open == OpenState.notConnected || !_socket.connected; 726 } 727 728 version(Have_vibe_d_core) 729 { 730 /// Used by Vibe.d's ConnectionPool, ignore this. 731 void acquire() { if( _socket ) _socket.acquire(); } 732 ///ditto 733 void release() { if( _socket ) _socket.release(); } 734 ///ditto 735 bool isOwner() { return _socket ? _socket.isOwner() : false; } 736 ///ditto 737 bool amOwner() { return _socket ? _socket.isOwner() : false; } 738 } 739 else 740 { 741 /// Used by Vibe.d's ConnectionPool, ignore this. 742 void acquire() { /+ Do nothing +/ } 743 ///ditto 744 void release() { /+ Do nothing +/ } 745 ///ditto 746 bool isOwner() { return !!_socket; } 747 ///ditto 748 bool amOwner() { return !!_socket; } 749 } 750 751 /++ 752 Explicitly close the connection. 753 754 This is a two-stage process. First tell the server we are quitting this 755 connection, and then close the socket. 756 757 Idiomatic use as follows is suggested: 758 ------------------ 759 { 760 auto con = new Connection("localhost:user:password:mysqld"); 761 scope(exit) con.close(); 762 // Use the connection 763 ... 764 } 765 ------------------ 766 +/ 767 void close() 768 { 769 if (_open == OpenState.authenticated && _socket.connected) 770 quit(); 771 772 if (_open == OpenState.connected) 773 kill(); 774 resetPacket(); 775 } 776 777 /++ 778 Reconnects to the server using the same connection settings originally 779 used to create the Connection. 780 781 Optionally takes a SvrCapFlags, allowing you to reconnect using a different 782 set of server capability flags (most users will not need to do this). 783 784 If the connection is already open, this will do nothing. However, if you 785 request a different set of SvrCapFlags then was originally used to create 786 the Connection, the connection will be closed and then reconnected. 787 +/ 788 void reconnect() 789 { 790 reconnect(_clientCapabilities); 791 } 792 793 ///ditto 794 void reconnect(SvrCapFlags clientCapabilities) 795 { 796 bool sameCaps = clientCapabilities == _clientCapabilities; 797 if(!closed) 798 { 799 // Same caps as before? 800 if(clientCapabilities == _clientCapabilities) 801 return; // Nothing to do, just keep current connection 802 803 close(); 804 } 805 806 connect(clientCapabilities); 807 } 808 809 private void quit() 810 in 811 { 812 assert(_open == OpenState.authenticated); 813 } 814 body 815 { 816 sendCmd(CommandType.QUIT, []); 817 // No response is sent for a quit packet 818 _open = OpenState.connected; 819 } 820 821 /++ 822 Parses a connection string of the form 823 `"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"` 824 825 Port is optional and defaults to 3306. 826 827 Whitespace surrounding any name or value is automatically stripped. 828 829 Returns a five-element array of strings in this order: 830 $(UL 831 $(LI [0]: host) 832 $(LI [1]: user) 833 $(LI [2]: pwd) 834 $(LI [3]: db) 835 $(LI [4]: port) 836 ) 837 838 (TODO: The connection string needs work to allow for semicolons in its parts!) 839 +/ 840 //TODO: Replace the return value with a proper struct. 841 static string[] parseConnectionString(string cs) 842 { 843 string[] rv; 844 rv.length = 5; 845 rv[4] = "3306"; // Default port 846 string[] a = split(cs, ";"); 847 foreach (s; a) 848 { 849 string[] a2 = split(s, "="); 850 enforceEx!MYX(a2.length == 2, "Bad connection string: " ~ cs); 851 string name = strip(a2[0]); 852 string val = strip(a2[1]); 853 switch (name) 854 { 855 case "host": 856 rv[0] = val; 857 break; 858 case "user": 859 rv[1] = val; 860 break; 861 case "pwd": 862 rv[2] = val; 863 break; 864 case "db": 865 rv[3] = val; 866 break; 867 case "port": 868 rv[4] = val; 869 break; 870 default: 871 throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__); 872 } 873 } 874 return rv; 875 } 876 877 /++ 878 Select a current database. 879 880 Params: dbName = Name of the requested database 881 Throws: MySQLException 882 +/ 883 void selectDB(string dbName) 884 { 885 sendCmd(CommandType.INIT_DB, dbName); 886 getCmdResponse(); 887 _db = dbName; 888 } 889 890 /++ 891 Check the server status 892 893 Returns: An OKErrorPacket from which server status can be determined 894 Throws: MySQLException 895 +/ 896 OKErrorPacket pingServer() 897 { 898 sendCmd(CommandType.PING, []); 899 return getCmdResponse(); 900 } 901 902 /++ 903 Refresh some feature(s) of the server. 904 905 Returns: An OKErrorPacket from which server status can be determined 906 Throws: MySQLException 907 +/ 908 OKErrorPacket refreshServer(RefreshFlags flags) 909 { 910 sendCmd(CommandType.REFRESH, [flags]); 911 return getCmdResponse(); 912 } 913 914 /++ 915 Get the next Row of a pending result set. 916 917 This method can be used after either execSQL() or execPrepared() have returned true 918 to retrieve result set rows sequentially. 919 920 Similar functionality is available via execSQLSequence() and execPreparedSequence() in 921 which case the interface is presented as a forward range of Rows. 922 923 This method allows you to deal with very large result sets either a row at a time, 924 or by feeding the rows into some suitable container such as a linked list. 925 926 Returns: A Row object. 927 +/ 928 Row getNextRow() 929 { 930 scope(failure) kill(); 931 932 if (_headersPending) 933 { 934 _rsh = ResultSetHeaders(this, _fieldCount); 935 _headersPending = false; 936 } 937 ubyte[] packet; 938 Row rr; 939 packet = getPacket(); 940 if (packet.isEOFPacket()) 941 { 942 _rowsPending = _binaryPending = false; 943 return rr; 944 } 945 if (_binaryPending) 946 rr = Row(this, packet, _rsh, true); 947 else 948 rr = Row(this, packet, _rsh, false); 949 //rr._valid = true; 950 return rr; 951 } 952 953 /++ 954 Flush any outstanding result set elements. 955 956 When the server responds to a command that produces a result set, it 957 queues the whole set of corresponding packets over the current connection. 958 Before that Connection can embark on any new command, it must receive 959 all of those packets and junk them. 960 http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/ 961 +/ 962 ulong purgeResult() 963 { 964 scope(failure) kill(); 965 966 _lastCommandID++; 967 968 ulong rows = 0; 969 if (_headersPending) 970 { 971 for (size_t i = 0;; i++) 972 { 973 if (getPacket().isEOFPacket()) 974 { 975 _headersPending = false; 976 break; 977 } 978 enforceEx!MYXProtocol(i < _fieldCount, 979 text("Field header count (", _fieldCount, ") exceeded but no EOF packet found.")); 980 } 981 } 982 if (_rowsPending) 983 { 984 for (;; rows++) 985 { 986 if (getPacket().isEOFPacket()) 987 { 988 _rowsPending = _binaryPending = false; 989 break; 990 } 991 } 992 } 993 resetPacket(); 994 return rows; 995 } 996 997 /++ 998 Get a textual report on the server status. 999 1000 (COM_STATISTICS) 1001 +/ 1002 string serverStats() 1003 { 1004 sendCmd(CommandType.STATISTICS, []); 1005 return cast(string) getPacket(); 1006 } 1007 1008 /++ 1009 Enable multiple statement commands 1010 1011 This can be used later if this feature was not requested in the client capability flags. 1012 1013 Params: on = Boolean value to turn the capability on or off. 1014 +/ 1015 void enableMultiStatements(bool on) 1016 { 1017 scope(failure) kill(); 1018 1019 ubyte[] t; 1020 t.length = 2; 1021 t[0] = on ? 0 : 1; 1022 t[1] = 0; 1023 sendCmd(CommandType.STMT_OPTION, t); 1024 1025 // For some reason this command gets an EOF packet as response 1026 auto packet = getPacket(); 1027 enforceEx!MYXProtocol(packet[0] == 254 && packet.length == 5, "Unexpected response to SET_OPTION command"); 1028 } 1029 1030 /// Return the in-force protocol number 1031 @property ubyte protocol() pure const nothrow { return _protocol; } 1032 /// Server version 1033 @property string serverVersion() pure const nothrow { return _serverVersion; } 1034 /// Server capability flags 1035 @property uint serverCapabilities() pure const nothrow { return _sCaps; } 1036 /// Server status 1037 @property ushort serverStatus() pure const nothrow { return _serverStatus; } 1038 /// Current character set 1039 @property ubyte charSet() pure const nothrow { return _sCharSet; } 1040 /// Current database 1041 @property string currentDB() pure const nothrow { return _db; } 1042 /// Socket type being used 1043 @property MySQLSocketType socketType() pure const nothrow { return _socketType; } 1044 1045 /// After a command that inserted a row into a table with an auto-increment 1046 /// ID column, this method allows you to retrieve the last insert ID. 1047 @property ulong lastInsertID() pure const nothrow { return _insertID; } 1048 1049 /// This gets incremented every time a command is issued or results are purged, 1050 /// so a ResultRange can tell whether it's been invalidated. 1051 @property ulong lastCommandID() pure const nothrow { return _lastCommandID; } 1052 1053 /// Gets whether rows are pending 1054 @property bool rowsPending() pure const nothrow { return _rowsPending; } 1055 1056 /// Gets whether anything (rows, headers or binary) is pending. 1057 /// New commands cannot be sent on a conncection while anything is pending. 1058 @property bool hasPending() pure const nothrow 1059 { 1060 return _rowsPending || _headersPending || _binaryPending; 1061 } 1062 1063 /// Gets the result header's field descriptions. 1064 @property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; } 1065 }