Re: [PATCH 2.6.11-rc4-mm1] connector: Add a fork connector
From: Evgeniy Polyakov
Date: Thu Mar 03 2005 - 07:26:36 EST
On Thu, 2005-03-03 at 14:51 +0300, Evgeniy Polyakov wrote:
> Simple program to test fork() performance.
...
A slightly more advanced version also checks the return value for errors,
but an error never happened.
It could also take finer-grained measurements,
but IMHO the picture is clear for small systems.
> Creating 10k forks 100 times.
> Results on 2-way SMP(1+1HT) Xeon for one fork()+exit():
>
> 2.6.11-rc4-mm1 494 usec
Actually sometimes it drops to 480 usecs.
> 2.6.11-rc4-mm1-fork-connector-no_userspace 509 usec
> 2.6.11-rc4-mm1-fork-connector-userspace 520 usec
>
> 5% fork() degradation(connector with userspace vs. vanilla) with fork() connector.
> On my test system global fork lock does not cost anything
> (tested both with and without userspace listener), but it is only 2-way(pseudo).
connector.c used in experiments is attached.
If fork connector analysis shows that the global fork lock is a big
bottleneck,
then the seq counter can be replaced with a per-cpu counter, but then the inner
header should
include the cpu id to properly distinguish messages.
But that is entirely the fork connector's area, so I will not break things.
--
Evgeniy Polyakov
Crash is better than data corruption -- Arthur Grabowski
/*
* connector.c
*
* 2004 Copyright (c) Evgeniy Polyakov <johnpol@xxxxxxxxxxx>
* All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/list.h>
#include <linux/skbuff.h>
#include <linux/netlink.h>
#include <linux/moduleparam.h>
#include <linux/connector.h>
#include <net/sock.h>
MODULE_LICENSE("GPL");
MODULE_AUTHOR("Evgeniy Polyakov <johnpol@xxxxxxxxxxx>");
MODULE_DESCRIPTION("Generic userspace <-> kernelspace connector.");

/* Netlink unit handed to netlink_kernel_create() in cn_init(). */
static int unit = NETLINK_NFLOG;
/*
 * idx/val pair copied into the connector device id in cn_init(); the
 * "connector" control callback (cn_callback) is registered under that id.
 */
static u32 cn_idx = -1;
static u32 cn_val = -1;

/* All three values above can be overridden as module parameters. */
module_param(unit, int, 0);
module_param(cn_idx, uint, 0);
module_param(cn_val, uint, 0);

/*
 * List of notification requests added and removed by cn_callback() and
 * walked by cn_notify(); notify_lock is held around each of those accesses.
 */
static DEFINE_SPINLOCK(notify_lock);
static LIST_HEAD(notify_list);

/* The single connector device instance used by every function in this file. */
static struct cn_dev cdev;

/*
 * Non-static state related to the fork connector:
 *  - cn_already_initialized is set to 1 at the end of a successful cn_init();
 *  - cn_fork_enable is set to 1 by cn_fork_callback() once
 *    cn_already_initialized is set;
 *  - cb_fork_id is the id under which cn_init() registers cn_fork_callback().
 */
int cn_already_initialized = 0;
int cn_fork_enable = 0;
struct cb_id cb_fork_id = { CN_IDX_FORK, CN_VAL_FORK };
/*
* msg->seq and msg->ack are used to determine message genealogy.
* When someone sends message it puts there locally unique sequence
* and random acknowledge numbers.
* Sequence number may be copied into nlmsghdr->nlmsg_seq too.
*
* Sequence number is incremented with each message to be sent.
*
* If we expect reply to our message,
* then sequence number in received message MUST be the same as in original message,
* and acknowledge number MUST be the same + 1.
*
* If we receive a message and its sequence number is not equal to the one we are expecting,
* then it is a new message.
* If we receive a message and its sequence number is the same as the one we are expecting,
* but its acknowledge number is not equal to the acknowledge number in the original message + 1,
* then it is a new message.
*
*/
/*
 * Send a connector message through the connector netlink socket.
 *
 * @msg:      message header immediately followed by msg->len payload bytes.
 * @__groups: destination multicast groups.  When zero, the group is looked
 *            up from the registered callback whose id equals msg->id.
 *
 * The packet is built in a freshly allocated skb.  A clone of it is handed
 * to netlink_unicast() with pid 0 and the skb itself to netlink_broadcast().
 * Nothing is returned; failures are only reported through printk.
 */
