vast-data/vastdb_sdk
VAST DB Python SDK

A Python SDK for seamless interaction with VAST Database and VAST Catalog. Enables powerful data operations including:

  • Schema and table management
  • Efficient data ingest and querying
  • Advanced filtering with predicate pushdown
  • Direct integration with PyArrow and DuckDB
  • File system querying through VAST Catalog


For technical details about VAST Database architecture, see the whitepaper.

Getting Started

Requirements

Installation

pip install vastdb

See the Release Notes for the SDK.

Quick Start

Create schemas and tables, insert data, and run basic selects:

import pyarrow as pa
import vastdb

session = vastdb.connect(
    endpoint='http://vip-pool.v123-xy.VastENG.lab',
    access=AWS_ACCESS_KEY_ID,
    secret=AWS_SECRET_ACCESS_KEY)

with session.transaction() as tx:
    bucket = tx.bucket("bucket-name")

    schema = bucket.create_schema("schema-name")
    print(bucket.schemas())

    columns = pa.schema([
        ('c1', pa.int16()),
        ('c2', pa.float32()),
        ('c3', pa.utf8()),
    ])
    table = schema.create_table("table-name", columns)
    print(schema.tables())
    print(table.columns())

    arrow_table = pa.table(schema=columns, data=[
        [111, 222, 333],
        [0.5, 1.5, 2.5],
        ['a', 'bb', 'ccc'],
    ])
    table.insert(arrow_table)

    # run `SELECT * FROM t`
    reader = table.select()  # returns a `pyarrow.RecordBatchReader`
    result = reader.read_all()  # build a PyArrow Table from the `pyarrow.RecordBatch` objects read from VAST
    assert result == arrow_table

    # the transaction is automatically committed when exiting the context

Note: the transaction must remain open while the returned pyarrow.RecordBatchReader is being consumed.

The list of supported data types can be found here.

Features

Select Performance

The Table.select() method accepts a QueryConfig object that modifies how the select is fulfilled.

The most important setting is data_endpoints: when set, the SDK parallelizes the select across multiple CNodes. Without it, only the CNode specified in connect() services the select.

from ibis import _
from vastdb.config import QueryConfig

# load default configuration values
cfg = QueryConfig()

# set data_endpoints to CNode VIPs
cfg.data_endpoints = [
    "http://172.19.196.1",
    "http://172.19.196.2",
    "http://172.19.196.3",
    "http://172.19.196.4",
]

table.select(columns=['c1'], predicate=(_.c2 > 2), config=cfg)

If you use DNS with either TTL=0 or multi-response (per the Best Practice on Load Balancing CNodes), passing the same DNS name repeated once per VIP is a reasonable approximation:

cfg.data_endpoints = ["http://vip-pool.v123-xy.VastENG.lab"] * 16  # assuming 16 VIPs in the pool

Filters and Projections

The SDK supports predicate and projection pushdown using Ibis:

from ibis import _

# SELECT c1 FROM t WHERE (c2 > 2) AND (c3 IS NULL)
table.select(columns=['c1'],
             predicate=(_.c2 > 2) & _.c3.isnull())

# SELECT c2, c3 FROM t WHERE (c2 BETWEEN 0 AND 1) OR (c2 > 10)
table.select(columns=['c2', 'c3'],
             predicate=(_.c2.between(0, 1) | (_.c2 > 10)))

# SELECT * FROM t WHERE c3 LIKE '%substring%'
table.select(predicate=_.c3.contains('substring'))

See the Predicate pushdown support document for more information on constructing predicates using Ibis.

Import Parquet files via S3 protocol

You can efficiently create tables from Parquet files that already exist in an S3 bucket on VAST, without copying them through the client. If parquet_files lists more than one file, they are loaded concurrently.

from vastdb import util

with session.transaction() as tx:
    schema = tx.bucket('database-name').schema('schema-name')
    table = util.create_table_from_files(
        schema=schema, table_name='imported-table',
        parquet_files=['/bucket-name/staging/file.parquet'])

If the table already exists, you can use the table.import_files() method to add more data to the table from Parquet files that already exist in an S3 bucket on VAST.

with session.transaction() as tx:
    table = tx.bucket('database-name').schema('schema-name').table('table-name')
    table.import_files(["/bucket-name/staging/file2.parquet"])

Semi-sorted Projections

Create, list and delete available semi-sorted projections:

p = table.create_projection('proj', sorted=['c3'], unsorted=['c1'])
print(table.projections())
print(p.get_stats())
p.drop()

Snapshots

You can access the VAST Database using snapshots:

snaps = bucket.list_snapshots()
batches = snaps[0].schema('schema-name').table('table-name').select()

Interactive and Non-Interactive Workflows

A Table created via the Schema object (tx.bucket('..').schema('..').table('..')) loads metadata and stats eagerly, which is convenient for interactive development. However, each object (bucket, schema, table) requires one or more round-trips to the server, and .table() fetches the full table schema.

For non-interactive workloads, it is generally more efficient to use the TableMetadata interface, which loads the schema lazily as needed and allows reusing the metadata across transactions.

# load the table schema & stats into an object we can use across transactions
table_md = TableMetadata(TableRef("bucket-name", "schema-name", "table-name"))
with session.transaction() as tx:
    table_md.load(tx)

# you may initialize the TableMetadata with the schema in advance (to save that round-trip)
table_md = TableMetadata(TableRef("bucket-name", "schema-name", "table-name"),
                         arrow_schema=<some-arrow-schema>)

# now we can reuse it without the overhead of reloading the schema and stats,
# such as for inserts:
with session.transaction() as tx:
    table = tx.table_from_metadata(table_md)
    rows = [{"id": 1, "name": "row1"}]
    py_records = pa.RecordBatch.from_pylist(rows, schema=table_md.arrow_schema)
    table.insert(py_records)

# and selects:
with session.transaction() as tx:
    table = tx.table_from_metadata(table_md)
    reader = table.select()
    results = reader.read_all()
    print(results)

Some table operations, such as table.import_files(), do not require the client to know the table schema; using the TableMetadata interface for them bypasses fetching the schema entirely.

table_md = TableMetadata(TableRef("bucket-name", "schema-name", "table-name"))

with session.transaction() as tx:
    table = tx.table_from_metadata(table_md)
    table.import_files(["/bucket-name/staging/file2.parquet"])

Post-processing

Export

Table.select() returns a stream of PyArrow record batches, which can be directly exported into a Parquet file:

import contextlib

import pyarrow.parquet as pq

batches = table.select()
with contextlib.closing(pq.ParquetWriter('/path/to/file.parquet', batches.schema)) as writer:
    for batch in batches:
        writer.write_batch(batch)

DuckDB Integration

Use DuckDB to post-process the resulting stream of PyArrow record batches:

from ibis import _

import duckdb
conn = duckdb.connect()

with session.transaction() as tx:
    table = tx.bucket("bucket-name").schema("schema-name").table("table-name")
    batches = table.select(columns=['c1'], predicate=(_.c2 > 2))
    print(conn.execute("SELECT sum(c1) FROM batches").arrow())

Note: the transaction must be active while the DuckDB query is executing and fetching results using the Python SDK.

VAST Catalog

The VAST Catalog can be queried as a regular table:

import pyarrow as pa
import vastdb

session = vastdb.connect(
    endpoint='http://vip-pool.v123-xy.VastENG.lab',
    access=AWS_ACCESS_KEY_ID,
    secret=AWS_SECRET_ACCESS_KEY)

with session.transaction() as tx:
    table = tx.catalog().select(['element_type']).read_all()
    df = table.to_pandas()

    total_elements = len(df)
    print(f"Total elements in the catalog: {total_elements}")

    file_count = (df['element_type'] == 'FILE').sum()
    print(f"Number of files/objects: {file_count}")

    distinct_elements = df['element_type'].unique()
    print("Distinct element types on the system:")
    print(distinct_elements)
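The pandas post-processing above works on any DataFrame with an element_type column; here it is against a small synthetic frame (a hypothetical stand-in for the catalog select, so it runs without a live cluster):

```python
import pandas as pd

# Synthetic stand-in for tx.catalog().select(['element_type']).read_all().to_pandas()
df = pd.DataFrame({'element_type': ['FILE', 'DIR', 'FILE', 'TABLE', 'FILE']})

total_elements = len(df)
file_count = (df['element_type'] == 'FILE').sum()
distinct_elements = sorted(df['element_type'].unique())

print(total_elements, file_count, distinct_elements)  # 5 3 ['DIR', 'FILE', 'TABLE']
```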

More Information

See these blog posts for more examples:

See also the full VAST DB Python SDK documentation.
