logo       
Google Custom Search
    AddThis Social Bookmark Button

Re: quote_ident and schemas (was Re: [HACKERS] connectby: msg#00157

Subject: Re: quote_ident and schemas (was Re: [HACKERS] connectby
Joe Conway wrote:
p.s. There are similar issues in dblink, but they appear a bit more difficult to address. I'll attempt to get them resolved this weekend, again in hopes of getting them applied before 7.3 is released.


The attached patch removes most calls to quote_ident in dblink (hopefully just the appropriate ones), requiring users to quote their own identifiers. I also added a case to the regression test for a quoted, schema-qualified table name.

If it's not too late, I'd like this to get into 7.3, but in any case,
please apply to HEAD.

Thanks,

Joe
Index: contrib/dblink/README.dblink
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/README.dblink,v
retrieving revision 1.6
diff -c -r1.6 README.dblink
*** contrib/dblink/README.dblink        2 Sep 2002 06:13:31 -0000       1.6
--- contrib/dblink/README.dblink        23 Nov 2002 17:21:40 -0000
***************
*** 151,156 ****
--- 151,160 ----
  
  Documentation:
  
+   Note: Parameters representing relation names must include double
+      quotes if the names are mixed-case or contain special characters. They
+      must also be appropriately qualified with schema name if applicable.
+ 
    See the following files:
       doc/connection
       doc/cursor
Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/dblink.c,v
retrieving revision 1.18
diff -c -r1.18 dblink.c
*** contrib/dblink/dblink.c     13 Nov 2002 00:39:46 -0000      1.18
--- contrib/dblink/dblink.c     23 Nov 2002 16:45:36 -0000
***************
*** 71,76 ****
--- 71,77 ----
  static void append_res_ptr(dblink_results * results);
  static void remove_res_ptr(dblink_results * results);
  static TupleDesc pgresultGetTupleDesc(PGresult *res);
+ static char *generate_relation_name(Oid relid);
  
  /* Global */
  List     *res_id = NIL;
***************
*** 171,177 ****
        }
        PQclear(res);
  
!       appendStringInfo(str, "DECLARE %s CURSOR FOR %s", 
quote_ident_cstr(curname), sql);
        res = PQexec(conn, str->data);
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
--- 172,178 ----
        }
        PQclear(res);
  
