Buffer k rouře

ondra.novacisko.cz

Buffer k rouře
« kdy: 13. 01. 2011, 21:05:16 »
Zdar. Mám takový asi začátečnický dotaz. Mám příkaz

Kód: [Vybrat]
wget -O - http://blabla | ./collectData

Program collectData zpracovává data na stdin po větších kusech. Jakmile nasyslí dostatečný množství dat, začne počítat a chvíli si tam něco počítá, než vyplivne někam výstup a natáhne si další data. Problém je v tom, že výsledkem je, že počítač chvíli data tahá, chvíli počítá. Mě napadlo, že by bylo dobré do roury vložit nějaký větší buffer, který by způsobil, že data tahaná wgetem se během počítání natáhnou do paměti, aby byly hned k dispozici.

Inu, abych to vyzkoušel, nějaký jednorázový buffer jsem si napsal, něco jako

Kód: [Vybrat]
wget -O - http://blabla | ./pipebuffer | ./collectData

Rozhodně to zvedlo výkon celého výpočtu, procesor jede na 100%, přenos dat se my rapidně zrychlil. Nyní otázka.

Existuje v linuxu nějaký standardní příkaz na tuto funkcionalitu? Abych to nemusel balit jako balíček a dělat něco, co už udělal někdo předemnout
« Poslední změna: 13. 01. 2011, 22:19:22 od Petr Krčmář »


ondrej

Re: buffer k rouře
« Odpověď #1 kdy: 13. 01. 2011, 21:15:09 »
V debianu je na to balicek pv, nevim jak se jmenuje v ostatnich distrech. Mrkni na jeho manualove stranky, myslim ze by to mohlo byt to co hledas.

Gentoo

Re: buffer k rouře
« Odpověď #2 kdy: 13. 01. 2011, 21:22:32 »
Ondra.Novacisko: Vy pouzivate Linux? To se mi nechce verit.. :-O ...

asdfasdf

Re: buffer k rouře
« Odpověď #3 kdy: 13. 01. 2011, 21:35:05 »
Nepouzivam, ale nasiel som toto:

http://linux.die.net/man/1/mbuffer

pv pouzivam, ale nevedel som, ze ma aj buffer :) V kazdom pripade oba maju podobnu funkcionalitu.

ondra.novacisko.cz

Re: buffer k rouře
« Odpověď #4 kdy: 13. 01. 2011, 21:35:34 »
Ondra.Novacisko: Vy pouzivate Linux? To se mi nechce verit.. :-O ...
Já už drahnou dobu používám jak Windows, tak Linux (takže mohu samozřejmě srovnávat)... a nejen, že používám, dokonce v obou systémech programuju a mám tam dost aplikací. Ale to jsme trošku offtopic. Založte si na to nové téma

Děkuji za pomoc.


ondra.novacisko.cz

Moje implementace (pipebuffer)
« Odpověď #5 kdy: 14. 01. 2011, 13:19:23 »
Kód: [Vybrat]
//============================================================================
// Name        : pipebuffer.cpp
// Author      : Ondřej Novák (ondra.novacisko.cz)
// Version     : 1
// Licence     : public domain
//============================================================================

#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
using namespace std;

