1 #!/usr/bin/env python3 2 # SPDX-License-Identifier: GPL-2.0 3 4 import argparse 5 import errno 6 import logging 7 import socket 8 import struct 9 import time 10 11 import usb.core 12 import usb.util 13 14 15 def path_from_usb_dev(dev): 16 """Takes a pyUSB device as argument and re 17 The string is a Path representation of the 18 19 This path is used to find a USB device on 20 The path is made up of the number of the U 21 if dev.port_numbers: 22 dev_path = ".".join(str(i) for i in de 23 return f"{dev.bus}-{dev_path}" 24 return "" 25 26 27 HEXDUMP_FILTER = "".join(chr(x).isprintable() 28 29 30 class Forwarder: 31 @staticmethod 32 def _log_hexdump(data): 33 if not logging.root.isEnabledFor(loggi 34 return 35 L = 16 36 for c in range(0, len(data), L): 37 chars = data[c : c + L] 38 dump = " ".join(f"{x:02x}" for x i 39 printable = "".join(HEXDUMP_FILTER 40 line = f"{c:08x} {dump:{L*3}s} |{ 41 logging.root.log(logging.TRACE, "% 42 43 def __init__(self, server, vid, pid, path) 44 self.stats = { 45 "c2s packets": 0, 46 "c2s bytes": 0, 47 "s2c packets": 0, 48 "s2c bytes": 0, 49 } 50 self.stats_logged = time.monotonic() 51 52 def find_filter(dev): 53 dev_path = path_from_usb_dev(dev) 54 if path is not None: 55 return dev_path == path 56 return True 57 58 dev = usb.core.find(idVendor=vid, idPr 59 if dev is None: 60 raise ValueError("Device not found 61 62 logging.info(f"found device: {dev.bus} 63 64 # dev.set_configuration() is not neces 65 usb9pfs = None 66 # g_multi adds 9pfs as last interface 67 cfg = dev.get_active_configuration() 68 for intf in cfg: 69 # we have to detach the usb-storag 70 # stall option could be set, which 71 # resets and our transfers will ru 72 if intf.bInterfaceClass == 0x08: 73 if dev.is_kernel_driver_active 74 dev.detach_kernel_driver(i 75 76 if intf.bInterfaceClass == 0xFF an 77 usb9pfs = intf 78 if usb9pfs is None: 79 raise ValueError("Interface not fo 80 81 logging.info(f"claiming interface:\n{u 82 usb.util.claim_interface(dev, usb9pfs. 83 ep_out = usb.util.find_descriptor( 84 usb9pfs, 85 custom_match=lambda e: usb.util.en 86 ) 87 assert ep_out is not None 88 ep_in = usb.util.find_descriptor( 89 usb9pfs, 90 custom_match=lambda e: usb.util.en 91 ) 92 assert ep_in is not None 93 logging.info("interface claimed") 94 95 self.ep_out = ep_out 96 self.ep_in = ep_in 97 self.dev = dev 98 99 # create and connect socket 100 self.s = socket.socket(socket.AF_INET, 101 self.s.connect(server) 102 103 logging.info("connected to server") 104 105 def c2s(self): 106 """forward a request from the USB clie 107 data = None 108 while data is None: 109 try: 110 logging.log(logging.TRACE, "c2 111 data = self.ep_in.read(self.ep 112 except usb.core.USBTimeoutError: 113 logging.log(logging.TRACE, "c2 114 continue 115 except usb.core.USBError as e: 116 if e.errno == errno.EIO: 117 logging.debug("c2s: readin 118 time.sleep(0.5) 119 continue 120 logging.error("c2s: reading fa 121 raise 122 size = struct.unpack("<I", data[:4])[0 123 while len(data) < size: 124 data += self.ep_in.read(size - len 125 logging.log(logging.TRACE, "c2s: writi 126 self._log_hexdump(data) 127 self.s.send(data) 128 logging.debug("c2s: forwarded %i bytes 129 self.stats["c2s packets"] += 1 130 self.stats["c2s bytes"] += size 131 132 def s2c(self): 133 """forward a response from the TCP ser 134 logging.log(logging.TRACE, "s2c: readi 135 data = self.s.recv(4) 136 size = struct.unpack("<I", data[:4])[0 137 while len(data) < size: 138 data += self.s.recv(size - len(dat 139 logging.log(logging.TRACE, "s2c: writi 140 self._log_hexdump(data) 141 while data: 142 written = self.ep_out.write(data) 143 assert written > 0 144 data = data[written:] 145 if size % self.ep_out.wMaxPacketSize = 146 logging.log(logging.TRACE, "sendin 147 self.ep_out.write(b"") 148 logging.debug("s2c: forwarded %i bytes 149 self.stats["s2c packets"] += 1 150 self.stats["s2c bytes"] += size 151 152 def log_stats(self): 153 logging.info("statistics:") 154 for k, v in self.stats.items(): 155 logging.info(f" {k+':':14s} {v}") 156 157 def log_stats_interval(self, interval=5): 158 if (time.monotonic() - self.stats_logg 159 return 160 161 self.log_stats() 162 self.stats_logged = time.monotonic() 163 164 165 def try_get_usb_str(dev, name): 166 try: 167 with open(f"/sys/bus/usb/devices/{dev. 168 return f.read().strip() 169 except FileNotFoundError: 170 return None 171 172 173 def list_usb(args): 174 vid, pid = [int(x, 16) for x in args.id.sp 175 176 print("Bus | Addr | Manufacturer | Pro 177 print("--- | ---- | ---------------- | --- 178 for dev in usb.core.find(find_all=True, id 179 path = path_from_usb_dev(dev) or "" 180 manufacturer = try_get_usb_str(dev, "m 181 product = try_get_usb_str(dev, "produc 182 print( 183 f"{dev.bus:3} | {dev.address:4} | 184 ) 185 186 187 def connect(args): 188 vid, pid = [int(x, 16) for x in args.id.sp 189 190 f = Forwarder(server=(args.server, args.po 191 192 try: 193 while True: 194 f.c2s() 195 f.s2c() 196 f.log_stats_interval() 197 finally: 198 f.log_stats() 199 200 201 def main(): 202 parser = argparse.ArgumentParser( 203 description="Forward 9PFS requests fro 204 ) 205 206 parser.add_argument("--id", type=str, defa 207 parser.add_argument("--path", type=str, re 208 parser.add_argument("-v", "--verbose", act 209 210 subparsers = parser.add_subparsers() 211 subparsers.required = True 212 subparsers.dest = "command" 213 214 parser_list = subparsers.add_parser("list" 215 parser_list.set_defaults(func=list_usb) 216 217 parser_connect = subparsers.add_parser( 218 "connect", help="Forward messages betw 219 ) 220 parser_connect.set_defaults(func=connect) 221 connect_group = parser_connect.add_argumen 222 connect_group.required = True 223 parser_connect.add_argument("-s", "--serve 224 parser_connect.add_argument("-p", "--port" 225 226 args = parser.parse_args() 227 228 logging.TRACE = logging.DEBUG - 5 229 logging.addLevelName(logging.TRACE, "TRACE 230 231 if args.verbose >= 2: 232 level = logging.TRACE 233 elif args.verbose: 234 level = logging.DEBUG 235 else: 236 level = logging.INFO 237 logging.basicConfig(level=level, format="% 238 239 args.func(args) 240 241 242 if __name__ == "__main__": 243 main()
Linux® is a registered trademark of Linus Torvalds in the United States and other countries.
TOMOYO® is a registered trademark of NTT DATA CORPORATION.