Agregace velkého množství streamovaných dat

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #15 kdy: 22. 09. 2021, 17:57:57 »
Ještě je možnost kouknout po nějaké off-memory hashmapě pro javu, něco jako https://github.com/jankotek/MapDB nebo podobně.  Nemám s tím zkušenost, ale bylo by to něco mezi řešením v paměti a řešením pomocí externí databáze.

do toho moc dat neulozite, zadna komprese


RDa

  • *****
  • 1 500
    • Zobrazit profil
    • E-mail
Re:Agregace velkeho mnzstvi stream dat
« Odpověď #16 kdy: 22. 09. 2021, 18:06:24 »
Převeď si to do nějakýho rozumnýho binárního formátu. 90 milionů metrik popíšeš místo stringem o velikosti desítek bajtů jedním integerem, čímž se Ti velikost zprávy smrskne skoro desetkrát a vejde se Ti to do paměti.....

Dik za hint, tohle mi asi ale nepomuze. Prevod na integer otisk postaci pro agregaci, ja to ale pak zase potrebuju expandovat na puvodni stringy a poslat dale do sveta.

Tak holt prijde ke kazdemu sloupcovemu streamu i bitmapa jestli je field used. Bitmapa bude obsahovat indikaci zda je to null/nenull, coz je 1 Gbit/hodinu * pocet sloupcu. A pro expandovani random N-teho zaznamu se holt musi spocitat kolik jich bylo prazdnych predtim, ale pokud delas exapanzi jako replay dat od pocatku, tak to je v podstate takova "dekomprese" on the fly :)

Pokud by ale celkovy pocet sloupcu byl hodne velky a kazdy record mel velice nahodny subset poli, bych spis uvazoval o klasickem binarnim DB formatu s indexem pro indikaci jaky je to sloupec - jako:  byte cc (column count), byte field_lookup[cc], unsigned data[cc] ... a pripadne to zarovnaval na nejake 1MB boundary nebo ukladal index kde ktery record zacina.

Btw podle jakych podklicu to chcete agregovat? Na beznou celkovou agregaci nepotrebujete lookup prece. A pokud by podklice nasledovali za sebou (napr. casova osa), tak staci flushnout agregaci a zacit s nulou pro dalsi cast.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #17 kdy: 22. 09. 2021, 18:10:36 »
Ještě je možnost kouknout po nějaké off-memory hashmapě pro javu, něco jako https://github.com/jankotek/MapDB nebo podobně.  Nemám s tím zkušenost, ale bylo by to něco mezi řešením v paměti a řešením pomocí externí databáze.

To vypada opravdu zajimave, diky

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #18 kdy: 22. 09. 2021, 18:13:59 »
To jsou nazvy sloupcu, uz chapu.
Ten seznam nazvu counteru ale neni uplny ani finalni, muze se celkem dynamicky menit. Kazdy typ metriky mue mit jiny set counteru

muzete mit sloupce nazev_counteru, hodnota

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #19 kdy: 22. 09. 2021, 18:18:16 »
Převeď si to do nějakýho rozumnýho binárního formátu. 90 milionů metrik popíšeš místo stringem o velikosti desítek bajtů jedním integerem, čímž se Ti velikost zprávy smrskne skoro desetkrát a vejde se Ti to do paměti.....

Dik za hint, tohle mi asi ale nepomuze. Prevod na integer otisk postaci pro agregaci, ja to ale pak zase potrebuju expandovat na puvodni stringy a poslat dale do sveta.

Tak holt prijde ke kazdemu sloupcovemu streamu i bitmapa jestli je field used. Bitmapa bude obsahovat indikaci zda je to null/nenull, coz je 1 Gbit/hodinu * pocet sloupcu. A pro expandovani random N-teho zaznamu se holt musi spocitat kolik jich bylo prazdnych predtim, ale pokud delas exapanzi jako replay dat od pocatku, tak to je v podstate takova "dekomprese" on the fly :)

Pokud by ale celkovy pocet sloupcu byl hodne velky a kazdy record mel velice nahodny subset poli, bych spis uvazoval o klasickem binarnim DB formatu s indexem pro indikaci jaky je to sloupec - jako:  byte cc (column count), byte field_lookup[cc], unsigned data[cc] ... a pripadne to zarovnaval na nejake 1MB boundary nebo ukladal index kde ktery record zacina.

