from contextlib import contextmanager
import traceback
import re
import os
import signal
import inspect
import sys
import time
import base64
import random
import struct
from collections import OrderedDict, defaultdict
from ..utils.common import *
from ..utils import parsing_opts, text_tables
from ..utils.text_opts import format_text, format_num
from ..utils.text_tables import TRexTextTable
from .trex_events import Event
from .trex_ctx import TRexCtx
from .trex_conn import Connection
from .trex_ns import NSCmds,NSCmd,NSCmdResult
from .trex_logger import ScreenLogger
from .trex_types import *
from .trex_types import PortProfileID
from .trex_exceptions import *
from .trex_psv import *
from .trex_vlan import VLAN
from .trex_api_annotators import client_api, console_api
from ..astf.arg_verify import ArgVerify
from .stats.trex_stats import StatsBatch
from .stats.trex_global_stats import GlobalStats, UtilStats
from .stats.trex_port_stats import PortStatsSum
from .services.trex_service_int import ServiceCtx
from .services.trex_service_icmp import ServiceICMP
from .services.trex_service_arp import ServiceARP
from .services.trex_service_ipv6 import ServiceICMPv6, ServiceIPv6Scan
from .stats.trex_ns import CNsStats
from scapy.layers.l2 import Ether, Packet
from scapy.layers.inet import IP, UDP
from scapy.utils import RawPcapWriter
import pprint
## Filtered Service Mode Mask ##
## IMPORTANT - UPDATE ALSO in set_service_mode() docstring! ##
# Every non-zero mask below occupies a bit of its own; ALL_MASK sets the low 8 bits.
NO_MASK         = 0x00
NO_TCP_UDP_MASK = 1 << 0
BGP_MASK        = 1 << 1
DHCP_MASK       = 1 << 2
TRANSPORT_MASK  = 1 << 3
ALL_MASK        = 0xFF # all bits are on
# imarom: move me to someplace appropriate
class PacketBuffer:
    '''
    Holder for one packet that is handed to push_packets.

    :parameters:
        buffer : bytes or Scapy packet
            Packet to send. A Scapy packet is converted to bytes,
            anything else must already be of type bytes.

        port_src : bool
            Override src MAC with TRex port src

        port_dst : bool
            Override dst MAC with TRex port dst
    '''
    def __init__(self, buffer, port_src = False, port_dst = False):
        if not isinstance(buffer, Packet):
            # raw input - must already be bytes
            validate_type('buffer', buffer, bytes)
            raw = buffer
        else:
            # Scapy packet - serialize it
            raw = bytes(buffer)

        self.buffer   = raw
        self.port_src = port_src
        self.port_dst = port_dst
class TRexClient(object):
"""
TRex abstract client
contains common code between STLClient, ASTFClient and etc.
"""
def __init__(self,
             api_ver = None,
             username = get_current_user(),
             server = "localhost",
             sync_port = 4501,
             async_port = 4500,
             verbose_level = "error",
             logger = None,
             sync_timeout = None,
             async_timeout = None):
    """
        Build the client state shared by all client flavors.

        :parameters:
            api_ver, username, server, sync_port, async_port,
            sync_timeout, async_timeout
                passed as-is to the TRex context object

            verbose_level : str
                verbose level applied to the logger

            logger
                logger object to use, a ScreenLogger is created when None
    """

    # fall back to the screen logger when the caller did not supply one
    if logger is None:
        logger = ScreenLogger()
    logger.set_verbose(verbose_level)

    # the context bundles the connection parameters and the logger
    self.ctx = TRexCtx(api_ver, username, server, sync_port, async_port,
                       logger, sync_timeout, async_timeout)

    # port ID -> port object
    self.ports = OrderedDict()

    # hook the async event handlers
    self._register_events()

    # stats objects common to STL and ASTF
    self.global_stats = GlobalStats(self)
    self.util_stats   = UtilStats()

    # connection state object
    self.conn = Connection(self.ctx)

    # port state checker
    self.psv = PortStateValidator(self)

    # updated on connect: server version check for dynamic port addition
    self.is_dynamic = False
def get_mode (self):
    """
        Report the mode/type of this client.

        Abstract here - deriving clients are expected to override it.

        :raises:
            NotImplementedError
    """
    raise NotImplementedError
############################ abstract #############################
############################ functions #############################
############################ #############################
def _on_connect (self):
"""
for deriving objects
actions performed when connecting to the server
"""
raise NotImplementedError
############################ events #############################
############################ #############################
############################ #############################
# register all common events
def _register_events (self):
# server stopped event
self.ctx.event_handler.register_event_handler("server stopped", self._on_server_stopped)
# subscriber thread events
self.ctx.event_handler.register_event_handler("subscriber timeout", self._on_subscriber_timeout)
self.ctx.event_handler.register_event_handler("subscriber resumed", self._on_subscriber_resumed)
self.ctx.event_handler.register_event_handler("subscriber crashed", self._on_subscriber_crashed)
# port related events
self.ctx.event_handler.register_event_handler("port started", self._on_port_started)
self.ctx.event_handler.register_event_handler("port stopped", self._on_port_stopped)
self.ctx.event_handler.register_event_handler("port paused", self._on_port_paused)
self.ctx.event_handler.register_event_handler("port resumed", self._on_port_resumed)
self.ctx.event_handler.register_event_handler("port job done", self._on_port_job_done)
self.ctx.event_handler.register_event_handler("port acquired", self._on_port_acquired)
self.ctx.event_handler.register_event_handler("port released", self._on_port_released)
self.ctx.event_handler.register_event_handler("port error", self._on_port_error)
self.ctx.event_handler.register_event_handler("port attr chg", self._on_port_attr_chg)
self.ctx.event_handler.register_event_handler("astf state changed", self._on_astf_state_chg)
self.ctx.event_handler.register_event_handler("astf profile state changed", self._on_astf_profile_state_chg)
self.ctx.event_handler.register_event_handler("astf profile cleared", self._on_astf_profile_cleared)
self.ctx.event_handler.register_event_handler("global stats update", lambda *args, **kwargs: None)
# executed when server has stopped
def _on_server_stopped (self, cause):
    """
        Executed when the server has stopped: marks the connection
        for disconnect, logs the event as critical and returns it.
    """
    text = "Server has been shutdown - cause: '{0}'".format(cause)
    self.conn.mark_for_disconnect(cause)

    event = Event('server', 'warning', text)
    self.ctx.logger.critical(event)
    return event
# executed when the subscriber thread has timed-out
def _on_subscriber_timeout (self, timeout_sec):
    """
        Executed when the subscriber thread has timed-out.

        Nothing is done (and None is returned) when the client is
        not connected. Otherwise the connection is marked for
        disconnect and the logged event is returned.
    """
    if not self.conn.is_connected():
        return None

    text = 'Connection lost - Subscriber timeout: no data from TRex server for more than {0} seconds'.format(timeout_sec)

    # we cannot simply disconnect the connection - we mark it for disconnection
    # later on, the main thread will execute an ordered disconnection
    self.conn.mark_for_disconnect(text)

    event = Event('local', 'warning', text)
    self.ctx.logger.critical(event)
    return event
# executed when the subscriber thread has resumed (after timeout)
def _on_subscriber_resumed (self):
pass
# executed when the subscriber thread has crashed
def _on_subscriber_crashed (self, e):
    """
        Executed when the subscriber thread has crashed.

        Logs the current traceback as a critical event and then sends
        SIGTERM to this process. The 'e' argument is not used - the
        message is built from traceback.format_exc().
    """
    text = 'subscriber thread has crashed:\n\n{0}'.format(traceback.format_exc())

    # an active connection is only marked - not torn down from here
    connected = self.conn.is_connected()
    if connected:
        self.conn.mark_for_disconnect(text)

    event = Event('local', 'warning', text)
    log = self.ctx.logger
    log.critical(event)

    # kill the process
    log.critical('Terminating due to subscriber crash')
    os.kill(os.getpid(), signal.SIGTERM)

    return event
def _on_port_started (self, port_id):
    """
        Async event: a port has started.
        Notifies the port object (when known) and returns an info event.
    """
    text = "Port {0} has started".format(port_id)

    known = port_id in self.ports
    if known:
        self.ports[port_id].async_event_port_started()

    return Event('server', 'info', text)
def _on_port_stopped (self, port_id):
    """
        Async event: a port has stopped.
        Notifies the port object (when known) and returns an info event.
    """
    text = "Port {0} has stopped".format(port_id)

    known = port_id in self.ports
    if known:
        self.ports[port_id].async_event_port_stopped()

    return Event('server', 'info', text)
def _on_port_paused (self, port_id):
    """
        Async event: a port has paused.
        Notifies the port object (when known) and returns an info event.
    """
    text = "Port {0} has paused".format(port_id)

    known = port_id in self.ports
    if known:
        self.ports[port_id].async_event_port_paused()

    return Event('server', 'info', text)
def _on_port_resumed (self, port_id):
    """
        Async event: a port has resumed.
        Notifies the port object (when known) and returns an info event.
    """
    text = "Port {0} has resumed".format(port_id)

    known = port_id in self.ports
    if known:
        self.ports[port_id].async_event_port_resumed()

    return Event('server', 'info', text)
def _on_port_job_done (self, port_id):
    """
        Async event: the job of a port is done.

        Notifies the port object (when known). The event is logged
        only when the port is acquired by this client, and is
        returned in any case.
    """
    text = "Port {0} job done".format(port_id)

    if port_id in self.ports:
        self.ports[port_id].async_event_port_job_done()

    event = Event('server', 'info', text)

    # only our own ports are worth a log line
    ours = port_id in self.get_acquired_ports()
    if ours:
        self.ctx.logger.info(event)

    return event
# on port acquired event
def _on_port_acquired (self, port_id, who, session_id, force):
    """
        Async event: a port was acquired.

        The event is a warning (and is logged) when a port held by
        this client was taken by another session, info otherwise.
        The port object is notified only when the acquiring session
        is not this one.
    """
    # if we hold the port and it was not taken by this session - show it
    taken_from_us = port_id in self.get_acquired_ports() and session_id != self.ctx.session_id
    ev_type = 'warning' if taken_from_us else 'info'

    # format the thief/us...
    if session_id == self.ctx.session_id:
        user = 'you'
    elif who == self.ctx.username:
        user = 'another session of you'
    else:
        user = "'{0}'".format(who)

    template = "Port {0} was forcely taken by {1}" if force else "Port {0} was taken by {1}"
    text = template.format(port_id, user)

    # call the handler in case its not this session
    if (session_id != self.ctx.session_id) and (port_id in self.ports):
        self.ports[port_id].async_event_port_acquired(who)

    event = Event('server', ev_type, text)
    if ev_type == 'warning':
        self.ctx.logger.warning(event)

    return event
# on port release event
def _on_port_released (self, port_id, who, session_id):
    """
        Async event: a port was released.

        The port object is notified only when the releasing session
        is not this one. Returns an info event.
    """
    own_session = (session_id == self.ctx.session_id)

    if own_session:
        user = 'you'
    elif who == self.ctx.username:
        user = 'another session of you'
    else:
        user = "'{0}'".format(who)

    text = "Port {0} was released by {1}".format(port_id, user)

    # call the handler in case its not this session
    if (session_id != self.ctx.session_id) and (port_id in self.ports):
        self.ports[port_id].async_event_port_released()

    return Event('server', 'info', text)
def _on_port_error (self, port_id):
    """
        Async event: the job of a port has failed.
        Returns a warning event, the port object is not touched.
    """
    return Event('server', 'warning', "port {0} job failed".format(port_id))
def _on_port_attr_chg (self, port_id, attr):
    """
        Async event: attributes of a port have changed.

        :return:
            None for an unknown port or when the port reports no
            difference, otherwise an info event listing every changed
            attribute as 'old -> new' (string values lower-cased).
    """
    if port_id not in self.ports:
        return None

    diff = self.ports[port_id].async_event_port_attr_changed(attr)
    if not diff:
        return None

    def _show(value):
        # only plain strings are lower-cased, other values are shown as-is
        return value.lower() if type(value) is str else value

    pieces = ["port {0} attributes changed".format(port_id)]
    for key, (old_val, new_val) in diff.items():
        pieces.append('\n {key}: {old} -> {new}'.format(
            key = key,
            old = _show(old_val),
            new = _show(new_val)))

    return Event('server', 'info', ''.join(pieces))
def _on_astf_state_chg(self, ctx_state, error, epoch):
raise NotImplementedError()
def _on_astf_profile_state_chg(self, profile_id, ctx_state, error, epoch):
raise NotImplementedError()
def _on_astf_profile_cleared(self, profile_id, error, epoch):
raise NotImplementedError()
############################ private #############################
############################ functions #############################
############################ #############################
# transmit request on the RPC link
def _transmit(self, method_name, params = None):
return self.conn.rpc.transmit(method_name, params)
# execute 'method' for a port list
def _for_each_port (self, method, port_list = None, *args, **kwargs):
    """
        Execute 'method' of the port object for every port in the list
        and aggregate the results.

        :parameters:
            method : str
                name of the port object method to call

            port_list : single port or list of ports
                ports to work on, None means all ports.
                A PortProfileID entry adds its profile ID to the call
                as the 'profile_id' keyword.

            args / kwargs
                passed to every call. The special keyword 'pargs' is a
                dict of port ID -> keyword dict holding port specific
                arguments (common kwargs win on a key collision).

        :return:
            RC object aggregating the result of each call
    """
    # port specific params - never forwarded to the port method itself
    pargs = kwargs.pop('pargs', None) or {}

    # none means all, a single port becomes a list of one
    if port_list is None:
        port_list = self.get_all_ports()
    else:
        port_list = listify(port_list)

    rc = RC()

    for port in port_list:
        port_id    = int(port)
        profile_id = port.profile_id if isinstance(port, PortProfileID) else None

        # build a fresh keyword dict per port: updating the shared 'kwargs'
        # (or the caller's 'pargs' entry) in place would leak the
        # 'profile_id' of one port into the calls of the following ports
        pkwargs = dict(pargs.get(port_id, {}))
        pkwargs.update(kwargs)

        if profile_id:
            pkwargs['profile_id'] = profile_id

        func = getattr(self.ports[port_id], method)
        rc.add(func(*args, **pkwargs))

    return rc
# connect to server
def _connect(self):
    """
        Connect to the server and bring the client to a synced state.

        Steps, each aborting with its failing RC: open the connection,
        cache the server version / system info / supported commands,
        create the ports, distribute the xstats names, sync the ports,
        clear the stats and finally run the deriving object hook.
        Async events are rejected until the stats were cleared.

        :return:
            RC_OK() on success, the failing RC otherwise
    """
    # before we connect to the server - reject any async updates until fully init
    self.ctx.event_handler.disable()

    # connect to the server
    conn_rc = self.conn.connect()
    if not conn_rc:
        return conn_rc

    # version
    version_rc = self._transmit("get_version")
    if not version_rc:
        return version_rc
    self.ctx.server_version = version_rc.data()

    # cache system info
    sysinfo_rc = self._transmit("get_system_info")
    if not sysinfo_rc:
        return sysinfo_rc
    self.ctx.system_info = sysinfo_rc.data()

    # cache supported commands
    cmds_rc = self._transmit("get_supported_cmds")
    if not cmds_rc:
        return cmds_rc

    # server version check for dynamic port addition
    self.is_dynamic     = 'get_profile_list' in cmds_rc.data()
    self.supported_cmds = sorted(cmds_rc.data())

    # STX specific code: create ports
    ports_rc = self._on_connect_create_ports(self.ctx.system_info)
    if not ports_rc:
        return ports_rc

    # all ports on a given TRex server instance have the same list of xstats
    names = self.any_port.xstats.get_names(self.any_port)
    for port_obj in self.ports.values():
        port_obj.xstats.set_names(names)

    # sync the ports
    sync_rc = self._for_each_port('sync')
    if not sync_rc:
        return sync_rc

    clear_rc = self._on_connect_clear_stats()
    if not clear_rc:
        return clear_rc

    # mark the event handler as ready to process async updates
    self.ctx.event_handler.enable()

    # deriving object specific actions
    hook_rc = self._on_connect()
    if not hook_rc:
        return hook_rc

    return RC_OK()
# disconnect from server
def _disconnect(self, release_ports = True):
    """
        Disconnect from the server.

        When connected and 'release_ports' is set, the acquired ports
        are released first - silently, a TRexError from the release is
        ignored. Always returns RC_OK().
    """
    should_release = self.conn.is_connected() and release_ports

    if should_release:
        # best effort: no output and no failure if the release goes wrong
        with self.ctx.logger.suppress():
            try:
                self.release()
            except TRexError:
                pass

    # disconnect the link to the server
    self.conn.disconnect()

    return RC_OK()
def _assign_ports(self, port_map):
    """
        Replace the ports of the client with the content of 'port_map'
        (port ID -> port object), inserted in ascending key order.
        Always returns RC_OK().
    """
    self.ports.clear()
    self.ports.update((port_id, port_map[port_id]) for port_id in sorted(port_map.keys()))
    return RC_OK()
############################ API #############################
############################ #############################
############################ #############################
############################ Getters #############################
############################ #############################
############################ #############################
@client_api('getter', False)
def probe_server (self):
    """
        Probe the server for the version / mode

        Can be used to determine stateless or advanced stateful mode

        :parameters:
            none

        :return:
            dictionary describing server version and configuration

        :raises:
            + :exc:`TRexError` when the probe fails
    """
    probe_rc = self.conn.probe_server()
    if probe_rc:
        return probe_rc.data()

    raise TRexError(probe_rc)
@property
def logger (self):
    """
        The logger object held by the context of this client

        :parameters:
            none

        :return:
            Logger object

        :raises:
            None
    """
    ctx = self.ctx
    return ctx.logger
@client_api('getter', False)
def get_verbose (self):
    """
        Get the verbose mode

        :parameters:
            none

        :return:
            the value returned by the logger's get_verbose()

        :raises:
            None
    """
    log = self.ctx.logger
    return log.get_verbose()
@client_api('getter', True)
def is_all_ports_acquired (self):
    """
        Check whether every port is acquired by this client

        :parameters:
            None

        :return:
            True if the list of all ports equals the list of acquired ports

        :raises:
            None
    """
    every_port = self.get_all_ports()
    acquired   = self.get_acquired_ports()
    return (every_port == acquired)
@client_api('getter', False)
def is_connected (self):
    """
        Query the connection state

        :parameters:
            None

        :return:
            True if the connection is established to the server,
            otherwise False

        :raises:
            None
    """
    connection = self.conn
    return connection.is_connected()
@client_api('getter', True)
def get_connection_info (self):
    """
        Get the parameters of the connection

        :parameters:
            None

        :return:
            dict with the keys 'server', 'sync_port', 'async_port'
            and 'username', taken from the context

        :raises:
            None
    """
    ctx = self.ctx
    return dict(server     = ctx.server,
                sync_port  = ctx.sync_port,
                async_port = ctx.async_port,
                username   = ctx.username)
@client_api('getter', True)
def get_server_supported_cmds(self):
    """
        Get the RPC commands cached at connect time

        :parameters:
            None

        :return:
            List of commands supported by server RPC

        :raises:
            None
    """
    cmds = self.supported_cmds
    return cmds
@client_api('getter', True)
def get_server_version(self):
    """
        Get the server version cached at connect time

        :parameters:
            None

        :return:
            the data of the 'get_version' RPC reply, as stored in the context

        :raises:
            None
    """
    ctx = self.ctx
    return ctx.server_version
@client_api('getter', True)
def get_server_system_info(self):
    """
        Get the server system info cached at connect time

        :parameters:
            None

        :return:
            the data of the 'get_system_info' RPC reply, as stored in the context

        :raises:
            None
    """
    ctx = self.ctx
    return ctx.system_info
@property
def system_info (self):
    """
        System info stored in the context.
        Provided for backward compatibility.
    """
    ctx = self.ctx
    return ctx.system_info
@client_api('getter', True)
def get_port_count(self):
    """
        Count the ports known to the client

        :parameters:
            None

        :return:
            Number of ports

        :raises:
            None
    """
    known_ports = self.ports
    return len(known_ports)
@client_api('getter', True)
def get_port (self, port_id):
    """
        Get the port object of a port ID

        :parameters:
            port_id - int

        :return:
            Port object

        :raises:
            + :exc:`TRexArgumentError` when 'port_id' is not a known port
    """
    port = self.ports.get(port_id)

    # identity test: an equality test against None would go through the
    # port object's own comparison operators
    if port is None:
        raise TRexArgumentError('port id', port_id, valid_values = self.get_all_ports())

    return port
@client_api('getter', True)
def get_all_ports (self):
    """
        Get the IDs of all the ports

        :parameters:
            None

        :return:
            List of all ports

        :raises:
            None
    """
    port_ids = list(self.ports)
    return port_ids
@client_api('getter', False)
def get_acquired_ports(self):
    """
        Get the IDs of the ports acquired by this client

        :parameters:
            None

        :return:
            list of all acquired ports

        :raises:
            None
    """
    acquired = []
    for port_id, port_obj in self.ports.items():
        if port_obj.is_acquired():
            acquired.append(port_id)

    return acquired
@client_api('getter', True)
def get_active_ports(self, owned = True):
    """
        Get the IDs of the active ports

        :parameters:
            owned - bool
                if 'True' apply only-owned filter

        :return:
            List of all active ports

        :raises:
            None
    """
    # the ownership check is evaluated only when the filter is requested
    return [port_id
            for port_id, port_obj in self.ports.items()
            if port_obj.is_active() and (not owned or port_obj.is_acquired())]
@client_api('getter', True)
def get_resolvable_ports (self):
    """
        Get the IDs of the acquired ports that are in L3 mode

        :parameters:
            None

        :return:
            List of all resolvable ports (configured with L3 mode)

        :raises:
            None
    """
    resolvable = []
    for port_id, port_obj in self.ports.items():
        if port_obj.is_acquired() and port_obj.is_l3_mode():
            resolvable.append(port_id)

    return resolvable
@client_api('getter', True)
def get_resolved_ports (self):
    """
        Get the IDs of the acquired ports that are resolved

        :parameters:
            None

        :return:
            List of all resolved ports

        :raises:
            None
    """
    resolved = []
    for port_id, port_obj in self.ports.items():
        if port_obj.is_acquired() and port_obj.is_resolved():
            resolved.append(port_id)

    return resolved
@client_api('getter', True)
def get_service_enabled_ports(self, owned = True):
    """
        Get the IDs of the ports with service mode on

        :parameters:
            owned - bool
                if 'True' apply only-owned filter

        :return:
            List of all ports with service mode on

        :raises:
            None
    """
    # the ownership check is evaluated only when the filter is requested
    return [port_id
            for port_id, port_obj in self.ports.items()
            if port_obj.is_service_mode_on() and (not owned or port_obj.is_acquired())]
@client_api('getter', True)
def get_service_filtered_ports(self, owned = True):
    """
        Get the IDs of the ports with filtered service mode on

        :parameters:
            owned - bool
                if 'True' apply only-owned filter

        :return:
            List of all ports at filtered service mode

        :raises:
            None
    """
    # the ownership check is evaluated only when the filter is requested
    return [port_id
            for port_id, port_obj in self.ports.items()
            if port_obj.is_service_filtered_mode_on() and (not owned or port_obj.is_acquired())]
@client_api('getter', True)
def get_paused_ports (self, owned = True):
    """
        Get the IDs of the paused ports

        :parameters:
            owned - bool
                if 'True' apply only-owned filter

        :return:
            List of all paused ports

        :raises:
            None
    """
    # the ownership check is evaluated only when the filter is requested
    return [port_id
            for port_id, port_obj in self.ports.items()
            if port_obj.is_paused() and (not owned or port_obj.is_acquired())]
@client_api('getter', True)
def get_transmitting_ports (self, owned = True):
    """
        Get the IDs of the transmitting ports

        :parameters:
            owned - bool
                if 'True' apply only-owned filter

        :return:
            List of all transmitting ports

        :raises:
            None
    """
    # the ownership check is evaluated only when the filter is requested
    return [port_id
            for port_id, port_obj in self.ports.items()
            if port_obj.is_transmitting() and (not owned or port_obj.is_acquired())]
@client_api('getter', True)
def is_traffic_active (self, ports = None):
    """
        Return if specified port(s) have traffic

        :parameters:
            ports : list
                Ports on which to execute the command,
                the acquired ports when None

        :return:
            set of the requested ports that are active and owned
            (an empty set evaluates to False)

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()

    ports = self.psv.validate('is_traffic_active', ports)

    active = set(self.get_active_ports())
    return active.intersection(ports)
@client_api('getter', True)
def get_port_attr (self, port):
    """
        get the port attributes currently set

        :parameters:
            port - for which port to return port attributes

        :return:
            the formatted info of the port object

        :raises:
            + :exc:`TRexError`
    """
    validate_type('port', port, int)

    valid_ports = self.get_all_ports()
    if port in valid_ports:
        return self.ports[port].get_formatted_info()

    raise TRexError("'{0}' is not a valid port id".format(port))
@client_api('getter', True)
def fetch_capture_packets (self, capture_id, output, pkt_count = 1000):
    """
        Fetch packets from existing active capture

        :parameters:
            capture_id: int
                an active capture ID

            output: str / list
                if output is a 'str' - it will be interpreted as output filename
                (a bare file name is written to the current directory)
                if it is a list, the API will populate the list with packet objects

                in case 'output' is a list, each element in the list is an object
                containing:
                'binary' - binary bytes of the packet
                'origin' - RX or TX origin
                'ts'     - timestamp as reported by the server
                'index'  - order index in the capture
                'port'   - on which port did the packet arrive or was transmitted from

            pkt_count: int
                maximum packets to fetch

        :raises:
            + :exc:`TRexError`
    """
    write_to_file = isinstance(output, basestring)
    writer        = None

    self.ctx.logger.pre_cmd("Writing up to {0} packets to '{1}'".format(pkt_count, output if write_to_file else 'list'))

    if write_to_file:
        # a bare file name has an empty dirname - it refers to the current directory
        save_dir = os.path.dirname(output) or '.'
        if not os.path.isdir(save_dir):
            raise TRexError('Requested path is not a directory: %s' % save_dir)

        # create a PCAP file
        try:
            writer = RawPcapWriter(output, linktype = 1)
        except IOError as e:
            raise TRexError('Could not open file %s: %s' % (output, e))
    else:
        # clear the list
        del output[:]

    try:
        if writer is not None:
            writer._write_header(None)

        pending = pkt_count
        rc = RC_OK()

        # fetch with iterations - each iteration up to 50 packets
        while pending > 0 and pkt_count > 0:
            rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': min(50, pkt_count)})
            if not rc:
                self.ctx.logger.post_cmd(rc)
                raise TRexError(rc)

            reply    = rc.data()
            pkts     = reply['pkts']
            pending  = reply['pending']
            start_ts = reply['start_ts']
            pkt_count -= len(pkts)

            # write packets
            for pkt in pkts:
                # timestamp relative to the start of the capture
                ts = pkt['ts'] - start_ts
                pkt['binary'] = base64.b64decode(pkt['binary'])

                if writer is not None:
                    ts_sec, ts_usec = sec_split_usec(ts)
                    writer._write_packet(pkt['binary'], sec = ts_sec, usec = ts_usec)
                else:
                    output.append(pkt)
    finally:
        # always release the file handle - also when a fetch has failed
        if writer is not None:
            writer.close()

    self.ctx.logger.post_cmd(rc)
@client_api('getter', True)
def get_capture_status (self):
    """
        Returns a dictionary where each key is a capture ID

        Each value is an object describing the capture

        :raises:
            + :exc:`TRexError` when the status request fails
    """
    status_rc = self._transmit("capture", params = {'command': 'status'})
    if not status_rc:
        raise TRexError(status_rc)

    # reformat as dictionary
    return {capture['id']: capture for capture in status_rc.data()}
@client_api('getter', False)
def has_events (self):
    """
        returns True in case there are events in the queue

        :raises:
            None
    """
    queue_is_empty = self.ctx.event_handler.empty()
    return not queue_is_empty
@client_api('getter', False)
def pop_event (self):
    """
        returns event from the head of the queue

        :raises:
            None
    """
    handler = self.ctx.event_handler
    return handler.pop_event()
@client_api('getter', False)
def get_events (self, ev_type_filter = None):
    """
        returns list of the events recorded

        :parameters:
            ev_type_filter: list
                combination of: 'warning', 'info'

        :return:
            the events returned by the event handler for this filter

        :raises:
            None
    """
    handler = self.ctx.event_handler
    return handler.get_events(ev_type_filter)
@client_api('getter', False)
def get_warnings (self):
    """
        returns all the warnings logged events

        :parameters:
            None

        :return:
            warning logged events

        :raises:
            None
    """
    warnings = self.get_events(ev_type_filter = 'warning')
    return warnings
@client_api('getter', False)
def get_info (self):
    """
        returns all the info logged events

        :parameters:
            None

        :return:
            info logged events

        :raises:
            None
    """
    infos = self.get_events(ev_type_filter = 'info')
    return infos
# get port(s) info as a list of dicts
@client_api('getter', True)
def get_port_info (self, ports = None):
    """
        Get port(s) info as a list of dicts

        :parameters:
            ports : int or list
                ports to report on, all the ports when None

        :return:
            list with the formatted info of each requested port

        :raises:
            whatever the port state validator raises for an invalid port list
    """
    if ports is None:
        ports = self.get_all_ports()

    # validate under this command's own name (it was validated as 'release')
    ports = self.psv.validate('get_port_info', ports)

    return [self.ports[port_id].get_formatted_info() for port_id in listify_if_int(ports)]
@client_api('command', True)
def get_util_stats(self):
    """
        Get utilization stats as dictionary with 2 keys: 'cpu' and 'mbuf_stats'.

        TRex CPU utilization and ports per core (list of dictionaries per core)
        Each dictionary contains two keys:
        1. 'ports': The ports that the core is using. Idle ports are denoted by -1.
        2. 'history': List of last 20 utilization values. Each value is calculated as average in interval of second.

        MBUFs memory consumption per CPU socket.

        :parameters:
            None

        :raises:
            + :exc:`TRexError`
    """
    # NOTE(review): no post_cmd() is issued here for this pre_cmd() -
    # presumably handled elsewhere, confirm
    self.ctx.logger.pre_cmd('Getting Utilization stats')

    # refresh over the RPC link, then convert to dict
    rpc_link = self.conn.rpc
    self.util_stats.update_sync(rpc_link)

    return self.util_stats.to_dict()
@client_api('command', True)
def get_xstats(self, port_id):
    """
        Get extended stats of port: all the counters as dict.

        :parameters:
            port_id: int

        :returns:
            Dict with names of counters as keys and values of uint64. Actual keys may vary per NIC.

        :raises:
            + :exc:`TRexError`
    """
    # NOTE(review): no post_cmd() is issued here for this pre_cmd() -
    # presumably handled elsewhere, confirm
    self.ctx.logger.pre_cmd('Getting xstats')

    port_obj    = self.ports[port_id]
    port_xstats = port_obj.get_xstats()

    # refresh over the RPC link, then convert to dict
    port_xstats.update_sync(self.conn.rpc)
    return port_xstats.to_dict()
############################ Commands #############################
############################ #############################
############################ #############################
@client_api('command', False)
def set_verbose (self, level):
    """
        Sets verbose level

        :parameters:
            level : str
                "none" - be silent no matter what
                "critical"
                "error" - show only errors (default client mode)
                "warning"
                "info"
                "debug" - print everything

        :raises:
            None
    """
    validate_type('level', level, basestring)

    log = self.ctx.logger
    log.set_verbose(level)
@client_api('command', False)
def set_timeout(self, timeout_sec):
    '''
        Set timeout for connectivity to TRex server. Must be not connected.

        :parameters:
            timeout_sec : int or float
                | Timeout in seconds for sync operations.
                | If async data does not arrive for this period, disconnect.

        :raises:
            + :exc:`TRexError` - when the value is not positive or the
              client is already connected.
    '''
    validate_type('timeout_sec', timeout_sec, (int, float))

    if not (timeout_sec > 0):
        raise TRexError('timeout_sec must be positive')

    if self.is_connected():
        raise TRexError('Can set timeout only when not connected')

    # the same value goes to both the sync and the async links
    conn = self.conn
    conn.rpc.set_timeout_sec(timeout_sec)
    conn.async_.set_timeout_sec(timeout_sec)
@client_api('command', False)
def connect (self):
    """
        Connects to the TRex server

        Any previous connection is torn down first. A failed attempt
        is cleaned up before the error is raised.

        :parameters:
            None

        :raises:
            + :exc:`TRexError`
    """
    # cleanup from previous connection
    self._disconnect()

    connect_rc = self._connect()
    if connect_rc:
        return

    self._disconnect()
    raise TRexError(connect_rc)
@client_api('command', False)
def disconnect (self, stop_traffic = True, release_ports = True):
    """
        Disconnects from the server

        :parameters:
            stop_traffic : bool
                Attempts to stop traffic before disconnecting.

            release_ports : bool
                Attempts to release all the acquired ports.
    """
    if stop_traffic:
        # try to stop ports but do nothing if not possible
        try:
            self.stop()
        except TRexError:
            pass

    ctx = self.ctx
    banner = "Disconnecting from server at '{0}':'{1}'".format(ctx.server, ctx.sync_port)

    ctx.logger.pre_cmd(banner)
    disconnect_rc = self._disconnect(release_ports)
    ctx.logger.post_cmd(disconnect_rc)
@client_api('command', True)
def ping_rpc_server(self):
    """
        Pings the RPC server

        :parameters:
            None

        :return:
            Timestamp of server

        :raises:
            + :exc:`TRexError`
    """
    ctx = self.ctx
    ctx.logger.pre_cmd("Pinging the server on '{0}' port '{1}': ".format(ctx.server, ctx.sync_port))

    ping_rc = self._transmit("ping")
    ctx.logger.post_cmd(ping_rc)

    if ping_rc:
        return ping_rc.data()

    raise TRexError(ping_rc)
@client_api('command', True)
def set_namespace_start(self, port, ns_cmds):
    """
        Start namespace batch operation.

        This command is a batch command that interacts with the kernel and
        could be slow in case of a big batch.
        Use wait_for_async_results to block for the response, or
        is_async_results_ready to poll whether the results are ready.
        see :class:`trex.common.trex_ns.NSCmds` and :class:`trex.common.trex_ns.NSCmdResult`

        Using other Python API while there is an active batch is not recommended::

            c.set_namespace_start(port=0, ns_cmds)
            res = c.wait_for_async_results(port=0);

        :parameters:
            port: int
                Port ID on which to run the batch

            ns_cmds : :class:`trex.common.trex_ns.NSCmds` object that includes the batch commands

        :return:
            the RC object of the port operation

        :raises:
            + :exc:`TRexError`
    """
    validate_type('port', port, int)

    if not isinstance(ns_cmds, NSCmds):
        raise TRexTypeError('ns_cmds', type(ns_cmds), NSCmds)

    batch_json = ns_cmds.get_json_str()
    if len(batch_json) == 0:
        raise TRexError('commands is empty ')

    self.psv.validate('set_namespace_start', port)

    log = self.ctx.logger
    log.pre_cmd("Setting port {0} in with namespace configuration".format(port))
    start_rc = self.ports[port].set_namespace_start(batch_json)
    log.post_cmd(start_rc)

    if not start_rc:
        raise TRexError(start_rc)

    return start_rc
@client_api('command', True)
def wait_for_async_results(self, port, timeout = None, cb = None):
    """
        Wait for the namespace batch operations to finish and return the
        batch results, one entry per command. An entry is one of:

            None                    : no error, the command was executed
            object with 'error'     : the command has failed
            object with 'result'    : the command has returned data

        :parameters:
            port: int
                Port ID on which the batch is running

            timeout : int
                timeout in seconds, None is unlimited

            cb: A callback function that gets an object for calculating progress of a long operation
                exec_cmds: total commands executed
                total_cmds: total commands in the queue
                errs_cmds: number of errors in the last operation
                ticket_id: ticket id

                this will print the progress into the screen::

                    def progress_cb(obj):
                        prog = 100.0*( float(obj['exec_cmds']) / float(obj['total_cmds']))
                        err = obj['errs_cmds']
                        print("progress {:3.0f}% errors : {}".format(prog,err))

        :raises:
            + :exc:`TRexError` in case of any error
    """
    validate_type('port', port, int)
    if timeout is not None:
        validate_type('timeout', timeout, int)

    log = self.ctx.logger
    log.pre_cmd("wait_for_async_results".format(port))
    results_rc = self.ports[port].get_async_results(timeout , cb)
    log.post_cmd(results_rc)

    if not results_rc:
        raise TRexError(results_rc)

    # check for errors
    parsed = NSCmdResult(results_rc)
    if parsed.is_any_error():
        raise TRexError(parsed.errors())

    return results_rc
@client_api('command', True)
def set_namespace(self, port, method, **args):
    """
        a utility function that works on top of :func:`set_namespace_start` and :func:`wait_for_async_results` batch operation API.
        It creates a batch of one command and returns its single result.
        It is good for slow operations that require blocking (such as get API)

        the function calls::

            c.set_namespace_start(Obj(method, args))
            r=c.wait_for_async_results()
            return (r)

        usage example::

            r=set_namespace(port=0,method='get_nodes')

        :parameters:
            port: int
                Port ID on which to run the command

            method: string
                method name. see NSCmds object for method names

            args: dict
                method args

        :return:
            the first entry of the batch results

        :raises:
            + :exc:`TRexError` in case of any error
    """
    batch = NSCmds()

    # queue the single requested command on the batch
    add_cmd = getattr(batch, method)
    add_cmd(**args)

    self.set_namespace_start(port, batch)
    results = self.wait_for_async_results(port)

    return results[0]
@client_api('command', True)
def set_bird_node(self, node_port,
                  mac,
                  ipv4 = None,
                  ipv4_subnet = None,
                  ipv6_enabled = None,
                  ipv6 = None,
                  ipv6_subnet = None,
                  vlans = None,
                  tpids = None,
                  mtu = None):
    """
        a utility function that works on top of :func:`set_namespace_start` and :func:`wait_for_async_results` batch operation API.
        The function creates a "bird node" using veth's in bird namespace in trex.
        If a node with the same MAC and the same parameters already exists, nothing is done.

        usage example::

            c.set_bird_node(node_port = 0,
                            mac = "00:00:00:01:00:06",
                            ipv4 = "1.1.1.3",
                            ipv4_subnet = 24,
                            ipv6_enabled = True,
                            ipv6_subnet = 124,
                            vlans = [22],
                            tpids = [0x8100])

        :parameters:
            node_port: int
                Port id to set the bird node on

            mac: string
                Mac address for the new bird node in format xx:xx:xx:xx:xx:xx

            ipv4: string
                Ipv4 address for the new bird node

            ipv4_subnet: int
                Ipv4 subnet for the new bird node

            ipv6_enabled: bool
                True/False if ipv6 enabled on the new node, mandatory for ipv6 config

            ipv6: string
                Ipv6 address for the new bird node

            ipv6_subnet: int
                Ipv6 subnet for the new bird node

            vlans: list
                Array of up to 2 uint16 tags.

            tpids: list
                Array of tpids that correspond to vlans.
                Default is [0x8100] in case of single VLAN and [0x88a8, 0x8100] in case of QinQ.

            mtu: int
                MTU for bird node.

        :raises:
            + :exc:`TRexError` in case of any error
            + Exception when a node with this MAC exists with different parameters
    """
    # probe for an existing node - a TRexError means there is none
    try:
        res  = self.set_namespace(node_port, method='get_nodes_info', macs_list=[mac])
        node = res['result']['nodes'][0]
    except TRexError:
        node_exists = False
    else:
        node_exists = True

    if node_exists:
        # explicit dict with exactly the keys the comparison reads,
        # instead of a snapshot of all the local variables
        requested = {'node':         node,
                     'is_bird':      True,
                     'ipv4':         ipv4,
                     'ipv4_subnet':  ipv4_subnet,
                     'ipv6_enabled': ipv6_enabled,
                     'ipv6_subnet':  ipv6_subnet}

        if self._is_node_have_same_args(requested):
            return # bird node exists

        # deliberately not a TRexError, as in the original flow
        raise Exception('Node with mac addres: "%s" already exists with different parameters!' % mac)

    # node with this mac address does not exist, create new one
    # NOTE(review): no port list is passed, so this presumably applies to the
    # default port set of set_port_attr rather than only to node_port - confirm
    self.set_port_attr(promiscuous = True)

    cmds = NSCmds()
    cmds.add_node(mac, is_bird = True)

    if ipv4 is not None and ipv4_subnet is not None:
        cmds.set_ipv4(mac, ipv4, subnet = ipv4_subnet, shared_ns = True)

    if ipv6_enabled is not None:
        cmds.set_ipv6(mac, ipv6_enabled, src_ipv6 = ipv6 ,subnet = ipv6_subnet, shared_ns = True)

    if vlans is not None:
        ver_args = {"types":[{"name": "vlans", 'arg': vlans, "t": list}]}
        if tpids is not None:
            ver_args['types'].append({"name": "tpids", 'arg': tpids, "t": list})
        ArgVerify.verify(self.__class__.__name__, ver_args)
        cmds.set_vlan(mac, vlans, tpids)

    if mtu is not None:
        cmds.set_mtu(mac, mtu)

    self.set_namespace_start(node_port, cmds)
    self.wait_for_async_results(node_port)
def _is_node_have_same_args(self, args):
node = args['node']
if node['is_bird'] == args['is_bird']:
if 'src' in node['ipv4'].keys() and 'subnet' in node['ipv4'].keys():
if node['ipv4']['src'] != args['ipv4'] or node['ipv4']['subnet'] != args['ipv4_subnet']:
return False
if node['ipv6']['enabled'] != args['ipv6_enabled']:
return False
if 'subnet' in node['ipv6'].keys():
if node['ipv6']['subnet'] != args['ipv6_subnet']:
return False
else:
return False
return True
@client_api('command', True)
def is_async_results_ready(self, port):
    """
        Check whether the namespace batch command on a port has finished.
        The result itself is fetched with wait_for_async_results.

        for example::

            while True:
                if c.is_async_results_ready(0):
                    res = c.wait_for_async_results(0)
                    break

        :parameters:
            port: int
                Port ID to query

        :returns:
            The value returned by the port's is_async_results_ready()
    """
    validate_type('port', port, int)

    log = self.ctx.logger
    # the logged text carries no placeholder, so the port id is not part of it
    log.pre_cmd("is_async_results_ready")
    ready = self.ports[port].is_async_results_ready()
    log.post_cmd(ready)

    return ready
@client_api('command', True)
def namespace_remove_all(self, ports = None):
    """
        Remove all namespaces from the given ports

        :parameters:
            ports: list
                The port(s) to remove all the namespaces from,
                defaults to all acquired ports

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()

    ports = self.psv.validate('namespace_remove_all', ports, (PSV_ACQUIRED, PSV_SERVICE, PSV_IDLE))

    # a single batch object is reused for every port
    batch = NSCmds()
    batch.remove_all()

    for port_id in ports:
        self.set_namespace_start(port_id, batch)
        # block until this port has finished before moving to the next one
        self.wait_for_async_results(port_id)
