1 /**
2 PostgreSQL client implementation.
3 
4 Features:
5 $(UL
6     $(LI Standalone (does not depend on libpq))
7     $(LI Binary formatting (avoids parsing overhead))
8     $(LI Prepared statements)
9     $(LI Parametrized queries (partially working))
10     $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/datatype-enum.html, Enums))
11     $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/arrays.html, Arrays))
12     $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/rowtypes.html, Composite types))
13 )
14 
15 TODOs:
16 $(UL
17     $(LI Redesign parametrized queries)
18     $(LI BigInt/Numeric types support)
19     $(LI Geometric types support)
20     $(LI Network types support)
21     $(LI Bit string types support)
22     $(LI UUID type support)
23     $(LI XML types support)
24     $(LI Transaction support)
25     $(LI Asynchronous notifications)
26     $(LI Better memory management)
27     $(LI More friendly PGFields)
28 )
29 
30 Bugs:
31 $(UL
    $(LI Supports only cleartext and MD5 $(LINK2 http://www.postgresql.org/docs/9.0/static/auth-methods.html, authentication))
33     $(LI Unfinished parameter handling)
34     $(LI interval is converted to Duration, which does not support months)
35 )
36 
37 $(B Data type mapping:)
38 
39 $(TABLE
40     $(TR $(TH PostgreSQL type) $(TH Aliases) $(TH Default D type) $(TH D type mapping possibilities))
41     $(TR $(TD smallint) $(TD int2) $(TD short) <td rowspan="19">Any type convertible from default D type</td>)
42     $(TR $(TD integer) $(TD int4) $(TD int))
43     $(TR $(TD bigint) $(TD int8) $(TD long))
44     $(TR $(TD oid) $(TD reg***) $(TD uint))
45     $(TR $(TD decimal) $(TD numeric) $(TD not yet supported))
46     $(TR $(TD real) $(TD float4) $(TD float))
47     $(TR $(TD double precision) $(TD float8) $(TD double))
48     $(TR $(TD character varying(n)) $(TD varchar(n)) $(TD string))
49     $(TR $(TD character(n)) $(TD char(n)) $(TD string))
50     $(TR $(TD text) $(TD) $(TD string))
51     $(TR $(TD "char") $(TD) $(TD char))
52     $(TR $(TD bytea) $(TD) $(TD ubyte[]))
53     $(TR $(TD timestamp without time zone) $(TD timestamp) $(TD DateTime))
54     $(TR $(TD timestamp with time zone) $(TD timestamptz) $(TD SysTime))
55     $(TR $(TD date) $(TD) $(TD Date))
56     $(TR $(TD time without time zone) $(TD time) $(TD TimeOfDay))
57     $(TR $(TD time with time zone) $(TD timetz) $(TD SysTime))
58     $(TR $(TD interval) $(TD) $(TD Duration (without months and years)))
59     $(TR $(TD boolean) $(TD bool) $(TD bool))
60     $(TR $(TD enums) $(TD) $(TD string) $(TD enum))
61     $(TR $(TD arrays) $(TD) $(TD Variant[]) $(TD dynamic/static array with compatible element type))
62     $(TR $(TD composites) $(TD record, row) $(TD Variant[]) $(TD dynamic/static array, struct or Tuple))
63 )
64 
65 Examples:
With vibe.d, compile with -version=Have_vibe_d_core and use a connection pool (a PostgresDB object and its lockConnection method):
67 ---
68 
69 	auto pdb = new PostgresDB([
70 		"host" : "192.168.2.50",
71 		"database" : "postgres",
72 		"user" : "postgres",
73 		"password" : ""
74 	]);
75 	auto conn = pdb.lockConnection();
76 
77 	auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type");
78 	auto result = cmd.executeQuery;
79 
80 	try
81 	{
82 		foreach (row; result)
83 		{
84 			writeln(row["typname"], ", ", row[1]);
85 		}
86 	}
87 	finally
88 	{
89 		result.close;
90 	}
91 
92 ---
Without vibe.d, you can use standard sockets through a PGConnection object:
94 
95 ---
96 import std.stdio;
97 import ddb.postgres;
98 
99 int main(string[] argv)
100 {
101     auto conn = new PGConnection([
102         "host" : "localhost",
103         "database" : "test",
104         "user" : "postgres",
105         "password" : "postgres"
106     ]);
107 
108     scope(exit) conn.close;
109 
110     auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type");
111     auto result = cmd.executeQuery;
112 
113     try
114     {
115         foreach (row; result)
116         {
117             writeln(row[0], ", ", row[1]);
118         }
119     }
120     finally
121     {
122         result.close;
123     }
124 
125     return 0;
126 }
127 ---
128 
129 Copyright: Copyright Piotr Szturmaj 2011-.
130 License: $(LINK2 http://boost.org/LICENSE_1_0.txt, Boost License 1.0).
131 Authors: Piotr Szturmaj
132 *//*
133 Documentation contains portions copied from PostgreSQL manual (mainly field information and
134 connection parameters description). License:
135 
136 Portions Copyright (c) 1996-2010, The PostgreSQL Global Development Group
137 Portions Copyright (c) 1994, The Regents of the University of California
138 
139 Permission to use, copy, modify, and distribute this software and its documentation for any purpose,
140 without fee, and without a written agreement is hereby granted, provided that the above copyright
141 notice and this paragraph and the following two paragraphs appear in all copies.
142 
143 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT,
144 INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS,
145 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY
146 OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
147 
148 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
149 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
150 PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
151 CALIFORNIA HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
152 OR MODIFICATIONS.
153 */
154 module ddb.postgres;
155 
156 version (Have_vibe_d_core)
157 {
158     import vibe.core.net;
159     import vibe.core.stream;
160 }
161 else
162 {
163     import std.socket;
164 }
import std.bitmanip;
import std.exception;
import std.conv;
import std.traits;
import std.typecons;
import std.string;
import std.digest.md;
import core.bitop;
import std.variant;
import std.algorithm;
import std.stdio;
import std.datetime;
import std.uuid;
public import ddb.db;
179 
180 private:
181 
/// Calendar date from which the binary date/time values in this module are counted.
const PGEpochDate = Date(2000, 1, 1);
/// `PGEpochDate` expressed as a day number of the proleptic Gregorian calendar.
const PGEpochDay = PGEpochDate.dayOfGregorianCal;
/// Midnight; `time` values are microsecond offsets from it.
const PGEpochTime = TimeOfDay.init;
/// Midnight of `PGEpochDate`; `timestamp` values are microsecond offsets from it.
const PGEpochDateTime = DateTime(Date(2000, 1, 1), TimeOfDay(0, 0, 0));
186 
/**
Wrapper around the transport socket that serializes values in network byte
order (big-endian), as used by the PostgreSQL binary wire format.

With `-version=Have_vibe_d_core` the transport is a `TCPConnectionWrapper`;
otherwise it is a `std.socket.Socket`, which is assumed to be in blocking mode.
*/
class PGStream
{
    version (Have_vibe_d_core)
    {
        private TCPConnectionWrapper m_socket;

        /// The underlying transport.
        @property TCPConnectionWrapper socket()
        {
            return m_socket;
        }

        /// Wraps an already connected transport.
        this(TCPConnectionWrapper socket)
        {
            m_socket = socket;
        }
    }
    else
    {
        private Socket m_socket;

        /// The underlying transport.
        @property Socket socket()
        {
            return m_socket;
        }

        /// Wraps an already connected transport.
        this(Socket socket)
        {
            m_socket = socket;
        }
    }

    /**
    Fills `buffer` completely with bytes received from the server.

    Throws: Exception (std sockets only) if the socket reports an error or the
    peer closes the connection before `buffer` is full.
    */
    protected void read(ubyte[] buffer)
    {
        version(Have_vibe_d_core)
        {
            m_socket.read(buffer);
        }
        else
        {
            // Socket.receive may deliver fewer bytes than requested, so keep
            // reading until the whole buffer is filled.
            size_t filled = 0;
            while (filled < buffer.length)
            {
                auto received = m_socket.receive(buffer[filled .. $]);
                enforce(received != Socket.ERROR, "Error while receiving data from the server");
                enforce(received > 0, "Connection closed by the server");
                filled += received;
            }
        }
    }

    /**
    Sends all bytes of `x` to the server.

    Throws: Exception (std sockets only) if the socket reports an error.
    */
    void write(ubyte[] x)
    {
        version(Have_vibe_d_core)
        {
            m_socket.write(x);
        }
        else
        {
            // Socket.send may accept only part of the data; loop until done.
            size_t sent = 0;
            while (sent < x.length)
            {
                auto accepted = m_socket.send(x[sent .. $]);
                enforce(accepted != Socket.ERROR && accepted > 0, "Error while sending data to the server");
                sent += accepted;
            }
        }
    }

    /// Writes `value` in big-endian byte order.
    private void writeBigEndian(T)(T value)
    {
        // Keep the bytes in a named local so the slice never refers to a temporary.
        ubyte[T.sizeof] bytes = nativeToBigEndian(value);
        write(bytes[]);
    }

    /// Writes a single byte.
    void write(ubyte x)
    {
        writeBigEndian(x);
    }

    /// Writes a 2-byte big-endian integer.
    void write(short x)
    {
        writeBigEndian(x);
    }

    /// Writes a 4-byte big-endian integer.
    void write(int x)
    {
        writeBigEndian(x);
    }

    /// Writes an 8-byte big-endian integer.
    void write(long x)
    {
        writeBigEndian(x);
    }

    /// Writes a 4-byte big-endian floating point number.
    void write(float x)
    {
        writeBigEndian(x);
    }

    /// Writes an 8-byte big-endian floating point number.
    void write(double x)
    {
        writeBigEndian(x);
    }

