From 5a541815a7e7f0645f532da622352b9198979833 Mon Sep 17 00:00:00 2001
From: Dave Brondsema
Date: Tue, 15 Mar 2016 18:06:54 -0400
Subject: [PATCH] Option to filter ops by database(s)

---
Notes (not part of the commit message):
  * bin/oplogreplay gains a repeatable --database option (action='append');
    the collected values are handed to OplogReplayer as `databases`.
  * OplogReplayer.__init__ forwards `databases` to OplogWatcher.__init__,
    which stores it as self.databases (None when the option is not given).
  * OplogWatcher.process_op: when self.databases is non-empty and the op is
    not a no-op ('n'), the database name is taken from the part of `ns`
    before the first '.', and the op is not dispatched if that name is not
    listed.  If `ns` cannot be split, an error is logged and the op is still
    dispatched.  self.ts is updated whether or not the op was dispatched.

 bin/oplogreplay              |  8 +++++-
 oplogreplay/oplogreplayer.py |  4 +--
 oplogreplay/oplogwatcher.py  | 51 +++++++++++++++++++++++-------------
 3 files changed, 42 insertions(+), 21 deletions(-)

diff --git a/bin/oplogreplay b/bin/oplogreplay
index cecf028..608a041 100755
--- a/bin/oplogreplay
+++ b/bin/oplogreplay
@@ -27,6 +27,10 @@ def parse_arguments():
                       help='start oplog-replay from this timestamp forward '
                            '(e.g.: "1339412676,7")')
 
+    parser.add_option('--database', action='append', type='string',
+                      dest='databases',
+                      help='only replay from this database (may specify multiple times)')
+
     (options, args) = parser.parse_args()
 
     # Validate timestamp.
@@ -83,7 +87,9 @@ def main():
     # Start OplogReplayer
     oplogreplayer = OplogReplayer(options.source, options.dest,
                                   replay_indexes=options.replay_indexes,
-                                  ts=options.timestamp)
+                                  ts=options.timestamp,
+                                  databases=options.databases
+                                  )
     oplogreplayer.start()
 
 if __name__ == '__main__':
diff --git a/oplogreplay/oplogreplayer.py b/oplogreplay/oplogreplayer.py
index 5cb9d29..e5af7f6 100644
--- a/oplogreplay/oplogreplayer.py
+++ b/oplogreplay/oplogreplayer.py
@@ -40,7 +40,7 @@ def is_index_operation(raw):
                 OplogReplayer.is_drop_index(raw))
 
     def __init__(self, source, dest, replay_indexes=True, ts=None,
-                 poll_time=1.0):
+                 poll_time=1.0, databases=None):
         # Create a one-time connection to source, to determine replicaset.
         c = pymongo.Connection(source)
         try:
@@ -69,7 +69,7 @@ def __init__(self, source, dest, replay_indexes=True, ts=None,
         # Compute velocity every few ops.
         self._started_at = self._last_velocity_at = time.time()
         self._last_replay_count = 0
-        OplogWatcher.__init__(self, self.source, ts=ts, poll_time=poll_time)
+        OplogWatcher.__init__(self, self.source, ts=ts, poll_time=poll_time, databases=databases)
 
     def print_replication_info(self):
         # Only print replication info every few hundred replayed ops.
diff --git a/oplogreplay/oplogwatcher.py b/oplogreplay/oplogwatcher.py
index 133c644..36a09fc 100644
--- a/oplogreplay/oplogwatcher.py
+++ b/oplogreplay/oplogwatcher.py
@@ -24,13 +24,14 @@ def __get_id(op):
 
         return opid
 
-    def __init__(self, connection, ts=None, poll_time=1.0):
+    def __init__(self, connection, ts=None, poll_time=1.0, databases=None):
         if ts is not None and not isinstance(ts, Timestamp):
             raise ValueError('ts argument: expected %r, got %r' % \
                              (Timestamp, type(ts)))
         self.poll_time = poll_time
         self.connection = connection
         self.ts = ts
+        self.databases = databases
 
         # Mark as running.
         self.running = True
@@ -89,25 +90,39 @@ def process_op(self, ns, raw):
           "db" declares presence of a database
           "n" no op
         """
-        # Compute the document id of the document that will be altered
-        # (in case of insert, update or delete).
-        docid = self.__get_id(raw)
         op = raw['op']
 
-        if op == 'i':
-            self.insert(ns=ns, docid=docid, raw=raw)
-        elif op == 'u':
-            self.update(ns=ns, docid=docid, raw=raw)
-        elif op == 'd':
-            self.delete(ns=ns, docid=docid, raw=raw)
-        elif op == 'c':
-            self.command(ns=ns, raw=raw)
-        elif op == 'db':
-            self.db_declare(ns=ns, raw=raw)
-        elif op == 'n':
-            self.noop()
-        else:
-            logging.error("Unknown op: %r" % op)
+        process = True
+
+        # check the database option and possibly not process this op
+        if op != 'n' and self.databases:
+            try:
+                db, collection = ns.split('.', 1)
+            except ValueError:
+                logging.error("Unable to parse ns: %r" % ns)
+            else:
+                if db not in self.databases:
+                    process = False
+
+        if process:
+            # Compute the document id of the document that will be altered
+            # (in case of insert, update or delete).
+            docid = self.__get_id(raw)
+
+            if op == 'i':
+                self.insert(ns=ns, docid=docid, raw=raw)
+            elif op == 'u':
+                self.update(ns=ns, docid=docid, raw=raw)
+            elif op == 'd':
+                self.delete(ns=ns, docid=docid, raw=raw)
+            elif op == 'c':
+                self.command(ns=ns, raw=raw)
+            elif op == 'db':
+                self.db_declare(ns=ns, raw=raw)
+            elif op == 'n':
+                self.noop()
+            else:
+                logging.error("Unknown op: %r" % op)
 
         # Save timestamp of last processed oplog.
         self.ts = raw['ts']