cpp-know-hows

cpp related stuff

View on GitHub

Asynchronous Programming and Futures

Overview

C++ provides high-level abstractions for asynchronous programming through futures, promises, and async. These enable writing concurrent code without directly managing threads.

┌────────────────────────────────────────────────────────┐
│           ASYNCHRONOUS PROGRAMMING TOOLS               │
├────────────────────────────────────────────────────────┤
│                                                        │
│  ASYNC          │  FUTURES/PROMISES │  PARALLEL        │
│  ──────         │  ──────────────── │  ────────        │
│  • std::async   │  • std::future    │  • par           │
│  • Policies     │  • std::promise   │  • par_unseq     │
│  • Fire-forget  │  • packaged_task  │  • Algorithms    │
│                 │  • shared_future  │                  │
│                                                        │
│  COROUTINES (C++20)  │  THREAD POOLS                   │
│  ──────────────────  │  ────────────                   │
│  • co_await          │  • Custom pools                 │
│  • co_return         │  • Work stealing                │
│  • co_yield          │  • Task queues                  │
│                                                        │
└────────────────────────────────────────────────────────┘

std::async

Basic Usage

#include <future>
#include <iostream>

int calculate() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42;
}

int main() {
    // Launch asynchronously
    std::future<int> result = std::async(calculate);
    
    // Do other work while calculation runs
    std::cout << "Calculating...\n";
    
    // Get result (blocks if not ready)
    int value = result.get();
    std::cout << "Result: " << value << '\n';
    
    return 0;
}

Launch Policies

#include <future>

int work() { return 42; }

int main() {
    // async: Run in separate thread (guaranteed)
    auto f1 = std::async(std::launch::async, work);
    
    // deferred: Run when get() is called (lazy evaluation)
    auto f2 = std::async(std::launch::deferred, work);
    
    // async | deferred: Let implementation decide (default)
    auto f3 = std::async(std::launch::async | std::launch::deferred, work);
    
    // Default policy
    auto f4 = std::async(work);  // Same as f3
    
    return 0;
}

Visual: async vs deferred

std::launch::async:
main thread:  ───────────►get()──────►
async thread:      ├──work──┤
                   └─ spawns immediately

std::launch::deferred:
main thread:  ───────────►get()──────►
                         └work┤
                   work runs in main thread

std::launch::async | deferred (default):
Implementation chooses based on system load

async with Arguments

#include <future>
#include <string>

int process(int x, const std::string& s) {
    return x + s.length();
}

int main() {
    // Pass arguments directly
    auto f1 = std::async(process, 10, "hello");
    
    // With lambda
    auto f2 = std::async([](int x) {
        return x * 2;
    }, 21);
    
    // With member function
    class Calculator {
    public:
        int add(int a, int b) { return a + b; }
    };
    
    Calculator calc;
    auto f3 = std::async(&Calculator::add, &calc, 5, 10);
    
    std::cout << f1.get() << '\n';  // 15
    std::cout << f2.get() << '\n';  // 42
    std::cout << f3.get() << '\n';  // 15
    
    return 0;
}

std::future

Future States

┌────────────────────────────────────────────────────────┐
│                  Future States                         │
├────────────────────────────────────────────────────────┤
│                                                        │
│  Invalid ──► Valid ──► Ready ──► Retrieved            │
│     │          │         │          │                  │
│     │          │         │          └─ get() called    │
│     │          │         └─ Result available           │
│     │          └─ Waiting for result                   │
│     └─ Default constructed or moved from               │
│                                                        │
└────────────────────────────────────────────────────────┘

Future Operations

#include <future>
#include <chrono>

int slow_calculation() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 42;
}

