Skip to main content

Streaming validations (beta)

RDFShape API features some tools for validating Kafka streams of RDF data instead of static datasets.

You may find more information and try it out using the following API endpoint .

Notice this is just a beta version exposed in RDFShape API/Clients as a demo

Motivation and tools

The processing of streaming validations is part of one of WESO's student's Master's theis, which involved the development of Comet, a library capable of validating streams of RDF data using SHaclEX under the hood.

For purposes beyond this demo's limitations, we recommend trying Comet out yourself.

Usage

As it is streams we are dealing with, the communication with RDFShape API's is not done through HTTP requests anymore, but through WebSockets .

The workflow (simplified) goes as follows:

  1. The client attempts to open a WebSockets connection with the server.
  2. If the connection attempt succeeds, the server will remain waiting for the client.
  3. The client may then send a WebSockets message in JSON (see Data Model) requesting the server to perform a certain validation on an input RDF data stream.
  4. If the client's request is correct, the server will begin the validation, sending each output back to the client in separate WebSockets messages.

Data model

Request model

For the server to start sending results to a client, it is the latter which has to first send a request to the server.

These requests are JSON-formatted WebSockets messages, telling the server how the validation should be performed, including:

  • configuration: Parent object of the JSON tree.
    • validator: Information on how the Comet's should operate.
      • schema: Schema that the RDF data will be validated against. Formatted as in the rest of API requests.
      • triggerMode: Validation trigger that the RDF data will be validated against. Formatted as in the rest of API requests.
      • haltOnInvalid (Optional): Whether if the streaming validation should stop the moment an incoming item does not validate. Default: false.
      • haltOnErrored (Optional): Whether if the streaming validation should stop the moment an error occurs during a validation, or just ignore it. Default: false.
    • extractor: Information on how Comet's Kafka extractor should operate.
      • data: Object with the properties that incoming RDF data is expected to have.
        • format: Format of the incoming RDF data (Turtle, JSONLD, etc.).
        • inference (Optional): Inference to be applied on the incoming RDF data (RDFS, OWL, etc.). Default: NONE.
    • stream: Information for Comet to consume an incoming Kafka stream.
      • server: Hostname/IP address of the Kafka server streaming RDF data.
      • port (Optional): Port from which the Kafka server is streaming data. Default: 9092.
      • topic: Topic on which the Kafka server is streaming data.
      • groupId (Optional): Group that the Kafka consumer shall identify with. Useful to resume validations where they left off. Default: string with the name of the app: (appName-appVersion).
Example client message requesting a streaming validation
{
"configuration": {
"validator": {
"haltOnInvalid": false,
"haltOnErrored": false,
"schema": {
"content": "PREFIX ex: <http://example.org/>\nPREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n\n# Filters of a valid sensor reading\nex:ValidReading {\n ex:readingDatetime xsd:dateTime ; # Has a VALID timestamp\n ex:readingTemperature xsd:decimal MININCLUSIVE 18 MAXINCLUSIVE 20 + ; # 1+ readings in range 18-20\n ex:status [ \"OK\" \"RUNNING\" ] # Status must be one of \n}",
"engine": "ShEx",
"format": "ShExC",
"source": "byText"
},
"triggerMode": {
"shape-map": {
"content": "ex:reading@ex:ValidReading",
"format": "Compact",
"source": "byText"
},
"type": "ShapeMap"
}
},
"extractor": {
"data": {
"format": "Turtle",
"inference": "None"
},
},
"stream": {
"server": "localhost",
"port": 9092,
"topic": "rdf",
"groupId": "myGroup"
}
}
}

Response model

Though subject to change, results emitted from RDFShape API to the client have the following structure:

  • type: Metadata telling the client the type of content this message has. The possible values are:
    • result: the message contains a JSON-formatted validation result.
    • error: the message contains a JSON-formatted error
  • content: Contents of the message itself
    • For results, these contents will be the validation results.
    • For errors, these contents will be an error description.
Example server response for a validation result
{
"type": "result",
"content": {
"valid": true,
"status": "valid",
"message": "Data validation was successful",
"instant": "2022-05-05T15:00:57.925050695Z",
"report": {
"valid": true,
"type": "Result",
"message": "Validated",
"shapeMap": [ ... ],
"errors": [],
"nodesPrefixMap": { ... },
"shapesPrefixMap": { ... }
}
}
}

WebSockets stream closure

If the WebSockets client does not disconnect, the streaming validation will keep running unless:

  • An invalid/erroring validation takes places and the validator was configured to stop on these cases.
  • The validator does not receive any data to validate for a certain time period: the WebSockets connection is closed to save resources.

In the event of closure, two WebSocket frames are sent to the client:

  1. A standard WebSocket frame containing JSON-formatted text explaining the error that prompted the connection to close, including:
    • type: will be error.
    • content:
      • message: Simplified error message.
      • reason: Detailed cause of the error, only available when the error cause is not-validating RDF data, in which case the validation report will be included here.
  2. A closing WebSocket frame, with a short description of the closure reason and its corresponding close code:
    • 3000: The client's request did not contain valid JSON data.
    • 3001: The client's request did not contain a valid configuration.
    • 3002: A validation item was invalid.
    • 3003: An error occurred while validating an item.
    • 3004: No items were received for a while.
    • 3005: The configuration contained invalid values.
    • 3006: An invalid value was provided to the server.
    • 3007: An error occurred connecting to the Kafka stream.
    • 4999: Connection closed due to an unknown error.
Example last server response before closure
{
"type": "error",
"content": {
"message": "StreamInvalidItemException - Stream halted because an item was invalid",
"reason": {
"valid": false,
"type": "Result",
"message": "Validated with errors",
"shapeMap": [ ... ],
"errors": [ ... ],
"nodesPrefixMap": { ... },
"shapesPrefixMap": { ... }
}
}
}