Re: [BUG] msr-trace.h:42 suspicious rcu_dereference_check() usage!
From: Steven Rostedt
Date: Mon Nov 28 2016 - 17:46:16 EST
On Mon, 28 Nov 2016 13:48:25 -0800
Andi Kleen <andi@xxxxxxxxxxxxxx> wrote:
> > I took a look at this and forced some more functions to be inlined. I
> > did a little tweaking here and there. Could you pull my tree and see if
> > things are better? I don't currently have the hardware to run this
> > myself.
> >
> > git://git.kernel.org/pub/scm/linux/kernel/git/rostedt/linux-trace.git
> >
> > branch: ftrace/core
>
> Didn't help much unfortunately. I think you need a real fast path here.
Well, actually it's been optimized a lot from the first instances.
Note, tracepoints need to be quite generic, so where it can be optimized
is not obvious.
>
> http://halobates.de/tracepoint-trace2
>
> $ wc -l tracepoint-trace2
> 595 tracepoint-trace2
>
> Opcode Histogram:
>
> 201 mov
> 51 push
> 51 pop
> 34 test
> 32 cmp
> 26 jz
> 24 jnz
> 24 and
> 16 ret
> 16 lea
> 15 call
> 12 add
> 10 jmp
>
> Functions with # of instructions:
>
> 25 trace_event_raw_event_sched_switch
This is from the TRACE_EVENT macro.
trace_event_raw_event_##call(void *__data, proto) \
{ \
struct trace_event_file *trace_file = __data; \
struct trace_event_data_offsets_##call __maybe_unused __data_offsets;\
struct trace_event_buffer fbuffer; \
struct trace_event_raw_##call *entry; \
int __data_size; \
\
if (trace_trigger_soft_disabled(trace_file)) \
return; \
\
__data_size = trace_event_get_offsets_##call(&__data_offsets, args); \
\
entry = trace_event_buffer_reserve(&fbuffer, trace_file, \
sizeof(*entry) + __data_size); \
> 19 trace_event_buffer_reserve
void *trace_event_buffer_reserve(struct trace_event_buffer *fbuffer,
struct trace_event_file *trace_file,
unsigned long len)
{
struct trace_event_call *event_call = trace_file->event_call;
if ((trace_file->flags & EVENT_FILE_FL_PID_FILTER) &&
trace_event_ignore_this_pid(trace_file))
return NULL;
local_save_flags(fbuffer->flags);
fbuffer->pc = preempt_count();
/*
* If CONFIG_PREEMPT is enabled, then the tracepoint itself disables
* preemption (adding one to the preempt_count). Since we are
* interested in the preempt_count at the time the tracepoint was
* hit, we need to subtract one to offset the increment.
*/
if (IS_ENABLED(CONFIG_PREEMPT))
fbuffer->pc--;
fbuffer->trace_file = trace_file;
fbuffer->event =
trace_event_buffer_lock_reserve(&fbuffer->buffer, trace_file,
event_call->event.type, len,
fbuffer->flags, fbuffer->pc);
> 21 trace_event_buffer_lock_reserve
struct ring_buffer_event *
trace_event_buffer_lock_reserve(struct ring_buffer **current_rb,
struct trace_event_file *trace_file,
int type, unsigned long len,
unsigned long flags, int pc)
{
struct ring_buffer_event *entry;
int val;
*current_rb = trace_file->tr->trace_buffer.buffer;
if ((trace_file->flags &
(EVENT_FILE_FL_SOFT_DISABLED | EVENT_FILE_FL_FILTERED)) &&
(entry = this_cpu_read(trace_buffered_event))) {
Note: Some of these if statements can be encapsulated with jump labels, as they
are false pretty much all of the time.
/* Try to use the per cpu buffer first */
val = this_cpu_inc_return(trace_buffered_event_cnt);
if (val == 1) {
trace_event_setup(entry, type, flags, pc);
entry->array[0] = len;
return entry;
}
this_cpu_dec(trace_buffered_event_cnt);
}
entry = trace_buffer_lock_reserve(*current_rb,
type, len, flags, pc);
> 57 ring_buffer_lock_reserve
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
{
struct ring_buffer_per_cpu *cpu_buffer;
struct ring_buffer_event *event;
int cpu;
/* If we are tracing schedule, we don't want to recurse */
preempt_disable_notrace();
if (unlikely(atomic_read(&buffer->record_disabled)))
goto out;
cpu = raw_smp_processor_id();
if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
goto out;
cpu_buffer = buffer->buffers[cpu];
if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
goto out;
if (unlikely(length > BUF_MAX_DATA_SIZE))
goto out;
if (unlikely(trace_recursive_lock(cpu_buffer)))
goto out;
event = rb_reserve_next_event(buffer, cpu_buffer, length);
if (!event)
goto out_unlock;
static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer *buffer,
struct ring_buffer_per_cpu *cpu_buffer,
unsigned long length)
{
struct ring_buffer_event *event;
struct rb_event_info info;
int nr_loops = 0;
u64 diff;
rb_start_commit(cpu_buffer);
#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
/*
* Due to the ability to swap a cpu buffer from a buffer
* it is possible it was swapped before we committed.
* (committing stops a swap). We check for it here and
* if it happened, we have to fail the write.
*/
barrier();
if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
local_dec(&cpu_buffer->committing);
local_dec(&cpu_buffer->commits);
return NULL;
}
#endif
info.length = rb_calculate_event_length(length);
again:
info.add_timestamp = 0;
info.delta = 0;
/*
* We allow for interrupts to reenter here and do a trace.
* If one does, it will cause this original code to loop
* back here. Even with heavy interrupts happening, this
* should only happen a few times in a row. If this happens
* 1000 times in a row, there must be either an interrupt
* storm or we have something buggy.
* Bail!
*/
if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
goto out_fail;
info.ts = rb_time_stamp(cpu_buffer->buffer);
diff = info.ts - cpu_buffer->write_stamp;
/* make sure this diff is calculated here */
barrier();
/* Did the write stamp get updated already? */
if (likely(info.ts >= cpu_buffer->write_stamp)) {
info.delta = diff;
if (unlikely(test_time_stamp(info.delta)))
rb_handle_timestamp(cpu_buffer, &info);
}
event = __rb_reserve_next(cpu_buffer, &info);
> 3 trace_clock_local
> 3 sched_clock
> 24 native_sched_clock
Not much we can do about the clock.
> 3 sched_clock
> 2 trace_clock_local
> 11 ring_buffer_lock_reserve
> 81 __rb_reserve_next
/*
 * __rb_reserve_next - reserve info->length bytes on the current tail page.
 * @cpu_buffer: per-CPU buffer whose tail page is written to
 * @info:       reservation request; updated in place (see below)
 *
 * Grows info->length by RB_LEN_TIME_EXTEND when info->add_timestamp is set,
 * then bumps the tail page's write counter by info->length using
 * local_add_return(). If the resulting write index is larger than
 * BUF_PAGE_SIZE the request is handed to rb_move_tail() and its result is
 * returned. Otherwise the event located at the reserved offset is set up
 * through rb_update_event(), the page's entry counter and
 * cpu_buffer->entries_bytes are increased, and the event is returned.
 *
 * Changes made to @info: length may grow, tail_page is recorded, and delta
 * is cleared when the reservation begins at offset 0 of the page.
 */
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
struct rb_event_info *info)
{
struct ring_buffer_event *event;
struct buffer_page *tail_page;
unsigned long tail, write;
/*
* If the time delta since the last event is too big to
* hold in the time field of the event, then we append a
* TIME EXTEND event ahead of the data event.
*/
if (unlikely(info->add_timestamp))
info->length += RB_LEN_TIME_EXTEND;
/* Don't let the compiler play games with cpu_buffer->tail_page */
tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
/* write holds the page's write counter after adding our length */
write = local_add_return(info->length, &tail_page->write);
/* set write to only the index of the write */
write &= RB_WRITE_MASK;
/* tail is the offset within the page where our reservation starts */
tail = write - info->length;
/*
* If this is the first commit on the page, then it has the same
* timestamp as the page itself.
*/
if (!tail)
info->delta = 0;
/* See if we shot past the end of this buffer page */
if (unlikely(write > BUF_PAGE_SIZE))
return rb_move_tail(cpu_buffer, tail, info);
/* We reserved something on the buffer */
event = __rb_page_index(tail_page, tail);
kmemcheck_annotate_bitfield(event, bitfield);
rb_update_event(cpu_buffer, event, info);
local_inc(&tail_page->entries);
/*
* If this is the first commit on the page, then update
* its timestamp.
*/
if (!tail)
tail_page->page->time_stamp = info->ts;
/* account for these added bytes */
local_add(info->length, &cpu_buffer->entries_bytes);
return event;
}
> 11 ring_buffer_lock_reserve
> 6 trace_event_buffer_lock_reserve
if (!fbuffer->event)
return NULL;
fbuffer->entry = ring_buffer_event_data(fbuffer->event);
return fbuffer->entry;
> 18 ring_buffer_event_data
/*
 * rb_event_data - return a pointer to the payload of a ring buffer event.
 * @event: the event to inspect; if it is a TIME_EXTEND record, the event
 *         returned by skip_time_extend() is used instead
 *
 * The payload begins at array[0] when type_len is non-zero, and at array[1]
 * when type_len is zero (array[0] then carries the length).
 *
 * BUG()s if type_len is larger than RINGBUF_TYPE_DATA_TYPE_LEN_MAX.
 */
static void *
rb_event_data(struct ring_buffer_event *event)
{
	unsigned int payload_slot;

	/* Step over a leading time-extend record to reach the real event. */
	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);

	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	/*
	 * Non-zero type_len: the length lives in the header, data is in
	 * array[0]. Zero type_len: array[0] is the length, data follows.
	 */
	payload_slot = event->type_len ? 0 : 1;

	return (void *)&event->array[payload_slot];
}
Hmm, the above could probably be optimized better.
> 5 trace_event_buffer_lock_reserve
> 38 tracing_generic_entry_update
I tried to optimize this one, but nothing really helped.
/*
 * tracing_generic_entry_update - fill in the common fields of a trace entry.
 * @entry: entry whose preempt_count, pid and flags members are written
 * @flags: saved irq flags; only examined (via irqs_disabled_flags()) when
 *         CONFIG_TRACE_IRQFLAGS_SUPPORT is defined
 * @pc:    preempt count; its low 8 bits are stored, and it is tested against
 *         NMI_MASK, HARDIRQ_MASK and SOFTIRQ_MASK to derive context flags
 */
void
tracing_generic_entry_update(struct trace_entry *entry, unsigned long flags,
			     int pc)
{
	struct task_struct *task = current;
	unsigned int trace_flags = 0;

	/* IRQ state: either the real state or a "not supported" marker. */
#ifdef CONFIG_TRACE_IRQFLAGS_SUPPORT
	if (irqs_disabled_flags(flags))
		trace_flags |= TRACE_FLAG_IRQS_OFF;
#else
	trace_flags |= TRACE_FLAG_IRQS_NOSUPPORT;
#endif

	/* Execution context, derived from the preempt count. */
	if (pc & NMI_MASK)
		trace_flags |= TRACE_FLAG_NMI;
	if (pc & HARDIRQ_MASK)
		trace_flags |= TRACE_FLAG_HARDIRQ;
	if (pc & SOFTIRQ_MASK)
		trace_flags |= TRACE_FLAG_SOFTIRQ;

	/* Pending reschedule indications. */
	if (tif_need_resched())
		trace_flags |= TRACE_FLAG_NEED_RESCHED;
	if (test_preempt_need_resched())
		trace_flags |= TRACE_FLAG_PREEMPT_RESCHED;

	entry->preempt_count = pc & 0xff;
	if (task)
		entry->pid = task->pid;
	else
		entry->pid = 0;
	entry->flags = trace_flags;
}
> 11 trace_event_buffer_lock_reserve
> 5 trace_event_buffer_reserve
> 18 ring_buffer_event_data
Strange, this is called twice?
> 4 trace_event_buffer_reserve
> 28 trace_event_raw_event_sched_switch
> 30 trace_event_buffer_commit
> 19 trace_buffer_unlock_commit_regs
> 11 ring_buffer_unlock_commit
> 85 rb_commit
The rb_commit is a pain due to lockless commits.
Namely this part:
/*
 * rb_set_commit_to_write - bring the commit position up to the write position.
 * @cpu_buffer: per-CPU buffer whose commit_page / commit index is advanced
 *
 * Phase 1: while commit_page differs from tail_page, store the current
 * commit page's write index into its commit field and step commit_page
 * forward with rb_inc_page(); write_stamp is refreshed from the new commit
 * page's time_stamp when that page has a non-zero write index.
 *
 * Phase 2: on the final commit page, keep setting its commit field to its
 * write index until rb_commit_index() agrees with rb_page_write().
 *
 * If tail_page has moved by the time both phases finish, everything is
 * redone from the "again" label.
 *
 * Returns early, after an RB_WARN_ON, when the phase 1 walk reaches
 * nr_pages * 100 iterations or when rb_is_reader_page() is true for
 * tail_page.
 */
static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
unsigned long max_count;
/*
* We only race with interrupts and NMIs on this CPU.
* If we own the commit event, then we can commit
* all others that interrupted us, since the interruptions
* are in stack format (they finish before they come
* back to us). This allows us to do a simple loop to
* assign the commit to the tail.
*/
again:
/* iteration budget for the page walk; reset on every retry */
max_count = cpu_buffer->nr_pages * 100;
while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
if (RB_WARN_ON(cpu_buffer, !(--max_count)))
return;
if (RB_WARN_ON(cpu_buffer,
rb_is_reader_page(cpu_buffer->tail_page)))
return;
/* mark everything written on this page as committed, then move on */
local_set(&cpu_buffer->commit_page->page->commit,
rb_page_write(cpu_buffer->commit_page));
rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
/* Only update the write stamp if the page has an event */
if (rb_page_write(cpu_buffer->commit_page))
cpu_buffer->write_stamp =
cpu_buffer->commit_page->page->time_stamp;
/* add barrier to keep gcc from optimizing too much */
barrier();
}
/* commit_page matched tail_page above; catch its commit index up to its write index */
while (rb_commit_index(cpu_buffer) !=
rb_page_write(cpu_buffer->commit_page)) {
local_set(&cpu_buffer->commit_page->page->commit,
rb_page_write(cpu_buffer->commit_page));
/* warn if the stored commit value has bits outside RB_WRITE_MASK */
RB_WARN_ON(cpu_buffer,
local_read(&cpu_buffer->commit_page->page->commit) &
~RB_WRITE_MASK);
barrier();
}
/* again, keep gcc from optimizing */
barrier();
/*
* If an interrupt came in just after the first while loop
* and pushed the tail page forward, we will be left with
* a dangling commit that will never go forward.
*/
if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
goto again;
}
> 18 ring_buffer_unlock_commit
> 10 trace_buffer_unlock_commit_regs
> 3 ftrace_trace_userstack
I should make this call a jump label, as it's seldom used.
> 7 trace_buffer_unlock_commit_regs
> 11 trace_event_buffer_commit
> 8 trace_event_raw_event_sched_switch
>
There are a few things that can still be optimized. But the real issue
is that trace_events are generic and macro-created, so I need to
balance how much gets inlined against how much is passed around to
functions.
And the ring buffer is lockless, thus it needs to jump through various hoops to
make sure things don't break.
-- Steve