btllib
Loading...
Searching...
No Matches
seq_reader_sam_module.hpp
1#ifndef BTLLIB_SEQ_READER_SAM_MODULE_HPP
2#define BTLLIB_SEQ_READER_SAM_MODULE_HPP
3
#include "btllib/cstring.hpp"
#include "btllib/process_pipeline.hpp"
#include "btllib/seq.hpp"
#include "btllib/status.hpp"

#include <cstdio>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>
12
13namespace btllib {
14
16class SeqReaderSamModule
17{
18
19private:
20 friend class SeqReader;
21
22 enum class Stage
23 {
24 HEADER,
25 ALIGNMENTS
26 };
27
28 ~SeqReaderSamModule()
29 {
30 if (loader_thread) {
31 loader_thread->join();
32 }
33 }
34
35 std::unique_ptr<ProcessPipeline> samtools_process;
36 std::unique_ptr<std::thread> loader_thread;
37 CString tmp;
38 static const size_t LOADER_BLOCK_SIZE = 4096;
39
40 static bool buffer_valid(const char* buffer, size_t size);
41 template<typename ReaderType, typename RecordType>
42 bool read_buffer(ReaderType& reader, RecordType& record);
43 template<typename ReaderType, typename RecordType>
44 bool read_transition(ReaderType& reader, RecordType& record);
45 template<typename ReaderType, typename RecordType>
46 bool read_file(ReaderType& reader, RecordType& record);
47};
48
49template<typename ReaderType, typename RecordType>
50inline bool
51SeqReaderSamModule::read_buffer(ReaderType& reader, RecordType& record)
52{
53 (void)reader;
54 (void)record;
55 {
56 const ProcessPipeline version_test(
57 "samtools --version 2>/dev/stdout | head -n2");
58 char* line = nullptr;
59 size_t n = 0;
60 std::string version = "\n";
61
62 check_error(getline(&line, &n, version_test.out) < 2,
63 "Failed to get samtools version.");
64 version += line;
65
66 line = nullptr;
67 check_error(getline(&line, &n, version_test.out) < 2,
68 "Failed to get samtools version.");
69 version += line;
70 version.pop_back();
71
72 log_info(version);
73 }
74 samtools_process =
75 std::unique_ptr<ProcessPipeline>( // NOLINT(modernize-make-unique)
76 new ProcessPipeline("samtools fastq"));
77 loader_thread =
78 std::unique_ptr<std::thread>(new std::thread([this, &reader]() {
79 check_error(fwrite(reader.buffer.data.data() + reader.buffer.start,
80 1,
81 reader.buffer.end - reader.buffer.start,
82 samtools_process->in) !=
83 reader.buffer.end - reader.buffer.start,
84 "SeqReader SAM module: fwrite failed.");
85 reader.buffer.start = reader.buffer.end;
86 if (std::ferror(reader.source) == 0 && std::feof(reader.source) == 0) {
87 const auto p = std::fgetc(reader.source);
88 if (p != EOF) {
89 const auto ret = std::ungetc(p, reader.source);
90 check_error(ret == EOF, "SeqReaderSamModule: ungetc failed.");
91 while (std::ferror(reader.source) == 0 &&
92 std::feof(reader.source) == 0) {
93 const size_t bufsize = LOADER_BLOCK_SIZE;
94 char buf[bufsize];
95 size_t bytes_read = fread(buf, 1, bufsize, reader.source);
96 check_error(fwrite(buf, 1, bytes_read, samtools_process->in) !=
97 bytes_read,
98 "SeqReader SAM module: fwrite failed.");
99 }
100 }
101 }
102 samtools_process->close_in();
103 }));
104 return false;
105}
106
107template<typename ReaderType, typename RecordType>
108inline bool
109SeqReaderSamModule::read_transition(ReaderType& reader, RecordType& record)
110{
111 (void)reader;
112 (void)record;
113 return false;
114}
115
116template<typename ReaderType, typename RecordType>
117inline bool
118SeqReaderSamModule::read_file(ReaderType& reader, RecordType& record)
119{
120 if (!reader.file_at_end(samtools_process->out)) {
121 reader.readline_file(record.header, samtools_process->out);
122 reader.readline_file(record.seq, samtools_process->out);
123 reader.readline_file(tmp, samtools_process->out);
124 reader.readline_file(record.qual, samtools_process->out);
125 return true;
126 }
127 return false;
128}
130
131} // namespace btllib
132
133#endif
Definition aahash.hpp:12
void check_error(bool condition, const std::string &msg)
void log_info(const std::string &msg)