/**
 *  Pcap capture multiplexer
 *  
 *  @author     ETSI / STF424
 *  @version    $URL: $
 *              $Id: $
 *  Note: copy jnetpcap.jar into C:\WINDOWS\Sun\Java\lib\ext (location of the jpcap library)
 */
package org.etsi.its.adapter;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.etsi.adapter.TERFactory;
import org.etsi.common.ByteHelper;
import org.etsi.its.adapter.layers.EthernetLayer;
import org.etsi.its.adapter.layers.Layer;
import org.etsi.ttcn.tci.CharstringValue; /* FIXME: import tci */

import org.jnetpcap.ByteBufferHandler;
import org.jnetpcap.Pcap;
import org.jnetpcap.PcapBpfProgram;
import org.jnetpcap.PcapHeader;
import org.jnetpcap.PcapIf;

public class PcapMultiplexer implements Runnable {
    /**
     * Unique instance of the multiplexer
     */
    private static final PcapMultiplexer instance = new PcapMultiplexer();
    
    private static byte[] MAC_BROADCAST = new byte[]{(byte)0xFF,(byte)0xFF,(byte)0xFF,(byte)0xFF,(byte)0xFF,(byte)0xFF};
    
    private StringBuilder errbuf = new StringBuilder();     // For any error msgs  
    private String timestampOffset = "";
filatov's avatar
filatov committed
    
    /**
     * Reads the test adapter parameters and selects the capture source.
     * <p>
     * Live mode ("OfflineMode" is not "true"): looks up the network interface whose
     * hardware address equals the "LocalEthernetMAC" parameter and stores it in
     * {@code device}. If the device list cannot be read, the error is logged and
     * {@code device} stays {@code null}.
     * <p>
     * Offline mode: reads the capture file name from "PcapFile" and the optional
     * time offset from "OffsetTime".
     *
     * @throws RuntimeException if the expected interface is not found, or if no
     *         capture file is configured in offline mode
     */
    private PcapMultiplexer() {

        filter = "";
        offlineMode = ((CharstringValue)TERFactory.getInstance().getTaParameter("OfflineMode")).getString().toLowerCase().equals("true");
        if (!offlineMode) {
            // Obtain the list of network interfaces
            List<PcapIf> alldevs = new ArrayList<PcapIf>(); // Will be filled with NICs

            int r = Pcap.findAllDevs(alldevs, errbuf);
            if (r != Pcap.OK || alldevs.isEmpty()) {
                TERFactory.getInstance().logError("Can't read list of devices, error is " + errbuf.toString());
                return;
            }

            // Find the right interface
            int ifaceIndex = 0;
            String expectedIface = ((CharstringValue)TERFactory.getInstance().getTaParameter("LocalEthernetMAC")).getString().toLowerCase();
            for ( ; ifaceIndex < alldevs.size(); ifaceIndex++) {
                try {
                    byte[] hardwareAddress = alldevs.get(ifaceIndex).getHardwareAddress();
                    // Interfaces without a MAC address cannot match
                    if (hardwareAddress != null
                            && expectedIface.equalsIgnoreCase(ByteHelper.byteArrayToString(hardwareAddress))) {
                        // Interface found
                        break;
                    }
                } catch (IOException ignored) {
                    // The address of this interface cannot be read: try the next one
                }
            }
            // Check result
            if (ifaceIndex == alldevs.size()) {
                throw new RuntimeException(String.format("PcapMultiplexer: Network interface %s not found", expectedIface));
            }

            device = alldevs.get(ifaceIndex);
        } else {
            // Keep the file name as configured: lower-casing it breaks on case-sensitive file systems
            file = ((CharstringValue)TERFactory.getInstance().getTaParameter("PcapFile")).getString();
            if ((file == null) || file.isEmpty()) {
                throw new RuntimeException(String.format("PcapMultiplexer: failed to open '%s'", file));
            }
            timestampOffset = ((CharstringValue)TERFactory.getInstance().getTaParameter("OffsetTime")).getString().toLowerCase();
        }
    }
    
