412 000 произведений, 108 200 авторов.

Электронная библиотека книг » Джулиан Бакнелл » Фундаментальные алгоритмы и структуры данных в Delphi » Текст книги (страница 34)
Фундаментальные алгоритмы и структуры данных в Delphi
  • Текст добавлен: 2 июня 2026, 12:30

Текст книги "Фундаментальные алгоритмы и структуры данных в Delphi"


Автор книги: Джулиан Бакнелл



сообщить о нарушении

Текущая страница: 34 (всего у книги 36 страниц)

Поскольку имеется четыре тесно связанных между собой переменных, вызовы для выполнения их считывания и обновления следует поместить внутрь критического раздела или флага синхронизации. Мы будем использовать критический раздел, поскольку эти компоненты эффективнее. Итак, это будет первым объектом синхронизации. Первым шагом выполнения каждого из четырех описанных методов будет запрос критического раздела, последним – его освобождение. Однако вспомните, что методы, которые позволяют запустить поток считывания, могут блокироваться внутри подпрограммы. Если бы этот программный блок оказался между процедурами вызова и освобождения управляющего критического раздела, возникла бы тупиковая ситуация. Поэтому необходимо обеспечить, чтобы блокировка выполнялась снаружи, после того, как критический раздел освобожден.

Поскольку одновременно только один поток записи может быть активным, может показаться целесообразным поместить объект синхронизации, который ставит потоки записи в очередь, также в критический раздел, поскольку этот раздел может принадлежать только одному потоку. Однако на практике проще воспользоваться семафором. Причина этого проста: в действительности не требуется вызов объекта синхронизации, поскольку не существует подходящего места для его освобождения. Действительно, вы убедитесь, что придется дожидаться семафора в одном потоке и освобождать его в другом. Такой подход невозможен при использовании критического раздела: поток, обращающийся к критическому разделу, владеет им.

А каким должен быть объект синхронизации для потоков считывания? Больше всего подошли бы семафор или событие сброса вручную. Как и в предыдущем случае, лучше использовать семафор, поскольку применение объекта события привело бы возникновению проблем (при получении сигнала будут освобождаться только ожидающие его прихода потоки;

в данной реализации поток может находиться в состоянии, в котором он еще не вызвал подпрограмму WaitFor).

Код интерфейса создаваемого нами класса синхронизации TtdReadWriteSync приведен в листинге 12.1. Он содержит ряд приватных полей, которые будут использоваться в четырех основных методах.

Листинг 12.1. Интерфейс класса TtdReadWriteSync

type

TtdReadWriteSync = class private

FActiveReaders : integer;

FActiveWriter : boolean;

FBlockedReaders : THandle;

{семафор}

FBlockedWriters : THandle;

{семафор}

FController : TRTLCriticalSection;

FWaitingReaders : integer;

FWaitingWriters : integer;

protected

public

constructor Create;

destructor Destroy; override;

procedure StartReading;

procedure StartWriting;

procedure StopReading;

procedure StopWriting;

end;

Приватное поле FBlockedReaders семафора предназначено для ожидающих потоков считывания, а поле FBlockedWriters – для ожидающих потоков записи. Поле FController – основной компонент, обеспечивающий последовательный доступ к объектам (к сожалению, применение подобного механизма последовательной обработки необходимо для обеспечения того, чтобы каждый поток получал целостное и неискаженное изображение всего класса).

Код метода StartReading приведен в листинге 12.2.

Листинг 12.2. Метод StartReading

procedure TtdReadWriteSync.StartReading;

var

HaveToWait : boolean;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{если существует выполняющийся поток записи или хотя бы один ожидающий своей очереди поток записи, метод добавляет себя в качестве ожидающего метода записи, обеспечивая переход в состояние ожидания}

if FActiveWriter or (FWaitingWriters <> 0) then begin

inc(FWaitingReaders);

HaveToWait :=true;

end

{в противном случае он добавляет себя в качестве еще одного выполняющегося потока считывания и обеспечивает отсутствие состояния ожидания}

else begin

inc(FActiveReaders);

HaveToWait := false;

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

{при необходимости ожидания нужно выполнить следующее}

if HaveToWait then

WaitForSingleObject(FBlockedReaders, INFINITE);

end;

