Logo Search packages:      
Sourcecode: sabayon version File versions  Download package


#!/usr/bin/env python

# Copyright (C) 2005 Red Hat, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

import os
import os.path
import gobject
import gamin
import util
import fnmatch


CHANGED = gamin.GAMChanged
DELETED = gamin.GAMDeleted
CREATED = gamin.GAMCreated

def dprint (fmt, *args):
    util.debug_print (util.DEBUG_DIRMONITOR, fmt % args)

def event_to_string (event):
    if event == CHANGED:
        return "changed"
    elif event == DELETED:
        return "deleted"
    elif event == CREATED:
        return "created"
        return "invalid"

class DirectoryMonitor:
    def __init__ (self, directory, callback, data = None):
        self.directory = directory
        self.callback = callback
        self.data = data
      self.mon = None
      self.watches = {}
        self.too_many_watches = False
      self.fd = -1
      self.io_watch = None
        self.dirs_to_ignore = []
        self.files_to_ignore = []

    def set_directories_to_ignore (self, dirs):
        assert self.mon == None
        self.dirs_to_ignore = dirs
        dprint ("Ignoring directories %s", self.dirs_to_ignore)

    def set_files_to_ignore (self, files):
        assert self.mon == None
        self.files_to_ignore = files
        dprint ("Ignoring files %s", self.files_to_ignore)

    # call the user level processing
    def __invoke_callback (self, path, event):
        if self.data:
            self.callback (path, event, self.data)
            self.callback (path, event)

    # Called from the main loop when we have data
    def __pending_data (self, fd, condition):
          ret = self.mon.handle_events ()
          util.print_exception ()
      return True
    # Processing of a gamin callback
    def __handle_gamin_event (self, path, event, monitor_file):
        if event == CHANGED or event == DELETED or event == CREATED:
            if not os.path.isabs (path):
            path = os.path.join (monitor_file, path)

            dprint ("Got gamin event '%s' on '%s'", event_to_string (event), path)

          if event == CREATED and os.path.isdir (path):
            self.__monitor_dir_recurse (path, True)
          elif event == DELETED:
                if path != self.directory and self.watches.has_key (path):
                    del self.watches[path]
                    if len (self.watches) < N_WATCHES_LIMIT:
                        self.too_many_watches = False
                    self.mon.stop_watch (path)

            if not self.__should_ignore_dir (path) and \
               not self.__should_ignore_file (path):
                self.__invoke_callback (path, event)

    def __should_ignore_dir (self, dir):
        dir = os.path.normpath (dir)
        for ignore_dir in self.dirs_to_ignore:
            ignore_path = os.path.normpath (os.path.join (self.directory, ignore_dir))

            if fnmatch.fnmatch (dir, ignore_path):
                dprint ("Ignoring directory '%s'", dir)
                return True

        parent = os.path.dirname (dir)
        if parent != dir:
            return self.__should_ignore_dir (parent)
            return False
    def __should_ignore_file (self, file):
        file = os.path.normpath (file)
        for ignore_file in self.files_to_ignore:
            ignore_path = os.path.normpath (os.path.join (self.directory, ignore_file))
            if fnmatch.fnmatch (file, ignore_path):
                dprint ("Ignoring file '%s'", file)
                return True
        return self.__should_ignore_dir (os.path.dirname (file))

    def __monitor_dir (self, dir):
        if len (self.watches) >= N_WATCHES_LIMIT:
            if not self.too_many_watches:
                print "Too many directories to watch on %s" % (self.directory)
                self.too_many_watches = True

        if self.__should_ignore_dir (dir):
            self.mon.watch_directory (dir, self.__handle_gamin_event, dir)
            print _("Failed to add monitor for %s") % (dir)
          util.print_exception ()

    def __monitor_dir_recurse (self, dir, new_dir = False):
        if self.too_many_watches:
        if dir != self.directory:
            self.__monitor_dir (dir)
        for entry in os.listdir (dir):
            path = os.path.join (dir, entry)
            if self.__should_ignore_dir (path) or \
               self.__should_ignore_file (path):
            if new_dir:
                self.__invoke_callback (path, CREATED)
            if os.path.isdir (path):
                self.__monitor_dir_recurse (path, new_dir)
    def start (self):
        if self.mon != None:

        dprint ("Starting to recursively monitor '%s'", self.directory)

      self.mon = gamin.WatchMonitor ()
      # ignore (End)Exists events since we scan the tree ourselves
          self.mon.no_exists ()
      self.fd = self.mon.get_fd ()
      self.io_watch = gobject.io_add_watch (self.fd,
        self.__monitor_dir (self.directory)
        self.__monitor_dir_recurse (self.directory)

    def stop (self):
        if self.mon == None:

        dprint ("Stopping recursive monitoring of '%s'", self.directory)

        for path in self.watches:
            self.mon.stop_watch (path)
        gobject.source_remove (self.io_watch)
      self.io_watch = 0
      self.mon.disconnect ()
      self.mon = None
      self.fd = -1

def run_unit_tests ():
    import tempfile
    import shutil

    temp_path = tempfile.mkdtemp (prefix = "test-monitor-")

    def handle_change (path, event, data):
        (expected, main_loop) = data
        if len (expected) > 0:
            i = 0
            for (expected_path, expected_event) in expected:
                if expected_path == path and expected_event == event:
                i += 1
            if i < len (expected):
                del expected[i]
        if len (expected) == 0:
            main_loop.quit ()

    def expect (expected, path, event):
        expected.append ((path, event))

    main_loop = gobject.MainLoop ()

    expected = []
    def should_not_be_reached (expected):
        for (path, event) in expected:
            print _("Expected event: %s %s") % (path, event_to_string (event))
        assert False
        return True
    timeout = gobject.timeout_add (5 * 1000, should_not_be_reached, expected)

    monitor = DirectoryMonitor (temp_path, handle_change, (expected, main_loop))
    monitor.set_directories_to_ignore (["bar"])
    monitor.set_files_to_ignore (["foobar/foo/foo.txt"])
    monitor.start ()

    expect (expected, os.path.join (temp_path, "foo.txt"), CREATED)
    f = file (os.path.join (temp_path, "foo.txt"), "w")
    f.close ()

    # ignored
    # expect (expected, os.path.join (temp_path, "bar"), CREATED)
    os.mkdir (os.path.join (temp_path, "bar"))

    expect (expected, os.path.join (temp_path, "foobar"), CREATED)
    expect (expected, os.path.join (temp_path, "foobar/foo"), CREATED)
    expect (expected, os.path.join (temp_path, "foobar/foo/bar"), CREATED)
    expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo"), CREATED)
    expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo/bar"), CREATED)
    os.makedirs (os.path.join (temp_path, "foobar/foo/bar/foo/bar"))

    # ignored:
    # expect (expected, os.path.join (temp_path, "foobar/foo/foo.txt"), CREATED)
    f = file (os.path.join (temp_path, "foobar/foo/foo.txt"), "w")
    f.close ()

    main_loop.run ()
    expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo/bar"), DELETED)
    expect (expected, os.path.join (temp_path, "foobar/foo/bar/foo"), DELETED)
    expect (expected, os.path.join (temp_path, "foobar/foo/bar"), DELETED)
    # ignored:
    # expect (expected, os.path.join (temp_path, "foobar/foo/foo.txt"), DELETED)
    expect (expected, os.path.join (temp_path, "foobar/foo"), DELETED)
    expect (expected, os.path.join (temp_path, "foobar"), DELETED)
    # FIXME: we should be getting this event, but we don't seem to be
    # expect (expected, os.path.join (temp_path, "foo.txt"), DELETED)
    # ignore:
    # expect (expected, os.path.join (temp_path, "bar"), DELETED)
    expect (expected, temp_path, DELETED)

    shutil.rmtree (temp_path, True)

    main_loop.run ()

    gobject.source_remove (timeout)

    monitor.stop ()

Generated by  Doxygen 1.6.0   Back to index