Btw podle jakych podklicu to chcete agregovat? Na beznou celkovou agregaci nepotrebujete lookup prece. A pokud by podklice nasledovali za sebou (napr. casova osa), tak staci flushnout agregaci a zacit s nulou pro dalsi cast.

Pocet sloupcu (counteru), je velice variabilni. Agregovat chci pouze podle kombinace metadat.

Tady mam jiny priklad pro jiny typ zpravy

Kód: [Vybrat]
"counters": {
"MP_PINHOLE_MEDIA_BUNDLE_CH_PEAK": 0,
"MP_PINHOLE_WORST_MOS_ACTIVE": 10,
"MP_PINHOLE_TOTAL_RTP_RX": 35562640,
"MP_PINHOLE_CH_MULTIPLEXED_PEAK": 0,
"MP_PINHOLE_EVS_MODE_CHANGE_UE_INIT": 0,
"MP_PINHOLE_RTCP_APP_AMR_IO_SWITCH_SENT": 0,
"MP_PINHOLE_DSP_ALLOC_ATTEMPTS_AMR": 0,
"MP_PINHOLE_WITH_CODEC_G722": 0,
"MP_PINHOLE_DSP_ABORMAL_REL_EVS": 0,
"MP_PINHOLE_DSP_ALLOC_ATTEMPTS_OTHERS": 0,
"MP_PINHOLE_DSP_ALLOC_FAIL_CONGESTION_AMRWB": 0,
"MP_PINHOLE_DSP_ALLOC_ATTEMPTS_AMRWB": 1987,
"MP_PINHOLE_VIDEO_TRANSCODING_CH_PEAK": 0,
"MP_PINHOLE_DSP_NORMAL_REL_EVS": 1009,
"MP_PINHOLE_WITH_CODEC_G729": 0,
"MP_PINHOLE_CRYPTO_CHANNEL_PEAK": 0,
"MP_PINHOLE_WORST_R_FACTOR_ACTIVE": 7,
"MP_PINHOLE_VIDEO_TRANSCODING": 0,
"MP_PINHOLE_CH_MULTIPLEXED": 0,
"MP_PINHOLE_DSP_ABORMAL_REL_OTHERS": 0,
"MP_PINHOLE_AUDIO_TRANSCODING_CH_PEAK": 6700274,
"MP_PINHOLE_EVS_SWITCH_TO_AMR_WB_IO": 0,
"MP_PINHOLE_EVS_MODE_CHANGE_UAG_INIT": 0,
"MP_PINHOLE_RTCP_APP_AMR_IO_SWITCH_RCVD": 0,
"MP_PINHOLE_WITH_CODEC_EVS_AMR_WB_IO": 0,
"MP_PINHOLE_DSP_ALLOC_SUCCESS_AMRWB": 1986,
"MP_PINHOLE_MEAN_MOS_ACTIVE": 44,
"MP_PINHOLE_DSP_ALLOC_ATTEMPTS_EVS": 1018,
"MP_PINHOLE_WITH_CODEC_EVS_PRIMARY": 0,
"MP_PINHOLE_TOTAL_OUT_OF_SEQ": 0,
"MP_PINHOLE_WITH_CODEC_G711": 0,
"MP_PINHOLE_DSP_ALLOC_FAIL_ERROR_OTHERS": 0,
"MP_PINHOLE_FAX_TRANSCODING_CH_PEAK": 0,
"MP_PINHOLE_RTCP_APP_BANDWIDTH_SENT": 0,
"MP_PINHOLE_RTCP_APP_CHNL_AWARE_SENT": 0,
"MP_PINHOLE_DSP_ABORMAL_REL_AMR": 0,
"MP_PINHOLE_FAX_TRANSCODING_CH": 0,
"MP_PINHOLE_CODEC_MODE_CHANGE_UAG_INIT": 0,
"MP_PINHOLE_DSP_NORMAL_REL_AMR": 0,
"MP_PINHOLE_RTCP_APP_CHNL_AWARE_RCVD": 0,
"MP_PINHOLE_TOTAL_PACKET_LOSS": 978004,
"MP_PINHOLE_TOTAL_UDPTL_RX": 0,
"MP_PINHOLE_TIMESTAMP": 1632318300,
"MP_PINHOLE_AVG_PKT_LATENCY": 13,
"MP_PINHOLE_AVG_PKT_SIZE": 84,
"MP_PINHOLE_RTCP_APP_REDUNDNACY_SENT": 0,
"MP_PINHOLE_DSP_ALLOC_FAIL_ERROR_AMR": 0,
"MP_PINHOLE_RTCP_APP_CMR_SENT": 0,
"MP_PINHOLE_RTCP_APP_PRI_SWITCH_SENT": 0,
"MP_PINHOLE_DSP_ALLOC_SUCCESS_AMR": 0,
"MP_PINHOLE_RTCP_APP_BANDWIDTH_RCVD": 0,
"MP_PINHOLE_RTCP_APP_FRAME_AGG_RCVD": 0,
"MP_PINHOLE_CODEC_WITH_T38_UDPTL": 0,
"MP_PINHOLE_RTCP_APP_CMR_RCVD": 0,
"MP_PINHOLE_RTCP_APP_PRI_SWITCH_RCVD": 0,
"MP_PINHOLE_BEST_R_FACTOR_ACTIVE": 98,
"MP_PINHOLE_RTCP_APP_FRAME_AGG_SENT": 0,
"MP_PINHOLE_WITH_CODEC_AMR_WB": 6427,
"MP_PINHOLE_CRYPTO_CHANNEL": 0,
"MP_PINHOLE_CODEC_WITH_T38_RTP": 0,
"MP_PINHOLE_MAX_PKT_LATENCY": 65535,
"MP_PINHOLE_RTCP_APP_PRIMARY_RATE_SENT": 0,
"MP_PINHOLE_BEST_MOS_ACTIVE": 45,
"MP_PINHOLE_DSP_ABORMAL_REL_AMRWB": 0,
"MP_PINHOLE_EVS_SWITCH_TO_PRIMARY": 0,
"MP_PINHOLE_RTCP_APP_PRIMARY_RATE_RCVD": 0,
"MP_PINHOLE_RTCP_APP_REDUNDNACY_RCVD": 0,
"MP_PINHOLE_DSP_ALLOC_SUCCESS_EVS": 1018,
"MP_PINHOLE_DSP_ALLOC_FAIL_ERROR_AMRWB": 0,
"MP_PINHOLE_MEAN_R_FACTOR_ACTIVE": 95,
"MP_PINHOLE_DSP_ALLOC_FAIL_ERROR_EVS": 0,
"MP_PINHOLE_DSP_ALLOC_FAIL_CONGESTION_AMR": 0,
"MP_PINHOLE_WITH_CODEC_AUDIO_OTHERS": 8,
"MP_PINHOLE_WITH_DTMF_TRANSCODING": 0,
"MP_PINHOLE_AUDIO_TRANSCODING_CH": 1125,
"MP_PINHOLE_CODEC_MODE_CHANGE_UE_INIT": 0,
"MP_PINHOLE_MEDIA_BUNDLE_CH": 0,
"MP_PINHOLE_AVG_JITTER": 3867,
"MP_PINHOLE_DSP_ALLOC_FAIL_CONGESTION_OTHERS": 0,
"MP_PINHOLE_DSP_ALLOC_FAIL_CONGESTION_EVS": 0,
"MP_PINHOLE_DSP_NORMAL_REL_AMRWB": 1979,
"MP_PINHOLE_WITH_CODEC_AMR_NB": 446,
"MP_PINHOLE_DSP_NORMAL_REL_OTHERS": 0,
"MP_PINHOLE_CODEC_WITH_G_711_FAX": 0,
"MP_PINHOLE_DSP_ALLOC_SUCCESS_OTHERS": 0,
"MP_PINHOLE_MAX_JITTER": 475028227
},