void cn_netlink_send(struct cn_msg *msg, u32 __groups)
{
	struct cn_dev *dev = &cdev;
	struct cn_callback_entry *entry, *next;
	struct sk_buff *skb, *copy;
	struct nlmsghdr *nlh;
	struct cn_msg *payload;
	unsigned int size;
	u32 groups = __groups;

	if (groups == 0) {
		int matched = 0;

		/*
		 * The walk does not stop at the first hit, so the last
		 * matching entry is the one that supplies the group.
		 */
		spin_lock_bh(&dev->cbdev->queue_lock);
		list_for_each_entry_safe(entry, next, &dev->cbdev->queue_list, callback_entry) {
			if (!cn_cb_equal(&entry->cb->id, &msg->id))
				continue;
			matched = 1;
			groups = entry->group;
		}
		spin_unlock_bh(&dev->cbdev->queue_lock);

		if (!matched) {
			printk(KERN_ERR "Failed to find multicast netlink group for callback[0x%x.0x%x]. seq=%u\n",
			       msg->id.idx, msg->id.val, msg->seq);
			return;
		}
	}

	size = NLMSG_SPACE(sizeof(*msg) + msg->len);

	skb = alloc_skb(size, GFP_ATOMIC);
	if (skb == NULL) {
		printk(KERN_ERR "Failed to allocate new skb with size=%u.\n", size);
		return;
	}

	/*
	 * There is no explicit goto to nlmsg_failure in this function; the
	 * label is presumably the failure target used inside NLMSG_PUT.
	 */
	nlh = NLMSG_PUT(skb, 0, msg->seq, NLMSG_DONE, size - NLMSG_ALIGN(sizeof(*nlh)));

	payload = (struct cn_msg *)NLMSG_DATA(nlh);
	memcpy(payload, msg, sizeof(*payload) + msg->len);

#if 0
	printk("%s: len=%u, seq=%u, ack=%u, group=%u.\n",
	       __func__, msg->len, msg->seq, msg->ack, groups);
#endif

	NETLINK_CB(skb).dst_groups = groups;

	/* The unicast copy is best effort: a failed clone is silently skipped. */
	copy = skb_clone(skb, GFP_ATOMIC);
	if (copy != NULL)
		netlink_unicast(dev->nls, copy, 0, 0);

	netlink_broadcast(dev->nls, skb, 0, groups, GFP_ATOMIC);
	return;

nlmsg_failure:
	printk(KERN_ERR "Failed to send %u.%u\n", msg->seq, msg->ack);
	kfree_skb(skb);
	return;
}
/*
 * Find the registered callback whose id equals msg->id and schedule it.
 *
 * @msg:           received message; stored in the callback's priv field.
 * @destruct_data: destructor recorded in the queue entry together with @data.
 * @data:          argument recorded for @destruct_data.
 *
 * Only the first matching entry is used.  Its work item is queued on the
 * connector work queue while queue_lock is held.
 *
 * Returns 1 when a callback was found and queued, 0 otherwise.
 */
static int cn_call_callback(struct cn_msg *msg, void (*destruct_data) (void *), void *data)
{
	struct cn_dev *dev = &cdev;
	struct cn_callback_entry *entry, *next;
	int scheduled = 0;

	spin_lock_bh(&dev->cbdev->queue_lock);
	list_for_each_entry_safe(entry, next, &dev->cbdev->queue_list, callback_entry) {
		if (!cn_cb_equal(&entry->cb->id, &msg->id))
			continue;

		entry->cb->priv = msg;
		entry->ddata = data;
		entry->destruct_data = destruct_data;
		queue_work(dev->cbdev->cn_queue, &entry->work);

		scheduled = 1;
		break;
	}
	spin_unlock_bh(&dev->cbdev->queue_lock);

	return scheduled;
}
/*
 * Check that the connector message inside @nlh has a consistent length and
 * hand it to the matching callback.
 *
 * @skb: buffer that holds the message; passed to cn_call_callback() together
 *       with kfree_skb as its destructor.
 * @nlh: netlink header of the message inside @skb.
 *
 * Returns -EINVAL (after freeing @skb) when the length declared in the
 * connector header does not match nlh->nlmsg_len; otherwise returns the
 * result of cn_call_callback(): 1 if a callback was queued, 0 if none
 * matched.  In the 0 case @skb is NOT freed here.
 */
