//============================================================================
// Name : pipebuffer.cpp
// Author : Ondřej Novák (ondra.novacisko.cz)
// Version : 1
// Licence : public domain
//============================================================================
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
using namespace std;
/// Parse a buffer-size argument: a decimal number optionally followed by a
/// single K, M or G suffix (multiples of 1024, 1024^2, 1024^3).
/// @param arg  command-line argument text
/// @param out  receives the size in bytes on success (untouched on failure)
/// @return true when the whole argument is a valid size that fits in size_t
static bool parseSize(const char *arg, std::size_t &out) {
	// strtoul would accept leading whitespace and a minus sign ("-1" wraps
	// around to ULONG_MAX), so insist that the argument starts with a digit.
	if (arg[0] < '0' || arg[0] > '9')
		return false;
	errno = 0;
	char *end = 0;
	unsigned long val = strtoul(arg, &end, 10);
	if (errno == ERANGE)
		return false;
	std::size_t mult = 1;
	switch (*end) {
	case 0: break;
	case 'K': mult = 1024; ++end; break;
	case 'M': mult = (std::size_t)1024 * 1024; ++end; break;
	case 'G': mult = (std::size_t)1024 * 1024 * 1024; ++end; break;
	default: return false;
	}
	// Reject trailing garbage such as "100Kx".
	if (*end != 0)
		return false;
	// Reject values whose byte count would wrap around size_t.
	if (val > ((std::size_t)-1) / mult)
		return false;
	out = (std::size_t)val * mult;
	return true;
}

/// True for errno values that mean "nothing was transferred, try again"
/// on a non-blocking descriptor, as opposed to a real I/O failure.
static bool isTransientError(int err) {
	return err == EAGAIN || err == EWOULDBLOCK || err == EINTR;
}

