Salesforce Integration with Python

Problem

The Sales department at Big Data Co. has recurring tasks in Salesforce for all of their open Opportunities. While representatives are excellent at interfacing with customers, they have a hard time managing their outreach within Salesforce. After overhearing numerous sighs coupled with the bell sound generated when Salesforce pops up an overdue tasks list, the team’s dedicated Data Scientist is empowered to create an app to help save the Sales folks some time and sanity.

Solution

The helpful Data Scientist publishes a Streamlit app that pulls in overdue Tasks assigned to the logged-in user, displaying a graph of Tasks by priority and a table that allows Sales folks to batch close them. Changes made in Salesforce automatically populate in the app and vice-versa.

requirements.txt
plotly.express==0.4.1
pandas==2.2.3
posit-sdk==0.5.0
simple_salesforce==1.12.6
streamlit==1.40.2
app.py
# -*- coding: utf-8 -*-
# mypy: ignore-errors

import os
import streamlit as st
import requests
import pandas as pd
import plotly.express as px
from posit import connect
from simple_salesforce import Salesforce


# initialize the Connect python sdk client if running on Connect
if os.getenv("RSTUDIO_PRODUCT") == "CONNECT":
    client = connect.Client()

    # grab the user session token from headers
    user_session_token = st.context.headers.get("Posit-Connect-User-Session-Token")

    try:
        # fetch the access token using the session token
        access_token = client.oauth.get_credentials(user_session_token).get(
            "access_token"
        )
    except connect.errors.ClientError as e:
        if e.error_code == 219:
            st.write(
                "Your Salesforce refresh token has expired. Please log out and log in to the integration."
            )
else:
    # supply the access token via environment variable when running locally
    access_token = os.getenv("SALESFORCE_TOKEN")

# salesforce domain obtained from environment variable
# ex: SALESFORCE_DOMAIN="https://example.my.salesforce.com"
salesforce_domain = os.getenv("SALESFORCE_DOMAIN")

# create client with domain, access token and API version supplied
salesforce_client = Salesforce(
    instance_url=salesforce_domain, session_id=access_token, version=62.0
)
user_info_endpoint = "https://login.salesforce.com/services/oauth2/userinfo"
headers = {"Authorization": f"Bearer {access_token}"}

# grab user information for use with greeting and querying
response = requests.get(user_info_endpoint, headers=headers)
response_dict = response.json()
user_id = response_dict["user_id"]
name = response_dict["name"]
greeting = f"Hello, {name}!"
st.write(greeting)

# build query for uncompleted, overdue tasks associated with opportunities in salesforce
overdue_task_query = f"""
SELECT
    Id,
    Subject,
    ActivityDate,
    Status,
    OwnerId,
    Priority,
    WhatId,
    What.Name
FROM Task
WHERE OwnerId = '{user_id}'
  AND What.Type = 'Opportunity'
  AND ActivityDate < TODAY
  AND Status != 'Completed'
"""

# query salesforce for overdue tasks assigned to the logged-in user
records = salesforce_client.query(overdue_task_query)

# add task data to a list
tasks = []
for task in records["records"]:
    tasks.append(
        {
            "Opportunity Name": task.get("What", {}).get("Name"),
            "Task Subject": task.get("Subject"),
            "Task Due Date": task.get("ActivityDate"),
            "Task Status": task.get("Status"),
            "Task Priority": task.get("Priority"),
            "Task ID": task.get("Id"),
        }
    )

# convert the list of returned tasks to a dataframe with a Select column
df = pd.DataFrame(tasks)
df["Select"] = False

# if there are tasks in the dataframe then plot the data
if not df.empty:
    fig = px.bar(
        df,
        x="Task Priority",
        title="Number of Tasks by Priority",
        labels={"Task Priority": "Priority", "count": "Number of Tasks"},
        color="Task Priority",
        height=400,
    )

    # show a chart of overdue tasks by prority
    st.plotly_chart(fig)

    # show a table of overdue tasks
    st.write("Select a task to close it.")
    selected_rows = st.data_editor(
        df,
        hide_index=True,
        column_config={"Select": st.column_config.CheckboxColumn()},
    )

    # create a list of selected task IDs
    selected_task_ids = selected_rows[selected_rows["Select"]]["Task ID"].tolist()

    # close selected tasks when button is clicked
    if st.button(f"Close {len(selected_task_ids)} Task(s)"):
        if selected_task_ids:
            success_count = 0
            for task_id in selected_task_ids:
                try:
                    data = {"Status": "Completed"}
                    salesforce_client.Task.upsert(task_id, data)
                    success_count += 1
                except Exception as e:
                    st.error(f"Failed to update tasks: {str(e)}")
        if success_count > 0:
            st.success(f"Closed {success_count} task(s)!")
            # reload the graph and table on success
            st.rerun()
else:
    st.write("Hooray, you have no overdue tasks!")

Running the app locally

Terminal
export CONNECT_SERVER=<connect-server>
export CONNECT_API_KEY=<connect-api-key>
SALESFORCE_DOMAIN=<salesforce-domain> SALESFORCE_TOKEN=<salesforce-token> streamlit run app.py