static int __cn_rx_skb(struct sk_buff *skb, struct nlmsghdr *nlh)
{
	struct cn_msg *msg = (struct cn_msg *)NLMSG_DATA(nlh);
	/*
	 * Computed once into an unsigned int: the raw expression contains a
	 * sizeof and therefore has type size_t, which does not match the %u
	 * used in the message below.
	 */
	unsigned int expected = NLMSG_SPACE(msg->len + sizeof(*msg));

	if (expected != nlh->nlmsg_len) {
		printk(KERN_ERR "skb does not have enough length: "
		       "requested msg->len=%u[%u], nlh->nlmsg_len=%u, skb->len=%u.\n",
		       msg->len, expected,
		       nlh->nlmsg_len, skb->len);
		kfree_skb(skb);
		return -EINVAL;
	}

#if 0
	/* Sender credentials are only needed for this debug output. */
	printk(KERN_INFO "pid=%u, uid=%u, seq=%u, group=%u.\n",
	       NETLINK_CREDS(skb)->pid, NETLINK_CREDS(skb)->uid,
	       nlh->nlmsg_seq, NETLINK_CB((skb)).groups);
#endif

	return cn_call_callback(msg, (void (*)(void *))kfree_skb, skb);
}
/*
 * Validate one skb taken from the connector socket and dispatch the first
 * netlink message it contains.
 *
 * @__skb: buffer dequeued by cn_input(); the caller's reference is always
 *         dropped before returning.
 *
 * An extra reference is taken with skb_get() for the message dispatch.  That
 * reference ends up in exactly one place:
 *  - __cn_rx_skb() returned < 0: it already freed the reference itself;
 *  - __cn_rx_skb() returned > 0: a callback was queued with kfree_skb
 *    recorded as destructor, so the reference presumably is released by the
 *    queued work (that code is not in this file);
 *  - anything else (short skb, bad header, no matching callback): it is
 *    released here.
 *
 * Only the first message of the skb is processed, because the message is
 * handed to the callback in place.  No netlink ACK is sent in any case.
 */
static void cn_rx_skb(struct sk_buff *__skb)
{
	struct nlmsghdr *nlh;
	int err;
	struct sk_buff *skb;

	skb = skb_get(__skb);
	if (!skb) {
		printk(KERN_ERR "Failed to reference an skb.\n");
		kfree_skb(__skb);
		return;
	}

#if 0
	printk(KERN_INFO
	       "skb: len=%u, data_len=%u, truesize=%u, proto=%u, cloned=%d, shared=%d.\n",
	       skb->len, skb->data_len, skb->truesize, skb->protocol,
	       skb_cloned(skb), skb_shared(skb));
#endif

	/* Too short to even hold a netlink header: nothing to dispatch. */
	if (skb->len < NLMSG_SPACE(0))
		goto drop;

	nlh = (struct nlmsghdr *)skb->data;

	/*
	 * The declared length must cover the netlink header plus a complete
	 * connector header, otherwise __cn_rx_skb() would read msg->len from
	 * beyond the declared message.
	 */
	if (nlh->nlmsg_len < NLMSG_LENGTH(sizeof(struct cn_msg)) ||
	    skb->len < nlh->nlmsg_len ||
	    nlh->nlmsg_len > CONNECTOR_MAX_MSG_SIZE) {
#if 0
		printk(KERN_INFO "nlmsg_len=%u, sizeof(*nlh)=%u\n",
		       nlh->nlmsg_len, sizeof(*nlh));
#endif
		goto drop;
	}

	err = __cn_rx_skb(skb, nlh);
	if (err)
		goto out;	/* reference already freed (< 0) or handed over (> 0) */

	/* err == 0: no callback matched, so nobody took over the reference. */
drop:
	kfree_skb(skb);
out:
	kfree_skb(__skb);
}
/*
 * Input handler installed on the connector netlink socket by cn_init().
 *
 * @sk:  socket whose receive queue is drained.
 * @len: not used.
 *
 * Every queued skb is dequeued and passed to cn_rx_skb().
 */
static void cn_input(struct sock *sk, int len)
{
	struct sk_buff *skb;

	for (;;) {
		skb = skb_dequeue(&sk->sk_receive_queue);
		if (skb == NULL)
			break;
		cn_rx_skb(skb);
	}
}
/*
 * Tell interested listeners that a callback id was added or removed.
 *
 * @id:           id of the callback that changed.
 * @notify_event: event code placed in the ack field of the notification
 *                (cn_add_callback() passes 0, cn_del_callback() passes 1).
 *
 * Each entry on notify_list carries a control message whose data area holds
 * idx_notify_num idx ranges followed by val_notify_num val ranges.  A
 * notification is sent to the entry's group when @id->idx falls into one of
 * the idx ranges AND @id->val falls into one of the val ranges.
 *
 * The whole walk, including the sends, runs under notify_lock.
 */
