IT rekvalifikace s garancí práce. Seniorní programátoři vydělávají až 160 000 Kč/měsíc a rekvalifikace je prvním krokem. Zjisti, jak na to!
Hledáme nové posily do ITnetwork týmu. Podívej se na volné pozice a přidej se do nejagilnější firmy na trhu - Více informací.

7. diel - Java server - Zapisovacia vlákno

V minulej lekcii, Java server - Client dispatcher , sme sa zaoberali čakacie frontom s klientmi. Dnes implementujeme odosielanie správ klientovi pomocou samostatného vlákna. Dosiahneme tak pocitu lepšie odozvy servera.

Zapisovacie vlákno

Až doteraz sme posielali správy zo servera vo vlákne, ktoré práve spracovávalo prichádzajúce dáta. Tomu sa hovorí synchrónne spracovania. Problém v tomto prípade je, že sa jedná o I / O operáciu, čo je časovo náročná činnosť. Oveľa lepšie riešenie je spracovávať všetky tieto činnosti v samostatnom vlákne. My si za týmto účelom vytvoríme vlákno, ktoré bude mať jedinú úlohu: odoslať správy zo servera klientovi.

V balíčku core vytvoríme nový balík writer, v ktorom budeme vytvárať triedy týkajúce sa zapisovacieho vlákna. Opäť začneme vytvorením rozhrania, ktorým zadefinujeme metódy. Vytvoríme rozhranie IWriterThread a necháme ho dediť od rozhrania IThreadControl. Rozhranie bude mať jedinú metódu sendMessage(). Metóda bude prijímať dva parametre: ObjectOutputStream, do ktorého sa správa zapíše, a Object, ktorý je samotná správa:

public interface IWriterThread extends IThreadControl {
    void sendMessage(ObjectOutputStream writer, Object message);
}

Ďalej vytvoríme triedu WriterThread, ktorá bude dediť z triedy Thread a implementovať vyššie definované rozhranie:

public class WriterThread extends Thread implements IWriterThread {
}

V triede si zadefinujeme dve inštančný konštanty a dve premenné:

private final Semaphore semaphore = new Semaphore(0);
private final Queue<QueueTuple> messageQueue = new ConcurrentLinkedQueue<>();
private boolean working = false;
private boolean interrupt = false;

Princíp tu prítomného semafore je rovnaký ako v predchádzajúcej lekcii. Vlákno bude na semafore spať, kým nedostane signál, že má pracovať. Fronta správ messageQueue obsahuje správy, ktoré sa budú posielať. Zatiaľ je typová na neznámy typ QueueTuple, ktorý si vzápätí vytvoríme. Premenná working hovorí, či ak vlákno pracuje, alebo spí na semafore. Premenná interrupt má rovnaký účel ako v predchádzajúcich lekciách, kedy sme vytvárali triedy, ktoré dedia od triedy Thread.

Konštruktor triedy necháme tentoraz prázdny. Len za účelom lepšej orientácie dáme vláknu lepší názov. V konstruktoru zavoláme konštruktor triedy Thread s jedným preťažením, ktoré ako parameter berie názov vlákna:

public WriterThread() {
    super("WriterThread");
}

Metódy vlákna

Rozhranie nám hovorí, že musíme implementovať dve metódy: sendMessage() a shutdown(). Pozrieme sa na prvý z nich. Odoslanie správy nebude nič iné, než pridanie správy do frontu správ a v prípade, že vlákno nepracuje, tak ho prebudíme a nastavíme príznak working na hodnotu true:

@Override
public void sendMessage(ObjectOutputStream writer, Object message) {
    messageQueue.add(new QueueTuple(outputStream, message));
    if (!working) {
        working = true;
        semaphore.release();
    }
}

Metóda shutdown() bude rovnaká ako v minulej lekcii:

@Override
public void shutdown() {
    interrupt = true;
    semaphore.release();
    try {
        join();
    } catch (InterruptedException ignored) {}
}

Nastavíme príznak interrupt na hodnotu true, uvoľníme semafor a počkáme, až sa zapisovacej vlákno ukončí.

Výkonný kód vlákna

Teraz implementujeme samotný kód, ktorý sa postará o odoslaní správ v našom vlákne. Metóda run() vyzerá nasledovne:

