btllib
 All Classes Namespaces Functions Variables
indexlr.hpp
1 #ifndef BTLLIB_INDEXLR_HPP
2 #define BTLLIB_INDEXLR_HPP
3 
4 #include "btllib/bloom_filter.hpp"
5 #include "btllib/nthash.hpp"
6 #include "btllib/order_queue.hpp"
7 #include "btllib/seq_reader.hpp"
8 #include "btllib/status.hpp"
9 #include "btllib/util.hpp"
10 
11 #include <algorithm>
12 #include <atomic>
13 #include <cstdlib>
14 #include <cstring>
15 #include <functional>
16 #include <iostream>
17 #include <limits>
18 #include <memory>
19 #include <string>
20 #include <thread>
21 #include <vector>
22 
23 namespace btllib {
24 
25 class Indexlr
26 {
27 
28 public:
29  /* Has to be a struct and not an enum because:
30  * 1) Non-class enums are not name qualified and can collide
31  * 2) class enums can't be implicitly converted into integers
32  */
33  struct Flag
34  {
36  static const unsigned NO_ID = 1;
38  static const unsigned BX = 2;
40  static const unsigned SEQ = 4;
43  static const unsigned FILTER_IN = 8;
48  static const unsigned FILTER_OUT = 16;
50  static const unsigned SHORT_MODE = 32;
52  static const unsigned LONG_MODE = 64;
54  static const unsigned QUAL = 128;
55  };
56 
57  bool output_id() const { return bool(~flags & Flag::NO_ID); }
58  bool output_bx() const { return bool(flags & Flag::BX); }
59  bool output_seq() const { return bool(flags & Flag::SEQ); }
60  bool output_qual() const { return bool(flags & Flag::QUAL); }
61  bool filter_in() const { return bool(flags & Flag::FILTER_IN); }
62  bool filter_out() const { return bool(flags & Flag::FILTER_OUT); }
63  bool short_mode() const { return bool(flags & Flag::SHORT_MODE); }
64  bool long_mode() const { return bool(flags & Flag::LONG_MODE); }
65 
66  struct Minimizer
67  {
68  Minimizer() = default;
69 
70  Minimizer(uint64_t min_hash,
71  uint64_t out_hash,
72  size_t pos,
73  bool forward,
74  std::string seq)
75  : min_hash(min_hash)
76  , out_hash(out_hash)
77  , pos(pos)
78  , forward(forward)
79  , seq(std::move(seq))
80  {
81  }
82 
83  Minimizer(uint64_t min_hash,
84  uint64_t out_hash,
85  size_t pos,
86  bool forward,
87  std::string seq,
88  std::string qual)
89  : min_hash(min_hash)
90  , out_hash(out_hash)
91  , pos(pos)
92  , forward(forward)
93  , seq(std::move(seq))
94  , qual(std::move(qual))
95  {
96  }
97 
98  uint64_t min_hash = 0, out_hash = 0;
99  size_t pos = 0;
100  bool forward = false;
101  std::string seq;
102  std::string qual;
103  };
104 
105  using HashedKmer = Minimizer;
106 
107  struct Record
108  {
109  Record() {}
110 
111  Record(size_t num,
112  std::string id,
113  std::string barcode,
114  size_t readlen,
115  std::vector<Minimizer> minimizers)
116  : num(num)
117  , id(std::move(id))
118  , barcode(std::move(barcode))
119  , readlen(readlen)
120  , minimizers(std::move(minimizers))
121  {
122  }
123 
124  size_t num = 0;
125  std::string id;
126  std::string barcode;
127  size_t readlen = 0;
128  std::vector<Minimizer> minimizers;
129 
130  operator bool() const
131  {
132  return !id.empty() || !barcode.empty() || !minimizers.empty();
133  }
134  };
135 
140  Record read();
141 
161  Indexlr(std::string seqfile,
162  size_t k,
163  size_t w,
164  unsigned flags = 0,
165  unsigned threads = 5,
166  bool verbose = false,
167  const btllib::BloomFilter& bf1 = Indexlr::dummy_bf(),
168  const btllib::BloomFilter& bf2 = Indexlr::dummy_bf());
169 
170  Indexlr(std::string seqfile,
171  size_t k,
172  size_t w,
173  size_t q,
174  unsigned flags = 0,
175  unsigned threads = 5,
176  bool verbose = false,
177  const btllib::BloomFilter& bf1 = Indexlr::dummy_bf(),
178  const btllib::BloomFilter& bf2 = Indexlr::dummy_bf());
179 
180  ~Indexlr();
181 
182  void close() noexcept;
183 
184  static const size_t MAX_SIMULTANEOUS_INDEXLRS = 256;
185 
187  class RecordIterator
189  {
190  public:
191  void operator++() { record = indexlr.read(); }
192  bool operator!=(const RecordIterator& i)
193  {
194  return bool(record) || bool(i.record);
195  }
196  Record operator*() { return std::move(record); }
197  // For wrappers
198  Record next()
199  {
200  auto val = operator*();
201  operator++();
202  return val;
203  }
204 
205  private:
206  friend Indexlr;
207 
208  RecordIterator(Indexlr& indexlr, bool end)
209  : indexlr(indexlr)
210  {
211  if (!end) {
212  operator++();
213  }
214  }
215 
216  Indexlr& indexlr;
217  Record record;
218  };
220 
221  RecordIterator begin() { return RecordIterator(*this, false); }
222  RecordIterator end() { return RecordIterator(*this, true); }
223 
224 private:
225  static std::string extract_barcode(const std::string& id,
226  const std::string& comment);
227  static void filter_hashed_kmer(Indexlr::HashedKmer& hk,
228  bool filter_in,
229  bool filter_out,
230  const BloomFilter& filter_in_bf,
231  const BloomFilter& filter_out_bf);
232 
233  static void filter_kmer_qual(Indexlr::HashedKmer& hk,
234  const std::string& kmer_qual,
235  size_t q);
236  static size_t calc_kmer_quality(const std::string& qual);
237 
238  static void calc_minimizer(
239  const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
240  const Indexlr::Minimizer*& min_current,
241  size_t idx,
242  ssize_t& min_idx_left,
243  ssize_t& min_idx_right,
244  ssize_t& min_pos_prev,
245  size_t w,
246  std::vector<Indexlr::Minimizer>& minimizers);
247  std::vector<Minimizer> minimize(const std::string& seq,
248  const std::string& qual) const;
249 
250  const std::string seqfile;
251  const size_t k, w;
252  size_t q;
253  const unsigned flags;
254  const bool verbose;
255  const long id;
256  std::atomic<bool> closed{ false };
257 
258  static const BloomFilter& dummy_bf()
259  {
260  static const BloomFilter var;
261  return var;
262  }
263 
264  const std::reference_wrapper<const BloomFilter> filter_in_bf;
265  const std::reference_wrapper<const BloomFilter> filter_out_bf;
266  bool filter_in_enabled;
267  bool filter_out_enabled;
268 
269  SeqReader reader;
270  OrderQueueMPSC<Record> output_queue;
271 
272  using OutputQueueType = decltype(output_queue);
273  static std::unique_ptr<OutputQueueType::Block>* ready_blocks_array()
274  {
275  thread_local static std::unique_ptr<decltype(output_queue)::Block>
276  var[MAX_SIMULTANEOUS_INDEXLRS];
277  return var;
278  }
279 
280  static long* ready_blocks_owners()
281  {
282  thread_local static long var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
283  return var;
284  }
285 
286  static size_t* ready_blocks_current()
287  {
288  thread_local static size_t var[MAX_SIMULTANEOUS_INDEXLRS] = { 0 };
289  return var;
290  }
291 
292  static std::atomic<long>& last_id()
293  {
294  static std::atomic<long> var(0);
295  return var;
296  }
297 
298  class Worker
299  {
300  public:
301  void start() { t = std::thread(do_work, this); }
302  void join() { t.join(); }
303  void set_id(const int id) { this->id = id; }
304 
305  Worker& operator=(const Worker& worker) = delete;
306  Worker& operator=(Worker&& worker) = delete;
307 
308  Worker(Indexlr& indexlr)
309  : indexlr(indexlr)
310  {
311  }
312  Worker(const Worker& worker)
313  : Worker(worker.indexlr)
314  {
315  }
316  Worker(Worker&& worker) noexcept
317  : Worker(worker.indexlr)
318  {
319  }
320 
321  private:
322  void work();
323  static void do_work(Worker* worker) { worker->work(); }
324 
325  int id = -1;
326  Indexlr& indexlr;
327  std::thread t;
328  };
329 
330  std::vector<Worker> workers;
331  Barrier end_barrier;
332  std::mutex last_block_num_mutex;
333  uint64_t last_block_num = 0;
334  bool last_block_num_valid = false;
335 };
336 
337 // Constructor for Indexlr class when q is specified
338 inline Indexlr::Indexlr(std::string seqfile,
339  const size_t k,
340  const size_t w,
341  const size_t q,
342  const unsigned flags,
343  const unsigned threads,
344  const bool verbose,
345  const BloomFilter& bf1,
346  const BloomFilter& bf2)
347  : seqfile(std::move(seqfile))
348  , k(k)
349  , w(w)
350  , q(q)
351  , flags(flags)
352  , verbose(verbose)
353  , id(++last_id())
354  , filter_in_bf(filter_in() ? bf1 : Indexlr::dummy_bf())
355  , filter_out_bf(filter_out() ? filter_in() ? bf2 : bf1 : Indexlr::dummy_bf())
356  , filter_in_enabled(filter_in())
357  , filter_out_enabled(filter_out())
358  , reader(this->seqfile,
359  short_mode() ? SeqReader::Flag::SHORT_MODE
360  : SeqReader::Flag::LONG_MODE)
361  , output_queue(reader.get_buffer_size(), reader.get_block_size())
362  , workers(std::vector<Worker>(threads, Worker(*this)))
363  , end_barrier(threads)
364 {
365  check_error(!short_mode() && !long_mode(),
366  "Indexlr: no mode selected, either short or long mode flag must "
367  "be provided.");
368  check_error(short_mode() && long_mode(),
369  "Indexlr: short and long mode are mutually exclusive.");
370  check_error(threads == 0,
371  "Indexlr: Number of processing threads cannot be 0.");
372  int id_counter = 0;
373  for (auto& worker : workers) {
374  worker.set_id(id_counter++);
375  worker.start();
376  }
377 }
378 
379 // Constructor for Indexlr class when q is not specified
380 inline Indexlr::Indexlr(std::string seqfile,
381  const size_t k,
382  const size_t w,
383  const unsigned flags,
384  const unsigned threads,
385  const bool verbose,
386  const BloomFilter& bf1,
387  const BloomFilter& bf2)
388  : Indexlr(std::move(seqfile), k, w, 0, flags, threads, verbose, bf1, bf2)
389 {
390 }
391 
392 inline Indexlr::~Indexlr()
393 {
394  close();
395 }
396 
397 inline void
398 Indexlr::close() noexcept
399 {
400  bool closed_expected = false;
401  if (closed.compare_exchange_strong(closed_expected, true)) {
402  try {
403  reader.close();
404  output_queue.close();
405  for (auto& worker : workers) {
406  worker.join();
407  }
408  } catch (const std::system_error& e) {
409  log_error("Indexlr thread join failure: " + std::string(e.what()));
410  std::exit(EXIT_FAILURE); // NOLINT(concurrency-mt-unsafe)
411  }
412  }
413 }
414 
415 // Minimerize a sequence: Find the minimizers of a vector of hash values
416 // representing a sequence.
417 /* Algorithm
418 v is a vector of non-negative integers
419 w is the window size
420 Invariants
421  0 < w <= v.size() - 1
422  0 <= l <= r <= v.size() - 1
423 Initial conditions
424  M = NIL Final set of minimizers, empty initially
425  min = -1 Minimum element
426  i = -1 Index of minimum element
427  prev = -1 Index of previous minimum element
428  l = 0 Index of left end of window
429  r = l + w - 1 Index of right end of window
430 Computation
431 At each window, if the previous minimum is out of scope, find the new,
432 right-most, minimum or else, check with only the right-most element to determine
433 if that is the new minimum. A minimizer is added to the final vector only if
434 it's index has changed. for each window of v bounded by [l, r] if (i < l) i =
435 index of minimum element in [l, r], furthest from l. else if (v[r] <= v[i]) i =
436 r min = v[i] if (i != prev) { prev = i M <- M + m
437  }
438  l = l + 1 Move window's left bound by one element
439  r = l + w - 1 Set window's right bound
440 }*/
441 
442 inline std::string
443 Indexlr::extract_barcode(const std::string& id, const std::string& comment)
444 {
445  const static std::string barcode_prefix = "BX:Z:";
446  if (startswith(comment, barcode_prefix)) {
447  const auto space_pos = comment.find(' ');
448  if (space_pos != std::string::npos) {
449  return comment.substr(barcode_prefix.size(),
450  space_pos - barcode_prefix.size());
451  }
452  return comment.substr(barcode_prefix.size());
453  }
454  const auto pound_pos = id.find('#');
455  if (pound_pos != std::string::npos) {
456  const auto slash_pos = id.find('/');
457  if (slash_pos > pound_pos) {
458  return id.substr(pound_pos + 1, slash_pos - (pound_pos + 1));
459  }
460  }
461  return "NA";
462 }
463 
464 inline void
465 Indexlr::filter_hashed_kmer(Indexlr::HashedKmer& hk,
466  bool filter_in,
467  bool filter_out,
468  const BloomFilter& filter_in_bf,
469  const BloomFilter& filter_out_bf)
470 {
471  if (filter_in && filter_out) {
472  std::vector<uint64_t> tmp;
473  tmp = { hk.min_hash };
474  if (!filter_in_bf.contains(tmp) || filter_out_bf.contains(tmp)) {
475  hk.min_hash = std::numeric_limits<uint64_t>::max();
476  }
477  } else if (filter_in) {
478  if (!filter_in_bf.contains({ hk.min_hash })) {
479  hk.min_hash = std::numeric_limits<uint64_t>::max();
480  }
481  } else if (filter_out) {
482  if (filter_out_bf.contains({ hk.min_hash })) {
483  hk.min_hash = std::numeric_limits<uint64_t>::max();
484  }
485  }
486 }
487 
488 inline void
489 Indexlr::filter_kmer_qual(Indexlr::HashedKmer& hk,
490  const std::string& kmer_qual,
491  size_t q)
492 {
493  if (calc_kmer_quality(kmer_qual) < q) {
494  hk.min_hash = std::numeric_limits<uint64_t>::max();
495  }
496 }
497 
498 inline size_t
499 Indexlr::calc_kmer_quality(const std::string& qual)
500 {
501  // convert the quality scores to integers
502  std::vector<int> qual_ints;
503  const int thirty_three = 33;
504  qual_ints.reserve(qual.size());
505  for (auto c : qual) {
506  qual_ints.push_back(c - thirty_three);
507  }
508  // calculate the mean (potential improvement: use other statistics)
509  size_t sum = 0;
510  for (auto q : qual_ints) {
511  sum += q;
512  }
513  return (sum / qual_ints.size());
514 }
515 
516 inline void
517 Indexlr::calc_minimizer(
518  const std::vector<Indexlr::HashedKmer>& hashed_kmers_buffer,
519  const Indexlr::Minimizer*& min_current,
520  const size_t idx,
521  ssize_t& min_idx_left,
522  ssize_t& min_idx_right,
523  ssize_t& min_pos_prev,
524  const size_t w,
525  std::vector<Indexlr::Minimizer>& minimizers)
526 {
527  min_idx_left = ssize_t(idx + 1 - w);
528  min_idx_right = ssize_t(idx + 1);
529  const auto& min_left =
530  hashed_kmers_buffer[min_idx_left % hashed_kmers_buffer.size()];
531  const auto& min_right =
532  hashed_kmers_buffer[(min_idx_right - 1) % hashed_kmers_buffer.size()];
533 
534  if (min_current == nullptr || min_current->pos < min_left.pos) {
535  min_current = &min_left;
536  // Use of operator '<=' returns the minimum that is furthest from left.
537  for (ssize_t i = min_idx_left; i < min_idx_right; i++) {
538  const auto& min_i = hashed_kmers_buffer[i % hashed_kmers_buffer.size()];
539  if (min_i.min_hash <= min_current->min_hash) {
540  min_current = &min_i;
541  }
542  }
543  } else if (min_right.min_hash <= min_current->min_hash) {
544  min_current = &min_right;
545  }
546  if (ssize_t(min_current->pos) > min_pos_prev &&
547  min_current->min_hash != std::numeric_limits<uint64_t>::max()) {
548  min_pos_prev = ssize_t(min_current->pos);
549  minimizers.push_back(*min_current);
550  }
551 }
552 
553 inline std::vector<Indexlr::Minimizer>
554 Indexlr::minimize(const std::string& seq, const std::string& qual) const
555 {
556  if ((k > seq.size()) || (w > seq.size() - k + 1)) {
557  return {};
558  }
559  std::vector<Minimizer> minimizers;
560  minimizers.reserve(2 * (seq.size() - k + 1) / w);
561  std::vector<HashedKmer> hashed_kmers_buffer(w + 1);
562  ssize_t min_idx_left, min_idx_right, min_pos_prev = -1;
563  const Minimizer* min_current = nullptr;
564  size_t idx = 0;
565  for (NtHash nh(seq, 2, k); nh.roll(); ++idx) {
566  auto& hk = hashed_kmers_buffer[idx % hashed_kmers_buffer.size()];
567 
568  hk = HashedKmer(nh.hashes()[0],
569  nh.hashes()[1],
570  nh.get_pos(),
571  nh.get_forward_hash() <= nh.get_reverse_hash(),
572  output_seq() ? seq.substr(nh.get_pos(), k) : "",
573  output_qual() ? qual.substr(nh.get_pos(), k) : "");
574 
575  filter_hashed_kmer(
576  hk, filter_in(), filter_out(), filter_in_bf.get(), filter_out_bf.get());
577 
578  if (q > 0) {
579  filter_kmer_qual(hk, qual.substr(nh.get_pos(), k), q);
580  }
581 
582  if (idx + 1 >= w) {
583  calc_minimizer(hashed_kmers_buffer,
584  min_current,
585  idx,
586  min_idx_left,
587  min_idx_right,
588  min_pos_prev,
589  w,
590  minimizers);
591  }
592  }
593  return minimizers;
594 }
595 
596 inline Indexlr::Record
597 Indexlr::read()
598 {
599  if (ready_blocks_owners()[id % MAX_SIMULTANEOUS_INDEXLRS] != id) {
600  ready_blocks_array()[id % MAX_SIMULTANEOUS_INDEXLRS] =
601  std::unique_ptr< // NOLINT(modernize-make-unique)
602  decltype(output_queue)::Block>(
603  new decltype(output_queue)::Block(reader.get_block_size()));
604  ready_blocks_owners()[id % MAX_SIMULTANEOUS_INDEXLRS] = id;
605  ready_blocks_current()[id % MAX_SIMULTANEOUS_INDEXLRS] = 0;
606  }
607  auto& block = *(ready_blocks_array()[id % MAX_SIMULTANEOUS_INDEXLRS]);
608  auto& current = ready_blocks_current()[id % MAX_SIMULTANEOUS_INDEXLRS];
609  if (current >= block.count) {
610  block.count = 0;
611  output_queue.read(block);
612  if (block.count == 0) {
613  output_queue.close();
614  block = decltype(output_queue)::Block(reader.get_block_size());
615  return Record();
616  }
617  current = 0;
618  }
619  return std::move(block.data[current++]);
620 }
621 
622 inline void
623 Indexlr::Worker::work()
624 {
625  decltype(indexlr.output_queue)::Block output_block(
626  indexlr.reader.get_block_size());
627  uint64_t last_block_num = 0;
628  bool last_block_num_valid = false;
629  for (;;) {
630  auto input_block = indexlr.reader.read_block();
631  if (input_block.count == 0) {
632  break;
633  }
634 
635  output_block.num = input_block.num;
636  for (size_t idx = 0; idx < input_block.count; idx++) {
637  Record record;
638  auto& reader_record = input_block.data[idx];
639  record.num = reader_record.num;
640  if (indexlr.output_bx()) {
641  record.barcode =
642  indexlr.extract_barcode(reader_record.id, reader_record.comment);
643  }
644  if (indexlr.output_id()) {
645  record.id = std::move(reader_record.id);
646  }
647 
648  record.readlen = reader_record.seq.size();
649 
650  check_info(indexlr.verbose && indexlr.k > record.readlen,
651  "Indexlr: skipped seq " + std::to_string(record.num) +
652  " on line " +
653  std::to_string(record.num * (indexlr.reader.get_format() ==
654  SeqReader::Format::FASTA
655  ? 2
656  : 4) +
657  2) +
658  "; k (" + std::to_string(indexlr.k) + ") > seq length (" +
659  std::to_string(record.readlen) + ")");
660 
661  check_info(indexlr.verbose && indexlr.w > record.readlen - indexlr.k + 1,
662  "Indexlr: skipped seq " + std::to_string(record.num) +
663  " on line " +
664  std::to_string(record.num * (indexlr.reader.get_format() ==
665  SeqReader::Format::FASTA
666  ? 2
667  : 4) +
668  2) +
669  "; w (" + std::to_string(indexlr.w) + ") > # of hashes (" +
670  std::to_string(record.readlen - indexlr.k + 1) + ")");
671 
672  if (indexlr.k <= record.readlen &&
673  indexlr.w <= record.readlen - indexlr.k + 1) {
674  record.minimizers =
675  indexlr.minimize(reader_record.seq, reader_record.qual);
676  } else {
677  record.minimizers = {};
678  }
679 
680  output_block.data[output_block.count++] = std::move(record);
681  }
682  if (output_block.count > 0) {
683  last_block_num = output_block.num;
684  last_block_num_valid = true;
685  indexlr.output_queue.write(output_block);
686  output_block.count = 0;
687  }
688  }
689  if (last_block_num_valid) {
690  std::unique_lock<std::mutex> lock(indexlr.last_block_num_mutex);
691  indexlr.last_block_num = std::max(indexlr.last_block_num, last_block_num);
692  indexlr.last_block_num_valid = true;
693  lock.unlock();
694  }
695  indexlr.end_barrier.wait();
696  if (last_block_num_valid && indexlr.last_block_num_valid &&
697  last_block_num == indexlr.last_block_num) {
698  output_block.num = last_block_num + 1;
699  indexlr.output_queue.write(output_block);
700  } else if (!indexlr.last_block_num_valid && id == 0) {
701  output_block.num = 0;
702  indexlr.output_queue.write(output_block);
703  }
704 }
705 
706 } // namespace btllib
707 
708 #endif
static const unsigned SHORT_MODE
Definition: indexlr.hpp:50
static const unsigned NO_ID
Definition: indexlr.hpp:36
Indexlr(std::string seqfile, size_t k, size_t w, unsigned flags=0, unsigned threads=5, bool verbose=false, const btllib::BloomFilter &bf1=Indexlr::dummy_bf(), const btllib::BloomFilter &bf2=Indexlr::dummy_bf())
Definition: indexlr.hpp:380
static const unsigned FILTER_IN
Definition: indexlr.hpp:43
Definition: indexlr.hpp:25
std::string join(const std::vector< std::string > &s, const std::string &delim)
bool startswith(std::string s, std::string prefix)
Definition: indexlr.hpp:33
RecordIterator begin()
Definition: indexlr.hpp:221
void check_error(bool condition, const std::string &msg)
void check_info(bool condition, const std::string &msg)
void log_error(const std::string &msg)
Definition: indexlr.hpp:107
static const unsigned FILTER_OUT
Definition: indexlr.hpp:48
Definition: bloom_filter.hpp:72
Definition: indexlr.hpp:66
static const unsigned QUAL
Definition: indexlr.hpp:54
static const unsigned LONG_MODE
Definition: indexlr.hpp:52
static const unsigned BX
Definition: indexlr.hpp:38
static const unsigned SEQ
Definition: indexlr.hpp:40
Record read()
Definition: indexlr.hpp:597