!       appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
        res = PQexec(conn, str->data);
        if (!res ||
                (PQresultStatus(res) != PGRES_COMMAND_OK &&
***************
*** 210,216 ****
        else
                elog(ERROR, "dblink_close: no connection available");
  
!       appendStringInfo(str, "CLOSE %s", quote_ident_cstr(curname));
  
        /* close the cursor */
        res = PQexec(conn, str->data);
--- 211,217 ----
        else
                elog(ERROR, "dblink_close: no connection available");
  
!       appendStringInfo(str, "CLOSE %s", curname);
  
        /* close the cursor */
        res = PQexec(conn, str->data);
***************
*** 287,293 ****
                else
                        elog(ERROR, "dblink_fetch: no connection available");
  
!               appendStringInfo(str, "FETCH %d FROM %s", howmany, 
quote_ident_cstr(curname));
  
                res = PQexec(conn, str->data);
                if (!res ||
--- 288,294 ----
                else
                        elog(ERROR, "dblink_fetch: no connection available");
  
!               appendStringInfo(str, "FETCH %d FROM %s", howmany, curname);
  
                res = PQexec(conn, str->data);
                if (!res ||
***************
*** 306,312 ****
                {
                        /* cursor does not exist - closed already or bad name */
                        PQclear(res);
!                       elog(ERROR, "dblink_fetch: cursor %s does not exist", 
quote_ident_cstr(curname));
                }
  
                funcctx->max_calls = PQntuples(res);
--- 307,313 ----
                {
                        /* cursor does not exist - closed already or bad name */
                        PQclear(res);
!                       elog(ERROR, "dblink_fetch: cursor %s does not exist", 
curname);
                }
  
                funcctx->max_calls = PQntuples(res);
***************
*** 1527,1537 ****
        int                     i;
        bool            needComma;
  
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
-       relname = RelationGetRelationName(rel);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
  
--- 1528,1540 ----
        int                     i;
        bool            needComma;
  
+       /* get relation name including any needed schema prefix and quoting */
+       relname = generate_relation_name(relid);
+ 
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
  
***************
*** 1539,1545 ****
        if (!tuple)
                elog(ERROR, "dblink_build_sql_insert: row not found");
  
!       appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
  
        needComma = false;
        for (i = 0; i < natts; i++)
--- 1542,1548 ----
        if (!tuple)
                elog(ERROR, "dblink_build_sql_insert: row not found");
  
!       appendStringInfo(str, "INSERT INTO %s(", relname);
  
        needComma = false;
        for (i = 0; i < natts; i++)
***************
*** 1610,1624 ****
        char       *val;
        int                     i;
  
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
-       relname = RelationGetRelationName(rel);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
  
!       appendStringInfo(str, "DELETE FROM %s WHERE ", 
quote_ident_cstr(relname));
        for (i = 0; i < pknumatts; i++)
        {
                int16           pkattnum = pkattnums[i];
--- 1613,1629 ----
        char       *val;
        int                     i;
  
+       /* get relation name including any needed schema prefix and quoting */
+       relname = generate_relation_name(relid);
+ 
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
  
!       appendStringInfo(str, "DELETE FROM %s WHERE ", relname);
        for (i = 0; i < pknumatts; i++)
        {
                int16           pkattnum = pkattnums[i];
***************
*** 1669,1679 ****
        int                     i;
        bool            needComma;
  
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
-       relname = RelationGetRelationName(rel);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
  
--- 1674,1686 ----
        int                     i;
        bool            needComma;
  
+       /* get relation name including any needed schema prefix and quoting */
+       relname = generate_relation_name(relid);
+ 
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
        tupdesc = rel->rd_att;
        natts = tupdesc->natts;
  
***************
*** 1681,1687 ****
        if (!tuple)
                elog(ERROR, "dblink_build_sql_update: row not found");
  
!       appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
  
        needComma = false;
        for (i = 0; i < natts; i++)
--- 1688,1694 ----
        if (!tuple)
                elog(ERROR, "dblink_build_sql_update: row not found");
  
!       appendStringInfo(str, "UPDATE %s SET ", relname);
  
        needComma = false;
        for (i = 0; i < natts; i++)
***************
*** 1813,1823 ****
        int                     i;
        char       *val = NULL;
  
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
-       relname = RelationGetRelationName(rel);
        tupdesc = CreateTupleDescCopy(rel->rd_att);
        relation_close(rel, AccessShareLock);
  
--- 1820,1832 ----
        int                     i;
        char       *val = NULL;
  
+       /* get relation name including any needed schema prefix and quoting */
+       relname = generate_relation_name(relid);
+ 
        /*
         * Open relation using relid
         */
        rel = relation_open(relid, AccessShareLock);
        tupdesc = CreateTupleDescCopy(rel->rd_att);
        relation_close(rel, AccessShareLock);
  
***************
*** 1831,1837 ****
         * Build sql statement to look up tuple of interest Use src_pkattvals
         * as the criteria.
         */
!       appendStringInfo(str, "SELECT * FROM %s WHERE ", 
quote_ident_cstr(relname));
  
        for (i = 0; i < pknumatts; i++)
        {
--- 1840,1846 ----
         * Build sql statement to look up tuple of interest Use src_pkattvals
         * as the criteria.
         */
!       appendStringInfo(str, "SELECT * FROM %s WHERE ", relname);
  
        for (i = 0; i < pknumatts; i++)
        {
***************
*** 2002,2005 ****
--- 2011,2048 ----
        }
  
        return desc;
+ }
+ 
+ /*
+  * generate_relation_name - copied from ruleutils.c
+  *            Compute the name to display for a relation specified by OID
+  *
+  * The result includes all necessary quoting and schema-prefixing.
+  */
+ static char *
+ generate_relation_name(Oid relid)
+ {
+       HeapTuple       tp;
+       Form_pg_class reltup;
+       char       *nspname;
+       char       *result;
+ 
+       tp = SearchSysCache(RELOID,
+                                               ObjectIdGetDatum(relid),
+                                               0, 0, 0);
+       if (!HeapTupleIsValid(tp))
+               elog(ERROR, "cache lookup of relation %u failed", relid);
+       reltup = (Form_pg_class) GETSTRUCT(tp);
+ 
+       /* Qualify the name if not visible in search path */
+       if (RelationIsVisible(relid))
+               nspname = NULL;
+       else
+               nspname = get_namespace_name(reltup->relnamespace);
+ 
+       result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
+ 
+       ReleaseSysCache(tp);
+ 
+       return result;
  }
Index: contrib/dblink/expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/expected/dblink.out,v
retrieving revision 1.6
diff -c -r1.6 dblink.out
*** contrib/dblink/expected/dblink.out  3 Nov 2002 04:52:09 -0000       1.6
--- contrib/dblink/expected/dblink.out  23 Nov 2002 17:05:41 -0000
***************
*** 59,64 ****
--- 59,101 ----
   DELETE FROM foo WHERE f1 = '0' AND f2 = 'a'
  (1 row)
  
+ -- retest using a quoted and schema qualified table
+ CREATE SCHEMA "MySchema";
+ CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key 
(f1,f2));
+ NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index 'Foo_pkey' for 
table 'Foo'
+ INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+ -- list the primary key fields
+ SELECT *
+ FROM dblink_get_pkey('"MySchema"."Foo"');
+  position | colname 
+ ----------+---------
+         1 | f1
+         2 | f2
+ (2 rows)
+ 
+ -- build an insert statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", 
"a"}','{"99", "xyz"}');
+                         dblink_build_sql_insert                         
+ ------------------------------------------------------------------------
+  INSERT INTO "MySchema"."Foo"(f1,f2,f3) VALUES('99','xyz','{a0,b0,c0}')
+ (1 row)
+ 
+ -- build an update statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", 
"a"}','{"99", "xyz"}');
+                                        dblink_build_sql_update                
                       
+ 
-----------------------------------------------------------------------------------------------------
+  UPDATE "MySchema"."Foo" SET f1 = '99', f2 = 'xyz', f3 = '{a0,b0,c0}' WHERE 
f1 = '99' AND f2 = 'xyz'
+ (1 row)
+ 
+ -- build a delete statement based on a local tuple,
+ SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+                  dblink_build_sql_delete                  
+ ----------------------------------------------------------
+  DELETE FROM "MySchema"."Foo" WHERE f1 = '0' AND f2 = 'a'
+ (1 row)
+ 
  -- regular old dblink
  SELECT *
  FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c 
text[])
Index: contrib/dblink/sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.6
diff -c -r1.6 dblink.sql
*** contrib/dblink/sql/dblink.sql       3 Nov 2002 04:52:09 -0000       1.6
--- contrib/dblink/sql/dblink.sql       23 Nov 2002 17:05:37 -0000
***************
*** 44,49 ****
--- 44,69 ----
  -- build a delete statement based on a local tuple,
  SELECT dblink_build_sql_delete('foo','1 2',2,'{"0", "a"}');
  
+ -- retest using a quoted and schema qualified table
+ CREATE SCHEMA "MySchema";
+ CREATE TABLE "MySchema"."Foo"(f1 int, f2 text, f3 text[], primary key 
(f1,f2));
+ INSERT INTO "MySchema"."Foo" VALUES (0,'a','{"a0","b0","c0"}');
+ 
+ -- list the primary key fields
+ SELECT *
+ FROM dblink_get_pkey('"MySchema"."Foo"');
+ 
+ -- build an insert statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_insert('"MySchema"."Foo"','1 2',2,'{"0", 
"a"}','{"99", "xyz"}');
+ 
+ -- build an update statement based on a local tuple,
+ -- replacing the primary key values with new ones
+ SELECT dblink_build_sql_update('"MySchema"."Foo"','1 2',2,'{"0", 
"a"}','{"99", "xyz"}');
+ 
+ -- build a delete statement based on a local tuple,
+ SELECT dblink_build_sql_delete('"MySchema"."Foo"','1 2',2,'{"0", "a"}');
+ 
  -- regular old dblink
  SELECT *
  FROM dblink('dbname=regression','SELECT * FROM foo') AS t(a int, b text, c 
text[])
---------------------------(end of broadcast)---------------------------
TIP 2: you can get off all lists at once with the unregister command
    (send "unregister YourEmailAddressHere" to majordomo@xxxxxxxxxxxxxx)

Try Searching:
servers, voip, java, networking, microsoft ...
<Prev in Thread] Current Thread [Next in Thread>