Example Integrations
Cluvio Integration
import os
from secodadk import SecodaIntegration, Resource, DeclaredLineage, InternalResource
# Root URL of the Cluvio API; the sign-in and dashboards endpoint paths are appended to it.
BASE_URL = "https://api.cluvio.com"
class FakeCluvioIntegration(SecodaIntegration):
    """Example integration that declares Cluvio dashboards as Secoda resources.

    All dashboards are placed under one dashboard group named "default".
    """

    def extract(self):
        """Sign in to Cluvio and declare its dashboards.

        Steps:
            1. POST the ``email`` / ``password`` entries of ``self.credentials``
               to ``/users/sign_in`` and read ``token`` from the JSON reply.
            2. Declare a single ``dashboard_group`` resource titled "default".
            3. GET ``/dashboards/`` and, for every entry of the reply's
               ``data`` list, declare a ``dashboard`` resource parented to the
               group plus a lineage edge from the dashboard to the group.

        Raises:
            RuntimeError: If the sign-in reply contains no token.
        """
        # NOTE(review): http_post/http_get presumably return requests-style
        # responses (they are only used through ``.json()`` here) - confirm.
        sign_in_response = self.http_post(
            f"{BASE_URL}/users/sign_in",
            json={
                "user": {
                    "email": self.credentials.get("email"),
                    "password": self.credentials.get("password"),
                }
            },
            headers={"Accept": "application/json"},
        )
        token = sign_in_response.json().get("token")
        if not token:
            # Fail fast: otherwise the dashboards request below would be sent
            # with a ``None`` token and a failed login would look like an
            # empty extraction.
            raise RuntimeError(
                "Cluvio sign-in did not return a token; "
                "check the email and password credentials."
            )

        # Keyword arguments shared by every authenticated request.
        auth = dict(
            headers={"token": token},
            verify=True,
        )

        group_databuilder_id = "dashboard_group.default"
        self.declare_resource(
            Resource(
                entity_type="dashboard_group",
                databuilder_id=group_databuilder_id,
                title="default",
            )
        )

        dashboards_response = self.http_get(
            f"{BASE_URL}/dashboards/",
            **auth,
        )
        for dashboard in dashboards_response.json().get("data", []):
            dashboard_databuilder_id = f"dashboard.{dashboard.get('id')}"
            # Look the attributes mapping up once instead of once per field.
            attributes = dashboard.get("attributes", {})
            self.declare_resource(
                Resource(
                    entity_type="dashboard",
                    databuilder_id=dashboard_databuilder_id,
                    title=attributes.get("name"),
                    description=attributes.get("description"),
                    parent_databuilder_id=group_databuilder_id,
                    external_updated_at=attributes.get("updated_at"),
                    product="cluvio",
                    native_type="dashboard",
                )
            )
            self.declare_lineage(
                DeclaredLineage(
                    from_identifier=InternalResource(
                        databuilder_id=dashboard_databuilder_id
                    ),
                    to_identifier=InternalResource(
                        databuilder_id=group_databuilder_id
                    ),
                )
            )
if __name__ == "__main__":
    # Script entry point: credentials are read from the EMAIL and PASSWORD
    # environment variables (either may be None when unset).
    env_credentials = {
        "email": os.environ.get("EMAIL"),
        "password": os.environ.get("PASSWORD"),
    }
    integration = FakeCluvioIntegration(credentials=env_credentials)
    integration.start()
Last updated