Unverified Commit c73ebb85 authored by Max Dymond's avatar Max Dymond Committed by Daniel Stenberg
Browse files

ossfuzz: changes before merging the generated corpora

Before merging in the oss-fuzz corpora from Google, there are some changes
to the fuzzer.
- Add a read corpus script, to display corpus files nicely.
- Change the behaviour of the fuzzer so that TLV parse failures all now
  go down the same execution paths, which should reduce the size of the
  corpora.
- Make unknown TLVs a failure to parse, which should decrease the size
  of the corpora as well.

Closes #1881
parent bec50cc2
Loading
Loading
Loading
Loading

tests/fuzz/corpus.py

0 → 100644
+96 −0
Original line number Diff line number Diff line
#!/usr/bin/env python
#
# Common corpus functions
import logging
import struct
log = logging.getLogger(__name__)


class BaseType(object):
    TYPE_URL = 1
    TYPE_RSP1 = 2
    TYPE_USERNAME = 3
    TYPE_PASSWORD = 4
    TYPE_POSTFIELDS = 5
    TYPE_HEADER = 6
    TYPE_COOKIE = 7
    TYPE_UPLOAD1 = 8
    TYPE_RANGE = 9
    TYPE_CUSTOMREQUEST = 10
    TYPE_MAIL_RECIPIENT = 11
    TYPE_MAIL_FROM = 12


class TLVEncoder(BaseType):
    def __init__(self, output):
        self.output = output

    def write_string(self, tlv_type, wstring):
        data = wstring.encode("utf-8")
        self.write_tlv(tlv_type, len(data), data)

    def write_bytes(self, tlv_type, bytedata):
        self.write_tlv(tlv_type, len(bytedata), bytedata)

    def maybe_write_string(self, tlv_type, wstring):
        if wstring is not None:
            self.write_string(tlv_type, wstring)

    def write_tlv(self, tlv_type, tlv_length, tlv_data=None):
        log.debug("Writing TLV %d, length %d, data %r",
                  tlv_type,
                  tlv_length,
                  tlv_data)

        data = struct.pack("!H", tlv_type)
        self.output.write(data)

        data = struct.pack("!L", tlv_length)
        self.output.write(data)

        if tlv_data:
            self.output.write(tlv_data)


class TLVDecoder(BaseType):
    def __init__(self, inputdata):
        self.inputdata = inputdata
        self.pos = 0
        self.tlv = None

    def __iter__(self):
        self.pos = 0
        self.tlv = None
        return self

    def __next__(self):
        if self.tlv:
            self.pos += self.tlv.total_length()

        if (self.pos + TLVHeader.TLV_DECODE_FMT_LEN) > len(self.inputdata):
            raise StopIteration

        # Get the next TLV
        self.tlv = TLVHeader(self.inputdata[self.pos:])
        return self.tlv

    next = __next__


class TLVHeader(BaseType):
    TLV_DECODE_FMT = "!HL"
    TLV_DECODE_FMT_LEN = struct.calcsize(TLV_DECODE_FMT)

    def __init__(self, data):
        # Parse the data to populate the TLV fields
        (self.type, self.length) = struct.unpack(self.TLV_DECODE_FMT, data[0:self.TLV_DECODE_FMT_LEN])

        # Get the remaining data and store it.
        self.data = data[self.TLV_DECODE_FMT_LEN:self.TLV_DECODE_FMT_LEN + self.length]

    def __repr__(self):
        return ("{self.__class__.__name__}(type={self.type!r}, length={self.length!r}, data={self.data!r})"
                .format(self=self))

    def total_length(self):
        return self.TLV_DECODE_FMT_LEN + self.length
 No newline at end of file
