Skip to content

Time Dimension Indicators

process_performance_indicators.indicators.time.instances

lead_time(event_log: pd.DataFrame, instance_id: str) -> pd.Timedelta

The total elapsed time of the activity instance, measured as the sum of the elapsed time between the start and complete events of the activity instance, and the elapsed time between the complete event of the activity instance that precedes the current activity instance, and the start event of the current activity instance.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
instance_id str

The instance id.

required
Source code in process_performance_indicators/indicators/time/instances.py
def lead_time(event_log: pd.DataFrame, instance_id: str) -> pd.Timedelta:
    """
    Compute the lead time of a single activity instance.

    The lead time is the instance's service time (elapsed time between its start and
    complete events) plus its waiting time (elapsed time between the complete event of
    the preceding activity instance and the start event of this one).

    Args:
        event_log: The event log.
        instance_id: The instance id.

    Returns:
        The sum of the service time and the waiting time of the instance.

    """
    time_in_service = service_time(event_log, instance_id)
    time_waiting = waiting_time(event_log, instance_id)
    return time_in_service + time_waiting

service_and_lead_time_ratio(event_log: pd.DataFrame, instance_id: str) -> float

The ratio between the elapsed time between the start and complete events of the activity instance, and the total elapsed time of the activity instance.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
instance_id str

The instance id.

required
Source code in process_performance_indicators/indicators/time/instances.py
def service_and_lead_time_ratio(event_log: pd.DataFrame, instance_id: str) -> float:
    """
    Compute the share of an activity instance's lead time that was spent in service.

    The numerator is the elapsed time between the start and complete events of the
    instance; the denominator is the total elapsed time (lead time) of the instance.
    The division itself is delegated to ``safe_division``.

    Args:
        event_log: The event log.
        instance_id: The instance id.

    Returns:
        The service time divided by the lead time of the instance.

    """
    numerator = service_time(event_log, instance_id)
    denominator = lead_time(event_log, instance_id)
    return safe_division(numerator, denominator)

service_time(event_log: pd.DataFrame, instance_id: str) -> pd.Timedelta

The elapsed time between the start and complete events of the activity instance.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
instance_id str

The instance id.

required
Source code in process_performance_indicators/indicators/time/instances.py
def service_time(event_log: pd.DataFrame, instance_id: str) -> pd.Timedelta:
    """
    Compute the service time of a single activity instance.

    Args:
        event_log: The event log.
        instance_id: The instance id.

    Returns:
        The complete time of the instance minus its start time, as reported by
        ``instances_utils.ctime`` and ``instances_utils.stime``.

    """
    return instances_utils.ctime(event_log, instance_id) - instances_utils.stime(event_log, instance_id)

waiting_time(event_log: pd.DataFrame, instance_id: str) -> pd.Timedelta

The elapsed time between the complete event of the activity instance that precedes the current activity instance, and the start event of the current activity instance.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
instance_id str

The instance id.

required
Source code in process_performance_indicators/indicators/time/instances.py
def waiting_time(event_log: pd.DataFrame, instance_id: str) -> pd.Timedelta:
    """
    Compute how long an activity instance waited before it started.

    The waiting time is the elapsed time between the complete event of an activity
    instance that precedes the current one and the start event of the current one.

    Args:
        event_log: The event log.
        instance_id: The instance id.

    Returns:
        The waiting time, or ``pd.Timedelta(0)`` when the instance has no preceding
        instances.

    """
    predecessors = instances_utils.prev_instances(event_log, instance_id)
    if predecessors:
        # Any single predecessor is used as the reference; the first one yielded is taken.
        predecessor_id = next(iter(predecessors))
        own_start = instances_utils.stime(event_log, instance_id)
        predecessor_complete = instances_utils.ctime(event_log, predecessor_id)
        return own_start - predecessor_complete

    # Nothing precedes this instance, so it did not wait.
    return pd.Timedelta(0)

process_performance_indicators.indicators.time.activities

lead_time(event_log: pd.DataFrame, activity_name: str) -> pd.Timedelta

The sum of total elapsed times of all instantiations of the activity.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
activity_name str

The name of the activity.

required
Source code in process_performance_indicators/indicators/time/activities.py
def lead_time(event_log: pd.DataFrame, activity_name: str) -> pd.Timedelta:
    """
    Compute the accumulated lead time of an activity.

    Args:
        event_log: The event log.
        activity_name: The name of the activity.

    Returns:
        The sum of the instance-level lead times of all instantiations of the activity;
        ``pd.Timedelta(0)`` when the activity has no instantiations.

    """
    per_instance_lead_times = (
        time_instances_indicators.lead_time(event_log, instance_id)
        for instance_id in activities_utils.inst(event_log, activity_name)
    )
    return sum(per_instance_lead_times, pd.Timedelta(0))

service_and_lead_time_ratio(event_log: pd.DataFrame, activity_name: str) -> float

The ratio between the sum of elapsed times between the start and complete events of all instantiations of the activity, and the sum of total elapsed times for all instantiations of the activity.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
activity_name str

The name of the activity.

required
Source code in process_performance_indicators/indicators/time/activities.py
def service_and_lead_time_ratio(event_log: pd.DataFrame, activity_name: str) -> float:
    """
    Compute the share of an activity's accumulated lead time that was spent in service.

    The numerator is the sum of elapsed times between the start and complete events of
    all instantiations of the activity; the denominator is the sum of total elapsed times
    of all instantiations of the activity. The division is delegated to ``safe_division``.

    Args:
        event_log: The event log.
        activity_name: The name of the activity.

    Returns:
        The activity's service time divided by its lead time.

    """
    numerator = service_time(event_log, activity_name)
    denominator = lead_time(event_log, activity_name)
    return safe_division(numerator, denominator)

service_time(event_log: pd.DataFrame, activity_name: str) -> pd.Timedelta

The sum of elapsed times between the start and complete events of all instantiations of the activity.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
activity_name str

The name of the activity.

required
Source code in process_performance_indicators/indicators/time/activities.py
def service_time(event_log: pd.DataFrame, activity_name: str) -> pd.Timedelta:
    """
    The sum of elapsed times between the start and complete events of all instantiations of the activity.

    Args:
        event_log: The event log.
        activity_name: The name of the activity.

    Returns:
        The accumulated service time of all instantiations of the activity;
        ``pd.Timedelta(0)`` when the activity has no instantiations.

    """
    # Accumulate Timedeltas directly (integer nanoseconds) rather than converting each
    # one to float minutes and back, which can lose precision on sub-minute or very
    # large durations. This also mirrors how lead_time and waiting_time accumulate.
    total_service_time: pd.Timedelta = pd.Timedelta(0)
    for instance_id in activities_utils.inst(event_log, activity_name):
        total_service_time += time_instances_indicators.service_time(event_log, instance_id)
    return total_service_time

waiting_time(event_log: pd.DataFrame, activity_name: str) -> pd.Timedelta

The sum of elapsed times between the complete events of the activity instances that precede every instantiation of the activity, and the start event of each instantiation.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
activity_name str

The name of the activity.

required
Source code in process_performance_indicators/indicators/time/activities.py
def waiting_time(event_log: pd.DataFrame, activity_name: str) -> pd.Timedelta:
    """
    Compute the accumulated waiting time of an activity.

    For every instantiation of the activity, the waiting time is the elapsed time between
    the complete event of the activity instance that precedes it and its own start event.

    Args:
        event_log: The event log.
        activity_name: The name of the activity.

    Returns:
        The sum of the instance-level waiting times of all instantiations of the activity;
        ``pd.Timedelta(0)`` when the activity has no instantiations.

    """
    per_instance_waits = (
        time_instances_indicators.waiting_time(event_log, instance_id)
        for instance_id in activities_utils.inst(event_log, activity_name)
    )
    return sum(per_instance_waits, pd.Timedelta(0))

process_performance_indicators.indicators.time.cases

active_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta

The difference between the total elapsed time of the case, and the sum of waiting times for every activity instance in the case where no other activity instance was being executed.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case ID.

required
Source code in process_performance_indicators/indicators/time/cases.py
def active_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta:
    """
    Compute the time during which the case was actively being worked on.

    This is the total elapsed time of the case minus its idle time, i.e. minus the sum of
    waiting times for every activity instance in the case where no other activity
    instance was being executed.

    Args:
        event_log: The event log.
        case_id: The case ID.

    Returns:
        The lead time of the case minus the idle time of the case.

    """
    total_elapsed = lead_time(event_log, case_id)
    total_idle = idle_time(event_log, case_id)
    return total_elapsed - total_idle

activity_count(event_log: pd.DataFrame, case_id: str) -> int

The number of activities that occur in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case ID.

required
Source code in process_performance_indicators/indicators/time/cases.py
def activity_count(event_log: pd.DataFrame, case_id: str) -> int:
    """
    Count the activities that occur in the case.

    Args:
        event_log: The event log.
        case_id: The case ID.

    Returns:
        The size of the collection returned by ``cases_utils.act`` for the case.

    """
    activities_in_case = cases_utils.act(event_log, case_id)
    return len(activities_in_case)

activity_instance_count(event_log: pd.DataFrame, case_id: str) -> int

The number of times that any activity has been instantiated in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case ID.

required
Source code in process_performance_indicators/indicators/time/cases.py
def activity_instance_count(event_log: pd.DataFrame, case_id: str) -> int:
    """
    Count how many times any activity has been instantiated in the case.

    Args:
        event_log: The event log.
        case_id: The case ID.

    Returns:
        The size of the collection returned by ``cases_utils.inst`` for the case.

    """
    instances_in_case = cases_utils.inst(event_log, case_id)
    return len(instances_in_case)

automated_activity_count(event_log: pd.DataFrame, case_id: str, automated_activities: set[str]) -> int

The number of automated activities that occur in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case ID.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/cases.py
def automated_activity_count(event_log: pd.DataFrame, case_id: str, automated_activities: set[str]) -> int:
    """
    Count the automated activities that occur in the case.

    Args:
        event_log: The event log.
        case_id: The case ID.
        automated_activities: The set of automated activities.

    Returns:
        The number of activities that are both in ``automated_activities`` and among the
        activities of the case.

    """
    activities_in_case = cases_utils.act(event_log, case_id)
    automated_in_case = automated_activities.intersection(activities_in_case)
    return len(automated_in_case)

automated_activity_instance_count(event_log: pd.DataFrame, case_id: str, automated_activities: set[str]) -> int

The number of times that an automated activity is instantiated in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/cases.py
def automated_activity_instance_count(event_log: pd.DataFrame, case_id: str, automated_activities: set[str]) -> int:
    """
    Count how many times an automated activity is instantiated in the case.

    Args:
        event_log: The event log.
        case_id: The case id.
        automated_activities: The set of automated activities.

    Returns:
        The number of distinct instances of the case whose activity belongs to
        ``automated_activities``.

    """
    # A set is used so that an instance id reported more than once is only counted once.
    automated_instances = {
        instance_id
        for instance_id in cases_utils.inst(event_log, case_id)
        if instances_utils.act(event_log, instance_id) in automated_activities
    }
    return len(automated_instances)

automated_activity_service_time(event_log: pd.DataFrame, case_id: str, automated_activities: set[str]) -> pd.Timedelta

The sum of elapsed times for all instantiations of automated activities in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case ID.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/cases.py
def automated_activity_service_time(
    event_log: pd.DataFrame, case_id: str, automated_activities: set[str]
) -> pd.Timedelta:
    """
    Compute the accumulated service time of the automated activity instances of the case.

    Args:
        event_log: The event log.
        case_id: The case ID.
        automated_activities: The set of automated activities.

    Returns:
        The sum of the instance-level service times of every instance of the case whose
        activity belongs to ``automated_activities``; ``pd.Timedelta(0)`` when there is none.

    """
    automated_service_times = (
        time_instances_indicators.service_time(event_log, instance_id)
        for instance_id in cases_utils.inst(event_log, case_id)
        if instances_utils.act(event_log, instance_id) in automated_activities
    )
    return sum(automated_service_times, pd.Timedelta(0))

handover_count(event_log: pd.DataFrame, case_id: str) -> float

The number of times that a human resource associated with an activity instance differs from the human resource associated with the preceding activity instance within the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case ID.

required
Source code in process_performance_indicators/indicators/time/cases.py
def handover_count(event_log: pd.DataFrame, case_id: str) -> float:
    """
    Count the handovers between human resources within the case.

    A handover is counted when the human resource associated with an activity instance
    differs from the human resource associated with the preceding activity instance.
    The per-instance contribution is whatever ``instances_utils.dres`` returns for it.

    Args:
        event_log: The event log.
        case_id: The case ID.

    Returns:
        The sum of ``instances_utils.dres`` over every instance of the case; ``0`` when
        the case has no instances.

    """
    return sum(instances_utils.dres(event_log, instance_id) for instance_id in cases_utils.inst(event_log, case_id))

idle_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta

The sum of waiting times for every activity instance in the case where no other activity instance was being executed.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
Source code in process_performance_indicators/indicators/time/cases.py
def idle_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta:
    """
    The sum of waiting times for every activity instance in the case
    where no other activity instance was being executed.

    Each instance that has at least one preceding instance contributes its waiting time
    divided by the size of ``instances_utils.concstr`` for that instance. Instances
    without a preceding instance are skipped.

    Args:
        event_log: The event log.
        case_id: The case id.

    Returns:
        The accumulated idle time; ``pd.Timedelta(0)`` when no instance contributes.

    Raises:
        IndicatorDivisionError: If at least one instance had a preceding instance but the
            division failed for every one of them; the last error caught is re-raised.

    """
    total_idle_time: pd.Timedelta = pd.Timedelta(0)
    # Bookkeeping used to tell "nothing to compute" apart from "every computation failed".
    eligible_instances = 0
    successful_calculations = 0
    last_error: IndicatorDivisionError | None = None

    for instance_id in cases_utils.inst(event_log, case_id):
        # Instances with no predecessor do not contribute and are not counted as eligible.
        if not instances_utils.prev_instances(event_log, instance_id):
            continue

        eligible_instances += 1
        try:
            # NOTE(review): presumably safe_division raises IndicatorDivisionError when the
            # divisor (the size of concstr) is zero; confirm against its implementation.
            total_idle_time += safe_division(
                time_instances_indicators.waiting_time(event_log, instance_id),
                len(instances_utils.concstr(event_log, instance_id)),
            )
            successful_calculations += 1
        except IndicatorDivisionError as e:
            # Best effort: remember the failure and keep going with the remaining instances.
            last_error = e
            continue

    # Only surface the error when no eligible instance could be computed at all;
    # partial results are returned silently.
    if eligible_instances > 0 and successful_calculations == 0 and last_error is not None:
        raise last_error

    return total_idle_time

lead_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta

The total elapsed time between the earliest and latest timestamps in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
Source code in process_performance_indicators/indicators/time/cases.py
def lead_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta:
    """
    Compute the total elapsed time of the case.

    Args:
        event_log: The event log.
        case_id: The case id.

    Returns:
        The end timestamp of the case minus its start timestamp, as reported by
        ``cases_utils.endt`` and ``cases_utils.startt``.

    """
    case_end = cases_utils.endt(event_log, case_id)
    case_start = cases_utils.startt(event_log, case_id)
    return case_end - case_start

lead_time_deviation_from_deadline(event_log: pd.DataFrame, case_id: str, deadline_margin: pd.Timedelta) -> pd.Timedelta

The difference between the time that the case is expected to take, and the actual elapsed time between its earliest and latest timestamps. As implemented (deadline margin minus lead time), positive values indicate that the case took less time than expected, and negative values indicate that it took more.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
deadline_margin Timedelta

The margin of error for the deadline.

