Source code for trex.astf.trex_astf_client

from __future__ import print_function
import hashlib
import sys
import time
import os
import shlex

from ..utils.common import get_current_user, user_input, PassiveTimer
from ..utils import parsing_opts, text_tables

from ..common.trex_api_annotators import client_api, console_api
from ..common.trex_client import TRexClient, NO_TCP_UDP_MASK
from ..common.trex_events import Event
from ..common.trex_exceptions import TRexError, TRexTimeoutError
from ..common.trex_types import *
from ..common.trex_types import DEFAULT_PROFILE_ID, ALL_PROFILE_ID

from .trex_astf_port import ASTFPort
from .trex_astf_profile import ASTFProfile
from .topo import ASTFTopologyManager
from .stats.traffic import CAstfTrafficStats
from .stats.latency import CAstfLatencyStats
from ..utils.common import  is_valid_ipv4, is_valid_ipv6
from ..utils.text_opts import format_text
from ..astf.trex_astf_exceptions import ASTFErrorBadTG

astf_states = [
    'STATE_IDLE',
    'STATE_ASTF_LOADED',
    'STATE_ASTF_PARSE',
    'STATE_ASTF_BUILD',
    'STATE_TX',
    'STATE_ASTF_CLEANUP',
    'STATE_ASTF_DELETE']

class TunnelType:
      NONE = 0
      GTP  = 1
      

