Draft PR: Feature streaming enhancements merged - not for review #254

Draft · wants to merge 9 commits into base: master
144 changes: 123 additions & 21 deletions dbldatagen/data_generator.py
@@ -22,8 +22,18 @@
from .spark_singleton import SparkSingleton
from .utils import ensure, topologicalSort, DataGenError, deprecated, split_list_matching_condition

START_TIMESTAMP_OPTION = "startTimestamp"
ROWS_PER_SECOND_OPTION = "rowsPerSecond"
AGE_LIMIT_OPTION = "ageLimit"
NUM_PARTITIONS_OPTION = "numPartitions"
ROWS_PER_BATCH_OPTION = "rowsPerBatch"
STREAMING_SOURCE_OPTION = "streamingSource"

_OLD_MIN_OPTION = 'min'
_OLD_MAX_OPTION = 'max'
RATE_SOURCE = "rate"
RATE_PER_MICRO_BATCH_SOURCE = "rate-micro-batch"
SPARK_RATE_MICROBATCH_VERSION = "3.2.1"

_STREAMING_TIMESTAMP_COLUMN = "_source_timestamp"
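# Illustrative sketch (not part of this diff): these option keys are what callers
# would pass in the streaming options dictionary, e.g. via the existing
# `build(withStreaming=True, options=...)` API - the exact call shape is an
# assumption based on current dbldatagen usage:
#
#   df = dataspec.build(withStreaming=True,
#                       options={ROWS_PER_SECOND_OPTION: 100,  # i.e. "rowsPerSecond"
#                                AGE_LIMIT_OPTION: 10})        # i.e. "ageLimit", in seconds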

@@ -1058,32 +1068,124 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
            df1 = df1.withColumnRenamed(SPARK_RANGE_COLUMN, self._seedColumnName)

        else:
            df1 = self._getStreamingBaseDataFrame(startId, options)

        return df1

    def _getStreamingSource(self, options=None, spark_version=None):
        """ Get the streaming source from the supplied options.

        :param options: dictionary of streaming options
        :param spark_version: Spark version string to test against; defaults to the active session's version
        :returns: the streaming source named in the options, or a version-dependent default if not present

        The default streaming source depends on the Spark version: on Spark 3.2.1 or later,
        `rate-micro-batch` is used as the source; otherwise `rate` is used.
        """
        streaming_source = None
        if options is not None and STREAMING_SOURCE_OPTION in options:
            streaming_source = options[STREAMING_SOURCE_OPTION]
            assert streaming_source in [RATE_SOURCE, RATE_PER_MICRO_BATCH_SOURCE], \
                f"Invalid streaming source - only '{RATE_SOURCE}' and '{RATE_PER_MICRO_BATCH_SOURCE}' are supported"

        if spark_version is None:
            spark_version = self.sparkSession.version

        if streaming_source is None:
            # on Spark 3.2.1 or later, default to the rate-micro-batch source
            if spark_version >= SPARK_RATE_MICROBATCH_VERSION:
                streaming_source = RATE_PER_MICRO_BATCH_SOURCE
            else:
                streaming_source = RATE_SOURCE

        return streaming_source
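    # Behavior sketch for _getStreamingSource (illustrative values, assumed):
    #
    #   self._getStreamingSource({STREAMING_SOURCE_OPTION: RATE_SOURCE})  # -> "rate" (explicit option wins)
    #   self._getStreamingSource(None, spark_version="3.2.1")             # -> "rate-micro-batch"
    #   self._getStreamingSource(None, spark_version="3.1.2")             # -> "rate"
    #
    # Note that the version check is a plain string comparison against "3.2.1".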

    def _getCurrentSparkTimestamp(self, asLong=False):
        """ Get the current Spark timestamp.

        :param asLong: if True, returns the current Spark timestamp as a long; as a string otherwise
        """
        if asLong:
            return (self.sparkSession.sql("select cast(now() as long) as start_timestamp")
                    .collect()[0]['start_timestamp'])
        else:
            return (self.sparkSession.sql("select cast(now() as string) as start_timestamp")
                    .collect()[0]['start_timestamp'])
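    # Illustrative sketch: with asLong=True this returns an epoch value such as
    # 1652147239 (used below to seed the rate-micro-batch "startTimestamp" option);
    # with asLong=False it returns a timestamp string such as "2022-05-10 01:07:19".
    # The example values here are assumptions, not output from this PR.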

    def _prepareStreamingOptions(self, options=None, spark_version=None):
        """ Prepare the streaming options, filling in defaults for any that are missing.

        :param options: dictionary of streaming options, or None
        :param spark_version: Spark version string used when selecting the default streaming source
        :returns: tuple of (streaming source, prepared options dictionary)
        """
        default_streaming_partitions = (self.partitions if self.partitions is not None
                                        else self.sparkSession.sparkContext.defaultParallelism)

        streaming_source = self._getStreamingSource(options, spark_version)

        if options is None:
            new_options = ({ROWS_PER_SECOND_OPTION: default_streaming_partitions} if streaming_source == RATE_SOURCE
                           else {ROWS_PER_BATCH_OPTION: default_streaming_partitions})
        else:
            new_options = options.copy()

        if NUM_PARTITIONS_OPTION in new_options:
            streaming_partitions = new_options[NUM_PARTITIONS_OPTION]
        else:
            streaming_partitions = default_streaming_partitions
            new_options[NUM_PARTITIONS_OPTION] = streaming_partitions

        if streaming_source == RATE_PER_MICRO_BATCH_SOURCE:
            if START_TIMESTAMP_OPTION not in new_options:
                new_options[START_TIMESTAMP_OPTION] = self._getCurrentSparkTimestamp(asLong=True)

            if ROWS_PER_BATCH_OPTION not in new_options:
                # default to one row per partition per batch
                new_options[ROWS_PER_BATCH_OPTION] = streaming_partitions

        elif streaming_source == RATE_SOURCE:
            if ROWS_PER_SECOND_OPTION not in new_options:
                new_options[ROWS_PER_SECOND_OPTION] = streaming_partitions
        else:
            assert streaming_source in [RATE_SOURCE, RATE_PER_MICRO_BATCH_SOURCE], \
                f"Invalid streaming source - only '{RATE_SOURCE}' and '{RATE_PER_MICRO_BATCH_SOURCE}' are supported"

        return streaming_source, new_options
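    # Behavior sketch for _prepareStreamingOptions (illustrative, assuming
    # self.partitions == 4 and Spark 3.2.1 or later):
    #
    #   self._prepareStreamingOptions(None)
    #   # -> ("rate-micro-batch",
    #   #     {"rowsPerBatch": 4, "numPartitions": 4, "startTimestamp": <epoch value>})
    #
    #   self._prepareStreamingOptions({STREAMING_SOURCE_OPTION: RATE_SOURCE,
    #                                  ROWS_PER_SECOND_OPTION: 50})
    #   # -> ("rate", {"streamingSource": "rate", "rowsPerSecond": 50, "numPartitions": 4})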

    def _getStreamingBaseDataFrame(self, startId=0, options=None):
        """Generate the base streaming data frame"""
        end_id = self._rowCount + startId

        # determine the streaming source and fill in default options
        streaming_source, options = self._prepareStreamingOptions(options)
        partitions = options[NUM_PARTITIONS_OPTION]

        if streaming_source == RATE_SOURCE:
            status = f"Generating streaming data with rate source with {partitions} partitions"
        else:
            status = f"Generating streaming data with rate-micro-batch source with {partitions} partitions"

        self.logger.info(status)
        self.executionHistory.append(status)

        age_limit_interval = None

        if STREAMING_SOURCE_OPTION in options:
            options.pop(STREAMING_SOURCE_OPTION)

        if AGE_LIMIT_OPTION in options:
            age_limit_interval = options.pop(AGE_LIMIT_OPTION)
            assert age_limit_interval is not None and float(age_limit_interval) > 0.0, "invalid age limit"

        assert AGE_LIMIT_OPTION not in options
        assert STREAMING_SOURCE_OPTION not in options

        df1 = self.sparkSession.readStream.format(streaming_source)

        for k, v in options.items():
            df1 = df1.option(k, v)

        df1 = df1.load().withColumnRenamed("value", self._seedColumnName)

        if age_limit_interval is not None:
            # retain only rows whose source timestamp is within `age_limit_interval` seconds of now
            df1 = df1.where(f"""abs(cast(now() as double) - cast(`timestamp` as double))
                                < cast({age_limit_interval} as double)""")
        return df1

    def _computeColumnBuildOrder(self):
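For context, here is a minimal end-to-end sketch of how these streaming changes would be exercised from user code. It assumes the existing dbldatagen API (`DataGenerator`, `withColumn`, `build(withStreaming=True, options=...)`); the column definitions, option values, and query name are illustrative assumptions, not part of this diff:

import dbldatagen as dg
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("streaming-demo").getOrCreate()

# build a 4-partition streaming spec; option keys mirror the constants added above
dataspec = (dg.DataGenerator(spark, name="stream_test", rows=100000, partitions=4)
            .withColumn("device_id", "long", minValue=1, maxValue=1000, random=True)
            .withColumn("reading", "double", minValue=0.0, maxValue=100.0, random=True))

streaming_df = dataspec.build(withStreaming=True,
                              options={"streamingSource": "rate-micro-batch",  # or "rate"
                                       "rowsPerBatch": 1000,
                                       "ageLimit": 60})  # drop rows older than 60 seconds

query = (streaming_df.writeStream
         .format("memory")
         .queryName("stream_test_output")
         .start())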
8 changes: 8 additions & 0 deletions makefile
@@ -29,6 +29,10 @@ create-dev-env:
@echo "$(OK_COLOR)=> making conda dev environment$(NO_COLOR)"
conda create -n $(ENV_NAME) python=3.8.10

create-dev-env-321:
@echo "$(OK_COLOR)=> making conda dev environment for Spark 3.2.1$(NO_COLOR)"
conda create -n $(ENV_NAME) python=3.8.10

create-github-build-env:
@echo "$(OK_COLOR)=> making conda dev environment$(NO_COLOR)"
conda create -n pip_$(ENV_NAME) python=3.8
@@ -37,6 +41,10 @@ install-dev-dependencies:
@echo "$(OK_COLOR)=> installing dev environment requirements$(NO_COLOR)"
pip install -r python/dev_require.txt

install-dev-dependencies321:
@echo "$(OK_COLOR)=> installing dev environment requirements for Spark 3.2.1$(NO_COLOR)"
pip install -r python/dev_require_321.txt

clean-dev-env:
@echo "$(OK_COLOR)=> Cleaning dev environment$(NO_COLOR)"
@echo "Current version: $(CURRENT_VERSION)"
33 changes: 33 additions & 0 deletions python/dev_require_321.txt
@@ -0,0 +1,33 @@
# The following packages are used in building the test data generator framework.
# All packages used are already installed in Databricks runtime environments that ship Spark 3.2.1
# (Databricks runtime 10.4 LTS or later)
numpy==1.20.1
pandas==1.2.4
pickleshare==0.7.5
py4j==0.10.9.3
pyarrow==4.0.0
pyspark==3.2.1
python-dateutil==2.8.1
six==1.15.0

# The following packages are required for development only
wheel==0.36.2
setuptools==52.0.0
bumpversion
pytest
pytest-cov
pytest-timeout
rstcheck
prospector

# The following packages are only required for building documentation and are not required at runtime
sphinx==5.0.0
sphinx_rtd_theme
nbsphinx
numpydoc==0.8
pypandoc
ipython==7.16.3
recommonmark
sphinx-markdown-builder
rst2pdf==0.98
Jinja2 < 3.1
