Vydělávej až 160.000 Kč měsíčně! Akreditované rekvalifikační kurzy s garancí práce od 0 Kč. Více informací.
Hledáme nové posily do ITnetwork týmu. Podívej se na volné pozice a přidej se do nejagilnější firmy na trhu - Více informací.

6. diel - Java server - Client dispatcher

V minulej lekcii, Java server - Správca spojenia , sme implementovali správcu spojenia. Dnes sa pozrieme na správu klientov, ktorí boli presunutí do čakacej fronty, pretože boli naplnené maximálnej kapacity servera.

Client dispatcher

Trieda bude mať jednoduchá úloha. Bude sa snažiť udržať aktívne spojenie medzi serverom a klientom. V definovanom intervale bude prechádzať jednotlivé klientov v čakacej fronte a odošle im (zatiaľ) jednoduchú textovú správu s informáciou o počte klientov vo fronte. V prípade, že sa správu nepodarí doručiť, ukončí sa spojenie s klientom a klient sa vyradí z čakacej fronty. Celý tento proces sa bude diať len, ak vo fronte budú nejakí klienti.

Funkcie

Funkcia som už opísal v odseku vyššie, teraz si ich prehľadne spíšeme v bodoch:

  • vloženie klienta do fronty
  • získanie klienta z frontu
  • otázku, či je vo fronte klient

V balíčku core založíme nový balíček s názvom dispatcher, v ktorom vytvoríme rozhraní IClientDispatcher. Rozhranie bude definovať funkcie Dispatcher:

public interface IClientDispatcher extends IThreadControl {
    boolean hasClientInQueue();
    Client getClientFromQueue();
    boolean addClientToQueue(Client client);
}

Rozhranie dedí od IThreadControl, aby sme mohli ovládať vlákno, v ktorom dispatcher pobeží.

Implementácia triedy

Triedu, ktorá bude implementovať rozhranie, nazveme jednoducho ClientDispatcher, necháme ju dediť od triedy Thread a implementovať rozhranie IClientDispatcher:

public class ClientDispatcher extends Thread implements IClientDispatcher

Nadefinujeme si jednu triedny konštantu, ktorá bude reprezentovať interval, v ktorom sa bude opakovať komunikácia s klientmi vo fronte:

private static final int SLEEP_TIME = 5000;

Nasledujú inštančné premenné:

private boolean interupt = false;
private final Semaphore semaphore = new Semaphore(0);
private final Queue<Client> waitingQueue = new ConcurrentLinkedQueue<>();
private final Collection<Client> clientsToRemove = new ArrayList<>();
private final int waitingQueueSize;

Premenná interrupt bude riadiť vlákno. Kým bude mať hodnotu false, vlákno bude bežať. Semafor tu bude mať riadiacu funkciu. Kým vo fronte nebudú žiadni klienti, vlákno bude na semafore čakať. Akonáhle sa pripojený klient dostane do fronty, vlákno prejde cez semafor a bude robiť svoju prácu. Po odobratí všetkých klientov z frontu sa vlákno opäť uspia na semafore. Nasledujú dve kolekcie. Vo waitingQueue sa budú uchovávať klienti a clientsToRemove bude obsahovať klientov, ktorí ukončili spojenie a je potrebné ich odstrániť z frontu. Premenná waitingQueueSize obsahuje maximálny počet klientov vo fronte.

Konštruktor triedy bude vyžadovať jediný parameter. Bude to práve maximálny počet klientov, ktorí budú čakať v rade:

public ClientDispatcher(int waitingQueueSize) {
    this.waitingQueueSize = waitingQueueSize;
}

Implementácia funkcií

Začneme implementovať metódy z rozhrania IClientDispatcher:

@Override
public boolean hasClientInQueue() {
    return !waitingQueue.isEmpty();
}

@Override
public Client getClientFromQueue() {
    return waitingQueue.poll();
}

@Override
public boolean addClientToQueue(Client client) {
    if (waitingQueue.size() < waitingQueueSize) {
        waitingQueue.add(client);
        semaphore.release();
        return true;
    }

    return false;
}

Prvé dve metódy majú implementáciu jednoduchú a netreba ich komentovať. U funkcie pre pridanie klienta do fronty musíme najskôr zistiť, či ak front pojme ďalšieho klienta. Pokiaľ ho pojme, uvoľní sa semafor a vráti sa true, inak sa vráti false a nič viac sa nestane.

