Rad sa Node.js Stream API-jem

Darko Milošević iz Florence Healthcare-a nam u tekstu piše o najnovijoj implementaciji „Stream 3″, kao i o novim i korisnim API-jima koji dolaze uz Node v10+.

Darko Milošević
24/06/2021

Uvod

Reč „Stream” koristi se u kompjuterskim naukama da se opiše komadno prikupljanje podataka koji nisu dostupni izjednom već se preuzimaju tokom vremena. Stream je u suštini skup vrednosti sličan nizovima, ali obrnut iz prostorne do vremenske ose. 

U Node.js, „stream” je naziv modula koji implementira API za rad sa streaming podacima. 

Node.js stream API je prošao značajan razvojni put od svojih početaka u 2009. godini. API koji se konstantno razvija stvara zabunu vezanu za različite načine implementiranja kao i sposobnost mešanja različitih interfejsa. 

Naš fokus biće na najnovijoj implementaciji „Stream 3”, zajedno sa novim i korisnim API-jima koji dolaze uz Node v10+. 

Osnove Stream-a

Svaki stream je instanca EventEmitter-a. To znači da emituju događaje koji se mogu upotrebiti za pisanje i čitanje podataka. 

Tipovi Stream-a

Postoje četiri osnovne vrste stream-a u okviru Node.js: 

Očitavajući:

Zapisni:

Duplex:

Transform:

Modovi stream-a

Postoje dva moda u kojima operiše Node.js:

Standardni mod:

Objektni mod:

Buffering

Svaki stream poseduje unutrašnji buffer koji se koristi za smeštanje podataka. Očitavajući i pišući streams imaju po jedan i njemu se može pristupiti kroz `readable.readableBuffer` i `writable.writableBuffer`.

Duplex i transform streams imaju po dva odvojena buffera, što svakom dozvoljava da radi samostalno. 

Veličina buffera definiše se kroz `highWatermarkOption`. Za streams koji rade u standardnom modu njime se određuje veličina buffera dok se za streams u objektnom modu određuje broj objekata. 

Backpressure – povratni pritisak

Backpressure je koncept koji je obično teško razumljiv za sve one koji započinju rad sa Stream API-jem, što ga čini čestim uzrokom grešaka. Bez povratnog pritiska, streamovi ne bi bili toliko efikasni jer je isti jedan od najvažnijih odlika streamova. 

Backpressure je signal koji pišući stream šalje natrag očitavajućem stream-u. Signal se šalje kada očitavajući stream čita podatke suviše brzo, a interni buffer pišućeg streama (koji se podešava kroz `highWatermarkOption`) se ispuni brže nego što je moguće obraditi. 

Signal upozorava očitavajući stream da treba da pauzira pre nego što pošalje još podataka. Povratni pritisak je ono što omogućava pouzdan, vučni transfer podataka između očitavajućeg i pišućeg stream-a. 

Par stvari se mogu dogoditi ako se sistem za backpressure ne uzme u obzir pri transferu podataka: 

Backpressure rukovodi pouzdanim, bez-gubitnim i memorijski-efikasnim prenosom podataka, što je i primarna svrha Node.js Stream API-ja. 

API za korisnike Stream-a

Mnoge Node.js aplikacije koriste streamove. Ako se upoznate sa API-jem za korisnike streamova, bićete u stanju da ispravno koristite i konzumirate streamove. 

Konzumiranje pišućih streamova

Svaki pišući stream poseduje sledeće metode:

writable.write(chunk[, encoding][, callback])

writable.end([chunk][, encoding][, callback])

writable.cork()

writable.uncork()

writable.destroy()

Sledeći isečak koda daje primer jednostavne upotrebe pišućeg stream-a bez rukovanja povratnim pritiskom, što verovatno NE ŽELITE da uradite: 

Ovo su događaji koji se mogu izostaviti upisnom instancom:

drain

error

finish

close

pipe

unpipe

Jednostavan primer kako napisati pišući stream ručno (bez `readable.pipe()` ), i dok se uzima u obzir backpressure:

Ovo je jednostavan primer pošto samo upisuje istu rečenicu u petlji. Najvažniji aspekt je upravljanje povratnim pritiskom. 

Backpressure, tj. povratni pritisak je zgodno rešen kroz `readable.pipe()` metodu, što izgleda ovako: 

Ulazimo u više detalja o `readable.pipe()` metodi kasnije u ovom tekstu.

Pored fokusa na backpressure kada se stvara ručno upisivanje u pišuću instancu stream-a, praćenje mogućih grešaka dok pišete je takođe važno. 

Evo konkretnog primera za ručno upisivanje u pišući stream, uz osvrt na backpressure, ispravno upravljanje greškama i operacijama posle pisanja (u ovom slučaju logging):

Konzumiranje očitavajućih streamova 

Očitavajući streamovi mogu da rade u dva moda: 

(U dokumentima se pominje i objekt mod ali to je odvojena funkcija gde i tekući i pauzirani streamovi mogu da budu u objektnom modu ili ne) 

Svi očitavajući streamovi počinju u pauziranom modu. Za prelazak iz pauziranog u tekući mod, mora se raditi jedna od sledećih operacija koje će u sledećem delu biti opširno pokrivene: 

Za povratak na pauzirani mod, mora se uraditi jedno od sledećih: 

Postoje četiri načina konzumiranja čitajućih streamova. Developeri treba da odaberu jedan od metoda konzumiranja podataka. Mešanje API-ja može da dovede do neočekivanih ponašanja i nikada se ne treba praktikovati prilikom konzumiranja podataka iz pojedinog stream-a. 

  1. Upotrebom `readable.pause()`, `readable.resume()` i `data` događaja:

`data` događaj

`readable.pause()`

`readable.resume()`

Primer očitavajućeg streama koji se konzumira dok se podaci pišu u stdout. Ne nešto što je suviše korisno, ali poslužiće kao dobar primer:

  1. Koristeći `readable.read()` i `readable` događaj:

`readable` događaj

`readable.read([size])`

Ovo je sličan primer kao prethodni ali koristi drugi način konzimiranja očitavajućeg stream-a:

  1. Korišćenje `readable.pipe()`:

`readable.pipe(writable[, options])`

Ovo je najpogodnije za konzumiranje očitavajućeg stream-a pošto nije opširan a backpressure i zatvaranje stream-a se automatsko obavlja po završetku. 

Jednostavan primer iz prethodnih isečaka koda:

Jedna stvar koja se ne obavlja automatski je propagacija rukovanja greškom. Na primer, ako želimo da se svaki stream zatvori kada dođe do greške, moramo da priključimo osluškivače za greške u događajima. (error event listeners).

Primer potpune verzije konzumiranja očitavajućih streamova sa pipe-om uz ispravno rukovanje greškama: 

  1. Koristeći Async Iteraciju / Async Generatore:

Async Generatori su zvanično dostupni u Node v10+. The async generatori su mešavina async funkcija i generator funkcija. Oni implementiraju [Symbol.asyncIterator] metodu i mogu se koristiti za async iteraciju. U opštem smislu streamovi su iskomadani skup podataka koji se prenose tokom vremena, tako da se Async Generatori savršeno uklapaju. Evo primera: 

Konzumiranje Duplex i Transformirajućih Streamova

Duplex streamovi implementiraju i očitavajući i pišući interfejs. Jedna vrsta duplex stream-a je i `PassThrough` stream. Ova vrsta stream-a koristi se kada neki API-ji očekuju očitavajući stream kao parametar, a vi takođe želite ručno da unesete neke podatke. 

Da biste postigli oba, potrebno je da: 

Taj proces je prikazan ispod: 

Transformišući streamovi su Duplex streamovi. Ovi streamovi poseduju i očitavajući i pišući interfejs, ali njihova glavna svrha je da transformišu prolazeće podatke. 

Najuobičajeniji primer je kompresija podataka uz ugrađeni transformišući stream iz „zlib” modula:

Korisna klasa metoda (Node v10+)

`Stream.finished(stream, callback)`

Ova metoda korisna je za upravljanje greškama ili preduzimanje daljih postupaka kada je stream konzumiran. Primer: 

