SObjectizer  5.1

Оглавление

Стандартный транспорт SObjectizer Версия 1.1.0.

Введение

Sobjectizer 5-ого поколения сам по себе не имеет штатного транспорта, как было в 4-ой версии SObjectizer. Начиная с 5-ой версии транспорт предоставляется в виде отдельной библиотеки: so_5_transport.

Библиотека so_5_transport содержит набор классов, посредством которых можно создавать серверные и клиентские каналы, а также осуществлять из них чтение данных и запись. С ее помощью можно создавать сетевые приложения на SObjectizer, с которым интегрирована библиотека so_5.

Заметки
Текущая реализация so_5_transport реализована только для socket-ов, возможно впоследствии добавятся другие реализации.

Принцип использования so_5_transport состоит в том, что пользователь создает собственный агент обработчик сырых бинарных данных на базе готового серверного или клиентского агентов (см. Агент a_server_base_t и Агент a_client_base_t), в зависимости от типа соединения, которое нужно реализовать пользователю. Этот пользовательский агент-обработчик выполняет роль трансформатора бинарных данных прикладного протокола в сущности прикладной логики SObjectizer приложения и обратную трансформацию для отправки бинарных данных удаленной стороне, в соответствии с реализуемым протоколом. Вместе с пользовательским агентом-обработчиком в состав коммуникационной кооперации должен входить транспортный агент (см. Агент a_server_transport_agent_t, и Агент a_client_transport_agent_t) Который отвечает за низкоуровневую часть организации коммуникационного канала. Такая связка транспортного и канального агента, позволяет создавать коммуникационные каналы поддерживаюшие заданный проткол и использовать их для разработки распределенных приложений.

Если за прикладную часть обработки бинарных данных канала отвечает пользователь, то so_5_transport отвечает за то чтобы эти данные доставить до точки в зоне ответственности пользователя. Для этого пользователю предлагается набор интерфейсов и фабрики их реализаций с различными настройками, от которых зависят характеристики канала, такие как:

Первые две характеристики не зависят от конкретной реализации и определяются параметрами блочного буфера входящих и исходящих данных (см. Блочный буфер), В зависимости от характера трафика одни настройки могут оказаться более эффективными по сравнению с другими в одних условиях и – хуже в других.

Блочный буфер

Обоснование

Работа типичного сетевого приложение предполагает, что данные будут периодически зачитываться из канал и записываться в него. На низком уровне чтение значит, что данный из канала нужно записать в некоторый буфер, данные которого затем будут обработаны, а запись означает, что данные из некоторого буфера будут записаны в канал по средствам низкоуровневых функций ОС. Т.е. с одной стороны есть постоянная потребность в буфере для чтения и буфере для записи, а с другой стороны асинхронная природа SObjectizer. Для того чтобы была возможно читать данные из канала и записывать их на одно нити, а сериализовывать прикладные данные в буфер на другой нити приложения при этом обеспечивая последовательность данных и синхронизацию доступа к ним служит класс scattered_block_stream_t.

Класс содержит пул блоков-буферов. Из этого пула можно брать блоки писать в них и вставлять в поток. Из потока блоков можно брать блоки и зачитывать из них данные, после чего возвращать блоки в пул, а непрочитанные блоки в начало потока.

Функциональные возможности класса

Класс scattered_block_stream_t представляет собой низкоуровневый интерфейс для работы с набором буферов. Он комбинирует в себе возможности менеджера памяти, который позволяет выделять блоки и освобождать их, и потока данных, в который можно добавлять блоки, в которые были записаны данные, и извлекать блоки из потока.

Блоки реализованы с помощью класса raw_block_t, который является оберткой над непрерывным массивом байт заданной длины. Функциональность, которую предоставляет raw_block_t, позволяет использовать его для записи и чтения бинарных данных.

Выделение и освобождение блоков

Размер блоков raw_block_t, которыми оперирует scattered_block_stream_t, а также минимальное и максимальное количество выделяемых блоков задается в конструкторе.