Прежде всего, мы перехватываем управление критическим разделом. После этого можно осуществлять управление значениями внутренних полей. При наличии выполняющегося в текущий момент или хотя бы одного ожидающего потока записи метод увеличивает число ожидающих потоков считывания, освобождает управление критическим разделом, а затем переходит в состояние ожидания семафора «заблокированные потоки считывания». При отсутствии ожидающих или выполняющихся потоков записи метод увеличивает число выполняющихся потоков считывания и освобождает критический раздел. По выходу из этого метода программа либо освобождается от необходимости ожидать прихода семафора, либо сразу пропускает состояние ожидания. Обратите внимание, что во втором случае метод увеличил число выполняющихся потоков считывания, а в первом нет. Это может показаться программной ошибкой, но вскоре мы покажем, как можно решить возникающую при этом проблему.

Рассмотрим метод StopReading, код которого приведен в листинге 12.3.

Листинг 12.3. Метод StopReading

procedure TtdReadWriteSync.StopReading;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{считывание завершено}

dec (FActiveReaders);

{если выполняется последний поток считывания и при наличии по меньшей мере одного ожидающего потока записи ему необходимо предоставить свободу действий}

if (FActiveReaders = 0) and (FWaitingWriters <> 0) then begin

dec(FWaitingWriters);

FActiveWriter :=true;

ReleaseSemaphore(FBlockedWriters, 1, nil);

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

end;

Как обычно, прежде всего, мы перехватываем управление критическим разделом. Этот поток стремится прекратить свои действия по считыванию, поэтому он уменьшает значение счетчика выполняющихся потоков считывания. Если результирующее значение не равно нулю, это свидетельствует о наличии других активных потоков считывания. Поэтому метод просто освобождает управление критическим разделом и осуществляет выход. Однако если этот поток был последним активным потоком считывания, теперь значение счетчика равно нулю и нужно предоставить свободу действий ожидающему потоку записи (если таковой существует). Для этого метод освобождает семафор заблокированных потоков записи. Иначе говоря, метод увеличивает значение счетчика на единицу, в результате чего система предоставит свободу действий одному, и только одному, заблокированному потоку записи, после чего немедленно снова уменьшит значение счетчика до нуля, обеспечивая блокировку всех остальных потоков записи. Однако непосредственно перед тем метод StopReading уменьшает значение счетчика ожидающих потоков записи и увеличивает значение счетчика выполняющихся потоков записи. Общий результат выполнения этого кода состоит в том, что поток записи освобождается, а значения двух счетчиков потоков записи обновляются.

Перейдем к рассмотрению метода StartWriting, код которого приведен в листинге 12.4.

Вначале снова необходимо перехватить управление критическим разделом. При наличии любых выполняющихся потоков считывания или записи метод увеличивает значение счетчика ожидающих потоков записи, освобождает управление критическим разделом, а затем ожидает освобождения семафора заблокированных потоков записи.

Листинг 12.4. Метод StartWriting

procedure TtdReadWriteSync.StartWriting;

var

HaveToWait : boolean;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{при наличии еще одного запущенного потока записи или активных потоков считывания, метод добавляет себя в качестве ожидающего потока считывания и обеспечивает переход в состояние ожидания}

if FActiveWriter or (FActiveReaders <> 0) then begin

inc(FWaitingWriters);

HaveToWait := true;

end

{в противном случае метод должен добавить себя в качестве еще одного выполняющегося потока записи и обеспечить отсутствие состояния ожидания}

else begin

FActiveWriter :=true;

HaveToWait := false;

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

{при необходимости ожидания нужно выполнить следующее}

if HaveToWait then

WaitForSingleObject(FBlockedWriters, INFINITE);

end;

При отсутствии каких-либо других выполняющихся потоков можно сразу начать запись. Метод увеличивает значение счетчика выполняющихся потоков записи, освобождает управление критическим разделом и осуществляет выход из подпрограммы. В любом случае, сразу по выходу из подпрограммы значение счетчика активных потоков записи оказывается установленным равным единице (либо самим этим методом, либо методом StopReading – если помните, это происходит Непосредственно перед передачей семафора заблокированных потоков записи).

И, наконец, можно приступить к рассмотрению метода StopWriting, код которого приведен в листинге 12.5.

