Culvert
=======

A channel for easily streaming work between complex pieces of logic.

It is used in place of streams for CSP-style flow. I use it in js-git for network and file streams.

Usually, you'll want to split sides to create a duplex channel.

```js
var makeChannel = require('culvert');

var serverChannel = makeChannel();
var clientChannel = makeChannel();

function connect(host, port) {

  // This represents the server-side of the duplex pipe
  var socket = {
    put: serverChannel.put,
    drain: serverChannel.drain,
    take: clientChannel.take
  };

  // When we want to send data to the consumer...
  socket.put(someData);

  // When we want to read from the consumer...
  socket.take(function (err, item) {});

  // Return the client's end of the pipe
  return {
    put: clientChannel.put,
    drain: clientChannel.drain,
    take: serverChannel.take
  };
}
```

If you want or need to preserve back-pressure and honor the buffer limit, make sure to wait for drain when `put` returns false.

```js
// Start a read
socket.take(onData);

function onData(err, item) {
  if (err) throw err;
  if (item === undefined) {
    // End stream when nothing comes out
    console.log("done");
  }
  else if (socket.put(item)) {
    // If put returned true, keep reading
    socket.take(onData);
  }
  else {
    // Otherwise pause and wait for drain
    socket.drain(onDrain);
  }
}

function onDrain(err) {
  if (err) throw err;
  // Resume reading
  socket.take(onData);
}
```

If you're using continuables and generators, the syntax is much nicer.

```js
var item;
while (item = yield socket.take, item !== undefined) {
  if (!socket.put(item)) yield socket.drain;
}
console.log("done");
```

Also, the continuable version won't blow the stack if lots of events come in on the same tick.

## makeChannel(bufferSize, monitor)

Create a new channel.

The optional `bufferSize` is how many items can be in the queue and still be considered not full.

The optional `monitor` function will get called with `(type, item)`, where `type` is either "put" or "take" and `item` is the value being put or taken.

## channel.put(item) -> more

This is a sync function. You can add as many items to the channel as you want and it will queue them up.

It returns `true` when the queue is smaller than `bufferSize` and `false` when you should wait for drain.

## channel.drain(callback)

Drain is a reusable continuable. Use this when you want to wait for the buffer to be below the `bufferSize` mark.

## channel.take(callback)

Take is for reading. The callback will be called with the next item. It may be called synchronously or at a later time.
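
To make the `put` / `drain` / `take` contract above concrete, here is a minimal sketch of a channel with that shape. `makeSketchChannel` is a hypothetical stand-in written only from the descriptions in this README, not culvert's actual source. It assumes that "not full" means at most `bufferSize` items are queued, and that the monitor also sees items handed straight to a waiting reader.

```js
// Hypothetical stand-in for culvert, written only from the API notes above.
function makeSketchChannel(bufferSize, monitor) {
  bufferSize = bufferSize | 0;
  var queue = [];     // items that were put but not yet taken
  var readers = [];   // take callbacks waiting for an item
  var drainers = [];  // drain callbacks waiting for room in the queue

  // Assumption: the queue is "not full" while it holds at most bufferSize items.
  function notFull() {
    return queue.length <= bufferSize;
  }

  function put(item) {
    if (monitor) monitor("put", item);
    if (readers.length) {
      // A reader is already waiting, so hand the item over directly.
      var reader = readers.shift();
      if (monitor) monitor("take", item);
      reader(null, item);
    } else {
      queue.push(item);
    }
    return notFull();
  }

  function drain(callback) {
    if (notFull()) callback();
    else drainers.push(callback);
  }

  function take(callback) {
    if (!queue.length) {
      readers.push(callback);
      return;
    }
    var item = queue.shift();
    if (monitor) monitor("take", item);
    callback(null, item);
    // Taking an item may have made room, so wake any drain waiters.
    while (drainers.length && notFull()) drainers.shift()();
  }

  return { put: put, drain: drain, take: take };
}

// Usage: a buffer of 2 with a monitor that records every event.
var events = [];
var channel = makeSketchChannel(2, function (type, item) {
  events.push(type + ":" + item);
});

// The third put pushes the queue past bufferSize, so it returns false.
var results = [channel.put("a"), channel.put("b"), channel.put("c")];

var drained = false;
channel.drain(function (err) { drained = true; });
var drainedBeforeTake = drained; // still false, the queue is over the limit

var taken = [];
channel.take(function (err, item) { taken.push(item); });

console.log(results);                    // [ true, true, false ]
console.log(drainedBeforeTake, drained); // false true
console.log(taken, events);
```

The point of the sketch is the ordering: `put` never blocks or drops items, it only reports whether the producer should pause, and a pending `drain` callback fires as soon as a `take` brings the queue back within the limit.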