[docs]class ASTFClient(TRexClient): port_states = [getattr(ASTFPort, state, 0) for state in astf_states] def __init__(self, username = get_current_user(), server = "localhost", sync_port = 4501, async_port = 4500, verbose_level = "error", logger = None, sync_timeout = None, async_timeout = None): """ TRex advance stateful client :parameters: username : string the user name, for example imarom server : string the server name or ip sync_port : int the RPC port async_port : int the ASYNC port (subscriber port) verbose_level: str one of "none", "critical", "error", "info", "debug" logger: instance of AbstractLogger if None, will use ScreenLogger sync_timeout: int time in sec for timeout for RPC commands. for local lab keep it as default (3 sec) higher number would be more resilient for Firewalls but slower to identify real server crash async_timeout: int time in sec for timeout for async notification. for local lab keep it as default (3 sec) higher number would be more resilient for Firewalls but slower to identify real server crash """ api_ver = {'name': 'ASTF', 'major': 2, 'minor': 1} TRexClient.__init__(self, api_ver, username, server, sync_port, async_port, verbose_level, logger, sync_timeout, async_timeout) self.handler = '' self.traffic_stats = CAstfTrafficStats(self.conn.rpc) self.latency_stats = CAstfLatencyStats(self.conn.rpc) self.topo_mngr = ASTFTopologyManager(self) self.sync_waiting = False self.last_error = '' self.last_profile_error = {} self.epoch = None self.state = None for index, state in enumerate(astf_states): setattr(self, state, index) self.transient_states = [ self.STATE_ASTF_PARSE, self.STATE_ASTF_BUILD, self.STATE_ASTF_CLEANUP, self.STATE_ASTF_DELETE] self.astf_profile_state = {'_': 0} def get_mode(self): return "ASTF" ############################ called ############################# ############################ by base ############################# ############################ TRex Client ############################# def _on_connect(self): self.sync_waiting = False self.last_error = '' self.sync() self.topo_mngr.sync_with_server() return RC_OK() def _on_connect_create_ports(self, system_info): """ called when connecting to the server triggered by the common client object """ # create ports port_map = {} for port_info in system_info['ports']: port_id = port_info['index'] port_map[port_id] = ASTFPort(self.ctx, port_id, self.conn.rpc, port_info) return self._assign_ports(port_map) def _on_connect_clear_stats(self): self.traffic_stats.reset() self.latency_stats.reset() with self.ctx.logger.suppress(verbose = "warning"): self.clear_stats(ports = self.get_all_ports(), clear_xstats = False, clear_traffic = False) return RC_OK() def _on_astf_state_chg(self, ctx_state, error, epoch): if ctx_state < 0 or ctx_state >= len(astf_states): raise TRexError('Unhandled ASTF state: %s' % ctx_state) if epoch is None or self.epoch is None: return self.last_error = error if error and not self.sync_waiting: self.ctx.logger.error('Last command failed: %s' % error) self.state = ctx_state port_state = self.apply_port_states() port_state_name = ASTFPort.STATES_MAP[port_state].capitalize() if error: return Event('server', 'error', 'Moved to state: %s after error: %s' % (port_state_name, error)) else: return Event('server', 'info', 'Moved to state: %s' % port_state_name) def _on_astf_profile_state_chg(self, profile_id, ctx_state, error, epoch): if ctx_state < 0 or ctx_state >= len(astf_states): raise TRexError('Unhandled ASTF state: %s' % ctx_state) if epoch is None or self.epoch is None: return if error: self.last_profile_error[profile_id] = error if not self.sync_waiting: self.ctx.logger.error('Last profile %s command failed: %s' % (profile_id, error)) # update profile state self.astf_profile_state[profile_id] = ctx_state if error: return Event('server', 'error', 'Moved to profile %s state: %s after error: %s' % (profile_id, ctx_state, error)) else: return Event('server', 'info', 'Moved to profile %s state: %s' % (profile_id, ctx_state)) def _on_astf_profile_cleared(self, profile_id, error, epoch): if epoch is None or self.epoch is None: return if error: self.last_profile_error[profile_id] = error if not self.sync_waiting: self.ctx.logger.error('Last profile %s command failed: %s' % (profile_id, error)) # remove profile and template group name self.astf_profile_state.pop(profile_id, None) self.traffic_stats._clear_tg_name(profile_id) if error: return Event('server', 'error', 'Can\'t remove profile %s after error: %s' % (profile_id, error)) else: return Event('server', 'info', 'Removed profile : %s' % profile_id) ############################ helper ############################# ############################ funcs ############################# ############################ ############################# # Check console API ports argument def validate_profile_id_input(self, pid_input = DEFAULT_PROFILE_ID, start = False): valid_pids = [] ok_states = [self.STATE_IDLE, self.STATE_ASTF_LOADED] # check profile ID's type if type(pid_input) is not list: profile_list = pid_input.split() else: profile_list = pid_input if ALL_PROFILE_ID in profile_list: if start == True: raise TRexError("Cannot have %s as a profile value for start command" % ALL_PROFILE_ID) else: self.sync() # return profiles can be operational only for the requests. # STATE_IDLE is operational for 'profile_clear.' return [pid for pid, state in self.astf_profile_state.items() if state is not self.STATE_ASTF_DELETE] for profile_id in profile_list: if profile_id not in list(self.astf_profile_state.keys()): self.sync() break # Check if profile_id is a valid profile name for profile_id in profile_list: if profile_id not in list(self.astf_profile_state.keys()): if start == True: self.astf_profile_state[profile_id] = self.STATE_IDLE else: raise TRexError("ASTF profile_id %s does not exist." % profile_id) if start == True: if self.is_dynamic and self.astf_profile_state.get(profile_id) not in ok_states: raise TRexError("%s state:Transmitting, should be one of following:Idle, Loaded profile" % profile_id) if profile_id not in valid_pids: valid_pids.append(profile_id) return valid_pids def apply_port_states(self): port_state = self.port_states[self.state] for port in self.ports.values(): port.state = port_state return port_state def wait_for_steady(self, profile_id=None): timer = PassiveTimer() while True: state = self._get_profile_state(profile_id) if profile_id else self.state if state not in self.transient_states: break if timer.has_elapsed(0.1): self.sync() else: time.sleep(0.001) def wait_for_profile_state(self, profile_id, wait_state, timeout = None): timer = PassiveTimer(timeout) while self._get_profile_state(profile_id) != wait_state: if timer.has_elapsed(0.1): self.sync() else: time.sleep(0.001) if timer.has_expired(): raise TRexTimeoutError(timeout) def inc_epoch(self): rc = self._transmit('inc_epoch', {'handler': self.handler}) if not rc: raise TRexError(rc.err()) self.sync() def _set_profile_state(self, profile_id, state): self.astf_profile_state[profile_id] = state def _get_profile_state(self, profile_id): return self.astf_profile_state.get(profile_id, self.STATE_IDLE) if self.is_dynamic else self.state def _transmit_async(self, rpc_func, ok_states, bad_states = None, ready_state = None, **k): profile_id = k['params']['profile_id'] ok_states = listify(ok_states) if bad_states is not None: bad_states = listify(bad_states) self.wait_for_steady() if rpc_func == 'start' and self.state is not self.STATE_TX: self.inc_epoch() self.sync_waiting = True try: if ready_state: assert ready_state not in self.transient_states if self._get_profile_state(profile_id) != ready_state: self.wait_for_profile_state(profile_id, ready_state) else: self.wait_for_steady(profile_id) rc = self._transmit(rpc_func, **k) if not rc: return rc timer = PassiveTimer() while True: state = self._get_profile_state(profile_id) if state in ok_states: return RC_OK() # check transient state transition first to avoid wrong decision (e.g. 'start') if ready_state and state in self.transient_states: ready_state = None if self.last_profile_error.get(profile_id) or (not ready_state and bad_states and state in bad_states): error = self.last_profile_error.pop(profile_id, None) general_error = 'Unknown error, state: {}, profile: {}'.format(state, profile_id) return RC_ERR(error or general_error) if timer.has_elapsed(0.2): self.sync() # in case state change lost in async(SUB/PUB) channel else: time.sleep(0.001) finally: self.sync_waiting = False def check_states(self, ok_states): cnt = 0 while True: if self.state in ok_states: break cnt = cnt + 1 if cnt % 10 == 0: self.sync() else: time.sleep(0.1) # 100ms self.sync() # guarantee to update profile states def _is_service_req(self): ''' Return False as service mode check is not required in ASTF ''' return False ############################ ASTF ############################# ############################ API ############################# ############################ ############################# @client_api('command', True)
[docs] def reset(self, restart = False): """ Force acquire ports, stop the traffic, remove loaded traffic and clear stats :parameters: restart: bool Restart the NICs (link down / up) :raises: + :exc:`TRexError` """ ports = self.get_all_ports() if restart: self.ctx.logger.pre_cmd("Hard resetting ports {0}:".format(ports)) else: self.ctx.logger.pre_cmd("Resetting ports {0}:".format(ports)) try: with self.ctx.logger.suppress(): # force take the port and ignore any streams on it self.acquire(force = True) self.stop(False, pid_input=ALL_PROFILE_ID) self.check_states(ok_states=[self.STATE_ASTF_LOADED, self.STATE_IDLE]) self.stop_latency() self.traffic_stats.reset() self.latency_stats.reset() self.clear_profile(False, pid_input=ALL_PROFILE_ID) self.check_states(ok_states=[self.STATE_IDLE]) self.clear_stats(ports, pid_input = ALL_PROFILE_ID) self.set_port_attr(ports, promiscuous = False if self.any_port.is_prom_supported() else None, link_up = True if restart else None) self.remove_rx_queue(ports) self.remove_all_captures() self._for_each_port('stop_capture_port', ports) self.ctx.logger.post_cmd(RC_OK()) except TRexError as e: self.ctx.logger.post_cmd(False) raise
@client_api('command', True)
[docs] def acquire(self, force = False): """ Acquires ports for executing commands :parameters: force : bool Force acquire the ports. :raises: + :exc:`TRexError` """ ports = self.get_all_ports() if force: self.ctx.logger.pre_cmd('Force acquiring ports %s:' % ports) else: self.ctx.logger.pre_cmd('Acquiring ports %s:' % ports) params = {'force': force, 'user': self.ctx.username, 'session_id': self.ctx.session_id} rc = self._transmit('acquire', params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError('Could not acquire context: %s' % rc.err()) self.handler = rc.data()['handler'] for port_id, port_rc in rc.data()['ports'].items(): self.ports[int(port_id)]._set_handler(port_rc) self._post_acquire_common(ports)
@client_api('command', True) def sync(self): self.epoch = None params = {'profile_id': "sync"} rc = self._transmit('sync', params) if not rc: raise TRexError(rc.err()) self.state = rc.data()['state'] self.apply_port_states() if self.is_dynamic: self.astf_profile_state = rc.data()['state_profile'] else: self.astf_profile_state[DEFAULT_PROFILE_ID] = self.state self.epoch = rc.data()['epoch'] return self.astf_profile_state @client_api('command', True)
[docs] def release(self, force = False): """ Release ports :parameters: none :raises: + :exc:`TRexError` """ ports = self.get_acquired_ports() self.ctx.logger.pre_cmd("Releasing ports {0}:".format(ports)) params = {'handler': self.handler} rc = self._transmit('release', params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError('Could not release context: %s' % rc.err()) self.handler = '' for port_id in ports: self.ports[port_id]._clear_handler()
def _upload_fragmented(self, rpc_cmd, upload_string, pid_input = DEFAULT_PROFILE_ID): index_start = 0 fragment_length = 1000 # first fragment is small, we compare hash before sending the rest while len(upload_string) > index_start: index_end = index_start + fragment_length params = { 'handler': self.handler, 'profile_id' : pid_input, 'fragment': upload_string[index_start:index_end], } if index_start == 0: params['frag_first'] = True if index_end >= len(upload_string): params['frag_last'] = True if params.get('frag_first') and not params.get('frag_last'): params['md5'] = hashlib.md5(upload_string.encode()).hexdigest() rc = self._transmit(rpc_cmd, params = params) if not rc: return rc if params.get('frag_first') and not params.get('frag_last'): if rc.data() and rc.data().get('matches_loaded'): break index_start = index_end fragment_length = 500000 # rest of fragments are larger return RC_OK() @client_api('command', True)
[docs] def set_service_mode (self, ports = None, enabled = True, filtered = False, mask = None): ''' based on :meth:`trex.astf.trex_astf_client.ASTFClient.set_service_mode_base` ''' # call the base method self.set_service_mode_base(ports = ports, enabled = enabled, filtered = filtered, mask = mask) # in ASTF send to all ports with the handler of the ctx params = {"handler": self.handler, "enabled": enabled, "filtered": filtered} if filtered: params['mask'] = mask # transmit server once for all the ports rc = self._transmit('service', params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc) else: # sending all ports in order to change their attributes self._for_each_port('set_service_mode', None, enabled, filtered, mask)
@client_api('command', True)
[docs] def load_profile(self, profile, tunables = {}, pid_input = DEFAULT_PROFILE_ID): """ Upload ASTF profile to server :parameters: profile: string or ASTFProfile Path to profile filename or profile object tunables: dict forward those key-value pairs to the profile file pid_input: string Input profile ID :raises: + :exc:`TRexError` """ if not isinstance(profile, ASTFProfile): try: profile = ASTFProfile.load(profile, **tunables) except Exception as e: self.astf_profile_state.pop(pid_input, None) raise TRexError('Could not load profile: %s' % e) #when ".. -t --help", is called then return if profile is None: return profile_json = profile.to_json_str(pretty = False, sort_keys = True) self.ctx.logger.pre_cmd('Loading traffic at acquired ports.') rc = self._upload_fragmented('profile_fragment', profile_json, pid_input = pid_input) if not rc: self.ctx.logger.post_cmd(False) raise TRexError('Could not load profile, error: %s' % rc.err()) self.ctx.logger.post_cmd(True)
@client_api('command', False)
[docs] def get_traffic_distribution(self, start_ip, end_ip, dual_ip, seq_split): ''' Get distribution of IP range per TRex port per core :parameters: start_ip: IP string Related to "ip_range" argument of ASTFIPGenDist end_ip: IP string Related to "ip_range" argument of ASTFIPGenDist dual_ip: IP string Related to "ip_offset" argument of ASTFIPGenGlobal seq_split: bool Related to "per_core_distribution" argument of ASTFIPGenDist, "seq" => seq_split=True ''' if not is_valid_ipv4(start_ip): raise TRexError("start_ip is not a valid IPv4 address: '%s'" % start_ip) if not is_valid_ipv4(end_ip): raise TRexError("end_ip is not a valid IPv4 address: '%s'" % end_ip) if not is_valid_ipv4(dual_ip): raise TRexError("dual_ip is not a valid IPv4 address: '%s'" % dual_ip) params = { 'start_ip': start_ip, 'end_ip': end_ip, 'dual_ip': dual_ip, 'seq_split': seq_split, } rc = self._transmit('get_traffic_dist', params = params) if not rc: raise TRexError(rc.err()) res = {} for port_id, port_data in rc.data().items(): core_dict = {} for core_id, core_data in port_data.items(): core_dict[int(core_id)] = core_data res[int(port_id)] = core_dict return res
@client_api('command', True)
[docs] def clear_profile(self, block = True, pid_input = DEFAULT_PROFILE_ID): """ Clear loaded profile :parameters: pid_input: string Input profile ID :raises: + :exc:`TRexError` """ ok_states = [self.STATE_IDLE, self.STATE_ASTF_LOADED] valid_pids = self.validate_profile_id_input(pid_input) for profile_id in valid_pids: profile_state = self.astf_profile_state.get(profile_id) if profile_state in ok_states: params = { 'handler': self.handler, 'profile_id': profile_id } self.ctx.logger.pre_cmd('Clearing loaded profile.') if block: rc = self._transmit_async('profile_clear', params = params, ok_states = self.STATE_IDLE) else: rc = self._transmit('profile_clear', params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err()) else: self.logger.info(format_text("Cannot remove a profile: %s is not state IDLE and state LOADED.\n" % profile_id, "bold", "magenta"))
@client_api('command', True)
[docs] def start(self, mult = 1, duration = -1, nc = False, block = True, latency_pps = 0, ipv6 = False, pid_input = DEFAULT_PROFILE_ID, client_mask = 0xffffffff): """ Start the traffic on loaded profile. Procedure is async. :parameters: mult: int Multiply total CPS of profile by this value. duration: float Start new flows for this duration. Negative value means infinite nc: bool Do not wait for flows to close at end of duration. block: bool Wait for traffic to be started (operation is async). latency_pps: uint32_t Rate of latency packets. Zero value means disable. ipv6: bool Convert traffic to IPv6. client_mask: uint32_t Bitmask of enabled client ports. pid_input: string Input profile ID :raises: + :exc:`TRexError` """ params = { 'handler': self.handler, 'profile_id': pid_input, 'mult': mult, 'nc': nc, 'duration': duration, 'latency_pps': latency_pps, 'ipv6': ipv6, 'client_mask': client_mask, } self.ctx.logger.pre_cmd('Starting traffic.') valid_pids = self.validate_profile_id_input(pid_input, start = True) for profile_id in valid_pids: if block: rc = self._transmit_async('start', params = params, ok_states = self.STATE_TX, bad_states = self.STATE_ASTF_LOADED, ready_state = self.STATE_ASTF_LOADED) else: rc = self._transmit('start', params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err())
@client_api('command', True)
[docs] def stop(self, block = True, pid_input = DEFAULT_PROFILE_ID, is_remove = False): """ Stop the traffic. :parameters: block: bool Wait for traffic to be stopped (operation is async) Default is True pid_input: string Input profile ID is_remove: bool Remove the profile id Default is False :raises: + :exc:`TRexError` """ valid_pids = self.validate_profile_id_input(pid_input) for profile_id in valid_pids: profile_state = self.astf_profile_state.get(profile_id) # 'stop' will be silently ignored in server-side PARSE/BUILD state. # So, TX state should be forced to avoid unexpected hanging situation. if profile_state in {self.STATE_ASTF_PARSE, self.STATE_ASTF_BUILD}: self.wait_for_profile_state(profile_id, self.STATE_TX) profile_state = self.astf_profile_state.get(profile_id) if profile_state is self.STATE_TX: params = { 'handler': self.handler, 'profile_id': profile_id } self.ctx.logger.pre_cmd('Stopping traffic.') if block or is_remove: rc = self._transmit_async('stop', params = params, ok_states = [self.STATE_IDLE, self.STATE_ASTF_LOADED]) else: rc = self._transmit('stop', params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err()) profile_state = self.astf_profile_state.get(profile_id) if is_remove: if profile_state is self.STATE_ASTF_CLEANUP: self.wait_for_profile_state(profile_id, self.STATE_ASTF_LOADED) self.clear_profile(block = block, pid_input = profile_id)
@client_api('command', True)
[docs] def update(self, mult, pid_input = DEFAULT_PROFILE_ID): """ Update the rate of running traffic. :parameters: mult: int Multiply total CPS of profile by this value (not relative to current running rate) Default is 1 pid_input: string Input profile ID :raises: + :exc:`TRexError` """ valid_pids = self.validate_profile_id_input(pid_input) for profile_id in valid_pids: params = { 'handler': self.handler, 'profile_id': profile_id, 'mult': mult, } self.ctx.logger.pre_cmd('Updating traffic.') rc = self._transmit('update', params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err())
@client_api('command', True)
[docs] def get_profiles(self): """ Get profile list from Server. """ params = { 'handler': self.handler, } self.ctx.logger.pre_cmd('Getting profile list.') rc = self._transmit('get_profile_list', params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err())
@client_api('command', True)
[docs] def wait_on_traffic(self, timeout = None, profile_id = None): """ Block until traffic stops :parameters: timeout: int Timeout in seconds Default is blocking profile_id: string Profile ID :raises: + :exc:`TRexTimeoutError` - in case timeout has expired + :exc:`TRexError` """ if profile_id is None: ports = self.get_all_ports() TRexClient.wait_on_traffic(self, ports, timeout) else: self.wait_for_profile_state(profile_id, self.STATE_ASTF_LOADED, timeout) # get stats
@client_api('getter', True)
[docs] def get_stats(self, ports = None, sync_now = True, skip_zero = True, pid_input = DEFAULT_PROFILE_ID, is_sum = False): """ Gets all statistics on given ports, traffic and latency. :parameters: ports: list sync_now: boolean skip_zero: boolean pid_input: string Input profile ID is_sum: boolean Get total counter values """ stats = self._get_stats_common(ports, sync_now) stats['traffic'] = self.get_traffic_stats(skip_zero, pid_input, is_sum = is_sum) stats['latency'] = self.get_latency_stats(skip_zero) return stats # clear stats
@client_api('getter', True)
[docs] def clear_stats(self, ports = None, clear_global = True, clear_xstats = True, clear_traffic = True, pid_input = DEFAULT_PROFILE_ID): """ Clears statistics in given ports. :parameters: ports: list clear_global: boolean clear_xstats: boolean clear_traffic: boolean pid_input: string Input profile ID """ valid_pids = self.validate_profile_id_input(pid_input) for profile_id in valid_pids: if clear_traffic: self.clear_traffic_stats(profile_id) self.clear_traffic_stats(is_sum = True) return self._clear_stats_common(ports, clear_global, clear_xstats)
@client_api('getter', True)
[docs] def get_tg_names(self, pid_input = DEFAULT_PROFILE_ID): """ Returns a list of the names of all template groups defined in the current profile. :parameters: pid_input: string Input profile ID :raises: + :exc:`TRexError` """ return self.traffic_stats.get_tg_names(pid_input)
@client_api('getter', True)
[docs] def get_traffic_tg_stats(self, tg_names, skip_zero=True, pid_input = DEFAULT_PROFILE_ID): """ Returns the traffic statistics for the template groups specified in tg_names. :parameters: tg_names: list or string Contains the names of the template groups for which we want to get traffic statistics. skip_zero: boolean pid_input: string Input profile ID :raises: + :exc:`TRexError` + :exc:`ASTFErrorBadTG` Can be thrown if tg_names is empty or contains a invalid name. """ validate_type('tg_names', tg_names, (list, basestring)) return self.traffic_stats.get_traffic_tg_stats(tg_names, skip_zero, pid_input = pid_input)
@client_api('getter', True)
[docs] def get_traffic_stats(self, skip_zero = True, pid_input = DEFAULT_PROFILE_ID, is_sum = False): """ Returns aggregated traffic statistics. :parameters: skip_zero: boolean pid_input: string Input profile ID is_sum: boolean Get total counter values """ return self.traffic_stats.get_stats(skip_zero, pid_input = pid_input, is_sum = is_sum)
@client_api('getter', True)
[docs] def get_profiles_state(self): """ Gets an dictionary with the states of all the profiles. :returns: Dictionary containing profiles and their states. Keys are strings, `pid` (profile ID). Each profile can be in one of the following states: ['STATE_IDLE', 'STATE_ASTF_LOADED', 'STATE_ASTF_PARSE', 'STATE_ASTF_BUILD', 'STATE_TX', 'STATE_ASTF_CLEANUP', 'STATE_ASTF_DELETE', 'STATE_UNKNOWN']. """ states = {} for key, value in self.astf_profile_state.items(): states[key] = astf_states[value] if value in range(len(astf_states)) else "STATE_UNKNOWN" return states
@client_api('getter', True)
[docs] def is_traffic_stats_error(self, stats): ''' Return Tuple if there is an error and what is the error (Bool,Errors) :parameters: stats: dict from get_traffic_stats output ''' return self.traffic_stats.is_traffic_stats_error(stats)
@client_api('getter', True)
[docs] def clear_traffic_stats(self, pid_input = DEFAULT_PROFILE_ID, is_sum = False): """ Clears traffic statistics. :parameters: pid_input: string Input profile ID """ return self.traffic_stats.clear_stats(pid_input, is_sum)
@client_api('getter', True)
[docs] def get_latency_stats(self,skip_zero =True): """ Gets latency statistics. :parameters: skip_zero: boolean """ return self.latency_stats.get_stats(skip_zero)
@client_api('command', True)
[docs] def start_latency(self, mult = 1, src_ipv4="16.0.0.1", dst_ipv4="48.0.0.1", ports_mask=0x7fffffff, dual_ipv4 = "1.0.0.0"): ''' Start ICMP latency traffic. :parameters: mult: float number of packets per second src_ipv4: IP string IPv4 source address for the port dst_ipv4: IP string IPv4 destination address ports_mask: uint32_t bitmask of ports dual_ipv4: IP string IPv4 address to be added for each pair of ports (starting from second pair) .. note:: VLAN will be taken from interface configuration :raises: + :exc:`TRexError` ''' if not is_valid_ipv4(src_ipv4): raise TRexError("src_ipv4 is not a valid IPv4 address: '{0}'".format(src_ipv4)) if not is_valid_ipv4(dst_ipv4): raise TRexError("dst_ipv4 is not a valid IPv4 address: '{0}'".format(dst_ipv4)) if not is_valid_ipv4(dual_ipv4): raise TRexError("dual_ipv4 is not a valid IPv4 address: '{0}'".format(dual_ipv4)) params = { 'handler': self.handler, 'mult': mult, 'src_addr': src_ipv4, 'dst_addr': dst_ipv4, 'dual_port_addr': dual_ipv4, 'mask': ports_mask, } self.ctx.logger.pre_cmd('Starting latency traffic.') rc = self._transmit("start_latency", params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err())
@client_api('command', True)
[docs] def stop_latency(self): ''' Stop latency traffic. ''' params = { 'handler': self.handler } self.ctx.logger.pre_cmd('Stopping latency traffic.') rc = self._transmit("stop_latency", params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err())
@client_api('command', True)
[docs] def update_latency(self, mult = 1): ''' Update rate of latency traffic. :parameters: mult: float number of packets per second :raises: + :exc:`TRexError` ''' params = { 'handler': self.handler, 'mult': mult, } self.ctx.logger.pre_cmd('Updating latency rate.') rc = self._transmit("update_latency", params = params) self.ctx.logger.post_cmd(rc) if not rc: raise TRexError(rc.err())
@client_api('command', True)
[docs] def topo_load(self, topology, tunables = {}): ''' Load network topology :parameters: topology: string or ASTFTopology | Path to topology filename or topology object | Supported file formats: | * JSON | * YAML | * Python tunables: dict forward those key-value pairs to the topology Python file :raises: + :exc:`TRexError` ''' self.topo_mngr.load(topology, **tunables) print('')
@client_api('command', True)
[docs] def topo_clear(self): ''' Clear network topology ''' self.topo_mngr.clear()
@client_api('command', True)
[docs] def topo_resolve(self, ports = None): ''' Resolve current network topology. On success, upload to server ''' self.topo_mngr.resolve(ports)
@client_api('command', False)
[docs] def topo_show(self, ports = None): ''' Show current network topology status ''' self.topo_mngr.show(ports) print('')
@client_api('command', False)
[docs] def topo_save(self, filename): ''' Save current topology to file :parameters: filename: string | Path to topology filename, supported formats: | * JSON | * YAML | * Python ''' if os.path.exists(filename): if os.path.islink(filename) or not os.path.isfile(filename): raise TRexError("Given path exists and it's not a file!") sys.stdout.write('\nFilename %s already exists, overwrite? (y/N) ' % filename) ans = user_input().strip() if ans.lower() not in ('y', 'yes'): print('Not saving.') return try: if filename.endswith('.json'): self.ctx.logger.pre_cmd('Saving topology to JSON: %s' % filename) code = self.topo_mngr.to_json(False) elif filename.endswith('.yaml'): self.ctx.logger.pre_cmd('Saving topology to YAML: %s' % filename) code = self.topo_mngr.to_yaml() elif filename.endswith('.py'): self.ctx.logger.pre_cmd('Saving topology to Python script: %s' % filename) code = self.topo_mngr.to_code() else: self.ctx.logger.error('Saved filename should be .py or .json or .yaml') return with open(filename, 'w') as f: f.write(code) except Exception as e: self.ctx.logger.post_cmd(False) raise TRexError('Saving file failed: %s' % e) self.ctx.logger.post_cmd(True) # private function to form json data for GTP tunnel
def _update_gtp_tunnel(self, client_list): json_attr = [] for key, value in client_list.items(): json_attr.append({'client_ip' : key, 'sip': value.sip, 'dip' : value.dip, 'teid' : value.teid, "version" :value.version}) return json_attr # execute 'method' for inserting/updateing tunnel info for clients def update_tunnel_client_record (self, client_list, tunnel_type): json_attr = [] if tunnel_type == TunnelType.GTP: json_attr = self._update_gtp_tunnel(client_list) else: raise TRexError('Invalid Tunnel Type: %d' % tunnel_type) params = {"tunnel_type": tunnel_type, "attr": json_attr } return self._transmit("update_tunnel_client", params) # execute 'method' for Making a client active/inactive
[docs] def set_client_enable(self, client_list, is_enable): ''' Version: 1 API to toggle state of client Input: List of clients and Action : state flag ''' json_attr = [] for key in client_list: json_attr.append({'client_ip' : key}) params = {"is_enable": is_enable, "is_range": False, "attr": json_attr } return self._transmit("enable_disable_client", params) # execute 'method' for Making a client active/inactive
[docs] def set_client_enable_range(self, client_start, client_end, is_enable): ''' Version: 2 API to toggle state of client Input: Client range and Action : state flag ''' json_attr = [] json_attr.append({'client_start_ip' : client_start, 'client_end_ip' : client_end}) params = {"is_enable": is_enable, "is_range": True, "attr": json_attr } return self._transmit("enable_disable_client", params) # execute 'method' for getting clients stats
[docs] def get_clients_info (self, client_list): ''' Version 1 API to get client information: Currently only state and if client is present. Input: List of clients ''' json_attr = [] for key in client_list: json_attr.append({'client_ip' : key}) params = {"is_range": False, "attr": json_attr } return self._transmit("get_clients_info", params) # execute 'method' for getting clients stats
[docs] def get_clients_info_range (self, client_start, client_end): ''' Version 2 API to get client information: Currently only state and if client is present. Input: Client range ''' json_attr = [] json_attr.append({'client_start_ip' : client_start, 'client_end_ip' : client_end}) params = {"is_range": True, "attr": json_attr } return self._transmit("get_clients_info", params) ############################ console ############################# ############################ commands ############################# ############################ #############################
@console_api('acquire', 'common', True)
[docs] def acquire_line (self, line): '''Acquire ports\n''' # define a parser parser = parsing_opts.gen_parser( self, 'acquire', self.acquire_line.__doc__, parsing_opts.FORCE) opts = parser.parse_args(shlex.split(line)) self.acquire(force = opts.force) return True
@console_api('reset', 'common', True)
[docs] def reset_line(self, line): '''Reset ports''' parser = parsing_opts.gen_parser( self, 'reset', self.reset_line.__doc__, parsing_opts.PORT_RESTART ) opts = parser.parse_args(shlex.split(line)) self.reset(restart = opts.restart) return True
@console_api('start', 'ASTF', True)
[docs] def start_line(self, line): '''Start traffic command''' # parse tunables with the previous form. (-t var1=x1,var2=x2..) def parse_tunables_old_version(tunables_parameters): parser = parsing_opts.gen_parser(self, "start", self.start_line.__doc__, parsing_opts.TUNABLES) args = parser.parse_args(tunables_parameters.split()) return args.tunables # parser for parsing the start command arguments parser = parsing_opts.gen_parser(self, 'start', self.start_line.__doc__, parsing_opts.FILE_PATH, parsing_opts.MULTIPLIER_NUM, parsing_opts.DURATION, parsing_opts.ARGPARSE_TUNABLES, parsing_opts.ASTF_NC, parsing_opts.ASTF_LATENCY, parsing_opts.ASTF_IPV6, parsing_opts.ASTF_CLIENT_CTRL, parsing_opts.ASTF_PROFILE_LIST ) opts = parser.parse_args(shlex.split(line)) help_flags = ('-h', '--help') # if the user chose to pass the tunables arguments in previous version (-t var1=x1,var2=x2..) # we decode the tunables and then convert the output from dictionary to list in order to have the same format with the # newer version. tunable_dict = {} if "-t" in line and '=' in line: tunable_parameter = "-t " + line.split("-t")[1].strip("-h").strip("--help").strip() tunable_dict = parse_tunables_old_version(tunable_parameter) tunable_list = [] # converting from tunables dictionary to list for tunable_key in tunable_dict: tunable_list.extend(["--{}".format(tunable_key), str(tunable_dict[tunable_key])]) if any(h in opts.tunables for h in help_flags): tunable_list.append("--help") opts.tunables = tunable_list tunable_dict["tunables"] = opts.tunables valid_pids = self.validate_profile_id_input(opts.profiles, start = True) for profile_id in valid_pids: self.load_profile(opts.file[0], tunable_dict, pid_input = profile_id) #when ".. -t --help", is called the help message is being printed once and then it returns to the console if any(h in opts.tunables for h in help_flags): break kw = {} if opts.clients: for client in opts.clients: if client not in self.ports: raise TRexError('Invalid client interface: %d' % client) if client & 1: raise TRexError('Following interface is not client: %d' % client) kw['client_mask'] = self._calc_port_mask(opts.clients) elif opts.servers_only: kw['client_mask'] = 0 self.start(opts.mult, opts.duration, opts.nc, False, opts.latency_pps, opts.ipv6, pid_input = profile_id, **kw) return True
@console_api('stop', 'ASTF', True)
[docs] def stop_line(self, line): '''Stop traffic command''' parser = parsing_opts.gen_parser( self, 'stop', self.stop_line.__doc__, parsing_opts.ASTF_PROFILE_DEFAULT_LIST, parsing_opts.REMOVE ) opts = parser.parse_args(shlex.split(line)) self.stop(False, pid_input = opts.profiles, is_remove = opts.remove)
@console_api('update', 'ASTF', True)
[docs] def update_line(self, line): '''Update traffic multiplier''' parser = parsing_opts.gen_parser( self, 'update', self.update_line.__doc__, parsing_opts.MULTIPLIER_NUM, parsing_opts.ASTF_PROFILE_DEFAULT_LIST ) opts = parser.parse_args(shlex.split(line)) self.update(opts.mult, pid_input = opts.profiles)
@console_api('service', 'ASTF', True)
[docs] def service_line (self, line): '''Configures port for service mode. In service mode ports will reply to ARP, PING and etc. In ASTF, command will apply on all ports. ''' parser = parsing_opts.gen_parser(self, "service", self.service_line.__doc__, parsing_opts.SERVICE_GROUP) opts = parser.parse_args(line.split()) enabled, filtered, mask = self._get_service_params(opts) if mask is not None and ((mask & NO_TCP_UDP_MASK) == 0): raise TRexError('Cannot set NO_TCP_UDP off in ASTF!') self.set_service_mode(enabled = enabled, filtered = filtered, mask = mask) return True
@staticmethod def _calc_port_mask(ports): mask =0 for p in ports: mask += (1<<p) return mask @console_api('latency', 'ASTF', True)
[docs] def latency_line(self, line): '''Latency-related commands''' parser = parsing_opts.gen_parser( self, 'latency', self.latency_line.__doc__) def latency_add_parsers(subparsers, cmd, help = '', **k): return subparsers.add_parser(cmd, description = help, help = help, **k) subparsers = parser.add_subparsers(title = 'commands', dest = 'command', metavar = '') start_parser = latency_add_parsers(subparsers, 'start', help = 'Start latency traffic') latency_add_parsers(subparsers, 'stop', help = 'Stop latency traffic') update_parser = latency_add_parsers(subparsers, 'update', help = 'Update rate of running latency') latency_add_parsers(subparsers, 'show', help = 'alias for stats -l') latency_add_parsers(subparsers, 'hist', help = 'alias for stats --lh') latency_add_parsers(subparsers, 'counters', help = 'alias for stats --lc') start_parser.add_arg_list( parsing_opts.MULTIPLIER_NUM, parsing_opts.SRC_IPV4, parsing_opts.DST_IPV4, parsing_opts.PORT_LIST, parsing_opts.DUAL_IPV4 ) update_parser.add_arg_list( parsing_opts.MULTIPLIER_NUM, ) opts = parser.parse_args(shlex.split(line)) if opts.command == 'start': ports_mask = self._calc_port_mask(opts.ports) self.start_latency(opts.mult, opts.src_ipv4, opts.dst_ipv4, ports_mask, opts.dual_ip) elif opts.command == 'stop': self.stop_latency() elif opts.command == 'update': self.update_latency(mult = opts.mult) elif opts.command == 'show' or not opts.command: self._show_latency_stats() elif opts.command == 'hist': self._show_latency_histogram() elif opts.command == 'counters': self._show_latency_counters() else: raise TRexError('Unhandled command %s' % opts.command) return True
@console_api('topo', 'ASTF', True, True)
[docs] def topo_line(self, line): '''Topology-related commands''' parser = parsing_opts.gen_parser( self, 'topo', self.topo_line.__doc__) def topology_add_parsers(subparsers, cmd, help = '', **k): return subparsers.add_parser(cmd, description = help, help = help, **k) subparsers = parser.add_subparsers(title = 'commands', dest = 'command', metavar = '') load_parser = topology_add_parsers(subparsers, 'load', help = 'Load topology from file') reso_parser = topology_add_parsers(subparsers, 'resolve', help = 'Resolve loaded topology, push to server on success') show_parser = topology_add_parsers(subparsers, 'show', help = 'Show current topology status') topology_add_parsers(subparsers, 'clear', help = 'Clear current topology') save_parser = topology_add_parsers(subparsers, 'save', help = 'Save topology to file') load_parser.add_arg_list( parsing_opts.FILE_PATH, parsing_opts.TUNABLES, ) reso_parser.add_arg_list( parsing_opts.PORT_LIST_NO_DEFAULT, ) show_parser.add_arg_list( parsing_opts.PORT_LIST_NO_DEFAULT, ) save_parser.add_arg_list( parsing_opts.FILE_PATH_NO_CHECK, ) opts = parser.parse_args(shlex.split(line)) if opts.command == 'load': self.topo_load(opts.file[0], opts.tunables) return False elif opts.command == 'resolve': self.topo_resolve(opts.ports_no_default) elif opts.command == 'show' or not opts.command: self.topo_show(opts.ports_no_default) return False elif opts.command == 'clear': self.topo_clear() elif opts.command == 'save': self.topo_save(opts.file[0]) else: raise TRexError('Unhandled command %s' % opts.command) return True
@console_api('clear', 'common', False)
[docs] def clear_stats_line (self, line): '''Clear cached local statistics\n''' # define a parser parser = parsing_opts.gen_parser(self, "clear", self.clear_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL) opts = parser.parse_args(line.split()) self.clear_stats(opts.ports, pid_input = ALL_PROFILE_ID) return RC_OK()
@console_api('stats', 'common', True)
[docs] def show_stats_line (self, line): '''Show various statistics\n''' # define a parser parser = parsing_opts.gen_parser( self, 'stats', self.show_stats_line.__doc__, parsing_opts.PORT_LIST, parsing_opts.ASTF_STATS_GROUP, parsing_opts.ASTF_PROFILE_STATS) astf_profiles_state = self.get_profiles_state() valid_pids = list(astf_profiles_state.keys()) opts = parser.parse_args(shlex.split(line)) if not opts: return # without parameters show only global and ports if not opts.stats: self._show_global_stats() self._show_port_stats(opts.ports) return if self.is_dynamic == True and opts.pfname == None: is_sum = True valid_pids = self.validate_profile_id_input(pid_input = DEFAULT_PROFILE_ID) else: is_sum = False valid_pids = self.validate_profile_id_input(pid_input = opts.pfname) # decode which stats to show if opts.stats == 'global': self._show_global_stats() elif opts.stats == 'ports': self._show_port_stats(opts.ports) elif opts.stats == 'xstats': self._show_port_xstats(opts.ports, False) elif opts.stats == 'xstats_inc_zero': self._show_port_xstats(opts.ports, True) elif opts.stats == 'status': self._show_port_status(opts.ports) elif opts.stats == 'cpu': self._show_cpu_util() elif opts.stats == 'mbuf': self._show_mbuf_util() elif opts.stats == 'astf': for profile_id in valid_pids: self._show_traffic_stats(False, pid_input = profile_id, is_sum = is_sum) elif opts.stats == 'astf_inc_zero': for profile_id in valid_pids: self._show_traffic_stats(True, pid_input = profile_id, is_sum = is_sum) elif opts.stats == 'latency': self._show_latency_stats() elif opts.stats == 'latency_histogram': self._show_latency_histogram() elif opts.stats == 'latency_counters': self._show_latency_counters() else: raise TRexError('Unhandled stat: %s' % opts.stats)
@console_api('template_group', 'ASTF', True)
[docs] def template_group_line(self, line): "Template group commands" parser = parsing_opts.gen_parser( self, 'template_group', self.template_group_line.__doc__ ) def template_group_add_parsers(subparsers, cmd, help = '', **k): return subparsers.add_parser(cmd, description = help, help = help, **k) subparsers = parser.add_subparsers(title = 'commands', dest = 'command', metavar = '') names_parser = template_group_add_parsers(subparsers, 'names', help = 'Get template group names') stats_parser = template_group_add_parsers(subparsers, 'stats', help = 'Get stats for template group') names_parser.add_arg_list(parsing_opts.TG_NAME_START) names_parser.add_arg_list(parsing_opts.TG_NAME_AMOUNT) names_parser.add_arg_list(parsing_opts.ASTF_PROFILE_LIST) stats_parser.add_arg_list(parsing_opts.TG_STATS) stats_parser.add_arg_list(parsing_opts.ASTF_PROFILE_LIST) opts = parser.parse_args(shlex.split(line)) if not opts or not opts.command: parser.print_help() return pid_input = opts.profiles valid_pids = self.validate_profile_id_input(pid_input) for profile_id in valid_pids: if opts.command == 'names': print(format_text("Profile ID: %s" % profile_id, 'bold')) self.traffic_stats._show_tg_names(start=opts.start, amount=opts.amount, pid_input = profile_id) elif opts.command == 'stats': try: self.get_tg_names(profile_id) tgid = self.traffic_stats._translate_names_to_ids(opts.name, pid_input = profile_id) self._show_traffic_stats(include_zero_lines=False, tgid = tgid[0], pid_input = profile_id) except ASTFErrorBadTG: print(format_text("Template group name %s doesn't exist!" % opts.name, 'bold')) else: raise TRexError('Unhandled command: %s' % opts.command)
def _get_num_of_tgids(self, pid_input = DEFAULT_PROFILE_ID): return self.traffic_stats._get_num_of_tgids(pid_input) def _show_traffic_stats(self, include_zero_lines, buffer = sys.stdout, tgid = 0, pid_input = DEFAULT_PROFILE_ID, is_sum = False): table = self.traffic_stats.to_table(include_zero_lines, tgid, pid_input, is_sum = is_sum) text_tables.print_table_with_header(table, untouched_header = table.title, buffer = buffer) def _show_latency_stats(self, buffer = sys.stdout): table = self.latency_stats.to_table_main() text_tables.print_table_with_header(table, untouched_header = table.title, buffer = buffer) def _show_latency_histogram(self, buffer = sys.stdout): table = self.latency_stats.histogram_to_table() text_tables.print_table_with_header(table, untouched_header = table.title, buffer = buffer) def _show_latency_counters(self, buffer = sys.stdout): table = self.latency_stats.counters_to_table() text_tables.print_table_with_header(table, untouched_header = table.title, buffer = buffer) def _show_profiles_states(self, buffer = sys.stdout): table = text_tables.TRexTextTable() table.set_cols_align(["c"] + ["c"]) table.set_cols_width([20] + [20]) table.header(["ID", "State"]) self.sync() profiles_state = sorted(self.get_profiles_state().items()) for profile_id, state in profiles_state: table.add_row([ profile_id, state ]) return table @console_api('profiles', 'ASTF', True, True)
[docs] def profiles_line(self, line): '''Get loaded to profiles information''' parser = parsing_opts.gen_parser(self, "profiles", self.profiles_line.__doc__) opts = parser.parse_args(line.split()) if not opts: return opts table = self._show_profiles_states() if not table: self.logger.info(format_text("No profiles found with desired filter.\n", "bold", "magenta")) text_tables.print_table_with_header(table, header = 'Profile states')