/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #pragma ident "%Z%%M% %I% %E% SMI" /* * Domain Services Module * * The Domain Services (DS) module is responsible for communication * with external service entities. It provides an API for clients to * publish capabilities and handles the low level communication and * version negotiation required to export those capabilites to any * interested service entity. Once a capability has been successfully * registered with a service entity, the DS module facilitates all * data transfers between the service entity and the client providing * that particular capability. */ #include #include #include #include #include #include #include #include #include #include #include /* * All DS ports in the system * * The list of DS ports is read in from the MD when the DS module is * initialized and is never modified. This eliminates the need for * locking to access the port array itself. Access to the individual * ports are synchronized at the port level. */ static ds_port_t ds_ports[DS_MAX_PORTS]; static ds_portset_t ds_allports; /* all DS ports in the system */ /* * Table of registered services * * Locking: Accesses to the table of services are sychronized using * a RW lock. The reader lock must be held when looking up service * information in the table. The writer lock must be held when any * service information is being modified. */ static struct ds_svcs { ds_svc_t **tbl; /* the table itself */ krwlock_t rwlock; /* table lock */ uint_t maxsvcs; /* size of the table */ uint_t nsvcs; /* current number of items */ } ds_svcs; /* initial size of the table */ #define DS_MAXSVCS_INIT 32 /* * Taskq for internal task processing */ static taskq_t *ds_taskq; static boolean_t ds_enabled; /* enable/disable taskq processing */ /* * The actual required number of parallel threads is not expected * to be very large. Use the maximum number of CPUs in the system * as a rough upper bound. */ #define DS_MAX_TASKQ_THR NCPU #define DS_DISPATCH(fn, arg) taskq_dispatch(ds_taskq, fn, arg, TQ_SLEEP) /* * Supported versions of the DS message protocol * * The version array must be sorted in order from the highest * supported version to the lowest. Support for a particular * . version implies all lower minor versions of * that same major version are supported as well. */ static ds_ver_t ds_vers[] = { { 1, 0 } }; #define DS_NUM_VER (sizeof (ds_vers) / sizeof (ds_vers[0])) /* * Results of checking version array with ds_vers_isvalid() */ typedef enum { DS_VERS_OK, DS_VERS_INCREASING_MAJOR_ERR, DS_VERS_INCREASING_MINOR_ERR } ds_vers_check_t; /* incoming message handling functions */ typedef void (*ds_msg_handler_t)(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len); static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len); /* * DS Message Handler Dispatch Table * * A table used to dispatch all incoming messages. This table * contains handlers for all the fixed message types, as well as * the the messages defined in the 1.0 version of the DS protocol. */ static const ds_msg_handler_t ds_msg_handlers[] = { ds_handle_init_req, /* DS_INIT_REQ */ ds_handle_init_ack, /* DS_INIT_ACK */ ds_handle_init_nack, /* DS_INIT_NACK */ ds_handle_reg_req, /* DS_REG_REQ */ ds_handle_reg_ack, /* DS_REG_ACK */ ds_handle_reg_nack, /* DS_REG_NACK */ ds_handle_unreg_req, /* DS_UNREG */ ds_handle_unreg_ack, /* DS_UNREG_ACK */ ds_handle_unreg_nack, /* DS_UNREG_NACK */ ds_handle_data, /* DS_DATA */ ds_handle_nack /* DS_NACK */ }; /* * DS message log * * Locking: The message log is protected by a single mutex. This * protects all fields in the log structure itself as well as * everything in the entry structures on both the log and the * free list. */ static struct log { ds_log_entry_t *head; /* head of the log */ ds_log_entry_t *freelist; /* head of the free list */ size_t size; /* size of the log in bytes */ uint32_t nentry; /* number of entries */ kmutex_t lock; /* log lock */ } ds_log; /* log soft limit */ uint_t ds_log_sz = DS_LOG_DEFAULT_SZ; /* initial pool of log entry structures */ static ds_log_entry_t ds_log_entry_pool[DS_LOG_NPOOL]; /* * Debugging Features */ #ifdef DEBUG #define DS_DBG_FLAG_LDC 0x1 #define DS_DBG_FLAG_LOG 0x2 #define DS_DBG_FLAG_ALL 0xf #define DS_DBG if (ds_debug) printf #define DS_DBG_LDC if (ds_debug & DS_DBG_FLAG_LDC) printf #define DS_DBG_LOG if (ds_debug & DS_DBG_FLAG_LOG) printf #define DS_DUMP_LDC_MSG(buf, len) ds_dump_ldc_msg(buf, len) uint_t ds_debug = 0; static void ds_dump_ldc_msg(void *buf, size_t len); #else /* DEBUG */ #define DS_DBG _NOTE(CONSTCOND) if (0) printf #define DS_DBG_LDC DS_DBG #define DS_DBG_LOG DS_DBG #define DS_DUMP_LDC_MSG(buf, len) #endif /* DEBUG */ /* initialization functions */ static void ds_init(void); static void ds_fini(void); static int ds_ports_init(void); static int ds_ports_fini(void); static int ds_ldc_init(ds_port_t *port); static int ds_ldc_fini(ds_port_t *port); /* event processing functions */ static uint_t ds_ldc_cb(uint64_t event, caddr_t arg); static void ds_dispatch_event(void *arg); static void ds_handle_ldc_event(ds_port_t *port, int newstate); static int ds_recv_msg(ldc_handle_t ldc_hdl, caddr_t msgp, size_t msglen); static void ds_handle_recv(void *arg); /* message sending functions */ static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen); static void ds_send_init_req(ds_port_t *port); static int ds_send_reg_req(ds_svc_t *svc); static int ds_send_unreg_req(ds_svc_t *svc); static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl); static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl); /* walker functions */ typedef int (*svc_cb_t)(ds_svc_t *svc, void *arg); static int ds_walk_svcs(svc_cb_t svc_cb, void *arg); static int ds_svc_isfree(ds_svc_t *svc, void *arg); static int ds_svc_ismatch(ds_svc_t *svc, void *arg); static int ds_svc_free(ds_svc_t *svc, void *arg); static int ds_svc_register(ds_svc_t *svc, void *arg); static int ds_svc_unregister(ds_svc_t *svc, void *arg); static int ds_svc_port_up(ds_svc_t *svc, void *arg); /* service utilities */ static ds_svc_t *ds_alloc_svc(void); static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port); static ds_svc_t *ds_get_svc(ds_svc_hdl_t hdl); /* port utilities */ static int ds_port_add(md_t *mdp, mde_cookie_t port, mde_cookie_t chan); static void ds_port_reset(ds_port_t *port); /* misc utilities */ static ds_vers_check_t ds_vers_isvalid(ds_ver_t *vers, int nvers); /* log functions */ static void ds_log_init(void); static void ds_log_fini(void); static int ds_log_add_msg(int32_t dest, uint8_t *msg, size_t sz); static int ds_log_remove(void); static void ds_log_purge(void *arg); static struct modlmisc modlmisc = { &mod_miscops, "Domain Services %I%" }; static struct modlinkage modlinkage = { MODREV_1, (void *)&modlmisc, NULL }; int _init(void) { int rv; /* * Perform all internal setup before initializing * the DS ports. This ensures that events can be * processed as soon as the port comes up. */ ds_init(); if ((rv = ds_ports_init()) != 0) { cmn_err(CE_WARN, "Domin Services initialization failed"); ds_fini(); return (rv); } if ((rv = mod_install(&modlinkage)) != 0) { (void) ds_ports_fini(); ds_fini(); } return (rv); } int _info(struct modinfo *modinfop) { return (mod_info(&modlinkage, modinfop)); } int _fini(void) { int rv; if ((rv = mod_remove(&modlinkage)) == 0) { (void) ds_ports_fini(); ds_fini(); } return (rv); } static void ds_init(void) { int tblsz; /* * Initialize table of registered service classes */ ds_svcs.maxsvcs = DS_MAXSVCS_INIT; tblsz = ds_svcs.maxsvcs * sizeof (ds_svc_t *); ds_svcs.tbl = kmem_zalloc(tblsz, KM_SLEEP); rw_init(&ds_svcs.rwlock, NULL, RW_DRIVER, NULL); ds_svcs.nsvcs = 0; /* * Initialize the message log. */ ds_log_init(); /* * Create taskq for internal processing threads. This * includes processing incoming request messages and * sending out of band registration messages. */ ds_taskq = taskq_create("ds_taskq", 1, minclsyspri, 1, DS_MAX_TASKQ_THR, TASKQ_PREPOPULATE | TASKQ_DYNAMIC); ds_enabled = B_TRUE; /* catch problems with the version array */ ASSERT(ds_vers_isvalid(ds_vers, DS_NUM_VER) == DS_VERS_OK); } static void ds_fini(void) { int idx; /* * Flip the enabled switch to make sure that no * incoming events get dispatched while things * are being torn down. */ ds_enabled = B_FALSE; /* * Destroy the taskq. */ taskq_destroy(ds_taskq); /* * Destroy the message log. */ ds_log_fini(); /* * Deallocate the table of registered services */ /* clear out all entries */ rw_enter(&ds_svcs.rwlock, RW_WRITER); idx = ds_walk_svcs(ds_svc_free, NULL); rw_exit(&ds_svcs.rwlock); /* should have gone through the whole table */ ASSERT(idx == ds_svcs.maxsvcs); /* destroy the table itself */ kmem_free(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *)); rw_destroy(&ds_svcs.rwlock); bzero(&ds_svcs, sizeof (ds_svcs)); } /* * Initialize the list of ports based on the MD. */ static int ds_ports_init(void) { int idx; int rv; md_t *mdp; int num_nodes; int listsz; mde_cookie_t rootnode; mde_cookie_t *portp = NULL; mde_cookie_t *chanp = NULL; int nport; int nchan; ds_port_t *port; if ((mdp = md_get_handle()) == NULL) { cmn_err(CE_WARN, "unable to initialize machine description"); return (-1); } num_nodes = md_node_count(mdp); ASSERT(num_nodes > 0); listsz = num_nodes * sizeof (mde_cookie_t); /* allocate temporary storage for MD scans */ portp = kmem_zalloc(listsz, KM_SLEEP); chanp = kmem_zalloc(listsz, KM_SLEEP); rootnode = md_root_node(mdp); ASSERT(rootnode != MDE_INVAL_ELEM_COOKIE); /* find all the DS ports in the MD */ nport = md_scan_dag(mdp, rootnode, md_find_name(mdp, DS_MD_PORT_NAME), md_find_name(mdp, "fwd"), portp); if (nport <= 0) { cmn_err(CE_NOTE, "No '%s' nodes in MD", DS_MD_PORT_NAME); rv = -1; goto done; } /* * Initialize all the ports found in the MD. */ for (idx = 0; idx < nport; idx++) { /* get the channels for this port */ nchan = md_scan_dag(mdp, portp[idx], md_find_name(mdp, DS_MD_CHAN_NAME), md_find_name(mdp, "fwd"), chanp); if (nchan <= 0) { cmn_err(CE_NOTE, "No '%s' node for DS port", DS_MD_CHAN_NAME); rv = -1; goto done; } /* expecting only one channel */ if (nchan != 1) { DS_DBG("expected 1 '%s' node for DS port, found %d\n", DS_MD_CHAN_NAME, nchan); } if (ds_port_add(mdp, portp[idx], chanp[0]) != 0) { rv = -1; goto done; } } /* * Initialize the LDC channel for each port. */ for (idx = 0; idx < DS_MAX_PORTS; idx++) { if (!DS_PORT_IN_SET(ds_allports, idx)) continue; port = &ds_ports[idx]; mutex_enter(&port->lock); if (ds_ldc_init(port)) { cmn_err(CE_WARN, "ds@%lx: ports_init: failed to " "initialize LDC %ld", port->id, port->ldc.id); } else { DS_DBG("ds@%lx: ports_init: initialization complete\n", port->id); } mutex_exit(&port->lock); } rv = 0; done: if (rv != 0) (void) ds_ports_fini(); kmem_free(portp, listsz); kmem_free(chanp, listsz); (void) md_fini_handle(mdp); return (rv); } static int ds_ports_fini(void) { int idx; ds_port_t *port; /* * Tear down each initialized port. */ for (idx = 0; idx < DS_MAX_PORTS; idx++) { if (!DS_PORT_IN_SET(ds_allports, idx)) continue; port = &ds_ports[idx]; mutex_enter(&port->lock); if (port->state >= DS_PORT_LDC_INIT) { /* shut down the LDC for this port */ (void) ds_ldc_fini(port); } port->state = DS_PORT_FREE; mutex_exit(&port->lock); /* clean up the port structure */ mutex_destroy(&port->lock); DS_PORTSET_DEL(ds_allports, idx); } return (0); } static int ds_ldc_init(ds_port_t *port) { int rv; ldc_attr_t ldc_attr; caddr_t cb_arg = (caddr_t)port; ASSERT(MUTEX_HELD(&port->lock)); DS_DBG("ds@%lx: ldc_init: ldc_id=%ld\n", port->id, port->ldc.id); ldc_attr.devclass = LDC_DEV_GENERIC; ldc_attr.instance = 0; ldc_attr.mode = LDC_MODE_STREAM; ldc_attr.mtu = DS_STREAM_MTU; if ((rv = ldc_init(port->ldc.id, &ldc_attr, &port->ldc.hdl)) != 0) { cmn_err(CE_WARN, "ds@%lx: ldc_init: ldc_init error (%d)", port->id, rv); goto done; } /* register the LDC callback */ if ((rv = ldc_reg_callback(port->ldc.hdl, ds_ldc_cb, cb_arg)) != 0) { cmn_err(CE_WARN, "ds@%lx: dc_init: ldc_reg_callback error " "(%d)", port->id, rv); goto done; } if ((rv = ldc_open(port->ldc.hdl)) != 0) { cmn_err(CE_WARN, "ds@%lx: ldc_init: ldc_open error (%d)", port->id, rv); goto done; } (void) ldc_up(port->ldc.hdl); (void) ldc_status(port->ldc.hdl, &port->ldc.state); DS_DBG_LDC("ds@%lx: ldc_init: initial LDC state 0x%x\n", port->id, port->ldc.state); port->state = DS_PORT_LDC_INIT; /* if port is up, send init message */ if (port->ldc.state == LDC_UP) { ds_send_init_req(port); } done: return (rv); } static int ds_ldc_fini(ds_port_t *port) { int rv; ASSERT(port->state >= DS_PORT_LDC_INIT); DS_DBG("ds@%lx: ldc_fini: ldc_id=%ld\n", port->id, port->ldc.id); if ((rv = ldc_close(port->ldc.hdl)) != 0) { cmn_err(CE_WARN, "ds@%lx: ldc_fini: ldc_close error (%d)", port->id, rv); return (rv); } if ((rv = ldc_unreg_callback(port->ldc.hdl)) != 0) { cmn_err(CE_WARN, "ds@%lx: ldc_fini: ldc_unreg_callback error " "(%d)", port->id, rv); return (rv); } if ((rv = ldc_fini(port->ldc.hdl)) != 0) { cmn_err(CE_WARN, "ds@%lx: ldc_fini: ldc_fini error (%d)", port->id, rv); return (rv); } return (rv); } /* * A DS event consists of a buffer on a port. */ typedef struct ds_event { ds_port_t *port; char *buf; size_t buflen; } ds_event_t; static uint_t ds_ldc_cb(uint64_t event, caddr_t arg) { ldc_status_t ldc_state; int rv; ds_port_t *port = (ds_port_t *)arg; ldc_handle_t ldc_hdl; DS_DBG("ds@%lx: ds_ldc_cb...\n", port->id); if (!ds_enabled) { DS_DBG("ds@%lx: callback handling is disabled\n", port->id); return (LDC_SUCCESS); } ldc_hdl = port->ldc.hdl; /* * Check the LDC event. */ if (event & LDC_EVT_READ) { /* dispatch a thread to handle the read event */ if (DS_DISPATCH(ds_handle_recv, port) == NULL) { cmn_err(CE_WARN, "error initiating event handler"); } return (LDC_SUCCESS); } /* only check status if not a read event */ if ((rv = ldc_status(ldc_hdl, &ldc_state)) != 0) { DS_DBG("ds@%lx: ldc_status error (%d)", port->id, rv); return (LDC_SUCCESS); } if (event & LDC_EVT_DOWN || event & LDC_EVT_UP) { mutex_enter(&port->lock); ds_handle_ldc_event(port, ldc_state); mutex_exit(&port->lock); return (LDC_SUCCESS); } if (event & LDC_EVT_RESET || event & LDC_EVT_WRITE) { DS_DBG("ds@%lx: LDC event (%lx) received", port->id, event); return (LDC_SUCCESS); } cmn_err(CE_NOTE, "ds@%lx: Unexpected LDC event (%lx) received", port->id, event); return (LDC_SUCCESS); } static int ds_recv_msg(ldc_handle_t ldc_hdl, caddr_t msgp, size_t msglen) { int rv = 0; size_t amt_left = msglen; int loopcnt = 0; while (msglen > 0) { if ((rv = ldc_read(ldc_hdl, msgp, &amt_left)) != 0) { if ((rv == EAGAIN) && (loopcnt++ < 1000)) { /* * Try again, but don't try for more than * one second. Something is wrong with * the channel. */ delay(drv_usectohz(10000)); /* 1/1000 sec */ } else { /* fail */ return (rv); } } else { msgp += amt_left; msglen -= amt_left; amt_left = msglen; } } /* while (msglen > 0) */ return (rv); } static void ds_handle_recv(void *arg) { ds_port_t *port = (ds_port_t *)arg; char *hbuf; size_t len; size_t read_size; boolean_t hasdata; ds_hdr_t hdr; uint8_t *msg; char *currp; int rv; ldc_handle_t ldc_hdl; ds_event_t *devent; DS_DBG("ds@%lx: ds_ldc_cb...\n", port->id); ldc_hdl = port->ldc.hdl; mutex_enter(&port->lock); while ((ldc_chkq(ldc_hdl, &hasdata) == 0) && hasdata) { DS_DBG("ds@%lx: reading next message\n", port->id); /* * Read in the next message. */ hbuf = (char *)&hdr; bzero(hbuf, DS_HDR_SZ); read_size = DS_HDR_SZ; currp = hbuf; /* read in the message header */ if ((rv = ds_recv_msg(ldc_hdl, currp, read_size)) != 0) { /* * failed to read message drop it and see if there * are anymore messages */ cmn_err(CE_NOTE, "ldc_read returned %d", rv); continue; } len = read_size; /* get payload size and alloc a buffer */ read_size = ((ds_hdr_t *)hbuf)->payload_len; msg = kmem_zalloc((DS_HDR_SZ + read_size), KM_SLEEP); /* move message header into buffer */ bcopy(hbuf, msg, DS_HDR_SZ); currp = (char *)(msg) + DS_HDR_SZ; /* read in the message body */ if ((rv = ds_recv_msg(ldc_hdl, currp, read_size)) != 0) { /* * failed to read message drop it and see if there * are anymore messages */ kmem_free(msg, (DS_HDR_SZ + read_size)); cmn_err(CE_NOTE, "ldc_read returned %d", rv); continue; } len += read_size; DS_DUMP_LDC_MSG(msg, len); /* * Send the message for processing, and store it * in the log. The memory is deallocated only when * the message is removed from the log. */ devent = kmem_zalloc(sizeof (ds_event_t), KM_SLEEP); devent->port = port; devent->buf = (char *)msg; devent->buflen = len; /* log the message */ (void) ds_log_add_msg(DS_LOG_IN(port->id), msg, len); /* send the message off to get processed in a new thread */ if (DS_DISPATCH(ds_dispatch_event, devent) == NULL) { cmn_err(CE_WARN, "error initiating event handler"); continue; } } mutex_exit(&port->lock); } static void ds_dispatch_event(void *arg) { ds_event_t *event = (ds_event_t *)arg; ds_hdr_t *hdr; ds_port_t *port; port = event->port; hdr = (ds_hdr_t *)event->buf; if (!DS_MSG_TYPE_VALID(hdr->msg_type)) { cmn_err(CE_NOTE, "ds@%lx: dispatch_event: invalid msg " "type (%d)", port->id, hdr->msg_type); return; } DS_DBG("ds@%lx: dispatch_event: msg_type=%d\n", port->id, hdr->msg_type); (*ds_msg_handlers[hdr->msg_type])(port, event->buf, event->buflen); kmem_free(event, sizeof (ds_event_t)); } static void ds_handle_ldc_event(ds_port_t *port, int newstate) { ldc_status_t oldstate = port->ldc.state; ASSERT(MUTEX_HELD(&port->lock)); DS_DBG_LDC("ds@%lx: LDC state change: 0x%x -> 0x%x\n", port->id, oldstate, newstate); switch (newstate) { case LDC_UP: if ((oldstate == LDC_OPEN) || (oldstate == LDC_READY)) { /* start the version negotiation */ ds_send_init_req(port); } else { DS_DBG_LDC("unsupported LDC state change\n"); } break; case LDC_READY: case LDC_OPEN: if (oldstate != LDC_UP) { /* not worried about this state change */ break; } _NOTE(FALLTHROUGH) default: if (oldstate == LDC_UP) { ds_port_reset(port); } else { DS_DBG_LDC("unsupported LDC state change\n"); } break; } port->ldc.state = newstate; } /* * Version negotiation is always initiated by the guest. Any * attempt by a remote party to initiate the handshake gets * nack'd with a major number equal to zero. This indicates * that no version is supported since an init request is not * expected. */ static void ds_handle_init_req(ds_port_t *port, caddr_t buf, size_t len) { ds_hdr_t *hdr; ds_init_nack_t *nack; char *msg; size_t msglen; ds_init_req_t *req; size_t explen = DS_MSG_LEN(ds_init_req_t); req = (ds_init_req_t *)(buf + DS_HDR_SZ); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); } else { DS_DBG("ds@%lx: id, req->major_vers, req->minor_vers); } DS_DBG("ds@%lx: init_nack>: major=0\n", port->id); msglen = DS_MSG_LEN(ds_init_nack_t); msg = kmem_zalloc(msglen, KM_SLEEP); hdr = (ds_hdr_t *)msg; hdr->msg_type = DS_INIT_NACK; hdr->payload_len = sizeof (ds_init_nack_t); nack = (ds_init_nack_t *)(msg + DS_HDR_SZ); nack->major_vers = 0; /* send message */ mutex_enter(&port->lock); (void) ds_send_msg(port, msg, msglen); mutex_exit(&port->lock); } static void ds_handle_init_ack(ds_port_t *port, caddr_t buf, size_t len) { ds_init_ack_t *ack; ds_ver_t *ver; size_t explen = DS_MSG_LEN(ds_init_ack_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } ack = (ds_init_ack_t *)(buf + DS_HDR_SZ); mutex_enter(&port->lock); if (port->state != DS_PORT_INIT_REQ) { cmn_err(CE_NOTE, "ds@%lx: id, port->state); mutex_exit(&port->lock); return; } ver = &(ds_vers[port->ver_idx]); DS_DBG("ds@%lx: id, ver->major, ver->minor, ver->major, ack->minor_vers); /* agreed upon a major version */ port->ver.major = ver->major; /* * If the returned minor version is larger than * the requested minor version, use the lower of * the two, i.e. the requested version. */ if (ack->minor_vers >= ver->minor) { /* * Use the minor version specified in the * original request. */ port->ver.minor = ver->minor; } else { /* * Use the lower minor version returned in * the ack. By defninition, all lower minor * versions must be supported. */ port->ver.minor = ack->minor_vers; } port->state = DS_PORT_READY; DS_DBG("ds@%lx: id, port->ver.major, port->ver.minor); mutex_exit(&port->lock); /* * The port came up, so update all the services * with this information. Follow that up with an * attempt to register any service that is not * already registered. */ rw_enter(&ds_svcs.rwlock, RW_WRITER); (void) ds_walk_svcs(ds_svc_port_up, port); (void) ds_walk_svcs(ds_svc_register, NULL); rw_exit(&ds_svcs.rwlock); } static void ds_handle_init_nack(ds_port_t *port, caddr_t buf, size_t len) { int idx; ds_init_nack_t *nack; ds_ver_t *ver; size_t explen = DS_MSG_LEN(ds_init_nack_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } nack = (ds_init_nack_t *)(buf + DS_HDR_SZ); mutex_enter(&port->lock); if (port->state != DS_PORT_INIT_REQ) { cmn_err(CE_NOTE, "ds@%lx: id, port->state); mutex_exit(&port->lock); return; } ver = &(ds_vers[port->ver_idx]); DS_DBG("ds@%lx: id, ver->major, ver->minor, nack->major_vers); if (nack->major_vers == 0) { /* no supported protocol version */ cmn_err(CE_NOTE, "ds@%lx: id); mutex_exit(&port->lock); return; } /* * Walk the version list, looking for a major version * that is as close to the requested major version as * possible. */ for (idx = port->ver_idx; idx < DS_NUM_VER; idx++) { if (ds_vers[idx].major <= nack->major_vers) { /* found a version to try */ goto done; } } if (idx == DS_NUM_VER) { /* no supported version */ cmn_err(CE_NOTE, "ds@%lx: id, nack->major_vers); mutex_exit(&port->lock); return; } done: /* start the handshake again */ port->ver_idx = idx; port->state = DS_PORT_LDC_INIT; ds_send_init_req(port); mutex_exit(&port->lock); } static void ds_handle_reg_req(ds_port_t *port, caddr_t buf, size_t len) { ds_hdr_t *hdr; ds_reg_req_t *req; ds_reg_nack_t *nack; char *msg; size_t msglen; size_t explen = DS_MSG_LEN(ds_reg_req_t); /* the request information */ req = (ds_reg_req_t *)(buf + DS_HDR_SZ); /* sanity check the incoming message */ if (len < explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); } else { DS_DBG("ds@%lx: id, req->svc_id, req->major_vers, req->minor_vers, req->svc_handle); } DS_DBG("ds@%lx: reg_nack>: major=0\n", port->id); msglen = DS_MSG_LEN(ds_reg_nack_t); msg = kmem_zalloc(msglen, KM_SLEEP); hdr = (ds_hdr_t *)msg; hdr->msg_type = DS_REG_NACK; hdr->payload_len = sizeof (ds_reg_nack_t); nack = (ds_reg_nack_t *)(msg + DS_HDR_SZ); nack->svc_handle = req->svc_handle; nack->result = DS_REG_VER_NACK; nack->major_vers = 0; /* send message */ mutex_enter(&port->lock); (void) ds_send_msg(port, msg, msglen); mutex_exit(&port->lock); } static void ds_handle_reg_ack(ds_port_t *port, caddr_t buf, size_t len) { ds_reg_ack_t *ack; ds_ver_t *ver; ds_ver_t tmpver; ds_svc_t *svc; size_t explen = DS_MSG_LEN(ds_reg_ack_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } ack = (ds_reg_ack_t *)(buf + DS_HDR_SZ); rw_enter(&ds_svcs.rwlock, RW_READER); /* lookup appropriate client */ if ((svc = ds_get_svc(ack->svc_handle)) == NULL) { cmn_err(CE_NOTE, "ds@%lx: id, ack->svc_handle); goto done; } /* make sure the message makes sense */ if (svc->state != DS_SVC_REG_PENDING) { cmn_err(CE_NOTE, "ds@%lx: id, svc->state); goto done; } ver = &(svc->cap.vers[svc->ver_idx]); DS_DBG("ds@%lx: id, ack->svc_handle, ver->major, ack->minor_vers); /* major version has been agreed upon */ svc->ver.major = ver->major; if (ack->minor_vers >= ver->minor) { /* * Use the minor version specified in the * original request. */ svc->ver.minor = ver->minor; } else { /* * Use the lower minor version returned in * the ack. By defninition, all lower minor * versions must be supported. */ svc->ver.minor = ack->minor_vers; } svc->state = DS_SVC_ACTIVE; DS_DBG("ds@%lx: id, svc->cap.svc_id, svc->ver.major, svc->ver.minor, svc->hdl); /* notify the client that registration is complete */ if (svc->ops.ds_reg_cb) { /* * Use a temporary version structure so that * the copy in the svc structure cannot be * modified by the client. */ tmpver.major = svc->ver.major; tmpver.minor = svc->ver.minor; (*svc->ops.ds_reg_cb)(svc->ops.cb_arg, &tmpver, svc->hdl); } done: rw_exit(&ds_svcs.rwlock); } static void ds_handle_reg_nack(ds_port_t *port, caddr_t buf, size_t len) { ds_reg_nack_t *nack; ds_svc_t *svc; int idx; size_t explen = DS_MSG_LEN(ds_reg_nack_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } nack = (ds_reg_nack_t *)(buf + DS_HDR_SZ); rw_enter(&ds_svcs.rwlock, RW_READER); /* lookup appropriate client */ if ((svc = ds_get_svc(nack->svc_handle)) == NULL) { cmn_err(CE_NOTE, "ds@%lx: id, nack->svc_handle); goto done; } /* make sure the message makes sense */ if (svc->state != DS_SVC_REG_PENDING) { cmn_err(CE_NOTE, "ds@%lx: id, svc->state); goto done; } if (nack->result == DS_REG_DUP) { cmn_err(CE_NOTE, "ds@%lx: id, svc->cap.svc_id); goto done; } /* * A major version of zero indicates that the * service is not supported at all. */ if (nack->major_vers == 0) { DS_DBG("ds@%lx: id, svc->cap.svc_id); ds_reset_svc(svc, port); goto done; } DS_DBG("ds@%lx: id, nack->svc_handle, nack->major_vers); /* * Walk the version list for the service, looking for * a major version that is as close to the requested * major version as possible. */ for (idx = svc->ver_idx; idx < svc->cap.nvers; idx++) { if (svc->cap.vers[idx].major <= nack->major_vers) { /* found a version to try */ break; } } if (idx == svc->cap.nvers) { /* no supported version */ DS_DBG("ds@%lx: id, svc->cap.svc_id, nack->major_vers); svc->state = DS_SVC_INACTIVE; goto done; } /* start the handshake again */ svc->state = DS_SVC_INACTIVE; svc->ver_idx = idx; (void) ds_svc_register(svc, NULL); done: rw_exit(&ds_svcs.rwlock); } static void ds_handle_unreg_req(ds_port_t *port, caddr_t buf, size_t len) { ds_hdr_t *hdr; ds_unreg_req_t *req; ds_unreg_ack_t *ack; ds_svc_t *svc; char *msg; size_t msglen; size_t explen = DS_MSG_LEN(ds_unreg_req_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } /* the request information */ req = (ds_unreg_req_t *)(buf + DS_HDR_SZ); rw_enter(&ds_svcs.rwlock, RW_READER); /* lookup appropriate client */ if ((svc = ds_get_svc(req->svc_handle)) == NULL) { cmn_err(CE_NOTE, "ds@%lx: id, req->svc_handle); ds_send_unreg_nack(port, req->svc_handle); goto done; } /* unregister the service */ (void) ds_svc_unregister(svc, svc->port); DS_DBG("ds@%lx: unreg_ack>: hdl=0x%lx\n", port->id, req->svc_handle); msglen = DS_HDR_SZ + sizeof (ds_unreg_ack_t); msg = kmem_zalloc(msglen, KM_SLEEP); hdr = (ds_hdr_t *)msg; hdr->msg_type = DS_UNREG_ACK; hdr->payload_len = sizeof (ds_unreg_ack_t); ack = (ds_unreg_ack_t *)(msg + DS_HDR_SZ); ack->svc_handle = req->svc_handle; /* send message */ mutex_enter(&port->lock); (void) ds_send_msg(port, msg, msglen); mutex_enter(&port->lock); done: rw_exit(&ds_svcs.rwlock); } static void ds_handle_unreg_ack(ds_port_t *port, caddr_t buf, size_t len) { ds_unreg_ack_t *ack; size_t explen = DS_MSG_LEN(ds_unreg_ack_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } ack = (ds_unreg_ack_t *)(buf + DS_HDR_SZ); DS_DBG("ds@%lx: id, ack->svc_handle); rw_enter(&ds_svcs.rwlock, RW_READER); /* * Since the unregister request was initiated locally, * the service structure has already been torn down. * Just perform a sanity check to make sure the message * is appropriate. */ if (ds_get_svc(ack->svc_handle) != NULL) { cmn_err(CE_NOTE, "ds@%lx: id, ack->svc_handle); } rw_exit(&ds_svcs.rwlock); } static void ds_handle_unreg_nack(ds_port_t *port, caddr_t buf, size_t len) { ds_unreg_nack_t *nack; size_t explen = DS_MSG_LEN(ds_unreg_nack_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } nack = (ds_unreg_nack_t *)(buf + DS_HDR_SZ); DS_DBG("ds@%lx: id, nack->svc_handle); rw_enter(&ds_svcs.rwlock, RW_READER); /* * Since the unregister request was initiated locally, * the service structure has already been torn down. * Just perform a sanity check to make sure the message * is appropriate. */ if (ds_get_svc(nack->svc_handle) != NULL) { cmn_err(CE_NOTE, "ds@%lx: id, nack->svc_handle); } rw_exit(&ds_svcs.rwlock); } static void ds_handle_data(ds_port_t *port, caddr_t buf, size_t len) { ds_data_handle_t *data; ds_svc_t *svc; char *msg; int msgsz; int hdrsz; size_t explen = DS_MSG_LEN(ds_data_handle_t); /* sanity check the incoming message */ if (len < explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } data = (ds_data_handle_t *)(buf + DS_HDR_SZ); hdrsz = DS_HDR_SZ + sizeof (ds_data_handle_t); msgsz = len - hdrsz; /* strip off the header for the client */ msg = (msgsz) ? (buf + hdrsz) : NULL; rw_enter(&ds_svcs.rwlock, RW_READER); /* lookup appropriate client */ if ((svc = ds_get_svc(data->svc_handle)) == NULL) { cmn_err(CE_NOTE, "ds@%lx: id, data->svc_handle); ds_send_data_nack(port, data->svc_handle); return; } rw_exit(&ds_svcs.rwlock); DS_DBG("ds@%lx: id, (svc->cap.svc_id) ? svc->cap.svc_id : "NULL", svc->hdl); /* dispatch this message to the client */ (*svc->ops.ds_data_cb)(svc->ops.cb_arg, msg, msgsz); } static void ds_handle_nack(ds_port_t *port, caddr_t buf, size_t len) { ds_svc_t *svc; ds_data_nack_t *nack; size_t explen = DS_MSG_LEN(ds_data_nack_t); /* sanity check the incoming message */ if (len != explen) { cmn_err(CE_NOTE, "ds@%lx: id, len, explen); return; } nack = (ds_data_nack_t *)(buf + DS_HDR_SZ); DS_DBG("ds@%lx: data_nack: hdl=0x%lx, result=0x%lx\n", port->id, nack->svc_handle, nack->result); if (nack->result == DS_INV_HDL) { rw_enter(&ds_svcs.rwlock, RW_READER); if ((svc = ds_get_svc(nack->svc_handle)) == NULL) { rw_exit(&ds_svcs.rwlock); return; } cmn_err(CE_NOTE, "ds@%lx: id, nack->svc_handle); (void) ds_svc_unregister(svc, svc->port); rw_exit(&ds_svcs.rwlock); } } static int ds_send_msg(ds_port_t *port, caddr_t msg, size_t msglen) { int rv; caddr_t currp = msg; size_t amt_left = msglen; int loopcnt = 0; DS_DUMP_LDC_MSG(msg, msglen); (void) ds_log_add_msg(DS_LOG_OUT(port->id), (uint8_t *)msg, msglen); /* * ensure that no other messages can be sent on this port in case * the write doesn't get sent with one write to guarantee that the * message doesn't become fragmented. */ ASSERT(MUTEX_HELD(&port->lock)); /* send the message */ do { if ((rv = ldc_write(port->ldc.hdl, currp, &msglen)) != 0) { if ((rv == EWOULDBLOCK) && (loopcnt++ < 1000)) { /* * don't try for more than a sec. Something * is wrong with the channel. */ delay(drv_usectohz(10000)); /* 1/1000 sec */ } else { cmn_err(CE_WARN, "ds@%lx: send_msg: ldc_write failed (%d)", port->id, rv); return (rv); } } else { amt_left -= msglen; currp += msglen; msglen = amt_left; loopcnt = 0; } } while (amt_left > 0); return (rv); } static void ds_send_init_req(ds_port_t *port) { ds_hdr_t *hdr; ds_init_req_t *init_req; size_t nbytes; ds_ver_t *vers = &ds_vers[port->ver_idx]; ASSERT(MUTEX_HELD(&port->lock)); if (port->state != DS_PORT_LDC_INIT) { cmn_err(CE_NOTE, "ds@%lx: init_req>: invalid port state (%d)", port->id, port->state); return; } DS_DBG("ds@%lx: init_req>: req=v%d.%d\n", port->id, vers->major, vers->minor); nbytes = DS_HDR_SZ + sizeof (ds_init_req_t); hdr = kmem_zalloc(nbytes, KM_SLEEP); hdr->msg_type = DS_INIT_REQ; hdr->payload_len = sizeof (ds_init_req_t); init_req = (ds_init_req_t *)((caddr_t)hdr + DS_HDR_SZ); init_req->major_vers = vers->major; init_req->minor_vers = vers->minor; /* send the message */ if (ds_send_msg(port, (caddr_t)hdr, nbytes) == 0) { port->state = DS_PORT_INIT_REQ; } } static int ds_send_reg_req(ds_svc_t *svc) { ds_port_t *port = svc->port; ds_ver_t *ver; ds_hdr_t *hdr; caddr_t msg; size_t msglen; size_t nbytes; ds_reg_req_t *req; size_t idlen; /* assumes some checking has already occurred */ ASSERT(svc->state == DS_SVC_INACTIVE); mutex_enter(&port->lock); /* check on the LDC to Zeus */ if (port->ldc.state != LDC_UP) { /* can not send message */ DS_DBG("ds@%lx: reg_req>: channel %ld is not up\n", port->id, port->ldc.id); mutex_exit(&port->lock); return (-1); } /* make sure port is ready */ if (port->state != DS_PORT_READY) { /* can not send message */ DS_DBG("ds@%lx: reg_req>: port is not ready\n", port->id); mutex_exit(&port->lock); return (-1); } mutex_exit(&port->lock); /* allocate the message buffer */ idlen = strlen(svc->cap.svc_id); msglen = DS_HDR_SZ + sizeof (ds_reg_req_t) + idlen; msg = kmem_zalloc(msglen, KM_SLEEP); /* copy in the header data */ hdr = (ds_hdr_t *)msg; hdr->msg_type = DS_REG_REQ; hdr->payload_len = sizeof (ds_reg_req_t) + idlen; req = (ds_reg_req_t *)(msg + DS_HDR_SZ); req->svc_handle = svc->hdl; ver = &(svc->cap.vers[svc->ver_idx]); req->major_vers = ver->major; req->minor_vers = ver->minor; /* copy in the service id */ bcopy(svc->cap.svc_id, req->svc_id, idlen + 1); /* send the message */ DS_DBG("ds@%lx: reg_req>: id='%s', ver=%d.%d, hdl=0x%lx\n", port->id, svc->cap.svc_id, ver->major, ver->minor, svc->hdl); nbytes = msglen; mutex_enter(&port->lock); if (ds_send_msg(port, msg, nbytes) != 0) { mutex_exit(&port->lock); return (-1); } else { svc->state = DS_SVC_REG_PENDING; } mutex_exit(&port->lock); return (0); } static int ds_send_unreg_req(ds_svc_t *svc) { caddr_t msg; size_t msglen; size_t nbytes; ds_hdr_t *hdr; ds_unreg_req_t *req; ds_port_t *port = svc->port; if (port == NULL) { DS_DBG("send_unreg_req: service '%s' not associated with " "a port\n", svc->cap.svc_id); return (-1); } mutex_enter(&port->lock); /* check on the LDC to Zeus */ if (port->ldc.state != LDC_UP) { /* can not send message */ cmn_err(CE_NOTE, "ds@%lx: unreg_req>: channel %ld is not up", port->id, port->ldc.id); mutex_exit(&port->lock); return (-1); } /* make sure port is ready */ if (port->state != DS_PORT_READY) { /* can not send message */ cmn_err(CE_NOTE, "ds@%lx: unreg_req>: port is not ready", port->id); mutex_exit(&port->lock); return (-1); } mutex_exit(&port->lock); msglen = DS_HDR_SZ + sizeof (ds_unreg_req_t); msg = kmem_zalloc(msglen, KM_SLEEP); /* copy in the header data */ hdr = (ds_hdr_t *)msg; hdr->msg_type = DS_UNREG; hdr->payload_len = sizeof (ds_unreg_req_t); req = (ds_unreg_req_t *)(msg + DS_HDR_SZ); req->svc_handle = svc->hdl; /* send the message */ DS_DBG("ds@%lx: unreg_req>: id='%s', hdl=0x%lx\n", port->id, (svc->cap.svc_id) ? svc->cap.svc_id : "NULL", svc->hdl); nbytes = msglen; mutex_enter(&port->lock); if (ds_send_msg(port, msg, nbytes) != 0) { mutex_exit(&port->lock); return (-1); } mutex_exit(&port->lock); return (0); } static void ds_send_unreg_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl) { caddr_t msg; size_t msglen; size_t nbytes; ds_hdr_t *hdr; ds_unreg_nack_t *nack; mutex_enter(&port->lock); /* check on the LDC to Zeus */ if (port->ldc.state != LDC_UP) { /* can not send message */ cmn_err(CE_NOTE, "ds@%lx: unreg_nack>: channel %ld is not up", port->id, port->ldc.id); mutex_exit(&port->lock); return; } /* make sure port is ready */ if (port->state != DS_PORT_READY) { /* can not send message */ cmn_err(CE_NOTE, "ds@%lx: unreg_nack>: port is not ready", port->id); mutex_exit(&port->lock); return; } mutex_exit(&port->lock); msglen = DS_HDR_SZ + sizeof (ds_unreg_nack_t); msg = kmem_zalloc(msglen, KM_SLEEP); /* copy in the header data */ hdr = (ds_hdr_t *)msg; hdr->msg_type = DS_UNREG_NACK; hdr->payload_len = sizeof (ds_unreg_nack_t); nack = (ds_unreg_nack_t *)(msg + DS_HDR_SZ); nack->svc_handle = bad_hdl; /* send the message */ DS_DBG("ds@%lx: unreg_nack>: hdl=0x%lx\n", port->id, bad_hdl); nbytes = msglen; mutex_enter(&port->lock); (void) ds_send_msg(port, msg, nbytes); mutex_exit(&port->lock); } static void ds_send_data_nack(ds_port_t *port, ds_svc_hdl_t bad_hdl) { caddr_t msg; size_t msglen; size_t nbytes; ds_hdr_t *hdr; ds_data_nack_t *nack; mutex_enter(&port->lock); /* check on the LDC to Zeus */ if (port->ldc.state != LDC_UP) { /* can not send message */ cmn_err(CE_NOTE, "ds@%lx: data_nack>: channel %ld is not up", port->id, port->ldc.id); mutex_exit(&port->lock); return; } /* make sure port is ready */ if (port->state != DS_PORT_READY) { /* can not send message */ cmn_err(CE_NOTE, "ds@%lx: data_nack>: port is not ready", port->id); mutex_exit(&port->lock); return; } mutex_exit(&port->lock); msglen = DS_HDR_SZ + sizeof (ds_data_nack_t); msg = kmem_zalloc(msglen, KM_SLEEP); /* copy in the header data */ hdr = (ds_hdr_t *)msg; hdr->msg_type = DS_NACK; hdr->payload_len = sizeof (ds_data_nack_t); nack = (ds_data_nack_t *)(msg + DS_HDR_SZ); nack->svc_handle = bad_hdl; nack->result = DS_INV_HDL; /* send the message */ DS_DBG("ds@%lx: data_nack>: hdl=0x%lx\n", port->id, bad_hdl); nbytes = msglen; mutex_enter(&port->lock); (void) ds_send_msg(port, msg, nbytes); mutex_exit(&port->lock); } #ifdef DEBUG #define BYTESPERLINE 8 #define LINEWIDTH ((BYTESPERLINE * 3) + (BYTESPERLINE + 2) + 1) #define ASCIIOFFSET ((BYTESPERLINE * 3) + 2) #define ISPRINT(c) ((c >= ' ') && (c <= '~')) /* * Output a buffer formatted with a set number of bytes on * each line. Append each line with the ASCII equivalent of * each byte if it falls within the printable ASCII range, * and '.' otherwise. */ static void ds_dump_ldc_msg(void *vbuf, size_t len) { int i, j; char *curr; char *aoff; char line[LINEWIDTH]; uint8_t *buf = vbuf; /* abort if not debugging ldc */ if (!(ds_debug & DS_DBG_FLAG_LDC)) { return; } /* walk the buffer one line at a time */ for (i = 0; i < len; i += BYTESPERLINE) { bzero(line, LINEWIDTH); curr = line; aoff = line + ASCIIOFFSET; /* * Walk the bytes in the current line, storing * the hex value for the byte as well as the * ASCII representation in a temporary buffer. * All ASCII values are placed at the end of * the line. */ for (j = 0; (j < BYTESPERLINE) && ((i + j) < len); j++) { (void) sprintf(curr, " %02x", buf[i + j]); *aoff = (ISPRINT(buf[i + j])) ? buf[i + j] : '.'; curr += 3; aoff++; } /* * Fill in to the start of the ASCII translation * with spaces. This will only be necessary if * this is the last line and there are not enough * bytes to fill the whole line. */ while (curr != (line + ASCIIOFFSET)) *curr++ = ' '; DS_DBG_LDC("%s\n", line); } } #endif /* DEBUG */ /* * Walk the table of registered services, executing the specified * callback function for each service. A non-zero return value from * the callback is used to terminate the walk, not to indicate an * error. Returns the index of the last service visited. */ static int ds_walk_svcs(svc_cb_t svc_cb, void *arg) { int idx; ds_svc_t *svc; ASSERT(RW_LOCK_HELD(&ds_svcs.rwlock)); /* walk every table entry */ for (idx = 0; idx < ds_svcs.maxsvcs; idx++) { svc = ds_svcs.tbl[idx]; /* execute the callback */ if ((*svc_cb)(svc, arg) != 0) break; } return (idx); } static int ds_svc_isfree(ds_svc_t *svc, void *arg) { _NOTE(ARGUNUSED(arg)) /* * Looking for a free service. This may be a NULL entry * in the table, or an unused structure that could be * reused. */ if (DS_SVC_ISFREE(svc)) { /* yes, it is free */ return (1); } /* not a candidate */ return (0); } static int ds_svc_ismatch(ds_svc_t *svc, void *arg) { if (DS_SVC_ISFREE(svc)) { return (0); } if (strcmp(svc->cap.svc_id, arg) == 0) { /* found a match */ return (1); } return (0); } static int ds_svc_free(ds_svc_t *svc, void *arg) { _NOTE(ARGUNUSED(arg)) if (svc == NULL) { return (0); } if (svc->cap.svc_id) { kmem_free(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1); svc->cap.svc_id = NULL; } if (svc->cap.vers) { kmem_free(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t)); svc->cap.vers = NULL; } kmem_free(svc, sizeof (ds_svc_t)); return (0); } static int ds_svc_register(ds_svc_t *svc, void *arg) { _NOTE(ARGUNUSED(arg)) int idx; /* check the state of the service */ if (DS_SVC_ISFREE(svc) || (svc->state != DS_SVC_INACTIVE)) return (0); /* check if there are any ports to try */ if (DS_PORTSET_ISNULL(svc->avail)) return (0); /* * Attempt to register the service. Start with the lowest * numbered port and continue until a registration message * is sent successfully, or there are no ports left to try. */ for (idx = 0; idx < DS_MAX_PORTS; idx++) { /* * If the port is not in the available list, * it is not a candidate for registration. */ if (!DS_PORT_IN_SET(svc->avail, idx)) { continue; } svc->port = &ds_ports[idx]; if (ds_send_reg_req(svc) == 0) { /* register sent successfully */ break; } /* reset the service to try the next port */ ds_reset_svc(svc, svc->port); } return (0); } static int ds_svc_unregister(ds_svc_t *svc, void *arg) { ds_port_t *port = (ds_port_t *)arg; if (DS_SVC_ISFREE(svc)) { return (0); } /* make sure the service is using this port */ if (svc->port != port) { return (0); } /* reset the service structure */ ds_reset_svc(svc, port); /* increment the count in the handle to prevent reuse */ svc->hdl = DS_ALLOC_HDL(DS_HDL2IDX(svc->hdl), DS_HDL2COUNT(svc->hdl)); /* call the client unregister callback */ if (svc->ops.ds_unreg_cb) (*svc->ops.ds_unreg_cb)(svc->ops.cb_arg); /* try to initiate a new registration */ (void) ds_svc_register(svc, NULL); return (0); } static int ds_svc_port_up(ds_svc_t *svc, void *arg) { ds_port_t *port = (ds_port_t *)arg; if (DS_SVC_ISFREE(svc)) { /* nothing to do */ return (0); } DS_PORTSET_ADD(svc->avail, port->id); return (0); } static ds_svc_t * ds_alloc_svc(void) { int idx; uint_t newmaxsvcs; ds_svc_t **newtbl; ds_svc_t *newsvc; ASSERT(RW_WRITE_HELD(&ds_svcs.rwlock)); idx = ds_walk_svcs(ds_svc_isfree, NULL); if (idx != ds_svcs.maxsvcs) { goto found; } /* * There was no free space in the table. Grow * the table to double its current size. */ newmaxsvcs = ds_svcs.maxsvcs * 2; newtbl = kmem_zalloc(newmaxsvcs * sizeof (ds_svc_t *), KM_SLEEP); /* copy old table data to the new table */ for (idx = 0; idx < ds_svcs.maxsvcs; idx++) { newtbl[idx] = ds_svcs.tbl[idx]; } /* clean up the old table */ kmem_free(ds_svcs.tbl, ds_svcs.maxsvcs * sizeof (ds_svc_t *)); ds_svcs.tbl = newtbl; ds_svcs.maxsvcs = newmaxsvcs; /* search for a free space again */ idx = ds_walk_svcs(ds_svc_isfree, NULL); /* the table is locked so should find a free slot */ ASSERT(idx != ds_svcs.maxsvcs); found: /* allocate a new svc structure if necessary */ if ((newsvc = ds_svcs.tbl[idx]) == NULL) { /* allocate a new service */ newsvc = kmem_zalloc(sizeof (ds_svc_t), KM_SLEEP); ds_svcs.tbl[idx] = newsvc; } /* fill in the handle */ newsvc->hdl = DS_ALLOC_HDL(idx, DS_HDL2COUNT(newsvc->hdl)); return (newsvc); } static void ds_reset_svc(ds_svc_t *svc, ds_port_t *port) { svc->state = DS_SVC_INACTIVE; svc->ver_idx = 0; svc->ver.major = 0; svc->ver.minor = 0; svc->port = NULL; DS_PORTSET_DEL(svc->avail, port->id); } static ds_svc_t * ds_get_svc(ds_svc_hdl_t hdl) { int idx; ds_svc_t *svc; ASSERT(RW_LOCK_HELD(&ds_svcs.rwlock)); if (hdl == DS_INVALID_HDL) return (NULL); idx = DS_HDL2IDX(hdl); /* check if index is out of bounds */ if ((idx < 0) || (idx >= ds_svcs.maxsvcs)) return (NULL); svc = ds_svcs.tbl[idx]; /* check for a valid service */ if (DS_SVC_ISFREE(svc)) return (NULL); /* make sure the handle is an exact match */ if (svc->hdl != hdl) return (NULL); return (svc); } static int ds_port_add(md_t *mdp, mde_cookie_t port, mde_cookie_t chan) { ds_port_t *newport; uint64_t port_id; uint64_t ldc_id; /* get the ID for this port */ if (md_get_prop_val(mdp, port, "id", &port_id) != 0) { cmn_err(CE_NOTE, "ds_port_add: port 'id' property not found"); return (-1); } /* sanity check the port id */ if (port_id > DS_MAX_PORT_ID) { cmn_err(CE_WARN, "ds_port_add: port ID %ld out of range", port_id); return (-1); } DS_DBG("ds_port_add: adding port ds@%ld\n", port_id); /* get the channel ID for this port */ if (md_get_prop_val(mdp, chan, "id", &ldc_id) != 0) { cmn_err(CE_NOTE, "ds@%lx: add_port: no channel 'id' property", port_id); return (-1); } /* get the port structure from the array of ports */ newport = &ds_ports[port_id]; /* check for a duplicate port in the MD */ if (newport->state != DS_PORT_FREE) { cmn_err(CE_NOTE, "ds@%lx: add_port: port already exists", port_id); return (-1); } /* initialize the port lock */ mutex_init(&newport->lock, NULL, MUTEX_DRIVER, NULL); /* initialize the port */ newport->id = port_id; newport->state = DS_PORT_INIT; newport->ldc.id = ldc_id; /* add the port to the set of all ports */ DS_PORTSET_ADD(ds_allports, port_id); return (0); } static void ds_port_reset(ds_port_t *port) { ASSERT(MUTEX_HELD(&port->lock)); /* connection went down, mark everything inactive */ rw_enter(&ds_svcs.rwlock, RW_WRITER); (void) ds_walk_svcs(ds_svc_unregister, port); rw_exit(&ds_svcs.rwlock); port->ver_idx = 0; port->ver.major = 0; port->ver.minor = 0; port->state = DS_PORT_LDC_INIT; } /* * Verify that a version array is sorted as expected for the * version negotiation to work correctly. */ static ds_vers_check_t ds_vers_isvalid(ds_ver_t *vers, int nvers) { uint16_t curr_major; uint16_t curr_minor; int idx; curr_major = vers[0].major; curr_minor = vers[0].minor; /* * Walk the version array, verifying correct ordering. * The array must be sorted from highest supported * version to lowest supported version. */ for (idx = 0; idx < nvers; idx++) { if (vers[idx].major > curr_major) { DS_DBG("vers_isvalid: version array has increasing " "major versions\n"); return (DS_VERS_INCREASING_MAJOR_ERR); } if (vers[idx].major < curr_major) { curr_major = vers[idx].major; curr_minor = vers[idx].minor; continue; } if (vers[idx].minor > curr_minor) { DS_DBG("vers_isvalid: version array has increasing " "minor versions\n"); return (DS_VERS_INCREASING_MINOR_ERR); } curr_minor = vers[idx].minor; } return (DS_VERS_OK); } /* * Logging Support */ static void ds_log_init(void) { ds_log_entry_t *new; /* initialize global lock */ mutex_init(&ds_log.lock, NULL, MUTEX_DRIVER, NULL); mutex_enter(&ds_log.lock); /* initialize the log */ ds_log.head = NULL; ds_log.size = 0; ds_log.nentry = 0; /* initialize the free list */ for (new = ds_log_entry_pool; new < DS_LOG_POOL_END; new++) { new->next = ds_log.freelist; ds_log.freelist = new; } mutex_exit(&ds_log.lock); DS_DBG_LOG("ds_log initialized: size=%d bytes, limit=%d bytes, " "ninit=%ld\n", ds_log_sz, DS_LOG_LIMIT, DS_LOG_NPOOL); } static void ds_log_fini(void) { ds_log_entry_t *next; mutex_enter(&ds_log.lock); /* clear out the log */ while (ds_log.nentry > 0) (void) ds_log_remove(); /* * Now all the entries are on the free list. * Clear out the free list, deallocating any * entry that was dynamically allocated. */ while (ds_log.freelist != NULL) { next = ds_log.freelist->next; if (!DS_IS_POOL_ENTRY(ds_log.freelist)) { kmem_free(ds_log.freelist, sizeof (ds_log_entry_t)); } ds_log.freelist = next; } mutex_exit(&ds_log.lock); mutex_destroy(&ds_log.lock); } static ds_log_entry_t * ds_log_entry_alloc(void) { ds_log_entry_t *new = NULL; ASSERT(MUTEX_HELD(&ds_log.lock)); if (ds_log.freelist != NULL) { new = ds_log.freelist; ds_log.freelist = ds_log.freelist->next; } if (new == NULL) { /* free list was empty */ new = kmem_zalloc(sizeof (ds_log_entry_t), KM_SLEEP); } ASSERT(new); return (new); } static void ds_log_entry_free(ds_log_entry_t *entry) { ASSERT(MUTEX_HELD(&ds_log.lock)); if (entry == NULL) return; if (entry->data != NULL) { kmem_free(entry->data, entry->datasz); entry->data = NULL; } /* place entry on the free list */ entry->next = ds_log.freelist; ds_log.freelist = entry; } /* * Add a message to the end of the log */ static int ds_log_add(ds_log_entry_t *new) { ASSERT(MUTEX_HELD(&ds_log.lock)); if (ds_log.head == NULL) { new->prev = new; new->next = new; ds_log.head = new; } else { ds_log_entry_t *head = ds_log.head; ds_log_entry_t *tail = ds_log.head->prev; new->next = head; new->prev = tail; tail->next = new; head->prev = new; } /* increase the log size, including the metadata size */ ds_log.size += DS_LOG_ENTRY_SZ(new); ds_log.nentry++; DS_DBG_LOG("ds_log: added %ld data bytes, %ld total bytes\n", new->datasz, DS_LOG_ENTRY_SZ(new)); return (0); } /* * Remove an entry from the head of the log */ static int ds_log_remove(void) { ds_log_entry_t *head; ASSERT(MUTEX_HELD(&ds_log.lock)); head = ds_log.head; /* empty list */ if (head == NULL) return (0); if (head->next == ds_log.head) { /* one element list */ ds_log.head = NULL; } else { head->next->prev = head->prev; head->prev->next = head->next; ds_log.head = head->next; } DS_DBG_LOG("ds_log: removed %ld data bytes, %ld total bytes\n", head->datasz, DS_LOG_ENTRY_SZ(head)); ds_log.size -= DS_LOG_ENTRY_SZ(head); ds_log.nentry--; ASSERT((ds_log.size >= 0) && (ds_log.nentry >= 0)); ds_log_entry_free(head); return (0); } /* * Replace the data in the entry at the front of the list with then * new data. This has the effect of removing the oldest entry and * adding the new entry. */ static int ds_log_replace(uint8_t *msg, size_t sz) { ds_log_entry_t *head; ASSERT(MUTEX_HELD(&ds_log.lock)); head = ds_log.head; DS_DBG_LOG("ds_log: replaced %ld data bytes (%ld total) with %ld data " "bytes (%ld total)\n", head->datasz, DS_LOG_ENTRY_SZ(head), sz, sz + sizeof (ds_log_entry_t)); ds_log.size -= DS_LOG_ENTRY_SZ(head); ASSERT((ds_log.size >= 0) && (ds_log.nentry >= 0)); kmem_free(head->data, head->datasz); head->data = msg; head->datasz = sz; ds_log.size += DS_LOG_ENTRY_SZ(head); ds_log.head = head->next; return (0); } static void ds_log_purge(void *arg) { _NOTE(ARGUNUSED(arg)) mutex_enter(&ds_log.lock); DS_DBG_LOG("ds_log: purging oldest log entries\n"); while ((ds_log.nentry) && (ds_log.size >= ds_log_sz)) { (void) ds_log_remove(); } mutex_exit(&ds_log.lock); } static int ds_log_add_msg(int32_t dest, uint8_t *msg, size_t sz) { int rv = 0; mutex_enter(&ds_log.lock); /* check if the log is larger than the soft limit */ if ((ds_log.nentry) && ((ds_log.size + sz) >= ds_log_sz)) { /* * The log is larger than the soft limit. * Swap the oldest entry for the newest. */ DS_DBG_LOG("ds_log: replacing oldest entry with new entry\n"); (void) ds_log_replace(msg, sz); } else { /* * Still have headroom under the soft limit. * Add the new entry to the log. */ ds_log_entry_t *new; new = ds_log_entry_alloc(); /* fill in message data */ new->data = msg; new->datasz = sz; new->timestamp = ddi_get_time(); new->dest = dest; rv = ds_log_add(new); } /* check if the log is larger than the hard limit */ if ((ds_log.nentry > 1) && (ds_log.size >= DS_LOG_LIMIT)) { /* * Wakeup the thread to remove entries * from the log until it is smaller than * the soft limit. */ DS_DBG_LOG("ds_log: log exceeded %d bytes, scheduling a " "purge...\n", DS_LOG_LIMIT); if (DS_DISPATCH(ds_log_purge, (void *)msg) == NULL) { cmn_err(CE_NOTE, "ds_log: purge thread failed to " "start"); } } mutex_exit(&ds_log.lock); return (rv); } /* * Client Interface */ int ds_cap_init(ds_capability_t *cap, ds_clnt_ops_t *ops) { int idx; ds_vers_check_t status; ds_svc_t *svc; /* sanity check the args */ if ((cap == NULL) || (ops == NULL)) { cmn_err(CE_NOTE, "ds_cap_init: invalid arguments"); return (EINVAL); } /* sanity check the capability specifier */ if ((cap->svc_id == NULL) || (cap->vers == NULL) || (cap->nvers == 0)) { cmn_err(CE_NOTE, "ds_cap_init: invalid capability specifier"); return (EINVAL); } /* sanity check the version array */ if ((status = ds_vers_isvalid(cap->vers, cap->nvers)) != DS_VERS_OK) { cmn_err(CE_NOTE, "ds_cap_init: invalid capability " "version array for %s service: %s", cap->svc_id, (status == DS_VERS_INCREASING_MAJOR_ERR) ? "increasing major versions" : "increasing minor versions"); return (EINVAL); } /* data and register callbacks are required */ if ((ops->ds_data_cb == NULL) || (ops->ds_reg_cb == NULL)) { cmn_err(CE_NOTE, "ds_cap_init: invalid ops specifier for " "%s service", cap->svc_id); return (EINVAL); } DS_DBG("ds_cap_init: svc_id='%s', data_cb=0x%lx, cb_arg=0x%lx\n", cap->svc_id, (uint64_t)ops->ds_data_cb, (uint64_t)ops->cb_arg); rw_enter(&ds_svcs.rwlock, RW_WRITER); /* check if the service is already registered */ idx = ds_walk_svcs(ds_svc_ismatch, cap->svc_id); if (idx != ds_svcs.maxsvcs) { /* already registered */ cmn_err(CE_NOTE, "service '%s' already registered", cap->svc_id); rw_exit(&ds_svcs.rwlock); return (EALREADY); } svc = ds_alloc_svc(); /* copy over all the client information */ bcopy(cap, &svc->cap, sizeof (ds_capability_t)); /* make a copy of the service name */ svc->cap.svc_id = kmem_zalloc(strlen(cap->svc_id) + 1, KM_SLEEP); (void) strncpy(svc->cap.svc_id, cap->svc_id, strlen(cap->svc_id)); /* make a copy of the version array */ svc->cap.vers = kmem_zalloc(cap->nvers * sizeof (ds_ver_t), KM_SLEEP); bcopy(cap->vers, svc->cap.vers, cap->nvers * sizeof (ds_ver_t)); /* copy the client ops vector */ bcopy(ops, &svc->ops, sizeof (ds_clnt_ops_t)); svc->state = DS_SVC_INACTIVE; svc->ver_idx = 0; DS_PORTSET_DUP(svc->avail, ds_allports); ds_svcs.nsvcs++; rw_exit(&ds_svcs.rwlock); /* attempt to register the service */ (void) ds_svc_register(svc, NULL); DS_DBG("ds_cap_init: service '%s' assigned handle 0x%lx\n", svc->cap.svc_id, svc->hdl); return (0); } int ds_cap_fini(ds_capability_t *cap) { int idx; ds_svc_t *svc; ds_svc_hdl_t tmp_hdl; rw_enter(&ds_svcs.rwlock, RW_WRITER); /* make sure the service is registered */ idx = ds_walk_svcs(ds_svc_ismatch, cap->svc_id); if (idx == ds_svcs.maxsvcs) { /* service is not registered */ cmn_err(CE_NOTE, "ds_cap_fini: unknown service '%s'", cap->svc_id); rw_exit(&ds_svcs.rwlock); return (EINVAL); } svc = ds_svcs.tbl[idx]; DS_DBG("ds_cap_fini: svcid='%s', hdl=0x%lx\n", svc->cap.svc_id, svc->hdl); /* * Attempt to send an unregister notification. Even * if sending the message fails, the local unregister * request must be honored, since this indicates that * the client will no longer handle incoming requests. */ (void) ds_send_unreg_req(svc); /* * Clear out the structure, but do not deallocate the * memory. It can be reused for the next registration. */ kmem_free(svc->cap.svc_id, strlen(svc->cap.svc_id) + 1); kmem_free(svc->cap.vers, svc->cap.nvers * sizeof (ds_ver_t)); /* save the handle to prevent reuse */ tmp_hdl = svc->hdl; bzero(svc, sizeof (ds_svc_t)); /* initialize for next use */ svc->hdl = tmp_hdl; svc->state = DS_SVC_FREE; ds_svcs.nsvcs--; rw_exit(&ds_svcs.rwlock); return (0); } int ds_cap_send(ds_svc_hdl_t hdl, void *buf, size_t len) { int rv; ds_hdr_t *hdr; caddr_t msg; size_t msglen; size_t hdrlen; caddr_t payload; ds_svc_t *svc; ds_port_t *port; ds_data_handle_t *data; rw_enter(&ds_svcs.rwlock, RW_READER); if ((hdl == DS_INVALID_HDL) || (svc = ds_get_svc(hdl)) == NULL) { cmn_err(CE_NOTE, "ds_cap_send: invalid handle 0x%lx", hdl); rw_exit(&ds_svcs.rwlock); return (EINVAL); } if ((port = svc->port) == NULL) { cmn_err(CE_NOTE, "ds_cap_send: service '%s' not associated " "with a port", svc->cap.svc_id); rw_exit(&ds_svcs.rwlock); return (ECONNRESET); } mutex_enter(&port->lock); /* check that the LDC channel is ready */ if (port->ldc.state != LDC_UP) { cmn_err(CE_NOTE, "ds_cap_send: LDC channel is not up"); mutex_exit(&port->lock); rw_exit(&ds_svcs.rwlock); return (ECONNRESET); } if (svc->state != DS_SVC_ACTIVE) { /* channel is up, but svc is not registered */ cmn_err(CE_NOTE, "ds_cap_send: invalid service state 0x%x", svc->state); mutex_exit(&port->lock); rw_exit(&ds_svcs.rwlock); return (EINVAL); } hdrlen = DS_HDR_SZ + sizeof (ds_data_handle_t); msg = kmem_zalloc(len + hdrlen, KM_SLEEP); hdr = (ds_hdr_t *)msg; payload = msg + hdrlen; msglen = len + hdrlen; hdr->payload_len = len + sizeof (ds_data_handle_t); hdr->msg_type = DS_DATA; data = (ds_data_handle_t *)(msg + DS_HDR_SZ); data->svc_handle = hdl; if ((buf != NULL) && (len != 0)) { bcopy(buf, payload, len); } DS_DBG("ds@%lx: data>: hdl=0x%lx, len=%ld, payload_len=%d\n", port->id, svc->hdl, msglen, hdr->payload_len); if ((rv = ds_send_msg(port, msg, msglen)) != 0) { rv = (rv == EIO) ? ECONNRESET : rv; } mutex_exit(&port->lock); rw_exit(&ds_svcs.rwlock); return (rv); }