Текст книги "Операционная система UNIX"
Автор книги: Андрей Робачевский
Жанр:
ОС и Сети
сообщить о нарушении
Текущая страница: 29 (всего у книги 39 страниц)
В подсистеме STREAMS все данные передаются в виде сообщений. С помощью сообщений передаются данные от приложений к драйверу и обратно. Сообщения используются для взаимодействия модулей между собой. Модули могут также генерировать сообщения для уведомления прикладного процесса или друг друга о возникновении ошибок или непредвиденных ситуаций. Таким образом, сообщения являются единственным способом передачи информации между различными компонентами потока и потому занимают ключевое место в подсистеме STREAMS.
Сообщение описывается двумя структурами данных: заголовком сообщения msgb
(message block) и заголовком блока данных datab
(data block). Обе эти структуры адресуют буфер данных, где находятся фактические данные сообщения.
Заголовок сообщения msgb
имеет следующие поля:
b_next , b_prev | Используются для формирования связанного списка сообщений и соответственно адресуют следующее и предыдущее сообщение очереди |
b_cont | Указывает на продолжение сообщения и используется для связывания различных частей одного сообщения |
b_datap | Указатель на заголовок блока данных |
b_rptr , b_wptr | Указатели, определяющие расположение (начало и конец) данных в буфере данных |
b_cont | Содержит ссылку на следующую структуру msgb |
Заголовок блока данных datab
используется для описания буфера и имеет следующие поля:
db_base | Адрес начала буфера |
db_lim | Адрес ячейки памяти, следующей непосредственно за буфером. Таким образом, размер буфера равен db_lim – db_base |
db_type | Тип сообщения |
db_ref | Число заголовков сообщения, адресующих этот блок |
Использование этих структур данных для формирования очереди сообщений и сообщений, состоящих из нескольких частей, показано на рис. 5.17.
Рис. 5.17. Сообщения STREAMS
Поле b_cont
заголовка сообщения позволяет объединять несколько блоков данных в одно сообщение. Эта возможность особенно полезна при использовании подсистемы STREAMS для реализации сетевых протоколов. Сетевые протоколы имеют уровневую организацию. По мере передачи данных вниз по потоку, каждый последующий модуль (реализующий протокол определенного уровня) добавляет собственную управляющую информацию. Поскольку протоколы верхнего уровня не имеют представления об архитектуре нижних, невозможно заранее зарезервировать необходимую память под сообщение. Вместо того чтобы изменять размер буфера данных сообщения, модуль может добавлять управляющую информацию в виде отдельных частей, связывая их с помощью указателя b_cont
. Этот процесс, получивший название инкапсуляции данных, графически представлен на рис. 5.18.
Рис. 5.18. Инкапсуляция данных с использованием составных сообщений
Поле db_ref
заголовка блока данных позволяет нескольким заголовкам сообщения совместно использовать один и тот же буфер. При этом происходит виртуальное копирование сообщения, каждая копия которого может обрабатываться отдельно. Как правило, такой буфер используется совместно только для чтения, хотя сама подсистема STREAMS не накладывает никаких ограничений, возлагая всю ответственность за обработку данных на модули потока.
В качестве примера виртуального копирования можно привести реализацию протокола TCP. Протокол TCP является надежным, т.е. данные считаются доставленными только после того, как от получателя поступит подтверждение. Это означает, что протокол должен хранить копии всех отправленных, но не подтвержденных сообщений. Вместо неэффективного физического копирования, производится виртуальное дублирование сообщения, одна копия которого затем передается вниз по потоку (модулю IP), а вторая сохраняется до получения подтверждения. После отправления сообщения драйвером сетевого адаптера, одна из копий будет уничтожена, что выразится в уменьшении поля db_ref
заголовка блока данных, но сам блок данных сохранится, поскольку значение счетчика по-прежнему будет превышать 0. И только после получения подтверждения db_ref
станет равным 0, и соответствующий буфер будет освобожден.
Каждое сообщение принадлежит определенному типу, определяющему назначение сообщения и его приоритет. В зависимости от типа сообщения попадают в одну из двух категорий: обычные сообщения и приоритетные сообщения. Категория определяет порядок, в котором сообщения будут обрабатываться соответствующей процедурой xxservice()
. Приоритетные сообщения всегда помещаются перед обычными сообщениями и потому обрабатываются в первую очередь.
В подсистеме STREAMS определены следующие типы обычных сообщений:
M_DATA | Содержит обычные данные. Например, системные вызовы read(2) и write(2) осуществляют передачу данных в виде сообщений этого типа. |
M_PROTO | Содержит управляющую информацию. Обычно сообщение этого типа содержит также несколько блоков типа M_DATA . С помощью системных вызовов putmsg(2) и getmsg(2) процесс имеет возможность отправлять и получать как управляющую часть сообщения (блок M_PROTO ), так и данные (блоки M_DATA ). |
M_BREAK | Посылается драйверу устройства для генерации команды break. |
M_PASSFP | Используется в каналах STREAMS (STREAMS pipe) для передачи файлового указателя от одного конца канала к другому. |
M_SIG | Генерируется модулями или драйверами и передается вверх по потоку головному модулю для отправления процессу сигнала. |
M_DELAY | Передается драйверу устройства и указывает задержку между последовательно передаваемыми символами. Как правило, используется при работе с медленными устройствами во избежание переполнения их буферов. |
M_CTL | Используется для взаимодействия модулей потока друг с другом. Все сообщения этого типа уничтожаются головным модулем и, таким образом, не могут распространяться за пределы потока. |
M IOCTL | Формируется головным модулем в ответ на управляющие команды, переданные процессом с помощью системного вызова ioctl(2): I_LINK , I_UNLINK , I_PLINK , I_PUNLINK и I_STR . Эти команды используются для создания мультиплексированных потоков. Последняя команда используется для управления модулями потока. |
M_SETOPTS | Используется для задания различных характеристик головного модуля. |
M_RSE | Зарезервировано для внутреннего использования. Модули и драйверы должны передавать его без изменений. |
Как мы увидим далее, на передачу обычных сообщений влияет механизм управления потоком данных, который может быть реализован модулями потока. Этот механизм не оказывает влияния на передачу приоритетных сообщений. Сообщения этой категории будут переданы следующему модулю, независимо от того, насколько заполнена его очередь. Эти сообщения обеспечивают основное взаимодействие между компонентами потока. Перечисленные ниже сообщения являются высокоприоритетными:
M_COPYIN | Передается вверх по потоку головному модулю и указывает ему скопировать данные от процесса для команды ioctl(2). Сообщение допустимо в интервале между получением сообщения M_IOCTL и сообщения M_IOCACK или M_IOCNAK . |
M_COPYOUT | Передается вверх по потоку головному модулю и указывает ему передать данные, связанные с вызовом ioctl(2), процессу. Сообщение допустимо в интервале между получением сообщения M_IOCTL и сообщений M_IOCACK или M_IOCNAK . |
M_ERROR | Передается вверх по потоку головному модулю и указывает на возникновение ошибки вниз по потоку. Последующие операции с потоком будут заканчиваться ошибкой, за исключением системных вызовов close(2) и poll(2). |
M_FLUSH | При получении этого сообщения модуль должен очистить очередь (чтения, записи или обе) от сообщений. |
M_HANGUP | Передается вверх по потоку головному модулю и указывает, что драйвер не может передавать данные, обычно из-за обрыва линии (связи с удаленным объектом). |
M_IOCACK | Подтверждение предыдущего сообщения M_IOCTL . В ответ головной модуль возвратит необходимые данные процессу, сделавшему системный вызов ioctl(2). |
M_IOCNAK | Если выполнение команды ioctl(2) закончилось неудачей, это сообщение передается вверх по потоку головному модулю, в ответ на это последний возвратит процессу ошибку. |
M_PCPROTO | Высокоприоритетная версия сообщения M_PROTO . |
M_PCSIG | Высокоприоритетная версия сообщения M_SIG . |
M_PCRSE | Зарезервировано для внутреннего использования в подсистеме. |
M_READ | Сообщение передается вниз по потоку, когда от процесса поступает запрос на чтение, но в головном модуле отсутствуют данные. |
M_STOP | Предписывает немедленно прекратить передачу. |
M_START | Предписывает продолжить передачу после останова, вызванного сообщением M_STOP . |
Передача данных
Как уже обсуждалось, передача данных в потоке происходит в виде сообщений. Процесс инициирует передачу данных с помощью системных вызовов write(2) и putmsg(2), которые непосредственно взаимодействуют с головным модулем. Головной модуль формирует сообщение, копируя в него прикладные данные, и передает его следующему модулю вниз по потоку. В конечном итоге сообщение принимается драйвером, который выполняет необходимые операции с конкретным устройством. В случае, когда драйвер получает данные от устройства, он также передает их в виде сообщений вверх по потоку. Процесс имеет возможность получить данные с помощью системных вызовов read(2) или getmsg(2). Если в головном модуле данные отсутствуют, процесс блокируется и переходит в состояние сна.
Сообщения передаются модулями с помощью системной функции putnext(9F):
#include
#include
int putnext(queue_t *q, mblk_t *mp);
Эта функция адресует очередь следующего модуля параметром q
и вызывает процедуру xxput()
этой очереди, передавая ей сообщение mp
. Не поощряется непосредственный вызов функции xxput()
следующего модуля, поскольку это может вызвать определенные проблемы переносимости.
Передача данных внутри потока осуществляется асинхронно и не может блокировать процесс. Блокирование процесса возможно только при передаче данных между процессом и головным модулем. Таким образом, функции обработки данных потока – xxput()
и xxservice()
не могут блокироваться. Если процедура xxput()
не может передать данные следующему модулю, она помещает сообщение в собственную очередь, откуда оно может быть передано позже процедурой xxservice()
. Если и процедура xxservice()
не может осуществить передачу сообщения, например, из-за переполнения очереди следующего модуля, она не будет ожидать изменения ситуации, а вернет сообщение обратно в собственную очередь и завершит выполнение. Попытка передачи повторится, когда ядро через некоторое время опять запустит xxservice()
.
Процедура xxservice()
вызывается в системном контексте, а не в контексте процесса, который инициировал передачу данных. Таким образом, блокирование процедуры xxservice()
может заблокировать (перевести в состояние сна) независимый процесс, что может привести к непредсказуемым результатам и потому недопустимо. Решение этой проблемы заключается в запрещении процедурам xxput()
и xxservice()
блокирования своего выполнения.
Блокирование недопустимо и для драйвера. Обычно прием данных драйвером осуществляется с использованием прерываний. Таким образом процедура xxput()
вызывается в контексте прерывания и не может блокировать свое выполнение.
Когда процедура xxput()
не может передать сообщение следующему модулю, она вызывает функцию putq(9F), имеющую следующий вид:
#include
int putq(queue_t *q, mblk_t *mp);
Функция putq(9F) помещает сообщение mp в очередь q, где сообщение ожидает последующей передачи, и заносит очередь в список очередей, нуждающихся в обработке. Для таких очередей ядро автоматически вызывает процедуру xxservice()
. Планирование вызова процедур xxservice()
производится функцией ядра runqueues()
.[59]59
Система планирования STREAMS использует собственные функции и не имеет отношения к планированию процессов в UNIX.
[Закрыть] Функция runqueues()
вызывается ядром в двух случаях:
□ Когда какой-либо процесс выполняет операцию ввода/вывода над потоком.
□ Непосредственно перед переходом какого-либо процесса из режима ядра в режим задачи.
Заметим, что планирование обслуживания очередей не связано с конкретным процессом и производится для всей подсистемы STREAMS в целом.
Функция runqueue()
производит поиск всех потоков, нуждающихся в обработке очередей. При наличии таковых просматривается список очередей, ожидающих обработки, и для каждой из них вызывается соответствующая функция xxservice()
. Каждая процедура xxservice()
, в свою очередь, пытается передать все сообщения очереди следующему модулю. Если для каких-либо сообщений это не удается, они остаются в очереди, ожидая следующего вызова runqueue()
, после чего процесс повторяется.
Деление процесса передачи данных на два этапа, выполняемых, соответственно, функциями xxput()
и xxservice()
, позволяет реализовать механизм управления передачей данных.
Как уже упоминалось, обязательной для модуля является лишь функция xxput()
. Рассмотрим ситуацию, когда модули потока не содержат процедур xxservice()
. В этом случае, проиллюстрированном на рис. 5.19, каждый предыдущий модуль вызывает функцию xxput()
следующего, передавая ему сообщение, с помощью функции ядра putnext(9F). Функция xxput()
немедленно вызывает putnext(9F) и т.д.:
xxput(queue_t *q, mblk_t *mp) {
putnext(q, mp);
}
Рис. 5.19. Передача данных без управления потоком
Когда данные достигают драйвера, он передает их непосредственно устройству. Если устройство занято, или драйвер не может немедленно обработать данные, сообщение уничтожается. В данном примере никакого управления потоком не происходит, и очереди сообщений не используются.
Хотя такой вариант может применяться для некоторых драйверов (как правило, для псевдоустройств, например, /dev/null), в общем случае устройство не может быть все время готово к обработке данных, а потеря данных из-за занятости устройства недопустима. Таким образом, в потоке может происходить блокирование передачи данных[60]60
Блокирование передачи может происходить не только в драйвере (оконечном модуле) потока из-за занятости устройства. Возможна ситуация, когда отдельный модуль вынужден отложить обработку сообщений до наступления некоторого события.
[Закрыть], и эта ситуация не должна приводить к потере сообщений, во избежание которой необходим согласованный между модулями механизм управления потоком. Для этого сообщения обрабатываются и буферизуются в соответствующей очереди модуля, а их передача возлагается на функцию xxservice()
, вызываемую ядром автоматически. Для каждой очереди определены две ватерлинии – верхняя и нижняя, которые используются для контроля заполненности очереди. Если число сообщений превышает верхнюю ватерлинию, очередь считается переполненной, и передача сообщений блокируется, пока их число не станет меньше нижней ватерлинии.
Рассмотрим пример потока, модули 1 и 3 которого поддерживают управление потоком данных, а модуль 2 – нет. Другими словами, модуль 2 не имеет процедуры xxservice()
. Когда сообщение достигает модуля 3, вызывается его функция xxput()
. После необходимой обработки сообщения, оно помещается в очередь модуля 3 с помощью функции putq(9F). Если при этом число сообщений в очереди превышает верхнюю ватерлинию, putq(9F) устанавливает специальный флаг, сигнализирующий о том, что очередь переполнена:
mod1put(queue_t* q, mblk_t* mp) {
/* Необходимая обработка сообщения */
...
putq(q, mp);
}
Через некоторое время ядро автоматически запускает процедуру xxservice()
модуля 3. Для каждого сообщения очереди xxput()
вызывает функцию canput(9F), которая проверяет заполненность очереди следующего по потоку модуля. Функция canput(9F) имеет вид:
#include
int canput(queue_t* q);
Заметим, что canput(9F) проверяет заполненность очереди следующего модуля, реализующего механизм управления передачей данных, т.е. производящего обработку очереди с помощью процедуры xxservice()
. В противном случае, как уже говорилось, очередь модуля не принимает участия в передаче данных. В нашем примере, canput(9F) проверит заполненность очереди записи модуля 1. Функция возвращает истинное значение, если очередь может принять сообщение, и ложное – в противном случае. В зависимости от результата проверки процедура xxservice()
либо передаст сообщение следующему модулю (в нашем примере – модулю 2, который после необходимой обработки сразу же передаст его модулю 1), либо вернет сообщение обратно в очередь, если следующая очередь переполнена.
Описанная схема показана на рис. 5.20. Ниже приведен скелет процедуры xxservice()
модуля 3, иллюстрирующий описанный алгоритм передачи сообщений с использованием механизма управления передачей данных.
Рис. 5.20. Управление потоком данных
mod1service(queue_t *q) {
mblk_t* mp;
while ((mp = getq(q)) != NULL) {
if (canput(q->q_next))
putnext(q, mp);
else {
putbq(q, mp);
break;
}
}
В этом примере функция getq(9F) используется для извлечения следующего сообщения из очереди, а функция putbq(9F) – для помещения сообщения в начало очереди. Если модуль 1 блокирует передачу, т.е. canput(9F) вернет «ложно», процедура xxservice()
завершает свою работу, и сообщения начинают буферизоваться в очереди модуля 3. При этом очередь временно исключается из списка очередей, ожидающих обработки, и процедура xxservice()
для нее вызываться не будет. Данная ситуация продлится до тех пор, пока число сообщений очереди записи модуля 1 не станет меньше нижней ватерлинии.
Пока существует возникшая блокировка передачи, затор будет постепенно распространяться вверх по потоку, последовательно заполняя очереди модулей, пока, в конечном итоге, не достигнет головного модуля. Поскольку передачу данных в головной модуль (вниз по потоку) инициирует приложение, попытка передать данные в переполненный головной модуль вызовет блокирование процесса[61]61
Это единственная ситуация, в которой возможно блокирование процесса.
[Закрыть] и переход его в состояние сна.
В конечном итоге, модуль 1 обработает сообщения своей очереди, и их число станет меньше нижней ватерлинии. Как только очередь модуля 1 станет готовой к приему новых сообщений, планировщик STREAMS автоматически вызовет процедуры xxservice()
для модулей, ожидавших освобождения очереди модуля в нашем примере – для модуля 3.
Управление передачей данных в потоке требует согласованной работы всех модулей. Например, если процедура xxput()
буферизует сообщения для последующей обработки xxservice()
, такой алгоритм должен выполняться для всех сообщений.[62]62
Более точно – для всех сообщений с данным приоритетом.
[Закрыть] В противном случае, это может привести к нарушению порядка сообщений, и как следствие, к потере данных.
Когда запускается процедура xxservice()
, она должна обработать все сообщения очереди. «Уважительной» причиной прекращения обработки является переполнение очереди следующего по потоку модуля. В противном случае нарушается механизм управления передачей, и очередь может навсегда лишиться обработки.
Драйверы и модули очень похожи, они используют одинаковые структуры данных (streamtab
, qinit
, module_info
) и одинаковый интерфейс (xxopen()
, xxput()
, xxservice()
и xxclose()
). Однако между драйверами и модулями существуют различия.
Во-первых, только драйверы могут непосредственно взаимодействовать с аппаратурой и отвечать за обработку аппаратных прерываний. Поэтому драйвер должен зарегистрировать в ядре соответствующий обработчик прерываний. Аппаратура обычно генерирует прерывания при получении данных. В ответ на это драйвер копирует данные от устройства, формирует сообщение и передает его вверх по потоку.
Во-вторых, к драйверу может быть подключено несколько потоков. Как уже обсуждалось, на мультиплексировании потоков построены многие подсистемы ядра, например, поддержка сетевых протоколов. В качестве мультиплексора может выступать только драйвер. Несмотря на то что драйвер в этом случае не является оконечным модулем (см., например, рис. 5.15), размещение драйверов существенным образом отличается от встраивания модулей.
Наконец, процесс инициализации драйверов и модулей различен. Функция xxopen()
драйвера вызывается при открытии потока, в то время как инициализация модуля происходит при встраивании.
Обработку системных вызовов процессов осуществляет головной модуль. Головной модуль потока является единственным местом, где возможно блокирование обработки и, соответственно, процесса, в контексте которого осуществляется операция ввода/вывода. Головной модуль является внешним интерфейсом потока, и хотя его структура похожа на структуру обычного модуля, функции обработки здесь обеспечиваются самой подсистемой STREAMS. В отличие от точек входа в модуль или драйвер потока, реализующих специфическую для данного устройства обработку, функции головного модуля выполняют ряд общих для всех потоков задач, включающих:
□ Трансляцию данных, передаваемых процессом с помощью системных вызовов, в сообщения и передачу их вниз по потоку.
□ Сообщение об ошибках и отправление сигналов процессам, связанным с потоком.
□ Распаковку сообщений, переданных вверх по потоку, и копирование данных в пространство ядра или задачи.
Процесс передает данные потоку с помощью системных вызовов write(2) и putmsg(2). Системный вызов write(2), представляющий собой унифицированный интерфейс передачи данных любым устройствам, позволяет производить передачу простых данных в виде потока байтов, не сохраняя границы логических записей. Системный вызов putmsg(2), предназначенный специально для работы с потоками, позволяет процессу за один вызов передать управляющее сообщение и данные. Головной модуль преобразует эту информацию в единое сообщение с сохранением границ записи.
Системный вызов putmsg(2) имеет вид:
#include
int putmsg(int fildes, const struct strbuf *ctlptr,
const struct strbuf* dataptr, int flags);
С помощью этого вызова головной модуль формирует сообщение, состоящее из управляющей части M_PROTO
и данных, передаваемых в блоках M_DATA
. Содержимое сообщения передается с помощью указателей на структуру strbuf
– ctlptr
для управляющего блока и dataptr
для блоков данных.
Структура strbuf
имеет следующий формат:
struct strbuf {
int maxlen;
int len;
void *buf;
}
где maxlen
не используется, len
– размер передаваемых данных, buf
– указатель на буфер.
С помощью аргумента flags
процесс может передавать экстренные сообщения, установив флаг RS_HIPRI
.
В обоих случаях головной модуль формирует сообщение и с помощью функции canput(9F) проверяет, способен ли следующий вниз по потоку модуль, обеспечивающий механизм управления передачей, принять его. Если canput(9F) возвращает истинный ответ, сообщение передается вниз по потоку с помощью функции putnext(9F), а управление возвращается процессу. Если canput(9F) возвращает ложный ответ, выполнение процесса блокируется, и он переходит в состояние сна, пока не рассосется образовавшийся затор. Заметим, что возврат системного вызова еще не гарантирует, что данные получены устройством. Возврат из write(2) или putmsg(2) свидетельствует лишь о том, что данные были успешно скопированы в адресное пространство ядра, и в виде сообщения направлены вниз по потоку.
Процесс может получить данные из потока с помощью системных вызовов read(2) и getmsg(2). Стандартный вызов read(2) позволяет получать только обычные данные без сохранения границ сообщений.[63]63
С помощью сообщения M_SETOPTS
можно дать указания головному модулю обрабатывать сообщения M_PROTO
как обычные данные. В этом случае вызов read(2) будет возвращать содержимое как сообщений M_DATA
, так и M_PROTO
. Однако информация о типе сообщения (данных) и границы сообщений сохранены не будут.
[Закрыть] В отличие от этого вызова getmsg(2) позволяет получать данные сообщений типов M_DATA
и M_PROTO
, при этом сохраняются границы сообщений. Например, если полученное сообщение состоит из блока M_PROTO
и нескольких блоков M_DATA
, вызов getmsg(2) корректно разделит сообщение на две части: управляющую информацию и собственно данные.
Вызов getmsg(2) имеет вид:
#include
int getmsg(int fildes, struct strbuf *ctlptr,
struct strbuf *dataptr, int *flagsp);
С помощью вызова getmsg(2) прикладной процесс может получить сообщение, причем его управляющие и прикладные данные будут помещены в буферы, адресуемые ctlptr
и dataptr
соответственно. Так же как и в случае putmsg(2) эти указатели адресуют структуру strbuf
, которая отличается только тем, что поле maxlen
определяет максимальный размер буфера, a len
устанавливается равным фактическому числу полученных байтов. По умолчанию getmsg(2) получает первое полученное сообщение, однако с помощью флага RS_HIPRI
, установленного в переменной, адресуемой аргументом flagsp
, процесс может потребовать получение только экстренных сообщений.
В обоих случаях, если данные находятся в головном модуле, ядро извлекает их из сообщения, копирует в адресное пространство процесса и возвращает управление последнему. Если же в головном модуле отсутствуют сообщения, ожидающие получения, выполнение процесса блокируется, и он переходит в состояние сна до прихода сообщения.
Когда головной модуль получает сообщение, ядро проверяет, ожидает ли его какой-либо процесс. Если такой процесс имеется, ядро пробуждает процесс, копирует данные в пространство задачи и производит возврат из системного вызова. Если ни один из процессов не ожидает получения сообщения, оно буферизуется в очереди чтения головного модуля.