Изначально scattered_block_stream_t выделяет себе минимальное количество оперативных блоков-буферов. Оперативные блоки – это готовые экземпляры raw_block_t, которые можно выделить сразу по требованию. Если все оперативные блоки закончились, например пользователь их взял или они все вставлены в поток, а пользователь обращается за очередным блоком, то scattered_block_stream_t выделяет себе еще ряд оперативных блоков, которые могут быть выданы пользователю. При этом общее количество блоков обращаемых в данном экземпляре scattered_block_stream_t не должно превышать максимального допустимого числа блоков, которое было задано в конструкторе. Такая стратегия выделения блоков, позволяет пережить кратковременные всплески объемов передаваемых или принимаемых данных. Если трафик впоследствии уменьшиться, то, при накоплении достаточного количества оперативных блоков – больше минимального, они будут уничтожаться. Это позволяет вернуть излишне выделенные ресурсы в тот момент, когда они не понадобятся сразу же после этого, т.к. создавать новые блоки понадобится не раньше чем пользователю будут выданы все оперативные блоки.

Для выделения блоков служит метод scattered_block_stream_t::occupy(), который имеет несколько версий, которые позволяют получать сразу цепочку блоков (raw_block_chain_t) и варьировать интервал времени в течении, которого можно ожидать появления оперативных блоков, которые и будут выделены.

Заметки
Если выделить блок невозможно, то scattered_block_stream_t::occupy() не выбрасывает исключение, а возвращает nullptr, либо оставляет цепочку пустой.

Для освобождения блоков служит метод scattered_block_stream_t::release(), который также имеет несколько версий, которые позволяют освобождать один блок или сразу цепочку блоков.

Всю заботу о реальной работе с памятью берут на себя описанные выше методы.

Работа с потоком

Поток представляет собой последовательность блоков, в каждом из которых есть какое-то количество записанных байт и эти байты выстроенные подряд представляют собой непрерывный поток бинарных данных. Такой подход служит базой для реализации channel_input_stream_t и channel_output_stream_t.

Все методы работы с потоком имеют имя stream_*:

Уведомители событий потока

Слушатели операций над потоком необходимы для организации асинхронной работы с scattered_block_stream_params_t, чтобы учесть нюансы возникающие со следующими событиями:

Нюансы связаны с внутренней реализацией so_5_transport, когда на такие события надо реагировать причем по разному для исходящих и входящих потоков данных. При работе с потоком, всегда кто-то пишет данные данные, а кто-то их читает. И scattered_block_stream_t разрабатывался для того, чтобы было возможно организовать работу пользователя-писателя и пользователя слушателя на разных нитях. Механизм уведомителей понадобился как раз для согласованной работы пользователя-писателя и пользователя читателя.

Уведомитель flush_notifier_t служит для информирования о том, что пользователь-писатель сделал сброс данных потока, и информирования о том что пользователь-читатель вычитал все имеющиеся на данный момент данные. Это позволяет организовать уведомление читателя, что можно читать данные, и больше такое уведомление не отправлять при последующем сбросе потока, пока читатель не вычитает все данные. При этом запись данных может продолжаться одновременно с чтением, при этом добавление новых данных в поток не будет приводить к уведомлению, т.к. читатель должен читать пока не обнаружит, что данных в потоке нет (scattered_block_stream_t::check_stream_has_blocks()).

Уведомитель free_blocks_availability_notifier_t служит для информирования о том, что пользователь-писатель исчерпал все возможные блоки и очередной запрос на выделение блоков оказался неудачным, а также о том что пользователь-читатель освободил некоторое количество блоков, которые теперь можно выделить для записи. Этот уведомитель играет важную роль играет важную роль при блокировке канала, когда, например, данные в сокете есть, но пользователь-читатель не успевает их разбирать, в результате, когда свободных блоков не остается, то канал на чтение данных из сокета и запись их в блочный буфер блокируется и разблокируется, когда блоков становится достаточно. Какое количество блоков достаточно, решает сам уведомитель.

Уведомитель stream_operation_notifier_t служит для информирования о том, что пользователь-писатель вставил блоки в начало или в конец потока, а также о том, что пользователь-читатель взял блоки из потока для чтения.

Механизм транзакций

Обоснование

При работе с При работе с потокам ввода/вывода, данные которых формируются при помощи сетевых соединений необходимо принимать во внимание следующие 2 условия.

Решение проблем, связанных с обозначенными выше условиями, берет на себя механизм транзакций, который является штатным способом для осуществления записи в канал и чтения данных из канала со стороны пользовательского кода при использовании so_5_transport.

