#!/usr/bin/env python
"""
Real-time log files watcher supporting log rotation.
Works with Python >= 2.6 and >= 3.2, on both POSIX and Windows.
Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
"""
import os
import time
import errno
import stat
import sys
class LogWatcher(object):
    """Looks for changes in all files of a directory.

    This is useful for watching log file changes in real-time.
    It also supports files rotation.

    Example:

    >>> def callback(filename, lines):
    ...     print(filename, lines)
    ...
    >>> lw = LogWatcher("/var/log/", callback)
    >>> lw.loop()
    """

    def __init__(self, folder, callback, extensions=("log",), tail_lines=0):
        """Arguments:

        (str) @folder:
            the folder to watch

        (callable) @callback:
            a function which is called every time a new line in a
            file being watched is found;
            this is called with "filename" and "lines" arguments.

        (sequence) @extensions:
            only watch files with these extensions (without the
            leading dot); a false value disables the filter so that
            every file in the folder is watched.

        (int) @tail_lines:
            read last N lines from files being watched before starting
        """
        # files_map is assigned first so that close() (also reached via
        # __del__) works even if one of the checks below fails.
        self.files_map = {}
        self.callback = callback
        self.folder = os.path.realpath(folder)
        self.extensions = extensions
        assert os.path.isdir(self.folder), self.folder
        assert callable(callback), repr(callback)
        self.update_files()
        # The first time we run the script we move all file markers at EOF.
        # In case of files created afterwards we don't do this.
        for file in list(self.files_map.values()):
            # Seek on the handle we already hold instead of asking the
            # filesystem for the size by name: the name may have been
            # rotated away in the meantime.
            file.seek(0, os.SEEK_END)
            if tail_lines:
                try:
                    lines = self.tail(file.name, tail_lines)
                except IOError as err:
                    if err.errno != errno.ENOENT:
                        raise
                else:
                    if lines:
                        self.callback(file.name, lines)

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def loop(self, interval=0.1, blocking=True):
        """Start the loop.

        Every iteration refreshes the set of watched files, then hands
        any new lines of each file to the callback, then sleeps for
        `interval` seconds.
        If blocking is False make one loop then return.
        """
        while True:
            self.update_files()
            # Iterate over a snapshot of the watched files.
            for file in list(self.files_map.values()):
                self.readfile(file)
            if not blocking:
                return
            time.sleep(interval)

    def log(self, line):
        """Log when a file is un/watched"""
        print(line)

    def listdir(self):
        """List directory and filter files by extension.

        You may want to override this to add extra logic or
        globbing support.
        """
        names = os.listdir(self.folder)
        if not self.extensions:
            return names
        return [name for name in names
                if os.path.splitext(name)[1][1:] in self.extensions]

    @classmethod
    def open(cls, file):
        """Wrapper around open().

        By default file is opened in binary mode and readlines()
        return bytes on both Python 2 and 3.
        This means callback() will deal with a list of bytes.
        Can be overridden in order to deal with unicode strings
        instead with:

          import codecs, locale
          return codecs.open(file, 'r', encoding=locale.getpreferredencoding(),
                             errors='ignore')
        """
        return open(file, 'rb')

    @classmethod
    def tail(cls, fname, window):
        """Read last N lines from file fname.

        The file is read backwards in fixed-size blocks until enough
        line separators were seen or the beginning of the file is
        reached.  Returns a list of at most `window` lines without
        their line terminators.  Raises ValueError if window <= 0;
        errors raised by open() are propagated.
        """
        if window <= 0:
            raise ValueError('invalid window %r' % window)
        BUFSIZ = 1024
        f = cls.open(fname)
        try:
            # open() was overridden and file was opened in text
            # mode; read() will return a string instead bytes.
            encoded = getattr(f, 'encoding', False)
            CR = '\n' if encoded else b'\n'
            empty = '' if encoded else b''
            f.seek(0, os.SEEK_END)
            remaining = f.tell()
            chunks = []  # blocks in reverse file order (last block first)
            newlines = 0
            while remaining > 0:
                size = min(BUFSIZ, remaining)
                remaining -= size
                f.seek(remaining)
                chunk = f.read(size)
                chunks.append(chunk)
                newlines += chunk.count(CR)
                # Strictly more than `window` separators are required:
                # with exactly `window` of them and a trailing newline the
                # oldest wanted line could start before the data read so
                # far and would be returned truncated.
                if newlines > window:
                    break
        finally:
            f.close()
        chunks.reverse()
        return empty.join(chunks).splitlines()[-window:]

    def update_files(self):
        """Synchronize files_map with the current folder content.

        Files which disappeared are un-watched, files whose name now
        refers to a different file (rotation) are reloaded and files
        not seen before start being watched.
        """
        found = []
        for name in self.listdir():
            absname = os.path.realpath(os.path.join(self.folder, name))
            try:
                st = os.stat(absname)
            except EnvironmentError as err:
                # the file vanished between listdir() and stat()
                if err.errno != errno.ENOENT:
                    raise
            else:
                if not stat.S_ISREG(st.st_mode):
                    continue
                found.append((self.get_file_id(st), absname))

        # check existent files
        for fid, file in list(self.files_map.items()):
            try:
                st = os.stat(file.name)
            except EnvironmentError as err:
                if err.errno == errno.ENOENT:
                    self.unwatch(file, fid)
                else:
                    raise
            else:
                if fid != self.get_file_id(st):
                    # same name but different file (rotation); reload it.
                    self.unwatch(file, fid)
                    self.watch(file.name)

        # add new ones
        for fid, fname in found:
            if fid not in self.files_map:
                self.watch(fname)

    def readfile(self, file):
        """Read the lines appended to file since the last read and, if
        there are any, pass them to the callback.  Returns None.
        """
        lines = file.readlines()
        if lines:
            self.callback(file.name, lines)

    def watch(self, fname):
        """Open fname and start watching it.

        A file which no longer exists is silently ignored; any other
        environment error is propagated.
        """
        try:
            file = self.open(fname)
        except EnvironmentError as err:
            if err.errno != errno.ENOENT:
                raise
            return
        try:
            fid = self.get_file_id(os.stat(fname))
        except EnvironmentError as err:
            # do not leak the handle we have just opened
            file.close()
            if err.errno != errno.ENOENT:
                raise
            return
        self.log("watching logfile %s" % fname)
        self.files_map[fid] = file

    def unwatch(self, file, fid):
        """Stop watching file, registered in files_map under fid."""
        # file no longer exists; if it has been renamed
        # try to read it for the last time in case the
        # log rotator has written something in it.
        # readfile() already delivers those lines to the callback.
        self.readfile(file)
        self.log("un-watching logfile %s" % file.name)
        del self.files_map[fid]
        file.close()

    @staticmethod
    def get_file_id(st):
        """Return a string identifying the file described by the
        os.stat() result st: device and inode numbers on POSIX,
        st_ctime elsewhere.
        """
        if os.name == 'posix':
            return "%xg%x" % (st.st_dev, st.st_ino)
        else:
            return "%f" % st.st_ctime

    def close(self):
        """Close every watched file and forget about all of them."""
        for file in self.files_map.values():
            file.close()
        self.files_map.clear()
