Kinesis Firehose lambda transformation

2024/11/17 21:28:55

I have the following Lambda function as part of a Kinesis Firehose record transformation. It converts msgpack records from the Kinesis input stream to JSON.

Lambda runtime: Python 3.6

from __future__ import print_function

import base64
import msgpack
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': json.dumps(payload, ensure_ascii=False).encode('utf8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

But Lambda throws the following error:

An error occurred during JSON serialization of response: b'{"id": "d23fd47f-3a62-4383-bcb3-abdb913ea572","timestamp": 1526358140730,"message": "Hello World"}' is not JSON serializable
Traceback (most recent call last):
  File "/var/lang/lib/python3.6/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/var/lang/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/var/lang/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/var/runtime/awslambda/bootstrap.py", line 110, in decimal_serializer
    raise TypeError(repr(o) + " is not JSON serializable")

Am I doing anything wrong?

Answer

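I was able to fix the issue. Lambda serializes the handler's return value to JSON, and a bytes object is not JSON serializable, so the 'data' field has to be a base64-encoded string rather than raw bytes.

Here is the code that worked for me.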

from __future__ import print_function

import base64
import msgpack
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(payload).encode('utf-8') + b'\n').decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
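
If you want to check the difference locally before deploying, here is a rough sketch that uses only the standard library. The build_record helper and its encode_data flag are names I made up for illustration; they are not part of any AWS API. The sample payload is the one from the error message above. It only imitates the JSON serialization step with json.dumps and does not reproduce the Lambda runtime itself.

```python
import base64
import json

# Sample payload taken from the error message in the question.
payload = {
    "id": "d23fd47f-3a62-4383-bcb3-abdb913ea572",
    "timestamp": 1526358140730,
    "message": "Hello World",
}


def build_record(record_id, payload, encode_data=True):
    """Build one transformation result record (hypothetical helper)."""
    raw = json.dumps(payload).encode('utf-8') + b'\n'
    # encode_data=False mimics the original code, which returned raw bytes.
    data = base64.b64encode(raw).decode('utf-8') if encode_data else raw
    return {'recordId': record_id, 'result': 'Ok', 'data': data}


# Raw bytes in the response: json.dumps raises TypeError.
try:
    json.dumps({'records': [build_record('1', payload, encode_data=False)]})
    bytes_ok = True
except TypeError:
    bytes_ok = False

# Base64 text in the response: serializes fine and round-trips to the payload.
response = json.dumps({'records': [build_record('1', payload)]})
data_field = json.loads(response)['records'][0]['data']
decoded = json.loads(base64.b64decode(data_field))
```

After running this, bytes_ok is False for the raw-bytes variant, and decoded equals the original payload for the base64 variant. The trailing b'\n' just puts each JSON object on its own line in the delivered output.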