Типы транзакций:

Сами экземпляры *_trx_t создаются обернутыми в std::unique_ptr. Уничтожение объекта транзакции, если она не была явно подтверждена или отменена, приводит к отмене транзакции.

Транзакция чтения данных

С началом транзакции input_trx_t можно вести чтение данных по средством потока oess_2::io::ibinstream_t библиотеки oess_2, для доступа к которому служит метод input_trx_t::istream(). В ходе чтения данных можно ставить контрольные точки, и продолжать транзакцию зафиксировав вычитанные из потока к данному моменту данные. Для этого служит метод input_trx_t::checkpoint(). Завершить транзакцию можно либо подтвердив – input_trx_t::commit(), либо отменив ее – input_trx_t::rollback(). При подтверждении транзакции вычитанные данные фиксируются. При отмене все вычитанные данные с момента последней контрольной точки будут возвращены обратно в начало потока входящих данных канала.

Важным вспомогательным методом является input_trx_t::stream_size(), который показывает сколько байт доступно для чтения в рамках текущей транзакции в данный момент. По мере считывание это число уменьшается. Это, например, позволяет оценить возможно ли считать заголовок некоторого сообщения бинарного протокола и самого сообщения:

a_sample_channel_processor_t::handle_incoming_data()
{
// Начинаем транзакцию чтения.
in_trx = m_channel_io.begin_input_trx();
// Пока можем считать заголовок, будем пытаться обработать данные.
while(
some_protocol::message_header_t::BIN_IMAGE_SIZE <= in_trx->stream_size() )
{
some_protocol::message_header_t header;
in_trx->istream() >> header;
// Если можно вычитать и само сообщение,
// то считываем и обрабатываем его.
if( header.m_message_length <= in_trx->stream_size() )
{
oess_2::io::isubbinstream_t msg_binstream(
in_trx->istream(),
header.m_message_length );
read_and_handle_message( header, msg_binstream );
// Подтверждаем зачитанный заголовок и само сообщение.
in_trx->checkpoint();
}
else
{
// Если само сообщение вычитать пока нельзя,
// то отменяем транзакцию, чтобы считанный
// заголовок снова оказался в начале потока.
in_trx->rollback();
}
}
}

Транзакция записи данных

С началом транзакции output_trx_t можно вести запись данных по средством потока oess_2::io::obinstream_t библиотеки oess_2, для доступа к которому служит метод output_trx_t::ostream(). В ходе чтения данных можно ставить контрольные точки, и продолжать транзакцию зафиксировав записанные в поток данные. Для этого служит метод output_trx_t::checkpoint(). Завершить транзакцию можно либо подтвердив – output_trx_t::commit(), либо отменив ее – output_trx_t::rollback(). При подтверждении транзакции записанные данные фиксируются. При отмене все записанные данные с момента последней контрольной точки будут потеряны и в исходящий поток данных канала не поступят.

Важным вспомогательным методом является output_trx_t::stream_size(), который показывает сколько байт записано для чтения в рамках текущей транзакции в данный момент, но не подтверждено, т.е. количество записанных байт с момента последней контрольной точки. Такая оценка полезна для более рациональной записи данных. Например:

a_sample_channel_processor_t::handle_outgoing_data(
const some_protocol::message_package_t & pkg )
{
// Начинаем транзакцию записи.
out_trx = m_channel_io.begin_output_trx();
std::for_each(
pkg.m_messages.begin(),
pkg.m_messages.end(),
[&out_trx ]( some_protocol::message_t & msg ){
out_trx->ostream() << msg;
// Если данных больше 1Kb, то фиксируем записанное.
if( 1024 < out_trx->stream_size() )
out_trx->checkpoint();
} );
// Фиксируем записанное.
out_trx->commit();
}

Контроллер канала

При установлении соединения с удаленной стороной, необходимо иметь возможность контролировать это соединение. Как минимум надо иметь возможность закрыть его. Кроме того при установлении соединения может понадобиться, чтобы операции вычитывания данных из канала не происходила, пока не произведены необходимые приготовления.

Для решения этих задач служит контролер канала – channel_controller_t.

