|
Re: code to perform COPY: msg#00259 db.postgresql.jdbc
Michael, any chance you can submit this as a context diff (cvs diff -c)? Dave On Fri, 2002-06-28 at 16:48, Michael Adler wrote: > > Here's the goods. This has worked pretty well for me. I think it's ready > for the next stage of scrutiny. > > We may want to take a slightly different approach and move some of this > logic to PG_Stream. There's a lot of code overhead in sending down the > transaction-control queries (BEGIN and COMMIT) and in reading and analyzing > the responses. I may not be taking advantage of an existing method, or we may want to > add functionality elsewhere to support COPY in a clean fashion. > > The debug messages are very verbose, so we'll probably want to comment out > some of that. For large copies, it prints out a crude KB-per-second rate. I get > 220KB/sec over my LAN. It appears to be CPU-bound on my 1.6 GHz P4, so > perhaps that can be improved to the point of being network-bound. > > As I mentioned in my previous email, since PG_Stream.input_stream is a > java.io.InputStream, the multi-byte read/receive method offers no > improvements over the single-byte method. I could very well be wrong, > though. I still haven't tested it over a high-latency connection. > > > // ***************** > // Postgres COPY handling from org.postgresql.Connection > // ***************** > > /** > * This will take the name of a table and an OutputStream, construct a > COPY OUT query, send the query > * ( while bypassing QueryExecutor), direct the output of the backend > into the OutputStream and > * then check the backend to see if it's ready for another query.
> * @param table the table from which it will copy > * @param out OutputStream to which the copy will be sent > * @return void > * @exception Exception if a database access error occurs > * > **/ > > public void copyOut(String table, OutputStream out) throws Exception > { > synchronized(pg_stream) { > > String query = "COPY " + table + " TO STDOUT"; > if (Driver.logDebug) Driver.debug("Sending query '" + query + "' > to backend."); > > // duplicates statements in QueryExecutor.sendQuery > pg_stream.SendChar('Q'); > pg_stream.Send(this.getEncoding().encode( query )); > pg_stream.SendChar(0); > pg_stream.flush(); > > // check response from backend > int response = pg_stream.ReceiveChar(); > > if (response == 'H') { > if (Driver.logDebug) Driver.debug("Received 'H' from backend. > The COPY out is beginning."); > } else if ( response == 'E') { > String error_string = > pg_stream.ReceiveString(this.getEncoding()); > throw new SQLException( error_string ); > } else { > throw new SQLException("Copy Out should receive H from > backend, but instead received: " + (char)response ); > } > > long start_time = System.currentTimeMillis(); > > int a, b, c; > > // read input stream one char at a time, but always hold three > a = pg_stream.ReceiveChar(); > b = pg_stream.ReceiveChar(); > c = pg_stream.ReceiveChar(); > > int counter = 3; > > while (true) { > if ( Driver.logDebug && counter % 100000 == 0) { > int rate = counter / (int)(System.currentTimeMillis() - > start_time); > System.out.println( (counter / 1000) + " KB total at " + > rate + " KB/sec"); > } > > if ( a == '\\' && b == '.' && c == '\n' ) { > // this sequence of bytes means the copy is over > if (Driver.logDebug) Driver.debug("Received '\\.' from > backend. 
The COPY out stream is finished."); > break; > } > > out.write(a); > > a = b; > b = c; > > try { > // the following looks unoptimized, but is really the > same as java.io.OutputStream.read(byte[] b, int off, int len) > c = pg_stream.ReceiveChar(); > } catch (Exception e) { > // maybe the connection is screwed, or maybe we can > salvage it. I don't know. > throw e; > } > > counter++; > } > > // the backend should send the string "COPY" > String str = pg_stream.ReceiveString(this.getEncoding()); > > // check to make sure the backend is ready for the next query > response = pg_stream.ReceiveChar(); > > if (response == 'Z') { > if (Driver.logDebug) Driver.debug("Received 'Z' from backend. > It's ready for the next query. COPY out has completed"); > } else if ( response == 'E') { > String error_string = > pg_stream.ReceiveString(this.getEncoding()); > throw new SQLException( error_string ); > } else { > throw new SQLException("Copy should receive Z from backend, > but instead received: " + (char)response ); > } > } > > return; > } > > > /* > * This will take the name of a table and a ByteArrayInputStream, > construct a COPY IN query, > * send the query ( while bypassing QueryExecutor), send the bytes of > data and send the > * 3-byte sequence that signifies the end of the copy. We enclose this > in a transaction to ensure > * that the entire stream is copied in, or nothing at all. 
> * @param table the table to which it will copy > * @param in InputStream from which the copy will be read > * @return void > * @exception Exception if a database access error occurs > */ > > public void copyIn (String table, InputStream in) throws Exception > { > int response; > String str; > > synchronized(pg_stream) { > > // BEGIN the COPY in transaction > > pg_stream.SendChar('Q'); > pg_stream.Send(this.getEncoding().encode( "BEGIN" )); > pg_stream.SendChar(0); > pg_stream.flush(); > > // the backend shoul send the string "CBEGIN" - 'C' for > "Completed" and 'BEGIN' for the type of completed query > str = pg_stream.ReceiveString(this.getEncoding()); > if ( ! str.equals("CBEGIN") ) throw new SQLException("BEGIN > seemed to fail before COPY in. Received: '" + str + "'"); > response = pg_stream.ReceiveChar(); > if ( response != 'Z' ) throw new SQLException("Backend is not > ready for next query. Instead of 'Z', received: '" + (char)response + "'"); > > > // now "inside" a transaction and ready for COPYing in > > String query = "COPY " + table + " FROM STDIN"; > //if (Driver.logDebug) Driver.debug("Sending query '" + query + > "' to backend."); > > // duplicates statements in QueryExecutor.sendQuery > pg_stream.SendChar('Q'); > pg_stream.Send(this.getEncoding().encode( query )); > pg_stream.SendChar(0); > pg_stream.flush(); > > // check response from backend > response = pg_stream.ReceiveChar(); > > if (response == 'G') { > if (Driver.logDebug) Driver.debug("Received 'G' from backend. 
> The COPY in should now begin."); > } else if ( response == 'E') { > String error_string = > pg_stream.ReceiveString(this.getEncoding()); > throw new SQLException( error_string ); > } else { > throw new SQLException("Copy should receive G from backend, > but instead received: " + (char)response ); > } > > > // send the whole input stream > > int b; // a byte placeholder to read from in and send to backend > > while (true) { > > try { > b = in.read(); > } catch (IOException e) { > // maybe the connection is screwed, or maybe we can > salvage it. I don't know. > throw new SQLException("While reading from InputStream, > it threw exception: '" + e + "'"); > } > > // we may want to check for the termination string '\\.\n' > because we send it later. It wouldn't be good to send it twice > > if (b == -1) { > break; // the InputStream is finished > } else { > try { > pg_stream.SendChar((char)b); > } catch (IOException e) { > throw new SQLException("While sending a char to the > backend, it threw exception: '" + e + "'" ); > } > } > } > > // send the special row > if (Driver.logDebug) Driver.debug("Sending the byte seqence > '\\.'. The frontend has finished the COPY in stream."); > pg_stream.Send( new byte[] { (byte)'\\', (byte)'.', (byte)'\n' } > ); > pg_stream.flush(); > > // the backend send the string "CCOPY" - 'C' for "Completed" and > 'COPY' for the type of completed query > str = pg_stream.ReceiveString(this.getEncoding()); > //if (Driver.logDebug) Driver.debug( "Should be CCOPY: " + str); > > // check to make sure the backend is ready for the next query > response = pg_stream.ReceiveChar(); > > if (response == 'Z') { > if (Driver.logDebug) Driver.debug("Received 'Z' from backend. > It's ready for the next query. 
COPY in has completed"); > } else if ( response == 'E') { > String error_string = > pg_stream.ReceiveString(this.getEncoding()); > throw new SQLException( error_string ); > } else { > throw new SQLException("Copy should receive Z from backend, > but instead received: " + (char)response ); > } > > > // since we reached this point with an error, COMMIT the COPY in > transaction > > pg_stream.SendChar('Q'); > pg_stream.Send(this.getEncoding().encode( "COMMIT" )); > pg_stream.SendChar(0); > pg_stream.flush(); > > // the backend shoul send the string "CCOMMIT" - 'C' for > "Completed" and 'COMMIT' for the type of completed query > str = pg_stream.ReceiveString(this.getEncoding()); > if ( ! str.equals("CCOMMIT") ) throw new SQLException("COMMIT > seemed to fail after COPY in. Received: '" + str + "'"); > response = pg_stream.ReceiveChar(); > if ( response != 'Z' ) throw new SQLException("Backend is not > ready for next query. Instead of 'Z', received: '" + (char)response + "'"); > } > } > > > > > > > > > > > > // ***************** > // Postgres COPY testing from org.postgresql.test.JDBCTests > // ***************** > > > > /* > * Tests the copyOut functionality by copying data from pg_class into > a test table, > * then it copies that data out and check the number of 'rows' that > came out. 
> */ > public void testCopyOut() > { > try > { > org.postgresql.Connection conn = > (org.postgresql.Connection)JDBC2Tests.openDB(); > > java.sql.Statement st = conn.createStatement(); > st.executeUpdate( "INSERT into copy_out_test SELECT > relowner, relpages FROM pg_class LIMIT 20" ); > > ByteArrayOutputStream out = new ByteArrayOutputStream(); > java.sql.ResultSet rs = st.executeQuery( "SELECT count(*) > AS row_count FROM copy_out_test" ); > rs.first(); > int row_count = rs.getInt("row_count"); // the number of > rows in the table copy_out_test > > conn.copyOut("copy_out_test",out); > > ByteArrayInputStream in = new > ByteArrayInputStream(out.toByteArray()); > int newline_count = 0; > for ( int i = in.read(); i != -1; i = in.read() ) { > if (i == '\n') newline_count++; // count the number > of newlines > } > > // # of newlines should equal original LIMIT # > assertEquals(row_count,newline_count); > } > catch (Exception ex) > { > assertTrue(ex.getMessage(), false); > } > } > > > > /* > * Tests the copyIn functionality by copying in an array of bytes, > then checking their integrity in the table > */ > public void testCopyIn() > { > try > { > org.postgresql.Connection conn = > (org.postgresql.Connection)JDBC2Tests.openDB(); > java.sql.Statement st = conn.createStatement(); > byte[] dummy_array = new byte[] { > (byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n', > //Tom 12 > > (byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n', > //Bob 29 > > (byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n' > //Han 41 > }; > > InputStream in = new ByteArrayInputStream(dummy_array); > conn.copyIn("copy_in_test",in); > > java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two) > AS sum FROM copy_in_test" ); > rs.first(); > int sum = rs.getInt("sum"); > assertEquals(sum,82); // the sum of column 'two' should > equal 82 (i.e. 
12 + 29 + 41) > > } > catch (Exception ex) > { > assertTrue(ex.getMessage(), false); > } > } > > > > > > /* > * Test a sequence of copy commands by > * > */ > public void testCopyOutCopyIn() > { > try > { > org.postgresql.Connection conn = > (org.postgresql.Connection)JDBC2Tests.openDB(); > java.sql.Statement st = conn.createStatement(); > > st.executeUpdate( "INSERT into copy_out_copy_in_test > SELECT relowner, relpages FROM pg_class LIMIT 20" ); > > ByteArrayOutputStream out = new ByteArrayOutputStream(); > java.sql.ResultSet rs = st.executeQuery( "SELECT count(*) > AS row_count FROM copy_out_copy_in_test" ); > rs.first(); > int row_count = rs.getInt("row_count"); // the number of > rows in the table copy_out_copy_in_test > > conn.copyOut("copy_out_copy_in_test",out); > > ByteArrayInputStream in = new > ByteArrayInputStream(out.toByteArray()); > int newline_count = 0; > for ( int i = in.read(); i != -1; i = in.read() ) { > if (i == '\n') newline_count++; // count the number > of newlines > } > > // # of newlines should equal original LIMIT # > assertEquals(row_count,newline_count); > > in = new ByteArrayInputStream(out.toByteArray()); > conn.copyIn("copy_out_copy_in_test",in); > > rs = st.executeQuery( "SELECT count(*) AS row_count FROM > copy_out_copy_in_test" ); > rs.first(); > row_count = rs.getInt("row_count"); // the number of rows > in the table copy_out_copy_in_test > > assertEquals(row_count , newline_count*2); > } > catch (Exception ex) > { > assertTrue(ex.getMessage(), false); > } > } > > > > > /* > * Test a sequence of copy commands by copying in an array of bytes, > then copying them out > * and comparing the lengths of the two arrays. 
> */ > public void testCopyInCopyOut() > { > try > { > org.postgresql.Connection conn = > (org.postgresql.Connection)JDBC2Tests.openDB(); > > java.sql.Statement st = conn.createStatement(); > byte[] input_array = new byte[] { > (byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n', > > (byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n', > > (byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n' > }; > > InputStream in = new ByteArrayInputStream(input_array); > conn.copyIn("copy_in_copy_out_test",in); > > java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two) > AS sum FROM copy_in_copy_out_test" ); > rs.first(); > int sum = rs.getInt("sum"); > assertEquals(sum,82); // the sum of column 'two' should > equal 82 (i.e. 12 + 29 + 41) > > // copy out the everything from the table we just copy'ed > into > > ByteArrayOutputStream out = new ByteArrayOutputStream(); > conn.copyOut("copy_in_copy_out_test",out); > > int input_length = input_array.length; > int output_length = out.size(); > assertEquals(input_length, output_length); // the input > and output array should be the same size > } > catch (Exception ex) > { > assertTrue(ex.getMessage(), false); > } > } > > > > > > > > > > > > > // ***************** > // test using pipes so that you don't have to read everything into memory > before you sent it out. 
> // ***************** > > > > final org.postgresql.Connection local_con = > (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test1?loglevel=2", > "test" , "password"); > final org.postgresql.Connection local_con = > (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test2?loglevel=2", > "test" , "password"); > > > final PipedOutputStream pout = new PipedOutputStream(); > final PipedInputStream pin = new PipedInputStream(pout); > > final Thread copierOut = new Thread() { > public void run() { > try { > remote_con.copyOut("some_source_table",pout); > pout.close(); // if you don't close it, the pipe will > be considered "broken" when the Thread ends > } catch (Exception e) { > System.err.println("Caught error while copying out " > + e); > } > } > }; > > > final Thread copierIn = new Thread() { > public void run() { > try { > local_con.copyIn("some_destination_table",pin); > } catch (Exception e) { > System.err.println("Caught error while copying in " + > e); > } > } > }; > > copierOut.start(); > copierIn.start(); > > > > > > > ---------------------------(end of broadcast)--------------------------- > TIP 1: subscribe and unsubscribe commands go to majordomo@xxxxxxxxxxxxxx > > > ---------------------------(end of broadcast)--------------------------- TIP 1: subscribe and unsubscribe commands go to majordomo@xxxxxxxxxxxxxx |
|
| <Prev in Thread] | Current Thread | [Next in Thread> |
|---|---|---|
| Previous by Date: | code to perform COPY: 00259, Michael Adler |
|---|---|
| Next by Date: | Re: JDBC parse error: 00259, Barry Lind |
| Previous by Thread: | code to perform COPY: 00259, Michael Adler |
| Next by Thread: | Re: Problem with JDBC: no suitable driver: 00259, Konstantinos Spyropoulos |
| Indexes: | [Date] [Thread] [Top] [All Lists] |
| News | FAQ | advertise |