|
memory leak in udp roundtrip example?: msg#00026 lib.boost.asio.user
Hi, I was trying to find the source of a leak in my asio application (nothing outrageously big, but it could exhaust 1 GB in a day or so), and I kept reducing the code until I reached a minimal skeleton that reproduces the leak. I'm not sure if it has been reported already, since there are no bugs filed in the tracker, so I assume it's safe to post it here. To make the test, run this in a terminal (run it for a long time (5 to 10 minutes should be enough) and monitor the memory resources of the process): ./test 5600 and in another: ./test 5598 2 (this means that this instance will start the echo to port 5598+2) (this last one can be restarted occasionally; when a packet is dropped, it's necessary to keep the bounce going). If it's required, I could post some valgrind traces (the leak occurs on both Mac OS X and Linux), which show obscure boost template traces. boost::bind and ofstream types are mentioned. The source of the test: #include <cstdlib> #include <iostream> #include <boost/bind.hpp> #include <boost/date_time/posix_time/posix_time_types.hpp> #include "asio.hpp" #include "string.h" #include <pthread.h> //#include "MsgFifo.h" using namespace std; #include <stdlib.h> #include <sys/time.h> #include <iostream> using namespace std; class Chronometer { public: Chronometer(){this->start();}; ~Chronometer(){}; void start() { int result; mPaused=false; result = gettimeofday( &m_time0 , 0 ); } void pause() { gettimeofday( &m_time1 , 0 ); mPaused=true;} void resume() { timeval v; gettimeofday( &v , 0 ); m_time0.tv_sec=v.tv_sec - (m_time1.tv_sec - m_time0.tv_sec); m_time0.tv_usec = v.tv_usec - (m_time1.tv_usec - m_time0.tv_usec); mPaused=false; } void print() { if(mPaused == false) gettimeofday( &m_time1 , 0 ); cout.setf(ios::fixed); int tmp=cout.precision(4); cout << "Elapsed time : " << this->seconds() << " s."<<endl ; cout.precision(tmp); cout.unsetf(ios::fixed); } void wait(float secs) { float now = seconds(); now += secs; while (seconds() < now ) { } } float seconds() { if(mPaused 
==false) { gettimeofday( &m_time1 , 0 ); } double vA , vB; vA = (float)(m_time1.tv_sec - m_time0.tv_sec); vB = (float)(m_time1.tv_usec - m_time0.tv_usec); vB *= 0.000001; if (vB < 0.0) { vB += 1.0; vA -= 1.0; }; return (vA + vB); } private: timeval m_time0; timeval m_time1; bool mPaused; }; using asio::ip::udp; #include <iostream> #include <sstream> #include <string.h> class server { public: server(asio::io_service& io_service, short port) : io_service_(io_service), socket_(io_service, udp::endpoint(udp::v4(), port)) , measure() , sends( 0 ) , receives ( 0 ) { socket_.async_receive_from( asio::buffer(data_, max_length), sender_endpoint_, boost::bind(&server::handle_receive_from, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); } void handle_receive_from(const asio::error& error, size_t bytes_recvd) { if (!error && bytes_recvd > 0) { receives++; cout << " recv at time : " << measure.seconds()<< " :" << data_ << endl; measure.wait(0.05); //just an echo action socket_.async_send_to( asio::buffer(data_, bytes_recvd), sender_endpoint_, boost::bind(&server::handle_send_to, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); socket_.async_receive_from( asio::buffer(data_, max_length), sender_endpoint_, boost::bind(&server::handle_receive_from, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); } else { socket_.async_receive_from( asio::buffer(data_, max_length), sender_endpoint_, boost::bind(&server::handle_receive_from, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); } } void handle_send_to(const asio::error& error, size_t bytes_sent) { // after the echo action finished, continue listening //cout << "server::handle_send_to, restarting listening mode" << endl; sends++; socket_.async_receive_from( asio::buffer(data_, max_length), sender_endpoint_, boost::bind(&server::handle_receive_from, this, asio::placeholders::error, asio::placeholders::bytes_transferred)); //cout << 
"listening mode ready " << endl; } void start_send_to(string& str , udp::endpoint& endp) { //strncpy( data_ , str.c_str() , max_length-1 ); socket_.async_send_to( asio::buffer( str.c_str() , max_length ) , endp , boost::bind(&server::handle_send_to , this, asio::placeholders::error , asio::placeholders::bytes_transferred)); } private: asio::io_service& io_service_; udp::socket socket_; udp::endpoint sender_endpoint_; enum { max_length = 1024 }; char data_[max_length]; Chronometer measure; int sends , receives; }; struct thread_Arg { int32_t thr_id; short port; short offset_port; }; const int nb_io_threads = 1; void *io_main(void* data) { //MsgFifo< , 1024 >* msg_queue = static_cast< MsgFifo< , 1024 >* >( data ); thread_Arg* arg = (thread_Arg*)data; // here we should add ports to the io_service and run. // the async completion calls will queue incoming messages to the queue // which will be proccessed by the main thread try { asio::io_service io_service; udp::resolver resolver(io_service); //using namespace std; // For atoi. 
server s(io_service, arg->port); if ( arg->offset_port != -1 ) { // this side will start the echo udp::resolver::query query(udp::v4(), "0.0.0.0", "0"); udp::endpoint endp = *resolver.resolve(query); endp.port( arg->port + arg->offset_port ); string foo; foo = "foob\n"; //foo = "fooBistic"; s.start_send_to( *& foo , *& endp ); } io_service.run(); } catch (asio::error& e) { std::cerr << e << "\n"; } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } }; int main(int argc, char* argv[]) { if ((argc < 2) || (argc > 4)) { std::cerr << "Usage: test_serv <port>\n"; std::cerr << " test_serv <port> <You_start_with_offset_port>\n"; return 1; } short Port = atoi( argv[1] ); short delta_Port; if (argc == 3) delta_Port = atoi( argv[ 2 ] ); else delta_Port = -1; //MsgFifo< , 1024 > msg_queue; pthread_t io_thread[ nb_io_threads ]; pthread_attr_t atrib; pthread_attr_init( & atrib ); thread_Arg arg[ nb_io_threads ]; int r; for (int i = 0; i< nb_io_threads; i++) { r = arg[ i ].thr_id = i; arg[ i ].port = Port; arg[ i ].offset_port = delta_Port; //OSAtomicCompareAndSwap32( r , i , &(arg[ i ].thr_id) ); //CAS( & (arg[ i ].thr_id) , r , i); //cout << " Before creating the new thread... " << endl; r = pthread_create( &(io_thread[ i ]) , &atrib , &io_main , (void*)&(arg[ i ]) ); //cout << " created thread " << i << ", result: " << r << endl; }; //now we do main work, and from time to time we read from the queue messages from the io thread Chronometer wait; wait.start(); while( true ) { cout << "main thread does other things now.. " << endl; wait.wait(1.0); //cout << "main thread now checks the queue " << endl; } return 0; } __________________________________________________ Correo Yahoo! Espacio para todos tus mensajes, antivirus y antispam ¡gratis! ¡Abrí tu cuenta ya! - http://correo.yahoo.com.ar ------------------------------------------------------------------------- Take Surveys. Earn Cash. 
Influence the Future of IT Join SourceForge.net's Techsay panel and you'll get the chance to share your opinions on IT & business topics through brief surveys - and earn cash http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV |
|
| <Prev in Thread] | Current Thread | [Next in Thread> |
|---|---|---|
| Previous by Date: | Re: retrieving local ip address: 00026, Themistoklis Bourdenas |
|---|---|
| Next by Date: | Re: retrieving local ip address: 00026, Christopher Kohlhoff |
| Previous by Thread: | retrieving local ip address: 00026, Themistoklis Bourdenas |
| Next by Thread: | Re: memory leak in udp roundtrip example?: 00026, Christopher Kohlhoff |
| Indexes: | [Date] [Thread] [Top] [All Lists] |
| News | FAQ | advertise |