Как и ранее, первоначальная задача состоит в перехвате управления критическим разделом. Затем, поскольку запись завершена, метод уменьшает значение счетчика активных потоков записи. Теперь выполняется проверка количества ожидающих потоков считывания. Мы входим в цикл, который уменьшает значение счетчика активных потоков считывания и освобождает семафор. Семафор, в свою очередь, освобождает от ожидания один поток считывания. Со временем, по завершении цикла, все потоки считывания будут освобождены и смогут считаться активными (обратите внимание, что они все будут использовать соответствующее обращение к методу StartReading). Если, с другой стороны, не существует никаких ожидающих потоков считывания, метод выполняет проверку на наличие каких-либо ожидающих потоков записи. Если такие потоки существуют, метод освобождает только один поток записи таким же образом, как уже было описано при рассмотрении метода StopReading. И, наконец, независимо ни от чего, метод освобождает управление критическим разделом.

Листинг 12.5. Метод StopWriting

procedure TtdReadWriteSync.StopWriting;

var

i : integer;

begin

{перехватить управление критическим разделом}

EnterCriticalSection(FController);

{запись завершена}

FActiveWriter := false;

{если имеется хотя бы один ожидающий поток записи, освободить их всех}

if (FWaitingReaders <> 0) then begin

FActiveReaders := FWaitingReaders;

FWaitingReaders := 0;

ReleaseSemaphore(FBlockedReaders, FActiveReadersr nil);

end

{в противном случае, при наличии по меньшей мере одного ожидающего потока записи, ему необходимо предоставить свободу действий}

else

if (FWaitingWriters <> 0) then begin

dec(FWaitingWriters);

FActiveWriter :=true;

ReleaseSemaphore(FBlockedWriters, 1, nil);

end;

{освободить управление критическим разделом}

LeaveCriticalSection(FController);

end;

Нам осталось рассмотреть только два метода: конструктор Create и деструктор Destroy. Код реализации этих методов показан в листинге 12.6.

Листинг 12.6. Создание и уничтожение объекта синхронизации

constructor TtdReadWriteSync.Create;

var

NameZ : array [0..MAXJPATH] of AnsiChar;

begin

inherited Create;

{создать примитивные объекты синхронизации}

GetRandomObjName (NameZ, ' tdRW.BlockedReaders' );

FBlockedReaders := CreateSemaphore(nil, 0, MaxReaders, NameZ);

GetRandomObjName(NameZ, 'tdRW.BlockedWriters');

FBlockedWriters := CreateSemaphore(nil, 0, 1, NameZ);

InitializeCriticalSection(FController);

end;

destructor TtdReadWriteSyhc.Destroy;

begin

CloseHandle(FBlockedReaders);

CloseHandle(FBlockedWriters);

DeleteCriticalSection(FController);

inherited Destroy;

end;

Как видите, конструктор Create будет создавать три примитивных объекта синхронизации, а деструктор Destroy будет, соответственно, их уничтожать.

Полный исходный код класса TtdReadWriteSync можно найти на Web-сайте издательства, в разделе материалов. После выгрузки материалов отыщите среди них файл TDRWSync.pas.

Алгоритм производителей-потребителей

Еще один многопоточный алгоритм, тесно связанный с проблемой потоков считывания и записи – алгоритм, решающий проблему производителей и потребителей.

Этот раздел адресован только тем программистам, которые работают в среде 32-раздядной Windows. Delphi I вообще не поддерживает многопоточную обработку, в то время как Kylix и Linux не предоставляют необходимых примитивных объектов синхронизации, с помощью которых можно было бы решить проблему производителей-потребителей.

В этой ситуации имеется один или более потоков, создающих данные (их называют производителями (producers)), которые будут использоваться или потребляться одним или большим количеством других потоков (называемых потребителями (consumers)). Как видите, эта задача тесно связана с алгоритмом потоков считывания-записи: потребителей можно считать потоками считывания данных, записанных производителями. Примером использования этого алгоритма может послужить программа потокового видео: в этом случае будет существовать поток, который загружает видео из какого-то Web-сайта, и поток, который воспроизводит загруженное видео. Ни один из этих потоков не должен беспокоиться о том, что должен делать второй.

