Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python

"""
Real-time log files watcher supporting log rotation.
Works with Python >= 2.6 and >= 3.2, on both POSIX and Windows.

Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
"""

import os
import time
import errno
import stat
import sys


class LogWatcher(object):
    """Looks for changes in all files of a directory.
    This is useful for watching log file changes in real-time.
    It also supports files rotation.

    Example:

    >>> def callback(filename, lines):
    ...     print(filename, lines)
    ...
    >>> lw = LogWatcher("/var/log/", callback)
    >>> lw.loop()
    """

    def __init__(self, folder, callback, extensions=("log",), tail_lines=0):
        """Arguments:

        (str) @folder:
            the folder to watch

        (callable) @callback:
            a function which is called every time a new line in a
            file being watched is found;
            this is called with "filename" and "lines" arguments.

        (list) @extensions:
            only watch files with these extensions (given without the
            leading dot); if empty or None every file is watched

        (int) @tail_lines:
            read last N lines from files being watched before starting
        """
        self.files_map = {}
        self.callback = callback
        self.folder = os.path.realpath(folder)
        # Copy into a per-instance list: the old default was a shared
        # mutable list, so mutating self.extensions on one watcher used
        # to change the default for every watcher created afterwards.
        self.extensions = list(extensions) if extensions else extensions
        assert os.path.isdir(self.folder), self.folder
        assert callable(callback), repr(callback)
        self.update_files()
        # The first time we run the script we move all file markers at EOF.
        # In case of files created afterwards we don't do this.
        for file in self.files_map.values():
            # Seek relative to the end of the open handle instead of
            # stat()-ing the name again: the name may have been removed
            # or rotated to another file in the meantime.
            file.seek(0, os.SEEK_END)
            if tail_lines:
                try:
                    lines = self.tail(file.name, tail_lines)
                except IOError as err:
                    if err.errno != errno.ENOENT:
                        raise
                else:
                    if lines:
                        self.callback(file.name, lines)

    def __del__(self):
        # Best effort release of file handles if close() was not called.
        # files_map is missing when __init__ never ran.
        if hasattr(self, 'files_map'):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def loop(self, interval=0.1, blocking=True):
        """Start the loop.
        Every `interval` seconds the folder is re-scanned and new lines
        are delivered to the callback.
        If blocking is False make one loop then return.
        """
        while True:
            self.update_files()
            for file in list(self.files_map.values()):
                self.readfile(file)
            if not blocking:
                return
            time.sleep(interval)

    def log(self, line):
        """Log when a file is un/watched"""
        print(line)

    def listdir(self):
        """List directory and filter files by extension.
        You may want to override this to add extra logic or
        globbing support.
        """
        ls = os.listdir(self.folder)
        if self.extensions:
            return [x for x in ls
                    if os.path.splitext(x)[1][1:] in self.extensions]
        else:
            return ls

    @classmethod
    def open(cls, file):
        """Wrapper around open().
        By default file is opened in binary mode and readlines()
        return bytes on both Python 2 and 3.
        This means callback() will deal with a list of bytes.
        Can be overridden in order to deal with unicode strings
        instead with:

        import codecs, locale
        return codecs.open(file, 'r', encoding=locale.getpreferredencoding(),
                           errors='ignore')
        """
        return open(file, 'rb')

    @classmethod
    def tail(cls, fname, window):
        """Read last N lines from file fname.

        The file is read backwards in BUFSIZ-sized chunks, stopping as
        soon as enough line terminators have been seen or the start of
        the file is reached, so big files are not read in full.

        Returns a list of at most `window` lines without their line
        terminators. Raises ValueError if `window` is not positive.

        NOTE: relies on end-relative seeks, which Python 3 text-mode
        files returned by the built-in open() do not support.
        """
        if window <= 0:
            raise ValueError('invalid window %r' % window)
        f = cls.open(fname)
        try:
            BUFSIZ = 1024
            # open() was overridden and file was opened in text
            # mode; read() will return a string instead of bytes.
            encoded = getattr(f, 'encoding', False)
            CR = '\n' if encoded else b'\n'
            empty = '' if encoded else b''
            f.seek(0, os.SEEK_END)
            fsize = f.tell()
            chunks = []  # collected from the end of the file backwards
            newlines = 0
            block = -1
            while True:
                step = block * BUFSIZ
                if abs(step) >= fsize:
                    # Start of file reached: read whatever lies in front
                    # of the chunks collected so far.
                    f.seek(0)
                    chunks.append(f.read(BUFSIZ - (abs(step) - fsize)))
                    break
                f.seek(step, os.SEEK_END)
                chunk = f.read(BUFSIZ)
                chunks.append(chunk)
                newlines += chunk.count(CR)
                # The oldest chunk may begin mid-line, and when the file
                # ends with a newline N terminators delimit only N - 1
                # complete lines; requiring strictly more than `window`
                # guarantees the last `window` lines are all complete.
                if newlines > window:
                    break
                block -= 1
            chunks.reverse()
            return empty.join(chunks).splitlines()[-window:]
        finally:
            f.close()

    def update_files(self):
        """Synchronize files_map with the files currently in the folder.

        Regular files returned by listdir() that are not watched yet are
        opened; watched files that disappeared are un-watched, and those
        whose name now refers to a different file (rotation) are
        re-opened.
        """
        ls = []
        for name in self.listdir():
            absname = os.path.realpath(os.path.join(self.folder, name))
            try:
                st = os.stat(absname)
            except EnvironmentError as err:
                if err.errno != errno.ENOENT:
                    raise
            else:
                if not stat.S_ISREG(st.st_mode):
                    continue
                fid = self.get_file_id(st)
                ls.append((fid, absname))

        # check existent files
        for fid, file in list(self.files_map.items()):
            try:
                st = os.stat(file.name)
            except EnvironmentError as err:
                if err.errno == errno.ENOENT:
                    self.unwatch(file, fid)
                else:
                    raise
            else:
                if fid != self.get_file_id(st):
                    # same name but different file (rotation); reload it.
                    self.unwatch(file, fid)
                    self.watch(file.name)

        # add new ones
        for fid, fname in ls:
            if fid not in self.files_map:
                self.watch(fname)

    def readfile(self, file):
        """Read all new lines from `file` and pass them to the callback."""
        lines = file.readlines()
        if lines:
            self.callback(file.name, lines)

    def watch(self, fname):
        """Open `fname` and start watching it.

        Files which vanish before they can be opened or stat()-ed are
        silently skipped; other errors propagate.
        """
        try:
            file = self.open(fname)
        except EnvironmentError as err:
            if err.errno != errno.ENOENT:
                raise
            return
        try:
            fid = self.get_file_id(os.stat(fname))
        except EnvironmentError as err:
            # Don't leak the handle we just opened.
            file.close()
            if err.errno != errno.ENOENT:
                raise
            return
        self.log("watching logfile %s" % fname)
        self.files_map[fid] = file

    def unwatch(self, file, fid):
        """Stop watching `file` (stored under `fid`) and close it.

        The file may have been removed or renamed by a log rotator;
        lines written since the last read are delivered to the callback
        one last time before the handle is closed.
        """
        # readfile() itself invokes the callback with any new lines.
        self.readfile(file)
        self.log("un-watching logfile %s" % file.name)
        del self.files_map[fid]
        file.close()

    @staticmethod
    def get_file_id(st):
        """Return a string key identifying the file described by `st`.

        On POSIX it combines device and inode number (unchanged by a
        rename); elsewhere the st_ctime timestamp is used.
        """
        if os.name == 'posix':
            return "%xg%x" % (st.st_dev, st.st_ino)
        else:
            return "%f" % st.st_ctime

    def close(self):
        """Close every watched file and forget about it."""
        for file in self.files_map.values():
            file.close()
        self.files_map.clear()


