/**
PostgreSQL client implementation.

Features:
$(UL
    $(LI Standalone (does not depend on libpq))
    $(LI Binary formatting (avoids parsing overhead))
    $(LI Prepared statements)
    $(LI Parametrized queries (partially working))
    $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/datatype-enum.html, Enums))
    $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/arrays.html, Arrays))
    $(LI $(LINK2 http://www.postgresql.org/docs/9.0/static/rowtypes.html, Composite types))
)

TODOs:
$(UL
    $(LI Redesign parametrized queries)
    $(LI BigInt/Numeric types support)
    $(LI Geometric types support)
    $(LI Network types support)
    $(LI Bit string types support)
    $(LI UUID type support)
    $(LI XML types support)
    $(LI Transaction support)
    $(LI Asynchronous notifications)
    $(LI Better memory management)
    $(LI More friendly PGFields)
)

Bugs:
$(UL
    $(LI Supports only cleartext and MD5 $(LINK2 http://www.postgresql.org/docs/9.0/static/auth-methods.html, authentication))
    $(LI Unfinished parameter handling)
    $(LI interval is converted to Duration, which does not support months)
)

$(B Data type mapping:)

$(TABLE
    $(TR $(TH PostgreSQL type) $(TH Aliases) $(TH Default D type) $(TH D type mapping possibilities))
    $(TR $(TD smallint) $(TD int2) $(TD short) <td rowspan="19">Any type convertible from default D type</td>)
    $(TR $(TD integer) $(TD int4) $(TD int))
    $(TR $(TD bigint) $(TD int8) $(TD long))
    $(TR $(TD oid) $(TD reg***) $(TD uint))
    $(TR $(TD decimal) $(TD numeric) $(TD not yet supported))
    $(TR $(TD real) $(TD float4) $(TD float))
    $(TR $(TD double precision) $(TD float8) $(TD double))
    $(TR $(TD character varying(n)) $(TD varchar(n)) $(TD string))
    $(TR $(TD character(n)) $(TD char(n)) $(TD string))
    $(TR $(TD text) $(TD) $(TD string))
    $(TR $(TD "char") $(TD) $(TD char))
    $(TR $(TD bytea) $(TD) $(TD ubyte[]))
    $(TR $(TD timestamp without time zone) $(TD timestamp) $(TD DateTime))
    $(TR $(TD timestamp with time zone) $(TD timestamptz) $(TD SysTime))
    $(TR $(TD date) $(TD) $(TD Date))
    $(TR $(TD time without time zone) $(TD time) $(TD TimeOfDay))
    $(TR $(TD time with time zone) $(TD timetz) $(TD SysTime))
    $(TR $(TD interval) $(TD) $(TD Duration (without months and years)))
    $(TR $(TD boolean) $(TD bool) $(TD bool))
    $(TR $(TD enums) $(TD) $(TD string) $(TD enum))
    $(TR $(TD arrays) $(TD) $(TD Variant[]) $(TD dynamic/static array with compatible element type))
    $(TR $(TD composites) $(TD record, row) $(TD Variant[]) $(TD dynamic/static array, struct or Tuple))
)

Examples:
With vibe.d, compile with -version=Have_vibe_d_core and use a connection pool (a PostgresDB object and its lockConnection method):
---

    auto pdb = new PostgresDB([
        "host" : "192.168.2.50",
        "database" : "postgres",
        "user" : "postgres",
        "password" : ""
    ]);
    auto conn = pdb.lockConnection();

    auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type");
    auto result = cmd.executeQuery;

    try
    {
        foreach (row; result)
        {
            writeln(row["typname"], ", ", row[1]);
        }
    }
    finally
    {
        result.close;
    }

---
Without vibe.d, you can use std.socket sockets with a PGConnection object:

---
import std.stdio;
import ddb.postgres;

int main(string[] argv)
{
    auto conn = new PGConnection([
        "host" : "localhost",
        "database" : "test",
        "user" : "postgres",
        "password" : "postgres"
    ]);

    scope(exit) conn.close;

    auto cmd = new PGCommand(conn, "SELECT typname, typlen FROM pg_type");
    auto result = cmd.executeQuery;

    try
    {
        foreach (row; result)
        {
            writeln(row[0], ", ", row[1]);
        }
    }
    finally
    {
        result.close;
    }

    return 0;
}
---

Copyright: Copyright Piotr Szturmaj 2011-.
License: $(LINK2 http://boost.org/LICENSE_1_0.txt, Boost License 1.0).
Authors: Piotr Szturmaj
*//*
Documentation contains portions copied from PostgreSQL manual (mainly field information and
connection parameters description). License:

Portions Copyright (c) 1996-2010, The PostgreSQL Global Development Group
Portions Copyright (c) 1994, The Regents of the University of California

Permission to use, copy, modify, and distribute this software and its documentation for any purpose,
without fee, and without a written agreement is hereby granted, provided that the above copyright
notice and this paragraph and the following two paragraphs appear in all copies.

IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT,
INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS,
ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY
OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
CALIFORNIA HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS,
OR MODIFICATIONS.
*/
module ddb.postgres;

version (Have_vibe_d_core)
{
    import vibe.core.net;
    import vibe.core.stream;
}
else
{
    import std.socket;
}
import std.bitmanip;
import std.exception;
import std.conv;
import std.traits;
import std.typecons;
import std.string;
import std.digest.md;
import core.bitop;
import std.variant;
import std.algorithm;
import std.stdio;
import std.datetime;
import std.uuid;
public import ddb.db;

private:

// Reference point (2000-01-01 00:00:00) that the date/time readers and
// writers below add to, or subtract from, the values sent on the wire.
const PGEpochDate = Date(2000, 1, 1);
const PGEpochDay = PGEpochDate.dayOfGregorianCal;
const PGEpochTime = TimeOfDay(0, 0, 0);
const PGEpochDateTime = DateTime(2000, 1, 1, 0, 0, 0);

/**
Thin wrapper around the connection socket that writes values in big-endian
byte order and reads raw byte buffers.

With -version=Have_vibe_d_core the wrapped object is a TCPConnectionWrapper,
otherwise it is a std.socket.Socket.
*/
class PGStream
{
    version (Have_vibe_d_core)
    {
        private TCPConnectionWrapper m_socket;

        @property TCPConnectionWrapper socket()
        {
            return m_socket;
        }

        this(TCPConnectionWrapper socket)
        {
            m_socket = socket;
        }
    }
    else
    {
        private Socket m_socket;

        @property Socket socket()
        {
            return m_socket;
        }

        this(Socket socket)
        {
            m_socket = socket;
        }
    }

    /**
    Fills the whole buffer with bytes read from the socket.

    Throws: Exception (std.socket build only) when the receive call reports
    an error or the peer closes the connection before the buffer is full.
    */
    protected void read(ubyte[] buffer)
    {
        version(Have_vibe_d_core)
        {
            m_socket.read(buffer);
        }
        else
        {
            size_t soFar = 0;

            while (soFar < buffer.length)
            {
                immutable received = m_socket.receive(buffer[soFar .. $]);

                // receive() yields Socket.ERROR on failure and 0 once the remote
                // side has closed; neither may be added to the running offset,
                // otherwise the loop would spin forever or index out of range.
                enforce(received != Socket.ERROR, "Failed to receive data from the server");
                enforce(received != 0, "Connection closed by the server");

                soFar += received;
            }
        }
    }

    /**
    Writes the whole byte array to the socket. Empty arrays are ignored.

    Throws: Exception (std.socket build only) when the send call reports an error.
    */
    void write(ubyte[] x)
    {
        version(Have_vibe_d_core)
        {
            m_socket.write(x);
        }
        else
        {
            size_t sent = 0;

            // send() may accept only part of the data, so keep going until
            // every byte has been handed over.
            while (sent < x.length)
            {
                immutable n = m_socket.send(x[sent .. $]);
                enforce(n != Socket.ERROR, "Failed to send data to the server");
                sent += n;
            }
        }
    }

    /// Scalar writers: the value is converted to big-endian bytes and sent.
    void write(ubyte x)
    {
        write(nativeToBigEndian(x)); // ubyte[]
    }

    /// ditto
    void write(short x)
    {
        write(nativeToBigEndian(x)); // ubyte[]
    }

    /// ditto
    void write(int x)
    {
        write(nativeToBigEndian(x)); // ubyte[]
    }

    /// ditto
    void write(long x)
    {
        write(nativeToBigEndian(x));
    }

    /// ditto
    void write(float x)
    {
        write(nativeToBigEndian(x)); // ubyte[]
    }

    /// ditto
    void write(double x)
    {
        write(nativeToBigEndian(x));
    }

    /// Writes the bytes of the string without any terminator.
    void writeString(string x)
    {
        ubyte[] ub = cast(ubyte[])(x);
        write(ub);
    }