/// pipebuffer: copies stdin (fd 0) to stdout (fd 1) through an in-memory
/// ring buffer that grows when the reader keeps draining it empty.
///
/// Usage: pipebuffer [-p] [-v|-vv|-vvv|-vvvv] [maxSize [initialSize]]
///   -p          draw a 35-cell fill gauge on stderr
///   -v..-vvvv   increasingly verbose diagnostics on stderr
///   maxSize     upper limit for the buffer (0 / omitted = no limit)
///   initialSize starting buffer size (default 4096, must be non-zero)
/// Sizes accept an optional K, M or G suffix.
/// @return 0 after input EOF has been fully flushed or output was closed;
///         -1 on invalid arguments or when the initial buffer cannot be allocated.
int main(int argc, char **argv) {
	std::size_t maxSize = 0;              // 0 = no upper limit
	const std::size_t chunkSize = 4096;   // max bytes per read()/write() call
	std::size_t initialSize = 4096;
	int index = 0;                        // number of positional size arguments seen
	int loglevel = 0;
	bool showbuff = false;

	for (int i = 1; i < argc; i++) {
		if (strcmp(argv[i],"-v") == 0) loglevel=1;
		else if (strcmp(argv[i],"-vv") == 0) loglevel=2;
		else if (strcmp(argv[i],"-vvv") == 0) loglevel=3;
		else if (strcmp(argv[i],"-vvvv") == 0) loglevel=4;
		else if (strcmp(argv[i],"-p") == 0) showbuff = true;
		else {
			std::size_t fval = 0;
			// Only two positional sizes exist; a zero initial size would make
			// the first read() request 0 bytes, which looks exactly like EOF.
			if (index > 1 || !parseSize(argv[i], fval) || (index == 1 && fval == 0)) {
				std::cerr << "Invalid parameter: " << argv[i] << std::endl;
				return -1;
			}
			if (index == 0) maxSize = fval;
			else initialSize = fval;
			index++;
		}
	}

	std::size_t curBufSz = initialSize;
	char *buffer = (char *)malloc(curBufSz);
	if (buffer == 0) {
		std::cerr << "Unable to allocate buffer: " << curBufSz << std::endl;
		return -1;
	}

	const int fdin = 0;
	const int fdout = 1;
	// The loop is driven by select(), so both ends must be non-blocking.
	// O_NONBLOCK is a file-status flag shared by every descriptor that refers
	// to the same open file description (e.g. an inherited terminal), so the
	// original flags are saved here and restored before exiting.
	const int inFlags = fcntl(fdin, F_GETFL);
	const int outFlags = fcntl(fdout, F_GETFL);
	if (inFlags != -1) fcntl(fdin, F_SETFL, inFlags | O_NONBLOCK);
	if (outFlags != -1) fcntl(fdout, F_SETFL, outFlags | O_NONBLOCK);

	std::size_t wrpos = 0;             // where the next read() stores data
	std::size_t rdpos = 0;             // where the next write() takes data from
	bool empty = true;                 // disambiguates wrpos == rdpos (empty vs. full)
	bool closed = false;               // input hit EOF or a hard error
	std::size_t newBufSz = curBufSz;   // target size; applied by realloc once data is contiguous
	std::size_t needExtra = 0;         // bytes written while the buffer was full / drained
	bool underflow = false;            // buffer was drained empty by the writer
	bool overflow = false;             // buffer was full when we wanted to read
	std::size_t lasths = 0;            // last gauge fill drawn, to avoid redrawing
	std::size_t totalunder = 0;        // smoothing accumulator for the growth estimate
	do {
		fd_set wrset,rdset;
		FD_ZERO(&wrset);
		FD_ZERO(&rdset);
		if (!empty) {
			FD_SET(fdout,&wrset);
		} else if (closed) {
			if (loglevel >= 1)
				std::cerr << "buffer empty" << std::endl;
			break;
		}
		if ((wrpos != rdpos || empty) && !closed) {
			FD_SET(fdin,&rdset);
		} else if (underflow) {
			// Buffer is full after having run empty before: compute a new,
			// smoothed target size (a quarter of the accumulated demand).
			newBufSz += needExtra;
			totalunder+=newBufSz;
			newBufSz = totalunder/4;
			totalunder-=newBufSz;
			needExtra = 0;
			underflow = false;
			// Clamp to the user limit instead of refusing to grow at all,
			// so the buffer can actually reach maxSize; never shrink.
			if (maxSize && newBufSz > maxSize)
				newBufSz = maxSize;
			if (newBufSz < curBufSz)
				newBufSz = curBufSz;
			if (newBufSz != curBufSz && loglevel >=2 )
				std::cerr << "Buffer expansion: " << (newBufSz / 1024) << "KB" << std::endl;
		} else {
			overflow = true;
		}

		if (select(fdout+1,&rdset,&wrset,0,0) < 0) {
			// On failure the fd sets are left unmodified, so they must not
			// be inspected; a signal interruption simply retries.
			if (errno == EINTR)
				continue;
			if (loglevel >= 1)
				std::cerr << "select error " << errno << std::endl;
			break;
		}

		if (FD_ISSET(fdin,&rdset)) {
			char *wrptr = buffer + wrpos;
			// Free space up to the physical end of the buffer, or up to rdpos
			// when the write position has already wrapped behind it.
			std::size_t rmsz = (wrpos > rdpos || empty)?curBufSz-wrpos:rdpos - wrpos;
			std::size_t bk = rmsz > chunkSize?chunkSize:rmsz;
			ssize_t rd = read(fdin,wrptr,bk);
			if (rd < 0 && isTransientError(errno)) {
				// Spurious readiness or a signal: no data, but not EOF either.
			} else if (rd < 1) {
				if (rd < 0 && loglevel >=4)
					std::cerr << "input error: " << errno << std::endl;
				if (loglevel >= 1)
					std::cerr << "input stream closed " << std::endl;
				closed = true;
			} else {
				if (loglevel >= 4)
					std::cerr << "read bytes: " << rd << std::endl;
				wrpos += (std::size_t)rd;
				if (wrpos >= curBufSz)
					wrpos = 0;
				empty = false;
			}
		}

		if (FD_ISSET(fdout,&wrset)) {
			char *rdptr = buffer + rdpos;
			// Pending data up to the physical end of the buffer, or up to wrpos.
			std::size_t rmsz = (rdpos >= wrpos && !empty)?curBufSz-rdpos:wrpos - rdpos;
			std::size_t bk = rmsz > chunkSize?chunkSize:rmsz;
			ssize_t rd = write(fdout,rdptr,bk);
			if (rd < 1) {
				if (rd < 0 && isTransientError(errno)) {
					rd = 0;
				} else {
					if (loglevel >= 1)
						std::cerr << "output stream closed" << std::endl;
					break;
				}
			} else {
				if (loglevel >= 4)
					std::cerr << "write bytes: " << rd << std::endl;
				rdpos += (std::size_t)rd;
				if (rdpos >= curBufSz) {
					rdpos = 0;
				}
				// Resize only while the pending data is one contiguous run
				// [rdpos, wrpos), so realloc can move it without unwrapping.
				if (rdpos < wrpos && newBufSz != curBufSz) {
					if (newBufSz < wrpos) newBufSz = wrpos;
					if (newBufSz < rdpos) newBufSz = rdpos;
					char *pp = (char *)realloc(buffer, newBufSz);
					if (pp != 0) {
						if (loglevel >= 3)
							std::cerr << "buffer resized: " << newBufSz << std::endl;
						buffer = pp;
						curBufSz = newBufSz;
					}
				}
				empty = rdpos == wrpos;
				if (empty) {
					if (underflow) {
						needExtra+=(std::size_t)rd;
						if (loglevel >= 3)
							std::cerr << "buffer extra: " << needExtra << std::endl;
					} else {
						underflow = true;
						if (loglevel >= 2)
							std::cerr << "buffer underflow" << std::endl;
					}
				}
				if (overflow) {
					needExtra+=(std::size_t)rd;
					overflow = false;
					if (loglevel >= 3)
						std::cerr << "buffer extra: " << needExtra << std::endl;
				}
			}
		}

		if (showbuff) {
			std::size_t sz = wrpos>rdpos?wrpos - rdpos:wrpos+curBufSz - rdpos;
			if (empty) sz = 0;
			// Scale to 35 cells. Multiply before dividing so buffers smaller
			// than 35 bytes do not divide by zero; the wide type avoids
			// overflow of sz * 35 on 32-bit size_t.
			std::size_t hashcnt = (std::size_t)((unsigned long long)sz * 35 / curBufSz);
			if (lasths != hashcnt) {
				fputc('[',stderr);
				for (std::size_t i = 0; i < 35;i++)
					if (i >=hashcnt) fputc( '.',stderr);else fputc(underflow?'x':'#',stderr);
				fputc(']',stderr);
				fprintf(stderr," %luKB\r",(unsigned long)(curBufSz/1024));
				fflush(stderr);
				lasths = hashcnt;
			}
		}
	} while (true);

	free(buffer);
	if (inFlags != -1) fcntl(fdin, F_SETFL, inFlags);
	if (outFlags != -1) fcntl(fdout, F_SETFL, outFlags);
	return 0;
}
Netvrdím, že je to hezký a čistý kód... koneckonců je to celé nacpané do mainu :-) Ale mohlo by se to někomu hodit. Přeložíme pomocí:
g++ pipebuffer.cpp -o pipebuffer
Pokud je program použit bez parametrů mezi dvěma | (uprostřed roury), snaží se sám zjistit potřebnou velikost bufferu. Jinak přijme dva parametry: první udává maximální velikost bufferu, druhý počáteční velikost. Velikosti lze zadat s příponou K, M nebo G. Volitelně lze připojit -p (progress), -v (verbose), případně -vv, -vvv nebo -vvvv.
pipebuffer -p 200M 100M # progress, max 200MB, počáteční 100MB
pipebuffer -p 200M # progress, max 200MB, počáteční 4KB
pipebuffer -p # progress, automatická velikost bufferu bez limitu (může se zastavit pokud selže realloc, pak se už nerozšiřuje)