[RFC/PATCH 3/4] relayfs scheme-specific and resizing code

From: Tom Zanussi
Date: Mon Dec 01 2003 - 16:05:26 EST


diff -urpN -X dontdiff linux-2.6.0-test11/fs/relayfs/relay_locking.c linux-2.6.0-test11.cur/fs/relayfs/relay_locking.c
--- linux-2.6.0-test11/fs/relayfs/relay_locking.c Wed Dec 31 18:00:00 1969
+++ linux-2.6.0-test11.cur/fs/relayfs/relay_locking.c Mon Dec 1 10:29:16 2003
@@ -0,0 +1,322 @@
+/*
+ * RelayFS locking scheme implementation.
+ *
+ * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim@xxxxxxxxxxx)
+ * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi@xxxxxxxxxx), IBM Corp
+ *
+ * This file is released under the GPL.
+ */
+
+#include <asm/relay.h>
+#include "relay_locking.h"
+#include "resize.h"
+
+/**
+ * switch_buffers - switches between read and write buffers.
+ * @cur_time: current time.
+ * @cur_tsc: the TSC associated with cur_time, if applicable
+ * @rchan: the channel
+ * @finalizing: if true, don't start a new buffer
+ * @resetting: if true, restart the channel at its first sub-buffer
+ * @finalize_buffer_only: if true, just finalize the current buffer and return
+ *
+ * This should be called with local interrupts disabled.
+ */
+static void
+switch_buffers(struct timeval cur_time,
+ u32 cur_tsc,
+ struct rchan *rchan,
+ int finalizing,
+ int resetting,
+ int finalize_buffer_only)
+{
+ char *chan_buf_end;
+ int bytes_written;
+
+ if (!rchan->half_switch) {
+ bytes_written = rchan->callbacks->buffer_end(rchan->id,
+ cur_write_pos(rchan), write_buf_end(rchan),
+ cur_time, cur_tsc, using_tsc(rchan));
+ if (bytes_written == 0)
+ rchan->unused_bytes[rchan->buf_idx % rchan->n_bufs] =
+ write_buf_end(rchan) - cur_write_pos(rchan);
+ }
+
+ if (finalize_buffer_only) {
+ rchan->bufs_produced++;
+ return;
+ }
+
+ chan_buf_end = rchan->buf + rchan->n_bufs * rchan->buf_size;
+ if((write_buf(rchan) + rchan->buf_size >= chan_buf_end) || resetting)
+ write_buf(rchan) = rchan->buf;
+ else
+ write_buf(rchan) += rchan->buf_size;
+ write_buf_end(rchan) = write_buf(rchan) + rchan->buf_size;
+ write_limit(rchan) = write_buf_end(rchan) - rchan->end_reserve;
+ cur_write_pos(rchan) = write_buf(rchan);
+
+ rchan->buf_start_time = cur_time;
+ rchan->buf_start_tsc = cur_tsc;
+
+ if (resetting)
+ rchan->buf_idx = 0;
+ else
+ rchan->buf_idx++;
+ rchan->buf_id++;
+
+ if (!packet_delivery(rchan))
+ rchan->unused_bytes[rchan->buf_idx % rchan->n_bufs] = 0;
+
+ if (resetting) {
+ rchan->bufs_produced = rchan->bufs_produced + rchan->n_bufs;
+ rchan->bufs_produced -= rchan->bufs_produced % rchan->n_bufs;
+ rchan->bufs_consumed = rchan->bufs_produced;
+ rchan->bytes_consumed = 0;
+ update_readers_consumed(rchan, rchan->bufs_consumed, rchan->bytes_consumed);
+ } else if (!rchan->half_switch)
+ rchan->bufs_produced++;
+
+ rchan->half_switch = 0;
+
+ if (!finalizing) {
+ bytes_written = rchan->callbacks->buffer_start(rchan->id, cur_write_pos(rchan), rchan->buf_id, cur_time, cur_tsc, using_tsc(rchan));
+ cur_write_pos(rchan) += bytes_written;
+ }
+}
+
+/**
+ * locking_reserve - reserve a slot in the buffer for an event.
+ * @rchan: the channel
+ * @slot_len: the length of the slot to reserve
+ * @ts: variable that will receive the time the slot was reserved
+ * @tsc: the timestamp counter associated with time
+ * @err: receives the result flags
+ * @interrupting: if this write is interrupting another, set to non-zero
+ *
+ * Returns pointer to the beginning of the reserved slot, NULL if error.
+ *
+ * The err value contains the result flags and is an ORed combination
+ * of the following:
+ *
+ * RELAY_BUFFER_SWITCH_NONE - no buffer switch occurred
+ * RELAY_WRITE_DISCARD_NONE - write should not be discarded
+ * RELAY_BUFFER_SWITCH - buffer switch occurred
+ * RELAY_WRITE_DISCARD - write should be discarded (all buffers are full)
+ * RELAY_WRITE_TOO_LONG - write won't fit into even an empty buffer
+ */
+inline char *
+locking_reserve(struct rchan *rchan,
+ u32 slot_len,
+ struct timeval *ts,
+ u32 *tsc,
+ int *err,
+ int *interrupting)
+{
+ u32 buffers_ready;
+ int bytes_written;
+
+ *err = RELAY_BUFFER_SWITCH_NONE;
+
+ if (slot_len >= rchan->buf_size) {
+ *err = RELAY_WRITE_DISCARD | RELAY_WRITE_TOO_LONG;
+ return NULL;
+ }
+
+ if (rchan->initialized == 0) {
+ rchan->initialized = 1;
+ get_timestamp(&rchan->buf_start_time,
+ &rchan->buf_start_tsc, rchan);
+ rchan->unused_bytes[0] = 0;
+ bytes_written = rchan->callbacks->buffer_start(
+ rchan->id, cur_write_pos(rchan),
+ rchan->buf_id, rchan->buf_start_time,
+ rchan->buf_start_tsc, using_tsc(rchan));
+ cur_write_pos(rchan) += bytes_written;
+ *tsc = get_time_delta(ts, rchan);
+ return cur_write_pos(rchan);
+ }
+
+ *tsc = get_time_delta(ts, rchan);
+
+ if (in_progress_event_size(rchan)) {
+ interrupted_pos(rchan) = cur_write_pos(rchan);
+ cur_write_pos(rchan) = in_progress_event_pos(rchan)
+ + in_progress_event_size(rchan)
+ + interrupting_size(rchan);
+ *interrupting = 1;
+ } else {
+ in_progress_event_pos(rchan) = cur_write_pos(rchan);
+ in_progress_event_size(rchan) = slot_len;
+ interrupting_size(rchan) = 0;
+ }
+
+ if (cur_write_pos(rchan) + slot_len > write_limit(rchan)) {
+ if (atomic_read(&rchan->suspended) == 1) {
+ in_progress_event_pos(rchan) = NULL;
+ in_progress_event_size(rchan) = 0;
+ interrupting_size(rchan) = 0;
+ *err = RELAY_WRITE_DISCARD;
+ return NULL;
+ }
+
+ buffers_ready = rchan->bufs_produced - rchan->bufs_consumed;
+ if (buffers_ready == rchan->n_bufs - 1) {
+ if (!mode_continuous(rchan)) {
+ atomic_set(&rchan->suspended, 1);
+ in_progress_event_pos(rchan) = NULL;
+ in_progress_event_size(rchan) = 0;
+ interrupting_size(rchan) = 0;
+ get_timestamp(ts, tsc, rchan);
+ switch_buffers(*ts, *tsc, rchan, 0, 0, 1);
+ recalc_time_delta(ts, tsc, rchan);
+ rchan->half_switch = 1;
+
+ cur_write_pos(rchan) = write_buf_end(rchan) - 1;
+ *err = RELAY_BUFFER_SWITCH | RELAY_WRITE_DISCARD;
+ return NULL;
+ }
+ }
+
+ get_timestamp(ts, tsc, rchan);
+ switch_buffers(*ts, *tsc, rchan, 0, 0, 0);
+ recalc_time_delta(ts, tsc, rchan);
+ *err = RELAY_BUFFER_SWITCH;
+ }
+
+ return cur_write_pos(rchan);
+}
+
+/**
+ * locking_commit - commit a reserved slot in the buffer
+ * @rchan: the channel
+ * @from: start of the reserved slot being committed
+ * @len: number of bytes committed
+ * @deliver: if non-zero, call the deliver callback
+ * @interrupting: non-zero if this commit is for a write that interrupted another
+ *
+ * Commits len bytes and calls deliver callback if applicable.
+ */
+inline void
+locking_commit(struct rchan *rchan,
+ char *from,
+ u32 len,
+ int deliver,
+ int interrupting)
+{
+ cur_write_pos(rchan) += len;
+
+ if (interrupting) {
+ cur_write_pos(rchan) = interrupted_pos(rchan);
+ interrupting_size(rchan) += len;
+ } else {
+ in_progress_event_size(rchan) = 0;
+ if (interrupting_size(rchan)) {
+ cur_write_pos(rchan) += interrupting_size(rchan);
+ interrupting_size(rchan) = 0;
+ }
+ }
+
+ if (deliver) {
+ if (bulk_delivery(rchan)) {
+ u32 cur_idx = cur_write_pos(rchan) - rchan->buf;
+ u32 cur_bufno = cur_idx / rchan->buf_size;
+ from = rchan->buf + cur_bufno * rchan->buf_size;
+ len = cur_idx - cur_bufno * rchan->buf_size;
+ }
+ rchan->callbacks->deliver(rchan->id, from, len);
+ expand_check(rchan);
+ }
+}
+
+/**
+ * locking_finalize - finalize last buffer at end of channel use
+ * @rchan: the channel
+ */
+inline void
+locking_finalize(struct rchan *rchan)
+{
+ unsigned long int flags;
+ struct timeval time;
+ u32 tsc;
+
+ local_irq_save(flags);
+ get_timestamp(&time, &tsc, rchan);
+ switch_buffers(time, tsc, rchan, 1, 0, 0);
+ local_irq_restore(flags);
+}
+
+/**
+ * locking_get_offset - get current and max 'file' offsets for VFS
+ * @rchan: the channel
+ * @max_offset: maximum channel offset
+ *
+ * Returns the current and maximum buffer offsets in VFS terms.
+ */
+u32
+locking_get_offset(struct rchan *rchan,
+ u32 *max_offset)
+{
+ if (max_offset)
+ *max_offset = rchan->buf_size * rchan->n_bufs - 1;
+
+ return cur_write_pos(rchan) - rchan->buf;
+}
+
+/**
+ * locking_reset - reset the channel
+ * @rchan: the channel
+ * @init: 1 if this is a first-time channel initialization
+ */
+void locking_reset(struct rchan *rchan, int init)
+{
+ if (init)
+ channel_lock(rchan) = SPIN_LOCK_UNLOCKED;
+ write_buf(rchan) = rchan->buf;
+ write_buf_end(rchan) = write_buf(rchan) + rchan->buf_size;
+ cur_write_pos(rchan) = write_buf(rchan);
+ write_limit(rchan) = write_buf_end(rchan) - rchan->end_reserve;
+ in_progress_event_pos(rchan) = NULL;
+ in_progress_event_size(rchan) = 0;
+ interrupted_pos(rchan) = NULL;
+ interrupting_size(rchan) = 0;
+}
+
+/**
+ * locking_reset_index - atomically set channel index to the beginning
+ * @rchan: the channel
+ * @old_idx: the expected current index
+ *
+ * If this fails, it means that something else logged something since
+ * old_idx was read, in which case we probably no longer want to reset;
+ * the decision is left to the caller.
+ *
+ * Returns 0 if the index was successfully set, negative otherwise
+ */
+int
+locking_reset_index(struct rchan *rchan, u32 old_idx)
+{
+ unsigned long flags;
+ struct timeval time;
+ u32 tsc;
+ u32 cur_idx;
+
+ relay_lock_channel(rchan, flags);
+ cur_idx = locking_get_offset(rchan, NULL);
+ if (cur_idx != old_idx) {
+ relay_unlock_channel(rchan, flags);
+ return -1;
+ }
+
+ get_timestamp(&time, &tsc, rchan);
+ switch_buffers(time, tsc, rchan, 0, 1, 0);
+
+ relay_unlock_channel(rchan, flags);
+
+ return 0;
+}
+
+
+
+
+
+
+
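For anyone reading the locking scheme cold, the calling convention is:
reserve a slot, check the ORed result flags, write into the slot, then
commit.  Below is a rough user-space sketch of that sequence; the flag
values, the toy_* helpers and the fixed-size buffer are illustrative
stand-ins only, not the definitions from <asm/relay.h>, and none of the
timestamping or sub-buffer switching above is modelled:

/* Sketch only: flags, helpers and buffer are illustrative stand-ins. */
#include <stdio.h>
#include <string.h>

#define RELAY_BUFFER_SWITCH_NONE 0x0
#define RELAY_BUFFER_SWITCH      0x1
#define RELAY_WRITE_DISCARD      0x2
#define RELAY_WRITE_TOO_LONG     0x4

static char buf[64];
static unsigned int pos;

/* Toy stand-in for locking_reserve(): hand out space or set discard flags. */
static char *toy_reserve(unsigned int len, int *err)
{
        *err = RELAY_BUFFER_SWITCH_NONE;
        if (len >= sizeof(buf)) {
                *err = RELAY_WRITE_DISCARD | RELAY_WRITE_TOO_LONG;
                return NULL;
        }
        if (pos + len > sizeof(buf)) {  /* "buffer switch": wrap around */
                pos = 0;
                *err = RELAY_BUFFER_SWITCH;
        }
        return buf + pos;
}

/* Toy stand-in for locking_commit(): advance the write position. */
static void toy_commit(unsigned int len)
{
        pos += len;
}

int main(void)
{
        const char *msg = "hello";
        int err;
        char *slot = toy_reserve(strlen(msg), &err);

        if (!slot || (err & RELAY_WRITE_DISCARD)) {
                fprintf(stderr, "event dropped, err=%#x\n", err);
                return 1;
        }
        memcpy(slot, msg, strlen(msg));
        toy_commit(strlen(msg));
        printf("wrote %zu bytes, err=%#x\n", strlen(msg), err);
        return 0;
}
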
diff -urpN -X dontdiff linux-2.6.0-test11/fs/relayfs/relay_locking.h linux-2.6.0-test11.cur/fs/relayfs/relay_locking.h
--- linux-2.6.0-test11/fs/relayfs/relay_locking.h Wed Dec 31 18:00:00 1969
+++ linux-2.6.0-test11.cur/fs/relayfs/relay_locking.h Wed Nov 26 22:48:42 2003
@@ -0,0 +1,34 @@
+#ifndef _RELAY_LOCKING_H
+#define _RELAY_LOCKING_H
+
+extern char *
+locking_reserve(struct rchan *rchan,
+ u32 slot_len,
+ struct timeval *time_stamp,
+ u32 *tsc,
+ int *err,
+ int *interrupting);
+
+extern void
+locking_commit(struct rchan *rchan,
+ char *from,
+ u32 len,
+ int deliver,
+ int interrupting);
+
+extern void
+locking_resume(struct rchan *rchan);
+
+extern void
+locking_finalize(struct rchan *rchan);
+
+extern u32
+locking_get_offset(struct rchan *rchan, u32 *max_offset);
+
+extern void
+locking_reset(struct rchan *rchan, int init);
+
+extern int
+locking_reset_index(struct rchan *rchan, u32 old_idx);
+
+#endif /* _RELAY_LOCKING_H */
diff -urpN -X dontdiff linux-2.6.0-test11/fs/relayfs/relay_lockless.c linux-2.6.0-test11.cur/fs/relayfs/relay_lockless.c
--- linux-2.6.0-test11/fs/relayfs/relay_lockless.c Wed Dec 31 18:00:00 1969
+++ linux-2.6.0-test11.cur/fs/relayfs/relay_lockless.c Mon Dec 1 10:26:18 2003
@@ -0,0 +1,538 @@
+/*
+ * RelayFS lockless scheme implementation.
+ *
+ * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim@xxxxxxxxxxx)
+ * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi@xxxxxxxxxx), IBM Corp
+ * Copyright (C) 2002, 2003 - Bob Wisniewski (bob@xxxxxxxxxxxxxx), IBM Corp
+ *
+ * This file is released under the GPL.
+ */
+
+#include <asm/relay.h>
+#include "relay_lockless.h"
+#include "resize.h"
+
+/**
+ * compare_and_store_volatile - atomically compare-and-exchange the value at ptr
+ * @ptr: ptr to the word that will receive the new value
+ * @oval: the value we think is currently in *ptr
+ * @nval: the value *ptr will get if we were right
+ */
+inline int
+compare_and_store_volatile(volatile u32 *ptr,
+ u32 oval,
+ u32 nval)
+{
+ u32 prev;
+
+ barrier();
+ prev = cmpxchg(ptr, oval, nval);
+ barrier();
+
+ return (prev == oval);
+}
+
+/**
+ * atomic_set_volatile - atomically set the value in ptr to nval.
+ * @ptr: ptr to the word that will receive the new value
+ * @nval: the new value
+ */
+inline void
+atomic_set_volatile(atomic_t *ptr,
+ u32 nval)
+{
+ barrier();
+ atomic_set(ptr, (int)nval);
+ barrier();
+}
+
+/**
+ * atomic_add_volatile - atomically add val to the value at ptr.
+ * @ptr: ptr to the word that will receive the addition
+ * @val: the value to add to *ptr
+ */
+inline void
+atomic_add_volatile(atomic_t *ptr, u32 val)
+{
+ barrier();
+ atomic_add((int)val, ptr);
+ barrier();
+}
+
+/**
+ * atomic_sub_volatile - atomically subtract val from the value at ptr.
+ * @ptr: ptr to the word that will receive the subtraction
+ * @val: the value to subtract from *ptr
+ */
+inline void
+atomic_sub_volatile(atomic_t *ptr, s32 val)
+{
+ barrier();
+ atomic_sub((int)val, ptr);
+ barrier();
+}
+
+/**
+ * lockless_commit - commit a reserved slot in the buffer
+ * @rchan: the channel
+ * @from: start of the reserved slot being committed
+ * @len: number of bytes committed
+ * @deliver: if non-zero, call the deliver callback
+ * @interrupting: not used
+ *
+ * Commits len bytes and calls deliver callback if applicable.
+ */
+inline void
+lockless_commit(struct rchan *rchan,
+ char *from,
+ u32 len,
+ int deliver,
+ int interrupting)
+{
+ u32 bufno, idx;
+
+ idx = from - rchan->buf;
+
+ if (len > 0) {
+ bufno = RELAY_BUFNO_GET(idx, offset_bits(rchan));
+ atomic_add_volatile(&fill_count(rchan, bufno), len);
+ }
+
+ if (deliver) {
+ u32 mask = offset_mask(rchan);
+ if (bulk_delivery(rchan)) {
+ from = rchan->buf + RELAY_BUF_OFFSET_CLEAR(idx, mask);
+ len += RELAY_BUF_OFFSET_GET(idx, mask);
+ }
+ rchan->callbacks->deliver(rchan->id, from, len);
+ expand_check(rchan);
+ }
+}
+
+/**
+ * get_buffer_end - get the address of the end of buffer
+ * @rchan: the channel
+ * @buf_idx: index into channel corresponding to address
+ */
+static inline char *
+get_buffer_end(struct rchan *rchan, u32 buf_idx)
+{
+ return rchan->buf
+ + RELAY_BUF_OFFSET_CLEAR(buf_idx, offset_mask(rchan))
+ + RELAY_BUF_SIZE(offset_bits(rchan));
+}
+
+
+/**
+ * finalize_buffer - utility function consolidating end-of-buffer tasks.
+ * @rchan: the channel
+ * @end_idx: index into buffer to write the end-buffer event at
+ * @size_lost: number of unused bytes at the end of the buffer
+ * @time_stamp: the time of the end-buffer event
+ * @tsc: the timestamp counter associated with time
+ * @resetting: are we resetting the channel?
+ *
+ * This function must be called with local irqs disabled.
+ */
+static inline void
+finalize_buffer(struct rchan *rchan,
+ u32 end_idx,
+ u32 size_lost,
+ struct timeval *time_stamp,
+ u32 *tsc,
+ int resetting)
+{
+ char* cur_write_pos;
+ char* write_buf_end;
+ u32 bufno;
+ int bytes_written;
+
+ cur_write_pos = rchan->buf + end_idx;
+ write_buf_end = get_buffer_end(rchan, end_idx - 1);
+
+ bytes_written = rchan->callbacks->buffer_end(rchan->id, cur_write_pos,
+ write_buf_end, *time_stamp, *tsc, using_tsc(rchan));
+ if (bytes_written == 0)
+ rchan->unused_bytes[rchan->buf_idx % rchan->n_bufs] = size_lost;
+ bufno = RELAY_BUFNO_GET(end_idx, offset_bits(rchan));
+ atomic_add_volatile(&fill_count(rchan, bufno), size_lost);
+ if (resetting) {
+ rchan->bufs_produced = rchan->bufs_produced + rchan->n_bufs;
+ rchan->bufs_produced -= rchan->bufs_produced % rchan->n_bufs;
+ rchan->bufs_consumed = rchan->bufs_produced;
+ rchan->bytes_consumed = 0;
+ update_readers_consumed(rchan, rchan->bufs_consumed, rchan->bytes_consumed);
+ } else
+ rchan->bufs_produced++;
+}
+
+/**
+ * lockless_finalize - finalize last buffer at end of channel use
+ * @rchan: the channel
+ */
+inline void
+lockless_finalize(struct rchan *rchan)
+{
+ u32 event_end_idx;
+ u32 size_lost;
+ unsigned long int flags;
+ struct timeval time;
+ u32 tsc;
+
+ event_end_idx = RELAY_BUF_OFFSET_GET(idx(rchan), offset_mask(rchan));
+ size_lost = RELAY_BUF_SIZE(offset_bits(rchan)) - event_end_idx;
+
+ local_irq_save(flags);
+ get_timestamp(&time, &tsc, rchan);
+ finalize_buffer(rchan, idx(rchan) & idx_mask(rchan), size_lost,
+ &time, &tsc, 0);
+ local_irq_restore(flags);
+}
+
+/**
+ * discard_check - determine whether a write should be discarded
+ * @rchan: the channel
+ * @old_idx: index into buffer where check for space should begin
+ * @write_len: the length of the write to check
+ * @time_stamp: the time of the end-buffer event
+ * @tsc: the timestamp counter associated with time
+ *
+ * The return value contains the result flags and is an ORed combination
+ * of the following:
+ *
+ * RELAY_WRITE_DISCARD_NONE - write should not be discarded
+ * RELAY_BUFFER_SWITCH - buffer switch occurred
+ * RELAY_WRITE_DISCARD - write should be discarded (all buffers are full)
+ * RELAY_WRITE_TOO_LONG - write won't fit into even an empty buffer
+ */
+static inline int
+discard_check(struct rchan *rchan,
+ u32 old_idx,
+ u32 write_len,
+ struct timeval *time_stamp,
+ u32 *tsc)
+{
+ u32 buffers_ready;
+ u32 offset_mask = offset_mask(rchan);
+ u8 offset_bits = offset_bits(rchan);
+ u32 idx_mask = idx_mask(rchan);
+ u32 size_lost;
+ unsigned long int flags;
+
+ if (write_len > RELAY_BUF_SIZE(offset_bits))
+ return RELAY_WRITE_DISCARD | RELAY_WRITE_TOO_LONG;
+
+ if (mode_continuous(rchan))
+ return RELAY_WRITE_DISCARD_NONE;
+
+ local_irq_save(flags);
+ if (atomic_read(&rchan->suspended) == 1) {
+ local_irq_restore(flags);
+ return RELAY_WRITE_DISCARD;
+ }
+ if (rchan->half_switch) {
+ local_irq_restore(flags);
+ return RELAY_WRITE_DISCARD_NONE;
+ }
+ buffers_ready = rchan->bufs_produced - rchan->bufs_consumed;
+ if (buffers_ready == rchan->n_bufs - 1) {
+ atomic_set(&rchan->suspended, 1);
+ size_lost = RELAY_BUF_SIZE(offset_bits)
+ - RELAY_BUF_OFFSET_GET(old_idx, offset_mask);
+ finalize_buffer(rchan, old_idx & idx_mask, size_lost,
+ time_stamp, tsc, 0);
+ rchan->half_switch = 1;
+ idx(rchan) = RELAY_BUF_OFFSET_CLEAR((old_idx & idx_mask), offset_mask(rchan)) + RELAY_BUF_SIZE(offset_bits) - 1;
+ local_irq_restore(flags);
+
+ return RELAY_BUFFER_SWITCH | RELAY_WRITE_DISCARD;
+ }
+ local_irq_restore(flags);
+
+ return RELAY_WRITE_DISCARD_NONE;
+}
+
+/**
+ * switch_buffers - switch over to a new sub-buffer
+ * @rchan: the channel
+ * @slot_len: the length of the slot needed for the current write
+ * @offset: the offset calculated for the new index
+ * @ts: timestamp
+ * @tsc: the timestamp counter associated with time
+ * @new_idx: the new calculated value of the buffer control index
+ * @old_idx: the value of the buffer control index when we were called
+ * @resetting: are we resetting the channel?
+ */
+static inline void
+switch_buffers(struct rchan *rchan,
+ u32 slot_len,
+ u32 offset,
+ struct timeval *ts,
+ u32 *tsc,
+ u32 new_idx,
+ u32 old_idx,
+ int resetting)
+{
+ u32 size_lost = rchan->end_reserve;
+ unsigned long int flags;
+ u32 idx_mask = idx_mask(rchan);
+ u8 offset_bits = offset_bits(rchan);
+ char *cur_write_pos;
+ u32 new_buf_no;
+ u32 start_reserve = rchan->start_reserve;
+
+ if (resetting)
+ size_lost = RELAY_BUF_SIZE(offset_bits(rchan)) - old_idx % rchan->buf_size;
+
+ if (offset > 0)
+ size_lost += slot_len - offset;
+ else
+ old_idx += slot_len;
+
+ local_irq_save(flags);
+ if (!rchan->half_switch)
+ finalize_buffer(rchan, old_idx & idx_mask, size_lost,
+ ts, tsc, resetting);
+ rchan->half_switch = 0;
+ rchan->buf_start_time = *ts;
+ rchan->buf_start_tsc = *tsc;
+ local_irq_restore(flags);
+
+ cur_write_pos = rchan->buf + RELAY_BUF_OFFSET_CLEAR((new_idx
+ & idx_mask), offset_mask(rchan));
+ if (resetting)
+ rchan->buf_idx = 0;
+ else
+ rchan->buf_idx++;
+ rchan->buf_id++;
+ rchan->unused_bytes[rchan->buf_idx % rchan->n_bufs] = 0;
+ rchan->callbacks->buffer_start(rchan->id, cur_write_pos,
+ rchan->buf_id, *ts, *tsc, using_tsc(rchan));
+ new_buf_no = RELAY_BUFNO_GET(new_idx & idx_mask, offset_bits);
+ atomic_sub_volatile(&fill_count(rchan, new_buf_no),
+ RELAY_BUF_SIZE(offset_bits) - start_reserve);
+ if (atomic_read(&fill_count(rchan, new_buf_no)) < start_reserve)
+ atomic_set_volatile(&fill_count(rchan, new_buf_no),
+ start_reserve);
+}
+
+/**
+ * lockless_reserve_slow - the slow reserve path in the lockless scheme
+ * @rchan: the channel
+ * @slot_len: the length of the slot to reserve
+ * @ts: variable that will receive the time the slot was reserved
+ * @tsc: the timestamp counter associated with time
+ * @old_idx: the value of the buffer control index when we were called
+ * @err: receives the result flags
+ *
+ * Returns pointer to the beginning of the reserved slot, NULL if error.
+ *
+ * err values same as for lockless_reserve.
+ */
+static inline char *
+lockless_reserve_slow(struct rchan *rchan,
+ u32 slot_len,
+ struct timeval *ts,
+ u32 *tsc,
+ u32 old_idx,
+ int *err)
+{
+ u32 new_idx, offset;
+ unsigned long int flags;
+ u32 offset_mask = offset_mask(rchan);
+ u32 idx_mask = idx_mask(rchan);
+ u32 start_reserve = rchan->start_reserve;
+ u32 end_reserve = rchan->end_reserve;
+ int discard_event;
+ u32 reserved_idx;
+ char *cur_write_pos;
+ int initializing = 0;
+
+ *err = RELAY_BUFFER_SWITCH_NONE;
+
+ discard_event = discard_check(rchan, old_idx, slot_len, ts, tsc);
+ if (discard_event != RELAY_WRITE_DISCARD_NONE) {
+ *err = discard_event;
+ return NULL;
+ }
+
+ local_irq_save(flags);
+ if (rchan->initialized == 0) {
+ rchan->initialized = initializing = 1;
+ idx(rchan) = rchan->start_reserve + rchan->rchan_start_reserve;
+ }
+ local_irq_restore(flags);
+
+ do {
+ old_idx = idx(rchan);
+ new_idx = old_idx + slot_len;
+
+ offset = RELAY_BUF_OFFSET_GET(new_idx + end_reserve,
+ offset_mask);
+ if ((offset < slot_len) && (offset > 0)) {
+ reserved_idx = RELAY_BUF_OFFSET_CLEAR(new_idx
+ + end_reserve, offset_mask) + start_reserve;
+ new_idx = reserved_idx + slot_len;
+ } else if (offset < slot_len) {
+ reserved_idx = old_idx;
+ new_idx = RELAY_BUF_OFFSET_CLEAR(new_idx
+ + end_reserve, offset_mask) + start_reserve;
+ } else
+ reserved_idx = old_idx;
+ get_timestamp(ts, tsc, rchan);
+ } while (!compare_and_store_volatile(&idx(rchan), old_idx, new_idx));
+
+ reserved_idx &= idx_mask;
+
+ if (initializing == 1) {
+ cur_write_pos = rchan->buf
+ + RELAY_BUF_OFFSET_CLEAR((old_idx & idx_mask),
+ offset_mask(rchan));
+ rchan->buf_start_time = *ts;
+ rchan->buf_start_tsc = *tsc;
+ rchan->unused_bytes[0] = 0;
+
+ rchan->callbacks->buffer_start(rchan->id, cur_write_pos,
+ rchan->buf_id, *ts, *tsc, using_tsc(rchan));
+ }
+
+ if (offset < slot_len) {
+ switch_buffers(rchan, slot_len, offset, ts, tsc, new_idx,
+ old_idx, 0);
+ *err = RELAY_BUFFER_SWITCH;
+ }
+
+ /* If not using TSC, need to calc time delta */
+ recalc_time_delta(ts, tsc, rchan);
+
+ return rchan->buf + reserved_idx;
+}
+
+/**
+ * lockless_reserve - reserve a slot in the buffer for an event.
+ * @rchan: the channel
+ * @slot_len: the length of the slot to reserve
+ * @ts: variable that will receive the time the slot was reserved
+ * @tsc: the timestamp counter associated with time
+ * @err: receives the result flags
+ * @interrupting: not used
+ *
+ * Returns pointer to the beginning of the reserved slot, NULL if error.
+ *
+ * The err value contains the result flags and is an ORed combination
+ * of the following:
+ *
+ * RELAY_BUFFER_SWITCH_NONE - no buffer switch occurred
+ * RELAY_WRITE_DISCARD_NONE - write should not be discarded
+ * RELAY_BUFFER_SWITCH - buffer switch occurred
+ * RELAY_WRITE_DISCARD - write should be discarded (all buffers are full)
+ * RELAY_WRITE_TOO_LONG - write won't fit into even an empty buffer
+ */
+inline char *
+lockless_reserve(struct rchan *rchan,
+ u32 slot_len,
+ struct timeval *ts,
+ u32 *tsc,
+ int *err,
+ int *interrupting)
+{
+ u32 old_idx, new_idx, offset;
+ u32 offset_mask = offset_mask(rchan);
+
+ do {
+ old_idx = idx(rchan);
+ new_idx = old_idx + slot_len;
+
+ offset = RELAY_BUF_OFFSET_GET(new_idx + rchan->end_reserve,
+ offset_mask);
+ if (offset < slot_len)
+ return lockless_reserve_slow(rchan, slot_len,
+ ts, tsc, old_idx, err);
+ get_time_or_tsc(ts, tsc, rchan);
+ } while (!compare_and_store_volatile(&idx(rchan), old_idx, new_idx));
+
+ /* If not using TSC, need to calc time delta */
+ recalc_time_delta(ts, tsc, rchan);
+
+ *err = RELAY_BUFFER_SWITCH_NONE;
+
+ return rchan->buf + (old_idx & idx_mask(rchan));
+}
+
+/**
+ * lockless_get_offset - get current and max channel offsets
+ * @rchan: the channel
+ * @max_offset: maximum channel offset
+ *
+ * Returns the current and maximum channel offsets.
+ */
+u32
+lockless_get_offset(struct rchan *rchan,
+ u32 *max_offset)
+{
+ if (max_offset)
+ *max_offset = rchan->buf_size * rchan->n_bufs - 1;
+
+ return rchan->initialized ? idx(rchan) & idx_mask(rchan) : 0;
+}
+
+/**
+ * lockless_reset - reset the channel
+ * @rchan: the channel
+ * @init: 1 if this is a first-time channel initialization
+ */
+void lockless_reset(struct rchan *rchan, int init)
+{
+ int i;
+
+ /* Start first buffer at 0 - (end_reserve + 1) so that it
+ gets initialized via buffer_start callback as well. */
+ idx(rchan) = 0UL - (rchan->end_reserve + 1);
+ idx_mask(rchan) =
+ (1UL << (bufno_bits(rchan) + offset_bits(rchan))) - 1;
+ atomic_set(&fill_count(rchan, 0),
+ (int)rchan->start_reserve +
+ (int)rchan->rchan_start_reserve);
+ for (i = 1; i < rchan->n_bufs; i++)
+ atomic_set(&fill_count(rchan, i),
+ (int)RELAY_BUF_SIZE(offset_bits(rchan)));
+}
+
+/**
+ * lockless_reset_index - atomically set channel index to the beginning
+ * @rchan: the channel
+ * @old_idx: the current index
+ *
+ * If this fails, it means that something else logged something since
+ * old_idx was read, in which case we probably no longer want to reset;
+ * the decision is left to the caller.
+ *
+ * Returns 0 if the index was successfully set, negative otherwise
+ */
+int
+lockless_reset_index(struct rchan *rchan, u32 old_idx)
+{
+ struct timeval ts;
+ u32 tsc;
+ u32 new_idx;
+
+ if (compare_and_store_volatile(&idx(rchan), old_idx, 0)) {
+ new_idx = rchan->start_reserve;
+ switch_buffers(rchan, 0, 0, &ts, &tsc, new_idx, old_idx, 1);
+ return 0;
+ } else
+ return -1;
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
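The core of the lockless scheme is the compare-and-store loop in
lockless_reserve(): each writer snapshots the channel index, computes
where its slot would end, and only owns the slot if the index hasn't
moved in the meantime.  Here's a rough user-space analogue of just that
loop, with GCC's __sync_val_compare_and_swap standing in for the
kernel's cmpxchg (build with -pthread); it's a sketch of the technique,
not relayfs code:

/* Sketch only: user-space analogue of the cmpxchg-based slot reservation. */
#include <stdio.h>
#include <pthread.h>

#define BUF_SIZE (1u << 20)

static char buf[BUF_SIZE];
static volatile unsigned int channel_idx;       /* analogue of idx(rchan) */

/* Reserve slot_len bytes; returns the offset of the reserved slot. */
static unsigned int reserve(unsigned int slot_len)
{
        unsigned int old_idx, new_idx;

        do {
                old_idx = channel_idx;
                new_idx = old_idx + slot_len;
                /* succeeds only if no other writer advanced the index */
        } while (__sync_val_compare_and_swap(&channel_idx, old_idx, new_idx)
                 != old_idx);

        return old_idx & (BUF_SIZE - 1);        /* analogue of idx_mask(rchan) */
}

static void *writer(void *arg)
{
        int i;

        (void)arg;
        for (i = 0; i < 1000; i++) {
                unsigned int off = reserve(8);
                buf[off] = 1;   /* bytes [off, off+8) belong to this writer alone */
        }
        return NULL;
}

int main(void)
{
        pthread_t t1, t2;

        pthread_create(&t1, NULL, writer, NULL);
        pthread_create(&t2, NULL, writer, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        printf("final index: %u\n", channel_idx);
        return 0;
}

Because each winning compare-and-swap hands out a disjoint
[old_idx, new_idx) range, concurrent writers never overlap even without
a lock; the real code additionally handles sub-buffer boundaries,
reserves and timestamps in lockless_reserve_slow().
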
diff -urpN -X dontdiff linux-2.6.0-test11/fs/relayfs/relay_lockless.h linux-2.6.0-test11.cur/fs/relayfs/relay_lockless.h
--- linux-2.6.0-test11/fs/relayfs/relay_lockless.h Wed Dec 31 18:00:00 1969
+++ linux-2.6.0-test11.cur/fs/relayfs/relay_lockless.h Wed Nov 26 22:48:42 2003
@@ -0,0 +1,34 @@
+#ifndef _RELAY_LOCKLESS_H
+#define _RELAY_LOCKLESS_H
+
+extern char *
+lockless_reserve(struct rchan *rchan,
+ u32 slot_len,
+ struct timeval *time_stamp,
+ u32 *tsc,
+ int *err,
+ int *interrupting);
+
+extern void
+lockless_commit(struct rchan *rchan,
+ char * from,
+ u32 len,
+ int deliver,
+ int interrupting);
+
+extern void
+lockless_resume(struct rchan *rchan);
+
+extern void
+lockless_finalize(struct rchan *rchan);
+
+extern u32
+lockless_get_offset(struct rchan *rchan, u32 *max_offset);
+
+extern void
+lockless_reset(struct rchan *rchan, int init);
+
+extern int
+lockless_reset_index(struct rchan *rchan, u32 old_idx);
+
+#endif /* _RELAY_LOCKLESS_H */
diff -urpN -X dontdiff linux-2.6.0-test11/fs/relayfs/resize.c linux-2.6.0-test11.cur/fs/relayfs/resize.c
--- linux-2.6.0-test11/fs/relayfs/resize.c Wed Dec 31 18:00:00 1969
+++ linux-2.6.0-test11.cur/fs/relayfs/resize.c Mon Dec 1 11:51:17 2003
@@ -0,0 +1,957 @@
+/*
+ * RelayFS buffer management and resizing code.
+ *
+ * Copyright (C) 2002, 2003 - Tom Zanussi (zanussi@xxxxxxxxxx), IBM Corp
+ * Copyright (C) 1999, 2000, 2001, 2002 - Karim Yaghmour (karim@xxxxxxxxxxx)
+ *
+ * This file is released under the GPL.
+ */
+
+#include <linux/module.h>
+#include <linux/vmalloc.h>
+#include <linux/mm.h>
+#include <linux/klog.h>
+#include <asm/relay.h>
+#include "resize.h"
+
+/**
+ * alloc_page_array - alloc array to hold pages, but not pages
+ * @size: the total size of the memory represented by the page array
+ * @page_count: the number of pages the array can hold
+ * @err: 0 on success, negative otherwise
+ *
+ * Returns a pointer to the page array if successful, NULL otherwise.
+ */
+static struct page **
+alloc_page_array(int size, int *page_count, int *err)
+{
+ int n_pages;
+ struct page **page_array;
+ int page_array_size;
+
+ *err = 0;
+
+ size = PAGE_ALIGN(size);
+ n_pages = size >> PAGE_SHIFT;
+ page_array_size = n_pages * sizeof(struct page *);
+ page_array = kmalloc(page_array_size, GFP_KERNEL);
+ if (page_array == NULL) {
+ *err = -ENOMEM;
+ return NULL;
+ }
+ *page_count = n_pages;
+ memset(page_array, 0, page_array_size);
+
+ return page_array;
+}
+
+/**
+ * free_page_array - free array to hold pages, but not pages
+ * @page_array: pointer to the page array
+ */
+static inline void
+free_page_array(struct page **page_array)
+{
+ kfree(page_array);
+}
+
+/**
+ * depopulate_page_array - free and unreserve all pages in the array
+ * @page_array: pointer to the page array
+ * @page_count: number of pages to free
+ */
+static void
+depopulate_page_array(struct page **page_array, int page_count)
+{
+ int i;
+
+ for (i = 0; i < page_count; i++) {
+ ClearPageReserved(page_array[i]);
+ __free_page(page_array[i]);
+ }
+}
+
+/**
+ * populate_page_array - allocate and reserve pages
+ * @page_array: pointer to the page array
+ * @page_count: number of pages to allocate
+ *
+ * Returns 0 if successful, negative otherwise.
+ */
+static int
+populate_page_array(struct page **page_array, int page_count)
+{
+ int i;
+
+ for (i = 0; i < page_count; i++) {
+ page_array[i] = alloc_page(GFP_KERNEL);
+ if (unlikely(!page_array[i])) {
+ depopulate_page_array(page_array, i);
+ return -ENOMEM;
+ }
+ SetPageReserved(page_array[i]);
+ }
+ return 0;
+}
+
+/**
+ * alloc_rchan_buf - allocate the initial channel buffer
+ * @size: total size of the buffer
+ * @page_array: receives a pointer to the buffer's page array
+ * @page_count: receives the number of pages allocated
+ *
+ * Returns a pointer to the resulting buffer, NULL if unsuccessful
+ */
+void *
+alloc_rchan_buf(unsigned long size, struct page ***page_array, int *page_count)
+{
+ void *mem;
+ int err;
+
+ *page_array = alloc_page_array(size, page_count, &err);
+ if (!*page_array)
+ return NULL;
+
+ err = populate_page_array(*page_array, *page_count);
+ if (err) {
+ free_page_array(*page_array);
+ *page_array = NULL;
+ return NULL;
+ }
+
+ mem = vmap(*page_array, *page_count, GFP_KERNEL, PAGE_KERNEL);
+ if (!mem) {
+ depopulate_page_array(*page_array, *page_count);
+ free_page_array(*page_array);
+ *page_array = NULL;
+ return NULL;
+ }
+ memset(mem, 0, size);
+
+ return mem;
+}
+
+/**
+ * free_rchan_buf - free a channel buffer
+ * @buf: pointer to the buffer to free
+ * @page_array: pointer to the buffer's page array
+ * @page_count: number of pages in page array
+ */
+void
+free_rchan_buf(void *buf, struct page **page_array, int page_count)
+{
+ vunmap(buf);
+ depopulate_page_array(page_array, page_count);
+ free_page_array(page_array);
+}
+
+/**
+ * expand_check - check whether the channel needs expanding
+ * @rchan: the channel
+ *
+ * If the channel needs expanding, the needs_resize callback is
+ * called with RELAY_RESIZE_EXPAND and the suggested number of
+ * sub-buffers for the new buffer.
+ */
+void
+expand_check(struct rchan *rchan)
+{
+ u32 active_bufs;
+ u32 new_n_bufs = 0;
+ u32 threshold = rchan->n_bufs * RESIZE_THRESHOLD;
+
+ if (rchan->resize_min == 0)
+ return;
+
+ if (rchan->resizing || rchan->replace_buffer)
+ return;
+
+ active_bufs = rchan->bufs_produced - rchan->bufs_consumed + 1;
+
+ if (rchan->resize_max && active_bufs == threshold) {
+ new_n_bufs = rchan->n_bufs * 2;
+ }
+
+ if (new_n_bufs)
+ rchan->callbacks->needs_resize(rchan->id,
+ RELAY_RESIZE_EXPAND,
+ rchan->buf_size,
+ new_n_bufs);
+}
+
+/**
+ * can_shrink - check whether the channel can shrink
+ * @rchan: the channel
+ * @cur_idx: the current channel index
+ *
+ * Returns the suggested number of sub-buffers for the new
+ * buffer, 0 if the buffer is not shrinkable.
+ */
+static inline u32
+can_shrink(struct rchan *rchan, u32 cur_idx)
+{
+ u32 active_bufs = rchan->bufs_produced - rchan->bufs_consumed + 1;
+ u32 new_n_bufs = 0;
+ u32 cur_bufno_bytes = cur_idx % rchan->buf_size;
+
+ if (rchan->resize_min == 0 ||
+ rchan->resize_min >= rchan->n_bufs * rchan->buf_size)
+ goto out;
+
+ if (active_bufs > 1)
+ goto out;
+
+ if (cur_bufno_bytes != rchan->bytes_consumed)
+ goto out;
+
+ new_n_bufs = rchan->resize_min / rchan->buf_size;
+out:
+ return new_n_bufs;
+}
+
+/**
+ * shrink_check - timer function checking whether the channel can shrink
+ * @data: the channel, passed as an unsigned long
+ *
+ * Every SHRINK_TIMER_SECS, check whether the channel is shrinkable.
+ * If so, we attempt to atomically reset the channel to the beginning.
+ * The needs_resize callback is then called with RELAY_RESIZE_SHRINK.
+ * If the reset fails, it means we really shouldn't be shrinking now
+ * and need to wait until the next time around.
+ */
+static void
+shrink_check(unsigned long data)
+{
+ struct rchan *rchan = (struct rchan *)data;
+ u32 shrink_to_nbufs, cur_idx;
+
+ del_timer(&rchan->shrink_timer);
+ rchan->shrink_timer.expires = jiffies + SHRINK_TIMER_SECS * HZ;
+ add_timer(&rchan->shrink_timer);
+
+ if (rchan->resizing || rchan->replace_buffer)
+ return;
+
+ cur_idx = relay_get_offset(rchan, NULL);
+ shrink_to_nbufs = can_shrink(rchan, cur_idx);
+ if (shrink_to_nbufs != 0 && reset_index(rchan, cur_idx) == 0)
+ rchan->callbacks->needs_resize(rchan->id,
+ RELAY_RESIZE_SHRINK,
+ rchan->buf_size,
+ shrink_to_nbufs);
+}
+
+/**
+ * init_shrink_timer - start the timer used to check shrinkability.
+ * @rchan: the channel
+ */
+void
+init_shrink_timer(struct rchan *rchan)
+{
+ if (rchan->resize_min) {
+ init_timer(&rchan->shrink_timer);
+ rchan->shrink_timer.function = shrink_check;
+ rchan->shrink_timer.data = (unsigned long)rchan;
+ rchan->shrink_timer.expires = jiffies + SHRINK_TIMER_SECS * HZ;
+ add_timer(&rchan->shrink_timer);
+ }
+}
+
+
+/**
+ * alloc_new_pages - allocate new pages for expanding buffer
+ * @rchan: the channel
+ *
+ * Returns 0 on success, negative otherwise.
+ */
+static int
+alloc_new_pages(struct rchan *rchan)
+{
+ int new_pages_size, err;
+
+ if (unlikely(rchan->expand_page_array)) BUG();
+
+ new_pages_size = rchan->resize_alloc_size - rchan->alloc_size;
+ rchan->expand_page_array = alloc_page_array(new_pages_size,
+ &rchan->expand_page_count, &err);
+ if (rchan->expand_page_array == NULL) {
+ rchan->resize_err = -ENOMEM;
+ return -ENOMEM;
+ }
+
+ err = populate_page_array(rchan->expand_page_array,
+ rchan->expand_page_count);
+ if (err) {
+ rchan->resize_err = -ENOMEM;
+ free_page_array(rchan->expand_page_array);
+ rchan->expand_page_array = NULL;
+ }
+
+ return err;
+}
+
+/**
+ * clear_resize_offsets - helper function for buffer resizing
+ * @rchan: the channel
+ *
+ * Clear the saved offset changes.
+ */
+static inline void
+clear_resize_offsets(struct rchan *rchan)
+{
+ unsigned i;
+
+ rchan->offset_change_count = 0;
+
+ for (i = 0; i < MAX_RESIZE_OFFSETS; i++) {
+ rchan->resize_offsets[i].ge = 0UL;
+ rchan->resize_offsets[i].le = 0UL;
+ rchan->resize_offsets[i].delta = 0;
+ }
+}
+
+/**
+ * save_resize_offset - helper function for buffer resizing
+ * @rchan: the channel
+ * @ge: lower bound (inclusive) of the affected offset range
+ * @le: upper bound (inclusive) of the affected offset range
+ * @delta: the delta to apply to read positions within that range
+ *
+ * Save a resize offset.
+ */
+static inline void
+save_resize_offset(struct rchan *rchan, u32 ge, u32 le, int delta)
+{
+ unsigned slot = rchan->offset_change_count;
+
+ if (slot < MAX_RESIZE_OFFSETS) {
+ rchan->resize_offsets[slot].ge = ge;
+ rchan->resize_offsets[slot].le = le;
+ rchan->resize_offsets[slot].delta = delta;
+ rchan->offset_change_count++;
+ }
+}
+
+/**
+ * update_file_offset - apply offset change to reader
+ * @reader: the channel reader
+ * @change_idx: the offset index into the offsets array
+ *
+ * Returns non-zero if the offset was applied.
+ *
+ * Apply the offset delta saved in change_idx to the reader's
+ * current read position.
+ */
+static inline int
+update_file_offset(struct rchan_reader *reader, unsigned change_idx)
+{
+ int applied = 0;
+ struct rchan *rchan = reader->rchan;
+ u32 f_pos;
+ int delta = reader->rchan->resize_offsets[change_idx].delta;
+
+ if (reader->vfs_reader)
+ f_pos = (u32)reader->pos.file->f_pos;
+ else
+ f_pos = reader->pos.f_pos;
+
+ if ((f_pos >= rchan->resize_offsets[change_idx].ge) &&
+ (f_pos <= rchan->resize_offsets[change_idx].le)) {
+ if (reader->vfs_reader)
+ reader->pos.file->f_pos += delta;
+ else
+ reader->pos.f_pos += delta;
+ applied = 1;
+ }
+
+ return applied;
+}
+
+/**
+ * update_file_offsets - apply offset change to readers
+ * @rchan: the channel
+ *
+ * Apply the saved offset deltas to all files open on the channel.
+ */
+static inline void
+update_file_offsets(struct rchan *rchan)
+{
+ struct list_head *p;
+ struct rchan_reader *reader;
+ unsigned i;
+
+ read_lock(&rchan->open_readers_lock);
+ list_for_each(p, &rchan->open_readers) {
+ reader = list_entry(p, struct rchan_reader, list);
+ for (i = 0; i < rchan->offset_change_count; i++) {
+ if (update_file_offset(reader, i)) {
+ reader->offset_changed = 1;
+ break;
+ }
+ }
+ }
+ read_unlock(&rchan->open_readers_lock);
+}
+
+/**
+ * expand_rchan_buf - expand the channel buffer
+ * @rchan: the channel
+ * @newsize: the size of the new buffer
+ * @oldsize: the size of the old buffer
+ * @old_n_bufs: the number of sub-buffers in the old buffer
+ *
+ * Inserts the newly allocated pages into the old page array to create
+ * a larger channel buffer.  The array is split after the sub-buffer
+ * currently being written (and any free sub-buffers following it); the
+ * lower part keeps its place at the bottom of the new buffer, the new
+ * pages are inserted after it, and the remaining old pages follow.
+ */
+static void
+expand_rchan_buf(struct rchan *rchan, int newsize, int oldsize, u32 old_n_bufs)
+{
+ u32 cur_idx;
+ int cur_bufno, delta, i, j;
+ u32 ge, le;
+ int cur_pageno;
+ u32 free_bufs, free_pages;
+ u32 free_pages_in_cur_buf;
+ u32 free_bufs_to_end;
+ u32 cur_pages = rchan->alloc_size >> PAGE_SHIFT;
+ u32 pages_per_buf = cur_pages / rchan->n_bufs;
+ u32 bufs_ready = rchan->bufs_produced - rchan->bufs_consumed;
+
+ if (bufs_ready >= rchan->n_bufs) {
+ bufs_ready = rchan->n_bufs;
+ free_bufs = 0;
+ } else
+ free_bufs = rchan->n_bufs - bufs_ready - 1;
+
+ cur_idx = relay_get_offset(rchan, NULL);
+ cur_pageno = cur_idx / PAGE_SIZE;
+ cur_bufno = cur_idx / rchan->buf_size;
+
+ free_pages_in_cur_buf = (pages_per_buf - 1) - (cur_pageno % pages_per_buf);
+ free_pages = free_bufs * pages_per_buf + free_pages_in_cur_buf;
+ free_bufs_to_end = (rchan->n_bufs - 1) - cur_bufno;
+ if (free_bufs >= free_bufs_to_end)
+ free_pages = free_bufs_to_end * pages_per_buf + free_pages_in_cur_buf;
+
+ if (!rchan->resize_page_array || !rchan->expand_page_array ||
+ !rchan->buf_page_array)
+ return;
+
+ for (i = 0, j = 0; i <= cur_pageno + free_pages; i++, j++)
+ rchan->resize_page_array[j] = rchan->buf_page_array[i];
+ for (i = 0; i < rchan->expand_page_count; i++, j++)
+ rchan->resize_page_array[j] = rchan->expand_page_array[i];
+ for (i = cur_pageno + free_pages + 1; i < rchan->buf_page_count; i++, j++)
+ rchan->resize_page_array[j] = rchan->buf_page_array[i];
+
+ rchan->old_buf_page_array = rchan->buf_page_array;
+ rchan->buf_page_array = rchan->resize_page_array;
+ rchan->buf_page_count = rchan->resize_page_count;
+
+ rchan->resize_page_array = NULL;
+ rchan->resize_page_count = 0;
+
+ delta = newsize - oldsize;
+ ge = (cur_bufno + 1) * rchan->buf_size;
+ le = oldsize;
+ save_resize_offset(rchan, ge, le, delta);
+}
+
+/**
+ * shrink_rchan_buf - shrink the channel buffer
+ * @rchan: the channel
+ *
+ * Removes pages from the old buffer to create a smaller
+ * new channel buffer.
+ */
+static void
+shrink_rchan_buf(struct rchan *rchan)
+{
+ int i;
+ int copy_end_page;
+
+ if (!rchan->resize_page_array || !rchan->shrink_page_array ||
+ !rchan->buf_page_array)
+ return;
+
+ copy_end_page = rchan->resize_alloc_size / PAGE_SIZE;
+
+ for (i = 0; i <= copy_end_page; i++)
+ rchan->resize_page_array[i] = rchan->buf_page_array[i];
+
+ rchan->old_buf_page_array = rchan->buf_page_array;
+ rchan->buf_page_array = rchan->resize_page_array;
+ rchan->buf_page_count = rchan->resize_page_count;
+
+ rchan->resize_page_array = NULL;
+ rchan->resize_page_count = 0;
+}
+
+/**
+ * relaybuf_alloc - allocate a new resized channel buffer
+ * @private: pointer to the channel struct
+ *
+ * Internal - manages the allocation and remapping of new channel
+ * buffers.
+ */
+static void
+relaybuf_alloc(void *private)
+{
+ struct rchan *rchan = (struct rchan *)private;
+ int i, j, err;
+ u32 old_cur_idx;
+ int free_size;
+ int free_start_page, free_end_page;
+ u32 newsize, oldsize;
+
+ if (rchan->resize_alloc_size > rchan->alloc_size) {
+ err = alloc_new_pages(rchan);
+ if (err) goto cleanup;
+ } else {
+ free_size = rchan->alloc_size - rchan->resize_alloc_size;
+ BUG_ON(free_size <= 0);
+ rchan->shrink_page_array = alloc_page_array(free_size,
+ &rchan->shrink_page_count, &err);
+ if (rchan->shrink_page_array == NULL)
+ goto cleanup;
+ free_start_page = rchan->resize_alloc_size / PAGE_SIZE;
+ free_end_page = rchan->alloc_size / PAGE_SIZE;
+ for (i = 0, j = free_start_page; j < free_end_page; i++, j++)
+ rchan->shrink_page_array[i] = rchan->buf_page_array[j];
+ }
+
+ rchan->resize_page_array = alloc_page_array(rchan->resize_alloc_size,
+ &rchan->resize_page_count, &err);
+ if (rchan->resize_page_array == NULL)
+ goto cleanup;
+
+ old_cur_idx = relay_get_offset(rchan, NULL);
+ clear_resize_offsets(rchan);
+ newsize = rchan->resize_alloc_size;
+ oldsize = rchan->alloc_size;
+ if (newsize > oldsize)
+ expand_rchan_buf(rchan, newsize, oldsize, rchan->n_bufs);
+ else
+ shrink_rchan_buf(rchan);
+
+ rchan->resize_buf = vmap(rchan->buf_page_array, rchan->buf_page_count,
+ GFP_KERNEL, PAGE_KERNEL);
+ if (rchan->resize_buf == NULL)
+ goto cleanup;
+
+ rchan->replace_buffer = 1;
+ rchan->resizing = 0;
+
+ rchan->callbacks->needs_resize(rchan->id, RELAY_RESIZE_REPLACE, 0, 0);
+ return;
+
+cleanup:
+ if (rchan->expand_page_array) {
+ depopulate_page_array(rchan->expand_page_array,
+ rchan->expand_page_count);
+ free_page_array(rchan->expand_page_array);
+ rchan->expand_page_array = NULL;
+ rchan->expand_page_count = 0;
+ } else if (rchan->shrink_page_array) {
+ free_page_array(rchan->shrink_page_array);
+ rchan->shrink_page_array = NULL;
+ rchan->shrink_page_count = 0;
+ }
+
+ if (rchan->resize_page_array) {
+ free_page_array(rchan->resize_page_array);
+ rchan->resize_page_array = NULL;
+ rchan->resize_page_count = 0;
+ }
+
+ rchan->resize_err = -ENOMEM;
+ return;
+}
+
+/**
+ * relaybuf_free - free a resized channel buffer
+ * @private: pointer to the free_rchan_buf struct describing the old buffer
+ *
+ * Internal - manages the de-allocation and unmapping of old channel
+ * buffers.
+ */
+static void
+relaybuf_free(void *private)
+{
+ struct free_rchan_buf *free_buf = (struct free_rchan_buf *)private;
+
+ if (unlikely(!free_buf->old_buf))
+ BUG();
+
+ vunmap(free_buf->old_buf);
+
+ if (free_buf->old_buf_page_array)
+ free_page_array(free_buf->old_buf_page_array);
+ if (free_buf->expand_page_array)
+ free_page_array(free_buf->expand_page_array);
+ if (free_buf->shrink_page_array) {
+ depopulate_page_array(free_buf->shrink_page_array,
+ free_buf->shrink_page_count);
+ free_page_array(free_buf->shrink_page_array);
+ }
+
+ kfree(free_buf);
+}
+
+/**
+ * calc_order - determine the power-of-2 order of a resize
+ * @high: the larger size
+ * @low: the smaller size
+ *
+ * Returns the power-of-2 order of the resize, or 0 if either size is
+ * zero or high <= low.
+ */
+static inline int
+calc_order(u32 high, u32 low)
+{
+ int order = 0;
+
+ if (!high || !low || high <= low)
+ return 0;
+
+ while (high > low) {
+ order++;
+ high /= 2;
+ }
+
+ return order;
+}
+
+/**
+ * check_size - check the sanity of the requested channel size
+ * @rchan: the channel
+ * @nbufs: the new number of sub-buffers
+ * @err: return code
+ *
+ * Returns the non-zero total buffer size if ok, otherwise returns 0
+ * and sets *err.
+ */
+static inline u32
+check_size(struct rchan *rchan, u32 nbufs, int *err)
+{
+ u32 new_channel_size = 0;
+
+ *err = 0;
+
+ if (nbufs > rchan->n_bufs) {
+ rchan->resize_order = calc_order(nbufs, rchan->n_bufs);
+ if (!rchan->resize_order) {
+ *err = -EINVAL;
+ goto out;
+ }
+
+ new_channel_size = rchan->buf_size * nbufs;
+ if (new_channel_size > rchan->resize_max) {
+ *err = -EINVAL;
+ goto out;
+ }
+ } else if (nbufs < rchan->n_bufs) {
+ if (rchan->n_bufs < 2) {
+ *err = -EINVAL;
+ goto out;
+ }
+ rchan->resize_order = -calc_order(rchan->n_bufs, nbufs);
+ if (!rchan->resize_order) {
+ *err = -EINVAL;
+ goto out;
+ }
+
+ new_channel_size = rchan->buf_size * nbufs;
+ if (new_channel_size < rchan->resize_min) {
+ *err = -EINVAL;
+ goto out;
+ }
+ } else
+ *err = -EINVAL;
+out:
+ return new_channel_size;
+}
+
+/**
+ * __relay_realloc_buffer - allocate a new channel buffer
+ * @rchan: the channel
+ * @new_nbufs: the new number of sub-buffers
+ * @async: do the allocation using a work queue
+ *
+ * Internal - see relay_realloc_buffer() for details.
+ */
+static int
+__relay_realloc_buffer(struct rchan *rchan, u32 new_nbufs, int async)
+{
+ u32 new_channel_size;
+ int err = 0;
+
+ if (new_nbufs == rchan->n_bufs)
+ return -EINVAL;
+
+ if (down_trylock(&rchan->resize_sem))
+ return -EBUSY;
+
+ if (rchan->replace_buffer) {
+ err = -EBUSY;
+ goto out;
+ }
+
+ if (rchan->resizing) {
+ err = -EBUSY;
+ goto out;
+ } else
+ rchan->resizing = 1;
+
+ if (rchan->resize_failures > MAX_RESIZE_FAILURES) {
+ err = -ENOMEM;
+ goto out;
+ }
+
+ new_channel_size = check_size(rchan, new_nbufs, &err);
+ if (err)
+ goto out;
+
+ rchan->resize_n_bufs = new_nbufs;
+ rchan->resize_buf_size = rchan->buf_size;
+ rchan->resize_alloc_size = FIX_SIZE(new_channel_size);
+
+ if (async) {
+ INIT_WORK(&rchan->work, relaybuf_alloc, rchan);
+ schedule_delayed_work(&rchan->work, 1);
+ } else
+ relaybuf_alloc((void *)rchan);
+out:
+ up(&rchan->resize_sem);
+
+ return err;
+}
+
+/**
+ * relay_realloc_buffer - allocate a new channel buffer
+ * @rchan_id: the channel id
+ * @new_nbufs: the new number of sub-buffers
+ * @async: if non-zero, do the allocation in the background via a work queue
+ *
+ * Allocates a new channel buffer with the specified number of
+ * sub-buffers, keeping the current sub-buffer size.  If async is
+ * non-zero, the allocation is done in the background using a work
+ * queue.  When the allocation has completed,
+ * the needs_resize() callback is called with a resize_type of
+ * RELAY_RESIZE_REPLACE. This function doesn't replace the old buffer
+ * with the new - see relay_replace_buffer(). See
+ * Documentation/filesystems/relayfs.txt for more details.
+ *
+ * Returns 0 on success, or errcode if the channel is busy or if
+ * the allocation couldn't happen for some reason.
+ */
+int
+relay_realloc_buffer(int rchan_id, u32 new_nbufs, int async)
+{
+ int err;
+
+ struct rchan *rchan;
+
+ rchan = rchan_get(rchan_id);
+ if (rchan == NULL)
+ return -EBADF;
+
+ err = __relay_realloc_buffer(rchan, new_nbufs, async);
+
+ rchan_put(rchan);
+
+ return err;
+}
+
+
+
+/**
+ * do_replace_buffer - does the work of channel buffer replacement
+ * @rchan: the channel
+ * @newsize: new channel buffer size
+ * @oldsize: old channel buffer size
+ * @old_n_bufs: old channel sub-buffer count
+ *
+ * Does the work of switching buffers and fixing everything up
+ * so the channel can continue with a new size.
+ */
+static void
+do_replace_buffer(struct rchan *rchan,
+ int newsize,
+ int oldsize,
+ u32 old_n_bufs)
+{
+ int i;
+ u32 cur_idx;
+ u32 cur_bufno;
+ u32 newbufs;
+
+ cur_idx = relay_get_offset(rchan, NULL);
+ cur_bufno = cur_idx / rchan->buf_size;
+
+ rchan->buf = rchan->resize_buf;
+ rchan->alloc_size = rchan->resize_alloc_size;
+ rchan->n_bufs = rchan->resize_n_bufs;
+
+ if (newsize > oldsize) {
+ newbufs = (newsize - oldsize) / rchan->buf_size;
+ for (i = cur_bufno + 1; i < old_n_bufs; i++) {
+ if (using_lockless(rchan))
+ atomic_set(&fill_count(rchan, i + newbufs),
+ atomic_read(&fill_count(rchan, i)));
+ rchan->unused_bytes[i + newbufs] = rchan->unused_bytes[i];
+ }
+ for (i = cur_bufno + 1; i < cur_bufno + newbufs + 1; i++) {
+ if (using_lockless(rchan))
+ atomic_set(&fill_count(rchan, i),
+ (int)RELAY_BUF_SIZE(offset_bits(rchan)));
+ rchan->unused_bytes[i] = 0;
+ }
+ }
+
+ if (!using_lockless(rchan)) {
+ cur_write_pos(rchan) = rchan->buf + cur_idx;
+ write_buf(rchan) = rchan->buf + cur_bufno * rchan->buf_size;
+ write_buf_end(rchan) = write_buf(rchan) + rchan->buf_size;
+ write_limit(rchan) = write_buf_end(rchan) - rchan->end_reserve;
+ } else {
+ bufno_bits(rchan) += rchan->resize_order;
+ idx_mask(rchan) =
+ (1UL << (bufno_bits(rchan) + offset_bits(rchan))) - 1;
+ }
+
+ if (rchan->offset_change_count)
+ update_file_offsets(rchan);
+
+ rchan->resize_buf = NULL;
+ rchan->resize_buf_size = 0;
+ rchan->resize_alloc_size = 0;
+ rchan->resize_n_bufs = 0;
+ rchan->resize_err = 0;
+ rchan->resize_order = 0;
+
+ atomic_set(&rchan->suspended, 0);
+
+ rchan->callbacks->needs_resize(rchan->id,
+ RELAY_RESIZE_REPLACED,
+ rchan->buf_size,
+ rchan->n_bufs);
+}
+
+/**
+ * free_replaced_buffer - free a channel's old buffer
+ * @rchan: the channel
+ * @oldbuf: the old buffer
+ * @oldsize: old buffer size
+ *
+ * Frees a channel buffer via work queue.
+ */
+static int
+free_replaced_buffer(struct rchan *rchan, char *oldbuf, int oldsize)
+{
+ struct free_rchan_buf *free_buf;
+
+ free_buf = kmalloc(sizeof(struct free_rchan_buf), GFP_ATOMIC);
+ if (!free_buf) {
+ return -ENOMEM;
+ }
+ memset(free_buf, 0, sizeof(struct free_rchan_buf));
+
+ free_buf->old_buf = oldbuf;
+ free_buf->old_buf_page_array = rchan->old_buf_page_array;
+ rchan->old_buf_page_array = NULL;
+ free_buf->expand_page_array = rchan->expand_page_array;
+ free_buf->shrink_page_array = rchan->shrink_page_array;
+ free_buf->shrink_page_count = rchan->shrink_page_count;
+
+ rchan->expand_page_array = NULL;
+ rchan->expand_page_count = 0;
+ rchan->shrink_page_array = NULL;
+ rchan->shrink_page_count = 0;
+
+ INIT_WORK(&free_buf->work, relaybuf_free, free_buf);
+ schedule_delayed_work(&free_buf->work, 1);
+
+ return 0;
+}
+
+/**
+ * __relay_replace_buffer - replace channel buffer with new buffer
+ * @rchan: the channel
+ *
+ * Internal - see relay_replace_buffer() for details.
+ *
+ * Returns 0 if successful, negative otherwise.
+ */
+static int
+__relay_replace_buffer(struct rchan *rchan)
+{
+ int oldsize;
+ int err = 0;
+ char *oldbuf;
+
+ if (down_trylock(&rchan->resize_sem))
+ return -EBUSY;
+
+ if (!rchan->replace_buffer)
+ goto out;
+
+ if (rchan->resizing) {
+ err = -EBUSY;
+ goto out;
+ }
+
+ if (rchan->resize_buf == NULL) {
+ err = -EINVAL;
+ goto out;
+ }
+
+ oldbuf = rchan->buf;
+ oldsize = rchan->alloc_size;
+
+ do_replace_buffer(rchan, rchan->resize_alloc_size,
+ oldsize, rchan->n_bufs);
+
+ err = free_replaced_buffer(rchan, oldbuf, oldsize);
+out:
+ rchan->replace_buffer = 0;
+ up(&rchan->resize_sem);
+
+ return err;
+}
+
+/**
+ * relay_replace_buffer - replace channel buffer with new buffer
+ * @rchan_id: the channel id
+ *
+ * Replaces the current channel buffer with the new buffer allocated
+ * by relay_realloc_buffer and contained in the channel struct. When the
+ * replacement is complete, the needs_resize() callback is called with
+ * RELAY_RESIZE_REPLACED.
+ *
+ * Returns 0 on success, or errcode if the channel is busy or if
+ * the replacement or previous allocation didn't happen for some reason.
+ */
+int
+relay_replace_buffer(int rchan_id)
+{
+ int err;
+
+ struct rchan *rchan;
+
+ rchan = rchan_get(rchan_id);
+ if (rchan == NULL)
+ return -EBADF;
+
+ err = __relay_replace_buffer(rchan);
+
+ rchan_put(rchan);
+
+ return err;
+}
+
+EXPORT_SYMBOL(relay_realloc_buffer);
+EXPORT_SYMBOL(relay_replace_buffer);
+
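Note that the buffer expansion above never copies event data:
expand_rchan_buf() only rebuilds the page-pointer array, keeping the
pages below the split point in place, splicing the freshly allocated
pages in after them, and appending the rest, after which the whole
array is re-vmap()ed.  A small user-space sketch of that splice, with
plain ints standing in for struct page pointers (illustrative only):

/* Sketch only: splice "new pages" into a page-pointer array after the
 * insertion point, as expand_rchan_buf() does; ints stand in for pages. */
#include <stdio.h>
#include <stdlib.h>

static int *splice_pages(const int *old, int old_count,
                         const int *extra, int extra_count,
                         int insert_after)      /* last old index kept in place */
{
        int *out = malloc((old_count + extra_count) * sizeof(*out));
        int i, j = 0;

        if (!out)
                return NULL;
        for (i = 0; i <= insert_after; i++)             /* pages already in use */
                out[j++] = old[i];
        for (i = 0; i < extra_count; i++)               /* freshly allocated pages */
                out[j++] = extra[i];
        for (i = insert_after + 1; i < old_count; i++)  /* rest of the old buffer */
                out[j++] = old[i];
        return out;
}

int main(void)
{
        int old[] = { 0, 1, 2, 3, 4, 5, 6, 7 };         /* old page array */
        int extra[] = { 100, 101, 102, 103 };           /* new pages */
        int *merged = splice_pages(old, 8, extra, 4, 3);
        int i;

        if (!merged)
                return 1;
        for (i = 0; i < 12; i++)
                printf("%d ", merged[i]);
        printf("\n");   /* 0 1 2 3 100 101 102 103 4 5 6 7 */
        free(merged);
        return 0;
}
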
diff -urpN -X dontdiff linux-2.6.0-test11/fs/relayfs/resize.h linux-2.6.0-test11.cur/fs/relayfs/resize.h
--- linux-2.6.0-test11/fs/relayfs/resize.h Wed Dec 31 18:00:00 1969
+++ linux-2.6.0-test11.cur/fs/relayfs/resize.h Mon Dec 1 12:10:41 2003
@@ -0,0 +1,49 @@
+#ifndef _RELAY_RESIZE_H
+#define _RELAY_RESIZE_H
+
+/*
+ * If the channel usage has been below the low water mark for more than
+ * this amount of time, we can shrink the buffer if necessary.
+ */
+#define SHRINK_TIMER_SECS 60
+
+/* This inspired by rtai/shmem */
+#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)
+
+/* Don't attempt resizing again after this many failures */
+#define MAX_RESIZE_FAILURES 1
+
+/* Trigger resizing if a resizable channel is this full */
+#define RESIZE_THRESHOLD 3 / 4
+
+/*
+ * Used for deferring resized channel free
+ */
+struct free_rchan_buf
+{
+ char *old_buf;
+ struct page **old_buf_page_array;
+ struct page **expand_page_array;
+ struct page **shrink_page_array;
+ int shrink_page_count;
+
+ struct work_struct work; /* resize de-allocation work struct */
+};
+
+extern void *
+alloc_rchan_buf(unsigned long size,
+ struct page ***page_array,
+ int *page_count);
+
+extern void
+free_rchan_buf(void *buf,
+ struct page **page_array,
+ int page_count);
+
+extern void
+expand_check(struct rchan *rchan);
+
+extern void
+init_shrink_timer(struct rchan *rchan);
+
+#endif /* _RELAY_RESIZE_H */
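
For reference, FIX_SIZE() above rounds a byte count up to a whole
number of pages, and calc_order() in resize.c turns a grow/shrink into
a power-of-two factor.  A quick user-space check of the arithmetic,
assuming a 4096-byte page (the real PAGE_SIZE/PAGE_MASK come from the
architecture headers):

/* Sketch only: PAGE_SIZE/PAGE_MASK hard-coded to 4K for illustration. */
#include <stdio.h>

#define PAGE_SIZE 4096UL
#define PAGE_MASK (~(PAGE_SIZE - 1))
#define FIX_SIZE(x) ((((x) - 1) & PAGE_MASK) + PAGE_SIZE)

/* Same halving loop as calc_order() in resize.c. */
static int calc_order(unsigned int high, unsigned int low)
{
        int order = 0;

        if (!high || !low || high <= low)
                return 0;
        while (high > low) {
                order++;
                high /= 2;
        }
        return order;
}

int main(void)
{
        printf("FIX_SIZE(1)     = %lu\n", FIX_SIZE(1));         /* 4096 */
        printf("FIX_SIZE(4096)  = %lu\n", FIX_SIZE(4096));      /* 4096 */
        printf("FIX_SIZE(4097)  = %lu\n", FIX_SIZE(4097));      /* 8192 */
        printf("calc_order(8,2) = %d\n", calc_order(8, 2));     /* 2, i.e. 4x */
        return 0;
}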

--
Regards,

Tom Zanussi <zanussi@xxxxxxxxxx>
IBM Linux Technology Center/RAS
