Source code for trex.astf.topo

import imp
import json
import yaml
import os
import sys
import math

from ..common.services.trex_service_arp import ServiceARP
from ..common.trex_exceptions import *
from ..common.trex_types import listify, validate_type, basestring
from ..utils.common import *
from ..utils.text_tables import TRexTextTable, print_table_with_header

DST_MAC, DST_IPv4, DST_IPv6 = range(3)
MAX_VIF_ID = 100000

def split_port_str(port_id):
    validate_type('port_id', port_id, basestring)
    port_ids = port_id.split('.')
    try:
        if len(port_ids) == 1:
            trex_port = int(port_id)
            sub_if    = 0
        elif len(port_ids) == 2:
            trex_port = int(port_ids[0])
            sub_if    = int(port_ids[1])
        else:
            raise ValueError('')
    except ValueError:
        raise TRexError("Invalid port_id %s, valid examples: '4' for TRex port 4 or '0.2' for TRex port 0 and sub-interface 2" % port_id)

    if sub_if < 0 or sub_if > MAX_VIF_ID:
        raise TRexError('sub_if should be between 1 and %s, got: %s' % (MAX_VIF_ID, sub_if))

    return trex_port, sub_if


[docs]class TopoGW(object): def __init__(self, port_id, src_start, src_end, dst, dst_mac = ''): ''' Defines next hop for traffic. :parameters: port_id: string Format of "A.B", where A is TRex port ID and B is sub-interface ID >= 1. src_start, src_end: strings IPv4 addresses, traffic within this range will be routed via this GW dst: string Either IPv4/v6 or MAC address. IP will need resolve before uploading to server. dst_mac: string Resolved MAC, for internal usage. ''' trex_port, sub_if = split_port_str(port_id) if trex_port % 2: raise TRexError('GW can be specified only for client (even) IFs, got: %s' % port_id) if not is_valid_ipv4(src_start): raise TRexError("src_start is not a valid IPv4 address: '%s'" % src_start) if not is_valid_ipv4(src_end): raise TRexError("src_end is not a valid IPv4 address: '%s'" % src_end) validate_type('dst', dst, basestring) if is_valid_ipv4(dst): self.dst_type = DST_IPv4 elif is_valid_ipv6(dst): self.dst_type = DST_IPv6 elif is_valid_mac(dst): self.dst_type = DST_MAC else: raise TRexError('dst should be either IPv4, IPv6 or MAC address, got: %s' % dst) self.port_id = port_id self.trex_port = trex_port self.sub_if = sub_if self.src_start = src_start self.src_end = src_end self.dst = dst if dst_mac: self.dst_mac = dst_mac if self.dst_type == DST_MAC and dst_mac != dst: raise TRexError('Both dst and dst_mac is used, but they are different: %s, %s' % (dst, dst_mac)) elif self.dst_type == DST_MAC: self.dst_mac = dst else: self.dst_mac = None def get_data(self, to_server): d = {} d['src_start'] = self.src_start d['src_end'] = self.src_end d['dst'] = self.dst if to_server: d['trex_port'] = self.trex_port d['sub_if'] = self.sub_if d['dst_mac'] = self.dst_mac else: d['port_id'] = self.port_id return d def to_code(self): data = self.get_data(False) return "TopoGW('{port_id}', '{src_start}', '{src_end}', '{dst}')".format(**data)
[docs]class TopoVIF(object): def __init__(self, port_id, src_mac, src_ipv4 = '', src_ipv6 = '', vlan = 0): ''' Source MAC and VLAN are taken from here for traffic. :parameters: port_id: string Format of "A.B", where A is TRex port ID and B is sub-interface ID >= 1. src_mac: string MAC address of virtual interface. Will be used in sent traffic. src_ipv4: string IPv4 address of interface. If specified, used in resolve, otherwise taken from TRex port. src_ipv6: string IPv6 address of interface. Currently not used. vlan: int VLAN ID, will be used in traffic and in resolve process. ''' trex_port, sub_if = split_port_str(port_id) if sub_if <= 0: raise TRexError('VIF port_id sub_if ID should be positive, got: %s' % port_id) validate_type('src_mac', src_mac, basestring) if not is_valid_mac(src_mac): raise TRexError('src_mac is not valid MAC address: %s' % src_mac) if src_ipv4 and not is_valid_ipv4(src_ipv4): raise TRexError('src_ipv4 is not valid IPv4 address: %s' % src_ipv4) if src_ipv6 and not is_valid_ipv6(src_ipv6): raise TRexError('src_ipv6 is not valid IPv6 address: %s' % src_ipv6) if vlan is None: vlan = 0 else: validate_type('vlan', vlan, int) if vlan < 0 or vlan > 4096: raise TRexError('Invalid value for VLAN: %s' % vlan) self.port_id = port_id self.trex_port = trex_port self.sub_if = sub_if self.src_mac = src_mac self.src_ipv4 = src_ipv4 self.src_ipv6 = src_ipv6 self.vlan = vlan def get_data(self, to_server): d = {} d['src_mac'] = self.src_mac d['src_ipv4'] = self.src_ipv4 d['src_ipv6'] = self.src_ipv6 d['vlan'] = self.vlan if to_server: d['trex_port'] = self.trex_port d['sub_if'] = self.sub_if else: d['port_id'] = self.port_id return d def to_code(self): data = self.get_data(False) return "TopoVIF('{port_id}', '{src_mac}', '{src_ipv4}', '{src_ipv6}', {vlan})".format(**data)
[docs]class ASTFTopology(object): ''' Init ASTFTopology from list of TopoVIFs and TopoGWs (default is empty) ''' def __init__(self, vifs = None, gws = None): self.vifs = vifs or [] self.gws = gws or []
[docs] def add_vif_obj(self, vif): ''' Add TopoVIF object ''' validate_type('vif', vif, TopoVIF) self.vifs.append(vif)
[docs] def add_gw_obj(self, gw): ''' Add TopoGW object ''' validate_type('gw', gw, TopoGW) self.gws.append(gw)
[docs] def add_vif(self, *a, **k): ''' | Create (from given arguments) and add TopoVIF object. | Instead of port_id, one may specify trex_port and sub_if - integers, TRex port ID and sub-interface ID respectfully. ''' trex_port = k.get('trex_port') if trex_port is not None: k['port_id'] = '%s.%s' % (trex_port, k['sub_if']) del k['sub_if'] del k['trex_port'] vif = TopoVIF(*a, **k) self.vifs.append(vif)
[docs] def add_gw(self, *a, **k): ''' | Create (from given arguments) and add TopoGW object. | Instead of port_id, one may specify trex_port and sub_if - integers, TRex port ID and sub-interface ID respectfully. ''' trex_port = k.get('trex_port') if trex_port is not None: k['port_id'] = '%s.%s' % (trex_port, k['sub_if']) del k['sub_if'] del k['trex_port'] gw = TopoGW(*a, **k) self.gws.append(gw)
[docs] def is_empty(self): ''' Return True if nothing is added ''' return len(self.gws) + len(self.vifs) == 0
def get_data(self, to_server = True): data = {} data['vifs'] = [vif.get_data(to_server) for vif in self.vifs] data['gws'] = [gw.get_data(to_server) for gw in self.gws] return data
class ASTFTopologyManager(object): def __init__(self, client): self.client = client def is_empty(self): for port in self.client.ports.values(): if not port.topo.is_empty(): return False return True def split_per_port(self, topo): topo_per_port = {} for port_id, port in self.client.ports.items(): topo_per_port[port_id] = ASTFTopology() for vif in topo.vifs: trex_port_id = vif.trex_port trex_port = self.client.ports.get(trex_port_id) if not trex_port: raise TRexError('VIF has port_id %s, which requires TRex interface %s' % (vif.port_id, trex_port_id)) topo_per_port[trex_port_id].add_vif_obj(vif) for gw in topo.gws: trex_port_id = gw.trex_port trex_port = self.client.ports.get(trex_port_id) if not trex_port: raise TRexError('GW has port_id %s requires TRex interface %s' % (gw.port_id, trex_port_id)) topo_per_port[trex_port_id].add_gw_obj(gw) return topo_per_port def warn(self, msg): self.client.logger.warning('WARNING: %s' % msg) def info(self, msg): self.client.logger.info(msg) def validate_topo(self, topo_per_port): start_end_gw = [] for topo in topo_per_port.values(): for gw in topo.gws: start = tuple(map(int, gw.src_start.split('.'))) end = tuple(map(int, gw.src_end.split('.'))) if start > end: raise TRexError('GW src_start: %s is higher than src_end: %s' % (gw.src_start, gw.src_end)) start_end_gw.append((start, end, gw)) start_end_gw.sort(key = lambda t:t[0]) # N*log(N) p_gw = None for start, end, gw in start_end_gw: if p_gw: if start == p_start: raise TRexError('At least two GWs start range with: %s' % (p_gw.src_start)) if p_end >= start: raise TRexError('GW ranges intersect: %s-%s, %s-%s' % (p_gw.src_start, p_gw.src_end, gw.src_start, gw.src_end)) p_start, p_end, p_gw = start, end, gw prom_warnings = {} for port_id, port in self.client.ports.items(): port_topo = topo_per_port.get(port_id) if not port_topo: continue port_attr = port.get_ts_attr() prom_enabled = port_attr['promiscuous']['enabled'] port_src_mac = port_attr['layer_cfg']['ether']['src'] port_has_ipv4 = port_attr['layer_cfg']['ipv4']['state'] != 'none' port_has_ipv6 = port_attr['layer_cfg']['ipv6']['enabled'] vif_ids = {} vif_macs = {} for vif in port_topo.vifs: vif_id = vif.sub_if if vif_id in vif_ids: raise TRexError('Duplicate VIF - %s' % vif.port_id) vif_ids[vif_id] = vif vif_mac = vif.src_mac if vif_mac in vif_macs: raise TRexError('Duplicate VIF MAC: %s' % vif_mac) vif_macs[vif_mac] = vif if not prom_enabled and vif_mac != port_src_mac: prom_warnings[port_id] = 1 for gw in port_topo.gws: gw_sub_if = gw.sub_if if gw_sub_if: port = vif_ids.get(gw_sub_if) if not port: raise TRexError('Invalid port in GW - %s' % gw.port_id) if gw.dst_type == DST_IPv4 and not port.src_ipv4: raise TRexError("VIF %s does not have IPv4 configured, can't set GW %s" % (gw.port_id, gw.dst)) elif gw.dst_type == DST_IPv6 and not port.src_ipv6: raise TRexError("VIF %s does not have IPv6 configured, can't set GW %s" % (gw.port_id, gw.dst)) else: if gw.dst_type == DST_IPv4 and not port_has_ipv4: raise TRexError("Port %s does not have IPv4 configured, can't set GW %s" % (gw.port_id, gw.dst)) elif gw.dst_type == DST_IPv6 and not port_has_ipv6: raise TRexError("Port %s does not have IPv6 configured, can't set GW %s" % (gw.port_id, gw.dst)) if prom_warnings: self.warn('Promiscuous mode must be enabled on port(s) %s for VIFs to work' % list(prom_warnings.keys())) def load(self, topology, **kw): if not self.is_empty(): raise TRexError('Topology is already loaded, clear it first') if not isinstance(topology, ASTFTopology): x = os.path.basename(topology).split('.') suffix = x[1] if len(x) == 2 else topology if suffix == 'py': topology = self.load_py(topology, **kw) elif suffix == 'json': topology = self.load_json(topology) elif suffix == 'yaml': topology = self.load_yaml(topology) else: raise TRexError("Unknown topology file type: '%s'" % suffix) topo_per_port = self.split_per_port(topology) self.validate_topo(topo_per_port) for port_id, port in self.client.ports.items(): port.topo = topo_per_port[port_id] if self.is_empty(): raise TRexError('Loaded topology is empty!') @staticmethod def load_py(python_file, **kw): # check filename if not os.path.isfile(python_file): raise TRexError("File '%s' does not exist" % python_file) basedir = os.path.dirname(python_file) sys.path.insert(0, basedir) try: file = os.path.basename(python_file).split('.')[0] module = __import__(file, globals(), locals(), [], 0) imp.reload(module) # reload the update topo = module.get_topo(**kw) if not isinstance(topo, ASTFTopology): raise TRexError('Loaded topology type is not ASTFTopology') return topo except Exception as e: raise TRexError('Could not load topology: %s' % e) finally: sys.path.pop(0) @staticmethod def from_data(data): gws = data.get('gws') if not gws: gws = [] elif type(gws) is not list: raise TRexError("Type of gws section in JSON must be a list") vifs = data.get('vifs') if not vifs: vifs = [] elif type(vifs) is not list: raise TRexError("Type of vifs section in JSON must be a list") topo = ASTFTopology() for vif_data in vifs: topo.add_vif(**vif_data) for gw_data in gws: topo.add_gw(**gw_data) return topo @staticmethod def load_json(filename): if not os.path.isfile(filename): raise TRexError("File '%s' does not exist" % filename) # read the content with open(filename) as f: file_str = f.read() data = json.loads(file_str) return ASTFTopologyManager.from_data(data) @staticmethod def load_yaml(filename): if not os.path.isfile(filename): raise TRexError("File '%s' does not exist" % filename) # read the content with open(filename) as f: file_str = f.read() data = yaml.load(file_str) return ASTFTopologyManager.from_data(data) def clear(self): for port in self.client.ports.values(): port.topo = ASTFTopology() if not self.client.handler: raise TRexError('Cleared client, but not server (not owned)') params = { 'handler': self.client.handler, } rc = self.client._transmit('topo_clear', params) if not rc: raise TRexError('Could not clear topology: %s' % rc.err()) def resolve(self, ports = None): gw_need_resolve_cnt = 0 services_cnt = 0 unresolved = [] if ports is None: port_ids = self.client.ports.keys() else: port_ids = listify(ports) for port_id in port_ids: self.validate_topo({port_id: self.client.get_port(port_id).topo}) for port_id in port_ids: port = self.client.get_port(port_id) ctx = self.client.create_service_ctx(port_id) port_attr = port.get_ts_attr() ipv4 = port_attr['layer_cfg']['ipv4'] port_vlan = port_attr['vlan']['tags'] vifs = {} for vif in port.topo.vifs: vifs[vif.port_id] = vif service_per_dest = {} for gw in port.topo.gws: if gw.dst_mac: continue gw_need_resolve_cnt += 1 dst = gw.dst if gw.dst_type == DST_IPv4: if gw.sub_if: vif = vifs[gw.port_id] vlan = vif.vlan or None service_key = '%s - %s' % (dst, [vlan]) service = service_per_dest.get(service_key) if service: service.gws.append(gw) continue service = ServiceARP(ctx, dst, vif.src_ipv4, vlan, vif.src_mac, timeout_sec = 1) else: service_key = '%s - %s' % (dst, port_vlan) service = service_per_dest.get(service_key) if service: service.gws.append(gw) continue service = ServiceARP(ctx, dst, ipv4['src'], port_vlan, timeout_sec = 1) service.gws = [gw] service_per_dest[service_key] = service elif gw.dst_type == DST_IPv6: raise TRexError('Not supported') if service_per_dest: services_cnt += len(service_per_dest) services = list(service_per_dest.values()) ctx.run(services) for service in services: record = service.get_record() if record: for gw in service.gws: gw.dst_mac = record.dst_mac else: unresolved.append(service.dst_ip) if unresolved: if len(unresolved) == 1: raise TRexError('Could not resolve GW: %s' % unresolved[0]) raise TRexError('Could not resolve %s GWs, first are: %s' % (len(unresolved), unresolved[:5])) if services_cnt: status = '%s dest(s) resolved for %s GW(s)' % (services_cnt, gw_need_resolve_cnt) else: status = 'No need to resolve anything' if ports is None: self.info(status + ', uploading to server') rc = self.client._upload_fragmented('topo_fragment', self.to_json()) if not rc: raise TRexError('Could not upload topology: %s' % rc.err()) else: self.info(status) def show(self, ports = None): if self.is_empty(): self.info('Topology is empty!') return print(ports) if ports is None: port_ids = self.client.ports.keys() else: port_ids = listify(ports) print(port_ids) sorted_ports = [self.client.ports[port_id] for port_id in sorted(port_ids)] sorted_topo = [port.topo for port in sorted_ports] vifs_table = TRexTextTable('Virtual interfaces') vifs_table.set_cols_align(['c'] * 5) vifs_table.set_cols_dtype(['t'] * 5) vifs_table.header(['Port', 'MAC', 'VLAN', 'IPv4', 'IPv6']) max_ipv6_len = 5 max_port_id = 4 for topo in sorted_topo: if not topo.vifs: continue vifs_dict = dict([(vif.port_id, index) for index, vif in enumerate(topo.vifs)]) for vif_id in sorted(vifs_dict.keys()): vif = topo.vifs[vifs_dict[vif_id]] vlan = vif.vlan or '-' ipv4 = vif.src_ipv4 or '-' ipv6 = vif.src_ipv6 or '-' max_ipv6_len = max(max_ipv6_len, len(ipv6)) max_port_id = max(max_port_id, len(vif_id)) vifs_table.add_row([vif_id, vif.src_mac, vlan, ipv4, ipv6]) vifs_table.set_cols_width([max_port_id, 17, 4, 15, max_ipv6_len]) print_table_with_header(vifs_table, untouched_header = vifs_table.title, buffer = sys.stdout) gws_table = TRexTextTable('Gateways for traffic') gws_table.set_cols_align(['c'] * 5) gws_table.set_cols_dtype(['t'] * 5) gws_table.header(['Port', 'Range start', 'Range end', 'Dest', 'Resolved']) max_dst_len = 5 max_res_len = 8 for topo in sorted_topo: if topo.gws: gws_dict = dict([(ip2int(gw.src_start), index) for index, gw in enumerate(topo.gws)]) for gw_src_int in sorted(gws_dict.keys()): gw = topo.gws[gws_dict[gw_src_int]] dst = gw.dst dst_mac = gw.dst_mac or '-' max_dst_len = max(max_dst_len, len(dst)) max_res_len = max(max_res_len, len(dst_mac)) gws_table.add_row([gw.port_id, gw.src_start, gw.src_end, dst, dst_mac]) gws_table.set_cols_width([5, 15, 15, max_dst_len, max_res_len]) print_table_with_header(gws_table, untouched_header = gws_table.title, buffer = sys.stdout) def get_merged_data(self, to_server = True): gws = [] vifs = [] for port in self.client.ports.values(): gws.extend([gw.get_data(to_server) for gw in port.topo.gws]) vifs.extend([vif.get_data(to_server) for vif in port.topo.vifs]) return {'gws': gws, 'vifs': vifs} def to_json(self, to_server = True): topo = self.get_merged_data(to_server) if to_server: return json.dumps(topo, sort_keys = True) return json.dumps(topo, indent=4, separators=(',', ': '), sort_keys = True) def to_yaml(self): topo = self.get_merged_data() return yaml.dump(topo, default_flow_style=False) def to_code(self): code = '''# !!! Auto-generated code !!!\n\nfrom trex.astf.topo import *\n\ndef get_topo():\n''' gws = [] vifs = [] for port in self.client.ports.values(): gws.extend([gw.to_code() for gw in port.topo.gws]) vifs.extend([vif.to_code() for vif in port.topo.vifs]) code += ' vifs = [\n' for vif in vifs: code += ' %s,\n' % vif code += ' ]\n' code += ' gws = [\n' for gw in gws: code += ' %s,\n' % gw code += ' ]\n' code += '\n return ASTFTopology(vifs = vifs, gws = gws)\n' return code def sync_with_server(self): rc = self.client._transmit('topo_get') if not rc: raise TRexError('Could not get topology from server: %s' % rc.err()) topo_data = rc.data().get('topo_data') if topo_data is None: raise TRexError('Server response is expected to have "topo_data" key') topology = self.from_data(topo_data) topo_per_port = self.split_per_port(topology) self.validate_topo(topo_per_port) for port_id, port in self.client.ports.items(): port.topo = topo_per_port[port_id]