From 231c6ee101607aac17154c98a76e3381243e46a7 Mon Sep 17 00:00:00 2001 From: vanshikaagupta22 Date: Wed, 6 May 2026 09:25:07 +0000 Subject: [PATCH 1/2] Provide Struct datat type support for oracle plugin --- .../plugin/oracle/OracleSourceDBRecord.java | 50 ++++++- .../oracle/OracleSourceSchemaReader.java | 125 ++++++++++++++++++ .../plugin/oracle/OracleSchemaReaderTest.java | 63 ++++++++- .../oracle/OracleSourceDBRecordUnitTest.java | 116 ++++++++++++++++ 4 files changed, 352 insertions(+), 2 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index 44131a01b..d79d67936 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -35,6 +35,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Struct; import java.sql.Timestamp; import java.sql.Types; import java.time.LocalDateTime; @@ -106,7 +107,7 @@ record = recordBuilder.build(); @Override protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException { - if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) { + if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) { handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); } else { setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); @@ -341,6 +342,13 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil case OracleSourceSchemaReader.LONG_RAW: recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex)); break; + case Types.STRUCT: + java.sql.Struct structValue = (java.sql.Struct) resultSet.getObject(columnIndex); + if (structValue != null) { + recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, + resultSet.getStatement().getConnection())); + } + break; case Types.DECIMAL: case Types.NUMERIC: // This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER. @@ -371,6 +379,46 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil } } + private StructuredRecord convertStructToRecord(java.sql.Struct struct, Schema schema, + Connection connection) throws SQLException { + Object[] attributes = struct.getAttributes(); + List fields = schema.getFields(); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + + for (int i = 0; i < fields.size() && i < attributes.length; i++) { + Schema.Field field = fields.get(i); + Object attrValue = attributes[i]; + + if (attrValue == null) { + builder.set(field.getName(), null); + continue; + } + + Schema fieldSchema = field.getSchema().isNullable() + ? field.getSchema().getNonNullable() : field.getSchema(); + + if (attrValue instanceof Struct) { + builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, connection)); + } else if (attrValue instanceof java.sql.Date) { + builder.setDate(field.getName(), ((java.sql.Date) attrValue).toLocalDate()); + } else if (attrValue instanceof java.sql.Time) { + builder.setTime(field.getName(), ((java.sql.Time) attrValue).toLocalTime()); + } else if (attrValue instanceof Timestamp) { + if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + builder.setDateTime(field.getName(), ((Timestamp) attrValue).toLocalDateTime()); + } else { + builder.setTimestamp(field.getName(), + ((Timestamp) attrValue).toInstant().atZone(java.time.ZoneId.of("UTC"))); + } + } else if (attrValue instanceof BigDecimal) { + builder.setDecimal(field.getName(), (BigDecimal) attrValue); + } else { + builder.set(field.getName(), attrValue); + } + } + return builder.build(); + } + /** * Get the scale set in Non-nullable schema associated with the schema * */ diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 208b70410..595ac26d9 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -23,9 +23,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -70,6 +75,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final Boolean isTimestampOldBehavior; private final Boolean isPrecisionlessNumAsDecimal; private final Boolean isTimestampLtzFieldTimestamp; + private Connection connection; public OracleSourceSchemaReader() { this(null, false, false, false); @@ -136,11 +142,130 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti } return Schema.decimalOf(precision, scale); } + case Types.STRUCT: + if (connection == null) { + throw new SQLException("Cannot resolve STRUCT schema without a database connection. " + + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); + } + String typeName = metadata.getColumnTypeName(index); + String oracleSchemaName = metadata.getSchemaName(index); + return getStructSchema(connection, oracleSchemaName, typeName); default: return super.getSchema(metadata, index); } } + @Override + public List getSchemaFields(ResultSet resultSet) throws SQLException { + this.connection = resultSet.getStatement().getConnection(); + return super.getSchemaFields(resultSet); + } + + /** + * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the database metadata + * for the type's attributes. + * + * @param connection the database connection + * @param schemaName the Oracle schema owning the type + * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") + * @return a CDAP RECORD schema with fields corresponding to the STRUCT's attributes + */ + private Schema getStructSchema(Connection connection, String schemaName, + String typeName) throws SQLException { + List fields = new ArrayList<>(); + + String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? ORDER BY ATTR_NO"; + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); + + try (ResultSet attrRs = stmt.executeQuery()) { + while (attrRs.next()) { + String attrName = attrRs.getString("ATTR_NAME"); + String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); + int attrSize = attrRs.getInt("PRECISION"); + int attrScale = attrRs.getInt("SCALE"); + + Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale); + if (attrSchema != null) { + fields.add(Schema.Field.of(attrName, attrSchema)); + } else { + Schema nestedSchema = getStructSchema(connection, schemaName, attrTypeName); + fields.add(Schema.Field.of(attrName, nestedSchema)); + } + } + } + } + if (fields.isEmpty()) { + throw new SQLException(String.format( + "No attributes found for Oracle STRUCT type '%s.%s'. " + + "Ensure the type exists and is accessible.", + schemaName, typeName)); + } + + return Schema.recordOf(typeName, fields); + } + + private Schema mapPrimitiveOracleType(String typeName, int precision, int scale) { + switch (typeName) { + case "TIMESTAMP WITH TZ": + return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + case "TIMESTAMP WITH LTZ": + return getTimestampLtzSchema(); + case "TIMESTAMP": + return Schema.of(Schema.LogicalType.DATETIME); + case "DATE" : + return Schema.of(Schema.LogicalType.DATE); + case "BINARY FLOAT": + case "FLOAT": + return Schema.of(Schema.Type.FLOAT); + case "BINARY DOUBLE": + case "DOUBLE": + return Schema.of(Schema.Type.DOUBLE); + case "BFILE": + case "RAW": + case "LONG RAW": + return Schema.of(Schema.Type.BYTES); + case "INTERVAL DAY TO SECOND": + case "INTERVAL YEAR TO MONTH": + case "VARCHAR2": + case "VARCHAR": + case "CHAR": + case "CLOB": + case "BLOB": + case "LONG": + return Schema.of(Schema.Type.STRING); + case "INTEGER": + return Schema.of(Schema.Type.INT); + case "NUMBER": + case "DECIMAL": + // FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double + if (Double.class.getTypeName().equals(typeName)) { + return Schema.of(Schema.Type.DOUBLE); + } else { + if (precision == 0) { + if (isPrecisionlessNumAsDecimal) { + precision = 38; + scale = 0; + LOG.warn(String.format("%s type with undefined precision and scale is detected, " + + "there may be a precision loss while running the pipeline. " + + "Please define an output precision and scale for field to avoid " + + "precision loss.", typeName)); + return Schema.decimalOf(precision, scale); + } else { + LOG.warn(String.format("%s type without precision and scale, " + + "converting into STRING type to avoid any precision loss.", + typeName)); + return Schema.of(Schema.Type.STRING); + } + } + return Schema.decimalOf(precision, scale); + } + default: + return null; + } + } + private @NotNull Schema getTimestampLtzSchema() { return isTimestampOldBehavior || isTimestampLtzFieldTimestamp ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java index 1ff77c533..c38e03acc 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java @@ -24,9 +24,13 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; import java.util.List; public class OracleSchemaReaderTest { @@ -37,6 +41,12 @@ public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); @@ -68,9 +78,12 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); - + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(metadata.getColumnCount()).thenReturn(2); // -101 is for TIMESTAMP_TZ Mockito.when(metadata.getColumnType(1)).thenReturn(-101); @@ -91,4 +104,52 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName()); Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema()); } + + @Test + public void getSchemaFields_structType_returnRecord() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(); + + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + PreparedStatement stmt = Mockito.mock(PreparedStatement.class); + ResultSet attrRs = Mockito.mock(ResultSet.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt); + Mockito.when(stmt.executeQuery()).thenReturn(attrRs); + + // One STRUCT column + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT); + Mockito.when(metadata.getColumnName(1)).thenReturn("address"); + Mockito.when(metadata.getColumnTypeName(1)).thenReturn("ADDRESS_TYPE"); + Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA"); + + // Mock ALL_TYPE_ATTRS for ADDRESS_TYPE with two VARCHAR2 attributes + Mockito.when(attrRs.next()).thenReturn(true, true, false); + Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("STREET", "CITY"); + Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2"); + Mockito.when(attrRs.getInt("PRECISION")).thenReturn(0, 0); + Mockito.when(attrRs.getInt("SCALE")).thenReturn(0, 0); + + List actualFields = schemaReader.getSchemaFields(resultSet); + + Assert.assertEquals(1, actualFields.size()); + Schema.Field addressField = actualFields.get(0); + Assert.assertEquals("address", addressField.getName()); + + Schema addressSchema = addressField.getSchema().isNullable() + ? addressField.getSchema().getNonNullable() : addressField.getSchema(); + Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType()); + Assert.assertEquals("ADDRESS_TYPE", addressSchema.getRecordName()); + + List structFields = addressSchema.getFields(); + Assert.assertEquals(2, structFields.size()); + Assert.assertEquals("STREET", structFields.get(0).getName()); + Assert.assertEquals("CITY", structFields.get(1).getName()); + } } diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java index 77136e841..61d644b54 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java @@ -25,6 +25,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.math.BigDecimal; +import java.sql.Date; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Timestamp; @@ -234,4 +235,119 @@ public void validateTimestampTZTypeNullHandling() throws Exception { StructuredRecord record = builder.build(); Assert.assertNull(record.get("field1")); } + + @Test + public void validateStructHandling() throws Exception { + Schema streetFieldSchema = Schema.of(Schema.Type.STRING); + Schema cityFieldSchema = Schema.of(Schema.Type.STRING); + Schema addressStructSchema = Schema.recordOf("ADDRESS_TYPE", + Schema.Field.of("STREET", streetFieldSchema), + Schema.Field.of("CITY", cityFieldSchema) + ); + Schema.Field addressField = Schema.Field.of("address", addressStructSchema); + Schema schema = Schema.recordOf("dbRecord", addressField); + java.sql.Struct structMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] attributes = new Object[] { "123 Main St", "San Jose" }; + + when(structMock.getAttributes()).thenReturn(attributes); + java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); + java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); + when(resultSet.getStatement()).thenReturn(statementMock); + when(statementMock.getConnection()).thenReturn(connectionMock); + when(resultSet.getObject(eq(1))).thenReturn(structMock); + + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, addressField, 1, Types.STRUCT, 0, 0); + StructuredRecord record = builder.build(); + StructuredRecord addressRecord = record.get("address"); + + Assert.assertNotNull(addressRecord); + Assert.assertEquals("123 Main St", addressRecord.get("STREET")); + Assert.assertEquals("San Jose", addressRecord.get("CITY")); + } + + @Test + public void validateNestedStructHandling() throws Exception { + Schema streetFieldSchema = Schema.of(Schema.Type.STRING); + Schema cityFieldSchema = Schema.of(Schema.Type.STRING); + Schema addressStructSchema = Schema.recordOf("ADDRESS_TYPE", + Schema.Field.of("STREET", streetFieldSchema), + Schema.Field.of("CITY", cityFieldSchema) + ); + Schema personStructSchema = Schema.recordOf("PERSON_TYPE", + Schema.Field.of("NAME", Schema.of(Schema.Type.STRING)), + Schema.Field.of("ADDRESS", addressStructSchema) + ); + Schema.Field personField = Schema.Field.of("person", personStructSchema); + Schema schema = Schema.recordOf("dbRecord", personField); + + java.sql.Struct addressStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] addressAttrs = new Object[] { "123 Main St", "San Jose" }; + when(addressStructMock.getAttributes()).thenReturn(addressAttrs); + java.sql.Struct personStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] personAttrs = new Object[] { "John Doe", addressStructMock }; + when(personStructMock.getAttributes()).thenReturn(personAttrs); + + java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); + java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); + when(resultSet.getStatement()).thenReturn(statementMock); + when(statementMock.getConnection()).thenReturn(connectionMock); + when(resultSet.getObject(eq(1))).thenReturn(personStructMock); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, personField, 1, Types.STRUCT, 0, 0); + StructuredRecord record = builder.build(); + StructuredRecord personRecord = record.get("person"); + + Assert.assertNotNull(personRecord); + Assert.assertEquals("John Doe", personRecord.get("NAME")); + StructuredRecord addressRecord = personRecord.get("ADDRESS"); + Assert.assertNotNull(addressRecord); + Assert.assertEquals("123 Main St", addressRecord.get("STREET")); + Assert.assertEquals("San Jose", addressRecord.get("CITY")); + } + + @Test + public void validatePrimitiveTypesInStruct() throws Exception { + Schema mixStructSchema = Schema.recordOf("MIX_TYPE", + Schema.Field.of("INT_VAL", Schema.of(Schema.Type.INT)), + Schema.Field.of("DECIMAL_VAL", Schema.decimalOf(10, 2)), + Schema.Field.of("DATE_VAL", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("DATETIME_VAL", Schema.of(Schema.LogicalType.DATETIME)), + Schema.Field.of("BYTES_VAL", Schema.of(Schema.Type.BYTES)) + ); + + Schema.Field mixField = Schema.Field.of("mix", mixStructSchema); + Schema schema = Schema.recordOf("dbRecord", mixField); + java.sql.Timestamp timestamp = java.sql.Timestamp.valueOf("2026-05-06 10:30:00"); + java.sql.Date sqlDate = java.sql.Date.valueOf("2026-05-06"); + byte[] bytes = new byte[] { 1, 2, 3 }; + java.sql.Struct mixStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] mixAttrs = new Object[] { + 123, + new BigDecimal("45.67"), + sqlDate, + timestamp, + bytes + }; + when(mixStructMock.getAttributes()).thenReturn(mixAttrs); + java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); + java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); + when(resultSet.getStatement()).thenReturn(statementMock); + when(statementMock.getConnection()).thenReturn(connectionMock); + when(resultSet.getObject(eq(1))).thenReturn(mixStructMock); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, mixField, 1, Types.STRUCT, 0, 0); + StructuredRecord record = builder.build(); + StructuredRecord mixRecord = record.get("mix"); + + Assert.assertNotNull(mixRecord); + Assert.assertEquals(Integer.valueOf(123), mixRecord.get("INT_VAL")); + Assert.assertEquals(new BigDecimal("45.67"), mixRecord.getDecimal("DECIMAL_VAL")); + Assert.assertEquals(sqlDate.toLocalDate(), mixRecord.getDate("DATE_VAL")); + Assert.assertEquals(timestamp.toLocalDateTime(), mixRecord.getDateTime("DATETIME_VAL")); + Assert.assertArrayEquals(bytes, mixRecord.get("BYTES_VAL")); + } } From fe03c7b8e7b5b869530546d3ddd1ee04a90be748 Mon Sep 17 00:00:00 2001 From: vanshikaagupta22 Date: Tue, 19 May 2026 11:37:41 +0000 Subject: [PATCH 2/2] Fix data reading errors --- .../plugin/oracle/OracleSourceDBRecord.java | 72 +++--- .../oracle/OracleSourceSchemaReader.java | 90 ++----- .../OracleStructAttributeConverters.java | 235 ++++++++++++++++++ .../oracle/OracleStructTypeSchemaMapping.java | 146 +++++++++++ .../plugin/oracle/OracleSchemaReaderTest.java | 20 +- .../oracle/OracleSourceDBRecordUnitTest.java | 114 --------- 6 files changed, 448 insertions(+), 229 deletions(-) create mode 100644 oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java create mode 100644 oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructTypeSchemaMapping.java diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index d79d67936..157d2e676 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -30,6 +30,8 @@ import java.lang.reflect.InvocationTargetException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.Blob; +import java.sql.Clob; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -258,6 +260,31 @@ private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLE } } + private byte[] getBfileBytes(Object bfile) throws SQLException { + if (bfile == null) { + return null; + } + try { + ClassLoader classLoader = bfile.getClass().getClassLoader(); + Class oracleBfileClass = classLoader.loadClass("oracle.jdbc.OracleBfile"); + boolean isFileExist = (boolean) oracleBfileClass.getMethod("fileExists").invoke(bfile); + if (!isFileExist) { + return null; + } + + oracleBfileClass.getMethod("openFile").invoke(bfile); + InputStream binaryStream = (InputStream) oracleBfileClass.getMethod("getBinaryStream").invoke(bfile); + byte[] bytes = ByteStreams.toByteArray(binaryStream); + oracleBfileClass.getMethod("closeFile").invoke(bfile); + return bytes; + } catch (ClassNotFoundException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) { + throw new InvalidStageException("Field is of type 'BFILE', which is not supported " + + "with this version of the JDBC driver.", e); + } catch (IOException e) { + throw new InvalidStageException("Error reading the contents of the BFILE.", e); + } + } + private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, int columnIndex, int sqlType, int precision, int scale) throws SQLException { @@ -343,10 +370,9 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex)); break; case Types.STRUCT: - java.sql.Struct structValue = (java.sql.Struct) resultSet.getObject(columnIndex); + Struct structValue = (Struct) resultSet.getObject(columnIndex); if (structValue != null) { - recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, - resultSet.getStatement().getConnection())); + recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, resultSet)); } break; case Types.DECIMAL: @@ -379,42 +405,32 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil } } - private StructuredRecord convertStructToRecord(java.sql.Struct struct, Schema schema, - Connection connection) throws SQLException { + private StructuredRecord convertStructToRecord(Struct struct, Schema schema, ResultSet resultSet) + throws SQLException { Object[] attributes = struct.getAttributes(); List fields = schema.getFields(); StructuredRecord.Builder builder = StructuredRecord.builder(schema); - for (int i = 0; i < fields.size() && i < attributes.length; i++) { - Schema.Field field = fields.get(i); - Object attrValue = attributes[i]; + for (int index = 0; index < attributes.length; index++) { + Schema.Field field = fields.get(index); + Object attrValue = attributes[index]; if (attrValue == null) { builder.set(field.getName(), null); continue; } - - Schema fieldSchema = field.getSchema().isNullable() - ? field.getSchema().getNonNullable() : field.getSchema(); - + // If it is an internal nested STRUCT, recurse down if (attrValue instanceof Struct) { - builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, connection)); - } else if (attrValue instanceof java.sql.Date) { - builder.setDate(field.getName(), ((java.sql.Date) attrValue).toLocalDate()); - } else if (attrValue instanceof java.sql.Time) { - builder.setTime(field.getName(), ((java.sql.Time) attrValue).toLocalTime()); - } else if (attrValue instanceof Timestamp) { - if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { - builder.setDateTime(field.getName(), ((Timestamp) attrValue).toLocalDateTime()); - } else { - builder.setTimestamp(field.getName(), - ((Timestamp) attrValue).toInstant().atZone(java.time.ZoneId.of("UTC"))); - } - } else if (attrValue instanceof BigDecimal) { - builder.setDecimal(field.getName(), (BigDecimal) attrValue); - } else { - builder.set(field.getName(), attrValue); + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, resultSet)); + continue; } + + String attrClassName = attrValue.getClass().getName(); + Schema fieldSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); + + OracleStructAttributeConverters.convertValue(builder, field, fieldSchema, attrValue, attrClassName, + this::getBfileBytes); } return builder.build(); } diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 595ac26d9..4a0dd8b7b 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -148,8 +148,8 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); } String typeName = metadata.getColumnTypeName(index); - String oracleSchemaName = metadata.getSchemaName(index); - return getStructSchema(connection, oracleSchemaName, typeName); + String owner = typeName.substring(0, typeName.lastIndexOf('.')); + return getStructSchema(connection, typeName, owner); default: return super.getSchema(metadata, index); } @@ -162,22 +162,22 @@ public List getSchemaFields(ResultSet resultSet) throws SQLExcepti } /** - * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the database metadata + * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the + * database metadata * for the type's attributes. * * @param connection the database connection - * @param schemaName the Oracle schema owning the type * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") - * @return a CDAP RECORD schema with fields corresponding to the STRUCT's attributes + * @return a CDAP RECORD schema with fields corresponding to the STRUCT's + * attributes */ - private Schema getStructSchema(Connection connection, String schemaName, - String typeName) throws SQLException { + private Schema getStructSchema(Connection connection, String typeName, String owner) throws SQLException { List fields = new ArrayList<>(); - - String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? ORDER BY ATTR_NO"; + String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? AND OWNER = ? ORDER BY ATTR_NO"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); + stmt.setString(2, owner); try (ResultSet attrRs = stmt.executeQuery()) { while (attrRs.next()) { @@ -186,11 +186,12 @@ private Schema getStructSchema(Connection connection, String schemaName, int attrSize = attrRs.getInt("PRECISION"); int attrScale = attrRs.getInt("SCALE"); - Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale); + Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale, attrName); if (attrSchema != null) { fields.add(Schema.Field.of(attrName, attrSchema)); } else { - Schema nestedSchema = getStructSchema(connection, schemaName, attrTypeName); + String nestedStructOwner = attrRs.getString("ATTR_TYPE_OWNER"); + Schema nestedSchema = getStructSchema(connection, attrTypeName, nestedStructOwner); fields.add(Schema.Field.of(attrName, nestedSchema)); } } @@ -198,72 +199,17 @@ private Schema getStructSchema(Connection connection, String schemaName, } if (fields.isEmpty()) { throw new SQLException(String.format( - "No attributes found for Oracle STRUCT type '%s.%s'. " - + "Ensure the type exists and is accessible.", - schemaName, typeName)); + "No attributes found for Oracle STRUCT type '%s'. " + + "Ensure the type exists and is accessible.", + typeName)); } return Schema.recordOf(typeName, fields); } - private Schema mapPrimitiveOracleType(String typeName, int precision, int scale) { - switch (typeName) { - case "TIMESTAMP WITH TZ": - return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); - case "TIMESTAMP WITH LTZ": - return getTimestampLtzSchema(); - case "TIMESTAMP": - return Schema.of(Schema.LogicalType.DATETIME); - case "DATE" : - return Schema.of(Schema.LogicalType.DATE); - case "BINARY FLOAT": - case "FLOAT": - return Schema.of(Schema.Type.FLOAT); - case "BINARY DOUBLE": - case "DOUBLE": - return Schema.of(Schema.Type.DOUBLE); - case "BFILE": - case "RAW": - case "LONG RAW": - return Schema.of(Schema.Type.BYTES); - case "INTERVAL DAY TO SECOND": - case "INTERVAL YEAR TO MONTH": - case "VARCHAR2": - case "VARCHAR": - case "CHAR": - case "CLOB": - case "BLOB": - case "LONG": - return Schema.of(Schema.Type.STRING); - case "INTEGER": - return Schema.of(Schema.Type.INT); - case "NUMBER": - case "DECIMAL": - // FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double - if (Double.class.getTypeName().equals(typeName)) { - return Schema.of(Schema.Type.DOUBLE); - } else { - if (precision == 0) { - if (isPrecisionlessNumAsDecimal) { - precision = 38; - scale = 0; - LOG.warn(String.format("%s type with undefined precision and scale is detected, " - + "there may be a precision loss while running the pipeline. " - + "Please define an output precision and scale for field to avoid " - + "precision loss.", typeName)); - return Schema.decimalOf(precision, scale); - } else { - LOG.warn(String.format("%s type without precision and scale, " - + "converting into STRING type to avoid any precision loss.", - typeName)); - return Schema.of(Schema.Type.STRING); - } - } - return Schema.decimalOf(precision, scale); - } - default: - return null; - } + private Schema mapPrimitiveOracleType(String typeName, int precision, int scale, String columnName) { + return OracleUserTypeSchemaMapping.mapPrimitiveOracleType(isTimestampOldBehavior, getTimestampLtzSchema(), + isPrecisionlessNumAsDecimal, typeName, precision, scale, columnName); } private @NotNull Schema getTimestampLtzSchema() { diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java new file mode 100644 index 000000000..11364f114 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructAttributeConverters.java @@ -0,0 +1,235 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; + +import java.math.BigDecimal; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.List; + +/** + * Standalone registry of AttributeConverter strategies for converting Oracle STRUCT attributes. + */ +public final class OracleStructAttributeConverters { + + /** + * Functional interface for resolving Oracle BFILE content bytes. + */ + @FunctionalInterface + public interface BfileBytesResolver { + byte[] getBfileBytes(Object bfile) throws SQLException; + } + + /** + * Strategy interface for translating structured attributes to CDAP records. + */ + public interface AttributeConverter { + boolean canConvert(Object attrValue, String attrClassName); + void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException; + } + + private static class BigDecimalConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof BigDecimal; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + BigDecimal bigDecimal = (BigDecimal) attrValue; + if (Schema.LogicalType.DECIMAL.equals(fieldSchema.getLogicalType())) { + builder.setDecimal(field.getName(), bigDecimal.setScale(fieldSchema.getScale(), + java.math.RoundingMode.HALF_UP)); + } else if (Schema.Type.DOUBLE.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.doubleValue()); + } else if (Schema.Type.FLOAT.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.floatValue()); + } else if (Schema.Type.INT.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.intValue()); + } else if (Schema.Type.LONG.equals(fieldSchema.getType())) { + builder.set(field.getName(), bigDecimal.longValue()); + } else { + builder.set(field.getName(), bigDecimal.toString()); + } + } + } + + private static class TimestampConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Timestamp; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + Timestamp timestamp = (Timestamp) attrValue; + if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + builder.setDateTime(field.getName(), timestamp.toLocalDateTime()); + } else if (Schema.LogicalType.DATE.equals(fieldSchema.getLogicalType())) { + builder.setDate(field.getName(), timestamp.toLocalDateTime().toLocalDate()); + } else { + builder.set(field.getName(), attrValue.toString()); + } + } + } + + private static class ZonedDateTimeConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof OffsetDateTime || attrValue instanceof ZonedDateTime; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + ZonedDateTime zonedDateTime = (attrValue instanceof OffsetDateTime) + ? ((OffsetDateTime) attrValue).atZoneSameInstant(ZoneId.of("UTC")) + : ((ZonedDateTime) attrValue).withZoneSameInstant(ZoneId.of("UTC")); + if (fieldSchema.getLogicalType() != null && + (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType()) || + Schema.LogicalType.TIMESTAMP_MILLIS.equals(fieldSchema.getLogicalType()))) { + builder.setTimestamp(field.getName(), zonedDateTime); + } else if (Schema.Type.LONG.equals(fieldSchema.getType())) { + builder.set(field.getName(), zonedDateTime.toInstant().toEpochMilli()); + } else { + builder.set(field.getName(), zonedDateTime.toString()); + } + } + } + + private static class ClobConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Clob; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + Clob clob = (Clob) attrValue; + builder.set(field.getName(), clob.getSubString(1, (int) clob.length())); + } + } + + private static class BlobConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof Blob; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + Blob blob = (Blob) attrValue; + builder.set(field.getName(), blob.getBytes(1, (int) blob.length())); + } + } + + private static class OracleBfileConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return "oracle.jdbc.OracleBfile".equals(attrClassName); + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), bfileResolver.getBfileBytes(attrValue)); + } + } + + private static class ByteArrayConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return attrValue instanceof byte[]; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), (byte[]) attrValue); + } + } + + private static class OracleIntervalConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return "oracle.sql.INTERVALDS".equals(attrClassName) || "oracle.sql.INTERVALYM".equals(attrClassName); + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), attrValue.toString()); + } + } + + private static class DefaultConverter implements AttributeConverter { + @Override + public boolean canConvert(Object attrValue, String attrClassName) { + return true; + } + + @Override + public void convert(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, BfileBytesResolver bfileResolver) throws SQLException { + builder.set(field.getName(), attrValue); + } + } + + private static final List CONVERTERS = Arrays.asList( + new BigDecimalConverter(), + new TimestampConverter(), + new ZonedDateTimeConverter(), + new ClobConverter(), + new BlobConverter(), + new OracleBfileConverter(), + new ByteArrayConverter(), + new OracleIntervalConverter(), + new DefaultConverter() + ); + + private OracleStructAttributeConverters() { + // Private constructor to prevent instantiation. + } + + /** + * Translates an Oracle STRUCT attribute to a CDAP structured record field. + */ + public static void convertValue(StructuredRecord.Builder builder, Schema.Field field, Schema fieldSchema, + Object attrValue, String attrClassName, + BfileBytesResolver bfileResolver) throws SQLException { + for (AttributeConverter converter : CONVERTERS) { + if (converter.canConvert(attrValue, attrClassName)) { + converter.convert(builder, field, fieldSchema, attrValue, bfileResolver); + break; + } + } + } +} diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructTypeSchemaMapping.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructTypeSchemaMapping.java new file mode 100644 index 000000000..69d980d99 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleStructTypeSchemaMapping.java @@ -0,0 +1,146 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Registry containing schema type mappers for Oracle specific datatypes. + */ +public final class OracleUserTypeSchemaMapping { + private static final Logger LOG = LoggerFactory.getLogger(OracleUserTypeSchemaMapping.class); + + private interface TypeMapper { + Schema map(boolean isTimestampOldBehavior, Schema timestampLtzSchema, + boolean isPrecisionlessNumAsDecimal, String typeName, int precision, int scale, String columnName); + } + + private static final Map TYPE_MAPPERS = new HashMap<>(); + + static { + TypeMapper floatMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.FLOAT); + TYPE_MAPPERS.put("BINARY FLOAT", floatMapper); + TYPE_MAPPERS.put("REAL", floatMapper); + TYPE_MAPPERS.put("FLOAT", floatMapper); + + TypeMapper doubleMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.DOUBLE); + TYPE_MAPPERS.put("BINARY DOUBLE", doubleMapper); + TYPE_MAPPERS.put("DOUBLE", doubleMapper); + + // Bytes types + TypeMapper bytesMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.BYTES); + TYPE_MAPPERS.put("BFILE", bytesMapper); + TYPE_MAPPERS.put("BLOB", bytesMapper); + TYPE_MAPPERS.put("RAW", bytesMapper); + TYPE_MAPPERS.put("LONG RAW", bytesMapper); + + // String types + TypeMapper stringMapper = (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.STRING); + TYPE_MAPPERS.put("INTERVAL DAY TO SECOND", stringMapper); + TYPE_MAPPERS.put("INTERVAL YEAR TO MONTH", stringMapper); + TYPE_MAPPERS.put("VARCHAR2", stringMapper); + TYPE_MAPPERS.put("VARCHAR", stringMapper); + TYPE_MAPPERS.put("CHAR", stringMapper); + TYPE_MAPPERS.put("CHAR2", stringMapper); + TYPE_MAPPERS.put("CLOB", stringMapper); + TYPE_MAPPERS.put("NCLOB", stringMapper); + TYPE_MAPPERS.put("LONG", stringMapper); + + // Specific types + TYPE_MAPPERS.put("TIMESTAMP WITH TZ", (isOld, ltzS, precD, typeName, p, s, col) -> + isOld ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) + ); + TYPE_MAPPERS.put("TIMESTAMP WITH LTZ", (isOld, ltzS, precD, typeName, p, s, col) -> ltzS); + TYPE_MAPPERS.put("TIMESTAMP", (isOld, ltzS, precD, typeName, p, s, col) -> + isOld ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) : Schema.of(Schema.LogicalType.DATETIME) + ); + TYPE_MAPPERS.put("DATE", (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.LogicalType.DATE)); + TYPE_MAPPERS.put("TIME", (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.LogicalType.TIME_MICROS)); + TYPE_MAPPERS.put("INTEGER", (isOld, ltzS, precD, typeName, p, s, col) -> Schema.of(Schema.Type.INT)); + + TYPE_MAPPERS.put("NUMBER", OracleUserTypeSchemaMapping::mapNumberOrDecimal); + TYPE_MAPPERS.put("DECIMAL", OracleUserTypeSchemaMapping::mapNumberOrDecimal); + + // Unsupported types that throw error + TYPE_MAPPERS.put("ARRAY", (isOld, ltzS, precD, typeName, p, s, col) -> { + String errorMessage = String.format("Column %s has unsupported SQL type of %s.", col, typeName); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, true, null); + }); + TYPE_MAPPERS.put("OTHER", (isOld, ltzS, precD, typeName, p, s, col) -> { + String errorMessage = String.format("Column %s has unsupported SQL type of %s.", col, typeName); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, true, null); + }); + TYPE_MAPPERS.put("XML", (isOld, ltzS, precD, typeName, p, s, col) -> { + String errorMessage = String.format("Column %s has unsupported SQL type of %s.", col, typeName); + throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), + errorMessage, errorMessage, ErrorType.SYSTEM, true, null); + }); + } + + private OracleUserTypeSchemaMapping() { + // Private constructor to prevent instantiation of utility class. + } + + /** + * Maps primitive Oracle types to CDAP Schemas. + */ + public static Schema mapPrimitiveOracleType(boolean isTimestampOldBehavior, Schema timestampLtzSchema, + boolean isPrecisionlessNumAsDecimal, String typeName, + int precision, int scale, String columnName) { + TypeMapper mapper = TYPE_MAPPERS.get(typeName); + if (mapper != null) { + return mapper.map(isTimestampOldBehavior, timestampLtzSchema, isPrecisionlessNumAsDecimal, + typeName, precision, scale, columnName); + } + return null; + } + + private static Schema mapNumberOrDecimal(boolean isOld, Schema ltzS, boolean precD, String typeName, + int precision, int scale, String columnName) { + if (Double.class.getTypeName().equals(typeName)) { + return Schema.of(Schema.Type.DOUBLE); + } else { + if (precision == 0) { + if (precD) { + int newPrecision = 38; + int newScale = 0; + LOG.warn(String.format("%s type with undefined precision and scale is detected, " + + "there may be a precision loss while running the pipeline. " + + "Please define an output precision and scale for field to avoid " + + "precision loss.", typeName)); + return Schema.decimalOf(newPrecision, newScale); + } else { + LOG.warn(String.format("%s type without precision and scale, " + + "converting into STRING type to avoid any precision loss.", + typeName)); + return Schema.of(Schema.Type.STRING); + } + } + return Schema.decimalOf(precision, scale); + } + } +} diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java index c38e03acc..8898e8ab4 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java @@ -20,9 +20,7 @@ import io.cdap.cdap.api.data.schema.Schema; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; import java.sql.Connection; import java.sql.PreparedStatement; @@ -108,28 +106,22 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio @Test public void getSchemaFields_structType_returnRecord() throws SQLException { OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(); - ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); Statement statement = Mockito.mock(Statement.class); Connection connection = Mockito.mock(Connection.class); PreparedStatement stmt = Mockito.mock(PreparedStatement.class); ResultSet attrRs = Mockito.mock(ResultSet.class); - Mockito.when(resultSet.getMetaData()).thenReturn(metadata); Mockito.when(resultSet.getStatement()).thenReturn(statement); Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt); Mockito.when(stmt.executeQuery()).thenReturn(attrRs); - - // One STRUCT column Mockito.when(metadata.getColumnCount()).thenReturn(1); Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT); Mockito.when(metadata.getColumnName(1)).thenReturn("address"); - Mockito.when(metadata.getColumnTypeName(1)).thenReturn("ADDRESS_TYPE"); + Mockito.when(metadata.getColumnTypeName(1)).thenReturn("CS_ITN.ADDRESS_TYPE"); Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA"); - - // Mock ALL_TYPE_ATTRS for ADDRESS_TYPE with two VARCHAR2 attributes Mockito.when(attrRs.next()).thenReturn(true, true, false); Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("STREET", "CITY"); Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2"); @@ -138,16 +130,14 @@ public void getSchemaFields_structType_returnRecord() throws SQLException { List actualFields = schemaReader.getSchemaFields(resultSet); - Assert.assertEquals(1, actualFields.size()); Schema.Field addressField = actualFields.get(0); - Assert.assertEquals("address", addressField.getName()); - Schema addressSchema = addressField.getSchema().isNullable() ? addressField.getSchema().getNonNullable() : addressField.getSchema(); - Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType()); - Assert.assertEquals("ADDRESS_TYPE", addressSchema.getRecordName()); - List structFields = addressSchema.getFields(); + Assert.assertEquals(1, actualFields.size()); + Assert.assertEquals("address", addressField.getName()); + Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType()); + Assert.assertEquals("CS_ITN.ADDRESS_TYPE", addressSchema.getRecordName()); Assert.assertEquals(2, structFields.size()); Assert.assertEquals("STREET", structFields.get(0).getName()); Assert.assertEquals("CITY", structFields.get(1).getName()); diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java index 61d644b54..244e84ea6 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java @@ -236,118 +236,4 @@ public void validateTimestampTZTypeNullHandling() throws Exception { Assert.assertNull(record.get("field1")); } - @Test - public void validateStructHandling() throws Exception { - Schema streetFieldSchema = Schema.of(Schema.Type.STRING); - Schema cityFieldSchema = Schema.of(Schema.Type.STRING); - Schema addressStructSchema = Schema.recordOf("ADDRESS_TYPE", - Schema.Field.of("STREET", streetFieldSchema), - Schema.Field.of("CITY", cityFieldSchema) - ); - Schema.Field addressField = Schema.Field.of("address", addressStructSchema); - Schema schema = Schema.recordOf("dbRecord", addressField); - java.sql.Struct structMock = org.mockito.Mockito.mock(java.sql.Struct.class); - Object[] attributes = new Object[] { "123 Main St", "San Jose" }; - - when(structMock.getAttributes()).thenReturn(attributes); - java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); - java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); - when(resultSet.getStatement()).thenReturn(statementMock); - when(statementMock.getConnection()).thenReturn(connectionMock); - when(resultSet.getObject(eq(1))).thenReturn(structMock); - - StructuredRecord.Builder builder = StructuredRecord.builder(schema); - OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); - dbRecord.handleField(resultSet, builder, addressField, 1, Types.STRUCT, 0, 0); - StructuredRecord record = builder.build(); - StructuredRecord addressRecord = record.get("address"); - - Assert.assertNotNull(addressRecord); - Assert.assertEquals("123 Main St", addressRecord.get("STREET")); - Assert.assertEquals("San Jose", addressRecord.get("CITY")); - } - - @Test - public void validateNestedStructHandling() throws Exception { - Schema streetFieldSchema = Schema.of(Schema.Type.STRING); - Schema cityFieldSchema = Schema.of(Schema.Type.STRING); - Schema addressStructSchema = Schema.recordOf("ADDRESS_TYPE", - Schema.Field.of("STREET", streetFieldSchema), - Schema.Field.of("CITY", cityFieldSchema) - ); - Schema personStructSchema = Schema.recordOf("PERSON_TYPE", - Schema.Field.of("NAME", Schema.of(Schema.Type.STRING)), - Schema.Field.of("ADDRESS", addressStructSchema) - ); - Schema.Field personField = Schema.Field.of("person", personStructSchema); - Schema schema = Schema.recordOf("dbRecord", personField); - - java.sql.Struct addressStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); - Object[] addressAttrs = new Object[] { "123 Main St", "San Jose" }; - when(addressStructMock.getAttributes()).thenReturn(addressAttrs); - java.sql.Struct personStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); - Object[] personAttrs = new Object[] { "John Doe", addressStructMock }; - when(personStructMock.getAttributes()).thenReturn(personAttrs); - - java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); - java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); - when(resultSet.getStatement()).thenReturn(statementMock); - when(statementMock.getConnection()).thenReturn(connectionMock); - when(resultSet.getObject(eq(1))).thenReturn(personStructMock); - StructuredRecord.Builder builder = StructuredRecord.builder(schema); - OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); - dbRecord.handleField(resultSet, builder, personField, 1, Types.STRUCT, 0, 0); - StructuredRecord record = builder.build(); - StructuredRecord personRecord = record.get("person"); - - Assert.assertNotNull(personRecord); - Assert.assertEquals("John Doe", personRecord.get("NAME")); - StructuredRecord addressRecord = personRecord.get("ADDRESS"); - Assert.assertNotNull(addressRecord); - Assert.assertEquals("123 Main St", addressRecord.get("STREET")); - Assert.assertEquals("San Jose", addressRecord.get("CITY")); - } - - @Test - public void validatePrimitiveTypesInStruct() throws Exception { - Schema mixStructSchema = Schema.recordOf("MIX_TYPE", - Schema.Field.of("INT_VAL", Schema.of(Schema.Type.INT)), - Schema.Field.of("DECIMAL_VAL", Schema.decimalOf(10, 2)), - Schema.Field.of("DATE_VAL", Schema.of(Schema.LogicalType.DATE)), - Schema.Field.of("DATETIME_VAL", Schema.of(Schema.LogicalType.DATETIME)), - Schema.Field.of("BYTES_VAL", Schema.of(Schema.Type.BYTES)) - ); - - Schema.Field mixField = Schema.Field.of("mix", mixStructSchema); - Schema schema = Schema.recordOf("dbRecord", mixField); - java.sql.Timestamp timestamp = java.sql.Timestamp.valueOf("2026-05-06 10:30:00"); - java.sql.Date sqlDate = java.sql.Date.valueOf("2026-05-06"); - byte[] bytes = new byte[] { 1, 2, 3 }; - java.sql.Struct mixStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); - Object[] mixAttrs = new Object[] { - 123, - new BigDecimal("45.67"), - sqlDate, - timestamp, - bytes - }; - when(mixStructMock.getAttributes()).thenReturn(mixAttrs); - java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); - java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); - when(resultSet.getStatement()).thenReturn(statementMock); - when(statementMock.getConnection()).thenReturn(connectionMock); - when(resultSet.getObject(eq(1))).thenReturn(mixStructMock); - StructuredRecord.Builder builder = StructuredRecord.builder(schema); - OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); - dbRecord.handleField(resultSet, builder, mixField, 1, Types.STRUCT, 0, 0); - StructuredRecord record = builder.build(); - StructuredRecord mixRecord = record.get("mix"); - - Assert.assertNotNull(mixRecord); - Assert.assertEquals(Integer.valueOf(123), mixRecord.get("INT_VAL")); - Assert.assertEquals(new BigDecimal("45.67"), mixRecord.getDecimal("DECIMAL_VAL")); - Assert.assertEquals(sqlDate.toLocalDate(), mixRecord.getDate("DATE_VAL")); - Assert.assertEquals(timestamp.toLocalDateTime(), mixRecord.getDateTime("DATETIME_VAL")); - Assert.assertArrayEquals(bytes, mixRecord.get("BYTES_VAL")); - } }