@client_api('command', True)
def set_l2_mode(self, port, dst_mac):
    """
        Put a port in L2 mode

        :parameters:
            port: int
                Port ID to configure
            dst_mac: string
                Destination MAC address

        :raises:
            + :exc:`TRexError` on an invalid MAC or when the port rejects the request
    """
    validate_type('port', port, int)
    self.psv.validate('set_l2_mode', port)

    mac_ok = is_valid_mac(dst_mac)
    if not mac_ok:
        raise TRexError("dest_mac is not a valid MAC address: '{0}'".format(dst_mac))

    log = self.ctx.logger
    log.pre_cmd("Setting port {0} in L2 mode: ".format(port))
    result = self.ports[port].set_l2_mode(dst_mac)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def set_l3_mode(self, port, src_ipv4, dst_ipv4, vlan = None):
    """
        Put a port in L3 mode and resolve its destination

        :parameters:
            port: int
                Port ID to configure
            src_ipv4: string
                IPv4 source address for the port
            dst_ipv4: string
                IPv4 destination address
            vlan: int or list of ints
                VLAN configuration, applied through set_vlan when given

        :raises:
            + :exc:`TRexError`
    """
    self.psv.validate('set_l3_mode', port, (PSV_ACQUIRED, PSV_SERVICE))

    # source is checked before destination
    for arg_name, address in (('src_ipv4', src_ipv4), ('dst_ipv4', dst_ipv4)):
        if not is_valid_ipv4(address):
            raise TRexError("{0} is not a valid IPv4 address: '{1}'".format(arg_name, address))

    # VLAN (when supplied) is configured before switching the mode
    if vlan is not None:
        self.set_vlan(ports = port, vlan = vlan)

    log = self.ctx.logger
    log.pre_cmd("Setting port {0} in L3 mode: ".format(port))
    result = self.ports[port].set_l3_mode(src_ipv4, dst_ipv4)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)

    # finally resolve the freshly configured destination
    self.resolve(ports = port)
@client_api('command', True)
def conf_ipv6(self, port, enabled, src_ipv6 = None):
    """
        Configure IPv6 on a port.

        :parameters:
            port: uint8_t
                The port to configure
            enabled: bool
                Whether IPv6 should be enabled
            src_ipv6: string
                Source IPv6 of the port; the address is validated only
                when IPv6 is being enabled and a non-empty value is given

        :raises:
            + :exc:`TRexError`
    """
    self.psv.validate('Configure IPv6', port, (PSV_ACQUIRED, PSV_SERVICE))
    validate_type('enabled', enabled, bool)

    needs_check = enabled and src_ipv6
    if needs_check and not is_valid_ipv6(src_ipv6):
        raise TRexError("src_ipv6 is not a valid IPv6 address: '%s'" % src_ipv6)

    self.logger.pre_cmd('Configuring IPv6 on port %s' % port)
    result = self.ports[port].conf_ipv6(enabled, src_ipv6)
    self.logger.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def set_vlan(self, ports = None, vlan = None):
    """
        Set (or clear) the VLAN configuration of port(s).

        :parameters:
            ports: list
                The port(s) to configure, defaults to all acquired ports
            vlan: either None, int or a list of up to two ints
                Each value represents a VLAN tag; two values give QinQ
                tagging with the first tag outer and the second inner.
                A value for which the VLAN object is falsy clears the configuration.

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()

    ports = self.psv.validate('set_vlan', ports, (PSV_ACQUIRED, PSV_IDLE, PSV_SERVICE))

    # VLAN() normalizes the user input
    vlan = VLAN(vlan)

    if vlan:
        banner = "Setting port(s) {0} with {1}: ".format(ports, vlan.get_desc())
    else:
        banner = "Clearing port(s) {0} VLAN configuration: ".format(ports)

    log = self.ctx.logger
    log.pre_cmd(banner)
    result = self._for_each_port('set_vlan', ports, vlan)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def clear_vlan(self, ports = None):
    """
        Clear any VLAN configuration on the port(s)

        :parameters:
            ports: int
                On which ports to clear the VLAN configuration;
                passed unchanged to set_vlan

        :raises:
            + :exc:`TRexError`
    """
    # clearing is expressed as setting an empty VLAN list
    no_tags = []
    self.set_vlan(ports = ports, vlan = no_tags)
@client_api('command', True)
def ping_ip(self, src_port, dst_ip, pkt_size = 64, count = 5, interval_sec = 1, vlan = None, **kw):
    """
        Ping an IPv4 or IPv6 address through a port

        :parameters:
            src_port: int
                Port ID from which the ping request is sent
            dst_ip: string
                IPv4 or IPv6 address to ping
            pkt_size: int
                Packet size to use, between 64 and 9216
            count: int
                How many times to ping
            interval_sec: float
                How much time to wait between pings
            vlan: int or list of ints
                One or two VLAN tags; when None the source port
                configuration is used
            kw:
                Extra keyword arguments forwarded to the ping service

        :returns:
            List with one record per ping, as produced by the
            IPv4 / IPv6 helper that is selected by the address family

        :raises:
            + :exc:`TRexError`
    """
    is_v4 = is_valid_ipv4(dst_ip)

    if not is_v4 and not is_valid_ipv6(dst_ip):
        raise TRexError("dst_ip is not a valid IPv4/6 address: '{0}'".format(dst_ip))

    if pkt_size < 64 or pkt_size > 9216:
        raise TRexError("pkt_size should be a value between 64 and 9216: '{0}'".format(pkt_size))

    validate_type('count', count, int)
    validate_type('interval_sec', interval_sec, (int, float))

    # an IPv4 ping additionally requires the port to be in L3 mode
    if is_v4:
        required_states = (PSV_ACQUIRED, PSV_SERVICE, PSV_L3)
    else:
        required_states = (PSV_ACQUIRED, PSV_SERVICE)
    self.psv.validate('ping', src_port, required_states)

    # explicit VLAN wins, otherwise fall back to the port configuration
    vlan_source = vlan if vlan is not None else self.ports[src_port].get_vlan_cfg()
    vlan = VLAN(vlan_source)

    if vlan:
        banner = "Pinging {0} from port {1} over {2} with {3} bytes of data:".format(dst_ip,
                                                                                     src_port,
                                                                                     vlan.get_desc(),
                                                                                     pkt_size)
    else:
        banner = "Pinging {0} from port {1} with {2} bytes of data:".format(dst_ip,
                                                                            src_port,
                                                                            pkt_size)
    self.ctx.logger.pre_cmd(banner)

    pinger = self._ping_ipv4 if is_v4 else self._ping_ipv6
    return pinger(src_port, vlan, dst_ip, pkt_size, count, interval_sec, **kw)
# IPv4 ping
def _ping_ipv4(self, src_port, vlan, dst_ip, pkt_size, count, interval_sec, **kw):
    """
        Run 'count' ICMP pings from 'src_port' and return the list of
        records, one per ping. Each record is also written to the logger.
    """
    service_ctx = self.create_service_ctx(port = src_port)
    icmp = ServiceICMP(service_ctx, dst_ip = dst_ip, pkt_size = pkt_size, vlan = vlan, **kw)

    replies = []
    self.ctx.logger.info('')

    final_round = count - 1
    for attempt in range(count):
        service_ctx.run(icmp)

        replies.append(icmp.get_record())
        self.ctx.logger.info(icmp.get_record())

        # no need to wait after the last ping
        if attempt != final_round:
            time.sleep(interval_sec)

    return replies
# IPv6 ping
def _ping_ipv6(self, src_port, vlan, dst_ip, pkt_size, count, interval_sec, **kw):
    """
        Run 'count' ICMPv6 pings from 'src_port' and return the list of
        records, one per ping. The 'formatted_string' entry of each record
        is written to the logger.
    """
    service_ctx = self.create_service_ctx(port = src_port)
    icmp6 = ServiceICMPv6(service_ctx, dst_ip = dst_ip, pkt_size = pkt_size, vlan = vlan, **kw)

    replies = []
    self.ctx.logger.info('')

    final_round = count - 1
    for attempt in range(count):
        service_ctx.run(icmp6)

        reply = icmp6.get_record()
        replies.append(reply)
        self.ctx.logger.info(reply['formatted_string'])

        # no need to wait after the last ping
        if attempt != final_round:
            time.sleep(interval_sec)

    return replies
@client_api('command', True)
def server_shutdown(self, force = False):
    """
        Ask the server to shut down completely

        :parameters:
            force: bool
                Shutdown server even if some ports are owned by another user

        :raises:
            + :exc:`TRexError`
    """
    request = {'force': force, 'user': self.ctx.username}

    log = self.ctx.logger
    log.pre_cmd("Sending shutdown request for the server")
    result = self._transmit("shutdown", params = request)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def set_global_cfg(self, params):
    """
        Change global configuration parameters.

        :parameters:
            params: dictionary
                Global configuration parameters, sent to the server as is.
                Available parameters:
                "sched_max_stretch", type = double, default = 0.0, units = usec,
                scheduler's maximum time for stretch. With a high value there
                won't be a scheduler compensation on burst and time will not be
                stretched. 0.0 means use the default internal value (~100usec);
                a value lower than 100usec is considered as 100usec.

        :raises:
            + :exc:`TRexError`
    """
    log = self.ctx.logger
    log.pre_cmd("Changing global configuration parameters")
    result = self._transmit("set_global_cfg", params = params)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def get_global_cfg(self):
    """
        Fetch the global configuration from the server

        :returns:
            The data part of the "get_global_cfg" response

        :raises:
            + :exc:`TRexError`
    """
    response = self._transmit("get_global_cfg")
    if response:
        return response.data()

    raise TRexError(response)
@client_api('command', True)
def push_packets(self, pkts, ports = None, force = False, ipg_usec = 0):
    """
        Push a list of packets to the server

        Each packet is either a Scapy Ether packet, raw bytes or a
        PacketBuffer. For a Scapy packet the port MAC is used for every
        address not explicitly set on the packet; for raw bytes both port
        MACs are used; a PacketBuffer carries its own flags.

        :parameters:
            pkts: packet or a list of packets
                Data to send
            ports: list of ints
                On which ports to push the packets, defaults to acquired ports
            force: bool
                Forwarded to the port (ignore the size limit)
            ipg_usec: float
                IPG in usec

        :returns:
            The aggregated result of the per-port push

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()
    ports = self.psv.validate('push_packets', ports)

    pkts = listify(pkts)

    # first pass: reject unsupported types before encoding anything
    accepted = (Ether, bytes, PacketBuffer)
    for candidate in pkts:
        if not isinstance(candidate, accepted):
            raise TRexTypeError('pkts', type(candidate), (Ether, bytes, PacketBuffer))

    # second pass: serialize each packet to base64 text plus MAC override flags
    encoded = []
    for candidate in pkts:
        if isinstance(candidate, Ether):
            raw = bytes(candidate)
            dst_from_port = 'dst' not in candidate.fields
            src_from_port = 'src' not in candidate.fields
        elif isinstance(candidate, PacketBuffer):
            raw = candidate.buffer
            dst_from_port = candidate.port_dst
            src_from_port = candidate.port_src
        else:
            raw = candidate
            dst_from_port = True
            src_from_port = True

        entry = {'binary': base64.b64encode(raw).decode(),
                 'use_port_dst_mac': dst_from_port,
                 'use_port_src_mac': src_from_port}
        encoded.append(entry)

    log = self.ctx.logger
    log.pre_cmd("Pushing {0} packets on port(s) {1}:".format(len(pkts), ports))
    result = self._for_each_port('push_packets', ports, encoded, force, ipg_usec)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)

    return result
