Specify your own location as `latitude longitude`, using the coordinates given by your favorite maps site or application. This is useful when the location detected by SpeedTest does not reflect reality.
557 lines
16 KiB
Python
Executable File
557 lines
16 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# Copyright 2013 Matt Martz
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
__version__ = '0.2.1'
|
|
|
|
try:
|
|
from urllib2 import urlopen, Request
|
|
except ImportError:
|
|
from urllib.request import urlopen, Request
|
|
|
|
import math
|
|
import time
|
|
import os
|
|
import sys
|
|
import threading
|
|
import re
|
|
import signal
|
|
from xml.dom import minidom as DOM
|
|
|
|
try:
|
|
from Queue import Queue
|
|
except ImportError:
|
|
from queue import Queue
|
|
|
|
try:
|
|
from urlparse import urlparse
|
|
except ImportError:
|
|
from urllib.parse import urlparse
|
|
|
|
try:
|
|
from urlparse import parse_qs
|
|
except ImportError:
|
|
try:
|
|
from urllib.parse import parse_qs
|
|
except ImportError:
|
|
from cgi import parse_qs
|
|
|
|
try:
|
|
from hashlib import md5
|
|
except ImportError:
|
|
from md5 import md5
|
|
|
|
try:
|
|
from argparse import ArgumentParser as ArgParser
|
|
except ImportError:
|
|
from optparse import OptionParser as ArgParser
|
|
|
|
# Provide a print function under the name ``print_``.  When the ``builtins``
# module can be imported the built-in ``print`` is used as-is; otherwise
# (presumably Python 2, where ``basestring``/``unicode`` exist) a backport
# with the same ``file``/``sep``/``end`` keywords is defined.
try:
    import builtins
except ImportError:
    def print_(*args, **kwargs):
        """The new-style print function taken from
        https://pypi.python.org/pypi/six/

        Writes ``args`` separated by ``sep`` (default: one space) and
        followed by ``end`` (default: newline) to ``file`` (default:
        sys.stdout).  Does nothing when ``file`` is None.

        Raises TypeError when ``sep`` or ``end`` is neither None nor a
        string, or when any other keyword argument is passed.

        """
        fp = kwargs.pop("file", sys.stdout)
        if fp is None:
            return

        def write(data):
            # Anything that is not already a string is converted with str().
            if not isinstance(data, basestring):
                data = str(data)
            fp.write(data)

        # Becomes True as soon as sep, end or any argument is a unicode
        # object; the default separator/terminator are then unicode too.
        want_unicode = False
        sep = kwargs.pop("sep", None)
        if sep is not None:
            if isinstance(sep, unicode):
                want_unicode = True
            elif not isinstance(sep, str):
                raise TypeError("sep must be None or a string")
        end = kwargs.pop("end", None)
        if end is not None:
            if isinstance(end, unicode):
                want_unicode = True
            elif not isinstance(end, str):
                raise TypeError("end must be None or a string")
        # ``file``, ``sep`` and ``end`` were popped above; anything left over
        # is an unsupported keyword.
        if kwargs:
            raise TypeError("invalid keyword arguments to print()")
        if not want_unicode:
            for arg in args:
                if isinstance(arg, unicode):
                    want_unicode = True
                    break
        if want_unicode:
            newline = unicode("\n")
            space = unicode(" ")
        else:
            newline = "\n"
            space = " "
        if sep is None:
            sep = space
        if end is None:
            end = newline
        for i, arg in enumerate(args):
            # The separator goes between arguments, not before the first one.
            if i:
                write(sep)
            write(arg)
        write(end)
else:
    print_ = getattr(builtins, 'print')
    # The module was only needed to look up ``print``.
    del builtins
|
|
|
|
|
|
def distance(origin, destination):
    """Determine distance between 2 sets of [lat,lon] in km

    The haversine formula is evaluated on a sphere of radius 6371 km.

    origin, destination -- two-item sequences ``[lat, lon]`` in decimal
    degrees.

    Returns the great-circle distance in kilometres as a float.

    """

    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km

    sin_dlat = math.sin(math.radians(lat2 - lat1) / 2)
    sin_dlon = math.sin(math.radians(lon2 - lon1) / 2)
    a = (sin_dlat * sin_dlat + math.cos(math.radians(lat1))
         * math.cos(math.radians(lat2)) * sin_dlon
         * sin_dlon)
    # Floating point rounding can leave ``a`` marginally outside [0, 1] for
    # (nearly) antipodal points; sqrt(1 - a) would then raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return radius * c
