1 /**
2 PostgreSQL client implementation.
3 
4 Features:
5 $(UL
6     $(LI Standalone (does not depend on libpq))
7     $(LI Binary formatting (avoids parsing overhead))
8     $(LI Prepared statements)
9     $(LI Parametrized queries (partially working))
10     $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/datatype-enum.html, Enums))
11     $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/arrays.html, Arrays))
12     $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/rowtypes.html, Composite types))
13 )
14 
15 TODOs:
16 $(UL
17     $(LI Redesign parametrized queries)
18     $(LI BigInt/Numeric types support)
19     $(LI Geometric types support)
20     $(LI Network types support)
21     $(LI Bit string types support)
22     $(LI UUID type support)
23     $(LI XML types support)
24     $(LI Transaction support)
25     $(LI Asynchronous notifications)
26     $(LI Better memory management)
27     $(LI More friendly PGFields)
28 )
29 
30 Bugs:
31 $(UL
    $(LI Supports only cleartext and MD5 $(LINK2 http://www.postgresql.org/docs/9.0/static/auth-methods.html, authentication))
33     $(LI Unfinished parameter handling)
34     $(LI interval is converted to Duration, which does not support months)
35 )
36 
37 $(B Data type mapping:)
38 
39 $(TABLE
40     $(TR $(TH PostgreSQL type) $(TH Aliases) $(TH Default D type) $(TH D type mapping possibilities))
41     $(TR $(TD smallint) $(TD int2) $(TD short) <td rowspan="19">Any type convertible from default D type</td>)
42     $(TR $(TD integer) $(TD int4) $(TD int))
43     $(TR $(TD bigint) $(TD int8) $(TD long))
44     $(TR $(TD oid) $(TD reg***) $(TD uint))
45     $(TR $(TD decimal) $(TD numeric) $(TD not yet supported))
46     $(TR $(TD real) $(TD float4) $(TD float))
47     $(TR $(TD double precision) $(TD float8) $(TD double))
48     $(TR $(TD character varying(n)) $(TD varchar(n)) $(TD string))
49     $(TR $(TD character(n)) $(TD char(n)) $(TD string))
50     $(TR $(TD text) $(TD) $(TD string))
51     $(TR $(TD "char") $(TD) $(TD char))
52     $(TR $(TD bytea) $(TD) $(TD ubyte[]))
53     $(TR $(TD timestamp without time zone) $(TD timestamp) $(TD DateTime))
54     $(TR $(TD timestamp with time zone) $(TD timestamptz) $(TD SysTime))
55     $(TR $(TD date) $(TD) $(TD Date))
56     $(TR $(TD time without time zone) $(TD time) $(TD TimeOfDay))
57     $(TR $(TD time with time zone) $(TD timetz) $(TD SysTime))
58     $(TR $(TD interval) $(TD) $(TD Duration (without months and years)))
59     $(TR $(TD boolean) $(TD bool) $(TD bool))
60     $(TR $(TD enums) $(TD) $(TD string) $(TD enum))
61     $(TR $(TD arrays) $(TD) $(TD Variant[]) $(TD dynamic/static array with compatible element type))
62     $(TR $(TD composites) $(TD record, row) $(TD Variant[]) $(TD dynamic/static array, struct or Tuple))
63 )
64 
65 Examples:
With vibe.d, compile with -version=Have_vibe_d_core and use a connection pool (a PostgresDB object and its lockConnection method):
67 ---
68 
69 	auto pdb = new PostgresDB([
70 		"host" : "192.168.2.50",
71 		"database" : "postgres",
72 		"user" : "postgres",
73 		"password" : ""
74 	]);
75 	auto conn = pdb.lockConnection();
76 
77 	auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type");
78 	auto result = cmd.executeQuery;
79 
80 	try
81 	{
82 		foreach (row; result)
83 		{
84 			writeln(row["typname"], ", ", row[1]);
85 		}
86 	}
87 	finally
88 	{
89 		result.close;
90 	}
91 
92 ---
Without vibe.d, use a PGConnection object, which communicates over std.socket:
94 
95 ---
96 import std.stdio;
97 import ddb.postgres;
98 
99 int main(string[] argv)
100 {
101     auto conn = new PGConnection([
102         "host" : "localhost",
103         "database" : "test",
104         "user" : "postgres",
105         "password" : "postgres"
106     ]);
107 
108     scope(exit) conn.close;
109 
110     auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type");
111     auto result = cmd.executeQuery;
112 
113     try
114     {
115         foreach (row; result)
116         {
117             writeln(row[0], ", ", row[1]);
118         }
119     }
120     finally
121     {
122         result.close;
123     }
124 
125     return 0;
126 }
127 ---
128 
129 Copyright: Copyright Piotr Szturmaj 2011-.
130 License: $(LINK2 http://boost.org/LICENSE_1_0.txt, Boost License 1.0).
131 Authors: Piotr Szturmaj
132 *//*
133 Documentation contains portions copied from PostgreSQL manual (mainly field information and
134 connection parameters description). License:
135 
136 Portions Copyright (c) 1996-2010, The PostgreSQL Global Development Group
137 Portions Copyright (c) 1994, The Regents of the University of California
138 
139 Permission to use, copy, modify, and distribute this software and its documentation for any purpose,
140 without fee, and without a written agreement is hereby granted, provided that the above copyright
141 notice and this paragraph and the following two paragraphs appear in all copies.
142 
143 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT,
144 INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS,
145 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY
146 OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
147 
148 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
149 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
150 PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
151 CALIFORNIA HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
152 OR MODIFICATIONS.
153 */
154 module ddb.postgres;
155 
156 version (Have_vibe_d_core)
157 {
158     import vibe.core.net;
159     import vibe.core.stream;
160 }
161 else
162 {
163     import std.socket;
164 }
165 import std.bitmanip;
166 import std.exception;
167 import std.conv;
168 import std.traits;
169 import std.typecons;
170 import std.string;
171 import std.digest.md;
172 import core.bitop;
173 import std.variant;
174 import std.algorithm;
175 import std.stdio;
176 import std.datetime;
177 import std.uuid;
178 public import ddb.db;
179 
180 private:
181 
// Reference points used when encoding and decoding date/time values:
// dates are expressed as days, and times/timestamps as microseconds,
// relative to 2000-01-01 00:00:00.
const PGEpochDate = Date(2000, 1, 1);
const PGEpochTime = TimeOfDay(0, 0, 0);
const PGEpochDateTime = DateTime(2000, 1, 1, 0, 0, 0);
// Day number of the epoch date in std.datetime's Gregorian day count.
const PGEpochDay = PGEpochDate.dayOfGregorianCal;
186 
/**
Wraps the connection to the server and serializes values in big-endian
(network) byte order.

With `-version=Have_vibe_d_core` the transport is a `TCPConnectionWrapper`;
otherwise it is a `std.socket.Socket`.
*/
class PGStream
{
    version (Have_vibe_d_core)
    {
        private TCPConnectionWrapper m_socket;

        /// The underlying vibe.d connection.
        @property TCPConnectionWrapper socket()
        {
            return m_socket;
        }

        this(TCPConnectionWrapper socket)
        {
            m_socket = socket;
        }
    }
    else
    {
        private Socket m_socket;

        /// The underlying socket.
        @property Socket socket()
        {
            return m_socket;
        }

        this(Socket socket)
        {
            m_socket = socket;
        }
    }

    /**
    Fills `buffer` completely with bytes received from the server.

    Throws: on the std.socket transport, `SocketOSException` when the
    socket reports an error and `SocketException` when the peer closes the
    connection before the buffer is full.
    */
    protected void read(ubyte[] buffer)
    {
        version(Have_vibe_d_core)
        {
            m_socket.read(buffer);
        }
        else
        {
            size_t received = 0;
            while (received < buffer.length)
            {
                immutable ptrdiff_t n = m_socket.receive(buffer[received .. $]);

                // receive() yields Socket.ERROR on failure and 0 once the
                // peer has closed the connection; adding either value to
                // the counter would corrupt it or spin forever.
                if (n == Socket.ERROR)
                    throw new SocketOSException("Failed to receive data from PostgreSQL server");
                if (n == 0)
                    throw new SocketException("Connection closed by PostgreSQL server");

                received += n;
            }
        }
    }

    /**
    Sends all bytes of `x` to the server.

    Throws: on the std.socket transport, `SocketOSException` when the
    socket reports an error and `SocketException` when no progress can be
    made.
    */
    void write(ubyte[] x)
    {
        version(Have_vibe_d_core)
        {
            m_socket.write(x);
        }
        else
        {
            size_t sent = 0;
            while (sent < x.length)
            {
                // send() may accept only part of the buffer, so keep going
                // until every byte has been handed over.
                immutable ptrdiff_t n = m_socket.send(x[sent .. $]);

                if (n == Socket.ERROR)
                    throw new SocketOSException("Failed to send data to PostgreSQL server");
                if (n == 0)
                    throw new SocketException("Unable to send data to PostgreSQL server");

                sent += n;
            }
        }
    }

    // Shared implementation of the scalar overloads below.
    private void writeBigEndian(T)(T x)
    {
        ubyte[T.sizeof] bytes = nativeToBigEndian(x);
        write(bytes[]);
    }

    /// Writes a scalar in big-endian byte order.
    void write(ubyte x)
    {
        writeBigEndian(x);
    }

    /// ditto
    void write(short x)
    {
        writeBigEndian(x);
    }

    /// ditto
    void write(int x)
    {
        writeBigEndian(x);
    }

    /// ditto
    void write(long x)
    {
        writeBigEndian(x);
    }