    /// Writes the bytes of `x` without any terminator or length prefix.
    void writeString(string x)
    {
        ubyte[] ub = cast(ubyte[])(x);
        write(ub);
    }

    /// Writes the bytes of `x` followed by a zero byte.
    void writeCString(string x)
    {
        writeString(x);
        write('\0');
    }

    /// ditto
    void writeCString(char[] x)
    {
        write(cast(ubyte[])x);
        write('\0');
    }

    /// Writes a date as a 4-byte count of days since `PGEpochDate`.
    void write(const ref Date x)
    {
        write(cast(int)(x.dayOfGregorianCal - PGEpochDay));
    }

    /// ditto
    void write(Date x)
    {
        write(cast(int)(x.dayOfGregorianCal - PGEpochDay));
    }

    /**
    Writes a time of day as an 8-byte count of microseconds since midnight.

    The value is written as `long`: a day holds 86_400_000_000 microseconds,
    which does not fit in 32 bits, and the matching reader in this module
    decodes a `long`. Code that emits a length prefix for this value must
    account for 8 bytes.
    */
    void write(const ref TimeOfDay x)
    {
        write((x - PGEpochTime).total!"usecs");
    }

    /// Writes a timestamp as an 8-byte count of microseconds since `PGEpochDateTime`.
    void write(const ref DateTime x) // timestamp
    {
        write((x - PGEpochDateTime).total!"usecs");
    }

    /// ditto
    void write(DateTime x) // timestamp
    {
        write((x - PGEpochDateTime).total!"usecs");
    }

    /// Writes a timestamptz as an 8-byte count of microseconds since `PGEpochDateTime` in UTC.
    void write(const ref SysTime x) // timestamptz
    {
        write((x - SysTime(PGEpochDateTime, UTC())).total!"usecs");
    }

    /**
    Writes an interval as microseconds (8 bytes), days (4 bytes) and months
    (4 bytes).

    BUG: Does not support months. A Duration has no month component, so the
    month field is always written as 0 and the whole length of the duration is
    carried by the day and microsecond fields.
    */
    void write(const ref core.time.Duration x) // interval
    {
        int days = cast(int)x.total!"days";
        long usecs = x.total!"usecs" - convert!("days", "usecs")(days);
        int months = 0;

        write(usecs);
        write(days);
        write(months);
    }

    /**
    Writes a timetz as an 8-byte time of day followed by a 4-byte zone offset.

    The zone offset is always written as 0, so the time of day is converted
    to UTC first.
    */
    void writeTimeTz(const ref SysTime x) // timetz
    {
        TimeOfDay t = cast(TimeOfDay)x.toUTC();
        write(t);
        write(cast(int)0);
    }
}
345 
/// Returns the MD5 digest of all `data` arguments, taken together, as 32 lowercase hexadecimal characters.
char[32] MD5toHex(T...)(in T data)
{
    ubyte[16] digest = md5Of(data);
    return toHexString!(LetterCase.lower)(digest);
}
350 
/**
A single message received from the server: its type byte, its payload and a
read cursor into that payload.

Every `read*` member decodes one value starting at the cursor and advances
the cursor past it. Multi-byte numbers are decoded from big-endian order.
`conn` is consulted by `readBaseType` to recognize array, composite and enum
type OIDs.
*/
struct Message
{
    PGConnection conn;
    char type;
    ubyte[] data;

    private size_t position = 0;

    /// Reads and returns a value of type `T`; extra arguments are forwarded to the matching `read` overload.
    T read(T, Params...)(Params p)
    {
        T value;
        read(value, p);
        return value;
    }

    /// Reads a single byte as a character.
    void read()(out char x)
    {
        x = data[position++];
    }


    /// Reads a big-endian integral or floating point value wider than one byte.
    void read(Int)(out Int x) if((isIntegral!Int || isFloatingPoint!Int) && Int.sizeof > 1)
    {
        ubyte[Int.sizeof] buf;
        buf[] = data[position..position+Int.sizeof];
        x = bigEndianToNative!Int(buf);
        position += Int.sizeof;
    }

    /// Reads a zero-terminated string and skips its terminator.
    string readCString()
    {
        string x;
        readCString(x);
        return x;
    }

    /**
    Reads a zero-terminated string and skips its terminator.

    The returned string is a slice of `data`, not a copy.

    Throws: Exception if no terminator is found before the end of the payload.
    */
    void readCString(out string x)
    {
        // Scan with an index so that a missing terminator cannot run past the buffer.
        size_t end = position;
        while (end < data.length && data[end] != 0)
            end++;

        enforce(end < data.length, "Unterminated string in server message");

        x = cast(string)data[position .. end];
        position = end + 1;
    }

    /// Reads a string of exactly `len` bytes.
    string readString(int len)
    {
        string x;
        readString(x, len);
        return x;
    }

    /**
    Reads a string of exactly `len` bytes. The result is a slice of `data`.

    Throws: Exception if `len` is negative or exceeds the remaining payload.
    */
    void readString(out string x, int len)
    {
        enforce(len >= 0 && position + len <= data.length);
        x = cast(string)(data[position .. position + len]);
        position += len;
    }

    /// Reads one byte; any non-zero value is `true`.
    void read()(out bool x)
    {
        x = cast(bool)data[position++];
    }

    /**
    Reads `len` raw bytes. The result is a slice of `data`.

    Throws: Exception if `len` is negative or exceeds the remaining payload.
    */
    void read()(out ubyte[] x, int len)
    {
        enforce(len >= 0 && position + len <= data.length);
        x = data[position .. position + len];
        position += len;
    }

    /// Reads a 16-byte UUID.
    void read()(out UUID u) // uuid
    {
        ubyte[16] uuidData = data[position .. position + 16];
        position += 16;
        u = UUID(uuidData);
    }

    /// Reads a date stored as a 4-byte day count relative to `PGEpochDate`.
    void read()(out Date x) // date
    {
        int days = read!int; // number of days since 1 Jan 2000
        x = PGEpochDate + dur!"days"(days);
    }

    /// Reads a time of day stored as an 8-byte microsecond count since midnight.
    void read()(out TimeOfDay x) // time
    {
        long usecs = read!long;
        x = PGEpochTime + dur!"usecs"(usecs);
    }

    /// Reads a timestamp stored as an 8-byte microsecond count relative to `PGEpochDateTime`.
    void read()(out DateTime x) // timestamp
    {
        long usecs = read!long;
        x = PGEpochDateTime + dur!"usecs"(usecs);
    }

    /// Reads a timestamptz (microseconds relative to the epoch in UTC) and presents it in local time.
    void read()(out SysTime x) // timestamptz
    {
        long usecs = read!long;
        x = SysTime(PGEpochDateTime + dur!"usecs"(usecs), UTC());
        x.timezone = LocalTime();
    }

    /**
    Reads an interval: microseconds (8 bytes), days (4 bytes), months (4 bytes).

    BUG: Does not support months. The month field is consumed but ignored.
    */
    void read()(out core.time.Duration x) // interval
    {
        long usecs = read!long;
        int days = read!int;
        int months = read!int; // consumed to keep the cursor in sync; value is dropped

        x = dur!"days"(days) + dur!"usecs"(usecs);
    }

    /**
    Reads a timetz: a time of day followed by a 4-byte zone offset in seconds.

    The result carries the date 0000-01-01 and a `SimpleTimeZone` built from
    the offset.

    NOTE(review): the offset is passed to `SimpleTimeZone` unchanged. If the
    server counts this offset as seconds west of UTC, the sign must be negated
    here - confirm against the PostgreSQL sources before relying on the zone.
    */
    SysTime readTimeTz() // timetz
    {
        TimeOfDay time = read!TimeOfDay;
        int zone = read!int / 60; // originally in seconds, convert it to minutes
        Duration duration = dur!"minutes"(zone);
        auto stz = new immutable SimpleTimeZone(duration);
        return SysTime(DateTime(Date(0, 1, 1), time), stz);
    }

    /**
    Reads a composite (row) value into `T`.

    The payload is a field count followed, for each field, by the field's
    type OID, its length (-1 meaning NULL) and its data. For record types with
    a static number of fields the per-field code is generated at compile time;
    otherwise fields are read in a loop.
    */
    T readComposite(T)()
    {
        alias DBRow!T Record;

        static if (Record.hasStaticLength)
        {
            alias Record.fieldTypes fieldTypes;

            static string genFieldAssigns() // CTFE
            {
                string s = "";

                foreach (i; 0 .. fieldTypes.length)
                {
                    s ~= "read(fieldOid);\n";
                    s ~= "read(fieldLen);\n";
                    s ~= "if (fieldLen == -1)\n";
                    s ~= text("record.setNull!(", i, ");\n");
                    s ~= "else\n";
                    s ~= text("record.set!(fieldTypes[", i, "], ", i, ")(",
                              "readBaseType!(fieldTypes[", i, "])(fieldOid, fieldLen)",
                              ");\n");
                    // text() doesn't work with -inline option, CTFE bug
                }

                return s;
            }
        }

        Record record;

        int fieldCount, fieldLen;
        uint fieldOid;

        read(fieldCount);

        static if (Record.hasStaticLength)
            mixin(genFieldAssigns);
        else
        {
            record.setLength(fieldCount);

            foreach (i; 0 .. fieldCount)
            {
                read(fieldOid);
                read(fieldLen);

                if (fieldLen == -1)
                    record.setNull(i);
                else
                    record[i] = readBaseType!(Record.ElemType)(fieldOid, fieldLen);
            }
        }

        return record.base;
    }

    /// Introduces `ElemType`, the element type of the array type it is mixed in with.
    mixin template elmnt(U : U[])
    {
        alias U ElemType;
    }

