CloudSDP

codecov test

CloudSDP Library

The CloudSDP library is designed to simplify the creation and management of serverless data pipelines between Google Cloud Run and Google BigQuery. It provides a developer-friendly interface to extract data from various sources, transform it, and seamlessly load it into BigQuery tables, all while leveraging the power of serverless architecture.

Features

WIP:

TODO:

Installation

Install the library using pip:

pip install cloudsdp

Or, install the library using poetry:

poetry add cloudsdp

QuickStart

Data Ingestion

Create a dataset, ingest data, and clean up

Ingest data from a pandas dataframe:

import os
import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition


PROJECT_NAME = "project_name"


def main():
    """Create a dataset and table, ingest a generated DataFrame, then clean up.

    Builds a 10,000-row DataFrame with a string ``name`` column and an integer
    ``score`` column, creates ``dataset_1`` / ``table_1`` through the
    ``BigQuery`` wrapper for ``PROJECT_NAME``, ingests the frame, and finally
    deletes the dataset together with its contents.
    """
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    # Single source of truth for the sample size so both columns stay aligned.
    row_count = 10000
    data = {
        "name": [f"Name{el}" for el in range(row_count)],
        "score": list(range(row_count)),
    }
    df = pd.DataFrame(data)

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    bq.ingest_from_dataframe(
        df,
        dataset_name,
        table_name,
        write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY,
    )

    # Remove the demo dataset (and the table inside it) once ingestion is done.
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)

Ingest data from a list of Python dicts:

import os

from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    """Create a dataset and table, ingest rows from a list of dicts, then clean up.

    Creates ``dataset_1`` / ``table_1`` through the ``BigQuery`` wrapper for
    ``PROJECT_NAME``, inserts two ``{"name", "age"}`` rows with
    ``ingest_rows_json``, prints any reported errors, and finally deletes the
    dataset together with its contents.
    """
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)

    bq.create_table(table_name, data_schema, dataset_name)

    errors = bq.ingest_rows_json(data, dataset_name, table_name)
    if errors:
        # NOTE(review): the error entries may be mappings rather than strings
        # (as with google-cloud-bigquery's insert_rows_json) -- confirm against
        # ingest_rows_json. str() keeps the join valid for either shape.
        print("Errors", ";".join(str(error) for error in errors))

    # Remove the demo dataset (and the table inside it) once ingestion is done.
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)

Ingest data from CSV files stored in GCS:


import os

from cloudsdp.api.bigquery import BigQuery


PROJECT_NAME = "project_name"


def main():
    """Load two CSV files from a GCS bucket into a new table, then clean up.

    Creates ``dataset_1`` / ``table_1`` through the ``BigQuery`` wrapper for
    ``PROJECT_NAME``, ingests the listed ``gs://`` URIs with
    ``ingest_csvs_from_cloud_bucket``, prints whatever that call returns, and
    finally deletes the dataset together with its contents.
    """
    dataset = "dataset_1"
    table = "table_1"
    schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]
    uris = [
        "gs://mybucket/name_age_data_1.csv",
        "gs://mybucket/name_age_data_2.csv",
    ]
    # Options forwarded unchanged as keyword arguments to the ingestion call.
    load_options = {
        "skip_leading_rows": 1,
        "autodetect_schema": False,
        "timeout": 120,
    }

    client = BigQuery(PROJECT_NAME)
    client.create_dataset(dataset)
    client.create_table(table, schema, dataset)

    outcome = client.ingest_csvs_from_cloud_bucket(uris, dataset, table, **load_options)
    print(outcome)

    # Drop the demo dataset and everything in it.
    client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)