sample/filter/c2.cpp

/*
  Клиент #2.

  Получает запросы сервера и отсылает серверу ответы.

  Демонстрирует возможность запуска SObjectizer-а на вспомогательной
  нити и проведения диалога с пользователем на контексте главной нити.
*/

#include <iostream>
#include <memory>

#include <ace/OS.h>
#include <ace/Thread_Manager.h>

#include <so_4/rt/h/rt.hpp>
#include <so_4/rt/h/msg_auto_ptr.hpp>

#include <so_4/rt/comm/h/a_sop_outgoing_channel.hpp>
#include <so_4/transport_layer/socket/h/pub.hpp>

#include <so_4/api/h/api.hpp>

#include <so_4/api/h/api.hpp>

#include <so_4/timer_thread/simple/h/pub.hpp>
#include <so_4/disp/active_obj/h/pub.hpp>

#include "c2i.hpp"
#include "date_time_formatter.hpp"

//
// Класс агента клиента #2.
//
class a_cln_t :
  public so_4::rt::agent_t
{
  typedef so_4::rt::agent_t base_type_t;
  public :
    a_cln_t();
    virtual ~a_cln_t();

    virtual const char *
    so_query_type() const;

    virtual void
    so_on_subscription();

    // Имя единственного агента этого типа в приложении.
    static const std::string &
    agent_name();

    // Реакция на появление агента в системе.
    void
    evt_start();

    // Реакция на запрос сервера.
    void
    evt_server_request(
      const c2i_t::msg_request * );
};

SOL4_CLASS_START( a_cln_t )

  SOL4_EVENT( evt_start )

  SOL4_EVENT_STC(
    evt_server_request,
    c2i_t::msg_request )

  SOL4_STATE_START( st_normal )
    SOL4_STATE_EVENT( evt_start )
    SOL4_STATE_EVENT( evt_server_request )
  SOL4_STATE_FINISH()

SOL4_CLASS_FINISH()

a_cln_t::a_cln_t()
  :
    base_type_t( agent_name().c_str() )
{
  // Делаем агента активным объектом.
  so_add_traits( so_4::disp::active_obj::
    query_active_obj_traits() );
}

a_cln_t::~a_cln_t()
{
}

void
a_cln_t::so_on_subscription()
{
  // Подписываем те события, инциденты для которых
  // в системе уже существуют.

  so_subscribe( "evt_start",
    so_4::rt::sobjectizer_agent_name(), "msg_start" );

}

const std::string &
a_cln_t::agent_name()
{
  static std::string name( "a_cln_t" );
  return name;
}

void
a_cln_t::evt_start()
{
  // Агент стартовал.

  // Нужно зарегистрировать глобального агента и
  // подписаться на его сообщение.
  so_4::ret_code_t rc = so_4::api::make_global_agent(
    c2i_t::agent_name(),
    c2i_t::agent_type() );
  if( rc )
    std::cerr << rc << std::endl;
  else
    so_subscribe( "evt_server_request",
      c2i_t::agent_name(), "msg_request" );
}

void
a_cln_t::evt_server_request(
  const c2i_t::msg_request * cmd )
{
  ACE_Date_Time now;
  std::cout << "Request from server: received at: "
      << format( now ) << std::endl;

  std::auto_ptr< c2i_t::msg_reply > reply(
      new c2i_t::msg_reply(
          cmd->m_act,
          now ) );
  so_4::api::send_msg_safely( c2i_t::agent_name(),
    "msg_reply", reply );
}

// Создание кооперации, в которую входит главный агент
// клиента #2 и коммуникационный агент (клиентский сокет).
void
create_coop(
  // Адрес серверного сокета.
  const char * sock_addr )
{
  a_cln_t * a_cln = new a_cln_t();

  // Создание клиентского сокета для взаимодействия по SOP.
  // Фильтр, который допускает только сообщения агента c2i.
  std::auto_ptr< so_4::sop::std_filter_t > filter(
    so_4::sop::create_std_filter() );
  filter->insert( c2i_t::agent_name() );

  so_4::rt::comm::a_sop_outgoing_channel_t * a_sock =
    new so_4::rt::comm::a_sop_outgoing_channel_t(
      "a_sock",
      so_4::transport_layer::socket::create_connector_controller(
          sock_addr ),
      // Назначаем фильтр.
      so_4::sop::filter_auto_ptr_t( filter ),
      // Назначаем обработчик разрывов связи.
      so_4::rt::comm::create_def_disconnect_handler( 5000, 0 ) );
  // Сокет так же будет активным агентом.
  so_4::disp::active_obj::make_active( *a_sock );

  so_4::rt::agent_t * agents[] =
  {
    a_cln, a_sock
  };
  so_4::rt::dyn_agent_coop_helper_t coop_helper(
    new so_4::rt::dyn_agent_coop_t(
      "client_coop", agents,
      sizeof( agents ) / sizeof( agents[ 0 ] ) ) );

  if( coop_helper.result() )
    std::cerr << "register_coop:\n"
      << coop_helper.result() << std::endl;
}

// Уничтожение клиентской кооперации.
void
destroy_coop()
{
  so_4::ret_code_t rc =
    so_4::api::deregister_coop( "client_coop" );
  if( rc )
    std::cerr << "deregister_coop:\n" << rc << std::endl;
}

//
// Нить, на которой будет происходить запуск SObjectizer-а
//
ACE_THR_FUNC_RETURN
sobjectizer_thread( void * )
{
  std::auto_ptr< so_4::timer_thread::timer_thread_t >
    timer_ptr( so_4::timer_thread::simple::create_timer_thread() );

  std::auto_ptr< so_4::rt::dispatcher_t >
    disp_ptr( so_4::disp::active_obj::create_disp( *timer_ptr ) );

  so_4::ret_code_t rc = so_4::api::start( *disp_ptr, 0 );
  if( rc )
  {
    std::cerr << "start: " << rc << std::endl;
  }

  return 0;
}

int
main( int argc, char ** argv )
{
  if( 2 == argc )
  {
    ACE_Thread_Manager::instance()->spawn( &sobjectizer_thread );

    // Засыпаем, чтобы дать стартовать SObjectizer.
    // Это самый простой способ синхронизации с sobj_thread_t.
    ACE_OS::sleep( 1 );

    bool is_continue = true;
    while( is_continue )
    {
      std::string choice;

      std::cout << "Choose action:\n"
        "\t0 - quit\n"
        "\t1 - create client coop\n"
        "\t2 - destroy client coop\n> "
        << std::flush;

      std::cin >> choice;

      if( choice == "0" )
      {
        // Завершаем работу.
        so_4::api::send_msg(
          so_4::rt::sobjectizer_agent_name(),
          "msg_normal_shutdown", 0,
          so_4::rt::sobjectizer_agent_name() );
        is_continue = false;
      }
      else if( choice == "1" )
        // Создаем кооперацию клиента.
        create_coop( argv[ 1 ] );
      else if( choice == "2" )
        // Уничтожаем кооперацию клиента.
        destroy_coop();
    }

    // Ожидаем завершения SObjectizer.
    ACE_Thread_Manager::instance()->wait();

    return 0;
  }
  else
  {
    std::cerr << "sample_filter_c2 <server_sock_addr>"
      << std::endl;

    return -1;
  }
}


Документация по SObjectizer v.4.4 'Тебуломста'. Последние изменения: Thu Sep 18 10:26:45 2008. Создано системой  doxygen1.5.6 Intervale SourceForge.net Logo