# ===================================================================
# --- tests
# ===================================================================
if __name__ == '__main__':
    # Self-test: running the module as a script executes the unit tests
    # against temporary "*.log" files created in the current directory.
    import unittest
    import atexit

    TESTFN = '$testfile.log'
    TESTFN2 = '$testfile2.log'
    PY3 = sys.version_info[0] == 3

    # b() turns a native string into what a file opened in binary mode
    # returns on the running Python version.
    if PY3:
        def b(s):
            return s.encode("latin-1")
    else:
        def b(s):
            return s

    class TestLogWatcher(unittest.TestCase):

        def setUp(self):
            def callback(filename, lines):
                self.filename.append(filename)
                for line in lines:
                    self.lines.append(line)

            self.filename = []
            self.lines = []
            self.file = open(TESTFN, 'w')
            self.watcher = LogWatcher(os.getcwd(), callback)

        def tearDown(self):
            self.watcher.close()
            # Close our own handle before deleting the file: it would
            # otherwise leak and prevent the removal on Windows.
            self.file.close()
            self.remove_test_files()

        def write_file(self, data):
            self.file.write(data)
            self.file.flush()

        @staticmethod
        @atexit.register
        def remove_test_files():
            # best-effort cleanup, also registered to run at exit
            for x in [TESTFN, TESTFN2]:
                try:
                    os.remove(x)
                except EnvironmentError:
                    pass

        def test_no_lines(self):
            self.watcher.loop(blocking=False)

        def test_one_line(self):
            self.write_file('foo')
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])
            # the watcher reports realpath()-resolved file names
            self.assertEqual(self.filename, [os.path.realpath(TESTFN)])

        def test_two_lines(self):
            self.write_file('foo\n')
            self.write_file('bar\n')
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo\n", b"bar\n"])
            self.assertEqual(self.filename, [os.path.realpath(TESTFN)])

        def test_new_file(self):
            with open(TESTFN2, "w") as f:
                f.write("foo")
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])
            self.assertEqual(self.filename, [os.path.realpath(TESTFN2)])

        def test_file_removed(self):
            # when something is written in a file we're watching and then
            # it gets deleted we expect to be able to read what was written
            # (log rotation)
            self.write_file("foo")
            try:
                os.remove(TESTFN)
            except EnvironmentError:  # necessary on Windows
                pass
            self.watcher.loop(blocking=False)
            self.assertEqual(self.lines, [b"foo"])

        def test_tail(self):
            MAX = 10000
            content = '\n'.join([str(x) for x in range(0, MAX)])
            self.write_file(content)
            # input < BUFSIZ (1 iteration)
            lines = self.watcher.tail(self.file.name, 100)
            self.assertEqual(len(lines), 100)
            self.assertEqual(lines, [b(str(x)) for x in range(MAX-100, MAX)])
            # input > BUFSIZ (multiple iterations)
            lines = self.watcher.tail(self.file.name, 5000)
            self.assertEqual(len(lines), 5000)
            self.assertEqual(lines, [b(str(x)) for x in range(MAX-5000, MAX)])
            # input > file's total lines
            lines = self.watcher.tail(self.file.name, MAX + 9999)
            self.assertEqual(len(lines), MAX)
            self.assertEqual(lines, [b(str(x)) for x in range(0, MAX)])
            # invalid window
            self.assertRaises(ValueError, self.watcher.tail, self.file.name, 0)
            # tail() is also usable on the class itself
            LogWatcher.tail(self.file.name, 10)

        def test_ctx_manager(self):
            with self.watcher:
                pass

    # unittest.makeSuite() is deprecated and no longer available on
    # Python 3.13; TestLoader works on every supported version.
    test_suite = unittest.TestSuite()
    test_suite.addTest(
        unittest.TestLoader().loadTestsFromTestCase(TestLogWatcher))
    unittest.TextTestRunner(verbosity=2).run(test_suite)
