The CloudSDP library is designed to simplify the creation and management of serverless data pipelines between Google Cloud Run and Google BigQuery. It provides a developer-friendly interface to extract data from various sources, transform it, and seamlessly load it into BigQuery tables, all while leveraging the power of serverless architecture.
WIP:
TODO:
Install the library using pip:
pip install cloudsdp
Alternatively, install the library with Poetry:
poetry add cloudsdp
Ingest data from a pandas DataFrame:
import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition

PROJECT_NAME = "project_name"


def main():
    """Create a BigQuery table, load a pandas DataFrame into it, then clean up.

    The dataset created here is always deleted at the end, even if ingestion
    fails, so repeated runs of the example do not leave resources behind.
    """
    bq = BigQuery(PROJECT_NAME)

    dataset_name = "dataset_1"
    table_name = "table_1"

    # 10,000 synthetic rows: "Name0".."Name9999" paired with scores 0..9999.
    data = {
        "name": [f"Name{i}" for i in range(10000)],
        "score": list(range(10000)),
    }
    df = pd.DataFrame(data)

    # Schema entries use the keyword names accepted by create_table.
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    try:
        bq.create_table(table_name, data_schema, dataset_name)
        bq.ingest_from_dataframe(
            df,
            dataset_name,
            table_name,
            write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY,
        )
    finally:
        # Remove the example dataset together with the table inside it.
        bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
Ingest data from a list of Python dicts:
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    """Create a BigQuery table, stream a list of dicts into it, then clean up.

    Any per-row errors returned by ``ingest_rows_json`` are printed. The
    dataset is always deleted at the end, even if a step fails.
    """
    bq = BigQuery(PROJECT_NAME)

    dataset_name = "dataset_1"
    table_name = "table_1"

    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    try:
        bq.create_table(table_name, data_schema, dataset_name)

        errors = bq.ingest_rows_json(data, dataset_name, table_name)
        if errors:
            # Error entries may be mappings rather than strings (BigQuery's
            # insert_rows_json reports row errors as dicts), so stringify each
            # one before joining; str.join would raise TypeError otherwise.
            print("Errors", ";".join(str(error) for error in errors))
    finally:
        # Remove the example dataset together with the table inside it.
        bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
Ingest data from CSV files stored in Google Cloud Storage (GCS):
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    """Create a BigQuery table, load CSV files from GCS into it, then clean up.

    The CSVs are expected to have one header row, which is skipped, and to
    match the explicit schema below (schema autodetection is disabled). The
    dataset is always deleted at the end, even if the load fails.
    """
    bq = BigQuery(PROJECT_NAME)

    dataset_name = "dataset_1"
    table_name = "table_1"

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    try:
        bq.create_table(table_name, data_schema, dataset_name)

        csv_uris = [
            "gs://mybucket/name_age_data_1.csv",
            "gs://mybucket/name_age_data_2.csv",
        ]
        result = bq.ingest_csvs_from_cloud_bucket(
            csv_uris,
            dataset_name,
            table_name,
            skip_leading_rows=1,
            autodetect_schema=False,
            timeout=120,
        )
        print(result)
    finally:
        # Remove the example dataset together with the table inside it.
        bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()