//============================================================================
// Name        : pipebuffer.cpp
// Author      : Ondřej Novák (ondra.novacisko.cz)
// Version     : 1
// Licence     : public domain
//============================================================================
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
using namespace std;
// Parses a size argument of the form "<number>[K|M|G]" into a byte count.
//
// text   - the command-line argument to parse
// result - receives the size in bytes; written only on success
//
// Returns true on success. Returns false when the text is not a number,
// carries an unknown suffix, contains a minus sign, or does not fit size_t.
static bool parseSize(const char *text, std::size_t &result) {
	// %lu silently accepts a sign and wraps "-1" to ULONG_MAX, so negative
	// values have to be rejected explicitly.
	if (strchr(text, '-') != NULL)
		return false;

	char suffix = 0;
	unsigned long val = 0;
	if (sscanf(text, "%lu%c", &val, &suffix) < 1)
		return false;

	std::size_t mult;
	switch (suffix) {
	case 0: mult = 1; break;
	case 'K': mult = 1024; break;
	case 'M': mult = static_cast<std::size_t>(1024) * 1024; break;
	case 'G': mult = static_cast<std::size_t>(1024) * 1024 * 1024; break;
	default: return false;
	}

	// Refuse values whose byte count would overflow size_t.
	const std::size_t limit = static_cast<std::size_t>(-1);
	if (val > limit / mult)
		return false;

	result = static_cast<std::size_t>(val) * mult;
	return true;
}

// Redraws the 35-cell fill gauge on stderr, but only when the number of
// filled cells differs from the previous draw.
//
// used      - bytes currently held in the buffer
// capacity  - current buffer size in bytes
// underflow - when true, filled cells are drawn as 'x' instead of '#'
// lastCells - filled-cell count of the previous draw; updated on redraw
static void showProgress(std::size_t used, std::size_t capacity,
		bool underflow, std::size_t &lastCells) {
	const std::size_t cells = 35;
	std::size_t perCell = capacity / cells;
	if (perCell == 0)
		perCell = 1; // buffers smaller than 35 bytes would divide by zero
	const std::size_t filled = used / perCell;
	if (filled == lastCells)
		return;

	fputc('[', stderr);
	for (std::size_t i = 0; i < cells; i++) {
		if (i >= filled) fputc('.', stderr);
		else fputc(underflow ? 'x' : '#', stderr);
	}
	fputc(']', stderr);
	fprintf(stderr, " %luKB\r", (unsigned long)(capacity / 1024));
	fflush(stderr);
	lastCells = filled;
}

