RichRelevance

Streaming Ingest Service

Introduction 

The streaming ingest service streams new, updated, and deleted items to the streaming catalog's engine, where they are consumed by Rich Relevance applications.

Items sent through the streaming ingest are validated for proper JSONL syntax and item format and then sent to the streaming engine. The streaming engine validates and transforms the data and compares it to the streaming-view store. Only the "deltas" are sent to the streaming engine's "out" Kafka topic and to the streaming-view store. Algonomy applications, such as Find and the Legacy Catalog, consume the items from the "out" Kafka topic.

 

Two consumers read the items streamed to the engine's "out" topic:

1. Find

The engine "out" data is replicated to the front-end data centers so that Find can index the streaming information in near real time. "Complete" snapshots are indexed in a "cutover Find index", while "active" snapshots are indexed in the production Find index. The Find API only references the production Find index. Find does not consume items from a "creating" snapshot.

2.  Native Catalog Adapter (Recommend and Discover)

The Native Catalog Adapter consumes items from "complete" and "active" snapshots only and sends them to the Postgres Catalog. Before Recommend or Discover can use these updates, jobs must still be run at a regular interval. The Legacy Catalog Adaptor does not consume items from a "creating" snapshot.

 

Streaming view store

The streaming-engine sends items from "creating", "complete" and "active" snapshots to the streaming-view store. The streaming view store allows customers to view their data and see that it is correct. 
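The routing rules described above can be summarized in a few lines of Python. This is only a reading aid built from the statements in this section, not code from the platform; the function name `destinations` and the returned labels are illustrative.

```python
def destinations(snapshot_state):
    """List where an item ends up, based on the state of its snapshot."""
    if snapshot_state not in ("creating", "complete", "active"):
        raise ValueError(f"unknown snapshot state: {snapshot_state!r}")
    targets = []
    # Find indexes "complete" snapshots in the cutover index and
    # "active" snapshots in the production index; "creating" is skipped.
    if snapshot_state == "complete":
        targets.append("cutover Find index")
    elif snapshot_state == "active":
        targets.append("production Find index")
    # The catalog adapter forwards "complete" and "active" items only.
    if snapshot_state in ("complete", "active"):
        targets.append("Postgres Catalog")
    # The streaming-view store receives items from every state.
    targets.append("streaming-view store")
    return targets


for state in ("creating", "complete", "active"):
    print(state, "->", destinations(state))
```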

Streaming Ingest Limits:

  • Each payload should be compressed using either gzip or zstd (Zstandard).
  • Each payload is limited to 5 MB compressed.
  • An individual item cannot be larger than 1 million bytes.
  • Internally, each payload is broken up into "batches". Each batch is between 1 and 1.5 MB. The number of "batches" is shown on the status service.
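A client can check these limits before sending anything. The sketch below is one possible pre-flight check in Python using only the standard library. The helper name `build_payload` is hypothetical, "5 MB" is read conservatively as 5,000,000 bytes, and the per-item size is measured on the serialized JSONL line; both readings are assumptions, since this document does not define them more precisely.

```python
import gzip
import json

MAX_PAYLOAD_BYTES = 5_000_000  # assumption: "5 MB" taken as 5,000,000 bytes
MAX_ITEM_BYTES = 1_000_000     # "1 million bytes" per item


def build_payload(items):
    """Serialize {itemId: item} pairs to JSONL, gzip the result, check both limits."""
    lines = []
    for item_id, item in items.items():
        line = json.dumps({item_id: item}, ensure_ascii=False)
        size = len(line.encode("utf-8"))
        # Assumption: the item limit applies to the serialized line.
        if size > MAX_ITEM_BYTES:
            raise ValueError(f"item {item_id!r} is {size} bytes, over the item limit")
        lines.append(line)
    raw = ("\n".join(lines) + "\n").encode("utf-8")
    compressed = gzip.compress(raw)
    if len(compressed) > MAX_PAYLOAD_BYTES:
        raise ValueError("compressed payload is over the limit; split it up")
    return compressed


payload = build_payload({"prod1": {"properties": {"name": "Shirt"}}})
print(len(payload), "compressed bytes")
```

The sketch does not model the internal "batches"; those are created by the service.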

Streaming Ingest Order

Find requires that categories and regions are ingested before products.  

Pre-requisites for the Streaming Ingest Service

Before using the Streaming-ingest service to stream catalog items, the following must be completed:

  1. Create and Publish Property Definition Collections:
    1. Create product and category property definition collections
      • Both are required for the "product" snapshot. 
      • Find requirement: Search Attributes for properties
        • The product property definition collection should also have the search attributes (storable, searchable, etc.) set up. The streaming catalog does not use the portal's search attribute configuration.
    2.   Create a region property definition collection
      • Required for the place snapshot.
      • NOTE: Place snapshot is required even if there are no regions.
    3. If the property collection's state is "creating", the definitions can be updated. 
    4. Publish all the property definition collections.  
      • Once published, property definitions cannot be modified but new property definitions can be added.
      • Note: If a change is required, a new property definition collection can be created or an existing one can be cloned.
  2. Create a Product Snapshot - "creating" state
    • Requires both a "published" product and category property definition collection.
  3. Create a Place Snapshot (regions)
    • Requires a "published" region property definition collection.
    • Update the Place Snapshot to "active"
    • Place snapshot is required even if there are no regions. 

Ready to Ingest Catalog Items 

A catalog is composed of items, each of which has its own unique identity. Each item id must be unique for the ItemType within a site (apiKey).

The item types currently supported are "product", "category", "region" and "referenceContent". An item has a set of properties, which are defined in the respective property definition collections and associated with the snapshot.

As defined in the property definition collection, some properties may be overridden within an item. For example, it is common to have an override for a region where the price differs, or to support descriptions in multiple languages.

There are three snapshot types:

  • Product - requires the product and category property definition collections
  • Place - requires the region property definition collection
  • ReferenceContent - requires the referenceContent property definition collection.
  1. It is possible to ingest items to a snapshot when it is in any state (creating, complete or active). However, the applications' actions (Find, Recommend) depend on the state of the snapshot.  
  2. A snapshot for “product” is created and associated to product and category property definition collections. These collections must be in the "published" state. The snapshot’s state is “creating”. 
    • FIND: Does not accept items for a snapshot in the "creating" state.
      • For a "complete" snapshot, the backup Find index called “Cutover Snapshot Find Index” will be used and not the current production Find index.  
      • For an "active" snapshot, the "Cutover Snapshot Find Index" will replace the "Find Production Index".  
    • Recommend: Legacy Catalog Adaptor will not accept any items from a "creating" snapshot. Requires either a "complete" or "active" snapshot. 
  3. The "creating" snapshot is used for testing.
    • Once testing is complete, it is necessary to create a new snapshot and "complete" it before ingesting data. Data ingested into a "creating" snapshot will never be ingested into Find or the Legacy Catalog.
  •  Ingest items using the streaming-ingest service
    • Note: The maximum size for each payload is 5 MB. It is recommended to compress each payload using gzip or zstd
  • Check on status of the ingestion using the streaming-status service.
  • View items that were transformed and validated in the streaming-view store.  

Streaming-Ingest Services

The streaming-ingest service requires a JSONL payload that defines all the items. Each line in the JSONL payload is one item. Details on creating this payload are discussed later in this document.

Hosts:

Staging

https://staging-gateway.richrelevance.com/

QA

https://qa-gateway.richrelevance.com/

Production

https://gateway.richrelevance.com/

 

BaseURL: https://<host>/streaming-ingest/v1/

Full URL: https://<host>/streaming-ingest/v1/<apiKey>/<itemType>?snapshotId=<snapshotId>
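As a small illustration, the URL pattern above can be assembled as follows. The `HOSTS` table and the `ingest_url` helper are hypothetical names used only for this sketch; the host names and path layout are the ones listed above, and the optional `item_id` segment corresponds to the single-item DELETE form described later.

```python
from urllib.parse import quote, urlencode

# Gateway hosts as listed in this section.
HOSTS = {
    "staging": "staging-gateway.richrelevance.com",
    "qa": "qa-gateway.richrelevance.com",
    "production": "gateway.richrelevance.com",
}


def ingest_url(env, api_key, item_type, snapshot_id, item_id=None):
    """Build https://<host>/streaming-ingest/v1/<apiKey>/<itemType>?snapshotId=<snapshotId>."""
    path = f"/streaming-ingest/v1/{quote(api_key, safe='')}/{quote(item_type, safe='')}"
    if item_id is not None:
        # Extra path segment used when addressing one specific item.
        path += f"/{quote(item_id, safe='')}"
    return f"https://{HOSTS[env]}{path}?{urlencode({'snapshotId': snapshot_id})}"


print(ingest_url("production", "1234", "category", 197))
```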

 

 There are three ways to ingest items for a catalog. An itemType refers to product, region, category or referenceContent.

  1. One itemType only in the JSONL payload
  2. Mixed itemTypes in one JSONL payload
  3. One item only

One itemType JSONL

Action Request Syntax

Ingest new items. PUT has an action of Replace.

  • Expects a full item.
  • Accepts the full item on the Catalog Ingest API
  • Computes the deltas and only sends the deltas to engine.out.
  • When updating an existing item, properties will be deleted if not on the incoming full item.

PUT {baseURL}/<apiKey>/<itemType>?snapshotId=<snapshotId>

Request Body:

{"itemId1" : {...}}\n
{"itemId2" : {...}}\n
{"itemId3" : {...}}\n

Update items. PATCH has an action of Update.

  • Accepts a partial item on the Catalog Ingest API
  • Computes the deltas and only sends the deltas to the engine.out.

 

PATCH {baseURL}/<apiKey>/<itemType>?snapshotId=<snapshotId>

Request Body:

{"itemId1" : {...}}\n
{"itemId2" : {...}}\n
{"itemId3" : {...}}\n
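The difference between Replace (PUT) and Update (PATCH) can be modeled locally. The sketch below is not the service's implementation: it uses a plain dictionary as a stand-in for the stored catalog, and it assumes that PATCH merges key by key inside each top-level section such as "properties". The function names `apply_put` and `apply_patch` and the merge granularity are assumptions made for illustration.

```python
import copy


def apply_put(store, item_id, full_item):
    """Replace: the incoming full item becomes the stored item.

    Anything that is not on the incoming item is dropped.
    """
    store[item_id] = copy.deepcopy(full_item)


def apply_patch(store, item_id, partial_item):
    """Update: merge the partial item into the stored item.

    Assumption: keys are merged one level below each section.
    """
    item = store.setdefault(item_id, {})
    for section, values in partial_item.items():
        item.setdefault(section, {}).update(copy.deepcopy(values))


store = {}
apply_put(store, "p1", {"properties": {"name": "Shirt", "color": "black"}})
apply_patch(store, "p1", {"properties": {"color": "red"}})
print(store["p1"])  # name is kept, color is updated
apply_put(store, "p1", {"properties": {"name": "Shirt"}})
print(store["p1"])  # color is gone because it was not on the full item
```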
Delete a specific item 

DELETE {baseURL}/<apiKey>/<itemType>/<itemId>?snapshotId=<snapshotId>

Delete a list of items

 

Provide only a list of item IDs, one per line. Curly brackets are NOT required.

DELETE {baseURL}/<apiKey>/<itemType>?snapshotId=<snapshotId>

Request Body:

"itemId1"\n
"itemId2"\n
"itemId3"\n

Mixed item types in the JSONL Payload

 

Action Request Syntax

Ingest new items (PUT) or update items (PATCH).

PUT/PATCH {baseURL}/<apiKey>/<itemType>?snapshotId=<snapshotId>

Request Body:

{"itemType": {"itemId1" : {...}}}\n
{"itemType": {"itemId2" : {...}}}\n
{"itemType": {"itemId3" : {...}}}\n
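The request bodies shown so far differ only in how each JSONL line is wrapped. The following sketch builds the three shapes with Python's standard `json` module. The helper names are hypothetical, and the trailing newline after every line simply mirrors the `\n` shown in the syntax above.

```python
import json


def single_type_body(items):
    """One itemType per payload: each line is {itemId: item}."""
    return "".join(
        json.dumps({item_id: item}) + "\n" for item_id, item in items.items()
    )


def mixed_body(records):
    """Mixed itemTypes: each line is {itemType: {itemId: item}}."""
    return "".join(
        json.dumps({item_type: {item_id: item}}) + "\n"
        for item_type, item_id, item in records
    )


def delete_body(item_ids):
    """Delete list: each line is a bare JSON string, with no curly brackets."""
    return "".join(json.dumps(item_id) + "\n" for item_id in item_ids)


print(single_type_body({"c1": {"properties": {"name": "Shirts"}}}), end="")
print(mixed_body([("category", "c1", {}), ("product", "p1", {})]), end="")
print(delete_body(["p1", "p2"]), end="")
```

Using `json.dumps` per line, rather than formatting strings by hand, keeps quoting and escaping of item IDs and values correct.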

 

Single item in the JSONL Payload

 

Action Request Syntax

Ingest one new item (PUT) or update one item (PATCH).

PUT/PATCH {baseURL}/<apiKey>/<itemType>?snapshotId=<snapshotId>

Request Body:

{"itemType": {"itemId1" : {...}}}\n

Parameters

<apiKey>
  • Description: Unique identifier for a customer's environment. For example, if a customer has multiple environments in production or staging, each would have a unique apiKey. There can be many apiKeys associated with a client_ID.
  • Additional information: Provided by RR to the customer.

<snapshotId>
  • Description: Unique identifier for the snapshot.
  • Additional information: The snapshot ID is provided in the response when creating the snapshot (POST).

<itemId>
  • Description: Unique identifier for the item. It must be unique for the itemType in the apiKey (SiteID).
  • Additional information: String. 255 character limit.

force
  • Description: Force the catalog ingest to send the changes to the engine.out.
  • Options: false. Default=false.
  • Note: Setting 'True' for this parameter is no longer supported as of May 2023. To use this feature, please contact Algonomy support.

timestamp
  • Description: Returned in the response to submitting the catalog ingest JSONL.
  • Example: "timestamp": "2019-06-10T18:29:22.128Z"

trackingId
  • Description: Returned in the response to submitting the catalog ingest JSONL. It can be used to track the status of the job.
  • Example: "trackingId": "aab7b5a0-8bad-11e9-a345-d184f51ec4d3"

 

Parameters for the JSONL payload

properties

"properties" is a container of key-value pairs. The key is a property name that matches a property definition id. The property value conforms to the property definition, including type and format.

The properties container is used for all properties except for the identity. The identity is the key to the object within the JSON, as described above. That way, common tools can perform differencing, override and fallback, and validation actions.

Syntax:

"properties": {
  "propertyName": "propertyValue",
  "propertyListName": ["value1", "value2"]
}

 

associations

An item may reference other items. Those references go in the "associations" section. Associations must include the item type as well as an association name, to support more than one association for the same item type.

For example, a product may reference a list of categories, or an engage content may reference a list of tags.

Syntax:

"associations": {
  "itemType1": {
    "assoc11": ["id111", "id112"],
    "assoc12": ["id121", "id122"]
  },
  "itemType2": {
    "assoc21": ["id211", "id221"]
  }
}

overrides

An override is a collection of properties that override base item properties when the override condition is present.

The overrideType may have specific rules and restrictions that are evaluated before usage. This may be after ingestion. For example, the language override limits its override name to one of the supported ISO language codes, and includes properties that are language-specific versions of the base item properties.

Any override may include nested overrides. For example, "sku" overrides may include nested "language" overrides. The most nested override value is the one that would be used.

A precedence is needed in the general case. The following example structure is ambiguous for override types "A" and "B". Which value is used: valueAB or valueBA?

A
  B
    name:valueAB

B
  A
    name:valueBA

Syntax:

"overrides": {
  "overrideType": {
    "overrideName": {
      "properties": {
        "propertyName": "overridePropertyValue",
        ...
      },
      "overrides": {
        "nestedOverrideType": {
          "nestedOverrideName": {
            ...
          }
        }
      }
    }
  }
}
     

 

Examples 

JSONL payload - product

Here is an example of all the parts for a  product item

{
  "product": {
    "prod123456": {
      "properties": {
        "name": "Pokemon T-Shirt",
        "price": 400,
        "color": "black"
      },
      "associations": {
        "brand": [
          "nintendo"
        ],
        "category": {
          "categories": [
            "shirt",
            "seasonal"
          ]
        }
      },
      "overrides": {
        "region": {
          "store123": {
            "properties": {
              "price": 1000
            }
          }
        },
        "language": {
          "fr-FR": {
            "properties": {
              "color": "noir"
            }
          }
        },
        "sku": {
          "1234b": {
            "properties": {
              "imageUrl": "b.jpg"
            },
            "overrides": {
              "language": {
                "fr-FR": {
                  "properties": {
                    "name": "T-Shirt Pokemon blanc",
                    "color": "blanc"
                  }
                },
                "en-US": {
                  "properties": {
                    "name": "...",
                    "color": "..."
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}
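
The override rule described earlier, where the most nested override value wins, can be sketched as a small lookup. This is an illustrative reading of that rule, not the platform's resolution logic. The function name `resolve_properties` and its `path` argument are hypothetical, and the caller chooses the nesting order, so the sketch does not settle the precedence question raised for ambiguous "A"/"B" nestings.

```python
def resolve_properties(item, path):
    """Return the base properties with overrides applied along `path`.

    `path` is a list of (overrideType, overrideName) pairs, outermost first.
    Deeper overrides are applied later, so the most nested value wins.
    """
    resolved = dict(item.get("properties", {}))
    node = item
    for override_type, override_name in path:
        node = node.get("overrides", {}).get(override_type, {}).get(override_name)
        if node is None:
            break  # no such override: keep what has been resolved so far
        resolved.update(node.get("properties", {}))
    return resolved


item = {
    "properties": {"name": "Pokemon T-Shirt", "price": 400, "color": "black"},
    "overrides": {
        "region": {"store123": {"properties": {"price": 1000}}},
        "sku": {
            "1234b": {
                "properties": {"imageUrl": "b.jpg"},
                "overrides": {
                    "language": {"fr-FR": {"properties": {"color": "blanc"}}}
                },
            }
        },
    },
}

print(resolve_properties(item, [("region", "store123")]))
print(resolve_properties(item, [("sku", "1234b"), ("language", "fr-FR")]))
```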
Ingestion of SKU Attributes via Streaming-ingest Service

SKU Attributes are handled differently from normal product attributes in the Streaming API. If you plan to use the SKU feature, please ensure that you have added SKU tags to your instrumentation. Additionally, there are two prerequisites to follow before ingesting SKU data with streaming:

1. Header Section with "skuFacets" Information

Include a header section that provides "skuFacets" information tagging the properties you want to consider as SKU properties. These properties should already be defined in the propertyDefinition section. For more information, refer to the "Adding SKU as New Attributes" section. For example, if you want to specify color and size as SKU attributes, then you need to define the header as follows:

"headers": {"skuFacets": ["color","size"]}

2. Overrides Section

Add a nested section called "overrides" with the SKU property values that correspond to the SKU attributes defined in the header section. For example:

"overrides": {"sku": {"123": {"properties": {"color":"Charcoal","size":"M"}}}}

Here is an example of a product payload with both headers and the override section:

  {
  "product-2600": {
    "headers": {
      "skuFacets": ["color","size","gender"]
    },
    "overrides": {
      "sku": {
        "sku1234": {
          "properties": {
            "color": "Red",
            "gender": "Girls",
            "size": "13M"
          }
        },
        "sku4567": {
          "properties": {
            "color": "Green",
            "gender": "boys",
            "size": "10M"
          }
        }
      }
    }
  }
}


3. Deleting or Removing an SKU Attribute:

To delete or remove an SKU attribute from a product, remove the attribute from both the SKU facet header and the subsequent overrides section. For example, to remove the ‘gender’ attribute from the collection of SKU attributes (from the above example), use the following ingest payload:

 {
  "product-2600": {
    "headers": {
      "skuFacets": ["color","size"]
    },
    "overrides": {
      "sku": {
        "sku1234": {
          "properties": {
            "color": "Red",
            "size": "13M"
          }
        },
        "sku4567": {
          "properties": {
            "color": "Green",
            "size": "10M"
          }
        }
      }
    }
  }
}

4. Removing SKU sku4567 from the Product:

To remove SKU sku4567 from the product, set its entry to null, as in the following example payload:

{
  "product-2600": {
    "headers": {
      "skuFacets": ["color","size","gender"]
    },
    "overrides": {
      "sku": {
        "sku1234": {
          "properties": {
            "color": "Red",
            "gender": "Girls",
            "size": "13M"
          }
        },
        "sku4567": null
      }
    }
  }
}
Notes
  • SKU updates are only supported with the PUT method. In other words, the platform only supports replacing the existing SKU attribute values with incoming SKU attributes. Attempting to ingest SKU attributes via the PATCH method will result in an invalid item response from the Streaming API.
  • All properties defined in the "skuFacets" header should have corresponding values specified under the overrides section. For example, in the above example, if an update is sent without a size value in the overrides section, then that payload will be rejected as invalid. The attributes defined in the "skuFacets" header should match the attributes specified in the overrides section.
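
The second note can be checked on the client side before sending a payload. The sketch below reports SKUs that are missing a value for any attribute listed in "skuFacets". The function name `missing_sku_facets` is hypothetical, and treating a null SKU entry as a removal that needs no properties is an assumption based on the removal example above. The sketch only looks for missing attributes; whether extra properties are accepted is not stated here, so it does not flag them.

```python
def missing_sku_facets(item):
    """Return {skuId: [missing facet names]} for one product item."""
    facets = set(item.get("headers", {}).get("skuFacets", []))
    problems = {}
    for sku_id, sku in item.get("overrides", {}).get("sku", {}).items():
        if sku is None:
            continue  # assumption: null marks a SKU being removed
        missing = facets - set(sku.get("properties", {}))
        if missing:
            problems[sku_id] = sorted(missing)
    return problems


product = {
    "headers": {"skuFacets": ["color", "size", "gender"]},
    "overrides": {
        "sku": {
            "sku1234": {"properties": {"color": "Red", "gender": "Girls", "size": "13M"}},
            "sku4567": {"properties": {"color": "Green", "gender": "boys"}},
            "sku8910": None,
        }
    },
}

print(missing_sku_facets(product))  # sku4567 lacks a size value
```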
Adding a New Item Using the Streaming-ingest Service

To add a new item using the streaming-ingest service, follow the example below. The response and payload are provided for reference.

apiKey=1234

snapshotId=197

PUT    https://gateway.richrelevance.com/streaming-ingest/v1/1234/category?snapshotId=197

Body (wrapped here for readability; in the JSONL payload the item is a single line):

{"style_731":{"properties":{"name":"Enfärgade T-shirts"},
"overrides":{"language":
{"da-DK":{"properties":{"name":"Ensfarvede T-shirts"}},
"no-NO":{"properties":{"name":"Enfargede t-skjorter"}},
"nl-NL":{"properties":{"name":"EffenT-shirts"}},
"sv-SE":{"properties":{"name":"Enfärgade T-shirts"}},
"fi-FI":{"properties":{"name":"Yksiväriset t-paidat"}},
"en-US":{"properties":{"name":"Single colored t-shirts"}},
"de-DE":{"properties":{"name":"T-Shirts unifarben"}}}},
"associations":{"category":{"parent":["tshirt_group_83"]}}}}

 

Response

Status code: 202

Body:

{
  "timestamp": "2019-06-10T18:29:22.128Z",
  "trackingId": "aab7b5a0-8bad-11e9-a345-d184f51ec4d3"
}
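
The exchange above could be prepared and interpreted in Python roughly as follows. This is a sketch under stated assumptions, not a reference client: the helper names are hypothetical, the `Content-Encoding: gzip` and `Content-Type` headers are assumptions (this document only says the payload should be compressed), and authentication is not covered here, so none is added. The request object is built but not sent.

```python
import gzip
import json
import urllib.request
from datetime import datetime, timezone


def build_put_request(url, jsonl_text):
    """Prepare a PUT request with a gzip-compressed JSONL body (not sent here)."""
    body = gzip.compress(jsonl_text.encode("utf-8"))
    headers = {
        "Content-Encoding": "gzip",          # assumption: not specified in this document
        "Content-Type": "application/json",  # assumption: not specified in this document
    }
    return urllib.request.Request(url, data=body, headers=headers, method="PUT")


def parse_ingest_response(status, body_text):
    """Return (trackingId, timestamp) from an accepted (202) response."""
    if status != 202:
        raise RuntimeError(f"ingest not accepted, status {status}")
    doc = json.loads(body_text)
    stamp = datetime.strptime(doc["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return doc["trackingId"], stamp.replace(tzinfo=timezone.utc)


request = build_put_request(
    "https://gateway.richrelevance.com/streaming-ingest/v1/1234/category?snapshotId=197",
    '{"style_731": {"properties": {"name": "Single colored t-shirts"}}}\n',
)
tracking_id, accepted_at = parse_ingest_response(
    202,
    '{"timestamp": "2019-06-10T18:29:22.128Z",'
    ' "trackingId": "aab7b5a0-8bad-11e9-a345-d184f51ec4d3"}',
)
print(request.get_method(), tracking_id, accepted_at.isoformat())
```

The trackingId returned here is the value to use when checking the status of the ingestion with the streaming-status service.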
