[rtems-central commit] specverify: Add SpecVerify.verify_all()

Sebastian Huber sebh at rtems.org
Fri Oct 23 10:55:13 UTC 2020


Module:    rtems-central
Branch:    master
Commit:    035590161de68f0d22499f915738f242d49ce7aa
Changeset: http://git.rtems.org/rtems-central/commit/?id=035590161de68f0d22499f915738f242d49ce7aa

Author:    Sebastian Huber <sebastian.huber at embedded-brains.de>
Date:      Fri Oct 23 12:50:24 2020 +0200

specverify: Add SpecVerify.verify_all()

Introduce a VerifyStatus tuple with an aggregated verify status.

---

 rtemsspec/specverify.py            | 84 ++++++++++++++++++++++++++++++--------
 rtemsspec/tests/test_specverify.py |  7 +++-
 2 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/rtemsspec/specverify.py b/rtemsspec/specverify.py
index c200a06..cec03c6 100644
--- a/rtemsspec/specverify.py
+++ b/rtemsspec/specverify.py
@@ -24,15 +24,44 @@
 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 # POSSIBILITY OF SUCH DAMAGE.
 
+from contextlib import contextmanager
 import logging
 import re
-from typing import Any, Dict, List, NamedTuple, Set
+from typing import Any, Dict, Iterator, List, NamedTuple, Set
 
 from rtemsspec.items import Item, ItemCache
 
 _VerifierMap = Dict[str, "_Verifier"]
 
 
+class VerifyStatus(NamedTuple):
+    """ This tuple provides the verify message counts by category. """
+    critical: int
+    error: int
+    warning: int
+    info: int
+    debug: int
+
+
+class _Filter(logging.Filter):
+    def __init__(self):
+        super().__init__()
+        self._counts = {}  # type: Dict[int, int]
+
+    def filter(self, record: logging.LogRecord) -> int:
+        count = self._counts.get(record.levelno, 0)
+        self._counts[record.levelno] = count + 1
+        return 1
+
+    def get_verify_info(self) -> VerifyStatus:
+        """ Returns the gathered verify information. """
+        return VerifyStatus(self._counts.get(logging.CRITICAL, 0),
+                            self._counts.get(logging.ERROR, 0),
+                            self._counts.get(logging.WARNING, 0),
+                            self._counts.get(logging.INFO, 0),
+                            self._counts.get(logging.DEBUG, 0))
+
+
 def _type_name(value: Any):
     type_name = type(value).__name__
     if type_name == "NoneType":
@@ -449,6 +478,15 @@ def _gather_item_verifiers(item: Item, verifier_map: _VerifierMap) -> None:
             _create_verifier(link.item, verifier_map)
 
 
+ at contextmanager
+def _add_filter() -> Iterator[_Filter]:
+    logger = logging.getLogger()
+    log_filter = _Filter()
+    logger.addFilter(log_filter)
+    yield log_filter
+    logger.removeFilter(log_filter)
+
+
 class SpecVerifier:
     """ Verifies items according to the specification of the specification. """
 
@@ -465,21 +503,30 @@ class SpecVerifier:
         try:
             root_item = item_cache[root_uid]
         except KeyError:
-            logging.error("root type item does not exist in item cache")
-            return
-        self._root_verifier = _create_verifier(root_item, verifier_map)
-        _gather_item_verifiers(root_item, verifier_map)
-        for name in sorted(verifier_map):
-            logging.info("type: %s", name)
-            verifier_map[name].resolve_type_refinements()
-        logging.info("start specification item verification")
-        for key in sorted(item_cache.all):
-            item = item_cache[key]
-            self._root_verifier.verify(_Path(item, f"{item.uid}:"), item.data)
-        logging.info("finished specification item verification")
-
-
-def verify(config: dict, item_cache: ItemCache) -> None:
+            self._root_verifier = None
+        else:
+            self._root_verifier = _create_verifier(root_item, verifier_map)
+            _gather_item_verifiers(root_item, verifier_map)
+            for name in sorted(verifier_map):
+                logging.info("type: %s", name)
+                verifier_map[name].resolve_type_refinements()
+
+    def verify_all(self, item_cache: ItemCache) -> VerifyStatus:
+        """ Verifies all items of the cache. """
+        with _add_filter() as log_filter:
+            if self._root_verifier is None:
+                logging.error("root type item does not exist in item cache")
+            else:
+                logging.info("start specification item verification")
+                for key in sorted(item_cache.all):
+                    item = item_cache[key]
+                    self._root_verifier.verify(_Path(item, f"{item.uid}:"),
+                                               item.data)
+                logging.info("finished specification item verification")
+        return log_filter.get_verify_info()
+
+
+def verify(config: dict, item_cache: ItemCache) -> VerifyStatus:
     """
     Verifies specification items according to the configuration.
 
@@ -490,5 +537,6 @@ def verify(config: dict, item_cache: ItemCache) -> None:
         root_uid = config["root-type"]
     except KeyError:
         logging.error("configuration has no root type")
-        return
-    SpecVerifier(item_cache, root_uid)
+        return VerifyStatus(0, 1, 0, 0, 0)
+    verifier = SpecVerifier(item_cache, root_uid)
+    return verifier.verify_all(item_cache)
diff --git a/rtemsspec/tests/test_specverify.py b/rtemsspec/tests/test_specverify.py
index c382820..9b70aaa 100644
--- a/rtemsspec/tests/test_specverify.py
+++ b/rtemsspec/tests/test_specverify.py
@@ -38,9 +38,14 @@ def test_no_root_type(caplog, tmpdir):
     item_cache = ItemCache(item_cache_config)
     config = {}
     caplog.set_level(logging.INFO)
-    verify(config, item_cache)
+    info = verify(config, item_cache)
     assert get_and_clear_log(
         caplog) == """ERROR configuration has no root type"""
+    assert info.critical == 0
+    assert info.error == 1
+    assert info.warning == 0
+    assert info.info == 0
+    assert info.debug == 0
 
 
 def test_no_root_item(caplog, tmpdir):



More information about the vc mailing list