################################ consolidation HERE #####################################
@client_api('command', True)
def wait_on_traffic(self, ports = None, timeout = None, rx_delay_ms = None):
    """
        .. _wait_on_traffic:

        Block until none of the specified port(s) is active

        :parameters:
            ports : list
                Ports to wait on, defaults to all acquired ports
            timeout : int
                Timeout in seconds, handed to the timer as is
            rx_delay_ms : int
                Accepted but not used by this method

        :raises:
            + :exc:`TRexTimeoutError` - in case timeout has expired
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()
    ports = self.psv.validate('wait_on_traffic', ports, PSV_ACQUIRED)

    deadline = PassiveTimer(timeout)

    # poll every 10 msec while at least one requested port is still active
    while set(self.get_active_ports()).intersection(ports):
        time.sleep(0.01)

        if deadline.has_expired():
            raise TRexTimeoutError(timeout)
@client_api('command', True)
def set_port_attr(self,
                  ports = None,
                  promiscuous = None,
                  link_up = None,
                  led_on = None,
                  flow_ctrl = None,
                  multicast = None,
                  vxlan_fs = None):
    """
        Set port attributes. Attributes left as None are passed as None;
        when all of them are None the call returns without contacting the ports.

        :parameters:
            ports: int or list
                Port(s) to configure, defaults to all acquired ports
            promiscuous: bool
                Promiscuous mode
            link_up: bool
                Link status
            led_on: bool
                LED of port
            flow_ctrl: int
                0: disable all, 1: enable tx side, 2: enable rx side, 3: full enable
            multicast: bool
                Enable receiving multicast
            vxlan_fs: list
                | UDP ports for which HW flow stats will be read from layers after VXLAN
                | UDP(<dst_port>)/VXLAN()/Ether()/... <--- NIC will look for flow stats magic here
                | Limited only to supported NICs (currently i40e)

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()
    else:
        ports = listify_if_int(ports)
    ports = self.psv.validate('set_port_attr', ports)

    # every attribute is optional: either its own type or None
    none_type = type(None)
    expected = (('promiscuous', promiscuous, bool),
                ('link_up',     link_up,     bool),
                ('led_on',      led_on,      bool),
                ('flow_ctrl',   flow_ctrl,   int),
                ('multicast',   multicast,   bool),
                ('vxlan_fs',    vxlan_fs,    list))
    for attr_name, attr_value, attr_type in expected:
        validate_type(attr_name, attr_value, (attr_type, none_type))

    # nothing to apply
    if all_none([promiscuous, link_up, led_on, flow_ctrl, multicast, vxlan_fs]):
        return

    log = self.ctx.logger
    log.pre_cmd("Applying attributes on port(s) {0}:".format(ports))
    result = self._for_each_port('set_attr',
                                 ports,
                                 promiscuous = promiscuous,
                                 link_up = link_up,
                                 led_on = led_on,
                                 flow_ctrl = flow_ctrl,
                                 multicast = multicast,
                                 vxlan_fs = vxlan_fs)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def set_service_mode_base(self, ports = None, enabled = True, filtered = False, mask = None):
    """
        Common part of setting service mode for port(s): validates the
        ports and arguments and logs the command about to be executed.

        "enabled" and "filtered" are mutually exclusive - choose one of
        them or none. "mask" must be given if and only if "filtered" is set.

        :parameters:
            ports: list
                For which ports to configure service mode on/off
            enabled: bool
                True for activating service mode, False for disabling
            filtered: bool
                True for activating service filtered mode, False for disabling
            mask: int
                Mask to apply on each port while filtered mode is on.
                Masks flags:
                    NO_MASK = 0
                    NO_TCP_UDP_MASK = 1
                    BGP_MASK = 2
                    DHCP_MASK = 4
                    TRANSPORT_MASK = 8
                    ALL_MASK = 255

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()
    ports = self.psv.validate('set_service_mode', ports, PSV_ACQUIRED)

    # type verification; mask is checked only when supplied
    type_checks = [
        {"name": "enabled", 'arg': enabled, "t": bool},
        {"name": "filtered", 'arg': filtered, "t": bool},
    ]
    mask_given = mask is not None
    if mask_given:
        type_checks.append({"name": "mask", 'arg': mask, 't': int})
    ArgVerify.verify(self.__class__.__name__, {"types": type_checks})

    if enabled and filtered:
        raise TRexError("Cannot set service mode and filtered, choose one of them")

    # mask and filtered must come together
    if bool(filtered) != mask_given:
        raise TRexError("Cannot set mask with filtered = False or the opposite , please supply both or neither")

    if enabled:
        banner = 'Enabling service mode on port(s): {0}'.format(ports)
    elif filtered:
        banner = 'Enable filtered service mode on port(s) {0} with mask {1}'.format(ports, mask)
    else:
        banner = 'Disabling service mode on port(s): {0}'.format(ports)

    self.ctx.logger.pre_cmd(banner)
@contextmanager
def service_mode(self, ports):
    """
        Context manager that makes sure 'ports' are in service mode
        for the duration of the block.

        Only ports that are not already service enabled are switched on,
        and exactly those ports are switched off again on exit, even when
        the block raises.
    """
    to_toggle = list_difference(ports, self.get_service_enabled_ports())

    # everything is already in service mode - nothing to restore
    if not to_toggle:
        yield
        return

    self.set_service_mode(ports = to_toggle, enabled = True)
    try:
        yield
    finally:
        self.set_service_mode(ports = to_toggle, enabled = False)
@client_api('command', True)
def resolve(self, ports = None, retries = 0, verbose = True, vlan = None):
    """
        Resolve ports (ARP resolution)

        :parameters:
            ports: list
                List of port IDs to resolve, defaults to all resolvable ports
            retries: int
                How many extra attempts to make on each port
            verbose: bool
                Accepted but not used by this method
            vlan: int or list of ints
                One or two VLAN tags; when default, the VLAN is taken
                from each port configuration

        :raises:
            + :exc:`TRexError` when at least one port could not be resolved
    """
    if ports is None:
        ports = self.get_resolvable_ports()
    ports = self.psv.validate('arp', ports, (PSV_ACQUIRED, PSV_SERVICE, PSV_L3))

    # building the VLAN object might raise on bad input
    vlan = VLAN(vlan)

    if vlan:
        banner = 'Resolving destination over {0} on port(s) {1}:'.format(vlan.get_desc(), ports)
    else:
        banner = 'Resolving destination on port(s) {0}:'.format(ports)
    self.ctx.logger.pre_cmd(banner)

    ports = listify_if_int(ports)

    # one ARP service per port - the last attempt made on that port
    attempts = []
    for port_id in ports:
        port_obj = self.ports[port_id]
        port_obj.invalidate_arp()

        src_ipv4 = port_obj.get_layer_cfg()['ipv4']['src']
        dst_ipv4 = port_obj.get_layer_cfg()['ipv4']['dst']

        effective_vlan = port_obj.get_vlan_cfg() if vlan.is_default() else vlan

        service_ctx = self.create_service_ctx(port_id)

        # try once plus 'retries' more times, stop on the first answer
        for _ in range(retries + 1):
            request = ServiceARP(service_ctx, dst_ip = dst_ipv4, src_ip = src_ipv4, vlan = effective_vlan)
            service_ctx.run(request)
            if request.get_record():
                self.ports[port_id].set_l3_mode(src_ipv4, dst_ipv4, request.get_record().dst_mac)
                break

        attempts.append(request)

    self.ctx.logger.post_cmd(all([request.get_record() for request in attempts]))

    unresolved = []
    for port_id, request in zip(ports, attempts):
        if not request.get_record():
            unresolved.append(port_id)
            continue
        self.ctx.logger.info(format_text("Port {0} - {1}".format(port_id, request.get_record()), 'bold'))

    if unresolved:
        raise TRexError('Could not resolve following ports: %s' % unresolved)

    self.ctx.logger.info('')

# alias
arp = resolve
@client_api('command', True)
def scan6(self, ports = None, timeout = 3, verbose = False):
    """
        Search for IPv6 devices on ports

        :parameters:
            ports: list
                List of port IDs on which to run the scan,
                defaults to all acquired ports
            timeout: float
                How much time to wait for responses
            verbose: bool
                When set, a table of the replies is logged per port

        :return:
            Dictionary of port ID to the scan record of that port.
            The verbose output reads the following keys of each reply:

                * type - type of device
                * mac  - MAC address of device
                * ipv6 - IPv6 address of device

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()
    ports = self.psv.validate('scan6', ports, [PSV_ACQUIRED, PSV_SERVICE])

    log = self.ctx.logger
    log.pre_cmd('Scanning network for IPv6 nodes on port(s) {0}:'.format(ports))

    replies_per_port = {}

    with log.supress():
        for port_id in ports:
            try:
                service_ctx = self.create_service_ctx(port_id)
                scanner = ServiceIPv6Scan(service_ctx, dst_ip = 'ff02::1', timeout = timeout)
                service_ctx.run(scanner)
                replies_per_port[port_id] = scanner.get_record()
            except:
                # close the pending command line before propagating anything
                log.post_cmd(True)
                raise

    log.post_cmd(True)

    if verbose:
        for port_id, replies in replies_per_port.items():
            if not replies:
                log.info(format_text('Port %s: no replies! Try to ping with explicit address.' % port_id, 'bold'))
                continue

            # widest address decides the width of the last column
            widest = max(0, max(len(entry['ipv6']) for entry in replies))

            table = TRexTextTable()
            table.set_cols_align(['c', 'c', 'l'])
            table.header(['Device', 'MAC', 'IPv6 address'])
            table.set_cols_width([9, 19, widest + 2])

            title = 'Port %s - IPv6 search result:' % port_id
            log.info(format_text(title, 'bold'))

            # rows are grouped by device type, types in sorted order
            by_type = defaultdict(list)
            for entry in replies:
                by_type[entry['type']].append(entry)

            for device_type in sorted(by_type.keys()):
                for entry in by_type[device_type]:
                    table.add_row([device_type, entry['mac'], entry['ipv6']])

            log.info(table.draw())
            log.info('')

    return replies_per_port