required
Source code in process_performance_indicators/indicators/time/cases.py
def lead_time_deviation_from_deadline(
    event_log: pd.DataFrame, case_id: str, deadline_margin: pd.Timedelta
) -> pd.Timedelta:
    """
    Compute how far the lead time of the case is from the given deadline margin.

    The result is ``deadline_margin`` minus the elapsed time between the earliest and
    latest timestamps of the case. As implemented, the value is positive when the case
    took less time than ``deadline_margin`` and negative when it took more.

    NOTE(review): an earlier version of this docstring stated the opposite sign
    convention (negative meaning "took less time than expected"); confirm which one is
    intended before relying on the sign.

    Args:
        event_log: The event log.
        case_id: The case id.
        deadline_margin: The margin of error for the deadline.

    Returns:
        ``deadline_margin`` minus the lead time of the case.

    """
    # TODO: CHECK DEADLINE TYPE
    actual_lead_time = lead_time(event_log, case_id)
    return deadline_margin - actual_lead_time

lead_time_deviation_from_expectation(event_log: pd.DataFrame, case_id: str, expectation: pd.Timedelta) -> pd.Timedelta

The absolute value of the difference between the time that the case is expected to take, and the actual elapsed time between its earliest and latest timestamps.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
expectation Timedelta

The time delta the case is expected to take.

required
Source code in process_performance_indicators/indicators/time/cases.py
def lead_time_deviation_from_expectation(
    event_log: pd.DataFrame, case_id: str, expectation: pd.Timedelta
) -> pd.Timedelta:
    """
    Compute the absolute deviation of the case's lead time from an expected duration.

    Args:
        event_log: The event log.
        case_id: The case id.
        expectation: The time delta the case is expected to take.

    Returns:
        The absolute value of ``expectation`` minus the elapsed time between the
        earliest and latest timestamps of the case.

    """
    deviation = expectation - lead_time(event_log, case_id)
    return abs(deviation)

lead_time_from_activity_a(event_log: pd.DataFrame, case_id: str, activity_a: str) -> pd.Timedelta | None

The total elapsed time between the earliest instantiations of a specific activity, and the latest activity instance, in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
activity_a str

The specific activity name.

required
Source code in process_performance_indicators/indicators/time/cases.py
def lead_time_from_activity_a(event_log: pd.DataFrame, case_id: str, activity_a: str) -> pd.Timedelta | None:
    """
    Compute the elapsed time from the earliest instantiation of an activity to the end of the case.

    The span runs between the earliest instantiations of ``activity_a`` and the latest
    activity instance in the case.

    Args:
        event_log: The event log.
        case_id: The case id.
        activity_a: The specific activity name.

    Returns:
        The lead time between the two instances, or ``None`` when no earliest
        instantiation of ``activity_a`` is found in the case.

    """
    first_a_instances = cases_activities_utils.fi_s(event_log, case_id, activity_a)
    if not first_a_instances:
        return None

    last_case_instances = cases_utils.endin(event_log, case_id)

    # Each collection may hold several candidates; one representative of each is used.
    span_start = next(iter(first_a_instances))
    span_end = next(iter(last_case_instances))
    return instances_utils.lt(event_log, span_start, span_end)

lead_time_from_activity_a_to_b(event_log: pd.DataFrame, case_id: str, activity_a: str, activity_b: str) -> pd.Timedelta | None

The total elapsed time between the earliest instantiations of a specific activity, and the earliest instantiations of another specific activity that precedes the other, in the case. Here "activity a precedes activity b".

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
activity_a str

The specific activity name that precedes activity b.

required
activity_b str

The specific activity name that follows activity a.

required
Source code in process_performance_indicators/indicators/time/cases.py
def lead_time_from_activity_a_to_b(
    event_log: pd.DataFrame, case_id: str, activity_a: str, activity_b: str
) -> pd.Timedelta | None:
    """
    Compute the elapsed time from the earliest instantiation of activity a to that of activity b.

    The span runs between the earliest instantiations of ``activity_a`` and the earliest
    instantiations of ``activity_b`` that follow it in the case ("activity a precedes
    activity b").

    Args:
        event_log: The event log.
        case_id: The case id.
        activity_a: The specific activity name that precedes activity b.
        activity_b: The specific activity name that follows activity a.

    Returns:
        The lead time between the two instances, or ``None`` when
        ``cases_activities_utils.fi`` finds no instance for the pair of activities.

    """
    first_b_after_a = cases_activities_utils.fi(event_log, case_id, activity_a, activity_b)
    if not first_b_after_a:
        return None

    first_a_instances = cases_activities_utils.fi_s(event_log, case_id, activity_a)

    # Each collection may hold several candidates; one representative of each is used.
    span_start = next(iter(first_a_instances))
    span_end = next(iter(first_b_after_a))

    return instances_utils.lt(event_log, span_start, span_end)

lead_time_to_activity_a(event_log: pd.DataFrame, case_id: str, activity_a: str) -> pd.Timedelta | None

The total elapsed time between the earliest activity instance, and the earliest instantiations of a specific activity, in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
activity_a str

The specific activity name.

required
Source code in process_performance_indicators/indicators/time/cases.py
def lead_time_to_activity_a(event_log: pd.DataFrame, case_id: str, activity_a: str) -> pd.Timedelta | None:
    """
    Compute the elapsed time from the start of the case to the earliest instantiation of an activity.

    The span runs between the earliest activity instance in the case and the earliest
    instantiations of ``activity_a``.

    Args:
        event_log: The event log.
        case_id: The case id.
        activity_a: The specific activity name.

    Returns:
        The lead time between the two instances, or ``None`` when no earliest
        instantiation of ``activity_a`` is found in the case.

    """
    first_a_instances = cases_activities_utils.fi_c(event_log, case_id, activity_a)
    if not first_a_instances:
        return None

    first_case_instances = cases_utils.strin(event_log, case_id)

    # Each collection may hold several candidates; one representative of each is used.
    span_start = next(iter(first_case_instances))
    span_end = next(iter(first_a_instances))
    return instances_utils.lt(event_log, span_start, span_end)

service_and_lead_time_ratio(event_log: pd.DataFrame, case_id: str) -> float

The ratio between the sum of elapsed times between the start and complete events of all activity instances of the case, and the total elapsed time between the earliest and latest timestamps in the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
Source code in process_performance_indicators/indicators/time/cases.py
def service_and_lead_time_ratio(event_log: pd.DataFrame, case_id: str) -> float:
    """
    Compute the share of the case's lead time that was spent in service.

    The numerator is the sum of elapsed times between the start and complete events of
    all activity instances of the case; the denominator is the total elapsed time between
    the earliest and latest timestamps in the case. The division is delegated to
    ``safe_division``.

    Args:
        event_log: The event log.
        case_id: The case id.

    Returns:
        The case's service time divided by its lead time.

    """
    numerator = service_time(event_log, case_id)
    denominator = lead_time(event_log, case_id)
    return safe_division(numerator, denominator)

service_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta

The sum of elapsed times between the start and complete events of all activity instances of the case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
Source code in process_performance_indicators/indicators/time/cases.py
def service_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta:
    """
    The sum of elapsed times between the start and complete events of all activity instances of the case.

    Args:
        event_log: The event log.
        case_id: The case id.

    Returns:
        The accumulated service time of all activity instances of the case;
        ``pd.Timedelta(0)`` when the case has no instances.

    """
    # Accumulate Timedeltas directly (integer nanoseconds) rather than converting each
    # one to float minutes and back, which can lose precision on sub-minute or very
    # large durations. This also mirrors how waiting_time accumulates.
    total_service_time: pd.Timedelta = pd.Timedelta(0)
    for instance_id in cases_utils.inst(event_log, case_id):
        total_service_time += time_instances_indicators.service_time(event_log, instance_id)
    return total_service_time

service_time_from_activity_a_to_b(event_log: pd.DataFrame, case_id: str, activity_a: str, activity_b: str, time_aggregation_mode: Literal['s', 'c', 'sc', 'w']) -> pd.Timedelta | None