Для закрытия канала служит метод channel_controller_t::close(). А для разрешения вычитывать данные из канала – channel_controller_t::enforce_input_detection().

Кроме того контролер канала предоставляет информация об адресах обоих сторон подключения: channel_controller_t::local_address_as_string() и channel_controller_t::remote_address_as_string().

Интеграция с паттернами Reactor и Acceptor-Connector и их реализацией в библиотеке ACE

Обоснование

SObjectizer написан с помощью библиотеки ACE. ACE предоставляет различные кроссплатформенные обертки над низкоуровневыми API различных ОС, в частности наиболее важными являются:

Т.к. API для сетевого программирования в различных ОС может сильно отличаться, было бы нерационально писать собственные кроссплатформенные обертки, проверить работоспособность которых представляется крайне затруднительным. Тогда встает вопрос, какую open-source библиотеку выбрать для использования в разработке транспорта SObjectizer. Этот вопрос был решен в пользу ACE, хотя существует и ряд других не менее хороших open-source библиотек – например, Boost и Qt. Эта библиотека доказала свою применимость на практике и, более того, уже используется в SObjectizer, поэтому и была выбрана в качестве низкоуровневого каркаса для реализации сетевого слоя.

Для организации сетевых соединений на данный момент используется комбинация двух следующих паттернов:

Интеграция с каркасом ACE_Reactor

Вспомогательные классы для работы с реатоками ACE находятся в пространстве имен so_5_transport::ace.

Для создания готовых реакторов можно воспользоваться следующими функциями:

Для того чтобы реакторы были доступны из SObjectizer Environment и работать с ними можно было вплоть до его завершения в so_5_transport добавлен слой reactor_layer_t позволяющий организовать работу сразу нескольких реакторов. Этот слой всегда предоставляет реактор по умолчанию, который создается по средством ace::make_select_reactor(). Добавление дополнительных реакторов происходит при помощи reactor_layer_params_t, с помощью которого можно задать дополнительные именованные реакторы.

В итоге получение указателя на действующий реактор происходит следующим образом:

// Реактор по умолчанию.
ACE_Reactor * reactor =
so_env.query_layer< reactor_layer_t >()->query_default_reactor();
// Именованный реактор.
ACE_Reactor * reactor =
so_env.query_layer< reactor_layer_t >()->query_named_reactor(
reactor_name );

Интеграция с каркасом Acceptor-Connector

Обертками над Acceptor-ом и Connector-ом являются классы acceptor_controller_t и connector_controller_t соответственно. Классы являются лишь интерфейсами для установки Acceptor-а Connector-а на каркас ACE_Reactor. Конкретные настройки для конкретной реализации возлагаются на классы наследники.

Главным методом acceptor_controller_t является acceptor_controller_t::create(), который пытается открыть серверное соединение.

Главным методом connector_controller_t является connector_controller_t::connect(), который пытается установить соединение с удаленной стороной.

Параметрами обоих описанных методов является mbox. Идея состоит в том, что этот mbox будет передан конкретным обработчикам установленного соединения и они в свою очередь будут рассылать информационные сообщения на этот mbox (см. Информационные сообщения канала).

Реализация сетевого слоя на сокетах TCP/IP

В данный момент реализация транспортного слоя библиотеки so_5_transport, существует только на основе TCP/IP сокетах. Классы и функция для создания создания TCP/IP соединений находятся в пространстве имен so_5_transport::socket. Основными классами являются фабрики создания контролеров Acceptor/Connector:

Коммуникационные агенты

Обоснование

Библиотека so_5_transport является надстройкой над SObjectizer, и была бы не полной, если бы не предоставляла готовых агентов, которые бы облегчали разработку сетевых приложений на SObjectizer. С их помощью становится возможным сосредоточиться на прикладной логике, оставив рутинную работу готовым агентам.

Для упрощения создания серверных и клиентских каналов в so_5_transport предоставляются следующие агенты:

Информационные сообщения канала

Ниже приведены SObjectizer-сообщения для уведомлений о событиях канала. Все сообщения отправляет конкретный Acceptor или Connector (см. Интеграция с каркасом Acceptor-Connector)

Агенты

Агент a_channel_base_t

Определяет методы получения уведомлений о событиях канала, и делегирует их обработку hook-методам, которые должны быть переопределены в наследниках (Информационные сообщения канала).

