71 unsigned threads = 3);
// Declared noexcept, so no exception may escape from it. The definition is not
// part of this excerpt. NOTE(review): presumably this shuts the reader down and
// relates to the `closed` flag declared further below — confirm in the
// definition.
81 void close() noexcept;
// Stream insertion for Format: writes the value converted to int32_t (i.e. a
// number, not a textual name) and returns the stream so calls can be chained.
97 friend std::ostream& operator<<(std::ostream& os,
const Format f)
99 return os << static_cast<int32_t>(f);
/** Read-only accessor: returns the current value of the `format` member. */
102 Format get_format() const
{
  return this->format;
}
// Defaults to the largest value representable in size_t.
// NOTE(review): presumably a sentinel meaning "no number assigned yet" —
// confirm against the code that fills this field in.
106 size_t num = std::numeric_limits<size_t>::max();
/** Converts to true exactly when `seq` holds at least one character. */
112 operator bool() const
{
  const bool has_sequence = !seq.empty();
  return has_sequence;
}
// Used below as the length of the thread-local ready_records_* arrays.
// NOTE(review): the name suggests this caps how many readers may be alive at
// once — confirm where ids are assigned.
121 static const size_t MAX_SIMULTANEOUS_SEQREADERS = 256;
/** Advances the iterator by replacing `record` with the next reader.read(). */
128 void operator++()
{
  this->record = this->reader.read();
}
// Not a true inequality test: yields true while EITHER iterator still holds a
// record that converts to true, and false only when both are empty.
// NOTE(review): this looks intended purely as the loop-termination check of a
// range-based for over the reader — confirm no caller relies on real
// iterator comparison.
129 bool operator!=(
const RecordIterator& i)
131 return bool(record) || bool(i.record);
/**
 * Hands the current record to the caller by moving it out of the iterator.
 * `record` is left in a moved-from state afterwards.
 */
