Google Cloud Dataflow fails in combine function due to worker losing contact

2024/10/13 8:20:06

My Dataflow pipeline consistently fails in my combine function, with no errors reported in the logs beyond a single entry of:

 A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service.

I am using the Apache Beam Python SDK 2.4.0. I have tried performing this step with both CombinePerKey and CombineGlobally, and the pipeline fails in the combine function in both cases. The pipeline completes when run on a smaller amount of data.
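Roughly, the two attempts were wired up like this (simplified; keyed_rows and values stand in for my actual collections):

import apache_beam as beam

# Per key: one output matrix per key; accumulators live per key per in-flight bundle.
matrices = keyed_rows | "Build matrices" >> beam.CombinePerKey(MatrixCombiner())

# Globally: accumulators are merged across the whole collection into one matrix.
matrix = values | "Build matrix" >> beam.CombineGlobally(MatrixCombiner())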

Am I exhausting worker resources and not being told about it? What can cause a worker to lose contact with the service?

Update:

Using n1-highmem-4 workers gives me the same failure. When I check Stackdriver I see no errors, but three kinds of warnings: "No session file found", "Refusing to split", and "Processing lull". My input collection is ~17,000 elements spread across ~60 MB, but Stackdriver reports ~25 GB in use on a single worker, which is approaching the maximum. For this input, each accumulator created in my CombineFn should take roughly 150 MB of memory. Is my pipeline creating too many accumulators and exhausting its memory? If so, how can I tell it to merge accumulators more often or limit the number created?
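For reference, my combiner is shaped like the sketch below (simplified; the numpy matrix and its dimensions are stand-ins sized to match the ~150 MB figure, and the row fields are assumed). The point is that create_accumulator allocates the full matrix up front, so resident memory scales with the number of live accumulators rather than with input size:

import apache_beam as beam
import numpy as np

class MatrixCombiner(beam.CombineFn):
    def __init__(self, dim=4300):
        # A 4300x4300 float64 matrix is ~148 MB, so every live
        # accumulator costs roughly that much worker memory.
        self.dim = dim

    def create_accumulator(self):
        return np.zeros((self.dim, self.dim))  # the expensive allocation

    def add_input(self, accumulator, row):
        # Assumed row shape: dict carrying matrix coordinates and a value.
        accumulator[row['i'], row['j']] += row['value']
        return accumulator

    def merge_accumulators(self, accumulators):
        accumulators = iter(accumulators)
        result = next(accumulators)
        for other in accumulators:
            result += other  # merge in place to avoid extra copies
        return result

    def extract_output(self, accumulator):
        return accumulator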

I do have an error log entry verifying that my worker was killed due to OOM; it just isn't tagged as a worker error, which is the default filtering for the Dataflow monitor.

The pipeline definition looks something like:

table1 = (p | "Read Table1" >> beam.io.Read(beam.io.BigQuerySource(query=query))| "Key rows" >> beam.Map(lambda row: (row['key'], row)))
table2 = (p | "Read Table2" >> beam.io.Read(beam.io.BigQuerySource(query=query))| "Key rows" >> beam.Map(lambda row: (row['key'], row)))merged = ({"table1": table1, "table2": table2}| "Join" >> beam.CoGroupByKey()| "Reshape" >> beam.ParDo(ReshapeData())| "Key rows" >> beam.Map(lambda row: (row['key'], row)))| "Build matrix" >> beam.CombinePerKey(MatrixCombiner())  # Dies here| "Write matrix" >> beam.io.avroio.WriteToAvro())
Answer

Running with fewer workers leads to fewer accumulators, and the pipeline completes successfully.
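In the Python SDK that looks something like the following (project, bucket, and worker counts are placeholders; the worker flags are the relevant part):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',               # placeholder
    '--temp_location=gs://my-bucket/tmp', # placeholder
    '--num_workers=2',                    # start with a small pool
    '--max_num_workers=2',                # and never scale past it
    '--autoscaling_algorithm=NONE',       # pin the worker count
    '--worker_machine_type=n1-highmem-4',
])

with beam.Pipeline(options=options) as p:
    pass  # build the pipeline from the question here

Fewer workers means fewer bundles in flight at once, and therefore fewer live accumulators competing for memory.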


Related Q&A

AttributeError: super object has no attribute __getattr__

Ive been searching for the solution of this problem over the all internet but I still cant find the right solution. There are lots of generic answers but none of those have solved my problem..I am tryi…

Selenium load time errors - looking for possible workaround

I am trying to data scrape from a certain website. I am using Selenium so that I can log myself in, and then start parsing through data. I have 3 main errors:Last page # not loading properly. here I am…

How to POST ndb.StructuredProperty?

Problem:I have following EndpointsModels,class Role(EndpointsModel):label = ndb.StringProperty()level = ndb.IntegerProperty()class Application(EndpointsModel):created = ndb.DateTimeProperty(auto_now_ad…

Issue computing difference between two csv files

Im trying to obtain the difference between two csv files A.csv and B.csv in order to obtain new rows added in the second file. A.csv has the following data.acct ABC 88888888 99999999 ABC-G…

How do I display an extremly long image in Tkinter? (how to get around canvas max limit)

Ive tried multiple ways of displaying large images with tkinterreally long image No matter what Ive tried, there doesnt seem to be any code that works. The main issue is that Canvas has a maximum heigh…

NoneType has no attribute IF-ELSE solution

Im parsing an HTML file and searching for status of order in it. Sometimes, status doesnt exist, so BeautifulSoup returns NoneType, when Im using it. To solve this problem I use if-else statement, but …

looking for an inverted heap in python

Id like to comb off the n largest extremes from a timeseries. heapq works perfectly for the nlargestdef nlargest(series, n):count = 0heap = []for e in series:if count < n:count+=1hp.heappush(heap, e…

Concatenating Multiple DataFrames with Non-Standard Columns

Is there a good way to concatenate a list of DataFrames where the columns are not regular between DataFrames? The desired outcome is to match up all columns that are a match but to keep the ones that …

Python Conditionally Add Class to td Tags in HTML Table

I have some data in the form of a csv file that Im reading into Python and converting to an HTML table using Pandas.Heres some example data:name threshold col1 col2 col3 A 10 12 9 13…

Sort a dictionary of dictionaries python

I have a dictionary of dictionaries like the followingd = {hain: {facet: 1, wrapp: 1, chinoiserie: 1}, library: {sconc: 1, floor: 1, wall: 2, lamp: 6, desk: 1, table: 1, maine: 1} }So, I want to rever…