Skip to content

Commit

Permalink
[Hotfix] Fix Duration class can't load for pyflink
Browse files Browse the repository at this point in the history
  • Loading branch information
RocMarshal authored and 1996fanrui committed Mar 11, 2024
1 parent 972dc0c commit eb9bcfa
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions flink-python/pyflink/datastream/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ def cleanup_incrementally(self,
def cleanup_in_rocksdb_compact_filter(
self,
query_time_after_num_entries,
periodic_compaction_time=Duration.of_days(30)) -> \
periodic_compaction_time=None) -> \
'StateTtlConfig.Builder':
"""
Cleanup expired state while Rocksdb compaction is running.
Expand All @@ -833,7 +833,8 @@ def cleanup_in_rocksdb_compact_filter(
self._strategies[
StateTtlConfig.CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER] = \
StateTtlConfig.CleanupStrategies.RocksdbCompactFilterCleanupStrategy(
query_time_after_num_entries, periodic_compaction_time)
query_time_after_num_entries,
periodic_compaction_time if periodic_compaction_time else Duration.of_days(30))
return self

def disable_cleanup_in_background(self) -> 'StateTtlConfig.Builder':
Expand Down Expand Up @@ -925,9 +926,10 @@ class RocksdbCompactFilterCleanupStrategy(CleanupStrategy):

def __init__(self,
query_time_after_num_entries: int,
periodic_compaction_time=Duration.of_days(30)):
periodic_compaction_time=None):
self._query_time_after_num_entries = query_time_after_num_entries
self._periodic_compaction_time = periodic_compaction_time
self._periodic_compaction_time = periodic_compaction_time \
if periodic_compaction_time else Duration.of_days(30)

def get_query_time_after_num_entries(self) -> int:
return self._query_time_after_num_entries
Expand Down

0 comments on commit eb9bcfa

Please sign in to comment.