    /// Writes the bytes of the string followed by a single zero byte.
    void writeCString(string x)
    {
        writeString(x);
        write('\0');
    }

    /// ditto
    void writeCString(char[] x)
    {
        write(cast(ubyte[])x);
        write('\0');
    }

    /// Writes a date as a 32-bit count of days relative to PGEpochDay.
    void write(const ref Date x)
    {
        write(cast(int)(x.dayOfGregorianCal - PGEpochDay));
    }

    /// ditto
    void write(Date x)
    {
        write(cast(int)(x.dayOfGregorianCal - PGEpochDay));
    }

    /**
    Writes a time of day as a 64-bit count of microseconds since midnight.

    The value is written as a long to mirror Message.read(out TimeOfDay),
    which reads a long; a 32-bit int cannot hold the microseconds of a day.
    */
    void write(const ref TimeOfDay x)
    {
        write((x - PGEpochTime).total!"usecs");
    }

    /// Writes a timestamp as a 64-bit count of microseconds relative to PGEpochDateTime.
    void write(const ref DateTime x) // timestamp
    {
        write((x - PGEpochDateTime).total!"usecs");
    }

    /// ditto
    void write(DateTime x) // timestamp
    {
        write((x - PGEpochDateTime).total!"usecs");
    }

    /// Writes a timestamptz as a 64-bit count of microseconds relative to PGEpochDateTime in UTC.
    void write(const ref SysTime x) // timestamptz
    {
        write((x - SysTime(PGEpochDateTime, UTC())).total!"usecs");
    }

    // BUG: Does not support months
    /**
    Writes an interval as (long microseconds, int days, int months).

    A Duration carries no month information, so the months field is always 0
    and the whole span is expressed through days plus the sub-day remainder.
    This matches Message.read(out Duration), which ignores the months field.
    */
    void write(const ref core.time.Duration x) // interval
    {
        int months = 0;
        int days = cast(int)x.split!"days".days;
        long usecs = x.total!"usecs" - convert!("days", "usecs")(days);

        write(usecs);
        write(days);
        write(months);
    }

    /// Writes the time-of-day part of x followed by a zero 32-bit zone field.
    void writeTimeTz(const ref SysTime x) // timetz
    {
        TimeOfDay t = cast(TimeOfDay)x;
        write(t);
        write(cast(int)0);
    }
}

/// Returns the lower-case hexadecimal MD5 digest of the given data.
char[32] MD5toHex(T...)(in T data)
{
    return md5Of(data).toHexString!(LetterCase.lower);
}

/**
A single protocol message: its type byte, its payload and a read cursor
(position) that every read* call advances.
*/
struct Message
{
    PGConnection conn;
    char type;
    ubyte[] data;

    private size_t position = 0;

    /// Reads a value of type T by forwarding to the matching read(out T, ...) overload.
    T read(T, Params...)(Params p)
    {
        T value;
        read(value, p);
        return value;
    }

    /// Reads one byte as a char.
    void read()(out char x)
    {
        x = data[position++];
    }


    /// Reads a multi-byte integral or floating point value stored in big-endian order.
    void read(Int)(out Int x) if((isIntegral!Int || isFloatingPoint!Int) && Int.sizeof > 1)
    {
        ubyte[Int.sizeof] buf;
        buf[] = data[position..position+Int.sizeof];
        x = bigEndianToNative!Int(buf);
        position += Int.sizeof;
    }

    /// Reads a zero-terminated string and moves the cursor past the terminator.
    string readCString()
    {
        string x;
        readCString(x);
        return x;
    }

    /**
    ditto

    Throws: Exception when no zero byte is found before the end of the payload.
    */
    void readCString(out string x)
    {
        // Scan with an index instead of a raw pointer so a missing terminator
        // cannot run past the end of the buffer.
        size_t end = position;

        while (end < data.length && data[end] != 0)
            end++;

        enforce(end < data.length, "Unterminated string in server message");

        x = cast(string)data[position .. end];
        position = end + 1;
    }

    /// Reads len bytes as a string (the result is a slice of the payload).
    string readString(int len)
    {
        string x;
        readString(x, len);
        return x;
    }

    /// ditto
    void readString(out string x, int len)
    {
        x = cast(string)(data[position .. position + len]);
        position += len;
    }

    /// Reads one byte as a bool.
    void read()(out bool x)
    {
        x = cast(bool)data[position++];
    }

    /// Reads len raw bytes (the result is a slice of the payload).
    void read()(out ubyte[] x, int len)
    {
        enforce(position + len <= data.length);
        x = data[position .. position + len];
        position += len;
    }

    /// Reads 16 bytes as a UUID.
    void read()(out UUID u) // uuid
    {
        ubyte[16] uuidData = data[position .. position + 16];
        position += 16;
        u = UUID(uuidData);
    }

    void read()(out Date x) // date
    {
        int days = read!int; // number of days since 1 Jan 2000
        x = PGEpochDate + dur!"days"(days);
    }

    void read()(out TimeOfDay x) // time
    {
        long usecs = read!long;
        x = PGEpochTime + dur!"usecs"(usecs);
    }

    void read()(out DateTime x) // timestamp
    {
        long usecs = read!long;
        x = PGEpochDateTime + dur!"usecs"(usecs);
    }

    void read()(out SysTime x) // timestamptz
    {
        long usecs = read!long;
        x = SysTime(PGEpochDateTime + dur!"usecs"(usecs), UTC());
        x.timezone = LocalTime();
    }

    // BUG: Does not support months
    void read()(out core.time.Duration x) // interval
    {
        long usecs = read!long;
        int days = read!int;
        int months = read!int; // consumed to advance the cursor; the value is not used

        x = dur!"days"(days) + dur!"usecs"(usecs);
    }

    /// Reads a time of day followed by a 32-bit zone field and combines them into a SysTime.
    SysTime readTimeTz() // timetz
    {
        TimeOfDay time = read!TimeOfDay;
        int zone = read!int / 60; // originally in seconds, convert it to minutes
        Duration duration = dur!"minutes"(zone);
        auto stz = new immutable SimpleTimeZone(duration);
        return SysTime(DateTime(Date(0, 1, 1), time), stz);
    }

    /**
    Reads a composite (record) value: a field count followed, for each field,
    by its type oid, its length (-1 for NULL) and its data.

    For record types with a static length the per-field code is generated at
    compile time; otherwise fields are read in a run-time loop.
    */
    T readComposite(T)()
    {
        alias DBRow!T Record;

        static if (Record.hasStaticLength)
        {
            alias Record.fieldTypes fieldTypes;

            static string genFieldAssigns() // CTFE
            {
                string s = "";

                foreach (i; 0 .. fieldTypes.length)
                {
                    s ~= "read(fieldOid);\n";
                    s ~= "read(fieldLen);\n";
                    s ~= "if (fieldLen == -1)\n";
                    s ~= text("record.setNull!(", i, ");\n");
                    s ~= "else\n";
                    s ~= text("record.set!(fieldTypes[", i, "], ", i, ")(",
                              "readBaseType!(fieldTypes[", i, "])(fieldOid, fieldLen)",
                              ");\n");
                    // text() doesn't work with -inline option, CTFE bug
                }

                return s;
            }
        }

        Record record;

        int fieldCount, fieldLen;
        uint fieldOid;

        read(fieldCount);

        static if (Record.hasStaticLength)
            mixin(genFieldAssigns);
        else
        {
            record.setLength(fieldCount);

            foreach (i; 0 .. fieldCount)
            {
                read(fieldOid);
                read(fieldLen);

                if (fieldLen == -1)
                    record.setNull(i);
                else
                    record[i] = readBaseType!(Record.ElemType)(fieldOid, fieldLen);
            }
        }

        return record.base;
    }

    // Exposes the element type of an array type as ElemType.
    mixin template elmnt(U : U[])
    {
        alias U ElemType;
    }

    /**
    Reads dimension number dim of an array value, recursing into nested
    dimensions when the element type is itself a (non-string) array.

    Throws: Exception when a NULL element is received for an element type
    that is neither nullable nor a string.
    */
    private AT readDimension(AT)(int[] lengths, uint elementOid, int dim)
    {

        mixin elmnt!AT;

        int length = lengths[dim];

        AT array;
        static if (isDynamicArray!AT)
            array.length = length;

        int fieldLen;

        foreach(i; 0 .. length)
        {
            static if (isArray!ElemType && !isSomeString!ElemType)
                array[i] = readDimension!ElemType(lengths, elementOid, dim + 1);
            else
            {
                static if (isNullable!ElemType)
                    alias nullableTarget!ElemType E;
                else
                    alias ElemType E;

                read(fieldLen);
                if (fieldLen == -1)
                {
                    static if (isNullable!ElemType || isSomeString!ElemType)
                        array[i] = null;
                    else
                        throw new Exception("Can't set NULL value to non nullable type");
                }
                else
                    array[i] = readBaseType!E(elementOid, fieldLen);
            }
        }

        return array;
    }

