sample/high_traffic/client.cpp

/*
  Тестирование поведения коммуникационных агентов при
  большом трафике.

  Клиентская часть.
*/

#include <iostream>
#include <set>

#include <stdio.h>

#include <ace/OS_main.h>

#include <so_4/api/h/api.hpp>
#include <so_4/rt/h/rt.hpp>

#include <so_4/rt/comm/h/a_sop_outgoing_channel.hpp>
#include <so_4/transport_layer/socket/h/pub.hpp>

#include <so_4/timer_thread/simple/h/pub.hpp>
#include <so_4/disp/active_obj/h/pub.hpp>

#include "common.cpp"

// Класс тестового агента, который отсылает сообщения.
class a_sender_t
  : public so_4::rt::agent_t
{
  typedef so_4::rt::agent_t base_type_t;
  private :
    // Размер одного запроса.
    unsigned int  m_data_size;

    // Количество запросов, которые нужно отослать на сервер.
    unsigned int  m_request_count;
    // Количество полученных от сервера ответов.
    unsigned int  m_reply_received;

    // Количество запросов в группе.
    unsigned int  m_group_size;
    // Тайм-аут между отсылкой групп.
    unsigned int  m_timeout;

    // Счетчик идентификаторов для отсылаемых запросов.
    unsigned int  m_uid;

    // Множество идентификаторов отосланных запросов,
    // на которые еще не были полученны ответы.
    std::set< unsigned int >  m_no_reply_uids;

    // Признак того, что есть соединение с сервером.
    bool  m_is_connected;

    void
    show_stat() const
    {
      double percents = double( m_reply_received ) /
        double( m_request_count ) * 100.0;
      std::cout << "*** "
        << ( m_is_connected ? "connected" : "not connected" )
        << ", " << percents << "% ("
        << m_no_reply_uids.size() << ")"
        << std::endl;
    }

    void
    send( unsigned int uid,
      bool insert_to_no_reply_uids = true )
    {
      so_4::api::send_msg_safely(
        a_common_t::agent_name(),
        "msg_request",
        new a_common_t::msg_request( uid, m_data_size ) );
      if( insert_to_no_reply_uids )
        m_no_reply_uids.insert( uid );
    }

  public :
    a_sender_t(
      unsigned int data_size,
      unsigned int request_count,
      unsigned int group_size,
      unsigned int timeout )
    : base_type_t( "a_receiver" )
    , m_data_size( data_size )
    , m_request_count( request_count )
    , m_reply_received( 0 )
    , m_group_size( group_size )
    , m_timeout( timeout )
    , m_uid( 0 )
    , m_is_connected( false )
    {
    }
    virtual ~a_sender_t()
    {}

    struct  msg_timeout {};

    virtual const char *
    so_query_type() const;

    virtual void
    so_on_subscription()
    {
      // Агент a_common должен быть глобальным.
      so_4::api::make_global_agent(
        a_common_t::agent_name(),
        a_common_t::agent_type() );

      so_subscribe( "evt_start",
        so_4::rt::sobjectizer_agent_name(), "msg_start" );

      so_subscribe( "evt_client_connected",
        so_4::rt::comm::communicator_agent_name(),
        "msg_client_connected" );

      so_subscribe( "evt_client_disconnected",
        so_4::rt::comm::communicator_agent_name(),
        "msg_client_disconnected" );

      so_subscribe( "evt_reply",
        a_common_t::agent_name(),
        "msg_reply" );

      so_subscribe( "evt_timeout", "msg_timeout" );
    }

    void
    evt_start(
      const so_4::rt::event_data_t & )
    {
      so_4::api::send_msg( so_query_name(), "msg_timeout", 0,
        so_query_name(), 1000, 1000 * m_timeout );
    }

    void
    evt_client_connected(
      const so_4::rt::event_data_t & data,
      const so_4::rt::comm::msg_client_connected * cmd )
    {
      m_is_connected = true;
      show_stat();
    }

    void
    evt_client_disconnected(
      const so_4::rt::event_data_t & data,
      const so_4::rt::comm::msg_client_disconnected * cmd )
    {
      m_is_connected = false;
      show_stat();
    }

    void
    evt_reply(
      const so_4::rt::event_data_t &,
      const a_common_t::msg_reply * cmd )
    {
      if( m_no_reply_uids.find( cmd->m_uid ) !=
        m_no_reply_uids.end() )
      {
        m_no_reply_uids.erase( cmd->m_uid );
        ++m_reply_received;
//        show_stat();

        if( m_reply_received == m_request_count )
        {
          so_4::api::send_msg(
            so_4::rt::sobjectizer_agent_name(),
            "msg_normal_shutdown", 0 );
        }
      }
    }

    void
    evt_timeout(
      const so_4::rt::event_data_t & )
    {
      unsigned int sent = 0;

      show_stat();

      // Сначала отсылаем повторно те запросы, на которые не получено
      // ответов.
      for( std::set< unsigned int >::iterator
          it = m_no_reply_uids.begin(),
          it_end = m_no_reply_uids.end();
        it != it_end && sent != m_group_size;
        ++it,
        ++sent )
      {
        send( *it, false );
      }

      // Затем доведем группу до нужного размера новыми запросами.
      for( unsigned int count = 0;
        sent + count != m_group_size &&
        m_uid != m_request_count;
        ++count, ++m_uid )
      {
        send( m_uid );
      }
    }
};

