module mysql.connection;

// Publicly import rest of package for backwards compatibility.
// These public imports will eventually be phased out.
public import mysql.common;
public import mysql.result;
public import mysql.protocol.commands;
public import mysql.protocol.constants;
public import mysql.protocol.extra_types;
public import mysql.protocol.packet_helpers;
public import mysql.protocol.packets;
debug(MYSQL_INTEGRATION_TESTS)
{
    public import mysql.test.common;
    public import mysql.test.integration;
    public import mysql.test.regression;
}

version(Have_vibe_d_core)
{
    static if(__traits(compiles, (){ import vibe.core.net; } ))
        import vibe.core.net;
    else
        static assert(false, "mysql-native can't find Vibe.d's 'vibe.core.net'.");
}

import std.algorithm;
import std.conv;
import std.datetime;
import std.digest.sha;
import std.exception;
import std.range;
import std.socket;
import std.stdio;
import std.string;
import std.traits;
import std.variant;

/// Capability flags requested by default when a Connection is constructed
/// without an explicit `capFlags` argument.
immutable SvrCapFlags defaultClientFlags =
        SvrCapFlags.OLD_LONG_PASSWORD | SvrCapFlags.ALL_COLUMN_FLAGS |
        SvrCapFlags.WITH_DB | SvrCapFlags.PROTOCOL41 |
        SvrCapFlags.SECURE_CONNECTION;// | SvrCapFlags.MULTI_STATEMENTS |
        //SvrCapFlags.MULTI_RESULTS;

/++
A class representing a database connection.

The Connection is responsible for handshaking with the server to establish
authentication. It then passes client preferences to the server, and
subsequently is the channel for all command packets that are sent, and all
response packets received.

Uncompressed packets consist of a 4 byte header - 3 bytes of length, and one
byte as a packet number. Connection deals with the headers and ensures that
packet numbers are sequential.

The initial packet is sent by the server - essentially a 'hello' packet
inviting login. That packet has a sequence number of zero. That sequence
number is then incremented by client and server packets through the handshake
sequence.

After login all further sequences are initialized by the client sending a
command packet with a zero sequence number, to which the server replies with
zero or more packets with sequential sequence numbers.
+/
class Connection
{
package:
    enum OpenState
    {
        /// We have not yet connected to the server, or have sent QUIT to the
        /// server and closed the connection
        notConnected,
        /// We have connected to the server and parsed the greeting, but not
        /// yet authenticated
        connected,
        /// We have successfully authenticated against the server, and need to
        /// send QUIT to the server when closing the connection
        authenticated
    }
    OpenState   _open;
    MySQLSocket _socket;

    SvrCapFlags _sCaps, _cCaps;
    uint    _sThread;
    ushort  _serverStatus;
    ubyte   _sCharSet, _protocol;
    string  _serverVersion;

    string _host, _user, _pwd, _db;
    ushort _port;

    MySQLSocketType _socketType;

    OpenSocketCallbackPhobos _openSocketPhobos;
    OpenSocketCallbackVibeD  _openSocketVibeD;

    ulong _insertID;

    // This gets incremented every time a command is issued or results are purged,
    // so a ResultRange can tell whether it's been invalidated.
    ulong _lastCommandId;

    // Whether there are rows, headers or binary data waiting to be retrieved.
    // MySQL protocol doesn't permit performing any other action until all
    // such data is read.
    bool _rowsPending, _headersPending, _binaryPending;

    // Field count of last performed command.
    ushort _fieldCount;

    // ResultSetHeaders of last performed command.
    ResultSetHeaders _rsh;

    // This tiny thing here is pretty critical. Pay great attention to its maintenance, otherwise
    // you'll get the dreaded "packet out of order" message. It, and the socket connection are
    // the reason why most other objects require a connection object for their construction.
    ubyte _cpn; /// Packet Number in packet header. Serial number to ensure correct
                /// ordering. First packet should have 0
    @property ubyte pktNumber() { return _cpn; }
    void bumpPacket()           { _cpn++; }
    void resetPacket()          { _cpn = 0; }

