/// Connect to a MySQL/MariaDB server.
module mysql.connection;

import std.algorithm;
import std.conv;
import std.digest.sha;
import std.exception;
import std.socket;
import std.string;

import mysql.commands;
import mysql.exceptions;
import mysql.protocol.constants;
import mysql.protocol.packets;
import mysql.protocol.sockets;
import mysql.result;
debug(MYSQL_INTEGRATION_TESTS)
{
    import mysql.test.common;
}

version(Have_vibe_d_core)
{
    static if(__traits(compiles, (){ import vibe.core.net; } ))
        import vibe.core.net;
    else
        static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
}

/// The default `mysql.protocol.constants.SvrCapFlags` used when creating a connection.
immutable SvrCapFlags defaultClientFlags =
        SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
        SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
        SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
        //SvrCapFlags.MULTI_RESULTS;

/++
A class representing a database connection.

If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
creating a new Connection directly. That will provide certain benefits,
such as reusing old connections and automatic cleanup (no need to close
the connection when done).

------------------
// Suggested usage:

{
    auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
    scope(exit) con.close();

    // Use the connection
    ...
}
------------------
+/
class Connection
{
/+
The Connection is responsible for handshaking with the server to establish
authentication. It then passes client preferences to the server, and
subsequently is the channel for all command packets that are sent, and all
response packets received.

Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
byte as a packet number. Connection deals with the headers and ensures that
packet numbers are sequential.

The initial packet is sent by the server - essentially a 'hello' packet
inviting login. That packet has a sequence number of zero. That sequence
number is then incremented by client and server packets through the handshake
sequence.

After login all further sequences are initialized by the client sending a
command packet with a zero sequence number, to which the server replies with
zero or more packets with sequential sequence numbers.
+/
package:
    enum OpenState
    {
        /// We have not yet connected to the server, or have sent QUIT to the
        /// server and closed the connection
        notConnected,
        /// We have connected to the server and parsed the greeting, but not
        /// yet authenticated
        connected,
        /// We have successfully authenticated against the server, and need to
        /// send QUIT to the server when closing the connection
        authenticated
    }
    OpenState   _open;
    MySQLSocket _socket;

    SvrCapFlags _sCaps, _cCaps;
    uint    _sThread;
    ushort  _serverStatus;
    ubyte   _sCharSet, _protocol;
    string  _serverVersion;

    string _host, _user, _pwd, _db;
    ushort _port;

    MySQLSocketType _socketType;

    OpenSocketCallbackPhobos _openSocketPhobos;
    OpenSocketCallbackVibeD  _openSocketVibeD;

    ulong _insertID;

    // This gets incremented every time a command is issued or results are purged,
    // so a ResultRange can tell whether it's been invalidated.
    ulong _lastCommandID;

    // Whether there are rows, headers or binary data waiting to be retrieved.
    // MySQL protocol doesn't permit performing any other action until all
    // such data is read.
    bool _rowsPending, _headersPending, _binaryPending;

    // Field count of last performed command.
    ushort _fieldCount;

    // ResultSetHeaders of last performed command.
    ResultSetHeaders _rsh;

    // This tiny thing here is pretty critical. Pay great attention to its maintenance, otherwise
    // you'll get the dreaded "packet out of order" message. It, and the socket connection are
    // the reason why most other objects require a connection object for their construction.
    ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
                /// ordering. First packet should have 0
    @property ubyte pktNumber() { return _cpn; }
    void bumpPacket()           { _cpn++; }
    void resetPacket()          { _cpn = 0; }

    // Without Vibe.d support compiled in, a Vibe.d socket type can never be valid.
    version(Have_vibe_d_core) {} else
    pure const nothrow invariant()
    {
        assert(_socketType != MySQLSocketType.vibed);
    }

    /// Throws `MYXDataPending` if rows, headers or binary data are still
    /// waiting to be read from this connection (see `hasPending`).
    void enforceNothingPending()
    {
        enforceEx!MYXDataPending(!hasPending);
    }