    /**
    Reads an array value: dimension count, NULL flag and element oid, then a
    (length, lower bound) pair per dimension, then the elements.

    Returns: T.init when the server reports zero dimensions.
    */
    T readArray(T)()
        if (isArray!T)
    {
        alias multiArrayElemType!T U;

        // todo: more validation, better lowerBounds support
        int dims, hasNulls;
        uint elementOid;
        int[] lengths, lowerBounds;

        read(dims);
        read(hasNulls); // 0 or 1
        read(elementOid);

        if (dims == 0)
            return T.init;

        enforce(arrayDimensions!T == dims, "Dimensions of arrays do not match");
        static if (!isNullable!U && !isSomeString!U)
            enforce(!hasNulls, "PostgreSQL returned NULLs but array elements are not Nullable");

        lengths.length = lowerBounds.length = dims;

        foreach(i; 0 .. dims)
        {
            int len;

            read(len);
            read(lowerBounds[i]);
            lengths[i] = len;
        }

        T array = readDimension!T(lengths, elementOid, 0);

        return array;
    }

    /**
    Reads len bytes as an enum label and maps it to the member of T with the
    same name.

    Throws: ConvException when T has no member with that name.
    */
    T readEnum(T)(int len)
    {
        string genCases() // CTFE
        {
            string s;

            foreach (name; __traits(allMembers, T))
            {
                s ~= text(`case "`, name, `": return T.`, name, `;`);
            }

            return s;
        }

        string enumMember = readString(len);

        switch (enumMember)
        {
            mixin(genCases);
            default: throw new ConvException("Can't set enum value '" ~ enumMember ~ "' to enum type " ~ T.stringof);
        }
    }

    /**
    Reads one value whose PostgreSQL type is given by oid and converts it to T.

    Oids not listed explicitly are looked up in the connection's array,
    composite and enum type tables.

    Throws: ConvException when the type cannot be converted to T.
    */
    T readBaseType(T)(uint oid, int len = 0)
    {
        auto convError(T)()
        {
            string* type = oid in baseTypes;
            return new ConvException("Can't convert PostgreSQL's type " ~ (type ? *type : to!string(oid)) ~ " to " ~ T.stringof);
        }

        switch (oid)
        {
            case 16: // bool
                static if (isConvertible!(T, bool))
                    return _to!T(read!bool);
                else
                    throw convError!T();
            case 26, 24, 2202, 2203, 2204, 2205, 2206, 3734, 3769: // oid and reg*** aliases
                static if (isConvertible!(T, uint))
                    return _to!T(read!uint);
                else
                    throw convError!T();
            case 21: // int2
                static if (isConvertible!(T, short))
                    return _to!T(read!short);
                else
                    throw convError!T();
            case 23: // int4
                static if (isConvertible!(T, int))
                    return _to!T(read!int);
                else
                    throw convError!T();
            case 20: // int8
                static if (isConvertible!(T, long))
                    return _to!T(read!long);
                else
                    throw convError!T();
            case 700: // float4
                static if (isConvertible!(T, float))
                    return _to!T(read!float);
                else
                    throw convError!T();
            case 701: // float8
                static if (isConvertible!(T, double))
                    return _to!T(read!double);
                else
                    throw convError!T();
            case 1042, 1043, 25, 19, 705: // bpchar, varchar, text, name, unknown
                static if (isConvertible!(T, string))
                    return _to!T(readString(len));
                else
                    throw convError!T();
            case 17: // bytea
                static if (isConvertible!(T, ubyte[]))
                    return _to!T(read!(ubyte[])(len));
                else
                    throw convError!T();
            case 2950: // UUID
                static if(isConvertible!(T, UUID))
                    return _to!T(read!UUID());
                else
                    throw convError!T();
            case 18: // "char"
                static if (isConvertible!(T, char))
                    return _to!T(read!char);
                else
                    throw convError!T();
            case 1082: // date
                static if (isConvertible!(T, Date))
                    return _to!T(read!Date);
                else
                    throw convError!T();
            case 1083: // time
                static if (isConvertible!(T, TimeOfDay))
                    return _to!T(read!TimeOfDay);
                else
                    throw convError!T();
            case 1114: // timestamp
                static if (isConvertible!(T, DateTime))
                    return _to!T(read!DateTime);
                else
                    throw convError!T();
            case 1184: // timestamptz
                static if (isConvertible!(T, SysTime))
                    return _to!T(read!SysTime);
                else
                    throw convError!T();
            case 1186: // interval
                static if (isConvertible!(T, core.time.Duration))
                    return _to!T(read!(core.time.Duration));
                else
                    throw convError!T();
            case 1266: // timetz
                static if (isConvertible!(T, SysTime))
                    return _to!T(readTimeTz);
                else
                    throw convError!T();
            case 2249: // record and other composite types
                static if (isVariantN!T && T.allowed!(Variant[]))
                    return T(readComposite!(Variant[]));
                else
                    return readComposite!T;
            case 2287: // _record and other arrays
                static if (isArray!T && !isSomeString!T)
                    return readArray!T;
                else static if (isVariantN!T && T.allowed!(Variant[]))
                    return T(readArray!(Variant[]));
                else
                    throw convError!T();
            case 114: //JSON
                static if (isConvertible!(T, string))
                    return _to!T(readString(len));
                else
                    throw convError!T();
            default:
                if (oid in conn.arrayTypes)
                    goto case 2287;
                else if (oid in conn.compositeTypes)
                    goto case 2249;
                else if (oid in conn.enumTypes)
                {
                    static if (is(T == enum))
                        return readEnum!T(len);
                    else static if (isConvertible!(T, string))
                        return _to!T(readString(len));
                    else
                        throw convError!T();
                }
        }

        throw convError!T();
    }
}

// workaround, because std.conv currently doesn't support VariantN
template _to(T)
{
    static if (isVariantN!T)
        T _to(S)(S value) { T t = value; return t; }
    else
        T _to(A...)(A args) { return std.conv.to!T(args); }
}

// true when a value of type S can be turned into T through _to, or when T is
// a VariantN that allows S.
template isConvertible(T, S)
{
    static if (__traits(compiles, { S s; _to!T(s); }) || (isVariantN!T && T.allowed!S))
        enum isConvertible = true;
    else
        enum isConvertible = false;
}

// Number of nested array levels of T; strings count as elements, not arrays.
template arrayDimensions(T : T[])
{
    static if (isArray!T && !isSomeString!T)
        enum arrayDimensions = arrayDimensions!T + 1;
    else
        enum arrayDimensions = 1;
}

template arrayDimensions(T)
{
    enum arrayDimensions = 0;
}

// Innermost element type of a (possibly multi-dimensional) array type.
template multiArrayElemType(T : T[])
{
    static if (isArray!T && !isSomeString!T)
        alias multiArrayElemType!T multiArrayElemType;
    else
        alias T multiArrayElemType;
}

template multiArrayElemType(T)
{
    alias T multiArrayElemType;
}

static assert(arrayDimensions!(int) == 0);
static assert(arrayDimensions!(int[]) == 1);
static assert(arrayDimensions!(int[][]) == 2);
static assert(arrayDimensions!(int[][][]) == 3);

enum TransactionStatus : char { OutsideTransaction = 'I', InsideTransaction = 'T', InsideFailedTransaction = 'E' }

// Type names used by readBaseType when building conversion error messages.
enum string[int] baseTypes = [
    // boolean types
    16 : "bool",
    // bytea types
    17 : "bytea",
    // character types
    18 : `"char"`, // "char" - 1 byte internal type
    1042 : "bpchar", // char(n) - blank padded
    1043 : "varchar",
    25 : "text",
    19 : "name",
    // numeric types
    21 : "int2",
    23 : "int4",
    20 : "int8",
    700 : "float4",
    701 : "float8",
    1700 : "numeric"
];

public:

enum PGType : int
{
    OID = 26,
    NAME = 19,
    REGPROC = 24,
    BOOLEAN = 16,
    BYTEA = 17,
    CHAR = 18, // 1 byte "char", used internally in PostgreSQL
    BPCHAR = 1042, // Blank Padded char(n), fixed size
    VARCHAR = 1043,
    TEXT = 25,
    INT2 = 21,
    INT4 = 23,
    INT8 = 20,
    FLOAT4 = 700,
    FLOAT8 = 701,

    // reference https://github.com/lpsmith/postgresql-simple/blob/master/src/Database/PostgreSQL/Simple/TypeInfo/Static.hs#L74
    DATE = 1082,
    TIME = 1083,
    TIMESTAMP = 1114,
    TIMESTAMPTZ = 1184,
    INTERVAL = 1186,
    TIMETZ = 1266,

    JSON = 114,
    JSONARRAY = 199
}

class ParamException : Exception
{
    this(string msg, string fn = __FILE__, size_t ln = __LINE__) @safe pure nothrow
    {
        super(msg, fn, ln);
    }
}

