How to read a BigQuery table using Python pipeline code in GCP Dataflow

2024/10/15 3:12:56

Could someone please share the syntax to read from and write to a BigQuery table in a pipeline written in Python for GCP Dataflow?

Answer

Run on Dataflow

First, construct a Pipeline with the following options so that it runs on GCP Dataflow:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Replace each <...> placeholder with your own value.
options = {
    'project': <project>,
    'runner': 'DataflowRunner',
    'region': <region>,
    'setup_file': <setup.py file>,
}
pipeline_options = PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options=pipeline_options)
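
Dataflow jobs usually also need Cloud Storage locations for staging and temporary files. The sketch below is one way to collect the options, assuming a bucket you already own. The helper names, the job name, and the bucket paths are hypothetical and not part of the original answer.

```python
def dataflow_options(project, region, bucket, job_name="bq-example"):
    """Return keyword options for a Dataflow run (all values are placeholders)."""
    return {
        "project": project,
        "region": region,
        "runner": "DataflowRunner",
        "job_name": job_name,
        # Cloud Storage paths used for temporary data and staged files.
        "temp_location": f"gs://{bucket}/temp",
        "staging_location": f"gs://{bucket}/staging",
    }


def build_pipeline(options):
    """Create a Beam pipeline from the options dict above."""
    # Imported here so dataflow_options() can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    return beam.Pipeline(options=PipelineOptions(flags=[], **options))
```

Calling build_pipeline(dataflow_options("my-project", "us-central1", "my-bucket")) would then give a pipeline configured for Dataflow, provided apache-beam[gcp] is installed and the bucket exists.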

Read from BigQuery

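Define a BigQuerySource with your query and use beam.io.Read to read data from BigQuery. Note that recent Beam releases deprecate BigQuerySource in favor of beam.io.ReadFromBigQuery: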

BQ_source = beam.io.BigQuerySource(query = <query>)
BQ_data = pipeline | beam.io.Read(BQ_source)
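
With a newer Beam SDK, the same read can be sketched with beam.io.ReadFromBigQuery, which by default yields one dict per row keyed by column name. The function names and the column names (name, total) are hypothetical, and the query is assumed to use standard SQL. A query-based read also typically needs a temp_location in the pipeline options.

```python
def read_rows(pipeline, query):
    """Attach a BigQuery read to `pipeline`; each element is a dict per row."""
    import apache_beam as beam  # imported lazily; requires apache-beam[gcp]

    return (
        pipeline
        | "ReadFromBQ" >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
    )


def format_row(row):
    """Turn one row dict into a printable line (column names are hypothetical)."""
    return f"{row['name']}: {row['total']}"
```

A downstream step such as read_rows(pipeline, query) | beam.Map(format_row) would then operate on plain Python dicts.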

Write to BigQuery

There are two options for writing to BigQuery:

  • use a BigQuerySink and beam.io.Write (deprecated in recent Beam releases in favor of WriteToBigQuery):

    BQ_sink = beam.io.BigQuerySink(<table>, dataset=<dataset>, project=<project>)
    BQ_data | beam.io.Write(BQ_sink)
    
  • use beam.io.WriteToBigQuery:

    BQ_data | beam.io.WriteToBigQuery(<table>, dataset=<dataset>, project=<project>)
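
When the destination table may not exist yet, WriteToBigQuery also accepts a schema and create/write dispositions. The following is a minimal sketch under that assumption. The helper names and the field list are hypothetical, and the input is assumed to be a PCollection of dicts whose keys match the schema.

```python
def schema_string(fields):
    """Build a 'name:TYPE,name:TYPE' schema string from (name, type) pairs."""
    return ",".join(f"{name}:{bq_type}" for name, bq_type in fields)


def write_rows(rows, table, dataset, project, fields):
    """Write a PCollection of dicts to BigQuery, creating the table if needed."""
    import apache_beam as beam  # imported lazily; requires apache-beam[gcp]

    return rows | "WriteToBQ" >> beam.io.WriteToBigQuery(
        table,
        dataset=dataset,
        project=project,
        schema=schema_string(fields),
        # Create the table when it is missing, and append to existing rows.
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
```

Nothing is read or written until the pipeline itself is executed, for example with pipeline.run().wait_until_finish().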
    