|
|
|
|
|
|
class FileGetter(threading.Thread):
    """Thread that downloads one URL and records the size of each chunk.

    ``result`` is None until ``run`` begins.  Afterwards it is a list of
    chunk lengths in bytes, always starting with 0, so ``sum(result)`` is
    the number of bytes received.

    """

    def __init__(self, url, start):
        """url -- address to download
        start -- ``time.time()`` value the 10 second budget is measured from

        """
        threading.Thread.__init__(self)
        self.url = url
        self.result = None
        self.starttime = start

    def run(self):
        """Read the URL in 10240 byte chunks until EOF or shutdown."""
        self.result = [0]
        # No new transfer is begun once 10 seconds have passed since start.
        if (time.time() - self.starttime) > 10:
            return
        try:
            f = urlopen(self.url)
            try:
                while not shutdown_event.is_set():
                    self.result.append(len(f.read(10240)))
                    # A zero length read marks the end of the response.
                    if self.result[-1] == 0:
                        break
            finally:
                # Close the handle even when a read fails part way through.
                f.close()
        except IOError:
            # Best effort: whatever was read before the failure still counts.
            pass
|
|
|
|
|
|
def downloadSpeed(files, quiet=False):
    """Download every URL in ``files`` and return the average rate.

    A producer thread starts one FileGetter per URL and pushes it onto a
    queue bounded to 6 entries; a consumer thread waits for each getter in
    order and collects the number of bytes it read.

    files -- sequence of URLs to download
    quiet -- when true, no progress dot is written to stdout per URL

    Returns bytes received divided by elapsed seconds (0.0 when no time
    elapsed).

    """

    finished = []

    def producer(q, files):
        for url in files:
            thread = FileGetter(url, start)
            thread.start()
            # Blocks while the queue is full.
            q.put(thread, True)
            if not quiet and not shutdown_event.is_set():
                sys.stdout.write('.')
                sys.stdout.flush()

    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            while thread.is_alive():
                thread.join(timeout=0.1)
            finished.append(sum(thread.result))
            del thread

    q = Queue(6)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    # ``producer`` reads this through its closure, so it must be set before
    # the threads are started.
    start = time.time()
    prod_thread.start()
    cons_thread.start()
    # Join in short slices rather than blocking indefinitely; presumably
    # this keeps the main thread able to react to SIGINT.
    while prod_thread.is_alive():
        prod_thread.join(timeout=0.1)
    while cons_thread.is_alive():
        cons_thread.join(timeout=0.1)
    elapsed = time.time() - start
    if elapsed <= 0:
        # A coarse clock can report no elapsed time; avoid dividing by zero.
        return 0.0
    return sum(finished) / elapsed
|
|
|
|
|
|
class FilePutter(threading.Thread):
    """Thread that POSTs a generated payload of ``size`` bytes to a URL.

    ``result`` is None until ``run`` finishes.  Afterwards it is the number
    of bytes sent, or 0 when the upload was skipped or failed.

    """

    def __init__(self, url, start, size):
        """url -- address to POST to
        start -- ``time.time()`` value the 10 second budget is measured from
        size -- total payload length in bytes (int or numeric string)

        The payload is ``content1=`` followed by a repeating 0-9A-Z pattern.
        It is exactly ``size`` bytes long, or just the 9 byte prefix when
        ``size`` is smaller than the prefix.

        """
        threading.Thread.__init__(self)
        self.url = url
        chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        prefix = 'content1='
        body_len = max(0, int(size) - len(prefix))
        # Round the repeat count up so that the slice below never falls
        # short of the requested length.
        repeats = (body_len + len(chars) - 1) // len(chars)
        self.data = (prefix + (chars * repeats)[:body_len]).encode()
        self.result = None
        self.starttime = start

    def run(self):
        """Send the payload unless the time budget is spent or shutdown
        was requested.
        """
        try:
            if ((time.time() - self.starttime) <= 10 and
                    not shutdown_event.is_set()):
                f = urlopen(self.url, self.data)
                try:
                    f.read(11)
                finally:
                    # Close the handle even when reading the reply fails.
                    f.close()
                self.result = len(self.data)
            else:
                self.result = 0
        except IOError:
            # A failed upload contributes no bytes to the measurement.
            self.result = 0