/// Exception thrown on server error
class ServerErrorException: Exception
{
    /// Contains information about this _error. Aliased to this.
    ResponseMessage error;
    alias error this;

    this(string msg, string fn = __FILE__, size_t ln = __LINE__) @safe pure nothrow
    {
        super(msg, fn, ln);
    }

    this(ResponseMessage error, string fn = __FILE__, size_t ln = __LINE__)
    {
        super(error.toString(), fn, ln);
        this.error = error;
    }
}

/**
Class encapsulating errors and notices.

This class provides access to fields of ErrorResponse and NoticeResponse
sent by the server. More information about these fields can be found
$(LINK2 http://www.postgresql.org/docs/9.0/static/protocol-error-fields.html,here).
*/
class ResponseMessage
{
    private string[char] fields;

    // Returns the field stored under the given code, or "" when it is absent.
    private string getOptional(char type)
    {
        string* p = type in fields;
        return p ? *p : "";
    }

    /// Message fields
    @property string severity()
    {
        return fields['S'];
    }

    /// ditto
    @property string code()
    {
        return fields['C'];
    }

    /// ditto
    @property string message()
    {
        return fields['M'];
    }

    /// ditto
    @property string detail()
    {
        return getOptional('D');
    }

    /// ditto
    @property string hint()
    {
        return getOptional('H');
    }

    /// ditto
    @property string position()
    {
        return getOptional('P');
    }

    /// ditto
    @property string internalPosition()
    {
        return getOptional('p');
    }

    /// ditto
    @property string internalQuery()
    {
        return getOptional('q');
    }

    /// ditto
    @property string where()
    {
        return getOptional('W');
    }

    /// ditto
    @property string file()
    {
        return getOptional('F');
    }

    /// ditto
    @property string line()
    {
        return getOptional('L');
    }

    /// ditto
    @property string routine()
    {
        return getOptional('R');
    }

    /**
    Returns summary of this message using the most common fields (severity,
    code, message, detail, hint)
    */
    override string toString()
    {
        string s = severity ~ ' ' ~ code ~ ": " ~ message;

        string* detail = 'D' in fields;
        if (detail)
            s ~= "\nDETAIL: " ~ *detail;

        string* hint = 'H' in fields;
        if (hint)
            s ~= "\nHINT: " ~ *hint;

        return s;
    }
}

/**
Class representing connection to PostgreSQL server.
*/
class PGConnection
{
    private:
        PGStream stream;
        string[string] serverParams;
        int serverProcessID;
        int serverSecretKey;
        TransactionStatus trStatus;
        ulong lastPrepared = 0;
        uint[uint] arrayTypes;
        uint[][uint] compositeTypes;
        string[uint][uint] enumTypes;
        bool activeResultSet;

        /// Returns the current value of the lastPrepared counter as a string and increments it.
        string reservePrepared()
        {
            synchronized (this)
            {

                return to!string(lastPrepared++);
            }
        }

        /**
        Reads one message from the stream: a type byte, a 32-bit length that
        counts itself but not the type byte, then the payload.

        Throws: Exception when the length field is smaller than its own size.
        */
        Message getMessage()
        {

            char type;
            int len;
            ubyte[1] ub;
            stream.read(ub); // message type

            type = bigEndianToNative!char(ub);
            ubyte[4] ubi;
            stream.read(ubi); // message length, doesn't include type byte

            len = bigEndianToNative!int(ubi) - 4;
            enforce(len >= 0, "Invalid message length received from the server");

            ubyte[] msg;
            if (len > 0)
            {
                msg = new ubyte[len];
                stream.read(msg);
            }

            return Message(this, type, msg);
        }

        /**
        Sends the startup message: length, protocol version 3 and every
        key/value pair from params except "host", "port" and "password".
        */
        void sendStartupMessage(const string[string] params)
        {
            bool localParam(string key)
            {
                switch (key)
                {
                    case "host", "port", "password": return true;
                    default: return false;
                }
            }

            int len = 9; // length (int), version number (int) and parameter-list's delimiter (byte)

            foreach (key, value; params)
            {
                if (localParam(key))
                    continue;

                len += key.length + value.length + 2;
            }

            stream.write(len);
            stream.write(0x0003_0000); // version number 3
            foreach (key, value; params)
            {
                if (localParam(key))
                    continue;
                stream.writeCString(key);
                stream.writeCString(value);
            }
            stream.write(cast(ubyte)0);
        }

        /// Sends a 'p' message carrying the password as a zero-terminated string.
        void sendPasswordMessage(string password)
        {
            int len = cast(int)(4 + password.length + 1);

            stream.write('p');
            stream.write(len);
            stream.writeCString(password);
        }

        /// Sends a 'P' message: statement name, query text and the parameter type oids.
        void sendParseMessage(string statementName, string query, int[] oids)
        {
            int len = cast(int)(4 + statementName.length + 1 + query.length + 1 + 2 + oids.length * 4);

            stream.write('P');
            stream.write(len);
            stream.writeCString(statementName);
            stream.writeCString(query);
            stream.write(cast(short)oids.length);

            foreach (oid; oids)
                stream.write(oid);
        }

        /// Sends a 'C' message for the named statement or portal.
        void sendCloseMessage(DescribeType type, string name)
        {
            stream.write('C');
            stream.write(cast(int)(4 + 1 + name.length + 1));
            stream.write(cast(char)type);
            stream.writeCString(name);
        }

        /// Sends an 'X' message with no payload.
        void sendTerminateMessage()
        {
            stream.write('X');
            stream.write(cast(int)4);
        }

        /**
        Sends a 'B' message binding params to the given statement and portal.

        The first pass validates the parameters and sums their payload sizes
        so the message length can be written up front; the second pass writes
        them. TEXT parameters are sent in text format, everything else in
        binary format; results are requested in binary format.

        NULL parameters are written as length -1 with no payload, so they
        contribute nothing to the payload size.

        Throws: ParamException when a parameter has no value or its value is
        not convertible to the type required by its PGType.
        */
        void sendBindMessage(string portalName, string statementName, PGParameters params)
        {
            int paramsLen = 0;
            bool hasText = false;

            foreach (param; params)
            {
                enforce(param.value.hasValue, new ParamException("Parameter $" ~ to!string(param.index) ~ " value is not initialized"));

                // Must agree with the null test in the writing loop below:
                // a NULL parameter adds no payload bytes to the message.
                bool isNull = (param.value == null);

                void checkParam(T)(int len)
                {
                    if (!isNull)
                    {
                        enforce(param.value.convertsTo!T, new ParamException("Parameter's value is not convertible to " ~ T.stringof));
                        paramsLen += len;
                    }
                }

                /*final*/ switch (param.type)
                {
                    case PGType.INT2: checkParam!short(2); break;
                    case PGType.INT4: checkParam!int(4); break;
                    case PGType.INT8: checkParam!long(8); break;
                    case PGType.TEXT:
                        if (!isNull)
                            paramsLen += param.value.coerce!string.length;
                        hasText = true;
                        break;
                    case PGType.BYTEA:
                        if (!isNull)
                            paramsLen += param.value.length;
                        break;
                    case PGType.JSON:
                        if (!isNull)
                            paramsLen += param.value.coerce!string.length; // TODO: object serialisation
                        break;
                    case PGType.DATE:
                        if (!isNull)
                            paramsLen += 4;
                        break;
                    case PGType.FLOAT4: checkParam!float(4); break;
                    case PGType.FLOAT8: checkParam!double(8); break;
                    case PGType.BOOLEAN: checkParam!bool(1); break;
                    default: assert(0, "Not implemented " ~ to!string(param.type));
                }
            }

            // length field, both names with terminators, format codes, the two
            // counters (format codes, parameters), one length prefix per
            // parameter, the payloads, and the result format section
            int len = cast(int)( 4 + portalName.length + 1 + statementName.length + 1 + (hasText ? (params.length*2) : 2) + 2 + 2 +
                params.length * 4 + paramsLen + 2 + 2 );

            stream.write('B');
            stream.write(len);
            stream.writeCString(portalName);
            stream.writeCString(statementName);
            if(hasText)
            {
                // one format code per parameter
                stream.write(cast(short) params.length);
                foreach(param; params)
                    if(param.type == PGType.TEXT)
                        stream.write(cast(short) 0); // text format
                    else
                        stream.write(cast(short) 1); // binary format
            } else {
                stream.write(cast(short)1); // one parameter format code
                stream.write(cast(short)1); // binary format
            }
            stream.write(cast(short)params.length);

            foreach (param; params)
            {
                if (param.value == null)
                {
                    stream.write(-1);
                    continue;
                }

                switch (param.type)
                {
                    case PGType.INT2:
                        stream.write(cast(int)2);
                        stream.write(param.value.coerce!short);
                        break;
                    case PGType.INT4:
                        stream.write(cast(int)4);
                        stream.write(param.value.coerce!int);
                        break;
                    case PGType.INT8:
                        stream.write(cast(int)8);
                        stream.write(param.value.coerce!long);
                        break;
                    case PGType.FLOAT4:
                        stream.write(cast(int)4);
                        stream.write(param.value.coerce!float);
                        break;
                    case PGType.FLOAT8:
                        stream.write(cast(int)8);
                        stream.write(param.value.coerce!double);
                        break;
                    case PGType.TEXT:
                        auto s = param.value.coerce!string;
                        stream.write(cast(int) s.length);
                        if(s.length) stream.write(cast(ubyte[]) s);
                        break;
                    case PGType.BYTEA:
                        auto s = param.value;
                        stream.write(cast(int) s.length);

                        // copy the elements out of the Variant one by one
                        ubyte[] x;
                        x.length = s.length;
                        foreach (i; 0 .. x.length)
                            x[i] = s[i].get!(ubyte);
                        stream.write(x);
                        break;
                    case PGType.JSON:
                        auto s = param.value.coerce!string;
                        stream.write(cast(int) s.length);
                        stream.write(cast(ubyte[]) s);
                        break;
                    case PGType.DATE:
                        stream.write(cast(int) 4);
                        stream.write(Date.fromISOString(param.value.coerce!string));
                        break;
                    case PGType.BOOLEAN:
                        stream.write(cast(int) 1);
                        stream.write(param.value.coerce!bool);
                        break;
                    default:
                        assert(0, "Not implemented " ~ to!string(param.type));
                }
            }

            stream.write(cast(short)1); // one result format code
            stream.write(cast(short)1); // binary format
        }