Tam muze opravdu byt libovolny bordel a v libovolnem poradi


Logik

  • *****
  • 972
    • Zobrazit profil
    • E-mail
Re:Agregace velkeho mnzstvi stream dat
« Odpověď #20 kdy: 22. 09. 2021, 18:22:34 »
Citace
Dik za hint, tohle mi asi ale nepomuze. Prevod na integer otisk postaci pro agregaci, ja to ale pak zase potrebuju expandovat na puvodni stringy a poslat dale do sveta.
A co Ti brání si držet tabulku: "id metriky":"metrika"? S 90 milionama metrik to dělá řádově 4GB.....
To, co je tady problém je opačná otázka, jak dostatečně rychle zjistit ID z metriky, tak aby bylo rozumně spojitý (neděravý) a tedy to zabralo rozumně paměti. A tady mi to zní jako perfektní úloha na nějakou formu "perfektního hashování".

Anebo, ono umístění řetězce v paměti je ideální "perfektní hash", navíc zadarmo a reversibilní :-). Takže se opravdu úloha redukuje na perfektní zahašování stringu. Stačí si uvědomit, že klíč k redukci paměťové náročnosti je v tom, držet řetězec s názvem metriky v paměti pouze jednou a vždy na něj pouze odkazovat.
Pravda, v takto jednoduchém modelu má hashovací klíč 8 bytů, ale to furt není nic, co by se do rozumnýho stroje do paměti nevešlo. A popř. jde pak ještě perfektně hashovat ten 64bit pointer do nějakého 32bit čísla, kdybys na tom chtěl pálit programátorskej čas (jakože kdyžtak dokoupit paměť by vyšlo levnějc :-)).

