Commit 01e1bdb1 authored by Daniel Stenberg's avatar Daniel Stenberg
Browse files

http2: force "drainage" of streams

... which is necessary since the socket won't be readable but there is
data waiting in the buffer.
parent 7bbac214
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -174,6 +174,7 @@ CURLcode Curl_http_setup_conn(struct connectdata *conn)
  /* where to store incoming data for this stream and how big the buffer is */
  http->mem = conn->data->state.buffer;
  http->len = BUFSIZE;
  http->memlen = 0;

  return CURLE_OK;
}
+1 −0
Original line number Diff line number Diff line
@@ -169,6 +169,7 @@ struct HTTP {

  char *mem;     /* points to a buffer in memory to store received data */
  size_t len;    /* size of the buffer 'mem' points to */
  size_t memlen; /* size of data copied to mem */
};

typedef int (*sending)(void); /* Curl_send */
+94 −56
Original line number Diff line number Diff line
@@ -195,8 +195,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,

  (void)session;
  (void)frame;
  DEBUGF(infof(conn->data, "on_frame_recv() was called with header %x\n",
               frame->hd.type));
  DEBUGF(infof(conn->data, "on_frame_recv() header %x stream %x\n",
               frame->hd.type, stream_id));

  if(stream_id) {
    /* get the stream from the hash based on Stream ID, stream ID zero is for
@@ -267,14 +267,25 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
    left = stream->header_recvbuf->size_used - stream->nread_header_recvbuf;
    ncopy = MIN(stream->len, left);

    memcpy(stream->mem, stream->header_recvbuf->buffer +
           stream->nread_header_recvbuf, ncopy);
    memcpy(&stream->mem[stream->memlen],
           stream->header_recvbuf->buffer + stream->nread_header_recvbuf,
           ncopy);
    stream->nread_header_recvbuf += ncopy;

    stream->mem += ncopy;
    DEBUGF(infof(data_s, "Store %zu bytes headers from stream %x at %p\n",
                 ncopy, stream_id, stream->mem));

    stream->len -= ncopy;
    stream->memlen += ncopy;

    stream->mem[stream->memlen] = 0; /* DEBUG, remove this */

    DEBUGF(infof(data_s, "BUF: %s", stream->mem));

    data_s->state.drain++;
    break;
  case NGHTTP2_PUSH_PROMISE:
    DEBUGF(infof(data_s, "Got PUSH_PROMISE, RST_STREAM it!\n"));
    rv = nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE,
                                   frame->push_promise.promised_stream_id,
                                   NGHTTP2_CANCEL);
@@ -328,14 +339,19 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
  stream = data_s->req.protop;

  nread = MIN(stream->len, len);
  memcpy(stream->mem, data, nread);
  memcpy(&stream->mem[stream->memlen], data, nread);

  stream->mem += nread;
  stream->len -= nread;
  stream->memlen += nread;

  data_s->state.drain++;
  /* TODO: this may need to set expire for the multi_socket to work for this
     stream */

  DEBUGF(infof(conn->data, "%zu data received for stream %x "
               "(%zu left in buffer)\n",
               nread, stream_id, stream->len));
               "(%zu left in buffer %p)\n",
               nread, stream_id,
               stream->len, stream->mem));

  if(nread < len) {
    stream->data = data + nread;
@@ -528,7 +544,10 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
    Curl_add_buffer(stream->header_recvbuf, "HTTP/2.0 ", 9);
    Curl_add_buffer(stream->header_recvbuf, value, valuelen);
    Curl_add_buffer(stream->header_recvbuf, "\r\n", 2);
    data_s->state.drain++;

    DEBUGF(infof(data_s, "h2 status: HTTP/2 %03d\n",
                 stream->status_code));
    return 0;
  }
  else {
@@ -550,6 +569,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
    Curl_add_buffer(stream->header_recvbuf, ":", 1);
    Curl_add_buffer(stream->header_recvbuf, value, valuelen);
    Curl_add_buffer(stream->header_recvbuf, "\r\n", 2);
    data_s->state.drain++;

    DEBUGF(infof(data_s, "h2 header: %.*s: %.*s\n",
                 namelen, name, valuelen, value));
@@ -737,7 +757,8 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
  ssize_t rv;
  ssize_t nread;
  struct http_conn *httpc = &conn->proto.httpc;
  struct HTTP *stream = conn->data->req.protop;
  struct SessionHandle *data = conn->data;
  struct HTTP *stream = data->req.protop;

  (void)sockindex; /* we always do HTTP2 on sockindex 0 */

@@ -768,7 +789,7 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
           ncopy);
    stream->nread_header_recvbuf += ncopy;

    infof(conn->data, "http2_recv: Got %d bytes from header_recvbuf\n",
    infof(data, "http2_recv: Got %d bytes from header_recvbuf\n",
          (int)ncopy);
    return ncopy;
  }
