dnl ---------------------------------------------------------------------
dnl Monitor and message-passing macros.  This is m4 input that expands to
dnl C source text.
dnl
dnl Reading notes for maintainers:
dnl  * Line structure matters.  dnl and the m4 comment character (hash)
dnl    both swallow everything up to the end of the line, so every cpp
dnl    directive and every macro call inside a body needs its own line.
dnl  * C comments are NOT m4 comments.  Their words are rescanned, so do
dnl    not use macro names, backquotes or apostrophes inside them.
dnl  * LOCK, UNLOCK, LOCKDEC and LOCKINIT are used here but not defined in
dnl    this part of the file; presumably a companion file maps them to
dnl    the XX lock macros -- TODO confirm.
dnl ---------------------------------------------------------------------
dnl ENDLAB is the counter used to generate unique exit labels (see mexit).
dnl MLABEL is defined but not referenced in this part of the file.
dnl SERVER is the value r_create passes to xx_getserv through n.
define(ENDLAB,5283) dnl
define(MLABEL,4222) dnl
define(SERVER,1)

dnl delay: bump the waiter count of a queue, release the monitor lock and
dnl block on the queue lock.
/* delay(monitor,queue) */
define(DELAY,
`$1.count[$2]++;
    XXUNLOCK($1.lock)
    XXLOCK(($1.queue[$2]))
'
)

dnl continue: if nobody waits on the queue, release the monitor lock;
dnl otherwise decrement the waiter count and unlock the queue lock.
dnl Either way jump to the exit label generated by the matching mexit.
/* continue(monitor,queue) */
define(CONTINUE,
`if ($1.count[$2] == 0)
    {
        XXUNLOCK($1.lock)
    }
    else
    {
        ($1.count[$2])--;
        XXUNLOCK($1.queue[$2])
    }
    goto `L'ENDLAB;'
)

dnl menter: acquire the monitor lock.
/* menter(monitor) */
define(MENTER,
`{
    XXLOCK(($1.lock))
}'
)

