Skip to content
代码片段 群组 项目
未验证 提交 06a5a68a 编辑于 作者: Alex Diachenko's avatar Alex Diachenko 提交者: GitHub
浏览文件

KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (#9320)

The `org.apache.kafka.connect.data.Values#parse` method parses integers, which are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`.

That means we are losing precision for these larger integers.

For example:
`SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");`
returns:
`SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}`

Also, this method parses values that can be parsed as `FLOAT32` to `FLOAT64`.

This PR changes parsing logic, to use `FLOAT32`/`FLOAT64` for numbers that don't have fraction part(`decimal.scale()!=0`) only, and use an arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise.
Also, it updates the method to parse numbers, that can be represented as `float` to `FLOAT64`.

Added unit tests, that cover parsing `BigInteger`, `Byte`, `Short`, `Integer`, `Long`, `Float`, `Double` types.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
上级 34efc057
No related branches found
No related tags found
无相关合并请求
......@@ -943,8 +943,14 @@ public class Values {
} catch (ArithmeticException e) {
// continue
}
float fValue = decimal.floatValue();
if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY
&& decimal.scale() != 0) {
return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue);
}
double dValue = decimal.doubleValue();
if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) {
if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY
&& decimal.scale() != 0) {
return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
}
Schema schema = Decimal.schema(decimal.scale());
......
......@@ -21,6 +21,8 @@ import org.apache.kafka.connect.data.Values.Parser;
import org.apache.kafka.connect.errors.DataException;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -726,6 +728,132 @@ public class ValuesTest {
public void canConsume() {
}
@Test
public void shouldParseBigIntegerAsDecimalWithZeroScale() {
BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1"));
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Decimal.schema(0), schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof BigDecimal);
assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
value = BigInteger.valueOf(Long.MIN_VALUE).subtract(new BigInteger("1"));
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Decimal.schema(0), schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof BigDecimal);
assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
}
@Test
public void shouldParseByteAsInt8() {
Byte value = Byte.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Byte);
assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
value = Byte.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Byte);
assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
}
@Test
public void shouldParseShortAsInt16() {
Short value = Short.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Short);
assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
value = Short.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Short);
assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
}
@Test
public void shouldParseIntegerAsInt32() {
Integer value = Integer.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Integer);
assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
value = Integer.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Integer);
assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
}
@Test
public void shouldParseLongAsInt64() {
Long value = Long.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Long);
assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
value = Long.MIN_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Long);
assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
}
@Test
public void shouldParseFloatAsFloat32() {
Float value = Float.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Float);
assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
value = -Float.MAX_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Float);
assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
}
@Test
public void shouldParseDoubleAsFloat64() {
Double value = Double.MAX_VALUE;
SchemaAndValue schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Double);
assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0);
value = -Double.MAX_VALUE;
schemaAndValue = Values.parseString(
String.valueOf(value)
);
assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
assertTrue(schemaAndValue.value() instanceof Double);
assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0);
}
protected void assertParsed(String input) {
assertParsed(input, input);
}
......
0% 加载中 .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册