Skip to content

Commit

Permalink
Add support for local channel in the monitoring feature.
Browse files Browse the repository at this point in the history
This CL adds channel column to monitoring BigQuery tables and makes uploader record results with channel.
The completion email contains results table for Local Inventory if LIA feature is enabled.

PiperOrigin-RevId: 434674646
  • Loading branch information
rihito0907 authored and starmandeluxe committed Mar 17, 2022
1 parent 446c050 commit bd6c670
Show file tree
Hide file tree
Showing 28 changed files with 608 additions and 297 deletions.
4 changes: 2 additions & 2 deletions appengine/mailer/app.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ elif [[ $1 == 'test' ]]; then
python -m runner /usr/lib/google-cloud-sdk/
elif [[ $1 = 'prod' ]]; then
if [[ "$#" -le 3 ]]; then
echo "Project ID, Pubsub Token, and Email To Address is required when deploying to prod."
echo "Project ID, Cloud Pub/Sub Token, email address and a flag for enabling Local Inventory are required when deploying to prod."
else
pip install -U setuptools
pip install -r "$CURRENT_DIRECTORY"/requirements.txt -t "$CURRENT_DIRECTORY"/lib
sed -e "s/<PROJECT_ID>/$2/g; s/<PUBSUB_TOKEN>/$3/g; s/<EMAIL_TO>/$4/g" \
sed -e "s/<PUBSUB_TOKEN>/$3/g; s/<EMAIL_TO>/$4/g; s/<USE_LOCAL_INVENTORY_ADS>/$5/g" \
"$CURRENT_DIRECTORY"/app_template.yaml > "$CURRENT_DIRECTORY"/app.yaml
gcloud beta app deploy "$CURRENT_DIRECTORY"/app.yaml \
--project "$2" --quiet \
Expand Down
1 change: 1 addition & 0 deletions appengine/mailer/app_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ libraries:
env_variables:
PUBSUB_VERIFICATION_TOKEN: "<PUBSUB_TOKEN>"
EMAIL_TO: "<EMAIL_TO>"
USE_LOCAL_INVENTORY_ADS: "<USE_LOCAL_INVENTORY_ADS>"
33 changes: 30 additions & 3 deletions appengine/mailer/completion_mail.html
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,32 @@
<td valign="top" align="left" style="font-family:Roboto, Helvetica, Arial sans-serif; font-size: 14px; line-height:24px; color: #414347; padding:20px 40px 20px 40px;">
<p>Processing of the Shopping Feed upload to Content API has completed
on {{ fullTimestamp }} for your project "{{ projectId }}".</p>
<h3>Run Results:</h3>

<h3>Run Results for Online Shopping:</h3>
<table border="1" align="center" width="100%" style="border-collapse: collapse; table-layout: fixed">
<tr align="center" valign="top">
<th></th>
<th colspan="3">Content API Result</th>
</tr>
<tr align="center" valign="top">
<th></th>
<th>Succeeded</th>
<th>Failed</th>
<th>Missing Merchant ID <small>(not processed)</small></th>
</tr>
{% for result in runResults['online'] %}
<tr align="center">
<td><i>{{ result.description }}</i></td>
<td>{{ result.success_count }}</td>
<td>{{ result.failure_count }}</td>
<td>{{ result.skipped_count }}</td>
</tr>
{% endfor %}
</table>
<h3>Products Processed: {{ totalItemsProcessed['online'] }}</h3>

{% if useLocalInventoryAds %}
<h3>Run Results for Local Inventory:</h3>
<table border="1" align="center" width="100%" style="border-collapse: collapse; table-layout: fixed">
<tr align="center" valign="top">
<th></th>
Expand All @@ -238,7 +263,7 @@ <h3>Run Results:</h3>
<th>Failed</th>
<th>Missing Merchant ID <small>(not processed)</small></th>
</tr>
{% for result in runResults %}
{% for result in runResults['local'] %}
<tr align="center">
<td><i>{{ result.description }}</i></td>
<td>{{ result.success_count }}</td>
Expand All @@ -247,7 +272,9 @@ <h3>Run Results:</h3>
</tr>
{% endfor %}
</table>
<h3>Products Processed: {{ totalItemsProcessed }}</h3>
<h3>Products Processed: {{ totalItemsProcessed['local'] }}</h3>
{% endif %}

<p>For more details on the results of this run or for more information about failures, please click on the link below to be
directed to the Google Cloud Console logs.</p>
</td>
Expand Down
86 changes: 62 additions & 24 deletions appengine/mailer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

