Example Integrations

Example 1: Cluvio

import os

from secodadk import SecodaIntegration, Resource, DeclaredLineage, InternalResource

BASE_URL = "https://api.cluvio.com"


class FakeCluvioIntegration(SecodaIntegration):
    """Example integration that catalogs Cluvio dashboards.

    Reads ``email`` and ``password`` from ``self.credentials``, signs in to
    obtain an API token, declares a single ``default`` dashboard group, and
    declares each dashboard returned by ``/dashboards/`` as a child of that
    group.
    """

    def extract(self):
        """Sign in to Cluvio and declare the dashboard group and its dashboards.

        Raises:
            RuntimeError: if the sign-in response contains no ``token``.
        """
        token = (
            self.http_post(
                f"{BASE_URL}/users/sign_in",
                json={
                    "user": {
                        "email": self.credentials.get("email"),
                        "password": self.credentials.get("password"),
                    }
                },
                headers={"Accept": "application/json"},
            )
            .json()
            .get("token")
        )
        # Every later request authenticates with this token, so fail here with
        # a clear message instead of continuing with a missing token.
        if not token:
            raise RuntimeError(
                "Cluvio sign-in returned no token; "
                "check the email/password credentials."
            )

        auth = dict(
            headers={"token": token},
            verify=True,
        )

        # All dashboards are attached to this one synthetic group.
        group_databuilder_id = "dashboard_group.default"
        self.declare_resource(
            Resource(
                entity_type="dashboard_group",
                databuilder_id=group_databuilder_id,
                title="default",
            )
        )

        # Treat a missing or null "data" field as "no dashboards".
        dashboards = (
            self.http_get(f"{BASE_URL}/dashboards/", **auth).json().get("data")
            or []
        )
        for dashboard in dashboards:
            dashboard_databuilder_id = f"dashboard.{dashboard.get('id')}"
            # Likewise tolerate a missing or null "attributes" object.
            attributes = dashboard.get("attributes") or {}

            self.declare_resource(
                Resource(
                    entity_type="dashboard",
                    databuilder_id=dashboard_databuilder_id,
                    title=attributes.get("name"),
                    description=attributes.get("description"),
                    parent_databuilder_id=group_databuilder_id,
                    external_updated_at=attributes.get("updated_at"),
                    product="cluvio",
                    native_type="dashboard",
                )
            )

            # Also record lineage from each dashboard to the default group.
            self.declare_lineage(
                DeclaredLineage(
                    from_identifier=InternalResource(
                        databuilder_id=dashboard_databuilder_id
                    ),
                    to_identifier=InternalResource(databuilder_id=group_databuilder_id),
                )
            )


if __name__ == "__main__":
    # Read the Cluvio login from the environment so secrets stay out of source.
    cluvio_credentials = {
        "email": os.environ.get("EMAIL"),
        "password": os.environ.get("PASSWORD"),
    }
    integration = FakeCluvioIntegration(credentials=cluvio_credentials)
    integration.start()

Example 2: Google Cloud Storage

See the webinar that reviews this example here.

import os
import csv
import json
from io import StringIO
from secodadk import SecodaIntegration, Resource, DeclaredLineage, InternalResource, ExternalTable
from google.cloud import storage

