Netzob documentation¶
Netzob is an open source tool for reverse engineering, traffic generation and fuzzing of communication protocols. It can be used to infer the message format and the state machine of a protocol through passive and active processes. The model can afterward be used to simulate realistic and controllable traffic.
The main features of Netzob are:
- Protocol Vocabulary Modeling and Inference
- Netzob includes a complete model to represent the message format of a protocol (aka its vocabulary). Using specific algorithms, it can learn this format from provided traces.
- Protocol Grammar Modeling and Inference
- The state machine of a protocol (aka its grammar) defines the valid sequences of exchanged messages. Netzob can learn it semi-automatically using specific algorithms.
- Protocol Simulation
- To support the inference process, a dynamic analysis is performed based on simulated actors. These can initiate and take part in a complex communication following the inferred protocol.
Contact information¶
Website: | http://www.netzob.org |
---|---|
Email: | contact@netzob.org |
Mailing list: | User, developer and announcement lists are available; use the SYMPA web interface to register. |
IRC: | You can hang out with us on Freenode’s IRC channel #netzob @ freenode.org. |
Wiki: | Discuss strategy on Netzob’s wiki |
Twitter: | Follow Netzob’s official accounts (@Netzob) |
Netzob Overview¶
Overview of Netzob¶
Netzob was initiated by security auditors of AMOSSYS and the CIDre research team of Supélec to address the reverse engineering of communication protocols.
Originally, the development of Netzob was started to support security auditors and evaluators in their activities of modeling and simulating undocumented protocols. The tool was then extended to allow smart fuzzing of unknown protocols.
The following picture depicts the main modules of Netzob:
- Import module: Data import is available in two ways: either by leveraging the channel-specific captors (currently network and IPC – Inter-Process Communication), or by using specific importers (such as PCAP files, structured files and OSpy files).
- Protocol inference modules: The vocabulary and grammar inference methods constitute the core of Netzob. They allow both passive and active reverse engineering of communication flows through automated and manual mechanisms.
- Simulation module: Given vocabulary and grammar models previously inferred, Netzob can understand and generate communication traffic between multiple actors. It can act as either a client, a server or both.
- Export module: This module exports an inferred protocol model in formats that are understandable by third-party software or by a human. Current work focuses on export formats compatible with the main traffic dissectors (Wireshark and Scapy) and fuzzers (Peach and Sulley).
And here is a screenshot of the main graphical interface:
The following sections describe the available mechanisms in more detail.
Import and capture data¶
The first step in the inference process of a protocol in Netzob is to capture and import messages as samples. There are different methods to retrieve messages depending on the communication channel used (files, network, IPC, USB, etc.) and the format (PCAP, hex, raw binary flows, etc.).
The figure below describes the multiple communication channels, and therefore the possible sniffing points, that Netzob aims to address.
The current version (version 0.4) of Netzob deals with the following data sources:
- Live network communications
- Captured network communications (PCAPs)
- Inter-Process Communications (IPCs)
- Text and binary files
- API flows through oSpy file format support
Otherwise, if you plan to reverse a protocol implemented over an unsupported communication channel, Netzob can manipulate any communication flow through an XML representation. This situation therefore only requires a specific development to capture the targeted flow and save it in a compatible XML format.
Inferring message format and state machine with Netzob¶
The vocabulary of a communication protocol defines all the words which are part of it. For example, the vocabulary of a malware’s communication protocol looks like a set of possible commands: {“attack www.google.fr”, “dnspoison this.dns.server.com”, “execute ‘uname -a’”, ...}. Another example of a vocabulary is the set of valid words in the HTTP protocol: { “GET /images/logo.png HTTP/1.1 ...”, “HTTP/1.1 200 OK ...”, ...}.
Netzob’s vocabulary inferring process has been designed in order to retrieve the set of all possible words used in a targeted protocol and to identify their structures. Indeed words are made of different fields which are defined by their value and types. Hence a word can be described using the structure of its fields.
We describe the learning process implemented in Netzob to semi-automatically infer the vocabulary and the grammar of a protocol. This process, illustrated in the following picture, is performed in three main steps:
- Clustering messages and partitioning these messages in fields.
- Characterizing message fields and abstracting similar messages in symbols.
- Inferring the transition graph of the protocol.
Step 1: clustering Messages and Partitioning in Fields¶
To discover the format of a symbol, Netzob supports different partitioning approaches. In this article we describe the most accurate one, which leverages sequence alignment. This technique aligns invariants in a set of messages. The Needleman-Wunsch algorithm performs this task optimally. Needleman-Wunsch is particularly effective on protocols where dynamic fields have variable lengths (as shown in the following picture).
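As a rough illustration of the alignment step, the sketch below implements the textbook Needleman-Wunsch global alignment on two byte strings. It is not Netzob's implementation: the function name `needleman_wunsch`, the scoring values (`match`, `mismatch`, `gap`) and the use of `None` to mark gaps are assumptions chosen for readability.

```python
def needleman_wunsch(a: bytes, b: bytes, match=1, mismatch=-1, gap=-1):
    """Globally align two byte strings.

    Returns two equal-length lists of byte values, with None marking a gap.
    Scoring values are illustrative assumptions, not Netzob's parameters.
    """
    n, m = len(a), len(b)
    # score[i][j] holds the best score for aligning a[:i] with b[:j].
    score = [[0] * (m + 1) for _ in range(n + 1)]
    for i in range(1, n + 1):
        score[i][0] = i * gap
    for j in range(1, m + 1):
        score[0][j] = j * gap
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            sub = match if a[i - 1] == b[j - 1] else mismatch
            score[i][j] = max(score[i - 1][j - 1] + sub,
                              score[i - 1][j] + gap,
                              score[i][j - 1] + gap)

    # Walk back from the bottom-right corner to rebuild one optimal alignment.
    row_a, row_b = [], []
    i, j = n, m
    while i > 0 or j > 0:
        sub = match if i > 0 and j > 0 and a[i - 1] == b[j - 1] else mismatch
        if i > 0 and j > 0 and score[i][j] == score[i - 1][j - 1] + sub:
            row_a.append(a[i - 1]); row_b.append(b[j - 1]); i -= 1; j -= 1
        elif i > 0 and score[i][j] == score[i - 1][j] + gap:
            row_a.append(a[i - 1]); row_b.append(None); i -= 1
        else:
            row_a.append(None); row_b.append(b[j - 1]); j -= 1
    row_a.reverse()
    row_b.reverse()
    return row_a, row_b


row_a, row_b = needleman_wunsch(b"GET /a HTTP", b"GET /abc HTTP")
# Columns where both rows hold the same byte are candidate invariants;
# the remaining columns hint at a variable-length dynamic field.
mask = "".join("=" if x == y else "?" for x, y in zip(row_a, row_b))
```

The invariant columns found this way are what a partitioning step could turn into static fields, while the gapped region becomes a dynamic field.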
When partitioning and clustering processes are done, we obtain a relevant first approximation of the overall message formats. The next step consists in determining the characteristics of the fields.
If the size of those fields is fixed, as in TCP and IP headers, it is preferable to apply a basic partitioning, also provided by Netzob. Such partitioning works by aligning each message by the left, and then separating successive fixed columns from successive dynamic columns.
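A minimal sketch of such a left-aligned partitioning is shown below. The function name `partition_columns` and the `(kind, start, end)` tuple layout are hypothetical; the sketch only assumes that a column is static when every message carries the same byte at that offset.

```python
def partition_columns(messages):
    """Split left-aligned messages into runs of static and dynamic columns.

    Returns a list of (kind, start, end) tuples, with end exclusive.
    Expects a non-empty list of byte strings.
    """
    width = max(len(m) for m in messages)
    fields = []
    for pos in range(width):
        # Messages shorter than pos contribute None, which makes the
        # column dynamic as soon as lengths differ.
        column = {m[pos] if pos < len(m) else None for m in messages}
        kind = "static" if len(column) == 1 else "dynamic"
        if fields and fields[-1][0] == kind:
            # Extend the current run of columns of the same kind.
            fields[-1] = (kind, fields[-1][1], pos + 1)
        else:
            fields.append((kind, pos, pos + 1))
    return fields


samples = [b"\x01\x02AA\xff", b"\x01\x02BC\xff", b"\x01\x02BD\xff"]
layout = partition_columns(samples)
```

With the three sample messages, bytes 0 to 1 and byte 4 never change, so they form static fields around a two-byte dynamic field.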
To group aligned messages by similarity, the Needleman-Wunsch algorithm is used in conjunction with a clustering algorithm. The applied algorithm is UPGMA.
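The clustering idea can be sketched as follows. This is a plain UPGMA (average linkage) loop over a precomputed distance matrix, not Netzob's code: the function name `upgma`, the stopping `threshold` and the sample distances are assumptions. In practice the distances would be derived from alignment scores.

```python
def upgma(distances, threshold):
    """Cluster items with UPGMA (unweighted average linkage).

    distances: symmetric matrix (list of lists) of pairwise distances.
    threshold: hypothetical cut-off; merging stops once the two closest
               clusters are farther apart than this value.
    Returns a list of clusters, each a list of item indices.
    """
    clusters = [[i] for i in range(len(distances))]

    def average(c1, c2):
        # UPGMA distance: mean of all pairwise distances between members.
        total = sum(distances[i][j] for i in c1 for j in c2)
        return total / (len(c1) * len(c2))

    while len(clusters) > 1:
        dist, x, y = min(
            (average(clusters[x], clusters[y]), x, y)
            for x in range(len(clusters))
            for y in range(x + 1, len(clusters))
        )
        if dist > threshold:
            break
        merged = clusters[x] + clusters[y]
        clusters = [c for k, c in enumerate(clusters) if k not in (x, y)]
        clusters.append(merged)
    return clusters


# Hypothetical distances: messages 0 and 1 are similar, as are 2 and 3.
matrix = [
    [0, 1, 10, 10],
    [1, 0, 10, 10],
    [10, 10, 0, 1],
    [10, 10, 1, 0],
]
groups = upgma(matrix, threshold=5)
```

Each resulting group would then be treated as one candidate symbol whose messages share a common format.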
Step 2 : characterization of Fields¶
The field type identification partially derives from the partitioning inference step. For fields containing only invariants, the type merely corresponds to the invariant value. For other fields, the type is automatically materialized, as a first approximation, with a regular expression, as shown in the next figure. This form makes it easy to validate that data conforms to a specific type. Moreover, Netzob offers the possibility to visualize the definition domain of a field. This helps to manually refine the type associated with a field.
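To make the regular-expression idea concrete, here is a small sketch that turns a field layout into a compiled pattern. The helper `build_symbol_regex` and its input convention (a `bytes` value for an invariant field, a `(min_len, max_len)` tuple for a dynamic one) are assumptions for illustration and do not reflect Netzob's internal representation.

```python
import re


def build_symbol_regex(fields):
    """Build a bytes regex from a list of fields.

    Each field is either a bytes invariant or a (min_len, max_len) tuple
    describing a dynamic field (hypothetical convention).
    """
    parts = []
    for field in fields:
        if isinstance(field, bytes):
            # Invariant: must appear verbatim.
            parts.append(re.escape(field))
        else:
            low, high = field
            # Dynamic: any bytes within the length bounds, captured.
            parts.append(b"(.{%d,%d})" % (low, high))
    return re.compile(b"".join(parts), re.DOTALL)


pattern = build_symbol_regex([b"GET ", (1, 64), b" HTTP/1.1"])
match = pattern.fullmatch(b"GET /images/logo.png HTTP/1.1")
```

A message conforms to the symbol when `fullmatch` succeeds, and the captured groups give the values taken by the dynamic fields.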
Some intra-symbol dependencies are automatically identified. The size field, present in many protocol formats, is an example of an intra-symbol dependency. A search algorithm has been designed to look for potential size fields and their associated payloads. By extension, this technique makes it possible to discover encapsulated protocol payloads.
Environmental dependencies are also identified by looking for specific values retrieved during message capture. Such specific values consist of characteristics of the underlying hardware, operating system and network configuration. During the dependency analysis, these characteristics are searched for in various encodings.
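The source does not detail the search algorithm, so the following is only a naive sketch of the idea: a fixed-width integer at some offset is a size-field candidate if, in every message, the message length minus that integer is the same constant. The name `find_size_fields` and its parameters are hypothetical.

```python
def find_size_fields(messages, width=1, byteorder="big"):
    """Return (offset, delta) pairs for candidate size fields.

    A candidate at `offset` satisfies, for every message m:
        int(m[offset:offset + width]) == len(m) - delta
    Messages of varied lengths are needed, otherwise any constant
    column trivially qualifies.
    """
    candidates = []
    shortest = min(len(m) for m in messages)
    for offset in range(shortest - width + 1):
        deltas = {
            len(m) - int.from_bytes(m[offset:offset + width], byteorder)
            for m in messages
        }
        if len(deltas) == 1:
            candidates.append((offset, deltas.pop()))
    return candidates


# Hypothetical format: one type byte, one length byte, then the payload.
samples = [b"\x00\x03abc", b"\x00\x05hello", b"\x00\x01x"]
found = find_size_fields(samples)
```

Here the byte at offset 1 always equals the message length minus 2, which suggests it encodes the size of the payload that follows the two-byte header.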
Step 3: inferring the Transition Graph of the Protocol¶
The third step of the learning process discovers and extracts the transition graph of the targeted protocol (also called the grammar). More formally, the grammar of a communication protocol defines the set of valid sentences which can be produced by a communication. A sentence is an ordered sequence of words which may be received or emitted by a protocol handler. An example of a simple sentence is:
["attack www.google.fr", "attack has failed", "attack www.kernel.org", "root access granted."]
which can be described using the following simple automaton, with S0 the initial state:
The learning process step is achieved by a set of active experiments that stimulate a real client or server implementation using successive sequences of input symbols and analyze its responses.
In Netzob, the automaton used to model a communication protocol is an extended version of a Mealy automaton which includes semi-stochastic transitions and contextualized and parametrized inputs and outputs. The first academic presentation of this model is included in a dedicated scientific paper provided in the documentation section.
The model is inferred through a dedicated active process which consists in stimulating an implementation and analyzing its responses. In this process, we use the previously inferred vocabulary to discover and learn the grammar of the communication protocol. Each stimulation is computed following an extension of the Angluin L* algorithm.
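For readers unfamiliar with Mealy automata, the sketch below models the example sentence given earlier with a plain deterministic Mealy machine, where each transition consumes an input word and emits an output word. It deliberately omits Netzob's extensions (semi-stochastic transitions, contextualized inputs and outputs). The class `MealyMachine`, the state names `S1` and `S2`, and the split of the sentence into inputs and outputs are assumptions.

```python
class MealyMachine:
    """Deterministic Mealy machine: (state, input) -> (next state, output)."""

    def __init__(self, initial_state, transitions):
        self.state = initial_state
        self.transitions = transitions

    def step(self, symbol):
        # An unknown (state, input) pair raises KeyError: the word is not
        # valid in the current state of the grammar.
        self.state, output = self.transitions[(self.state, symbol)]
        return output


# Hypothetical grammar: only S0 is named in the text above.
transitions = {
    ("S0", "attack www.google.fr"): ("S1", "attack has failed"),
    ("S1", "attack www.kernel.org"): ("S2", "root access granted."),
}

machine = MealyMachine("S0", transitions)
replies = [machine.step(word)
           for word in ("attack www.google.fr", "attack www.kernel.org")]
```

Interleaving the inputs with `replies` gives back the four-word sentence from the example.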
Protocol simulation¶
One of our main goals is to generate realistic network traffic from undocumented protocols. Therefore, we have implemented a dedicated module that, given previously inferred vocabulary and grammar models, can simulate a communication protocol between multiple bots and masters. Although they share the same model, each actor is independent from the others and is organized around three main stages.
The first stage is a dedicated library that reads from and writes to the network channel. It also parses the flow into messages according to the underlying protocol layers. The second stage uses the vocabulary to abstract received messages into symbols and, conversely, to specialize emitted symbols into messages. A memory buffer is also available to manage dependency relations. The last stage implements the grammar model and computes which symbols must be emitted or received according to the current state and time.
Smart fuzzing with Netzob¶
A typical example of dynamic vulnerability analysis is robustness testing. It can be used to reveal software programming errors which can lead to security vulnerabilities. These tests provide an efficient and almost automated solution to easily identify and study exposed surfaces of systems. Nevertheless, to be fully efficient, fuzzing approaches must cover the complete definition domain and combination of all the variables which exist in a protocol (IP addresses, serial numbers, size fields, payloads, message identifiers, etc.). But fuzzing a typical communication interface requires too many test cases due to the complex variation domains introduced by the semantic layer of a protocol. In addition, efficient fuzzing should also cover the state machine of a protocol, which brings another huge set of variations. The necessary time is nearly always too high and therefore limits the efficiency of this approach.
With all these constraints, achieving robustness tests on a target is feasible only if the expert has access to a tool specially designed for the targeted protocol. Hence the emergence of a large number of tools to verify the behavior of an application on one or more communication protocols. However, in the context of proprietary communication protocols for which no specifications are published, fuzzers do not provide optimal results.
Netzob helps the security evaluator by simplifying the creation of a dedicated fuzzer for a proprietary or undocumented protocol. It allows the expert to execute a semi-automated inferring process to create a model of the targeted protocol. This model can afterward be refined by the evaluator. Finally, the created model is included in the fuzzing module of Netzob which considers the vocabulary and the grammar of the protocol to generate optimized and specific test cases. Both mutation and generation are available for fuzzing.
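The difference between the two fuzzing strategies can be sketched in a few lines. This is a generic illustration, not Netzob's fuzzing module: `mutate` alters a captured message, while `generate` builds a new message from a model. The field convention (a `bytes` invariant or a `(min_len, max_len)` tuple) and both function names are hypothetical.

```python
import random


def mutate(message: bytes, rng: random.Random, flips: int = 1) -> bytes:
    """Mutation-based fuzzing: flip random bits of a non-empty sample."""
    data = bytearray(message)
    for _ in range(flips):
        position = rng.randrange(len(data))
        data[position] ^= 1 << rng.randrange(8)
    return bytes(data)


def generate(fields, rng: random.Random) -> bytes:
    """Generation-based fuzzing: build a message from a field model."""
    out = bytearray()
    for field in fields:
        if isinstance(field, bytes):
            out += field  # invariant field, emitted as-is
        else:
            low, high = field
            length = rng.randint(low, high)
            out += bytes(rng.randrange(256) for _ in range(length))
    return bytes(out)


rng = random.Random(0)  # fixed seed so that a test campaign can be replayed
mutated = mutate(b"hello", rng)
generated = generate([b"hello ", (5, 5), b", welcome !"], rng)
```

Generation relies on the inferred model to keep invariant fields valid, which is why a vocabulary and a grammar help the fuzzer reach deeper protocol states than blind mutation.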
Export protocol model¶
The following export formats are currently provided by Netzob:
- XML format
- human readable (Wireshark like)
- Peach fuzzer export: this makes it possible to use the Peach fuzzer on previously undocumented protocols.
Besides, you can write your own exporter to manipulate the inferred protocol model in your favorite tool.
Tutorials¶
- Discover features of Netzob
- The goal of this tutorial is to present the usage of each main component of Netzob (inference of message format, construction of the state machine, generation of traffic and fuzzing) through an undocumented protocol.
- Modeling your Protocol with Netzob
- This tutorial details the main features of Netzob’s protocol modeling aspects. It shows how your protocol fields can be described with Netzob’s language.
API Documentation¶
netzob package¶
Subpackages¶
netzob.Common package¶
Subpackages¶
class JSONSerializator
    Bases: object

    static serialize(obj)
        Serialize the specified object under a specific JSON format. It inspects the specified object to search for attributes to serialize.

        >>> from netzob.all import *
        >>> msg = RawMessage("hello")
        >>> print(JSONSerializator.serialize(msg))

        It’s not possible to serialize a None object:

        >>> JSONSerializator.serialize(None)
        Traceback (most recent call last):
        ...
        TypeError: Cannot serialize a None object

        Parameters: obj (object) – the object to serialize
        Returns: the object serialized in JSON
        Return type: str
class AbstractMementoCreator
    Bases: object

    Parent class of objects to save for Undo/Redo.

    This abstract class must be inherited by all objects that need to be saved for Undo/Redo processes. These objects have to provide two methods, storeInMemento and restoreFromMemento, used respectively to save and to restore the current state of the object.

    restoreFromMemento(memento)
        This method restores the internals of the current object from the provided memento.

        The provided memento should have been created by the storeInMemento method and represents the current object. The method returns the state of the object before the restore operation.

        Parameters: memento (netzob.Common.Utils.UndoRedo.AbstractMemento.AbstractMemento) – memento containing the internals to set in the current object to restore it.
        Returns: the memento of the current object before executing the restore process
        Return type: netzob.Common.Utils.UndoRedo.AbstractMemento.AbstractMemento

    storeInMemento()
        This method creates a memento to represent the current state of the object.

        This memento should be stored in the UndoRedo action stack and might be used as a parameter of the restoreFromMemento method.

        Returns: the created memento representing the current object
        Return type: netzob.Common.Utils.UndoRedo.AbstractMemento.AbstractMemento
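The contract of the two methods can be illustrated with a toy class. This sketch does not use Netzob's classes: `CounterModel` is hypothetical and a plain dict stands in for an AbstractMemento. It only shows why returning the pre-restore state from restoreFromMemento makes redo possible.

```python
class CounterModel:
    """Toy object following the storeInMemento / restoreFromMemento contract."""

    def __init__(self):
        self.value = 0

    def storeInMemento(self):
        # A dict stands in for a real memento object (assumption).
        return {"value": self.value}

    def restoreFromMemento(self, memento):
        # Capture the state before restoring so the caller can redo later.
        before = self.storeInMemento()
        self.value = memento["value"]
        return before


model = CounterModel()
model.value = 1
saved = model.storeInMemento()   # snapshot pushed on an undo stack
model.value = 5

redo_memento = model.restoreFromMemento(saved)  # undo: value is 1 again
model.restoreFromMemento(redo_memento)          # redo: value is 5 again
```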
NetzobLogger(klass)
    This class decorator adds (if necessary) an instance of the logger (self.__logger) to the attached class and removes the logger from the getState.
typeCheck(*types)
    Decorator which reduces the amount of code needed to type-check attributes.

    It allows replacing the following code:

    @id.setter
    def id(self, id):
        if not isinstance(id, uuid.UUID):
            raise TypeError("Invalid types for argument id, must be an UUID")
        self.__id = id

    with:

    @id.setter
    @typeCheck(uuid.UUID)
    def id(self, id):
        self.__id = id

    Note: set type = “SELF” to check the type of the self parameter.

    Warning: if an argument is None, type checking is not executed on it.
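A decorator with this behavior could be written roughly as follows. This is an independent sketch under assumptions, not the code shipped with Netzob: it is named `type_check` to avoid confusion with the real typeCheck, it only handles positional arguments, and it does not implement the “SELF” option.

```python
import functools
import uuid


def type_check(*types):
    """Check positional arguments (after self) against the given types."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args):
            for arg, expected in zip(args, types):
                # As in the warning above, None values are not checked.
                if arg is not None and not isinstance(arg, expected):
                    raise TypeError(
                        "Invalid type for argument, expecting: %s" % expected)
            return func(self, *args)
        return wrapper
    return decorator


class Message:
    """Hypothetical class used only to exercise the decorator."""

    def __init__(self):
        self.__id = uuid.uuid4()

    @property
    def id(self):
        return self.__id

    @id.setter
    @type_check(uuid.UUID)
    def id(self, value):
        self.__id = value
```

Stacking the decorator under `@id.setter` keeps the setter body down to the assignment itself, which is the reduction in boilerplate described above.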
class MatrixList
    Bases: list

    This type of list represents a matrix, which means it is a list of lists.

    The __str__ method has been redefined to provide a readable representation of its content.

    headers
        A list of sorted strings. Each string will be displayed as a column header.
class TypedList(membersTypes, *args)
    Bases: collections.abc.MutableSequence

    A strongly typed list based on collections.abc.MutableSequence.

    The idea is to verify the type of members when editing the list. By using this class instead of the usual list, we enforce the type of its members.

    >>> typedList = TypedList(str)
    >>> typedList.append("toto")
    >>> typedList.extend(["titi", "tata"])
    >>> len(typedList)
    3
    >>> typedList[1]
    'titi'
    >>> typedList.append(3)
    Traceback (most recent call last):
    TypeError: Invalid type for argument, expecting: <type 'str'>
    >>> typedList.extend(["tutu", 5])
    Traceback (most recent call last):
    TypeError: Invalid type for argument, expecting: <type 'str'>
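To show how a type-enforcing list can be built on collections.abc.MutableSequence, here is a compact sketch. `SimpleTypedList` is a hypothetical stand-in, not Netzob's TypedList; it only supports integer indexes, and its error message is an assumption.

```python
from collections.abc import MutableSequence


class SimpleTypedList(MutableSequence):
    """List that only accepts members of a given type (illustrative sketch)."""

    def __init__(self, member_type, *items):
        self._member_type = member_type
        self._items = []
        self.extend(items)

    def _check(self, value):
        if not isinstance(value, self._member_type):
            raise TypeError(
                "Invalid type for argument, expecting: %s" % self._member_type)

    # The five methods below are all MutableSequence requires; append,
    # extend, pop, remove and others are derived from them automatically.
    def __len__(self):
        return len(self._items)

    def __getitem__(self, index):
        return self._items[index]

    def __delitem__(self, index):
        del self._items[index]

    def __setitem__(self, index, value):
        self._check(value)
        self._items[index] = value

    def insert(self, index, value):
        self._check(value)
        self._items.insert(index, value)


names = SimpleTypedList(str)
names.append("toto")
names.extend(["titi", "tata"])
```

Because the inherited `extend` appends items one by one, a call such as `extend(["tutu", 5])` in this sketch adds "tutu" before raising on the integer.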
Submodules¶
netzob.Common.CommandLine module¶
class CommandLine
    Bases: object

    Reads, validates and parses the command line arguments provided by users.

    configure()
        Configure the parser based on Netzob’s usage and the definition of its options and arguments.

    getConfiguredParser()
        Return (if available) the parser configured to manage the arguments and options provided by the user.

        Returns: the parser
netzob.Common.DepCheck module¶
netzob.Common.LoggingConfiguration module¶
netzob.Common.NetzobException module¶
exception NetzobException(value)
    Bases: Exception

    Class for handling Netzob-specific exceptions.

exception NetzobImportException(source, message, statusCode=None, subCode=None)
    Bases: netzob.Common.NetzobException.NetzobException

    Raised if an error was encountered while importing data.
netzob.Common.all module¶
Module contents¶
netzob.Import package¶
Subpackages¶
class IPDecoderForICMP
    Bases: netzob.Import.PCAPImporter.ImpactDecoder.Decoder

    This class was added to parse the IP header of ICMP unreachable packets. If you use the “standard” IPDecoder, ImpactPacket.py might crash (see bug #4870) because the TCP header inside the IP header is incomplete.

class Data(aBuffer=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.Header

    This packet type can hold raw data. It’s normally employed to hold a packet’s innermost layer’s contents in those cases for which the protocol details are unknown, and there’s a copy of a valid packet available.

    For instance, if all that’s known about a certain protocol is that a UDP packet with its contents set to “HELLO” initiates a new session, creating such a packet is as simple as in the following code fragment:

    packet = UDP()
    packet.contains('HELLO')
class Ethernet(aBuffer=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.Header

    get_tag(index)
        Returns an EthernetTag initialized from the index-th VLAN tag. The tags are numbered from 0 to self.tag_cnt-1 as they appear in the frame. It is possible to use negative indexes as well.

    pop_tag(index=0)
        Removes the index-th VLAN tag and returns it as an EthernetTag object. Index defaults to 0 (the top of the stack).

class EthernetTag(value=2164260864)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.PacketBuffer

    Represents a VLAN header specified in IEEE 802.1Q and 802.1ad. Provides methods for convenient manipulation of header fields.
class Header(length=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.PacketBuffer, netzob.Import.PCAPImporter.ImpactPacket.ProtocolLayer

    This is the base class from which all protocol definitions extend.

    ethertype = None

    get_header_size()
        Return the size of this header, counting neither the size of the children nor that of the parents.

    get_packet()
        Returns the raw representation of this packet and its children as a string. The output from this method is a packet ready to be transmitted over the wire.

    get_pseudo_header()
        Pseudo headers can be used to limit the content over which checksums are calculated.

    load_header(aBuffer)
        Properly set the state of this instance to reflect that of the raw packet passed as argument.

    packet_printable = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '

    protocol = None
class ICMP(aBuffer=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.Header

    ICMP_UNREACH = 3
    protocol = 1

class IP(aBuffer=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.Header

    ethertype = 2048

class IPOption(opcode=0, size=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.PacketBuffer

    IPOPT_EOL = 0
    IPOPT_LSRR = 131
    IPOPT_NOP = 1
    IPOPT_RR = 7
    IPOPT_SSRR = 137
    IPOPT_TS = 68

class LinuxSLL(aBuffer=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.Header

    type_descriptions = ['sent to us by somebody else', 'broadcast by somebody else', 'multicast by somebody else', 'sent to somebody else to somebody else', 'sent by us']
class PacketBuffer(length=None)
    Bases: object

    Implements the basic operations used to operate on a packet’s raw buffer. All the packet classes derive from this one.

    The byte, word, long and ip_address getters and setters accept negative indexes, which have a similar effect as in a regular Python sequence slice.

    compute_checksum(anArray)
        Return the one’s complement of the one’s complement sum of all the 16-bit words in ‘anArray’.

    get_long(index, order='!')
        Return 4-byte value at ‘index’. See struct module’s documentation to understand the meaning of ‘order’.

    get_long_long(index, order='!')
        Return 8-byte value at ‘index’. See struct module’s documentation to understand the meaning of ‘order’.

    get_word(index, order='!')
        Return 2-byte word at ‘index’. See struct module’s documentation to understand the meaning of ‘order’.

    set_checksum_from_data(index, data)
        Set 16-bit checksum at ‘index’ by calculating checksum of ‘data’.

    set_long(index, value, order='!')
        Set 4-byte ‘value’ at ‘index’. See struct module’s documentation to understand the meaning of ‘order’.
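The checksum and the ‘order’ parameter described above can be illustrated with the standard struct module. These standalone functions are a sketch and not the PacketBuffer methods themselves: they take the buffer as an explicit argument, and padding odd-length input with a zero byte is an assumption taken from the usual Internet checksum definition.

```python
import struct


def get_word(buffer: bytes, index: int, order: str = "!") -> int:
    """Read a 2-byte unsigned word; 'order' is a struct byte-order prefix
    ('!' network/big-endian, '<' little-endian, '>' big-endian)."""
    return struct.unpack_from(order + "H", buffer, index)[0]


def compute_checksum(data: bytes) -> int:
    """One's complement of the one's complement sum of all 16-bit words."""
    if len(data) % 2:
        data += b"\x00"  # pad to a whole number of 16-bit words
    total = sum(struct.unpack("!%dH" % (len(data) // 2), data))
    # Fold the carries back into the low 16 bits (end-around carry).
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF


# 20-byte IPv4 header with its checksum field (bytes 10-11) set to zero.
header = bytes.fromhex("450000730000400040110000c0a80001c0a800c7")
checksum = compute_checksum(header)

# Writing the checksum back makes the whole header sum to zero.
complete = header[:10] + struct.pack("!H", checksum) + header[12:]
```

A receiver validates a header by recomputing the checksum over the complete header, including the checksum field, and expecting 0.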
class ProtocolLayer
    Bases: object

    Protocol Layer Manager for insertion and removal of protocol layers.

class ProtocolPacket(header_size, tail_size)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.ProtocolLayer

    body

    body_string

    header

    load_body(aBuffer)
        Load the packet body from a string. WARNING: Using this function will break the hierarchy of the preceding protocol layer.

    load_packet(aBuffer)
        Load the whole packet from a string. WARNING: Using this function will break the hierarchy of the preceding protocol layer.

    tail

    tail_string
class TCP(aBuffer=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.Header

    TCP_FLAGS_MASK = 255

    get_packet()
        Returns the entire packet including child data as a string. This is the function used to extract the final packet.

    protocol = 6

class TCPOption(kind, data=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.PacketBuffer

    TCPOPT_EOL = 0
    TCPOPT_MAXSEG = 2
    TCPOPT_NOP = 1
    TCPOPT_SACK = 5
    TCPOPT_SACK_PERMITTED = 4
    TCPOPT_SIGNATURE = 19
    TCPOPT_TIMESTAMP = 8
    TCPOPT_WINDOW = 3

class UDP(aBuffer=None)
    Bases: netzob.Import.PCAPImporter.ImpactPacket.Header

    protocol = 17
Submodules¶
netzob.Import.all module¶
Module contents¶
netzob.Inference package¶
Subpackages¶
class MealyLSTAR(inputVocabulary, membershipOracle)
    Bases: object

    This class is an implementation of the Angluin L* algorithm as detailed in “Learning regular sets from queries and counterexamples” [Ang87].

    This active grammatical inference algorithm infers a state machine. It communicates with a target by sending membership queries, which requires access to an implementation of the protocol.

    To illustrate its usage, we will infer the grammar of a fake simple protocol.

    >>> from netzob.all import *
    >>> import time

    We first create a fake server which requires a vocabulary of input (I) and output (O) symbols:

    >>> i0 = Symbol(name="a", fields=[Field("a\n")])
    >>> i1 = Symbol(name="b", fields=[Field("b\n")])
    >>> i2 = Symbol(name="c", fields=[Field("c\n")])
    >>> i3 = Symbol(name="d", fields=[Field("d\n")])
    >>> # List of Client > Server messages
    >>> I = [i0, i1, i2, i3]

    >>> o0 = Symbol(name="0", fields=[Field("0")])
    >>> o1 = Symbol(name="1", fields=[Field("1")])
    >>> o2 = Symbol(name="2", fields=[Field("2")])
    >>> o3 = Symbol(name="3", fields=[Field("3")])
    >>> # List of Server > Client messages
    >>> O = [o0, o1, o2, o3]

    >>> symbolList = I + O

    Now we can create the grammar, which includes 5 states

    >>> s0 = State(name="S0")
    >>> s1 = State(name="S1")
    >>> s2 = State(name="S2")
    >>> s3 = State(name="S3")
    >>> s4 = State(name="S4")

    and their transitions

    >>> t0 = Transition(s0, s1, i0, [o0])
    >>> t1 = Transition(s1, s1, i1, [o1])
    >>> t2 = Transition(s1, s2, i2, [o2])
    >>> t3 = Transition(s2, s1, i1, [o1])
    >>> t4 = Transition(s2, s3, i0, [o0])
    >>> t5 = Transition(s3, s1, i1, [o1])
    >>> t6 = Transition(s3, s4, i2, [o2])
    >>> t7 = Transition(s1, s4, i0, [o1])

    We add an initial state and an ending state with open and close channel transitions

    >>> initialState = State(name="Initial")
    >>> endingState = State(name="End")
    >>> openTransition = OpenChannelTransition(initialState, s0)
    >>> closeTransition = CloseChannelTransition(s4, endingState)
    >>> automata = Automata(initialState, symbolList)

    >>> # Create an actor: Alice (a server)
    >>> channel = UDPServer(localIP="127.0.0.1", localPort=8887)
    >>> abstractionLayer = AbstractionLayer(channel, symbolList)
    >>> alice = Actor(automata = automata, initiator = False, abstractionLayer=abstractionLayer)
    >>> alice.start()

    We finally create an Angluin-based grammar learner

    >>> # Creates an inference channel
    >>> angluinChannel = UDPClient(remoteIP="127.0.0.1", remotePort=8887)
    # >>> angluin = MealyLSTAR(inputSymbols = I, outputSymbols = O, channel=angluinChannel)
    # >>> angluin.start()

    # We wait for the results
    >>> time.sleep(10)
    # >>> while (angluin.alive): time.sleep(5)
    >>> print("Inference finish")
    Inference finish

    >>> alice.stop()

    >>> print(angluin.initialStateOfInferedGrammar)
    State

    [Ang87] Angluin, Dana. “Learning regular sets from queries and counterexamples”. Inf. Comput., vol. 75, pp. 87–106, November 1987.

    hypothesisModel

    inputVocabulary

    membershipOracle
class EntropyMeasurement
    Bases: object

    This utility class exposes various methods related to entropy. This measure can be useful to identify encrypted and compressed chunks of data across various messages. By entropy we refer to Shannon’s entropy.

    >>> import binascii
    >>> from netzob.all import *
    >>> fake_random_values = [b"00000906", b"00110906", b"00560902", b"00ff0901"]
    >>> messages = [RawMessage(binascii.unhexlify(val)) for val in fake_random_values]
    >>> [byte_entropy for byte_entropy in EntropyMeasurement.measure_entropy(messages)]
    [0.0, 2.0, 0.0, 1.5]

    In the following example, 1000 messages are generated under a simple specification. In the specification, 5 bytes are randomly generated. This specificity can easily be spotted by the entropy measurement, as illustrated below.

    >>> f1 = Field(b"hello ")
    >>> f2 = Field(Raw(nbBytes=5))
    >>> f3 = Field(b", welcome !")
    >>> s = Symbol(fields=[f1, f2, f3])
    >>> messages = [RawMessage(s.specialize()) for x in range(1000)]
    >>> bytes_entropy = [byte_entropy for byte_entropy in EntropyMeasurement.measure_entropy(messages)]
    >>> min(bytes_entropy[6:11]) > 7
    True

    You can also measure the entropy of the data that are accepted by a specific field.

    >>> f1 = Field(Raw(nbBytes=2))
    >>> f2 = Field(Raw(nbBytes=(10, 20)))
    >>> f3 = Field(Raw(nbBytes=2))
    >>> s = Symbol(fields=[f1, f2, f3])
    >>> s.messages = [RawMessage(s.specialize()) for x in range(1000)]
    >>> bytes_entropy = [byte_entropy for byte_entropy in EntropyMeasurement.measure_values_entropy(f2.getValues())]
    >>> print(min(bytes_entropy[:10]) > 7)
    True

    static measure_entropy(messages)
        This method returns the entropy of bytes found at each position of the messages.

        >>> [x for x in EntropyMeasurement.measure_entropy(messages=None)]
        Traceback (most recent call last):
        ...
        Exception: Messages cannot be None

        >>> from netzob.all import *
        >>> [x for x in EntropyMeasurement.measure_entropy(messages=[RawMessage()])]
        Traceback (most recent call last):
        ...
        Exception: At least two messages must be provided

    static measure_values_entropy(values)
        This method returns the entropy of bytes found at each position of the specified values.

        >>> [x for x in EntropyMeasurement.measure_values_entropy(values=None)]
        Traceback (most recent call last):
        ...
        Exception: values cannot be None

        >>> from netzob.all import *
        >>> [x for x in EntropyMeasurement.measure_values_entropy(values=[])]
        Traceback (most recent call last):
        ...
        Exception: At least one value must be provided
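The per-position measure can be reproduced without Netzob to see where the values in the first example come from. The function `byte_entropy_per_position` below is a standalone sketch, not the library's implementation; in particular, simply skipping messages that are too short for a given position is an assumption.

```python
import binascii
import math
from collections import Counter


def byte_entropy_per_position(messages):
    """Shannon entropy (in bits) of the byte values seen at each offset."""
    width = max(len(m) for m in messages)
    entropies = []
    for pos in range(width):
        column = [m[pos] for m in messages if pos < len(m)]
        total = len(column)
        entropy = sum(
            (count / total) * math.log2(total / count)
            for count in Counter(column).values()
        )
        entropies.append(entropy)
    return entropies


raw = [b"00000906", b"00110906", b"00560902", b"00ff0901"]
values = byte_entropy_per_position([binascii.unhexlify(v) for v in raw])
```

With these four messages the sketch yields `[0.0, 2.0, 0.0, 1.5]`: positions 0 and 2 are constant, position 1 takes four distinct values (2 bits), and position 3 takes one value twice and two others once (1.5 bits). A fully random byte position approaches 8 bits, which is why the examples above test for values greater than 7.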
Submodules¶
netzob.Inference.all module¶
Module contents¶
netzob.Model package¶
Subpackages¶
class SVAS
    Bases: object

    Represents a State Variable Assignment Strategy.

    The SVAS of a variable defines how its value is used while abstracting and specializing. The SVAS impacts the memorization strategy.

    CONSTANT = 'Constant SVAS'
    EPHEMERAL = 'Ephemeral SVAS'
    PERSISTENT = 'Persistent SVAS'
    VOLATILE = 'Volatile SVAS'

class EncodingFunction
    Bases: netzob.Common.Utils.SortableObject.SortableObject

    Represents a function which modifies the encoding of data.

    The application of these functions is prioritized using a SortedTypedList, hence every filter needs to set its application priority.

class ApplicativeData(name, value, _id=None)
    Bases: object

    An applicative data represents information used by the application that generated the captured flows. It can be the player name or the user email address if this information is used somehow by the protocol.

    An applicative data can be created out of any information.

    >>> from netzob.all import *
    >>> app = ApplicativeData("Username", ASCII("toto"))
    >>> print(app.name)
    Username

    >>> app1 = ApplicativeData("Email", ASCII("contact@netzob.org"))
    >>> print(app1.value)
    ASCII=contact@netzob.org ((0, 144))

    id
        The unique id of the applicative data.
        Type: uuid.UUID

    name
        The name of the applicative data.
        Type: str

    value
        The value of the applicative data.
        Type: object
Submodules¶
netzob.Model.Protocol module¶
netzob.Model.all module¶
Module contents¶
netzob.Simulator package¶
Subpackages¶
-
class
AbstractChannel
(isServer, _id=UUID('ad73f16c-7b5f-411b-ae31-ca12bb5544d8'))[source]¶ Bases:
object
-
DEFAULT_WRITE_COUNTER_MAX
= -1¶
-
TYPE_IPCLIENT
= 2¶
-
TYPE_RAWETHERNETCLIENT
= 3¶
-
TYPE_RAWIPCLIENT
= 1¶
-
TYPE_SSLCLIENT
= 4¶
-
TYPE_TCPCLIENT
= 5¶
-
TYPE_TCPSERVER
= 6¶
-
TYPE_UDPCLIENT
= 7¶
-
TYPE_UDPSERVER
= 8¶
-
TYPE_UNDEFINED
= 0¶
-
channelType
¶ Returns if the communication channel type
Returns: the type of the communication channel Type: int
-
static
getLocalIP
(remoteIP)[source]¶ Retrieve the source IP address which will be used to connect to the destination IP address.
-
static
getLocalInterface
(localIP)[source]¶ Retrieve the network interface name associated with a specific IP address.
-
id
¶ the unique identifier of the channel
Type: uuid.UUID
-
isOpen
¶ Returns if the communication channel is open
Returns: the status of the communication channel Type: bool
-
isServer
¶ isServer indicates if this side of the channel plays the role of a server.
Type: bool
-
open
(timeout=None)[source]¶ Open the communication channel. If the channel is a server, it starts to listen and will create an instance for each different client.
Parameters: timeout – the maximum time to wait for a client to connect
-
read
(timeout=None)[source]¶ Read the next message on the communication channel.
@keyword timeout: the maximum time in millisecond to wait before a message can be reached @type timeout:
int
-
sendReceive
(data, timeout=None)[source]¶ Write the specified data on the communication channel and return the corresponding response.
Parameters: - data (binary object) – the data to write on the channel
- timeout (int)
-
setWriteCounterMax
(maxValue)[source]¶ Change the maximum number of writes. When it is reached, no packet can be sent until clearWriteCounter() is called. If maxValue == -1, the sending limit is deactivated.
Parameters: maxValue (int) – the new max value
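The write-limit semantics described for `setWriteCounterMax` can be illustrated with a small stand-alone class. `WriteCounter` and its method names are hypothetical; only the `-1` sentinel and the need to clear the counter come from the description above.

```python
class WriteCounter:
    """Hypothetical helper mirroring the documented write-limit semantics."""

    NO_LIMIT = -1  # same sentinel as DEFAULT_WRITE_COUNTER_MAX

    def __init__(self, max_value=NO_LIMIT):
        self.max_value = max_value
        self.count = 0

    def can_write(self):
        # With the sentinel, the sending limit is deactivated.
        return self.max_value == self.NO_LIMIT or self.count < self.max_value

    def record_write(self):
        if not self.can_write():
            raise RuntimeError("write limit reached; clear the counter first")
        self.count += 1

    def clear(self):
        self.count = 0


counter = WriteCounter(max_value=2)
counter.record_write()
counter.record_write()
print(counter.can_write())  # prints False: the limit of 2 is reached
counter.clear()
print(counter.can_write())  # prints True: the counter was reset
```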
-
write
(data, rate=None, duration=None)[source]¶ Write the specified data on the communication channel.
Parameters: - data (bytes object) – the data to write on the channel
- rate (int) – the bandwidth, in octets, to respect during traffic emission (should be used with the duration parameter)
- duration (int) – how many seconds the symbol is continuously written on the channel
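To make the interplay of `rate` and `duration` concrete, here is a minimal pacing sketch. It assumes `rate` means octets per second, which the parameter description does not state explicitly; `write_paced` and `FakeClock` are hypothetical names, and the fake clock only exists so the example runs instantly and deterministically.

```python
import time


def write_paced(send, data, rate=None, duration=None,
                clock=time.monotonic, sleep=time.sleep):
    """Call send(data) once, or repeatedly for `duration` seconds.

    `rate` is interpreted as bytes per second (an assumption).
    Returns the total number of bytes handed to `send`.
    """
    if duration is None:
        send(data)
        return len(data)
    sent = 0
    start = clock()
    while clock() - start < duration:
        send(data)
        sent += len(data)
        if rate:
            # Wait until `sent` bytes fit within the rate budget.
            pause = sent / rate - (clock() - start)
            if pause > 0:
                sleep(pause)
    return sent


class FakeClock:
    """Deterministic clock: sleeping simply advances the current time."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


fake = FakeClock()
chunks = []
total = write_paced(chunks.append, b"12345", rate=10, duration=2,
                    clock=fake.time, sleep=fake.sleep)
print(total, len(chunks))  # prints 20 4
```

With a 5-byte message, a rate of 10 and a duration of 2, the loop emits four messages, that is 20 bytes, which equals rate times duration.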
-
-
class
SSLClient
(remoteIP, remotePort, localIP=None, localPort=None, timeout=2, server_cert_file=None, alpn_protocols=None)[source]¶ Bases:
netzob.Simulator.Channels.AbstractChannel.AbstractChannel
An SSLClient is a communication channel that relies on SSL. It creates a client that connects to a specific IP:Port server over a TCP/SSL socket.
When the actor executes an OpenChannelTransition, it calls the open method on the SSL client, which connects to the server.
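The `server_cert_file` and `alpn_protocols` constructor arguments map naturally onto settings of Python's standard `ssl` module. The sketch below shows one way such a client context could be prepared; `build_client_context` is a hypothetical helper, and treating `server_cert_file` as a file of trusted certificates is an assumption rather than documented Netzob behaviour.

```python
import ssl


def build_client_context(server_cert_file=None, alpn_protocols=None):
    """Build an ssl.SSLContext suitable for a client connection."""
    # With cafile=None, the system's default CA certificates are loaded.
    context = ssl.create_default_context(cafile=server_cert_file)
    if alpn_protocols:
        # Advertise the application protocols during the TLS handshake.
        context.set_alpn_protocols(alpn_protocols)
    return context


context = build_client_context(alpn_protocols=["h2", "http/1.1"])
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)
```

The resulting context would then wrap a TCP socket with `context.wrap_socket(sock, server_hostname=...)` before any data is exchanged.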
-
localIP
¶ Local IP address used by the client.
Type: str
-
localPort
¶ Local TCP port used by the client. Its value must be above 0 and under 65535.
Type: int
-
open
(timeout=None)[source]¶ Open the communication channel. If the channel is a client, it starts to connect to the specified server.
-
read
(timeout=None)[source]¶ Read the next message on the communication channel.
Parameters: timeout (int) – the maximum time, in milliseconds, to wait for a message to arrive
-
remoteIP
¶ IP on which the server will listen.
Type: str
-
remotePort
¶ TCP Port on which the server will listen. Its value must be above 0 and under 65535.
Type: int
-
sendReceive
(data, timeout=None)[source]¶ Write the specified data on the communication channel and return the corresponding response.
-
timeout
¶
-
-
class
TCPClient
(remoteIP, remotePort, localIP=None, localPort=None, timeout=5)[source]¶ Bases:
netzob.Simulator.Channels.AbstractChannel.AbstractChannel
A TCPClient is a communication channel. It creates a client that connects to a specific IP:Port server over a TCP socket.
When the actor executes an OpenChannelTransition, it calls the open method on the TCP client, which connects to the server.
>>> from netzob.all import *
>>> import time
>>> client = TCPClient(remoteIP='127.0.0.1', remotePort=9999)
>>> symbol = Symbol([Field("Hello everyone!")])
>>> s0 = State()
>>> s1 = State()
>>> s2 = State()
>>> openTransition = OpenChannelTransition(startState=s0, endState=s1)
>>> mainTransition = Transition(startState=s1, endState=s1, inputSymbol=symbol, outputSymbols=[symbol])
>>> closeTransition = CloseChannelTransition(startState=s1, endState=s2)
>>> automata = Automata(s0, [symbol])
>>> channel = TCPServer(localIP="127.0.0.1", localPort=8885)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> server = Actor(automata = automata, initiator = False, abstractionLayer=abstractionLayer)
>>> channel = TCPClient(remoteIP="127.0.0.1", remotePort=8885)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> client = Actor(automata = automata, initiator = True, abstractionLayer=abstractionLayer)
>>> server.start()
>>> client.start()
>>> time.sleep(1)
>>> client.stop()
>>> server.stop()
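The automaton built in this example has three states: an open transition from s0 to s1, a main transition that loops on s1 while the "Hello everyone!" symbol is exchanged, and a close transition from s1 to s2. The sketch below models that shape with a plain transition table; the event names and the `run` function are illustrative and are not Netzob classes.

```python
# Pure-Python sketch; names are illustrative, not Netzob classes.
TRANSITIONS = {
    ("s0", "open"): "s1",             # plays the role of OpenChannelTransition
    ("s1", "Hello everyone!"): "s1",  # main transition loops on s1
    ("s1", "close"): "s2",            # plays the role of CloseChannelTransition
}


def run(events, state="s0"):
    """Follow the events through the table and return the visited states."""
    path = [state]
    for event in events:
        state = TRANSITIONS[(state, event)]
        path.append(state)
    return path


print(run(["open", "Hello everyone!", "Hello everyone!", "close"]))
# prints ['s0', 's1', 's1', 's1', 's2']
```

An event that is not allowed in the current state raises `KeyError`, which is the table's way of rejecting an invalid message sequence.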
-
localIP
¶ Local IP address used by the client.
Type: str
-
localPort
¶ Local TCP port used by the client. Its value must be above 0 and under 65535.
Type: int
-
open
(timeout=None)[source]¶ Open the communication channel. If the channel is a client, it starts to connect to the specified server.
-
read
(timeout=None)[source]¶ Reads the next message on the communication channel. Continues to read while it receives something.
Parameters: timeout (int) – the maximum time, in milliseconds, to wait for a message to arrive
-
remoteIP
¶ IP on which the server will listen.
Type: str
-
remotePort
¶ TCP Port on which the server will listen. Its value must be above 0 and under 65535.
Type: int
-
sendReceive
(data, timeout=None)[source]¶ Write the specified data on the communication channel and return the corresponding response.
-
timeout
¶
-
-
class
TCPServer
(localIP, localPort, timeout=5)[source]¶ Bases:
netzob.Simulator.Channels.AbstractChannel.AbstractChannel
A TCPServer is a communication channel. It creates a server that listens on a specified IP:Port over a TCP socket.
When the actor executes an OpenChannelTransition, it calls the open method on the TCP server, which starts the server. The objective of the server is to wait for the client to connect.
>>> from netzob.all import *
>>> import time
>>> server = TCPServer(localIP='127.0.0.1', localPort=9999)
>>> symbol = Symbol([Field("Hello everyone!")])
>>> s0 = State()
>>> s1 = State()
>>> s2 = State()
>>> openTransition = OpenChannelTransition(startState=s0, endState=s1)
>>> mainTransition = Transition(startState=s1, endState=s1, inputSymbol=symbol, outputSymbols=[symbol])
>>> closeTransition = CloseChannelTransition(startState=s1, endState=s2)
>>> automata = Automata(s0, [symbol])
>>> channel = TCPServer(localIP="127.0.0.1", localPort=8886)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> server = Actor(automata = automata, initiator = False, abstractionLayer=abstractionLayer)
>>> channel = TCPClient(remoteIP="127.0.0.1", remotePort=8886)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> client = Actor(automata = automata, initiator = True, abstractionLayer=abstractionLayer)
>>> server.start()
>>> client.start()
>>> time.sleep(1)
>>> client.stop()
>>> server.stop()
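For readers who want to see the underlying socket mechanics without Netzob, the following sketch reproduces the same listen, connect, send and receive exchange with the standard library only. `serve_once` and `send_receive` are hypothetical helpers, and binding to port 0 (so the OS picks a free port) is a choice made for the example, unlike the fixed ports used in the doctest.

```python
import socket
import threading


def serve_once(server_sock):
    """Accept one client and echo back a single message."""
    conn, _addr = server_sock.accept()
    with conn:
        conn.sendall(conn.recv(1024))


def send_receive(host, port, data, timeout=5):
    """Connect, send data, and return the first response chunk."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(data)
        return sock.recv(1024)


# Port 0 lets the operating system pick a free port.
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(("127.0.0.1", 0))
server_sock.listen(1)
port = server_sock.getsockname()[1]

worker = threading.Thread(target=serve_once, args=(server_sock,))
worker.start()
reply = send_receive("127.0.0.1", port, b"Hello everyone!")
worker.join()
server_sock.close()
print(reply)
```

The server side echoes whatever it receives, so the reply should match the message that was sent.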
-
localIP
¶ IP on which the server will listen.
Type: str
-
localPort
¶ TCP Port on which the server will listen. Its value must be above 0 and under 65535.
Type: int
-
open
(timeout=None)[source]¶ Open the communication channel. If the channel is a server, it starts to listen and will create an instance for each different client.
-
read
(timeout=None)[source]¶ Read the next message on the communication channel.
Parameters: timeout (int) – the maximum time, in milliseconds, to wait for a message to arrive
-
sendReceive
(data, timeout=None)[source]¶ Write the specified data on the communication channel and return the corresponding response.
-
timeout
¶
-
-
class
UDPClient
(remoteIP, remotePort, localIP=None, localPort=None, timeout=5)[source]¶ Bases:
netzob.Simulator.Channels.AbstractChannel.AbstractChannel
A UDPClient is a communication channel. It creates a client that connects to a specific IP:Port server over a UDP socket.
When the actor executes an OpenChannelTransition, it calls the open method on the UDP client which connects to the server.
>>> from netzob.all import *
>>> import time
>>> client = UDPClient(remoteIP='127.0.0.1', remotePort=9999)
>>> client.open()
>>> client.close()
>>> symbol = Symbol([Field("Hello everyone!")])
>>> s0 = State()
>>> s1 = State()
>>> s2 = State()
>>> openTransition = OpenChannelTransition(startState=s0, endState=s1)
>>> mainTransition = Transition(startState=s1, endState=s1, inputSymbol=symbol, outputSymbols=[symbol])
>>> closeTransition = CloseChannelTransition(startState=s1, endState=s2)
>>> automata = Automata(s0, [symbol])
>>> channel = UDPServer(localIP="127.0.0.1", localPort=8883)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> server = Actor(automata = automata, initiator = False, abstractionLayer=abstractionLayer)
>>> channel = UDPClient(remoteIP="127.0.0.1", remotePort=8883)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> client = Actor(automata = automata, initiator = True, abstractionLayer=abstractionLayer)
>>> server.start()
>>> client.start()
>>> time.sleep(2)
>>> client.stop()
>>> server.stop()
-
localIP
¶ Local IP address used by the client.
Type: str
-
localPort
¶ Local UDP port used by the client. Its value must be above 0 and under 65535.
Type: int
-
open
(timeout=None)[source]¶ Open the communication channel. If the channel is a client, it starts to connect to the specified server.
-
read
(timeout=None)[source]¶ Read the next message on the communication channel.
Parameters: timeout (int) – the maximum time, in milliseconds, to wait for a message to arrive
-
remoteIP
¶ IP on which the server will listen.
Type: str
-
remotePort
¶ UDP Port on which the server will listen. Its value must be above 0 and under 65535.
Type: int
-
sendReceive
(data, timeout=None)[source]¶ Write the specified data on the communication channel and return the corresponding response.
-
timeout
¶
-
-
class
UDPServer
(localIP, localPort, timeout=5)[source]¶ Bases:
netzob.Simulator.Channels.AbstractChannel.AbstractChannel
A UDPServer is a communication channel. It creates a server that listens on a specific IP:Port over a UDP socket.
When the actor executes an OpenChannelTransition, it calls the open method on the UDP server, which makes it listen for incoming messages.
>>> from netzob.all import *
>>> import time
>>> server = UDPServer(localIP='127.0.0.1', localPort=9999)
>>> server.open()
>>> server.close()
>>> symbol = Symbol([Field("Hello everyone!")])
>>> s0 = State()
>>> s1 = State()
>>> s2 = State()
>>> openTransition = OpenChannelTransition(startState=s0, endState=s1)
>>> mainTransition = Transition(startState=s1, endState=s1, inputSymbol=symbol, outputSymbols=[symbol])
>>> closeTransition = CloseChannelTransition(startState=s1, endState=s2)
>>> automata = Automata(s0, [symbol])
>>> channel = UDPServer(localIP="127.0.0.1", localPort=8884)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> server = Actor(automata = automata, initiator = False, abstractionLayer=abstractionLayer)
>>> channel = UDPClient(remoteIP="127.0.0.1", remotePort=8884)
>>> abstractionLayer = AbstractionLayer(channel, [symbol])
>>> client = Actor(automata = automata, initiator = True, abstractionLayer=abstractionLayer)
>>> server.start()
>>> client.start()
>>> time.sleep(1)
>>> client.stop()
>>> server.stop()
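UDP has no connection handshake, so a listening socket simply receives datagrams and replies to whatever address they came from. The sketch below shows that exchange with standard-library sockets only; it is an illustration of the transport, not of Netzob's internals, and the use of port 0 and the 2-second timeouts are assumptions made for the example.

```python
import socket

# Server side: bind and wait for datagrams. Port 0 lets the OS pick a port.
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("127.0.0.1", 0))
server.settimeout(2)
port = server.getsockname()[1]

# Client side: connect() on UDP only records the default destination.
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.settimeout(2)
client.connect(("127.0.0.1", port))
client.send(b"Hello everyone!")

# The datagram is queued by the OS, so no thread is needed here.
data, addr = server.recvfrom(1024)
server.sendto(data, addr)        # echo back to the sender's address
reply = client.recv(1024)

client.close()
server.close()
print(reply)
```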
-
localIP
¶ IP on which the server will listen.
Type: str
-
localPort
¶ UDP Port on which the server will listen. Its value must be above 0 and under 65535.
Type: int
-
open
(timeout=None)[source]¶ Open the communication channel. This opens a UDP socket that listens for incoming messages.
-
read
(timeout=None)[source]¶ Read the next message on the communication channel.
Parameters: timeout (int) – the maximum time, in milliseconds, to wait for a message to arrive
-
sendReceive
(data, timeout=None)[source]¶ Write the specified data on the communication channel and return the corresponding response.
-
timeout
¶
-
Submodules¶
netzob.Simulator.AbstractionLayer module¶
netzob.Simulator.Actor module¶
netzob.Simulator.all module¶
Module contents¶
Submodules¶
netzob.NetzobInteractiveSessionController module¶
-
class
NetzobIPythonShellController
[source]¶ Bases:
netzob.NetzobInteractiveSessionController.NetzobInteractiveSessionController
Execute Netzob in an embedded IPython shell.
netzob.NetzobResources module¶
netzob.all module¶
netzob.release module¶
-
keywords
= ['Protocol', 'Inference', 'Networking', 'Reverse Engineering', 'Fuzzing', 'Security']¶ @deprecated: the official long description is now the full README.rst file
Module contents¶
Developer Guide¶
See how you can contribute to Netzob
Indices and tables¶
Licences¶
Netzob code is provided under the GPLv3 licence.
The documentation is under the CC-BY-SA licence.