Мы сымитируем этот процесс подпрограммой копирования нескольких потоков. Производитель будет копировать данные из потока в очередь буферов. Затем потребитель будет копировать данные из буферов в другой поток. Например, мог бы существовать производитель, считывающий несжатые данные из потока, и два потребителя данных: один, сжимающий данные в другой поток с помощью одного алгоритма, и второй, сжимающий их с помощью другого алгоритма, что теоретически позволяет выбирать более плотно упакованные данные. В этом случае производитель может продолжать работу и пытаться максимально быстро заполнять буфера в очереди, а потребители, в свою очередь, могут пытаться максимально быстро их считывать. Работа производителя будет тормозиться, если потребители работают недостаточно быстро и очередь заполняется непрочитанными буферами. Аналогично, работа потребителей будет замедляться, если производитель работает медленно и очередь опустошается.

Модель с одним производителем и одним потребителем

Вначале рассмотрим модель с одним производителем и одним потребителем. Затем мы ее расширим до модели с одним производителем и несколькими потребителями. Нам необходимо, чтобы сразу после генерирования производителем «достаточного» объема данных потребитель мог начинать использовать уже сгенерированные данные. Поэтому необходимо рассмотреть три ситуации: производитель и потребитель работают согласованно;

потребитель прекращает свою работу или блокируется, поскольку производитель не создал достаточный объем данных;

производитель блокируется, поскольку потребитель не успел выполнить считывание уже созданных данных.

В примере с копированием потока производитель будет прекращать работу, если ему удастся заполнить все буферы прежде, чем потребитель успеет считать и обработать первый буфер. Потребитель будет блокироваться, если ему удастся обработать все буферы прежде, чем производитель успеет заполнить еще один буфер.

Следовательно, разрабатываемый нами класс синхронизации должен содержать четыре метода: вызываемый производителем, чтобы начать генерирование данных;

вызываемый при наличии каких-либо данных, готовых для использования потребителем;

вызываемый потребителем, чтобы начать потребление данных;

и, наконец, вызываемый потребителем по завершении потребления им объема данных, достаточного для возобновления генерации данных производителем. Как и в случае потоков считывания-записи, оба метода запуска могут блокировать вызывающие их потоки.

Полный код интерфейса и реализации класса производителя-потребителя приведен в листинге 12.7. Как видите, реализация весьма проста.

Листинг 12.7. Класс синхронизации одного производителя и одного потребителя type

TtdProduceConsumeSync = class private

FHasData : THandle;

{семафор}

FNeedsData : THandle;

{семафор}

protected

public

constructor Create(aBufferCount : integer);

destructor Destroy; override;

procedure StartConsuming;

procedure StartProducing;

procedure StopConsuming;

procedure StopProducing;

end;

Первым делом, мы рассмотрим метод StartProducing (см. листинг 12.8), вызываемый производителем для запуска генерирования данных. Метод будет вызывать блокировку, если потребитель не успел использовать достаточно данных, чтобы производитель мог заменить их новыми. Метод достаточно прост: он просто ожидает передачи семафора «требуются данные». Как мы увидим, этот семафор будет передаваться потребителем.

Листинг 12.8. Метод StartProducing

procedure TtdProduceConsumeSync.StartProducing;

begin

{чтобы генерирование было начато, должен быть передан семафор "требуются данные"}

WaitForSingleObject(FNeedsData, INFINITE);

end;

Производитель будет вызывать второй метод, StopProducing (см. листинг 12.9), сообщающий потребителю о том, что он сгенерировал определенные (возможно все) данные, и что, следовательно, существуют данные, которые нужно использовать. Его реализация также проста: код просто передает семафор «имеются данные», ожидаемый потребителем.

Листинг 12.9. Метод StopProducing

procedure TtdProduceConsumeSync.StopProducing;

begin

{при генерировании каких-либо дополнительных данных потребителю нужно сообщить о необходимости их использования}

ReleaseSemaphore(FHasData, 1, nil);

end;

Третий метод, StartConsuming (листинг 12.10), вызывается потребителем перед тем, как он приступит к потреблению сгенерированных производителем данных. Метод будет вызывать блокировку на время ожидания семафора «имеются данные», который будет передаваться немедленно, если производитель уже сгенерировал какие-либо данные.

Листинг 12.10. Метод StartConcuming

procedure TtdProduceConsumeSync.StartConsuming;

