diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index 44131a01b..157d2e676 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -30,11 +30,14 @@ import java.lang.reflect.InvocationTargetException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Clob; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Struct; import java.sql.Timestamp; import java.sql.Types; import java.time.LocalDateTime; @@ -106,7 +109,7 @@ record = recordBuilder.build(); @Override protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException { - if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) { + if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) { handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); } else { setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); @@ -257,6 +260,31 @@ private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLE } } + private byte[] getBfileBytes(Object bfile) throws SQLException { + if (bfile == null) { + return null; + } + try { + ClassLoader classLoader = bfile.getClass().getClassLoader(); + Class oracleBfileClass = classLoader.loadClass("oracle.jdbc.OracleBfile"); + boolean isFileExist = (boolean) oracleBfileClass.getMethod("fileExists").invoke(bfile); + if (!isFileExist) { + return null; + } + + oracleBfileClass.getMethod("openFile").invoke(bfile); + InputStream binaryStream = (InputStream) oracleBfileClass.getMethod("getBinaryStream").invoke(bfile); + byte[] bytes = ByteStreams.toByteArray(binaryStream); + oracleBfileClass.getMethod("closeFile").invoke(bfile); + return bytes; + } catch (ClassNotFoundException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new InvalidStageException("Field is of type 'BFILE', which is not supported " + + "with this version of the JDBC driver.", e); + } catch (IOException e) { + throw new InvalidStageException("Error reading the contents of the BFILE.", e); + } + } + private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, int columnIndex, int sqlType, int precision, int scale) throws SQLException { @@ -341,6 +369,12 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil case OracleSourceSchemaReader.LONG_RAW: recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex)); break; + case Types.STRUCT: + Struct structValue = (Struct) resultSet.getObject(columnIndex); + if (structValue != null) { + recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, resultSet)); + } + break; case Types.DECIMAL: case Types.NUMERIC: // This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER. @@ -371,6 +405,36 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil } } + private StructuredRecord convertStructToRecord(Struct struct, Schema schema, ResultSet resultSet) + throws SQLException { + Object[] attributes = struct.getAttributes(); + List fields = schema.getFields(); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + + for (int index = 0; index < attributes.length; index++) { + Schema.Field field = fields.get(index); + Object attrValue = attributes[index]; + + if (attrValue == null) { + builder.set(field.getName(), null); + continue; + } + // If it is an internal nested STRUCT, recurse down + if (attrValue instanceof Struct) { + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, resultSet)); + continue; + } + + String attrClassName = attrValue.getClass().getName(); + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + + OracleStructAttributeConverters.convertValue(builder, field, fieldSchema, attrValue, attrClassName, + this::getBfileBytes); + } + return builder.build(); + } + /** * Get the scale set in Non-nullable schema associated with the schema * */ diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 208b70410..4a0dd8b7b 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -23,9 +23,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -70,6 +75,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final Boolean isTimestampOldBehavior; private final Boolean isPrecisionlessNumAsDecimal; private final Boolean isTimestampLtzFieldTimestamp; + private Connection connection; public OracleSourceSchemaReader() { this(null, false, false, false); @@ -136,11 +142,76 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti } return Schema.decimalOf(precision, scale); } + case Types.STRUCT: + if (connection == null) { + throw new SQLException("Cannot resolve STRUCT schema without a database connection. " + + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); + } + String typeName = metadata.getColumnTypeName(index); + String owner = typeName.substring(0, typeName.lastIndexOf('.')); + return getStructSchema(connection, typeName, owner); default: return super.getSchema(metadata, index); } } + @Override + public List getSchemaFields(ResultSet resultSet) throws SQLException { + this.connection = resultSet.getStatement().getConnection(); + return super.getSchemaFields(resultSet); + } + + /** + * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the + * database metadata + * for the type's attributes. + * + * @param connection the database connection + * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") + * @return a CDAP RECORD schema with fields corresponding to the STRUCT's + * attributes + */ + private Schema getStructSchema(Connection connection, String typeName, String owner) throws SQLException { + List fields = new ArrayList<>(); + String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? AND OWNER = ? ORDER BY ATTR_NO"; + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); + stmt.setString(2, owner); + + try (ResultSet attrRs = stmt.executeQuery()) { + while (attrRs.next()) { + String attrName = attrRs.getString("ATTR_NAME"); + String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); + int attrSize = attrRs.getInt("PRECISION"); + int attrScale = attrRs.getInt("SCALE"); + + Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale, attrName); + if (attrSchema != null) { + fields.add(Schema.Field.of(attrName, attrSchema)); + } else { + String nestedStructOwner = attrRs.getString("ATTR_TYPE_OWNER"); + Schema nestedSchema = getStructSchema(connection, attrTypeName, nestedStructOwner); + fields.add(Schema.Field.of(attrName, nestedSchema)); + } + } + } + } + if (fields.isEmpty()) { + throw new SQLException(String.format( + "No attributes found for Oracle STRUCT type '%s'. " + + "Ensure the type exists and is accessible.", + typeName)); + } + + return Schema.recordOf(typeName, fields); + } + + private Schema mapPrimitiveOracleType(String typeName, int precision, int scale, String columnName) { + return OracleUserTypeSchemaMapping.mapPrimitiveOracleType(isTimestampOldBehavior, getTimestampLtzSchema(), + isPrecisionlessNumAsDecimal, typeName, precision, scale, columnName); + } + private @NotNull Schema getTimestampLtzSchema() { return isTimestampOldBehavior || isTimestampLtzFieldTimestamp ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java new file mode 100644 index 000000000..11364f114 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java @@ -0,0 +1,235 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; + +import java.math.BigDecimal; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.List; + +/** + * Standalone registry of AttributeConverter strategies for converting Oracle STRUCT attributes. + */ +public final class OracleStructAttributeConverters { + + /** + * Functional interface for resolving Oracle BFILE content bytes. + */ + @FunctionalInterface + public interface BfileBytesResolver { + byte[] getBfileBytes(Object bfile) throws SQLException; + } + + /** + * Strategy interface for translating structured attributes to CDAP records. + */ + public interface AttributeConverter { + boolean canConvert(Object attrValue, String attrClassName); + void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException; + } + + private static class BigDecimalConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof BigDecimal; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + BigDecimal bigDecimal = (BigDecimal) attrValue; + if (Schema.LogicalType.DECIMAL.equals(fieldSchema.getLogicalType())) { + builder.setDecimal(field.getName(), bigDecimal.setScale(fieldSchema.getScale(), + java.math.RoundingMode.HALF_UP)); + } else if (Schema.Type.DOUBLE.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.doubleValue()); + } else if (Schema.Type.FLOAT.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.floatValue()); + } else if (Schema.Type.INT.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.intValue()); + } else if (Schema.Type.LONG.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.longValue()); + } else { + builder.set(field.getName(), bigDecimal.toString()); + } + } + } + + private static class TimestampConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Timestamp; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + Timestamp timestamp = (Timestamp) attrValue; + if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + builder.setDateTime(field.getName(), timestamp.toLocalDateTime()); + } else if (Schema.LogicalType.DATE.equals(fieldSchema.getLogicalType())) { + builder.setDate(field.getName(), timestamp.toLocalDateTime().toLocalDate()); + } else { + builder.set(field.getName(), attrValue.toString()); + } + } + } + + private static class ZonedDateTimeConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof OffsetDateTime || attrValue instanceof ZonedDateTime; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + ZonedDateTime zonedDateTime = (attrValue instanceof OffsetDateTime) + ? ((OffsetDateTime) attrValue).atZoneSameInstant(ZoneId.of("UTC")) + : ((ZonedDateTime) attrValue).withZoneSameInstant(ZoneId.of("UTC")); + if (fieldSchema.getLogicalType() != null && + (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType()) || + Schema.LogicalType.TIMESTAMP_MILLIS.equals(fieldSchema.getLogicalType()))) { + builder.setTimestamp(field.getName(), zonedDateTime); + } else if (Schema.Type.LONG.equals(fieldSchema.getType())) { + builder.set(field.getName(), zonedDateTime.toInstant().toEpochMilli()); + } else { + builder.set(field.getName(), zonedDateTime.toString()); + } + } + } + + private static class ClobConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Clob; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + Clob clob = (Clob) attrValue; + builder.set(field.getName(), clob.getSubString(1, (int) clob.length())); + } + } + + private static class BlobConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Blob; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + Blob blob = (Blob) attrValue; + builder.set(field.getName(), blob.getBytes(1, (int) blob.length())); + } + } + + private static class OracleBfileConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return "oracle.jdbc.OracleBfile".equals(attrClassName); + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), bfileResolver.getBfileBytes(attrValue)); + } + } + + private static class ByteArrayConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof byte[]; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), (byte[]) attrValue); + } + } + + private static class OracleIntervalConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return "oracle.sql.INTERVALDS".equals(attrClassName) || "oracle.sql.INTERVALYM".equals(attrClassName); + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), attrValue.toString()); + } + } + + private static class DefaultConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return true; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), attrValue); + } + } + + private static final List CONVERTERS = Arrays.asList( + new BigDecimalConverter(), + new TimestampConverter(), + new ZonedDateTimeConverter(), + new ClobConverter(), + new BlobConverter(), + new OracleBfileConverter(), + new ByteArrayConverter(), + new OracleIntervalConverter(), + new DefaultConverter() + ); + + private OracleStructAttributeConverters() { + // Private constructor to prevent instantiation. + } + + /** + * Translates an Oracle STRUCT attribute to a CDAP structured record field. + */ + public static void convertValue(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, String attrClassName, + BfileBytesResolver bfileResolver) throws SQLException { + for (AttributeConverter converter : CONVERTERS) { + if (converter.canConvert(attrValue, attrClassName)) { + converter.convert(builder, field, fieldSchema, attrValue, bfileResolver); + break; + } + } + } +} diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructTypeSchemaMapping.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructTypeSchemaMapping.java new file mode 100644 index 000000000..69d980d99 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructTypeSchemaMapping.java @@ -0,0 +1,146 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Registry containing schema type mappers for Oracle specific datatypes. + */ +public final class OracleUserTypeSchemaMapping { + private static final Logger LOG = LoggerFactory.getLogger(OracleUserTypeSchemaMapping.class); + + private interface TypeMapper { + Schema map(boolean isTimestampOldBehavior, Schema timestampLtzSchema, + boolean isPrecisionlessNumAsDecimal, String typeName, int precision, int scale, String columnName); + } + + private static final Map TYPE_MAPPERS = new HashMap<>(); + + static { + TypeMapper floatMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.FLOAT); + TYPE_MAPPERS.put("BINARY FLOAT", floatMapper); + TYPE_MAPPERS.put("REAL", floatMapper); + TYPE_MAPPERS.put("FLOAT", floatMapper); + + TypeMapper doubleMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.DOUBLE); + TYPE_MAPPERS.put("BINARY DOUBLE", doubleMapper); + TYPE_MAPPERS.put("DOUBLE", doubleMapper); + + // Bytes types + TypeMapper bytesMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.BYTES); + TYPE_MAPPERS.put("BFILE", bytesMapper); + TYPE_MAPPERS.put("BLOB", bytesMapper); + TYPE_MAPPERS.put("RAW", bytesMapper); + TYPE_MAPPERS.put("LONG RAW", bytesMapper); + + // String types + TypeMapper stringMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.STRING); + TYPE_MAPPERS.put("INTERVAL DAY TO SECOND", stringMapper); + TYPE_MAPPERS.put("INTERVAL YEAR TO MONTH", stringMapper); + TYPE_MAPPERS.put("VARCHAR2", stringMapper); + TYPE_MAPPERS.put("VARCHAR", stringMapper); + TYPE_MAPPERS.put("CHAR", stringMapper); + TYPE_MAPPERS.put("CHAR2", stringMapper); + TYPE_MAPPERS.put("CLOB", stringMapper); + TYPE_MAPPERS.put("NCLOB", stringMapper); + TYPE_MAPPERS.put("LONG", stringMapper); + + // Specific types + TYPE_MAPPERS.put("TIMESTAMP WITH TZ", (isOld, ltzS, precD, typeName, p, s, col) -> + isOld ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) + ); + TYPE_MAPPERS.put("TIMESTAMP WITH LTZ", (isOld, ltzS, precD, typeName, p, s, col) -> ltzS); + TYPE_MAPPERS.put("TIMESTAMP", (isOld, ltzS, precD, typeName, p, s, col) -> + isOld ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) : Schema.of(Schema.LogicalType.DATETIME) + ); + TYPE_MAPPERS.put("DATE", (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.LogicalType.DATE)); + TYPE_MAPPERS.put("TIME", (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.LogicalType.TIME_MICROS)); + TYPE_MAPPERS.put("INTEGER", (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.INT)); + + TYPE_MAPPERS.put("NUMBER", OracleUserTypeSchemaMapping::mapNumberOrDecimal); + TYPE_MAPPERS.put("DECIMAL", OracleUserTypeSchemaMapping::mapNumberOrDecimal); + + // Unsupported types that throw error + TYPE_MAPPERS.put("ARRAY", (isOld, ltzS, precD, typeName, p, s, col) -> { + String errorMessage = String.format("Column %s has unsupported SQL type of %s.", col, typeName); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, true, null); + }); + TYPE_MAPPERS.put("OTHER", (isOld, ltzS, precD, typeName, p, s, col) -> { + String errorMessage = String.format("Column %s has unsupported SQL type of %s.", col, typeName); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, true, null); + }); + TYPE_MAPPERS.put("XML", (isOld, ltzS, precD, typeName, p, s, col) -> { + String errorMessage = String.format("Column %s has unsupported SQL type of %s.", col, typeName); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, true, null); + }); + } + + private OracleUserTypeSchemaMapping() { + // Private constructor to prevent instantiation of utility class. + } + + /** + * Maps primitive Oracle types to CDAP Schemas. + */ + public static Schema mapPrimitiveOracleType(boolean isTimestampOldBehavior, Schema timestampLtzSchema, + boolean isPrecisionlessNumAsDecimal, String typeName, + int precision, int scale, String columnName) { + TypeMapper mapper = TYPE_MAPPERS.get(typeName); + if (mapper != null) { + return mapper.map(isTimestampOldBehavior, timestampLtzSchema, isPrecisionlessNumAsDecimal, + typeName, precision, scale, columnName); + } + return null; + } + + private static Schema mapNumberOrDecimal(boolean isOld, Schema ltzS, boolean precD, String typeName, + int precision, int scale, String columnName) { + if (Double.class.getTypeName().equals(typeName)) { + return Schema.of(Schema.Type.DOUBLE); + } else { + if (precision == 0) { + if (precD) { + int newPrecision = 38; + int newScale = 0; + LOG.warn(String.format("%s type with undefined precision and scale is detected, " + + "there may be a precision loss while running the pipeline. " + + "Please define an output precision and scale for field to avoid " + + "precision loss.", typeName)); + return Schema.decimalOf(newPrecision, newScale); + } else { + LOG.warn(String.format("%s type without precision and scale, " + + "converting into STRING type to avoid any precision loss.", + typeName)); + return Schema.of(Schema.Type.STRING); + } + } + return Schema.decimalOf(precision, scale); + } + } +} diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java index 1ff77c533..8898e8ab4 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java @@ -20,13 +20,15 @@ import io.cdap.cdap.api.data.schema.Schema; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; import java.util.List; public class OracleSchemaReaderTest { @@ -37,6 +39,12 @@ public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); @@ -68,9 +76,12 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); - + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(metadata.getColumnCount()).thenReturn(2); // -101 is for TIMESTAMP_TZ Mockito.when(metadata.getColumnType(1)).thenReturn(-101); @@ -91,4 +102,44 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName()); Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema()); } + + @Test + public void getSchemaFields_structType_returnRecord() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(); + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + PreparedStatement stmt = Mockito.mock(PreparedStatement.class); + ResultSet attrRs = Mockito.mock(ResultSet.class); + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt); + Mockito.when(stmt.executeQuery()).thenReturn(attrRs); + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT); + Mockito.when(metadata.getColumnName(1)).thenReturn("address"); + Mockito.when(metadata.getColumnTypeName(1)).thenReturn("CS_ITN.ADDRESS_TYPE"); + Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA"); + Mockito.when(attrRs.next()).thenReturn(true, true, false); + Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("STREET", "CITY"); + Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2"); + Mockito.when(attrRs.getInt("PRECISION")).thenReturn(0, 0); + Mockito.when(attrRs.getInt("SCALE")).thenReturn(0, 0); + + List actualFields = schemaReader.getSchemaFields(resultSet); + + Schema.Field addressField = actualFields.get(0); + Schema addressSchema = addressField.getSchema().isNullable() + ? addressField.getSchema().getNonNullable() : addressField.getSchema(); + List structFields = addressSchema.getFields(); + Assert.assertEquals(1, actualFields.size()); + Assert.assertEquals("address", addressField.getName()); + Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType()); + Assert.assertEquals("CS_ITN.ADDRESS_TYPE", addressSchema.getRecordName()); + Assert.assertEquals(2, structFields.size()); + Assert.assertEquals("STREET", structFields.get(0).getName()); + Assert.assertEquals("CITY", structFields.get(1).getName()); + } } diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java index 77136e841..244e84ea6 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java @@ -25,6 +25,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.math.BigDecimal; +import java.sql.Date; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Timestamp; @@ -234,4 +235,5 @@ public void validateTimestampTZTypeNullHandling() throws Exception { StructuredRecord record = builder.build(); Assert.assertNull(record.get("field1")); } + }