diff --git a/kmc_tools/fastq_reader.cpp b/kmc_tools/fastq_reader.cpp index 17e7063..67a105e 100644 --- a/kmc_tools/fastq_reader.cpp +++ b/kmc_tools/fastq_reader.cpp @@ -166,23 +166,17 @@ bool CFastqReader::GetPart(uchar *&_part, uint64 &_size) // Read data if(mode == m_plain) - readed = fread(part+part_filled, 1, part_size, in); + readed = fread(part+part_filled, 1, part_size - part_filled, in); else if(mode == m_gzip) - readed = gzread(in_gzip, part+part_filled, (int) part_size); + readed = gzread(in_gzip, part+part_filled, (int) (part_size - part_filled)); else if(mode == m_bzip2) - readed = BZ2_bzRead(&bzerror, in_bzip2, part+part_filled, (int) part_size); + readed = BZ2_bzRead(&bzerror, in_bzip2, part+part_filled, (int) (part_size - part_filled)); else readed = 0; // Never should be here int64 total_filled = part_filled + readed; int64 i; - if(part_filled >= OVERHEAD_SIZE) - { - cerr << "Error: Wrong input file!\n"; - exit(1); - } - if(IsEof()) { _part = part; @@ -196,70 +190,83 @@ bool CFastqReader::GetPart(uchar *&_part, uint64 &_size) if(file_type == CFilteringParams::file_type::fasta) // FASTA files { // Looking for a FASTA record at the end of the area - int64 line_start[3]; - int32 j; - - i = total_filled - OVERHEAD_SIZE / 2; - for(j = 0; j < 3; ++j) + i = total_filled - 1; + int64 start, end; + int64 line_start[4], line_end[4]; + int readed_lines = 0; + bool success = false; + int k; + while (i >= 0 && readed_lines < 4) { - if(!SkipNextEOL(part, i, total_filled)) - break; - line_start[j] = i; - } + GetFullLineFromEnd(start, end, part, i); - _part = part; - if(j < 3) - _size = 0; - else - { - int k; - for(k = 0; k < 2; ++k) - if(part[line_start[k]+0] == '>') + line_start[4 - readed_lines - 1] = start; + line_end[4 - readed_lines - 1] = end; + ++readed_lines; + + if (readed_lines >= 2) + { + k = 4 - readed_lines; + if (part[line_start[k]] == '>') + { + success = true; break; - if(k == 2) - _size = 0; - else - _size = line_start[k]; - } - } - else // FASTQ file - { + } + } + } // Looking for a FASTQ record at the end of the area - int64 line_start[9]; - int32 j; - - i = total_filled - OVERHEAD_SIZE / 2; - for(j = 0; j < 9; ++j) + if (!success) { - if(!SkipNextEOL(part, i, total_filled)) - break; - line_start[j] = i; + cerr << "Error: Wrong input file!\n"; + exit(1); } _part = part; - if(j < 9) - _size = 0; - else + _size = line_end[k + 1]; + } + else + { + i = total_filled - 1; + int64 start, end; + int64 line_start[8], line_end[8]; + int readed_lines = 0; + bool success = false; + int k; + while (i >= 0 && readed_lines < 8) { - int k; - for(k = 0; k < 4; ++k) + GetFullLineFromEnd(start, end, part, i); + line_start[8 - readed_lines - 1] = start; + line_end[8 - readed_lines - 1] = end; + ++readed_lines; + + if (readed_lines >= 4) { - if(part[line_start[k]+0] == '@' && part[line_start[k+2]+0] == '+') + k = 8 - readed_lines; + if (part[line_start[k]] == '@' && part[line_start[k + 2]] == '+') { - if(part[line_start[k+2]+1] == '\n' || part[line_start[k+2]+1] == '\r') + if (part[line_start[k + 2] + 1] == '\n' || part[line_start[k + 2] + 1] == '\r') + { + success = true; break; - if(line_start[k+1]-line_start[k] == line_start[k+3]-line_start[k+2] && - memcmp(part+line_start[k]+1, part+line_start[k+2]+1, line_start[k+3]-line_start[k+2]-1) == 0) + } + if (line_start[k + 1] - line_start[k] == line_start[k + 3] - line_start[k + 2] && + memcmp(part + line_start[k] + 1, part + line_start[k + 2] + 1, line_start[k + 3] - line_start[k + 2] - 1) == 0) + { + success = true; break; + } } } + } - if(k == 4) - _size = 0; - else - _size = line_start[k]; + if (!success) + { + cerr << "Error: Wrong input file!\n"; + exit(1); } + _part = part; + _size = line_end[k + 3]; } // Allocate new memory for the buffer @@ -287,6 +294,17 @@ bool CFastqReader::SkipNextEOL(uchar *part, int64 &pos, int64 max_pos) return true; } +void CFastqReader::GetFullLineFromEnd(int64& line_sart, int64& line_end, uchar* buff, int64& pos) +{ + while (pos >= 0 && buff[pos] != '\n' && buff[pos] != '\r') + --pos; + line_end = pos + 1; + while (pos >= 0 && (buff[pos] == '\n' || buff[pos] == '\r')) + --pos; + while (pos >= 0 && buff[pos] != '\n' && buff[pos] != '\r') + --pos; + line_sart = pos + 1; +}; //---------------------------------------------------------------------------------- // Check whether there is an EOF bool CFastqReader::IsEof() diff --git a/kmc_tools/fastq_reader.h b/kmc_tools/fastq_reader.h index 5249e1a..a323ae4 100644 --- a/kmc_tools/fastq_reader.h +++ b/kmc_tools/fastq_reader.h @@ -53,6 +53,9 @@ class CFastqReader { bool SkipNextEOL(uchar *part, int64 &pos, int64 max_pos); + void GetFullLineFromEnd(int64& line_sart, int64& line_end, uchar* buff, int64& pos); + + bool IsEof(); public: diff --git a/kmer_counter/bam_utils.h b/kmer_counter/bam_utils.h index a2cd8d6..d9b708b 100644 --- a/kmer_counter/bam_utils.h +++ b/kmer_counter/bam_utils.h @@ -1,3 +1,13 @@ +/* +This file is a part of KMC software distributed under GNU GPL 3 licence. +The homepage of the KMC project is http://sun.aei.polsl.pl/kmc + +Authors: Sebastian Deorowicz, Agnieszka Debudaj-Grabysz, Marek Kokot + +Version: 3.0.0 +Date : 2017-01-28 +*/ + #ifndef _BAM_UTILS_H #define _BAM_UTILS_H #include "defs.h" diff --git a/kmer_counter/fastq_reader.cpp b/kmer_counter/fastq_reader.cpp index 2a4c330..17f3d01 100644 --- a/kmer_counter/fastq_reader.cpp +++ b/kmer_counter/fastq_reader.cpp @@ -2,9 +2,9 @@ /* This file is a part of KMC software distributed under GNU GPL 3 licence. The homepage of the KMC project is http://sun.aei.polsl.pl/kmc - + Authors: Sebastian Deorowicz, Agnieszka Debudaj-Grabysz, Marek Kokot - + Version: 3.0.0 Date : 2017-01-28 */ @@ -32,16 +32,16 @@ CFastqReader::CFastqReader(CMemoryMonitor *_mm, CMemoryPoolWithBamSupport *_pmm_ part_queue = _part_queue; stats_part_queue = _stats_part_queue; - mm = _mm; + mm = _mm; pmm_fastq = _pmm_fastq; pmm_binary_file_reader = _pmm_binary_file_reader; - file_type = _file_type; + file_type = _file_type; kmer_len = _kmer_len; - + // Size and pointer for the buffer part_size = 1 << 23; - part = NULL; + part = NULL; containsNextChromosome = false; @@ -52,7 +52,7 @@ CFastqReader::CFastqReader(CMemoryMonitor *_mm, CMemoryPoolWithBamSupport *_pmm_ // Destructor - close the files CFastqReader::~CFastqReader() { - if(part) + if (part) pmm_fastq->free(part); } @@ -60,7 +60,7 @@ CFastqReader::~CFastqReader() // Set part size of the buffer bool CFastqReader::SetPartSize(uint64 _part_size) { - if(_part_size < (1 << 20) || _part_size > (1 << 30)) + if (_part_size < (1 << 20) || _part_size >(1 << 30)) return false; part_size = _part_size; @@ -69,11 +69,11 @@ bool CFastqReader::SetPartSize(uint64 _part_size) } void CFastqReader::ProcessBamBinaryPart(uchar* data, uint64 size, uint32 id, uint32 file_no) -{ +{ pmm_fastq->bam_reserve_gunzip(part, id); - + part_filled = 0; - + uint64_t inpos = 0; while (inpos < size) { @@ -95,7 +95,7 @@ void CFastqReader::ProcessBamBinaryPart(uchar* data, uint64 size, uint32 id, uin pmm_fastq->free(part); part = nullptr; return; - } + } uchar* prepare_for_splitter_data; uint64 prepare_for_splitter_size; @@ -160,7 +160,7 @@ void CFastqReader::ProcessBamBinaryPart(uchar* data, uint64 size, uint32 id, uin uint64_t CRC_pos = inpos - 8; uint32_t CRC; read_uint32_t(CRC, data, CRC_pos); - + uLong crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, part + part_filled - ISIZE, ISIZE); if (crc != CRC) @@ -168,7 +168,7 @@ void CFastqReader::ProcessBamBinaryPart(uchar* data, uint64 size, uint32 id, uin cerr << "Error: BGZF CRC check error. Input file may be corrupted\n"; exit(1); } - + } if (!bam_task_manager->PushGunzippedPart(part, part_filled, id, file_no)) pmm_fastq->free(part); @@ -273,7 +273,7 @@ void CFastqReader::PreparePartForSplitter(uchar* data, uint64 size, uint32 id, u { if (part_queue) - { + { part_queue->push(data, size); } else if (stats_part_queue) @@ -312,7 +312,7 @@ void CFastqReader::PreparePartForSplitter(uchar* data, uint64 size, uint32 id, u } if (span_multiple_bgzf_blocks) - { + { pmm_fastq->reserve(state.prev_part_data); memcpy(state.prev_part_data, data + bpos, size - bpos); state.prev_part_size = size - bpos; @@ -357,17 +357,17 @@ void CFastqReader::ProcessBam() while (bam_task_manager->PopTask(taskType, data, size, id, file_no)) { if (taskType == CBamTaskManager::TaskType::Gunzip) - { + { ProcessBamBinaryPart(data, size, id, file_no); pmm_binary_file_reader->free(data); - bam_task_manager->NotifyIdFinished(id); + bam_task_manager->NotifyIdFinished(id); pmm_fastq->bam_notify_id_finished(id); } else if (taskType == CBamTaskManager::TaskType::PrepareForSplitter) { PreparePartForSplitter(data, size, id, file_no); bam_task_manager->NotifySplitterPrepareTaskDone(); - } + } else { cerr << "Error: should never be here, plase contact authors, CODE: FastqReader_" << __LINE__ << "\n"; @@ -379,57 +379,57 @@ void CFastqReader::ProcessBam() // Read a part of the file in multi line fasta format bool CFastqReader::GetPartFromMultilneFasta(uchar *&_part, uint64 &_size) { - uint64 readed = 0; + uint64 readed = 0; - if(!containsNextChromosome) - { - if (data_src.Finished()) - return false; - } + if (!containsNextChromosome) + { + if (data_src.Finished()) + return false; + } - readed = data_src.read(part + part_filled, part_size - part_filled); - - int64 total_filled = part_filled + readed; - int64 last_header_pos = 0; - int64 pos = 0; - for(int64 i = 0 ; i < total_filled ;++i )//find last '>' and remove EOLs - { - if(part[i] == '>') - { - int64 tmp = i; - bool next_line = SkipNextEOL(part, i, total_filled); - if (!next_line) - i = total_filled; - copy(part+tmp, part+i, part+pos); - last_header_pos = pos; - pos += i - tmp; - if (!next_line) - break; - } - if(part[i] != '\n' && part[i] != '\r') - { - part[pos++] = part[i]; - } - } + readed = data_src.read(part + part_filled, part_size - part_filled); - _part = part; - if(last_header_pos == 0)//data in block belong to one seq - { - part_filled = kmer_len - 1; - _size = pos; - pmm_fastq->reserve(part); - copy(_part+_size-part_filled, _part+_size, part); - containsNextChromosome = false; + int64 total_filled = part_filled + readed; + int64 last_header_pos = 0; + int64 pos = 0; + for (int64 i = 0; i < total_filled; ++i)//find last '>' and remove EOLs + { + if (part[i] == '>') + { + int64 tmp = i; + bool next_line = SkipNextEOL(part, i, total_filled); + if (!next_line) + i = total_filled; + copy(part + tmp, part + i, part + pos); + last_header_pos = pos; + pos += i - tmp; + if (!next_line) + break; } - else//next seq starts at last_header_pos + if (part[i] != '\n' && part[i] != '\r') { - _size = last_header_pos; - part_filled = pos - last_header_pos; - pmm_fastq->reserve(part); - copy(_part + last_header_pos, _part + pos, part); - containsNextChromosome = true; + part[pos++] = part[i]; } - return true; + } + + _part = part; + if (last_header_pos == 0)//data in block belong to one seq + { + part_filled = kmer_len - 1; + _size = pos; + pmm_fastq->reserve(part); + copy(_part + _size - part_filled, _part + _size, part); + containsNextChromosome = false; + } + else//next seq starts at last_header_pos + { + _size = last_header_pos; + part_filled = pos - last_header_pos; + pmm_fastq->reserve(part); + copy(_part + last_header_pos, _part + pos, part); + containsNextChromosome = true; + } + return true; } bool CFastqReader::GetPartNew(uchar *&_part, uint64 &_size) @@ -442,19 +442,13 @@ bool CFastqReader::GetPartNew(uchar *&_part, uint64 &_size) uint64 readed; // Read data - readed = data_src.read(part + part_filled, part_size); + readed = data_src.read(part + part_filled, part_size - part_filled); //std::cerr.write((char*)part + part_filled, readed); int64 total_filled = part_filled + readed; int64 i; - if (part_filled >= OVERHEAD_SIZE) - { - cerr << "Error: Wrong input file!\n"; - exit(1); - } - if (data_src.Finished()) { _part = part; @@ -468,70 +462,83 @@ bool CFastqReader::GetPartNew(uchar *&_part, uint64 &_size) if (file_type == fasta) // FASTA files { // Looking for a FASTA record at the end of the area - int64 line_start[3]; - int32 j; - - i = total_filled - OVERHEAD_SIZE / 2; - for (j = 0; j < 3; ++j) + i = total_filled - 1; + int64 start, end; + int64 line_start[4], line_end[4]; + int readed_lines = 0; + bool success = false; + int k; + while (i >= 0 && readed_lines < 4) { - if (!SkipNextEOL(part, i, total_filled)) - break; - line_start[j] = i; - } + GetFullLineFromEnd(start, end, part, i); - _part = part; - if (j < 3) - _size = 0; - else - { - int k; - for (k = 0; k < 2; ++k) - if (part[line_start[k] + 0] == '>') - break; + line_start[4 - readed_lines - 1] = start; + line_end[4 - readed_lines - 1] = end; + ++readed_lines; - if (k == 2) - _size = 0; - else - _size = line_start[k]; + if (readed_lines >= 2) + { + k = 4 - readed_lines; + if (part[line_start[k]] == '>') + { + success = true; + break; + + } + } } - } - else // FASTQ file - { // Looking for a FASTQ record at the end of the area - int64 line_start[9]; - int32 j; - - i = total_filled - OVERHEAD_SIZE / 2; - for (j = 0; j < 9; ++j) + if (!success) { - if (!SkipNextEOL(part, i, total_filled)) - break; - line_start[j] = i; + cerr << "Error: Wrong input file!\n"; + exit(1); } _part = part; - if (j < 9) - _size = 0; - else - { - int k; - for (k = 0; k < 4; ++k) + _size = line_end[k + 1]; + } + else + { + i = total_filled - 1; + int64 start, end; + int64 line_start[8], line_end[8]; + int readed_lines = 0; + bool success = false; + int k; + while (i >= 0 && readed_lines < 8) + { + GetFullLineFromEnd(start, end, part, i); + line_start[8 - readed_lines - 1] = start; + line_end[8 - readed_lines - 1] = end; + ++readed_lines; + + if (readed_lines >= 4) { - if (part[line_start[k] + 0] == '@' && part[line_start[k + 2] + 0] == '+') + k = 8 - readed_lines; + if (part[line_start[k]] == '@' && part[line_start[k + 2]] == '+') { if (part[line_start[k + 2] + 1] == '\n' || part[line_start[k + 2] + 1] == '\r') + { + success = true; break; + } if (line_start[k + 1] - line_start[k] == line_start[k + 3] - line_start[k + 2] && memcmp(part + line_start[k] + 1, part + line_start[k + 2] + 1, line_start[k + 3] - line_start[k + 2] - 1) == 0) + { + success = true; break; + } } } + } - if (k == 4) - _size = 0; - else - _size = line_start[k]; + if (!success) + { + cerr << "Error: Wrong input file!\n"; + exit(1); } + _part = part; + _size = line_end[k + 3]; } // Allocate new memory for the buffer @@ -547,18 +554,29 @@ bool CFastqReader::GetPartNew(uchar *&_part, uint64 &_size) bool CFastqReader::SkipNextEOL(uchar *part, int64 &pos, int64 max_pos) { int64 i; - for(i = pos; i < max_pos-2; ++i) - if((part[i] == '\n' || part[i] == '\r') && !(part[i+1] == '\n' || part[i+1] == '\r')) + for (i = pos; i < max_pos - 2; ++i) + if ((part[i] == '\n' || part[i] == '\r') && !(part[i + 1] == '\n' || part[i + 1] == '\r')) break; - if(i >= max_pos-2) + if (i >= max_pos - 2) return false; - pos = i+1; + pos = i + 1; return true; } +void CFastqReader::GetFullLineFromEnd(int64& line_sart, int64& line_end, uchar* buff, int64& pos) +{ + while (pos >= 0 && buff[pos] != '\n' && buff[pos] != '\r') + --pos; + line_end = pos + 1; + while (pos >= 0 && (buff[pos] == '\n' || buff[pos] == '\r')) + --pos; + while (pos >= 0 && buff[pos] != '\n' && buff[pos] != '\r') + --pos; + line_sart = pos + 1; +}; @@ -569,7 +587,7 @@ void CFastqReaderDataSrc::init_stream() { switch (compression_type) { - case CompressionType::plain: + case CompressionType::plain: in_data_pos = 0; break; case CompressionType::gzip: @@ -628,7 +646,7 @@ uint64 CFastqReaderDataSrc::read(uchar* buff, uint64 size) end_reached = true; return 0; } - in_progress = true; + in_progress = true; init_stream(); } @@ -644,10 +662,10 @@ uint64 CFastqReaderDataSrc::read(uchar* buff, uint64 size) { pmm_binary_file_reader->free(in_data); in_data = nullptr; - if(!binary_pack_queue->pop(in_data, in_data_size, file_part, compression_type)) + if (!binary_pack_queue->pop(in_data, in_data_size, file_part, compression_type)) return 0; stream.avail_in = (uint32)in_data_size; - stream.next_in = in_data; + stream.next_in = in_data; } ret = inflate(&stream, Z_NO_FLUSH); @@ -657,7 +675,7 @@ uint64 CFastqReaderDataSrc::read(uchar* buff, uint64 size) case Z_NEED_DICT: ret = Z_DATA_ERROR; /* and fall through */ case Z_DATA_ERROR: - cerr << "Some error while reading gzip file\n"; + cerr << "Some error while reading gzip file\n"; exit(1); case Z_MEM_ERROR: inflateEnd(&stream); @@ -665,7 +683,7 @@ uint64 CFastqReaderDataSrc::read(uchar* buff, uint64 size) } if (ret == Z_STREAM_END) - { + { bool multistream = stream.avail_in || !binary_pack_queue->is_next_last(); if (!multistream) { @@ -685,7 +703,7 @@ uint64 CFastqReaderDataSrc::read(uchar* buff, uint64 size) { //equivalent of inflateReset /* inflateEnd(&stream); - if (inflateInit2(&stream, 31) != Z_OK) + if (inflateInit2(&stream, 31) != Z_OK) { cerr << "Error while reading gzip file\n"; exit(1); @@ -713,13 +731,13 @@ uint64 CFastqReaderDataSrc::read(uchar* buff, uint64 size) in_data = nullptr; binary_pack_queue->pop(in_data, in_data_size, file_part, compression_type); _bz_stram.avail_in = (uint32)in_data_size; - _bz_stram.next_in = (char*)in_data; + _bz_stram.next_in = (char*)in_data; } ret = BZ2_bzDecompress(&_bz_stram); if (ret == BZ_PARAM_ERROR || ret == BZ_DATA_ERROR || ret == BZ_DATA_ERROR_MAGIC || ret == BZ_MEM_ERROR) { BZ2_bzDecompressEnd(&_bz_stram); - cerr << "bz2 reading error\n"; + cerr << "bz2 reading error\n"; } if (ret == BZ_STREAM_END) { @@ -752,7 +770,7 @@ uint64 CFastqReaderDataSrc::read(uchar* buff, uint64 size) } while (_bz_stram.avail_out); return size - _bz_stram.avail_out; } - else if (compression_type == CompressionType::plain) + else if (compression_type == CompressionType::plain) { uint64 out_pos = 0; do @@ -799,10 +817,10 @@ CWFastqReader::CWFastqReader(CKMCParams &Params, CKMCQueues &Queues, CBinaryPack //input_files_queue = Queues.input_files_queue; binary_pack_queue = _binary_pack_queue; bam_task_manager = Queues.bam_task_manager; - part_size = Params.fastq_buffer_size; - part_queue = Queues.part_queue; - file_type = Params.file_type; - kmer_len = Params.p_k; + part_size = Params.fastq_buffer_size; + part_queue = Queues.part_queue; + file_type = Params.file_type; + kmer_len = Params.p_k; fqr = NULL; diff --git a/kmer_counter/fastq_reader.h b/kmer_counter/fastq_reader.h index eba6d4a..9d55889 100644 --- a/kmer_counter/fastq_reader.h +++ b/kmer_counter/fastq_reader.h @@ -103,6 +103,7 @@ class CFastqReader { bool SkipNextEOL(uchar *part, int64 &pos, int64 max_pos); + void GetFullLineFromEnd(int64& line_sart, int64& line_end, uchar* buff, int64& pos); void ProcessBamBinaryPart(uchar* data, uint64 size, uint32 id, uint32 file_no); void PreparePartForSplitter(uchar* data, uint64 size, uint32 id, uint32 file_no); diff --git a/kmer_counter/splitter.h b/kmer_counter/splitter.h index 0d78c00..a0a607d 100644 --- a/kmer_counter/splitter.h +++ b/kmer_counter/splitter.h @@ -91,7 +91,8 @@ template class CSplitter { ~CSplitter(); }; -template uint32 CSplitter::MAX_LINE_SIZE = 1 << 14; +//template uint32 CSplitter::MAX_LINE_SIZE = 1 << 14; +template uint32 CSplitter::MAX_LINE_SIZE = 1 << 16; //************************************************************************************************************