133 Record operator*()
{
  return std::move(this->record);
}
137 auto val = operator*();
145 RecordIterator(SeqReader& reader,
bool end)
/** Builds an iterator over this reader with its `end` flag set to false. */
158 RecordIterator begin()
{
  const bool at_end = false;
  return RecordIterator(*this, at_end);
}
/** Builds an iterator over this reader with its `end` flag set to true. */
159 RecordIterator end()
{
  const bool at_end = true;
  return RecordIterator(*this, at_end);
}
/** Returns the current value of `buffer_size` as a plain size_t. */
161 size_t get_buffer_size() const
{
  return static_cast<size_t>(buffer_size);
}
/** Returns the current value of `block_size` as a plain size_t. */
162 size_t get_block_size() const
{
  return static_cast<size_t>(block_size);
}
// Buffer/block size presets for "short" mode.
// NOTE(review): presumably selected through the constructor flags and used to
// initialise buffer_size / block_size — confirm in the constructor.
164 static const size_t SHORT_MODE_BUFFER_SIZE = 32;
165 static const size_t SHORT_MODE_BLOCK_SIZE = 32;
// Buffer/block size presets for "long" mode (same caveat as above).
167 static const size_t LONG_MODE_BUFFER_SIZE = 4;
168 static const size_t LONG_MODE_BLOCK_SIZE = 1;
// Element count used to construct the `data` vector of the buffer struct that
// follows in the listing (16384 chars).
170 static const size_t FORMAT_BUFFER_SIZE = 16384;
177 : data(FORMAT_BUFFER_SIZE)
// Character storage; the constructor initialiser in this listing sizes it to
// FORMAT_BUFFER_SIZE elements.
181 std::vector<char> data;
// Starts false. NOTE(review): presumably set once a trailing newline has been
// appended at end of input — confirm where it is written.
184 bool eof_newline_inserted =
false;
// Path of the input, stored BY VALUE so the reader owns its own copy.
// A reference member here would dangle whenever it is bound to a temporary
// (e.g. a std::string implicitly built from a string literal at the call
// site), because that temporary dies at the end of the constructing
// expression while this object — and the threads it owns — keep running.
// NOTE(review): assumes the constructor initialises this member from its path
// argument; a `source_path(source_path)` initialiser works unchanged.
194 const std::string source_path;
// Immutable after construction (both are const).
// NOTE(review): `threads` presumably is the number of processor threads to
// start (the constructor defaults it to 3) — confirm in start_processors().
196 const unsigned flags;
197 const unsigned threads;
// Starts as UNDETERMINED. NOTE(review): presumably filled in by
// determine_format() and guarded by format_mutex / format_cv — confirm.
198 Format format = Format::UNDETERMINED;
// Atomic flag, initially false.
199 std::atomic<bool> closed{
false };
// Owned thread handles: one reader thread plus a collection of processor
// threads.
201 std::unique_ptr<std::thread> reader_thread;
202 std::vector<std::unique_ptr<std::thread>> processor_threads;
// Mutex / condition-variable pair named after the format state.
203 std::mutex format_mutex;
204 std::condition_variable format_cv;
// Atomic flag, initially false. NOTE(review): presumably raised when the
// reader thread has finished — confirm.
205 std::atomic<bool> reader_end{
false };
// Raw, initially null pointer; ownership is not visible from this excerpt.
206 RecordCString* reader_record =
nullptr;
// Const atomics: fixed at construction and only read afterwards (see
// get_buffer_size() / get_block_size()).
207 const std::atomic<size_t> buffer_size;
208 const std::atomic<size_t> block_size;
// Two queues: one carrying RecordCString blocks, one carrying Record blocks.
// NOTE(review): going by the type names, the first is single-producer /
// multi-consumer and the second multi-producer / multi-consumer — confirm in
// the queue header.
209 OrderQueueSPMC<RecordCString> cstring_queue;
210 OrderQueueMPMC<Record> output_queue;
// Atomic counter starting at 0.
211 std::atomic<size_t> dummy_block_num{ 0 };
// Per-thread (thread_local) static arrays, each with
// MAX_SIMULTANEOUS_SEQREADERS slots.
// NOTE(review): presumably a slot is selected by a reader id derived from
// last_id, letting each consuming thread cache one output block per reader —
// confirm in read().
215 thread_local static std::unique_ptr<
decltype(output_queue)::Block>
217 ready_records_array[MAX_SIMULTANEOUS_SEQREADERS];
// NOTE(review): presumably records which reader id currently owns each slot.
220 thread_local static long ready_records_owners[MAX_SIMULTANEOUS_SEQREADERS];
// NOTE(review): presumably the read position inside each cached block.
223 thread_local static size_t ready_records_current[MAX_SIMULTANEOUS_SEQREADERS];
// Process-wide atomic counter shared by all instances (static).
226 static std::atomic<long> last_id;
// Declarations only; the definitions are outside this excerpt, so the notes
// below describe signatures, not verified behaviour.
229 void determine_format();
231 void start_processors();
// Line-reading helpers. The *_append variants and readline_buffer_append are
// non-static members; readline_file and file_at_end are static and work
// purely on the FILE* passed in.
234 bool readline_buffer_append(CString& s);
235 static void readline_file(CString& s, FILE* f);
236 void readline_file_append(CString& s, FILE* f);
237 static bool file_at_end(FILE* f);
// NOTE(review): by name, the buffer-level counterpart of ungetc() — confirm
// the meaning of the returned int.
239 int ungetc_buffer(
int c);
// NOTE: the parameter lists of the declarations below are truncated in this
// listing; only the leading parameters are visible. Each one receives a block
// of RecordCString taken from the SPMC queue type.
241 void update_cstring_records(OrderQueueSPMC<RecordCString>::Block& records,
// The three read_* templates are parameterised on a format module type and
// take that module by non-const reference.
// NOTE(review): the names suggest three phases — reading from the in-memory
// buffer, a transition step, then reading directly from the file — confirm
// in the definitions.
245 template<
typename Module>
246 void read_from_buffer(Module& module,
247 OrderQueueSPMC<RecordCString>::Block& records,
250 template<
typename Module>
251 void read_transition(Module& module,
252 OrderQueueSPMC<RecordCString>::Block& records,
255 template<
typename Module>
256 void read_from_file(Module& module,
257 OrderQueueSPMC<RecordCString>::Block& records,
// One module object per supported input layout. Each module class is declared
// a friend, which gives it access to this class's private members.
261 friend class SeqReaderFastaModule;
262 SeqReaderFastaModule fasta_module;
264 friend class SeqReaderMultilineFastaModule;
265 SeqReaderMultilineFastaModule multiline_fasta_module;
267 friend class SeqReaderFastqModule;
268 SeqReaderFastqModule fastq_module;
270 friend class SeqReaderMultilineFastqModule;
271 SeqReaderMultilineFastqModule multiline_fastq_module;
273 friend class SeqReaderSamModule;
274 SeqReaderSamModule sam_module;
// Starts at 0. NOTE(review): presumably identifies which of the modules above
// is active once the format is known — confirm the numbering where it is set.
276 int module_in_use = 0;