    // Without Vibe.d support compiled in, a Vibe.d socket type is never valid.
    version(Have_vibe_d_core) {} else
    pure const nothrow invariant()
    {
        assert(_socketType != MySQLSocketType.vibed);
    }

    /// Throws MYXDataPending if any rows, headers or binary data are still
    /// waiting to be read from this connection (see `hasPending`).
    void enforceNothingPending()
    {
        enforceEx!MYXDataPending(!hasPending);
    }

    debug(MYSQL_INTEGRATION_TESTS)
    unittest
    {
        import mysql.protocol.prepared;
        import mysql.test.common : scopedCn;
        mixin(scopedCn);

        cn.exec("DROP TABLE IF EXISTS `enforceNothingPending`");
        cn.exec("CREATE TABLE `enforceNothingPending` (
            `val` INTEGER
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8");

        immutable insertSQL = "INSERT INTO `enforceNothingPending` VALUES (1), (2)";
        immutable selectSQL = "SELECT * FROM `enforceNothingPending`";
        Prepared preparedInsert;
        Prepared preparedSelect;
        assertNotThrown!MYXDataPending(cn.exec(insertSQL));
        assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
        assertNotThrown!MYXDataPending(preparedInsert = cn.prepare(insertSQL));
        assertNotThrown!MYXDataPending(preparedSelect = cn.prepare(selectSQL));
        assertNotThrown!MYXDataPending(preparedInsert.exec());
        assertNotThrown!MYXDataPending(preparedSelect.querySet());

        auto resultSeq = cn.query(selectSQL);

        assertThrown!MYXDataPending(cn.exec(insertSQL));
        assertThrown!MYXDataPending(cn.querySet(selectSQL));
        assertThrown!MYXDataPending(cn.query(selectSQL));
        assertThrown!MYXDataPending(cn.prepare(selectSQL));
        assertThrown!MYXDataPending(preparedInsert.exec());
        assertThrown!MYXDataPending(preparedSelect.querySet());

        resultSeq.each(); // Consume range

        assertNotThrown!MYXDataPending(cn.exec(insertSQL));
        assertNotThrown!MYXDataPending(cn.querySet(selectSQL));
        assertNotThrown!MYXDataPending(cn.prepare(selectSQL));
        assertNotThrown!MYXDataPending(preparedInsert.exec());
        assertNotThrown!MYXDataPending(preparedSelect.querySet());
    }

    /++
    Read one packet from the socket and return its payload (header stripped).

    The 4-byte header is decoded as a 24-bit little-endian payload length
    followed by the sequence number, which must equal the expected packet
    number; the expected number is then advanced.

    Throws: MYXProtocol if the sequence number does not match. On any
            failure the connection is killed.
    +/
    ubyte[] getPacket()
    {
        scope(failure) kill();

        ubyte[4] header;
        _socket.read(header);
        // number of bytes always set as 24-bit
        uint numDataBytes = (header[2] << 16) + (header[1] << 8) + header[0];
        enforceEx!MYXProtocol(header[3] == pktNumber, "Server packet out of order");
        bumpPacket();

        ubyte[] packet = new ubyte[numDataBytes];
        _socket.read(packet);
        assert(packet.length == numDataBytes, "Wrong number of bytes read");
        return packet;
    }

    /// Write a complete, already-framed packet (header + payload) to the socket.
    void send(const(ubyte)[] packet)
    in
    {
        assert(packet.length > 4); // at least 1 byte more than header
    }
    body
    {
        _socket.write(packet);
    }

    /// Write a packet header (optionally including the command byte) followed
    /// by its data. Nothing is written for `data` when it is empty.
    void send(const(ubyte)[] header, const(ubyte)[] data)
    in
    {
        assert(header.length == 4 || header.length == 5/*command type included*/);
    }
    body
    {
        _socket.write(header);
        if(data.length)
            _socket.write(data);
    }

    /++
    Send a command packet, starting a new packet sequence.

    If the socket is no longer connected the connection is re-established
    first, using the capabilities of the previous connect() - except for QUIT,
    which is then simply dropped.

    Params:
        cmd  = The command to send.
        data = The command payload; sent as raw bytes after the command byte.

    Throws: MYX if result headers or rows are still pending. On any later
            failure the connection is killed.
    +/
    void sendCmd(T)(CommandType cmd, const(T)[] data)
    in
    {
        // Internal thread states. Clients shouldn't use this
        assert(cmd != CommandType.SLEEP);
        assert(cmd != CommandType.CONNECT);
        assert(cmd != CommandType.TIME);
        assert(cmd != CommandType.DELAYED_INSERT);
        assert(cmd != CommandType.CONNECT_OUT);

        // Deprecated
        assert(cmd != CommandType.CREATE_DB);
        assert(cmd != CommandType.DROP_DB);
        assert(cmd != CommandType.TABLE_DUMP);

        // cannot send more than uint.max bytes. TODO: better error message if we try?
        assert(data.length <= uint.max);
    }
    out
    {
        // at this point we should have sent a command
        // NOTE(review): the early return for QUIT on a dead socket does not
        // reset the packet number, so this may not hold on that path - confirm.
        assert(pktNumber == 1);
    }
    body
    {
        enforceEx!MYX(!(_headersPending || _rowsPending),
            "There are result set elements pending - purgeResult() required.");

        scope(failure) kill();

        _lastCommandId++;

        if(!_socket.connected)
        {
            if(cmd == CommandType.QUIT)
                return; // Don't bother reopening connection just to quit

            _open = OpenState.notConnected;
            connect(_clientCapabilities);
        }

        resetPacket();

        ubyte[] header;
        header.length = 4 /*header*/ + 1 /*cmd*/;
        header.setPacketHeader(pktNumber, cast(uint)data.length +1/*cmd byte*/);
        header[4] = cmd;
        bumpPacket();

        send(header, cast(const(ubyte)[])data);
    }

    /++
    Read the server's response to a command as an OKErrorPacket, check it
    with enforcePacketOK, and record the reported server status.

    Params: asString = Currently unused.
    +/
    OKErrorPacket getCmdResponse(bool asString = false)
    {
        auto okp = OKErrorPacket(getPacket());
        enforcePacketOK(okp);
        _serverStatus = okp.serverStatus;
        return okp;
    }

    /++
    Build the authentication (login) packet sent in reply to the server greeting.

    Layout after the 4-byte header: client capabilities (4 bytes), maximum
    packet length (4 bytes), character set (1 byte), 23 zero bytes, the
    null-terminated user name, the length-prefixed auth token (a single 0 byte
    if the password is empty), and the null-terminated default database.

    Params: token = The 20-byte token produced by makeToken().
    Returns: The complete packet, header included. The packet number is advanced.
    +/
    ubyte[] buildAuthPacket(ubyte[] token)
    in
    {
        assert(token.length == 20);
    }
    body
    {
        ubyte[] packet;
        packet.reserve(4/*header*/ + 4 + 4 + 1 + 23 + _user.length+1 + token.length+1 + _db.length+1);
        packet.length = 4 + 4 + 4; // create room for the beginning headers that we set rather than append

        // NOTE: we'll set the header last when we know the size

        // Set the default capabilities required by the client
        _cCaps.packInto(packet[4..8]);

        // Request a conventional maximum packet length.
        1.packInto(packet[8..12]);

        packet ~= 33; // Set UTF-8 as default charSet

        // There's a statutory block of zero bytes here - fill them in.
        foreach(i; 0 .. 23)
            packet ~= 0;

        // Add the user name as a null terminated string
        foreach(i; 0 .. _user.length)
            packet ~= _user[i];
        packet ~= 0; // \0

        // Add our calculated authentication token as a length prefixed string.
        assert(token.length <= ubyte.max);
        if(_pwd.length == 0)  // Omit the token if the account has no password
            packet ~= 0;
        else
        {
            packet ~= cast(ubyte)token.length;
            foreach(i; 0 .. token.length)
                packet ~= token[i];
        }

        // Add the default database as a null terminated string
        foreach(i; 0 .. _db.length)
            packet ~= _db[i];
        packet ~= 0; // \0

        // The server sent us a greeting with packet number 0, so we send the auth packet
        // back with the next number.
        packet.setPacketHeader(pktNumber);
        bumpPacket();
        return packet;
    }

    /++
    Consume the server capability / charset / status section of the greeting
    packet and store it in this connection.

    Throws: MYX if the server lacks PROTOCOL41 or SECURE_CONNECTION. On any
            failure the connection is killed.
    +/
    void consumeServerInfo(ref ubyte[] packet)
    {
        scope(failure) kill();

        _sCaps = cast(SvrCapFlags)packet.consume!ushort(); // server_capabilities (lower bytes)
        _sCharSet = packet.consume!ubyte(); // server_language
        _serverStatus = packet.consume!ushort(); //server_status
        _sCaps += cast(SvrCapFlags)(packet.consume!ushort() << 16); // server_capabilities (upper bytes)
        _sCaps |= SvrCapFlags.OLD_LONG_PASSWORD; // Assumed to be set since v4.1.1, according to spec

        enforceEx!MYX(_sCaps & SvrCapFlags.PROTOCOL41, "Server doesn't support protocol v4.1");
        enforceEx!MYX(_sCaps & SvrCapFlags.SECURE_CONNECTION, "Server doesn't support protocol v4.1 connection");
    }

    /++
    Read and parse the server's initial greeting packet.

    Stores the protocol number, server version, server thread id and server
    info on this connection.

    Returns: The scramble buffer (both parts joined) to be fed to makeToken().
    Throws: MYX if the server answered with an error packet, MYXProtocol on
            a malformed greeting. On any failure the connection is killed.
    +/
    ubyte[] parseGreeting()
    {
        scope(failure) kill();

        ubyte[] packet = getPacket();

        if (packet.length > 0 && packet[0] == ResultPacketMarker.error)
        {
            auto okp = OKErrorPacket(packet);
            enforceEx!MYX(!okp.error, "Connection failure: " ~ cast(string) okp.message);
        }

        _protocol = packet.consume!ubyte();

        _serverVersion = packet.consume!string(packet.countUntil(0));
        packet.skip(1); // \0 terminated _serverVersion

        _sThread = packet.consume!uint();

        // read first part of scramble buf
        ubyte[] authBuf;
        authBuf.length = 255;
        authBuf[0..8] = packet.consume(8)[]; // scramble_buff

        enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "filler should always be 0");

        consumeServerInfo(packet);

        packet.skip(1); // this byte supposed to be scramble length, but is actually zero
        packet.skip(10); // filler of \0

        // rest of the scramble
        auto len = packet.countUntil(0);
        enforceEx!MYXProtocol(len >= 12, "second part of scramble buffer should be at least 12 bytes");
        enforce(authBuf.length > 8+len);
        authBuf[8..8+len] = packet.consume(len)[];
        authBuf.length = 8+len; // cut to correct size
        enforceEx!MYXProtocol(packet.consume!ubyte() == 0, "Excepted \\0 terminating scramble buf");

        return authBuf;
    }