@Override
public void run() {
    while(!interrupt) {
        while(messageQueue.isEmpty() && !interrupt) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }
        working = true;
        while(!messageQueue.isEmpty()) {
            final QueueTuple entry = messageQueue.poll();
            try {
                entry.writer.writeLine(entry.message);
                entry.writer.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        working = false;
    }
}

Kód je veľmi podobný tomu, čo sme implementovali u client Dispatcher. Máme tu nekonečnú slučku závisiaci na premenné interrupt. Nasleduje ďalšia nekonečná slučka pre čakanie vlákna na semafore. Prečo je čakanie vlákna v slučke sme rozoberali opäť v minulej lekcii. V ďalšej slučke sa iteruje cez front správ a postupne sa odošlú všetky správy, ktoré sa nahromadili. Keď vo fronte nie sú žiadne správy, ide vlákno opäť odpočívať na semafor.

Zostáva už len nadefinovať triedu QueueTuple. Ide o jednoduchú prepravku, ktorá bude obsahovať dve konštanty. Nie je náhoda, že tieto konštanty sa získajú v metóde sendMessage(). Kód celej triedy je tu:

private static final class QueueTuple {
    final Object message;
    final ObjectOutputStream writer;

    private QueueTuple(ObjectOutputStream writer, Object message) {
        this.message = message;
        this.writer = writer;
    }
}

Triedu umiestnite do triedy WriterThread, pretože nikde inde ju používať nebudeme.

Prepojenie logiky

V triede ConnectionManager vytvoríme inštančný konštantu zapisovacieho vlákna, ktorú budeme inicializovať v konštruktory z parametra:

private final IWriterThread writerThread;
public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread,
    ExecutorService pool, int maxClients) {
    this.clientDispatcher = clientDispatcher;
    this.writerThread = writerThread;
    this.pool = pool;
    this.maxClients = maxClients;
}

Spustenie vlákna vykonáme v metóde onServerStart() v správcovi spojení hneď po zapnutí client Dispatcher:

writerThread.start();

Ukončenie vlákna vykonáme opäť v metóde onServerStop() po vypnutí client Dispatcher:

writerThread.shutdown();

Ďalej musíme upraviť triedu Client, aby mala prístup k zapisovacímu vláknu. Vytvoríme teda aj v triede Client inštančný konštantu zapisovacieho vlákna, ale budeme ju inicializovať v konstruktoru, ktorý ju získa ako parameter:

Client(Socket socket, IWriterThread writerThread) throws IOException {
    this.socket = socket;
    writer = new ObjectOutputStream(socket.getOutputStream());
    this.writerThread = writerThread;
}

Vytvorenie inštancie triedy Client pri prichádzajúcom spojení nechám na láskavom čitateľovi.

Nakoniec musíme upraviť továreň na výrobu správcu spojenie, pretože sme opäť upravili konštruktor triedy ConnectionManager. V továrni pridáme novú inštančný konštantu typu IWriterThread, ktorú budeme inicializovať v konštruktory pomocou parametra:

private final IWriterThread writerThread;

@Inject
public ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory,
    IWriterThread writerThread) {
    this.clientDispatcherFactory = clientDispatcherFactory;
    this.writerThread = writerThread;
}

Vytvorenie inštancie by už nemalo robiť problémy:

return new ConnectionManager(clientDispatcherFactory.getClientDispatcher(waitingQueueSize), writerThread, pool, maxClients);

To by bolo z dnešnej lekcie všetko. Funkčnosť sme nezmenili, iba vylepšili odozvu servera. Nabudúce, v lekcii Java server - Komunikačný protokol , navrhneme komunikačný protokol.


 

Mal si s čímkoľvek problém? Stiahni si vzorovú aplikáciu nižšie a porovnaj ju so svojím projektom, chybu tak ľahko nájdeš.

Stiahnuť

Stiahnutím nasledujúceho súboru súhlasíš s licenčnými podmienkami

Stiahnuté 18x (138.86 kB)
Aplikácia je vrátane zdrojových kódov v jazyku Java

 

Predchádzajúci článok
Java server - Client dispatcher
Všetky články v sekcii
Server pre klientskej aplikácie v Jave
Preskočiť článok
(neodporúčame)
Java server - Komunikačný protokol
Článok pre vás napísal Petr Štechmüller
Avatar
Užívateľské hodnotenie:
Ešte nikto nehodnotil, buď prvý!
Autor se věnuje primárně programování v Javě, ale nebojí se ani webových technologií.
Aktivity