- Overview
- Security Architecture
- Authentication & Authorization
- Data Protection
- Network Security
- Application Security
- Infrastructure Security
- Operational Security
- Compliance & Standards
- Security Monitoring
- Incident Response
- Security Best Practices
- Penetration Testing
- Security Tools
- Future Security Enhancements
MyPrivateVPN Network implements a comprehensive, multi-layered security architecture designed to protect user data, ensure system integrity, and maintain compliance with industry standards. This guide outlines the security measures, protocols, and best practices implemented throughout the platform.
Our security approach is built on these core principles:
- Zero Trust Architecture: Never trust, always verify
- Defense in Depth: Multiple security layers
- Least Privilege: Minimal access rights
- Security by Design: Security integrated from the start
- Continuous Monitoring: Real-time threat detection
- Compliance First: Meet regulatory requirements
- Transparency: Clear security practices
- Confidentiality: Protect user data and communications
- Integrity: Ensure data accuracy and system reliability
- Availability: Maintain service accessibility
- Authentication: Verify user and system identities
- Authorization: Control access to resources
- Non-repudiation: Prevent denial of actions
- Auditability: Comprehensive logging and monitoring
┌─────────────────────────────────────────────────────────────────┐
│ Perimeter Security │
├─────────────────────────────────────────────────────────────────┤
│ Web Application Firewall (WAF) │
│ DDoS Protection (Cloudflare) │
│ Rate Limiting & Throttling │
│ Geographic Blocking │
│ Bot Detection & Prevention │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Network Security │
├─────────────────────────────────────────────────────────────────┤
│ Network Segmentation │
│ Virtual Private Cloud (VPC/VNet) │
│ Security Groups & Network ACLs │
│ Intrusion Detection System (IDS) │
│ Network Monitoring │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Application Security │
├─────────────────────────────────────────────────────────────────┤
│ Authentication & Authorization │
│ Input Validation & Sanitization │
│ SQL Injection Prevention │
│ Cross-Site Scripting (XSS) Prevention │
│ Cross-Site Request Forgery (CSRF) Protection │
│ Security Headers & HTTP Policies │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Data Security │
├─────────────────────────────────────────────────────────────────┤
│ Encryption at Rest (AES-256) │
│ Encryption in Transit (TLS 1.3) │
│ Data Classification & Handling │
│ Secure Key Management │
│ Database Security │
│ File System Security │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Identity & Access Management │
├─────────────────────────────────────────────────────────────────┤
│ Multi-Factor Authentication (MFA) │
│ Role-Based Access Control (RBAC) │
│ Privilege Escalation Prevention │
│ Session Management │
│ Identity Federation │
│ Access Review & Certification │
└─────────────────────────────────────────────────────────────────┘
- External Threats
  - DDoS attacks
  - Brute force attacks
  - SQL injection
  - Cross-site scripting
  - Man-in-the-middle attacks
- Internal Threats
  - Malicious insiders
  - Credential theft
  - Privilege abuse
  - Data exfiltration
- Technical Threats
  - System vulnerabilities
  - Configuration errors
  - Software bugs
  - Network security gaps
- Compliance Threats
  - Data breaches
  - Privacy violations
  - Regulatory non-compliance
  - Audit failures
| Threat Category | Likelihood | Impact | Risk Level | Mitigation Priority |
|---|---|---|---|---|
| DDoS Attack | High | High | Critical | 1 |
| Credential Compromise | Medium | High | High | 2 |
| Data Breach | Low | Critical | High | 3 |
| SQL Injection | Medium | Medium | Medium | 4 |
| Insider Threat | Low | High | Medium | 5 |
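The ratings in the matrix above are consistent with a simple additive scoring scheme. The sketch below shows one way such a scheme could work. The numeric weights, the thresholds, and the tie-break on likelihood are assumptions chosen because they reproduce the five rows; they are not a documented MyPrivateVPN formula.

```python
from enum import IntEnum


class Rating(IntEnum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


def risk_level(likelihood: Rating, impact: Rating) -> str:
    """Map likelihood and impact to a risk level (assumed additive scheme)."""
    score = likelihood + impact
    if score >= 6:
        return "Critical"
    if score == 5:
        return "High"
    if score == 4:
        return "Medium"
    return "Low"


# Rows of the risk matrix: (threat, likelihood, impact)
THREATS = [
    ("Insider Threat", Rating.LOW, Rating.HIGH),
    ("SQL Injection", Rating.MEDIUM, Rating.MEDIUM),
    ("DDoS Attack", Rating.HIGH, Rating.HIGH),
    ("Data Breach", Rating.LOW, Rating.CRITICAL),
    ("Credential Compromise", Rating.MEDIUM, Rating.HIGH),
]


def prioritized(threats):
    """Order threats by descending score, breaking ties on likelihood."""
    return sorted(threats, key=lambda t: (-(t[1] + t[2]), -t[1]))


for rank, (name, likelihood, impact) in enumerate(prioritized(THREATS), start=1):
    print(rank, name, risk_level(likelihood, impact))
```

Sorting by score and then by likelihood yields the same mitigation order as the table, which makes the ranking reproducible when new threats are added.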
┌─────────────────────────────────────────────────────────────────┐
│ Authentication Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. User Login Request │
│ ↓ │
│ 2. Credential Validation │
│ ↓ │
│ 3. Multi-Factor Authentication (if enabled) │
│ ↓ │
│ 4. JWT Token Generation │
│ ↓ │
│ 5. Session Management │
│ ↓ │
│ 6. API Access Authorization │
└─────────────────────────────────────────────────────────────────┘
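Steps 4 to 6 of the flow above can be illustrated with a minimal HS256 token sketch built only on the Python standard library. The function names `issue_token` and `verify_token`, the reduced claim set, and the `now` parameter (added to make expiry testable) are hypothetical; the issuer, audience, and 30-minute lifetime mirror the JWT configuration in this guide. A maintained library such as PyJWT would normally be preferred in production.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

ISSUER = "myprivatevpn.com"
AUDIENCE = "myprivatevpn-api"


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(secret: bytes, signing_input: str) -> bytes:
    return hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()


def issue_token(secret: bytes, user_id: str, role: str,
                ttl_seconds: int = 30 * 60,
                now: Optional[float] = None) -> str:
    """Step 4: build and sign a token once credentials (and MFA) check out."""
    issued_at = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"user_id": user_id, "role": role, "iss": ISSUER,
              "aud": AUDIENCE, "iat": issued_at, "exp": issued_at + ttl_seconds}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    return signing_input + "." + _b64url(_sign(secret, signing_input))


def verify_token(secret: bytes, token: str,
                 now: Optional[float] = None) -> Optional[dict]:
    """Step 6: return the claims if the token is authentic and current."""
    current = time.time() if now is None else now
    try:
        header_b64, claims_b64, signature_b64 = token.split(".")
        expected = _sign(secret, header_b64 + "." + claims_b64)
        # Constant-time comparison avoids leaking how many bytes matched
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(claims_b64))
    except ValueError:  # wrong segment count, bad base64, or bad JSON
        return None
    if header.get("alg") != "HS256":
        return None
    if claims.get("iss") != ISSUER or claims.get("aud") != AUDIENCE:
        return None
    if current >= claims.get("exp", 0):
        return None
    return claims
```

The signature is checked before any claim is read, so unauthenticated input never drives an authorization decision.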
- Email/Password Authentication
  - Secure password policies
  - Account lockout protection
  - Password complexity requirements
  - Regular password expiration
- Multi-Factor Authentication (MFA)
  - Time-based One-Time Passwords (TOTP)
  - SMS-based verification
  - Email-based verification
  - Recovery codes
- Session Management
  - JWT token-based sessions
  - Automatic token refresh
  - Session timeout policies
  - Concurrent session limits
- Account Recovery
  - Secure password reset
  - Email verification required
  - Account lockout protection
  - Recovery audit logging
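Of the MFA methods listed above, TOTP is the one with a public algorithm (RFC 6238, built on the HOTP construction of RFC 4226). The following is a minimal standard-library sketch of code generation and verification. The function names, the 30-second step, the 6-digit default, and the one-step drift window are assumptions for illustration, not confirmed platform settings.

```python
import hashlib
import hmac
import struct
import time
from typing import Optional


def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """HOTP (RFC 4226): HMAC-SHA1 over the counter, dynamically truncated."""
    digest = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def totp(secret: bytes, at: Optional[float] = None,
         step: int = 30, digits: int = 6) -> str:
    """TOTP (RFC 6238): HOTP with the counter derived from Unix time."""
    timestamp = time.time() if at is None else at
    return hotp(secret, int(timestamp // step), digits)


def verify_totp(secret: bytes, submitted: str, at: Optional[float] = None,
                step: int = 30, window: int = 1) -> bool:
    """Accept codes from the current step and `window` steps either side."""
    timestamp = time.time() if at is None else at
    counter = int(timestamp // step)
    for drift in range(-window, window + 1):
        candidate = counter + drift
        if candidate < 0:
            continue
        expected = hotp(secret, candidate, len(submitted) or 6)
        # Constant-time comparison of the two codes
        if hmac.compare_digest(expected.encode(), submitted.encode()):
            return True
    return False


# RFC 6238 test secret; at t=59 the time counter is 1
print(totp(b"12345678901234567890", at=59))
```

The drift window tolerates small clock differences between server and authenticator app at the cost of a slightly longer validity period per code.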
# JWT token configuration
JWT_CONFIG = {
    "algorithm": "HS256",  # or RS256 for asymmetric
    "access_token_expire_minutes": 30,
    "refresh_token_expire_days": 30,
    "issuer": "myprivatevpn.com",
    "audience": "myprivatevpn-api",
    # Placeholder only: load the real key from the environment or a secrets manager
    "secret_key": "secure_random_secret_key",
    "token_type": "Bearer",
    "include_claims": {
        "user_id": "UUID",
        "email": "string",
        "role": "string",
        "permissions": ["list"],
        "exp": "timestamp",
        "iat": "timestamp",
        "iss": "issuer",
        "aud": "audience"
    }
}
┌─────────────────────────────────────────────────────────────────┐
│ User Roles │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Administrator: │
│ • Full system access │
│ • User management │
│ • System configuration │
│ • Security settings │
│ │
│ User: │
│ • Own data access │
│ • Client management │
│ • Connection management │
│ • Basic reporting │
│ │
│ Read-Only: │
│ • Read-only access to own data │
│ • No management capabilities │
│ • Basic monitoring │
└─────────────────────────────────────────────────────────────────┘
| Resource | Admin | User | Read-Only |
|---|---|---|---|
| Users | CRUD | Read | Read |
| Servers | CRUD | Read | Read |
| Clients | CRUD | CRUD | Read |
| Connections | CRUD | CRUD | Read |
| Metrics | CRUD | Read | Read |
| Audit Logs | CRUD | Read | Read |
| System Config | CRUD | None | None |
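The permission matrix above translates directly into a lookup table. The sketch below is one possible encoding with a deny-by-default check; the role and resource keys (`admin`, `user`, `read_only`, `audit_logs`, and so on) and the function name `is_allowed` are illustrative names, not identifiers confirmed by the platform.

```python
# Permissions per resource and role, transcribed from the matrix:
# C = create, R = read, U = update, D = delete, "" = no access
PERMISSIONS = {
    "users":         {"admin": "CRUD", "user": "R",    "read_only": "R"},
    "servers":       {"admin": "CRUD", "user": "R",    "read_only": "R"},
    "clients":       {"admin": "CRUD", "user": "CRUD", "read_only": "R"},
    "connections":   {"admin": "CRUD", "user": "CRUD", "read_only": "R"},
    "metrics":       {"admin": "CRUD", "user": "R",    "read_only": "R"},
    "audit_logs":    {"admin": "CRUD", "user": "R",    "read_only": "R"},
    "system_config": {"admin": "CRUD", "user": "",     "read_only": ""},
}

ACTIONS = {"create": "C", "read": "R", "update": "U", "delete": "D"}


def is_allowed(role: str, resource: str, action: str) -> bool:
    """Return True only if the matrix explicitly grants the action."""
    letter = ACTIONS.get(action)
    if letter is None:
        return False  # unknown action: deny
    granted = PERMISSIONS.get(resource, {}).get(role, "")
    return letter in granted


print(is_allowed("user", "clients", "delete"))      # granted by the matrix
print(is_allowed("user", "system_config", "read"))  # denied by the matrix
```

Unknown roles, resources, and actions all fall through to a denial, which keeps the check aligned with the least-privilege principle.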
-- Enable RLS on all tables
ALTER TABLE vpn_servers ENABLE ROW LEVEL SECURITY;
ALTER TABLE vpn_clients ENABLE ROW LEVEL SECURITY;
ALTER TABLE vpn_connections ENABLE ROW LEVEL SECURITY;
ALTER TABLE server_metrics ENABLE ROW LEVEL SECURITY;
-- Users can only access their own data
CREATE POLICY "Users can view own clients" ON vpn_clients
FOR SELECT USING (auth.uid() = user_id);
CREATE POLICY "Users can manage own clients" ON vpn_clients
FOR ALL USING (auth.uid() = user_id);
-- Admin users have full access
CREATE POLICY "Admins have full access" ON vpn_servers
FOR ALL USING (
EXISTS (
SELECT 1 FROM user_roles
WHERE user_id = auth.uid()
AND role = 'admin'
)
);
┌─────────────────────────────────────────────────────────────────┐
│ Data Storage Layers │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Database (PostgreSQL): │
│ • AES-256 encryption │
│ • Transparent Data Encryption (TDE) │
│ • Column-level encryption for sensitive data │
│ • Encrypted backups │
│ │
│ File Storage: │
│ • Server configuration files │
│ • Client private keys │
│ • SSL certificates │
│ • Log files │
│ │
│ Backup Storage: │
│ • Database backups │
│ • Configuration backups │
│ • Audit logs │
└─────────────────────────────────────────────────────────────────┘
# Nginx SSL/TLS Configuration
server {
listen 443 ssl http2;
server_name vpn.myprivatevpn.com;
# SSL/TLS Configuration
ssl_certificate /etc/letsencrypt/live/vpn.myprivatevpn.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/vpn.myprivatevpn.com/privkey.pem;
# Modern SSL/TLS protocols
ssl_protocols TLSv1.2 TLSv1.3;
# Strong cipher suites
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers off;
# SSL session configuration
ssl_session_timeout 1d;
ssl_session_cache shared:SSL:50m;
ssl_session_tickets off;
# OCSP stapling
ssl_stapling on;
ssl_stapling_verify on;
# Security headers
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline';" always;
}
| Level | Description | Protection Requirements |
|---|---|---|
| Public | Publicly available information | No special protection |
| Internal | Internal use only | Access controls, logging |
| Confidential | Sensitive business data | Encryption, access controls, monitoring |
| Restricted | Highly sensitive data | Encryption, strict access controls, monitoring, audit |
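The protection requirements in the table above can be checked mechanically. The sketch below encodes each level's required controls as a set and reports which ones a given resource is missing. The control identifiers (for example `strict_access_controls`) and the function name `missing_controls` are hypothetical labels derived from the table's wording.

```python
from enum import Enum


class Level(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"


# Required controls per level, transcribed from the classification table
REQUIRED_CONTROLS = {
    Level.PUBLIC: set(),
    Level.INTERNAL: {"access_controls", "logging"},
    Level.CONFIDENTIAL: {"encryption", "access_controls", "monitoring"},
    Level.RESTRICTED: {"encryption", "strict_access_controls",
                       "monitoring", "audit"},
}


def missing_controls(level: Level, applied: set) -> set:
    """Return the controls the level requires that are not yet applied."""
    return REQUIRED_CONTROLS[level] - applied


gaps = missing_controls(Level.CONFIDENTIAL, {"encryption", "access_controls"})
print(sorted(gaps))
```

A check like this could run in CI or during a compliance review to flag data stores whose configured controls fall short of their classification.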
# Data classification policy
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class DataClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
@dataclass
class DataSecurityPolicy:
    classification: DataClassification
    encryption_required: bool
    access_controls: list
    audit_logging: bool
    retention_period: Optional[int]  # in days
# Example usage
CLIENT_DATA_POLICY = DataSecurityPolicy(
    classification=DataClassification.CONFIDENTIAL,
    encryption_required=True,
    access_controls=["authenticated_users"],
    audit_logging=True,
    retention_period=2555  # 7 years
)
# WireGuard key management
import base64
import os
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class WireGuardKeyManager:
    def __init__(self, master_key: bytes):
        self.master_key = master_key
    def generate_keypair(self) -> tuple[str, str]:
        """Generate a WireGuard key pair: (public key, encrypted private key), both base64"""
        # WireGuard keys are Curve25519 (X25519) keys
        private_key = X25519PrivateKey.generate()
        private_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption(),
        )
        public_bytes = self._derive_public_key(private_key)
        # Encrypt private key for storage
        encrypted_private_key = self._encrypt_private_key(private_bytes)
        return base64.b64encode(public_bytes).decode(), encrypted_private_key
    def _derive_public_key(self, private_key: X25519PrivateKey) -> bytes:
        """Derive public key from private key"""
        return private_key.public_key().public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        )
    def _encrypt_private_key(self, private_key: bytes) -> str:
        """Encrypt private key using master key"""
        # A PBKDF2HMAC instance is single-use, so build one per derivation
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=b'private_key_encryption',
            iterations=100000,
        )
        key = kdf.derive(self.master_key)
        # Encrypt with AES-GCM; the random nonce is stored in front of the ciphertext
        nonce = os.urandom(12)
        ciphertext = AESGCM(key).encrypt(nonce, private_key, None)
        return base64.b64encode(nonce + ciphertext).decode()
# Key rotation configuration
KEY_ROTATION_CONFIG = {
"wireguard_keys": {
"rotation_interval_days": 90,
"grace_period_days": 30,
"notification_advance_days": 7,
"auto_rotation": True
},
"api_keys": {
"rotation_interval_days": 30,
"grace_period_days": 7,
"notification_advance_days": 3,
"auto_rotation": False
},
"ssl_certificates": {
"rotation_interval_days": 365,
"renewal_advance_days": 30,
"auto_renewal": True
},
"database_keys": {
"rotation_interval_days": 180,
"grace_period_days": 30,
"notification_advance_days": 14,
"auto_rotation": False
}
}
┌─────────────────────────────────────────────────────────────────┐
│ WireGuard Security │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Cryptography: │
│ • Curve25519 for key exchange │
│ • ChaCha20-Poly1305 for authenticated encryption │
│ • BLAKE2s for hashing │
│ • SipHash24 for hash tables │
│ │
│ Protocol Security: │
│ • UDP-based transport │
│ • Perfect forward secrecy │
│ • Automatic key rotation │
│ • Built-in anti-replay │
│ │
│ Configuration Security: │
│ • Private key never transmitted │
│ • Pre-shared key support │
│ • Connection state validation │
│ • Firewall integration │
└─────────────────────────────────────────────────────────────────┘
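The "built-in anti-replay" property listed above rests on a sliding window over per-packet counters: a packet is accepted only if its counter is newer than anything seen, or falls inside the window and has not been seen before. The sketch below is a simplified illustration of that idea, not WireGuard's actual implementation; the class name `ReplayWindow` and the default window size are assumptions.

```python
class ReplayWindow:
    """Sliding-window replay filter over increasing packet counters."""

    def __init__(self, size: int = 64):
        self.size = size
        self.highest = -1  # highest counter accepted so far
        self.bitmap = 0    # bit i set => counter (highest - i) was seen

    def accept(self, counter: int) -> bool:
        """Return True for a fresh counter, False for a replay or stale one."""
        if counter > self.highest:
            shift = counter - self.highest
            if shift >= self.size:
                # Jumped past the whole window: only the new counter is marked
                self.bitmap = 1
            else:
                mask = (1 << self.size) - 1
                self.bitmap = ((self.bitmap << shift) | 1) & mask
            self.highest = counter
            return True
        offset = self.highest - counter
        if offset >= self.size:
            return False  # older than the window: cannot be verified as new
        bit = 1 << offset
        if self.bitmap & bit:
            return False  # counter already seen: replay
        self.bitmap |= bit
        return True


window = ReplayWindow(size=4)
for counter in (0, 0, 3, 1, 1, 10, 3, 8, 8):
    print(counter, window.accept(counter))
```

Because the protocol runs over UDP, packets may arrive out of order; the window lets late packets through while still rejecting duplicates. The check is only meaningful after the packet has been authenticated, so an attacker cannot advance the window with forged counters.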
# WireGuard network configuration
# /etc/wireguard/wg0.conf
[Interface]
Address = 10.8.0.1/24
ListenPort = 51820
PrivateKey = <server-private-key>
# Security settings
PostUp = iptables -A FORWARD -i %i -j ACCEPT
PostUp = iptables -A FORWARD -o %i -j ACCEPT
PostUp = iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE
PostDown = iptables -D FORWARD -i %i -j ACCEPT
PostDown = iptables -D FORWARD -o %i -j ACCEPT
PostDown = iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE
# Per-source limit on new connections
PostUp = iptables -A FORWARD -i %i -m state --state NEW -m recent --set
PostUp = iptables -A FORWARD -i %i -m state --state NEW -m recent --update --seconds 60 --hitcount 5 -j DROP
# Rate limiting
PostUp = iptables -A FORWARD -i %i -m limit --limit 30/minute --limit-burst 5 -j ACCEPT
# Client configuration
[Peer]
PublicKey = <client-public-key>
AllowedIPs = 10.8.0.2/32
PersistentKeepalive = 25
#!/bin/bash
# firewall-config.sh
# Reset UFW
ufw --force reset
# Default policies
ufw default deny incoming
ufw default allow outgoing
ufw default deny routed
# SSH access (limit connections)
ufw limit 22/tcp
# HTTP/HTTPS
ufw allow 80/tcp
ufw allow 443/tcp
# WireGuard VPN
ufw allow 51820/udp
# Application API (restrict to specific IPs)
ufw allow from 10.0.0.0/8 to any port 8000
# Rate limiting for WireGuard (SSH is already limited above)
ufw limit 51820/udp
# Log denied packets
ufw logging on
# Enable UFW
ufw --force enable
#!/bin/bash
# iptables-rules.sh
# Flush existing rules
iptables -F
iptables -X
iptables -t nat -F
iptables -t nat -X
iptables -t mangle -F
iptables -t mangle -X
# Set default policies
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT
# Allow loopback
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT
# Allow established connections
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
# Allow SSH with rate limiting
iptables -A INPUT -p tcp --dport 22 -m state --state NEW -m recent --set
iptables -A INPUT -p tcp --dport 22 -m state --state NEW -m recent --update --seconds 60 --hitcount 4 -j DROP
iptables -A INPUT -p tcp --dport 22 -j ACCEPT
# Allow HTTP/HTTPS
iptables -A INPUT -p tcp --dport 80 -j ACCEPT
iptables -A INPUT -p tcp --dport 443 -j ACCEPT
# Allow WireGuard with rate limiting
iptables -A INPUT -p udp --dport 51820 -m state --state NEW -m recent --set
iptables -A INPUT -p udp --dport 51820 -m state --state NEW -m recent --update --seconds 60 --hitcount 10 -j DROP
iptables -A INPUT -p udp --dport 51820 -j ACCEPT
# Drop invalid packets
iptables -A INPUT -m state --state INVALID -j DROP
iptables -A OUTPUT -m state --state INVALID -j DROP
iptables -A FORWARD -m state --state INVALID -j DROP
# Log dropped packets
iptables -A INPUT -j LOG --log-prefix "iptables denied: " --log-level 7
iptables -A FORWARD -j LOG --log-prefix "iptables denied: " --log-level 7
# Cloudflare security configuration
security_settings:
ddos_protection:
enabled: true
sensitivity: "high"
challenge_passage: "success"
bot_fight_mode:
enabled: true
challenge_passage: "success"
browser_integrity_check:
enabled: true
hotlink_protection:
enabled: false
security_level: "high"
ssl_mode: "full_strict"
always_online: true
caching_level: "simplified"
security_headers:
strict_transport_security: "max-age=31536000; includeSubDomains; preload"
x_frame_options: "SAMEORIGIN"
x_content_type_options: "nosniff"
x_xss_protection: "1; mode=block"
referrer_policy: "strict-origin-when-cross-origin"
# Rate limiting configuration
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
# API rate limits
API_RATE_LIMITS = {
    "auth/login": "5/minute",
    "auth/register": "3/minute",
    "api/servers": "100/minute",
    "api/clients": "50/minute",
    "api/connections": "200/minute",
    "api/metrics": "1000/minute"
}
# WireGuard connection rate limits
VPN_RATE_LIMITS = {
    "connection_attempts": "10/minute",
    "key_generation": "5/minute",
    "server_restart": "3/hour"
}
# Application rate limit decorator
@limiter.limit("100/minute")
async def get_servers(request: Request):
    # API endpoint implementation
    pass
from pydantic import BaseModel, validator, Field
from typing import Optional
import re
from html import escape
# Uses the Pydantic v1 API (`validator`, `Field(regex=...)`)
class CreateClientRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=100)
    email: Optional[str] = Field(None, regex=r'^[^@]+@[^@]+\.[^@]+$')
    platform: str = Field(..., regex='^(windows|macos|linux|ios|android)$')
    description: Optional[str] = Field(None, max_length=500)
    @validator('name')
    def validate_name(cls, v):
        # Remove path traversal attempts (before '/' is stripped below)
        v = re.sub(r"\.\./", "", v)
        # Remove SQL injection patterns
        v = re.sub(r"['\";\\/*-]", "", v)
        # Remove command injection patterns
        v = re.sub(r"[;&|`$()]", "", v)
        # Escape remaining HTML last, so the entities it produces are not stripped
        v = escape(v)
        return v.strip()
    @validator('email')
    def validate_email(cls, v):
        if v:
            # Normalize whitespace and case
            v = v.strip().lower()
            # Basic email validation
            if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
                raise ValueError('Invalid email format')
        return v
# SQL injection prevention
class SecurityError(Exception):
    """Raised when input fails a security check"""
def safe_query_execution(query: str, params: dict):
    # Parameterized queries are the primary defense: the driver binds the
    # values in `params`, so they are never spliced into the SQL text.
    # `database` is assumed to be the application's database handle.
    for value in params.values():
        if isinstance(value, str) and not is_safe_value(value):
            raise SecurityError("Potentially unsafe parameter detected")
    return database.execute(query, params)
def is_safe_value(value: str) -> bool:
    # Heuristic check for SQL injection patterns. It is applied to untrusted
    # values, not to the query text, which always contains SQL keywords.
    dangerous_patterns = [
        r"\b(union|select|insert|update|delete|drop|create|alter|exec|execute)\b",
        r"(--|;|/\*|\*/)",
        r"(\bor\b|\band\b).*=\s*['\"]?['\"]?",
    ]
    for pattern in dangerous_patterns:
        if re.search(pattern, value, re.IGNORECASE):
            return False
    return True
import html
import json
from markupsafe import Markup, escape
def safe_json_response(data: dict) -> str:
"""Ensure JSON response is properly encoded"""
return json.dumps(data, default=str)
def safe_html_output(user_input: str) -> str:
"""Escape HTML to prevent XSS"""
return escape(user_input)
def safe_url_parameter(parameter: str) -> str:
"""URL encode and validate URL parameters"""
# Remove dangerous characters
dangerous_chars = ['<', '>', '"', "'", '&', '/', '\\']
for char in dangerous_chars:
parameter = parameter.replace(char, '')
# URL encode
from urllib.parse import quote
return quote(parameter)
# Nginx security headers configuration
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
# Content Security Policy
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self' wss: https:; media-src 'self'; object-src 'none'; child-src 'self'; frame-ancestors 'self';" always;
# HTTP Strict Transport Security
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
# Permissions Policy
add_header Permissions-Policy "geolocation=(), microphone=(), camera=(), payment=(), usb=(), vr=()" always;
# Feature Policy
add_header Feature-Policy "geolocation 'none'; microphone 'none'; camera 'none'" always;
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://myprivatevpn.com",
"https://dashboard.myprivatevpn.com"
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
expose_headers=["X-Total-Count", "X-Rate-Limit-Remaining"],
max_age=600
)
# Pre-flight request handling
@app.options("/{path:path}")
async def handle_options(path: str):
return {}
from datetime import datetime, timedelta
import secrets
from typing import Optional
class SecureSessionManager:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.active_sessions = {}
self.session_timeout = timedelta(hours=24)
self.max_concurrent_sessions = 5
def create_session(self, user_id: str, ip_address: str) -> str:
"""Create a new secure session"""
# Check concurrent session limit
user_sessions = [(token, s) for token, s in self.active_sessions.items()
if s['user_id'] == user_id]
if len(user_sessions) >= self.max_concurrent_sessions:
# Remove oldest session (sessions are keyed by token, not by session_id)
oldest_token, _ = min(user_sessions, key=lambda item: item[1]['created_at'])
self._invalidate_session(oldest_token)
# Generate secure session token
session_id = secrets.token_urlsafe(32)
session_token = secrets.token_urlsafe(32)
# Store session
self.active_sessions[session_token] = {
'session_id': session_id,
'user_id': user_id,
'created_at': datetime.utcnow(),
'last_accessed': datetime.utcnow(),
'ip_address': ip_address,
'user_agent': None # Store user agent for validation
}
return session_token
def validate_session(self, session_token: str, ip_address: str) -> Optional[str]:
"""Validate session token"""
if session_token not in self.active_sessions:
return None
session = self.active_sessions[session_token]
# Check session timeout
if datetime.utcnow() - session['last_accessed'] > self.session_timeout:
self._invalidate_session(session_token)
return None
# Validate IP address (optional, can be relaxed for mobile)
if session['ip_address'] != ip_address:
# Log potential security event
self._log_security_event('IP_MISMATCH', session_token)
# Update last accessed time
session['last_accessed'] = datetime.utcnow()
return session['user_id']
def invalidate_session(self, session_token: str):
"""Invalidate session"""
self._invalidate_session(session_token)
def _invalidate_session(self, session_token: str):
"""Internal session invalidation"""
if session_token in self.active_sessions:
del self.active_sessions[session_token]
def _log_security_event(self, event_type: str, session_token: str):
"""Log security events for monitoring"""
# Implementation for security event logging
pass
#!/bin/bash
# system-hardening.sh
echo "Starting system hardening..."
# 1. Update system packages
apt update && apt upgrade -y
# 2. Configure automatic security updates
cat > /etc/apt/apt.conf.d/50unattended-upgrades << EOF
Unattended-Upgrade::Allowed-Origins {
"\${distro_id}:\${distro_codename}-security";
"\${distro_id}ESMApps:\${distro_codename}-apps-security";
"\${distro_id}ESM:\${distro_codename}-infra-security";
};
Unattended-Upgrade::AutoFixInterruptedDpkg "true";
Unattended-Upgrade::MinimalSteps "true";
Unattended-Upgrade::Remove-Unused-Dependencies "true";
Unattended-Upgrade::Automatic-Reboot "true";
Unattended-Upgrade::Automatic-Reboot-Time "02:00";
EOF
# 3. Configure fail2ban
cat > /etc/fail2ban/jail.local << EOF
[DEFAULT]
bantime = 3600
findtime = 600
maxretry = 3
ignoreip = 127.0.0.1/8 ::1
[sshd]
enabled = true
port = ssh
filter = sshd
logpath = /var/log/auth.log
maxretry = 3
[nginx-http-auth]
enabled = true
filter = nginx-http-auth
port = http,https
logpath = /var/log/nginx/error.log
maxretry = 3
[nginx-limit-req]
enabled = true
filter = nginx-limit-req
port = http,https
logpath = /var/log/nginx/error.log
maxretry = 10
EOF
# 4. Secure kernel parameters
cat > /etc/sysctl.d/99-security.conf << EOF
# IP Spoofing protection
net.ipv4.conf.default.rp_filter=1
net.ipv4.conf.all.rp_filter=1
# Ignore ICMP redirects
net.ipv4.conf.all.accept_redirects = 0
net.ipv4.conf.default.accept_redirects = 0
net.ipv6.conf.all.accept_redirects = 0
net.ipv6.conf.default.accept_redirects = 0
# Ignore send redirects
net.ipv4.conf.all.send_redirects = 0
net.ipv4.conf.default.send_redirects = 0
# Disable source packet routing
net.ipv4.conf.all.accept_source_route = 0
net.ipv6.conf.all.accept_source_route = 0
net.ipv4.conf.default.accept_source_route = 0
net.ipv6.conf.default.accept_source_route = 0
# Log Martians
net.ipv4.conf.all.log_martians = 1
net.ipv4.conf.default.log_martians = 1
# Ignore ICMP pings
net.ipv4.icmp_echo_ignore_all = 1
# Ignore Directed pings
net.ipv4.icmp_echo_ignore_broadcasts = 1
# Disable IPv6 if not needed
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
# TCP SYN flood protection
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_max_syn_backlog = 2048
net.ipv4.tcp_synack_retries = 2
# Increase file limits
fs.file-max = 2097152
EOF
# 5. Enable the unattended-upgrades service
dpkg-reconfigure --priority=low unattended-upgrades
systemctl enable unattended-upgrades
systemctl start unattended-upgrades
# 6. Configure auditd for security auditing
apt install auditd audispd-plugins -y
# 7. Remove unnecessary packages
apt autoremove -y
apt autoclean
# 8. Schedule periodic APT runs for unattended upgrades
cat > /etc/apt/apt.conf.d/20auto-upgrades << EOF
APT::Periodic::Update-Package-Lists "1";
APT::Periodic::Download-Upgradeable-Packages "1";
APT::Periodic::AutocleanInterval "7";
APT::Periodic::Unattended-Upgrade "1";
EOF
echo "System hardening completed!"
# File permissions hardening
# /etc/passwd and /etc/shadow
chmod 644 /etc/passwd
chmod 600 /etc/shadow
# SSH configuration
chmod 600 /etc/ssh/sshd_config
# WireGuard configuration
chmod 600 /etc/wireguard/*.conf
# Application directories
chown -R vpn:vpn /opt/myprivatevpn
chmod 750 /opt/myprivatevpn
# Log directories
chmod 755 /var/log
chmod 750 /var/log/myprivatevpn
# Temporary files
chmod 1777 /tmp
# Set secure umask
echo 'umask 027' >> /etc/bash.bashrc
echo 'umask 027' >> /etc/profile
# Dockerfile with security best practices
FROM python:3.11-slim as builder
# Create non-root user
RUN groupadd -r vpn && useradd -r -g vpn vpn
# Install security updates
RUN apt-get update && apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Production stage
FROM python:3.11-slim as production
# Install security updates
RUN apt-get update && apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
dumb-init \
curl \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
# Create non-root user
RUN groupadd -r vpn && useradd -r -g vpn -d /app -s /sbin/nologin vpn
# Set working directory
WORKDIR /app
# Copy application from builder
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
# Copy application code
COPY --chown=vpn:vpn . .
# Switch to non-root user
USER vpn
# Use dumb-init to handle signals properly
ENTRYPOINT ["dumb-init", "--"]
# Run application as non-root user
CMD ["python", "main.py"]
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# docker-compose.yml with security configurations
version: '3.8'
services:
vpn-api:
build: .
container_name: vpn-api
user: "1001:1001" # Non-root user
read_only: true # Read-only root filesystem
tmpfs:
- /tmp # Temporary files in memory
- /var/run # Runtime files
cap_drop:
- ALL # Drop all capabilities
cap_add:
- NET_BIND_SERVICE # Only bind to privileged ports
- CHOWN
- SETGID
- SETUID
security_opt:
- no-new-privileges:true
ulimits:
nproc: 65535
nofile:
soft: 1024
hard: 2048
environment:
- PYTHONUNBUFFERED=1
- PYTHONDONTWRITEBYTECODE=1
networks:
- vpn-network
volumes:
- ./logs:/app/logs:rw
- /etc/wireguard:/etc/wireguard:ro
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
container_name: redis
read_only: true
tmpfs:
- /tmp
cap_drop:
- ALL
cap_add:
- SETUID
- SETGID
security_opt:
- no-new-privileges:true
networks:
- vpn-network
volumes:
- redis-data:/data
restart: unless-stopped
nginx:
image: nginx:alpine
container_name: nginx
read_only: true
cap_drop:
- ALL
cap_add:
- CHOWN
- SETGID
- SETUID
- NET_BIND_SERVICE
security_opt:
- no-new-privileges:true
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
- ./static:/var/www/static:ro
depends_on:
- vpn-api
networks:
- vpn-network
restart: unless-stopped
networks:
vpn-network:
driver: bridge
driver_opts:
com.docker.network.bridge.name: vpnnet
com.docker.network.driver.mtu: 1500
volumes:
redis-data:
import logging
import json
from datetime import datetime
from typing import Dict, Any
class SecurityEventLogger:
def __init__(self):
self.logger = logging.getLogger('security')
self.logger.setLevel(logging.INFO)
# File handler for security events
handler = logging.FileHandler('/var/log/security/events.log')
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_authentication_event(self, event_type: str, user_id: str,
ip_address: str, success: bool,
details: Dict[str, Any] = None):
"""Log authentication events"""
event = {
'event_type': 'authentication',
'sub_type': event_type,
'user_id': user_id,
'ip_address': ip_address,
'success': success,
'timestamp': datetime.utcnow().isoformat(),
'details': details or {}
}
level = logging.INFO if success else logging.WARNING
self.logger.log(level, json.dumps(event))
def log_authorization_event(self, event_type: str, user_id: str,
resource: str, action: str, result: str,
details: Dict[str, Any] = None):
"""Log authorization events"""
event = {
'event_type': 'authorization',
'sub_type': event_type,
'user_id': user_id,
'resource': resource,
'action': action,
'result': result,
'timestamp': datetime.utcnow().isoformat(),
'details': details or {}
}
level = logging.INFO if result == 'granted' else logging.WARNING
self.logger.log(level, json.dumps(event))
def log_data_access_event(self, event_type: str, user_id: str,
resource: str, access_type: str,
details: Dict[str, Any] = None):
"""Log data access events"""
event = {
'event_type': 'data_access',
'sub_type': event_type,
'user_id': user_id,
'resource': resource,
'access_type': access_type,
'timestamp': datetime.utcnow().isoformat(),
'details': details or {}
}
self.logger.info(json.dumps(event))
def log_security_incident(self, incident_type: str, severity: str,
description: str, affected_resources: list,
details: Dict[str, Any] = None):
"""Log security incidents"""
event = {
'event_type': 'security_incident',
'incident_type': incident_type,
'severity': severity,
'description': description,
'affected_resources': affected_resources,
'timestamp': datetime.utcnow().isoformat(),
'details': details or {}
}
level = (
logging.CRITICAL if severity == 'critical' else
logging.ERROR if severity == 'high' else
logging.WARNING
)
self.logger.log(level, json.dumps(event))
import numpy as np
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Dict, List
class SecurityAnomalyDetector:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.failed_logins = defaultdict(deque)
        self.request_rates = defaultdict(deque)
        self.response_times = defaultdict(deque)

    def check_failed_login_pattern(self, ip_address: str, user_id: str) -> bool:
        """Check for brute force attack patterns"""
        now = datetime.utcnow()
        window_start = now - timedelta(minutes=15)
        # Clean old entries
        while self.failed_logins[ip_address] and \
                self.failed_logins[ip_address][0]['timestamp'] < window_start:
            self.failed_logins[ip_address].popleft()
        # Collect failed logins for this user from this IP
        recent_failures = [
            entry for entry in self.failed_logins[ip_address]
            if entry['user_id'] == user_id
        ]
        # Flag 5 or more failed logins for the same user in 15 minutes
        if len(recent_failures) >= 5:
            return True
        # Flag 10 or more failed logins from the same IP in 15 minutes
        if len(self.failed_logins[ip_address]) >= 10:
            return True
        return False

    def check_unusual_request_pattern(self, ip_address: str,
                                      endpoint: str) -> bool:
        """Check for unusual request patterns"""
        now = datetime.utcnow()
        window_start = now - timedelta(minutes=5)
        # Clean old entries
        while self.request_rates[ip_address] and \
                self.request_rates[ip_address][0]['timestamp'] < window_start:
            self.request_rates[ip_address].popleft()
        # Count requests to same endpoint
        recent_requests = [
            entry for entry in self.request_rates[ip_address]
            if entry['endpoint'] == endpoint
        ]
        # Flag 50 or more requests to the same endpoint in 5 minutes
        if len(recent_requests) >= 50:
            return True
        return False

    def check_response_time_anomaly(self, endpoint: str,
                                    response_time: float) -> bool:
        """Check for response time anomalies"""
        now = datetime.utcnow()
        window_start = now - timedelta(hours=1)
        # Clean old entries
        while self.response_times[endpoint] and \
                self.response_times[endpoint][0]['timestamp'] < window_start:
            self.response_times[endpoint].popleft()
        # Add current response time
        self.response_times[endpoint].append({
            'timestamp': now,
            'response_time': response_time
        })
        # Check for anomalies if we have enough data
        if len(self.response_times[endpoint]) >= 20:
            response_times = [
                entry['response_time']
                for entry in self.response_times[endpoint]
            ]
            mean = np.mean(response_times)
            std = np.std(response_times)
            # Check if current response time is more than 3 standard deviations from mean
            if abs(response_time - mean) > 3 * std:
                return True
        return False

    def record_failed_login(self, ip_address: str, user_id: str):
        """Record a failed login attempt"""
        self.failed_logins[ip_address].append({
            'timestamp': datetime.utcnow(),
            'user_id': user_id
        })

    def record_request(self, ip_address: str, endpoint: str):
        """Record an API request"""
        self.request_rates[ip_address].append({
            'timestamp': datetime.utcnow(),
            'endpoint': endpoint
        })


from enum import Enum
from functools import wraps
from fastapi import HTTPException, Depends, Request
from typing import List, Optional
class Permission(Enum):
    # User management
    USER_CREATE = "user:create"
    USER_READ = "user:read"
    USER_UPDATE = "user:update"
    USER_DELETE = "user:delete"
    # Server management
    SERVER_CREATE = "server:create"
    SERVER_READ = "server:read"
    SERVER_UPDATE = "server:update"
    SERVER_DELETE = "server:delete"
    SERVER_RESTART = "server:restart"
    # Client management
    CLIENT_CREATE = "client:create"
    CLIENT_READ = "client:read"
    CLIENT_UPDATE = "client:update"
    CLIENT_DELETE = "client:delete"
    # Connection management
    CONNECTION_CREATE = "connection:create"
    CONNECTION_READ = "connection:read"
    CONNECTION_DELETE = "connection:delete"
    # Metrics and monitoring
    METRICS_READ = "metrics:read"
    METRICS_EXPORT = "metrics:export"
    # Admin functions
    ADMIN_AUDIT_LOGS = "admin:audit_logs"
    ADMIN_SYSTEM_CONFIG = "admin:system_config"
    ADMIN_SECURITY_SETTINGS = "admin:security_settings"


# Role-based permissions
ROLE_PERMISSIONS = {
    "admin": list(Permission),
    "user": [
        Permission.CLIENT_CREATE,
        Permission.CLIENT_READ,
        Permission.CLIENT_UPDATE,
        Permission.CLIENT_DELETE,
        Permission.CONNECTION_CREATE,
        Permission.CONNECTION_READ,
        Permission.CONNECTION_DELETE,
        Permission.METRICS_READ,
    ],
    "readonly": [
        Permission.USER_READ,
        Permission.SERVER_READ,
        Permission.CLIENT_READ,
        Permission.CONNECTION_READ,
        Permission.METRICS_READ,
    ]
}
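A lookup against a role table like the one above can be sketched as follows. `has_permission` is a hypothetical helper, and the enum and role table are trimmed copies so the snippet runs on its own; the point is that an unknown role resolves to an empty permission set, so the check fails closed.

```python
from enum import Enum
from typing import Dict, FrozenSet


class Permission(Enum):
    # Trimmed copy of the guide's enum, just enough to run the example.
    CLIENT_READ = "client:read"
    CLIENT_DELETE = "client:delete"
    SERVER_RESTART = "server:restart"


ROLE_PERMISSIONS: Dict[str, FrozenSet[Permission]] = {
    "admin": frozenset(Permission),
    "user": frozenset({Permission.CLIENT_READ, Permission.CLIENT_DELETE}),
    "readonly": frozenset({Permission.CLIENT_READ}),
}


def has_permission(role: str, permission: Permission) -> bool:
    """Return True only if the role is known and grants the permission."""
    # Unknown roles fall back to an empty set instead of raising KeyError.
    return permission in ROLE_PERMISSIONS.get(role, frozenset())
```

Using sets rather than lists keeps each membership test constant-time, which matters little at this size but avoids surprises as the permission list grows.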
def require_permission(permission: Permission):
    """Decorator to require specific permission"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get current request. FastAPI passes endpoint parameters as
            # keyword arguments, so search kwargs as well as args.
            request = None
            for arg in list(args) + list(kwargs.values()):
                if isinstance(arg, Request):
                    request = arg
                    break
            if not request:
                raise HTTPException(status_code=500, detail="Request not found")
            user = getattr(request.state, 'user', None)
            if not user:
                raise HTTPException(status_code=401, detail="Not authenticated")
            # Check if user has required permission; unknown roles get none
            user_role = user.get('role', 'readonly')
            if permission not in ROLE_PERMISSIONS.get(user_role, []):
                raise HTTPException(status_code=403, detail="Insufficient permissions")
            return await func(*args, **kwargs)
        return wrapper
    return decorator


# Usage example (router, CreateClientRequest and get_current_user are
# assumed to be defined elsewhere). The endpoint must accept a Request
# parameter so the decorator can find the authenticated user.
@router.post("/clients")
@require_permission(Permission.CLIENT_CREATE)
async def create_client(request: Request,
                        client_data: CreateClientRequest,
                        current_user: dict = Depends(get_current_user)):
    # Implementation
    pass


import asyncio
import json
from datetime import datetime
from typing import Dict, Any, Optional
from sqlalchemy import text
class AuditLogger:
    def __init__(self, database):
        self.db = database

    async def log_user_action(self, user_id: str, action: str,
                              resource: str, details: Dict[str, Any] = None):
        """Log user actions for audit trail"""
        await self._write_audit_log({
            'event_type': 'user_action',
            'user_id': user_id,
            'action': action,
            'resource': resource,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat(),
            'source_ip': details.get('source_ip') if details else None,
            'user_agent': details.get('user_agent') if details else None
        })

    async def log_system_event(self, event_type: str, component: str,
                               details: Dict[str, Any] = None):
        """Log system events"""
        await self._write_audit_log({
            'event_type': 'system_event',
            'event_type_detail': event_type,
            'component': component,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat()
        })

    async def log_security_event(self, event_type: str, severity: str,
                                 description: str, details: Dict[str, Any] = None):
        """Log security events"""
        await self._write_audit_log({
            'event_type': 'security_event',
            'event_type_detail': event_type,
            'severity': severity,
            'description': description,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat()
        })

    async def log_data_access(self, user_id: str, resource: str,
                              operation: str, data_id: Optional[str] = None,
                              details: Dict[str, Any] = None):
        """Log data access events"""
        await self._write_audit_log({
            'event_type': 'data_access',
            'user_id': user_id,
            'resource': resource,
            'operation': operation,
            'data_id': data_id,
            'details': details or {},
            'timestamp': datetime.utcnow().isoformat()
        })

    async def _write_audit_log(self, log_entry: Dict[str, Any]):
        """Write audit log entry to database"""
        query = text("""
            INSERT INTO audit_logs (
                event_type, event_data, timestamp, user_id
            ) VALUES (
                :event_type, :event_data, :timestamp, :user_id
            )
        """)
        await self.db.execute(query, {
            'event_type': log_entry['event_type'],
            'event_data': json.dumps(log_entry),
            'timestamp': log_entry['timestamp'],
            'user_id': log_entry.get('user_id')
        })


from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from sqlalchemy import text
class GDPRCompliance:
    def __init__(self, database, audit_logger=None):
        self.db = database
        # Optional AuditLogger used to record deletion failures
        self.audit_logger = audit_logger

    async def export_user_data(self, user_id: str) -> Dict[str, Any]:
        """Export all user data (Right to Access)"""
        # Collect all user data
        user_data = {}
        # Basic user info
        user_query = text("SELECT * FROM auth.users WHERE id = :user_id")
        user_result = await self.db.fetch_one(user_query, {'user_id': user_id})
        user_data['user_info'] = dict(user_result) if user_result else {}
        # Client configurations
        clients_query = text("SELECT * FROM vpn_clients WHERE user_id = :user_id")
        clients = await self.db.fetch_all(clients_query, {'user_id': user_id})
        user_data['clients'] = [dict(client) for client in clients]
        # Connection history
        connections_query = text("""
            SELECT vc.*, vs.name as server_name
            FROM vpn_connections vc
            JOIN vpn_clients vcl ON vc.client_id = vcl.id
            JOIN vpn_servers vs ON vc.server_id = vs.id
            WHERE vcl.user_id = :user_id
        """)
        connections = await self.db.fetch_all(connections_query, {'user_id': user_id})
        user_data['connections'] = [dict(conn) for conn in connections]
        # Audit logs
        audit_query = text("SELECT * FROM audit_logs WHERE user_id = :user_id")
        audit_logs = await self.db.fetch_all(audit_query, {'user_id': user_id})
        user_data['audit_logs'] = [dict(log) for log in audit_logs]
        return user_data

    async def delete_user_data(self, user_id: str,
                               verify_identity: bool = True) -> bool:
        """Delete all user data (Right to Erasure)"""
        if verify_identity:
            # Add identity verification logic
            pass
        try:
            async with self.db.begin() as conn:
                # Delete in order to respect foreign key constraints
                await conn.execute(text("DELETE FROM audit_logs WHERE user_id = :user_id"),
                                   {'user_id': user_id})
                await conn.execute(
                    text("DELETE FROM vpn_connections WHERE client_id IN "
                         "(SELECT id FROM vpn_clients WHERE user_id = :user_id)"),
                    {'user_id': user_id})
                await conn.execute(text("DELETE FROM vpn_clients WHERE user_id = :user_id"),
                                   {'user_id': user_id})
                await conn.execute(text("DELETE FROM auth.users WHERE id = :user_id"),
                                   {'user_id': user_id})
            return True
        except Exception as e:
            # Log deletion failure
            if self.audit_logger is not None:
                await self.audit_logger.log_security_event(
                    'DATA_DELETION_FAILED',
                    'high',
                    f'Failed to delete user data for {user_id}',
                    {'error': str(e)}
                )
            return False

    async def update_user_consent(self, user_id: str, consent_data: Dict[str, Any]):
        """Update user consent preferences"""
        await self._write_consent_record(user_id, consent_data)

    async def _write_consent_record(self, user_id: str, consent_data: Dict[str, Any]):
        """Write consent record for audit purposes"""
        query = text("""
            INSERT INTO user_consent (user_id, consent_type, consent_given,
                                      consent_date, ip_address, user_agent)
            VALUES (:user_id, :consent_type, :consent_given, :consent_date,
                    :ip_address, :user_agent)
        """)
        await self.db.execute(query, {
            'user_id': user_id,
            'consent_type': consent_data.get('consent_type'),
            'consent_given': consent_data.get('consent_given'),
            'consent_date': datetime.utcnow().isoformat(),
            'ip_address': consent_data.get('ip_address'),
            'user_agent': consent_data.get('user_agent')
        })


class SOC2Compliance:
    def __init__(self, security_logger: SecurityEventLogger):
        self.security_logger = security_logger
        self.controls = {
            'CC6.1': self._validate_logical_access_controls,
            'CC6.2': self._validate_system_boundaries,
            'CC6.3': self._validate_data_protection,
            'CC7.1': self._detect_security_events,
            'CC7.2': self._monitor_security_events,
            'CC7.3': self._investigate_security_events,
            'CC8.1': self._manage_system_changes
        }

    async def _validate_logical_access_controls(self) -> Dict[str, Any]:
        """CC6.1 - Logical and Physical Access Controls"""
        return {
            'status': 'pass',
            'controls_tested': [
                'Multi-factor authentication implemented',
                'Role-based access controls enforced',
                'Session management controls active',
                'Password policies enforced',
                'Account lockout mechanisms active'
            ],
            'last_tested': datetime.utcnow().isoformat()
        }

    async def _validate_system_boundaries(self) -> Dict[str, Any]:
        """CC6.2 - System Boundaries and Components"""
        return {
            'status': 'pass',
            'controls_tested': [
                'Network segmentation implemented',
                'Firewall rules configured',
                'VPN access controls enforced',
                'Data transmission encryption verified'
            ],
            'last_tested': datetime.utcnow().isoformat()
        }

    async def _validate_data_protection(self) -> Dict[str, Any]:
        """CC6.3 - Data Protection"""
        return {
            'status': 'pass',
            'controls_tested': [
                'Encryption at rest verified',
                'Encryption in transit verified',
                'Key management controls active',
                'Data classification implemented'
            ],
            'last_tested': datetime.utcnow().isoformat()
        }

    # The CC7.x and CC8.1 checks are not shown in this guide. These
    # placeholders keep the class instantiable; generate_soc2_report()
    # records each of them with status 'error' until they are implemented.
    async def _detect_security_events(self) -> Dict[str, Any]:
        raise NotImplementedError("CC7.1 check not implemented")

    async def _monitor_security_events(self) -> Dict[str, Any]:
        raise NotImplementedError("CC7.2 check not implemented")

    async def _investigate_security_events(self) -> Dict[str, Any]:
        raise NotImplementedError("CC7.3 check not implemented")

    async def _manage_system_changes(self) -> Dict[str, Any]:
        raise NotImplementedError("CC8.1 check not implemented")

    async def generate_soc2_report(self) -> Dict[str, Any]:
        """Generate SOC 2 compliance report"""
        control_results = {}
        overall_status = 'pass'
        for control_id, control_test in self.controls.items():
            try:
                result = await control_test()
                control_results[control_id] = result
                if result['status'] != 'pass':
                    overall_status = 'fail'
            except Exception as e:
                control_results[control_id] = {
                    'status': 'error',
                    'error': str(e)
                }
                overall_status = 'error'
        return {
            'report_date': datetime.utcnow().isoformat(),
            'overall_status': overall_status,
            'control_results': control_results,
            'evidence_retention_period': '7 years',
            'next_assessment_date': (datetime.utcnow() + timedelta(days=365)).isoformat()
        }


from typing import Any, Dict, List, Optional
class DataMinimization:
    """Implement privacy by design principles"""

    @staticmethod
    def get_minimal_user_profile(user: Dict[str, Any],
                                 required_fields: List[str]) -> Dict[str, Any]:
        """Return only required user data"""
        allowed_fields = {'id', 'email', 'full_name', 'created_at'}
        # Validate required fields are allowed
        for field in required_fields:
            if field not in allowed_fields:
                raise ValueError(f"Field '{field}' not allowed for minimal profile")
        # Copy only the requested fields from the full user record
        return {field: user.get(field) for field in required_fields}

    @staticmethod
    def anonymize_logs(log_data: Dict[str, Any]) -> Dict[str, Any]:
        """Anonymize sensitive data in logs"""
        anonymized = log_data.copy()
        # Hash IP addresses
        if 'ip_address' in anonymized:
            anonymized['ip_address'] = DataMinimization._hash_ip(anonymized['ip_address'])
        # Hash email addresses
        if 'email' in anonymized:
            anonymized['email'] = DataMinimization._hash_email(anonymized['email'])
        # Remove sensitive details
        sensitive_fields = ['password', 'private_key', 'session_token']
        for field in sensitive_fields:
            anonymized.pop(field, None)
        return anonymized

    @staticmethod
    def _hash_ip(ip_address: str) -> str:
        """Hash IP address for privacy"""
        import hashlib
        # Note: the IPv4 space is small enough to brute-force an unsalted
        # hash; prefer a keyed hash (HMAC) where re-identification matters.
        return hashlib.sha256(ip_address.encode()).hexdigest()[:16]

    @staticmethod
    def _hash_email(email: str) -> str:
        """Hash email address for privacy"""
        import hashlib
        return hashlib.sha256(email.encode()).hexdigest()[:16]


import asyncio
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set
from dataclasses import dataclass
@dataclass
class SecurityAlert:
    alert_type: str
    severity: str
    description: str
    source_ip: str
    user_id: Optional[str]
    timestamp: datetime
    details: Dict[str, Any]


class ThreatDetectionEngine:
    def __init__(self, alert_threshold: int = 5, security_logger=None):
        self.alert_threshold = alert_threshold
        self.failed_attempts: Dict[str, List[datetime]] = {}
        self.active_sessions: Dict[str, Dict] = {}
        self.known_threat_ips: Set[str] = set()
        self.alert_handlers = []
        # Optional SecurityEventLogger that receives every triggered alert
        self.security_logger = security_logger

    async def monitor_authentication_attempts(self, ip_address: str,
                                              user_id: Optional[str],
                                              success: bool,
                                              user_agent: Optional[str] = None):
        """Monitor authentication attempts for threats"""
        if not success:
            await self._record_failed_attempt(ip_address, user_id)
            # Check for brute force attacks
            if self._is_brute_force_attack(ip_address):
                alert = SecurityAlert(
                    alert_type='BRUTE_FORCE_ATTACK',
                    severity='HIGH',
                    description=f'Brute force attack detected from {ip_address}',
                    source_ip=ip_address,
                    user_id=user_id,
                    timestamp=datetime.utcnow(),
                    details={'user_agent': user_agent}
                )
                await self._trigger_alert(alert)
            # Check for credential stuffing
            if self._is_credential_stuffing(ip_address, user_agent):
                alert = SecurityAlert(
                    alert_type='CREDENTIAL_STUFFING',
                    severity='CRITICAL',
                    description=f'Credential stuffing attack from {ip_address}',
                    source_ip=ip_address,
                    user_id=user_id,
                    timestamp=datetime.utcnow(),
                    details={'user_agent': user_agent}
                )
                await self._trigger_alert(alert)

    def _is_brute_force_attack(self, ip_address: str) -> bool:
        """Detect brute force attacks"""
        if ip_address not in self.failed_attempts:
            return False
        now = datetime.utcnow()
        recent_failures = [
            attempt for attempt in self.failed_attempts[ip_address]
            if now - attempt < timedelta(minutes=15)
        ]
        return len(recent_failures) >= self.alert_threshold

    def _is_credential_stuffing(self, ip_address: str,
                                user_agent: Optional[str]) -> bool:
        """Detect credential stuffing attacks"""
        if ip_address not in self.failed_attempts:
            return False
        # Simple heuristic: 20 or more failures from the same IP in 15 minutes.
        # The user agent is accepted for future use but not yet considered.
        now = datetime.utcnow()
        recent_failures = [
            attempt for attempt in self.failed_attempts[ip_address]
            if now - attempt < timedelta(minutes=15)
        ]
        return len(recent_failures) >= 20

    async def _record_failed_attempt(self, ip_address: str, user_id: Optional[str]):
        """Record a failed authentication attempt"""
        now = datetime.utcnow()
        if ip_address not in self.failed_attempts:
            self.failed_attempts[ip_address] = []
        self.failed_attempts[ip_address].append(now)
        # Clean old entries
        cutoff = now - timedelta(hours=1)
        self.failed_attempts[ip_address] = [
            attempt for attempt in self.failed_attempts[ip_address]
            if attempt > cutoff
        ]

    async def _trigger_alert(self, alert: SecurityAlert):
        """Trigger security alert"""
        for handler in self.alert_handlers:
            try:
                await handler(alert)
            except Exception:
                # A failing handler must not stop the remaining handlers
                pass
        # Also log to security monitoring
        if self.security_logger is not None:
            self.security_logger.log_security_incident(
                alert.alert_type,
                alert.severity.lower(),  # the logger matches lowercase severities
                alert.description,
                [alert.source_ip],
                alert.details
            )


# Secure coding examples
# 1. Input validation
import re

def validate_input(data: dict) -> dict:
    """Validate and sanitize input data"""
    # Use strict type checking. Raise exceptions rather than using assert,
    # because assert statements are skipped when Python runs with -O.
    name = data.get('name')
    if not isinstance(name, str):
        raise ValueError("Name must be a string")
    if len(name) > 100:
        raise ValueError("Name too long")
    # Sanitize string inputs
    name = name.strip()
    name = re.sub(r'[<>"\']', '', name)  # Strip characters common in XSS payloads
    return {'name': name}
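An alternative to stripping dangerous characters is to accept only an explicit allow-list and reject everything else. The sketch below assumes names are limited to ASCII letters, digits, spaces, and a few punctuation marks; `NAME_PATTERN` and `validate_name` are illustrative names, and the permitted character set is an assumption to adjust per field.

```python
import re

# Assumed policy: 1-100 characters drawn from a small, explicit set.
NAME_PATTERN = re.compile(r"[A-Za-z0-9 _.\-]{1,100}")


def validate_name(data: dict) -> str:
    """Return the trimmed name, or raise ValueError if it is not acceptable."""
    name = data.get("name")
    if not isinstance(name, str):
        raise ValueError("name must be a string")
    name = name.strip()
    # fullmatch requires the whole string to match, not just a prefix.
    if not NAME_PATTERN.fullmatch(name):
        raise ValueError("name has an invalid length or unsupported characters")
    return name
```

Rejecting input outright avoids silently changing what the user typed, at the cost of a stricter policy; output encoding is still needed wherever the value is rendered.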
# 2. Secure password handling
import bcrypt

def hash_password(password: str) -> str:
    """Hash password securely"""
    salt = bcrypt.gensalt(rounds=12)
    hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
    return hashed.decode('utf-8')

def verify_password(password: str, hashed: str) -> bool:
    """Verify password securely"""
    return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
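Where the third-party `bcrypt` package is not available, the same hash-then-verify pattern can be sketched with the standard library's PBKDF2. This is a swapped-in technique, not what the examples in this guide use; the function names, the `$`-separated storage format, and the iteration count are all assumptions.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 600_000  # assumed work factor; tune for your hardware


def hash_password_pbkdf2(password: str, iterations: int = ITERATIONS) -> str:
    """Derive a salted PBKDF2-HMAC-SHA256 hash and encode it as one string."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password_pbkdf2(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        if scheme != "pbkdf2_sha256":
            return False
        expected = bytes.fromhex(digest_hex)
        candidate = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
        )
    except ValueError:
        # Malformed records are treated as a failed verification.
        return False
    return hmac.compare_digest(candidate, expected)
```

Storing the iteration count alongside the hash lets the work factor be raised later without invalidating existing records.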
# 3. Secure random number generation
import secrets
import string

def generate_secure_token(length: int = 32) -> str:
    """Generate cryptographically secure token"""
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
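Generating a token is only half the job: the server also has to check it later without keeping the plaintext. A minimal sketch, assuming the token is shown to the caller once and only its SHA-256 digest is stored; `issue_token` and `verify_token` are hypothetical names.

```python
import hashlib
import hmac
import secrets
from typing import Tuple


def issue_token(num_bytes: int = 32) -> Tuple[str, str]:
    """Return (token, digest); give the token to the client, store the digest."""
    token = secrets.token_urlsafe(num_bytes)
    digest = hashlib.sha256(token.encode("ascii")).hexdigest()
    return token, digest


def verify_token(presented: str, stored_digest: str) -> bool:
    """Hash the presented token and compare digests in constant time."""
    presented_digest = hashlib.sha256(presented.encode("utf-8")).hexdigest()
    return hmac.compare_digest(presented_digest, stored_digest)
```

A fast hash is acceptable here because the token is high-entropy random data, unlike a user-chosen password.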
# 4. Secure API key generation
import hashlib
from datetime import datetime
from typing import Any, Dict, List

def generate_api_key(user_id: str, permissions: List[str]) -> Dict[str, Any]:
    """Generate secure API key with metadata"""
    key_id = secrets.token_urlsafe(16)
    key_secret = secrets.token_urlsafe(32)
    # Never store the full key, only hash
    key_hash = hashlib.sha256(key_secret.encode()).hexdigest()
    return {
        'key_id': key_id,
        'key_secret': key_secret,  # show to the user once; persist only key_hash
        'key_display': f"mpv_{key_id[:8]}_{key_secret[:8]}",
        'key_hash': key_hash,
        'user_id': user_id,
        'permissions': permissions,
        'created_at': datetime.utcnow().isoformat()
    }


# Security Code Review Checklist
## Authentication & Authorization
- [ ] Passwords are hashed with appropriate algorithm (bcrypt, Argon2)
- [ ] JWT tokens have appropriate expiration times
- [ ] Multi-factor authentication implemented where required
- [ ] Role-based access controls properly enforced
- [ ] Session management is secure
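For the multi-factor item, a time-based one-time password (TOTP, RFC 6238) is one common second factor. The sketch below uses only the standard library and the default HMAC-SHA1 variant; the function names, the one-step drift window, and the idea that this is how the platform implements MFA are all assumptions.

```python
import hashlib
import hmac
import struct


def totp(secret: bytes, unix_time: int, step: int = 30, digits: int = 6) -> str:
    """Compute the RFC 6238 code for the time step containing unix_time."""
    counter = unix_time // step
    mac = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226): the low nibble of the last byte picks
    # a 4-byte window, and the top bit is masked off.
    offset = mac[-1] & 0x0F
    code = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def verify_totp(secret: bytes, submitted: str, unix_time: int,
                step: int = 30, window: int = 1) -> bool:
    """Accept codes from the current step and `window` steps either side."""
    for drift in range(-window, window + 1):
        t = unix_time + drift * step
        if t < 0:
            continue
        expected = totp(secret, t, step, len(submitted) or 6)
        if hmac.compare_digest(expected.encode(), submitted.encode()):
            return True
    return False
```

A production check would also reject a code that has already been used within its validity window, which this sketch does not track.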
## Input Validation
- [ ] All user inputs are validated and sanitized
- [ ] SQL injection prevention implemented (parameterized queries)
- [ ] XSS prevention implemented (output encoding)
- [ ] CSRF protection implemented
- [ ] File upload validation and restrictions
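The parameterized-query item can be illustrated with the standard library's `sqlite3` module. The table name follows the `vpn_clients` table used elsewhere in this guide, but the columns, the in-memory database, and the helper names are assumptions for the demo.

```python
import sqlite3
from typing import List, Tuple


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with a small, hypothetical table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE vpn_clients (id INTEGER PRIMARY KEY, name TEXT)")
    conn.executemany("INSERT INTO vpn_clients (name) VALUES (?)",
                     [("laptop",), ("phone",)])
    return conn


def find_clients(conn: sqlite3.Connection, name: str) -> List[Tuple[int, str]]:
    """Look up clients by exact name using a bound parameter."""
    # The ? placeholder sends `name` as data, so quotes inside it cannot
    # change the structure of the SQL statement.
    cursor = conn.execute(
        "SELECT id, name FROM vpn_clients WHERE name = ?", (name,)
    )
    return cursor.fetchall()
```

Passing a classic injection string such as `' OR '1'='1` to `find_clients` simply searches for a client with that literal name and finds none.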
## Data Protection
- [ ] Sensitive data is encrypted at rest
- [ ] Data is encrypted in transit (HTTPS/TLS)
- [ ] Encryption keys are properly managed
- [ ] PII is handled according to privacy requirements
- [ ] Data retention policies are implemented
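For the PII item, one option is to pseudonymize identifiers such as IP addresses with a keyed hash before they reach logs. Unlike a plain hash, an HMAC cannot be reversed by enumerating candidate inputs unless the key is also known. The sketch below is illustrative: `pseudonymize` is a hypothetical helper, and how the key is stored and rotated is left open.

```python
import hashlib
import hmac


def pseudonymize(value: str, key: bytes, length: int = 16) -> str:
    """Return a stable, truncated HMAC-SHA256 pseudonym for `value`."""
    mac = hmac.new(key, value.encode("utf-8"), hashlib.sha256)
    # Truncation keeps log lines short; the same key and value always
    # give the same pseudonym, so events can still be correlated.
    return mac.hexdigest()[:length]
```

Rotating the key breaks linkability between old and new log entries, which can be useful when a retention period expires.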
## Error Handling
- [ ] Error messages don't reveal sensitive information
- [ ] Exceptions are properly caught and logged
- [ ] System failures don't expose vulnerabilities
- [ ] Debug information is not exposed in production
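One way to satisfy the first two items together is to log the full exception server-side under a random reference and return only that reference to the client. The logger name, response shape, and `to_safe_error` helper below are assumptions, not the platform's actual error format.

```python
import logging
import uuid
from typing import Dict

logger = logging.getLogger("app.errors")


def to_safe_error(exc: Exception) -> Dict[str, str]:
    """Log the exception with a reference ID and return a generic response."""
    error_id = uuid.uuid4().hex
    # The traceback and message go to the server log only.
    logger.error("Unhandled error %s", error_id, exc_info=exc)
    # The client sees no exception text, just an ID support can look up.
    return {"error": "Internal server error", "error_id": error_id}
```

The reference ID lets support staff find the matching log entry without the response ever carrying stack traces, SQL fragments, or secrets.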
## Network Security
- [ ] HTTPS is enforced for all communications
- [ ] Secure headers are implemented
- [ ] CORS is properly configured
- [ ] API rate limiting is implemented
- [ ] Network security controls are in place
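The rate-limiting item can be sketched as an in-memory sliding-window limiter. This is a single-process illustration under assumed limits; a deployment with several application servers would need shared state, which is not shown. The class name and the injectable `clock` parameter are hypothetical.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowRateLimiter:
    """Allow at most `max_requests` per client in any `window_seconds` span."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        hits = self._hits[client_id]
        # Drop timestamps that have aged out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True
```

Rejected requests are not recorded, so a client that keeps hammering the endpoint is not penalized beyond the window; stricter policies can append on rejection as well.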
## Configuration
- [ ] Default credentials are changed
- [ ] Debug mode is disabled in production
- [ ] Security-related configurations are properly set
- [ ] Environment variables are used for sensitive data
- [ ] Logging is configured appropriately

# AWS Security Configuration
AWSSecurityConfig:
  # VPC Configuration
  vpc:
    enable_vpc_flow_logs: true
    enable_dns_hostnames: true
    enable_dns_support: true
  # Security Groups
  security_groups:
    vpn_servers:
      inbound:
        - protocol: udp
          port: 51820
          source: 0.0.0.0/0
        - protocol: tcp
          port: 22
          source: admin_ips
      outbound:
        - protocol: all
          destination: 0.0.0.0/0
    application_servers:
      inbound:
        - protocol: tcp
          port: 8000
          source: load_balancer_sg
      outbound:
        - protocol: tcp
          port: 5432
          destination: database_sg
    database:
      inbound:
        - protocol: tcp
          port: 5432
          source: application_servers_sg
      outbound: []
  # IAM Policies
  iam_policies:
    vpn_server_policy:
      version: "2012-10-17"
      statement:
        - effect: allow
          action:
            - ec2:describeinstances
            - ec2:describenetworkinterfaces
            - s3:getobject
            - s3:putobject
          # Scope this to specific ARNs in production to honor least privilege
          resource: "*"
  # Encryption
  encryption:
    ebs_encryption: true
    rds_encryption: true
    s3_encryption: "AES256"
    cloudtrail_encryption: true

# Security monitoring configuration
SecurityMonitoring:
  # CloudTrail Configuration
  cloudtrail:
    enabled: true
    multi_region: true
    include_global_events: true
    log_file_validation: true
    sns_notifications: true
  # GuardDuty Configuration
  guardduty:
    enabled: true
    publishing_destination: arn:aws:s3:::security-findings-bucket
    IPS_set: arn:aws:guardduty::aws:detector/security Set
  # Config Configuration
  config:
    enabled: true
    delivery_channel: sns_topic
    configuration_recorder:
      all_supported: true
      include_global_resource_types: true
  # Security Hub Configuration
  security_hub:
    enabled: true
    standards:
      - "arn:aws:securityhub:::ruleset/cis-aws-foundations-benchmark/v/1.2.0"
      - "arn:aws:securityhub:::ruleset/pci-dss/v/3.2.1"
  # Macie Configuration
  macie:
    enabled: true
    session_duration: 43200

- Enhanced Authentication
  - WebAuthn/FIDO2 support
  - Biometric authentication
  - Certificate-based authentication
- Improved Monitoring
  - AI-powered anomaly detection
  - Behavioral analysis
  - Threat intelligence integration
- Compliance Automation
  - Automated compliance checking
  - Real-time compliance monitoring
  - Compliance reporting automation
- Advanced Security Features
  - Zero-trust network access
  - Software-defined perimeters
  - Microsegmentation
- Enhanced Encryption
  - Post-quantum cryptography
  - Homomorphic encryption for sensitive data
  - Advanced key management
- Security Automation
  - Automated incident response
  - Self-healing security controls
  - AI-powered threat hunting
- Next-Generation Security
  - Quantum-safe encryption
  - Blockchain-based audit trails
  - Autonomous security systems
- Advanced Analytics
  - Predictive security analytics
  - Real-time risk assessment
  - Adaptive security controls

Security is a continuous journey, not a destination. MyPrivateVPN Network implements a comprehensive, multi-layered security architecture that evolves with emerging threats and compliance requirements.
| Level | Description | Current Status |
|---|---|---|
| Level 1 | Basic Security | ✅ Complete |
| Level 2 | Managed Security | ✅ Complete |
| Level 3 | Defined Security | ✅ Complete |
| Level 4 | Quantitatively Managed Security | 🔄 In Progress |
| Level 5 | Optimizing Security | 📋 Planned |
- Vulnerability Management: < 24 hours critical, < 72 hours high
- Incident Response: < 15 minutes acknowledgment, < 2 hours resolution
- Compliance: 100% SOC 2, GDPR, HIPAA compliance
- Security Training: Annual security awareness for all personnel
- Penetration Testing: Quarterly external, monthly internal
- Transparency: Regular security and transparency reports
- Innovation: Continuous investment in security technology
- Education: Ongoing security education for users and staff
- Partnership: Collaboration with security researchers and community
- Compliance: Proactive compliance with emerging regulations
For questions about our security practices or to report security issues, contact security@myprivatevpn.com.
Security Guide Version: 1.0
Last Updated: January 2024
Maintained by: MyPrivateVPN Security Team