Использование stream-функций.

Это вторая статья из серии “Многопоточность” в PHP.

В прошлый раз мы рассмотрели curl, теперь давайте поговорим о stream-функциях.

Вольный перевод документации:

Streams (потоки) были введены в PHP 4.3.0 как способ обобщить работу с файлами, сетью, сжатием данных и другими процессами, которые совместно используют единый набор функций. В простейшем определении, поток – объект ресурса, имеющий "потокообразное" поведение, т.е. из которого можно читать, в который можно писать и внутри которого можно перемещаться.

stream_socket-функции являются частью streams, при помощи них мы можем коннектиться или писать/читать данные, не дожидаясь завершения предыдущей операции.

Итак, дано: массив с урлами. Задача: непоследовательно получить их содержимое.

Давайте рассмотрим код, который решает поставленную нами задачу:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
<?php
// страницы, содержимое которых надо получить
$urls = array('www.yandex.ru', 'www.google.ru', 'www.mail.ru', 'www.rambler.ru');
 
$rtasks = array(); // задачи чтения
$wtasks = array(); // задачи записи
$results = array(); // результаты
 
foreach ($urls as $url) {
    // открываем отдельный сокет
    $sh = stream_socket_client($url.':80', $errno, $errstr, 10,
        STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
    if (!$sh) continue;
    // добавляем в задачи для записи
    $wtasks[$url] = $sh;
}
 
while ($wtasks || $rtasks) {
    // массив для сокетов с возможностью чтения
    $rtasks_ = $rtasks;
    // массив для сокетов с возможностью записи
    $wtasks_ = $wtasks;
 
    // ждем результатов из сокетов
    $n = stream_select($rtasks_, $wtasks_, $e=null, 10);
    if ($n > 0) {
        // сокеты, доступные для записи
        foreach ($wtasks_ as $sh) {
            // ищем урл страницы по дескриптору сокета в массиве задач записи
            $url = array_search($sh, $wtasks);
            // удаляем из задач записи
            unset($wtasks[$url]);
            // добавляем в задачи чтения
            $rtasks[$url] = $sh;
            // формируем http-заголовки
            $headers  = "GET / HTTP/1.0\r\n";
            $headers .= "Host: ".$url."\r\n";
            $headers .= "User-Agent: Mozilla/5.0 (Windows; U; Windows NT 5.1; ru; rv:1.9.0.6) Gecko/2009011913 MRA 5.3 (build 02557) Firefox/3.0.6\r\n";
            $headers .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
            $headers .= "Accept-Language: ru,en-us;q=0.7,en;q=0.3\r\n";
            $headers .= "Accept-Encoding: gzip,deflate\r\n";
            $headers .= "Accept-Charset: windows-1251,utf-8;q=0.7,*;q=0.7\r\n";
            $headers .= "\r\n";
            // записываем в сокет
            if (fwrite($sh, $headers) === false) fclose($sh);
        }
 
        // сокеты, доступные для чтения
        foreach ($rtasks_ as $sh) {
            // ищем урл страницы по дескриптору сокета в массиве задач чтения
            $url = array_search($sh, $rtasks);
            if (!$url) continue;
            // считываем результат из сокета
            $result = '';
            while ($r = fread($sh, 8192)) $result .= $r;
            // закрываем сокет
            fclose($sh);
            // удаляем из задач чтения
            unset($rtasks[$url]);
            // заносим html в массиы результатов
            $results[$url] = $result;
        }
    }
    else {
        break;
    }
}
?>

Сначала инициализируются три массива: $rtasks – в него будут заноситься сокеты, из которых можно читать данные, $wtasks – для сокетов, готовых к записи, и $results – массив для сохранения содержимого заданных страниц.

Затем в цикле функцией stream_socket_client открываются сокеты для каждого из урлов, при этом ставится флаг STREAM_CLIENT_ASYNC_CONNECT, который указывает, что каждое следующее соединение надо открывать не дожидаясь завершения открытия предыдущего. Флаг STREAM_CLIENT_CONNECT указывает, что мы создаем именно клиентское подключение.
Дескрипторы сокетов заносятся в массив $wtasks.

В 18-ой строке начинается главный цикл, и сразу массивам $rtasks_ и $wtasks_, передаваемым в функцию stream_select, присваиваются массивы, в которых хранятся сокеты для чтения и записи соответственно. Функция stream_select отслеживает наличие данных в потоках и принимает на вход несколько параметров: первый – это массив дескрипторов, в которых ожидается завершение операции чтения, второй – массив с дескрипторами, ожидающими завершение записи, третий – для особых случаев (поступление "out-of-band data" - внеполосных данных) и четвертый параметр - это таймаут. Массивы передаются в функцию по указателям, и как только функция возвращает число больше 0, в них будут находится сокеты, готовые к действиям.

В 28-ой строке мы смотрим, в какие сокеты можно записать данные (надо что-то отправить, прежде чем нам придет ответ-результат), и если такие сокеты есть, то удаляем из массива для записи текущий дескриптор, помещаем его в массив для чтения, и отправляем заголовки, необходимые для получения контента.
Далее, в 49-ой строчке, просматриваем сокеты, готовые для считывания из них данных, забираем содержимое, удаляем дескриптор из масива для чтения и закрываем сокет. Результат заносим в массив $results. Повторяем эти операции до тех пор, пока у нас остаются какие-либо открытые сокеты.

После работы скрипта нужный нам контент находится в массиве $results.

В статье рассмотрен пример работы с функциями stream_socket_*, на очереди асинхронные сокеты.