 |
» |
|
|
 |
Some CPSlib functions map directly to existing pthread functions,
as shown in Table A-1 “CPSlib library functions to pthreads mapping”. However,
certain CPSlib functions, such as cps_plevel,
are obsolete in the scope of pthreads. While about half of the CPSlib
functions do not map to pthreads, their tasks can be simulated by
the programmer. The examples presented in the following sections demonstrate
various constructs that can be programmed to mimic unmappable
CPSlib functions in pthreads. The examples shown here are provided
as a first step in replacing previous functionality provided by
CPSlib with POSIX thread standard calls. This is not a tutorial in pthreads, nor do these examples
describe complex pthreads operations, such as nesting. For a definitive
description of how to use pthreads functions, see the book
Threadtime by
Scott Norton and
Mark D. Dipasquale. Symmetric parallelism |  |
Symmetric parallel threads are spawned in CPSlib using cps_ppcall() or cps_ppcalln(). There is no logical mapping of
these CPSlib functions to pthread functions. However, you can create
a program, similar to the one shown in the ppcall.c
example below, to achieve the same results. This example also includes the following CPSlib thread information
functions: my_nsthreads
(a map created for
cps_nthreads) returns the number
of threads in the current spawn context.
my_stid
(a map created for cps_stid) returns the spawn thread ID of the calling
thread.
The ppcall.c example performs
other tasks associated with symmetrical thread processing, including
the following: Allocates a cell barrier data structure
based upon the number of threads in the current process by calling
my_barrier_alloc
Provides a barrier for threads to
"join" or synchronize after parallel work is completed
by calling my_join_barrier
Creates data structures for threads
created using pthread_create
Uses the CPS_STACK_SIZE
environment variable to determine the stacksize
Determines the number of threads to
create by calling pthread_num_processors_np()
Returns the number of threads by
calling my_nsthreads()
Returns the is_parallel
flag by calling my_is_parallel()
 |
