// Forked from quickwit-oss/tantivy: src/positions/reader.rs
use std::io;
use common::{BinarySerializable, VInt};
use crate::directory::OwnedBytes;
use crate::positions::COMPRESSION_BLOCK_SIZE;
use crate::postings::compression::{BlockDecoder, VIntDecoder};
/// When accessing the positions of a term, we get a positions_idx from the `Terminfo`.
/// This means we need to skip to the `nth` position efficiently.
///
/// Blocks are compressed using bitpacking, so `skip_read` contains the number of bits
/// (values can go from 0 to 32 bits) required to decompress every block.
///
/// A given block takes `(COMPRESSION_BLOCK_SIZE x num_bit_for_the_block / num_bits_in_a_byte)`
/// bytes, so skipping a block without decompressing it is just a matter of advancing that
/// many bytes.
#[derive(Clone)]
pub struct PositionReader {
    // One byte per bitpacked block: the bit width used to compress that block.
    bit_widths: OwnedBytes,
    // The compressed position payload: bitpacked blocks followed by a trailing
    // vint-encoded block (the remainder that does not fill a whole block).
    positions: OwnedBytes,
    // Scratch decoder holding the most recently decompressed block.
    block_decoder: BlockDecoder,
    // offset, expressed in positions, for the first position of the block currently loaded
    // block_offset is a multiple of COMPRESSION_BLOCK_SIZE.
    // (Initialized to `i64::MAX as u64` as a "no block loaded" sentinel; see `open`/`read`.)
    block_offset: u64,
    // offset, expressed in positions, for the position of the first block encoded
    // in the `self.positions` bytes, and if bitpacked, compressed using the bitwidth in
    // `self.bit_widths`.
    //
    // As we advance, anchor increases simultaneously with bit_widths and positions get consumed.
    anchor_offset: u64,
    // These are just copies used for .reset().
    original_bit_widths: OwnedBytes,
    original_positions: OwnedBytes,
}
impl PositionReader {
    /// Opens and reads the term positions encoded into the `positions_data` owned bytes.
    ///
    /// The payload layout is: a VInt count of bitpacked blocks, then one bit-width
    /// byte per bitpacked block, then the compressed position data itself.
    ///
    /// # Errors
    /// Returns an `io::Error` if the leading VInt cannot be deserialized.
    pub fn open(mut positions_data: OwnedBytes) -> io::Result<PositionReader> {
        let num_positions_bitpacked_blocks = VInt::deserialize(&mut positions_data)?.0 as usize;
        let (bit_widths, positions) = positions_data.split(num_positions_bitpacked_blocks);
        Ok(PositionReader {
            bit_widths: bit_widths.clone(),
            positions: positions.clone(),
            block_decoder: BlockDecoder::default(),
            // Sentinel meaning "no block decoded yet". Using `i64::MAX as u64` keeps the
            // signed delta computed in `read` far outside `0..COMPRESSION_BLOCK_SIZE`,
            // forcing the first call to load a block.
            block_offset: i64::MAX as u64,
            anchor_offset: 0u64,
            original_bit_widths: bit_widths,
            original_positions: positions,
        })
    }

    /// Rewinds the reader to the beginning of the stream, as if freshly opened.
    /// Called by `read` when asked for an offset before the current anchor.
    fn reset(&mut self) {
        self.positions = self.original_positions.clone();
        self.bit_widths = self.original_bit_widths.clone();
        // Restore the "no block loaded" sentinel (see `open`).
        self.block_offset = i64::MAX as u64;
        self.anchor_offset = 0u64;
    }

    /// Advance from `num_blocks` bitpacked blocks.
    ///
    /// Panics if there are not that many remaining blocks.
    fn advance_num_blocks(&mut self, num_blocks: usize) {
        // Sum the bit widths of the blocks being skipped; each block holds
        // COMPRESSION_BLOCK_SIZE values, so its byte length is width * 128 / 8.
        let num_bits: usize = self.bit_widths.as_ref()[..num_blocks]
            .iter()
            .cloned()
            .map(|num_bits| num_bits as usize)
            .sum();
        let num_bytes_to_skip = num_bits * COMPRESSION_BLOCK_SIZE / 8;
        self.bit_widths.advance(num_blocks);
        self.positions.advance(num_bytes_to_skip);
        self.anchor_offset += (num_blocks * COMPRESSION_BLOCK_SIZE) as u64;
    }