        enum DescribeType : char { Statement = 'S', Portal = 'P' }

        /// Sends a 'D' message for the named statement or portal.
        void sendDescribeMessage(DescribeType type, string name)
        {
            stream.write('D');
            stream.write(cast(int)(4 + 1 + name.length + 1));
            stream.write(cast(char)type);
            stream.writeCString(name);
        }

        /// Sends an 'E' message for the named portal with the given row limit.
        void sendExecuteMessage(string portalName, int maxRows)
        {
            stream.write('E');
            stream.write(cast(int)(4 + portalName.length + 1 + 4));
            stream.writeCString(portalName);
            stream.write(cast(int)maxRows);
        }

        /// Sends an 'H' message with no payload.
        void sendFlushMessage()
        {
            stream.write('H');
            stream.write(cast(int)4);
        }

        /// Sends an 'S' message with no payload.
        void sendSyncMessage()
        {
            stream.write('S');
            stream.write(cast(int)4);
        }

        /**
        Parses the payload of msg into a ResponseMessage: a sequence of
        (field code, zero-terminated value) pairs ended by a zero field code.
        */
        ResponseMessage handleResponseMessage(Message msg)
        {
            enforce(msg.data.length >= 2);

            char ftype;
            string fvalue;
            ResponseMessage response = new ResponseMessage;

            while (true)
            {
                msg.read(ftype);
                if(ftype <=0) break;

                msg.readCString(fvalue);
                response.fields[ftype] = fvalue;
            }

            return response;
        }

        /// Throws when a result set is still marked as active on this connection.
        void checkActiveResultSet()
        {
            enforce(!activeResultSet, "There's active result set, which must be closed first.");
        }

