logo       

code to perform COPY: msg#00258

db.postgresql.jdbc

Subject: code to perform COPY


Here's the goods. This has worked pretty well for me. I think it's ready
for the next stage of scrutiny.

We may want to take a slightly different approach and move some of this
logic to PG_Stream. There's a lot of code overhead in sending the
transaction-control queries (BEGIN and COMMIT) and parsing their
responses. I may not be taking advantage of some existing method, or we may
want to add functionality elsewhere to support COPY cleanly.

The debug messages are very verbose, so we'll probably want to comment some
of them out. For large copies, it prints a crude KB-per-second rate. I get
220 KB/sec over my LAN. It appears to be CPU-bound on my 1.6 GHz P4, so
perhaps it can be improved to the point of being network-bound.

As I mentioned in my previous email, since PG_Stream.input_stream is a
java.io.InputStream, the multi-byte read/receive method offers no
improvement over the single-byte method. I could very well be wrong,
though; I still haven't tested it over a high-latency connection.


// *****************
// Postgres COPY handling from org.postgresql.Connection
// *****************

/**
* This will take the name of a table and an OutputStream, construct a
COPY OUT query, send the query
* ( while bypassing QueryExecutor), direct the output of the backend into
the OutputStream and
* then check the backend to see if it's ready for another query.
* @param table the table from which it will copy
* @param out OutputStream to which the copy will be sent
* @return void
* @exception Exception if a database access error occurs
*
**/

