#include <iostream>
#include <ace/OS_main.h>
#include <so_4/rt/h/rt.hpp>
#include <so_4/api/h/api.hpp>
#include <so_4/timer_thread/simple/h/pub.hpp>
#include <so_4/disp/active_obj/h/pub.hpp>
#include <so_4/rt/comm/h/a_raw_outgoing_channel.hpp>
#include <so_4/transport_layer/socket/h/pub.hpp>
class a_main_t :
public so_4::rt::agent_t
{
typedef so_4::rt::agent_t base_type_t;
public :
a_main_t(
int argc,
char ** argv );
virtual ~a_main_t();
virtual const char *
so_query_type() const;
virtual void
so_on_subscription();
static std::string &
agent_name();
static std::string &
tcp_agent_name();
void
evt_success(
const so_4::rt::comm::msg_success * );
void
evt_fail(
const so_4::rt::comm::msg_fail * );
void
evt_client_connected(
const so_4::rt::comm::msg_client_connected * cmd );
void
evt_client_disconnected(
const so_4::rt::comm::msg_client_disconnected * cmd );
void
evt_incoming_data(
const so_4::rt::comm::msg_raw_package * cmd );
private :
char ** m_argv;
int m_argc;
void
shutdown()
{
so_4::api::send_msg(
so_4::rt::sobjectizer_agent_name(),
"msg_normal_shutdown", 0 );
}
};
SOL4_CLASS_START( a_main_t )
SOL4_EVENT_STC(
evt_success,
so_4::rt::comm::msg_success )
SOL4_EVENT_STC(
evt_fail,
so_4::rt::comm::msg_fail )
SOL4_EVENT_STC(
evt_client_connected,
so_4::rt::comm::msg_client_connected )
SOL4_EVENT_STC(
evt_client_disconnected,
so_4::rt::comm::msg_client_disconnected )
SOL4_EVENT_STC(
evt_incoming_data,
so_4::rt::comm::msg_raw_package )
SOL4_STATE_START( st_normal )
SOL4_STATE_EVENT( evt_success )
SOL4_STATE_EVENT( evt_fail )
SOL4_STATE_EVENT( evt_client_connected )
SOL4_STATE_EVENT( evt_client_disconnected )
SOL4_STATE_EVENT( evt_incoming_data )
SOL4_STATE_FINISH()
SOL4_CLASS_FINISH()
a_main_t::a_main_t(
int argc,
char ** argv )
:
base_type_t( agent_name().c_str() )
, m_argc( argc )
, m_argv( argv )
{
}
a_main_t::~a_main_t()
{
}
void
a_main_t::so_on_subscription()
{
so_subscribe( "evt_success", tcp_agent_name(), "msg_success" );
so_subscribe( "evt_fail", tcp_agent_name(), "msg_fail" );
so_subscribe( "evt_client_connected", tcp_agent_name(),
"msg_client_connected" );
so_subscribe( "evt_client_disconnected", tcp_agent_name(),
"msg_client_disconnected" );
so_subscribe( "evt_incoming_data", tcp_agent_name(),
"msg_raw_package" );
}
std::string &
a_main_t::agent_name()
{
static std::string name( "a_main" );
return name;
}
std::string &
a_main_t::tcp_agent_name()
{
static std::string name( "a_tcp_srvsock" );
return name;
}
void
a_main_t::evt_success(
const so_4::rt::comm::msg_success * cmd )
{
std::cout << so_query_name() << ".evt_success"
<< ": channel: " << cmd->m_channel << std::endl;
}
void
a_main_t::evt_fail(
const so_4::rt::comm::msg_fail * cmd )
{
std::cout << so_query_name() << ".evt_fail: "
<< cmd->m_reason << std::endl;
shutdown();
}
const std::string &
replace_escaped_lf( std::string & what )
{
std::string::size_type where = 0;
while( std::string::npos != ( where = what.find( "\\n", where ) ) )
{
what.replace( where, 2, "\r\n" );
where += 2;
}
return what;
}
void
a_main_t::evt_client_connected(
const so_4::rt::comm::msg_client_connected * cmd )
{
std::cout << so_query_name() << ".evt_client_connected: "
<< cmd->m_channel << std::endl;
std::cout << "sending data" << std::endl;
for( int i = 0; i != m_argc; ++i )
{
std::string what( m_argv[ i ] );
replace_escaped_lf( what );
so_4::rt::comm_buf_t data;
data.insert( 0, what.data(), what.size() );
data.insert( data.size(), "\r\n\r\n", 4 );
so_4::api::send_msg_safely(
cmd->m_channel.comm_agent(), "msg_send_package",
new so_4::rt::comm::msg_send_package(
cmd->m_channel.client(),
data ) );
}
}
void
a_main_t::evt_client_disconnected(
const so_4::rt::comm::msg_client_disconnected * cmd )
{
std::cout << so_query_name() << ".evt_client_disconnected: "
<< cmd->m_channel << std::endl;
shutdown();
}
void
a_main_t::evt_incoming_data(
const so_4::rt::comm::msg_raw_package * cmd )
{
std::cout << so_query_name() << ".evt_incoming_data: "
<< cmd->m_channel
<< "\n\tdata size: " << cmd->m_package.size()
<< "\n\tis channel blocked: " << cmd->m_is_blocked
<< std::endl;
std::string v(
(const char *)cmd->m_package.ptr(),
cmd->m_package.size() );
std::cout << v << std::endl;
cmd->unblock_channel();
}
so_4::rt::agent_coop_t *
create_coop( const char * ip_address,
int argc,
char ** argv )
{
a_main_t * a_main = new a_main_t( argc, argv );
so_4::rt::comm::a_raw_outgoing_channel_t * a_tcp_clnsock =
new so_4::rt::comm::a_raw_outgoing_channel_t(
a_main_t::tcp_agent_name(),
so_4::transport_layer::socket::create_connector_controller(
so_4::transport_layer::socket::connector_params(
ip_address ),
so_4::transport_layer::channel_params_t().set_input_threshold(
so_4::transport_layer::threshold_t( 1, 1 ) ) ),
so_4::rt::comm::disconnect_handler_auto_ptr_t() );
a_tcp_clnsock->so_add_traits(
so_4::disp::active_obj::query_active_obj_traits() );
so_4::rt::agent_t * agents[] = {
a_main, a_tcp_clnsock
};
return new so_4::rt::dyn_agent_coop_t( "srvsock1",
agents, sizeof( agents ) / sizeof( agents[ 0 ] ) );
}
int
main( int argc, char ** argv )
{
if( 2 <= argc )
{
so_4::ret_code_t rc = so_4::api::start(
so_4::disp::active_obj::create_disp(
so_4::timer_thread::simple::create_timer_thread(),
so_4::auto_destroy_timer ),
so_4::auto_destroy_disp,
create_coop( argv[ 1 ], argc - 2, &(argv[ 2 ]) ) );
if( rc )
{
std::cerr << "start: " << rc << std::endl;
}
return rc;
}
else
std::cerr << "sample_raw_channel_tcp_cln "
"<ip-address> [data_to_send]"
<< std::endl;
return 0;
}