Diff to Previous Revision
--- revision 3 2011-12-06 21:10:16
+++ revision 4 2013-03-18 17:59:09
@@ -1,7 +1,8 @@
#!/usr/bin/env python
"""
-Real time log files watcher supporting log rotation.
+Real-time log files watcher supporting log rotation.
+Works with Python >= 2.6 and >= 3.2, on both POSIX and Windows.
Author: Giampaolo Rodola' <g.rodola [AT] gmail [DOT] com>
License: MIT
@@ -11,6 +12,7 @@
import time
import errno
import stat
+import sys
class LogWatcher(object):
@@ -21,10 +23,10 @@
Example:
>>> def callback(filename, lines):
- ... print filename, lines
+ ... print(filename, lines)
...
- >>> l = LogWatcher("/var/log/", callback)
- >>> l.loop()
+ >>> lw = LogWatcher("/var/log/", callback)
+ >>> lw.loop()
"""
def __init__(self, folder, callback, extensions=["log"], tail_lines=0):
@@ -34,8 +36,8 @@
the folder to watch
(callable) @callback:
- a function which is called every time a new line in a
- file being watched is found;
+ a function which is called every time a new line in a
+ file being watched is found;
this is called with "filename" and "lines" arguments.
(list) @extensions:
@@ -48,37 +50,47 @@
self.callback = callback
self.folder = os.path.realpath(folder)
self.extensions = extensions
- assert os.path.isdir(self.folder), "%s does not exists" \
- % self.folder
- assert callable(callback)
+ assert os.path.isdir(self.folder), self.folder
+ assert callable(callback), repr(callback)
self.update_files()
# The first time we run the script we move all file markers at EOF.
# In case of files created afterwards we don't do this.
- for id, file in self.files_map.iteritems():
+ for id, file in self.files_map.items():
file.seek(os.path.getsize(file.name)) # EOF
if tail_lines:
- lines = self.tail(file.name, tail_lines)
- if lines:
- self.callback(file.name, lines)
+ try:
+ lines = self.tail(file.name, tail_lines)
+ except IOError as err:
+ if err.errno != errno.ENOENT:
+ raise
+ else:
+ if lines:
+ self.callback(file.name, lines)
def __del__(self):
self.close()
- def loop(self, interval=0.1, async=False):
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.close()
+
+ def loop(self, interval=0.1, blocking=True):
"""Start the loop.
- If async is True make one loop then return.
+ If blocking is False make one loop then return.
"""
while 1:
self.update_files()
- for fid, file in list(self.files_map.iteritems()):
+ for fid, file in list(self.files_map.items()):
self.readfile(file)
- if async:
+ if not blocking:
return
time.sleep(interval)
def log(self, line):
"""Log when a file is un/watched"""
- print line
+ print(line)
def listdir(self):
"""List directory and filter files by extension.
@@ -92,36 +104,52 @@
else:
return ls
- @staticmethod
- def tail(fname, window):
+ @classmethod
+ def open(cls, file):
+ """Wrapper around open().
+ By default file is opened in binary mode and readlines()
+ return bytes on both Python 2 and 3.
+ This means callback() will deal with a list of bytes.
+ Can be overridden in order to deal with unicode strings
+ instead with:
+
+ import codecs, locale
+ return codecs.open(file, 'r', encoding=locale.getpreferredencoding(),
+ errors='ignore')
+ """
+ return open(file, 'rb')
+
+ @classmethod
+ def tail(cls, fname, window):
"""Read last N lines from file fname."""
- try:
- f = open(fname, 'r')
- except IOError, err:
- if err.errno == errno.ENOENT:
- return []
+ if window <= 0:
+ raise ValueError('invalid window %r' % window)
+ f = cls.open(fname)
+ BUFSIZ = 1024
+ # open() was overridden and file was opened in text
+ # mode; read() will return a string instead bytes.
+ encoded = getattr(f, 'encoding', False)
+ CR = '\n' if encoded else b'\n'
+ data = '' if encoded else b''
+ f.seek(0, os.SEEK_END)
+ fsize = f.tell()
+ block = -1
+ exit = False
+ while not exit:
+ step = (block * BUFSIZ)
+ if abs(step) >= fsize:
+ f.seek(0)
+ newdata = f.read(BUFSIZ - (abs(step) - fsize))
+ exit = True
else:
- raise
- else:
- BUFSIZ = 1024
- f.seek(0, os.SEEK_END)
- fsize = f.tell()
- block = -1
- data = ""
- exit = False
- while not exit:
- step = (block * BUFSIZ)
- if abs(step) >= fsize:
- f.seek(0)
- exit = True
- else:
- f.seek(step, os.SEEK_END)
- data = f.read().strip()
- if data.count('\n') >= window:
- break
- else:
- block -= 1
- return data.splitlines()[-window:]
+ f.seek(step, os.SEEK_END)
+ newdata = f.read(BUFSIZ)
+ data = newdata + data
+ if data.count(CR) >= window:
+ break
+ else:
+ block -= 1
+ return data.splitlines()[-window:]
def update_files(self):
ls = []
@@ -129,7 +157,7 @@
absname = os.path.realpath(os.path.join(self.folder, name))
try:
st = os.stat(absname)
- except EnvironmentError, err:
+ except EnvironmentError as err:
if err.errno != errno.ENOENT:
raise
else:
@@ -139,10 +167,10 @@
ls.append((fid, absname))
# check existent files
- for fid, file in list(self.files_map.iteritems()):
+ for fid, file in list(self.files_map.items()):
try:
st = os.stat(file.name)
- except EnvironmentError, err:
+ except EnvironmentError as err:
if err.errno == errno.ENOENT:
self.unwatch(file, fid)
else:
@@ -165,9 +193,9 @@
def watch(self, fname):
try:
- file = open(fname, "r")
+ file = self.open(fname)
fid = self.get_file_id(os.stat(fname))
- except EnvironmentError, err:
+ except EnvironmentError as err:
if err.errno != errno.ENOENT:
raise
else:
@@ -186,18 +214,126 @@
@staticmethod
def get_file_id(st):
- return "%xg%x" % (st.st_dev, st.st_ino)
+ if os.name == 'posix':
+ return "%xg%x" % (st.st_dev, st.st_ino)
+ else:
+ return "%f" % st.st_ctime
def close(self):
- for id, file in self.files_map.iteritems():
+ for id, file in self.files_map.items():
file.close()
self.files_map.clear()
+# ===================================================================
+# --- tests
+# ===================================================================
+
if __name__ == '__main__':
- def callback(filename, lines):
- for line in lines:
- print line
-
- l = LogWatcher("/var/log/", callback)
- l.loop()
+ import unittest
+ import atexit
+
+ TESTFN = '$testfile.log'
+ TESTFN2 = '$testfile2.log'
+ PY3 = sys.version_info[0] == 3
+
+ if PY3:
+ def b(s):
+ return s.encode("latin-1")
+ else:
+ def b(s):
+ return s
+
+ class TestLogWatcher(unittest.TestCase):
+
+ def setUp(self):
+ def callback(filename, lines):
+ self.filename.append(filename)
+ for line in lines:
+ self.lines.append(line)
+
+ self.filename = []
+ self.lines = []
+ self.file = open(TESTFN, 'w')
+ self.watcher = LogWatcher(os.getcwd(), callback)
+
+ def tearDown(self):
+ self.watcher.close()
+ self.remove_test_files()
+
+ def write_file(self, data):
+ self.file.write(data)
+ self.file.flush()
+
+ @staticmethod
+ @atexit.register
+ def remove_test_files():
+ for x in [TESTFN, TESTFN2]:
+ try:
+ os.remove(x)
+ except EnvironmentError:
+ pass
+
+ def test_no_lines(self):
+ self.watcher.loop(blocking=False)
+
+ def test_one_line(self):
+ self.write_file('foo')
+ self.watcher.loop(blocking=False)
+ self.assertEqual(self.lines, [b"foo"])
+ self.assertEqual(self.filename, [os.path.abspath(TESTFN)])
+
+ def test_two_lines(self):
+ self.write_file('foo\n')
+ self.write_file('bar\n')
+ self.watcher.loop(blocking=False)
+ self.assertEqual(self.lines, [b"foo\n", b"bar\n"])
+ self.assertEqual(self.filename, [os.path.abspath(TESTFN)])
+
+ def test_new_file(self):
+ with open(TESTFN2, "w") as f:
+ f.write("foo")
+ self.watcher.loop(blocking=False)
+ self.assertEqual(self.lines, [b"foo"])
+ self.assertEqual(self.filename, [os.path.abspath(TESTFN2)])
+
+ def test_file_removed(self):
+ # when something is written in a file we're watching and then
+ # it gets deleted we expect to be able to read what was written
+ # (log rotation)
+ self.write_file("foo")
+ try:
+ os.remove(TESTFN)
+ except EnvironmentError: # necessary on Windows
+ pass
+ self.watcher.loop(blocking=False)
+ self.assertEqual(self.lines, [b"foo"])
+
+ def test_tail(self):
+ MAX = 10000
+ content = '\n'.join([str(x) for x in range(0, MAX)])
+ self.write_file(content)
+ # input < BUFSIZ (1 iteration)
+ lines = self.watcher.tail(self.file.name, 100)
+ self.assertEqual(len(lines), 100)
+ self.assertEqual(lines, [b(str(x)) for x in range(MAX-100, MAX)])
+ # input > BUFSIZ (multiple iterations)
+ lines = self.watcher.tail(self.file.name, 5000)
+ self.assertEqual(len(lines), 5000)
+ self.assertEqual(lines, [b(str(x)) for x in range(MAX-5000, MAX)])
+ # input > file's total lines
+ lines = self.watcher.tail(self.file.name, MAX + 9999)
+ self.assertEqual(len(lines), MAX)
+ self.assertEqual(lines, [b(str(x)) for x in range(0, MAX)])
+ #
+ self.assertRaises(ValueError, self.watcher.tail, self.file.name, 0)
+ LogWatcher.tail(self.file.name, 10)
+
+ def test_ctx_manager(self):
+ with self.watcher:
+ pass
+
+
+ test_suite = unittest.TestSuite()
+ test_suite.addTest(unittest.makeSuite(TestLogWatcher))
+ unittest.TextTestRunner(verbosity=2).run(test_suite)