To improve the given script using asyncio, you can use the aiohttp library for making asynchronous HTTP requests. Here’s the modified version of the script:

#!/usr/bin/env python3
import json
import asyncio
import aiohttp
import time

from typing import List, Dict

TIME_BETWEEN_REQUESTS = 5
TIME_TOTAL = 60


async def get_latency(session, domain):
    try:
        start = time.time()
        if not domain.startswith(("http://", "https://")):
            domain = "https://" + domain
        async with session.get(domain, timeout=3) as response:
            end = time.time()
            return end - start
    except asyncio.TimeoutError:
        return float("inf")
    except aiohttp.client_exceptions.ServerDisconnectedError:
        return float("inf")


def add_latency_to_domain(domain, latency, latencies):
    if domain not in latencies:
        latencies[domain] = []
    latencies[domain].append(latency)
    return latencies


async def measure_latencies_for_domains(session, domains, latencies):
    tasks = []
    for domain in domains:
        tasks.append(get_latency(session, domain))

    results = await asyncio.gather(*tasks)

    for domain, latency in zip(domains, results):
        latencies = add_latency_to_domain(domain, latency, latencies)

    return latencies


async def measure_latencies(domains, duration):
    latencies = {}
    start_time = time.time()
    end_time = start_time + duration

    async with aiohttp.ClientSession() as session:
        while time.time() < end_time:
            latencies = await measure_latencies_for_domains(session, domains, latencies)
            await asyncio.sleep(TIME_BETWEEN_REQUESTS)

    return latencies


def average_latencies(latencies):
    averages = []
    for domain, latency_list in latencies.items():
        avg_latency = sum(latency_list) / len(latency_list)
        averages.append((domain, avg_latency))
    return averages


def sort_latencies(averages):
    return sorted(averages, key=lambda x: x[1])


async def get_latency_report(domains, duration):
    latencies = await measure_latencies(domains, duration)
    averages = average_latencies(latencies)
    return sort_latencies(averages)


def get_instances(data: Dict) -> List[Dict]:
    instances = []
    for instance_details in data["instance_details"]:
        instances.append(instance_details)
    return instances


def get_domains(instances: List[Dict]) -> List[str]:
    return [instance["domain"] for instance in instances]


def load_json_data(filepath: str) -> Dict:
    with open(filepath) as json_data:
        return json.load(json_data)


async def main():
    data = load_json_data('stats.json')
    instances = get_instances(data)
    domains = get_domains(instances)
    report = await get_latency_report(domains, TIME_TOTAL)
    for domain, avg_latency in report:
        print(f"{domain}: {avg_latency:.2f} seconds")


if __name__ == "__main__":
    asyncio.run(main())

This version of the script should be able to ping multiple instances simultaneously using asyncio.

    • zinklog@lemmy.fmhy.ml
      link
      fedilink
      arrow-up
      1
      ·
      1 year ago

      Ah in that case I think it would be better if you mention the prompt asked in the body post, so users have a better idea of the context.