#include <iostream>
#include <set>
#include <stdio.h>
#include <ace/OS_main.h>
#include <so_4/api/h/api.hpp>
#include <so_4/rt/h/rt.hpp>
#include <so_4/rt/comm/h/a_sop_outgoing_channel.hpp>
#include <so_4/transport_layer/socket/h/pub.hpp>
#include <so_4/timer_thread/simple/h/pub.hpp>
#include <so_4/disp/active_obj/h/pub.hpp>
#include "common.cpp"
class a_sender_t
: public so_4::rt::agent_t
{
typedef so_4::rt::agent_t base_type_t;
private :
unsigned int m_data_size;
unsigned int m_request_count;
unsigned int m_reply_received;
unsigned int m_group_size;
unsigned int m_timeout;
unsigned int m_uid;
std::set< unsigned int > m_no_reply_uids;
bool m_is_connected;
void
show_stat() const
{
double percents = double( m_reply_received ) /
double( m_request_count ) * 100.0;
std::cout << "*** "
<< ( m_is_connected ? "connected" : "not connected" )
<< ", " << percents << "% ("
<< m_no_reply_uids.size() << ")"
<< std::endl;
}
void
send( unsigned int uid,
bool insert_to_no_reply_uids = true )
{
so_4::api::send_msg_safely(
a_common_t::agent_name(),
"msg_request",
new a_common_t::msg_request( uid, m_data_size ) );
if( insert_to_no_reply_uids )
m_no_reply_uids.insert( uid );
}
public :
a_sender_t(
unsigned int data_size,
unsigned int request_count,
unsigned int group_size,
unsigned int timeout )
: base_type_t( "a_receiver" )
, m_data_size( data_size )
, m_request_count( request_count )
, m_reply_received( 0 )
, m_group_size( group_size )
, m_timeout( timeout )
, m_uid( 0 )
, m_is_connected( false )
{
}
virtual ~a_sender_t()
{}
struct msg_timeout {};
virtual const char *
so_query_type() const;
virtual void
so_on_subscription()
{
so_4::api::make_global_agent(
a_common_t::agent_name(),
a_common_t::agent_type() );
so_subscribe( "evt_start",
so_4::rt::sobjectizer_agent_name(), "msg_start" );
so_subscribe( "evt_client_connected",
so_4::rt::comm::communicator_agent_name(),
"msg_client_connected" );
so_subscribe( "evt_client_disconnected",
so_4::rt::comm::communicator_agent_name(),
"msg_client_disconnected" );
so_subscribe( "evt_reply",
a_common_t::agent_name(),
"msg_reply" );
so_subscribe( "evt_timeout", "msg_timeout" );
}
void
evt_start(
const so_4::rt::event_data_t & )
{
so_4::api::send_msg( so_query_name(), "msg_timeout", 0,
so_query_name(), 1000, 1000 * m_timeout );
}
void
evt_client_connected(
const so_4::rt::event_data_t & data,
const so_4::rt::comm::msg_client_connected * cmd )
{
m_is_connected = true;
show_stat();
}
void
evt_client_disconnected(
const so_4::rt::event_data_t & data,
const so_4::rt::comm::msg_client_disconnected * cmd )
{
m_is_connected = false;
show_stat();
}
void
evt_reply(
const so_4::rt::event_data_t &,
const a_common_t::msg_reply * cmd )
{
if( m_no_reply_uids.find( cmd->m_uid ) !=
m_no_reply_uids.end() )
{
m_no_reply_uids.erase( cmd->m_uid );
++m_reply_received;
if( m_reply_received == m_request_count )
{
so_4::api::send_msg(
so_4::rt::sobjectizer_agent_name(),
"msg_normal_shutdown", 0 );
}
}
}
void
evt_timeout(
const so_4::rt::event_data_t & )
{
unsigned int sent = 0;
show_stat();
for( std::set< unsigned int >::iterator
it = m_no_reply_uids.begin(),
it_end = m_no_reply_uids.end();
it != it_end && sent != m_group_size;
++it,
++sent )
{
send( *it, false );
}
for( unsigned int count = 0;
sent + count != m_group_size &&
m_uid != m_request_count;
++count, ++m_uid )
{
send( m_uid );
}
}
};
SOL4_CLASS_START( a_sender_t )
SOL4_MSG_START( msg_timeout, a_sender_t::msg_timeout )
SOL4_MSG_FINISH()
SOL4_EVENT( evt_start )
SOL4_EVENT_WITH_INCIDENT_TYPE(
evt_client_connected,
so_4::rt::comm::msg_client_connected )
SOL4_EVENT_WITH_INCIDENT_TYPE(
evt_client_disconnected,
so_4::rt::comm::msg_client_disconnected )
SOL4_EVENT_WITH_INCIDENT_TYPE(
evt_reply,
a_common_t::msg_reply )
SOL4_EVENT( evt_timeout )
SOL4_STATE_START( st_normal )
SOL4_STATE_EVENT( evt_start )
SOL4_STATE_EVENT( evt_client_connected )
SOL4_STATE_EVENT( evt_client_disconnected )
SOL4_STATE_EVENT( evt_reply )
SOL4_STATE_EVENT( evt_timeout )
SOL4_STATE_FINISH()
SOL4_CLASS_FINISH()
int
main( int argc, char ** argv )
{
if( 6 == argc )
{
unsigned int data_size = 0;
sscanf( argv[ 2 ], "%u", &data_size );
unsigned int count = 0;
sscanf( argv[ 3 ], "%u", &count );
unsigned int group_size = 0;
sscanf( argv[ 4 ], "%u", &group_size );
unsigned int timeout = 0;
sscanf( argv[ 5 ], "%u", &timeout );
so_4::sop::std_filter_t * filter =
so_4::sop::create_std_filter();
filter->insert( a_common_t::agent_name() );
so_4::rt::comm::a_sop_outgoing_channel_t a_channel(
"a_channel",
so_4::transport_layer::socket::create_connector_controller(
so_4::transport_layer::socket::connector_params(
argv[ 1 ] ),
so_4::transport_layer::channel_params_t().
set_output_portion_size( 100 * 1024 ).
set_max_output_buffer_size( 600 * 1024 ) ),
so_4::sop::filter_auto_ptr_t( filter ),
so_4::rt::comm::create_def_disconnect_handler( 5000, 0 ) );
a_channel.so_add_traits(
so_4::disp::active_obj::query_active_obj_traits() );
a_sender_t a_sender(
data_size, count, group_size, timeout );
so_4::rt::agent_t * agents[] =
{
&a_channel, &a_sender
};
so_4::rt::agent_coop_t coop( "server",
agents, sizeof( agents ) / sizeof( agents[ 0 ] ) );
so_4::ret_code_t rc = so_4::api::start(
so_4::disp::active_obj::create_disp(
so_4::timer_thread::simple::create_timer_thread(),
so_4::auto_destroy_timer ),
so_4::auto_destroy_disp,
&coop );
if( rc )
{
std::cerr << "start: " << rc << std::endl;
}
return int( rc );
}
std::cerr << "sample_high_traffic_client <[ip]:port> "
"data_size count group_size timeout" << std::endl;
return 1;
}