The sum of elapsed times between the start and complete events of all activity instances of the case, which occur between the earliest instantiations of a specific activity, and the earliest instantiations of another specific activity that precedes the other. Here "activity a precedes activity b".

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
activity_a str

The specific activity name that precedes activity b.

required
activity_b str

The specific activity name that follows activity a.

required
time_aggregation_mode Literal['s', 'c', 'sc', 'w']

The aggregation mode. "s": Considers activity instances that were started within the start and end activity instances. "c": Considers activity instances that were completed within the start and end activity instances. "sc": Considers activity instances that were either started or completed within the start and end activity instances. "w": Considers all activity instances that were active within the start and end activity instances.

required
Source code in process_performance_indicators/indicators/time/cases.py
def service_time_from_activity_a_to_b(
    event_log: pd.DataFrame,
    case_id: str,
    activity_a: str,
    activity_b: str,
    time_aggregation_mode: Literal["s", "c", "sc", "w"],
) -> pd.Timedelta | None:
    """
    Compute the accumulated service time between the earliest instantiations of two activities.

    The sum of elapsed times between the start and complete events of all activity
    instances of the case that occur between the earliest instantiations of
    ``activity_a`` and the earliest instantiations of ``activity_b`` that follow it
    ("activity a precedes activity b").

    Args:
        event_log: The event log.
        case_id: The case id.
        activity_a: The specific activity name that precedes activity b.
        activity_b: The specific activity name that follows activity a.
        time_aggregation_mode: The aggregation mode.
            "s": Considers activity instances that were started within the start and end activity instances.
            "c": Considers activity instances that were completed within the start and end activity instances.
            "sc": Considers activity instances that were either started or completed within the start and end activity instances.
            "w": Considers all activity instances that were active within the start and end activity instances.

    Returns:
        The aggregated service time computed by ``instances_utils.st``, or ``None`` when
        ``cases_activities_utils.fi`` finds no instance for the pair of activities.

    """
    first_b_after_a = cases_activities_utils.fi(event_log, case_id, activity_a, activity_b)
    if not first_b_after_a:
        return None

    first_a_instances = cases_activities_utils.fi_s(event_log, case_id, activity_a)

    # Each collection may hold several candidates; one representative of each is used.
    span_start = next(iter(first_a_instances))
    span_end = next(iter(first_b_after_a))

    return instances_utils.st(event_log, span_start, span_end, time_aggregation_mode)

waiting_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta

The sum, for every activity instance in the case, of the elapsed time between the complete event of the activity instance that precedes it, and its start event.

Source code in process_performance_indicators/indicators/time/cases.py
def waiting_time(event_log: pd.DataFrame, case_id: str) -> pd.Timedelta:
    """
    Compute the accumulated waiting time of the case.

    For every activity instance in the case, the waiting time is the elapsed time between
    the complete event of the activity instance that precedes it and its own start event.

    Args:
        event_log: The event log.
        case_id: The case id.

    Returns:
        The sum of the instance-level waiting times of all activity instances of the case;
        ``pd.Timedelta(0)`` when the case has no instances.

    """
    per_instance_waits = (
        time_instances_indicators.waiting_time(event_log, instance_id)
        for instance_id in cases_utils.inst(event_log, case_id)
    )
    return sum(per_instance_waits, pd.Timedelta(0))

waiting_time_from_activity_a_to_b(event_log: pd.DataFrame, case_id: str, activity_a: str, activity_b: str, time_aggregation_mode: Literal['s', 'c', 'sc', 'w']) -> pd.Timedelta | None

The sum, for every activity instance in the case that occurs between the earliest instantiations of a specific activity, and the earliest instantiations of another specific activity that precedes the other, of the elapsed time between the complete event of the activity instance that precedes it, and its start event.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_id str

The case id.

required
activity_a str

The specific activity name that precedes activity b.

required
activity_b str

The specific activity name that follows activity a.

required
time_aggregation_mode Literal['s', 'c', 'sc', 'w']

The aggregation mode. "s": Considers activity instances that were started within the start and end activity instances. "c": Considers activity instances that were completed within the start and end activity instances. "sc": Considers activity instances that were either started or completed within the start and end activity instances. "w": Considers all activity instances that were active within the start and end activity instances.

required
Source code in process_performance_indicators/indicators/time/cases.py
def waiting_time_from_activity_a_to_b(
    event_log: pd.DataFrame,
    case_id: str,
    activity_a: str,
    activity_b: str,
    time_aggregation_mode: Literal["s", "c", "sc", "w"],
) -> pd.Timedelta | None:
    """
    Waiting time accumulated in a case between the earliest instantiations of activity a
    and the earliest instantiations of activity b that follow it. For every activity
    instance in that window, the waiting time is the elapsed time between the complete
    event of the preceding activity instance and its own start event.

    Args:
        event_log: The event log.
        case_id: The case id.
        activity_a: The specific activity name that precedes activity b.
        activity_b: The specific activity name that follows activity a.
        time_aggregation_mode: The aggregation mode.
            "s": Considers activity instances that were started within the start and end activity instances.
            "c": Considers activity instances that were completed within the start and end activity instances.
            "sc": Considers activity instances that were either started or completed within the start and end activity instances.
            "w": Considers all activity instances that were active within the start and end activity instances.

    Returns:
        The value produced by `instances_utils.wt` for the selected pair of instances, or
        None when `cases_activities_utils.fi` returns nothing for the (a, b) pair.

    """
    first_b_after_a = cases_activities_utils.fi(event_log, case_id, activity_a, activity_b)
    if not first_b_after_a:
        # No instantiation of activity b found after activity a: the indicator is undefined.
        return None

    first_a = cases_activities_utils.fi_s(event_log, case_id, activity_a)

    # An arbitrary member of each collection is used as the window boundary.
    window_start = next(iter(first_a))
    window_end = next(iter(first_b_after_a))

    return instances_utils.wt(event_log, window_start, window_end, time_aggregation_mode)

process_performance_indicators.indicators.time.groups

activity_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> int

The number of activities that occur in the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def activity_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> int:
    """
    Count the distinct activities that occur across the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The size of the union of the activities reported by `cases_utils.act` for each case.

    """
    # An activity seen in several cases is counted once: the per-case results are merged into one set.
    distinct_activities = set().union(*(cases_utils.act(event_log, case_id) for case_id in case_ids))
    return len(distinct_activities)

activity_instance_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> int

The number of times that any activity has been instantiated in the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def activity_instance_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> int:
    """
    Count how many times any activity has been instantiated in the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The sum of the case-level activity instance counts; 0 for an empty group.

    """
    return sum(time_cases_indicators.activity_instance_count(event_log, case_id) for case_id in case_ids)

automated_activity_count(event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]) -> int

The number of automated activities that occur in the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/groups.py
def automated_activity_count(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]
) -> int:
    """
    Count the automated activities that occur in the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        automated_activities: The set of automated activities.

    Returns:
        The number of automated activities that also appear among the activities
        reported by `cases_utils.act` for at least one case of the group.

    """
    observed_activities = set().union(*(cases_utils.act(event_log, case_id) for case_id in case_ids))
    automated_and_observed = automated_activities.intersection(observed_activities)
    return len(automated_and_observed)

automated_activity_instance_count(event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]) -> int

The number of times that an automated activity is instantiated in the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/groups.py
def automated_activity_instance_count(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]
) -> int:
    """
    Count how many times an automated activity is instantiated in the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        automated_activities: The set of automated activities.

    Returns:
        The sum of the case-level automated activity instance counts; 0 for an empty group.

    """
    return sum(
        time_cases_indicators.automated_activity_instance_count(event_log, case_id, automated_activities)
        for case_id in case_ids
    )

automated_activity_service_time(event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]) -> pd.Timedelta

