Building an Options Trading Platform with Temporal Workflows

by Adam

How I use Temporal to orchestrate backtesting pipelines that process years of market data across multiple ML experts in parallel.

Tags: homelab, temporal, trading, python, workflow-orchestration, options, backtesting, machine-learning

The Problem with Trading Backtests

Options trading backtests are computationally brutal. You’re not just running a strategy against historical prices – you’re processing years of OHLCV data, computing 43+ technical indicators per day, running multiple signal-generating algorithms, applying risk policies, simulating order execution with realistic commissions, and tracking position P&L through time.

Do this sequentially for SPY from 2020 to 2024, and you’re looking at hours of runtime. Do it wrong, and you’re debugging why your backtest thinks it made $10M when it actually lost money on every trade.

I built an options trading platform that needed to:

  • Process 1,000+ trading days per backtest
  • Run 5 different “expert” algorithms (rule-based, ML, LLM-powered)
  • Execute data ingestion, feature computation, and signal generation in parallel
  • Handle position management with realistic constraints (commissions, position limits, capital management)
  • Support isolation between backtest runs (no data leakage)
  • Resume from failures without re-processing completed work

Redis Queue wasn’t going to cut it. Enter Temporal.

Architecture Overview

The platform uses a microservices architecture where each component runs as a separate Python process, all orchestrated by Temporal:

+---------------------------------------------------------------------+
|                           TEMPORAL SERVER                           |
|            (persists every workflow step to PostgreSQL)             |
+---------------------------------------------------------------------+
                                   |
                 +-----------------+-----------------+
                 |                                   |
           +-----v-----+                       +-----v-----+
           |  Worker   |                       |  Worker   |
           | (Python)  |                       | (Python)  |
           +-----+-----+                       +-----+-----+
                 |                                   |
     +-----------+-----------+           +-----------+-----------+
     |           |           |           |           |           |
+----v----+ +----v----+ +----v----+ +----v----+ +----v----+ +----v----+
| Ingest  | | Feature | | Signal  | | Policy  | | Execute | | Manage  |
|  Data   | | Compute | |   Gen   | | Engine  | | Orders  | |Positions|
+---------+ +---------+ +---------+ +---------+ +---------+ +---------+
                             |
                 +-----+-----+-----+-----+
                 |     |     |     |     |
               +-v-+ +-v-+ +-v-+ +-v-+ +-v-+
               |ML | |Fn | |AI | |UB | |EW |
               +---+ +---+ +---+ +---+ +---+
                  (5 Expert Algorithms)

The key insight: phases 1-3 are embarrassingly parallel (each day is independent), while phase 4 must be sequential (portfolio state depends on previous days).

The TradingWorkflow

Here’s the core workflow definition. Notice how it orchestrates activities in phases:

import asyncio
from datetime import datetime, timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from services.workflows.activities import (
        clear_backtest_data, ingest_data, compute_features,
        generate_signals, check_signals_exist, apply_policy,
        execute_orders, manage_positions
    )

def chunked(seq, size):
    """Split a list into fixed-size batches (caps parallel activity fan-out)."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

@workflow.defn
class TradingWorkflow:
    @workflow.run
    async def run(
        self,
        ticker: str,
        start_date: str,
        end_date: str,
        mode: str,
        experts: list = None,
        config_dict: dict = None
    ) -> str:
        # Extract backtest_id from workflow_id for isolation
        workflow_id = workflow.info().workflow_id
        backtest_id = workflow_id.replace("backtest-", "")

        # Default config if not provided
        if config_dict is None:
            config_dict = {
                "initial_capital": 20000.0,
                "commission_per_contract": 0.65,
                "min_hold_days": 5,
                "max_trades_per_day": 2,
                "max_open_positions": 5
            }

        # Generate date list
        start = datetime.strptime(start_date, "%Y-%m-%d").date()
        end = datetime.strptime(end_date, "%Y-%m-%d").date()
        dates = []
        current = start
        while current <= end:
            dates.append(current.strftime("%Y-%m-%d"))
            current += timedelta(days=1)

        BATCH_SIZE = 30  # Cap fan-out so workflow event history stays manageable

        # Phase 0: Clear previous backtest data
        await workflow.execute_activity(
            clear_backtest_data,
            args=[ticker, start_date, end_date, backtest_id],
            start_to_close_timeout=timedelta(minutes=5)
        )

        # Phase 1: Parallel Ingestion
        for batch in chunked(dates, BATCH_SIZE):
            await asyncio.gather(*[
                workflow.execute_activity(
                    ingest_data,
                    args=[ticker, d],
                    start_to_close_timeout=timedelta(minutes=60)
                ) for d in batch
            ])

        # Phase 2: Parallel Feature Computation
        for batch in chunked(dates, BATCH_SIZE):
            await asyncio.gather(*[
                workflow.execute_activity(
                    compute_features,
                    args=[ticker, d],
                    start_to_close_timeout=timedelta(minutes=60)
                ) for d in batch
            ])

        # Phase 3: Parallel Signal Generation
        for batch in chunked(dates, BATCH_SIZE):
            await asyncio.gather(*[
                workflow.execute_activity(
                    generate_signals,
                    args=[ticker, d, experts, backtest_id],
                    start_to_close_timeout=timedelta(minutes=60)
                ) for d in batch
            ])

        # Phase 4: Sequential Policy & Execution
        for d in dates:
            # Check for positions to close (holding period, stop-loss, profit target)
            await workflow.execute_activity(
                manage_positions,
                args=[ticker, d, config_dict["min_hold_days"], config_dict, backtest_id],
                start_to_close_timeout=timedelta(minutes=30)
            )

            # Only process days with signals
            has_signals = await workflow.execute_activity(
                check_signals_exist,
                args=[ticker, d, backtest_id],
                start_to_close_timeout=timedelta(minutes=5)
            )

            if has_signals:
                await workflow.execute_activity(
                    apply_policy,
                    args=[ticker, d, config_dict, backtest_id],
                    start_to_close_timeout=timedelta(minutes=60)
                )
                await workflow.execute_activity(
                    execute_orders,
                    args=[ticker, d, config_dict, backtest_id],
                    start_to_close_timeout=timedelta(minutes=60)
                )

        # Phase 5: Close remaining positions
        await workflow.execute_activity(
            manage_positions,
            args=[ticker, dates[-1], 0, config_dict, backtest_id],
            start_to_close_timeout=timedelta(minutes=30)
        )

        return f"Backtest Complete"

The backtest_id parameter is critical – it isolates each backtest run, preventing one run's data from polluting another. Without it, two concurrent backtests on SPY would mix their signals and positions together.

Activities: Where the Work Happens

Temporal activities are the actual work units. They can call external services, write to databases, and take as long as needed. Here’s the signal generation activity that runs 5 expert algorithms in parallel:

from temporalio import activity
import asyncio

ALL_EXPERTS = ["function", "ml", "agent", "ultra_bull", "elliott_wave"]

@activity.defn
async def generate_signals(
    ticker: str,
    asof_date: str,
    experts: list = None,
    backtest_id: str = None
) -> str:
    if not experts:
        experts = ALL_EXPERTS

    procs = {}

    if "function" in experts:
        cmd = ["python", "services/algos/function-expert/main.py",
               "--ticker", ticker, "--date", asof_date]
        if backtest_id:
            cmd.extend(["--backtest-id", backtest_id])
        procs["function"] = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

    if "ml" in experts:
        cmd = ["python", "services/algos/ml-expert/main.py",
               "--ticker", ticker, "--date", asof_date]
        if backtest_id:
            cmd.extend(["--backtest-id", backtest_id])
        procs["ml"] = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

    # ... similar for agent, ultra_bull, elliott_wave

    await asyncio.gather(*[p.wait() for p in procs.values()])

    failed = [name for name, proc in procs.items() if proc.returncode != 0]
    if failed:
        raise Exception(f"Expert(s) failed: {', '.join(failed)}")

    return f"Generated signals for {ticker} on {asof_date}"

Each expert is a separate Python process. The function expert uses rule-based technical analysis. The ML expert runs LightGBM models. The agent expert calls OpenAI’s API. They all run in parallel, and if any fails, the activity throws an exception that Temporal will retry.
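
One detail the workflow snippet glosses over: retry behavior is configurable per activity call. By default Temporal retries failed activities indefinitely with exponential backoff; here's a sketch of how the call site could cap that (the specific values are illustrative, not necessarily what the platform ships with):

from datetime import timedelta
from temporalio.common import RetryPolicy

await workflow.execute_activity(
    generate_signals,
    args=[ticker, d, experts, backtest_id],
    start_to_close_timeout=timedelta(minutes=60),
    retry_policy=RetryPolicy(
        initial_interval=timedelta(seconds=10),    # wait before the first retry
        backoff_coefficient=2.0,                   # then 20s, 40s, 80s...
        maximum_attempts=5,                        # give up after five tries
        non_retryable_error_types=["ValueError"],  # bad inputs won't fix themselves
    ),
)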

Caching: Don’t Recompute What You’ve Already Computed

Data ingestion and feature computation are deterministic – running them twice for the same ticker/date produces identical results. So I built a Redis-backed caching layer:

import json
from datetime import datetime

class BacktestCache:
    VERSION = "v7"  # Bump when algo logic changes
    PREFIX = "backtest"

    def __init__(self, db=None):
        self.db = db or get_cache()  # falls back to the shared Redis client

    def _make_key(self, activity: str, ticker: str, date: str, extra: str = "") -> str:
        parts = [self.PREFIX, activity, ticker, date, self.VERSION]
        if extra:
            parts.append(extra)
        return ":".join(parts)

    def is_completed(self, activity: str, ticker: str, date: str, extra: str = "") -> bool:
        if self.db is None:
            return False
        key = self._make_key(activity, ticker, date, extra)
        return bool(self.db.exists(key))

    def mark_completed(self, activity: str, ticker: str, date: str,
                       result: str = "ok", ttl: int = None, extra: str = "") -> None:
        key = self._make_key(activity, ticker, date, extra)
        data = {"result": result, "completed_at": datetime.now().isoformat()}
        if ttl:
            self.db.setex(key, ttl, json.dumps(data))
        else:
            self.db.set(key, json.dumps(data))

Cache keys include a version number. When I change the feature computation logic, I bump the version and all cached results are effectively invalidated.

The version has been bumped 7 times. Every time I thought “the features are final,” I found a bug or added a new indicator.
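
Activities consult this cache before doing any work. A minimal sketch of the calling pattern (the actual feature computation is elided):

from temporalio import activity

@activity.defn
async def compute_features(ticker: str, asof_date: str) -> str:
    cache = BacktestCache()
    if cache.is_completed("compute_features", ticker, asof_date):
        return f"cached: {ticker} {asof_date}"

    # ... compute and persist the day's indicators ...

    cache.mark_completed("compute_features", ticker, asof_date)
    return f"computed: {ticker} {asof_date}"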

The Expert System

The platform runs 5 different signal-generating experts:

Function Expert (Rule-Based)

Classic technical analysis: RSI overbought/oversold, MACD crossovers, Bollinger Band squeezes, ADX trend strength. No machine learning, just rules I’ve refined over years:

def analyze_direction(self, indicators: Dict[str, float]) -> Dict[str, Any]:
    signals = []
    bullish_score = 0
    bearish_score = 0

    rsi_14 = indicators.get('rsi_14', 50)
    if rsi_14 < 30:  # Oversold
        bullish_score += 2
        signals.append(f"RSI oversold ({rsi_14:.1f})")
    elif rsi_14 > 70:  # Overbought
        bearish_score += 2
        signals.append(f"RSI overbought ({rsi_14:.1f})")

    # MACD, moving averages, stochastic...

    # Scale the winning score into a 0-1 strength (normalization is illustrative)
    strength = min(1.0, max(bullish_score, bearish_score) / 10.0)

    if bullish_score > bearish_score * 1.5:
        return {'bias': 'bullish', 'strength': strength, 'signals': signals}
    elif bearish_score > bullish_score * 1.5:
        return {'bias': 'bearish', 'strength': strength, 'signals': signals}
    else:
        return {'bias': 'neutral', 'strength': 0.3, 'signals': signals}

ML Expert (LightGBM)

Trained on 43 technical indicators to predict direction, volatility, and forward returns:

  • Direction model: Binary classifier (up/down)
  • Volatility model: Regressor predicting 20-day realized volatility
  • Returns model: Regressor predicting 5-day returns

The volatility model performs best. Turns out predicting how much a stock will move is easier than predicting which direction.
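
The training code is out of scope for this post, but a direction classifier along these lines is only a few lines of LightGBM. A sketch under assumed variable names (features is a day-indexed DataFrame of the 43 indicators; labels marks whether the next close was higher):

import lightgbm as lgb
from sklearn.model_selection import train_test_split

# Chronological split - shuffling would leak future data into training
X_train, X_val, y_train, y_val = train_test_split(
    features, labels, test_size=0.2, shuffle=False
)

model = lgb.LGBMClassifier(n_estimators=400, learning_rate=0.05, num_leaves=31)
model.fit(X_train, y_train, eval_set=[(X_val, y_val)])

prob_up = model.predict_proba(X_val)[:, 1]  # expert confidence that price goes up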

Agent Expert (LLM-Powered)

Calls OpenAI’s API with market context and asks for trading signals. This one’s experimental – I’m curious if LLMs can spot patterns I haven’t codified into rules. Early results are… mixed.

Ultra Bull and Elliott Wave Experts

Specialized strategies. Ultra Bull looks for extreme dip-buying opportunities. Elliott Wave attempts to identify wave patterns (with limited success – Elliott Wave is more art than science).

Policy Engine: The Risk Gatekeeper

Raw signals don’t become trades. They pass through a policy engine that enforces constraints:

class PolicyEngine:
    def __init__(self, config: BacktestConfig, backtest_id: str = None):
        self.config = config
        self.max_positions = config.max_open_positions
        self.min_confidence = 0.6
        self.backtest_id = backtest_id
        self.trades_today = 0  # reset at the start of each trading day

    def _check_daily_trade_limit(self) -> tuple[bool, str]:
        if self.trades_today >= self.config.max_trades_per_day:
            return False, f"Daily trade limit ({self.config.max_trades_per_day}) reached"
        return True, "OK"

    def _get_available_capital(self) -> float:
        # Account balance = initial_capital + realized P&L (compounding!)
        with self.db_client.engine.connect() as conn:
            pnl_result = conn.execute(text("""
                SELECT COALESCE(SUM(net_pnl), 0) as total_pnl
                FROM trades
                WHERE backtest_id = :backtest_id AND status = 'closed'
            """), {"backtest_id": self.backtest_id})

            pos_result = conn.execute(text("""
                SELECT COALESCE(SUM(quantity * entry_price), 0) as total_invested
                FROM positions
                WHERE status = 'open' AND backtest_id = :backtest_id
            """), {"backtest_id": self.backtest_id})

        account_balance = self.config.initial_capital + realized_pnl
        available = account_balance - total_invested
        return max(0.0, available)

    def evaluate(self, signals: List[SignalEnvelope], current_price: float) -> List[OrderIntent]:
        decisions = []

        for signal in signals:
            best_intent = max(signal.intent, key=lambda x: x.confidence)
            ticker = signal.ticker                       # field names assumed from context
            estimated_cost = best_intent.estimated_cost  # cost sized upstream by the expert

            # Always check confidence
            if best_intent.confidence < self.min_confidence:
                continue

            # Check limits
            if not self._check_daily_trade_limit()[0]:
                continue
            if not self._check_position_limits(ticker)[0]:
                continue
            if not self._check_capital_available(estimated_cost)[0]:
                continue

            # Signal passed all checks - create order
            decisions.append(self._create_order_intent(signal, best_intent))

        return decisions

The compounding logic is intentional. Winners increase buying power; losers reduce it. Natural risk management.
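
Concretely: with $20,000 of initial capital, $3,000 in realized gains, and $8,000 tied up in open positions, the engine sees $15,000 of available buying power for new trades.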

Position Management: Exit Strategy

Positions don’t just sit open forever. The manage_positions activity checks three exit conditions:

  1. Time-based: Position held for N days (default 5)
  2. Stop-loss: P&L drops below -35%
  3. Profit target: P&L exceeds +75%

@activity.defn
async def manage_positions(
    ticker: str,
    asof_date: str,
    hold_days: int = 5,
    config_dict: dict = None,
    backtest_id: str = None,
    stop_loss_pct: float = -0.35,
    profit_target_pct: float = 0.75
) -> str:
    # Get all open positions
    with client.engine.connect() as conn:
        result = conn.execute(text("""
            SELECT position_id, entry_price, symbol, entry_time
            FROM positions
            WHERE ticker = :ticker AND status = 'open'
            AND backtest_id = :backtest_id
        """), {"ticker": ticker, "backtest_id": backtest_id})

    for pos in all_open_positions:
        current_pnl_pct = (current_price - entry_price) / entry_price

        if current_pnl_pct <= stop_loss_pct:
            exit_reason = "stop_loss"
        elif current_pnl_pct >= profit_target_pct:
            exit_reason = "profit_target"
        elif days_held >= hold_days:
            exit_reason = "time_exit"
        else:
            continue  # Keep holding

        # Close the position, create exit fill, update trade record
        # ...

Options positions use proper pricing – the activity looks up historical option prices from the options_bars_day table rather than using the underlying’s price.
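
A sketch of that lookup, following the same SQLAlchemy pattern as the other activities (the column names here are assumed, not confirmed schema):

from sqlalchemy import text

row = conn.execute(text("""
    SELECT close
    FROM options_bars_day
    WHERE symbol = :symbol AND date = :asof_date
"""), {"symbol": pos.symbol, "asof_date": asof_date}).fetchone()

# No bar for this contract/date means stale or missing data - skip exit checks
current_price = row.close if row else None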

Running the Worker

Workers poll Temporal for tasks and execute them. I built a simple CLI to manage multiple workers:

./scripts/workers.py start 4       # Start 4 workers in tmux
./scripts/workers.py stop          # Stop all workers
./scripts/workers.py status        # Show status
./scripts/workers.py logs          # Attach to view logs

Each worker runs in a tmux pane, making it easy to watch parallel execution in real time. Temporal handles task distribution automatically – if one worker is grinding through a heavy day of feature computation, others pick up lighter tasks.
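
Each worker itself is just the standard temporalio bootstrap. A minimal sketch (the module path for TradingWorkflow is assumed):

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

from services.workflows.activities import (
    clear_backtest_data, ingest_data, compute_features, generate_signals,
    check_signals_exist, apply_policy, execute_orders, manage_positions
)
from services.workflows.trading import TradingWorkflow  # path assumed

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="trading-task-queue",  # must match what the API submits to
        workflows=[TradingWorkflow],
        activities=[clear_backtest_data, ingest_data, compute_features,
                    generate_signals, check_signals_exist, apply_policy,
                    execute_orders, manage_positions],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())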

Triggering Backtests via API

The FastAPI backend starts workflows and monitors progress:

@router.post("/run", response_model=BacktestResponse)
async def run_backtest(request: BacktestRequest, background_tasks: BackgroundTasks):
    backtest_id = str(uuid.uuid4())[:8]
    temporal_client = get_temporal_client()

    workflow_id = f"backtest-{backtest_id}"
    handle = await temporal_client.start_workflow(
        TradingWorkflow.run,
        args=[request.ticker, request.startDate, request.endDate,
              "backtest", request.experts, config_dict],
        id=workflow_id,
        task_queue="trading-task-queue"
    )

    # Monitor in background
    background_tasks.add_task(monitor_workflow, backtest_id, handle)

    return BacktestResponse(
        backtest_id=backtest_id,
        status="running",
        message=f"Backtest started via Temporal workflow {workflow_id}"
    )

The UI polls the status endpoint and displays real-time progress. When the workflow completes (or fails), results are fetched from the database and cached.
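
The status route itself can lean on Temporal's describe API rather than bespoke bookkeeping; a minimal sketch (the real endpoint presumably also merges progress data from the database):

@router.get("/status/{backtest_id}")
async def backtest_status(backtest_id: str):
    handle = get_temporal_client().get_workflow_handle(f"backtest-{backtest_id}")
    desc = await handle.describe()  # RUNNING, COMPLETED, FAILED, ...
    status = desc.status.name if desc.status else "UNKNOWN"
    return {"backtest_id": backtest_id, "status": status}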

Lessons Learned

1. Batch Your Parallel Activities

My first implementation fired all 1,000+ days as parallel activities at once. The workflow's event history ballooned, and workers struggled. Batching into groups of 30 solved it:

BATCH_SIZE = 30
for batch in chunked(dates, BATCH_SIZE):
    await asyncio.gather(*[
        workflow.execute_activity(activity, args=[d], ...)
        for d in batch
    ])

2. Timeouts Matter

Set start_to_close_timeout generously for activities that call external services or do heavy computation. Data ingestion hitting a slow API? Give it 60 minutes. Feature computation for one day? 60 minutes. The timeout isn’t “how long it should take” – it’s “when should Temporal give up and retry.”
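
A companion knob worth setting on long activities is the heartbeat timeout, so a worker that dies mid-task is detected in minutes rather than when the full timeout expires (this assumes the activity calls activity.heartbeat() periodically):

await workflow.execute_activity(
    ingest_data,
    args=[ticker, d],
    start_to_close_timeout=timedelta(minutes=60),  # room for slow upstream APIs
    heartbeat_timeout=timedelta(minutes=2),        # but notice a dead worker fast
)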

3. Isolation is Everything

Without backtest_id filtering everywhere, I had backtests reading signals from previous runs. Subtle bugs that produce believable-but-wrong results are the worst kind. Every query that touches signals, decisions, positions, or trades must filter by backtest_id.

4. Cache Deterministic Work

Data ingestion and feature computation are deterministic. Running them twice for SPY on 2023-01-15 produces identical results. Cache them. But signal generation for a specific backtest? Don’t cache – you want fresh signals isolated to that run.

5. The Temporal UI is Your Friend

When a workflow fails, the Temporal UI shows exactly which activity failed, what inputs it received, and what exception it threw. No more grepping through logs trying to figure out which of 5,000 activities broke.

Results

A 4-year backtest (2020-2024) on SPY now runs in under 30 minutes with 4 workers. Cold start (no cache) takes about 2 hours. If a worker crashes mid-execution, the workflow resumes from the last completed activity.

The combination of parallel execution for independent work and sequential execution for stateful operations makes complex backtests tractable. I can iterate on trading strategies without waiting hours for results.

More importantly, I can trust the results. Temporal’s durability guarantees mean no silent failures, and the UI provides complete visibility into what happened.

Getting Started

If you’re building something similar:

  1. Start with Temporal’s Python SDK
  2. Use the temporalio/auto-setup Docker image for quick local development
  3. Design workflows in phases: parallel where possible, sequential where necessary
  4. Add caching for deterministic operations early
  5. Filter everything by a run/session ID to ensure isolation

The learning curve is steep, but the payoff is real. My trading platform went from “I think it works” to “I can prove it works” – and that’s worth every hour spent learning Temporal.


Building trading systems with Temporal? I’d love to hear about your architecture choices.