1 /** 2 PostgreSQL client implementation. 3 4 Features: 5 $(UL 6 $(LI Standalone (does not depend on libpq)) 7 $(LI Binary formatting (avoids parsing overhead)) 8 $(LI Prepared statements) 9 $(LI Parametrized queries (partially working)) 10 $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/datatype-enum.html, Enums)) 11 $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/arrays.html, Arrays)) 12 $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/rowtypes.html, Composite types)) 13 ) 14 15 TODOs: 16 $(UL 17 $(LI Redesign parametrized queries) 18 $(LI BigInt/Numeric types support) 19 $(LI Geometric types support) 20 $(LI Network types support) 21 $(LI Bit string types support) 22 $(LI UUID type support) 23 $(LI XML types support) 24 $(LI Transaction support) 25 $(LI Asynchronous notifications) 26 $(LI Better memory management) 27 $(LI More friendly PGFields) 28 ) 29 30 Bugs: 31 $(UL 32 $(LI Support only cleartext and MD5 $(LINK2 http://www.postgresql.org/docs/9.0/static/auth-methods.html, authentication)) 33 $(LI Unfinished parameter handling) 34 $(LI interval is converted to Duration, which does not support months) 35 ) 36 37 $(B Data type mapping:) 38 39 $(TABLE 40 $(TR $(TH PostgreSQL type) $(TH Aliases) $(TH Default D type) $(TH D type mapping possibilities)) 41 $(TR $(TD smallint) $(TD int2) $(TD short) <td rowspan="19">Any type convertible from default D type</td>) 42 $(TR $(TD integer) $(TD int4) $(TD int)) 43 $(TR $(TD bigint) $(TD int8) $(TD long)) 44 $(TR $(TD oid) $(TD reg***) $(TD uint)) 45 $(TR $(TD decimal) $(TD numeric) $(TD not yet supported)) 46 $(TR $(TD real) $(TD float4) $(TD float)) 47 $(TR $(TD double precision) $(TD float8) $(TD double)) 48 $(TR $(TD character varying(n)) $(TD varchar(n)) $(TD string)) 49 $(TR $(TD character(n)) $(TD char(n)) $(TD string)) 50 $(TR $(TD text) $(TD) $(TD string)) 51 $(TR $(TD "char") $(TD) $(TD char)) 52 $(TR $(TD bytea) $(TD) $(TD ubyte[])) 53 $(TR $(TD timestamp without time zone) $(TD timestamp) $(TD DateTime)) 54 $(TR $(TD timestamp with time zone) $(TD timestamptz) $(TD SysTime)) 55 $(TR $(TD date) $(TD) $(TD Date)) 56 $(TR $(TD time without time zone) $(TD time) $(TD TimeOfDay)) 57 $(TR $(TD time with time zone) $(TD timetz) $(TD SysTime)) 58 $(TR $(TD interval) $(TD) $(TD Duration (without months and years))) 59 $(TR $(TD boolean) $(TD bool) $(TD bool)) 60 $(TR $(TD enums) $(TD) $(TD string) $(TD enum)) 61 $(TR $(TD arrays) $(TD) $(TD Variant[]) $(TD dynamic/static array with compatible element type)) 62 $(TR $(TD composites) $(TD record, row) $(TD Variant[]) $(TD dynamic/static array, struct or Tuple)) 63 ) 64 65 Examples: 66 with vibe.d use -version=Have_vibe_d_core and use a ConnectionPool (PostgresDB Object & lockConnection) 67 --- 68 69 auto pdb = new PostgresDB([ 70 "host" : "192.168.2.50", 71 "database" : "postgres", 72 "user" : "postgres", 73 "password" : "" 74 ]); 75 auto conn = pdb.lockConnection(); 76 77 auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type"); 78 auto result = cmd.executeQuery; 79 80 try 81 { 82 foreach (row; result) 83 { 84 writeln(row["typname"], ", ", row[1]); 85 } 86 } 87 finally 88 { 89 result.close; 90 } 91 92 --- 93 without vibe.d you can use std sockets with PGConnection object 94 95 --- 96 import std.stdio; 97 import ddb.postgres; 98 99 int main(string[] argv) 100 { 101 auto conn = new PGConnection([ 102 "host" : "localhost", 103 "database" : "test", 104 "user" : "postgres", 105 "password" : "postgres" 106 ]); 107 108 scope(exit) conn.close; 109 110 auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type"); 111 auto result = cmd.executeQuery; 112 113 try 114 { 115 foreach (row; result) 116 { 117 writeln(row[0], ", ", row[1]); 118 } 119 } 120 finally 121 { 122 result.close; 123 } 124 125 return 0; 126 } 127 --- 128 129 Copyright: Copyright Piotr Szturmaj 2011-. 130 License: $(LINK2 http://boost.org/LICENSE_1_0.txt, Boost License 1.0). 131 Authors: Piotr Szturmaj 132 *//* 133 Documentation contains portions copied from PostgreSQL manual (mainly field information and 134 connection parameters description). License: 135 136 Portions Copyright (c) 1996-2010, The PostgreSQL Global Development Group 137 Portions Copyright (c) 1994, The Regents of the University of California 138 139 Permission to use, copy, modify, and distribute this software and its documentation for any purpose, 140 without fee, and without a written agreement is hereby granted, provided that the above copyright 141 notice and this paragraph and the following two paragraphs appear in all copies. 142 143 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, 144 INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS, 145 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY 146 OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 147 148 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT 149 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 150 PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 151 CALIFORNIA HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, 152 OR MODIFICATIONS. 153 */ 154 module ddb.postgres; 155 156 version (Have_vibe_d_core) 157 { 158 import vibe.core.net; 159 import vibe.core.stream; 160 } 161 else 162 { 163 import std.socket; 164 } 165 import std.bitmanip; 166 import std.exception; 167 import std.conv; 168 import std.traits; 169 import std.typecons; 170 import std..string; 171 import std.digest.md; 172 import core.bitop; 173 import std.variant; 174 import std.algorithm; 175 import std.stdio; 176 import std.datetime; 177 import std.uuid; 178 public import ddb.db; 179 180 private: 181 182 const PGEpochDate = Date(2000, 1, 1); 183 const PGEpochDay = PGEpochDate.dayOfGregorianCal; 184 const PGEpochTime = TimeOfDay(0, 0, 0); 185 const PGEpochDateTime = DateTime(2000, 1, 1, 0, 0, 0); 186 187 class PGStream 188 { 189 version (Have_vibe_d_core) 190 { 191 private TCPConnectionWrapper m_socket; 192 193 @property TCPConnectionWrapper socket() 194 { 195 return m_socket; 196 } 197 198 this(TCPConnectionWrapper socket) 199 { 200 m_socket = socket; 201 } 202 } 203 else 204 { 205 private Socket m_socket; 206 207 @property Socket socket() 208 { 209 return m_socket; 210 } 211 212 this(Socket socket) 213 { 214 m_socket = socket; 215 } 216 } 217 218 protected void read(ubyte[] buffer) 219 { 220 version(Have_vibe_d_core) 221 { 222 m_socket.read(buffer); 223 } 224 else 225 { 226 if (buffer.length > 0) 227 { 228 m_socket.receive(buffer); 229 } 230 } 231 } 232 233 void write(ubyte[] x) 234 { 235 version(Have_vibe_d_core) 236 { 237 m_socket.write(x); 238 } 239 else 240 { 241 if (x.length > 0) 242 { 243 m_socket.send(x); 244 } 245 } 246 } 247 248 void write(ubyte x) 249 { 250 write(nativeToBigEndian(x)); // ubyte[] 251 } 252 253 void write(short x) 254 { 255 write(nativeToBigEndian(x)); // ubyte[] 256 } 257 258 void write(int x) 259 { 260 write(nativeToBigEndian(x)); // ubyte[] 261 } 262 263 void write(long x) 264 { 265 write(nativeToBigEndian(x)); 266 } 267 268 void write(float x) 269 { 270 write(nativeToBigEndian(x)); // ubyte[] 271 } 272 273 void write(double x) 274 { 275 write(nativeToBigEndian(x)); 276 } 277 278 void writeString(string x) 279 { 280 ubyte[] ub = cast(ubyte[])(x); 281 write(ub); 282 } 283 284 void writeCString(string x) 285 { 286 writeString(x); 287 write('\0'); 288 } 289 290 void writeCString(char[] x) 291 { 292 write(cast(ubyte[])x); 293 write('\0'); 294 } 295 296 void write(const ref Date x) 297 { 298 write(cast(int)(x.dayOfGregorianCal - PGEpochDay)); 299 } 300 301 void write(Date x) 302 { 303 write(cast(int)(x.dayOfGregorianCal - PGEpochDay)); 304 } 305 306 void write(const ref TimeOfDay x) 307 { 308 write(cast(int)((x - PGEpochTime).total!"usecs")); 309 } 310 311 void write(const ref DateTime x) // timestamp 312 { 313 write(cast(int)((x - PGEpochDateTime).total!"usecs")); 314 } 315 316 void write(DateTime x) // timestamp 317 { 318 write(cast(int)((x - PGEpochDateTime).total!"usecs")); 319 } 320 321 void write(const ref SysTime x) // timestamptz 322 { 323 write(cast(int)((x - SysTime(PGEpochDateTime, UTC())).total!"usecs")); 324 } 325 326 // BUG: Does not support months 327 void write(const ref core.time.Duration x) // interval 328 { 329 int months = cast(int)(x.split!"weeks".weeks/28); 330 int days = cast(int)x.split!"days".days; 331 long usecs = x.total!"usecs" - convert!("days", "usecs")(days); 332 333 write(usecs); 334 write(days); 335 write(months); 336 } 337 338 void writeTimeTz(const ref SysTime x) // timetz 339 { 340 TimeOfDay t = cast(TimeOfDay)x; 341 write(t); 342 write(cast(int)0); 343 } 344 } 345 346 char[32] MD5toHex(T...)(in T data) 347 { 348 return md5Of(data).toHexString!(LetterCase.lower); 349 } 350 351 struct Message 352 { 353 PGConnection conn; 354 char type; 355 ubyte[] data; 356 357 private size_t position = 0; 358 359 T read(T, Params...)(Params p) 360 { 361 T value; 362 read(value, p); 363 return value; 364 } 365 366 void read()(out char x) 367 { 368 x = data[position++]; 369 } 370 371 372 void read(Int)(out Int x) if((isIntegral!Int || isFloatingPoint!Int) && Int.sizeof > 1) 373 { 374 ubyte[Int.sizeof] buf; 375 buf[] = data[position..position+Int.sizeof]; 376 x = bigEndianToNative!Int(buf); 377 position += Int.sizeof; 378 } 379 380 string readCString() 381 { 382 string x; 383 readCString(x); 384 return x; 385 } 386 387 void readCString(out string x) 388 { 389 ubyte* p = data.ptr + position; 390 391 while (*p > 0) 392 p++; 393 x = cast(string)data[position .. cast(size_t)(p - data.ptr)]; 394 position = cast(size_t)(p - data.ptr + 1); 395 } 396 397 string readString(int len) 398 { 399 string x; 400 readString(x, len); 401 return x; 402 } 403 404 void readString(out string x, int len) 405 { 406 x = cast(string)(data[position .. position + len]); 407 position += len; 408 } 409 410 void read()(out bool x) 411 { 412 x = cast(bool)data[position++]; 413 } 414 415 void read()(out ubyte[] x, int len) 416 { 417 enforce(position + len <= data.length); 418 x = data[position .. position + len]; 419 position += len; 420 } 421 422 void read()(out UUID u) // uuid 423 { 424 ubyte[16] uuidData = data[position .. position + 16]; 425 position += 16; 426 u = UUID(uuidData); 427 } 428 429 void read()(out Date x) // date 430 { 431 int days = read!int; // number of days since 1 Jan 2000 432 x = PGEpochDate + dur!"days"(days); 433 } 434 435 void read()(out TimeOfDay x) // time 436 { 437 long usecs = read!long; 438 x = PGEpochTime + dur!"usecs"(usecs); 439 } 440 441 void read()(out DateTime x) // timestamp 442 { 443 long usecs = read!long; 444 x = PGEpochDateTime + dur!"usecs"(usecs); 445 } 446 447 void read()(out SysTime x) // timestamptz 448 { 449 long usecs = read!long; 450 x = SysTime(PGEpochDateTime + dur!"usecs"(usecs), UTC()); 451 x.timezone = LocalTime(); 452 } 453 454 // BUG: Does not support months 455 void read()(out core.time.Duration x) // interval 456 { 457 long usecs = read!long; 458 int days = read!int; 459 int months = read!int; 460 461 x = dur!"days"(days) + dur!"usecs"(usecs); 462 } 463 464 SysTime readTimeTz() // timetz 465 { 466 TimeOfDay time = read!TimeOfDay; 467 int zone = read!int / 60; // originally in seconds, convert it to minutes 468 Duration duration = dur!"minutes"(zone); 469 auto stz = new immutable SimpleTimeZone(duration); 470 return SysTime(DateTime(Date(0, 1, 1), time), stz); 471 } 472 473 T readComposite(T)() 474 { 475 alias DBRow!T Record; 476 477 static if (Record.hasStaticLength) 478 { 479 alias Record.fieldTypes fieldTypes; 480 481 static string genFieldAssigns() // CTFE 482 { 483 string s = ""; 484 485 foreach (i; 0 .. fieldTypes.length) 486 { 487 s ~= "read(fieldOid);\n"; 488 s ~= "read(fieldLen);\n"; 489 s ~= "if (fieldLen == -1)\n"; 490 s ~= text("record.setNull!(", i, ");\n"); 491 s ~= "else\n"; 492 s ~= text("record.set!(fieldTypes[", i, "], ", i, ")(", 493 "readBaseType!(fieldTypes[", i, "])(fieldOid, fieldLen)", 494 ");\n"); 495 // text() doesn't work with -inline option, CTFE bug 496 } 497 498 return s; 499 } 500 } 501 502 Record record; 503 504 int fieldCount, fieldLen; 505 uint fieldOid; 506 507 read(fieldCount); 508 509 static if (Record.hasStaticLength) 510 mixin(genFieldAssigns); 511 else 512 { 513 record.setLength(fieldCount); 514 515 foreach (i; 0 .. fieldCount) 516 { 517 read(fieldOid); 518 read(fieldLen); 519 520 if (fieldLen == -1) 521 record.setNull(i); 522 else 523 record[i] = readBaseType!(Record.ElemType)(fieldOid, fieldLen); 524 } 525 } 526 527 return record.base; 528 } 529 mixin template elmnt(U : U[]) 530 { 531 alias U ElemType; 532 } 533 private AT readDimension(AT)(int[] lengths, uint elementOid, int dim) 534 { 535 536 mixin elmnt!AT; 537 538 int length = lengths[dim]; 539 540 AT array; 541 static if (isDynamicArray!AT) 542 array.length = length; 543 544 int fieldLen; 545 546 foreach(i; 0 .. length) 547 { 548 static if (isArray!ElemType && !isSomeString!ElemType) 549 array[i] = readDimension!ElemType(lengths, elementOid, dim + 1); 550 else 551 { 552 static if (isNullable!ElemType) 553 alias nullableTarget!ElemType E; 554 else 555 alias ElemType E; 556 557 read(fieldLen); 558 if (fieldLen == -1) 559 { 560 static if (isNullable!ElemType || isSomeString!ElemType) 561 array[i] = null; 562 else 563 throw new Exception("Can't set NULL value to non nullable type"); 564 } 565 else 566 array[i] = readBaseType!E(elementOid, fieldLen); 567 } 568 } 569 570 return array; 571 } 572 573 T readArray(T)() 574 if (isArray!T) 575 { 576 alias multiArrayElemType!T U; 577 578 // todo: more validation, better lowerBounds support 579 int dims, hasNulls; 580 uint elementOid; 581 int[] lengths, lowerBounds; 582 583 read(dims); 584 read(hasNulls); // 0 or 1 585 read(elementOid); 586 587 if (dims == 0) 588 return T.init; 589 590 enforce(arrayDimensions!T == dims, "Dimensions of arrays do not match"); 591 static if (!isNullable!U && !isSomeString!U) 592 enforce(!hasNulls, "PostgreSQL returned NULLs but array elements are not Nullable"); 593 594 lengths.length = lowerBounds.length = dims; 595 596 int elementCount = 1; 597 598 foreach(i; 0 .. dims) 599 { 600 int len; 601 602 read(len); 603 read(lowerBounds[i]); 604 lengths[i] = len; 605 606 elementCount *= len; 607 } 608 609 T array = readDimension!T(lengths, elementOid, 0); 610 611 return array; 612 } 613 614 T readEnum(T)(int len) 615 { 616 string genCases() // CTFE 617 { 618 string s; 619 620 foreach (name; __traits(allMembers, T)) 621 { 622 s ~= text(`case "`, name, `": return T.`, name, `;`); 623 } 624 625 return s; 626 } 627 628 string enumMember = readString(len); 629 630 switch (enumMember) 631 { 632 mixin(genCases); 633 default: throw new ConvException("Can't set enum value '" ~ enumMember ~ "' to enum type " ~ T.stringof); 634 } 635 } 636 637 T readBaseType(T)(uint oid, int len = 0) 638 { 639 auto convError(T)() 640 { 641 string* type = oid in baseTypes; 642 return new ConvException("Can't convert PostgreSQL's type " ~ (type ? *type : to!string(oid)) ~ " to " ~ T.stringof); 643 } 644 645 switch (oid) 646 { 647 case 16: // bool 648 static if (isConvertible!(T, bool)) 649 return _to!T(read!bool); 650 else 651 throw convError!T(); 652 case 26, 24, 2202, 2203, 2204, 2205, 2206, 3734, 3769: // oid and reg*** aliases 653 static if (isConvertible!(T, uint)) 654 return _to!T(read!uint); 655 else 656 throw convError!T(); 657 case 21: // int2 658 static if (isConvertible!(T, short)) 659 return _to!T(read!short); 660 else 661 throw convError!T(); 662 case 23: // int4 663 static if (isConvertible!(T, int)) 664 return _to!T(read!int); 665 else 666 throw convError!T(); 667 case 20: // int8 668 static if (isConvertible!(T, long)) 669 return _to!T(read!long); 670 else 671 throw convError!T(); 672 case 700: // float4 673 static if (isConvertible!(T, float)) 674 return _to!T(read!float); 675 else 676 throw convError!T(); 677 case 701: // float8 678 static if (isConvertible!(T, double)) 679 return _to!T(read!double); 680 else 681 throw convError!T(); 682 case 1042, 1043, 25, 19, 705: // bpchar, varchar, text, name, unknown 683 static if (isConvertible!(T, string)) 684 return _to!T(readString(len)); 685 else 686 throw convError!T(); 687 case 17: // bytea 688 static if (isConvertible!(T, ubyte[])) 689 return _to!T(read!(ubyte[])(len)); 690 else 691 throw convError!T(); 692 case 2950: // UUID 693 static if(isConvertible!(T, UUID)) 694 return _to!T(read!UUID()); 695 else 696 throw convError!T(); 697 case 18: // "char" 698 static if (isConvertible!(T, char)) 699 return _to!T(read!char); 700 else 701 throw convError!T(); 702 case 1082: // date 703 static if (isConvertible!(T, Date)) 704 return _to!T(read!Date); 705 else 706 throw convError!T(); 707 case 1083: // time 708 static if (isConvertible!(T, TimeOfDay)) 709 return _to!T(read!TimeOfDay); 710 else 711 throw convError!T(); 712 case 1114: // timestamp 713 static if (isConvertible!(T, DateTime)) 714 return _to!T(read!DateTime); 715 else 716 throw convError!T(); 717 case 1184: // timestamptz 718 static if (isConvertible!(T, SysTime)) 719 return _to!T(read!SysTime); 720 else 721 throw convError!T(); 722 case 1186: // interval 723 static if (isConvertible!(T, core.time.Duration)) 724 return _to!T(read!(core.time.Duration)); 725 else 726 throw convError!T(); 727 case 1266: // timetz 728 static if (isConvertible!(T, SysTime)) 729 return _to!T(readTimeTz); 730 else 731 throw convError!T(); 732 case 2249: // record and other composite types 733 static if (isVariantN!T && T.allowed!(Variant[])) 734 return T(readComposite!(Variant[])); 735 else 736 return readComposite!T; 737 case 2287: // _record and other arrays 738 static if (isArray!T && !isSomeString!T) 739 return readArray!T; 740 else static if (isVariantN!T && T.allowed!(Variant[])) 741 return T(readArray!(Variant[])); 742 else 743 throw convError!T(); 744 case 114: //JSON 745 static if (isConvertible!(T, string)) 746 return _to!T(readString(len)); 747 else 748 throw convError!T(); 749 default: 750 if (oid in conn.arrayTypes) 751 goto case 2287; 752 else if (oid in conn.compositeTypes) 753 goto case 2249; 754 else if (oid in conn.enumTypes) 755 { 756 static if (is(T == enum)) 757 return readEnum!T(len); 758 else static if (isConvertible!(T, string)) 759 return _to!T(readString(len)); 760 else 761 throw convError!T(); 762 } 763 } 764 765 throw convError!T(); 766 } 767 } 768 769 // workaround, because std.conv currently doesn't support VariantN 770 template _to(T) 771 { 772 static if (isVariantN!T) 773 T _to(S)(S value) { T t = value; return t; } 774 else 775 T _to(A...)(A args) { return std.conv.to!T(args); } 776 } 777 778 template isConvertible(T, S) 779 { 780 static if (__traits(compiles, { S s; _to!T(s); }) || (isVariantN!T && T.allowed!S)) 781 enum isConvertible = true; 782 else 783 enum isConvertible = false; 784 } 785 786 template arrayDimensions(T : T[]) 787 { 788 static if (isArray!T && !isSomeString!T) 789 enum arrayDimensions = arrayDimensions!T + 1; 790 else 791 enum arrayDimensions = 1; 792 } 793 794 template arrayDimensions(T) 795 { 796 enum arrayDimensions = 0; 797 } 798 799 template multiArrayElemType(T : T[]) 800 { 801 static if (isArray!T && !isSomeString!T) 802 alias multiArrayElemType!T multiArrayElemType; 803 else 804 alias T multiArrayElemType; 805 } 806 807 template multiArrayElemType(T) 808 { 809 alias T multiArrayElemType; 810 } 811 812 static assert(arrayDimensions!(int) == 0); 813 static assert(arrayDimensions!(int[]) == 1); 814 static assert(arrayDimensions!(int[][]) == 2); 815 static assert(arrayDimensions!(int[][][]) == 3); 816 817 enum TransactionStatus : char { OutsideTransaction = 'I', InsideTransaction = 'T', InsideFailedTransaction = 'E' }; 818 819 enum string[int] baseTypes = [ 820 // boolean types 821 16 : "bool", 822 // bytea types 823 17 : "bytea", 824 // character types 825 18 : `"char"`, // "char" - 1 byte internal type 826 1042 : "bpchar", // char(n) - blank padded 827 1043 : "varchar", 828 25 : "text", 829 19 : "name", 830 // numeric types 831 21 : "int2", 832 23 : "int4", 833 20 : "int8", 834 700 : "float4", 835 701 : "float8", 836 1700 : "numeric" 837 ]; 838 839 public: 840 841 enum PGType : int 842 { 843 OID = 26, 844 NAME = 19, 845 REGPROC = 24, 846 BOOLEAN = 16, 847 BYTEA = 17, 848 CHAR = 18, // 1 byte "char", used internally in PostgreSQL 849 BPCHAR = 1042, // Blank Padded char(n), fixed size 850 VARCHAR = 1043, 851 TEXT = 25, 852 INT2 = 21, 853 INT4 = 23, 854 INT8 = 20, 855 FLOAT4 = 700, 856 FLOAT8 = 701, 857 858 // reference https://github.com/lpsmith/postgresql-simple/blob/master/src/Database/PostgreSQL/Simple/TypeInfo/Static.hs#L74 859 DATE = 1082, 860 TIME = 1083, 861 TIMESTAMP = 1114, 862 TIMESTAMPTZ = 1184, 863 INTERVAL = 1186, 864 TIMETZ = 1266, 865 866 JSON = 114, 867 JSONARRAY = 199 868 }; 869 870 class ParamException : Exception 871 { 872 this(string msg, string fn = __FILE__, size_t ln = __LINE__) @safe pure nothrow 873 { 874 super(msg, fn, ln); 875 } 876 } 877 878 /// Exception thrown on server error 879 class ServerErrorException: Exception 880 { 881 /// Contains information about this _error. Aliased to this. 882 ResponseMessage error; 883 alias error this; 884 885 this(string msg, string fn = __FILE__, size_t ln = __LINE__) @safe pure nothrow 886 { 887 super(msg, fn, ln); 888 } 889 890 this(ResponseMessage error, string fn = __FILE__, size_t ln = __LINE__) 891 { 892 super(error.toString(), fn, ln); 893 this.error = error; 894 } 895 } 896 897 /** 898 Class encapsulating errors and notices. 899 900 This class provides access to fields of ErrorResponse and NoticeResponse 901 sent by the server. More information about these fields can be found 902 $(LINK2 http://www.postgresql.org/docs/9.0/static/protocol-error-fields.html,here). 903 */ 904 class ResponseMessage 905 { 906 private string[char] fields; 907 908 private string getOptional(char type) 909 { 910 string* p = type in fields; 911 return p ? *p : ""; 912 } 913 914 /// Message fields 915 @property string severity() 916 { 917 return fields['S']; 918 } 919 920 /// ditto 921 @property string code() 922 { 923 return fields['C']; 924 } 925 926 /// ditto 927 @property string message() 928 { 929 return fields['M']; 930 } 931 932 /// ditto 933 @property string detail() 934 { 935 return getOptional('D'); 936 } 937 938 /// ditto 939 @property string hint() 940 { 941 return getOptional('H'); 942 } 943 944 /// ditto 945 @property string position() 946 { 947 return getOptional('P'); 948 } 949 950 /// ditto 951 @property string internalPosition() 952 { 953 return getOptional('p'); 954 } 955 956 /// ditto 957 @property string internalQuery() 958 { 959 return getOptional('q'); 960 } 961 962 /// ditto 963 @property string where() 964 { 965 return getOptional('W'); 966 } 967 968 /// ditto 969 @property string file() 970 { 971 return getOptional('F'); 972 } 973 974 /// ditto 975 @property string line() 976 { 977 return getOptional('L'); 978 } 979 980 /// ditto 981 @property string routine() 982 { 983 return getOptional('R'); 984 } 985 986 /** 987 Returns summary of this message using the most common fields (severity, 988 code, message, detail, hint) 989 */ 990 override string toString() 991 { 992 string s = severity ~ ' ' ~ code ~ ": " ~ message; 993 994 string* detail = 'D' in fields; 995 if (detail) 996 s ~= "\nDETAIL: " ~ *detail; 997 998 string* hint = 'H' in fields; 999 if (hint) 1000 s ~= "\nHINT: " ~ *hint; 1001 1002 return s; 1003 } 1004 } 1005 1006 /** 1007 Class representing connection to PostgreSQL server. 1008 */ 1009 class PGConnection 1010 { 1011 private: 1012 PGStream stream; 1013 string[string] serverParams; 1014 int serverProcessID; 1015 int serverSecretKey; 1016 TransactionStatus trStatus; 1017 ulong lastPrepared = 0; 1018 uint[uint] arrayTypes; 1019 uint[][uint] compositeTypes; 1020 string[uint][uint] enumTypes; 1021 bool activeResultSet; 1022 1023 string reservePrepared() 1024 { 1025 synchronized (this) 1026 { 1027 1028 return to!string(lastPrepared++); 1029 } 1030 } 1031 1032 Message getMessage() 1033 { 1034 1035 char type; 1036 int len; 1037 ubyte[1] ub; 1038 stream.read(ub); // message type 1039 1040 type = bigEndianToNative!char(ub); 1041 ubyte[4] ubi; 1042 stream.read(ubi); // message length, doesn't include type byte 1043 1044 len = bigEndianToNative!int(ubi) - 4; 1045 1046 ubyte[] msg; 1047 if (len > 0) 1048 { 1049 msg = new ubyte[len]; 1050 stream.read(msg); 1051 } 1052 1053 return Message(this, type, msg); 1054 } 1055 1056 void sendStartupMessage(const string[string] params) 1057 { 1058 bool localParam(string key) 1059 { 1060 switch (key) 1061 { 1062 case "host", "port", "password": return true; 1063 default: return false; 1064 } 1065 } 1066 1067 int len = 9; // length (int), version number (int) and parameter-list's delimiter (byte) 1068 1069 foreach (key, value; params) 1070 { 1071 if (localParam(key)) 1072 continue; 1073 1074 len += key.length + value.length + 2; 1075 } 1076 1077 stream.write(len); 1078 stream.write(0x0003_0000); // version number 3 1079 foreach (key, value; params) 1080 { 1081 if (localParam(key)) 1082 continue; 1083 stream.writeCString(key); 1084 stream.writeCString(value); 1085 } 1086 stream.write(cast(ubyte)0); 1087 } 1088 1089 void sendPasswordMessage(string password) 1090 { 1091 int len = cast(int)(4 + password.length + 1); 1092 1093 stream.write('p'); 1094 stream.write(len); 1095 stream.writeCString(password); 1096 } 1097 1098 void sendParseMessage(string statementName, string query, int[] oids) 1099 { 1100 int len = cast(int)(4 + statementName.length + 1 + query.length + 1 + 2 + oids.length * 4); 1101 1102 stream.write('P'); 1103 stream.write(len); 1104 stream.writeCString(statementName); 1105 stream.writeCString(query); 1106 stream.write(cast(short)oids.length); 1107 1108 foreach (oid; oids) 1109 stream.write(oid); 1110 } 1111 1112 void sendCloseMessage(DescribeType type, string name) 1113 { 1114 stream.write('C'); 1115 stream.write(cast(int)(4 + 1 + name.length + 1)); 1116 stream.write(cast(char)type); 1117 stream.writeCString(name); 1118 } 1119 1120 void sendTerminateMessage() 1121 { 1122 stream.write('X'); 1123 stream.write(cast(int)4); 1124 } 1125 1126 void sendBindMessage(string portalName, string statementName, PGParameters params) 1127 { 1128 int paramsLen = 0; 1129 bool hasText = false; 1130 1131 foreach (param; params) 1132 { 1133 enforce(param.value.hasValue, new ParamException("Parameter $" ~ to!string(param.index) ~ " value is not initialized")); 1134 1135 void checkParam(T)(int len) 1136 { 1137 if (param.value != null) 1138 { 1139 enforce(param.value.convertsTo!T, new ParamException("Parameter's value is not convertible to " ~ T.stringof)); 1140 paramsLen += len; 1141 } 1142 } 1143 1144 /*final*/ switch (param.type) 1145 { 1146 case PGType.INT2: checkParam!short(2); break; 1147 case PGType.INT4: checkParam!int(4); break; 1148 case PGType.INT8: checkParam!long(8); break; 1149 case PGType.TEXT: 1150 paramsLen += param.value.coerce!string.length; 1151 hasText = true; 1152 break; 1153 case PGType.BYTEA: 1154 paramsLen += param.value.length; 1155 break; 1156 case PGType.JSON: 1157 paramsLen += param.value.coerce!string.length; // TODO: object serialisation 1158 break; 1159 case PGType.DATE: 1160 paramsLen += 4; break; 1161 case PGType.FLOAT4: checkParam!float(4); break; 1162 case PGType.FLOAT8: checkParam!double(8); break; 1163 case PGType.BOOLEAN: checkParam!bool(1); break; 1164 default: assert(0, "Not implemented " ~ to!string(param.type)); 1165 } 1166 } 1167 1168 int len = cast(int)( 4 + portalName.length + 1 + statementName.length + 1 + (hasText ? (params.length*2) : 2) + 2 + 2 + 1169 params.length * 4 + paramsLen + 2 + 2 ); 1170 1171 stream.write('B'); 1172 stream.write(len); 1173 stream.writeCString(portalName); 1174 stream.writeCString(statementName); 1175 if(hasText) 1176 { 1177 stream.write(cast(short) params.length); 1178 foreach(param; params) 1179 if(param.type == PGType.TEXT) 1180 stream.write(cast(short) 0); // text format 1181 else 1182 stream.write(cast(short) 1); // binary format 1183 } else { 1184 stream.write(cast(short)1); // one parameter format code 1185 stream.write(cast(short)1); // binary format 1186 } 1187 stream.write(cast(short)params.length); 1188 1189 foreach (param; params) 1190 { 1191 if (param.value == null) 1192 { 1193 stream.write(-1); 1194 continue; 1195 } 1196 1197 switch (param.type) 1198 { 1199 case PGType.INT2: 1200 stream.write(cast(int)2); 1201 stream.write(param.value.coerce!short); 1202 break; 1203 case PGType.INT4: 1204 stream.write(cast(int)4); 1205 stream.write(param.value.coerce!int); 1206 break; 1207 case PGType.INT8: 1208 stream.write(cast(int)8); 1209 stream.write(param.value.coerce!long); 1210 break; 1211 case PGType.FLOAT4: 1212 stream.write(cast(int)4); 1213 stream.write(param.value.coerce!float); 1214 break; 1215 case PGType.FLOAT8: 1216 stream.write(cast(int)8); 1217 stream.write(param.value.coerce!double); 1218 break; 1219 case PGType.TEXT: 1220 auto s = param.value.coerce!string; 1221 stream.write(cast(int) s.length); 1222 if(s.length) stream.write(cast(ubyte[]) s); 1223 break; 1224 case PGType.BYTEA: 1225 auto s = param.value; 1226 stream.write(cast(int) s.length); 1227 1228 ubyte[] x; 1229 x.length = s.length; 1230 for (int i = 0; i < x.length; i++) { 1231 x[i] = s[i].get!(ubyte); 1232 } 1233 stream.write(x); 1234 break; 1235 case PGType.JSON: 1236 auto s = param.value.coerce!string; 1237 stream.write(cast(int) s.length); 1238 stream.write(cast(ubyte[]) s); 1239 break; 1240 case PGType.DATE: 1241 stream.write(cast(int) 4); 1242 stream.write(Date.fromISOString(param.value.coerce!string)); 1243 break; 1244 case PGType.BOOLEAN: 1245 stream.write(cast(int) 1); 1246 stream.write(param.value.coerce!bool); 1247 break; 1248 default: 1249 assert(0, "Not implemented " ~ to!string(param.type)); 1250 } 1251 } 1252 1253 stream.write(cast(short)1); // one result format code 1254 stream.write(cast(short)1); // binary format 1255 } 1256 1257 enum DescribeType : char { Statement = 'S', Portal = 'P' } 1258 1259 void sendDescribeMessage(DescribeType type, string name) 1260 { 1261 stream.write('D'); 1262 stream.write(cast(int)(4 + 1 + name.length + 1)); 1263 stream.write(cast(char)type); 1264 stream.writeCString(name); 1265 } 1266 1267 void sendExecuteMessage(string portalName, int maxRows) 1268 { 1269 stream.write('E'); 1270 stream.write(cast(int)(4 + portalName.length + 1 + 4)); 1271 stream.writeCString(portalName); 1272 stream.write(cast(int)maxRows); 1273 } 1274 1275 void sendFlushMessage() 1276 { 1277 stream.write('H'); 1278 stream.write(cast(int)4); 1279 } 1280 1281 void sendSyncMessage() 1282 { 1283 stream.write('S'); 1284 stream.write(cast(int)4); 1285 } 1286 1287 ResponseMessage handleResponseMessage(Message msg) 1288 { 1289 enforce(msg.data.length >= 2); 1290 1291 char ftype; 1292 string fvalue; 1293 ResponseMessage response = new ResponseMessage; 1294 1295 while (true) 1296 { 1297 msg.read(ftype); 1298 if(ftype <=0) break; 1299 1300 msg.readCString(fvalue); 1301 response.fields[ftype] = fvalue; 1302 } 1303 1304 return response; 1305 } 1306 1307 void checkActiveResultSet() 1308 { 1309 enforce(!activeResultSet, "There's active result set, which must be closed first."); 1310 } 1311 1312 void prepare(string statementName, string query, PGParameters params) 1313 { 1314 checkActiveResultSet(); 1315 sendParseMessage(statementName, query, params.getOids()); 1316 1317 sendFlushMessage(); 1318 1319 receive: 1320 1321 Message msg = getMessage(); 1322 1323 switch (msg.type) 1324 { 1325 case 'E': 1326 // ErrorResponse 1327 ResponseMessage response = handleResponseMessage(msg); 1328 sendSyncMessage(); 1329 throw new ServerErrorException(response); 1330 case '1': 1331 // ParseComplete 1332 return; 1333 default: 1334 // async notice, notification 1335 goto receive; 1336 } 1337 } 1338 1339 void unprepare(string statementName) 1340 { 1341 checkActiveResultSet(); 1342 sendCloseMessage(DescribeType.Statement, statementName); 1343 sendFlushMessage(); 1344 1345 receive: 1346 1347 Message msg = getMessage(); 1348 1349 switch (msg.type) 1350 { 1351 case 'E': 1352 // ErrorResponse 1353 ResponseMessage response = handleResponseMessage(msg); 1354 throw new ServerErrorException(response); 1355 case '3': 1356 // CloseComplete 1357 return; 1358 default: 1359 // async notice, notification 1360 goto receive; 1361 } 1362 } 1363 1364 PGFields bind(string portalName, string statementName, PGParameters params) 1365 { 1366 checkActiveResultSet(); 1367 sendCloseMessage(DescribeType.Portal, portalName); 1368 sendBindMessage(portalName, statementName, params); 1369 sendDescribeMessage(DescribeType.Portal, portalName); 1370 sendFlushMessage(); 1371 1372 receive: 1373 1374 Message msg = getMessage(); 1375 1376 switch (msg.type) 1377 { 1378 case 'E': 1379 // ErrorResponse 1380 ResponseMessage response = handleResponseMessage(msg); 1381 sendSyncMessage(); 1382 throw new ServerErrorException(response); 1383 case '3': 1384 // CloseComplete 1385 goto receive; 1386 case '2': 1387 // BindComplete 1388 goto receive; 1389 case 'T': 1390 // RowDescription (response to Describe) 1391 PGField[] fields; 1392 short fieldCount; 1393 short formatCode; 1394 PGField fi; 1395 1396 msg.read(fieldCount); 1397 1398 fields.length = fieldCount; 1399 1400 foreach (i; 0..fieldCount) 1401 { 1402 msg.readCString(fi.name); 1403 msg.read(fi.tableOid); 1404 msg.read(fi.index); 1405 msg.read(fi.oid); 1406 msg.read(fi.typlen); 1407 msg.read(fi.modifier); 1408 msg.read(formatCode); 1409 1410 enforce(formatCode == 1, new Exception("Field's format code returned in RowDescription is not 1 (binary)")); 1411 1412 fields[i] = fi; 1413 } 1414 1415 return cast(PGFields)fields; 1416 case 'n': 1417 // NoData (response to Describe) 1418 return new immutable(PGField)[0]; 1419 default: 1420 // async notice, notification 1421 goto receive; 1422 } 1423 } 1424 1425 ulong executeNonQuery(string portalName, out uint oid) 1426 { 1427 checkActiveResultSet(); 1428 ulong rowsAffected = 0; 1429 1430 sendExecuteMessage(portalName, 0); 1431 sendSyncMessage(); 1432 sendFlushMessage(); 1433 1434 receive: 1435 1436 Message msg = getMessage(); 1437 1438 switch (msg.type) 1439 { 1440 case 'E': 1441 // ErrorResponse 1442 ResponseMessage response = handleResponseMessage(msg); 1443 throw new ServerErrorException(response); 1444 case 'D': 1445 // DataRow 1446 finalizeQuery(); 1447 throw new Exception("This query returned rows."); 1448 case 'C': 1449 // CommandComplete 1450 string tag; 1451 1452 msg.readCString(tag); 1453 1454 // GDC indexOf name conflict in std.string and std.algorithm 1455 auto s1 = std..string.indexOf(tag, ' '); 1456 if (s1 >= 0) { 1457 switch (tag[0 .. s1]) { 1458 case "INSERT": 1459 // INSERT oid rows 1460 auto s2 = lastIndexOf(tag, ' '); 1461 assert(s2 > s1); 1462 oid = to!uint(tag[s1 + 1 .. s2]); 1463 rowsAffected = to!ulong(tag[s2 + 1 .. $]); 1464 break; 1465 case "DELETE", "UPDATE", "MOVE", "FETCH": 1466 // DELETE rows 1467 rowsAffected = to!ulong(tag[s1 + 1 .. $]); 1468 break; 1469 default: 1470 // CREATE TABLE 1471 break; 1472 } 1473 } 1474 1475 goto receive; 1476 1477 case 'I': 1478 // EmptyQueryResponse 1479 goto receive; 1480 case 'Z': 1481 // ReadyForQuery 1482 return rowsAffected; 1483 default: 1484 // async notice, notification 1485 goto receive; 1486 } 1487 } 1488 1489 DBRow!Specs fetchRow(Specs...)(ref Message msg, ref PGFields fields) 1490 { 1491 alias DBRow!Specs Row; 1492 1493 static if (Row.hasStaticLength) 1494 { 1495 alias Row.fieldTypes fieldTypes; 1496 1497 static string genFieldAssigns() // CTFE 1498 { 1499 string s = ""; 1500 1501 foreach (i; 0 .. fieldTypes.length) 1502 { 1503 s ~= "msg.read(fieldLen);\n"; 1504 s ~= "if (fieldLen == -1)\n"; 1505 s ~= text("row.setNull!(", i, ")();\n"); 1506 s ~= "else\n"; 1507 s ~= text("row.set!(fieldTypes[", i, "], ", i, ")(", 1508 "msg.readBaseType!(fieldTypes[", i, "])(fields[", i, "].oid, fieldLen)", 1509 ");\n"); 1510 // text() doesn't work with -inline option, CTFE bug 1511 } 1512 1513 return s; 1514 } 1515 } 1516 1517 Row row; 1518 short fieldCount; 1519 int fieldLen; 1520 1521 msg.read(fieldCount); 1522 1523 static if (Row.hasStaticLength) 1524 { 1525 Row.checkReceivedFieldCount(fieldCount); 1526 mixin(genFieldAssigns); 1527 } 1528 else 1529 { 1530 row.setLength(fieldCount); 1531 1532 foreach (i; 0 .. fieldCount) 1533 { 1534 msg.read(fieldLen); 1535 if (fieldLen == -1) 1536 row.setNull(i); 1537 else 1538 row[i] = msg.readBaseType!(Row.ElemType)(fields[i].oid, fieldLen); 1539 } 1540 } 1541 1542 return row; 1543 } 1544 1545 void finalizeQuery() 1546 { 1547 Message msg; 1548 1549 do 1550 { 1551 msg = getMessage(); 1552 1553 // TODO: process async notifications 1554 } 1555 while (msg.type != 'Z'); // ReadyForQuery 1556 } 1557 1558 PGResultSet!Specs executeQuery(Specs...)(string portalName, ref PGFields fields) 1559 { 1560 checkActiveResultSet(); 1561 1562 PGResultSet!Specs result = new PGResultSet!Specs(this, fields, &fetchRow!Specs); 1563 1564 ulong rowsAffected = 0; 1565 1566 sendExecuteMessage(portalName, 0); 1567 sendSyncMessage(); 1568 sendFlushMessage(); 1569 1570 receive: 1571 1572 Message msg = getMessage(); 1573 1574 switch (msg.type) 1575 { 1576 case 'D': 1577 // DataRow 1578 alias DBRow!Specs Row; 1579 1580 result.row = fetchRow!Specs(msg, fields); 1581 static if (!Row.hasStaticLength) 1582 result.row.columnToIndex = &result.columnToIndex; 1583 result.validRow = true; 1584 result.nextMsg = getMessage(); 1585 1586 activeResultSet = true; 1587 1588 return result; 1589 case 'C': 1590 // CommandComplete 1591 string tag; 1592 1593 msg.readCString(tag); 1594 1595 auto s2 = lastIndexOf(tag, ' '); 1596 if (s2 >= 0) 1597 { 1598 rowsAffected = to!ulong(tag[s2 + 1 .. $]); 1599 } 1600 1601 goto receive; 1602 case 'I': 1603 // EmptyQueryResponse 1604 throw new Exception("Query string is empty."); 1605 case 's': 1606 // PortalSuspended 1607 throw new Exception("Command suspending is not supported."); 1608 case 'Z': 1609 // ReadyForQuery 1610 result.nextMsg = msg; 1611 return result; 1612 case 'E': 1613 // ErrorResponse 1614 ResponseMessage response = handleResponseMessage(msg); 1615 throw new ServerErrorException(response); 1616 default: 1617 // async notice, notification 1618 goto receive; 1619 } 1620 1621 assert(0); 1622 } 1623 1624 public: 1625 1626 1627 /** 1628 Opens connection to server. 1629 1630 Params: 1631 params = Associative array of string keys and values. 1632 1633 Currently recognized parameters are: 1634 $(UL 1635 $(LI host - Host name or IP address of the server. Required.) 1636 $(LI port - Port number of the server. Defaults to 5432.) 1637 $(LI user - The database user. Required.) 1638 $(LI database - The database to connect to. Defaults to the user name.) 1639 $(LI options - Command-line arguments for the backend. (This is deprecated in favor of setting individual run-time parameters.)) 1640 ) 1641 1642 In addition to the above, any run-time parameter that can be set at backend start time might be listed. 1643 Such settings will be applied during backend start (after parsing the command-line options if any). 1644 The values will act as session defaults. 1645 1646 Examples: 1647 --- 1648 auto conn = new PGConnection([ 1649 "host" : "localhost", 1650 "database" : "test", 1651 "user" : "postgres", 1652 "password" : "postgres" 1653 ]); 1654 --- 1655 */ 1656 this(const string[string] params) 1657 { 1658 enforce("host" in params, new ParamException("Required parameter 'host' not found")); 1659 enforce("user" in params, new ParamException("Required parameter 'user' not found")); 1660 1661 string[string] p = cast(string[string])params; 1662 1663 ushort port = "port" in params? parse!ushort(p["port"]) : 5432; 1664 1665 version(Have_vibe_d_core) 1666 { 1667 stream = new PGStream(new TCPConnectionWrapper(params["host"], port)); 1668 } 1669 else 1670 { 1671 stream = new PGStream(new TcpSocket); 1672 stream.socket.connect(new InternetAddress(params["host"], port)); 1673 } 1674 sendStartupMessage(params); 1675 1676 receive: 1677 1678 Message msg = getMessage(); 1679 1680 switch (msg.type) 1681 { 1682 case 'E', 'N': 1683 // ErrorResponse, NoticeResponse 1684 1685 ResponseMessage response = handleResponseMessage(msg); 1686 1687 if (msg.type == 'N') 1688 goto receive; 1689 1690 throw new ServerErrorException(response); 1691 case 'R': 1692 // AuthenticationXXXX 1693 enforce(msg.data.length >= 4); 1694 1695 int atype; 1696 1697 msg.read(atype); 1698 1699 switch (atype) 1700 { 1701 case 0: 1702 // authentication successful, now wait for another messages 1703 goto receive; 1704 case 3: 1705 // clear-text password is required 1706 enforce("password" in params, new ParamException("Required parameter 'password' not found")); 1707 enforce(msg.data.length == 4); 1708 1709 sendPasswordMessage(params["password"]); 1710 1711 goto receive; 1712 case 5: 1713 // MD5-hashed password is required, formatted as: 1714 // "md5" + md5(md5(password + username) + salt) 1715 // where md5() returns lowercase hex-string 1716 enforce("password" in params, new ParamException("Required parameter 'password' not found")); 1717 enforce(msg.data.length == 8); 1718 1719 char[3 + 32] password; 1720 password[0 .. 3] = "md5"; 1721 password[3 .. $] = MD5toHex(MD5toHex( 1722 params["password"], params["user"]), msg.data[4 .. 8]); 1723 1724 sendPasswordMessage(to!string(password)); 1725 1726 goto receive; 1727 default: 1728 // non supported authentication type, close connection 1729 this.close(); 1730 throw new Exception("Unsupported authentication type"); 1731 } 1732 1733 case 'S': 1734 // ParameterStatus 1735 enforce(msg.data.length >= 2); 1736 1737 string pname, pvalue; 1738 1739 msg.readCString(pname); 1740 msg.readCString(pvalue); 1741 1742 serverParams[pname] = pvalue; 1743 1744 goto receive; 1745 1746 case 'K': 1747 // BackendKeyData 1748 enforce(msg.data.length == 8); 1749 1750 msg.read(serverProcessID); 1751 msg.read(serverSecretKey); 1752 1753 goto receive; 1754 1755 case 'Z': 1756 // ReadyForQuery 1757 enforce(msg.data.length == 1); 1758 1759 msg.read(cast(char)trStatus); 1760 1761 // check for validity 1762 switch (trStatus) 1763 { 1764 case 'I', 'T', 'E': break; 1765 default: throw new Exception("Invalid transaction status"); 1766 } 1767 1768 // connection is opened and now it's possible to send queries 1769 reloadAllTypes(); 1770 return; 1771 default: 1772 // unknown message type, ignore it 1773 goto receive; 1774 } 1775 } 1776 1777 /// Closes current connection to the server. 1778 void close() 1779 { 1780 sendTerminateMessage(); 1781 stream.socket.close(); 1782 } 1783 1784 /// Shorthand methods using temporary PGCommand. Semantics is the same as PGCommand's. 1785 ulong executeNonQuery(string query) 1786 { 1787 scope cmd = new PGCommand(this, query); 1788 return cmd.executeNonQuery(); 1789 } 1790 1791 /// ditto 1792 PGResultSet!Specs executeQuery(Specs...)(string query) 1793 { 1794 scope cmd = new PGCommand(this, query); 1795 return cmd.executeQuery!Specs(); 1796 } 1797 1798 /// ditto 1799 DBRow!Specs executeRow(Specs...)(string query, bool throwIfMoreRows = true) 1800 { 1801 scope cmd = new PGCommand(this, query); 1802 return cmd.executeRow!Specs(throwIfMoreRows); 1803 } 1804 1805 /// ditto 1806 T executeScalar(T)(string query, bool throwIfMoreRows = true) 1807 { 1808 scope cmd = new PGCommand(this, query); 1809 return cmd.executeScalar!T(throwIfMoreRows); 1810 } 1811 1812 void reloadArrayTypes() 1813 { 1814 auto cmd = new PGCommand(this, "SELECT oid, typelem FROM pg_type WHERE typcategory = 'A'"); 1815 auto result = cmd.executeQuery!(uint, "arrayOid", uint, "elemOid"); 1816 scope(exit) result.close; 1817 1818 arrayTypes = null; 1819 1820 foreach (row; result) 1821 { 1822 arrayTypes[row.arrayOid] = row.elemOid; 1823 } 1824 1825 arrayTypes.rehash; 1826 } 1827 1828 void reloadCompositeTypes() 1829 { 1830 auto cmd = new PGCommand(this, "SELECT a.attrelid, a.atttypid FROM pg_attribute a JOIN pg_type t ON 1831 a.attrelid = t.typrelid WHERE a.attnum > 0 ORDER BY a.attrelid, a.attnum"); 1832 auto result = cmd.executeQuery!(uint, "typeOid", uint, "memberOid"); 1833 scope(exit) result.close; 1834 1835 compositeTypes = null; 1836 1837 uint lastOid = 0; 1838 uint[]* memberOids; 1839 1840 foreach (row; result) 1841 { 1842 if (row.typeOid != lastOid) 1843 { 1844 compositeTypes[lastOid = row.typeOid] = new uint[0]; 1845 memberOids = &compositeTypes[lastOid]; 1846 } 1847 1848 *memberOids ~= row.memberOid; 1849 } 1850 1851 compositeTypes.rehash; 1852 } 1853 1854 void reloadEnumTypes() 1855 { 1856 auto cmd = new PGCommand(this, "SELECT enumtypid, oid, enumlabel FROM pg_enum ORDER BY enumtypid, oid"); 1857 auto result = cmd.executeQuery!(uint, "typeOid", uint, "valueOid", string, "valueLabel"); 1858 scope(exit) result.close; 1859 1860 enumTypes = null; 1861 1862 uint lastOid = 0; 1863 string[uint]* enumValues; 1864 1865 foreach (row; result) 1866 { 1867 if (row.typeOid != lastOid) 1868 { 1869 if (lastOid > 0) 1870 (*enumValues).rehash; 1871 1872 enumTypes[lastOid = row.typeOid] = null; 1873 enumValues = &enumTypes[lastOid]; 1874 } 1875 1876 (*enumValues)[row.valueOid] = row.valueLabel; 1877 } 1878 1879 if (lastOid > 0) 1880 (*enumValues).rehash; 1881 1882 enumTypes.rehash; 1883 } 1884 1885 void reloadAllTypes() 1886 { 1887 // todo: make simpler type lists, since we need only oids of types (without their members) 1888 reloadArrayTypes(); 1889 reloadCompositeTypes(); 1890 reloadEnumTypes(); 1891 } 1892 } 1893 1894 /// Class representing single query parameter 1895 class PGParameter 1896 { 1897 private PGParameters params; 1898 immutable short index; 1899 immutable PGType type; 1900 private Variant _value; 1901 1902 /// Value bound to this parameter 1903 @property Variant value() 1904 { 1905 return _value; 1906 } 1907 /// ditto 1908 @property Variant value(T)(T v) 1909 { 1910 params.changed = true; 1911 return _value = Variant(v); 1912 } 1913 1914 private this(PGParameters params, short index, PGType type) 1915 { 1916 enforce(index > 0, new ParamException("Parameter's index must be > 0")); 1917 this.params = params; 1918 this.index = index; 1919 this.type = type; 1920 } 1921 } 1922 1923 /// Collection of query parameters 1924 class PGParameters 1925 { 1926 private PGParameter[short] params; 1927 private PGCommand cmd; 1928 private bool changed; 1929 1930 private int[] getOids() 1931 { 1932 short[] keys = params.keys; 1933 sort(keys); 1934 1935 int[] oids = new int[params.length]; 1936 1937 foreach (int i, key; keys) 1938 { 1939 oids[i] = params[key].type; 1940 } 1941 1942 return oids; 1943 } 1944 1945 /// 1946 @property short length() 1947 { 1948 return cast(short)params.length; 1949 } 1950 1951 private this(PGCommand cmd) 1952 { 1953 this.cmd = cmd; 1954 } 1955 1956 /** 1957 Creates and returns new parameter. 1958 Examples: 1959 --- 1960 // without spaces between $ and number 1961 auto cmd = new PGCommand(conn, "INSERT INTO users (name, surname) VALUES ($ 1, $ 2)"); 1962 cmd.parameters.add(1, PGType.TEXT).value = "John"; 1963 cmd.parameters.add(2, PGType.TEXT).value = "Doe"; 1964 1965 assert(cmd.executeNonQuery == 1); 1966 --- 1967 */ 1968 PGParameter add(short index, PGType type) 1969 { 1970 enforce(!cmd.prepared, "Can't add parameter to prepared statement."); 1971 changed = true; 1972 return params[index] = new PGParameter(this, index, type); 1973 } 1974 1975 // todo: remove() 1976 1977 PGParameter opIndex(short index) 1978 { 1979 return params[index]; 1980 } 1981 1982 int opApply(int delegate(ref PGParameter param) dg) 1983 { 1984 int result = 0; 1985 1986 foreach (number; sort(params.keys)) 1987 { 1988 result = dg(params[number]); 1989 1990 if (result) 1991 break; 1992 } 1993 1994 return result; 1995 } 1996 } 1997 1998 /// Array of fields returned by the server 1999 alias immutable(PGField)[] PGFields; 2000 2001 /// Contains information about fields returned by the server 2002 struct PGField 2003 { 2004 /// The field name. 2005 string name; 2006 /// If the field can be identified as a column of a specific table, the object ID of the table; otherwise zero. 2007 uint tableOid; 2008 /// If the field can be identified as a column of a specific table, the attribute number of the column; otherwise zero. 2009 short index; 2010 /// The object ID of the field's data type. 2011 uint oid; 2012 /// The data type size (see pg_type.typlen). Note that negative values denote variable-width types. 2013 short typlen; 2014 /// The type modifier (see pg_attribute.atttypmod). The meaning of the modifier is type-specific. 2015 int modifier; 2016 } 2017 2018 /// Class encapsulating prepared or non-prepared statements (commands). 2019 class PGCommand 2020 { 2021 private PGConnection conn; 2022 private string _query; 2023 private PGParameters params; 2024 private PGFields _fields = null; 2025 private string preparedName; 2026 private uint _lastInsertOid; 2027 private bool prepared; 2028 2029 /// List of parameters bound to this command 2030 @property PGParameters parameters() 2031 { 2032 return params; 2033 } 2034 2035 /// List of fields that will be returned from the server. Available after successful call to bind(). 2036 @property PGFields fields() 2037 { 2038 return _fields; 2039 } 2040 2041 /** 2042 Checks if this is query or non query command. Available after successful call to bind(). 2043 Returns: true if server returns at least one field (column). Otherwise false. 2044 */ 2045 @property bool isQuery() 2046 { 2047 enforce(_fields !is null, new Exception("bind() must be called first.")); 2048 return _fields.length > 0; 2049 } 2050 2051 /// Returns: true if command is currently prepared, otherwise false. 2052 @property bool isPrepared() 2053 { 2054 return prepared; 2055 } 2056 2057 /// Query assigned to this command. 2058 @property string query() 2059 { 2060 return _query; 2061 } 2062 /// ditto 2063 @property string query(string query) 2064 { 2065 enforce(!prepared, "Can't change query for prepared statement."); 2066 return _query = query; 2067 } 2068 2069 /// If table is with OIDs, it contains last inserted OID. 2070 @property uint lastInsertOid() 2071 { 2072 return _lastInsertOid; 2073 } 2074 2075 this(PGConnection conn, string query = "") 2076 { 2077 this.conn = conn; 2078 _query = query; 2079 params = new PGParameters(this); 2080 _fields = new immutable(PGField)[0]; 2081 preparedName = ""; 2082 prepared = false; 2083 } 2084 2085 /// Prepare this statement, i.e. cache query plan. 2086 void prepare() 2087 { 2088 enforce(!prepared, "This command is already prepared."); 2089 preparedName = conn.reservePrepared(); 2090 conn.prepare(preparedName, _query, params); 2091 prepared = true; 2092 params.changed = true; 2093 } 2094 2095 /// Unprepare this statement. Goes back to normal query planning. 2096 void unprepare() 2097 { 2098 enforce(prepared, "This command is not prepared."); 2099 conn.unprepare(preparedName); 2100 preparedName = ""; 2101 prepared = false; 2102 params.changed = true; 2103 } 2104 2105 /** 2106 Binds values to parameters and updates list of returned fields. 2107 2108 This is normally done automatically, but it may be useful to check what fields 2109 would be returned from a query, before executing it. 2110 */ 2111 void bind() 2112 { 2113 checkPrepared(false); 2114 _fields = conn.bind(preparedName, preparedName, params); 2115 params.changed = false; 2116 } 2117 2118 private void checkPrepared(bool bind) 2119 { 2120 if (!prepared) 2121 { 2122 // use unnamed statement & portal 2123 conn.prepare("", _query, params); 2124 if (bind) 2125 { 2126 _fields = conn.bind("", "", params); 2127 params.changed = false; 2128 } 2129 } 2130 } 2131 2132 private void checkBound() 2133 { 2134 if (params.changed) 2135 bind(); 2136 } 2137 2138 /** 2139 Executes a non query command, i.e. query which doesn't return any rows. Commonly used with 2140 data manipulation commands, such as INSERT, UPDATE and DELETE. 2141 Examples: 2142 --- 2143 auto cmd = new PGCommand(conn, "DELETE * FROM table"); 2144 auto deletedRows = cmd.executeNonQuery; 2145 cmd.query = "UPDATE table SET quantity = 1 WHERE price > 100"; 2146 auto updatedRows = cmd.executeNonQuery; 2147 cmd.query = "INSERT INTO table VALUES(1, 50)"; 2148 assert(cmd.executeNonQuery == 1); 2149 --- 2150 Returns: Number of affected rows. 2151 */ 2152 ulong executeNonQuery() 2153 { 2154 checkPrepared(true); 2155 checkBound(); 2156 return conn.executeNonQuery(preparedName, _lastInsertOid); 2157 } 2158 2159 /** 2160 Executes query which returns row sets, such as SELECT command. 2161 Params: 2162 bufferedRows = Number of rows that may be allocated at the same time. 2163 Returns: InputRange of DBRow!Specs. 2164 */ 2165 PGResultSet!Specs executeQuery(Specs...)() 2166 { 2167 checkPrepared(true); 2168 checkBound(); 2169 return conn.executeQuery!Specs(preparedName, _fields); 2170 } 2171 2172 /** 2173 Executes query and returns only first row of the result. 2174 Params: 2175 throwIfMoreRows = If true, throws Exception when result contains more than one row. 2176 Examples: 2177 --- 2178 auto cmd = new PGCommand(conn, "SELECT 1, 'abc'"); 2179 auto row1 = cmd.executeRow!(int, string); // returns DBRow!(int, string) 2180 assert(is(typeof(i[0]) == int) && is(typeof(i[1]) == string)); 2181 auto row2 = cmd.executeRow; // returns DBRow!(Variant[]) 2182 --- 2183 Throws: Exception if result doesn't contain any rows or field count do not match. 2184 Throws: Exception if result contains more than one row when throwIfMoreRows is true. 2185 */ 2186 DBRow!Specs executeRow(Specs...)(bool throwIfMoreRows = true) 2187 { 2188 auto result = executeQuery!Specs(); 2189 scope(exit) result.close(); 2190 enforce(!result.empty(), "Result doesn't contain any rows."); 2191 auto row = result.front(); 2192 if (throwIfMoreRows) 2193 { 2194 result.popFront(); 2195 enforce(result.empty(), "Result contains more than one row."); 2196 } 2197 return row; 2198 } 2199 2200 /** 2201 Executes query returning exactly one row and field. By default, returns Variant type. 2202 Params: 2203 throwIfMoreRows = If true, throws Exception when result contains more than one row. 2204 Examples: 2205 --- 2206 auto cmd = new PGCommand(conn, "SELECT 1"); 2207 auto i = cmd.executeScalar!int; // returns int 2208 assert(is(typeof(i) == int)); 2209 auto v = cmd.executeScalar; // returns Variant 2210 --- 2211 Throws: Exception if result doesn't contain any rows or if it contains more than one field. 2212 Throws: Exception if result contains more than one row when throwIfMoreRows is true. 2213 */ 2214 T executeScalar(T = Variant)(bool throwIfMoreRows = true) 2215 { 2216 auto result = executeQuery!T(); 2217 scope(exit) result.close(); 2218 enforce(!result.empty(), "Result doesn't contain any rows."); 2219 T row = result.front(); 2220 if (throwIfMoreRows) 2221 { 2222 result.popFront(); 2223 enforce(result.empty(), "Result contains more than one row."); 2224 } 2225 return row; 2226 } 2227 } 2228 2229 /// Input range of DBRow!Specs 2230 class PGResultSet(Specs...) 2231 { 2232 alias DBRow!Specs Row; 2233 alias Row delegate(ref Message msg, ref PGFields fields) FetchRowDelegate; 2234 2235 private FetchRowDelegate fetchRow; 2236 private PGConnection conn; 2237 private PGFields fields; 2238 private Row row; 2239 private bool validRow; 2240 private Message nextMsg; 2241 private size_t[][string] columnMap; 2242 2243 private this(PGConnection conn, ref PGFields fields, FetchRowDelegate dg) 2244 { 2245 this.conn = conn; 2246 this.fields = fields; 2247 this.fetchRow = dg; 2248 validRow = false; 2249 2250 foreach (i, field; fields) 2251 { 2252 size_t[]* indices = field.name in columnMap; 2253 2254 if (indices) 2255 *indices ~= i; 2256 else 2257 columnMap[field.name] = [i]; 2258 } 2259 } 2260 2261 private size_t columnToIndex(string column, size_t index) 2262 { 2263 size_t[]* indices = column in columnMap; 2264 enforce(indices, "Unknown column name"); 2265 return (*indices)[index]; 2266 } 2267 2268 pure nothrow bool empty() 2269 { 2270 return !validRow; 2271 } 2272 2273 void popFront() 2274 { 2275 if (nextMsg.type == 'D') 2276 { 2277 row = fetchRow(nextMsg, fields); 2278 static if (!Row.hasStaticLength) 2279 row.columnToIndex = &columnToIndex; 2280 validRow = true; 2281 nextMsg = conn.getMessage(); 2282 } 2283 else 2284 validRow = false; 2285 } 2286 2287 pure nothrow Row front() 2288 { 2289 return row; 2290 } 2291 2292 /// Closes current result set. It must be closed before issuing another query on the same connection. 2293 void close() 2294 { 2295 if (nextMsg.type != 'Z') 2296 conn.finalizeQuery(); 2297 conn.activeResultSet = false; 2298 } 2299 2300 int opApply(int delegate(ref Row row) dg) 2301 { 2302 int result = 0; 2303 2304 while (!empty) 2305 { 2306 result = dg(row); 2307 popFront; 2308 2309 if (result) 2310 break; 2311 } 2312 2313 return result; 2314 } 2315 2316 int opApply(int delegate(ref size_t i, ref Row row) dg) 2317 { 2318 int result = 0; 2319 size_t i; 2320 2321 while (!empty) 2322 { 2323 result = dg(i, row); 2324 popFront; 2325 i++; 2326 2327 if (result) 2328 break; 2329 } 2330 2331 return result; 2332 } 2333 } 2334 2335 2336 version(Have_vibe_d_core) 2337 { 2338 import vibe.core.connectionpool; 2339 2340 // wrap vibe.d TCPConnection class with the scope of reopening the tcp connection if closed 2341 // by PostgreSQL it for some reason. 2342 // see https://forum.rejectedsoftware.com/groups/rejectedsoftware.vibed/thread/44097/ 2343 private class TCPConnectionWrapper 2344 { 2345 this(string host, ushort port, string bindInterface = null, ushort bindPort = cast(ushort)0u) 2346 { 2347 this.host = host; 2348 this.port = port; 2349 this.bindInterface = bindInterface; 2350 this.bindPort = bindPort; 2351 2352 connect(); 2353 } 2354 2355 void close(){ tcpConnection.close(); } 2356 2357 void write(const(ubyte[]) bytes) 2358 { 2359 // Vibe: "... If connected is false, writing to the connection will trigger an exception ..." 2360 if (!tcpConnection.connected) 2361 { 2362 // Vibe: " ... Note that close must always be called, even if the remote has already closed the 2363 // connection. Failure to do so will result in resource and memory leakage. 2364 tcpConnection.close(); 2365 connect(); 2366 } 2367 tcpConnection.write(bytes); 2368 } 2369 2370 void read(ubyte[] dst) 2371 { 2372 if (!tcpConnection.connected) 2373 { 2374 tcpConnection.close(); 2375 connect(); 2376 } 2377 if (!tcpConnection.empty) 2378 { 2379 tcpConnection.read(dst); 2380 } 2381 } 2382 2383 private 2384 { 2385 void connect() 2386 { 2387 tcpConnection = connectTCP(host, port, bindInterface, bindPort); 2388 } 2389 2390 string host; 2391 string bindInterface; 2392 ushort port; 2393 ushort bindPort; 2394 2395 TCPConnection tcpConnection; 2396 } 2397 } 2398 2399 class PostgresDB { 2400 private { 2401 string[string] m_params; 2402 ConnectionPool!PGConnection m_pool; 2403 } 2404 2405 this(string[string] conn_params) 2406 { 2407 m_params = conn_params.dup; 2408 m_pool = new ConnectionPool!PGConnection(&createConnection); 2409 } 2410 2411 auto lockConnection() { return m_pool.lockConnection(); } 2412 2413 private PGConnection createConnection() 2414 { 2415 return new PGConnection(m_params); 2416 } 2417 } 2418 } 2419 else 2420 { 2421 class PostgresDB() { 2422 static assert(false, 2423 "The 'PostgresDB' connection pool requires Vibe.d and therefore "~ 2424 "must be used with -version=Have_vibe_d_core" 2425 ); 2426 } 2427 } 2428