The sum of elapsed times for all instantiations of automated activities in the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/groups.py
def automated_activity_service_time(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]
) -> pd.Timedelta:
    """
    Total elapsed time of all instantiations of automated activities in the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        automated_activities: The set of automated activities.

    Returns:
        The sum of the case-level automated activity service times; a zero time delta
        for an empty group.

    """
    per_case_service_times = (
        time_cases_indicators.automated_activity_service_time(event_log, case_id, automated_activities)
        for case_id in case_ids
    )
    # Seed the sum with a zero Timedelta so the result is always a Timedelta.
    return sum(per_case_service_times, pd.Timedelta(0))

case_count_lead_time_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float

The ratio between the number of cases belonging to the group of cases, and the total elapsed time between the earliest and latest events in the group of cases. Returns cases per hour.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
Source code in process_performance_indicators/indicators/time/groups.py
def case_count_lead_time_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float:
    """
    Throughput of the group: number of cases divided by the group's lead time
    (the total elapsed time between the earliest and latest events in the group of cases).

    Args:
        event_log: The event log.
        case_ids: The case IDs.

    Returns:
        Cases per hour, computed through `safe_division`.

    """
    number_of_cases = general_groups_indicators.case_count(event_log, case_ids)
    # Dividing a Timedelta by one hour expresses the group lead time as a number of hours.
    one_hour = pd.Timedelta(hours=1)
    group_lead_time_in_hours = lead_time(event_log, case_ids) / one_hour
    return safe_division(number_of_cases, group_lead_time_in_hours)

case_count_where_lead_time_over_value(event_log: pd.DataFrame, case_ids: list[str] | set[str], lead_time_threshold: pd.Timedelta) -> int

The number of cases belonging to the group of cases whose total elapsed time between the earliest and latest events is greater than the given value.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
lead_time_threshold Timedelta

The threshold value as a time delta.

required
Source code in process_performance_indicators/indicators/time/groups.py
def case_count_where_lead_time_over_value(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], lead_time_threshold: pd.Timedelta
) -> int:
    """
    Count the cases of the group whose lead time (total elapsed time between the earliest
    and latest events) is strictly greater than the given value.

    Args:
        event_log: The event log.
        case_ids: The case ids.
        lead_time_threshold: The threshold value as a time delta.

    Returns:
        The number of distinct case ids whose lead time exceeds the threshold.

    """
    # Collected in a set so that a case id listed more than once is counted a single time.
    slow_cases = set()
    for case_id in case_ids:
        if time_cases_indicators.lead_time(event_log, case_id) > lead_time_threshold:
            slow_cases.add(case_id)
    return len(slow_cases)

case_percentage_where_lead_time_over_value(event_log: pd.DataFrame, case_ids: list[str] | set[str], lead_time_threshold: pd.Timedelta) -> float

The percentage of cases belonging to the group of cases whose total elapsed time between the earliest and latest events is greater than the given value.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
lead_time_threshold Timedelta

The threshold value as a time delta.

required
Source code in process_performance_indicators/indicators/time/groups.py
def case_percentage_where_lead_time_over_value(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], lead_time_threshold: pd.Timedelta
) -> float:
    """
    Share of the cases in the group whose lead time (total elapsed time between the
    earliest and latest events) is greater than the given value.

    Args:
        event_log: The event log.
        case_ids: The case ids.
        lead_time_threshold: The threshold value as a time delta.

    Returns:
        The count of cases over the threshold divided by the group's case count, computed
        through `safe_division`. No scaling by 100 is applied in this function.

    """
    return safe_division(
        case_count_where_lead_time_over_value(event_log, case_ids, lead_time_threshold),
        general_groups_indicators.case_count(event_log, case_ids),
    )

case_percentage_with_missed_deadline(event_log: pd.DataFrame, case_ids: list[str] | set[str], deadline: pd.Timestamp) -> float

The percentage of cases belonging to the group of cases whose latest event occurs after a given deadline.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
deadline Timestamp

The deadline value as a timestamp.

required
Source code in process_performance_indicators/indicators/time/groups.py
def case_percentage_with_missed_deadline(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], deadline: pd.Timestamp
) -> float:
    """
    Share of the cases in the group whose latest event occurs after a given deadline.

    Args:
        event_log: The event log.
        case_ids: The case ids.
        deadline: The deadline value as a timestamp.

    Returns:
        The number of distinct late cases divided by the group's case count, computed
        through `safe_division`. No scaling by 100 is applied in this function.

    """
    # Collected in a set so that a case id listed more than once is counted a single time.
    late_cases = set()
    for case_id in case_ids:
        if cases_utils.endt(event_log, case_id) > deadline:
            late_cases.add(case_id)

    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    return safe_division(len(late_cases), total_cases)

expected_active_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The difference between the total elapsed time of a case belonging to the group of cases, and the sum of waiting times for every activity instance in a case belonging to the group of cases where no other activity instance was being executed.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_active_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    Expected active time of a case belonging to the group of cases, where the active time
    of a case is the difference between its total elapsed time and the sum of waiting times
    for every activity instance where no other activity instance was being executed.

    Cases whose active time raises `IndicatorDivisionError` are left out of the average.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The summed active time divided by the number of cases that could be evaluated,
        or a zero time delta when no case was evaluated and no error was recorded.

    Raises:
        IndicatorDivisionError: The last error seen, when every case of a non-empty group failed.

    """
    accumulated_active_time: pd.Timedelta = pd.Timedelta(0)
    evaluated_cases = 0
    most_recent_failure: IndicatorDivisionError | None = None

    for case_id in case_ids:
        try:
            case_active_time = time_cases_indicators.active_time(event_log, case_id)
        except IndicatorDivisionError as error:  # noqa: PERF203
            # Remember the failure; it is only surfaced if no case at all succeeds.
            most_recent_failure = error
        else:
            accumulated_active_time += case_active_time
            evaluated_cases += 1

    nothing_evaluated = evaluated_cases == 0
    if len(case_ids) > 0 and nothing_evaluated and most_recent_failure is not None:
        raise most_recent_failure

    if nothing_evaluated:
        return pd.Timedelta(0)
    return safe_division(accumulated_active_time, evaluated_cases)

expected_activity_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float

The expected number of activities that occur in a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_activity_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float:
    """
    Expected number of activities that occur in a case belonging to the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The sum of the case-level activity counts divided by the group's case count,
        computed through `safe_division`.

    """
    total_activity_count = sum(time_cases_indicators.activity_count(event_log, case_id) for case_id in case_ids)
    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    return safe_division(total_activity_count, total_cases)

expected_activity_instance_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float

The expected number of times that any activity is instantiated in a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_activity_instance_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float:
    """
    Expected number of times that any activity is instantiated in a case belonging to the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The group's activity instance count divided by the group's case count,
        computed through `safe_division`.

    """
    return safe_division(
        activity_instance_count(event_log, case_ids),
        general_groups_indicators.case_count(event_log, case_ids),
    )

expected_automated_activity_count(event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]) -> float

The expected number of automated activities that occur in a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_automated_activity_count(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]
) -> float:
    """
    Expected number of automated activities that occur in a case belonging to the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        automated_activities: The set of automated activities.

    Returns:
        The sum of the case-level automated activity counts divided by the group's case
        count, computed through `safe_division`.

    """
    total_automated_count = sum(
        time_cases_indicators.automated_activity_count(event_log, case_id, automated_activities)
        for case_id in case_ids
    )
    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    return safe_division(total_automated_count, total_cases)

expected_automated_activity_instance_count(event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]) -> float

The expected number of times that an automated activity is instantiated in a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_automated_activity_instance_count(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]
) -> float:
    """
    Expected number of times that an automated activity is instantiated in a case
    belonging to the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        automated_activities: The set of automated activities.

    Returns:
        The group's automated activity instance count divided by the group's case count,
        computed through `safe_division`.

    """
    return safe_division(
        automated_activity_instance_count(event_log, case_ids, automated_activities),
        general_groups_indicators.case_count(event_log, case_ids),
    )

expected_automated_activity_service_time(event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]) -> pd.Timedelta

The expected sum of elapsed times for all instantiations of automated activities in a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
automated_activities set[str]

The set of automated activities.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_automated_activity_service_time(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], automated_activities: set[str]
) -> pd.Timedelta:
    """
    Expected sum of elapsed times for all instantiations of automated activities in a case
    belonging to the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        automated_activities: The set of automated activities.

    Returns:
        The group's automated activity service time divided by the group's case count,
        computed through `safe_division`.

    """
    group_service_time = automated_activity_service_time(event_log, case_ids, automated_activities)
    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    return safe_division(group_service_time, total_cases)

expected_handover_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float

The expected number of times that a human resource associated with an activity instance differs from the human resource associated with the preceding activity instance within a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_handover_count(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float:
    """
    Expected number of handovers in a case belonging to the group of cases. A handover is
    counted when the human resource associated with an activity instance differs from the
    human resource associated with the preceding activity instance.

    Args:
        event_log: The event log.
        case_ids: The case IDs.

    Returns:
        The sum of the case-level handover counts divided by the group's case count,
        computed through `safe_division`.

    """
    total_handovers = sum(time_cases_indicators.handover_count(event_log, case_id) for case_id in case_ids)
    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    return safe_division(total_handovers, total_cases)

expected_idle_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The expected sum of waiting times for every activity instance in a case belonging to the group of cases where no other activity instance was being executed.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_idle_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    Expected idle time of a case belonging to the group of cases, where the idle time of a
    case is the sum of waiting times for every activity instance where no other activity
    instance was being executed.

    Cases whose idle time raises `IndicatorDivisionError` are left out of the average.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The summed idle time divided by the number of cases that could be evaluated,
        or a zero time delta when no case was evaluated and no error was recorded.

    Raises:
        IndicatorDivisionError: The last error seen, when every case of a non-empty group failed.

    """
    accumulated_idle_time: pd.Timedelta = pd.Timedelta(0)
    evaluated_cases = 0
    most_recent_failure: IndicatorDivisionError | None = None

    for case_id in case_ids:
        try:
            case_idle_time = time_cases_indicators.idle_time(event_log, case_id)
        except IndicatorDivisionError as error:  # noqa: PERF203
            # Remember the failure; it is only surfaced if no case at all succeeds.
            most_recent_failure = error
        else:
            accumulated_idle_time += case_idle_time
            evaluated_cases += 1

    nothing_evaluated = evaluated_cases == 0
    if len(case_ids) > 0 and nothing_evaluated and most_recent_failure is not None:
        raise most_recent_failure

    if nothing_evaluated:
        return pd.Timedelta(0)
    return safe_division(accumulated_idle_time, evaluated_cases)

