I read up on the documentation of HashPartitioner. Unfortunately, nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example, if my data is like
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
then the partitioner would put these into different partitions, with the same keys falling in the same partition. However, I do not understand the significance of the constructor argument:
new HashPartitioner(numPartitions) // What does numPartitions do?
For the above dataset, how would the results differ if I did:

new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)
So how does HashPartitioner actually work?
Well, let's make your dataset marginally more interesting. Suppose we have six elements with keys 1, 2 and 3, no partitioner, and eight initial partitions.
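The dataset can be sketched in plain Scala (in Spark you would distribute it with something like sc.parallelize(data, 8) to get eight partitions; only the local collection is shown here):

```scala
// Six (key, value) pairs with three distinct keys: 1, 2 and 3.
// In Spark this collection would be distributed across eight partitions.
val data = for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None)

// data has six elements and the keys are 1, 2 and 3
```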
Now let's define a small helper to count the number of elements per partition. Since we don't have a partitioner, our dataset is distributed (roughly) uniformly between partitions (Default Partitioning Scheme in Spark).
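Spark itself isn't needed to see the shape of that distribution. Below is a hypothetical re-implementation of the even range slicing that sc.parallelize applies to a local sequence; treat it as an approximation of Spark's behavior, not the actual Spark code:

```scala
// Approximates how a local Seq of `length` elements is cut into numSlices
// parts: slice i covers indices [i*length/numSlices, (i+1)*length/numSlices).
def sliceSizes(length: Int, numSlices: Int): Seq[Int] =
  (0 until numSlices).map { i =>
    val start = (i * length) / numSlices
    val end = ((i + 1) * length) / numSlices
    end - start
  }

// Six elements over eight slices: every slice holds 0 or 1 elements.
val sizes = sliceSizes(6, 8)
```

Under this sketch the sizes come out as 0, 1, 1, 1, 0, 1, 1, 1: as even as six elements over eight slots can be.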
Now let's repartition our dataset. Since the parameter passed to HashPartitioner defines the number of partitions, passing 1 gives us exactly one partition, and since there is only one partition, it contains all the elements.
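The single-partition case can be checked without a cluster. The sketch below mimics the assignment rule (hash of the key modulo the number of partitions, kept non-negative); nonNegativeMod and partitionOf are hypothetical helpers, not Spark APIs:

```scala
// Non-negative modulo: maps any hash value into [0, mod).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Partition index for a key, given numPartitions partitions.
def partitionOf(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)

val keys = Seq(1, 1, 2, 2, 3, 3)

// With a single partition every key collapses into partition 0.
val assigned = keys.map(k => partitionOf(k, 1))
```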
Note that the order of values after the shuffle is non-deterministic.
In the same way, if we use HashPartitioner(2), we'll get 2 partitions.
Since the rdd is partitioned by key, the data won't be distributed uniformly anymore. Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here.
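To confirm, the per-partition counts can be re-computed outside Spark with a hypothetical countByPartition helper (using hashCode mod numPartitions, kept non-negative, as a stand-in for HashPartitioner); this also previews the seven-partition case below:

```scala
// Non-negative modulo: maps any hash value into [0, mod).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// Count how many elements land in each of numPartitions partitions.
def countByPartition(keys: Seq[Int], numPartitions: Int): Seq[Int] = {
  val counts = Array.fill(numPartitions)(0)
  keys.foreach(k => counts(nonNegativeMod(k.hashCode, numPartitions)) += 1)
  counts.toSeq
}

val keys = Seq(1, 1, 2, 2, 3, 3)

// Two partitions: key 2 goes to partition 0, keys 1 and 3 to partition 1.
val twoPartitions = countByPartition(keys, 2)   // counts: 2 and 4

// Seven partitions: keys 1, 2, 3 map to partitions 1, 2, 3; the rest are empty.
val sevenPartitions = countByPartition(keys, 7)
```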
Finally, with HashPartitioner(7), we get seven partitions, three non-empty with 2 elements each.

Summary and Notes
- HashPartitioner takes a single argument which defines the number of partitions.
- Values are assigned to partitions using the hash of the keys. The hash function may differ depending on the language (Scala RDDs may use hashCode, Datasets use MurmurHash 3, PySpark uses portable_hash). In a simple case like this, where the key is a small integer, you can assume that hash is the identity (i = hash(i)). The Scala API uses nonNegativeMod to determine the partition based on the computed hash.
- If the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle.
- Keys have to be hashable. You can check my answer for A list as a key for PySpark's reduceByKey to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation. In Python 3 you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?
- A hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.
- Please note that currently hash-based methods don't work in Scala when combined with REPL-defined case classes (Case class equality in Apache Spark).
- HashPartitioner (or any other Partitioner) shuffles the data. Unless the partitioning is reused between multiple operations, it doesn't reduce the amount of data to be shuffled.

An RDD is distributed, which means it is split into some number of parts. Each of these partitions is potentially on a different machine. A hash partitioner with argument numPartitions chooses which partition to place a pair (key, value) on in the following way:

- It creates exactly numPartitions partitions.
- It places (key, value) in the partition with number Hash(key) % numPartitions.
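A minimal sketch of that rule (partitionFor is a hypothetical helper, not a Spark API; note that an implementation must also shift negative hash values back into [0, numPartitions)):

```scala
// Partition number for a key, following Hash(key) % numPartitions,
// adjusted to stay within [0, numPartitions).
def partitionFor(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

// Keys 1, 2, 3 over 2 partitions: 2 goes to partition 0, 1 and 3 to partition 1.
val placement = Seq(1, 2, 3).map(k => k -> partitionFor(k, 2))
```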
The HashPartitioner.getPartition method takes a key as its argument and returns the index of the partition which the key belongs to. The partitioner has to know what the valid indices are, so it returns numbers in the right range. The number of partitions is specified through the numPartitions constructor argument. The implementation returns roughly key.hashCode() % numPartitions. See Partitioner.scala for more details.
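Under that description, a simplified transcription might look as follows. SimpleHashPartitioner is illustrative, not the actual Spark class, and it assumes (per my reading of Partitioner.scala) that null keys map to partition 0 and that negative hash values are shifted into range:

```scala
// Simplified HashPartitioner-like class mirroring the logic described above.
class SimpleHashPartitioner(val numPartitions: Int) {
  require(numPartitions >= 0, "Number of partitions cannot be negative.")

  def getPartition(key: Any): Int = key match {
    case null => 0 // null keys always land in partition 0
    case _ =>
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
  }
}

val p = new SimpleHashPartitioner(10)
// p.getPartition always returns an index in [0, 10)
```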