LOC-24 Added tests for kick/ban functionality
[LocalChat.git] / tests / run_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
#!/usr/bin/env python
#
# apt-get install:
#   python-psutil
#
import subprocess
import psutil
import os
import sys
import time
import sqlite3
import traceback
import json
 
 
# subprocess.DEVNULL only exists on Python 3 (3.3+). On Python 2 the import
# fails, so fall back to an open, writable handle on the null device which
# can be passed to Popen in the same way.
try:
    from subprocess import DEVNULL # py3k
except ImportError:
    import os
    # NOTE: this handle is never explicitly closed anywhere in this file
    DEVNULL = open(os.devnull, 'wb')
 
 
# SQLite database that the __main__ block opens. The path is resolved against
# the directory the script is run from (presumably where the server creates
# it when started in testing mode -- TODO confirm)
DB_FILE = "%s/localchat-testing.db" % (os.getcwd(),)
# Parent of the directory containing this script; the server/ and client/
# directories are looked up beneath it
PARENT_DIR = "%s/.." % (os.path.dirname(os.path.abspath(__file__)),)
# Make the client module importable (must happen before the import below)
sys.path.append('%s/client/' % (PARENT_DIR,))

import LocalChatClient
# Scratch space shared between tests: test_one records the room details under
# the 'room' key and the later tests read them back
STORAGE = {}
 
 
def restartServer(proc1):
    ''' Start the server component

    If already running (i.e. proc1 != False) kill the existing instance

    proc1 -- the subprocess.Popen handle of a previously started server, or
             a false value if no server has been started yet

    Returns the Popen handle of the newly started server, or False if the
    server could not be launched.
    '''

    # If we've already got a process, kill it
    if proc1:
        kill(proc1.pid)

    try:
        serverloc = "%s/server/LocalChat.py" % (PARENT_DIR,)
        # stdout is discarded and stderr is folded into it, so the server's
        # output doesn't get mixed in with the test output
        proc1 = subprocess.Popen([serverloc,'--testing-mode-enable'],stderr=subprocess.STDOUT,stdout=DEVNULL)
    except Exception as e:
        # Single-argument print(...) behaves identically on python 2 and 3
        print("Failed to start server")
        print(e)
        return False

    # Give it time to start up
    time.sleep(5)
    return proc1
 
 
def getClientInstance():
    ''' Build and return a fresh message handler from the client module
    '''
    client = LocalChatClient.msgHandler()
    return client
 
 
def exit(proc1,code=0):
    ''' Stop the background server (if there is one) and end the script

    proc1 -- handle of the server process, or a false value if none is running
    code  -- exit status passed on to sys.exit
    '''

    server_running = bool(proc1)
    if server_running:
        kill(proc1.pid)

    sys.exit(code)
 
 
 
def kill(proc_pid):
    ''' Kill the process and its children

    From https://stackoverflow.com/a/25134985

    We use this because proc1.kill() doesn't seem to kill off the child thread of the server, even if shell=True
    '''
    parent = psutil.Process(proc_pid)
    # Take out every descendant first, then the process itself
    descendants = parent.children(recursive=True)
    for child in descendants:
        child.kill()
    parent.kill()
 
 
 
def make_table(columns, data):
    """Create an ASCII table and return it as a string.

    Pass a list of strings to use as columns in the table and a list of
    dicts. The strings in 'columns' will be used as the keys to the dicts in
    'data.' A key that is missing from a dict is rendered as an empty cell,
    and every value is converted with str().

    The returned string is a header row, a separator row, one row per dict
    and a closing rule, joined with newlines (no trailing newline).

    https://snippets.bentasker.co.uk/page-1705192300-Make-ASCII-Table-Python.html
    """
    # Each column is as wide as its widest value, or its title if that is longer
    cell_widths = {}
    for c in columns:
        widths = [len(str(d.get(c, ""))) for d in data]
        widths.append(len(c))
        cell_widths[c] = max(widths)

    # Used for formatting rows of data
    row_template = "|" + " {} |" * len(columns)

    # CONSTRUCT THE TABLE

    # The top row with the column titles
    justified_column_heads = [c.ljust(cell_widths[c]) for c in columns]
    header = row_template.format(*justified_column_heads)
    # The second row contains separators
    sep = "|" + "-" * (len(header) - 2) + "|"
    end = "-" * len(header)

    # Rows of data, padded to the column widths
    rows = [
        row_template.format(*[str(d.get(c, "")).ljust(cell_widths[c]) for c in columns])
        for d in data
    ]
    rows.append(end)
    return "\n".join([header, sep] + rows)
 
 
 
 
 
