Skip to content

Commit

Permalink
修复decimal数据处理的bug
Browse files Browse the repository at this point in the history
  • Loading branch information
ipconfiger committed Mar 14, 2024
1 parent d1d4e60 commit 4acf0bf
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/binlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ struct DecimalVal {

impl DecimalVal {
fn decode(input: &[u8], precision: u8, scale: u8) -> IResult<&[u8], Self> {
//info!("precision:{}, scale:{}", precision, scale);
let integral = precision - scale;
let uncomp_intg = integral / DIG_PER_DEC as u8;
let uncomp_frac = scale / DIG_PER_DEC as u8;
Expand All @@ -28,14 +29,18 @@ impl DecimalVal {

let comp_frac_bytes = COMPRESSED_BYTES[comp_frac as usize];
let comp_intg_bytes = COMPRESSED_BYTES[comp_intg as usize];

//info!("comp_frac_bytes:{} comp_intg_bytes:{}", comp_frac_bytes, comp_intg_bytes);
let total_bytes = 4 * uncomp_intg + 4 * uncomp_frac + comp_frac_bytes as u8 + comp_intg_bytes as u8;

//info!("total bytes:{}", total_bytes);
let (rest, decimal_bs) = take_bytes(input, total_bytes as usize)?;
//info!("decimal byte:{:?}", decimal_bs);
let is_negative = (decimal_bs[0] & 0x80) == 0;
//info!("is_negative:{}", is_negative);

let mut bs = BytesMut::new();
bs.resize(total_bytes as usize, 0);
//bs.resize(total_bytes as usize, 0);
bs.extend_from_slice(decimal_bs);
//info!("bs:{:?}", bs);

bs[0] ^= 0x80;
if is_negative {
Expand All @@ -48,6 +53,8 @@ impl DecimalVal {
intg_str = "-".to_string();
}
let mut decimal_cursor = Cursor::new(bs.as_bytes());
//info!("decimal_cursor:{:?}", bs.as_bytes());

let mut is_intg_empty = true;
// compressed integral
if comp_intg_bytes > 0 {
Expand Down Expand Up @@ -81,7 +88,7 @@ impl DecimalVal {
let value = decimal_cursor.read_u32::<BigEndian>().unwrap();
frac_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str();
}

//info!("intg_str:{}, frac_str:{}", &intg_str, &frac_str);
// compressed fractional
if comp_frac_bytes > 0 {
let value = decimal_cursor.read_uint::<BigEndian>(comp_frac_bytes).unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ fn worker_body(thread_id: usize, rx: Receiver<RowEvents>, mapping: &mut TableMet
}
let mut message = DmlMessage::from_dml(current_data, &mut meta);
let json_str = message.format_json(&mut meta);
info!("will sent:\n{}", &json_str);
if ports.len() > 0 {
for (mq_name, topic) in ports {
let msg_qu = QueueMessage { topic, payload: json_str.clone(), pos };
Expand Down

0 comments on commit 4acf0bf

Please sign in to comment.