mardi 23 novembre 2010

Prototype d'un pool de connexion au niveau du client

/* Compilation: make -f demo_proc.mk EXE=test_rac_recette_con_cache OBJS=test_rac_recette_con_cache.o build PROCFLAGS="sqlcheck=full userid=xxx/xxx@xxx code=ansi_c define=V8" */

#include stdio.h
#include stdlib.h
#include string.h
#include errno.h
#include unistd.h

#include sys/time.h
#include sys/types.h
#include sys/wait.h
#include sys/ipc.h
#include sys/shm.h
#include sys/sem.h

#include sqlca.h
#include sqlcpr.h

#define UNAME_LEN 30
#define PWD_LEN 30
#define CONNECT_STRING_LEN 128
#define NB_CONNEXION 50

typedef int SEMAPHORE;

/* Contexte de connexion */
EXEC SQL BEGIN DECLARE SECTION;
typedef struct {
    sql_context sql_cntxt[NB_CONNEXION];
    short h_dspnb[NB_CONNEXION];
} sCntxtConnexion;

sql_context sql_cntxt;

VARCHAR username[UNAME_LEN];
VARCHAR password[PWD_LEN];
VARCHAR connect_string[CONNECT_STRING_LEN];
EXEC SQL END DECLARE SECTION;

sCntxtConnexion *p_cntx;

SEMAPHORE sem;

void sql_error();
int trait_rqt(int i);

int detruire_sem(SEMAPHORE sem);
int changer_sem(SEMAPHORE sem, int val);
SEMAPHORE creer_sem(key_t key);
void P(SEMAPHORE sem);
void V(SEMAPHORE sem);

int main(int argc, char* argv[]) {
int i,j;
int nb_process;
pid_t pid;
int status;

key_t cle, cle_sem;
int id;

int cpt;

if (argc != 5) { printf("test_rac_recette_con   ... \n"); return 0; }

printf("Pere: user %s mot de passe %s alias %s \n", argv[1], argv[2], argv[3]);

strcpy((char *) username.arr, argv[1]);
username.len = strlen((char *) username.arr);
strcpy((char *) password.arr, argv[2]);
password.len = strlen((char *) password.arr);
strcpy((char *) connect_string.arr, argv[3]);
connect_string.len = strlen((char *) connect_string.arr);

EXEC SQL WHENEVER SQLERROR DO sql_error("ORACLE error--\n");

nb_process = atoi(argv[4]);
j = 1;

EXEC SQL ENABLE THREADS;

/* Création du segment de mémoire partagée */
cle = ftok(getenv("HOME"), 'A');
if (cle == -1) {
    printf("Pere: pb ftok \n");
    return -1;
}
   
/* 0666: droits */
/* ipcrm -m */
id = shmget(cle, sizeof(sCntxtConnexion), IPC_CREAT | IPC_EXCL | 0666);
if (id == -1) {
    switch (errno) {
       case EEXIST:
      printf("Pere: le segment existe deja \n");
       default:
          printf("Pere: shmget \n");
          return -1;
    }
}

p_cntx = (sCntxtConnexion *) shmat(id, NULL, SHM_R | SHM_W);
if (p_cntx == NULL) {
    printf("Pere: shmat \n");
    return -1;
}

/* Création du pool */
for i in 0..NB_CONNEXION
-1
     EXEC SQL CONTEXT ALLOCATE :sql_cntxt;
     EXEC SQL CONTEXT USE :sql_cntxt;
     EXEC SQL CONNECT :username IDENTIFIED BY :password USING :connect_string;
     p_cntx->sql_cntxt[i] = sql_cntxt;
     p_cntx->h_dspnb[i]=0;

system("ipcs -m");

/* Test du pool sans fork */
sql_cntxt = p_cntx->sql_cntxt[0];
EXEC SQL CONTEXT USE :sql_cntxt;
EXEC SQL select count(*) into :cpt from alphacompactee;
printf("Pere: cpt: %d \n", cpt);

/* Création d'un sémaphore */
cle_sem = ftok(getenv("HOME"), 'B');
if (cle_sem == -1) {
    printf("Pere: pb ftok \n");
    return -1;
}

sem = creer_sem(cle_sem);

printf("Semaphore: \n");
system("ipcs -s | grep snotter");


/* Fork des processus fils */
for(i = 0; i < nb_process; i++) {
    pid = fork();
    if (pid == 0) {
        /* printf("Fils: %d \n", i); */
        trait_rqt(i);
        return 0;
    }
}

for(i = 0; i < nb_process; i++) {
    wait(&status);
}

printf("Pere: fin des fils ... \n");


/* Libération du pool */
for i in 0..NB_CONNEXION-1
     sql_cntxt = p_cntx->sql_cntxt[i];
     EXEC SQL CONTEXT USE :sql_cntxt;
     /*EXEC SQL COMMIT RELEASE;*/
     EXEC SQL CONTEXT FREE :sql_cntxt;

/* Suppression du segment de mémoire partagée */
/* (char *) */
if (shmdt((void *) p_cntx) == -1) {
    printf("Pere: shmdt \n");
    return -1;
}

if (shmctl(id, IPC_RMID, NULL) == -1) {
    printf("Pere: shmctl(remove) \n");
    return -1;
}


/* Suppression du sémaphore */
if (detruire_sem(sem) == -1) {
    printf("Pere: detruire_sem \n");
    return -1;
};

return 0;

}

