1 #ifndef BTLLIB_INDEXLR_HPP
2 #define BTLLIB_INDEXLR_HPP
4 #include "btllib/bloom_filter.hpp"
5 #include "btllib/nthash.hpp"
6 #include "btllib/order_queue.hpp"
7 #include "btllib/seq_reader.hpp"
8 #include "btllib/status.hpp"
9 #include "btllib/util.hpp"
36 static const unsigned NO_ID = 1;
38 static const unsigned BX = 2;
40 static const unsigned SEQ = 4;
54 static const unsigned QUAL = 128;
57 bool output_id()
const {
return bool(~flags &
Flag::NO_ID); }
58 bool output_bx()
const {
return bool(flags &
Flag::BX); }
59 bool output_seq()
const {
return bool(flags &
Flag::SEQ); }
60 bool output_qual()
const {
return bool(flags &
Flag::QUAL); }
94 , qual(std::move(qual))
98 uint64_t min_hash = 0, out_hash = 0;
100 bool forward =
false;
115 std::vector<Minimizer> minimizers)
118 , barcode(std::move(barcode))
120 , minimizers(std::move(minimizers))
128 std::vector<Minimizer> minimizers;
130 operator bool()
const
132 return !
id.empty() || !barcode.empty() || !minimizers.empty();
165 unsigned threads = 5,
166 bool verbose =
false,
175 unsigned threads = 5,
176 bool verbose =
false,
182 void close() noexcept;
184 static const
size_t MAX_SIMULTANEOUS_INDEXLRS = 256;
191 void operator++() { record = indexlr.read(); }
192 bool operator!=(
const RecordIterator& i)
194 return bool(record) || bool(i.record);
196 Record operator*() {
return std::move(record); }
200 auto val = operator*();
208 RecordIterator(
Indexlr& indexlr,
bool end)
221 RecordIterator
begin() {
return RecordIterator(*
this,
false); }
222 RecordIterator end() {
return RecordIterator(*
this,
true); }
225 static std::string extract_barcode(
const std::string&
id,
226 const std::string& comment);
227 static void filter_hashed_kmer(Indexlr::HashedKmer& hk,
230 const BloomFilter& filter_in_bf,
231 const BloomFilter& filter_out_bf);
233 static void filter_kmer_qual(Indexlr::HashedKmer& hk,
234 const std::string& kmer_qual,
236 static size_t calc_kmer_quality(
const std::string& qual);
238 static void calc_minimizer(
239 const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
240 const Indexlr::Minimizer*& min_current,
242 ssize_t& min_idx_left,
243 ssize_t& min_idx_right,
244 ssize_t& min_pos_prev,
246 std::vector<Indexlr::Minimizer>& minimizers);
247 std::vector<Minimizer> minimize(
const std::string& seq,
248 const std::string& qual)
const;
250 const std::string seqfile;
253 const unsigned flags;
256 std::atomic<bool> closed{
false };
258 static const BloomFilter& dummy_bf()
260 static const BloomFilter var;
264 const std::reference_wrapper<const BloomFilter> filter_in_bf;
265 const std::reference_wrapper<const BloomFilter> filter_out_bf;
266 bool filter_in_enabled;
267 bool filter_out_enabled;
270 OrderQueueMPSC<Record> output_queue;
272 using OutputQueueType = decltype(output_queue);
273 static std::unique_ptr<OutputQueueType::Block>* ready_blocks_array()
275 thread_local
static std::unique_ptr<decltype(output_queue)::Block>
276 var[MAX_SIMULTANEOUS_INDEXLRS];
280 static long* ready_blocks_owners()
282 thread_local
static long var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
286 static size_t* ready_blocks_current()
288 thread_local
static size_t var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
292 static std::atomic<long>& last_id()
294 static std::atomic<long> var(0);
301 void start() { t = std::thread(do_work,
this); }
302 void join() { t.join(); }
303 void set_id(
const int id) { this->
id = id; }
305 Worker& operator=(
const Worker& worker) =
delete;
306 Worker& operator=(Worker&& worker) =
delete;
312 Worker(
const Worker& worker)
313 : Worker(worker.indexlr)
316 Worker(Worker&& worker) noexcept
317 : Worker(worker.indexlr)
323 static void do_work(Worker* worker) { worker->work(); }
330 std::vector<Worker> workers;
332 std::mutex last_block_num_mutex;
333 uint64_t last_block_num = 0;
334 bool last_block_num_valid =
false;
342 const unsigned flags,
343 const unsigned threads,
345 const BloomFilter& bf1,
346 const BloomFilter& bf2)
347 : seqfile(std::move(seqfile))
354 , filter_in_bf(filter_in() ? bf1 : Indexlr::dummy_bf())
355 , filter_out_bf(filter_out() ? filter_in() ? bf2 : bf1 : Indexlr::dummy_bf())
356 , filter_in_enabled(filter_in())
357 , filter_out_enabled(filter_out())
358 , reader(this->seqfile,
359 short_mode() ? SeqReader::Flag::SHORT_MODE
360 : SeqReader::Flag::LONG_MODE)
361 , output_queue(reader.get_buffer_size(), reader.get_block_size())
362 , workers(std::vector<Worker>(threads, Worker(*this)))
363 , end_barrier(threads)
366 "Indexlr: no mode selected, either short or long mode flag must "
369 "Indexlr: short and long mode are mutually exclusive.");
371 "Indexlr: Number of processing threads cannot be 0.");
373 for (
auto& worker : workers) {
374 worker.set_id(id_counter++);
383 const unsigned flags,
384 const unsigned threads,
388 :
Indexlr(std::move(seqfile), k, w, 0, flags, threads, verbose, bf1, bf2)
392 inline Indexlr::~Indexlr()
398 Indexlr::close() noexcept
400 bool closed_expected =
false;
401 if (closed.compare_exchange_strong(closed_expected,
true)) {
404 output_queue.close();
405 for (
auto& worker : workers) {
408 }
catch (
const std::system_error& e) {
409 log_error(
"Indexlr thread join failure: " + std::string(e.what()));
410 std::exit(EXIT_FAILURE);
443 Indexlr::extract_barcode(
const std::string&
id,
const std::string& comment)
445 const static std::string barcode_prefix =
"BX:Z:";
447 const auto space_pos = comment.find(
' ');
448 if (space_pos != std::string::npos) {
449 return comment.substr(barcode_prefix.size(),
450 space_pos - barcode_prefix.size());
452 return comment.substr(barcode_prefix.size());
454 const auto pound_pos =
id.find(
'#');
455 if (pound_pos != std::string::npos) {
456 const auto slash_pos =
id.find(
'/');
457 if (slash_pos > pound_pos) {
458 return id.substr(pound_pos + 1, slash_pos - (pound_pos + 1));
465 Indexlr::filter_hashed_kmer(Indexlr::HashedKmer& hk,
468 const BloomFilter& filter_in_bf,
469 const BloomFilter& filter_out_bf)
471 if (filter_in && filter_out) {
472 std::vector<uint64_t> tmp;
473 tmp = { hk.min_hash };
474 if (!filter_in_bf.contains(tmp) || filter_out_bf.contains(tmp)) {
475 hk.min_hash = std::numeric_limits<uint64_t>::max();
477 }
else if (filter_in) {
478 if (!filter_in_bf.contains({ hk.min_hash })) {
479 hk.min_hash = std::numeric_limits<uint64_t>::max();
481 }
else if (filter_out) {
482 if (filter_out_bf.contains({ hk.min_hash })) {
483 hk.min_hash = std::numeric_limits<uint64_t>::max();
489 Indexlr::filter_kmer_qual(Indexlr::HashedKmer& hk,
490 const std::string& kmer_qual,
493 if (calc_kmer_quality(kmer_qual) < q) {
494 hk.min_hash = std::numeric_limits<uint64_t>::max();
499 Indexlr::calc_kmer_quality(
const std::string& qual)
502 std::vector<int> qual_ints;
503 const int thirty_three = 33;
504 qual_ints.reserve(qual.size());
505 for (
auto c : qual) {
506 qual_ints.push_back(c - thirty_three);
510 for (
auto q : qual_ints) {
513 return (sum / qual_ints.size());
517 Indexlr::calc_minimizer(
518 const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
519 const Indexlr::Minimizer*& min_current,
521 ssize_t& min_idx_left,
522 ssize_t& min_idx_right,
523 ssize_t& min_pos_prev,
525 std::vector<Indexlr::Minimizer>& minimizers)
527 min_idx_left = ssize_t(idx + 1 - w);
528 min_idx_right = ssize_t(idx + 1);
529 const auto& min_left =
530 hashed_kmers_buffer[min_idx_left % hashed_kmers_buffer.size()];
531 const auto& min_right =
532 hashed_kmers_buffer[(min_idx_right - 1) % hashed_kmers_buffer.size()];
534 if (min_current ==
nullptr || min_current->pos < min_left.pos) {
535 min_current = &min_left;
537 for (ssize_t i = min_idx_left; i < min_idx_right; i++) {
538 const auto& min_i = hashed_kmers_buffer[i % hashed_kmers_buffer.size()];
539 if (min_i.min_hash <= min_current->min_hash) {
540 min_current = &min_i;
543 }
else if (min_right.min_hash <= min_current->min_hash) {
544 min_current = &min_right;
546 if (ssize_t(min_current->pos) > min_pos_prev &&
547 min_current->min_hash != std::numeric_limits<uint64_t>::max()) {
548 min_pos_prev = ssize_t(min_current->pos);
549 minimizers.push_back(*min_current);
553 inline std::vector<Indexlr::Minimizer>
554 Indexlr::minimize(
const std::string& seq,
const std::string& qual)
const
556 if ((k > seq.size()) || (w > seq.size() - k + 1)) {
559 std::vector<Minimizer> minimizers;
560 minimizers.reserve(2 * (seq.size() - k + 1) / w);
561 std::vector<HashedKmer> hashed_kmers_buffer(w + 1);
562 ssize_t min_idx_left, min_idx_right, min_pos_prev = -1;
563 const Minimizer* min_current =
nullptr;
565 for (NtHash nh(seq, 2, k); nh.roll(); ++idx) {
566 auto& hk = hashed_kmers_buffer[idx % hashed_kmers_buffer.size()];
568 hk = HashedKmer(nh.hashes()[0],
571 nh.get_forward_hash() <= nh.get_reverse_hash(),
572 output_seq() ? seq.substr(nh.get_pos(), k) :
"",
573 output_qual() ? qual.substr(nh.get_pos(), k) :
"");
576 hk, filter_in(), filter_out(), filter_in_bf.get(), filter_out_bf.get());
579 filter_kmer_qual(hk, qual.substr(nh.get_pos(), k), q);
583 calc_minimizer(hashed_kmers_buffer,
596 inline Indexlr::Record
599 if (ready_blocks_owners()[
id % MAX_SIMULTANEOUS_INDEXLRS] !=
id) {
600 ready_blocks_array()[
id % MAX_SIMULTANEOUS_INDEXLRS] =
602 decltype(output_queue)::Block>(
603 new decltype(output_queue)::Block(reader.get_block_size()));
604 ready_blocks_owners()[
id % MAX_SIMULTANEOUS_INDEXLRS] = id;
605 ready_blocks_current()[
id % MAX_SIMULTANEOUS_INDEXLRS] = 0;
607 auto& block = *(ready_blocks_array()[
id % MAX_SIMULTANEOUS_INDEXLRS]);
608 auto& current = ready_blocks_current()[
id % MAX_SIMULTANEOUS_INDEXLRS];
609 if (current >= block.count) {
611 output_queue.read(block);
612 if (block.count == 0) {
613 output_queue.close();
614 block = decltype(output_queue)::Block(reader.get_block_size());
619 return std::move(block.data[current++]);
623 Indexlr::Worker::work()
625 decltype(indexlr.output_queue)::Block output_block(
626 indexlr.reader.get_block_size());
627 uint64_t last_block_num = 0;
628 bool last_block_num_valid =
false;
630 auto input_block = indexlr.reader.read_block();
631 if (input_block.count == 0) {
635 output_block.num = input_block.num;
636 for (
size_t idx = 0; idx < input_block.count; idx++) {
638 auto& reader_record = input_block.data[idx];
639 record.num = reader_record.num;
640 if (indexlr.output_bx()) {
642 indexlr.extract_barcode(reader_record.id, reader_record.comment);
644 if (indexlr.output_id()) {
645 record.id = std::move(reader_record.id);
648 record.readlen = reader_record.seq.size();
650 check_info(indexlr.verbose && indexlr.k > record.readlen,
651 "Indexlr: skipped seq " + std::to_string(record.num) +
653 std::to_string(record.num * (indexlr.reader.get_format() ==
654 SeqReader::Format::FASTA
658 "; k (" + std::to_string(indexlr.k) +
") > seq length (" +
659 std::to_string(record.readlen) +
")");
661 check_info(indexlr.verbose && indexlr.w > record.readlen - indexlr.k + 1,
662 "Indexlr: skipped seq " + std::to_string(record.num) +
664 std::to_string(record.num * (indexlr.reader.get_format() ==
665 SeqReader::Format::FASTA
669 "; w (" + std::to_string(indexlr.w) +
") > # of hashes (" +
670 std::to_string(record.readlen - indexlr.k + 1) +
")");
672 if (indexlr.k <= record.readlen &&
673 indexlr.w <= record.readlen - indexlr.k + 1) {
675 indexlr.minimize(reader_record.seq, reader_record.qual);
677 record.minimizers = {};
680 output_block.data[output_block.count++] = std::move(record);
682 if (output_block.count > 0) {
683 last_block_num = output_block.num;
684 last_block_num_valid =
true;
685 indexlr.output_queue.write(output_block);
686 output_block.count = 0;
689 if (last_block_num_valid) {
690 std::unique_lock<std::mutex> lock(indexlr.last_block_num_mutex);
691 indexlr.last_block_num = std::max(indexlr.last_block_num, last_block_num);
692 indexlr.last_block_num_valid =
true;
695 indexlr.end_barrier.wait();
696 if (last_block_num_valid && indexlr.last_block_num_valid &&
697 last_block_num == indexlr.last_block_num) {
698 output_block.num = last_block_num + 1;
699 indexlr.output_queue.write(output_block);
700 }
else if (!indexlr.last_block_num_valid &&
id == 0) {
701 output_block.num = 0;
702 indexlr.output_queue.write(output_block);
static const unsigned SHORT_MODE
Definition: indexlr.hpp:50
static const unsigned NO_ID
Definition: indexlr.hpp:36
Indexlr(std::string seqfile, size_t k, size_t w, unsigned flags=0, unsigned threads=5, bool verbose=false, const btllib::BloomFilter &bf1=Indexlr::dummy_bf(), const btllib::BloomFilter &bf2=Indexlr::dummy_bf())
Definition: indexlr.hpp:380
static const unsigned FILTER_IN
Definition: indexlr.hpp:43
Definition: indexlr.hpp:25
std::string join(const std::vector< std::string > &s, const std::string &delim)
bool startswith(std::string s, std::string prefix)
Definition: indexlr.hpp:33
RecordIterator begin()
Definition: indexlr.hpp:221
void check_error(bool condition, const std::string &msg)
void check_info(bool condition, const std::string &msg)
void log_error(const std::string &msg)
Definition: indexlr.hpp:107
static const unsigned FILTER_OUT
Definition: indexlr.hpp:48
Definition: bloom_filter.hpp:72
Definition: indexlr.hpp:66
static const unsigned QUAL
Definition: indexlr.hpp:54
static const unsigned LONG_MODE
Definition: indexlr.hpp:52
static const unsigned BX
Definition: indexlr.hpp:38
static const unsigned SEQ
Definition: indexlr.hpp:40
Record read()
Definition: indexlr.hpp:597