    /**
    Reads dimension `dim` of an array value, recursing for nested dimensions.

    Params:
        lengths = number of elements in each dimension
        elementOid = type OID of the innermost elements
        dim = index into `lengths` of the dimension to read

    Throws: Exception if a NULL element is met and the element type is
    neither nullable nor a string.
    */
    private AT readDimension(AT)(int[] lengths, uint elementOid, int dim)
    {

        mixin elmnt!AT;

        int length = lengths[dim];

        AT array;
        static if (isDynamicArray!AT)
            array.length = length;

        int fieldLen;

        foreach(i; 0 .. length)
        {
            static if (isArray!ElemType && !isSomeString!ElemType)
                array[i] = readDimension!ElemType(lengths, elementOid, dim + 1);
            else
            {
                static if (isNullable!ElemType)
                    alias nullableTarget!ElemType E;
                else
                    alias ElemType E;

                read(fieldLen);
                if (fieldLen == -1)
                {
                    static if (isNullable!ElemType || isSomeString!ElemType)
                        array[i] = null;
                    else
                        throw new Exception("Can't set NULL value to non nullable type");
                }
                else
                    array[i] = readBaseType!E(elementOid, fieldLen);
            }
        }

        return array;
    }

    /**
    Reads an array value into `T`.

    The header holds the number of dimensions, a has-NULLs flag and the
    element type OID, followed by a (length, lower bound) pair per dimension.
    Lower bounds are read but not otherwise used.

    Returns: `T.init` for a zero-dimensional (empty) array.
    Throws: Exception if the dimension count differs from that of `T`, or if
    the server reports NULLs and the element type cannot hold them.
    */
    T readArray(T)()
        if (isArray!T)
    {
        alias multiArrayElemType!T U;

        // todo: more validation, better lowerBounds support
        int dims, hasNulls;
        uint elementOid;
        int[] lengths, lowerBounds;

        read(dims);
        read(hasNulls); // 0 or 1
        read(elementOid);

        if (dims == 0)
            return T.init;

        enforce(arrayDimensions!T == dims, "Dimensions of arrays do not match");
        static if (!isNullable!U && !isSomeString!U)
            enforce(!hasNulls, "PostgreSQL returned NULLs but array elements are not Nullable");

        lengths.length = lowerBounds.length = dims;

        foreach(i; 0 .. dims)
        {
            read(lengths[i]);
            read(lowerBounds[i]);
        }

        T array = readDimension!T(lengths, elementOid, 0);

        return array;
    }

    /**
    Reads an enum label of `len` bytes and maps it to the member of `T` with
    the same name.

    Throws: ConvException if no member of `T` has that name.
    */
    T readEnum(T)(int len)
    {
        string genCases() // CTFE
        {
            string s;

            foreach (name; __traits(allMembers, T))
            {
                s ~= text(`case "`, name, `": return T.`, name, `;`);
            }

            return s;
        }

        string enumMember = readString(len);

        switch (enumMember)
        {
            mixin(genCases);
            default: throw new ConvException("Can't set enum value '" ~ enumMember ~ "' to enum type " ~ T.stringof);
        }
    }

    /**
    Reads one value whose server-side type is `oid` and converts it to `T`.

    Params:
        oid = type OID of the value
        len = length of the value in bytes; used by the variable-length types

    OIDs not listed in the switch are looked up in the connection's array,
    composite and enum type tables.

    Throws: ConvException if the server type cannot be converted to `T` or
    the OID is not recognized.
    */
    T readBaseType(T)(uint oid, int len = 0)
    {
        auto convError(T)()
        {
            string* type = oid in baseTypes;
            return new ConvException("Can't convert PostgreSQL's type " ~ (type ? *type : to!string(oid)) ~ " to " ~ T.stringof);
        }

        switch (oid)
        {
            case 16: // bool
                static if (isConvertible!(T, bool))
                    return _to!T(read!bool);
                else
                    throw convError!T();
            case 26, 24, 2202, 2203, 2204, 2205, 2206, 3734, 3769: // oid and reg*** aliases
                static if (isConvertible!(T, uint))
                    return _to!T(read!uint);
                else
                    throw convError!T();
            case 21: // int2
                static if (isConvertible!(T, short))
                    return _to!T(read!short);
                else
                    throw convError!T();
            case 23: // int4
                static if (isConvertible!(T, int))
                    return _to!T(read!int);
                else
                    throw convError!T();
            case 20: // int8
                static if (isConvertible!(T, long))
                    return _to!T(read!long);
                else
                    throw convError!T();
            case 700: // float4
                static if (isConvertible!(T, float))
                    return _to!T(read!float);
                else
                    throw convError!T();
            case 701: // float8
                static if (isConvertible!(T, double))
                    return _to!T(read!double);
                else
                    throw convError!T();
            case 1042, 1043, 25, 19, 705: // bpchar, varchar, text, name, unknown
                static if (isConvertible!(T, string))
                    return _to!T(readString(len));
                else
                    throw convError!T();
            case 17: // bytea
                static if (isConvertible!(T, ubyte[]))
                    return _to!T(read!(ubyte[])(len));
                else
                    throw convError!T();
            case 2950: // UUID
                static if(isConvertible!(T, UUID))
                    return _to!T(read!UUID());
                else
                    throw convError!T();
            case 18: // "char"
                static if (isConvertible!(T, char))
                    return _to!T(read!char);
                else
                    throw convError!T();
            case 1082: // date
                static if (isConvertible!(T, Date))
                    return _to!T(read!Date);
                else
                    throw convError!T();
            case 1083: // time
                static if (isConvertible!(T, TimeOfDay))
                    return _to!T(read!TimeOfDay);
                else
                    throw convError!T();
            case 1114: // timestamp
                static if (isConvertible!(T, DateTime))
                    return _to!T(read!DateTime);
                else
                    throw convError!T();
            case 1184: // timestamptz
                static if (isConvertible!(T, SysTime))
                    return _to!T(read!SysTime);
                else
                    throw convError!T();
            case 1186: // interval
                static if (isConvertible!(T, core.time.Duration))
                    return _to!T(read!(core.time.Duration));
                else
                    throw convError!T();
            case 1266: // timetz
                static if (isConvertible!(T, SysTime))
                    return _to!T(readTimeTz);
                else
                    throw convError!T();
            case 2249: // record and other composite types
                static if (isVariantN!T && T.allowed!(Variant[]))
                    return T(readComposite!(Variant[]));
                else
                    return readComposite!T;
            case 2287: // _record and other arrays
                static if (isArray!T && !isSomeString!T)
                    return readArray!T;
                else static if (isVariantN!T && T.allowed!(Variant[]))
                    return T(readArray!(Variant[]));
                else
                    throw convError!T();
            case 114: //JSON
                static if (isConvertible!(T, string))
                    return _to!T(readString(len));
                else
                    throw convError!T();
            default:
                // Not a built-in OID: consult the type tables of the connection.
                if (oid in conn.arrayTypes)
                    goto case 2287;
                else if (oid in conn.compositeTypes)
                    goto case 2249;
                else if (oid in conn.enumTypes)
                {
                    static if (is(T == enum))
                        return readEnum!T(len);
                    else static if (isConvertible!(T, string))
                        return _to!T(readString(len));
                    else
                        throw convError!T();
                }
        }

        throw convError!T();
    }
}
768 
// Conversion helper used in place of std.conv.to, which currently
// doesn't support VariantN targets.
template _to(T)
{
    static if (isVariantN!T)
    {
        /// VariantN target: build the result by initializing it from the source value.
        T _to(S)(S value)
        {
            T wrapped = value;
            return wrapped;
        }
    }
    else
    {
        /// Any other target: forward all arguments to std.conv.to.
        T _to(A...)(A args)
        {
            return std.conv.to!T(args);
        }
    }
}
777 
/// True when an `S` value can be turned into a `T`: either `T` is a VariantN
/// that allows `S`, or `_to!T` compiles when given an `S`.
template isConvertible(T, S)
{
    // Kept as a static-if condition so that T.allowed!S is only looked at
    // when T really is a VariantN.
    static if (isVariantN!T && T.allowed!S)
        enum isConvertible = true;
    else
        enum isConvertible = __traits(compiles, { S s; _to!T(s); });
}
785 
/// Number of array dimensions of a type; a string element counts as a
/// scalar, not as a further dimension.
template arrayDimensions(T : T[])
{
    // Inside this specialization T names the element type of the array.
    static if (!isArray!T || isSomeString!T)
        enum arrayDimensions = 1;
    else
        enum arrayDimensions = 1 + arrayDimensions!T;
}

/// Fallback for types that do not match the array specialization: zero dimensions.
template arrayDimensions(T)
{
    enum arrayDimensions = 0;
}
798 
/// Innermost element type of a (possibly nested) array type; strings are
/// treated as elements rather than as arrays of characters.
template multiArrayElemType(T : T[])
{
    // Inside this specialization T names the element type of the array.
    static if (!isArray!T || isSomeString!T)
        alias T multiArrayElemType;
    else
        alias multiArrayElemType!T multiArrayElemType;
}

