logo       

memory leak in udp roundtrip example?: msg#00026

lib.boost.asio.user

Subject: memory leak in udp roundtrip example?


Hi,

I was trying to find the source of a leak in my asio
application (nothing outrageously big, but it could
exhaust 1Gb in a day or so), and i keep reducing the
code until i reached a minimal skeleton that
reproduces the leak. I'm not sure if it has been
reported already since there are no bugs filed in the
tracker, so i assume it's safe to post it here.

to make the test run in a terminal (run this for a
long time (5, 10 minutes should be enough) and monitor
the memory resources of the process)
./test 5600

and in another:
./test 5598 2 (this means that this will start the
echo to port 5598+2)
( this last one can be restarted ocassionally, when a
packet is dropped, its necessary to keep the bounce
going)

If its required, i could post some valgrind traces (
the leak occurs both on mac os x and linux ) which
show obscure boost template traces. boost::bind and
ofstream types are mentioned.

the source of the test:

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include
<boost/date_time/posix_time/posix_time_types.hpp>
#include "asio.hpp"
#include "string.h"
#include <pthread.h>
//#include "MsgFifo.h"

using namespace std;

#include <stdlib.h>
#include <sys/time.h>
#include <iostream>

using namespace std;

class Chronometer
{
public:
Chronometer(){this->start();};
~Chronometer(){};
void start() {
int result;
mPaused=false;
result = gettimeofday( &m_time0 , 0 );

}
void pause() {
gettimeofday( &m_time1 , 0 );
mPaused=true;}
void resume()
{ timeval v;
gettimeofday( &v , 0 );
m_time0.tv_sec=v.tv_sec - (m_time1.tv_sec -
m_time0.tv_sec);
m_time0.tv_usec = v.tv_usec - (m_time1.tv_usec -
m_time0.tv_usec);
mPaused=false;
}
void print()
{
if(mPaused == false) gettimeofday( &m_time1 , 0 );
cout.setf(ios::fixed);
int tmp=cout.precision(4);
cout << "Elapsed time : " << this->seconds() << "
s."<<endl ;
cout.precision(tmp);
cout.unsetf(ios::fixed);
}

void wait(float secs)
{
float now = seconds();
now += secs;
while (seconds() < now )
{
}
}

float seconds()
{
if(mPaused ==false)
{
gettimeofday( &m_time1 , 0 );
}

double vA , vB;
vA = (float)(m_time1.tv_sec - m_time0.tv_sec);
vB = (float)(m_time1.tv_usec - m_time0.tv_usec);
vB *= 0.000001;
if (vB < 0.0) { vB += 1.0; vA -= 1.0; };
return (vA + vB);

}
private:
timeval m_time0;
timeval m_time1;
bool mPaused;
};

using asio::ip::udp;

#include <iostream>
#include <sstream>
#include <string.h>
class server
{
public:
server(asio::io_service& io_service, short port)
: io_service_(io_service),
socket_(io_service, udp::endpoint(udp::v4(), port)) ,
measure() , sends( 0 ) , receives ( 0 )
{
socket_.async_receive_from(

asio::buffer(data_, max_length),
sender_endpoint_,

boost::bind(&server::handle_receive_from,
this,

asio::placeholders::error,

asio::placeholders::bytes_transferred));
}

void handle_receive_from(const asio::error& error,
size_t bytes_recvd)
{

if (!error && bytes_recvd > 0)
{

receives++;
cout << " recv at time : " << measure.seconds()<< "
:" << data_ << endl;
measure.wait(0.05);
//just an echo action
socket_.async_send_to(
asio::buffer(data_,
bytes_recvd),
sender_endpoint_,

boost::bind(&server::handle_send_to, this,

asio::placeholders::error,

asio::placeholders::bytes_transferred));


socket_.async_receive_from(

asio::buffer(data_, max_length),
sender_endpoint_,

boost::bind(&server::handle_receive_from,
this,

asio::placeholders::error,

asio::placeholders::bytes_transferred));
}
else
{
socket_.async_receive_from(

asio::buffer(data_, max_length),
sender_endpoint_,

boost::bind(&server::handle_receive_from,
this,

asio::placeholders::error,

asio::placeholders::bytes_transferred));
}
}

void handle_send_to(const asio::error& error, size_t
bytes_sent)
{
// after the echo action finished, continue listening
//cout << "server::handle_send_to, restarting
listening mode" << endl;
sends++;
socket_.async_receive_from(
asio::buffer(data_,
max_length),
sender_endpoint_,

boost::bind(&server::handle_receive_from,
this,

asio::placeholders::error,

asio::placeholders::bytes_transferred));
//cout << "listening mode ready " << endl;
}

void start_send_to(string& str , udp::endpoint& endp)
{
//strncpy( data_ , str.c_str() , max_length-1 );
socket_.async_send_to( asio::buffer( str.c_str() ,
max_length ) , endp ,

boost::bind(&server::handle_send_to , this,

asio::placeholders::error ,

asio::placeholders::bytes_transferred));
}

private:
asio::io_service& io_service_;
udp::socket socket_;
udp::endpoint sender_endpoint_;
enum { max_length = 1024 };
char data_[max_length];
Chronometer measure;
int sends , receives;

};