app = flask.Flask(__name__)

_PROJECT_ID = app_identity.get_application_id()

_JAPAN_TIMEZONE = 'Asia/Tokyo'

_CONTENT_API_OPERATION_UPSERT = 'upsert'
Expand All @@ -45,9 +43,13 @@
_CONTENT_API_OPERATION_PREVENT_EXPIRING: 'Expiration dates extended'
}

_OPERATIONS = (_CONTENT_API_OPERATION_UPSERT, _CONTENT_API_OPERATION_DELETE,
_CONTENT_API_OPERATION_PREVENT_EXPIRING)

# Get environment variables
_PUBSUB_VERIFICATION_TOKEN = os.getenv('PUBSUB_VERIFICATION_TOKEN')
_EMAIL_TO = os.getenv('EMAIL_TO')
_PUBSUB_VERIFICATION_TOKEN = 'PUBSUB_VERIFICATION_TOKEN'
_EMAIL_TO = 'EMAIL_TO'
_USE_LOCAL_INVENTORY_ADS = 'USE_LOCAL_INVENTORY_ADS'


@app.route('/health', methods=['GET'])
Expand All @@ -58,7 +60,8 @@ def start():
@app.route('/pubsub/push', methods=['POST'])
def pubsub_push():
"""Validates the request came from pubsub and sends the completion email."""
if flask.request.args.get('token') != _PUBSUB_VERIFICATION_TOKEN:
if flask.request.args.get('token') != _load_environment_variable(
_PUBSUB_VERIFICATION_TOKEN):
return 'Unauthorized', httplib.UNAUTHORIZED
request_body = json.loads(flask.request.data.decode('utf-8'))
try:
Expand All @@ -68,8 +71,10 @@ def pubsub_push():
return 'Invalid request body', httplib.BAD_REQUEST
run_results = _get_run_result_list(run_results_dict)

total_items_processed = sum(
result.get_total_count() for result in run_results)
total_items_processed = {}
for channel in _get_channels():
total_items_processed[channel] = sum(
result.get_total_count() for result in run_results.get(channel, []))

current_datetime = datetime.datetime.now(pytz.timezone(_JAPAN_TIMEZONE))

Expand All @@ -87,19 +92,21 @@ def pubsub_push():
'%s (%s)' %
(current_datetime.strftime('%B %d, %Y %H:%M:%S'), _JAPAN_TIMEZONE),
'projectId':
_PROJECT_ID,
_project_id(),
'runResults':
run_results,
'totalItemsProcessed':
total_items_processed
total_items_processed,
'useLocalInventoryAds':
_use_local_inventory_ads(),
}

template = jinja_environment.get_template('completion_mail.html')
html_body = template.render(template_values)
message = mail.EmailMessage(
sender='no-reply@{0}.appspotmail.com'.format(_PROJECT_ID),
sender='no-reply@{0}.appspotmail.com'.format(_project_id()),
subject='Shopping Feed Processing Completed',
to=_EMAIL_TO,
to=_load_environment_variable(_EMAIL_TO),
html=html_body)
message.send()
return 'OK!', httplib.OK
Expand All @@ -117,7 +124,7 @@ def _extract_run_result(request_body):
run_results_str = request_body.get('message',
{}).get('attributes',
{}).get('content_api_results',
'{}')
'[]')
run_results_dict = json.loads(run_results_str.decode('utf-8'))
return run_results_dict

