// sample/parent_insend/main.cpp
#include <iostream>
#include <map>
#include <ace/OS.h>
#include <ace/Thread_Manager.h>
#include <cpp_util_2/h/lexcast.hpp>
#include <so_4/api/h/api.hpp>
#include <so_4/rt/h/rt.hpp>
#include <so_4/rt/comm/h/a_raw_incoming_channel_processor.hpp>
#include <so_4/transport_layer/socket/h/pub.hpp>
#include <so_4/timer_thread/simple/h/pub.hpp>
#include <so_4/disp/active_obj/h/pub.hpp>
// Name passed to the a_router_t base agent; destroy_coop() also uses it as
// the name of the cooperation to deregister.
const std::string router_agent_name = "a_router";
// Name passed to the server channel processor agent owned by a_router_t.
const std::string server_agent_name = "a_server";
class a_listener_t
: public so_4::rt::agent_t
{
typedef so_4::rt::agent_t base_type_t;
public :
a_listener_t( const std::string & agent_name )
: base_type_t( agent_name )
{
so_4::disp::active_obj::make_active( *this );
std::cout << so_query_name() << " created" << std::endl;
}
virtual ~a_listener_t()
{
std::cout << so_query_name() << " destroyed" << std::endl;
}
typedef so_4::rt::comm::msg_raw_package msg_data;
virtual const char *
so_query_type() const;
void
so_on_subscription()
{
so_subscribe( "evt_data", "msg_data" );
}
void
evt_data( const msg_data & cmd )
{
std::cout << "agent: " << so_query_name()
<< ", channel: " << cmd.m_channel
<< ", received: " << cmd.m_package.size()
<< " byte(s)" << std::endl;
cmd.unblock_channel();
}
};
// SObjectizer-4 description of a_listener_t, written with the SOL4_* macros.
// It lists:
//  - the message msg_data, typed a_listener_t::msg_data, with
//    a_listener_t::msg_data::check given as its checker;
//  - the event evt_data, bound to the a_listener_t::msg_data type;
//  - the state st_normal, which names evt_data.
// NOTE(review): the exact run-time meaning of each macro is defined by the
// SObjectizer-4 headers, not by this file - confirm there.
SOL4_CLASS_START( a_listener_t )
SOL4_MSG_START( msg_data, a_listener_t::msg_data )
SOL4_MSG_CHECKER( a_listener_t::msg_data::check )
SOL4_MSG_FINISH()
SOL4_EVENT_STC( evt_data, a_listener_t::msg_data )
SOL4_STATE_START( st_normal )
SOL4_STATE_EVENT( evt_data )
SOL4_STATE_FINISH()
SOL4_CLASS_FINISH()
// Router agent.
//
// Owns the server-side raw channel processor (m_srv_channel) and the
// cooperation that holds it. For every connected client it registers a
// child cooperation with one a_listener_t agent and forwards the raw
// packages of that client's channel to that agent. When the client
// disconnects, the child cooperation is deregistered.
//
// All channel-related events are subscribed through setup_subscr_hook(),
// which requests so_4::rt::evt_subscr_t::insend_dispatching.
class a_router_t
    : public so_4::rt::agent_t
{
    typedef so_4::rt::agent_t base_type_t;

private :
    // Server channel processor and the cooperation that contains it.
    // The declaration order matters: the cooperation is constructed
    // from the processor.
    so_4::rt::comm::a_raw_incoming_channel_processor_t m_srv_channel;
    so_4::rt::agent_coop_t m_srv_channel_coop;

    // Client channel -> name of the listener agent serving it.
    typedef std::map< so_4::rt::comm_channel_t, std::string >
        client_map_t;
    client_map_t m_clients;

    // Source of unique suffixes for listener agent names.
    int m_child_counter;

    // Installs a subscription hook on the server channel cooperation
    // that binds this agent's `event` to message `msg` of m_srv_channel,
    // with insend dispatching and diagnostics sent to std::cerr.
    void
    setup_subscr_hook(
        const std::string & event,
        const std::string & msg )
    {
        so_4::rt::def_subscr_hook( m_srv_channel_coop,
            *this, event, m_srv_channel, msg, 0, &std::cerr,
            so_4::rt::evt_subscr_t::insend_dispatching );
    }

public :
    // Creates the router and a server channel processor that accepts
    // connections on the socket address `ip`. Only the channel
    // processor is made an active object here, not the router itself.
    a_router_t(
        const std::string & ip )
        : base_type_t( router_agent_name )
        , m_srv_channel(
            server_agent_name,
            so_4::transport_layer::socket::create_acceptor_controller( ip ) )
        , m_srv_channel_coop( m_srv_channel )
        , m_child_counter( 0 )
    {
        so_4::disp::active_obj::make_active( m_srv_channel );
        std::cout << so_query_name() << " created" << std::endl;
    }

    virtual ~a_router_t()
    {
        std::cout << so_query_name() << " destroyed" << std::endl;
    }

    // Defined outside the class body.
    // NOTE(review): presumably generated by the SOL4_CLASS_* macros - confirm.
    virtual const char *
    so_query_type() const;

    // Subscribes evt_start to msg_start of the SObjectizer system agent.
    virtual void
    so_on_subscription()
    {
        so_subscribe( "evt_start",
            so_4::rt::sobjectizer_agent_name(), "msg_start" );
    }

    // Hooks all channel events onto the server channel cooperation and
    // registers that cooperation as a child of the router's own
    // cooperation.
    void
    evt_start()
    {
        setup_subscr_hook( "evt_srv_channel_success", "msg_success" );
        setup_subscr_hook( "evt_srv_channel_fail", "msg_fail" );
        setup_subscr_hook( "evt_client_connected",
            "msg_client_connected" );
        setup_subscr_hook( "evt_client_disconnected",
            "msg_client_disconnected" );
        setup_subscr_hook( "evt_channel_data",
            "msg_raw_package" );

        m_srv_channel_coop.set_parent_coop_name(
            so_query_coop()->query_name() );
        so_4::api::register_coop( m_srv_channel_coop );
    }

    void
    evt_srv_channel_success()
    {
        std::cout << "server channel created" << std::endl;
    }

    void
    evt_srv_channel_fail(
        const so_4::rt::comm::msg_fail & cmd )
    {
        std::cout << "server channel not created: "
            << cmd.m_reason << std::endl;
    }

    // Creates a listener agent for the new client in its own dynamic
    // cooperation (child of the router's cooperation). The channel is
    // remembered only if the registration succeeded; a failure is
    // reported on std::cerr instead of being silently ignored.
    void
    evt_client_connected(
        const so_4::rt::comm::msg_client_connected & cmd )
    {
        std::cout << "client connected: "
            << cmd.m_channel << std::endl;

        std::string agent_name( "a_listener_" +
            cpp_util_2::slexcast( ++m_child_counter ) );

        so_4::rt::dyn_agent_coop_t * coop(
            new so_4::rt::dyn_agent_coop_t(
                new a_listener_t( agent_name ) ) );
        coop->set_parent_coop_name(
            so_query_coop()->query_name() );

        so_4::rt::dyn_agent_coop_helper_t h( coop );
        if( !h.result() )
        {
            m_clients[ cmd.m_channel ] = agent_name;
        }
        else
        {
            std::cerr << "unable to register coop for "
                << agent_name << ": " << h.result() << std::endl;
        }
    }

    // Deregisters the listener cooperation of the disconnected client,
    // if one was registered for that channel.
    void
    evt_client_disconnected(
        const so_4::rt::comm::msg_client_disconnected & cmd )
    {
        std::cout << "client disconnected: "
            << cmd.m_channel << std::endl;

        client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
        if( it != m_clients.end() )
        {
            so_4::api::deregister_coop( it->second );
            m_clients.erase( it );
        }
    }

    // Forwards a raw package to the listener that serves its channel;
    // the listener unblocks the channel after handling the package.
    //
    // If no listener is known for the channel (for example, because its
    // registration failed), nobody else would ever unblock the channel,
    // so the package is reported as dropped and the channel is unblocked
    // here.
    // NOTE(review): assumes unblock_channel() is the right way to resume
    // the channel in this case, mirroring a_listener_t::evt_data - confirm
    // against the SObjectizer-4 documentation.
    void
    evt_channel_data(
        const so_4::rt::comm::msg_raw_package & cmd )
    {
        client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
        if( it != m_clients.end() )
        {
            so_4::api::send_msg_safely( it->second, "msg_data",
                new a_listener_t::msg_data( cmd ) );
        }
        else
        {
            std::cerr << "no listener for channel: " << cmd.m_channel
                << ", dropped: " << cmd.m_package.size()
                << " byte(s)" << std::endl;
            cmd.unblock_channel();
        }
    }
};
// SObjectizer-4 description of a_router_t, written with the SOL4_* macros.
// It lists:
//  - the events evt_start and evt_srv_channel_success, declared without a
//    message type;
//  - the events evt_srv_channel_fail, evt_client_connected,
//    evt_client_disconnected and evt_channel_data, each bound to the
//    so_4::rt::comm message type its handler takes as a parameter;
//  - the state st_normal, which names all six events.
// NOTE(review): the exact run-time meaning of each macro is defined by the
// SObjectizer-4 headers, not by this file - confirm there.
SOL4_CLASS_START( a_router_t )
SOL4_EVENT( evt_start )
SOL4_EVENT( evt_srv_channel_success )
SOL4_EVENT_STC( evt_srv_channel_fail,
so_4::rt::comm::msg_fail )
SOL4_EVENT_STC( evt_client_connected,
so_4::rt::comm::msg_client_connected )
SOL4_EVENT_STC( evt_client_disconnected,
so_4::rt::comm::msg_client_disconnected )
SOL4_EVENT_STC( evt_channel_data,
so_4::rt::comm::msg_raw_package )
SOL4_STATE_START( st_normal )
SOL4_STATE_EVENT( evt_start )
SOL4_STATE_EVENT( evt_srv_channel_success )
SOL4_STATE_EVENT( evt_srv_channel_fail )
SOL4_STATE_EVENT( evt_client_connected )
SOL4_STATE_EVENT( evt_client_disconnected )
SOL4_STATE_EVENT( evt_channel_data )
SOL4_STATE_FINISH()
SOL4_CLASS_FINISH()
// Thread entry point that runs SObjectizer.
//
// Calls so_4::api::start() with an active_obj dispatcher built on a simple
// timer thread (both flagged for automatic destruction) and no initial
// cooperation. A non-zero return code is printed to std::cerr. The thread
// always returns 0.
ACE_THR_FUNC_RETURN
sobjectizer_thread( void * )
{
    so_4::ret_code_t rc(
        so_4::api::start(
            so_4::disp::active_obj::create_disp(
                so_4::timer_thread::simple::create_timer_thread(),
                so_4::auto_destroy_timer ),
            so_4::auto_destroy_disp,
            0 ) );

    if( rc )
    {
        std::cerr << "start: " << rc << std::endl;
    }

    return 0;
}
void
create_coop( const std::string & ip )
{
so_4::rt::dyn_agent_coop_helper_t helper(
new so_4::rt::dyn_agent_coop_t(
new a_router_t( ip ) ) );
}
// Asks SObjectizer to deregister the cooperation named after the router
// agent.
void
destroy_coop()
{
    const std::string & coop_name = router_agent_name;
    so_4::api::deregister_coop( coop_name );
}
// Entry point: sample_parent_insend <server_sock_addr>
//
// Starts SObjectizer on a separate thread and then runs an interactive
// menu on stdin:
//   0 - send msg_normal_shutdown to SObjectizer and quit;
//   1 - create the router cooperation for the given socket address;
//   2 - destroy the router cooperation.
// End of input (or any other stdin failure) is handled as "0", so the
// loop cannot spin forever on a closed stdin and SObjectizer is still
// shut down normally.
//
// Returns 0 on normal completion, -1 on wrong usage or when the
// SObjectizer thread cannot be started.
int
main( int argc, char ** argv )
{
    if( 2 != argc )
    {
        std::cerr << "sample_parent_insend <server_sock_addr>"
            << std::endl;
        return -1;
    }

    // spawn() reports a failure with -1; without the SObjectizer thread
    // there is nothing the menu could talk to.
    if( -1 == ACE_Thread_Manager::instance()->spawn(
            &sobjectizer_thread ) )
    {
        std::cerr << "unable to start SObjectizer thread" << std::endl;
        return -1;
    }

    // NOTE(review): presumably gives SObjectizer time to start before
    // the first command is issued; there is no explicit synchronization
    // with sobjectizer_thread - confirm.
    ACE_OS::sleep( 1 );

    bool is_continue = true;
    while( is_continue )
    {
        std::string choice;

        std::cout << "Choose action:\n"
            "\t0 - quit\n"
            "\t1 - create router coop\n"
            "\t2 - destroy router coop\n> "
            << std::flush;

        // A failed read leaves `choice` empty forever; treat it as quit.
        if( !( std::cin >> choice ) )
            choice = "0";

        if( choice == "0" )
        {
            so_4::api::send_msg(
                so_4::rt::sobjectizer_agent_name(),
                "msg_normal_shutdown", 0,
                so_4::rt::sobjectizer_agent_name() );
            is_continue = false;
        }
        else if( choice == "1" )
            create_coop( argv[ 1 ] );
        else if( choice == "2" )
            destroy_coop();
    }

    // Wait for the SObjectizer thread to finish its shutdown.
    ACE_Thread_Manager::instance()->wait();

    return 0;
}