if statements in the code. I would note that in a real-world chat application it is unlikely that different rooms would have different resource URIs, but for the purposes of the presentation this made sense.
package websocket;

import static java.util.Collections.emptySet;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/chat/{room}")
public class ChatService {

    private static final Set<Session> EMPTY_ROOM = emptySet();

    static final ConcurrentMap<String, Set<Session>> rooms =
            new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session peer, @PathParam("room") String room) {
        rooms.computeIfAbsent(room, s -> new CopyOnWriteArraySet<Session>())
             .add(peer);
    }

    @OnClose
    public void onClose(Session peer, @PathParam("room") String room) {
        rooms.getOrDefault(room, EMPTY_ROOM).remove(peer);
    }

    @OnError
    public void onError(Session peer, Throwable th, @PathParam("room") String room) {
        System.out.println("Peer error " + room + " " + th);
    }

    @OnMessage
    public void message(String message, Session peer, @PathParam("room") String room) {
        // Send a message to all peers in a room who are not the current
        // peer and are still open. Send the message asynchronously to ensure
        // that the first client is not hung up.
        rooms.getOrDefault(room, EMPTY_ROOM).parallelStream()
             .filter(s -> s != peer && s.isOpen())
             .forEach(s -> s.getAsyncRemote().sendObject(message));
    }
}
One of the problems with the above design is that there is no error logging when an invalid room is used, which would be useful when diagnosing errors. If you still don't want to use any conditional statements, you could use an Optional object:
import java.util.Optional;

@OnClose
public void onClose(Session peer, @PathParam("room") String room) {
    Optional.ofNullable(rooms.get(room))
            .orElseThrow(() -> new IllegalStateException("Cannot find room " + room))
            .remove(peer);
}
Of course, you might want this on all your Map objects, so you can use a default method to create a mixin and pick it up with a trivial subclass.
import java.util.Optional;

public interface OptionalMap<K,V> extends ConcurrentMap<K,V> {
    public default Optional<V> find(K key) {
        return Optional.ofNullable(get(key));
    }
}

public static class OptionalHashMap<K,V> extends ConcurrentHashMap<K,V>
        implements OptionalMap<K,V> {
}

static final OptionalMap<String, Set<Session>> rooms = new OptionalHashMap<>();

@OnClose
public void onClose(Session peer, @PathParam("room") String room) {
    rooms.find(room)
         .orElseThrow(() -> new IllegalStateException("Cannot find room " + room))
         .remove(peer);
}
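If you want to try the mixin without a WebSocket container, here is a minimal, self-contained sketch. It assumes plain String values standing in for Session objects; the OptionalMapDemo class, its join and leave helpers, and the room names are made up for illustration and are not part of the chat service.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Mixin: adds an Optional-returning lookup to any ConcurrentMap.
interface OptionalMap<K, V> extends ConcurrentMap<K, V> {
    default Optional<V> find(K key) {
        return Optional.ofNullable(get(key));
    }
}

// Trivial subclass that picks up find() from the mixin.
class OptionalHashMap<K, V> extends ConcurrentHashMap<K, V>
        implements OptionalMap<K, V> {
}

// Hypothetical demo class; Strings stand in for javax.websocket.Session.
class OptionalMapDemo {

    static final OptionalMap<String, Set<String>> rooms = new OptionalHashMap<>();

    // Mirrors onOpen: create the room on first use, then add the peer.
    static void join(String room, String peer) {
        rooms.computeIfAbsent(room, r -> new CopyOnWriteArraySet<String>()).add(peer);
    }

    // Mirrors onClose: fail loudly when the room is unknown.
    static void leave(String room, String peer) {
        rooms.find(room)
             .orElseThrow(() -> new IllegalStateException("Cannot find room " + room))
             .remove(peer);
    }

    public static void main(String[] args) {
        join("lobby", "alice");
        leave("lobby", "alice");      // fine: the room exists
        try {
            leave("attic", "bob");    // no such room
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Cannot find room attic"
        }
    }
}
```

Note that the room itself stays in the map after the last peer leaves, just as it does in the service code.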
Whilst working on my presentation it became apparent that it was also possible to use the "getOpenSessions" and "getUserProperties" methods instead: store the discriminating data against the Session itself and filter the open sessions on it. I don't have enough experience yet to say which is the better design for a particular case.
package websocket;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/chat/{room}")
public class ChatService {

    private static final String ROOM_PROPERTY = "ROOM";

    @OnOpen
    public void onOpen(Session peer, @PathParam("room") String room) {
        peer.getUserProperties().put(ROOM_PROPERTY, room);
    }

    @OnClose
    public void onClose(Session peer, @PathParam("room") String room) {
        // No need to tidy up, as the data is stored against the peer
    }

    @OnError
    public void onError(Session peer, Throwable th, @PathParam("room") String room) {
        System.out.println("Peer error " + room + " " + th);
    }

    @OnMessage
    public void message(String message, Session peer, @PathParam("room") String room) {
        // Keep only the sessions in the same room, skip the sender and
        // any closed sessions, then send asynchronously.
        peer.getOpenSessions().parallelStream()
            .filter(s -> room.equals(s.getUserProperties().get(ROOM_PROPERTY)))
            .filter(s -> s != peer && s.isOpen())
            .forEach(s -> s.getAsyncRemote().sendObject(message));
    }
}
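The two filters in the message method can be exercised without a server. The sketch below is only an illustration under stated assumptions: FakeSession is a hypothetical stand-in that models just the getUserProperties and isOpen parts of Session, RoomFilterDemo and the peer names are invented, and the pipeline collects recipient names rather than sending anything.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for javax.websocket.Session; only the parts
// the filter needs are modelled.
class FakeSession {
    final String name;
    final boolean open;
    private final Map<String, Object> userProperties = new HashMap<>();

    FakeSession(String name, String room, boolean open) {
        this.name = name;
        this.open = open;
        // Same idea as onOpen: record the room against the session.
        userProperties.put(RoomFilterDemo.ROOM_PROPERTY, room);
    }

    Map<String, Object> getUserProperties() { return userProperties; }

    boolean isOpen() { return open; }
}

class RoomFilterDemo {
    static final String ROOM_PROPERTY = "ROOM";

    // Applies the same two filters as the endpoint's message method,
    // but returns the recipients' names instead of sending to them.
    static List<String> recipients(List<FakeSession> openSessions,
                                   FakeSession sender, String room) {
        return openSessions.stream()
                .filter(s -> room.equals(s.getUserProperties().get(ROOM_PROPERTY)))
                .filter(s -> s != sender && s.isOpen())
                .map(s -> s.name)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        FakeSession alice = new FakeSession("alice", "java", true);
        FakeSession bob = new FakeSession("bob", "java", true);
        FakeSession carol = new FakeSession("carol", "scala", true);
        FakeSession dave = new FakeSession("dave", "java", false);
        List<FakeSession> all = Arrays.asList(alice, bob, carol, dave);

        System.out.println(recipients(all, alice, "java")); // prints [bob]
    }
}
```

One thing this makes visible is that every message walks the full list of open sessions and filters it, whereas the map-based version looks up one room directly. That may or may not matter depending on how many sessions the endpoint holds.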