ping_srv.cpp
#include <iostream>
#include <string>
#include <sstream>
#include <time.h>
#include <ace/OS_main.h>
#include <sample/mbapi_4/ping/h/messages.hpp>
namespace sample_mbapi_4
{
namespace
{
// Returns the current local time as text in asctime() layout
// ("Www Mmm dd hh:mm:ss yyyy\n"); the trailing newline is part of the
// result and callers print the value without adding their own.
//
// If the C library cannot convert the current time, a placeholder line
// (also newline-terminated) is returned instead of dereferencing a null
// pointer.
//
// NOTE: localtime() and asctime() hand out pointers to storage owned by the
// C library, so concurrent calls from several threads are not safe.
std::string
now_to_str()
{
	const time_t t = time( nullptr );

	// localtime() reports a failed conversion with a null pointer.
	const struct tm * const local = localtime( &t );
	const char * const text = local ? asctime( local ) : nullptr;

	return text ? std::string( text ) : std::string( "<unknown time>\n" );
}
}
// Main agent of the ping server sample: every ping it receives is printed
// to std::cout and answered with a pong sent back to the ping's sender.
class a_main_t
:
{
public :
// Passes env to the base class and remembers self_endpoint in
// m_self_endpoint; the endpoint bind itself is created later, in
// so_define_agent().
a_main_t(
:
base_type_t( env ),
m_self_endpoint( self_endpoint )
{}
virtual ~a_main_t()
{}
// Creates the endpoint bind and subscribes evt_ping (defined out of line).
virtual void
so_define_agent();
// Ping handler: prints the ping's number, timestamp, sender and receiver,
// then replies to the sender with a pong that carries the same number and
// a timestamp taken from now_to_str().
void
evt_ping(
{
// m_timestamp is streamed without a separator after it; presumably the
// value already ends with a newline, as now_to_str() results do.
std::cout << "ping #" << msg->msg().m_num << "\n"
<< msg->msg().m_timestamp
<< "from: " << msg->from().name() << "\n"
<< "to: " << msg->to().name() << "\n\n";
std::unique_ptr< pong_t > pong( new pong_t );
pong->m_timestamp = now_to_str();
pong->m_num = msg->msg().m_num;
// The pong is moved into send(); the destination is the ping's origin.
m_self_endpoint_bind->send(
msg->from(),
std::move( pong ) );
}
private:
// Endpoint bind assigned in so_define_agent(); used above to send pongs.
m_self_endpoint_bind;
};
// Creates the endpoint bind for m_self_endpoint (with this agent as the
// second argument), stores it in m_self_endpoint_bind, and subscribes
// evt_ping on that bind for the agent's default state.
void
a_main_t::so_define_agent()
{
m_self_endpoint_bind = so_environment()
->create_endpoint_bind( m_self_endpoint, *this );
m_self_endpoint_bind->subscribe_event(
so_default_state(),
&a_main_t::evt_ping );
}
// Start-up parameters of the ping server together with the routines that
// register its agents. main() fills the two strings from the command line.
struct server_data_t
{
// Address handed to acceptor_creator.create(); taken from argv[1]
// (shown as <ip:port> in the usage text).
std::string m_server_addr;
// Name of the server's own endpoint; taken from argv[2].
std::string m_self_endpoint;
// Builds the server transport agent around an acceptor created for
// m_server_addr, adds the incoming mbapi channel agent to the cooperation,
// and throws std::runtime_error when rc is non-zero.
void
register_channel_coop(
{
"active_obj" ) );
acceptor_creator( env );
std::unique_ptr< a_server_transport_agent_t > ta(
new a_server_transport_agent_t(
env,
acceptor_creator.
create( m_server_addr ) ) );
a_mbapi_incoming_channel(
env,
ta->query_notificator_mbox(),
coop->add_agent( a_mbapi_incoming_channel );
// A non-zero rc is reported on std::cerr and turned into an exception.
if( 0 != rc )
{
std::cerr << "reg coop ret_code = " << rc << "\n\n";
throw std::runtime_error( "channel coop not registered" );
}
}
// Adds the repository returned by ping_t::get_repository().
void
add_ping_messages_oess_repository(
{
->add_repository( ping_t::get_repository() );
}
// Start-up routine: registers the channel cooperation, adds the message
// repository, registers the a_main_t agent, and then blocks reading words
// from std::cin until "quit" is entered. Throws std::runtime_error when rc
// is non-zero.
void
{
register_channel_coop( env );
add_ping_messages_oess_repository( env );
new a_main_t(
env,
coop->add_agent( a_main );
// NOTE(review): the exception text is the same as in
// register_channel_coop() although the agent added here is a_main;
// confirm whether that wording is intended.
if( 0 != rc )
{
std::cerr << "reg coop ret_code = " << rc << "\n\n";
throw std::runtime_error( "channel coop not registered" );
}
else
{
std::cout << "Type \"quit\" to quit." << std::endl;
std::string cmd;
// Words other than "quit" are read and ignored.
do
{
std::cin >> cmd;
} while( cmd != "quit" );
}
}
};
}
// Entry point of the ping server.
//
// Expects exactly two arguments, <ip:port> and <self-endpoint>. With any
// other argument count the usage line is printed to std::cerr.
//
// Returns 1 when a std::exception is caught (its what() text is printed to
// std::cerr) and 0 otherwise, including the usage case.
int
main( int argc, char ** argv )
{
try
{
// argc counts the program name, so 3 means two user arguments.
if( 3 == argc )
{
sample_mbapi_4::server_data_t server_data;
server_data.m_server_addr = argv[ 1 ];
server_data.m_self_endpoint = argv[ 2 ];
// server_data and server_data_t::init are passed to the start-up call,
// along with a named dispatcher and two layers
// (so_5_transport::reactor_layer_t and mbapi_4::mbapi_layer_t).
server_data,
&sample_mbapi_4::server_data_t::init,
.add_named_dispatcher(
.add_layer(
std::unique_ptr< so_5_transport::reactor_layer_t >(
.add_layer(
std::unique_ptr< mbapi_4::mbapi_layer_t >(
}
else
std::cerr << "sample.mbapi_4.ping.srv "
"<ip:port> <self-endpoint>" << std::endl;
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}
ping_cln.cpp
#include <iostream>
#include <string>
#include <sstream>
#include <time.h>
#include <sample/mbapi_4/ping/h/messages.hpp>
namespace sample_mbapi_4
{
namespace
{
// Returns the current local time as text in asctime() layout
// ("Www Mmm dd hh:mm:ss yyyy\n"); the trailing newline is part of the
// result and callers print the value without adding their own.
//
// If the C library cannot convert the current time, a placeholder line
// (also newline-terminated) is returned instead of dereferencing a null
// pointer.
//
// NOTE: localtime() and asctime() hand out pointers to storage owned by the
// C library, so concurrent calls from several threads are not safe.
std::string
now_to_str()
{
	const time_t t = time( nullptr );

	// localtime() reports a failed conversion with a null pointer.
	const struct tm * const local = localtime( &t );
	const char * const text = local ? asctime( local ) : nullptr;

	return text ? std::string( text ) : std::string( "<unknown time>\n" );
}
}
// Main agent of the ping client sample: sends numbered pings to
// m_target_endpoint from evt_send(), prints the pongs it receives, and
// answers any ping addressed to it with a pong.
class a_main_t
:
{
public :
// Passes env to the base class, remembers both endpoint values, starts the
// ping counter at 0 and creates a local mbox (m_mbox) that
// so_define_agent() subscribes evt_send to.
a_main_t(
:
base_type_t( env ),
m_self_endpoint( self_endpoint ),
m_target_endpoint( target_endpoint ),
m_num_counter( 0 ),
m_mbox( so_environment().create_local_mbox() )
{}
virtual ~a_main_t()
{}
// Creates the endpoint bind and sets up all subscriptions (defined out of
// line).
virtual void
so_define_agent();
// Schedules the tick timer on m_mbox (defined out of line).
virtual void
so_evt_start();
// Ping handler: prints the ping's number, timestamp, sender and receiver,
// then replies to the sender with a pong that carries the same number and
// a timestamp taken from now_to_str().
void
evt_ping(
{
// m_timestamp is streamed without a separator after it; presumably the
// value already ends with a newline, as now_to_str() results do.
std::cout << "ping #" << msg->msg().m_num << "\n"
<< msg->msg().m_timestamp
<< "from: " << msg->from().name() << "\n"
<< "to: " << msg->to().name() << "\n\n";
std::unique_ptr< pong_t > pong( new pong_t );
pong->m_timestamp = now_to_str();
pong->m_num = msg->msg().m_num;
m_self_endpoint_bind->send(
msg->from(),
std::move( pong ) );
}
// Pong handler: prints the pong's number, timestamp, sender and receiver.
// Nothing is sent in response.
void
evt_pong(
{
std::cout << "pong #" << msg->msg().m_num << "\n"
<< msg->msg().m_timestamp
<< "from: " << msg->from().name() << "\n"
<< "to: " << msg->to().name() << "\n\n";
}
// Empty type used as the template argument of schedule_timer() in
// so_evt_start(); presumably the timer message that triggers evt_send().
struct tick
:
{};
// Sends one ping to m_target_endpoint. The counter is pre-incremented, so
// the first ping carries number 1; the timestamp comes from now_to_str().
void
evt_send(
{
std::unique_ptr< ping_t > ping( new ping_t );
ping->m_timestamp = now_to_str();
ping->m_num = ++m_num_counter;
m_self_endpoint_bind->send(
m_target_endpoint,
std::move( ping ) );
}
private:
// Endpoint bind assigned in so_define_agent(); used to send pings and pongs.
m_self_endpoint_bind;
// Number of the most recently sent ping (0 before the first one).
unsigned int m_num_counter;
};
// Creates the endpoint bind for m_self_endpoint and sets up three
// subscriptions: evt_send on the local mbox m_mbox, and evt_ping and
// evt_pong on the endpoint bind for the agent's default state.
void
a_main_t::so_define_agent()
{
m_self_endpoint_bind = so_environment()
->create_endpoint_bind( m_self_endpoint, *this );
// m_mbox is the mbox the tick timer is scheduled on in so_evt_start().
so_subscribe( m_mbox )
.event( &a_main_t::evt_send );
m_self_endpoint_bind->subscribe_event(
so_default_state(),
&a_main_t::evt_ping );
m_self_endpoint_bind->subscribe_event(
so_default_state(),
&a_main_t::evt_pong );
}
// Schedules the a_main_t::tick timer on m_mbox and keeps the returned timer
// id in m_tick_timer_id.
void
a_main_t::so_evt_start()
{
	// Second and third schedule_timer() arguments; presumably the initial
	// delay and the repeat period in milliseconds (1 s and 5 s) - TODO
	// confirm against the schedule_timer() declaration.
	const int first_timer_arg = 1 * 1000;
	const int second_timer_arg = 5 * 1000;

	m_tick_timer_id = so_environment().schedule_timer< a_main_t::tick >(
		m_mbox, first_timer_arg, second_timer_arg );
}
// Start-up parameters of the ping client together with the routines that
// register its agents. main() fills the three strings from the command line.
struct client_data_t
{
// Address handed to connector_creator.create(); taken from argv[1]
// (shown as <ip:port> in the usage text).
std::string m_server_addr;
// Name of the client's own endpoint; taken from argv[2].
std::string m_self_endpoint;
// Name of the endpoint the pings are sent to; taken from argv[3].
std::string m_target_endpoint;
// Builds the client transport agent around a connector created for
// m_server_addr, adds the outgoing mbapi channel agent to the cooperation,
// and throws std::runtime_error when rc is non-zero.
void
register_channel_coop(
{
"active_obj" ) );
connector_creator( env );
std::unique_ptr< a_client_transport_agent_t > ta(
new a_client_transport_agent_t(
env,
connector_creator.
create( m_server_addr ) ) );
a_mbapi_outgoing_channel(
env,
ta->query_notificator_mbox(),
coop->add_agent( a_mbapi_outgoing_channel );
// A non-zero rc is reported on std::cerr and turned into an exception.
if( 0 != rc )
{
std::cerr << "reg coop ret_code = " << rc << "\n\n";
throw std::runtime_error( "channel coop not registered" );
}
}
// Adds the repository returned by ping_t::get_repository().
void
add_ping_messages_oess_repository(
{
->add_repository( ping_t::get_repository() );
}
// Start-up routine: registers the channel cooperation, adds the message
// repository, registers the a_main_t agent, and then blocks reading words
// from std::cin until "quit" is entered. Throws std::runtime_error when rc
// is non-zero.
void
{
register_channel_coop( env );
add_ping_messages_oess_repository( env );
new a_main_t(
env,
coop->add_agent( a_main );
// NOTE(review): the exception text is the same as in
// register_channel_coop() although the agent added here is a_main;
// confirm whether that wording is intended.
if( 0 != rc )
{
std::cerr << "reg coop ret_code = " << rc << "\n\n";
throw std::runtime_error( "channel coop not registered" );
}
else
{
std::cout << "Type \"quit\" to quit." << std::endl;
std::string cmd;
// Words other than "quit" are read and ignored.
do
{
std::cin >> cmd;
} while( cmd != "quit" );
}
}
};
}
// Entry point of the ping client.
//
// Expects exactly three arguments: <ip:port>, <self-endpoint> and
// <target-endpoint>. With any other argument count the usage line is
// printed to std::cerr.
//
// Returns 1 when a std::exception is caught (its what() text is printed to
// std::cerr) and 0 otherwise, including the usage case.
int
main( int argc, char ** argv )
{
try
{
// argc counts the program name, so 4 means three user arguments.
if( 4 == argc )
{
sample_mbapi_4::client_data_t client_data;
client_data.m_server_addr = argv[ 1 ];
client_data.m_self_endpoint = argv[ 2 ];
client_data.m_target_endpoint = argv[ 3 ];
// client_data and client_data_t::init are passed to the start-up call,
// along with a named dispatcher and two layers
// (so_5_transport::reactor_layer_t and mbapi_4::mbapi_layer_t).
client_data,
&sample_mbapi_4::client_data_t::init,
.add_named_dispatcher(
.add_layer(
std::unique_ptr< so_5_transport::reactor_layer_t >(
.add_layer(
std::unique_ptr< mbapi_4::mbapi_layer_t >(
}
else
std::cerr << "sample.mbapi_4.ping.cln "
"<ip:port> <self-endpoint> <target-endpoint>" << std::endl;
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}
ping_proxy.cpp
#include <iostream>
#include <string>
#include <cpp_util_2/h/lexcast.hpp>
namespace sample_mbapi_4
{
// Ordered list of channel addresses; main() appends one entry for every
// -s / -c option pair it parses.
using addr_list_t = std::vector< std::string >;
// Start-up parameters of the ping proxy together with the routines that
// register one channel cooperation per configured address.
struct proxy_data_t
{
// Addresses given with -s; each one gets a server (acceptor) channel.
addr_list_t m_server_addrs;
// Addresses given with -c; each one gets a client (connector) channel.
addr_list_t m_client_addrs;
// Builds a server transport agent around an acceptor created for addr,
// adds the incoming mbapi channel agent to the cooperation, and throws
// std::runtime_error when rc is non-zero.
void
register_server_channel_coop(
const std::string & coop_name,
const std::string & addr )
{
"active_obj" ) );
acceptor_creator( env );
std::unique_ptr< a_server_transport_agent_t > ta(
new a_server_transport_agent_t(
env,
acceptor_creator.
create( addr ) ) );
a_mbapi_incoming_channel(
env,
ta->query_notificator_mbox(),
coop->add_agent( a_mbapi_incoming_channel );
// A non-zero rc is reported on std::cerr and turned into an exception.
if( 0 != rc )
{
std::cerr << "reg coop ret_code = " << rc << "\n\n";
throw std::runtime_error( "channel coop not registered" );
}
}
// Builds a client transport agent around a connector created for addr,
// adds the outgoing mbapi channel agent to the cooperation, and throws
// std::runtime_error when rc is non-zero.
void
register_client_channel_coop(
const std::string & coop_name,
const std::string & addr )
{
"active_obj" ) );
connector_creator( env );
std::unique_ptr< a_client_transport_agent_t > ta(
new a_client_transport_agent_t(
env,
connector_creator.
create( addr ) ) );
a_mbapi_outgoing_channel(
env,
ta->query_notificator_mbox(),
coop->add_agent( a_mbapi_outgoing_channel );
// A non-zero rc is reported on std::cerr and turned into an exception.
if( 0 != rc )
{
std::cerr << "reg coop ret_code = " << rc << "\n\n";
throw std::runtime_error( "channel coop not registered" );
}
}
// Start-up routine: registers a server channel cooperation for every entry
// of m_server_addrs and a client channel cooperation for every entry of
// m_client_addrs, then blocks reading words from std::cin until "quit" is
// entered.
void
{
// Cooperation names are made unique by appending the address index.
for(
size_t i = 0,
n = m_server_addrs.size();
++i )
{
register_server_channel_coop(
env,
"server_channel_coop_" + cpp_util_2::slexcast( i ),
m_server_addrs[ i ] );
}
for(
size_t i = 0, n = m_client_addrs.size();
++i )
{
register_client_channel_coop(
env,
"client_channel_coop_" + cpp_util_2::slexcast( i ),
m_client_addrs[ i ] );
}
std::cout << "Proxy started.\n"
"Type \"quit\" to quit." << std::endl;
std::string cmd;
// Words other than "quit" are read and ignored.
do
{
std::cin >> cmd;
} while( cmd != "quit" );
}
};
}
// Command-line switch whose following argument is stored as a server
// channel address.
const std::string g_server_arg( "-s" );
// Command-line switch whose following argument is stored as a client
// channel address.
const std::string g_client_arg( "-c" );
// Entry point of the ping proxy.
//
// The command line is read as a sequence of option/value pairs: "-s <addr>"
// adds a server channel address, "-c <addr>" adds a client channel address.
// The proxy starts only when every argument was consumed as part of such a
// pair; otherwise the usage text is printed to std::cerr. Running with no
// arguments at all also starts the proxy, with both address lists empty.
//
// Returns 1 when a std::exception is caught (its what() text is printed to
// std::cerr) and 0 otherwise, including the usage case.
int
main( int argc, char ** argv )
{
try
{
sample_mbapi_4::proxy_data_t proxy_data;
// Index 0 is the program name, so parsing starts at 1.
// NOTE(review): option_index is size_t while argc is int, so the
// comparisons with argc below mix unsigned and signed operands.
size_t option_index = 1;
// Keep going while both an option and its value are still available.
while( option_index + 1 < argc )
{
// char* == std::string compares the text, not the pointers.
if( argv[ option_index ] == g_server_arg )
proxy_data.m_server_addrs.push_back( argv[ option_index + 1 ] );
else if( argv[ option_index ] == g_client_arg )
proxy_data.m_client_addrs.push_back( argv[ option_index + 1 ] );
else
break;
option_index += 2;
}
// Equality holds only if no unknown option stopped the loop and no option
// was left without its value.
if( option_index == argc )
{
// proxy_data and proxy_data_t::init are passed to the start-up call,
// along with a named dispatcher and two layers
// (so_5_transport::reactor_layer_t and mbapi_4::mbapi_layer_t).
proxy_data,
&sample_mbapi_4::proxy_data_t::init,
.add_named_dispatcher(
.add_layer(
std::unique_ptr< so_5_transport::reactor_layer_t >(
.add_layer(
std::unique_ptr< mbapi_4::mbapi_layer_t >(
}
else
std::cerr << "sample.mbapi_4.ping.proxy [OPTIONS]\n"
"OPTIONS:\n"
"\t-s <ip:port>\t server channel param\n"
"\t-c <ip:port>\t client channel param\n";
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}
messages.hpp
#if !defined( _SAMPLE__MBAPI_4__PING__MESSAGES_HPP_ )
#define _SAMPLE__MBAPI_4__PING__MESSAGES_HPP_
#include <string>
#include <oess_2/defs/h/types.hpp>
#include <oess_2/stdsn/h/serializable.hpp>
namespace sample_mbapi_4
{
// Ping message exchanged by the sample. It is an oess_2 serializable type;
// the same two attributes are listed for sample_mbapi_4::ping_t in
// messages.ddl, so the members here and the DDL entry have to stay in step.
struct ping_t
:
public oess_2::stdsn::serializable_t
{
OESS_SERIALIZER( ping_t )
// Explicit access specifier after the macro; presumably the macro expansion
// changes the current access level - TODO confirm.
public:
// Text timestamp; the client fills it from now_to_str() before sending.
std::string m_timestamp;
// Sequence number of the ping; the client takes it from its ping counter.
oess_2::uint_t m_num;
};
// Pong message sent in reply to a ping. It is an oess_2 serializable type;
// the same two attributes are listed for sample_mbapi_4::pong_t in
// messages.ddl, so the members here and the DDL entry have to stay in step.
struct pong_t
:
public oess_2::stdsn::serializable_t
{
OESS_SERIALIZER( pong_t )
// Explicit access specifier after the macro; presumably the macro expansion
// changes the current access level - TODO confirm.
public:
// Text timestamp; the replying agent fills it from now_to_str().
std::string m_timestamp;
// Number copied from the ping that is being answered.
oess_2::uint_t m_num;
};
}
#endif
messages.cpp
#include <oess_2/stdsn/h/serializable.hpp>
#include <oess_2/stdsn/h/ent_std.hpp>
#include <oess_2/stdsn/h/inout_templ.hpp>
#include <oess_2/stdsn/h/shptr.hpp>
#include <sample/mbapi_4/ping/h/messages.hpp>
#include "generated.ddl.cpp"
messages.ddl
{type sample_mbapi_4::ping_t
{attr m_timestamp
{of std::string}
}
{attr m_num
{of oess_2::uint_t}
}
}
{type sample_mbapi_4::pong_t
{attr m_timestamp
{of std::string}
}
{attr m_num
{of oess_2::uint_t}
}
}