[yaml] - mongodb write normalization#38376
Conversation
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request implements a MongoDB write schema transform, enabling MongoDB writes in Beam YAML and providing a cross-language implementation. Key additions include the MongoDbWriteSchemaTransformProvider in Java, a Python wrapper in yaml_io.py, and integration test fixtures using Testcontainers. Review feedback identifies critical issues regarding shallow data conversion in both Java and Python implementations, which will cause failures for nested schemas. Additionally, a bug was found in the Java error handling logic where the error schema is unnecessarily nested, potentially causing schema mismatches.
I am having trouble creating individual review comments. Click here to see my feedback.
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java (105)
The error output PCollection is being assigned a nested error schema. The variable errorSchema (defined on line 81) already represents the schema for error records (containing the failed row and the error message). Calling ErrorHandling.errorSchema(errorSchema) again wraps this schema unnecessarily, which will lead to a schema mismatch or an unexpectedly nested structure in the error output.
outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java (127-130)
The conversion from Beam Row to BSON Document is shallow. If the input Row contains nested Row objects or Iterable fields (arrays), row.getValue() will return Beam-specific types that the MongoDB Java driver cannot serialize. This will result in a CodecConfigurationException at runtime. A recursive conversion is required to transform nested Beam Rows into BSON Documents and Iterables into BSON Lists.
sdks/python/apache_beam/yaml/yaml_io.py (744-753)
The row_to_dict function performs a shallow conversion. If the input PCollection contains rows with nested structures (nested rows or lists of rows), _asdict() or dict(row) will not recursively convert those nested Beam Row objects into dictionaries. This will likely cause serialization errors when the data is passed to the underlying mongodbio.WriteToMongoDB transform, as pymongo expects standard Python types. Additionally, avoid using a bare except: block; specify the exception type (e.g., except Exception:) to avoid catching system-exiting exceptions like SystemExit or KeyboardInterrupt.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.