cpp-know-hows

cpp related stuff

View on GitHub

Multithreading and Concurrency

Overview

C++11 introduced a standardized threading library, making concurrent programming portable across platforms. This chapter covers threads, synchronization, and thread safety.

┌─────────────────────────────────────────────────────────┐
│              MULTITHREADING COMPONENTS                  │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  THREADS          │  SYNCHRONIZATION  │  COMMUNICATION  │
│  ───────          │  ───────────────  │  ─────────────  │
│  • std::thread    │  • std::mutex     │  • Condition    │
│  • std::jthread   │  • Lock guards    │    variables    │
│  • Thread IDs     │  • Atomics        │  • Futures      │
│                   │  • Semaphores     │  • Promises     │
│                                                         │
│  C++11: Basic threading                                 │
│  C++14: Shared locks                                    │
│  C++17: Parallel algorithms                             │
│  C++20: jthread, stop_token, semaphores, barriers       │
│  C++23: Move-only function                              │
│                                                         │
└─────────────────────────────────────────────────────────┘

std::thread Basics

Creating Threads

#include <thread>
#include <iostream>

void hello() {
    std::cout << "Hello from thread!\n";
}

void hello_with_arg(int n) {
    std::cout << "Hello " << n << " times!\n";
}

int main() {
    // Create thread from function
    std::thread t1(hello);
    
    // Create thread with arguments
    std::thread t2(hello_with_arg, 5);
    
    // Create thread from lambda
    std::thread t3([]() {
        std::cout << "Hello from lambda!\n";
    });
    
    // Must join or detach before thread object is destroyed
    t1.join();  // Wait for thread to finish
    t2.join();
    t3.join();
    
    return 0;
}

Thread Lifecycle

┌────────────────────────────────────────────────────────┐
│                  Thread Lifecycle                      │
├────────────────────────────────────────────────────────┤
│                                                        │
│  Created ──► Running ──► Finished                      │
│     │           │                                      │
│     │           └─► join() ──► Thread destroyed        │
│     │                                                  │
│     └─► detach() ──► Thread runs independently         │
│                                                        │
└────────────────────────────────────────────────────────┘

Join vs Detach

#include <thread>
#include <chrono>

void work() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Work done\n";
}

