1#!/usr/bin/env python
2# NBD server - fault injection utility
3#
4# Configuration file syntax:
5#   [inject-error "disconnect-neg1"]
6#   event=neg1
7#   io=readwrite
8#   when=before
9#
10# Note that Python's ConfigParser squashes together all sections with the same
11# name, so give each [inject-error] a unique name.
12#
13# inject-error options:
14#   event - name of the trigger event
#           "neg-classic" - classic negotiation struct (--classic-negotiation)
#           "neg1" - first part of negotiation struct
16#           "export" - export struct
17#           "neg2" - second part of negotiation struct
18#           "request" - NBD request struct
19#           "reply" - NBD reply struct
20#           "data" - request/reply data
21#   io    - I/O direction that triggers this rule:
22#           "read", "write", or "readwrite"
23#           default: readwrite
24#   when  - after how many bytes to inject the fault
25#           -1 - inject error after I/O
26#           0 - inject error before I/O
27#           integer - inject error after integer bytes
28#           "before" - alias for 0
29#           "after" - alias for -1
30#           default: before
31#
32# Currently the only error injection action is to terminate the server process.
33# This resets the TCP connection and thus forces the client to handle
34# unexpected connection termination.
35#
36# Other error injection actions could be added in the future.
37#
38# Copyright Red Hat, Inc. 2014
39#
40# Authors:
41#   Stefan Hajnoczi <stefanha@redhat.com>
42#
43# This work is licensed under the terms of the GNU GPL, version 2 or later.
44# See the COPYING file in the top-level directory.
45
46from __future__ import print_function
47import sys
48import socket
49import struct
50import collections
51if sys.version_info.major >= 3:
52    import configparser
53else:
54    import ConfigParser as configparser
55
# Disk size advertised to clients during negotiation
FAKE_DISK_SIZE = 8 << 30  # 8 GB

# Protocol constants

# Command types found in the request "type" field
NBD_CMD_READ = 0
NBD_CMD_WRITE = 1
NBD_CMD_DISC = 2

# Magic numbers
NBD_REQUEST_MAGIC = 0x25609513
NBD_SIMPLE_REPLY_MAGIC = 0x67446698
NBD_PASSWD = 0x4E42444D41474943        # ASCII "NBDMAGIC"
NBD_OPTS_MAGIC = 0x49484156454F5054    # ASCII "IHAVEOPT"
NBD_CLIENT_MAGIC = 0x0000420281861253

# Option codes
NBD_OPT_EXPORT_NAME = 1
68
# Protocol structs (every format is big-endian; "124x" is 124 pad bytes)
neg_classic_struct = struct.Struct('>3QI124x')
neg1_struct = struct.Struct('>2QH')
export_tuple = collections.namedtuple(
    'Export', ['reserved', 'magic', 'opt', 'len'])
export_struct = struct.Struct('>IQ2I')
neg2_struct = struct.Struct('>QH124x')
request_tuple = collections.namedtuple(
    'Request', ['magic', 'type', 'handle', 'from_', 'len'])
request_struct = struct.Struct('>2I2QI')
reply_struct = struct.Struct('>2IQ')
78
def err(msg):
    '''Write msg followed by a newline to stderr, then exit with status 1.'''
    line = msg + '\n'
    sys.stderr.write(line)
    raise SystemExit(1)
82
def recvall(sock, bufsize):
    '''Receive exactly bufsize bytes from sock and return them.

    sock.recv() is called repeatedly until the requested amount has arrived.
    Raises Exception('unexpected disconnect') if recv() returns an empty
    chunk before that.
    '''
    remaining = bufsize
    pieces = []
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            raise Exception('unexpected disconnect')
        pieces.append(piece)
        remaining -= len(piece)
    return b''.join(pieces)
93
class Rule(object):
    '''One fault injection rule: which event and I/O direction trigger it.'''

    def __init__(self, name, event, io, when):
        self.name = name    # identifies the rule in log output
        self.event = event  # event name the rule applies to
        self.io = io        # 'read', 'write' or 'readwrite'
        self.when = when    # 0 = before I/O, -1 = after I/O, N = after N bytes

    def match(self, event, io):
        '''Return True if this rule applies to the given event and direction.

        A rule whose io is 'readwrite' matches either direction.
        '''
        return event == self.event and (self.io == 'readwrite' or
                                        io == self.io)
