so_4: Версия 4.4.0. Новая транспортная подсистема

Целью разработки версии 4.4.0 является полный переход от самописных средств поддержки низкоуровневых механизмов ОС (многопоточность, IPC) на средства библиотеки ACE (http://www.cs.wustl.edu/~schmidt/ACE.html). Наиболее существенно этот переход затронул транспортную подсистему SObjectizer.

Транспортная подсистема предыдущих версий SObjectizer (начиная с версии 4.2.4, когда был добавлен унифицированный механизм io_channels и транспортных агентов) была ориентирована на сокеты и TCP/IP, была очень примитивной и не эффективной. Библиотека ACE позволяет реализовать транспортный слой SObjectizer гораздо более эффективно. Поэтому в версии 4.4.0 произведены очень серьезные преобразования транспортной подсистемы: выброшен механизм io_channels, выброшены все старые транспортные агенты. Вместо них в SObjectizer добавлены совершенно новые средства: пространство имен so_4::transport_layer и новые транспортные агенты.

При этом перевод старых приложений на SObjectizer 4.4.0 будет довольно простым, поскольку основные принципы взаимодействия прикладных агентов с транспортными остались прежними.

Рассказ о новой транспортной подсистеме будет состоять из двух частей: сначала рассказывается о новых транспортных агентах, а после этого о деталях реализации IPC механизмов в so_4::transport_layer.

См.так же so_4: Версия 4.4.0. Управление ACE-реакторами для информации о том, как можно управлять ACE-реакторами и привязывать транспортных агентов к реакторам.

Транспортные агенты

В версии 4.4.0 добавлены новые типы агентов для SOP- (so_4::rt::comm::a_sop_outgoing_channel_t, so_4::rt::comm::a_sop_incoming_channel_processor_t) и RAW-каналов (so_4::rt::comm::a_raw_outgoing_channel_t, so_4::rt::comm::a_raw_incoming_channel_processor_t). В действительности эти агенты реализованны на общей базе (класс so_4::rt::comm::a_channel_processor_base_t) и отличаются принципами обработки входящего/исходящего трафика, а так же наборами сообщений и порядком отсылки некоторых сообщений.

Транспортные агенты для исходящих каналов

Для работы с исходящими каналами (т.е. когда клиент подключается к удаленному серверу) предназначены агенты so_4::rt::comm::a_sop_outgoing_channel_t и so_4::rt::comm::a_raw_outgoing_channel_t. Они получают все необходимые параметры в конструкторе и начав работу в SObjectizer (т.е. по сообщению msg_start) предпринимают попытки установления соединения с удаленным сервером. Если попытка удалась, то отсылается сообщения so_4::rt::comm::msg_success и so_4::rt::comm::msg_client_connected. Если попытка не удалась, то отсылается сообщение so_4::rt::comm::msg_fail.

Создание транспортного агента выглядит, например, вот так:

//
// Для RAW-канала.
//

so_4::rt::comm::a_raw_outgoing_channel_t * a_tcp_clnsock =
  new so_4::rt::comm::a_raw_outgoing_channel_t(
    // Имя агента.
    "a_raw_client",
    // connector_controller, который будет устанавливать
    // соединение с удаленным узлом.
    so_4::transport_layer::socket::create_connector_controller( ip_address ),
    // Без обработчика разрывов связи.
    so_4::rt::comm::disconnect_handler_auto_ptr_t() );

//
// Для SOP-канала.
//

so_4::rt::comm::a_sop_outgoing_channel_t * a_sock =
  new so_4::rt::comm::a_sop_outgoing_channel_t(
    // Имя агента.
    "a_sop_client",
    // connector_controller, который будет устанавливать
    // соединение с удаленным узлом.
    so_4::transport_layer::socket::create_connector_controller( ip_address ),
    // Фильтр канала.
    so_4::sop::filter_auto_ptr_t( filter ),
    // Обработчик разрывов связи.
    so_4::rt::comm::create_def_disconnect_handler(
        // Повторять попытки подключения каждые 5 секунд.
        5000,
        // При разрыве связи сразу пытаться восстановить подкючение.
        0 ) );

Транспортному агенту можно запретить предпринимать попытки установления соединения при обработке сообщения msg_start (т.е. когда нужно создать агента, но еще не нужно устанавливать соединение). Для этого следует воспользоваться методами: so_4::rt::comm::a_sop_outgoing_channel_t::set_autoconnect_on_start() и so_4::rt::comm::a_raw_outgoing_channel_t::set_autoconnect_on_start(). Вызывать их нужно перед регистрацией агентов!

so_4::rt::comm::a_sop_outgoing_channel_t * a_sock =
  new so_4::rt::comm::a_sop_outgoing_channel_t( ... );
// Запрет агенту самому устанавливать соединение при старте.
a_sock->set_autoconnect_on_start( false );

При разрыве соединения агент отсылает сообщение so_4::rt::comm::msg_client_disconnected. Если агенту назначен обработчик разрывов связи, то этот обработчик задействуется для восстановления связи.

Инициировать установление соединения можно посредством сообщения so_4::rt::comm::msg_connect. Принудительно закрыть установленное соединение можно посредством сообщения so_4::rt::comm::msg_close_channel.

RAW-каналы отсылают прочитанные ими данные посредством сообщений so_4::rt::comm::msg_raw_package. Для отсылки данных в RAW-канал транспортному агенту нужно отослать сообщение so_4::rt::comm::msg_send_package.

Заметки:
Для SOP-канала сообщение msg_client_connected может придти значительно позже сообщения msg_success. Происходит это потому, что клиент обменивается с сервером специальными сообщениями -- выполняют процедуру handshake. Только по ее успешному завершению SOP-канал считается созданным и инициируется сообщение msg_client_connected. Если же handshake завершается неудачно или вообще обрывается из-за ошибок ввода-вывода, то сообщения msg_client_connected после msg_success вообще не будет.

Транспортные агенты для входящих каналов

Для работы с входящими каналами (т.е. когда сервер принимает подключения удаленных клиентов) предназначены агенты so_4::rt::comm::a_sop_incoming_channel_processor_t и so_4::rt::comm::a_raw_incoming_channel_processor_t. Название channel_processor говорит о том, что один агент обслуживает всех подключившихся клиентов.

Данные агенты получают все необходимые параметры в конструкторе и начав работу в SObjectizer (т.е. по сообщению msg_start) создают серверную IPC-точку доступа (например, для случая TCP/IP -- серверный сокет, переведенный в "слушающее" состояние). Если создание серверной IPC-точки проходит успешно, то отсылается сообщение so_4::rt::comm::msg_success, после чего агенты начинают принимать входящие подключения. Если попытка создания IPC-точки не удалась, то отсылается сообщение so_4::rt::comm::msg_fail после чего агент переходит в состояние st_failed и ничего больше не делает.

Создание транспортного агента выглядит, например, вот так:

//
// Для RAW-каналов.
//
so_4::rt::comm::a_raw_incoming_channel_processor_t * a_tcp_srvsock =
  new so_4::rt::comm::a_raw_incoming_channel_processor_t(
    // Имя агента.
    "a_raw_server",
    // acceptor_connector, который будет создавать IPC-точку
    // и принимать новые подключения.
    so_4::transport_layer::socket::create_acceptor_controller(
        ip_address ) );

//
// Для SOP-каналов.
//
so_4::rt::comm::a_sop_incoming_channel_processor_t * a_channel =
  new so_4::rt::comm::a_sop_incoming_channel_processor_t(
    // Имя агента.
    "a_sop_server",
    // acceptor_connector, который будет создавать IPC-точку
    // и принимать новые подключения.
    so_4::transport_layer::socket::create_acceptor_controller( ip_address ) );

Если SOP-каналам нужен серверный фильтр, то он должен быть передан в конструкторе so_4::rt::comm::a_sop_incoming_channel_processor_t:

so_4::rt::comm::a_sop_incoming_channel_processor_t * a_channel =
  new so_4::rt::comm::a_sop_incoming_channel_processor_t(
    "a_sop_server",
    so_4::transport_layer::socket::create_acceptor_controller( ip_address ),
    so_4::sop::filter_auto_ptr_t( filter ) );

При принятии очередного подключения агенты отсылают сообщение so_4::rt::comm::msg_client_connected. При отключении клиента -- сообщение so_4::rt::comm::msg_client_disconnected. Принудительно закрыть клиента можно с помощью сообщения so_4::rt::comm::msg_close_channel.

RAW-каналы отсылают прочитанные ими данные посредством сообщений so_4::rt::comm::msg_raw_package. Для отсылки данных в RAW-канал транспортному агенту нужно отослать сообщение so_4::rt::comm::msg_send_package. При этом нужно помнить, что в случае агента a_raw_incoming_channel_processor_t один агент обслуживает весь трафик всех клиентов.

Пороги ввода/вывода

В отличии от предыдущих версий в версии 4.4.0 осталось только понятие порога входящих данных. Эта величина описывается типом so_4::transport_layer::threshold_t и указывает количество пакетов или их суммарный объем, после которого чтение из канала нужно прекратить. Канал, который превысил порог входящих данных считается заблокированным. Признак блокированности канала передается в сообщении so_4::rt::comm::msg_raw_package.

Если канал находится в заблокированном состоянии слишком долго, он принудительно закрывается. Поэтому нужно не забывать выполнять разблокирование канала при получении сообщения msg_raw_package (например, обращаясь к методу so_4::rt::comm::msg_raw_package::unblock_channel()).

Компрессия SOP-трафика

Предыдущие версии SObjectizer выполняли автоматическую компрессию SOP-трафика с помощью библиотеки zlib. Отключить ее было невозможно.

Как показали замеры производительности транспортного слоя в версии 4.4.0 архивация трафика приводит к существенному (до 2-х раз) снижению пропускной способности на быстрых каналах (например, при работе приложений на одной машине или в быстрой локальной сети). Поэтому в версии 4.4.0 компрессия SOP-трафика по умолчанию отключена.

Однако, если SObjectizer-приложения работают на территориально удаленных друг от друга компьютерах, соединенных медленным каналом, компрессия SOP-трафика может оказаться удобным решением.

Для того, чтобы можно было управлять компрессией SOP-трафика в версии 4.4.0 был добавлен класс so_4::rt::comm::handshaking_params_t, который позволяет задать параметры процедуры handshake. Процедура handshake выполняется автоматически при подключении SOP-клиента к SOP-серверу. Во время этой процедуры стороны договариваются о различных параметрах будущего SOP-соединения.

В версии 4.4.0 единственным параметром, который обговаривается во время процедуры handshake является режим компрессии SOP-трафика. Если на обоих сторонах компрессия разрешена, то SOP-трафик будет сжиматься посредством библиотеки zlib. Если же хотя бы одна сторона отказывается от компрессии, то SOP-трафик будет идти в неархивированном виде.

Для того, чтобы включить компрессию, необходимо:

Например:

so_4::rt::comm::a_sop_outgoing_channel_t a_channel(
  "a_channel",
  so_4::transport_layer::socket::create_connector_controller(
      cfg.m_ip ),
  so_4::sop::filter_auto_ptr_t( filter ),
  so_4::rt::comm::create_def_disconnect_handler( 1000, 0 ) );

a_channel.set_handshaking_params(
    so_4::rt::comm::handshaking_params_t().enable_compression() );

Параметры каналов

В отличии от предыдущих версий в версии 4.4.0 появилась возможность управлять некоторыми параметрами транспортных каналов. Например, можно установить размеры буферов чтения/записи (т.е. максимальное количество байт пересылаемых/получаемых за одну операцию send/recv), максимальный объем буфера исходящих данных, порог входящих данных, максимальные времена блокированности канала и отсутствия готовности канала к операции send.

Все эти значения задаются в объекте so_4::transport_layer::channel_params_t. Если этот параметр не задан, то SObjectizer использует собственные значения по умолчанию.

Например, вот так можно указать, что канал может быть заблокированным не более 5 секунд, а операция записи должна выполняться блоками по 64K:

so_4::rt::comm::a_raw_outgoing_channel_t * a_tcp_clnsock =
  new so_4::rt::comm::a_raw_outgoing_channel_t(
    "a_tcp_client",
    so_4::transport_layer::socket::create_connector_controller(
      so_4::transport_layer::socket::connector_params( ip_address ),
      so_4::transport_layer::channel_params_t().
        set_max_input_block_timeout( 5000 ).
        set_output_portion_size( 64*1024 ) )
    so_4::rt::comm::disconnect_handler_auto_ptr_t() );

Детали реализации транспортного слоя

ACE предлагает большой набор средств для организации IPC. Сюда входят классы для поддержки конкретных типов IPC (например, такие как ACE_SOCK_Stream), классы для организации реактивной (ACE_Event_Handler и ACE_Reactor) и проактивной обработки событий ввода-вывода (ACE_Handler и ACE_Proactor), а так же наиболее высокоуровневые ACE_Acceptor/ACE_Connector и ACE_Asynch_Acceptor/ACE_Async_Connector.

В SObjectizer предпринята попытка абстрагировать транспортных агентов от типа транспорта (TCP/IP, пайпы, разделяемая память и т.д.) и способов выполнения операций ввода-вывода (синхронных и асинхронных). Для этого в so_4::transport_layer определяются следующие основные интерфейсы:

Интерфейсы connector_controller_t и acceptor_controller_t являются отображением в SObjectizer ACE-овского подхода по выделению Connector-ов и Acceptor-ов в отдельные обобщенные классы. Реализации этих интефейсов отвечают за создание конкретных реализаций интерфейса channel_controller_t. Т.е. в SObjectizer для каждого типа транспорта и способа реализации ввода/вывода существует своя реализация упомянутых выше интерфейсов.

Благодоря этому транспортный агент должен получить в конструкторе объект connector_controller (для исходящих каналов) или acceptor_controller (для входящих каналов), а уже выбор конкретных реализаций этих объектов лежит на пользователе. Так, если требуется использовать TCP/IP, то пользователю следует воспользоваться одной из функций в so_4::transport_layer::socket для создания подходящей реализации connector/acceptor_controller-а.

Транспортный агент для исходящих соединений при необходимости выполнения подключения к удаленному узлу у connector_controller-а вызывается метод so_4::transport_layer::connector_controller_t::connect().В результате чего агенту отсылается либо so_4::transport_layer::msg_channel_created (если соединение установлено), либо so_4::transport_layer::msg_channel_failed (если соединение не установлено).

Транспортный агент для входящих соединений при получении сообщения so_4::rt::msg_start вызывает у своего acceptor_controller-а метод so_4::transport_layer::acceptor_controller_t::create(). После чего acceptor_controller выполняет свою работу и отсылает транспортному агенту сообщения so_4::transport_layer::msg_channel_created при подключении очередного клиента.

В сообщении so_4::transport_layer::msg_channel_created транспортному агенту передается указатель на конкретную реализацию channel_controller. Агент должен сохранить этот указатель у себя. При необходимости закрыть соединение агент должен просто вызвать delete для данного указателя. Если же соединение закрывается по каким-то внешним причинам (диагностируется разрыв связи, к примеру), то транспортному агенту отсылается сообщение so_4::transport_layer::msg_channel_lost.

Контексты, на которых выполняются операции ввода вывода

В предыдущих версиях SObjectizer транспортные агенты выполняли операции ввода-вывода в своих событиях. Поэтому транспортные агенты было разумно делать активными агентами или членами активных групп, чтобы ввод-вывод не сказывался на других агентах. Новая транспортная подсистема использует классы ACE_Reactor и ACE_Event_Handler для того, чтобы отслеживать готовность каналов к чтению/записи. Это означает, что методы ACE_Event_Handler::handle_input/handle_output выполняются на нити некоторого ACE_Reactor-а. И операции ввода/вывода так же выполняются на этой же нити.

С операциями чтения все просто -- они всегда осуществляются в handle_input. Но с операциями записи ситуация сложнее. Когда транспортный агент получает сообщение so_4::rt::comm::msg_send_package, он сразу пытается выполнить запись в канал на контексте той нити, на которой его запустил диспетчер. Если же запись была выполнена не полностью, то остаток данных помещается в буфер исходящих данных и ACE_Reactor-у дается указание определить момент готовности канала к записи. Если ACE_Reactor-у удается это определить, он вызывает метод handle_output, но на контексте собственной нити. Т.е. операция записи может выполняться либо на конктексте обработчика агента, либо на контексте ACE_Reactor-а, обслуживающего данный канал.

Следствиями из такого способа работы являются:


Документация по SObjectizer v.4.4 'Тебуломста'. Последние изменения: Thu Sep 18 10:26:48 2008. Создано системой  doxygen1.5.6 Intervale SourceForge.net Logo