int main() {
    // Join: Wait for thread to complete
    {
        std::thread t(work);
        t.join();  // Blocks until thread finishes
        std::cout << "Thread completed\n";
    }
    
    // Detach: Thread runs independently
    {
        std::thread t(work);
        t.detach();  // Thread continues running
        std::cout << "Thread detached\n";
        // Main may exit before thread finishes!
    }
    
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

Thread with Member Functions

#include <thread>

class Worker {
public:
    void do_work(int n) {
        std::cout << "Working " << n << " times\n";
    }
    
    void operator()() {
        std::cout << "Operator() called\n";
    }
};

int main() {
    Worker w;
    
    // Method 1: Using member function pointer
    std::thread t1(&Worker::do_work, &w, 5);
    
    // Method 2: Using function object
    std::thread t2(w);  // Calls operator()
    
    // Method 3: Using lambda
    std::thread t3([&w]() {
        w.do_work(10);
    });
    
    t1.join();
    t2.join();
    t3.join();
    
    return 0;
}

Thread Management

#include <thread>
#include <iostream>

int main() {
    std::thread t([]() {
        std::cout << "Thread running\n";
    });
    
    // Get thread ID
    std::thread::id tid = t.get_id();
    std::cout << "Thread ID: " << tid << '\n';
    
    // Check if joinable
    if (t.joinable()) {
        std::cout << "Thread is joinable\n";
        t.join();
    }
    
    // After join, thread is no longer joinable
    if (!t.joinable()) {
        std::cout << "Thread is not joinable\n";
    }
    
    // Hardware concurrency
    unsigned int n = std::thread::hardware_concurrency();
    std::cout << "Hardware supports " << n << " concurrent threads\n";
    
    return 0;
}

std::jthread (C++20)

Automatic Joining Thread

#include <thread>
#include <iostream>

void work() {
    std::cout << "Working...\n";
}

int main() {
    // OLD: std::thread requires explicit join/detach
    {
        std::thread t(work);
        t.join();  // Must call join or detach
    }
    
    // NEW: std::jthread joins automatically
    {
        std::jthread jt(work);
        // Automatically joins on destruction
    }
    
    return 0;
}

Cooperative Cancellation with stop_token

#include <thread>
#include <chrono>
#include <iostream>

void worker(std::stop_token stoken) {
    while (!stoken.stop_requested()) {
        std::cout << "Working...\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    std::cout << "Stop requested, exiting\n";
}

int main() {
    std::jthread jt(worker);
    
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    // Request stop
    jt.request_stop();
    
    // jthread automatically joins on destruction
    return 0;
}

Visual: std::thread vs std::jthread

std::thread:
┌──────────────────────────────────────┐
│ Create thread                        │
│ Do work                              │
│ Must explicitly join or detach       │ ← Can forget!
│ Destructor terminates if not joined │ ← Danger!
└──────────────────────────────────────┘

std::jthread:
┌──────────────────────────────────────┐
│ Create thread                        │
│ Do work                              │
│ Destructor automatically joins       │ ← Safe!
│ Built-in cancellation support        │ ← Bonus!
└──────────────────────────────────────┘

Mutexes and Locks

std::mutex

#include <thread>
#include <mutex>
#include <iostream>

std::mutex mtx;
int shared_counter = 0;

void increment(int n) {
    for (int i = 0; i < n; ++i) {
        mtx.lock();
        ++shared_counter;  // Critical section
        mtx.unlock();
    }
}

int main() {
    std::thread t1(increment, 1000);
    std::thread t2(increment, 1000);
    
    t1.join();
    t2.join();
    
    std::cout << "Counter: " << shared_counter << '\n';  // 2000
    
    return 0;
}

std::lock_guard

#include <thread>
#include <mutex>

std::mutex mtx;
int shared_counter = 0;

void increment_safe(int n) {
    for (int i = 0; i < n; ++i) {
        std::lock_guard<std::mutex> lock(mtx);  // RAII lock
        ++shared_counter;
        // Automatically unlocked when lock goes out of scope
    }
}

int main() {
    std::thread t1(increment_safe, 1000);
    std::thread t2(increment_safe, 1000);
    
    t1.join();
    t2.join();
    
    std::cout << "Counter: " << shared_counter << '\n';
    
    return 0;
}

std::unique_lock

#include <thread>
#include <mutex>

std::mutex mtx;

void flexible_locking() {
    std::unique_lock<std::mutex> lock(mtx);
    
    // Do some work
    
    lock.unlock();  // Can manually unlock
    
    // Do work that doesn't need lock
    
    lock.lock();  // Can manually lock again
    
    // More critical section work
}  // Automatically unlocks on destruction

// Deferred locking
void deferred_lock() {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
    // Mutex not locked yet
    
    // ... do preparatory work ...
    
    lock.lock();  // Lock when ready
    // Critical section
}

// Try lock
void try_locking() {
    std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);
    
    if (lock.owns_lock()) {
        // Got the lock
    } else {
        // Didn't get lock, do something else
    }
}

Lock Comparison

┌────────────────────────────────────────────────────────┐
│              Lock Types Comparison                     │
├────────────────────────────────────────────────────────┤
│              │ lock_guard │ unique_lock │ scoped_lock  │
├──────────────┼────────────┼─────────────┼──────────────┤
│ RAII         │ Yes        │ Yes         │ Yes          │
│ Manual lock  │ No         │ Yes         │ No           │
│ Manual unlock│ No         │ Yes         │ No           │
│ Try lock     │ No         │ Yes         │ No           │
│ Defer lock   │ No         │ Yes         │ No           │
│ Multi-mutex  │ No         │ No          │ Yes (C++17)  │
│ Overhead     │ Minimal    │ Small       │ Minimal      │
│                                                        │
│ Use lock_guard when: Simple RAII locking               │
│ Use unique_lock when: Need flexibility                 │
│ Use scoped_lock when: Locking multiple mutexes         │
└────────────────────────────────────────────────────────┘

std::scoped_lock (C++17)

#include <thread>
#include <mutex>

std::mutex mtx1, mtx2;
int resource1 = 0, resource2 = 0;

void transfer(int amount) {
    // Locks both mutexes atomically (avoids deadlock)
    std::scoped_lock lock(mtx1, mtx2);
    
    resource1 -= amount;
    resource2 += amount;
}

int main() {
    std::thread t1(transfer, 10);
    std::thread t2(transfer, 20);
    
    t1.join();
    t2.join();
    
    return 0;
}

Deadlock Prevention

#include <thread>
#include <mutex>

std::mutex mtx1, mtx2;

// BAD: Potential deadlock
void bad_transfer() {
    std::lock_guard<std::mutex> lock1(mtx1);
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    std::lock_guard<std::mutex> lock2(mtx2);
    // If another thread locks in opposite order: DEADLOCK!
}

// GOOD: std::lock (C++11) - locks multiple mutexes atomically
void good_transfer() {
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    std::lock(lock1, lock2);  // Deadlock-free
}

// BEST: std::scoped_lock (C++17)
void best_transfer() {
    std::scoped_lock lock(mtx1, mtx2);  // Atomic, deadlock-free
}

Visual: Deadlock Scenario

Thread 1              Thread 2
────────              ────────
Lock mtx1             Lock mtx2
  ✓                     ✓
Wait for mtx2  ◄────► Wait for mtx1
  ⏳                     ⏳
DEADLOCK!             DEADLOCK!

Solution with std::scoped_lock:
Thread 1              Thread 2
────────              ────────
Request mtx1, mtx2    Request mtx1, mtx2
One succeeds first    One succeeds first
  ✓                   Waits for release
Complete                ⏳
Release               Lock both
                        ✓
                      Complete

Reader-Writer Locks

std::shared_mutex (C++17)

#include <shared_mutex>
#include <thread>
#include <vector>

class ThreadSafeCounter {
    mutable std::shared_mutex mtx_;
    int value_ = 0;
    
public:
    // Multiple readers can access simultaneously
    int get() const {
        std::shared_lock<std::shared_mutex> lock(mtx_);
        return value_;
    }
    
    // Only one writer can access
    void increment() {
        std::unique_lock<std::shared_mutex> lock(mtx_);
        ++value_;
    }
    
    void set(int value) {
        std::unique_lock<std::shared_mutex> lock(mtx_);
        value_ = value;
    }
};

int main() {
    ThreadSafeCounter counter;
    
    // Many readers
    std::vector<std::thread> readers;
    for (int i = 0; i < 10; ++i) {
        readers.emplace_back([&counter]() {
            for (int j = 0; j < 100; ++j) {
                int val = counter.get();  // Concurrent reads OK
            }
        });
    }
    
    // Few writers
    std::vector<std::thread> writers;
    for (int i = 0; i < 2; ++i) {
        writers.emplace_back([&counter]() {
            for (int j = 0; j < 100; ++j) {
                counter.increment();  // Exclusive write
            }
        });
    }
    
    for (auto& t : readers) t.join();
    for (auto& t : writers) t.join();
    
    return 0;
}

Visual: Reader-Writer Lock

Shared Lock (read):
Reader 1 ──┐
Reader 2 ──┤──► All can read simultaneously
Reader 3 ──┘

Exclusive Lock (write):
Writer ──► Only one writer, blocks all readers

Timeline:
┌─────────────────────────────────────────────┐
│ R1 R2 R3 │ W1 │ R4 R5 │ W2 │ R6          │
├─────────────────────────────────────────────┤
│ ← Multiple readers can run together         │
│           ← Writer blocks all               │
│                 ← Readers together          │
│                        ← Writer blocks all  │
└─────────────────────────────────────────────┘

Atomics

std::atomic Basics

#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> counter(0);  // Lock-free atomic integer

void increment(int n) {
    for (int i = 0; i < n; ++i) {
        ++counter;  // Atomic increment, no mutex needed!
        // Equivalent to: counter.fetch_add(1);
    }
}

int main() {
    std::vector<std::thread> threads;
    
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(increment, 1000);
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Counter: " << counter << '\n';  // 10000
    
    return 0;
}

Atomic Operations

#include <atomic>

int main() {
    std::atomic<int> x(10);
    
    // Basic operations
    x = 20;                    // store
    int val = x;               // load
    
    // Arithmetic (for integral types)
    ++x;                       // 21
    x++;                       // 22
    x += 5;                    // 27
    x -= 3;                    // 24
    
    // Fetch and modify
    int old = x.fetch_add(5);  // old = 24, x = 29
    old = x.fetch_sub(4);      // old = 29, x = 25
    
    // Compare and swap
    int expected = 25;
    bool success = x.compare_exchange_strong(expected, 30);
    // If x == expected, set x = 30, return true
    // Else, set expected = x, return false
    
    // Check if lock-free
    if (x.is_lock_free()) {
        std::cout << "Lock-free implementation\n";
    }
    
    return 0;
}

Memory Ordering

#include <atomic>
#include <thread>

std::atomic<int> data(0);
std::atomic<bool> ready(false);

void producer() {
    data.store(42, std::memory_order_relaxed);
    ready.store(true, std::memory_order_release);  // Release
}

void consumer() {
    while (!ready.load(std::memory_order_acquire)) {  // Acquire
        // Spin wait
    }
    int value = data.load(std::memory_order_relaxed);
    std::cout << "Data: " << value << '\n';
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    
    t1.join();
    t2.join();
    
    return 0;
}

Memory Order Guide

┌────────────────────────────────────────────────────────┐
│               Memory Order Semantics                   │
├────────────────────────────────────────────────────────┤
│ Order            │ Use                │ Synchronization│
├──────────────────┼────────────────────┼────────────────┤
│ relaxed          │ Counter, stats     │ None           │
│ acquire          │ Consume data       │ Read-acquire   │
│ release          │ Publish data       │ Write-release  │
│ acq_rel          │ Read-modify-write  │ Both           │
│ seq_cst (default)│ When unsure        │ Full ordering  │
│                                                        │
│ Relaxed: Fastest, no synchronization                  │
│ Acquire/Release: Common, publish-subscribe pattern    │
│ Sequential: Slowest, easiest to reason about          │
└────────────────────────────────────────────────────────┘

Condition Variables

Basic Wait and Notify

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool done = false;

void producer() {
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        {
            std::lock_guard<std::mutex> lock(mtx);
            data_queue.push(i);
            std::cout << "Produced: " << i << '\n';
        }
        cv.notify_one();  // Wake up one consumer
    }
    
    {
        std::lock_guard<std::mutex> lock(mtx);
        done = true;
    }
    cv.notify_all();  // Wake up all consumers
}