expected_lead_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The expected total elapsed time between the earliest and latest timestamps in a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_lead_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    Expected total elapsed time between the earliest and latest timestamps in a case
    belonging to the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The sum of the case-level lead times divided by the group's case count (through
        `safe_division`). The arithmetic is done on minute counts and converted back to
        a time delta.

    """
    one_minute = pd.Timedelta(minutes=1)

    # Accumulate as plain numbers of minutes so the division operates on numeric values.
    total_minutes = 0
    for case_id in case_ids:
        total_minutes += time_cases_indicators.lead_time(event_log, case_id) / one_minute

    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    mean_minutes = safe_division(total_minutes, total_cases)
    return pd.Timedelta(minutes=mean_minutes)

expected_lead_time_deviation_from_deadline(event_log: pd.DataFrame, case_ids: list[str] | set[str], deadline_margin: pd.Timedelta) -> pd.Timedelta

The difference between the time that a case in the group of cases is expected to take, and the actual elapsed time between its earliest and latest timestamps.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
deadline_margin Timedelta

The margin of error for the deadline.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_lead_time_deviation_from_deadline(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], deadline_margin: pd.Timedelta
) -> pd.Timedelta:
    """
    Expected deviation from the deadline for a case in the group of cases: the difference
    between the time that a case is expected to take, and the actual elapsed time between
    its earliest and latest timestamps.

    Args:
        event_log: The event log.
        case_ids: The case ids.
        deadline_margin: The margin of error for the deadline.

    Returns:
        The sum of the case-level deviations divided by the group's case count (through
        `safe_division`). The arithmetic is done on minute counts and converted back to
        a time delta.

    """
    # TODO: CHECK DEADLINE TYPE
    one_minute = pd.Timedelta(minutes=1)
    total_deviation_minutes = sum(
        time_cases_indicators.lead_time_deviation_from_deadline(event_log, case_id, deadline_margin) / one_minute
        for case_id in case_ids
    )

    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    mean_deviation_minutes = safe_division(total_deviation_minutes, total_cases)
    return pd.Timedelta(minutes=mean_deviation_minutes)

expected_lead_time_deviation_from_expectation(event_log: pd.DataFrame, case_ids: list[str] | set[str], expectation: pd.Timedelta) -> pd.Timedelta

The absolute value of the difference between the time that a case in the group of cases is expected to take, and the actual elapsed time between its earliest and latest timestamps.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
expectation Timedelta

The time delta the case is expected to take.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_lead_time_deviation_from_expectation(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], expectation: pd.Timedelta
) -> pd.Timedelta:
    """
    Expected deviation from the expectation for a case in the group of cases: the absolute
    value of the difference between the time that a case is expected to take, and the
    actual elapsed time between its earliest and latest timestamps.

    Args:
        event_log: The event log.
        case_ids: The case ids.
        expectation: The time delta the case is expected to take.

    Returns:
        The sum of the case-level deviations divided by the group's case count (through
        `safe_division`). The arithmetic is done on minute counts and converted back to
        a time delta.

    """
    one_minute = pd.Timedelta(minutes=1)
    total_deviation_minutes = sum(
        time_cases_indicators.lead_time_deviation_from_expectation(event_log, case_id, expectation) / one_minute
        for case_id in case_ids
    )

    total_cases = general_groups_indicators.case_count(event_log, case_ids)
    mean_deviation_minutes = safe_division(total_deviation_minutes, total_cases)
    return pd.Timedelta(minutes=mean_deviation_minutes)

expected_lead_time_from_activity_a(event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str) -> pd.Timedelta

The total elapsed time between the earliest instantiations of a specific activity, and the latest activity instance, that is expected for a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
activity_a str

The specific activity name.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_lead_time_from_activity_a(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str
) -> pd.Timedelta:
    """
    Expected total elapsed time, for a case belonging to the group of cases, between the
    earliest instantiations of a specific activity and the latest activity instance.

    Args:
        event_log: The event log.
        case_ids: The case ids.
        activity_a: The specific activity name.

    Returns:
        The sum of the case-level lead times that are not None, divided (through
        `safe_division`) by the number of distinct cases for which
        `cases_activities_utils.fi_s` returns a non-empty result.

    """
    per_case_lead_times = (
        time_cases_indicators.lead_time_from_activity_a(event_log, case_id, activity_a) for case_id in case_ids
    )
    # Cases for which the indicator is undefined (None) contribute nothing to the total.
    accumulated_lead_time = sum(
        (case_lead_time for case_lead_time in per_case_lead_times if case_lead_time is not None),
        pd.Timedelta(0),
    )

    # Only cases where the activity was instantiated enter the denominator.
    cases_with_activity = set()
    for case_id in case_ids:
        if cases_activities_utils.fi_s(event_log, case_id, activity_a):
            cases_with_activity.add(case_id)

    return safe_division(accumulated_lead_time, len(cases_with_activity))

expected_lead_time_from_activity_a_to_b(event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str, activity_b: str) -> pd.Timedelta

The total elapsed time between the earliest instantiations of a specific activity, and the earliest instantiations of another specific activity that precedes the other, that is expected for a case belonging to the group of cases. Here "activity a precedes activity b".

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
activity_a str

The specific activity name that precedes activity b.

required
activity_b str

The specific activity name that follows activity a.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_lead_time_from_activity_a_to_b(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str, activity_b: str
) -> pd.Timedelta:
    """
    Expected total elapsed time, for a case belonging to the group of cases, between the
    earliest instantiations of activity a and the earliest instantiations of activity b
    that follow it. Here "activity a precedes activity b".

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        activity_a: The specific activity name that precedes activity b.
        activity_b: The specific activity name that follows activity a.

    Returns:
        The sum of the case-level lead times that are not None, divided (through
        `safe_division`) by the number of distinct cases for which
        `cases_activities_utils.fi` returns a non-empty result.

    """
    per_case_lead_times = (
        time_cases_indicators.lead_time_from_activity_a_to_b(event_log, case_id, activity_a, activity_b)
        for case_id in case_ids
    )
    # Cases for which the indicator is undefined (None) contribute nothing to the total.
    accumulated_lead_time = sum(
        (case_lead_time for case_lead_time in per_case_lead_times if case_lead_time is not None),
        pd.Timedelta(0),
    )

    # Only cases where the (a, b) pair was found enter the denominator.
    cases_with_pair = set()
    for case_id in case_ids:
        if cases_activities_utils.fi(event_log, case_id, activity_a, activity_b):
            cases_with_pair.add(case_id)

    return safe_division(accumulated_lead_time, len(cases_with_pair))

expected_lead_time_to_activity_a(event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str) -> pd.Timedelta

The total elapsed time between the earliest activity instance, and the earliest instantiations of a specific activity, that is expected for a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
activity_a str

The specific activity name.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_lead_time_to_activity_a(
    event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str
) -> pd.Timedelta:
    """
    Expected total elapsed time, for a case belonging to the group of cases, between the
    earliest activity instance and the earliest instantiations of a specific activity.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        activity_a: The specific activity name.

    Returns:
        The sum of the case-level lead times that are not None, divided (through
        `safe_division`) by the number of distinct cases for which
        `cases_activities_utils.fi_c` returns a non-empty result.

    """
    per_case_lead_times = (
        time_cases_indicators.lead_time_to_activity_a(event_log, case_id, activity_a) for case_id in case_ids
    )
    # Cases for which the indicator is undefined (None) contribute nothing to the total.
    accumulated_lead_time = sum(
        (case_lead_time for case_lead_time in per_case_lead_times if case_lead_time is not None),
        pd.Timedelta(0),
    )

    # Only cases for which fi_c finds the activity enter the denominator.
    cases_with_activity = set()
    for case_id in case_ids:
        if cases_activities_utils.fi_c(event_log, case_id, activity_a):
            cases_with_activity.add(case_id)

    return safe_division(accumulated_lead_time, len(cases_with_activity))

expected_service_and_lead_time_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float

The ratio between the sum of elapsed times between the start and complete events of all activity instances of the group of cases, and the expected total elapsed time between the earliest and latest timestamps in a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_service_and_lead_time_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float:
    """
    The ratio between the sum of elapsed times between the start and complete events of all
    activity instances of the group of cases, and the expected total elapsed time between the
    earliest and latest timestamps in a case belonging to the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The result of ``safe_division`` applied to the group service time and the expected lead time.

    """
    group_service_time = service_time(event_log, case_ids)
    expected_group_lead_time = expected_lead_time(event_log, case_ids)
    return safe_division(group_service_time, expected_group_lead_time)

expected_service_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The expected sum of elapsed times between the start and complete events of all activity instances of a case belonging to the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_service_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    The expected sum of elapsed times between the start and complete events of all activity
    instances of a case belonging to the group of cases.

    The group service time is expressed as a number of minutes, divided by the case count
    through ``safe_division``, and the quotient is converted back into a ``pd.Timedelta``.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The per-case service time as a ``pd.Timedelta``.

    """
    one_minute = pd.Timedelta(minutes=1)
    total_minutes = service_time(event_log, case_ids) / one_minute
    group_size = general_groups_indicators.case_count(event_log, case_ids)
    minutes_per_case = safe_division(total_minutes, group_size)
    return pd.Timedelta(minutes=minutes_per_case)