EDIT: A nebo zůstat u indexace 4bit integerem, a název metriky držet v datové struktuře spolu s agregovanými hodnotami, to je vlastně ještě přirozenější, než mít "extra slovník".
« Poslední změna: 22. 09. 2021, 18:24:12 od Logik »

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #21 kdy: 22. 09. 2021, 18:35:41 »
Citace
Dik za hint, tohle mi asi ale nepomuze. Prevod na integer otisk postaci pro agregaci, ja to ale pak zase potrebuju expandovat na puvodni stringy a poslat dale do sveta.
A co Ti brání si držet tabulku: "id metriky":"metrika"? S 90 milionama metrik to dělá řádově 4GB.....
To, co je tady problém je opačná otázka, jak dostatečně rychle zjistit ID z metriky, tak aby bylo rozumně spojitý (neděravý) a tedy to zabralo rozumně paměti. A tady mi to zní jako perfektní úloha na nějakou formu "perfektního hashování".

Anebo, ono umístění řetězce v paměti je ideální "perfektní hash", navíc zadarmo a reversibilní :-). Takže se opravdu úloha redukuje na perfektní zahašování stringu. Stačí si uvědomit, že klíč k redukci paměťové náročnosti je v tom, držet řetězec s názvem metriky v paměti pouze jednou a vždy na něj pouze odkazovat.
Pravda, v takto jednoduchém modelu má hashovací klíč 8 bytů, ale to furt není nic, co by se do rozumnýho stroje do paměti nevešlo. A popř. jde pak ještě perfektně hashovat ten 64bit pointer do nějakého 32bit čísla, kdybys na tom chtěl pálit programátorskej čas (jakože kdyžtak dokoupit paměť by vyšlo levnějc :-)).

EDIT: A nebo zůstat u indexace 4bit integerem, a název metriky držet v datové struktuře spolu s agregovanými hodnotami, to je vlastně ještě přirozenější, než mít "extra slovník".

To je pekny hint.
Jenom bych to zevseobecnil na obousmernou mapu (dve mapy) id_retezce/retezec,to by mohlo pomoct. A pouzit to pro vsecky nazvy metrik, metadat, counteru.
Ale to bude imho porad moc, i kdybych to zdrcnul na 100B per agregovany zaznam (coz je v pripade Java Collections IMHO nemozne), porad je to 7GB RAM

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #22 kdy: 22. 09. 2021, 19:18:35 »
Ked uz tam mas niekde postgres tak ta asi bude zaujimat toto: https://www.timescale.com/

Robi to presne to co pozadujes. Naviac ti to poskytne efektivnejsie a bezpecnejsie ulozisko ako kafka...

Logik

  • *****
  • 972
    • Zobrazit profil
    • E-mail