void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Wait until data available or done
        cv.wait(lock, []() {
            return !data_queue.empty() || done;
        });
        
        if (data_queue.empty()) {
            // Done and no more data
            break;
        }
        
        int value = data_queue.front();
        data_queue.pop();
        lock.unlock();
        
        std::cout << "Consumer " << id << " got: " << value << '\n';
    }
}

int main() {
    std::thread prod(producer);
    std::thread cons1(consumer, 1);
    std::thread cons2(consumer, 2);
    
    prod.join();
    cons1.join();
    cons2.join();
    
    return 0;
}

Visual: Condition Variable

Producer                        Consumer
────────                        ────────
Lock mutex                      Lock mutex
Add data to queue               Wait on cv (releases mutex)
Unlock mutex                    (sleeps)
Notify cv           ─────►      Wakes up (reacquires mutex)
                                Check condition (queue not empty)
                                Process data
                                Unlock mutex

Spurious Wakeups

// BAD: Not protected against spurious wakeups
void bad_consumer() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock);  // May wake up even if not notified!
    // Process data (might not be ready)
}

// GOOD: Always use predicate
void good_consumer() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []() {
        return !data_queue.empty();  // Only proceed if true
    });
    // Process data (definitely ready)
}

// The predicate version is equivalent to:
void equivalent() {
    std::unique_lock<std::mutex> lock(mtx);
    while (!condition) {
        cv.wait(lock);
    }
}