begin

{чтобы можно было начать потребление данных, должен быть передан семафор "имеются данные"}

WaitForSingleObject(FHasData, INFINITE);

end;

Последний метод, StopConcuming (листинг 12.11), вызывается потребителем при считывании им достаточного объема (или всех) данных, чтобы производитель мог сгенерировать дополнительные данные. Очевидно, что этот метод всего лишь передает семафор «требуются данные», который будет предоставлять свободу действий производителю, если тот находится в состоянии ожидания.

Листинг 12.11. Метод StopConcuming

procedure TtdProduceConsumeSync.StopConsuming;

begin

{если какие-либо данные были использованы, нужно сигнализировать производителю о необходимости генерации дополнительных данных}

ReleaseSemaphore(FNeedsData, 1, nil);

end;

Полный исходный код класса TtdProduceConsumeSync можно найти на Web-сайте издательства, в разделе материалов. После выгрузки материалов отыщите среди них файл TDPCSync.pas.

Обратите внимание, что при использовании объекта семафора Windows неявно предполагается, что данные могут храниться только в 127 или меньшем количестве буферов, поскольку каждый раз, когда производитель сообщает, что потребитель может использовать какие-либо дополнительные данные, значение семафора "имеются данные" увеличивается на единицу (а его максимальное значение ограничено величиной, равной 127). Аналогичные соображения справедливы и по отношению к семафору "требуются данные". Однако в целом, это не столь уж большое ограничение. Во множестве сценариев с применением производителя-потребителя для передачи данных используется всего один буфер, а подпрограмма копирования потока, которую мы будем рассматривать, использует очередь буферов, содержащую 20 элементов.

Очередь буферов, используемая в рассматриваемом примере копирования потока, реализована в виде циклической очереди. Очередь создается с заранее выделенными всеми ее буферами. Код реализации этого класса приведен в листинге 12.12.

Обратите внимание, что мы не будем использовать диспетчер кучи во время процесса копирования потока, поскольку критический раздел защищает диспетчер кучи в многопоточной подпрограмме. Если начать вызывать подпрограммы распределения и освобождения памяти из потоков, они слишком легко смогут блокировать одна другую и, возможно, препятствовать достижению основной цели применения класса синхронизации производителя-потребителя.

Производитель будет заполнять буфер в начале очереди, а затем перемещать указатель начала очереди. С другой стороны, потребитель будет считывать, данные из буфера в конце очереди, а затем перемещать конец очереди. Процессы заполнения и считывания могут происходить одновременно, поскольку они используют различные буферы.

Листинг 12.12. Класс TQueuedBuffers, предназначенный для выполнения копирования потока

type

PBuffer= ^TBuffer;

TBuffer = packed record

bCount : longint;

bBlock : array [0..pred(BufferSize)] of byte;

end;

PBufferArray = ^TBufferArray;

TBufferArray = array [0..1023] of PBuffer;

type

TQueuedBuffers = class private

FBufCount : integer;

FBuffers : PBufferArray;

FHead : integer;

FTail : integer;

protected

function qbGetHead : PBuffer;

function qbGetTail : PBuffer;

public

constructor Create(aBufferCount : integer);

destructor Destroy; override;

procedure AdvanceHead;

procedure AdvanceTail;

property Head : PBuffer read qbGetHead;

property Tail : PBuffer read qbGetTail;

end;

constructor TQueuedBuffer s.Create(aBufferCount : integer);

var

i : integer;

begin

inherited Create;

{распределить буферы}

FBuffers := AllocMem(aBufferCount * sizeof(pointer));

for i := 0 to pred(aBufferCount) do

GetMem(FBuffers^[i], sizeof(TBuffer));

FBufCount := aBufferCount;

end;

destructor TQueuedBuffers.Destroy;

var

i : integer;

begin

{освободить буферы}

if (FBuffers <> nil) then begin

for i := 0 to pred( FBuf Count) do

if (FBuffers^[i] <> nil) then

FreeMem(FBuffers^[i], sizeof(TBuffer));

FreeMem(FBuffers, FBufCount * sizeof(pointer));

end;

inherited Destroy;

end;

procedure TQueuedBuffers.AdvanceHead;

begin

inc(FHead);

if (FHead = FBufCount) then

