btllib
Loading...
Searching...
No Matches
seq_reader.hpp
1#ifndef BTLLIB_SEQ_READER_HPP
2#define BTLLIB_SEQ_READER_HPP
3
4#include "btllib/cstring.hpp"
5#include "btllib/data_stream.hpp"
6#include "btllib/order_queue.hpp"
7#include "btllib/seq.hpp"
8#include "btllib/seq_reader_fasta_module.hpp"
9#include "btllib/seq_reader_fastq_module.hpp"
10#include "btllib/seq_reader_multiline_fasta_module.hpp"
11#include "btllib/seq_reader_multiline_fastq_module.hpp"
12#include "btllib/seq_reader_sam_module.hpp"
13#include "btllib/status.hpp"
14
15#include <atomic>
16#include <cctype>
17#include <condition_variable>
18#include <cstdio>
19#include <cstdlib>
20#include <cstring>
21#include <limits>
22#include <memory>
23#include <mutex>
24#include <stack>
25#include <string>
26#include <thread>
27#include <vector>
28
29namespace btllib {
30
42{
43public:
44 /* Has to be a struct and not an enum because:
45 * 1) Non-class enums are not name qualified and can collide
46 * 2) class enums can't be implicitly converted into integers
47 */
// Option bits for the `flags` constructor argument. The values are distinct
// powers of two, so several options can be combined with bitwise OR; the
// fold_case()/trim_masked()/short_mode()/long_mode() accessors test each bit
// with bitwise AND.
48 struct Flag
49 {
// Bit value 1; tested by fold_case().
// NOTE(review): presumably requests case folding of sequences -- confirm.
51 static const unsigned FOLD_CASE = 1;
// Bit value 2; tested by trim_masked().
// NOTE(review): presumably requests trimming of masked bases -- confirm.
54 static const unsigned TRIM_MASKED = 2;
// Bit value 4; tested by short_mode().
// NOTE(review): presumably selects the SHORT_MODE_* sizes -- confirm.
56 static const unsigned SHORT_MODE = 4;
// Bit value 8; tested by long_mode().
// NOTE(review): presumably selects the LONG_MODE_* sizes -- confirm.
58 static const unsigned LONG_MODE = 8;
59 };
60
69 SeqReader(const std::string& source_path,
70 unsigned flags,
71 unsigned threads = 3);
72
73 SeqReader(const SeqReader&) = delete;
74 SeqReader(SeqReader&&) = delete;
75
76 SeqReader& operator=(const SeqReader&) = delete;
77 SeqReader& operator=(SeqReader&&) = delete;
78
79 ~SeqReader();
80
81 void close() noexcept;
82
83 bool fold_case() const { return bool(flags & Flag::FOLD_CASE); }
84 bool trim_masked() const { return bool(flags & Flag::TRIM_MASKED); }
85 bool short_mode() const { return bool(flags & Flag::SHORT_MODE); }
86 bool long_mode() const { return bool(flags & Flag::LONG_MODE); }
87
// Input formats the reader distinguishes. UNDETERMINED is the initial value
// of the `format` member; get_format() returns the current value.
// NOTE(review): when the value changes away from UNDETERMINED is decided in
// code not visible in this header -- confirm before relying on it.
88 enum class Format
89 {
90 UNDETERMINED,
91 FASTA,
92 FASTQ,
93 SAM,
94 INVALID
95 };
96
97 friend std::ostream& operator<<(std::ostream& os, const Format f)
98 {
99 return os << static_cast<int32_t>(f);
100 }
101
102 Format get_format() const { return format; }
103
// One sequence record handed out by the reader.
struct Record
{
  // Record number; defaults to the largest size_t value.
  size_t num{ std::numeric_limits<size_t>::max() };
  std::string id;
  std::string comment;
  std::string seq;
  std::string qual;

  // A record converts to true exactly when it carries a non-empty sequence;
  // the other fields do not influence the result.
  operator bool() const { return seq.size() > 0; }
};
114
117
119 OrderQueueMPMC<Record>::Block read_block();
120
// Length of the per-thread ready_records_* bookkeeping arrays declared in the
// private section.
// NOTE(review): presumably this caps how many SeqReader objects one thread
// can use at once -- confirm against the implementation.
121 static const size_t MAX_SIMULTANEOUS_SEQREADERS = 256;
122
125 class RecordIterator
126 {
127 public:
128 void operator++() { record = reader.read(); }
129 bool operator!=(const RecordIterator& i)
130 {
131 return bool(record) || bool(i.record);
132 }
133 Record operator*() { return std::move(record); }
134 // For wrappers
135 Record next()
136 {
137 auto val = operator*();
138 operator++();
139 return val;
140 }
141
142 private:
143 friend SeqReader;
144
145 RecordIterator(SeqReader& reader, bool end)
146 : reader(reader)
147 {
148 if (!end) {
149 operator++();
150 }
151 }
152
153 SeqReader& reader;
154 Record record;
155 };
157
158 RecordIterator begin() { return RecordIterator(*this, false); }
159 RecordIterator end() { return RecordIterator(*this, true); }
160
161 size_t get_buffer_size() const { return buffer_size; }
162 size_t get_block_size() const { return block_size; }
163
// Size constants.
// NOTE(review): the SHORT_MODE_* / LONG_MODE_* pairs presumably become the
// buffer_size / block_size members depending on which mode flag is set --
// the selection happens outside this header, confirm there.
164 static const size_t SHORT_MODE_BUFFER_SIZE = 32;
165 static const size_t SHORT_MODE_BLOCK_SIZE = 32;
166
167 static const size_t LONG_MODE_BUFFER_SIZE = 4;
168 static const size_t LONG_MODE_BLOCK_SIZE = 1;
169
// Number of chars the private Buffer allocates in its constructor.
170 static const size_t FORMAT_BUFFER_SIZE = 16384;
171
172private:
// Character buffer owned by the reader.
173 struct Buffer
174 {
175
// Allocates FORMAT_BUFFER_SIZE value-initialized chars up front.
176 Buffer()
177 : data(FORMAT_BUFFER_SIZE)
178 {
179 }
180
181 std::vector<char> data;
// NOTE(review): start/end presumably delimit the unread region of `data`
// -- confirm against load_buffer()/getc_buffer().
182 size_t start = 0;
183 size_t end = 0;
// NOTE(review): presumably records that a newline was appended when the
// input ended without one -- confirm.
184 bool eof_newline_inserted = false;
185 };
186
// Record representation built from CString fields; this is the element type
// of cstring_queue and the type reader_record points to.
// NOTE(review): presumably converted into the public Record (id/comment/seq/
// qual) by the processor threads -- confirm.
187 struct RecordCString
188 {
189 CString header;
190 CString seq;
191 CString qual;
192 };
193
194 const std::string& source_path;
195 DataSource source;
196 const unsigned flags;
197 const unsigned threads;
198 Format format = Format::UNDETERMINED; // Format of the source file
199 std::atomic<bool> closed{ false };
200 Buffer buffer;
201 std::unique_ptr<std::thread> reader_thread;
202 std::vector<std::unique_ptr<std::thread>> processor_threads;
203 std::mutex format_mutex;
204 std::condition_variable format_cv;
205 std::atomic<bool> reader_end{ false };
206 RecordCString* reader_record = nullptr;
207 const std::atomic<size_t> buffer_size;
208 const std::atomic<size_t> block_size;
209 OrderQueueSPMC<RecordCString> cstring_queue;
210 OrderQueueMPMC<Record> output_queue;
211 std::atomic<size_t> dummy_block_num{ 0 };
212 const long id;
213
// Bookkeeping shared by all SeqReader objects. The three arrays below are
// thread_local (one copy per thread) with MAX_SIMULTANEOUS_SEQREADERS entries
// each; last_id is a single process-wide atomic counter.
// NOTE(review): presumably each array slot caches the block a thread is
// currently consuming for one reader, keyed by the reader's `id` -- confirm
// in the implementation.
214 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
215 thread_local static std::unique_ptr<decltype(output_queue)::Block>
216 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
217 ready_records_array[MAX_SIMULTANEOUS_SEQREADERS];
218
219 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
220 thread_local static long ready_records_owners[MAX_SIMULTANEOUS_SEQREADERS];
221
222 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
223 thread_local static size_t ready_records_current[MAX_SIMULTANEOUS_SEQREADERS];
224
225 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
226 static std::atomic<long> last_id;
227
228 bool load_buffer();
229 void determine_format();
230 void start_reader();
231 void start_processors();
232
233 CString tmp;
234 bool readline_buffer_append(CString& s);
235 static void readline_file(CString& s, FILE* f);
236 void readline_file_append(CString& s, FILE* f);
237 static bool file_at_end(FILE* f);
238 int getc_buffer();
239 int ungetc_buffer(int c);
240
241 void update_cstring_records(OrderQueueSPMC<RecordCString>::Block& records,
242 size_t& counter);
243
245 template<typename Module>
246 void read_from_buffer(Module& module,
247 OrderQueueSPMC<RecordCString>::Block& records,
248 size_t& counter);
249
250 template<typename Module>
251 void read_transition(Module& module,
252 OrderQueueSPMC<RecordCString>::Block& records,
253 size_t& counter);
254
255 template<typename Module>
256 void read_from_file(Module& module,
257 OrderQueueSPMC<RecordCString>::Block& records,
258 size_t& counter);
260
// Parsing modules. Each module class is a friend, so it can reach the
// reader's private state, and the reader holds one instance of each by value.
// The read_from_buffer/read_transition/read_from_file templates call the
// module's read_buffer/read_transition/read_file with *this.
261 friend class SeqReaderFastaModule;
262 SeqReaderFastaModule fasta_module;
263
264 friend class SeqReaderMultilineFastaModule;
265 SeqReaderMultilineFastaModule multiline_fasta_module;
266
267 friend class SeqReaderFastqModule;
268 SeqReaderFastqModule fastq_module;
269
270 friend class SeqReaderMultilineFastqModule;
271 SeqReaderMultilineFastqModule multiline_fastq_module;
272
273 friend class SeqReaderSamModule;
274 SeqReaderSamModule sam_module;
275
// NOTE(review): presumably identifies which of the modules above was
// selected for the input -- the encoding is not visible here, confirm.
276 int module_in_use = 0;
277
278 void postprocess();
279};
280
281template<typename Module>
282inline void
283SeqReader::read_from_buffer(Module& module,
284 OrderQueueSPMC<RecordCString>::Block& records,
285 size_t& counter)
286{
287 while (!reader_end) {
288 reader_record = &(records.data[records.count]);
289 if (!module.read_buffer(*this, *reader_record) ||
290 reader_record->seq.empty()) {
291 break;
292 }
293 update_cstring_records(records, counter);
294 }
295}
296
297template<typename Module>
298inline void
299SeqReader::read_transition(Module& module,
300 OrderQueueSPMC<RecordCString>::Block& records,
301 size_t& counter)
302{
303 if (!reader_end) {
304 reader_record = &(records.data[records.count]);
305 module.read_transition(*this, *reader_record);
306 if (!reader_record->seq.empty()) {
307 update_cstring_records(records, counter);
308 }
309 } else if (reader_record != nullptr && !reader_record->seq.empty()) {
310 update_cstring_records(records, counter);
311 }
312}
313
314template<typename Module>
315inline void
316SeqReader::read_from_file(Module& module,
317 OrderQueueSPMC<RecordCString>::Block& records,
318 size_t& counter)
319{
320 while (!reader_end) {
321 reader_record = &(records.data[records.count]);
322 if (!module.read_file(*this, *reader_record) ||
323 reader_record->seq.empty()) {
324 break;
325 }
326 update_cstring_records(records, counter);
327 }
328}
329
330} // namespace btllib
331
332#endif
Definition seq_reader.hpp:42
SeqReader(const std::string &source_path, unsigned flags, unsigned threads=3)
OrderQueueMPMC< Record >::Block read_block()
RecordIterator begin()
Definition seq_reader.hpp:158
Definition aahash.hpp:12
Definition seq_reader.hpp:49
static const unsigned TRIM_MASKED
Definition seq_reader.hpp:54
static const unsigned FOLD_CASE
Definition seq_reader.hpp:51
static const unsigned SHORT_MODE
Definition seq_reader.hpp:56
static const unsigned LONG_MODE
Definition seq_reader.hpp:58
Definition seq_reader.hpp:105