/// Fallback for types that do not match the array specialization: the type itself.
template multiArrayElemType(T)
{
    alias T multiArrayElemType;
}
811 
// Compile-time checks of arrayDimensions for scalars and for one to three
// levels of nesting.
static assert(arrayDimensions!(int) == 0);
static assert(arrayDimensions!(int[]) == 1);
static assert(arrayDimensions!(int[][]) == 2);
static assert(arrayDimensions!(int[][][]) == 3);
816 
/// Transaction state of a connection; each member's value is a single status character.
enum TransactionStatus : char
{
    OutsideTransaction = 'I',
    InsideTransaction = 'T',
    InsideFailedTransaction = 'E'
}
818 
// Names of built-in PostgreSQL types keyed by type OID. Within this part of
// the file it is only used to name the server type in conversion error
// messages.
enum string[int] baseTypes = [
    // boolean types
    16 : "bool",
    // bytea types
    17 : "bytea",
    // character types
    18 : `"char"`, // "char" - 1 byte internal type
    1042 : "bpchar", // char(n) - blank padded
    1043 : "varchar",
    25 : "text",
    19 : "name",
    // numeric types
    21 : "int2",
    23 : "int4",
    20 : "int8",
    700 : "float4",
    701 : "float8",
    1700 : "numeric"
];
838 
839 public:
840 
/// PostgreSQL type identifiers; the value of each member is the type's OID.
enum PGType : int
{
    // identifiers and internal types
    OID = 26,
    NAME = 19,
    REGPROC = 24,
    BOOLEAN = 16,
    BYTEA = 17,
    CHAR = 18, // 1 byte "char", used internally in PostgreSQL
    BPCHAR = 1042, // Blank Padded char(n), fixed size
    VARCHAR = 1043,
    TEXT = 25,
    // numeric types
    INT2 = 21,
    INT4 = 23,
    INT8 = 20,
    FLOAT4 = 700,
    FLOAT8 = 701,

    // date/time types
    // reference https://github.com/lpsmith/postgresql-simple/blob/master/src/Database/PostgreSQL/Simple/TypeInfo/Static.hs#L74
    DATE = 1082,
    TIME = 1083,
    TIMESTAMP = 1114,
    TIMESTAMPTZ = 1184,
    INTERVAL = 1186,
    TIMETZ = 1266,

    // JSON types
    JSON = 114,
    JSONARRAY = 199
};
869 
/// Exception thrown when a query parameter is missing or has an unsuitable value.
class ParamException : Exception
{
    /// Creates the exception with message `msg`, attributed to file `fn` and line `ln`.
    @safe pure nothrow
    this(string msg, string fn = __FILE__, size_t ln = __LINE__)
    {
        super(msg, fn, ln);
    }
}
877 
/// Exception thrown on server error
class ServerErrorException: Exception
{
    /// Contains information about this _error. Aliased to this.
    ResponseMessage error;
    alias error this;

    /// Creates the exception from a plain message; `error` is left unset.
    @safe pure nothrow
    this(string msg, string fn = __FILE__, size_t ln = __LINE__)
    {
        super(msg, fn, ln);
    }

    /// Creates the exception from a server response; the exception message is the response's summary.
    this(ResponseMessage error, string fn = __FILE__, size_t ln = __LINE__)
    {
        immutable summary = error.toString();
        super(summary, fn, ln);
        this.error = error;
    }
}
896 
/**
Class encapsulating errors and notices.

Gives access to the fields of the ErrorResponse and NoticeResponse messages
sent by the server. The fields are described
$(LINK2 http://www.postgresql.org/docs/9.0/static/protocol-error-fields.html,here).
*/
class ResponseMessage
{
    // Field values keyed by their one-character field code.
    private string[char] fields;

    // Value of the field with code `type`, or an empty string when it is absent.
    private string getOptional(char type)
    {
        return fields.get(type, "");
    }

    /// Message fields
    @property string severity()
    {
        return fields['S'];
    }

    /// ditto
    @property string code()
    {
        return fields['C'];
    }

    /// ditto
    @property string message()
    {
        return fields['M'];
    }

    /// ditto
    @property string detail()
    {
        return getOptional('D');
    }

    /// ditto
    @property string hint()
    {
        return getOptional('H');
    }

    /// ditto
    @property string position()
    {
        return getOptional('P');
    }

    /// ditto
    @property string internalPosition()
    {
        return getOptional('p');
    }

    /// ditto
    @property string internalQuery()
    {
        return getOptional('q');
    }

    /// ditto
    @property string where()
    {
        return getOptional('W');
    }

    /// ditto
    @property string file()
    {
        return getOptional('F');
    }

    /// ditto
    @property string line()
    {
        return getOptional('L');
    }

    /// ditto
    @property string routine()
    {
        return getOptional('R');
    }