FHead := 0;

end;

procedure TQueuedBuffers.AdvanceTail;

begin

inc(FTail);

if (FTail = FBuf Count) then

FTail := 0;

end;

function TQueuedBuffers.qbGetHead : PBuffer;

begin

Result := FBuffers^[FHead];

end;

function TQueuedBuffers.qbGetTail : PBuffer;

begin

Result := FBuffers^[FTail];

end;

Менее очевидно то, что указатели начала и конца очереди не должны быть защищены от изменений критическими разделами или какими-то аналогичными элементами. На первый взгляд это кажется противоречащим здравому смыслу и всем правилам совместного использования данных в различных потоках. Однако поток потребителя никогда не будет обращаться к указателю конца очереди. О наличии данных, которые нужно считать из указателя начала очереди, ему будет сообщать поток производителя (в этот момент времени указатели начала и конца очереди будут различными). Аналогично, поток производителя никогда не будет обращаться к указателю начала очереди, поскольку о наличии места для добавления данных в конце очереди ему будет сообщать поток потребителя.

Коды реализации классов производителя и потребителя приведены в листинге 12.13. Эти классы являются производными от класса TThread. Код реализации каждого из перекрытых методов Execute не отличается от ранее описанного. Поток производителя входит в цикл. На каждом шаге цикла он вызывает метод StartProducer объекта синхронизации, а затем считывает блок данных из исходного потока в буфер в конце очереди. После этого он смещает указатель конца очереди. И, в заключение, он вызывает метод StopProducing и повторяет цикл с начала. Выполнение цикла прекращается, как только поток производителя устанавливает буфер в состояние, соответствующее отсутствию в нем каких-либо данных (потребитель воспринимает это состояние в качестве признака "конец потока").

В свою очередь, цикл потока потребителя выполняется следующим образом. Вначале поток вызывает метод StartConsuming объекта синхронизации. Возврат из этого метода свидетельствует об отсутствии данных для считывания в объекте поставленных в очередь буферов. Поток считывает данные из буфера, определяемого указателем начала очереди, и записывает их в поток назначения. Затем он смещает указатель начала очереди. Сразу после считывания всех данных из заполненного буфера он вызывает метод StopConsuming объекта синхронизации и повторяет цикл сначала. Работа потребителя останавливается при получении им пустого буфера.

Листинг 12.13. Классы производителя и потребителя

type

TProducer = class (TThread) private

FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TProducer.Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj :=,aSyncObj;

FBuffers aBuffers;

end;

procedure TProducer.Execute;

var

Tail : PBuffer;

begin

{выполнять до момента опустошения потока...}

repeat

{сигнализировать о готовности к началу генерирования данных}

FSyncObj.StartProducing;

{считать блок из потока в конечный буфер}

Tail FBuffers.Tail;

Tail^.bCount := FStream.Read(Tail^.bBlock, BufferSize);

{переместить указатель конца очереди}

FBuffers.AdvanceTail;

{поскольку выполняется запись нового буфера, необходимо сигнализировать о созданных данных}

FSyncObj.StopProducing;

until (Tail^.bCount ? 0);

end;

type

TConsumer = class(TThread) private

FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TConsumer.Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj := aSyncObj;

FBuffers := aBuffers;

end;

procedure TConsumer.Execute;

var

Head : PBuffer;

begin

