Re: Shared memory not SMP safe to user-mode code

Petr Vandrovec Ing. VTEI (VANDROVE@vc.cvut.cz)
Wed, 1 Dec 1999 11:49:28 MET-1


On 30 Nov 99 at 20:38, Richard B. Johnson wrote:
> Compilation with -S -O2 show that no bad optimization is done.
> In fact, the code works perfectly once the flush() macro was added.
> If you core-dump and have the flush() macro, the problem is the missing
> "$" in:
> "\tpushl $1f\n"
> |__ needs this.
OK, I do not know which CPUs you have, but your code really proves
nothing. Sooner or later it ends up with spin != 0 and both tasks
spinning in

	while (pars->spin) key = rdtsc();

The code generated when 'spin' is not volatile is:
.L235:
        movl 20(%eax),%eax
        testl %eax,%eax
        je .L239
        movl %eax,%ecx
        .p2align 4,,7
.L240:
#APP
        rdtsc
#NO_APP
        movl %eax,-4(%ebp)
        testl %ecx,%ecx
        jne .L240
.L239:
and I do not see any reload of 'pars->spin' inside the loop. It checks
only the ecx register - and the ecx register will not change, for sure...
So only one (the first) task can pass through this (with some luck, both
tasks will find zero here on the first loop...).
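To show the same thing in C (only a minimal sketch with a made-up
'struct shared', not code from the program below): without 'volatile'
the compiler is allowed to keep the loaded value in a register, with
'volatile' it has to reload it on every pass.

struct shared { unsigned int spin; };

void wait_plain(struct shared *s)
{
    while (s->spin)     /* load may be hoisted out of the loop...        */
        ;               /* ...so this can spin on a stale register copy  */
}

void wait_volatile(volatile struct shared *s)
{
    while (s->spin)     /* 'volatile' forces a fresh load on every pass  */
        ;
}
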
After changing 'spin' to volatile:
.L229:
        movl 20(%edx),%eax      # pars->spin is fetched here (1)
        testl %eax,%eax
        je .L233
        movl %edx,%ecx
        .p2align 4,,7
.L234:
#APP
        rdtsc
#NO_APP
        movl %eax,-4(%ebp)
        movl 20(%ecx),%eax      # pars->spin is fetched here (2)
        testl %eax,%eax
        jne .L234
        ## ok, loop is correct
.L233:
        movl -8(%ebp),%ecx      # load pars
        movl 20(%ecx),%eax      # load pars->spin (3)
        movl -4(%ebp),%edx      # load key
        addl %edx,%eax          # add pars->spin + key
        movl %eax,20(%ecx)      # store it (4)
        movl -8(%ebp),%ecx      # refetch pars (5)
        movl 20(%ecx),%edx      # reread value from pars->spin (6)
        movl -4(%ebp),%eax
        cmpl %eax,%edx          # check for != key
        je .L237
        ...
So you can see that if the other CPU modifies pars->spin between points
(1) and (3) (or (2) and (3)), both CPUs will enter your region. And what's
worse, if the other CPU modifies pars->spin between points (3) and (4),
that change is lost.
Also, operation (4) marks the cache line as exclusively owned by this CPU,
and the refetch of pars is certainly in the cache, so the reread at (6)
will almost always hit the value just written at (4) (it is possible that
this new value has not even been observed by the other CPU yet).
You should really use the SMP variant of atomic_add() and atomic_dec()
instead of a simple add, and some construct based on atomic_dec_and_test()
instead of spinning on pars->spin (or you can use a simple spinlock from
the kernel headers).
(BTW, after I removed 'volatile' from 'key', the generated code is much
better. Why did you mark 'key' as volatile? This variable is local and
each process has its own instance.)
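
Just to illustrate what these primitives boil down to on Intel (only a
rough sketch with made-up names my_atomic_add/my_lock/my_unlock, not the
exact code from the kernel headers): the read-modify-write has to be done
as one lock-prefixed bus transaction, and mutual exclusion needs an atomic
test-and-set, e.g.

/* Sketch of an SMP-safe add: the lock prefix makes the
 * read-modify-write a single atomic operation on the bus. */
static __inline__ void my_atomic_add(int i, volatile int *v)
{
    __asm__ __volatile__("lock; addl %1,%0"
        : "+m" (*v)
        : "ir" (i)
        : "memory");
}

/* Sketch of a test-and-set spinlock: xchg with a memory
 * operand is implicitly locked on Intel. */
static __inline__ void my_lock(volatile unsigned int *lock)
{
    unsigned int old;
    do {
        __asm__ __volatile__("xchgl %0,%1"
            : "=r" (old), "+m" (*lock)
            : "0" (1)
            : "memory");
    } while (old);      /* nonzero: somebody else holds the lock, retry */
}

static __inline__ void my_unlock(volatile unsigned int *lock)
{
    __asm__ __volatile__("" : : : "memory");    /* compiler barrier */
    *lock = 0;          /* a plain store releases the lock on Intel */
}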

In short, what about this? Compile it with -DSPIN to use spinlocks, or
without -DSPIN to use atomic_add/atomic_sub. It will work on Intel only.
If you remove the rdtsc/flush stuff, it will work on all architectures
which implement spin_lock/spin_unlock and/or
atomic_add/atomic_sub/atomic_read inline, without kernel function support.
Ignore the warnings during compilation; I'm printing the value of
atomic_t/spinlock_t directly to save some ifdefs. I also added a third
process which prints the spin value continuously, so you can see whether a
deadlock (a bug in the code) occurred (for example, spinning endlessly in
while (atomic_read())).
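For example (assuming you save it as shmtest.c - that file name is just an
illustration):

	gcc -O2 -DSPIN -o shmtest shmtest.c    # spinlock version
	gcc -O2 -o shmtest shmtest.c           # atomic_add/atomic_sub version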
Best regards,
Petr Vandrovec
vandrove@vc.cvut.cz


#define __SMP__
#define CONFIG_SMP

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <asm/spinlock.h>
#include <asm/atomic.h>

extern int iopl(int);

typedef struct {
    unsigned int pad0;
    unsigned int pad1;
    unsigned int pad2;
    unsigned int i;
    unsigned int j;
#ifdef SPIN
    spinlock_t spin;
#else
    atomic_t spin;
#endif
    unsigned int pad3;
    unsigned int pad4;
} PARS __attribute__((packed));

#define KEY 0xdeadface
#define PAGE_SIZE 0x1000
#define FIRST_FLAGS (IPC_EXCL|IPC_CREAT|SHM_R|SHM_W)
#define NEXT_FLAGS (IPC_EXCL|SHM_R|SHM_W)
#define PROT (PROT_READ|PROT_WRITE)
#define FLAGS (MAP_FIXED|MAP_SHARED)
#if !defined(MAP_FAILED)
#define MAP_FAILED (void *) -1
#endif

/*-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-*/
/*
 * This gets a unique number. Since it takes CPU cycles to call this
 * thing, it will be different for every caller.
 */
static __inline__ unsigned int unique()
{
    unsigned int ret;
    __asm__ __volatile__( "rdtsc\n"
        : "=a" (ret)        /* low 32 bits of the TSC in eax */
        : /* No inputs */
        : "edx" );
    return ret;
}
/*-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-*/
static __inline__ void flush()
{
    /* Serialize by reloading %cs via a far return. */
    __asm__ __volatile__("pushl %cs\n"
                         "\tpushl $1f\n"
                         "\tlret\n"
                         "\t1:\n"
                         );
}
#define ERRORS(s) \
{ \
    fprintf(stderr, "Error from line %d, file %s, call %s() (%s)\n", \
            __LINE__, __FILE__, s, strerror(errno)); \
    exit(EXIT_FAILURE); \
}

/*-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-*/
/*
 * This initializes one page of shared memory. If it doesn't exist,
 * it is created.
 */
void init_shmem(PARS **pars)
{
    int i;
    if(*pars == NULL)
    {
        if((i = shmget(KEY, PAGE_SIZE, FIRST_FLAGS)) < 0)
            if((i = shmget(KEY, PAGE_SIZE, NEXT_FLAGS)) < 0)
                ERRORS("shmget");
        if((*pars = (PARS *) shmat(i, NULL, 0)) == MAP_FAILED)
            ERRORS("shmat");
    }
}
/*-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-*/
/*
 * This detaches a shared memory segment.
 */
void quit_shmem(PARS **pars)
{
    if(shmdt((const void *) *pars) < 0)
        ERRORS("shmdt");
    *pars = NULL;
}
/*-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-*/

int main()
{
    PARS *pars = NULL;
    unsigned int key;
    init_shmem(&pars);
    iopl(3);
    memset(pars, 0x00, PAGE_SIZE);
    fprintf(stderr, "Spin = %u\n", pars->spin);
    fprintf(stderr, "J = %u\n", pars->j);
    fprintf(stderr, "I = %u\n", pars->i);

    switch(fork())
    {
    case 0:
        quit_shmem(&pars);
        init_shmem(&pars);
        if (fork()) {
            /* Third process: continuously displays the spin value. */
            while (1) {
                printf("%08X\r", pars->spin);
                fflush(stdout);
            }
        }
        fprintf(stderr, "CHILD\n");
        fprintf(stderr, "Spin = %u\n", pars->spin);
        fprintf(stderr, "J = %u\n", pars->j);
        fprintf(stderr, "I = %u\n", pars->i);
        key = unique();
        for(;;)
        {
#ifdef SPIN
            spin_lock(&pars->spin);
#else
            unsigned int readval;

            while (atomic_read(&pars->spin)) key = unique();
            atomic_add(key, &pars->spin);
            if ((readval = atomic_read(&pars->spin)) != key) {
                //flush();
                atomic_sub(key, &pars->spin);
                fprintf(stderr, " Child spinning with %u\n", readval);
                fprintf(stderr, " Key value was %u\n", key);
                fprintf(stderr, "Pad0 = %u\n", pars->pad0);
                fprintf(stderr, "Pad1 = %u\n", pars->pad1);
                fprintf(stderr, "Pad2 = %u\n", pars->pad2);
                fprintf(stderr, "Pad3 = %u\n", pars->pad3);
                usleep(rand() % 10000);
                continue;
            }
#endif
            if(pars->i < pars->j)
                fprintf(stderr, "i = %08x, j = %08x\n", pars->i, pars->j);
            else
                fprintf(stderr, "Good\n");
#ifdef SPIN
            spin_unlock(&pars->spin);
#else
            atomic_sub(key, &pars->spin);
#endif
        }
    case -1:
        ERRORS("fork");
    default:
        fprintf(stderr, "PARENT\n");
        fprintf(stderr, "Spin = %u\n", pars->spin);
        fprintf(stderr, "J = %u\n", pars->j);
        fprintf(stderr, "I = %u\n", pars->i);
        key = unique();
        for(;;)
        {
#ifdef SPIN
            spin_lock(&pars->spin);
#else
            unsigned int readval;

            while (atomic_read(&pars->spin)) key = unique();
            atomic_add(key, &pars->spin);
            if((readval = atomic_read(&pars->spin)) != key)
            {
                //flush();
                atomic_sub(key, &pars->spin);
                fprintf(stderr, "Parent spinning with %u\n", readval);
                fprintf(stderr, " Key value was %u\n", key);
                usleep(rand() % 10000);
                continue;
            }
#endif
            pars->i++;
            pars->j++;
#ifdef SPIN
            spin_unlock(&pars->spin);
#else
            atomic_sub(key, &pars->spin);
#endif
        }
    }
    return 0;
}