107
class FaultInjectionSocket(object):
    '''Socket wrapper that terminates the process when a rule matches.

    Every send()/recv() is tagged with an event name.  The rules are
    consulted before and after the I/O; a matching rule closes the socket
    and exits the process, optionally after only part of the data has been
    transferred.
    '''

    def __init__(self, sock, rules):
        self.sock = sock
        self.rules = rules

    def check(self, event, io, bufsize=None):
        '''Apply the rules to an I/O of bufsize bytes tagged with event.

        With bufsize given (the check before the I/O), a matching rule with
        when == 0 terminates the process; a matching rule with a byte count
        returns how many bytes should be transferred before the fault; a
        matching "after" rule (when == -1) is skipped.  With bufsize=None
        (the check after the I/O) any matching rule terminates the process.

        Returns the number of bytes the caller should transfer; this is
        bufsize itself when no rule limits the transfer.
        '''
        for rule in self.rules:
            if not rule.match(event, io):
                continue
            if rule.when == 0 or bufsize is None:
                print('Closing connection on rule match %s' % rule.name)
                self.sock.close()
                sys.stdout.flush()
                sys.exit(0)
            if rule.when != -1:
                # Never ask for more than the I/O itself: otherwise recv()
                # would wait for bytes the peer is not going to send and the
                # fault would never be injected.
                return min(rule.when, bufsize)
        return bufsize

    def send(self, buf, event):
        '''Send buf, or only a leading part of it if a rule says so.'''
        bufsize = self.check(event, 'write', bufsize=len(buf))
        self.sock.sendall(buf[:bufsize])
        # Fires any rule that was due after (part of) the data went out
        self.check(event, 'write')

    def recv(self, bufsize, event):
        '''Receive bufsize bytes, or fewer if a rule says so, and return them.'''
        bufsize = self.check(event, 'read', bufsize=bufsize)
        data = recvall(self.sock, bufsize)
        # Fires any rule that was due after (part of) the data came in
        self.check(event, 'read')
        return data

    def close(self):
        '''Close the underlying socket.'''
        self.sock.close()
138
def negotiate_classic(conn):
    '''Send the classic negotiation struct (event "neg-classic")'''
    greeting = neg_classic_struct.pack(
        NBD_PASSWD, NBD_CLIENT_MAGIC, FAKE_DISK_SIZE, 0)
    conn.send(greeting, event='neg-classic')
143
def negotiate_export(conn):
    '''Perform the export-name style negotiation with the client.

    Sends negotiation part 1, reads the client's export option header and
    export name, then sends negotiation part 2.  The export name itself is
    read and discarded.

    Raises ValueError if the option header carries the wrong magic or an
    option other than NBD_OPT_EXPORT_NAME.
    '''
    # Send negotiation part 1
    buf = neg1_struct.pack(NBD_PASSWD, NBD_OPTS_MAGIC, 0)
    conn.send(buf, event='neg1')

    # Receive export option
    buf = conn.recv(export_struct.size, event='export')
    export = export_tuple._make(export_struct.unpack(buf))
    # Explicit checks rather than assert: assert statements are removed when
    # Python runs with -O, which would let malformed input through.
    if export.magic != NBD_OPTS_MAGIC:
        raise ValueError('unexpected option magic %#x' % export.magic)
    if export.opt != NBD_OPT_EXPORT_NAME:
        raise ValueError('unsupported option %#x' % export.opt)
    # The name must still be consumed from the stream even though it is unused
    conn.recv(export.len, event='export-name')

    # Send negotiation part 2
    buf = neg2_struct.pack(FAKE_DISK_SIZE, 0)
    conn.send(buf, event='neg2')
159
def negotiate(conn, use_export):
    '''Negotiate export with client'''
    # Export-name negotiation when use_export is true, classic otherwise
    handshake = negotiate_export if use_export else negotiate_classic
    handshake(conn)
166
def read_request(conn):
    '''Parse NBD request from client

    Returns a Request namedtuple (magic, type, handle, from_, len).
    Raises ValueError if the request magic is wrong.
    '''
    buf = conn.recv(request_struct.size, event='request')
    req = request_tuple._make(request_struct.unpack(buf))
    # Explicit check rather than assert: assert statements are removed when
    # Python runs with -O, which would let malformed input through.
    if req.magic != NBD_REQUEST_MAGIC:
        raise ValueError('unexpected request magic %#x' % req.magic)
    return req
173
def write_reply(conn, error, handle):
    '''Send an NBD simple reply header carrying error and handle'''
    header = reply_struct.pack(NBD_SIMPLE_REPLY_MAGIC, error, handle)
    conn.send(header, event='reply')