    /// Default socket factory for Phobos sockets: opens a new socket connected
    /// to `host`:`port`.
    static PlainPhobosSocket defaultOpenSocketPhobos(string host, ushort port)
    {
        auto s = new PlainPhobosSocket();
        s.connect(new InternetAddress(host, port));
        return s;
    }

    /// Default socket factory for Vibe.d sockets. Must not be called unless
    /// compiled with -version=Have_vibe_d_core.
    static PlainVibeDSocket defaultOpenSocketVibeD(string host, ushort port)
    {
        version(Have_vibe_d_core)
            return vibe.core.net.connectTCP(host, port);
        else
            assert(0);
    }

    /// Reset the packet number and open a fresh socket of the configured type
    /// via the corresponding open-socket callback.
    void initConnection()
    {
        resetPacket();
        final switch(_socketType)
        {
            case MySQLSocketType.phobos:
                _socket = new MySQLSocketPhobos(_openSocketPhobos(_host, _port));
                break;

            case MySQLSocketType.vibed:
                version(Have_vibe_d_core) {
                    _socket = new MySQLSocketVibeD(_openSocketVibeD(_host, _port));
                    break;
                } else assert(0, "Unsupported socket type. Need version Have_vibe_d_core.");
        }
    }

    /++
    Compute the authentication token from the password and the server's
    scramble buffer:

        SHA1(authBuf ~ SHA1(SHA1(pwd))) XOR SHA1(pwd)

    Returns: A freshly allocated 20-byte token.
    +/
    ubyte[] makeToken(ubyte[] authBuf)
    {
        auto pass1 = sha1Of(cast(const(ubyte)[])_pwd);
        auto pass2 = sha1Of(pass1);

        SHA1 sha1;
        sha1.start();
        sha1.put(authBuf);
        sha1.put(pass2);
        auto result = sha1.finish();
        foreach (size_t i; 0..20)
            result[i] = result[i] ^ pass1[i];
        return result.dup;
    }

    /++
    Return the capabilities supported by both the server and the client.

    Every bit of the flag set is considered, so any capability requested by
    the client and offered by the server is kept.
    +/
    SvrCapFlags getCommonCapabilities(SvrCapFlags server, SvrCapFlags client) pure
    {
        // A flag is common exactly when it is set on both sides.
        return cast(SvrCapFlags)(server & client);
    }

    /// Set the client capabilities to the requested flags that the server
    /// also supports, always including PROTOCOL41 and SECURE_CONNECTION.
    void setClientFlags(SvrCapFlags capFlags)
    {
        _cCaps = getCommonCapabilities(_sCaps, capFlags);

        // We cannot operate in <4.1 protocol, so we'll force it even if the user
        // didn't supply it
        _cCaps |= SvrCapFlags.PROTOCOL41;
        _cCaps |= SvrCapFlags.SECURE_CONNECTION;
    }

    /++
    Authenticate against the server using the scramble buffer from its greeting.

    Params: greeting = The scramble buffer returned by parseGreeting().
    Throws: MYX if the server rejects the login.
    +/
    void authenticate(ubyte[] greeting)
    in
    {
        assert(_open == OpenState.connected);
    }
    out
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        auto token = makeToken(greeting);
        auto authPacket = buildAuthPacket(token);
        send(authPacket);

        auto packet = getPacket();
        auto okp = OKErrorPacket(packet);
        enforceEx!MYX(!okp.error, "Authentication failure: " ~ cast(string) okp.message);
        _open = OpenState.authenticated;
    }

    // Capabilities requested by the most recent connect(); reused when reconnecting.
    SvrCapFlags _clientCapabilities;

    /// Open the socket, parse the server greeting, negotiate capabilities and
    /// authenticate. The connection must currently be closed.
    void connect(SvrCapFlags clientCapabilities)
    in
    {
        assert(closed);
    }
    out
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        initConnection();
        auto greeting = parseGreeting();
        _open = OpenState.connected;

        _clientCapabilities = clientCapabilities;
        setClientFlags(clientCapabilities);
        authenticate(greeting);
    }

    // Forcefully close the socket without sending the quit command.
    // Needed in case an error leaves communications in an undefined or non-recoverable state.
    void kill()
    {
        if(_socket.connected)
            _socket.close();
        _open = OpenState.notConnected;
    }

