DeclareData Fuse

Get Started in ≤1 Minute

Move your PySpark code from costly Spark clusters to DeclareData Fuse in minutes. Our mission is to cut unnecessary complexity and simplify your data processing workflows.

Overview

DeclareData Fuse is a drop-in alternative for running PySpark code without the overhead and cost of Spark clusters. It runs on your local machine as well as in production, provides the same APIs and functionality as PySpark, and is optimized for datasets under 5 TB.

Why Use Fuse vs Apache Spark?

  • 🚀 Process terabyte-scale workflows faster locally and in production
  • 💰 Cut cloud costs by 50%+ without compromising speed
  • 😄 Accelerate data engineering workflows, increase happiness
  • ✨ Keep your existing PySpark codebase

Usage

Your existing PySpark code should work without modification. There is no need to learn new APIs or refactor your codebase.


# Before: from pyspark.sql import SparkSession
# Before: from pyspark.sql import functions as F
# Before: from pyspark.sql.window import Window

# After: only update the imports
from fuse_python.session import session
import fuse_python.functions as F
from fuse_python.window import Window

# Your existing PySpark code works unchanged
spark = session.builder.getOrCreate()

# Rest of your code stays exactly the same
# S3 support also built-in
data = [
    ("CA", "Los Angeles", 2023, 100),
    ("CA", "San Francisco", 2023, 80),
    ("NY", "New York City", 2023, 150),
    ("NY", "Buffalo", 2023, 70)
]

df = spark.createDataFrame(data, ["state", "city", "year", "incidents"])

window_spec = Window.partitionBy("state") \
                   .orderBy(F.desc("incidents")) \
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow)

result = df.withColumn("running_total", F.sum("incidents").over(window_spec)) \
          .withColumn("rank", F.row_number().over(window_spec)) \
          .orderBy("state", F.desc("incidents"))

result.show()
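To make the window semantics above concrete, here is a plain-Python sketch (no Fuse or Spark required, and not part of Fuse's API) of what the running total and rank compute within each state partition:

```python
# Plain-Python sketch of the window spec above: within each state,
# rows are ordered by incidents (descending), then a running total
# and a row-number rank are accumulated row by row.
from itertools import groupby

data = [
    ("CA", "Los Angeles", 2023, 100),
    ("CA", "San Francisco", 2023, 80),
    ("NY", "New York City", 2023, 150),
    ("NY", "Buffalo", 2023, 70),
]

rows = []
# Sort first so groupby sees each state as one contiguous run
for state, group in groupby(sorted(data), key=lambda r: r[0]):
    running = 0
    # Order the partition by incidents, descending (like F.desc("incidents"))
    for rank, (st, city, year, incidents) in enumerate(
        sorted(group, key=lambda r: -r[3]), start=1
    ):
        running += incidents
        rows.append((st, city, incidents, running, rank))

for row in rows:
    print(row)
```

The final DataFrame from `result.show()` should carry the same `running_total` and `rank` values per state.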

More Examples

Complex operations like window functions work seamlessly. Here's an example calculating running totals within categories:


from fuse_python.session import session
import fuse_python.functions as F
from fuse_python.window import Window

spark = session.builder.getOrCreate()

# Sample e-commerce data
data = [
    # (date, category, product, price, quantity, customer_id, country)
    ("2023-01-01", "Electronics", "Laptop", 1200.00, 1, "C1", "USA"),
    ("2023-01-01", "Electronics", "Phone", 800.00, 2, "C2", "Canada"),
    ("2023-01-02", "Books", "Python Guide", 50.00, 3, "C1", "USA"),
    ("2023-01-02", "Electronics", "Laptop", 1200.00, 1, "C3", "USA"),
    ("2023-01-03", "Books", "Data Science", 45.00, 2, "C2", "Canada"),
    ("2023-02-01", "Electronics", "Phone", 800.00, 1, "C4", "Mexico")
]

# Create DataFrame with multiple columns
df = spark.createDataFrame(data, [
    "date", "category", "product", "price", 
    "quantity", "customer_id", "country"
])

# Multiple transformations
result = df \
    .withColumn("date", F.to_date("date")) \
    .withColumn("month", F.date_format("date", "yyyy-MM")) \
    .withColumn("revenue", F.col("price") * F.col("quantity")) \
    .withColumn("discount", F.when(F.col("quantity") > 1, 0.1).otherwise(0)) \
    .withColumn("final_revenue", 
                F.col("revenue") * (1 - F.col("discount")))

# Complex windowing
customer_window = Window.partitionBy("customer_id") \
                       .orderBy("date") \
                       .rowsBetween(Window.unboundedPreceding, Window.currentRow)

category_window = Window.partitionBy("category", "month") \
                       .orderBy(F.desc("revenue"))

# Apply multiple window functions
analysis = result \
    .withColumn("customer_total_spend", 
                F.sum("final_revenue").over(customer_window)) \
    .withColumn("category_rank", 
                F.dense_rank().over(category_window)) \
    .withColumn("category_revenue_pct", 
                F.col("revenue") / F.sum("revenue").over(Window.partitionBy("category"))) \
    .withColumn("prev_purchase_date", 
                F.lag("date").over(Window.partitionBy("customer_id").orderBy("date")))

# Complex aggregation
summary = analysis \
    .groupBy("category", "month") \
    .agg(
        F.count("*").alias("total_sales"),
        F.sum("revenue").alias("total_revenue"),
        F.avg("revenue").alias("avg_revenue"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.collect_list("product").alias("products_sold")
    ) \
    .where(F.col("total_revenue") > 1000) \
    .orderBy(F.desc("total_revenue"))

# Show results
print("Detailed Sales Analysis:")
summary.show(truncate=False)

# Advanced filtering and selection
top_customers = analysis \
    .where((F.col("country").isin("USA", "Canada")) & 
           (F.col("category_rank") == 1)) \
    .select(
        "customer_id",
        "category",
        "month",
        F.round("customer_total_spend", 2).alias("total_spend"),
        F.datediff("date", "prev_purchase_date").alias("days_between_purchases")
    ) \
    .orderBy(F.desc("total_spend"))

print("\nTop Customers by Category:")
top_customers.show()
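The revenue and discount rule used in the pipeline above (revenue is price times quantity, with 10% off when quantity is greater than 1) can be checked with plain Python, independent of Fuse. The `final_revenue` helper below is illustrative only, not part of Fuse's API:

```python
# Plain-Python check of the revenue/discount rule in the example:
# revenue = price * quantity; a 10% discount applies when quantity > 1.
def final_revenue(price: float, quantity: int) -> float:
    revenue = price * quantity
    discount = 0.1 if quantity > 1 else 0.0
    return revenue * (1 - discount)

# Mirrors the first three sample rows
print(final_revenue(1200.00, 1))  # single laptop, no discount
print(final_revenue(800.00, 2))   # two phones, 10% off
print(final_revenue(50.00, 3))    # three books, 10% off
```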

Install

Fuse is a standalone server binary and client library that you can install on your local machine. It requires no additional dependencies and is designed to work with Python 3.10+.

$ curl -LsSf https://declaredata.com/fuse/install.sh | sh

Deploy

Deploying your code is as simple as running it locally. (This section is a work in progress.)


from fuse_python.session import session
import fuse_python.functions as F
from contextlib import contextmanager