Výkonný kód vlákna

Teraz implementujeme najdôležitejšie metódu, run(), v ktorej sa bude odohrávať všetka logika:

@Override
public void run() {
    while(!interupt) {
        while(waitingQueue.isEmpty() && !interupt) {
            try {
                semaphore.acquire();
            } catch (InterruptedException ignored) {}
        }

        if (interupt) {
            clientsToRemove.addAll(waitingQueue);
        } else {
        final int count = waitingQueue.size();
            waitingQueue.iterator().forEach(client -> {
                try {
                    client.writer.write(("count: " + count + "\n").getBytes());
                    client.writer.flush();
                } catch (IOException e) {
                    clientsToRemove.add(client);
                }
            });
        }

        waitingQueue.removeAll(clientsToRemove);
        for (Client client : clientsToRemove) {
            client.close();
        }
        clientsToRemove.clear();

        try {
            Thread.sleep(SLEEP_TIME);
        } catch (InterruptedException ignored) {}
    }
}

Ako prvý sa nadefinuje nekonečná slučka, ktorá bude závislá na premenné interupt. Nasleduje ďalšia slučka, ktorá bude závislá na semafore. Vždy je lepšie mať čakanie na semafore v slučke než v jednej podmienke. Rozdiel medzi kódom:

if (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

a kódom:

while (waitingQueue.isEmpty() && !interupt) {
    try {
        semaphore.acquire();
    } catch (InterruptedException ignored) {}
}

je síce iba v jednom slove (if a while), ale významovo je rozdiel veľký. Počas čakania na semafore sa môže vyvolať ona InterruptedException výnimka. Ak by sme mali čakanie na semafore pomocou if, tak by vlákno začalo zbytočne vykonávať výkonný kód. Preto je dôležité čakať na semafore v slučke. V slučke sa kontrolujú dve veci:

  1. obsadenosť fronty
  2. príznak interupt

Ak sa vlákno vzbudí na semafore a front bude prázdna, alebo príznak interupt bude false, tak sa vlákno opäť uspí. Je dôležité, aby bol príznak interupt prítomný. Inak by sme nemohli ukončiť vlákno pri vypínaní servera.

Keď opustíme čakanie vlákna na semafore, nasleduje vyhodnotenie, či ak sa bude vlákno ukončovať alebo nie. Pokiaľ má nastať ukončenie vlákna, tak sa všetci klienti z frontu vloží to kolekcia pre odstránenie klientov z frontu. V prípade štandardného priechodu sa pošle všetkým klientom jednoduchá správa. Ak sa správu nepodarí doručiť, vloží sa klient do kolekcie pre odstránenie klientov z frontu, pretože klient najskôr neudržal spojenie.

Vo finále sa ukončí spojenie so všetkými užívateľmi, ktorí boli v kolekcii na odstránenie klientov, a počká sa definovaný čas a celý cyklus začne od začiatku.

Ukončenie vlákna

Nakoniec implementujeme metódu shutdown(), ktorú nám predpisuje rozhranie IThreadControl:

@Override
public void shutdown() {
    interupt = true;
    semaphore.release();
    try {
        join();
    } catch (InterruptedException ignored) { }

}

V tejto metóde urobíme tri veci:

  1. nastavíme príznak interupt na true
  2. uvoľníme semafor
  3. počkáme, až sa vlákno ukončí

Uvoľnením semafore sa spustí vlákno Dispatcher. Vďaka tomu, že sme nastavili príznak interupt na true, pridajú sa všetci klienti z frontu na zoznam adeptov pre ukončenie spojenia. Po ukončení spojenia s klientmi a odobratie z frontu sa už nesplní podmienka v nekonečnej slučke a vlákno sa bezpečne ukončí.

Prepojenie logiky

V druhej časti dnešného článku použijeme client dispatcher v triede ConnectionManager. Pre začiatok pridáme novú triednu konštantu typu IClientDispatcher a do konstruktoru triedy ConnectionManager parameter rovnakého typu, ktorým konštantu inicializujeme:

public ConnectionManager(IClientDispatcher clientDispatcher, ExecutorService pool,
    int maxClients) {
    this.clientDispatcher = clientDispatcher;
    this.pool = pool;
    this.maxClients = maxClients;
}

Ďalej dokončíme implementáciu metódy insertClientToListOrQueue(). Upravíme connectionClosedListener tak, aby sa server pokúsil získať z frontu čakajúceho klienta a pridal ho medzi aktívnych klientov:

client.setConnectionClosedListener(() - > {
    clients.remove(client);
    if (clientDispatcher.hasClientInQueue()) {
        this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue());
    }
});