int main(int argc, char **argv) {
std::size_t maxSize = 0;
std::size_t chunkSize = 4096;
std::size_t initialSize = 4096;
int index = 0;
int loglevel = 0;
bool showbuff = false;

fcntl(1, F_SETFL, fcntl(1, F_GETFL) | O_NONBLOCK);
fcntl(0, F_SETFL, fcntl(0, F_GETFL) | O_NONBLOCK);


for (int i = 1; i < argc; i++) {
if (strcmp(argv[i],"-v") == 0) loglevel=1;
else if (strcmp(argv[i],"-vv") == 0) loglevel=2;
else if (strcmp(argv[i],"-vvv") == 0) loglevel=3;
else if (strcmp(argv[i],"-vvvv") == 0) loglevel=4;
else if (strcmp(argv[i],"-p") == 0) showbuff = true;
else {
char c = 0;
unsigned long val;
std::size_t fval;
if (sscanf(argv[i],"%lu%c",&val,&c) >= 1) {

switch (c) {
case 0: fval = val; break;
case 'K': fval = (std::size_t)val * 1024;break;
case 'M': fval = (std::size_t)val * 1024 * 1024;break;
case 'G': fval = (std::size_t)val * 1024 * 1024 * 1024;break;
default:
std::cerr << "Invalid parameter: " << argv[i] << std::endl;
return -1;
}

} else {
std::cerr << "Invalid parameter: " << argv[i] << std::endl;
return -1;
}
if (index == 0) maxSize = fval;
else if (index == 1) initialSize = fval;

index++;
}
}

std::size_t curBufSz = initialSize;
char *buffer = (char *)malloc(curBufSz);
std::size_t wrpos = 0;
std::size_t rdpos = 0;
bool empty = true;
bool closed = false;

int fdin = 0;
int fdout = 1;
std::size_t newBufSz = curBufSz;
std::size_t needExtra = 0;
bool underflow = false;
bool overflow = false;
std::size_t lasths = 0;
std::size_t totalunder = 0;

do {

      fd_set wrset,rdset;
FD_ZERO(&wrset);
FD_ZERO(&rdset);
if (!empty) {
FD_SET(fdout,&wrset);
} else if (closed) {
if (loglevel >= 1)
std::cerr << "buffer empty" << std::endl;
break;
}
if ((wrpos != rdpos || empty) && !closed) {
FD_SET(fdin,&rdset);
} else if (underflow) {
// if (needExtra > curBufSz * 3) needExtra = curBufSz*3;
newBufSz += needExtra;
totalunder+=newBufSz;
newBufSz = totalunder/4;
totalunder-=newBufSz;
needExtra = 0;
underflow = false;
if ((maxSize && newBufSz > maxSize) || newBufSz < curBufSz)
newBufSz = curBufSz;
if (newBufSz != curBufSz && loglevel >=2 )
std::cerr << "Buffer expansion: " << (newBufSz / 1024) << "KB" << std::endl;
} else {
overflow = true;
}
select(fdout+1,&rdset,&wrset,0,0);

if (FD_ISSET(fdin,&rdset)) {

char *wrptr = buffer + wrpos;
std::size_t rmsz = (wrpos > rdpos || empty)?curBufSz-wrpos:rdpos - wrpos;
int bk = rmsz > chunkSize?chunkSize:(int)rmsz;
int rd = read(fdin,wrptr,bk);
if (rd < 1) {
if (rd < 0 && loglevel >=4)
std::cerr << "input error" << errno << std::endl;
if (loglevel >= 1)
std::cerr << "input stream closed " << std::endl;
closed = true;
} else {
if (loglevel >= 4)
std::cerr << "read bytes: " << rd << std::endl;
wrpos += rd;
if (wrpos >= curBufSz)
wrpos = 0;
empty = false;
}
}
if (FD_ISSET(fdout,&wrset)) {
char *rdptr = buffer + rdpos;
std::size_t rmsz = (rdpos >= wrpos && !empty)?curBufSz-rdpos:wrpos - rdpos;
int bk = rmsz > chunkSize?chunkSize:(int)rmsz;
int rd = write(fdout,rdptr,bk);
if (rd < 1) {
if (rd == -1 && errno == EWOULDBLOCK) {
rd = 0;
} else {
if (loglevel >= 1)
std::cerr << "output stream closed" << std::endl;
break;
}
} else {
if (loglevel >= 4)
std::cerr << "write bytes: " << rd << std::endl;
rdpos += rd;
if (rdpos >= curBufSz) {
rdpos = 0;
}
if (rdpos < wrpos && newBufSz != curBufSz) {
if (newBufSz < wrpos) newBufSz = wrpos;
if (newBufSz < rdpos) newBufSz = rdpos;
char *pp = (char *)realloc(buffer, newBufSz);
if (pp != 0) {
if (loglevel >= 3)
std::cerr << "buffer resized: " << newBufSz << std::endl;
buffer = pp;
curBufSz = newBufSz;
}
}
empty = rdpos == wrpos;
if (empty) {
if (underflow) {
needExtra+=rd;
if (loglevel >= 3)
std::cerr << "buffer extra: " << needExtra << std::endl;
} else {
underflow = true;
if (loglevel >= 2)
std::cerr << "buffer underflow" << std::endl;
}
}
if (overflow) {
needExtra+=rd;
overflow = false;
if (loglevel >= 3)
std::cerr << "buffer extra: " << needExtra << std::endl;
}
}
}
if (showbuff) {
std::size_t sz = wrpos>rdpos?wrpos - rdpos:wrpos+curBufSz - rdpos;
if (empty) sz = 0;
std::size_t hashcnt = sz / (curBufSz / 35);
if (lasths != hashcnt) {
fputc('[',stderr);
for (std::size_t i = 0; i < 35;i++)
if (i >=hashcnt) fputc( '.',stderr);else fputc(underflow?'x':'#',stderr);
fputc(']',stderr);
fprintf(stderr," %luKB\r",(unsigned long)(curBufSz/1024));
fflush(stderr);
lasths = hashcnt;
}
}


} while (true);

return 0;
}

Netvrdím, že je to hezký a čistý kód ... konečně, je to nacpané celé do mainu :-) Ale mohlo by se to někomu hodit. Přeložíme pomocí
Kód: [Vybrat]
g++ pipebuffer.cpp -o pipebuffer

Pokud je použito bez parametrů mezi dvěma |, pak se to samo snaží zjistit potřebnou velikost bufferu. Jinak to přijme dva parametr, první udává maximální velikost buffer, a druhý počáteční velikost. Volitelně lze připojit -p (progress) -v (verbose) případně -vv, -vvv, -vvvv

Kód: [Vybrat]
pipebuffer -p 200M 100M   # progress, max 200MB, počáteční 100MB
pipebuffer -p 200M  # progress, max 200MB, počáteční 4KB
pipebuffer -p # progress, automatická velikost bufferu bez limitu (může se zastavit pokud selže realloc, pak se už nerozšiřuje)