Итерируемые потоки¶
Стабильность: 1 – Экспериментальная
Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе. Использовать такую возможность в рабочем окружении не рекомендуется.
Модуль node:stream/iter предоставляет потоковый API на основе итерируемых объектов вместо событийной иерархии классов Readable/Writable/Transform или интерфейсов Web Streams ReadableStream/WritableStream/TransformStream.
Модуль доступен только при включённом флаге CLI --experimental-stream-iter.
Потоки представлены как AsyncIterable<Uint8Array[]> (асинхронно) или Iterable<Uint8Array[]> (синхронно). Базовых классов для наследования нет — любой объект с протоколом итератора может участвовать. Преобразования — обычные функции или объекты с методом transform.
Данные передаются пакетами (Uint8Array[] за одну итерацию), чтобы амортизировать стоимость асинхронных операций.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 8 9 10 11 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | |
Основные понятия¶
Байтовые потоки¶
Все данные в этом API представлены как байты Uint8Array. Строки при передаче в from(), push() или pipeTo() автоматически кодируются в UTF-8. Это устраняет неоднозначность кодировок и позволяет передавать данные без копирования между потоками и нативным кодом.
Пакетирование¶
Каждая итерация выдаёт пакет — массив фрагментов Uint8Array (Uint8Array[]). Пакетирование амортизирует стоимость await и создания Promise на нескольких фрагментах. Потребитель, обрабатывающий по одному фрагменту, может просто обойти внутренний массив:
1 2 3 4 5 | |
1 2 3 4 5 6 7 | |
Преобразования¶
Преобразования бывают двух видов:
-
Без состояния — функция
(chunks, options) => result, вызываемая один раз на пакет. ПринимаетUint8Array[](илиnullкак сигнал сброса) и объектoptions. ВозвращаетUint8Array[],nullили итерируемое фрагментов. -
С состоянием — объект
{ transform(source, options) }, гдеtransform— генератор (синхронный или асинхронный), получающий весь восходящий итерируемый поток и объектoptions, и выдающий выход. Так делают сжатие, шифрование и любые преобразования, которым нужен буфер между пакетами.
В обоих случаях передаётся параметр options со свойством:
options.signal<AbortSignal>Сигнал прерывания при отмене конвейера, ошибке или остановке чтения потребителем. Преобразования могут проверятьsignal.abortedили слушать событие'abort'для досрочной очистки.
Сигнал сброса (null) посылается после окончания источника, чтобы преобразования могли выдать хвостовые данные (например, подписи сжатия).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | |
Модели pull и push¶
API поддерживает две модели:
-
pull— данные идут по требованию.pull()иpullSync()создают ленивые конвейеры: источник читается только когда потребитель итерирует. -
push— данные записываются явно.push()создаёт пару writer/readable с обратным давлением. Writer записывает данные; readable потребляется как async iterable.
Обратное давление¶
У pull-потоков обратное давление естественное — темп задаёт потребитель, источник не читается быстрее, чем успевает обработка. Потокам push нужно явное обратное давление: производитель и потребитель работают независимо. Параметры highWaterMark и backpressure у push(), broadcast() и share() задают поведение.
Двухбуферная модель¶
Потоки push используют двухчастную буферизацию. Представьте ведро (слоты), заполняемое через шланг (ожидающие записи), с поплавковым клапаном, который закрывается, когда ведро полно:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
-
Слоты (ведро) — данные, готовые потребителю, не больше
highWaterMark. Когда потребитель читает, он за раз опустошает все слоты в один пакет. -
Ожидающие записи (шланг) — записи, ждущие места в слотах. После того как потребитель опустошил буфер, ожидающие записи попадают в освободившиеся слоты и их промисы завершаются.
Как политики используют буферы:
| Политика | Лимит слотов | Лимит ожидающих записей |
|---|---|---|
'strict' | highWaterMark | highWaterMark |
'block' | highWaterMark | Без ограничения |
'drop-oldest' | highWaterMark | Н/д (никогда не ждёт) |
'drop-newest' | highWaterMark | Н/д (никогда не ждёт) |
Строгий режим (по умолчанию)¶
Режим 'strict' отсекает сценарии «записал и забыл», когда производитель вызывает write() без await, что вело бы к неограниченному росту памяти. Ограничиваются и буфер слотов, и очередь ожидающих записей значением highWaterMark.
Если каждая запись ожидается через await, одновременно может быть не больше одной ожидающей записи (вашей), лимит очереди не достигается. Неожидаемые записи накапливаются в очереди и при переполнении вызывают исключение:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Забытый await в итоге приведёт к исключению:
1 2 3 4 5 | |
Блокировка¶
В режиме 'block' слоты ограничены highWaterMark, а очередь ожидающих записей не ограничена. Записи с await блокируются, пока потребитель не освободит место, как в 'strict'. Отличие: неожидаемые записи бессрочно ставятся в очередь без исключения — возможна утечка памяти, если производитель забывает await.
Так по умолчанию ведут себя классические потоки Node.js и Web Streams. Используйте, когда контролируете производителя и он корректно ожидает записи, или при переносе кода с этих API.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
Вытеснение старейшего¶
Записи никогда не ждут. Когда буфер слотов полон, самый старый фрагмент вытесняется, чтобы освободить место новой записи. Потребитель всегда видит наиболее свежие данные. Удобно для live-лент, телеметрии и сценариев, где устаревшие данные менее важны текущих.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 | |
Отбрасывание новых¶
Записи никогда не ждут. Когда буфер слотов полон, входящая запись тихо отбрасывается. Потребитель обрабатывает уже буферизованное без лавины новых данных. Удобно для ограничения скорости или сброса нагрузки под давлением.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 | |
Интерфейс Writer¶
Writer — любой объект, соответствующий интерфейсу Writer. Обязателен только write(); остальные методы необязательны.
У каждого асинхронного метода есть синхронный вариант *Sync для схемы try-fallback: сначала быстрый синхронный путь, при необходимости — асинхронная версия, если синхронный вызов не смог завершиться:
1 2 3 4 | |
writer.desiredSize¶
- number | null
Число свободных слотов буфера до достижения high water mark. Возвращает null, если writer закрыт или потребитель отключился.
Значение всегда неотрицательно.
writer.end([options])¶
options<Object>signal<AbortSignal>Отменить только эту операцию. Сигнал отменяет только ожидающий вызовend(); сам writer в ошибку не переводит.
- Возвращает:
<Promise<number>>Всего записано байт.
Сигнализирует, что данных больше не будет.
writer.endSync()¶
- Возвращает:
<number>Всего записано байт или-1, если writer не открыт.
Синхронный вариант writer.end(). Возвращает -1, если writer уже закрыт или в ошибке. Подходит для try-fallback:
1 2 3 4 | |
writer.fail(reason)¶
reason<any>
Переводит writer в терминальное состояние ошибки. Если writer уже закрыт или в ошибке, вызов ничего не делает. В отличие от write() и end(), fail() всегда синхронен: ошибка writer — чистый переход состояния без асинхронной работы.
writer.write(chunk[, options])¶
chunk<Uint8Array>|<string>options<Object>signal<AbortSignal>Отменить только эту запись. Сигнал отменяет только ожидающий вызовwrite(); сам writer в ошибку не переводит.
- Возвращает:
<Promise<void>>
Записывает фрагмент. Промис завершается, когда в буфере есть место.
writer.writeSync(chunk)¶
chunk<Uint8Array>|<string>- Возвращает:
<boolean>true, если запись принята,false, если буфер полон.
Синхронная запись. Не блокирует; при активном обратном давлении возвращает false.
writer.writev(chunks[, options])¶
chunks<Uint8Array[]>|<string[]>options<Object>signal<AbortSignal>Отменить только эту запись. Сигнал отменяет только ожидающий вызовwritev(); сам writer в ошибку не переводит.
- Возвращает:
<Promise<void>>
Записывает несколько фрагментов одним пакетом.
writer.writevSync(chunks)¶
chunks<Uint8Array[]>|<string[]>- Возвращает:
<boolean>true, если запись принята,false, если буфер полон.
Синхронная пакетная запись.
Модуль stream/iter¶
Все функции доступны и как именованные экспорты, и как свойства объекта пространства имён Stream:
1 2 3 4 5 | |
1 2 3 4 5 | |
Префикс node: в спецификаторе модуля указывать необязательно.
Источники¶
from(input)¶
input<string>|<ArrayBuffer>|<ArrayBufferView>|<Iterable>|<AsyncIterable>|<Object>Не должен бытьnullилиundefined.- Возвращает:
<AsyncIterable<Uint8Array[]>>
Создаёт асинхронный байтовый поток из входных данных. Строки кодируются в UTF-8. Значения ArrayBuffer и ArrayBufferView оборачиваются в Uint8Array. Массивы и итерируемые объекты рекурсивно разворачиваются и нормализуются.
Объекты с Symbol.for('Stream.toAsyncStreamable') или Symbol.for('Stream.toStreamable') преобразуются по этим протоколам. Протокол toAsyncStreamable имеет приоритет над toStreamable, тот — над протоколами итерации (Symbol.asyncIterator, Symbol.iterator).
1 2 3 4 5 | |
1 2 3 4 5 6 7 8 9 | |
fromSync(input)¶
input<string>|<ArrayBuffer>|<ArrayBufferView>|<Iterable>|<Object>Не должен бытьnullилиundefined.- Возвращает:
<Iterable<Uint8Array[]>>
Синхронный вариант from(). Возвращает синхронный итерируемый объект. Не принимает async iterable и промисы. Объекты с Symbol.for('Stream.toStreamable') преобразуются по этому протоколу (приоритет над Symbol.iterator). Протокол toAsyncStreamable полностью игнорируется.
1 2 3 | |
1 2 3 | |
Конвейеры¶
pipeTo(source[, ...transforms], writer[, options])¶
source<AsyncIterable>|<Iterable>Источник данных....transforms<Function>|<Object>Ноль или несколько преобразований.writer<Object>Назначение с методомwrite(chunk).options<Object>signal<AbortSignal>Прервать конвейер.preventClose<boolean>Еслиtrue, не вызыватьwriter.end()при окончании источника. По умолчанию:false.preventFail<boolean>Еслиtrue, не вызыватьwriter.fail()при ошибке. По умолчанию:false.
- Возвращает:
<Promise<number>>Всего записано байт.
Направляет источник через преобразования в writer. Если у writer есть writev(chunks), целые пакеты передаются одним вызовом (scatter/gather I/O).
Если writer реализует необязательные *Sync (writeSync, writevSync, endSync), pipeTo() сначала пытается использовать синхронные методы как быстрый путь и переходит к асинхронным только если синхронный вызов не смог завершиться (например, обратное давление или ожидание следующего тика). fail() всегда вызывается синхронно.
1 2 3 4 5 6 7 8 9 10 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
pipeToSync(source[, ...transforms], writer[, options])¶
source<Iterable>Синхронный источник данных....transforms<Function>|<Object>Ноль или несколько синхронных преобразований.writer<Object>Назначение с методомwrite(chunk).options<Object>- Возвращает:
<number>Всего записано байт.
Синхронный вариант pipeTo(). Источник, все преобразования и writer должны быть синхронными. Async iterable и промисы не допускаются.
У writer должны быть *Sync (writeSync, writevSync, endSync) и fail().
pull(source[, ...transforms][, options])¶
source<AsyncIterable>|<Iterable>Источник данных....transforms<Function>|<Object>Ноль или несколько преобразований.options<Object>signal<AbortSignal>Прервать конвейер.
- Возвращает:
<AsyncIterable<Uint8Array[]>>
Создаёт ленивый асинхронный конвейер. Данные из source не читаются, пока возвращаемый итерируемый объект не потребляют. Преобразования применяются по порядку.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | |
Использование AbortSignal:
1 2 3 4 5 | |
1 2 3 4 5 | |
pullSync(source[, ...transforms])¶
source<Iterable>Синхронный источник данных....transforms<Function>|<Object>Ноль или несколько синхронных преобразований.- Возвращает:
<Iterable<Uint8Array[]>>
Синхронный вариант pull(). Все преобразования должны быть синхронными.
Push-потоки¶
push([...transforms][, options])¶
...transforms<Function>|<Object>Необязательные преобразования на стороне readable.options<Object>highWaterMark<number>Максимум буферизованных слотов до включения обратного давления. Должно быть >= 1; меньшие значения приводятся к 1. По умолчанию:4.backpressure<string>Политика обратного давления:'strict','block','drop-oldest'или'drop-newest'. По умолчанию:'strict'.signal<AbortSignal>Прервать поток.
- Возвращает:
<Object>writer<PushWriter>Сторона writer.readable<AsyncIterable<Uint8Array[]>>Сторона readable.
Создаёт push-поток с обратным давлением. Writer записывает данные; readable потребляется как async iterable.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | |
Writer, возвращаемый push(), соответствует интерфейсу Writer.
Дуплексные каналы¶
duplex([options])¶
options<Object>highWaterMark<number>Размер буфера в обе стороны. По умолчанию:4.backpressure<string>Политика в обе стороны. По умолчанию:'strict'.signal<AbortSignal>Сигнал отмены для обоих каналов.a<Object>Параметры направления A→B. Переопределяют общие опции.b<Object>Параметры направления B→A. Переопределяют общие опции.
- Возвращает:
<Array>Пара[channelA, channelB]дуплексных каналов.
Создаёт пару связанных дуплексных каналов для двусторонней связи, по аналогии с socketpair(). Данные, записанные в writer одного канала, появляются в readable другого.
У каждого канала:
writer— объект интерфейса Writer для отправки данных пиру.readable—AsyncIterable<Uint8Array[]>для чтения данных от пира.close()— закрыть этот конец канала (идемпотентно).[Symbol.asyncDispose]()— поддержка async dispose дляawait using.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | |
Потребители¶
array(source[, options])¶
source<AsyncIterable<Uint8Array[]>>|<Iterable<Uint8Array[]>>options<Object>signal<AbortSignal>limit<number>Максимум байт к потреблению. Если сумма байт превышает лимит, выбрасываетсяERR_OUT_OF_RANGE
- Возвращает:
<Promise<Uint8Array[]>>
Собирает все фрагменты в массив значений Uint8Array (без слияния в один буфер).
arrayBuffer(source[, options])¶
source<AsyncIterable<Uint8Array[]>>|<Iterable<Uint8Array[]>>options<Object>signal<AbortSignal>limit<number>Максимум байт к потреблению. Если сумма байт превышает лимит, выбрасываетсяERR_OUT_OF_RANGE
- Возвращает:
<Promise<ArrayBuffer>>
Собирает все байты в ArrayBuffer.
arrayBufferSync(source[, options])¶
source<Iterable<Uint8Array[]>>options<Object>limit<number>Максимум байт к потреблению. Если сумма байт превышает лимит, выбрасываетсяERR_OUT_OF_RANGE
- Возвращает:
<ArrayBuffer>
Синхронный вариант arrayBuffer().
arraySync(source[, options])¶
source<Iterable<Uint8Array[]>>options<Object>limit<number>Максимум байт к потреблению. Если сумма байт превышает лимит, выбрасываетсяERR_OUT_OF_RANGE
- Возвращает:
<Uint8Array[]>
Синхронный вариант array().
bytes(source[, options])¶
source<AsyncIterable<Uint8Array[]>>|<Iterable<Uint8Array[]>>options<Object>signal<AbortSignal>limit<number>Максимум байт к потреблению. Если сумма байт превышает лимит, выбрасываетсяERR_OUT_OF_RANGE
- Возвращает:
<Promise<Uint8Array>>
Собирает все байты потока в один Uint8Array.
1 2 3 4 | |
1 2 3 4 5 6 7 8 | |
bytesSync(source[, options])¶
source<Iterable<Uint8Array[]>>options<Object>limit<number>Максимум байт к потреблению. Если сумма байт превышает лимит, выбрасываетсяERR_OUT_OF_RANGE
- Возвращает:
<Uint8Array>
Синхронный вариант bytes().
text(source[, options])¶
source<AsyncIterable<Uint8Array[]>>|<Iterable<Uint8Array[]>>options<Object>encoding<string>Кодировка текста. По умолчанию:'utf-8'.signal<AbortSignal>limit<number>Максимум байт к потреблению. Если сумма байт превышает лимит, выбрасываетсяERR_OUT_OF_RANGE
- Возвращает:
<Promise<string>>
Собирает все байты и декодирует как текст.
1 2 3 | |
1 2 3 4 5 6 7 | |
textSync(source[, options])¶
source<Iterable<Uint8Array[]>>options<Object>- Возвращает:
<string>
Синхронный вариант text().
Утилиты¶
ondrain(drainable)¶
drainable<Object>Объект, реализующий протокол drainable.- Возвращает:
<Promise<boolean>>| null
Ожидает снятия обратного давления у drainable writer. Возвращает промис, который разрешается в true, когда writer снова может принять данные, или null, если объект не реализует протокол drainable.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | |
merge(...sources[, options])¶
...sources<AsyncIterable<Uint8Array[]>>|<Iterable<Uint8Array[]>>Два или более итерируемых источника.options<Object>signal<AbortSignal>
- Возвращает:
<AsyncIterable<Uint8Array[]>>
Объединяет несколько async iterable, выдавая пакеты в порядке появления (что первым дало данные). Все источники потребляются параллельно.
1 2 3 4 | |
1 2 3 4 5 6 7 8 | |
tap(callback)¶
callback<Function>(chunks) => voidВызывается для каждого пакета.- Возвращает:
<Function>Преобразование без состояния.
Создаёт сквозное преобразование, которое наблюдает за пакетами, не меняя их. Удобно для логов, метрик и отладки.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 8 9 10 11 | |
tap() намеренно не запрещает изменять фрагменты на месте в callback; возвращаемые значения игнорируются.
tapSync(callback)¶
callback<Function>- Возвращает:
<Function>
Синхронный вариант tap().
Несколько потребителей¶
broadcast([options])¶
options<Object>highWaterMark<number>Размер буфера в слотах. Должно быть >= 1; меньшие значения приводятся к 1. По умолчанию:16.backpressure<string>'strict','block','drop-oldest'или'drop-newest'. По умолчанию:'strict'.signal<AbortSignal>
- Возвращает:
<Object>writer<BroadcastWriter>broadcast<Broadcast>
Создаёт широковещательный канал в модели push для нескольких потребителей. Один writer записывает данные нескольким потребителям. У каждого потребителя свой курсор в общем буфере.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | |
broadcast.bufferSize¶
Число фрагментов в буфере.
broadcast.cancel([reason])¶
reason<Error>
Отменяет широковещание. Все потребители получают ошибку.
broadcast.consumerCount¶
Число активных потребителей.
broadcast.push([...transforms][, options])¶
...transforms<Function>|<Object>options<Object>signal<AbortSignal>
- Возвращает:
<AsyncIterable<Uint8Array[]>>
Создаёт нового потребителя. Каждый получает все данные, записанные в broadcast с момента подписки. Необязательные преобразования применяются к виду данных для этого потребителя.
broadcast[Symbol.dispose]()¶
Синоним broadcast.cancel().
Broadcast.from(input[, options])¶
input<AsyncIterable>|<Iterable>| Broadcastableoptions<Object>Как уbroadcast().- Возвращает:
<Object>{ writer, broadcast }
Создаёт Broadcast из существующего источника. Источник потребляется автоматически и транслируется всем подписчикам.
share(source[, options])¶
source<AsyncIterable>Источник для совместного использования.options<Object>- Возвращает:
<Share>
Создаёт общий поток в модели pull для нескольких потребителей. В отличие от broadcast(), источник читается только когда потребитель делает pull. Несколько потребителей делят один буфер.
1 2 3 4 5 6 7 8 9 10 11 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
share.bufferSize¶
Число фрагментов в буфере.
share.cancel([reason])¶
reason<Error>
Отменяет общий поток. Все потребители получают ошибку.
share.consumerCount¶
Число активных потребителей.
share.pull([...transforms][, options])¶
...transforms<Function>|<Object>options<Object>signal<AbortSignal>
- Возвращает:
<AsyncIterable<Uint8Array[]>>
Создаёт нового потребителя общего источника.
share[Symbol.dispose]()¶
Синоним share.cancel().
Share.from(input[, options])¶
input<AsyncIterable>| Shareableoptions<Object>Как уshare().- Возвращает:
<Share>
Создаёт Share из существующего источника.
shareSync(source[, options])¶
source<Iterable>Синхронный источник для совместного использования.options<Object>- Возвращает:
<SyncShare>
Синхронный вариант share().
SyncShare.fromSync(input[, options])¶
input<Iterable>| SyncShareableoptions<Object>- Возвращает:
<SyncShare>
Преобразования сжатия и распаковки¶
Преобразования сжатия и распаковки для pull(), pullSync(), pipeTo() и pipeToSync() доступны в модуле node:zlib/iter. Подробнее см. в документации node:zlib/iter.
Совместимость с классическими потоками¶
Эти функции связывают классические потоки stream.Readable/stream.Writable с API stream/iter.
И fromReadable(), и fromWritable() принимают объекты по контракту «утиной типизации» — не требуется наследование от stream.Readable или stream.Writable. Минимальный контракт для каждой функции описан ниже.
fromReadable(readable)¶
Стабильность: 1 – Экспериментальная
Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе.
readable<stream.Readable>|<Object>Классический Readable или любой объект с методамиread()иon().- Возвращает:
<AsyncIterable<Uint8Array[]>>Источник async iterable для stream/iter.
Преобразует классический Readable (или эквивалент по контракту) в источник async iterable stream/iter, который можно передать в from(), pull(), text() и т.д.
Если объект реализует протокол toAsyncStreamable (как stream.Readable), используется он. Иначе выполняется проверка по read() и on() (EventEmitter) и поток оборачивается в пакетный async iterator.
Результат кэшируется на экземпляр — два вызова fromReadable() с тем же потоком возвращают один и тот же iterable.
Для Readable в object mode или с кодировкой фрагменты автоматически приводятся к Uint8Array.
1 2 3 4 5 6 7 8 9 | |
1 2 3 4 5 6 7 8 9 10 11 12 | |
fromWritable(writable[, options])¶
Стабильность: 1 – Экспериментальная
Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе.
writable<stream.Writable>|<Object>Классический Writable или любой объект с методамиwrite()иon().options<Object>backpressure<string>Политика обратного давления. По умолчанию:'strict'.'strict'— записи отклоняются при полном буфере. Выявляет вызовы, игнорирующие обратное давление.'block'— записи ждут drain при полном буфере. Рекомендуется сpipeTo().'drop-newest'— при полном буфере новые записи тихо отбрасываются.'drop-oldest'— не поддерживается. ВыбрасываетсяERR_INVALID_ARG_VALUE.
- Возвращает:
<Object>Адаптер Writer для stream/iter.
Создаёт адаптер Writer stream/iter из классического Writable (или эквивалента по контракту). Его можно передать в pipeTo() как назначение.
Так как все записи в классический Writable по сути асинхронны, синхронные методы Writer (writeSync, writevSync, endSync) всегда возвращают false или -1, передавая работу асинхронному пути. Параметр options.signal на запись из интерфейса Writer также игнорируется.
Результат кэшируется на экземпляр — два вызова fromWritable() с тем же потоком возвращают один и тот же Writer.
Для потоков без writableHighWaterMark, writableLength и подобных свойств используются разумные значения по умолчанию. Writable в object mode (если определяется) отклоняются: интерфейс Writer только для байтов.
1 2 3 4 5 6 7 8 9 | |
1 2 3 4 5 6 7 8 9 10 11 12 | |
toReadable(source[, options])¶
Стабильность: 1 – Экспериментальная
Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе.
source<AsyncIterable>ИсточникAsyncIterable<Uint8Array[]>, например результатpull()илиfrom().options<Object>highWaterMark<number>Внутренний размер буфера в байтах до включения обратного давления. По умолчанию:65536(64 КБ).signal<AbortSignal>Необязательный сигнал отмены readable.
- Возвращает:
<stream.Readable>
Создаёт stream.Readable в байтовом режиме из AsyncIterable<Uint8Array[]> (родной формат пакетов API stream/iter). Каждый Uint8Array в выданном пакете передаётся в Readable отдельным фрагментом.
1 2 3 4 5 6 7 8 | |
1 2 3 4 5 6 7 8 | |
toReadableSync(source[, options])¶
Стабильность: 1 – Экспериментальная
Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе.
source<Iterable>ИсточникIterable<Uint8Array[]>, например результатpullSync()илиfromSync().options<Object>highWaterMark<number>Внутренний размер буфера в байтах до включения обратного давления. По умолчанию:65536(64 КБ).
- Возвращает:
<stream.Readable>
Создаёт stream.Readable в байтовом режиме из синхронного Iterable<Uint8Array[]>. Метод _read() извлекает данные из итератора синхронно, поэтому данные сразу доступны через readable.read().
1 2 3 4 5 6 | |
1 2 3 4 5 6 | |
toWritable(writer)¶
Стабильность: 1 – Экспериментальная
Эта возможность не подпадает под правила семантического версионирования. Несовместимые назад изменения или удаление могут произойти в любом будущем релизе.
writer<Object>Writer stream/iter. Обязателен толькоwrite();end(),fail(),writeSync(),writevSync(),endSync(), иwritev()необязательны.- Возвращает:
<stream.Writable>
Создаёт классический stream.Writable, опирающийся на Writer stream/iter.
Каждый вызов _write() / _writev() сначала пытается синхронные методы Writer (writeSync / writevSync), при необходимости переходит к асинхронным, если синхронный путь вернул false или выбросил исключение. Аналогично _final() вызывает endSync() перед end(). При успехе синхронного пути callback откладывается через queueMicrotask, чтобы сохранить асинхронный контракт.
highWaterMark у Writable устанавливается в Number.MAX_SAFE_INTEGER, чтобы по сути отключить внутреннюю буферизацию Writable и позволить нижележащему Writer управлять обратным давлением.
1 2 3 4 5 6 7 | |
1 2 3 4 5 6 7 | |
Символы протокола¶
Эти общеизвестные символы позволяют сторонним объектам участвовать в протоколе потоков без прямого импорта из node:stream/iter.
Stream.broadcastProtocol¶
- Значение:
Symbol.for('Stream.broadcastProtocol')
Значением должна быть функция. При вызове из Broadcast.from() она получает опции, переданные в Broadcast.from(), и должна вернуть объект, соответствующий интерфейсу Broadcast. Реализация полностью на усмотрение автора — можно управлять потребителями, буферизацией и обратным давлением как угодно.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | |
Stream.drainableProtocol¶
- Значение:
Symbol.for('Stream.drainableProtocol')
Реализуйте, чтобы writer был совместим с ondrain(). Метод должен возвращать промис, который разрешается при снятии обратного давления, или null, если обратного давления нет.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | |
Stream.shareProtocol¶
- Значение:
Symbol.for('Stream.shareProtocol')
Значением должна быть функция. При вызове из Share.from() она получает опции Share.from() и должна вернуть объект, соответствующий интерфейсу Share. Реализация полностью на усмотрение автора — общий источник, потребители, буферизация и обратное давление задаются произвольно.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | |
Stream.shareSyncProtocol¶
- Значение:
Symbol.for('Stream.shareSyncProtocol')
Значением должна быть функция. При вызове из SyncShare.fromSync() она получает опции SyncShare.fromSync() и должна вернуть объект, соответствующий интерфейсу SyncShare. Реализация полностью на усмотрение автора — общий источник, потребители и буферизация задаются произвольно.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | |
Stream.toAsyncStreamable¶
- Значение:
Symbol.for('Stream.toAsyncStreamable')
Значением должна быть функция, преобразующая объект в потоковое значение. Когда объект встречается в конвейере (как источник для from() или как значение, возвращаемое преобразованием), вызывается этот метод для получения данных. Можно вернуть (или дать через Promise) любое потоковое значение: строку, Uint8Array, AsyncIterable, Iterable или другой потоковый объект.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
Stream.toStreamable¶
- Значение:
Symbol.for('Stream.toStreamable')
Значением должна быть функция, синхронно преобразующая объект в потоковое значение. Когда объект встречается в конвейере (как источник для fromSync() или как значение, возвращаемое синхронным преобразованием), вызывается этот метод. Нужно синхронно вернуть потоковое значение: строку, Uint8Array или Iterable.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | |
