Multithreading úloha v Javě

Kamhelot

Multithreading úloha v Javě
« kdy: 20. 10. 2018, 23:35:05 »
Ahoj,
mohli byste se mi, prosím, kouknout na tento kod? Snažím se udělat něco podobného, jako je třída Executor v javě. Čili můžu vytvořit x vláken, které budou mít práci. V tomhle případě generují náhodná čísla a počítají průměr. Když práci dokončí, čekají/běží dál, dokud jim není další přidělena. Všechny vlákna jsou vypnuta pomocí metody cancel() ve Worker nebo inter() v Executor. A mám tam dvě metody "run" - jedna run a druhá "runy" - to jsou vlastně dvě má řešení, jedno obsahuje wait() a druhé je bez. Když chci zrušit vlákna s "run" - tak volám cancel, když "Runy", tak inter() v Executor. Poté tam je noty() - což vlastně započne znova práci, a setNumber - což započné práci pro tu metodu bez wait
Můj dotaz - jsou obě dvě řešení použitelná/správná? Lze to takto provéct?
Pomocí synchronized(this) by nemělo docházet k tomu, že více vláken bude se najednou dostávat k proměnné average, takže by ty výsledky měly být ok. Je to tak?

Mockrát díky

Kód: [Vybrat]
public class Worker extends Thread {

    private int number;
    private double average = 0;
    private volatile boolean cancelled;

    public Worker(int number) {
        this.number = number;
        cancelled = false;
    }

    public void run() {
        synchronized (this) {
            while (!cancelled) {
                if (number > 0) {
                    for (int i = 0; i < number; i++) {
                        average += Math.random();
                        if (number - 1 == i) {
                            number = 0;
                            System.out.println(Thread.currentThread().getName() + " average is: " + average);
                        }
                    }
                }
            }
        }
        System.out.println(Thread.currentThread().getName() + " is cancelled...");
    }

    public void runy() {
        synchronized (this) {
            while (!cancelled) {
                if (number > 0) {
                    for (int i = 0; i < number; i++) {
                        average += Math.random();

                        if (number - 1 == i) {
                            number = 0;
                            System.out.println(Thread.currentThread().getName() + " average is " + average);
                            try {
                                this.wait();
                            } catch (InterruptedException ex) {
                                Thread.currentThread().interrupt();
                                System.out.println("Thread interupted");
                                cancelled = true;
                            }
                        }
                    }
                }
            }
        }
    }

    public void cancel() {
        this.cancelled = true;
    }

    public void noty(int number) {
        synchronized (this) {
            this.notifyAll();
            this.number = number;
        }
    }

    public double getAverage() {
        return this.average;
    }

    public void setNumber(int number) {
        this.number = number;
    }
public class Executor {

    private List<Worker> activeThreads;

    public Executor(){
        activeThreads = new ArrayList<>();
    }

    public void createThreads(int number){
        for(int i = 0; i < number; i++){
            Worker worker = new Worker(1000000);
            worker.setName("Vlákno " + i);
            activeThreads.add(worker);
            worker.start();
        }
    }

    public void cancelThreads(){
        activeThreads.forEach(t -> t.cancel());
    }

    public void startJob(int num){
        activeThreads.forEach(t -> t.setNumber(num));
    }


    public void getActiveThreads(){
        System.out.println(Thread.getAllStackTraces().keySet().size());
    }

    public void noti(int num){
        activeThreads.forEach(t -> t.noty(num));
    }

    public void inter(){
        activeThreads.forEach(t -> t.interrupt());
    }
« Poslední změna: 22. 10. 2018, 08:16:08 od Petr Krčmář »


O.

Re:Multithreading
« Odpověď #1 kdy: 21. 10. 2018, 00:00:25 »
Cele je to nejake divne. Jednak metoda  public double getAverage() {
        return this.average;
    }

neni synchronizovana. Pokud nekde nastavujes average a chces aby byla synchronizovana, tak i cteni musi byt synchronizovane. A navic je to member promena, pro kazde vlakno nova instance average...


tomx

Re:Multithreading
« Odpověď #2 kdy: 21. 10. 2018, 00:13:08 »
Ano, cele je to divne. Jak psal O. Ta prvni metoda ma byt busy wait? Wait neni ve while podmince (hint spurious wake up). Cele ta myslenka je podivna. Opravdu chcete psat vlastni Executor? To bude chyb :/

Java Concurrency in Practice povinne. Pak pochopite jak to napsat a hlavne, ze to nepsat.

borekz

  • ****
  • 492
    • Zobrazit profil
    • E-mail
Re:Multithreading
« Odpověď #3 kdy: 21. 10. 2018, 08:41:36 »
Pokud nekde nastavujes average a chces aby byla synchronizovana, tak i cteni musi byt synchronizovane.
Odkdy je čtení proměnné typu double neatomické ?

ttt

Re:Multithreading
« Odpověď #4 kdy: 21. 10. 2018, 09:00:10 »
Citace
For the purposes of the Java programming language memory model, a single write to a non-volatile long or double value is treated as two separate writes: one to each 32-bit half. This can result in a situation where a thread sees the first 32 bits of a 64-bit value from one write, and the second 32 bits from another write.

Zdroj: https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7


Re:Multithreading
« Odpověď #5 kdy: 21. 10. 2018, 09:07:06 »
Pokud nekde nastavujes average a chces aby byla synchronizovana, tak i cteni musi byt synchronizovane.
Odkdy je čtení proměnné typu double neatomické ?
O ale nepsal, že to není atomické. Synchronizace (jak napovídá název) slouží k tomu, aby se přístup jednotlivých vláken k objektu seřadil v čase za sebe a nestalo se, že vlákno bude pracovat se starou hodnotou. Atomicita je jen zvláštní případ, který zajišťuje, že vlákno nebude pracovat se starou částí hodnoty.

balki

Re:Multithreading
« Odpověď #6 kdy: 21. 10. 2018, 09:32:59 »
Ano, cele je to divne. Jak psal O. Ta prvni metoda ma byt busy wait? Wait neni ve while podmince (hint spurious wake up). Cele ta myslenka je podivna. Opravdu chcete psat vlastni Executor? To bude chyb :/

Java Concurrency in Practice povinne. Pak pochopite jak to napsat a hlavne, ze to nepsat.

+1

Kamhelot

Re:Multithreading
« Odpověď #7 kdy: 21. 10. 2018, 09:55:04 »
Díky za reakce. Ten kod píši z důvodu procvičování, respektive snažím se naučit multithreading. Z toho důvodu bych moc ocenil, kdybyste mi mohli napsat, co je přesně blbě a jak to opravit, aby to bylo správně...
Díky tomu, že vytvářím nové instance, tak to synchronized tam nemusí být, ale jen v tom příkladu bez wait(), že? Protože wait() by mělo být v synchronized bloku, je to tak? A mělo by být ve while podmínce - to znamená, že za tím if(), kde následuje wait() bude nahrazeno while() podmínkou?
Díky

Re:Multithreading
« Odpověď #8 kdy: 21. 10. 2018, 12:42:58 »
Díky za reakce. Ten kod píši z důvodu procvičování, respektive snažím se naučit multithreading. Z toho důvodu bych moc ocenil, kdybyste mi mohli napsat, co je přesně blbě a jak to opravit, aby to bylo správně...
Díky tomu, že vytvářím nové instance, tak to synchronized tam nemusí být, ale jen v tom příkladu bez wait(), že? Protože wait() by mělo být v synchronized bloku, je to tak? A mělo by být ve while podmínce - to znamená, že za tím if(), kde následuje wait() bude nahrazeno while() podmínkou?
Díky
Procvičování je chvályhodné, ale není dobré se vrhnout hned na úroveň "velmi pokročilý". A synchronizace pomocí primitiv v Javě je taková úroveň - i zkušení programátoři s dlouholetou prací v tom nasekají chyby.

Existuje ale pár jednoduchých pravidel:
  • Nepoužívej primitiva (synchronized / wait / notify), používej již hotové třídy z java.util.concurrent: zámky, podmínky, bariéry, fronty, semafory
  • Pokud už chceš použít monitor (synchronized), tak jen velmi triviálně: synchronized sekce kolem čtení a zápisu do atributu (member field).   Žádné wait a notify!
  • Nemíchat monitory: jeden atribut (member field) je vždy zamčen stejným monitorem (pokud to budeš míchat, tak si můžeš uvařit deadlock). Pokud se jeden sychronized blok ocitne uvnitř jiného (pozor, platí i pro volání metod) tak by měly mít stejný monitor (opět - při neopatrné manipulaci hrozí deadlock).
  • V synchrozed sekci pokud možno žádné smyčky, žádné volání cizích objektů a rozhodně žádné I/O (včetně println)!
  • Neučit se metodou pokud/omyl. V případě paralelního kódu neexistuje žádné "otestuji zda to funguje"!

Re:Multithreading
« Odpověď #9 kdy: 21. 10. 2018, 12:45:13 »
Jo a málem bych zapomněl: pozor na rozdíl mezi monitorem a mutexem, v tom mají často začátečníci guláš (hlavně problém zkušených programátorů kteří tak nějak intuitivně předpokládají, že synchronized je mutex, se kterým už dělali).