Thread-Safe Data Structures

Thread-Safe Queue

#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>

template<typename T>
class ThreadSafeQueue {
private:
    mutable std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<T> queue_;
    
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(std::move(value));
        cv_.notify_one();
    }
    
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this]() { return !queue_.empty(); });
        
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
    
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) {
            return std::nullopt;
        }
        
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
    
    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return queue_.empty();
    }
    
    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return queue_.size();
    }
};

Thread Local Storage

thread_local Variables

#include <thread>
#include <iostream>

thread_local int tls_counter = 0;  // Each thread has its own copy

void increment() {
    ++tls_counter;
    std::cout << "Thread " << std::this_thread::get_id() 
              << ": " << tls_counter << '\n';
}

int main() {
    std::thread t1([]() {
        for (int i = 0; i < 3; ++i) increment();
    });
    
    std::thread t2([]() {
        for (int i = 0; i < 3; ++i) increment();
    });
    
    t1.join();
    t2.join();
    
    // Each thread's counter goes 1, 2, 3
    // They don't interfere with each other
    
    return 0;
}

Common Patterns

Double-Checked Locking (Singleton)

#include <mutex>
#include <atomic>

class Singleton {
    static std::atomic<Singleton*> instance_;
    static std::mutex mtx_;
    
    Singleton() = default;
    
public:
    static Singleton* getInstance() {
        Singleton* tmp = instance_.load(std::memory_order_acquire);
        if (tmp == nullptr) {
            std::lock_guard<std::mutex> lock(mtx_);
            tmp = instance_.load(std::memory_order_relaxed);
            if (tmp == nullptr) {
                tmp = new Singleton;
                instance_.store(tmp, std::memory_order_release);
            }
        }
        return tmp;
    }
};