int main() {
    auto future = std::async(std::launch::async, slow_calculation);
    
    // Check if valid
    if (future.valid()) {
        std::cout << "Future is valid\n";
    }
    
    // Wait for result
    future.wait();
    std::cout << "Calculation complete\n";
    
    // Wait with timeout
    auto status = future.wait_for(std::chrono::seconds(1));
    
    switch (status) {
        case std::future_status::ready:
            std::cout << "Result ready\n";
            break;
        case std::future_status::timeout:
            std::cout << "Timeout\n";
            break;
        case std::future_status::deferred:
            std::cout << "Deferred (not started)\n";
            break;
    }
    
    // Get result (blocks if not ready)
    int result = future.get();  // Can only call once!
    
    // Future is now invalid
    if (!future.valid()) {
        std::cout << "Future is invalid after get()\n";
    }
    
    return 0;
}

Exception Handling with Futures

#include <future>
#include <stdexcept>

int may_throw() {
    throw std::runtime_error("Error in async task");
    return 42;
}

int main() {
    auto future = std::async(may_throw);
    
    try {
        int result = future.get();  // Exception re-thrown here
    } catch (const std::exception& e) {
        std::cout << "Caught: " << e.what() << '\n';
    }
    
    return 0;
}

std::promise

Promise-Future Pair

#include <future>
#include <thread>

void calculate(std::promise<int> prom) {
    // Do work
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    // Set result
    prom.set_value(42);
}

int main() {
    // Create promise
    std::promise<int> prom;
    
    // Get associated future
    std::future<int> fut = prom.get_future();
    
    // Start thread with promise
    std::thread t(calculate, std::move(prom));
    
    // Wait for result
    int result = fut.get();
    std::cout << "Result: " << result << '\n';
    
    t.join();
    
    return 0;
}

Visual: Promise-Future Communication

Thread 1 (producer):        Thread 2 (consumer):
┌──────────────┐           ┌──────────────┐
│  Promise     │           │   Future     │
│              │           │              │
│ set_value(42)├──────────►│ get() ───► 42│
│              │  transfers│              │
└──────────────┘    value  └──────────────┘

Promise writes → Shared state ← Future reads

Promise with Exceptions

#include <future>
#include <thread>

void calculate_or_throw(std::promise<int> prom, bool should_throw) {
    try {
        if (should_throw) {
            throw std::runtime_error("Calculation failed");
        }
        prom.set_value(42);
    } catch (...) {
        // Set exception instead of value
        prom.set_exception(std::current_exception());
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    
    std::thread t(calculate_or_throw, std::move(prom), true);
    
    try {
        int result = fut.get();  // Exception re-thrown
    } catch (const std::exception& e) {
        std::cout << "Error: " << e.what() << '\n';
    }
    
    t.join();
    
    return 0;
}

std::packaged_task

Wrapping Function for Async Execution

#include <future>
#include <thread>

int calculate(int x) {
    return x * x;
}

int main() {
    // Create packaged task
    std::packaged_task<int(int)> task(calculate);
    
    // Get future before running task
    std::future<int> result = task.get_future();
    
    // Run task in thread
    std::thread t(std::move(task), 10);
    
    // Get result
    std::cout << "Result: " << result.get() << '\n';  // 100
    
    t.join();
    
    return 0;
}

packaged_task vs async vs promise

┌────────────────────────────────────────────────────────┐
│        async vs packaged_task vs promise               │
├────────────────────────────────────────────────────────┤
│                                                        │
│ std::async:                                            │
│   • Simplest interface                                 │
│   • Automatic thread management                        │
│   • Returns future directly                            │
│   Use when: Quick parallel task                        │
│                                                        │
│ std::packaged_task:                                    │
│   • Wraps callable                                     │
│   • Manual thread management                           │
│   • Can store and execute later                        │
│   Use when: Task queues, thread pools                  │
│                                                        │
│ std::promise:                                          │
│   • Low-level primitive                                │
│   • Manual value setting                               │
│   • Most flexible                                      │
│   Use when: Complex communication patterns             │
│                                                        │
└────────────────────────────────────────────────────────┘

std::shared_future

Multiple Readers

#include <future>
#include <thread>
#include <vector>

int calculate() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 42;
}

int main() {
    // Regular future: single consumer
    std::future<int> fut = std::async(calculate);
    
    // Convert to shared_future: multiple consumers
    std::shared_future<int> shared_fut = fut.share();
    
    // Multiple threads can get the value
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back([shared_fut, i]() {
            int value = shared_fut.get();  // All can call get()
            std::cout << "Thread " << i << " got: " << value << '\n';
        });
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    return 0;
}

Visual: future vs shared_future

std::future:
Producer ──► Future ──► Consumer
                        (single get())

std::shared_future:
                    ┌──► Consumer 1
                    │
Producer ──► Future ├──► Consumer 2
                    │
                    └──► Consumer 3
                         (multiple get())

Parallel Algorithms (C++17)

Execution Policies

#include <algorithm>
#include <execution>
#include <vector>

int main() {
    std::vector<int> vec(1000000);
    
    // Sequential (default)
    std::sort(vec.begin(), vec.end());
    
    // Parallel execution
    std::sort(std::execution::par, vec.begin(), vec.end());
    
    // Parallel unsequenced (vectorized)
    std::sort(std::execution::par_unseq, vec.begin(), vec.end());
    
    // Sequential (explicit)
    std::sort(std::execution::seq, vec.begin(), vec.end());
    
    return 0;
}

Parallel Algorithm Examples

#include <algorithm>
#include <execution>
#include <vector>
#include <numeric>

int main() {
    std::vector<int> data(10000000);
    std::iota(data.begin(), data.end(), 0);
    
    // Parallel for_each
    std::for_each(std::execution::par, data.begin(), data.end(),
        [](int& x) { x = x * x; });
    
    // Parallel transform
    std::vector<int> result(data.size());
    std::transform(std::execution::par, 
                   data.begin(), data.end(), result.begin(),
                   [](int x) { return x * 2; });
    
    // Parallel reduce
    int sum = std::reduce(std::execution::par, 
                         data.begin(), data.end());
    
    // Parallel transform_reduce (map-reduce)
    int sum_of_squares = std::transform_reduce(
        std::execution::par,
        data.begin(), data.end(),
        0,  // initial value
        std::plus<>(),  // reduction operation
        [](int x) { return x * x; }  // transformation
    );
    
    return 0;
}

Execution Policy Performance

┌───────────────────────────────────────────────────────┐
│            Execution Policy Guidelines                │
├───────────────────────────────────────────────────────┤
│ Policy     │ Parallelism │ Vectorization │ Use when   │
├────────────┼─────────────┼───────────────┼────────────┤
│ seq        │ No          │ No            │ Debug      │
│ par        │ Yes         │ No            │ Safe ops   │
│ par_unseq  │ Yes         │ Yes           │ Fast, safe │
│ unseq      │ No          │ Yes           │ Sequential │
│                                                       │
│ Requirements for par_unseq:                           │
│ • No data races                                       │
│ • No deadlocks                                        │
│ • No blocking operations                              │
│ • Safe for vectorization                              │
└───────────────────────────────────────────────────────┘

Thread Pools

Simple Thread Pool Implementation

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop_(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers_.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this]() {
                            return stop_ || !tasks_.empty();
                        });
                        
                        if (stop_ && tasks_.empty()) {
                            return;
                        }
                        
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::invoke_result<F, Args...>::type> {
        
        using return_type = typename std::invoke_result<F, Args...>::type;
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> result = task->get_future();
        
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            
            if (stop_) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            
            tasks_.emplace([task]() { (*task)(); });
        }
        
        condition_.notify_one();
        return result;
    }
    
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        
        condition_.notify_all();
        
        for (std::thread& worker : workers_) {
            worker.join();
        }
    }
    
private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

// Usage
int main() {
    ThreadPool pool(4);  // 4 worker threads
    
    std::vector<std::future<int>> results;
    
    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.enqueue([i]() {
                std::this_thread::sleep_for(std::chrono::seconds(1));
                return i * i;
            })
        );
    }
    
    for (auto& result : results) {
        std::cout << result.get() << ' ';
    }
    std::cout << '\n';
    
    return 0;
}

Visual: Thread Pool

Task Queue:          Worker Threads:
┌──────────┐        ┌─────────────┐
│ Task 1   │───────►│  Thread 1   │
│ Task 2   │───────►│  Thread 2   │
│ Task 3   │───────►│  Thread 3   │
│ Task 4   │───────►│  Thread 4   │
│ Task 5   │        └─────────────┘
│ Task 6   │              ↓
│  ...     │        ┌─────────────┐
└──────────┘        │   Results   │
                    └─────────────┘

Benefits:
• Reuse threads (avoid creation overhead)
• Control concurrency level
• Queue tasks efficiently

Coroutines (C++20)

Basic Coroutine

#include <coroutine>
#include <iostream>

struct Generator {
    struct promise_type {
        int current_value;
        
        Generator get_return_object() {
            return Generator{
                std::coroutine_handle<promise_type>::from_promise(*this)
            };
        }
        
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() {}
        
        std::suspend_always yield_value(int value) {
            current_value = value;
            return {};
        }
        
        void return_void() {}
    };
    
    std::coroutine_handle<promise_type> h_;
    
    ~Generator() { if (h_) h_.destroy(); }
    
    bool move_next() {
        h_.resume();
        return !h_.done();
    }
    
    int current_value() {
        return h_.promise().current_value;
    }
};

Generator counter(int start, int end) {
    for (int i = start; i < end; ++i) {
        co_yield i;  // Suspend and return value
    }
}

int main() {
    auto gen = counter(0, 5);
    
    while (gen.move_next()) {
        std::cout << gen.current_value() << ' ';
    }
    // Output: 0 1 2 3 4
    
    return 0;
}

co_await, co_yield, co_return

┌────────────────────────────────────────────────────────┐
│              Coroutine Keywords                        │
├────────────────────────────────────────────────────────┤
│                                                        │
│ co_await expr:                                         │
│   • Suspends coroutine                                 │
│   • Waits for async operation                          │
│   • Resumes when ready                                 │
│                                                        │
│ co_yield value:                                        │
│   • Suspends and returns value                         │
│   • Generator pattern                                  │
│   • Can resume later                                   │
│                                                        │
│ co_return value:                                       │
│   • Returns final value                                │
│   • Completes coroutine                                │
│   • Cannot resume                                      │
│                                                        │
└────────────────────────────────────────────────────────┘

Best Practices

1. Use std::async for Simple Cases

// GOOD: Simple and clean
auto future = std::async([]() { return expensive_calculation(); });
auto result = future.get();

2. Use Thread Pools for Many Tasks

// BAD: Creating thread for each task
for (int i = 0; i < 1000; ++i) {
    std::thread t([i]() { process(i); });
    t.detach();  // Also bad: no way to get results
}

// GOOD: Use thread pool
ThreadPool pool(std::thread::hardware_concurrency());
std::vector<std::future<int>> results;
for (int i = 0; i < 1000; ++i) {
    results.push_back(pool.enqueue(process, i));
}

3. Check future Status Before Blocking

auto future = std::async(long_calculation);

// Do other work...

// Check if ready before blocking
if (future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
    auto result = future.get();
} else {
    // Not ready, do something else
}

4. Use Parallel Algorithms When Appropriate

std::vector<int> data(1000000);

// If operation is CPU-intensive and thread-safe:
std::sort(std::execution::par, data.begin(), data.end());

Common Pitfalls

1. Blocking in Destructor

// BAD: Blocks in destructor
{
    auto future = std::async(std::launch::async, long_task);
    // Destructor blocks until task completes!
}

// GOOD: Explicitly get or detach
{
    auto future = std::async(std::launch::async, long_task);
    future.get();  // Or wait() if you don't need result
}

2. Default Launch Policy

// BAD: May not actually run concurrently
auto future = std::async(task);  // Might be deferred!

// GOOD: Explicit policy
auto future = std::async(std::launch::async, task);

3. Calling get() Multiple Times

auto future = std::async(calculate);

int result1 = future.get();  // OK
// int result2 = future.get();  // ERROR: Can only call once!

// Use shared_future for multiple gets
auto shared = std::async(calculate).share();
int result1 = shared.get();  // OK
int result2 = shared.get();  // OK

4. Exception Handling

// BAD: Exception ignored
auto future = std::async(may_throw);
// If exception thrown and never retrieved, program terminates

// GOOD: Always get() and handle exceptions
try {
    auto result = future.get();
} catch (const std::exception& e) {
    // Handle error
}

Performance Comparison

┌────────────────────────────────────────────────────────┐
│         Async Approach Performance                     │
├────────────────────────────────────────────────────────┤
│ Approach       │ Overhead │ Control │ Use case        │
├────────────────┼──────────┼─────────┼─────────────────┤
│ std::async     │ Medium   │ Low     │ Simple tasks    │
│ Thread pool    │ Low      │ High    │ Many tasks      │
│ Parallel algo  │ Low      │ None    │ STL operations  │
│ Manual threads │ Low      │ High    │ Custom needs    │
│ Coroutines     │ Very low │ High    │ Async I/O       │
└────────────────────────────────────────────────────────┘

Real-World Examples

Parallel File Processing

#include <future>
#include <vector>
#include <filesystem>

std::vector<std::string> process_files(const std::vector<std::string>& files) {
    std::vector<std::future<std::string>> futures;
    
    for (const auto& file : files) {
        futures.push_back(std::async(std::launch::async, [file]() {
            // Process file
            return file + " processed";
        }));
    }
    
    std::vector<std::string> results;
    for (auto& future : futures) {
        results.push_back(future.get());
    }
    
    return results;
}

Timeout Pattern

#include <future>
#include <chrono>

template<typename F>
auto run_with_timeout(F func, std::chrono::milliseconds timeout) {
    auto future = std::async(std::launch::async, func);
    
    if (future.wait_for(timeout) == std::future_status::timeout) {
        throw std::runtime_error("Operation timed out");
    }
    
    return future.get();
}

Summary

Choose based on needs:
• Simple parallel task → std::async
• Many small tasks → Thread pool
• STL operations → Parallel algorithms
• Complex coordination → promise/future
• Generator pattern → Coroutines

Complete Practical Example: Parallel Image Processing Pipeline

Here’s a comprehensive example integrating async, futures, promises, and parallel algorithms:

#include <iostream>
#include <future>
#include <vector>
#include <string>
#include <chrono>
#include <algorithm>
#include <execution>
#include <numeric>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>

// Simulated image data
struct Image {
    std::string name;
    int width, height;
    std::vector<uint8_t> pixels;
    
    Image(std::string n, int w, int h)
        : name(std::move(n)), width(w), height(h)
        , pixels(w * h * 3, 128) {}  // RGB
};

// Image processing operations
class ImageProcessor {
public:
    // Simulate expensive operation
    static Image resize(const Image& img, int new_width, int new_height) {
        std::cout << "  [Resizing " << img.name << "]\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        return Image{img.name + "_resized", new_width, new_height};
    }
    
    static Image apply_filter(const Image& img, const std::string& filter) {
        std::cout << "  [Applying " << filter << " to " << img.name << "]\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
        
        return Image{img.name + "_" + filter, img.width, img.height};
    }
    
    static Image compress(const Image& img, int quality) {
        std::cout << "  [Compressing " << img.name << " (Q=" << quality << ")]\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
        
        return Image{img.name + "_compressed", img.width, img.height};
    }
    
    static bool save(const Image& img, const std::string& path) {
        std::cout << "  [Saving " << img.name << " to " << path << "]\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        return true;
    }
};

// 1. Async task launching
void demo_async_basics() {
    std::cout << "\n=== std::async Basics ===\n";
    
    Image img("photo.jpg", 1920, 1080);
    
    // Launch async task
    auto future_resized = std::async(std::launch::async,
        ImageProcessor::resize, img, 800, 600);
    
    std::cout << "Resize task launched, doing other work...\n";
    
    // Do other work while resizing happens
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    
    // Get result (blocks if not ready)
    Image resized = future_resized.get();
    std::cout << "Got result: " << resized.name << "\n";
}

// 2. Multiple concurrent tasks
void demo_concurrent_processing() {
    std::cout << "\n=== Concurrent Processing ===\n";
    
    std::vector<Image> images = {
        Image("img1.jpg", 1920, 1080),
        Image("img2.jpg", 1920, 1080),
        Image("img3.jpg", 1920, 1080),
        Image("img4.jpg", 1920, 1080)
    };
    
    auto start = std::chrono::steady_clock::now();
    
    // Launch multiple async tasks
    std::vector<std::future<Image>> futures;
    
    for (const auto& img : images) {
        futures.push_back(std::async(std::launch::async,
            ImageProcessor::resize, img, 800, 600));
    }
    
    // Wait for all results
    std::vector<Image> resized_images;
    for (auto& future : futures) {
        resized_images.push_back(future.get());
    }
    
    auto end = std::chrono::steady_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    
    std::cout << "Processed " << images.size() << " images in " 
              << duration.count() << "ms (parallel)\n";
}

// 3. Promise and Future for custom async operations
void demo_promise_future() {
    std::cout << "\n=== Promise and Future ===\n";
    
    std::promise<Image> promise;
    std::future<Image> future = promise.get_future();
    
    // Producer thread
    std::thread producer([&promise]() {
        std::cout << "  [Producer: Starting work...]\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        
        Image img("async_image.jpg", 1024, 768);
        std::cout << "  [Producer: Setting value]\n";
        promise.set_value(img);
    });
    
    // Consumer
    std::cout << "Consumer: Waiting for result...\n";
    Image result = future.get();  // Blocks until promise.set_value()
    std::cout << "Consumer: Got " << result.name << "\n";
    
    producer.join();
}

// 4. Processing pipeline with futures
void demo_processing_pipeline() {
    std::cout << "\n=== Processing Pipeline ===\n";
    
    Image img("original.jpg", 1920, 1080);
    
    // Chain async operations
    auto future1 = std::async(std::launch::async,
        ImageProcessor::resize, img, 800, 600);
    
    auto future2 = std::async(std::launch::async, [](std::future<Image>& f) {
        Image resized = f.get();
        return ImageProcessor::apply_filter(resized, "blur");
    }, std::ref(future1));
    
    auto future3 = std::async(std::launch::async, [](std::future<Image>& f) {
        Image filtered = f.get();
        return ImageProcessor::compress(filtered, 85);
    }, std::ref(future2));
    
    Image final_image = future3.get();
    std::cout << "Pipeline complete: " << final_image.name << "\n";
}

// 5. Parallel algorithms
void demo_parallel_algorithms() {
    std::cout << "\n=== Parallel Algorithms ===\n";
    
    // Generate large dataset
    std::vector<int> data(1000000);
    std::iota(data.begin(), data.end(), 1);
    
    auto start = std::chrono::steady_clock::now();
    
    // Parallel transform
    std::transform(std::execution::par,
        data.begin(), data.end(), data.begin(),
        [](int x) { return x * x; });
    
    // Parallel reduce
    long long sum = std::reduce(std::execution::par,
        data.begin(), data.end(), 0LL);
    
    auto end = std::chrono::steady_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    
    std::cout << "Parallel processing: sum = " << sum 
              << " (took " << duration.count() << "ms)\n";
}

// 6. Thread pool implementation
class ThreadPool {
private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
    
public:
    explicit ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this]() {
                while (true) {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(mutex_);
                        cv_.wait(lock, [this]() {
                            return stop_ || !tasks_.empty();
                        });
                        
                        if (stop_ && tasks_.empty()) return;
                        
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        
        for (auto& worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
    
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        
        using return_type = typename std::result_of<F(Args...)>::type;
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> future = task->get_future();
        
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if (stop_) {
                throw std::runtime_error("ThreadPool is stopped");
            }
            tasks_.emplace([task]() { (*task)(); });
        }
        
        cv_.notify_one();
        return future;
    }
};

void demo_thread_pool() {
    std::cout << "\n=== Thread Pool ===\n";
    
    ThreadPool pool(4);
    
    std::vector<std::future<Image>> results;
    
    // Submit tasks to pool
    for (int i = 0; i < 6; ++i) {
        Image img("img" + std::to_string(i) + ".jpg", 1920, 1080);
        
        results.push_back(pool.enqueue(
            ImageProcessor::resize, img, 800, 600
        ));
    }
    
    std::cout << "Submitted " << results.size() << " tasks to pool\n";
    
    // Get results
    for (size_t i = 0; i < results.size(); ++i) {
        Image result = results[i].get();
        std::cout << "Task " << i << " complete: " << result.name << "\n";
    }
}

// 7. Batch processing with futures
void demo_batch_processing() {
    std::cout << "\n=== Batch Processing ===\n";
    
    std::vector<Image> images;
    for (int i = 0; i < 10; ++i) {
        images.emplace_back("batch_" + std::to_string(i) + ".jpg", 1920, 1080);
    }
    
    // Process in batches
    const size_t batch_size = 3;
    std::vector<std::future<std::vector<Image>>> batch_futures;
    
    for (size_t i = 0; i < images.size(); i += batch_size) {
        size_t end = std::min(i + batch_size, images.size());
        
        std::vector<Image> batch(images.begin() + i, images.begin() + end);
        
        batch_futures.push_back(std::async(std::launch::async,
            [](std::vector<Image> imgs) {
                std::vector<Image> processed;
                for (const auto& img : imgs) {
                    processed.push_back(ImageProcessor::resize(img, 800, 600));
                }
                return processed;
            }, batch));
    }
    
    // Collect results
    int total_processed = 0;
    for (auto& future : batch_futures) {
        auto batch_result = future.get();
        total_processed += batch_result.size();
    }
    
    std::cout << "Processed " << total_processed << " images in batches\n";
}

// 8. Error handling with futures
void demo_error_handling() {
    std::cout << "\n=== Error Handling with Futures ===\n";
    
    auto future = std::async(std::launch::async, []() {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        throw std::runtime_error("Simulated processing error");
        return Image("never_reached.jpg", 800, 600);
    });
    
    try {
        Image result = future.get();  // Rethrows exception
        std::cout << "Got result: " << result.name << "\n";
    }
    catch (const std::exception& e) {
        std::cout << "Caught exception: " << e.what() << "\n";
    }
}

int main() {
    std::cout << "=== Async Programming Demo ===\n";
    
    // 1. Async basics
    demo_async_basics();
    
    // 2. Concurrent processing
    demo_concurrent_processing();
    
    // 3. Promise and future
    demo_promise_future();
    
    // 4. Processing pipeline
    demo_processing_pipeline();
    
    // 5. Parallel algorithms
    demo_parallel_algorithms();
    
    // 6. Thread pool
    demo_thread_pool();
    
    // 7. Batch processing
    demo_batch_processing();
    
    // 8. Error handling
    demo_error_handling();
    
    std::cout << "\n=== Demo Complete ===\n";
    
    return 0;
}

Concepts Demonstrated:

This example shows real-world async programming patterns!


Next Steps


Part 15 of 22 - Asynchronous Programming and Futures