        void prepare(string statementName, string query, PGParameters params)
        {
            checkActiveResultSet();
            sendParseMessage(statementName, query, params.getOids());

            sendFlushMessage();

        receive:

            Message msg = 
getMessage(); 1325 1326 switch (msg.type) 1327 { 1328 case 'E': 1329 // ErrorResponse 1330 ResponseMessage response = handleResponseMessage(msg); 1331 sendSyncMessage(); 1332 throw new ServerErrorException(response); 1333 case '1': 1334 // ParseComplete 1335 return; 1336 default: 1337 // async notice, notification 1338 goto receive; 1339 } 1340 } 1341 1342 void unprepare(string statementName) 1343 { 1344 checkActiveResultSet(); 1345 sendCloseMessage(DescribeType.Statement, statementName); 1346 sendFlushMessage(); 1347 1348 receive: 1349 1350 Message msg = getMessage(); 1351 1352 switch (msg.type) 1353 { 1354 case 'E': 1355 // ErrorResponse 1356 ResponseMessage response = handleResponseMessage(msg); 1357 throw new ServerErrorException(response); 1358 case '3': 1359 // CloseComplete 1360 return; 1361 default: 1362 // async notice, notification 1363 goto receive; 1364 } 1365 } 1366 1367 PGFields bind(string portalName, string statementName, PGParameters params) 1368 { 1369 checkActiveResultSet(); 1370 sendCloseMessage(DescribeType.Portal, portalName); 1371 sendBindMessage(portalName, statementName, params); 1372 sendDescribeMessage(DescribeType.Portal, portalName); 1373 sendFlushMessage(); 1374 1375 receive: 1376 1377 Message msg = getMessage(); 1378 1379 switch (msg.type) 1380 { 1381 case 'E': 1382 // ErrorResponse 1383 ResponseMessage response = handleResponseMessage(msg); 1384 sendSyncMessage(); 1385 throw new ServerErrorException(response); 1386 case '3': 1387 // CloseComplete 1388 goto receive; 1389 case '2': 1390 // BindComplete 1391 goto receive; 1392 case 'T': 1393 // RowDescription (response to Describe) 1394 PGField[] fields; 1395 short fieldCount; 1396 short formatCode; 1397 PGField fi; 1398 1399 msg.read(fieldCount); 1400 1401 fields.length = fieldCount; 1402 1403 foreach (i; 0..fieldCount) 1404 { 1405 msg.readCString(fi.name); 1406 msg.read(fi.tableOid); 1407 msg.read(fi.index); 1408 msg.read(fi.oid); 1409 msg.read(fi.typlen); 1410 msg.read(fi.modifier); 
1411 msg.read(formatCode); 1412 1413 enforce(formatCode == 1, new Exception("Field's format code returned in RowDescription is not 1 (binary)")); 1414 1415 fields[i] = fi; 1416 } 1417 1418 return cast(PGFields)fields; 1419 case 'n': 1420 // NoData (response to Describe) 1421 return new immutable(PGField)[0]; 1422 default: 1423 // async notice, notification 1424 goto receive; 1425 } 1426 } 1427 1428 ulong executeNonQuery(string portalName, out uint oid) 1429 { 1430 checkActiveResultSet(); 1431 ulong rowsAffected = 0; 1432 1433 sendExecuteMessage(portalName, 0); 1434 sendSyncMessage(); 1435 sendFlushMessage(); 1436 1437 receive: 1438 1439 Message msg = getMessage(); 1440 1441 switch (msg.type) 1442 { 1443 case 'E': 1444 // ErrorResponse 1445 ResponseMessage response = handleResponseMessage(msg); 1446 throw new ServerErrorException(response); 1447 case 'D': 1448 // DataRow 1449 finalizeQuery(); 1450 throw new Exception("This query returned rows."); 1451 case 'C': 1452 // CommandComplete 1453 string tag; 1454 1455 msg.readCString(tag); 1456 1457 // GDC indexOf name conflict in std.string and std.algorithm 1458 auto s1 = std..string.indexOf(tag, ' '); 1459 if (s1 >= 0) { 1460 switch (tag[0 .. s1]) { 1461 case "INSERT": 1462 // INSERT oid rows 1463 auto s2 = lastIndexOf(tag, ' '); 1464 assert(s2 > s1); 1465 oid = to!uint(tag[s1 + 1 .. s2]); 1466 rowsAffected = to!ulong(tag[s2 + 1 .. $]); 1467 break; 1468 case "DELETE", "UPDATE", "MOVE", "FETCH": 1469 // DELETE rows 1470 rowsAffected = to!ulong(tag[s1 + 1 .. 
$]); 1471 break; 1472 default: 1473 // CREATE TABLE 1474 break; 1475 } 1476 } 1477 1478 goto receive; 1479 1480 case 'I': 1481 // EmptyQueryResponse 1482 goto receive; 1483 case 'Z': 1484 // ReadyForQuery 1485 return rowsAffected; 1486 default: 1487 // async notice, notification 1488 goto receive; 1489 } 1490 } 1491 1492 DBRow!Specs fetchRow(Specs...)(ref Message msg, ref PGFields fields) 1493 { 1494 alias DBRow!Specs Row; 1495 1496 static if (Row.hasStaticLength) 1497 { 1498 alias Row.fieldTypes fieldTypes; 1499 1500 static string genFieldAssigns() // CTFE 1501 { 1502 string s = ""; 1503 1504 foreach (i; 0 .. fieldTypes.length) 1505 { 1506 s ~= "msg.read(fieldLen);\n"; 1507 s ~= "if (fieldLen == -1)\n"; 1508 s ~= text("row.setNull!(", i, ")();\n"); 1509 s ~= "else\n"; 1510 s ~= text("row.set!(fieldTypes[", i, "], ", i, ")(", 1511 "msg.readBaseType!(fieldTypes[", i, "])(fields[", i, "].oid, fieldLen)", 1512 ");\n"); 1513 // text() doesn't work with -inline option, CTFE bug 1514 } 1515 1516 return s; 1517 } 1518 } 1519 1520 Row row; 1521 short fieldCount; 1522 int fieldLen; 1523 1524 msg.read(fieldCount); 1525 1526 static if (Row.hasStaticLength) 1527 { 1528 Row.checkReceivedFieldCount(fieldCount); 1529 mixin(genFieldAssigns); 1530 } 1531 else 1532 { 1533 row.setLength(fieldCount); 1534 1535 foreach (i; 0 .. 
fieldCount) 1536 { 1537 msg.read(fieldLen); 1538 if (fieldLen == -1) 1539 row.setNull(i); 1540 else 1541 row[i] = msg.readBaseType!(Row.ElemType)(fields[i].oid, fieldLen); 1542 } 1543 } 1544 1545 return row; 1546 } 1547 1548 void finalizeQuery() 1549 { 1550 Message msg; 1551 1552 do 1553 { 1554 msg = getMessage(); 1555 1556 // TODO: process async notifications 1557 } 1558 while (msg.type != 'Z'); // ReadyForQuery 1559 } 1560 1561 PGResultSet!Specs executeQuery(Specs...)(string portalName, ref PGFields fields) 1562 { 1563 checkActiveResultSet(); 1564 1565 PGResultSet!Specs result = new PGResultSet!Specs(this, fields, &fetchRow!Specs); 1566 1567 ulong rowsAffected = 0; 1568 1569 sendExecuteMessage(portalName, 0); 1570 sendSyncMessage(); 1571 sendFlushMessage(); 1572 1573 receive: 1574 1575 Message msg = getMessage(); 1576 1577 switch (msg.type) 1578 { 1579 case 'D': 1580 // DataRow 1581 alias DBRow!Specs Row; 1582 1583 result.row = fetchRow!Specs(msg, fields); 1584 static if (!Row.hasStaticLength) 1585 result.row.columnToIndex = &result.columnToIndex; 1586 result.validRow = true; 1587 result.nextMsg = getMessage(); 1588 1589 activeResultSet = true; 1590 1591 return result; 1592 case 'C': 1593 // CommandComplete 1594 string tag; 1595 1596 msg.readCString(tag); 1597 1598 auto s2 = lastIndexOf(tag, ' '); 1599 if (s2 >= 0) 1600 { 1601 rowsAffected = to!ulong(tag[s2 + 1 .. 
$]); 1602 } 1603 1604 goto receive; 1605 case 'I': 1606 // EmptyQueryResponse 1607 throw new Exception("Query string is empty."); 1608 case 's': 1609 // PortalSuspended 1610 throw new Exception("Command suspending is not supported."); 1611 case 'Z': 1612 // ReadyForQuery 1613 result.nextMsg = msg; 1614 return result; 1615 case 'E': 1616 // ErrorResponse 1617 ResponseMessage response = handleResponseMessage(msg); 1618 throw new ServerErrorException(response); 1619 default: 1620 // async notice, notification 1621 goto receive; 1622 } 1623 1624 assert(0); 1625 } 1626 1627 public: 1628 1629 1630 /** 1631 Opens connection to server. 1632 1633 Params: 1634 params = Associative array of string keys and values. 1635 1636 Currently recognized parameters are: 1637 $(UL 1638 $(LI host - Host name or IP address of the server. Required.) 1639 $(LI port - Port number of the server. Defaults to 5432.) 1640 $(LI user - The database user. Required.) 1641 $(LI database - The database to connect to. Defaults to the user name.) 1642 $(LI options - Command-line arguments for the backend. (This is deprecated in favor of setting individual run-time parameters.)) 1643 ) 1644 1645 In addition to the above, any run-time parameter that can be set at backend start time might be listed. 1646 Such settings will be applied during backend start (after parsing the command-line options if any). 1647 The values will act as session defaults. 1648 1649 Examples: 1650 --- 1651 auto conn = new PGConnection([ 1652 "host" : "localhost", 1653 "database" : "test", 1654 "user" : "postgres", 1655 "password" : "postgres" 1656 ]); 1657 --- 1658 */ 1659 this(const string[string] params) 1660 { 1661 enforce("host" in params, new ParamException("Required parameter 'host' not found")); 1662 enforce("user" in params, new ParamException("Required parameter 'user' not found")); 1663 1664 string[string] p = cast(string[string])params.dup; 1665 1666 ushort port = "port" in params? 
parse!ushort(p["port"]) : 5432; 1667 1668 version(Have_vibe_d_core) 1669 { 1670 stream = new PGStream(new TCPConnectionWrapper(params["host"], port)); 1671 } 1672 else 1673 { 1674 stream = new PGStream(new TcpSocket); 1675 stream.socket.connect(new InternetAddress(params["host"], port)); 1676 } 1677 sendStartupMessage(params); 1678 1679 receive: 1680 1681 Message msg = getMessage(); 1682 1683 switch (msg.type) 1684 { 1685 case 'E', 'N': 1686 // ErrorResponse, NoticeResponse 1687 1688 ResponseMessage response = handleResponseMessage(msg); 1689 1690 if (msg.type == 'N') 1691 goto receive; 1692 1693 throw new ServerErrorException(response); 1694 case 'R': 1695 // AuthenticationXXXX 1696 enforce(msg.data.length >= 4); 1697 1698 int atype; 1699 1700 msg.read(atype); 1701 1702 switch (atype) 1703 { 1704 case 0: 1705 // authentication successful, now wait for another messages 1706 goto receive; 1707 case 3: 1708 // clear-text password is required 1709 enforce("password" in params, new ParamException("Required parameter 'password' not found")); 1710 enforce(msg.data.length == 4); 1711 1712 sendPasswordMessage(params["password"]); 1713 1714 goto receive; 1715 case 5: 1716 // MD5-hashed password is required, formatted as: 1717 // "md5" + md5(md5(password + username) + salt) 1718 // where md5() returns lowercase hex-string 1719 enforce("password" in params, new ParamException("Required parameter 'password' not found")); 1720 enforce(msg.data.length == 8); 1721 1722 char[3 + 32] password; 1723 password[0 .. 3] = "md5"; 1724 password[3 .. $] = MD5toHex(MD5toHex( 1725 params["password"], params["user"]), msg.data[4 .. 
8]); 1726 1727 sendPasswordMessage(to!string(password)); 1728 1729 goto receive; 1730 default: 1731 // non supported authentication type, close connection 1732 this.close(); 1733 throw new Exception("Unsupported authentication type"); 1734 } 1735 1736 case 'S': 1737 // ParameterStatus 1738 enforce(msg.data.length >= 2); 1739 1740 string pname, pvalue; 1741 1742 msg.readCString(pname); 1743 msg.readCString(pvalue); 1744 1745 serverParams[pname] = pvalue; 1746 1747 goto receive; 1748 1749 case 'K': 1750 // BackendKeyData 1751 enforce(msg.data.length == 8); 1752 1753 msg.read(serverProcessID); 1754 msg.read(serverSecretKey); 1755 1756 goto receive; 1757 1758 case 'Z': 1759 // ReadyForQuery 1760 enforce(msg.data.length == 1); 1761 1762 msg.read(cast(char)trStatus); 1763 1764 // check for validity 1765 switch (trStatus) 1766 { 1767 case 'I', 'T', 'E': break; 1768 default: throw new Exception("Invalid transaction status"); 1769 } 1770 1771 // connection is opened and now it's possible to send queries 1772 reloadAllTypes(); 1773 return; 1774 default: 1775 // unknown message type, ignore it 1776 goto receive; 1777 } 1778 } 1779 1780 /// Closes current connection to the server. 1781 void close() 1782 { 1783 sendTerminateMessage(); 1784 stream.socket.close(); 1785 } 1786 1787 /// Shorthand methods using temporary PGCommand. Semantics is the same as PGCommand's. 
1788 ulong executeNonQuery(string query) 1789 { 1790 scope cmd = new PGCommand(this, query); 1791 return cmd.executeNonQuery(); 1792 } 1793 1794 /// ditto 1795 PGResultSet!Specs executeQuery(Specs...)(string query) 1796 { 1797 scope cmd = new PGCommand(this, query); 1798 return cmd.executeQuery!Specs(); 1799 } 1800 1801 /// ditto 1802 DBRow!Specs executeRow(Specs...)(string query, bool throwIfMoreRows = true) 1803 { 1804 scope cmd = new PGCommand(this, query); 1805 return cmd.executeRow!Specs(throwIfMoreRows); 1806 } 1807 1808 /// ditto 1809 T executeScalar(T)(string query, bool throwIfMoreRows = true) 1810 { 1811 scope cmd = new PGCommand(this, query); 1812 return cmd.executeScalar!T(throwIfMoreRows); 1813 } 1814 1815 void reloadArrayTypes() 1816 { 1817 auto cmd = new PGCommand(this, "SELECT oid, typelem FROM pg_type WHERE typcategory = 'A'"); 1818 auto result = cmd.executeQuery!(uint, "arrayOid", uint, "elemOid"); 1819 scope(exit) result.close; 1820 1821 arrayTypes = null; 1822 1823 foreach (row; result) 1824 { 1825 arrayTypes[row.arrayOid] = row.elemOid; 1826 } 1827 1828 arrayTypes.rehash; 1829 } 1830 1831 void reloadCompositeTypes() 1832 { 1833 auto cmd = new PGCommand(this, "SELECT a.attrelid, a.atttypid FROM pg_attribute a JOIN pg_type t ON 1834 a.attrelid = t.typrelid WHERE a.attnum > 0 ORDER BY a.attrelid, a.attnum"); 1835 auto result = cmd.executeQuery!(uint, "typeOid", uint, "memberOid"); 1836 scope(exit) result.close; 1837 1838 compositeTypes = null; 1839 1840 uint lastOid = 0; 1841 uint[]* memberOids; 1842 1843 foreach (row; result) 1844 { 1845 if (row.typeOid != lastOid) 1846 { 1847 compositeTypes[lastOid = row.typeOid] = new uint[0]; 1848 memberOids = &compositeTypes[lastOid]; 1849 } 1850 1851 *memberOids ~= row.memberOid; 1852 } 1853 1854 compositeTypes.rehash; 1855 } 1856 1857 void reloadEnumTypes() 1858 { 1859 auto cmd = new PGCommand(this, "SELECT enumtypid, oid, enumlabel FROM pg_enum ORDER BY enumtypid, oid"); 1860 auto result = 
cmd.executeQuery!(uint, "typeOid", uint, "valueOid", string, "valueLabel"); 1861 scope(exit) result.close; 1862 1863 enumTypes = null; 1864 1865 uint lastOid = 0; 1866 string[uint]* enumValues; 1867 1868 foreach (row; result) 1869 { 1870 if (row.typeOid != lastOid) 1871 { 1872 if (lastOid > 0) 1873 (*enumValues).rehash; 1874 1875 enumTypes[lastOid = row.typeOid] = null; 1876 enumValues = &enumTypes[lastOid]; 1877 } 1878 1879 (*enumValues)[row.valueOid] = row.valueLabel; 1880 } 1881 1882 if (lastOid > 0) 1883 (*enumValues).rehash; 1884 1885 enumTypes.rehash; 1886 } 1887 1888 void reloadAllTypes() 1889 { 1890 // todo: make simpler type lists, since we need only oids of types (without their members) 1891 reloadArrayTypes(); 1892 reloadCompositeTypes(); 1893 reloadEnumTypes(); 1894 } 1895 } 1896 1897 /// Class representing single query parameter 1898 class PGParameter 1899 { 1900 private PGParameters params; 1901 immutable short index; 1902 immutable PGType type; 1903 private Variant _value; 1904 1905 /// Value bound to this parameter 1906 @property Variant value() 1907 { 1908 return _value; 1909 } 1910 /// ditto 1911 @property Variant value(T)(T v) 1912 { 1913 params.changed = true; 1914 return _value = Variant(v); 1915 } 1916 1917 private this(PGParameters params, short index, PGType type) 1918 { 1919 enforce(index > 0, new ParamException("Parameter's index must be > 0")); 1920 this.params = params; 1921 this.index = index; 1922 this.type = type; 1923 } 1924 } 1925 1926 /// Collection of query parameters 1927 class PGParameters 1928 { 1929 private PGParameter[short] params; 1930 private PGCommand cmd; 1931 private bool changed; 1932 1933 private int[] getOids() 1934 { 1935 short[] keys = params.keys; 1936 sort(keys); 1937 1938 int[] oids = new int[params.length]; 1939 1940 foreach (size_t i, key; keys) 1941 { 1942 oids[i] = params[key].type; 1943 } 1944 1945 return oids; 1946 } 1947 1948 /// 1949 @property short length() 1950 { 1951 return cast(short)params.length; 
1952 } 1953 1954 private this(PGCommand cmd) 1955 { 1956 this.cmd = cmd; 1957 } 1958 1959 /** 1960 Creates and returns new parameter. 1961 Examples: 1962 --- 1963 // without spaces between $ and number 1964 auto cmd = new PGCommand(conn, "INSERT INTO users (name, surname) VALUES ($ 1, $ 2)"); 1965 cmd.parameters.add(1, PGType.TEXT).value = "John"; 1966 cmd.parameters.add(2, PGType.TEXT).value = "Doe"; 1967 1968 assert(cmd.executeNonQuery == 1); 1969 --- 1970 */ 1971 PGParameter add(short index, PGType type) 1972 { 1973 enforce(!cmd.prepared, "Can't add parameter to prepared statement."); 1974 changed = true; 1975 return params[index] = new PGParameter(this, index, type); 1976 } 1977 1978 // todo: remove() 1979 1980 PGParameter opIndex(short index) 1981 { 1982 return params[index]; 1983 } 1984 1985 int opApply(int delegate(ref PGParameter param) dg) 1986 { 1987 int result = 0; 1988 1989 foreach (number; sort(params.keys)) 1990 { 1991 result = dg(params[number]); 1992 1993 if (result) 1994 break; 1995 } 1996 1997 return result; 1998 } 1999 } 2000 2001 /// Array of fields returned by the server 2002 alias immutable(PGField)[] PGFields; 2003 2004 /// Contains information about fields returned by the server 2005 struct PGField 2006 { 2007 /// The field name. 2008 string name; 2009 /// If the field can be identified as a column of a specific table, the object ID of the table; otherwise zero. 2010 uint tableOid; 2011 /// If the field can be identified as a column of a specific table, the attribute number of the column; otherwise zero. 2012 short index; 2013 /// The object ID of the field's data type. 2014 uint oid; 2015 /// The data type size (see pg_type.typlen). Note that negative values denote variable-width types. 2016 short typlen; 2017 /// The type modifier (see pg_attribute.atttypmod). The meaning of the modifier is type-specific. 2018 int modifier; 2019 } 2020 2021 /// Class encapsulating prepared or non-prepared statements (commands). 
class PGCommand
{
    private PGConnection conn;
    private string _query;
    private PGParameters params;
    private PGFields _fields = null;
    private string preparedName;
    private uint _lastInsertOid;
    private bool prepared;
    // Set once the field list has been received from the server. Needed because
    // an empty field list (non-query command) is indistinguishable from null.
    private bool fieldsKnown;

