Pyarrow s3fs partition by timestamp

2024/10/15 14:16:09

Is it possible to use a timestamp field in a pyarrow table to partition the output by "YYYY/MM/DD/HH" when writing Parquet files to S3 through s3fs?

Answer

I was able to achieve this with pyarrow's write_to_dataset function, which lets you specify partition columns; each partition column becomes one level of subdirectories.

Example:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

# Placeholders: fill in your own credentials and bucket name.
access_key = "<access_key>"
secret_key = "<secret_key>"
bucket_name = "<bucket_name>"

fs = s3fs.S3FileSystem(key=access_key, secret=secret_key)
bucket_uri = 's3://{0}/{1}'.format(bucket_name, "data")

data = {'date': ['2018-03-04T14:12:15.653Z', '2018-03-03T14:12:15.653Z', '2018-03-02T14:12:15.653Z', '2018-03-05T14:12:15.653Z'],
        'battles': [34, 25, 26, 57],
        'citys': ['london', 'newyork', 'boston', 'boston']}
df = pd.DataFrame(data, columns=['date', 'battles', 'citys'])

# Parse the ISO strings into timestamps, then derive one column per partition level.
df['date'] = pd.to_datetime(df['date'], format="%Y-%m-%dT%H:%M:%S.%fZ")
df['year'], df['month'], df['day'] = df['date'].dt.year, df['date'].dt.month, df['date'].dt.day

table = pa.Table.from_pandas(df)
# Each column in partition_cols becomes a directory level under bucket_uri.
pq.write_to_dataset(table, bucket_uri, filesystem=fs, partition_cols=['year', 'month', 'day'], use_dictionary=True, compression='snappy', use_deprecated_int96_timestamps=True)
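
Note that write_to_dataset does not produce a literal "YYYY/MM/DD/HH" path: it names each directory level column=value (Hive style). To get down to the hour asked about in the question, one option is to derive an hour column the same way as year, month and day and append it to partition_cols. The sketch below uses only the standard library to show the sub-path a given timestamp would land in under that assumption. The helper name partition_path is hypothetical and not part of pyarrow; it only mimics the directory naming and does not write anything.

```python
from datetime import datetime, timezone


def partition_path(ts, cols=("year", "month", "day", "hour")):
    """Return the Hive-style sub-path (col=value/...) for a timestamp string.

    Assumes the same ISO format as the example data, e.g. '2018-03-04T14:12:15.653Z'.
    """
    parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    # Each partition column contributes one directory level named column=value.
    return "/".join("{0}={1}".format(col, getattr(parsed, col)) for col in cols)


print(partition_path("2018-03-04T14:12:15.653Z"))
# year=2018/month=3/day=4/hour=14
```

Values are not zero-padded, so March shows up as month=3 rather than month=03. If a padded layout is needed, one approach is to store the partition columns as pre-formatted strings before building the table.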