    /// ditto
    void write(float x)
    {
        writeBigEndian(x);
    }

    /// ditto
    void write(double x)
    {
        writeBigEndian(x);
    }

    /// Writes the bytes of `x` without a terminator.
    void writeString(string x)
    {
        write(cast(ubyte[])(x));
    }

    /// Writes the bytes of `x` followed by a single zero byte.
    void writeCString(string x)
    {
        writeString(x);
        write(cast(ubyte)0);
    }

    /// ditto
    void writeCString(char[] x)
    {
        write(cast(ubyte[])x);
        write(cast(ubyte)0);
    }

    /// Writes a date as a 4-byte count of days since 2000-01-01.
    void write(const ref Date x)
    {
        write(cast(int)(x.dayOfGregorianCal - PGEpochDay));
    }

    /// ditto
    void write(Date x)
    {
        write(cast(int)(x.dayOfGregorianCal - PGEpochDay));
    }

    /**
    Writes a time of day as an 8-byte count of microseconds since midnight.

    The value is sent as `long`, matching the 8 bytes that
    `Message.read(out TimeOfDay)` consumes; a 4-byte `int` overflows for
    any time later than roughly 00:35:47.
    */
    void write(const ref TimeOfDay x)
    {
        write(cast(long)((x - PGEpochTime).total!"usecs"));
    }

    /// Writes a timestamp as an 8-byte count of microseconds since 2000-01-01 00:00:00.
    void write(const ref DateTime x) // timestamp
    {
        write(cast(long)((x - PGEpochDateTime).total!"usecs"));
    }

    /// ditto
    void write(DateTime x) // timestamp
    {
        write(cast(long)((x - PGEpochDateTime).total!"usecs"));
    }

    /// Writes a point in time as an 8-byte count of microseconds since 2000-01-01 00:00:00 UTC.
    void write(const ref SysTime x) // timestamptz
    {
        write(cast(long)((x - SysTime(PGEpochDateTime, UTC())).total!"usecs"));
    }

    /**
    Writes an interval as microseconds (8 bytes), days (4 bytes) and
    months (4 bytes).

    BUG: Does not support months. A Duration has no calendar months, so the
    months field is always written as zero and the whole value is carried
    by the days and microseconds fields.
    */
    void write(const ref core.time.Duration x) // interval
    {
        immutable int days = cast(int)x.total!"days";
        immutable long usecs = x.total!"usecs" - convert!("days", "usecs")(days);

        write(usecs);
        write(days);
        write(cast(int)0); // months
    }

    /**
    Writes a time with time zone as microseconds since midnight (8 bytes)
    followed by the zone offset (4 bytes).

    The time is converted to UTC first so that it agrees with the zero
    offset that is written after it.
    */
    void writeTimeTz(const ref SysTime x) // timetz
    {
        TimeOfDay t = cast(TimeOfDay)x.toUTC();
        write(t);
        write(cast(int)0);
    }
}
348 
/// Returns the MD5 digest of all arguments, hashed in order, as 32 lower-case hex digits.
char[32] MD5toHex(T...)(in T data)
{
    const ubyte[16] digest = md5Of(data);
    return toHexString!(LetterCase.lower)(digest);
}
353 
/**
A message received from the server: its type byte, its payload, and a read
cursor used to decode big-endian binary values from that payload.
*/
struct Message
{
    /// Connection the message belongs to; its type tables resolve array, composite and enum OIDs.
    PGConnection conn;
    /// Message type byte.
    char type;
    /// Message payload.
    ubyte[] data;

    /// Offset of the next unread byte in `data`.
    private size_t position = 0;

    /// Reads and returns a value of type T; extra arguments are forwarded to the matching overload.
    T read(T, Params...)(Params p)
    {
        T value;
        read(value, p);
        return value;
    }

    /// Reads one byte as a character.
    void read()(out char x)
    {
        x = data[position++];
    }


    /// Reads a multi-byte integer or floating point number stored in big-endian order.
    void read(Int)(out Int x) if((isIntegral!Int || isFloatingPoint!Int) && Int.sizeof > 1)
    {
        ubyte[Int.sizeof] buf;
        buf[] = data[position..position+Int.sizeof];
        x = bigEndianToNative!Int(buf);
        position += Int.sizeof;
    }

    /// Reads a zero-terminated string and skips its terminator.
    string readCString()
    {
        string x;
        readCString(x);
        return x;
    }

    /**
    ditto

    Throws: Exception if no terminator is found before the end of the payload.
    */
    void readCString(out string x)
    {
        // Scan by index so a missing terminator is reported instead of
        // reading memory beyond the payload.
        size_t end = position;
        while (end < data.length && data[end] != 0)
            end++;

        enforce(end < data.length, "Unterminated string in server message");

        x = cast(string)data[position .. end];
        position = end + 1;
    }

    /// Reads `len` bytes as a string.
    string readString(int len)
    {
        string x;
        readString(x, len);
        return x;
    }

    /**
    ditto

    Throws: Exception if `len` is negative or exceeds the remaining payload.
    */
    void readString(out string x, int len)
    {
        enforce(len >= 0 && position + len <= data.length,
                "String length exceeds server message size");
        x = cast(string)(data[position .. position + len]);
        position += len;
    }

    /// Reads one byte as a boolean.
    void read()(out bool x)
    {
        x = cast(bool)data[position++];
    }

    /**
    Reads `len` raw bytes. The result is a slice of the payload, not a copy.

    Throws: Exception if `len` is negative or exceeds the remaining payload.
    */
    void read()(out ubyte[] x, int len)
    {
        // A negative len would wrap around in the unsigned addition below
        // and could slip past the upper-bound check.
        enforce(len >= 0 && position + len <= data.length);
        x = data[position .. position + len];
        position += len;
    }

    /// Reads a 16-byte UUID.
    void read()(out UUID u) // uuid
    {
        ubyte[16] uuidData = data[position .. position + 16];
        position += 16;
        u = UUID(uuidData);
    }

    /// Reads a date stored as a 4-byte day count relative to 2000-01-01.
    void read()(out Date x) // date
    {
        int days = read!int; // number of days since 1 Jan 2000
        x = PGEpochDate + dur!"days"(days);
    }

    /// Reads a time of day stored as 8-byte microseconds since midnight.
    void read()(out TimeOfDay x) // time
    {
        long usecs = read!long;
        x = PGEpochTime + dur!"usecs"(usecs);
    }

    /// Reads a timestamp stored as 8-byte microseconds relative to 2000-01-01 00:00:00.
    void read()(out DateTime x) // timestamp
    {
        long usecs = read!long;
        x = PGEpochDateTime + dur!"usecs"(usecs);
    }

    /// Reads a UTC timestamp (8-byte microseconds) and presents it in the local time zone.
    void read()(out SysTime x) // timestamptz
    {
        long usecs = read!long;
        x = SysTime(PGEpochDateTime + dur!"usecs"(usecs), UTC());
        x.timezone = LocalTime();
    }

    /**
    Reads an interval: microseconds (8 bytes), days (4 bytes), months (4 bytes).

    BUG: Does not support months. The months field is consumed but not
    included in the result.
    */
    void read()(out core.time.Duration x) // interval
    {
        long usecs = read!long;
        int days = read!int;
        int months = read!int; // read only to advance the cursor

        x = dur!"days"(days) + dur!"usecs"(usecs);
    }

    /**
    Reads a time with time zone: microseconds since midnight (8 bytes)
    followed by the zone offset in seconds (4 bytes).

    The result carries the time on the date 0000-01-01 in a SimpleTimeZone
    built from the offset.
    */
    SysTime readTimeTz() // timetz
    {
        TimeOfDay time = read!TimeOfDay;

        // PostgreSQL stores the zone as seconds west of UTC, whereas
        // SimpleTimeZone expects the offset east of UTC, so the sign is
        // flipped. The value is converted from seconds to minutes.
        int zoneMinutes = -(read!int / 60);
        auto stz = new immutable SimpleTimeZone(dur!"minutes"(zoneMinutes));
        return SysTime(DateTime(Date(0, 1, 1), time), stz);
    }

    /**
    Reads a composite value into T.

    Each field is encoded as its type OID, its length (-1 for NULL) and
    its data.
    */
    T readComposite(T)()
    {
        alias DBRow!T Record;

        static if (Record.hasStaticLength)
        {
            alias Record.fieldTypes fieldTypes;

            // Generates one read/assign sequence per statically known field.
            static string genFieldAssigns() // CTFE
            {
                string s = "";

                foreach (i; 0 .. fieldTypes.length)
                {
                    s ~= "read(fieldOid);\n";
                    s ~= "read(fieldLen);\n";
                    s ~= "if (fieldLen == -1)\n";
                    s ~= text("record.setNull!(", i, ");\n");
                    s ~= "else\n";
                    s ~= text("record.set!(fieldTypes[", i, "], ", i, ")(",
                              "readBaseType!(fieldTypes[", i, "])(fieldOid, fieldLen)",
                              ");\n");
                    // text() doesn't work with -inline option, CTFE bug
                }

                return s;
            }
        }

        Record record;

        int fieldCount, fieldLen;
        uint fieldOid;

        read(fieldCount);

        static if (Record.hasStaticLength)
            mixin(genFieldAssigns);
        else
        {
            record.setLength(fieldCount);

            foreach (i; 0 .. fieldCount)
            {
                read(fieldOid);
                read(fieldLen);

                if (fieldLen == -1)
                    record.setNull(i);
                else
                    record[i] = readBaseType!(Record.ElemType)(fieldOid, fieldLen);
            }
        }

        return record.base;
    }