Re:Agregace velkeho mnzstvi stream dat
« Odpověď #23 kdy: 22. 09. 2021, 19:27:10 »
Pokud bys šel tímdle směrem, tak to holt je úloha spíš na C/C++, byť i v Javě se asi dá psát úsporně (např. si předalokovat bytový vektor a z něho pak "alokovat" stringy sám, ale pak je trochu otázka, proč to psát v JAVĚ.... - ale jinak ohledně toho mě neber moc vážně, Javu znam z rychlíku).
Na C++ existuje Kafka klient:https://docs.confluent.io/clients-librdkafka/current/overview.html
A ohledně 7GB paměti - to je hodně? 8GB modul stojí litr, pokud ten stroj, kde to má běžet, by to fakt nezvlád.... Oproti ceně, za "kterou to napíšeš", to jsou drobný.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #24 kdy: 22. 09. 2021, 19:29:07 »
Pripadne ak chces predasa len programovat, tak by som nevymyslal koleso, ale pouzil RRD tool. Ten by mal mat implementaciu hadam v kazdom jazyku.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #25 kdy: 22. 09. 2021, 19:32:37 »
Ked uz tam mas niekde postgres tak ta asi bude zaujimat toto: https://www.timescale.com/

Robi to presne to co pozadujes. Naviac ti to poskytne efektivnejsie a bezpecnejsie ulozisko ako kafka...

Timescale bezne pouzivam, ale ja resim jiny problem.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #26 kdy: 22. 09. 2021, 19:37:27 »
Ked uz tam mas niekde postgres tak ta asi bude zaujimat toto: https://www.timescale.com/

Robi to presne to co pozadujes. Naviac ti to poskytne efektivnejsie a bezpecnejsie ulozisko ako kafka...

timescale zrovna moc efektivni ve vyuziti mista na disku neni, ma spatnou kompresi, pokud nepotrebujete delat nejake fancy dotazy, tak bych se mu vyhnul.

a nemusi stihat zapis, 1 mld za hodinu je celkem dost dat.

tady jsou nejake benchmarky

https://altinity.com/blog/clickhouse-for-time-series

https://github.com/timescale/tsbs

rozdil celkem vyrazny 26GB timescale vs 0.5GB influx vs 1.2GB clickhouse
« Poslední změna: 22. 09. 2021, 19:39:32 od A.P.Hacker »

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #27 kdy: 22. 09. 2021, 19:39:29 »
Pokud bys šel tímdle směrem, tak to holt je úloha spíš na C/C++, byť i v Javě se asi dá psát úsporně (např. si předalokovat bytový vektor a z něho pak "alokovat" stringy sám, ale pak je trochu otázka, proč to psát v JAVĚ.... - ale jinak ohledně toho mě neber moc vážně, Javu znam z rychlíku).
Na C++ existuje Kafka klient:https://docs.confluent.io/clients-librdkafka/current/overview.html
A ohledně 7GB paměti - to je hodně? 8GB modul stojí litr, pokud ten stroj, kde to má běžet, by to fakt nezvlád.... Oproti ceně, za "kterou to napíšeš", to jsou drobný.

Javu preferuju, protoze se v ni dela mnohem snaz, nez v C++, snazsi DevOps, navic ma jit o rozvoj existujiciho programu.
Navic konfigurace memory heapu pro JVM v takovych velikostech uz neni zadek, aby ta RAM byla vyuzivana aspon trochu efektivne.
Nez C++, to bych to pro tento ucel radsi delal v GO. Ale jako prvni pokus zkusim ten MapDB projekt.
To by mohlo fungovat bez excesivniho usili at uz programatorskeho, nebo DevOps.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #28 kdy: 22. 09. 2021, 19:46:58 »
Jj, ze mas velky pocet dopredu neurcenych klucov, ktorych hodnoty chces agregovat... To by malo ist aj v timescale.

Alternativou by bola PipelineDB, ta bola tiez ako extension pre postgres, nieco ako rrdtool pre velke objemy dat. Neviem ako je to s nimi teraz, po tom co ich kupila confluent tak pre aktualny postgres asi nebude.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #29 kdy: 22. 09. 2021, 19:50:28 »
Ale jako prvni pokus zkusim ten MapDB projekt.

nejhorsi mozna volba, kdyz uz chcete ukladat json, tak je efektivnejsi Mongo.