
Treasure Data Driver for Python


pytd


Quickly read/write your data directly from/to the Presto query engine and Plazma primary storage

Unlike the other official Treasure Data API libraries for Python, td-client-python and pandas-td, pytd gives direct access to the back-end query and storage engines. This seamless connection allows your Python code to read and write large volumes of data in less time, making your day-to-day data analytics work more efficient and productive.

Project milestones

This project is actively developed according to its milestones.

Installation

pip install pytd
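
To confirm the installation, the package version can be checked from Python (a quick sanity check, assuming the package exposes __version__ as most PyPI packages do):

import pytd
print(pytd.__version__)  # e.g., '0.5.0'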

Usage

Set your API key and endpoint in the environment variables TD_API_KEY and TD_API_SERVER, respectively, and create a client instance:

import pytd

client = pytd.Client(database='sample_datasets')
# or, hard-code your API key, endpoint, and/or query engine:
# >>> pytd.Client(apikey='1/XXX', endpoint='https://api.treasuredata.com/', database='sample_datasets', engine='presto')
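
Alternatively, the same environment variables can be set from Python itself, as long as it happens before the client is created (a minimal sketch with placeholder credentials; substitute your own):

import os

os.environ['TD_API_KEY'] = '1/XXX'  # placeholder API key
os.environ['TD_API_SERVER'] = 'https://api.treasuredata.com/'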

Issue a Presto query and retrieve the result:

client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')
# {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ['AAME', 9252], ..., ['ZUMZ', 2364]]}
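
The returned dict maps cleanly onto the pandas.DataFrame constructor, so converting a result for further analysis is a one-liner (a small convenience sketch, not part of the pytd API):

import pandas as pd

res = client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')
df = pd.DataFrame(res['data'], columns=res['columns'])  # one row per symbol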

To query via Hive instead:

client = pytd.Client(database='sample_datasets', engine='hive')
client.query('select hivemall_version()')
# {'columns': ['_c0'], 'data': [['0.6.0-SNAPSHOT-201901-r01']]} (as of Feb, 2019)

Once you install the package with PySpark dependencies, any data represented as a pandas.DataFrame can be written directly to TD via td-spark:

pip install pytd[spark]

import pandas as pd

df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
client.load_table_from_dataframe(df, 'takuti.foo', if_exists='overwrite')

If you want to use an existing td-spark JAR file, create a SparkWriter with the td_spark_path option:

writer = pytd.writer.SparkWriter(apikey='1/XXX', endpoint='https://api.treasuredata.com/', td_spark_path='/path/to/td-spark-assembly.jar')
client = pytd.Client(database='sample_datasets', writer=writer)
client.load_table_from_dataframe(df, 'mydb.bar', if_exists='overwrite')

DB-API

pytd implements the Python Database API Specification v2.0 (PEP 249) with the help of prestodb/presto-python-client.

Connect to the API first:

from pytd.dbapi import connect

conn = connect(pytd.Client(database='sample_datasets'))
# or, connect with Hive:
# >>> conn = connect(pytd.Client(database='sample_datasets', engine='hive'))
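
Because conn follows DB-API 2.0, it can also be handed to generic consumers. For example, pandas.read_sql accepts DB-API connections (pandas officially tests only sqlite3 connections, so treat this as a convenience that works in practice rather than a guaranteed interface):

import pandas as pd

df = pd.read_sql('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn)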

The Cursor defined by the specification allows us to fetch query results flexibly via a custom function:

def query(sql, connection):
    cur = connection.cursor()
    cur.execute(sql)
    rows = cur.fetchall()
    columns = [desc[0] for desc in cur.description]
    return {'data': rows, 'columns': columns}

query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn)

Below is an example of generator-based iterative retrieval, just like pandas.DataFrame.iterrows:

def iterrows(sql, connection):
    cur = connection.cursor()
    cur.execute(sql)
    index = 0
    columns = None
    while True:
        row = cur.fetchone()
        if row is None:
            break
        if columns is None:
            columns = [desc[0] for desc in cur.description]
        yield index, dict(zip(columns, row))
        index += 1

for index, row in iterrows('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn):
    print(index, row)
# 0 {'cnt': 590, 'symbol': 'AAIT'}
# 1 {'cnt': 82, 'symbol': 'AAL'}
# 2 {'cnt': 9252, 'symbol': 'AAME'}
# 3 {'cnt': 253, 'symbol': 'AAOI'}
# 4 {'cnt': 5980, 'symbol': 'AAON'}
# ...

How to replace pandas-td

pytd offers pandas-td-compatible functions that provide the same functionality more efficiently. If you are still using pandas-td, we recommend switching to pytd as follows.

First, install the package from PyPI:

pip install pytd
# or, `pip install pytd[spark]` if you wish to use `to_td`

Next, make the following modifications to the import statements.

Before:

import pandas_td as td
In [1]: %load_ext pandas_td.ipython

After:

import pytd.pandas_td as td
In [1]: %load_ext pytd.pandas_td.ipython

Consequently, all pandas_td code should keep running correctly with pytd. Report an issue if you notice any incompatible behavior.
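
For reference, a typical session with the compatible API then looks like the following sketch (assuming TD_API_KEY and TD_API_SERVER are set, and that create_engine takes the same '<engine>:<database>' URL string as pandas-td):

import pytd.pandas_td as td

con = td.connect()  # picks up TD_API_KEY and TD_API_SERVER
engine = td.create_engine('presto:sample_datasets', con=con)
df = td.read_td('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', engine)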

Use an existing td-spark-assembly.jar file

As in the earlier example, if you want to use an existing td-spark JAR file, create a SparkWriter with the td_spark_path option. You can then pass the writer to the connect() function:

import pytd
import pytd.pandas_td as td
import pandas as pd
apikey = '1/XXX'
endpoint = 'https://api.treasuredata.com/'

writer = pytd.writer.SparkWriter(apikey=apikey, endpoint=endpoint, td_spark_path='/path/to/td-spark-assembly.jar')
con = td.connect(apikey=apikey, endpoint=endpoint, writer=writer)

df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
td.to_td(df, 'mydb.buzz', con, if_exists='replace', index=False)