    debug(MYSQL_INTEGRATION_TESTS)
    unittest
    {
        import mysql.prepared;
        import mysql.test.common : scopedCn;
        mixin(scopedCn);

        cn.exec("DROP TABLE IF EXISTS `enforceNothingPending`");
        cn.exec("CREATE TABLE `enforceNothingPending` (
            `val` INTEGER
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8");

        immutable insertSQL = "INSERT INTO `enforceNothingPending` VALUES (1), (2)";
        immutable selectSQL = "SELECT * FROM `enforceNothingPending`";
        Prepared preparedInsert;
        Prepared preparedSelect;
        assertNotThrown!MYXDataPending(cn.exec(insertSQL));
        assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
        assertNotThrown!MYXDataPending(preparedInsert = cn.prepare(insertSQL));
        assertNotThrown!MYXDataPending(preparedSelect = cn.prepare(selectSQL));
        assertNotThrown!MYXDataPending(preparedInsert.exec());
        assertNotThrown!MYXDataPending(preparedSelect.querySet());

        auto resultSeq = cn.query(selectSQL);

        assertThrown!MYXDataPending(cn.exec(insertSQL));
        assertThrown!MYXDataPending(cn.querySet(selectSQL));
        assertThrown!MYXDataPending(cn.query(selectSQL));
        assertThrown!MYXDataPending(cn.prepare(selectSQL));
        assertThrown!MYXDataPending(preparedInsert.exec());
        assertThrown!MYXDataPending(preparedSelect.querySet());

        resultSeq.each(); // Consume range

        assertNotThrown!MYXDataPending(cn.exec(insertSQL));
        assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
        assertNotThrown!MYXDataPending(cn.prepare(selectSQL));
        assertNotThrown!MYXDataPending(preparedInsert.exec());
        assertNotThrown!MYXDataPending(preparedSelect.querySet());
    }

    /++
    Read one packet from the socket and return its payload (header removed).

    The 4-byte header is decoded as a 24-bit little-endian payload length
    followed by the sequence number, which must equal `pktNumber`. On success
    the packet number is bumped. Any failure kills the connection.

    Throws: `MYXProtocol` if the server's sequence number is not the expected one.
    +/
    ubyte[] getPacket()
    {
        scope(failure) kill();

        ubyte[4] header;
        _socket.read(header);
        // number of bytes always set as 24-bit
        uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
        enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
        bumpPacket();

        ubyte[] packet = new ubyte[numDataBytes];
        _socket.read(packet);
        assert(packet.length == numDataBytes, "Wrong number of bytes read");
        return packet;
    }

    /// Write a complete, already-framed packet (header plus payload) to the socket.
    void send(const(ubyte)[] packet)
    in
    {
        assert(packet.length > 4); // at least 1 byte more than header
    }
    body
    {
        _socket.write(packet);
    }

    /// Write a packet header (optionally including the command byte) followed
    /// by its data. Empty data is not written at all.
    void send(const(ubyte)[] header, const(ubyte)[] data)
    in
    {
        assert(header.length == 4 || header.length == 5/*command type included*/);
    }
    body
    {
        _socket.write(header);
        if(data.length)
            _socket.write(data);
    }

    /++
    Send a command packet, starting a new packet sequence at number zero.

    If the socket is found to be disconnected, the connection is
    re-established first using the capability flags of the last `connect`.
    The exception is QUIT, which is silently dropped instead, since there is
    no point reconnecting just to quit.

    Throws: `MYX` if result set headers or rows are still pending.
    +/
    void sendCmd(T)(CommandType cmd, const(T)[] data)
    in
    {
        // Internal thread states. Clients shouldn't use this
        assert(cmd != CommandType.SLEEP);
        assert(cmd != CommandType.CONNECT);
        assert(cmd != CommandType.TIME);
        assert(cmd != CommandType.DELAYED_INSERT);
        assert(cmd != CommandType.CONNECT_OUT);

        // Deprecated
        assert(cmd != CommandType.CREATE_DB);
        assert(cmd != CommandType.DROP_DB);
        assert(cmd != CommandType.TABLE_DUMP);

        // cannot send more than uint.max bytes. TODO: better error message if we try?
        assert(data.length <= uint.max);
    }
    out
    {
        // At this point we should have sent a command. The one exception is a
        // QUIT issued on an already-dropped socket, which returns early
        // without sending anything (and so without touching the packet number).
        assert(cmd == CommandType.QUIT || pktNumber == 1);
    }
    body
    {
        enforceEx!MYX(!(_headersPending || _rowsPending),
            "There are result set elements pending - purgeResult() required.");

        scope(failure) kill();

        _lastCommandID++;

        if(!_socket.connected)
        {
            if(cmd == CommandType.QUIT)
                return; // Don't bother reopening connection just to quit

            _open = OpenState.notConnected;
            connect(_clientCapabilities);
        }

        resetPacket();

        ubyte[] header;
        header.length = 4 /*header*/ + 1 /*cmd*/;
        header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
        header[4] = cmd;
        bumpPacket();

        send(header, cast(const(ubyte)[])data);
    }

    /++
    Read the server's reply to a command as an `OKErrorPacket`, pass it
    through `enforcePacketOK`, and record the reported server status.

    Params: asString = Currently unused; kept for interface compatibility.
    +/
    OKErrorPacket getCmdResponse(bool asString = false)
    {
        auto okp = OKErrorPacket(getPacket());
        enforcePacketOK(okp);
        _serverStatus = okp.serverStatus;
        return okp;
    }

    /++
    Assemble the client authentication packet: capability flags, maximum
    packet length, character set, 23 filler bytes, user name, authentication
    token and default database. The header is filled in last and the packet
    number is bumped.

    Params: token = The 20-byte scrambled password produced by `makeToken`.
    +/
    ubyte[] buildAuthPacket(ubyte[] token)
    in
    {
        assert(token.length == 20);
    }
    body
    {
        ubyte[] packet;
        packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
        packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append

        // NOTE: we'll set the header last when we know the size

        // Set the default capabilities required by the client
        _cCaps.packInto(packet[4..8]);

        // Request a conventional maximum packet length.
        1.packInto(packet[8..12]);

        packet ~= 33; // Set UTF-8 as default charSet

        // There's a statutory block of zero bytes here - fill them in.
        foreach(i; 0 .. 23)
            packet ~= 0;

        // Add the user name as a null terminated string
        packet ~= cast(const(ubyte)[])_user;
        packet ~= 0; // \0

        // Add our calculated authentication token as a length prefixed string.
        assert(token.length <= ubyte.max);
        if(_pwd.length == 0)  // Omit the token if the account has no password
            packet ~= 0;
        else
        {
            packet ~= cast(ubyte)token.length;
            packet ~= token;
        }

        // Add the default database as a null terminated string
        packet ~= cast(const(ubyte)[])_db;
        packet ~= 0; // \0

        // The server sent us a greeting with packet number 0, so we send the auth packet
        // back with the next number.
        packet.setPacketHeader(pktNumber);
        bumpPacket();
        return packet;
    }

    /++
    Consume the capability/charset/status section of the server greeting and
    store it in `_sCaps`, `_sCharSet` and `_serverStatus`.

    Throws: `MYX` if the server lacks PROTOCOL41 or SECURE_CONNECTION support.
    +/
    void consumeServerInfo(ref ubyte[] packet)
    {
        scope(failure) kill();

        _sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
        _sCharSet = packet.consume!ubyte(); // server_language
        _serverStatus = packet.consume!ushort(); //server_status
        _sCaps |= cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
        _sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

        enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
        enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
    }

    /++
    Read and parse the server's initial greeting packet.

    Stores the protocol number, server version, thread id and server
    capabilities, and returns the scramble buffer that `makeToken` needs.

    Throws:
        `MYX` if the server answered with an error packet;
        `MYXProtocol` if the greeting is malformed.
    +/
    ubyte[] parseGreeting()
    {
        scope(failure) kill();

        ubyte[] packet = getPacket();

        if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
        {
            auto okp = OKErrorPacket(packet);
            enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
        }

        _protocol = packet.consume!ubyte();

        // countUntil yields -1 when no terminator exists; reject that rather
        // than letting it be converted into an enormous length.
        auto versionLen = packet.countUntil(0);
        enforceEx!MYXProtocol(versionLen >= 0, "Server version string is not \\0 terminated");
        _serverVersion = packet.consume!string(versionLen);
        packet.skip(1); // \0 terminated _serverVersion

        _sThread = packet.consume!uint();

        // read first part of scramble buf
        ubyte[] authBuf;
        authBuf.length = 255;
        authBuf[0..8] = packet.consume(8)[]; // scramble_buff

        enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

        consumeServerInfo(packet);

        packet.skip(1); // this byte supposed to be scramble length, but is actually zero
        packet.skip(10); // filler of \0

        // rest of the scramble
        auto len = packet.countUntil(0);
        enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
        enforce(authBuf.length > 8+len);
        authBuf[8..8+len] = packet.consume(len)[];
        authBuf.length = 8+len; // cut to correct size
        enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");

        return authBuf;
    }

    /// Default socket factory for Phobos sockets: opens a connection to `host`:`port`.
    static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
    {
        auto s = new PlainPhobosSocket();
        s.connect(new InternetAddress(host, port));
        return s;
    }

    /// Default socket factory for Vibe.d sockets. Must not be called unless
    /// compiled with version `Have_vibe_d_core`.
    static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
    {
        version(Have_vibe_d_core)
            return vibe.core.net.connectTCP(host, port);
        else
            assert(0);
    }

    /// Reset the packet number and open a fresh socket of the configured
    /// type using the stored socket-opening callback.
    void initConnection()
    {
        resetPacket();
        final switch(_socketType)
        {
            case MySQLSocketType.phobos:
                _socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
                break;

            case MySQLSocketType.vibed:
                version(Have_vibe_d_core) {
                    _socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
                    break;
                } else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
        }
    }

    /++
    Compute the authentication token:
    `SHA1(password) XOR SHA1(authBuf ~ SHA1(SHA1(password)))`.

    Params: authBuf = The scramble buffer returned by `parseGreeting`.
    Returns: A newly allocated 20-byte token.
    +/
    ubyte[] makeToken(ubyte[] authBuf)
    {
        auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
        auto pass2 = sha1Of(pass1);

        SHA1 sha1;
        sha1.start();
        sha1.put(authBuf);
        sha1.put(pass2);
        auto result = sha1.finish();
        foreach (size_t i; 0..20)
            result[i] ^= pass1[i];
        return result.dup;
    }

    /++
    Return the capabilities advertised by both the server and the client.

    Every bit of the capability mask is considered. (An earlier version
    iterated only `uint.sizeof` == 4 times, so it looked at just the four
    lowest flags and silently dropped all others.)
    +/
    SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
    {
        // A flag is usable only if both sides support it: plain bitwise AND.
        return cast(SvrCapFlags)(server & client);
    }

    /// Set the client capability flags to the subset of `capFlags` that the
    /// server also supports, plus the two flags this client cannot work without.
    void setClientFlags(SvrCapFlags capFlags)
    {
        _cCaps = getCommonCapabilities(_sCaps, capFlags);

        // We cannot operate in <4.1 protocol, so we'll force it even if the user
        // didn't supply it
        _cCaps |= SvrCapFlags.PROTOCOL41;
        _cCaps |= SvrCapFlags.SECURE_CONNECTION;
    }

    /++
    Send the authentication packet and check the server's answer.

    Params: greeting = The scramble buffer returned by `parseGreeting`.
    Throws: `MYX` if the server reports an authentication error.
    +/
    void authenticate(ubyte[] greeting)
    in
    {
        assert(_open == OpenState.connected);
    }
    out
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        auto token = makeToken(greeting);
        auto authPacket = buildAuthPacket(token);
        send(authPacket);

        auto packet = getPacket();
        auto okp = OKErrorPacket(packet);
        enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
        _open = OpenState.authenticated;
    }

