
KPS - Ksmux Pub Sub: high-performance Python client for the ksmux-based pub/sub system


🐍 KPS - Ksmux Pub Sub High-Performance Client

A fast Python client for the KPS (Ksmux Pub Sub) pub/sub system, built on asyncio, uvloop, and orjson.

🚀 Quick Installation with uv

Prerequisites

  • Python 3.11+
  • uv installed

# Install uv if it is not already installed
curl -LsSf https://astral.sh/uv/install.sh | sh

# Fully automated setup
python setup.py setup

Manual Installation

# Create the virtual environment
uv venv

# Install the dependencies
uv pip install websockets orjson uvloop

# Development dependencies (optional)
uv pip install pytest pytest-asyncio black ruff mypy

⚡ Performance

Built-in Optimizations

  • Native asyncio for maximum concurrency
  • uvloop for 2-3x better performance (Linux/macOS)
  • orjson for very fast JSON (5-10x faster)
  • Optimized WebSocket with compression disabled
  • ThreadPoolExecutor for user callbacks
  • Asynchronous queue with a 1024-item buffer
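
The last two items can be pictured roughly as follows. This is a minimal sketch and not the library's internals: `dispatch_loop`, `demo`, `QUEUE_SIZE` and the `_STOP` sentinel are hypothetical names; only the buffer size of 1024 and the use of a thread pool for user callbacks come from the list above.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

QUEUE_SIZE = 1024  # buffer size taken from the list above
_STOP = object()   # hypothetical sentinel used to end the loop


async def dispatch_loop(queue, callback, executor):
    """Pull messages off the queue and run the user callback in a worker thread."""
    loop = asyncio.get_running_loop()
    while True:
        message = await queue.get()
        if message is _STOP:
            break
        # The (possibly blocking) callback runs off the event-loop thread.
        await loop.run_in_executor(executor, callback, message)


async def demo(messages):
    """Feed messages through a bounded queue and collect what the callback saw."""
    received = []
    queue = asyncio.Queue(maxsize=QUEUE_SIZE)
    with ThreadPoolExecutor(max_workers=4) as executor:
        worker = asyncio.create_task(
            dispatch_loop(queue, received.append, executor)
        )
        for message in messages:
            await queue.put(message)  # waits here if the buffer is full
        await queue.put(_STOP)
        await worker
    return received


if __name__ == "__main__":
    print(asyncio.run(demo(["a", "b", "c"])))
```

The bounded queue applies backpressure: when consumers fall behind, `put` waits instead of letting memory grow without limit.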

Benchmarks

📊 Publishing: ~10,000 msg/s
📊 ACK system: ~5,000 ACK/s
📊 Latency: <1 ms local, <10 ms over the network
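
To reproduce a throughput figure on your own hardware, something like the following could be used. It is a sketch under stated assumptions: `fake_publish` is a hypothetical stand-in so the snippet runs without a server, and `measure_throughput` is not part of the client. Pass `client.Publish` instead to measure a real connection.

```python
import asyncio
import time


async def fake_publish(topic, data):
    """Hypothetical stand-in for client.Publish; it only yields to the event loop."""
    await asyncio.sleep(0)


async def measure_throughput(publish, count=1000):
    """Return messages per second for `count` sequential publishes."""
    start = time.perf_counter()
    for i in range(count):
        await publish("bench", i)
    elapsed = time.perf_counter() - start
    return count / elapsed if elapsed > 0 else float("inf")


if __name__ == "__main__":
    rate = asyncio.run(measure_throughput(fake_publish))
    print(f"{rate:,.0f} msg/s")
```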

🎯 Identical API

The Python client has exactly the same API as the Go and JavaScript clients:

import asyncio
from client import Client

async def main():
    # Create the client
    client = await Client.NewClient(
        Id="my-python-app",
        Address="localhost:9313",
        Autorestart=True,
        OnId=lambda data, unsub: print(f"Direct: {data}"),
        OnDataWs=lambda data, conn: print(f"Message: {data}")
    )

    # Subscribe to a topic
    unsub = await client.Subscribe("events",
        lambda data, unsub: print(f"Received: {data}"))

    # Publish messages
    await client.Publish("events", "Hello World!")
    await client.PublishToID("target-client", {"type": "direct"})
    await client.PublishToServer("remote:9313", {"relay": True})

    # Full ACK system
    ack = await client.PublishWithAck("events", "Important!", 5.0)
    responses = await ack.Wait()  # Wait for all ACKs

    ack2 = await client.PublishToIDWithAck("client-id", "Direct ACK", 3.0)
    response, ok = await ack2.WaitAny()  # First ACK

    # Advanced ACK handling
    status = await ack.GetStatus()  # Real-time status
    complete = await ack.IsComplete()  # All received?
    await ack.Cancel()  # Cancel the wait

    await client.Close()

# Run (uvloop is enabled automatically)
asyncio.run(main())
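
The final comment in the example says uvloop is picked up automatically. For application code that wants the same "use the fast library if present, otherwise fall back" behaviour, a sketch could look like this. The helper names `encode`, `decode` and `run` are hypothetical and are not part of the client; `uvloop.run` requires uvloop 0.18 or later.

```python
import asyncio
import json

try:
    import uvloop  # optional; not available on Windows
except ImportError:
    uvloop = None

try:
    import orjson  # optional fast JSON library
except ImportError:
    orjson = None


def encode(obj) -> bytes:
    """Serialize to UTF-8 JSON bytes, using orjson when it is installed."""
    if orjson is not None:
        return orjson.dumps(obj)
    return json.dumps(obj, separators=(",", ":")).encode("utf-8")


def decode(raw: bytes):
    """Parse JSON bytes with whichever library is available."""
    if orjson is not None:
        return orjson.loads(raw)
    return json.loads(raw)


def run(coro):
    """Run a coroutine on uvloop if installed, else on the default event loop."""
    if uvloop is not None:
        return uvloop.run(coro)
    return asyncio.run(coro)
```

With this fallback, the same script runs on Windows, where uvloop cannot be installed, without code changes.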

🧪 Tests and Development

Running the Tests

# Full test against the Go server
python test_python_client.py

# Or with uv
uv run python test_python_client.py

Development Tools

# Format the code
uv run black .

# Linter
uv run ruff check .

# Type checking
uv run mypy .

# Unit tests
uv run pytest

Automated Setup Script

# Full setup
python setup.py all

# Or step by step
python setup.py setup    # Installation
python setup.py format   # Formatting
python setup.py test     # Tests

📋 Available Methods

Client

# Connection
client = await Client.NewClient(**options)
await client.Close()

# Basic pub/sub
await client.Subscribe(topic, callback)
await client.Unsubscribe(topic)
await client.Publish(topic, data)

# Direct messages
await client.PublishToID(target_id, data)
await client.PublishToServer(addr, data, secure=False)

# ACK system
ack = await client.PublishWithAck(topic, data, timeout)
ack = await client.PublishToIDWithAck(target_id, data, timeout)

ClientAck

# Wait for responses
responses = await ack.Wait()           # All ACKs
response, ok = await ack.WaitAny()     # First ACK

# Advanced handling
status = await ack.GetStatus()         # Real-time status
complete = await ack.IsComplete()      # All received?
await ack.Cancel()                     # Cancel
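
The difference between waiting for all ACKs and waiting for the first one can be modelled with plain asyncio futures. The class below is a hypothetical illustration, not the library's `ClientAck`: the name `AckWaiter`, its constructor, and the return shapes are assumptions made for the sketch.

```python
import asyncio


class AckWaiter:
    """Hypothetical model of an ACK handle; not the library's ClientAck."""

    def __init__(self, expected_ids, timeout):
        loop = asyncio.get_running_loop()
        self.timeout = timeout
        self._futures = {cid: loop.create_future() for cid in expected_ids}

    def resolve(self, client_id, success=True):
        """Record the ACK received from one client."""
        future = self._futures.get(client_id)
        if future is not None and not future.done():
            future.set_result(success)

    async def wait(self):
        """Wait for every ACK until the timeout; return those that arrived."""
        if self._futures:
            await asyncio.wait(self._futures.values(), timeout=self.timeout)
        return {cid: f.result() for cid, f in self._futures.items() if f.done()}

    async def wait_any(self):
        """Return (client_id, True) for the first ACK, or (None, False) on timeout."""
        if not self._futures:
            return None, False
        done, _ = await asyncio.wait(
            self._futures.values(),
            timeout=self.timeout,
            return_when=asyncio.FIRST_COMPLETED,
        )
        for cid, future in self._futures.items():
            if future in done:
                return cid, True
        return None, False


async def demo():
    ack = AckWaiter(["a", "b"], timeout=0.05)
    ack.resolve("a")                # only client "a" acknowledges
    first = await ack.wait_any()    # returns as soon as one ACK is in
    everything = await ack.wait()   # "b" never answers, so this times out
    return first, everything
```

In this model a timeout is not an error: `wait` simply returns the subset of ACKs that arrived in time, which is one plausible reading of the `timeout` argument above.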

🔧 Advanced Configuration

Connection Options

client = await Client.NewClient(
    Id="unique-client-id",           # Client ID
    Address="localhost:9313",        # Server
    Secure=False,                    # Use WSS if True
    Path="/ws/bus",                  # WebSocket path
    Autorestart=True,                # Automatic reconnection
    RestartEvery=10.0,               # Interval (seconds)
    OnDataWs=callback_all_messages,  # All messages
    OnId=callback_direct_messages,   # Direct messages
    OnClose=callback_on_close        # Connection closed
)
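
To make the roles of `Address`, `Secure`, `Path` and `RestartEvery` concrete, here is a sketch of how such options might combine. Both helpers are hypothetical and are not taken from the client: the URL layout is an assumption, and the retry loop only covers the initial connection, whereas the real `Autorestart` may also handle dropped connections.

```python
import asyncio


def build_ws_url(address, path="/ws/bus", secure=False):
    """Combine Address, Path and Secure into a WebSocket URL (hypothetical helper)."""
    scheme = "wss" if secure else "ws"
    return f"{scheme}://{address}/{path.lstrip('/')}"


async def run_with_restart(connect, restart_every=10.0, max_attempts=None):
    """Call `connect()` until it succeeds, sleeping `restart_every` seconds between tries."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return await connect()
        except OSError:
            # Connection errors such as ConnectionRefusedError subclass OSError.
            if max_attempts is not None and attempt >= max_attempts:
                raise
            await asyncio.sleep(restart_every)


if __name__ == "__main__":
    print(build_ws_url("localhost:9313"))
    print(build_ws_url("example.org:443", secure=True))
```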

Callbacks

def handle_message(data, unsub_fn):
    """Callback for topic messages"""
    print(f"Received: {data}")
    # call unsub_fn() to unsubscribe

def handle_direct(data, subscriber):
    """Callback for direct messages"""
    print(f"Direct: {data}")

def handle_all(data, conn):
    """Callback for all WebSocket messages"""
    print(f"WS: {data}")
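
The `(data, unsub_fn)` contract for topic callbacks can be illustrated with a small in-memory registry. This is a hypothetical sketch of the idea and not the client's dispatch code: `TopicRegistry`, `deliver` and `demo` are names invented for the example.

```python
class TopicRegistry:
    """Hypothetical in-memory registry; illustrates the callback contract only."""

    def __init__(self):
        self._callbacks = {}

    def subscribe(self, topic, callback):
        """Register a callback and return a function that removes it."""
        self._callbacks[topic] = callback

        def unsubscribe():
            # Safe to call more than once.
            self._callbacks.pop(topic, None)

        return unsubscribe

    def deliver(self, topic, data):
        """Invoke the topic callback with (data, unsub_fn); return True if delivered."""
        callback = self._callbacks.get(topic)
        if callback is None:
            return False
        callback(data, lambda: self._callbacks.pop(topic, None))
        return True


def demo():
    received = []

    def once(data, unsub_fn):
        received.append(data)
        unsub_fn()  # stop listening after the first message

    registry = TopicRegistry()
    registry.subscribe("events", once)
    first = registry.deliver("events", "first")    # delivered
    second = registry.deliver("events", "second")  # dropped: already unsubscribed
    return received, first, second
```

Passing the unsubscribe function into the callback lets a handler stop itself without holding a reference to the client.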

🐛 Debugging

Detailed Logs

import logging
logging.basicConfig(level=logging.DEBUG)

# The client will log full details
client = await Client.NewClient(...)

Performance Monitoring

import time

# perf_counter() is monotonic and has higher resolution than time.time()
start = time.perf_counter()
await client.Publish("test", "data")
print(f"Publish: {(time.perf_counter() - start)*1000:.2f}ms")
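
A single timing is noisy, so it can help to collect several samples and summarize them. The helpers below are a sketch, not part of the client: `timed` and `summarize` are hypothetical names, and `asyncio.sleep` stands in for `client.Publish` so the snippet runs without a server.

```python
import asyncio
import statistics
import time


async def timed(coro_fn, *args, samples=100):
    """Await coro_fn(*args) `samples` times; return latencies in milliseconds."""
    latencies = []
    for _ in range(samples):
        start = time.perf_counter()
        await coro_fn(*args)
        latencies.append((time.perf_counter() - start) * 1000)
    return latencies


def summarize(latencies):
    """Reduce a list of latencies to mean, median and worst case."""
    return {
        "mean_ms": statistics.fmean(latencies),
        "median_ms": statistics.median(latencies),
        "max_ms": max(latencies),
    }


if __name__ == "__main__":
    # Replace asyncio.sleep with client.Publish to time real publishes.
    results = asyncio.run(timed(asyncio.sleep, 0, samples=20))
    print(summarize(results))
```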

🔗 Integration

With FastAPI

from fastapi import FastAPI
from client import Client

app = FastAPI()
client = None

@app.on_event("startup")
async def startup():
    global client
    client = await Client.NewClient(Address="localhost:9313")

@app.post("/publish/{topic}")
async def publish(topic: str, data: dict):
    await client.Publish(topic, data)
    return {"status": "published"}

With Django Channels

# send_json() is provided by AsyncJsonWebsocketConsumer
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from client import Client

class BusConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        self.client = await Client.NewClient(
            Address="localhost:9313",
            OnId=self.handle_bus_message
        )
        await self.accept()
    
    async def handle_bus_message(self, data, unsub):
        await self.send_json(data)

📦 Distribution

Build with uv

# Build the package
uv build

# Publish to PyPI
uv publish

Docker

FROM python:3.12-slim

# Install uv
RUN pip install uv

# Copy the files
COPY . /app
WORKDIR /app

# Install the dependencies
RUN uv pip install --system .

# Run the application
CMD ["python", "test_python_client.py"]

🤝 Compatibility

  • Python 3.11+
  • Linux (best performance with uvloop)
  • macOS (best performance with uvloop)
  • Windows (standard asyncio event loop)
  • Docker/Kubernetes
  • FastAPI/Django/Flask

📈 Roadmap

  • MQTT bridge support
  • Prometheus metrics
  • Automatic clustering
  • Adaptive compression
  • Built-in rate limiting

The fastest Python client for KPS (Ksmux Pub Sub)! 🚀🐍

Release files (version 1.0.0)

  • Source distribution: ksps-1.0.0.tar.gz (4.7 MB)
  • Built distribution: ksps-1.0.0-py3-none-any.whl (12.8 kB, Python 3)
