Feature hotfixes (#274)
* hot fixes for data analyzer issues

* hotfixes for issues in DataAnalyzer

* changed comment wording
ronanstokes-db authored May 22, 2024
1 parent 1113d73 commit 2482dca
Showing 10 changed files with 110 additions and 15 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,17 @@
## Change History
All notable changes to the Databricks Labs Data Generator will be documented in this file.

+
+### Version 0.3.6 Post 1
+
+#### Changed
+* Updated docs for complex data types / JSON to correct code examples
+* Updated license file in public docs
+
+#### Fixed
+* Fixed scenario where `DataAnalyzer` is used on dataframe containing a column named `summary`
+
+
### Version 0.3.6

#### Changed
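To illustrate the `summary` column fix noted above, here is a minimal sketch mirroring the regression test added in this commit (it assumes a live SparkSession named `spark`):

    import dbldatagen as dg

    # a dataframe whose data contains a column named "summary" -- this previously
    # clashed with the `summary` column produced internally by DataFrame.describe()
    df = spark.range(10).withColumnRenamed("id", "summary")

    # with the fix, DataAnalyzer renames the conflicting column while computing
    # statistics and restores the original name in its output
    summary_df = dg.DataAnalyzer(sparkSession=spark, df=df).summarizeToDF()
    summary_df.show()
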
2 changes: 1 addition & 1 deletion README.md
@@ -64,7 +64,7 @@ details of use and many examples.

Release notes and details of the latest changes for this specific release
can be found in the GitHub repository
-[here](https://github.com/databrickslabs/dbldatagen/blob/release/v0.3.6/CHANGELOG.md)
+[here](https://github.com/databrickslabs/dbldatagen/blob/release/v0.3.6post1/CHANGELOG.md)

# Installation

2 changes: 1 addition & 1 deletion dbldatagen/_version.py
@@ -34,7 +34,7 @@ def get_version(version):
    return version_info


-__version__ = "0.3.6" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion
+__version__ = "0.3.6post1" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion
__version_info__ = get_version(__version__)


72 changes: 64 additions & 8 deletions dbldatagen/data_analyzer.py
@@ -7,14 +7,18 @@
This code is experimental and both APIs and code generated is liable to change in future versions.
"""
-from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, DoubleType, BooleanType, ShortType, \
-    TimestampType, DateType, DecimalType, ByteType, BinaryType, StructType, ArrayType, DataType
+import logging

import pyspark.sql as ssql
import pyspark.sql.functions as F
+from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, DoubleType, BooleanType, ShortType, \
+    TimestampType, DateType, DecimalType, ByteType, BinaryType, StructType, ArrayType, DataType

-from .utils import strip_margins
from .spark_singleton import SparkSingleton
+from .utils import strip_margins

+SUMMARY_FIELD_NAME = "summary"
+SUMMARY_FIELD_NAME_RENAMED = "__summary__"
+DATA_SUMMARY_FIELD_NAME = "__data_summary__"


class DataAnalyzer:
@@ -23,6 +27,8 @@ class DataAnalyzer:
    :param df: Spark dataframe to analyze
    :param sparkSession: Spark session instance to use when performing spark operations
+    :param debug: If True, additional debug information is logged
+    :param verbose: If True, additional information is logged

    .. warning::
        Experimental
@@ -43,11 +49,17 @@ class DataAnalyzer:
        |# Column definitions are stubs only - modify to generate correct data
        |#""", '|')

-    def __init__(self, df=None, sparkSession=None):
+    def __init__(self, df=None, sparkSession=None, debug=False, verbose=False):
        """ Constructor:
        :param df: Dataframe to analyze
        :param sparkSession: Spark session to use
        """
+        # set up logging
+        self.verbose = verbose
+        self.debug = debug
+
+        self._setupLogger()
+
        assert df is not None, "dataframe must be supplied"

        self._df = df
@@ -58,6 +70,19 @@ def __init__(self, df=None, sparkSession=None):
        self._sparkSession = sparkSession
        self._dataSummary = None

+    def _setupLogger(self):
+        """Set up logging
+
+        This will set the logger at warning, info or debug levels depending on the instance construction parameters
+        """
+        self.logger = logging.getLogger("DataAnalyzer")
+        if self.debug:
+            self.logger.setLevel(logging.DEBUG)
+        elif self.verbose:
+            self.logger.setLevel(logging.INFO)
+        else:
+            self.logger.setLevel(logging.WARNING)

    def _displayRow(self, row):
        """Display details for row"""
        results = []
@@ -95,6 +120,31 @@ def _addMeasureToSummary(self, measureName, summaryExpr="''", fieldExprs=None, d

        return dfResult

+    def _get_dataframe_describe_stats(self, df):
+        """ Get summary statistics for the dataframe, renaming the `summary` field if necessary"""
+        self.logger.debug("schema: %s", df.schema)
+
+        src_fields = [fld.name for fld in df.schema.fields]
+        self.logger.debug("source fields: %s", src_fields)
+        renamed_summary = False
+
+        # get summary statistics handling the case where a field named 'summary' exists
+        # if the `summary` field name exists, we'll rename it to avoid a conflict
+        if SUMMARY_FIELD_NAME in src_fields:
+            renamed_summary = True
+            df = df.withColumnRenamed(SUMMARY_FIELD_NAME, SUMMARY_FIELD_NAME_RENAMED)
+
+        # The dataframe describe method produces a field named `summary`. We'll rename this to avoid conflict with
+        # any natural fields using the same name.
+        summary_df = df.describe().withColumnRenamed(SUMMARY_FIELD_NAME, DATA_SUMMARY_FIELD_NAME)
+
+        # if we renamed a field called `summary` in the data, we'll rename it back.
+        # The data summary field produced by the describe method has already been renamed so there will be no conflict.
+        if renamed_summary:
+            summary_df = summary_df.withColumnRenamed(SUMMARY_FIELD_NAME_RENAMED, SUMMARY_FIELD_NAME)
+
+        return summary_df

    def summarizeToDF(self):
        """ Generate summary analysis of data set as dataframe
@@ -154,11 +204,12 @@ def summarizeToDF(self):
            dfData=self._df,
            dfSummary=dfDataSummary)

-        descriptionDf = self._df.describe().where("summary in ('mean', 'stddev')")
+        descriptionDf = (self._get_dataframe_describe_stats(self._df)
+                         .where(f"{DATA_SUMMARY_FIELD_NAME} in ('mean', 'stddev')"))
        describeData = descriptionDf.collect()

        for row in describeData:
-            measure = row['summary']
+            measure = row[DATA_SUMMARY_FIELD_NAME]

            values = {k[0]: '' for k in dtypes}

@@ -401,7 +452,12 @@ def scriptDataGeneratorFromData(self, suppressOutput=False, name=None):
"""
assert self._df is not None
assert type(self._df) is ssql.DataFrame, "sourceDf must be a valid Pyspark dataframe"

if not isinstance(self._df, ssql.DataFrame):
self.logger.warning(strip_margins(
"""The parameter `sourceDf` should be a valid Pyspark dataframe.
|Note this warning may false due to use of remote connection to a Spark cluster""",
'|'))

if self._dataSummary is None:
df_summary = self.summarizeToDF()
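As a usage sketch for the new `debug`/`verbose` constructor parameters (the names `spark` and `source_df` are stand-ins for a real session and source dataframe):

    import dbldatagen as dg

    # verbose=True raises the analyzer's log level to INFO, debug=True raises it
    # to DEBUG (see _setupLogger above); the default level remains WARNING
    analyzer = dg.DataAnalyzer(df=source_df, sparkSession=spark, verbose=True)
    summary_df = analyzer.summarizeToDF()
    summary_df.show()
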
2 changes: 0 additions & 2 deletions dbldatagen/utils.py
@@ -27,7 +27,6 @@ def deprecated(message=""):
"""

# create closure around function that follows use of the decorator

def deprecated_decorator(func):
@functools.wraps(func)
def deprecated_func(*args, **kwargs):
@@ -290,7 +289,6 @@ def split_list_matching_condition(lst, cond):
        x = ['id', 'city_name', 'id', 'city_id', 'city_pop', 'id', 'city_id', 'city_pop','city_id', 'city_pop','id']
        split_list_matching_condition(x, lambda el: el == 'id')
-
        Result:
        `[['id'], ['city_name'], ['id'], ['city_id', 'city_pop'],
        ['id'], ['city_id', 'city_pop', 'city_id', 'city_pop'], ['id']]`
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -28,7 +28,7 @@
author = 'Databricks Inc'

# The full version, including alpha/beta/rc tags
release = "0.3.6" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion
release = "0.3.6post1" # DO NOT EDIT THIS DIRECTLY! It is managed by bumpversion


# -- General configuration ---------------------------------------------------
24 changes: 24 additions & 0 deletions docs/source/generating_json_data.rst
@@ -7,6 +7,8 @@ Generating JSON and Structured Column Data
This section explores generating JSON and structured column data. By structured columns,
we mean columns that are some combination of `struct`, `array` and `map` of other types.

+*Note that some of the examples are code fragments for illustration purposes only.*
+
Generating JSON data
--------------------
There are several methods for generating JSON data:
@@ -25,6 +27,7 @@ The following example illustrates the basic technique for generating JSON data f
+    import dbldatagen as dg
    device_population = 100000
    country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
                     'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
@@ -106,6 +109,7 @@ Note that in the current release, the `expr` attribute will override other colum
+    import dbldatagen as dg
    device_population = 100000
    country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
                     'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
@@ -221,6 +225,7 @@ functions such as `named_struct` and `to_json`.
+    import dbldatagen as dg
    device_population = 100000
    country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
                     'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
@@ -341,6 +346,25 @@ populated.

The following example illustrates this:

+.. code-block:: python
+
+    import dbldatagen as dg
+
+    column_count = 10
+    data_rows = 10 * 1000
+    df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
+               .withIdOutput()
+               .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)",
+                           numColumns=column_count, structType="array")
+               .withColumn("code1", "integer", minValue=100, maxValue=200)
+               .withColumn("code2", "integer", minValue=0, maxValue=10)
+               .withColumn("code3", "string", values=['one', 'two', 'three'])
+               .withColumn("code4", "string", values=['one', 'two', 'three'])
+               .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()")
+               .withColumn("code6", dg.INFER_DATATYPE, expr="code1 + code2")
+               .withColumn("code7", dg.INFER_DATATYPE, expr="concat(code3, code4)")
+               )
Using multi feature columns to generate arrays
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
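As a follow-on to the inferred-datatype fragment above, the generated spec can be built and its rows serialized to JSON with standard Spark functions; like the other fragments this is illustrative only and assumes `spark` plus the relevant imports (for example `FloatType`):

    import pyspark.sql.functions as F

    df = df_spec.build()
    # render each generated row as a JSON string from a struct of all columns
    df.select(F.to_json(F.struct(*df.columns)).alias("json_value")).show(truncate=False)
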
2 changes: 1 addition & 1 deletion python/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.3.6
+current_version = 0.3.6post1
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+){0,1}(?P<release>\D*)(?P<build>\d*)
2 changes: 1 addition & 1 deletion setup.py
@@ -31,7 +31,7 @@

setuptools.setup(
    name="dbldatagen",
-    version="0.3.6",
+    version="0.3.6post1",
    author="Ronan Stokes, Databricks",
    description="Databricks Labs - PySpark Synthetic Data Generator",
    long_description=long_description,
6 changes: 6 additions & 0 deletions tests/test_generation_from_data.py
@@ -107,3 +107,9 @@ def test_summarize_to_df(self, generation_spec, testLogger):
        df = analyzer.summarizeToDF()

        df.show()
+
+    def test_df_containing_summary(self):
+        df = spark.range(10).withColumnRenamed("id", "summary")
+        summary_df = dg.DataAnalyzer(sparkSession=spark, df=df).summarizeToDF()
+
+        assert summary_df.count() == 10
