UdpMultiplexer.java 5.12 KB
Newer Older
garciay's avatar
garciay committed
package org.etsi.its.adapter;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import org.etsi.common.ByteHelper;
import org.etsi.its.adapter.UdpMultiplexer;
import org.etsi.its.adapter.layers.Layer;

public class UdpMultiplexer {
    
    /**
     * Parameter name for UDP port destination
     */
    public static final String UDP_PORT_KEY = "Udport";
    
    public String UdpAddress = "10.200.1.101"; // FIXME Use a generic way to retrieve UDP settings
    
    public int UdpRecvPort = 18501; // FIXME Use a generic way to retrieve UDP settings
    
    public int UdpSendPort = 18502; // FIXME Use a generic way to retrieve UDP settings
    
    /**
     * Unique instance of the factory
     */
    private static final UdpMultiplexer instance = new UdpMultiplexer();
    
    private Map<String, byte[]> clientsToMacs = new HashMap<String, byte[]>();
    
    //private Map<String, Short> clientsToFrameTypes = new HashMap<String, Short>();
    
    private HashMap<String, Layer> clientsToLayers = new HashMap<String, Layer>();
    
    private DatagramSocket iutSocket;
    private InetAddress iutAddress;
    private int iutPort;
    private Thread iutThread;
    
    /**
     * Gets the unique factory instance
     * @return UdpMultiplexer instance
     */
    public static UdpMultiplexer getInstance(){
        return instance;
    }
    
    public UdpMultiplexer() {
    }
    
    public synchronized void register(Layer client, byte[] macAddress, short frameType) {
        //TERFactory.getInstance().logDebug(">>>UdpMultiplexer.registering: " + frameType);
        
        if(clientsToMacs.isEmpty()) {
            try {
                iutAddress = InetAddress.getByName(UdpAddress);
                //TERFactory.getInstance().logDebug("UdpIpLayer.register: IUT Address: " + iutAddress.getHostAddress());
                iutPort = UdpSendPort;
                iutSocket = new DatagramSocket(UdpRecvPort);
                iutThread = new UdpThread(iutSocket);
                iutThread.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Register client
        clientsToMacs.put(client.toString(), macAddress);
        clientsToLayers.put(client.toString(), client);
        //clientsToFrameTypes.put(client.toString(), frameType);
    }
    
    public synchronized void unregister(Layer client) {
        if(clientsToMacs.containsKey(client.toString())) {
            clientsToMacs.remove(client.toString());
            //clientsToFrameTypes.remove(client.toString());
            clientsToLayers.remove(client.toString());
            
            if(clientsToMacs.isEmpty()) {
                iutSocket.close();
                iutThread.interrupt();
                try {
                    iutThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    public byte[] send(Layer client, byte[] dest, byte[] payload, Map<String, Object> params) {
        
        if(clientsToMacs.containsKey(client.toString())) {
            
            DatagramPacket packet = null;
            if(params.containsKey(UDP_PORT_KEY)) {
                packet = new DatagramPacket(payload, payload.length, iutAddress, Integer.parseInt((String) params.get(UDP_PORT_KEY)));
            } else {
                packet = new DatagramPacket(payload, payload.length, iutAddress, iutPort);
            }
            try {
                iutSocket.send(packet);
                return packet.getData();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        return null;
    }
    
    private class UdpThread extends Thread {
        
        private DatagramSocket taSocket;
        private boolean running = true;
        
        public UdpThread(DatagramSocket taSocket) throws IOException {
            this.taSocket = taSocket;
        }
        
        @Override
        public void run() {
            
            while(running) {
                try {
                    byte[] buf = new byte[4096];
                    
                    // receive packet
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);
                    taSocket.receive(packet);
                    
                    byte[] buffer = ByteHelper.extract(packet.getData(), packet.getOffset(), packet.getLength());
                    if(buffer.length < 28) {
                        continue;
                    }
                    
                    Map<String, Object> lowerInfo = new HashMap<String, Object>();
                    lowerInfo.put(Layer.RECEPTION_TIMESTAMP, System.currentTimeMillis());
                    
                    // Dispatch
                    for (String mapKey : clientsToMacs.keySet()) {
                         clientsToLayers.get(mapKey).receive(buffer, lowerInfo);
                    }
                } catch (IOException e) {
                    running = false;
                }
            }
        }
    }
    
}