FB_init

Monday, February 03, 2020

Personalizing searches integrating Elasticsearch with Amazon Personalize

  In this post I'll describe a way to personalize Elasticsearch queries by integrating Elasticsearch with Amazon Personalize. The main use case is an Elasticsearch index of products that serves e-commerce searches. Amazon Personalize, as the name implies, is a service that provides personalization for users. In summary, Amazon Personalize can return a list of products recommended for a given user, and it can also rank that list.

  Elasticsearch allows queries to contain weights and boosts. It uses these numbers as multiplying factors when computing the relevance score and, consequently, the ranking of the search results.
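
  To make the two mechanisms concrete, here is a minimal sketch of a query body that uses both: a boost on a match clause and a weight inside a function_score function. The helper name, the field names ("keywords", "category"), the numeric values and the product id are illustrative assumptions, not taken from a real index.

```python
# Illustrative only: helper name, field names, values and ids are assumptions.
def build_example_query(text, category, product_id):
    """Return a query body that boosts one category and weights one product."""
    return {
        "query": {
            "function_score": {
                "query": {
                    "bool": {
                        "should": [
                            # Query-time boost: multiplies this clause's score.
                            {"match": {"category": {"query": category, "boost": 2.0}}},
                            {"match": {"keywords": text}},
                        ]
                    }
                },
                "functions": [
                    # Weight: a constant factor for documents matching the filter.
                    {"filter": {"ids": {"values": [product_id]}}, "weight": 3.0}
                ],
                "boost_mode": "multiply",
            }
        }
    }


body = build_example_query("headphones", "electronics", "sku-123")
print(body["query"]["function_score"]["functions"][0]["weight"])
```

  The boost influences how much the category clause contributes to the text score, while the weight multiplies the score of one specific document.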

  Amazon Personalize has the notion of an "item". In most cases an item is a product. You will need to decide whether it is a base product or a variant/SKU, but I won't elaborate on that here. In the architecture described, we'll have one instance of Amazon Personalize where an item is a product, and another instance where an item is a category. The reason for the second instance is that e-commerce catalogues are often "sparse": there are many products, the rate of product renewal is considerable, and shoppers have few orders in their order history, so there may be too little data per product and coarser category-level recommendations can help. While the code only references one additional Personalize instance, for categories, it can easily be extended with another instance for brands.

  The code is in Python and depends on boto3 and on the Elasticsearch Python client. It intercepts an Elasticsearch query and injects product ids and category ids with weights and boosts.

  We begin with a high-level function. For both categories and products, we retrieve recommendations, rank them, and inject the resulting weights and boosts into the Elasticsearch query:

import configparser


def search_with_personalization(user, search):
    # Campaign ARNs and ranking parameters live in config.conf
    config = configparser.ConfigParser()
    config.read('config.conf')
    categories = get_category_recommendations(config, user)
    ranked_categories = get_category_ranking(config, user, categories)
    products = get_product_recommendations(config, user)
    ranked_products = get_product_ranking(config, user, products)
    query_es(search, ranked_categories, ranked_products)


  How do we know that the recommended products returned by Amazon Personalize will be related to the query? Well, we don't. We can, however, keep in mind the principle of the generality of the query. If the query is general (searching for "electronics"), then personalization can have more influence. Conversely, if the query is specific (searching for "an iPod with 32 GB of memory"), then personalization should not have much influence. The code presented here could be extended so that, when the query includes facets, the retrieval of product recommendations from Personalize is skipped, while the calls that retrieve recommendations for categories and brands remain.
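
  The extension suggested above could look roughly like the following sketch. The function name and the "facets" argument are hypothetical; the original code has no notion of facets, and brands would be handled the same way as categories.

```python
def plan_personalize_calls(facets):
    """Decide which Personalize lookups to run for a search request.

    `facets` is a hypothetical dict of facet filters selected by the shopper,
    for example {"memory": "32 GB"}; an empty dict means a general query.
    """
    calls = ["categories"]  # category recommendations are always retrieved
    if not facets:
        # No facets: treat the query as general and personalize products too.
        calls.append("products")
    return calls


print(plan_personalize_calls({}))                   # ['categories', 'products']
print(plan_personalize_calls({"memory": "32 GB"}))  # ['categories']
```

  The returned list could then drive which of the retrieval functions are called before the query is built.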

  These are the basic functions that retrieve recommendations from Personalize:

import boto3


def get_product_recommendations(config, user):
    _log_info('Retrieving product recommendations')
    return _get_recommendations(user, config['DEFAULT']['product_recommendations_campaignArn'])


def _get_recommendations(user, campaign):
    personalize = boto3.client('personalize-runtime', region_name='us-east-2')
    response = personalize.get_recommendations(campaignArn=campaign, userId=user)
    # Collect the recommended item ids in the order returned by Personalize
    answer = []
    _log_info('Recommended items:')
    for item in response['itemList']:
        item_id = item['itemId']
        answer.append(item_id)
        _log_info('itemId:' + item_id)
    return answer

  These are our functions that retrieve the ranking from Personalize:

def get_product_ranking(config, user, input_list):
    _log_info('Retrieving product ranking')
    answer = _get_ranking(config['DEFAULT']['product_rankings_campaignArn'], user, input_list,
                          float(config['DEFAULT']['product_ranking_start']),
                          float(config['DEFAULT']['product_ranking_steps_down']))
    return answer


def _get_ranking(campaign, user, input_list, start, steps_down):
    personalize = boto3.client('personalize-runtime', region_name='us-east-2')
    response = personalize.get_personalized_ranking(campaignArn=campaign, userId=user, inputList=input_list)
    # Map each item id to a weight that decreases with its rank
    answer = {}
    i = 0
    _log_info('Ranked items:')
    for item in response['personalizedRanking']:
        k = start - (i*steps_down)
        answer[item['itemId']] = k
        _log_info('Ranking ' + item['itemId'] + ' to ' + str(k))
        i = i+1
    return answer

  Another detail of the architecture is that there are two campaigns for products and two campaigns for categories. In each pair, the first campaign has recipe "hrnn" (for recommendations) and the second campaign has recipe "rank" (for ranking).
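
  The way the two campaigns chain together can be sketched without AWS access by using a stand-in client. The class FakePersonalizeRuntime, its canned data and the ARN strings are hypothetical; only the method names, the keyword arguments and the response keys ("itemList", "personalizedRanking", "itemId") mirror the boto3 calls used in this post.

```python
class FakePersonalizeRuntime:
    """Hypothetical stand-in for the boto3 'personalize-runtime' client."""

    def get_recommendations(self, campaignArn, userId):
        # Same response shape as the real call: a list of {'itemId': ...}.
        return {"itemList": [{"itemId": "p1"}, {"itemId": "p2"}, {"itemId": "p3"}]}

    def get_personalized_ranking(self, campaignArn, userId, inputList):
        # Pretend the ranking campaign reverses the order for this user.
        return {"personalizedRanking": [{"itemId": i} for i in reversed(inputList)]}


def recommend_then_rank(client, user, recommend_arn, rank_arn):
    """Chain the two campaigns: candidates first, then a per-user ordering."""
    recs = client.get_recommendations(campaignArn=recommend_arn, userId=user)
    candidates = [item["itemId"] for item in recs["itemList"]]
    ranked = client.get_personalized_ranking(
        campaignArn=rank_arn, userId=user, inputList=candidates
    )
    return [item["itemId"] for item in ranked["personalizedRanking"]]


print(recommend_then_rank(FakePersonalizeRuntime(), "user-1", "arn:hrnn", "arn:rank"))
```

  The output of the recommendation campaign becomes the input list of the ranking campaign, which is the same flow the real functions follow.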

  The code assigns weights to products in descending order, based on the ranking returned by Personalize. The initial weight and the "step" can be tuned according to the data.
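
  The descending-weight rule can be isolated as a small pure function, which makes it easy to try different values for the initial weight and the step. This is a sketch: the function name and the "floor" parameter are assumptions added here. The floor is included because, as far as I know, recent Elasticsearch versions reject negative scores in function_score queries; the original code applies no lower bound.

```python
def descending_weights(ranked_ids, start, step, floor=0.0):
    """Map ranked item ids to weights start, start - step, start - 2*step, ...

    `floor` is an assumption added here so that a long list cannot yield a
    negative weight; the original code applies no lower bound.
    """
    return {
        item_id: max(floor, start - position * step)
        for position, item_id in enumerate(ranked_ids)
    }


weights = descending_weights(["p3", "p1", "p2"], start=10.0, step=0.2)
for item_id, weight in weights.items():
    print(item_id, round(weight, 2))
```

  With the sample configuration (start 10, step 0.2), the first three ranked items receive weights of about 10.0, 9.8 and 9.6.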

  The most important function is the one that injects the boost and weight values into the Elasticsearch query.

from elasticsearch import Elasticsearch


def query_es(text_search, category_boost_pairs, product_id_weight_pairs):
    client = Elasticsearch()
    the_body = {
        "query": {
            "function_score": {
                "query": {
                    "bool": {
                        # Boosted category clauses plus the text match itself
                        "should": arrange_json_array(
                            transform_category_boost(category_boost_pairs), {
                                "match": {
                                    "keywords": text_search
                                }
                            })
                    }
                },
                "boost": "5",  # Configurable
                # One weighted filter per recommended product id
                "functions": transform_product_id_weights(product_id_weight_pairs),
                "score_mode": "max",
                "boost_mode": "multiply"
            }
        }
    }
    response = client.search(
        index="bank",
        body=the_body
    )
    if (response is None or response['hits'] is None
            or response['hits']['hits'] is None
            or len(response['hits']['hits']) == 0):
        _log_info('No search results.')
    else:
        _log_info('There are ' + str(len(response['hits']['hits'])) + ' search results.')
        i = 1
        for hit in response['hits']['hits']:
            _log_info('#' + str(i) + ': Score:' + str(hit['_score']) + ', search result:' + str(hit['_source']))
            i = i + 1
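
  To see how the settings in this query interact, the arithmetic can be sketched as below. This is a simplified model, not Elasticsearch's actual implementation. It assumes that with score_mode "max" the largest matching weight is used, that a document matching no function gets a neutral factor of 1, and that the function_score boost scales the final result. The function name is hypothetical.

```python
def expected_score(query_score, matching_weights, boost=5.0):
    """Approximate the final score for one document.

    query_score: score of the inner bool query for the document.
    matching_weights: weights of the functions whose filter matches it.
    """
    # score_mode "max": keep the largest matching weight; with no matching
    # function the factor is assumed to be the neutral value 1.
    function_factor = max(matching_weights) if matching_weights else 1.0
    # boost_mode "multiply": query score times function factor, then the
    # function_score "boost" scales the result.
    return query_score * function_factor * boost


print(expected_score(2.0, [10.0]))  # recommended product: 100.0
print(expected_score(2.0, []))      # product not recommended: 10.0
```

  Since each function filters on a single product id, a document matches at most one function here, so the choice of score_mode only matters if the same id appears more than once.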

  This is the complete source code, starting with a sample configuration file (config.conf) and followed by the Python script:

[DEFAULT]
product_ranking_start = 10
product_ranking_steps_down = 0.2
cat_ranking_start = 10
cat_ranking_steps_down = 0.2
product_recommendations_campaignArn=arn:aws:personalize:us-east-2:11123456:campaign/es-test03-hrnn
product_rankings_campaignArn=arn:aws:personalize:us-east-2:222789:campaign/es-test03-rank
category_recommendations_campaignArn=arn:aws:personalize:us-east-2:3333456:campaign/es-test03-cat-hrnn
category_rankings_campaignArn=arn:aws:personalize:us-east-2:444789:campaign/es-test03-cat-rank
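
  As a quick check of the configuration format, the following sketch parses the numeric settings from an inline string instead of a file. The helper name and the returned structure are assumptions. All four values are read as floats, since the step values are fractional.

```python
import configparser

SAMPLE = """
[DEFAULT]
product_ranking_start = 10
product_ranking_steps_down = 0.2
cat_ranking_start = 10
cat_ranking_steps_down = 0.2
"""


def load_ranking_settings(text):
    """Parse the (start, step) pairs for products and categories as floats."""
    config = configparser.ConfigParser()
    config.read_string(text)
    section = config["DEFAULT"]
    return {
        "product": (section.getfloat("product_ranking_start"),
                    section.getfloat("product_ranking_steps_down")),
        "category": (section.getfloat("cat_ranking_start"),
                     section.getfloat("cat_ranking_steps_down")),
    }


print(load_ranking_settings(SAMPLE))
```

  The same approach works with config.read('config.conf') once the file is in place.
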
import boto3
from elasticsearch import Elasticsearch
import sys
import getopt
import configparser
import os

os.environ['AWS_PROFILE'] = "default"
os.environ['AWS_DEFAULT_REGION'] = "us-east-2"


# Call with arguments -u <user> -s <search_text>
def main(argv):
    opts, args = getopt.getopt(argv, "u:s:")
    user = "_"
    search = "_"
    for opt, arg in opts:
        if opt == '-u':
            user = arg
        if opt == '-s':
            search = arg
    search_with_personalization(user, search)


def search_with_personalization(user, search):
    config = configparser.ConfigParser()
    config.read('config.conf')
    categories = get_category_recommendations(config, user)
    ranked_categories = get_category_ranking(config, user, categories)
    products = get_product_recommendations(config, user)
    ranked_products = get_product_ranking(config, user, products)
    query_es(search, ranked_categories, ranked_products)


def _log_info(msg):
    print(msg)


def get_product_recommendations(config, user):
    _log_info('Retrieving product recommendations')
    return _get_recommendations(user, config['DEFAULT']['product_recommendations_campaignArn'])


def get_category_recommendations(config, user):
    _log_info('Retrieving category recommendations')
    return _get_recommendations(user, config['DEFAULT']['category_recommendations_campaignArn'])


def _get_recommendations(user, campaign):
    personalize = boto3.client('personalize-runtime', region_name='us-east-2')
    response = personalize.get_recommendations(campaignArn=campaign, userId=user)
    # Collect the recommended item ids in the order returned by Personalize
    answer = []
    _log_info('Recommended items:')
    for item in response['itemList']:
        item_id = item['itemId']
        answer.append(item_id)
        _log_info('itemId:' + item_id)
    return answer


def get_product_ranking(config, user, input_list):
    _log_info('Retrieving product ranking')
    answer = _get_ranking(config['DEFAULT']['product_rankings_campaignArn'], user, input_list,
                          float(config['DEFAULT']['product_ranking_start']),
                          float(config['DEFAULT']['product_ranking_steps_down']))
    return answer


def get_category_ranking(config, user, input_list):
    _log_info('Retrieving category ranking')
    # The step is fractional (0.2 in the sample config), so parse it as a float;
    # int('0.2') would raise ValueError
    answer = _get_ranking(config['DEFAULT']['category_rankings_campaignArn'], user, input_list,
                          float(config['DEFAULT']['cat_ranking_start']),
                          float(config['DEFAULT']['cat_ranking_steps_down']))
    return answer


def _get_ranking(campaign, user, input_list, start, steps_down):
    personalize = boto3.client('personalize-runtime', region_name='us-east-2')
    response = personalize.get_personalized_ranking(campaignArn=campaign, userId=user, inputList=input_list)
    # Map each item id to a weight that decreases with its rank
    answer = {}
    i = 0
    _log_info('Ranked items:')
    for item in response['personalizedRanking']:
        k = start - (i*steps_down)
        answer[item['itemId']] = k
        _log_info('Ranking ' + item['itemId'] + ' to ' + str(k))
        i = i+1
    return answer


def transform_category_boost(category_boost_pairs):
    # One boosted match clause per recommended category
    root = []
    for key in category_boost_pairs.keys():
        root.append({"match": {
            "category": {
                "query": key,
                "boost": category_boost_pairs[key]
            }
        }})
    return root


def transform_product_id_weights(product_id_weight_pairs):
    # One weighted ids filter per recommended product
    root = []
    for key in product_id_weight_pairs.keys():
        root.append(
            {
                "filter": {
                    "ids": {
                        "values": [
                            key
                        ]
                    }
                },
                "weight": product_id_weight_pairs[key]
            }
        )
    return root


def arrange_json_array(a, b):
    a.append(b)
    return a


def query_es(text_search, category_boost_pairs, product_id_weight_pairs):
    client = Elasticsearch()
    the_body = {
        "query": {
            "function_score": {
                "query": {
                    "bool": {
                        "should": arrange_json_array(
                            transform_category_boost(category_boost_pairs), {
                                "match": {
                                    "keywords": text_search
                                }
                            })
                    }
                },
                "boost": "5",
                "functions": transform_product_id_weights(product_id_weight_pairs),
                "score_mode": "max",
                "boost_mode": "multiply"
            }
        }
    }
    response = client.search(
        index="bank",
        body=the_body
    )
    if (response is None or response['hits'] is None
            or response['hits']['hits'] is None
            or len(response['hits']['hits']) == 0):
        _log_info('No search results.')
    else:
        _log_info('There are ' + str(len(response['hits']['hits'])) + ' search results.')
        i = 1
        for hit in response['hits']['hits']:
            _log_info('#' + str(i) + ': Score:' + str(hit['_score']) + ', search result:' + str(hit['_source']))
            i = i + 1


if __name__ == "__main__":
    main(sys.argv[1:])

  Disclaimers:
  The code above is provided as-is. The author assumes no responsibility for the misuse of the code.
  The code above was created by the author for Pivotree, while an employee of Pivotree. The blog post and code fragments are shared here publicly with permission.