// C++11: Better way using std::call_once
class BetterSingleton {
    static std::once_flag init_flag_;
    static BetterSingleton* instance_;
    
    BetterSingleton() = default;
    
public:
    static BetterSingleton* getInstance() {
        std::call_once(init_flag_, []() {
            instance_ = new BetterSingleton;
        });
        return instance_;
    }
};

// C++11: Best way using local static (thread-safe)
class BestSingleton {
    BestSingleton() = default;
    
public:
    static BestSingleton& getInstance() {
        static BestSingleton instance;  // Thread-safe in C++11+
        return instance;
    }
};

Best Practices

1. Always Prefer RAII Locks

// BAD
mtx.lock();
// ... work ...
mtx.unlock();

// GOOD
{
    std::lock_guard<std::mutex> lock(mtx);
    // ... work ...
}  // Automatically unlocked

2. Keep Critical Sections Small

// BAD: Long critical section
{
    std::lock_guard<std::mutex> lock(mtx);
    expensive_computation();  // Holds lock unnecessarily
    shared_data++;
}

// GOOD: Minimal critical section
expensive_computation();  // Do this first
{
    std::lock_guard<std::mutex> lock(mtx);
    shared_data++;  // Quick operation under lock
}

3. Use Atomics for Simple Counters

// GOOD for simple counter
std::atomic<int> counter(0);
++counter;  // Lock-free

// Overkill for simple counter
std::mutex mtx;
int counter = 0;
{
    std::lock_guard<std::mutex> lock(mtx);
    ++counter;
}

4. Prefer jthread over thread (C++20)

// OLD
std::thread t(work);
t.join();  // Must remember

// NEW
std::jthread jt(work);
// Automatically joins

Common Pitfalls

1. Forgetting to Join/Detach

// BAD: Thread destroyed without join/detach
void bad() {
    std::thread t(work);
    // Terminates program if t not joined!
}

// GOOD: Always join or detach
void good() {
    std::thread t(work);
    t.join();
}

2. Data Race

int shared_var = 0;

// BAD: Data race
void increment() {
    ++shared_var;  // Not atomic!
}

// GOOD: Protected access
std::mutex mtx;
void increment_safe() {
    std::lock_guard<std::mutex> lock(mtx);
    ++shared_var;
}

// BETTER: Use atomic
std::atomic<int> atomic_var(0);
void increment_atomic() {
    ++atomic_var;  // Atomic operation
}

3. Deadlock

// BAD: Can deadlock
std::mutex m1, m2;
void thread1() {
    std::lock_guard<std::mutex> lock1(m1);
    std::lock_guard<std::mutex> lock2(m2);
}
void thread2() {
    std::lock_guard<std::mutex> lock2(m2);
    std::lock_guard<std::mutex> lock1(m1);
}

// GOOD: Lock in same order or use scoped_lock
void safe() {
    std::scoped_lock lock(m1, m2);
}

Complete Practical Example: Concurrent Web Server Simulator

Here’s a comprehensive example integrating threads, synchronization, atomics, and thread-safe data structures:

#include <iostream>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <atomic>
#include <condition_variable>
#include <queue>
#include <vector>
#include <string>
#include <chrono>
#include <random>
#include <functional>