@client_api('command', True)
def start_capture(self, tx_ports = None, rx_ports = None, limit = 1000, mode = 'fixed', bpf_filter = ''):
    """
        Starts a low rate packet capturing on the server

        :parameters:
            tx_ports: int or list
                On which ports to capture TX
            rx_ports: int or list
                On which ports to capture RX
            limit: int
                Limit how many packets will be written;
                memory requirement is O(9K * limit). Must not be negative.
            mode: str
                'fixed'  - when full, newer packets will be dropped
                'cyclic' - when full, older packets will be dropped
            bpf_filter: str
                A Berkeley Packet Filter pattern.
                Only packets matching the filter will be appended to the capture

        :returns:
            A dictionary:
            {'id': <new_id>, 'ts': <starting timestamp>}

            where 'id' is the new capture ID for future commands
            and 'ts' is the start timestamp reported by the server

        :raises:
            + :exc:`TRexError`
    """
    # default values for TX / RX ports, and listify a single port
    tx_ports = listify_if_int(tx_ports if tx_ports is not None else [])
    rx_ports = listify_if_int(rx_ports if rx_ports is not None else [])

    # check arguments
    self.psv.validate('start_capture', list_remove_dup(tx_ports + rx_ports))
    validate_type('limit', limit, int)

    # only negative values are rejected, so the message says exactly that
    if limit < 0:
        raise TRexError("'limit' must be a non-negative value")

    if mode not in ('fixed', 'cyclic'):
        raise TRexError("'mode' must be either 'fixed' or 'cyclic'")

    self.ctx.logger.pre_cmd("Starting packet capturing up to {0} packets".format(limit))

    # capture RPC parameters
    params = {'command' : 'start',
              'limit'   : limit,
              'mode'    : mode,
              'tx'      : tx_ports,
              'rx'      : rx_ports,
              'filter'  : bpf_filter}

    rc = self._transmit("capture", params = params)
    self.ctx.logger.post_cmd(rc)

    if not rc:
        raise TRexError(rc)

    data = rc.data()
    return {'id': data['capture_id'], 'ts': data['start_ts']}
@client_api('command', True)
def stop_capture(self, capture_id, output = None):
    """
        Stop an active capture, optionally fetch its packets,
        and remove it from the server

        :parameters:
            capture_id: int
                An active capture ID to stop
            output: None / str / list
                When None the packets are discarded. Otherwise the value is
                handed to fetch_capture_packets together with the packet count:
                a 'str' is interpreted as an output filename, a list is
                populated with packet objects containing:

                    'binary' - binary bytes of the packet
                    'origin' - RX or TX origin
                    'ts'     - timestamp relative to the start of the capture
                    'index'  - order index in the capture
                    'port'   - on which port did the packet arrive or was transmitted from

        :raises:
            + :exc:`TRexError`
    """
    validate_type('capture_id', capture_id, int)
    validate_type('output', output, (type(None), str, list))

    log = self.ctx.logger

    # step 1: stop
    log.pre_cmd("Stopping packet capture {0}".format(capture_id))
    stop_rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id})
    log.post_cmd(stop_rc)
    if not stop_rc:
        raise TRexError(stop_rc)

    captured = stop_rc.data()['pkt_count']

    # step 2: fetch (only if the caller asked for the packets)
    if output is not None:
        self.fetch_capture_packets(capture_id, output, captured)

    # step 3: remove
    log.pre_cmd("Removing PCAP capture {0} from server".format(capture_id))
    remove_rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': capture_id})
    log.post_cmd(remove_rc)
    if not remove_rc:
        raise TRexError(remove_rc)
@client_api('command', True)
def start_capture_port(self, port, endpoint, bpf_filter = None):
    """
        Enable capture port to receive/send raw packets directly on a
        ZeroMQ Pair socket.

        The ZeroMQ socket should already be bound to the endpoint passed to
        this function. The TRex server will then connect to this endpoint
        and send on it every packet received on the given port that matches
        the BPF filter. Any packet sent by the client on that socket will
        also be sent as a 'raw' packet on the specified port.

        :parameters:
            port: int
                The port to activate the capture port on
            endpoint: string
                The endpoint of the socket (e.g. ipc:///tmp/my_endpoint).
                Should be unique and already bound to a PAIR ZeroMQ socket type.
                See ZMQ_PAIR in http://api.zeromq.org/4-0:zmq-socket
            bpf_filter: string
                The BPF filter to use before sending packets on the ZeroMQ socket.
                It can be empty for no filter.

        :raises:
            + :exc:`TRexError`
    """
    self.psv.validate('Capture Port start', port, (PSV_ACQUIRED, PSV_SERVICE))

    banner = "Starting capture port on port {0} with socket at {1}: ".format(port, endpoint)
    self.logger.pre_cmd(banner)
    result = self.ports[port].start_capture_port(endpoint, bpf_filter)
    self.logger.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def stop_capture_port(self, port):
    """
        Disable the capture port of a port

        :parameters:
            port: int
                The port to stop the capture port on

        :raises:
            + :exc:`TRexError`
    """
    self.psv.validate('Capture Port stop', port, (PSV_ACQUIRED, PSV_SERVICE))

    banner = "Stoping capture port on port {0}: ".format(port)
    self.logger.pre_cmd(banner)
    result = self.ports[port].stop_capture_port()
    self.logger.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def set_capture_port_bpf_filter(self, port, bpf_filter):
    """
        Replace the BPF filter of the capture port

        :parameters:
            port: int
                The port whose capture port filter is changed
            bpf_filter: string
                The new BPF filter (empty disables the filter)

        :raises:
            + :exc:`TRexError`
    """
    self.psv.validate('Capture Port BPF filter set', port, (PSV_ACQUIRED, PSV_SERVICE))

    banner = "Setting capture port filter on port {0}: ".format(port)
    self.logger.pre_cmd(banner)
    result = self.ports[port].set_capture_port_bpf_filter(bpf_filter)
    self.logger.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def remove_all_captures(self):
    """
        Removes any existing captures

        Every capture ID reported by get_capture_status is removed from
        the server, one request per capture.

        :raises:
            + :exc:`TRexError` on the first removal that fails
    """
    captures = self.get_capture_status()

    self.ctx.logger.pre_cmd("Removing all packet captures from server")

    for capture_id in captures.keys():
        rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': capture_id})
        if not rc:
            # close the pending command line with the failure before raising,
            # the same way the other commands report a failed RC
            self.ctx.logger.post_cmd(rc)
            raise TRexError(rc)

    self.ctx.logger.post_cmd(RC_OK())
@client_api('command', False)
def clear_events(self):
    """
        Clear all events held by the context event handler

        :parameters:
            None

        :raises:
            None
    """
    handler = self.ctx.event_handler
    handler.clear_events()
@client_api('command', False)
def create_service_ctx(self, port):
    """
        Generate a service context bound to this client and 'port'.

        Services can be added to the context and then executed.

        :parameters:
            port: int
                Port ID the context is created for

        :returns:
            A new ServiceCtx object
    """
    service_ctx = ServiceCtx(self, port)
    return service_ctx
@client_api('command', True)
def map_ports(self, ports = None, read_delay = 0.3, send_pkts = 3):
    """
        Get mapping of ports (connectivity)

        Every port sends 'send_pkts' UDP packets carrying a random magic
        value followed by its own port ID. The packets are captured on RX of
        all the ports, and a TX port is mapped to an RX port when more than
        half of its packets were seen there.

        :parameters:
            ports: list
                Ports to map, default is all acquired ports
            read_delay: float
                Delay in sec between sending packets and looking at received results
            send_pkts: int
                How many packets to send from each port

        :returns:
            Dictionary with the keys:

                * 'map'     - TX port ID to RX port ID (None when not found)
                * 'bi'      - list of (port_a, port_b) bi-directional pairs
                * 'unknown' - list of ports with no mapping

        :raises:
            + :exc:`TRexError`
    """
    # by default use all acquired ports
    ports = ports if ports is not None else self.get_acquired_ports()
    if not ports:
        raise TRexError('No ports to map, acquire some ports or specify them explicitly.')

    # validate
    ports = self.psv.validate('map_ports', ports)

    # the magic is the first 4 bytes of the UDP payload (offset 8 from the UDP header)
    magic = random.getrandbits(32)
    bpf = 'udp[8:4]= 0x%x' % magic

    # defined up front so it always exists once the capture block is done
    captured_pkts = []

    with self.service_mode(ports):
        # NOTE(review): the 'vlan' clause is repeated, presumably to also match
        # single and double tagged packets - confirm against pcap-filter semantics
        rc = self.start_capture(rx_ports = ports, bpf_filter = '{0} or (vlan && {0}) or (vlan && {0})'.format(bpf))
        capture_id = rc['id']

        try:
            base_pkt = Ether() / IP() / UDP(sport=12345,dport=12345) / struct.pack('!I', magic)

            for port_id in ports:
                # fifth payload byte identifies the transmitting port
                pkt = base_pkt / struct.pack('!B', port_id)

                port = self.get_port(port_id)
                vlan = VLAN(port.get_vlan_cfg())
                vlan.embed(pkt)

                if port.is_l3_mode():
                    src_ip = port.get_layer_cfg()['ipv4']['src']
                    pkt[IP].src = src_ip
                    pkt[IP].dst = src_ip.split('.')[0] + '.255.255.255' # broadcast src ip subnet
                else:
                    pkt[IP].dst = '255.1.1.1' # some address that is not taken

                self.push_packets([pkt] * send_pkts, ports = port_id, ipg_usec = 1)

            time.sleep(read_delay)
            self.fetch_capture_packets(capture_id, captured_pkts)

        finally:
            # the capture is always released, even on failure
            self.stop_capture(capture_id)

    # counters: pkts_map[tx_port][rx_port] = number of packets seen
    pkts_map = {tx_port: {rx_port: 0 for rx_port in ports} for tx_port in ports}

    for pkt in captured_pkts:
        scapy_pkt = Ether(pkt['binary'])
        if UDP not in scapy_pkt:
            continue

        udp_payload = bytes(scapy_pkt[UDP].payload)
        if len(udp_payload) < 5:
            continue

        tx_port = struct.unpack('!B', udp_payload[4:5])[0]
        rx_port = pkt['port']

        # ignore packets that do not belong to the requested ports
        # instead of failing the whole mapping with a KeyError
        if tx_port not in pkts_map or rx_port not in pkts_map[tx_port]:
            continue

        pkts_map[tx_port][rx_port] += 1

    table = {'map': {}, 'bi' : [], 'unknown': []}

    # actual mapping - majority of the sent packets must have arrived
    for tx_port in ports:
        table['map'][tx_port] = None
        for rx_port in ports:
            if pkts_map[tx_port][rx_port] * 2 > send_pkts:
                table['map'][tx_port] = rx_port

    unmapped = list(ports)
    while len(unmapped) > 0:
        port_a = unmapped.pop(0)
        port_b = table['map'][port_a]

        # if unknown - add to the unknown list
        if port_b is None:
            table['unknown'].append(port_a)

        # self-loop, due to bug?
        elif port_a == port_b:
            continue

        # bi-directional ports
        elif table['map'][port_b] == port_a:
            unmapped.remove(port_b)
            table['bi'].append( (port_a, port_b) )

    return table
############################ deprecated #############################
############################ #############################
############################ #############################
@client_api('command', True)
def set_rx_queue(self, ports = None, size = 1000):
    """
        Set an RX queue for port(s)

        The queue is cyclic and will hold the last 'size' packets

        :parameters:
            ports: list
                For which ports to apply a queue, defaults to all acquired ports
            size: int
                Size of the queue, must be positive

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()
    ports = self.psv.validate('set_rx_queue', ports, PSV_ACQUIRED)

    validate_type('size', size, int)
    if size <= 0:
        raise TRexError("'size' must be a positive value")

    log = self.ctx.logger
    log.pre_cmd("Setting RX queue on port(s) {0}:".format(ports))
    result = self._for_each_port('set_rx_queue', ports, size)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)
@client_api('command', True)
def remove_rx_queue(self, ports = None):
    """
        Remove the RX queue from port(s)

        :parameters:
            ports: list
                For which ports to remove the RX queue,
                defaults to all acquired ports

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()
    ports = self.psv.validate('remove_rx_queue', ports, PSV_ACQUIRED)

    log = self.ctx.logger
    log.pre_cmd("Removing RX queue on port(s) {0}:".format(ports))
    result = self._for_each_port('remove_rx_queue', ports)
    log.post_cmd(result)

    if not result:
        raise TRexError(result)
############################ private #############################
############################ common #############################
############################ functions #############################
def _post_acquire_common(self, ports):
    """
        Log a warning for every given port whose destination is unresolved
    """
    unresolved = [port_id for port_id in listify_if_int(ports)
                  if not self.ports[port_id].is_resolved()]

    for port_id in unresolved:
        warning = '*** Warning - Port {0} destination is unresolved ***'.format(port_id)
        self.ctx.logger.info(format_text(warning, 'bold'))
