Agregace velkého množství streamovaných dat

Agregace velkého množství streamovaných dat
« kdy: 22. 09. 2021, 16:59:19 »
Dotaz do think tanku na moznosti zpracovani velkeho mnozstvi stream dat.

Kafka mi posila stream metric dat v JSON formatu.
Dat je hodne, 90 mil ruznych typu metrik, kazda prijde s periodou 5 minut, tedy za hodinu 90*12=1.08 miliardy zprav.
Format (zjednoduseny) je takovyto, realna jedna zprava ma cca 1.5kB dat.

Kód: [Vybrat]
{
"metric": "metric1"
"metadata": {
"nodeid": "node1",
"cardid": "card1",
"portid": "port1",
"lvidid": "lvid1"
},
"counters": {
"MP_TOTAL_TRANS_AUDIO_SESSIONS": 0,
"MP_PEAK_TRANS_VIDEO_SESSIONS": 0,
"MP_ACTIVE_TRANS_DTMF_SESSION": 0,
"MP_ACTIVE_AUDIO_SESSIONS": 3,
"MP_PEAK_SRTP_SESSIONS": 0,
"MP_PEAK_TRANS_AUDIO_SESSIONS": 0,
"MP_TOTAL_SRTP_SESSIONS": 0,
"MP_PEAK_TRANS_DTMF_SESSION": 0,
"MP_TOTAL_AUDIO_SESSIONS": 18215,
"MP_TOTAL_VIDEO_SESSIONS": 6,
"MP_PEAK_AUDIO_SESSIONS": 14,
"MP_PEAK_VIDEO_SESSIONS": 2,
"MP_ACTIVE_TRANS_AUDIO_SESSIONS": 0,
"MP_ACTIVE_SRTP_SESSIONS": 0,
"MP_TOTAL_ROGUE_SESSIONS": 0,
"MP_ACTIVE_VIDEO_SESSIONS": 0,
"MP_ACTIVE_TRANS_VIDEO_SESSIONS": 0,
"MP_TOTAL_TRANS_VIDEO_SESSIONS": 0
}
}


Mam existujici (muj) Java programek, ktery se napoji na kafku, nacita zpravy, mirne transformuje a posila na soket k dalsimu zpracovani. Load to zvlada bez problemu.

Nyni potrebuju nad temito daty provadet hodinove agregace, pricemz dale potrebuju agregovat podle subsetu metadat.
Pro priklad dat vyse, potrebuju agregovat countery (min/max/avg) pro kombinaci metadat nodeid,cardid,portid - hodnota lvidid se muze menit, potrebuju zachovat hodnotu lviid z posledni prijate zpravy pro kombinaci nodeid,cardid,portid.
Vysledkem ma byt obdobny JSON, kde blok metadat bude obsahovat metadata z posledni zpracovane zpravy a blok agregovanych counteru bude pro kazdy counter obsahovat sadu agregovanych dat sum/count/min/max - (avg=sum/count). Nejak takhle:

Kód: [Vybrat]
{
"metric": "metric1"
"metadata": {
"nodeid": "node1",
"cardid": "card1",
"portid": "port1",
"lvidid": "lvid23"  # hodnota metadat z posledni zpravy
},
"counters": {
"MP_TOTAL_TRANS_AUDIO_SESSIONS": [120, 20, 0, 30], # sum/count/min/max
"MP_PEAK_TRANS_VIDEO_SESSIONS": [130, 25, 0, 32],
.
.
.
}
}

Muj prvni naivni navrh je:
- pustim kafka replay od hodiny X do X+1
- v mym Java programku na cteni kafka streamu si udelam HashMapu, kde key bude kombinace nodeid,cardid,portid a value bude bean se string metadata (raw JSON) a HashMapa <countername, aggvalues array>
- pro kazdou prichozi zpravu si sestavim key (nodeid,cardid,portid) a v HashMape pro dany key vytvorim/updatuju bean (metadata string se preplacne, agregovane hodnoty prepocitaji)
- az probehnu cely kafla replay - vysmahnu ven HashMapu v JSONu