// Thread-safe request queue
class RequestQueue {
private:
    std::queue<std::string> queue_;
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<bool> shutdown_{false};
    
public:
    void push(std::string request) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(request));
        }
        cv_.notify_one();  // Wake up one waiting thread
    }
    
    bool pop(std::string& request, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        
        // Wait for request or shutdown
        if (!cv_.wait_for(lock, timeout, [this] {
            return !queue_.empty() || shutdown_;
        })) {
            return false;  // Timeout
        }
        
        if (shutdown_ && queue_.empty()) {
            return false;
        }
        
        request = std::move(queue_.front());
        queue_.pop();
        return true;
    }
    
    void shutdown() {
        shutdown_ = true;
        cv_.notify_all();  // Wake all waiting threads
    }
    
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }
};

// Thread-safe statistics with atomics
class ServerStats {
private:
    std::atomic<uint64_t> total_requests_{0};
    std::atomic<uint64_t> successful_requests_{0};
    std::atomic<uint64_t> failed_requests_{0};
    std::atomic<uint64_t> total_processing_time_ms_{0};
    
public:
    void record_request(bool success, uint64_t processing_time_ms) {
        total_requests_.fetch_add(1, std::memory_order_relaxed);
        
        if (success) {
            successful_requests_.fetch_add(1, std::memory_order_relaxed);
        } else {
            failed_requests_.fetch_add(1, std::memory_order_relaxed);
        }
        
        total_processing_time_ms_.fetch_add(processing_time_ms, 
                                            std::memory_order_relaxed);
    }
    
    void print_stats() const {
        uint64_t total = total_requests_.load(std::memory_order_relaxed);
        uint64_t successful = successful_requests_.load(std::memory_order_relaxed);
        uint64_t failed = failed_requests_.load(std::memory_order_relaxed);
        uint64_t total_time = total_processing_time_ms_.load(std::memory_order_relaxed);
        
        std::cout << "\n=== Server Statistics ===\n";
        std::cout << "Total requests: " << total << "\n";
        std::cout << "Successful: " << successful << "\n";
        std::cout << "Failed: " << failed << "\n";
        
        if (total > 0) {
            std::cout << "Success rate: " 
                      << (100.0 * successful / total) << "%\n";
            std::cout << "Average processing time: " 
                      << (total_time / total) << "ms\n";
        }
    }
};

// Shared resource cache with reader-writer lock
class ResourceCache {
private:
    std::map<std::string, std::string> cache_;
    mutable std::shared_mutex mutex_;
    std::atomic<uint64_t> hits_{0};
    std::atomic<uint64_t> misses_{0};
    
public:
    // Multiple readers can access simultaneously
    bool get(const std::string& key, std::string& value) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        
        auto it = cache_.find(key);
        if (it != cache_.end()) {
            value = it->second;
            hits_.fetch_add(1, std::memory_order_relaxed);
            return true;
        }
        
        misses_.fetch_add(1, std::memory_order_relaxed);
        return false;
    }
    
    // Only one writer at a time
    void put(const std::string& key, std::string value) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        cache_[key] = std::move(value);
    }
    
    void print_cache_stats() const {
        uint64_t hits = hits_.load(std::memory_order_relaxed);
        uint64_t misses = misses_.load(std::memory_order_relaxed);
        uint64_t total = hits + misses;
        
        std::cout << "\n=== Cache Statistics ===\n";
        std::cout << "Hits: " << hits << "\n";
        std::cout << "Misses: " << misses << "\n";
        
        if (total > 0) {
            std::cout << "Hit rate: " << (100.0 * hits / total) << "%\n";
        }
    }
};

// Worker thread class
class WorkerThread {
private:
    std::thread thread_;
    int id_;
    RequestQueue& queue_;
    ServerStats& stats_;
    ResourceCache& cache_;
    std::atomic<bool>& running_;
    
    void process_request(const std::string& request) {
        auto start = std::chrono::steady_clock::now();
        
        // Check cache first
        std::string cached_response;
        if (cache_.get(request, cached_response)) {
            std::cout << "Worker " << id_ << ": Cache hit for " 
                      << request << "\n";
        } else {
            // Simulate processing
            std::this_thread::sleep_for(
                std::chrono::milliseconds(10 + (std::rand() % 40))
            );
            
            cached_response = "Response for " + request;
            cache_.put(request, cached_response);
            
            std::cout << "Worker " << id_ << ": Processed " 
                      << request << "\n";
        }
        
        auto end = std::chrono::steady_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            end - start
        ).count();
        
