Fórum Root.cz
Hlavní témata => Vývoj => Téma založeno: Xgamer8 06. 01. 2012, 17:10:44
-
Dobrý deň,
učim sa pracovať v jave s viacerymi vlaknami, a tak som si vymyslel take zadanie, že pomocou knihovne Jsoup zparsujem html stranku a vytiahnem z nej všetky odkazy. Tieto odkazy uložim do fronty. Podla toho či bol už dosiahnuty počet odkazov tak sa odkazy pridavaju. Pre lepšiu efektivitu chcem načitavať stranky vo viacerych vlaknach. Na to použivam Thread Pool.
Ako frontu použivam ArrayBlockingQueue. Problem je v tom, že mi to nefunguje tak ako by malo. ExecutorService sice spustí vlakno ale neprida do fronty žiadne nové odkazy ale pokial vo vnutornej triede CollectThread zavolam v konštruktore metodu run() tak to funguje ale vytvoria sa duplicitne zaznamy( a volat v konštruktore run() je aj tak asi blbosť nie?:) ) Keď ju tam nevolam tak to funguje tak že tam prida niektore odkazy, inak je tam proste null.
Snažim sa prist na to prečo to má take správanie, napadá ma iba to že je to nejako blbo sychronizovane ale nejako si v tom neviem najst chybu a opraviť ju. Preto by som chcel poprosiť niekoho kto sa rozumie problematike aby sa na to pozrel a poradil mi kde robim chybu a hlavne prečo mi to nefunguje tak ako si to prestavujem:). Ďakujem
Tu je kod:
package collector;
import java.io.IOException;
import java.util.concurrent.*;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
public class Collector {
private String url;
private int links; //predanie parametru konštruktora
private LinksCounter clinks; //count links
private ArrayBlockingQueue<String> LinksList;
public Collector(String baseURL, int Nlinks){
this.url = baseURL;
this.links = Nlinks;
this.clinks = new LinksCounter(0);
this.LinksList = new ArrayBlockingQueue<String>(links);
try {
LinksList.add(url);
CollectFromWeb();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private synchronized void GetLinksList(Document html) throws InterruptedException{
Elements collectedLinks = html.select("a[href]");
for(Element link:collectedLinks){
if(clinks.get() == links) break;
else{
String current = link.attr("abs:href");
if(!current.equals(url) && current.startsWith(url) && !current.contains("#")){
LinksList.put(current);
clinks.up();
}
}
}
}
private void CollectFromWeb() throws InterruptedException{
ExecutorService executor = Executors.newFixedThreadPool(4);
for(int i = 0; i < links; i++){
CollectThread worker = new CollectThread(LinksList.take());
executor.execute(worker);
System.out.println("Collecting from "+LinksList.peek());
}
executor.shutdown();
System.out.println(clinks.get());
}
public void WriteOutQueue(){
while(!LinksList.isEmpty()){
System.out.println(LinksList.poll());
}
System.out.println(clinks.get());
}
public void DuplicityTester(){}
class CollectThread implements Runnable{
private String url;
public CollectThread(String url) {
this.url = url;
}
@Override
public void run() {
//ak je potrebova zbierame linky
if(clinks.get() != links){
try {
Document html = Jsoup.connect(url).get();
GetLinksList(html);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName());
}
}
class LinksCounter{
private int count;
public LinksCounter(int num){
this.count = num;
}
public synchronized int get(){
return this.count;
}
public synchronized void up(){
this.count++;
}
}
}
-
Nechybi ti tam nejaky cyklus v run() ? Takhle vytvoris links vlaken, v kazdem zpracujes jeden odkaz a koncis.
-
To rieši metoda GetLinksList() a tam je foreach cyklus ktory prechádza jednotlive Elements(datový typ Jsoup knihovne).
-
No to neresi. Ta metoda nevytvari nova vlakna, jen pridava linky. Kazde vlakno zpracuje jeden link a konec.
-
A jestli to dobre chapu, tak je tam chyba v navrhu v tom, ze kod co se ma vykonavat vsemi vlakny(private synchronized void GetLinksList) je synchronizovan. Takze ho bykonava vzdy jen jedno vlakno, protoze mas jen jednu instanci Collectoru
-
Ako chyba v navrhu je dost možna... kedže sa s threadmi len zoznamujem.. .ale ked je to synchronizovat to znamena že v jeden okamih tam má pristup len jedno vlakno nie? takže ked si vlakno dokonči svoju robotu zase do toho može pristupit nove vlakno. No ale skušal som to aj tak že to tam nemal metodu, že som mal obsah metody priamo v rune ale aj tak to nerobilo to čo malo ...:(
-
Me ten kod neni moc jasny. Promenna links je cilovy pocet odkazu, ktere chces najit? Z kodu to vypada, ze to je vlastne pocet vlaken, ktere spustis. No a protoze v tom run() nemas zadny cyklus, tak je to taky pocet linku, ktere projdes
-
links je počet dokumentov ktore sa maju spracovať. Moja povodna idea bola uplne jednoducha. Použijem 4 vlakna ktoré sa budu znovu používať až dovtedy pokial nebude dosiahnuty počet požadovaných dokumentov. Každý thread sa bude starat o jeden dokument. Pri realizácii som sa však v tom asi už stratil. K tomu cyklu v rune to tam mam ako dat cyklus while( pocet linkov != pocet pozadovanych linkov)? Ved keby je to tak tak to bude stále len v jednom tom threade nie? Čiže všetko vykoná jeden thread sam?.
-
Ty ty vlakna ale nevytvoris ctyri, ale vytvoris jich tolik, kolik je hodnota links. Ten Executor pak akorat zaridi, ze z te kupy vlaken najednou pobezi jen ctyri. Fatalni chyba je tady ale predevsim v nepochopeni Runnable a run(). Vykonavani vlakna skonci tehdy, kdyz skonci metoda run().
-
Trochu jsem prepsal ten tvuj zamer do funkcniho programu. Staci spustit
package collector;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
public class Collector {
static final ArrayBlockingQueue<String> unprocessedLinks = new ArrayBlockingQueue<String>(100);
static final Set<String> processedLinks = new HashSet<String>();
static int runningWorkers;
static int lastWorkerHit;
final int threadCount=4;
final int targetCount;
public Collector(String baseURL, int targetCount) throws InterruptedException {
unprocessedLinks.add(baseURL);
this.targetCount=targetCount;
}
public void start(){
for (int i = 0; i < threadCount; i++)
new Thread(new Worker(i)).start();
new Thread(new StatusWorker()).start();
}
private class Worker implements Runnable {
private int workerNumber;
public Worker(int workerNumber) {
runningWorkers += 1;
this.workerNumber = workerNumber+1;
}
@Override
public void run() {
String currentLink;
System.out.println("Worker #"+workerNumber+" started");
while (processedLinks.size()<targetCount) {
try {
currentLink=unprocessedLinks.take();
processedLinks.add(currentLink);
Document html = Jsoup.connect(currentLink).get();
Elements collectedLinks = html.select("a[href]");
for (Element element : collectedLinks) {
String newLink = element.attr("abs:href");
if (!newLink.contains("#") && !processedLinks.contains(newLink))
unprocessedLinks.add(newLink);
}
lastWorkerHit=workerNumber;
} catch (Exception e) {
//who cares? We've got plenty of links
}
}
runningWorkers--;
System.out.println("Worker #"+workerNumber+" stopped");
}
}
private static class StatusWorker implements Runnable {
@Override
public void run() {
while (runningWorkers > 0) {
System.out.println("Processed:" + processedLinks.size() + " Unprocessed:" + unprocessedLinks.size()+" Last link processed by worker #"+lastWorkerHit);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
}
System.out.println("StatusWorker stopped");
System.out.println("=====PROCESSED LINKS=====");
for (String link: processedLinks)
System.out.println(link);
}
}
public static void main(String[] args)throws Exception{
Collector c = new Collector("http://www.root.cz", 100);
c.start();
}
}
P.S. Doufam, ze to neni domaci ukol a ze nejsi ten bezradnej moravskej zabijak Googlu
-
Neni to ukol :) Snažim sa to len pochopiť:) Má to služit ako sample ku lušteniu monoalfabeticky a polyalfabetickych šifier. Tento zberač sa má starat o ziskanie frekvenčnej analyzy znakov z webu. Ďakujem:)
PS: zabijam s tým cely deň:D a vy ste to za hodinu spravili, klobuk dolu :)
-
No popravde jsem se spis zasek na tom, kolik zabavy si s tim uziju..
Jeste to podstatne vylepsuju(uz davam cca 500 linku za vterinu) a zkousim ruzne blbiny. Treba za jak dlouho najdu link obsahujici "porn", kdyz zacnu na ruznych strankach :D
P.S. Root si stoji docela dobre:)
-
:D Hej ma to všeliake využitie:)
-
Nevis, co mam dat misto tohohle
html.select("a[href]");
abych dostal emailovy adresy? ;D
-
Tak to veru neviem :D
-
Ešte by som sa chcel spýtať, pozeral som na ten tvoj kod, snažil som sa ho pochopiť..:) ale mam otazku, ako spraviť aby som mohol vykonať ešte niečo v maine po volaní metody start()? Lebo ak to dobre chápem tak main si prebehne spustia sa tie thready v triede Collector a potom main už len čaka na to dokedy sa dokončia, čiže v nom už nemožem nič spravit. Ako by sa dal spravit nejaky test či už boli tie thready dokončene aby sa v maine mohlo ešte niečo vykonat?
-
To mas v te tride StatusWorker, ktera vypisuje kolik uz jsi nasel linku. Kdyz se ty vlakna co hledaji linky ukonci(while (runningWorkers > 0)while (runningWorkers > 0)), tak prestane vypisovat status a vypise ti nalezene linky a skonci.
-
Ano, ale to ukončí aj main, kedže dopehne aj StatusWorker, lenže ja by som potreboval ešte priamo v maine pracovať so ziskanými datami. Z tej analyzy vytvaram mapu znakov ktora je premmenna triedy Collector, avšak potrebujem s nou pracovat priamo v maine, kedže to predavam dalšiemu objektu atd, teda v main metode by som potreboval kontrolovat stav threadov aby ked už dobehnut thready aby pokračoval v maine.
-
Skús Thread.join().
-
Thread.join() nefunguje, zamrzne cela aplikacia, vyriešil som to pridanim riadku while (!executor.isTerminated()) {} do metody start() avšak mam ju upravenu do tejto podoby:
public void start(){
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
CollectThread workers[] = new CollectThread[nThreads];
for(int i = 0; i < nThreads; i++){
workers[i]= new CollectThread();
executor.execute(workers[i]);
}
executor.shutdown();
while (!executor.isTerminated()) {}
Ak to dobre chápem tak sa z tej metody nevyskočí dovtedy pokial maju thready čo robiť. Avšak neviem či je to zrovna idealne riešenie ale funguje:)
-
No popravde jsem se spis zasek na tom, kolik zabavy si s tim uziju..
Jeste to podstatne vylepsuju(uz davam cca 500 linku za vterinu)
Nechcel by si sa podelit o to ako si to tak zrýchlil?:)
-
Experimentovanim jsem dostal ten kod do stavu, ktery bych nerad nekam posilal :)
Kazdopadne tam je porad hned nekolik brzd: ArrayBlockingQueue je v tomto pripade podle me hloupost, protoze se ti ta vlakna zasekavaji jenom kvuli tomu, ze nemuzou pridavat vysledky. Taky snadno nezajistis, abys nepridaval neco, co uz jsi pridal. Nejaky set bude lepsi, nebo aspon drzet vedle toho set, abyses moh ptat na existenci s logaritmickou slozitosti.
Pocet vlaken muzes dat podstatne vyssi, nez je pocet jader, protoze je stejne bude brzdit IO wait. Nejlip mi to beha tak pri cca 50 vlaknech. Problem je, ze najednou dostanes treba dvacet linku, ktere zkapou na timeout. Aby se ti nezaseklo hodne vlaken najednou, tak bud potrebujes vybitat linky nahodne, nebo pouzit nejake sofistikovanejsi metody(radit vysledky podle rychlosti nacteni toho, co prave zpracovavas). No bylo by toho asi dost, ale uz tim asi nechci travit dalsi cas :-\
-
Jasne, aj tak veľmi pekne dakujem dosť si mi pomohol:)
-
Odo mna par postrehov, len co som letmo zbadal.
Davajte si pozor na counting thread workerov - myslim ze v Jave 6 by mali mat primitivne typu (okrem long a double) pristup do pamate ako atomicku operaciu. To ale neznamena, ze --- worker++ --- je atomicka operacia - tu si treba uvedomit,ze ++ sa sklada z troch operacii, ktore su sice kazda atomicka (load, add, store), ale spolu nie su atomicke a musia sa lockovat (alebo vymysliet nieco ine)
VOLATILE vam tak isto nezaruci uplnu atomickost bez side-effects, ktore sa mozu vyskytnut (preto sa pouziva napriklad iba pri boolean-och, kde potrebujete mat istotu,ze je to true a iny side-effect (nieco, cokolvek ine ako true) vam neprekaza --- preto sa volatile moc neodporuca pri ciselnych typoch, tam je bud konkretne cislo alebo nieco ide dost odlisny stav).
-
Odo mna par postrehov, len co som letmo zbadal.
Davajte si pozor na counting thread workerov - myslim ze v Jave 6 by mali mat primitivne typu (okrem long a double) pristup do pamate ako atomicku operaciu. To ale neznamena, ze --- worker++ --- je atomicka operacia - tu si treba uvedomit,ze ++ sa sklada z troch operacii, ktore su sice kazda atomicka (load, add, store), ale spolu nie su atomicke a musia sa lockovat (alebo vymysliet nieco ine)
VOLATILE vam tak isto nezaruci uplnu atomickost bez side-effects, ktore sa mozu vyskytnut (preto sa pouziva napriklad iba pri boolean-och, kde potrebujete mat istotu,ze je to true a iny side-effect (nieco, cokolvek ine ako true) vam neprekaza --- preto sa volatile moc neodporuca pri ciselnych typoch, tam je bud konkretne cislo alebo nieco ide dost odlisny stav).
S tim ++ mas pravdu. Dokonce jsem na to myslel, kdyz jsem to psal a pak jsem to nejak vypustil z hlavy :D Nejlepsi bude asi AtomicInteger. Jinak ja s vlakny nikdy pracovat nemusel(vyjma pripadu, kdy me k tomu nuti framework) a znalosti mam tedy jenom nactene, neproverene praxi. Tohle je jen na rychlo spichnuta funkcni implementace a urcite by se to dalo napsat mnohem lepe...