{сигнализировать о готовности к началу потребления данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

{до тех пор, пока начальный буфер не опустошен...}

while (Head^.bCount <> 0) do

begin

{выполнить запись блока из начального буфера в поток}

FStream.Write(Head^.bBlock, Head^.bCount);

{переместить указатель начала очереди}

FBuffers.AdvanceHead;

{поскольку было выполнено считывание и обработка буфера, необходимо сообщить о том, что данные были использованы}

FSyncObj.StopConsuming;

{сигнализировать о готовности снова приступить к потреблению данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

end;

end;

И, наконец, мы можем рассмотреть подпрограмму копирования потока, приведенную в листинге 12.14. Она принимает два параметра: входной поток и выходной поток. Подпрограмма создает специальный объект типа TQueuedBuffers. Этот объект содержит все ресурсы и методы, необходимые для реализации организованного в виде очереди набора буферов. Он создает также экземпляр класса TtdProducerConsumerSync, который будет действовать в качестве объекта синхронизации, обеспечивающего согласованную работу производителя и потребителя.

Листинг 12.14. Многопоточное копирование

procedure ThreadedCopyStream(aSrcStream, aDestStream : TStream);

var

SyncObj : TtdProduceConsumeSync;

Buffers : TQueuedBuffers;

Producer : TProducer;

Consumer : TConsumer;

WaitArray : array [ 0..1] of THandle;

begin

SyncObj := nil;

Buffers := nil;

Producer :=nil;

Consumer :=nil;

try

{создать объект синхронизации, объект организованных в виде очереди буферов (с 20 буферами) и два потока}

SyncObj := TtdProduceConsumeSync.Create(20);

Buffers := TQueuedBuffers.Create(20);

Producer := TProducer.Create(aSrcStream, SyncObj, Buffers);

Consumer := TConsumer.Create(aDestStream, SyncObj, Buffers);

{сохранить дескрипторы потоков, что обеспечивает возможность ожидания их передачи}

WaitArray[0] := Producer.Handle;

WaitArray[1] := Consumer.Handle;

{запустить потоки}

Consumer.Resume;

Producer.Resume;

{ожидать окончания потоков}

WaitForMultipleObjects(2, @WaitArray, true, INFINITE);

finally

Producer.Free;

Consumer.Free;

Buffers.Free;

SyncObj.Free;

end;

end;

Затем подпрограмма копирования создает два потока, между которыми будет выполняться копирование, и возобновляет их выполнение (потоки создаются в приостановленном состоянии). Далее подпрограмма дожидается завершения обоих потоков и выполняет очистку. Полный код подпрограммы можно найти в файлах TstCopy.dpr и TstCopyu.pas на web-сайте издательства, в разделе материалов.

Модель с одним производителем и несколькими потребителями

Реализовать рассмотренное приложение, в котором используется модель «производитель-потребитель», было достаточно просто. Теперь рассмотрим модель с одним производителем и несколькими потребителями. В этом случае имеется поток, который создает данные. Предположим, что существует несколько потоков, которым требуется считывать созданные данные. В упомянутом ранее примере использовались два потребителя, которые сжимали данные с применением разных алгоритмов. Еще одним примером мог бы служить браузер. Будем считать, что производитель выгружает web-страницу из удаленного сайта, а один потребитель считывает HTML-код, чтобы выполнить его сохранение на диске, второй считывает код для его отображения на экране, а третий – с целью отображения индикатора выполнения. Создание этих процессов как отдельных потребителей упрощает написание кода, поскольку каждый процесс должен выполнять только одну задачу.

Итак, что же требуется, чтобы объект синхронизации поддерживал согласованную работу производителя и потребителей? Во-первых, производитель должен сообщать всем потребителям о наличии данных для считывания. Предположительно скорости работы потребителей будут различными, и поэтому они будут обрабатывать данные с различной частотой. Это предполагает существование по одному семафору "имеются данные" на каждый потребитель. Будем считать, что существует список буферов, которые производитель должен пополнять данными. И более того, этот список организован в виде циклической очереди. Следовательно, нам нужен единственный указатель конца очереди (управляемый исключительно производителем) и по одному указателю начала очереди для каждого потребителя, поскольку, по всей вероятности, каждый потребитель будет считывать буфера с различной частотой.

Так как же быть с производителем? Каким образом он узнает, что можно снова заполнять буфер данных? Понятно, что он может это делать только после того, как последний (предположительно самый медленный) потребитель прочитал достаточный объем данных, чтобы появилось место для его заполнения новыми данными (иначе говоря, как только буфер снова освободится). Это, в свою очередь, предполагает, что должен существовать счетчик потребителей для каждого буфера данных. Каждый раз, когда потребитель считывает данные из буфера, он уменьшает значение этого счетчика (число потребителей, которым еще только предстоит выполнить считывание данных из этого буфера). Таким образом, когда последний потребитель приступает к использованию каких-либо данных, известно, что он является последним, поскольку после уменьшения значение счетчика должно быть равно нулю. Обратите внимание, что потребители являются потоками и, следовательно, уменьшение значения счетчика следует выполнять безопасным для потоков образом.


    Ваша оценка произведения:

Популярные книги за неделю