forked from quickwit-oss/tantivy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
json_postings_writer.rs
109 lines (101 loc) · 3.89 KB
/
json_postings_writer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use stacker::Addr;
use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder};
use crate::postings::{FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter};
use crate::schema::{Field, Type};
use crate::tokenizer::TokenStream;
use crate::{DocId, Term};
/// Postings writer used for JSON fields.
///
/// The `JsonPostingsWriter` is odd in that it relies on a hidden contract
/// with its caller:
///
/// - `subscribe` is called directly to index non-text tokens; it forwards to
///   the inner writer backed by a `DocIdRecorder`.
/// - `index_text` is used to index text; it forwards to the inner writer
///   backed by the recorder `Rec`.
#[derive(Default)]
pub(crate) struct JsonPostingsWriter<Rec>
where
    Rec: Recorder,
{
    /// Inner writer that receives everything indexed through `index_text`.
    str_posting_writer: SpecializedPostingsWriter<Rec>,
    /// Inner writer that receives everything indexed through `subscribe`.
    non_str_posting_writer: SpecializedPostingsWriter<DocIdRecorder>,
}
/// Type-erases a `JsonPostingsWriter` by boxing it as a `PostingsWriter`
/// trait object.
impl<Rec> From<JsonPostingsWriter<Rec>> for Box<dyn PostingsWriter>
where
    Rec: Recorder,
{
    fn from(writer: JsonPostingsWriter<Rec>) -> Self {
        Box::new(writer)
    }
}
/// `PostingsWriter` implementation that routes text and non-text JSON values
/// to two separate inner writers.
impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
    /// Records an occurrence of `term` in document `doc` at position `pos`.
    ///
    /// The call is forwarded unchanged to the non-text inner writer (the one
    /// backed by a `DocIdRecorder`).
    #[inline]
    fn subscribe(&mut self, doc: DocId, pos: u32, term: &Term, ctx: &mut IndexingContext) {
        self.non_str_posting_writer.subscribe(doc, pos, term, ctx);
    }

    /// Indexes the tokens of `token_stream` for document `doc_id`.
    ///
    /// The call is forwarded unchanged to the text inner writer (the one
    /// backed by the recorder `Rec`).
    fn index_text(
        &mut self,
        doc_id: DocId,
        token_stream: &mut dyn TokenStream,
        term_buffer: &mut Term,
        ctx: &mut IndexingContext,
        indexing_position: &mut IndexingPosition,
    ) {
        self.str_posting_writer.index_text(
            doc_id,
            token_stream,
            term_buffer,
            ctx,
            indexing_position,
        );
    }

    /// Serializes every entry of `ordered_term_addrs` through `serializer`.
    ///
    /// The actual serialization format is handled by the `PostingsSerializer`.
    ///
    /// For each entry the full term is rebuilt as
    /// `path bytes + JSON_END_OF_PATH + entry bytes`, where the path is looked
    /// up in `ordered_id_to_path` by the entry's path id. The term is then
    /// serialized with the recorder type matching the value type: `Rec` for
    /// `Type::Str`, `DocIdRecorder` for everything else.
    ///
    /// The `Field` component of each entry is ignored; the scratch term is
    /// always initialized with field id 0.
    ///
    /// # Errors
    ///
    /// Propagates any `io::Error` returned by `serialize_one_term`.
    ///
    /// # Panics
    ///
    /// Panics if an entry's path id is not a valid index into
    /// `ordered_id_to_path`.
    fn serialize(
        &self,
        ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
        ordered_id_to_path: &[&str],
        ctx: &IndexingContext,
        serializer: &mut FieldSerializer,
    ) -> io::Result<()> {
        let mut term_buffer = Term::with_capacity(48);
        let mut buffer_lender = BufferLender::default();
        term_buffer.clear_with_field_and_type(Type::Json, Field::from_field_id(0));

        // Path id of the previous entry. `None` before the first entry, so the
        // path prefix is always written on the first iteration; no `u32` value
        // is reserved as a marker, hence every path id is handled correctly.
        let mut prev_path_id: Option<u32> = None;
        // Length of the term once the path prefix has been written. It is
        // assigned on the first iteration, before it is ever read.
        let mut term_path_len = 0;

        for (_field, path_id, term, addr) in ordered_term_addrs {
            let current_path_id = path_id.path_id();
            // Only rebuild the path prefix when the path changes; consecutive
            // entries sharing a path id reuse the prefix already in the buffer.
            if prev_path_id != Some(current_path_id) {
                term_buffer.truncate_value_bytes(0);
                term_buffer.append_path(ordered_id_to_path[current_path_id as usize].as_bytes());
                term_buffer.append_bytes(&[JSON_END_OF_PATH]);
                term_path_len = term_buffer.len_bytes();
                prev_path_id = Some(current_path_id);
            }
            // Drop the previous entry's value bytes, keep the path prefix.
            term_buffer.truncate_value_bytes(term_path_len);
            term_buffer.append_bytes(term);

            // Entries whose bytes cannot be read back as a JSON value are
            // skipped without being serialized.
            if let Some(json_value) = term_buffer.value().as_json_value_bytes() {
                if json_value.typ() == Type::Str {
                    SpecializedPostingsWriter::<Rec>::serialize_one_term(
                        term_buffer.serialized_value_bytes(),
                        *addr,
                        &mut buffer_lender,
                        ctx,
                        serializer,
                    )?;
                } else {
                    SpecializedPostingsWriter::<DocIdRecorder>::serialize_one_term(
                        term_buffer.serialized_value_bytes(),
                        *addr,
                        &mut buffer_lender,
                        ctx,
                        serializer,
                    )?;
                }
            }
        }
        Ok(())
    }

    /// Returns the sum of the token counts reported by both inner writers.
    fn total_num_tokens(&self) -> u64 {
        self.str_posting_writer.total_num_tokens() + self.non_str_posting_writer.total_num_tokens()
    }
}