sample/parent_insend/main.cpp

/*
  Демонстрация insend-событий и взаимоотношений коопераций.

  Пример работает в интерактивном режиме. Оператор указывает
  момент регистрации и дерегистрации кооперации маршрутизатора.
  Агент-маршрутизатор в своем событии evt_start создает
  дочернюю кооперацию с агентом серверного сокета.

  Далее агент-маршрутизатор отслеживает сообщения о подключениях
  новых клиентов. Для каждого нового клиента создается агент-обработчик.
  Все приходящие от клиента данные пересылаются маршрутизатором
  агентам-обработчикам.

  Агент серверного сокета и агенты-обработчики являются активными
  агентами. Агент-маршрутизатор является пассивным агентом, но
  он использует insend-события.
*/

#include <iostream>
#include <map>

#include <ace/OS.h>
#include <ace/Thread_Manager.h>

#include <cpp_util_2/h/lexcast.hpp>

#include <so_4/api/h/api.hpp>
#include <so_4/rt/h/rt.hpp>

#include <so_4/rt/comm/h/a_raw_incoming_channel_processor.hpp>
#include <so_4/transport_layer/socket/h/pub.hpp>

#include <so_4/timer_thread/simple/h/pub.hpp>
#include <so_4/disp/active_obj/h/pub.hpp>

// Имя агента-маршрутизатора и его кооперации.
const std::string router_agent_name( "a_router" );
// Имя агента-серверного сокета.
const std::string server_agent_name( "a_server" );

// Класс агента, который обслуживает конкретный канал.
class a_listener_t
: public so_4::rt::agent_t
{
  typedef so_4::rt::agent_t base_type_t;
  public :
    a_listener_t( const std::string & agent_name )
    : base_type_t( agent_name )
    {
      so_4::disp::active_obj::make_active( *this );

      std::cout << so_query_name() << " created" << std::endl;
    }
    virtual ~a_listener_t()
    {
      std::cout << so_query_name() << " destroyed" << std::endl;
    }

    // Владеет сообщением msg_data, тип которого уже определен.
    typedef so_4::rt::comm::msg_raw_package msg_data;

    virtual const char *
    so_query_type() const;

    void
    so_on_subscription()
    {
      so_subscribe( "evt_data", "msg_data" );
    }

    void
    evt_data( const msg_data & cmd )
    {
      std::cout << "agent: " << so_query_name()
        << ", channel: " << cmd.m_channel
        << ", received: " << cmd.m_package.size()
        << " byte(s)" << std::endl;

      cmd.unblock_channel();
    }
};

SOL4_CLASS_START( a_listener_t )

  SOL4_MSG_START( msg_data, a_listener_t::msg_data )
    SOL4_MSG_CHECKER( a_listener_t::msg_data::check )
  SOL4_MSG_FINISH()

  SOL4_EVENT_STC( evt_data, a_listener_t::msg_data )

  SOL4_STATE_START( st_normal )
    SOL4_STATE_EVENT( evt_data )
  SOL4_STATE_FINISH()

SOL4_CLASS_FINISH()