Problem je s velikosti dat. Vyse popsanou agregaci se dostanu na cca 70 milionu keys v HashMape. Pokud pocitam, ze jeden value v HashMape zabere cca 2kB RAM, celkem ta HashMapa zabere 130GB RAM - nesmysl.

Muj druhy naivni navrh, jako HashMapu pouziju Postgresa, ve kterym bude jedna tabulka reprezentujici vyse popsanou HashMapu ve formatu
Kód: [Vybrat]
(
  key varchar(255),
  aggdata jsonb
)
A na postgresu bude pro insertovani PLSQL procedura, ktera provede potrebne vytvoreni/update pole "aggdata jsonb"

Velikost dat by postgres zvladnout mel, jak to bude s rychlosti, netusim.


Prosim, budu vdecen za jakekoliv hinty, jak toto resit, dik.
« Poslední změna: 22. 09. 2021, 22:43:32 od Petr Krčmář »


Re:Agregace velkeho mnzstvi stream dat
« Odpověď #1 kdy: 22. 09. 2021, 17:18:49 »
kokud to jde nejak rozlozit do sloupcu, lil bych do sloupcove databaze, treba clickhouse

ukladat to jako JSON je nesmysl, mimo jine prijdte o moznost komprese

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #2 kdy: 22. 09. 2021, 17:31:35 »
treba takove sloupce, stovky miliard zaznamu by nemel byt problem na terabitovem disku, agregacni dotazy v radu par sekund
Kód: [Vybrat]
datetime
metric
metadata_nodeid
metadata_cardid
metadata_portid
metadata_lvidid
counters_MP_TOTAL_TRANS_AUDIO_SESSIONS
counters_MP_PEAK_TRANS_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_DTMF_SESSION
counters_MP_ACTIVE_AUDIO_SESSIONS
counters_MP_PEAK_SRTP_SESSIONS
counters_MP_PEAK_TRANS_AUDIO_SESSIONS
counters_MP_TOTAL_SRTP_SESSIONS
counters_MP_PEAK_TRANS_DTMF_SESSION
counters_MP_TOTAL_AUDIO_SESSIONS
counters_MP_TOTAL_VIDEO_SESSIONS
counters_MP_PEAK_AUDIO_SESSIONS
counters_MP_PEAK_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_AUDIO_SESSIONS
counters_MP_ACTIVE_SRTP_SESSIONS
counters_MP_TOTAL_ROGUE_SESSIONS
counters_MP_ACTIVE_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_VIDEO_SESSIONS
counters_MP_TOTAL_TRANS_VIDEO_SESSIONS

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #3 kdy: 22. 09. 2021, 17:32:09 »
kokud to jde nejak rozlozit do sloupcu, lil bych do sloupcove databaze, treba clickhouse

ukladat to jako JSON je nesmysl, mimo jine prijdte o moznost komprese

Ja to nepotrebuju ukladat, jenom zpracovat agregaci a poslat dal do sveta

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #4 kdy: 22. 09. 2021, 17:35:21 »
treba takove sloupce, stovky miliard zaznamu by nemel byt problem na terabitovem disku, agregacni dotazy v radu par sekund
Kód: [Vybrat]
datetime
metric
metadata_nodeid
metadata_cardid
metadata_portid
metadata_lvidid
counters_MP_TOTAL_TRANS_AUDIO_SESSIONS
counters_MP_PEAK_TRANS_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_DTMF_SESSION
counters_MP_ACTIVE_AUDIO_SESSIONS
counters_MP_PEAK_SRTP_SESSIONS
counters_MP_PEAK_TRANS_AUDIO_SESSIONS
counters_MP_TOTAL_SRTP_SESSIONS
counters_MP_PEAK_TRANS_DTMF_SESSION
counters_MP_TOTAL_AUDIO_SESSIONS
counters_MP_TOTAL_VIDEO_SESSIONS
counters_MP_PEAK_AUDIO_SESSIONS
counters_MP_PEAK_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_AUDIO_SESSIONS
counters_MP_ACTIVE_SRTP_SESSIONS
counters_MP_TOTAL_ROGUE_SESSIONS
counters_MP_ACTIVE_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_VIDEO_SESSIONS
counters_MP_TOTAL_TRANS_VIDEO_SESSIONS
To vubec neznam, jak to funguje? Jak k sobe spojim countery co nalezi k urcite metrice?
Je to rozvoj nazvu sloupce?