|
|
|
|
|
|
def uploadSpeed(url, sizes, quiet=False):
    """Upload one payload per entry of ``sizes`` and return the average rate.

    A producer thread starts one FilePutter per size and pushes it onto a
    queue bounded to 6 entries; a consumer thread waits for each putter in
    order and collects the number of bytes it reports as sent.

    url -- address every payload is POSTed to
    sizes -- sequence of payload sizes in bytes
    quiet -- when true, no progress dot is written to stdout per payload

    Returns bytes sent divided by elapsed seconds (0.0 when no time
    elapsed).

    """

    finished = []

    def producer(q, sizes):
        for size in sizes:
            thread = FilePutter(url, start, size)
            thread.start()
            # Blocks while the queue is full.
            q.put(thread, True)
            if not quiet and not shutdown_event.is_set():
                sys.stdout.write('.')
                sys.stdout.flush()

    def consumer(q, total_sizes):
        while len(finished) < total_sizes:
            thread = q.get(True)
            while thread.is_alive():
                thread.join(timeout=0.1)
            finished.append(thread.result)
            del thread

    q = Queue(6)
    prod_thread = threading.Thread(target=producer, args=(q, sizes))
    cons_thread = threading.Thread(target=consumer, args=(q, len(sizes)))
    # ``producer`` reads this through its closure, so it must be set before
    # the threads are started.
    start = time.time()
    prod_thread.start()
    cons_thread.start()
    # Join in short slices rather than blocking indefinitely; presumably
    # this keeps the main thread able to react to SIGINT.
    while prod_thread.is_alive():
        prod_thread.join(timeout=0.1)
    while cons_thread.is_alive():
        cons_thread.join(timeout=0.1)
    elapsed = time.time() - start
    if elapsed <= 0:
        # A coarse clock can report no elapsed time; avoid dividing by zero.
        return 0.0
    return sum(finished) / elapsed
|
|
|
|
|
|
def getAttributesByTagName(dom, tagName):
    """Return the attributes of the first ``tagName`` element as a dict.

    Only the first matching element under ``dom`` is considered.  Indexing
    the empty match list fails when there is no such element.

    """
    first_match = dom.getElementsByTagName(tagName)[0]
    attributes = {}
    for attr_name, attr_value in first_match.attributes.items():
        attributes[attr_name] = attr_value
    return attributes
|
|
|
|
|
|
def getConfig():
    """Download the speedtest.net configuration and return only the data
    we are interested in

    Returns a dict with the keys 'client', 'times', 'download' and
    'upload', each holding the attributes of the first XML element of that
    name, or None when the response code is not 200.

    """

    uh = urlopen('http://www.speedtest.net/speedtest-config.php')
    try:
        configxml = uh.read()
        if int(uh.code) != 200:
            return None
    finally:
        # Release the connection on every path, including the early return
        # above and a failed read.
        uh.close()
    root = DOM.parseString(configxml)
    config = {
        'client': getAttributesByTagName(root, 'client'),
        'times': getAttributesByTagName(root, 'times'),
        'download': getAttributesByTagName(root, 'download'),
        'upload': getAttributesByTagName(root, 'upload')}

    del root
    return config
|
|
|
|
|
|
def closestServers(client, all=False):
    """Determine the 5 closest speedtest.net servers based on geographic
    distance

    client -- mapping with 'lat' and 'lon' entries convertible to float
    all -- when true, return every server instead of only the first 5

    Returns a list of server attribute dicts ordered by ascending distance,
    each extended with the key 'd' (distance to the client in km), or None
    when the response code is not 200.

    """

    uh = urlopen('http://www.speedtest.net/speedtest-servers.php')
    try:
        serversxml = uh.read()
        if int(uh.code) != 200:
            return None
    finally:
        # Release the connection on every path, including the early return
        # above and a failed read.
        uh.close()
    root = DOM.parseString(serversxml)
    here = [float(client['lat']), float(client['lon'])]
    servers = []
    for server in root.getElementsByTagName('server'):
        attrib = dict(server.attributes.items())
        attrib['d'] = distance(here, [float(attrib.get('lat')),
                                      float(attrib.get('lon'))])
        servers.append(attrib)
    del root

    # list.sort is stable, so servers at the same distance keep the order
    # in which they appear in the document.
    servers.sort(key=lambda entry: entry['d'])
    if all:
        return servers
    return servers[:5]