@@ -780,22 +801,37 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
    stream->data += nread;
    stream->datalen -= nread;

    infof(conn->data, "%zu data bytes written\n", nread);
    infof(data, "%zu data bytes written\n", nread);
    if(stream->datalen == 0) {
      stream->data = NULL;
      stream->datalen = 0;
    }
    infof(conn->data, "http2_recv: Got %d bytes from stream->data\n",
    infof(data, "http2_recv: Got %d bytes from stream->data\n",
          (int)nread);
    return nread;
  }

  if(data->state.drain) {
    DEBUGF(infof(data, "http2_recv: DRAIN %zu bytes stream %x!! (%p => %p)\n",
                 stream->memlen, stream->stream_id,
                 stream->mem, mem));
    if(mem != stream->mem) {
      /* if we didn't get the same buffer this time, we must move the data to
         the beginning */
      memmove(mem, stream->mem, stream->memlen);
      stream->len = len - stream->memlen;
      stream->mem = mem;
    }
  }
  else {
    /* remember where to store incoming data for this stream and how big the
       buffer is */
    stream->mem = mem;
    stream->len = len;
    stream->memlen = 0;

  infof(conn->data, "http2_recv: %d bytes buffer\n", stream->len);
    infof(data, "http2_recv: %d bytes buffer (stream %x)\n",
          stream->len, stream->stream_id);

    nread = ((Curl_recv*)httpc->recv_underlying)(conn, FIRSTSOCKET,
                                                 httpc->inbuf, H2_BUFSIZE,
@@ -806,29 +842,29 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
    }

    if(nread == -1) {
    failf(conn->data, "Failed receiving HTTP2 data");
      failf(data, "Failed receiving HTTP2 data");
      *err = result;
      return 0;
    }

    if(nread == 0) {
    failf(conn->data, "Unexpected EOF");
      failf(data, "Unexpected EOF");
      *err = CURLE_RECV_ERROR;
      return -1;
    }

  DEBUGF(infof(conn->data, "nread=%zd\n", nread));
    DEBUGF(infof(data, "nread=%zd\n", nread));

    rv = nghttp2_session_mem_recv(httpc->h2,
                                  (const uint8_t *)httpc->inbuf, nread);

    if(nghttp2_is_fatal((int)rv)) {
    failf(conn->data, "nghttp2_session_mem_recv() returned %d:%s\n",
      failf(data, "nghttp2_session_mem_recv() returned %d:%s\n",
            rv, nghttp2_strerror((int)rv));
      *err = CURLE_RECV_ERROR;
      return 0;
    }
  DEBUGF(infof(conn->data, "nghttp2_session_mem_recv() returns %zd\n", rv));
    DEBUGF(infof(data, "nghttp2_session_mem_recv() returns %zd\n", rv));
    /* Always send pending frames in nghttp2 session, because
       nghttp2_session_mem_recv() may queue new frame */
    rv = nghttp2_session_send(httpc->h2);
@@ -836,10 +872,12 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
      *err = CURLE_SEND_ERROR;
      return 0;
    }
  }
  if(len != stream->len) {
    infof(conn->data, "http2_recv: returns %d for stream %x (%zu/%zu)\n",
    infof(data, "http2_recv: returns %d for stream %x (%zu/%zu)\n",
          len - stream->len, stream->stream_id,
          len, stream->len);
    data->state.drain = 0; /* this stream is hereby drained */
    return len - stream->len;
  }
  /* If stream is closed, return 0 to signal the http routine to close
@@ -849,17 +887,17 @@ static ssize_t http2_recv(struct connectdata *conn, int sockindex,
       function. */
    stream->closed = FALSE;
    if(stream->error_code != NGHTTP2_NO_ERROR) {
      failf(conn->data,
      failf(data,
            "HTTP/2 stream = %x was not closed cleanly: error_code = %d",
            stream->stream_id, stream->error_code);
      *err = CURLE_HTTP2;
      return -1;
    }
    DEBUGF(infof(conn->data, "http2_recv returns 0\n"));
    DEBUGF(infof(data, "http2_recv returns 0\n"));
    return 0;
  }
  *err = CURLE_AGAIN;
  DEBUGF(infof(conn->data, "http2_recv returns -1, AGAIN\n"));
  DEBUGF(infof(data, "http2_recv returns -1, AGAIN\n"));
  return -1;
}

+2 −0
Original line number Diff line number Diff line
@@ -1500,6 +1500,8 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
        break;
      }

      DEBUGF(infof(data, "%s:%d call Curl_readwrite\n", __func__, __LINE__));

      /* read/write data if it is ready to do so */
      result = Curl_readwrite(data->easy_conn, &done);

+8 −0
Original line number Diff line number Diff line
@@ -1046,6 +1046,11 @@ CURLcode Curl_readwrite(struct connectdata *conn,
  else
    fd_write = CURL_SOCKET_BAD;

  if(conn->data->state.drain) {
    select_res |= CURL_CSELECT_IN;
    DEBUGF(infof(data, "%s: forcibly told to drain data\n", __func__));
  }

  if(!select_res) /* Call for select()/poll() only, if read/write/error
                     status is not known. */
    select_res = Curl_socket_ready(fd_read, fd_write, 0);
@@ -1055,6 +1060,9 @@ CURLcode Curl_readwrite(struct connectdata *conn,
    return CURLE_SEND_ERROR;
  }

  DEBUGF(infof(data, "%s: keepon: %x select_res %x\n", __func__, k->keepon,
               select_res));

  /* We go ahead and do a read if we have a readable socket or if
     the stream was rewound (in which case we have data in a
     buffer) */
Loading