Writing nested schema to BigQuery from Dataflow (Python)


I have a Dataflow job that writes to BigQuery. It works well for a non-nested schema, but fails for a nested schema.

Here is my Dataflow pipeline:

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
schema = 'url: STRING,' \
         'ua: STRING,' \
         'method: STRING,' \
         'man: RECORD,' \
         'man.ip: RECORD,' \
         'man.ip.cc: STRING,' \
         'man.ip.city: STRING,' \
         'man.ip.as: INTEGER,' \
         'man.ip.country: STRING,' \
         'man.res: RECORD,' \
         'man.res.ip_dom: STRING'
first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
          | 'process' >> beam.ParDo(processFunction())
          | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema))

I created the BigQuery table using the following schema:

[{"mode": "NULLABLE","name": "url","type": "STRING"},{"mode": "NULLABLE","name": "ua","type": "STRING"},{"mode": "NULLABLE","name": "method","type": "STRING"},{"mode": "REPEATED","name": "man","type": "RECORD","fields":[{"mode": "REPEATED","name": "ip","type": "RECORD","fields":[{"mode": "NULLABLE","name": "cc","type": "STRING"},{"mode": "NULLABLE","name": "city","type": "STRING"},{"mode": "NULLABLE","name": "as","type": "INTEGER"},{"mode": "NULLABLE","name": "country","type": "STRING"}]},{"mode": "REPEATED","name": "res","type": "RECORD","fields":[{"mode": "NULLABLE","name": "ip_dom","type": "STRING"}]}]}
]

I am getting the following error:

BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
Message: Invalid value for: url is not a valid value
HTTP Code: 400

Question: Can someone please guide me? What am I doing wrong? Also, if there is a better way to handle nested schemas when writing to BigQuery, please suggest it.

Additional info: My data file:

{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}
Answer

The problem with your code is that you try to use nested fields while specifying the BigQuery table schema as a string, which is not supported. In order to push nested records into BigQuery from Apache Beam, you need to create a TableSchema object, e.g. using the built-in parser:

from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)
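For example, here is a sketch that builds the parser input inline from the schema in the question. Note that parse_table_schema_from_json expects a JSON object with a top-level "fields" key, so the bare list from the question is wrapped accordingly:

import json

from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

# Wrap the question's field list in an object with a "fields" key,
# which is the shape parse_table_schema_from_json expects.
schema_json = json.dumps({
    "fields": [
        {"name": "url", "type": "STRING", "mode": "NULLABLE"},
        {"name": "ua", "type": "STRING", "mode": "NULLABLE"},
        {"name": "method", "type": "STRING", "mode": "NULLABLE"},
        {"name": "man", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "ip", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "cc", "type": "STRING", "mode": "NULLABLE"},
                {"name": "city", "type": "STRING", "mode": "NULLABLE"},
                {"name": "as", "type": "INTEGER", "mode": "NULLABLE"},
                {"name": "country", "type": "STRING", "mode": "NULLABLE"},
            ]},
            {"name": "res", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "ip_dom", "type": "STRING", "mode": "NULLABLE"},
            ]},
        ]},
    ]
})

table_schema = parse_table_schema_from_json(schema_json)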

You need to pass the schema as a JSON string there. Rather than writing it by hand, you can obtain it with the following command in your terminal (assuming you have the gcloud tools installed):

bq --project_id=your-gcp-project-name --format=json show your_dataset.your_table > schema.json

and in Python use it as follows (the bq output describes the whole table, so only its "schema" key is passed to the parser):

import json

table_schema = parse_table_schema_from_json(
    json.dumps(json.load(open("schema.json"))["schema"]))

Then in your pipeline:

beam.io.WriteToBigQuery('myBucket:tableFolder.test_table',
                        schema=table_schema)
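Putting the pieces together, a minimal end-to-end sketch could look like the following. The input path is hypothetical, and to_table_row stands in for the processFunction from the question; since man and its nested ip/res records are REPEATED in the table schema, the single objects from the input lines are wrapped in lists:

import json

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
from apache_beam.options.pipeline_options import PipelineOptions


def to_table_row(line):
    # Parse one newline-delimited JSON record into a BigQuery row dict.
    # man and its nested ip/res records are REPEATED in the table schema,
    # so the single objects from the input are wrapped in lists.
    record = json.loads(line)
    man = record.get('man', {})
    record['man'] = [{'ip': [man.get('ip', {})],
                      'res': [man.get('res', {})]}]
    return record


with open('schema.json') as f:
    table_schema = parse_table_schema_from_json(
        json.dumps(json.load(f)['schema']))

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (p
     | 'read' >> ReadFromText('gs://my-bucket/input.json')  # hypothetical path
     | 'process' >> beam.Map(to_table_row)
     | 'write' >> beam.io.WriteToBigQuery('myBucket:tableFolder.test_table',
                                          schema=table_schema))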

You can also take a look at the example showing manual creation of a TableSchema object: https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py

which is (from the linked example):

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)

# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)

table_schema.fields.append(phone_number_schema)

Then just use the table_schema variable in beam.io.WriteToBigQuery.
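Applied to the schema from the question, the manual approach would look roughly like this sketch (only the url field and the nested man.ip branch are shown; the remaining fields follow the same pattern):

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

# Top-level scalar field.
url_schema = bigquery.TableFieldSchema()
url_schema.name = 'url'
url_schema.type = 'string'
url_schema.mode = 'nullable'
table_schema.fields.append(url_schema)

# Nested repeated record: man, containing the repeated ip record.
man_schema = bigquery.TableFieldSchema()
man_schema.name = 'man'
man_schema.type = 'record'
man_schema.mode = 'repeated'

ip_schema = bigquery.TableFieldSchema()
ip_schema.name = 'ip'
ip_schema.type = 'record'
ip_schema.mode = 'repeated'
for leaf_name, leaf_type in [('cc', 'string'), ('city', 'string'),
                             ('as', 'integer'), ('country', 'string')]:
    leaf = bigquery.TableFieldSchema()
    leaf.name = leaf_name
    leaf.type = leaf_type
    leaf.mode = 'nullable'
    ip_schema.fields.append(leaf)

man_schema.fields.append(ip_schema)
table_schema.fields.append(man_schema)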
