Perfectly Random

machine learning and stuff

Spark ignores data types when filtering on equality

This may be surprising to regular pandas users and may lead to unexpected or silent errors.

The problem

Let’s construct an example dataframe to demonstrate the problem. The following dataframe has two columns – column x has type integer, column y has type string. Column x has an element 1 (integer) and column y has an element "1" (string).

df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, '1')], 'x INT, y STRING')
df.dtypes
# [('x', 'int'), ('y', 'string')]
df.show()
# +---+---+
# |  x|  y|
# +---+---+
# |  1|  a|
# |  2|  b|
# |  3|  1|
# +---+---+

Let’s say we want to count how many 1 (integer) values are in each column. We should get back these results – column x should have one instance of 1 (integer) and column y should have zero instances of 1 (integer). But only one of these happens.

df[df['x'].isin(1)].count()  # 1 (correct)
df[df['y'].isin(1)].count()  # 1 (incorrect)

We may suspect that the pyspark.sql.Column.isin method has a bug. Let’s check another way, without using the .isin method.

df[df['x'] == 1].count()  # 1 (correct)
df[df['y'] == 1].count()  # 1 (incorrect)

We still get the same partially incorrect results. Let’s check something else – filter for "1" (string) instead.

df[df['x'].isin("1")].count()  # 1 (incorrect)
df[df['y'].isin("1")].count()  # 1 (correct)

df[df['x'] == "1"].count()  # 1 (incorrect)
df[df['y'] == "1"].count()  # 1 (correct)

It appears that (1) .isin is not the reason and (2) Spark does not respect data types when filtering. Python is a dynamically typed language (though it now supports optional type hints). Perhaps that is causing this. Let’s use SQL (via Python) instead and verify.

df.createOrReplaceTempView('dummy')
spark.sql('SELECT * FROM dummy WHERE x = 1').count()  # 1 (correct)
spark.sql('SELECT * FROM dummy WHERE y = 1').count()  # 1 (incorrect)
spark.sql('SELECT * FROM dummy WHERE x = "1"').count()  # 1 (incorrect)
spark.sql('SELECT * FROM dummy WHERE y = "1"').count()  # 1 (correct)

We get the same results back – data types are not respected.
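For contrast, plain Python does not coerce between int and str in equality or membership tests, so Python’s dynamic typing on its own would not produce these results. A minimal check (the variable names are mine, for illustration only):

```python
# Plain Python compares int and str without any implicit conversion
int_equals_str = (1 == "1")      # False
int_in_str_list = (1 in ["1"])   # False
str_in_int_list = ("1" in [1])   # False
print(int_equals_str, int_in_str_list, str_in_int_list)
```

This suggests the coercion happens inside Spark’s expression handling rather than in the Python layer.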

Let’s just use Scala

Let’s investigate if this happens in a statically typed language like Scala.

import spark.implicits._

val df = Seq(
    (1, "a"),
    (2, "b"),
    (3, "1")
).toDF("x", "y")

df.dtypes
// Array[(String, String)] = Array((x,IntegerType), (y,StringType))

df.filter($"x" === 1).count()    // 1 (correct)
df.filter($"y" === 1).count()    // 1 (incorrect)
df.filter($"x" === "1").count()  // 1 (incorrect)
df.filter($"y" === "1").count()  // 1 (correct)

Scala has the same problem!

Plain-old SQL has the same behavior

Turns out we see the same behavior in at least some SQL systems. Let’s use SQL (using sqlite3) without using spark at all.

import sqlite3
conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute('CREATE TABLE dummy (x integer, y string)')
c.execute('INSERT INTO dummy VALUES (1, "a")')
c.execute('INSERT INTO dummy VALUES (2, "b")')
c.execute('INSERT INTO dummy VALUES (3, "1")')
conn.commit()
list(c.execute('SELECT * FROM dummy'))
list(c.execute('SELECT * FROM dummy WHERE x=1'))   # [(1, 'a')]  (correct)
list(c.execute('SELECT * FROM dummy WHERE y=1'))   # [(3, 1)]    (incorrect)
list(c.execute('SELECT * FROM dummy WHERE x="1"')) # [(1, 'a')]  (incorrect)
list(c.execute('SELECT * FROM dummy WHERE y="1"')) # [(3, 1)]    (correct)
conn.close()

Turns out SQL does not respect data types either!
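Part of what the sqlite3 output shows is SQLite’s type affinity at work: "string" is not a type name that SQLite maps to TEXT, so column y gets NUMERIC affinity and "1" is stored as the integer 1 (hence (3, 1) in the output above). The sketch below is my variation, not the original experiment. It declares y as TEXT and shows that the comparison is still coerced. Adding a typeof() check is one possible way, under these assumptions, to make the filter type-strict in SQLite.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
c = conn.cursor()
# TEXT (unlike "string") gives the column TEXT affinity, so '1' stays a string
c.execute('CREATE TABLE dummy (x INTEGER, y TEXT)')
c.executemany('INSERT INTO dummy VALUES (?, ?)', [(1, 'a'), (2, 'b'), (3, '1')])
conn.commit()

# Loose comparisons: SQLite applies the column's affinity to the literal first
loose_y = list(c.execute("SELECT * FROM dummy WHERE y = 1"))
loose_x = list(c.execute("SELECT * FROM dummy WHERE x = '1'"))

# Strict comparisons: also require the stored value to have the literal's type
strict_y = list(c.execute("SELECT * FROM dummy WHERE y = 1 AND typeof(y) = 'integer'"))
strict_x = list(c.execute("SELECT * FROM dummy WHERE x = '1' AND typeof(x) = 'text'"))
conn.close()

print(loose_y, loose_x)    # [(3, '1')] [(1, 'a')]
print(strict_y, strict_x)  # [] []
```

Even with a properly typed TEXT column the loose filters still match, so the behavior is not just an artifact of the "string" declaration.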

It gets worse! Even pandas has this bug.

Turns out that even pandas has this problem. For pyspark and SQL, this problem appears to be a consistent design issue, but for pandas it appears to be a bug rather than a design choice. I used pandas version 0.25.3 for this experiment.

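import pandas as pd

# Assumed setup: pdf holds the same data as the Spark dataframe above
pdf = pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'b', '1']})

pdf['x'].isin([1]).sum()  # 1 (correct)
pdf['y'].isin([1]).sum()  # 0 (correct)
pdf['x'].isin(["1"]).sum()  # 1 (incorrect)
pdf['y'].isin(["1"]).sum()  # 1 (correct)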

Note how only the third line in the above snippet returns incorrect results. This indicates that pandas ignores the data type sometimes but not always. Further, in the case of pandas the problem is only in the .isin method and not in general (like in SQL and pyspark), as shown by the following example that does not use .isin.

# Results are correct when we don't use pd.Series.isin() method
pdf['x'].apply(lambda x: x == 1).sum()   # 1 (correct)
pdf['y'].apply(lambda y: y == 1).sum()   # 0 (correct)
pdf['x'].apply(lambda x: x == "1").sum() # 0 (correct)
pdf['y'].apply(lambda y: y == "1").sum() # 1 (correct)

(pdf['x'] == 1).sum()    # 1 (correct)
(pdf['y'] == 1).sum()    # 0 (correct)
(pdf['x'] == "1").sum()  # 0 (correct)
(pdf['y'] == "1").sum()  # 1 (correct)

Looking even deeper into the pandas.Series.isin method, we see that it relies upon the pandas.core.algorithms.isin function. Lines 452-453 (in pandas version 0.25.3) contain the following code (comments are mine):

# Lines 452-453 in pandas 0.25.3
comps, dtype, _ = _ensure_data(comps)              # comps = elements of the column
values, _, _ = _ensure_data(values, dtype=dtype)   # values = list of values passed to `.isin`

As you can see, values are cast to the dtype of the column. We can verify this by running the code ourselves.

from pandas.core.algorithms import _ensure_data
comps = pdf['x']
values = ["1"]  # str
comps, dtype, _ = _ensure_data(comps)
print(dtype)  # int64
values, _, _ = _ensure_data(values, dtype=dtype)
print(values)  # [1]
print(values.dtype)  # int64

This does not affect column y in pdf above because _ensure_data returns dtype=object for column y. Searching the pandas GitHub issues, we see that this bug has been reported before but was somehow ignored.

Takeaway

Spark and SQL ignore data types when filtering on equality. This seems to be a design issue and is consistent throughout (or at least as far as I can see). Pandas, on the other hand, exhibits this problem inconsistently which may lead to complacency.

For end users, the best way to prevent this mistake is to always manually ensure that the data type of the column matches the data type of every value in the list of values used for equality filtering.
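One way to make that manual check routine is a small guard that validates the filter values before they reach .isin or ==. This is a minimal sketch: checked_values is a hypothetical helper of my own, not part of pandas or pyspark, and it assumes you already know which Python type corresponds to the column’s data type.

```python
def checked_values(values, expected_type):
    """Return values as a list if every element is exactly expected_type.

    Uses `type(v) is expected_type` rather than isinstance so that, for
    example, True is not silently accepted as an int.
    """
    values = list(values)
    bad = [v for v in values if type(v) is not expected_type]
    if bad:
        raise TypeError(
            'expected only {} values, got: {!r}'.format(expected_type.__name__, bad)
        )
    return values

ok = checked_values([1, 2], int)  # passes through unchanged

try:
    checked_values([1, "1"], int)
    mismatch_caught = False
except TypeError:
    mismatch_caught = True

# Hypothetical usage with the dataframe from this post:
#   df[df['x'].isin(checked_values([1, 2], int))]
```

The guard fails loudly instead of letting a mismatched value be coerced silently.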

Python's reference cycle collector

CPython 3, the standard implementation of Python, uses both reference counting and a generational garbage collector. The generational garbage collector is responsible for periodically collecting reference cycles. To review, here is an example of a reference cycle (note the use of .append).

x = [1, 2, 3]
y = [x]
x.append(y)
print(x)
# [1, 2, 3, [[...]]]
# [...] is a tell-tale sign of a ref cycle

Let’s see the generational garbage collector in action. Define a few helper functions for our investigation.

import sys, os, gc, psutil  # ignore E401
import numpy as np

process = psutil.Process(os.getpid())

def print_memory():
    memory_mb = int(np.round(process.memory_info().rss / 1e6))
    print('{}MB'.format(memory_mb))

def create_ref_cycle():
    a = [np.random.rand(2000, 2000)]
    b = [a]
    a.append(b)  # using .append() creates a ref cycle

def no_create_ref_cycle():
    a = [np.random.rand(2000, 2000)]
    b = [a]
    a = a + [b]  # redefining `a` doesn't create a ref cycle

The garbage collector is enabled by default. Let’s disable it and create lots of reference cycles that become inaccessible to us. Measuring memory usage is a messy business that depends on many factors, including the OS. We will look at the resident set size (rss), which may not match what your OS’s GUI reports (such as Activity Monitor on macOS). Your results will vary wildly every time you run these snippets. Our aim is to observe the effect of enabling or disabling garbage collection, not to measure memory usage accurately. Also note that Python itself doesn’t always return all unused memory to the OS.

# In a new python session with helper functions defined
gc.disable()
print_memory()
# 63MB
for _ in range(100):
    create_ref_cycle()  # consumes ~32MB per iteration
    print_memory()
# ...
# 3168MB
# 3200MB
# 3232MB
# 3264MB
gc.enable()
gc.collect()
print_memory()
# 2112MB

Let’s now run the function that does not create reference cycles. The memory usage for this function does not keep on increasing as a result of reference cycles (though there might be minor increases due to other, unrelated reasons).

# In a new python session with helper functions defined
gc.disable()
print_memory()
# 63MB
for _ in range(100):
    no_create_ref_cycle()  # consumes nothing per iteration
    print_memory()
# ...
# 95MB
# 95MB
# 95MB
# 95MB
gc.enable()
gc.collect()
print_memory()
# 95MB

The absolute memory numbers are not reliable – they vary by OS, RAM, other processes, and lots of other factors. Garbage collection runs periodically (not continuously) based on heuristics. You may see a temporary increase in memory usage until the garbage collection runs again. GC is also not perfect and does not prevent all memory leaks.
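Since memory numbers are noisy, a weak reference gives a more deterministic way to watch the same effect. The sketch below is CPython-specific and the names (Node, make_cycle, make_no_cycle) are mine. It uses a small class because plain lists cannot be weakly referenced.

```python
import gc
import weakref

class Node:
    """Tiny container that supports weak references (plain lists do not)."""
    def __init__(self):
        self.other = None

def make_cycle():
    a, b = Node(), Node()
    a.other, b.other = b, a  # a -> b -> a is a reference cycle
    return weakref.ref(a)    # a weak reference does not keep `a` alive

def make_no_cycle():
    a, b = Node(), Node()
    a.other = b              # one-way link, no cycle
    return weakref.ref(a)

gc.disable()
cycle_ref = make_cycle()
no_cycle_ref = make_no_cycle()

# Reference counting alone frees the acyclic object immediately,
# but the cycle survives until the cycle collector runs.
cycle_alive_before = cycle_ref() is not None
no_cycle_alive = no_cycle_ref() is not None

gc.collect()
cycle_alive_after = cycle_ref() is not None
gc.enable()

print(cycle_alive_before, no_cycle_alive, cycle_alive_after)  # True False False
```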

Takeaway

Don’t create reference cycles in your code. Know that garbage collection in Python is imperfect.
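A common place where cycles sneak in is a parent/child structure in which each child points back to its parent. One way to avoid the cycle, sketched here with a hypothetical TreeNode class of my own, is to hold the back-pointer as a weak reference so that reference counting alone can free the parent.

```python
import gc
import weakref

class TreeNode:
    def __init__(self, name, parent=None):
        self.name = name
        self.children = []  # strong references: parent -> child
        # weak reference: child -> parent, so no cycle is formed
        self._parent = weakref.ref(parent) if parent is not None else None
        if parent is not None:
            parent.children.append(self)

    @property
    def parent(self):
        # Returns None if there is no parent or it has already been freed
        return self._parent() if self._parent is not None else None

gc.disable()  # make sure only reference counting is at work
root = TreeNode('root')
child = TreeNode('child', parent=root)
parent_name = child.parent.name

probe = weakref.ref(root)
del root  # drop the only strong reference to the parent
freed_without_gc = probe() is None
parent_after_del = child.parent
gc.enable()

print(parent_name, freed_without_gc, parent_after_del)  # root True None
```

The trade-off is that the child must cope with its parent disappearing, which is why the parent property can return None.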

Spark Catalog API

The Spark Catalog API (spark.catalog.*) provides useful information about which tables are cached and which UDFs are available.

from pyspark.sql.functions import udf
my_square = udf(lambda x: x * x, 'double')
spark.udf.register('my_square', my_square)
spark.catalog.listFunctions()
# ...
# Function(name='my_square', description=None, className=None, isTemporary=True),
# ...

You can see a list of tables and see which ones are cached.

df = spark.createDataFrame([(x, x) for x in range(10000)], 'a INT, b INT')
df.is_cached
# False
df.createOrReplaceTempView('dummy')
spark.catalog.listTables()
# [Table(name='dummy', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
spark.catalog.isCached('dummy')
# False
df.cache()
spark.catalog.isCached('dummy')
# True

Other functions such as spark.catalog.clearCache() could also be very useful.

Empty spark dataframes

Creating an empty spark dataframe is a bit tricky. Let’s see some examples. First, let’s create a SparkSession object to use.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('my_app').getOrCreate()

The following command fails because the schema cannot be inferred. We can make it work by specifying the schema as a string.

spark.createDataFrame([])  # fails!
# ...
# ValueError: can not infer schema from empty dataset

df = spark.createDataFrame([], 'a INT, b DOUBLE, c STRING')  # works!
df.dtypes
# [('a', 'int'), ('b', 'double'), ('c', 'string')]

Things get a little bit more interesting when we create a spark dataframe from a pandas dataframe.

import pandas as pd
x = pd.DataFrame({'a': [1.0], 'b': [1.0]})  # works as expected
print(x.dtypes)  # pandas dataframe has a schema
# a    float64
# b    float64
# dtype: object
spark.createDataFrame(x).dtypes  # no need to specify schema because it can be inferred
# [('a', 'double'), ('b', 'double')]

An empty pandas dataframe has a schema but spark is unable to infer it.

y = pd.DataFrame({'a': [], 'b': []})
print(y.dtypes)  # default dtype is float64
# a    float64
# b    float64
# dtype: object
spark.createDataFrame(y)  # fails!
# ...
# ValueError: can not infer schema from empty dataset

Now, funnily enough, when a schema string is supplied, spark completely ignores an empty pandas dataframe’s schema, including its column names.

# works as expected
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'a INT, b DOUBLE').dtypes
# [('a', 'int'), ('b', 'double')]

# also works!
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'a INT').dtypes
# [('a', 'int')]

# also works!
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'b INT').dtypes
# [('b', 'int')]

# still works!
spark.createDataFrame(pd.DataFrame({'a': [], 'b': []}), 'c INT').dtypes
# [('c', 'int')]

Search & destroy files

It is a common task to recursively seek and destroy all .pyc files within the current directory. The following command does that for us in a bash shell.

find . -name "*.pyc" -type f -exec rm "{}" \;

Simply typing find . -name "*.pyc" -type f would list all files (and not directories) whose names match the glob *.pyc within the directory . (the current directory). find accepts the -exec option on both Linux and macOS (though the -delete option may not always be available). See other variants here. The above command works even for filenames or paths that contain spaces. In contrast, find . -name "*.pyc" -type f | xargs rm is risky when filenames or paths contain spaces, because xargs splits its input on whitespace by default. Here is a demo:

# In a bash shell
$ find . -name "*.pyc" -type f
./hello world.pyc
./p q r/this has spaces.pyc
./p q r/d.pyc
./abc.pyc
./a/pqr.pyc

$ find . -name "*.pyc" -type f -exec rm "{}" \;
$ find . -name "*.pyc" -type f
# Nothing anymore!
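The same cleanup can be sketched in Python with pathlib, which sidesteps shell quoting entirely. remove_pyc is a hypothetical helper name of mine, and the demo runs in a throwaway temporary directory rather than your working tree.

```python
import tempfile
from pathlib import Path

def remove_pyc(root):
    """Recursively delete regular .pyc files under root; return how many were removed."""
    removed = 0
    # Materialize the matches first so we don't delete while still walking the tree
    for path in list(Path(root).rglob('*.pyc')):
        # Roughly mirrors `-type f`: regular files only, not directories or symlinks
        if path.is_file() and not path.is_symlink():
            path.unlink()
            removed += 1
    return removed

# Demo with spaces in both file and directory names
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / 'p q r').mkdir()
    for name in ['hello world.pyc', 'p q r/this has spaces.pyc', 'abc.pyc', 'keep.py']:
        (base / name).touch()
    removed_count = remove_pyc(base)
    remaining = sorted(p.name for p in base.rglob('*') if p.is_file())

print(removed_count, remaining)  # 3 ['keep.py']
```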