SOL4_CLASS_START( a_sender_t )

  SOL4_MSG_START( msg_timeout, a_sender_t::msg_timeout )
  SOL4_MSG_FINISH()

  SOL4_EVENT( evt_start )
  SOL4_EVENT_WITH_INCIDENT_TYPE(
    evt_client_connected,
    so_4::rt::comm::msg_client_connected )
  SOL4_EVENT_WITH_INCIDENT_TYPE(
    evt_client_disconnected,
    so_4::rt::comm::msg_client_disconnected )
  SOL4_EVENT_WITH_INCIDENT_TYPE(
    evt_reply,
    a_common_t::msg_reply )
  SOL4_EVENT( evt_timeout )

  SOL4_STATE_START( st_normal )
    SOL4_STATE_EVENT( evt_start )
    SOL4_STATE_EVENT( evt_client_connected )
    SOL4_STATE_EVENT( evt_client_disconnected )
    SOL4_STATE_EVENT( evt_reply )
    SOL4_STATE_EVENT( evt_timeout )
  SOL4_STATE_FINISH()

SOL4_CLASS_FINISH()

int
main( int argc, char ** argv )
{
  if( 6 == argc )
  {
    unsigned int data_size = 0;
    sscanf( argv[ 2 ], "%u", &data_size );

    unsigned int count = 0;
    sscanf( argv[ 3 ], "%u", &count );

    unsigned int group_size = 0;
    sscanf( argv[ 4 ], "%u", &group_size );

    unsigned int timeout = 0;
    sscanf( argv[ 5 ], "%u", &timeout );


    so_4::sop::std_filter_t * filter =
      so_4::sop::create_std_filter();
    filter->insert( a_common_t::agent_name() );

    so_4::rt::comm::a_sop_outgoing_channel_t a_channel(
      "a_channel",
      so_4::transport_layer::socket::create_connector_controller(
          so_4::transport_layer::socket::connector_params(
              argv[ 1 ] ),
          so_4::transport_layer::channel_params_t().
              set_output_portion_size( 100 * 1024 ).
              set_max_output_buffer_size( 600 * 1024 ) ),
      so_4::sop::filter_auto_ptr_t( filter ),
      so_4::rt::comm::create_def_disconnect_handler( 5000, 0 ) );
    a_channel.so_add_traits(
      so_4::disp::active_obj::query_active_obj_traits() );

    a_sender_t a_sender(
      data_size, count, group_size, timeout );

    so_4::rt::agent_t * agents[] =
    {
      &a_channel, &a_sender
    };
    so_4::rt::agent_coop_t coop( "server",
      agents, sizeof( agents ) / sizeof( agents[ 0 ] ) );

    so_4::ret_code_t rc = so_4::api::start(
      // Диспетчер будет уничтожен при выходе из start().
      so_4::disp::active_obj::create_disp(
        // Таймер будет уничтожен диспетчером.
        so_4::timer_thread::simple::create_timer_thread(),
        so_4::auto_destroy_timer ),
      so_4::auto_destroy_disp,
      &coop );

    if( rc )
    {
      std::cerr << "start: " << rc << std::endl;
    }

    return int( rc );
  }

  std::cerr << "sample_high_traffic_client <[ip]:port> "
    "data_size count group_size timeout" << std::endl;
  return 1;
}

Документация по SObjectizer v.4.4 'Тебуломста'. Последние изменения: Thu Sep 18 10:26:45 2008. Создано системой  doxygen1.5.6 Intervale SourceForge.net Logo