177
def handle_connection(conn, use_export):
    '''Negotiate with one client and serve its requests until it disconnects.

    Reads are answered with zero-filled data, write payloads are read and
    discarded; both are acknowledged with error 0.  The loop ends on
    NBD_CMD_DISC or on an unrecognized command.  The connection is closed
    on every exit path, including when an exception propagates.
    '''
    try:
        negotiate(conn, use_export)
        while True:
            req = read_request(conn)
            if req.type == NBD_CMD_READ:
                write_reply(conn, 0, req.handle)
                conn.send(b'\0' * req.len, event='data')
            elif req.type == NBD_CMD_WRITE:
                # Payload is consumed from the stream but not stored
                conn.recv(req.len, event='data')
                write_reply(conn, 0, req.handle)
            elif req.type == NBD_CMD_DISC:
                break
            else:
                print('unrecognized command type %#02x' % req.type)
                break
    finally:
        conn.close()
194
def run_server(sock, rules, use_export):
    '''Accept clients on sock forever, handling one connection at a time'''
    while True:
        client, _ = sock.accept()
        conn = FaultInjectionSocket(client, rules)
        handle_connection(conn, use_export)
199
def parse_inject_error(name, options):
    '''Build a Rule from the options of one [inject-error] section.

    name is the section name, options a dict of its key/value strings.
    "io" defaults to "readwrite" and "when" to "before".  "when" may be
    "before" (0), "after" (-1) or an integer not below -1.  Any invalid
    option is reported through err(), which exits the process.
    '''
    if 'event' not in options:
        err('missing "event" option in %s' % name)
    event = options['event']
    if event not in ('neg-classic', 'neg1', 'export', 'neg2', 'request',
                     'reply', 'data'):
        err('invalid "event" option value "%s" in %s' % (event, name))

    io = options.get('io', 'readwrite')
    if io not in ('read', 'write', 'readwrite'):
        err('invalid "io" option value "%s" in %s' % (io, name))

    when = options.get('when', 'before')
    try:
        when = int(when)
    except ValueError:
        if when == 'before':
            when = 0
        elif when == 'after':
            when = -1
        else:
            err('invalid "when" option value "%s" in %s' % (when, name))
    else:
        # -1 is the only meaningful negative value; anything lower would be
        # used as a negative slice bound / receive size later on.
        if when < -1:
            err('invalid "when" option value "%s" in %s' % (when, name))
    return Rule(name, event, io, when)
220
def parse_config(config):
    '''Turn every [inject-error ...] section of config into a Rule.

    Returns the list of rules in section order.  A section whose name does
    not start with "inject-error" is reported through err().
    '''
    rules = []
    for section in config.sections():
        if not section.startswith('inject-error'):
            err('invalid config section name: %s' % section)
            continue
        section_options = dict(config.items(section))
        rules.append(parse_inject_error(section, section_options))
    return rules
230
def load_rules(filename):
    '''Read the config file at filename and return its list of rules.'''
    config = configparser.RawConfigParser()
    with open(filename, 'rt') as f:
        # readfp() was removed in Python 3.12, while Python 2's ConfigParser
        # has no read_file(); use whichever this interpreter provides.
        read_file = getattr(config, 'read_file', None) or config.readfp
        read_file(f, filename)
    return parse_config(config)
236
def open_socket(path):
    '''Open a TCP or UNIX domain listen socket'''
    if ':' not in path:
        # No colon: treat the argument as a UNIX domain socket path
        sock = socket.socket(socket.AF_UNIX)
        sock.bind(path)
    else:
        host, _, port = path.partition(':')
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, int(port)))

        # If given port was 0 the final port number is now available
        path = '%s:%d' % sock.getsockname()
    sock.listen(0)
    print('Listening on %s' % path)
    sys.stdout.flush() # another process may be waiting, show message now
    return sock
254
def usage(args):
    '''Print the command-line synopsis to stderr and exit with status 1.

    args is the argument vector; args[0] is shown as the program name.
    '''
    # The listen address is either "host:port" (TCP, selected by the colon)
    # or a UNIX domain socket path.
    sys.stderr.write('usage: %s [--classic-negotiation] <host>:<port>|<unix-path> <config-file>\n' % args[0])
    sys.stderr.write('Run a fault injector NBD server with rules defined in a config file.\n')
    sys.exit(1)
259
def main(args):
    '''Parse the command line, open the listen socket and run the server.

    Expected arguments after the program name:
    [--classic-negotiation] <listen-address> <config-file>.
    Any other shape prints the usage text and exits.
    '''
    if len(args) >= 2 and args[1] == '--classic-negotiation':
        use_export = False
        positional = args[2:]
    else:
        use_export = True
        positional = args[1:]
    # Counting after the flag is stripped also rejects
    # "--classic-negotiation <one-arg>", which has no config file.
    if len(positional) != 2:
        usage(args)
    path, config_file = positional
    sock = open_socket(path)
    rules = load_rules(config_file)
    run_server(sock, rules, use_export)
    return 0
272
# Script entry point: the return value of main() becomes the exit status
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
275