// Copies stdin to stdout through a ring buffer that can grow at run time.
//
// Arguments:
//   -v, -vv, -vvv, -vvvv  log level 1..4 (messages go to stderr)
//   -p                    draw a buffer fill gauge on stderr
//   <size>                first size: maximum buffer size (0 = no limit);
//                         second size: initial buffer size (default 4096);
//                         further sizes are ignored.
//                         Suffixes K, M and G are accepted.
//
// Returns 0 when the copy loop ends normally, -1 on an invalid argument or
// allocation failure, and 1 when select() fails.
int main(int argc, char **argv) {
	std::size_t maxSize = 0;
	const std::size_t chunkSize = 4096; // upper bound for one read()/write()
	std::size_t initialSize = 4096;
	int index = 0;
	int loglevel = 0;
	bool showbuff = false;

	for (int i = 1; i < argc; i++) {
		if (strcmp(argv[i],"-v") == 0) loglevel=1;
		else if (strcmp(argv[i],"-vv") == 0) loglevel=2;
		else if (strcmp(argv[i],"-vvv") == 0) loglevel=3;
		else if (strcmp(argv[i],"-vvvv") == 0) loglevel=4;
		else if (strcmp(argv[i],"-p") == 0) showbuff = true;
		else {
			std::size_t fval = 0;
			if (!parseSize(argv[i], fval)) {
				std::cerr << "Invalid parameter: " << argv[i] << std::endl;
				return -1;
			}
			if (index == 0) maxSize = fval;
			else if (index == 1) initialSize = fval;
			index++;
		}
	}

	// A zero-sized buffer would make read() return 0, which the copy loop
	// cannot tell apart from end of input.
	if (initialSize == 0) {
		std::cerr << "Invalid parameter: initial size must be greater than zero" << std::endl;
		return -1;
	}
	// The initial allocation has to respect the configured maximum.
	if (maxSize && initialSize > maxSize)
		initialSize = maxSize;

	std::size_t curBufSz = initialSize;
	char *buffer = static_cast<char *>(malloc(curBufSz));
	if (buffer == NULL) {
		std::cerr << "Cannot allocate buffer: " << curBufSz << " bytes" << std::endl;
		return -1;
	}

	const int fdin = 0;
	const int fdout = 1;

	// Switch both descriptors to non-blocking mode only after the arguments
	// were validated, and remember the original flags so they can be restored
	// before exit.
	const int inFlags = fcntl(fdin, F_GETFL);
	const int outFlags = fcntl(fdout, F_GETFL);
	if (outFlags != -1) fcntl(fdout, F_SETFL, outFlags | O_NONBLOCK);
	if (inFlags != -1) fcntl(fdin, F_SETFL, inFlags | O_NONBLOCK);

	std::size_t wrpos = 0;           // offset where the next input byte is stored
	std::size_t rdpos = 0;           // offset of the next byte to be written out
	bool empty = true;               // tells empty from full when wrpos == rdpos
	bool closed = false;             // input reached EOF or failed
	std::size_t newBufSz = curBufSz; // size the buffer should be resized to
	std::size_t needExtra = 0;       // bytes counted towards the next expansion
	bool underflow = false;          // the buffer has drained completely
	bool overflow = false;           // the buffer was full with no expansion pending
	std::size_t lasths = 0;          // gauge cells drawn last time
	std::size_t totalunder = 0;      // running total used to derive newBufSz
	int exitCode = 0;

	for (;;) {
		fd_set wrset, rdset;
		FD_ZERO(&wrset);
		FD_ZERO(&rdset);
		if (!empty) {
			FD_SET(fdout,&wrset);
		} else if (closed) {
			// Nothing left to write and nothing more will arrive.
			if (loglevel >= 1)
				std::cerr << "buffer empty" << std::endl;
			break;
		}
		if ((wrpos != rdpos || empty) && !closed) {
			// There is free space and the input is still open.
			FD_SET(fdin,&rdset);
		} else if (underflow) {
			// New target size: a quarter of the running total, which is then
			// reduced by that amount. The target is dropped when it exceeds
			// maxSize or would shrink the buffer.
			newBufSz += needExtra;
			totalunder+=newBufSz;
			newBufSz = totalunder/4;
			totalunder-=newBufSz;
			needExtra = 0;
			underflow = false;
			if ((maxSize && newBufSz > maxSize) || newBufSz < curBufSz)
				newBufSz = curBufSz;
			if (newBufSz != curBufSz && loglevel >=2 )
				std::cerr << "Buffer expansion: " << (newBufSz / 1024) << "KB" << std::endl;
		} else {
			overflow = true;
		}

		// Wait for readiness. An interrupted call is retried with fresh copies
		// of the requested sets, so the bookkeeping above is not repeated.
		const fd_set rdwant = rdset;
		const fd_set wrwant = wrset;
		int ready;
		do {
			rdset = rdwant;
			wrset = wrwant;
			ready = select(fdout+1,&rdset,&wrset,0,0);
		} while (ready < 0 && errno == EINTR);
		if (ready < 0) {
			std::cerr << "select failed: " << strerror(errno) << std::endl;
			exitCode = 1;
			break;
		}

		if (FD_ISSET(fdin,&rdset)) {
			char *wrptr = buffer + wrpos;
			// Contiguous free space starting at wrpos.
			std::size_t rmsz = (wrpos > rdpos || empty)?curBufSz-wrpos:rdpos - wrpos;
			std::size_t bk = rmsz > chunkSize?chunkSize:rmsz;
			ssize_t rd = read(fdin,wrptr,bk);
			if (rd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
				// Transient condition on the non-blocking descriptor: not an
				// end of stream, simply try again on the next round.
			} else if (rd < 1) {
				if (rd < 0 && loglevel >=4)
					std::cerr << "input error" << errno << std::endl;
				if (loglevel >= 1)
					std::cerr << "input stream closed " << std::endl;
				closed = true;
			} else {
				if (loglevel >= 4)
					std::cerr << "read bytes: " << rd << std::endl;
				wrpos += rd;
				if (wrpos >= curBufSz)
					wrpos = 0;
				empty = false;
			}
		}
		if (FD_ISSET(fdout,&wrset)) {
			const char *rdptr = buffer + rdpos;
			// Contiguous data starting at rdpos.
			std::size_t rmsz = (rdpos >= wrpos && !empty)?curBufSz-rdpos:wrpos - rdpos;
			std::size_t bk = rmsz > chunkSize?chunkSize:rmsz;
			ssize_t wr = write(fdout,rdptr,bk);
			if (wr < 1) {
				if (wr == -1 && (errno == EWOULDBLOCK || errno == EAGAIN || errno == EINTR)) {
					// Output not ready after all; keep the data and retry.
				} else {
					if (loglevel >= 1)
						std::cerr << "output stream closed" << std::endl;
					break;
				}
			} else {
				if (loglevel >= 4)
					std::cerr << "write bytes: " << wr << std::endl;
				rdpos += wr;
				if (rdpos >= curBufSz) {
					rdpos = 0;
				}
				// Resize only while the stored data is one contiguous run
				// (rdpos < wrpos), so no bytes have to be moved afterwards.
				if (rdpos < wrpos && newBufSz != curBufSz) {
					if (newBufSz < wrpos) newBufSz = wrpos;
					if (newBufSz < rdpos) newBufSz = rdpos;
					char *pp = static_cast<char *>(realloc(buffer, newBufSz));
					if (pp != 0) {
						if (loglevel >= 3)
							std::cerr << "buffer resized: " << newBufSz << std::endl;
						buffer = pp;
						curBufSz = newBufSz;
					}
					// On failure the old buffer stays valid and is kept in use.
				}
				empty = rdpos == wrpos;
				if (empty) {
					if (underflow) {
						needExtra+=wr;
						if (loglevel >= 3)
							std::cerr << "buffer extra: " << needExtra << std::endl;
					} else {
						underflow = true;
						if (loglevel >= 2)
							std::cerr << "buffer underflow" << std::endl;
					}
				}
				if (overflow) {
					needExtra+=wr;
					overflow = false;
					if (loglevel >= 3)
						std::cerr << "buffer extra: " << needExtra << std::endl;
				}
			}
		}
		if (showbuff) {
			std::size_t sz = wrpos>rdpos?wrpos - rdpos:wrpos+curBufSz - rdpos;
			if (empty) sz = 0;
			showProgress(sz, curBufSz, underflow, lasths);
		}
	}

	// Put the descriptors back into their original mode; they may be shared
	// with the parent process.
	if (inFlags != -1) fcntl(fdin, F_SETFL, inFlags);
	if (outFlags != -1) fcntl(fdout, F_SETFL, outFlags);
	free(buffer);
	return exitCode;
}
Netvrdím, že je to hezký a čistý kód ... konečně, je to nacpané celé do mainu :-) Ale mohlo by se to někomu hodit. Přeložíme pomocí 
g++ pipebuffer.cpp -o pipebuffer
Pokud je použito bez parametrů mezi dvěma |, pak se to samo snaží zjistit potřebnou velikost bufferu. Jinak to přijme dva parametry: první udává maximální velikost bufferu a druhý počáteční velikost. Volitelně lze připojit -p (progress), -v (verbose), případně -vv, -vvv, -vvvv.
pipebuffer -p 200M 100M   # progress, max 200MB, počáteční 100MB
pipebuffer -p 200M  # progress, max 200MB, počáteční 4KB
pipebuffer -p # progress, automatická velikost bufferu bez limitu (může se zastavit pokud selže realloc, pak se už nerozšiřuje)