    /// Exposes the element type of an array type as `ElemType`.
    mixin template elmnt(U : U[])
    {
        alias U ElemType;
    }

    /**
    Reads dimension `dim` of an array, recursing into nested dimensions.

    Params:
        lengths    = length of every dimension
        elementOid = type OID of the array elements
        dim        = index of the dimension to read

    Throws: Exception if a NULL element is read into a type that is neither
    nullable nor a string.
    */
    private AT readDimension(AT)(int[] lengths, uint elementOid, int dim)
    {

        mixin elmnt!AT;

        int length = lengths[dim];

        AT array;
        static if (isDynamicArray!AT)
            array.length = length;

        int fieldLen;

        foreach(i; 0 .. length)
        {
            static if (isArray!ElemType && !isSomeString!ElemType)
                array[i] = readDimension!ElemType(lengths, elementOid, dim + 1);
            else
            {
                static if (isNullable!ElemType)
                    alias nullableTarget!ElemType E;
                else
                    alias ElemType E;

                read(fieldLen);
                if (fieldLen == -1)
                {
                    static if (isNullable!ElemType || isSomeString!ElemType)
                        array[i] = null;
                    else
                        throw new Exception("Can't set NULL value to non nullable type");
                }
                else
                    array[i] = readBaseType!E(elementOid, fieldLen);
            }
        }

        return array;
    }

    /**
    Reads an array value into T.

    The header holds the number of dimensions, a has-nulls flag and the
    element type OID, followed by a length and a lower bound for every
    dimension. An array with zero dimensions yields `T.init`.

    Throws: Exception if the dimension count differs from that of T, if
    NULLs are announced for a non-nullable element type, or if a dimension
    length is negative.
    */
    T readArray(T)()
        if (isArray!T)
    {
        alias multiArrayElemType!T U;

        // todo: more validation, better lowerBounds support
        int dims, hasNulls;
        uint elementOid;
        int[] lengths, lowerBounds;

        read(dims);
        read(hasNulls); // 0 or 1
        read(elementOid);

        if (dims == 0)
            return T.init;

        enforce(arrayDimensions!T == dims, "Dimensions of arrays do not match");
        static if (!isNullable!U && !isSomeString!U)
            enforce(!hasNulls, "PostgreSQL returned NULLs but array elements are not Nullable");

        lengths.length = lowerBounds.length = dims;

        foreach(i; 0 .. dims)
        {
            int len;

            read(len);
            read(lowerBounds[i]);

            // A negative length would turn into an enormous allocation
            // when the dimension is sized.
            enforce(len >= 0, "Negative array dimension length");
            lengths[i] = len;
        }

        T array = readDimension!T(lengths, elementOid, 0);

        return array;
    }

    /**
    Reads an enum label of `len` bytes and maps it to the member of T with
    the same name.

    Throws: ConvException if T has no member with that name.
    */
    T readEnum(T)(int len)
    {
        string genCases() // CTFE
        {
            string s;

            foreach (name; __traits(allMembers, T))
            {
                s ~= text(`case "`, name, `": return T.`, name, `;`);
            }

            return s;
        }

        string enumMember = readString(len);

        switch (enumMember)
        {
            mixin(genCases);
            default: throw new ConvException("Can't set enum value '" ~ enumMember ~ "' to enum type " ~ T.stringof);
        }
    }

    /**
    Reads a value of the PostgreSQL type identified by `oid` and converts
    it to T.

    Params:
        oid = type OID of the value
        len = length of the value in bytes; used by the variable-length types

    Throws: ConvException if the PostgreSQL type is unknown or cannot be
    converted to T.
    */
    T readBaseType(T)(uint oid, int len = 0)
    {
        auto convError(T)()
        {
            string* type = oid in baseTypes;
            return new ConvException("Can't convert PostgreSQL's type " ~ (type ? *type : to!string(oid)) ~ " to " ~ T.stringof);
        }

        switch (oid)
        {
            case 16: // bool
                static if (isConvertible!(T, bool))
                    return _to!T(read!bool);
                else
                    throw convError!T();
            case 26, 24, 2202, 2203, 2204, 2205, 2206, 3734, 3769: // oid and reg*** aliases
                static if (isConvertible!(T, uint))
                    return _to!T(read!uint);
                else
                    throw convError!T();
            case 21: // int2
                static if (isConvertible!(T, short))
                    return _to!T(read!short);
                else
                    throw convError!T();
            case 23: // int4
                static if (isConvertible!(T, int))
                    return _to!T(read!int);
                else
                    throw convError!T();
            case 20: // int8
                static if (isConvertible!(T, long))
                    return _to!T(read!long);
                else
                    throw convError!T();
            case 700: // float4
                static if (isConvertible!(T, float))
                    return _to!T(read!float);
                else
                    throw convError!T();
            case 701: // float8
                static if (isConvertible!(T, double))
                    return _to!T(read!double);
                else
                    throw convError!T();
            case 1042, 1043, 25, 19, 705: // bpchar, varchar, text, name, unknown
                static if (isConvertible!(T, string))
                    return _to!T(readString(len));
                else
                    throw convError!T();
            case 17: // bytea
                static if (isConvertible!(T, ubyte[]))
                    return _to!T(read!(ubyte[])(len));
                else
                    throw convError!T();
            case 2950: // UUID
                static if(isConvertible!(T, UUID))
                    return _to!T(read!UUID());
                else
                    throw convError!T();
            case 18: // "char"
                static if (isConvertible!(T, char))
                    return _to!T(read!char);
                else
                    throw convError!T();
            case 1082: // date
                static if (isConvertible!(T, Date))
                    return _to!T(read!Date);
                else
                    throw convError!T();
            case 1083: // time
                static if (isConvertible!(T, TimeOfDay))
                    return _to!T(read!TimeOfDay);
                else
                    throw convError!T();
            case 1114: // timestamp
                static if (isConvertible!(T, DateTime))
                    return _to!T(read!DateTime);
                else
                    throw convError!T();
            case 1184: // timestamptz
                static if (isConvertible!(T, SysTime))
                    return _to!T(read!SysTime);
                else
                    throw convError!T();
            case 1186: // interval
                static if (isConvertible!(T, core.time.Duration))
                    return _to!T(read!(core.time.Duration));
                else
                    throw convError!T();
            case 1266: // timetz
                static if (isConvertible!(T, SysTime))
                    return _to!T(readTimeTz);
                else
                    throw convError!T();
            case 2249: // record and other composite types
                static if (isVariantN!T && T.allowed!(Variant[]))
                    return T(readComposite!(Variant[]));
                else
                    return readComposite!T;
            case 2287: // _record and other arrays
                static if (isArray!T && !isSomeString!T)
                    return readArray!T;
                else static if (isVariantN!T && T.allowed!(Variant[]))
                    return T(readArray!(Variant[]));
                else
                    throw convError!T();
            case 114: //JSON
                static if (isConvertible!(T, string))
                    return _to!T(readString(len));
                else
                    throw convError!T();
            default:
                // OIDs of user-defined types are resolved through the
                // connection's type tables.
                if (oid in conn.arrayTypes)
                    goto case 2287;
                else if (oid in conn.compositeTypes)
                    goto case 2249;
                else if (oid in conn.enumTypes)
                {
                    static if (is(T == enum))
                        return readEnum!T(len);
                    else static if (isConvertible!(T, string))
                        return _to!T(readString(len));
                    else
                        throw convError!T();
                }
        }

        throw convError!T();
    }
}
771 
// Conversion helper: defers to std.conv.to for ordinary targets and uses
// plain initialization for VariantN targets, which std.conv currently
// doesn't support.
template _to(T)
{
    static if (!isVariantN!T)
    {
        T _to(A...)(A args)
        {
            return std.conv.to!T(args);
        }
    }
    else
    {
        T _to(S)(S value)
        {
            T t = value;
            return t;
        }
    }
}
780 
/// True if a value of type S can be converted to T with `_to`, or if T is a VariantN that allows S.
template isConvertible(T, S)
{
    static if (__traits(compiles, { S source; _to!T(source); }))
        enum isConvertible = true;
    else static if (isVariantN!T && T.allowed!S)
        enum isConvertible = true;
    else
        enum isConvertible = false;
}
788 
/// Number of array dimensions of a type; strings count as elements, not as arrays.
template arrayDimensions(T : T[])
{
    // Inside this template T names the element type.
    static if (!isArray!T || isSomeString!T)
        enum arrayDimensions = 1;
    else
        enum arrayDimensions = 1 + arrayDimensions!T;
}

/// ditto
template arrayDimensions(T)
{
    enum arrayDimensions = 0;
}
801 
/// Innermost element type of a possibly multi-dimensional array; strings count as elements.
template multiArrayElemType(T : T[])
{
    // Inside this template T names the element type.
    static if (!isArray!T || isSomeString!T)
        alias T multiArrayElemType;
    else
        alias multiArrayElemType!T multiArrayElemType;
}