Stream.pipeline(…streams[, callback])`

Ova metoda je najčistiji i najmanje opširan način izgradnje stream pipeline-a. U poređenju sa `readable.pipe()`, sve se obavlja automatski, uključujući i propagaciju greške i čišćenje resursa po završetku procesa. Primer: 

API za one koji implementiraju Stream

Stream API je podložan proširivanju i nudi interfejs u kome developeri mogu da stvore sopstvene produžetke stream-a. Postoje dva načina da implementirate vaš stream: 

  1. Proširite ispravnu parent klasu:

Nova klasa stream-a mora da implementira jednu ili više specifičnih metoda koje zavise od vrste stream-a koji se stvara (one će biti navedene dok prolazimo kroz implementaciju svakog tipa stream-a) 

Tim metodama prethodi podvučna linija kao prefiks i koriste se samo za implementiranje novih streamova. Ako se upotrebe tokom konzumiranja izazvaće neočekivana ponašanja. 

  1. Proširivanje streamova je pojednostavljeni način da se direktno stvore instance i pruže ispravne metode kao opcije za konstrukciju: 

Dobro je zapamtiti da u ovom slučaju potrebnim metodama nije pridružena podvučna linija kao prefiks. 

Implementiranje pišućih stream-ova

Da bismo implementirali pišući stream, moramo da obezbedimo `writable._write()` metodu za slanje podataka podložnom resursu:

`writable._write(chunk, encoding, callback)`

Evo jedne jednostavne implementacije pišućeg stream-a:

Ovaj stream „pipuje” standardni unos u standardni iznos, osim kada se unese crtica napred, tada se stream baca. Ovaj primer služi za demonstraciju. 

Implementiranje očitavajućeg stream-a

Da bi se implementirao novi očitavajući stream, moramo da prizovemo „readable constructor” funkciju i implementiramo `readable._read()` metodu (druge metode su opcionalne), dok unutar nje pozivamo `readable.push()`:

`readable._read(size)`

`readable.push()`

Ova implementacija očitavajućeg stream-a prikazana ispod generisaće nasumično određene integere između 1 i 10 svake sekunde kroz jedan minut, a tada će stream da završi generaciju podataka i zatvori se. 

Implementiranje Duplex stream-a

Duplex stream implementira i očitavajuće i pišuće interfejse nezavisno jedan od drugoga. Dupleks klasa obično nasleđuje iz stream.Readable i parazitski stream.Writable (JavaScript ne podržava višestruko nasleđivanje).

Da biste stvorili drugačiju implementaciju duplex stream-a, morate da implementirate sve potrebne metode za očitavajuće i pišuće streamove, a to su `readable._read()` i `writable._write()`.

Strim prikazan ispod loguje sve iz stdin (pišuće strane), i pipuje nasumične „smajlije” u stdout (očitavajuća strama) sve dok se „tužni smajli” ne pojavi, kada se očitavajući stream terminiše. 

Implementacija transformirajućeg stream-a

Transformirajući stream sličan je duplex stream-u (on je vrsta duplex stream-a) ali poseduje jednostavniji interfejs. Iznos se izvodi iz unosa. Nema potrebe da iznos bude iste veličine kao unos, da ima isti broj grupa podataka ili da stigne u isto vreme. 

Samo jedna metoda je potrebna za implementiranje transformirajućeg stream-a, a to je `transform._transform()` metoda (`transform._flush()` je opcionalna).

`transform._transform(chunk, encoding, callback)`

`transform._flush(callback)` — optional.

Zaključak

U ovom članku naučili smo kako da konzumiramo sve tipove Node.js Streamova. Takođe smo naučili da implementiramo sopstvene streamove i koristimo njihove moćne funkcije. 

Node.js Streamovi imaju reputaciju da je sa njima teško raditi, ali uz dobro razumevanje njihovih određenih API-ja postaće dragocena alatka za vaš rad. 

Darko Milošević

Objavio/la članak.

četvrtak, 24. Jun, 2021.

IT Industrija

🔥 Najčitanije