public:

    /++
    Construct opened connection.

    After the connection is created, and the initial invitation is received from the server
    client preferences can be set, and authentication can then be attempted.

    Params:
        socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos,
                     unless -version=Have_vibe_d_core is used.
        openSocket = Optional callback which should return a newly-opened Phobos
                     or Vibe.d TCP socket. This allows custom sockets to be used,
                     subclassed from Phobos's or Vibe.d's sockets.
        host = An IP address in numeric dotted form, or as a host name.
        user = The user name to authenticate.
        pwd = User's password.
        db = Desired initial database.
        port = Server TCP port. Defaults to 3306.
        capFlags = The set of flag bits from the server's capabilities that the client requires
    +/
    this(string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        version(Have_vibe_d_core)
            enum defaultSocketType = MySQLSocketType.vibed;
        else
            enum defaultSocketType = MySQLSocketType.phobos;

        this(defaultSocketType, host, user, pwd, db, port, capFlags);
    }

    ///ditto
    this(MySQLSocketType socketType, string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        version(Have_vibe_d_core) {} else
            enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

        this(socketType, &defaultOpenSocketPhobos, &defaultOpenSocketVibeD,
            host, user, pwd, db, port, capFlags);
    }

    ///ditto
    this(OpenSocketCallbackPhobos openSocket,
        string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        this(MySQLSocketType.phobos, openSocket, null, host, user, pwd, db, port, capFlags);
    }

    version(Have_vibe_d_core)
    ///ditto
    this(OpenSocketCallbackVibeD openSocket,
        string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    {
        this(MySQLSocketType.vibed, null, openSocket, host, user, pwd, db, port, capFlags);
    }

    // Common constructor: validates the flags, stores the settings and connects.
    private this(MySQLSocketType socketType,
        OpenSocketCallbackPhobos openSocketPhobos, OpenSocketCallbackVibeD openSocketVibeD,
        string host, string user, string pwd, string db, ushort port = 3306, SvrCapFlags capFlags = defaultClientFlags)
    in
    {
        final switch(socketType)
        {
            case MySQLSocketType.phobos: assert(openSocketPhobos !is null); break;
            case MySQLSocketType.vibed:  assert(openSocketVibeD  !is null); break;
        }
    }
    body
    {
        enforceEx!MYX(capFlags & SvrCapFlags.PROTOCOL41, "This client only supports protocol v4.1");
        enforceEx!MYX(capFlags & SvrCapFlags.SECURE_CONNECTION, "This client only supports protocol v4.1 connection");
        version(Have_vibe_d_core) {} else
            enforceEx!MYX(socketType != MySQLSocketType.vibed, "Cannot use Vibe.d sockets without -version=Have_vibe_d_core");

        _socketType = socketType;
        _host = host;
        _user = user;
        _pwd = pwd;
        _db = db;
        _port = port;

        _openSocketPhobos = openSocketPhobos;
        _openSocketVibeD  = openSocketVibeD;

        connect(capFlags);
    }

    /++
    Construct opened connection.

    After the connection is created, and the initial invitation is received from
    the server client preferences are set, and authentication can then be attempted.

    TBD The connection string needs work to allow for semicolons in its parts!

    Params:
        socketType = Whether to use a Phobos or Vibe.d socket. Default is Phobos
                     unless -version=Have_vibe_d_core is used.
        openSocket = Optional callback which should return a newly-opened Phobos
                     or Vibe.d TCP socket. This allows custom sockets to be used,
                     subclassed from Phobos's or Vibe.d's sockets.
        cs = A connection string of the form "host=localhost;user=user;pwd=password;db=mysqld"
        capFlags = The set of flag bits from the server's capabilities that the client requires
    +/
    this(string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    ///ditto
    this(MySQLSocketType socketType, string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(socketType, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    ///ditto
    this(OpenSocketCallbackPhobos openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    version(Have_vibe_d_core)
    ///ditto
    this(OpenSocketCallbackVibeD openSocket, string cs, SvrCapFlags capFlags = defaultClientFlags)
    {
        string[] a = parseConnectionString(cs);
        this(openSocket, a[0], a[1], a[2], a[3], to!ushort(a[4]), capFlags);
    }

    /// Whether the connection is closed: either never opened / already shut
    /// down, or the underlying socket is no longer connected.
    @property bool closed()
    {
        return _open == OpenState.notConnected || !_socket.connected;
    }

    // Socket ownership helpers. With Vibe.d they forward to the socket;
    // otherwise acquire/release do nothing and ownership just reflects
    // whether a socket exists.
    version(Have_vibe_d_core)
    {
        void acquire() { if( _socket ) _socket.acquire(); }
        void release() { if( _socket ) _socket.release(); }
        bool isOwner() { return _socket ? _socket.isOwner() : false; }
        bool amOwner() { return _socket ? _socket.isOwner() : false; }
    }
    else
    {
        void acquire() { /+ Do nothing +/ }
        void release() { /+ Do nothing +/ }
        bool isOwner() { return !!_socket; }
        bool amOwner() { return !!_socket; }
    }

    /++
    Explicitly close the connection.

    This is a two-stage process. First tell the server we are quitting this
    connection, and then close the socket.

    Idiomatic use as follows is suggested:
    ------------------
    {
        auto con = new Connection("host=localhost;user=user;pwd=password;db=mysqld");
        scope(exit) con.close();
        // Use the connection
        ...
    }
    ------------------
    +/
    void close()
    {
        // Only say goodbye if the server can still hear us.
        if (_open == OpenState.authenticated && _socket.connected)
            quit();

        // Always end up in the notConnected state, even when the socket had
        // already dropped while we were still marked as authenticated.
        if (_open != OpenState.notConnected)
            kill();
        resetPacket();
    }

    /// Reconnect using the capabilities of the previous connection.
    /// Does nothing if the connection is still open.
    void reconnect()
    {
        reconnect(_clientCapabilities);
    }

    /++
    Reconnect with the given client capabilities.

    If the connection is still open and the capabilities are unchanged, the
    existing connection is kept. Otherwise any open connection is closed and
    a new one is established.
    +/
    void reconnect(SvrCapFlags clientCapabilities)
    {
        bool sameCaps = clientCapabilities == _clientCapabilities;
        if(!closed)
        {
            // Same caps as before?
            if(sameCaps)
                return; // Nothing to do, just keep current connection

            close();
        }

        connect(clientCapabilities);
    }

    // Send QUIT to the server. The socket itself is left open; see close().
    private void quit()
    in
    {
        assert(_open == OpenState.authenticated);
    }
    body
    {
        sendCmd(CommandType.QUIT, []);
        // No response is sent for a quit packet
        _open = OpenState.connected;
    }

    /++
    Parse a connection string of the form
    "host=localhost;user=user;pwd=password;db=mysqld;port=3306".

    Segments are separated by ';' and each is split on its first '=', so a
    value may itself contain '='. Names and values are stripped of
    surrounding whitespace. Empty segments (such as the one produced by a
    trailing ';') are ignored.

    Returns: A 5-element array: [host, user, pwd, db, port]. The port
             defaults to "3306"; other missing entries are left empty.
    Throws: MYX on a segment without '=' or with an unknown name.
    +/
    static string[] parseConnectionString(string cs)
    {
        string[] rv;
        rv.length = 5;
        rv[4] = "3306"; // Default port
        foreach (s; split(cs, ";"))
        {
            // Tolerate empty segments, e.g. from a trailing ';'
            if (strip(s).length == 0)
                continue;

            // Split on the first '=' only, so values may contain '='
            auto eq = std.string.indexOf(s, '=');
            enforceEx!MYX(eq >= 0, "Bad connection string: " ~ cs);
            string name = strip(s[0 .. eq]);
            string val = strip(s[eq + 1 .. $]);
            switch (name)
            {
                case "host":
                    rv[0] = val;
                    break;
                case "user":
                    rv[1] = val;
                    break;
                case "pwd":
                    rv[2] = val;
                    break;
                case "db":
                    rv[3] = val;
                    break;
                case "port":
                    rv[4] = val;
                    break;
                default:
                    throw new MYX("Bad connection string: " ~ cs, __FILE__, __LINE__);
            }
        }
        return rv;
    }

    /++
    Select a current database.

    Params: dbName = Name of the requested database
    Throws: MySQLException
    +/
    void selectDB(string dbName)
    {
        sendCmd(CommandType.INIT_DB, dbName);
        getCmdResponse();
        _db = dbName;
    }

    /++
    Check the server status

    Returns: An OKErrorPacket from which server status can be determined
    Throws: MySQLException
    +/
    OKErrorPacket pingServer()
    {
        sendCmd(CommandType.PING, []);
        return getCmdResponse();
    }

    /++
    Refresh some feature(s) of the server.

    Returns: An OKErrorPacket from which server status can be determined
    Throws: MySQLException
    +/
    OKErrorPacket refreshServer(RefreshFlags flags)
    {
        sendCmd(CommandType.REFRESH, [flags]);
        return getCmdResponse();
    }

    /++
    Get the next Row of a pending result set.

    This method can be used after either execSQL() or execPrepared() have returned true
    to retrieve result set rows sequentially.

    Similar functionality is available via execSQLSequence() and execPreparedSequence() in
    which case the interface is presented as a forward range of Rows.

    This method allows you to deal with very large result sets either a row at a time,
    or by feeding the rows into some suitable container such as a linked list.

    Returns: A Row object. When the end of the result set is reached, a
             default-initialized Row is returned and the pending flags are cleared.
    +/
    Row getNextRow()
    {
        scope(failure) kill();

        if (_headersPending)
        {
            _rsh = ResultSetHeaders(this, _fieldCount);
            _headersPending = false;
        }
        ubyte[] packet;
        Row rr;
        packet = getPacket();
        if (packet.isEOFPacket())
        {
            _rowsPending = _binaryPending = false;
            return rr;
        }
        // The last argument tells Row whether the packet is in binary format.
        rr = Row(this, packet, _rsh, _binaryPending);
        //rr._valid = true;
        return rr;
    }

    /++
    Flush any outstanding result set elements.

    When the server responds to a command that produces a result set, it
    queues the whole set of corresponding packets over the current connection.
    Before that Connection can embark on any new command, it must receive
    all of those packets and junk them.
    http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/

    Returns: The number of row packets that were discarded.
    +/
    ulong purgeResult()
    {
        scope(failure) kill();

        _lastCommandId++;

        ulong rows = 0;
        if (_headersPending)
        {
            for (size_t i = 0;; i++)
            {
                if (getPacket().isEOFPacket())
                {
                    _headersPending = false;
                    break;
                }
                enforceEx!MYXProtocol(i < _fieldCount,
                    text("Field header count (", _fieldCount, ") exceeded but no EOF packet found."));
            }
        }
        if (_rowsPending)
        {
            for (;;  rows++)
            {
                if (getPacket().isEOFPacket())
                {
                    _rowsPending = _binaryPending = false;
                    break;
                }
            }
        }
        resetPacket();
        return rows;
    }

    /++
    Get a textual report on the server status.

    (COM_STATISTICS)
    +/
    string serverStats()
    {
        sendCmd(CommandType.STATISTICS, []);
        return cast(string) getPacket();
    }

    /++
    Enable multiple statement commands

    This can be used later if this feature was not requested in the client capability flags.

    Params: on = Boolean value to turn the capability on or off.
    Throws: MYXProtocol if the server's reply is not the expected 5-byte EOF packet.
    +/
    void enableMultiStatements(bool on)
    {
        scope(failure) kill();

        ubyte[] t;
        t.length = 2;
        t[0] = on ? 0 : 1;
        t[1] = 0;
        sendCmd(CommandType.STMT_OPTION, t);

        // For some reason this command gets an EOF packet as response
        auto packet = getPacket();
        // Check the length first so an empty packet is reported as a protocol
        // error rather than an out-of-range index.
        enforceEx!MYXProtocol(packet.length == 5 && packet[0] == 254, "Unexpected response to SET_OPTION command");
    }

    /// Return the in-force protocol number
    @property ubyte protocol() pure const nothrow { return _protocol; }
    /// Server version
    @property string serverVersion() pure const nothrow { return _serverVersion; }
    /// Server capability flags
    @property uint serverCapabilities() pure const nothrow { return _sCaps; }
    /// Server status
    @property ushort serverStatus() pure const nothrow { return _serverStatus; }
    /// Current character set
    @property ubyte charSet() pure const nothrow { return _sCharSet; }
    /// Current database
    @property string currentDB() pure const nothrow { return _db; }
    /// Socket type being used
    @property MySQLSocketType socketType() pure const nothrow { return _socketType; }

    /// After a command that inserted a row into a table with an auto-increment
    /// ID column, this method allows you to retrieve the last insert ID.
    @property ulong lastInsertID() pure const nothrow { return _insertID; }

    /// This gets incremented every time a command is issued or results are purged,
    /// so a ResultRange can tell whether it's been invalidated.
    @property ulong lastCommandId() pure const nothrow { return _lastCommandId; }

    /// Gets whether rows are pending
    @property bool rowsPending() pure const nothrow { return _rowsPending; }

    /// Gets whether anything (rows, headers or binary) is pending.
    /// New commands cannot be sent on a connection while anything is pending.
    @property bool hasPending() pure const nothrow
    {
        return _rowsPending || _headersPending || _binaryPending;
    }

    /// Gets the result header's field descriptions.
    @property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
}