Commit 139ec294 authored by filatov's avatar filatov
Browse files

offline pcap layer

parent 6e16caa0
Loading
Loading
Loading
Loading
+218 −0
Original line number Diff line number Diff line
#if defined (__CYGWIN__)
#define _GNU_SOURCE
#endif
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <chrono>

#include <Port.hh>

#include "pcap_offline_layer_factory.hh"

#include "loggers.hh"

#include <pcap.h>

#ifdef __CYGWIN__
typedef struct {
    bpf_int32 tv_sec;		/* seconds */
    bpf_int32 tv_usec;		/* microseconds */
}pcap_o_timeval;

typedef struct pcap_o_pkthdr { 
    pcap_o_timeval ts;	/* time stamp */
    bpf_u_int32 caplen;		/* length of portion present */
    bpf_u_int32 len;		/* length this packet (off wire) */
}pcap_o_pkthdr;
#else
typedef struct pcap_pkthdr pcap_o_pkthdr;
typedef struct timeval pcap_o_timeval;
#endif

pcap_offline_layer::pcap_offline_layer(const std::string& p_type, const std::string& param) : 
	layer(p_type), PORT(p_type.c_str()), _params(), _device(NULL), _running(FALSE) {
  loggers::get_instance().log(">>> pcap_offline_layer::pcap_offline_layer: %s, %s", p_type.c_str(), param.c_str());
  params::convert(_params, param);

  _o_params.insert(std::pair<std::string, std::string>(std::string("timestamp"), std::string()));

  char error_buffer[PCAP_ERRBUF_SIZE];
  params::const_iterator it;

  it = _params.find(std::string("realtime"));
  _realtime = ((it != _params.end()) && !it->second.empty());

  it = _params.find(std::string("loop"));
  _loop = ((it != _params.end()) && !it->second.empty());

  it = _params.find(std::string("file"));
  if ((it != _params.end()) && !it->second.empty()) {
    const std::string& file = it->second;
    _device = pcap_open_offline(file.c_str(), error_buffer);
    if (_device) {

      // Add user defined filter
      it = _params.find(std::string("filter"));
      if ((it != _params.end()) && !it->second.empty()) {
        const std::string& filter = it->second;
        // Log final PCAP filter
        loggers::get_instance().user("pcap_offline_layer::pcap_offline_layer: Filter: %s", filter.c_str());
        struct bpf_program f = {0};
        if (pcap_compile(_device, &f, filter.c_str(), 1, PCAP_NETMASK_UNKNOWN) != 0) {
          loggers::get_instance().error("pcap_offline_layer::pcap_offline_layer: Failed to compile PCAP filter");
        }else{
          if (pcap_setfilter(_device, &f) != 0) {
            loggers::get_instance().error("pcap_offline_layer::pcap_offline_layer: Failed to set PCAP filter");
          }
        }
        pcap_freecode(&f);
      }

      // create pipe and run thread
      if (pipe2(_fd, O_NONBLOCK) == -1) {
        loggers::get_instance().error("pcap_offline_layer::pcap_offline_layer: Failed to create a pipe: %s", ::strerror(errno));
      }
      // Pass the pipe handler to the polling procedure
      loggers::get_instance().log("pcap_offline_layer::pcap_offline_layer: Call handler with descriptor %d", _fd[0]);
      Handler_Add_Fd_Read(_fd[0]);
      // Create the offline reader thread
      _thread = new std::thread(&pcap_offline_layer::run, (void *)this);
      if (_thread == NULL) {
        loggers::get_instance().error("pcap_offline_layer::pcap_offline_layer: Failed to start offline thread");
      }
      while (_running == FALSE) {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
      }
      // Thread was started
      loggers::get_instance().log("<<< pcap_offline_layer::pcap_offline_layer");
    }
  }
} // End of ctor

pcap_offline_layer::~pcap_offline_layer() {
  loggers::get_instance().log(">>> pcap_offline_layer::~pcap_offline_layer");
  
  if (_device != NULL) {
    if (_thread != NULL) {
      _running = FALSE;
      // Wait for the working thread to terminate
      _thread->join();
      loggers::get_instance().log("pcap_offline_layer::~pcap_offline_layer: Thread were stops");
      // Cleanup
      delete _thread;
      close(_fd[0]);
      close(_fd[1]);
    }
    pcap_close(_device);
  }
} // End of dtor