    // The capability flags requested by the most recent connect(); reused
    // for automatic reconnection in sendCmd() and by reconnect().
    SvrCapFlags _clientCapabilities;

    /// Open the socket, parse the server greeting, negotiate capability
    /// flags and authenticate. The connection must currently be closed.
    void connect(SvrCapFlags clientCapabilities)
    in
    {
        assert(closed);
    }
    out
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        initConnection();
        auto greeting = parseGreeting();
        _open = OpenState.connected;

        _clientCapabilities = clientCapabilities;
        setClientFlags(clientCapabilities);
        authenticate(greeting);
    }

    // Forcefully close the socket without sending the quit command.
    // Needed in case an error leaves communications in an undefined or non-recoverable state.
    void kill()
    {
        // The socket may not exist yet if opening it failed.
        if(_socket !is null && _socket.connected)
            _socket.close();
        _open = OpenState.notConnected;
    }

public:

    /++
    Construct opened connection.

    Throws `mysql.exceptions.MySQLException` upon failure to connect.

    If you are using Vibe.d, consider using `mysql.pool.MySQLPool` instead of
    creating a new Connection directly. That will provide certain benefits,
    such as reusing old connections and automatic cleanup (no need to close
    the connection when done).

    ------------------
    // Suggested usage:

    {
        auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
        scope(exit) con.close();

        // Use the connection
        ...
    }
    ------------------

    Params:
        cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
            (TODO: The connection string needs work to allow for semicolons in its parts!)
        socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
            unless -version=Have_vibe_d_core is used.
        openSocket = Optional callback which should return a newly-opened Phobos
            or Vibe.d TCP socket. This allows custom sockets to be used,
            subclassed from Phobos's or Vibe.d's sockets.
        host = An IP address in numeric dotted form, or as a host name.
        user = The user name to authenticate.
        pwd = User's password.
        db = Desired initial database.
        port = The server's TCP port. Defaults to 3306.
        capFlags = The set of flag bits from the server's capabilities that the client requires
    +/
    //After the connection is created, and the initial invitation is received from the server
    //client preferences can be set, and authentication can then be attempted.
    this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        version(Have_vibe_d_core)
            enum defaultSocketType = MySQLSocketType.vibed;
        else
            enum defaultSocketType = MySQLSocketType.phobos;

        this(defaultSocketType, host, user, pwd, db, port, capFlags);
    }

    ///ditto
    this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        version(Have_vibe_d_core) {} else
            enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

        this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
            host, user, pwd, db, port, capFlags);
    }

    ///ditto
    this(OpenSocketCallbackPhobos openSocket,
        string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
    }

    version(Have_vibe_d_core)
    ///ditto
    this(OpenSocketCallbackVibeD openSocket,
        string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
    }

    ///ditto
    private this(MySQLSocketType socketType,
        OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
        string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    in
    {
        // The callback matching the chosen socket type must be supplied.
        final switch(socketType)
        {
            case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
            case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
        }
    }
    body
    {
        enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
        enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
        version(Have_vibe_d_core) {} else
            enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

        _socketType = socketType;
        _host = host;
        _user = user;
        _pwd = pwd;
        _db = db;
        _port = port;

        _openSocketPhobos = openSocketPhobos;
        _openSocketVibeD  = openSocketVibeD;

        connect(capFlags);
    }

    ///ditto
    //After the connection is created, and the initial invitation is received from the server
    //client preferences can be set, and authentication can then be attempted.
    this(string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    ///ditto
    this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    ///ditto
    this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    version(Have_vibe_d_core)
    ///ditto
    this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    /++
    Check whether this Connection is still connected to the server, or if
    the connection has been closed.
    +/
    @property bool closed()
    {
        return _open == OpenState.notConnected || !_socket.connected;
    }

    version(Have_vibe_d_core)
    {
        /// Used by Vibe.d's ConnectionPool, ignore this.
        void acquire() { if( _socket ) _socket.acquire(); }
        ///ditto
        void release() { if( _socket ) _socket.release(); }
        ///ditto
        bool isOwner() { return _socket ? _socket.isOwner() : false; }
        ///ditto
        bool amOwner() { return _socket ? _socket.isOwner() : false; }
    }
    else
    {
        /// Used by Vibe.d's ConnectionPool, ignore this.
        void acquire() { /+ Do nothing +/ }
        ///ditto
        void release() { /+ Do nothing +/ }
        ///ditto
        bool isOwner() { return !!_socket; }
        ///ditto
        bool amOwner() { return !!_socket; }
    }

    /++
    Explicitly close the connection.

    This is a two-stage process. First tell the server we are quitting this
    connection, and then close the socket.

    Idiomatic use as follows is suggested:
    ------------------
    {
        auto con = new Connection("host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb");
        scope(exit) con.close();
        // Use the connection
        ...
    }
    ------------------
    +/
    void close()
    {
        // quit() drops the state back to `connected`, so the kill() below
        // then closes the socket.
        if (_open == OpenState.authenticated && _socket.connected)
            quit();

        if (_open == OpenState.connected)
            kill();
        resetPacket();
    }

    /++
    Reconnects to the server using the same connection settings originally
    used to create the Connection.

    Optionally takes a SvrCapFlags, allowing you to reconnect using a different
    set of server capability flags (most users will not need to do this).

    If the connection is already open, this will do nothing. However, if you
    request a different set of SvrCapFlags than was originally used to create
    the Connection, the connection will be closed and then reconnected.
    +/
    void reconnect()
    {
        reconnect(_clientCapabilities);
    }

    ///ditto
    void reconnect(SvrCapFlags clientCapabilities)
    {
        immutable sameCaps = clientCapabilities == _clientCapabilities;
        if(!closed)
        {
            // Same caps as before?
            if(sameCaps)
                return; // Nothing to do, just keep current connection

            close();
        }

        connect(clientCapabilities);
    }

    /// Send QUIT to the server and fall back to the `connected` state so
    /// that `close()` goes on to shut the socket.
    private void quit()
    in
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        sendCmd(CommandType.QUIT, []);
        // No response is sent for a quit packet
        _open = OpenState.connected;
    }

    /++
    Parses a connection string of the form
    `"host=localhost;port=3306;user=joe;pwd=pass123;db=myappsdb"`

    Port is optional and defaults to 3306.

    Whitespace surrounding any name or value is automatically stripped.

    Each `name=value` pair is split at its first `=`, so a value (for
    example a password) may itself contain `=` characters.

    Returns a five-element array of strings in this order:
    $(UL
    $(LI [0]: host)
    $(LI [1]: user)
    $(LI [2]: pwd)
    $(LI [3]: db)
    $(LI [4]: port)
    )

    Throws: `MYX` if a part has no `=` or uses an unknown name.

    (TODO: The connection string needs work to allow for semicolons in its parts!)
    +/
    //TODO: Replace the return value with a proper struct.
    static string[] parseConnectionString(string cs)
    {
        string[] rv;
        rv.length = 5;
        rv[4] = "3306"; // Default port
        foreach (s; split(cs, ";"))
        {
            // kv[0] = name, kv[1] = "=" (empty when absent), kv[2] = value
            auto kv = s.findSplit("=");
            enforceEx!MYX(kv[1].length != 0, "Bad connection string: " ~ cs);
            string name = strip(kv[0]);
            string val = strip(kv[2]);
            switch (name)
            {
                case "host":
                    rv[0] = val;
                    break;
                case "user":
                    rv[1] = val;
                    break;
                case "pwd":
                    rv[2] = val;
                    break;
                case "db":
                    rv[3] = val;
                    break;
                case "port":
                    rv[4] = val;
                    break;
                default:
                    throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
            }
        }
        return rv;
    }

    /++
    Select a current database.

    Params: dbName = Name of the requested database
    Throws: MySQLException
    +/
    void selectDB(string dbName)
    {
        sendCmd(CommandType.INIT_DB, dbName);
        getCmdResponse();
        _db = dbName;
    }

    /++
    Check the server status

    Returns: An OKErrorPacket from which server status can be determined
    Throws: MySQLException
    +/
    OKErrorPacket pingServer()
    {
        sendCmd(CommandType.PING, []);
        return getCmdResponse();
    }

    /++
    Refresh some feature(s) of the server.

    Returns: An OKErrorPacket from which server status can be determined
    Throws: MySQLException
    +/
    OKErrorPacket refreshServer(RefreshFlags flags)
    {
        sendCmd(CommandType.REFRESH, [flags]);
        return getCmdResponse();
    }

    /++
    Get the next Row of a pending result set.

    This method can be used after either execSQL() or execPrepared() have returned true
    to retrieve result set rows sequentially.

    Similar functionality is available via execSQLSequence() and execPreparedSequence() in
    which case the interface is presented as a forward range of Rows.

    This method allows you to deal with very large result sets either a row at a time,
    or by feeding the rows into some suitable container such as a linked list.

    Returns: A Row object. A default-initialized Row is returned once the
    end of the result set (EOF packet) has been reached.
    +/
    Row getNextRow()
    {
        scope(failure) kill();

        if (_headersPending)
        {
            _rsh = ResultSetHeaders(this, _fieldCount);
            _headersPending = false;
        }

        ubyte[] packet = getPacket();
        if (packet.isEOFPacket())
        {
            _rowsPending = _binaryPending = false;
            return Row.init;
        }

        // The last argument selects binary vs. text row decoding.
        return Row(this, packet, _rsh, _binaryPending);
    }

    /++
    Flush any outstanding result set elements.

    When the server responds to a command that produces a result set, it
    queues the whole set of corresponding packets over the current connection.
    Before that Connection can embark on any new command, it must receive
    all of those packets and junk them.
    http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/

    Returns: The number of row packets that were discarded.
    +/
    ulong purgeResult()
    {
        scope(failure) kill();

        _lastCommandID++;

        ulong rows = 0;
        if (_headersPending)
        {
            // Discard field header packets up to and including their EOF.
            for (size_t i = 0;; i++)
            {
                if (getPacket().isEOFPacket())
                {
                    _headersPending = false;
                    break;
                }
                enforceEx!MYXProtocol(i < _fieldCount,
                    text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
            }
        }
        if (_rowsPending)
        {
            // Discard row packets up to and including their EOF.
            for (;; rows++)
            {
                if (getPacket().isEOFPacket())
                {
                    _rowsPending = _binaryPending = false;
                    break;
                }
            }
        }
        resetPacket();
        return rows;
    }

    /++
    Get a textual report on the server status.

    (COM_STATISTICS)
    +/
    string serverStats()
    {
        sendCmd(CommandType.STATISTICS, []);
        return cast(string) getPacket();
    }

    /++
    Enable multiple statement commands

    This can be used later if this feature was not requested in the client capability flags.

    Params: on = Boolean value to turn the capability on or off.
    Throws: `MYXProtocol` if the server's reply is not the expected EOF packet.
    +/
    void enableMultiStatements(bool on)
    {
        scope(failure) kill();

        // 16-bit little-endian option value; the protocol uses 0 for
        // "multi statements on" and 1 for "off".
        ubyte[] t;
        t.length = 2;
        t[0] = on ? 0 : 1;
        t[1] = 0;
        sendCmd(CommandType.STMT_OPTION, t);

        // For some reason this command gets an EOF packet as response.
        // Check the length first so an empty packet cannot be indexed.
        auto packet = getPacket();
        enforceEx!MYXProtocol(packet.length == 5 && packet[0] == 254, "Unexpected response to SET_OPTION command");
    }

    /// Return the in-force protocol number
    @property ubyte protocol() pure const nothrow { return _protocol; }
    /// Server version
    @property string serverVersion() pure const nothrow { return _serverVersion; }
    /// Server capability flags
    @property uint serverCapabilities() pure const nothrow { return _sCaps; }
    /// Server status
    @property ushort serverStatus() pure const nothrow { return _serverStatus; }
    /// Current character set
    @property ubyte charSet() pure const nothrow { return _sCharSet; }
    /// Current database
    @property string currentDB() pure const nothrow { return _db; }
    /// Socket type being used
    @property MySQLSocketType socketType() pure const nothrow { return _socketType; }

    /// After a command that inserted a row into a table with an auto-increment
    /// ID column, this method allows you to retrieve the last insert ID.
    @property ulong lastInsertID() pure const nothrow { return _insertID; }

    /// This gets incremented every time a command is issued or results are purged,
    /// so a ResultRange can tell whether it's been invalidated.
    @property ulong lastCommandID() pure const nothrow { return _lastCommandID; }

    /// Gets whether rows are pending
    @property bool rowsPending() pure const nothrow { return _rowsPending; }

    /// Gets whether anything (rows, headers or binary) is pending.
    /// New commands cannot be sent on a connection while anything is pending.
    @property bool hasPending() pure const nothrow
    {
        return _rowsPending || _headersPending || _binaryPending;
    }

    /// Gets the result header's field descriptions.
    @property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
}