
Treasure Data extension for pyspark


td_pyspark

Treasure Data extension for using pyspark.

$ pip install td-pyspark 

Introduction

First, contact support@treasure-data.com to enable the td-spark feature, which is disabled by default.

td-pyspark is a library that enables Python to access tables in Treasure Data. Its features include:

  • Reading tables in Treasure Data as DataFrame
  • Writing DataFrames to Treasure Data
  • Submitting Presto queries and reading the query results as DataFrames

As of June 2019, Spark 2.4.x + Scala 2.11 is supported. Spark 2.4.3 is preferred.
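As a minimal sketch, a script can check the running Spark version against the supported 2.4.x line before doing any work. The helper is_supported_spark is hypothetical; it only compares the major and minor version numbers and cannot detect which Scala version Spark was built with.

```python
def is_supported_spark(version: str) -> bool:
    """Return True for Spark 2.4.x version strings such as "2.4.3".

    Hypothetical helper: checks only the major/minor numbers, not the Scala build.
    """
    parts = version.split(".")
    return len(parts) >= 2 and parts[0] == "2" and parts[1] == "4"
```

For example, after import pyspark you could call is_supported_spark(pyspark.__version__) and warn or exit when it returns False.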

For more details, see also td-spark FAQ.

Quick Start with Docker

You can try td_pyspark using Docker without installing Spark or Python.

First, create a td-spark.conf file and set your TD API key and site (us, jp, eu01):

td-spark.conf

spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled true
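If you prefer to generate td-spark.conf from a script (for example, to keep the API key out of version control), a minimal sketch might look like the following. render_td_spark_conf and write_td_spark_conf are hypothetical helpers; they emit only the four properties shown above, using the whitespace-separated "key value" lines that Spark properties files accept.

```python
from pathlib import Path

# Sites listed in the td-spark.conf instructions above
SUPPORTED_SITES = {"us", "jp", "eu01"}


def render_td_spark_conf(apikey: str, site: str) -> str:
    """Return td-spark.conf content containing the properties shown above."""
    if site not in SUPPORTED_SITES:
        raise ValueError(f"unknown site {site!r}; expected one of {sorted(SUPPORTED_SITES)}")
    if not apikey:
        raise ValueError("apikey must not be empty")
    props = [
        ("spark.td.apikey", apikey),
        ("spark.td.site", site),
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        ("spark.sql.execution.arrow.enabled", "true"),
    ]
    # One "key value" pair per line, as in the example file
    return "\n".join(f"{key} {value}" for key, value in props) + "\n"


def write_td_spark_conf(path: str, apikey: str, site: str) -> Path:
    """Write the rendered configuration to `path` and return the path."""
    target = Path(path)
    target.write_text(render_td_spark_conf(apikey, site))
    return target
```

For example, write_td_spark_conf("td-spark.conf", os.environ["TD_API_KEY"], "us"), where TD_API_KEY is an assumed environment variable name, not one td_pyspark reads itself.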

Launch the pyspark Docker image, which comes with the td_pyspark library pre-installed:

$ docker run -it -e TD_SPARK_CONF=td-spark.conf -v $(pwd):/opt/spark/work armtd/td-spark-pyspark:latest
Python 3.6.6 (default, Aug 24 2018, 05:04:18)
[GCC 6.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
19/06/13 19:33:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Python version 3.6.6 (default, Aug 24 2018 05:04:18)
SparkSession available as 'spark'.
2019-06-13 19:33:49.449Z debug [spark] Loading com.treasuredata.spark package - (package.scala:23)
2019-06-13 19:33:49.486Z  info [spark] td-spark version:1.2.0+31-d0f3a15e, revision:d0f3a15, build_time:2019-06-13T10:33:43.655-0700 - (package.scala:24)
2019-06-13 19:33:50.310Z  info [TDServiceConfig] td-spark site: us - (TDServiceConfig.scala:36)
2019-06-13 19:33:51.877Z  info [LifeCycleManager] [session:7ebc16af] Starting a new lifecycle ... - (LifeCycleManager.scala:187)
2019-06-13 19:33:51.880Z  info [LifeCycleManager] [session:7ebc16af] ======== STARTED ======== - (LifeCycleManager.scala:191)
>>> 

Try reading a sample table by specifying a time range:

>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
2019-06-13 19:48:51.605Z  info [TDRelation] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-06 00:00:00Z) - (TDRelation.scala:170)
2019-06-13 19:48:51.950Z  info [TDRelation] Retrieved 2 partition entries - (TDRelation.scala:176)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|192.225.229.196|  /category/software|                   -| 200|Mozilla/5.0 (Maci...| 117|   GET|1412382292|
|null|120.168.215.131|  /category/software|                   -| 200|Mozilla/5.0 (comp...|  53|   GET|1412382284|
|null|180.198.173.136|/category/electro...| /category/computers| 200|Mozilla/5.0 (Wind...| 106|   GET|1412382275|
|null| 140.168.145.49|   /item/garden/2832|      /item/toys/230| 200|Mozilla/5.0 (Maci...| 122|   GET|1412382267|
|null|  52.168.78.222|/category/electro...|    /item/games/2532| 200|Mozilla/5.0 (comp...|  73|   GET|1412382259|
|null|  32.42.160.165|   /category/cameras|/category/cameras...| 200|Mozilla/5.0 (Wind...| 117|   GET|1412382251|
|null|   48.204.59.23|  /category/software|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...|  52|   GET|1412382243|
|null|136.207.150.227|/category/electro...|                   -| 200|Mozilla/5.0 (iPad...| 120|   GET|1412382234|
|null| 204.21.174.187|   /category/jewelry|   /item/office/3462| 200|Mozilla/5.0 (Wind...|  59|   GET|1412382226|
|null|  224.198.88.93|    /category/office|     /category/music| 200|Mozilla/4.0 (comp...|  46|   GET|1412382218|
|null|   96.54.24.116|     /category/games|                   -| 200|Mozilla/5.0 (Wind...|  40|   GET|1412382210|
|null| 184.42.224.210| /category/computers|                   -| 200|Mozilla/5.0 (Wind...|  95|   GET|1412382201|
|null|  144.72.47.212|/item/giftcards/4684|    /item/books/1031| 200|Mozilla/5.0 (Wind...|  65|   GET|1412382193|
|null| 40.213.111.170|     /item/toys/1085|   /category/cameras| 200|Mozilla/5.0 (Wind...|  65|   GET|1412382185|
|null| 132.54.226.209|/item/electronics...|  /category/software| 200|Mozilla/5.0 (comp...| 121|   GET|1412382177|
|null|  108.219.68.64|/category/cameras...|                   -| 200|Mozilla/5.0 (Maci...|  54|   GET|1412382168|
|null| 168.66.149.218| /item/software/4343|  /category/software| 200|Mozilla/4.0 (comp...| 139|   GET|1412382160|
|null|  80.66.118.103|  /category/software|                   -| 200|Mozilla/4.0 (comp...|  92|   GET|1412382152|
|null|140.171.147.207|     /category/music|   /category/jewelry| 200|Mozilla/5.0 (Wind...| 119|   GET|1412382144|
|null| 84.132.164.204| /item/software/4783|/category/electro...| 200|Mozilla/5.0 (Wind...| 137|   GET|1412382135|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows
>>> 

Usage

TDSparkContext is the entry point to td_pyspark's functionality. To create a TDSparkContext, pass your SparkSession (spark) to it:

from td_pyspark import TDSparkContext

td = TDSparkContext(spark)

Reading Tables as DataFrames

To read a table, use td.table(table name):

df = td.table("sample_datasets.www_access").df()
df.show()

To change the context database, use td.use(database_name):

td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()

Calling .df() reads your table data as a Spark DataFrame, which you use the same way as any other PySpark DataFrame. See also the PySpark DataFrame documentation.

Specifying Time Ranges

Treasure Data is a time-series database, so specifying a time range when reading recent data is important for reducing the amount of data to be processed. The .within(...) function specifies a target time range in a concise syntax; it accepts the same syntax as the TD_INTERVAL function in Presto.

For example, to read the last hour of data, use within("-1h"):

td.table("tbl").within("-1h").df()

You can also read the last day's data:

td.table("tbl").within("-1d").df()

You can also specify an offset for the relative time range. This example reads the last day's data beginning from 7 days ago:

td.table("tbl").within("-1d/-7d").df()

If you know an exact time range, within("(start time)/(end time)") is useful:

>>> df = td.table("sample_datasets.www_access").within("2014-10-04/2014-10-05").df()
>>> df.show()
2019-06-13 20:12:01.400Z  info [TDRelation] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-05 00:00:00Z) - (TDRelation.scala:170)
...

See the TD_INTERVAL documentation for more examples of interval strings.
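To make the half-open ranges in the log output concrete, the sketch below resolves the two absolute forms used above, "+2d/2014-10-04" and "2014-10-04/2014-10-05", into [start, end) UTC datetimes. resolve_absolute_interval is a hypothetical, illustrative helper, not td-spark's parser; it does not handle relative forms such as "-1h" or "-1d/-7d", whose results depend on the current time.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Units this sketch accepts after "+<n>"; an assumption, not the full TD_INTERVAL grammar
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def _parse_date(text: str) -> datetime:
    # Interpret dates as UTC midnight, matching the "Z" timestamps in the logs above
    return datetime.strptime(text, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def resolve_absolute_interval(spec: str) -> Tuple[datetime, datetime]:
    """Resolve "<start>/<end>" or "+<n><unit>/<start>" into a half-open [start, end) range."""
    left, sep, right = spec.partition("/")
    if not sep:
        raise ValueError(f"expected '<start>/<end>' or '+<n><unit>/<start>': {spec!r}")
    duration = re.fullmatch(r"\+(\d+)([smhd])", left)
    if duration:
        # "+2d/2014-10-04": a duration counted forward from the given start date
        start = _parse_date(right)
        amount, unit = int(duration.group(1)), duration.group(2)
        end = start + timedelta(**{_UNITS[unit]: amount})
    else:
        # "2014-10-04/2014-10-05": explicit start and (exclusive) end dates
        start, end = _parse_date(left), _parse_date(right)
    if end <= start:
        raise ValueError(f"empty time range: {spec!r}")
    return start, end
```

Both example strings resolve to the same [start, end) ranges that TDRelation prints in the sessions above: 2014-10-04 to 2014-10-06, and 2014-10-04 to 2014-10-05.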

Submitting Presto Queries

If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult. In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data that PySpark has to process:

>>> q = td.presto("select code, count(*) cnt from sample_datasets.www_access group by 1")
>>> q.show()
2019-06-13 20:09:13.245Z  info [TDPrestoJDBCRDD]  - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+

The query result is represented as a DataFrame.

To run non-query statements (e.g., INSERT INTO, CREATE TABLE), use execute_presto(sql):

td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")

Using SparkSQL

To use tables in Treasure Data inside Spark SQL, create a view with df.createOrReplaceTempView(...):

# Read TD table as a DataFrame
df = td.table("mydb.test1").df()
# Register the DataFrame as a view
df.createOrReplaceTempView("test1")

spark.sql("SELECT * FROM test1").show()

Create or Drop Databases and Tables

Create a new database or table:

td.create_database_if_not_exists("mydb")
td.create_table_if_not_exists("mydb.test1")

Delete unnecessary tables and databases:

td.drop_table_if_exists("mydb.test1")
td.drop_database_if_exists("mydb")

You can also check the presence of a table:

td.table("mydb.test1").exists() # True if the table exists

Create User-Defined Partition Tables

User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values).

You can create a UDP table partitioned by id (string type column) as follows:

td.create_udp_s("mydb.user_list", "id")

To create a UDP table partitioned by a long (bigint) column, use td.create_udp_l:

td.create_udp_l("mydb.departments", "dept_id")
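If the key column's type is only known at run time, a small dispatcher can choose between the two methods. create_udp_table is a hypothetical helper, and the mapping of type names (string/varchar to create_udp_s, bigint/long to create_udp_l) is an assumption based on the descriptions above; td is any object exposing those two methods, such as a TDSparkContext.

```python
from typing import Any

# Hypothetical mapping from key-column type names to the td_pyspark methods above
_UDP_CREATORS = {
    "string": "create_udp_s",
    "varchar": "create_udp_s",
    "bigint": "create_udp_l",
    "long": "create_udp_l",
}


def create_udp_table(td: Any, table: str, column: str, column_type: str) -> None:
    """Create a UDP table keyed on `column`, picking the method from the column type."""
    method_name = _UDP_CREATORS.get(column_type.lower())
    if method_name is None:
        raise ValueError(
            f"this sketch only handles string or bigint key columns, got {column_type!r}"
        )
    # e.g. td.create_udp_s("mydb.user_list", "id")
    getattr(td, method_name)(table, column)
```

Usage: create_udp_table(td, "mydb.user_list", "id", "string") is equivalent to the create_udp_s call above.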

Uploading DataFrames to Treasure Data

To save your local DataFrames as a table, use td.insert_into(df, table) or td.create_or_replace(df, table):

# Insert the records in the input DataFrame to the target table:
td.insert_into(df, "mydb.tbl1")

# Create or replace the target table with the content of the input DataFrame:
td.create_or_replace(df, "mydb.tbl2")

Running PySpark jobs with spark-submit

To submit your PySpark script to a Spark cluster, you will need the following files:

  • td-spark.conf file that describes your TD API key and spark.td.site (See above).
  • td_pyspark.py
    • Check the file location using pip show -f td-pyspark, and copy td_pyspark.py to a location of your choice
  • td-spark-assembly.jar
    • Get the latest version from the Download page.
  • Pre-built Spark
    • Download Spark 2.4.x with Hadoop 2.7.x (built for Scala 2.11)
    • Extract the downloaded archive. This folder location will be your $SPARK_HOME.

Here is an example PySpark application code: my_app.py

import td_pyspark
from pyspark.sql import SparkSession

# Create a new SparkSession 
spark = SparkSession\
    .builder\
    .appName("myapp")\
    .getOrCreate()

# Create TDSparkContext
td = td_pyspark.TDSparkContext(spark)

# Read the table data within -1d (yesterday) range as DataFrame
df = td.table("sample_datasets.www_access").within("-1d").df()
df.show()

To run my_app.py use spark-submit by specifying the necessary files mentioned above:

# Launching PySpark with the local mode
$ ${SPARK_HOME}/bin/spark-submit --master "local[4]" \
  --driver-class-path td-spark-assembly.jar \
  --properties-file=td-spark.conf \
  --py-files td_pyspark.py \
  my_app.py

local[4] runs Spark locally with 4 worker threads.

To use a remote Spark cluster, specify the master address, e.g., --master=spark://(master node IP address):7077.
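If you launch jobs from a Python script, the same command can be assembled as an argument list. build_spark_submit and format_command are hypothetical helpers whose defaults simply mirror the file names in the example above; passing a list to subprocess avoids shell-quoting problems with values such as local[4].

```python
import shlex
from typing import List


def build_spark_submit(
    spark_home: str,
    app: str,
    master: str = "local[4]",
    jar: str = "td-spark-assembly.jar",
    conf: str = "td-spark.conf",
    py_files: str = "td_pyspark.py",
) -> List[str]:
    """Return the spark-submit arguments from the local-mode example above."""
    return [
        f"{spark_home}/bin/spark-submit",
        "--master", master,
        "--driver-class-path", jar,
        f"--properties-file={conf}",
        "--py-files", py_files,
        app,
    ]


def format_command(argv: List[str]) -> str:
    """Shell-quote each argument so the command can be printed or logged safely."""
    return " ".join(shlex.quote(arg) for arg in argv)
```

To run it, pass the list to subprocess.run(build_spark_submit(os.environ["SPARK_HOME"], "my_app.py"), check=True); for a remote cluster, add master="spark://(master node IP address):7077".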

Using the td-spark assembly included in the PyPI package

The package contains a pre-built binary of td-spark, so you can add it to the classpath by default. TDSparkContextBuilder.default_jar_path() returns the path to the bundled td-spark-assembly.jar file. Passing this path to the jars method of TDSparkContextBuilder builds a SparkSession that includes the default jar.

import td_pyspark
from pyspark.sql import SparkSession

builder = SparkSession\
    .builder\
    .appName("td-pyspark-app")

td = td_pyspark.TDSparkContextBuilder(builder)\
    .apikey("XXXXXXXXXXXXXX")\
    .jars(td_pyspark.TDSparkContextBuilder.default_jar_path())\
    .build()

For Developers

Running pyspark with td_pyspark:

$ ${SPARK_HOME}/bin/spark-submit --master "local[4]"  --driver-class-path td-spark-assembly.jar  --properties-file=td-spark.conf --py-files td_pyspark.py your_app.py

How To Publish td_pyspark

Prerequisites

Twine is a secure utility for publishing Python packages and is commonly used to upload them to PyPI. Install it first:

$ pip install twine

A configuration file with your PyPI credentials can be convenient:

$ cat << 'EOF' > ~/.pypirc 
[distutils]
index-servers =
  pypi
  pypitest

[pypi]
repository=https://upload.pypi.org/legacy/
username=<your_username>
password=<your_password>

[pypitest]
repository=https://test.pypi.org/legacy/
username=<your_username>
password=<your_password>
EOF
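To catch mistakes in ~/.pypirc before uploading, you can parse it with Python's configparser and check that every server listed in index-servers has its own section. parse_pypirc and load_pypirc are hypothetical helpers, not part of twine; interpolation is disabled so that passwords containing % are read literally.

```python
import configparser
from pathlib import Path
from typing import Dict


def parse_pypirc(text: str) -> Dict[str, str]:
    """Map each server in [distutils] index-servers to its repository URL."""
    # interpolation=None keeps '%' characters in passwords from being expanded
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    servers = parser.get("distutils", "index-servers", fallback="").split()
    missing = [name for name in servers if not parser.has_section(name)]
    if missing:
        raise ValueError(f"index-servers lists undefined sections: {missing}")
    # An empty string marks a section without a repository= line
    return {name: parser.get(name, "repository", fallback="") for name in servers}


def load_pypirc(path: str = "~/.pypirc") -> Dict[str, str]:
    """Read and validate a .pypirc file on disk."""
    return parse_pypirc(Path(path).expanduser().read_text())
```

With the file above, load_pypirc() returns the upload URLs for pypi and pypitest, which you can compare against the repositories you expect before running twine.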

Build Package

Build the package in source distribution and wheel formats:

$ make package

Publish Package

Upload the package to the test repository first.

$ twine upload \
  --repository pypitest \
  dist/*

If everything looks correct in the test repository, publish the package to PyPI:

$ twine upload \
  --repository pypi \
  dist/*

Customize API endpoints

TDSparkContextBuilder automatically sets site-specific information. To specify different API endpoints (e.g., a development API), use TDSparkContextBuilder:

import td_pyspark
from pyspark.sql import SparkSession

builder = SparkSession\
    .builder\
    .appName("td-pyspark-app")

td = td_pyspark.TDSparkContextBuilder(builder)\
    .apikey("XXXXXXXXXXXXXX")\
    .api_endpoint("api.treasuredata.com")\
    .build()

# Read the table data within -1d (yesterday) range as DataFrame
df = td.table("sample_datasets.www_access")\
    .within("-1d")\
    .df()

df.show()



Download files


Source Distribution

td_pyspark-19.9.0.tar.gz (23.0 MB)

Built Distribution

td_pyspark-19.9.0-py3-none-any.whl (45.8 MB, Python 3)

File details

Details for the file td_pyspark-19.9.0.tar.gz.

File metadata

  • Download URL: td_pyspark-19.9.0.tar.gz
  • Size: 23.0 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.14.0 pkginfo/1.5.0.1 requests/2.20.1 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.7.3

File hashes

Hashes for td_pyspark-19.9.0.tar.gz
Algorithm    Hash digest
SHA256       14d7dd49ecdb818d1579c63812729d5eed7393da50456004b3560061882fb795
MD5          cb042fa5e86d69452f5838d860b6b848
BLAKE2b-256  f4fc4e0149a6c44d58fbbd0c138f025c2966f209dd3bf4bd61443f1d3f3d3e28


File details

Details for the file td_pyspark-19.9.0-py3-none-any.whl.

File metadata

  • Download URL: td_pyspark-19.9.0-py3-none-any.whl
  • Size: 45.8 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.14.0 pkginfo/1.5.0.1 requests/2.20.1 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.31.1 CPython/3.7.3

File hashes

Hashes for td_pyspark-19.9.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       3bf6be6d89ea3284d21d4a24ae8e97e44bfbad989b459b046793e4106e2c1cd4
MD5          653929feccce51e5299258f71219f689
BLAKE2b-256  aa6e3288fe1409f71204ee3b96382a4bcca900912dc77a5a0cef8b55fcab23ea

