From 74539d5afc9660a9ceb6353c3cdde8610bc729f8 Mon Sep 17 00:00:00 2001 From: schutzekatze Date: Wed, 7 Oct 2020 17:28:34 -0700 Subject: [PATCH] Fix buffer-to-file transition bug in SeqReader --- docs/seq__reader_8hpp_source.html | 748 +++++++++++++++--------------- docs/seq_reader_8cpp-example.html | 2 +- include/btllib/seq_reader.hpp | 10 + 3 files changed, 390 insertions(+), 370 deletions(-) diff --git a/docs/seq__reader_8hpp_source.html b/docs/seq__reader_8hpp_source.html index 555b7d16..d77f2348 100644 --- a/docs/seq__reader_8hpp_source.html +++ b/docs/seq__reader_8hpp_source.html @@ -922,405 +922,415 @@
864  seq_reader.read_stage = ReadStage::HEADER;
865  return true;
866  }
-
867  }
-
868  return false;
-
869  }
-
870 };
-
871 
-
872 struct SeqReader::read_sam_buffer
-
873 {
-
874  bool operator()(SeqReader& seq_reader)
-
875  {
-
876  READ_SAM( // NOLINT
-
877  if (!seq_reader.readline_buffer_append( // NOLINT
-
878  seq_reader.tmp)) { return false; }, // NOLINT
-
879  seq_reader.tmp.clear(); // NOLINT
-
880  return true; // NOLINT
-
881  ,
-
882  if (seq_reader.buffer_start >= seq_reader.buffer_end) {
-
883  return false;
-
884  }) // NOLINT
-
885  }
-
886 };
-
887 
-
888 struct SeqReader::read_gfa2_buffer
-
889 {
-
890  bool operator()(SeqReader& seq_reader)
-
891  {
-
892  READ_GFA2( // NOLINT
-
893  if (!seq_reader.readline_buffer_append( // NOLINT
-
894  seq_reader.tmp)) { return false; }, // NOLINT
-
895  seq_reader.tmp.clear(); // NOLINT
-
896  return true; // NOLINT
-
897  ,
-
898  if (seq_reader.buffer_start >= seq_reader.buffer_end) {
-
899  return false;
-
900  }) // NOLINT
-
901  }
-
902 };
-
903 
-
904 struct SeqReader::read_fasta_transition
-
905 {
-
906  void operator()(SeqReader& seq_reader)
-
907  {
-
908  switch (seq_reader.read_stage) {
-
909  case ReadStage::HEADER: {
-
910  seq_reader.readline_file_append(seq_reader.reader_record->header);
-
911  seq_reader.read_stage = ReadStage::SEQ;
-
912  }
-
913  // fall through
-
914  case ReadStage::SEQ: {
-
915  seq_reader.readline_file_append(seq_reader.reader_record->seq);
-
916  seq_reader.read_stage = ReadStage::HEADER;
-
917  }
-
918  default: {
-
919  log_error("SeqReader has entered an invalid state.");
-
920  std::exit(EXIT_FAILURE);
-
921  }
-
922  }
-
923  }
-
924 };
-
925 
-
926 struct SeqReader::read_fastq_transition
-
927 {
-
928  void operator()(SeqReader& seq_reader)
-
929  {
-
930  switch (seq_reader.read_stage) {
-
931  case ReadStage::HEADER: {
-
932  seq_reader.readline_file_append(seq_reader.reader_record->header);
-
933  seq_reader.read_stage = ReadStage::SEQ;
-
934  }
-
935  // fall through
-
936  case ReadStage::SEQ: {
-
937  seq_reader.readline_file_append(seq_reader.reader_record->seq);
-
938  seq_reader.read_stage = ReadStage::SEP;
+
867  default: {
+
868  log_error("SeqReader has entered an invalid state.");
+
869  std::exit(EXIT_FAILURE);
+
870  }
+
871  }
+
872  return false;
+
873  }
+
874 };
+
875 
+
876 struct SeqReader::read_sam_buffer
+
877 {
+
878  bool operator()(SeqReader& seq_reader)
+
879  {
+
880  READ_SAM( // NOLINT
+
881  if (!seq_reader.readline_buffer_append( // NOLINT
+
882  seq_reader.tmp)) { return false; }, // NOLINT
+
883  seq_reader.tmp.clear(); // NOLINT
+
884  return true; // NOLINT
+
885  ,
+
886  if (seq_reader.buffer_start >= seq_reader.buffer_end) {
+
887  return false;
+
888  }) // NOLINT
+
889  }
+
890 };
+
891 
+
892 struct SeqReader::read_gfa2_buffer
+
893 {
+
894  bool operator()(SeqReader& seq_reader)
+
895  {
+
896  READ_GFA2( // NOLINT
+
897  if (!seq_reader.readline_buffer_append( // NOLINT
+
898  seq_reader.tmp)) { return false; }, // NOLINT
+
899  seq_reader.tmp.clear(); // NOLINT
+
900  return true; // NOLINT
+
901  ,
+
902  if (seq_reader.buffer_start >= seq_reader.buffer_end) {
+
903  return false;
+
904  }) // NOLINT
+
905  }
+
906 };
+
907 
+
908 struct SeqReader::read_fasta_transition
+
909 {
+
910  void operator()(SeqReader& seq_reader)
+
911  {
+
912  switch (seq_reader.read_stage) {
+
913  case ReadStage::HEADER: {
+
914  seq_reader.readline_file_append(seq_reader.reader_record->header);
+
915  seq_reader.read_stage = ReadStage::SEQ;
+
916  }
+
917  // fall through
+
918  case ReadStage::SEQ: {
+
919  seq_reader.readline_file_append(seq_reader.reader_record->seq);
+
920  seq_reader.read_stage = ReadStage::HEADER;
+
921  return;
+
922  }
+
923  default: {
+
924  log_error("SeqReader has entered an invalid state.");
+
925  std::exit(EXIT_FAILURE);
+
926  }
+
927  }
+
928  }
+
929 };
+
930 
+
931 struct SeqReader::read_fastq_transition
+
932 {
+
933  void operator()(SeqReader& seq_reader)
+
934  {
+
935  switch (seq_reader.read_stage) {
+
936  case ReadStage::HEADER: {
+
937  seq_reader.readline_file_append(seq_reader.reader_record->header);
+
938  seq_reader.read_stage = ReadStage::SEQ;
939  }
940  // fall through
-
941  case ReadStage::SEP: {
-
942  seq_reader.readline_file_append(seq_reader.tmp);
-
943  seq_reader.read_stage = ReadStage::QUAL;
-
944  seq_reader.tmp.clear();
-
945  }
-
946  // fall through
-
947  case ReadStage::QUAL: {
-
948  seq_reader.readline_file_append(seq_reader.reader_record->qual);
-
949  seq_reader.read_stage = ReadStage::HEADER;
+
941  case ReadStage::SEQ: {
+
942  seq_reader.readline_file_append(seq_reader.reader_record->seq);
+
943  seq_reader.read_stage = ReadStage::SEP;
+
944  }
+
945  // fall through
+
946  case ReadStage::SEP: {
+
947  seq_reader.readline_file_append(seq_reader.tmp);
+
948  seq_reader.read_stage = ReadStage::QUAL;
+
949  seq_reader.tmp.clear();
950  }
-
951  }
-
952  }
-
953 };
-
954 
-
955 struct SeqReader::read_sam_transition
-
956 {
-
957  void operator()(SeqReader& seq_reader)
-
958  {
-
959  READ_SAM( // NOLINT
-
960  seq_reader.readline_file_append(seq_reader.tmp); // NOLINT
-
961  , , if (bool(feof(seq_reader.source))) { break; }) // NOLINT
+
951  // fall through
+
952  case ReadStage::QUAL: {
+
953  seq_reader.readline_file_append(seq_reader.reader_record->qual);
+
954  seq_reader.read_stage = ReadStage::HEADER;
+
955  return;
+
956  }
+
957  default: {
+
958  log_error("SeqReader has entered an invalid state.");
+
959  std::exit(EXIT_FAILURE);
+
960  }
+
961  }
962  }
963 };
964 
-
965 struct SeqReader::read_gfa2_transition
+
965 struct SeqReader::read_sam_transition
966 {
967  void operator()(SeqReader& seq_reader)
968  {
-
969  READ_GFA2( // NOLINT
+
969  READ_SAM( // NOLINT
970  seq_reader.readline_file_append(seq_reader.tmp); // NOLINT
971  , , if (bool(feof(seq_reader.source))) { break; }) // NOLINT
972  }
973 };
974 
-
975 struct SeqReader::read_fasta_file
+
975 struct SeqReader::read_gfa2_transition
976 {
977  void operator()(SeqReader& seq_reader)
978  {
-
979  seq_reader.readline_file(seq_reader.reader_record->header);
-
980  seq_reader.readline_file(seq_reader.reader_record->seq);
-
981  }
-
982 };
-
983 
-
984 struct SeqReader::read_fastq_file
-
985 {
-
986  void operator()(SeqReader& seq_reader)
-
987  {
-
988  seq_reader.readline_file(seq_reader.reader_record->header);
-
989  seq_reader.readline_file(seq_reader.reader_record->seq);
-
990  seq_reader.readline_file(seq_reader.tmp);
-
991  seq_reader.readline_file(seq_reader.reader_record->qual);
-
992  }
-
993 };
-
994 
-
995 struct SeqReader::read_sam_file
-
996 {
-
997  void operator()(SeqReader& seq_reader)
-
998  {
-
999  READ_SAM( // NOLINT
-
1000  seq_reader.readline_file(seq_reader.tmp); // NOLINT
-
1001  , , if (bool(feof(seq_reader.source))) { break; }) // NOLINT
+
979  READ_GFA2( // NOLINT
+
980  seq_reader.readline_file_append(seq_reader.tmp); // NOLINT
+
981  , , if (bool(feof(seq_reader.source))) { break; }) // NOLINT
+
982  }
+
983 };
+
984 
+
985 struct SeqReader::read_fasta_file
+
986 {
+
987  void operator()(SeqReader& seq_reader)
+
988  {
+
989  seq_reader.readline_file(seq_reader.reader_record->header);
+
990  seq_reader.readline_file(seq_reader.reader_record->seq);
+
991  }
+
992 };
+
993 
+
994 struct SeqReader::read_fastq_file
+
995 {
+
996  void operator()(SeqReader& seq_reader)
+
997  {
+
998  seq_reader.readline_file(seq_reader.reader_record->header);
+
999  seq_reader.readline_file(seq_reader.reader_record->seq);
+
1000  seq_reader.readline_file(seq_reader.tmp);
+
1001  seq_reader.readline_file(seq_reader.reader_record->qual);
1002  }
1003 };
1004 
-
1005 struct SeqReader::read_gfa2_file
+
1005 struct SeqReader::read_sam_file
1006 {
1007  void operator()(SeqReader& seq_reader)
1008  {
-
1009  READ_GFA2( // NOLINT
+
1009  READ_SAM( // NOLINT
1010  seq_reader.readline_file(seq_reader.tmp); // NOLINT
1011  , , if (bool(feof(seq_reader.source))) { break; }) // NOLINT
1012  }
1013 };
-
1015 
-
1016 template<typename F>
-
1017 inline void
-
1018 SeqReader::read_from_buffer(
-
1019  F f,
-
1020  OrderQueueSPSC<RecordCString, RECORD_QUEUE_SIZE, RECORD_BLOCK_SIZE>::Block&
-
1021  records,
-
1022  size_t& counter)
-
1023 {
-
1024  for (; buffer_start < buffer_end && !reader_end;) {
-
1025  reader_record = &(records.data[records.count]);
-
1026  if (!f(*this) || reader_record->seq.empty()) {
-
1027  break;
-
1028  }
-
1029  records.count++;
-
1030  if (records.count == RECORD_BLOCK_SIZE) {
-
1031  records.current = 0;
-
1032  records.num = counter++;
-
1033  cstring_queue.write(records);
-
1034  records.current = 0;
-
1035  records.count = 0;
-
1036  }
-
1037  }
-
1038 }
-
1039 
-
1040 template<typename F>
-
1041 inline void
-
1042 SeqReader::read_transition(
-
1043  F f,
-
1044  OrderQueueSPSC<RecordCString, RECORD_QUEUE_SIZE, RECORD_BLOCK_SIZE>::Block&
-
1045  records,
-
1046  size_t& counter)
-
1047 {
-
1048  if (std::ferror(source) == 0 && std::feof(source) == 0 && !reader_end) {
-
1049  int p = std::fgetc(source);
-
1050  if (p != EOF) {
-
1051  std::ungetc(p, source);
-
1052  reader_record = &(records.data[records.count]);
-
1053  f(*this);
-
1054  if (!reader_record->seq.empty()) {
-
1055  records.count++;
-
1056  if (records.count == RECORD_BLOCK_SIZE) {
-
1057  records.current = 0;
-
1058  records.num = counter++;
-
1059  cstring_queue.write(records);
-
1060  records.current = 0;
-
1061  records.count = 0;
-
1062  }
-
1063  }
-
1064  }
-
1065  }
-
1066 }
-
1067 
-
1068 template<typename F>
-
1069 inline void
-
1070 SeqReader::read_from_file(
-
1071  F f,
-
1072  OrderQueueSPSC<RecordCString, RECORD_QUEUE_SIZE, RECORD_BLOCK_SIZE>::Block&
-
1073  records,
-
1074  size_t& counter)
-
1075 {
-
1076  for (; std::ferror(source) == 0 && std::feof(source) == 0 && !reader_end;) {
-
1077  reader_record = &(records.data[records.count]);
-
1078  f(*this);
-
1079  if (reader_record->seq.empty()) {
-
1080  break;
-
1081  }
-
1082  records.count++;
-
1083  if (records.count == RECORD_BLOCK_SIZE) {
-
1084  records.current = 0;
-
1085  records.num = counter++;
-
1086  cstring_queue.write(records);
-
1087  records.current = 0;
-
1088  records.count = 0;
-
1089  }
-
1090  }
-
1091 }
-
1092 
-
1093 inline void
-
1094 SeqReader::start_reader()
-
1095 {
-
1096  reader_thread = std::unique_ptr<std::thread>(new std::thread([this]() {
-
1097  {
-
1098  std::unique_lock<std::mutex> lock(format_mutex);
-
1099  determine_format();
-
1100  format_cv.notify_all();
-
1101  }
+
1014 
+
1015 struct SeqReader::read_gfa2_file
+
1016 {
+
1017  void operator()(SeqReader& seq_reader)
+
1018  {
+
1019  READ_GFA2( // NOLINT
+
1020  seq_reader.readline_file(seq_reader.tmp); // NOLINT
+
1021  , , if (bool(feof(seq_reader.source))) { break; }) // NOLINT
+
1022  }
+
1023 };
+
1025 
+
1026 template<typename F>
+
1027 inline void
+
1028 SeqReader::read_from_buffer(
+
1029  F f,
+
1030  OrderQueueSPSC<RecordCString, RECORD_QUEUE_SIZE, RECORD_BLOCK_SIZE>::Block&
+
1031  records,
+
1032  size_t& counter)
+
1033 {
+
1034  for (; buffer_start < buffer_end && !reader_end;) {
+
1035  reader_record = &(records.data[records.count]);
+
1036  if (!f(*this) || reader_record->seq.empty()) {
+
1037  break;
+
1038  }
+
1039  records.count++;
+
1040  if (records.count == RECORD_BLOCK_SIZE) {
+
1041  records.current = 0;
+
1042  records.num = counter++;
+
1043  cstring_queue.write(records);
+
1044  records.current = 0;
+
1045  records.count = 0;
+
1046  }
+
1047  }
+
1048 }
+
1049 
+
1050 template<typename F>
+
1051 inline void
+
1052 SeqReader::read_transition(
+
1053  F f,
+
1054  OrderQueueSPSC<RecordCString, RECORD_QUEUE_SIZE, RECORD_BLOCK_SIZE>::Block&
+
1055  records,
+
1056  size_t& counter)
+
1057 {
+
1058  if (std::ferror(source) == 0 && std::feof(source) == 0 && !reader_end) {
+
1059  int p = std::fgetc(source);
+
1060  if (p != EOF) {
+
1061  std::ungetc(p, source);
+
1062  reader_record = &(records.data[records.count]);
+
1063  f(*this);
+
1064  if (!reader_record->seq.empty()) {
+
1065  records.count++;
+
1066  if (records.count == RECORD_BLOCK_SIZE) {
+
1067  records.current = 0;
+
1068  records.num = counter++;
+
1069  cstring_queue.write(records);
+
1070  records.current = 0;
+
1071  records.count = 0;
+
1072  }
+
1073  }
+
1074  }
+
1075  }
+
1076 }
+
1077 
+
1078 template<typename F>
+
1079 inline void
+
1080 SeqReader::read_from_file(
+
1081  F f,
+
1082  OrderQueueSPSC<RecordCString, RECORD_QUEUE_SIZE, RECORD_BLOCK_SIZE>::Block&
+
1083  records,
+
1084  size_t& counter)
+
1085 {
+
1086  for (; std::ferror(source) == 0 && std::feof(source) == 0 && !reader_end;) {
+
1087  reader_record = &(records.data[records.count]);
+
1088  f(*this);
+
1089  if (reader_record->seq.empty()) {
+
1090  break;
+
1091  }
+
1092  records.count++;
+
1093  if (records.count == RECORD_BLOCK_SIZE) {
+
1094  records.current = 0;
+
1095  records.num = counter++;
+
1096  cstring_queue.write(records);
+
1097  records.current = 0;
+
1098  records.count = 0;
+
1099  }
+
1100  }
+
1101 }
1102 
-
1103  size_t counter = 0;
-
1104  decltype(cstring_queue)::Block records;
-
1105  switch (format) {
-
1106  case Format::FASTA: {
-
1107  read_from_buffer(read_fasta_buffer(), records, counter);
-
1108  read_transition(read_fasta_transition(), records, counter);
-
1109  read_from_file(read_fasta_file(), records, counter);
-
1110  break;
-
1111  }
-
1112  case Format::FASTQ: {
-
1113  read_from_buffer(read_fastq_buffer(), records, counter);
-
1114  read_transition(read_fastq_transition(), records, counter);
-
1115  read_from_file(read_fastq_file(), records, counter);
-
1116  break;
-
1117  }
-
1118  case Format::SAM: {
-
1119  read_from_buffer(read_sam_buffer(), records, counter);
-
1120  read_transition(read_sam_transition(), records, counter);
-
1121  read_from_file(read_sam_file(), records, counter);
-
1122  break;
-
1123  }
-
1124  case Format::GFA2: {
-
1125  read_from_buffer(read_gfa2_buffer(), records, counter);
-
1126  read_transition(read_gfa2_transition(), records, counter);
-
1127  read_from_file(read_gfa2_file(), records, counter);
-
1128  break;
-
1129  }
-
1130  default: {
-
1131  break;
-
1132  }
-
1133  }
-
1134 
-
1135  reader_end = true;
-
1136  if (records.count > 0) {
-
1137  records.current = 0;
-
1138  records.num = counter++;
-
1139  cstring_queue.write(records);
-
1140  }
-
1141  for (unsigned i = 0; i < threads; i++) {
-
1142  decltype(cstring_queue)::Block dummy;
-
1143  dummy.num = counter++;
-
1144  dummy.current = 0;
-
1145  dummy.count = 0;
-
1146  cstring_queue.write(dummy);
-
1147  }
-
1148  }));
-
1149 }
-
1150 
-
1151 inline void
-
1152 SeqReader::start_processor()
-
1153 {
-
1154  processor_threads.reserve(threads);
-
1155  for (unsigned i = 0; i < threads; i++) {
-
1156  processor_threads.push_back(
-
1157  std::unique_ptr<std::thread>(new std::thread([this]() {
-
1158  decltype(cstring_queue)::Block records_in;
-
1159  decltype(output_queue)::Block records_out;
-
1160  for (;;) {
-
1161  cstring_queue.read(records_in);
-
1162  for (size_t i = 0; i < records_in.count; i++) {
-
1163  records_out.data[i].seq = std::string(records_in.data[i].seq.s,
-
1164  records_in.data[i].seq.size);
-
1165  auto& seq = records_out.data[i].seq;
-
1166  if (!seq.empty() && seq.back() == '\n') {
-
1167  seq.pop_back();
-
1168  }
-
1169 
-
1170  records_out.data[i].qual = std::string(
-
1171  records_in.data[i].qual.s, records_in.data[i].qual.size);
-
1172  auto& qual = records_out.data[i].qual;
-
1173  if (!qual.empty() && qual.back() == '\n') {
-
1174  qual.pop_back();
-
1175  }
-
1176 
-
1177  char* space = std::strstr(records_in.data[i].header, " ");
-
1178  size_t name_start =
-
1179  (format == Format::FASTA || format == Format::FASTQ) ? 1 : 0;
-
1180  if (space == nullptr) {
-
1181  records_out.data[i].name =
-
1182  std::string(records_in.data[i].header.s + name_start,
-
1183  records_in.data[i].header.size - name_start);
-
1184  records_out.data[i].comment = "";
-
1185  } else {
-
1186  records_out.data[i].name =
-
1187  std::string(records_in.data[i].header.s + name_start,
-
1188  space - records_in.data[i].header.s - name_start);
-
1189  records_out.data[i].comment =
-
1190  std::string(space + 1,
-
1191  records_in.data[i].header.size -
-
1192  (space - records_in.data[i].header.s) - 1);
-
1193  }
-
1194  records_in.data[i].header.clear();
-
1195  auto& name = records_out.data[i].name;
-
1196  auto& comment = records_out.data[i].comment;
-
1197  if (!name.empty() && name.back() == '\n') {
-
1198  name.pop_back();
-
1199  }
-
1200  if (!comment.empty() && comment.back() == '\n') {
-
1201  comment.pop_back();
-
1202  }
-
1203  if (trim_masked()) {
-
1204  const auto len = seq.length();
-
1205  size_t trim_start = 0, trim_end = seq.length();
-
1206  while (trim_start <= len && bool(islower(seq[trim_start]))) {
-
1207  trim_start++;
-
1208  }
-
1209  while (trim_end > 0 && bool(islower(seq[trim_end - 1]))) {
-
1210  trim_end--;
-
1211  }
-
1212  seq.erase(trim_end);
-
1213  seq.erase(0, trim_start);
-
1214  if (!qual.empty()) {
-
1215  qual.erase(trim_end);
-
1216  qual.erase(0, trim_start);
-
1217  }
-
1218  }
-
1219  if (fold_case()) {
-
1220  for (auto& c : seq) {
-
1221  char old = c;
-
1222  c = CAPITALS[unsigned(c)];
-
1223  if (!bool(c)) {
-
1224  log_error(std::string("A sequence contains invalid "
-
1225  "IUPAC character: ") +
-
1226  old);
-
1227  std::exit(EXIT_FAILURE);
-
1228  }
-
1229  }
-
1230  }
-
1231  records_out.data[i].num = records_in.num * RECORD_BLOCK_SIZE + i;
-
1232  }
-
1233  records_out.count = records_in.count;
-
1234  records_out.current = records_in.current;
-
1235  records_out.num = records_in.num;
-
1236  if (records_out.count == 0) {
-
1237  output_queue.write(records_out);
-
1238  break;
-
1239  }
-
1240  output_queue.write(records_out);
-
1241  }
-
1242  })));
-
1243  }
-
1244 }
-
1245 
-
1246 inline SeqReader::Record
-
1247 SeqReader::read()
-
1248 {
-
1249  auto& ready_records = ready_records_array()[id % MAX_SIMULTANEOUS_SEQREADERS];
-
1250  if (ready_records.count <= ready_records.current) {
-
1251  output_queue.read(ready_records);
-
1252  if (ready_records.count <= ready_records.current) {
-
1253  close();
-
1254  ready_records = decltype(output_queue)::Block();
-
1255  return Record();
-
1256  }
-
1257  }
-
1258  return std::move(ready_records.data[ready_records.current++]);
-
1259 }
-
1260 
-
1261 } // namespace btllib
-
1262 
-
1263 #endif
+
1103 inline void
+
1104 SeqReader::start_reader()
+
1105 {
+
1106  reader_thread = std::unique_ptr<std::thread>(new std::thread([this]() {
+
1107  {
+
1108  std::unique_lock<std::mutex> lock(format_mutex);
+
1109  determine_format();
+
1110  format_cv.notify_all();
+
1111  }
+
1112 
+
1113  size_t counter = 0;
+
1114  decltype(cstring_queue)::Block records;
+
1115  switch (format) {
+
1116  case Format::FASTA: {
+
1117  read_from_buffer(read_fasta_buffer(), records, counter);
+
1118  read_transition(read_fasta_transition(), records, counter);
+
1119  read_from_file(read_fasta_file(), records, counter);
+
1120  break;
+
1121  }
+
1122  case Format::FASTQ: {
+
1123  read_from_buffer(read_fastq_buffer(), records, counter);
+
1124  read_transition(read_fastq_transition(), records, counter);
+
1125  read_from_file(read_fastq_file(), records, counter);
+
1126  break;
+
1127  }
+
1128  case Format::SAM: {
+
1129  read_from_buffer(read_sam_buffer(), records, counter);
+
1130  read_transition(read_sam_transition(), records, counter);
+
1131  read_from_file(read_sam_file(), records, counter);
+
1132  break;
+
1133  }
+
1134  case Format::GFA2: {
+
1135  read_from_buffer(read_gfa2_buffer(), records, counter);
+
1136  read_transition(read_gfa2_transition(), records, counter);
+
1137  read_from_file(read_gfa2_file(), records, counter);
+
1138  break;
+
1139  }
+
1140  default: {
+
1141  break;
+
1142  }
+
1143  }
+
1144 
+
1145  reader_end = true;
+
1146  if (records.count > 0) {
+
1147  records.current = 0;
+
1148  records.num = counter++;
+
1149  cstring_queue.write(records);
+
1150  }
+
1151  for (unsigned i = 0; i < threads; i++) {
+
1152  decltype(cstring_queue)::Block dummy;
+
1153  dummy.num = counter++;
+
1154  dummy.current = 0;
+
1155  dummy.count = 0;
+
1156  cstring_queue.write(dummy);
+
1157  }
+
1158  }));
+
1159 }
+
1160 
+
1161 inline void
+
1162 SeqReader::start_processor()
+
1163 {
+
1164  processor_threads.reserve(threads);
+
1165  for (unsigned i = 0; i < threads; i++) {
+
1166  processor_threads.push_back(
+
1167  std::unique_ptr<std::thread>(new std::thread([this]() {
+
1168  decltype(cstring_queue)::Block records_in;
+
1169  decltype(output_queue)::Block records_out;
+
1170  for (;;) {
+
1171  cstring_queue.read(records_in);
+
1172  for (size_t i = 0; i < records_in.count; i++) {
+
1173  records_out.data[i].seq = std::string(records_in.data[i].seq.s,
+
1174  records_in.data[i].seq.size);
+
1175  auto& seq = records_out.data[i].seq;
+
1176  if (!seq.empty() && seq.back() == '\n') {
+
1177  seq.pop_back();
+
1178  }
+
1179 
+
1180  records_out.data[i].qual = std::string(
+
1181  records_in.data[i].qual.s, records_in.data[i].qual.size);
+
1182  auto& qual = records_out.data[i].qual;
+
1183  if (!qual.empty() && qual.back() == '\n') {
+
1184  qual.pop_back();
+
1185  }
+
1186 
+
1187  char* space = std::strstr(records_in.data[i].header, " ");
+
1188  size_t name_start =
+
1189  (format == Format::FASTA || format == Format::FASTQ) ? 1 : 0;
+
1190  if (space == nullptr) {
+
1191  records_out.data[i].name =
+
1192  std::string(records_in.data[i].header.s + name_start,
+
1193  records_in.data[i].header.size - name_start);
+
1194  records_out.data[i].comment = "";
+
1195  } else {
+
1196  records_out.data[i].name =
+
1197  std::string(records_in.data[i].header.s + name_start,
+
1198  space - records_in.data[i].header.s - name_start);
+
1199  records_out.data[i].comment =
+
1200  std::string(space + 1,
+
1201  records_in.data[i].header.size -
+
1202  (space - records_in.data[i].header.s) - 1);
+
1203  }
+
1204  records_in.data[i].header.clear();
+
1205  auto& name = records_out.data[i].name;
+
1206  auto& comment = records_out.data[i].comment;
+
1207  if (!name.empty() && name.back() == '\n') {
+
1208  name.pop_back();
+
1209  }
+
1210  if (!comment.empty() && comment.back() == '\n') {
+
1211  comment.pop_back();
+
1212  }
+
1213  if (trim_masked()) {
+
1214  const auto len = seq.length();
+
1215  size_t trim_start = 0, trim_end = seq.length();
+
1216  while (trim_start <= len && bool(islower(seq[trim_start]))) {
+
1217  trim_start++;
+
1218  }
+
1219  while (trim_end > 0 && bool(islower(seq[trim_end - 1]))) {
+
1220  trim_end--;
+
1221  }
+
1222  seq.erase(trim_end);
+
1223  seq.erase(0, trim_start);
+
1224  if (!qual.empty()) {
+
1225  qual.erase(trim_end);
+
1226  qual.erase(0, trim_start);
+
1227  }
+
1228  }
+
1229  if (fold_case()) {
+
1230  for (auto& c : seq) {
+
1231  char old = c;
+
1232  c = CAPITALS[unsigned(c)];
+
1233  if (!bool(c)) {
+
1234  log_error(std::string("A sequence contains invalid "
+
1235  "IUPAC character: ") +
+
1236  old);
+
1237  std::exit(EXIT_FAILURE);
+
1238  }
+
1239  }
+
1240  }
+
1241  records_out.data[i].num = records_in.num * RECORD_BLOCK_SIZE + i;
+
1242  }
+
1243  records_out.count = records_in.count;
+
1244  records_out.current = records_in.current;
+
1245  records_out.num = records_in.num;
+
1246  if (records_out.count == 0) {
+
1247  output_queue.write(records_out);
+
1248  break;
+
1249  }
+
1250  output_queue.write(records_out);
+
1251  }
+
1252  })));
+
1253  }
+
1254 }
+
1255 
+
1256 inline SeqReader::Record
+
1257 SeqReader::read()
+
1258 {
+
1259  auto& ready_records = ready_records_array()[id % MAX_SIMULTANEOUS_SEQREADERS];
+
1260  if (ready_records.count <= ready_records.current) {
+
1261  output_queue.read(ready_records);
+
1262  if (ready_records.count <= ready_records.current) {
+
1263  close();
+
1264  ready_records = decltype(output_queue)::Block();
+
1265  return Record();
+
1266  }
+
1267  }
+
1268  return std::move(ready_records.data[ready_records.current++]);
+
1269 }
+
1270 
+
1271 } // namespace btllib
+
1272 
+
1273 #endif
Definition: seq_reader.hpp:72
-
Record read()
Definition: seq_reader.hpp:1247
+
Record read()
Definition: seq_reader.hpp:1257
static const unsigned NO_TRIM_MASKED
Definition: seq_reader.hpp:45
Definition: seq_reader.hpp:32
static const unsigned FOLD_CASE
Definition: seq_reader.hpp:41
diff --git a/docs/seq_reader_8cpp-example.html b/docs/seq_reader_8cpp-example.html index 791a1114..b6080d5a 100644 --- a/docs/seq_reader_8cpp-example.html +++ b/docs/seq_reader_8cpp-example.html @@ -82,7 +82,7 @@
}
Definition: seq_reader.hpp:72
-
Record read()
Definition: seq_reader.hpp:1247
+
Record read()
Definition: seq_reader.hpp:1257
Definition: seq_reader.hpp:32