    /// List of parameters bound to this command
    @property PGParameters parameters()
    {
        return params;
    }

    /// List of fields that will be returned from the server. Available after successful call to bind().
    @property PGFields fields()
    {
        return _fields;
    }

    /**
    Checks if this is query or non query command. Available after successful call to bind().
    Returns: true if server returns at least one field (column). Otherwise false.
    Throws: Exception if the command has not been bound yet.
    */
    @property bool isQuery()
    {
        enforce(fieldsKnown, new Exception("bind() must be called first."));
        return _fields.length > 0;
    }

    /// Returns: true if command is currently prepared, otherwise false.
    @property bool isPrepared()
    {
        return prepared;
    }

    /// Query assigned to this command.
    @property string query()
    {
        return _query;
    }
    /// ditto
    @property string query(string query)
    {
        enforce(!prepared, "Can't change query for prepared statement.");
        return _query = query;
    }

    /// If table is with OIDs, it contains last inserted OID.
    @property uint lastInsertOid()
    {
        return _lastInsertOid;
    }

    /// Creates a non-prepared command for the given connection and query text.
    this(PGConnection conn, string query = "")
    {
        this.conn = conn;
        _query = query;
        params = new PGParameters(this);
        _fields = new immutable(PGField)[0];
        preparedName = "";
        prepared = false;
        fieldsKnown = false;
    }

    /// Prepare this statement, i.e. cache query plan.
    void prepare()
    {
        enforce(!prepared, "This command is already prepared.");
        preparedName = conn.reservePrepared();
        conn.prepare(preparedName, _query, params);
        prepared = true;
        // force a bind against the new named statement
        params.changed = true;
    }

    /// Unprepare this statement. Goes back to normal query planning.
    void unprepare()
    {
        enforce(prepared, "This command is not prepared.");
        conn.unprepare(preparedName);
        preparedName = "";
        prepared = false;
        params.changed = true;
    }

    /**
    Binds values to parameters and updates list of returned fields.

    This is normally done automatically, but it may be useful to check what fields
    would be returned from a query, before executing it.
    */
    void bind()
    {
        checkPrepared(false);
        _fields = conn.bind(preparedName, preparedName, params);
        fieldsKnown = true;
        params.changed = false;
    }