def run_tests():
    ''' Run each test in turn and collect the results

    Every test is looked up by name in this module, called with the shared
    client instance and must return a [result, isFatal] pair, where result
    is a dict. The test's position (starting from 1) is stored in the
    result under 'No'.

    If a test marked as fatal FAILs, the remaining tests are skipped.

    Returns the list of result dicts for the tests which were run.
    '''

    # Get an instance of the client
    msg = getClientInstance()

    test_results = []
    tests = ['test_one','test_two','test_three','test_four']
    for x, test in enumerate(tests, 1):
        # Single-argument print(...) behaves identically on python 2 and 3
        print("Running %s " % (test,))
        stat,isFatal = globals()[test](msg)
        stat['No'] = x
        test_results.append(stat)
        if isFatal and stat['Result'] == 'FAIL':
            # Later tests rely on this one having worked
            break

    return test_results
 
 
 
 
def test_one(msg):
    ''' Create a room and verify that it gets created

    msg -- the client instance to use

    On success the room name, admin user and both passwords are saved in
    STORAGE['room'] for the later tests.

    Returns [result, isFatal]. This test is always fatal: the later tests
    depend on the room existing.
    '''

    result = {'Test' : 'Create a Room','Result' : 'FAIL', 'Notes': '' }
    isFatal = True

    # Test 1 - create a room and check it's recorded in the DB
    n = msg.createRoom('TestRoom1','testadmin')

    # Every exit has to hand back the [result, isFatal] pair, because
    # run_tests unpacks the return value into two names
    if not n:
        result['Notes'] = 'Empty Response'
        return [result,isFatal]

    # The client should have given us two passwords
    if len(n) < 2:
        result['Notes'] = 'Response too small'
        return [result,isFatal]

    # Separate out the return value
    roompass = n[0]
    userpass = n[1] # user specific password

    STORAGE['room'] = {"name":"TestRoom1",
                       "RoomPass":roompass,
                       "UserPass":userpass,
                       'User':'testadmin'
                       }

    # Check the DB to ensure the room was actually created
    CURSOR.execute("SELECT * from rooms where name=?",('TestRoom1',))
    r = CURSOR.fetchone()

    if not r:
        result['Notes'] = 'Room not in DB'
        return [result,isFatal]

    result['Result'] = 'Pass'
    return [result,isFatal]
 
 
 
 
def test_two(msg):
    ''' Try joining the previously created room with invalid credentials

    msg -- the client instance to use

    Two attempts are made using the room saved in STORAGE['room']: the real
    user with a wrong user password, then an unknown user (also with the
    wrong user password). Both must be refused.

    Returns [result, isFatal]. This test is never fatal.
    '''

    result = {'Test' : 'Join the room with invalid creds','Result' : 'FAIL', 'Notes': '' }
    n = msg.joinRoom(STORAGE['room']['User'],STORAGE['room']['name'],
                     "%s:%s" % (STORAGE['room']['RoomPass'],'BlatantlyWrong'))

    if n:
        result['Notes'] = 'Allowed to join with invalid pass'
        return [result,False]


    # Now try with an invalid username
    n = msg.joinRoom('AlsoWrong',STORAGE['room']['name'],"%s:%s" % (STORAGE['room']['RoomPass'],'BlatantlyWrong'))

    if n:
        result['Notes'] = 'Invalid user Allowed to join'
        return [result,False]

    result['Result'] = 'Pass'
    # A real boolean: the string 'False' would count as true
    return [result,False]
 
 
 
 
