diff --git a/.gitignore b/.gitignore index 30d9a756..3ea0bfac 100644 --- a/.gitignore +++ b/.gitignore @@ -49,3 +49,4 @@ output # syntax: regexp # ^\.pc/ +.vscode/ \ No newline at end of file diff --git a/README.md b/README.md index b868800d..89feb2d9 100644 --- a/README.md +++ b/README.md @@ -1614,7 +1614,8 @@ The output looks like this: | .option("enable_index_cache", "true") | When true (default), calculated indexes are cached in memory for later use. This improves performance of processing when same files are processed more than once. | | .option("input_split_records", 50000) | Specifies how many records will be allocated to each split/partition. It will be processed by Spark tasks. (The default is not set and the split will happen according to size, see the next option) | | .option("input_split_size_mb", 100) | Specify how many megabytes to allocate to each partition/split. (The default is 100 MB) | - +| .option("record_header_name", "HEADER") | Assuming a copybook definition represents a header, offsets the file read start by the number of bytes in that header and excludes the record definition from the output schema. | +| .option("record_trailer_name", "TRAILER") | Assuming a copybook definition represents a trailer, offsets the file read end by the number of bytes in that trailer and excludes the record definition from the output schema. | ##### Helper fields generation options | Option (usage example) | Description | diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index 10b5f20a..f782396b 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -71,7 +71,8 @@ object RecordExtractors { activeSegmentRedefine: String = "", generateInputFileField: Boolean = false, inputFileName: String = "", - handler: RecordHandler[T] + handler: RecordHandler[T], + recordsToExclude: Set[String] = Set.empty ): Seq[Any] = { val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]] val corruptFields = new ArrayBuffer[CorruptField] @@ -202,7 +203,7 @@ object RecordExtractors { val records: ListBuffer[T] = ListBuffer.empty[T] - for (record <- rootRecords) yield { + for (record <- rootRecords if !recordsToExclude.contains(record.name.toUpperCase)) yield { val (size, values) = getGroupValues(nextOffset, record.asInstanceOf[Group]) if (!record.isRedefined) { nextOffset += size @@ -259,7 +260,8 @@ object RecordExtractors { recordId: Long = 0, generateInputFileField: Boolean = false, inputFileName: String = "", - handler: RecordHandler[T] + handler: RecordHandler[T], + recordsToExclude: Set[String] = Set.empty ): Seq[Any] = { val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive]) @@ -422,7 +424,7 @@ object RecordExtractors { val records: ListBuffer[T] = ListBuffer.empty[T] - rootRecords.collect { case grp: Group if grp.parentSegment.isEmpty => + rootRecords.collect { case grp: Group if grp.parentSegment.isEmpty && !recordsToExclude.contains(grp.name.toUpperCase) => val (size, values) = getGroupValues(nextOffset, grp, segmentsData(0)._2, 0, segmentsData(0)._1 :: Nil) nextOffset += size records += values diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala index 7e85cb8e..57048a42 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala @@ -96,7 +96,8 @@ class FixedLenNestedRowIterator[T: ClassTag]( generateCorruptFields = generateCorruptFields, generateCorruptFieldsAsHex = generateCorruptFieldsAsHex, activeSegmentRedefine = activeSegmentRedefine, - handler = handler + handler = handler, + recordsToExclude = readerProperties.recordsToExclude ) // Advance byte index to the next record diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenHierarchicalIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenHierarchicalIterator.scala index 8e110b33..e8f1881c 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenHierarchicalIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenHierarchicalIterator.scala @@ -149,7 +149,8 @@ final class VarLenHierarchicalIterator[T: ClassTag](cobolSchema: Copybook, recordIndex, generateInputFileName, dataStream.inputFileName, - handler + handler, + readerProperties.recordsToExclude ) } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala index edc6459a..c25690cf 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala @@ -109,7 +109,8 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook, activeSegmentRedefine = segmentRedefine, generateInputFileName, dataStream.inputFileName, - handler + handler, + readerProperties.recordsToExclude )) recordFetched = true diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index 3e5e2e53..9ca4be98 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -111,5 +111,7 @@ case class CobolParameters( debugLayoutPositions: Boolean, enableSelfChecks: Boolean, metadataPolicy: MetadataPolicy, + recordHeaderName: Option[String] = None, + recordTrailerName: Option[String] = None, options: Map[String, String] ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala index 197f182c..efa887e1 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala @@ -60,6 +60,8 @@ object CobolParametersParser extends Logging { val PARAM_RECORD_END_OFFSET = "record_end_offset" val PARAM_FILE_START_OFFSET = "file_start_offset" val PARAM_FILE_END_OFFSET = "file_end_offset" + val PARAM_RECORD_HEADER_NAME = "record_header_name" + val PARAM_RECORD_TRAILER_NAME = "record_trailer_name" val PARAM_IS_XCOM = "is_xcom" val PARAM_IS_TEXT = "is_text" @@ -310,6 +312,8 @@ object CobolParametersParser extends Logging { params.getOrElse(PARAM_DEBUG_LAYOUT_POSITIONS, "false").toBoolean, params.getOrElse(PARAM_ENABLE_SELF_CHECKS, "false").toBoolean, MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic")), + params.get(PARAM_RECORD_HEADER_NAME).map(_.trim).filter(_.nonEmpty), + params.get(PARAM_RECORD_TRAILER_NAME).map(_.trim).filter(_.nonEmpty), params.getMap ) validateSparkCobolOptions(params, recordFormat, validateRedundantOptions) @@ -412,6 +416,9 @@ object CobolParametersParser extends Logging { CorruptFieldsPolicy.Disabled } + val recordsToExclude = (parameters.recordHeaderName.map(n => CopybookParser.transformIdentifier(n).toUpperCase).toSet ++ + parameters.recordTrailerName.map(n => CopybookParser.transformIdentifier(n).toUpperCase).toSet) + ReaderParameters( recordFormat = parameters.recordFormat, isEbcdic = parameters.isEbcdic, @@ -470,6 +477,7 @@ object CobolParametersParser extends Logging { varLenParams.reAdditionalInfo, varLenParams.inputFileNameColumn, parameters.metadataPolicy, + recordsToExclude, parameters.options ) } @@ -982,6 +990,22 @@ object CobolParametersParser extends Logging { params.contains(PARAM_ENABLE_INDEX_CACHE) && params(PARAM_ENABLE_INDEX_CACHE).toBoolean) throw new IllegalArgumentException(s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.") + if (params.contains(PARAM_RECORD_HEADER_NAME) && params.contains(PARAM_FILE_START_OFFSET)) { + throw new IllegalArgumentException(s"Options '$PARAM_RECORD_HEADER_NAME' and '$PARAM_FILE_START_OFFSET' cannot be used together.") + } + + if (params.contains(PARAM_RECORD_TRAILER_NAME) && params.contains(PARAM_FILE_END_OFFSET)) { + throw new IllegalArgumentException(s"Options '$PARAM_RECORD_TRAILER_NAME' and '$PARAM_FILE_END_OFFSET' cannot be used together.") + } + + if (params.contains(PARAM_RECORD_HEADER_NAME) && params.contains(PARAM_RECORD_TRAILER_NAME)) { + val headerName = params(PARAM_RECORD_HEADER_NAME).trim + val trailerName = params(PARAM_RECORD_TRAILER_NAME).trim + if (headerName.equalsIgnoreCase(trailerName)) { + throw new IllegalArgumentException(s"Options '$PARAM_RECORD_HEADER_NAME' and '$PARAM_RECORD_TRAILER_NAME' cannot refer to the same record ('$headerName').") + } + } + if (validateRedundantOptions && unusedKeys.nonEmpty) { val unusedKeyStr = unusedKeys.mkString(",") val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr." diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index f71282a8..0679f8e5 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -139,5 +139,6 @@ case class ReaderParameters( reAdditionalInfo: String = "", inputFileNameColumn: String = "", metadataPolicy: MetadataPolicy = MetadataPolicy.Basic, + recordsToExclude: Set[String] = Set.empty, options: Map[String, String] = Map.empty ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala index b6e9e00e..db0be84d 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala @@ -55,13 +55,24 @@ class CobolSchema(val copybook: Copybook, val corruptSchemaPolicy: CorruptFieldsPolicy, val generateSegIdFieldsCnt: Int = 0, val segmentIdProvidedPrefix: String = "", - val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable { + val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic, + val recordsToExclude: Set[String] = Set.empty) extends Serializable { val segmentIdPrefix: String = if (segmentIdProvidedPrefix.isEmpty) getDefaultSegmentIdPrefix else segmentIdProvidedPrefix def getCobolSchema: Copybook = copybook - lazy val getRecordSize: Int = copybook.getRecordSize + lazy val getRecordSize: Int = { + if (recordsToExclude.isEmpty) { + copybook.getRecordSize + } else { + val excludedSize = copybook.ast.children + .filter(r => recordsToExclude.contains(r.name.toUpperCase)) + .map(_.binaryProperties.actualSize) + .sum + copybook.getRecordSize - excludedSize + } + } def isRecordFixedSize: Boolean = copybook.isRecordFixedSize @@ -147,7 +158,8 @@ object CobolSchema { readerParameters.corruptFieldsPolicy, segIdFieldCount, segmentIdPrefix, - readerParameters.metadataPolicy + readerParameters.metadataPolicy, + readerParameters.recordsToExclude ) } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala index e0450c23..e3bf8162 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala @@ -77,7 +77,8 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes generateRecordBytes = readerParams.generateRecordBytes, generateCorruptFields = generateCorruptFields, generateCorruptFieldsAsHex = generateCorruptFieldsAsHex, - handler = recordHandler) + handler = recordHandler, + recordsToExclude = readerParams.recordsToExclude) Row.fromSeq(record) }) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala index 238cc88f..d1ba9d10 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala @@ -60,7 +60,8 @@ class CobolSchema(copybook: Copybook, corruptFieldsPolicy: CorruptFieldsPolicy, generateSegIdFieldsCnt: Int, segmentIdProvidedPrefix: String, - metadataPolicy: MetadataPolicy) + metadataPolicy: MetadataPolicy, + recordsToExclude: Set[String] = Set.empty) extends CobolReaderSchema(copybook, schemaRetentionPolicy, isDisplayAlwaysString, @@ -70,7 +71,9 @@ class CobolSchema(copybook: Copybook, generateRecordBytes, corruptFieldsPolicy, generateSegIdFieldsCnt, - segmentIdProvidedPrefix + segmentIdProvidedPrefix, + metadataPolicy, + recordsToExclude ) with Logging with Serializable { @throws(classOf[IllegalStateException]) @@ -83,7 +86,8 @@ class CobolSchema(copybook: Copybook, @throws(classOf[IllegalStateException]) private def createSparkSchema(): StructType = { val generateCorruptFields = corruptFieldsPolicy != CorruptFieldsPolicy.Disabled - val records = for (record <- copybook.getRootRecords) yield { + val records = for (record <- copybook.getRootRecords + if !recordsToExclude.contains(record.name.toUpperCase)) yield { val group = record.asInstanceOf[Group] val redefines = copybook.getAllSegmentRedefines parseGroup(group, redefines) @@ -332,7 +336,8 @@ object CobolSchema { schema.corruptSchemaPolicy, schema.generateSegIdFieldsCnt, schema.segmentIdPrefix, - schema.metadataPolicy + schema.metadataPolicy, + schema.recordsToExclude ) } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala index 17ac03ee..e6af7995 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala @@ -24,8 +24,10 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} import za.co.absa.cobrix.cobol.internal.Logging +import za.co.absa.cobrix.cobol.parser.CopybookParser +import za.co.absa.cobrix.cobol.parser.ast.Group import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._ -import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters} +import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters, VariableLengthParameters} import za.co.absa.cobrix.cobol.reader.schema.CobolSchema import za.co.absa.cobrix.spark.cobol.reader._ import za.co.absa.cobrix.spark.cobol.source.copybook.CopybookContentLoader @@ -156,20 +158,133 @@ object DefaultSource { * This method will probably be removed once the correct hierarchy for [[FixedLenReader]] is put in place. */ def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters, hasCompressedFiles: Boolean): Reader = { - val reader = if (cobolParameters.isText && cobolParameters.variableLengthParams.isEmpty) { - createTextReader(cobolParameters, spark) - } else if (cobolParameters.variableLengthParams.isEmpty && !hasCompressedFiles) { - createFixedLengthReader(cobolParameters, spark) + val resolvedParameters = resolveHeaderTrailerOffsets(cobolParameters, spark) + + val reader = if (resolvedParameters.isText && resolvedParameters.variableLengthParams.isEmpty) { + createTextReader(resolvedParameters, spark) + } else if (resolvedParameters.variableLengthParams.isEmpty && !hasCompressedFiles) { + createFixedLengthReader(resolvedParameters, spark) } else { - createVariableLengthReader(cobolParameters, spark) + createVariableLengthReader(resolvedParameters, spark) } - if (cobolParameters.debugLayoutPositions) + if (resolvedParameters.debugLayoutPositions) logger.info(s"Layout positions:\n${reader.getCobolSchema.copybook.generateRecordLayoutPositions()}") reader } + /** + * Resolves record_header_name / record_trailer_name options into file_start_offset / file_end_offset + * by parsing the copybook and looking up the named root-level records. + */ + private def resolveHeaderTrailerOffsets(parameters: CobolParameters, spark: SparkSession): CobolParameters = { + if (parameters.recordHeaderName.isEmpty && parameters.recordTrailerName.isEmpty) { + return parameters + } + + val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration) + val copybook = if (copybookContent.size == 1) + CopybookParser.parseTree(copybookContent.head) + else + za.co.absa.cobrix.cobol.parser.Copybook.merge(copybookContent.map(CopybookParser.parseTree(_))) + + val rootRecords = copybook.ast.children + + var fileStartOffset = 0 + var fileEndOffset = 0 + + parameters.recordHeaderName.foreach { headerName => + val transformedHeaderName = CopybookParser.transformIdentifier(headerName) + val headerRecord = rootRecords.find(_.name.equalsIgnoreCase(transformedHeaderName)) + .getOrElse(throw new IllegalArgumentException( + s"Record '$headerName' specified in '${PARAM_RECORD_HEADER_NAME}' is not found among the root-level (01) records of the copybook. " + + s"Available root records: ${rootRecords.map(_.name).mkString(", ")}" + )) + + headerRecord match { + case _: Group => // OK + case _ => throw new IllegalArgumentException( + s"Record '$headerName' specified in '${PARAM_RECORD_HEADER_NAME}' is a primitive field, not a group record. " + + "The copybook might be flat (no 01-level groups)." + ) + } + + fileStartOffset = headerRecord.binaryProperties.offset + headerRecord.binaryProperties.actualSize + } + + parameters.recordTrailerName.foreach { trailerName => + val transformedTrailerName = CopybookParser.transformIdentifier(trailerName) + val trailerRecord = rootRecords.find(_.name.equalsIgnoreCase(transformedTrailerName)) + .getOrElse(throw new IllegalArgumentException( + s"Record '$trailerName' specified in '${PARAM_RECORD_TRAILER_NAME}' is not found among the root-level (01) records of the copybook. " + + s"Available root records: ${rootRecords.map(_.name).mkString(", ")}" + )) + + trailerRecord match { + case _: Group => // OK + case _ => throw new IllegalArgumentException( + s"Record '$trailerName' specified in '${PARAM_RECORD_TRAILER_NAME}' is a primitive field, not a group record. " + + "The copybook might be flat (no 01-level groups)." + ) + } + + fileEndOffset = trailerRecord.binaryProperties.actualSize + } + + // Compute the data-only record length by subtracting excluded records from total + val excludedNames = (parameters.recordHeaderName.map(n => CopybookParser.transformIdentifier(n).toUpperCase).toSet ++ + parameters.recordTrailerName.map(n => CopybookParser.transformIdentifier(n).toUpperCase).toSet) + val totalSize = copybook.getRecordSize + val excludedSize = rootRecords + .filter(r => excludedNames.contains(r.name.toUpperCase)) + .map(_.binaryProperties.actualSize) + .sum + val dataRecordLength = totalSize - excludedSize + val updatedRecordLength = if (parameters.recordLength.isEmpty && dataRecordLength > 0 && dataRecordLength < totalSize) + Some(dataRecordLength) + else + parameters.recordLength + + val existingVarLen = parameters.variableLengthParams + val updatedVarLen = existingVarLen match { + case Some(vlp) => + Some(vlp.copy( + fileStartOffset = if (parameters.recordHeaderName.isDefined) fileStartOffset else vlp.fileStartOffset, + fileEndOffset = if (parameters.recordTrailerName.isDefined) fileEndOffset else vlp.fileEndOffset + )) + case None if fileStartOffset > 0 || fileEndOffset > 0 => + Some(VariableLengthParameters( + isRecordSequence = false, + bdw = None, + isRdwBigEndian = false, + isRdwPartRecLength = false, + rdwAdjustment = 0, + recordHeaderParser = None, + recordExtractor = None, + rhpAdditionalInfo = None, + reAdditionalInfo = "", + recordLengthField = "", + recordLengthMap = Map.empty, + fileStartOffset = fileStartOffset, + fileEndOffset = fileEndOffset, + generateRecordId = false, + isUsingIndex = false, + isIndexCachingAllowed = false, + inputSplitRecords = None, + inputSplitSizeMB = None, + inputSplitSizeCompressedMB = None, + improveLocality = false, + optimizeAllocation = false, + inputFileNameColumn = "", + occursMappings = parameters.occursMappings + )) + case other => other + } + + parameters.copy(variableLengthParams = updatedVarLen, recordLength = updatedRecordLength) + } + /** * Creates a Reader that knows how to consume text Cobol records. */ diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test13cCopybookHeaderTrailerSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test13cCopybookHeaderTrailerSpec.scala new file mode 100644 index 00000000..6eb7289c --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test13cCopybookHeaderTrailerSpec.scala @@ -0,0 +1,228 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture + +class Test13cCopybookHeaderTrailerSpec extends AnyFunSuite with SparkTestBase with BinaryFileFixture { + + // Copybook with header record (10 bytes), data record (10 bytes), and trailer record (8 bytes) + private val copybookContents = + """ 01 HEADER-REC. + | 05 HDR-TAG PIC X(4). + | 05 HDR-DATE PIC X(6). + | 01 DATA-REC. + | 05 NAME PIC X(6). + | 05 AMOUNT PIC 9(4). + | 01 TRAILER-REC. + | 05 TRL-TAG PIC X(4). + | 05 TRL-COUNT PIC 9(4). + |""".stripMargin + + // Header: "HDR 240101" (10 bytes) + // Data record 1: "ALICE 0100" (10 bytes) + // Data record 2: "BOB 0200" (10 bytes) + // Trailer: "TRL 0002" (8 bytes) + private val testData: Array[Byte] = Array[Byte]( + // Header record: HDR 240101 (10 bytes) + 0x48, 0x44, 0x52, 0x20, 0x32, 0x34, 0x30, 0x31, 0x30, 0x31, + // Data record 1: ALICE 0100 (10 bytes) + 0x41, 0x4C, 0x49, 0x43, 0x45, 0x20, 0x30, 0x31, 0x30, 0x30, + // Data record 2: BOB 0200 (10 bytes) + 0x42, 0x4F, 0x42, 0x20, 0x20, 0x20, 0x30, 0x32, 0x30, 0x30, + // Trailer record: TRL 0002 (8 bytes) + 0x54, 0x52, 0x4C, 0x20, 0x30, 0x30, 0x30, 0x32 + ) + + test("Test reading with record_header_name and record_trailer_name") { + withTempBinFile("test13c", ".dat", testData) { tempFile => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybookContents) + .option("encoding", "ascii") + .option("record_header_name", "HEADER-REC") + .option("record_trailer_name", "TRAILER-REC") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + + // Schema should only contain data record fields, not header/trailer fields + val fieldNames = df.schema.fieldNames + assert(fieldNames.contains("NAME"), s"Schema should contain NAME field but has: ${fieldNames.mkString(", ")}") + assert(fieldNames.contains("AMOUNT"), s"Schema should contain AMOUNT field but has: ${fieldNames.mkString(", ")}") + assert(!fieldNames.contains("HDR_TAG"), s"Schema should NOT contain HDR_TAG but has: ${fieldNames.mkString(", ")}") + assert(!fieldNames.contains("TRL_TAG"), s"Schema should NOT contain TRL_TAG but has: ${fieldNames.mkString(", ")}") + + // Data should contain 2 records + assert(df.count() == 2) + + val data = df.orderBy("NAME").collect() + assert(data(0).getString(0).trim == "ALICE") + assert(data(0).getInt(1) == 100) + assert(data(1).getString(0).trim == "BOB") + assert(data(1).getInt(1) == 200) + } + } + + test("Test that record_header_name produces same results as manual file_start_offset with data-only copybook") { + // Copybook with only the data record (no header/trailer definitions) + val dataOnlyCopybook = + """ 01 DATA-REC. + | 05 NAME PIC X(6). + | 05 AMOUNT PIC 9(4). + |""".stripMargin + + withTempBinFile("test13c", ".dat", testData) { tempFile => + val dfWithHeaderName = spark + .read + .format("cobol") + .option("copybook_contents", copybookContents) + .option("encoding", "ascii") + .option("record_header_name", "HEADER-REC") + .option("record_trailer_name", "TRAILER-REC") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + + val dfWithOffsets = spark + .read + .format("cobol") + .option("copybook_contents", dataOnlyCopybook) + .option("encoding", "ascii") + .option("file_start_offset", "10") + .option("file_end_offset", "8") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + + val resultHeaderName = dfWithHeaderName.orderBy("NAME").collect().map(r => (r.getString(0).trim, r.getInt(1))) + val resultOffsets = dfWithOffsets.orderBy("NAME").collect().map(r => (r.getString(0).trim, r.getInt(1))) + + assert(resultHeaderName.sameElements(resultOffsets), + s"Results should match. Header name: ${resultHeaderName.mkString(",")}. Offsets: ${resultOffsets.mkString(",")}") + } + } + + test("Test only record_header_name without trailer") { + // When only header is excluded, each data record includes DATA-REC (10 bytes) + TRAILER-REC (8 bytes) = 18 bytes + val testDataNoTrailer: Array[Byte] = Array[Byte]( + // Header record: HDR 240101 (10 bytes) + 0x48, 0x44, 0x52, 0x20, 0x32, 0x34, 0x30, 0x31, 0x30, 0x31, + // Data record 1: ALICE 0100 (10 bytes DATA-REC) + TRL 0001 (8 bytes TRAILER-REC) = 18 bytes + 0x41, 0x4C, 0x49, 0x43, 0x45, 0x20, 0x30, 0x31, 0x30, 0x30, + 0x54, 0x52, 0x4C, 0x20, 0x30, 0x30, 0x30, 0x31, + // Data record 2: BOB 0200 (10 bytes DATA-REC) + TRL 0002 (8 bytes TRAILER-REC) = 18 bytes + 0x42, 0x4F, 0x42, 0x20, 0x20, 0x20, 0x30, 0x32, 0x30, 0x30, + 0x54, 0x52, 0x4C, 0x20, 0x30, 0x30, 0x30, 0x32 + ) + + withTempBinFile("test13c", ".dat", testDataNoTrailer) { tempFile => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybookContents) + .option("encoding", "ascii") + .option("record_header_name", "HEADER-REC") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + + assert(df.count() == 2) + + // Schema should include both DATA-REC and TRAILER-REC fields (only header is excluded) + val fieldNames = df.schema.fieldNames + assert(fieldNames.contains("NAME")) + assert(fieldNames.contains("AMOUNT")) + assert(fieldNames.contains("TRL_TAG")) + assert(fieldNames.contains("TRL_COUNT")) + assert(!fieldNames.contains("HDR_TAG")) + } + } + + test("Test error when record_header_name is used with file_start_offset") { + withTempBinFile("test13c", ".dat", testData) { tempFile => + val ex = intercept[IllegalArgumentException] { + spark + .read + .format("cobol") + .option("copybook_contents", copybookContents) + .option("encoding", "ascii") + .option("record_header_name", "HEADER-REC") + .option("file_start_offset", "10") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + .count() + } + assert(ex.getMessage.contains("record_header_name")) + assert(ex.getMessage.contains("file_start_offset")) + } + } + + test("Test error when record_trailer_name is used with file_end_offset") { + withTempBinFile("test13c", ".dat", testData) { tempFile => + val ex = intercept[IllegalArgumentException] { + spark + .read + .format("cobol") + .option("copybook_contents", copybookContents) + .option("encoding", "ascii") + .option("record_trailer_name", "TRAILER-REC") + .option("file_end_offset", "8") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + .count() + } + assert(ex.getMessage.contains("record_trailer_name")) + assert(ex.getMessage.contains("file_end_offset")) + } + } + + test("Test error when record_header_name references a non-existent record") { + withTempBinFile("test13c", ".dat", testData) { tempFile => + val ex = intercept[IllegalArgumentException] { + spark + .read + .format("cobol") + .option("copybook_contents", copybookContents) + .option("encoding", "ascii") + .option("record_header_name", "NON-EXISTENT") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + .count() + } + assert(ex.getMessage.contains("NON-EXISTENT")) + assert(ex.getMessage.contains("not found")) + } + } + + test("Test error when record_header_name and record_trailer_name are the same") { + withTempBinFile("test13c", ".dat", testData) { tempFile => + val ex = intercept[IllegalArgumentException] { + spark + .read + .format("cobol") + .option("copybook_contents", copybookContents) + .option("encoding", "ascii") + .option("record_header_name", "HEADER-REC") + .option("record_trailer_name", "HEADER-REC") + .option("schema_retention_policy", "collapse_root") + .load(tempFile) + .count() + } + assert(ex.getMessage.contains("cannot refer to the same record")) + } + } +}