Поскольку коммуникационный агент обрабатывает новую порцию данных не дожидаясь пока агент-коммуникатор справиться с предыдущей, то возможны ситуации, когда при интенсивном входящем трафике коммуникационный агент будет обеспечивать агент-коммуникатор работой быстрее, чем агент-коммуникатор будет с ней справляться. Что приведет к деградации приложения, т.к. все время будет тратиться на обработку сообщений so_4::rt::comm::msg_sop_package.
Аналогичная ситуация и с raw-соединениями. Только там с трафиком вынужден разбираться конкретный прикладной агент, который инициировал создание этого соединения.
Ситуация усугубляется тем, что если коммуникационный агент работает на собственной нити (как активный объект или член активной группы), на которой нет обработки поступающих от него данных, то не произойдет закрытие соединения из-за превышения объема входного буфера данного агента -- агент успевает его разбирать и генерировать все новые и новые порции сообщений so_4::rt::comm::msg_sop_package (so_4::rt::comm::msg_raw_package). Проблемы возникают как раз не у коммуникационного, а у прикладных агентов.
Проблему можно было бы решить предоставив прикладным агентам право разрешать или запрещать коммуникационному агенту извлекать данные из канала. В качестве такого решения был предложен механизм порогов входного потока.
При извлечении и разборе входящих данных на пакеты коммуникационный агент подсчитывает, сколько сообщений (пакетов) было получено и какой объем они составили. Затем, эти значения сравниваются со значениями установленого для канала порога. Если оказывается, что либо количество сообщений, либо объем сообщений превысили установленый порог, то ввод из канала блокируется. В сообщении so_4::rt::comm::msg_sop_package (so_4::rt::comm::msg_raw_package) атрибут m_is_blocked устанавливается в true.
Если входной поток в канале оказался заблокированным, то коммуникационный агент даже не проверяет физическое существование данных для чтения в канале. Для TCP/IP сокетов это будет означать, что если передающая сторона продолжает запись в сокет, то со временем сокет окажется недоступным для записи. Передающая сторона это способна обнаружить и либо прекратит запись до перехода сокета в нормальное состояние, либо инициирует закрытие сокета со своей стороны.
Для разблокирования входного потока коммуникационному агенту нужно отослать сообщение so_4::rt::comm::msg_unblock_channel. Проще всего это сделать вызвав метод so_4::rt::comm::msg_sop_package::unblock_channel() или so_4::rt::comm::msg_raw_package::unblock_channel().
Для SOP-каналов разблокировкой входных потоков занимается агент-коммуникатор. Для raw-каналов этой работой должен заниматься прикладной агент, который занимается вводом/выводом через этот канал.
Атрибут so_4::rt::comm::msg_sop_package::m_is_blocked (so_4::rt::comm::msg_raw_package::m_is_blocked) выставляется в true только в том сообщении, которое вызвало превышение порога. Например, если коммуникационный агент для SOP-канала извлек прочитал 32Kb данных, в которых находится 500 SOP-пакетов, а порог установлен в 50 сообщений, то:
Для raw-каналов обработка порога отличается следующим образом -- каждый извлеченный из физического канала блок данных отсылается как сообщение so_4::rt::comm::msg_raw_package. Атрибут m_is_blocked устанавливается в true, только если превышается количество ранее отосланных сообщений so_4::rt::comm::msg_raw_package или суммарный объем находящихся в них данных.
Для SOP-каналов пороги входного потока устанавливаются агентами so_4::rt::comm::a_cln_channel_t, so_4::rt::comm::a_srv_channel_t. Для raw-каналов используется значение по-умолчанию -- so_4::rt::comm::threshold_t::infinite(). Определить текущее значение порога входного потока можно с помощью методов so_4::rt::comm::a_cln_channel_base_t::in_threshold(), so_4::rt::comm::a_srv_channel_base_t::in_threshold().
Для обеспечения совместимости с предыдущими версиями SObjectizer-а для raw-каналов по-умолчанию устанавливается максимальное значение порога (so_4::rt::comm::threshold_t::infinite()).
Для платформ с 32-битовым unsigned int это означает порог в 4 294 967 295 сообщений и 4 294 967 295 байт. Превзойти такой порог по количеству сообщений вряд ли возможно, но объем сообщений в таком пороге составляет всего ~4Gb. Поэтому, если raw-канал используется для передачи очень большого трафика, то код по работе с raw-каналами нужно адаптировать к новой версии SObjectizer-а, т.к. после извлечения ~4Gb прикладных сообщений канал будет заблокирован.
Если же объем трафика по raw-каналам не будет превышать 4Gb, то написанный ранее код по работе с raw-каналами будет совместим с SObjectizer v.4.2.6.
При большом исходящем трафике может сложится ситуация, что в очередь к коммуникационному агенту будет поставлено очень много сообщений so_4::rt::comm::msg_send_package, после которых в очередь станет сообщение msg_do_io. При последовательной обработке сообщений из очереди может произойти переполнение буфера ожидающих отправки данных еще до того, как будет обработано сообщение msg_do_io. Т.е. все отсылаемые данные будут потеряны еще до того, как хоть какая-то их часть будет отправлена в физический канал.
Для того, чтобы этого не происходило для канала можно назначить порог выходного потока или порог наполнения буфера ожидающих отправки данных. Этот порог так же задается значениями типа so_4::rt::comm::threshold_t (см. so_4::rt::comm::a_cln_channel_base_t::set_out_threshold(), so_4::rt::comm::a_srv_channel_base_t::set_out_threshold()).
Порог выходного потока обрабатывается следующим образом:
Такая обработка порога выходного буфера приводит к следующим особенностям:
Для SOP-каналов пороги входного потока устанавливаются агентами so_4::rt::comm::a_cln_channel_t, so_4::rt::comm::a_srv_channel_t. Для raw-каналов используется значение по-умолчанию -- so_4::rt::comm::threshold_t::infinite(). Определить текущее значение порога входного потока можно с помощью методов so_4::rt::comm::a_cln_channel_base_t::out_threshold(), so_4::rt::comm::a_srv_channel_base_t::out_threshold().
Документация по SObjectizer v.4.4 'Тебуломста'. Последние изменения: Thu Sep 18 10:26:48 2008. Создано системой 1.5.6 |