Re:Agregace velkeho mnzstvi stream dat
« Odpověď #5 kdy: 22. 09. 2021, 17:37:33 »
kokud to jde nejak rozlozit do sloupcu, lil bych do sloupcove databaze, treba clickhouse

ukladat to jako JSON je nesmysl, mimo jine prijdte o moznost komprese

Ja to nepotrebuju ukladat, jenom zpracovat agregaci a poslat dal do sveta

ok, mozna bezpecnejsi to ukladat (alespon par dnu zpet) a agregovat ta data offline

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #6 kdy: 22. 09. 2021, 17:38:29 »
treba takove sloupce, stovky miliard zaznamu by nemel byt problem na terabitovem disku, agregacni dotazy v radu par sekund
Kód: [Vybrat]
datetime
metric
metadata_nodeid
metadata_cardid
metadata_portid
metadata_lvidid
counters_MP_TOTAL_TRANS_AUDIO_SESSIONS
counters_MP_PEAK_TRANS_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_DTMF_SESSION
counters_MP_ACTIVE_AUDIO_SESSIONS
counters_MP_PEAK_SRTP_SESSIONS
counters_MP_PEAK_TRANS_AUDIO_SESSIONS
counters_MP_TOTAL_SRTP_SESSIONS
counters_MP_PEAK_TRANS_DTMF_SESSION
counters_MP_TOTAL_AUDIO_SESSIONS
counters_MP_TOTAL_VIDEO_SESSIONS
counters_MP_PEAK_AUDIO_SESSIONS
counters_MP_PEAK_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_AUDIO_SESSIONS
counters_MP_ACTIVE_SRTP_SESSIONS
counters_MP_TOTAL_ROGUE_SESSIONS
counters_MP_ACTIVE_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_VIDEO_SESSIONS
counters_MP_TOTAL_TRANS_VIDEO_SESSIONS
To vubec neznam, jak to funguje? Jak k sobe spojim countery co nalezi k urcite metrice?
Je to rozvoj nazvu sloupce?

nazev metriky mate v sloupci metric

Logik

  • *****
  • 1 031
    • Zobrazit profil
    • E-mail
Re:Agregace velkeho mnzstvi stream dat
« Odpověď #7 kdy: 22. 09. 2021, 17:39:37 »
Převeď si to do nějakýho rozumnýho binárního formátu. 90 milionů metrik popíšeš místo stringem o velikosti desítek bajtů jedním integerem, čímž se Ti velikost zprávy smrskne skoro desetkrát a vejde se Ti to do paměti.....

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #8 kdy: 22. 09. 2021, 17:43:54 »
treba takove sloupce, stovky miliard zaznamu by nemel byt problem na terabitovem disku, agregacni dotazy v radu par sekund
Kód: [Vybrat]
datetime
metric
metadata_nodeid
metadata_cardid
metadata_portid
metadata_lvidid
counters_MP_TOTAL_TRANS_AUDIO_SESSIONS
counters_MP_PEAK_TRANS_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_DTMF_SESSION
counters_MP_ACTIVE_AUDIO_SESSIONS
counters_MP_PEAK_SRTP_SESSIONS
counters_MP_PEAK_TRANS_AUDIO_SESSIONS
counters_MP_TOTAL_SRTP_SESSIONS
counters_MP_PEAK_TRANS_DTMF_SESSION
counters_MP_TOTAL_AUDIO_SESSIONS
counters_MP_TOTAL_VIDEO_SESSIONS
counters_MP_PEAK_AUDIO_SESSIONS
counters_MP_PEAK_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_AUDIO_SESSIONS
counters_MP_ACTIVE_SRTP_SESSIONS
counters_MP_TOTAL_ROGUE_SESSIONS
counters_MP_ACTIVE_VIDEO_SESSIONS
counters_MP_ACTIVE_TRANS_VIDEO_SESSIONS
counters_MP_TOTAL_TRANS_VIDEO_SESSIONS
To vubec neznam, jak to funguje? Jak k sobe spojim countery co nalezi k urcite metrice?
Je to rozvoj nazvu sloupce?