// Класс агента-маршрутизатора.
class a_router_t
: public so_4::rt::agent_t
{
  typedef so_4::rt::agent_t base_type_t;
  private :
    // Кооперация с агентом серверного сокета является
    // атрибутом агента-маршрутизатора. При этом кооперация
    // регистрируется как статическая, а не динамическая. Т.е. она не
    // будет уничтожена через delete при дерегистрации.
    so_4::rt::comm::a_raw_incoming_channel_processor_t  m_srv_channel;
    so_4::rt::agent_coop_t  m_srv_channel_coop;

    // Тип карты подключенных клиентов.
    typedef std::map< so_4::rt::comm_channel_t, std::string >
      client_map_t;

    // Карта подключенных клиентов.
    // Доступ к ней не синхронизируется, т.к. маршрутизатор работает
    // только на контексте нити агента серверного сокета.
    client_map_t  m_clients;

    // Счетчик для создания уникальных имен дочерних агентов.
    int m_child_counter;

    // Вспомогательный метод для упрощения hook-а подписки
    // на сообщения агента серверного сокета.
    void
    setup_subscr_hook(
      const std::string & event,
      const std::string & msg )
    {
      so_4::rt::def_subscr_hook( m_srv_channel_coop,
        *this, event, m_srv_channel, msg, 0, &std::cerr,
        so_4::rt::evt_subscr_t::insend_dispatching );
    }

  public :
    a_router_t(
      const std::string & ip )
    : base_type_t( router_agent_name )
    , m_srv_channel(
        server_agent_name,
        so_4::transport_layer::socket::create_acceptor_controller( ip ) )
    , m_srv_channel_coop( m_srv_channel )
    , m_child_counter( 0 )
    {
      // Серверный агент должен быть активным агентом.
      so_4::disp::active_obj::make_active( m_srv_channel );

      std::cout << so_query_name() << " created" << std::endl;
    }
    virtual ~a_router_t()
    {
      std::cout << so_query_name() << " destroyed" << std::endl;
    }

    virtual const char *
    so_query_type() const;

    virtual void
    so_on_subscription()
    {
      so_subscribe( "evt_start",
        so_4::rt::sobjectizer_agent_name(), "msg_start" );
    }

    void
    evt_start()
    {
      // Регистрируем кооперацию с серверным сокетом.
      // И подписываемся на сообщения агента серверного сокета.
      setup_subscr_hook( "evt_srv_channel_success", "msg_success" );
      setup_subscr_hook( "evt_srv_channel_fail", "msg_fail" );
      setup_subscr_hook( "evt_client_connected",
        "msg_client_connected" );
      setup_subscr_hook( "evt_client_disconnected",
        "msg_client_disconnected" );
      setup_subscr_hook( "evt_channel_data",
        "msg_raw_package" );

      // Регистрируем кооперацию.
      // Мы должны быть родителями для кооперации.
      m_srv_channel_coop.set_parent_coop_name(
        so_query_coop()->query_name() );
      so_4::api::register_coop( m_srv_channel_coop );
    }

    void
    evt_srv_channel_success()
    {
      std::cout << "server channel created" << std::endl;
    }

    void
    evt_srv_channel_fail(
      const so_4::rt::comm::msg_fail & cmd )
    {
      std::cout << "server channel not created: "
        << cmd.m_reason << std::endl;
    }

    void
    evt_client_connected(
      const so_4::rt::comm::msg_client_connected & cmd )
    {
      std::cout << "client connected: "
        << cmd.m_channel << std::endl;

      // Создаем для этого канала новую кооперацию.
      std::string agent_name( "a_listener_" +
        cpp_util_2::slexcast( ++m_child_counter ) );
      so_4::rt::dyn_agent_coop_t * coop(
        new so_4::rt::dyn_agent_coop_t(
          new a_listener_t( agent_name ) ) );
      // Указываем, что мы являемся родителями этой кооперации.
      coop->set_parent_coop_name(
        so_query_coop()->query_name() );

      // Регистрируем новую кооперацию.
      so_4::rt::dyn_agent_coop_helper_t h( coop );
      if( !h.result() )
      {
        m_clients[ cmd.m_channel ] = agent_name;
      }
    }

    void
    evt_client_disconnected(
      const so_4::rt::comm::msg_client_disconnected & cmd )
    {
      std::cout << "client disconnected: "
        << cmd.m_channel << std::endl;

      client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
      if( it != m_clients.end() )
      {
        // Такой клиент нам известен. Нужно уничтожить прикладного
        // агента для этого клиента.
        so_4::api::deregister_coop( it->second );
        m_clients.erase( it );
      }
    }

    void
    evt_channel_data(
      const so_4::rt::comm::msg_raw_package & cmd )
    {
      // Определяем, какому агенту нужно переслать эти данные.
      client_map_t::iterator it( m_clients.find( cmd.m_channel ) );
      if( it != m_clients.end() )
      {
        so_4::api::send_msg_safely( it->second, "msg_data",
          new a_listener_t::msg_data( cmd ) );
      }
    }
};