int trait_rqt(int i) {
int j;
hrtime_t point1, point2;
char tcHeure[20];
struct timeval tv;
struct tm *tm;

int cpt;

gettimeofday(&tv);
tm=localtime(&tv.tv_sec);
memset(tcHeure, '\0', sizeof(tcHeure));
sprintf(tcHeure, " %d:%02d:%02d %03d ", tm->tm_hour, tm->tm_min, tm->tm_sec, tv.tv_usec/1000);

point1 = gethrtime();


/* Attente sur le sémaphore */
printf("ATT Fils %d wait \n", i);
P(sem);


/* Choix d'une connexion disponible */

for j in 0..NB_CONNEXION-1
  if (p_cntx->h_dspnb[j]==0) {
      p_cntx->h_dspnb[j]=1;
      break;
  }
  else {
      printf("Fils %d Conn %d deja prise \n", i, j); 
  }
  

printf("Fils %d Conn %d prise \n", i, j);

/* Libération du sémaphore */
V(sem);

point2 = gethrtime();
printf("Fils Conn %d; %s; %lld ; %lld ; %lld \n", i, tcHeure, (point2 - point1)/1000000, point1, point2);

gettimeofday(&tv);
tm=localtime(&tv.tv_sec);
memset(tcHeure, '\0', sizeof(tcHeure));
sprintf(tcHeure, " %d:%02d:%02d %03d ", tm->tm_hour, tm->tm_min, tm->tm_sec, tv.tv_usec/1000);

point1 = gethrtime();

sql_cntxt = p_cntx->sql_cntxt[j];
EXEC SQL CONTEXT USE :sql_cntxt;

/* Exécution d'une requête */
EXEC SQL select count(*) into :cpt from alphacompactee;

point2 = gethrtime();

printf("Fils Req %d; %s; %lld ; %lld ; %lld \n", i, tcHeure, (point2 - point1)/1000000, point1, point2);

/* Attente sur le sémaphore */

printf("ATT Fils %d wait \n", i);
P(sem);
 

/* Libération de la connexion */
p_cntx->h_dspnb[j]=0;
 

/* Libération du sémaphore */
printf("Fils %d Conn %d liberee\n", i, j);
V(sem);

return 0;
}
 
void sql_error(char *msg) {
char err_msg[128];
int buf_len, msg_len;

EXEC SQL WHENEVER SQLERROR CONTINUE;

printf("\n%s\n", msg);
buf_len = sizeof (err_msg);
 sqlglm(err_msg, (unsigned int *)&buf_len,(unsigned int *)&msg_len);
if (msg_len > buf_len)
    msg_len = buf_len;
printf("%.*s\n", msg_len, err_msg);
EXEC SQL ROLLBACK RELEASE;
exit(1);

}

int detruire_sem(SEMAPHORE sem) {
if (semctl(sem, 0, IPC_RMID, 0) != 0) {
    printf("detruire_sem \n");
    return -1;
}
return 0;
}

int changer_sem(SEMAPHORE sem, int val) {
struct sembuf sb[1];

sb[0].sem_num = 0;
sb[0].sem_op = val;
sb[0].sem_flg = 0;
if (semop(sem, sb, 1) != 0) {
    printf("changer_sem \n");
    return -1;
}
return 0;
}

SEMAPHORE creer_sem(key_t key) {
SEMAPHORE sem;
int r;
union semun {
int val;
struct semid_ds *buf;
ushort *array;
} s_ctl;

sem = semget(key, 1, IPC_CREAT | 0666);
if (sem < 0) {            
    printf("creer_sem \n");
    exit(EXIT_FAILURE);
}

s_ctl.val =  1;  
r = semctl (sem, 0, SETVAL, s_ctl );
if (r < 0) {
    printf("initialisation sémaphore \n");
    exit(EXIT_FAILURE);
}
return sem;
}

void P(SEMAPHORE sem) {
changer_sem(sem, -1);
}

void V(SEMAPHORE sem) {
changer_sem(sem, 1);
}