Skip to content
multi.c 84.6 KiB
Newer Older
      if(remove_sock_from_hash) {
        /* in this case 'entry' is always non-NULL */
Daniel Stenberg's avatar
Daniel Stenberg committed
          multi->socket_cb(data,
                           s,
                           CURL_POLL_REMOVE,
                           multi->socket_userp,
                           entry->socketp);
Daniel Stenberg's avatar
Daniel Stenberg committed
  memcpy(data->sockets, socks, num*sizeof(curl_socket_t));
  data->numsocks = num;
/*
 * Curl_multi_closed()
 *
 * Used by the connect code to tell the multi_socket code that one of the
 * sockets we were using have just been closed.  This function will then
 * remove it from the sockethash for this handle to make the multi_socket API
 * behave properly, especially for the case when libcurl will create another
 * socket again and it gets the same file descriptor number.
 */

void Curl_multi_closed(struct connectdata *conn, curl_socket_t s)
{
  struct Curl_multi *multi = conn->data->multi;
  if(multi) {
    /* this is set if this connection is part of a handle that is added to
       a multi handle, and only then this is necessary */
    struct Curl_sh_entry *entry =
      Curl_hash_pick(multi->sockhash, (char *)&s, sizeof(s));

    if(entry) {
      if(multi->socket_cb)
        multi->socket_cb(conn->data, s, CURL_POLL_REMOVE,
                         multi->socket_userp,
                         entry->socketp);

      /* now remove it from the socket hash */
      sh_delentry(multi->sockhash, s);
    }
  }
}



/*
 * add_next_timeout()
 *
 * Each SessionHandle has a list of timeouts. The add_next_timeout() is called
 * when it has just been removed from the splay tree because the timeout has
 * expired. This function is then to advance in the list to pick the next
 * timeout to use (skip the already expired ones) and add this node back to
 * the splay tree again.
 *
 * The splay tree only has each sessionhandle as a single node and the nearest
 * timeout is used to sort it on.
 */
static CURLMcode add_next_timeout(struct timeval now,
                                  struct Curl_multi *multi,
                                  struct SessionHandle *d)
{
  struct timeval *tv = &d->state.expiretime;
  struct curl_llist *list = d->state.timeoutlist;
  struct curl_llist_element *e;

  /* move over the timeout list for this specific handle and remove all
     timeouts that are now passed tense and store the next pending
     timeout in *tv */
  for(e = list->head; e; ) {
    struct curl_llist_element *n = e->next;
    long diff = curlx_tvdiff(*(struct timeval *)e->ptr, now);
    if(diff <= 0)
      /* remove outdated entry */
      Curl_llist_remove(list, e, NULL);
    else
      /* the list is sorted so get out on the first mismatch */
      break;
    e = n;
  }
  e = list->head;
  if(!e) {
    /* clear the expire times within the handles that we remove from the
       splay tree */
    tv->tv_sec = 0;
    tv->tv_usec = 0;
  }
  else {
    /* copy the first entry to 'tv' */
    memcpy(tv, e->ptr, sizeof(*tv));

    /* remove first entry from list */
    Curl_llist_remove(list, e, NULL);

    /* insert this node again into the splay */
    multi->timetree = Curl_splayinsert(*tv, multi->timetree,
                                       &d->state.timenode);
  }
  return CURLM_OK;
}

static CURLMcode multi_socket(struct Curl_multi *multi,
                              bool checkall,
{
  CURLMcode result = CURLM_OK;
  struct SessionHandle *data = NULL;
  struct Curl_tree *t;
  struct timeval now = Curl_tvnow();
    /* *perform() deals with running_handles on its own */
    result = curl_multi_perform(multi, running_handles);

    /* walk through each easy handle and do the socket state change magic
       and callbacks */
Daniel Stenberg's avatar
Daniel Stenberg committed
    data=multi->easyp;
    while(data) {
      singlesocket(multi, data);
      data = data->next;
    /* or should we fall-through and do the timer-based stuff? */

    struct Curl_sh_entry *entry =
      Curl_hash_pick(multi->sockhash, (char *)&s, sizeof(s));

    if(!entry)
      /* Unmatched socket, we can't act on it but we ignore this fact.  In
         real-world tests it has been proved that libevent can in fact give
         the application actions even though the socket was just previously
         asked to get removed, so thus we better survive stray socket actions
         and just move on. */
      ;
    else {
      data = entry->easy;
      if(data->magic != CURLEASY_MAGIC_NUMBER)
        /* bad bad bad bad bad bad bad */
        return CURLM_INTERNAL_ERROR;
      /* If the pipeline is enabled, take the handle which is in the head of
         the pipeline. If we should write into the socket, take the send_pipe
         head.  If we should read from the socket, take the recv_pipe head. */
      if(data->easy_conn) {
        if((ev_bitmask & CURL_POLL_OUT) &&
           data->easy_conn->send_pipe &&
           data->easy_conn->send_pipe->head)
          data = data->easy_conn->send_pipe->head->ptr;
        else if((ev_bitmask & CURL_POLL_IN) &&
                data->easy_conn->recv_pipe &&
                data->easy_conn->recv_pipe->head)
          data = data->easy_conn->recv_pipe->head->ptr;
      if(data->easy_conn &&
         !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK))
        /* set socket event bitmask if they're not locked */
        data->easy_conn->cselect_bits = ev_bitmask;
        result = multi_runsingle(multi, now, data);
      while(CURLM_CALL_MULTI_PERFORM == result);
      if(data->easy_conn &&
         !(data->easy_conn->handler->flags & PROTOPT_DIRLOCK))
        /* clear the bitmask only if not locked */
        data->easy_conn->cselect_bits = 0;
      if(CURLM_OK >= result)
        /* get the socket(s) and check if the state has been changed since
           last */
        singlesocket(multi, data);
      /* Now we fall-through and do the timer-based stuff, since we don't want
         to force the user to have to deal with timeouts as long as at least
         one connection in fact has traffic. */
      data = NULL; /* set data to NULL again to avoid calling
                      multi_runsingle() in case there's no need to */
    }
  /* Compensate for bad precision timers that might've triggered too early.

     This precaution was added in commit 2c72732ebf3da5e as a result of bad
     resolution in the windows function use(d).

     The problematic case here is when using the multi_socket API and libcurl
     has told the application about a timeout, and that timeout is what fires
     off a bit early. As we don't have any IDs associated with the timeout we
     can't tell which timeout that fired off but we only have the times to use
     to check what to do. If it fires off too early, we don't run the correct
     actions and we don't tell the application again about the same timeout as
     was already first in the queue...

     Originally we made the timeouts run 40 milliseconds early on all systems,
     but now we have an #ifdef setup to provide a decent precaution inaccuracy
     margin.
  */

  now.tv_usec += MULTI_TIMEOUT_INACCURACY;
    now.tv_sec++;
    now.tv_usec -= 1000000;
  }

  /*
   * The loop following here will go on as long as there are expire-times left
   * to process in the splay and 'data' will be re-assigned for every expired
   * handle we deal with.
   */
  do {
    /* the first loop lap 'data' can be NULL */
    if(data) {
        result = multi_runsingle(multi, now, data);
      while(CURLM_CALL_MULTI_PERFORM == result);
      if(CURLM_OK >= result)
        /* get the socket(s) and check if the state has been changed since
           last */
        singlesocket(multi, data);
    }

    /* Check if there's one (more) expired timer to deal with! This function
       extracts a matching node if there is one */

    multi->timetree = Curl_splaygetbest(now, multi->timetree, &t);
    if(t) {
      data = t->payload; /* assign this for next loop */
      (void)add_next_timeout(now, multi, t->payload);
#undef curl_multi_setopt
CURLMcode curl_multi_setopt(CURLM *multi_handle,
                            CURLMoption option, ...)
{
  struct Curl_multi *multi=(struct Curl_multi *)multi_handle;
  CURLMcode res = CURLM_OK;
  va_list param;

  if(!GOOD_MULTI_HANDLE(multi))
    return CURLM_BAD_HANDLE;

  va_start(param, option);

  switch(option) {
  case CURLMOPT_SOCKETFUNCTION:
    multi->socket_cb = va_arg(param, curl_socket_callback);
    break;
  case CURLMOPT_SOCKETDATA:
    multi->socket_userp = va_arg(param, void *);
    break;
    multi->pipelining_enabled = (0 != va_arg(param, long)) ? TRUE : FALSE;
  case CURLMOPT_TIMERFUNCTION:
    multi->timer_cb = va_arg(param, curl_multi_timer_callback);
    break;
  case CURLMOPT_TIMERDATA:
    multi->timer_userp = va_arg(param, void *);
    break;
  case CURLMOPT_MAXCONNECTS:
    multi->maxconnects = va_arg(param, long);
    break;
  case CURLMOPT_MAX_HOST_CONNECTIONS:
    multi->max_host_connections = va_arg(param, long);
    break;
  case CURLMOPT_MAX_PIPELINE_LENGTH:
    multi->max_pipeline_length = va_arg(param, long);
    break;
  case CURLMOPT_CONTENT_LENGTH_PENALTY_SIZE:
    multi->content_length_penalty_size = va_arg(param, long);
    break;
  case CURLMOPT_CHUNK_LENGTH_PENALTY_SIZE:
    multi->chunk_length_penalty_size = va_arg(param, long);
    break;
  case CURLMOPT_PIPELINING_SITE_BL:
    res = Curl_pipeline_set_site_blacklist(va_arg(param, char **),
                                           &multi->pipelining_site_bl);
    break;
  case CURLMOPT_PIPELINING_SERVER_BL:
    res = Curl_pipeline_set_server_blacklist(va_arg(param, char **),
                                             &multi->pipelining_server_bl);
    break;
  case CURLMOPT_MAX_TOTAL_CONNECTIONS:
    multi->max_total_connections = va_arg(param, long);
    break;
/* we define curl_multi_socket() in the public multi.h header */
#undef curl_multi_socket
CURLMcode curl_multi_socket(CURLM *multi_handle, curl_socket_t s,
                            int *running_handles)
  CURLMcode result = multi_socket((struct Curl_multi *)multi_handle, FALSE, s,
    update_timer((struct Curl_multi *)multi_handle);
  return result;
}

CURLMcode curl_multi_socket_action(CURLM *multi_handle, curl_socket_t s,
Daniel Stenberg's avatar
Daniel Stenberg committed
                                   int ev_bitmask, int *running_handles)
{
  CURLMcode result = multi_socket((struct Curl_multi *)multi_handle, FALSE, s,
                                  ev_bitmask, running_handles);
    update_timer((struct Curl_multi *)multi_handle);
  return result;
CURLMcode curl_multi_socket_all(CURLM *multi_handle, int *running_handles)
  CURLMcode result = multi_socket((struct Curl_multi *)multi_handle,
                                  TRUE, CURL_SOCKET_BAD, 0, running_handles);
    update_timer((struct Curl_multi *)multi_handle);
  return result;
static CURLMcode multi_timeout(struct Curl_multi *multi,
                               long *timeout_ms)
  static struct timeval tv_zero = {0,0};

  if(multi->timetree) {
    /* we have a tree of expire times */
    struct timeval now = Curl_tvnow();

    /* splay the lowest to the bottom */
    multi->timetree = Curl_splay(tv_zero, multi->timetree);
    if(Curl_splaycomparekeys(multi->timetree->key, now) > 0) {
      /* some time left before expiration */
      *timeout_ms = curlx_tvdiff(multi->timetree->key, now);
      if(!*timeout_ms)
        /*
         * Since we only provide millisecond resolution on the returned value
         * and the diff might be less than one millisecond here, we don't
         * return zero as that may cause short bursts of busyloops on fast
         * processors while the diff is still present but less than one
         * millisecond! instead we return 1 until the time is ripe.
         */
        *timeout_ms=1;
    }
      /* 0 means immediately */
      *timeout_ms = 0;
  }
  else
    *timeout_ms = -1;

  return CURLM_OK;
}

CURLMcode curl_multi_timeout(CURLM *multi_handle,
                             long *timeout_ms)
{
  struct Curl_multi *multi=(struct Curl_multi *)multi_handle;

  /* First, make some basic checks that the CURLM handle is a good handle */
  if(!GOOD_MULTI_HANDLE(multi))
    return CURLM_BAD_HANDLE;

  return multi_timeout(multi, timeout_ms);
}

/*
 * Tell the application it should update its timers, if it subscribes to the
 * update timer callback.
 */
static int update_timer(struct Curl_multi *multi)
{
  long timeout_ms;
  if(multi_timeout(multi, &timeout_ms)) {
    static const struct timeval none={0,0};
    if(Curl_splaycomparekeys(none, multi->timer_lastcall)) {
      multi->timer_lastcall = none;
      /* there's no timeout now but there was one previously, tell the app to
         disable it */
      return multi->timer_cb((CURLM*)multi, -1, multi->timer_userp);
    }

  /* When multi_timeout() is done, multi->timetree points to the node with the
   * timeout we got the (relative) time-out time for. We can thus easily check
   * if this is the same (fixed) time as we got in a previous call and then
   * avoid calling the callback again. */
  if(Curl_splaycomparekeys(multi->timetree->key, multi->timer_lastcall) == 0)
    return 0;

  multi->timer_lastcall = multi->timetree->key;

  return multi->timer_cb((CURLM*)multi, timeout_ms, multi->timer_userp);
}

void Curl_multi_set_easy_connection(struct SessionHandle *handle,
                                    struct connectdata *conn)
  handle->easy_conn = conn;
}

static bool isHandleAtHead(struct SessionHandle *handle,
                           struct curl_llist *pipeline)
{
  struct curl_llist_element *curr = pipeline->head;
  if(curr)
    return (curr->ptr == handle) ? TRUE : FALSE;
Daniel Stenberg's avatar
Daniel Stenberg committed
/*
 * multi_freetimeout()
 *
 * Callback used by the llist system when a single timeout list entry is
 * destroyed.
 */
static void multi_freetimeout(void *user, void *entryptr)
{
  (void)user;

  /* the entry was plain malloc()'ed */
  free(entryptr);
}

/*
 * multi_addtimeout()
 *
 * Add a timestamp to the list of timeouts. Keep the list sorted so that head
 * of list is always the timeout nearest in time.
 *
 */
static CURLMcode
multi_addtimeout(struct curl_llist *timeoutlist,
                 struct timeval *stamp)
{
  struct curl_llist_element *e;
  struct timeval *timedup;
  struct curl_llist_element *prev = NULL;

  timedup = malloc(sizeof(*timedup));
  if(!timedup)
    return CURLM_OUT_OF_MEMORY;

  /* copy the timestamp */
  memcpy(timedup, stamp, sizeof(*timedup));

  if(Curl_llist_count(timeoutlist)) {
    /* find the correct spot in the list */
    for(e = timeoutlist->head; e; e = e->next) {
      struct timeval *checktime = e->ptr;
      long diff = curlx_tvdiff(*checktime, *timedup);
      if(diff > 0)
        break;
      prev = e;
    }

  }
  /* else
     this is the first timeout on the list */

  if(!Curl_llist_insert_next(timeoutlist, prev, timedup)) {
    free(timedup);
Daniel Stenberg's avatar
Daniel Stenberg committed
    return CURLM_OUT_OF_MEMORY;
Daniel Stenberg's avatar
Daniel Stenberg committed

  return CURLM_OK;
}
Daniel Stenberg's avatar
Daniel Stenberg committed
/*
 * Curl_expire()
 *
 * given a number of milliseconds from now to use to set the 'act before
 * this'-time for the transfer, to be extracted by curl_multi_timeout()
 *
 * Note that the timeout will be added to a queue of timeouts if it defines a
 * moment in time that is later than the current head of queue.
 *
 * Pass zero to clear all timeout values for this handle.
void Curl_expire(struct SessionHandle *data, long milli)
{
  struct Curl_multi *multi = data->multi;
  struct timeval *nowp = &data->state.expiretime;
  /* this is only interesting while there is still an associated multi struct
     remaining! */
  if(!multi)
    return;

  if(!milli) {
    /* No timeout, clear the time data. */
    if(nowp->tv_sec || nowp->tv_usec) {
      /* Since this is an cleared time, we must remove the previous entry from
         the splay tree */
Daniel Stenberg's avatar
Daniel Stenberg committed
      struct curl_llist *list = data->state.timeoutlist;

      rc = Curl_splayremovebyaddr(multi->timetree,
                                  &data->state.timenode,
                                  &multi->timetree);
      if(rc)
        infof(data, "Internal error clearing splay node = %d\n", rc);
Daniel Stenberg's avatar
Daniel Stenberg committed

      /* flush the timeout list too */
      while(list->size > 0)
        Curl_llist_remove(list, list->tail, NULL);

#ifdef DEBUGBUILD
Yang Tse's avatar
Yang Tse committed
      nowp->tv_sec = 0;
Yang Tse's avatar
Yang Tse committed
      nowp->tv_usec = 0;
    }
  }
  else {
    struct timeval set;

    set = Curl_tvnow();
    set.tv_sec += milli/1000;
    set.tv_usec += (milli%1000)*1000;

    if(nowp->tv_sec || nowp->tv_usec) {
      /* This means that the struct is added as a node in the splay tree.
         Compare if the new time is earlier, and only remove-old/add-new if it
         is. */
Daniel Stenberg's avatar
Daniel Stenberg committed
      if(diff > 0) {
        /* the new expire time was later so just add it to the queue
           and get out */
        multi_addtimeout(data->state.timeoutlist, &set);
Daniel Stenberg's avatar
Daniel Stenberg committed
      }

      /* the new time is newer than the presently set one, so add the current
         to the queue and update the head */
      multi_addtimeout(data->state.timeoutlist, nowp);

      /* Since this is an updated time, we must remove the previous entry from
         the splay tree first and then re-add the new value */
      rc = Curl_splayremovebyaddr(multi->timetree,
                                  &data->state.timenode,
                                  &multi->timetree);
      if(rc)
        infof(data, "Internal error removing splay node = %d\n", rc);
    multi->timetree = Curl_splayinsert(*nowp,
                                       multi->timetree,
                                       &data->state.timenode);
  }
#if 0
  Curl_splayprint(multi->timetree, 0, TRUE);
#endif
}

CURLMcode curl_multi_assign(CURLM *multi_handle,
                            curl_socket_t s, void *hashp)
{
  struct Curl_sh_entry *there = NULL;
  struct Curl_multi *multi = (struct Curl_multi *)multi_handle;

  if(s != CURL_SOCKET_BAD)
    there = Curl_hash_pick(multi->sockhash, (char *)&s, sizeof(curl_socket_t));

  if(!there)
    return CURLM_BAD_SOCKET;

  there->socketp = hashp;

  return CURLM_OK;
}
size_t Curl_multi_max_host_connections(struct Curl_multi *multi)
{
  return multi ? multi->max_host_connections : 0;
}

size_t Curl_multi_max_total_connections(struct Curl_multi *multi)
{
  return multi ? multi->max_total_connections : 0;
}

size_t Curl_multi_max_pipeline_length(struct Curl_multi *multi)
{
  return multi ? multi->max_pipeline_length : 0;
}

curl_off_t Curl_multi_content_length_penalty_size(struct Curl_multi *multi)
{
  return multi ? multi->content_length_penalty_size : 0;
}

curl_off_t Curl_multi_chunk_length_penalty_size(struct Curl_multi *multi)
{
  return multi ? multi->chunk_length_penalty_size : 0;
}

struct curl_llist *Curl_multi_pipelining_site_bl(struct Curl_multi *multi)
{
  return multi->pipelining_site_bl;
}

struct curl_llist *Curl_multi_pipelining_server_bl(struct Curl_multi *multi)
{
  return multi->pipelining_server_bl;
}

void Curl_multi_process_pending_handles(struct Curl_multi *multi)
{
Daniel Stenberg's avatar
Daniel Stenberg committed
  struct SessionHandle *data;
Daniel Stenberg's avatar
Daniel Stenberg committed
  data=multi->easyp;
  while(data) {
    if(data->mstate == CURLM_STATE_CONNECT_PEND) {
      multistate(data, CURLM_STATE_CONNECT);
      /* Make sure that the handle will be processed soonish. */
Daniel Stenberg's avatar
Daniel Stenberg committed
      Curl_expire(data, 1);
Daniel Stenberg's avatar
Daniel Stenberg committed
    data = data->next; /* operate on next handle */
#ifdef DEBUGBUILD
void Curl_multi_dump(const struct Curl_multi *multi_handle)
{
  struct Curl_multi *multi=(struct Curl_multi *)multi_handle;
Daniel Stenberg's avatar
Daniel Stenberg committed
  struct SessionHandle *data;
  int i;
  fprintf(stderr, "* Multi status: %d handles, %d alive\n",
          multi->num_easy, multi->num_alive);
Daniel Stenberg's avatar
Daniel Stenberg committed
  for(data=multi->easyp; data; data = data->next) {
    if(data->mstate < CURLM_STATE_COMPLETED) {
      /* only display handles that are not completed */
      fprintf(stderr, "handle %p, state %s, %d sockets\n",
Daniel Stenberg's avatar
Daniel Stenberg committed
              (void *)data,
              statename[data->mstate], data->numsocks);
      for(i=0; i < data->numsocks; i++) {
        curl_socket_t s = data->sockets[i];
        struct Curl_sh_entry *entry =
          Curl_hash_pick(multi->sockhash, (char *)&s, sizeof(s));

        fprintf(stderr, "%d ", (int)s);
        if(!entry) {
          fprintf(stderr, "INTERNAL CONFUSION\n");
          continue;
        }
        fprintf(stderr, "[%s %s] ",
                entry->action&CURL_POLL_IN?"RECVING":"",
                entry->action&CURL_POLL_OUT?"SENDING":"");
      }
Daniel Stenberg's avatar
Daniel Stenberg committed
      if(data->numsocks)