forked from quickwit-oss/tantivy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
segment_postings.rs
306 lines (279 loc) · 10.6 KB
/
segment_postings.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
use common::HasLen;
use crate::docset::DocSet;
use crate::fastfield::AliveBitSet;
use crate::positions::PositionReader;
use crate::postings::compression::COMPRESSION_BLOCK_SIZE;
use crate::postings::{branchless_binary_search, BlockSegmentPostings, Postings};
use crate::{DocId, TERMINATED};
/// `SegmentPostings` represents the inverted list or postings associated with
/// a term in a `Segment`.
///
/// It iterates document by document on top of a block-level cursor
/// (`BlockSegmentPostings`), keeping track of the current slot inside the
/// current block.
///
/// Term frequencies are read from the current block. Positions are only
/// available when a position reader was supplied, and they are decoded on
/// demand by `append_positions_with_offset`.
#[derive(Clone)]
pub struct SegmentPostings {
/// Block-level cursor over the postings list.
pub(crate) block_cursor: BlockSegmentPostings,
/// Cursor within the current block.
cur: usize,
/// Reader used to decode positions; `None` when no position reader was
/// provided at construction time.
position_reader: Option<PositionReader>,
}
impl SegmentPostings {
    /// Builds a `SegmentPostings` that wraps an empty block postings object
    /// and has no position reader.
    pub fn empty() -> Self {
        Self::from_block_postings(BlockSegmentPostings::empty(), None)
    }

    /// Counts the documents of this posting list that are still alive
    /// according to `alive_bitset`.
    ///
    /// The posting list is cloned and the clone is scanned from the current
    /// document until `TERMINATED`, which makes this a rather expensive
    /// operation. `self` is left untouched.
    pub fn doc_freq_given_deletes(&self, alive_bitset: &AliveBitSet) -> u32 {
        let mut cursor = self.clone();
        let mut num_alive = 0u32;
        let mut doc = cursor.doc();
        while doc != TERMINATED {
            if alive_bitset.is_alive(doc) {
                num_alive += 1;
            }
            // `advance()` returns the document the cursor now points to.
            doc = cursor.advance();
        }
        num_alive
    }

    /// Returns the overall number of documents in the block postings.
    ///
    /// Deleted documents are not taken into account: they are counted too.
    pub fn doc_freq(&self) -> u32 {
        self.block_cursor.doc_freq()
    }

    /// Creates a segment postings object holding the given documents,
    /// without any frequency encoded.
    ///
    /// This method is mostly useful for unit tests.
    ///
    /// The doc ids are serialized in memory with tantivy's postings
    /// serializer (every document is written with a term frequency of 1),
    /// and the returned `SegmentPostings` reads back from that buffer.
    #[cfg(test)]
    pub fn create_from_docs(docs: &[u32]) -> SegmentPostings {
        use crate::directory::FileSlice;
        use crate::postings::serializer::PostingsSerializer;
        use crate::schema::IndexRecordOption;

        let num_docs = docs.len() as u32;
        let mut buffer = Vec::new();
        {
            let mut serializer =
                PostingsSerializer::new(&mut buffer, 0.0, IndexRecordOption::Basic, None);
            serializer.new_term(num_docs, false);
            for &doc_id in docs {
                serializer.write_doc(doc_id, 1u32);
            }
            serializer
                .close_term(num_docs)
                .expect("In memory Serialization should never fail.");
        }
        let block_postings = BlockSegmentPostings::open(
            num_docs,
            FileSlice::from(buffer),
            IndexRecordOption::Basic,
            IndexRecordOption::Basic,
        )
        .unwrap();
        Self::from_block_postings(block_postings, None)
    }

