iwlist.py 3.43 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import re
import subprocess
import sys
import time
from collections import defaultdict

import yaml

ADDRESS_RE = re.compile(r'^Cell \d+ - Address: (?P<address>([0-9A-F]{2}:){5}[0-9A-F]{2})$')
CHANNEL_RE = re.compile(r'^Channel:(?P<channel>\d+)$')
FREQUENCY_RE = re.compile(r'^Frequency:(?P<freq>\d\.\d+) GHz( \(Channel \d+\))?$')
POWER_RE = re.compile(r'^Quality=\d+/\d+  Signal level=(?P<power>-\d+) dBm')
ESSID_RE = re.compile(r'^ESSID:"(?P<essid>.*)"$')


def scan(interface='wlp2s0', retries=5):
    if retries == 0:
        print('Giving up...', file=sys.stderr)
        return None
    proc = subprocess.run(['iwlist', interface, 'scanning'], stdout=subprocess.PIPE)
    stdout = (l.strip() for l in proc.stdout.decode('utf8').splitlines())
    first_line = next(stdout).lower()
    if 'no scan results' in first_line:
        print(
            'Scanning was not successfull... Retrying for {} times'.format(retries),
            file=sys.stderr,
        )
        time.sleep(1)
        # try again
        return scan(interface, retries - 1)
    assert 'scan completed' in first_line
    cells = {}
    current_cell = {}
    for line in stdout:
        if ADDRESS_RE.match(line):
            if current_cell:
                cells[current_cell['address']] = current_cell
                current_cell = {}
            current_cell['address'] = ADDRESS_RE.match(line).group('address')
        elif CHANNEL_RE.match(line):
            current_cell['channel'] = CHANNEL_RE.match(line).group('channel')
        elif FREQUENCY_RE.match(line):
            current_cell['freq'] = float(FREQUENCY_RE.match(line).group('freq'))  # GHz
        elif POWER_RE.match(line):
            current_cell['power'] = int(POWER_RE.match(line).group('power'))  # dBm
        elif ESSID_RE.match(line):
            current_cell['essid'] = ESSID_RE.match(line).group('essid')
        # else:
        #     print(line)

    if current_cell:
        cells[current_cell['address']] = current_cell

    return cells


def make_reliable_measurement(times=5, delay=1):
    measurements = []
    for _ in range(times):
        point = scan()
        if point:
            measurements.append(point)
        time.sleep(delay)
    if not measurements:
        return None
    # average
    cells = defaultdict(list)
    for measurement in measurements:
        for address, cell in measurement.items():
            cells[address].append(cell)
    avg_cells = {}
    for address, cell_measurements in cells.items():
        assert all(m['channel'] == cell_measurements[0]['channel'] for m in cell_measurements)
        assert all(m['essid'] == cell_measurements[0]['essid'] for m in cell_measurements)
        assert all(m['freq'] == cell_measurements[0]['freq'] for m in cell_measurements)
        avg_cells[address] = {
            'address': address,
            'channel': cell_measurements[0]['channel'],
            'essid': cell_measurements[0]['essid'],
            'freq': cell_measurements[0]['freq'],
            'power': sum(m['power'] for m in cell_measurements) / len(cell_measurements),
        }
    return avg_cells


def append_measurement(lat, lon, acc, filename='data.yml'):
    t1 = time.time()
    measurement = make_reliable_measurement()
    t2 = time.time()
    timestamp = int((t2 + t1) // 2)
    with open(filename, 'a') as file:
        yaml.dump([{
            'lat': lat,
            'lon': lon,
            'acc': acc,
            'time': timestamp,
            'data': measurement,
        }], stream=file)