    /// For a non-prepared command: parses the query as the unnamed statement and optionally binds it.
    private void checkPrepared(bool bind)
    {
        if (!prepared)
        {
            // use unnamed statement & portal
            conn.prepare("", _query, params);
            if (bind)
            {
                _fields = conn.bind("", "", params);
                fieldsKnown = true;
                params.changed = false;
            }
        }
    }

    /// Re-binds when parameters changed since the last bind.
    private void checkBound()
    {
        if (params.changed)
            bind();
    }

    /**
    Executes a non query command, i.e. query which doesn't return any rows. Commonly used with
    data manipulation commands, such as INSERT, UPDATE and DELETE.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "DELETE FROM table");
    auto deletedRows = cmd.executeNonQuery;
    cmd.query = "UPDATE table SET quantity = 1 WHERE price > 100";
    auto updatedRows = cmd.executeNonQuery;
    cmd.query = "INSERT INTO table VALUES(1, 50)";
    assert(cmd.executeNonQuery == 1);
    ---
    Returns: Number of affected rows.
    */
    ulong executeNonQuery()
    {
        checkPrepared(true);
        checkBound();
        return conn.executeNonQuery(preparedName, _lastInsertOid);
    }

    /**
    Executes query which returns row sets, such as SELECT command.
    Returns: InputRange of DBRow!Specs. It must be closed before another
             query is issued on the same connection.
    */
    PGResultSet!Specs executeQuery(Specs...)()
    {
        checkPrepared(true);
        checkBound();
        return conn.executeQuery!Specs(preparedName, _fields);
    }

    /**
    Executes query and returns only first row of the result.
    Params:
    throwIfMoreRows = If true, throws Exception when result contains more than one row.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "SELECT 1, 'abc'");
    auto row1 = cmd.executeRow!(int, string); // returns DBRow!(int, string)
    assert(is(typeof(row1[0]) == int) && is(typeof(row1[1]) == string));
    auto row2 = cmd.executeRow; // returns DBRow!(Variant[])
    ---
    Throws: Exception if result doesn't contain any rows or field count do not match.
    Throws: Exception if result contains more than one row when throwIfMoreRows is true.
    */
    DBRow!Specs executeRow(Specs...)(bool throwIfMoreRows = true)
    {
        auto result = executeQuery!Specs();
        scope(exit) result.close();
        enforce(!result.empty(), "Result doesn't contain any rows.");
        auto row = result.front();
        if (throwIfMoreRows)
        {
            result.popFront();
            enforce(result.empty(), "Result contains more than one row.");
        }
        return row;
    }

    /**
    Executes query returning exactly one row and field. By default, returns Variant type.
    Params:
    throwIfMoreRows = If true, throws Exception when result contains more than one row.
    Examples:
    ---
    auto cmd = new PGCommand(conn, "SELECT 1");
    auto i = cmd.executeScalar!int; // returns int
    assert(is(typeof(i) == int));
    auto v = cmd.executeScalar; // returns Variant
    ---
    Throws: Exception if result doesn't contain any rows or if it contains more than one field.
    Throws: Exception if result contains more than one row when throwIfMoreRows is true.
    */
    T executeScalar(T = Variant)(bool throwIfMoreRows = true)
    {
        auto result = executeQuery!T();
        scope(exit) result.close();
        enforce(!result.empty(), "Result doesn't contain any rows.");
        T row = result.front();
        if (throwIfMoreRows)
        {
            result.popFront();
            enforce(result.empty(), "Result contains more than one row.");
        }
        return row;
    }
}

/// Input range of DBRow!Specs
class PGResultSet(Specs...)
{
    alias DBRow!Specs Row;
    alias Row delegate(ref Message msg, ref PGFields fields) FetchRowDelegate;

    private FetchRowDelegate fetchRow;
    private PGConnection conn;
    private PGFields fields;
    private Row row;
    private bool validRow;
    private Message nextMsg;
    private size_t[][string] columnMap;
    // guards close() against running twice
    private bool closed;

    private this(PGConnection conn, ref PGFields fields, FetchRowDelegate dg)
    {
        this.conn = conn;
        this.fields = fields;
        this.fetchRow = dg;
        validRow = false;

        // several columns may share one name, so every name maps to a list of positions
        foreach (i, field; fields)
        {
            size_t[]* indices = field.name in columnMap;

            if (indices)
                *indices ~= i;
            else
                columnMap[field.name] = [i];
        }
    }

    /// Maps a column name (and its occurrence number among equally named columns) to a field position.
    private size_t columnToIndex(string column, size_t index)
    {
        size_t[]* indices = column in columnMap;
        enforce(indices, "Unknown column name");
        return (*indices)[index];
    }

    /// Returns: true when there is no current row.
    pure nothrow bool empty()
    {
        return !validRow;
    }

    /// Advances to the next row; the set becomes empty when the pending message is not a DataRow.
    void popFront()
    {
        if (nextMsg.type == 'D')
        {
            row = fetchRow(nextMsg, fields);
            static if (!Row.hasStaticLength)
                row.columnToIndex = &columnToIndex;
            validRow = true;
            // read ahead, so that the end of data is known before the next popFront
            nextMsg = conn.getMessage();
        }
        else
            validRow = false;
    }

    /// Returns: the current row.
    pure nothrow Row front()
    {
        return row;
    }

    /**
    Closes current result set. It must be closed before issuing another query on the same connection.
    Calling it more than once is harmless.
    */
    void close()
    {
        if (closed)
            return;
        closed = true;

        // skip the remaining rows unless ReadyForQuery has already been read
        if (nextMsg.type != 'Z')
            conn.finalizeQuery();
        conn.activeResultSet = false;
    }

    /// Iterates over the remaining rows.
    int opApply(int delegate(ref Row row) dg)
    {
        int result = 0;

        while (!empty)
        {
            result = dg(row);

            // honour break before fetching another row
            if (result)
                break;

            popFront;
        }

        return result;
    }

    /// Iterates over the remaining rows together with a running index starting at 0.
    int opApply(int delegate(ref size_t i, ref Row row) dg)
    {
        int result = 0;
        size_t i;

        while (!empty)
        {
            result = dg(i, row);

            // honour break before fetching another row
            if (result)
                break;

            popFront;
            i++;
        }

        return result;
    }
}


version(Have_vibe_d_core)
{
    import vibe.core.connectionpool;

    // Wraps the vibe.d TCPConnection class in order to reopen the TCP connection
    // if PostgreSQL closed it for some reason.
    // see https://forum.rejectedsoftware.com/groups/rejectedsoftware.vibed/thread/44097/
    private class TCPConnectionWrapper
    {
        this(string host, ushort port, string bindInterface = null, ushort bindPort = cast(ushort)0u)
        {
            this.host = host;
            this.port = port;
            this.bindInterface = bindInterface;
            this.bindPort = bindPort;

            connect();
        }

        void close(){ tcpConnection.close(); }

        void write(const(ubyte[]) bytes)
        {
            // Vibe: "... If connected is false, writing to the connection will trigger an exception ..."
            if (!tcpConnection.connected)
            {
                // Vibe: " ... Note that close must always be called, even if the remote has already closed the
                // connection. Failure to do so will result in resource and memory leakage.
                tcpConnection.close();
                connect();
            }
            tcpConnection.write(bytes);
        }

        void read(ubyte[] dst)
        {
            if (!tcpConnection.connected)
            {
                tcpConnection.close();
                connect();
            }
            // Returning without filling dst would hand zeroed bytes to the
            // protocol parser, which would then loop forever on them.
            enforce(!tcpConnection.empty, "Connection closed by the server while reading.");
            tcpConnection.read(dst);
        }

        private
        {
            void connect()
            {
                tcpConnection = connectTCP(host, port, bindInterface, bindPort);
            }

            string host;
            string bindInterface;
            ushort port;
            ushort bindPort;

            TCPConnection tcpConnection;
        }
    }

    /// Pool of PGConnection objects, all created with the same connection parameters.
    class PostgresDB {
        private {
            string[string] m_params;
            ConnectionPool!PGConnection m_pool;
        }

        /// conn_params are the parameters accepted by PGConnection's constructor.
        this(string[string] conn_params)
        {
            m_params = conn_params.dup;
            m_pool = new ConnectionPool!PGConnection(&createConnection);
        }

        /// Returns the result of the pool's lockConnection().
        auto lockConnection() { return m_pool.lockConnection(); }

        private PGConnection createConnection()
        {
            return new PGConnection(m_params);
        }
    }
}
else
{
    // Declared as a template so that the error only fires when PostgresDB is actually used.
    class PostgresDB() {
        static assert(false,
                      "The 'PostgresDB' connection pool requires Vibe.d and therefore "~
                      "must be used with -version=Have_vibe_d_core"
                      );
    }
}