def _get_stats_common(self, ports = None, sync_now = True, ext_stats = None):
    """
        A common method for STL/ASTF to generate stats output

        :parameters:
            ports: list
                Ports to validate, defaults to all acquired ports.
                NOTE(review): the collected stats cover every port in
                self.ports, not only the validated ones - confirm this is intended
            sync_now: bool
                When True the stats objects are updated from the server first
            ext_stats: dict
                Extended stats objects provided by the specific class (if any)

        :returns:
            Dictionary with 'global', one entry per port ID, the extended
            stats keys and 'total' (sum of all ports), each as a dict

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()

    ports = self.psv.validate('get_stats', ports)
    validate_type('sync_now', sync_now, bool)

    # stats objects keyed by their name in the output
    stats = {'global': self.global_stats}
    per_port = {}
    for port in self.ports.values():
        per_port[port.port_id] = port.get_stats()
    stats.update(per_port)

    if ext_stats:
        stats.update(ext_stats)

    # refresh from the server when asked to
    if sync_now:
        rc = StatsBatch.update(stats.values(), self.conn.rpc)
        if not rc:
            raise TRexError(rc)

    output = {name: stat_obj.to_dict() for name, stat_obj in stats.items()}

    # aggregate all the ports into 'total'
    total = PortStatsSum()
    for port in self.ports.values():
        total += port.get_stats()
    output['total'] = total.to_dict()

    return output
def _clear_stats_common(self, ports = None, clear_global = True, clear_xstats = True, ext_stats = None):
    """
        A common method for STL/ASTF to clear stats

        :parameters:
            ports: list
                Ports to validate, defaults to all acquired ports.
                NOTE(review): the reset covers every port in self.ports,
                not only the validated ones - confirm this is intended
            clear_global: bool
                Also reset the global stats
            clear_xstats: bool
                Also reset the xstats of the ports
            ext_stats:
                Accepted but not used by this method

        :raises:
            + :exc:`TRexError`
    """
    if ports is None:
        ports = self.get_acquired_ports()

    ports = self.psv.validate('clear_stats', ports)

    all_ports = self.ports.values()

    # order: global, xstats, then the regular port stats
    to_reset = []
    if clear_global:
        to_reset.append(self.global_stats)
    if clear_xstats:
        to_reset.extend([port.get_xstats() for port in all_ports])
    to_reset.extend([port.get_stats() for port in all_ports])

    log = self.ctx.logger
    log.pre_cmd("Clearing stats :")
    rc = StatsBatch.reset(to_reset, self.conn.rpc)
    log.post_cmd(rc)

    if not rc:
        raise TRexError(rc)
def _is_service_req(self):
''' Return True as service mode check is required in general '''
return True
@property
def any_port(self):
    """
        The first port object found in self.ports, or None when there are no ports
    """
    return next(iter(self.ports.values()), None)
def _get_service_params(self, opts):
"""
Common function, creates 3 arguments for set_service_mode
:parameters:
opts: argparse
The result of: parser.parse_args(line.split()).
:return:
3 arguments: enable, filtered & mask to use in set_service_mode
"""
filtered = opts.allow_no_tcp_udp or opts.allow_bgp or opts.allow_all or opts.allow_emu or opts.allow_dhcp or opts.allow_transport
mask = 0
if filtered:
if opts.allow_dhcp:
mask |= DHCP_MASK
if opts.allow_emu:
mask |= ( DHCP_MASK | NO_TCP_UDP_MASK | TRANSPORT_MASK )
if opts.allow_all:
mask = ALL_MASK
if opts.allow_bgp:
mask |= BGP_MASK
if opts.allow_no_tcp_udp:
mask |= NO_TCP_UDP_MASK
if opts.allow_transport:
mask |= TRANSPORT_MASK
else:
mask = None
enabled = False if filtered else opts.enabled
return enabled, filtered, mask
############################ console #############################
############################ commands #############################
############################ #############################
@console_api('map', 'common', True)
def map_line(self, line):
    '''Maps ports topology\n'''
    # Console command: maps the acquired ports with map_ports() and logs
    # the bi-directional pairs followed by the ports that could not be mapped.
    #
    # raises TRexError when the validated list of acquired ports is empty

    ports = self.psv.validate('map', self.get_acquired_ports())

    # nothing to map - fail with an explicit reason
    # (the statements that used to follow the raise were unreachable)
    if not ports:
        raise TRexError('map: no ports acquired')

    # keep the logger quiet while the mapping itself runs
    with self.logger.supress():
        table = self.map_ports(ports = ports)

    self.logger.info(format_text('\nAcquired ports topology:\n', 'bold', 'underline'))

    # bi-dir ports
    self.logger.info(format_text('Bi-directional ports:\n','underline'))
    for port_a, port_b in table['bi']:
        self.logger.info("port {0} <--> port {1}".format(port_a, port_b))

    self.logger.info('')

    # unknown ports
    self.logger.info(format_text('Mapping unknown:\n','underline'))
    for port in table['unknown']:
        self.logger.info("port {0}".format(port))

    self.logger.info('')
@console_api('connect', 'common', False)
def connect_line (self, line):
    '''Connects to the TRex server and acquire ports'''
    # the docstring above is handed to the parser, keep it as is

    arg_specs = (parsing_opts.FORCE, parsing_opts.READONLY)
    parser = parsing_opts.gen_parser(self, "connect", self.connect_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split())

    self.connect()

    # read-only sessions connect without acquiring ports
    if opts.readonly:
        return

    self.acquire(force = opts.force)
@console_api('disconnect', 'common')
def disconnect_line (self, line):
    '''Disconnect from the TRex server'''
    # the docstring above is handed to the parser, keep it as is

    parser = parsing_opts.gen_parser(self, "disconnect", self.disconnect_line.__doc__)

    # the command has no options of its own - the line is parsed and the result dropped
    parser.parse_args(line.split())

    self.disconnect()
@console_api('global_cfg', 'common')
def global_cfg_line (self, line):
    '''Set global configuration parameters'''
    # the docstring above is handed to the parser, keep it as is

    parser = parsing_opts.gen_parser(self, "global_cfg", self.global_cfg_line.__doc__, parsing_opts.TUNABLES)
    opts = parser.parse_args(line.split(), verify_acquired = True)

    # apply the new values (if any were given)
    tunables = opts.tunables
    if tunables:
        self.set_global_cfg(tunables)

    # print the configuration as reported by get_global_cfg()
    current_cfg = self.get_global_cfg()
    for key, value in current_cfg.items():
        print("{} = {}".format(key, value))

    return True
@console_api('ping', 'common', True)
def ping_line (self, line):
    '''Pings the server / specific IP'''
    # the docstring above is handed to the parser, keep it as is

    # an empty line means: ping the RPC server itself
    if not line:
        self.ping_rpc_server()
        return True

    arg_specs = (parsing_opts.SINGLE_PORT,
                 parsing_opts.PING_IP,
                 parsing_opts.PKT_SIZE,
                 parsing_opts.VLAN_TAGS,
                 parsing_opts.PING_COUNT)

    parser = parsing_opts.gen_parser(self, "ping", self.ping_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split(), verify_acquired = True)

    # IP ping - the single source port arrives as the first entry of opts.ports
    src_port = opts.ports[0]
    self.ping_ip(src_port, opts.ping_ip, opts.pkt_size, opts.count, vlan = opts.vlan)
@console_api('l2', 'common', True)
def set_l2_mode_line (self, line):
    '''Configures a port in L2 mode'''
    # the docstring above is handed to the parser, keep it as is

    arg_specs = (parsing_opts.SINGLE_PORT, parsing_opts.DST_MAC)
    parser = parsing_opts.gen_parser(self, "l2", self.set_l2_mode_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split())

    # the single source port arrives as the first entry of opts.ports
    port = opts.ports[0]
    self.set_l2_mode(port, dst_mac = opts.dst_mac)

    return True
@console_api('l3', 'common', True)
def set_l3_mode_line (self, line):
    '''Configures a port in L3 mode'''
    # the docstring above is handed to the parser, keep it as is

    arg_specs = (parsing_opts.SINGLE_PORT,
                 parsing_opts.SRC_IPV4,
                 parsing_opts.DST_IPV4,
                 parsing_opts.VLAN_TAGS)

    parser = parsing_opts.gen_parser(self, "l3", self.set_l3_mode_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split())

    # the single source port arrives as the first entry of opts.ports
    port = opts.ports[0]
    self.set_l3_mode(port,
                     src_ipv4 = opts.src_ipv4,
                     dst_ipv4 = opts.dst_ipv4,
                     vlan     = opts.vlan)

    return True
@console_api('ipv6', 'common', True)
def conf_ipv6_line(self, line):
    '''Configures IPv6 of a port'''
    # the docstring above is handed to the parser, keep it as is

    # NOTE(review): the parser is named "port" while the command is 'ipv6' - confirm intended
    arg_specs = (parsing_opts.SINGLE_PORT, parsing_opts.IPV6_OPTS_CMD)
    parser = parsing_opts.gen_parser(self, "port", self.conf_ipv6_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split())

    # arguments that follow the port ID in the conf_ipv6() call
    if opts.off:
        ipv6_args = (False,)
    elif opts.auto_ipv6:
        ipv6_args = (True,)
    else:
        ipv6_args = (True, opts.src_ipv6)

    self.conf_ipv6(opts.ports[0], *ipv6_args)

    return True
@console_api('vlan', 'common', True)
def set_vlan_line (self, line):
    '''Configures VLAN tagging for a port.
control generated traffic such as ARP will be tagged'''
    # the docstring above is handed to the parser, its text is kept verbatim

    arg_specs = (parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.VLAN_CFG)
    parser = parsing_opts.gen_parser(self, "vlan", self.set_vlan_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split())

    if not opts.clear_vlan:
        self.set_vlan(ports = opts.ports, vlan = opts.vlan)
    else:
        self.clear_vlan(ports = opts.ports)

    return True
@console_api('scan6', 'common', True)
def scan6_line(self, line):
    '''Search for IPv6 neighbors'''
    # the docstring above is handed to the parser, keep it as is

    arg_specs = (parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.TIMEOUT)
    parser = parsing_opts.gen_parser(self, "scan6", self.scan6_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split(),
                             default_ports = self.get_acquired_ports(),
                             verify_acquired = True)

    # pass a timeout only when one was given, otherwise scan6() uses its own default
    extra = {'timeout': opts.timeout} if opts.timeout is not None else {}

    self.scan6(ports = opts.ports, verbose = True, **extra)

    return True
@console_api('arp', 'common', True)
def resolve_line (self, line):
    '''Performs a port ARP resolution'''
    # the docstring above is handed to the parser, keep it as is

    arg_specs = (parsing_opts.PORT_LIST_WITH_ALL,
                 parsing_opts.VLAN_TAGS,
                 parsing_opts.RETRIES)

    parser = parsing_opts.gen_parser(self, "resolve", self.resolve_line.__doc__, *arg_specs)

    # when no ports are given, default to the resolvable ports
    opts = parser.parse_args(line.split(),
                             default_ports = self.get_resolvable_ports(),
                             verify_acquired = True)

    self.resolve(ports = opts.ports, retries = opts.retries, vlan = opts.vlan)

    return True
@console_api('pkt', 'common', True)
def pkt_line (self, line):
    '''Sends a Scapy format packet'''
    # the docstring above is handed to the parser, keep it as is

    arg_specs = (parsing_opts.PORT_LIST_WITH_ALL,
                 parsing_opts.DRY_RUN,
                 parsing_opts.SCAPY_PKT_CMD,
                 parsing_opts.FORCE)

    parser = parsing_opts.gen_parser(self, "pkt", self.pkt_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split())

    log = self.logger.info

    # show layers option - list the registered layers and stop
    if opts.layers:
        log(format_text('\nRegistered Layers:\n', 'underline'))
        log(parsing_opts.ScapyDecoder.formatted_layers())
        return

    # dry run option - show the packet, nothing is pushed
    if opts.dry:
        pkt_size = format_num(len(opts.scapy_pkt), suffix = 'B')
        log(format_text('\nPacket (Size: {0}):\n'.format(pkt_size), 'bold', 'underline'))
        opts.scapy_pkt.show2()
        log(format_text('\n*** DRY RUN - no traffic was injected ***\n', 'bold'))
        return

    self.push_packets(pkts = opts.scapy_pkt, ports = opts.ports, force = opts.force)

    return True
@console_api('shutdown', 'common', True)
def shutdown_line (self, line):
    '''Shutdown the server'''
    # the docstring above is handed to the parser, keep it as is

    parser = parsing_opts.gen_parser(self, "shutdown", self.shutdown_line.__doc__, parsing_opts.FORCE)
    opts = parser.parse_args(line.split())

    self.server_shutdown(force = opts.force)

    return True
def _ns_add(self,opts):
    """
        Build the commands that add one namespace node and run them
        on the first port of opts.ports, waiting for the async results

        :parameters:
            opts: argparse
                parsed options of the 'ns add' command
    """
    port      = opts.ports[0]
    node_mac  = opts.mac
    shared_ns = opts.shared_ns
    is_shared = shared_ns is not None

    cmds = NSCmds()
    cmds.add_node(node_mac, shared_ns = shared_ns)

    if is_shared:
        # shared namespace - the destination IPv4 is set through set_dg() on the shared namespace
        cmds.set_ipv4(node_mac, opts.src_ipv4, subnet = opts.subnet, shared_ns = is_shared)
        cmds.set_dg(shared_ns, opts.dst_ipv4)
    else:
        cmds.set_ipv4(node_mac, opts.src_ipv4, opts.dst_ipv4, opts.subnet, is_shared)

    if opts.vlan:
        cmds.set_vlan(node_mac, opts.vlan, opts.tpid)

    if opts.auto_ipv6:
        cmds.set_ipv6(node_mac, True, shared_ns = is_shared)

    self.set_namespace_start(port, cmds)
    self.wait_for_async_results(port)
def _ns_remove (self,opts):
    """
        Remove the node whose MAC is opts.mac, using the first port
        of opts.ports, and wait for the async results
    """
    target_port = opts.ports[0]

    cmds = NSCmds()
    cmds.remove_node(opts.mac)

    self.set_namespace_start(target_port, cmds)
    self.wait_for_async_results(target_port)
def _ns_show_countres (self,opts):
    """
        Fetch the namespace counters (meta data and values) through
        the first port of opts.ports and dump them
    """
    target_port = opts.ports[0]

    cmds = NSCmds()
    cmds.counters_get_meta()
    cmds.counters_get_values()

    self.set_namespace_start(target_port, cmds)
    results = self.wait_for_async_results(target_port)

    # results follow the commands order: [0] is the meta data, [1] holds the values
    # NOTE(review): the values are read from the '' (empty) key of the result - confirm against the server reply
    meta   = results[0]['result']['data']
    values = results[1]['result']['']

    ns_stat = CNsStats()
    ns_stat.set_meta_values(meta, values)
    ns_stat.dump_stats()
def _ns_clear_countres(self,opts):
    """
        Clear the namespace counters through the first port
        of opts.ports and wait for the async results
    """
    target_port = opts.ports[0]

    cmds = NSCmds()
    cmds.clear_counters()

    self.set_namespace_start(target_port, cmds)
    self.wait_for_async_results(target_port)
def _ns_show_nodes (self,opts):
    """
        Print a table of the namespace nodes (node-id, mac) reported
        for the first port of opts.ports

        At most 20 nodes are listed. When more nodes exist a notice is
        printed and the rest are skipped. When there are no nodes at all,
        "Empty" is printed instead of a table.

        :parameters:
            opts: argparse
                parsed options of the 'ns show-nodes' command
    """
    max_rows = 20

    cmds = NSCmds()
    cmds.get_nodes()

    port = opts.ports[0]
    self.set_namespace_start(port, cmds)
    results = self.wait_for_async_results(port)

    macs = results[0]['result']['nodes']
    if not macs:
        print("Empty")
        return

    stable = text_tables.TRexTextTable('ns nods')
    stable.set_cols_align(['c','c'] )
    stable.set_cols_width([10,17] )
    stable.set_cols_dtype(['t','t'])
    stable.header(['node-id','mac'])

    for node_id, mac in enumerate(macs):
        # stop BEFORE adding the row past the limit, so the table holds exactly
        # 'max_rows' rows and the notice appears only when nodes were really dropped
        if node_id >= max_rows:
            print(" Limited to only {0} nodes !".format(max_rows))
            break
        stable.add_row([node_id, mac])

    text_tables.print_table_with_header(stable, untouched_header = stable.title, buffer = sys.stdout)
def _ns_remove_all (self,opts):
self.namespace_remove_all()
def _ns_show_node(self,opts):
    """
        Pretty-print the information of the node whose MAC is opts.mac,
        fetched through the first port of opts.ports
    """
    target_port = opts.ports[0]

    cmds = NSCmds()
    cmds.get_nodes_info([opts.mac])

    self.set_namespace_start(target_port, cmds)
    results = self.wait_for_async_results(target_port)

    pprint.pprint(results[0]['result'])
@console_api('ns', 'common', True, True)
def ns_line(self, line):
    '''Network namespace '''
    # the docstring above is handed to the parser, keep it as is

    parser = parsing_opts.gen_parser(self, 'ns', self.ns_line.__doc__)
    subparsers = parser.add_subparsers(title = 'commands', dest = 'command', metavar = '')

    def register(cmd, text):
        # the same text serves as the description and the help of the sub-command
        return subparsers.add_parser(cmd, description = text, help = text)

    # registration order is kept, sub-commands are registered one by one
    add_parser        = register('add', 'add one node')
    remove_parser     = register('remove', 'remove one node')
    show_cnt_parser   = register('show-counters', 'show counters')
    clear_cnt_parser  = register('clear-counters', 'clear counters')
    show_nodes        = register('show-nodes', 'show nodes')
    show_node         = register('show-node', 'show nodes')
    register('remove-all', 'remove all')

    port_only    = (parsing_opts.SINGLE_PORT,)
    port_and_mac = (parsing_opts.SINGLE_PORT, parsing_opts.NODE_MAC)

    add_parser.add_arg_list(
        parsing_opts.SINGLE_PORT,
        parsing_opts.NODE_MAC,
        parsing_opts.VLAN_TAGS,
        parsing_opts.VLAN_TPIDS,
        parsing_opts.SRC_IPV4,
        parsing_opts.DST_IPV4_NOT_REQ,
        parsing_opts.IPV6_AUTO,
        parsing_opts.SHARED_NS,
        parsing_opts.SUBNET,
    )
    remove_parser.add_arg_list(*port_and_mac)
    show_node.add_arg_list(*port_and_mac)
    show_cnt_parser.add_arg_list(*port_only)
    clear_cnt_parser.add_arg_list(*port_only)
    show_nodes.add_arg_list(*port_only)

    opts = parser.parse_args(line.split())
    command = opts.command

    # NOTE(review): 'remove' is the only command that ends with True,
    # all the others return False - confirm this is intended
    if command == 'remove':
        self._ns_remove(opts)
        return True

    handlers = {
        'add':            self._ns_add,
        'show-counters':  self._ns_show_countres,
        'clear-counters': self._ns_clear_countres,
        'show-nodes':     self._ns_show_nodes,
        'show-node':      self._ns_show_node,
        'remove-all':     self._ns_remove_all,
    }

    # no sub-command at all falls back to showing the counters
    # NOTE(review): the counters handler reads opts.ports - verify it exists when no sub-command was parsed
    handler = handlers.get(command) if command else self._ns_show_countres

    if handler is None:
        raise TRexError('Unhandled command %s' % opts.command)

    handler(opts)
    return False
@console_api('portattr', 'common', True)
def set_port_attr_line (self, line):
    '''Sets port attributes '''
    # the docstring above is handed to the parser, keep it as is

    arg_specs = (parsing_opts.PORT_LIST_WITH_ALL,
                 parsing_opts.PROMISCUOUS,
                 parsing_opts.LINK_STATUS,
                 parsing_opts.LED_STATUS,
                 parsing_opts.FLOW_CTRL,
                 parsing_opts.VXLAN_FS,
                 parsing_opts.SUPPORTED,
                 parsing_opts.MULTICAST)

    parser = parsing_opts.gen_parser(self, "portattr", self.set_port_attr_line.__doc__, *arg_specs)
    opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), allow_empty = False)

    # translate the textual values through the lookup tables (unknown / missing values become None)
    translations = (('prom',      parsing_opts.ON_OFF_DICT),
                    ('mult',      parsing_opts.ON_OFF_DICT),
                    ('link',      parsing_opts.UP_DOWN_DICT),
                    ('led',       parsing_opts.ON_OFF_DICT),
                    ('flow_ctrl', parsing_opts.FLOW_CTRL_DICT))

    for attr, table in translations:
        setattr(opts, attr, table.get(getattr(opts, attr)))

    # if no attributes - fall back to printing the status
    attr_given = any(name not in ('all_ports', 'ports') and value is not None
                     for name, value in opts._get_kwargs())

    if not attr_given:
        ports = opts.ports if opts.ports else self.get_all_ports()
        ports_str = ' '.join(str(port) for port in ports)
        self.show_stats_line("--ps --port {0}".format(ports_str))
        return

    # list what the NICs support instead of setting anything
    if opts.supp:
        info = self.any_port.get_formatted_info() # assume for now all ports are same
        print('')
        print('Supported attributes for current NICs:')
        print(' Promiscuous: %s' % info['prom_supported'])
        print(' Multicast: yes')
        print(' Link status: %s' % info['link_change_supported'])
        print(' LED status: %s' % info['led_change_supported'])
        print(' Flow control: %s' % info['fc_supported'])
        print(' VXLAN FS: %s' % info['is_vxlan_supported'])
        print('')
        return

    if not opts.ports:
        raise TRexError('No acquired ports!')

    self.set_port_attr(opts.ports,
                       opts.prom,
                       opts.link,
                       opts.led,
                       opts.flow_ctrl,
                       opts.mult,
                       opts.vxlan_fs)
@console_api('events', 'basic', False)
def get_events_line (self, line):
    '''Shows events log\n'''
    # the docstring above is handed to the parser, keep it as is

    def bool_flag(short_name, long_name, help_text):
        # a store_true switch that is off by default
        return parsing_opts.ArgumentPack([short_name, long_name],
                                         {'action' : "store_true",
                                          'default': False,
                                          'help': help_text})

    switches = [bool_flag('-c', '--clear', "clear the events log"),
                bool_flag('-i', '--info',  "show info events"),
                bool_flag('-w', '--warn',  "show warning events")]

    parser = parsing_opts.gen_parser(self, "events", self.get_events_line.__doc__, *switches)
    opts = parser.parse_args(line.split())

    # collect the requested event types - None (no filter) when none was requested
    requested = [ev_type
                 for is_on, ev_type in ((opts.info, 'info'), (opts.warn, 'warning'))
                 if is_on]

    for ev in self.get_events(requested or None):
        self.logger.info(ev)

    if opts.clear:
        self.clear_events()
@console_api('clear', 'common', False)
def clear_stats_line (self, line):
    '''Clear cached local statistics\n'''
    # the docstring above is handed to the parser, keep it as is

    parser = parsing_opts.gen_parser(self, "clear", self.clear_stats_line.__doc__, parsing_opts.PORT_LIST_WITH_ALL)
    opts = parser.parse_args(line.split())

    self.clear_stats(opts.ports)

    return RC_OK()
def get_console_methods (self):
    """
        Collect the console API methods of this object

        :return:
            dict mapping the 'name' attribute of every bound method
            whose 'api_type' attribute equals 'console' to the method itself
    """
    def is_console_api(member):
        return inspect.ismethod(member) and getattr(member, 'api_type', None) == 'console'

    console_methods = {}
    for _, method in inspect.getmembers(self, predicate = is_console_api):
        console_methods[method.name] = method

    return console_methods
################## private common console functions ##################
def _show_global_stats (self, buffer = sys.stdout):
    """
        Synchronize the global stats over the RPC connection
        and print them as a table to 'buffer'
    """
    global_stats = self.global_stats
    global_stats.update_sync(self.conn.rpc)

    stats_table = global_stats.to_table()
    text_tables.print_table_with_header(stats_table, stats_table.title, buffer = buffer)
def _show_port_stats (self, ports, buffer = sys.stdout):
    """
        Print the stats of up to the first 4 of the given ports as one merged table,
        with an extra summary column when more than one port is shown

        :parameters:
            ports: list
                port IDs, a warning is logged when empty
            buffer:
                where the table is printed to
    """
    if not ports:
        self.logger.warning(format_text('Empty set of ports\n', 'bold'))
        return

    # only the first 4 ports are shown
    port_stats = [self.ports[port_id].get_port_stats() for port_id in ports[:4]]

    # one batch request refreshes all of them
    StatsBatch.update(port_stats, self.conn.rpc)

    tables = [stat.to_table() for stat in port_stats]

    # a total is meaningful only for more than one port
    if len(port_stats) > 1:
        total = PortStatsSum()
        for stats in port_stats:
            total += stats
        tables.append(total.to_table())

    merged = TRexTextTable.merge(tables)
    text_tables.print_table_with_header(merged, merged.title, buffer = buffer)
def _show_port_xstats (self, ports, include_zero_lines):
    """
        Print the xstats of up to the first 4 of the given ports as one merged table

        :parameters:
            ports: list
                port IDs, a warning is logged when empty
            include_zero_lines: bool
                when False, rows in which every value is '0' are filtered out
    """
    if not ports:
        self.logger.warning(format_text('Empty set of ports\n', 'bold'))
        return

    # only the first 4 ports are shown
    port_xstats = [self.ports[port_id].get_port_xstats() for port_id in ports[:4]]

    # one batch request refreshes all of them
    StatsBatch.update(port_xstats, self.conn.rpc)

    def keep_row(row):
        return include_zero_lines or any(value != '0' for value in row)

    xstats_tables = [stat.to_table(True) for stat in port_xstats]
    merged = TRexTextTable.merge(xstats_tables, row_filter = keep_row)

    text_tables.print_table_with_header(merged, merged.title)
def _show_port_status (self, ports):
    """
        Print the status of up to the first 4 of the given ports as one merged table

        :parameters:
            ports: list
                port IDs, a warning is logged when empty
    """
    if not ports:
        self.logger.warning(format_text('Empty set of ports\n', 'bold'))
        return

    # only the first 4 ports are shown
    status_tables = []
    for port_id in ports[:4]:
        status_tables.append(self.ports[port_id].get_port_status())

    merged = TRexTextTable.merge(status_tables)
    text_tables.print_table_with_header(merged, merged.title)
def _show_cpu_util (self, buffer = sys.stdout):
    """
        Synchronize the utilization stats over the RPC connection
        and print their 'cpu' table to 'buffer'
    """
    util_stats = self.util_stats
    util_stats.update_sync(self.conn.rpc)

    cpu_table = util_stats.to_table('cpu')
    text_tables.print_table_with_header(cpu_table, cpu_table.title, buffer = buffer)
def _show_mbuf_util (self, buffer = sys.stdout):
    """
        Synchronize the utilization stats over the RPC connection
        and print their 'mbuf' table to 'buffer'
    """
    util_stats = self.util_stats
    util_stats.update_sync(self.conn.rpc)

    mbuf_table = util_stats.to_table('mbuf')
    text_tables.print_table_with_header(mbuf_table, mbuf_table.title, buffer = buffer)