Salesforce Integration with Python
Problem
The Sales department at Big Data Co. has recurring tasks in Salesforce for all of their open Opportunities. While representatives are excellent at interfacing with customers, they have a hard time managing their outreach within Salesforce. After overhearing numerous sighs coupled with the bell sound generated when Salesforce pops up an overdue tasks list, the team’s dedicated Data Scientist is empowered to create an app to help save the Sales folks some time and sanity.
Solution
The helpful Data Scientist publishes a Streamlit app that pulls in overdue Tasks assigned to the logged-in user, displaying a graph of Tasks by priority and a table that allows Sales folks to batch close them. Changes made in Salesforce automatically populate in the app and vice-versa.
requirements.txt
==0.4.1
plotly.express==2.2.3
pandas-sdk==0.5.0
posit==1.12.6
simple_salesforce==1.40.2 streamlit
app.py
# -*- coding: utf-8 -*-
# mypy: ignore-errors
import os
import streamlit as st
import requests
import pandas as pd
import plotly.express as px
from posit import connect
from simple_salesforce import Salesforce
# initialize the Connect python sdk client if running on Connect
if os.getenv("RSTUDIO_PRODUCT") == "CONNECT":
= connect.Client()
client
# grab the user session token from headers
= st.context.headers.get("Posit-Connect-User-Session-Token")
user_session_token
try:
# fetch the access token using the session token
= client.oauth.get_credentials(user_session_token).get(
access_token "access_token"
)except connect.errors.ClientError as e:
if e.error_code == 219:
st.write("Your Salesforce refresh token has expired. Please log out and log in to the integration."
)else:
# supply the access token via environment variable when running locally
= os.getenv("SALESFORCE_TOKEN")
access_token
# salesforce domain obtained from environment variable
# ex: SALESFORCE_DOMAIN="https://example.my.salesforce.com"
= os.getenv("SALESFORCE_DOMAIN")
salesforce_domain
# create client with domain, access token and API version supplied
= Salesforce(
salesforce_client =salesforce_domain, session_id=access_token, version=62.0
instance_url
)= "https://login.salesforce.com/services/oauth2/userinfo"
user_info_endpoint = {"Authorization": f"Bearer {access_token}"}
headers
# grab user information for use with greeting and querying
= requests.get(user_info_endpoint, headers=headers)
response = response.json()
response_dict = response_dict["user_id"]
user_id = response_dict["name"]
name = f"Hello, {name}!"
greeting
st.write(greeting)
# build query for uncompleted, overdue tasks associated with opportunities in salesforce
= f"""
overdue_task_query SELECT
Id,
Subject,
ActivityDate,
Status,
OwnerId,
Priority,
WhatId,
What.Name
FROM Task
WHERE OwnerId = '{user_id}'
AND What.Type = 'Opportunity'
AND ActivityDate < TODAY
AND Status != 'Completed'
"""
# query salesforce for overdue tasks assigned to the logged-in user
= salesforce_client.query(overdue_task_query)
records
# add task data to a list
= []
tasks for task in records["records"]:
tasks.append(
{"Opportunity Name": task.get("What", {}).get("Name"),
"Task Subject": task.get("Subject"),
"Task Due Date": task.get("ActivityDate"),
"Task Status": task.get("Status"),
"Task Priority": task.get("Priority"),
"Task ID": task.get("Id"),
}
)
# convert the list of returned tasks to a dataframe with a Select column
= pd.DataFrame(tasks)
df "Select"] = False
df[
# if there are tasks in the dataframe then plot the data
if not df.empty:
= px.bar(
fig
df,="Task Priority",
x="Number of Tasks by Priority",
title={"Task Priority": "Priority", "count": "Number of Tasks"},
labels="Task Priority",
color=400,
height
)
# show a chart of overdue tasks by prority
st.plotly_chart(fig)
# show a table of overdue tasks
"Select a task to close it.")
st.write(= st.data_editor(
selected_rows
df,=True,
hide_index={"Select": st.column_config.CheckboxColumn()},
column_config
)
# create a list of selected task IDs
= selected_rows[selected_rows["Select"]]["Task ID"].tolist()
selected_task_ids
# close selected tasks when button is clicked
if st.button(f"Close {len(selected_task_ids)} Task(s)"):
if selected_task_ids:
= 0
success_count for task_id in selected_task_ids:
try:
= {"Status": "Completed"}
data
salesforce_client.Task.upsert(task_id, data)+= 1
success_count except Exception as e:
f"Failed to update tasks: {str(e)}")
st.error(if success_count > 0:
f"Closed {success_count} task(s)!")
st.success(# reload the graph and table on success
st.rerun()else:
"Hooray, you have no overdue tasks!") st.write(
Running the app locally
Terminal
export CONNECT_SERVER=<connect-server>
export CONNECT_API_KEY=<connect-api-key>
SALESFORCE_DOMAIN=<salesforce-domain> SALESFORCE_TOKEN=<salesforce-token> streamlit run app.py