static void cn_notify(struct cb_id *id, u32 notify_event)
{
	struct cn_ctl_entry *ent;

	spin_lock_bh(&notify_lock);
	list_for_each_entry(ent, &notify_list, notify_entry) {
		struct cn_ctl_msg *ctl = ent->msg;
		struct cn_notify_req *req;
		int idx_match = 0, val_match = 0;
		int i;

		req = (struct cn_notify_req *)ctl->data;
		for (i=0; i<ctl->idx_notify_num; ++i, ++req) {
			if (id->idx >= req->first && id->idx < req->first + req->range) {
				idx_match = 1;
				break;
			}
		}
		if (!idx_match)
			continue;

		/*
		 * Restart from the first val range.  The idx loop above stops
		 * early on a match, so its cursor cannot be reused here.
		 */
		req = (struct cn_notify_req *)ctl->data + ctl->idx_notify_num;
		for (i=0; i<ctl->val_notify_num; ++i, ++req) {
			if (id->val >= req->first && id->val < req->first + req->range) {
				val_match = 1;
				break;
			}
		}
		if (!val_match)
			continue;

		{
			struct cn_msg m;

			printk(KERN_INFO "Notifying group %x with event %u about %x.%x.\n",
			       ctl->group, notify_event,
			       id->idx, id->val);

			/* Header-only message: every field not set below stays zero. */
			memset(&m, 0, sizeof(m));
			m.ack = notify_event;
			memcpy(&m.id, id, sizeof(m.id));

			cn_netlink_send(&m, ctl->group);
		}
	}
	spin_unlock_bh(&notify_lock);
}
int cn_add_callback(struct cb_id *id, char *name, void (*callback) (void *))
{
int err;
struct cn_dev *dev = &cdev;
struct cn_callback *cb;
cb = kmalloc(sizeof(*cb), GFP_KERNEL);
if (!cb) {
printk(KERN_INFO "%s: Failed to allocate new struct cn_callback.\n",
dev->cbdev->name);
return -ENOMEM;
}
memset(cb, 0, sizeof(*cb));
snprintf(cb->name, sizeof(cb->name), "%s", name);
memcpy(&cb->id, id, sizeof(cb->id));
cb->callback = callback;
atomic_set(&cb->refcnt, 0);
err = cn_queue_add_callback(dev->cbdev, cb);
if (err) {
kfree(cb);
return err;
}
cn_notify(id, 0);
return 0;
}
void cn_del_callback(struct cb_id *id)
{
struct cn_dev *dev = &cdev;
struct cn_callback_entry *n, *__cbq;
list_for_each_entry_safe(__cbq, n, &dev->cbdev->queue_list, callback_entry) {
if (cn_cb_equal(&__cbq->cb->id, id)) {
cn_queue_del_callback(dev->cbdev, __cbq->cb);
cn_notify(id, 1);
break;
}
}
}
/*
 * Compare two notification control messages.
 *
 * @m1: stored message.
 * @m2: message to compare against.
 *
 * Returns 0 when the idx/val range counts, the lengths, or any of the
 * range records differ, and 1 when they are all equal.
 *
 * Also returns 1 when @m1's length does not agree with its own range
 * counts: such an entry is reported as garbage, and the caller (cn_callback)
 * removes every entry for which this function returns non-zero.
 */