struct thread_Arg {

int32_t thr_id;
short port;
short offset_port;
};


const int nb_io_threads = 1;

void *io_main(void* data)
{
//MsgFifo< , 1024 >* msg_queue = static_cast<
MsgFifo< , 1024 >* >( data );
thread_Arg* arg = (thread_Arg*)data;
// here we should add ports to the io_service and
run.
// the async completion calls will queue incoming
messages to the queue
// which will be proccessed by the main thread
try
{

asio::io_service io_service;
udp::resolver resolver(io_service);
//using namespace std; // For atoi.
server s(io_service, arg->port);
if ( arg->offset_port != -1 )
{
// this side will start the echo
udp::resolver::query query(udp::v4(), "0.0.0.0",
"0");
udp::endpoint endp = *resolver.resolve(query);
endp.port( arg->port + arg->offset_port );
string foo;
foo = "foob\n";
//foo = "fooBistic";
s.start_send_to( *& foo , *& endp );

}

io_service.run();
}
catch (asio::error& e)
{
std::cerr << e << "\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
};

int main(int argc, char* argv[])
{
if ((argc < 2) || (argc > 4))
{
std::cerr << "Usage: test_serv <port>\n";
std::cerr << " test_serv <port>
<You_start_with_offset_port>\n";
return 1;
}
short Port = atoi( argv[1] );
short delta_Port;
if (argc == 3)
delta_Port = atoi( argv[ 2 ] );
else
delta_Port = -1;



//MsgFifo< , 1024 > msg_queue;
pthread_t io_thread[ nb_io_threads ];
pthread_attr_t atrib;
pthread_attr_init( & atrib );
thread_Arg arg[ nb_io_threads ];
int r;
for (int i = 0; i< nb_io_threads; i++)
{
r = arg[ i ].thr_id = i;
arg[ i ].port = Port;
arg[ i ].offset_port = delta_Port;
//OSAtomicCompareAndSwap32( r , i , &(arg[ i
].thr_id) );
//CAS( & (arg[ i ].thr_id) , r , i);
//cout << " Before creating the new thread... " <<
endl;
r = pthread_create( &(io_thread[ i ]) , &atrib ,
&io_main , (void*)&(arg[ i ]) );
//cout << " created thread " << i << ", result: " <<
r << endl;
};

//now we do main work, and from time to time we read
from the queue messages from the io thread

Chronometer wait;
wait.start();
while( true )
{

cout << "main thread does other things now.. " <<
endl;
wait.wait(1.0);
//cout << "main thread now checks the queue " <<
endl;
}

return 0;
}



__________________________________________________
Correo Yahoo!
Espacio para todos tus mensajes, antivirus y antispam ¡gratis!
¡Abrí tu cuenta ya! - http://correo.yahoo.com.ar

-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys - and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV


<Prev in Thread] Current Thread [Next in Thread>
Google Custom Search

News | FAQ | advertise