|
|
|
|
|
|
def getBestServer(servers):
    """Perform a speedtest.net "ping" to determine which speedtest.net
    server has the lowest latency

    For every server, ``latency.txt`` next to its 'url' is requested three
    times and the time taken to read the first 9 bytes is averaged.  A
    reply that is not HTTP 200 with the body ``test=test`` counts as 3600
    seconds.

    servers -- iterable of dicts that each have a 'url' entry

    Returns the dict with the lowest average, with that average stored
    under the key 'latency'.  Raises IndexError when ``servers`` is empty.

    """

    results = {}
    for server in servers:
        cum = []
        url = os.path.dirname(server['url'])
        for i in range(0, 3):
            uh = urlopen('%s/latency.txt' % url)
            try:
                # Only the read is timed; the request has already been sent.
                start = time.time()
                text = uh.read(9)
                total = time.time() - start
                if int(uh.code) == 200 and text == 'test=test'.encode():
                    cum.append(total)
                else:
                    cum.append(3600)
            finally:
                # Close the handle even when the read raises.
                uh.close()
        # NOTE(review): seconds are scaled by 1000000 here while the callers
        # print the value as "ms" -- confirm the intended unit.
        avg = round((sum(cum) / 3) * 1000000, 3)
        results[avg] = server

    # Indexing (rather than min()) keeps the IndexError for an empty input.
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest

    return best
|
|
|
|
|
|
def ctrl_c(signum, frame):
    """Signal handler: set the shared shutdown event, then exit.

    ``signum`` and ``frame`` are accepted for the handler signature and
    are not used.

    """
    shutdown_event.set()
    sys.exit('\nCancelling...')
|
|
|
|
|
|
def version():
    """Exit the program with the module version as the exit message."""
    sys.exit(__version__)
