AWS Batch Job Execution Results in Step Function

2024/4/14 3:53:44

I'm new to AWS Step Functions and AWS Batch, and I'm trying to integrate an AWS Batch job with a Step Functions state machine. The Batch job runs a simple Python script that outputs a string value (a simplified version of the actual requirement). I need the script's output to be available to the next state of the state machine. How can I accomplish this? The AWS Batch job output does not contain the script's result; instead, it contains container-related information along with the input values.

Example: the AWS Batch job runs a Python script that outputs "Hello World". I need "Hello World" to be available to the next state of the state machine, which invokes a Lambda function.


I was able to do it. Below is my state machine. I took the sample project for running a Batch job, "Manage a Batch Job (AWS Batch, Amazon SNS)", and modified it to add two Lambda functions that pass input and output between states.

{
  "Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion",
  "StartAt": "Submit Batch Job",
  "TimeoutSeconds": 3600,
  "States": {
    "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobName": "BatchJobNotification",
        "JobQueue": "arn:aws:batch:us-east-1:1234567890:job-queue/BatchJobQueue-737ed10e7ca3bfd",
        "JobDefinition": "arn:aws:batch:us-east-1:1234567890:job-definition/BatchJobDefinition-89c42b1f452ac67:1"
      },
      "Next": "Notify Success",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "Notify Failure"
        }
      ]
    },
    "Notify Success": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:readcloudwatchlogs",
      "Parameters": {
        "LogStreamName.$": "$.Container.LogStreamName"
      },
      "ResultPath": "$.lambdaOutput",
      "Next": "ConsumeLogs"
    },
    "ConsumeLogs": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:consumelogs",
      "Parameters": {
        "randomstring.$": "$.lambdaOutput.logs"
      },
      "End": true
    },
    "Notify Failure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": "Batch job submitted through Step Functions failed",
        "TopicArn": "arn:aws:sns:us-east-1:1234567890:StepFunctionsSample-BatchJobManagement17968f39-e227-47ab-9a75-08a7dcc10c4c-SNSTopic-1GR29R8TUHQY8"
      },
      "End": true
    }
  }
}

The key to reading the logs is in the Submit Batch Job output, which contains LogStreamName. I passed that value to my Lambda function named readcloudwatchlogs, which reads the logs and returns them. The state machine then passes the returned logs to the next function, named consumelogs. The attached screenshot shows the consumelogs function printing the logs.

{
  "Attempts": [
    {
      "Container": {
        "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
        "ExitCode": 0,
        "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
        "NetworkInterfaces": [],
        "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b"
      },
      "StartedAt": 1611329367577,
      "StatusReason": "Essential container in task exited",
      "StoppedAt": 1611329367748
    }
  ],
  "Container": {
    "Command": ["echo", "Hello world"],
    "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
    "Environment": [
      {
        "Name": "MANAGED_BY_AWS",
        "Value": "STARTED_BY_STEP_FUNCTIONS"
      }
    ],
    "ExitCode": 0,
    "Image": "",
    "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
    "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b",
  ..
  "Tags": {
    "resourceArn": "arn:aws:batch:us-east-1:1234567890:job/d36ba07a-54f9-4acf-a4b8-3e5413ea5ffc"
  }
}
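
To make the data flow between the states easier to follow, here is a rough Python simulation of what the Parameters and ResultPath fields in the state machine do with this output. It is only a sketch: the helpers select and apply_result_path are hypothetical names that handle simple dotted paths, not the real JSONPath engine Step Functions uses, and the "Hello world" Lambda result is an assumed value.

```python
import copy


def select(document, path):
    """Resolve a simple dotted path such as '$.Container.LogStreamName'."""
    node = document
    for key in path.lstrip("$").strip(".").split("."):
        if key:
            node = node[key]
    return node


def apply_result_path(state_input, result, result_path):
    """Mimic ResultPath for a single top-level key such as '$.lambdaOutput'."""
    merged = copy.deepcopy(state_input)
    merged[result_path[2:]] = result  # '$.lambdaOutput' -> 'lambdaOutput'
    return merged


# Trimmed-down version of the Submit Batch Job output shown above.
batch_output = {
    "Container": {
        "ExitCode": 0,
        "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
    }
}

# "Notify Success": Parameters builds the payload sent to readcloudwatchlogs.
read_logs_payload = {
    "LogStreamName": select(batch_output, "$.Container.LogStreamName")
}

# Assumed return value of readcloudwatchlogs for this illustration.
lambda_result = {"logs": "Hello world"}

# ResultPath keeps the original state input and adds the result under lambdaOutput.
consume_state_input = apply_result_path(batch_output, lambda_result, "$.lambdaOutput")

# "ConsumeLogs": Parameters picks the log text out of the merged input.
consume_payload = {"randomstring": select(consume_state_input, "$.lambdaOutput.logs")}
print(consume_payload)
```

Because ResultPath merges the Lambda result into the state input instead of replacing it, the original Batch output is still available to later states alongside lambdaOutput.
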
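  • Read Logs Lambda code:
import boto3

# CloudWatch Logs client, created once per Lambda container
client = boto3.client('logs')

def lambda_handler(event, context):
    print(event)
    # AWS Batch writes container output to the /aws/batch/job log group
    response = client.get_log_events(
        logGroupName='/aws/batch/job',
        logStreamName=event.get('LogStreamName')
    )
    # Return only the first log event's message
    log = {'logs': response['events'][0]['message']}
    return log
  • Consume Logs Lambda code:
import json

print('Loading function')

def lambda_handler(event, context):
    # The state machine passes the log text in as event['randomstring']
    print(event)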
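
The read logs function above returns only the first event in the response. If the script prints more than one line, a variant along the following lines may be useful. This is a sketch, not the code from the answer: read_all_messages is a hypothetical helper, it assumes the default /aws/batch/job log group, and it uses startFromHead and nextForwardToken from the CloudWatch Logs get_log_events API to page through the stream from the beginning. boto3 is imported inside the handler only so that the helper can be exercised with a stub client.

```python
def read_all_messages(logs_client, log_stream_name, log_group_name="/aws/batch/job"):
    """Collect every message in a log stream, oldest first."""
    messages = []
    token = None
    while True:
        kwargs = {
            "logGroupName": log_group_name,
            "logStreamName": log_stream_name,
            "startFromHead": True,  # read from the start of the stream
        }
        if token:
            kwargs["nextToken"] = token
        response = logs_client.get_log_events(**kwargs)
        messages.extend(event["message"] for event in response["events"])
        next_token = response.get("nextForwardToken")
        # The end of the stream is signalled by getting the same token back.
        if next_token is None or next_token == token:
            break
        token = next_token
    return messages


def lambda_handler(event, context):
    import boto3  # imported lazily so read_all_messages can be tested without AWS

    client = boto3.client("logs")
    lines = read_all_messages(client, event["LogStreamName"])
    # Same output shape as the original function: {'logs': <text>}
    return {"logs": "\n".join(lines)}
```

Keep in mind that Step Functions limits the size of the data passed between states, so for large job output it is probably better to have the job write its result to S3 or DynamoDB and pass only a reference through the state machine.
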

[Screenshot: consumelogs function output showing the logs]