    /**
     * Gives access to the shared multiplexer singleton.
     * @return the single PcapMultiplexer instance held by this class
     */
    public static PcapMultiplexer getInstance() {
        return PcapMultiplexer.instance;
    }
    
    public synchronized void register(Layer client, byte[] macAddress, short frameType) {
        //TERFactory.getInstance().logDebug(">>>PcapMultiplexer.registering: " + frameType);
filatov's avatar
filatov committed
        
        if(clientsToMacs.isEmpty()) {
            if (!offlineMode) { // Open interface 
                int snaplen = 64 * 1024;            // Capture all packets, no truncation  
                int flags = Pcap.MODE_PROMISCUOUS;  // capture all packets  
                int timeout = 10;                   // 10 millis  
                pcap = Pcap.openLive(device.getName(), snaplen, flags, timeout, errbuf);
                if (pcap == null) { // Check result
                    TERFactory.getInstance().logError("Error while opening device for capture: " + errbuf.toString());  
                    return;  
                }  
                captureThread = new Thread(this);
                captureThread.start();
                filter = "";
            } else { // Open file
                pcap = Pcap.openOffline(file, errbuf);
                if (pcap == null) { // Check result
                    TERFactory.getInstance().logError("Error while opening device for capture: " + errbuf.toString());  
                    return;  
                }
                captureThread = new Thread(this);
                captureThread.start();
                filter = "";
            }
            if (pcap == null) { // Check result
                TERFactory.getInstance().logError("Error while opening device for capture: " + errbuf.toString());  
                return;  
filatov's avatar
filatov committed
            }  
            captureThread = new Thread(this);
            captureThread.start();
            if (!timestampOffset.isEmpty()) {
                filter = "frame time_delta " + timestampOffset + " and ";
            }
filatov's avatar
filatov committed
        }
        else {
            //TERFactory.getInstance().logDebug("Another Client !");
filatov's avatar
filatov committed
            filter = filter + " and ";
        }

        // Update Filter
        String strMacAddress = String.format("%02x", macAddress[0]);
        for(int i=1; i < macAddress.length; i++) {
            strMacAddress += String.format(":%02x", macAddress[i]);
        }
        
        filter = filter + "not ether src " + strMacAddress;
        //TERFactory.getInstance().logDebug("New filter: " + filter);
filatov's avatar
filatov committed

        // Apply filter
        PcapBpfProgram bpfFilter = new PcapBpfProgram();
        int optimize = 0; // 1 means true, 0 means false
        int netmask = 0;            
        int r = pcap.compile(bpfFilter, filter, optimize, netmask);
        if (r != Pcap.OK) {
            //TERFactory.getInstance().logDebug("Filter error: " + pcap.getErr());
filatov's avatar
filatov committed
        }
        pcap.setFilter(bpfFilter);

        // Register client
        clientsToMacs.put(client.toString(), macAddress);
        clientsToLayers.put(client.toString(), client);
        clientsToFrameTypes.put(client.toString(), frameType);
    }
    
    /**
     * Removes a client layer from the multiplexer.
     * <p>
     * When the last client is removed, the capture loop is broken, the capture
     * thread is joined and the capture is closed. Unknown clients are ignored.
     *
     * @param client layer to remove; looked up by its {@code toString()} value
     */
    public synchronized void unregister(Layer client) {
        String key = client.toString();
        if (!clientsToMacs.containsKey(key)) {
            return;
        }

        clientsToMacs.remove(key);
        clientsToFrameTypes.remove(key);
        clientsToLayers.remove(key);

        if (clientsToMacs.isEmpty()) {
            // Last client gone: stop capturing
            pcap.breakloop();
            try {
                captureThread.join();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it
                Thread.currentThread().interrupt();
            }
            pcap.close();
        }
    }
    
