btllib
Loading...
Searching...
No Matches
indexlr.hpp
1#ifndef BTLLIB_INDEXLR_HPP
2#define BTLLIB_INDEXLR_HPP
3
4#include "btllib/bloom_filter.hpp"
5#include "btllib/nthash.hpp"
6#include "btllib/order_queue.hpp"
7#include "btllib/seq_reader.hpp"
8#include "btllib/status.hpp"
9#include "btllib/util.hpp"
10
11#include <algorithm>
12#include <atomic>
13#include <cstdlib>
14#include <cstring>
15#include <functional>
16#include <iostream>
17#include <limits>
18#include <memory>
19#include <string>
20#include <thread>
21#include <vector>
22
23namespace btllib {
24
26{
27
28public:
29 /* Has to be a struct and not an enum because:
30 * 1) Non-class enums are not name qualified and can collide
31 * 2) class enums can't be implicitly converted into integers
32 */
33 struct Flag
34 {
36 static const unsigned NO_ID = 1;
38 static const unsigned BX = 2;
40 static const unsigned SEQ = 4;
43 static const unsigned FILTER_IN = 8;
48 static const unsigned FILTER_OUT = 16;
50 static const unsigned SHORT_MODE = 32;
52 static const unsigned LONG_MODE = 64;
54 static const unsigned QUAL = 128;
55 };
56
57 bool output_id() const { return bool(~flags & Flag::NO_ID); }
58 bool output_bx() const { return bool(flags & Flag::BX); }
59 bool output_seq() const { return bool(flags & Flag::SEQ); }
60 bool output_qual() const { return bool(flags & Flag::QUAL); }
61 bool filter_in() const { return bool(flags & Flag::FILTER_IN); }
62 bool filter_out() const { return bool(flags & Flag::FILTER_OUT); }
63 bool short_mode() const { return bool(flags & Flag::SHORT_MODE); }
64 bool long_mode() const { return bool(flags & Flag::LONG_MODE); }
65
66 struct Minimizer
67 {
68 Minimizer() = default;
69
70 Minimizer(uint64_t min_hash,
71 uint64_t out_hash,
72 size_t pos,
73 bool forward,
74 std::string seq)
75 : min_hash(min_hash)
76 , out_hash(out_hash)
77 , pos(pos)
78 , forward(forward)
79 , seq(std::move(seq))
80 {
81 }
82
83 Minimizer(uint64_t min_hash,
84 uint64_t out_hash,
85 size_t pos,
86 bool forward,
87 std::string seq,
88 std::string qual)
89 : min_hash(min_hash)
90 , out_hash(out_hash)
91 , pos(pos)
92 , forward(forward)
93 , seq(std::move(seq))
94 , qual(std::move(qual))
95 {
96 }
97
98 uint64_t min_hash = 0, out_hash = 0;
99 size_t pos = 0;
100 bool forward = false;
101 std::string seq;
102 std::string qual;
103 };
104
105 using HashedKmer = Minimizer;
106
107 struct Record
108 {
109 Record() {}
110
111 Record(size_t num,
112 std::string id,
113 std::string barcode,
114 size_t readlen,
115 std::vector<Minimizer> minimizers)
116 : num(num)
117 , id(std::move(id))
118 , barcode(std::move(barcode))
119 , readlen(readlen)
120 , minimizers(std::move(minimizers))
121 {
122 }
123
124 size_t num = 0;
125 std::string id;
126 std::string barcode;
127 size_t readlen = 0;
128 std::vector<Minimizer> minimizers;
129
130 operator bool() const
131 {
132 return !id.empty() || !barcode.empty() || !minimizers.empty();
133 }
134 };
135
140 Record read();
141
161 Indexlr(std::string seqfile,
162 size_t k,
163 size_t w,
164 unsigned flags = 0,
165 unsigned threads = 5,
166 bool verbose = false,
167 const btllib::BloomFilter& bf1 = Indexlr::dummy_bf(),
168 const btllib::BloomFilter& bf2 = Indexlr::dummy_bf());
169
170 Indexlr(std::string seqfile,
171 size_t k,
172 size_t w,
173 size_t q,
174 unsigned flags = 0,
175 unsigned threads = 5,
176 bool verbose = false,
177 const btllib::BloomFilter& bf1 = Indexlr::dummy_bf(),
178 const btllib::BloomFilter& bf2 = Indexlr::dummy_bf());
179
180 ~Indexlr();
181
182 void close() noexcept;
183
184 static const size_t MAX_SIMULTANEOUS_INDEXLRS = 256;
185
188 class RecordIterator
189 {
190 public:
191 void operator++() { record = indexlr.read(); }
192 bool operator!=(const RecordIterator& i)
193 {
194 return bool(record) || bool(i.record);
195 }
196 Record operator*() { return std::move(record); }
197 // For wrappers
198 Record next()
199 {
200 auto val = operator*();
201 operator++();
202 return val;
203 }
204
205 private:
206 friend Indexlr;
207
208 RecordIterator(Indexlr& indexlr, bool end)
209 : indexlr(indexlr)
210 {
211 if (!end) {
212 operator++();
213 }
214 }
215
216 Indexlr& indexlr;
217 Record record;
218 };
220
221 RecordIterator begin() { return RecordIterator(*this, false); }
222 RecordIterator end() { return RecordIterator(*this, true); }
223
224private:
225 static std::string extract_barcode(const std::string& id,
226 const std::string& comment);
227 static void filter_hashed_kmer(Indexlr::HashedKmer& hk,
228 bool filter_in,
229 bool filter_out,
230 const BloomFilter& filter_in_bf,
231 const BloomFilter& filter_out_bf);
232
233 static void filter_kmer_qual(Indexlr::HashedKmer& hk,
234 const std::string& kmer_qual,
235 size_t q);
236 static size_t calc_kmer_quality(const std::string& qual);
237
238 static void calc_minimizer(
239 const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
240 const Indexlr::Minimizer*& min_current,
241 size_t idx,
242 ssize_t& min_idx_left,
243 ssize_t& min_idx_right,
244 ssize_t& min_pos_prev,
245 size_t w,
246 std::vector<Indexlr::Minimizer>& minimizers);
247 std::vector<Minimizer> minimize(const std::string& seq,
248 const std::string& qual) const;
249
250 const std::string seqfile;
251 const size_t k, w;
252 size_t q;
253 const unsigned flags;
254 const bool verbose;
255 const long id;
256 std::atomic<bool> closed{ false };
257
258 static const BloomFilter& dummy_bf()
259 {
260 static const BloomFilter var;
261 return var;
262 }
263
264 const std::reference_wrapper<const BloomFilter> filter_in_bf;
265 const std::reference_wrapper<const BloomFilter> filter_out_bf;
266 bool filter_in_enabled;
267 bool filter_out_enabled;
268
269 SeqReader reader;
270 OrderQueueMPSC<Record> output_queue;
271
272 using OutputQueueType = decltype(output_queue);
273 static std::unique_ptr<OutputQueueType::Block>* ready_blocks_array()
274 {
275 thread_local static std::unique_ptr<decltype(output_queue)::Block>
276 var[MAX_SIMULTANEOUS_INDEXLRS];
277 return var;
278 }
279
280 static long* ready_blocks_owners()
281 {
282 thread_local static long var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
283 return var;
284 }
285
286 static size_t* ready_blocks_current()
287 {
288 thread_local static size_t var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
289 return var;
290 }
291
292 static std::atomic<long>& last_id()
293 {
294 static std::atomic<long> var(0);
295 return var;
296 }
297
298 class Worker
299 {
300 public:
301 void start() { t = std::thread(do_work, this); }
302 void join() { t.join(); }
303 void set_id(const int id) { this->id = id; }
304
305 Worker& operator=(const Worker& worker) = delete;
306 Worker& operator=(Worker&& worker) = delete;
307
308 Worker(Indexlr& indexlr)
309 : indexlr(indexlr)
310 {
311 }
312 Worker(const Worker& worker)
313 : Worker(worker.indexlr)
314 {
315 }
316 Worker(Worker&& worker) noexcept
317 : Worker(worker.indexlr)
318 {
319 }
320
321 private:
322 void work();
323 static void do_work(Worker* worker) { worker->work(); }
324
325 int id = -1;
326 Indexlr& indexlr;
327 std::thread t;
328 };
329
330 std::vector<Worker> workers;
331 Barrier end_barrier;
332 std::mutex last_block_num_mutex;
333 uint64_t last_block_num = 0;
334 bool last_block_num_valid = false;
335};
336
337// Constructor for Indexlr class when q is specified
338inline Indexlr::Indexlr(std::string seqfile,
339 const size_t k,
340 const size_t w,
341 const size_t q,
342 const unsigned flags,
343 const unsigned threads,
344 const bool verbose,
345 const BloomFilter& bf1,
346 const BloomFilter& bf2)
347 : seqfile(std::move(seqfile))
348 , k(k)
349 , w(w)
350 , q(q)
351 , flags(flags)
352 , verbose(verbose)
353 , id(++last_id())
354 , filter_in_bf(filter_in() ? bf1 : Indexlr::dummy_bf())
355 , filter_out_bf(filter_out() ? filter_in() ? bf2 : bf1 : Indexlr::dummy_bf())
356 , filter_in_enabled(filter_in())
357 , filter_out_enabled(filter_out())
358 , reader(this->seqfile,
359 short_mode() ? SeqReader::Flag::SHORT_MODE
360 : SeqReader::Flag::LONG_MODE)
361 , output_queue(reader.get_buffer_size(), reader.get_block_size())
362 , workers(std::vector<Worker>(threads, Worker(*this)))
363 , end_barrier(threads)
364{
365 check_error(!short_mode() && !long_mode(),
366 "Indexlr: no mode selected, either short or long mode flag must "
367 "be provided.");
368 check_error(short_mode() && long_mode(),
369 "Indexlr: short and long mode are mutually exclusive.");
370 check_error(threads == 0,
371 "Indexlr: Number of processing threads cannot be 0.");
372 int id_counter = 0;
373 for (auto& worker : workers) {
374 worker.set_id(id_counter++);
375 worker.start();
376 }
377}
378
379// Constructor for Indexlr class when q is not specified
380inline Indexlr::Indexlr(std::string seqfile,
381 const size_t k,
382 const size_t w,
383 const unsigned flags,
384 const unsigned threads,
385 const bool verbose,
386 const BloomFilter& bf1,
387 const BloomFilter& bf2)
388 : Indexlr(std::move(seqfile), k, w, 0, flags, threads, verbose, bf1, bf2)
389{
390}
391
392inline Indexlr::~Indexlr()
393{
394 close();
395}
396
397inline void
398Indexlr::close() noexcept
399{
400 bool closed_expected = false;
401 if (closed.compare_exchange_strong(closed_expected, true)) {
402 try {
403 reader.close();
404 output_queue.close();
405 for (auto& worker : workers) {
406 worker.join();
407 }
408 } catch (const std::system_error& e) {
409 log_error("Indexlr thread join failure: " + std::string(e.what()));
410 std::exit(EXIT_FAILURE); // NOLINT(concurrency-mt-unsafe)
411 }
412 }
413}
414
415// Minimerize a sequence: Find the minimizers of a vector of hash values
416// representing a sequence.
417/* Algorithm
418v is a vector of non-negative integers
419w is the window size
420Invariants
421 0 < w <= v.size() - 1
422 0 <= l <= r <= v.size() - 1
423Initial conditions
424 M = NIL Final set of minimizers, empty initially
425 min = -1 Minimum element
426 i = -1 Index of minimum element
427 prev = -1 Index of previous minimum element
428 l = 0 Index of left end of window
429 r = l + w - 1 Index of right end of window
430Computation
431At each window, if the previous minimum is out of scope, find the new,
432right-most, minimum or else, check with only the right-most element to determine
433if that is the new minimum. A minimizer is added to the final vector only if
434it's index has changed. for each window of v bounded by [l, r] if (i < l) i =
435index of minimum element in [l, r], furthest from l. else if (v[r] <= v[i]) i =
436r min = v[i] if (i != prev) { prev = i M <- M + m
437 }
438 l = l + 1 Move window's left bound by one element
439 r = l + w - 1 Set window's right bound
440}*/
441
442inline std::string
443Indexlr::extract_barcode(const std::string& id, const std::string& comment)
444{
445 const static std::string barcode_prefix = "BX:Z:";
446 if (startswith(comment, barcode_prefix)) {
447 const auto space_pos = comment.find(' ');
448 if (space_pos != std::string::npos) {
449 return comment.substr(barcode_prefix.size(),
450 space_pos - barcode_prefix.size());
451 }
452 return comment.substr(barcode_prefix.size());
453 }
454 const auto pound_pos = id.find('#');
455 if (pound_pos != std::string::npos) {
456 const auto slash_pos = id.find('/');
457 if (slash_pos > pound_pos) {
458 return id.substr(pound_pos + 1, slash_pos - (pound_pos + 1));
459 }
460 }
461 return "NA";
462}
463
464inline void
465Indexlr::filter_hashed_kmer(Indexlr::HashedKmer& hk,
466 bool filter_in,
467 bool filter_out,
468 const BloomFilter& filter_in_bf,
469 const BloomFilter& filter_out_bf)
470{
471 if (filter_in && filter_out) {
472 std::vector<uint64_t> tmp;
473 tmp = { hk.min_hash };
474 if (!filter_in_bf.contains(tmp) || filter_out_bf.contains(tmp)) {
475 hk.min_hash = std::numeric_limits<uint64_t>::max();
476 }
477 } else if (filter_in) {
478 if (!filter_in_bf.contains({ hk.min_hash })) {
479 hk.min_hash = std::numeric_limits<uint64_t>::max();
480 }
481 } else if (filter_out) {
482 if (filter_out_bf.contains({ hk.min_hash })) {
483 hk.min_hash = std::numeric_limits<uint64_t>::max();
484 }
485 }
486}
487
488inline void
489Indexlr::filter_kmer_qual(Indexlr::HashedKmer& hk,
490 const std::string& kmer_qual,
491 size_t q)
492{
493 if (calc_kmer_quality(kmer_qual) < q) {
494 hk.min_hash = std::numeric_limits<uint64_t>::max();
495 }
496}
497
498inline size_t
499Indexlr::calc_kmer_quality(const std::string& qual)
500{
501 // convert the quality scores to integers
502 std::vector<int> qual_ints;
503 const int thirty_three = 33;
504 qual_ints.reserve(qual.size());
505 for (auto c : qual) {
506 qual_ints.push_back(c - thirty_three);
507 }
508 // calculate the mean (potential improvement: use other statistics)
509 size_t sum = 0;
510 for (auto q : qual_ints) {
511 sum += q;
512 }
513 return (sum / qual_ints.size());
514}
515
516inline void
517Indexlr::calc_minimizer(
518 const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
519 const Indexlr::Minimizer*& min_current,
520 const size_t idx,
521 ssize_t& min_idx_left,
522 ssize_t& min_idx_right,
523 ssize_t& min_pos_prev,
524 const size_t w,
525 std::vector<Indexlr::Minimizer>& minimizers)
526{
527 min_idx_left = ssize_t(idx + 1 - w);
528 min_idx_right = ssize_t(idx + 1);
529 const auto& min_left =
530 hashed_kmers_buffer[min_idx_left % hashed_kmers_buffer.size()];
531 const auto& min_right =
532 hashed_kmers_buffer[(min_idx_right - 1) % hashed_kmers_buffer.size()];
533
534 if (min_current == nullptr || min_current->pos < min_left.pos) {
535 min_current = &min_left;
536 // Use of operator '<=' returns the minimum that is furthest from left.
537 for (ssize_t i = min_idx_left; i < min_idx_right; i++) {
538 const auto& min_i = hashed_kmers_buffer[i % hashed_kmers_buffer.size()];
539 if (min_i.min_hash <= min_current->min_hash) {
540 min_current = &min_i;
541 }
542 }
543 } else if (min_right.min_hash <= min_current->min_hash) {
544 min_current = &min_right;
545 }
546 if (ssize_t(min_current->pos) > min_pos_prev &&
547 min_current->min_hash != std::numeric_limits<uint64_t>::max()) {
548 min_pos_prev = ssize_t(min_current->pos);
549 minimizers.push_back(*min_current);
550 }
551}
552
553inline std::vector<Indexlr::Minimizer>
554Indexlr::minimize(const std::string& seq, const std::string& qual) const
555{
556 if ((k > seq.size()) || (w > seq.size() - k + 1)) {
557 return {};
558 }
559 std::vector<Minimizer> minimizers;
560 minimizers.reserve(2 * (seq.size() - k + 1) / w);
561 std::vector<HashedKmer> hashed_kmers_buffer(w + 1);
562 ssize_t min_idx_left, min_idx_right, min_pos_prev = -1;
563 const Minimizer* min_current = nullptr;
564 size_t idx = 0;
565 for (NtHash nh(seq, 2, k); nh.roll(); ++idx) {
566 auto& hk = hashed_kmers_buffer[idx % hashed_kmers_buffer.size()];
567
568 hk = HashedKmer(nh.hashes()[0],
569 nh.hashes()[1],
570 nh.get_pos(),
571 nh.get_forward_hash() <= nh.get_reverse_hash(),
572 output_seq() ? seq.substr(nh.get_pos(), k) : "",
573 output_qual() ? qual.substr(nh.get_pos(), k) : "");
574
575 filter_hashed_kmer(
576 hk, filter_in(), filter_out(), filter_in_bf.get(), filter_out_bf.get());
577
578 if (q > 0) {
579 filter_kmer_qual(hk, qual.substr(nh.get_pos(), k), q);
580 }
581
582 if (idx + 1 >= w) {
583 calc_minimizer(hashed_kmers_buffer,
584 min_current,
585 idx,
586 min_idx_left,
587 min_idx_right,
588 min_pos_prev,
589 w,
590 minimizers);
591 }
592 }
593 return minimizers;
594}
595
596inline Indexlr::Record
598{
599 if (ready_blocks_owners()[id % MAX_SIMULTANEOUS_INDEXLRS] != id) {
600 ready_blocks_array()[id % MAX_SIMULTANEOUS_INDEXLRS] =
601 std::unique_ptr< // NOLINT(modernize-make-unique)
602 decltype(output_queue)::Block>(
603 new decltype(output_queue)::Block(reader.get_block_size()));
604 ready_blocks_owners()[id % MAX_SIMULTANEOUS_INDEXLRS] = id;
605 ready_blocks_current()[id % MAX_SIMULTANEOUS_INDEXLRS] = 0;
606 }
607 auto& block = *(ready_blocks_array()[id % MAX_SIMULTANEOUS_INDEXLRS]);
608 auto& current = ready_blocks_current()[id % MAX_SIMULTANEOUS_INDEXLRS];
609 if (current >= block.count) {
610 block.count = 0;
611 output_queue.read(block);
612 if (block.count == 0) {
613 output_queue.close();
614 block = decltype(output_queue)::Block(reader.get_block_size());
615 return Record();
616 }
617 current = 0;
618 }
619 return std::move(block.data[current++]);
620}
621
622inline void
623Indexlr::Worker::work()
624{
625 decltype(indexlr.output_queue)::Block output_block(
626 indexlr.reader.get_block_size());
627 uint64_t last_block_num = 0;
628 bool last_block_num_valid = false;
629 for (;;) {
630 auto input_block = indexlr.reader.read_block();
631 if (input_block.count == 0) {
632 break;
633 }
634
635 output_block.num = input_block.num;
636 for (size_t idx = 0; idx < input_block.count; idx++) {
637 Record record;
638 auto& reader_record = input_block.data[idx];
639 record.num = reader_record.num;
640 if (indexlr.output_bx()) {
641 record.barcode =
642 indexlr.extract_barcode(reader_record.id, reader_record.comment);
643 }
644 if (indexlr.output_id()) {
645 record.id = std::move(reader_record.id);
646 }
647
648 record.readlen = reader_record.seq.size();
649
650 check_info(indexlr.verbose && indexlr.k > record.readlen,
651 "Indexlr: skipped seq " + std::to_string(record.num) +
652 " on line " +
653 std::to_string(record.num * (indexlr.reader.get_format() ==
654 SeqReader::Format::FASTA
655 ? 2
656 : 4) +
657 2) +
658 "; k (" + std::to_string(indexlr.k) + ") > seq length (" +
659 std::to_string(record.readlen) + ")");
660
661 check_info(indexlr.verbose && indexlr.w > record.readlen - indexlr.k + 1,
662 "Indexlr: skipped seq " + std::to_string(record.num) +
663 " on line " +
664 std::to_string(record.num * (indexlr.reader.get_format() ==
665 SeqReader::Format::FASTA
666 ? 2
667 : 4) +
668 2) +
669 "; w (" + std::to_string(indexlr.w) + ") > # of hashes (" +
670 std::to_string(record.readlen - indexlr.k + 1) + ")");
671
672 if (indexlr.k <= record.readlen &&
673 indexlr.w <= record.readlen - indexlr.k + 1) {
674 record.minimizers =
675 indexlr.minimize(reader_record.seq, reader_record.qual);
676 } else {
677 record.minimizers = {};
678 }
679
680 output_block.data[output_block.count++] = std::move(record);
681 }
682 if (output_block.count > 0) {
683 last_block_num = output_block.num;
684 last_block_num_valid = true;
685 indexlr.output_queue.write(output_block);
686 output_block.count = 0;
687 }
688 }
689 if (last_block_num_valid) {
690 std::unique_lock<std::mutex> lock(indexlr.last_block_num_mutex);
691 indexlr.last_block_num = std::max(indexlr.last_block_num, last_block_num);
692 indexlr.last_block_num_valid = true;
693 lock.unlock();
694 }
695 indexlr.end_barrier.wait();
696 if (last_block_num_valid && indexlr.last_block_num_valid &&
697 last_block_num == indexlr.last_block_num) {
698 output_block.num = last_block_num + 1;
699 indexlr.output_queue.write(output_block);
700 } else if (!indexlr.last_block_num_valid && id == 0) {
701 output_block.num = 0;
702 indexlr.output_queue.write(output_block);
703 }
704}
705
706} // namespace btllib
707
708#endif
Definition bloom_filter.hpp:73
Definition indexlr.hpp:26
Indexlr(std::string seqfile, size_t k, size_t w, unsigned flags=0, unsigned threads=5, bool verbose=false, const btllib::BloomFilter &bf1=Indexlr::dummy_bf(), const btllib::BloomFilter &bf2=Indexlr::dummy_bf())
Definition indexlr.hpp:380
RecordIterator begin()
Definition indexlr.hpp:221
Record read()
Definition indexlr.hpp:597
OrderQueueMPMC< Record >::Block read_block()
Definition aahash.hpp:12
void check_error(bool condition, const std::string &msg)
void log_error(const std::string &msg)
bool startswith(std::string s, std::string prefix)
void check_info(bool condition, const std::string &msg)
Definition indexlr.hpp:34
static const unsigned BX
Definition indexlr.hpp:38
static const unsigned SEQ
Definition indexlr.hpp:40
static const unsigned LONG_MODE
Definition indexlr.hpp:52
static const unsigned QUAL
Definition indexlr.hpp:54
static const unsigned FILTER_IN
Definition indexlr.hpp:43
static const unsigned FILTER_OUT
Definition indexlr.hpp:48
static const unsigned NO_ID
Definition indexlr.hpp:36
static const unsigned SHORT_MODE
Definition indexlr.hpp:50
Definition indexlr.hpp:67
Definition indexlr.hpp:108