SOL4_CLASS_START( a_router_t )

  SOL4_EVENT( evt_start )
  SOL4_EVENT( evt_srv_channel_success )
  SOL4_EVENT_STC( evt_srv_channel_fail,
    so_4::rt::comm::msg_fail )
  SOL4_EVENT_STC( evt_client_connected,
    so_4::rt::comm::msg_client_connected )
  SOL4_EVENT_STC( evt_client_disconnected,
    so_4::rt::comm::msg_client_disconnected )
  SOL4_EVENT_STC( evt_channel_data,
    so_4::rt::comm::msg_raw_package )

  SOL4_STATE_START( st_normal )
    SOL4_STATE_EVENT( evt_start )
    SOL4_STATE_EVENT( evt_srv_channel_success )
    SOL4_STATE_EVENT( evt_srv_channel_fail )
    SOL4_STATE_EVENT( evt_client_connected )
    SOL4_STATE_EVENT( evt_client_disconnected )
    SOL4_STATE_EVENT( evt_channel_data )
  SOL4_STATE_FINISH()

SOL4_CLASS_FINISH()

//
// Нить, на которой будет происходить запуск SObjectizer-а
//
ACE_THR_FUNC_RETURN
sobjectizer_thread( void * )
{
  so_4::ret_code_t rc = so_4::api::start(
    // Диспетчер будет уничтожен при выходе из start().
    so_4::disp::active_obj::create_disp(
      // Таймер будет уничтожен диспетчером.
      so_4::timer_thread::simple::create_timer_thread(),
      so_4::auto_destroy_timer ),
    so_4::auto_destroy_disp, 0 );
  if( rc )
  {
    std::cerr << "start: " << rc << std::endl;
  }

  return 0;
}

void
create_coop( const std::string & ip )
{
  so_4::rt::dyn_agent_coop_helper_t helper(
    new so_4::rt::dyn_agent_coop_t(
      new a_router_t( ip ) ) );
}

void
destroy_coop()
{
  so_4::api::deregister_coop( router_agent_name );
}

int
main( int argc, char ** argv )
{
  if( 2 == argc )
  {
    ACE_Thread_Manager::instance()->spawn( &sobjectizer_thread );

    // Засыпаем, чтобы дать стартовать SObjectizer.
    // Это самый простой способ синхронизации с sobj_thread_t.
    ACE_OS::sleep( 1 );

    bool is_continue = true;
    while( is_continue )
    {
      std::string choice;

      std::cout << "Choose action:\n"
        "\t0 - quit\n"
        "\t1 - create router coop\n"
        "\t2 - destroy router coop\n> "
        << std::flush;

      std::cin >> choice;

      if( choice == "0" )
      {
        // Завершаем работу.
        so_4::api::send_msg(
          so_4::rt::sobjectizer_agent_name(),
          "msg_normal_shutdown", 0,
          so_4::rt::sobjectizer_agent_name() );
        is_continue = false;
      }
      else if( choice == "1" )
        // Создаем кооперацию клиента.
        create_coop( argv[ 1 ] );
      else if( choice == "2" )
        // Уничтожаем кооперацию клиента.
        destroy_coop();
    }

    // Ожидаем завершения SObjectizer.
    ACE_Thread_Manager::instance()->wait();

    return 0;
  }
  else
  {
    std::cerr << "sample_parent_insend <server_sock_addr>"
      << std::endl;

    return -1;
  }
}


Документация по SObjectizer v.4.4 'Тебуломста'. Последние изменения: Thu Sep 18 10:26:45 2008. Создано системой  doxygen1.5.6 Intervale SourceForge.net Logo