class GoogleCloudStorageIntegration(SecodaIntegration):
    """Catalog CSV files stored in a Google Cloud Storage bucket.

    Objects are mapped onto Secoda resources by path depth:

    * ``<database>/``                     -> ``database`` (native type "folder")
    * ``<database>/<schema>/``            -> ``schema`` (native type "sub-folder")
    * ``<database>/<schema>/<table>.csv`` -> ``table`` (native type "csv"),
      one ``column`` per header field, and lineage to an external table

    Every other object is skipped: files at the first two levels, non-CSV
    objects at the third level, and anything nested deeper.

    Credentials read from ``self.credentials``:

    * ``json_file``   -- the service-account key, as a JSON string
    * ``bucket_name`` -- the name of the bucket to scan
    """

    # Object-name suffix (matched case-insensitively) that marks a table.
    CSV_SUFFIX = ".csv"

    def extract(self):
        """List every object in the bucket and declare the matching resources."""
        json_key = json.loads(self.credentials.get("json_file"))
        storage_client = storage.Client.from_service_account_info(json_key)

        bucket_name = self.credentials.get("bucket_name")
        bucket = storage_client.get_bucket(bucket_name)

        # Iterate the listing directly rather than copying it into a list first.
        for blob in bucket.list_blobs():
            # Sample blob names:
            # 2023/
            # 2023/montreal/
            # 2023/montreal/montreal_sales_data.csv

            # Folder placeholder objects have names ending in "/"; only those
            # are catalogued as databases/schemas (native type "folder").
            is_folder = blob.name.endswith("/")
            parts = [part for part in blob.name.split("/") if part]

            if len(parts) == 1 and is_folder:
                # Database
                self.declare_resource(
                    Resource(
                        title=parts[0],
                        entity_type="database",
                        native_type="folder",
                        databuilder_id=f"{bucket_name}.{parts[0]}",
                    )
                )
            elif len(parts) == 2 and is_folder:
                # Schema
                self.declare_resource(
                    Resource(
                        title=parts[1],
                        entity_type="schema",
                        native_type="sub-folder",
                        database=parts[0],
                        databuilder_id=f"{bucket_name}.{parts[0]}.{parts[1]}",
                    )
                )
            elif len(parts) == 3 and blob.name.lower().endswith(self.CSV_SUFFIX):
                # Table, plus its columns and lineage
                self._declare_csv_table(blob, bucket_name, parts[0], parts[1], parts[2])

    def _declare_csv_table(self, blob, bucket_name, database, schema, file_name):
        """Declare one CSV object as a table, its header columns, and its lineage."""
        # The caller guarantees the name ends with CSV_SUFFIX, so strip exactly
        # that suffix rather than a fixed number of characters.
        table_name = file_name[: -len(self.CSV_SUFFIX)]
        table_id = f"{bucket_name}.{database}.{schema}.{table_name}"

        self.declare_resource(
            Resource(
                title=table_name,
                entity_type="table",
                native_type="csv",
                database=database,
                schema=schema,
                databuilder_id=table_id,
                description=f"CSV data from pop up sales. Can be found here: {blob.public_url}",
            )
        )

        self.declare_lineage(
            DeclaredLineage(
                from_identifier=InternalResource(databuilder_id=table_id),
                to_identifier=ExternalTable(
                    database="secoda-web",
                    # NOTE(review): presumably the downstream dataset name
                    # cannot contain "-", hence the substitution — confirm.
                    schema=bucket_name.replace("-", "_"),
                    table=table_name,
                ),
            )
        )

        # "utf-8-sig" drops a leading byte-order mark if present, so it is not
        # glued onto the first column name; otherwise it decodes like "utf-8".
        csv_string = blob.download_as_text(encoding="utf-8-sig")
        csv_data = csv.reader(StringIO(csv_string), delimiter=",")
        # An empty object has no header row; a bare next() would raise
        # StopIteration and abort the whole extract.
        header = next(csv_data, [])

        for field in header:
            # Column
            self.declare_resource(
                Resource(
                    title=field,
                    entity_type="column",
                    native_type="field",
                    database=database,
                    schema=schema,
                    table=table_name,
                    parent_databuilder_id=table_id,
                    databuilder_id=f"{table_id}.{field}",
                )
            )

if __name__ == "__main__":
    # The service-account key (JSON text) and target bucket come from the environment.
    gcs_credentials = {
        "json_file": os.environ.get("json_file"),
        "bucket_name": os.environ.get("bucket_name"),
    }
    integration = GoogleCloudStorageIntegration(credentials=gcs_credentials)
    integration.start()

Last updated