/* * ppcall.c * function * Symmetric parallel interface to using pthreads * called my_thread package. * */ #ifndef _HPUX_SOURCE #define _HPUX_SOURCE #endif#include <spp_prog_model.h> #include <pthread.h> #include <stdlib.h> #include <errno.h> #include "my_ppcall.h" #define K 1024 #define MB K*K struct thread_data { int stid; int nsthreads; int release_flag; r}; }; typedef struct thread_data thread_t; typedef struct thread_data *thread_p; #define WAIT_UNKNOWN0 #define WAIT_SPIN1 #define WAIT_SUSPEND2 #define MAX_THREADS64 #define W_CACHE_SIZE 8 #define B_CACHE_SIZE 32 typedef struct { int volatile c_cell; int c_pad[W_CACHE_SIZE-1]; } cell_t; #define ICELL_SZ (sizeof(int)*3+sizeof(char *)) struct cell_barrier { int br_c_magic; int volatile br_c_release; char * br_c_free_ptr; int br_c_cell_cnt; char br_c_pad[B_CACHE_SIZE-ICELL_SZ]; cell_t br_c_cells[1]; }; #define BR_CELL_T_SIZE(x) (sizeof(struct cell_barrier) + (sizeof(cell_t)*x)) /* * ALIGN - to align objects on specific alignments (usually on * cache line boundaries. * * arguments * obj- pointer object to align * alignment- alignment to align obj on * * Notes: * We cast obj to a long, so that this code will work in * either narrow or wide modes of the compilers. */ #define ALIGN(obj, alignment)\ ((((long) obj) + alignment - 1) & ~(alignment - 1)) typedef struct cell_barrier * cell_barrier_t; /* * File Variable Dictionary: * * my_thread_mutex- mutex to control access to the following: * my_func, idle_release_flag, my_arg, * my_call_thread_max, my_threads_are_init, * my_threads_are_parallel. 
* * idle_release_flag - flag to release spinning * idle threads * my_func - user specified function to call * my_arg - argument to pass to my_func * my_call_thread_max - maximum number of threads * needed on this ppcall * my_threads_are_init - my thread package init flag * my_threads_are_parallel - we are executing parallel * code flag * my_thread_ids - list of child thread ids * my_barrier - barrier used by the join * my_thread_ptr - the current thread thread - pointer in thread-private * memory. */ static pthread_mutex_tmy_thread_mutex = PTHREAD_MUTEX_INITIALIZER; static int volatile idle_release_flag = 0; static void (*my_func)(void *); static void *my_arg; static int my_call_thread_max; static int my_stacksize = 8*MB; static int thread_count = 1; static int my_threads_are_init = 0; static int volatile my_threads_are_parallel = 0; static pthread_t my_thread_ids[MAX_THREADS]; static cell_barrier_t my_barrier; static thread_p thread_private my_thread_ptr; /* * my_barrier_alloc * Allocate cell barrier data structure based upon the * number of threads that are in the current process. * * arguments * brc - pointer pointer to the user cell barrier * n - number of threads that will use this barrier * * return * 0- success * -1- failed to allocate cell barrier */ static int my_barrier_alloc(cell_barrier_t *brc, int n) { cell_barrier_t b; char *p; int i; /* * Allocate cell barrier for "n" threads */ if ( (p = (char *) malloc(BR_CELL_T_SIZE(n))) == 0 ) return -1; /* * Align the barrier on a cache line for maximum performance. 
*/ b = (cell_barrier_t) ALIGN(p, B_CACHE_SIZE); b->br_c_magic = 0x4200beef; b->br_c_cell_cnt = n; /* keep track of the # of threads */ b->br_c_release = 0; /* initialize release flag */ b->br_c_free_ptr = p; /* keep track of orginal malloc ptr */ for(i = 0; i < n; i++ ) b->br_c_cells[i].c_cell = 0;/* zero the cell flags */ *brc = b; return 0; } /* * my_join_barrier * Provide a barrier for all threads to sync up at, after * they have finished performing parallel work. * * arguments * b - pointer to cell barrier * id - id of the thread (need to be in the * range of 0 - (N-1), where N is the *number of threads).
|
 |
 |
* return *none */ static void my_join_barrier(cell_barrier_t b, int id) { int i, key; /* * Get the release flag value, before we signal that we * are at the barrier. */ key = b->br_c_release; if ( id == 0 ) { /* * make thread 0 (i.e. parent thread) wait for the child * threads to show up. */ for( i = 1; i < thread_count; i++ ) { /* * wait on the Nth cell */ while ( b->br_c_cells[i].c_cell == 0 ) /* spin */; /* * We can reset the Nth cell now, * because it is not being used anymore * until the next barrier. /* b->br_c_cells[i].c_cell = 0; } /* * signal all of the child threads to leave the barrier. */ ++b->br_c_release; } else { /* * signal that the Nth thread has arrived at the barrier. */ b->br_c_cells[id].c_cell = -1; while ( key == b->br_c_release ) /* spin */; } } /* * idle_threads * All of the process child threads will execute this * code. It is the idle loop where the child threads wait * for parallel work. * arguments * thr- thread pointer * * algorithm: * Initialize some thread specific data structures. * Loop forever on the following: * Wait until we have work. * Get global values on what work needs to be done. * Call user specified function with argument. * Call barrier code to sync up all threads. */static void idle_threads(thread_p thr) { /* * initialized the thread thread-private memory pointer. */ my_thread_ptr = thr; for(;;) { /* * threads spin here waiting for work to be assign * to them. */ while( thr->release_flag == idle_release_flag ) /* spin until idle_release_flag changes */; thr->release_flag = idle_release_flag; thr->nsthreads = my_call_thread_max; /* * call user function with their specified argument. */ if ( thr->stid < my_call_thread_max ) (*my_func)(my_arg); /* * make all threads join before they were to the idle loop. */ my_join_barrier(my_barrier, thr->stid); } } /** create_threads * This routine creates all of the MY THREADS package data * structures and child threads. 
* * arguments: * none * * return: * none * * algorithm: * Allocate data structures for a thread * Create the thread via the pthread_create call. * If the create call is successful, repeat until the * number of threads equal the number of processors. * */ static void create_threads() { pthread_attr_t attr; char *env_val; int i, rv, cpus, processors; thread_p thr; /* * allocate and initialize the thread structure for the * parent thread. */ if ( (thr = (thread_p) malloc(sizeof(thread_t))) == NULL ) { fprintf(stderr,"my_threads: Fatal error: can not allocate memory for main thread\n"); abort(); } my_thread_ptr = thr; thr->stid = 0; thr->release_flag = 0; /* * initialize attribute structure */ (void) pthread_attr_init(&attr); /* * Check to see if the CPS_STACK_SIZE env variable is defined. * If it is, then use that as the stacksize. */ if ( (env_val = getenv("CPS_STACK_SIZE")) != NULL ) { int val; val = atoi(env_val); if ( val > 128 ) my_stacksize = val * K; } (void) pthread_attr_setstacksize(&attr,my_stacksize); /* * determine how many threads we will create. */ processors = cpus = pthread_num_processors_np(); if ( (env_val = getenv("MP_NUMBER_OF_THREADS")) != NULL ) { int val; val = atoi(env_val); if ( val >= 1 ) cpus = val; } for(i = 1; i < cpus && i < MAX_THREADS; i++ ) { /* * allocate and initialize thread data structure. */ if ( (thr = (thread_p) malloc(sizeof(thread_t))) == NULL ) break; thr->stid = i; thr->release_flag = 0; rv = pthread_create(&my_thread_ids[i-1], &attr, (void *(*)(void *))idle_threads, (void *) thr); if ( rv != 0 ) { free(thr); break; } thread_count++; } my_threads_are_init = 1; my_barrier_alloc(&my_barrier, thread_count); /* * since we are done with this attribute, get rid of it. */ (void) pthread_attr_destroy(&attr); } /* * my_ppcall * Call user specified routine in parallel. * * arguments: * max- maximum number of threads that are needed. 
* func- user specified function to call * arg- user specified argument to pass to func * * return: * 0- success * -1- error * * algorithm: * If we are already parallel, then return with an error * code. Allocate threads and internal data structures, * if this is the first call. * Determine how many threads we need. * Set global variables. * Signal the child threads that they have parallel work. * At this point we signal all of the child threads and * let them determine if they need to take part in the * parallel call. Call the user specified function. * Barrier call will sync up all threads. */ int my_ppcall(int max, void (*func)(void *), void *arg) { thread_p thr; int i, suspend; /* * check for error conditions */ if ( max <= 0 || func == NULL ) return EINVAL; if ( my_threads_are_parallel ) return EAGAIN; (void) pthread_mutex_lock(&my_thread_mutex); if ( my_threads_are_parallel ) { (void) pthread_mutex_unlock(&my_thread_mutex); return EAGAIN; } /* * create the child threads, if they are not already created. */ if ( !my_threads_are_init ) create_threads(); /* * set global variables to communicate to child threads. */ if ( max > thread_count ) my_call_thread_max = thread_count; else my_call_thread_max = max; my_func = func; my_arg = arg; my_thread_ptr->nsthreads = my_call_thread_max; ++my_threads_are_parallel; /* * signal all of the child threads to exit the spin loop */ ++idle_release_flag; (void) pthread_mutex_unlock(&my_thread_mutex); /* * call user func with user specified argument */ (*my_func)(my_arg); /* * call join to make sure all of the threads are done doing * there work. */ my_join_barrier(my_barrier, my_thread_ptr->stid); (void) pthread_mutex_lock(&my_thread_mutex); /* * reset the parallel flag */ my_threads_are_parallel = 0; (void) pthread_mutex_unlock(&my_thread_mutex); return 0; } /* * my_stid * Return thread spawn thread id. This will be in the range * of 0 to N-1, where N is the number of threads in the * process. 
* arguments: * none * * return * spawn thread id */ int my_stid(void) { return my_thread_ptr->stid; } /* * my_nsthreads * Return the number of threads in the current spawn. * * arguments: * none * * return * number of threads in the current spawn */ int my_nsthreads(void) { return my_thread_ptr->nsthreads; } /* * my_is_parallel * Return the is parallel flag * * arguments: * none * * return * 1- if we are parallel * 0- otherwise */ int my_is_parallel(void) { int rv; /* * if my_threads_are_init is set, then we are parallel, * otherwise we not. */ (void) pthread_mutex_lock(&my_thread_mutex); rv = my_threads_are_init; (void) pthread_mutex_unlock(&my_thread_mutex); return rv; } /* * my_complex_cpus * Return the number of threads in the current process. * * arguments: * none * * return * number of threads created by this process */ int my_complex_cpus(void) { int rv; /* * Return the number of threads that we current have. */ (void) pthread_mutex_lock(&my_thread_mutex); rv = thread_count; (void) pthread_mutex_unlock(&my_thread_mutex); return rv; } |
 |
Asymmetric parallelism |  |
Asymmetric parallelism is used when each thread executes a different,
independent instruction stream. Asymmetric threads are analogous
to the Unix fork system call construct
in that the threads are disjoined. Some of the asymmetric CPSlib functions map to pthread functions,
while others are no longer used, as noted below: cps_thread_create() spawned asymmetric
threads and now maps to the pthread function pthread_create().
cps_thread_createn(), which spawned asymmetric
threads with multiple arguments, also maps to pthread_create().
However, pthread_create() only
supports the passing of one argument.
CPSlib terminated asymmetric threads
using cps_thread_exit(), which now maps to the pthread
function pthread_exit().
cps_thread_register_lock has no corresponding pthread
function. It was formerly used in conjunction with m_lock,
both of which have been replaced with one call to pthread_join.
cps_plevel(), the CPSlib function which determined
the current level of parallelism, does not have a corresponding
pthread function, because levels do not mean anything to pthreads.
The first example in this section, create.c,
provides an example of the above CPSlib functions being used to
create asymmetric parallelism. /* * create.c * Show how to use all of the cps asymmetric functions. * */ #include <cps.h> mem_sema_t wait_lock; void tfunc(void *arg) { int i; /* * Register the wait_lock, so that the parent thread * can wait on us to exit. */ (void) cps_thread_register_lock(&wait_lock); for( i = 0; i < 100000; i++ ) /* spin for a spell */; printf("tfunc: ktid = %d\n", cps_ktid()); cps_thread_exit(); } main() { int node = 0; ktid_t ktid; /* * Initialize and lock the wait_lock. */ m_init32(&wait_lock, &node); m_cond_lock(&wait_lock); ktid = cps_thread_create(&node, tfunc, NULL); /* * We wait for the wait_lock to be release. That is * how we know that the child thread * has terminated. */ m_lock(&wait_lock); exit(0); }
|
The example below shows how to use the pth_create.c
function to map to asymmetric functions provided by the CPSlib example.

/*
 * pth_create.c
 *   Show how to use all of the pthread functions that map to
 *   cps asymmetric functions.
 *
 */
#include <pthread.h>
#include <errno.h>
#include <stdio.h>      /* printf, perror */
#include <stdlib.h>     /* exit, abort */

/*
 * tfunc
 *   Body of the asymmetric child thread: spins briefly, prints
 *   its thread id and terminates with pthread_exit.
 *
 * arguments
 *   arg - unused
 */
void
tfunc(void *arg)
{
    int i;

    (void) arg;

    for( i = 0; i < 100000; i++ )
        /* spin for a spell */;

    /*
     * pthread_t is an opaque type; the cast assumes it is an
     * arithmetic type on this platform -- TODO confirm.
     */
    printf("tfunc: ktid = %lu\n", (unsigned long) pthread_self());

    pthread_exit(0);
}

/*
 * tfunc_start
 *   Start routine with the exact signature pthread_create expects;
 *   avoids calling tfunc through a cast function pointer.
 */
static void *
tfunc_start(void *arg)
{
    tfunc(arg);
    return NULL;
}

/*
 * main
 *   Create one thread and wait for it to terminate.
 */
int
main(void)
{
    pthread_t ktid;

    if ( (errno = pthread_create(&ktid, NULL, tfunc_start, NULL)) != 0 ) {
        /* without a valid thread id there is nothing to join */
        perror("pth_create: pthread_create failed");
        abort();
    }

    /*
     * Wait for the child to terminate.
     */
    (void) pthread_join(ktid, NULL);

    exit(0);
}
Synchronization using high-level functions |  |
This section demonstrates how to use
barriers and
mutexes to
synchronize symmetrically parallel code. Implicit barriers are operations in a program where threads
are restricted from completion based upon the status of the other
threads. For example, in the ppcall.c
example (“ppcall.c”), a join operation
occurs after all spawned threads terminate and before the function
returns. This type of implicit barrier is often the only type of
barrier required. The my_barrier.c example
shown below provides a pthreads implementation of CPSlib barrier
routines. This includes the following example functions: my_barrier_init is
similar to the cps_barrier_alloc
function in that it allocates the barrier (br)
and sets its associated memory counter to zero. my_barrier, like
the CPSlib function cps_barrier,
operates as barrier wait routine. When the value of the shared counter
is equal to the argument n (number of
threads), the counter is set to zero. my_barrier_destroy,
like
cps_barrier_free, releases the
barrier.
 |
/*
 * my_barrier.c
 *   Code to support a fetch and increment type barrier.
 */
#ifndef _HPUX_SOURCE
#define _HPUX_SOURCE
#endif

#include <pthread.h>
#include <errno.h>
#include <stdlib.h>     /* malloc, free */

/*
 * barrier
 *   magic    barrier valid flag
 *   counter  shared counter between threads
 *   release  shared release flag, used to signal waiting
 *            threads to stop waiting.
 *   lock     binary semaphore used to control read/write
 *            access to counter and write access to
 *            release.
 */
struct barrier {
    unsigned int magic;     /* unsigned: 0xdeadbeef does not fit in int */
    int volatile counter;
    int volatile release;
    pthread_mutex_t lock;
};

#define VALID_BARRIER   0x4242beef
#define INVALID_BARRIER 0xdeadbeef

typedef struct barrier barrier_t;
typedef struct barrier *barrier_p;

/*
 * my_barrier_init
 *   Initialize a barrier for use.
 *
 * arguments
 *   br - pointer to the barrier to be initialized; *br must be
 *        NULL on entry and receives the new barrier on success.
 *
 * return
 *   0  - success
 *   >0 - error code of failure (EINVAL if *br is not NULL,
 *        ENOMEM if allocation fails, or the error returned by
 *        pthread_mutex_init).
 */
int
my_barrier_init(barrier_p *br)
{
    barrier_p b, n;
    int rv;

    b = (barrier_p) *br;

    if ( b != NULL )
        return EINVAL;

    if ( (n = (barrier_p) malloc(sizeof(*n))) == NULL )
        return ENOMEM;

    if ( (rv = pthread_mutex_init(&n->lock, NULL)) != 0 ) {
        free(n);        /* do not leak the barrier on failure */
        return rv;
    }

    n->magic = VALID_BARRIER;
    n->counter = 0;
    n->release = 0;

    *br = n;
    return 0;
}

/*
 * my_barrier
 *   barrier wait routine.
 *
 * arguments
 *   br - barrier to wait on
 *   n  - number of threads to wait on (must be at least 1)
 *
 * return
 *   0      - success
 *   EINVAL - invalid arguments
 */
int
my_barrier(barrier_p br, int n)
{
    int rv;
    int key;

    if ( br == NULL || br->magic != VALID_BARRIER )
        return EINVAL;

    /* with n < 1 no arrival could ever be "the last one" */
    if ( n <= 0 )
        return EINVAL;

    pthread_mutex_lock(&br->lock);

    key = br->release;      /* get release flag */
    rv = br->counter++;     /* fetch and inc shared counter */

    /*
     * See if we are the last thread into the barrier
     */
    if ( rv == n-1 ) {
        /*
         * We are the last thread, so clear the counter
         * and signal the other threads by changing the
         * release flag.
         */
        br->counter = 0;
        ++br->release;
        pthread_mutex_unlock(&br->lock);
    } else {
        pthread_mutex_unlock(&br->lock);

        /*
         * We are not the last thread, so wait
         * until the release flag changes.
         */
        while( key == br->release )
            /* spin */;
    }

    return 0;
}

/*
 * my_barrier_destroy
 *   destroy a barrier and release its memory
 *
 * arguments
 *   b - barrier to destroy; *b is set to NULL on success
 *
 * return
 *   0   - success
 *   > 0 - error code for why can not destroy barrier
 */
int
my_barrier_destroy(barrier_p *b)
{
    barrier_p br = (barrier_p) *b;
    int rv;

    if ( br == NULL || br->magic != VALID_BARRIER )
        return EINVAL;

    if ( (rv = pthread_mutex_destroy(&br->lock)) != 0 )
        return rv;

    br->magic = INVALID_BARRIER;
    br->counter = 0;
    br->release = 0;

    /* my_barrier_init allocated the barrier, so release it here */
    free(br);

    *b = NULL;
    return 0;
}
 |
Mutexes (
binary semaphores) allow threads to control access to shared data
and resources. The CPSlib mutex functions map directly to existing
pthread mutex functions as shown in Table A-1 “CPSlib library functions to pthreads mapping”. The example below, pth_mutex.c,
shows a basic pthread mutex program using the pthread_mutex_init,
pthread_mutex_lock, pthread_mutex_unlock, and pthread_mutex_destroy functions. There are some differences between the behavior of CPSlib
mutex functions and low-level locks (
cache semaphores and
memory semaphores) and the behavior of
pthread mutex functions, as described below: CPS cache and memory semaphores do not perform deadlock detection. The default pthread mutex does not perform deadlock
detection under HP-UX. This may be different from other operating
systems. pthread_mutex_lock will only detect
deadlock if the mutex is of the type PTHREAD_MUTEX_ERRORCHECK. All of the
CPSlib unlock routines allow other threads to release a lock that
they do not own. This is not true with pthread_mutex_unlock. If you do this with pthread_mutex_unlock,
it will result in undesirable behavior.
 |
/*
 * pth_mutex.c
 *   Demonstrate pthread mutex calls.
 *
 * Notes when switching from cps mutex, cache semaphore or
 * memory semaphores to pthread mutex:
 *
 *   1) Cps cache and memory semaphores did no checking.
 *   2) All of the cps semaphore unlock routines allow
 *      other threads to release a lock that they do not
 *      own. This is not the case with
 *      pthread_mutex_unlock. It is either an error or
 *      undefined behavior.
 *   3) The default pthread mutex does not do deadlock
 *      detection under HP-UX (this can be different on
 *      other operating systems).
 */
#ifndef _HPUX_SOURCE
#define _HPUX_SOURCE
#endif

#include <pthread.h>
#include <errno.h>
#include <stdio.h>      /* perror, printf */
#include <stdlib.h>     /* abort, exit */

pthread_mutex_t counter_lock;   /* protects counter */
int volatile counter = 0;

/*
 * tfunc
 *   Increment the shared counter while holding counter_lock.
 */
void
tfunc(void)
{
    (void) pthread_mutex_lock(&counter_lock);
    ++counter;
    (void) pthread_mutex_unlock(&counter_lock);
}

/*
 * tfunc_start
 *   Start routine with the exact signature pthread_create expects;
 *   avoids calling tfunc through a cast function pointer.
 */
static void *
tfunc_start(void *arg)
{
    (void) arg;
    tfunc();
    return NULL;
}

/*
 * main
 *   Run tfunc once in a child thread and once in the parent, then
 *   verify that the counter was incremented exactly twice.
 */
int
main(void)
{
    pthread_t tid;

    if ( (errno = pthread_mutex_init(&counter_lock, NULL)) != 0 ) {
        perror("pth_mutex: pthread_mutex_init failed");
        abort();
    }

    if ( (errno = pthread_create(&tid, NULL, tfunc_start, NULL)) != 0 ) {
        perror("pth_mutex: pthread_create failed");
        abort();
    }

    tfunc();

    (void) pthread_join(tid, NULL);

    if ( (errno = pthread_mutex_destroy(&counter_lock)) != 0 ) {
        perror("pth_mutex: pthread_mutex_destroy failed");
        abort();
    }

    if ( counter != 2 ) {
        errno = EINVAL;
        perror("pth_mutex: counter value is wrong");
        abort();
    }

    printf("PASSED\n");
    exit(0);
}
 |
Synchronization using low-level functions |  |
This section demonstrates how to use semaphores to synchronize
symmetrically parallel code. This includes functions, such as low-level
locks, for which there are pthread mappings, and low-level counter
semaphores for which there are no pthread mappings. In this instance,
an example is provided so that you can create a program to emulate
CPSlib functions, using pthreads. Low-level counter semaphores: The CPSlib
[mc]_init32 routines allocate and set the low-level
CPSlib semaphores to be used as counters. There are no pthread mappings
for these functions. However, a pthread example is provided below. This example, fetch_and_inc.c,
documents the following tasks: my_init
allocates a counter semaphore and initializes the counter associated
with it (p) to a value. my_fetch_and_clear
returns the current value of the counter associated with the semaphore
and clears the counter. my_fetch_and_inc increments
the value of the counter associated with the semaphore and returns
the old value. my_fetch_and_dec decrements
the value of the counter associated with the semaphore and returns
the old value. my_fetch_and_add
adds a value (int val) to the counter associated with the semaphore
and returns the old value of the integer. my_fetch_and_set
returns the current value of the counter associated with the semaphore,
and sets the semaphore to the new value contained in int val.
The [mc]_init32 routines
allocate and set the low-level cps semaphores to be used as either
counters or locks. An example for counters provides pthread implementation
in the place of the following CPSlib functions:
 |
/*
 * fetch_and_inc
 *   How to support fetch_and_inc type semaphores using pthreads
 *
 */
#ifndef _HPUX_SOURCE
#define _HPUX_SOURCE
#endif

#include <pthread.h>
#include <errno.h>
#include <stdlib.h>     /* malloc, free */

/*
 * fetch_and_inc
 *   value  counter shared between threads
 *   lock   mutex that serializes every access to value
 */
struct fetch_and_inc {
    int volatile value;
    pthread_mutex_t lock;
};

typedef struct fetch_and_inc fetch_and_inc_t;
typedef struct fetch_and_inc *fetch_and_inc_p;

/*
 * my_init
 *   Allocate a counter semaphore and set its counter to val.
 *
 * arguments
 *   counter - receives the newly allocated counter semaphore
 *   val     - initial value of the counter
 *
 * return
 *   0  - success
 *   >0 - ENOMEM if allocation fails, or the error returned by
 *        pthread_mutex_init; *counter is left untouched.
 */
int
my_init(fetch_and_inc_p *counter, int val)
{
    fetch_and_inc_p p;
    int rv;

    if ( (p = (fetch_and_inc_p) malloc(sizeof(*p))) == NULL )
        return ENOMEM;

    if ( (rv = pthread_mutex_init(&p->lock, NULL)) != 0 ) {
        free(p);        /* do not leak the counter on failure */
        return rv;
    }

    p->value = val;
    *counter = p;

    return 0;
}

/*
 * my_fetch
 *   Return the current value of the counter.
 */
int
my_fetch(fetch_and_inc_p counter)
{
    int rv;

    pthread_mutex_lock(&counter->lock);
    rv = counter->value;
    pthread_mutex_unlock(&counter->lock);

    return rv;
}

/*
 * my_fetch_and_clear
 *   Set the counter to zero and return its previous value.
 */
int
my_fetch_and_clear(fetch_and_inc_p counter)
{
    int rv;

    pthread_mutex_lock(&counter->lock);
    rv = counter->value;
    counter->value = 0;
    pthread_mutex_unlock(&counter->lock);

    return rv;
}

/*
 * my_fetch_and_inc
 *   Increment the counter and return its previous value.
 */
int
my_fetch_and_inc(fetch_and_inc_p counter)
{
    int rv;

    pthread_mutex_lock(&counter->lock);
    rv = counter->value++;
    pthread_mutex_unlock(&counter->lock);

    return rv;
}

/*
 * my_fetch_and_dec
 *   Decrement the counter and return its previous value.
 */
int
my_fetch_and_dec(fetch_and_inc_p counter)
{
    int rv;

    pthread_mutex_lock(&counter->lock);
    rv = counter->value--;
    pthread_mutex_unlock(&counter->lock);

    return rv;
}

/*
 * my_fetch_and_add
 *   Add val to the counter and return its previous value.
 */
int
my_fetch_and_add(fetch_and_inc_p counter, int val)
{
    int rv;

    pthread_mutex_lock(&counter->lock);
    rv = counter->value;
    counter->value += val;
    pthread_mutex_unlock(&counter->lock);

    return rv;
}

/*
 * my_fetch_and_set
 *   Set the counter to val and return its previous value.
 */
int
my_fetch_and_set(fetch_and_inc_p counter, int val)
{
    int rv;

    pthread_mutex_lock(&counter->lock);
    rv = counter->value;
    counter->value = val;
    pthread_mutex_unlock(&counter->lock);

    return rv;
}
 |
|