[patch] new pipe code

From: Manfred Spraul (manfreds@colorfullife.com)
Date: Wed Mar 01 2000 - 16:18:23 EST


My new pipe code is finished:

* it scales better with multiple readers from the same pipe.

* it's slightly faster in the normal, uncontended case. (~1%)

It's tested on i386 SMP.

--
	Manfred

// $Header$ // Kernel Version: // VERSION = 2 // PATCHLEVEL = 3 // SUBLEVEL = 48 // EXTRAVERSION = --- 2.3/fs/pipe.c Sun Feb 27 08:57:11 2000 +++ build-2.3/fs/pipe.c Wed Mar 1 21:39:33 2000 @@ -29,10 +29,13 @@ * * Reads with count = 0 should always return 0. * -- Julian Bradfield 1999-06-07. + * + * new locking code + * (c) 2000 Manfred Spraul <manfreds@colorfullife.com> */ /* Drop the inode semaphore and wait for a pipe event, atomically */ -static void pipe_wait(struct inode * inode) +void pipe_wait(struct inode * inode) { DECLARE_WAITQUEUE(wait, current); current->state = TASK_INTERRUPTIBLE; @@ -43,99 +46,110 @@ current->state = TASK_RUNNING; } +static int wait_for_data(struct file* filp) +{ + struct pipe_inode_info* pipe = filp->f_dentry->d_inode->i_pipe; + + /* FIXME: it's possible that data is available.*/ + if (filp->f_flags & O_NONBLOCK) + return -EAGAIN; + current->state=TASK_INTERRUPTIBLE; + spin_unlock(&pipe->lock); + schedule(); + spin_lock(&pipe->lock); + if (signal_pending(current)) + return -ERESTARTSYS; + return 0; +} + static ssize_t pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; - ssize_t size, read, ret; + struct pipe_inode_info *pipe = inode->i_pipe; + struct pipe_readerdesc pr; + ssize_t read, ret; /* Seeks are not allowed on pipes. */ - ret = -ESPIPE; - read = 0; if (ppos != &filp->f_pos) - goto out_nolock; + return -ESPIPE; /* Always return 0 on null read. */ - ret = 0; if (count == 0) - goto out_nolock; + return 0; - /* Get the pipe semaphore */ - ret = -ERESTARTSYS; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - - if (PIPE_EMPTY(*inode)) { -do_more_read: - ret = 0; - if (!PIPE_WRITERS(*inode)) - goto out; - - ret = -EAGAIN; - if (filp->f_flags & O_NONBLOCK) - goto out; - - for (;;) { - PIPE_WAITING_READERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_READERS(*inode)--; - ret = -ERESTARTSYS; - if (signal_pending(current)) - goto out_nolock; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - ret = 0; - if (!PIPE_EMPTY(*inode)) - break; - if (!PIPE_WRITERS(*inode)) - goto out; + ret = 0; + read = 0; + pr.tsk = current; + spin_lock(&pipe->lock); + list_add_tail(&pr.list, &pipe->waiting_readers); + for(;;) { + if(pipe->waiting_readers.next == &pr.list) + break; + ret = wait_for_data(filp); + if(ret != 0) + goto out_unlink; + } + for(;;) { + int avail; + avail=PIPE_LEN(*inode); + if(avail != 0) { + int start; + if(avail>count) + avail=count; + /* transfer data*/ + start = pipe->start; + spin_unlock(&pipe->lock); + + if(start+avail<=PIPE_SIZE) { + ret = copy_to_user(buf, pipe->base+start, avail); + } else { + int p1 = PIPE_SIZE-start; + ret = copy_to_user(buf, pipe->base+start, p1); + ret |= copy_to_user(buf+p1, pipe->base, avail-p1); + } + + spin_lock(&pipe->lock); + if(ret!=0) { + read=0; + ret = -EFAULT; + goto out_unlink; + } + read += avail; + buf += avail; + count-=avail; + pipe->start = (start+avail)&(PIPE_SIZE-1); + PIPE_LEN(*inode) -= avail; + /* wake up polling threads*/ + if(waitqueue_active(&pipe->wait)) + wake_up(&pipe->wait); + if (list_empty(&pipe->waiting_writers)) { + /* Cache behaviour optimization */ + if(PIPE_LEN(*inode) == 0) + pipe->start = 0; + } else { + /* wake up writing threads*/ + wake_up_process(list_entry(pipe->waiting_writers.next,struct pipe_writerdesc,list)->tsk); + } } + if(count<=0) + break; + /* wait for more data*/ + if (!pipe->writers) + break; + ret = wait_for_data(filp); + if(ret != 0) + goto out_unlink; } - /* Read what data is available. */ - ret = -EFAULT; - while (count > 0 && (size = PIPE_LEN(*inode))) { - char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode); - ssize_t chars = PIPE_MAX_RCHUNK(*inode); - - if (chars > count) - chars = count; - if (chars > size) - chars = size; - - if (copy_to_user(buf, pipebuf, chars)) - goto out; - - read += chars; - PIPE_START(*inode) += chars; - PIPE_START(*inode) &= (PIPE_SIZE - 1); - PIPE_LEN(*inode) -= chars; - count -= chars; - buf += chars; - } - - /* Cache behaviour optimization */ - if (!PIPE_LEN(*inode)) - PIPE_START(*inode) = 0; - - if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) { - /* - * We know that we are going to sleep: signal - * writers synchronously that there is more - * room. - */ - wake_up_interruptible_sync(PIPE_WAIT(*inode)); - if (!PIPE_EMPTY(*inode)) - BUG(); - goto do_more_read; +out_unlink: + if ((pipe->waiting_readers.next == &pr.list)&& + (pipe->waiting_readers.prev != &pr.list)) { + wake_up_process(list_entry(pr.list.next,struct pipe_readerdesc,list)->tsk); } - /* Signal writers asynchronously that there is more room. */ - wake_up_interruptible(PIPE_WAIT(*inode)); + list_del(&pr.list); - ret = read; -out: - up(PIPE_SEM(*inode)); -out_nolock: + spin_unlock(&pipe->lock); if (read) ret = read; return ret; @@ -145,124 +159,104 @@ pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; - ssize_t free, written, ret; + struct pipe_inode_info *pipe = inode->i_pipe; + struct pipe_writerdesc pw; + ssize_t minfree, written, ret; /* Seeks are not allowed on pipes. */ - ret = -ESPIPE; - written = 0; if (ppos != &filp->f_pos) - goto out_nolock; + return -ESPIPE; /* Null write succeeds. */ - ret = 0; if (count == 0) - goto out_nolock; - - ret = -ERESTARTSYS; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - -do_more_write: - /* No readers yields SIGPIPE. */ - if (!PIPE_READERS(*inode)) - goto sigpipe; - - /* If count <= PIPE_BUF, we have to make it atomic. */ - free = (count <= PIPE_BUF ? count : 1); - - /* Wait, or check for, available space. */ - if (filp->f_flags & O_NONBLOCK) { - ret = -EAGAIN; - if (PIPE_FREE(*inode) < free) - goto out; - } else { - while (PIPE_FREE(*inode) < free) { - PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; - ret = -ERESTARTSYS; - if (signal_pending(current)) - goto out_nolock; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; + return 0; - if (!PIPE_READERS(*inode)) - goto sigpipe; - } + pw.tsk = current; + ret = 0; + written = 0; + minfree=1; + if (count <= PIPE_BUF) + minfree=count; + + spin_lock(&pipe->lock); + list_add_tail(&pw.list, &pipe->waiting_writers); + for(;;) { + if(pipe->waiting_writers.next == &pw.list) + break; + ret = wait_for_data(filp); + if(ret != 0) + goto out_unlink; } - - /* Copy into available space. */ - ret = -EFAULT; - while (count > 0) { - int space; - char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode); - ssize_t chars = PIPE_MAX_WCHUNK(*inode); - - if ((space = PIPE_FREE(*inode)) != 0) { - if (chars > count) - chars = count; - if (chars > space) - chars = space; - - if (copy_from_user(pipebuf, buf, chars)) - goto out; - - written += chars; - PIPE_LEN(*inode) += chars; - count -= chars; - buf += chars; - space = PIPE_FREE(*inode); - continue; + for(;;) { + int free; + if (pipe->readers==0) + goto sigpipe; + free = PIPE_FREE(*inode); + if(free>=minfree) { + int start; + if(free>count) + free=count; + /* transfer data*/ + start = PIPE_END(*inode); + spin_unlock(&pipe->lock); + + if(start+free<=PIPE_SIZE) { + ret = copy_from_user(pipe->base+start, buf, free); + } else { + int p1=PIPE_SIZE-start; + + ret = copy_from_user(pipe->base+start, buf, p1); + ret |= copy_from_user(pipe->base, buf+p1, free-p1); + } + + spin_lock(&pipe->lock); + if(ret!=0) { + written=0; + ret = -EFAULT; + goto out_unlink; + } + written += free; + buf += free; + count-=free; + PIPE_LEN(*inode) += free; + /* wake up polling threads*/ + if(waitqueue_active(&pipe->wait)) + wake_up(&pipe->wait); + /* wake up reading threads*/ + if(!list_empty(&pipe->waiting_readers)) + wake_up_process(list_entry(pipe->waiting_readers.next,struct pipe_readerdesc,list)->tsk); } - - ret = written; - if (filp->f_flags & O_NONBLOCK) + if(count<=0) break; - - do { - /* - * Synchronous wake-up: it knows that this process - * is going to give up this CPU, so it doesnt have - * to do idle reschedules. - */ - wake_up_interruptible_sync(PIPE_WAIT(*inode)); - PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; - if (signal_pending(current)) - goto out_nolock; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - if (!PIPE_READERS(*inode)) - goto sigpipe; - } while (!PIPE_FREE(*inode)); - ret = -EFAULT; - } - - if (count && PIPE_WAITING_READERS(*inode) && - !(filp->f_flags & O_NONBLOCK)) { - wake_up_interruptible_sync(PIPE_WAIT(*inode)); - goto do_more_write; + /* wait for more free space*/ + if (pipe->readers==0) + goto sigpipe; + ret = wait_for_data(filp); + if(ret != 0) + goto out_unlink; } - /* Signal readers asynchronously that there is more data. */ - wake_up_interruptible(PIPE_WAIT(*inode)); - - inode->i_ctime = inode->i_mtime = CURRENT_TIME; - mark_inode_dirty(inode); - -out: - up(PIPE_SEM(*inode)); -out_nolock: + + if (written) { + inode->i_ctime = inode->i_mtime = CURRENT_TIME; + mark_inode_dirty(inode); + } +out_unlink: + if ((pipe->waiting_writers.next == &pw.list) && + (pipe->waiting_writers.prev != &pw.list)) { + wake_up_process(list_entry(pw.list.next,struct pipe_writerdesc,list)->tsk); + } + list_del(&pw.list); + spin_unlock(&pipe->lock); if (written) ret = written; return ret; sigpipe: if (written) - goto out; - up(PIPE_SEM(*inode)); + goto out_unlink; + ret=-EPIPE; send_sig(SIGPIPE, current, 0); - return -EPIPE; + goto out_unlink; } static loff_t @@ -303,7 +297,7 @@ poll_wait(filp, PIPE_WAIT(*inode), wait); - /* Reading only -- no need for aquiring the semaphore. */ + spin_lock(PIPE_LOCK(*inode)); mask = POLLIN | POLLRDNORM; if (PIPE_EMPTY(*inode)) mask = POLLOUT | POLLWRNORM; @@ -311,6 +305,7 @@ mask |= POLLHUP; if (!PIPE_READERS(*inode)) mask |= POLLERR; + spin_unlock(PIPE_LOCK(*inode)); return mask; } @@ -328,12 +323,13 @@ poll_wait(filp, PIPE_WAIT(*inode), wait); - /* Reading only -- no need for aquiring the semaphore. */ + spin_lock(PIPE_LOCK(*inode)); mask = POLLIN | POLLRDNORM; if (PIPE_EMPTY(*inode)) mask = POLLOUT | POLLWRNORM; if (!PIPE_READERS(*inode)) mask |= POLLERR; + spin_unlock(PIPE_LOCK(*inode)); return mask; } @@ -369,7 +365,7 @@ poll_wait(filp, PIPE_WAIT(*inode), wait); - /* Reading only -- no need for aquiring the semaphore. */ + spin_lock(PIPE_LOCK(*inode)); if (!PIPE_EMPTY(*inode)) { filp->f_op = &read_fifo_fops; mask = POLLIN | POLLRDNORM; @@ -377,10 +373,41 @@ filp->f_op = &read_fifo_fops; mask = POLLOUT | POLLWRNORM; } + spin_unlock(PIPE_LOCK(*inode)); return mask; } +void cleanup_pipe_data(struct inode* inode) +{ + struct pipe_inode_info *info = inode->i_pipe; + inode->i_pipe = NULL; + free_page((unsigned long) info->base); + kfree(info); +} + +void pipe_no_readers(struct inode* inode) +{ + struct pipe_inode_info* pipe = inode->i_pipe; + wake_up_interruptible(&pipe->wait); + spin_lock(&pipe->lock); + if(!list_empty(&pipe->waiting_writers)) { + wake_up_process(list_entry(pipe->waiting_writers.next,struct pipe_writerdesc,list)->tsk); + } + spin_unlock(&pipe->lock); +} + +void pipe_no_writers(struct inode* inode) +{ + struct pipe_inode_info* pipe = inode->i_pipe; + wake_up_interruptible(&pipe->wait); + spin_lock(&pipe->lock); + if(!list_empty(&pipe->waiting_readers)) { + wake_up_process(list_entry(pipe->waiting_readers.next,struct pipe_readerdesc,list)->tsk); + } + spin_unlock(&pipe->lock); +} + static int pipe_release(struct inode *inode, int decr, int decw) { @@ -388,12 +415,15 @@ PIPE_READERS(*inode) -= decr; PIPE_WRITERS(*inode) -= decw; if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) { - struct pipe_inode_info *info = inode->i_pipe; - inode->i_pipe = NULL; - free_page((unsigned long) info->base); - kfree(info); + /* no readers and no writers means noone uses the pipe. + * FIXME: check if someone could poll the pipe. + */ + cleanup_pipe_data(inode); } else { - wake_up_interruptible(PIPE_WAIT(*inode)); + if(!PIPE_READERS(*inode)) + pipe_no_readers(inode); + if(!PIPE_WRITERS(*inode)) + pipe_no_writers(inode); } up(PIPE_SEM(*inode)); @@ -531,30 +561,43 @@ release: pipe_rdwr_release, }; -static struct inode * get_pipe_inode(void) +int init_pipe_data(struct inode* inode) { - struct inode *inode = get_empty_inode(); unsigned long page; - if (!inode) - goto fail_inode; - page = __get_free_page(GFP_USER); if (!page) - goto fail_iput; + return -ENOMEM; inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL); - if (!inode->i_pipe) - goto fail_page; - - inode->i_fop = &rdwr_pipe_fops; + if (!inode->i_pipe) { + free_page(page); + return -ENOMEM; + } - init_waitqueue_head(PIPE_WAIT(*inode)); + spin_lock_init(PIPE_LOCK(*inode)); + INIT_LIST_HEAD(PIPE_RLIST(*inode)); + INIT_LIST_HEAD(PIPE_WLIST(*inode)); PIPE_BASE(*inode) = (char *) page; PIPE_START(*inode) = PIPE_LEN(*inode) = 0; + PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0; + init_waitqueue_head(PIPE_WAIT(*inode)); + + return 0; +} + +static struct inode * get_pipe_inode(void) +{ + struct inode *inode = get_empty_inode(); + + if (!inode) + return NULL; + if(init_pipe_data(inode) < 0) + goto fail_iput; + PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1; - PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0; + inode->i_fop = &rdwr_pipe_fops; /* * Mark the inode dirty from the very beginning, * that way it will never be moved to the dirty @@ -567,13 +610,11 @@ inode->i_gid = current->fsgid; inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; inode->i_blksize = PAGE_SIZE; + return inode; -fail_page: - free_page(page); fail_iput: iput(inode); -fail_inode: return NULL; } @@ -634,9 +675,7 @@ close_f12_inode_i: put_unused_fd(i); close_f12_inode: - free_page((unsigned long) PIPE_BASE(*inode)); - kfree(inode->i_pipe); - inode->i_pipe = NULL; + cleanup_pipe_data(inode); iput(inode); close_f12: put_filp(f2); --- 2.3/fs/fifo.c Sun Feb 27 08:57:11 2000 +++ build-2.3/fs/fifo.c Wed Mar 1 20:34:17 2000 @@ -20,27 +20,10 @@ if (down_interruptible(PIPE_SEM(*inode))) goto err_nolock_nocleanup; - if (! inode->i_pipe) { - unsigned long page; - struct pipe_inode_info *info; - - info = kmalloc(sizeof(struct pipe_inode_info),GFP_KERNEL); - - ret = -ENOMEM; - if (!info) + if (!inode->i_pipe) { + ret = init_pipe_data(inode); + if(ret < 0) goto err_nocleanup; - page = __get_free_page(GFP_KERNEL); - if (!page) { - kfree(info); - goto err_nocleanup; - } - - inode->i_pipe = info; - - init_waitqueue_head(PIPE_WAIT(*inode)); - PIPE_BASE(*inode) = (char *) page; - PIPE_START(*inode) = PIPE_LEN(*inode) = 0; - PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0; } switch (filp->f_mode) { @@ -58,8 +41,7 @@ while (!PIPE_WRITERS(*inode)) { if (signal_pending(current)) goto err_rd; - up(PIPE_SEM(*inode)); - interruptible_sleep_on(PIPE_WAIT(*inode)); + pipe_wait(inode); /* Note that using down_interruptible here and similar places below is pointless, @@ -90,8 +72,8 @@ while (!PIPE_READERS(*inode)) { if (signal_pending(current)) goto err_wr; - up(PIPE_SEM(*inode)); - interruptible_sleep_on(PIPE_WAIT(*inode)); + pipe_wait(inode); + down(PIPE_SEM(*inode)); } break; @@ -122,22 +104,20 @@ err_rd: if (!--PIPE_READERS(*inode)) - wake_up_interruptible(PIPE_WAIT(*inode)); + pipe_no_readers(inode); ret = -ERESTARTSYS; goto err; err_wr: if (!--PIPE_WRITERS(*inode)) - wake_up_interruptible(PIPE_WAIT(*inode)); + pipe_no_writers(inode); ret = -ERESTARTSYS; goto err; err: if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) { - struct pipe_inode_info *info = inode->i_pipe; - inode->i_pipe = NULL; - free_page((unsigned long)info->base); - kfree(info); + /* FIXME: could someone poll this filp? */ + cleanup_pipe_data(inode); } err_nocleanup: --- 2.3/include/linux/pipe_fs_i.h Tue Dec 7 10:48:22 1999 +++ build-2.3/include/linux/pipe_fs_i.h Wed Mar 1 20:34:17 2000 @@ -1,16 +1,34 @@ #ifndef _LINUX_PIPE_FS_I_H #define _LINUX_PIPE_FS_I_H +struct pipe_writerdesc { + struct list_head list; + struct task_struct* tsk; +}; + +struct pipe_readerdesc { + struct list_head list; + struct task_struct* tsk; +}; + struct pipe_inode_info { - wait_queue_head_t wait; + spinlock_t lock; + struct list_head waiting_readers; + struct list_head waiting_writers; char *base; unsigned int start; unsigned int readers; unsigned int writers; - unsigned int waiting_readers; - unsigned int waiting_writers; + + wait_queue_head_t wait; }; +int init_pipe_data (struct inode* inode); +void cleanup_pipe_data (struct inode* inode); +void pipe_wait (struct inode* inode); +void pipe_no_writers (struct inode* inode); +void pipe_no_readers (struct inode* inode); + /* Differs from PIPE_BUF in that PIPE_SIZE is the length of the actual memory allocation, whereas PIPE_BUF makes atomicity guarantees. */ #define PIPE_SIZE PAGE_SIZE @@ -22,8 +40,9 @@ #define PIPE_LEN(inode) ((inode).i_size) #define PIPE_READERS(inode) ((inode).i_pipe->readers) #define PIPE_WRITERS(inode) ((inode).i_pipe->writers) -#define PIPE_WAITING_READERS(inode) ((inode).i_pipe->waiting_readers) -#define PIPE_WAITING_WRITERS(inode) ((inode).i_pipe->waiting_writers) +#define PIPE_LOCK(inode) (&(inode).i_pipe->lock) +#define PIPE_RLIST(inode) (&(inode).i_pipe->waiting_readers) +#define PIPE_WLIST(inode) (&(inode).i_pipe->waiting_writers) #define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0) #define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE) @@ -32,4 +51,15 @@ #define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode)) #define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode)) +/* synchronization: + * PIPE_SEM protects PIPE_READERS, PIPE_WRITERS and i_pipe. + * i_pipe is also changed before fd_install() and after the last + * frip(). + * PIPE_LOCK protects PIPE_RLIST, PIPE_WLIST, PIPE_START, PIPE_LEN. + * new readers, writers are added to the tail of their lists. + * the head of PIPE_{R,W}LIST can {read,write} data without owning + * PIPE_LOCK. + * if a thread decreases PIPE_{READERS,WRITERS} to 0, then it must + * call pipe_no_{readers,writes}. + */ #endif

- To unsubscribe from this list: send the line "unsubscribe linux-kernel" in the body of a message to majordomo@vger.rutgers.edu Please read the FAQ at http://www.tux.org/lkml/



This archive was generated by hypermail 2b29 : Tue Mar 07 2000 - 21:00:10 EST