|
|
|
|
|
|
def speedtest():
    """Run the full speedtest.net test

    Steps, in order: install the SIGINT handler, parse the command line,
    fetch the configuration and server list, choose a server (--server,
    --mini, or the lowest latency of the closest ones), measure download
    and upload speed, and, with --share, submit the results and print the
    URL of the result image.

    Exits through sys.exit / SystemExit for --version, --list and every
    reported error.

    """

    global shutdown_event
    shutdown_event = threading.Event()

    signal.signal(signal.SIGINT, ctrl_c)

    description = (
        'Command line interface for testing internet bandwidth using '
        'speedtest.net.\n'
        '------------------------------------------------------------'
        '--------------\n'
        'https://github.com/sivel/speedtest-cli')

    parser = ArgParser(description=description)
    # When ArgParser is optparse's OptionParser, expose add_option under the
    # argparse method name so the calls below work with either parser.
    try:
        parser.add_argument = parser.add_option
    except AttributeError:
        pass
    parser.add_argument('--share', action='store_true',
                        help='Generate and provide a URL to the speedtest.net '
                             'share results image')
    parser.add_argument('--simple', action='store_true',
                        help='Suppress verbose output, only show basic '
                             'information')
    parser.add_argument('--list', action='store_true',
                        help='Display a list of speedtest.net servers '
                             'sorted by distance')
    parser.add_argument('--server', help='Specify a server ID to test against')
    parser.add_argument('--loc', type=float, nargs=2, metavar=('lat', 'lon'),
                        help='Specify own location')
    parser.add_argument('--mini', help='URL of the Speedtest Mini server')
    parser.add_argument('--version', action='store_true',
                        help='Show the version number and exit')

    # A tuple result is unpacked to its first item (the options object);
    # otherwise the result is already the namespace.
    options = parser.parse_args()
    if isinstance(options, tuple):
        args = options[0]
    else:
        args = options
    del options

    if args.version:
        version()

    if not args.simple:
        print_('Retrieving speedtest.net configuration...')
    config = getConfig()
    if config is None:
        # getConfig signals a non-200 response with None.
        print_('Could not retrieve speedtest.net configuration')
        sys.exit(1)

    if args.loc:
        config['client']['lat'] = args.loc[0]
        config['client']['lon'] = args.loc[1]

    if not args.simple:
        print_('Retrieving speedtest.net server list...')
    if args.list or args.server:
        servers = closestServers(config['client'], True)
    else:
        servers = closestServers(config['client'])
    # closestServers signals a non-200 response with None.  The list is not
    # needed when only a Speedtest Mini server is tested.
    if servers is None and (args.list or args.server or not args.mini):
        print_('Could not retrieve speedtest.net server list')
        sys.exit(1)

    if args.list:
        serverList = []
        for server in servers:
            line = ('%(id)4s) %(sponsor)s (%(name)s, %(country)s) '
                    '[%(d)0.2f km]' % server)
            serverList.append(line)
        listing = '\n'.join(serverList)
        try:
            unicode
        except NameError:
            # No ``unicode`` type: the text is printed as-is, since
            # encoding it would print a bytes repr instead of the list.
            pass
        else:
            listing = listing.encode('utf-8', 'ignore')
        try:
            print_(listing)
        except IOError:
            pass
        sys.exit(0)

    if not args.simple:
        print_('Testing from %(isp)s (%(ip)s)...' % config['client'])

    if args.server:
        try:
            best = getBestServer(filter(lambda x: x['id'] == args.server,
                                        servers))
        except IndexError:
            # getBestServer raises IndexError when no server matched the ID.
            print_('Invalid server ID')
            sys.exit(1)
    elif args.mini:
        ext = os.path.splitext(args.mini)[1]
        if ext:
            url = os.path.dirname(args.mini)
        else:
            url = args.mini
        urlparts = urlparse(url)
        try:
            f = urlopen(args.mini)
        except Exception:
            # Not a bare except, so the SystemExit raised by the SIGINT
            # handler is not swallowed here.
            print_('Invalid Speedtest Mini URL')
            sys.exit(1)
        else:
            text = f.read()
            f.close()
        extension = re.findall('upload_extension: "([^"]+)"', text.decode())
        if not urlparts or not extension:
            print_('Please provide the full URL of your Speedtest Mini server')
            sys.exit(1)
        servers = [{
            'sponsor': 'Speedtest Mini',
            'name': urlparts[1],
            'd': 0,
            'url': '%s/speedtest/upload.%s' % (url.rstrip('/'), extension[0]),
            'latency': 0,
            'id': 0
        }]
        try:
            best = getBestServer(servers)
        except Exception:
            # If the latency check fails, use the server anyway with the
            # preset latency of 0.  A Ctrl-C still aborts.
            best = servers[0]
    else:
        if not args.simple:
            print_('Selecting best server based on ping...')
        best = getBestServer(servers)

    if not args.simple:
        print_('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: '
               '%(latency)s ms' % best)
    else:
        print_('Ping: %(latency)s ms' % best)

    # Each image size is requested four times.
    sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
    urls = []
    for size in sizes:
        for i in range(0, 4):
            urls.append('%s/random%sx%s.jpg' %
                        (os.path.dirname(best['url']), size, size))
    if not args.simple:
        print_('Testing download speed', end='')
    dlspeed = downloadSpeed(urls, args.simple)
    if not args.simple:
        print_()
    print_('Download: %0.2f Mbit/s' % ((dlspeed / 1000 / 1000) * 8))

    # 25 uploads of each payload size.
    sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)]
    sizes = []
    for size in sizesizes:
        for i in range(0, 25):
            sizes.append(size)
    if not args.simple:
        print_('Testing upload speed', end='')
    ulspeed = uploadSpeed(best['url'], sizes, args.simple)
    if not args.simple:
        print_()
    print_('Upload: %0.2f Mbit/s' % ((ulspeed / 1000 / 1000) * 8))

    if args.share and args.mini:
        print_('Cannot generate a speedtest.net share results image while '
               'testing against a Speedtest Mini server')
    elif args.share:
        dlspeedk = int(round((dlspeed / 1000) * 8, 0))
        ping = int(round(best['latency'], 0))
        ulspeedk = int(round((ulspeed / 1000) * 8, 0))

        apiData = [
            'download=%s' % dlspeedk,
            'ping=%s' % ping,
            'upload=%s' % ulspeedk,
            'promo=',
            'startmode=%s' % 'pingselect',
            'recommendedserverid=%s' % best['id'],
            'accuracy=%s' % 1,
            'serverid=%s' % best['id'],
            'hash=%s' % md5(('%s-%s-%s-%s' %
                             (ping, ulspeedk, dlspeedk, '297aae72'))
                            .encode()).hexdigest()]

        req = Request('http://www.speedtest.net/api/api.php',
                      data='&'.join(apiData).encode())
        req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
        f = urlopen(req)
        response = f.read()
        code = f.code
        f.close()

        if int(code) != 200:
            print_('Could not submit results to speedtest.net')
            sys.exit(1)

        qsargs = parse_qs(response.decode())
        resultid = qsargs.get('resultid')
        if not resultid or len(resultid) != 1:
            print_('Could not submit results to speedtest.net')
            sys.exit(1)

        print_('Share results: http://www.speedtest.net/result/%s.png' %
               resultid[0])
|
|
|
|
|
|
def main():
    """Run speedtest() and print a cancellation notice instead of a
    traceback when a KeyboardInterrupt reaches this level.
    """
    try:
        speedtest()
    except KeyboardInterrupt:
        print_('\nCancelling...')
|
|
|
|
# Run the test only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|
|
|
|
# vim:ts=4:sw=4:expandtab
|