    /**
    Returns a summary of this message built from the most common fields:
    severity, code and message, followed by detail and hint on separate
    lines when the server supplied them.
    */
    override string toString()
    {
        string summary = severity ~ ' ' ~ code ~ ": " ~ message;

        if (auto detailText = 'D' in fields)
            summary ~= "\nDETAIL: " ~ *detailText;

        if (auto hintText = 'H' in fields)
            summary ~= "\nHINT: " ~ *hintText;

        return summary;
    }
}
1005 
1006 /**
1007 Class representing connection to PostgreSQL server.
1008 */
1009 class PGConnection
1010 {
1011     private:
1012         PGStream stream;
1013         string[string] serverParams;
1014         int serverProcessID;
1015         int serverSecretKey;
1016         TransactionStatus trStatus;
1017         ulong lastPrepared = 0;
1018         uint[uint] arrayTypes;
1019         uint[][uint] compositeTypes;
1020         string[uint][uint] enumTypes;
1021         bool activeResultSet;
1022 
1023         string reservePrepared()
1024         {
1025             synchronized (this)
1026             {
1027 
1028                 return to!string(lastPrepared++);
1029             }
1030         }
1031 
1032         Message getMessage()
1033         {
1034 
1035             char type;
1036             int len;
1037 			ubyte[1] ub;
1038 			stream.read(ub); // message type
1039 
1040 			type = bigEndianToNative!char(ub);
1041 			ubyte[4] ubi;
1042 			stream.read(ubi); // message length, doesn't include type byte
1043 
1044 			len = bigEndianToNative!int(ubi) - 4;
1045 
1046             ubyte[] msg;
1047             if (len > 0)
1048             {
1049                 msg = new ubyte[len];
1050                 stream.read(msg);
1051             }
1052 
1053             return Message(this, type, msg);
1054         }
1055 
1056         void sendStartupMessage(const string[string] params)
1057         {
1058             bool localParam(string key)
1059             {
1060                 switch (key)
1061                 {
1062                     case "host", "port", "password": return true;
1063                     default: return false;
1064                 }
1065             }
1066 
1067             int len = 9; // length (int), version number (int) and parameter-list's delimiter (byte)
1068 
1069             foreach (key, value; params)
1070             {
1071                 if (localParam(key))
1072                     continue;
1073 
1074                 len += key.length + value.length + 2;
1075             }
1076 
1077             stream.write(len);
1078             stream.write(0x0003_0000); // version number 3
1079             foreach (key, value; params)
1080             {
1081                 if (localParam(key))
1082                     continue;
1083                 stream.writeCString(key);
1084                 stream.writeCString(value);
1085             }
1086 		stream.write(cast(ubyte)0);
1087 	}
1088 
1089         void sendPasswordMessage(string password)
1090         {
1091             int len = cast(int)(4 + password.length + 1);
1092 
1093             stream.write('p');
1094             stream.write(len);
1095             stream.writeCString(password);
1096         }
1097 
1098         void sendParseMessage(string statementName, string query, int[] oids)
1099         {
1100             int len = cast(int)(4 + statementName.length + 1 + query.length + 1 + 2 + oids.length * 4);
1101 
1102             stream.write('P');
1103             stream.write(len);
1104             stream.writeCString(statementName);
1105             stream.writeCString(query);
1106             stream.write(cast(short)oids.length);
1107 
1108             foreach (oid; oids)
1109                 stream.write(oid);
1110         }
1111 
1112         void sendCloseMessage(DescribeType type, string name)
1113 		{
1114 			stream.write('C');
1115             stream.write(cast(int)(4 + 1 + name.length + 1));
1116             stream.write(cast(char)type);
1117             stream.writeCString(name);
1118         }
1119 
1120         void sendTerminateMessage()
1121 		{
1122 			stream.write('X');
1123             stream.write(cast(int)4);
1124         }
1125 
1126         void sendBindMessage(string portalName, string statementName, PGParameters params)
1127         {
1128             int paramsLen = 0;
1129             bool hasText = false;
1130 
1131 			foreach (param; params)
1132             {
1133                 enforce(param.value.hasValue, new ParamException("Parameter $" ~ to!string(param.index) ~ " value is not initialized"));
1134 
1135                 void checkParam(T)(int len)
1136                 {
1137                     if (param.value != null)
1138                     {
1139                         enforce(param.value.convertsTo!T, new ParamException("Parameter's value is not convertible to " ~ T.stringof));
1140                         paramsLen += len;
1141                     }
1142                 }
1143 
1144                 /*final*/ switch (param.type)
1145                 {
1146                     case PGType.INT2: checkParam!short(2); break;
1147                     case PGType.INT4: checkParam!int(4); break;
1148                     case PGType.INT8: checkParam!long(8); break;
1149                     case PGType.TEXT:
1150                         paramsLen += param.value.coerce!string.length;
1151                         hasText = true;
1152                         break;
1153                     case PGType.BYTEA:
1154                         paramsLen += param.value.length;
1155                         break;
1156                     case PGType.JSON:
1157                         paramsLen += param.value.coerce!string.length; // TODO: object serialisation
1158                         break;
1159                     case PGType.DATE:
1160                         paramsLen += 4; break;
1161                     case PGType.FLOAT4: checkParam!float(4); break;
1162                     case PGType.FLOAT8: checkParam!double(8); break;
1163                     case PGType.BOOLEAN: checkParam!bool(1); break;
1164                     default: assert(0, "Not implemented " ~ to!string(param.type));
1165                 }
1166             }
1167 
1168             int len = cast(int)( 4 + portalName.length + 1 + statementName.length + 1 + (hasText ? (params.length*2) : 2) + 2 + 2 +
1169                 params.length * 4 + paramsLen + 2 + 2 );
1170 
1171             stream.write('B');
1172             stream.write(len);
1173             stream.writeCString(portalName);
1174             stream.writeCString(statementName);
1175             if(hasText)
1176             {
1177                 stream.write(cast(short) params.length);
1178                 foreach(param; params)
1179                     if(param.type == PGType.TEXT)
1180                         stream.write(cast(short) 0); // text format
1181                     else
1182                         stream.write(cast(short) 1); // binary format
1183             } else {
1184                 stream.write(cast(short)1); // one parameter format code
1185                 stream.write(cast(short)1); // binary format
1186             }
1187             stream.write(cast(short)params.length);
1188 
1189             foreach (param; params)
1190             {
1191                 if (param.value == null)
1192                 {
1193                     stream.write(-1);
1194                     continue;
1195                 }
1196 
1197                 switch (param.type)
1198                 {
1199                     case PGType.INT2:
1200                         stream.write(cast(int)2);
1201                         stream.write(param.value.coerce!short);
1202                         break;
1203                     case PGType.INT4:
1204                         stream.write(cast(int)4);
1205                         stream.write(param.value.coerce!int);
1206                         break;
1207                     case PGType.INT8:
1208                         stream.write(cast(int)8);
1209                         stream.write(param.value.coerce!long);
1210                         break;
1211                     case PGType.FLOAT4:
1212                         stream.write(cast(int)4);
1213                         stream.write(param.value.coerce!float);
1214                         break;
1215                     case PGType.FLOAT8:
1216                         stream.write(cast(int)8);
1217                         stream.write(param.value.coerce!double);
1218                         break;
1219                     case PGType.TEXT:
1220                         auto s = param.value.coerce!string;
1221                         stream.write(cast(int) s.length);
1222                         if(s.length) stream.write(cast(ubyte[]) s);
1223                         break;
1224                     case PGType.BYTEA:
1225                         auto s = param.value;
1226                         stream.write(cast(int) s.length);
1227 
1228                         ubyte[] x;
1229                         x.length = s.length;
1230                         for (int i = 0; i < x.length; i++) {
1231                             x[i] = s[i].get!(ubyte);
1232                         }
1233                         stream.write(x);
1234                         break;
1235                     case PGType.JSON:
1236                         auto s = param.value.coerce!string;
1237                         stream.write(cast(int) s.length);
1238                         stream.write(cast(ubyte[]) s);
1239                         break;
1240                     case PGType.DATE:
1241                         stream.write(cast(int) 4);
1242                         stream.write(Date.fromISOString(param.value.coerce!string));
1243                         break;
1244                     case PGType.BOOLEAN:
1245                         stream.write(cast(int) 1);
1246                         stream.write(param.value.coerce!bool);
1247                         break;
1248                     default:
1249 						assert(0, "Not implemented " ~ to!string(param.type));
1250                 }
1251             }
1252 
1253             stream.write(cast(short)1); // one result format code
1254             stream.write(cast(short)1); // binary format
1255         }
1256 
        /// Target selector used by the Describe and Close messages; the enum value is the byte written to the stream.
        enum DescribeType : char { Statement = 'S', Portal = 'P' }
1258 
1259         void sendDescribeMessage(DescribeType type, string name)
1260 		{
1261 			stream.write('D');
1262             stream.write(cast(int)(4 + 1 + name.length + 1));
1263             stream.write(cast(char)type);
1264             stream.writeCString(name);
1265         }
1266 
1267         void sendExecuteMessage(string portalName, int maxRows)
1268 		{
1269 			stream.write('E');
1270             stream.write(cast(int)(4 + portalName.length + 1 + 4));
1271             stream.writeCString(portalName);
1272             stream.write(cast(int)maxRows);
1273         }
1274 
1275         void sendFlushMessage()
1276 		{
1277 			stream.write('H');
1278             stream.write(cast(int)4);
1279         }
1280 
1281         void sendSyncMessage()
1282 		{
1283 			stream.write('S');
1284             stream.write(cast(int)4);
1285         }
1286 
1287         ResponseMessage handleResponseMessage(Message msg)
1288         {
1289             enforce(msg.data.length >= 2);
1290 
1291 			char ftype;
1292             string fvalue;
1293             ResponseMessage response = new ResponseMessage;
1294 
1295             while (true)
1296             {
1297                 msg.read(ftype);
1298                 if(ftype <=0) break;
1299 
1300                 msg.readCString(fvalue);
1301                 response.fields[ftype] = fvalue;
1302             }
1303 
1304             return response;
1305         }
1306 
1307         void checkActiveResultSet()
1308         {
1309             enforce(!activeResultSet, "There's active result set, which must be closed first.");
1310         }
1311 
1312         void prepare(string statementName, string query, PGParameters params)
1313         {
1314             checkActiveResultSet();
1315             sendParseMessage(statementName, query, params.getOids());
1316 
1317             sendFlushMessage();
1318 
1319 	receive:
1320 
1321             Message msg = getMessage();
1322 
1323 		switch (msg.type)
1324             {
1325                 case 'E':
1326                     // ErrorResponse
1327                     ResponseMessage response = handleResponseMessage(msg);
1328                     sendSyncMessage();
1329                     throw new ServerErrorException(response);
1330                 case '1':
1331                     // ParseComplete
1332                     return;
1333                 default:
1334                     // async notice, notification
1335                     goto receive;
1336             }
1337         }
1338 
1339         void unprepare(string statementName)
1340         {
1341             checkActiveResultSet();
1342             sendCloseMessage(DescribeType.Statement, statementName);
1343             sendFlushMessage();
1344 
1345         receive:
1346 
1347             Message msg = getMessage();
1348 
1349             switch (msg.type)
1350             {
1351                 case 'E':
1352                     // ErrorResponse
1353                     ResponseMessage response = handleResponseMessage(msg);
1354                     throw new ServerErrorException(response);
1355                 case '3':
1356                     // CloseComplete
1357                     return;
1358                 default:
1359                     // async notice, notification
1360                     goto receive;
1361             }
1362         }
1363 
1364         PGFields bind(string portalName, string statementName, PGParameters params)
1365         {
1366             checkActiveResultSet();
1367             sendCloseMessage(DescribeType.Portal, portalName);
1368             sendBindMessage(portalName, statementName, params);
1369             sendDescribeMessage(DescribeType.Portal, portalName);
1370             sendFlushMessage();
1371 
1372         receive:
1373 
1374             Message msg = getMessage();
1375 
1376             switch (msg.type)
1377             {
1378                 case 'E':
1379                     // ErrorResponse
1380                     ResponseMessage response = handleResponseMessage(msg);
1381                     sendSyncMessage();
1382                     throw new ServerErrorException(response);
1383                 case '3':
1384                     // CloseComplete
1385                     goto receive;
1386                 case '2':
1387                     // BindComplete
1388                     goto receive;
1389                 case 'T':
1390                     // RowDescription (response to Describe)
1391                     PGField[] fields;
1392                     short fieldCount;
1393                     short formatCode;
1394                     PGField fi;
1395 
1396                     msg.read(fieldCount);
1397 
1398                     fields.length = fieldCount;
1399 
1400                     foreach (i; 0..fieldCount)
1401                     {
1402                         msg.readCString(fi.name);
1403                         msg.read(fi.tableOid);
1404                         msg.read(fi.index);
1405                         msg.read(fi.oid);
1406                         msg.read(fi.typlen);
1407                         msg.read(fi.modifier);
1408                         msg.read(formatCode);
1409 
1410                         enforce(formatCode == 1, new Exception("Field's format code returned in RowDescription is not 1 (binary)"));
1411 
1412                         fields[i] = fi;
1413                     }
1414 
1415                     return cast(PGFields)fields;
1416                 case 'n':
1417                     // NoData (response to Describe)
1418                     return new immutable(PGField)[0];
1419                 default:
1420                     // async notice, notification
1421                     goto receive;
1422             }
1423         }
1424 
1425         ulong executeNonQuery(string portalName, out uint oid)
1426         {
1427             checkActiveResultSet();
1428             ulong rowsAffected = 0;
1429 
1430             sendExecuteMessage(portalName, 0);
1431             sendSyncMessage();
1432             sendFlushMessage();
1433 
1434         receive:
1435 
1436             Message msg = getMessage();
1437 
1438             switch (msg.type)
1439             {
1440                 case 'E':
1441                     // ErrorResponse
1442                     ResponseMessage response = handleResponseMessage(msg);
1443                     throw new ServerErrorException(response);
1444                 case 'D':
1445                     // DataRow
1446                     finalizeQuery();
1447                     throw new Exception("This query returned rows.");
1448                 case 'C':
1449                     // CommandComplete
1450                     string tag;
1451 
1452                     msg.readCString(tag);
1453 
1454                     // GDC indexOf name conflict in std.string and std.algorithm
1455                     auto s1 = std..string.indexOf(tag, ' ');
1456                     if (s1 >= 0) {
1457                         switch (tag[0 .. s1]) {
1458                             case "INSERT":
1459                                 // INSERT oid rows
1460                                 auto s2 = lastIndexOf(tag, ' ');
1461                                 assert(s2 > s1);
1462                                 oid = to!uint(tag[s1 + 1 .. s2]);
1463                                 rowsAffected = to!ulong(tag[s2 + 1 .. $]);
1464                                 break;
1465                             case "DELETE", "UPDATE", "MOVE", "FETCH":
1466                                 // DELETE rows
1467                                 rowsAffected = to!ulong(tag[s1 + 1 .. $]);
1468                                 break;
1469                             default:
1470                                 // CREATE TABLE
1471                                 break;
1472                          }
1473                     }
1474 
1475                     goto receive;
1476 
1477                 case 'I':
1478                     // EmptyQueryResponse
1479                     goto receive;
1480                 case 'Z':
1481                     // ReadyForQuery
1482                     return rowsAffected;
1483                 default:
1484                     // async notice, notification
1485                     goto receive;
1486             }
1487         }
1488 
        /**
        Decodes one DataRow message into a DBRow!Specs.

        The message starts with a 16-bit field count; every field is then a 32-bit
        length followed by that many bytes, where a length of -1 marks NULL.

        For rows with a static layout (Row.hasStaticLength) the per-field decoding
        statements are generated at compile time as a string and mixed in, so each
        field is read with its own D type. Otherwise the row is sized at run time
        and every field is read as Row.ElemType.

        Params:
        msg = DataRow message positioned at its start
        fields = field descriptions; their `oid` selects how each value is decoded
        */
        DBRow!Specs fetchRow(Specs...)(ref Message msg, ref PGFields fields)
        {
            alias DBRow!Specs Row;

            static if (Row.hasStaticLength)
            {
                alias Row.fieldTypes fieldTypes;

                // Builds the source text of the per-field read statements; the text refers to the
                // locals `msg`, `fieldLen`, `row` and `fields` of the enclosing function by name.
                static string genFieldAssigns() // CTFE
                {
                    string s = "";

                    foreach (i; 0 .. fieldTypes.length)
                    {
                        s ~= "msg.read(fieldLen);\n";
                        s ~= "if (fieldLen == -1)\n";
                        s ~= text("row.setNull!(", i, ")();\n");
                        s ~= "else\n";
                        s ~= text("row.set!(fieldTypes[", i, "], ", i, ")(",
                                  "msg.readBaseType!(fieldTypes[", i, "])(fields[", i, "].oid, fieldLen)",
                                  ");\n");
                        // text() doesn't work with -inline option, CTFE bug
                    }

                    return s;
                }
            }

            Row row;
            short fieldCount;
            int fieldLen;

            msg.read(fieldCount);

            static if (Row.hasStaticLength)
            {
                Row.checkReceivedFieldCount(fieldCount);
                // expands to one read/assign statement group per declared field
                mixin(genFieldAssigns);
            }
            else
            {
                row.setLength(fieldCount);

                foreach (i; 0 .. fieldCount)
                {
                    msg.read(fieldLen);
                    if (fieldLen == -1)
                        row.setNull(i);
                    else
                        row[i] = msg.readBaseType!(Row.ElemType)(fields[i].oid, fieldLen);
                }
            }

            return row;
        }
1544 
1545         void finalizeQuery()
1546         {
1547             Message msg;
1548 
1549             do
1550             {
1551                 msg = getMessage();
1552 
1553                 // TODO: process async notifications
1554             }
1555             while (msg.type != 'Z'); // ReadyForQuery
1556         }
1557 
1558         PGResultSet!Specs executeQuery(Specs...)(string portalName, ref PGFields fields)
1559         {
1560             checkActiveResultSet();
1561 
1562             PGResultSet!Specs result = new PGResultSet!Specs(this, fields, &fetchRow!Specs);
1563 
1564             ulong rowsAffected = 0;
1565 
1566             sendExecuteMessage(portalName, 0);
1567             sendSyncMessage();
1568             sendFlushMessage();
1569 
1570         receive:
1571 
1572             Message msg = getMessage();
1573 
1574             switch (msg.type)
1575             {
1576                 case 'D':
1577                     // DataRow
1578                     alias DBRow!Specs Row;
1579 
1580                     result.row = fetchRow!Specs(msg, fields);
1581                     static if (!Row.hasStaticLength)
1582                         result.row.columnToIndex = &result.columnToIndex;
1583                     result.validRow = true;
1584                     result.nextMsg = getMessage();
1585 
1586                     activeResultSet = true;
1587 
1588                     return result;
1589                 case 'C':
1590                     // CommandComplete
1591                     string tag;
1592 
1593                     msg.readCString(tag);
1594 
1595                     auto s2 = lastIndexOf(tag, ' ');
1596                     if (s2 >= 0)
1597                     {
1598                         rowsAffected = to!ulong(tag[s2 + 1 .. $]);
1599                     }
1600 
1601                     goto receive;
1602                 case 'I':
1603                     // EmptyQueryResponse
1604                     throw new Exception("Query string is empty.");
1605                 case 's':
1606                     // PortalSuspended
1607                     throw new Exception("Command suspending is not supported.");
1608                 case 'Z':
1609                     // ReadyForQuery
1610                     result.nextMsg = msg;
1611                     return result;
1612                 case 'E':
1613                     // ErrorResponse
1614                     ResponseMessage response = handleResponseMessage(msg);
1615                     throw new ServerErrorException(response);
1616                 default:
1617                     // async notice, notification
1618                     goto receive;
1619             }
1620 
1621             assert(0);
1622         }
1623 
1624     public:
1625 
1626 
1627         /**
1628         Opens connection to server.
1629 
1630         Params:
1631         params = Associative array of string keys and values.
1632 
1633         Currently recognized parameters are:
1634         $(UL
1635             $(LI host - Host name or IP address of the server. Required.)
1636             $(LI port - Port number of the server. Defaults to 5432.)
1637             $(LI user - The database user. Required.)
1638             $(LI database - The database to connect to. Defaults to the user name.)
1639             $(LI options - Command-line arguments for the backend. (This is deprecated in favor of setting individual run-time parameters.))
1640         )
1641 
1642         In addition to the above, any run-time parameter that can be set at backend start time might be listed.
1643         Such settings will be applied during backend start (after parsing the command-line options if any).
1644         The values will act as session defaults.
1645 
1646         Examples:
1647         ---
1648         auto conn = new PGConnection([
1649             "host" : "localhost",
1650             "database" : "test",
1651             "user" : "postgres",
1652             "password" : "postgres"
1653         ]);
1654         ---
1655         */
1656         this(const string[string] params)
1657         {
1658             enforce("host" in params, new ParamException("Required parameter 'host' not found"));
1659             enforce("user" in params, new ParamException("Required parameter 'user' not found"));
1660 
1661             string[string] p = cast(string[string])params;
1662 
1663             ushort port = "port" in params? parse!ushort(p["port"]) : 5432;
1664 
1665             version(Have_vibe_d_core)
1666             {
1667                 stream = new PGStream(new TCPConnectionWrapper(params["host"], port));
1668             }
1669             else
1670             {
1671                 stream = new PGStream(new TcpSocket);
1672                 stream.socket.connect(new InternetAddress(params["host"], port));
1673             }
1674             sendStartupMessage(params);
1675 
1676         receive:
1677 
1678     		Message msg = getMessage();
1679 
1680             switch (msg.type)
1681             {
1682                 case 'E', 'N':
1683                     // ErrorResponse, NoticeResponse
1684 
1685                     ResponseMessage response = handleResponseMessage(msg);
1686 
1687 				    if (msg.type == 'N')
1688                         goto receive;
1689 
1690                     throw new ServerErrorException(response);
1691                 case 'R':
1692                     // AuthenticationXXXX
1693                     enforce(msg.data.length >= 4);
1694 
1695                     int atype;
1696 
1697                     msg.read(atype);
1698 
1699                     switch (atype)
1700                     {
1701                         case 0:
1702                             // authentication successful, now wait for another messages
1703                             goto receive;
1704                         case 3:
1705                             // clear-text password is required
1706                             enforce("password" in params, new ParamException("Required parameter 'password' not found"));
1707                             enforce(msg.data.length == 4);
1708 
1709                             sendPasswordMessage(params["password"]);
1710 
1711                             goto receive;
1712                         case 5:
1713                             // MD5-hashed password is required, formatted as:
1714                             // "md5" + md5(md5(password + username) + salt)
1715                             // where md5() returns lowercase hex-string
1716                             enforce("password" in params, new ParamException("Required parameter 'password' not found"));
1717                             enforce(msg.data.length == 8);
1718 
1719                             char[3 + 32] password;
1720                             password[0 .. 3] = "md5";
1721                             password[3 .. $] = MD5toHex(MD5toHex(
1722                                 params["password"], params["user"]), msg.data[4 .. 8]);
1723 
1724                             sendPasswordMessage(to!string(password));
1725 
1726                             goto receive;
1727                         default:
1728                             // non supported authentication type, close connection
1729                             this.close();
1730                             throw new Exception("Unsupported authentication type");
1731                     }
1732 
1733                 case 'S':
1734                     // ParameterStatus
1735                     enforce(msg.data.length >= 2);
1736 
1737                     string pname, pvalue;
1738 
1739                     msg.readCString(pname);
1740                     msg.readCString(pvalue);
1741 
1742                     serverParams[pname] = pvalue;
1743 
1744                     goto receive;
1745 
1746                 case 'K':
1747                     // BackendKeyData
1748                     enforce(msg.data.length == 8);
1749 
1750                     msg.read(serverProcessID);
1751                     msg.read(serverSecretKey);
1752 
1753                     goto receive;
1754 
1755                 case 'Z':
1756                     // ReadyForQuery
1757                     enforce(msg.data.length == 1);
1758 
1759                     msg.read(cast(char)trStatus);
1760 
1761                     // check for validity
1762                     switch (trStatus)
1763                     {
1764                         case 'I', 'T', 'E': break;
1765                         default: throw new Exception("Invalid transaction status");
1766                     }
1767 
1768                     // connection is opened and now it's possible to send queries
1769                     reloadAllTypes();
1770                     return;
1771                 default:
1772                     // unknown message type, ignore it
1773                     goto receive;
1774             }
1775         }
1776 
1777         /// Closes current connection to the server.
1778         void close()
1779         {
1780             sendTerminateMessage();
1781             stream.socket.close();
1782         }
1783 
1784         /// Shorthand methods using temporary PGCommand. Semantics is the same as PGCommand's.
1785         ulong executeNonQuery(string query)
1786         {
1787             scope cmd = new PGCommand(this, query);
1788             return cmd.executeNonQuery();
1789         }
1790 
1791         /// ditto
1792         PGResultSet!Specs executeQuery(Specs...)(string query)
1793         {
1794             scope cmd = new PGCommand(this, query);
1795             return cmd.executeQuery!Specs();
1796         }
1797 
1798         /// ditto
1799         DBRow!Specs executeRow(Specs...)(string query, bool throwIfMoreRows = true)
1800         {
1801             scope cmd = new PGCommand(this, query);
1802             return cmd.executeRow!Specs(throwIfMoreRows);
1803         }
1804 
1805         /// ditto
1806         T executeScalar(T)(string query, bool throwIfMoreRows = true)
1807         {
1808             scope cmd = new PGCommand(this, query);
1809             return cmd.executeScalar!T(throwIfMoreRows);
1810         }
1811 
1812         void reloadArrayTypes()
1813         {
1814             auto cmd = new PGCommand(this, "SELECT oid, typelem FROM pg_type WHERE typcategory = 'A'");
1815             auto result = cmd.executeQuery!(uint, "arrayOid", uint, "elemOid");
1816             scope(exit) result.close;
1817 
1818             arrayTypes = null;
1819 
1820             foreach (row; result)
1821             {
1822                 arrayTypes[row.arrayOid] = row.elemOid;
1823             }
1824 
1825             arrayTypes.rehash;
1826         }
1827 
1828         void reloadCompositeTypes()
1829         {
1830             auto cmd = new PGCommand(this, "SELECT a.attrelid, a.atttypid FROM pg_attribute a JOIN pg_type t ON
1831                                      a.attrelid = t.typrelid WHERE a.attnum > 0 ORDER BY a.attrelid, a.attnum");
1832             auto result = cmd.executeQuery!(uint, "typeOid", uint, "memberOid");
1833             scope(exit) result.close;
1834 
1835             compositeTypes = null;
1836 
1837             uint lastOid = 0;
1838             uint[]* memberOids;
1839 
1840             foreach (row; result)
1841             {
1842                 if (row.typeOid != lastOid)
1843                 {
1844                     compositeTypes[lastOid = row.typeOid] = new uint[0];
1845                     memberOids = &compositeTypes[lastOid];
1846                 }
1847 
1848                 *memberOids ~= row.memberOid;
1849             }
1850 
1851             compositeTypes.rehash;
1852         }
1853 
1854         void reloadEnumTypes()
1855         {
1856             auto cmd = new PGCommand(this, "SELECT enumtypid, oid, enumlabel FROM pg_enum ORDER BY enumtypid, oid");
1857             auto result = cmd.executeQuery!(uint, "typeOid", uint, "valueOid", string, "valueLabel");
1858             scope(exit) result.close;
1859 
1860             enumTypes = null;
1861 
1862             uint lastOid = 0;
1863             string[uint]* enumValues;
1864 
1865             foreach (row; result)
1866             {
1867                 if (row.typeOid != lastOid)
1868                 {
1869                     if (lastOid > 0)
1870                         (*enumValues).rehash;
1871 
1872                     enumTypes[lastOid = row.typeOid] = null;
1873                     enumValues = &enumTypes[lastOid];
1874                 }
1875 
1876                 (*enumValues)[row.valueOid] = row.valueLabel;
1877             }
1878 
1879             if (lastOid > 0)
1880                 (*enumValues).rehash;
1881 
1882             enumTypes.rehash;
1883         }
1884 
1885         void reloadAllTypes()
1886         {
1887             // todo: make simpler type lists, since we need only oids of types (without their members)
1888             reloadArrayTypes();
1889             reloadCompositeTypes();
1890             reloadEnumTypes();
1891         }
1892 }
1893 
/// A single query parameter: its 1-based position, its PostgreSQL type and the value bound to it.
class PGParameter
{
    private PGParameters params;   // owning collection; flagged as changed whenever a value is assigned
    immutable short index;
    immutable PGType type;
    private Variant _value;

    /// Creates a parameter owned by `params`. Throws ParamException unless `index` is positive.
    private this(PGParameters params, short index, PGType type)
    {
        enforce(index > 0, new ParamException("Parameter's index must be > 0"));

        this.params = params;
        this.index = index;
        this.type = type;
    }

    /// Value bound to this parameter
    @property Variant value()
    {
        return _value;
    }

    /// Binds a new value (wrapped in a Variant) and marks the owning collection as changed.
    @property Variant value(T)(T v)
    {
        params.changed = true;
        _value = Variant(v);
        return _value;
    }
}
1922 
/// Collection of query parameters, keyed by parameter index
class PGParameters
{
    private PGParameter[short] params;
    private PGCommand cmd;
    private bool changed;

    private this(PGCommand cmd)
    {
        this.cmd = cmd;
    }

    /// Returns the types of all parameters as integers, ordered by ascending parameter index.
    private int[] getOids()
    {
        short[] ordered = params.keys;
        sort(ordered);

        int[] oids = new int[params.length];

        foreach (pos, key; ordered)
            oids[pos] = params[key].type;

        return oids;
    }

    /// Number of parameters in the collection
    @property short length()
    {
        return cast(short)params.length;
    }

    /**
    Creates and returns new parameter.

    Throws if the owning command is already prepared.

    Examples:
    ---
    // without spaces between $ and number
    auto cmd = new PGCommand(conn, "INSERT INTO users (name, surname) VALUES ($ 1, $ 2)");
    cmd.parameters.add(1, PGType.TEXT).value = "John";
    cmd.parameters.add(2, PGType.TEXT).value = "Doe";

    assert(cmd.executeNonQuery == 1);
    ---
    */
    PGParameter add(short index, PGType type)
    {
        enforce(!cmd.prepared, "Can't add parameter to prepared statement.");

        changed = true;

        auto created = new PGParameter(this, index, type);
        params[index] = created;
        return created;
    }

    // todo: remove()

    /// Returns the parameter stored under `index`.
    PGParameter opIndex(short index)
    {
        return params[index];
    }

    /// Iterates over the parameters in ascending index order; a non-zero delegate result stops the iteration.
    int opApply(int delegate(ref PGParameter param) dg)
    {
        short[] ordered = params.keys;
        sort(ordered);

        foreach (key; ordered)
        {
            if (auto rc = dg(params[key]))
                return rc;
        }

        return 0;
    }
}
1997 
/// Array of fields returned by the server (an immutable slice of PGField descriptors).
alias immutable(PGField)[] PGFields;
2000 
/// Contains information about a single field (column) returned by the server.
// NOTE(review): the member descriptions look like they mirror the protocol's
// RowDescription message - confirm against the code that fills this struct.
struct PGField
{
    /// The field name.
    string name;
    /// If the field can be identified as a column of a specific table, the object ID of the table; otherwise zero.
    uint tableOid;
    /// If the field can be identified as a column of a specific table, the attribute number of the column; otherwise zero.
    short index;
    /// The object ID of the field's data type.
    uint oid;
    /// The data type size (see pg_type.typlen). Note that negative values denote variable-width types.
    short typlen;
    /// The type modifier (see pg_attribute.atttypmod). The meaning of the modifier is type-specific.
    int modifier;
}
2017 
/// Class encapsulating prepared or non-prepared statements (commands).
class PGCommand
{
    private PGConnection conn;
    private string _query;
    private PGParameters params;
    private PGFields _fields = null;
    /// Name of the server-side statement/portal; empty while the command is not prepared.
    private string preparedName;
    private uint _lastInsertOid;
    private bool prepared;

    /// List of parameters bound to this command
    @property PGParameters parameters()
    {
        return params;
    }

    /// List of fields that will be returned from the server. Available after successful call to bind().
    @property PGFields fields()
    {
        return _fields;
    }

    /**
    Checks if this is query or non query command. Available after successful call to bind().
    Returns: true if server returns at least one field (column). Otherwise false.
    Throws: Exception if the field list is null.
    */
    @property bool isQuery()
    {
        enforce(_fields !is null, new Exception("bind() must be called first."));
        return _fields.length > 0;
    }

    /// Returns: true if command is currently prepared, otherwise false.
    @property bool isPrepared()
    {
        return prepared;
    }

    /// Query assigned to this command.
    @property string query()
    {
        return _query;
    }
    /// ditto
    @property string query(string query)
    {
        enforce(!prepared, "Can't change query for prepared statement.");
        return _query = query;
    }

    /// If table is with OIDs, it contains last inserted OID.
    @property uint lastInsertOid()
    {
        return _lastInsertOid;
    }

    /// Creates a non-prepared command for the given connection and optional query text.
    this(PGConnection conn, string query = "")
    {
        this.conn = conn;
        _query = query;
        params = new PGParameters(this);
        _fields = new immutable(PGField)[0];
        preparedName = "";
        prepared = false;
    }

    /**
    Prepare this statement, i.e. cache query plan.
    Throws: Exception if the command is already prepared.
    */
    void prepare()
    {
        enforce(!prepared, "This command is already prepared.");

        // Keep the reserved name local until the connection accepted the
        // statement. If conn.prepare throws, preparedName must stay empty:
        // the execute methods pass preparedName to the connection after
        // checkPrepared() bound the unnamed ("") portal, so a stale name
        // would make them address a portal that was never created.
        auto name = conn.reservePrepared();
        conn.prepare(name, _query, params);

        preparedName = name;
        prepared = true;
        params.changed = true;
    }

    /**
    Unprepare this statement. Goes back to normal query planning.
    Throws: Exception if the command is not prepared.
    */
    void unprepare()
    {
        enforce(prepared, "This command is not prepared.");
        conn.unprepare(preparedName);
        preparedName = "";
        prepared = false;
        params.changed = true;
    }

    /**
    Binds values to parameters and updates list of returned fields.

    This is normally done automatically, but it may be useful to check what fields
    would be returned from a query, before executing it.
    */
    void bind()
    {
        checkPrepared(false);
        _fields = conn.bind(preparedName, preparedName, params);
        params.changed = false;
    }

    /**
    For a non-prepared command, (re)creates the unnamed statement from the current
    query and, when bind is true, also binds the unnamed portal. Does nothing for
    a prepared command.
    */
    private void checkPrepared(bool bind)
    {
        if (!prepared)
        {
            // use unnamed statement & portal
            conn.prepare("", _query, params);
            if (bind)
            {
                _fields = conn.bind("", "", params);
                params.changed = false;
            }
        }
    }

    /// Re-binds the command if parameters changed since the last bind.
    private void checkBound()
    {
        if (params.changed)
            bind();
    }

    /**
    Executes a non query command, i.e. query which doesn't return any rows. Commonly used with
    data manipulation commands, such as INSERT, UPDATE and DELETE.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "DELETE FROM table");
    auto deletedRows = cmd.executeNonQuery;
    cmd.query = "UPDATE table SET quantity = 1 WHERE price > 100";
    auto updatedRows = cmd.executeNonQuery;
    cmd.query = "INSERT INTO table VALUES(1, 50)";
    assert(cmd.executeNonQuery == 1);
    ---
    Returns: Number of affected rows.
    */
    ulong executeNonQuery()
    {
        checkPrepared(true);
        checkBound();
        return conn.executeNonQuery(preparedName, _lastInsertOid);
    }

    /**
    Executes query which returns row sets, such as SELECT command.

    Specs describes the D types of the returned columns and is forwarded to DBRow.
    Returns: InputRange of DBRow!Specs.
    */
    PGResultSet!Specs executeQuery(Specs...)()
    {
        checkPrepared(true);
        checkBound();
        return conn.executeQuery!Specs(preparedName, _fields);
    }

    /**
    Executes query and returns only first row of the result.
    Params:
    throwIfMoreRows = If true, throws Exception when result contains more than one row.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "SELECT 1, 'abc'");
    auto row1 = cmd.executeRow!(int, string); // returns DBRow!(int, string)
    assert(is(typeof(row1[0]) == int) && is(typeof(row1[1]) == string));
    auto row2 = cmd.executeRow; // returns DBRow!(Variant[])
    ---
    Throws: Exception if result doesn't contain any rows or field count do not match.
    Throws: Exception if result contains more than one row when throwIfMoreRows is true.
    */
    DBRow!Specs executeRow(Specs...)(bool throwIfMoreRows = true)
    {
        auto result = executeQuery!Specs();
        // The result set is closed on every exit path, including the enforce failures below.
        scope(exit) result.close();
        enforce(!result.empty(), "Result doesn't contain any rows.");
        auto row = result.front();
        if (throwIfMoreRows)
        {
            result.popFront();
            enforce(result.empty(), "Result contains more than one row.");
        }
        return row;
    }

    /**
    Executes query returning exactly one row and field. By default, returns Variant type.
    Params:
    throwIfMoreRows = If true, throws Exception when result contains more than one row.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "SELECT 1");
    auto i = cmd.executeScalar!int; // returns int
    assert(is(typeof(i) == int));
    auto v = cmd.executeScalar; // returns Variant
    ---
    Throws: Exception if result doesn't contain any rows or if it contains more than one field.
    Throws: Exception if result contains more than one row when throwIfMoreRows is true.
    */
    T executeScalar(T = Variant)(bool throwIfMoreRows = true)
    {
        auto result = executeQuery!T();
        // The result set is closed on every exit path, including the enforce failures below.
        scope(exit) result.close();
        enforce(!result.empty(), "Result doesn't contain any rows.");
        T row = result.front();
        if (throwIfMoreRows)
        {
            result.popFront();
            enforce(result.empty(), "Result contains more than one row.");
        }
        return row;
    }
}
2228 
/// Input range of DBRow!Specs
class PGResultSet(Specs...)
{
    alias DBRow!Specs Row;
    alias Row delegate(ref Message msg, ref PGFields fields) FetchRowDelegate;

    private FetchRowDelegate fetchRow;
    private PGConnection conn;
    private PGFields fields;
    private Row row;
    private bool validRow;
    private Message nextMsg;
    private size_t[][string] columnMap;

    /**
    Stores the connection, field list and row-fetching delegate, and builds the
    column name -> field positions map (a name may map to several positions).
    */
    private this(PGConnection conn, ref PGFields fields, FetchRowDelegate dg)
    {
        this.conn = conn;
        this.fields = fields;
        this.fetchRow = dg;
        validRow = false;

        foreach (position, field; fields)
        {
            if (auto known = field.name in columnMap)
                *known ~= position;
            else
                columnMap[field.name] = [position];
        }
    }

    /**
    Translates a column name to a field position. index selects among the
    positions recorded for that name.
    Throws: Exception if the column name is unknown.
    */
    private size_t columnToIndex(string column, size_t index)
    {
        auto matches = column in columnMap;
        enforce(matches, "Unknown column name");
        return (*matches)[index];
    }

    /// True when there is no valid current row.
    pure nothrow bool empty()
    {
        return !validRow;
    }

    /**
    Advances to the next row. When the pending message is not of type 'D',
    the range becomes empty; otherwise the row is decoded from it and the
    following message is read from the connection.
    */
    void popFront()
    {
        if (nextMsg.type != 'D')
        {
            validRow = false;
            return;
        }

        row = fetchRow(nextMsg, fields);
        static if (!Row.hasStaticLength)
            row.columnToIndex = &columnToIndex;
        validRow = true;
        nextMsg = conn.getMessage();
    }

    /// Returns the current row.
    pure nothrow Row front()
    {
        return row;
    }

    /// Closes current result set. It must be closed before issuing another query on the same connection.
    void close()
    {
        // Finalize the query unless the pending message already has type 'Z'.
        immutable alreadyFinished = nextMsg.type == 'Z';
        if (!alreadyFinished)
            conn.finalizeQuery();

        conn.activeResultSet = false;
    }

    /// Iterates over the remaining rows; the range is advanced after each delegate call.
    int opApply(int delegate(ref Row row) dg)
    {
        for (; !empty; )
        {
            immutable status = dg(row);
            popFront();

            if (status)
                return status;
        }

        return 0;
    }

    /// Iterates over the remaining rows together with a counter starting at zero.
    int opApply(int delegate(ref size_t i, ref Row row) dg)
    {
        size_t counter;

        for (; !empty; )
        {
            immutable status = dg(counter, row);
            popFront();
            counter++;

            if (status)
                return status;
        }

        return 0;
    }
}
2334 
2335 
2336 version(Have_vibe_d_core)
2337 {
2338     import vibe.core.connectionpool;
2339 
2340     // wrap vibe.d TCPConnection class with the scope of reopening the tcp connection if closed 
2341     // by PostgreSQL it for some reason.
2342     // see https://forum.rejectedsoftware.com/groups/rejectedsoftware.vibed/thread/44097/
2343     private class TCPConnectionWrapper 
2344     {
2345         this(string host, ushort port, string bindInterface = null, ushort bindPort = cast(ushort)0u)
2346         {
2347             this.host = host;
2348             this.port = port;
2349             this.bindInterface = bindInterface;
2350             this.bindPort = bindPort;
2351 
2352             connect();
2353         }
2354 
2355         void close(){ tcpConnection.close(); }
2356 
2357         void write(const(ubyte[]) bytes)
2358         {
2359             // Vibe:  "... If connected is false, writing to the connection will trigger an exception ..."
2360             if (!tcpConnection.connected)
2361             {
2362                 // Vibe: " ... Note that close must always be called, even if the remote has already closed the
2363                 //             connection. Failure to do so will result in resource and memory leakage.
2364                 tcpConnection.close();
2365                 connect();
2366             }
2367             tcpConnection.write(bytes);
2368         }
2369 
2370         void read(ubyte[] dst)
2371         {
2372             if (!tcpConnection.connected)
2373             {
2374                 tcpConnection.close();
2375                 connect();
2376             }
2377             if (!tcpConnection.empty)
2378             {
2379                 tcpConnection.read(dst);
2380             }
2381         }
2382 
2383         private
2384         {
2385             void connect()
2386             {
2387                 tcpConnection = connectTCP(host, port, bindInterface, bindPort);
2388             }
2389 
2390             string host;
2391             string bindInterface;
2392             ushort port;
2393             ushort bindPort;
2394 
2395             TCPConnection tcpConnection;
2396         }
2397     }
2398 
2399     class PostgresDB {
2400         private {
2401             string[string] m_params;
2402             ConnectionPool!PGConnection m_pool;
2403         }
2404 
2405         this(string[string] conn_params)
2406         {
2407             m_params = conn_params.dup;
2408             m_pool = new ConnectionPool!PGConnection(&createConnection);
2409         }
2410 
2411         auto lockConnection() { return m_pool.lockConnection(); }
2412 
2413         private PGConnection createConnection()
2414         {
2415             return new PGConnection(m_params);
2416         }
2417     }
2418 }
2419 else
2420 {
	/// Fallback used when the module is built without version Have_vibe_d_core.
	/// Declared as a template so the static assert below is only evaluated when
	/// PostgresDB is actually instantiated, producing a descriptive compile error.
	class PostgresDB() {
		static assert(false,
		              "The 'PostgresDB' connection pool requires Vibe.d and therefore "~
		              "must be used with -version=Have_vibe_d_core"
		              );
	}
2427 }
2428