# ===================================================================
# --- tests
# ===================================================================

if __name__ == '__main__':
    import unittest
    import atexit

    TESTFN = '$testfile.log'
    TESTFN2 = '$testfile2.log'
    PY3 = sys.version_info[0] == 3

    if PY3:
        def b(s):
            return s.encode("latin-1")
    else:
        def b(s):
            return s

    class TestLogWatcher(unittest.TestCase):

        def setUp(self):
            def callback(filename, lines):
                self.filename.append(filename)
                self.lines.extend(lines)

            self.filename = []
            self.lines = []
            self.file = open(TESTFN, 'w')
            self.watcher = LogWatcher(os.getcwd(), callback)

        def tearDown(self):
            self.watcher.close()
            # Close our own writer handle as well: leaking it triggers a
            # ResourceWarning and on Windows prevents removing the file.
            self.file.close()
            self.remove_test_files()

        def write_file(self, data):
            self.file.write(data)
            self.file.flush()

        @staticmethod
        @atexit.register
        def remove_test_files():
            for x in [TESTFN, TESTFN2]:
                try:
                    os.remove(x)
                except EnvironmentError:
                    pass

        def test_no_lines(self):
            self.watcher.loop(blocking=False)

        def test_one_line(self):
            self.write_file('foo')
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])
            self.assertEqual(self.filename, [os.path.abspath(TESTFN)])

        def test_two_lines(self):
            self.write_file('foo\n')
            self.write_file('bar\n')
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo\n", b"bar\n"])
            self.assertEqual(self.filename, [os.path.abspath(TESTFN)])

        def test_new_file(self):
            with open(TESTFN2, "w") as f:
                f.write("foo")
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])
            self.assertEqual(self.filename, [os.path.abspath(TESTFN2)])

        def test_file_removed(self):
            # when something is written in a file we're watching and then
            # it gets deleted we expect to be able to read what was written
            # (log rotation)
            self.write_file("foo")
            try:
                os.remove(TESTFN)
            except EnvironmentError:  # necessary on Windows
                pass
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])

        def test_tail(self):
            MAX = 10000
            content = '\n'.join([str(x) for x in range(0, MAX)])
            self.write_file(content)
            # input < BUFSIZ (1 iteration)
            lines = self.watcher.tail(self.file.name, 100)
            self.assertEqual(len(lines), 100)
            self.assertEqual(lines, [b(str(x)) for x in range(MAX-100, MAX)])
            # input > BUFSIZ (multiple iterations)
            lines = self.watcher.tail(self.file.name, 5000)
            self.assertEqual(len(lines), 5000)
            self.assertEqual(lines, [b(str(x)) for x in range(MAX-5000, MAX)])
            # input > file's total lines
            lines = self.watcher.tail(self.file.name, MAX + 9999)
            self.assertEqual(len(lines), MAX)
            self.assertEqual(lines, [b(str(x)) for x in range(0, MAX)])
            #
            self.assertRaises(ValueError, self.watcher.tail, self.file.name, 0)
            LogWatcher.tail(self.file.name, 10)

        def test_ctx_manager(self):
            with self.watcher:
                pass

    # unittest.makeSuite() was deprecated in Python 3.11 and removed in
    # 3.13; TestLoader.loadTestsFromTestCase() works on all versions.
    test_suite = unittest.TestLoader().loadTestsFromTestCase(TestLogWatcher)
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    # Propagate failures to the process exit status (atexit still runs).
    sys.exit(not result.wasSuccessful())