def test_three(msg):
    ''' Join the room from test_one using the correct credentials

    Passes when the join succeeds, the user is flagged active in the DB and
    the client is holding a session key, room, user, room password and
    SYSTEM key afterwards.

    Returns [result, isFatal]. This test is always fatal.
    '''

    outcome = {'Test' : 'Join the room','Result' : 'FAIL', 'Notes': '' }
    room = STORAGE['room']

    credentials = "%s:%s" % (room['RoomPass'],room['UserPass'])
    joined = msg.joinRoom(room['User'],room['name'],credentials)

    if not joined:
        outcome['Notes'] = 'Could not join'
        return [outcome,True]

    # The DB should now show the user as active
    CURSOR.execute("SELECT * from users where username=? and active=1",(room['User'],))
    if not CURSOR.fetchone():
        outcome['Notes'] = 'Not Active in DB'
        return [outcome,True]

    # State the client must be holding after a join, checked in this order,
    # with the note to report if it is missing
    required = (
        ('sesskey', 'No Session Key'),
        ('room', 'Client forgot room'),
        ('user', 'Client forgot user'),
        ('roompass', 'Client forgot roompass'),
        ('syskey', 'No SYSTEM key'),
    )
    for attr, note in required:
        if not getattr(msg, attr):
            outcome['Notes'] = note
            return [outcome,True]

    outcome['Result'] = 'Pass'
    return [outcome,True]
 
 
def test_four(msg):
    ''' When we joined, SYSTEM will have pushed a message. Ensure it's encrypted

    msg -- the client instance to use (must already have joined the room)

    The most recent SYSTEM message is read from the DB. It must NOT parse as
    JSON as stored, must decrypt with the client's SYSTEM key, and the
    decrypted text must be a JSON object containing "text" and "verb".

    Returns [result, isFatal]. This test is never fatal.
    '''

    result = {'Test' : 'SYSTEM uses E2E','Result' : 'FAIL', 'Notes': '' }
    isFatal = False

    CURSOR.execute("SELECT msg FROM messages where user='SYSTEM' ORDER BY ts DESC")
    r = CURSOR.fetchone()

    if not r:
        result['Notes'] = 'No System Message'
        return [result,isFatal]

    # The stored message should be ciphertext, so failing to parse it as
    # JSON is the good outcome here
    try:
        json.loads(r[0])
    except (ValueError, TypeError):
        pass
    else:
        result['Notes'] = 'System Message not E2E encrypted'
        return [result,isFatal]

    # Now try and decrypt the message
    m = msg.decrypt(r[0],'SYSTEM')
    if not m:
        result['Notes'] = 'Could not decrypt'
        return [result,isFatal]

    # Now check we got valid json
    try:
        j = json.loads(m)
    except (ValueError, TypeError):
        result['Notes'] = 'Not valid JSON'
        return [result,isFatal]

    # Finally, the payload has to be an object carrying both fields. The
    # type check stops a plain JSON string passing on a substring match
    if not isinstance(j, dict) or "text" not in j or "verb" not in j:
        result['Notes'] = 'Not valid msg payload'
        return [result,isFatal]

    # Otherwise, we're good
    result['Result'] = 'Pass'
    return [result,isFatal]
    
 
 
 
 
if __name__ == '__main__':
    # Start the server
    proc1 = restartServer(False)

    if not proc1:
        # Server start failed.
        # abort, abort, abort
        exit(proc1,1)

    # CURSOR is read as a module-level global by the tests
    CONN = sqlite3.connect(DB_FILE)
    CURSOR = CONN.cursor()

    exit_code = 0
    try:
        # I don't like generic catchall exceptions
        # but, we want to make sure we kill the background
        # process if there is one.
        results = run_tests()

        cols = ['No','Test','Result','Notes']
        # Single-argument print(...) behaves identically on python 2 and 3
        print(make_table(cols,results))
    except Exception as e:
        print(traceback.format_exc())
        print(e)
        exit_code = 1
    finally:
        # Release the DB handle whatever happened above
        CONN.close()

    # Kills the server and ends the script
    exit(proc1,exit_code)