1 /++
2 Connect to a MySQL/MariaDB database using vibe.d's
3 $(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool).
4 
5 You have to include vibe.d in your project to be able to use this class.
6 If you don't want to, refer to `mysql.connection.Connection`.
7 
8 This provides various benefits over creating a new connection manually,
9 such as automatically reusing old connections, and automatic cleanup (no need to close
10 the connection when done).
11 +/
12 module mysql.pool;
13 
14 import std.conv;
15 import std.typecons;
16 import mysql.connection;
17 import mysql.prepared;
18 import mysql.protocol.constants;
19 debug(MYSQLN_TESTS)
20 {
21 	import mysql.test.common;
22 }
23 
24 version(Have_vibe_d_core) version = IncludeMySQLPool;
25 version(MySQLDocs)        version = IncludeMySQLPool;
26 
27 version(IncludeMySQLPool)
28 {
29 	version(Have_vibe_d_core)
30 		import vibe.core.connectionpool;
31 	else version(MySQLDocs)
32 	{
33 		/++
34 		Vibe.d's
35 		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool)
36 		class.
37 
38 		Not actually included in module `mysql.pool`. Only listed here for
39 		documentation purposes. For ConnectionPool and it's documentation, see:
40 		$(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool)
41 		+/
42 		class ConnectionPool(T)
43 		{
44 			/// See: $(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.this)
45 			this(Connection delegate() connection_factory, uint max_concurrent = (uint).max)
46 			{}
47 
48 			/// See: $(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.lockConnection)
49 			LockedConnection!T lockConnection() { return LockedConnection!T(); }
50 
51 			/// See: $(LINK http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.maxConcurrency)
52 			uint maxConcurrency;
53 		}
54 
55 		/++
56 		Vibe.d's
57 		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/LockedConnection, LockedConnection)
58 		struct.
59 
60 		Not actually included in module `mysql.pool`. Only listed here for
61 		documentation purposes. For LockedConnection and it's documentation, see:
62 		$(LINK http://vibed.org/api/vibe.core.connectionpool/LockedConnection)
63 		+/
64 		struct LockedConnection(Connection) { Connection c; alias c this; }
65 	}
66 
67 	/++
68 	A lightweight convenience interface to a MySQL/MariaDB database using vibe.d's
69 	$(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool).
70 
71 	You have to include vibe.d in your project to be able to use this class.
72 	If you don't want to, refer to `mysql.connection.Connection`.
73 
74 	If, for any reason, this class doesn't suit your needs, it's easy to just
75 	use vibe.d's $(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool, ConnectionPool)
76 	directly. Simply provide it with a delegate that creates a new `mysql.connection.Connection`
77 	and does any other custom processing if needed.
78 	+/
79 	class MySQLPool
80 	{
		private
		{
			// Connection settings captured by the constructor and reused by
			// `createConnection` for every connection it creates.
			string m_host;
			string m_user;
			string m_password;
			string m_database;
			ushort m_port;
			SvrCapFlags m_capFlags;
			// Optional callback run by `createConnection` on each newly
			// created connection; may be null.
			void delegate(Connection) m_onNewConnection;
			// The underlying vibe.d pool, constructed with `createConnection`
			// as its connection factory.
			ConnectionPool!Connection m_pool;
			// Pool-wide auto-register/auto-release settings. `applyAuto`
			// iterates its `directLookup`, which maps SQL text to `PreparedInfo`.
			PreparedRegistrations!PreparedInfo preparedRegistrations;

			// Per-statement payload stored in `preparedRegistrations`.
			struct PreparedInfo
			{
				// true: the statement is set for auto-release;
				// false: it is set for auto-register (see `applyAuto`).
				bool queuedForRelease = false;
			}
		}
98 
99 		/// Sets up a connection pool with the provided connection settings.
100 		///
101 		/// The optional `onNewConnection` param allows you to set a callback
102 		/// which will be run every time a new connection is created.
103 		this(string host, string user, string password, string database,
104 			ushort port = 3306, uint maxConcurrent = (uint).max,
105 			SvrCapFlags capFlags = defaultClientFlags,
106 			void delegate(Connection) onNewConnection = null)
107 		{
108 			m_host = host;
109 			m_user = user;
110 			m_password = password;
111 			m_database = database;
112 			m_port = port;
113 			m_capFlags = capFlags;
114 			m_onNewConnection = onNewConnection;
115 			m_pool = new ConnectionPool!Connection(&createConnection);
116 		}
117 
118 		///ditto
119 		this(string host, string user, string password, string database,
120 			ushort port, SvrCapFlags capFlags, void delegate(Connection) onNewConnection = null)
121 		{
122 			this(host, user, password, database, port, (uint).max, capFlags, onNewConnection);
123 		}
124 
125 		///ditto
126 		this(string host, string user, string password, string database,
127 			ushort port, void delegate(Connection) onNewConnection)
128 		{
129 			this(host, user, password, database, port, (uint).max, defaultClientFlags, onNewConnection);
130 		}
131 
132 		///ditto
133 		this(string connStr, uint maxConcurrent = (uint).max, SvrCapFlags capFlags = defaultClientFlags,
134 			void delegate(Connection) onNewConnection = null)
135 		{
136 			auto parts = Connection.parseConnectionString(connStr);
137 			this(parts[0], parts[1], parts[2], parts[3], to!ushort(parts[4]), capFlags, onNewConnection);
138 		}
139 
140 		///ditto
141 		this(string connStr, SvrCapFlags capFlags, void delegate(Connection) onNewConnection = null)
142 		{
143 			this(connStr, (uint).max, capFlags, onNewConnection);
144 		}
145 
146 		///ditto
147 		this(string connStr, void delegate(Connection) onNewConnection)
148 		{
149 			this(connStr, (uint).max, defaultClientFlags, onNewConnection);
150 		}
151 
152 		/++
153 		Obtain a connection. If one isn't available, a new one will be created.
154 
155 		The connection returned is actually a `LockedConnection!Connection`,
156 		but it uses `alias this`, and so can be used just like a Connection.
157 		(See vibe.d's
158 		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/LockedConnection, LockedConnection documentation).)
159 
160 		No other fiber will be given this `mysql.connection.Connection` as long as your fiber still holds it.
161 
162 		There is no need to close, release or unlock this connection. It is
163 		reference-counted and will automatically be returned to the pool once
164 		your fiber is done with it.
165 		
166 		If you have passed any prepared statements to  `autoRegister`
167 		or `autoRelease`, then those statements will automatically be
168 		registered/released on the connection. (Currently, this automatic
169 		register/release may actually occur upon the first command sent via
170 		the connection.)
171 		+/
172 		LockedConnection!Connection lockConnection()
173 		{
174 			auto conn = m_pool.lockConnection();
175 			if(conn.closed)
176 				conn.reconnect();
177 
178 			applyAuto(conn);
179 			return conn;
180 		}
181 
182 		/// Applies any `autoRegister`/`autoRelease` settings to a connection,
183 		/// if necessary.
184 		private void applyAuto(T)(T conn)
185 		{
186 			foreach(sql, info; preparedRegistrations.directLookup)
187 			{
188 				auto registeredOnPool = !info.queuedForRelease;
189 				auto registeredOnConnection = conn.isRegistered(sql);
190 				
191 				if(registeredOnPool && !registeredOnConnection) // Need to register?
192 					conn.register(sql);
193 				else if(!registeredOnPool && registeredOnConnection) // Need to release?
194 					conn.release(sql);
195 			}
196 		}
197 
198 		private Connection createConnection()
199 		{
200 			auto conn = new Connection(m_host, m_user, m_password, m_database, m_port, m_capFlags);
201 
202 			if(m_onNewConnection)
203 				m_onNewConnection(conn);
204 
205 			return conn;
206 		}
207 
208 		/// Get/set a callback delegate to be run every time a new connection
209 		/// is created.
210 		@property void onNewConnection(void delegate(Connection) onNewConnection)
211 		{
212 			m_onNewConnection = onNewConnection;
213 		}
214 
215 		///ditto
216 		@property void delegate(Connection) onNewConnection()
217 		{
218 			return m_onNewConnection;
219 		}
220 
221 		@("onNewConnection")
222 		debug(MYSQLN_TESTS)
223 		unittest
224 		{
225  			auto count = 0;
226 			void callback(Connection conn)
227 			{
228 				count++;
229 			}
230 
231 			// Test getting/setting
232 			auto poolA = new MySQLPool(testConnectionStr, &callback);
233 			auto poolB = new MySQLPool(testConnectionStr);
234 			auto poolNoCallback = new MySQLPool(testConnectionStr);
235 
236 			assert(poolA.onNewConnection == &callback);
237 			assert(poolB.onNewConnection is null);
238 			assert(poolNoCallback.onNewConnection is null);
239 			
240 			poolB.onNewConnection = &callback;
241 			assert(poolB.onNewConnection == &callback);
242 			assert(count == 0);
243 
244 			// Ensure callback is called
245 			{
246 				auto connA = poolA.lockConnection();
247 				assert(!connA.closed);
248 				assert(count == 1);
249 				
250 				auto connB = poolB.lockConnection();
251 				assert(!connB.closed);
252 				assert(count == 2);
253 			}
254 
255 			// Ensure works with no callback
256 			{
257 				auto oldCount = count;
258 				auto poolC = new MySQLPool(testConnectionStr);
259 				auto connC = poolC.lockConnection();
260 				assert(!connC.closed);
261 				assert(count == oldCount);
262 			}
263 		}
264 
265 		/++
266 		Forwards to vibe.d's
267 		$(LINK2 http://vibed.org/api/vibe.core.connectionpool/ConnectionPool.maxConcurrency, ConnectionPool.maxConcurrency)
268 		+/
269 		@property uint maxConcurrency()
270 		{
271 			return m_pool.maxConcurrency;
272 		}
273 
274 		///ditto
275 		@property void maxConcurrency(uint maxConcurrent)
276 		{
277 			m_pool.maxConcurrency = maxConcurrent;
278 		}
279 
280 		/++
281 		Set a prepared statement to be automatically registered on all
282 		connections received from this pool.
283 
284 		This also clears any `autoRelease` which may have been set for this statement.
285 
286 		Calling this is not strictly necessary, as a prepared statement will
287 		automatically be registered upon its first use on any `Connection`.
288 		This is provided for those who prefer eager registration over lazy
289 		for performance reasons.
290 
291 		Once this has been called, obtaining a connection via `lockConnection`
292 		will automatically register the prepared statement on the connection
293 		if it isn't already registered on the connection. This single
294 		registration safely persists after the connection is reclaimed by the
295 		pool and locked again by another Vibe.d task.
296 		
297 		Note, due to the way Vibe.d works, it is not possible to eagerly
298 		register or release a statement on all connections already sitting
299 		in the pool. This can only be done when locking a connection.
300 		
301 		You can stop the pool from continuing to auto-register the statement
302 		by calling either `autoRelease` or `clearAuto`.
303 		+/
304 		void autoRegister(Prepared prepared)
305 		{
306 			autoRegister(prepared.sql);
307 		}
308 
309 		///ditto
310 		void autoRegister(const(char[]) sql)
311 		{
312 			preparedRegistrations.registerIfNeeded(sql, (sql) => PreparedInfo());
313 		}
314 
315 		/++
316 		Set a prepared statement to be automatically released from all
317 		connections received from this pool.
318 
319 		This also clears any `autoRegister` which may have been set for this statement.
320 
321 		Calling this is not strictly necessary. The server considers prepared
322 		statements to be per-connection, so they'll go away when the connection
323 		closes anyway. This is provided in case direct control is actually needed.
324 
325 		Once this has been called, obtaining a connection via `lockConnection`
326 		will automatically release the prepared statement from the connection
327 		if it isn't already releases from the connection.
328 		
329 		Note, due to the way Vibe.d works, it is not possible to eagerly
330 		register or release a statement on all connections already sitting
331 		in the pool. This can only be done when locking a connection.
332 
333 		You can stop the pool from continuing to auto-release the statement
334 		by calling either `autoRegister` or `clearAuto`.
335 		+/
336 		void autoRelease(Prepared prepared)
337 		{
338 			autoRelease(prepared.sql);
339 		}
340 
341 		///ditto
342 		void autoRelease(const(char[]) sql)
343 		{
344 			preparedRegistrations.queueForRelease(sql);
345 		}
346 
347 		/// Is the given statement set to be automatically registered on all
348 		/// connections obtained from this connection pool?
349 		bool isAutoRegistered(Prepared prepared)
350 		{
351 			return isAutoRegistered(prepared.sql);
352 		}
353 		///ditto
354 		bool isAutoRegistered(const(char[]) sql)
355 		{
356 			return isAutoRegistered(preparedRegistrations[sql]);
357 		}
358 		///ditto
359 		package bool isAutoRegistered(Nullable!PreparedInfo info)
360 		{
361 			return info.isNull || !info.queuedForRelease;
362 		}
363 
364 		/// Is the given statement set to be automatically released on all
365 		/// connections obtained from this connection pool?
366 		bool isAutoReleased(Prepared prepared)
367 		{
368 			return isAutoReleased(prepared.sql);
369 		}
370 		///ditto
371 		bool isAutoReleased(const(char[]) sql)
372 		{
373 			return isAutoReleased(preparedRegistrations[sql]);
374 		}
375 		///ditto
376 		package bool isAutoReleased(Nullable!PreparedInfo info)
377 		{
378 			return info.isNull || info.queuedForRelease;
379 		}
380 
381 		/++
382 		Is the given statement set for NEITHER auto-register
383 		NOR auto-release on connections obtained from
384 		this connection pool?
385 
386 		Equivalent to `!isAutoRegistered && !isAutoReleased`.
387 		+/
388 		bool isAutoCleared(Prepared prepared)
389 		{
390 			return isAutoCleared(prepared.sql);
391 		}
392 		///ditto
393 		bool isAutoCleared(const(char[]) sql)
394 		{
395 			return isAutoCleared(preparedRegistrations[sql]);
396 		}
397 		///ditto
398 		package bool isAutoCleared(Nullable!PreparedInfo info)
399 		{
400 			return info.isNull;
401 		}
402 
403 		/++
404 		Removes any `autoRegister` or `autoRelease` which may have been set
405 		for this prepared statement.
406 
407 		Does nothing if the statement has not been set for auto-register or auto-release.
408 
409 		This releases any relevent memory for potential garbage collection.
410 		+/
411 		void clearAuto(Prepared prepared)
412 		{
413 			return clearAuto(prepared.sql);
414 		}
415 		///ditto
416 		void clearAuto(const(char[]) sql)
417 		{
418 			preparedRegistrations.directLookup.remove(sql);
419 		}
420 		
421 		/++
422 		Removes ALL prepared statement `autoRegister` and `autoRelease` which have been set.
423 		
424 		This releases all relevent memory for potential garbage collection.
425 		+/
426 		void clearAllRegistrations()
427 		{
428 			preparedRegistrations.clear();
429 		}
430 	}
431 	 
432 	@("registration")
433 	debug(MYSQLN_TESTS)
434 	unittest
435 	{
436 		import mysql.commands;
437 		auto pool = new MySQLPool(testConnectionStr);
438 
439 		// Setup
440 		Connection cn = pool.lockConnection();
441 		cn.exec("DROP TABLE IF EXISTS `poolRegistration`");
442 		cn.exec("CREATE TABLE `poolRegistration` (
443 			`data` LONGBLOB
444 		) ENGINE=InnoDB DEFAULT CHARSET=utf8");
445 		immutable sql = "SELECT * from `poolRegistration`";
446 		//auto cn2 = pool.lockConnection(); // Seems to return the same connection as `cn`
447 		auto cn2 = pool.createConnection();
448 		pool.applyAuto(cn2);
449 		assert(cn !is cn2);
450 
451 		// Tests:
452 		// Initial
453 		assert(pool.isAutoCleared(sql));
454 		assert(pool.isAutoRegistered(sql));
455 		assert(pool.isAutoReleased(sql));
456 		assert(!cn.isRegistered(sql));
457 		assert(!cn2.isRegistered(sql));
458 
459 		// Register on connection #1
460 		auto prepared = cn.prepare(sql);
461 		{
462 			assert(pool.isAutoCleared(sql));
463 			assert(pool.isAutoRegistered(sql));
464 			assert(pool.isAutoReleased(sql));
465 			assert(cn.isRegistered(sql));
466 			assert(!cn2.isRegistered(sql));
467 
468 			//auto cn3 = pool.lockConnection(); // Seems to return the same connection as `cn`
469 			auto cn3 = pool.createConnection();
470 			pool.applyAuto(cn3);
471 			assert(!cn3.isRegistered(sql));
472 		}
473 
474 		// autoRegister
475 		pool.autoRegister(prepared);
476 		{
477 			assert(!pool.isAutoCleared(sql));
478 			assert(pool.isAutoRegistered(sql));
479 			assert(!pool.isAutoReleased(sql));
480 			assert(cn.isRegistered(sql));
481 			assert(!cn2.isRegistered(sql));
482 
483 			//auto cn3 = pool.lockConnection(); // Seems to return the *same* connection as `cn`
484 			auto cn3 = pool.createConnection();
485 			pool.applyAuto(cn3);
486 			assert(cn3.isRegistered(sql));
487 		}
488 
489 		// autoRelease
490 		pool.autoRelease(prepared);
491 		{
492 			assert(!pool.isAutoCleared(sql));
493 			assert(!pool.isAutoRegistered(sql));
494 			assert(pool.isAutoReleased(sql));
495 			assert(cn.isRegistered(sql));
496 			assert(!cn2.isRegistered(sql));
497 
498 			//auto cn3 = pool.lockConnection(); // Seems to return the same connection as `cn`
499 			auto cn3 = pool.createConnection();
500 			pool.applyAuto(cn3);
501 			assert(!cn3.isRegistered(sql));
502 		}
503 
504 		// clearAuto
505 		pool.clearAuto(prepared);
506 		{
507 			assert(pool.isAutoCleared(sql));
508 			assert(pool.isAutoRegistered(sql));
509 			assert(pool.isAutoReleased(sql));
510 			assert(cn.isRegistered(sql));
511 			assert(!cn2.isRegistered(sql));
512 
513 			//auto cn3 = pool.lockConnection(); // Seems to return the same connection as `cn`
514 			auto cn3 = pool.createConnection();
515 			pool.applyAuto(cn3);
516 			assert(!cn3.isRegistered(sql));
517 		}
518 	}
519 
520 	@("closedConnection") // "cct"
521 	debug(MYSQLN_TESTS)
522 	{
523 		MySQLPool cctPool;
524 		int cctCount=0;
525 		
526 		void cctStart()
527 		{
528 			import std.array;
529 			import mysql.commands;
530 
531 			cctPool = new MySQLPool(testConnectionStr);
532 			cctPool.onNewConnection = (Connection conn) { cctCount++; };
533 			assert(cctCount == 0);
534 
535 			auto cn = cctPool.lockConnection();
536 			assert(!cn.closed);
537 			cn.close();
538 			assert(cn.closed);
539 			assert(cctCount == 1);
540 		}
541 
542 		unittest
543 		{
544 			cctStart();
545 			assert(cctCount == 1);
546 
547 			auto cn = cctPool.lockConnection();
548 			assert(cctCount == 1);
549 			assert(!cn.closed);
550 		}
551 	}
552 }