Diff to Previous Revision

--- revision 3 2011-12-06 21:10:16
+++ revision 4 2013-03-18 17:59:09
@@ -1,7 +1,8 @@
 #!/usr/bin/env python
 
 """
-Real time log files watcher supporting log rotation.
+Real-time log files watcher supporting log rotation.
+Works with Python >= 2.6 and >= 3.2, on both POSIX and Windows.
 
 Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
 License: MIT
@@ -11,6 +12,7 @@
 import time
 import errno
 import stat
+import sys
 
 
 class LogWatcher(object):
@@ -21,10 +23,10 @@
     Example:
 
     >>> def callback(filename, lines):
-    ...     print filename, lines
+    ...     print(filename, lines)
     ...
-    >>> l = LogWatcher("/var/log/", callback)
-    >>> l.loop()
+    >>> lw = LogWatcher("/var/log/", callback)
+    >>> lw.loop()
     """
 
     def __init__(self, folder, callback, extensions=["log"], tail_lines=0):
@@ -34,8 +36,8 @@
             the folder to watch
 
         (callable) @callback:
-            a function which is called every time a new line in a 
-            file being watched is found; 
+            a function which is called every time a new line in a
+            file being watched is found;
             this is called with "filename" and "lines" arguments.
 
         (list) @extensions:
@@ -48,37 +50,47 @@
         self.callback = callback
         self.folder = os.path.realpath(folder)
         self.extensions = extensions
-        assert os.path.isdir(self.folder), "%s does not exists" \
-                                            % self.folder
-        assert callable(callback)
+        assert os.path.isdir(self.folder), self.folder
+        assert callable(callback), repr(callback)
         self.update_files()
         # The first time we run the script we move all file markers at EOF.
         # In case of files created afterwards we don't do this.
-        for id, file in self.files_map.iteritems():
+        for id, file in self.files_map.items():
             file.seek(os.path.getsize(file.name))  # EOF
             if tail_lines:
-                lines = self.tail(file.name, tail_lines)
-                if lines:
-                    self.callback(file.name, lines)
+                try:
+                    lines = self.tail(file.name, tail_lines)
+                except IOError as err:
+                    if err.errno != errno.ENOENT:
+                        raise
+                else:
+                    if lines:
+                        self.callback(file.name, lines)
 
     def __del__(self):
         self.close()
 
-    def loop(self, interval=0.1, async=False):
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def loop(self, interval=0.1, blocking=True):
         """Start the loop.
-        If async is True make one loop then return.
+        If blocking is False make one loop then return.
         """
         while 1:
             self.update_files()
-            for fid, file in list(self.files_map.iteritems()):
+            for fid, file in list(self.files_map.items()):
                 self.readfile(file)
-            if async:
+            if not blocking:
                 return
             time.sleep(interval)
 
     def log(self, line):
         """Log when a file is un/watched"""
-        print line
+        print(line)
 
     def listdir(self):
         """List directory and filter files by extension.
@@ -92,36 +104,52 @@
         else:
             return ls
 
-    @staticmethod
-    def tail(fname, window):
+    @classmethod
+    def open(cls, file):
+        """Wrapper around open().
+        By default file is opened in binary mode and readlines()
+        return bytes on both Python 2 and 3.
+        This means callback() will deal with a list of bytes.
+        Can be overridden in order to deal with unicode strings
+        instead with:
+
+        import codecs, locale
+        return codecs.open(file, 'r', encoding=locale.getpreferredencoding(),
+                           errors='ignore')
+        """
+        return open(file, 'rb')
+
+    @classmethod
+    def tail(cls, fname, window):
         """Read last N lines from file fname."""
-        try:
-            f = open(fname, 'r')
-        except IOError, err:
-            if err.errno == errno.ENOENT:
-                return []
+        if window <= 0:
+            raise ValueError('invalid window %r' % window)
+        f = cls.open(fname)
+        BUFSIZ = 1024
+        # open() was overridden and file was opened in text
+        # mode; read() will return a string instead bytes.
+        encoded = getattr(f, 'encoding', False)
+        CR = '\n' if encoded else b'\n'
+        data = '' if encoded else b''
+        f.seek(0, os.SEEK_END)
+        fsize = f.tell()
+        block = -1
+        exit = False
+        while not exit:
+            step = (block * BUFSIZ)
+            if abs(step) >= fsize:
+                f.seek(0)
+                newdata = f.read(BUFSIZ - (abs(step) - fsize))
+                exit = True
             else:
-                raise
-        else:
-            BUFSIZ = 1024
-            f.seek(0, os.SEEK_END)
-            fsize = f.tell()
-            block = -1
-            data = ""
-            exit = False
-            while not exit:
-                step = (block * BUFSIZ)
-                if abs(step) >= fsize:
-                    f.seek(0)
-                    exit = True
-                else:
-                    f.seek(step, os.SEEK_END)
-                data = f.read().strip()
-                if data.count('\n') >= window:
-                    break
-                else:
-                    block -= 1
-            return data.splitlines()[-window:]
+                f.seek(step, os.SEEK_END)
+                newdata = f.read(BUFSIZ)
+            data = newdata + data
+            if data.count(CR) >= window:
+                break
+            else:
+                block -= 1
+        return data.splitlines()[-window:]
 
     def update_files(self):
         ls = []
@@ -129,7 +157,7 @@
             absname = os.path.realpath(os.path.join(self.folder, name))
             try:
                 st = os.stat(absname)
-            except EnvironmentError, err:
+            except EnvironmentError as err:
                 if err.errno != errno.ENOENT:
                     raise
             else:
@@ -139,10 +167,10 @@
                 ls.append((fid, absname))
 
         # check existent files
-        for fid, file in list(self.files_map.iteritems()):
+        for fid, file in list(self.files_map.items()):
             try:
                 st = os.stat(file.name)
-            except EnvironmentError, err:
+            except EnvironmentError as err:
                 if err.errno == errno.ENOENT:
                     self.unwatch(file, fid)
                 else:
@@ -165,9 +193,9 @@
 
     def watch(self, fname):
         try:
-            file = open(fname, "r")
+            file = self.open(fname)
             fid = self.get_file_id(os.stat(fname))
-        except EnvironmentError, err:
+        except EnvironmentError as err:
             if err.errno != errno.ENOENT:
                 raise
         else:
@@ -186,18 +214,126 @@
 
     @staticmethod
     def get_file_id(st):
-        return "%xg%x" % (st.st_dev, st.st_ino)
+        if os.name == 'posix':
+            return "%xg%x" % (st.st_dev, st.st_ino)
+        else:
+            return "%f" % st.st_ctime
 
     def close(self):
-        for id, file in self.files_map.iteritems():
+        for id, file in self.files_map.items():
             file.close()
         self.files_map.clear()
 
 
+# ===================================================================
+# --- tests
+# ===================================================================
+
 if __name__ == '__main__':
-    def callback(filename, lines):
-        for line in lines:
-            print line
-
-    l = LogWatcher("/var/log/", callback)
-    l.loop()
+    import unittest
+    import atexit
+
+    TESTFN = '$testfile.log'
+    TESTFN2 = '$testfile2.log'
+    PY3 = sys.version_info[0] == 3
+
+    if PY3:
+        def b(s):
+            return s.encode("latin-1")
+    else:
+        def b(s):
+            return s
+
+    class TestLogWatcher(unittest.TestCase):
+
+        def setUp(self):
+            def callback(filename, lines):
+                self.filename.append(filename)
+                for line in lines:
+                    self.lines.append(line)
+
+            self.filename = []
+            self.lines = []
+            self.file = open(TESTFN, 'w')
+            self.watcher = LogWatcher(os.getcwd(), callback)
+
+        def tearDown(self):
+            self.watcher.close()
+            self.remove_test_files()
+
+        def write_file(self, data):
+            self.file.write(data)
+            self.file.flush()
+
+        @staticmethod
+        @atexit.register
+        def remove_test_files():
+            for x in [TESTFN, TESTFN2]:
+                try:
+                    os.remove(x)
+                except EnvironmentError:
+                    pass
+
+        def test_no_lines(self):
+            self.watcher.loop(blocking=False)
+
+        def test_one_line(self):
+            self.write_file('foo')
+            self.watcher.loop(blocking=False)
+            self.assertEqual(self.lines, [b"foo"])
+            self.assertEqual(self.filename, [os.path.abspath(TESTFN)])
+
+        def test_two_lines(self):
+            self.write_file('foo\n')
+            self.write_file('bar\n')
+            self.watcher.loop(blocking=False)
+            self.assertEqual(self.lines, [b"foo\n", b"bar\n"])
+            self.assertEqual(self.filename, [os.path.abspath(TESTFN)])
+
+        def test_new_file(self):
+            with open(TESTFN2, "w") as f:
+                f.write("foo")
+            self.watcher.loop(blocking=False)
+            self.assertEqual(self.lines, [b"foo"])
+            self.assertEqual(self.filename, [os.path.abspath(TESTFN2)])
+
+        def test_file_removed(self):
+            # when something is written in a file we're watching and then
+            # it gets deleted we expect to be able to read what was written
+            # (log rotation)
+            self.write_file("foo")
+            try:
+                os.remove(TESTFN)
+            except EnvironmentError:  # necessary on Windows
+                pass
+            self.watcher.loop(blocking=False)
+            self.assertEqual(self.lines, [b"foo"])
+
+        def test_tail(self):
+            MAX = 10000
+            content = '\n'.join([str(x) for x in range(0, MAX)])
+            self.write_file(content)
+            # input < BUFSIZ (1 iteration)
+            lines = self.watcher.tail(self.file.name, 100)
+            self.assertEqual(len(lines), 100)
+            self.assertEqual(lines, [b(str(x)) for x in range(MAX-100, MAX)])
+            # input > BUFSIZ (multiple iterations)
+            lines = self.watcher.tail(self.file.name, 5000)
+            self.assertEqual(len(lines), 5000)
+            self.assertEqual(lines, [b(str(x)) for x in range(MAX-5000, MAX)])
+            # input > file's total lines
+            lines = self.watcher.tail(self.file.name, MAX + 9999)
+            self.assertEqual(len(lines), MAX)
+            self.assertEqual(lines, [b(str(x)) for x in range(0, MAX)])
+            #
+            self.assertRaises(ValueError, self.watcher.tail, self.file.name, 0)
+            LogWatcher.tail(self.file.name, 10)
+
+        def test_ctx_manager(self):
+            with self.watcher:
+                pass
+
+
+    test_suite = unittest.TestSuite()
+    test_suite.addTest(unittest.makeSuite(TestLogWatcher))
+    unittest.TextTestRunner(verbosity=2).run(test_suite)

History