    /// Test helper creating a `SegmentPostings` from `(doc, term_freq)` pairs.
    ///
    /// The pairs are serialized in memory with frequencies enabled. When
    /// `fieldnorms` is provided, it is used to build the fieldnorm reader
    /// handed to the serializer and to compute the average fieldnorm; the
    /// average is `0.0` when `fieldnorms` is `None` or empty.
    #[cfg(test)]
    pub fn create_from_docs_and_tfs(
        doc_and_tfs: &[(u32, u32)],
        fieldnorms: Option<&[u32]>,
    ) -> SegmentPostings {
        use crate::directory::FileSlice;
        use crate::fieldnorm::FieldNormReader;
        use crate::postings::serializer::PostingsSerializer;
        use crate::schema::IndexRecordOption;
        use crate::Score;

        let num_docs = doc_and_tfs.len() as u32;
        let fieldnorm_reader = fieldnorms.map(FieldNormReader::for_test);
        let average_field_norm: Score = match fieldnorms {
            Some(norms) if !norms.is_empty() => {
                let total_num_tokens: u64 = norms.iter().map(|&norm| norm as u64).sum();
                total_num_tokens as Score / norms.len() as Score
            }
            _ => 0.0,
        };

        let mut buffer: Vec<u8> = Vec::new();
        let mut serializer = PostingsSerializer::new(
            &mut buffer,
            average_field_norm,
            IndexRecordOption::WithFreqs,
            fieldnorm_reader,
        );
        serializer.new_term(num_docs, true);
        for &(doc_id, term_freq) in doc_and_tfs {
            serializer.write_doc(doc_id, term_freq);
        }
        serializer.close_term(num_docs).unwrap();

        let block_postings = BlockSegmentPostings::open(
            num_docs,
            FileSlice::from(buffer),
            IndexRecordOption::WithFreqs,
            IndexRecordOption::WithFreqs,
        )
        .unwrap();
        Self::from_block_postings(block_postings, None)
    }

    /// Wraps a block-level postings cursor into a `SegmentPostings`.
    ///
    /// * `segment_block_postings` - block cursor over the posting list.
    /// * `position_reader` - optional reader used to decode positions;
    ///   without it, no positions are ever appended by
    ///   `append_positions_with_offset`.
    ///
    /// The in-block cursor starts on the first slot of the block.
    pub(crate) fn from_block_postings(
        segment_block_postings: BlockSegmentPostings,
        position_reader: Option<PositionReader>,
    ) -> SegmentPostings {
        Self {
            block_cursor: segment_block_postings,
            // Cursor within the block.
            cur: 0,
            position_reader,
        }
    }
}
impl DocSet for SegmentPostings {
    /// Moves the cursor to the next document and returns its `DocId`.
    ///
    /// When the cursor sits on the last slot of a block, the block cursor is
    /// advanced and the in-block cursor goes back to the first slot.
    #[inline]
    fn advance(&mut self) -> DocId {
        debug_assert!(self.block_cursor.block_is_loaded());
        if self.cur != COMPRESSION_BLOCK_SIZE - 1 {
            self.cur += 1;
        } else {
            self.cur = 0;
            self.block_cursor.advance();
        }
        self.doc()
    }

    /// Positions the cursor on the first document that is greater than or
    /// equal to `target`, and returns it.
    ///
    /// The current document is expected to be lower than or equal to
    /// `target` (checked in debug builds only).
    fn seek(&mut self, target: DocId) -> DocId {
        let current = self.doc();
        debug_assert!(current <= target);
        if current >= target {
            return current;
        }

        self.block_cursor.seek(target);

        // We are now on the block that might contain our document.
        let block = self.block_cursor.full_block();
        let slot = branchless_binary_search(block, target);
        self.cur = slot;

        // The last block is not full and is padded with the value `TERMINATED`,
        // so we are guaranteed to find at least one doc in the block (a real
        // one or the padding) that is greater than or equal to the target.
        debug_assert!(slot < COMPRESSION_BLOCK_SIZE);

        // `found` is the first element >= `target`.
        // If all docs are smaller than the target, the current block should be
        // incomplete and padded with the value `TERMINATED`; the cursor then
        // points to the first `TERMINATED` value.
        let found = block[slot];
        debug_assert!(found >= target);
        debug_assert_eq!(found, self.doc());
        found
    }

    /// Returns the `DocId` of the current document.
    #[inline]
    fn doc(&self) -> DocId {
        self.block_cursor.doc(self.cur)
    }

