Revision d80b60fc2433cc934533f594bfa4e2b6c6103ba3 authored by Alberto Garcia on 08 June 2018, 15:15:36 UTC, committed by Michael Roth on 21 June 2018, 01:45:06 UTC
The throttle block filter can be reopened, and this makes it possible
to change the throttle group that the filter belongs to.

The code currently does that as follows:

  - On throttle_reopen_prepare(): create a new ThrottleGroupMember
    and attach it to the new throttle group.

  - On throttle_reopen_commit(): detach the old ThrottleGroupMember,
    delete it and replace it with the new one.

The problem with this is that by replacing the ThrottleGroupMember the
previous value of io_limits_disabled is lost: throttle_co_drain_begin()
increments that counter on the old member, so when the swap happens
inside a drained section the new member's counter is still zero and
throttle_co_drain_end() fails its assertion.

This problem can be reproduced by reopening a throttle node:

   $QEMU -monitor stdio \
   -object throttle-group,id=tg0,x-iops-total=1000 \
   -blockdev node-name=hd0,driver=qcow2,file.driver=file,file.filename=hd.qcow2 \
   -blockdev node-name=root,driver=throttle,throttle-group=tg0,file=hd0,read-only=on

   (qemu) block_stream root
   block/throttle.c:214: throttle_co_drain_end: Assertion `tgm->io_limits_disabled' failed.

Since we only want to change the throttle group on reopen, there's no
need to create a new ThrottleGroupMember and discard the old one. It's
easier if we simply detach the existing one from its current group and
attach it to the new one, as sketched below.
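
A minimal sketch of that approach, assuming QEMU's throttle-group
helpers throttle_group_get_name(), throttle_group_unregister_tgm() and
throttle_group_register_tgm(); the actual patch may differ in detail:

   static void throttle_reopen_commit(BDRVReopenState *reopen_state)
   {
       BlockDriverState *bs = reopen_state->bs;
       ThrottleGroupMember *tgm = bs->opaque;
       /* Group name parsed and stashed by throttle_reopen_prepare() */
       char *group_name = reopen_state->opaque;

       /* Keep the existing ThrottleGroupMember (and with it the
        * io_limits_disabled counter); just move it to the new group. */
       if (strcmp(group_name, throttle_group_get_name(tgm))) {
           throttle_group_unregister_tgm(tgm);
           throttle_group_register_tgm(tgm, group_name,
                                       bdrv_get_aio_context(bs));
       }

       g_free(group_name);
       reopen_state->opaque = NULL;
   }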

Signed-off-by: Alberto Garcia <berto@igalia.com>
Message-id: 20180608151536.7378-1-berto@igalia.com
Signed-off-by: Max Reitz <mreitz@redhat.com>
(cherry picked from commit bc33c047d1ec0b35c9cd8be62bcefae2da28654f)
Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
qemu-coroutine.c
/*
 * QEMU coroutines
 *
 * Copyright IBM, Corp. 2011
 *
 * Authors:
 *  Stefan Hajnoczi    <stefanha@linux.vnet.ibm.com>
 *  Kevin Wolf         <kwolf@redhat.com>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "qemu/osdep.h"
#include "trace.h"
#include "qemu-common.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "block/aio.h"

enum {
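    /* Sizing unit for the coroutine free pools (see coroutine_delete()) */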
    POOL_BATCH_SIZE = 64,
};

/** Free list to speed up creation */
static QSLIST_HEAD(, Coroutine) release_pool = QSLIST_HEAD_INITIALIZER(pool);
static unsigned int release_pool_size;
static __thread QSLIST_HEAD(, Coroutine) alloc_pool = QSLIST_HEAD_INITIALIZER(pool);
static __thread unsigned int alloc_pool_size;
static __thread Notifier coroutine_pool_cleanup_notifier;

static void coroutine_pool_cleanup(Notifier *n, void *value)
{
    Coroutine *co;
    Coroutine *tmp;

    QSLIST_FOREACH_SAFE(co, &alloc_pool, pool_next, tmp) {
        QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
        qemu_coroutine_delete(co);
    }
}

Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
{
    Coroutine *co = NULL;

    if (CONFIG_COROUTINE_POOL) {
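        /* Fast path: take a coroutine from the thread-local pool,
         * with no locking or atomic operations needed. */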
        co = QSLIST_FIRST(&alloc_pool);
        if (!co) {
            if (release_pool_size > POOL_BATCH_SIZE) {
                /* Slow path; a good place to register the destructor, too.  */
                if (!coroutine_pool_cleanup_notifier.notify) {
                    coroutine_pool_cleanup_notifier.notify = coroutine_pool_cleanup;
                    qemu_thread_atexit_add(&coroutine_pool_cleanup_notifier);
                }

                /* This is not exact; there could be a little skew between
                 * release_pool_size and the actual size of release_pool.  But
                 * it is just a heuristic, it does not need to be perfect.
                 */
                alloc_pool_size = atomic_xchg(&release_pool_size, 0);
                QSLIST_MOVE_ATOMIC(&alloc_pool, &release_pool);
                co = QSLIST_FIRST(&alloc_pool);
            }
        }
        if (co) {
            QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
            alloc_pool_size--;
        }
    }

    if (!co) {
        co = qemu_coroutine_new();
    }

    co->entry = entry;
    co->entry_arg = opaque;
    QSIMPLEQ_INIT(&co->co_queue_wakeup);
    return co;
}

static void coroutine_delete(Coroutine *co)
{
    co->caller = NULL;

    if (CONFIG_COROUTINE_POOL) {
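        /* First try the global release pool so that any thread can
         * reuse this coroutine. */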
        if (release_pool_size < POOL_BATCH_SIZE * 2) {
            QSLIST_INSERT_HEAD_ATOMIC(&release_pool, co, pool_next);
            atomic_inc(&release_pool_size);
            return;
        }
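        /* The global pool is full; keep the coroutine in this
         * thread's local pool instead. */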
        if (alloc_pool_size < POOL_BATCH_SIZE) {
            QSLIST_INSERT_HEAD(&alloc_pool, co, pool_next);
            alloc_pool_size++;
            return;
        }
    }

    qemu_coroutine_delete(co);
}

void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
    Coroutine *self = qemu_coroutine_self();
    CoroutineAction ret;

    /* Cannot rely on the read barrier for co in aio_co_wake(), as there are
     * callers outside of aio_co_wake() */
    const char *scheduled = atomic_mb_read(&co->scheduled);

    trace_qemu_aio_coroutine_enter(ctx, self, co, co->entry_arg);

    /* if the Coroutine has already been scheduled, entering it again will
     * cause us to enter it twice, potentially even after the coroutine has
     * been deleted */
    if (scheduled) {
        fprintf(stderr,
                "%s: Co-routine was already scheduled in '%s'\n",
                __func__, scheduled);
        abort();
    }

    if (co->caller) {
        fprintf(stderr, "Co-routine re-entered recursively\n");
        abort();
    }

    co->caller = self;
    co->ctx = ctx;

    /* Store co->ctx before anything that stores co.  Matches
     * barrier in aio_co_wake and qemu_co_mutex_wake.
     */
    smp_wmb();

    ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);

    qemu_co_queue_run_restart(co);

    /* Beware, if ret == COROUTINE_YIELD and qemu_co_queue_run_restart()
     * has started any other coroutine, "co" might have been reentered
     * and even freed by now!  So be careful and do not touch it.
     */

    switch (ret) {
    case COROUTINE_YIELD:
        return;
    case COROUTINE_TERMINATE:
        assert(!co->locks_held);
        trace_qemu_coroutine_terminate(co);
        coroutine_delete(co);
        return;
    default:
        abort();
    }
}

void qemu_coroutine_enter(Coroutine *co)
{
    qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
}

void qemu_coroutine_enter_if_inactive(Coroutine *co)
{
    if (!qemu_coroutine_entered(co)) {
        qemu_coroutine_enter(co);
    }
}

void coroutine_fn qemu_coroutine_yield(void)
{
    Coroutine *self = qemu_coroutine_self();
    Coroutine *to = self->caller;

    trace_qemu_coroutine_yield(self, to);

    if (!to) {
        fprintf(stderr, "Co-routine is yielding to no one\n");
        abort();
    }

    self->caller = NULL;
    qemu_coroutine_switch(self, to, COROUTINE_YIELD);
}

bool qemu_coroutine_entered(Coroutine *co)
{
    return co->caller;
}