Наследование от данного класса избавляет от необходимости самому подписывать агента на события канала и хранить mbox для уведомлений. Если наследник определяет дополнительные состояния и хочет получать уведомления о событиях с каналом, то можно добавить состояние в список состояний для подписки на события канала.

См. также
a_channel_base_t::so_add_state_for_channel_events.

Агент a_server_base_t

В дополнениям к методам a_channel_base_t, определяет метод для приема сообщения msg_create_server_socket_result и делегирует его обработку hook-методу a_server_base_t::so_handle_create_server_socket_result

Агент a_client_base_t

По сравнению с a_channel_base_t не добавляет новых возможностей а служит для ясности имен базовых классов для серверного и клиентского соединения.

Агент a_transport_agent_t

Агент ведет список контроллеров для действующих каналов, через которые инициирует закрытие канала, при дерегистрации агента канала. Является базовым классом для агентов a_server_transport_agent_t и a_client_transport_agent_t.

Агент a_server_transport_agent_t

В своем начальном событии – so_evt_start(), пытается установить сервер. Результат создания сервера отсылается в виде сообщения msg_create_server_socket_result на mbox канала.

Агент a_client_transport_agent_t

В своем начальном событии – so_evt_start(), пытается установить подключение к серверу. Если установить подключение не удается и переподключение возможно, то клиент начинает периодически пытаться подключиться к серверу. В случае закрытия канала, агент снова производит попытку подключения. Настройки переподключения задаются с помощью класса client_reconnect_params_t.

Кооперация агентов транспортного канала

Для создания собственное кооперации обработки канала, надо написать собственного агента, который наследуется от a_server_base_t или от a_client_base_t, и определить логику обработки входящих данных канала (so_handle_*), а также добавить логику, которая будет обеспечивать исходящие данные, если таковые нужны.

Канал может быть серверный или клиентский. В зависимости от этого кооперация будет состоять из разных агентов.

Серверная кооперация

Серверная кооперация должна состоять из определенного пользователем агента наследника a_server_base_t и транспортного агента a_server_transport_agent_t.

Пример создания серверной кооперации, взятый из so_sysconf_mbapi_ichannel:

acceptor_creator( env );
acceptor_params(
cfg.m_ip) );
std::unique_ptr< a_server_transport_agent_t > ta(
new a_server_transport_agent_t(
env,
acceptor_creator.create(
acceptor_params,
cfg.m_channel_params ) ) );
a_mbapi_incoming_channel(
new a_failure_handler_t(
env,
ta->query_notificator_mbox(),
cfg.m_handshaking_params ) );
coop->add_agent( a_mbapi_incoming_channel );
coop->add_agent( so_5::rt::agent_ref_t( ta.release() ) );
// Регистрируем кооперацию.
const so_5::ret_code_t rc = env.register_coop(
std::move(coop), so_5::DO_NOT_THROW_ON_ERROR );

Клиентская кооперация

Клиентская кооперация должна сотоять из определенного пользователем агента наследника a_client_base_t и транспортного агента a_client_transport_agent_t.

Пример создания клиентской кооперации, взятый из so_sysconf_mbapi_ochannel:

connector_creator( env );
connector_params(
cfg.m_ip) );
client_reconnect_params(
cfg.m_on_lost_reconnect_timeout,
cfg.m_on_failed_reconnect_timeout,
cfg.m_do_reconnect);
std::unique_ptr< a_client_transport_agent_t > ta(
new a_client_transport_agent_t(
env,
connector_creator.create(
connector_params,
cfg.m_channel_params ),
client_reconnect_params ) );
a_mbapi_outgoing_channel(
env,
ta->query_notificator_mbox(),
cfg.m_handshaking_params ) );
coop->add_agent( a_mbapi_outgoing_channel );
coop->add_agent( so_5::rt::agent_ref_t( ta.release() ) );
// Регистрируем кооперацию.
const so_5::ret_code_t rc = env.register_coop(
std::move(coop), so_5::DO_NOT_THROW_ON_ERROR );

Документация по SObjectizer v.5.1 'Джимара'. Последние изменения: Ср 15 Май 2013 12:56:21. Создано системой  doxygen1.8.3.1 Intervale SourceForge.net Logo