Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ output
# syntax: regexp
# ^\.pc/

.vscode/
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,8 @@ The output looks like this:
| .option("enable_index_cache", "true") | When true (default), calculated indexes are cached in memory for later use. This improves performance of processing when same files are processed more than once. |
| .option("input_split_records", 50000) | Specifies how many records will be allocated to each split/partition. It will be processed by Spark tasks. (The default is not set and the split will happen according to size, see the next option) |
| .option("input_split_size_mb", 100) | Specify how many megabytes to allocate to each partition/split. (The default is 100 MB) |

| .option("record_header_name", "HEADER") | Assuming a copybook definition represents a header, offsets the file read start by the number of bytes in that header and excludes the record definition from the output schema. |
| .option("record_trailer_name", "TRAILER") | Assuming a copybook definition represents a trailer, offsets the file read end by the number of bytes in that trailer and excludes the record definition from the output schema. |
##### Helper fields generation options
Comment on lines +1618 to 1619
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add a blank line after the multisegment options table.

Line 1618 is immediately followed by a heading, which triggers markdownlint MD058 (tables should be surrounded by blank lines).

✅ Minimal markdown fix
 | .option("record_trailer_name", "TRAILER")                                             | Assuming a copybook definition represents a trailer, offsets the file read end by the number of bytes in that trailer and excludes the record definition from the output schema.                                                                                                                                                                                                                                                                                            |
+
 ##### Helper fields generation options    
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
| .option("record_trailer_name", "TRAILER") | Assuming a copybook definition represents a trailer, offsets the file read end by the number of bytes in that trailer and excludes the record definition from the output schema. |
##### Helper fields generation options
| .option("record_trailer_name", "TRAILER") | Assuming a copybook definition represents a trailer, offsets the file read end by the number of bytes in that trailer and excludes the record definition from the output schema. |

##### Helper fields generation options
🧰 Tools
🪛 markdownlint-cli2 (0.21.0)

[warning] 1618-1618: Tables should be surrounded by blank lines

(MD058, blanks-around-tables)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 1618 - 1619, The README has a Markdown lint issue
MD058 because the table ending with the `.option("record_trailer_name",
"TRAILER")` row is immediately followed by the heading `##### Helper fields
generation options`; insert a single blank line between the table and that
heading so the table is surrounded by blank lines (i.e., add one empty line
after the table row containing `.option("record_trailer_name", "TRAILER")` and
before the `##### Helper fields generation options` heading).