+11 −3
Original line number Diff line number Diff line
@@ -53,8 +53,14 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size)
  for(tlv_rc = fuzz_get_first_tlv(&fuzz, &tlv);
      tlv_rc == 0;
      tlv_rc = fuzz_get_next_tlv(&fuzz, &tlv)) {

    /* Have the TLV in hand. Parse the TLV. */
    fuzz_parse_tlv(&fuzz, &tlv);
    rc = fuzz_parse_tlv(&fuzz, &tlv);

    if(rc != 0) {
      /* Failed to parse the TLV. Can't continue. */
      goto EXIT_LABEL;
    }
  }

  if(tlv_rc != TLV_RC_NO_MORE_TLVS) {
@@ -408,8 +414,10 @@ int fuzz_parse_tlv(FUZZ_DATA *fuzz, TLV *tlv)
    FSINGLETONTLV(TLV_TYPE_MAIL_FROM, mail_from, CURLOPT_MAIL_FROM);

    default:
      /* The fuzzer generates lots of unknown TLVs, so don't do anything if
         the TLV isn't known. */
      /* The fuzzer generates lots of unknown TLVs - we don't want these in the
         corpus so we reject any unknown TLVs. */
      rc = 255;
      goto EXIT_LABEL;
      break;
  }

+1 −1
Original line number Diff line number Diff line
@@ -173,7 +173,7 @@ char *fuzz_tlv_to_string(TLV *tlv);
        {                                                                      \
          if (!(COND))                                                         \
          {                                                                    \
            rc = 1;                                                            \
            rc = 255;                                                          \
            goto EXIT_LABEL;                                                   \
          }                                                                    \
        }
+2 −46
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@

import argparse
import logging
import struct
import corpus
import sys
sys.path.append("..")
import curl_test_data
@@ -15,7 +15,7 @@ def generate_corpus(options):
    td = curl_test_data.TestData("../data")

    with open(options.output, "wb") as f:
        enc = TLVEncoder(f)
        enc = corpus.TLVEncoder(f)

        # Write the URL to the file.
        enc.write_string(enc.TYPE_URL, options.url)
@@ -61,50 +61,6 @@ def generate_corpus(options):
    return ScriptRC.SUCCESS


class TLVEncoder(object):
    TYPE_URL = 1
    TYPE_RSP1 = 2
    TYPE_USERNAME = 3
    TYPE_PASSWORD = 4
    TYPE_POSTFIELDS = 5
    TYPE_HEADER = 6
    TYPE_COOKIE = 7
    TYPE_UPLOAD1 = 8
    TYPE_RANGE = 9
    TYPE_CUSTOMREQUEST = 10
    TYPE_MAIL_RECIPIENT = 11
    TYPE_MAIL_FROM = 12

    def __init__(self, output):
        self.output = output

    def write_string(self, tlv_type, wstring):
        data = wstring.encode("utf-8")
        self.write_tlv(tlv_type, len(data), data)

    def write_bytes(self, tlv_type, bytedata):
        self.write_tlv(tlv_type, len(bytedata), bytedata)

    def maybe_write_string(self, tlv_type, wstring):
        if wstring is not None:
            self.write_string(tlv_type, wstring)

    def write_tlv(self, tlv_type, tlv_length, tlv_data=None):
        log.debug("Writing TLV %d, length %d, data %r",
                  tlv_type,
                  tlv_length,
                  tlv_data)

        data = struct.pack("!H", tlv_type)
        self.output.write(data)

        data = struct.pack("!L", tlv_length)
        self.output.write(data)

        if tlv_data:
            self.output.write(tlv_data)


def get_options():
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", required=True)
+69 −0
Original line number Diff line number Diff line
#!/usr/bin/env python
#
# Simple script which reads corpus files.

import argparse
import logging
import sys
import corpus
log = logging.getLogger(__name__)


def read_corpus(options):
    with open(options.input, "rb") as f:
        dec = corpus.TLVDecoder(f.read())
        for tlv in dec:
            print(tlv)

    return ScriptRC.SUCCESS


def get_options():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    return parser.parse_args()


def setup_logging():
    """
    Set up logging from the command line options
    """
    root_logger = logging.getLogger()
    formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    stdout_handler.setLevel(logging.DEBUG)
    root_logger.addHandler(stdout_handler)
    root_logger.setLevel(logging.DEBUG)


class ScriptRC(object):
    """Enum for script return codes"""
    SUCCESS = 0
    FAILURE = 1
    EXCEPTION = 2


class ScriptException(Exception):
    pass


def main():
    # Get the options from the user.
    options = get_options()

    setup_logging()

    # Run main script.
    try:
        rc = read_corpus(options)
    except Exception as e:
        log.exception(e)
        rc = ScriptRC.EXCEPTION

    log.info("Returning %d", rc)
    return rc


if __name__ == '__main__':
    sys.exit(main())