Namiesto druhého TODO naimplementujeme vloženie klienta do fronty:

if (!clientDispatcher.addClientToQueue(client)) {
    client.close();
}

Tu využijeme návratovej hodnoty metódy addClientToQueue(), ktorá vráti true, ak klienta vloží do fronty. Ak je fronta plná, vráti false a ako reakciu na plnú front odpojíme klienta.

Teraz je potrebné už len spustiť vlákno Dispatcher. O spustení sa postaráme v metóde onServerStart() triedy ConnectionManager, kde zavoláme:

clientDispatcher.start();

V metóde onServerStop() ukončíme Dispatcher:

clientDispatcher.shutdown();

Nad Dispatcher zavoláme metódu shutdown(), ktorou sa nastaví príznak a prebudí vlákno. Po chvíli sa vlákno Dispatcher ukončí.

Nakoniec vytvoríme továreň na výrobu Dispatcher a zaregistrujeme ju. Vytvoríme teda rozhranie IClientDispatcherFactory, ktoré bude mať jedinú metódu getClientDispatcher(), ktorá bude v parametri prijímať maximálny počet klientov vo fronte.

public interface IClientDispatcherFactory {
    IClientDispatcher getClientDispatcher(int waitingQueueSize);
}

Implementácia tohto rozhrania bude veľmi jednoduchá. Vytvoríme teda triedu ClientDispatcherFactory, ktorá bude toto rozhranie implementovať a implementujeme jedinú metódu getClientDispatcher():

public class ClientDispatcherFactory implements IClientDispatcherFactory {
    @Override
    public IClientDispatcher getClientDispatcher(int waitingQueueSize) {
        return new ClientDispatcher(waitingQueueSize);
    }
}

Továreň zaregistrujeme v triede ServerModule obvyklým spôsobom:

bind(IClientDispatcherFactory.class).to(ClientDispatcherFactory.class);

Všetko je takmer v poriadku, až na továreň správcu spojenie ConnectionManagerFactory. Triede ConnectionManager sme zmenili signatúru konstruktoru pridaním parametra typu IClientDispatcher. Vytvoríme teda v tejto továrni novú inštančný konštantu typu IClientDispatcherFactory. Túto konštantu bude dostávať továreň správcu spojenia v konstruktoru:

private final IClientDispatcherFactory clientDispatcherFactory;

@Inject
ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory) {
    this.clientDispatcherFactory = clientDispatcherFactory;
}

Teraz nám nič nebráni upraviť metódu getConnectionManager(). Pre vytvorenie novej inštancie správcu spojenie využijeme práve továreň client Dispatcher:

return new ConnectionManager(clientDispatcherFactory.getClientDispatcher( waitingQueueSize), pool, maxClients);

Tým by sme mali hotovú správu klientov čakajúcich vo fronte. Nabudúce, v lekcii Java server - Zapisovacia vlákno , vytvoríme vlákno, ktoré bude asynchrónne odosielať správy zo servera ku klientom.


 

Mal si s čímkoľvek problém? Stiahni si vzorovú aplikáciu nižšie a porovnaj ju so svojím projektom, chybu tak ľahko nájdeš.

Stiahnuť

Stiahnutím nasledujúceho súboru súhlasíš s licenčnými podmienkami

Stiahnuté 20x (156.7 kB)
Aplikácia je vrátane zdrojových kódov v jazyku Java

 

Predchádzajúci článok
Java server - Správca spojenia
Všetky články v sekcii
Server pre klientskej aplikácie v Jave
Preskočiť článok
(neodporúčame)
Java server - Zapisovacia vlákno
Článok pre vás napísal Petr Štechmüller
Avatar
Užívateľské hodnotenie:
Ešte nikto nehodnotil, buď prvý!
Autor se věnuje primárně programování v Javě, ale nebojí se ani webových technologií.
Aktivity