| Option (usage example) | Description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ object RecordExtractors {
activeSegmentRedefine: String = "",
generateInputFileField: Boolean = false,
inputFileName: String = "",
handler: RecordHandler[T]
handler: RecordHandler[T],
recordsToExclude: Set[String] = Set.empty
): Seq[Any] = {
val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]]
val corruptFields = new ArrayBuffer[CorruptField]
Expand Down Expand Up @@ -202,7 +203,7 @@ object RecordExtractors {

val records: ListBuffer[T] = ListBuffer.empty[T]

for (record <- rootRecords) yield {
for (record <- rootRecords if !recordsToExclude.contains(record.name.toUpperCase)) yield {
val (size, values) = getGroupValues(nextOffset, record.asInstanceOf[Group])
if (!record.isRedefined) {
nextOffset += size
Expand Down Expand Up @@ -259,7 +260,8 @@ object RecordExtractors {
recordId: Long = 0,
generateInputFileField: Boolean = false,
inputFileName: String = "",
handler: RecordHandler[T]
handler: RecordHandler[T],
recordsToExclude: Set[String] = Set.empty
): Seq[Any] = {
val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive])

Expand Down Expand Up @@ -422,7 +424,7 @@ object RecordExtractors {

val records: ListBuffer[T] = ListBuffer.empty[T]

rootRecords.collect { case grp: Group if grp.parentSegment.isEmpty =>
rootRecords.collect { case grp: Group if grp.parentSegment.isEmpty && !recordsToExclude.contains(grp.name.toUpperCase) =>
val (size, values) = getGroupValues(nextOffset, grp, segmentsData(0)._2, 0, segmentsData(0)._1 :: Nil)
nextOffset += size
records += values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ class FixedLenNestedRowIterator[T: ClassTag](
generateCorruptFields = generateCorruptFields,
generateCorruptFieldsAsHex = generateCorruptFieldsAsHex,
activeSegmentRedefine = activeSegmentRedefine,
handler = handler
handler = handler,
recordsToExclude = readerProperties.recordsToExclude
)

// Advance byte index to the next record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ final class VarLenHierarchicalIterator[T: ClassTag](cobolSchema: Copybook,
recordIndex,
generateInputFileName,
dataStream.inputFileName,
handler
handler,
readerProperties.recordsToExclude
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook,
activeSegmentRedefine = segmentRedefine,
generateInputFileName,
dataStream.inputFileName,
handler
handler,
readerProperties.recordsToExclude
))

recordFetched = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,7 @@ case class CobolParameters(
debugLayoutPositions: Boolean,
enableSelfChecks: Boolean,
metadataPolicy: MetadataPolicy,
recordHeaderName: Option[String] = None,
recordTrailerName: Option[String] = None,
options: Map[String, String]
)
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ object CobolParametersParser extends Logging {
val PARAM_RECORD_END_OFFSET = "record_end_offset"
val PARAM_FILE_START_OFFSET = "file_start_offset"
val PARAM_FILE_END_OFFSET = "file_end_offset"
val PARAM_RECORD_HEADER_NAME = "record_header_name"
val PARAM_RECORD_TRAILER_NAME = "record_trailer_name"
val PARAM_IS_XCOM = "is_xcom"
val PARAM_IS_TEXT = "is_text"

Expand Down Expand Up @@ -310,6 +312,8 @@ object CobolParametersParser extends Logging {
params.getOrElse(PARAM_DEBUG_LAYOUT_POSITIONS, "false").toBoolean,
params.getOrElse(PARAM_ENABLE_SELF_CHECKS, "false").toBoolean,
MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic")),
params.get(PARAM_RECORD_HEADER_NAME).map(_.trim).filter(_.nonEmpty),
params.get(PARAM_RECORD_TRAILER_NAME).map(_.trim).filter(_.nonEmpty),
params.getMap
)
validateSparkCobolOptions(params, recordFormat, validateRedundantOptions)
Expand Down Expand Up @@ -412,6 +416,9 @@ object CobolParametersParser extends Logging {
CorruptFieldsPolicy.Disabled
}

val recordsToExclude = (parameters.recordHeaderName.map(n => CopybookParser.transformIdentifier(n).toUpperCase).toSet ++
parameters.recordTrailerName.map(n => CopybookParser.transformIdentifier(n).toUpperCase).toSet)

ReaderParameters(
recordFormat = parameters.recordFormat,
isEbcdic = parameters.isEbcdic,
Expand Down Expand Up @@ -470,6 +477,7 @@ object CobolParametersParser extends Logging {
varLenParams.reAdditionalInfo,
varLenParams.inputFileNameColumn,
parameters.metadataPolicy,
recordsToExclude,
parameters.options
)
}
Expand Down Expand Up @@ -982,6 +990,22 @@ object CobolParametersParser extends Logging {
params.contains(PARAM_ENABLE_INDEX_CACHE) && params(PARAM_ENABLE_INDEX_CACHE).toBoolean)
throw new IllegalArgumentException(s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")

if (params.contains(PARAM_RECORD_HEADER_NAME) && params.contains(PARAM_FILE_START_OFFSET)) {
throw new IllegalArgumentException(s"Options '$PARAM_RECORD_HEADER_NAME' and '$PARAM_FILE_START_OFFSET' cannot be used together.")
}

if (params.contains(PARAM_RECORD_TRAILER_NAME) && params.contains(PARAM_FILE_END_OFFSET)) {
throw new IllegalArgumentException(s"Options '$PARAM_RECORD_TRAILER_NAME' and '$PARAM_FILE_END_OFFSET' cannot be used together.")
}

if (params.contains(PARAM_RECORD_HEADER_NAME) && params.contains(PARAM_RECORD_TRAILER_NAME)) {
val headerName = params(PARAM_RECORD_HEADER_NAME).trim
val trailerName = params(PARAM_RECORD_TRAILER_NAME).trim
if (headerName.equalsIgnoreCase(trailerName)) {
throw new IllegalArgumentException(s"Options '$PARAM_RECORD_HEADER_NAME' and '$PARAM_RECORD_TRAILER_NAME' cannot refer to the same record ('$headerName').")
}
}

if (validateRedundantOptions && unusedKeys.nonEmpty) {
val unusedKeyStr = unusedKeys.mkString(",")
val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,6 @@ case class ReaderParameters(
reAdditionalInfo: String = "",
inputFileNameColumn: String = "",
metadataPolicy: MetadataPolicy = MetadataPolicy.Basic,
recordsToExclude: Set[String] = Set.empty,
options: Map[String, String] = Map.empty
)
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,24 @@ class CobolSchema(val copybook: Copybook,
val corruptSchemaPolicy: CorruptFieldsPolicy,
val generateSegIdFieldsCnt: Int = 0,
val segmentIdProvidedPrefix: String = "",
val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable {
val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic,
val recordsToExclude: Set[String] = Set.empty) extends Serializable {

val segmentIdPrefix: String = if (segmentIdProvidedPrefix.isEmpty) getDefaultSegmentIdPrefix else segmentIdProvidedPrefix

def getCobolSchema: Copybook = copybook

lazy val getRecordSize: Int = copybook.getRecordSize
lazy val getRecordSize: Int = {
if (recordsToExclude.isEmpty) {
copybook.getRecordSize
} else {
val excludedSize = copybook.ast.children
.filter(r => recordsToExclude.contains(r.name.toUpperCase))
.map(_.binaryProperties.actualSize)
.sum
copybook.getRecordSize - excludedSize
}
}

def isRecordFixedSize: Boolean = copybook.isRecordFixedSize

Expand Down Expand Up @@ -147,7 +158,8 @@ object CobolSchema {
readerParameters.corruptFieldsPolicy,
segIdFieldCount,
segmentIdPrefix,
readerParameters.metadataPolicy
readerParameters.metadataPolicy,
readerParameters.recordsToExclude
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes
generateRecordBytes = readerParams.generateRecordBytes,
generateCorruptFields = generateCorruptFields,
generateCorruptFieldsAsHex = generateCorruptFieldsAsHex,
handler = recordHandler)
handler = recordHandler,
recordsToExclude = readerParams.recordsToExclude)
Row.fromSeq(record)
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class CobolSchema(copybook: Copybook,
corruptFieldsPolicy: CorruptFieldsPolicy,
generateSegIdFieldsCnt: Int,
segmentIdProvidedPrefix: String,
metadataPolicy: MetadataPolicy)
metadataPolicy: MetadataPolicy,
recordsToExclude: Set[String] = Set.empty)
extends CobolReaderSchema(copybook,
schemaRetentionPolicy,
isDisplayAlwaysString,
Expand All @@ -70,7 +71,9 @@ class CobolSchema(copybook: Copybook,
generateRecordBytes,
corruptFieldsPolicy,
generateSegIdFieldsCnt,
segmentIdProvidedPrefix
segmentIdProvidedPrefix,
metadataPolicy,
recordsToExclude
) with Logging with Serializable {

@throws(classOf[IllegalStateException])
Expand All @@ -83,7 +86,8 @@ class CobolSchema(copybook: Copybook,
@throws(classOf[IllegalStateException])
private def createSparkSchema(): StructType = {
val generateCorruptFields = corruptFieldsPolicy != CorruptFieldsPolicy.Disabled
val records = for (record <- copybook.getRootRecords) yield {
val records = for (record <- copybook.getRootRecords
if !recordsToExclude.contains(record.name.toUpperCase)) yield {
val group = record.asInstanceOf[Group]
val redefines = copybook.getAllSegmentRedefines
parseGroup(group, redefines)
Expand Down Expand Up @@ -332,7 +336,8 @@ object CobolSchema {
schema.corruptSchemaPolicy,
schema.generateSegIdFieldsCnt,
schema.segmentIdPrefix,
schema.metadataPolicy
schema.metadataPolicy,
schema.recordsToExclude
)
}

Expand Down
Loading
Loading