static int cn_ctl_msg_equals(struct cn_ctl_msg *m1, struct cn_ctl_msg *m2)
{
	struct cn_notify_req *lhs, *rhs;
	int n;

	if (m1->idx_notify_num != m2->idx_notify_num ||
	    m1->val_notify_num != m2->val_notify_num ||
	    m1->len != m2->len)
		return 0;

	if ((m1->idx_notify_num + m1->val_notify_num)*sizeof(*lhs) != m1->len) {
		printk(KERN_ERR "Notify entry[idx_num=%x, val_num=%x, len=%u] contains garbage. Removing.\n",
		       m1->idx_notify_num, m1->val_notify_num, m1->len);
		return 1;
	}

	lhs = (struct cn_notify_req *)m1->data;
	rhs = (struct cn_notify_req *)m2->data;

	/* idx ranges first ... */
	for (n = 0; n < m1->idx_notify_num; ++n, ++lhs, ++rhs) {
		if (memcmp(lhs, rhs, sizeof(*lhs)) != 0)
			return 0;
	}

	/* ... then the val ranges that follow them. */
	for (n = 0; n < m1->val_notify_num; ++n, ++lhs, ++rhs) {
		if (memcmp(lhs, rhs, sizeof(*lhs)) != 0)
			return 0;
	}

	return 1;
}
static void cn_callback(void * data)
{
struct cn_msg *msg = (struct cn_msg *)data;
struct cn_ctl_msg *ctl;
struct cn_ctl_entry *ent;
u32 size;
if (msg->len < sizeof(*ctl)) {
printk(KERN_ERR "Wrong connector request size %u, must be >= %u.\n",
msg->len, sizeof(*ctl));
return;
}
ctl = (struct cn_ctl_msg *)msg->data;
size = sizeof(*ctl) + (ctl->idx_notify_num + ctl->val_notify_num)*sizeof(struct cn_notify_req);
if (msg->len != size) {
printk(KERN_ERR "Wrong connector request size %u, must be == %u.\n",
msg->len, size);
return;
}
if (ctl->len + sizeof(*ctl) != msg->len) {
printk(KERN_ERR "Wrong message: msg->len=%u must be equal to inner_len=%u [+%u].\n",
msg->len, ctl->len, sizeof(*ctl));
return;
}
/*
* Remove notification.
*/
if (ctl->group == 0) {
struct cn_ctl_entry *n;
spin_lock_bh(¬ify_lock);
list_for_each_entry_safe(ent, n, ¬ify_list, notify_entry) {
if (cn_ctl_msg_equals(ent->msg, ctl)) {
list_del(&ent->notify_entry);
kfree(ent);
}
}
spin_unlock_bh(¬ify_lock);
return;
}
size += sizeof(*ent);
ent = kmalloc(size, GFP_ATOMIC);
if (!ent) {
printk(KERN_ERR "Failed to allocate %d bytes for new notify entry.\n", size);
return;
}
memset(ent, 0, size);
ent->msg = (struct cn_ctl_msg *)(ent + 1);
memcpy(ent->msg, ctl, size - sizeof(*ent));
spin_lock_bh(¬ify_lock);
list_add(&ent->notify_entry, ¬ify_list);
spin_unlock_bh(¬ify_lock);
{
int i;
struct cn_notify_req *req;
printk("Notify group %x for idx: ", ctl->group);
req = (struct cn_notify_req *)ctl->data;
for (i=0; i<ctl->idx_notify_num; ++i, ++req) {
printk("%u-%u ", req->first, req->first+req->range-1);
}
printk("\nNotify group %x for val: ", ctl->group);
for (i=0; i<ctl->val_notify_num; ++i, ++req) {
printk("%u-%u ", req->first, req->first+req->range-1);
}
printk("\n");
}
}
/*
 * Callback registered under cb_fork_id.
 *
 * @data: not used.
 *
 * Sets cn_fork_enable to 1, but only once cn_init() has completed
 * (cn_already_initialized is non-zero).
 */
static void cn_fork_callback(void *data)
{
	if (!cn_already_initialized)
		return;

	cn_fork_enable = 1;
}
static int cn_init(void)
{
struct cn_dev *dev = &cdev;
int err;
dev->input = cn_input;
dev->id.idx = cn_idx;
dev->id.val = cn_val;
dev->nls = netlink_kernel_create(unit, dev->input);
if (!dev->nls) {
printk(KERN_ERR "Failed to create new netlink socket(%u).\n",
unit);
return -EIO;
}
dev->cbdev = cn_queue_alloc_dev("cqueue", dev->nls);
if (!dev->cbdev) {
if (dev->nls->sk_socket)
sock_release(dev->nls->sk_socket);
return -EINVAL;
}
err = cn_add_callback(&dev->id, "connector", &cn_callback);
if (err) {
cn_queue_free_dev(dev->cbdev);
if (dev->nls->sk_socket)
sock_release(dev->nls->sk_socket);
return -EINVAL;
}
err = cn_add_callback(&cb_fork_id, "cn_fork", &cn_fork_callback);
if (err) {
cn_del_callback(&dev->id);
cn_queue_free_dev(dev->cbdev);
if (dev->nls->sk_socket)
sock_release(dev->nls->sk_socket);
return -EINVAL;
}
cn_already_initialized = 1;
return 0;
}
static void cn_fini(void)
{
struct cn_dev *dev = &cdev;
cn_del_callback(&dev->id);
cn_queue_free_dev(dev->cbdev);
if (dev->nls->sk_socket)
sock_release(dev->nls->sk_socket);
}
/* cn_init() and cn_fini() are the module's init and exit handlers. */
module_init(cn_init);
module_exit(cn_fini);

/* Functions made available to other GPL modules. */
EXPORT_SYMBOL_GPL(cn_add_callback);
EXPORT_SYMBOL_GPL(cn_del_callback);
EXPORT_SYMBOL_GPL(cn_netlink_send);
Attachment:
signature.asc
Description: This is a digitally signed message part