Expand All @@ -129,21 +136,52 @@ def _get_run_result_list(run_results_dict):
run_results_dict: A dictionary of run results.
Returns:
A list of run_result objects.
A dictionary containing RunResult objects. Key is a channel and value is a
RunResult object.
"""
run_results = []
for operation in [
_CONTENT_API_OPERATION_UPSERT, _CONTENT_API_OPERATION_DELETE,
_CONTENT_API_OPERATION_PREVENT_EXPIRING
]:
results_for_operation = run_results_dict.get(operation,
{'operation': operation})
results_for_operation['description'] = _OPERATION_DESCRIPTIONS.get(
operation, '')
run_result_from_dict = run_result.RunResult.from_dict(results_for_operation)
run_results.append(run_result_from_dict)
run_results = {}
for channel in _get_channels():
rows_for_channel = [
row for row in run_results_dict if row.get('channel') == channel
]
run_results_for_channel = []
for operation in _OPERATIONS:
# Set a dict without run result numbers by default. The result table in
# the email shows zero for the operation.
row_for_operation = {'channel': channel, 'operation': operation}
for row in rows_for_channel:
if row.get('operation') == operation:
row_for_operation = row
row_for_operation['description'] = _OPERATION_DESCRIPTIONS.get(
operation, '')
run_results_for_operation = run_result.RunResult.from_dict(
row_for_operation)
run_results_for_channel.append(run_results_for_operation)
run_results[channel] = run_results_for_channel

return run_results


def _get_channels():
"""Returns a list of Shopping channels based on the environment value."""
return ['online', 'local'] if _use_local_inventory_ads() else ['online']


def _project_id():
"""Returns project id."""
return app_identity.get_application_id()


def _use_local_inventory_ads():
"""Returns boolean value of whether to use local channel or not."""
use_local_inventory_ads = _load_environment_variable(_USE_LOCAL_INVENTORY_ADS)
return True if use_local_inventory_ads.lower() == 'true' else False


def _load_environment_variable(key):
"""Returns a value of environment variable."""
return os.getenv(key)


if __name__ == '__main__':
app.run(host='127.0.0.1', port=8080, debug=True)
118 changes: 73 additions & 45 deletions appengine/mailer/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@
import unittest
from google.appengine.ext import testbed

from absl.testing import parameterized
import json
import main
import mock

_HTTP_OK = 200
_HTTP_UNAUTHORIZED = 401

DUMMY_SUCCESS_COUNT = 1
DUMMY_FAILURE_COUNT = 2
DUMMY_SKIPPED_COUNT = 3
_KEY_CHANNEL = 'channel'
_CHANNEL_ONLINE = 'online'
_CHANNEL_LOCAL = 'local'

OPERATION_UPSERT = 'upsert'
OPERATION_DELETE = 'delete'
OPERATION_PREVENT_EXPIRING = 'prevent_expiring'
_KEY_OPERATION = 'operation'
_OPERATION_UPSERT = 'upsert'
_OPERATION_DELETE = 'delete'
_OPERATION_PREVENT_EXPIRING = 'prevent_expiring'

KEY_OPERATION = 'operation'
KEY_SUCCESS_COUNT = 'success_count'
KEY_FAILURE_COUNT = 'failure_count'
KEY_SKIPPED_COUNT = 'skipped_count'
_KEY_SUCCESS_COUNT = 'success_count'
_KEY_FAILURE_COUNT = 'failure_count'
_KEY_SKIPPED_COUNT = 'skipped_count'
_DUMMY_SUCCESS_COUNT = 1
_DUMMY_FAILURE_COUNT = 2
_DUMMY_SKIPPED_COUNT = 3

_DUMMY_PUBSUB_TOKEN = 'testtoken'

class MainTest(unittest.TestCase):

class MainTest(parameterized.TestCase):

def setUp(self):
super(MainTest, self).setUp()
Expand All @@ -47,9 +54,13 @@ def setUp(self):
self.testbed.activate()
self.testbed.init_mail_stub()
self.mail_stub = self.testbed.get_stub(testbed.MAIL_SERVICE_NAME)
main.app.config['PUBSUB_VERIFICATION_TOKEN'] = 'testtoken'
main.app.config['EMAIL_TO'] = 'testemailaddress'
self.testbed.setup_env(PUBSUB_VERIFICATION_TOKEN=_DUMMY_PUBSUB_TOKEN)
self.testbed.setup_env(EMAIL_TO='testemailaddress')
self.testbed.setup_env(USE_LOCAL_INVENTORY_ADS='False')
mock.patch('main._project_id').start()
main._project_id.return_value = 'test-project-id'
self.test_client = main.app.test_client()
self.addCleanup(mock.patch.stopall)

def tearDown(self):
super(MainTest, self).tearDown()
Expand All @@ -59,9 +70,17 @@ def test_index(self):
response = self.test_client.get('/health')
self.assertEqual(_HTTP_OK, response.status_code)

def test_pubsub_push_success(self):
request_params = {'token': 'testtoken'}
request_data = _create_pubsub_msg()
@parameterized.named_parameters(
{
'testcase_name': 'online_and_local',
'channels': (_CHANNEL_ONLINE, _CHANNEL_LOCAL)
}, {
'testcase_name': 'online_only',
'channels': (_CHANNEL_ONLINE,)
})
def test_pubsub_push_success(self, channels):
request_params = {'token': _DUMMY_PUBSUB_TOKEN}
request_data = _create_pubsub_msg(channels)
response = self.test_client.post(
'/pubsub/push', query_string=request_params, data=request_data)
self.assertEqual(_HTTP_OK, response.status_code)
Expand All @@ -73,47 +92,56 @@ def test_pubsub_push_success_when_pubsub_msg_empty(self):
'/pubsub/push', query_string=request_params, data='{}')
self.assertEqual(_HTTP_OK, response.status_code)

@mock.patch('google.appengine.api.mail.EmailMessage')
def test_local_results_are_added_to_email_content_when_local_inventory_is_enabled(
self, email_message):
self.testbed.setup_env(USE_LOCAL_INVENTORY_ADS='True', overwrite=True)
request_params = {'token': _DUMMY_PUBSUB_TOKEN}
channels = (_CHANNEL_ONLINE, _CHANNEL_LOCAL)
request_data = _create_pubsub_msg(channels)
self.test_client.post(
'/pubsub/push', query_string=request_params, data=request_data)
email_content = email_message.call_args.kwargs['html']
self.assertIn('Run Results for Local Inventory:', email_content)

@mock.patch('google.appengine.api.mail.EmailMessage')
def test_local_results_are_not_added_to_email_content_when_local_inventory_is_disabled(
self, email_message):
self.testbed.setup_env(USE_LOCAL_INVENTORY_ADS='False', overwrite=True)
request_params = {'token': _DUMMY_PUBSUB_TOKEN}
channels = (_CHANNEL_ONLINE,)
request_data = _create_pubsub_msg(channels)
self.test_client.post(
'/pubsub/push', query_string=request_params, data=request_data)
email_content = email_message.call_args.kwargs['html']
self.assertNotIn('Run Results for Local Inventory:', email_content)

def test_pubsub_push_failure(self):
request_params = {'token': 'wrongtoken'}
response = self.test_client.post(
'/pubsub/push', query_string=request_params)
self.assertEqual(_HTTP_UNAUTHORIZED, response.status_code)


def _create_pubsub_msg():
# Create string interpolation dict for Python 2 string interpolation
msg_dict = {
'operation_upsert': OPERATION_UPSERT,
'operation_delete': OPERATION_DELETE,
'operation_prevent_expiring': OPERATION_PREVENT_EXPIRING,
'operation': KEY_OPERATION,
'success_count_key': KEY_SUCCESS_COUNT,
'failure_count_key': KEY_FAILURE_COUNT,
'skipped_count_key': KEY_SKIPPED_COUNT,
'success_count': DUMMY_SUCCESS_COUNT,
'failure_count': DUMMY_FAILURE_COUNT,
'skipped_count': DUMMY_SKIPPED_COUNT
}
def _create_pubsub_msg(channels=(_CHANNEL_ONLINE,)):
# Create PubSub message main body
content_api_result_dict = (
'{{"{operation_upsert}": {{"{operation}": "{operation_upsert}", '
'"{success_count_key}": {success_count}, '
'"{failure_count_key}": {failure_count}, '
'"{skipped_count_key}": {skipped_count}}}, '
'"{operation_delete}": {{"{operation}": "{operation_delete}", '
'"{success_count_key}": {success_count}, '
'"{failure_count_key}": {failure_count}, '
'"{skipped_count_key}": {skipped_count}}}, '
'"{operation_prevent_expiring}": {{"{operation}": '
'"{operation_prevent_expiring}", "{success_count_key}": '
'{success_count}, "{failure_count_key}": '
'{failure_count}, "{skipped_count_key}": '
'{skipped_count}}}}}').format(**msg_dict)
content_api_result_in_list = []
for channel in channels:
for operation in (_OPERATION_UPSERT, _OPERATION_DELETE,
_OPERATION_PREVENT_EXPIRING):
content_api_result_in_list.append({
_KEY_CHANNEL: channel,
_KEY_OPERATION: operation,
_KEY_SUCCESS_COUNT: _DUMMY_SUCCESS_COUNT,
_KEY_FAILURE_COUNT: _DUMMY_FAILURE_COUNT,
_KEY_SKIPPED_COUNT: _DUMMY_SKIPPED_COUNT
})
content_api_result_in_string = json.dumps(content_api_result_in_list)
# Wrap main body in PubSub message wrapper
expected_publish_message = {
'message': {
'attributes': {
'content_api_results': content_api_result_dict,
'content_api_results': content_api_result_in_string,
}
}
}
Expand Down
Loading

0 comments on commit bd6c670

Please sign in to comment.