hdfs3¶
This project is no longer under active development¶
Pyarrow’s JNI HDFS interface is mature and stable. It also has fewer problems with configuration and various security settings, and does not require the complex build process of libhdfs3. Therefore, all users who have trouble with hdfs3 are recommended to try pyarrow.
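For reference, connecting through pyarrow's legacy HDFS interface looks roughly like the sketch below. The exact API may differ between pyarrow versions, so treat the names here as illustrative and consult the pyarrow documentation:
>>> import pyarrow as pa
>>> fs = pa.hdfs.connect(host='localhost', port=8020)
>>> fs.ls('/user/data')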
Introduction¶
Use HDFS natively from Python.
The Hadoop File System (HDFS) is a widely deployed, distributed, data-local file system written in Java. This file system backs most clusters running Hadoop and Spark.
Pivotal produced libhdfs3, an alternative native C/C++ HDFS client that interacts with HDFS without the JVM, exposing first-class support to non-JVM languages like Python.
This library, hdfs3, is a lightweight Python wrapper around the C/C++ libhdfs3 library. It provides both direct access to libhdfs3 from Python as well as a typical Pythonic interface.
>>> from hdfs3 import HDFileSystem
>>> hdfs = HDFileSystem(host='localhost', port=8020)
>>> hdfs.ls('/user/data')
>>> hdfs.put('local-file.txt', '/user/data/remote-file.txt')
>>> hdfs.cp('/user/data/file.txt', '/user2/data')
HDFS3 files comply with the Python File interface. This enables interactions with the broader ecosystem of PyData projects.
>>> with hdfs.open('/user/data/file.txt') as f:
...     data = f.read(1000000)
>>> with hdfs.open('/user/data/file.csv.gz') as f:
...     df = pandas.read_csv(f, compression='gzip', nrows=1000)
Motivation¶
We chose to use an alternative C/C++/Python HDFS client rather than the default JVM client for the following reasons:
- Convenience: Interactions between Java libraries and Native (C/C++/Python) libraries can be cumbersome. Using a native library from Python smoothes over the experience in development, maintenance, and debugging.
- Performance: Native libraries like libhdfs3 do not suffer the long JVM startup times, improving interaction.
Related Work¶
- libhdfs3: The underlying C++ library from Apache HAWQ
- snakebite: Another Python HDFS library using Protobufs
- Dask: Parent project, a parallel computing library in Python
- Dask.distributed: Distributed computing in Python
Installation¶
Conda¶
Both the hdfs3 Python library and the compiled libhdfs3 library (and its dependencies) are available from the conda-forge repository using conda:
$ conda install hdfs3 -c conda-forge
Note that conda packages are only available for the linux-64
platform.
PyPI and apt-get¶
Alternatively, you can install the libhdfs3.so library using a system installer like apt-get:
echo "deb https://dl.bintray.com/wangzw/deb trusty contrib" | sudo tee /etc/apt/sources.list.d/bintray-wangzw-deb.list
sudo apt-get install -y apt-transport-https
sudo apt-get update
sudo apt-get install libhdfs3 libhdfs3-dev
And install the Python wrapper library, hdfs3, with pip:
pip install hdfs3
Build from Source¶
See the libhdfs3 installation instructions to install the compiled library.
You can download the hdfs3 Python wrapper library from GitHub and install normally:
git clone git@github.com:dask/hdfs3
cd hdfs3
python setup.py install
Quickstart¶
Import hdfs3 and connect to an HDFS cluster:
>>> from hdfs3 import HDFileSystem
>>> hdfs = HDFileSystem(host='localhost', port=8020)
Write data to file:
>>> with hdfs.open('/tmp/myfile.txt', 'wb') as f:
...     f.write(b'Hello, world!')
Read data back from file:
>>> with hdfs.open('/tmp/myfile.txt') as f:
...     print(f.read())
Interact with files on HDFS:
>>> hdfs.ls('/tmp')
>>> hdfs.put('local-file.txt', '/tmp/remote-file.txt')
>>> hdfs.cp('/tmp/remote-file.txt', '/tmp/copied-file.txt')
Examples¶
Word count¶
Setup¶
In this example, we’ll use the hdfs3 library to count the number of words in text files (Enron email dataset, 6.4 GB) stored in HDFS.
Copy the text data from Amazon S3 into HDFS on the cluster:
$ hadoop distcp s3n://AWS_SECRET_ID:AWS_SECRET_KEY@blaze-data/enron-email hdfs:///tmp/enron
where AWS_SECRET_ID and AWS_SECRET_KEY are valid AWS credentials.
Code example¶
Import hdfs3 and other standard libraries used in this example:
>>> import hdfs3
>>> from collections import defaultdict, Counter
Initialize a connection to HDFS, replacing NAMENODE_HOSTNAME and NAMENODE_PORT with the hostname and port (default: 8020) of the HDFS namenode:
>>> hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
Generate a list of filenames from the text data in HDFS:
>>> filenames = hdfs.glob('/tmp/enron/*/*')
>>> print(filenames[:5])
['/tmp/enron/edrm-enron-v2_nemec-g_xml.zip/merged.txt',
'/tmp/enron/edrm-enron-v2_ring-r_xml.zip/merged.txt',
'/tmp/enron/edrm-enron-v2_bailey-s_xml.zip/merged.txt',
'/tmp/enron/edrm-enron-v2_fischer-m_xml.zip/merged.txt',
'/tmp/enron/edrm-enron-v2_geaccone-t_xml.zip/merged.txt']
Print the first 1024 bytes of the first text file:
>>> print(hdfs.head(filenames[0]))
b'Date: Wed, 29 Nov 2000 09:33:00 -0800 (PST)\r\nFrom: Xochitl-Alexis Velasc
o\r\nTo: Mark Knippa, Mike D Smith, Gerald Nemec, Dave S Laipple, Bo Barnwel
l\r\nCc: Melissa Jones, Iris Waser, Pat Radford, Bonnie Shumaker\r\nSubject:
Finalize ECS/EES Master Agreement\r\nX-SDOC: 161476\r\nX-ZLID: zl-edrm-enro
n-v2-nemec-g-2802.eml\r\n\r\nPlease plan to attend a meeting to finalize the
ECS/EES Master Agreement \r\ntomorrow 11/30/00 at 1:30 pm CST.\r\n\r\nI wi
ll email everyone tomorrow with location.\r\n\r\nDave-I will also email you
the call in number tomorrow.\r\n\r\nThanks\r\nXochitl\r\n\r\n***********\r\n
EDRM Enron Email Data Set has been produced in EML, PST and NSF format by ZL
Technologies, Inc. This Data Set is licensed under a Creative Commons Attri
bution 3.0 United States License <http://creativecommons.org/licenses/by/3.0
/us/> . To provide attribution, please cite to "ZL Technologies, Inc. (http:
//www.zlti.com)."\r\n***********\r\nDate: Wed, 29 Nov 2000 09:40:00 -0800 (P
ST)\r\nFrom: Jill T Zivley\r\nTo: Robert Cook, Robert Crockett, John Handley
, Shawna'
Create a function to count words in each file:
>>> def count_words(file):
...     word_counts = defaultdict(int)
...     for line in file:
...         for word in line.split():
...             word_counts[word] += 1
...     return word_counts
>>> print(count_words(['apple banana apple', 'apple orange']))
defaultdict(int, {'apple': 3, 'banana': 1, 'orange': 1})
Count the number of words in the first text file:
>>> with hdfs.open(filenames[0]) as f:
...     counts = count_words(f)
>>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
[(b'the', 1065320),
(b'of', 657220),
(b'to', 569076),
(b'and', 545821),
(b'or', 375132),
(b'in', 306271),
(b'shall', 255680),
(b'be', 210976),
(b'any', 206962),
(b'by', 194780)]
Count the number of words in all of the text files. This operation required about 10 minutes to run on a single machine with 4 cores and 16 GB RAM:
>>> all_counts = Counter()
>>> for fn in filenames:
...     with hdfs.open(fn) as f:
...         counts = count_words(f)
...         all_counts.update(counts)
Print the total number of words and the words with the highest frequency from all of the text files:
>>> print(len(all_counts))
8797842
>>> print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
[(b'0', 67218380),
(b'the', 19586868),
(b'-', 14123768),
(b'to', 11893464),
(b'N/A', 11814665),
(b'of', 11724827),
(b'and', 10253753),
(b'in', 6684937),
(b'a', 5470371),
(b'or', 5227805)]
The complete Python script for this example is shown below:
# word-count.py
import hdfs3
from collections import defaultdict, Counter
hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
filenames = hdfs.glob('/tmp/enron/*/*')
print(filenames[:5])
print(hdfs.head(filenames[0]))
def count_words(file):
    word_counts = defaultdict(int)
    for line in file:
        for word in line.split():
            word_counts[word] += 1
    return word_counts

print(count_words(['apple banana apple', 'apple orange']))

with hdfs.open(filenames[0]) as f:
    counts = count_words(f)

print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

all_counts = Counter()

for fn in filenames:
    with hdfs.open(fn) as f:
        counts = count_words(f)
        all_counts.update(counts)

print(len(all_counts))

print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])
HDFS Configuration¶
Defaults¶
Several methods are available for configuring HDFS3.
The simplest is to load values from core-site.xml and hdfs-site.xml files; HDFS3 will search typical locations and read default configuration parameters from there.
The file locations may also be specified with the environment variables HADOOP_CONF_DIR, which is the directory containing the XML files; HADOOP_INSTALL, in which case the files are expected in the subdirectory hadoop/conf/; or LIBHDFS3_CONF, which should explicitly point to the hdfs-site.xml file you wish to use.
It is also possible to pass parameters to HDFS3 when instantiating the file system. You can either provide individual common overrides (e.g., host='myhost') or provide a whole configuration as a dictionary (pars={...}) with the same key names as typically contained in the XML config files. These parameters take precedence over any loaded from files; alternatively, you can disable the use of the default configuration altogether with autoconf=False.
The special environment variable LIBHDFS3_CONF will be automatically set when parsing the config files, if possible. Since the library is only loaded upon the first instantiation of an HDFileSystem, you still have the option to change its value in os.environ.
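For example, a minimal sketch of both approaches (the hostname, config path, and parameter key below are placeholders, not values required by hdfs3):
>>> import os
>>> os.environ['LIBHDFS3_CONF'] = '/etc/hadoop/conf/hdfs-site.xml'  # set before the first connection
>>> from hdfs3 import HDFileSystem
>>> hdfs = HDFileSystem(host='namenode.example.com', port=8020,
...                     pars={'dfs.client.use.datanode.hostname': 'true'},
...                     autoconf=False)  # ignore any config files entirely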
Short-circuit reads in HDFS¶
Typically in HDFS, all data reads go through the datanode. Alternatively, a process that runs on the same node as the data can bypass or short-circuit the communication path through the datanode and instead read directly from a file.
HDFS and hdfs3 can be configured for short-circuit reads. The easiest method is to edit the hdfs-site.xml file whose location you specify as above.
- Configure the appropriate settings in hdfs-site.xml on all of the HDFS nodes:
<configuration>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
</configuration>
The above configuration changes should allow for short-circuit reads. If you continue to receive warnings such as "retry the same node but disable read shortcircuit feature", check the above settings. Note that HDFS reads should still function despite the warning, but performance might be impacted.
For more information about configuring short-circuit reads, refer to the HDFS Short-Circuit Local Reads documentation.
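The same two properties can also be supplied from Python through pars when connecting (a sketch; the datanodes themselves must still be configured for short-circuit access):
>>> from hdfs3 import HDFileSystem
>>> hdfs = HDFileSystem(host='localhost', port=8020,
...                     pars={'dfs.client.read.shortcircuit': 'true',
...                           'dfs.domain.socket.path': '/var/lib/hadoop-hdfs/dn_socket'})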
High-availability mode¶
Although HDFS is resilient to failure of data-nodes, the name-node is a single repository of metadata for the system, and so a single point of failure. High-availability (HA) involves configuring fall-back name-nodes which can take over in the event of failure. A good description can be found in the Hadoop HDFS high-availability documentation.
In the case of libhdfs3, the library used by hdfs3, the configuration required for HA can be passed to the client directly in Python code, or included in configuration files, as with any other configuration options.
In Python code, this could look like the following:
host = "nameservice1"
conf = {"dfs.nameservices": "nameservice1",
"dfs.ha.namenodes.nameservice1": "namenode113,namenode188",
"dfs.namenode.rpc-address.nameservice1.namenode113": "hostname_of_server1:8020",
"dfs.namenode.rpc-address.nameservice1.namenode188": "hostname_of_server2:8020",
"dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server1:50070",
"dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server2:50070",
"hadoop.security.authentication": "kerberos"
}
fs = HDFileSystem(host=host, pars=conf)
Note that no port is specified (this requires hdfs3 version 0.1.3 or later); its value should be None.
API¶
HDFileSystem([host, port, connect, …]): Connection to an HDFS namenode
HDFileSystem.cat(path): Return contents of file
HDFileSystem.chmod(path, mode): Change access control of given path
HDFileSystem.chown(path, owner, group): Change owner/group
HDFileSystem.df(): Used/free disc space on the HDFS system
HDFileSystem.du(path[, total, deep]): Returns file sizes on a path
HDFileSystem.exists(path): Is there an entry at path?
HDFileSystem.get(hdfs_path, local_path[, …]): Copy HDFS file to local
HDFileSystem.getmerge(path, filename[, …]): Concat all files in path (a directory) to local output file
HDFileSystem.get_block_locations(path[, …]): Fetch physical locations of blocks
HDFileSystem.glob(path): Get list of paths matching glob-like pattern (i.e., with "*"s)
HDFileSystem.info(path): File information (as a dict)
HDFileSystem.ls(path[, detail]): List files at path
HDFileSystem.mkdir(path): Make directory at path
HDFileSystem.mv(path1, path2): Move file at path1 to path2
HDFileSystem.open(path[, mode, replication, …]): Open a file for reading or writing
HDFileSystem.put(filename, path[, chunk, …]): Copy local file to path in HDFS
HDFileSystem.read_block(fn, offset, length): Read a block of bytes from an HDFS file
HDFileSystem.rm(path[, recursive]): Use recursive for rm -r, i.e., delete directory and contents
HDFileSystem.set_replication(path, replication): Instruct HDFS to set the replication for the given file
HDFileSystem.tail(path[, size]): Return last bytes of file
HDFileSystem.touch(path): Create zero-length file
HDFile(fs, path, mode[, replication, buff, …]): File on HDFS
HDFile.close(): Flush and close file, ensuring the data is readable
HDFile.flush(): Send buffer to the data-node; actual write may happen later
HDFile.info(): Filesystem metadata about this file
HDFile.read([length]): Read bytes from open file
HDFile.readlines(): Return all lines in a file as a list
HDFile.seek(offset[, from_what]): Set file read position
HDFile.tell(): Get current byte location in a file
HDFile.write(data): Write bytes to open file (which must be in w or a mode)
HDFSMap(hdfs, root[, check]): Wrap a HDFileSystem as a mutable mapping
class hdfs3.core.HDFileSystem(host=<class 'hdfs3.utils.MyNone'>, port=<class 'hdfs3.utils.MyNone'>, connect=True, autoconf=True, pars=None, **kwargs)¶
Connection to an HDFS namenode
>>> hdfs = HDFileSystem(host='127.0.0.1', port=8020)  # doctest: +SKIP
cancel_token(token=None)¶
Revoke delegation token
Parameters:
- token: str or None
  If None, uses the instance's token. It is an error to do that if there is no token.
chmod(path, mode)¶
Change access control of given path
Exactly what permissions the file will get depends on HDFS configurations.
Parameters:
- path : string
  file/directory to change
- mode : integer
  As with the POSIX standard, each octal digit refers to user-group-all, in that order, with read-write-execute as the bits of each group.
Examples
Make read/writeable to all:
>>> hdfs.chmod('/path/to/file', 0o777)  # doctest: +SKIP
Make read/writeable only to user:
>>> hdfs.chmod('/path/to/file', 0o700)  # doctest: +SKIP
Make read-only to user:
>>> hdfs.chmod('/path/to/file', 0o400)  # doctest: +SKIP
concat(destination, paths)¶
Concatenate inputs to destination
Source files should all have the same block size and replication. The destination file must be in the same directory as the source files. If the target exists, it will be appended to.
Some HDFS installations require that the target file exist and be an exact number of blocks long, and that each concatenated file except the last also be a whole number of blocks.
The source files are deleted on successful completion.
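For example (a sketch with hypothetical paths; the parts and the target must live in the same directory):
>>> hdfs.concat('/data/merged.csv', ['/data/part-1.csv', '/data/part-2.csv'])  # doctest: +SKIP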
delegate_token(user=None)¶
Generate delegate auth token.
Parameters:
- user: bytes/str
  User to pass to delegation (defaults to user supplied to instance); this user is the only one that can renew the token.
du(path, total=False, deep=False)¶
Returns file sizes on a path.
Parameters:
- path : string
  where to look
- total : bool (False)
  to add up the sizes to a grand total
- deep : bool (False)
  whether to recurse into subdirectories
getmerge(path, filename, blocksize=65536)¶
Concat all files in path (a directory) to local output file
glob(path)¶
Get list of paths matching glob-like pattern (i.e., with "*"s).
If passed a directory, gets all contained files; if passed a path to a file, without any "*", returns a one-element list containing that filename. Does not support Python 3.5's "**" notation.
ls(path, detail=False)¶
List files at path
Parameters:
- path : string/bytes
  location at which to list files
- detail : bool (=False)
  if True, each list item is a dict of file properties; otherwise, returns list of filenames
open(path, mode='rb', replication=0, buff=0, block_size=0)¶
Open a file for reading or writing
Parameters:
- path: string
  Path of file on HDFS
- mode: string
  One of 'rb', 'wb', or 'ab'
- replication: int
  Replication factor; if zero, use system default (only on write)
- buff: int (=0)
  Client buffer size (bytes); if 0, use default.
- block_size: int
  Size of data-node blocks if writing
put(filename, path, chunk=65536, replication=0, block_size=0)¶
Copy local file to path in HDFS
read_block(fn, offset, length, delimiter=None)¶
Read a block of bytes from an HDFS file
Starting at offset of the file, read length bytes. If delimiter is set then we ensure that the read starts and stops at delimiter boundaries that follow the locations offset and offset + length. If offset is zero then we start at zero. The bytestring returned will not include the surrounding delimiter strings. If offset + length is beyond the eof, reads to eof.
Parameters:
- fn: string
  Path to filename on HDFS
- offset: int
  Byte offset to start read
- length: int
  Number of bytes to read
- delimiter: bytes (optional)
  Ensure reading starts and stops at delimiter bytestring
See also
hdfs3.utils.read_block
Examples
>>> hdfs.read_block('/data/file.csv', 0, 13)  # doctest: +SKIP
b'Alice, 100\nBo'
>>> hdfs.read_block('/data/file.csv', 0, 13, delimiter=b'\n')  # doctest: +SKIP
b'Alice, 100\nBob, 200'
renew_token(token=None)¶
Renew delegation token
Parameters:
- token: str or None
  If None, uses the instance's token. It is an error to do that if there is no token.
Returns:
- New expiration time for the token
set_replication(path, replication)¶
Instruct HDFS to set the replication for the given file.
If successful, the head-node's table is updated immediately, but actual copying will be queued for later. It is acceptable to set a replication that cannot be supported (e.g., higher than the number of data-nodes).
class hdfs3.core.HDFile(fs, path, mode, replication=0, buff=0, block_size=0)¶
File on HDFS
Matches the standard Python file interface.
Examples
>>> with hdfs.open('/path/to/hdfs/file.txt') as f:  # doctest: +SKIP
...     bytes = f.read(1000)  # doctest: +SKIP
>>> with hdfs.open('/path/to/hdfs/file.csv') as f:  # doctest: +SKIP
...     df = pd.read_csv(f, nrows=1000)  # doctest: +SKIP
next()¶
Enables reading a file as a buffer in pandas
readline(chunksize=256, lineterminator='\n')¶
Return a line using buffered reading.
A line is a sequence of bytes between '\n' markers (or the given line-terminator).
Line iteration uses this method internally.
Note: this function requires many calls to HDFS and is slow; it is in general better to wrap an HDFile with an io.TextIOWrapper for buffering, text decoding and newline support.
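For example, a sketch of that wrapping (the path is a placeholder):
>>> import io  # doctest: +SKIP
>>> with hdfs.open('/tmp/myfile.txt', 'rb') as f:  # doctest: +SKIP
...     for line in io.TextIOWrapper(f, encoding='utf-8'):
...         print(line)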
seek(offset, from_what=0)¶
Set file read position. Read mode only.
Attempting to move out of file bounds raises an exception. Note that, by the convention of Python file seek, offset should be <= 0 if from_what is 2.
Parameters:
- offset : int
  byte location in the file.
- from_what : int 0, 1, 2
  if 0 (default), relative to file start; if 1, relative to current location; if 2, relative to file end.
Returns:
- new position
class hdfs3.mapping.HDFSMap(hdfs, root, check=False)¶
Wrap a HDFileSystem as a mutable mapping.
The keys of the mapping become files under the given root, and the values (which must be bytes) the contents of those files.
Parameters:
- hdfs : HDFileSystem
- root : string
  path to contain the stored files (directory will be created if it doesn't exist)
- check : bool (=False)
  performs a touch at the location, to check writeability.
Examples
>>> hdfs = hdfs3.HDFileSystem()  # doctest: +SKIP
>>> mw = HDFSMap(hdfs, '/writable/path/')  # doctest: +SKIP
>>> mw['loc1'] = b'Hello World'  # doctest: +SKIP
>>> list(mw.keys())  # doctest: +SKIP
['loc1']
>>> mw['loc1']  # doctest: +SKIP
b'Hello World'
Known Limitations¶
Forked processes¶
The libhdfs3 library is not fork-safe. If you start using hdfs3 in a parent process and then fork a child process, using the library from the child process may produce random errors because of system resources that will not be available (such as background threads). Common solutions include the following:
- Use threads instead of processes
- Use Python 3 and a multiprocessing context using either the “spawn” or “forkserver” method (see multiprocessing docs)
- Only instantiate HDFileSystem within the forked processes; do not ever start an HDFileSystem within the parent process (see the sketch below)
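A minimal sketch of the last two suggestions combined (the hostname, port, and path are placeholders): the pool uses a "spawn" context and each worker creates its own HDFileSystem.
# fork-safe-read.py
from multiprocessing import get_context

from hdfs3 import HDFileSystem

def read_remote(path):
    # Instantiate the client inside the child process, never in the parent
    hdfs = HDFileSystem(host='localhost', port=8020)
    with hdfs.open(path, 'rb') as f:
        return f.read()

if __name__ == '__main__':
    ctx = get_context('spawn')  # avoids inheriting libhdfs3 state via fork
    with ctx.Pool(4) as pool:
        data = pool.map(read_remote, ['/tmp/myfile.txt'])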