    /**
     * Thread function for jpcap capture loop
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        
        ByteBufferHandler<Object> handler = new ByteBufferHandler<Object>() {
            
            @Override
            public void nextPacket(PcapHeader pcapHeader, ByteBuffer byteBuffer, Object user) {
                if(byteBuffer.remaining() < 14) {
                    return;
                }
filatov's avatar
filatov committed
                Map<String, Object> lowerInfo = new HashMap<String, Object>();

                // Extract Dst info                
                byte[] dst = new byte[6];
                byteBuffer.get(dst, 0, dst.length);
                lowerInfo.put(EthernetLayer.LINK_LAYER_DESTINATION, dst);
                
                // Skip Src
                byteBuffer.position(byteBuffer.position() + 6);

                // Extract FrameType info
                byte[] rawFrameType = new byte[2];
                byteBuffer.get(rawFrameType, 0, rawFrameType.length);
                short frameType = ByteHelper.byteArrayToInt(rawFrameType).shortValue();
                
                // Extract Data
                byte[] data = new byte[byteBuffer.remaining()];
                byteBuffer.get(data, 0, byteBuffer.remaining());
                
                // Dispatch
                for (String mapKey : clientsToMacs.keySet()) {
                    if(frameType == clientsToFrameTypes.get(mapKey)) {
                        if(Arrays.equals(dst, MAC_BROADCAST)
                                || Arrays.equals(dst, clientsToMacs.get(mapKey))) {
                            
                            lowerInfo.put(Layer.RECEPTION_TIMESTAMP, pcapHeader.timestampInMicros());
                            clientsToLayers.get(mapKey).receive(data, lowerInfo);
                        }
                    }
                }
        if (offlineMode) {
            try {
                Thread.sleep(4000); // TOTO Use a parameter instead of an hardcoded value
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
filatov's avatar
filatov committed
        pcap.loop(-1, handler, null);
    }
        
    public byte[] sendPacket(Layer client, byte[] dest, byte[] payload) {

        if(clientsToMacs.containsKey(client.toString())) {
            
            byte[] packet = ByteHelper.concat(
                dest,   
                clientsToMacs.get(client.toString()), 
                ByteHelper.intToByteArray(clientsToFrameTypes.get(client.toString()), 2), 
                payload);
            
filatov's avatar
filatov committed
            pcap.sendPacket(packet);
            return packet;
        }
        return null;
    }
    
    public void resetFilter(String pcapFilter) { 
        // Sanity check
        if ((pcapFilter == null) || pcapFilter.isEmpty()) {
            return;
        }
        
        filter = pcapFilter;
        TERFactory.getInstance().logDebug("resetFilter: New filter: " + filter);
        
        // Apply filter
        PcapBpfProgram bpfFilter = new PcapBpfProgram();
        int optimize = 0; // 1 means true, 0 means false
        int netmask = 0;            
        int r = pcap.compile(bpfFilter, filter, optimize, netmask);
        if (r != Pcap.OK) {
            TERFactory.getInstance().logError("Filter error: " + pcap.getErr());
garciay's avatar
garciay committed
        } else {
            r = pcap. setFilter(bpfFilter);
            if (r != Pcap.OK) {
                TERFactory.getInstance().logError("Filter error: " + pcap.getErr());
            }
filatov's avatar
filatov committed
    /**
     * Jpcap capture device
     */
    private Pcap pcap;
    
    /**
     * Jpcap capture thread instance.
     */
    private Thread captureThread;
    
    PcapIf device = null;
    boolean offlineMode = false;
    String file = "";
filatov's avatar
filatov committed
    private String filter;
    private Map<String, byte[]> clientsToMacs = new HashMap<String, byte[]>();
    private Map<String, Short> clientsToFrameTypes = new HashMap<String, Short>();
    private HashMap<String, Layer> clientsToLayers = new HashMap<String, Layer>();