public void copyOut(String table, OutputStream out) throws Exception
{
synchronized(pg_stream) {

String query = "COPY " + table + " TO STDOUT";
if (Driver.logDebug) Driver.debug("Sending query '" + query + "' to
backend.");

// duplicates statements in QueryExecutor.sendQuery
pg_stream.SendChar('Q');
pg_stream.Send(this.getEncoding().encode( query ));
pg_stream.SendChar(0);
pg_stream.flush();

// check response from backend
int response = pg_stream.ReceiveChar();

if (response == 'H') {
if (Driver.logDebug) Driver.debug("Received 'H' from backend.
The COPY out is beginning.");
} else if ( response == 'E') {
String error_string =
pg_stream.ReceiveString(this.getEncoding());
throw new SQLException( error_string );
} else {
throw new SQLException("Copy Out should receive H from backend,
but instead received: " + (char)response );
}

long start_time = System.currentTimeMillis();

int a, b, c;

// read input stream one char at a time, but always hold three
a = pg_stream.ReceiveChar();
b = pg_stream.ReceiveChar();
c = pg_stream.ReceiveChar();

int counter = 3;

while (true) {
if ( Driver.logDebug && counter % 100000 == 0) {
int rate = counter / (int)(System.currentTimeMillis() -
start_time);
System.out.println( (counter / 1000) + " KB total at " +
rate + " KB/sec");
}

if ( a == '\\' && b == '.' && c == '\n' ) {
// this sequence of bytes means the copy is over
if (Driver.logDebug) Driver.debug("Received '\\.' from
backend. The COPY out stream is finished.");
break;
}

out.write(a);

a = b;
b = c;

try {
// the following looks unoptimized, but is really the same
as java.io.OutputStream.read(byte[] b, int off, int len)
c = pg_stream.ReceiveChar();
} catch (Exception e) {
// maybe the connection is screwed, or maybe we can salvage
it. I don't know.
throw e;
}

counter++;
}

// the backend should send the string "COPY"
String str = pg_stream.ReceiveString(this.getEncoding());

// check to make sure the backend is ready for the next query
response = pg_stream.ReceiveChar();

if (response == 'Z') {
if (Driver.logDebug) Driver.debug("Received 'Z' from backend.
It's ready for the next query. COPY out has completed");
} else if ( response == 'E') {
String error_string =
pg_stream.ReceiveString(this.getEncoding());
throw new SQLException( error_string );
} else {
throw new SQLException("Copy should receive Z from backend, but
instead received: " + (char)response );
}
}

return;
}


/*
* This will take the name of a table and a ByteArrayInputStream,
construct a COPY IN query,
* send the query ( while bypassing QueryExecutor), send the bytes of data
and send the
* 3-byte sequence that signifies the end of the copy. We enclose this in
a transaction to ensure
* that the entire stream is copied in, or nothing at all.
* @param table the table to which it will copy
* @param in InputStream from which the copy will be read
* @return void
* @exception Exception if a database access error occurs
*/

public void copyIn (String table, InputStream in) throws Exception
{
int response;
String str;

synchronized(pg_stream) {

// BEGIN the COPY in transaction

pg_stream.SendChar('Q');
pg_stream.Send(this.getEncoding().encode( "BEGIN" ));
pg_stream.SendChar(0);
pg_stream.flush();

// the backend shoul send the string "CBEGIN" - 'C' for "Completed"
and 'BEGIN' for the type of completed query
str = pg_stream.ReceiveString(this.getEncoding());
if ( ! str.equals("CBEGIN") ) throw new SQLException("BEGIN seemed
to fail before COPY in. Received: '" + str + "'");
response = pg_stream.ReceiveChar();
if ( response != 'Z' ) throw new SQLException("Backend is not ready
for next query. Instead of 'Z', received: '" + (char)response + "'");


// now "inside" a transaction and ready for COPYing in

String query = "COPY " + table + " FROM STDIN";
//if (Driver.logDebug) Driver.debug("Sending query '" + query + "'
to backend.");

// duplicates statements in QueryExecutor.sendQuery
pg_stream.SendChar('Q');
pg_stream.Send(this.getEncoding().encode( query ));
pg_stream.SendChar(0);
pg_stream.flush();

// check response from backend
response = pg_stream.ReceiveChar();

if (response == 'G') {
if (Driver.logDebug) Driver.debug("Received 'G' from backend.
The COPY in should now begin.");
} else if ( response == 'E') {
String error_string =
pg_stream.ReceiveString(this.getEncoding());
throw new SQLException( error_string );
} else {
throw new SQLException("Copy should receive G from backend, but
instead received: " + (char)response );
}


// send the whole input stream

int b; // a byte placeholder to read from in and send to backend

while (true) {

try {
b = in.read();
} catch (IOException e) {
// maybe the connection is screwed, or maybe we can salvage
it. I don't know.
throw new SQLException("While reading from InputStream, it
threw exception: '" + e + "'");
}

// we may want to check for the termination string '\\.\n'
because we send it later. It wouldn't be good to send it twice

if (b == -1) {
break; // the InputStream is finished
} else {
try {
pg_stream.SendChar((char)b);
} catch (IOException e) {
throw new SQLException("While sending a char to the
backend, it threw exception: '" + e + "'" );
}
}
}

// send the special row
if (Driver.logDebug) Driver.debug("Sending the byte seqence '\\.'.
The frontend has finished the COPY in stream.");
pg_stream.Send( new byte[] { (byte)'\\', (byte)'.', (byte)'\n' } );
pg_stream.flush();

// the backend send the string "CCOPY" - 'C' for "Completed" and
'COPY' for the type of completed query
str = pg_stream.ReceiveString(this.getEncoding());
//if (Driver.logDebug) Driver.debug( "Should be CCOPY: " + str);

// check to make sure the backend is ready for the next query
response = pg_stream.ReceiveChar();

if (response == 'Z') {
if (Driver.logDebug) Driver.debug("Received 'Z' from backend.
It's ready for the next query. COPY in has completed");
} else if ( response == 'E') {
String error_string =
pg_stream.ReceiveString(this.getEncoding());
throw new SQLException( error_string );
} else {
throw new SQLException("Copy should receive Z from backend, but
instead received: " + (char)response );
}


// since we reached this point with an error, COMMIT the COPY in
transaction

pg_stream.SendChar('Q');
pg_stream.Send(this.getEncoding().encode( "COMMIT" ));
pg_stream.SendChar(0);
pg_stream.flush();

// the backend shoul send the string "CCOMMIT" - 'C' for
"Completed" and 'COMMIT' for the type of completed query
str = pg_stream.ReceiveString(this.getEncoding());
if ( ! str.equals("CCOMMIT") ) throw new SQLException("COMMIT
seemed to fail after COPY in. Received: '" + str + "'");
response = pg_stream.ReceiveChar();
if ( response != 'Z' ) throw new SQLException("Backend is not ready
for next query. Instead of 'Z', received: '" + (char)response + "'");
}
}











// *****************
// Postgres COPY testing from org.postgresql.test.JDBCTests
// *****************



/*
* Tests the copyOut functionality by copying data from pg_class into a
test table,
* then it copies that data out and check the number of 'rows' that
came out.
*/
public void testCopyOut()
{
try
{
org.postgresql.Connection conn =
(org.postgresql.Connection)JDBC2Tests.openDB();

java.sql.Statement st = conn.createStatement();
st.executeUpdate( "INSERT into copy_out_test SELECT
relowner, relpages FROM pg_class LIMIT 20" );

ByteArrayOutputStream out = new ByteArrayOutputStream();
java.sql.ResultSet rs = st.executeQuery( "SELECT count(*)
AS row_count FROM copy_out_test" );
rs.first();
int row_count = rs.getInt("row_count"); // the number of
rows in the table copy_out_test

conn.copyOut("copy_out_test",out);

ByteArrayInputStream in = new
ByteArrayInputStream(out.toByteArray());
int newline_count = 0;
for ( int i = in.read(); i != -1; i = in.read() ) {
if (i == '\n') newline_count++; // count the number of
newlines
}

// # of newlines should equal original LIMIT #
assertEquals(row_count,newline_count);
}
catch (Exception ex)
{
assertTrue(ex.getMessage(), false);
}
}



/*
* Tests the copyIn functionality by copying in an array of bytes, then
checking their integrity in the table
*/
public void testCopyIn()
{
try
{
org.postgresql.Connection conn =
(org.postgresql.Connection)JDBC2Tests.openDB();
java.sql.Statement st = conn.createStatement();
byte[] dummy_array = new byte[] {
(byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n',
//Tom 12

(byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n',
//Bob 29

(byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n'
//Han 41
};

InputStream in = new ByteArrayInputStream(dummy_array);
conn.copyIn("copy_in_test",in);

java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two)
AS sum FROM copy_in_test" );
rs.first();
int sum = rs.getInt("sum");
assertEquals(sum,82); // the sum of column 'two' should
equal 82 (i.e. 12 + 29 + 41)

}
catch (Exception ex)
{
assertTrue(ex.getMessage(), false);
}
}





/*
* Test a sequence of copy commands by
*
*/
public void testCopyOutCopyIn()
{
try
{
org.postgresql.Connection conn =
(org.postgresql.Connection)JDBC2Tests.openDB();
java.sql.Statement st = conn.createStatement();

st.executeUpdate( "INSERT into copy_out_copy_in_test SELECT
relowner, relpages FROM pg_class LIMIT 20" );

ByteArrayOutputStream out = new ByteArrayOutputStream();
java.sql.ResultSet rs = st.executeQuery( "SELECT count(*)
AS row_count FROM copy_out_copy_in_test" );
rs.first();
int row_count = rs.getInt("row_count"); // the number of
rows in the table copy_out_copy_in_test

conn.copyOut("copy_out_copy_in_test",out);

ByteArrayInputStream in = new
ByteArrayInputStream(out.toByteArray());
int newline_count = 0;
for ( int i = in.read(); i != -1; i = in.read() ) {
if (i == '\n') newline_count++; // count the number of
newlines
}

// # of newlines should equal original LIMIT #
assertEquals(row_count,newline_count);

in = new ByteArrayInputStream(out.toByteArray());
conn.copyIn("copy_out_copy_in_test",in);

rs = st.executeQuery( "SELECT count(*) AS row_count FROM
copy_out_copy_in_test" );
rs.first();
row_count = rs.getInt("row_count"); // the number of rows
in the table copy_out_copy_in_test

assertEquals(row_count , newline_count*2);
}
catch (Exception ex)
{
assertTrue(ex.getMessage(), false);
}
}




/*
* Test a sequence of copy commands by copying in an array of bytes,
then copying them out
* and comparing the lengths of the two arrays.
*/
public void testCopyInCopyOut()
{
try
{
org.postgresql.Connection conn =
(org.postgresql.Connection)JDBC2Tests.openDB();

java.sql.Statement st = conn.createStatement();
byte[] input_array = new byte[] {
(byte)'T',(byte)'o',(byte)'m',(byte)'\t',(byte)'1',(byte)'2',(byte)'\n',

(byte)'B',(byte)'o',(byte)'b',(byte)'\t',(byte)'2',(byte)'9',(byte)'\n',

(byte)'H',(byte)'a',(byte)'n',(byte)'\t',(byte)'4',(byte)'1',(byte)'\n'
};

InputStream in = new ByteArrayInputStream(input_array);
conn.copyIn("copy_in_copy_out_test",in);

java.sql.ResultSet rs = st.executeQuery( "SELECT sum(two)
AS sum FROM copy_in_copy_out_test" );
rs.first();
int sum = rs.getInt("sum");
assertEquals(sum,82); // the sum of column 'two' should
equal 82 (i.e. 12 + 29 + 41)

// copy out the everything from the table we just copy'ed
into

ByteArrayOutputStream out = new ByteArrayOutputStream();
conn.copyOut("copy_in_copy_out_test",out);

int input_length = input_array.length;
int output_length = out.size();
assertEquals(input_length, output_length); // the input and
output array should be the same size
}
catch (Exception ex)
{
assertTrue(ex.getMessage(), false);
}
}












// *****************
// Test using pipes so that you don't have to read everything into memory
// before you send it out.
// *****************

// remote_con is the source of the rows (copied out); local_con is the
// destination (copied in).
final org.postgresql.Connection remote_con =
    (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test1?loglevel=2", "test", "password");
final org.postgresql.Connection local_con =
    (org.postgresql.Connection)DriverManager.getConnection("jdbc:postgresql://localhost/test2?loglevel=2", "test", "password");

final PipedOutputStream pout = new PipedOutputStream();
final PipedInputStream pin = new PipedInputStream(pout);

final Thread copierOut = new Thread() {
    public void run() {
        try {
            remote_con.copyOut("some_source_table", pout);
            // If you don't close it, the pipe will be considered "broken" when
            // the Thread ends. It is deliberately closed only on success: a
            // clean close after a failed copyOut would presumably let copyIn
            // see end-of-stream and commit a truncated copy.
            pout.close();
        } catch (Exception e) {
            System.err.println("Caught error while copying out " + e);
        }
    }
};

final Thread copierIn = new Thread() {
    public void run() {
        try {
            local_con.copyIn("some_destination_table", pin);
        } catch (Exception e) {
            System.err.println("Caught error while copying in " + e);
        }
    }
};

copierOut.start();
copierIn.start();
// NOTE(review): callers should join() both threads and then close remote_con
// and local_con; that is not done in this snippet.






---------------------------(end of broadcast)---------------------------
TIP 1: subscribe and unsubscribe commands go to majordomo@xxxxxxxxxxxxxx





<Prev in Thread] Current Thread [Next in Thread>
Google Custom Search

News | FAQ | advertise