diff --git a/src/mainParallel.c b/src/mainParallel.c index aa360cd..43e5cb6 100644 --- a/src/mainParallel.c +++ b/src/mainParallel.c @@ -83,7 +83,7 @@ The fact that you are presently reading this means that you have had knowledge o #define min(a,b) (a>=b?b:a) #define MAX_CHAR_SIZE 2048 #define MAX_CHR_NAME_SIZE 200 -#define SMALL_STACK (1024*1024*512) +#define SMALL_STACK (1024*1024) #define BIG_STACK (1024*1024*512) #ifdef TIMING @@ -213,6 +213,8 @@ int main(int argc, char *argv[]) { char *p, *q, *s, *e; int fd_in1; + int ret_code_1 = 0; + int goff_idx = 0; int proc_num, rank_num; int res, count; int files, nargs; @@ -553,7 +555,7 @@ int main(int argc, char *argv[]) { size_t size_chunk_2; size_t total_local_reads_aligned= 0; size_t total_reads_check = 0; - + size_t total_num_reads = 0; size_t grand_total_num_reads = 0; size_t *begin_offset_chunk = NULL; @@ -570,7 +572,7 @@ int main(int argc, char *argv[]) { pthread_t threads[NUM_THREADS]; - + pthread_attr_t attr; //MPI_Info finfo; //MPI_Info_create(&finfo); @@ -740,7 +742,7 @@ int main(int argc, char *argv[]) { * Rank 0 estimate the size of a read * */ - int blen; + int blen = 0; off_t tmp_sz = 1024; if (rank_num == 0){ // 512 Mo are @@ -763,16 +765,18 @@ int main(int argc, char *argv[]) { blen = q - p; p = ++q; /* Split local buffer in chunks of 100000000 bases */ free(buffer); + close(fd_tmp); } - + //Rank O broadcast the size of a read - res = MPI_Bcast(&blen, 1, MPI_LONG_LONG_INT, 0, MPI_COMM_WORLD); + res = MPI_Bcast(&blen, 1, MPI_INT, 0, MPI_COMM_WORLD); assert(res == MPI_SUCCESS); + //fprintf(stderr, "rank %d ::: blen = %d \n", rank_num, blen); + /* * Split sequence files in chunks */ - assert(fd_in1 != -1); size_t *goff = NULL; //global offset contain the start offset in the fastq @@ -787,7 +791,6 @@ int main(int argc, char *argv[]) { int i12=0; for ( i12 = 0; i12 < proc_num * NUM_THREADS_1 + 1; i12++ ) { - //fprintf(stderr, "rank %d :: goff[%d] = %zu \n", rank_num, i12, goff[i12] ); goff_inter[i12] = goff[i12]; } @@ -796,122 +799,104 @@ int main(int argc, char *argv[]) { res = MPI_Allgather(&goff_inter[rank_num*NUM_THREADS_1], NUM_THREADS_1, MPI_LONG_LONG_INT, goff , NUM_THREADS_1, MPI_LONG_LONG_INT, MPI_COMM_WORLD); assert(res == MPI_SUCCESS); - + MPI_Barrier(MPI_COMM_WORLD); // fprintf(stderr, "rank %d :: after mpigatherall \n", rank_num ); free(goff_inter); - //we compute the new size according to the shift - //We calculate the size to read for each process - - MPI_Barrier(MPI_COMM_WORLD); - - size_t local_num_reads = 0; - size_t total_num_reads = 0; - size_t *local_read_offsets = NULL;// calloc(1 , sizeof(size_t)); - size_t *local_read_bytes = NULL;//calloc(1, sizeof(size_t)); - int *local_read_size = NULL;//calloc(1, sizeof(int)); - - //assert( local_read_bytes != NULL); - //assert( local_read_offsets != NULL); - //assert( local_read_size != NULL); - bef = MPI_Wtime(); + //find number of reads + size_t *local_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); + size_t *total_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, SMALL_STACK); - pthread_attr_setdetachstate(&attr, 0); + struct struct_data_thread_1 *td_1 = malloc(NUM_THREADS_1 * sizeof(struct struct_data_thread_1)); - pthread_t threads_1[NUM_THREADS_1]; + pthread_attr_t attr1; + pthread_attr_init(&attr1); + pthread_attr_setstacksize(&attr1, BIG_STACK); + pthread_attr_setdetachstate(&attr1, 0); + pthread_t threads_1[NUM_THREADS_1]; + + bef = MPI_Wtime(); + for ( n = 0; n < NUM_THREADS_1; n++){ - struct struct_data_thread_1 *td_1 = malloc(NUM_THREADS_1 * sizeof(struct struct_data_thread_1)); + goff_idx = (rank_num * NUM_THREADS_1) + n; + td_1[n].offset_in_file_mt = goff[goff_idx]; + td_1[n].size2read_mt = goff[goff_idx + 1] - goff[goff_idx]; + td_1[n].file_r1_mt = file_r1; + td_1[n].local_num_reads_mt = &local_num_reads_t[n]; + td_1[n].total_num_reads_mt = &total_num_reads_t[n]; + td_1[n].proc_num_mt = proc_num; + td_1[n].rank_num_mt = rank_num; + td_1[n].thread_num_mt = n; + ret_code_1 = pthread_create(&threads_1[n], &attr1, find_reads_number_mt, (void *)(&td_1[n])); + assert(ret_code_1 == 0); + } - size_t *local_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); - size_t *total_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); - size_t **local_read_offsets_t = calloc(NUM_THREADS_1, sizeof(size_t*)); - size_t **local_read_bytes_t = calloc(NUM_THREADS_1, sizeof(size_t*)); - int **local_read_size_t = calloc(NUM_THREADS_1, sizeof(int*)); - - int goff_idx = 0; - for ( n = 0; n < NUM_THREADS_1; n++){ - - goff_idx = (rank_num * NUM_THREADS_1) + n; - td_1[n].offset_in_file_mt = goff[goff_idx]; - td_1[n].size2read_mt = goff[goff_idx + 1] - goff[goff_idx]; - td_1[n].file_r1_mt = file_r1; - td_1[n].local_num_reads_mt = &local_num_reads_t[n]; - td_1[n].total_num_reads_mt = &total_num_reads_t[n]; - td_1[n].local_read_offsets_mt = &local_read_offsets_t[n]; - td_1[n].local_read_size_mt = &local_read_size_t[n]; - td_1[n].local_read_bytes_mt = &local_read_bytes_t[n]; - td_1[n].proc_num_mt = proc_num; - td_1[n].rank_num_mt = rank_num; - td_1[n].thread_num_mt = n; - td_1[n].previous_read_num = 0; - pthread_create(&threads_1[n], &attr, find_reads_size_and_offsets_mt, (void *)(&td_1[n])); + for (n = 0; n < NUM_THREADS_1; n++){ + pthread_join(threads_1[n], (void *)(&td_1[n])); + total_num_reads += *(td_1[n].total_num_reads_mt); + } + aft = MPI_Wtime(); - } - - - total_num_reads = 0; - for (n = 0; n < NUM_THREADS_1; n++){ - pthread_join(threads_1[n], (void *)(&td_1[n])); - total_num_reads += *(td_1[n].total_num_reads_mt); - } + size_t *local_read_offsets = calloc(total_num_reads , sizeof(size_t)); + size_t *local_read_bytes = calloc(total_num_reads, sizeof(size_t)); + int *local_read_size = calloc(total_num_reads, sizeof(int)); - local_read_offsets = calloc(total_num_reads, sizeof(size_t)); - local_read_size = calloc(total_num_reads, sizeof(int)); - local_read_bytes = calloc(total_num_reads, sizeof(size_t)); + assert( local_read_bytes != NULL); + assert( local_read_offsets != NULL); + assert( local_read_size != NULL); - assert(local_read_offsets); - assert(local_read_size); - assert(local_read_bytes); - size_t tmp_var = 0; - for (n = 0; n < NUM_THREADS_1; n++){ - td_1[n].local_read_offsets = local_read_offsets; - td_1[n].local_read_size = local_read_size; - td_1[n].local_read_bytes = local_read_bytes; - td_1[n].previous_read_num = tmp_var; - tmp_var += *(td_1[n].total_num_reads_mt); + //fprintf(stderr, "%s: rank %d after find_number_of_reads_mt total_num_reads = %zu in %f s\n", __func__, rank_num, total_num_reads, aft-bef ); - pthread_create(&threads_1[n], &attr, copy_local_read_info_mt, (void *)(&td_1[n])); + MPI_Barrier(MPI_COMM_WORLD); + //find offset and size of reads - } + size_t local_num_reads = 0; + //size_t total_num_reads = 0; + size_t u1 = 0; - for (n = 0; n < NUM_THREADS_1; n++){ - pthread_join(threads_1[n], (void *)(&td_1[n])); + bef = MPI_Wtime(); - free(local_read_offsets_t[n]); - free(local_read_bytes_t[n]); - free(local_read_size_t[n]); - } + pthread_attr_t attr2; + pthread_attr_init(&attr2); + pthread_attr_setstacksize(&attr2, BIG_STACK); + pthread_attr_setdetachstate(&attr2, 0); + pthread_t threads_2[NUM_THREADS_1]; - - free(local_num_reads_t); - free(total_num_reads_t); + struct struct_data_thread_2 *td_2 = malloc(NUM_THREADS_1 * sizeof(struct struct_data_thread_2)); - free(local_read_offsets_t); - free(local_read_bytes_t); - free(local_read_size_t); + size_t tmp_var = 0; - pthread_attr_destroy(&attr); - free(td_1); + for ( n = 0; n < NUM_THREADS_1; n++){ - - /* - find_reads_size_and_offsets(goff[ind], - siz2read, - file_r1, - &local_num_reads, - &total_num_reads, - &local_read_offsets, - &local_read_size, - &local_read_bytes, - proc_num, - rank_num); - */ + goff_idx = (rank_num * NUM_THREADS_1) + n; + td_2[n].offset_in_file_mt = goff[goff_idx]; + td_2[n].size2read_mt = goff[goff_idx + 1] - goff[goff_idx]; + td_2[n].file_r1_mt = file_r1; + td_2[n].proc_num_mt = proc_num; + td_2[n].rank_num_mt = rank_num; + td_2[n].thread_num_mt = n; + td_2[n].previous_read_num = tmp_var; + td_2[n].local_read_offsets = local_read_offsets; + td_2[n].local_read_size = local_read_size; + td_2[n].local_read_bytes = local_read_bytes; + td_2[n].total_num_read = *(td_1[n].total_num_reads_mt); + tmp_var += *(td_1[n].total_num_reads_mt); + + ret_code_1 = pthread_create(&threads_2[n], &attr2, find_reads_size_and_offsets_mt, (void *)(&td_2[n])); + assert(ret_code_1 == 0); + } + //total_num_reads = 0; + for (n = 0; n < NUM_THREADS_1; n++){ + pthread_join(threads_2[n], (void *)(&td_2[n])); + } + + pthread_attr_destroy(&attr1); + pthread_attr_destroy(&attr2); + free(td_1); + free(td_2); MPI_Barrier(MPI_COMM_WORLD); aft = MPI_Wtime(); @@ -941,6 +926,7 @@ int main(int argc, char *argv[]) { chunck_num += 2; //the last chunk hold the remain bases size_t h=0; + for ( h = 0; h < total_num_reads; h++){ assert( local_read_size[h] == blen ); assert( local_read_offsets[h] >= 0 ); @@ -1098,7 +1084,7 @@ int main(int argc, char *argv[]) { res = MPI_Barrier(MPI_COMM_WORLD); assert(res == MPI_SUCCESS); aft = MPI_Wtime(); - xfprintf(stderr, "%s: synched processes (%.02f)\n", __func__, aft - bef); + //xfprintf(stderr, "%s: synched processes (%.02f)\n", __func__, aft - bef); res = MPI_File_open(MPI_COMM_WORLD, file_out_ext, MPI_MODE_WRONLY|MPI_MODE_APPEND, MPI_INFO_NULL, &fh_out); @@ -1155,6 +1141,8 @@ int main(int argc, char *argv[]) { before_local_mapping = MPI_Wtime(); //we loop the chunck_count //size_t u1 = rank_num; + fprintf(stderr, "rank %d ::: total_chunks = %zu \n",rank_num, total_chunks); + while ( u1 < total_chunks){ offset_chunk = all_begin_offset_chunk[u1]; @@ -1175,15 +1163,15 @@ int main(int argc, char *argv[]) { buffer_r2[size_chunk]=0; struct struct_pread_fastq *td_pread1; - td_pread1 = malloc (NUM_THREADS * sizeof(struct struct_pread_fastq)); + td_pread1 = malloc (NUM_THREADS_1 * sizeof(struct struct_pread_fastq)); bef = MPI_Wtime(); pthread_attr_t attr4; pthread_attr_init(&attr4); pthread_attr_setstacksize(&attr4, BIG_STACK); pthread_attr_setdetachstate(&attr4, 0); - for( n = 0; n < NUM_THREADS; n++ ){ - td_pread1[n].total_thread = NUM_THREADS; + for( n = 0; n < NUM_THREADS_1; n++ ){ + td_pread1[n].total_thread = NUM_THREADS_1; td_pread1[n].thread_id = n; td_pread1[n].job_rank = rank_num; td_pread1[n].offset= offset_chunk; @@ -1195,7 +1183,7 @@ int main(int argc, char *argv[]) { fprintf(stderr, "ERROR; return code from pthread_create() is %d\n", ret_code); } } - for(n=0; nchunk_size=10000000; @@ -2026,6 +2014,8 @@ int main(int argc, char *argv[]) { if (local_read_offsets) free(local_read_offsets); if (local_read_size) free(local_read_size); + if (local_read_bytes) free(local_read_bytes); + if (local_read_bytes_2) free(local_read_bytes_2); if (local_read_offsets_2) free(local_read_offsets_2); if (local_read_size_2) free(local_read_size_2); @@ -2539,6 +2529,7 @@ int main(int argc, char *argv[]) { struct thread_data *td; td = malloc (NUM_THREADS * sizeof(struct thread_data)); bef = MPI_Wtime(); + pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, SMALL_STACK); pthread_attr_setdetachstate(&attr, 0); @@ -2784,6 +2775,7 @@ int main(int argc, char *argv[]) { struct thread_data_compress_by_chr *tdc; tdc = malloc (NUM_THREADS * sizeof(struct thread_data_compress_by_chr)); bef = MPI_Wtime(); + pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, SMALL_STACK); pthread_attr_setdetachstate(&attr, 0); @@ -2917,37 +2909,19 @@ int main(int argc, char *argv[]) { ///find offsets and sizes for the first file bef = MPI_Wtime(); - /* - find_reads_size_and_offsets(goff[ind], - siz2read, - file_r1, - &local_num_reads, - &total_num_reads, - &local_read_offsets, - &local_read_size, - &local_read_bytes, - proc_num, - rank_num); - */ + + size_t *local_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); + size_t *total_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, SMALL_STACK); - pthread_attr_setdetachstate(&attr, 0); + struct struct_data_thread_1 *td_1 = malloc(NUM_THREADS_1 * sizeof(struct struct_data_thread_1)); + pthread_attr_t attr1; + pthread_attr_init(&attr1); + pthread_attr_setstacksize(&attr1, BIG_STACK); + pthread_attr_setdetachstate(&attr1, 0); pthread_t threads_1[NUM_THREADS_1]; - struct struct_data_thread_1 *td_1; - - size_t *local_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); - size_t *total_num_reads_t = calloc(NUM_THREADS_1, sizeof(size_t)); - size_t **local_read_offsets_t = calloc(NUM_THREADS_1, sizeof(size_t*)); - size_t **local_read_bytes_t = calloc(NUM_THREADS_1, sizeof(size_t*)); - int **local_read_size_t = calloc(NUM_THREADS_1, sizeof(int*)); - - td_1 = malloc(NUM_THREADS_1 * sizeof( struct struct_data_thread_1)); - - int goff_idx = 0; + bef = MPI_Wtime(); for ( n = 0; n < NUM_THREADS_1; n++){ goff_idx = (rank_num * NUM_THREADS_1) + n; @@ -2956,59 +2930,68 @@ int main(int argc, char *argv[]) { td_1[n].file_r1_mt = file_r1; td_1[n].local_num_reads_mt = &local_num_reads_t[n]; td_1[n].total_num_reads_mt = &total_num_reads_t[n]; - td_1[n].local_read_offsets_mt = &local_read_offsets_t[n]; - td_1[n].local_read_size_mt = &local_read_size_t[n]; - td_1[n].local_read_bytes_mt = &local_read_bytes_t[n]; td_1[n].proc_num_mt = proc_num; td_1[n].rank_num_mt = rank_num; td_1[n].thread_num_mt = n; - td_1[n].previous_read_num = 0; - pthread_create(&threads_1[n], &attr, find_reads_size_and_offsets_mt, (void *)(&td_1[n])); - + ret_code_1 = pthread_create(&threads_1[n], &attr1, find_reads_number_mt, (void *)(&td_1[n])); + assert(ret_code_1 == 0); } - total_num_reads = 0; - for (n = 0; n < NUM_THREADS_1; n++){ + + for (n = 0; n < NUM_THREADS_1; n++){ pthread_join(threads_1[n], (void *)(&td_1[n])); total_num_reads += *(td_1[n].total_num_reads_mt); } + + local_read_offsets = calloc(total_num_reads , sizeof(size_t)); + local_read_bytes = calloc(total_num_reads, sizeof(size_t)); + local_read_size = calloc(total_num_reads, sizeof(int)); + + assert( local_read_bytes != NULL); + assert( local_read_offsets != NULL); + assert( local_read_size != NULL); - local_read_offsets = calloc(total_num_reads, sizeof(size_t)); - local_read_size = calloc(total_num_reads, sizeof(int)); - local_read_bytes = calloc(total_num_reads, sizeof(size_t)); + pthread_attr_t attr2; + pthread_attr_init(&attr2); + pthread_attr_setstacksize(&attr2, BIG_STACK); + pthread_attr_setdetachstate(&attr2, 0); + pthread_t threads_2[NUM_THREADS_1]; - assert(local_read_offsets); - assert(local_read_size); - assert(local_read_bytes); + struct struct_data_thread_2 *td_2 = malloc(NUM_THREADS_1 * sizeof(struct struct_data_thread_2)); size_t tmp_var = 0; - for (n = 0; n < NUM_THREADS_1; n++){ - td_1[n].local_read_offsets = local_read_offsets; - td_1[n].local_read_size = local_read_size; - td_1[n].local_read_bytes = local_read_bytes; - td_1[n].previous_read_num = tmp_var; - tmp_var += *(td_1[n].total_num_reads_mt); - pthread_create(&threads_1[n], &attr, copy_local_read_info_mt, (void *)(&td_1[n])); + for ( n = 0; n < NUM_THREADS_1; n++){ + + goff_idx = (rank_num * NUM_THREADS_1) + n; + td_2[n].offset_in_file_mt = goff[goff_idx]; + td_2[n].size2read_mt = goff[goff_idx + 1] - goff[goff_idx]; + td_2[n].file_r1_mt = file_r1; + td_2[n].proc_num_mt = proc_num; + td_2[n].rank_num_mt = rank_num; + td_2[n].thread_num_mt = n; + td_2[n].previous_read_num = tmp_var; + td_2[n].local_read_offsets = local_read_offsets; + td_2[n].local_read_size = local_read_size; + td_2[n].local_read_bytes = local_read_bytes; + td_2[n].total_num_read = *(td_1[n].total_num_reads_mt); + tmp_var += *(td_1[n].total_num_reads_mt); + + ret_code_1 = pthread_create(&threads_2[n], &attr2, find_reads_size_and_offsets_mt, (void *)(&td_2[n])); + assert(ret_code_1 == 0); } + //total_num_reads = 0; for (n = 0; n < NUM_THREADS_1; n++){ - pthread_join(threads_1[n], (void *)(&td_1[n])); - - free(local_read_offsets_t[n]); - free(local_read_bytes_t[n]); - free(local_read_size_t[n]); + pthread_join(threads_2[n], (void *)(&td_2[n])); } - free(local_num_reads_t); - free(total_num_reads_t); - - free(local_read_offsets_t); - free(local_read_bytes_t); - free(local_read_size_t); + MPI_Barrier(MPI_COMM_WORLD); - pthread_attr_destroy(&attr); + pthread_attr_destroy(&attr1); + pthread_attr_destroy(&attr2); free(td_1); + free(td_2); aft = MPI_Wtime(); fprintf(stderr, "%s: rank %d num reads parsed: %zu ::: time spend reading and parsing entire buffer = (%.02f) \n", __func__, rank_num, total_num_reads, aft - bef); @@ -3518,7 +3501,7 @@ int main(int argc, char *argv[]) { int ret_code = 0; struct thread_data_compress_by_chr_single *tdc; tdc = malloc (NUM_THREADS * sizeof(struct thread_data_compress_by_chr_single)); - + pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, SMALL_STACK); pthread_attr_setdetachstate(&attr, 0); @@ -3567,7 +3550,7 @@ int main(int argc, char *argv[]) { int ret_code = 0; struct thread_data_compress_by_chr_single *tdc; tdc = malloc (NUM_THREADS * sizeof(struct thread_data_compress_by_chr_single)); - + pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, SMALL_STACK); pthread_attr_setdetachstate(&attr, 0); diff --git a/src/parallel_aux.c b/src/parallel_aux.c index 7b0d8f2..3e8fefb 100644 --- a/src/parallel_aux.c +++ b/src/parallel_aux.c @@ -45,7 +45,7 @@ The fact that you are presently reading this means that you have had knowledge o #include "bgzf.h" #include "bgzf.c" -#define DEFAULT_INBUF_SIZE (1024*1024*512) +#define DEFAULT_INBUF_SIZE (1024*1024*1024) #define NB_PROC "16" //numer of threads for writing #define CB_NODES "2" //numer of server for writing #define CB_BLOCK_SIZE "268435456" /* 256 MBytes - should match FS block size */ @@ -55,7 +55,7 @@ The fact that you are presently reading this means that you have had knowledge o #define MAX_CHAR_SIZE 2048 #define MAX_CHR_NAME_SIZE 200 #define SMALL_STACK (1024*1024) - +#define BIG_STACK (1024*1024*512) #ifdef TIMING #define xfprintf fprintf @@ -142,6 +142,113 @@ void init_goff(size_t *goff, MPI_File mpi_filed, size_t fsize,int numproc,int ra return; } +void *find_reads_number_mt(void *thread_arg){ + + #define _read_token_ret(_p) (_p); do { char *tab = strchr((_p), '\n'); if (!tab) goto err_ret; (_p) = tab + 1; } while (0) + struct struct_data_thread_1 *my_data; + my_data = (struct struct_data_thread_1 *) thread_arg; + + size_t offset_in_file = my_data->offset_in_file_mt; + size_t siz2read = my_data->size2read_mt; + char *file_to_read = my_data->file_r1_mt; + size_t *p_local_num_reads = my_data->local_num_reads_mt; + size_t *p_total_num_reads = my_data->total_num_reads_mt; + int proc_num = my_data->proc_num_mt; + int rank_num = my_data->rank_num_mt; + int thread_num = my_data->thread_num_mt; + + MPI_File mpi_fd; + MPI_Status status; + int count; + //int fd; + int res; + res = MPI_File_open(MPI_COMM_SELF, file_to_read, MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fd); + assert(res==MPI_SUCCESS); + char *b, *r, *t, *e, *p, *q; + size_t total_computed = 0; + size_t offset_end_buff; + size_t pos_in_vect = 0; + size_t lines = 0; + size_t total_parsing = 0; + size_t g=0; + + + //preallocation + char *buffer_r = malloc(DEFAULT_INBUF_SIZE + 1); + assert( buffer_r != NULL ); + buffer_r[DEFAULT_INBUF_SIZE] = '0'; + + + *p_total_num_reads = 0; + size_t read_buffer_sz = 0; + if ( siz2read < DEFAULT_INBUF_SIZE ) read_buffer_sz = siz2read; + else read_buffer_sz = DEFAULT_INBUF_SIZE; + double bef,aft; + + while (1){ + + //bef = MPI_Wtime(); + res = MPI_File_read_at(mpi_fd, (MPI_Offset)offset_in_file, buffer_r, read_buffer_sz, MPI_CHAR, &status); + assert(res == MPI_SUCCESS); + res = MPI_Get_count(&status, MPI_CHAR, &count); + assert(res == MPI_SUCCESS); + assert(*buffer_r == '@'); + //aft = MPI_Wtime(); + //fprintf(stderr, "%s: rank %d thread %d time to read a chunck %f\n", __func__, rank_num, thread_num, aft - bef); + + //bef = MPI_Wtime(); + b = buffer_r; + r = b + read_buffer_sz; + + if ( read_buffer_sz == DEFAULT_INBUF_SIZE){ + while (r-- != b){if (*r == '\n' && *(r+1) == '+') {r--; break;}} + while (r-- != b){if (*r == '\n') break;} + while (r-- != b){if (*r == '@') break;} + r--; + offset_end_buff = (r - b); + } + else + offset_end_buff = (r - b); + + t = buffer_r ; + e = buffer_r + offset_end_buff; + lines = 0; + p = buffer_r; + q = buffer_r; + total_computed = 0; + + while (p < e) { q = _read_token_ret(p); lines++; q = p; } + + *p_local_num_reads = (lines/4); + *p_total_num_reads += *p_local_num_reads; + + + total_parsing += offset_end_buff; + if (total_parsing == siz2read) { break;} + if ((siz2read - total_parsing) < DEFAULT_INBUF_SIZE) + read_buffer_sz = siz2read - total_parsing; + else read_buffer_sz = DEFAULT_INBUF_SIZE; + + offset_in_file += offset_end_buff + 1; + //aft = MPI_Wtime(); + //fprintf(stderr, "%s: rank %d thread %d time to analyze a chunck %f\n", __func__, rank_num, thread_num, aft - bef); + } + + free(buffer_r); + assert(total_parsing == siz2read); + MPI_File_close(&mpi_fd); + + return (void *)0; + + err_ret: + return (void *)-2; + +} + + + + + ///Function used to find the starting reading offset in a given file according to the number of processes used. //Parameters (ordered): pointer on the vector that will save these offsets // the size of the given file @@ -162,13 +269,13 @@ void find_process_starting_offset_mt(size_t *goff, size_t size, char* file_to_re assert(res==MPI_SUCCESS); ///other resources - off_t tmp_sz = 2048; //size of the sample from the file, big enough to contain a full read + off_t tmp_sz = 1024; //size of the sample from the file, big enough to contain a full read char *buffer_r0 = malloc( tmp_sz + 1); //buffer used to save the sample assert(buffer_r0); buffer_r0[tmp_sz] = '\0'; size_t lsize = size/(proc_num * nthreads); //proportion of the file 1 process should read int i; //used as an iterator - char *p, *e; //pointers on the buffer ot find the start of a read + char *p, *e, *q; //pointers on the buffer ot find the start of a read ///define the arbitrary offsets goff[0]=0; @@ -180,50 +287,39 @@ void find_process_starting_offset_mt(size_t *goff, size_t size, char* file_to_re while ( k < nthreads){ - - res = MPI_File_read_at(mpi_fd, (MPI_Offset)goff[first_index + k], buffer_r0, tmp_sz, MPI_CHAR, &status); //read the wanted part of the file nd save it into the buffer + res = MPI_File_read_at(mpi_fd, (MPI_Offset)goff[first_index + k], buffer_r0, tmp_sz, MPI_CHAR, &status); assert(res == MPI_SUCCESS); p = buffer_r0; e = buffer_r0 + tmp_sz; - //browse the buffer to find the beginning of the next read - - /* - while (p < e) { - if (*p != '+') { p++; continue; } - if (p != buffer_r0 && *(p-1) != '\n') { p++; continue; } - while (p < e && *p != '\n') p++; - p++; - while (p < e && *p != '\n') p++; - p++; - if (p < e && *p == '@') break; - p++; - } - */ - /* - while (p < e) { - if (*p != '@') { p++; continue; } - if (p != buffer_r0 && *(p-1) != '\n') { p++; continue; } - q = p + 1; - while (q < e && *q != '\n') q++; - q++; - while (q < e && *q != '\n') q++; - q++; - if (q < e && *q == '+') break; - p++; - } - */ - while (p++ != e){if (*p == '\n' && *(p+1) == '+' && *(p+2) == '\n') {p++; break;}} - p++; - while (p++ != e){if (*p == '\n') break;} - p++; - - assert(*p == '@'); + /* + while (p++ != e){if (*p == '\n' && *(p+1) == '+' && *(p+2) == '\n') {p++; break;}} + p++; + while (p++ != e){if (*p == '\n') break;} + p++; + assert(*p == '@'); //we update begining offsets with the found value goff[first_index + k] += p - buffer_r0; memset( buffer_r0, 0, tmp_sz * sizeof(char)); k++; + */ + while (p < e) { + if (*p != '@') { p++; continue; } + if (p != buffer_r0 && *(p-1) != '\n') { p++; continue; } + q = p + 1; + while (q < e && *q != '\n') q++; q++; + while (q < e && *q != '\n') q++; q++; + if (q < e && *q == '+') break; + p++; + + } + + assert(*p == '@'); + //we update begining offsets with the found value + goff[first_index + k] += p - buffer_r0; + memset( buffer_r0, 0, tmp_sz * sizeof(char)); + k++; } //fprintf(stderr, "finish finding offset rank =%d \n", rank_num); ///free the resources no longer needed @@ -235,129 +331,153 @@ void find_process_starting_offset_mt(size_t *goff, size_t size, char* file_to_re } void *find_reads_size_and_offsets_mt(void *thread_arg){ + #define _read_token_ret(_p) (_p); do { char *tab = strchr((_p), '\n'); if (!tab) goto err_ret; (_p) = tab + 1; } while (0) + + struct struct_data_thread_2 *my_data; + my_data = (struct struct_data_thread_2 *) thread_arg; + + size_t offset_in_file = my_data->offset_in_file_mt; + size_t siz2read = my_data->size2read_mt; + char *file_to_read = my_data->file_r1_mt; + size_t *local_read_offsets = my_data->local_read_offsets; + int *local_read_size = my_data->local_read_size; + size_t *local_read_bytes = my_data->local_read_bytes; + int proc_num = my_data->proc_num_mt; + int rank_num = my_data->rank_num_mt; + int thread_num = my_data->thread_num_mt; + + MPI_File mpi_fd; + MPI_Status status; + int count; + //int fd; + int res; + res = MPI_File_open(MPI_COMM_SELF, file_to_read, MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fd); + assert(res==MPI_SUCCESS); + //fd = open(file_to_read, O_RDONLY); + + + //char *buffer_r; + char *b, *r, *t, *e, *p, *q, *start_read_p, *end_read_p; + size_t total_computed = 0; + size_t offset_end_buff; + size_t start_read_offset = 0; + size_t end_read_offset = 0; + size_t pos_in_vect = my_data->previous_read_num; + size_t lines = 0; + size_t total_parsing = 0; + size_t g=0; + size_t start_read=0; + + MPI_Datatype arraytype; + MPI_Datatype arraytype0; + MPI_Datatype arraytype_r1; + MPI_Datatype arraytype_r2; + + double bef,aft; + //preallocation + char *buffer_r = malloc(DEFAULT_INBUF_SIZE + 1); + assert( buffer_r != NULL ); + buffer_r[DEFAULT_INBUF_SIZE] = '0'; + size_t read_buffer_sz = 0; + if ( siz2read < DEFAULT_INBUF_SIZE ) read_buffer_sz = siz2read; + else read_buffer_sz = DEFAULT_INBUF_SIZE; + + assert(local_read_offsets); + assert(local_read_size); + assert(local_read_bytes); + + while (1){ + + + //bef = MPI_Wtime(); + + res = MPI_File_read_at(mpi_fd, (MPI_Offset)offset_in_file, buffer_r, read_buffer_sz, MPI_CHAR, &status); + assert(res == MPI_SUCCESS); + res = MPI_Get_count(&status, MPI_CHAR, &count); + assert(res == MPI_SUCCESS); + assert(*buffer_r == '@'); + + /* + aft = MPI_Wtime(); + if (rank_num == 0 && thread_num == 0) + fprintf(stderr, "%s: rank %d thread %d time to read a chunck = %f\n", __func__, rank_num, thread_num, aft - bef); + */ + + b = buffer_r; + r = b + read_buffer_sz; + + if ( read_buffer_sz == DEFAULT_INBUF_SIZE){ + while (r-- != b){if (*r == '\n' && *(r+1) == '+') {r--; break;}} + while (r-- != b){if (*r == '\n') break;} + while (r-- != b){if (*r == '@') break;} + r--; + offset_end_buff = (r - b); + } + else + offset_end_buff = (r - b); - struct struct_data_thread_1 *my_data; - my_data = (struct struct_data_thread_1 *) thread_arg; - - size_t offset_in_file = my_data->offset_in_file_mt; - size_t siz2read = my_data->size2read_mt; - char *file_to_read = my_data->file_r1_mt; - size_t *p_local_num_reads = my_data->local_num_reads_mt; - size_t *p_total_num_reads = my_data->total_num_reads_mt; - size_t **local_read_offsets = my_data->local_read_offsets_mt; - int **local_read_size = my_data->local_read_size_mt; - size_t **local_read_bytes = my_data->local_read_bytes_mt; - //int rank_num = my_data->rank_num_mt; - //int thread_num = my_data->thread_num_mt; + p = buffer_r; + q = buffer_r; + t = buffer_r; + e = buffer_r + offset_end_buff; + int size=0; + start_read_offset = 0; + g = offset_in_file; + + while (p < e){ + + //qname + start_read_p = p; + + q = _read_token_ret(p); + q = p; + //read + q = _read_token_ret(p); + size = p - q - 1; + q = p; + //+ + q = _read_token_ret(p); + q = p; + //qual + q = _read_token_ret(p); + q = p; + end_read_p = p; + + (local_read_offsets)[pos_in_vect] = g; + (local_read_bytes)[pos_in_vect] = (end_read_p - start_read_p); + assert((local_read_bytes)[pos_in_vect] != 0); + (local_read_size)[pos_in_vect] = size; + assert((local_read_size)[pos_in_vect] != 0); + + size = 0; + pos_in_vect++; + g += (end_read_p - start_read_p); + //t++;g++; - MPI_File mpi_fd = NULL; - MPI_Status status; - int res; - res = MPI_File_open(MPI_COMM_WORLD, file_to_read, MPI_MODE_RDONLY, MPI_INFO_NULL, &mpi_fd); - assert(res==MPI_SUCCESS); - char *b, *r, *t, *e, *i; - size_t offset_end_buff; - size_t pos_in_vect = 0; - size_t lines = 0; - size_t lines3 = 0; - int size = 0; - size_t total_parsing = 0; - size_t g=0; - size_t start_read_offset = 0; - *p_total_num_reads = 0; - size_t read_buffer_sz = 0; - if ( siz2read < DEFAULT_INBUF_SIZE ) read_buffer_sz = siz2read; - else read_buffer_sz = DEFAULT_INBUF_SIZE; - char *buffer_r = NULL; - int last_round = 0; - while (1){ - buffer_r = malloc((read_buffer_sz + 1)*sizeof(char)); - assert(buffer_r ); - buffer_r[read_buffer_sz] = '\0'; - res = MPI_File_read_at(mpi_fd, (MPI_Offset)offset_in_file, buffer_r, read_buffer_sz, MPI_CHAR, &status); - assert(res == MPI_SUCCESS); - assert(*buffer_r == '@'); - read_buffer_sz = strlen(buffer_r); - b = buffer_r; - r = buffer_r + read_buffer_sz; - if ( read_buffer_sz == DEFAULT_INBUF_SIZE){ - while (r-- != b){if ((*r == '+') && (*(r+1) == '\n') && (*(r-1) == '\n')) {r--; break;}} - while (r-- != b){if (*r == '@') break;} - r--; - offset_end_buff = (r - b); - } - else{ - last_round = 1; - offset_end_buff = (r - b); - assert( offset_end_buff == read_buffer_sz); - } - t = buffer_r ; - e = buffer_r + offset_end_buff; - lines = 0; - do{ - if (*t == '\n') lines++; - } while (t++ < e); - *p_local_num_reads = (lines/4); - *p_total_num_reads += *p_local_num_reads; - *local_read_size = (int *)realloc(*local_read_size, sizeof(int) * (*p_total_num_reads)); - *local_read_offsets = (size_t *)realloc(*local_read_offsets, sizeof(size_t) * (*p_total_num_reads)); - *local_read_bytes = (size_t *)realloc(*local_read_bytes, sizeof(size_t) * (*p_total_num_reads)); - assert( *local_read_offsets != NULL); - assert( *local_read_size != NULL); - assert( *local_read_bytes != NULL); - i= buffer_r; - lines3 = 0; - size = 0; - start_read_offset = 0; - g = offset_in_file; - do{ - if (lines3 == lines) break; - assert( *i == '@'); - start_read_offset = g; - while (*i != '\n'){ i++; g++;} - i++;g++; lines3++; - while (*i != '\n' ){ size++; i++; g++;} - i++;g++; lines3++; - while (*i != '\n' ){ i++; g++;} - i++;g++; lines3++; - while (*i != '\n' ){ i++; g++;} - lines3++; - (*local_read_offsets)[pos_in_vect] = start_read_offset; - (*local_read_bytes)[pos_in_vect] = (g - start_read_offset) + 1; - assert((*local_read_bytes)[pos_in_vect] != 0); - (*local_read_size)[pos_in_vect] = size; - assert((*local_read_size)[pos_in_vect] != 0); + } - size = 0; - pos_in_vect++; - i++;g++; - }while (i < e); + total_parsing += offset_end_buff; + if (total_parsing == siz2read) { break;} + if ((siz2read - total_parsing) < DEFAULT_INBUF_SIZE) + read_buffer_sz = siz2read - total_parsing; + else read_buffer_sz = DEFAULT_INBUF_SIZE; - assert( lines == lines3 ); - if (!last_round) - total_parsing += offset_end_buff + 1; - else - total_parsing += offset_end_buff; + offset_in_file += offset_end_buff + 1; + } - if (total_parsing == siz2read) {free(buffer_r); break;} - free(buffer_r); - if ((siz2read - total_parsing) < DEFAULT_INBUF_SIZE) - read_buffer_sz = siz2read - total_parsing; - else - read_buffer_sz = DEFAULT_INBUF_SIZE; + free(buffer_r); + assert(total_parsing == siz2read); + MPI_File_close(&mpi_fd); + + return (void *)0; - offset_in_file += offset_end_buff + 1; - } - assert(total_parsing == siz2read); - MPI_File_close(&mpi_fd); - return 0; + err_ret: + return (void *)-2; } - - /* * * LEFT FOR BACKUP - * BUT... + * BUT void *find_reads_size_and_offsets_mt(void *thread_arg){ @@ -514,7 +634,7 @@ void *find_reads_size_and_offsets_mt(void *thread_arg){ close(fd); return 0; } -*/ + void *copy_local_read_info_mt(void *thread_arg){ struct struct_data_thread_1 *my_data; @@ -537,7 +657,7 @@ void *copy_local_read_info_mt(void *thread_arg){ //fprintf(stderr, "finish copy_local_read_info_mt rank %d thread %d step 1 \n", rank_num, thread_num); return 0; } - +*/ diff --git a/src/parallel_aux.h b/src/parallel_aux.h index 0373899..3732ab8 100644 --- a/src/parallel_aux.h +++ b/src/parallel_aux.h @@ -95,6 +95,7 @@ void *compress_thread_by_chr(void *threadarg); void *compress_thread_by_chr_single(void *threadarg); void *call_fixmate(void *threadarg); void *copy_local_read_info_mt(void *thread_arg); +void *find_reads_number_mt(void *thread_arg); void *find_reads_size_and_offsets_mt(void *thread_arg); void *copy_buffer_write_thr(void *thread_arg); void compute_buffer_size_thr(void *thread_arg); @@ -187,16 +188,26 @@ struct struct_data_thread{ }; -//struct used to find read size, offset, and bytes in multithread struct struct_data_thread_1{ size_t offset_in_file_mt; size_t size2read_mt; char *file_r1_mt; size_t *local_num_reads_mt; size_t *total_num_reads_mt; - size_t **local_read_offsets_mt; - int **local_read_size_mt; - size_t **local_read_bytes_mt; + int proc_num_mt; + int rank_num_mt; + int thread_num_mt; +}; + +//struct used to find read size, offset, and bytes in multithread +struct struct_data_thread_2{ + size_t offset_in_file_mt; + size_t size2read_mt; + char *file_r1_mt; + size_t total_num_read; + size_t *local_read_offsets_mt; + int *local_read_size_mt; + size_t *local_read_bytes_mt; size_t *local_read_offsets; int *local_read_size; size_t *local_read_bytes; @@ -204,10 +215,8 @@ struct struct_data_thread_1{ int proc_num_mt; int rank_num_mt; int thread_num_mt; - MPI_Comm comm_mt; }; - struct struct_pread_fastq{ int total_thread;