1 module mysql.protocol.commands;
2 
3 import std.algorithm;
4 import std.conv;
5 import std.datetime;
6 import std.digest.sha;
7 import std.exception;
8 import std.range;
9 import std.socket;
10 import std.stdio;
11 import std.string;
12 import std.traits;
13 import std.variant;
14 
15 import mysql.common;
16 import mysql.connection;
17 import mysql.protocol.constants;
18 import mysql.protocol.extra_types;
19 import mysql.protocol.packets;
20 
21 /**
22  * Encapsulation of an SQL command or query.
23  *
 * A Command may be either a one-off SQL query, or may use a prepared statement.
25  * Commands that are expected to return a result set - queries - have distinctive methods
26  * that are enforced. That is it will be an error to call such a method with an SQL command
27  * that does not produce a result set.
28  */
29 struct Command
30 {
package:
    /// Connection used to send every command of this object and to read the replies.
    Connection _con;
    /// SQL text that is sent by the query and prepare methods.
    const(char)[] _sql;
    /// Prepared statement handle reported by the server; zero while nothing is prepared.
    uint _hStmt;
    /// Insert ID copied from the OK packet of the last executed command.
    ulong _insertID;
    /// True while result rows are still waiting to be read from the connection.
    bool _rowsPending;
    /// True while result set field headers are still waiting to be read.
    bool _headersPending;
    /// Cleared together with _rowsPending.
    /// NOTE(review): presumably marks the pending rows as binary-protocol rows - confirm.
    bool _pendingBinary;
    /// Not referenced by the code visible in this part of the file.
    bool _rebound;
    /// Number of parameters the prepared statement expects.
    ushort _psParams;
    /// Warning count reported by the server when the statement was prepared.
    ushort _psWarnings;
    /// Number of columns in the current (or prepared) result set; zero if none.
    ushort _fieldCount;
    /// Field headers of the current result set.
    ResultSetHeaders _rsh;
    /// Parameter and column headers read when the statement was prepared.
    PreparedStmtHeaders _psh;
    /// Bound parameter values, or pointers to the bound D variables.
    Variant[] _inParams;
    /// Per-parameter specializations (NULL flag, explicit SQL type, chunked transfer).
    ParameterSpecialization[] _psa;
    /// Not referenced by the code visible in this part of the file.
    string _prevFunc;
43 
44     bool sendCmd(CommandType cmd)
45     {
46         enforceEx!MYX(!(_headersPending || _rowsPending),
47             "There are result set elements pending - purgeResult() required.");
48 
49         scope(failure) _con.kill();
50         _con.sendCmd(cmd, _sql);
51         return true;
52     }
53 
54     static ubyte[] makeBitmap(in ParameterSpecialization[] psa) pure nothrow
55     {
56         size_t bml = (psa.length+7)/8;
57         ubyte[] bma;
58         bma.length = bml;
59         foreach (size_t i, PSN psn; psa)
60         {
61             if (!psn.isNull)
62                 continue;
63             size_t bn = i/8;
64             size_t bb = i%8;
65             ubyte sr = 1;
66             sr <<= bb;
67             bma[bn] |= sr;
68         }
69         return bma;
70     }
71 
72     ubyte[] makePSPrefix(ubyte flags = 0) pure const nothrow
73     {
74         ubyte[] prefix;
75         prefix.length = 14;
76 
77         prefix[4] = CommandType.STMT_EXECUTE;
78         _hStmt.packInto(prefix[5..9]);
79         prefix[9] = flags;   // flags, no cursor
80         prefix[10] = 1; // iteration count - currently always 1
81         prefix[11] = 0;
82         prefix[12] = 0;
83         prefix[13] = 0;
84 
85         return prefix;
86     }
87 
88     ubyte[] analyseParams(out ubyte[] vals, out bool longData)
89     {
90         size_t pc = _inParams.length;
91         ubyte[] types;
92         types.length = pc*2;
93         size_t alloc = pc*20;
94         vals.length = alloc;
95         uint vcl = 0, len;
96         int ct = 0;
97 
98         void reAlloc(size_t n)
99         {
100             if (vcl+n < alloc)
101                 return;
102             size_t inc = (alloc*3)/2;
103             if (inc <  n)
104                 inc = n;
105             alloc += inc;
106             vals.length = alloc;
107         }
108 
109         foreach (size_t i; 0..pc)
110         {
111             if (_psa[i].chunkSize)
112                 longData= true;
113             bool isnull = _psa[i].isNull;
114             Variant v = _inParams[i];
115             SQLType ext = _psa[i].type;
116             string ts = v.type.toString();
117             bool isRef;
118             if (ts[$-1] == '*')
119             {
120                 ts.length = ts.length-1;
121                 isRef= true;
122             }
123 
124             enum UNSIGNED  = 0x80;
125             enum SIGNED    = 0;
126             switch (ts)
127             {
128                 case "bool":
129                     if (ext == SQLType.INFER_FROM_D_TYPE)
130                         types[ct++] = SQLType.BIT;
131                     else
132                         types[ct++] = cast(ubyte) ext;
133                     types[ct++] = SIGNED;
134                     if (isnull) break;
135                     reAlloc(2);
136                     bool bv = isRef? *(v.get!(bool*)): v.get!(bool);
137                     vals[vcl++] = 1;
138                     vals[vcl++] = bv? 0x31: 0x30;
139                     break;
140                 case "byte":
141                     types[ct++] = SQLType.TINY;
142                     types[ct++] = SIGNED;
143                     if (isnull) break;
144                     reAlloc(1);
145                     vals[vcl++] = isRef? *(v.get!(byte*)): v.get!(byte);
146                     break;
147                 case "ubyte":
148                     types[ct++] = SQLType.TINY;
149                     types[ct++] = UNSIGNED;
150                     if (isnull) break;
151                     reAlloc(1);
152                     vals[vcl++] = isRef? *(v.get!(ubyte*)): v.get!(ubyte);
153                     break;
154                 case "short":
155                     types[ct++] = SQLType.SHORT;
156                     types[ct++] = SIGNED;
157                     if (isnull) break;
158                     reAlloc(2);
159                     short si = isRef? *(v.get!(short*)): v.get!(short);
160                     vals[vcl++] = cast(ubyte) (si & 0xff);
161                     vals[vcl++] = cast(ubyte) ((si >> 8) & 0xff);
162                     break;
163                 case "ushort":
164                     types[ct++] = SQLType.SHORT;
165                     types[ct++] = UNSIGNED;
166                     reAlloc(2);
167                     ushort us = isRef? *(v.get!(ushort*)): v.get!(ushort);
168                     vals[vcl++] = cast(ubyte) (us & 0xff);
169                     vals[vcl++] = cast(ubyte) ((us >> 8) & 0xff);
170                     break;
171                 case "int":
172                     types[ct++] = SQLType.INT;
173                     types[ct++] = SIGNED;
174                     if (isnull) break;
175                     reAlloc(4);
176                     int ii = isRef? *(v.get!(int*)): v.get!(int);
177                     vals[vcl++] = cast(ubyte) (ii & 0xff);
178                     vals[vcl++] = cast(ubyte) ((ii >> 8) & 0xff);
179                     vals[vcl++] = cast(ubyte) ((ii >> 16) & 0xff);
180                     vals[vcl++] = cast(ubyte) ((ii >> 24) & 0xff);
181                     break;
182                 case "uint":
183                     types[ct++] = SQLType.INT;
184                     types[ct++] = UNSIGNED;
185                     if (isnull) break;
186                     reAlloc(4);
187                     uint ui = isRef? *(v.get!(uint*)): v.get!(uint);
188                     vals[vcl++] = cast(ubyte) (ui & 0xff);
189                     vals[vcl++] = cast(ubyte) ((ui >> 8) & 0xff);
190                     vals[vcl++] = cast(ubyte) ((ui >> 16) & 0xff);
191                     vals[vcl++] = cast(ubyte) ((ui >> 24) & 0xff);
192                     break;
193                 case "long":
194                     types[ct++] = SQLType.LONGLONG;
195                     types[ct++] = SIGNED;
196                     if (isnull) break;
197                     reAlloc(8);
198                     long li = isRef? *(v.get!(long*)): v.get!(long);
199                     vals[vcl++] = cast(ubyte) (li & 0xff);
200                     vals[vcl++] = cast(ubyte) ((li >> 8) & 0xff);
201                     vals[vcl++] = cast(ubyte) ((li >> 16) & 0xff);
202                     vals[vcl++] = cast(ubyte) ((li >> 24) & 0xff);
203                     vals[vcl++] = cast(ubyte) ((li >> 32) & 0xff);
204                     vals[vcl++] = cast(ubyte) ((li >> 40) & 0xff);
205                     vals[vcl++] = cast(ubyte) ((li >> 48) & 0xff);
206                     vals[vcl++] = cast(ubyte) ((li >> 56) & 0xff);
207                     break;
208                 case "ulong":
209                     types[ct++] = SQLType.LONGLONG;
210                     types[ct++] = UNSIGNED;
211                     if (isnull) break;
212                     reAlloc(8);
213                     ulong ul = isRef? *(v.get!(ulong*)): v.get!(ulong);
214                     vals[vcl++] = cast(ubyte) (ul & 0xff);
215                     vals[vcl++] = cast(ubyte) ((ul >> 8) & 0xff);
216                     vals[vcl++] = cast(ubyte) ((ul >> 16) & 0xff);
217                     vals[vcl++] = cast(ubyte) ((ul >> 24) & 0xff);
218                     vals[vcl++] = cast(ubyte) ((ul >> 32) & 0xff);
219                     vals[vcl++] = cast(ubyte) ((ul >> 40) & 0xff);
220                     vals[vcl++] = cast(ubyte) ((ul >> 48) & 0xff);
221                     vals[vcl++] = cast(ubyte) ((ul >> 56) & 0xff);
222                     break;
223                 case "float":
224                     types[ct++] = SQLType.FLOAT;
225                     types[ct++] = SIGNED;
226                     if (isnull) break;
227                     reAlloc(4);
228                     float f = isRef? *(v.get!(float*)): v.get!(float);
229                     ubyte* ubp = cast(ubyte*) &f;
230                     vals[vcl++] = *ubp++;
231                     vals[vcl++] = *ubp++;
232                     vals[vcl++] = *ubp++;
233                     vals[vcl++] = *ubp;
234                     break;
235                 case "double":
236                     types[ct++] = SQLType.DOUBLE;
237                     types[ct++] = SIGNED;
238                     if (isnull) break;
239                     reAlloc(8);
240                     double d = isRef? *(v.get!(double*)): v.get!(double);
241                     ubyte* ubp = cast(ubyte*) &d;
242                     vals[vcl++] = *ubp++;
243                     vals[vcl++] = *ubp++;
244                     vals[vcl++] = *ubp++;
245                     vals[vcl++] = *ubp++;
246                     vals[vcl++] = *ubp++;
247                     vals[vcl++] = *ubp++;
248                     vals[vcl++] = *ubp++;
249                     vals[vcl++] = *ubp;
250                     break;
251                 case "std.datetime.Date":
252                     types[ct++] = SQLType.DATE;
253                     types[ct++] = SIGNED;
254                     Date date = isRef? *(v.get!(Date*)): v.get!(Date);
255                     ubyte[] da = pack(date);
256                     size_t l = da.length;
257                     if (isnull) break;
258                     reAlloc(l);
259                     vals[vcl..vcl+l] = da[];
260                     vcl += l;
261                     break;
262                 case "std.datetime.Time":
263                     types[ct++] = SQLType.TIME;
264                     types[ct++] = SIGNED;
265                     TimeOfDay time = isRef? *(v.get!(TimeOfDay*)): v.get!(TimeOfDay);
266                     ubyte[] ta = pack(time);
267                     size_t l = ta.length;
268                     if (isnull) break;
269                     reAlloc(l);
270                     vals[vcl..vcl+l] = ta[];
271                     vcl += l;
272                     break;
273                 case "std.datetime.DateTime":
274                     types[ct++] = SQLType.DATETIME;
275                     types[ct++] = SIGNED;
276                     DateTime dt = isRef? *(v.get!(DateTime*)): v.get!(DateTime);
277                     ubyte[] da = pack(dt);
278                     size_t l = da.length;
279                     if (isnull) break;
280                     reAlloc(l);
281                     vals[vcl..vcl+l] = da[];
282                     vcl += l;
283                     break;
284                 case "connect.Timestamp":
285                     types[ct++] = SQLType.TIMESTAMP;
286                     types[ct++] = SIGNED;
287                     Timestamp tms = isRef? *(v.get!(Timestamp*)): v.get!(Timestamp);
288                     DateTime dt = mysql.protocol.packet_helpers.toDateTime(tms.rep);
289                     ubyte[] da = pack(dt);
290                     size_t l = da.length;
291                     if (isnull) break;
292                     reAlloc(l);
293                     vals[vcl..vcl+l] = da[];
294                     vcl += l;
295                     break;
296                 case "immutable(char)[]":
297                     if (ext == SQLType.INFER_FROM_D_TYPE)
298                         types[ct++] = SQLType.VARCHAR;
299                     else
300                         types[ct++] = cast(ubyte) ext;
301                     types[ct++] = SIGNED;
302                     if (isnull) break;
303                     string s = isRef? *(v.get!(string*)): v.get!(string);
304                     ubyte[] packed = packLCS(cast(void[]) s);
305                     reAlloc(packed.length);
306                     vals[vcl..vcl+packed.length] = packed[];
307                     vcl += packed.length;
308                     break;
309                 case "char[]":
310                     if (ext == SQLType.INFER_FROM_D_TYPE)
311                         types[ct++] = SQLType.VARCHAR;
312                     else
313                         types[ct++] = cast(ubyte) ext;
314                     types[ct++] = SIGNED;
315                     if (isnull) break;
316                     char[] ca = isRef? *(v.get!(char[]*)): v.get!(char[]);
317                     ubyte[] packed = packLCS(cast(void[]) ca);
318                     reAlloc(packed.length);
319                     vals[vcl..vcl+packed.length] = packed[];
320                     vcl += packed.length;
321                     break;
322                 case "byte[]":
323                     if (ext == SQLType.INFER_FROM_D_TYPE)
324                         types[ct++] = SQLType.TINYBLOB;
325                     else
326                         types[ct++] = cast(ubyte) ext;
327                     types[ct++] = SIGNED;
328                     if (isnull) break;
329                     byte[] ba = isRef? *(v.get!(byte[]*)): v.get!(byte[]);
330                     ubyte[] packed = packLCS(cast(void[]) ba);
331                     reAlloc(packed.length);
332                     vals[vcl..vcl+packed.length] = packed[];
333                     vcl += packed.length;
334                     break;
335                 case "ubyte[]":
336                     if (ext == SQLType.INFER_FROM_D_TYPE)
337                         types[ct++] = SQLType.TINYBLOB;
338                     else
339                         types[ct++] = cast(ubyte) ext;
340                     types[ct++] = SIGNED;
341                     if (isnull) break;
342                     ubyte[] uba = isRef? *(v.get!(ubyte[]*)): v.get!(ubyte[]);
343                     ubyte[] packed = packLCS(cast(void[]) uba);
344                     reAlloc(packed.length);
345                     vals[vcl..vcl+packed.length] = packed[];
346                     vcl += packed.length;
347                     break;
348                 case "void":
349                     throw new MYX("Unbound parameter " ~ to!string(i), __FILE__, __LINE__);
350                 default:
351                     throw new MYX("Unsupported parameter type " ~ ts, __FILE__, __LINE__);
352             }
353         }
354         vals.length = vcl;
355         return types;
356     }
357 
358     void sendLongData()
359     {
360         assert(_psa.length <= ushort.max); // parameter number is sent as short
361         foreach (ushort i, PSN psn; _psa)
362         {
363             if (!psn.chunkSize) continue;
364             uint cs = psn.chunkSize;
365             uint delegate(ubyte[]) dg = psn.chunkDelegate;
366 
367             ubyte[] chunk;
368             chunk.length = cs+11;
369             chunk.setPacketHeader(0 /*each chunk is separate cmd*/);
370             chunk[4] = CommandType.STMT_SEND_LONG_DATA;
371             _hStmt.packInto(chunk[5..9]); // statement handle
372             packInto(i, chunk[9..11]); // parameter number
373 
374             // byte 11 on is payload
375             for (;;)
376             {
377                 uint sent = dg(chunk[11..cs+11]);
378                 if (sent < cs)
379                 {
380                     if (sent == 0)    // data was exact multiple of chunk size - all sent
381                         break;
382                     sent += 7;        // adjust for non-payload bytes
383                     chunk.length = chunk.length - (cs-sent);     // trim the chunk
384                     packInto!(uint, true)(cast(uint)sent, chunk[0..3]);
385                     _con.send(chunk);
386                     break;
387                 }
388                 _con.send(chunk);
389             }
390         }
391     }
392 
393 public:
394 
395     /**
396      * Construct a naked Command object
397      *
398      * Params: con = A Connection object to communicate with the server
399      */
400     this(Connection con)
401     {
402         _con = con;
403         _con.resetPacket();
404     }
405 
406     /**
407      * Construct a Command object complete with SQL
408      *
409      * Params: con = A Connection object to communicate with the server
410      *                sql = SQL command string.
411      */
412     this(Connection con, const(char)[] sql)
413     {
414         _sql = sql;
415         this(con);
416     }
417 
418     @property
419     {
420         /// Get the current SQL for the Command
421         const(char)[] sql() pure const nothrow { return _sql; }
422 
423         /**
424         * Set a new SQL command.
425         *
426         * This can have quite profound side effects. It resets the Command to
427         * an initial state. If a query has been issued on the Command that
428         * produced a result set, then all of the result set packets - field
429         * description sequence, EOF packet, result rows sequence, EOF packet
430         * must be flushed from the server before any further operation can be
431         * performed on the Connection. If you want to write speedy and efficient
432         * MySQL programs, you should bear this in mind when designing your
433         * queries so that you are not requesting many rows when one would do.
434         *
435         * Params: sql = SQL command string.
436         */
437         const(char)[] sql(const(char)[] sql)
438         {
439             if (_hStmt)
440             {
441                 purgeResult();
442                 releaseStatement();
443                 _con.resetPacket();
444             }
445             return this._sql = sql;
446         }
447     }
448 
449     /**
450      * Submit an SQL command to the server to be compiled into a prepared statement.
451      *
452      * The result of a successful outcome will be a statement handle - an ID -
453      * for the prepared statement, a count of the parameters required for
454      * excution of the statement, and a count of the columns that will be present
455      * in any result set that the command generates. Thes values will be stored
456      * in in the Command struct.
457      *
458      * The server will then proceed to send prepared statement headers,
459      * including parameter descriptions, and result set field descriptions,
460      * followed by an EOF packet.
461      *
462      * If there is an existing statement handle in the Command struct, that
463      * prepared statement is released.
464      *
465      * Throws: MySQLException if there are pending result set items, or if the
466      * server has a problem.
467      */
468     void prepare()
469     {
470         enforceEx!MYX(!(_headersPending || _rowsPending),
471             "There are result set elements pending - purgeResult() required.");
472 
473         scope(failure) _con.kill();
474 
475         if (_hStmt)
476             releaseStatement();
477         _con.sendCmd(CommandType.STMT_PREPARE, _sql);
478         _fieldCount = 0;
479 
480         ubyte[] packet = _con.getPacket();
481         if (packet.front == ResultPacketMarker.ok)
482         {
483             packet.popFront();
484             _hStmt              = packet.consume!int();
485             _fieldCount         = packet.consume!short();
486             _psParams           = packet.consume!short();
487 
488             _inParams.length    = _psParams;
489             _psa.length         = _psParams;
490 
491             packet.popFront(); // one byte filler
492             _psWarnings         = packet.consume!short();
493 
494             // At this point the server also sends field specs for parameters
495             // and columns if there were any of each
496             _psh = PreparedStmtHeaders(_con, _fieldCount, _psParams);
497         }
498         else if(packet.front == ResultPacketMarker.error)
499         {
500             auto error = OKErrorPacket(packet);
501             enforcePacketOK(error);
502             assert(0); // FIXME: what now?
503         }
504         else
505             assert(0); // FIXME: what now?
506     }
507 
508     /**
509      * Release a prepared statement.
510      *
511      * This method tells the server that it can dispose of the information it
512      * holds about the current prepared statement, and resets the Command
513      * object to an initial state in that respect.
514      */
515     void releaseStatement()
516     {
517         scope(failure) _con.kill();
518 
519         ubyte[] packet;
520         packet.length = 9;
521         packet.setPacketHeader(0/*packet number*/);
522         _con.bumpPacket();
523         packet[4] = CommandType.STMT_CLOSE;
524         _hStmt.packInto(packet[5..9]);
525         purgeResult();
526         _con.send(packet);
527         // It seems that the server does not find it necessary to send a response
528         // for this command.
529         _hStmt = 0;
530     }
531 
532     /**
533      * Flush any outstanding result set elements.
534      *
535      * When the server responds to a command that produces a result set, it
536      * queues the whole set of corresponding packets over the current connection.
537      * Before that Connection can embark on any new command, it must receive
538      * all of those packets and junk them.
539      * http://www.mysqlperformanceblog.com/2007/07/08/mysql-net_write_timeout-vs-wait_timeout-and-protocol-notes/
540      */
541     ulong purgeResult()
542     {
543         scope(failure) _con.kill();
544 
545         ulong rows = 0;
546         if (_fieldCount)
547         {
548             if (_headersPending)
549             {
550                 for (size_t i = 0;; i++)
551                 {
552                     if (_con.getPacket().isEOFPacket())
553                     {
554                         _headersPending = false;
555                         break;
556                     }
557                     enforceEx!MYXProtocol(i < _fieldCount, "Field header count exceeded but no EOF packet found.");
558                 }
559             }
560             if (_rowsPending)
561             {
562                 for (;;  rows++)
563                 {
564                     if (_con.getPacket().isEOFPacket())
565                     {
566                         _rowsPending = _pendingBinary = false;
567                         break;
568                     }
569                 }
570             }
571         }
572         _fieldCount = 0;
573         _con.resetPacket();
574         return rows;
575     }
576 
577     /**
578      * Bind a D variable to a prepared statement parameter.
579      *
580      * In this implementation, binding comprises setting a value into the
581      * appropriate element of an array of Variants which represent the
582      * parameters, and setting any required specializations.
583      *
584      * To bind to some D variable, we set the corrsponding variant with its
585      * address, so there is no need to rebind between calls to execPreparedXXX.
586      */
587     void bindParameter(T)(ref T val, size_t pIndex, ParameterSpecialization psn = PSN(0, false, SQLType.INFER_FROM_D_TYPE, 0, null))
588     {
589         // Now in theory we should be able to check the parameter type here, since the
590         // protocol is supposed to send us type information for the parameters, but this
591         // capability seems to be broken. This assertion is supported by the fact that
592         // the same information is not available via the MySQL C API either. It is up
593         // to the programmer to ensure that appropriate type information is embodied
594         // in the variant array, or provided explicitly. This sucks, but short of
595         // having a client side SQL parser I don't see what can be done.
596         //
597         // We require that the statement be prepared at this point so we can at least
598         // check that the parameter number is within the required range
599         enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
600         enforceEx!MYX(pIndex < _psParams, "Parameter number is out of range for the prepared statement.");
601         _inParams[pIndex] = &val;
602         psn.pIndex = pIndex;
603         _psa[pIndex] = psn;
604     }
605 
606     /**
607      * Bind a tuple of D variables to the parameters of a prepared statement.
608      *
609      * You can use this method to bind a set of variables if you don't need any specialization,
610      * that is there will be no null values, and chunked transfer is not neccessary.
611      *
612      * The tuple must match the required number of parameters, and it is the programmer's
613      * responsibility to ensure that they are of appropriate types.
614      */
615     void bindParameterTuple(T...)(ref T args)
616     {
617         enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
618         enforceEx!MYX(args.length == _psParams, "Argument list supplied does not match the number of parameters.");
619         foreach (size_t i, dummy; args)
620             _inParams[i] = &args[i];
621     }
622 
623     /**
624      * Bind a Variant[] as the parameters of a prepared statement.
625      *
626      * You can use this method to bind a set of variables in Variant form to
627      * the parameters of a prepared statement.
628      *
629      * Parameter specializations can be added if required. This method could be
630      * used to add records from a data entry form along the lines of
631      * ------------
632      * auto c = Command(con, "insert into table42 values(?, ?, ?)");
633      * c.prepare();
634      * Variant[] va;
635      * va.length = 3;
636      * c.bindParameters(va);
637      * DataRecord dr;    // Some data input facility
638      * ulong ra;
639      * do
640      * {
641      *     dr.get();
642      *     va[0] = dr("Name");
643      *     va[1] = dr("City");
644      *     va[2] = dr("Whatever");
645      *     c.execPrepared(ra);
646      * } while(tod < "17:30");
647      * ------------
648      * Params: va = External list of Variants to be used as parameters
649      *                psnList = any required specializations
650      */
651     void bindParameters(Variant[] va, ParameterSpecialization[] psnList= null)
652     {
653         enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
654         enforceEx!MYX(va.length == _psParams, "Param count supplied does not match prepared statement");
655         _inParams[] = va[];
656         if (psnList !is null)
657         {
658             foreach (PSN psn; psnList)
659                 _psa[psn.pIndex] = psn;
660         }
661     }
662 
663     /**
664      * Access a prepared statement parameter for update.
665      *
666      * Another style of usage would simply update the parameter Variant directly
667      *
668      * ------------
669      * c.param(0) = 42;
670      * c.param(1) = "The answer";
671      * ------------
672      * Params: index = The zero based index
673      */
674     ref Variant param(size_t index) pure
675     {
676         enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
677         enforceEx!MYX(index < _psParams, "Parameter index out of range.");
678         return _inParams[index];
679     }
680 
681     /**
682      * Sets a prepared statement parameter to NULL.
683      *
684      * Params: index = The zero based index
685      */
686     void setNullParam(size_t index)
687     {
688         enforceEx!MYX(_hStmt, "The statement must be prepared before parameters are bound.");
689         enforceEx!MYX(index < _psParams, "Parameter index out of range.");
690         _psa[index].isNull = true;
691         _inParams[index] = "";
692     }
693 
694     /**
695      * Execute a one-off SQL command.
696      *
697      * Use this method when you are not going to be using the same command repeatedly.
698      * It can be used with commands that don't produce a result set, or those that
699      * do. If there is a result set its existence will be indicated by the return value.
700      *
701      * Any result set can be accessed vis getNextRow(), but you should really be
702      * using execSQLResult() or execSQLSequence() for such queries.
703      *
704      * Params: ra = An out parameter to receive the number of rows affected.
705      * Returns: true if there was a (possibly empty) result set.
706      */
707     bool execSQL(out ulong ra)
708     {
709         scope(failure) _con.kill();
710 
711         _con.sendCmd(CommandType.QUERY, _sql);
712         _fieldCount = 0;
713         ubyte[] packet = _con.getPacket();
714         bool rv;
715         if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
716         {
717             _con.resetPacket();
718             auto okp = OKErrorPacket(packet);
719             enforcePacketOK(okp);
720             ra = okp.affected;
721             _con._serverStatus = okp.serverStatus;
722             _insertID = okp.insertID;
723             rv = false;
724         }
725         else
726         {
727             // There was presumably a result set
728             assert(packet.front >= 1 && packet.front <= 250); // ResultSet packet header should have this value
729             _headersPending = _rowsPending = true;
730             _pendingBinary = false;
731             auto lcb = packet.consumeIfComplete!LCB();
732             assert(!lcb.isNull);
733             assert(!lcb.isIncomplete);
734             _fieldCount = cast(ushort)lcb.value;
735             assert(_fieldCount == lcb.value);
736             rv = true;
737             ra = 0;
738         }
739         return rv;
740     }
741 
742     /**
743      * Execute a one-off SQL command for the case where you expect a result set,
744      * and want it all at once.
745      *
746      * Use this method when you are not going to be using the same command repeatedly.
747      * This method will throw if the SQL command does not produce a result set.
748      *
749      * If there are long data items among the expected result columns you can specify
750      * that they are to be subject to chunked transfer via a delegate.
751      *
752      * Params: csa = An optional array of ColumnSpecialization structs.
753      * Returns: A (possibly empty) ResultSet.
754      */
755     ResultSet execSQLResult(ColumnSpecialization[] csa = null)
756     {
757         ulong ra;
758         enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set.");
759 
760         _rsh = ResultSetHeaders(_con, _fieldCount);
761         if (csa !is null)
762             _rsh.addSpecializations(csa);
763         _headersPending = false;
764 
765         Row[] rows;
766         while(true)
767         {
768             auto packet = _con.getPacket();
769             if(packet.isEOFPacket())
770                 break;
771             rows ~= Row(_con, packet, _rsh, false);
772             // As the row fetches more data while incomplete, it might already have
773             // fetched the EOF marker, so we have to check it again
774             if(!packet.empty && packet.isEOFPacket())
775                 break;
776         }
777         _rowsPending = _pendingBinary = false;
778 
779         return ResultSet(rows, _rsh.fieldNames);
780     }
781 
782     /**
783      * Execute a one-off SQL command for the case where you expect a result set,
784      * and want to deal with it a row at a time.
785      *
786      * Use this method when you are not going to be using the same command repeatedly.
787      * This method will throw if the SQL command does not produce a result set.
788      *
789      * If there are long data items among the expected result columns you can specify
790      * that they are to be subject to chunked transfer via a delegate.
791      *
792      * Params: csa = An optional array of ColumnSpecialization structs.
793      * Returns: A (possibly empty) ResultSequence.
794      */
795     ResultSequence execSQLSequence(ColumnSpecialization[] csa = null)
796     {
797         uint alloc = 20;
798         Row[] rra;
799         rra.length = alloc;
800         uint cr = 0;
801         ulong ra;
802         enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set.");
803         _rsh = ResultSetHeaders(_con, _fieldCount);
804         if (csa !is null)
805             _rsh.addSpecializations(csa);
806 
807         _headersPending = false;
808         return ResultSequence(&this, _rsh.fieldNames);
809     }
810 
811     /**
812      * Execute a one-off SQL command to place result values into a set of D variables.
813      *
814      * Use this method when you are not going to be using the same command repeatedly.
815      * It will throw if the specified command does not produce a result set, or if
816      * any column type is incompatible with the corresponding D variable.
817      *
818      * Params: args = A tuple of D variables to receive the results.
     * Returns: Nothing; the column values of the first row are stored into args.
820      */
821     void execSQLTuple(T...)(ref T args)
822     {
823         ulong ra;
824         enforceEx!MYX(execSQL(ra), "The executed query did not produce a result set.");
825         Row rr = getNextRow();
826         /+if (!rr._valid)   // The result set was empty - not a crime.
827             return;+/
828         enforceEx!MYX(rr._values.length == args.length, "Result column count does not match the target tuple.");
829         foreach (size_t i, dummy; args)
830         {
831             enforceEx!MYX(typeid(args[i]).toString() == rr._values[i].type.toString(),
832                 "Tuple "~to!string(i)~" type and column type are not compatible.");
833             args[i] = rr._values[i].get!(typeof(args[i]));
834         }
835         // If there were more rows, flush them away
836         // Question: Should I check in purgeResult and throw if there were - it's very inefficient to
837         // allow sloppy SQL that does not ensure just one row!
838         purgeResult();
839     }
840 
841     /**
842      * Execute a prepared command.
843      *
844      * Use this method when you will use the same SQL command repeatedly.
845      * It can be used with commands that don't produce a result set, or those that
846      * do. If there is a result set its existence will be indicated by the return value.
847      *
     * Any result set can be accessed via getNextRow(), but you should really be
849      * using execPreparedResult() or execPreparedSequence() for such queries.
850      *
851      * Params: ra = An out parameter to receive the number of rows affected.
852      * Returns: true if there was a (possibly empty) result set.
853      */
854     bool execPrepared(out ulong ra)
855     {
856         enforceEx!MYX(_hStmt, "The statement has not been prepared.");
857         scope(failure) _con.kill();
858 
859         ubyte[] packet;
860         _con.resetPacket();
861 
862         ubyte[] prefix = makePSPrefix(0);
863         size_t len = prefix.length;
864         bool longData;
865 
866         if (_psh._paramCount)
867         {
868             ubyte[] one = [ 1 ];
869             ubyte[] vals;
870             ubyte[] types = analyseParams(vals, longData);
871             ubyte[] nbm = makeBitmap(_psa);
872             packet = prefix ~ nbm ~ one ~ types ~ vals;
873         }
874         else
875             packet = prefix;
876 
877         if (longData)
878             sendLongData();
879 
880         assert(packet.length <= uint.max);
881         packet.setPacketHeader(_con.pktNumber);
882         _con.bumpPacket();
883         _con.send(packet);
884         packet = _con.getPacket();
885         bool rv;
886         if (packet.front == ResultPacketMarker.ok || packet.front == ResultPacketMarker.error)
887         {
888             _con.resetPacket();
889             auto okp = OKErrorPacket(packet);
890             enforcePacketOK(okp);
891             ra = okp.affected;
892             _con._serverStatus = okp.serverStatus;
893             _insertID = okp.insertID;
894             rv = false;
895         }
896         else
897         {
898             // There was presumably a result set
899             _headersPending = _rowsPending = _pendingBinary = true;
900             auto lcb = packet.consumeIfComplete!LCB();
901             assert(!lcb.isIncomplete);
902             _fieldCount = cast(ushort)lcb.value;
903             rv = true;
904         }
905         return rv;
906     }
907 
908     /**
909      * Execute a prepared SQL command for the case where you expect a result set,
910      * and want it all at once.
911      *
912      * Use this method when you will use the same command repeatedly.
913      * This method will throw if the SQL command does not produce a result set.
914      *
915      * If there are long data items among the expected result columns you can specify
916      * that they are to be subject to chunked transfer via a delegate.
917      *
918      * Params: csa = An optional array of ColumnSpecialization structs.
919      * Returns: A (possibly empty) ResultSet.
920      */
921     ResultSet execPreparedResult(ColumnSpecialization[] csa = null)
922     {
923         ulong ra;
924         enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");
925         uint alloc = 20;
926         Row[] rra;
927         rra.length = alloc;
928         uint cr = 0;
929         _rsh = ResultSetHeaders(_con, _fieldCount);
930         if (csa !is null)
931             _rsh.addSpecializations(csa);
932         _headersPending = false;
933         ubyte[] packet;
934         for (size_t i = 0;; i++)
935         {
936             packet = _con.getPacket();
937             if (packet.isEOFPacket())
938                 break;
939             Row row = Row(_con, packet, _rsh, true);
940             if (cr >= alloc)
941             {
942                 alloc = (alloc*3)/2;
943                 rra.length = alloc;
944             }
945             rra[cr++] = row;
946             if (!packet.empty && packet.isEOFPacket())
947                 break;
948         }
949         _rowsPending = _pendingBinary = false;
950         rra.length = cr;
951         ResultSet rs = ResultSet(rra, _rsh.fieldNames);
952         return rs;
953     }
954 
955     /**
956      * Execute a prepared SQL command for the case where you expect a result set,
957      * and want to deal with it one row at a time.
958      *
959      * Use this method when you will use the same command repeatedly.
960      * This method will throw if the SQL command does not produce a result set.
961      *
962      * If there are long data items among the expected result columns you can
963      * specify that they are to be subject to chunked transfer via a delegate.
964      *
965      * Params: csa = An optional array of ColumnSpecialization structs.
966      * Returns: A (possibly empty) ResultSequence.
967      */
968     ResultSequence execPreparedSequence(ColumnSpecialization[] csa = null)
969     {
970         ulong ra;
971         enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");
972         uint alloc = 20;
973         Row[] rra;
974         rra.length = alloc;
975         uint cr = 0;
976         _rsh = ResultSetHeaders(_con, _fieldCount);
977         if (csa !is null)
978             _rsh.addSpecializations(csa);
979         _headersPending = false;
980         return ResultSequence(&this, _rsh.fieldNames);
981     }
982 
983     /**
984      * Execute a prepared SQL command to place result values into a set of D variables.
985      *
986      * Use this method when you will use the same command repeatedly.
987      * It will throw if the specified command does not produce a result set, or
988      * if any column type is incompatible with the corresponding D variable
989      *
990      * Params: args = A tuple of D variables to receive the results.
     * Returns: Nothing; the column values of the first row are stored into args.
992      */
993     void execPreparedTuple(T...)(ref T args)
994     {
995         ulong ra;
996         enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");
997         Row rr = getNextRow();
998         // enforceEx!MYX(rr._valid, "The result set was empty.");
999         enforceEx!MYX(rr._values.length == args.length, "Result column count does not match the target tuple.");
1000         foreach (size_t i, dummy; args)
1001         {
1002             enforceEx!MYX(typeid(args[i]).toString() == rr._values[i].type.toString(),
1003                 "Tuple "~to!string(i)~" type and column type are not compatible.");
1004             args[i] = rr._values[i].get!(typeof(args[i]));
1005         }
1006         // If there were more rows, flush them away
1007         // Question: Should I check in purgeResult and throw if there were - it's very inefficient to
1008         // allow sloppy SQL that does not ensure just one row!
1009         purgeResult();
1010     }
1011 
1012     /**
1013      * Get the next Row of a pending result set.
1014      *
1015      * This method can be used after either execSQL() or execPrepared() have returned true
1016      * to retrieve result set rows sequentially.
1017      *
1018      * Similar functionality is available via execSQLSequence() and execPreparedSequence() in
1019      * which case the interface is presented as a forward range of Rows.
1020      *
1021      * This method allows you to deal with very large result sets either a row at a time,
1022      * or by feeding the rows into some suitable container such as a linked list.
1023      *
1024      * Returns: A Row object.
1025      */
1026     Row getNextRow()
1027     {
1028         scope(failure) _con.kill();
1029 
1030         if (_headersPending)
1031         {
1032             _rsh = ResultSetHeaders(_con, _fieldCount);
1033             _headersPending = false;
1034         }
1035         ubyte[] packet;
1036         Row rr;
1037         packet = _con.getPacket();
1038         if (packet.isEOFPacket())
1039         {
1040             _rowsPending = _pendingBinary = false;
1041             return rr;
1042         }
1043         if (_pendingBinary)
1044             rr = Row(_con, packet, _rsh, true);
1045         else
1046             rr = Row(_con, packet, _rsh, false);
1047         //rr._valid = true;
1048         return rr;
1049     }
1050 
1051     /**
1052      * Execute a stored function, with any required input variables, and store the
1053      * return value into a D variable.
1054      *
1055      * For this method, no query string is to be provided. The required one is of
1056      * the form "select foo(?, ? ...)". The method generates it and the appropriate
1057      * bindings - in, and out. Chunked transfers are not supported in either
1058      * direction. If you need them, create the parameters separately, then use
1059      * execPreparedResult() to get a one-row, one-column result set.
1060      *
1061      * If it is not possible to convert the column value to the type of target,
1062      * then execFunction will throw. If the result is NULL, that is indicated
1063      * by a false return value, and target is unchanged.
1064      *
1065      * In the interest of performance, this method assumes that the user has the
     * required information about the number and types of IN parameters and the
1067      * type of the output variable. In the same interest, if the method is called
1068      * repeatedly for the same stored function, prepare() is omitted after the first call.
1069      *
1070      * Params:
1071      *    T = The type of the variable to receive the return result.
1072      *    U = type tuple of arguments
1073      *    name = The name of the stored function.
1074      *    target = the D variable to receive the stored function return result.
1075      *    args = The list of D variables to act as IN arguments to the stored function.
1076      *
1077      */
1078     bool execFunction(T, U...)(string name, ref T target, U args)
1079     {
1080         bool repeatCall = (name == _prevFunc);
1081         enforceEx!MYX(repeatCall || _hStmt == 0, "You must not prepare the statement before calling execFunction");
1082         if (!repeatCall)
1083         {
1084             _sql = "select " ~ name ~ "(";
1085             bool comma = false;
1086             foreach (arg; args)
1087             {
1088                 if (comma)
1089                     _sql ~= ",?";
1090                 else
1091                 {
1092                     _sql ~= "?";
1093                     comma = true;
1094                 }
1095             }
1096             _sql ~= ")";
1097             prepare();
1098             _prevFunc = name;
1099         }
1100         bindParameterTuple(args);
1101         ulong ra;
1102         enforceEx!MYX(execPrepared(ra), "The executed query did not produce a result set.");
1103         Row rr = getNextRow();
1104         /+enforceEx!MYX(rr._valid, "The result set was empty.");+/
1105         enforceEx!MYX(rr._values.length == 1, "Result was not a single column.");
1106         enforceEx!MYX(typeid(target).toString() == rr._values[0].type.toString(),
1107                         "Target type and column type are not compatible.");
1108         if (!rr.isNull(0))
1109             target = rr._values[0].get!(T);
1110         // If there were more rows, flush them away
1111         // Question: Should I check in purgeResult and throw if there were - it's very inefficient to
1112         // allow sloppy SQL that does not ensure just one row!
1113         purgeResult();
1114         return !rr.isNull(0);
1115     }
1116 
1117     /**
1118      * Execute a stored procedure, with any required input variables.
1119      *
1120      * For this method, no query string is to be provided. The required one is
1121      * of the form "call proc(?, ? ...)". The method generates it and the
1122      * appropriate in bindings. Chunked transfers are not supported. If you
1123      * need them, create the parameters separately, then use execPrepared() or
1124      * execPreparedResult().
1125      *
1126      * In the interest of performance, this method assumes that the user has
1127      * the required information about the number and types of IN parameters.
1128      * In the same interest, if the method is called repeatedly for the same
     * stored procedure, prepare() and other redundant operations are omitted
1130      * after the first call.
1131      *
1132      * OUT parameters are not currently supported. It should generally be
1133      * possible with MySQL to present them as a result set.
1134      *
1135      * Params:
1136      *    T = Type tuple
1137      *    name = The name of the stored procedure.
1138      *    args = Tuple of args
1139      * Returns: True if the SP created a result set.
1140      */
1141     bool execProcedure(T...)(string name, ref T args)
1142     {
1143         bool repeatCall = (name == _prevFunc);
1144         enforceEx!MYX(repeatCall || _hStmt == 0, "You must not prepare a statement before calling execProcedure");
1145         if (!repeatCall)
1146         {
1147             _sql = "call " ~ name ~ "(";
1148             bool comma = false;
1149             foreach (arg; args)
1150             {
1151                 if (comma)
1152                     _sql ~= ",?";
1153                 else
1154                 {
1155                     _sql ~= "?";
1156                     comma = true;
1157                 }
1158             }
1159             _sql ~= ")";
1160             prepare();
1161             _prevFunc = name;
1162         }
1163         bindParameterTuple(args);
1164         ulong ra;
1165         return execPrepared(ra);
1166     }
1167 
1168     /// After a command that inserted a row into a table with an auto-increment
1169     /// ID column, this method allows you to retrieve the last insert ID.
1170     @property ulong lastInsertID() pure const nothrow { return _insertID; }
1171 
1172     /// Gets the number of parameters in this Command
1173     @property ushort numParams() pure const nothrow
1174     {
1175         return _psParams;
1176     }
1177 
1178     /// Gets the number of rows pending
1179     @property bool rowsPending() pure const nothrow { return _rowsPending; }
1180 
1181     /// Gets the result header's field descriptions.
1182     @property FieldDescription[] resultFieldDescriptions() pure { return _rsh.fieldDescriptions; }
1183     /// Gets the prepared header's field descriptions.
1184     @property FieldDescription[] preparedFieldDescriptions() pure { return _psh.fieldDescriptions; }
1185     /// Gets the prepared header's param descriptions.
1186     @property ParamDescription[] preparedParamDescriptions() pure { return _psh.paramDescriptions; }
1187 }