Re: [DLM] Add support for tcp communications [38/70]

From: Andrew Morton
Date: Fri Dec 01 2006 - 02:05:22 EST


On Thu, 30 Nov 2006 12:19:08 +0000
Steven Whitehouse <swhiteho@xxxxxxxxxx> wrote:

> From fdda387f73947e6ae511ec601f5b3c6fbb582aac Mon Sep 17 00:00:00 2001
> From: Patrick Caulfield <pcaulfie@xxxxxxxxxx>
> Date: Thu, 2 Nov 2006 11:19:21 -0500
> Subject: [PATCH] [DLM] Add support for tcp communications
>
> The following patch adds a TCP based communications layer
> to the DLM which is compile time selectable. The existing SCTP
> layer gives the advantage of allowing multihoming, whereas
> the TCP layer has been heavily tested in previous versions of
> the DLM and is known to be robust and therefore can be used as
> a baseline for performance testing.
>

I don't think this code is ready.


> ...
>
> +static struct rw_semaphore nodeinfo_lock;

This can be initialised at compile-time.
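
ie (untested):

	static DECLARE_RWSEM(nodeinfo_lock);

and whatever runtime initialisation it currently gets can go away.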

> ...
>
> +#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
> +#define CBUF_EMPTY(cb) ((cb)->len == 0)
> +#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
> +#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
> +
> +#define CBUF_INIT(cb, size) \
> +do { \
> + (cb)->base = (cb)->len = 0; \
> + (cb)->mask = ((size)-1); \
> +} while(0)
> +
> +#define CBUF_EAT(cb, n) \
> +do { \
> + (cb)->len -= (n); \
> + (cb)->base += (n); \
> + (cb)->base &= (cb)->mask; \
> +} while(0)

Suggest that all of the above be converted to plain C: lower-case
static inline functions.
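
ie, something like this (untested; guessing at the struct cbuf layout
from the macros):

	struct cbuf {
		unsigned int base;
		unsigned int len;
		unsigned int mask;
	};

	static inline void cbuf_init(struct cbuf *cb, int size)
	{
		cb->base = cb->len = 0;
		cb->mask = size - 1;
	}

	static inline void cbuf_add(struct cbuf *cb, int n)
	{
		cb->len += n;
	}

	static inline int cbuf_data(struct cbuf *cb)
	{
		return (cb->base + cb->len) & cb->mask;
	}

	static inline void cbuf_eat(struct cbuf *cb, int n)
	{
		cb->len -= n;
		cb->base += n;
		cb->base &= cb->mask;
	}

	static inline int cbuf_empty(struct cbuf *cb)
	{
		return cb->len == 0;
	}

	static inline int cbuf_may_add(struct cbuf *cb, int n)
	{
		return cb->len + n < cb->mask + 1;
	}

That gives typechecking and readable callsites for free.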

> +
> +/* List of nodes which have writes pending */
> +static struct list_head write_nodes;

LIST_HEAD (maybe)

> +static spinlock_t write_nodes_lock;

DEFINE_SPINLOCK

> +
> +/* Maximum number of incoming messages to process before
> + * doing a schedule()
> + */
> +#define MAX_RX_MSG_COUNT 25
> +
> +/* Manage daemons */
> +static struct task_struct *recv_task;
> +static struct task_struct *send_task;
> +static wait_queue_head_t lowcomms_recv_wait;

DECLARE_WAIT_QUEUE_HEAD

> +static atomic_t accepting;

= ATOMIC_INIT(0)

Generally, initialising things at compile-time is better: smaller vmlinux,
certainty that the code got it right.
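
For the above, something like (untested):

	static LIST_HEAD(write_nodes);
	static DEFINE_SPINLOCK(write_nodes_lock);
	static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
	static atomic_t accepting = ATOMIC_INIT(0);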

> +static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
> +{
> + struct nodeinfo *ni;
> + int r;
> + int n;
> +
> + down_read(&nodeinfo_lock);

Given that this function can sleep, I wonder if `alloc' is useful.

I see lots of callers passing in a literal "0" for `alloc'. That's in fact
a secret (GFP_ATOMIC & ~__GFP_HIGH). I doubt if that's what you really
meant. Particularly as the code could at least have used __GFP_WAIT (aka
GFP_NOIO) which is much, much more reliable than "0". In fact "0" is the
least reliable mode possible.

IOW, this is all bollixed up.
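
If the callers really can block then they should say so, eg:

	ni = nodeid2nodeinfo(nodeid, GFP_NOIO);

or the `alloc' argument could go away altogether and this function
could use GFP_NOIO (or GFP_KERNEL) internally.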

> + ni = idr_find(&nodeinfo_idr, nodeid);
> + up_read(&nodeinfo_lock);
> +
> + if (!ni && alloc) {
> + down_write(&nodeinfo_lock);
> +
> + ni = idr_find(&nodeinfo_idr, nodeid);
> + if (ni)
> + goto out_up;
> +
> + r = idr_pre_get(&nodeinfo_idr, alloc);
> + if (!r)
> + goto out_up;
> +
> + ni = kmalloc(sizeof(struct nodeinfo), alloc);

kzalloc

> + if (!ni)
> + goto out_up;
> +
> + r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
> + if (r) {
> + kfree(ni);
> + ni = NULL;
> + goto out_up;
> + }
> + if (n != nodeid) {
> + idr_remove(&nodeinfo_idr, n);
> + kfree(ni);
> + ni = NULL;
> + goto out_up;
> + }
> + memset(ni, 0, sizeof(struct nodeinfo));

hence nuke
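
IOW (untested):

	ni = kzalloc(sizeof(struct nodeinfo), alloc);
	if (!ni)
		goto out_up;

which makes the memset(ni, 0, ...) above redundant.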

> + spin_lock_init(&ni->lock);
> + INIT_LIST_HEAD(&ni->writequeue);
> + spin_lock_init(&ni->writequeue_lock);
> + ni->nodeid = nodeid;
> +
> + if (nodeid > max_nodeid)
> + max_nodeid = nodeid;
> + out_up:
> + up_write(&nodeinfo_lock);
> + }
> +
> + return ni;
> +}
> +
> +/* Don't call this too often... */
> +static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
> +{
> + int i;
> + struct nodeinfo *ni;
> +
> + for (i=1; i<=max_nodeid; i++) {
> + ni = nodeid2nodeinfo(i, 0);

GFP_NOIO (at least) (lots of places)

> + if (ni && ni->assoc_id == assoc)
> + return ni;
> + }
> + return NULL;
> +}
> +
>
> ...
>
> +static void send_shutdown(sctp_assoc_t associd)
> +{
> + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
> + struct msghdr outmessage;
> + struct cmsghdr *cmsg;
> + struct sctp_sndrcvinfo *sinfo;
> + int ret;
> +
> + outmessage.msg_name = NULL;
> + outmessage.msg_namelen = 0;
> + outmessage.msg_control = outcmsg;
> + outmessage.msg_controllen = sizeof(outcmsg);
> + outmessage.msg_flags = MSG_EOR;
> +
> + cmsg = CMSG_FIRSTHDR(&outmessage);
> + cmsg->cmsg_level = IPPROTO_SCTP;
> + cmsg->cmsg_type = SCTP_SNDRCV;
> + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
> + outmessage.msg_controllen = cmsg->cmsg_len;
> + sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);

CMSG_DATA() returns void* - unneeded cast
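
ie, plain

	sinfo = CMSG_DATA(cmsg);

will do.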

> + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
> +
> + sinfo->sinfo_flags |= MSG_EOF;
> + sinfo->sinfo_assoc_id = associd;
> +
> + ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
> +
> + if (ret != 0)
> + log_print("send EOF to node failed: %d", ret);
> +}
>
> ...
>
> + ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
> + if (!ni)
> + return;
> +
> + /* Save the assoc ID */
> + spin_lock(&ni->lock);
> + ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
> + spin_unlock(&ni->lock);

A bare assignment inside a lock like this is quite unusual. It is often an
indication that something is wrong.

> +/* Data received from remote end */
> +static int receive_from_sock(void)
> +{
> + int ret = 0;
> + struct msghdr msg;
> + struct kvec iov[2];
> + unsigned len;
> + int r;
> + struct sctp_sndrcvinfo *sinfo;
> + struct cmsghdr *cmsg;
> + struct nodeinfo *ni;
> +
> + /* These two are marginally too big for stack allocation, but this
> + * function is (currently) only called by dlm_recvd so static should be
> + * OK.
> + */
> + static struct sockaddr_storage msgname;
> + static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

whoa. This is globally singly-threaded code??


> + if (sctp_con.sock == NULL)
> + goto out;
> +
> + if (sctp_con.rx_page == NULL) {
> + /*
> + * This doesn't need to be atomic, but I think it should
> + * improve performance if it is.
> + */
> + sctp_con.rx_page = alloc_page(GFP_ATOMIC);
> + if (sctp_con.rx_page == NULL)
> + goto out_resched;
> + CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE);
> + }
> +
> + memset(&incmsg, 0, sizeof(incmsg));
> + memset(&msgname, 0, sizeof(msgname));
> +
> + memset(incmsg, 0, sizeof(incmsg));

You just zeroed it twice.

> + msg.msg_name = &msgname;
> + msg.msg_namelen = sizeof(msgname);
> + msg.msg_flags = 0;
> + msg.msg_control = incmsg;
> + msg.msg_controllen = sizeof(incmsg);
> + msg.msg_iovlen = 1;
> +
> + /* I don't see why this circular buffer stuff is necessary for SCTP
> + * which is a packet-based protocol, but the whole thing breaks under
> + * load without it! The overhead is minimal (and is in the TCP lowcomms
> + * anyway, of course) so I'll leave it in until I can figure out what's
> + * really happening.
> + */
> +
> + /*
> + * iov[0] is the bit of the circular buffer between the current end
> + * point (cb.base + cb.len) and the end of the buffer.
> + */
> + iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb);
> + iov[0].iov_base = page_address(sctp_con.rx_page) +
> + CBUF_DATA(&sctp_con.cb);
> + iov[1].iov_len = 0;
> +
> + /*
> + * iov[1] is the bit of the circular buffer between the start of the
> + * buffer and the start of the currently used section (cb.base)
> + */
> + if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) {
> + iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb);
> + iov[1].iov_len = sctp_con.cb.base;
> + iov[1].iov_base = page_address(sctp_con.rx_page);
> + msg.msg_iovlen = 2;
> + }
> + len = iov[0].iov_len + iov[1].iov_len;
> +
> + r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
> + MSG_NOSIGNAL | MSG_DONTWAIT);
> + if (ret <= 0)
> + goto out_close;
> +
> + msg.msg_control = incmsg;
> + msg.msg_controllen = sizeof(incmsg);
> + cmsg = CMSG_FIRSTHDR(&msg);
> + sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
> +
> + if (msg.msg_flags & MSG_NOTIFICATION) {
> + process_sctp_notification(&msg, page_address(sctp_con.rx_page));
> + return 0;
> + }
> +
> + /* Is this a new association ? */
> + ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
> + if (ni) {
> + ni->assoc_id = sinfo->sinfo_assoc_id;
> + if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
> +
> + if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
> + spin_lock_bh(&write_nodes_lock);
> + list_add_tail(&ni->write_list, &write_nodes);
> + spin_unlock_bh(&write_nodes_lock);
> + }
> + wake_up_process(send_task);
> + }
> + }
> +
> + /* INIT sends a message with length of 1 - ignore it */
> + if (r == 1)
> + return 0;
> +
> + CBUF_ADD(&sctp_con.cb, ret);
> + ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
> + page_address(sctp_con.rx_page),
> + sctp_con.cb.base, sctp_con.cb.len,
> + PAGE_CACHE_SIZE);
> + if (ret < 0)
> + goto out_close;
> + CBUF_EAT(&sctp_con.cb, ret);
> +
> + out:

Labels go in column 0 or 1

> + ret = 0;
> + goto out_ret;
> +
> + out_resched:
> + lowcomms_data_ready(sctp_con.sock->sk, 0);
> + ret = 0;
> + schedule();

What's the random schedule() for? If !need_resched() it's just a waste of
cycles.

> + goto out_ret;
> +
> + out_close:
> + if (ret != -EAGAIN)
> + log_print("error reading from sctp socket: %d", ret);
> + out_ret:
> + return ret;
> +}
> +
>
> ...
>
> +void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
> +{
> + struct writequeue_entry *e;
> + int offset = 0;
> + int users = 0;
> + struct nodeinfo *ni;
> +
> + if (!atomic_read(&accepting))
> + return NULL;

hm, this looks racy. What happens if `accepting' goes to zero 50
nanoseconds later?

> + ni = nodeid2nodeinfo(nodeid, allocation);
> + if (!ni)
> + return NULL;
> +
> + spin_lock(&ni->writequeue_lock);
> + e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
> + if (((struct list_head *) e == &ni->writequeue) ||

That typecast looks fishy. Perhaps a container_of() or something would be
clearer.
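
Something like this would be clearer (untested):

	spin_lock(&ni->writequeue_lock);
	if (list_empty(&ni->writequeue)) {
		e = NULL;
	} else {
		e = list_entry(ni->writequeue.prev,
			       struct writequeue_entry, list);
		if (PAGE_CACHE_SIZE - e->end < len)
			e = NULL;
	}
	if (e) {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&ni->writequeue_lock);

ie: test list_empty() rather than comparing a casted entry pointer
with the list head.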

> + (PAGE_CACHE_SIZE - e->end < len)) {
> + e = NULL;
> + } else {
> + offset = e->end;
> + e->end += len;
> + users = e->users++;
> + }
> + spin_unlock(&ni->writequeue_lock);
> +
> + if (e) {
> + got_one:

whoa, how'd that label get all the way over there?

> + if (users == 0)
> + kmap(e->page);
> + *ppc = page_address(e->page) + offset;
> + return e;
> + }
> +
> + e = new_writequeue_entry(allocation);
> + if (e) {
> + spin_lock(&ni->writequeue_lock);
> + offset = e->end;
> + e->end += len;
> + e->ni = ni;
> + users = e->users++;
> + list_add_tail(&e->list, &ni->writequeue);
> + spin_unlock(&ni->writequeue_lock);
> + goto got_one;
> + }
> + return NULL;
> +}
> +
>
> ...
>
> +static void initiate_association(int nodeid)
> +{
> + struct sockaddr_storage rem_addr;
> + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

Another static buffer to worry about. Globally singly-threaded code?

> + struct msghdr outmessage;
> + struct cmsghdr *cmsg;
> + struct sctp_sndrcvinfo *sinfo;
> + int ret;
> + int addrlen;
> + char buf[1];
> + struct kvec iov[1];
> + struct nodeinfo *ni;
> +
> + log_print("Initiating association with node %d", nodeid);
> +
> + ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
> + if (!ni)
> + return;
> +
> + if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
> + log_print("no address for nodeid %d", nodeid);
> + return;

Am surprised that all these errors appear to be ignored.

> + }
> +
> + make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen);
> +
> + outmessage.msg_name = &rem_addr;
> + outmessage.msg_namelen = addrlen;
> + outmessage.msg_control = outcmsg;
> + outmessage.msg_controllen = sizeof(outcmsg);
> + outmessage.msg_flags = MSG_EOR;
> +
> + iov[0].iov_base = buf;
> + iov[0].iov_len = 1;
> +
> + /* Real INIT messages seem to cause trouble. Just send a 1 byte message
> + we can afford to lose */
> + cmsg = CMSG_FIRSTHDR(&outmessage);
> + cmsg->cmsg_level = IPPROTO_SCTP;
> + cmsg->cmsg_type = SCTP_SNDRCV;
> + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
> + sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
> + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
> + sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
> +
> + outmessage.msg_controllen = cmsg->cmsg_len;
> + ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
> + if (ret < 0) {
> + log_print("send INIT to node failed: %d", ret);
> + /* Try again later */
> + clear_bit(NI_INIT_PENDING, &ni->flags);
> + }
> +}
> +
> +/* Send a message */
> +static int send_to_sock(struct nodeinfo *ni)
> +{
> + int ret = 0;
> + struct writequeue_entry *e;
> + int len, offset;
> + struct msghdr outmsg;
> + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

Singly-threaded?

> + struct cmsghdr *cmsg;
> + struct sctp_sndrcvinfo *sinfo;
> + struct kvec iov;
> +
> + /* See if we need to init an association before we start
> + sending precious messages */

Please use tabs.

> + spin_lock(&ni->lock);
> + if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
> + spin_unlock(&ni->lock);
> + initiate_association(ni->nodeid);
> + return 0;
> + }
> + spin_unlock(&ni->lock);
> +
> + outmsg.msg_name = NULL; /* We use assoc_id */
> + outmsg.msg_namelen = 0;
> + outmsg.msg_control = outcmsg;
> + outmsg.msg_controllen = sizeof(outcmsg);
> + outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
> +
> + cmsg = CMSG_FIRSTHDR(&outmsg);
> + cmsg->cmsg_level = IPPROTO_SCTP;
> + cmsg->cmsg_type = SCTP_SNDRCV;
> + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
> + sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);

Unneeded cast

> + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
> + sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
> + sinfo->sinfo_assoc_id = ni->assoc_id;
> + outmsg.msg_controllen = cmsg->cmsg_len;
> +
> + spin_lock(&ni->writequeue_lock);
> + for (;;) {
> + if (list_empty(&ni->writequeue))
> + break;
> + e = list_entry(ni->writequeue.next, struct writequeue_entry,
> + list);
> + len = e->len;
> + offset = e->offset;
> + BUG_ON(len == 0 && e->users == 0);
> + spin_unlock(&ni->writequeue_lock);
> + kmap(e->page);

I think this kmap() gets leaked

> + ret = 0;
> + if (len) {
> + iov.iov_base = page_address(e->page)+offset;
> + iov.iov_len = len;
> +
> + ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
> + len);
> + if (ret == -EAGAIN) {
> + sctp_con.eagain_flag = 1;
> + goto out;
> + } else if (ret < 0)
> + goto send_error;
> + } else {
> + /* Don't starve people filling buffers */
> + schedule();
> + }
> +
> + spin_lock(&ni->writequeue_lock);
> + e->offset += ret;
> + e->len -= ret;
> +
> + if (e->len == 0 && e->users == 0) {
> + list_del(&e->list);
> + free_entry(e);
> + continue;
> + }
> + }
> + spin_unlock(&ni->writequeue_lock);
> + out:
> + return ret;
> +
> + send_error:
> + log_print("Error sending to node %d %d", ni->nodeid, ret);
> + spin_lock(&ni->lock);
> + if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
> + ni->assoc_id = 0;
> + spin_unlock(&ni->lock);
> + initiate_association(ni->nodeid);
> + } else
> + spin_unlock(&ni->lock);
> +
> + return ret;
> +}
>
> ...
>
> +static void dealloc_nodeinfo(void)
> +{
> + int i;
> +
> + for (i=1; i<=max_nodeid; i++) {
> + struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
> + if (ni) {
> + idr_remove(&nodeinfo_idr, i);

Didn't that need locking?

> + kfree(ni);
> + }
> + }
> +}
> +
>
> ..
>
> +static int write_list_empty(void)
> +{
> + int status;
> +
> + spin_lock_bh(&write_nodes_lock);
> + status = list_empty(&write_nodes);
> + spin_unlock_bh(&write_nodes_lock);
> +
> + return status;
> +}

This function's return value is meaningless. As soon as the lock gets
dropped, the return value can get out of sync with reality.

Looking at the caller, this _might_ happen to be OK, but it's a nasty and
dangerous thing. Really the locking should be moved into the caller.
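
Something like this in the caller, perhaps (untested, and guessing at
the caller's shape):

	spin_lock_bh(&write_nodes_lock);
	while (!list_empty(&write_nodes)) {
		ni = list_entry(write_nodes.next,
				struct nodeinfo, write_list);
		list_del(&ni->write_list);
		spin_unlock_bh(&write_nodes_lock);
		/* process `ni' without the lock held */
		spin_lock_bh(&write_nodes_lock);
	}
	spin_unlock_bh(&write_nodes_lock);

That way the emptiness test and the dequeue happen under the same
lock hold.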

> +static int dlm_recvd(void *data)
> +{
> + DECLARE_WAITQUEUE(wait, current);
> +
> + while (!kthread_should_stop()) {
> + int count = 0;
> +
> + set_current_state(TASK_INTERRUPTIBLE);
> + add_wait_queue(&lowcomms_recv_wait, &wait);
> + if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
> + schedule();
> + remove_wait_queue(&lowcomms_recv_wait, &wait);
> + set_current_state(TASK_RUNNING);
> +
> + if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
> + int ret;
> +
> + do {
> + ret = receive_from_sock();
> +
> + /* Don't starve out everyone else */
> + if (++count >= MAX_RX_MSG_COUNT) {
> + schedule();

cond_resched() would be more efficient. Potentially a lot more efficient.
(several places).
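
ie:

	if (++count >= MAX_RX_MSG_COUNT) {
		cond_resched();
		count = 0;
	}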


> + count = 0;
> + }
> + } while (!kthread_should_stop() && ret >=0);
> + }
> + schedule();
> + }
> +
> + return 0;
> +}
>
> ...
>
> +static int daemons_start(void)
> +{
> + struct task_struct *p;
> + int error;
> +
> + p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
> + error = IS_ERR(p);
> + if (error) {
> + log_print("can't start dlm_recvd %d", error);
> + return error;
> + }

whitespace broke

> + recv_task = p;
> +
> + p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
> + error = IS_ERR(p);
> + if (error) {
> + log_print("can't start dlm_sendd %d", error);
> + kthread_stop(recv_task);
> + return error;
> + }

ditto

> + send_task = p;
> +
> + return 0;
> +}
> +
>
> ...
>
> +#ifndef FALSE
> +#define FALSE 0
> +#define TRUE 1
> +#endif

No, we have a kernel-wide bool/true/false implementation. Please use that.
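
ie, something like (untested):

	close_connection(con, false);

with the flag arguments declared as `bool' and the FALSE/TRUE defines
deleted.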

>
> +#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
> +#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
> +#define CBUF_EMPTY(cb) ((cb)->len == 0)
> +#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
> +#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \
> + (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0)
> +#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)

This cbuf API seems to be part-implemented in two places.

Ditto previous comments about macro->inline conversion

> ...
> +static struct sockaddr_storage dlm_local_addr;
> +
> +/* Manage daemons */
> +static struct task_struct *recv_task;
> +static struct task_struct *send_task;
> +
> +static wait_queue_t lowcomms_send_waitq_head;
> +static wait_queue_head_t lowcomms_send_waitq;
> +static wait_queue_t lowcomms_recv_waitq_head;
> +static wait_queue_head_t lowcomms_recv_waitq;
> +
> +/* An array of pointers to connections, indexed by NODEID */
> +static struct connection **connections;
> +static struct semaphore connections_lock;
> +static kmem_cache_t *con_cache;
> +static int conn_array_size;
> +static atomic_t accepting;
> +
> +/* List of sockets that have reads pending */
> +static struct list_head read_sockets;
> +static spinlock_t read_sockets_lock;
> +
> +/* List of sockets which have writes pending */
> +static struct list_head write_sockets;
> +static spinlock_t write_sockets_lock;
> +
> +/* List of sockets which have connects pending */
> +static struct list_head state_sockets;
> +static spinlock_t state_sockets_lock;

See previous comments regarding static initialisation

> +static struct connection *nodeid2con(int nodeid, gfp_t allocation)
> +{
> + struct connection *con = NULL;
> +
> + down(&connections_lock);
> + if (nodeid >= conn_array_size) {
> + int new_size = nodeid + NODE_INCREMENT;
> + struct connection **new_conns;
> +
> + new_conns = kmalloc(sizeof(struct connection *) *
> + new_size, allocation);
> + if (!new_conns)
> + goto finish;
> +
> + memset(new_conns, 0, sizeof(struct connection *) * new_size);

kzalloc()

> + memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
> + conn_array_size = new_size;
> + kfree(connections);
> + connections = new_conns;
> +
> + }
> +
> + con = connections[nodeid];
> + if (con == NULL && allocation) {
> + con = kmem_cache_alloc(con_cache, allocation);
> + if (!con)
> + goto finish;
> +
> + memset(con, 0, sizeof(*con));

kmem_cache_zalloc()
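
ie:

	con = kmem_cache_zalloc(con_cache, allocation);

and the memset() goes away.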

> + con->nodeid = nodeid;
> + init_rwsem(&con->sock_sem);
> + INIT_LIST_HEAD(&con->writequeue);
> + spin_lock_init(&con->writequeue_lock);
> +
> + connections[nodeid] = con;
> + }
> +
> + finish:
> + up(&connections_lock);
> + return con;
> +}
>
> ...
>
> +/* Add the port number to an IP6 or 4 sockaddr and return the address
> + length */
> +static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
> + int *addr_len)
> +{
> + saddr->ss_family = dlm_local_addr.ss_family;
> + if (saddr->ss_family == AF_INET) {
> + struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
> + in4_addr->sin_port = cpu_to_be16(port);
> + *addr_len = sizeof(struct sockaddr_in);
> + }

whitespace broke

> + else {

} else {

> + struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
> + in6_addr->sin6_port = cpu_to_be16(port);
> + *addr_len = sizeof(struct sockaddr_in6);
> + }
> +}
> +
>
> ...
>
> +static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr)

Exceeds 80 cols (lots of places)

> +{
> + struct socket *sock = NULL;
> + mm_segment_t fs;

whitespace broke

> + int result = 0;
> + int one = 1;
> + int addr_len;
> +
> + if (dlm_local_addr.ss_family == AF_INET)
> + addr_len = sizeof(struct sockaddr_in);
> + else
> + addr_len = sizeof(struct sockaddr_in6);
> +
> + /* Create a socket to communicate with */
> + result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
> + if (result < 0) {
> + printk("dlm: Can't create listening comms socket\n");
> + goto create_out;
> + }
> +
> + fs = get_fs();
> + set_fs(get_ds());
> + result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
> + set_fs(fs);
> + if (result < 0) {
> + printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result);
> + }
> + sock->sk->sk_user_data = con;
> + con->rx_action = accept_from_sock;
> + con->sock = sock;
> +
> + /* Bind to our port */
> + make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
> + result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
> + if (result < 0) {
> + printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
> + sock_release(sock);
> + sock = NULL;
> + con->sock = NULL;
> + goto create_out;
> + }
> +
> + fs = get_fs();
> + set_fs(get_ds());
> +
> + result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one));
> + set_fs(fs);
> + if (result < 0) {
> + printk("dlm: Set keepalive failed: %d\n", result);
> + }
> +
> + result = sock->ops->listen(sock, 5);
> + if (result < 0) {
> + printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
> + sock_release(sock);
> + sock = NULL;
> + goto create_out;
> + }
> +
> + create_out:
> + return sock;
> +}
> +
> +
> +/* Listen on all interfaces */
> +static int listen_for_all(void)
> +{
> + struct socket *sock = NULL;
> + struct connection *con = nodeid2con(0, GFP_KERNEL);
> + int result = -EINVAL;
> +
> + /* We don't support multi-homed hosts */
> + memset(con, 0, sizeof(*con));

It would be more efficient to make nodeid2con() return a zeroed object.

> + init_rwsem(&con->sock_sem);
> + spin_lock_init(&con->writequeue_lock);
> + INIT_LIST_HEAD(&con->writequeue);
> + set_bit(CF_IS_OTHERCON, &con->flags);
> +
> + sock = create_listen_sock(con, &dlm_local_addr);
> + if (sock) {
> + add_sock(sock, con);
> + result = 0;
> + }
> + else {
> + result = -EADDRINUSE;
> + }
> +
> + return result;
> +}
> +
> +}
> +
>
> ...
>
> +void *dlm_lowcomms_get_buffer(int nodeid, int len,
> + gfp_t allocation, char **ppc)
> +{
> + struct connection *con;
> + struct writequeue_entry *e;
> + int offset = 0;
> + int users = 0;
> +
> + if (!atomic_read(&accepting))
> + return NULL;
> +
> + con = nodeid2con(nodeid, allocation);
> + if (!con)
> + return NULL;
> +
> + spin_lock(&con->writequeue_lock);
> + e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
> + if (((struct list_head *) e == &con->writequeue) ||

There's that cast again

> + (PAGE_CACHE_SIZE - e->end < len)) {
> + e = NULL;
> + } else {
> + offset = e->end;
> + e->end += len;
> + users = e->users++;
> + }
> + spin_unlock(&con->writequeue_lock);
> +
> + if (e) {
> + got_one:
> + if (users == 0)
> + kmap(e->page);

Please check that this kmap (and the others) don't get leaked.

> + *ppc = page_address(e->page) + offset;
> + return e;
> + }
> +
> + e = new_writequeue_entry(con, allocation);
> + if (e) {
> + spin_lock(&con->writequeue_lock);
> + offset = e->end;
> + e->end += len;
> + users = e->users++;
> + list_add_tail(&e->list, &con->writequeue);
> + spin_unlock(&con->writequeue_lock);
> + goto got_one;
> + }
> + return NULL;
> +}
> +
> +/* Send a message */
> +static int send_to_sock(struct connection *con)
> +{
> + int ret = 0;
> + ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
> + const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
> + struct writequeue_entry *e;
> + int len, offset;
> +
> + down_read(&con->sock_sem);
> + if (con->sock == NULL)
> + goto out_connect;
> +
> + sendpage = con->sock->ops->sendpage;
> +
> + spin_lock(&con->writequeue_lock);
> + for (;;) {
> + e = list_entry(con->writequeue.next, struct writequeue_entry,
> + list);
> + if ((struct list_head *) e == &con->writequeue)
> + break;
> +
> + len = e->len;
> + offset = e->offset;
> + BUG_ON(len == 0 && e->users == 0);
> + spin_unlock(&con->writequeue_lock);
> +
> + ret = 0;
> + if (len) {
> + ret = sendpage(con->sock, e->page, offset, len,
> + msg_flags);
> + if (ret == -EAGAIN || ret == 0)
> + goto out;
> + if (ret <= 0)
> + goto send_error;
> + }
> + else {
> + /* Don't starve people filling buffers */
> + schedule();

cond_resched()

> + }
> +
> + spin_lock(&con->writequeue_lock);
> + e->offset += ret;
> + e->len -= ret;
> +
> + if (e->len == 0 && e->users == 0) {
> + list_del(&e->list);
> + free_entry(e);
> + continue;
> + }
> + }
> + spin_unlock(&con->writequeue_lock);
> + out:
> + up_read(&con->sock_sem);
> + return ret;
> +
> + send_error:
> + up_read(&con->sock_sem);
> + close_connection(con, FALSE);
> + lowcomms_connect_sock(con);
> + return ret;
> +
> + out_connect:
> + up_read(&con->sock_sem);

More wayward labels there.

> + lowcomms_connect_sock(con);
> + return 0;
> +}
> +
>
> ...
>
> +/* Try to send any messages that are pending
> + */
> +static void process_output_queue(void)
> +{
> + struct list_head *list;
> + struct list_head *temp;
> + int ret;
> +
> + spin_lock_bh(&write_sockets_lock);
> + list_for_each_safe(list, temp, &write_sockets) {
> + struct connection *con =
> + list_entry(list, struct connection, write_list);
> + clear_bit(CF_WRITE_PENDING, &con->flags);
> + list_del(&con->write_list);
> +
> + spin_unlock_bh(&write_sockets_lock);
> +
> + ret = send_to_sock(con);
> + if (ret < 0) {
> + }

hmm. An empty error-handling block?

> + spin_lock_bh(&write_sockets_lock);
> + }
> + spin_unlock_bh(&write_sockets_lock);
> +}
> +
> +static void process_state_queue(void)
> +{
> + struct list_head *list;
> + struct list_head *temp;
> + int ret;
> +
> + spin_lock_bh(&state_sockets_lock);
> + list_for_each_safe(list, temp, &state_sockets) {
> + struct connection *con =
> + list_entry(list, struct connection, state_list);
> + list_del(&con->state_list);
> + clear_bit(CF_CONNECT_PENDING, &con->flags);
> + spin_unlock_bh(&state_sockets_lock);
> +
> + ret = connect_to_sock(con);
> + if (ret < 0) {
> + }

? Same empty error path again.

> + spin_lock_bh(&state_sockets_lock);
> + }
> + spin_unlock_bh(&state_sockets_lock);
> +}
> +
> +
> +/* Discard all entries on the write queues */
> +static void clean_writequeues(void)
> +{
> + int nodeid;
> +
> + for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
> + struct connection *con = nodeid2con(nodeid, 0);
> +
> + if (con)
> + clean_one_writequeue(con);
> + }
> +}
> +
> +static int read_list_empty(void)
> +{
> + int status;
> +
> + spin_lock_bh(&read_sockets_lock);
> + status = list_empty(&read_sockets);
> + spin_unlock_bh(&read_sockets_lock);
> +
> + return status;
> +}

Again, unsafe.

> +static int daemons_start(void)
> +{
> + struct task_struct *p;
> + int error;
> +
> + p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
> + error = IS_ERR(p);
> + if (error) {
> + log_print("can't start dlm_recvd %d", error);
> + return error;
> + }
> + recv_task = p;
> +
> + p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
> + error = IS_ERR(p);
> + if (error) {
> + log_print("can't start dlm_sendd %d", error);
> + kthread_stop(recv_task);
> + return error;

whitespace broke.

> + }
> + send_task = p;
> +
> + return 0;
> +}
> +
>
> ...
>
> +/* This is quite likely to sleep... */
> +int dlm_lowcomms_start(void)
> +{
> + int error = 0;
> +
> + error = -ENOTCONN;
> +
> + /*
> + * Temporarily initialise the waitq head so that lowcomms_send_message
> + * doesn't crash if it gets called before the thread is fully
> + * initialised
> + */
> + init_waitqueue_head(&lowcomms_send_waitq);
> +
> + error = -ENOMEM;
> + connections = kmalloc(sizeof(struct connection *) *
> + NODE_INCREMENT, GFP_KERNEL);
> + if (!connections)
> + goto out;
> +
> + memset(connections, 0,
> + sizeof(struct connection *) * NODE_INCREMENT);

more kzalloc()

> + conn_array_size = NODE_INCREMENT;
> +
> + if (dlm_our_addr(&dlm_local_addr, 0)) {
> + log_print("no local IP address has been set");
> + goto fail_free_conn;
> + }
> + if (!dlm_our_addr(&dlm_local_addr, 1)) {
> + log_print("This dlm comms module does not support multi-homed clustering");
> + goto fail_free_conn;
> + }
> +
> + con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
> + __alignof__(struct connection), 0, NULL, NULL);
> + if (!con_cache)
> + goto fail_free_conn;
> +
> +
> + /* Start listening */
> + error = listen_for_all();
> + if (error)
> + goto fail_unlisten;
> +
> + error = daemons_start();
> + if (error)
> + goto fail_unlisten;
> +
> + atomic_set(&accepting, 1);
> +
> + return 0;
> +
> + fail_unlisten:
> + close_connection(connections[0], 0);
> + kmem_cache_free(con_cache, connections[0]);
> + kmem_cache_destroy(con_cache);
> +
> + fail_free_conn:
> + kfree(connections);
> +
> + out:
> + return error;
> +}
> +