expected_service_time_from_activity_a_to_b(event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str, activity_b: str, time_aggregation_mode: Literal['s', 'c', 'sc', 'w']) -> pd.Timedelta

The expected sum of elapsed times between the start and complete events of all activity instances of every case in the group of cases, which occur between the earliest instantiations of a specific activity, and the earliest instantiations of another specific activity that precedes the other, in each case. Here "activity a precedes activity b".

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
activity_a str

The specific activity name that precedes activity b.

required
activity_b str

The specific activity name that follows activity a.

required
time_aggregation_mode Literal['s', 'c', 'sc', 'w']

The aggregation mode. "s": Considers activity instances that were started within the start and end activity instances. "c": Considers activity instances that were completed within the start and end activity instances. "sc": Considers activity instances that were either started or completed within the start and end activity instances. "w": Considers all activity instances that were active within the start and end activity instances.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_service_time_from_activity_a_to_b(
    event_log: pd.DataFrame,
    case_ids: list[str] | set[str],
    activity_a: str,
    activity_b: str,
    time_aggregation_mode: Literal["s", "c", "sc", "w"],
) -> pd.Timedelta:
    """
    The expected sum of elapsed times between the start and complete events of all activity
    instances of every case in the group of cases, which occur between the earliest instantiation
    of activity a and the earliest instantiation of activity b, in each case.
    Here "activity a precedes activity b".

    The divisor is the number of distinct case ids for which ``cases_activities_utils.fi``
    returns a truthy value for the two activities.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        activity_a: The specific activity name that precedes activity b.
        activity_b: The specific activity name that follows activity a.
        time_aggregation_mode: The aggregation mode.
            "s": Considers activity instances that were started within the start and end activity instances.
            "c": Considers activity instances that were completed within the start and end activity instances.
            "sc": Considers activity instances that were either started or completed within the start and end activity instances.
            "w": Considers all activity instances that were active within the start and end activity instances.

    Returns:
        The result of ``safe_division`` applied to the group service time between the two
        activities and the case count.

    """
    # A set is used so that a case id listed more than once is counted a single time.
    qualifying_cases = set()
    for current_case in case_ids:
        if cases_activities_utils.fi(event_log, current_case, activity_a, activity_b):
            qualifying_cases.add(current_case)

    group_service_time = service_time_from_activity_a_to_b(
        event_log, case_ids, activity_a, activity_b, time_aggregation_mode
    )
    return safe_division(group_service_time, len(qualifying_cases))

