Skip to content

dev_cron.erl - Cron Scheduling Device

Overview

Purpose: Schedule messages for passive process self-calling
Module: dev_cron
Device: cron@1.0
Pattern: Schedule → Execute → Repeat (optional)

This device inserts new messages into the schedule, allowing processes to passively "call" themselves without user interaction. It supports both one-time and recurring message scheduling.

Dependencies

  • HyperBEAM: hb_ao, hb_message, hb_name, hb_tracer
  • Devices: dev_meta
  • Erlang/OTP: timer
  • Testing: eunit
  • Includes: include/hb.hrl

Public Functions Overview

%% Device Info
-spec info(Opts) -> DeviceInfo.
-spec info(Msg1, Msg2, Opts) -> {ok, InfoResponse}.
 
%% Scheduling Functions
-spec once(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}.
-spec every(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}.
-spec stop(Msg1, Msg2, Opts) -> {ok, Response} | {error, Reason}.

Public Functions

1. info/1, info/3

-spec info(Opts) -> DeviceInfo
    when
        Opts :: map(),
        DeviceInfo :: #{exports => [binary()]}.
 
-spec info(Msg1, Msg2, Opts) -> {ok, InfoResponse}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        InfoResponse :: #{
            <<"status">> => integer(),
            <<"body">> => map()
        }.

Description: Get device information including version, description, and available paths.

Test Code:
-module(dev_cron_info_test).
-include_lib("eunit/include/eunit.hrl").
 
%% Checks that info/1 returns a map with an `exports` entry whose list names
%% the once, every and stop operations.
info_exports_test() ->
    DeviceInfo = dev_cron:info(#{}),
    ?assert(maps:is_key(exports, DeviceInfo)),
    Exported = maps:get(exports, DeviceInfo),
    lists:foreach(
        fun(Operation) -> ?assert(lists:member(Operation, Exported)) end,
        [<<"once">>, <<"every">>, <<"stop">>]
    ).
 
%% Checks that info/3 replies with status 200 and a body carrying the
%% description, version and paths keys.
info_detailed_test() ->
    {ok, Reply} = dev_cron:info(#{}, #{}, #{}),
    ?assertEqual(200, maps:get(<<"status">>, Reply)),
    ReplyBody = maps:get(<<"body">>, Reply),
    lists:foreach(
        fun(Key) -> ?assert(maps:is_key(Key, ReplyBody)) end,
        [<<"description">>, <<"version">>, <<"paths">>]
    ).

2. once/3

-spec once(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        TaskID :: binary(),
        Reason :: binary().

Description: Schedule a one-time message execution. Spawns a worker process that executes the specified path once.

Required Parameters:
  • <<"cron-path">> - The path to execute
Returns:
  • {ok, TaskID} - Task scheduled successfully
  • {error, <<"No cron path found in message.">>} - Missing cron-path
Test Code:
-module(dev_cron_once_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
 
%% Schedules a one-time task and checks that a binary task ID is returned and
%% that a pid is registered under {<<"cron@1.0">>, TaskID}.
once_schedule_test() ->
    Request = #{
        <<"cron-path">> => <<"/test/path">>,
        <<"data">> => <<"test">>
    },
    {ok, TaskID} = dev_cron:once(#{}, Request, #{}),
    ?assert(is_binary(TaskID)),
    % The worker is found through the name system, keyed by its task ID.
    Worker = hb_name:lookup({<<"cron@1.0">>, TaskID}),
    ?assert(is_pid(Worker)).
 
%% once/3 must reject a request that carries no <<"cron-path">> key.
once_missing_path_test() ->
    ?assertEqual(
        {error, <<"No cron path found in message.">>},
        dev_cron:once(#{}, #{<<"data">> => <<"test">>}, #{})
    ).

3. every/3

-spec every(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        TaskID :: binary(),
        Reason :: binary().

Description: Schedule a recurring message execution. Spawns a worker process that executes the specified path repeatedly at the given interval.

Required Parameters:
  • <<"cron-path">> - The path to execute
  • <<"interval">> - Time between executions (format: {amount}-{unit})

Interval Format: "N-unit" where unit is:

  • millisecond[s] - Milliseconds
  • second[s] - Seconds
  • minute[s] - Minutes
  • hour[s] - Hours
  • day[s] - Days
Returns:
  • {ok, TaskID} - Task scheduled successfully
  • {error, Reason} - Scheduling failed
Test Code:
-module(dev_cron_every_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
 
%% Schedules a recurring task and checks that a binary task ID is returned and
%% that a pid is registered under {<<"cron@1.0">>, TaskID}.
%%
%% The recurring worker loops indefinitely, so the task is always stopped
%% before the test returns; otherwise it would keep firing every 500 ms for
%% the rest of the test run.
every_schedule_test() ->
    Msg2 = #{
        <<"cron-path">> => <<"/test/path">>,
        <<"interval">> => <<"500-milliseconds">>
    },
    {ok, TaskID} = dev_cron:every(#{}, Msg2, #{}),
    try
        ?assert(is_binary(TaskID)),
        Name = {<<"cron@1.0">>, TaskID},
        Pid = hb_name:lookup(Name),
        ?assert(is_pid(Pid))
    after
        % Best-effort cleanup: runs even when an assertion above fails, and
        % its result is deliberately not asserted on.
        dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{})
    end.
 
%% every/3 must reject a request that carries an interval but no
%% <<"cron-path">> key.
every_missing_path_test() ->
    ?assertEqual(
        {error, <<"No cron path found in message.">>},
        dev_cron:every(#{}, #{<<"interval">> => <<"1-second">>}, #{})
    ).
 
%% every/3 must reject a request that carries a cron path but no
%% <<"interval">> key.
every_missing_interval_test() ->
    ?assertEqual(
        {error, <<"No interval found in message.">>},
        dev_cron:every(#{}, #{<<"cron-path">> => <<"/test">>}, #{})
    ).
 
%% every/3 must answer with an error tuple when the interval does not follow
%% the "{amount}-{unit}" format. The reason itself is not pinned down here.
every_invalid_interval_test() ->
    BadRequest = #{
        <<"cron-path">> => <<"/test">>,
        <<"interval">> => <<"invalid">>
    },
    ?assertMatch({error, _}, dev_cron:every(#{}, BadRequest, #{})).

4. stop/3

-spec stop(Msg1, Msg2, Opts) -> {ok, Response} | {error, Reason}
    when
        Msg1 :: map(),
        Msg2 :: map(),
        Opts :: map(),
        Response :: #{
            <<"status">> => integer(),
            <<"body">> => map()
        },
        Reason :: binary().

Description: Stop a scheduled task by its task ID. Kills the worker process and unregisters the task.

Required Parameters:
  • <<"task">> - The task ID to stop
Returns:
  • {ok, Response} - Task stopped successfully (status 200)
  • {error, <<"No task ID found in message.">>} - Missing task ID
  • {error, <<"Task not found.">>} - Task doesn't exist
Test Code:
-module(dev_cron_stop_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
 
%% Schedules a one-time task, stops it by task ID, and checks that the stop
%% reply has status 200 and that the task name is no longer registered.
stop_once_task_test() ->
    % Schedule a one-time task so there is something to stop.
    ScheduleRequest = #{<<"cron-path">> => <<"/test">>},
    {ok, TaskID} = dev_cron:once(#{}, ScheduleRequest, #{}),

    % Stopping by task ID must report success.
    StopRequest = #{<<"task">> => TaskID},
    {ok, StopReply} = dev_cron:stop(#{}, StopRequest, #{}),
    ?assertEqual(200, maps:get(<<"status">>, StopReply)),

    % The name must be gone from the name system afterwards.
    ?assertEqual(undefined, hb_name:lookup({<<"cron@1.0">>, TaskID})).
 
%% Schedules a recurring task, confirms its worker is registered and alive,
%% stops it by task ID, and checks that the worker has terminated and that its
%% name is no longer registered.
%%
%% Termination is awaited through a process monitor rather than a fixed sleep:
%% exit(Pid, kill) is asynchronous, so a sleep is either too short (flaky) or
%% longer than needed (slow).
stop_every_task_test() ->
    % Create every task
    {ok, TaskID} = dev_cron:every(
        #{},
        #{
            <<"cron-path">> => <<"/test">>,
            <<"interval">> => <<"1-second">>
        },
        #{}
    ),

    % Verify running
    Name = {<<"cron@1.0">>, TaskID},
    Pid = hb_name:lookup(Name),
    ?assert(is_pid(Pid)),
    ?assert(erlang:is_process_alive(Pid)),

    % Monitor before stopping so the 'DOWN' notification cannot be missed.
    MonitorRef = erlang:monitor(process, Pid),

    % Stop it
    {ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}),

    % Wait for the worker to actually die, with an upper bound so a broken
    % stop/3 fails the test instead of hanging it.
    receive
        {'DOWN', MonitorRef, process, Pid, _Reason} -> ok
    after 1000 ->
        erlang:demonitor(MonitorRef, [flush]),
        erlang:error({worker_still_alive, Pid})
    end,

    % Verify stopped
    ?assertEqual(undefined, hb_name:lookup(Name)),
    ?assertNot(erlang:is_process_alive(Pid)).
 
%% stop/3 must report "Task not found." for a well-formed but unknown task ID.
stop_nonexistent_task_test() ->
    % A random 32-byte ID that no task was ever registered under.
    UnknownID = hb_util:human_id(crypto:strong_rand_bytes(32)),
    ?assertEqual(
        {error, <<"Task not found.">>},
        dev_cron:stop(#{}, #{<<"task">> => UnknownID}, #{})
    ).
 
%% stop/3 must reject a request that carries no <<"task">> key.
stop_missing_task_id_test() ->
    ?assertEqual(
        {error, <<"No task ID found in message.">>},
        dev_cron:stop(#{}, #{}, #{})
    ).

Internal Functions

once_worker/3

-spec once_worker(Path, Req, Opts) -> ok
    when
        Path :: binary(),
        Req :: map(),
        Opts :: map().

Description: Worker process for one-time execution. Calls the specified path via dev_meta:handle/2 then terminates.


every_worker_loop/4

-spec every_worker_loop(CronPath, Req, Opts, IntervalMillis) -> no_return()
    when
        CronPath :: binary(),
        Req :: map(),
        Opts :: map(),
        IntervalMillis :: integer().

Description: Worker loop for recurring execution. Executes path, sleeps for interval, then recurses indefinitely.

Loop:
  1. Execute path via dev_meta:handle/2
  2. Sleep for IntervalMillis
  3. Recurse (tail-call optimized)

parse_time/1

-spec parse_time(BinString) -> Milliseconds
    when
        BinString :: binary(),
        Milliseconds :: integer().

Description: Parse a time string into milliseconds.

Format: "{Amount}-{Unit}"

Examples:
parse_time(<<"100-milliseconds">>) → 100
parse_time(<<"5-seconds">>) → 5000
parse_time(<<"10-minutes">>) → 600000
parse_time(<<"2-hours">>) → 7200000
parse_time(<<"1-day">>) → 86400000

Note: This is an internal function that is not exported. It is tested indirectly through the every/3 function tests.


Common Patterns

%% Schedule one-time execution
{ok, TaskID} = dev_cron:once(
    #{},
    #{
        <<"cron-path">> => <<"/process/notify">>,
        <<"message">> => <<"Hello">>
    },
    #{}
).
 
%% Schedule recurring execution
{ok, TaskID} = dev_cron:every(
    #{},
    #{
        <<"cron-path">> => <<"/process/heartbeat">>,
        <<"interval">> => <<"30-seconds">>
    },
    #{}
).
 
%% Stop a task
{ok, Response} = dev_cron:stop(
    #{},
    #{<<"task">> => TaskID},
    #{}
).
 
%% Different interval formats
dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/path">>,
    <<"interval">> => <<"500-milliseconds">>
}, #{}).
 
dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/path">>,
    <<"interval">> => <<"5-minutes">>
}, #{}).
 
dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/path">>,
    <<"interval">> => <<"1-hour">>
}, #{}).
 
%% Via HTTP endpoints
GET /~cron@1.0/once?cron-path=/notify&data=test
GET /~cron@1.0/every?cron-path=/heartbeat&interval=1-minute
GET /~cron@1.0/stop?task={task-id}
GET /~cron@1.0/info

Task Management

Task Registration

Tasks are registered in the name system:

Name = {<<"cron@1.0">>, TaskID}
hb_name:register(Name, WorkerPid)

Task Lookup

Name = {<<"cron@1.0">>, TaskID},
Pid = hb_name:lookup(Name)

Task Termination

exit(Pid, kill),
hb_name:unregister(Name)

Execution Model

Once Task

Create → Spawn Worker → Execute Path → Terminate

Every Task

Create → Spawn Worker → Execute Path → Sleep → Execute Path → Sleep → ...

Error Handling

Both task types catch and log execution errors:

try
    dev_meta:handle(Opts, Req)
catch
    Class:Reason:Stacktrace ->
        ?event({cron_worker_error, {error, Class, Reason, Stacktrace}})
end

Important: Errors don't stop the every loop - it continues executing.


Tracing

Both task types start with tracing enabled:

TracePID = hb_tracer:start_trace(),
TracedOpts = Opts#{trace => TracePID}

This allows execution monitoring via the tracer system.


Use Cases

1. Periodic Health Checks

dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/process/health-check">>,
    <<"interval">> => <<"5-minutes">>
}, #{}).

2. Scheduled Notifications

dev_cron:once(#{}, #{
    <<"cron-path">> => <<"/notify/send">>,
    <<"recipient">> => <<"user123">>,
    <<"message">> => <<"Your task is ready">>
}, #{}).

3. Data Cleanup

dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/storage/cleanup">>,
    <<"interval">> => <<"1-day">>
}, #{}).

4. Heartbeat/Keep-Alive

dev_cron:every(#{}, #{
    <<"cron-path">> => <<"/connection/heartbeat">>,
    <<"interval">> => <<"30-seconds">>
}, #{}).

5. Delayed Execution

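% Execute once after 1 hour
% once/3 takes no delay parameter: as shown under Execution Model, its worker
% executes the path as soon as it is spawned. To delay the call, wait first
% (e.g. timer:sleep/1 in a separate process) and then call once/3.
% Using every/3 with an interval followed by stop/3 is an alternative, but note
% that every/3 also executes the path once before its first sleep.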

Event Monitoring

Events Emitted

% Execution events
?event({cron_once_worker_executed, {path, Path}})
?event({cron_every_worker_executing, {path, Path}})
?event({cron_every_worker_executed, {path, Path}})
 
% Error events
?event(cron_error, {cron_worker_error, {error, Class, Reason, Stack}})
 
% Stop events
?event({cron_stopping_task, {task_id, TaskID}, {pid, Pid}})
?event({cron_stop_lookup_error, {task_id, TaskID}, {error, Error}})

Interval Units

Supported time units (with plural support):

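| Unit           | Multiplier (ms) | Example          |
|----------------|-----------------|------------------|
| millisecond(s) | 1               | 100-milliseconds |
| second(s)      | 1,000           | 30-seconds       |
| minute(s)      | 60,000          | 5-minutes        |
| hour(s)        | 3,600,000       | 2-hours          |
| day(s)         | 86,400,000      | 1-day            |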

Note: Unit matching is prefix-based, so both singular and plural forms work.


References

  • Meta Device - dev_meta.erl
  • Name System - hb_name.erl
  • Tracer - hb_tracer.erl
  • AO Core - hb_ao.erl
  • Message System - hb_message.erl

Notes

  1. Passive Execution: Processes call themselves without external triggers
  2. Two Modes: One-time (once) and recurring (every)
  3. Task IDs: Based on request message ID
  4. Name Registration: Tasks registered in global name system
  5. Process-Based: Each task gets its own worker process
  6. Stoppable: All tasks can be stopped via task ID
  7. Error Resilient: Recurring (every) tasks keep running after an execution error
  8. Tracing Support: Built-in trace support for monitoring
  9. Flexible Intervals: Multiple time units supported
  10. No Persistence: Tasks don't survive node restarts
  11. Simple API: Three main operations (once, every, stop)
  12. Path-Based: Executes any device path
  13. Meta Integration: Uses dev_meta for execution
  14. Clean Termination: Unregisters names on stop
  15. HTTP-Friendly: Designed for HTTP API usage