void* pcap_offline_layer::run(void* p_this) {
  pcap_offline_layer& p = *static_cast<pcap_offline_layer *>(p_this);
  return p.thread();
}


static long timeval_diff (const pcap_o_timeval &x, const pcap_o_timeval &y)
{
  pcap_o_timeval z = y;
  /* Perform the carry for the later subtraction by updating y. */
  if (x.tv_usec < y.tv_usec) {
    int nsec = (y.tv_usec - x.tv_usec) / 1000000 + 1;
    z.tv_usec -= 1000000 * nsec;
    z.tv_sec  += nsec;
  }
  if (x.tv_usec - z.tv_usec > 1000000) {
    int nsec = (x.tv_usec - z.tv_usec) / 1000000;
    z.tv_usec += 1000000 * nsec;
    z.tv_sec -= nsec;
  }

  return (x.tv_sec - z.tv_sec) * 1000 + ((x.tv_usec - z.tv_usec)/1000);
}

void* pcap_offline_layer::thread() {
  struct pcap_o_pkthdr *pkt_header;
  struct pcap_o_pkthdr lh;
  const u_char *pkt_data;
  unsigned char pkt_count = 0;

  loggers::get_instance().log(">>> pcap_offline_layer::run");

  memset(&lh, 0, sizeof(lh));

  _running = TRUE;

  // wait a bit before sending first packet
  std::this_thread::sleep_for(std::chrono::milliseconds(500));

  while (_running) { // Loop while _running flag is up
    // get next frame
    int result = pcap_next_ex(_device, (struct pcap_pkthdr**)&pkt_header, &pkt_data);
    if(result == 2){
      if(_loop){

      }else{
        _running = FALSE;
        return NULL;
      }
    }
    if(_realtime) {
      // wait for next packet timestamp
      if(lh.ts.tv_sec|lh.ts.tv_usec){
        long diff = timeval_diff(pkt_header->ts, lh.ts);
        if(diff > 0) {
          loggers::get_instance().log("<<< pcap_offline_layer::run: Wait %d msec", diff);
          std::this_thread::sleep_for(std::chrono::milliseconds(diff));
          loggers::get_instance().log("<<< pcap_offline_layer::run: Wait done");
        }
      }
    }
    while(_running && !_resume.try_lock()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    lh = *pkt_header;
#if 0
    {
      char buf[128];
      std::time_t t = pkt_header->ts.tv_sec;
      std::tm * pt = std::localtime( &t );
      t = std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", pt);
      std::sprintf(buf+t, ".%06ld", pkt_header->ts.tv_usec);
      _o_params["timestamp"] = std::string(buf);
    }
#else
    _o_params["timestamp"] = std::to_string(pkt_header->ts.tv_usec);
#endif
    _o_data = OCTETSTRING(pkt_header->len, pkt_data);
    write(_fd[1], &pkt_count, 1);pkt_count++;
  }
  
  loggers::get_instance().log("<<< pcap_offline_layer::run");
  return NULL;
}

void pcap_offline_layer::send_data(OCTETSTRING& data, params& params) {
  loggers::get_instance().log("pcap_offline_layer::send_data: Offline mode, operation was skipped");
}

void pcap_offline_layer::receive_data(OCTETSTRING& data, params& params) {
  loggers::get_instance().log(">>> pcap_offline_layer::receive_data: Received %d bytes", data.lengthof());
  loggers::get_instance().log_to_hexa("Packet dump", data);
  
  // Pass the packet to the upper layers
  receive_to_all_layers(data, params);
}

void pcap_offline_layer::Handle_Fd_Event_Readable(int fd) {
  //loggers::get_instance().log(">>> pcap_offline_layer::Handle_Fd_Event_Readable: %d", fd);
  char c[2];
  // Process the packet at this layer
  this->receive_data(_o_data, _o_params);
  read(_fd[0], &c, 1);
  _resume.unlock();
}

pcap_offline_layer_factory pcap_offline_layer_factory::_f;
+76 −0
Original line number Diff line number Diff line
/*!
 * \file      pcap_offline_layer.hh
 * \brief     Header file for ITS Offline Pcap port layer.
 * \author    ETSI STF525
 * \copyright ETSI Copyright Notification
 *            No part may be reproduced except as authorized by written permission.
 *            The copyright and the foregoing restriction extend to reproduction in all media.
 *            All rights reserved.
 * \version   0.1
 */
#pragma once

#include <thread>
#include <mutex>

#include "t_layer.hh"
#include "params.hh"

#include <OctetString.hh>

class PORT; //! Forward declaration of TITAN class

typedef struct pcap pcap_t;

/*!
 * \class pcap_layer
 * \brief  This class provides description of ITS PCAP port protocol layer
 */
class pcap_offline_layer : public layer, public PORT {
  params _params;            //! Layer parameters
  pcap_t* _device;           //! Device handle
  std::thread* _thread;      //! Thread handle, used to read PCAP file instead of NIC, used in file mode
  std::mutex _resume;
  bool _running;             //! Set to true when the thread is running, used in file mode
  bool _realtime;            //! Set to true if realtime delay shall be added between packets
  bool _loop;                //! Set to true if playback shall be looped
  int _fd[2];                //! pipe to signal to Titan

  params                _o_params;
  OCTETSTRING           _o_data;

  static void* run(void* p_this);
public: 
  void* thread(void);
public: //! \publicsection
  /*!
   * \brief Specialised constructor
   *        Create a new instance of the pcap_layer class
   * \param[in] p_type \todo
   * \param[in] p_param \todo
   */
  pcap_offline_layer(const std::string& p_type, const std::string& param);
  /*!
   * \brief Default destructor
   */
  virtual ~pcap_offline_layer();

  /*!
   * \virtual
   * \fn void send_data(OCTETSTRING& data, params& params);
   * \brief Send bytes formated data to the lower layers
   * \param[in] p_data The data to be sent
   * \param[in] p_params Some parameters to overwrite default value of the lower layers parameters
   */
  virtual void send_data(OCTETSTRING& data, params& params);
  /*!
   * \virtual
   * \fn void receive_data(OCTETSTRING& data, params& params);
   * \brief Receive bytes formated data from the lower layers
   * \param[in] p_data The bytes formated data received
   * \param[in] p_params Some lower layers parameters values when data was received
   */
  virtual void receive_data(OCTETSTRING& data, params& info);
  
  void Handle_Fd_Event_Readable(int fd);
};
+45 −0
Original line number Diff line number Diff line
/*!
 * \file      pcap_offline_layer_factory.hh
 * \brief     Header file for Pcap layer factory.
 * \author    ETSI STF525
 * \copyright ETSI Copyright Notification
 *            No part may be reproduced except as authorized by written permission.
 *            The copyright and the foregoing restriction extend to reproduction in all media.
 *            All rights reserved.
 * \version   0.1
 */
#pragma once

#include "layer_stack_builder.hh"

#include "pcap_offline_layer.hh"

/*!
 * \class pcap_offline_layer_factory
 * \brief  This class provides a factory class to create an pcap_offline_layer class instance
 */
class pcap_offline_layer_factory: public layer_factory {
  static pcap_offline_layer_factory _f; //! Reference to the unique instance of this class
public: //! \publicsection
  /*!
   * \brief Default constructor
   *        Create a new instance of the udp_layer_factory class
   * \remark The PCAP layer identifier is PCAP
   */
  pcap_offline_layer_factory() {
    // register factory
    layer_stack_builder::register_layer_factory("PCAP_FILE", this);
  };
  /*!
   * \fn layer* create_layer(const std::string & type, const std::string & param);
   * \brief  Create the layers stack based on the provided layers stack description
   * \param[in] p_type The provided layers stack description
   * \param[in] p_params Optional parameters
   * \return 0 on success, -1 otherwise
   * \inline
   */
  inline virtual layer* create_layer(const std::string& p_type, const std::string& p_param) {
    return new pcap_offline_layer(p_type, p_param);
  };
}; // End of class pcap_offline_layer_factory