diff --git a/src/binlog.rs b/src/binlog.rs index c80320f..d846f62 100644 --- a/src/binlog.rs +++ b/src/binlog.rs @@ -20,6 +20,7 @@ struct DecimalVal { impl DecimalVal { fn decode(input: &[u8], precision: u8, scale: u8) -> IResult<&[u8], Self> { + //info!("precision:{}, scale:{}", precision, scale); let integral = precision - scale; let uncomp_intg = integral / DIG_PER_DEC as u8; let uncomp_frac = scale / DIG_PER_DEC as u8; @@ -28,14 +29,18 @@ impl DecimalVal { let comp_frac_bytes = COMPRESSED_BYTES[comp_frac as usize]; let comp_intg_bytes = COMPRESSED_BYTES[comp_intg as usize]; - + //info!("comp_frac_bytes:{} comp_intg_bytes:{}", comp_frac_bytes, comp_intg_bytes); let total_bytes = 4 * uncomp_intg + 4 * uncomp_frac + comp_frac_bytes as u8 + comp_intg_bytes as u8; - + //info!("total bytes:{}", total_bytes); let (rest, decimal_bs) = take_bytes(input, total_bytes as usize)?; + //info!("decimal byte:{:?}", decimal_bs); let is_negative = (decimal_bs[0] & 0x80) == 0; + //info!("is_negative:{}", is_negative); + let mut bs = BytesMut::new(); - bs.resize(total_bytes as usize, 0); + //bs.resize(total_bytes as usize, 0); bs.extend_from_slice(decimal_bs); + //info!("bs:{:?}", bs); bs[0] ^= 0x80; if is_negative { @@ -48,6 +53,8 @@ impl DecimalVal { intg_str = "-".to_string(); } let mut decimal_cursor = Cursor::new(bs.as_bytes()); + //info!("decimal_cursor:{:?}", bs.as_bytes()); + let mut is_intg_empty = true; // compressed integral if comp_intg_bytes > 0 { @@ -81,7 +88,7 @@ impl DecimalVal { let value = decimal_cursor.read_u32::().unwrap(); frac_str += format!("{value:0size$}", value = value, size = DIG_PER_DEC).as_str(); } - + //info!("intg_str:{}, frac_str:{}", &intg_str, &frac_str); // compressed fractional if comp_frac_bytes > 0 { let value = decimal_cursor.read_uint::(comp_frac_bytes).unwrap(); diff --git a/src/executor.rs b/src/executor.rs index 82076a9..61a8eb7 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -596,6 +596,7 @@ fn worker_body(thread_id: usize, rx: Receiver, mapping: &mut TableMet } let mut message = DmlMessage::from_dml(current_data, &mut meta); let json_str = message.format_json(&mut meta); + info!("will sent:\n{}", &json_str); if ports.len() > 0 { for (mq_name, topic) in ports { let msg_qu = QueueMessage { topic, payload: json_str.clone(), pos };