        // Randomly fail some requests (10%)
        bool success = (std::rand() % 10) != 0;
        stats_.record_request(success, duration);
    }
    
    void run() {
        while (running_.load(std::memory_order_relaxed)) {
            std::string request;
            
            if (queue_.pop(request, std::chrono::milliseconds(100))) {
                try {
                    process_request(request);
                }
                catch (const std::exception& e) {
                    std::cerr << "Worker " << id_ << " error: " 
                              << e.what() << "\n";
                    stats_.record_request(false, 0);
                }
            }
        }
        
        std::cout << "Worker " << id_ << " shutting down\n";
    }
    
public:
    WorkerThread(int id, RequestQueue& queue, ServerStats& stats,
                ResourceCache& cache, std::atomic<bool>& running)
        : id_(id), queue_(queue), stats_(stats)
        , cache_(cache), running_(running) {
        
        thread_ = std::thread(&WorkerThread::run, this);
    }
    
    ~WorkerThread() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }
    
    // Delete copy, allow move
    WorkerThread(const WorkerThread&) = delete;
    WorkerThread& operator=(const WorkerThread&) = delete;
    
    WorkerThread(WorkerThread&& other) noexcept
        : thread_(std::move(other.thread_))
        , id_(other.id_)
        , queue_(other.queue_)
        , stats_(other.stats_)
        , cache_(other.cache_)
        , running_(other.running_) {}
};

// Main server class
class WebServer {
private:
    RequestQueue queue_;
    ServerStats stats_;
    ResourceCache cache_;
    std::vector<WorkerThread> workers_;
    std::atomic<bool> running_{true};
    
public:
    void start(int num_workers) {
        std::cout << "Starting server with " << num_workers << " workers\n";
        
        // Create worker threads
        for (int i = 0; i < num_workers; ++i) {
            workers_.emplace_back(i, queue_, stats_, cache_, running_);
        }
    }
    
    void handle_request(const std::string& request) {
        queue_.push(request);
    }
    
    void stop() {
        std::cout << "\nShutting down server...\n";
        running_ = false;
        queue_.shutdown();
        
        // Wait for all workers
        workers_.clear();
        
        std::cout << "Server stopped\n";
        stats_.print_stats();
        cache_.print_cache_stats();
    }
    
    void wait_until_empty() {
        while (queue_.size() > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        // Give some extra time for last requests
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
};

// Load generator
void generate_load(WebServer& server, int num_requests) {
    std::vector<std::string> endpoints = {
        "/api/users",
        "/api/products",
        "/api/orders",
        "/api/users",  // Duplicate for cache hits
        "/api/products",  // Duplicate for cache hits
        "/health",
        "/status"
    };
    
    std::cout << "\nGenerating " << num_requests << " requests...\n";
    
    for (int i = 0; i < num_requests; ++i) {
        std::string endpoint = endpoints[std::rand() % endpoints.size()];
        std::string request = endpoint + "?id=" + std::to_string(i);
        
        server.handle_request(request);
        
        // Small delay between requests
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
    
    std::cout << "Finished generating requests\n";
}

int main() {
    std::srand(std::time(nullptr));
    
    WebServer server;
    
    // Start server with 4 worker threads
    server.start(4);
    
    // Simulate client load
    std::thread load_generator([&server]() {
        generate_load(server, 50);
    });
    
    // Let server run
    load_generator.join();
    
    // Wait for queue to empty
    server.wait_until_empty();
    
    // Shutdown
    server.stop();
    
    return 0;
}

Output (Sample):

Starting server with 4 workers

Generating 50 requests...
Worker 0: Processed /api/users?id=0
Worker 1: Processed /api/products?id=1
Worker 2: Processed /health?id=2
Worker 3: Processed /api/orders?id=3
Worker 0: Cache hit for /api/users?id=4
Worker 1: Processed /status?id=5
...
Finished generating requests

Shutting down server...
Worker 0 shutting down
Worker 1 shutting down
Worker 2 shutting down
Worker 3 shutting down
Server stopped

=== Server Statistics ===
Total requests: 50
Successful: 45
Failed: 5
Success rate: 90%
Average processing time: 18ms

=== Cache Statistics ===
Hits: 22
Misses: 28
Hit rate: 44%

Concepts Demonstrated:

This example shows a real-world use case for multithreading!


Next Steps


Part 14 of 22 - Multithreading and Concurrency