/++
Connect to a MySQL/MariaDB database using vibe.d's
$(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool).

You have to include vibe.d in your project to be able to use this class.
If you don't want to, refer to `mysql.connection.Connection`.

This provides various benefits over creating a new connection manually,
such as automatically reusing old connections, and automatic cleanup (no need to close
the connection when done).
+/
module mysql.pool;

import std.conv;
import std.typecons;
import mysql.connection;
import mysql.prepared;
import mysql.protocol.constants;
debug(MYSQLN_TESTS)
{
	import mysql.test.common;
}

version(Have_vibe_d_core) version = IncludeMySQLPool;
version(MySQLDocs)        version = IncludeMySQLPool;

version(IncludeMySQLPool)
{
	version(Have_vibe_d_core)
		import vibe.core.connectionpool;
	else version(MySQLDocs)
	{
		/++
		Vibe.d's
		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool)
		class.

		Not actually included in module `mysql.pool`. Only listed here for
		documentation purposes. For ConnectionPool and its documentation, see:
		$(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool)
		+/
		class ConnectionPool(T)
		{
			/// See: $(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.this)
			this(Connection delegate() connection_factory, uint max_concurrent = (uint).max)
			{}

			/// See: $(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.lockConnection)
			LockedConnection!T lockConnection() { return LockedConnection!T(); }

			/// See: $(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.maxConcurrency)
			uint maxConcurrency;
		}

		/++
		Vibe.d's
		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/LockedConnection, LockedConnection)
		struct.

		Not actually included in module `mysql.pool`. Only listed here for
		documentation purposes. For LockedConnection and its documentation, see:
		$(LINK http://vibed.org/api/vibe.core.connectionpool/LockedConnection)
		+/
		struct LockedConnection(Connection) { Connection c; alias c this; }
	}

	/++
	A lightweight convenience interface to a MySQL/MariaDB database using vibe.d's
	$(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool).

	You have to include vibe.d in your project to be able to use this class.
	If you don't want to, refer to `mysql.connection.Connection`.

	If, for any reason, this class doesn't suit your needs, it's easy to just
	use vibe.d's $(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool)
	directly. Simply provide it with a delegate that creates a new `mysql.connection.Connection`
	and does any other custom processing if needed.
	+/
	class MySQLPool
	{
		private
		{
			string m_host;
			string m_user;
			string m_password;
			string m_database;
			ushort m_port;
			SvrCapFlags m_capFlags;
			void delegate(Connection) m_onNewConnection;
			ConnectionPool!Connection m_pool;
			PreparedRegistrations!PreparedInfo preparedRegistrations;

			struct PreparedInfo
			{
				bool queuedForRelease = false;
			}
		}

		/++
		Sets up a connection pool with the provided connection settings.

		The optional `onNewConnection` param allows you to set a callback
		which will be run every time a new connection is created.

		Params:
			host = Server host, passed to every `Connection` this pool creates.
			user = User name, passed to every `Connection` this pool creates.
			password = Password, passed to every `Connection` this pool creates.
			database = Database name, passed to every `Connection` this pool creates.
			port = Server port.
			maxConcurrent = Forwarded to the underlying `ConnectionPool` as its
				`max_concurrent` constructor argument.
			capFlags = Capability flags, passed to every `Connection` this pool creates.
			onNewConnection = Callback invoked with each newly created `Connection`
				(may be `null`).
			connStr = Connection string, split via `Connection.parseConnectionString`.
		+/
		this(string host, string user, string password, string database,
			ushort port = 3306, uint maxConcurrent = (uint).max,
			SvrCapFlags capFlags = defaultClientFlags,
			void delegate(Connection) onNewConnection = null)
		{
			m_host = host;
			m_user = user;
			m_password = password;
			m_database = database;
			m_port = port;
			m_capFlags = capFlags;
			m_onNewConnection = onNewConnection;
			// Pass the limit on; previously it was accepted but never used,
			// leaving the pool unlimited regardless of the argument.
			m_pool = new ConnectionPool!Connection(&createConnection, maxConcurrent);
		}

		///ditto
		this(string host, string user, string password, string database,
			ushort port, SvrCapFlags capFlags, void delegate(Connection) onNewConnection = null)
		{
			this(host, user, password, database, port, (uint).max, capFlags, onNewConnection);
		}

		///ditto
		this(string host, string user, string password, string database,
			ushort port, void delegate(Connection) onNewConnection)
		{
			this(host, user, password, database, port, (uint).max, defaultClientFlags, onNewConnection);
		}

		///ditto
		this(string connStr, uint maxConcurrent = (uint).max, SvrCapFlags capFlags = defaultClientFlags,
			void delegate(Connection) onNewConnection = null)
		{
			auto parts = Connection.parseConnectionString(connStr);
			// Forward to the primary constructor *including* maxConcurrent;
			// the overload without it would reset the limit to (uint).max.
			this(parts[0], parts[1], parts[2], parts[3], to!ushort(parts[4]),
				maxConcurrent, capFlags, onNewConnection);
		}

		///ditto
		this(string connStr, SvrCapFlags capFlags, void delegate(Connection) onNewConnection = null)
		{
			this(connStr, (uint).max, capFlags, onNewConnection);
		}

		///ditto
		this(string connStr, void delegate(Connection) onNewConnection)
		{
			this(connStr, (uint).max, defaultClientFlags, onNewConnection);
		}

		/++
		Obtain a connection. If one isn't available, a new one will be created.

		The connection returned is actually a `LockedConnection!Connection`,
		but it uses `alias this`, and so can be used just like a Connection.
		(See vibe.d's
		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/LockedConnection, LockedConnection documentation).)

		No other fiber will be given this `mysql.connection.Connection` as long as your fiber still holds it.

		There is no need to close, release or unlock this connection. It is
		reference-counted and will automatically be returned to the pool once
		your fiber is done with it.

		If you have passed any prepared statements to `autoRegister`
		or `autoRelease`, then those statements will automatically be
		registered/released on the connection. (Currently, this automatic
		register/release may actually occur upon the first command sent via
		the connection.)
		+/
		LockedConnection!Connection lockConnection()
		{
			auto conn = m_pool.lockConnection();
			// A pooled connection may have been closed since its last use.
			if(conn.closed)
				conn.reconnect();

			applyAuto(conn);
			return conn;
		}

		/// Applies any `autoRegister`/`autoRelease` settings to a connection,
		/// if necessary.
		private void applyAuto(T)(T conn)
		{
			foreach(sql, info; preparedRegistrations.directLookup)
			{
				auto registeredOnPool = !info.queuedForRelease;
				auto registeredOnConnection = conn.isRegistered(sql);

				if(registeredOnPool && !registeredOnConnection) // Need to register?
					conn.register(sql);
				else if(!registeredOnPool && registeredOnConnection) // Need to release?
					conn.release(sql);
			}
		}

		/// Creates a new `Connection` from the stored settings and runs the
		/// `onNewConnection` callback on it, if one is set.
		private Connection createConnection()
		{
			auto conn = new Connection(m_host, m_user, m_password, m_database, m_port, m_capFlags);

			if(m_onNewConnection)
				m_onNewConnection(conn);

			return conn;
		}

		/// Get/set a callback delegate to be run every time a new connection
		/// is created.
		@property void onNewConnection(void delegate(Connection) onNewConnection)
		{
			m_onNewConnection = onNewConnection;
		}

		///ditto
		@property void delegate(Connection) onNewConnection()
		{
			return m_onNewConnection;
		}

		@("onNewConnection")
		debug(MYSQLN_TESTS)
		unittest
		{
			auto count = 0;
			void callback(Connection conn)
			{
				count++;
			}

			// Test getting/setting
			auto poolA = new MySQLPool(testConnectionStr, &callback);
			auto poolB = new MySQLPool(testConnectionStr);
			auto poolNoCallback = new MySQLPool(testConnectionStr);

			assert(poolA.onNewConnection == &callback);
			assert(poolB.onNewConnection is null);
			assert(poolNoCallback.onNewConnection is null);

			poolB.onNewConnection = &callback;
			assert(poolB.onNewConnection == &callback);
			assert(count == 0);

			// Ensure callback is called
			{
				auto connA = poolA.lockConnection();
				assert(!connA.closed);
				assert(count == 1);

				auto connB = poolB.lockConnection();
				assert(!connB.closed);
				assert(count == 2);
			}

			// Ensure works with no callback
			{
				auto oldCount = count;
				auto poolC = new MySQLPool(testConnectionStr);
				auto connC = poolC.lockConnection();
				assert(!connC.closed);
				assert(count == oldCount);
			}
		}

		/++
		Forwards to vibe.d's
		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.maxConcurrency, ConnectionPool.maxConcurrency)
		+/
		@property uint maxConcurrency()
		{
			return m_pool.maxConcurrency;
		}

		///ditto
		@property void maxConcurrency(uint maxConcurrent)
		{
			m_pool.maxConcurrency = maxConcurrent;
		}

		@("maxConcurrency")
		debug(MYSQLN_TESTS)
		unittest
		{
			// The limit given at construction must reach the underlying pool.
			auto limited = new MySQLPool(testConnectionStr, 5u);
			assert(limited.maxConcurrency == 5);

			limited.maxConcurrency = 7;
			assert(limited.maxConcurrency == 7);
		}

		/++
		Set a prepared statement to be automatically registered on all
		connections received from this pool.

		This also clears any `autoRelease` which may have been set for this statement.

		Calling this is not strictly necessary, as a prepared statement will
		automatically be registered upon its first use on any `Connection`.
		This is provided for those who prefer eager registration over lazy
		for performance reasons.

		Once this has been called, obtaining a connection via `lockConnection`
		will automatically register the prepared statement on the connection
		if it isn't already registered on the connection. This single
		registration safely persists after the connection is reclaimed by the
		pool and locked again by another Vibe.d task.

		Note, due to the way Vibe.d works, it is not possible to eagerly
		register or release a statement on all connections already sitting
		in the pool. This can only be done when locking a connection.

		You can stop the pool from continuing to auto-register the statement
		by calling either `autoRelease` or `clearAuto`.
		+/
		void autoRegister(Prepared prepared)
		{
			autoRegister(prepared.sql);
		}

		///ditto
		void autoRegister(const(char[]) sql)
		{
			preparedRegistrations.registerIfNeeded(sql, (sql) => PreparedInfo());
		}

		/++
		Set a prepared statement to be automatically released from all
		connections received from this pool.

		This also clears any `autoRegister` which may have been set for this statement.

		Calling this is not strictly necessary. The server considers prepared
		statements to be per-connection, so they'll go away when the connection
		closes anyway. This is provided in case direct control is actually needed.

		Once this has been called, obtaining a connection via `lockConnection`
		will automatically release the prepared statement from the connection
		if it isn't already released from the connection.

		Note, due to the way Vibe.d works, it is not possible to eagerly
		register or release a statement on all connections already sitting
		in the pool. This can only be done when locking a connection.

		You can stop the pool from continuing to auto-release the statement
		by calling either `autoRegister` or `clearAuto`.
		+/
		void autoRelease(Prepared prepared)
		{
			autoRelease(prepared.sql);
		}

		///ditto
		void autoRelease(const(char[]) sql)
		{
			preparedRegistrations.queueForRelease(sql);
		}

		/++
		Is the given statement set to be automatically registered on all
		connections obtained from this connection pool?

		Note: also returns `true` when the statement has no auto-register or
		auto-release setting at all (see `isAutoCleared`).
		+/
		bool isAutoRegistered(Prepared prepared)
		{
			return isAutoRegistered(prepared.sql);
		}
		///ditto
		bool isAutoRegistered(const(char[]) sql)
		{
			return isAutoRegistered(preparedRegistrations[sql]);
		}
		///ditto
		package bool isAutoRegistered(Nullable!PreparedInfo info)
		{
			return info.isNull || !info.queuedForRelease;
		}

		/++
		Is the given statement set to be automatically released on all
		connections obtained from this connection pool?

		Note: also returns `true` when the statement has no auto-register or
		auto-release setting at all (see `isAutoCleared`).
		+/
		bool isAutoReleased(Prepared prepared)
		{
			return isAutoReleased(prepared.sql);
		}
		///ditto
		bool isAutoReleased(const(char[]) sql)
		{
			return isAutoReleased(preparedRegistrations[sql]);
		}
		///ditto
		package bool isAutoReleased(Nullable!PreparedInfo info)
		{
			return info.isNull || info.queuedForRelease;
		}

		/++
		Is the given statement set for NEITHER auto-register
		NOR auto-release on connections obtained from
		this connection pool?

		Because `isAutoRegistered` and `isAutoReleased` both report `true`
		for a statement with no setting, this is equivalent to
		`isAutoRegistered && isAutoReleased`.
		+/
		bool isAutoCleared(Prepared prepared)
		{
			return isAutoCleared(prepared.sql);
		}
		///ditto
		bool isAutoCleared(const(char[]) sql)
		{
			return isAutoCleared(preparedRegistrations[sql]);
		}
		///ditto
		package bool isAutoCleared(Nullable!PreparedInfo info)
		{
			return info.isNull;
		}

		/++
		Removes any `autoRegister` or `autoRelease` which may have been set
		for this prepared statement.

		Does nothing if the statement has not been set for auto-register or auto-release.

		This releases any relevant memory for potential garbage collection.
		+/
		void clearAuto(Prepared prepared)
		{
			return clearAuto(prepared.sql);
		}
		///ditto
		void clearAuto(const(char[]) sql)
		{
			preparedRegistrations.directLookup.remove(sql);
		}

		/++
		Removes ALL prepared statement `autoRegister` and `autoRelease` which have been set.

		This releases all relevant memory for potential garbage collection.
		+/
		void clearAllRegistrations()
		{
			preparedRegistrations.clear();
		}
	}

	@("registration")
	debug(MYSQLN_TESTS)
	unittest
	{
		import mysql.commands;
		auto pool = new MySQLPool(testConnectionStr);

		// Setup
		Connection cn = pool.lockConnection();
		cn.exec("DROP TABLE IF EXISTS `poolRegistration`");
		cn.exec("CREATE TABLE `poolRegistration` (
			`data` LONGBLOB
		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
		immutable sql = "SELECT * from `poolRegistration`";
		//auto cn2 = pool.lockConnection(); // Seems to return the same connection as `cn`
		auto cn2 = pool.createConnection();
		pool.applyAuto(cn2);
		assert(cn !is cn2);

		// Tests:
		// Initial
		assert(pool.isAutoCleared(sql));
		assert(pool.isAutoRegistered(sql));
		assert(pool.isAutoReleased(sql));
		assert(!cn.isRegistered(sql));
		assert(!cn2.isRegistered(sql));

		// Register on connection #1
		auto prepared = cn.prepare(sql);
		{
			assert(pool.isAutoCleared(sql));
			assert(pool.isAutoRegistered(sql));
			assert(pool.isAutoReleased(sql));
			assert(cn.isRegistered(sql));
			assert(!cn2.isRegistered(sql));

			//auto cn3 = pool.lockConnection(); // Seems to return the same connection as `cn`
			auto cn3 = pool.createConnection();
			pool.applyAuto(cn3);
			assert(!cn3.isRegistered(sql));
		}

		// autoRegister
		pool.autoRegister(prepared);
		{
			assert(!pool.isAutoCleared(sql));
			assert(pool.isAutoRegistered(sql));
			assert(!pool.isAutoReleased(sql));
			assert(cn.isRegistered(sql));
			assert(!cn2.isRegistered(sql));

			//auto cn3 = pool.lockConnection(); // Seems to return the *same* connection as `cn`
			auto cn3 = pool.createConnection();
			pool.applyAuto(cn3);
			assert(cn3.isRegistered(sql));
		}

		// autoRelease
		pool.autoRelease(prepared);
		{
			assert(!pool.isAutoCleared(sql));
			assert(!pool.isAutoRegistered(sql));
			assert(pool.isAutoReleased(sql));
			assert(cn.isRegistered(sql));
			assert(!cn2.isRegistered(sql));

			//auto cn3 = pool.lockConnection(); // Seems to return the same connection as `cn`
			auto cn3 = pool.createConnection();
			pool.applyAuto(cn3);
			assert(!cn3.isRegistered(sql));
		}

		// clearAuto
		pool.clearAuto(prepared);
		{
			assert(pool.isAutoCleared(sql));
			assert(pool.isAutoRegistered(sql));
			assert(pool.isAutoReleased(sql));
			assert(cn.isRegistered(sql));
			assert(!cn2.isRegistered(sql));

			//auto cn3 = pool.lockConnection(); // Seems to return the same connection as `cn`
			auto cn3 = pool.createConnection();
			pool.applyAuto(cn3);
			assert(!cn3.isRegistered(sql));
		}
	}

	@("closedConnection") // "cct"
	debug(MYSQLN_TESTS)
	{
		MySQLPool cctPool;
		int cctCount=0;

		void cctStart()
		{
			import std.array;
			import mysql.commands;

			cctPool = new MySQLPool(testConnectionStr);
			cctPool.onNewConnection = (Connection conn) { cctCount++; };
			assert(cctCount == 0);

			auto cn = cctPool.lockConnection();
			assert(!cn.closed);
			cn.close();
			assert(cn.closed);
			assert(cctCount == 1);
		}

		unittest
		{
			cctStart();
			assert(cctCount == 1);

			// The closed connection is reused and reconnected, not re-created,
			// so the new-connection callback must not fire again.
			auto cn = cctPool.lockConnection();
			assert(cctCount == 1);
			assert(!cn.closed);
		}
	}
}