nazev metriky mate v sloupci metric

To jsou nazvy sloupcu, uz chapu.
Ten seznam nazvu counteru ale neni uplny ani finalni, muze se celkem dynamicky menit. Kazdy typ metriky mue mit jiny set counteru

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #9 kdy: 22. 09. 2021, 17:46:05 »
kokud to jde nejak rozlozit do sloupcu, lil bych do sloupcove databaze, treba clickhouse

ukladat to jako JSON je nesmysl, mimo jine prijdte o moznost komprese

Ja to nepotrebuju ukladat, jenom zpracovat agregaci a poslat dal do sveta

ok, mozna bezpecnejsi to ukladat (alespon par dnu zpet) a agregovat ta data offline

Ok, diky za hint, ulozene by to melo byt v kafce, co neni v kafce, to neexistuje. Melo by stacit si do sloupcove DB ulozit pouze tu jednu aktualne zpracovavanou hodinu.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #10 kdy: 22. 09. 2021, 17:49:18 »
kokud to jde nejak rozlozit do sloupcu, lil bych do sloupcove databaze, treba clickhouse

ukladat to jako JSON je nesmysl, mimo jine prijdte o moznost komprese

Ja to nepotrebuju ukladat, jenom zpracovat agregaci a poslat dal do sveta

ok, mozna bezpecnejsi to ukladat (alespon par dnu zpet) a agregovat ta data offline

Ok, diky za hint, ulozene by to melo byt v kafce, co neni v kafce, to neexistuje. Melo by stacit si do sloupcove DB ulozit pouze tu jednu aktualne zpracovavanou hodinu.

ok, ale ta kafka zbytecne plytva mistem, v sloupcove DB neni problem ulozit nekolik dnu do cca desitek GB. Miliarda radku za hodinu nejsou pro sloupcove databaze velka data.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #11 kdy: 22. 09. 2021, 17:49:48 »
Převeď si to do nějakýho rozumnýho binárního formátu. 90 milionů metrik popíšeš místo stringem o velikosti desítek bajtů jedním integerem, čímž se Ti velikost zprávy smrskne skoro desetkrát a vejde se Ti to do paměti.....

Dik za hint, tohle mi asi ale nepomuze. Prevod na integer otisk postaci pro agregaci, ja to ale pak zase potrebuju expandovat na puvodni stringy a poslat dale do sveta.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #12 kdy: 22. 09. 2021, 17:52:07 »
kokud to jde nejak rozlozit do sloupcu, lil bych do sloupcove databaze, treba clickhouse

ukladat to jako JSON je nesmysl, mimo jine prijdte o moznost komprese

Ja to nepotrebuju ukladat, jenom zpracovat agregaci a poslat dal do sveta

ok, mozna bezpecnejsi to ukladat (alespon par dnu zpet) a agregovat ta data offline

Ok, diky za hint, ulozene by to melo byt v kafce, co neni v kafce, to neexistuje. Melo by stacit si do sloupcove DB ulozit pouze tu jednu aktualne zpracovavanou hodinu.

ok, ale ta kafka zbytecne plytva mistem, v sloupcove DB neni problem ulozit nekolik dnu do cca desitek GB. Miliarda radku za hodinu nejsou pro sloupcove databaze velka data.

To je pravda, ale to neni muj boj. Na danem projektu jsem ciste v pozici precerpavace kejdy z kafky dale do sveta.

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #13 kdy: 22. 09. 2021, 17:54:09 »
To je pravda, ale to neni muj boj. Na danem projektu jsem ciste v pozici precerpavace kejdy z kafky dale do sveta.

ok, ale podle me ta agregovana data v JSONu budou na disku vetsi nez by byla "splostela" data v tabulce

Re:Agregace velkeho mnzstvi stream dat
« Odpověď #14 kdy: 22. 09. 2021, 17:54:29 »
Ještě je možnost kouknout po nějaké off-memory hashmapě pro javu, něco jako https://github.com/jankotek/MapDB nebo podobně.  Nemám s tím zkušenost, ale bylo by to něco mezi řešením v paměti a řešením pomocí externí databáze.