    /// Returns the document frequency of the underlying block postings.
    fn size_hint(&self) -> u32 {
        self.block_cursor.doc_freq()
    }
}
impl HasLen for SegmentPostings {
    /// Returns the document frequency of the underlying block postings,
    /// as a `usize`.
    fn len(&self) -> usize {
        let doc_freq: u32 = self.block_cursor.doc_freq();
        doc_freq as usize
    }
}
impl Postings for SegmentPostings {
    /// Returns the frequency associated with the current document.
    ///
    /// If the schema is set up so that no frequencies have been encoded,
    /// this method should always return 1.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if the in-block cursor is not strictly lower
    /// than `COMPRESSION_BLOCK_SIZE`.
    fn term_freq(&self) -> u32 {
        debug_assert!(
            // Here we do not use the len of `freqs()`
            // because it is actually ok to request the freq of a doc
            // even if no frequencies were encoded for the field.
            //
            // In that case we hit the block just as if the frequency had been
            // decoded. The block is simply prefilled with the value 1.
            self.cur < COMPRESSION_BLOCK_SIZE,
            "Have you forgotten to call `.advance()` at least once before calling `.term_freq()`."
        );
        self.block_cursor.freq(self.cur)
    }

    /// Appends the positions of the current document to `output`, each of
    /// them shifted by `offset`.
    ///
    /// Existing elements of `output` are left untouched. When this postings
    /// object has no position reader, `output` is not modified at all.
    fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
        let term_freq = self.term_freq();
        let prev_len = output.len();
        if let Some(position_reader) = self.position_reader.as_mut() {
            debug_assert!(
                !self.block_cursor.freqs().is_empty(),
                "No positions available"
            );
            // Number of positions belonging to the documents that precede the
            // current one within the block. The sum is accumulated directly in
            // `u64`: summing in `u32` first could overflow (panic in debug
            // builds, silent wrap-around and a wrong offset in release builds).
            let num_preceding_positions: u64 = self.block_cursor.freqs()[..self.cur]
                .iter()
                .map(|&freq| u64::from(freq))
                .sum();
            let read_offset = self.block_cursor.position_offset() + num_preceding_positions;
            // TODO: instead of zeroing the output, we could use MaybeUninit or similar.
            output.resize(prev_len + term_freq as usize, 0u32);
            position_reader.read(read_offset, &mut output[prev_len..]);
            // The values read are deltas: turn them into absolute positions
            // with a running sum starting at `offset`.
            let mut cum = offset;
            for output_mut in output[prev_len..].iter_mut() {
                cum += *output_mut;
                *output_mut = cum;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use common::HasLen;

    use super::SegmentPostings;
    use crate::docset::{DocSet, TERMINATED};
    use crate::fastfield::AliveBitSet;
    use crate::postings::postings::Postings;

    /// Advancing an empty postings object keeps returning `TERMINATED`,
    /// and its length is 0.
    #[test]
    fn test_empty_segment_postings() {
        let mut postings = SegmentPostings::empty();
        assert_eq!(postings.advance(), TERMINATED);
        assert_eq!(postings.advance(), TERMINATED);
        assert_eq!(postings.len(), 0);
    }

    /// `doc()` on an empty postings object is `TERMINATED`, even before the
    /// first call to `advance()`.
    #[test]
    fn test_empty_postings_doc_returns_terminated() {
        let mut postings = SegmentPostings::empty();
        assert_eq!(postings.doc(), TERMINATED);
        assert_eq!(postings.advance(), TERMINATED);
    }

    /// An empty postings object reports a term frequency of 1.
    #[test]
    fn test_empty_postings_doc_term_freq_returns_1() {
        let postings = SegmentPostings::empty();
        assert_eq!(postings.term_freq(), 1);
    }

    /// `doc_freq` ignores deletes, while `doc_freq_given_deletes` only counts
    /// the documents that are still alive.
    #[test]
    fn test_doc_freq() {
        let docs = SegmentPostings::create_from_docs(&[0, 2, 10]);
        assert_eq!(docs.doc_freq(), 3);
        let alive_bitset = AliveBitSet::for_test_from_deleted_docs(&[2], 12);
        assert_eq!(docs.doc_freq_given_deletes(&alive_bitset), 2);
        let all_deleted =
            AliveBitSet::for_test_from_deleted_docs(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 12);
        assert_eq!(docs.doc_freq_given_deletes(&all_deleted), 0);
    }
}