ingesting-data
Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. Use when importing CSV/JSON/Parquet files, pulling from S3/GCS buckets, consuming API feeds, or building ETL pipelines.
Install globally (user-level):
npx skillhub install Albmartinez13/ai-design-components/ingesting-data
Install in current project:
npx skillhub install Albmartinez13/ai-design-components/ingesting-data --project
Suggested path: ~/.claude/skills/ingesting-data/
SKILL.md Content
---
name: ingesting-data
description: Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. Use when importing CSV/JSON/Parquet files, pulling from S3/GCS buckets, consuming API feeds, or building ETL pipelines.
---
# Data Ingestion Patterns
This skill provides patterns for getting data INTO your systems from external sources.
## When to Use This Skill
- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Building ETL/ELT pipelines
- Database migration and CDC (Change Data Capture)
- Streaming data ingestion from Kafka/Kinesis
## Ingestion Pattern Decision Tree
```
What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet) → See file-formats.md
├── REST/GraphQL APIs → See api-feeds.md
├── Streaming (Kafka, Kinesis) → See streaming-sources.md
├── Legacy Database → See database-migration.md
└── Need full ETL framework → See etl-tools.md
```
## Quick Start by Language
### Python (Recommended for ETL)
**dlt (data load tool) - Modern Python ETL:**
```python
import dlt
import requests

# Define a source
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        yield response.json()
    return issues

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data",
)
load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
```
**Polars for file processing (faster than pandas):**
```python
import polars as pl
# Read CSV with schema inference
df = pl.read_csv("data.csv")
# Read Parquet (columnar, efficient)
df = pl.read_parquet("s3://bucket/data.parquet")
# Read JSON lines
df = pl.read_ndjson("events.jsonl")
# Write to database
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append",
)
```
### TypeScript/Node.js
**S3 ingestion:**
```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();
  if (!body) throw new Error(`Empty object: s3://${bucket}/${key}`);
  // Parse CSV
  const records = parse(body, { columns: true, skip_empty_lines: true });
  // Insert to database (db/eventsTable come from your ORM setup, e.g. Drizzle)
  await db.insert(eventsTable).values(records);
}
```
**API feed polling:**
```typescript
import { Hono } from "hono";

// Webhook receiver for real-time ingestion
const app = new Hono();

app.post("/webhooks/stripe", async (c) => {
  const event = await c.req.json();
  // Validate webhook signature
  const signature = c.req.header("stripe-signature");
  // ... validation logic
  // Ingest event (db/stripeEventsTable come from your ORM setup)
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date(),
  });
  return c.json({ received: true });
});
```
### Rust
**High-performance file ingestion:**
```rust
use std::io::Cursor;

use anyhow::Result; // anyhow unifies the S3 and Polars error types behind one Result
use aws_sdk_s3::Client;
use polars::prelude::*;

async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result<DataFrame> {
    // Download from S3
    let resp = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;
    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes)).finish()?;
    Ok(df)
}
```
### Go
**Concurrent file processing:**
```go
package main

import (
	"context"
	"encoding/csv"

	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// ingestCSV handles a single object; call it from a worker pool (e.g. errgroup)
// to process many files concurrently.
func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
	resp, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	reader := csv.NewReader(resp.Body)
	records, err := reader.ReadAll()
	if err != nil {
		return err
	}

	// Batch insert to database (batchInsert is your own bulk-insert helper)
	return batchInsert(ctx, records)
}
```
## Ingestion Patterns
### 1. Batch Ingestion (Files/Storage)
For periodic bulk loads:
```
Source → Extract → Transform → Load → Validate
  ↓         ↓          ↓         ↓        ↓
  S3     Download  Clean/Map  Insert  Count check
```
**Key considerations:**
- Use chunked reading for large files (>100MB)
- Implement idempotency with checksums (see the sketch below)
- Track file processing state
- Handle partial failures
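The sketch below illustrates chunked reading and checksum-based idempotency using only the standard library; `load_batch` is a hypothetical placeholder for your real bulk insert, and the SQLite table stands in for whatever you use to track file processing state:
```python
import csv
import hashlib
import sqlite3
from pathlib import Path

BATCH_SIZE = 5_000  # rows per insert batch; tune for your database

# Tiny state store for processed-file tracking (swap for a table in your warehouse).
state = sqlite3.connect("ingest_state.db")
state.execute("CREATE TABLE IF NOT EXISTS processed_files (checksum TEXT PRIMARY KEY, path TEXT)")

def file_checksum(path: Path) -> str:
    """SHA-256 of the file contents, used as the idempotency key."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for block in iter(lambda: f.read(1 << 20), b""):
            digest.update(block)
    return digest.hexdigest()

def load_batch(rows: list[dict]) -> None:
    """Hypothetical placeholder: replace with your bulk insert (COPY, executemany, ...)."""
    print(f"would insert {len(rows)} rows")

def ingest_csv(path: Path) -> None:
    checksum = file_checksum(path)
    if state.execute("SELECT 1 FROM processed_files WHERE checksum = ?", (checksum,)).fetchone():
        return  # already loaded -> re-runs are no-ops (idempotency)
    with path.open(newline="") as f:
        batch: list[dict] = []
        for row in csv.DictReader(f):  # streams row by row, no full-file load in memory
            batch.append(row)
            if len(batch) >= BATCH_SIZE:
                load_batch(batch)
                batch.clear()
        if batch:
            load_batch(batch)
    # Record success only after the whole file loaded, so partial failures get retried.
    state.execute("INSERT INTO processed_files VALUES (?, ?)", (checksum, str(path)))
    state.commit()
```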
### 2. Streaming Ingestion (Real-time)
For continuous data flow:
```
Source → Buffer → Process → Load → Ack
  ↓        ↓         ↓       ↓      ↓
 Kafka  In-memory Transform  DB   Commit offset
```
**Key considerations:**
- At-least-once vs exactly-once semantics
- Backpressure handling
- Dead letter queues for failures (see the sketch below)
- Checkpoint management
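A minimal at-least-once consumer sketch with confluent-kafka, assuming topic names `events` and `events.dlq` and a hypothetical `load_event` writer; offsets are committed only after a message is handled, and failures are routed to a dead letter queue:
```python
import json

from confluent_kafka import Consumer, Producer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "ingest",
    "enable.auto.commit": False,   # commit manually, only after the message is handled
    "auto.offset.reset": "earliest",
})
producer = Producer({"bootstrap.servers": "localhost:9092"})
consumer.subscribe(["events"])     # assumed source topic

def load_event(event: dict) -> None:
    """Hypothetical placeholder: insert/upsert the event into your database."""

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    try:
        load_event(json.loads(msg.value()))
    except Exception:
        # Dead letter queue: keep the stream moving, inspect failures later.
        producer.produce("events.dlq", msg.value())
        producer.flush()
    # At-least-once: the offset (checkpoint) advances only after handling the message.
    consumer.commit(message=msg)
```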
### 3. API Polling (Feeds)
For external API data:
```
Schedule → Fetch → Dedupe → Load → Update cursor
   ↓         ↓       ↓       ↓          ↓
  Cron    API call  By ID  Insert  Last timestamp
```
**Key considerations:**
- Rate limiting and backoff
- Incremental loading (cursors, timestamps)
- API pagination handling
- Retry with exponential backoff (sketched below)
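A minimal sketch of cursor-based polling with exponential backoff, assuming a hypothetical `https://api.example.com/events` feed that accepts `after`/`limit` parameters and a hypothetical `upsert_by_id` writer that deduplicates on the record ID:
```python
import time

import requests

API_URL = "https://api.example.com/events"   # hypothetical feed endpoint
state = {"cursor": None}                     # persist this between runs in practice

def upsert_by_id(records: list[dict]) -> None:
    """Hypothetical placeholder: insert-or-update on the record's primary key."""

def fetch_page(cursor: str | None) -> dict:
    """Fetch one page, backing off exponentially on rate limits and server errors."""
    delay = 1.0
    for _ in range(5):
        resp = requests.get(API_URL, params={"after": cursor, "limit": 100}, timeout=30)
        if resp.status_code in (429, 500, 502, 503):
            time.sleep(delay)
            delay *= 2                       # exponential backoff
            continue
        resp.raise_for_status()
        return resp.json()
    raise RuntimeError("gave up after repeated failures")

def poll_once() -> None:
    while True:                              # walk pages until the feed is drained
        page = fetch_page(state["cursor"])
        records = page.get("data", [])
        if not records:
            break
        upsert_by_id(records)
        state["cursor"] = records[-1]["id"]  # advance the cursor only after a successful load
```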
### 4. Change Data Capture (CDC)
For database replication:
```
Source DB → Capture changes → Transform → Target DB
    ↓             ↓               ↓           ↓
 Postgres    Debezium/WAL     Map schema  Insert/Update
```
**Key considerations:**
- Initial snapshot + streaming changes (change-event handling sketched below)
- Schema evolution handling
- Ordering guarantees
- Conflict resolution
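As an illustration of the streaming half, the sketch below applies Debezium-style change events (the `op`/`before`/`after` envelope) to a target table, assuming the JSON converter without schema wrapping (otherwise unwrap `payload` first); `upsert` and `delete` are hypothetical writers keyed on the table's primary key:
```python
def upsert(row: dict) -> None:
    """Hypothetical placeholder: INSERT ... ON CONFLICT UPDATE on the primary key."""

def delete(row: dict) -> None:
    """Hypothetical placeholder: DELETE by primary key."""

def apply_change(event: dict) -> None:
    """Apply one Debezium-style change event to the target table."""
    op = event["op"]              # "c"=create, "u"=update, "d"=delete, "r"=snapshot read
    if op in ("c", "r", "u"):
        upsert(event["after"])    # new row image
    elif op == "d":
        delete(event["before"])   # last row image before deletion
```
Per-key ordering holds as long as events for the same key are read from the same partition and applied in offset order.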
## Library Recommendations
| Use Case | Python | TypeScript | Rust | Go |
|----------|--------|------------|------|-----|
| **ETL Framework** | dlt, Meltano, Dagster | - | - | - |
| **Cloud Storage** | boto3, gcsfs, adlfs | @aws-sdk/*, @google-cloud/* | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| **File Processing** | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| **Streaming** | confluent-kafka, aiokafka | kafkajs | rdkafka-rs | franz-go, sarama |
| **CDC** | Debezium, pglogical | - | - | - |
## Reference Documentation
- `references/cloud-storage.md` - S3, GCS, Azure Blob patterns
- `references/file-formats.md` - CSV, JSON, Parquet, Excel handling
- `references/api-feeds.md` - REST polling, webhooks, GraphQL subscriptions
- `references/streaming-sources.md` - Kafka, Kinesis, Pub/Sub
- `references/database-migration.md` - Schema migration, CDC patterns
- `references/etl-tools.md` - dlt, Meltano, Airbyte, Fivetran
## Scripts
- `scripts/validate_csv_schema.py` - Validate CSV against expected schema
- `scripts/test_s3_connection.py` - Test S3 bucket connectivity
- `scripts/generate_dlt_pipeline.py` - Generate dlt pipeline scaffold
## Chaining with Database Skills
After ingestion, chain to appropriate database skill:
| Destination | Chain to Skill |
|-------------|----------------|
| PostgreSQL, MySQL | `databases-relational` |
| MongoDB, DynamoDB | `databases-document` |
| Qdrant, Pinecone | `databases-vector` (after embedding) |
| ClickHouse, TimescaleDB | `databases-timeseries` |
| Neo4j | `databases-graph` |
For vector databases, chain through `ai-data-engineering` for embedding:
```
ingesting-data → ai-data-engineering → databases-vector
```