/// ditto
template multiArrayElemType(T)
{
    alias T multiArrayElemType;
}
814 
// Compile-time checks of arrayDimensions for zero to three dimensions.
static assert(arrayDimensions!int == 0);
static assert(arrayDimensions!(int[]) == 1);
static assert(arrayDimensions!(int[][]) == 2);
static assert(arrayDimensions!(int[][][]) == 3);
819 
/// Transaction state of a connection, identified by a single character code.
enum TransactionStatus : char
{
    OutsideTransaction      = 'I',
    InsideTransaction       = 'T',
    InsideFailedTransaction = 'E'
}
821 
/// Names of built-in PostgreSQL types by type OID; used when building conversion error messages.
enum string[int] baseTypes = [
    16   : "bool",
    17   : "bytea",
    18   : `"char"`,  // "char" - 1 byte internal type
    19   : "name",
    20   : "int8",
    21   : "int2",
    23   : "int4",
    25   : "text",
    700  : "float4",
    701  : "float8",
    1042 : "bpchar",  // char(n) - blank padded
    1043 : "varchar",
    1700 : "numeric"
];
841 
842 public:
843 
/// Type OIDs of the PostgreSQL types that can be named explicitly.
enum PGType : int
{
    OID         = 26,
    NAME        = 19,
    REGPROC     = 24,
    BOOLEAN     = 16,
    BYTEA       = 17,
    CHAR        = 18,   // 1 byte "char", used internally in PostgreSQL
    BPCHAR      = 1042, // Blank Padded char(n), fixed size
    VARCHAR     = 1043,
    TEXT        = 25,
    INT2        = 21,
    INT4        = 23,
    INT8        = 20,
    FLOAT4      = 700,
    FLOAT8      = 701,

    // Date and time types.
    // reference https://github.com/lpsmith/postgresql-simple/blob/master/src/Database/PostgreSQL/Simple/TypeInfo/Static.hs#L74
    DATE        = 1082,
    TIME        = 1083,
    TIMESTAMP   = 1114,
    TIMESTAMPTZ = 1184,
    INTERVAL    = 1186,
    TIMETZ      = 1266,

    JSON        = 114,
    JSONARRAY   = 199
}
872 
/// Exception reporting a problem with a query parameter.
class ParamException : Exception
{
    /// Creates the exception with a message and the source location it originates from.
    this(string msg,
         string fn = __FILE__,
         size_t ln = __LINE__) @safe pure nothrow
    {
        super(msg, fn, ln);
    }
}
880 
/// Exception thrown on server error
class ServerErrorException: Exception
{
    /// Contains information about this _error. Aliased to this.
    ResponseMessage error;
    alias error this;

    /// Creates the exception from a plain message; `error` is left unset.
    this(string msg,
         string fn = __FILE__,
         size_t ln = __LINE__) @safe pure nothrow
    {
        super(msg, fn, ln);
    }

    /// Creates the exception from a server response; its summary becomes the exception message.
    this(ResponseMessage error,
         string fn = __FILE__,
         size_t ln = __LINE__)
    {
        immutable string summary = error.toString();
        super(summary, fn, ln);
        this.error = error;
    }
}
899 
/**
Class encapsulating errors and notices.

This class provides access to fields of ErrorResponse and NoticeResponse
sent by the server. More information about these fields can be found
$(LINK2 http://www.postgresql.org/docs/9.0/static/protocol-error-fields.html,here).
*/
class ResponseMessage
{
    // Field values keyed by their one-character field type code.
    private string[char] fields;

    // Returns the value of the given field, or an empty string if it is absent.
    private string getOptional(char type)
    {
        return fields.get(type, "");
    }

    /// Message fields
    @property string severity() { return fields['S']; }

    /// ditto
    @property string code() { return fields['C']; }

    /// ditto
    @property string message() { return fields['M']; }

    /// ditto
    @property string detail() { return getOptional('D'); }

    /// ditto
    @property string hint() { return getOptional('H'); }

    /// ditto
    @property string position() { return getOptional('P'); }

    /// ditto
    @property string internalPosition() { return getOptional('p'); }

    /// ditto
    @property string internalQuery() { return getOptional('q'); }

    /// ditto
    @property string where() { return getOptional('W'); }

    /// ditto
    @property string file() { return getOptional('F'); }

    /// ditto
    @property string line() { return getOptional('L'); }

    /// ditto
    @property string routine() { return getOptional('R'); }

