
Treasure Data Driver for Python


pytd


pytd provides user-friendly interfaces to Treasure Data's REST APIs, Presto query engine, and Plazma primary storage.

The seamless connection allows your Python code to efficiently read and write large volumes of data from and to Treasure Data. Ultimately, pytd makes your day-to-day data analytics work more productive.

Installation

pip install pytd

Usage

Set your API key and endpoint in the environment variables TD_API_KEY and TD_API_SERVER, respectively, then create a client instance:

import pytd

client = pytd.Client(database='sample_datasets')
# or, hard-code your API key, endpoint, and/or query engine:
# >>> pytd.Client(apikey='1/XXX', endpoint='https://api.treasuredata.com/', database='sample_datasets', default_engine='presto')
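
The lookup order described above (explicit arguments first, environment variables second) can be sketched with the standard library alone. The helper name `resolve_credentials`, its `environ` parameter, and the fallback to the endpoint shown above are assumptions made for illustration. They are not part of pytd's API, and pytd's actual resolution logic may differ.

```python
import os

# Assumed fallback for this sketch, taken from the endpoint shown above.
DEFAULT_ENDPOINT = 'https://api.treasuredata.com/'


def resolve_credentials(apikey=None, endpoint=None, environ=None):
    """Hypothetical helper: explicit arguments win over environment variables."""
    environ = os.environ if environ is None else environ
    apikey = apikey or environ.get('TD_API_KEY')
    endpoint = endpoint or environ.get('TD_API_SERVER', DEFAULT_ENDPOINT)
    if apikey is None:
        raise ValueError('set TD_API_KEY or pass apikey explicitly')
    return apikey, endpoint


# Passing a plain dict instead of os.environ keeps the example reproducible.
apikey, endpoint = resolve_credentials(environ={'TD_API_KEY': '1/XXX'})
```

Accepting the environment as a parameter is only a convenience for testing; calling the helper without it reads from `os.environ`.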

Query in Treasure Data

Issue a Presto query and retrieve the result:

client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')
# {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ['AAME', 9252], ..., ['ZUMZ', 2364]]}
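
The result is a plain dictionary with 'columns' and 'data' keys, so it can be reshaped without further API calls. The following sketch turns it into a list of per-row dictionaries. The helper name `to_records` is hypothetical, and the sample is a three-row excerpt of the output shown above.

```python
def to_records(result):
    """Turn {'columns': [...], 'data': [[...], ...]} into a list of dicts."""
    columns = result['columns']
    return [dict(zip(columns, row)) for row in result['data']]


# Sample shaped like the query output above, truncated to three rows.
result = {
    'columns': ['symbol', 'cnt'],
    'data': [['AAIT', 590], ['AAL', 82], ['AAME', 9252]],
}
records = to_records(result)
print(records[0])
```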

To run the query on Hive instead:

client.query('select hivemall_version()', engine='hive')
# {'columns': ['_c0'], 'data': [['0.6.0-SNAPSHOT-201901-r01']]} (as of Feb, 2019)

It is also possible to explicitly initialize pytd.Client for Hive:

client_hive = pytd.Client(database='sample_datasets', default_engine='hive')
client_hive.query('select hivemall_version()')

Write data to Treasure Data

Data represented as pandas.DataFrame can be written to Treasure Data as follows:

import pandas as pd

df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
client.load_table_from_dataframe(df, 'takuti.foo', writer='bulk_import', if_exists='overwrite')

For the writer option, pytd supports three different ways to ingest data into Treasure Data:

  1. Bulk Import API: bulk_import (default)
    • Convert the data into a CSV file and upload it in batch.
  2. Presto INSERT INTO query: insert_into
    • Insert every row of the DataFrame by issuing an INSERT INTO query through the Presto query engine.
    • Recommended only for small volumes of data.
  3. td-spark: spark
    • A locally customized Spark instance writes the DataFrame directly to Treasure Data's primary storage system.
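
To see why insert_into suits only small volumes, it helps to look at what a row-wise INSERT INTO statement looks like. The sketch below renders rows as SQL literals. It is an illustration only: the helper `build_insert_into` is hypothetical, and how pytd actually formats and batches its queries is not specified here.

```python
def build_insert_into(table, columns, rows):
    """Illustrative only: render rows as one INSERT INTO ... VALUES query."""

    def literal(value):
        if value is None:
            return 'NULL'
        if isinstance(value, bool):  # check bool before int (bool is an int)
            return 'true' if value else 'false'
        if isinstance(value, (int, float)):
            return repr(value)
        # Escape single quotes by doubling them, as in standard SQL.
        return "'" + str(value).replace("'", "''") + "'"

    values = ', '.join(
        '(' + ', '.join(literal(v) for v in row) + ')' for row in rows
    )
    return 'INSERT INTO {} ({}) VALUES {}'.format(
        table, ', '.join(columns), values
    )


sql = build_insert_into('foo', ['col1', 'col2'], [(1, 3), (2, 10)])
print(sql)
```

Every row ends up as text inside the query, so the statement grows linearly with the data, whereas the bulk import and Spark writers move the data outside the query text.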

Enabling Spark Writer

Because td-spark accesses the primary storage system directly via PySpark, it requires extra setup:

  1. Contact support@treasuredata.com to enable the permission for your Treasure Data account.
  2. Install pytd with the [spark] option to use the third writer option:
    pip install pytd[spark]
    

If you want to use an existing td-spark JAR file, create a SparkWriter with the td_spark_path option:

from pytd.writer import SparkWriter

writer = SparkWriter(apikey='1/XXX', endpoint='https://api.treasuredata.com/', td_spark_path='/path/to/td-spark-assembly.jar')
client.load_table_from_dataframe(df, 'mydb.bar', writer=writer, if_exists='overwrite')

DB-API

pytd implements the Python Database API Specification v2.0 with the help of prestodb/presto-python-client.

Connect to the API first:

from pytd.dbapi import connect

conn = connect(pytd.Client(database='sample_datasets'))
# or, connect with Hive:
# >>> conn = connect(pytd.Client(database='sample_datasets', default_engine='hive'))

The cursor defined by the specification lets us flexibly fetch query results from a custom function:

def query(sql, connection):
    cur = connection.cursor()
    cur.execute(sql)
    rows = cur.fetchall()
    columns = [desc[0] for desc in cur.description]
    return {'data': rows, 'columns': columns}

query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn)
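
Because the helper relies only on the DB-API cursor interface, it can be tried locally against any DB-API 2.0 connection. The sketch below uses Python's built-in sqlite3 module as a stand-in. The table contents are made up for illustration, and the query spells out the column names instead of using positional references.

```python
import sqlite3


def query(sql, connection):
    cur = connection.cursor()
    cur.execute(sql)
    rows = cur.fetchall()
    columns = [desc[0] for desc in cur.description]
    return {'data': rows, 'columns': columns}


# In-memory stand-in for the nasdaq table (rows are invented for this sketch).
conn_local = sqlite3.connect(':memory:')
conn_local.execute('create table nasdaq (symbol text, close real)')
conn_local.executemany(
    'insert into nasdaq values (?, ?)',
    [('AAL', 40.1), ('AAIT', 30.5), ('AAIT', 31.0)],
)
result = query(
    'select symbol, count(1) as cnt from nasdaq group by symbol order by symbol',
    conn_local,
)
print(result)
```

Note that sqlite3 returns rows as tuples; the row type of other drivers may differ.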

Below is an example of generator-based iterative retrieval, just like pandas.DataFrame.iterrows:

def iterrows(sql, connection):
    cur = connection.cursor()
    cur.execute(sql)
    index = 0
    columns = None
    while True:
        row = cur.fetchone()
        if row is None:
            break
        if columns is None:
            columns = [desc[0] for desc in cur.description]
        yield index, dict(zip(columns, row))
        index += 1

for index, row in iterrows('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn):
    print(index, row)
# 0 {'cnt': 590, 'symbol': 'AAIT'}
# 1 {'cnt': 82, 'symbol': 'AAL'}
# 2 {'cnt': 9252, 'symbol': 'AAME'}
# 3 {'cnt': 253, 'symbol': 'AAOI'}
# 4 {'cnt': 5980, 'symbol': 'AAON'}
# ...
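
A middle ground between fetchall and row-by-row fetchone is to read fixed-size batches with fetchmany, which DB-API 2.0 also defines. The following sketch assumes the cursor in use supports fetchmany. The helper name `iterbatches` is hypothetical, and sqlite3 again serves as a local stand-in for the connection.

```python
import sqlite3


def iterbatches(sql, connection, size=2):
    """Yield lists of row dicts, with at most `size` rows per batch."""
    cur = connection.cursor()
    cur.execute(sql)
    columns = [desc[0] for desc in cur.description]
    while True:
        rows = cur.fetchmany(size)
        if not rows:  # an empty list signals that the result is exhausted
            break
        yield [dict(zip(columns, row)) for row in rows]


# Local stand-in connection with three rows.
conn_local = sqlite3.connect(':memory:')
conn_local.execute('create table t (n integer)')
conn_local.executemany('insert into t values (?)', [(1,), (2,), (3,)])

batches = list(iterbatches('select n from t order by n', conn_local, size=2))
print([len(batch) for batch in batches])
```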

How to replace pandas-td

pytd offers pandas-td-compatible functions that provide the same functionality more efficiently. If you are still using pandas-td, we recommend switching to pytd as follows.

First, install the package from PyPI:

pip install pytd
# or, `pip install pytd[spark]` if you wish to use `to_td`

Next, modify the import statements as follows.

Before:

import pandas_td as td
In [1]: %load_ext pandas_td.ipython

After:

import pytd.pandas_td as td
In [1]: %load_ext pytd.pandas_td.ipython

After this change, all pandas_td code should keep running correctly with pytd. Please report an issue if you notice any incompatible behavior.

Use existing td-spark-assembly.jar file

If you want to use an existing td-spark JAR file, create a SparkWriter with the td_spark_path option and pass that writer to the connect() function.

import pytd
import pytd.pandas_td as td
import pandas as pd
apikey = '1/XXX'
endpoint = 'https://api.treasuredata.com/'

writer = pytd.writer.SparkWriter(apikey=apikey, endpoint=endpoint, td_spark_path='/path/to/td-spark-assembly.jar')
con = td.connect(apikey=apikey, endpoint=endpoint, writer=writer)

df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
td.to_td(df, 'mydb.buzz', con, if_exists='replace', index=False)

For developers

We use black and isort as formatters and flake8 as a linter. Our CI checks the code format with them.

Note that black requires Python 3.6+ while pytd supports 3.5+, so you need Python 3.6+ for development.

We highly recommend using pre-commit to ensure that your commits follow the required format.

You can install pre-commit as follows:

pip install pre-commit
pre-commit install

Now black, isort, and flake8 will run each time you commit changes. You can skip these checks with git commit --no-verify.

If you want to check the code format manually, install the tools as follows:

pip install black isort flake8

Then you can run the tools manually:

black pytd
flake8 pytd
isort

You can run the formatter, linter, and tests with nox as follows:

pip install nox # only needed the first time
nox