expected_waiting_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The expected sum, for every activity instance in a case belonging to the group of cases, of the elapsed time between the complete event of the activity instance that precedes it, and its start event.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_waiting_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    The expected sum, for every activity instance in a case belonging to the group of cases, of the
    elapsed time between the complete event of the activity instance that precedes it, and its
    start event.

    Args:
        event_log: The event log.
        case_ids: The case IDs.

    Returns:
        The result of ``safe_division`` applied to the summed per-case waiting times and the
        case count of the group.

    """
    per_case_waiting_times = (time_cases_indicators.waiting_time(event_log, current_case) for current_case in case_ids)
    # Start from a zero Timedelta so an empty group still yields a Timedelta total.
    total_waiting_time = sum(per_case_waiting_times, pd.Timedelta(0))
    group_size = general_groups_indicators.case_count(event_log, case_ids)
    return safe_division(total_waiting_time, group_size)

expected_waiting_time_from_activity_a_to_b(event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str, activity_b: str, time_aggregation_mode: Literal['s', 'c', 'sc', 'w']) -> pd.Timedelta

The expected sum, for every activity instance in a case belonging to the group of cases, that occurs between the earliest instantiations of a specific activity, and the earliest instantiations of another specific activity that precedes the other, of the elapsed time between the complete event of the activity instance that precedes it, and its start event.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
activity_a str

The specific activity name that precedes activity b.

required
activity_b str

The specific activity name that follows activity a.

required
time_aggregation_mode Literal['s', 'c', 'sc', 'w']

The aggregation mode. "s": Considers activity instances that were started within the start and end activity instances. "c": Considers activity instances that were completed within the start and end activity instances. "sc": Considers activity instances that were either started or completed within the start and end activity instances. "w": Considers all activity instances that were active within the start and end activity instances.

required
Source code in process_performance_indicators/indicators/time/groups.py
def expected_waiting_time_from_activity_a_to_b(
    event_log: pd.DataFrame,
    case_ids: list[str] | set[str],
    activity_a: str,
    activity_b: str,
    time_aggregation_mode: Literal["s", "c", "sc", "w"],
) -> pd.Timedelta:
    """
    The expected sum, for every activity instance in a case belonging to the group of cases that
    occurs between the earliest instantiation of activity a and the earliest instantiation of
    activity b, of the elapsed time between the complete event of the activity instance that
    precedes it, and its start event. Here "activity a precedes activity b".

    Cases whose per-case waiting time is ``None`` contribute nothing to the accumulated total.
    The divisor is the number of distinct case ids for which ``cases_activities_utils.fi``
    returns a truthy value for the two activities.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        activity_a: The specific activity name that precedes activity b.
        activity_b: The specific activity name that follows activity a.
        time_aggregation_mode: The aggregation mode.
            "s": Considers activity instances that were started within the start and end activity instances.
            "c": Considers activity instances that were completed within the start and end activity instances.
            "sc": Considers activity instances that were either started or completed within the start and end activity instances.
            "w": Considers all activity instances that were active within the start and end activity instances.

    Returns:
        The result of ``safe_division`` applied to the accumulated waiting time and the case count.

    """
    accumulated_waiting_time: pd.Timedelta = pd.Timedelta(0)
    for current_case in case_ids:
        case_waiting_time = time_cases_indicators.waiting_time_from_activity_a_to_b(
            event_log, current_case, activity_a, activity_b, time_aggregation_mode
        )
        if case_waiting_time is None:
            continue
        accumulated_waiting_time = accumulated_waiting_time + case_waiting_time

    # A set is used so that a case id listed more than once is counted a single time.
    qualifying_cases = set()
    for current_case in case_ids:
        if cases_activities_utils.fi(event_log, current_case, activity_a, activity_b):
            qualifying_cases.add(current_case)

    return safe_division(accumulated_waiting_time, len(qualifying_cases))

lead_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The total elapsed time between the earliest and latest events in the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def lead_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    The total elapsed time between the earliest and latest events in the group of cases.

    Computed as the maximum of ``cases_utils.endt`` over the cases minus the minimum of
    ``cases_utils.startt`` over the cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The difference between the latest end time and the earliest start time of the group.

    """
    end_times = [cases_utils.endt(event_log, current_case) for current_case in case_ids]
    group_end = max(end_times)
    start_times = [cases_utils.startt(event_log, current_case) for current_case in case_ids]
    group_start = min(start_times)
    return group_end - group_start

lead_time_and_case_count_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The ratio between the total elapsed time between the earliest and latest events in the group of cases, and the number of cases belonging to the group of cases. Returns hours per case.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def lead_time_and_case_count_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    The ratio between the total elapsed time between the earliest and latest events in the group
    of cases, and the number of cases belonging to the group of cases.

    The group lead time is expressed as a number of hours, divided by the case count through
    ``safe_division``, and the resulting hours-per-case value is wrapped in a ``pd.Timedelta``.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The hours per case, as a ``pd.Timedelta``.

    """
    one_hour = pd.Timedelta(hours=1)
    total_hours = lead_time(event_log, case_ids) / one_hour
    group_size = general_groups_indicators.case_count(event_log, case_ids)
    return pd.Timedelta(hours=safe_division(total_hours, group_size))

service_and_lead_time_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float

The ratio between the sum of elapsed times between the start and complete events of all activity instances of the group of cases, and the total elapsed time between the earliest and latest events in the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def service_and_lead_time_ratio(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> float:
    """
    The ratio between the sum of elapsed times between the start and complete events of all
    activity instances of the group of cases, and the total elapsed time between the earliest
    and latest events in the group of cases.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The result of ``safe_division`` applied to the group service time and the group lead time.

    """
    group_service_time = service_time(event_log, case_ids)
    group_lead_time = lead_time(event_log, case_ids)
    return safe_division(group_service_time, group_lead_time)

service_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta

The sum of elapsed times between the start and complete events of all activity instances of the group of cases.

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case ids.

required
Source code in process_performance_indicators/indicators/time/groups.py
def service_time(event_log: pd.DataFrame, case_ids: list[str] | set[str]) -> pd.Timedelta:
    """
    The sum of elapsed times between the start and complete events of all activity
    instances of the group of cases.

    Each per-case service time is expressed as a number of minutes, the minutes are added up,
    and the total is converted back into a ``pd.Timedelta``.

    Args:
        event_log: The event log.
        case_ids: The case ids.

    Returns:
        The summed service time of the group as a ``pd.Timedelta``.

    """
    one_minute = pd.Timedelta(minutes=1)
    total_minutes = 0
    # Plain left-to-right accumulation; the order of additions determines the float result.
    for current_case in case_ids:
        case_service_time = time_cases_indicators.service_time(event_log, current_case)
        total_minutes = total_minutes + case_service_time / one_minute
    return pd.Timedelta(minutes=total_minutes)

service_time_from_activity_a_to_b(event_log: pd.DataFrame, case_ids: list[str] | set[str], activity_a: str, activity_b: str, time_aggregation_mode: Literal['s', 'c', 'sc', 'w']) -> pd.Timedelta

The sum of elapsed times between the start and complete events of all activity instances of every case in the group of cases, which occur between the earliest instantiations of a specific activity, and the earliest instantiations of another specific activity that precedes the other, in each case. Here "activity a precedes activity b".

Parameters:

Name Type Description Default
event_log DataFrame

The event log.

required
case_ids list[str] | set[str]

The case IDs.

required
activity_a str

The specific activity name that precedes activity b.

required
activity_b str

The specific activity name that follows activity a.

required
time_aggregation_mode Literal['s', 'c', 'sc', 'w']

The aggregation mode. "s": Considers activity instances that were started within the start and end activity instances. "c": Considers activity instances that were completed within the start and end activity instances. "sc": Considers activity instances that were either started or completed within the start and end activity instances. "w": Considers all activity instances that were active within the start and end activity instances.

required
Source code in process_performance_indicators/indicators/time/groups.py
def service_time_from_activity_a_to_b(
    event_log: pd.DataFrame,
    case_ids: list[str] | set[str],
    activity_a: str,
    activity_b: str,
    time_aggregation_mode: Literal["s", "c", "sc", "w"],
) -> pd.Timedelta:
    """
    The sum of elapsed times between the start and complete events of all activity instances of
    every case in the group of cases, which occur between the earliest instantiation of
    activity a and the earliest instantiation of activity b, in each case.
    Here "activity a precedes activity b".

    Cases whose per-case service time is ``None`` contribute nothing to the total.

    Args:
        event_log: The event log.
        case_ids: The case IDs.
        activity_a: The specific activity name that precedes activity b.
        activity_b: The specific activity name that follows activity a.
        time_aggregation_mode: The aggregation mode.
            "s": Considers activity instances that were started within the start and end activity instances.
            "c": Considers activity instances that were completed within the start and end activity instances.
            "sc": Considers activity instances that were either started or completed within the start and end activity instances.
            "w": Considers all activity instances that were active within the start and end activity instances.

    Returns:
        The accumulated service time across the cases; a zero ``pd.Timedelta`` when nothing was added.

    """
    accumulated_service_time: pd.Timedelta = pd.Timedelta(0)
    for current_case in case_ids:
        case_service_time = time_cases_indicators.service_time_from_activity_a_to_b(
            event_log, current_case, activity_a, activity_b, time_aggregation_mode
        )
        if case_service_time is None:
            continue
        accumulated_service_time = accumulated_service_time + case_service_time

    return accumulated_service_time