    /// Decompresses one block into `self.block_decoder`.
    ///
    /// `block_rel_id` is counted relatively to the anchor:
    /// `block_rel_id = 0` means the anchor block,
    /// `block_rel_id = i` means the ith block after the anchor block.
    fn load_block(&mut self, block_rel_id: usize) {
        let bit_widths = self.bit_widths.as_slice();
        // Byte offset of the target block within `self.positions`: the sum of the
        // byte lengths of all bitpacked blocks preceding it.
        let byte_offset: usize = bit_widths[0..block_rel_id]
            .iter()
            .map(|&b| b as usize)
            .sum::<usize>()
            * COMPRESSION_BLOCK_SIZE
            / 8;
        let compressed_data = &self.positions.as_slice()[byte_offset..];
        if bit_widths.len() > block_rel_id {
            // that block is bitpacked.
            let bit_width = bit_widths[block_rel_id];
            self.block_decoder
                .uncompress_block_unsorted(compressed_data, bit_width, false);
        } else {
            // that block is vint encoded (the trailing partial block).
            self.block_decoder
                .uncompress_vint_unsorted_until_end(compressed_data);
        }
        self.block_offset = self.anchor_offset + (block_rel_id * COMPRESSION_BLOCK_SIZE) as u64;
    }

    /// Fills a buffer with the positions `[offset..offset+output.len())` integers.
    ///
    /// This function is optimized to be called with increasing values of `offset`.
    pub fn read(&mut self, mut offset: u64, mut output: &mut [u32]) {
        if offset < self.anchor_offset {
            // Going backwards: rewind and re-skip from the start.
            self.reset();
        }
        let delta_to_block_offset = offset as i64 - self.block_offset as i64;
        // Was `(0..128)`: use the named constant so this stays in sync with the
        // block size used everywhere else in this module.
        if !(0..COMPRESSION_BLOCK_SIZE as i64).contains(&delta_to_block_offset) {
            // The first position is not within the currently loaded block.
            // (Note that it could be before or after.)
            // We need to possibly skip a few blocks, and decompress the first relevant block.
            let delta_to_anchor_offset = offset - self.anchor_offset;
            let num_blocks_to_skip =
                (delta_to_anchor_offset / (COMPRESSION_BLOCK_SIZE as u64)) as usize;
            self.advance_num_blocks(num_blocks_to_skip);
            self.load_block(0);
        } else {
            // The requested offset is within the loaded block.
            // We still need to advance anchor_offset to our current block.
            let num_blocks_to_skip =
                ((self.block_offset - self.anchor_offset) / COMPRESSION_BLOCK_SIZE as u64) as usize;
            self.advance_num_blocks(num_blocks_to_skip);
        }
        // At this point, the block containing offset is loaded, and anchor has
        // been updated to point to it as well.
        for i in 1.. {
            // we copy the part from block i - 1 that is relevant.
            let offset_in_block = (offset as usize) % COMPRESSION_BLOCK_SIZE;
            let remaining_in_block = COMPRESSION_BLOCK_SIZE - offset_in_block;
            if remaining_in_block >= output.len() {
                // The rest of the request fits in the current block: copy and stop.
                output.copy_from_slice(
                    &self.block_decoder.output_array()[offset_in_block..][..output.len()],
                );
                break;
            }
            output[..remaining_in_block]
                .copy_from_slice(&self.block_decoder.output_array()[offset_in_block..]);
            output = &mut output[remaining_in_block..];
            // we load block #i if necessary.
            offset += remaining_in_block as u64;
            self.load_block(i);
        }
    }
}