dnl mexit: release the monitor lock, emit the exit label that continue
dnl jumps to, then advance the label counter for the next monitor use.
/* mexit(monitor) */
define(MEXIT,
`XXUNLOCK($1.lock)
`L'ENDLAB: ;
define(`ENDLAB',eval(ENDLAB+1))'
)

dnl decvar: declare a monitor variable.  Argument 1 is the variable name,
dnl argument 2 the number of delay queues (count and queue arrays are
dnl emitted only when it is positive), argument 3 extra member
dnl declarations.
/* decvar(name,queues,declarations) */
define(DECVAR,
`struct $1TYP
{
    XXDECLOCK(lock)
    ifelse(eval($2 > 0),1,int count[$2];,)
    ifelse(eval($2 > 0),1, XXDECLOCK(queue[$2]),)
    $3
} $1;'
)

dnl moninit: initialise a monitor.  Each queue lock is initialised and
dnl then taken, so the first delay on it blocks.
/* moninit(name,queues) */
define(MONINIT,
`{
    int q_num;

    ifelse(eval($2 > 0),1,
    for (q_num=0; q_num < $2; q_num++)
    {
        $1.count[q_num] = 0;
        XXLOCKINIT($1.queue[q_num])
        XXLOCK(($1.queue[q_num]))
    },,)
    XXLOCKINIT($1.lock)
}'
)

dnl create: fork a local slave that runs the function named by argument 1.
dnl The parent holds the proc_table lock across the fork; the child takes
dnl the same lock before who_am_i, so it cannot look itself up until the
dnl parent has stored its table entry and released the lock.  If argument
dnl 2 is given, the new process id is handed to xx_copy_id with it.
dnl NOTE(review): the free-slot search has no upper bound; a full table
dnl walks past active_processes.
/* create(process[,proc_id]) */
define(CREATE,
`{
    long rc;

    fflush(stdout);
    LOCK(xx_cmem->proc_table)
    if ((rc = fork()) == 0)
    {
        LOCK(xx_cmem->proc_table)
        WHO_AM_I(&xx_my_id)
        UNLOCK(xx_cmem->proc_table)
        $1();
        exit(0);
    }
    else if (rc == -1)
        printf("*** failure in create ***\n");
    else
    {
        xx_local_slv_cnt++;
        /* now put in the entry for the new process */
        {
            struct xx_process *p1;
            int xx_i;
            PROC_ID xx_proc;

            p1 = (struct xx_process *) G_MALLOC(sizeof(struct xx_process));
            MONINIT(p1->msg_q_monitor,1)
            p1->first_msg = p1->last_msg = 0;
            p1->upid = rc;
            p1->socket_id = 0;
            p1->port = 0;
            xx_proc.port = htonl(0);
            gethostname(p1->host,64);
            for (xx_i=1; xx_cmem->active_processes[xx_i]; xx_i++)
                ;
            xx_cmem->active_processes[xx_i] = p1;
            xx_proc.proc_table_indx = htonl(xx_i);
            ifelse($2,,,xx_copy_id($2,&xx_proc);)
        }
    }
    UNLOCK(xx_cmem->proc_table)
}'
)

dnl r_create: start the program named by argument 1 on the host named by
dnl argument 2.  Sends the program name and the user name to the server
dnl reached through xx_getserv, reads back the port in buff.msg_type,
dnl connects, and forks a listener for the socket.  If argument 3 is
dnl given, the new process id is handed to xx_copy_id with it.
dnl The expansion uses j and remote_slaves, which are declared by
dnl process_group, so it only compiles inside a process group body.
dnl NOTE(review): neither the host copy into xx_proc.host nor the name
dnl copy into xx_str is bounded, and the free-slot search has no upper
dnl bound either.
/* r_create(process,host[,proc_id]) */
define(R_CRE,
`{
    PROC_ID xx_proc;
    int n,xx_so,rc,ln;
    int xx_i,xx_id;
    char *host,*xx_str2;
    char xx_str[512];
    struct xx_msg_rec buff;
    struct xx_process *p1;
    struct passwd *pw;

    ln = sizeof(struct xx_msg_rec);
    host = "$2";
    xx_i = 0;
    while (xx_proc.host[xx_i++] = *(host++))
        ;
    strcpy(xx_str,"$1");
    n = SERVER;
    xx_getserv("$2",&n,&xx_so,&rc);
    LOCK(xx_cmem->proc_table)
    xx_remote_slv_cnt++;
    for (xx_i=1; xx_cmem->active_processes[xx_i]; xx_i++)
        ;
    /* reserve slot */
    xx_cmem->active_processes[xx_i] =
        xx_cmem->active_processes[ntohl(xx_my_id.proc_table_indx)];
    UNLOCK(xx_cmem->proc_table)
    xx_proc.proc_table_indx = htonl(xx_i);
    remote_slaves[++j][0] = xx_i;
    remote_slaves[j][1] = xx_so;
    if ( (rc = getuid()) == -1 )
    {
        printf("Error in r_cre with getuid\n");
        exit(1);
    };
    /* getpwuid reports failure with a null pointer */
    if ( (pw = getpwuid(rc)) == 0 )
    {
        printf("Error in r_cre with getpwuid\n");
        exit(1);
    };
    /* the user name goes right after the terminated program name */
    xx_str2 = xx_str + strlen("$1") + 1;
    strcpy(xx_str2,pw->pw_name);
    xx_sendserv(xx_so,&xx_proc,-1,0,xx_str,
                strlen("$1")+strlen(xx_str2)+2,0);
    buff.msg_type = 0;
    xx_recvserv(xx_so,(char *) &buff,&ln,&rc);
    /* receive info on new port from daemon - this info was stored
       in the buff.msg_type field in net byte order */
    xx_proc.port = buff.msg_type;
    if ( xx_proc.port )
    {
        printf("Slave %d created on $2 ...",xx_i);
    }
    else
    {
        printf("Error in creat of slave %d on $2\n",xx_i);
        exit(1);
    };
    /* connect to daemon on slave */
    xx_connect(&xx_proc);
    ifelse($3,,,xx_copy_id($3,&xx_proc);)
    /* set up listener on socket */
    if ((xx_id = fork()) == 0)
    {
        xx_listener(xx_so);
        exit(0);
    };
    xx_cmem->active_processes[xx_i]->upid = xx_id;
    printf( "done\n");
}'
)

dnl clock: store a clock reading in argument 1, from getusclk when
dnl USCLOCK is defined for cpp and from cputm_ otherwise.
/* clock(variable) */
define(CLOCK,
`{
# ifdef USCLOCK
    $1 = getusclk();
# else
    {
        long xtime;
        cputm_(&xtime);
        $1 = xtime;
    }
# endif
}'
)

dnl Trace table types: 400 entries of id, typ and data, plus a cursor and
dnl a lock.
define(TRACE_STRUCT,`
struct zz_trace_entry
{
    int id;
    int typ;
    int data;
};

struct zz_trace_struct
{
    int next_entry;
    struct zz_trace_entry trace_table[400];
    LOCKDEC(zz_lock)
};
')

dnl BUFF_SIZE is the data size of one xx_buffer, MSG_BUFF_SIZE the data
dnl size of one xx_msg_rec.  TYPE_LEN_999 is the length recorded for
dnl message type 999, the type send_r waits for as its reply.
define(BUFF_SIZE,256)
define(MSG_BUFF_SIZE,5000)
define(TYPE_LEN_999,0)
define(PROC_ID,`struct xx_proc_id')

dnl env: file-scope declarations for a program using these macros.
dnl Arguments default to 50 process slots, 1500000 for the shared memory
dnl setting and 600 send/receive buffers.
dnl NOTE(review): the three include lines below carry no header name; the
dnl names appear to have been lost.  The expansions in this file need at
dnl least the stdio, passwd and network byte order declarations --
dnl restore the real names from the upstream source.
/* env([max_procs],[shared_mem],[buffers]) */
define(ENV,`#include
#include
#include
XXENV
define(`MAX_PROCS',`ifelse($1,,50,$1)')
define(`SH_MEM',`ifelse($2,,1500000,$2)')
define(`SR_BUFFS',`ifelse($3,,600,$3)')
int xx_remote_slv_cnt;
int xx_local_slv_cnt;
int xx_socket,xx_daemon;
int *xx_upid,*xx_pt_upid;
char *share();
# ifdef USCLOCK
void usclk_init();
usclk_t getusclk();
# endif
int xxwaiti,xxrc,xxnum;
struct xx_mem_blk *xx_glob_mem;
char *xx_mem();
TRACE_STRUCT
CLUSTER_STRUCT
struct xx_proc_id xx_my_id; /* overwritten by each process */
struct xx_cluster *xx_cmem;
')

dnl Shared data layout: process ids, wire messages, queued messages,
dnl buffers, per-process records and the cluster header.
define(CLUSTER_STRUCT,`
struct xx_proc_id
{
    u_long proc_table_indx; /* stored in net byte order */
    u_long port;            /* stored in net byte order */
    char host[64];
};

struct xx_msg_rec
{
    struct xx_proc_id sender;
    struct xx_proc_id target;
    u_long msg_type;
    u_long ack;
    u_long length;
    char data[MSG_BUFF_SIZE];
};

struct xx_msg
{
    struct xx_msg *link;
    PROC_ID sender;
    u_long type;
    u_long ack;
    struct xx_buffer *first_buff;
};

struct xx_buffer
{
    struct xx_buffer *link;
    u_long ln; /* # characters of buff that are data */
    char buff[BUFF_SIZE];
};

struct xx_process
{
    long upid; /* take the process id from unix */
    u_long socket_id;
    u_long port;
    char host[64];
    DECVAR(msg_q_monitor,1)
    struct xx_msg *first_msg,*last_msg;
};

struct xx_cluster
{
    struct zz_trace_struct *trace_data;
    LOCKDEC(avl_lock)
    struct xx_buffer *avail_buffs;
    struct xx_msg *avail_msgs;
    LOCKDEC(proc_table)
    struct xx_process *active_processes[MAX_PROCS];
};
')

dnl initenv: run-time set-up; the expansion reads argc and argv.
dnl When argv[1] is the remote slave marker, argv[2] is taken as the table
dnl slot and argv[3] as the socket.  Allocates the cluster header, trace
dnl table, buffer and message free lists and the process table, stores
dnl the entry for this process and, when a socket was given, an entry in
dnl slot 0 plus a forked xx_makeserv daemon.
dnl The line holding the remote slave marker contains the m4 comment
dnl character, so nothing after it on that line is expanded; keep macro
dnl calls off that line.
dnl NOTE(review): the slot 0 entry never has upid or host assigned.
/* initenv([shared_mem]) */
define(INITENV,`
{
    int i,j,k;
    struct xx_buffer *xx_b1;
    struct xx_msg *xx_m1;
    int mon_dum_status;

    xx_remote_slv_cnt = 0;
    xx_local_slv_cnt = 0;
    ifelse($1,,,`define(`SH_MEM',$1)')
    i = SH_MEM;
    j = SR_BUFFS;
    i += j * (BUFF_SIZE + sizeof(struct xx_msg));
    XXINITENV
    if (argc > 1 && !strcmp(argv[1],"x#remote_slave#x"))
    {
        xx_socket = atol(argv[3]);
        xx_my_id.proc_table_indx = htonl(atol(argv[2]));
        xx_my_id.port = htonl(0);
    }
    else
    {
        xx_socket = 0;
        xx_my_id.proc_table_indx = htonl(0);
        xx_my_id.port = htonl(0);
    }
    if (mon_dum_status)
        exit(mon_dum_status);
    else
    {
        xx_mem(0, SH_MEM);
        xx_upid = xx_pt_upid = (int *) G_MALLOC(MAX_PROCS*sizeof(int));
        xx_cmem = (struct xx_cluster *) G_MALLOC(sizeof(struct xx_cluster));
        xx_cmem->trace_data =
            (struct zz_trace_struct *) G_MALLOC(sizeof(struct zz_trace_struct));
        xx_cmem->trace_data->next_entry = 0;
        for (i=0; i < 400; i++)
        {
            xx_cmem->trace_data->trace_table[i].id = -1;
        }
        LOCKINIT(xx_cmem->trace_data->zz_lock)

        /* now allocate and format the buffer pool */
        LOCKINIT(xx_cmem->avl_lock)
        k = SR_BUFFS ;
        i = k * sizeof(struct xx_buffer);
        xx_cmem->avail_buffs = (struct xx_buffer *) G_MALLOC(i);
        for (j=0,xx_b1=xx_cmem->avail_buffs; j < k; j++,xx_b1++)
            xx_b1->link = xx_b1+1;
        xx_b1--;
        xx_b1->link = 0;

        /* initialize msg avail */
        k = SR_BUFFS ;
        i = k * sizeof(struct xx_msg);
        xx_cmem->avail_msgs = (struct xx_msg *) G_MALLOC(i);
        for (j=0,xx_m1=xx_cmem->avail_msgs; j < k; j++,xx_m1++)
            xx_m1->link = xx_m1+1;
        xx_m1--;
        xx_m1->link = 0;

        /* initialize table of active processes and array for unix ids */
        LOCKINIT(xx_cmem->proc_table)
        /* start at slot 0 so that every entry of both arrays is cleared */
        for (i=0; i < MAX_PROCS; i++)
        {
            *(xx_pt_upid++) = 0;
            xx_cmem->active_processes[i] = 0;
        };
        xx_pt_upid = xx_upid;

        /* now put in the entry for the master and possibly super master */
        {
            int rc,xx_i;
            char xx_indx[4];
            long getpid();
            struct xx_process *p1;

            xx_cmem->active_processes[ntohl(xx_my_id.proc_table_indx)] = p1 =
                (struct xx_process *) G_MALLOC(sizeof(struct xx_process));
            MONINIT(p1->msg_q_monitor,1)
            p1->first_msg = p1->last_msg = 0;
            p1->upid = getpid();
            p1->socket_id = 0;
            p1->port = 0;
            gethostname(p1->host,64);
            if ( xx_socket)
            {
                xx_cmem->active_processes[0] = p1 =
                    (struct xx_process *) G_MALLOC(sizeof(struct xx_process));
                MONINIT(p1->msg_q_monitor,1)
                p1->first_msg = p1->last_msg = 0;
                p1->socket_id = xx_socket;
                p1->port = 0;
                /* Invoke code for listening daemon */
                if ( (rc = fork()) == 0 )
                    xx_makeserv(xx_socket);
                xx_daemon = rc;
                /* printf ("Daemon: %d for %d\n",rc,
                           ntohl(xx_my_id.proc_table_indx)); */
                /* Wait for indx range of remote slaves */
                /* terminate the buffer so atol stops after the received digits */
                xx_indx[3] = 0;
                if ( recv(xx_socket,xx_indx,3,0) == -1 )
                {
                    printf("Error: slave %d on recv of indx range\n",
                           ntohl(xx_my_id.proc_table_indx));
                    exit(1);
                }
                rc = atol(xx_indx);
                /* Reserve slots for remote slaves */
                /* the count comes off the socket so keep it inside the table */
                for ( xx_i=1 ; xx_i <= rc && xx_i < MAX_PROCS ; xx_i++ )
                    if ( xx_cmem->active_processes[xx_i] == 0 )
                        xx_cmem->active_processes[xx_i] = (struct xx_process *) 1;
            }
        }
    }
}
')

dnl who_am_i: find the table slot whose upid equals getpid and fill the
dnl process id that argument 1 points to.  Exits with -1 when no slot
dnl matches.  Slots holding the marker value 1 are reserved and carry no
dnl record, so they are skipped.
/* who_am_i(proc_id) */
define(WHO_AM_I,`
{
    long unix_id;
    long getpid();
    int xx_i,xx_j;
    PROC_ID *xx_p;

    unix_id = getpid();
    for ( xx_i=0 ; xx_i < MAX_PROCS ; xx_i++ )
        if (xx_cmem->active_processes[xx_i] &&
            (xx_cmem->active_processes[xx_i] != (struct xx_process *) 1) &&
            (unix_id == xx_cmem->active_processes[xx_i]->upid))
            break;
    if ( xx_i == MAX_PROCS )
    {
        printf("who am i: %ld can not find myself\n",unix_id);
        exit(-1);
    }
    xx_p = $1;
    xx_p->proc_table_indx = htonl(xx_i);
    xx_p->port = htonl(xx_cmem->active_processes[xx_i]->port);
    gethostname(xx_p->host,64);
}
')

dnl where_am_i: always stores 0 through argument 1.
/* where_am_i(variable) */
define(WHERE_AM_I,`*($1) = 0;')

dnl G_MALLOC expands to a call of xx_mem with request code 1 and the size.
dnl The expansion carries its own semicolon, so uses that add one produce
dnl a harmless empty statement.
/* char *G_MALLOC(size) */
define(G_MALLOC,
`xx_mem(1,$1);'
)

define(G_AREA,`xx_mem(2,0);')

define(MEM_BLK,`
struct mem_blk
{
    char *next;
    int l_mem;
};
')

define(LOG_EVENT,`lgevnt(xx_cmem->trace_data,ntohl(xx_my_id.proc_table_indx),$1,$2);')

define(DUMP_TRACE,`dtrc(xx_cmem->trace_data);')

dnl wait_for_end: call xx_cleanup when remote slaves were created, wait
dnl once per local slave, then reset both counters.
define(WAIT_FOR_END,`
{
    int _xxi;

    if (xx_remote_slv_cnt)
    {
        xx_cleanup();
    }
    for (_xxi=0; _xxi < xx_local_slv_cnt; _xxi++)
        wait(0);
    xx_remote_slv_cnt = 0;
    xx_local_slv_cnt = 0;
}
')

dnl load: when the value behind argument 1 is 0, store 1000 through
dnl argument 2; any other value is reported and the program exits with 3.
define(LOAD,`
if (*($1) == 0)
    *($2) = 1000;
else
{
    printf("*** invalid invocation of load: =%d\n",*($1));
    exit(3);
}
')

dnl send_r: send with the ack flag set, then receive a message of type 999
dnl from the same process.
/* send_r(proc_id,type[,data][,length]) */
define(SENDR,`
{
    PROC_ID xx_proc;
    u_long ack_type;

    SEND($1,$2,$3,$4,ack)
    RECEIVE(&xx_proc,&ack_type,, (match_id($1) && match_type(999)))
}
')

dnl send: connect first when the target slot still holds the reserved
dnl marker, then route through xx_sendserv when the target record has a
dnl socket and through xx_send otherwise.  Missing data and length
dnl arguments become 0; any fifth argument turns the ack flag on.
/* send(proc_id,type[,data][,length][,ack]) */
define(SEND,`
{
    PROC_ID *xx_id_pt;

    xx_id_pt = $1;
    if (xx_cmem->active_processes[ntohl(xx_id_pt->proc_table_indx)] ==
        (struct xx_process *) 1)
    {
        xx_connect($1);
    };
    if (xx_cmem->active_processes[ntohl(xx_id_pt->proc_table_indx)]->socket_id != 0)
    {
        xxnum = xx_cmem->active_processes[ntohl(xx_id_pt->proc_table_indx)]->socket_id;
        xx_sendserv(xxnum,$1,$2,TYPE_LEN_$2,(char *) ifelse($3,,0,$3),ifelse($4,,0,$4),ifelse($5,,0,1));
    }
    else
    {
        xx_send($1,$2,TYPE_LEN_$2,(char *) ifelse($3,,0,$3),ifelse($4,,0,$4),ifelse($5,,0,1));
    };
}
')
dnl ---------------------------------------------------------------------
dnl Receive side, process groups, message types and the lock layer.
dnl Reminder: C comments are rescanned by m4, so keep macro names,
dnl backquotes and apostrophes out of them; dnl lines are safe.
dnl ---------------------------------------------------------------------
dnl match_id and match_type are predicates over xx_mp, the queue cursor
dnl declared by receive and messages_available.  They are only meaningful
dnl inside the condition argument of those two macros.
define(match_id,`((*xx_mp)->sender.proc_table_indx == (*($1)).proc_table_indx)')
define(match_type,`((*xx_mp)->type == $1)')

dnl receive: under the message queue monitor of the calling process, walk
dnl the queue until the condition (argument 4) holds for the current
dnl message, copy that message out with xx_copy_msg and unlink it with
dnl xx_adj_links.  xx_send_ack is called after the monitor is left.
dnl Arguments 1 and 2 are handed to xx_get_sender_type; argument 3 is the
dnl destination passed to xx_copy_msg and argument 5 its last parameter,
dnl both defaulting to 0.
dnl NOTE(review): argument 5 is presumably a length -- confirm against
dnl xx_copy_msg.  The loop has no end-of-queue test of its own; presumably
dnl xx_get_sender_type waits until a message is present -- confirm.
/* receive(sender,type,buffer,condition[,length]) */
define(RECEIVE,`
{
    struct xx_buffer *xx_b2;
    struct xx_msg *xx_m1,**xx_mp;
    struct xx_process *to_proc;
    int xx_found;

    to_proc = xx_cmem->active_processes[ntohl(xx_my_id.proc_table_indx)];
    xx_found = 0;
    MENTER(to_proc->msg_q_monitor)
    xx_mp = &(to_proc->first_msg);
    while (!xx_found)
    {
        xx_get_sender_type(to_proc,xx_mp,$1,$2);
        if ($4)
        {
            xx_b2 =0;
            xx_m1 = *xx_mp;
            xx_copy_msg(xx_m1,(char *) ifelse($3,,0,$3),&xx_b2,ifelse($5,,0,$5));
            xx_found = 1;
            xx_adj_links(xx_mp,to_proc);
        }
        else
        {
            xx_mp = &((*xx_mp)->link);
        }
    }
    MEXIT(to_proc->msg_q_monitor)
    xx_send_ack($1,xx_m1,xx_b2);
}
')

dnl messages_available: set argument 2 to 1 when some queued message
dnl satisfies the condition in argument 1, and to 0 otherwise.  Nothing
dnl is removed from the queue.
/* messages_available(condition,flag) */
define(MESSAGES_AVAILABLE,`
{
    struct xx_process *to_proc;
    struct xx_msg **xx_mp;

    to_proc = xx_cmem->active_processes[ntohl(xx_my_id.proc_table_indx)];
    MENTER(to_proc->msg_q_monitor)
    for (xx_mp= &(to_proc->first_msg),$2=0;*xx_mp && !$2;xx_mp= &((*xx_mp)->link))
        if ($1)
            $2 = 1;
    MEXIT(to_proc->msg_q_monitor)
}
')

dnl remote_create: call the function named by argument 1 with argument 2.
/* remote_create(group,proc_ids) */
define(REMOTE_CREATE,`
{
    $1($2);
}
')

dnl process_group: open an old-style C function named by argument 1 that
dnl takes the proc_ids array.  It declares j and remote_slaves, which the
dnl r_create expansion relies on.  The function body stays open until
dnl process_group_end closes it.
dnl str is sized for any int so that the sprintf in process_group_end
dnl cannot overrun it; only its first 3 bytes are ever sent.
/* process_group(name) */
define(PROCESS_GROUP,`
$1(proc_ids)
PROC_ID *proc_ids;
{
    int i,j;
    char str[12];
    int remote_slaves[MAX_PROCS][2];

    j = 0;
')

dnl process_entry: create one process, locally when no host is given and
dnl remotely otherwise, then step to the next proc_ids element.
/* process_entry(process[,host]) */
define(PROCESS_ENTRY,`
    ifelse($2,,`CREATE($1,proc_ids)',`R_CRE($1,$2,proc_ids)')
    proc_ids++;
')

dnl process_group_end: when remote slaves were created, send the slave
dnl count as text (3 bytes) to each of their sockets, then close the
dnl function opened by process_group.
/* process_group_end */
define(PROCESS_GROUP_END,`
    if ( j )
    {
        sprintf(str,"%d",j);
        for ( i=1 ; i <= j ; i++ )
            if ( send(remote_slaves[i][1],str,3,0) == -1 )
            {
                printf("Error in send index range to remote slave\n");
                exit(1);
            }
    }
}
')

/* xx_cmp_id(id1,id2) */
define(CMP_ID,`
xx_cmp_id($1,$2)
')

/* xx_copy_id(id1,id2) */
define(COPY_ID,`
{
    xx_copy_id($1,$2);
}
')

dnl type: register message type number argument 1.  With the word EMPTY
dnl as argument 2 the recorded length is 0.  Otherwise argument 2 is
dnl emitted as is and the recorded length is sizeof applied to the text
dnl that precedes its first opening brace.
/* type(number,declaration) */
define(MSG_TYPE,`
ifelse($2,EMPTY,define(`TYPE_LEN_$1',0),
`
$2
define(`I',`eval(index($2,`{')-1)')
define(`TYPE_LEN_$1',sizeof(substr($2,0,I)))
')
')

define(BEGIN_MSG_TYPES,)
define(END_MSG_TYPES,)

dnl Machine layer.  xxenv supplies the machine specific declarations that
dnl env pulls in.
dnl NOTE(review): both include lines below carry no header name; the
dnl names appear to have been lost.  Presumably they are the headers that
dnl declare slock_t with the s_lock routines and the microsecond clock --
dnl restore them from the upstream source.
define(XXENV,`#include
# ifdef USCLOCK
# include
# endif
char *share();
')

dnl xxinitenv: clears mon_dum_status and starts the microsecond clock
dnl when USCLOCK is defined for cpp.
define(XXINITENV,`
mon_dum_status = 0;
# ifdef USCLOCK
usclk_init();
# endif
')

dnl Lock primitives: declare, initialise, acquire and release.  Each
dnl expansion ends with its own semicolon, so call sites add none.
define(XXLOCKTYPE,`slock_t')
define(XXDECLOCK, `XXLOCKTYPE $1;')
define(XXLOCKINIT, `s_init_lock(& $1); ')
define(XXLOCK,`s_lock(& $1);')
define(XXUNLOCK,`s_unlock(& $1);')