    /**
    Returns summary of this message using the most common fields (severity,
    code, message, detail, hint)
    */
    override string toString()
    {
        string summary = severity ~ ' ' ~ code ~ ": " ~ message;

        // Detail and hint are appended only when the field is present.
        if (auto detailText = 'D' in fields)
            summary ~= "\nDETAIL: " ~ *detailText;

        if (auto hintText = 'H' in fields)
            summary ~= "\nHINT: " ~ *hintText;

        return summary;
    }
}
1008 
1009 /**
1010 Class representing connection to PostgreSQL server.
1011 */
1012 class PGConnection
1013 {
1014     private:
1015         PGStream stream;
1016         string[string] serverParams;
1017         int serverProcessID;
1018         int serverSecretKey;
1019         TransactionStatus trStatus;
1020         ulong lastPrepared = 0;
1021         uint[uint] arrayTypes;
1022         uint[][uint] compositeTypes;
1023         string[uint][uint] enumTypes;
1024         bool activeResultSet;
1025 
1026         string reservePrepared()
1027         {
1028             synchronized (this)
1029             {
1030 
1031                 return to!string(lastPrepared++);
1032             }
1033         }
1034 
1035         Message getMessage()
1036         {
1037 
1038             char type;
1039             int len;
1040 			ubyte[1] ub;
1041 			stream.read(ub); // message type
1042 
1043 			type = bigEndianToNative!char(ub);
1044 			ubyte[4] ubi;
1045 			stream.read(ubi); // message length, doesn't include type byte
1046 
1047 			len = bigEndianToNative!int(ubi) - 4;
1048 
1049             ubyte[] msg;
1050             if (len > 0)
1051             {
1052                 msg = new ubyte[len];
1053                 stream.read(msg);
1054             }
1055 
1056             return Message(this, type, msg);
1057         }
1058 
1059         void sendStartupMessage(const string[string] params)
1060         {
1061             bool localParam(string key)
1062             {
1063                 switch (key)
1064                 {
1065                     case "host", "port", "password": return true;
1066                     default: return false;
1067                 }
1068             }
1069 
1070             int len = 9; // length (int), version number (int) and parameter-list's delimiter (byte)
1071 
1072             foreach (key, value; params)
1073             {
1074                 if (localParam(key))
1075                     continue;
1076 
1077                 len += key.length + value.length + 2;
1078             }
1079 
1080             stream.write(len);
1081             stream.write(0x0003_0000); // version number 3
1082             foreach (key, value; params)
1083             {
1084                 if (localParam(key))
1085                     continue;
1086                 stream.writeCString(key);
1087                 stream.writeCString(value);
1088             }
1089 		stream.write(cast(ubyte)0);
1090 	}
1091 
1092         void sendPasswordMessage(string password)
1093         {
1094             int len = cast(int)(4 + password.length + 1);
1095 
1096             stream.write('p');
1097             stream.write(len);
1098             stream.writeCString(password);
1099         }
1100 
1101         void sendParseMessage(string statementName, string query, int[] oids)
1102         {
1103             int len = cast(int)(4 + statementName.length + 1 + query.length + 1 + 2 + oids.length * 4);
1104 
1105             stream.write('P');
1106             stream.write(len);
1107             stream.writeCString(statementName);
1108             stream.writeCString(query);
1109             stream.write(cast(short)oids.length);
1110 
1111             foreach (oid; oids)
1112                 stream.write(oid);
1113         }
1114 
1115         void sendCloseMessage(DescribeType type, string name)
1116 		{
1117 			stream.write('C');
1118             stream.write(cast(int)(4 + 1 + name.length + 1));
1119             stream.write(cast(char)type);
1120             stream.writeCString(name);
1121         }
1122 
1123         void sendTerminateMessage()
1124 		{
1125 			stream.write('X');
1126             stream.write(cast(int)4);
1127         }
1128 
1129         void sendBindMessage(string portalName, string statementName, PGParameters params)
1130         {
1131             int paramsLen = 0;
1132             bool hasText = false;
1133 
1134 			foreach (param; params)
1135             {
1136                 enforce(param.value.hasValue, new ParamException("Parameter $" ~ to!string(param.index) ~ " value is not initialized"));
1137 
1138                 void checkParam(T)(int len)
1139                 {
1140                     if (param.value != null)
1141                     {
1142                         enforce(param.value.convertsTo!T, new ParamException("Parameter's value is not convertible to " ~ T.stringof));
1143                         paramsLen += len;
1144                     }
1145                 }
1146 
1147                 /*final*/ switch (param.type)
1148                 {
1149                     case PGType.INT2: checkParam!short(2); break;
1150                     case PGType.INT4: checkParam!int(4); break;
1151                     case PGType.INT8: checkParam!long(8); break;
1152                     case PGType.TEXT:
1153                         paramsLen += param.value.coerce!string.length;
1154                         hasText = true;
1155                         break;
1156                     case PGType.BYTEA:
1157                         paramsLen += param.value.length;
1158                         break;
1159                     case PGType.JSON:
1160                         paramsLen += param.value.coerce!string.length; // TODO: object serialisation
1161                         break;
1162                     case PGType.DATE:
1163                         paramsLen += 4; break;
1164                     case PGType.FLOAT4: checkParam!float(4); break;
1165                     case PGType.FLOAT8: checkParam!double(8); break;
1166                     case PGType.BOOLEAN: checkParam!bool(1); break;
1167                     default: assert(0, "Not implemented " ~ to!string(param.type));
1168                 }
1169             }
1170 
1171             int len = cast(int)( 4 + portalName.length + 1 + statementName.length + 1 + (hasText ? (params.length*2) : 2) + 2 + 2 +
1172                 params.length * 4 + paramsLen + 2 + 2 );
1173 
1174             stream.write('B');
1175             stream.write(len);
1176             stream.writeCString(portalName);
1177             stream.writeCString(statementName);
1178             if(hasText)
1179             {
1180                 stream.write(cast(short) params.length);
1181                 foreach(param; params)
1182                     if(param.type == PGType.TEXT)
1183                         stream.write(cast(short) 0); // text format
1184                     else
1185                         stream.write(cast(short) 1); // binary format
1186             } else {
1187                 stream.write(cast(short)1); // one parameter format code
1188                 stream.write(cast(short)1); // binary format
1189             }
1190             stream.write(cast(short)params.length);
1191 
1192             foreach (param; params)
1193             {
1194                 if (param.value == null)
1195                 {
1196                     stream.write(-1);
1197                     continue;
1198                 }
1199 
1200                 switch (param.type)
1201                 {
1202                     case PGType.INT2:
1203                         stream.write(cast(int)2);
1204                         stream.write(param.value.coerce!short);
1205                         break;
1206                     case PGType.INT4:
1207                         stream.write(cast(int)4);
1208                         stream.write(param.value.coerce!int);
1209                         break;
1210                     case PGType.INT8:
1211                         stream.write(cast(int)8);
1212                         stream.write(param.value.coerce!long);
1213                         break;
1214                     case PGType.FLOAT4:
1215                         stream.write(cast(int)4);
1216                         stream.write(param.value.coerce!float);
1217                         break;
1218                     case PGType.FLOAT8:
1219                         stream.write(cast(int)8);
1220                         stream.write(param.value.coerce!double);
1221                         break;
1222                     case PGType.TEXT:
1223                         auto s = param.value.coerce!string;
1224                         stream.write(cast(int) s.length);
1225                         if(s.length) stream.write(cast(ubyte[]) s);
1226                         break;
1227                     case PGType.BYTEA:
1228                         auto s = param.value;
1229                         stream.write(cast(int) s.length);
1230 
1231                         ubyte[] x;
1232                         x.length = s.length;
1233                         for (int i = 0; i < x.length; i++) {
1234                             x[i] = s[i].get!(ubyte);
1235                         }
1236                         stream.write(x);
1237                         break;
1238                     case PGType.JSON:
1239                         auto s = param.value.coerce!string;
1240                         stream.write(cast(int) s.length);
1241                         stream.write(cast(ubyte[]) s);
1242                         break;
1243                     case PGType.DATE:
1244                         stream.write(cast(int) 4);
1245                         stream.write(Date.fromISOString(param.value.coerce!string));
1246                         break;
1247                     case PGType.BOOLEAN:
1248                         stream.write(cast(int) 1);
1249                         stream.write(param.value.coerce!bool);
1250                         break;
1251                     default:
1252 						assert(0, "Not implemented " ~ to!string(param.type));
1253                 }
1254             }
1255 
1256             stream.write(cast(short)1); // one result format code
1257             stream.write(cast(short)1); // binary format
1258         }
1259 
1260         enum DescribeType : char { Statement = 'S', Portal = 'P' }
1261 
1262         void sendDescribeMessage(DescribeType type, string name)
1263 		{
1264 			stream.write('D');
1265             stream.write(cast(int)(4 + 1 + name.length + 1));
1266             stream.write(cast(char)type);
1267             stream.writeCString(name);
1268         }
1269 
1270         void sendExecuteMessage(string portalName, int maxRows)
1271 		{
1272 			stream.write('E');
1273             stream.write(cast(int)(4 + portalName.length + 1 + 4));
1274             stream.writeCString(portalName);
1275             stream.write(cast(int)maxRows);
1276         }
1277 
1278         void sendFlushMessage()
1279 		{
1280 			stream.write('H');
1281             stream.write(cast(int)4);
1282         }
1283 
1284         void sendSyncMessage()
1285 		{
1286 			stream.write('S');
1287             stream.write(cast(int)4);
1288         }
1289 
1290         ResponseMessage handleResponseMessage(Message msg)
1291         {
1292             enforce(msg.data.length >= 2);
1293 
1294 			char ftype;
1295             string fvalue;
1296             ResponseMessage response = new ResponseMessage;
1297 
1298             while (true)
1299             {
1300                 msg.read(ftype);
1301                 if(ftype <=0) break;
1302 
1303                 msg.readCString(fvalue);
1304                 response.fields[ftype] = fvalue;
1305             }
1306 
1307             return response;
1308         }
1309 
1310         void checkActiveResultSet()
1311         {
1312             enforce(!activeResultSet, "There's active result set, which must be closed first.");
1313         }
1314 
1315         void prepare(string statementName, string query, PGParameters params)
1316         {
1317             checkActiveResultSet();
1318             sendParseMessage(statementName, query, params.getOids());
1319 
1320             sendFlushMessage();
1321 
1322 	receive:
1323 
1324             Message msg = getMessage();
1325 
1326 		switch (msg.type)
1327             {
1328                 case 'E':
1329                     // ErrorResponse
1330                     ResponseMessage response = handleResponseMessage(msg);
1331                     sendSyncMessage();
1332                     throw new ServerErrorException(response);
1333                 case '1':
1334                     // ParseComplete
1335                     return;
1336                 default:
1337                     // async notice, notification
1338                     goto receive;
1339             }
1340         }
1341 
1342         void unprepare(string statementName)
1343         {
1344             checkActiveResultSet();
1345             sendCloseMessage(DescribeType.Statement, statementName);
1346             sendFlushMessage();
1347 
1348         receive:
1349 
1350             Message msg = getMessage();
1351 
1352             switch (msg.type)
1353             {
1354                 case 'E':
1355                     // ErrorResponse
1356                     ResponseMessage response = handleResponseMessage(msg);
1357                     throw new ServerErrorException(response);
1358                 case '3':
1359                     // CloseComplete
1360                     return;
1361                 default:
1362                     // async notice, notification
1363                     goto receive;
1364             }
1365         }
1366 
1367         PGFields bind(string portalName, string statementName, PGParameters params)
1368         {
1369             checkActiveResultSet();
1370             sendCloseMessage(DescribeType.Portal, portalName);
1371             sendBindMessage(portalName, statementName, params);
1372             sendDescribeMessage(DescribeType.Portal, portalName);
1373             sendFlushMessage();
1374 
1375         receive:
1376 
1377             Message msg = getMessage();
1378 
1379             switch (msg.type)
1380             {
1381                 case 'E':
1382                     // ErrorResponse
1383                     ResponseMessage response = handleResponseMessage(msg);
1384                     sendSyncMessage();
1385                     throw new ServerErrorException(response);
1386                 case '3':
1387                     // CloseComplete
1388                     goto receive;
1389                 case '2':
1390                     // BindComplete
1391                     goto receive;
1392                 case 'T':
1393                     // RowDescription (response to Describe)
1394                     PGField[] fields;
1395                     short fieldCount;
1396                     short formatCode;
1397                     PGField fi;
1398 
1399                     msg.read(fieldCount);
1400 
1401                     fields.length = fieldCount;
1402 
1403                     foreach (i; 0..fieldCount)
1404                     {
1405                         msg.readCString(fi.name);
1406                         msg.read(fi.tableOid);
1407                         msg.read(fi.index);
1408                         msg.read(fi.oid);
1409                         msg.read(fi.typlen);
1410                         msg.read(fi.modifier);
1411                         msg.read(formatCode);
1412 
1413                         enforce(formatCode == 1, new Exception("Field's format code returned in RowDescription is not 1 (binary)"));
1414 
1415                         fields[i] = fi;
1416                     }
1417 
1418                     return cast(PGFields)fields;
1419                 case 'n':
1420                     // NoData (response to Describe)
1421                     return new immutable(PGField)[0];
1422                 default:
1423                     // async notice, notification
1424                     goto receive;
1425             }
1426         }
1427 
1428         ulong executeNonQuery(string portalName, out uint oid)
1429         {
1430             checkActiveResultSet();
1431             ulong rowsAffected = 0;
1432 
1433             sendExecuteMessage(portalName, 0);
1434             sendSyncMessage();
1435             sendFlushMessage();
1436 
1437         receive:
1438 
1439             Message msg = getMessage();
1440 
1441             switch (msg.type)
1442             {
1443                 case 'E':
1444                     // ErrorResponse
1445                     ResponseMessage response = handleResponseMessage(msg);
1446                     throw new ServerErrorException(response);
1447                 case 'D':
1448                     // DataRow
1449                     finalizeQuery();
1450                     throw new Exception("This query returned rows.");
1451                 case 'C':
1452                     // CommandComplete
1453                     string tag;
1454 
1455                     msg.readCString(tag);
1456 
1457                     // GDC indexOf name conflict in std.string and std.algorithm
1458                     auto s1 = std..string.indexOf(tag, ' ');
1459                     if (s1 >= 0) {
1460                         switch (tag[0 .. s1]) {
1461                             case "INSERT":
1462                                 // INSERT oid rows
1463                                 auto s2 = lastIndexOf(tag, ' ');
1464                                 assert(s2 > s1);
1465                                 oid = to!uint(tag[s1 + 1 .. s2]);
1466                                 rowsAffected = to!ulong(tag[s2 + 1 .. $]);
1467                                 break;
1468                             case "DELETE", "UPDATE", "MOVE", "FETCH":
1469                                 // DELETE rows
1470                                 rowsAffected = to!ulong(tag[s1 + 1 .. $]);
1471                                 break;
1472                             default:
1473                                 // CREATE TABLE
1474                                 break;
1475                          }
1476                     }
1477 
1478                     goto receive;
1479 
1480                 case 'I':
1481                     // EmptyQueryResponse
1482                     goto receive;
1483                 case 'Z':
1484                     // ReadyForQuery
1485                     return rowsAffected;
1486                 default:
1487                     // async notice, notification
1488                     goto receive;
1489             }
1490         }
1491 
1492         DBRow!Specs fetchRow(Specs...)(ref Message msg, ref PGFields fields)
1493         {
1494             alias DBRow!Specs Row;
1495 
1496             static if (Row.hasStaticLength)
1497             {
1498                 alias Row.fieldTypes fieldTypes;
1499 
1500                 static string genFieldAssigns() // CTFE
1501                 {
1502                     string s = "";
1503 
1504                     foreach (i; 0 .. fieldTypes.length)
1505                     {
1506                         s ~= "msg.read(fieldLen);\n";
1507                         s ~= "if (fieldLen == -1)\n";
1508                         s ~= text("row.setNull!(", i, ")();\n");
1509                         s ~= "else\n";
1510                         s ~= text("row.set!(fieldTypes[", i, "], ", i, ")(",
1511                                   "msg.readBaseType!(fieldTypes[", i, "])(fields[", i, "].oid, fieldLen)",
1512                                   ");\n");
1513                         // text() doesn't work with -inline option, CTFE bug
1514                     }
1515 
1516                     return s;
1517                 }
1518             }
1519 
1520             Row row;
1521             short fieldCount;
1522             int fieldLen;
1523 
1524             msg.read(fieldCount);
1525 
1526             static if (Row.hasStaticLength)
1527             {
1528                 Row.checkReceivedFieldCount(fieldCount);
1529                 mixin(genFieldAssigns);
1530             }
1531             else
1532             {
1533                 row.setLength(fieldCount);
1534 
1535                 foreach (i; 0 .. fieldCount)
1536                 {
1537                     msg.read(fieldLen);
1538                     if (fieldLen == -1)
1539                         row.setNull(i);
1540                     else
1541                         row[i] = msg.readBaseType!(Row.ElemType)(fields[i].oid, fieldLen);
1542                 }
1543             }
1544 
1545             return row;
1546         }
1547 
1548         void finalizeQuery()
1549         {
1550             Message msg;
1551 
1552             do
1553             {
1554                 msg = getMessage();
1555 
1556                 // TODO: process async notifications
1557             }
1558             while (msg.type != 'Z'); // ReadyForQuery
1559         }
1560 
1561         PGResultSet!Specs executeQuery(Specs...)(string portalName, ref PGFields fields)
1562         {
1563             checkActiveResultSet();
1564 
1565             PGResultSet!Specs result = new PGResultSet!Specs(this, fields, &fetchRow!Specs);
1566 
1567             ulong rowsAffected = 0;
1568 
1569             sendExecuteMessage(portalName, 0);
1570             sendSyncMessage();
1571             sendFlushMessage();
1572 
1573         receive:
1574 
1575             Message msg = getMessage();
1576 
1577             switch (msg.type)
1578             {
1579                 case 'D':
1580                     // DataRow
1581                     alias DBRow!Specs Row;
1582 
1583                     result.row = fetchRow!Specs(msg, fields);
1584                     static if (!Row.hasStaticLength)
1585                         result.row.columnToIndex = &result.columnToIndex;
1586                     result.validRow = true;
1587                     result.nextMsg = getMessage();
1588 
1589                     activeResultSet = true;
1590 
1591                     return result;
1592                 case 'C':
1593                     // CommandComplete
1594                     string tag;
1595 
1596                     msg.readCString(tag);
1597 
1598                     auto s2 = lastIndexOf(tag, ' ');
1599                     if (s2 >= 0)
1600                     {
1601                         rowsAffected = to!ulong(tag[s2 + 1 .. $]);
1602                     }
1603 
1604                     goto receive;
1605                 case 'I':
1606                     // EmptyQueryResponse
1607                     throw new Exception("Query string is empty.");
1608                 case 's':
1609                     // PortalSuspended
1610                     throw new Exception("Command suspending is not supported.");
1611                 case 'Z':
1612                     // ReadyForQuery
1613                     result.nextMsg = msg;
1614                     return result;
1615                 case 'E':
1616                     // ErrorResponse
1617                     ResponseMessage response = handleResponseMessage(msg);
1618                     throw new ServerErrorException(response);
1619                 default:
1620                     // async notice, notification
1621                     goto receive;
1622             }
1623 
1624             assert(0);
1625         }
1626 
1627     public:
1628 
1629 
1630         /**
1631         Opens connection to server.
1632 
1633         Params:
1634         params = Associative array of string keys and values.
1635 
1636         Currently recognized parameters are:
1637         $(UL
1638             $(LI host - Host name or IP address of the server. Required.)
1639             $(LI port - Port number of the server. Defaults to 5432.)
1640             $(LI user - The database user. Required.)
1641             $(LI database - The database to connect to. Defaults to the user name.)
1642             $(LI options - Command-line arguments for the backend. (This is deprecated in favor of setting individual run-time parameters.))
1643         )
1644 
1645         In addition to the above, any run-time parameter that can be set at backend start time might be listed.
1646         Such settings will be applied during backend start (after parsing the command-line options if any).
1647         The values will act as session defaults.
1648 
1649         Examples:
1650         ---
1651         auto conn = new PGConnection([
1652             "host" : "localhost",
1653             "database" : "test",
1654             "user" : "postgres",
1655             "password" : "postgres"
1656         ]);
1657         ---
1658         */
1659         this(const string[string] params)
1660         {
1661             enforce("host" in params, new ParamException("Required parameter 'host' not found"));
1662             enforce("user" in params, new ParamException("Required parameter 'user' not found"));
1663 
1664             string[string] p = cast(string[string])params.dup;
1665 
1666             ushort port = "port" in params? parse!ushort(p["port"]) : 5432;
1667 
1668             version(Have_vibe_d_core)
1669             {
1670                 stream = new PGStream(new TCPConnectionWrapper(params["host"], port));
1671             }
1672             else
1673             {
1674                 stream = new PGStream(new TcpSocket);
1675                 stream.socket.connect(new InternetAddress(params["host"], port));
1676             }
1677             sendStartupMessage(params);
1678 
1679         receive:
1680 
1681     		Message msg = getMessage();
1682 
1683             switch (msg.type)
1684             {
1685                 case 'E', 'N':
1686                     // ErrorResponse, NoticeResponse
1687 
1688                     ResponseMessage response = handleResponseMessage(msg);
1689 
1690 				    if (msg.type == 'N')
1691                         goto receive;
1692 
1693                     throw new ServerErrorException(response);
1694                 case 'R':
1695                     // AuthenticationXXXX
1696                     enforce(msg.data.length >= 4);
1697 
1698                     int atype;
1699 
1700                     msg.read(atype);
1701 
1702                     switch (atype)
1703                     {
1704                         case 0:
1705                             // authentication successful, now wait for another messages
1706                             goto receive;
1707                         case 3:
1708                             // clear-text password is required
1709                             enforce("password" in params, new ParamException("Required parameter 'password' not found"));
1710                             enforce(msg.data.length == 4);
1711 
1712                             sendPasswordMessage(params["password"]);
1713 
1714                             goto receive;
1715                         case 5:
1716                             // MD5-hashed password is required, formatted as:
1717                             // "md5" + md5(md5(password + username) + salt)
1718                             // where md5() returns lowercase hex-string
1719                             enforce("password" in params, new ParamException("Required parameter 'password' not found"));
1720                             enforce(msg.data.length == 8);
1721 
1722                             char[3 + 32] password;
1723                             password[0 .. 3] = "md5";
1724                             password[3 .. $] = MD5toHex(MD5toHex(
1725                                 params["password"], params["user"]), msg.data[4 .. 8]);
1726 
1727                             sendPasswordMessage(to!string(password));
1728 
1729                             goto receive;
1730                         default:
1731                             // non supported authentication type, close connection
1732                             this.close();
1733                             throw new Exception("Unsupported authentication type");
1734                     }
1735 
1736                 case 'S':
1737                     // ParameterStatus
1738                     enforce(msg.data.length >= 2);
1739 
1740                     string pname, pvalue;
1741 
1742                     msg.readCString(pname);
1743                     msg.readCString(pvalue);
1744 
1745                     serverParams[pname] = pvalue;
1746 
1747                     goto receive;
1748 
1749                 case 'K':
1750                     // BackendKeyData
1751                     enforce(msg.data.length == 8);
1752 
1753                     msg.read(serverProcessID);
1754                     msg.read(serverSecretKey);
1755 
1756                     goto receive;
1757 
1758                 case 'Z':
1759                     // ReadyForQuery
1760                     enforce(msg.data.length == 1);
1761 
1762                     msg.read(cast(char)trStatus);
1763 
1764                     // check for validity
1765                     switch (trStatus)
1766                     {
1767                         case 'I', 'T', 'E': break;
1768                         default: throw new Exception("Invalid transaction status");
1769                     }
1770 
1771                     // connection is opened and now it's possible to send queries
1772                     reloadAllTypes();
1773                     return;
1774                 default:
1775                     // unknown message type, ignore it
1776                     goto receive;
1777             }
1778         }
1779 
1780         /// Closes current connection to the server.
1781         void close()
1782         {
1783             sendTerminateMessage();
1784             stream.socket.close();
1785         }
1786 
1787         /// Shorthand methods using temporary PGCommand. Semantics is the same as PGCommand's.
1788         ulong executeNonQuery(string query)
1789         {
1790             scope cmd = new PGCommand(this, query);
1791             return cmd.executeNonQuery();
1792         }
1793 
1794         /// ditto
1795         PGResultSet!Specs executeQuery(Specs...)(string query)
1796         {
1797             scope cmd = new PGCommand(this, query);
1798             return cmd.executeQuery!Specs();
1799         }
1800 
1801         /// ditto
1802         DBRow!Specs executeRow(Specs...)(string query, bool throwIfMoreRows = true)
1803         {
1804             scope cmd = new PGCommand(this, query);
1805             return cmd.executeRow!Specs(throwIfMoreRows);
1806         }
1807 
1808         /// ditto
1809         T executeScalar(T)(string query, bool throwIfMoreRows = true)
1810         {
1811             scope cmd = new PGCommand(this, query);
1812             return cmd.executeScalar!T(throwIfMoreRows);
1813         }
1814 
1815         void reloadArrayTypes()
1816         {
1817             auto cmd = new PGCommand(this, "SELECT oid, typelem FROM pg_type WHERE typcategory = 'A'");
1818             auto result = cmd.executeQuery!(uint, "arrayOid", uint, "elemOid");
1819             scope(exit) result.close;
1820 
1821             arrayTypes = null;
1822 
1823             foreach (row; result)
1824             {
1825                 arrayTypes[row.arrayOid] = row.elemOid;
1826             }
1827 
1828             arrayTypes.rehash;
1829         }
1830 
1831         void reloadCompositeTypes()
1832         {
1833             auto cmd = new PGCommand(this, "SELECT a.attrelid, a.atttypid FROM pg_attribute a JOIN pg_type t ON
1834                                      a.attrelid = t.typrelid WHERE a.attnum > 0 ORDER BY a.attrelid, a.attnum");
1835             auto result = cmd.executeQuery!(uint, "typeOid", uint, "memberOid");
1836             scope(exit) result.close;
1837 
1838             compositeTypes = null;
1839 
1840             uint lastOid = 0;
1841             uint[]* memberOids;
1842 
1843             foreach (row; result)
1844             {
1845                 if (row.typeOid != lastOid)
1846                 {
1847                     compositeTypes[lastOid = row.typeOid] = new uint[0];
1848                     memberOids = &compositeTypes[lastOid];
1849                 }
1850 
1851                 *memberOids ~= row.memberOid;
1852             }
1853 
1854             compositeTypes.rehash;
1855         }
1856 
1857         void reloadEnumTypes()
1858         {
1859             auto cmd = new PGCommand(this, "SELECT enumtypid, oid, enumlabel FROM pg_enum ORDER BY enumtypid, oid");
1860             auto result = cmd.executeQuery!(uint, "typeOid", uint, "valueOid", string, "valueLabel");
1861             scope(exit) result.close;
1862 
1863             enumTypes = null;
1864 
1865             uint lastOid = 0;
1866             string[uint]* enumValues;
1867 
1868             foreach (row; result)
1869             {
1870                 if (row.typeOid != lastOid)
1871                 {
1872                     if (lastOid > 0)
1873                         (*enumValues).rehash;
1874 
1875                     enumTypes[lastOid = row.typeOid] = null;
1876                     enumValues = &enumTypes[lastOid];
1877                 }
1878 
1879                 (*enumValues)[row.valueOid] = row.valueLabel;
1880             }
1881 
1882             if (lastOid > 0)
1883                 (*enumValues).rehash;
1884 
1885             enumTypes.rehash;
1886         }
1887 
1888         void reloadAllTypes()
1889         {
1890             // todo: make simpler type lists, since we need only oids of types (without their members)
1891             reloadArrayTypes();
1892             reloadCompositeTypes();
1893             reloadEnumTypes();
1894         }
1895 }
1896 
/// A single query parameter: its position, its declared type and the bound value
class PGParameter
{
    private PGParameters params;
    immutable short index;
    immutable PGType type;
    private Variant _value;

    /// Creates a parameter owned by params; index must be greater than 0.
    private this(PGParameters params, short index, PGType type)
    {
        enforce(index > 0, new ParamException("Parameter's index must be > 0"));

        this.params = params;
        this.index = index;
        this.type = type;
    }

    /// Value bound to this parameter
    @property Variant value()
    {
        return _value;
    }

    /// Binds a new value, flags the owning collection as changed and returns the stored value
    @property Variant value(T)(T v)
    {
        params.changed = true;
        _value = Variant(v);
        return _value;
    }
}
1925 
/// Collection of query parameters, keyed by parameter index
class PGParameters
{
    private PGParameter[short] params;
    private PGCommand cmd;
    // Set whenever a parameter is added or a value is assigned.
    private bool changed;

    // Returns the type of every parameter as int, ordered by ascending parameter index.
    private int[] getOids()
    {
        auto sortedKeys = params.keys;
        sort(sortedKeys);

        auto typeOids = new int[params.length];

        foreach (pos, paramIndex; sortedKeys)
            typeOids[pos] = params[paramIndex].type;

        return typeOids;
    }

    /// Number of parameters currently in the collection.
    @property short length()
    {
        return cast(short)params.length;
    }

    private this(PGCommand cmd)
    {
        this.cmd = cmd;
    }

    /**
    Creates and returns new parameter.
    Examples:
    ---
    // without spaces between $ and number
    auto cmd = new PGCommand(conn, "INSERT INTO users (name, surname) VALUES ($ 1, $ 2)");
    cmd.parameters.add(1, PGType.TEXT).value = "John";
    cmd.parameters.add(2, PGType.TEXT).value = "Doe";

    assert(cmd.executeNonQuery == 1);
    ---
    Throws: Exception if the owning command is currently prepared.
    */
    PGParameter add(short index, PGType type)
    {
        enforce(!cmd.prepared, "Can't add parameter to prepared statement.");
        changed = true;

        auto created = new PGParameter(this, index, type);
        params[index] = created;
        return created;
    }

    // todo: remove()

    /// Returns the parameter stored under the given index.
    PGParameter opIndex(short index)
    {
        return params[index];
    }

    /// Iterates over the parameters in ascending index order.
    int opApply(int delegate(ref PGParameter param) dg)
    {
        foreach (paramIndex; sort(params.keys))
        {
            // A non-zero result from the delegate stops the iteration and is passed on.
            if (auto stop = dg(params[paramIndex]))
                return stop;
        }

        return 0;
    }
}
2000 
/// Array of fields returned by the server (an immutable slice of PGField descriptors)
alias immutable(PGField)[] PGFields;
2003 
/// Contains information about a single field (column) returned by the server.
/// Instances are exposed to callers as elements of the immutable PGFields array.
struct PGField
{
    /// The field name.
    string name;
    /// If the field can be identified as a column of a specific table, the object ID of the table; otherwise zero.
    uint tableOid;
    /// If the field can be identified as a column of a specific table, the attribute number of the column; otherwise zero.
    short index;
    /// The object ID of the field's data type.
    uint oid;
    /// The data type size (see pg_type.typlen). Note that negative values denote variable-width types.
    short typlen;
    /// The type modifier (see pg_attribute.atttypmod). The meaning of the modifier is type-specific.
    int modifier;
}
2020 
/// Class encapsulating prepared or non-prepared statements (commands).
class PGCommand
{
    private PGConnection conn;
    private string _query;
    private PGParameters params;
    private PGFields _fields = null;
    // Name of the server-side statement/portal; empty while the command is not prepared.
    private string preparedName;
    private uint _lastInsertOid;
    private bool prepared;

    /// List of parameters bound to this command
    @property PGParameters parameters()
    {
        return params;
    }

    /// List of fields that will be returned from the server. Available after successful call to bind().
    @property PGFields fields()
    {
        return _fields;
    }

    /**
    Checks if this is query or non query command. Available after successful call to bind().
    Returns: true if server returns at least one field (column). Otherwise false.
    Throws: Exception if the field list is null.
    */
    @property bool isQuery()
    {
        enforce(_fields !is null, new Exception("bind() must be called first."));
        return _fields.length > 0;
    }

    /// Returns: true if command is currently prepared, otherwise false.
    @property bool isPrepared()
    {
        return prepared;
    }

    /// Query assigned to this command.
    @property string query()
    {
        return _query;
    }
    /// ditto
    /// Throws: Exception if the command is currently prepared.
    @property string query(string query)
    {
        enforce(!prepared, "Can't change query for prepared statement.");
        return _query = query;
    }

    /// If table is with OIDs, it contains last inserted OID.
    @property uint lastInsertOid()
    {
        return _lastInsertOid;
    }

    /**
    Creates a non-prepared command on the given connection.
    Params:
    conn = Connection the command is executed on.
    query = Query text; may be assigned later through the query property.
    */
    this(PGConnection conn, string query = "")
    {
        this.conn = conn;
        _query = query;
        params = new PGParameters(this);
        _fields = new immutable(PGField)[0];
        preparedName = "";
        prepared = false;
    }

    /**
    Prepare this statement, i.e. cache query plan.
    Throws: Exception if the command is already prepared.
    */
    void prepare()
    {
        enforce(!prepared, "This command is already prepared.");

        // Keep the reserved name in a local until the server has accepted the
        // statement. If conn.prepare() throws, preparedName must stay empty:
        // otherwise the non-prepared path (bind/execute*) would prepare the
        // unnamed statement but then address this never-created named one.
        auto reservedName = conn.reservePrepared();
        conn.prepare(reservedName, _query, params);

        preparedName = reservedName;
        prepared = true;
        params.changed = true; // force a bind before the next execution
    }

    /**
    Unprepare this statement. Goes back to normal query planning.
    Throws: Exception if the command is not prepared.
    */
    void unprepare()
    {
        enforce(prepared, "This command is not prepared.");
        conn.unprepare(preparedName);
        preparedName = "";
        prepared = false;
        params.changed = true; // force a bind before the next execution
    }

    /**
    Binds values to parameters and updates list of returned fields.

    This is normally done automatically, but it may be useful to check what fields
    would be returned from a query, before executing it.
    */
    void bind()
    {
        // For a non-prepared command this (re)creates the unnamed statement;
        // preparedName is empty in that case, so the bind below targets it.
        checkPrepared(false);
        _fields = conn.bind(preparedName, preparedName, params);
        params.changed = false;
    }

    // For a non-prepared command: prepares the unnamed statement and, when
    // `bind` is true, also binds the unnamed portal and refreshes the field list.
    // Does nothing for a prepared command.
    private void checkPrepared(bool bind)
    {
        if (!prepared)
        {
            // use unnamed statement & portal
            conn.prepare("", _query, params);
            if (bind)
            {
                _fields = conn.bind("", "", params);
                params.changed = false;
            }
        }
    }

    // Re-binds when parameters were added or changed since the last bind.
    private void checkBound()
    {
        if (params.changed)
            bind();
    }

    /**
    Executes a non query command, i.e. query which doesn't return any rows. Commonly used with
    data manipulation commands, such as INSERT, UPDATE and DELETE.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "DELETE FROM table");
    auto deletedRows = cmd.executeNonQuery;
    cmd.query = "UPDATE table SET quantity = 1 WHERE price > 100";
    auto updatedRows = cmd.executeNonQuery;
    cmd.query = "INSERT INTO table VALUES(1, 50)";
    assert(cmd.executeNonQuery == 1);
    ---
    Returns: Number of affected rows.
    */
    ulong executeNonQuery()
    {
        checkPrepared(true);
        checkBound();
        return conn.executeNonQuery(preparedName, _lastInsertOid);
    }

    /**
    Executes query which returns row sets, such as SELECT command.
    Returns: InputRange of DBRow!Specs.
    */
    PGResultSet!Specs executeQuery(Specs...)()
    {
        checkPrepared(true);
        checkBound();
        return conn.executeQuery!Specs(preparedName, _fields);
    }

    /**
    Executes query and returns only first row of the result.
    Params:
    throwIfMoreRows = If true, throws Exception when result contains more than one row.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "SELECT 1, 'abc'");
    auto row1 = cmd.executeRow!(int, string); // returns DBRow!(int, string)
    assert(is(typeof(row1[0]) == int) && is(typeof(row1[1]) == string));
    auto row2 = cmd.executeRow; // returns DBRow!(Variant[])
    ---
    Throws: Exception if result doesn't contain any rows or field count do not match.
    Throws: Exception if result contains more than one row when throwIfMoreRows is true.
    */
    DBRow!Specs executeRow(Specs...)(bool throwIfMoreRows = true)
    {
        auto result = executeQuery!Specs();
        // The result set is always closed, including when an enforce below throws.
        scope(exit) result.close();
        enforce(!result.empty(), "Result doesn't contain any rows.");
        auto row = result.front();
        if (throwIfMoreRows)
        {
            result.popFront();
            enforce(result.empty(), "Result contains more than one row.");
        }
        return row;
    }

    /**
    Executes query returning exactly one row and field. By default, returns Variant type.
    Params:
    throwIfMoreRows = If true, throws Exception when result contains more than one row.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "SELECT 1");
    auto i = cmd.executeScalar!int; // returns int
    assert(is(typeof(i) == int));
    auto v = cmd.executeScalar; // returns Variant
    ---
    Throws: Exception if result doesn't contain any rows or if it contains more than one field.
    Throws: Exception if result contains more than one row when throwIfMoreRows is true.
    */
    T executeScalar(T = Variant)(bool throwIfMoreRows = true)
    {
        auto result = executeQuery!T();
        // The result set is always closed, including when an enforce below throws.
        scope(exit) result.close();
        enforce(!result.empty(), "Result doesn't contain any rows.");
        T row = result.front();
        if (throwIfMoreRows)
        {
            result.popFront();
            enforce(result.empty(), "Result contains more than one row.");
        }
        return row;
    }
}
2231 
/// Input range of DBRow!Specs
class PGResultSet(Specs...)
{
    alias DBRow!Specs Row;
    alias Row delegate(ref Message msg, ref PGFields fields) FetchRowDelegate;

    private FetchRowDelegate fetchRow;
    private PGConnection conn;
    private PGFields fields;
    private Row row;
    private bool validRow;
    private Message nextMsg;
    // Maps a column name to the positions of all fields carrying that name.
    private size_t[][string] columnMap;

    private this(PGConnection conn, ref PGFields fields, FetchRowDelegate dg)
    {
        this.conn = conn;
        this.fields = fields;
        this.fetchRow = dg;
        validRow = false;

        // Several fields may share a name, so every name keeps a list of positions.
        foreach (position, field; fields)
        {
            if (auto known = field.name in columnMap)
                *known ~= position;
            else
                columnMap[field.name] = [position];
        }
    }

    // Resolves the index-th field named `column` to its position in the row.
    // Throws an Exception when no field has that name.
    private size_t columnToIndex(string column, size_t index)
    {
        auto positions = column in columnMap;
        enforce(positions, "Unknown column name");
        return (*positions)[index];
    }

    /// Returns: true when there is no current row.
    pure nothrow bool empty()
    {
        return !validRow;
    }

    /// Advances to the next row, or marks the range empty when the pending message is not a data row ('D').
    void popFront()
    {
        if (nextMsg.type != 'D')
        {
            validRow = false;
            return;
        }

        row = fetchRow(nextMsg, fields);
        static if (!Row.hasStaticLength)
            row.columnToIndex = &columnToIndex;
        validRow = true;
        nextMsg = conn.getMessage();
    }

    /// Returns: the current row.
    pure nothrow Row front()
    {
        return row;
    }

    /// Closes current result set. It must be closed before issuing another query on the same connection.
    void close()
    {
        immutable bool needsFinalize = nextMsg.type != 'Z';

        if (needsFinalize)
            conn.finalizeQuery();

        conn.activeResultSet = false;
    }

    /// Iterates over the remaining rows. The range is advanced after each delegate call, also when the delegate asks to stop.
    int opApply(int delegate(ref Row row) dg)
    {
        while (!empty)
        {
            const int stop = dg(row);
            popFront();

            if (stop)
                return stop;
        }

        return 0;
    }

    /// ditto
    /// This overload also passes a counter that starts at zero.
    int opApply(int delegate(ref size_t i, ref Row row) dg)
    {
        for (size_t i = 0; !empty; ++i)
        {
            const int stop = dg(i, row);
            popFront();

            if (stop)
                return stop;
        }

        return 0;
    }
}
2337 
2338 
2339 version(Have_vibe_d_core)
2340 {
2341     import vibe.core.connectionpool;
2342 
2343     // wrap vibe.d TCPConnection class with the scope of reopening the tcp connection if closed 
2344     // by PostgreSQL it for some reason.
2345     // see https://forum.rejectedsoftware.com/groups/rejectedsoftware.vibed/thread/44097/
2346     private class TCPConnectionWrapper 
2347     {
2348         this(string host, ushort port, string bindInterface = null, ushort bindPort = cast(ushort)0u)
2349         {
2350             this.host = host;
2351             this.port = port;
2352             this.bindInterface = bindInterface;
2353             this.bindPort = bindPort;
2354 
2355             connect();
2356         }
2357 
2358         void close(){ tcpConnection.close(); }
2359 
2360         void write(const(ubyte[]) bytes)
2361         {
2362             // Vibe:  "... If connected is false, writing to the connection will trigger an exception ..."
2363             if (!tcpConnection.connected)
2364             {
2365                 // Vibe: " ... Note that close must always be called, even if the remote has already closed the
2366                 //             connection. Failure to do so will result in resource and memory leakage.
2367                 tcpConnection.close();
2368                 connect();
2369             }
2370             tcpConnection.write(bytes);
2371         }
2372 
2373         void read(ubyte[] dst)
2374         {
2375             if (!tcpConnection.connected)
2376             {
2377                 tcpConnection.close();
2378                 connect();
2379             }
2380             if (!tcpConnection.empty)
2381             {
2382                 tcpConnection.read(dst);
2383             }
2384         }
2385 
2386         private
2387         {
2388             void connect()
2389             {
2390                 tcpConnection = connectTCP(host, port, bindInterface, bindPort);
2391             }
2392 
2393             string host;
2394             string bindInterface;
2395             ushort port;
2396             ushort bindPort;
2397 
2398             TCPConnection tcpConnection;
2399         }
2400     }
2401 
2402     class PostgresDB {
2403         private {
2404             string[string] m_params;
2405             ConnectionPool!PGConnection m_pool;
2406         }
2407 
2408         this(string[string] conn_params)
2409         {
2410             m_params = conn_params.dup;
2411             m_pool = new ConnectionPool!PGConnection(&createConnection);
2412         }
2413 
2414         auto lockConnection() { return m_pool.lockConnection(); }
2415 
2416         private PGConnection createConnection()
2417         {
2418             return new PGConnection(m_params);
2419         }
2420     }
2421 }
2422 else
2423 {
	// Fallback used when Have_vibe_d_core is not set. PostgresDB is declared as a
	// class template, so the static assert below is only evaluated when code
	// instantiates it, producing the explanatory compile error at that point.
	class PostgresDB() {
		static assert(false,
		              "The 'PostgresDB' connection pool requires Vibe.d and therefore "~
		              "must be used with -version=Have_vibe_d_core"
		              );
	}
2430 }
2431